Skip to content

Commit

Permalink
test: add test case mutable::range & `merge_stream::merge_mutable_r…
Browse files Browse the repository at this point in the history
…emove_duplicates`

(cherry picked from commit 2601443)
  • Loading branch information
KKould committed Jul 16, 2024
1 parent 19905c6 commit 7fef2be
Show file tree
Hide file tree
Showing 2 changed files with 49 additions and 42 deletions.
57 changes: 15 additions & 42 deletions src/inmem/mutable.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ use crate::{
record::{KeyRef, Record},
};

pub(crate) type MutableScanInner<'scan, R> = Range<
pub(crate) type MutableScan<'scan, R> = Range<
'scan,
TimestampedRef<<R as Record>::Key>,
(
Expand All @@ -32,15 +32,6 @@ where
data: SkipMap<Timestamped<R::Key>, Option<R>>,
}

pub(crate) struct MutableScan<'scan, R>
where
R: Record,
{
inner: MutableScanInner<'scan, R>,
item_buf: Option<Entry<'scan, Timestamped<R::Key>, Option<R>>>,
ts: Timestamp,
}

impl<R> Default for Mutable<R>
where
R: Record,
Expand Down Expand Up @@ -108,13 +99,7 @@ where
let lower = range.0.map(|key| TimestampedRef::new(key, ts));
let upper = range.1.map(|key| TimestampedRef::new(key, EPOCH));

let mut scan = MutableScan {
inner: self.data.range((lower, upper)),
item_buf: None,
ts,
};
let _ = scan.next();
scan
self.data.range((lower, upper))
}
}

Expand All @@ -127,33 +112,9 @@ where
}
}

impl<'scan, R> Iterator for MutableScan<'scan, R>
where
R: Record,
{
type Item = Entry<'scan, Timestamped<R::Key>, Option<R>>;

fn next(&mut self) -> Option<Self::Item> {
for entry in self.inner.by_ref() {
let key = entry.key();
if key.ts <= self.ts
&& matches!(
self.item_buf
.as_ref()
.map(|entry| entry.key().value() != key.value()),
Some(true) | None
)
{
return self.item_buf.replace(entry);
}
}
self.item_buf.take()
}
}

#[cfg(test)]
mod tests {
use std::ops::Bound;
use std::collections::Bound;

use super::Mutable;
use crate::{
Expand Down Expand Up @@ -213,10 +174,18 @@ mod tests {
scan.next().unwrap().key(),
&Timestamped::new("1".into(), 0_u32.into())
);
assert_eq!(
scan.next().unwrap().key(),
&Timestamped::new("2".into(), 1_u32.into())
);
assert_eq!(
scan.next().unwrap().key(),
&Timestamped::new("2".into(), 0_u32.into())
);
assert_eq!(
scan.next().unwrap().key(),
&Timestamped::new("3".into(), 1_u32.into())
);
assert_eq!(
scan.next().unwrap().key(),
&Timestamped::new("4".into(), 0_u32.into())
Expand All @@ -237,6 +206,10 @@ mod tests {
scan.next().unwrap().key(),
&Timestamped::new("2".into(), 1_u32.into())
);
assert_eq!(
scan.next().unwrap().key(),
&Timestamped::new("2".into(), 0_u32.into())
);
assert_eq!(
scan.next().unwrap().key(),
&Timestamped::new("3".into(), 1_u32.into())
Expand Down
34 changes: 34 additions & 0 deletions src/stream/merge.rs
Original file line number Diff line number Diff line change
Expand Up @@ -173,4 +173,38 @@ mod tests {
dbg!(merge.next().await);
dbg!(merge.next().await);
}

#[tokio::test]
async fn merge_mutable_remove_duplicates() {
let m1 = Mutable::<String>::new();
m1.insert("1".into(), 0_u32.into());
m1.insert("2".into(), 0_u32.into());
m1.insert("2".into(), 1_u32.into());
m1.insert("3".into(), 1_u32.into());
m1.insert("4".into(), 0_u32.into());

let lower = "1".to_string();
let upper = "4".to_string();
let bound = (Bound::Included(&lower), Bound::Included(&upper));
let mut merge =
MergeStream::<String, TokioExecutor>::from_vec(vec![m1.scan(bound, 0.into()).into()])
.await
.unwrap();

dbg!(merge.next().await);
dbg!(merge.next().await);
dbg!(merge.next().await);

let lower = "1".to_string();
let upper = "4".to_string();
let bound = (Bound::Included(&lower), Bound::Included(&upper));
let mut merge =
MergeStream::<String, TokioExecutor>::from_vec(vec![m1.scan(bound, 1.into()).into()])
.await
.unwrap();

dbg!(merge.next().await);
dbg!(merge.next().await);
dbg!(merge.next().await);
}
}

0 comments on commit 7fef2be

Please sign in to comment.