Skip to content

Commit

Permalink
refactor: track progress minor refactor (#18446)
Browse files Browse the repository at this point in the history
Signed-off-by: xxchan <[email protected]>
  • Loading branch information
xxchan authored Sep 9, 2024
1 parent 462253e commit 29d2e1e
Show file tree
Hide file tree
Showing 16 changed files with 66 additions and 53 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ struct RwDdlProgress {

#[system_catalog(table, "rw_catalog.rw_ddl_progress")]
async fn read(reader: &SysCatalogReaderImpl) -> Result<Vec<RwDdlProgress>> {
let ddl_progresses = reader.meta_client.list_ddl_progress().await?;
let ddl_progresses = reader.meta_client.get_ddl_progress().await?;

let table_ids = ddl_progresses
.iter()
Expand Down
2 changes: 1 addition & 1 deletion src/frontend/src/handler/show.rs
Original file line number Diff line number Diff line change
Expand Up @@ -450,7 +450,7 @@ pub async fn handle_show_object(
.into());
}
ShowObject::Jobs => {
let resp = session.env().meta_client().list_ddl_progress().await?;
let resp = session.env().meta_client().get_ddl_progress().await?;
let rows = resp.into_iter().map(|job| ShowJobRow {
id: job.id as i64,
statement: job.statement,
Expand Down
4 changes: 2 additions & 2 deletions src/frontend/src/meta_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,7 @@ pub trait FrontendMetaClient: Send + Sync {

async fn set_session_param(&self, param: String, value: Option<String>) -> Result<String>;

async fn list_ddl_progress(&self) -> Result<Vec<DdlProgress>>;
async fn get_ddl_progress(&self) -> Result<Vec<DdlProgress>>;

async fn get_tables(&self, table_ids: &[u32]) -> Result<HashMap<u32, Table>>;

Expand Down Expand Up @@ -232,7 +232,7 @@ impl FrontendMetaClient for FrontendMetaClientImpl {
self.0.set_session_param(param, value).await
}

async fn list_ddl_progress(&self) -> Result<Vec<DdlProgress>> {
async fn get_ddl_progress(&self) -> Result<Vec<DdlProgress>> {
let ddl_progress = self.0.get_ddl_progress().await?;
Ok(ddl_progress)
}
Expand Down
2 changes: 1 addition & 1 deletion src/frontend/src/test_utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1013,7 +1013,7 @@ impl FrontendMetaClient for MockFrontendMetaClient {
Ok("".to_string())
}

async fn list_ddl_progress(&self) -> RpcResult<Vec<DdlProgress>> {
async fn get_ddl_progress(&self) -> RpcResult<Vec<DdlProgress>> {
Ok(vec![])
}

Expand Down
3 changes: 3 additions & 0 deletions src/meta/src/barrier/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -763,7 +763,9 @@ impl GlobalBarrierManager {
if let Some(request) = request {
match request {
BarrierManagerRequest::GetDdlProgress(result_tx) => {
// Progress of normal backfill
let mut progress = self.checkpoint_control.create_mview_tracker.gen_ddl_progress();
// Progress of snapshot backfill
for creating_job in self.checkpoint_control.creating_streaming_job_controls.values() {
progress.extend([(creating_job.info.table_fragments.table_id().table_id, creating_job.gen_ddl_progress())]);
}
Expand Down Expand Up @@ -1634,6 +1636,7 @@ impl GlobalBarrierManagerContext {
Ok(info)
}

/// Serving `SHOW JOBS / SELECT * FROM rw_ddl_progress`
pub async fn get_ddl_progress(&self) -> MetaResult<Vec<DdlProgress>> {
let mut ddl_progress = {
let (tx, rx) = oneshot::channel();
Expand Down
6 changes: 3 additions & 3 deletions src/stream/src/executor/backfill/arrangement_backfill.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ use crate::executor::backfill::utils::{
update_pos_by_vnode, BackfillProgressPerVnode, BackfillRateLimiter, BackfillState,
};
use crate::executor::prelude::*;
use crate::task::CreateMviewProgress;
use crate::task::CreateMviewProgressReporter;

type Builders = HashMap<VirtualNode, DataChunkBuilder>;

Expand All @@ -56,7 +56,7 @@ pub struct ArrangementBackfillExecutor<S: StateStore, SD: ValueRowSerde> {
/// The column indices need to be forwarded to the downstream from the upstream and table scan.
output_indices: Vec<usize>,

progress: CreateMviewProgress,
progress: CreateMviewProgressReporter,

actor_id: ActorId,

Expand All @@ -79,7 +79,7 @@ where
upstream: Executor,
state_table: StateTable<S>,
output_indices: Vec<usize>,
progress: CreateMviewProgress,
progress: CreateMviewProgressReporter,
metrics: Arc<StreamingMetrics>,
chunk_size: usize,
rate_limit: Option<usize>,
Expand Down
6 changes: 3 additions & 3 deletions src/stream/src/executor/backfill/cdc/cdc_backfill.rs
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ use crate::executor::backfill::CdcScanOptions;
use crate::executor::monitor::CdcBackfillMetrics;
use crate::executor::prelude::*;
use crate::executor::UpdateMutation;
use crate::task::CreateMviewProgress;
use crate::task::CreateMviewProgressReporter;

/// `split_id`, `is_finished`, `row_count`, `cdc_offset` all occupy 1 column each.
const METADATA_STATE_LEN: usize = 4;
Expand All @@ -68,7 +68,7 @@ pub struct CdcBackfillExecutor<S: StateStore> {

// TODO: introduce a CdcBackfillProgress to report finish to Meta
// This object is just a stub right now
progress: Option<CreateMviewProgress>,
progress: Option<CreateMviewProgressReporter>,

metrics: CdcBackfillMetrics,

Expand All @@ -86,7 +86,7 @@ impl<S: StateStore> CdcBackfillExecutor<S> {
upstream: Executor,
output_indices: Vec<usize>,
output_columns: Vec<ColumnDesc>,
progress: Option<CreateMviewProgress>,
progress: Option<CreateMviewProgressReporter>,
metrics: Arc<StreamingMetrics>,
state_table: StateTable<S>,
rate_limit_rps: Option<u32>,
Expand Down
6 changes: 3 additions & 3 deletions src/stream/src/executor/backfill/no_shuffle_backfill.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ use crate::executor::backfill::utils::{
METADATA_STATE_LEN,
};
use crate::executor::prelude::*;
use crate::task::CreateMviewProgress;
use crate::task::CreateMviewProgressReporter;

/// Schema: | vnode | pk ... | `backfill_finished` | `row_count` |
/// We can decode that into `BackfillState` on recovery.
Expand Down Expand Up @@ -76,7 +76,7 @@ pub struct BackfillExecutor<S: StateStore> {
output_indices: Vec<usize>,

/// PTAL at the docstring for `CreateMviewProgress` to understand how we compute it.
progress: CreateMviewProgress,
progress: CreateMviewProgressReporter,

actor_id: ActorId,

Expand All @@ -100,7 +100,7 @@ where
upstream: Executor,
state_table: Option<StateTable<S>>,
output_indices: Vec<usize>,
progress: CreateMviewProgress,
progress: CreateMviewProgressReporter,
metrics: Arc<StreamingMetrics>,
chunk_size: usize,
rate_limit: Option<usize>,
Expand Down
8 changes: 4 additions & 4 deletions src/stream/src/executor/backfill/snapshot_backfill.rs
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ use crate::executor::{
DispatcherBarrier, DispatcherMessage, Execute, Executor, Message, Mutation,
StreamExecutorError, StreamExecutorResult,
};
use crate::task::CreateMviewProgress;
use crate::task::CreateMviewProgressReporter;

pub struct SnapshotBackfillExecutor<S: StateStore> {
/// Upstream table
Expand All @@ -55,7 +55,7 @@ pub struct SnapshotBackfillExecutor<S: StateStore> {
/// The column indices need to be forwarded to the downstream from the upstream and table scan.
output_indices: Vec<usize>,

progress: CreateMviewProgress,
progress: CreateMviewProgressReporter,

chunk_size: usize,
rate_limit: Option<usize>,
Expand All @@ -73,7 +73,7 @@ impl<S: StateStore> SnapshotBackfillExecutor<S> {
upstream: Executor,
output_indices: Vec<usize>,
actor_ctx: ActorContextRef,
progress: CreateMviewProgress,
progress: CreateMviewProgressReporter,
chunk_size: usize,
rate_limit: Option<usize>,
barrier_rx: UnboundedReceiver<Barrier>,
Expand Down Expand Up @@ -617,7 +617,7 @@ async fn make_consume_snapshot_stream<'a, S: StateStore>(
rate_limit: Option<usize>,
barrier_rx: &'a mut UnboundedReceiver<Barrier>,
output_indices: &'a [usize],
mut progress: CreateMviewProgress,
mut progress: CreateMviewProgressReporter,
first_recv_barrier: Barrier,
) {
let mut barrier_epoch = first_recv_barrier.epoch;
Expand Down
10 changes: 5 additions & 5 deletions src/stream/src/executor/chain.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
// limitations under the License.

use crate::executor::prelude::*;
use crate::task::CreateMviewProgress;
use crate::task::CreateMviewProgressReporter;

/// [`ChainExecutor`] is an executor that enables synchronization between the existing stream and
/// newly appended executors. Currently, [`ChainExecutor`] is mainly used to implement MV on MV
Expand All @@ -24,7 +24,7 @@ pub struct ChainExecutor {

upstream: Executor,

progress: CreateMviewProgress,
progress: CreateMviewProgressReporter,

actor_id: ActorId,

Expand All @@ -36,7 +36,7 @@ impl ChainExecutor {
pub fn new(
snapshot: Executor,
upstream: Executor,
progress: CreateMviewProgress,
progress: CreateMviewProgressReporter,
upstream_only: bool,
) -> Self {
Self {
Expand Down Expand Up @@ -115,12 +115,12 @@ mod test {
use super::ChainExecutor;
use crate::executor::test_utils::MockSource;
use crate::executor::{AddMutation, Barrier, Execute, Message, Mutation, PkIndices};
use crate::task::{CreateMviewProgress, LocalBarrierManager};
use crate::task::{CreateMviewProgressReporter, LocalBarrierManager};

#[tokio::test]
async fn test_basic() {
let barrier_manager = LocalBarrierManager::for_test();
let progress = CreateMviewProgress::for_test(barrier_manager);
let progress = CreateMviewProgressReporter::for_test(barrier_manager);
let actor_id = progress.actor_id();

let schema = Schema::new(vec![Field::unnamed(DataType::Int64)]);
Expand Down
10 changes: 7 additions & 3 deletions src/stream/src/executor/rearranged_chain.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ use futures::stream;
use futures::stream::select_with_strategy;

use crate::executor::prelude::*;
use crate::task::CreateMviewProgress;
use crate::task::CreateMviewProgressReporter;

/// `ChainExecutor` is an executor that enables synchronization between the existing stream and
/// newly appended executors. Currently, `ChainExecutor` is mainly used to implement MV on MV
Expand All @@ -31,7 +31,7 @@ pub struct RearrangedChainExecutor {

upstream: Executor,

progress: CreateMviewProgress,
progress: CreateMviewProgressReporter,

actor_id: ActorId,
}
Expand Down Expand Up @@ -74,7 +74,11 @@ impl RearrangedMessage {
}

impl RearrangedChainExecutor {
pub fn new(snapshot: Executor, upstream: Executor, progress: CreateMviewProgress) -> Self {
pub fn new(
snapshot: Executor,
upstream: Executor,
progress: CreateMviewProgressReporter,
) -> Self {
Self {
snapshot,
upstream,
Expand Down
6 changes: 3 additions & 3 deletions src/stream/src/executor/source/source_backfill_executor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ use crate::common::rate_limit::limited_chunk_size;
use crate::executor::prelude::*;
use crate::executor::source::source_executor::WAIT_BARRIER_MULTIPLE_TIMES;
use crate::executor::UpdateMutation;
use crate::task::CreateMviewProgress;
use crate::task::CreateMviewProgressReporter;

#[derive(Clone, Debug, Deserialize, Serialize, PartialEq)]
pub enum BackfillState {
Expand Down Expand Up @@ -92,7 +92,7 @@ pub struct SourceBackfillExecutorInner<S: StateStore> {
/// Rate limit in rows/s.
rate_limit_rps: Option<u32>,

progress: CreateMviewProgress,
progress: CreateMviewProgressReporter,
}

/// Local variables used in the backfill stage.
Expand Down Expand Up @@ -244,7 +244,7 @@ impl<S: StateStore> SourceBackfillExecutorInner<S> {
system_params: SystemParamsReaderRef,
backfill_state_store: BackfillStateTableHandler<S>,
rate_limit_rps: Option<u32>,
progress: CreateMviewProgress,
progress: CreateMviewProgressReporter,
) -> Self {
let source_split_change_count = metrics
.source_split_change_count
Expand Down
10 changes: 5 additions & 5 deletions src/stream/src/executor/values.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ use risingwave_expr::expr::NonStrictExpression;
use tokio::sync::mpsc::UnboundedReceiver;

use crate::executor::prelude::*;
use crate::task::CreateMviewProgress;
use crate::task::CreateMviewProgressReporter;

const DEFAULT_CHUNK_SIZE: usize = 1024;

Expand All @@ -33,7 +33,7 @@ pub struct ValuesExecutor {
schema: Schema,
// Receiver of barrier channel.
barrier_receiver: UnboundedReceiver<Barrier>,
progress: CreateMviewProgress,
progress: CreateMviewProgressReporter,

rows: vec::IntoIter<Vec<NonStrictExpression>>,
}
Expand All @@ -43,7 +43,7 @@ impl ValuesExecutor {
pub fn new(
ctx: ActorContextRef,
schema: Schema,
progress: CreateMviewProgress,
progress: CreateMviewProgressReporter,
rows: Vec<Vec<NonStrictExpression>>,
barrier_receiver: UnboundedReceiver<Barrier>,
) -> Self {
Expand Down Expand Up @@ -150,12 +150,12 @@ mod tests {
use super::ValuesExecutor;
use crate::executor::test_utils::StreamExecutorTestExt;
use crate::executor::{ActorContext, AddMutation, Barrier, Execute, Mutation};
use crate::task::{CreateMviewProgress, LocalBarrierManager};
use crate::task::{CreateMviewProgressReporter, LocalBarrierManager};

#[tokio::test]
async fn test_values() {
let barrier_manager = LocalBarrierManager::for_test();
let progress = CreateMviewProgress::for_test(barrier_manager);
let progress = CreateMviewProgressReporter::for_test(barrier_manager);
let actor_id = progress.actor_id();
let (tx, barrier_receiver) = unbounded_channel();
let value = StructValue::new(vec![Some(1.into()), Some(2.into()), Some(3.into())]);
Expand Down
2 changes: 1 addition & 1 deletion src/stream/src/task/barrier_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ mod progress;
#[cfg(test)]
mod tests;

pub use progress::CreateMviewProgress;
pub use progress::CreateMviewProgressReporter;
use risingwave_common::catalog::TableId;
use risingwave_common::util::epoch::EpochPair;
use risingwave_common::util::runtime::BackgroundShutdownRuntime;
Expand Down
16 changes: 2 additions & 14 deletions src/stream/src/task/barrier_manager/managed_state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,6 @@ use risingwave_common::must_match;
use risingwave_common::util::epoch::EpochPair;
use risingwave_hummock_sdk::SyncResult;
use risingwave_pb::stream_plan::barrier::BarrierKind;
use risingwave_pb::stream_service::barrier_complete_response::CreateMviewProgress;
use risingwave_pb::stream_service::BuildActorInfo;
use risingwave_storage::{dispatch_state_store, StateStore, StateStoreImpl};
use thiserror_ext::AsReport;
Expand Down Expand Up @@ -381,7 +380,7 @@ pub(super) struct PartialGraphManagedBarrierState {

/// Record the progress updates of creating mviews for each epoch of concurrent checkpoints.
///
/// This is updated by [`super::CreateMviewProgress::update`] and will be reported to meta
/// This is updated by [`super::CreateMviewProgressReporter::update`] and will be reported to meta
/// in [`BarrierCompleteResult`].
pub(super) create_mview_progress: HashMap<u64, HashMap<ActorId, BackfillState>>,

Expand Down Expand Up @@ -752,18 +751,7 @@ impl PartialGraphManagedBarrierState {
.remove(&barrier_state.barrier.epoch.curr)
.unwrap_or_default()
.into_iter()
.map(|(actor, state)| CreateMviewProgress {
backfill_actor_id: actor,
done: matches!(state, BackfillState::Done(_)),
consumed_epoch: match state {
BackfillState::ConsumingUpstream(consumed_epoch, _) => consumed_epoch,
BackfillState::Done(_) => barrier_state.barrier.epoch.curr,
},
consumed_rows: match state {
BackfillState::ConsumingUpstream(_, consumed_rows) => consumed_rows,
BackfillState::Done(consumed_rows) => consumed_rows,
},
})
.map(|(actor, state)| state.to_pb(actor))
.collect();

let complete_barrier_future = match kind {
Expand Down
Loading

0 comments on commit 29d2e1e

Please sign in to comment.