Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor: add target_offsets to determine if source backfill finished #18297

Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
215 changes: 133 additions & 82 deletions src/stream/src/executor/source/source_backfill_executor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@
// limitations under the License.

use std::cmp::Ordering;
use std::collections::hash_map::Entry;
use std::collections::{HashMap, HashSet};
use std::time::Instant;

Expand Down Expand Up @@ -45,7 +44,7 @@ use crate::executor::{AddMutation, UpdateMutation};
pub enum BackfillState {
    /// `None` means not started yet. It's the initial state.
    Backfilling(Option<String>),
    /// Backfill is stopped at this offset (inclusive). Source needs to filter out messages with
    /// offset at or before this one (`offset <= this`), since backfill has already emitted them.
    SourceCachingUp(String),
    /// Backfill for this split is done; upstream rows pass through unfiltered.
    Finished,
}
Expand All @@ -59,54 +58,6 @@ impl BackfillState {
/// Restores a persisted [`BackfillState`] from its JSON representation.
pub fn restore_from_json(value: JsonbVal) -> anyhow::Result<Self> {
    let json = value.take();
    match serde_json::from_value(json) {
        Ok(state) => Ok(state),
        Err(e) => Err(anyhow!(e)),
    }
}

/// Advances this split's backfill state given an upstream row's `offset`, and returns
/// whether that row from the upstream `SourceExecutor` is visible (i.e. should be emitted
/// downstream rather than dropped).
fn handle_upstream_row(&mut self, offset: &str) -> bool {
    let mut vis = false;
    match self {
        BackfillState::Backfilling(None) => {
            // backfilling for this split is not started yet. Ignore this row
        }
        BackfillState::Backfilling(Some(backfill_offset)) => {
            match compare_kafka_offset(backfill_offset, offset) {
                Ordering::Less => {
                    // continue backfilling. Ignore this row
                }
                Ordering::Equal => {
                    // backfilling for this split is finished just right.
                    *self = BackfillState::Finished;
                }
                Ordering::Greater => {
                    // backfilling for this split produced more data than current source's progress.
                    // We should stop backfilling, and filter out rows from upstream with offset <= backfill_offset.
                    *self = BackfillState::SourceCachingUp(backfill_offset.clone());
                }
            }
        }
        BackfillState::SourceCachingUp(backfill_offset) => {
            match compare_kafka_offset(backfill_offset, offset) {
                Ordering::Less => {
                    // Source caught up, but doesn't contain the last backfilled row.
                    // This may happen e.g., if Kafka performed compaction.
                    vis = true;
                    *self = BackfillState::Finished;
                }
                Ordering::Equal => {
                    // Source just caught up with backfilling.
                    *self = BackfillState::Finished;
                }
                Ordering::Greater => {
                    // Source is still behind backfilling.
                }
            }
        }
        BackfillState::Finished => {
            vis = true;
            // This split's backfilling is finished; we are waiting for other splits
        }
    }
    vis
}
}

