Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(risingwave): add streaming DDLs #8239

Merged

Conversation

KeXiangWang
Copy link
Contributor

@KeXiangWang KeXiangWang commented Feb 6, 2024

Description of changes

Risingwave plays a role as a computing engine and storage system in Streaming ecosystems.
Usually, a streaming workload will include two/three systems like this:
upstream data source --> Risingwave
Or,
upstream data source --> Risingwave --> downstream data sink

This PR adds streaming support, mainly new relations related to streaming, for the Risingwave backend.

To be specific, Four types of new relations are introduced:

  • Source, data sources that encompass a connector connected to an upstream data system like Kafka. A source works like an extraction or a placeholder for an upstream system, and although it has columns, it does not store any data itself.
  • Materialized View, which is Risingwave's core concept for streaming. One can create an MV with a query on existing tables or sources. The data in MV is automatically updated in a real-time way. Users can access the data with a select statement just like accessing a table.
  • Table with connector, works like a combination of a source and a normal table. The difference between Table with connector and Source is that, once the table is created, it will automatically start to consume data from upstream systems and update it into Risingwave.
  • Sink, unlink a Materialized View which stores the result of a query in RW, users can choose to sink the result to a downstream system, e.g. Redis, and then read the data in the downstream system. Unlike an MV, User cannot access the data directly from a sink.

Some minor changes:

  1. Pump the image version to a new nightly build, as the new image solves some issues found in the previous PR.
  2. Pump the sqlalchemy-risingwave version from 1.0.0 to 1.0.1, some implementations are updated.
  3. Mark one test, test_semi_join, as skipped for risingwave backend. The test sometimes stuck and it seems to be Risingwave's fault. I'll continue to investigate.

One issue introduced in this PR:
Risingwave backend's implementation is sqlalchemy-based. But sqlalchemy has no corresponding concept for sources and sinks. So, in order to work around this issue, I temporarily categorize a source as a view. So sqlalchemy can access a source's metadata (names and types of its columns) like a view. However, this causes a side effect, when users call list_view, they may see some unexpected views that are actually sources. This can be fixed by rewriting the implementation of the list_view func in the Risingwave Backend. I'll fix it later.

Issues closed

@KeXiangWang
Copy link
Contributor Author

It seems I have no access to invite reviewers. @gforsyth @cpcloud @chloeh13q @deepyaman Could you please help take a review or invite the appropriate ones?

@jcrist
Copy link
Member

jcrist commented Feb 6, 2024

Hi @KeXiangWang - thanks for opening this!

We're unfortunately in the midst of a large internals refactor moving all backends to use sqlglot instead of sqlalchemy. We hope to merge https://github.com/ibis-project/ibis/tree/the-epic-split into main soon (maybe by end of week?) after which we'll be back to accepting PRs to main. Until that's done though we don't really want to merge additional large PRs as this would complicate getting the refactor in. We'll follow up after the-epic-split is merged to discuss how best to implement the work here using sqlglot, but until then things are blocked on us finishing up the internals refactor.

@KeXiangWang
Copy link
Contributor Author

We're unfortunately in the midst of a large internals refactor moving all backends to use sqlglot instead of sqlalchemy.

It's OK. I have no experience on sqlplot. After the PR is merged, could your engineers please also help refactor the related codes in this PR? I'd like to provide help if needed.

@zhenzhongxu
Copy link
Contributor

@chloeh13q @cpcloud: bump this up, is this something we can help wth getting the sqlglot refactoring?

@cpcloud
Copy link
Member

cpcloud commented Feb 23, 2024

It looks like the first order of business would be to rebase this PR on main. @KeXiangWang Can you do that?

@KeXiangWang
Copy link
Contributor Author

It looks like the first order of business would be to rebase this PR on main.

OK, I'll try my best.

@KeXiangWang KeXiangWang force-pushed the wkx/risingwave-streaming-features branch 2 times, most recently from 35e6474 to a3e2717 Compare February 28, 2024 21:58
@KeXiangWang
Copy link
Contributor Author

Hi @cpcloud, I've rebased the PR to newest main and fix all the issue introduced by the sqlglot refactoring. Could you please help take a look.

Besides, I find some tests are unable to pass even with the main branch, so I leave them unchanged for now. Do you have any ideas? Is there any configuration I missed?

