From 7b98b7797ee40e2e874587dbbc8e123e780bf33b Mon Sep 17 00:00:00 2001 From: Karthic Rao Date: Mon, 10 Jun 2024 19:24:32 -0700 Subject: [PATCH 01/18] Adding the compiler, and Backend implementation for E6data. --- ibis/backends/e6data/__init__.py | 556 ++++++++++++++++++++++ ibis/backends/e6data/compiler.py | 363 ++++++++++++++ ibis/backends/e6data/converter.py | 27 ++ ibis/backends/e6data/tests/__init__.py | 0 ibis/backends/e6data/tests/conftest.py | 79 +++ ibis/backends/e6data/tests/test_client.py | 211 ++++++++ 6 files changed, 1236 insertions(+) create mode 100644 ibis/backends/e6data/__init__.py create mode 100644 ibis/backends/e6data/compiler.py create mode 100644 ibis/backends/e6data/converter.py create mode 100644 ibis/backends/e6data/tests/__init__.py create mode 100644 ibis/backends/e6data/tests/conftest.py create mode 100644 ibis/backends/e6data/tests/test_client.py diff --git a/ibis/backends/e6data/__init__.py b/ibis/backends/e6data/__init__.py new file mode 100644 index 000000000000..98ceeb4f5e66 --- /dev/null +++ b/ibis/backends/e6data/__init__.py @@ -0,0 +1,556 @@ +"""The E6Data backend.""" + +from __future__ import annotations + +import contextlib +import re +import warnings +from functools import cached_property, partial +from itertools import repeat +from operator import itemgetter +from typing import TYPE_CHECKING, Any +from urllib.parse import parse_qs, urlparse + +import numpy as np +import pymysql +import sqlglot as sg +import sqlglot.expressions as sge + +import ibis +import ibis.common.exceptions as com +import ibis.expr.operations as ops +import ibis.expr.schema as sch +import ibis.expr.types as ir +from ibis import util +from ibis.backends import CanCreateDatabase +from ibis.backends.e6data.compiler import E6DataCompiler +from ibis.backends.sql import SQLBackend +from ibis.backends.sql.compiler import TRUE, C + +if TYPE_CHECKING: + from collections.abc import Mapping + + import pandas as pd + import polars as pl + import pyarrow as pa + + +class Backend(SQLBackend, CanCreateDatabase): + name = "e6data" + compiler = E6DataCompiler() + supports_create_or_replace = False + + def _from_url(self, url: str, **kwargs): + """Connect to a backend using a URL `url`. + + Parameters + ---------- + url + URL with which to connect to a backend. 
+ kwargs + Additional keyword arguments + + Returns + ------- + BaseBackend + A backend instance + + """ + + url = urlparse(url) + database, *_ = url.path[1:].split("/", 1) + query_params = parse_qs(url.query) + connect_args = { + "user": url.username, + "password": url.password or "", + "host": url.hostname, + "database": database or "", + } + + for name, value in query_params.items(): + if len(value) > 1: + connect_args[name] = value + elif len(value) == 1: + connect_args[name] = value[0] + else: + raise com.IbisError(f"Invalid URL parameter: {name}") + + kwargs.update(connect_args) + self._convert_kwargs(kwargs) + + if "user" in kwargs and not kwargs["user"]: + del kwargs["user"] + + if "host" in kwargs and not kwargs["host"]: + del kwargs["host"] + + if "database" in kwargs and not kwargs["database"]: + del kwargs["database"] + + if "password" in kwargs and kwargs["password"] is None: + del kwargs["password"] + + return self.connect(**kwargs) + + @cached_property + def version(self): + matched = re.search(r"(\d+)\.(\d+)\.(\d+)", self.con.server_version) + return ".".join(matched.groups()) + + def do_connect( + self, + host: str = "localhost", + user: str | None = None, + password: str | None = None, + port: int = 3306, + database: str | None = None, + autocommit: bool = True, + **kwargs, + ) -> None: + """Create an Ibis client using the passed connection parameters. + + Parameters + ---------- + host + Hostname + user + Username + password + Password + port + Port + database + Database to connect to + autocommit + Autocommit mode + kwargs + Additional keyword arguments passed to `pymysql.connect` + + Examples + -------- + >>> import os + >>> import getpass + >>> host = os.environ.get("IBIS_TEST_E6DATA_HOST", "localhost") + >>> user = os.environ.get("IBIS_TEST_E6DATA_USER", getpass.getuser()) + >>> password = os.environ.get("IBIS_TEST_E6DATA_PASSWORD") + >>> database = os.environ.get("IBIS_TEST_E6DATA_DATABASE", "ibis_testing") + >>> con = connect(database=database, host=host, user=user, password=password) + >>> con.list_tables() # doctest: +ELLIPSIS + [...] 
+ >>> t = con.table("functional_alltypes") + >>> t + MySQLTable[table] + name: functional_alltypes + schema: + id : int32 + bool_col : int8 + tinyint_col : int8 + smallint_col : int16 + int_col : int32 + bigint_col : int64 + float_col : float32 + double_col : float64 + date_string_col : string + string_col : string + timestamp_col : timestamp + year : int32 + month : int32 + + """ + con = pymysql.connect( + user=user, + host=host, + port=port, + password=password, + database=database, + autocommit=autocommit, + conv=pymysql.converters.conversions, + **kwargs, + ) + + with contextlib.closing(con.cursor()) as cur: + try: + cur.execute("SET @@session.time_zone = 'UTC'") + except Exception as e: # noqa: BLE001 + warnings.warn(f"Unable to set session timezone to UTC: {e}") + + self.con = con + + @property + def current_database(self) -> str: + with self._safe_raw_sql(sg.select(self.compiler.f.database())) as cur: + [(database,)] = cur.fetchall() + return database + + def list_databases(self, like: str | None = None) -> list[str]: + # In MySQL, "database" and "schema" are synonymous + with self._safe_raw_sql("SHOW DATABASES") as cur: + databases = list(map(itemgetter(0), cur.fetchall())) + return self._filter_with_like(databases, like) + + def _get_schema_using_query(self, query: str) -> sch.Schema: + table = util.gen_name(f"{self.name}_metadata") + + with self.begin() as cur: + cur.execute( + f"CREATE TEMPORARY TABLE {table} AS SELECT * FROM ({query}) AS tmp LIMIT 0" + ) + try: + return self.get_schema(table) + finally: + cur.execute(f"DROP TABLE {table}") + + def get_schema( + self, name: str, *, catalog: str | None = None, database: str | None = None + ) -> sch.Schema: + table = sg.table(name, db=database, catalog=catalog, quoted=True).sql(self.name) + + with self.begin() as cur: + cur.execute(f"DESCRIBE {table}") + result = cur.fetchall() + + type_mapper = self.compiler.type_mapper + fields = { + name: type_mapper.from_string(type_string, nullable=is_nullable == "YES") + for name, type_string, is_nullable, *_ in result + } + + return sch.Schema(fields) + + def create_database(self, name: str, force: bool = False) -> None: + sql = sge.Create(kind="DATABASE", exist=force, this=sg.to_identifier(name)).sql( + self.name + ) + with self.begin() as cur: + cur.execute(sql) + + def drop_database(self, name: str, force: bool = False) -> None: + sql = sge.Drop(kind="DATABASE", exist=force, this=sg.to_identifier(name)).sql( + self.name + ) + with self.begin() as cur: + cur.execute(sql) + + @contextlib.contextmanager + def begin(self): + con = self.con + cur = con.cursor() + try: + yield cur + except Exception: + con.rollback() + raise + else: + con.commit() + finally: + cur.close() + + # TODO(kszucs): should make it an abstract method or remove the use of it + # from .execute() + @contextlib.contextmanager + def _safe_raw_sql(self, *args, **kwargs): + with contextlib.closing(self.raw_sql(*args, **kwargs)) as result: + yield result + + def raw_sql(self, query: str | sg.Expression, **kwargs: Any) -> Any: + with contextlib.suppress(AttributeError): + query = query.sql(dialect=self.name) + + con = self.con + cursor = con.cursor() + + try: + cursor.execute(query, **kwargs) + except Exception: + con.rollback() + cursor.close() + raise + else: + con.commit() + return cursor + + # TODO: disable positional arguments + def list_tables( + self, + like: str | None = None, + schema: str | None = None, + database: tuple[str, str] | str | None = None, + ) -> list[str]: + """List the tables in the database. 
+ + ::: {.callout-note} + ## Ibis does not use the word `schema` to refer to database hierarchy. + + A collection of tables is referred to as a `database`. + A collection of `database` is referred to as a `catalog`. + + These terms are mapped onto the corresponding features in each + backend (where available), regardless of whether the backend itself + uses the same terminology. + ::: + + Parameters + ---------- + like + A pattern to use for listing tables. + schema + [deprecated] The schema to perform the list against. + database + Database to list tables from. Default behavior is to show tables in + the current database (``self.current_database``). + """ + if schema is not None: + self._warn_schema() + + if schema is not None and database is not None: + raise ValueError( + "Using both the `schema` and `database` kwargs is not supported. " + "`schema` is deprecated and will be removed in Ibis 10.0" + "\nUse the `database` kwarg with one of the following patterns:" + '\ndatabase="database"' + '\ndatabase=("catalog", "database")' + '\ndatabase="catalog.database"', + ) + elif schema is not None: + table_loc = schema + elif database is not None: + table_loc = database + else: + table_loc = self.current_database + + table_loc = self._to_sqlglot_table(table_loc) + + conditions = [TRUE] + + if table_loc is not None: + if (sg_cat := table_loc.args["catalog"]) is not None: + sg_cat.args["quoted"] = False + if (sg_db := table_loc.args["db"]) is not None: + sg_db.args["quoted"] = False + conditions = [C.table_schema.eq(sge.convert(table_loc.sql(self.name)))] + + col = "table_name" + sql = ( + sg.select(col) + .from_(sg.table("tables", db="information_schema")) + .distinct() + .where(*conditions) + .sql(self.name) + ) + + with self._safe_raw_sql(sql) as cur: + out = cur.fetchall() + + return self._filter_with_like(map(itemgetter(0), out), like) + + def execute( + self, expr: ir.Expr, limit: str | None = "default", **kwargs: Any + ) -> Any: + """Execute an expression.""" + + self._run_pre_execute_hooks(expr) + table = expr.as_table() + sql = self.compile(table, limit=limit, **kwargs) + + schema = table.schema() + + with self._safe_raw_sql(sql) as cur: + result = self._fetch_from_cursor(cur, schema) + return expr.__pandas_result__(result) + + def create_table( + self, + name: str, + obj: ir.Table + | pd.DataFrame + | pa.Table + | pl.DataFrame + | pl.LazyFrame + | None = None, + *, + schema: ibis.Schema | None = None, + database: str | None = None, + temp: bool = False, + overwrite: bool = False, + ) -> ir.Table: + if obj is None and schema is None: + raise ValueError("Either `obj` or `schema` must be specified") + + if database is not None and database != self.current_database: + raise com.UnsupportedOperationError( + "Creating tables in other databases is not supported by Postgres" + ) + else: + database = None + + properties = [] + + if temp: + properties.append(sge.TemporaryProperty()) + + temp_memtable_view = None + if obj is not None: + if not isinstance(obj, ir.Expr): + table = ibis.memtable(obj) + temp_memtable_view = table.op().name + else: + table = obj + + self._run_pre_execute_hooks(table) + + query = self._to_sqlglot(table) + else: + query = None + + column_defs = [ + sge.ColumnDef( + this=sg.to_identifier(colname, quoted=self.compiler.quoted), + kind=self.compiler.type_mapper.from_ibis(typ), + constraints=( + None + if typ.nullable + else [sge.ColumnConstraint(kind=sge.NotNullColumnConstraint())] + ), + ) + for colname, typ in (schema or table.schema()).items() + ] + + if overwrite: + 
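            # overwrite is emulated: load into a uniquely named staging table,
            # then drop the old table and rename the staging table into place
            # once the insert succeeds (see the DROP/RENAME below)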
temp_name = util.gen_name(f"{self.name}_table") + else: + temp_name = name + + table = sg.table(temp_name, catalog=database, quoted=self.compiler.quoted) + target = sge.Schema(this=table, expressions=column_defs) + + create_stmt = sge.Create( + kind="TABLE", + this=target, + properties=sge.Properties(expressions=properties), + ) + + this = sg.table(name, catalog=database, quoted=self.compiler.quoted) + with self._safe_raw_sql(create_stmt) as cur: + if query is not None: + insert_stmt = sge.Insert(this=table, expression=query).sql(self.name) + cur.execute(insert_stmt) + + if overwrite: + cur.execute( + sge.Drop(kind="TABLE", this=this, exists=True).sql(self.name) + ) + cur.execute( + f"ALTER TABLE IF EXISTS {table.sql(self.name)} RENAME TO {this.sql(self.name)}" + ) + + if schema is None: + # Clean up temporary memtable if we've created one + # for in-memory reads + if temp_memtable_view is not None: + self.drop_table(temp_memtable_view) + + return self.table(name, database=database) + + # preserve the input schema if it was provided + return ops.DatabaseTable( + name, schema=schema, source=self, namespace=ops.Namespace(database=database) + ).to_expr() + + def _register_in_memory_table(self, op: ops.InMemoryTable) -> None: + schema = op.schema + if null_columns := [col for col, dtype in schema.items() if dtype.is_null()]: + raise com.IbisTypeError( + "MySQL cannot yet reliably handle `null` typed columns; " + f"got null typed columns: {null_columns}" + ) + + # only register if we haven't already done so + if (name := op.name) not in self.list_tables(): + quoted = self.compiler.quoted + column_defs = [ + sg.exp.ColumnDef( + this=sg.to_identifier(colname, quoted=quoted), + kind=self.compiler.type_mapper.from_ibis(typ), + constraints=( + None + if typ.nullable + else [ + sg.exp.ColumnConstraint( + kind=sg.exp.NotNullColumnConstraint() + ) + ] + ), + ) + for colname, typ in schema.items() + ] + + create_stmt = sg.exp.Create( + kind="TABLE", + this=sg.exp.Schema( + this=sg.to_identifier(name, quoted=quoted), expressions=column_defs + ), + properties=sg.exp.Properties(expressions=[sge.TemporaryProperty()]), + ) + create_stmt_sql = create_stmt.sql(self.name) + + columns = schema.keys() + df = op.data.to_frame() + # nan can not be used with MySQL + df = df.replace(np.nan, None) + + data = df.itertuples(index=False) + cols = ", ".join( + ident.sql(self.name) + for ident in map(partial(sg.to_identifier, quoted=quoted), columns) + ) + specs = ", ".join(repeat("%s", len(columns))) + table = sg.table(name, quoted=quoted) + sql = f"INSERT INTO {table.sql(self.name)} ({cols}) VALUES ({specs})" + with self.begin() as cur: + cur.execute(create_stmt_sql) + + if not df.empty: + cur.executemany(sql, data) + + @util.experimental + def to_pyarrow_batches( + self, + expr: ir.Expr, + *, + params: Mapping[ir.Scalar, Any] | None = None, + limit: int | str | None = None, + chunk_size: int = 1_000_000, + **_: Any, + ) -> pa.ipc.RecordBatchReader: + import pyarrow as pa + + self._run_pre_execute_hooks(expr) + + schema = expr.as_table().schema() + with self._safe_raw_sql( + self.compile(expr, limit=limit, params=params) + ) as cursor: + df = self._fetch_from_cursor(cursor, schema) + table = pa.Table.from_pandas( + df, schema=schema.to_pyarrow(), preserve_index=False + ) + return table.to_reader(max_chunksize=chunk_size) + + def _fetch_from_cursor(self, cursor, schema: sch.Schema) -> pd.DataFrame: + import pandas as pd + + from ibis.backends.mysql.converter import MySQLPandasData + + try: + df = 
pd.DataFrame.from_records( + cursor, columns=schema.names, coerce_float=True + ) + except Exception: + # clean up the cursor if we fail to create the DataFrame + # + # in the sqlite case failing to close the cursor results in + # artificially locked tables + cursor.close() + raise + df = MySQLPandasData.convert_table(df, schema) + return df diff --git a/ibis/backends/e6data/compiler.py b/ibis/backends/e6data/compiler.py new file mode 100644 index 000000000000..dd93f2d6c771 --- /dev/null +++ b/ibis/backends/e6data/compiler.py @@ -0,0 +1,363 @@ +from __future__ import annotations + +import string +from functools import partial, reduce + +import sqlglot as sg +import sqlglot.expressions as sge +from public import public + +import ibis.common.exceptions as com +import ibis.expr.datatypes as dt +import ibis.expr.operations as ops +from ibis.backends.sql.compiler import NULL, STAR, SQLGlotCompiler +from ibis.backends.sql.datatypes import MySQLType +from ibis.backends.sql.dialects import MySQL +from ibis.backends.sql.rewrites import ( + exclude_unsupported_window_frame_from_ops, + exclude_unsupported_window_frame_from_rank, + exclude_unsupported_window_frame_from_row_number, + rewrite_empty_order_by_window, +) +from ibis.common.patterns import replace +from ibis.expr.rewrites import p + + +@replace(p.Limit) +def rewrite_limit(_, **kwargs): + """Rewrite limit for MySQL to include a large upper bound. + + From the MySQL docs @ https://dev.mysql.com/doc/refman/8.0/en/select.html + + > To retrieve all rows from a certain offset up to the end of the result + > set, you can use some large number for the second parameter. This statement + > retrieves all rows from the 96th row to the last: + > + > SELECT * FROM tbl LIMIT 95,18446744073709551615; + """ + if _.n is None and _.offset is not None: + some_large_number = (1 << 64) - 1 + return _.copy(n=some_large_number) + return _ + + +@public +class E6DataCompiler(SQLGlotCompiler): + __slots__ = () + + dialect = MySQL + type_mapper = MySQLType + rewrites = ( + rewrite_limit, + exclude_unsupported_window_frame_from_ops, + exclude_unsupported_window_frame_from_rank, + exclude_unsupported_window_frame_from_row_number, + rewrite_empty_order_by_window, + *SQLGlotCompiler.rewrites, + ) + + @property + def NAN(self): + raise NotImplementedError("MySQL does not support NaN") + + @property + def POS_INF(self): + raise NotImplementedError("MySQL does not support Infinity") + + NEG_INF = POS_INF + UNSUPPORTED_OPS = ( + ops.ApproxMedian, + ops.ArgMax, + ops.ArgMin, + ops.ArrayCollect, + ops.Array, + ops.ArrayFlatten, + ops.ArrayMap, + ops.Covariance, + ops.First, + ops.Last, + ops.Levenshtein, + ops.Median, + ops.Mode, + ops.MultiQuantile, + ops.Quantile, + ops.RegexReplace, + ops.RegexSplit, + ops.RowID, + ops.StringSplit, + ops.StructColumn, + ops.TimestampBucket, + ops.TimestampDelta, + ops.Translate, + ops.Unnest, + ) + + SIMPLE_OPS = { + ops.BitAnd: "bit_and", + ops.BitOr: "bit_or", + ops.BitXor: "bit_xor", + ops.DayOfWeekName: "dayname", + ops.Log10: "log10", + ops.StringContains: "instr", + ops.ExtractWeekOfYear: "weekofyear", + ops.ExtractEpochSeconds: "unix_timestamp", + ops.ExtractDayOfYear: "dayofyear", + ops.Strftime: "date_format", + ops.StringToTimestamp: "str_to_date", + ops.Log2: "log2", + } + + @staticmethod + def _minimize_spec(start, end, spec): + if ( + start is None + and isinstance(getattr(end, "value", None), ops.Literal) + and end.value.value == 0 + and end.following + ): + return None + return spec + + def visit_Cast(self, op, *, arg, to): + 
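        # only a handful of casts need special treatment here; everything else
        # falls through to the generic SQLGlot cast at the end of the method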
from_ = op.arg.dtype + if (from_.is_json() or from_.is_string()) and to.is_json(): + # MariaDB does not support casting to JSON because it's an alias + # for TEXT (except when casting of course!) + return arg + elif from_.is_integer() and to.is_interval(): + return self.visit_IntervalFromInteger( + ops.IntervalFromInteger(op.arg, unit=to.unit), arg=arg, unit=to.unit + ) + elif from_.is_integer() and to.is_timestamp(): + return self.f.from_unixtime(arg) + return super().visit_Cast(op, arg=arg, to=to) + + def visit_TimestampDiff(self, op, *, left, right): + return self.f.timestampdiff( + sge.Var(this="SECOND"), right, left, dialect=self.dialect + ) + + def visit_DateDiff(self, op, *, left, right): + return self.f.timestampdiff( + sge.Var(this="DAY"), right, left, dialect=self.dialect + ) + + def visit_ApproxCountDistinct(self, op, *, arg, where): + if where is not None: + arg = self.if_(where, arg) + return self.f.count(sge.Distinct(expressions=[arg])) + + def visit_CountStar(self, op, *, arg, where): + if where is not None: + return self.f.sum(self.cast(where, op.dtype)) + return self.f.count(STAR) + + def visit_CountDistinct(self, op, *, arg, where): + if where is not None: + arg = self.if_(where, arg) + return self.f.count(sge.Distinct(expressions=[arg])) + + def visit_CountDistinctStar(self, op, *, arg, where): + if where is not None: + raise com.UnsupportedOperationError( + "Filtered table count distinct is not supported in MySQL" + ) + func = partial(sg.column, table=arg.alias_or_name, quoted=self.quoted) + return self.f.count( + sge.Distinct(expressions=list(map(func, op.arg.schema.keys()))) + ) + + def visit_GroupConcat(self, op, *, arg, sep, where): + if not isinstance(op.sep, ops.Literal): + raise com.UnsupportedOperationError( + "Only string literal separators are supported" + ) + if where is not None: + arg = self.if_(where, arg) + return self.f.group_concat(arg, sep) + + def visit_DayOfWeekIndex(self, op, *, arg): + return (self.f.dayofweek(arg) + 5) % 7 + + def visit_Literal(self, op, *, value, dtype): + # avoid casting NULL: the set of types allowed by MySQL and + # MariaDB when casting is a strict subset of allowed types in other + # contexts like CREATE TABLE + if value is None: + return NULL + return super().visit_Literal(op, value=value, dtype=dtype) + + def visit_NonNullLiteral(self, op, *, value, dtype): + if dtype.is_decimal() and not value.is_finite(): + raise com.UnsupportedOperationError( + "MySQL does not support NaN or infinity" + ) + elif dtype.is_binary(): + return self.f.unhex(value.hex()) + elif dtype.is_date(): + return self.f.date(value.isoformat()) + elif dtype.is_timestamp(): + return self.f.timestamp(value.isoformat()) + elif dtype.is_time(): + return self.f.maketime( + value.hour, value.minute, value.second + value.microsecond / 1e6 + ) + elif dtype.is_array() or dtype.is_struct() or dtype.is_map(): + raise com.UnsupportedBackendType( + "MySQL does not support arrays, structs or maps" + ) + return None + + def visit_JSONGetItem(self, op, *, arg, index): + if op.index.dtype.is_integer(): + path = self.f.concat("$[", self.cast(index, dt.string), "]") + else: + path = self.f.concat("$.", index) + return self.f.json_extract(arg, path) + + def visit_DateFromYMD(self, op, *, year, month, day): + return self.f.str_to_date( + self.f.concat( + self.f.lpad(year, 4, "0"), + self.f.lpad(month, 2, "0"), + self.f.lpad(day, 2, "0"), + ), + "%Y%m%d", + ) + + def visit_FindInSet(self, op, *, needle, values): + return self.f.find_in_set(needle, self.f.concat_ws(",", 
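            # FIND_IN_SET matches against a single comma-separated string, so
            # the candidate values are joined with CONCAT_WS(',') first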
values)) + + def visit_EndsWith(self, op, *, arg, end): + to = sge.DataType(this=sge.DataType.Type.BINARY) + return self.f.right(arg, self.f.char_length(end)).eq(sge.Cast(this=end, to=to)) + + def visit_StartsWith(self, op, *, arg, start): + to = sge.DataType(this=sge.DataType.Type.BINARY) + return self.f.left(arg, self.f.length(start)).eq(sge.Cast(this=start, to=to)) + + def visit_RegexSearch(self, op, *, arg, pattern): + return arg.rlike(pattern) + + def visit_RegexExtract(self, op, *, arg, pattern, index): + extracted = self.f.regexp_substr(arg, pattern) + return self.if_( + arg.rlike(pattern), + self.if_( + index.eq(0), + extracted, + self.f.regexp_replace( + extracted, pattern, f"\\{index.sql(self.dialect)}" + ), + ), + NULL, + ) + + def visit_Equals(self, op, *, left, right): + if op.left.dtype.is_string(): + assert op.right.dtype.is_string(), op.right.dtype + to = sge.DataType(this=sge.DataType.Type.BINARY) + return sge.Cast(this=left, to=to).eq(right) + return super().visit_Equals(op, left=left, right=right) + + def visit_StringContains(self, op, *, haystack, needle): + return self.f.instr(haystack, needle) > 0 + + def visit_StringFind(self, op, *, arg, substr, start, end): + if end is not None: + raise NotImplementedError( + "`end` argument is not implemented for MySQL `StringValue.find`" + ) + substr = sge.Cast(this=substr, to=sge.DataType(this=sge.DataType.Type.BINARY)) + + if start is not None: + return self.f.locate(substr, arg, start + 1) + return self.f.locate(substr, arg) + + def visit_LRStrip(self, op, *, arg, position): + return reduce( + lambda arg, char: self.f.trim( + this=arg, position=self.v[position], expression=char + ), + map( + partial(self.cast, to=dt.string), + map(self.f.unhex, map(self.f.hex, string.whitespace.encode())), + ), + arg, + ) + + def visit_DateTimestampTruncate(self, op, *, arg, unit): + truncate_formats = { + "s": "%Y-%m-%d %H:%i:%s", + "m": "%Y-%m-%d %H:%i:00", + "h": "%Y-%m-%d %H:00:00", + "D": "%Y-%m-%d", + # 'W': 'week', + "M": "%Y-%m-01", + "Y": "%Y-01-01", + } + if (format := truncate_formats.get(unit.short)) is None: + raise com.UnsupportedOperationError(f"Unsupported truncate unit {op.unit}") + return self.f.date_format(arg, format) + + visit_DateTruncate = visit_TimestampTruncate = visit_DateTimestampTruncate + + def visit_DateTimeDelta(self, op, *, left, right, part): + return self.f.timestampdiff( + sge.Var(this=part.this), right, left, dialect=self.dialect + ) + + visit_TimeDelta = visit_DateDelta = visit_DateTimeDelta + + def visit_ExtractMillisecond(self, op, *, arg): + return self.f.floor(self.f.extract(sge.Var(this="microsecond"), arg) / 1_000) + + def visit_ExtractMicrosecond(self, op, *, arg): + return self.f.floor(self.f.extract(sge.Var(this="microsecond"), arg)) + + def visit_Strip(self, op, *, arg): + return self.visit_LRStrip(op, arg=arg, position="BOTH") + + def visit_LStrip(self, op, *, arg): + return self.visit_LRStrip(op, arg=arg, position="LEADING") + + def visit_RStrip(self, op, *, arg): + return self.visit_LRStrip(op, arg=arg, position="TRAILING") + + def visit_IntervalFromInteger(self, op, *, arg, unit): + return sge.Interval(this=arg, unit=sge.Var(this=op.resolution.upper())) + + def visit_TimestampAdd(self, op, *, left, right): + if op.right.dtype.unit.short == "ms": + right = sge.Interval( + this=right.this * 1_000, unit=sge.Var(this="MICROSECOND") + ) + return self.f.date_add(left, right, dialect=self.dialect) + + def visit_UnwrapJSONString(self, op, *, arg): + return self.if_( + 
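            # unquote only when JSON_TYPE reports a string value; otherwise NULL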
self.f.json_type(arg).eq(sge.convert("STRING")), + self.f.json_unquote(arg), + NULL, + ) + + def visit_UnwrapJSONInt64(self, op, *, arg): + return self.if_( + self.f.json_type(arg).eq(sge.convert("INTEGER")), + self.cast(arg, op.dtype), + NULL, + ) + + def visit_UnwrapJSONFloat64(self, op, *, arg): + return self.if_( + self.f.json_type(arg).isin(sge.convert("DOUBLE"), sge.convert("INTEGER")), + self.cast(arg, op.dtype), + NULL, + ) + + def visit_UnwrapJSONBoolean(self, op, *, arg): + return self.if_( + self.f.json_type(arg).eq(sge.convert("BOOLEAN")), + self.if_(arg.eq(sge.convert("true")), 1, 0), + NULL, + ) diff --git a/ibis/backends/e6data/converter.py b/ibis/backends/e6data/converter.py new file mode 100644 index 000000000000..4f2010225a5b --- /dev/null +++ b/ibis/backends/e6data/converter.py @@ -0,0 +1,27 @@ +from __future__ import annotations + +import datetime + +from ibis.formats.pandas import PandasData + + +class MySQLPandasData(PandasData): + # TODO(kszucs): this could be reused at other backends, like pyspark + @classmethod + def convert_Time(cls, s, dtype, pandas_type): + def convert(timedelta): + comps = timedelta.components + return datetime.time( + hour=comps.hours, + minute=comps.minutes, + second=comps.seconds, + microsecond=comps.milliseconds * 1000 + comps.microseconds, + ) + + return s.map(convert, na_action="ignore") + + @classmethod + def convert_Timestamp(cls, s, dtype, pandas_type): + if s.dtype == "object": + s = s.replace("0000-00-00 00:00:00", None) + return super().convert_Timestamp(s, dtype, pandas_type) diff --git a/ibis/backends/e6data/tests/__init__.py b/ibis/backends/e6data/tests/__init__.py new file mode 100644 index 000000000000..e69de29bb2d1 diff --git a/ibis/backends/e6data/tests/conftest.py b/ibis/backends/e6data/tests/conftest.py new file mode 100644 index 000000000000..f7c463048767 --- /dev/null +++ b/ibis/backends/e6data/tests/conftest.py @@ -0,0 +1,79 @@ +from __future__ import annotations + +import os +from typing import TYPE_CHECKING, Any + +import pytest + +import ibis +from ibis.backends.conftest import TEST_TABLES +from ibis.backends.tests.base import ServiceBackendTest + +if TYPE_CHECKING: + from collections.abc import Iterable + from pathlib import Path + +MYSQL_USER = os.environ.get("IBIS_TEST_MYSQL_USER", "ibis") +MYSQL_PASS = os.environ.get("IBIS_TEST_MYSQL_PASSWORD", "ibis") +MYSQL_HOST = os.environ.get("IBIS_TEST_MYSQL_HOST", "localhost") +MYSQL_PORT = int(os.environ.get("IBIS_TEST_MYSQL_PORT", 3306)) +IBIS_TEST_MYSQL_DB = os.environ.get("IBIS_TEST_MYSQL_DATABASE", "ibis_testing") + + +class TestConf(ServiceBackendTest): + # mysql has the same rounding behavior as postgres + check_dtype = False + returned_timestamp_unit = "s" + supports_arrays = False + native_bool = False + supports_structs = False + rounding_method = "half_to_even" + service_name = "mysql" + deps = ("pymysql",) + + @property + def test_files(self) -> Iterable[Path]: + return self.data_dir.joinpath("csv").glob("*.csv") + + def _load_data(self, **kwargs: Any) -> None: + """Load test data into a MySql backend instance. 
+ + Parameters + ---------- + data_dir + Location of testdata + script_dir + Location of scripts defining schemas + """ + super()._load_data(**kwargs) + + with self.connection.begin() as cur: + for table in TEST_TABLES: + csv_path = self.data_dir / "csv" / f"{table}.csv" + lines = [ + f"LOAD DATA LOCAL INFILE {str(csv_path)!r}", + f"INTO TABLE {table}", + "COLUMNS TERMINATED BY ','", + """OPTIONALLY ENCLOSED BY '"'""", + "LINES TERMINATED BY '\\n'", + "IGNORE 1 LINES", + ] + cur.execute("\n".join(lines)) + + @staticmethod + def connect(*, tmpdir, worker_id, **kw): + return ibis.mysql.connect( + host=MYSQL_HOST, + user=MYSQL_USER, + password=MYSQL_PASS, + database=IBIS_TEST_MYSQL_DB, + port=MYSQL_PORT, + local_infile=1, + autocommit=True, + **kw, + ) + + +@pytest.fixture(scope="session") +def con(tmp_path_factory, data_dir, worker_id): + return TestConf.load_data(data_dir, tmp_path_factory, worker_id).connection diff --git a/ibis/backends/e6data/tests/test_client.py b/ibis/backends/e6data/tests/test_client.py new file mode 100644 index 000000000000..ee7031d6d85b --- /dev/null +++ b/ibis/backends/e6data/tests/test_client.py @@ -0,0 +1,211 @@ +from __future__ import annotations + +from datetime import date +from operator import methodcaller + +import pandas as pd +import pandas.testing as tm +import pytest +import sqlglot as sg +from pytest import param + +import ibis +import ibis.expr.datatypes as dt +from ibis import udf +from ibis.util import gen_name + +MYSQL_TYPES = [ + param("tinyint", dt.int8, id="tinyint"), + param("int1", dt.int8, id="int1"), + param("boolean", dt.int8, id="boolean"), + param("smallint", dt.int16, id="smallint"), + param("int2", dt.int16, id="int2"), + # ("mediumint", dt.int32), => https://github.com/tobymao/sqlglot/issues/2109 + # ("int3", dt.int32), => https://github.com/tobymao/sqlglot/issues/2109 + param("int", dt.int32, id="int"), + param("int4", dt.int32, id="int4"), + param("integer", dt.int32, id="integer"), + param("bigint", dt.int64, id="bigint"), + param("decimal", dt.Decimal(10, 0), id="decimal"), + param("decimal(5, 2)", dt.Decimal(5, 2), id="decimal_5_2"), + param("dec", dt.Decimal(10, 0), id="dec"), + param("numeric", dt.Decimal(10, 0), id="numeric"), + param("fixed", dt.Decimal(10, 0), id="fixed"), + param("float", dt.float32, id="float"), + param("double", dt.float64, id="double"), + param("timestamp", dt.Timestamp("UTC"), id="timestamp"), + param("date", dt.date, id="date"), + param("time", dt.time, id="time"), + param("datetime", dt.timestamp, id="datetime"), + param("year", dt.int8, id="year"), + param("char(32)", dt.string, id="char"), + param("char byte", dt.binary, id="char_byte"), + param("varchar(42)", dt.string, id="varchar"), + param("mediumtext", dt.string, id="mediumtext"), + param("text", dt.string, id="text"), + param("binary(42)", dt.binary, id="binary"), + param("varbinary(42)", dt.binary, id="varbinary"), + param("bit(1)", dt.int8, id="bit_1"), + param("bit(9)", dt.int16, id="bit_9"), + param("bit(17)", dt.int32, id="bit_17"), + param("bit(33)", dt.int64, id="bit_33"), + # mariadb doesn't have a distinct json type + param("json", dt.string, id="json"), + param("enum('small', 'medium', 'large')", dt.string, id="enum"), + param("inet6", dt.inet, id="inet"), + param("set('a', 'b', 'c', 'd')", dt.Array(dt.string), id="set"), + param("mediumblob", dt.binary, id="mediumblob"), + param("blob", dt.binary, id="blob"), + param("uuid", dt.uuid, id="uuid"), +] + [ + param( + f"datetime({scale:d})", + dt.Timestamp(scale=scale or None), + 
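        # a 0 scale normalizes to None (no fractional-seconds component)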
id=f"datetime{scale:d}", + ) + for scale in range(7) +] + + +@pytest.mark.parametrize(("mysql_type", "expected_type"), MYSQL_TYPES) +def test_get_schema_from_query(con, mysql_type, expected_type): + raw_name = ibis.util.guid() + name = sg.to_identifier(raw_name, quoted=True).sql("mysql") + expected_schema = ibis.schema(dict(x=expected_type)) + + # temporary tables get cleaned up by the db when the session ends, so we + # don't need to explicitly drop the table + with con.begin() as c: + c.execute(f"CREATE TEMPORARY TABLE {name} (x {mysql_type})") + + result_schema = con._get_schema_using_query(f"SELECT * FROM {name}") + assert result_schema == expected_schema + + t = con.table(raw_name) + assert t.schema() == expected_schema + + +@pytest.mark.parametrize("coltype", ["TINYBLOB", "MEDIUMBLOB", "BLOB", "LONGBLOB"]) +def test_blob_type(con, coltype): + tmp = f"tmp_{ibis.util.guid()}" + with con.begin() as c: + c.execute(f"CREATE TEMPORARY TABLE {tmp} (a {coltype})") + t = con.table(tmp) + assert t.schema() == ibis.schema({"a": dt.binary}) + + +@pytest.fixture(scope="session") +def tmp_t(con): + with con.begin() as c: + c.execute("CREATE TABLE IF NOT EXISTS test_schema.t (x INET6)") + yield "t" + with con.begin() as c: + c.execute("DROP TABLE IF EXISTS test_schema.t") + + +def test_get_schema_from_query_other_schema(con, tmp_t): + t = con.table(tmp_t, database="test_schema") + assert t.schema() == ibis.schema({"x": dt.inet}) + + +def test_zero_timestamp_data(con): + sql = """ + CREATE TEMPORARY TABLE ztmp_date_issue + ( + name CHAR(10) NULL, + tradedate DATETIME NOT NULL, + date DATETIME NULL + ) + """ + with con.begin() as c: + c.execute(sql) + c.execute( + """ + INSERT INTO ztmp_date_issue VALUES + ('C', '2018-10-22', 0), + ('B', '2017-06-07', 0), + ('C', '2022-12-21', 0) + """ + ) + t = con.table("ztmp_date_issue") + result = t.execute() + expected = pd.DataFrame( + { + "name": ["C", "B", "C"], + "tradedate": pd.to_datetime( + [date(2018, 10, 22), date(2017, 6, 7), date(2022, 12, 21)] + ), + "date": [pd.NaT, pd.NaT, pd.NaT], + } + ) + tm.assert_frame_equal(result, expected) + + +@pytest.fixture(scope="module") +def enum_t(con): + name = gen_name("mysql_enum_test") + with con.begin() as cur: + cur.execute( + f"CREATE TEMPORARY TABLE {name} (sml ENUM('small', 'medium', 'large'))" + ) + cur.execute(f"INSERT INTO {name} VALUES ('small')") + + yield con.table(name) + con.drop_table(name, force=True) + + +@pytest.mark.parametrize( + ("expr_fn", "expected"), + [ + (methodcaller("startswith", "s"), pd.Series([True], name="sml")), + (methodcaller("endswith", "m"), pd.Series([False], name="sml")), + (methodcaller("re_search", "mall"), pd.Series([True], name="sml")), + (methodcaller("lstrip"), pd.Series(["small"], name="sml")), + (methodcaller("rstrip"), pd.Series(["small"], name="sml")), + (methodcaller("strip"), pd.Series(["small"], name="sml")), + ], + ids=["startswith", "endswith", "re_search", "lstrip", "rstrip", "strip"], +) +def test_enum_as_string(enum_t, expr_fn, expected): + expr = expr_fn(enum_t.sml).name("sml") + res = expr.execute() + tm.assert_series_equal(res, expected) + + +def test_builtin_scalar_udf(con): + @udf.scalar.builtin + def soundex(a: str) -> str: + """Soundex of a string.""" + + expr = soundex("foo") + result = con.execute(expr) + assert result == "F000" + + +def test_builtin_agg_udf(con): + @udf.agg.builtin + def json_arrayagg(a) -> str: + """Glom together some JSON.""" + + ft = con.tables.functional_alltypes[:5] + expr = json_arrayagg(ft.string_col) + result = 
expr.execute() + expected = '["0","1","2","3","4"]' + assert result == expected + + +def test_list_tables_schema_warning_refactor(con): + mysql_tables = { + "column_stats", + "columns_priv", + "db", + "event", + "func", + } + assert con.list_tables() + + with pytest.warns(FutureWarning): + assert mysql_tables.issubset(con.list_tables(schema="mysql")) + + assert mysql_tables.issubset(con.list_tables(database="mysql")) + assert mysql_tables.issubset(con.list_tables(database=("mysql",))) From 13a196c63387600efdfcd848ae901dd32ab1b069 Mon Sep 17 00:00:00 2001 From: Karthic Rao Date: Fri, 14 Jun 2024 03:52:17 -0700 Subject: [PATCH 02/18] Add e6data_python_connector import and update do_connect method --- ibis/backends/e6data/__init__.py | 125 ++++++------------------------- 1 file changed, 23 insertions(+), 102 deletions(-) diff --git a/ibis/backends/e6data/__init__.py b/ibis/backends/e6data/__init__.py index 98ceeb4f5e66..b8010fb2adbe 100644 --- a/ibis/backends/e6data/__init__.py +++ b/ibis/backends/e6data/__init__.py @@ -26,6 +26,7 @@ from ibis.backends.e6data.compiler import E6DataCompiler from ibis.backends.sql import SQLBackend from ibis.backends.sql.compiler import TRUE, C +from e6data_python_connector import Connection if TYPE_CHECKING: from collections.abc import Mapping @@ -34,64 +35,11 @@ import polars as pl import pyarrow as pa - class Backend(SQLBackend, CanCreateDatabase): name = "e6data" compiler = E6DataCompiler() supports_create_or_replace = False - def _from_url(self, url: str, **kwargs): - """Connect to a backend using a URL `url`. - - Parameters - ---------- - url - URL with which to connect to a backend. - kwargs - Additional keyword arguments - - Returns - ------- - BaseBackend - A backend instance - - """ - - url = urlparse(url) - database, *_ = url.path[1:].split("/", 1) - query_params = parse_qs(url.query) - connect_args = { - "user": url.username, - "password": url.password or "", - "host": url.hostname, - "database": database or "", - } - - for name, value in query_params.items(): - if len(value) > 1: - connect_args[name] = value - elif len(value) == 1: - connect_args[name] = value[0] - else: - raise com.IbisError(f"Invalid URL parameter: {name}") - - kwargs.update(connect_args) - self._convert_kwargs(kwargs) - - if "user" in kwargs and not kwargs["user"]: - del kwargs["user"] - - if "host" in kwargs and not kwargs["host"]: - del kwargs["host"] - - if "database" in kwargs and not kwargs["database"]: - del kwargs["database"] - - if "password" in kwargs and kwargs["password"] is None: - del kwargs["password"] - - return self.connect(**kwargs) - @cached_property def version(self): matched = re.search(r"(\d+)\.(\d+)\.(\d+)", self.con.server_version) @@ -99,32 +47,31 @@ def version(self): def do_connect( self, - host: str = "localhost", - user: str | None = None, - password: str | None = None, - port: int = 3306, - database: str | None = None, - autocommit: bool = True, + host: str, + port: int, + username: str, + password: str, + database: str, + catalog_name: str, **kwargs, ) -> None: """Create an Ibis client using the passed connection parameters. - Parameters - ---------- - host + host Hostname - user + port + Port + username Username - password + password Password - port - Port - database + database Database to connect to - autocommit - Autocommit mode - kwargs - Additional keyword arguments passed to `pymysql.connect` + catalog_name + Catalog name + kwargs + Additional keyword arguments + Examples -------- @@ -139,42 +86,16 @@ def do_connect( [...] 
>>> t = con.table("functional_alltypes") >>> t - MySQLTable[table] - name: functional_alltypes - schema: - id : int32 - bool_col : int8 - tinyint_col : int8 - smallint_col : int16 - int_col : int32 - bigint_col : int64 - float_col : float32 - double_col : float64 - date_string_col : string - string_col : string - timestamp_col : timestamp - year : int32 - month : int32 - + """ - con = pymysql.connect( - user=user, + self._connection = Connection( host=host, port=port, + username=username, password=password, - database=database, - autocommit=autocommit, - conv=pymysql.converters.conversions, - **kwargs, + database=database ) - - with contextlib.closing(con.cursor()) as cur: - try: - cur.execute("SET @@session.time_zone = 'UTC'") - except Exception as e: # noqa: BLE001 - warnings.warn(f"Unable to set session timezone to UTC: {e}") - - self.con = con + self.catalog_name = catalog_name @property def current_database(self) -> str: From 2f142b03da9ca16a5a5c1cf4866aac84b0cd094c Mon Sep 17 00:00:00 2001 From: Karthic Rao Date: Fri, 14 Jun 2024 03:52:41 -0700 Subject: [PATCH 03/18] Add e6data-python-connector dependency --- poetry.lock | 150 +++++++++++++++++++++++++++++++++++++++++++++++-- pyproject.toml | 3 + 2 files changed, 149 insertions(+), 4 deletions(-) diff --git a/poetry.lock b/poetry.lock index 6870472d87bc..247711dd2dbe 100644 --- a/poetry.lock +++ b/poetry.lock @@ -1681,6 +1681,27 @@ files = [ [package.dependencies] packaging = ">=20.9" +[[package]] +name = "e6data-python-connector" +version = "2.2.0" +description = "Client for the e6data distributed SQL Engine." +optional = false +python-versions = "*" +files = [ + {file = "e6data_python_connector-2.2.0-py3-none-any.whl", hash = "sha256:6c03161bac80b751b2f8f5cc58d3c1cadfe4d9c780c69df5d694271ac59d7cbf"}, + {file = "e6data_python_connector-2.2.0.tar.gz", hash = "sha256:93a59315c9576339a9645dcdce3d42021b8dd1c702b1f21167b0cb4307474f3c"}, +] + +[package.dependencies] +future = "*" +grpcio = "*" +grpcio-tools = "*" +pycryptodome = "*" +python-dateutil = "*" +pytz = "*" +sqlalchemy = ">=1.0.0" +thrift = "*" + [[package]] name = "exceptiongroup" version = "1.2.1" @@ -2022,6 +2043,17 @@ test-downstream = ["aiobotocore (>=2.5.4,<3.0.0)", "dask-expr", "dask[dataframe, test-full = ["adlfs", "aiohttp (!=4.0.0a0,!=4.0.0a1)", "cloudpickle", "dask", "distributed", "dropbox", "dropboxdrivefs", "fastparquet", "fusepy", "gcsfs", "jinja2", "kerchunk", "libarchive-c", "lz4", "notebook", "numpy", "ocifs", "pandas", "panel", "paramiko", "pyarrow", "pyarrow (>=1)", "pyftpdlib", "pygit2", "pytest", "pytest-asyncio (!=0.22.0)", "pytest-benchmark", "pytest-cov", "pytest-mock", "pytest-recording", "pytest-rerunfailures", "python-snappy", "requests", "smbprotocol", "tqdm", "urllib3", "zarr", "zstandard"] tqdm = ["tqdm"] +[[package]] +name = "future" +version = "1.0.0" +description = "Clean single-source support for Python 3 and 2" +optional = false +python-versions = ">=2.6, !=3.0.*, !=3.1.*, !=3.2.*" +files = [ + {file = "future-1.0.0-py3-none-any.whl", hash = "sha256:929292d34f5872e70396626ef385ec22355a1fae8ad29e1a734c3e43f9fbc216"}, + {file = "future-1.0.0.tar.gz", hash = "sha256:bd2968309307861edae1458a4f8a4f3598c03be43b97521076aebf5d94c07b05"}, +] + [[package]] name = "gcsfs" version = "2024.6.0" @@ -2484,7 +2516,7 @@ colorama = ">=0.4" name = "grpcio" version = "1.64.1" description = "HTTP/2-based RPC framework" -optional = true +optional = false python-versions = ">=3.8" files = [ {file = "grpcio-1.64.1-cp310-cp310-linux_armv7l.whl", hash = 
"sha256:55697ecec192bc3f2f3cc13a295ab670f51de29884ca9ae6cd6247df55df2502"}, @@ -2554,6 +2586,74 @@ googleapis-common-protos = ">=1.5.5" grpcio = ">=1.62.2" protobuf = ">=4.21.6" +[[package]] +name = "grpcio-tools" +version = "1.62.2" +description = "Protobuf code generator for gRPC" +optional = false +python-versions = ">=3.7" +files = [ + {file = "grpcio-tools-1.62.2.tar.gz", hash = "sha256:5fd5e1582b678e6b941ee5f5809340be5e0724691df5299aae8226640f94e18f"}, + {file = "grpcio_tools-1.62.2-cp310-cp310-linux_armv7l.whl", hash = "sha256:1679b4903aed2dc5bd8cb22a452225b05dc8470a076f14fd703581efc0740cdb"}, + {file = "grpcio_tools-1.62.2-cp310-cp310-macosx_12_0_universal2.whl", hash = "sha256:9d41e0e47dd075c075bb8f103422968a65dd0d8dc8613288f573ae91eb1053ba"}, + {file = "grpcio_tools-1.62.2-cp310-cp310-manylinux_2_17_aarch64.whl", hash = "sha256:987e774f74296842bbffd55ea8826370f70c499e5b5f71a8cf3103838b6ee9c3"}, + {file = "grpcio_tools-1.62.2-cp310-cp310-manylinux_2_17_i686.manylinux2014_i686.whl", hash = "sha256:40cd4eeea4b25bcb6903b82930d579027d034ba944393c4751cdefd9c49e6989"}, + {file = "grpcio_tools-1.62.2-cp310-cp310-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:b6746bc823958499a3cf8963cc1de00072962fb5e629f26d658882d3f4c35095"}, + {file = "grpcio_tools-1.62.2-cp310-cp310-musllinux_1_1_i686.whl", hash = "sha256:2ed775e844566ce9ce089be9a81a8b928623b8ee5820f5e4d58c1a9d33dfc5ae"}, + {file = "grpcio_tools-1.62.2-cp310-cp310-musllinux_1_1_x86_64.whl", hash = "sha256:bdc5dd3f57b5368d5d661d5d3703bcaa38bceca59d25955dff66244dbc987271"}, + {file = "grpcio_tools-1.62.2-cp310-cp310-win32.whl", hash = "sha256:3a8d6f07e64c0c7756f4e0c4781d9d5a2b9cc9cbd28f7032a6fb8d4f847d0445"}, + {file = "grpcio_tools-1.62.2-cp310-cp310-win_amd64.whl", hash = "sha256:e33b59fb3efdddeb97ded988a871710033e8638534c826567738d3edce528752"}, + {file = "grpcio_tools-1.62.2-cp311-cp311-linux_armv7l.whl", hash = "sha256:472505d030135d73afe4143b0873efe0dcb385bd6d847553b4f3afe07679af00"}, + {file = "grpcio_tools-1.62.2-cp311-cp311-macosx_10_10_universal2.whl", hash = "sha256:ec674b4440ef4311ac1245a709e87b36aca493ddc6850eebe0b278d1f2b6e7d1"}, + {file = "grpcio_tools-1.62.2-cp311-cp311-manylinux_2_17_aarch64.whl", hash = "sha256:184b4174d4bd82089d706e8223e46c42390a6ebac191073b9772abc77308f9fa"}, + {file = "grpcio_tools-1.62.2-cp311-cp311-manylinux_2_17_i686.manylinux2014_i686.whl", hash = "sha256:c195d74fe98541178ece7a50dad2197d43991e0f77372b9a88da438be2486f12"}, + {file = "grpcio_tools-1.62.2-cp311-cp311-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:a34d97c62e61bfe9e6cff0410fe144ac8cca2fc979ad0be46b7edf026339d161"}, + {file = "grpcio_tools-1.62.2-cp311-cp311-musllinux_1_1_i686.whl", hash = "sha256:cbb8453ae83a1db2452b7fe0f4b78e4a8dd32be0f2b2b73591ae620d4d784d3d"}, + {file = "grpcio_tools-1.62.2-cp311-cp311-musllinux_1_1_x86_64.whl", hash = "sha256:4f989e5cebead3ae92c6abf6bf7b19949e1563a776aea896ac5933f143f0c45d"}, + {file = "grpcio_tools-1.62.2-cp311-cp311-win32.whl", hash = "sha256:c48fabe40b9170f4e3d7dd2c252e4f1ff395dc24e49ac15fc724b1b6f11724da"}, + {file = "grpcio_tools-1.62.2-cp311-cp311-win_amd64.whl", hash = "sha256:8c616d0ad872e3780693fce6a3ac8ef00fc0963e6d7815ce9dcfae68ba0fc287"}, + {file = "grpcio_tools-1.62.2-cp312-cp312-linux_armv7l.whl", hash = "sha256:10cc3321704ecd17c93cf68c99c35467a8a97ffaaed53207e9b2da6ae0308ee1"}, + {file = "grpcio_tools-1.62.2-cp312-cp312-macosx_10_10_universal2.whl", hash = "sha256:9be84ff6d47fd61462be7523b49d7ba01adf67ce4e1447eae37721ab32464dd8"}, + {file = 
"grpcio_tools-1.62.2-cp312-cp312-manylinux_2_17_aarch64.whl", hash = "sha256:d82f681c9a9d933a9d8068e8e382977768e7779ddb8870fa0cf918d8250d1532"}, + {file = "grpcio_tools-1.62.2-cp312-cp312-manylinux_2_17_i686.manylinux2014_i686.whl", hash = "sha256:04c607029ae3660fb1624ed273811ffe09d57d84287d37e63b5b802a35897329"}, + {file = "grpcio_tools-1.62.2-cp312-cp312-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:72b61332f1b439c14cbd3815174a8f1d35067a02047c32decd406b3a09bb9890"}, + {file = "grpcio_tools-1.62.2-cp312-cp312-musllinux_1_1_i686.whl", hash = "sha256:8214820990d01b52845f9fbcb92d2b7384a0c321b303e3ac614c219dc7d1d3af"}, + {file = "grpcio_tools-1.62.2-cp312-cp312-musllinux_1_1_x86_64.whl", hash = "sha256:462e0ab8dd7c7b70bfd6e3195eebc177549ede5cf3189814850c76f9a340d7ce"}, + {file = "grpcio_tools-1.62.2-cp312-cp312-win32.whl", hash = "sha256:fa107460c842e4c1a6266150881694fefd4f33baa544ea9489601810c2210ef8"}, + {file = "grpcio_tools-1.62.2-cp312-cp312-win_amd64.whl", hash = "sha256:759c60f24c33a181bbbc1232a6752f9b49fbb1583312a4917e2b389fea0fb0f2"}, + {file = "grpcio_tools-1.62.2-cp37-cp37m-linux_armv7l.whl", hash = "sha256:45db5da2bcfa88f2b86b57ef35daaae85c60bd6754a051d35d9449c959925b57"}, + {file = "grpcio_tools-1.62.2-cp37-cp37m-macosx_10_10_universal2.whl", hash = "sha256:ab84bae88597133f6ea7a2bdc57b2fda98a266fe8d8d4763652cbefd20e73ad7"}, + {file = "grpcio_tools-1.62.2-cp37-cp37m-manylinux_2_17_aarch64.whl", hash = "sha256:7a49bccae1c7d154b78e991885c3111c9ad8c8fa98e91233de425718f47c6139"}, + {file = "grpcio_tools-1.62.2-cp37-cp37m-manylinux_2_17_i686.manylinux2014_i686.whl", hash = "sha256:a7e439476b29d6dac363b321781a113794397afceeb97dad85349db5f1cb5e9a"}, + {file = "grpcio_tools-1.62.2-cp37-cp37m-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:7ea369c4d1567d1acdf69c8ea74144f4ccad9e545df7f9a4fc64c94fa7684ba3"}, + {file = "grpcio_tools-1.62.2-cp37-cp37m-musllinux_1_1_i686.whl", hash = "sha256:4f955702dc4b530696375251319d05223b729ed24e8673c2129f7a75d2caefbb"}, + {file = "grpcio_tools-1.62.2-cp37-cp37m-musllinux_1_1_x86_64.whl", hash = "sha256:3708a747aa4b6b505727282ca887041174e146ae030ebcadaf4c1d346858df62"}, + {file = "grpcio_tools-1.62.2-cp37-cp37m-win_amd64.whl", hash = "sha256:2ce149ea55eadb486a7fb75a20f63ef3ac065ee6a0240ed25f3549ce7954c653"}, + {file = "grpcio_tools-1.62.2-cp38-cp38-linux_armv7l.whl", hash = "sha256:58cbb24b3fa6ae35aa9c210fcea3a51aa5fef0cd25618eb4fd94f746d5a9b703"}, + {file = "grpcio_tools-1.62.2-cp38-cp38-macosx_10_10_universal2.whl", hash = "sha256:6413581e14a80e0b4532577766cf0586de4dd33766a31b3eb5374a746771c07d"}, + {file = "grpcio_tools-1.62.2-cp38-cp38-manylinux_2_17_aarch64.whl", hash = "sha256:47117c8a7e861382470d0e22d336e5a91fdc5f851d1db44fa784b9acea190d87"}, + {file = "grpcio_tools-1.62.2-cp38-cp38-manylinux_2_17_i686.manylinux2014_i686.whl", hash = "sha256:9f1ba79a253df9e553d20319c615fa2b429684580fa042dba618d7f6649ac7e4"}, + {file = "grpcio_tools-1.62.2-cp38-cp38-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:04a394cf5e51ba9be412eb9f6c482b6270bd81016e033e8eb7d21b8cc28fe8b5"}, + {file = "grpcio_tools-1.62.2-cp38-cp38-musllinux_1_1_i686.whl", hash = "sha256:3c53b221378b035ae2f1881cbc3aca42a6075a8e90e1a342c2f205eb1d1aa6a1"}, + {file = "grpcio_tools-1.62.2-cp38-cp38-musllinux_1_1_x86_64.whl", hash = "sha256:c384c838b34d1b67068e51b5bbe49caa6aa3633acd158f1ab16b5da8d226bc53"}, + {file = "grpcio_tools-1.62.2-cp38-cp38-win32.whl", hash = "sha256:19ea69e41c3565932aa28a202d1875ec56786aea46a2eab54a3b28e8a27f9517"}, + {file = 
"grpcio_tools-1.62.2-cp38-cp38-win_amd64.whl", hash = "sha256:1d768a5c07279a4c461ebf52d0cec1c6ca85c6291c71ec2703fe3c3e7e28e8c4"}, + {file = "grpcio_tools-1.62.2-cp39-cp39-linux_armv7l.whl", hash = "sha256:5b07b5874187e170edfbd7aa2ca3a54ebf3b2952487653e8c0b0d83601c33035"}, + {file = "grpcio_tools-1.62.2-cp39-cp39-macosx_10_10_universal2.whl", hash = "sha256:d58389fe8be206ddfb4fa703db1e24c956856fcb9a81da62b13577b3a8f7fda7"}, + {file = "grpcio_tools-1.62.2-cp39-cp39-manylinux_2_17_aarch64.whl", hash = "sha256:7d8b4e00c3d7237b92260fc18a561cd81f1da82e8be100db1b7d816250defc66"}, + {file = "grpcio_tools-1.62.2-cp39-cp39-manylinux_2_17_i686.manylinux2014_i686.whl", hash = "sha256:1fe08d2038f2b7c53259b5c49e0ad08c8e0ce2b548d8185993e7ef67e8592cca"}, + {file = "grpcio_tools-1.62.2-cp39-cp39-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:19216e1fb26dbe23d12a810517e1b3fbb8d4f98b1a3fbebeec9d93a79f092de4"}, + {file = "grpcio_tools-1.62.2-cp39-cp39-musllinux_1_1_i686.whl", hash = "sha256:b8574469ecc4ff41d6bb95f44e0297cdb0d95bade388552a9a444db9cd7485cd"}, + {file = "grpcio_tools-1.62.2-cp39-cp39-musllinux_1_1_x86_64.whl", hash = "sha256:4f6f32d39283ea834a493fccf0ebe9cfddee7577bdcc27736ad4be1732a36399"}, + {file = "grpcio_tools-1.62.2-cp39-cp39-win32.whl", hash = "sha256:76eb459bdf3fb666e01883270beee18f3f11ed44488486b61cd210b4e0e17cc1"}, + {file = "grpcio_tools-1.62.2-cp39-cp39-win_amd64.whl", hash = "sha256:217c2ee6a7ce519a55958b8622e21804f6fdb774db08c322f4c9536c35fdce7c"}, +] + +[package.dependencies] +grpcio = ">=1.62.2" +protobuf = ">=4.21.6,<5.0dev" +setuptools = "*" + [[package]] name = "h11" version = "0.14.0" @@ -5174,6 +5274,47 @@ files = [ {file = "pycparser-2.22.tar.gz", hash = "sha256:491c8be9c040f5390f5bf44a5b07752bd07f56edf992381b05c701439eec10f6"}, ] +[[package]] +name = "pycryptodome" +version = "3.20.0" +description = "Cryptographic library for Python" +optional = false +python-versions = ">=2.7, !=3.0.*, !=3.1.*, !=3.2.*, !=3.3.*, !=3.4.*" +files = [ + {file = "pycryptodome-3.20.0-cp27-cp27m-macosx_10_9_x86_64.whl", hash = "sha256:f0e6d631bae3f231d3634f91ae4da7a960f7ff87f2865b2d2b831af1dfb04e9a"}, + {file = "pycryptodome-3.20.0-cp27-cp27m-manylinux2010_i686.whl", hash = "sha256:baee115a9ba6c5d2709a1e88ffe62b73ecc044852a925dcb67713a288c4ec70f"}, + {file = "pycryptodome-3.20.0-cp27-cp27m-manylinux2010_x86_64.whl", hash = "sha256:417a276aaa9cb3be91f9014e9d18d10e840a7a9b9a9be64a42f553c5b50b4d1d"}, + {file = "pycryptodome-3.20.0-cp27-cp27m-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:2a1250b7ea809f752b68e3e6f3fd946b5939a52eaeea18c73bdab53e9ba3c2dd"}, + {file = "pycryptodome-3.20.0-cp27-cp27m-musllinux_1_1_aarch64.whl", hash = "sha256:d5954acfe9e00bc83ed9f5cb082ed22c592fbbef86dc48b907238be64ead5c33"}, + {file = "pycryptodome-3.20.0-cp27-cp27m-win32.whl", hash = "sha256:06d6de87c19f967f03b4cf9b34e538ef46e99a337e9a61a77dbe44b2cbcf0690"}, + {file = "pycryptodome-3.20.0-cp27-cp27m-win_amd64.whl", hash = "sha256:ec0bb1188c1d13426039af8ffcb4dbe3aad1d7680c35a62d8eaf2a529b5d3d4f"}, + {file = "pycryptodome-3.20.0-cp27-cp27mu-manylinux2010_i686.whl", hash = "sha256:5601c934c498cd267640b57569e73793cb9a83506f7c73a8ec57a516f5b0b091"}, + {file = "pycryptodome-3.20.0-cp27-cp27mu-manylinux2010_x86_64.whl", hash = "sha256:d29daa681517f4bc318cd8a23af87e1f2a7bad2fe361e8aa29c77d652a065de4"}, + {file = "pycryptodome-3.20.0-cp27-cp27mu-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:3427d9e5310af6680678f4cce149f54e0bb4af60101c7f2c16fdf878b39ccccc"}, + {file = 
"pycryptodome-3.20.0-cp27-cp27mu-musllinux_1_1_aarch64.whl", hash = "sha256:3cd3ef3aee1079ae44afaeee13393cf68b1058f70576b11439483e34f93cf818"}, + {file = "pycryptodome-3.20.0-cp35-abi3-macosx_10_9_universal2.whl", hash = "sha256:ac1c7c0624a862f2e53438a15c9259d1655325fc2ec4392e66dc46cdae24d044"}, + {file = "pycryptodome-3.20.0-cp35-abi3-macosx_10_9_x86_64.whl", hash = "sha256:76658f0d942051d12a9bd08ca1b6b34fd762a8ee4240984f7c06ddfb55eaf15a"}, + {file = "pycryptodome-3.20.0-cp35-abi3-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:f35d6cee81fa145333137009d9c8ba90951d7d77b67c79cbe5f03c7eb74d8fe2"}, + {file = "pycryptodome-3.20.0-cp35-abi3-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:76cb39afede7055127e35a444c1c041d2e8d2f1f9c121ecef573757ba4cd2c3c"}, + {file = "pycryptodome-3.20.0-cp35-abi3-manylinux_2_5_i686.manylinux1_i686.manylinux_2_17_i686.manylinux2014_i686.whl", hash = "sha256:49a4c4dc60b78ec41d2afa392491d788c2e06edf48580fbfb0dd0f828af49d25"}, + {file = "pycryptodome-3.20.0-cp35-abi3-musllinux_1_1_aarch64.whl", hash = "sha256:fb3b87461fa35afa19c971b0a2b7456a7b1db7b4eba9a8424666104925b78128"}, + {file = "pycryptodome-3.20.0-cp35-abi3-musllinux_1_1_i686.whl", hash = "sha256:acc2614e2e5346a4a4eab6e199203034924313626f9620b7b4b38e9ad74b7e0c"}, + {file = "pycryptodome-3.20.0-cp35-abi3-musllinux_1_1_x86_64.whl", hash = "sha256:210ba1b647837bfc42dd5a813cdecb5b86193ae11a3f5d972b9a0ae2c7e9e4b4"}, + {file = "pycryptodome-3.20.0-cp35-abi3-win32.whl", hash = "sha256:8d6b98d0d83d21fb757a182d52940d028564efe8147baa9ce0f38d057104ae72"}, + {file = "pycryptodome-3.20.0-cp35-abi3-win_amd64.whl", hash = "sha256:9b3ae153c89a480a0ec402e23db8d8d84a3833b65fa4b15b81b83be9d637aab9"}, + {file = "pycryptodome-3.20.0-pp27-pypy_73-manylinux2010_x86_64.whl", hash = "sha256:4401564ebf37dfde45d096974c7a159b52eeabd9969135f0426907db367a652a"}, + {file = "pycryptodome-3.20.0-pp27-pypy_73-win32.whl", hash = "sha256:ec1f93feb3bb93380ab0ebf8b859e8e5678c0f010d2d78367cf6bc30bfeb148e"}, + {file = "pycryptodome-3.20.0-pp310-pypy310_pp73-macosx_10_9_x86_64.whl", hash = "sha256:acae12b9ede49f38eb0ef76fdec2df2e94aad85ae46ec85be3648a57f0a7db04"}, + {file = "pycryptodome-3.20.0-pp310-pypy310_pp73-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:f47888542a0633baff535a04726948e876bf1ed880fddb7c10a736fa99146ab3"}, + {file = "pycryptodome-3.20.0-pp310-pypy310_pp73-manylinux_2_5_i686.manylinux1_i686.manylinux_2_17_i686.manylinux2014_i686.whl", hash = "sha256:6e0e4a987d38cfc2e71b4a1b591bae4891eeabe5fa0f56154f576e26287bfdea"}, + {file = "pycryptodome-3.20.0-pp310-pypy310_pp73-win_amd64.whl", hash = "sha256:c18b381553638414b38705f07d1ef0a7cf301bc78a5f9bc17a957eb19446834b"}, + {file = "pycryptodome-3.20.0-pp39-pypy39_pp73-macosx_10_9_x86_64.whl", hash = "sha256:a60fedd2b37b4cb11ccb5d0399efe26db9e0dd149016c1cc6c8161974ceac2d6"}, + {file = "pycryptodome-3.20.0-pp39-pypy39_pp73-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:405002eafad114a2f9a930f5db65feef7b53c4784495dd8758069b89baf68eab"}, + {file = "pycryptodome-3.20.0-pp39-pypy39_pp73-manylinux_2_5_i686.manylinux1_i686.manylinux_2_17_i686.manylinux2014_i686.whl", hash = "sha256:2ab6ab0cb755154ad14e507d1df72de9897e99fd2d4922851a276ccc14f4f1a5"}, + {file = "pycryptodome-3.20.0-pp39-pypy39_pp73-win_amd64.whl", hash = "sha256:acf6e43fa75aca2d33e93409f2dafe386fe051818ee79ee8a3e21de9caa2ac9e"}, + {file = "pycryptodome-3.20.0.tar.gz", hash = "sha256:09609209ed7de61c2b560cc5c8c4fbf892f8b15b1faf7e4cbffac97db1fffda7"}, +] + [[package]] 
name = "pydantic" version = "2.7.3" @@ -6784,7 +6925,7 @@ win32 = ["pywin32"] name = "setuptools" version = "70.0.0" description = "Easily download, build, install, upgrade, and uninstall Python packages" -optional = true +optional = false python-versions = ">=3.8" files = [ {file = "setuptools-70.0.0-py3-none-any.whl", hash = "sha256:54faa7f2e8d2d11bcd2c07bed282eef1046b5c080d1c32add737d7b5817b1ad4"}, @@ -7230,7 +7371,7 @@ files = [ name = "thrift" version = "0.16.0" description = "Python bindings for the Apache Thrift RPC system" -optional = true +optional = false python-versions = "*" files = [ {file = "thrift-0.16.0.tar.gz", hash = "sha256:2b5b6488fcded21f9d312aa23c9ff6a0195d0f6ae26ddbd5ad9e3e25dfc14408"}, @@ -8064,6 +8205,7 @@ decompiler = ["black"] deltalake = ["deltalake"] druid = ["pydruid"] duckdb = ["duckdb"] +e6data = ["e6data-python-connector", "pymysql"] examples = ["fsspec", "pins"] exasol = ["pyexasol"] flink = [] @@ -8085,4 +8227,4 @@ visualization = ["graphviz"] [metadata] lock-version = "2.0" python-versions = "^3.10" -content-hash = "3613d324a066d1a405963af828ebbcf31f2f9b34584c42e583aa177bbc39b2f2" +content-hash = "4fca2ecc3c23dfadbf5f4c07235c27fa72069b24c756832dc051090ae664e63c" diff --git a/pyproject.toml b/pyproject.toml index dbfbc56187df..5cda49034368 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -88,6 +88,7 @@ shapely = { version = ">=2,<3", optional = true } # issues with versions <3.0.2 snowflake-connector-python = { version = ">=3.0.2,<4,!=3.3.0b1", optional = true } trino = { version = ">=0.321,<1", optional = true } +e6data-python-connector = { version = "2.2.0" } [tool.poetry.group.dev.dependencies] codespell = { version = ">=2.2.6,<3", extras = [ @@ -154,6 +155,7 @@ dask = ["dask", "regex", "packaging"] datafusion = ["datafusion"] druid = ["pydruid"] duckdb = ["duckdb"] +e6data = ["e6data-python-connector","pymysql"] exasol = ["pyexasol"] flink = [] impala = ["impyla"] @@ -182,6 +184,7 @@ dask = "ibis.backends.dask" datafusion = "ibis.backends.datafusion" druid = "ibis.backends.druid" duckdb = "ibis.backends.duckdb" +e6data = "ibis.backends.e6data" exasol = "ibis.backends.exasol" flink = "ibis.backends.flink" impala = "ibis.backends.impala" From 876820b310094a4ff7be3b88bf09652a25ea166f Mon Sep 17 00:00:00 2001 From: Karthic Rao Date: Fri, 14 Jun 2024 04:25:59 -0700 Subject: [PATCH 04/18] Update Backend name and add _from_url method --- ibis/backends/e6data/__init__.py | 69 ++++++++++++++++++++++++++++++-- 1 file changed, 65 insertions(+), 4 deletions(-) diff --git a/ibis/backends/e6data/__init__.py b/ibis/backends/e6data/__init__.py index b8010fb2adbe..fa066bbfeeca 100644 --- a/ibis/backends/e6data/__init__.py +++ b/ibis/backends/e6data/__init__.py @@ -36,7 +36,7 @@ import pyarrow as pa class Backend(SQLBackend, CanCreateDatabase): - name = "e6data" + name = "mysql" compiler = E6DataCompiler() supports_create_or_replace = False @@ -44,7 +44,67 @@ class Backend(SQLBackend, CanCreateDatabase): def version(self): matched = re.search(r"(\d+)\.(\d+)\.(\d+)", self.con.server_version) return ".".join(matched.groups()) + def _from_url(self, url: str, **kwargs): + """Connect to a backend using a URL `url`. + Parameters + ---------- + url + URL with which to connect to a backend. 
+ kwargs + Additional keyword arguments + + Returns + ------- + BaseBackend + A backend instance + + """ + + url = urlparse(url) + database, *_ = url.path[1:].split("/", 1) + query_params = parse_qs(url.query) + connect_args = { + "user": url.username, + "password": url.password or "", + "host": url.hostname, + "database": database or "", + "catalog_name": "", + } + + for name, value in query_params.items(): + if len(value) > 1: + connect_args[name] = value + elif len(value) == 1: + connect_args[name] = value[0] + else: + raise com.IbisError(f"Invalid URL parameter: {name}") + + kwargs.update(connect_args) + self._convert_kwargs(kwargs) + + if "user" in kwargs and not kwargs["user"]: + del kwargs["user"] + + if "host" in kwargs and not kwargs["host"]: + del kwargs["host"] + + if "database" in kwargs and not kwargs["database"]: + del kwargs["database"] + + if "password" in kwargs and kwargs["password"] is None: + del kwargs["password"] + + if "catalog_name" in kwargs and not kwargs["catalog_name"]: + del kwargs["catalog_name"] + + return self.connect(**kwargs) + + @cached_property + def version(self): + matched = re.search(r"(\d+)\.(\d+)\.(\d+)", self.con.server_version) + return ".".join(matched.groups()) + def do_connect( self, host: str, @@ -88,14 +148,15 @@ def do_connect( >>> t """ - self._connection = Connection( + self.con = Connection( host=host, port=port, username=username, password=password, - database=database + database=database, + catalog=catalog_name, ) - self.catalog_name = catalog_name + @property def current_database(self) -> str: From ccec1e079be308c5905b2a3a645c1867083fb7e4 Mon Sep 17 00:00:00 2001 From: Karthic Rao Date: Fri, 14 Jun 2024 04:26:35 -0700 Subject: [PATCH 05/18] Remove unnecessary transaction rollback in Backend class for E6data since it doesn't support transactions. 
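e6data executes each statement independently, so the error path in
`raw_sql` only needs to release the cursor before re-raising. A minimal
sketch of the resulting pattern (written as a free function over a
DB-API-style connection purely for illustration; the real code is a
method on the Backend class):

    def raw_sql(con, query, **kwargs):
        cursor = con.cursor()
        try:
            cursor.execute(query, **kwargs)
        except Exception:
            # No rollback: e6data has no transactions, so the only
            # cleanup required is closing the cursor before re-raising.
            cursor.close()
            raise
        return cursor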
--- ibis/backends/e6data/__init__.py | 1 - 1 file changed, 1 deletion(-) diff --git a/ibis/backends/e6data/__init__.py b/ibis/backends/e6data/__init__.py index fa066bbfeeca..2a8382cda40e 100644 --- a/ibis/backends/e6data/__init__.py +++ b/ibis/backends/e6data/__init__.py @@ -244,7 +244,6 @@ def raw_sql(self, query: str | sg.Expression, **kwargs: Any) -> Any: try: cursor.execute(query, **kwargs) except Exception: - con.rollback() cursor.close() raise else: From 6b762f4ab2348995f09c7f251b860bfda2fab9a7 Mon Sep 17 00:00:00 2001 From: Karthic Rao Date: Mon, 24 Jun 2024 17:10:44 +0530 Subject: [PATCH 06/18] Refactor database listing and query execution --- ibis/backends/e6data/__init__.py | 11 +++++------ 1 file changed, 5 insertions(+), 6 deletions(-) diff --git a/ibis/backends/e6data/__init__.py b/ibis/backends/e6data/__init__.py index 2a8382cda40e..2366b4d8fe56 100644 --- a/ibis/backends/e6data/__init__.py +++ b/ibis/backends/e6data/__init__.py @@ -165,9 +165,9 @@ def current_database(self) -> str: return database def list_databases(self, like: str | None = None) -> list[str]: - # In MySQL, "database" and "schema" are synonymous - with self._safe_raw_sql("SHOW DATABASES") as cur: - databases = list(map(itemgetter(0), cur.fetchall())) + # In MySQL syntax, "database" and "schema" are synonymous + + databases = self.con.get_schema_names() return self._filter_with_like(databases, like) def _get_schema_using_query(self, query: str) -> sch.Schema: @@ -220,10 +220,7 @@ def begin(self): try: yield cur except Exception: - con.rollback() raise - else: - con.commit() finally: cur.close() @@ -242,6 +239,7 @@ def raw_sql(self, query: str | sg.Expression, **kwargs: Any) -> Any: cursor = con.cursor() try: + print("Query: ", query) cursor.execute(query, **kwargs) except Exception: cursor.close() @@ -319,6 +317,7 @@ def list_tables( .sql(self.name) ) + print("SQL List Tables: ", sql) with self._safe_raw_sql(sql) as cur: out = cur.fetchall() From 623c0064cd013478f097fb69e479c49988bac1c9 Mon Sep 17 00:00:00 2001 From: Karthic Rao Date: Thu, 4 Jul 2024 12:58:50 +0530 Subject: [PATCH 07/18] Adapt E6Data backend for Ibis integration - Add support for catalog, secure connection, auto-resume, and cluster UUID - Implement custom table() method to handle catalog and database hierarchy - Modify get_schema() to use E6Data-specific column information - Adjust execute() method for E6Data compatibility - Update _fetch_from_cursor() to handle E6Data result format --- ibis/backends/e6data/__init__.py | 91 ++++++++++++++++++++++++++------ 1 file changed, 76 insertions(+), 15 deletions(-) diff --git a/ibis/backends/e6data/__init__.py b/ibis/backends/e6data/__init__.py index 2366b4d8fe56..33c2dd267592 100644 --- a/ibis/backends/e6data/__init__.py +++ b/ibis/backends/e6data/__init__.py @@ -36,7 +36,7 @@ import pyarrow as pa class Backend(SQLBackend, CanCreateDatabase): - name = "mysql" + name = "e6data" compiler = E6DataCompiler() supports_create_or_replace = False @@ -69,7 +69,10 @@ def _from_url(self, url: str, **kwargs): "password": url.password or "", "host": url.hostname, "database": database or "", - "catalog_name": "", + "catalog_name": url.catalog or "", + "secure": url.secure == 'true', + "auto_resume": query_params.get('auto-resume', [False])[0] == 'true', + "cluster_uuid": query_params.get('cluster-uuid', [None])[0] or "", } for name, value in query_params.items(): @@ -182,23 +185,67 @@ def _get_schema_using_query(self, query: str) -> sch.Schema: finally: cur.execute(f"DROP TABLE {table}") + def table( + self, + 
name: str, + database: tuple[str, str] | str | None = None + ) -> ir.Table: + """Construct a table expression. + + Parameters + ---------- + name : str + Table name + database : tuple[str, str] | str | None, optional + Database name. If not provided, the current database is used. + For backends that support multi-level table hierarchies, you can + pass in a dotted string path like "catalog.database" or a tuple of + strings like ("catalog", "database"). + + Returns + ------- + ir.Table + Table expression + """ + catalog = None + db = None + + print("inside e6 table") + if isinstance(database, tuple): + print("inside tuple") + catalog, db = database + elif isinstance(database, str): + if '.' in database: + catalog, db = database.split('.', 1) + else: + db = database + + print("Name: %s, Catalog: %s, Database: %s", name, catalog, db) + table_schema = self.get_schema(name, catalog=catalog, database=db) + + print('ops namespace++++++++++++++') + print(ops.Namespace(catalog=catalog, database=db)) + return ops.DatabaseTable( + name, + schema=table_schema, + source=self, + namespace=ops.Namespace(catalog=catalog, database=db), + ).to_expr() + def get_schema( self, name: str, *, catalog: str | None = None, database: str | None = None ) -> sch.Schema: - table = sg.table(name, db=database, catalog=catalog, quoted=True).sql(self.name) - - with self.begin() as cur: - cur.execute(f"DESCRIBE {table}") - result = cur.fetchall() - + # print db, catalog, table + print("Table: %s , Catalog: %s, database: %s", name, catalog, database) + columns = self.con.get_columns(database=database, catalog=catalog, table=name) type_mapper = self.compiler.type_mapper fields = { - name: type_mapper.from_string(type_string, nullable=is_nullable == "YES") - for name, type_string, is_nullable, *_ in result + column["fieldName"]: type_mapper.from_string(column["fieldType"], nullable=True) + for column in columns } - + print(fields) return sch.Schema(fields) - + def create_database(self, name: str, force: bool = False) -> None: sql = sge.Create(kind="DATABASE", exist=force, this=sg.to_identifier(name)).sql( self.name @@ -327,15 +374,25 @@ def execute( self, expr: ir.Expr, limit: str | None = "default", **kwargs: Any ) -> Any: """Execute an expression.""" - + print("step 1") + print(expr) self._run_pre_execute_hooks(expr) table = expr.as_table() + print("step 2") + print(table) sql = self.compile(table, limit=limit, **kwargs) + print("---sql") + print(sql) schema = table.schema() - + print("++schema") + print(schema) with self._safe_raw_sql(sql) as cur: + print("cur-------") + print(cur) result = self._fetch_from_cursor(cur, schema) + print("successfull execution========") + print(result) return expr.__pandas_result__(result) def create_table( @@ -521,9 +578,13 @@ def _fetch_from_cursor(self, cursor, schema: sch.Schema) -> pd.DataFrame: from ibis.backends.mysql.converter import MySQLPandasData + print("passed data---------") + #print(cursor.fetchall()) + print("after fetchall") + print(cursor) try: df = pd.DataFrame.from_records( - cursor, columns=schema.names, coerce_float=True + cursor.fetchall(), columns=schema.names, coerce_float=True ) except Exception: # clean up the cursor if we fail to create the DataFrame From 942277eeb4b593a3588cef59e824a3e06bea564f Mon Sep 17 00:00:00 2001 From: Karthic Rao Date: Thu, 4 Jul 2024 16:37:56 +0530 Subject: [PATCH 08/18] Add E6data dialect to SQL generator - Customize Tokenizer to use double quotes for identifiers - Modify Generator to map VARCHAR, CHAR, and TEXT to STRING - Add custom 
TRANSFORMS for concat and length functions --- ibis/backends/sql/dialects.py | 16 ++++++++++++++++ 1 file changed, 16 insertions(+) diff --git a/ibis/backends/sql/dialects.py b/ibis/backends/sql/dialects.py index b7166deeb90a..6a2968ef8aa4 100644 --- a/ibis/backends/sql/dialects.py +++ b/ibis/backends/sql/dialects.py @@ -399,6 +399,7 @@ class Generator(Postgres.Generator): JSON_TYPE_REQUIRED_FOR_EXTRACTION = True SUPPORTS_UNLOGGED_TABLES = True + TYPE_MAPPING = Postgres.Generator.TYPE_MAPPING.copy() | { sge.DataType.Type.TIMESTAMPTZ: "TIMESTAMPTZ" } @@ -427,3 +428,18 @@ def make_cross_joins_explicit(node): sge.Join: transforms.preprocess([make_cross_joins_explicit]), sge.LastValue: rename_func("last_value"), } + +class E6data(MySQL): + class Tokenizer(MySQL.Tokenizer): + IDENTIFIERS = ['"'] + class Generator(MySQL.Generator): + TYPE_MAPPING = { + sge.DataType.Type.VARCHAR: "STRING", + sge.DataType.Type.CHAR: "STRING", + sge.DataType.Type.TEXT: "STRING", + } + + TRANSFORMS = { + sge.Concat: lambda self, e: f"concat({self.sql(e.left)}, {self.sql(e.right)})", + sge.Length: rename_func("length"), + } \ No newline at end of file From 2332c5ba6a58282308d0614a694ca7530baf419a Mon Sep 17 00:00:00 2001 From: Karthic Rao Date: Thu, 4 Jul 2024 16:39:23 +0530 Subject: [PATCH 09/18] Adding E6Datatype class. --- ibis/backends/sql/datatypes.py | 3 +++ 1 file changed, 3 insertions(+) diff --git a/ibis/backends/sql/datatypes.py b/ibis/backends/sql/datatypes.py index 30932a65ffbb..1ee4fa37f34d 100644 --- a/ibis/backends/sql/datatypes.py +++ b/ibis/backends/sql/datatypes.py @@ -1110,3 +1110,6 @@ def _from_ibis_Map(cls, dtype: dt.Map) -> sge.DataType: ], nested=True, ) + +class E6DataType(MySQLType): + dialect = "e6data" From 6436b16088d09bbf62b97e021902254db18d8556 Mon Sep 17 00:00:00 2001 From: Karthic Rao Date: Thu, 4 Jul 2024 16:40:26 +0530 Subject: [PATCH 10/18] Update E6DataCompiler to use E6data dialect and E6DataType - Add E6data dialect - Use E6DataType for type mapping - Retain existing rewrites including custom limit rewrite - Keep other compiler configurations unchanged --- ibis/backends/e6data/compiler.py | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/ibis/backends/e6data/compiler.py b/ibis/backends/e6data/compiler.py index dd93f2d6c771..76a7458f842f 100644 --- a/ibis/backends/e6data/compiler.py +++ b/ibis/backends/e6data/compiler.py @@ -11,8 +11,8 @@ import ibis.expr.datatypes as dt import ibis.expr.operations as ops from ibis.backends.sql.compiler import NULL, STAR, SQLGlotCompiler -from ibis.backends.sql.datatypes import MySQLType -from ibis.backends.sql.dialects import MySQL +from ibis.backends.sql.datatypes import MySQLType, E6DataType +from ibis.backends.sql.dialects import MySQL, E6data from ibis.backends.sql.rewrites import ( exclude_unsupported_window_frame_from_ops, exclude_unsupported_window_frame_from_rank, @@ -45,8 +45,8 @@ def rewrite_limit(_, **kwargs): class E6DataCompiler(SQLGlotCompiler): __slots__ = () - dialect = MySQL - type_mapper = MySQLType + dialect = E6data + type_mapper = E6DataType rewrites = ( rewrite_limit, exclude_unsupported_window_frame_from_ops, From 76aa6877749db974268e6d6d6e61239f008eea3b Mon Sep 17 00:00:00 2001 From: Karthic Rao Date: Thu, 4 Jul 2024 16:43:36 +0530 Subject: [PATCH 11/18] remove debug print statements. 
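If diagnostic output is still wanted while developing the backend, the
standard-library logger keeps it off the hot path; a small sketch (the
logger name and message below are illustrative, not part of this
change):

    import logging

    logger = logging.getLogger("ibis.e6data")
    logging.basicConfig(level=logging.DEBUG)

    sql = "SELECT * FROM functional_alltypes LIMIT 5"  # stand-in for a compiled query
    logger.debug("compiled SQL: %s", sql)  # emitted only when DEBUG is enabled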
--- ibis/backends/e6data/__init__.py | 24 ------------------------ 1 file changed, 24 deletions(-) diff --git a/ibis/backends/e6data/__init__.py b/ibis/backends/e6data/__init__.py index 33c2dd267592..74cfc6cc1830 100644 --- a/ibis/backends/e6data/__init__.py +++ b/ibis/backends/e6data/__init__.py @@ -210,9 +210,7 @@ def table( catalog = None db = None - print("inside e6 table") if isinstance(database, tuple): - print("inside tuple") catalog, db = database elif isinstance(database, str): if '.' in database: @@ -220,11 +218,8 @@ def table( else: db = database - print("Name: %s, Catalog: %s, Database: %s", name, catalog, db) table_schema = self.get_schema(name, catalog=catalog, database=db) - print('ops namespace++++++++++++++') - print(ops.Namespace(catalog=catalog, database=db)) return ops.DatabaseTable( name, schema=table_schema, @@ -236,7 +231,6 @@ def get_schema( self, name: str, *, catalog: str | None = None, database: str | None = None ) -> sch.Schema: # print db, catalog, table - print("Table: %s , Catalog: %s, database: %s", name, catalog, database) columns = self.con.get_columns(database=database, catalog=catalog, table=name) type_mapper = self.compiler.type_mapper fields = { @@ -286,7 +280,6 @@ def raw_sql(self, query: str | sg.Expression, **kwargs: Any) -> Any: cursor = con.cursor() try: - print("Query: ", query) cursor.execute(query, **kwargs) except Exception: cursor.close() @@ -364,7 +357,6 @@ def list_tables( .sql(self.name) ) - print("SQL List Tables: ", sql) with self._safe_raw_sql(sql) as cur: out = cur.fetchall() @@ -374,25 +366,13 @@ def execute( self, expr: ir.Expr, limit: str | None = "default", **kwargs: Any ) -> Any: """Execute an expression.""" - print("step 1") - print(expr) self._run_pre_execute_hooks(expr) table = expr.as_table() - print("step 2") - print(table) sql = self.compile(table, limit=limit, **kwargs) - print("---sql") - print(sql) schema = table.schema() - print("++schema") - print(schema) with self._safe_raw_sql(sql) as cur: - print("cur-------") - print(cur) result = self._fetch_from_cursor(cur, schema) - print("successfull execution========") - print(result) return expr.__pandas_result__(result) def create_table( @@ -578,10 +558,6 @@ def _fetch_from_cursor(self, cursor, schema: sch.Schema) -> pd.DataFrame: from ibis.backends.mysql.converter import MySQLPandasData - print("passed data---------") - #print(cursor.fetchall()) - print("after fetchall") - print(cursor) try: df = pd.DataFrame.from_records( cursor.fetchall(), columns=schema.names, coerce_float=True From c4c26447006774e47dc8ebf39243ee981b41baab Mon Sep 17 00:00:00 2001 From: Karthic Rao Date: Mon, 8 Jul 2024 09:05:26 +0530 Subject: [PATCH 12/18] Update e6data-python-connector version in pyproject.toml --- pyproject.toml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pyproject.toml b/pyproject.toml index 136491673b09..145e39c62c06 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -87,7 +87,7 @@ shapely = { version = ">=2,<3", optional = true } # issues with versions <3.0.2 snowflake-connector-python = { version = ">=3.0.2,<4,!=3.3.0b1", optional = true } trino = { version = ">=0.321,<1", optional = true } -e6data-python-connector = { version = "2.2.0" } +e6data-python-connector = { version = ">=2.2.0,<3", optional = true } [tool.poetry.group.dev.dependencies] codespell = { version = ">=2.2.6,<3", extras = [ From 1ba682e247f1265a9d22b28c11febdafbef8c680 Mon Sep 17 00:00:00 2001 From: Karthic Rao Date: Mon, 8 Jul 2024 09:07:03 +0530 Subject: [PATCH 13/18] Remove empty line in 
class definition --- ibis/backends/sql/dialects.py | 1 - 1 file changed, 1 deletion(-) diff --git a/ibis/backends/sql/dialects.py b/ibis/backends/sql/dialects.py index 9e2efb128f90..e6701fee6221 100644 --- a/ibis/backends/sql/dialects.py +++ b/ibis/backends/sql/dialects.py @@ -403,7 +403,6 @@ class Generator(Postgres.Generator): JSON_TYPE_REQUIRED_FOR_EXTRACTION = True SUPPORTS_UNLOGGED_TABLES = True - TYPE_MAPPING = Postgres.Generator.TYPE_MAPPING.copy() | { sge.DataType.Type.TIMESTAMPTZ: "TIMESTAMPTZ" } From daef522768d63ff0a4be263a1e4a7e5bf66ee30a Mon Sep 17 00:00:00 2001 From: Karthic Rao Date: Mon, 8 Jul 2024 09:10:01 +0530 Subject: [PATCH 14/18] Removing the tests till they are implemented correctly. --- ibis/backends/e6data/tests/__init__.py | 0 ibis/backends/e6data/tests/conftest.py | 79 -------- ibis/backends/e6data/tests/test_client.py | 211 ---------------------- 3 files changed, 290 deletions(-) delete mode 100644 ibis/backends/e6data/tests/__init__.py delete mode 100644 ibis/backends/e6data/tests/conftest.py delete mode 100644 ibis/backends/e6data/tests/test_client.py diff --git a/ibis/backends/e6data/tests/__init__.py b/ibis/backends/e6data/tests/__init__.py deleted file mode 100644 index e69de29bb2d1..000000000000 diff --git a/ibis/backends/e6data/tests/conftest.py b/ibis/backends/e6data/tests/conftest.py deleted file mode 100644 index f7c463048767..000000000000 --- a/ibis/backends/e6data/tests/conftest.py +++ /dev/null @@ -1,79 +0,0 @@ -from __future__ import annotations - -import os -from typing import TYPE_CHECKING, Any - -import pytest - -import ibis -from ibis.backends.conftest import TEST_TABLES -from ibis.backends.tests.base import ServiceBackendTest - -if TYPE_CHECKING: - from collections.abc import Iterable - from pathlib import Path - -MYSQL_USER = os.environ.get("IBIS_TEST_MYSQL_USER", "ibis") -MYSQL_PASS = os.environ.get("IBIS_TEST_MYSQL_PASSWORD", "ibis") -MYSQL_HOST = os.environ.get("IBIS_TEST_MYSQL_HOST", "localhost") -MYSQL_PORT = int(os.environ.get("IBIS_TEST_MYSQL_PORT", 3306)) -IBIS_TEST_MYSQL_DB = os.environ.get("IBIS_TEST_MYSQL_DATABASE", "ibis_testing") - - -class TestConf(ServiceBackendTest): - # mysql has the same rounding behavior as postgres - check_dtype = False - returned_timestamp_unit = "s" - supports_arrays = False - native_bool = False - supports_structs = False - rounding_method = "half_to_even" - service_name = "mysql" - deps = ("pymysql",) - - @property - def test_files(self) -> Iterable[Path]: - return self.data_dir.joinpath("csv").glob("*.csv") - - def _load_data(self, **kwargs: Any) -> None: - """Load test data into a MySql backend instance. 
- - Parameters - ---------- - data_dir - Location of testdata - script_dir - Location of scripts defining schemas - """ - super()._load_data(**kwargs) - - with self.connection.begin() as cur: - for table in TEST_TABLES: - csv_path = self.data_dir / "csv" / f"{table}.csv" - lines = [ - f"LOAD DATA LOCAL INFILE {str(csv_path)!r}", - f"INTO TABLE {table}", - "COLUMNS TERMINATED BY ','", - """OPTIONALLY ENCLOSED BY '"'""", - "LINES TERMINATED BY '\\n'", - "IGNORE 1 LINES", - ] - cur.execute("\n".join(lines)) - - @staticmethod - def connect(*, tmpdir, worker_id, **kw): - return ibis.mysql.connect( - host=MYSQL_HOST, - user=MYSQL_USER, - password=MYSQL_PASS, - database=IBIS_TEST_MYSQL_DB, - port=MYSQL_PORT, - local_infile=1, - autocommit=True, - **kw, - ) - - -@pytest.fixture(scope="session") -def con(tmp_path_factory, data_dir, worker_id): - return TestConf.load_data(data_dir, tmp_path_factory, worker_id).connection diff --git a/ibis/backends/e6data/tests/test_client.py b/ibis/backends/e6data/tests/test_client.py deleted file mode 100644 index ee7031d6d85b..000000000000 --- a/ibis/backends/e6data/tests/test_client.py +++ /dev/null @@ -1,211 +0,0 @@ -from __future__ import annotations - -from datetime import date -from operator import methodcaller - -import pandas as pd -import pandas.testing as tm -import pytest -import sqlglot as sg -from pytest import param - -import ibis -import ibis.expr.datatypes as dt -from ibis import udf -from ibis.util import gen_name - -MYSQL_TYPES = [ - param("tinyint", dt.int8, id="tinyint"), - param("int1", dt.int8, id="int1"), - param("boolean", dt.int8, id="boolean"), - param("smallint", dt.int16, id="smallint"), - param("int2", dt.int16, id="int2"), - # ("mediumint", dt.int32), => https://github.com/tobymao/sqlglot/issues/2109 - # ("int3", dt.int32), => https://github.com/tobymao/sqlglot/issues/2109 - param("int", dt.int32, id="int"), - param("int4", dt.int32, id="int4"), - param("integer", dt.int32, id="integer"), - param("bigint", dt.int64, id="bigint"), - param("decimal", dt.Decimal(10, 0), id="decimal"), - param("decimal(5, 2)", dt.Decimal(5, 2), id="decimal_5_2"), - param("dec", dt.Decimal(10, 0), id="dec"), - param("numeric", dt.Decimal(10, 0), id="numeric"), - param("fixed", dt.Decimal(10, 0), id="fixed"), - param("float", dt.float32, id="float"), - param("double", dt.float64, id="double"), - param("timestamp", dt.Timestamp("UTC"), id="timestamp"), - param("date", dt.date, id="date"), - param("time", dt.time, id="time"), - param("datetime", dt.timestamp, id="datetime"), - param("year", dt.int8, id="year"), - param("char(32)", dt.string, id="char"), - param("char byte", dt.binary, id="char_byte"), - param("varchar(42)", dt.string, id="varchar"), - param("mediumtext", dt.string, id="mediumtext"), - param("text", dt.string, id="text"), - param("binary(42)", dt.binary, id="binary"), - param("varbinary(42)", dt.binary, id="varbinary"), - param("bit(1)", dt.int8, id="bit_1"), - param("bit(9)", dt.int16, id="bit_9"), - param("bit(17)", dt.int32, id="bit_17"), - param("bit(33)", dt.int64, id="bit_33"), - # mariadb doesn't have a distinct json type - param("json", dt.string, id="json"), - param("enum('small', 'medium', 'large')", dt.string, id="enum"), - param("inet6", dt.inet, id="inet"), - param("set('a', 'b', 'c', 'd')", dt.Array(dt.string), id="set"), - param("mediumblob", dt.binary, id="mediumblob"), - param("blob", dt.binary, id="blob"), - param("uuid", dt.uuid, id="uuid"), -] + [ - param( - f"datetime({scale:d})", - dt.Timestamp(scale=scale or None), - 
id=f"datetime{scale:d}", - ) - for scale in range(7) -] - - -@pytest.mark.parametrize(("mysql_type", "expected_type"), MYSQL_TYPES) -def test_get_schema_from_query(con, mysql_type, expected_type): - raw_name = ibis.util.guid() - name = sg.to_identifier(raw_name, quoted=True).sql("mysql") - expected_schema = ibis.schema(dict(x=expected_type)) - - # temporary tables get cleaned up by the db when the session ends, so we - # don't need to explicitly drop the table - with con.begin() as c: - c.execute(f"CREATE TEMPORARY TABLE {name} (x {mysql_type})") - - result_schema = con._get_schema_using_query(f"SELECT * FROM {name}") - assert result_schema == expected_schema - - t = con.table(raw_name) - assert t.schema() == expected_schema - - -@pytest.mark.parametrize("coltype", ["TINYBLOB", "MEDIUMBLOB", "BLOB", "LONGBLOB"]) -def test_blob_type(con, coltype): - tmp = f"tmp_{ibis.util.guid()}" - with con.begin() as c: - c.execute(f"CREATE TEMPORARY TABLE {tmp} (a {coltype})") - t = con.table(tmp) - assert t.schema() == ibis.schema({"a": dt.binary}) - - -@pytest.fixture(scope="session") -def tmp_t(con): - with con.begin() as c: - c.execute("CREATE TABLE IF NOT EXISTS test_schema.t (x INET6)") - yield "t" - with con.begin() as c: - c.execute("DROP TABLE IF EXISTS test_schema.t") - - -def test_get_schema_from_query_other_schema(con, tmp_t): - t = con.table(tmp_t, database="test_schema") - assert t.schema() == ibis.schema({"x": dt.inet}) - - -def test_zero_timestamp_data(con): - sql = """ - CREATE TEMPORARY TABLE ztmp_date_issue - ( - name CHAR(10) NULL, - tradedate DATETIME NOT NULL, - date DATETIME NULL - ) - """ - with con.begin() as c: - c.execute(sql) - c.execute( - """ - INSERT INTO ztmp_date_issue VALUES - ('C', '2018-10-22', 0), - ('B', '2017-06-07', 0), - ('C', '2022-12-21', 0) - """ - ) - t = con.table("ztmp_date_issue") - result = t.execute() - expected = pd.DataFrame( - { - "name": ["C", "B", "C"], - "tradedate": pd.to_datetime( - [date(2018, 10, 22), date(2017, 6, 7), date(2022, 12, 21)] - ), - "date": [pd.NaT, pd.NaT, pd.NaT], - } - ) - tm.assert_frame_equal(result, expected) - - -@pytest.fixture(scope="module") -def enum_t(con): - name = gen_name("mysql_enum_test") - with con.begin() as cur: - cur.execute( - f"CREATE TEMPORARY TABLE {name} (sml ENUM('small', 'medium', 'large'))" - ) - cur.execute(f"INSERT INTO {name} VALUES ('small')") - - yield con.table(name) - con.drop_table(name, force=True) - - -@pytest.mark.parametrize( - ("expr_fn", "expected"), - [ - (methodcaller("startswith", "s"), pd.Series([True], name="sml")), - (methodcaller("endswith", "m"), pd.Series([False], name="sml")), - (methodcaller("re_search", "mall"), pd.Series([True], name="sml")), - (methodcaller("lstrip"), pd.Series(["small"], name="sml")), - (methodcaller("rstrip"), pd.Series(["small"], name="sml")), - (methodcaller("strip"), pd.Series(["small"], name="sml")), - ], - ids=["startswith", "endswith", "re_search", "lstrip", "rstrip", "strip"], -) -def test_enum_as_string(enum_t, expr_fn, expected): - expr = expr_fn(enum_t.sml).name("sml") - res = expr.execute() - tm.assert_series_equal(res, expected) - - -def test_builtin_scalar_udf(con): - @udf.scalar.builtin - def soundex(a: str) -> str: - """Soundex of a string.""" - - expr = soundex("foo") - result = con.execute(expr) - assert result == "F000" - - -def test_builtin_agg_udf(con): - @udf.agg.builtin - def json_arrayagg(a) -> str: - """Glom together some JSON.""" - - ft = con.tables.functional_alltypes[:5] - expr = json_arrayagg(ft.string_col) - result = 
expr.execute() - expected = '["0","1","2","3","4"]' - assert result == expected - - -def test_list_tables_schema_warning_refactor(con): - mysql_tables = { - "column_stats", - "columns_priv", - "db", - "event", - "func", - } - assert con.list_tables() - - with pytest.warns(FutureWarning): - assert mysql_tables.issubset(con.list_tables(schema="mysql")) - - assert mysql_tables.issubset(con.list_tables(database="mysql")) - assert mysql_tables.issubset(con.list_tables(database=("mysql",))) From 2361d7eefc644200e9573cdee3062e83f03e76b5 Mon Sep 17 00:00:00 2001 From: Karthic Rao Date: Mon, 8 Jul 2024 13:00:45 +0530 Subject: [PATCH 15/18] Refactor E6data backend to inherit from MySQLBackend - Change Backend class to inherit from MySQLBackend instead of SQLBackend - Remove unnecessary imports and methods duplicated in MySQLBackend - Update connection handling to use E6data_python_connector - Modify schema and table retrieval methods for E6data compatibility - Replace MySQLPandasData with E6DataPandasData for data conversion - Clean up and streamline code, removing print statements and unused methods --- ibis/backends/e6data/__init__.py | 155 ++++++------------------------- 1 file changed, 28 insertions(+), 127 deletions(-) diff --git a/ibis/backends/e6data/__init__.py b/ibis/backends/e6data/__init__.py index 74cfc6cc1830..e8e47ed9aeac 100644 --- a/ibis/backends/e6data/__init__.py +++ b/ibis/backends/e6data/__init__.py @@ -12,7 +12,6 @@ from urllib.parse import parse_qs, urlparse import numpy as np -import pymysql import sqlglot as sg import sqlglot.expressions as sge @@ -23,8 +22,8 @@ import ibis.expr.types as ir from ibis import util from ibis.backends import CanCreateDatabase -from ibis.backends.e6data.compiler import E6DataCompiler -from ibis.backends.sql import SQLBackend +from ibis.backends.e6data.compiler import E6DataCompiler +from ibis.backends.mysql import Backend as MySQLBackend from ibis.backends.sql.compiler import TRUE, C from e6data_python_connector import Connection @@ -35,7 +34,8 @@ import polars as pl import pyarrow as pa -class Backend(SQLBackend, CanCreateDatabase): + +class Backend(MySQLBackend, CanCreateDatabase): name = "e6data" compiler = E6DataCompiler() supports_create_or_replace = False @@ -44,6 +44,7 @@ class Backend(SQLBackend, CanCreateDatabase): def version(self): matched = re.search(r"(\d+)\.(\d+)\.(\d+)", self.con.server_version) return ".".join(matched.groups()) + def _from_url(self, url: str, **kwargs): """Connect to a backend using a URL `url`. @@ -97,7 +98,7 @@ def _from_url(self, url: str, **kwargs): if "password" in kwargs and kwargs["password"] is None: del kwargs["password"] - + if "catalog_name" in kwargs and not kwargs["catalog_name"]: del kwargs["catalog_name"] @@ -107,7 +108,7 @@ def _from_url(self, url: str, **kwargs): def version(self): matched = re.search(r"(\d+)\.(\d+)\.(\d+)", self.con.server_version) return ".".join(matched.groups()) - + def do_connect( self, host: str, @@ -134,7 +135,7 @@ def do_connect( Catalog name kwargs Additional keyword arguments - + Examples -------- @@ -149,7 +150,7 @@ def do_connect( [...] 
>>> t = con.table("functional_alltypes") >>> t - + """ self.con = Connection( host=host, @@ -159,7 +160,6 @@ def do_connect( database=database, catalog=catalog_name, ) - @property def current_database(self) -> str: @@ -169,8 +169,8 @@ def current_database(self) -> str: def list_databases(self, like: str | None = None) -> list[str]: # In MySQL syntax, "database" and "schema" are synonymous - - databases = self.con.get_schema_names() + + databases = self.con.get_schema_names() return self._filter_with_like(databases, like) def _get_schema_using_query(self, query: str) -> sch.Schema: @@ -226,44 +226,20 @@ def table( source=self, namespace=ops.Namespace(catalog=catalog, database=db), ).to_expr() - + def get_schema( self, name: str, *, catalog: str | None = None, database: str | None = None ) -> sch.Schema: # print db, catalog, table - columns = self.con.get_columns(database=database, catalog=catalog, table=name) + columns = self.con.get_columns( + database=database, catalog=catalog, table=name) type_mapper = self.compiler.type_mapper fields = { - column["fieldName"]: type_mapper.from_string(column["fieldType"], nullable=True) + column["fieldName"]: type_mapper.from_string( + column["fieldType"], nullable=True) for column in columns } - print(fields) return sch.Schema(fields) - - def create_database(self, name: str, force: bool = False) -> None: - sql = sge.Create(kind="DATABASE", exist=force, this=sg.to_identifier(name)).sql( - self.name - ) - with self.begin() as cur: - cur.execute(sql) - - def drop_database(self, name: str, force: bool = False) -> None: - sql = sge.Drop(kind="DATABASE", exist=force, this=sg.to_identifier(name)).sql( - self.name - ) - with self.begin() as cur: - cur.execute(sql) - - @contextlib.contextmanager - def begin(self): - con = self.con - cur = con.cursor() - try: - yield cur - except Exception: - raise - finally: - cur.close() # TODO(kszucs): should make it an abstract method or remove the use of it # from .execute() @@ -346,7 +322,8 @@ def list_tables( sg_cat.args["quoted"] = False if (sg_db := table_loc.args["db"]) is not None: sg_db.args["quoted"] = False - conditions = [C.table_schema.eq(sge.convert(table_loc.sql(self.name)))] + conditions = [C.table_schema.eq( + sge.convert(table_loc.sql(self.name)))] col = "table_name" sql = ( @@ -437,7 +414,8 @@ def create_table( else: temp_name = name - table = sg.table(temp_name, catalog=database, quoted=self.compiler.quoted) + table = sg.table(temp_name, catalog=database, + quoted=self.compiler.quoted) target = sge.Schema(this=table, expressions=column_defs) create_stmt = sge.Create( @@ -449,12 +427,14 @@ def create_table( this = sg.table(name, catalog=database, quoted=self.compiler.quoted) with self._safe_raw_sql(create_stmt) as cur: if query is not None: - insert_stmt = sge.Insert(this=table, expression=query).sql(self.name) + insert_stmt = sge.Insert( + this=table, expression=query).sql(self.name) cur.execute(insert_stmt) if overwrite: cur.execute( - sge.Drop(kind="TABLE", this=this, exists=True).sql(self.name) + sge.Drop(kind="TABLE", this=this, + exists=True).sql(self.name) ) cur.execute( f"ALTER TABLE IF EXISTS {table.sql(self.name)} RENAME TO {this.sql(self.name)}" @@ -470,93 +450,14 @@ def create_table( # preserve the input schema if it was provided return ops.DatabaseTable( - name, schema=schema, source=self, namespace=ops.Namespace(database=database) + name, schema=schema, source=self, namespace=ops.Namespace( + database=database) ).to_expr() - def _register_in_memory_table(self, op: ops.InMemoryTable) -> None: - 
schema = op.schema
-        if null_columns := [col for col, dtype in schema.items() if dtype.is_null()]:
-            raise com.IbisTypeError(
-                "MySQL cannot yet reliably handle `null` typed columns; "
-                f"got null typed columns: {null_columns}"
-            )
-
-        # only register if we haven't already done so
-        if (name := op.name) not in self.list_tables():
-            quoted = self.compiler.quoted
-            column_defs = [
-                sg.exp.ColumnDef(
-                    this=sg.to_identifier(colname, quoted=quoted),
-                    kind=self.compiler.type_mapper.from_ibis(typ),
-                    constraints=(
-                        None
-                        if typ.nullable
-                        else [
-                            sg.exp.ColumnConstraint(
-                                kind=sg.exp.NotNullColumnConstraint()
-                            )
-                        ]
-                    ),
-                )
-                for colname, typ in schema.items()
-            ]
-
-            create_stmt = sg.exp.Create(
-                kind="TABLE",
-                this=sg.exp.Schema(
-                    this=sg.to_identifier(name, quoted=quoted), expressions=column_defs
-                ),
-                properties=sg.exp.Properties(expressions=[sge.TemporaryProperty()]),
-            )
-            create_stmt_sql = create_stmt.sql(self.name)
-
-            columns = schema.keys()
-            df = op.data.to_frame()
-            # nan can not be used with MySQL
-            df = df.replace(np.nan, None)
-
-            data = df.itertuples(index=False)
-            cols = ", ".join(
-                ident.sql(self.name)
-                for ident in map(partial(sg.to_identifier, quoted=quoted), columns)
-            )
-            specs = ", ".join(repeat("%s", len(columns)))
-            table = sg.table(name, quoted=quoted)
-            sql = f"INSERT INTO {table.sql(self.name)} ({cols}) VALUES ({specs})"
-            with self.begin() as cur:
-                cur.execute(create_stmt_sql)
-
-                if not df.empty:
-                    cur.executemany(sql, data)
-
-    @util.experimental
-    def to_pyarrow_batches(
-        self,
-        expr: ir.Expr,
-        *,
-        params: Mapping[ir.Scalar, Any] | None = None,
-        limit: int | str | None = None,
-        chunk_size: int = 1_000_000,
-        **_: Any,
-    ) -> pa.ipc.RecordBatchReader:
-        import pyarrow as pa
-
-        self._run_pre_execute_hooks(expr)
-
-        schema = expr.as_table().schema()
-        with self._safe_raw_sql(
-            self.compile(expr, limit=limit, params=params)
-        ) as cursor:
-            df = self._fetch_from_cursor(cursor, schema)
-            table = pa.Table.from_pandas(
-                df, schema=schema.to_pyarrow(), preserve_index=False
-            )
-            return table.to_reader(max_chunksize=chunk_size)
-
     def _fetch_from_cursor(self, cursor, schema: sch.Schema) -> pd.DataFrame:
         import pandas as pd
 
-        from ibis.backends.mysql.converter import MySQLPandasData
+        from ibis.backends.e6data.converter import E6DataPandasData
 
         try:
             df = pd.DataFrame.from_records(
                 cursor.fetchall(), columns=schema.names, coerce_float=True
             )
         except Exception:
             # clean up the cursor if we fail to create the DataFrame
             # artificially locked tables
             cursor.close()
             raise
-        df = MySQLPandasData.convert_table(df, schema)
+        df = E6DataPandasData.convert_table(df, schema)
         return df

From 7ce06cf48a501e9da7133afcc43d8031a80590ef Mon Sep 17 00:00:00 2001
From: Karthic Rao
Date: Mon, 8 Jul 2024 13:01:36 +0530
Subject: [PATCH 16/18] Inheriting compiler operations from MySQL.
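With this change, anything not overridden resolves through Python's MRO
to the MySQL implementations, so the subclass reduces to the dialect and
type-mapper attributes. A toy illustration of that dispatch (standalone
classes, not the real ibis ones):

    # Unoverridden visit_* methods fall through to the parent class.
    class ToyMySQLCompiler:
        dialect = "mysql"

        def visit_DayOfWeekIndex(self, arg: str) -> str:
            return f"(dayofweek({arg}) + 5) % 7"

    class ToyE6DataCompiler(ToyMySQLCompiler):
        dialect = "e6data"  # only the dialect/type mapping differ

    print(ToyE6DataCompiler().visit_DayOfWeekIndex("ts"))
    # -> (dayofweek(ts) + 5) % 7, inherited unchanged from the MySQL class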
--- ibis/backends/e6data/compiler.py | 335 +------------------------------ 1 file changed, 4 insertions(+), 331 deletions(-) diff --git a/ibis/backends/e6data/compiler.py b/ibis/backends/e6data/compiler.py index 76a7458f842f..5b9d8394d4cd 100644 --- a/ibis/backends/e6data/compiler.py +++ b/ibis/backends/e6data/compiler.py @@ -1,24 +1,11 @@ from __future__ import annotations import string -from functools import partial, reduce - -import sqlglot as sg -import sqlglot.expressions as sge from public import public -import ibis.common.exceptions as com -import ibis.expr.datatypes as dt -import ibis.expr.operations as ops -from ibis.backends.sql.compiler import NULL, STAR, SQLGlotCompiler -from ibis.backends.sql.datatypes import MySQLType, E6DataType -from ibis.backends.sql.dialects import MySQL, E6data -from ibis.backends.sql.rewrites import ( - exclude_unsupported_window_frame_from_ops, - exclude_unsupported_window_frame_from_rank, - exclude_unsupported_window_frame_from_row_number, - rewrite_empty_order_by_window, -) +from ibis.backends.mysql.compiler import MySQLCompiler +from ibis.backends.sql.datatypes import E6DataType +from ibis.backends.sql.dialects import E6data from ibis.common.patterns import replace from ibis.expr.rewrites import p @@ -42,322 +29,8 @@ def rewrite_limit(_, **kwargs): @public -class E6DataCompiler(SQLGlotCompiler): +class E6DataCompiler(MySQLCompiler): __slots__ = () dialect = E6data type_mapper = E6DataType - rewrites = ( - rewrite_limit, - exclude_unsupported_window_frame_from_ops, - exclude_unsupported_window_frame_from_rank, - exclude_unsupported_window_frame_from_row_number, - rewrite_empty_order_by_window, - *SQLGlotCompiler.rewrites, - ) - - @property - def NAN(self): - raise NotImplementedError("MySQL does not support NaN") - - @property - def POS_INF(self): - raise NotImplementedError("MySQL does not support Infinity") - - NEG_INF = POS_INF - UNSUPPORTED_OPS = ( - ops.ApproxMedian, - ops.ArgMax, - ops.ArgMin, - ops.ArrayCollect, - ops.Array, - ops.ArrayFlatten, - ops.ArrayMap, - ops.Covariance, - ops.First, - ops.Last, - ops.Levenshtein, - ops.Median, - ops.Mode, - ops.MultiQuantile, - ops.Quantile, - ops.RegexReplace, - ops.RegexSplit, - ops.RowID, - ops.StringSplit, - ops.StructColumn, - ops.TimestampBucket, - ops.TimestampDelta, - ops.Translate, - ops.Unnest, - ) - - SIMPLE_OPS = { - ops.BitAnd: "bit_and", - ops.BitOr: "bit_or", - ops.BitXor: "bit_xor", - ops.DayOfWeekName: "dayname", - ops.Log10: "log10", - ops.StringContains: "instr", - ops.ExtractWeekOfYear: "weekofyear", - ops.ExtractEpochSeconds: "unix_timestamp", - ops.ExtractDayOfYear: "dayofyear", - ops.Strftime: "date_format", - ops.StringToTimestamp: "str_to_date", - ops.Log2: "log2", - } - - @staticmethod - def _minimize_spec(start, end, spec): - if ( - start is None - and isinstance(getattr(end, "value", None), ops.Literal) - and end.value.value == 0 - and end.following - ): - return None - return spec - - def visit_Cast(self, op, *, arg, to): - from_ = op.arg.dtype - if (from_.is_json() or from_.is_string()) and to.is_json(): - # MariaDB does not support casting to JSON because it's an alias - # for TEXT (except when casting of course!) 
- return arg - elif from_.is_integer() and to.is_interval(): - return self.visit_IntervalFromInteger( - ops.IntervalFromInteger(op.arg, unit=to.unit), arg=arg, unit=to.unit - ) - elif from_.is_integer() and to.is_timestamp(): - return self.f.from_unixtime(arg) - return super().visit_Cast(op, arg=arg, to=to) - - def visit_TimestampDiff(self, op, *, left, right): - return self.f.timestampdiff( - sge.Var(this="SECOND"), right, left, dialect=self.dialect - ) - - def visit_DateDiff(self, op, *, left, right): - return self.f.timestampdiff( - sge.Var(this="DAY"), right, left, dialect=self.dialect - ) - - def visit_ApproxCountDistinct(self, op, *, arg, where): - if where is not None: - arg = self.if_(where, arg) - return self.f.count(sge.Distinct(expressions=[arg])) - - def visit_CountStar(self, op, *, arg, where): - if where is not None: - return self.f.sum(self.cast(where, op.dtype)) - return self.f.count(STAR) - - def visit_CountDistinct(self, op, *, arg, where): - if where is not None: - arg = self.if_(where, arg) - return self.f.count(sge.Distinct(expressions=[arg])) - - def visit_CountDistinctStar(self, op, *, arg, where): - if where is not None: - raise com.UnsupportedOperationError( - "Filtered table count distinct is not supported in MySQL" - ) - func = partial(sg.column, table=arg.alias_or_name, quoted=self.quoted) - return self.f.count( - sge.Distinct(expressions=list(map(func, op.arg.schema.keys()))) - ) - - def visit_GroupConcat(self, op, *, arg, sep, where): - if not isinstance(op.sep, ops.Literal): - raise com.UnsupportedOperationError( - "Only string literal separators are supported" - ) - if where is not None: - arg = self.if_(where, arg) - return self.f.group_concat(arg, sep) - - def visit_DayOfWeekIndex(self, op, *, arg): - return (self.f.dayofweek(arg) + 5) % 7 - - def visit_Literal(self, op, *, value, dtype): - # avoid casting NULL: the set of types allowed by MySQL and - # MariaDB when casting is a strict subset of allowed types in other - # contexts like CREATE TABLE - if value is None: - return NULL - return super().visit_Literal(op, value=value, dtype=dtype) - - def visit_NonNullLiteral(self, op, *, value, dtype): - if dtype.is_decimal() and not value.is_finite(): - raise com.UnsupportedOperationError( - "MySQL does not support NaN or infinity" - ) - elif dtype.is_binary(): - return self.f.unhex(value.hex()) - elif dtype.is_date(): - return self.f.date(value.isoformat()) - elif dtype.is_timestamp(): - return self.f.timestamp(value.isoformat()) - elif dtype.is_time(): - return self.f.maketime( - value.hour, value.minute, value.second + value.microsecond / 1e6 - ) - elif dtype.is_array() or dtype.is_struct() or dtype.is_map(): - raise com.UnsupportedBackendType( - "MySQL does not support arrays, structs or maps" - ) - return None - - def visit_JSONGetItem(self, op, *, arg, index): - if op.index.dtype.is_integer(): - path = self.f.concat("$[", self.cast(index, dt.string), "]") - else: - path = self.f.concat("$.", index) - return self.f.json_extract(arg, path) - - def visit_DateFromYMD(self, op, *, year, month, day): - return self.f.str_to_date( - self.f.concat( - self.f.lpad(year, 4, "0"), - self.f.lpad(month, 2, "0"), - self.f.lpad(day, 2, "0"), - ), - "%Y%m%d", - ) - - def visit_FindInSet(self, op, *, needle, values): - return self.f.find_in_set(needle, self.f.concat_ws(",", values)) - - def visit_EndsWith(self, op, *, arg, end): - to = sge.DataType(this=sge.DataType.Type.BINARY) - return self.f.right(arg, self.f.char_length(end)).eq(sge.Cast(this=end, to=to)) - - def 
visit_StartsWith(self, op, *, arg, start): - to = sge.DataType(this=sge.DataType.Type.BINARY) - return self.f.left(arg, self.f.length(start)).eq(sge.Cast(this=start, to=to)) - - def visit_RegexSearch(self, op, *, arg, pattern): - return arg.rlike(pattern) - - def visit_RegexExtract(self, op, *, arg, pattern, index): - extracted = self.f.regexp_substr(arg, pattern) - return self.if_( - arg.rlike(pattern), - self.if_( - index.eq(0), - extracted, - self.f.regexp_replace( - extracted, pattern, f"\\{index.sql(self.dialect)}" - ), - ), - NULL, - ) - - def visit_Equals(self, op, *, left, right): - if op.left.dtype.is_string(): - assert op.right.dtype.is_string(), op.right.dtype - to = sge.DataType(this=sge.DataType.Type.BINARY) - return sge.Cast(this=left, to=to).eq(right) - return super().visit_Equals(op, left=left, right=right) - - def visit_StringContains(self, op, *, haystack, needle): - return self.f.instr(haystack, needle) > 0 - - def visit_StringFind(self, op, *, arg, substr, start, end): - if end is not None: - raise NotImplementedError( - "`end` argument is not implemented for MySQL `StringValue.find`" - ) - substr = sge.Cast(this=substr, to=sge.DataType(this=sge.DataType.Type.BINARY)) - - if start is not None: - return self.f.locate(substr, arg, start + 1) - return self.f.locate(substr, arg) - - def visit_LRStrip(self, op, *, arg, position): - return reduce( - lambda arg, char: self.f.trim( - this=arg, position=self.v[position], expression=char - ), - map( - partial(self.cast, to=dt.string), - map(self.f.unhex, map(self.f.hex, string.whitespace.encode())), - ), - arg, - ) - - def visit_DateTimestampTruncate(self, op, *, arg, unit): - truncate_formats = { - "s": "%Y-%m-%d %H:%i:%s", - "m": "%Y-%m-%d %H:%i:00", - "h": "%Y-%m-%d %H:00:00", - "D": "%Y-%m-%d", - # 'W': 'week', - "M": "%Y-%m-01", - "Y": "%Y-01-01", - } - if (format := truncate_formats.get(unit.short)) is None: - raise com.UnsupportedOperationError(f"Unsupported truncate unit {op.unit}") - return self.f.date_format(arg, format) - - visit_DateTruncate = visit_TimestampTruncate = visit_DateTimestampTruncate - - def visit_DateTimeDelta(self, op, *, left, right, part): - return self.f.timestampdiff( - sge.Var(this=part.this), right, left, dialect=self.dialect - ) - - visit_TimeDelta = visit_DateDelta = visit_DateTimeDelta - - def visit_ExtractMillisecond(self, op, *, arg): - return self.f.floor(self.f.extract(sge.Var(this="microsecond"), arg) / 1_000) - - def visit_ExtractMicrosecond(self, op, *, arg): - return self.f.floor(self.f.extract(sge.Var(this="microsecond"), arg)) - - def visit_Strip(self, op, *, arg): - return self.visit_LRStrip(op, arg=arg, position="BOTH") - - def visit_LStrip(self, op, *, arg): - return self.visit_LRStrip(op, arg=arg, position="LEADING") - - def visit_RStrip(self, op, *, arg): - return self.visit_LRStrip(op, arg=arg, position="TRAILING") - - def visit_IntervalFromInteger(self, op, *, arg, unit): - return sge.Interval(this=arg, unit=sge.Var(this=op.resolution.upper())) - - def visit_TimestampAdd(self, op, *, left, right): - if op.right.dtype.unit.short == "ms": - right = sge.Interval( - this=right.this * 1_000, unit=sge.Var(this="MICROSECOND") - ) - return self.f.date_add(left, right, dialect=self.dialect) - - def visit_UnwrapJSONString(self, op, *, arg): - return self.if_( - self.f.json_type(arg).eq(sge.convert("STRING")), - self.f.json_unquote(arg), - NULL, - ) - - def visit_UnwrapJSONInt64(self, op, *, arg): - return self.if_( - self.f.json_type(arg).eq(sge.convert("INTEGER")), - self.cast(arg, 
op.dtype),
-            NULL,
-        )
-
-    def visit_UnwrapJSONFloat64(self, op, *, arg):
-        return self.if_(
-            self.f.json_type(arg).isin(sge.convert("DOUBLE"), sge.convert("INTEGER")),
-            self.cast(arg, op.dtype),
-            NULL,
-        )
-
-    def visit_UnwrapJSONBoolean(self, op, *, arg):
-        return self.if_(
-            self.f.json_type(arg).eq(sge.convert("BOOLEAN")),
-            self.if_(arg.eq(sge.convert("true")), 1, 0),
-            NULL,
-        )

From 20f875f02e4f611f4c8b67a82c82a6c79fd10a75 Mon Sep 17 00:00:00 2001
From: Karthic Rao
Date: Mon, 8 Jul 2024 13:02:53 +0530
Subject: [PATCH 17/18] Creating a placeholder E6data data converter.

---
 ibis/backends/e6data/converter.py | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)

diff --git a/ibis/backends/e6data/converter.py b/ibis/backends/e6data/converter.py
index 4f2010225a5b..910fff906556 100644
--- a/ibis/backends/e6data/converter.py
+++ b/ibis/backends/e6data/converter.py
@@ -5,8 +5,8 @@
 from ibis.formats.pandas import PandasData
 
 
-class MySQLPandasData(PandasData):
-    # TODO(kszucs): this could be reused at other backends, like pyspark
+class E6DataPandasData(PandasData):
+    # Placeholder for implementing future data type conversion
     @classmethod
     def convert_Time(cls, s, dtype, pandas_type):
         def convert(timedelta):

From c8520f8c985a82d2f930b4637f9a4accda708f1f Mon Sep 17 00:00:00 2001
From: Karthic Rao
Date: Mon, 8 Jul 2024 13:11:27 +0530
Subject: [PATCH 18/18] Remove pymysql from e6data dependencies

---
 pyproject.toml | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/pyproject.toml b/pyproject.toml
index 145e39c62c06..83d615ab7667 100644
--- a/pyproject.toml
+++ b/pyproject.toml
@@ -153,7 +153,7 @@ dask = ["dask", "regex", "packaging"]
 datafusion = ["datafusion"]
 druid = ["pydruid"]
 duckdb = ["duckdb"]
-e6data = ["e6data-python-connector","pymysql"]
+e6data = ["e6data-python-connector"]
 exasol = ["pyexasol"]
 flink = []
 impala = ["impyla"]
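With the series applied, the backend becomes reachable through the
`ibis.backends.e6data` entry point registered above; a connection
sketch, where every host, credential, and catalog value is a
placeholder for a real e6data deployment:

    import ibis

    con = ibis.e6data.connect(
        host="e6data.example.com",    # cluster hostname (placeholder)
        port=80,                      # connector port (placeholder)
        username="user@example.com",  # e6data user (placeholder)
        password="access-token",      # token or password (placeholder)
        database="ibis_testing",      # default database (placeholder)
        catalog_name="glue",          # catalog within the cluster (placeholder)
    )
    t = con.table("functional_alltypes")
    print(t.limit(5).execute())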