Commit

Merge branch 'main' into wkx/risingwave-streaming-features
KeXiangWang authored Mar 29, 2024
2 parents 7c46775 + bb736fe commit 1421a4e
Showing 31 changed files with 277 additions and 147 deletions.
2 changes: 1 addition & 1 deletion compose.yaml
@@ -94,7 +94,7 @@ services:
- trino

minio:
image: bitnami/minio:2024.3.21
image: bitnami/minio:2024.3.26
environment:
MINIO_ROOT_USER: accesskey
MINIO_ROOT_PASSWORD: secretkey

Large diffs are not rendered by default.

18 changes: 18 additions & 0 deletions docs/contribute/02_workflow.qmd
@@ -87,6 +87,24 @@ export PGPASSWORD=postgres
psql -t -A -h localhost -U postgres -d ibis_testing -c "select 'success'"
```

#### Adding appropriate tests

If your pull request involves a new feature, add tests that cover both the ordinary
and the edge cases.

Pytest markers can be used to assert that a test is expected to fail or to raise a specific error.
We use a number of pytest markers in ibis:

- `pytest.mark.notimpl`: the backend is capable of doing a thing, but we haven't mapped the operation yet
- `pytest.mark.notyet`: the backend cannot do a thing, but might be able to in the future
- `pytest.mark.never`: the backend will never support this feature or pass this test (a common
  example is a test running on sqlite that relies on strong typing)
- `pytest.mark.broken`: the test broke, the breakage is demonstrably unrelated to the PR being
  worked on, and fixing it shouldn't block that PR from going in (but we should fix it up pronto)

Refrain from using a generic marker like `pytest.mark.xfail`.
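For illustration, a minimal sketch of how these markers are typically applied (the backend
names, the exception, and the reasons below are hypothetical):

```python
import pytest

import ibis
from ibis.backends.tests.errors import Py4JJavaError


@pytest.mark.notimpl(["dask"], reason="op exists upstream but isn't mapped yet")
@pytest.mark.notyet(
    ["flink"],
    raises=Py4JJavaError,
    reason="the engine currently rejects this construct",
)
@pytest.mark.never(["sqlite"], reason="relies on strong typing")
def test_some_feature(con):
    expr = ibis.literal(1) + 1
    assert con.execute(expr) == 2
```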


### Writing the commit

Ibis follows the [Conventional Commits](https://www.conventionalcommits.org/) structure.
Expand Down
@@ -94,7 +94,7 @@ file](https://repo.maven.apache.org/maven2/org/apache/flink/flink-sql-connector-
and manually add it into the classpath:

```{python}
# | include: false
#| output: false
!wget -N https://repo.maven.apache.org/maven2/org/apache/flink/flink-sql-connector-kafka/3.0.2-1.18/flink-sql-connector-kafka-3.0.2-1.18.jar
connection.raw_sql("ADD JAR './flink-sql-connector-kafka-3.0.2-1.18.jar'")
```
6 changes: 3 additions & 3 deletions ibis/backends/clickhouse/compiler.py
@@ -225,9 +225,9 @@ def visit_StringFind(self, op, *, arg, substr, start, end):
)

if start is not None:
return self.f.locate(arg, substr, start)
return self.f.position(arg, substr, start)

return self.f.locate(arg, substr)
return self.f.position(arg, substr)

def visit_RegexSearch(self, op, *, arg, pattern):
return sge.RegexpLike(this=arg, expression=pattern)
@@ -478,7 +478,7 @@ def visit_Repeat(self, op, *, arg, times):
return self.f.repeat(arg, self.f.accurateCast(times, "UInt64"))

def visit_StringContains(self, op, haystack, needle):
return self.f.locate(haystack, needle) > 0
return self.f.position(haystack, needle) > 0

def visit_DayOfWeekIndex(self, op, *, arg):
weekdays = len(calendar.day_name)
@@ -1,3 +1,3 @@
SELECT
locate("t0"."string_col", 'a') - 1 AS "StringFind(string_col, 'a')"
position("t0"."string_col", 'a') - 1 AS "StringFind(string_col, 'a')"
FROM "functional_alltypes" AS "t0"
@@ -1,3 +1,3 @@
SELECT
locate("t0"."string_col", "t0"."string_col") - 1 AS "StringFind(string_col, string_col)"
position("t0"."string_col", "t0"."string_col") - 1 AS "StringFind(string_col, string_col)"
FROM "functional_alltypes" AS "t0"
15 changes: 8 additions & 7 deletions ibis/backends/dask/__init__.py
@@ -10,8 +10,6 @@

# import the pandas execution module to register dispatched implementations of
# execute_node that the dask backend will later override
import ibis.expr.operations as ops
import ibis.expr.schema as sch
import ibis.expr.types as ir
from ibis import util
from ibis.backends import NoUrl
@@ -167,11 +165,14 @@ def read_parquet(
self.dictionary[table_name] = df
return self.table(table_name)

def table(self, name: str, schema: sch.Schema | None = None):
df = self.dictionary[name]
schema = schema or self.schemas.get(name, None)
schema = PandasData.infer_table(df.head(1), schema=schema)
return ops.DatabaseTable(name, schema, self).to_expr()
def get_schema(self, table_name, *, database=None):
try:
schema = self.schemas[table_name]
except KeyError:
df = self.dictionary[table_name]
self.schemas[table_name] = schema = PandasData.infer_table(df.head(1))

return schema

def _convert_object(self, obj) -> dd.DataFrame:
if isinstance(obj, dd.DataFrame):
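A usage sketch of the now lazily cached schema inference on the dask backend (the table
name and data are illustrative):

```python
import dask.dataframe as dd
import pandas as pd

import ibis

df = dd.from_pandas(pd.DataFrame({"a": [1, 2, 3]}), npartitions=1)
con = ibis.dask.connect({"t": df})

# the first call infers the schema from the first row and stores it in
# `con.schemas`; subsequent calls return the cached schema
print(con.get_schema("t"))
print(con.get_schema("t"))
```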
2 changes: 1 addition & 1 deletion ibis/backends/druid/__init__.py
@@ -7,7 +7,7 @@
from typing import TYPE_CHECKING, Any
from urllib.parse import parse_qs, urlparse

import pydruid
import pydruid.db
import sqlglot as sg

import ibis.common.exceptions as com
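A note on this one-line fix: importing a package does not automatically import its
submodules, so `pydruid.db` must be imported explicitly before it is accessed. A minimal
illustration:

```python
import pydruid

# accessing pydruid.db at this point may raise AttributeError: importing the
# `pydruid` package does not necessarily import the `db` submodule

import pydruid.db  # explicitly binds the submodule to the package

print(pydruid.db)  # now reliably accessible
```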
3 changes: 3 additions & 0 deletions ibis/backends/flink/compiler.py
@@ -566,3 +566,6 @@ def visit_MapMerge(self, op: ops.MapMerge, *, left, right):
values = self.f.array_concat(left_values, right_values)

return self.cast(self.f.map_from_arrays(keys, values), op.dtype)

def visit_StructColumn(self, op, *, names, values):
return self.cast(sge.Struct(expressions=list(values)), op.dtype)
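With `visit_StructColumn` in place, struct columns compile on Flink as a `CAST` over a
`ROW` constructor. A sketch (the table is illustrative and the exact SQL spelling may
vary):

```python
import ibis

t = ibis.table({"string_col": "string", "bigint_col": "int64"}, name="t")
expr = t.select(s=ibis.struct(dict(a=t.string_col, b=1, c=t.bigint_col)))

# expected shape: SELECT CAST(ROW(...) AS ROW(...)) AS `s` FROM `t` ...
print(ibis.to_sql(expr, dialect="flink"))
```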
48 changes: 48 additions & 0 deletions ibis/backends/flink/tests/test_memtable.py
@@ -0,0 +1,48 @@
from __future__ import annotations

import pytest
from pyflink.common.types import Row

import ibis
from ibis.backends.tests.errors import Py4JJavaError


@pytest.mark.parametrize(
"data,schema,expected",
[
pytest.param(
{"value": [{"a": 1}, {"a": 2}]},
{"value": "!struct<a: !int>"},
[Row(Row([1])), Row(Row([2]))],
id="simple_named_struct",
),
pytest.param(
{"value": [[{"a": 1}, {"a": 2}], [{"a": 3}, {"a": 4}]]},
{"value": "!array<!struct<a: !int>>"},
[Row([Row([1]), Row([2])]), Row([Row([3]), Row([4])])],
id="single_field_named_struct_array",
),
pytest.param(
{"value": [[{"a": 1, "b": 2}, {"a": 2, "b": 2}]]},
{"value": "!array<!struct<a: !int, b: !int>>"},
[Row([Row([1, 2]), Row([2, 2])])],
id="named_struct_array",
),
],
)
def test_create_memtable(con, data, schema, expected):
t = ibis.memtable(data, schema=ibis.schema(schema))
# cannot use con.execute(t) directly because of some behavioral discrepancy between
# `TableEnvironment.execute_sql()` and `TableEnvironment.sql_query()`
result = con.raw_sql(con.compile(t))
# raw_sql() returns a `TableResult` object and doesn't natively convert to pandas
assert list(result.collect()) == expected


@pytest.mark.notyet(
["flink"],
raises=Py4JJavaError,
reason="cannot create an ARRAY of named STRUCTs directly from the ARRAY[] constructor; https://issues.apache.org/jira/browse/FLINK-34898",
)
def test_create_named_struct_array_with_array_constructor(con):
con.raw_sql("SELECT ARRAY[cast(ROW(1) as ROW<a INT>)];")
12 changes: 5 additions & 7 deletions ibis/backends/pandas/__init__.py
@@ -165,18 +165,16 @@ def list_tables(self, like=None, database=None):
return self._filter_with_like(list(self.dictionary.keys()), like)

def table(self, name: str, schema: sch.Schema | None = None):
df = self.dictionary[name]
schema = schema or self.schemas.get(name, None)
schema = PandasData.infer_table(df, schema=schema)
return ops.DatabaseTable(name, schema, self).to_expr()
inferred_schema = self.get_schema(name)
overridden_schema = {**inferred_schema, **(schema or {})}
return ops.DatabaseTable(name, overridden_schema, self).to_expr()

def get_schema(self, table_name, *, database=None):
schemas = self.schemas
try:
schema = schemas[table_name]
schema = self.schemas[table_name]
except KeyError:
df = self.dictionary[table_name]
schemas[table_name] = schema = PandasData.infer_table(df)
self.schemas[table_name] = schema = PandasData.infer_table(df)

return schema

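A sketch of the revised pandas `table()` semantics: user-supplied schema entries now
override the inferred ones on a per-column basis (the names and types below are
illustrative):

```python
import pandas as pd

import ibis

con = ibis.pandas.connect({"t": pd.DataFrame({"a": [1, 2], "b": ["x", "y"]})})

# the schema is inferred once and cached by get_schema()
print(con.get_schema("t"))

# override the inferred type of "a" only; "b" keeps its inferred type
t = con.table("t", schema=ibis.schema({"a": "float64"}))
print(t.schema())
```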
60 changes: 56 additions & 4 deletions ibis/backends/sql/dialects.py
@@ -2,6 +2,7 @@

import contextlib
import math
from copy import deepcopy

import sqlglot.expressions as sge
from sqlglot import transforms
@@ -18,6 +19,7 @@
Trino,
)
from sqlglot.dialects.dialect import rename_func
from sqlglot.helper import seq_get

ClickHouse.Generator.TRANSFORMS |= {
sge.ArraySize: rename_func("length"),
@@ -113,6 +115,7 @@ class Flink(Hive):
class Generator(Hive.Generator):
TYPE_MAPPING = Hive.Generator.TYPE_MAPPING.copy() | {
sge.DataType.Type.TIME: "TIME",
sge.DataType.Type.STRUCT: "ROW",
}

TRANSFORMS = Hive.Generator.TRANSFORMS.copy() | {
@@ -121,10 +124,6 @@ class Generator(Hive.Generator):
sge.StddevSamp: rename_func("stddev_samp"),
sge.Variance: rename_func("var_samp"),
sge.VariancePop: rename_func("var_pop"),
sge.Array: (
lambda self,
e: f"ARRAY[{', '.join(arg.sql(self.dialect) for arg in e.expressions)}]"
),
sge.ArrayConcat: rename_func("array_concat"),
sge.Length: rename_func("char_length"),
sge.TryCast: lambda self,
@@ -135,6 +134,59 @@ class Generator(Hive.Generator):
sge.Interval: _interval_with_precision,
}

def struct_sql(self, expression: sge.Struct) -> str:
from sqlglot.optimizer.annotate_types import annotate_types

expression = annotate_types(expression)

values = []
schema = []

for e in expression.expressions:
if isinstance(e, sge.PropertyEQ):
e = sge.alias_(e.expression, e.this)
# named structs
if isinstance(e, sge.Alias):
if e.type and e.type.is_type(sge.DataType.Type.UNKNOWN):
self.unsupported(
"Cannot convert untyped key-value definitions (try annotate_types)."
)
else:
schema.append(f"{self.sql(e, 'alias')} {self.sql(e.type)}")
values.append(self.sql(e, "this"))
else:
values.append(self.sql(e))

if not (size := len(expression.expressions)) or len(schema) != size:
return self.func("ROW", *values)
return f"CAST(ROW({', '.join(values)}) AS ROW({', '.join(schema)}))"

def array_sql(self, expression: sge.Array) -> str:
# workaround for the time being because you cannot construct an array of named
# STRUCTs directly from the ARRAY[] constructor
# https://issues.apache.org/jira/browse/FLINK-34898
from sqlglot.optimizer.annotate_types import annotate_types

expression = annotate_types(expression)
first_arg = seq_get(expression.expressions, 0)
# it's an array of structs
if isinstance(first_arg, sge.Struct):
# get rid of aliasing because we want to compile this as CAST instead
args = deepcopy(expression.expressions)
for arg in args:
for e in arg.expressions:
arg.set("expressions", [e.unalias() for e in arg.expressions])

format_values = ", ".join([self.sql(arg) for arg in args])
# all elements of the array should have the same type
format_dtypes = self.sql(first_arg.type)

return f"CAST(ARRAY[{format_values}] AS ARRAY<{format_dtypes}>)"

return (
f"ARRAY[{', '.join(self.sql(arg) for arg in expression.expressions)}]"
)

class Tokenizer(Hive.Tokenizer):
# In Flink, embedded single quotes are escaped like most other SQL
# dialects: doubling up the single quote
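A sketch of these hooks at the sqlglot level (this assumes `sge.PropertyEQ` carries the
field name, as handled in `struct_sql` above; the printed SQL is approximate):

```python
import sqlglot.expressions as sge

from ibis.backends.sql.dialects import Flink

# a named single-field struct, i.e. ROW(1) with field name "a"
struct = sge.Struct(
    expressions=[sge.PropertyEQ(this=sge.to_identifier("a"), expression=sge.convert(1))]
)
print(struct.sql(dialect=Flink))  # roughly: CAST(ROW(1) AS ROW(a INT))

# an array of named structs goes through array_sql and is cast as a whole
arr = sge.Array(expressions=[struct.copy()])
print(arr.sql(dialect=Flink))  # roughly: CAST(ARRAY[ROW(1)] AS ARRAY<ROW(a INT)>)
```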
7 changes: 5 additions & 2 deletions ibis/backends/tests/test_map.py
@@ -240,7 +240,11 @@ def test_literal_map_getitem_broadcast(backend, alltypes, df):
),
pytest.mark.notyet(["pandas", "dask"]),
mark_notyet_postgres,
pytest.mark.notimpl("flink"),
pytest.mark.notyet(
["flink"],
raises=Py4JJavaError,
reason="does not support selecting struct key from map",
),
mark_notyet_snowflake,
],
id="struct",
@@ -304,7 +308,6 @@ def test_literal_map_getitem_broadcast(backend, alltypes, df):
marks=[
pytest.mark.notyet("clickhouse", reason="nested types can't be null"),
mark_notyet_postgres,
pytest.mark.notimpl("flink", reason="can't construct structs"),
],
id="struct",
),
1 change: 0 additions & 1 deletion ibis/backends/tests/test_param.py
@@ -85,7 +85,6 @@ def test_scalar_param_array(con):
["mysql", "sqlite", "mssql"],
reason="mysql and sqlite will never implement struct types",
)
@pytest.mark.notimpl(["flink"], "WIP")
def test_scalar_param_struct(con):
value = dict(a=1, b="abc", c=3.0)
param = ibis.param("struct<a: int64, b: string, c: float64>")
9 changes: 0 additions & 9 deletions ibis/backends/tests/test_struct.py
@@ -72,9 +72,6 @@ def test_all_fields(struct, struct_df):

@pytest.mark.notimpl(["postgres", "risingwave"])
@pytest.mark.parametrize("field", ["a", "b", "c"])
@pytest.mark.notyet(
["flink"], reason="flink doesn't support creating struct columns from literals"
)
def test_literal(backend, con, field):
query = _STRUCT_LITERAL[field]
dtype = query.type().to_pandas()
@@ -89,9 +86,6 @@ def test_literal(backend, con, field):
@pytest.mark.notyet(
["clickhouse"], reason="clickhouse doesn't support nullable nested types"
)
@pytest.mark.notyet(
["flink"], reason="flink doesn't support creating struct columns from literals"
)
def test_null_literal(backend, con, field):
query = _NULL_STRUCT_LITERAL[field]
result = pd.Series([con.execute(query)])
@@ -101,9 +95,6 @@ def test_null_literal(backend, con, field):


@pytest.mark.notimpl(["dask", "pandas", "postgres", "risingwave"])
@pytest.mark.notyet(
["flink"], reason="flink doesn't support creating struct columns from literals"
)
def test_struct_column(alltypes, df):
t = alltypes
expr = t.select(s=ibis.struct(dict(a=t.string_col, b=1, c=t.bigint_col)))