diff --git a/Cargo.toml b/Cargo.toml index fe521da..7d50ded 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "kip_db" -version = "0.1.2-alpha.18" +version = "0.1.2-alpha.19" edition = "2021" authors = ["Kould "] description = "轻量级、异步 基于LSM Leveled Compaction K-V数据库" diff --git a/examples/mvcc.rs b/examples/mvcc.rs index 46b940f..3411e34 100644 --- a/examples/mvcc.rs +++ b/examples/mvcc.rs @@ -1,4 +1,5 @@ use bytes::Bytes; +use kip_db::kernel::lsm::mvcc::CheckType; use kip_db::kernel::lsm::storage::{Config, KipStorage}; use kip_db::kernel::Storage; use kip_db::KernelError; @@ -11,7 +12,7 @@ async fn main() -> Result<(), KernelError> { let kip_storage = KipStorage::open_with_config(config).await?; println!("New Transaction"); - let mut tx = kip_storage.new_transaction().await; + let mut tx = kip_storage.new_transaction(CheckType::Optimistic).await; println!("Set KeyValue after the transaction -> (key_1, value_1)"); kip_storage diff --git a/examples/scan_read.rs b/examples/scan_read.rs index da93fea..0b3f5e6 100644 --- a/examples/scan_read.rs +++ b/examples/scan_read.rs @@ -1,5 +1,6 @@ use bytes::Bytes; use kip_db::kernel::lsm::iterator::Iter; +use kip_db::kernel::lsm::mvcc::CheckType; use kip_db::kernel::lsm::storage::{Config, KipStorage}; use kip_db::kernel::Storage; use kip_db::KernelError; @@ -35,7 +36,7 @@ async fn main() -> Result<(), KernelError> { .await?; println!("New Transaction"); - let tx = kip_storage.new_transaction().await; + let tx = kip_storage.new_transaction(CheckType::Optimistic).await; println!("Iter without key_3 By Transaction:"); let mut iter = tx.iter(Bound::Unbounded, Bound::Excluded(b"key_3"))?; diff --git a/src/error.rs b/src/error.rs index 37c1f77..bd7d42a 100644 --- a/src/error.rs +++ b/src/error.rs @@ -72,7 +72,7 @@ pub enum KernelError { #[error("Process already exists")] ProcessExists, - #[error("channel is closed")] + #[error("Channel is closed")] ChannelClose, #[error("{0}")] @@ -80,6 +80,9 @@ pub enum KernelError { #[error("The number of caches cannot be divisible by the number of shards")] ShardingNotAlign, + + #[error("Same write in different transactions")] + RepeatedWrite, } #[derive(Error, Debug)] diff --git a/src/kernel/lsm/mem_table.rs b/src/kernel/lsm/mem_table.rs index a96ab3b..1ef591e 100644 --- a/src/kernel/lsm/mem_table.rs +++ b/src/kernel/lsm/mem_table.rs @@ -201,6 +201,24 @@ impl MemTable { }) } + pub(crate) fn check_key_conflict(&self, kvs: &[KeyValue], seq_id: i64) -> bool { + let inner = self.inner.lock(); + + for (key, _) in kvs { + let internal_key = InternalKey::new_with_seq(key.clone(), seq_id); + + if let Some(true) = inner + ._mem + .lower_bound(Bound::Excluded(&internal_key)) + .map(|(lower_key, _)| lower_key.key == key) + { + return true; + } + } + + false + } + /// 插入并判断是否溢出 /// /// 插入时不会去除重复键值,而是进行追加 @@ -506,6 +524,32 @@ mod tests { Ok(()) } + #[test] + fn test_mem_table_check_key_conflict() -> KernelResult<()> { + let temp_dir = TempDir::new().expect("unable to create temporary working directory"); + + let mem_table = MemTable::new(&Config::new(temp_dir.path()))?; + + let key1 = vec![b'k', b'1']; + let bytes_key1 = Bytes::copy_from_slice(&key1); + let kv_1 = (bytes_key1.clone(), Some(bytes_key1.clone())); + + let key2 = vec![b'k', b'2']; + let bytes_key2 = Bytes::copy_from_slice(&key2); + let kv_2 = (bytes_key2.clone(), Some(bytes_key2.clone())); + + let _ = mem_table.insert_data_with_seq(kv_1.clone(), 0)?; + let _ = mem_table.insert_data_with_seq(kv_1.clone(), 1)?; + let _ = mem_table.insert_data_with_seq(kv_1.clone(), 2)?; + let _ = mem_table.insert_data_with_seq(kv_2.clone(), 3)?; + + assert!(mem_table.check_key_conflict(&[kv_1.clone()], 1)); + + assert!(!mem_table.check_key_conflict(&[kv_1.clone()], 2)); + + Ok(()) + } + #[test] fn test_mem_table_range_scan() -> KernelResult<()> { let temp_dir = TempDir::new().expect("unable to create temporary working directory"); diff --git a/src/kernel/lsm/mvcc.rs b/src/kernel/lsm/mvcc.rs index 25359f4..d9c9457 100644 --- a/src/kernel/lsm/mvcc.rs +++ b/src/kernel/lsm/mvcc.rs @@ -29,6 +29,10 @@ unsafe impl Sync for BufPtr {} struct BufPtr(NonNull>); +pub enum CheckType { + Optimistic, +} + pub struct Transaction { pub(crate) store_inner: Arc, pub(crate) compactor_tx: Sender, @@ -36,6 +40,7 @@ pub struct Transaction { pub(crate) version: Arc, pub(crate) write_buf: Option>>, pub(crate) seq_id: i64, + pub(crate) check_type: CheckType, } impl Transaction { @@ -80,9 +85,18 @@ impl Transaction { #[inline] pub async fn commit(mut self) -> KernelResult<()> { if let Some(buf) = self.write_buf.take() { - let batch_data = buf - .into_iter() - .collect_vec(); + let batch_data = buf.into_iter().collect_vec(); + + match self.check_type { + CheckType::Optimistic => { + if self + .mem_table() + .check_key_conflict(&batch_data, self.seq_id) + { + return Err(KernelError::RepeatedWrite); + } + } + } let is_exceeds = self .store_inner @@ -292,90 +306,113 @@ impl<'a> Iter<'a> for BufIter<'a> { #[cfg(test)] mod tests { use crate::kernel::lsm::iterator::Iter; + use crate::kernel::lsm::mvcc::CheckType; use crate::kernel::lsm::storage::{Config, KipStorage}; use crate::kernel::{KernelResult, Storage}; + use crate::KernelError; use bincode::Options; use bytes::Bytes; use itertools::Itertools; use std::collections::Bound; use tempfile::TempDir; - #[test] - fn test_transaction() -> KernelResult<()> { + #[tokio::test] + async fn test_transaction() -> KernelResult<()> { let temp_dir = TempDir::new().expect("unable to create temporary working directory"); - tokio_test::block_on(async move { - let times = 5000; + let times = 5000; - let value = b"0"; + let value = b"0"; - let config = Config::new(temp_dir.into_path()).major_threshold_with_sst_size(4); - let kv_store = KipStorage::open_with_config(config).await?; + let config = Config::new(temp_dir.into_path()).major_threshold_with_sst_size(4); + let kv_store = KipStorage::open_with_config(config).await?; - let mut vec_kv = Vec::new(); + let mut vec_kv = Vec::new(); - for i in 0..times { - let vec_u8 = bincode::options().with_big_endian().serialize(&i)?; - vec_kv.push(( - Bytes::from(vec_u8.clone()), - Bytes::from(vec_u8.into_iter().chain(value.to_vec()).collect_vec()), - )); - } + for i in 0..times { + let vec_u8 = bincode::options().with_big_endian().serialize(&i)?; + vec_kv.push(( + Bytes::from(vec_u8.clone()), + Bytes::from(vec_u8.into_iter().chain(value.to_vec()).collect_vec()), + )); + } - // 模拟数据分布在MemTable以及SSTable中 - for kv in vec_kv.iter().take(50) { - kv_store.set(kv.0.clone(), kv.1.clone()).await?; - } + // 模拟数据分布在MemTable以及SSTable中 + for kv in vec_kv.iter().take(50) { + kv_store.set(kv.0.clone(), kv.1.clone()).await?; + } - kv_store.flush().await?; + kv_store.flush().await?; - for kv in vec_kv.iter().take(100).skip(50) { - kv_store.set(kv.0.clone(), kv.1.clone()).await?; - } + for kv in vec_kv.iter().take(100).skip(50) { + kv_store.set(kv.0.clone(), kv.1.clone()).await?; + } - let mut tx_1 = kv_store.new_transaction().await; + let mut tx_1 = kv_store.new_transaction(CheckType::Optimistic).await; - for kv in vec_kv.iter().take(times).skip(100) { - tx_1.set(kv.0.clone(), kv.1.clone()); - } + for kv in vec_kv.iter().take(times).skip(100) { + tx_1.set(kv.0.clone(), kv.1.clone()); + } - tx_1.remove(&vec_kv[times - 1].0)?; + tx_1.remove(&vec_kv[times - 1].0)?; - // 事务在提交前事务可以读取到自身以及Store已写入的数据 - for kv in vec_kv.iter().take(times - 1) { - assert_eq!(tx_1.get(&kv.0)?, Some(kv.1.clone())); - } + // 事务在提交前事务可以读取到自身以及Store已写入的数据 + for kv in vec_kv.iter().take(times - 1) { + assert_eq!(tx_1.get(&kv.0)?, Some(kv.1.clone())); + } - assert_eq!(tx_1.get(&vec_kv[times - 1].0)?, None); + assert_eq!(tx_1.get(&vec_kv[times - 1].0)?, None); - // 事务在提交前Store不应该读取到事务中的数据 - for kv in vec_kv.iter().take(times).skip(100) { - assert_eq!(kv_store.get(&kv.0).await?, None); - } + // 事务在提交前Store不应该读取到事务中的数据 + for kv in vec_kv.iter().take(times).skip(100) { + assert_eq!(kv_store.get(&kv.0).await?, None); + } - let vec_test = vec_kv[25..] - .iter() - .cloned() - .map(|(key, value)| (key, Some(value))) - .collect_vec(); + let vec_test = vec_kv[25..] + .iter() + .cloned() + .map(|(key, value)| (key, Some(value))) + .collect_vec(); - let mut iter = tx_1.iter(Bound::Included(&vec_kv[25].0), Bound::Unbounded)?; + let mut iter = tx_1.iter(Bound::Included(&vec_kv[25].0), Bound::Unbounded)?; - // -1是因为最后一个元素在之前tx中删除了,因此为None - for kv in vec_test.iter().take(vec_test.len() - 1) { - // 元素太多,因此这里就单个对比,否则会导致报错时日志过多 - assert_eq!(iter.try_next()?.unwrap(), kv.clone()); - } + // -1是因为最后一个元素在之前tx中删除了,因此为None + for kv in vec_test.iter().take(vec_test.len() - 1) { + // 元素太多,因此这里就单个对比,否则会导致报错时日志过多 + assert_eq!(iter.try_next()?.unwrap(), kv.clone()); + } - drop(iter); + drop(iter); - tx_1.commit().await?; + tx_1.commit().await?; - for kv in vec_kv.iter().take(times - 1) { - assert_eq!(kv_store.get(&kv.0).await?, Some(kv.1.clone())); - } + for kv in vec_kv.iter().take(times - 1) { + assert_eq!(kv_store.get(&kv.0).await?, Some(kv.1.clone())); + } - Ok(()) - }) + Ok(()) + } + + #[tokio::test] + async fn test_transaction_check_optimistic() -> KernelResult<()> { + let temp_dir = TempDir::new().expect("unable to create temporary working directory"); + + let config = Config::new(temp_dir.into_path()).major_threshold_with_sst_size(4); + let kv_store = KipStorage::open_with_config(config).await?; + + let mut tx_1 = kv_store.new_transaction(CheckType::Optimistic).await; + let mut tx_2 = kv_store.new_transaction(CheckType::Optimistic).await; + + tx_1.set(Bytes::from("same_key"), Bytes::new()); + tx_2.set(Bytes::from("same_key"), Bytes::new()); + + tx_1.commit().await?; + + assert!(matches!( + tx_2.commit().await, + Err(KernelError::RepeatedWrite) + )); + + Ok(()) } } diff --git a/src/kernel/lsm/storage.rs b/src/kernel/lsm/storage.rs index eafe89c..31adc5a 100644 --- a/src/kernel/lsm/storage.rs +++ b/src/kernel/lsm/storage.rs @@ -1,7 +1,7 @@ use crate::kernel::io::IoType; use crate::kernel::lsm::compactor::{CompactTask, Compactor}; use crate::kernel::lsm::mem_table::{KeyValue, MemTable}; -use crate::kernel::lsm::mvcc::Transaction; +use crate::kernel::lsm::mvcc::{CheckType, Transaction}; use crate::kernel::lsm::table::scope::Scope; use crate::kernel::lsm::table::ss_table::block; use crate::kernel::lsm::table::TableType; @@ -243,7 +243,7 @@ impl KipStorage { /// 创建事务 #[inline] - pub async fn new_transaction(&self) -> Transaction { + pub async fn new_transaction(&self, check_type: CheckType) -> Transaction { let _ = self.mem_table().tx_count.fetch_add(1, Ordering::Release); Transaction { @@ -253,6 +253,7 @@ impl KipStorage { seq_id: Sequence::create(), write_buf: None, + check_type, } } diff --git a/src/kernel/lsm/table/btree_table/iter.rs b/src/kernel/lsm/table/btree_table/iter.rs index 4b1190b..a71e0d8 100644 --- a/src/kernel/lsm/table/btree_table/iter.rs +++ b/src/kernel/lsm/table/btree_table/iter.rs @@ -103,10 +103,7 @@ mod tests { assert_eq!(iter.seek(Seek::First)?, Some(vec[0].clone())); - assert_eq!( - iter.seek(Seek::Backward(&[b'3']))?, - Some(vec[2].clone()) - ); + assert_eq!(iter.seek(Seek::Backward(&[b'3']))?, Some(vec[2].clone())); assert_eq!(iter.seek(Seek::Last)?, Some(vec[5].clone()));