Skip to content

Commit

Permalink
use max_parallelism field
Browse files Browse the repository at this point in the history
Signed-off-by: Bugen Zhao <[email protected]>
  • Loading branch information
BugenZhao committed Sep 26, 2024
1 parent 7b55631 commit 1509b47
Show file tree
Hide file tree
Showing 2 changed files with 2 additions and 14 deletions.
2 changes: 1 addition & 1 deletion src/meta/service/src/stream_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -271,7 +271,7 @@ impl StreamManagerService for StreamServiceImpl {
table_id: tf.table_id().table_id,
state: tf.state() as i32,
parallelism: Some(tf.assigned_parallelism.into()),
max_parallelism: tf.max_parallelism() as _,
max_parallelism: tf.max_parallelism as _,
},
)
.collect_vec()
Expand Down
14 changes: 1 addition & 13 deletions src/meta/src/model/stream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,11 +17,10 @@ use std::ops::AddAssign;

use itertools::Itertools;
use risingwave_common::catalog::TableId;
use risingwave_common::hash::{VirtualNode, VnodeCountCompat, WorkerSlotId};
use risingwave_common::hash::{VirtualNode, WorkerSlotId};
use risingwave_connector::source::SplitImpl;
use risingwave_pb::common::PbActorLocation;
use risingwave_pb::meta::table_fragments::actor_status::ActorState;
use risingwave_pb::meta::table_fragments::fragment::FragmentDistributionType;
use risingwave_pb::meta::table_fragments::{ActorStatus, Fragment, State};
use risingwave_pb::meta::table_parallelism::{
FixedParallelism, Parallelism, PbAdaptiveParallelism, PbCustomParallelism, PbFixedParallelism,
Expand Down Expand Up @@ -283,17 +282,6 @@ impl TableFragments {
self.ctx.timezone.clone()
}

/// Returns the maximum value of the `vnode_count` of all hash-distributed fragments.
/// Returns 1 if all fragments are singleton.
pub fn max_parallelism(&self) -> usize {
self.fragments
.values()
.filter(|f| matches!(f.distribution_type(), FragmentDistributionType::Hash))
.map(|f| f.vnode_count())
.max()
.unwrap_or(1) // if all fragments are singleton, return 1
}

/// Returns whether the table fragments is in `Created` state.
pub fn is_created(&self) -> bool {
self.state == State::Created
Expand Down

0 comments on commit 1509b47

Please sign in to comment.