diff --git a/src/frontend/src/catalog/system_catalog/rw_catalog/rw_ddl_progress.rs b/src/frontend/src/catalog/system_catalog/rw_catalog/rw_ddl_progress.rs index 9f592d4e4f6b..032b0f82907e 100644 --- a/src/frontend/src/catalog/system_catalog/rw_catalog/rw_ddl_progress.rs +++ b/src/frontend/src/catalog/system_catalog/rw_catalog/rw_ddl_progress.rs @@ -31,7 +31,7 @@ struct RwDdlProgress { #[system_catalog(table, "rw_catalog.rw_ddl_progress")] async fn read(reader: &SysCatalogReaderImpl) -> Result> { - let ddl_progresses = reader.meta_client.list_ddl_progress().await?; + let ddl_progresses = reader.meta_client.get_ddl_progress().await?; let table_ids = ddl_progresses .iter() diff --git a/src/frontend/src/handler/show.rs b/src/frontend/src/handler/show.rs index 6cd8b95f95b4..1821ccc289eb 100644 --- a/src/frontend/src/handler/show.rs +++ b/src/frontend/src/handler/show.rs @@ -450,7 +450,7 @@ pub async fn handle_show_object( .into()); } ShowObject::Jobs => { - let resp = session.env().meta_client().list_ddl_progress().await?; + let resp = session.env().meta_client().get_ddl_progress().await?; let rows = resp.into_iter().map(|job| ShowJobRow { id: job.id as i64, statement: job.statement, diff --git a/src/frontend/src/meta_client.rs b/src/frontend/src/meta_client.rs index 020e3380b29b..c58dcc365f43 100644 --- a/src/frontend/src/meta_client.rs +++ b/src/frontend/src/meta_client.rs @@ -91,7 +91,7 @@ pub trait FrontendMetaClient: Send + Sync { async fn set_session_param(&self, param: String, value: Option) -> Result; - async fn list_ddl_progress(&self) -> Result>; + async fn get_ddl_progress(&self) -> Result>; async fn get_tables(&self, table_ids: &[u32]) -> Result>; @@ -232,7 +232,7 @@ impl FrontendMetaClient for FrontendMetaClientImpl { self.0.set_session_param(param, value).await } - async fn list_ddl_progress(&self) -> Result> { + async fn get_ddl_progress(&self) -> Result> { let ddl_progress = self.0.get_ddl_progress().await?; Ok(ddl_progress) } diff --git a/src/frontend/src/test_utils.rs b/src/frontend/src/test_utils.rs index 10dad2105ed9..612388926215 100644 --- a/src/frontend/src/test_utils.rs +++ b/src/frontend/src/test_utils.rs @@ -1013,7 +1013,7 @@ impl FrontendMetaClient for MockFrontendMetaClient { Ok("".to_string()) } - async fn list_ddl_progress(&self) -> RpcResult> { + async fn get_ddl_progress(&self) -> RpcResult> { Ok(vec![]) } diff --git a/src/meta/src/barrier/mod.rs b/src/meta/src/barrier/mod.rs index e4b9cdb8b3a9..0772bac6699e 100644 --- a/src/meta/src/barrier/mod.rs +++ b/src/meta/src/barrier/mod.rs @@ -763,7 +763,9 @@ impl GlobalBarrierManager { if let Some(request) = request { match request { BarrierManagerRequest::GetDdlProgress(result_tx) => { + // Progress of normal backfill let mut progress = self.checkpoint_control.create_mview_tracker.gen_ddl_progress(); + // Progress of snapshot backfill for creating_job in self.checkpoint_control.creating_streaming_job_controls.values() { progress.extend([(creating_job.info.table_fragments.table_id().table_id, creating_job.gen_ddl_progress())]); } @@ -1634,6 +1636,7 @@ impl GlobalBarrierManagerContext { Ok(info) } + /// Serving `SHOW JOBS / SELECT * FROM rw_ddl_progress` pub async fn get_ddl_progress(&self) -> MetaResult> { let mut ddl_progress = { let (tx, rx) = oneshot::channel(); diff --git a/src/stream/src/executor/backfill/arrangement_backfill.rs b/src/stream/src/executor/backfill/arrangement_backfill.rs index e3979496731b..540ffe1a020f 100644 --- a/src/stream/src/executor/backfill/arrangement_backfill.rs +++ b/src/stream/src/executor/backfill/arrangement_backfill.rs @@ -34,7 +34,7 @@ use crate::executor::backfill::utils::{ update_pos_by_vnode, BackfillProgressPerVnode, BackfillRateLimiter, BackfillState, }; use crate::executor::prelude::*; -use crate::task::CreateMviewProgress; +use crate::task::CreateMviewProgressReporter; type Builders = HashMap; @@ -56,7 +56,7 @@ pub struct ArrangementBackfillExecutor { /// The column indices need to be forwarded to the downstream from the upstream and table scan. output_indices: Vec, - progress: CreateMviewProgress, + progress: CreateMviewProgressReporter, actor_id: ActorId, @@ -79,7 +79,7 @@ where upstream: Executor, state_table: StateTable, output_indices: Vec, - progress: CreateMviewProgress, + progress: CreateMviewProgressReporter, metrics: Arc, chunk_size: usize, rate_limit: Option, diff --git a/src/stream/src/executor/backfill/cdc/cdc_backfill.rs b/src/stream/src/executor/backfill/cdc/cdc_backfill.rs index bfffa066fc26..066dc86ba551 100644 --- a/src/stream/src/executor/backfill/cdc/cdc_backfill.rs +++ b/src/stream/src/executor/backfill/cdc/cdc_backfill.rs @@ -43,7 +43,7 @@ use crate::executor::backfill::CdcScanOptions; use crate::executor::monitor::CdcBackfillMetrics; use crate::executor::prelude::*; use crate::executor::UpdateMutation; -use crate::task::CreateMviewProgress; +use crate::task::CreateMviewProgressReporter; /// `split_id`, `is_finished`, `row_count`, `cdc_offset` all occupy 1 column each. const METADATA_STATE_LEN: usize = 4; @@ -68,7 +68,7 @@ pub struct CdcBackfillExecutor { // TODO: introduce a CdcBackfillProgress to report finish to Meta // This object is just a stub right now - progress: Option, + progress: Option, metrics: CdcBackfillMetrics, @@ -86,7 +86,7 @@ impl CdcBackfillExecutor { upstream: Executor, output_indices: Vec, output_columns: Vec, - progress: Option, + progress: Option, metrics: Arc, state_table: StateTable, rate_limit_rps: Option, diff --git a/src/stream/src/executor/backfill/no_shuffle_backfill.rs b/src/stream/src/executor/backfill/no_shuffle_backfill.rs index 761aedfa55ee..d8de07375d72 100644 --- a/src/stream/src/executor/backfill/no_shuffle_backfill.rs +++ b/src/stream/src/executor/backfill/no_shuffle_backfill.rs @@ -30,7 +30,7 @@ use crate::executor::backfill::utils::{ METADATA_STATE_LEN, }; use crate::executor::prelude::*; -use crate::task::CreateMviewProgress; +use crate::task::CreateMviewProgressReporter; /// Schema: | vnode | pk ... | `backfill_finished` | `row_count` | /// We can decode that into `BackfillState` on recovery. @@ -76,7 +76,7 @@ pub struct BackfillExecutor { output_indices: Vec, /// PTAL at the docstring for `CreateMviewProgress` to understand how we compute it. - progress: CreateMviewProgress, + progress: CreateMviewProgressReporter, actor_id: ActorId, @@ -100,7 +100,7 @@ where upstream: Executor, state_table: Option>, output_indices: Vec, - progress: CreateMviewProgress, + progress: CreateMviewProgressReporter, metrics: Arc, chunk_size: usize, rate_limit: Option, diff --git a/src/stream/src/executor/backfill/snapshot_backfill.rs b/src/stream/src/executor/backfill/snapshot_backfill.rs index ac625f53a02d..593a13df9cbc 100644 --- a/src/stream/src/executor/backfill/snapshot_backfill.rs +++ b/src/stream/src/executor/backfill/snapshot_backfill.rs @@ -43,7 +43,7 @@ use crate::executor::{ DispatcherBarrier, DispatcherMessage, Execute, Executor, Message, Mutation, StreamExecutorError, StreamExecutorResult, }; -use crate::task::CreateMviewProgress; +use crate::task::CreateMviewProgressReporter; pub struct SnapshotBackfillExecutor { /// Upstream table @@ -55,7 +55,7 @@ pub struct SnapshotBackfillExecutor { /// The column indices need to be forwarded to the downstream from the upstream and table scan. output_indices: Vec, - progress: CreateMviewProgress, + progress: CreateMviewProgressReporter, chunk_size: usize, rate_limit: Option, @@ -73,7 +73,7 @@ impl SnapshotBackfillExecutor { upstream: Executor, output_indices: Vec, actor_ctx: ActorContextRef, - progress: CreateMviewProgress, + progress: CreateMviewProgressReporter, chunk_size: usize, rate_limit: Option, barrier_rx: UnboundedReceiver, @@ -617,7 +617,7 @@ async fn make_consume_snapshot_stream<'a, S: StateStore>( rate_limit: Option, barrier_rx: &'a mut UnboundedReceiver, output_indices: &'a [usize], - mut progress: CreateMviewProgress, + mut progress: CreateMviewProgressReporter, first_recv_barrier: Barrier, ) { let mut barrier_epoch = first_recv_barrier.epoch; diff --git a/src/stream/src/executor/chain.rs b/src/stream/src/executor/chain.rs index 6f198ff2b7e1..ca06319e11bf 100644 --- a/src/stream/src/executor/chain.rs +++ b/src/stream/src/executor/chain.rs @@ -13,7 +13,7 @@ // limitations under the License. use crate::executor::prelude::*; -use crate::task::CreateMviewProgress; +use crate::task::CreateMviewProgressReporter; /// [`ChainExecutor`] is an executor that enables synchronization between the existing stream and /// newly appended executors. Currently, [`ChainExecutor`] is mainly used to implement MV on MV @@ -24,7 +24,7 @@ pub struct ChainExecutor { upstream: Executor, - progress: CreateMviewProgress, + progress: CreateMviewProgressReporter, actor_id: ActorId, @@ -36,7 +36,7 @@ impl ChainExecutor { pub fn new( snapshot: Executor, upstream: Executor, - progress: CreateMviewProgress, + progress: CreateMviewProgressReporter, upstream_only: bool, ) -> Self { Self { @@ -115,12 +115,12 @@ mod test { use super::ChainExecutor; use crate::executor::test_utils::MockSource; use crate::executor::{AddMutation, Barrier, Execute, Message, Mutation, PkIndices}; - use crate::task::{CreateMviewProgress, LocalBarrierManager}; + use crate::task::{CreateMviewProgressReporter, LocalBarrierManager}; #[tokio::test] async fn test_basic() { let barrier_manager = LocalBarrierManager::for_test(); - let progress = CreateMviewProgress::for_test(barrier_manager); + let progress = CreateMviewProgressReporter::for_test(barrier_manager); let actor_id = progress.actor_id(); let schema = Schema::new(vec![Field::unnamed(DataType::Int64)]); diff --git a/src/stream/src/executor/rearranged_chain.rs b/src/stream/src/executor/rearranged_chain.rs index 37717d270d90..d70d6c2955c3 100644 --- a/src/stream/src/executor/rearranged_chain.rs +++ b/src/stream/src/executor/rearranged_chain.rs @@ -17,7 +17,7 @@ use futures::stream; use futures::stream::select_with_strategy; use crate::executor::prelude::*; -use crate::task::CreateMviewProgress; +use crate::task::CreateMviewProgressReporter; /// `ChainExecutor` is an executor that enables synchronization between the existing stream and /// newly appended executors. Currently, `ChainExecutor` is mainly used to implement MV on MV @@ -31,7 +31,7 @@ pub struct RearrangedChainExecutor { upstream: Executor, - progress: CreateMviewProgress, + progress: CreateMviewProgressReporter, actor_id: ActorId, } @@ -74,7 +74,11 @@ impl RearrangedMessage { } impl RearrangedChainExecutor { - pub fn new(snapshot: Executor, upstream: Executor, progress: CreateMviewProgress) -> Self { + pub fn new( + snapshot: Executor, + upstream: Executor, + progress: CreateMviewProgressReporter, + ) -> Self { Self { snapshot, upstream, diff --git a/src/stream/src/executor/source/source_backfill_executor.rs b/src/stream/src/executor/source/source_backfill_executor.rs index 09a4d0a40f1c..3f2cd83aca28 100644 --- a/src/stream/src/executor/source/source_backfill_executor.rs +++ b/src/stream/src/executor/source/source_backfill_executor.rs @@ -42,7 +42,7 @@ use crate::common::rate_limit::limited_chunk_size; use crate::executor::prelude::*; use crate::executor::source::source_executor::WAIT_BARRIER_MULTIPLE_TIMES; use crate::executor::UpdateMutation; -use crate::task::CreateMviewProgress; +use crate::task::CreateMviewProgressReporter; #[derive(Clone, Debug, Deserialize, Serialize, PartialEq)] pub enum BackfillState { @@ -92,7 +92,7 @@ pub struct SourceBackfillExecutorInner { /// Rate limit in rows/s. rate_limit_rps: Option, - progress: CreateMviewProgress, + progress: CreateMviewProgressReporter, } /// Local variables used in the backfill stage. @@ -244,7 +244,7 @@ impl SourceBackfillExecutorInner { system_params: SystemParamsReaderRef, backfill_state_store: BackfillStateTableHandler, rate_limit_rps: Option, - progress: CreateMviewProgress, + progress: CreateMviewProgressReporter, ) -> Self { let source_split_change_count = metrics .source_split_change_count diff --git a/src/stream/src/executor/values.rs b/src/stream/src/executor/values.rs index 83da0ff68a7d..89946d9dc94e 100644 --- a/src/stream/src/executor/values.rs +++ b/src/stream/src/executor/values.rs @@ -21,7 +21,7 @@ use risingwave_expr::expr::NonStrictExpression; use tokio::sync::mpsc::UnboundedReceiver; use crate::executor::prelude::*; -use crate::task::CreateMviewProgress; +use crate::task::CreateMviewProgressReporter; const DEFAULT_CHUNK_SIZE: usize = 1024; @@ -33,7 +33,7 @@ pub struct ValuesExecutor { schema: Schema, // Receiver of barrier channel. barrier_receiver: UnboundedReceiver, - progress: CreateMviewProgress, + progress: CreateMviewProgressReporter, rows: vec::IntoIter>, } @@ -43,7 +43,7 @@ impl ValuesExecutor { pub fn new( ctx: ActorContextRef, schema: Schema, - progress: CreateMviewProgress, + progress: CreateMviewProgressReporter, rows: Vec>, barrier_receiver: UnboundedReceiver, ) -> Self { @@ -150,12 +150,12 @@ mod tests { use super::ValuesExecutor; use crate::executor::test_utils::StreamExecutorTestExt; use crate::executor::{ActorContext, AddMutation, Barrier, Execute, Mutation}; - use crate::task::{CreateMviewProgress, LocalBarrierManager}; + use crate::task::{CreateMviewProgressReporter, LocalBarrierManager}; #[tokio::test] async fn test_values() { let barrier_manager = LocalBarrierManager::for_test(); - let progress = CreateMviewProgress::for_test(barrier_manager); + let progress = CreateMviewProgressReporter::for_test(barrier_manager); let actor_id = progress.actor_id(); let (tx, barrier_receiver) = unbounded_channel(); let value = StructValue::new(vec![Some(1.into()), Some(2.into()), Some(3.into())]); diff --git a/src/stream/src/task/barrier_manager.rs b/src/stream/src/task/barrier_manager.rs index 67fac31ed8f6..406459e25c38 100644 --- a/src/stream/src/task/barrier_manager.rs +++ b/src/stream/src/task/barrier_manager.rs @@ -45,7 +45,7 @@ mod progress; #[cfg(test)] mod tests; -pub use progress::CreateMviewProgress; +pub use progress::CreateMviewProgressReporter; use risingwave_common::catalog::TableId; use risingwave_common::util::epoch::EpochPair; use risingwave_common::util::runtime::BackgroundShutdownRuntime; diff --git a/src/stream/src/task/barrier_manager/managed_state.rs b/src/stream/src/task/barrier_manager/managed_state.rs index ddc0c7f13bab..43a979af2b56 100644 --- a/src/stream/src/task/barrier_manager/managed_state.rs +++ b/src/stream/src/task/barrier_manager/managed_state.rs @@ -31,7 +31,6 @@ use risingwave_common::must_match; use risingwave_common::util::epoch::EpochPair; use risingwave_hummock_sdk::SyncResult; use risingwave_pb::stream_plan::barrier::BarrierKind; -use risingwave_pb::stream_service::barrier_complete_response::CreateMviewProgress; use risingwave_pb::stream_service::BuildActorInfo; use risingwave_storage::{dispatch_state_store, StateStore, StateStoreImpl}; use thiserror_ext::AsReport; @@ -381,7 +380,7 @@ pub(super) struct PartialGraphManagedBarrierState { /// Record the progress updates of creating mviews for each epoch of concurrent checkpoints. /// - /// This is updated by [`super::CreateMviewProgress::update`] and will be reported to meta + /// This is updated by [`super::CreateMviewProgressReporter::update`] and will be reported to meta /// in [`BarrierCompleteResult`]. pub(super) create_mview_progress: HashMap>, @@ -752,18 +751,7 @@ impl PartialGraphManagedBarrierState { .remove(&barrier_state.barrier.epoch.curr) .unwrap_or_default() .into_iter() - .map(|(actor, state)| CreateMviewProgress { - backfill_actor_id: actor, - done: matches!(state, BackfillState::Done(_)), - consumed_epoch: match state { - BackfillState::ConsumingUpstream(consumed_epoch, _) => consumed_epoch, - BackfillState::Done(_) => barrier_state.barrier.epoch.curr, - }, - consumed_rows: match state { - BackfillState::ConsumingUpstream(_, consumed_rows) => consumed_rows, - BackfillState::Done(consumed_rows) => consumed_rows, - }, - }) + .map(|(actor, state)| state.to_pb(actor)) .collect(); let complete_barrier_future = match kind { diff --git a/src/stream/src/task/barrier_manager/progress.rs b/src/stream/src/task/barrier_manager/progress.rs index 9a243c2e975d..9b2820bb3bfe 100644 --- a/src/stream/src/task/barrier_manager/progress.rs +++ b/src/stream/src/task/barrier_manager/progress.rs @@ -15,6 +15,7 @@ use std::fmt::{Display, Formatter}; use risingwave_common::util::epoch::EpochPair; +use risingwave_pb::stream_service::barrier_complete_response::PbCreateMviewProgress; use super::LocalBarrierManager; use crate::task::barrier_manager::LocalBarrierEvent::ReportCreateProgress; @@ -30,6 +31,23 @@ pub(crate) enum BackfillState { Done(ConsumedRows), } +impl BackfillState { + pub fn to_pb(self, actor_id: ActorId) -> PbCreateMviewProgress { + PbCreateMviewProgress { + backfill_actor_id: actor_id, + done: matches!(self, BackfillState::Done(_)), + consumed_epoch: match self { + BackfillState::ConsumingUpstream(consumed_epoch, _) => consumed_epoch, + BackfillState::Done(_) => 0, // unused field for done + }, + consumed_rows: match self { + BackfillState::ConsumingUpstream(_, consumed_rows) => consumed_rows, + BackfillState::Done(consumed_rows) => consumed_rows, + }, + } + } +} + impl Display for BackfillState { fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { match self { @@ -103,7 +121,7 @@ impl LocalBarrierManager { /// TODO(kwannoel): Perhaps it is possible to get total key count of the replicated state table /// for arrangement backfill. We can use that to estimate the progress as well, and avoid recording /// `row_count` state for it. -pub struct CreateMviewProgress { +pub struct CreateMviewProgressReporter { barrier_manager: LocalBarrierManager, /// The id of the actor containing the backfill executors. @@ -112,7 +130,7 @@ pub struct CreateMviewProgress { state: Option, } -impl CreateMviewProgress { +impl CreateMviewProgressReporter { pub fn new(barrier_manager: LocalBarrierManager, backfill_actor_id: ActorId) -> Self { Self { barrier_manager, @@ -186,8 +204,8 @@ impl LocalBarrierManager { pub fn register_create_mview_progress( &self, backfill_actor_id: ActorId, - ) -> CreateMviewProgress { + ) -> CreateMviewProgressReporter { trace!("register create mview progress: {}", backfill_actor_id); - CreateMviewProgress::new(self.clone(), backfill_actor_id) + CreateMviewProgressReporter::new(self.clone(), backfill_actor_id) } }