Skip to content
This repository has been archived by the owner on Aug 4, 2024. It is now read-only.

Commit

Permalink
perf: optimize transactions to repeatedly copy data when performing r…
Browse files Browse the repository at this point in the history
…epeated iterative scans
  • Loading branch information
KKould committed Jan 19, 2024
1 parent 76acf12 commit b60c110
Show file tree
Hide file tree
Showing 5 changed files with 160 additions and 110 deletions.
3 changes: 2 additions & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "kip_db"
version = "0.1.2-alpha.21"
version = "0.1.2-alpha.22"
edition = "2021"
authors = ["Kould <[email protected]>"]
description = "轻量级、异步 基于LSM Leveled Compaction K-V数据库"
Expand Down Expand Up @@ -60,6 +60,7 @@ parking_lot = "0.12.1"
crc32fast = "1.3.2"
skiplist = "0.5.1"
fslock = "0.2.1"
once_cell = "1.19.0"
# grpc
tonic = { version = "0.10.2", optional = true }
prost = { version = "0.12", optional = true }
Expand Down
76 changes: 63 additions & 13 deletions src/kernel/lsm/mem_table.rs
Original file line number Diff line number Diff line change
Expand Up @@ -355,20 +355,65 @@ impl MemTable {
option_seq: Option<i64>,
) -> Vec<KeyValue> {
let inner = self.inner.lock();
let de_dupe_merge_sort_fn = |mem: Vec<KeyValue>, immut_mem: Vec<KeyValue>| {
assert!(mem.is_sorted_by_key(|(k, _)| k));
assert!(mem.iter().all_unique());
assert!(immut_mem.is_sorted_by_key(|(k, _)| k));
assert!(immut_mem.iter().all_unique());

let mut merged = Vec::with_capacity(mem.len() + immut_mem.len());
let (mut mem_iter, mut immut_mem_iter) = (mem.into_iter(), immut_mem.into_iter());
let (mut mem_current, mut immut_mem_current) = (mem_iter.next(), immut_mem_iter.next());

while let (Some(mem_item), Some(immut_mem_item)) =
(mem_current.take(), immut_mem_current.take())
{
if mem_item.0 < immut_mem_item.0 {
merged.push(mem_item);
immut_mem_current = Some(immut_mem_item);

inner
._immut
.as_ref()
.map(|mem_map| Self::_range_scan(mem_map, min, max, option_seq))
.unwrap_or_default()
.into_iter()
.chain(Self::_range_scan(&inner._mem, min, max, option_seq))
.rev()
.unique_by(|(key, _)| key.clone())
.collect_vec()
mem_current = mem_iter.next();
if mem_current.is_none() {
break;
}
} else if mem_item.0 > immut_mem_item.0 {
merged.push(immut_mem_item);
mem_current = Some(mem_item);

immut_mem_current = immut_mem_iter.next();
if immut_mem_current.is_none() {
break;
}
} else {
merged.push(mem_item);
}
}

if let Some(kv) = mem_current {
merged.push(kv)
}
if let Some(kv) = immut_mem_current {
merged.push(kv)
}
// one of the two is empty
mem_iter
.chain(immut_mem_iter)
.for_each(|kv| merged.push(kv));

assert!(merged.is_sorted_by_key(|(k, _)| k));
assert!(merged.iter().all_unique());
merged
};
let mut mem_scan = Self::_range_scan(&inner._mem, min, max, option_seq);

if let Some(immut) = &inner._immut {
let immut_scan = Self::_range_scan(immut, min, max, option_seq);

mem_scan = de_dupe_merge_sort_fn(mem_scan, immut_scan);
}
mem_scan
}

/// Tips: 返回的数据为倒序
fn _range_scan(
mem_map: &MemMap,
min: Bound<&[u8]>,
Expand All @@ -395,15 +440,20 @@ impl MemTable {
let min_key = to_internal_key(&min, i64::MIN, i64::MAX);
let max_key = to_internal_key(&max, i64::MAX, i64::MIN);

mem_map
let mut scan = mem_map
.range(min_key.as_ref(), max_key.as_ref())
.rev()
.filter(|(InternalKey { seq_id, .. }, _)| {
option_seq.map_or(true, |current_seq| &current_seq >= seq_id)
})
.unique_by(|(internal_key, _)| &internal_key.key)
.map(|(key, value)| (key.key.clone(), value.clone()))
.collect_vec()
.collect_vec();
scan.reverse();

assert!(scan.is_sorted_by_key(|(k, _)| k));
assert!(scan.iter().all_unique());
scan
}
}