Copy link
Member

@gforsyth gforsyth left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hey @KeXiangWang -- thanks for putting this together and sorry for the delay in reviewing!

I've taken first pass over the new DDL stuff. I am not a streaming expert, so maybe @chloeh13q and @deepyaman can also chime in on some of the API design stuff.

Since some of these methods may become "standard" on other (future or current) streaming backends we want to be very deliberate in how we design them. We especially want to make sure that we don't overindex on a single backend when coming up with names (this is very difficult).

ibis/backends/risingwave/__init__.py Outdated Show resolved Hide resolved
ibis/backends/risingwave/__init__.py Outdated Show resolved Hide resolved
Comment on lines 344 to 335
def create_materialized_view(
self,
name: str,
obj: ir.Table,
*,
database: str | None = None,
schema: str | None = None,
overwrite: bool = False,
) -> ir.Table:
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This would be a new API for Ibis -- should this be a standalone method? One possible alternative is to have a keyword argument to create_view that creates a materialized view instead of a "regular" view.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this information may be helpful in helping us decide on a suitable API: https://materialize.com/guides/materialized-views/#how-do-materialized-views-work-in-specific-databases

Personally I think it depends on whether people tend to think of materialized views as a special type of views, or completely separate from normal views. It sounds like several backends just treat materialized views as a special type of views, as opposed to "its own thing".

Copy link
Contributor

@chloeh13q chloeh13q Mar 6, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Also wondering:

I saw that there is a ticket for implementing materialized views for a broader set of backends: #5964 - is this an API we'd want to introduce in the base class?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's easy to make materialized view a special type if view in this PR. But I would say they should be different objects in databases. The underlying mechanism and performance are largely different. For streaming databases like Risingwave and Materialize, Materialized view is the core concept and hardest part to design, while view is much more trivial. So we would like to keep materialized view and view separate.

Copy link
Contributor

@zhenzhongxu zhenzhongxu Mar 11, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we also want to distinguish between incremental materialized view? I think with streaming incremental materialized view, the line becomes blurry between view and materialized view. But across different backends, they are still pretty distinct concepts. I'd vote to make them distinct for now, and make them explicit on what backends support which.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think we should distinguish here without a clear explanation of what the difference between incremental and non-incremental streaming materialized views are.

It's also adding more scope than is needed for now.

Copy link
Member

@cpcloud cpcloud Mar 19, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we should keep this API separate from create_view for now, it's likely that materialized views will require some additional kwargs that don't apply to views. The word "view" here is really being stretched to its limit by this functionality and thinking about materialized views as another kind of view isn't really correct.

ibis/backends/risingwave/__init__.py Outdated Show resolved Hide resolved
ibis/backends/risingwave/__init__.py Outdated Show resolved Hide resolved
ibis/backends/risingwave/__init__.py Outdated Show resolved Hide resolved
ibis/backends/risingwave/__init__.py Outdated Show resolved Hide resolved
ibis/backends/risingwave/__init__.py Outdated Show resolved Hide resolved
ibis/backends/risingwave/__init__.py Outdated Show resolved Hide resolved
ibis/backends/risingwave/__init__.py Outdated Show resolved Hide resolved
@chloeh13q
Copy link
Contributor

I have some open questions on a high level:

I understand the motivation behind introducing separate APIs for sources, materialized views, tables, and sinks for the RW backend. Sources, materialized views, tables, and sinks are distinct objects in RW. However, I wonder how generalizable this is across different streaming backends.

For the user who is already familiar with RW's API, this implementation feels natural. But for the user who is coming from another backend, it may cause some confusion.

For example, Flink also has the same concepts, but it doesn't use a different API for each. Source, tables, and sinks are all created with CREATE TABLE. A lot of these underlying differences are abstracted away from the user.

Of course, it's also okay that we just do this for the RW backend for now and refactor at a later point if there is a need to consolidate some of the APIs.

@KeXiangWang KeXiangWang force-pushed the wkx/risingwave-streaming-features branch from a3e2717 to 71e8cc7 Compare March 11, 2024 04:36
@KeXiangWang
Copy link
Contributor Author

KeXiangWang commented Mar 11, 2024

I wonder how generalizable this is across different streaming backends.

