From 4acc6a83d180dc87af6ba614c447e71c767df3bd Mon Sep 17 00:00:00 2001 From: Bugen Zhao Date: Tue, 24 Sep 2024 17:40:30 +0800 Subject: [PATCH] filter out singletons in table fragments max parallelism Signed-off-by: Bugen Zhao --- proto/meta.proto | 2 +- .../system_catalog/rw_catalog/rw_table_fragments.rs | 2 +- src/meta/service/src/stream_service.rs | 4 ++-- src/meta/src/model/stream.rs | 12 ++++++------ 4 files changed, 10 insertions(+), 10 deletions(-) diff --git a/proto/meta.proto b/proto/meta.proto index 36bf8c0f9eb0..bdd2ed570817 100644 --- a/proto/meta.proto +++ b/proto/meta.proto @@ -243,7 +243,7 @@ message ListTableFragmentStatesResponse { uint32 table_id = 1; TableFragments.State state = 2; TableParallelism parallelism = 3; - uint32 max_vnode_count = 4; + uint32 max_parallelism = 4; } repeated TableFragmentState states = 1; } diff --git a/src/frontend/src/catalog/system_catalog/rw_catalog/rw_table_fragments.rs b/src/frontend/src/catalog/system_catalog/rw_catalog/rw_table_fragments.rs index fce23b714494..8ef273174f36 100644 --- a/src/frontend/src/catalog/system_catalog/rw_catalog/rw_table_fragments.rs +++ b/src/frontend/src/catalog/system_catalog/rw_catalog/rw_table_fragments.rs @@ -41,7 +41,7 @@ async fn read_rw_table_fragments_info( table_id: state.table_id as i32, status: state.state().as_str_name().into(), parallelism: parallelism.to_uppercase(), - max_parallelism: state.max_vnode_count as i32, + max_parallelism: state.max_parallelism as i32, } }) .collect()) diff --git a/src/meta/service/src/stream_service.rs b/src/meta/service/src/stream_service.rs index 46214e3e5bb2..f16a5a6ea1f5 100644 --- a/src/meta/service/src/stream_service.rs +++ b/src/meta/service/src/stream_service.rs @@ -271,7 +271,7 @@ impl StreamManagerService for StreamServiceImpl { table_id: tf.table_id().table_id, state: tf.state() as i32, parallelism: Some(tf.assigned_parallelism.into()), - max_vnode_count: tf.max_vnode_count() as _, + max_parallelism: tf.max_parallelism() as _, }, ) .collect_vec() @@ -293,7 +293,7 @@ impl StreamManagerService for StreamServiceImpl { table_id: table_id as _, state: PbState::from(state) as _, parallelism: Some(parallelism.into()), - max_vnode_count: 0, // TODO(var-vnode): write query to obtain this from `fragment` + max_parallelism: 0, // TODO(var-vnode): write query to obtain this from `fragment` } }) .collect_vec() diff --git a/src/meta/src/model/stream.rs b/src/meta/src/model/stream.rs index bfe979a89032..da68ba469fa6 100644 --- a/src/meta/src/model/stream.rs +++ b/src/meta/src/model/stream.rs @@ -21,6 +21,7 @@ use risingwave_common::hash::{VirtualNode, VnodeCountCompat, WorkerSlotId}; use risingwave_connector::source::SplitImpl; use risingwave_pb::common::PbActorLocation; use risingwave_pb::meta::table_fragments::actor_status::ActorState; +use risingwave_pb::meta::table_fragments::fragment::FragmentDistributionType; use risingwave_pb::meta::table_fragments::{ActorStatus, Fragment, State}; use risingwave_pb::meta::table_parallelism::{ FixedParallelism, Parallelism, PbAdaptiveParallelism, PbCustomParallelism, PbFixedParallelism, @@ -282,16 +283,15 @@ impl TableFragments { self.ctx.timezone.clone() } - /// Returns the max vnode count of all fragments in the table. - /// - /// Setting the parallelism of a streaming job to a value greater than this will result in - /// an error, as there won't be any fragment that fits that value. - pub fn max_vnode_count(&self) -> usize { + /// Returns the maximum value of the `vnode_count` of all hash-distributed fragments. + /// Returns 1 if all fragments are singleton. + pub fn max_parallelism(&self) -> usize { self.fragments .values() + .filter(|f| matches!(f.distribution_type(), FragmentDistributionType::Hash)) .map(|f| f.vnode_count()) .max() - .expect("should be at least one fragment") + .unwrap_or(1) // if all fragments are singleton, return 1 } /// Returns whether the table fragments is in `Created` state.