Skip to content

Commit

Permalink
refactor: make buf writer more general
Browse files Browse the repository at this point in the history
  • Loading branch information
ethe committed Oct 24, 2024
1 parent cc9895f commit 0c98086
Show file tree
Hide file tree
Showing 5 changed files with 28 additions and 39 deletions.
39 changes: 18 additions & 21 deletions fusio/src/dynamic/fs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ use std::pin::Pin;

use futures_core::Stream;

use super::{buf::BufWriter, MaybeSendFuture};
use super::MaybeSendFuture;
use crate::{
buf::IoBufMut,
fs::{FileMeta, Fs, OpenOptions},
Expand Down Expand Up @@ -93,15 +93,8 @@ impl<F: Fs> DynFs for F {
options: OpenOptions,
) -> Pin<Box<dyn MaybeSendFuture<Output = Result<Box<dyn DynFile>, Error>> + 's>> {
Box::pin(async move {
let buf_size = options.buf_size;
let file = F::open_options(self, path, options).await?;
let dyn_file = Box::new(file) as Box<dyn DynFile>;
match buf_size {
Some(buf_size) => {
Ok(Box::new(BufWriter::new(dyn_file, buf_size)) as Box<dyn DynFile>)
}
None => Ok(dyn_file),
}
Ok(Box::new(file) as Box<dyn DynFile>)
})
}

Expand Down Expand Up @@ -161,20 +154,23 @@ mod tests {
async fn test_dyn_buf_fs() {
use tempfile::NamedTempFile;

use crate::{disk::TokioFs, fs::OpenOptions, DynFs, Read, Write};
use crate::{
disk::TokioFs,
fs::{Fs, OpenOptions},
impls::buffered::BufWriter,
Read, Write,
};

let open_options = OpenOptions::default()
.create(true)
.write(true)
.read(true)
.buf_size(Some(5));
let fs = Box::new(TokioFs) as Box<dyn DynFs>;
let open_options = OpenOptions::default().create(true).write(true).read(true);
let fs = TokioFs;
let temp_file = NamedTempFile::new().unwrap();
let path = temp_file.into_temp_path();
let mut dyn_file = fs
.open_options(&path.to_str().unwrap().into(), open_options)
.await
.unwrap();
let mut dyn_file = BufWriter::new(
fs.open_options(&path.to_str().unwrap().into(), open_options)
.await
.unwrap(),
4 * 1024,
);

let buf = [24, 9, 24, 0];
let (result, _) = dyn_file.write_all(&buf[..]).await;
Expand All @@ -185,7 +181,8 @@ mod tests {
let buf = [34, 19, 34, 10];
let (result, _) = dyn_file.write_all(&buf[..]).await;
result.unwrap();
let (_, buf) = dyn_file.read_to_end_at(vec![], 0).await;
dyn_file.flush().await.unwrap();
let (_, buf) = dyn_file.read_exact_at(vec![0; 4], 0).await;
assert_eq!(buf.as_slice(), &[24, 9, 24, 0]);

dyn_file.flush().await.unwrap();
Expand Down
2 changes: 0 additions & 2 deletions fusio/src/dynamic/mod.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,4 @@
#[cfg(feature = "fs")]
mod buf;
#[cfg(feature = "fs")]
pub mod fs;

use std::{future::Future, pin::Pin};
Expand Down
7 changes: 0 additions & 7 deletions fusio/src/fs/options.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ pub struct OpenOptions {
pub write: bool,
pub create: bool,
pub truncate: bool,
pub buf_size: Option<usize>,
}

impl Default for OpenOptions {
Expand All @@ -13,7 +12,6 @@ impl Default for OpenOptions {
write: false,
create: false,
truncate: false,
buf_size: None,
}
}
}
Expand All @@ -39,9 +37,4 @@ impl OpenOptions {
self.truncate = truncate;
self
}

pub fn buf_size(mut self, buf_size: Option<usize>) -> Self {
self.buf_size = buf_size;
self
}
}
18 changes: 9 additions & 9 deletions fusio/src/dynamic/buf.rs → fusio/src/impls/buffered.rs
Original file line number Diff line number Diff line change
@@ -1,14 +1,14 @@
use crate::{dynamic::DynFile, Error, IoBuf, IoBufMut, Read, Write};
use crate::{Error, IoBuf, IoBufMut, Read, Write};

pub struct BufWriter {
inner: Box<dyn DynFile>,
pub struct BufWriter<F> {
inner: F,
buf: Option<Vec<u8>>,
capacity: usize,
pos: usize,
}

impl BufWriter {
pub(crate) fn new(file: Box<dyn DynFile>, capacity: usize) -> Self {
impl<F> BufWriter<F> {
pub fn new(file: F, capacity: usize) -> Self {
Self {
inner: file,
buf: Some(Vec::with_capacity(capacity)),
Expand All @@ -18,7 +18,7 @@ impl BufWriter {
}
}

impl Read for BufWriter {
impl<F: Read> Read for BufWriter<F> {
async fn read_exact_at<B: IoBufMut>(&mut self, buf: B, pos: u64) -> (Result<(), Error>, B) {
self.inner.read_exact_at(buf, pos).await
}
Expand All @@ -33,7 +33,7 @@ impl Read for BufWriter {
}
}

impl Write for BufWriter {
impl<F: Write> Write for BufWriter<F> {
async fn write_all<B: IoBuf>(&mut self, buf: B) -> (Result<(), Error>, B) {
let written_size = buf.bytes_init();
if self.pos + written_size > self.capacity {
Expand Down Expand Up @@ -87,10 +87,10 @@ pub(crate) mod tests {
async fn test_buf_read_write() {
use tempfile::tempfile;

use crate::{dynamic::buf::BufWriter, Read, Write};
use crate::{impls::buffered::BufWriter, Read, Write};

let file = tokio::fs::File::from_std(tempfile().unwrap());
let mut writer = BufWriter::new(Box::new(file), 4);
let mut writer = BufWriter::new(file, 4);
{
let _ = writer.write_all("Hello".as_bytes()).await;
let buf = vec![];
Expand Down
1 change: 1 addition & 0 deletions fusio/src/impls/mod.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
pub mod buffered;
pub mod disk;
pub mod remotes;

Expand Down

0 comments on commit 0c98086

Please sign in to comment.