Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: update Read & Write for tokio-uring impl #35

Merged
merged 2 commits into from
Sep 30, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
37 changes: 37 additions & 0 deletions .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,43 @@ jobs:
command: test
args: --package fusio --features "monoio, futures"

tokio_uring_check:
name: Rust project check on tokio_uring
runs-on: ${{ matrix.os }}
strategy:
matrix:
os:
- ubuntu-latest
steps:
- uses: actions/checkout@v4
- name: Install latest
uses: actions-rs/toolchain@v1
with:
toolchain: stable
override: true
components: rustfmt, clippy

# `cargo check` command here will use installed `nightly`
# as it is set as an "override" for current directory

- name: Run cargo clippy on tokio-uring
uses: actions-rs/cargo@v1
with:
command: check
args: --package fusio --features "tokio-uring, futures"

- name: Run cargo build on tokio-uring
uses: actions-rs/cargo@v1
with:
command: build
args: --package fusio --features "tokio-uring, futures"

- name: Run cargo test on tokio-uring
uses: actions-rs/cargo@v1
with:
command: test
args: --package fusio --features "tokio-uring, futures"

# 2
fmt:
name: Rust fmt
Expand Down
4 changes: 4 additions & 0 deletions fusio/src/dynamic/fs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,10 @@ impl<'read> Read for Box<dyn DynFile + 'read> {
Ok(unsafe { B::recover_from_buf_mut(buf) })
}

async fn read_to_end(&mut self, buf: Vec<u8>) -> Result<Vec<u8>, Error> {
DynRead::read_to_end(self.as_mut(), buf).await
}

async fn size(&self) -> Result<u64, Error> {
DynRead::size(self.as_ref()).await
}
Expand Down
62 changes: 47 additions & 15 deletions fusio/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -68,14 +68,8 @@ pub trait Read: MaybeSend + MaybeSync {

fn read_to_end(
&mut self,
mut buf: Vec<u8>,
) -> impl Future<Output = Result<Vec<u8>, Error>> + MaybeSend {
async move {
buf.resize(self.size().await? as usize, 0);
let buf = self.read_exact(buf).await?;
Ok(buf)
}
}
buf: Vec<u8>,
) -> impl Future<Output = Result<Vec<u8>, Error>> + MaybeSend;

fn size(&self) -> impl Future<Output = Result<u64, Error>> + MaybeSend;
}
Expand All @@ -93,6 +87,11 @@ where
Ok(buf)
}

async fn read_to_end(&mut self, mut buf: Vec<u8>) -> Result<Vec<u8>, Error> {
let _ = std::io::Read::read_to_end(self, &mut buf)?;
Ok(buf)
}

async fn size(&self) -> Result<u64, Error> {
Ok(self.get_ref().as_ref().len() as u64)
}
Expand Down Expand Up @@ -143,6 +142,13 @@ impl<R: Read> Read for &mut R {
R::read_exact(self, buf)
}

fn read_to_end(
&mut self,
buf: Vec<u8>,
) -> impl Future<Output = Result<Vec<u8>, Error>> + MaybeSend {
R::read_to_end(self, buf)
}

fn size(&self) -> impl Future<Output = Result<u64, Error>> + MaybeSend {
R::size(self)
}
Expand Down Expand Up @@ -171,7 +177,9 @@ impl<W: Write> Write for &mut W {

#[cfg(test)]
mod tests {
use super::{Read, Write};
use std::future::Future;

use super::{MaybeSend, Read, Write};
use crate::{buf::IoBufMut, Error, IoBuf, Seek};

#[allow(unused)]
Expand Down Expand Up @@ -233,6 +241,13 @@ mod tests {
.inspect(|buf| self.cnt += buf.bytes_init())
}

async fn read_to_end(&mut self, buf: Vec<u8>) -> Result<Vec<u8>, Error> {
self.r
.read_to_end(buf)
.await
.inspect(|buf| self.cnt += buf.bytes_init())
}

async fn size(&self) -> Result<u64, Error> {
self.r.size().await
}
Expand All @@ -259,13 +274,24 @@ mod tests {
writer.sync_data().await.unwrap();

let mut reader = CountRead::new(read);
reader.seek(0).await.unwrap();
{
reader.seek(0).await.unwrap();

let mut buf = vec![];
buf = reader.read_to_end(buf).await.unwrap();

let mut buf = vec![];
buf = reader.read_to_end(buf).await.unwrap();
assert_eq!(buf.bytes_init(), 4);
assert_eq!(buf.as_slice(), &[2, 0, 2, 4]);
}
{
reader.seek(2).await.unwrap();

let mut buf = vec![];
buf = reader.read_to_end(buf).await.unwrap();

assert_eq!(buf.bytes_init(), 4);
assert_eq!(buf.as_slice(), &[2, 0, 2, 4]);
assert_eq!(buf.bytes_init(), 2);
assert_eq!(buf.as_slice(), &[2, 4]);
}
}

#[cfg(feature = "futures")]
Expand Down Expand Up @@ -418,11 +444,17 @@ mod tests {
use tempfile::tempfile;
use tokio_uring::fs::File;

use crate::local::tokio_uring::TokioUringFile;

tokio_uring::start(async {
let read = tempfile().unwrap();
let write = read.try_clone().unwrap();

write_and_read(File::from_std(write), File::from_std(read)).await;
write_and_read(
TokioUringFile::from(File::from_std(write)),
TokioUringFile::from(File::from_std(read)),
)
.await;
});
}
}
4 changes: 3 additions & 1 deletion fusio/src/local/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ pub(crate) mod monoio;
#[cfg(feature = "tokio")]
pub(crate) mod tokio;
#[cfg(all(feature = "tokio-uring", target_os = "linux"))]
mod tokio_uring;
pub(crate) mod tokio_uring;

#[cfg(all(feature = "monoio", feature = "fs"))]
#[allow(unused)]
Expand All @@ -21,5 +21,7 @@ cfg_if::cfg_if! {
pub type LocalFs = TokioFs;
} else if #[cfg(feature = "monoio")] {
pub type LocalFs = MonoIoFs;
} else if #[cfg(feature = "tokio-uring")] {
pub type LocalFs = TokioUringFs;
}
}
10 changes: 9 additions & 1 deletion fusio/src/local/monoio/mod.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,11 @@
#[cfg(feature = "fs")]
pub mod fs;

