Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor: add metrics for time travel and recent versions #18690

Merged
merged 4 commits into from
Sep 27, 2024
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion docker/dashboards/risingwave-dev-dashboard.json

Large diffs are not rendered by default.

40 changes: 35 additions & 5 deletions grafana/risingwave-dev-dashboard.dashboard.py
Original file line number Diff line number Diff line change
Expand Up @@ -2442,6 +2442,20 @@ def section_hummock_read(outer_panels):
),
],
),
panels.timeseries_count(
"Safe Version Fetch Count",
"",
[
panels.target(
f"{metric('state_store_safe_version_hit')}",
"",
),
panels.target(
f"{metric('state_store_safe_version_miss')}",
"",
),
],
),
],
)
]
Expand Down Expand Up @@ -3206,6 +3220,10 @@ def section_hummock_manager(outer_panels):
f"{metric('storage_current_version_object_count')}",
"referenced by current version",
),
panels.target(
f"{metric('storage_time_travel_object_count')}",
"referenced by time travel",
),
panels.target(
f"{metric('storage_total_object_count')}",
"all objects (including dangling ones)",
Expand Down Expand Up @@ -3364,6 +3382,23 @@ def section_hummock_manager(outer_panels):
),
],
),
panels.timeseries_latency(
"Time Travel Latency",
"The latency of replaying a hummock version for time travel",
quantile(
lambda quantile, legend: panels.target(
f"histogram_quantile({quantile}, sum(rate({metric('storage_time_travel_version_replay_latency_bucket')}[$__rate_interval])) by (le))",
f"time_travel_version_replay_latency_p{legend}",
),
[50, 90, 99, "max"],
)
+ [
panels.target(
f"rate({metric('storage_time_travel_version_replay_latency_sum')}[$__rate_interval]) / rate({metric('storage_time_travel_version_replay_latency_count')}[$__rate_interval]) > 0",
"time_travel_version_replay_avg",
),
],
),
],
)
]
Expand Down Expand Up @@ -3498,11 +3533,6 @@ def section_grpc_meta_hummock_manager(outer_panels):
"UnpinVersionBefore",
"path='/meta.HummockManagerService/UnpinVersionBefore'",
),
grpc_metrics_target(
panels,
"UnpinSnapshotBefore",
"path='/meta.HummockManagerService/UnpinSnapshotBefore'",
),
grpc_metrics_target(
panels,
"ReportCompactionTasks",
Expand Down
2 changes: 1 addition & 1 deletion grafana/risingwave-dev-dashboard.json

Large diffs are not rendered by default.

3 changes: 3 additions & 0 deletions src/meta/src/hummock/manager/gc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -230,6 +230,9 @@ impl HummockManager {
.all_object_ids_in_time_travel()
.await?
.collect::<HashSet<_>>();
self.metrics
.time_travel_object_count
.set(pinned_object_ids.len() as _);
// 1. filter by watermark
let object_ids = object_ids
.into_iter()
Expand Down
7 changes: 6 additions & 1 deletion src/meta/src/hummock/manager/time_travel.rs
Original file line number Diff line number Diff line change
Expand Up @@ -303,6 +303,10 @@ impl HummockManager {
query_epoch
)))
})?;
let timer = self
.metrics
.time_travel_version_replay_latency
.start_timer();
let actual_version_id = epoch_to_version.version_id;
tracing::debug!(
query_epoch,
Expand Down Expand Up @@ -342,7 +346,7 @@ impl HummockManager {
let sst_count = sst_ids.len();
let mut sst_id_to_info = HashMap::with_capacity(sst_count);
let sst_info_fetch_batch_size = std::env::var("RW_TIME_TRAVEL_SST_INFO_FETCH_BATCH_SIZE")
.unwrap_or_else(|_| "100".into())
.unwrap_or_else(|_| "10000".into())
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Shall we move this env var to the meta opt instead of implicitly reading an env var here?

And any reason to change from 100 to 10000?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Shall we move this env var to the meta opt instead of implicitly reading an env var here?

OK

And any reason to change from 100 to 10000?

To speed up time travel version replay, i.e. fewer SELECT queries.

.parse()
.unwrap();
while !sst_ids.is_empty() {
Expand All @@ -364,6 +368,7 @@ impl HummockManager {
))));
}
refill_version(&mut actual_version, &sst_id_to_info);
timer.observe_duration();
Ok(actual_version)
}