pub struct SourceBackfillExecutor<S: StateStore> {
Expand Down Expand Up @@ -138,16 +89,37 @@ pub struct SourceBackfillExecutorInner<S: StateStore> {
}

/// Local variables used in the backfill stage.
///
/// Note: all of the fields should contain all available splits, and we can `unwrap()` safely when `get()`.
#[derive(Debug)]
struct BackfillStage {
    // Per-split backfill progress, keyed by every assigned split.
    states: BackfillStates,
    /// A copy of all splits (incl unfinished and finished ones) assigned to the actor.
    ///
    /// Note: the offsets are not updated. Should use `state`'s offset to update before using it (`get_latest_unfinished_splits`).
    splits: Vec<SplitImpl>,
    /// The latest offset from upstream (inclusive). After we reach this offset, we can stop backfilling.
    /// TODO: initialize this with high watermark so that we can finish backfilling even when upstream
    /// doesn't emit any data.
    target_offsets: HashMap<SplitId, Option<String>>,
}

impl BackfillStage {
/// Debug-only invariant check: `states` and `target_offsets` must be keyed by
/// exactly the splits in `splits`. No-op in release builds.
fn debug_assert_consistent(&self) {
    if cfg!(debug_assertions) {
        let split_ids: HashSet<_> = self.splits.iter().map(|split| split.id().clone()).collect();
        let state_ids: HashSet<_> = self.states.keys().cloned().collect();
        let target_ids: HashSet<_> = self.target_offsets.keys().cloned().collect();
        assert_eq!(state_ids, split_ids);
        assert_eq!(target_ids, split_ids);
    }
}

/// Get unfinished splits with latest offsets according to the backfill states.
fn get_latest_unfinished_splits(&self) -> StreamExecutorResult<Vec<SplitImpl>> {
let mut unfinished_splits = Vec::new();
Expand All @@ -165,6 +137,92 @@ impl BackfillStage {
}
Ok(unfinished_splits)
}

/// Updates backfill states and `target_offsets`, and returns whether the row from the
/// upstream `SourceExecutor` is visible (should be emitted downstream).
fn handle_upstream_row(&mut self, split_id: &str, offset: &str) -> bool {
    let state = self.states.get_mut(split_id).unwrap();
    let visible = match state {
        BackfillState::Backfilling(None) => {
            // Backfill for this split hasn't started yet; drop the upstream row.
            false
        }
        BackfillState::Backfilling(Some(backfill_offset)) => {
            match compare_kafka_offset(backfill_offset, offset) {
                Ordering::Less => {
                    // Backfill is still behind upstream; keep backfilling and drop the row.
                }
                Ordering::Equal => {
                    // Backfill reached exactly this offset, so it finished just right.
                    *state = BackfillState::Finished;
                }
                Ordering::Greater => {
                    // Backfill produced more data than the source's current progress.
                    // Stop backfilling and filter out upstream rows with
                    // offset <= backfill_offset while the source catches up.
                    *state = BackfillState::SourceCachingUp(backfill_offset.clone());
                }
            }
            false
        }
        BackfillState::SourceCachingUp(backfill_offset) => {
            match compare_kafka_offset(backfill_offset, offset) {
                Ordering::Less => {
                    // Source caught up but skipped past the last backfilled row.
                    // This may happen e.g., if Kafka performed compaction.
                    *state = BackfillState::Finished;
                    true
                }
                Ordering::Equal => {
                    // Source just caught up with backfilling; this row was already emitted.
                    *state = BackfillState::Finished;
                    false
                }
                Ordering::Greater => {
                    // Source is still behind where backfilling stopped.
                    false
                }
            }
        }
        BackfillState::Finished => {
            // This split's backfilling is finished; rows pass through while we
            // wait for the other splits.
            true
        }
    };
    if matches!(state, BackfillState::Backfilling(_)) {
        // Record the newest upstream offset as this split's backfill target.
        *self.target_offsets.get_mut(split_id).unwrap() = Some(offset.to_string());
    }
    if visible {
        debug_assert_eq!(*state, BackfillState::Finished);
    }
    visible
}

/// Updates backfill states and returns whether the row read by the backfill source
/// reader is visible (`true` while this split is still backfilling).
fn handle_backfill_row(&mut self, split_id: &str, offset: &str) -> bool {
    let state = self.states.get_mut(split_id).unwrap();
    match state {
        BackfillState::Backfilling(_old_offset) => {
            // Has the backfill reader reached (or passed) the upstream target offset?
            // If the target is still unknown (e.g., upstream hasn't emitted any
            // message yet), keep backfilling.
            let reached_target = match self.target_offsets.get(split_id).unwrap() {
                Some(target_offset) => compare_kafka_offset(offset, target_offset).is_ge(),
                None => false,
            };
            if reached_target {
                // Even when offset == target_offset we cannot mark the split Finished
                // right away: the target may come from elsewhere (e.g. a Kafka high
                // watermark) that upstream hasn't actually reached yet. So switch to
                // SourceCachingUp, let upstream catch up, and drop duplicates.
                // After this, all following rows in the current chunk become invisible.
                *state = BackfillState::SourceCachingUp(offset.to_string());
            } else {
                *state = BackfillState::Backfilling(Some(offset.to_string()));
            }
            true
        }
        BackfillState::SourceCachingUp(_) | BackfillState::Finished => {
            // Backfilling already stopped for this split; drop the row.
            false
        }
    }
}
}

impl<S: StateStore> SourceBackfillExecutorInner<S> {
Expand Down Expand Up @@ -275,9 +333,15 @@ impl<S: StateStore> SourceBackfillExecutorInner<S> {
backfill_states.insert(split_id, backfill_state);
}
let mut backfill_stage = BackfillStage {
// init with None
target_offsets: backfill_states
.keys()
.map(|split_id| (split_id.clone(), None))
.collect(),
states: backfill_states,
splits: owned_splits,
};
backfill_stage.debug_assert_consistent();
tracing::debug!(?backfill_stage, "source backfill started");

// Return the ownership of `stream_source_core` to the source executor.
Expand Down Expand Up @@ -348,6 +412,7 @@ impl<S: StateStore> SourceBackfillExecutorInner<S> {
let mut last_barrier_time = Instant::now();
let mut self_paused = false;

// The main logic of the loop is in handle_upstream_row and handle_backfill_row.
'backfill_loop: while let Some(either) = backfill_stream.next().await {
match either {
// Upstream
Expand Down Expand Up @@ -485,9 +550,7 @@ impl<S: StateStore> SourceBackfillExecutorInner<S> {
for (i, (_, row)) in chunk.rows().enumerate() {
let split = row.datum_at(split_idx).unwrap().into_utf8();
let offset = row.datum_at(offset_idx).unwrap().into_utf8();
let backfill_state =
backfill_stage.states.get_mut(split).unwrap();
let vis = backfill_state.handle_upstream_row(offset);
let vis = backfill_stage.handle_upstream_row(split, offset);
new_vis.set(i, vis);
}
// emit chunk if vis is not empty. i.e., some splits finished backfilling.
Expand Down Expand Up @@ -527,36 +590,12 @@ impl<S: StateStore> SourceBackfillExecutorInner<S> {
self.system_params.load().barrier_interval_ms() as u128
* WAIT_BARRIER_MULTIPLE_TIMES;
}
// TODO(optimize): actually each msg is from one split. We can
// include split from the message and avoid iterating over all rows.
let mut new_vis = BitmapBuilder::zeroed(chunk.visibility().len());

for (i, (_, row)) in chunk.rows().enumerate() {
let split_id: Arc<str> =
row.datum_at(split_idx).unwrap().into_utf8().into();
let offset: String =
row.datum_at(offset_idx).unwrap().into_utf8().into();
// update backfill progress
let mut vis = true;
match backfill_stage.states.entry(split_id.clone()) {
Entry::Occupied(mut entry) => {
let state = entry.get_mut();
match state {
BackfillState::Backfilling(_) => {
*state =
BackfillState::Backfilling(Some(offset.clone()));
}
BackfillState::SourceCachingUp(_)
| BackfillState::Finished => {
// backfilling stopped. ignore
vis = false
}
}
}
Entry::Vacant(entry) => {
entry.insert(BackfillState::Backfilling(Some(offset.clone())));
}
}
let split_id = row.datum_at(split_idx).unwrap().into_utf8();
let offset = row.datum_at(offset_idx).unwrap().into_utf8();
let vis = backfill_stage.handle_backfill_row(split_id, offset);
new_vis.set(i, vis);
}

Expand Down Expand Up @@ -678,7 +717,7 @@ impl<S: StateStore> SourceBackfillExecutorInner<S> {
// Iterate over the target (assigned) splits
// - check if any new splits are added
// - build target_state
for split in target_splits {
for split in &target_splits {
let split_id = split.id();
if let Some(s) = old_states.get(&split_id) {
target_state.insert(split_id, s.clone());
Expand Down Expand Up @@ -727,7 +766,19 @@ impl<S: StateStore> SourceBackfillExecutorInner<S> {
debug_assert_eq!(old_states, target_state);
}
stage.states = target_state;

stage.splits = target_splits;
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

let old_target_offsets = std::mem::take(&mut stage.target_offsets);
stage.target_offsets = stage
.states
.keys()
.map(|split_id| {
(
split_id.clone(),
old_target_offsets.get(split_id).cloned().flatten(),
)
})
.collect();
Comment on lines +770 to +780
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

HashMap::retain?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Need to consider newly added splits.

stage.debug_assert_consistent();
Ok(split_changed)
}

Expand Down
Loading