From f168f357188edad72423f3bb5c6a8715380551d0 Mon Sep 17 00:00:00 2001 From: Bugen Zhao Date: Thu, 5 Sep 2024 14:02:24 +0800 Subject: [PATCH] fix vnode in key Signed-off-by: Bugen Zhao --- src/common/src/hash/consistent_hash/vnode.rs | 4 + .../sink_coordination/coordinator_worker.rs | 4 +- .../src/manager/sink_coordination/manager.rs | 24 +++--- src/storage/benches/bench_table_watermarks.rs | 10 +-- src/storage/hummock_sdk/src/key.rs | 35 ++++++-- .../hummock_sdk/src/table_watermark.rs | 84 ++++++++++++------- .../hummock_test/src/compactor_tests.rs | 12 +-- .../src/hummock_read_version_tests.rs | 4 +- .../hummock_test/src/hummock_storage_tests.rs | 4 +- .../hummock_test/src/state_store_tests.rs | 2 +- src/storage/hummock_trace/src/opts.rs | 2 +- .../event_handler/hummock_event_handler.rs | 4 +- .../src/hummock/event_handler/uploader/mod.rs | 13 ++- .../shared_buffer/shared_buffer_batch.rs | 32 ------- .../src/hummock/sstable/multi_builder.rs | 34 +++++--- src/storage/src/store.rs | 2 +- .../common/log_store_impl/kv_log_store/mod.rs | 8 +- .../log_store_impl/kv_log_store/serde.rs | 16 ++-- .../log_store_impl/kv_log_store/writer.rs | 2 +- 19 files changed, 167 insertions(+), 129 deletions(-) diff --git a/src/common/src/hash/consistent_hash/vnode.rs b/src/common/src/hash/consistent_hash/vnode.rs index 685f99d6cf4f4..177c104cf5f48 100644 --- a/src/common/src/hash/consistent_hash/vnode.rs +++ b/src/common/src/hash/consistent_hash/vnode.rs @@ -69,6 +69,10 @@ impl VirtualNode { impl VirtualNode { /// The maximum count of virtual nodes that fits in [`VirtualNodeInner`]. pub const MAX_COUNT: usize = 1 << VirtualNodeInner::BITS; + /// The maximum value of the virtual node that can be represented. + /// + /// Note that this is **NOT** the maximum value of the virtual node, which depends on the configuration. + pub const MAX_REPRESENTABLE: VirtualNode = VirtualNode::from_index(Self::MAX_COUNT - 1); /// The size of a virtual node in bytes, in memory or serialized representation. pub const SIZE: usize = std::mem::size_of::(); } diff --git a/src/meta/src/manager/sink_coordination/coordinator_worker.rs b/src/meta/src/manager/sink_coordination/coordinator_worker.rs index 8409e714852c2..ebfac14f88139 100644 --- a/src/meta/src/manager/sink_coordination/coordinator_worker.rs +++ b/src/meta/src/manager/sink_coordination/coordinator_worker.rs @@ -160,8 +160,8 @@ impl CoordinatorWorker { } async fn wait_for_writers(&mut self, first_vnode_bitmap: Bitmap) -> anyhow::Result<()> { - let mut remaining_count = VirtualNode::COUNT; - let mut registered_vnode = HashSet::with_capacity(VirtualNode::COUNT); + let mut remaining_count = first_vnode_bitmap.len(); + let mut registered_vnode = HashSet::with_capacity(remaining_count); for vnode in first_vnode_bitmap.iter_vnodes() { remaining_count -= 1; diff --git a/src/meta/src/manager/sink_coordination/manager.rs b/src/meta/src/manager/sink_coordination/manager.rs index fd2b986be28e7..27af18309dec6 100644 --- a/src/meta/src/manager/sink_coordination/manager.rs +++ b/src/meta/src/manager/sink_coordination/manager.rs @@ -414,11 +414,11 @@ mod tests { let epoch1 = 233; let epoch2 = 234; - let mut all_vnode = (0..VirtualNode::COUNT).collect_vec(); + let mut all_vnode = (0..VirtualNode::COUNT_FOR_TEST).collect_vec(); all_vnode.shuffle(&mut rand::thread_rng()); - let (first, second) = all_vnode.split_at(VirtualNode::COUNT / 2); + let (first, second) = all_vnode.split_at(VirtualNode::COUNT_FOR_TEST / 2); let build_bitmap = |indexes: &[usize]| { - let mut builder = BitmapBuilder::zeroed(VirtualNode::COUNT); + let mut builder = BitmapBuilder::zeroed(VirtualNode::COUNT_FOR_TEST); for i in indexes { builder.set(*i, true); } @@ -584,9 +584,9 @@ mod tests { let epoch1 = 233; let epoch2 = 234; - let all_vnode = (0..VirtualNode::COUNT).collect_vec(); + let all_vnode = (0..VirtualNode::COUNT_FOR_TEST).collect_vec(); let build_bitmap = |indexes: &[usize]| { - let mut builder = BitmapBuilder::zeroed(VirtualNode::COUNT); + let mut builder = BitmapBuilder::zeroed(VirtualNode::COUNT_FOR_TEST); for i in indexes { builder.set(*i, true); } @@ -705,7 +705,7 @@ mod tests { let mut build_client_future1 = pin!(CoordinatorStreamHandle::new_with_init_stream( param.to_proto(), - Bitmap::zeros(VirtualNode::COUNT), + Bitmap::zeros(VirtualNode::COUNT_FOR_TEST), |rx| async { Ok(tonic::Response::new( manager @@ -742,11 +742,11 @@ mod tests { let epoch = 233; - let mut all_vnode = (0..VirtualNode::COUNT).collect_vec(); + let mut all_vnode = (0..VirtualNode::COUNT_FOR_TEST).collect_vec(); all_vnode.shuffle(&mut rand::thread_rng()); - let (first, second) = all_vnode.split_at(VirtualNode::COUNT / 2); + let (first, second) = all_vnode.split_at(VirtualNode::COUNT_FOR_TEST / 2); let build_bitmap = |indexes: &[usize]| { - let mut builder = BitmapBuilder::zeroed(VirtualNode::COUNT); + let mut builder = BitmapBuilder::zeroed(VirtualNode::COUNT_FOR_TEST); for i in indexes { builder.set(*i, true); } @@ -821,11 +821,11 @@ mod tests { let epoch = 233; - let mut all_vnode = (0..VirtualNode::COUNT).collect_vec(); + let mut all_vnode = (0..VirtualNode::COUNT_FOR_TEST).collect_vec(); all_vnode.shuffle(&mut rand::thread_rng()); - let (first, second) = all_vnode.split_at(VirtualNode::COUNT / 2); + let (first, second) = all_vnode.split_at(VirtualNode::COUNT_FOR_TEST / 2); let build_bitmap = |indexes: &[usize]| { - let mut builder = BitmapBuilder::zeroed(VirtualNode::COUNT); + let mut builder = BitmapBuilder::zeroed(VirtualNode::COUNT_FOR_TEST); for i in indexes { builder.set(*i, true); } diff --git a/src/storage/benches/bench_table_watermarks.rs b/src/storage/benches/bench_table_watermarks.rs index 4a9e1c5edda0b..f3a2630e3515a 100644 --- a/src/storage/benches/bench_table_watermarks.rs +++ b/src/storage/benches/bench_table_watermarks.rs @@ -37,18 +37,18 @@ use tokio::sync::mpsc::unbounded_channel; fn vnode_bitmaps(part_count: usize) -> impl Iterator> { static BITMAP_CACHE: LazyLock>>>> = LazyLock::new(|| Mutex::new(HashMap::new())); - assert_eq!(VirtualNode::COUNT % part_count, 0); + assert_eq!(VirtualNode::COUNT_FOR_TEST % part_count, 0); let mut cache = BITMAP_CACHE.lock(); match cache.entry(part_count) { Entry::Occupied(entry) => entry.get().clone().into_iter(), Entry::Vacant(entry) => entry .insert({ - let part_size = VirtualNode::COUNT / part_count; + let part_size = VirtualNode::COUNT_FOR_TEST / part_count; (0..part_count) .map(move |part_idx| { let start = part_idx * part_size; let end = part_idx * part_size + part_size; - let mut bitmap = BitmapBuilder::zeroed(VirtualNode::COUNT); + let mut bitmap = BitmapBuilder::zeroed(VirtualNode::COUNT_FOR_TEST); for i in start..end { bitmap.set(i, true); } @@ -253,7 +253,7 @@ fn bench_table_watermarks(c: &mut Criterion) { c.bench_function("read latest watermark", |b| { b.iter(|| { - for i in 0..VirtualNode::COUNT { + for i in 0..VirtualNode::COUNT_FOR_TEST { let _ = table_watermarks.latest_watermark(VirtualNode::from_index(i)); } }) @@ -261,7 +261,7 @@ fn bench_table_watermarks(c: &mut Criterion) { c.bench_function("read committed watermark", |b| { b.iter(|| { - for i in 0..VirtualNode::COUNT { + for i in 0..VirtualNode::COUNT_FOR_TEST { let _ = table_watermarks.read_watermark( VirtualNode::from_index(i), test_epoch(committed_epoch_idx as u64), diff --git a/src/storage/hummock_sdk/src/key.rs b/src/storage/hummock_sdk/src/key.rs index 6a33d1ff1a09b..ed6029f24c290 100644 --- a/src/storage/hummock_sdk/src/key.rs +++ b/src/storage/hummock_sdk/src/key.rs @@ -55,7 +55,15 @@ pub fn is_empty_key_range(key_range: &TableKeyRange) -> bool { } } -// returning left inclusive and right exclusive +/// Returns left inclusive and right exclusive vnode index of the given range. +/// +/// # Vnode count unawareness +/// +/// Note that this function is not aware of the vnode count that is actually used in this table. +/// For example, if the total vnode count is 256, `Unbounded` can be a correct end bound for vnode 255, +/// but this function will still return `Excluded(256)`. +/// +/// See also [`vnode`] and [`end_bound_of_vnode`] which hold such invariant. pub fn vnode_range(range: &TableKeyRange) -> (usize, usize) { let (left, right) = range; let left = match left { @@ -74,12 +82,20 @@ pub fn vnode_range(range: &TableKeyRange) -> (usize, usize) { vnode.to_index() + 1 } } - Unbounded => VirtualNode::COUNT, + Unbounded => VirtualNode::MAX_REPRESENTABLE.to_index() + 1, }; (left, right) } -// Ensure there is only one vnode involved in table key range and return the vnode +/// Ensure there is only one vnode involved in table key range and return the vnode. +/// +/// # Vnode count unawareness +/// +/// Note that this function is not aware of the vnode count that is actually used in this table. +/// For example, if the total vnode count is 256, `Unbounded` can be a correct end bound for vnode 255, +/// but this function will still require `Excluded(256)`. +/// +/// See also [`vnode_range`] and [`end_bound_of_vnode`] which hold such invariant. pub fn vnode(range: &TableKeyRange) -> VirtualNode { let (l, r_exclusive) = vnode_range(range); assert_eq!(r_exclusive - l, 1); @@ -320,8 +336,15 @@ pub fn prev_full_key(full_key: &[u8]) -> Vec { } } +/// [`Unbounded`] if the vnode is the maximum representable value (i.e. [`VirtualNode::MAX_REPRESENTABLE`]), +/// otherwise [`Excluded`] the next vnode. +/// +/// Note that this function is not aware of the vnode count that is actually used in this table. +/// For example, if the total vnode count is 256, `Unbounded` can be a correct end bound for vnode 255, +/// but this function will still return `Excluded(256)`. See also [`vnode`] and [`vnode_range`] which +/// rely on such invariant. pub fn end_bound_of_vnode(vnode: VirtualNode) -> Bound { - if vnode == VirtualNode::MAX { + if vnode == VirtualNode::MAX_REPRESENTABLE { Unbounded } else { let end_bound_index = vnode.to_index() + 1; @@ -1299,7 +1322,7 @@ mod tests { Excluded(TableKey(concat(234, b""))) ) ); - let max_vnode = VirtualNode::COUNT - 1; + let max_vnode = VirtualNode::MAX_REPRESENTABLE.to_index(); assert_eq!( prefixed_range_with_vnode( (Bound::::Unbounded, Bound::::Unbounded), @@ -1332,7 +1355,7 @@ mod tests { Excluded(b"1".as_slice()), Unbounded, ]; - for vnode in 0..VirtualNode::COUNT { + for vnode in 0..VirtualNode::MAX_COUNT { for left in &left_bound { for right in &right_bound { assert_eq!( diff --git a/src/storage/hummock_sdk/src/table_watermark.rs b/src/storage/hummock_sdk/src/table_watermark.rs index 250e9014a1d36..339c79fe938d6 100644 --- a/src/storage/hummock_sdk/src/table_watermark.rs +++ b/src/storage/hummock_sdk/src/table_watermark.rs @@ -160,6 +160,8 @@ impl TableWatermarksIndex { pub fn filter_regress_watermarks(&self, watermarks: &mut Vec) { let mut ret = Vec::with_capacity(watermarks.len()); for watermark in watermarks.drain(..) { + let vnode_count = watermark.vnode_count(); + let mut regress_vnodes = None; for vnode in watermark.vnode_bitmap.iter_vnodes() { if let Some(prev_watermark) = self.latest_watermark(vnode) { @@ -176,7 +178,7 @@ impl TableWatermarksIndex { prev_watermark ); regress_vnodes - .get_or_insert_with(|| BitmapBuilder::zeroed(VirtualNode::COUNT)) + .get_or_insert_with(|| BitmapBuilder::zeroed(vnode_count)) .set(vnode.to_index(), true); } } @@ -187,7 +189,7 @@ impl TableWatermarksIndex { let vnode_index = vnode.to_index(); if !regress_vnodes.is_set(vnode_index) { bitmap_builder - .get_or_insert_with(|| BitmapBuilder::zeroed(VirtualNode::COUNT)) + .get_or_insert_with(|| BitmapBuilder::zeroed(vnode_count)) .set(vnode_index, true); } } @@ -219,8 +221,9 @@ impl TableWatermarksIndex { assert_eq!(self.watermark_direction, direction); self.latest_epoch = epoch; #[cfg(debug_assertions)] - { - let mut vnode_is_set = BitmapBuilder::zeroed(VirtualNode::COUNT); + if !vnode_watermark_list.is_empty() { + let vnode_count = vnode_watermark_list[0].vnode_count(); + let mut vnode_is_set = BitmapBuilder::zeroed(vnode_count); for vnode_watermark in vnode_watermark_list.as_ref() { for vnode in vnode_watermark.vnode_bitmap.iter_ones() { assert!(!vnode_is_set.is_set(vnode)); @@ -324,6 +327,11 @@ impl VnodeWatermark { &self.vnode_bitmap } + /// Vnode count derived from the bitmap. + pub fn vnode_count(&self) -> usize { + self.vnode_bitmap.len() + } + pub fn watermark(&self) -> &Bytes { &self.watermark } @@ -382,10 +390,12 @@ impl TableWatermarks { watermarks: Vec, direction: WatermarkDirection, ) -> Self { - Self { + let mut this = Self { direction, - watermarks: vec![(epoch, Arc::from(watermarks))], - } + watermarks: Vec::new(), + }; + this.add_new_epoch_watermarks(epoch, watermarks.into(), direction); + this } pub fn add_new_epoch_watermarks( @@ -398,9 +408,27 @@ impl TableWatermarks { if let Some((prev_epoch, _)) = self.watermarks.last() { assert!(*prev_epoch < epoch); } + if !watermarks.is_empty() { + let vnode_count = watermarks[0].vnode_count(); + for watermark in &*watermarks { + assert_eq!(watermark.vnode_count(), vnode_count); + } + if let Some(existing_vnode_count) = self.vnode_count() { + assert_eq!(existing_vnode_count, vnode_count); + } + } self.watermarks.push((epoch, watermarks)); } + /// Vnode count derived from existing watermarks. Returns `None` if there is no watermark. + fn vnode_count(&self) -> Option { + self.watermarks + .iter() + .flat_map(|(_, watermarks)| watermarks.as_ref()) + .next() + .map(|w| w.vnode_count()) + } + pub fn from_protobuf(pb: &PbTableWatermarks) -> Self { Self { watermarks: pb @@ -507,15 +535,13 @@ impl TableWatermarks { } debug!("clear stale table watermark below epoch {}", safe_epoch); let mut result_epoch_watermark = Vec::with_capacity(self.watermarks.len()); - let mut unset_vnode: HashSet = (0..VirtualNode::COUNT) - .map(VirtualNode::from_index) - .collect(); + let mut set_vnode: HashSet = HashSet::new(); while let Some((epoch, _)) = self.watermarks.last() { if *epoch >= safe_epoch { let (epoch, watermarks) = self.watermarks.pop().expect("have check Some"); for watermark in watermarks.as_ref() { for vnode in watermark.vnode_bitmap.iter_vnodes() { - unset_vnode.remove(&vnode); + set_vnode.insert(vnode); } } result_epoch_watermark.push((epoch, watermarks)); @@ -523,20 +549,18 @@ impl TableWatermarks { break; } } - while !unset_vnode.is_empty() - && let Some((_, watermarks)) = self.watermarks.pop() - { + while let Some((_, watermarks)) = self.watermarks.pop() { let mut new_vnode_watermarks = Vec::new(); for vnode_watermark in watermarks.as_ref() { - let mut set_vnode = Vec::new(); + let mut new_set_vnode = Vec::new(); for vnode in vnode_watermark.vnode_bitmap.iter_vnodes() { - if unset_vnode.remove(&vnode) { - set_vnode.push(vnode); + if set_vnode.insert(vnode) { + new_set_vnode.push(vnode); } } - if !set_vnode.is_empty() { - let mut builder = BitmapBuilder::zeroed(VirtualNode::COUNT); - for vnode in set_vnode { + if !new_set_vnode.is_empty() { + let mut builder = BitmapBuilder::zeroed(vnode_watermark.vnode_count()); + for vnode in new_set_vnode { builder.set(vnode.to_index(), true); } let bitmap = Arc::new(builder.finish()); @@ -706,7 +730,7 @@ mod tests { use crate::version::HummockVersion; fn build_bitmap(vnodes: impl IntoIterator) -> Arc { - let mut builder = BitmapBuilder::zeroed(VirtualNode::COUNT); + let mut builder = BitmapBuilder::zeroed(VirtualNode::COUNT_FOR_TEST); for vnode in vnodes { builder.set(vnode, true); } @@ -746,7 +770,7 @@ mod tests { let mut second_table_watermark = TableWatermarks::single_epoch( epoch3, vec![VnodeWatermark::new( - build_bitmap(0..VirtualNode::COUNT), + build_bitmap(0..VirtualNode::COUNT_FOR_TEST), watermark3.clone(), )], direction, @@ -754,7 +778,7 @@ mod tests { table_watermarks.add_new_epoch_watermarks( epoch3, vec![VnodeWatermark::new( - build_bitmap(0..VirtualNode::COUNT), + build_bitmap(0..VirtualNode::COUNT_FOR_TEST), watermark3.clone(), )] .into(), @@ -815,7 +839,7 @@ mod tests { table_watermarks.add_new_epoch_watermarks( epoch3, vec![VnodeWatermark::new( - build_bitmap(0..VirtualNode::COUNT), + build_bitmap(0..VirtualNode::COUNT_FOR_TEST), watermark3.clone(), )] .into(), @@ -853,7 +877,7 @@ mod tests { ( epoch3, vec![VnodeWatermark::new( - build_bitmap(0..VirtualNode::COUNT), + build_bitmap(0..VirtualNode::COUNT_FOR_TEST), watermark3.clone(), )] .into() @@ -879,7 +903,7 @@ mod tests { ( epoch3, vec![VnodeWatermark::new( - build_bitmap(0..VirtualNode::COUNT), + build_bitmap(0..VirtualNode::COUNT_FOR_TEST), watermark3.clone(), )] .into() @@ -905,7 +929,7 @@ mod tests { ( epoch4, vec![VnodeWatermark::new( - build_bitmap((1..3).chain(5..VirtualNode::COUNT)), + build_bitmap((1..3).chain(5..VirtualNode::COUNT_FOR_TEST)), watermark3.clone() )] .into() @@ -932,7 +956,7 @@ mod tests { vec![ VnodeWatermark::new(build_bitmap(vec![0, 3, 4]), watermark4.clone()), VnodeWatermark::new( - build_bitmap((1..3).chain(5..VirtualNode::COUNT)), + build_bitmap((1..3).chain(5..VirtualNode::COUNT_FOR_TEST)), watermark3.clone() ) ] @@ -1164,7 +1188,7 @@ mod tests { EPOCH1, vec![VnodeWatermark { watermark: watermark1.clone(), - vnode_bitmap: build_bitmap(0..VirtualNode::COUNT), + vnode_bitmap: build_bitmap(0..VirtualNode::COUNT_FOR_TEST), }] .into(), )], @@ -1182,7 +1206,7 @@ mod tests { ); assert_eq!(EPOCH1, index.committed_epoch.unwrap()); assert_eq!(EPOCH2, index.latest_epoch); - for vnode in 0..VirtualNode::COUNT { + for vnode in 0..VirtualNode::COUNT_FOR_TEST { let vnode = VirtualNode::from_index(vnode); if (1..5).contains(&vnode.to_index()) { assert_eq!(watermark1, index.read_watermark(vnode, EPOCH1).unwrap()); diff --git a/src/storage/hummock_test/src/compactor_tests.rs b/src/storage/hummock_test/src/compactor_tests.rs index 79b00d0f9b8f2..a5cd3aa64acbb 100644 --- a/src/storage/hummock_test/src/compactor_tests.rs +++ b/src/storage/hummock_test/src/compactor_tests.rs @@ -1836,8 +1836,8 @@ pub(crate) mod tests { Arc::new(FilterKeyExtractorImpl::FullKey(FullKeyFilterKeyExtractor)), None, ); - let key_count = KEY_COUNT / VirtualNode::COUNT * 2; - for vnode_id in 0..VirtualNode::COUNT / 2 { + let key_count = KEY_COUNT / VirtualNode::COUNT_FOR_TEST * 2; + for vnode_id in 0..VirtualNode::COUNT_FOR_TEST / 2 { let mut last_k: u64 = 1; let init_epoch = test_epoch(100 * object_id); let mut last_epoch = init_epoch; @@ -1880,9 +1880,9 @@ pub(crate) mod tests { let target_file_size = max_sst_file_size / 4; let mut table_watermarks = BTreeMap::default(); - let key_count = KEY_COUNT / VirtualNode::COUNT * 2; - let mut vnode_builder = BitmapBuilder::zeroed(VirtualNode::COUNT); - for i in 0..VirtualNode::COUNT / 2 { + let key_count = KEY_COUNT / VirtualNode::COUNT_FOR_TEST * 2; + let mut vnode_builder = BitmapBuilder::zeroed(VirtualNode::COUNT_FOR_TEST); + for i in 0..VirtualNode::COUNT_FOR_TEST / 2 { if i % 2 == 0 { vnode_builder.set(i, true); } else { @@ -1947,7 +1947,7 @@ pub(crate) mod tests { direction: WatermarkDirection::Ascending, vnode_watermarks: BTreeMap::default(), }; - for i in 0..VirtualNode::COUNT { + for i in 0..VirtualNode::COUNT_FOR_TEST { if i % 2 == 0 { watermark .vnode_watermarks diff --git a/src/storage/hummock_test/src/hummock_read_version_tests.rs b/src/storage/hummock_test/src/hummock_read_version_tests.rs index e9721dd8a3197..9fae89b520bc2 100644 --- a/src/storage/hummock_test/src/hummock_read_version_tests.rs +++ b/src/storage/hummock_test/src/hummock_read_version_tests.rs @@ -50,7 +50,7 @@ async fn test_read_version_basic() { let mut epoch = test_epoch(1); let table_id = 0; - let vnodes = Arc::new(Bitmap::ones(VirtualNode::COUNT)); + let vnodes = Arc::new(Bitmap::ones(VirtualNode::COUNT_FOR_TEST)); let mut read_version = HummockReadVersion::new( TableId::from(table_id), TEST_LOCAL_INSTANCE_ID, @@ -278,7 +278,7 @@ async fn test_read_filter_basic() { let epoch = test_epoch(1); let table_id = 0; - let vnodes = Arc::new(Bitmap::ones(VirtualNode::COUNT)); + let vnodes = Arc::new(Bitmap::ones(VirtualNode::COUNT_FOR_TEST)); let read_version = Arc::new(RwLock::new(HummockReadVersion::new( TableId::from(table_id), TEST_LOCAL_INSTANCE_ID, diff --git a/src/storage/hummock_test/src/hummock_storage_tests.rs b/src/storage/hummock_test/src/hummock_storage_tests.rs index fc0fd6ae97b4f..b49ce91e9f267 100644 --- a/src/storage/hummock_test/src/hummock_storage_tests.rs +++ b/src/storage/hummock_test/src/hummock_storage_tests.rs @@ -2025,13 +2025,13 @@ async fn test_table_watermark() { let vnode1 = VirtualNode::from_index(1); let vnode_bitmap1 = Arc::new({ - let mut builder = BitmapBuilder::zeroed(VirtualNode::COUNT); + let mut builder = BitmapBuilder::zeroed(VirtualNode::COUNT_FOR_TEST); builder.set(1, true); builder.finish() }); let vnode2 = VirtualNode::from_index(2); let vnode_bitmap2 = Arc::new({ - let mut builder = BitmapBuilder::zeroed(VirtualNode::COUNT); + let mut builder = BitmapBuilder::zeroed(VirtualNode::COUNT_FOR_TEST); builder.set(2, true); builder.finish() }); diff --git a/src/storage/hummock_test/src/state_store_tests.rs b/src/storage/hummock_test/src/state_store_tests.rs index 67da2150735af..2b422543457ce 100644 --- a/src/storage/hummock_test/src/state_store_tests.rs +++ b/src/storage/hummock_test/src/state_store_tests.rs @@ -1375,7 +1375,7 @@ async fn test_replicated_local_hummock_storage() { TableOption { retention_seconds: None, }, - Arc::new(Bitmap::ones(VirtualNode::COUNT)), + Arc::new(Bitmap::ones(VirtualNode::COUNT_FOR_TEST)), )) .await; diff --git a/src/storage/hummock_trace/src/opts.rs b/src/storage/hummock_trace/src/opts.rs index 5d480cca96b58..8b9975666b40f 100644 --- a/src/storage/hummock_trace/src/opts.rs +++ b/src/storage/hummock_trace/src/opts.rs @@ -184,7 +184,7 @@ impl TracedNewLocalOptions { retention_seconds: None, }, is_replicated: false, - vnodes: TracedBitmap::from(Bitmap::ones(VirtualNode::COUNT)), + vnodes: TracedBitmap::from(Bitmap::ones(VirtualNode::COUNT_FOR_TEST)), } } } diff --git a/src/storage/src/hummock/event_handler/hummock_event_handler.rs b/src/storage/src/hummock/event_handler/hummock_event_handler.rs index f2aa2ea7fd88d..cb2d72551eda0 100644 --- a/src/storage/src/hummock/event_handler/hummock_event_handler.rs +++ b/src/storage/src/hummock/event_handler/hummock_event_handler.rs @@ -930,7 +930,7 @@ mod tests { use futures::FutureExt; use parking_lot::Mutex; - use risingwave_common::bitmap::BitmapBuilder; + use risingwave_common::bitmap::Bitmap; use risingwave_common::hash::VirtualNode; use risingwave_common::util::epoch::{test_epoch, EpochExt}; use risingwave_hummock_sdk::version::HummockVersion; @@ -1148,7 +1148,7 @@ mod tests { table_id: TEST_TABLE_ID, new_read_version_sender: tx, is_replicated: false, - vnodes: Arc::new(BitmapBuilder::filled(VirtualNode::COUNT).finish()), + vnodes: Arc::new(Bitmap::ones(VirtualNode::COUNT_FOR_TEST)), }); rx.await.unwrap() }; diff --git a/src/storage/src/hummock/event_handler/uploader/mod.rs b/src/storage/src/hummock/event_handler/uploader/mod.rs index 4494049d93b0b..a7f163c209635 100644 --- a/src/storage/src/hummock/event_handler/uploader/mod.rs +++ b/src/storage/src/hummock/event_handler/uploader/mod.rs @@ -32,7 +32,6 @@ use prometheus::core::{AtomicU64, GenericGauge}; use prometheus::{HistogramTimer, IntGauge}; use risingwave_common::bitmap::BitmapBuilder; use risingwave_common::catalog::TableId; -use risingwave_common::hash::VirtualNode; use risingwave_common::must_match; use risingwave_hummock_sdk::table_watermark::{ TableWatermarks, VnodeWatermark, WatermarkDirection, @@ -339,6 +338,14 @@ impl TableUnsyncData { table_watermarks: Vec, direction: WatermarkDirection, ) { + if table_watermarks.is_empty() { + return; + } + let vnode_count = table_watermarks[0].vnode_count(); + for watermark in &table_watermarks { + assert_eq!(vnode_count, watermark.vnode_count()); + } + fn apply_new_vnodes( vnode_bitmap: &mut BitmapBuilder, vnode_watermarks: &Vec, @@ -368,14 +375,14 @@ impl TableUnsyncData { prev_watermarks.extend(table_watermarks); } Entry::Vacant(entry) => { - let mut vnode_bitmap = BitmapBuilder::zeroed(VirtualNode::COUNT); + let mut vnode_bitmap = BitmapBuilder::zeroed(vnode_count); apply_new_vnodes(&mut vnode_bitmap, &table_watermarks); entry.insert((table_watermarks, vnode_bitmap)); } } } None => { - let mut vnode_bitmap = BitmapBuilder::zeroed(VirtualNode::COUNT); + let mut vnode_bitmap = BitmapBuilder::zeroed(vnode_count); apply_new_vnodes(&mut vnode_bitmap, &table_watermarks); self.table_watermarks = Some(( direction, diff --git a/src/storage/src/hummock/shared_buffer/shared_buffer_batch.rs b/src/storage/src/hummock/shared_buffer/shared_buffer_batch.rs index 1f8b17fb6c662..ad29c6ed88e6c 100644 --- a/src/storage/src/hummock/shared_buffer/shared_buffer_batch.rs +++ b/src/storage/src/hummock/shared_buffer/shared_buffer_batch.rs @@ -26,7 +26,6 @@ use std::sync::{Arc, LazyLock}; use bytes::Bytes; use prometheus::IntGauge; use risingwave_common::catalog::TableId; -use risingwave_common::hash::VirtualNode; use risingwave_hummock_sdk::key::{FullKey, PointRange, TableKey, TableKeyRange, UserKey}; use risingwave_hummock_sdk::EpochWithGap; @@ -532,37 +531,6 @@ impl SharedBufferBatch { } } - pub fn collect_vnodes(&self) -> Vec { - let mut vnodes = Vec::with_capacity(VirtualNode::COUNT); - let mut next_vnode_id = 0; - while next_vnode_id < VirtualNode::COUNT { - let seek_key = TableKey( - VirtualNode::from_index(next_vnode_id) - .to_be_bytes() - .to_vec(), - ); - let idx = match self - .inner - .entries - .binary_search_by(|m| (m.key.as_ref()).cmp(seek_key.as_slice())) - { - Ok(idx) => idx, - Err(idx) => idx, - }; - if idx >= self.inner.entries.len() { - break; - } - let item = &self.inner.entries[idx]; - if item.key.len() <= VirtualNode::SIZE { - break; - } - let current_vnode_id = item.key.vnode_part().to_index(); - vnodes.push(current_vnode_id); - next_vnode_id = current_vnode_id + 1; - } - vnodes - } - #[cfg(any(test, feature = "test"))] pub fn build_shared_buffer_batch_for_test( epoch: HummockEpoch, diff --git a/src/storage/src/hummock/sstable/multi_builder.rs b/src/storage/src/hummock/sstable/multi_builder.rs index 97b448faec8d7..25bdd54df720f 100644 --- a/src/storage/src/hummock/sstable/multi_builder.rs +++ b/src/storage/src/hummock/sstable/multi_builder.rs @@ -67,7 +67,9 @@ where task_progress: Option>, last_table_id: u32, - table_partition_vnode: BTreeMap, + + vnode_count: usize, + table_vnode_partition: BTreeMap, split_weight_by_vnode: u32, /// When vnode of the coming key is greater than `largest_vnode_in_current_partition`, we will /// switch SST. @@ -88,9 +90,12 @@ where builder_factory: F, compactor_metrics: Arc, task_progress: Option>, - table_partition_vnode: BTreeMap, + table_vnode_partition: BTreeMap, concurrent_uploading_sst_count: Option, ) -> Self { + // TODO(var-vnode): should use value from caller + let vnode_count = VirtualNode::COUNT; + Self { builder_factory, sst_outputs: Vec::new(), @@ -98,9 +103,10 @@ where compactor_metrics, task_progress, last_table_id: 0, - table_partition_vnode, + table_vnode_partition, + vnode_count, split_weight_by_vnode: 0, - largest_vnode_in_current_partition: VirtualNode::MAX.to_index(), + largest_vnode_in_current_partition: vnode_count - 1, concurrent_upload_join_handle: FuturesUnordered::new(), concurrent_uploading_sst_count, } @@ -114,9 +120,10 @@ where compactor_metrics: Arc::new(CompactorMetrics::unused()), task_progress: None, last_table_id: 0, - table_partition_vnode: BTreeMap::default(), + table_vnode_partition: BTreeMap::default(), + vnode_count: VirtualNode::COUNT_FOR_TEST, split_weight_by_vnode: 0, - largest_vnode_in_current_partition: VirtualNode::MAX.to_index(), + largest_vnode_in_current_partition: VirtualNode::MAX_FOR_TEST.to_index(), concurrent_upload_join_handle: FuturesUnordered::new(), concurrent_uploading_sst_count: None, } @@ -213,10 +220,10 @@ where let mut switch_builder = false; if user_key.table_id.table_id != self.last_table_id { let new_vnode_partition_count = - self.table_partition_vnode.get(&user_key.table_id.table_id); + self.table_vnode_partition.get(&user_key.table_id.table_id); if new_vnode_partition_count.is_some() - || self.table_partition_vnode.contains_key(&self.last_table_id) + || self.table_vnode_partition.contains_key(&self.last_table_id) { if new_vnode_partition_count.is_some() { self.split_weight_by_vnode = *new_vnode_partition_count.unwrap(); @@ -229,22 +236,23 @@ where switch_builder = true; if self.split_weight_by_vnode > 1 { self.largest_vnode_in_current_partition = - VirtualNode::COUNT / (self.split_weight_by_vnode as usize) - 1; + self.vnode_count / (self.split_weight_by_vnode as usize) - 1; } else { // default - self.largest_vnode_in_current_partition = VirtualNode::MAX.to_index(); + self.largest_vnode_in_current_partition = self.vnode_count - 1; } } } - if self.largest_vnode_in_current_partition != VirtualNode::MAX.to_index() { + if self.largest_vnode_in_current_partition != self.vnode_count - 1 { let key_vnode = user_key.get_vnode_id(); if key_vnode > self.largest_vnode_in_current_partition { // vnode partition change switch_builder = true; // SAFETY: `self.split_weight_by_vnode > 1` here. - let (basic, remainder) = - VirtualNode::COUNT.div_rem(&(self.split_weight_by_vnode as usize)); + let (basic, remainder) = self + .vnode_count + .div_rem(&(self.split_weight_by_vnode as usize)); let small_segments_area = basic * (self.split_weight_by_vnode as usize - remainder); self.largest_vnode_in_current_partition = (if key_vnode < small_segments_area { (key_vnode / basic + 1) * basic diff --git a/src/storage/src/store.rs b/src/storage/src/store.rs index 91f79231f6939..0e09f60c6d154 100644 --- a/src/storage/src/store.rs +++ b/src/storage/src/store.rs @@ -727,7 +727,7 @@ impl NewLocalOptions { retention_seconds: None, }, is_replicated: false, - vnodes: Arc::new(Bitmap::ones(VirtualNode::COUNT)), + vnodes: Arc::new(Bitmap::ones(VirtualNode::COUNT_FOR_TEST)), } } } diff --git a/src/stream/src/common/log_store_impl/kv_log_store/mod.rs b/src/stream/src/common/log_store_impl/kv_log_store/mod.rs index f4e62e429effa..c3dab9ca16703 100644 --- a/src/stream/src/common/log_store_impl/kv_log_store/mod.rs +++ b/src/stream/src/common/log_store_impl/kv_log_store/mod.rs @@ -997,15 +997,15 @@ mod tests { test_env.register_table(table.clone()).await; fn build_bitmap(indexes: impl Iterator) -> Arc { - let mut builder = BitmapBuilder::zeroed(VirtualNode::COUNT); + let mut builder = BitmapBuilder::zeroed(VirtualNode::COUNT_FOR_TEST); for i in indexes { builder.set(i, true); } Arc::new(builder.finish()) } - let vnodes1 = build_bitmap((0..VirtualNode::COUNT).filter(|i| i % 2 == 0)); - let vnodes2 = build_bitmap((0..VirtualNode::COUNT).filter(|i| i % 2 == 1)); + let vnodes1 = build_bitmap((0..VirtualNode::COUNT_FOR_TEST).filter(|i| i % 2 == 0)); + let vnodes2 = build_bitmap((0..VirtualNode::COUNT_FOR_TEST).filter(|i| i % 2 == 1)); let factory1 = KvLogStoreFactory::new( test_env.storage.clone(), @@ -1146,7 +1146,7 @@ mod tests { .clear_shared_buffer(test_env.manager.get_current_version().await.id) .await; - let vnodes = build_bitmap(0..VirtualNode::COUNT); + let vnodes = build_bitmap(0..VirtualNode::COUNT_FOR_TEST); let factory = KvLogStoreFactory::new( test_env.storage.clone(), table.clone(), diff --git a/src/stream/src/common/log_store_impl/kv_log_store/serde.rs b/src/stream/src/common/log_store_impl/kv_log_store/serde.rs index 17ab103d758b4..ec7cc62f2d49c 100644 --- a/src/stream/src/common/log_store_impl/kv_log_store/serde.rs +++ b/src/stream/src/common/log_store_impl/kv_log_store/serde.rs @@ -335,7 +335,11 @@ impl LogStoreRowSerde { ) -> Bytes { let (epoch, seq_id) = offset; Bytes::from(next_key(&serialize_pk( - (self.pk_info.compute_pk)(VirtualNode::MAX, Self::encode_epoch(epoch), seq_id), + (self.pk_info.compute_pk)( + VirtualNode::MAX_REPRESENTABLE, + Self::encode_epoch(epoch), + seq_id, + ), &self.pk_serde, ))) } @@ -980,7 +984,7 @@ mod tests { let serde = LogStoreRowSerde::new( &table, - Some(Arc::new(Bitmap::ones(VirtualNode::COUNT))), + Some(Arc::new(Bitmap::ones(VirtualNode::COUNT_FOR_TEST))), pk_info, ); @@ -1124,7 +1128,7 @@ mod tests { let table = gen_test_log_store_table(pk_info); let serde = LogStoreRowSerde::new( &table, - Some(Arc::new(Bitmap::ones(VirtualNode::COUNT))), + Some(Arc::new(Bitmap::ones(VirtualNode::COUNT_FOR_TEST))), pk_info, ); let (ops, rows) = gen_test_data(0); @@ -1283,7 +1287,7 @@ mod tests { let serde = LogStoreRowSerde::new( &table, - Some(Arc::new(Bitmap::ones(VirtualNode::COUNT))), + Some(Arc::new(Bitmap::ones(VirtualNode::COUNT_FOR_TEST))), pk_info, ); @@ -1428,7 +1432,7 @@ mod tests { let serde = LogStoreRowSerde::new( &table, - Some(Arc::new(Bitmap::ones(VirtualNode::COUNT))), + Some(Arc::new(Bitmap::ones(VirtualNode::COUNT_FOR_TEST))), pk_info, ); @@ -1538,7 +1542,7 @@ mod tests { let serde = LogStoreRowSerde::new( &table, - Some(Arc::new(Bitmap::ones(VirtualNode::COUNT))), + Some(Arc::new(Bitmap::ones(VirtualNode::COUNT_FOR_TEST))), pk_info, ); diff --git a/src/stream/src/common/log_store_impl/kv_log_store/writer.rs b/src/stream/src/common/log_store_impl/kv_log_store/writer.rs index 120a49e8e7ee3..10dbc7ff39de8 100644 --- a/src/stream/src/common/log_store_impl/kv_log_store/writer.rs +++ b/src/stream/src/common/log_store_impl/kv_log_store/writer.rs @@ -110,7 +110,7 @@ impl LogWriter for KvLogStoreWriter { { // When enter this branch, the chunk cannot be added directly, and should be add to // state store and flush - let mut vnode_bitmap_builder = BitmapBuilder::zeroed(VirtualNode::COUNT); + let mut vnode_bitmap_builder = BitmapBuilder::zeroed(self.serde.vnodes().len()); let mut flush_info = FlushInfo::new(); for (i, (op, row)) in chunk.rows().enumerate() { let seq_id = start_seq_id + (i as SeqIdType);