feat: allow append only table with pk (#18634)
st1page authored Sep 25, 2024
1 parent ce70a51 commit 0bcebd8
Showing 8 changed files with 154 additions and 26 deletions.
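
In short: an APPEND ONLY table can now declare a user-defined primary key, its ON CONFLICT behavior is pinned to IGNORE, and a sink writing into an append-only table must itself be append-only. A minimal sketch of the new surface (table and sink names are illustrative, not from the patch):

-- Append-only table with a user-defined primary key;
-- ON CONFLICT defaults to IGNORE, and OVERWRITE is rejected.
create table events (id int primary key, payload int) append only;

create table staging (id int, payload int) append only;
create sink events_sink into events as select id, payload from staging with (type = 'append-only');
-- Omitting type = 'append-only' makes the CREATE SINK above fail with a bind error.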
2 changes: 1 addition & 1 deletion dashboard/next-env.d.ts
@@ -2,4 +2,4 @@
/// <reference types="next/image-types/global" />

// NOTE: This file should not be edited
// see https://nextjs.org/docs/basic-features/typescript for more information.
// see https://nextjs.org/docs/pages/building-your-application/configuring/typescript for more information.
67 changes: 67 additions & 0 deletions e2e_test/sink/sink_into_table/basic.slt
@@ -323,6 +323,73 @@ drop table t_primary_key;
statement ok
drop table t_s3;



# target table append only with primary key

statement ok
create table t_s3 (v1 int, v2 int) append only;

statement ok
insert into t_s3 values (1, 11), (2, 12), (3, 13);

statement ok
create table t_primary_key_append_only (v1 int primary key, v2 int, v3 int default 1000, v4 int as v1 + v2) APPEND ONLY;

statement error
create sink s3 into t_primary_key_append_only as select v1, v2 from t_s3;

statement ok
create sink s3 into t_primary_key_append_only as select v1, v2 from t_s3 with (type = 'append-only');


statement ok
flush;

query IIII rowsort
select * from t_primary_key_append_only order by v1;
----
1 11 1000 12
2 12 1000 14
3 13 1000 16

statement ok
insert into t_s3 values (4, 14), (5, 15), (6, 16);

query IIII rowsort
select * from t_primary_key_append_only order by v1;
----
1 11 1000 12
2 12 1000 14
3 13 1000 16
4 14 1000 18
5 15 1000 20
6 16 1000 22

statement ok
insert into t_primary_key_append_only values (100, 100);

query IIII
select * from t_primary_key_append_only order by v1;
----
1 11 1000 12
2 12 1000 14
3 13 1000 16
4 14 1000 18
5 15 1000 20
6 16 1000 22
100 100 1000 200

statement ok
drop sink s3;

statement ok
drop table t_primary_key_append_only;

statement ok
drop table t_s3;


# multi sinks

statement ok
5 changes: 4 additions & 1 deletion e2e_test/streaming/on_conflict.slt
@@ -2,7 +2,7 @@ statement ok
SET RW_IMPLICIT_FLUSH TO true;

statement ok
create table t1 (v1 int, v2 int, v3 int, primary key(v1)) on conflict ignore;
create table t1 (v1 int, v2 int, v3 int, primary key(v1)) APPEND ONLY on conflict ignore;

statement ok
insert into t1 values (1,4,2), (2,3,3);
@@ -26,6 +26,9 @@ select v1, v2, v3 from mv1;
statement ok
SET RW_IMPLICIT_FLUSH TO true;

statement error
create table t2 (v1 int, v2 int, v3 int, primary key(v1)) APPEND ONLY on conflict overwrite;

statement ok
create table t2 (v1 int, v2 int, v3 int, primary key(v1)) on conflict overwrite;

12 changes: 12 additions & 0 deletions src/frontend/planner_test/tests/testdata/input/explain.yaml
@@ -25,3 +25,15 @@
explain create table t (v1 int, v2 varchar) with ( connector = 'kafka', kafka.topic = 'kafka_3_partition_topic', kafka.brokers = '127.0.0.1:1234', kafka.scan.startup.mode='earliest' ) FORMAT PLAIN ENCODE JSON;
expected_outputs:
- explain_output
- sql: |
explain create table t (v1 int, v2 varchar) append only with ( connector = 'kafka', kafka.topic = 'kafka_3_partition_topic', kafka.brokers = '127.0.0.1:1234', kafka.scan.startup.mode='earliest' ) FORMAT PLAIN ENCODE JSON;
expected_outputs:
- explain_output
- sql: |
explain create table t (v1 int, v2 varchar primary key) with ( connector = 'kafka', kafka.topic = 'kafka_3_partition_topic', kafka.brokers = '127.0.0.1:1234', kafka.scan.startup.mode='earliest' ) FORMAT PLAIN ENCODE JSON;
expected_outputs:
- explain_output
- sql: |
explain create table t (v1 int, v2 varchar primary key) append only with ( connector = 'kafka', kafka.topic = 'kafka_3_partition_topic', kafka.brokers = '127.0.0.1:1234', kafka.scan.startup.mode='earliest' ) FORMAT PLAIN ENCODE JSON;
expected_outputs:
- explain_output
31 changes: 31 additions & 0 deletions src/frontend/planner_test/tests/testdata/output/explain.yaml
@@ -160,3 +160,34 @@
└─StreamExchange { dist: HashShard(_row_id) }
└─StreamDml { columns: [v1, v2, _row_id] }
└─StreamSource
- sql: |
explain create table t (v1 int, v2 varchar) append only with ( connector = 'kafka', kafka.topic = 'kafka_3_partition_topic', kafka.brokers = '127.0.0.1:1234', kafka.scan.startup.mode='earliest' ) FORMAT PLAIN ENCODE JSON;
explain_output: |
StreamMaterialize { columns: [v1, v2, _row_id(hidden)], stream_key: [_row_id], pk_columns: [_row_id], pk_conflict: NoCheck }
└─StreamRowIdGen { row_id_index: 2 }
└─StreamUnion { all: true }
├─StreamExchange [no_shuffle] { dist: SomeShard }
│ └─StreamSource { source: t, columns: [v1, v2, _row_id] }
└─StreamExchange [no_shuffle] { dist: SomeShard }
└─StreamDml { columns: [v1, v2, _row_id] }
└─StreamSource
- sql: |
explain create table t (v1 int, v2 varchar primary key) with ( connector = 'kafka', kafka.topic = 'kafka_3_partition_topic', kafka.brokers = '127.0.0.1:1234', kafka.scan.startup.mode='earliest' ) FORMAT PLAIN ENCODE JSON;
explain_output: |
StreamMaterialize { columns: [v1, v2], stream_key: [v2], pk_columns: [v2], pk_conflict: Overwrite }
└─StreamUnion { all: true }
├─StreamExchange { dist: HashShard(v2) }
│ └─StreamSource { source: t, columns: [v1, v2] }
└─StreamExchange { dist: HashShard(v2) }
└─StreamDml { columns: [v1, v2] }
└─StreamSource
- sql: |
explain create table t (v1 int, v2 varchar primary key) append only with ( connector = 'kafka', kafka.topic = 'kafka_3_partition_topic', kafka.brokers = '127.0.0.1:1234', kafka.scan.startup.mode='earliest' ) FORMAT PLAIN ENCODE JSON;
explain_output: |
StreamMaterialize { columns: [v1, v2], stream_key: [v2], pk_columns: [v2], pk_conflict: IgnoreConflict }
└─StreamUnion { all: true }
├─StreamExchange { dist: HashShard(v2) }
│ └─StreamSource { source: t, columns: [v1, v2] }
└─StreamExchange { dist: HashShard(v2) }
└─StreamDml { columns: [v1, v2] }
└─StreamSource
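
The two new primary-key plans differ only in pk_conflict: the regular table materializes with Overwrite, while the append-only variant materializes with IgnoreConflict, matching the ON CONFLICT IGNORE default enforced in create_table.rs below. Reproducing it is just the planner-test statement itself:

explain create table t (v1 int, v2 varchar primary key) append only with ( connector = 'kafka', kafka.topic = 'kafka_3_partition_topic', kafka.brokers = '127.0.0.1:1234', kafka.scan.startup.mode='earliest' ) FORMAT PLAIN ENCODE JSON;
-- first output line: StreamMaterialize { ..., pk_conflict: IgnoreConflict }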
18 changes: 11 additions & 7 deletions src/frontend/src/handler/create_sink.rs
@@ -277,15 +277,19 @@ pub async fn gen_sink_plan(
}
}

let user_defined_primary_key_table =
!(table_catalog.append_only || table_catalog.row_id_index.is_some());
let user_defined_primary_key_table = table_catalog.row_id_index.is_none();
let sink_is_append_only = sink_catalog.sink_type == SinkType::AppendOnly
|| sink_catalog.sink_type == SinkType::ForceAppendOnly;

if !(user_defined_primary_key_table
|| sink_catalog.sink_type == SinkType::AppendOnly
|| sink_catalog.sink_type == SinkType::ForceAppendOnly)
{
if !user_defined_primary_key_table && !sink_is_append_only {
return Err(RwError::from(ErrorCode::BindError(
"Only append-only sinks can sink to a table without primary keys.".to_string(),
"Only append-only sinks can sink to a table without primary keys. please try to add type = 'append-only' in the with option. e.g. create sink s into t as select * from t1 with (type = 'append-only')".to_string(),
)));
}

if table_catalog.append_only && !sink_is_append_only {
return Err(RwError::from(ErrorCode::BindError(
"Only append-only sinks can sink to a append only table. please try to add type = 'append-only' in the with option. e.g. create sink s into t as select * from t1 with (type = 'append-only')".to_string(),
)));
}
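
Both checks surface as bind errors at CREATE SINK time. A sketch mirroring the e2e cases in basic.slt above:

-- Rejected by the new append-only-table check (no sink type given):
create sink s3 into t_primary_key_append_only as select v1, v2 from t_s3;

-- Accepted once the sink is declared append-only:
create sink s3 into t_primary_key_append_only as select v1, v2 from t_s3 with (type = 'append-only');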

20 changes: 14 additions & 6 deletions src/frontend/src/handler/create_table.rs
@@ -699,12 +699,20 @@ fn gen_table_plan_inner(
vec![],
);

if append_only && row_id_index.is_none() {
return Err(ErrorCode::InvalidInputSyntax(
"PRIMARY KEY constraint can not be applied to an append-only table.".to_owned(),
)
.into());
}
let pk_on_append_only = append_only && row_id_index.is_none();

let on_conflict = if pk_on_append_only {
let on_conflict = on_conflict.unwrap_or(OnConflict::Ignore);
if on_conflict != OnConflict::Ignore {
return Err(ErrorCode::InvalidInputSyntax(
"When PRIMARY KEY constraint applied to an APPEND ONLY table, the ON CONFLICT behavior must be IGNORE.".to_owned(),
)
.into());
}
Some(on_conflict)
} else {
on_conflict
};

if !append_only && !watermark_descs.is_empty() {
return Err(ErrorCode::NotSupported(
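
In SQL terms, the new branch defaults an append-only table with a primary key to ON CONFLICT IGNORE and rejects any other setting, exactly as exercised in on_conflict.slt above:

-- OK: ON CONFLICT defaults to IGNORE.
create table t1 (v1 int, v2 int, v3 int, primary key(v1)) APPEND ONLY;

-- Error: OVERWRITE conflicts with append-only semantics.
create table t2 (v1 int, v2 int, v3 int, primary key(v1)) APPEND ONLY on conflict overwrite;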
25 changes: 14 additions & 11 deletions src/frontend/src/optimizer/mod.rs
@@ -676,8 +676,8 @@ impl PlanRoot {
#[derive(PartialEq, Debug, Copy, Clone)]
enum PrimaryKeyKind {
UserDefinedPrimaryKey,
RowIdAsPrimaryKey,
AppendOnly,
NonAppendOnlyRowIdPk,
AppendOnlyRowIdPk,
}

fn inject_dml_node(
@@ -694,25 +694,28 @@ impl PlanRoot {
dml_node = inject_project_for_generated_column_if_needed(columns, dml_node)?;

dml_node = match kind {
PrimaryKeyKind::UserDefinedPrimaryKey | PrimaryKeyKind::RowIdAsPrimaryKey => {
PrimaryKeyKind::UserDefinedPrimaryKey | PrimaryKeyKind::NonAppendOnlyRowIdPk => {
RequiredDist::hash_shard(pk_column_indices)
.enforce_if_not_satisfies(dml_node, &Order::any())?
}
PrimaryKeyKind::AppendOnly => StreamExchange::new_no_shuffle(dml_node).into(),
PrimaryKeyKind::AppendOnlyRowIdPk => {
StreamExchange::new_no_shuffle(dml_node).into()
}
};

Ok(dml_node)
}

let kind = if append_only {
assert!(row_id_index.is_some());
PrimaryKeyKind::AppendOnly
} else if let Some(row_id_index) = row_id_index {
let kind = if let Some(row_id_index) = row_id_index {
assert_eq!(
pk_column_indices.iter().exactly_one().copied().unwrap(),
row_id_index
);
PrimaryKeyKind::RowIdAsPrimaryKey
if append_only {
PrimaryKeyKind::AppendOnlyRowIdPk
} else {
PrimaryKeyKind::NonAppendOnlyRowIdPk
}
} else {
PrimaryKeyKind::UserDefinedPrimaryKey
};
Expand All @@ -739,7 +742,7 @@ impl PlanRoot {
.enforce_if_not_satisfies(external_source_node, &Order::any())?
}

PrimaryKeyKind::RowIdAsPrimaryKey | PrimaryKeyKind::AppendOnly => {
PrimaryKeyKind::NonAppendOnlyRowIdPk | PrimaryKeyKind::AppendOnlyRowIdPk => {
StreamExchange::new_no_shuffle(external_source_node).into()
}
};
@@ -815,7 +818,7 @@
PrimaryKeyKind::UserDefinedPrimaryKey => {
unreachable!()
}
PrimaryKeyKind::RowIdAsPrimaryKey | PrimaryKeyKind::AppendOnly => {
PrimaryKeyKind::NonAppendOnlyRowIdPk | PrimaryKeyKind::AppendOnlyRowIdPk => {
stream_plan = StreamRowIdGen::new_with_dist(
stream_plan,
row_id_index,

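The rename matters because append_only no longer implies a row-id primary key: an append-only table with a user-defined primary key now takes the UserDefinedPrimaryKey path, which hash-shards DML input on the pk columns, while both row-id kinds keep a no-shuffle exchange. The contrast is visible in the planner-test output above:

-- [no_shuffle] into StreamDml (AppendOnlyRowIdPk):
explain create table t (v1 int, v2 varchar) append only with ( connector = 'kafka', kafka.topic = 'kafka_3_partition_topic', kafka.brokers = '127.0.0.1:1234', kafka.scan.startup.mode='earliest' ) FORMAT PLAIN ENCODE JSON;

-- HashShard(v2) into StreamDml, even though the table is append-only (UserDefinedPrimaryKey):
explain create table t (v1 int, v2 varchar primary key) append only with ( connector = 'kafka', kafka.topic = 'kafka_3_partition_topic', kafka.brokers = '127.0.0.1:1234', kafka.scan.startup.mode='earliest' ) FORMAT PLAIN ENCODE JSON;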