diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index bd1c360..9371e12 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -85,6 +85,43 @@ jobs: command: test args: --package fusio --features "monoio, futures" + tokio_uring_check: + name: Rust project check on tokio_uring + runs-on: ${{ matrix.os }} + strategy: + matrix: + os: + - ubuntu-latest + steps: + - uses: actions/checkout@v4 + - name: Install latest + uses: actions-rs/toolchain@v1 + with: + toolchain: stable + override: true + components: rustfmt, clippy + + # `cargo check` command here will use installed `nightly` + # as it is set as an "override" for current directory + + - name: Run cargo clippy on tokio-uring + uses: actions-rs/cargo@v1 + with: + command: check + args: --package fusio --features "tokio-uring, futures" + + - name: Run cargo build on tokio-uring + uses: actions-rs/cargo@v1 + with: + command: build + args: --package fusio --features "tokio-uring, futures" + + - name: Run cargo test on tokio-uring + uses: actions-rs/cargo@v1 + with: + command: test + args: --package fusio --features "tokio-uring, futures" + # 2 fmt: name: Rust fmt diff --git a/fusio/src/dynamic/fs.rs b/fusio/src/dynamic/fs.rs index c29a7dd..5182c77 100644 --- a/fusio/src/dynamic/fs.rs +++ b/fusio/src/dynamic/fs.rs @@ -26,6 +26,10 @@ impl<'read> Read for Box { Ok(unsafe { B::recover_from_buf_mut(buf) }) } + async fn read_to_end(&mut self, buf: Vec) -> Result, Error> { + DynRead::read_to_end(self.as_mut(), buf).await + } + async fn size(&self) -> Result { DynRead::size(self.as_ref()).await } diff --git a/fusio/src/lib.rs b/fusio/src/lib.rs index 9c9bbc3..afd7e33 100644 --- a/fusio/src/lib.rs +++ b/fusio/src/lib.rs @@ -68,14 +68,8 @@ pub trait Read: MaybeSend + MaybeSync { fn read_to_end( &mut self, - mut buf: Vec, - ) -> impl Future, Error>> + MaybeSend { - async move { - buf.resize(self.size().await? as usize, 0); - let buf = self.read_exact(buf).await?; - Ok(buf) - } - } + buf: Vec, + ) -> impl Future, Error>> + MaybeSend; fn size(&self) -> impl Future> + MaybeSend; } @@ -93,6 +87,11 @@ where Ok(buf) } + async fn read_to_end(&mut self, mut buf: Vec) -> Result, Error> { + let _ = std::io::Read::read_to_end(self, &mut buf)?; + Ok(buf) + } + async fn size(&self) -> Result { Ok(self.get_ref().as_ref().len() as u64) } @@ -143,6 +142,13 @@ impl Read for &mut R { R::read_exact(self, buf) } + fn read_to_end( + &mut self, + buf: Vec, + ) -> impl Future, Error>> + MaybeSend { + R::read_to_end(self, buf) + } + fn size(&self) -> impl Future> + MaybeSend { R::size(self) } @@ -171,7 +177,9 @@ impl Write for &mut W { #[cfg(test)] mod tests { - use super::{Read, Write}; + use std::future::Future; + + use super::{MaybeSend, Read, Write}; use crate::{buf::IoBufMut, Error, IoBuf, Seek}; #[allow(unused)] @@ -233,6 +241,13 @@ mod tests { .inspect(|buf| self.cnt += buf.bytes_init()) } + async fn read_to_end(&mut self, buf: Vec) -> Result, Error> { + self.r + .read_to_end(buf) + .await + .inspect(|buf| self.cnt += buf.bytes_init()) + } + async fn size(&self) -> Result { self.r.size().await } @@ -259,13 +274,24 @@ mod tests { writer.sync_data().await.unwrap(); let mut reader = CountRead::new(read); - reader.seek(0).await.unwrap(); + { + reader.seek(0).await.unwrap(); + + let mut buf = vec![]; + buf = reader.read_to_end(buf).await.unwrap(); - let mut buf = vec![]; - buf = reader.read_to_end(buf).await.unwrap(); + assert_eq!(buf.bytes_init(), 4); + assert_eq!(buf.as_slice(), &[2, 0, 2, 4]); + } + { + reader.seek(2).await.unwrap(); + + let mut buf = vec![]; + buf = reader.read_to_end(buf).await.unwrap(); - assert_eq!(buf.bytes_init(), 4); - assert_eq!(buf.as_slice(), &[2, 0, 2, 4]); + assert_eq!(buf.bytes_init(), 2); + assert_eq!(buf.as_slice(), &[2, 4]); + } } #[cfg(feature = "futures")] @@ -418,11 +444,17 @@ mod tests { use tempfile::tempfile; use tokio_uring::fs::File; + use crate::local::tokio_uring::TokioUringFile; + tokio_uring::start(async { let read = tempfile().unwrap(); let write = read.try_clone().unwrap(); - write_and_read(File::from_std(write), File::from_std(read)).await; + write_and_read( + TokioUringFile::from(File::from_std(write)), + TokioUringFile::from(File::from_std(read)), + ) + .await; }); } } diff --git a/fusio/src/local/mod.rs b/fusio/src/local/mod.rs index c71df41..94014a5 100644 --- a/fusio/src/local/mod.rs +++ b/fusio/src/local/mod.rs @@ -3,7 +3,7 @@ pub(crate) mod monoio; #[cfg(feature = "tokio")] pub(crate) mod tokio; #[cfg(all(feature = "tokio-uring", target_os = "linux"))] -mod tokio_uring; +pub(crate) mod tokio_uring; #[cfg(all(feature = "monoio", feature = "fs"))] #[allow(unused)] @@ -21,5 +21,7 @@ cfg_if::cfg_if! { pub type LocalFs = TokioFs; } else if #[cfg(feature = "monoio")] { pub type LocalFs = MonoIoFs; + } else if #[cfg(feature = "tokio-uring")] { + pub type LocalFs = TokioUringFs; } } diff --git a/fusio/src/local/monoio/mod.rs b/fusio/src/local/monoio/mod.rs index 938d503..d6c3c92 100644 --- a/fusio/src/local/monoio/mod.rs +++ b/fusio/src/local/monoio/mod.rs @@ -1,9 +1,11 @@ #[cfg(feature = "fs")] pub mod fs; +use std::future::Future; + use monoio::fs::File; -use crate::{buf::IoBufMut, Error, IoBuf, Read, Seek, Write}; +use crate::{buf::IoBufMut, Error, IoBuf, MaybeSend, Read, Seek, Write}; #[repr(transparent)] struct MonoioBuf { @@ -94,6 +96,12 @@ impl Read for MonoioFile { Ok(buf.buf) } + async fn read_to_end(&mut self, mut buf: Vec) -> Result, Error> { + buf.resize((self.size().await? - self.pos) as usize, 0); + + Ok(self.read_exact(buf).await?) + } + async fn size(&self) -> Result { let metadata = File::metadata(self.file.as_ref().expect("read file after closed")).await?; Ok(metadata.len()) diff --git a/fusio/src/local/tokio/mod.rs b/fusio/src/local/tokio/mod.rs index 4f62424..9f8f44c 100644 --- a/fusio/src/local/tokio/mod.rs +++ b/fusio/src/local/tokio/mod.rs @@ -44,6 +44,11 @@ impl Read for File { Ok(buf) } + async fn read_to_end(&mut self, mut buf: Vec) -> Result, Error> { + let _ = AsyncReadExt::read_to_end(self, &mut buf).await?; + Ok(buf) + } + async fn size(&self) -> Result { Ok(self.metadata().await?.len()) } diff --git a/fusio/src/local/tokio_uring/fs.rs b/fusio/src/local/tokio_uring/fs.rs index 4121548..bc72c26 100644 --- a/fusio/src/local/tokio_uring/fs.rs +++ b/fusio/src/local/tokio_uring/fs.rs @@ -1,33 +1,62 @@ -use std::{io, path::Path}; - use async_stream::stream; use futures_core::Stream; -use tokio_uring::fs::{remove_file, File}; +use tokio_uring::fs::{create_dir_all, remove_file}; -use crate::fs::{FileMeta, Fs}; +use crate::{ + fs::{FileMeta, Fs, OpenOptions, WriteMode}, + local::tokio_uring::TokioUringFile, + path::{path_to_local, Path}, + Error, +}; pub struct TokioUringFs; impl Fs for TokioUringFs { - type File = File; + type File = TokioUringFile; + + async fn open_options(&self, path: &Path, options: OpenOptions) -> Result { + let local_path = path_to_local(path)?; + + let file = tokio_uring::fs::OpenOptions::new() + .read(options.read) + .write(options.write.is_some()) + .create(options.create) + .append(options.write == Some(WriteMode::Append)) + .truncate(options.write == Some(WriteMode::Overwrite)) + .open(&local_path) + .await?; - async fn open(&self, path: impl AsRef) -> io::Result { - File::open(path).await + Ok(TokioUringFile { + file: Some(file), + pos: 0, + }) + } + + async fn create_dir_all(path: &Path) -> Result<(), Error> { + let path = path_to_local(path)?; + create_dir_all(path).await?; + + Ok(()) } async fn list( &self, - path: impl AsRef, - ) -> io::Result>> { - let dir = path.as_ref().read_dir()?; + path: &Path, + ) -> Result>, Error> { + let path = path_to_local(path)?; + let dir = path.read_dir()?; + Ok(stream! { for entry in dir { - yield Ok(crate::fs::FileMeta { path: entry?.path() }); + let entry = entry?; + yield Ok(FileMeta { path: Path::from_filesystem_path(entry.path())?, size: entry.metadata()?.len() }); } }) } - async fn remove(&self, path: impl AsRef) -> io::Result<()> { - remove_file(path).await + async fn remove(&self, path: &Path) -> Result<(), Error> { + let path = path_to_local(path)?; + + Ok(remove_file(path).await?) } } diff --git a/fusio/src/local/tokio_uring/mod.rs b/fusio/src/local/tokio_uring/mod.rs index 4b96017..946a578 100644 --- a/fusio/src/local/tokio_uring/mod.rs +++ b/fusio/src/local/tokio_uring/mod.rs @@ -1,9 +1,11 @@ #[cfg(feature = "fs")] pub mod fs; +use std::future::Future; + use tokio_uring::fs::File; -use crate::{Error, IoBuf, IoBufMut, Read, Write}; +use crate::{Error, IoBuf, IoBufMut, MaybeSend, Read, Seek, Write}; #[repr(transparent)] struct TokioUringBuf { @@ -35,43 +37,86 @@ where self.buf.as_mut_ptr() } - unsafe fn set_init(&mut self, pos: usize) { - IoBufMut::set_init(&mut self.buf, pos) + unsafe fn set_init(&mut self, _pos: usize) {} +} + +pub struct TokioUringFile { + file: Option, + pos: u64, +} + +impl From for TokioUringFile { + fn from(file: File) -> Self { + Self { + file: Some(file), + pos: 0, + } } } -impl Write for File { - async fn write(&mut self, buf: B, pos: u64) -> (Result, B) { - let (result, buf) = self.write_at(TokioUringBuf { buf }, pos).submit().await; +impl Write for TokioUringFile { + async fn write_all(&mut self, buf: B) -> (Result<(), Error>, B) { + let (result, buf) = self + .file + .as_ref() + .expect("read file after closed") + .write_all_at(TokioUringBuf { buf }, self.pos) + .await; + self.pos += buf.buf.bytes_init() as u64; (result.map_err(Error::from), buf.buf) } async fn sync_data(&self) -> Result<(), Error> { - File::sync_data(self).await?; + File::sync_data(self.file.as_ref().expect("read file after closed")).await?; Ok(()) } async fn sync_all(&self) -> Result<(), Error> { - File::sync_all(self).await?; + File::sync_all(self.file.as_ref().expect("read file after closed")).await?; Ok(()) } - async fn close(self) -> Result<(), Error> { - File::close(self).await?; + async fn close(&mut self) -> Result<(), Error> { + File::close(self.file.take().expect("close file twice")).await?; Ok(()) } } -impl Read for File { - async fn read(&mut self, pos: u64, len: Option) -> Result { - let buf = vec![0; len.unwrap_or(0) as usize]; - - let (result, buf) = self.read_at(TokioUringBuf { buf }, pos).await; +impl Read for TokioUringFile { + async fn read_exact(&mut self, buf: B) -> Result { + let (result, buf) = self + .file + .as_ref() + .expect("read file after closed") + .read_exact_at(TokioUringBuf { buf }, self.pos) + .await; result?; + self.pos += buf.buf.bytes_init() as u64; + + Ok(buf.buf) + } - #[cfg(not(feature = "bytes"))] - return Ok(buf.buf); - #[cfg(feature = "bytes")] - return Ok(bytes::Bytes::from(buf.buf)); + async fn read_to_end(&mut self, mut buf: Vec) -> Result, Error> { + buf.resize((self.size().await? - self.pos) as usize, 0); + + Ok(self.read_exact(buf).await?) + } + + async fn size(&self) -> Result { + let stat = self + .file + .as_ref() + .expect("read file after closed") + .statx() + .await?; + Ok(stat.stx_size) + } +} + +impl Seek for TokioUringFile { + async fn seek(&mut self, pos: u64) -> Result<(), Error> { + self.pos = pos; + + Ok(()) } }