Skip to content

Commit

Permalink
meta table fragments
Browse files Browse the repository at this point in the history
Signed-off-by: Bugen Zhao <[email protected]>
  • Loading branch information
BugenZhao committed Sep 9, 2024
1 parent 16318a2 commit 31e2d77
Show file tree
Hide file tree
Showing 7 changed files with 39 additions and 6 deletions.
2 changes: 2 additions & 0 deletions proto/stream_plan.proto
Original file line number Diff line number Diff line change
Expand Up @@ -1014,4 +1014,6 @@ message StreamFragmentGraph {
StreamContext ctx = 5;
// If none, default parallelism will be applied.
Parallelism parallelism = 6;

optional uint32 maybe_vnode_count = 7;
}
7 changes: 7 additions & 0 deletions src/common/src/hash/consistent_hash/vnode.rs
Original file line number Diff line number Diff line change
Expand Up @@ -211,6 +211,13 @@ impl VnodeCountCompat for risingwave_pb::catalog::Table {
}
}

/// Compatibility shim: resolve the vnode count of a fragment graph built by an
/// older frontend that did not populate `maybe_vnode_count`.
impl VnodeCountCompat for risingwave_pb::stream_plan::StreamFragmentGraph {
fn vnode_count(&self) -> usize {
// Fall back to the compile-time default when the optional proto field is unset.
match self.maybe_vnode_count {
Some(count) => count as usize,
None => VirtualNode::COUNT,
}
}
}

#[cfg(test)]
mod tests {
use super::*;
Expand Down
1 change: 1 addition & 0 deletions src/frontend/src/stream_fragmenter/graph/fragment_graph.rs
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,7 @@ impl StreamFragmentGraph {
dependent_table_ids: vec![],
table_ids_cnt: 0,
parallelism: None,
maybe_vnode_count: None,
}
}

Expand Down
2 changes: 2 additions & 0 deletions src/meta/src/stream/stream_graph/actor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -665,13 +665,15 @@ impl ActorGraphBuilder {
cluster_info: StreamingClusterInfo,
default_parallelism: NonZeroUsize,
) -> MetaResult<Self> {
let vnode_count = fragment_graph.vnode_count();
let existing_distributions = fragment_graph.existing_distribution();

// Schedule the distribution of all building fragments.
let scheduler = schedule::Scheduler::new(
streaming_job_id,
&cluster_info.worker_nodes,
default_parallelism,
vnode_count,
)?;
let distributions = scheduler.schedule(&fragment_graph)?;

Expand Down
24 changes: 19 additions & 5 deletions src/meta/src/stream/stream_graph/fragment.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ use risingwave_common::bail;
use risingwave_common::catalog::{
generate_internal_table_name_with_type, TableId, CDC_SOURCE_COLUMN_NUM,
};
use risingwave_common::hash::VnodeCountCompat;
use risingwave_common::util::iter_util::ZipEqFast;
use risingwave_common::util::stream_graph_visitor;
use risingwave_common::util::stream_graph_visitor::visit_stream_node_cont;
Expand Down Expand Up @@ -319,6 +320,8 @@ pub struct StreamFragmentGraph {
/// The default parallelism of the job, specified by the `STREAMING_PARALLELISM` session
/// variable. If not specified, all active worker slots will be used.
specified_parallelism: Option<NonZeroUsize>,

vnode_count: usize,
}

impl StreamFragmentGraph {
Expand All @@ -343,10 +346,10 @@ impl StreamFragmentGraph {
// Create nodes.
let fragments: HashMap<_, _> = proto
.fragments
.into_iter()
.map(|(id, fragment)| {
.iter()
.map(|(&id, fragment)| {
let id = fragment_id_gen.to_global_id(id);
let fragment = BuildingFragment::new(id, fragment, job, table_id_gen);
let fragment = BuildingFragment::new(id, fragment.clone(), job, table_id_gen);
(id, fragment)
})
.collect();
Expand All @@ -363,10 +366,10 @@ impl StreamFragmentGraph {
let mut downstreams = HashMap::new();
let mut upstreams = HashMap::new();

for edge in proto.edges {
for edge in &proto.edges {
let upstream_id = fragment_id_gen.to_global_id(edge.upstream_id);
let downstream_id = fragment_id_gen.to_global_id(edge.downstream_id);
let edge = StreamFragmentEdge::from_protobuf(&edge);
let edge = StreamFragmentEdge::from_protobuf(edge);

upstreams
.entry(downstream_id)
Expand Down Expand Up @@ -394,12 +397,15 @@ impl StreamFragmentGraph {
None
};

let vnode_count = proto.vnode_count();

Ok(Self {
fragments,
downstreams,
upstreams,
dependent_table_ids,
specified_parallelism,
vnode_count,
})
}

Expand Down Expand Up @@ -499,6 +505,10 @@ impl StreamFragmentGraph {
self.specified_parallelism
}

/// Returns the vnode count this graph was built with.
///
/// NOTE(review): populated from the proto's `maybe_vnode_count` during
/// construction, falling back to a default when unset — confirm against
/// the `VnodeCountCompat` impl for `StreamFragmentGraph`.
pub fn vnode_count(&self) -> usize {
self.vnode_count
}

/// Get downstreams of a fragment.
fn get_downstreams(
&self,
Expand Down Expand Up @@ -1148,4 +1158,8 @@ impl CompleteStreamFragmentGraph {
/// Returns a borrow of the building fragments of the underlying building graph,
/// keyed by their global fragment id.
pub(super) fn building_fragments(&self) -> &HashMap<GlobalFragmentId, BuildingFragment> {
&self.building_graph.fragments
}

/// Returns the vnode count of the job, delegating to the underlying building graph.
pub(super) fn vnode_count(&self) -> usize {
self.building_graph.vnode_count()
}
}
8 changes: 7 additions & 1 deletion src/meta/src/stream/stream_graph/schedule.rs
Original file line number Diff line number Diff line change
Expand Up @@ -214,6 +214,7 @@ impl Scheduler {
streaming_job_id: u32,
workers: &HashMap<u32, WorkerNode>,
default_parallelism: NonZeroUsize,
vnode_count: usize,
) -> MetaResult<Self> {
// Group worker slots with worker node.

Expand All @@ -223,6 +224,11 @@ impl Scheduler {
.collect();

let parallelism = default_parallelism.get();
assert!(
parallelism <= vnode_count,
"parallelism should be limited by vnode count in previous steps"
);

let scheduled = schedule_units_for_slots(&slots, parallelism, streaming_job_id)?;

let scheduled_worker_slots = scheduled
Expand All @@ -236,7 +242,7 @@ impl Scheduler {

// Build the default hash mapping uniformly.
let default_hash_mapping =
WorkerSlotMapping::build_from_ids(&scheduled_worker_slots, VirtualNode::COUNT);
WorkerSlotMapping::build_from_ids(&scheduled_worker_slots, vnode_count);

let single_scheduled = schedule_units_for_slots(&slots, 1, streaming_job_id)?;
let default_single_worker_id = single_scheduled.keys().exactly_one().cloned().unwrap();
Expand Down
1 change: 1 addition & 0 deletions src/meta/src/stream/test_fragmenter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -415,6 +415,7 @@ fn make_stream_graph() -> StreamFragmentGraphProto {
dependent_table_ids: vec![],
table_ids_cnt: 3,
parallelism: None,
maybe_vnode_count: None,
}
}

Expand Down

0 comments on commit 31e2d77

Please sign in to comment.