From 7fef2be18d8142914acdf25c300dabf7b76eb400 Mon Sep 17 00:00:00 2001 From: Kould Date: Tue, 16 Jul 2024 18:17:32 +0800 Subject: [PATCH] test: add test case `mutable::range` & `merge_stream::merge_mutable_remove_duplicates` (cherry picked from commit 26014432cd9939352122dff3da0f01a43630e2e0) --- src/inmem/mutable.rs | 57 ++++++++++++-------------------------------- src/stream/merge.rs | 34 ++++++++++++++++++++++++++ 2 files changed, 49 insertions(+), 42 deletions(-) diff --git a/src/inmem/mutable.rs b/src/inmem/mutable.rs index 3e6199f..9e19ea1 100644 --- a/src/inmem/mutable.rs +++ b/src/inmem/mutable.rs @@ -13,7 +13,7 @@ use crate::{ record::{KeyRef, Record}, }; -pub(crate) type MutableScanInner<'scan, R> = Range< +pub(crate) type MutableScan<'scan, R> = Range< 'scan, TimestampedRef<::Key>, ( @@ -32,15 +32,6 @@ where data: SkipMap, Option>, } -pub(crate) struct MutableScan<'scan, R> -where - R: Record, -{ - inner: MutableScanInner<'scan, R>, - item_buf: Option, Option>>, - ts: Timestamp, -} - impl Default for Mutable where R: Record, @@ -108,13 +99,7 @@ where let lower = range.0.map(|key| TimestampedRef::new(key, ts)); let upper = range.1.map(|key| TimestampedRef::new(key, EPOCH)); - let mut scan = MutableScan { - inner: self.data.range((lower, upper)), - item_buf: None, - ts, - }; - let _ = scan.next(); - scan + self.data.range((lower, upper)) } } @@ -127,33 +112,9 @@ where } } -impl<'scan, R> Iterator for MutableScan<'scan, R> -where - R: Record, -{ - type Item = Entry<'scan, Timestamped, Option>; - - fn next(&mut self) -> Option { - for entry in self.inner.by_ref() { - let key = entry.key(); - if key.ts <= self.ts - && matches!( - self.item_buf - .as_ref() - .map(|entry| entry.key().value() != key.value()), - Some(true) | None - ) - { - return self.item_buf.replace(entry); - } - } - self.item_buf.take() - } -} - #[cfg(test)] mod tests { - use std::ops::Bound; + use std::collections::Bound; use super::Mutable; use crate::{ @@ -213,10 +174,18 @@ mod tests { scan.next().unwrap().key(), &Timestamped::new("1".into(), 0_u32.into()) ); + assert_eq!( + scan.next().unwrap().key(), + &Timestamped::new("2".into(), 1_u32.into()) + ); assert_eq!( scan.next().unwrap().key(), &Timestamped::new("2".into(), 0_u32.into()) ); + assert_eq!( + scan.next().unwrap().key(), + &Timestamped::new("3".into(), 1_u32.into()) + ); assert_eq!( scan.next().unwrap().key(), &Timestamped::new("4".into(), 0_u32.into()) @@ -237,6 +206,10 @@ mod tests { scan.next().unwrap().key(), &Timestamped::new("2".into(), 1_u32.into()) ); + assert_eq!( + scan.next().unwrap().key(), + &Timestamped::new("2".into(), 0_u32.into()) + ); assert_eq!( scan.next().unwrap().key(), &Timestamped::new("3".into(), 1_u32.into()) diff --git a/src/stream/merge.rs b/src/stream/merge.rs index 227cf50..2734572 100644 --- a/src/stream/merge.rs +++ b/src/stream/merge.rs @@ -173,4 +173,38 @@ mod tests { dbg!(merge.next().await); dbg!(merge.next().await); } + + #[tokio::test] + async fn merge_mutable_remove_duplicates() { + let m1 = Mutable::::new(); + m1.insert("1".into(), 0_u32.into()); + m1.insert("2".into(), 0_u32.into()); + m1.insert("2".into(), 1_u32.into()); + m1.insert("3".into(), 1_u32.into()); + m1.insert("4".into(), 0_u32.into()); + + let lower = "1".to_string(); + let upper = "4".to_string(); + let bound = (Bound::Included(&lower), Bound::Included(&upper)); + let mut merge = + MergeStream::::from_vec(vec![m1.scan(bound, 0.into()).into()]) + .await + .unwrap(); + + dbg!(merge.next().await); + dbg!(merge.next().await); + dbg!(merge.next().await); + + let lower = "1".to_string(); + let upper = "4".to_string(); + let bound = (Bound::Included(&lower), Bound::Included(&upper)); + let mut merge = + MergeStream::::from_vec(vec![m1.scan(bound, 1.into()).into()]) + .await + .unwrap(); + + dbg!(merge.next().await); + dbg!(merge.next().await); + dbg!(merge.next().await); + } }