Good question. These concepts are common in streaming backends, although different systems may have different names for them. For example, as Flink doesn't have a normal persistent table in a traditional database, its tables are used to express a streaming job. While in RW, Materialize, and Timeplus, we use materialized views. For sources, Flink combines a streaming job and the dependent sources together as a Table, while RW and materialize use sources objects and Timeplus use stream objects.

Flink also has the same concepts, but it doesn't use a different API for each.

Flink positions itself as a 'compute engine,' which means it's designed not to store data but to process it. In Flink, a table is essentially a streaming job that interacts with sources or sinks. The outcomes of these jobs are not stored within Flink itself. In contrast, streaming databases such as RW, Materialize, and Timeplus, function as databases that store the results locally. While Flink tables are declared as Tables, you cannot directly query data from them. Instead, you must route the data to an external system to access the results. With Flink, you trade off a simpler API for the flexibility and simplicity of its architecture. On the other hand, streaming databases offer the ability to customize your streaming jobs by combining various objects to suit your needs, providing direct accessing or sinking to external systems at the same time.

@chloeh13q
Copy link
Contributor

@KeXiangWang Yep that makes sense. Since Ibis works across engines & databases, I'd imagine that these are questions that Ibis will need to address. But I think it also depends on the background of the user, e.g., whether it's someone who's experimenting with streaming or someone who already. has some expertise in streaming and wants to try out the Ibis API. In any case, these may be questions that we can leave open and come back to, when there is user feedback backing up one way or another.

@KeXiangWang KeXiangWang force-pushed the wkx/risingwave-streaming-features branch from 5373f05 to f72698f Compare March 21, 2024 09:33
@KeXiangWang KeXiangWang force-pushed the wkx/risingwave-streaming-features branch from f72698f to e5b0d8f Compare March 26, 2024 15:51
@gforsyth gforsyth added ddl Issues related to creating or altering data definitions risingwave The RisingWave backend streaming Issue related to streaming APIs or backends labels Mar 26, 2024
@gforsyth
Copy link
Member

@KeXiangWang -- we've just merged in #8655 which moves us away from using the word schema in any hierarchical sense. We're standardizing on "database" as a collection of tables and "catalog" as a collection of database.
If this PR allows maintainers to push updates, I can take on porting over your work here to the new naming and push that up.

@KeXiangWang
Copy link
Contributor Author

If this PR allows maintainers to push updates, I can take on porting over your work here to the new naming and push that up.

Thx @gforsyth . I really cannot fiind a button in this page to enable it, so I create a new PR here. We can push the following updates to the new PR. Thanks!

@gforsyth
Copy link
Member

I've pushed up the fixes to #8781 -- @KeXiangWang, you can either cherry-pick that commit back to this PR, or we can close this PR out in favor of your newer one, whichever you prefer!

@KeXiangWang
Copy link
Contributor Author

Hi @gforsyth . Thanks for the effort. I've update the patch to this PR and I'll close #8781 when this PR is merged. Thanks!

@KeXiangWang
Copy link
Contributor Author

@cpcloud @gforsyth Any further comments? Can we merge this PR now?

@KeXiangWang KeXiangWang requested a review from cpcloud April 2, 2024 15:28
Copy link
Member

@gforsyth gforsyth left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for pushing on this @KeXiangWang !

I flagged a few style things and one bit of incorrect logic, I think.

ibis/backends/risingwave/__init__.py Outdated Show resolved Hide resolved
ibis/backends/risingwave/__init__.py Outdated Show resolved Hide resolved
ibis/backends/risingwave/__init__.py Outdated Show resolved Hide resolved
ibis/backends/risingwave/__init__.py Show resolved Hide resolved
ibis/backends/risingwave/__init__.py Outdated Show resolved Hide resolved
ibis/backends/risingwave/__init__.py Outdated Show resolved Hide resolved
Copy link
Member

@gforsyth gforsyth left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for all of your work on this @KeXiangWang !

@gforsyth gforsyth merged commit 356e459 into ibis-project:main Apr 7, 2024
85 checks passed
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
ddl Issues related to creating or altering data definitions risingwave The RisingWave backend streaming Issue related to streaming APIs or backends
Projects
None yet
Development

Successfully merging this pull request may close these issues.

7 participants