Skip to content

Commit

Permalink
remove schema by Gil Forsyth
Browse files Browse the repository at this point in the history
  • Loading branch information
KeXiangWang committed Mar 29, 2024
1 parent e5b0d8f commit 7c46775
Show file tree
Hide file tree
Showing 3 changed files with 21 additions and 72 deletions.
81 changes: 15 additions & 66 deletions ibis/backends/risingwave/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -172,13 +172,6 @@ def create_table(
if obj is None and schema is None:
raise ValueError("Either `obj` or `schema` must be specified")

if database is not None and database != self.current_database:
raise com.UnsupportedOperationError(
f"Creating tables in other databases is not supported by {self.name}"
)
else:
database = None

if connector_properties is not None and (
encode_format is None or data_format is None
):
Expand Down Expand Up @@ -223,7 +216,7 @@ def create_table(
else:
temp_name = name

table = sg.table(temp_name, catalog=database, quoted=self.compiler.quoted)
table = sg.table(temp_name, db=database, quoted=self.compiler.quoted)
target = sge.Schema(this=table, expressions=column_defs)

if connector_properties is None:
Expand All @@ -244,14 +237,14 @@ def create_table(
data_format, encode_format, encode_properties
)

this = sg.table(name, catalog=database, quoted=self.compiler.quoted)
this = sg.table(name, db=database, quoted=self.compiler.quoted)
with self._safe_raw_sql(create_stmt) as cur:
if query is not None:
insert_stmt = sge.Insert(this=table, expression=query).sql(self.dialect)
cur.execute(insert_stmt)

if overwrite:
self.drop_table(name, database=database, schema=schema, force=True)
self.drop_table(name, database=database, force=True)
cur.execute(
f"ALTER TABLE {table.sql(self.dialect)} RENAME TO {this.sql(self.dialect)}"
)
Expand Down Expand Up @@ -330,7 +323,6 @@ def create_materialized_view(
obj: ir.Table,
*,
database: str | None = None,
schema: str | None = None,
overwrite: bool = False,
) -> ir.Table:
"""Creating a materialized view. The created materialized view can be accessed like a normal table.
Expand All @@ -342,9 +334,7 @@ def create_materialized_view(
obj
The select statement to materialize.
database
Name of the database where the view exists, if not the default.
schema
Name of the schema where the view exists, if not the default
Name of the database where the view exists, if not the default
overwrite
Whether to overwrite the existing materialized view with the same name
Expand All @@ -353,21 +343,12 @@ def create_materialized_view(
Table
Table expression
"""
if database is not None and database != self.current_database:
raise com.UnsupportedOperationError(
f"Creating materialized views in other databases is not supported by {self.name}"
)
else:
database = None

if overwrite:
temp_name = util.gen_name(f"{self.name}_table")
else:
temp_name = name

table = sg.table(
temp_name, db=schema, catalog=database, quoted=self.compiler.quoted
)
table = sg.table(temp_name, db=database, quoted=self.compiler.quoted)

create_stmt = sge.Create(
this=table,
Expand All @@ -376,14 +357,14 @@ def create_materialized_view(
)
self._register_in_memory_tables(obj)

this = sg.table(name, catalog=database, quoted=self.compiler.quoted)
with self._safe_raw_sql(create_stmt) as cur:
if overwrite:
self.drop_materialized_view(
name, database=database, schema=schema, force=True
)
target = sg.table(name, db=database).sql(self.dialect)

self.drop_materialized_view(target, database=database, force=True)

cur.execute(
f"ALTER MATERIALIZED VIEW {table.sql(self.dialect)} RENAME TO {this.sql(self.dialect)}"
f"ALTER MATERIALIZED VIEW {table.sql(self.dialect)} RENAME TO {target}"
)

return self.table(name, database=database)
Expand All @@ -393,7 +374,6 @@ def drop_materialized_view(
name: str,
*,
database: str | None = None,
schema: str | None = None,
force: bool = False,
) -> None:
"""Drop a materialized view.
Expand All @@ -404,15 +384,11 @@ def drop_materialized_view(
Materialized view name to drop.
database
Name of the database where the view exists, if not the default.
schema
Name of the schema where the view exists, if not the default.
force
If `False`, an exception is raised if the view does not exist.
"""
src = sge.Drop(
this=sg.table(
name, db=schema, catalog=database, quoted=self.compiler.quoted
),
this=sg.table(name, db=database, quoted=self.compiler.quoted),
kind="MATERIALIZED VIEW",
exists=force,
)
Expand Down Expand Up @@ -455,13 +431,6 @@ def create_source(
Table
Table expression
"""
if database is not None and database != self.current_database:
raise com.UnsupportedOperationError(
f"Creating sources in other databases is not supported by {self.name}"
)
else:
database = None

column_defs = [
sge.ColumnDef(
this=sg.to_identifier(colname, quoted=self.compiler.quoted),
Expand All @@ -475,7 +444,7 @@ def create_source(
for colname, typ in schema.items()
]

table = sg.table(name, catalog=database, quoted=self.compiler.quoted)
table = sg.table(name, db=database, quoted=self.compiler.quoted)
target = sge.Schema(this=table, expressions=column_defs)

create_stmt = sge.Create(
Expand All @@ -500,7 +469,6 @@ def drop_source(
name: str,
*,
database: str | None = None,
schema: str | None = None,
force: bool = False,
) -> None:
"""Drop a Source.
Expand All @@ -511,15 +479,11 @@ def drop_source(
Source name to drop.
database
Name of the database where the view exists, if not the default.
schema
Name of the schema where the view exists, if not the default.
force
If `False`, an exception is raised if the source does not exist.
"""
src = sge.Drop(
this=sg.table(
name, db=schema, catalog=database, quoted=self.compiler.quoted
),
this=sg.table(name, db=database, quoted=self.compiler.quoted),
kind="SOURCE",
exists=force,
)
Expand All @@ -534,7 +498,6 @@ def create_sink(
*,
obj: ir.Table | None = None,
database: str | None = None,
schema: str | None = None,
data_format: str | None = None,
encode_format: str | None = None,
encode_properties: dict | None = None,
Expand All @@ -555,23 +518,14 @@ def create_sink(
An Ibis table expression or pandas table that will be used to extract the schema and the data of the new table. Only one of `sink_from` or `obj` can be provided.
database
Name of the database where the source exists, if not the default.
schema
Name of the schema where the view exists, if not the default.
data_format
The data format for the new source, e.g., "PLAIN". data_format and encode_format must be specified at the same time.
encode_format
The encode format for the new source, e.g., "JSON". data_format and encode_format must be specified at the same time.
encode_properties
The properties of encode format, providing information like schema registry url. Refer https://docs.risingwave.com/docs/current/sql-create-source/ for more details.
"""
if database is not None and database != self.current_database:
raise com.UnsupportedOperationError(
f"Creating sinks in other databases is not supported by {self.name}"
)
else:
database = None

table = sg.table(name, db=schema, catalog=database, quoted=self.compiler.quoted)
table = sg.table(name, db=database, quoted=self.compiler.quoted)

if encode_format is None != data_format is None:
raise com.RelationError(
Expand Down Expand Up @@ -600,7 +554,6 @@ def drop_sink(
name: str,
*,
database: str | None = None,
schema: str | None = None,
force: bool = False,
) -> None:
"""Drop a Sink.
Expand All @@ -611,15 +564,11 @@ def drop_sink(
Sink name to drop.
database
Name of the database where the view exists, if not the default.
schema
Name of the schema where the view exists, if not the default.
force
If `False`, an exception is raised if the source does not exist.
"""
src = sge.Drop(
this=sg.table(
name, db=schema, catalog=database, quoted=self.compiler.quoted
),
this=sg.table(name, db=database, quoted=self.compiler.quoted),
kind="SINK",
exists=force,
)
Expand Down
4 changes: 0 additions & 4 deletions ibis/backends/tests/test_array.py
Original file line number Diff line number Diff line change
Expand Up @@ -1320,10 +1320,6 @@ def test_repr_timestamp_array(con, monkeypatch):
@pytest.mark.broken(
["dask"], raises=AssertionError, reason="DataFrame.index are different"
)
@pytest.mark.notimpl(
["risingwave"],
reason="Only table-in-out functions can have subquery parameters",
)
def test_unnest_range(con):
expr = ibis.range(2).unnest().name("x").as_table().mutate({"y": 1.0})
result = con.execute(expr)
Expand Down
8 changes: 6 additions & 2 deletions ibis/backends/tests/test_temporal.py
Original file line number Diff line number Diff line change
Expand Up @@ -1590,7 +1590,9 @@ def test_today_from_projection(alltypes):
}


@pytest.mark.notimpl(["pandas", "dask", "exasol"], raises=com.OperationNotDefinedError)
@pytest.mark.notimpl(
["pandas", "dask", "exasol", "risingwave"], raises=com.OperationNotDefinedError
)
@pytest.mark.notimpl(
["druid"], raises=PyDruidProgrammingError, reason="SQL parse failed"
)
Expand Down Expand Up @@ -1825,7 +1827,9 @@ def test_interval_literal(con, backend):
assert con.execute(expr.typeof()) == INTERVAL_BACKEND_TYPES[backend_name]


@pytest.mark.notimpl(["pandas", "dask", "exasol"], raises=com.OperationNotDefinedError)
@pytest.mark.notimpl(
["pandas", "dask", "exasol", "risingwave"], raises=com.OperationNotDefinedError
)
@pytest.mark.broken(
["druid"],
raises=AttributeError,
Expand Down

0 comments on commit 7c46775

Please sign in to comment.