Expand Down
21 changes: 21 additions & 0 deletions src/meta/src/rpc/metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -143,6 +143,8 @@ pub struct MetaMetrics {
pub old_version_object_count: IntGauge,
/// Total size of objects that is still referenced by non-current versions.
pub old_version_object_size: IntGauge,
/// Total number of objects that is referenced by time travel.
pub time_travel_object_count: IntGauge,
/// Total number of objects that is referenced by current version.
pub current_version_object_count: IntGauge,
/// Total size of objects that is referenced by current version.
Expand Down Expand Up @@ -206,6 +208,8 @@ pub struct MetaMetrics {
pub auto_schema_change_failure_cnt: LabelGuardedIntCounterVec<2>,
pub auto_schema_change_success_cnt: LabelGuardedIntCounterVec<2>,
pub auto_schema_change_latency: LabelGuardedHistogramVec<2>,

pub time_travel_version_replay_latency: Histogram,
}

pub static GLOBAL_META_METRICS: LazyLock<MetaMetrics> =
Expand Down Expand Up @@ -500,6 +504,13 @@ impl MetaMetrics {
registry
).unwrap();

let time_travel_object_count = register_int_gauge_with_registry!(
"storage_time_travel_object_count",
"total number of objects that is referenced by time travel.",
registry
)
.unwrap();

let delta_log_count = register_int_gauge_with_registry!(
"storage_delta_log_count",
"total number of hummock version delta log",
Expand Down Expand Up @@ -740,6 +751,14 @@ impl MetaMetrics {
)
.unwrap();

let opts = histogram_opts!(
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Have we monitored the rate (frequency) of fetching time travel versions?

Copy link
Contributor Author

@zwang28 zwang28 Sep 27, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, it can be derived from time_travel_version_replay_latency.

Added to grafana.

"storage_time_travel_version_replay_latency",
"The latency(ms) of replaying a hummock version for time travel",
exponential_buckets(0.01, 10.0, 6).unwrap()
);
let time_travel_version_replay_latency =
register_histogram_with_registry!(opts, registry).unwrap();

Self {
grpc_latency,
barrier_latency,
Expand Down Expand Up @@ -771,6 +790,7 @@ impl MetaMetrics {
stale_object_size,
old_version_object_count,
old_version_object_size,
time_travel_object_count,
current_version_object_count,
current_version_object_size,
total_object_count,
Expand Down Expand Up @@ -814,6 +834,7 @@ impl MetaMetrics {
auto_schema_change_success_cnt,
auto_schema_change_latency,
merge_compaction_group_count,
time_travel_version_replay_latency,
}
}

Expand Down
11 changes: 2 additions & 9 deletions src/storage/hummock_sdk/src/sstable_info.rs
Original file line number Diff line number Diff line change
Expand Up @@ -138,7 +138,7 @@ impl From<&PbSstableInfo> for SstableInfo {

impl From<SstableInfo> for PbSstableInfo {
fn from(sstable_info: SstableInfo) -> Self {
assert!(sstable_info.sst_size > 0 || sstable_info.is_stripped());
assert!(sstable_info.sst_size > 0);
assert!(sstable_info.table_ids.is_sorted());
PbSstableInfo {
object_id: sstable_info.object_id,
Expand Down Expand Up @@ -177,7 +177,7 @@ impl From<SstableInfo> for PbSstableInfo {

impl From<&SstableInfo> for PbSstableInfo {
fn from(sstable_info: &SstableInfo) -> Self {
assert!(sstable_info.sst_size > 0 || sstable_info.is_stripped());
assert!(sstable_info.sst_size > 0);
assert!(sstable_info.table_ids.is_sorted());
PbSstableInfo {
object_id: sstable_info.object_id,
Expand Down Expand Up @@ -216,10 +216,3 @@ impl SstableInfo {
self.key_range = KeyRange::default();
}
}

// Time travel
impl SstableInfo {
/// Returns `true` when this SST info's `object_id` is 0.
///
/// NOTE(review): an `object_id` of 0 presumably marks an entry whose details were
/// stripped for time travel storage — confirm against the code that produces such entries.
pub fn is_stripped(&self) -> bool {
self.object_id == 0
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -339,7 +339,7 @@ impl HummockEventHandler {
};

let uploader = HummockUploader::new(
state_store_metrics,
state_store_metrics.clone(),
pinned_version.clone(),
spawn_upload_task,
buffer_tracker,
Expand All @@ -359,6 +359,7 @@ impl HummockEventHandler {
recent_versions: Arc::new(ArcSwap::from_pointee(RecentVersions::new(
pinned_version,
storage_opts.max_cached_recent_versions_number,
state_store_metrics,
))),
write_conflict_detector,
read_version_mapping,
Expand Down
33 changes: 29 additions & 4 deletions src/storage/src/hummock/local_version/recent_versions.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,27 +13,35 @@
// limitations under the License.

use std::cmp::Ordering;
use std::sync::Arc;

use risingwave_common::catalog::TableId;
use risingwave_hummock_sdk::HummockEpoch;

use crate::hummock::local_version::pinned_version::PinnedVersion;
use crate::monitor::HummockStateStoreMetrics;

/// The latest pinned version plus a list of earlier pinned versions, used to look up
/// a version that is safe to read for a given table and epoch.
pub struct RecentVersions {
/// The newest version held; it is checked first when looking up a safe version.
latest_version: PinnedVersion,
/// Whether `latest_version` is treated as committed. `new` initializes this to `true`.
is_latest_committed: bool,
/// Older versions consulted when `latest_version` is too new for the queried epoch.
recent_versions: Vec<PinnedVersion>, // earlier version at the front
/// Must be greater than 0 (asserted in `new`).
/// NOTE(review): presumably the cap on how many entries `recent_versions` keeps — confirm.
max_version_num: usize,
/// State store metrics handle; lookups record safe-version hit/miss counts and the
/// index of the version that was hit through it.
metric: Arc<HummockStateStoreMetrics>,
}

impl RecentVersions {
pub fn new(version: PinnedVersion, max_version_num: usize) -> Self {
pub fn new(
version: PinnedVersion,
max_version_num: usize,
metric: Arc<HummockStateStoreMetrics>,
) -> Self {
assert!(max_version_num > 0);
Self {
latest_version: version,
is_latest_committed: true, // The first version is always treated as committed epochs
recent_versions: Vec::new(),
max_version_num,
metric,
}
}

Expand Down Expand Up @@ -89,6 +97,7 @@ impl RecentVersions {
is_latest_committed: is_committed,
recent_versions,
max_version_num: self.max_version_num,
metric: self.metric.clone(),
}
}

Expand All @@ -104,21 +113,28 @@ impl RecentVersions {
table_id: TableId,
epoch: HummockEpoch,
) -> Option<PinnedVersion> {
if let Some(info) = self
let result = if let Some(info) = self
.latest_version
.version()
.state_table_info
.info()
.get(&table_id)
{
if info.committed_epoch <= epoch {
self.metric.safe_version_hit_index.observe(0 as _);
Some(self.latest_version.clone())
} else {
self.get_safe_version_from_recent(table_id, epoch)
}
} else {
None
};
if result.is_some() {
self.metric.safe_version_hit.inc();
} else {
self.metric.safe_version_miss.inc();
}
result
}

fn get_safe_version_from_recent(
Expand Down Expand Up @@ -155,7 +171,10 @@ impl RecentVersions {
}
});
match result {
Ok(index) => Some(self.recent_versions[index].clone()),
Ok(index) => {
self.metric.safe_version_hit_index.observe((index + 1) as _);
Some(self.recent_versions[index].clone())
}
Err(index) => {
// `index` is index of the first version that has `committed_epoch` greater than `epoch`
// or `index` equals `recent_version.len()` when `epoch` is greater than all `committed_epoch`
Expand All @@ -175,6 +194,7 @@ impl RecentVersions {
.info()
.contains_key(&table_id)
{
self.metric.safe_version_hit_index.observe(index as _);
Some(version)
} else {
// if the table does not exist in the version, return `None` to try get a time travel version
Expand All @@ -197,6 +217,7 @@ mod tests {

use crate::hummock::local_version::pinned_version::PinnedVersion;
use crate::hummock::local_version::recent_versions::RecentVersions;
use crate::monitor::HummockStateStoreMetrics;

const TEST_TABLE_ID1: TableId = TableId::new(233);
const TEST_TABLE_ID2: TableId = TableId::new(234);
Expand Down Expand Up @@ -251,7 +272,11 @@ mod tests {
let epoch4 = epoch3 + 1;
let version1 = gen_pin_version(1, [(TEST_TABLE_ID1, epoch1)]);
// with at most 2 historical versions
let recent_versions = RecentVersions::new(version1.clone(), 2);
let recent_versions = RecentVersions::new(
version1.clone(),
2,
HummockStateStoreMetrics::unused().into(),
);
assert!(recent_versions.recent_versions.is_empty());
assert!(recent_versions.is_latest_committed);
assert_query_equal(
Expand Down
32 changes: 32 additions & 0 deletions src/storage/src/monitor/hummock_state_store_metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,10 @@ pub struct HummockStateStoreMetrics {

pub event_handler_pending_event: IntGauge,
pub event_handler_latency: HistogramVec,

pub safe_version_hit: GenericCounter<AtomicU64>,
pub safe_version_hit_index: Histogram,
pub safe_version_miss: GenericCounter<AtomicU64>,
}

pub static GLOBAL_HUMMOCK_STATE_STORE_METRICS: OnceLock<HummockStateStoreMetrics> = OnceLock::new();
Expand Down Expand Up @@ -445,6 +449,31 @@ impl HummockStateStoreMetrics {
let event_handler_latency =
register_histogram_vec_with_registry!(opts, &["event_type"], registry).unwrap();

let safe_version_hit = GenericCounter::new(
"state_store_safe_version_hit",
"The total count of a safe version that can be retrieved successfully",
)
.unwrap();
registry
.register(Box::new(safe_version_hit.clone()))
.unwrap();

let safe_version_miss = GenericCounter::new(
"state_store_safe_version_miss",
"The total count of a safe version that cannot be retrieved",
)
.unwrap();
registry
.register(Box::new(safe_version_miss.clone()))
.unwrap();

let opts = histogram_opts!(
"safe_version_hit_index_histogram",
"The index of safe version found in recent versions",
exponential_buckets(1.0, 2.0, 10).unwrap(),
);
let safe_version_hit_index = register_histogram_with_registry!(opts, registry).unwrap();
zwang28 marked this conversation as resolved.
Show resolved Hide resolved

Self {
bloom_filter_true_negative_counts,
bloom_filter_check_counts,
Expand Down Expand Up @@ -480,6 +509,9 @@ impl HummockStateStoreMetrics {
block_efficiency_histogram,
event_handler_pending_event,
event_handler_latency,
safe_version_hit,
safe_version_hit_index,
safe_version_miss,
}
}

Expand Down
Loading