Skip to content

Commit

Permalink
refactor: Path -> Url
Browse files Browse the repository at this point in the history
  • Loading branch information
KKould committed Sep 29, 2024
1 parent 5682847 commit f5f4c7a
Show file tree
Hide file tree
Showing 12 changed files with 162 additions and 772 deletions.
10 changes: 5 additions & 5 deletions fusio/benches/tokio.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,11 @@ use criterion::{criterion_group, criterion_main, Criterion};
use fusio::{
fs::{Fs, OpenOptions},
local::TokioFs,
path::Path,
IoBuf, IoBufMut, Write,
};
use rand::Rng;
use tempfile::NamedTempFile;
use url::Url;

fn write(c: &mut Criterion) {
let runtime = tokio::runtime::Builder::new_multi_thread()
Expand All @@ -24,11 +24,11 @@ fn write(c: &mut Criterion) {
let write_bytes = Arc::new(write_bytes);

let temp_file = NamedTempFile::new().unwrap();
let path = Path::from_filesystem_path(temp_file.path()).unwrap();
let url = Url::from_file_path(temp_file.path()).unwrap();

let fs = TokioFs;
let file = Rc::new(RefCell::new(runtime.block_on(async {
fs.open_options(&path, OpenOptions::default().write(true).append(true))
fs.open_options(&url, OpenOptions::default().write(true).append(true))
.await
.unwrap()
})));
Expand Down Expand Up @@ -80,12 +80,12 @@ fn read(c: &mut Criterion) {
rand::thread_rng().fill(&mut write_bytes);

let temp_file = NamedTempFile::new().unwrap();
let path = Path::from_filesystem_path(temp_file.path()).unwrap();
let url = Url::from_file_path(temp_file.path()).unwrap();

let fs = TokioFs;
let file = Rc::new(RefCell::new(runtime.block_on(async {
let mut file = fs
.open_options(&path, OpenOptions::default().write(true).append(true))
.open_options(&url, OpenOptions::default().write(true).append(true))
.await
.unwrap();
let (result, _) = file.write_all(&write_bytes[..]).await;
Expand Down
48 changes: 24 additions & 24 deletions fusio/src/dynamic/fs.rs
Original file line number Diff line number Diff line change
@@ -1,12 +1,12 @@
use std::pin::Pin;

use futures_core::Stream;
use url::Url;

use super::{DynSeek, MaybeSendFuture};
use crate::{
buf::IoBufMut,
fs::{FileMeta, Fs, OpenOptions},
path::Path,
DynRead, DynWrite, Error, IoBuf, MaybeSend, MaybeSync, Read, Seek, Write,
};

Expand Down Expand Up @@ -52,27 +52,27 @@ impl<'write> Write for Box<dyn DynFile + 'write> {
}

pub trait DynFs: MaybeSend + MaybeSync {
fn open<'s, 'path: 's>(
fn open<'s, 'url: 's>(
&'s self,
path: &'path Path,
url: &'url Url,
) -> Pin<Box<dyn MaybeSendFuture<Output = Result<Box<dyn DynFile>, Error>> + 's>> {
self.open_options(path, OpenOptions::default())
self.open_options(url, OpenOptions::default())
}

fn open_options<'s, 'path: 's>(
fn open_options<'s, 'url: 's>(
&'s self,
path: &'path Path,
url: &'url Url,
options: OpenOptions,
) -> Pin<Box<dyn MaybeSendFuture<Output = Result<Box<dyn DynFile>, Error>> + 's>>;

fn create_dir_all<'s, 'path: 's>(
fn create_dir_all<'s, 'url: 's>(
&'s self,
path: &'path Path,
url: &'url Url,
) -> Pin<Box<dyn MaybeSendFuture<Output = Result<(), Error>> + 's>>;

fn list<'s, 'path: 's>(
fn list<'s, 'url: 's>(
&'s self,
path: &'path Path,
url: &'url Url,
) -> Pin<
Box<
dyn MaybeSendFuture<
Expand All @@ -84,34 +84,34 @@ pub trait DynFs: MaybeSend + MaybeSync {
>,
>;

fn remove<'s, 'path: 's>(
fn remove<'s, 'url: 's>(
&'s self,
path: &'path Path,
url: &'url Url,
) -> Pin<Box<dyn MaybeSendFuture<Output = Result<(), Error>> + 's>>;
}

impl<F: Fs> DynFs for F {
fn open_options<'s, 'path: 's>(
fn open_options<'s, 'url: 's>(
&'s self,
path: &'path Path,
url: &'url Url,
options: OpenOptions,
) -> Pin<Box<dyn MaybeSendFuture<Output = Result<Box<dyn DynFile>, Error>> + 's>> {
Box::pin(async move {
let file = F::open_options(self, path, options).await?;
let file = F::open_options(self, url, options).await?;
Ok(Box::new(file) as Box<dyn DynFile>)
})
}

fn create_dir_all<'s, 'path: 's>(
fn create_dir_all<'s, 'url: 's>(
&'s self,
path: &'path Path,
url: &'url Url,
) -> Pin<Box<dyn MaybeSendFuture<Output = Result<(), Error>> + 's>> {
Box::pin(F::create_dir_all(path))
Box::pin(F::create_dir_all(url))
}

