feat: track progress for SourceBackfill
xxchan committed Sep 2, 2024
1 parent efc2a77 commit a0ddd6d
Showing 9 changed files with 55 additions and 10 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default.

1 change: 1 addition & 0 deletions src/meta/Cargo.toml
@@ -28,6 +28,7 @@ clap = { workspace = true }
 comfy-table = "7"
 crepe = "0.1"
 easy-ext = "1"
+educe = "0.6"
 either = "1"
 enum-as-inner = "0.6"
 etcd-client = { workspace = true }
4 changes: 3 additions & 1 deletion src/meta/src/barrier/command.rs
@@ -146,8 +146,10 @@ impl ReplaceTablePlan {
     }
 }
 
-#[derive(Debug, Clone)]
+#[derive(educe::Educe, Clone)]
+#[educe(Debug)]
 pub struct CreateStreamingJobCommandInfo {
+    #[educe(Debug(ignore))]
     pub table_fragments: TableFragments,
     /// Refer to the doc on [`MetadataManager::get_upstream_root_fragments`] for the meaning of "root".
     pub upstream_root_actors: HashMap<TableId, Vec<ActorId>>,
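The switch from a plain `#[derive(Debug)]` to `educe` keeps `CreateStreamingJobCommandInfo` printable in the trace logs added below while hiding the bulky `table_fragments` field. A minimal sketch of the pattern, using an illustrative struct rather than the one above:

```rust
// Derive Debug via educe but skip a large field, so log lines stay readable.
// `JobInfo` and its fields are stand-ins, not types from this commit.
use educe::Educe;

#[derive(Educe, Clone)]
#[educe(Debug)]
struct JobInfo {
    #[educe(Debug(ignore))]
    fragments: Vec<u8>, // large payload, omitted from Debug output
    name: String,
}

fn main() {
    let info = JobInfo {
        fragments: vec![0; 4096],
        name: "mv_orders".into(),
    };
    // Prints roughly `JobInfo { name: "mv_orders" }`, with no 4096-byte dump.
    println!("{info:?}");
}
```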
9 changes: 9 additions & 0 deletions src/meta/src/barrier/progress.rs
@@ -122,6 +122,12 @@ impl Progress {
 
     /// Returns whether all backfill executors are done.
     fn is_done(&self) -> bool {
+        tracing::trace!(
+            "Progress::is_done? {}, {}, {:?}",
+            self.done_count,
+            self.states.len(),
+            self.states
+        );
         self.done_count == self.states.len()
     }
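The trace line exposes the completion condition: a job counts as done once `done_count` catches up with the number of tracked backfill actors. A simplified sketch of that invariant, with stand-in types rather than the meta node's own:

```rust
use std::collections::HashMap;

type ActorId = u32;

#[derive(Debug, PartialEq)]
enum BackfillState {
    ConsumingUpstream,
    Done,
}

struct Progress {
    states: HashMap<ActorId, BackfillState>,
    done_count: usize,
}

impl Progress {
    /// Record a report from one actor, bumping `done_count` only on the
    /// transition into `Done` so repeated reports cannot overcount.
    fn update(&mut self, actor: ActorId, state: BackfillState) {
        let was_done = self.states.get(&actor) == Some(&BackfillState::Done);
        if state == BackfillState::Done && !was_done {
            self.done_count += 1;
        }
        self.states.insert(actor, state);
    }

    fn is_done(&self) -> bool {
        self.done_count == self.states.len()
    }
}
```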

@@ -274,6 +280,7 @@ pub(super) struct TrackingCommand {
 /// 4. With `actor_map` we can use an actor's `ActorId` to find the ID of the `StreamJob`.
 #[derive(Default, Debug)]
 pub(super) struct CreateMviewProgressTracker {
+    // XXX: which one should contain source backfill?
     /// Progress of the create-mview DDL indicated by the `TableId`.
     progress_map: HashMap<TableId, (Progress, TrackingJob)>,
 
@@ -494,6 +501,7 @@ impl CreateMviewProgressTracker {
         replace_table: Option<&ReplaceTablePlan>,
         version_stats: &HummockVersionStats,
     ) -> Option<TrackingJob> {
+        tracing::trace!(?info, "add job to track");
         let (info, actors, replace_table_info) = {
             let CreateStreamingJobCommandInfo {
                 table_fragments, ..
@@ -596,6 +604,7 @@ impl CreateMviewProgressTracker {
         progress: &CreateMviewProgress,
         version_stats: &HummockVersionStats,
     ) -> Option<TrackingJob> {
+        tracing::trace!(?progress, "update progress");
         let actor = progress.backfill_actor_id;
         let Some(table_id) = self.actor_map.get(&actor).copied() else {
             // On restart, backfill will ALWAYS notify CreateMviewProgressTracker,
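A progress report only carries an `ActorId`, so the tracker first resolves the owning job through `actor_map` and tolerates unknown actors (the restart case the comment above describes). Roughly, with simplified types:

```rust
use std::collections::HashMap;

type ActorId = u32;
type TableId = u32;

struct Tracker {
    actor_map: HashMap<ActorId, TableId>,
    // Per-job progress, reduced here to a count of done actors.
    progress_map: HashMap<TableId, usize>,
}

impl Tracker {
    fn update(&mut self, actor: ActorId) -> Option<TableId> {
        // An unknown actor can be a report that arrives after a restart,
        // before the tracker is rebuilt; ignore it instead of panicking.
        let table_id = self.actor_map.get(&actor).copied()?;
        *self.progress_map.entry(table_id).or_insert(0) += 1;
        Some(table_id)
    }
}
```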
1 change: 1 addition & 0 deletions src/meta/src/manager/metadata.rs
@@ -917,6 +917,7 @@ impl MetadataManager {
         &self,
         job: &StreamingJob,
     ) -> MetaResult<NotificationVersion> {
+        tracing::debug!("wait_streaming_job_finished: {job:?}");
         match self {
             MetadataManager::V1(mgr) => mgr.wait_streaming_job_finished(job).await,
             MetadataManager::V2(mgr) => mgr.wait_streaming_job_finished(job.id() as _).await,
4 changes: 3 additions & 1 deletion src/meta/src/model/stream.rs
@@ -362,7 +362,9 @@ impl TableFragments {
             return vec![];
         }
         if (fragment.fragment_type_mask
-            & (FragmentTypeFlag::Values as u32 | FragmentTypeFlag::StreamScan as u32))
+            & (FragmentTypeFlag::Values as u32
+                | FragmentTypeFlag::StreamScan as u32
+                | FragmentTypeFlag::SourceScan as u32))
             != 0
         {
             actor_ids.extend(fragment.actors.iter().map(|actor| actor.actor_id));
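This hunk adds `SourceScan` to the set of fragment types whose actors report backfill progress; the test works because every flag occupies a distinct bit. A self-contained sketch with illustrative flag values, not the real protobuf constants:

```rust
#[repr(u32)]
#[derive(Clone, Copy)]
enum FragmentTypeFlag {
    Values = 1 << 0,
    StreamScan = 1 << 1,
    SourceScan = 1 << 2,
}

/// True if the fragment's type mask intersects any backfill-root flag.
fn reports_backfill_progress(fragment_type_mask: u32) -> bool {
    (fragment_type_mask
        & (FragmentTypeFlag::Values as u32
            | FragmentTypeFlag::StreamScan as u32
            | FragmentTypeFlag::SourceScan as u32))
        != 0
}

fn main() {
    assert!(reports_backfill_progress(FragmentTypeFlag::SourceScan as u32));
    // A mask may carry several flags at once; unrelated bits are ignored.
    let mixed = FragmentTypeFlag::StreamScan as u32 | (1 << 10);
    assert!(reports_backfill_progress(mixed));
    assert!(!reports_backfill_progress(1 << 10));
}
```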
38 changes: 30 additions & 8 deletions src/stream/src/executor/source/source_backfill_executor.rs
@@ -40,6 +40,7 @@ use crate::common::rate_limit::limited_chunk_size;
 use crate::executor::prelude::*;
 use crate::executor::source::source_executor::WAIT_BARRIER_MULTIPLE_TIMES;
 use crate::executor::{AddMutation, UpdateMutation};
+use crate::task::CreateMviewProgress;

 #[derive(Clone, Debug, Deserialize, Serialize, PartialEq)]
 pub enum BackfillState {

@@ -88,6 +89,8 @@ pub struct SourceBackfillExecutorInner<S: StateStore> {
 
     /// Rate limit in rows/s.
     rate_limit_rps: Option<u32>,
+
+    progress: CreateMviewProgress,
 }
 
 /// Local variables used in the backfill stage.
@@ -238,6 +241,7 @@ impl<S: StateStore> SourceBackfillExecutorInner<S> {
         system_params: SystemParamsReaderRef,
         backfill_state_store: BackfillStateTableHandler<S>,
         rate_limit_rps: Option<u32>,
+        progress: CreateMviewProgress,
     ) -> Self {
         let source_split_change_count = metrics
             .source_split_change_count
@@ -247,6 +251,7 @@
                 &actor_ctx.id.to_string(),
                 &actor_ctx.fragment_id.to_string(),
             ]);
+
         Self {
             actor_ctx,
             info,
@@ -256,6 +261,7 @@
             source_split_change_count,
             system_params,
             rate_limit_rps,
+            progress,
         }
     }
 
@@ -554,6 +560,13 @@ impl<S: StateStore> SourceBackfillExecutorInner<S> {
             );
         }
 
+        // TODO: use a specialized progress for source?
+        // self.progress.update(
+        //     barrier.epoch,
+        //     snapshot_read_epoch,
+        //     total_snapshot_processed_rows,
+        // );
+
         self.backfill_state_store
             .set_states(backfill_stage.states.clone())
             .await?;
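The `update` call stays commented out because the mview-style signature is epoch-oriented, while source backfill tracks per-split offsets (its states are keyed by split, as the hunks below show). A specialized report would more plausibly aggregate rows consumed across splits. A hypothetical sketch, not an API this commit adds:

```rust
use std::collections::HashMap;

type SplitId = String;

// Simplified per-split progress; the real state also carries source offsets.
struct SplitProgress {
    consumed_rows: u64,
}

/// What a source-specific progress report could feed to the tracker in place
/// of a snapshot epoch: total rows consumed across all splits.
fn total_consumed_rows(states: &HashMap<SplitId, SplitProgress>) -> u64 {
    states.values().map(|s| s.consumed_rows).sum()
}
```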
@@ -637,6 +650,7 @@ impl<S: StateStore> SourceBackfillExecutorInner<S> {
             }
         }
 
+        let mut first_barrier_after_finish = true;
         let mut splits: HashSet<SplitId> = backfill_stage.states.keys().cloned().collect();
 
         // All splits finished backfilling. Now we only forward the source data.
@@ -673,14 +687,22 @@ impl<S: StateStore> SourceBackfillExecutorInner<S> {
                     _ => {}
                 }
             }
-            self.backfill_state_store
-                .set_states(
-                    splits
-                        .iter()
-                        .map(|s| (s.clone(), BackfillState::Finished))
-                        .collect(),
-                )
-                .await?;
+
+            if first_barrier_after_finish {
+                // Update state and report progress after the first barrier.
+                // TODO: use a specialized progress for source
+                tracing::error!("progress finish");
+                self.progress.finish(barrier.epoch, 114514);
+                self.backfill_state_store
+                    .set_states(
+                        splits
+                            .iter()
+                            .map(|s| (s.clone(), BackfillState::Finished))
+                            .collect(),
+                    )
+                    .await?;
+            }
+            first_barrier_after_finish = false;
             self.backfill_state_store
                 .state_store
                 .commit(barrier.epoch)
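Note the `first_barrier_after_finish` latch: `finish` must be reported exactly once, on the first barrier after all splits complete, and the row count passed here is still a placeholder (`114514`) pending a source-specific progress type. The control flow, reduced to a skeleton with a stand-in `Progress` type:

```rust
/// Stand-in for the real CreateMviewProgress handle.
struct Progress {
    finished: bool,
}

impl Progress {
    fn finish(&mut self, epoch: u64, consumed_rows: u64) {
        println!("finish reported at epoch {epoch}: {consumed_rows} rows");
        self.finished = true;
    }
}

fn on_barrier(progress: &mut Progress, first_barrier_after_finish: &mut bool, epoch: u64) {
    if *first_barrier_after_finish {
        // Runs once: report completion (and, in the real code, persist the
        // Finished state for every split).
        progress.finish(epoch, 0 /* placeholder row count */);
    }
    *first_barrier_after_finish = false;
    // Every barrier still commits the state-store epoch afterwards.
}
```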
4 changes: 4 additions & 0 deletions src/stream/src/from_proto/source_backfill.rs
@@ -72,6 +72,9 @@ impl ExecutorBuilder for SourceBackfillExecutorBuilder {
             source_desc_builder,
             state_table_handler,
         );
+        let progress = params
+            .local_barrier_manager
+            .register_create_mview_progress(params.actor_context.id);
 
         let exec = SourceBackfillExecutorInner::new(
             params.actor_context.clone(),
@@ -81,6 +84,7 @@ impl ExecutorBuilder for SourceBackfillExecutorBuilder {
             params.env.system_params_manager_ref().get_params(),
             backfill_state_table,
             node.rate_limit,
+            progress,
         );
         let [input]: [_; 1] = params.input.try_into().unwrap();
3 changes: 3 additions & 0 deletions src/stream/src/task/barrier_manager/managed_state.rs
@@ -372,6 +372,9 @@ pub(super) struct PartialGraphManagedBarrierState {
     prev_barrier_table_ids: Option<(EpochPair, HashSet<TableId>)>,
 
     /// Record the progress updates of creating mviews for each epoch of concurrent checkpoints.
+    ///
+    /// This is updated by [`super::CreateMviewProgress::update`] and will be reported to meta
+    /// in [`BarrierCompleteResult`].
     pub(super) create_mview_progress: HashMap<u64, HashMap<ActorId, BackfillState>>,
 
     pub(super) state_store: StateStoreImpl,
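The new doc comment pins down the lifecycle of `create_mview_progress`: actors write their latest state into the map for the current epoch, and the per-epoch map is shipped to the meta node when that barrier completes. A simplified sketch of that flow, with stand-in types:

```rust
use std::collections::HashMap;

type ActorId = u32;

#[derive(Debug, Clone)]
enum BackfillState {
    ConsumingUpstream { consumed_rows: u64 },
    Done { consumed_rows: u64 },
}

#[derive(Default)]
struct ProgressBuffer {
    // epoch -> (actor -> latest reported state)
    create_mview_progress: HashMap<u64, HashMap<ActorId, BackfillState>>,
}

impl ProgressBuffer {
    /// Called from an actor's progress handle while an epoch is in flight.
    fn record(&mut self, epoch: u64, actor: ActorId, state: BackfillState) {
        self.create_mview_progress
            .entry(epoch)
            .or_default()
            .insert(actor, state);
    }

    /// Called when the barrier for `epoch` completes; the drained map is what
    /// gets reported to meta as part of the barrier-complete result.
    fn take_for_epoch(&mut self, epoch: u64) -> HashMap<ActorId, BackfillState> {
        self.create_mview_progress.remove(&epoch).unwrap_or_default()
    }
}
```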
