Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Runtime schema declaration #165

Open
wants to merge 21 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 17 additions & 0 deletions .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,23 @@ jobs:
command: fmt
args: -- --check

exmaples:
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

name: Rust exmaples
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v4
- name: Run datafusion example
uses: actions-rs/cargo@v1
with:
command: run
args: --example datafusion --features=datafusion

- name: Run declare example
uses: actions-rs/cargo@v1
with:
command: run
args: --example declare --all-features

benchmark:
name: Rust benchmark
runs-on: self-hosted
Expand Down
9 changes: 6 additions & 3 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -49,21 +49,23 @@ path = "benches/criterion/writes.rs"
required-features = ["sled"]

[dependencies]
arrow = "52"
arrow = "53"
async-lock = "3"
async-stream = "0.3"
async-trait = { version = "0.1", optional = true }
bytes = { version = "1.7", optional = true }
crc32fast = "1"
crossbeam-skiplist = "0.1"
datafusion = { version = "41", optional = true }
datafusion = { version = "42", optional = true }
flume = { version = "0.11", features = ["async"] }
fusio = { git = "https://github.com/tonbo-io/fusio.git", package = "fusio", rev = "317b1b0621b297f52145b41b90506632f2dc7a1d", features = ["tokio", "dyn"] }
fusio-parquet = { git = "https://github.com/tonbo-io/fusio.git", package = "fusio-parquet", rev = "317b1b0621b297f52145b41b90506632f2dc7a1d" }
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Currently the latest is: ab6a8073b966f9c7d7ca61c32646b7d45dbc4f31

futures-core = "0.3"
futures-io = "0.3"
futures-util = "0.3"
lockable = "0.0.8"
once_cell = "1"
parquet = { version = "52", features = ["async"] }
parquet = { version = "53", features = ["async"] }
pin-project-lite = "0.2"
regex = "1"
thiserror = "1"
Expand All @@ -74,6 +76,7 @@ tracing = "0.1"
ulid = "1"

# Only used for benchmarks
log = "0.4.22"
redb = { version = "2", optional = true }
rocksdb = { version = "0.22", optional = true }
sled = { version = "0.34", optional = true }
Expand Down
78 changes: 55 additions & 23 deletions benches/common.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,16 +5,19 @@ use std::{
fs::File,
io::{BufRead, BufReader},
path::{Path, PathBuf},
sync::Arc,
};

use async_stream::stream;
use fusio::local::TokioFs;
use futures_core::Stream;
use futures_util::StreamExt;
use parquet::data_type::AsBytes;
use redb::TableDefinition;
use rocksdb::{Direction, IteratorMode, TransactionDB};
use tonbo::{
executor::tokio::TokioExecutor, stream, transaction::TransactionEntry, DbOption, Projection,
executor::tokio::TokioExecutor, fs::manager::StoreManager, stream,
transaction::TransactionEntry, DbOption, Projection,
};
use tonbo_macros::Record;

Expand Down Expand Up @@ -196,8 +199,14 @@ impl TonboBenchDataBase {
}

impl BenchDatabase for TonboBenchDataBase {
type W<'db> = TonboBenchWriteTransaction<'db> where Self: 'db;
type R<'db> = TonboBenchReadTransaction<'db> where Self: 'db;
type W<'db>
= TonboBenchWriteTransaction<'db>
where
Self: 'db;
type R<'db>
= TonboBenchReadTransaction<'db>
where
Self: 'db;

fn db_type_name() -> &'static str {
"tonbo"
Expand All @@ -216,27 +225,35 @@ impl BenchDatabase for TonboBenchDataBase {
}

async fn build(path: impl AsRef<Path>) -> Self {
let option = DbOption::from(path.as_ref()).disable_wal();
let manager = StoreManager::new(Arc::new(TokioFs), vec![]);
let option =
DbOption::from(fusio::path::Path::from_filesystem_path(path.as_ref()).unwrap())
.disable_wal();

let db = tonbo::DB::new(option, TokioExecutor::new()).await.unwrap();
let db = tonbo::DB::new(option, TokioExecutor::new(), manager)
.await
.unwrap();
TonboBenchDataBase::new(db)
}
}

pub struct TonboBenchReadTransaction<'a> {
txn: tonbo::transaction::Transaction<'a, Customer, TokioExecutor>,
txn: tonbo::transaction::Transaction<'a, Customer>,
}

impl<'db> BenchReadTransaction for TonboBenchReadTransaction<'db> {
type T<'txn> = TonboBenchReader<'db, 'txn> where Self: 'txn;
type T<'txn>
= TonboBenchReader<'db, 'txn>
where
Self: 'txn;

fn get_reader(&self) -> Self::T<'_> {
TonboBenchReader { txn: &self.txn }
}
}

