Skip to content

Commit

Permalink
fix vnode in key
Browse files Browse the repository at this point in the history
Signed-off-by: Bugen Zhao <[email protected]>
  • Loading branch information
BugenZhao committed Sep 10, 2024
1 parent 952962c commit 18e0165
Show file tree
Hide file tree
Showing 19 changed files with 167 additions and 129 deletions.
4 changes: 4 additions & 0 deletions src/common/src/hash/consistent_hash/vnode.rs
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,10 @@ impl VirtualNode {
impl VirtualNode {
/// The maximum count of virtual nodes that fits in [`VirtualNodeInner`].
pub const MAX_COUNT: usize = 1 << VirtualNodeInner::BITS;
/// The largest virtual node that can be represented, i.e. the one at index `MAX_COUNT - 1`.
///
/// Note that this is **NOT** necessarily the largest virtual node actually in use, which
/// depends on the configuration.
pub const MAX_REPRESENTABLE: VirtualNode = VirtualNode::from_index(Self::MAX_COUNT - 1);
/// The size of a virtual node in bytes, in memory or serialized representation.
pub const SIZE: usize = std::mem::size_of::<Self>();
}
Expand Down
4 changes: 2 additions & 2 deletions src/meta/src/manager/sink_coordination/coordinator_worker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -160,8 +160,8 @@ impl CoordinatorWorker {
}

async fn wait_for_writers(&mut self, first_vnode_bitmap: Bitmap) -> anyhow::Result<()> {
let mut remaining_count = VirtualNode::COUNT;
let mut registered_vnode = HashSet::with_capacity(VirtualNode::COUNT);
let mut remaining_count = first_vnode_bitmap.len();
let mut registered_vnode = HashSet::with_capacity(remaining_count);

for vnode in first_vnode_bitmap.iter_vnodes() {
remaining_count -= 1;
Expand Down
24 changes: 12 additions & 12 deletions src/meta/src/manager/sink_coordination/manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -414,11 +414,11 @@ mod tests {
let epoch1 = 233;
let epoch2 = 234;

let mut all_vnode = (0..VirtualNode::COUNT).collect_vec();
let mut all_vnode = (0..VirtualNode::COUNT_FOR_TEST).collect_vec();
all_vnode.shuffle(&mut rand::thread_rng());
let (first, second) = all_vnode.split_at(VirtualNode::COUNT / 2);
let (first, second) = all_vnode.split_at(VirtualNode::COUNT_FOR_TEST / 2);
let build_bitmap = |indexes: &[usize]| {
let mut builder = BitmapBuilder::zeroed(VirtualNode::COUNT);
let mut builder = BitmapBuilder::zeroed(VirtualNode::COUNT_FOR_TEST);
for i in indexes {
builder.set(*i, true);
}
Expand Down Expand Up @@ -584,9 +584,9 @@ mod tests {
let epoch1 = 233;
let epoch2 = 234;

let all_vnode = (0..VirtualNode::COUNT).collect_vec();
let all_vnode = (0..VirtualNode::COUNT_FOR_TEST).collect_vec();
let build_bitmap = |indexes: &[usize]| {
let mut builder = BitmapBuilder::zeroed(VirtualNode::COUNT);
let mut builder = BitmapBuilder::zeroed(VirtualNode::COUNT_FOR_TEST);
for i in indexes {
builder.set(*i, true);
}
Expand Down Expand Up @@ -705,7 +705,7 @@ mod tests {

let mut build_client_future1 = pin!(CoordinatorStreamHandle::new_with_init_stream(
param.to_proto(),
Bitmap::zeros(VirtualNode::COUNT),
Bitmap::zeros(VirtualNode::COUNT_FOR_TEST),
|rx| async {
Ok(tonic::Response::new(
manager
Expand Down Expand Up @@ -742,11 +742,11 @@ mod tests {

let epoch = 233;

let mut all_vnode = (0..VirtualNode::COUNT).collect_vec();
let mut all_vnode = (0..VirtualNode::COUNT_FOR_TEST).collect_vec();
all_vnode.shuffle(&mut rand::thread_rng());
let (first, second) = all_vnode.split_at(VirtualNode::COUNT / 2);
let (first, second) = all_vnode.split_at(VirtualNode::COUNT_FOR_TEST / 2);
let build_bitmap = |indexes: &[usize]| {
let mut builder = BitmapBuilder::zeroed(VirtualNode::COUNT);
let mut builder = BitmapBuilder::zeroed(VirtualNode::COUNT_FOR_TEST);
for i in indexes {
builder.set(*i, true);
}
Expand Down Expand Up @@ -821,11 +821,11 @@ mod tests {

let epoch = 233;

let mut all_vnode = (0..VirtualNode::COUNT).collect_vec();
let mut all_vnode = (0..VirtualNode::COUNT_FOR_TEST).collect_vec();
all_vnode.shuffle(&mut rand::thread_rng());
let (first, second) = all_vnode.split_at(VirtualNode::COUNT / 2);
let (first, second) = all_vnode.split_at(VirtualNode::COUNT_FOR_TEST / 2);
let build_bitmap = |indexes: &[usize]| {
let mut builder = BitmapBuilder::zeroed(VirtualNode::COUNT);
let mut builder = BitmapBuilder::zeroed(VirtualNode::COUNT_FOR_TEST);
for i in indexes {
builder.set(*i, true);
}
Expand Down
10 changes: 5 additions & 5 deletions src/storage/benches/bench_table_watermarks.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,18 +37,18 @@ use tokio::sync::mpsc::unbounded_channel;
fn vnode_bitmaps(part_count: usize) -> impl Iterator<Item = Arc<Bitmap>> {
static BITMAP_CACHE: LazyLock<Mutex<HashMap<usize, Vec<Arc<Bitmap>>>>> =
LazyLock::new(|| Mutex::new(HashMap::new()));
assert_eq!(VirtualNode::COUNT % part_count, 0);
assert_eq!(VirtualNode::COUNT_FOR_TEST % part_count, 0);
let mut cache = BITMAP_CACHE.lock();
match cache.entry(part_count) {
Entry::Occupied(entry) => entry.get().clone().into_iter(),
Entry::Vacant(entry) => entry
.insert({
let part_size = VirtualNode::COUNT / part_count;
let part_size = VirtualNode::COUNT_FOR_TEST / part_count;
(0..part_count)
.map(move |part_idx| {
let start = part_idx * part_size;
let end = part_idx * part_size + part_size;
let mut bitmap = BitmapBuilder::zeroed(VirtualNode::COUNT);
let mut bitmap = BitmapBuilder::zeroed(VirtualNode::COUNT_FOR_TEST);
for i in start..end {
bitmap.set(i, true);
}
Expand Down Expand Up @@ -253,15 +253,15 @@ fn bench_table_watermarks(c: &mut Criterion) {

c.bench_function("read latest watermark", |b| {
b.iter(|| {
for i in 0..VirtualNode::COUNT {
for i in 0..VirtualNode::COUNT_FOR_TEST {
let _ = table_watermarks.latest_watermark(VirtualNode::from_index(i));
}
})
});

c.bench_function("read committed watermark", |b| {
b.iter(|| {
for i in 0..VirtualNode::COUNT {
for i in 0..VirtualNode::COUNT_FOR_TEST {
let _ = table_watermarks.read_watermark(
VirtualNode::from_index(i),
test_epoch(committed_epoch_idx as u64),
Expand Down
35 changes: 29 additions & 6 deletions src/storage/hummock_sdk/src/key.rs
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,15 @@ pub fn is_empty_key_range(key_range: &TableKeyRange) -> bool {
}
}

// returning left inclusive and right exclusive
/// Returns left inclusive and right exclusive vnode index of the given range.
///
/// # Vnode count unawareness
///
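/// Note that this function is not aware of the vnode count that is actually used in this table.
/// For example, if the total vnode count is 256, `Unbounded` can be a correct end bound for vnode 255,
/// but this function will still treat `Unbounded` as `VirtualNode::MAX_REPRESENTABLE.to_index() + 1`
/// rather than 256, so only `Excluded(256)` yields an exclusive right index of 256.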
///
/// See also [`vnode`] and [`end_bound_of_vnode`], which hold the same invariant.
pub fn vnode_range(range: &TableKeyRange) -> (usize, usize) {
let (left, right) = range;
let left = match left {
Expand All @@ -74,12 +82,20 @@ pub fn vnode_range(range: &TableKeyRange) -> (usize, usize) {
vnode.to_index() + 1
}
}
Unbounded => VirtualNode::COUNT,
Unbounded => VirtualNode::MAX_REPRESENTABLE.to_index() + 1,
};
(left, right)
}

// Ensure there is only one vnode involved in table key range and return the vnode
/// Ensure there is only one vnode involved in table key range and return the vnode.
///
/// # Vnode count unawareness
///
/// Note that this function is not aware of the vnode count that is actually used in this table.
/// For example, if the total vnode count is 256, `Unbounded` can be a correct end bound for vnode 255,
/// but this function will still require `Excluded(256)`.
///
/// See also [`vnode_range`] and [`end_bound_of_vnode`], which hold the same invariant.
pub fn vnode(range: &TableKeyRange) -> VirtualNode {
let (l, r_exclusive) = vnode_range(range);
assert_eq!(r_exclusive - l, 1);
Expand Down Expand Up @@ -320,8 +336,15 @@ pub fn prev_full_key(full_key: &[u8]) -> Vec<u8> {
}
}

/// [`Unbounded`] if the vnode is the maximum representable value (i.e. [`VirtualNode::MAX_REPRESENTABLE`]),
/// otherwise [`Excluded`] the next vnode.
///
/// Note that this function is not aware of the vnode count that is actually used in this table.
/// For example, if the total vnode count is 256, `Unbounded` can be a correct end bound for vnode 255,
/// but this function will still return `Excluded(256)`. See also [`vnode`] and [`vnode_range`], which
/// rely on this invariant.
pub fn end_bound_of_vnode(vnode: VirtualNode) -> Bound<Bytes> {
if vnode == VirtualNode::MAX {
if vnode == VirtualNode::MAX_REPRESENTABLE {
Unbounded
} else {
let end_bound_index = vnode.to_index() + 1;
Expand Down Expand Up @@ -1299,7 +1322,7 @@ mod tests {
Excluded(TableKey(concat(234, b"")))
)
);
let max_vnode = VirtualNode::COUNT - 1;
let max_vnode = VirtualNode::MAX_REPRESENTABLE.to_index();
assert_eq!(
prefixed_range_with_vnode(
(Bound::<Bytes>::Unbounded, Bound::<Bytes>::Unbounded),
Expand Down Expand Up @@ -1332,7 +1355,7 @@ mod tests {
Excluded(b"1".as_slice()),
Unbounded,
];
for vnode in 0..VirtualNode::COUNT {
for vnode in 0..VirtualNode::MAX_COUNT {
for left in &left_bound {
for right in &right_bound {
assert_eq!(
Expand Down
Loading

0 comments on commit 18e0165

Please sign in to comment.