From 2d650db7c6932ec59c414a26b564ea71f290f5a5 Mon Sep 17 00:00:00 2001 From: xxchan Date: Fri, 16 Aug 2024 14:00:57 +0800 Subject: [PATCH] feat: track progress for SourceBackfill --- Cargo.lock | 1 + src/meta/Cargo.toml | 1 + src/meta/src/barrier/command.rs | 4 +++- src/meta/src/barrier/progress.rs | 9 +++++++++ src/meta/src/manager/metadata.rs | 1 + src/meta/src/model/stream.rs | 4 +++- .../executor/source/source_backfill_executor.rs | 17 +++++++++++++++++ src/stream/src/from_proto/source_backfill.rs | 4 ++++ .../src/task/barrier_manager/managed_state.rs | 3 +++ 9 files changed, 42 insertions(+), 2 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index c8bb3bb7afa86..90bc30c5f742e 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -11207,6 +11207,7 @@ dependencies = [ "comfy-table", "crepe", "easy-ext", + "educe", "either", "enum-as-inner 0.6.0", "expect-test", diff --git a/src/meta/Cargo.toml b/src/meta/Cargo.toml index 4511e9f61d894..a7f37bf505910 100644 --- a/src/meta/Cargo.toml +++ b/src/meta/Cargo.toml @@ -28,6 +28,7 @@ clap = { workspace = true } comfy-table = "7" crepe = "0.1" easy-ext = "1" +educe = "0.6" either = "1" enum-as-inner = "0.6" etcd-client = { workspace = true } diff --git a/src/meta/src/barrier/command.rs b/src/meta/src/barrier/command.rs index 0bea5f37940d6..ffdf8d3bad8bc 100644 --- a/src/meta/src/barrier/command.rs +++ b/src/meta/src/barrier/command.rs @@ -146,8 +146,10 @@ impl ReplaceTablePlan { } } -#[derive(Debug, Clone)] +#[derive(educe::Educe, Clone)] +#[educe(Debug)] pub struct CreateStreamingJobCommandInfo { + #[educe(Debug(ignore))] pub table_fragments: TableFragments, /// Refer to the doc on [`MetadataManager::get_upstream_root_fragments`] for the meaning of "root". pub upstream_root_actors: HashMap>, diff --git a/src/meta/src/barrier/progress.rs b/src/meta/src/barrier/progress.rs index 5754e4c60e364..a4630e75e91f0 100644 --- a/src/meta/src/barrier/progress.rs +++ b/src/meta/src/barrier/progress.rs @@ -122,6 +122,12 @@ impl Progress { /// Returns whether all backfill executors are done. fn is_done(&self) -> bool { + tracing::info!( + "Progress::is_done? {}, {}, {:?}", + self.done_count, + self.states.len(), + self.states + ); self.done_count == self.states.len() } @@ -274,6 +280,7 @@ pub(super) struct TrackingCommand { /// 4. With `actor_map` we can use an actor's `ActorId` to find the ID of the `StreamJob`. #[derive(Default, Debug)] pub(super) struct CreateMviewProgressTracker { + // XXX: which one should contain source backfill? /// Progress of the create-mview DDL indicated by the `TableId`. progress_map: HashMap, @@ -494,6 +501,7 @@ impl CreateMviewProgressTracker { replace_table: Option<&ReplaceTablePlan>, version_stats: &HummockVersionStats, ) -> Option { + tracing::info!(?info, "add job to track"); let (info, actors, replace_table_info) = { let CreateStreamingJobCommandInfo { table_fragments, .. @@ -596,6 +604,7 @@ impl CreateMviewProgressTracker { progress: &CreateMviewProgress, version_stats: &HummockVersionStats, ) -> Option { + tracing::debug!(?progress, "update progress"); let actor = progress.backfill_actor_id; let Some(table_id) = self.actor_map.get(&actor).copied() else { // On restart, backfill will ALWAYS notify CreateMviewProgressTracker, diff --git a/src/meta/src/manager/metadata.rs b/src/meta/src/manager/metadata.rs index 52fc811787d30..2963b5a10bc87 100644 --- a/src/meta/src/manager/metadata.rs +++ b/src/meta/src/manager/metadata.rs @@ -917,6 +917,7 @@ impl MetadataManager { &self, job: &StreamingJob, ) -> MetaResult { + tracing::info!("wait_streaming_job_finished: {job:?}"); match self { MetadataManager::V1(mgr) => mgr.wait_streaming_job_finished(job).await, MetadataManager::V2(mgr) => mgr.wait_streaming_job_finished(job.id() as _).await, diff --git a/src/meta/src/model/stream.rs b/src/meta/src/model/stream.rs index bec6b95cfb0f9..38d704ab0f6d1 100644 --- a/src/meta/src/model/stream.rs +++ b/src/meta/src/model/stream.rs @@ -362,7 +362,9 @@ impl TableFragments { return vec![]; } if (fragment.fragment_type_mask - & (FragmentTypeFlag::Values as u32 | FragmentTypeFlag::StreamScan as u32)) + & (FragmentTypeFlag::Values as u32 + | FragmentTypeFlag::StreamScan as u32 + | FragmentTypeFlag::SourceScan as u32)) != 0 { actor_ids.extend(fragment.actors.iter().map(|actor| actor.actor_id)); diff --git a/src/stream/src/executor/source/source_backfill_executor.rs b/src/stream/src/executor/source/source_backfill_executor.rs index 6182dd7b14da4..c5f01b645ac53 100644 --- a/src/stream/src/executor/source/source_backfill_executor.rs +++ b/src/stream/src/executor/source/source_backfill_executor.rs @@ -40,6 +40,7 @@ use crate::common::rate_limit::limited_chunk_size; use crate::executor::prelude::*; use crate::executor::source::source_executor::WAIT_BARRIER_MULTIPLE_TIMES; use crate::executor::{AddMutation, UpdateMutation}; +use crate::task::CreateMviewProgress; #[derive(Clone, Debug, Deserialize, Serialize, PartialEq)] pub enum BackfillState { @@ -88,6 +89,8 @@ pub struct SourceBackfillExecutorInner { /// Rate limit in rows/s. rate_limit_rps: Option, + + progress: CreateMviewProgress, } /// Local variables used in the backfill stage. @@ -238,6 +241,7 @@ impl SourceBackfillExecutorInner { system_params: SystemParamsReaderRef, backfill_state_store: BackfillStateTableHandler, rate_limit_rps: Option, + progress: CreateMviewProgress, ) -> Self { let source_split_change_count = metrics .source_split_change_count @@ -247,6 +251,7 @@ impl SourceBackfillExecutorInner { &actor_ctx.id.to_string(), &actor_ctx.fragment_id.to_string(), ]); + Self { actor_ctx, info, @@ -256,6 +261,7 @@ impl SourceBackfillExecutorInner { source_split_change_count, system_params, rate_limit_rps, + progress, } } @@ -554,6 +560,13 @@ impl SourceBackfillExecutorInner { ); } + // TODO: use a specialized progress for source? + // self.progress.update( + // barrier.epoch, + // snapshot_read_epoch, + // total_snapshot_processed_rows, + // ); + self.backfill_state_store .set_states(backfill_stage.states.clone()) .await?; @@ -673,6 +686,10 @@ impl SourceBackfillExecutorInner { _ => {} } } + // TODO: use a specialized progress for source? + tracing::error!("progress finish"); + self.progress.finish(barrier.epoch, 114514); + self.backfill_state_store .set_states( splits diff --git a/src/stream/src/from_proto/source_backfill.rs b/src/stream/src/from_proto/source_backfill.rs index ba3ab599af700..65329a26bd40b 100644 --- a/src/stream/src/from_proto/source_backfill.rs +++ b/src/stream/src/from_proto/source_backfill.rs @@ -72,6 +72,9 @@ impl ExecutorBuilder for SourceBackfillExecutorBuilder { source_desc_builder, state_table_handler, ); + let progress = params + .local_barrier_manager + .register_create_mview_progress(params.actor_context.id); let exec = SourceBackfillExecutorInner::new( params.actor_context.clone(), @@ -81,6 +84,7 @@ impl ExecutorBuilder for SourceBackfillExecutorBuilder { params.env.system_params_manager_ref().get_params(), backfill_state_table, node.rate_limit, + progress, ); let [input]: [_; 1] = params.input.try_into().unwrap(); diff --git a/src/stream/src/task/barrier_manager/managed_state.rs b/src/stream/src/task/barrier_manager/managed_state.rs index 5ccde5004801d..6f21e32adc107 100644 --- a/src/stream/src/task/barrier_manager/managed_state.rs +++ b/src/stream/src/task/barrier_manager/managed_state.rs @@ -372,6 +372,9 @@ pub(super) struct PartialGraphManagedBarrierState { prev_barrier_table_ids: Option<(EpochPair, HashSet)>, /// Record the progress updates of creating mviews for each epoch of concurrent checkpoints. + /// + /// This is updated by [`super::CreateMviewProgress::update`] and will be reported to meta + /// in [`BarrierCompleteResult`]. pub(super) create_mview_progress: HashMap>, pub(super) state_store: StateStoreImpl,