Expand Down
172 changes: 90 additions & 82 deletions src/kernel/lsm/mvcc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,27 +3,23 @@ use crate::kernel::lsm::iterator::merging_iter::MergingIter;
use crate::kernel::lsm::iterator::{Iter, Seek};
use crate::kernel::lsm::mem_table::{KeyValue, MemTable};
use crate::kernel::lsm::query_and_compaction;
use crate::kernel::lsm::storage::{Sequence, StoreInner};
use crate::kernel::lsm::storage::{KipStorage, Sequence, StoreInner};
use crate::kernel::lsm::version::iter::VersionIter;
use crate::kernel::lsm::version::Version;
use crate::kernel::KernelResult;
use crate::KernelError;
use bytes::Bytes;
use core::slice::SlicePattern;
use itertools::Itertools;
use once_cell::sync::OnceCell;
use skiplist::SkipMap;
use std::collections::Bound;
use std::iter::Map;
use std::ptr::NonNull;
use std::sync::atomic::Ordering;
use std::sync::Arc;
use tokio::sync::mpsc::error::TrySendError;
use tokio::sync::mpsc::Sender;

type MapIter<'a> = Map<
skiplist::skipmap::Iter<'a, Bytes, Option<Bytes>>,
fn((&Bytes, &Option<Bytes>)) -> KeyValue,
>;

unsafe impl Send for BufPtr {}
unsafe impl Sync for BufPtr {}

Expand All @@ -34,16 +30,33 @@ pub enum CheckType {
}

pub struct Transaction {
pub(crate) store_inner: Arc<StoreInner>,
pub(crate) compactor_tx: Sender<CompactTask>,
store_inner: Arc<StoreInner>,
compactor_tx: Sender<CompactTask>,

version: Arc<Version>,
seq_id: i64,
check_type: CheckType,

pub(crate) version: Arc<Version>,
pub(crate) write_buf: Option<SkipMap<Bytes, Option<Bytes>>>,
pub(crate) seq_id: i64,
pub(crate) check_type: CheckType,
write_buf: Option<SkipMap<Bytes, Option<Bytes>>>,
mem_buf: OnceCell<BufPtr>,
}

