From 85ca081a2ae75ae7560ce29323b2072351307867 Mon Sep 17 00:00:00 2001 From: Bugen Zhao Date: Tue, 24 Sep 2024 17:16:53 +0800 Subject: [PATCH] table max parallelism Signed-off-by: Bugen Zhao --- proto/meta.proto | 1 + .../rw_catalog/rw_streaming_parallelism.rs | 4 +++- .../rw_catalog/rw_table_fragments.rs | 2 ++ src/meta/service/src/stream_service.rs | 2 ++ src/meta/src/model/stream.rs | 14 +++++++++++++- 5 files changed, 21 insertions(+), 2 deletions(-) diff --git a/proto/meta.proto b/proto/meta.proto index 1ba8dd708b82..36bf8c0f9eb0 100644 --- a/proto/meta.proto +++ b/proto/meta.proto @@ -243,6 +243,7 @@ message ListTableFragmentStatesResponse { uint32 table_id = 1; TableFragments.State state = 2; TableParallelism parallelism = 3; + uint32 max_vnode_count = 4; } repeated TableFragmentState states = 1; } diff --git a/src/frontend/src/catalog/system_catalog/rw_catalog/rw_streaming_parallelism.rs b/src/frontend/src/catalog/system_catalog/rw_catalog/rw_streaming_parallelism.rs index 1d4a44d3b47c..84f9cbae78e7 100644 --- a/src/frontend/src/catalog/system_catalog/rw_catalog/rw_streaming_parallelism.rs +++ b/src/frontend/src/catalog/system_catalog/rw_catalog/rw_streaming_parallelism.rs @@ -31,7 +31,8 @@ use risingwave_frontend_macro::system_catalog; job.id, job.name, job.relation_type, - tf.parallelism + tf.parallelism, + tf.max_parallelism FROM all_streaming_jobs job INNER JOIN rw_table_fragments tf ON job.id = tf.table_id ORDER BY job.id" @@ -42,4 +43,5 @@ struct RwStreamingParallelism { name: String, relation_type: String, parallelism: String, + max_parallelism: i32, } diff --git a/src/frontend/src/catalog/system_catalog/rw_catalog/rw_table_fragments.rs b/src/frontend/src/catalog/system_catalog/rw_catalog/rw_table_fragments.rs index 6bfb835f1d53..fce23b714494 100644 --- a/src/frontend/src/catalog/system_catalog/rw_catalog/rw_table_fragments.rs +++ b/src/frontend/src/catalog/system_catalog/rw_catalog/rw_table_fragments.rs @@ -24,6 +24,7 @@ struct RwTableFragment { table_id: i32, status: String, parallelism: String, + max_parallelism: i32, } #[system_catalog(table, "rw_catalog.rw_table_fragments")] @@ -40,6 +41,7 @@ async fn read_rw_table_fragments_info( table_id: state.table_id as i32, status: state.state().as_str_name().into(), parallelism: parallelism.to_uppercase(), + max_parallelism: state.max_vnode_count as i32, } }) .collect()) diff --git a/src/meta/service/src/stream_service.rs b/src/meta/service/src/stream_service.rs index 9edefabdf9f1..46214e3e5bb2 100644 --- a/src/meta/service/src/stream_service.rs +++ b/src/meta/service/src/stream_service.rs @@ -271,6 +271,7 @@ impl StreamManagerService for StreamServiceImpl { table_id: tf.table_id().table_id, state: tf.state() as i32, parallelism: Some(tf.assigned_parallelism.into()), + max_vnode_count: tf.max_vnode_count() as _, }, ) .collect_vec() @@ -292,6 +293,7 @@ impl StreamManagerService for StreamServiceImpl { table_id: table_id as _, state: PbState::from(state) as _, parallelism: Some(parallelism.into()), + max_vnode_count: 0, // TODO(var-vnode): write query to obtain this from `fragment` } }) .collect_vec() diff --git a/src/meta/src/model/stream.rs b/src/meta/src/model/stream.rs index 954ee820e6a7..bfe979a89032 100644 --- a/src/meta/src/model/stream.rs +++ b/src/meta/src/model/stream.rs @@ -17,7 +17,7 @@ use std::ops::AddAssign; use itertools::Itertools; use risingwave_common::catalog::TableId; -use risingwave_common::hash::{VirtualNode, WorkerSlotId}; +use risingwave_common::hash::{VirtualNode, VnodeCountCompat, WorkerSlotId}; use risingwave_connector::source::SplitImpl; use risingwave_pb::common::PbActorLocation; use risingwave_pb::meta::table_fragments::actor_status::ActorState; @@ -282,6 +282,18 @@ impl TableFragments { self.ctx.timezone.clone() } + /// Returns the max vnode count of all fragments in the table. + /// + /// Setting the parallelism of a streaming job to a value greater than this will result in + /// an error, as there won't be any fragment that fits that value. + pub fn max_vnode_count(&self) -> usize { + self.fragments + .values() + .map(|f| f.vnode_count()) + .max() + .expect("should be at least one fragment") + } + /// Returns whether the table fragments is in `Created` state. pub fn is_created(&self) -> bool { self.state == State::Created