Skip to content

Commit

Permalink
refactor: sink metrics (#18730)
Browse files Browse the repository at this point in the history
  • Loading branch information
fuyufjh authored Sep 26, 2024
1 parent 75080b7 commit 39141cc
Show file tree
Hide file tree
Showing 18 changed files with 436 additions and 387 deletions.
4 changes: 2 additions & 2 deletions src/connector/src/sink/clickhouse.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ use super::decouple_checkpoint_log_sink::{
default_commit_checkpoint_interval, DecoupleCheckpointLogSinkerOf,
};
use super::writer::SinkWriter;
use super::{DummySinkCommitCoordinator, SinkWriterParam};
use super::{DummySinkCommitCoordinator, SinkWriterMetrics, SinkWriterParam};
use crate::error::ConnectorResult;
use crate::sink::{
Result, Sink, SinkError, SinkParam, SINK_TYPE_APPEND_ONLY, SINK_TYPE_OPTION, SINK_TYPE_UPSERT,
Expand Down Expand Up @@ -553,7 +553,7 @@ impl Sink for ClickHouseSink {

Ok(DecoupleCheckpointLogSinkerOf::new(
writer,
writer_param.sink_metrics,
SinkWriterMetrics::new(&writer_param),
commit_checkpoint_interval,
))
}
Expand Down
19 changes: 10 additions & 9 deletions src/connector/src/sink/decouple_checkpoint_log_sink.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,8 @@ use async_trait::async_trait;

use crate::sink::log_store::{LogStoreReadItem, TruncateOffset};
use crate::sink::writer::SinkWriter;
use crate::sink::{LogSinker, Result, SinkLogReader, SinkMetrics};
use crate::sink::{LogSinker, Result, SinkLogReader, SinkWriterMetrics};

pub const DEFAULT_COMMIT_CHECKPOINT_INTERVAL_WITH_SINK_DECOUPLE: u64 = 10;
pub const DEFAULT_COMMIT_CHECKPOINT_INTERVAL_WITHOUT_SINK_DECOUPLE: u64 = 1;
pub const COMMIT_CHECKPOINT_INTERVAL: &str = "commit_checkpoint_interval";
Expand All @@ -33,7 +34,7 @@ pub fn default_commit_checkpoint_interval() -> u64 {
/// we delay the checkpoint barrier to make commits less frequent.
pub struct DecoupleCheckpointLogSinkerOf<W> {
writer: W,
sink_metrics: SinkMetrics,
sink_writer_metrics: SinkWriterMetrics,
commit_checkpoint_interval: NonZeroU64,
}

Expand All @@ -42,12 +43,12 @@ impl<W> DecoupleCheckpointLogSinkerOf<W> {
/// decouple log reader `KvLogStoreReader`.
pub fn new(
writer: W,
sink_metrics: SinkMetrics,
sink_writer_metrics: SinkWriterMetrics,
commit_checkpoint_interval: NonZeroU64,
) -> Self {
DecoupleCheckpointLogSinkerOf {
writer,
sink_metrics,
sink_writer_metrics,
commit_checkpoint_interval,
}
}
Expand All @@ -57,7 +58,6 @@ impl<W> DecoupleCheckpointLogSinkerOf<W> {
impl<W: SinkWriter<CommitMetadata = ()>> LogSinker for DecoupleCheckpointLogSinkerOf<W> {
async fn consume_log_and_sink(self, log_reader: &mut impl SinkLogReader) -> Result<!> {
let mut sink_writer = self.writer;
let sink_metrics = self.sink_metrics;
#[derive(Debug)]
enum LogConsumerState {
/// Mark that the log consumer is not initialized yet
Expand All @@ -74,6 +74,7 @@ impl<W: SinkWriter<CommitMetadata = ()>> LogSinker for DecoupleCheckpointLogSink

let mut current_checkpoint: u64 = 0;
let commit_checkpoint_interval = self.commit_checkpoint_interval;
let sink_writer_metrics = self.sink_writer_metrics;

loop {
let (epoch, item): (u64, LogStoreReadItem) = log_reader.next_item().await?;
Expand All @@ -87,8 +88,8 @@ impl<W: SinkWriter<CommitMetadata = ()>> LogSinker for DecoupleCheckpointLogSink
// force commit on update vnode bitmap
let start_time = Instant::now();
sink_writer.barrier(true).await?;
sink_metrics
.sink_commit_duration_metrics
sink_writer_metrics
.sink_commit_duration
.observe(start_time.elapsed().as_millis() as f64);
log_reader.truncate(TruncateOffset::Barrier { epoch: *prev_epoch })?;
current_checkpoint = 0;
Expand Down Expand Up @@ -149,8 +150,8 @@ impl<W: SinkWriter<CommitMetadata = ()>> LogSinker for DecoupleCheckpointLogSink
if current_checkpoint >= commit_checkpoint_interval.get() {
let start_time = Instant::now();
sink_writer.barrier(true).await?;
sink_metrics
.sink_commit_duration_metrics
sink_writer_metrics
.sink_commit_duration
.observe(start_time.elapsed().as_millis() as f64);
log_reader.truncate(TruncateOffset::Barrier { epoch })?;
current_checkpoint = 0;
Expand Down
6 changes: 4 additions & 2 deletions src/connector/src/sink/deltalake.rs
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ use super::decouple_checkpoint_log_sink::{
};
use super::writer::SinkWriter;
use super::{
Result, Sink, SinkCommitCoordinator, SinkError, SinkParam, SinkWriterParam,
Result, Sink, SinkCommitCoordinator, SinkError, SinkParam, SinkWriterMetrics, SinkWriterParam,
SINK_TYPE_APPEND_ONLY, SINK_USER_FORCE_APPEND_ONLY_OPTION,
};

Expand Down Expand Up @@ -289,6 +289,8 @@ impl Sink for DeltaLakeSink {
self.param.downstream_pk.clone(),
)
.await?;

let metrics = SinkWriterMetrics::new(&writer_param);
let writer = CoordinatedSinkWriter::new(
writer_param
.meta_client
Expand All @@ -312,7 +314,7 @@ impl Sink for DeltaLakeSink {

Ok(DecoupleCheckpointLogSinkerOf::new(
writer,
writer_param.sink_metrics,
metrics,
commit_checkpoint_interval,
))
}
Expand Down
6 changes: 4 additions & 2 deletions src/connector/src/sink/doris.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,9 @@ use super::doris_starrocks_connector::{
HeaderBuilder, InserterInner, InserterInnerBuilder, DORIS_DELETE_SIGN, DORIS_SUCCESS_STATUS,
POOL_IDLE_TIMEOUT,
};
use super::{Result, SinkError, SINK_TYPE_APPEND_ONLY, SINK_TYPE_OPTION, SINK_TYPE_UPSERT};
use super::{
Result, SinkError, SinkWriterMetrics, SINK_TYPE_APPEND_ONLY, SINK_TYPE_OPTION, SINK_TYPE_UPSERT,
};
use crate::sink::encoder::{JsonEncoder, RowEncoder};
use crate::sink::writer::{LogSinkerOf, SinkWriterExt};
use crate::sink::{DummySinkCommitCoordinator, Sink, SinkParam, SinkWriter, SinkWriterParam};
Expand Down Expand Up @@ -209,7 +211,7 @@ impl Sink for DorisSink {
self.is_append_only,
)
.await?
.into_log_sinker(writer_param.sink_metrics))
.into_log_sinker(SinkWriterMetrics::new(&writer_param)))
}

async fn validate(&self) -> Result<()> {
Expand Down
3 changes: 2 additions & 1 deletion src/connector/src/sink/file_sink/opendal_sink.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ use crate::sink::catalog::SinkEncode;
use crate::sink::writer::{LogSinkerOf, SinkWriterExt};
use crate::sink::{
DummySinkCommitCoordinator, Result, Sink, SinkError, SinkFormatDesc, SinkParam, SinkWriter,
SinkWriterMetrics,
};
use crate::source::TryFromBTreeMap;
use crate::with_options::WithOptions;
Expand Down Expand Up @@ -130,7 +131,7 @@ impl<S: OpendalSinkBackend> Sink for FileSink<S> {
self.format_desc.encode.clone(),
self.engine_type.clone(),
)?
.into_log_sinker(writer_param.sink_metrics))
.into_log_sinker(SinkWriterMetrics::new(&writer_param)))
}
}

Expand Down
Loading

0 comments on commit 39141cc

Please sign in to comment.