Skip to content

Commit

Permalink
tmp
Browse files Browse the repository at this point in the history
  • Loading branch information
KKould committed Sep 27, 2024
1 parent 317b1b0 commit d3bd0e1
Show file tree
Hide file tree
Showing 5 changed files with 195 additions and 11 deletions.
10 changes: 10 additions & 0 deletions fusio/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ bytes = ["dep:bytes"]
completion-based = []
default = ["dyn", "fs"]
dyn = []
futures = ["dep:futures-util"]
fs = ["tokio?/rt"]
h2 = ["dep:h2"]
http = [
Expand All @@ -38,6 +39,12 @@ tokio = ["async-stream", "dep:tokio"]
tokio-http = ["dep:reqwest", "http"]
tokio-uring = ["async-stream", "completion-based", "dep:tokio-uring", "no-send"]

[[bench]]
harness = false
name = "tokio"
path = "benches/tokio.rs"
required-features = ["tokio"]

[dependencies]
async-stream = { version = "0.3", optional = true }
bytes = { workspace = true, optional = true }
Expand Down Expand Up @@ -79,9 +86,12 @@ url = { version = "2" }
tokio-uring = { version = "0.5", default-features = false, optional = true }

[dev-dependencies]
criterion = { version = "0.5", features = ["async_tokio", "html_reports"] }
futures-executor = "0.3"
hyper = { version = "1", features = ["full"] }
hyper-util = { version = "0.1", features = ["full"] }
monoio = { version = "0.2" }
rand = "0.8"
tempfile = "3"
tokio = { version = "1", features = ["full"] }

Expand Down
56 changes: 56 additions & 0 deletions fusio/benches/tokio.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
use std::mem;
use std::sync::Arc;

use criterion::{criterion_group, criterion_main, BatchSize, BenchmarkId, Criterion};
use fusio::{
fs::{Fs, OpenOptions},
local::TokioFs,
path::Path,
Write,
};
use rand::Rng;
use tempfile::{NamedTempFile, TempDir};
use tokio::{fs::File, io::AsyncWriteExt};

fn write(c: &mut Criterion) {
let runtime = tokio::runtime::Builder::new_multi_thread()
.worker_threads(8)
.enable_all()
.build()
.unwrap();

let mut group = c.benchmark_group("write");

let mut write_bytes = [0u8; 4096];
rand::thread_rng().fill(&mut write_bytes);

let temp_file = NamedTempFile::new().unwrap();
let path = Path::from_filesystem_path(temp_file.path()).unwrap();

let fs = TokioFs;

group.bench_function("write 4k", |b| {
b.to_async(&runtime).iter_batched_ref(
|| {
futures_executor::block_on(async {
fs.open_options(
&path,
OpenOptions::default().create(true).write(true).append(true),
)
.await
.unwrap()
})
},
|file| async move {
let (result, _) = fusio::Write::write_all(file, &write_bytes[..]).await;
result.unwrap();
},
BatchSize::SmallInput,
);
});

group.finish();
}

criterion_group!(benches, write);
criterion_main!(benches);
16 changes: 8 additions & 8 deletions fusio/src/fs/options.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,23 +21,23 @@ impl Default for OpenOptions {
}

impl OpenOptions {
pub fn read(mut self) -> Self {
self.read = true;
pub fn read(mut self, read: bool) -> Self {
self.read = read;
self
}

pub fn write(mut self) -> Self {
self.write = Some(WriteMode::Overwrite);
pub fn write(mut self, write: bool) -> Self {
self.write = write.then_some(WriteMode::Overwrite);
self
}

pub fn create(mut self) -> Self {
self.create = true;
pub fn create(mut self, create: bool) -> Self {
self.create = create;
self
}

pub fn append(mut self) -> Self {
self.write = Some(WriteMode::Append);
pub fn append(mut self, append: bool) -> Self {
self.write = append.then_some(WriteMode::Append);
self
}
}
122 changes: 120 additions & 2 deletions fusio/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -171,8 +171,17 @@ impl<W: Write> Write for &mut W {

#[cfg(test)]
mod tests {
use super::{Read, Write};
use crate::{buf::IoBufMut, Error, IoBuf, Seek};
use std::collections::HashSet;

use tempfile::TempDir;

use super::{DynFs, Read, Write};
use crate::{
buf::IoBufMut,
fs::{Fs, OpenOptions},
path::Path,
Error, IoBuf, Seek,
};

#[allow(unused)]
struct CountWrite<W> {
Expand Down Expand Up @@ -268,6 +277,107 @@ mod tests {
assert_eq!(buf.as_slice(), &[2, 0, 2, 4]);
}

#[cfg(feature = "futures")]
async fn test_local_fs<S>(fs: S) -> Result<(), Error>
where
S: Fs,
{
use futures_util::StreamExt;

let tmp_dir = TempDir::new()?;
let work_dir_path = tmp_dir.path().join("work");
let work_file_path = work_dir_path.join("test.file");

fs.create_dir_all(&Path::from_absolute_path(&work_dir_path)?)
.await?;

assert!(work_dir_path.exists());
assert!(fs
.open_options(
&Path::from_absolute_path(&work_file_path)?,
OpenOptions::default()
)
.await
.is_err());
{
let _ = fs
.open_options(
&Path::from_absolute_path(&work_file_path)?,
OpenOptions::default().create(true).write(true),
)
.await?;
assert!(work_file_path.exists());
}
{
let mut file = fs
.open_options(
&Path::from_absolute_path(&work_file_path)?,
OpenOptions::default().write(true),
)
.await?;
file.write_all("Hello! fusio".as_bytes()).await.0?;
let mut file = fs
.open_options(
&Path::from_absolute_path(&work_file_path)?,
OpenOptions::default().write(true),
)
.await?;
file.write_all("Hello! world".as_bytes()).await.0?;

assert!(file.read_exact(vec![0u8; 24]).await.is_err());
}
{
let mut file = fs
.open_options(
&Path::from_absolute_path(&work_file_path)?,
OpenOptions::default().append(true),
)
.await?;
file.write_all("Hello! fusio".as_bytes()).await.0?;

assert!(file.read_exact(vec![0u8; 24]).await.is_err());
}
{
let mut file = fs
.open_options(
&Path::from_absolute_path(&work_file_path)?,
OpenOptions::default(),
)
.await?;

assert_eq!(
"Hello! worldHello! fusio".as_bytes(),
&file.read_to_end(Vec::new()).await?
)
}
fs.remove(&Path::from_filesystem_path(&work_file_path)?)
.await?;
assert!(!work_file_path.exists());

let mut file_set = HashSet::new();
for i in 0..10 {
let _ = fs
.open_options(
&Path::from_absolute_path(work_dir_path.join(i.to_string()))?,
OpenOptions::default().create(true).write(true),
)
.await?;
file_set.insert(i.to_string());
}

let path = Path::from_filesystem_path(&work_dir_path)?;
let mut file_stream = Box::pin(fs.list(&path).await?);

while let Some(file_meta) = file_stream.next().await {
if let Some(file_name) = file_meta?.path.filename() {
assert!(file_set.remove(file_name));
}
}
assert!(file_set.is_empty());

Ok(())
}

#[cfg(feature = "tokio")]
#[tokio::test]
async fn test_tokio() {
Expand All @@ -280,6 +390,14 @@ mod tests {
write_and_read(File::from_std(write), File::from_std(read)).await;
}

#[cfg(all(feature = "tokio", feature = "futures"))]
#[tokio::test]
async fn test_tokio_fs() {
use crate::local::TokioFs;

test_local_fs(TokioFs).await.unwrap();
}

#[cfg(feature = "monoio")]
#[monoio::test]
async fn test_monoio() {
Expand Down
2 changes: 1 addition & 1 deletion fusio/src/path/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -513,6 +513,6 @@ mod tests {
let this_path = Path::from_filesystem_path(temp_file.path()).unwrap();
let std_path = path_to_local(&this_path).unwrap();

assert_eq!(std_path, std::fs::canonicalize(temp_file.path()).unwrap());
assert_eq!(std_path, temp_file.path());
}
}

0 comments on commit d3bd0e1

Please sign in to comment.