impl Transaction {
pub(crate) async fn new(storage: &KipStorage, check_type: CheckType) -> Self {
let _ = storage.mem_table().tx_count.fetch_add(1, Ordering::Release);

Transaction {
store_inner: Arc::clone(&storage.inner),
version: storage.current_version().await,
compactor_tx: storage.compactor_tx.clone(),

seq_id: Sequence::create(),
write_buf: None,
check_type,
mem_buf: OnceCell::new(),
}
}

fn write_buf_or_init(&mut self) -> &mut SkipMap<Bytes, Option<Bytes>> {
self.write_buf.get_or_insert_with(SkipMap::new)
}
Expand Down Expand Up @@ -115,34 +128,6 @@ impl Transaction {
Ok(())
}

#[inline]
pub fn mem_range(&self, min: Bound<&[u8]>, max: Bound<&[u8]>) -> Vec<KeyValue> {
let mem_table_range = self.mem_table().range_scan(min, max, Some(self.seq_id));

if let Some(buf_iter) = self._mem_range(min, max) {
buf_iter
.chain(mem_table_range)
.unique_by(|(key, _)| key.clone())
.sorted_by_key(|(key, _)| key.clone())
.collect_vec()
} else {
mem_table_range
}
}

fn _mem_range(&self, min: Bound<&[u8]>, max: Bound<&[u8]>) -> Option<MapIter> {
#[allow(clippy::option_map_or_none)]
self.write_buf.as_ref().map_or(None, |buf| {
Some(
buf.range(
min.map(Bytes::copy_from_slice).as_ref(),
max.map(Bytes::copy_from_slice).as_ref(),
)
.map(|(key, value)| (key.clone(), value.clone())),
)
})
}

fn mem_table(&self) -> &MemTable {
&self.store_inner.mem_table
}
Expand All @@ -158,50 +143,52 @@ impl Transaction {
min: Bound<&[u8]>,
max: Bound<&[u8]>,
) -> KernelResult<TransactionIter> {
let range_buf = self.mem_range(min, max);
let ptr = BufPtr(Box::leak(Box::new(range_buf)).into());
let option_write_buf = self.write_buf.as_ref().map(|buf| {
buf.range(
min.map(Bytes::copy_from_slice).as_ref(),
max.map(Bytes::copy_from_slice).as_ref(),
)
.map(|(key, value)| (key.clone(), value.clone()))
.collect_vec()
});

let mem_ptr = self.mem_buf.get_or_init(|| {
let kvs = self.mem_table().range_scan(min, max, Some(self.seq_id));

BufPtr(Box::leak(Box::new(kvs)).into())
});
let mut write_buf_ptr = None;
let mut vec_iter: Vec<Box<dyn Iter<'a, Item = KeyValue> + 'a + Send + Sync>> =
Vec::with_capacity(3);

if let Some(write_buf) = option_write_buf {
let buf_ptr = BufPtr(Box::leak(Box::new(write_buf)).into());
let buf_iter = unsafe {
BufIter {
inner: buf_ptr.0.as_ref(),
pos: 0,
}
};

let mem_iter = unsafe {
write_buf_ptr = Some(buf_ptr);
vec_iter.push(Box::new(buf_iter));
}
vec_iter.push(Box::new(unsafe {
BufIter {
inner: ptr.0.as_ref(),
inner: mem_ptr.0.as_ref(),
pos: 0,
}
};

let mut version_iter = VersionIter::new(&self.version)?;
let mut seek_buf = None;

match min {
Bound::Included(key) => {
let ver_seek_option = version_iter.seek(Seek::Backward(key))?;
unsafe {
let op = |disk_option: Option<&KeyValue>, mem_option: Option<&KeyValue>| match (
disk_option,
mem_option,
) {
(Some(disk), Some(mem)) => disk.0 >= mem.0,
_ => false,
};

if !op(ver_seek_option.as_ref(), ptr.0.as_ref().first()) {
seek_buf = ver_seek_option;
}
}
}
Bound::Excluded(key) => {
let _ = version_iter.seek(Seek::Backward(key))?;
}
Bound::Unbounded => (),
}
}));
vec_iter.push(Box::new(VersionIter::new(&self.version)?));

let vec_iter: Vec<Box<dyn Iter<'a, Item = KeyValue> + 'a + Send + Sync>> =
vec![Box::new(mem_iter), Box::new(version_iter)];
let inner = MergingIter::new(vec_iter)?;

Ok(TransactionIter {
inner: MergingIter::new(vec_iter)?,
inner,
min: min.map(Bytes::copy_from_slice),
max: max.map(Bytes::copy_from_slice),
ptr,
seek_buf,
write_buf_ptr,
is_seeked: false,
})
}
}
Expand All @@ -210,6 +197,10 @@ impl Drop for Transaction {
#[inline]
fn drop(&mut self) {
let _ = self.mem_table().tx_count.fetch_sub(1, Ordering::Release);

if let Some(mem_ptr) = self.mem_buf.take() {
unsafe { drop(Box::from_raw(mem_ptr.0.as_ptr())) }
}
}
}

Expand All @@ -219,18 +210,33 @@ unsafe impl Send for TransactionIter<'_> {}

pub struct TransactionIter<'a> {
inner: MergingIter<'a>,
ptr: BufPtr,
write_buf_ptr: Option<BufPtr>,
min: Bound<Bytes>,
max: Bound<Bytes>,
seek_buf: Option<KeyValue>,
is_seeked: bool,
}

impl<'a> Iter<'a> for TransactionIter<'a> {
type Item = KeyValue;

#[inline]
fn try_next(&mut self) -> KernelResult<Option<Self::Item>> {
if let Some(item) = self.seek_buf.take() {
return Ok(Some(item));
if !self.is_seeked {
self.is_seeked = true;

match &self.min {
Bound::Included(key) => return self.inner.seek(Seek::Backward(key.as_slice())),
Bound::Excluded(key) => {
if let Some(kv) = self.inner.seek(Seek::Backward(key.as_slice()))? {
if kv.0 != key {
return Ok(Some(kv));
}
} else {
return Ok(None);
}
}
Bound::Unbounded => (),
};
}

let option = match &self.max {
Expand Down Expand Up @@ -262,7 +268,9 @@ impl<'a> Iter<'a> for TransactionIter<'a> {
impl Drop for TransactionIter<'_> {
#[inline]
fn drop(&mut self) {
unsafe { drop(Box::from_raw(self.ptr.0.as_ptr())) }
if let Some(buf_prt) = &self.write_buf_ptr {
unsafe { drop(Box::from_raw(buf_prt.0.as_ptr())) }
}
}
}

Expand Down
Loading

0 comments on commit b60c110

Please sign in to comment.