feat: track progress for SourceBackfill (blocking DDL) #18112

Merged · 2 commits · Sep 6, 2024
1 change: 1 addition & 0 deletions Cargo.lock

(Generated file; diff not rendered by default.)

1 change: 1 addition & 0 deletions src/meta/Cargo.toml
@@ -28,6 +28,7 @@ clap = { workspace = true }
comfy-table = "7"
crepe = "0.1"
easy-ext = "1"
educe = "0.6"
either = "1"
enum-as-inner = "0.6"
etcd-client = { workspace = true }
4 changes: 3 additions & 1 deletion src/meta/src/barrier/command.rs
@@ -146,8 +146,10 @@ impl ReplaceTablePlan {
}
}

#[derive(Debug, Clone)]
#[derive(educe::Educe, Clone)]
#[educe(Debug)]
pub struct CreateStreamingJobCommandInfo {
#[educe(Debug(ignore))]
pub table_fragments: TableFragments,
/// Refer to the doc on [`MetadataManager::get_upstream_root_fragments`] for the meaning of "root".
pub upstream_root_actors: HashMap<TableId, Vec<ActorId>>,
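
Switching from #[derive(Debug)] to educe keeps CreateStreamingJobCommandInfo printable in logs while skipping the bulky table_fragments field. A minimal sketch of the same educe pattern, assuming the educe 0.6 crate added above; JobInfo and its fields are made up for the example:

use educe::Educe;

#[derive(Educe, Clone)]
#[educe(Debug)]
struct JobInfo {
    /// Large field skipped in Debug output to keep logs readable.
    #[educe(Debug(ignore))]
    huge_payload: Vec<u8>,
    job_id: u32,
}

fn main() {
    let info = JobInfo { huge_payload: vec![0; 1024], job_id: 42 };
    // Prints roughly `JobInfo { job_id: 42 }`; `huge_payload` is omitted.
    println!("{info:?}");
}
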
10 changes: 10 additions & 0 deletions src/meta/src/barrier/progress.rs
@@ -55,6 +55,7 @@ pub(super) struct Progress {
upstream_mv_count: HashMap<TableId, usize>,

/// Total key count in the upstream materialized view
/// TODO: implement this for source backfill
upstream_total_key_count: u64,

/// Consumed rows
@@ -122,6 +123,12 @@

/// Returns whether all backfill executors are done.
fn is_done(&self) -> bool {
tracing::trace!(
"Progress::is_done? {}, {}, {:?}",
self.done_count,
self.states.len(),
self.states
);
self.done_count == self.states.len()
}

@@ -274,6 +281,7 @@ pub(super) struct TrackingCommand {
/// 4. With `actor_map` we can use an actor's `ActorId` to find the ID of the `StreamJob`.
#[derive(Default, Debug)]
pub(super) struct CreateMviewProgressTracker {
// TODO: add a specialized progress for source
/// Progress of the create-mview DDL indicated by the `TableId`.
progress_map: HashMap<TableId, (Progress, TrackingJob)>,

@@ -494,6 +502,7 @@ impl CreateMviewProgressTracker {
replace_table: Option<&ReplaceTablePlan>,
version_stats: &HummockVersionStats,
) -> Option<TrackingJob> {
tracing::trace!(?info, "add job to track");
let (info, actors, replace_table_info) = {
let CreateStreamingJobCommandInfo {
table_fragments, ..
@@ -596,6 +605,7 @@ impl CreateMviewProgressTracker {
progress: &CreateMviewProgress,
version_stats: &HummockVersionStats,
) -> Option<TrackingJob> {
tracing::trace!(?progress, "update progress");
let actor = progress.backfill_actor_id;
let Some(table_id) = self.actor_map.get(&actor).copied() else {
// On restart, backfill will ALWAYS notify CreateMviewProgressTracker,
1 change: 1 addition & 0 deletions src/meta/src/manager/metadata.rs
@@ -917,6 +917,7 @@ impl MetadataManager {
&self,
job: &StreamingJob,
) -> MetaResult<NotificationVersion> {
tracing::debug!("wait_streaming_job_finished: {job:?}");
match self {
MetadataManager::V1(mgr) => mgr.wait_streaming_job_finished(job).await,
MetadataManager::V2(mgr) => mgr.wait_streaming_job_finished(job.id() as _).await,
4 changes: 3 additions & 1 deletion src/meta/src/model/stream.rs
@@ -362,7 +362,9 @@ impl TableFragments {
return vec![];
}
if (fragment.fragment_type_mask
& (FragmentTypeFlag::Values as u32 | FragmentTypeFlag::StreamScan as u32))
& (FragmentTypeFlag::Values as u32
| FragmentTypeFlag::StreamScan as u32
| FragmentTypeFlag::SourceScan as u32))
Comment on lines +365 to +367 (Member Author): This is core change 1.

!= 0
{
actor_ids.extend(fragment.actors.iter().map(|actor| actor.actor_id));
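
The actor-collection logic above keys off bits in fragment_type_mask; adding the SourceScan bit is what makes source backfill fragments report progress. A rough standalone illustration of the mask test, with hypothetical bit values (the real ones come from the generated FragmentTypeFlag):

// Hypothetical bit values, for illustration only.
#[repr(u32)]
enum FragmentTypeFlag {
    Values = 1 << 0,
    StreamScan = 1 << 1,
    SourceScan = 1 << 2,
}

/// A fragment is tracked for backfill progress if any of the relevant bits are set.
fn tracks_backfill_progress(fragment_type_mask: u32) -> bool {
    (fragment_type_mask
        & (FragmentTypeFlag::Values as u32
            | FragmentTypeFlag::StreamScan as u32
            | FragmentTypeFlag::SourceScan as u32))
        != 0
}

fn main() {
    // With the SourceScan bit included, a source backfill fragment is now tracked.
    assert!(tracks_backfill_progress(FragmentTypeFlag::SourceScan as u32));
    assert!(!tracks_backfill_progress(0));
}
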
2 changes: 2 additions & 0 deletions src/meta/src/stream/scale.rs
@@ -441,6 +441,8 @@ pub struct ScaleController {

pub env: MetaSrvEnv,

/// We will acquire the lock during DDL to prevent scaling operations on jobs that are in the creating state,
/// e.g., an MV cannot be rescheduled during foreground backfill.
pub reschedule_lock: RwLock<()>,
}

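
The new doc comment describes the intent of reschedule_lock: DDL and scaling contend on one RwLock, so a job that is still backfilling cannot be rescheduled underneath the DDL. A minimal sketch of that pattern, assuming the DDL path holds a read guard for the whole backfill while rescheduling takes the write guard (the actual read/write split used by RisingWave is not shown in this hunk):

use std::sync::Arc;
use tokio::sync::RwLock;

#[derive(Clone)]
struct ScaleControllerSketch {
    reschedule_lock: Arc<RwLock<()>>,
}

impl ScaleControllerSketch {
    async fn create_job_with_backfill(&self) {
        // Held until the foreground backfill finishes; rescheduling waits meanwhile.
        let _guard = self.reschedule_lock.read().await;
        // ... run backfill ...
    }

    async fn reschedule(&self) {
        // Waits for all in-flight DDL guards to be dropped before moving actors.
        let _guard = self.reschedule_lock.write().await;
        // ... move actors between workers ...
    }
}

#[tokio::main]
async fn main() {
    let ctl = ScaleControllerSketch { reschedule_lock: Arc::new(RwLock::new(())) };
    ctl.create_job_with_backfill().await;
    ctl.reschedule().await;
}
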
81 changes: 71 additions & 10 deletions src/stream/src/executor/source/source_backfill_executor.rs
@@ -14,6 +14,7 @@

use std::cmp::Ordering;
use std::collections::{HashMap, HashSet};
use std::sync::Once;
use std::time::Instant;

use anyhow::anyhow;
@@ -30,6 +31,7 @@ use risingwave_connector::source::{
BackfillInfo, BoxChunkSourceStream, SourceContext, SourceCtrlOpts, SplitId, SplitImpl,
SplitMetaData,
};
use risingwave_hummock_sdk::HummockReadEpoch;
use serde::{Deserialize, Serialize};
use thiserror_ext::AsReport;

@@ -40,6 +42,7 @@ use crate::common::rate_limit::limited_chunk_size;
use crate::executor::prelude::*;
use crate::executor::source::source_executor::WAIT_BARRIER_MULTIPLE_TIMES;
use crate::executor::UpdateMutation;
use crate::task::CreateMviewProgress;

#[derive(Clone, Debug, Deserialize, Serialize, PartialEq)]
pub enum BackfillState {
@@ -88,6 +91,8 @@ pub struct SourceBackfillExecutorInner<S: StateStore> {

/// Rate limit in rows/s.
rate_limit_rps: Option<u32>,

progress: CreateMviewProgress,
}

/// Local variables used in the backfill stage.
@@ -230,6 +235,7 @@ impl BackfillStage {
}

impl<S: StateStore> SourceBackfillExecutorInner<S> {
#[expect(clippy::too_many_arguments)]
pub fn new(
actor_ctx: ActorContextRef,
info: ExecutorInfo,
@@ -238,6 +244,7 @@ impl<S: StateStore> SourceBackfillExecutorInner<S> {
system_params: SystemParamsReaderRef,
backfill_state_store: BackfillStateTableHandler<S>,
rate_limit_rps: Option<u32>,
progress: CreateMviewProgress,
) -> Self {
let source_split_change_count = metrics
.source_split_change_count
@@ -247,6 +254,7 @@ impl<S: StateStore> SourceBackfillExecutorInner<S> {
&actor_ctx.id.to_string(),
&actor_ctx.fragment_id.to_string(),
]);

Self {
actor_ctx,
info,
@@ -256,6 +264,7 @@ impl<S: StateStore> SourceBackfillExecutorInner<S> {
source_split_change_count,
system_params,
rate_limit_rps,
progress,
}
}

@@ -346,7 +355,6 @@ impl<S: StateStore> SourceBackfillExecutorInner<S> {
splits: owned_splits,
};
backfill_stage.debug_assert_consistent();
tracing::debug!(?backfill_stage, "source backfill started");

// Return the ownership of `stream_source_core` to the source executor.
self.stream_source_core = core;
@@ -370,6 +378,7 @@ impl<S: StateStore> SourceBackfillExecutorInner<S> {
}
}
}
tracing::debug!(?backfill_stage, "source backfill started");

fn select_strategy(_: &mut ()) -> PollNext {
futures::stream::PollNext::Left
@@ -407,9 +416,23 @@ impl<S: StateStore> SourceBackfillExecutorInner<S> {
pause_reader!();
}

let state_store = self.backfill_state_store.state_store.state_store().clone();
static STATE_TABLE_INITIALIZED: Once = Once::new();
tokio::spawn(async move {
// This is for self.backfill_finished() to be safe.
// We wait for 1st epoch's curr, i.e., the 2nd epoch's prev.
let epoch = barrier.epoch.curr;
tracing::info!("waiting for epoch: {}", epoch);
state_store
.try_wait_epoch(HummockReadEpoch::Committed(epoch))
.await
.expect("failed to wait epoch");
STATE_TABLE_INITIALIZED.call_once(|| ());
tracing::info!("finished waiting for epoch: {}", epoch);
});
yield Message::Barrier(barrier);

if !self.backfill_finished(&backfill_stage.states).await? {
{
Comment on lines -412 to +435 (Member Author): Remove the early exit condition; let it exit in only one place to make it safer and simpler.

let source_backfill_row_count = self
.metrics
.source_backfill_row_count
@@ -552,10 +575,26 @@ impl<S: StateStore> SourceBackfillExecutorInner<S> {
.commit(barrier.epoch)
.await?;

yield Message::Barrier(barrier);

if self.backfill_finished(&backfill_stage.states).await? {
break 'backfill_loop;
if self.should_report_finished(&backfill_stage.states) {
// TODO: use a specialized progress for source

fuyufjh (Member) commented on Sep 3, 2024:

An idea: We may report the processed and total rows in the progress as well.

current (show jobs):

  Id  |                     Statement                      | Progress
------+----------------------------------------------------+----------
 1010 | CREATE MATERIALIZED VIEW mv1 AS SELECT * FROM mv1  | 2.21%

expected:

  Id  |                     Statement                         | Processed | Total | Progress
------+-------------------------------------------------------+-----------+-------+---------
 1010 | CREATE MATERIALIZED VIEW mv1 AS SELECT * FROM mv1     | 221       | 1000  | 2.21% 
 1012 | CREATE MATERIALIZED VIEW mv2 AS SELECT * FROM source1 | 114514    |       | 

Member Author replied:

I found #17735 already did a similar thing when streaming_use_snapshot_backfill = true is set; it just changed the Progress string.

dev=> show jobs;
┌────┬────────────────────────────────────────────────┬───────────────────┐
│ Id │                   Statement                    │     Progress      │
├────┼────────────────────────────────────────────────┼───────────────────┤
│ 29 │ CREATE MATERIALIZED VIEW mv AS SELECT * FROM t │ Snapshot [32.08%] │
└────┴────────────────────────────────────────────────┴───────────────────┘
(1 row)

pub(super) fn gen_ddl_progress(&self) -> DdlProgress {
let progress = match &self.status {
CreatingStreamingJobStatus::ConsumingSnapshot {
create_mview_tracker,
..
} => {
if create_mview_tracker.has_pending_finished_jobs() {
"Snapshot finished".to_string()
} else {
let progress = create_mview_tracker
.gen_ddl_progress()
.remove(&self.info.table_fragments.table_id().table_id)
.expect("should exist");
format!("Snapshot [{}]", progress.progress)
}
}
CreatingStreamingJobStatus::ConsumingLogStore {
start_consume_log_store_epoch,
..
} => {
let max_collected_epoch = max(
self.barrier_control.max_collected_epoch().unwrap_or(0),
self.backfill_epoch,
);
let lag = Duration::from_millis(
Epoch(*start_consume_log_store_epoch)
.physical_time()
.saturating_sub(Epoch(max_collected_epoch).physical_time()),
);
format!(
"LogStore [remain lag: {:?}, epoch cnt: {}]",
lag,
self.barrier_control.inflight_barrier_count()
)
}
CreatingStreamingJobStatus::ConsumingUpstream { .. } => {
format!(
"Upstream [unattached: {}, epoch cnt: {}]",
self.barrier_control.unattached_epochs().count(),
self.barrier_control.inflight_barrier_count(),
)
}
CreatingStreamingJobStatus::Finishing { .. } => {
format!(
"Finishing [epoch count: {}]",
self.barrier_control.inflight_barrier_count()
)
}
};
DdlProgress {
id: self.info.table_fragments.table_id().table_id as u64,
statement: self.info.definition.clone(),
progress,
}
}

fuyufjh (Member): Looks good!

Member Author: Let me do this in a separate PR, because it's more complex than I expected: one MV may have multiple backfill upstreams, and I want to distinguish MV backfill rows from source backfill rows.

// Currently, `CreateMviewProgress` is designed for MV backfill, and rw_ddl_progress calculates
// progress based on the number of consumed rows and an estimated total number of rows from hummock.
// For now, we just rely on the same code path, and for source backfill, the progress will always be 99.99%.
tracing::info!("progress finish");
let epoch = barrier.epoch;
self.progress.finish(epoch, 114514);

fuyufjh (Member): The 2nd argument of progress.finish() is current_consumed_rows, which I think is easy to count, so we can report an accurate number. Why not do it?

Member Author: You are right. I didn't do it only because there is no total_consumed_rows, and I thought it would be meaningless.

fuyufjh (Member), Sep 3, 2024: Following #18112 (comment) will make it more meaningful. I think we can do the counting part in this PR.

// yield barrier after reporting progress
yield Message::Barrier(barrier);

// After we reported finished, we still don't exit the loop.
// Because we need to handle split migration.
if STATE_TABLE_INITIALIZED.is_completed()
&& self.backfill_finished(&backfill_stage.states).await?
{
break 'backfill_loop;
}
} else {
yield Message::Barrier(barrier);
}
}
Message::Chunk(chunk) => {
Expand Down Expand Up @@ -665,7 +704,7 @@ impl<S: StateStore> SourceBackfillExecutorInner<S> {
self.apply_split_change_forward_stage(
actor_splits,
&mut splits,
true,
false,
Comment on lines -668 to +707 (Member Author): This is a minor bug (not related).

)
.await?;
}
@@ -688,11 +727,34 @@ impl<S: StateStore> SourceBackfillExecutorInner<S> {
}
}

/// All splits finished backfilling.
/// Whether we should call `progress.finish()` to let the blocking DDL return.
/// We report as soon as all splits are at least `SourceCachingUp`; otherwise the DDL might be blocked forever, waiting for upstream messages to arrive.
///
/// Note: split migration (online scaling) is related to progress tracking.
/// - For foreground DDL, scaling is not allowed before progress is finished.
/// - For background DDL, scaling is skipped when progress is not finished, and can be triggered by recreating actors during recovery.
///
/// See <https://github.com/risingwavelabs/risingwave/issues/18300> for more details.
fn should_report_finished(&self, states: &BackfillStates) -> bool {
states.values().all(|state| {
matches!(
state,
BackfillState::Finished | BackfillState::SourceCachingUp(_)
)
})
}

/// All splits entered `Finished` state.
///
/// We check all splits for the source, including other actors' splits here, before going to the forward stage.
/// Otherwise if we break early, but after rescheduling, an unfinished split is migrated to
/// Otherwise, if we `break` early but an unfinished split is migrated to this actor
/// after rescheduling, we would still need to backfill it.
///
/// Note: at the beginning, the actor will only read the state written by itself.
/// It needs to _wait until it can read all actors' written data_.
/// i.e., wait until the first checkpoint is available.
///
/// See <https://github.com/risingwavelabs/risingwave/issues/18300> for more details.
async fn backfill_finished(&self, states: &BackfillStates) -> StreamExecutorResult<bool> {
Ok(states
.values()
@@ -761,7 +823,6 @@ impl<S: StateStore> SourceBackfillExecutorInner<S> {
}
Some(backfill_state) => {
// Migrated split. Backfill if unfinished.
// TODO: disallow online scaling during backfilling.
target_state.insert(split_id, backfill_state);
}
}
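
The two predicates above answer different questions: should_report_finished decides when to unblock the DDL (every split is at least SourceCachingUp), while backfill_finished decides when to leave the backfill loop (every split is fully Finished, checked only after the first checkpoint is readable). A simplified sketch of the two checks over an in-memory state map; the enum is trimmed, and the real check also reads other actors' states from the state table:

use std::collections::HashMap;

#[derive(Debug, Clone, PartialEq)]
enum BackfillState {
    Backfilling(Option<String>), // current offset
    SourceCachingUp(String),     // caught up to the cached upstream offset
    Finished,
}

type SplitId = String;
type BackfillStates = HashMap<SplitId, BackfillState>;

/// Report progress (unblock the DDL) once every split is at least caching up.
fn should_report_finished(states: &BackfillStates) -> bool {
    states.values().all(|s| {
        matches!(s, BackfillState::Finished | BackfillState::SourceCachingUp(_))
    })
}

/// Leave the backfill loop only when every split is fully finished.
fn backfill_finished(states: &BackfillStates) -> bool {
    states.values().all(|s| matches!(s, BackfillState::Finished))
}

fn main() {
    let mut states = BackfillStates::new();
    states.insert("split-0".into(), BackfillState::Backfilling(Some("7".into())));
    states.insert("split-1".into(), BackfillState::SourceCachingUp("42".into()));
    // Still backfilling: neither predicate holds, so the DDL stays blocked.
    assert!(!should_report_finished(&states));

    states.insert("split-0".into(), BackfillState::Finished);
    // The DDL can return now, but the executor keeps looping to handle split
    // migration until every split (including migrated ones) is fully Finished.
    assert!(should_report_finished(&states));
    assert!(!backfill_finished(&states));
}
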
@@ -76,6 +76,7 @@ impl<S: StateStore> BackfillStateTableHandler<S> {
};
ret.push(state);
}
tracing::trace!("scan SourceBackfill state table: {:?}", ret);
Ok(ret)
}

4 changes: 4 additions & 0 deletions src/stream/src/from_proto/source_backfill.rs
@@ -72,6 +72,9 @@ impl ExecutorBuilder for SourceBackfillExecutorBuilder {
source_desc_builder,
state_table_handler,
);
let progress = params
.local_barrier_manager
.register_create_mview_progress(params.actor_context.id);

let exec = SourceBackfillExecutorInner::new(
params.actor_context.clone(),
@@ -81,6 +84,7 @@
params.env.system_params_manager_ref().get_params(),
backfill_state_table,
node.rate_limit,
progress,
);
let [input]: [_; 1] = params.input.try_into().unwrap();

3 changes: 3 additions & 0 deletions src/stream/src/task/barrier_manager/managed_state.rs
@@ -372,6 +372,9 @@ pub(super) struct PartialGraphManagedBarrierState {
prev_barrier_table_ids: Option<(EpochPair, HashSet<TableId>)>,

/// Record the progress updates of creating mviews for each epoch of concurrent checkpoints.
///
/// This is updated by [`super::CreateMviewProgress::update`] and will be reported to meta
/// in [`BarrierCompleteResult`].
pub(super) create_mview_progress: HashMap<u64, HashMap<ActorId, BackfillState>>,

pub(super) state_store: StateStoreImpl,