fn list<'s, 'path: 's>(
fn list<'s, 'url: 's>(
&'s self,
path: &'path Path,
url: &'url Url,
) -> Pin<
Box<
dyn MaybeSendFuture<
Expand All @@ -123,16 +123,16 @@ impl<F: Fs> DynFs for F {
>,
> {
Box::pin(async move {
let stream = F::list(self, path).await?;
let stream = F::list(self, url).await?;
Ok(Box::pin(stream) as Pin<Box<dyn Stream<Item = Result<FileMeta, Error>>>>)
})
}

fn remove<'s, 'path: 's>(
fn remove<'s, 'url: 's>(
&'s self,
path: &'path Path,
url: &'url Url,
) -> Pin<Box<dyn MaybeSendFuture<Output = Result<(), Error>> + 's>> {
Box::pin(F::remove(self, path))
Box::pin(F::remove(self, url))
}
}

Expand Down
14 changes: 12 additions & 2 deletions fusio/src/error.rs
Original file line number Diff line number Diff line change
@@ -1,17 +1,27 @@
use std::io;
use std::{io, path::PathBuf};

use thiserror::Error;
use url::Url;

#[derive(Debug, Error)]
#[error(transparent)]
#[non_exhaustive]
pub enum Error {
Io(#[from] io::Error),
#[error("Unable to convert url \"{url}\" to local path")]
InvalidLocalUrl {
url: Url,
},
#[error("Unable to convert path \"{path}\" to local url")]
InvalidLocalPath {
path: PathBuf,
},
#[cfg(feature = "http")]
Http(#[from] http::Error),
#[cfg(feature = "object_store")]
ObjectStore(#[from] object_store::Error),
Path(#[from] crate::path::Error),
#[cfg(feature = "object_store")]
ObjectStorePath(#[from] object_store::path::Error),
#[error("unsupported operation")]
Unsupported,
#[error(transparent)]
Expand Down
17 changes: 9 additions & 8 deletions fusio/src/fs/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,34 +4,35 @@ use std::future::Future;

use futures_core::Stream;
pub use options::*;
use url::Url;

use crate::{path::Path, Error, MaybeSend, MaybeSync, Read, Seek, Write};
use crate::{Error, MaybeSend, MaybeSync, Read, Seek, Write};

#[derive(Debug)]
pub struct FileMeta {
pub path: Path,
pub url: Url,
pub size: u64,
}

pub trait Fs: MaybeSend + MaybeSync {
type File: Read + Seek + Write + MaybeSend + MaybeSync + 'static;

fn open(&self, path: &Path) -> impl Future<Output = Result<Self::File, Error>> {
self.open_options(path, OpenOptions::default())
fn open(&self, url: &Url) -> impl Future<Output = Result<Self::File, Error>> {
self.open_options(url, OpenOptions::default())
}

fn open_options(
&self,
path: &Path,
url: &Url,
options: OpenOptions,
) -> impl Future<Output = Result<Self::File, Error>> + MaybeSend;

fn create_dir_all(path: &Path) -> impl Future<Output = Result<(), Error>> + MaybeSend;
fn create_dir_all(url: &Url) -> impl Future<Output = Result<(), Error>> + MaybeSend;

fn list(
&self,
path: &Path,
url: &Url,
) -> impl Future<Output = Result<impl Stream<Item = Result<FileMeta, Error>>, Error>> + MaybeSend;

fn remove(&self, path: &Path) -> impl Future<Output = Result<(), Error>> + MaybeSend;
fn remove(&self, url: &Url) -> impl Future<Output = Result<(), Error>> + MaybeSend;
}
61 changes: 43 additions & 18 deletions fusio/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@ mod error;
#[cfg(feature = "fs")]
pub mod fs;
pub mod local;
pub mod path;
pub mod remotes;

use std::{future::Future, io::Cursor};
Expand Down Expand Up @@ -278,27 +277,33 @@ mod tests {
use futures_util::StreamExt;
use tempfile::TempDir;

use crate::{fs::OpenOptions, path::Path, DynFs};

let tmp_dir = TempDir::new()?;
let work_dir_path = tmp_dir.path().join("work");
let work_file_path = work_dir_path.join("test.file");

fs.create_dir_all(&Path::from_absolute_path(&work_dir_path)?)
.await?;
fs.create_dir_all(&Url::from_file_path(&work_dir_path).map_err(|_| {
Error::InvalidLocalPath {
path: work_dir_path.clone(),
}
})?)
.await?;

assert!(work_dir_path.exists());
assert!(fs
.open_options(
&Path::from_absolute_path(&work_file_path)?,
&Url::from_file_path(&work_file_path).map_err(|_| Error::InvalidLocalPath {
path: work_file_path.clone()
})?,
OpenOptions::default()
)
.await
.is_err());
{
let _ = fs
.open_options(
&Path::from_absolute_path(&work_file_path)?,
&Url::from_file_path(&work_file_path).map_err(|_| Error::InvalidLocalPath {
path: work_file_path.clone(),
})?,
OpenOptions::default().create(true).write(true),
)
.await?;
Expand All @@ -307,14 +312,18 @@ mod tests {
{
let mut file = fs
.open_options(
&Path::from_absolute_path(&work_file_path)?,
&Url::from_file_path(&work_file_path).map_err(|_| Error::InvalidLocalPath {
path: work_file_path.clone(),
})?,
OpenOptions::default().write(true),
)
.await?;
file.write_all("Hello! fusio".as_bytes()).await.0?;
let mut file = fs
.open_options(
&Path::from_absolute_path(&work_file_path)?,
&Url::from_file_path(&work_file_path).map_err(|_| Error::InvalidLocalPath {
path: work_file_path.clone(),
})?,
OpenOptions::default().write(true),
)
.await?;
Expand All @@ -325,7 +334,9 @@ mod tests {
{
let mut file = fs
.open_options(
&Path::from_absolute_path(&work_file_path)?,
&Url::from_file_path(&work_file_path).map_err(|_| Error::InvalidLocalPath {
path: work_file_path.clone(),
})?,
OpenOptions::default().append(true),
)
.await?;
Expand All @@ -336,7 +347,9 @@ mod tests {
{
let mut file = fs
.open_options(
&Path::from_absolute_path(&work_file_path)?,
&Url::from_file_path(&work_file_path).map_err(|_| Error::InvalidLocalPath {
path: work_file_path.clone(),
})?,
OpenOptions::default(),
)
.await?;
Expand All @@ -346,27 +359,39 @@ mod tests {
&file.read_to_end(Vec::new()).await?
)
}
fs.remove(&Path::from_filesystem_path(&work_file_path)?)
.await?;
fs.remove(
&Url::from_file_path(&work_file_path).map_err(|_| Error::InvalidLocalPath {
path: work_file_path.clone(),
})?,
)
.await?;
assert!(!work_file_path.exists());

let mut file_set = HashSet::new();
for i in 0..10 {
let path = work_dir_path.join(i.to_string());
let _ = fs
.open_options(
&Path::from_absolute_path(work_dir_path.join(i.to_string()))?,
&Url::from_file_path(&path)
.map_err(|_| Error::InvalidLocalPath { path: path.clone() })?,
OpenOptions::default().create(true).write(true),
)
.await?;
file_set.insert(i.to_string());
}

let path = Path::from_filesystem_path(&work_dir_path)?;
let mut file_stream = Box::pin(fs.list(&path).await?);
let url = Url::from_file_path(&work_dir_path).map_err(|_| Error::InvalidLocalPath {
path: work_dir_path.clone(),
})?;
let mut file_stream = Box::pin(fs.list(&url).await?);

while let Some(file_meta) = file_stream.next().await {
if let Some(file_name) = file_meta?.path.filename() {
assert!(file_set.remove(file_name));
let path = file_meta?
.url
.to_file_path()
.map_err(|_| Error::InvalidLocalUrl { url: url.clone() })?;
if let Some(file_name) = path.file_name() {
assert!(file_set.remove(file_name.to_str().unwrap()));
}
}
assert!(file_set.is_empty());
Expand Down
Loading

0 comments on commit f5f4c7a

Please sign in to comment.