update for scaling
Signed-off-by: xxchan <[email protected]>
xxchan committed Sep 4, 2024
1 parent 45323d0 commit a54a89f
Showing 4 changed files with 69 additions and 19 deletions.
2 changes: 1 addition & 1 deletion src/meta/src/barrier/progress.rs
@@ -281,7 +281,7 @@ pub(super) struct TrackingCommand {
/// 4. With `actor_map` we can use an actor's `ActorId` to find the ID of the `StreamJob`.
#[derive(Default, Debug)]
pub(super) struct CreateMviewProgressTracker {
// XXX: which one should contain source backfill?
// TODO: add a specialized progress for source
/// Progress of the create-mview DDL indicated by the `TableId`.
progress_map: HashMap<TableId, (Progress, TrackingJob)>,

2 changes: 2 additions & 0 deletions src/meta/src/stream/scale.rs
@@ -441,6 +441,8 @@ pub struct ScaleController

pub env: MetaSrvEnv,

/// We will acquire this lock during DDL to prevent scaling operations on jobs that are in the creating state.
/// e.g., an MV cannot be rescheduled during foreground backfill.
pub reschedule_lock: RwLock<()>,
}

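As a side note on the new doc comment above, here is a minimal sketch of how such a reschedule lock can serialize DDL against scaling. It assumes `tokio::sync::RwLock` and uses hypothetical type and method names; it is not the actual `ScaleController` API.

```rust
use tokio::sync::RwLock;

/// Hypothetical stand-in for the controller; only the lock matters here.
struct Controller {
    reschedule_lock: RwLock<()>,
}

impl Controller {
    /// A creating job (e.g., an MV in foreground backfill) holds a read guard,
    /// so multiple DDL statements can proceed concurrently...
    async fn run_ddl(&self) {
        let _guard = self.reschedule_lock.read().await;
        // ... create the streaming job and wait for backfill progress ...
    }

    /// ...while rescheduling takes the write guard, so it waits until no
    /// creating job still holds the lock (and vice versa).
    async fn reschedule(&self) {
        let _guard = self.reschedule_lock.write().await;
        // ... move actors / splits across workers ...
    }
}
```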
83 changes: 65 additions & 18 deletions src/stream/src/executor/source/source_backfill_executor.rs
@@ -14,6 +14,7 @@

use std::cmp::Ordering;
use std::collections::{HashMap, HashSet};
use std::sync::Once;
use std::time::Instant;

use anyhow::anyhow;
@@ -30,6 +31,7 @@ use risingwave_connector::source::{
BackfillInfo, BoxChunkSourceStream, SourceContext, SourceCtrlOpts, SplitId, SplitImpl,
SplitMetaData,
};
use risingwave_hummock_sdk::HummockReadEpoch;
use serde::{Deserialize, Serialize};
use thiserror_ext::AsReport;

@@ -233,6 +235,7 @@ impl BackfillStage {
}

impl<S: StateStore> SourceBackfillExecutorInner<S> {
#[expect(clippy::too_many_arguments)]
pub fn new(
actor_ctx: ActorContextRef,
info: ExecutorInfo,
@@ -352,7 +355,6 @@ impl<S: StateStore> SourceBackfillExecutorInner<S> {
splits: owned_splits,
};
backfill_stage.debug_assert_consistent();
tracing::debug!(?backfill_stage, "source backfill started");

// Return the ownership of `stream_source_core` to the source executor.
self.stream_source_core = core;
Expand All @@ -376,6 +378,7 @@ impl<S: StateStore> SourceBackfillExecutorInner<S> {
}
}
}
tracing::debug!(?backfill_stage, "source backfill started");

fn select_strategy(_: &mut ()) -> PollNext {
futures::stream::PollNext::Left
@@ -413,9 +416,23 @@ impl<S: StateStore> SourceBackfillExecutorInner<S> {
pause_reader!();
}

let state_store = self.backfill_state_store.state_store.state_store().clone();
static STATE_TABLE_INITIALIZED: Once = Once::new();
tokio::spawn(async move {
// This wait makes `self.backfill_finished()` safe to call.
// We wait for the 1st epoch's curr, i.e., the 2nd epoch's prev.
let epoch = barrier.epoch.curr;
tracing::info!("waiting for epoch: {}", epoch);
state_store
.try_wait_epoch(HummockReadEpoch::Committed(epoch))
.await
.expect("failed to wait epoch");
STATE_TABLE_INITIALIZED.call_once(|| ());
tracing::info!("finished waiting for epoch: {}", epoch);
});
yield Message::Barrier(barrier);

if !self.backfill_finished(&backfill_stage.states).await? {
{
let source_backfill_row_count = self
.metrics
.source_backfill_row_count
@@ -550,13 +567,6 @@ impl<S: StateStore> SourceBackfillExecutorInner<S> {
);
}

// TODO: use a specialized progress for source?
// self.progress.update(
// barrier.epoch,
// snapshot_read_epoch,
// total_snapshot_processed_rows,
// );

self.backfill_state_store
.set_states(backfill_stage.states.clone())
.await?;
@@ -565,10 +575,26 @@ impl<S: StateStore> SourceBackfillExecutorInner<S> {
.commit(barrier.epoch)
.await?;

yield Message::Barrier(barrier);

if self.backfill_finished(&backfill_stage.states).await? {
break 'backfill_loop;
if self.should_report_finished(&backfill_stage.states) {
// TODO: use a specialized progress for source
// Currently, `CreateMviewProgress` is designed for MV backfill, and rw_ddl_progress calculates
// progress based on the number of consumed rows and an estimated total number of rows from hummock.
// For now, we just rely on the same code path, and for source backfill, the progress will always be 99.99%.
tracing::info!("progress finish");
let epoch = barrier.epoch;
self.progress.finish(epoch, 114514);
// yield barrier after reporting progress
yield Message::Barrier(barrier);

// After we have reported finished, we still don't exit the loop,
// because we still need to handle split migration.
if STATE_TABLE_INITIALIZED.is_completed()
&& self.backfill_finished(&backfill_stage.states).await?
{
break 'backfill_loop;
}
} else {
yield Message::Barrier(barrier);
}
}
Message::Chunk(chunk) => {
@@ -640,7 +666,6 @@ impl<S: StateStore> SourceBackfillExecutorInner<S> {
}
}

let mut first_barrier_after_finish = true;
let mut splits: HashSet<SplitId> = backfill_stage.states.keys().cloned().collect();
// Make sure `Finished` state is persisted.
self.backfill_state_store
@@ -679,7 +704,7 @@ impl<S: StateStore> SourceBackfillExecutorInner<S> {
self.apply_split_change_forward_stage(
actor_splits,
&mut splits,
true,
false,
)
.await?;
}
@@ -702,11 +727,34 @@ impl<S: StateStore> SourceBackfillExecutorInner<S> {
}
}

/// All splits finished backfilling.
/// When we should call `progress.finish()` to let blocking DDL return.
/// We report as soon as all splits are `SourceCachingUp` (or `Finished`). Otherwise the DDL might be blocked forever until upstream messages come.
///
/// Note: split migration (online scaling) is related to progress tracking.
/// - For foreground DDL, scaling is not allowed before progress is finished.
/// - For background DDL, scaling is skipped when progress is not finished, and can be triggered by recreating actors during recovery.
///
/// See <https://github.com/risingwavelabs/risingwave/issues/18300> for more details.
fn should_report_finished(&self, states: &BackfillStates) -> bool {
states.values().all(|state| {
matches!(
state,
BackfillState::Finished | BackfillState::SourceCachingUp(_)
)
})
}

/// All splits entered `Finished` state.
///
/// We check all splits for the source, including other actors' splits here, before going to the forward stage.
/// Otherwise if we break early, but after rescheduling, an unfinished split is migrated to
/// Otherwise if we `break` early, but after rescheduling, an unfinished split is migrated to
/// this actor, we still need to backfill it.
///
/// Note: at the beginning, the actor will only read the state written by itself.
/// It needs to _wait until it can read all actors' written data_.
/// i.e., wait until the first checkpoint is available.
///
/// See <https://github.com/risingwavelabs/risingwave/issues/18300> for more details.
async fn backfill_finished(&self, states: &BackfillStates) -> StreamExecutorResult<bool> {
Ok(states
.values()
@@ -775,7 +823,6 @@ impl<S: StateStore> SourceBackfillExecutorInner<S> {
}
Some(backfill_state) => {
// Migrated split. Backfill if unfinished.
// TODO: disallow online scaling during backfilling.
target_state.insert(split_id, backfill_state);
}
}
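To read the two finish conditions above outside the diff context, here is a small self-contained model of the predicates. The enum is a simplified mirror of the executor's per-split state; note that the real `backfill_finished` additionally scans the state table so it sees other actors' splits, which this sketch omits.

```rust
use std::collections::HashMap;

/// Simplified mirror of the executor's per-split backfill state.
enum BackfillState {
    /// Still backfilling; the payload would be the current offset.
    Backfilling(Option<String>),
    /// Caught up with the upstream source, but not yet fully finished.
    SourceCachingUp(String),
    Finished,
}

type BackfillStates = HashMap<u32, BackfillState>;

/// When to call `progress.finish()` so a blocking (foreground) DDL can return:
/// every split is at least caught up.
fn should_report_finished(states: &BackfillStates) -> bool {
    states
        .values()
        .all(|s| matches!(s, BackfillState::Finished | BackfillState::SourceCachingUp(_)))
}

/// When to actually leave the backfill loop: every split is `Finished`,
/// so a split migrated here later cannot still need backfilling.
fn backfill_finished(states: &BackfillStates) -> bool {
    states.values().all(|s| matches!(s, BackfillState::Finished))
}

fn main() {
    let mut states = BackfillStates::new();
    states.insert(0, BackfillState::Finished);
    states.insert(1, BackfillState::SourceCachingUp("offset-42".into()));
    assert!(should_report_finished(&states)); // the DDL can return now
    assert!(!backfill_finished(&states)); // but keep consuming until all splits finish
}
```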
@@ -76,6 +76,7 @@ impl<S: StateStore> BackfillStateTableHandler<S> {
};
ret.push(state);
}
tracing::trace!("scan SourceBackfill state table: {:?}", ret);
Ok(ret)
}

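The trace added above logs a scan of the shared backfill state table; as the executor's doc comment notes, such a cross-actor read is only meaningful once the first checkpoint is committed. Below is a simplified model of the `Once` plus epoch-wait pattern used in the executor; `wait_committed` is a hypothetical stand-in for `state_store.try_wait_epoch(HummockReadEpoch::Committed(..))`, and the loop stands in for the barrier-driven backfill loop.

```rust
use std::sync::Once;
use std::time::Duration;

static STATE_TABLE_INITIALIZED: Once = Once::new();

/// Hypothetical stand-in for waiting until `epoch` is committed in the state store.
async fn wait_committed(_epoch: u64) {
    tokio::time::sleep(Duration::from_millis(50)).await;
}

#[tokio::main]
async fn main() {
    // Spawned right after the first barrier: flip the flag once that barrier's
    // `curr` epoch is committed, i.e., once every actor's writes are readable.
    let first_epoch_curr = 1u64;
    tokio::spawn(async move {
        wait_committed(first_epoch_curr).await;
        STATE_TABLE_INITIALIZED.call_once(|| ());
    });

    // In the backfill loop, the cross-actor `backfill_finished` check is only
    // trusted after the flag is set; before that, an actor may only see its own state.
    loop {
        if STATE_TABLE_INITIALIZED.is_completed() {
            println!("first checkpoint readable; safe to check backfill_finished across actors");
            break;
        }
        tokio::time::sleep(Duration::from_millis(10)).await;
    }
}
```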