use std::future::Future;

use monoio::fs::File;

use crate::{buf::IoBufMut, Error, IoBuf, Read, Seek, Write};
use crate::{buf::IoBufMut, Error, IoBuf, MaybeSend, Read, Seek, Write};

#[repr(transparent)]
struct MonoioBuf<B> {
Expand Down Expand Up @@ -94,6 +96,12 @@ impl Read for MonoioFile {
Ok(buf.buf)
}

async fn read_to_end(&mut self, mut buf: Vec<u8>) -> Result<Vec<u8>, Error> {
buf.resize((self.size().await? - self.pos) as usize, 0);

Ok(self.read_exact(buf).await?)
}

async fn size(&self) -> Result<u64, Error> {
let metadata = File::metadata(self.file.as_ref().expect("read file after closed")).await?;
Ok(metadata.len())
Expand Down
5 changes: 5 additions & 0 deletions fusio/src/local/tokio/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,11 @@ impl Read for File {
Ok(buf)
}

async fn read_to_end(&mut self, mut buf: Vec<u8>) -> Result<Vec<u8>, Error> {
let _ = AsyncReadExt::read_to_end(self, &mut buf).await?;
Ok(buf)
}

async fn size(&self) -> Result<u64, Error> {
Ok(self.metadata().await?.len())
}
Expand Down
55 changes: 42 additions & 13 deletions fusio/src/local/tokio_uring/fs.rs
Original file line number Diff line number Diff line change
@@ -1,33 +1,62 @@
use std::{io, path::Path};

use async_stream::stream;
use futures_core::Stream;
use tokio_uring::fs::{remove_file, File};
use tokio_uring::fs::{create_dir_all, remove_file};

use crate::fs::{FileMeta, Fs};
use crate::{
fs::{FileMeta, Fs, OpenOptions, WriteMode},
local::tokio_uring::TokioUringFile,
path::{path_to_local, Path},
Error,
};

pub struct TokioUringFs;

impl Fs for TokioUringFs {
type File = File;
type File = TokioUringFile;

async fn open_options(&self, path: &Path, options: OpenOptions) -> Result<Self::File, Error> {
let local_path = path_to_local(path)?;

let file = tokio_uring::fs::OpenOptions::new()
.read(options.read)
.write(options.write.is_some())
.create(options.create)
.append(options.write == Some(WriteMode::Append))
.truncate(options.write == Some(WriteMode::Overwrite))
.open(&local_path)
.await?;

async fn open(&self, path: impl AsRef<Path>) -> io::Result<Self::File> {
File::open(path).await
Ok(TokioUringFile {
file: Some(file),
pos: 0,
})
}

async fn create_dir_all(path: &Path) -> Result<(), Error> {
let path = path_to_local(path)?;
create_dir_all(path).await?;

Ok(())
}

async fn list(
&self,
path: impl AsRef<Path>,
) -> io::Result<impl Stream<Item = io::Result<FileMeta>>> {
let dir = path.as_ref().read_dir()?;
path: &Path,
) -> Result<impl Stream<Item = Result<FileMeta, Error>>, Error> {
let path = path_to_local(path)?;
let dir = path.read_dir()?;

Ok(stream! {
for entry in dir {
yield Ok(crate::fs::FileMeta { path: entry?.path() });
let entry = entry?;
yield Ok(FileMeta { path: Path::from_filesystem_path(entry.path())?, size: entry.metadata()?.len() });
}
})
}

async fn remove(&self, path: impl AsRef<Path>) -> io::Result<()> {
remove_file(path).await
async fn remove(&self, path: &Path) -> Result<(), Error> {
let path = path_to_local(path)?;

Ok(remove_file(path).await?)
}
}
Loading
Loading