pub struct TonboBenchReader<'db, 'txn> {
txn: &'txn tonbo::transaction::Transaction<'db, Customer, TokioExecutor>,
txn: &'txn tonbo::transaction::Transaction<'db, Customer>,
}

impl BenchReader for TonboBenchReader<'_, '_> {
Expand Down Expand Up @@ -276,11 +293,14 @@ impl BenchReader for TonboBenchReader<'_, '_> {
}

pub struct TonboBenchWriteTransaction<'a> {
txn: tonbo::transaction::Transaction<'a, Customer, TokioExecutor>,
txn: tonbo::transaction::Transaction<'a, Customer>,
}

impl<'db> BenchWriteTransaction for TonboBenchWriteTransaction<'db> {
type W<'txn> = TonboBenchInserter<'db, 'txn> where Self: 'txn;
type W<'txn>
= TonboBenchInserter<'db, 'txn>
where
Self: 'txn;

fn get_inserter(&mut self) -> Self::W<'_> {
TonboBenchInserter { txn: &mut self.txn }
Expand All @@ -293,7 +313,7 @@ impl<'db> BenchWriteTransaction for TonboBenchWriteTransaction<'db> {
}

pub struct TonboBenchInserter<'db, 'txn> {
txn: &'txn mut tonbo::transaction::Transaction<'db, Customer, TokioExecutor>,
txn: &'txn mut tonbo::transaction::Transaction<'db, Customer>,
}

impl BenchInserter for TonboBenchInserter<'_, '_> {
Expand All @@ -320,8 +340,14 @@ impl RedbBenchDatabase {
}

impl BenchDatabase for RedbBenchDatabase {
type W<'db> = RedbBenchWriteTransaction where Self: 'db;
type R<'db> = RedbBenchReadTransaction where Self: 'db;
type W<'db>
= RedbBenchWriteTransaction
where
Self: 'db;
type R<'db>
= RedbBenchReadTransaction
where
Self: 'db;

fn db_type_name() -> &'static str {
"redb"
Expand Down Expand Up @@ -351,7 +377,10 @@ pub struct RedbBenchReadTransaction {
}

impl BenchReadTransaction for RedbBenchReadTransaction {
type T<'txn> = RedbBenchReader where Self: 'txn;
type T<'txn>
= RedbBenchReader
where
Self: 'txn;

fn get_reader(&self) -> Self::T<'_> {
let table = self.txn.open_table(X).unwrap();
Expand Down Expand Up @@ -416,7 +445,10 @@ pub struct RedbBenchWriteTransaction {
}

impl BenchWriteTransaction for RedbBenchWriteTransaction {
type W<'txn> = RedbBenchInserter<'txn> where Self: 'txn;
type W<'txn>
= RedbBenchInserter<'txn>
where
Self: 'txn;

fn get_inserter(&mut self) -> Self::W<'_> {
let table = self.txn.open_table(X).unwrap();
Expand Down Expand Up @@ -464,11 +496,11 @@ impl SledBenchDatabase {

impl BenchDatabase for SledBenchDatabase {
type W<'db>
= SledBenchWriteTransaction<'db>
= SledBenchWriteTransaction<'db>
where
Self: 'db;
type R<'db>
= SledBenchReadTransaction<'db>
= SledBenchReadTransaction<'db>
where
Self: 'db;

Expand Down Expand Up @@ -500,7 +532,7 @@ pub struct SledBenchReadTransaction<'db> {

impl BenchReadTransaction for SledBenchReadTransaction<'_> {
type T<'txn>
= SledBenchReader<'txn>
= SledBenchReader<'txn>
where
Self: 'txn;

Expand Down Expand Up @@ -568,7 +600,7 @@ pub struct SledBenchWriteTransaction<'a> {

impl BenchWriteTransaction for SledBenchWriteTransaction<'_> {
type W<'txn>
= SledBenchInserter<'txn>
= SledBenchInserter<'txn>
where
Self: 'txn;

Expand Down Expand Up @@ -624,11 +656,11 @@ impl RocksdbBenchDatabase {

impl BenchDatabase for RocksdbBenchDatabase {
type W<'db>
= RocksdbBenchWriteTransaction<'db>
= RocksdbBenchWriteTransaction<'db>
where
Self: 'db;
type R<'db>
= RocksdbBenchReadTransaction<'db>
= RocksdbBenchReadTransaction<'db>
where
Self: 'db;

Expand Down Expand Up @@ -667,7 +699,7 @@ pub struct RocksdbBenchWriteTransaction<'a> {

impl<'a> BenchWriteTransaction for RocksdbBenchWriteTransaction<'a> {
type W<'txn>
= RocksdbBenchInserter<'txn>
= RocksdbBenchInserter<'txn>
where
Self: 'txn;

Expand Down Expand Up @@ -706,7 +738,7 @@ pub struct RocksdbBenchReadTransaction<'db> {

impl<'db> BenchReadTransaction for RocksdbBenchReadTransaction<'db> {
type T<'txn>
= RocksdbBenchReader<'db, 'txn>
= RocksdbBenchReader<'db, 'txn>
where
Self: 'txn;

Expand Down
15 changes: 12 additions & 3 deletions benches/criterion/writes.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,9 @@
use std::{iter::repeat_with, sync::Arc};

use criterion::{criterion_group, criterion_main, BenchmarkId, Criterion};
use fusio::local::TokioFs;
use mimalloc::MiMalloc;
use tonbo::{executor::tokio::TokioExecutor, DbOption, Record, DB};
use tonbo::{executor::tokio::TokioExecutor, fs::manager::StoreManager, DbOption, Record, DB};

#[global_allocator]
static GLOBAL: MiMalloc = MiMalloc;
Expand Down Expand Up @@ -55,10 +56,14 @@ fn single_write(c: &mut Criterion) {
let batches = [1, 16, 128];

let _ = std::fs::remove_dir_all("/tmp/tonbo");
let _ = std::fs::create_dir_all("/tmp/tonbo");

for batch in batches {
let option = DbOption::from("/tmp/tonbo").disable_wal();
let manager = StoreManager::new(Arc::new(TokioFs), vec![]);
let option = DbOption::from(fusio::path::Path::from_filesystem_path("/tmp/tonbo").unwrap())
.disable_wal();
let db = runtime
.block_on(async { DB::new(option, TokioExecutor::default()).await })
.block_on(async { DB::new(option, TokioExecutor::default(), manager).await })
.unwrap();

group.bench_with_input(BenchmarkId::new("Tonbo", batch), &batch, |b, batch| {
Expand All @@ -67,9 +72,12 @@ fn single_write(c: &mut Criterion) {
.iter(|| async { tonbo_write(&db, *batch).await });
});
let _ = std::fs::remove_dir_all("/tmp/tonbo");
let _ = std::fs::create_dir_all("/tmp/tonbo");
}

let _ = std::fs::remove_dir_all("/tmp/sled");
let _ = std::fs::create_dir_all("/tmp/sled");

for batch in batches {
let sled = sled::open("/tmp/sled").unwrap();
group.bench_with_input(BenchmarkId::new("Sled", batch), &batch, |b, batch| {
Expand All @@ -78,6 +86,7 @@ fn single_write(c: &mut Criterion) {
.iter(|| async { sled_write(&sled, *batch).await });
});
let _ = std::fs::remove_dir_all("/tmp/sled");
let _ = std::fs::create_dir_all("/tmp/sled");
}

group.finish();
Expand Down
6 changes: 2 additions & 4 deletions benches/read_bench.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,15 +2,13 @@ mod common;

use std::{
collections::Bound,
env::current_dir,
path::{Path, PathBuf},
sync::Arc,
time::{Duration, Instant},
};

use futures_util::{future::join_all, StreamExt};
use tokio::io::AsyncWriteExt;
use tonbo::{executor::tokio::TokioExecutor, fs::FileProvider};
use tokio::{fs, io::AsyncWriteExt};

use crate::common::{
read_tbl, BenchDatabase, BenchReadTransaction, BenchReader, RedbBenchDatabase,
Expand Down Expand Up @@ -181,7 +179,7 @@ async fn main() {
println!();
println!("{table}");

let mut file = TokioExecutor::open("read_benchmark.md").await.unwrap();
let mut file = fs::File::create("read_benchmark.md").await.unwrap();
file.write_all(b"Read: \n```shell\n").await.unwrap();
for line in table.lines() {
file.write_all(line.as_bytes()).await.unwrap();
Expand Down
3 changes: 1 addition & 2 deletions benches/write_bench.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@ use common::*;
use futures_util::future::join_all;
use tempfile::TempDir;
use tokio::io::AsyncWriteExt;
use tonbo::{executor::tokio::TokioExecutor, fs::FileProvider};

const WRITE_TIMES: usize = 500_000;
const WRITE_BATCH_TIMES: usize = 5000;
Expand Down Expand Up @@ -227,7 +226,7 @@ async fn main() {
println!();
println!("{table}");

let mut file = TokioExecutor::open("write_benchmark.md").await.unwrap();
let mut file = tokio::fs::File::create("write_benchmark.md").await.unwrap();
file.write_all(b"Write: \n```shell\n").await.unwrap();
for line in table.lines() {
file.write_all(line.as_bytes()).await.unwrap();
Expand Down
Loading
Loading