Skip to content

Commit

Permalink
fix: deny create MV on shared CDC source
Browse files Browse the repository at this point in the history
  • Loading branch information
xxchan committed Mar 12, 2024
1 parent 287d40f commit 92e20d1
Show file tree
Hide file tree
Showing 6 changed files with 33 additions and 19 deletions.
3 changes: 3 additions & 0 deletions e2e_test/source/cdc/cdc.share_stream.slt
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,9 @@ create source mysql_mytest with (
server.id = '5601'
);

statement error Should not create MATERIALIZED VIEW directly on shared CDC source.
create materialized view mv as select * from mysql_mytest;

statement error The upstream table name must contain database name prefix*
create table products_test ( id INT,
name STRING,
Expand Down
8 changes: 3 additions & 5 deletions src/connector/src/source/cdc/external/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ use crate::error::{ConnectorError, ConnectorResult};
use crate::parser::mysql_row_to_owned_row;
use crate::source::cdc::external::mock_external_table::MockExternalTableReader;
use crate::source::cdc::external::postgres::{PostgresExternalTableReader, PostgresOffset};
use crate::WithPropertiesExt;

#[derive(Debug)]
pub enum CdcTableType {
Expand All @@ -46,11 +47,8 @@ pub enum CdcTableType {
}

impl CdcTableType {
pub fn from_properties(with_properties: &HashMap<String, String>) -> Self {
let connector = with_properties
.get("connector")
.map(|c| c.to_ascii_lowercase())
.unwrap_or_default();
pub fn from_properties(with_properties: &impl WithPropertiesExt) -> Self {
let connector = with_properties.get_connector().unwrap_or_default();
match connector.as_str() {
"mysql-cdc" => Self::MySql,
"postgres-cdc" => Self::Postgres,
Expand Down
7 changes: 6 additions & 1 deletion src/connector/src/with_options.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@

use std::collections::{BTreeMap, HashMap};

use crate::source::cdc::external::CdcTableType;
use crate::source::iceberg::ICEBERG_CONNECTOR;
use crate::source::{
GCS_CONNECTOR, KAFKA_CONNECTOR, OPENDAL_S3_CONNECTOR, POSIX_FS_CONNECTOR, UPSTREAM_SOURCE_KEY,
Expand Down Expand Up @@ -78,7 +79,7 @@ impl Get for BTreeMap<String, String> {
}

/// Utility methods for `WITH` properties (`HashMap` and `BTreeMap`).
pub trait WithPropertiesExt: Get {
pub trait WithPropertiesExt: Get + Sized {
#[inline(always)]
fn get_connector(&self) -> Option<String> {
self.get(UPSTREAM_SOURCE_KEY).map(|s| s.to_lowercase())
Expand All @@ -100,6 +101,10 @@ pub trait WithPropertiesExt: Get {
connector.contains("-cdc")
}

fn is_shared_cdc_source(&self) -> bool {
self.is_cdc_connector() && CdcTableType::from_properties(self).can_backfill()
}

#[inline(always)]
fn is_iceberg_connector(&self) -> bool {
let Some(connector) = self.get_connector() else {
Expand Down
7 changes: 7 additions & 0 deletions src/frontend/src/binder/relation/table_or_source.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ use itertools::Itertools;
use risingwave_common::bail_not_implemented;
use risingwave_common::catalog::{is_system_schema, Field};
use risingwave_common::session_config::USER_NAME_WILD_CARD;
use risingwave_connector::WithPropertiesExt;
use risingwave_sqlparser::ast::{Statement, TableAlias};
use risingwave_sqlparser::parser::Parser;
use thiserror_ext::AsReport;
Expand Down Expand Up @@ -52,6 +53,12 @@ pub struct BoundSource {
pub catalog: SourceCatalog,
}

impl BoundSource {
pub fn is_shared_cdc_source(&self) -> bool {
self.catalog.with_properties.is_shared_cdc_source()
}
}

impl From<&SourceCatalog> for BoundSource {
fn from(s: &SourceCatalog) -> Self {
Self { catalog: s.clone() }
Expand Down
8 changes: 1 addition & 7 deletions src/frontend/src/handler/create_source.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,6 @@ use risingwave_connector::schema::schema_registry::{
name_strategy_from_str, SchemaRegistryAuth, SCHEMA_REGISTRY_PASSWORD, SCHEMA_REGISTRY_USERNAME,
};
use risingwave_connector::sink::iceberg::IcebergConfig;
use risingwave_connector::source::cdc::external::CdcTableType;
use risingwave_connector::source::cdc::{
CDC_SHARING_MODE_KEY, CDC_SNAPSHOT_BACKFILL, CDC_SNAPSHOT_MODE_KEY, CDC_TRANSACTIONAL_KEY,
CITUS_CDC_CONNECTOR, MONGODB_CDC_CONNECTOR, MYSQL_CDC_CONNECTOR, POSTGRES_CDC_CONNECTOR,
Expand Down Expand Up @@ -1304,12 +1303,7 @@ pub async fn handle_create_source(
ensure_table_constraints_supported(&stmt.constraints)?;
let sql_pk_names = bind_sql_pk_names(&stmt.columns, &stmt.constraints)?;

// gated the feature with a session variable
let create_cdc_source_job = if with_properties.is_cdc_connector() {
CdcTableType::from_properties(&with_properties).can_backfill()
} else {
false
};
let create_cdc_source_job = with_properties.is_shared_cdc_source();

let (columns_from_resolve_source, source_info) = if create_cdc_source_job {
bind_columns_from_source_for_cdc(&session, &source_schema, &with_properties)?
Expand Down
19 changes: 13 additions & 6 deletions src/frontend/src/planner/relation.rs
Original file line number Diff line number Diff line change
Expand Up @@ -86,12 +86,19 @@ impl Planner {
}

pub(super) fn plan_source(&mut self, source: BoundSource) -> Result<PlanRef> {
Ok(LogicalSource::with_catalog(
Rc::new(source.catalog),
SourceNodeKind::CreateMViewOrBatch,
self.ctx(),
)?
.into())
if source.is_shared_cdc_source() {
Err(ErrorCode::InternalError(
"Should not create MATERIALIZED VIEW directly on shared CDC source. HINT: create TABLE from the source instead.".to_string(),
)
.into())
} else {
Ok(LogicalSource::with_catalog(
Rc::new(source.catalog),
SourceNodeKind::CreateMViewOrBatch,
self.ctx(),
)?
.into())
}
}

pub(super) fn plan_join(&mut self, join: BoundJoin) -> Result<PlanRef> {
Expand Down

0 comments on commit 92e20d1

Please sign in to comment.