From f96179d6dbdbd0e7c3fc65c29ae618e91abb0c02 Mon Sep 17 00:00:00 2001 From: Phillip Cloud <417981+cpcloud@users.noreply.github.com> Date: Wed, 24 Jul 2024 07:36:53 -0400 Subject: [PATCH 1/7] test(benchmarks): add info/describe benchmark --- ibis/expr/types/relations.py | 12 ------ ibis/tests/benchmarks/test_benchmarks.py | 54 ++++++++++++++++++++++++ 2 files changed, 54 insertions(+), 12 deletions(-) diff --git a/ibis/expr/types/relations.py b/ibis/expr/types/relations.py index df26d1fe634a..ff992022858b 100644 --- a/ibis/expr/types/relations.py +++ b/ibis/expr/types/relations.py @@ -3022,13 +3022,10 @@ def describe( │ island │ 1 │ string │ 344 │ 0 │ 3 │ Biscoe │ └─────────┴───────┴────────┴───────┴───────┴────────┴────────┘ """ - import ibis.selectors as s from ibis import literal as lit quantile = sorted(quantile) aggs = [] - string_col = False - numeric_col = False for pos, colname in enumerate(self.columns): col = self[colname] typ = col.type() @@ -3045,7 +3042,6 @@ def describe( } if typ.is_numeric(): - numeric_col = True col_mean = col.mean() col_std = col.std() col_min = col.min().cast(float) @@ -3055,10 +3051,8 @@ def describe( for q in quantile } elif typ.is_string(): - string_col = True col_mode = col.mode() elif typ.is_boolean(): - numeric_col = True col_mean = col.mean() else: # Will not calculate statistics for other types @@ -3082,12 +3076,6 @@ def describe( t = ibis.union(*aggs) - # TODO(jiting): Need a better way to remove columns with all NULL - if string_col and not numeric_col: - t = t.select(~s.of_type("float")) - elif numeric_col and not string_col: - t = t.drop("mode") - return t def join( diff --git a/ibis/tests/benchmarks/test_benchmarks.py b/ibis/tests/benchmarks/test_benchmarks.py index 58bad8be2f16..88a2babcfb8d 100644 --- a/ibis/tests/benchmarks/test_benchmarks.py +++ b/ibis/tests/benchmarks/test_benchmarks.py @@ -994,3 +994,57 @@ def test_dedup_schema(benchmark): itertools.cycle(("int", "string", "array", "float")), ), ) + + 
+@pytest.fixture(scope="module") +def info_t(): + num_cols = 450 + return ibis.table({f"col_{i}": "float64" for i in range(num_cols)}) + + +@pytest.fixture(scope="module") +def info_t_with_data(): + np = pytest.importorskip("numpy") + pa = pytest.importorskip("pyarrow") + + num_cols = 450 + num_rows = 1_500 + data = pa.Table.from_arrays( + np.random.randn(num_rows, num_cols).T, + names=list(map("col_{}".format, range(num_cols))), + ) + return ibis.memtable(data) + + +@pytest.mark.parametrize("method", [ir.Table.describe, ir.Table.info]) +def test_summarize_construct(benchmark, info_t, method): + """Construct the expression.""" + benchmark(method, info_t) + + +@pytest.mark.parametrize("method", [ir.Table.describe, ir.Table.info]) +def test_summarize_compile(benchmark, info_t, method): + """Compile the expression.""" + benchmark(ibis.to_sql, method(info_t), dialect="duckdb") + + +@pytest.mark.parametrize("method", [ir.Table.describe, ir.Table.info]) +def test_summarize_execute(benchmark, info_t_with_data, method, con): + """Compile and execute the expression.""" + benchmark(con.execute, method(info_t_with_data)) + + +@pytest.mark.parametrize("method", [ir.Table.describe, ir.Table.info]) +def test_summarize_end_to_end(benchmark, info_t_with_data, method, con): + """Construct, compile, and execute the expression.""" + benchmark(lambda table: con.execute(method(table)), info_t_with_data) + + +def test_summarize_duckdb(benchmark, info_t_with_data, tmp_path): + """Construct, compile, and execute the expression.""" + duckdb = pytest.importorskip("duckdb") + + con = duckdb.connect(str(tmp_path / "test.ddb")) + con.register("t", info_t_with_data) + sql = "SUMMARIZE t" + benchmark(lambda sql: con.sql(sql).arrow(), sql) From 2c7a4b0841a22759ef34ca485b2e900a41edf2d3 Mon Sep 17 00:00:00 2001 From: Phillip Cloud <417981+cpcloud@users.noreply.github.com> Date: Wed, 24 Jul 2024 12:19:34 -0400 Subject: [PATCH 2/7] perf(exprs): speed up `.info()` and `.describe()` --- 
ibis/backends/sql/compilers/base.py | 286 ++++++++++++++++++++++ ibis/backends/sql/compilers/clickhouse.py | 8 + ibis/backends/tests/test_generic.py | 68 +---- ibis/expr/operations/relations.py | 57 ++++- ibis/expr/types/relations.py | 127 ++-------- 5 files changed, 382 insertions(+), 164 deletions(-) diff --git a/ibis/backends/sql/compilers/base.py b/ibis/backends/sql/compilers/base.py index a9d0f71ae9a0..12cfb3adf6f4 100644 --- a/ibis/backends/sql/compilers/base.py +++ b/ibis/backends/sql/compilers/base.py @@ -6,6 +6,7 @@ import math import operator import string +from collections import deque from functools import partial, reduce from typing import TYPE_CHECKING, Any, ClassVar @@ -1607,6 +1608,291 @@ def visit_DropColumns(self, op, *, parent, columns_to_drop): ) return sg.select(*columns_to_keep).from_(parent) + def visit_GenericInfo(self, op, *, parent, **_): + quoted = self.quoted + schema = op.parent.schema + + table = sg.to_identifier(parent.alias_or_name, quoted=quoted) + + aggs = deque() + for colname, pos in schema._name_locs.items(): + typ = schema[colname] + + col = sge.column(colname, table=table, quoted=quoted).is_(None) + isna = self.cast(col, dt.int32) + + aggs.append( + sg.select( + sge.convert(colname).as_("name", quoted=quoted), + sge.convert(str(typ)).as_("type", quoted=quoted), + sge.convert(typ.nullable).as_("nullable", quoted=quoted), + self.agg.sum(isna).as_("nulls", quoted=quoted), + self.agg.sum(1 - isna).as_("non_nulls", quoted=quoted), + self.agg.avg(isna).as_("null_frac", quoted=quoted), + sge.convert(pos).as_("pos", quoted=quoted), + ).from_(parent) + ) + + # rebalance aggs, this speeds up sqlglot compilation of huge unions + # significantly + while len(aggs) > 1: + left = aggs.popleft() + right = aggs.popleft() + aggs.append(sg.union(left, right, distinct=False)) + + unions = aggs.popleft() + + assert not aggs, "not all unions processed" + + return unions.order_by(sg.column("pos", quoted=quoted).asc()) + + def visit_FastInfo(self, 
op, *, parent, **_): + names = [] + types = [] + nullables = [] + nullses = [] + non_nullses = [] + null_fracs = [] + poses = [] + quoted = self.quoted + schema = op.parent.schema + + table = sg.to_identifier(parent.alias_or_name, quoted=quoted) + + for colname, pos in schema._name_locs.items(): + typ = schema[colname] + + col = sge.column(colname, table=table, quoted=quoted).is_(None) + isna = self.cast(col, dt.int32) + + names.append(sge.convert(colname)) + types.append(sge.convert(str(typ))) + nullables.append(sge.convert(typ.nullable)) + nullses.append(self.agg.sum(isna)) + non_nullses.append(self.agg.sum(1 - isna)) + null_fracs.append(self.agg.avg(isna)) + poses.append(sge.convert(pos)) + + return ( + sg.select( + self.f.explode(self.f.array(*names)).as_("name", quoted=quoted), + self.f.explode(self.f.array(*types)).as_("type", quoted=quoted), + self.f.explode(self.f.array(*nullables)).as_("nullable", quoted=quoted), + self.f.explode(self.f.array(*nullses)).as_("nulls", quoted=quoted), + self.f.explode(self.f.array(*non_nullses)).as_( + "non_nulls", quoted=quoted + ), + self.f.explode(self.f.array(*null_fracs)).as_( + "null_frac", quoted=quoted + ), + self.f.explode(self.f.array(*poses)).as_("pos", quoted=quoted), + ) + .from_(parent) + .order_by(sg.column("pos", quoted=quoted).asc()) + ) + + def visit_GenericDescribe(self, op, *, parent, quantile, **_): + quantile = sorted(quantile) + schema = op.parent.schema + opschema = op.schema + quoted = self.quoted + + quantile_keys = tuple( + f"p{100 * q:.6f}".rstrip("0").rstrip(".") for q in quantile + ) + default_quantiles = dict.fromkeys(quantile_keys, NULL) + table = sg.to_identifier(parent.alias_or_name, quoted=quoted) + aggs = deque() + + for colname, pos in schema._name_locs.items(): + col = sge.column(colname, table=table, quoted=quoted) + typ = schema[colname] + + # statistics default to NULL + col_mean = col_std = col_min = col_max = col_mode = NULL + quantile_values = default_quantiles.copy() + + if 
typ.is_numeric(): + col_mean = self.agg.avg(col) + col_std = self.agg.stddev(col) + col_min = self.agg.min(col) + col_max = self.agg.max(col) + for key, q in zip(quantile_keys, quantile): + quantile_values[key] = sge.Quantile( + this=col, quantile=sge.convert(q) + ) + + elif typ.is_string(): + if ops.Mode not in self.UNSUPPORTED_OPS: + col_mode = self.agg.mode(col) + else: + col_mode = self.cast(NULL, opschema["mode"]) + elif typ.is_boolean(): + col_mean = self.agg.avg(self.cast(col, dt.int32)) + else: + # Will not calculate statistics for other types + continue + + aggs.append( + sg.select( + sge.convert(colname).as_("name", quoted=quoted), + sge.convert(pos).as_("pos", quoted=quoted), + sge.convert(str(typ)).as_("type", quoted=quoted), + self.agg.count(col).as_("count", quoted=quoted), + self.agg.sum(self.cast(col.is_(NULL), dt.int32)).as_( + "nulls", quoted=quoted + ), + self.agg.count(sge.Distinct(expressions=[col])).as_( + "unique", quoted=quoted + ), + col_mode.as_("mode", quoted=quoted), + self.cast(col_mean, opschema["mean"]).as_("mean", quoted=quoted), + self.cast(col_std, opschema["std"]).as_("std", quoted=quoted), + self.cast(col_min, opschema["min"]).as_("min", quoted=quoted), + *( + self.cast(val, opschema[q]).as_(q, quoted=quoted) + for q, val in quantile_values.items() + ), + self.cast(col_max, opschema["max"]).as_("max", quoted=quoted), + ).from_(parent) + ) + + # rebalance aggs, this speeds up sqlglot compilation of huge unions + # significantly + while len(aggs) > 1: + left = aggs.popleft() + right = aggs.popleft() + aggs.append(sg.union(left, right, distinct=False)) + + unions = aggs.popleft() + + assert not aggs, "not all unions processed" + + return unions + + def visit_FastDescribe(self, op, *, parent, quantile, **_): + quantile = sorted(quantile) + schema = op.parent.schema + quoted = self.quoted + + name_locs = schema._name_locs + parent_schema_names = name_locs.keys() + + names = list(map(sge.convert, parent_schema_names)) + poses = 
list(map(sge.convert, name_locs.values())) + types = list(map(sge.convert, map(str, schema.values()))) + counts = [] + nulls = [] + uniques = [] + modes = [] + means = [] + stds = [] + mins = [] + quantiles = {} + maxs = [] + + quantile_keys = tuple( + f"p{100 * q:.6f}".rstrip("0").rstrip(".") for q in quantile + ) + default_quantiles = dict.fromkeys(quantile_keys, NULL) + quantiles = {key: [] for key in quantile_keys} + table = sg.to_identifier(parent.alias_or_name, quoted=quoted) + opschema = op.schema + + for colname in parent_schema_names: + col = sge.column(colname, table=table, quoted=quoted) + typ = schema[colname] + + # statistics default to NULL + col_mean = col_std = col_min = col_max = col_mode = NULL + quantile_values = default_quantiles.copy() + + if typ.is_numeric(): + col_mean = self.agg.avg(col) + col_std = self.agg.stddev(col) + col_min = self.agg.min(col) + col_max = self.agg.max(col) + for key, q in zip(quantile_keys, quantile): + quantile_values[key] = sge.Quantile( + this=col, quantile=sge.convert(q) + ) + + elif typ.is_string(): + if ops.Mode not in self.UNSUPPORTED_OPS: + col_mode = self.agg.mode(col) + else: + col_mode = self.cast(NULL, opschema["mode"]) + elif typ.is_boolean(): + col_mean = self.agg.avg(self.cast(col, dt.int32)) + else: + # Will not calculate statistics for other types + continue + + counts.append(self.agg.count(col)) + nulls.append(self.agg.sum(self.cast(col.is_(NULL), dt.int32))) + uniques.append(self.agg.count(sge.Distinct(expressions=[col]))) + modes.append(col_mode) + means.append(col_mean) + stds.append(col_std) + mins.append(col_min) + + for q, val in quantile_values.items(): + quantiles[q].append(val) + + maxs.append(col_max) + + return sg.select( + self.f.explode( + self.cast(self.f.array(*names), dt.Array(opschema["name"])) + ).as_("name", quoted=quoted), + self.f.explode( + self.cast(self.f.array(*poses), dt.Array(opschema["pos"])) + ).as_("pos", quoted=quoted), + self.f.explode( + self.cast(self.f.array(*types), 
dt.Array(opschema["type"])) + ).as_("type", quoted=quoted), + self.f.explode( + self.cast(self.f.array(*counts), dt.Array(opschema["count"])) + ).as_("count", quoted=quoted), + self.f.explode( + self.cast(self.f.array(*nulls), dt.Array(opschema["nulls"])) + ).as_("nulls", quoted=quoted), + self.f.explode( + self.cast(self.f.array(*uniques), dt.Array(opschema["unique"])) + ).as_("unique", quoted=quoted), + self.f.explode( + self.cast(self.f.array(*modes), dt.Array(opschema["mode"])) + ).as_("mode", quoted=quoted), + self.f.explode( + self.cast(self.f.array(*means), dt.Array(opschema["mean"])) + ).as_("mean", quoted=quoted), + self.f.explode( + self.cast(self.f.array(*stds), dt.Array(opschema["std"])) + ).as_("std", quoted=quoted), + self.f.explode( + self.cast(self.f.array(*mins), dt.Array(opschema["min"])) + ).as_("min", quoted=quoted), + *( + self.f.explode( + self.cast(self.f.array(*vals), dt.Array(opschema[q])) + ).as_(q, quoted=quoted) + for q, vals in quantiles.items() + ), + self.f.explode( + self.cast(self.f.array(*maxs), dt.Array(opschema["max"])) + ).as_("max", quoted=quoted), + ).from_(parent) + + def visit_Info(self, op, *, parent): + if ops.Unnest in self.UNSUPPORTED_OPS: + return self.visit_GenericInfo(op, parent=parent) + return self.visit_FastInfo(op, parent=parent) + + def visit_Describe(self, op, *, parent, quantile): + if ops.Unnest in self.UNSUPPORTED_OPS: + return self.visit_GenericDescribe(op, parent=parent, quantile=quantile) + return self.visit_FastDescribe(op, parent=parent, quantile=quantile) + # `__init_subclass__` is uncalled for subclasses - we manually call it here to # autogenerate the base class implementations as well. 
diff --git a/ibis/backends/sql/compilers/clickhouse.py b/ibis/backends/sql/compilers/clickhouse.py index d1f3db494f2e..93cb701e7f17 100644 --- a/ibis/backends/sql/compilers/clickhouse.py +++ b/ibis/backends/sql/compilers/clickhouse.py @@ -816,5 +816,13 @@ def visit_MapContains(self, op, *, arg, key): sg.or_(arg.is_(NULL), key.is_(NULL)), NULL, self.f.mapContains(arg, key) ) + def visit_Info(self, op, *, parent): + # clickhouse explode is not pairwise, so fall back to generic impl + return self.visit_GenericInfo(op, parent=parent) + + def visit_Describe(self, op, *, parent, quantile): + # clickhouse explode is not pairwise, so fall back to generic impl + return self.visit_GenericDescribe(op, parent=parent, quantile=quantile) + compiler = ClickHouseCompiler() diff --git a/ibis/backends/tests/test_generic.py b/ibis/backends/tests/test_generic.py index bd61852109a9..efee0a3b40b7 100644 --- a/ibis/backends/tests/test_generic.py +++ b/ibis/backends/tests/test_generic.py @@ -795,31 +795,10 @@ def test_table_info_large(con): reason="Druid only supports trivial unions", ) @pytest.mark.parametrize( - ("selector", "expected_columns"), + "selector", [ param( - s.any_of( - s.of_type("numeric"), - s.of_type("string"), - s.of_type("bool"), - s.of_type("timestamp"), - ), - [ - "name", - "pos", - "type", - "count", - "nulls", - "unique", - "mode", - "mean", - "std", - "min", - "p25", - "p50", - "p75", - "max", - ], + s.any_of(*map(s.of_type, ["numeric", "string", "bool", "timestamp"])), marks=[ pytest.mark.notimpl( ["sqlite"], @@ -827,13 +806,7 @@ def test_table_info_large(con): reason="quantile is not supported", ), pytest.mark.notimpl( - [ - "clickhouse", - "pyspark", - "clickhouse", - "risingwave", - "impala", - ], + ["pyspark", "risingwave", "impala"], raises=com.OperationNotDefinedError, reason="mode is not supported", ), @@ -862,21 +835,6 @@ def test_table_info_large(con): ), param( s.of_type("numeric"), - [ - "name", - "pos", - "type", - "count", - "nulls", - "unique", - 
"mean", - "std", - "min", - "p25", - "p50", - "p75", - "max", - ], marks=[ pytest.mark.notimpl( ["sqlite"], @@ -898,24 +856,9 @@ def test_table_info_large(con): ), param( s.of_type("string"), - [ - "name", - "pos", - "type", - "count", - "nulls", - "unique", - "mode", - ], marks=[ pytest.mark.notimpl( - [ - "clickhouse", - "pyspark", - "clickhouse", - "risingwave", - "impala", - ], + ["pyspark", "risingwave", "impala"], raises=com.OperationNotDefinedError, reason="mode is not supported", ), @@ -934,12 +877,11 @@ def test_table_info_large(con): ), ], ) -def test_table_describe(alltypes, selector, expected_columns): +def test_table_describe(alltypes, selector): sometypes = alltypes.select(selector) expr = sometypes.describe() df = expr.execute() assert sorted(sometypes.columns) == sorted(df.name) - assert sorted(expr.columns) == sorted(expected_columns) assert sorted(expr.columns) == sorted(df.columns) diff --git a/ibis/expr/operations/relations.py b/ibis/expr/operations/relations.py index be65a80e35d0..f2bd6217fbcc 100644 --- a/ibis/expr/operations/relations.py +++ b/ibis/expr/operations/relations.py @@ -304,7 +304,7 @@ class Aggregate(Relation): parent: Relation groups: FrozenOrderedDict[str, Unaliased[Value]] - metrics: FrozenOrderedDict[str, Unaliased[Scalar]] + metrics: FrozenOrderedDict[str, Unaliased[Value]] def __init__(self, parent, groups, metrics): _check_integrity(groups.values(), {parent}) @@ -520,4 +520,59 @@ def schema(self): return Schema(base) +@public +class Info(Relation): + parent: Relation + + @attribute + def schema(self) -> Schema: + return Schema( + { + "name": dt.string, + "type": dt.string, + "nullable": dt.boolean, + "nulls": dt.int64, + "non_nulls": dt.int64, + "null_frac": dt.float64, + "pos": dt.int16, + } + ) + + @attribute + def values(self): + return {} + + +@public +class Describe(Relation): + parent: Relation + quantile: VarTuple[float] + + @attribute + def schema(self) -> Schema: + return Schema( + { + "name": dt.string, + "pos": 
dt.int16, + "type": dt.string, + "count": dt.int64, + "nulls": dt.int64, + "unique": dt.int64, + "mode": dt.string, + "mean": dt.float64, + "std": dt.float64, + "min": dt.float64, + **{ + f"p{100 * q:.6f}".rstrip("0").rstrip("."): dt.float64 + for q in sorted(self.quantile) + }, + "max": dt.float64, + } + ) + + @attribute + def values(self): + return {} + + # TODO(kszucs): support t.select(*t) syntax by implementing Table.__iter__() diff --git a/ibis/expr/types/relations.py b/ibis/expr/types/relations.py index ff992022858b..d95d52d9436a 100644 --- a/ibis/expr/types/relations.py +++ b/ibis/expr/types/relations.py @@ -2935,26 +2935,7 @@ def info(self) -> Table: │ year │ int64 │ True │ 0 │ 344 │ 0.000000 │ … │ └───────────────────┴─────────┴──────────┴───────┴───────────┴───────────┴───┘ """ - from ibis import literal as lit - - aggs = [] - - for pos, colname in enumerate(self.columns): - col = self[colname] - typ = col.type() - agg = self.select( - isna=ibis.case().when(col.isnull(), 1).else_(0).end() - ).agg( - name=lit(colname), - type=lit(str(typ)), - nullable=lit(typ.nullable), - nulls=lambda t: t.isna.sum(), - non_nulls=lambda t: (1 - t.isna).sum(), - null_frac=lambda t: t.isna.mean(), - pos=lit(pos, type=dt.int16), - ) - aggs.append(agg) - return ibis.union(*aggs).order_by(ibis.asc("pos")) + return ops.Info(self).to_expr() def describe( self, quantile: Sequence[ir.NumericValue | float] = (0.25, 0.5, 0.75) @@ -2992,91 +2973,37 @@ def describe( ├───────────────────┼───────┼─────────┼───────┼───────┼────────┼────────┼───┤ │ species │ 0 │ string │ 344 │ 0 │ 3 │ Adelie │ … │ │ island │ 1 │ string │ 344 │ 0 │ 3 │ Biscoe │ … │ - │ bill_length_mm │ 2 │ float64 │ 344 │ 2 │ 164 │ NULL │ … │ - │ bill_depth_mm │ 3 │ float64 │ 344 │ 2 │ 80 │ NULL │ … │ - │ flipper_length_mm │ 4 │ int64 │ 344 │ 2 │ 55 │ NULL │ … │ - │ body_mass_g │ 5 │ int64 │ 344 │ 2 │ 94 │ NULL │ … │ - │ sex │ 6 │ string │ 344 │ 11 │ 2 │ male │ … │ + │ bill_length_mm │ 2 │ float64 │ 342 │ 2 │ 164 │ NULL │ 
… │ + │ bill_depth_mm │ 3 │ float64 │ 342 │ 2 │ 80 │ NULL │ … │ + │ flipper_length_mm │ 4 │ int64 │ 342 │ 2 │ 55 │ NULL │ … │ + │ body_mass_g │ 5 │ int64 │ 342 │ 2 │ 94 │ NULL │ … │ + │ sex │ 6 │ string │ 333 │ 11 │ 2 │ male │ … │ │ year │ 7 │ int64 │ 344 │ 0 │ 3 │ NULL │ … │ └───────────────────┴───────┴─────────┴───────┴───────┴────────┴────────┴───┘ >>> p.select(s.of_type("numeric")).describe() - ┏━━━━━━━━━━━━━━━━━━━┳━━━━━━━┳━━━━━━━━━┳━━━━━━━┳━━━━━━━┳━━━━━━━━┳━━━┓ - ┃ name ┃ pos ┃ type ┃ count ┃ nulls ┃ unique ┃ … ┃ - ┡━━━━━━━━━━━━━━━━━━━╇━━━━━━━╇━━━━━━━━━╇━━━━━━━╇━━━━━━━╇━━━━━━━━╇━━━┩ - │ string │ int16 │ string │ int64 │ int64 │ int64 │ … │ - ├───────────────────┼───────┼─────────┼───────┼───────┼────────┼───┤ - │ flipper_length_mm │ 2 │ int64 │ 344 │ 2 │ 55 │ … │ - │ body_mass_g │ 3 │ int64 │ 344 │ 2 │ 94 │ … │ - │ year │ 4 │ int64 │ 344 │ 0 │ 3 │ … │ - │ bill_length_mm │ 0 │ float64 │ 344 │ 2 │ 164 │ … │ - │ bill_depth_mm │ 1 │ float64 │ 344 │ 2 │ 80 │ … │ - └───────────────────┴───────┴─────────┴───────┴───────┴────────┴───┘ + ┏━━━━━━━━━━━━━━━━━━━┳━━━━━━━┳━━━━━━━━━┳━━━━━━━┳━━━━━━━┳━━━━━━━━┳━━━━━━━━┳━━━┓ + ┃ name ┃ pos ┃ type ┃ count ┃ nulls ┃ unique ┃ mode ┃ … ┃ + ┡━━━━━━━━━━━━━━━━━━━╇━━━━━━━╇━━━━━━━━━╇━━━━━━━╇━━━━━━━╇━━━━━━━━╇━━━━━━━━╇━━━┩ + │ string │ int16 │ string │ int64 │ int64 │ int64 │ string │ … │ + ├───────────────────┼───────┼─────────┼───────┼───────┼────────┼────────┼───┤ + │ bill_length_mm │ 0 │ float64 │ 342 │ 2 │ 164 │ NULL │ … │ + │ bill_depth_mm │ 1 │ float64 │ 342 │ 2 │ 80 │ NULL │ … │ + │ flipper_length_mm │ 2 │ int64 │ 342 │ 2 │ 55 │ NULL │ … │ + │ body_mass_g │ 3 │ int64 │ 342 │ 2 │ 94 │ NULL │ … │ + │ year │ 4 │ int64 │ 344 │ 0 │ 3 │ NULL │ … │ + └───────────────────┴───────┴─────────┴───────┴───────┴────────┴────────┴───┘ >>> p.select(s.of_type("string")).describe() - ┏━━━━━━━━━┳━━━━━━━┳━━━━━━━━┳━━━━━━━┳━━━━━━━┳━━━━━━━━┳━━━━━━━━┓ - ┃ name ┃ pos ┃ type ┃ count ┃ nulls ┃ unique ┃ mode ┃ - 
┡━━━━━━━━━╇━━━━━━━╇━━━━━━━━╇━━━━━━━╇━━━━━━━╇━━━━━━━━╇━━━━━━━━┩ - │ string │ int16 │ string │ int64 │ int64 │ int64 │ string │ - ├─────────┼───────┼────────┼───────┼───────┼────────┼────────┤ - │ sex │ 2 │ string │ 344 │ 11 │ 2 │ male │ - │ species │ 0 │ string │ 344 │ 0 │ 3 │ Adelie │ - │ island │ 1 │ string │ 344 │ 0 │ 3 │ Biscoe │ - └─────────┴───────┴────────┴───────┴───────┴────────┴────────┘ + ┏━━━━━━━━━┳━━━━━━━┳━━━━━━━━┳━━━━━━━┳━━━━━━━┳━━━━━━━━┳━━━━━━━━┳━━━━━━━━━┳━━━┓ + ┃ name ┃ pos ┃ type ┃ count ┃ nulls ┃ unique ┃ mode ┃ mean ┃ … ┃ + ┡━━━━━━━━━╇━━━━━━━╇━━━━━━━━╇━━━━━━━╇━━━━━━━╇━━━━━━━━╇━━━━━━━━╇━━━━━━━━━╇━━━┩ + │ string │ int16 │ string │ int64 │ int64 │ int64 │ string │ float64 │ … │ + ├─────────┼───────┼────────┼───────┼───────┼────────┼────────┼─────────┼───┤ + │ species │ 0 │ string │ 344 │ 0 │ 3 │ Adelie │ NULL │ … │ + │ island │ 1 │ string │ 344 │ 0 │ 3 │ Biscoe │ NULL │ … │ + │ sex │ 2 │ string │ 333 │ 11 │ 2 │ male │ NULL │ … │ + └─────────┴───────┴────────┴───────┴───────┴────────┴────────┴─────────┴───┘ """ - from ibis import literal as lit - - quantile = sorted(quantile) - aggs = [] - for pos, colname in enumerate(self.columns): - col = self[colname] - typ = col.type() - - # default statistics to None - col_mean = lit(None).cast(float) - col_std = lit(None).cast(float) - col_min = lit(None).cast(float) - col_max = lit(None).cast(float) - col_mode = lit(None).cast(str) - quantile_values = { - f"p{100*q:.6f}".rstrip("0").rstrip("."): lit(None).cast(float) - for q in quantile - } - - if typ.is_numeric(): - col_mean = col.mean() - col_std = col.std() - col_min = col.min().cast(float) - col_max = col.max().cast(float) - quantile_values = { - f"p{100*q:.6f}".rstrip("0").rstrip("."): col.quantile(q).cast(float) - for q in quantile - } - elif typ.is_string(): - col_mode = col.mode() - elif typ.is_boolean(): - col_mean = col.mean() - else: - # Will not calculate statistics for other types - continue - - agg = self.agg( - name=lit(colname), - pos=lit(pos, 
type=dt.int16), - type=lit(str(typ)), - count=col.isnull().count(), - nulls=col.isnull().sum(), - unique=col.nunique(), - mode=col_mode, - mean=col_mean, - std=col_std, - min=col_min, - **quantile_values, - max=col_max, - ) - aggs.append(agg) - - t = ibis.union(*aggs) - - return t + return ops.Describe(self, quantile=quantile).to_expr() def join( left: Table, From 5398196a63b8ea6398ae5583284261e739afc8c5 Mon Sep 17 00:00:00 2001 From: Phillip Cloud <417981+cpcloud@users.noreply.github.com> Date: Thu, 25 Jul 2024 09:40:16 -0400 Subject: [PATCH 3/7] feat(clickhouse): implement mode as `topK(1)(col)` --- ibis/backends/sql/compilers/clickhouse.py | 17 +++++++++++++++-- ibis/backends/tests/test_aggregation.py | 2 -- 2 files changed, 15 insertions(+), 4 deletions(-) diff --git a/ibis/backends/sql/compilers/clickhouse.py b/ibis/backends/sql/compilers/clickhouse.py index 93cb701e7f17..43bdcc0bbcba 100644 --- a/ibis/backends/sql/compilers/clickhouse.py +++ b/ibis/backends/sql/compilers/clickhouse.py @@ -19,7 +19,7 @@ from collections.abc import Iterator, Mapping -class ClickhouseAggGen(AggGen): +class ClickHouseAggGen(AggGen): def aggregate(self, compiler, name, *args, where=None, order_by=()): if order_by: raise com.UnsupportedOperationError( @@ -33,6 +33,18 @@ def aggregate(self, compiler, name, *args, where=None, order_by=()): args += (where,) return compiler.f[name](*args, dialect=compiler.dialect) + def mode(self, arg, where=None): + func = "topK" + + params = [arg] + if where is not None: + func += "If" + params.append(where) + + return sge.ParameterizedAgg( + this=func, expressions=[sge.convert(1)], params=params + )[1] + class ClickHouseCompiler(SQLGlotCompiler): __slots__ = () @@ -40,7 +52,7 @@ class ClickHouseCompiler(SQLGlotCompiler): dialect = ClickHouse type_mapper = ClickHouseType - agg = ClickhouseAggGen() + agg = ClickHouseAggGen() supports_qualify = True @@ -104,6 +116,7 @@ class ClickHouseCompiler(SQLGlotCompiler): ops.MapMerge: "mapUpdate", ops.MapValues: 
"mapValues", ops.Median: "quantileExactExclusive", + ops.Mode: "mode", ops.NotNull: "isNotNull", ops.NullIf: "nullIf", ops.RStrip: "trimRight", diff --git a/ibis/backends/tests/test_aggregation.py b/ibis/backends/tests/test_aggregation.py index b0515729598c..d5fa630d2473 100644 --- a/ibis/backends/tests/test_aggregation.py +++ b/ibis/backends/tests/test_aggregation.py @@ -85,7 +85,6 @@ def mean_udf(s): pytest.mark.notyet( [ "bigquery", - "clickhouse", "datafusion", "impala", "mysql", @@ -393,7 +392,6 @@ def mean_and_std(v): pytest.mark.notyet( [ "bigquery", - "clickhouse", "datafusion", "impala", "mysql", From 89c210c970976b0c33d513d7ce4b143b90efb45b Mon Sep 17 00:00:00 2001 From: Phillip Cloud <417981+cpcloud@users.noreply.github.com> Date: Thu, 25 Jul 2024 09:57:26 -0400 Subject: [PATCH 4/7] refactor(postgres): clean up mode compilation --- ibis/backends/sql/compilers/postgres.py | 30 ++++++++++++++++--------- 1 file changed, 19 insertions(+), 11 deletions(-) diff --git a/ibis/backends/sql/compilers/postgres.py b/ibis/backends/sql/compilers/postgres.py index 6bdd3f0fa388..9b73916b74a8 100644 --- a/ibis/backends/sql/compilers/postgres.py +++ b/ibis/backends/sql/compilers/postgres.py @@ -36,13 +36,30 @@ class PostgresUDFNode(ops.Value): shape = rlz.shape_like("args") +class PostgresAggGen(AggGen): + def aggregate(self, compiler, name, *args, where=None): + if (func := getattr(self, name, None)) is not None: + return func(*args, where=where) + return super().aggregate(compiler, name, *args, where=where) + + def mode(self, arg, where=None): + func = sg.func("mode") + expr = sge.WithinGroup( + this=func, + expression=sge.Order(expressions=[sge.Ordered(this=arg)]), + ) + if where is not None: + return sge.Filter(this=expr, expression=sge.Where(this=where)) + return expr + + class PostgresCompiler(SQLGlotCompiler): __slots__ = () dialect = Postgres type_mapper = PostgresType - agg = AggGen(supports_filter=True, supports_order_by=True) + agg = 
PostgresAggGen(supports_filter=True, supports_order_by=True) NAN = sge.Literal.number("'NaN'::double precision") POS_INF = sge.Literal.number("'Inf'::double precision") @@ -108,6 +125,7 @@ class PostgresCompiler(SQLGlotCompiler): ops.MapContains: "exist", ops.MapKeys: "akeys", ops.MapValues: "avals", + ops.Mode: "mode", ops.RegexSearch: "regexp_like", ops.TimeFromHMS: "make_time", } @@ -173,16 +191,6 @@ def _compile_python_udf(self, udf_node: ops.ScalarUDF): def visit_RandomUUID(self, op, **kwargs): return self.f.gen_random_uuid() - def visit_Mode(self, op, *, arg, where): - expr = self.f.mode() - expr = sge.WithinGroup( - this=expr, - expression=sge.Order(expressions=[sge.Ordered(this=arg)]), - ) - if where is not None: - expr = sge.Filter(this=expr, expression=sge.Where(this=where)) - return expr - def visit_ArgMinMax(self, op, *, arg, key, where, desc: bool): conditions = [arg.is_(sg.not_(NULL)), key.is_(sg.not_(NULL))] From e81eca50a7a0050008d6f481787b7485a0cac22d Mon Sep 17 00:00:00 2001 From: Phillip Cloud <417981+cpcloud@users.noreply.github.com> Date: Thu, 25 Jul 2024 10:16:03 -0400 Subject: [PATCH 5/7] chore(sqlite): disable Unnest --- ibis/backends/sql/compilers/sqlite.py | 1 + 1 file changed, 1 insertion(+) diff --git a/ibis/backends/sql/compilers/sqlite.py b/ibis/backends/sql/compilers/sqlite.py index 0d4db351c3b1..62ce583062da 100644 --- a/ibis/backends/sql/compilers/sqlite.py +++ b/ibis/backends/sql/compilers/sqlite.py @@ -64,6 +64,7 @@ class SQLiteCompiler(SQLGlotCompiler): ops.DateDelta, ops.TimestampDelta, ops.TryCast, + ops.Unnest, ) SIMPLE_OPS = { From 6a8fb76c8f71fc5929b10d3d949dfdfbc652b410 Mon Sep 17 00:00:00 2001 From: Phillip Cloud <417981+cpcloud@users.noreply.github.com> Date: Thu, 25 Jul 2024 10:16:24 -0400 Subject: [PATCH 6/7] chore(trino): use generic summary ops --- ibis/backends/sql/compilers/sqlite.py | 2 +- ibis/backends/sql/compilers/trino.py | 8 ++++++++ ibis/backends/sql/dialects.py | 17 ++++++++++++++++- 3 files changed, 25 
insertions(+), 2 deletions(-) diff --git a/ibis/backends/sql/compilers/sqlite.py b/ibis/backends/sql/compilers/sqlite.py index 62ce583062da..4ad1debdcda8 100644 --- a/ibis/backends/sql/compilers/sqlite.py +++ b/ibis/backends/sql/compilers/sqlite.py @@ -450,7 +450,7 @@ def visit_DayOfWeekName(self, op, *, arg): ) def visit_Xor(self, op, *, left, right): - return (left.or_(right)).and_(sg.not_(left.and_(right))) + return left.or_(right).and_(sg.not_(left.and_(right))) def visit_NonNullLiteral(self, op, *, value, dtype): if dtype.is_binary(): diff --git a/ibis/backends/sql/compilers/trino.py b/ibis/backends/sql/compilers/trino.py index 4a19b9a37436..5836c8b5ee1b 100644 --- a/ibis/backends/sql/compilers/trino.py +++ b/ibis/backends/sql/compilers/trino.py @@ -659,5 +659,13 @@ def visit_ArraySum(self, op, *, arg): def visit_ArrayMean(self, op, *, arg): return self.visit_ArraySumAgg(op, arg=arg, output=operator.truediv) + def visit_Info(self, op, *, parent): + # unnest cannot contain aggregates + return self.visit_GenericInfo(op, parent=parent) + + def visit_Describe(self, op, *, parent, quantile): + # unnest cannot contain aggregates + return self.visit_GenericDescribe(op, parent=parent, quantile=quantile) + compiler = TrinoCompiler() diff --git a/ibis/backends/sql/dialects.py b/ibis/backends/sql/dialects.py index 217fdf34e1e2..c064a88fac77 100644 --- a/ibis/backends/sql/dialects.py +++ b/ibis/backends/sql/dialects.py @@ -20,7 +20,7 @@ Trino, ) from sqlglot.dialects.dialect import rename_func -from sqlglot.helper import find_new_name, seq_get +from sqlglot.helper import find_new_name, flatten, seq_get ClickHouse.Generator.TRANSFORMS |= { sge.ArraySize: rename_func("length"), @@ -440,7 +440,22 @@ class Generator(Postgres.Generator): sge.Levenshtein: rename_func("editdistance"), } + +# return lambda self, expression: self.func(name, *flatten(expression.args.values())) SQLite.Generator.TYPE_MAPPING |= {sge.DataType.Type.BOOLEAN: "BOOLEAN"} +SQLite.Generator.TRANSFORMS |= { 
+ sge.Stddev: lambda self, e: self.func( + "sqrt", self.func("_ibis_var_sample", *flatten(e.args.values())) + ), + sge.StddevSamp: lambda self, e: self.func( + "sqrt", self.func("_ibis_var_sample", *flatten(e.args.values())) + ), + sge.StddevPop: lambda self, e: self.func( + "sqrt", self.func("_ibis_var_pop", *flatten(e.args.values())) + ), + sge.Variance: rename_func("_ibis_var_sample"), + sge.VariancePop: rename_func("_ibis_var_pop"), +} # TODO(cpcloud): remove this hack once From d4f84f7f1009036fb34a1d559d76d10faf88e563 Mon Sep 17 00:00:00 2001 From: Phillip Cloud <417981+cpcloud@users.noreply.github.com> Date: Tue, 30 Jul 2024 13:01:49 -0400 Subject: [PATCH 7/7] chore: benchmark grouping --- ibis/tests/benchmarks/test_benchmarks.py | 38 +++++++++++++++++------- 1 file changed, 28 insertions(+), 10 deletions(-) diff --git a/ibis/tests/benchmarks/test_benchmarks.py b/ibis/tests/benchmarks/test_benchmarks.py index 88a2babcfb8d..0a14497bc483 100644 --- a/ibis/tests/benchmarks/test_benchmarks.py +++ b/ibis/tests/benchmarks/test_benchmarks.py @@ -1003,7 +1003,7 @@ def info_t(): @pytest.fixture(scope="module") -def info_t_with_data(): +def info_t_raw(): np = pytest.importorskip("numpy") pa = pytest.importorskip("pyarrow") @@ -1013,38 +1013,56 @@ def info_t_with_data(): np.random.randn(num_rows, num_cols).T, names=list(map("col_{}".format, range(num_cols))), ) - return ibis.memtable(data) + return data -@pytest.mark.parametrize("method", [ir.Table.describe, ir.Table.info]) +@pytest.fixture(scope="module") +def info_t_with_data(info_t_raw): + return ibis.memtable(info_t_raw) + + +@pytest.mark.parametrize( + "method", [ir.Table.describe, ir.Table.info], ids=["describe", "info"] +) +@pytest.mark.benchmark(group="summarization") def test_summarize_construct(benchmark, info_t, method): """Construct the expression.""" benchmark(method, info_t) -@pytest.mark.parametrize("method", [ir.Table.describe, ir.Table.info]) +@pytest.mark.parametrize( + "method", [ir.Table.describe, 
ir.Table.info], ids=["describe", "info"] +) +@pytest.mark.benchmark(group="summarization") def test_summarize_compile(benchmark, info_t, method): """Compile the expression.""" benchmark(ibis.to_sql, method(info_t), dialect="duckdb") -@pytest.mark.parametrize("method", [ir.Table.describe, ir.Table.info]) +@pytest.mark.parametrize( + "method", [ir.Table.describe, ir.Table.info], ids=["describe", "info"] +) +@pytest.mark.benchmark(group="summarization") def test_summarize_execute(benchmark, info_t_with_data, method, con): """Compile and execute the expression.""" - benchmark(con.execute, method(info_t_with_data)) + benchmark(con.to_pyarrow, method(info_t_with_data)) -@pytest.mark.parametrize("method", [ir.Table.describe, ir.Table.info]) +@pytest.mark.parametrize( + "method", [ir.Table.describe, ir.Table.info], ids=["describe", "info"] +) +@pytest.mark.benchmark(group="summarization") def test_summarize_end_to_end(benchmark, info_t_with_data, method, con): """Construct, compile, and execute the expression.""" - benchmark(lambda table: con.execute(method(table)), info_t_with_data) + benchmark(lambda table: con.to_pyarrow(method(table)), info_t_with_data) -def test_summarize_duckdb(benchmark, info_t_with_data, tmp_path): +@pytest.mark.benchmark(group="summarization") +def test_summarize_duckdb(benchmark, info_t_raw, tmp_path): """Construct, compile, and execute the expression.""" duckdb = pytest.importorskip("duckdb") con = duckdb.connect(str(tmp_path / "test.ddb")) - con.register("t", info_t_with_data) + con.register("t", info_t_raw) sql = "SUMMARIZE t" benchmark(lambda sql: con.sql(sql).arrow(), sql)