From f5f4c7ac9cc1c76997e81ed888f788fd89e265c1 Mon Sep 17 00:00:00 2001 From: Kould <kould2333@gmail.com> Date: Sun, 29 Sep 2024 12:12:09 +0800 Subject: [PATCH] refactor: Path -> Url --- fusio/benches/tokio.rs | 10 +- fusio/src/dynamic/fs.rs | 48 +-- fusio/src/error.rs | 14 +- fusio/src/fs/mod.rs | 17 +- fusio/src/lib.rs | 61 +++- fusio/src/local/monoio/fs.rs | 42 ++- fusio/src/local/tokio/fs.rs | 40 ++- fusio/src/local/tokio_uring/fs.rs | 2 +- fusio/src/path/mod.rs | 518 --------------------------- fusio/src/path/parts.rs | 149 -------- fusio/src/remotes/aws/s3.rs | 10 +- fusio/src/remotes/object_store/fs.rs | 23 +- 12 files changed, 162 insertions(+), 772 deletions(-) delete mode 100644 fusio/src/path/mod.rs delete mode 100644 fusio/src/path/parts.rs diff --git a/fusio/benches/tokio.rs b/fusio/benches/tokio.rs index 589d485..3bed982 100644 --- a/fusio/benches/tokio.rs +++ b/fusio/benches/tokio.rs @@ -4,11 +4,11 @@ use criterion::{criterion_group, criterion_main, Criterion}; use fusio::{ fs::{Fs, OpenOptions}, local::TokioFs, - path::Path, IoBuf, IoBufMut, Write, }; use rand::Rng; use tempfile::NamedTempFile; +use url::Url; fn write(c: &mut Criterion) { let runtime = tokio::runtime::Builder::new_multi_thread() @@ -24,11 +24,11 @@ fn write(c: &mut Criterion) { let write_bytes = Arc::new(write_bytes); let temp_file = NamedTempFile::new().unwrap(); - let path = Path::from_filesystem_path(temp_file.path()).unwrap(); + let url = Url::from_file_path(temp_file.path()).unwrap(); let fs = TokioFs; let file = Rc::new(RefCell::new(runtime.block_on(async { - fs.open_options(&path, OpenOptions::default().write(true).append(true)) + fs.open_options(&url, OpenOptions::default().write(true).append(true)) .await .unwrap() }))); @@ -80,12 +80,12 @@ fn read(c: &mut Criterion) { rand::thread_rng().fill(&mut write_bytes); let temp_file = NamedTempFile::new().unwrap(); - let path = Path::from_filesystem_path(temp_file.path()).unwrap(); + let url = Url::from_file_path(temp_file.path()).unwrap(); let fs = TokioFs; let file = Rc::new(RefCell::new(runtime.block_on(async { let mut file = fs - .open_options(&path, OpenOptions::default().write(true).append(true)) + .open_options(&url, OpenOptions::default().write(true).append(true)) .await .unwrap(); let (result, _) = file.write_all(&write_bytes[..]).await; diff --git a/fusio/src/dynamic/fs.rs b/fusio/src/dynamic/fs.rs index c29a7dd..81d99e9 100644 --- a/fusio/src/dynamic/fs.rs +++ b/fusio/src/dynamic/fs.rs @@ -1,12 +1,12 @@ use std::pin::Pin; use futures_core::Stream; +use url::Url; use super::{DynSeek, MaybeSendFuture}; use crate::{ buf::IoBufMut, fs::{FileMeta, Fs, OpenOptions}, - path::Path, DynRead, DynWrite, Error, IoBuf, MaybeSend, MaybeSync, Read, Seek, Write, }; @@ -52,27 +52,27 @@ impl<'write> Write for Box<dyn DynFile + 'write> { } pub trait DynFs: MaybeSend + MaybeSync { - fn open<'s, 'path: 's>( + fn open<'s, 'url: 's>( &'s self, - path: &'path Path, + url: &'url Url, ) -> Pin<Box<dyn MaybeSendFuture<Output = Result<Box<dyn DynFile>, Error>> + 's>> { - self.open_options(path, OpenOptions::default()) + self.open_options(url, OpenOptions::default()) } - fn open_options<'s, 'path: 's>( + fn open_options<'s, 'url: 's>( &'s self, - path: &'path Path, + url: &'url Url, options: OpenOptions, ) -> Pin<Box<dyn MaybeSendFuture<Output = Result<Box<dyn DynFile>, Error>> + 's>>; - fn create_dir_all<'s, 'path: 's>( + fn create_dir_all<'s, 'url: 's>( &'s self, - path: &'path Path, + url: &'url Url, ) -> Pin<Box<dyn MaybeSendFuture<Output = Result<(), Error>> + 's>>; - fn list<'s, 'path: 's>( + fn list<'s, 'url: 's>( &'s self, - path: &'path Path, + url: &'url Url, ) -> Pin< Box< dyn MaybeSendFuture< @@ -84,34 +84,34 @@ pub trait DynFs: MaybeSend + MaybeSync { >, >; - fn remove<'s, 'path: 's>( + fn remove<'s, 'url: 's>( &'s self, - path: &'path Path, + url: &'url Url, ) -> Pin<Box<dyn MaybeSendFuture<Output = Result<(), Error>> + 's>>; } impl<F: Fs> DynFs for F { - fn open_options<'s, 'path: 's>( + fn open_options<'s, 'url: 's>( &'s self, - path: &'path Path, + url: &'url Url, options: OpenOptions, ) -> Pin<Box<dyn MaybeSendFuture<Output = Result<Box<dyn DynFile>, Error>> + 's>> { Box::pin(async move { - let file = F::open_options(self, path, options).await?; + let file = F::open_options(self, url, options).await?; Ok(Box::new(file) as Box<dyn DynFile>) }) } - fn create_dir_all<'s, 'path: 's>( + fn create_dir_all<'s, 'url: 's>( &'s self, - path: &'path Path, + url: &'url Url, ) -> Pin<Box<dyn MaybeSendFuture<Output = Result<(), Error>> + 's>> { - Box::pin(F::create_dir_all(path)) + Box::pin(F::create_dir_all(url)) } - fn list<'s, 'path: 's>( + fn list<'s, 'url: 's>( &'s self, - path: &'path Path, + url: &'url Url, ) -> Pin< Box< dyn MaybeSendFuture< @@ -123,16 +123,16 @@ impl<F: Fs> DynFs for F { >, > { Box::pin(async move { - let stream = F::list(self, path).await?; + let stream = F::list(self, url).await?; Ok(Box::pin(stream) as Pin<Box<dyn Stream<Item = Result<FileMeta, Error>>>>) }) } - fn remove<'s, 'path: 's>( + fn remove<'s, 'url: 's>( &'s self, - path: &'path Path, + url: &'url Url, ) -> Pin<Box<dyn MaybeSendFuture<Output = Result<(), Error>> + 's>> { - Box::pin(F::remove(self, path)) + Box::pin(F::remove(self, url)) } } diff --git a/fusio/src/error.rs b/fusio/src/error.rs index a15ad54..ee7b9a8 100644 --- a/fusio/src/error.rs +++ b/fusio/src/error.rs @@ -1,17 +1,27 @@ -use std::io; +use std::{io, path::PathBuf}; use thiserror::Error; +use url::Url; #[derive(Debug, Error)] #[error(transparent)] #[non_exhaustive] pub enum Error { Io(#[from] io::Error), + #[error("Unable to convert url \"{url}\" to local path")] + InvalidLocalUrl { + url: Url, + }, + #[error("Unable to convert path \"{path}\" to local url")] + InvalidLocalPath { + path: PathBuf, + }, #[cfg(feature = "http")] Http(#[from] http::Error), #[cfg(feature = "object_store")] ObjectStore(#[from] object_store::Error), - Path(#[from] crate::path::Error), + #[cfg(feature = "object_store")] + ObjectStorePath(#[from] object_store::path::Error), #[error("unsupported operation")] Unsupported, #[error(transparent)] diff --git a/fusio/src/fs/mod.rs b/fusio/src/fs/mod.rs index f05ab72..0f581a5 100644 --- a/fusio/src/fs/mod.rs +++ b/fusio/src/fs/mod.rs @@ -4,34 +4,35 @@ use std::future::Future; use futures_core::Stream; pub use options::*; +use url::Url; -use crate::{path::Path, Error, MaybeSend, MaybeSync, Read, Seek, Write}; +use crate::{Error, MaybeSend, MaybeSync, Read, Seek, Write}; #[derive(Debug)] pub struct FileMeta { - pub path: Path, + pub url: Url, pub size: u64, } pub trait Fs: MaybeSend + MaybeSync { type File: Read + Seek + Write + MaybeSend + MaybeSync + 'static; - fn open(&self, path: &Path) -> impl Future<Output = Result<Self::File, Error>> { - self.open_options(path, OpenOptions::default()) + fn open(&self, url: &Url) -> impl Future<Output = Result<Self::File, Error>> { + self.open_options(url, OpenOptions::default()) } fn open_options( &self, - path: &Path, + url: &Url, options: OpenOptions, ) -> impl Future<Output = Result<Self::File, Error>> + MaybeSend; - fn create_dir_all(path: &Path) -> impl Future<Output = Result<(), Error>> + MaybeSend; + fn create_dir_all(url: &Url) -> impl Future<Output = Result<(), Error>> + MaybeSend; fn list( &self, - path: &Path, + url: &Url, ) -> impl Future<Output = Result<impl Stream<Item = Result<FileMeta, Error>>, Error>> + MaybeSend; - fn remove(&self, path: &Path) -> impl Future<Output = Result<(), Error>> + MaybeSend; + fn remove(&self, url: &Url) -> impl Future<Output = Result<(), Error>> + MaybeSend; } diff --git a/fusio/src/lib.rs b/fusio/src/lib.rs index 9c9bbc3..de1e811 100644 --- a/fusio/src/lib.rs +++ b/fusio/src/lib.rs @@ -5,7 +5,6 @@ mod error; #[cfg(feature = "fs")] pub mod fs; pub mod local; -pub mod path; pub mod remotes; use std::{future::Future, io::Cursor}; @@ -278,19 +277,23 @@ mod tests { use futures_util::StreamExt; use tempfile::TempDir; - use crate::{fs::OpenOptions, path::Path, DynFs}; - let tmp_dir = TempDir::new()?; let work_dir_path = tmp_dir.path().join("work"); let work_file_path = work_dir_path.join("test.file"); - fs.create_dir_all(&Path::from_absolute_path(&work_dir_path)?) - .await?; + fs.create_dir_all(&Url::from_file_path(&work_dir_path).map_err(|_| { + Error::InvalidLocalPath { + path: work_dir_path.clone(), + } + })?) + .await?; assert!(work_dir_path.exists()); assert!(fs .open_options( - &Path::from_absolute_path(&work_file_path)?, + &Url::from_file_path(&work_file_path).map_err(|_| Error::InvalidLocalPath { + path: work_file_path.clone() + })?, OpenOptions::default() ) .await @@ -298,7 +301,9 @@ mod tests { { let _ = fs .open_options( - &Path::from_absolute_path(&work_file_path)?, + &Url::from_file_path(&work_file_path).map_err(|_| Error::InvalidLocalPath { + path: work_file_path.clone(), + })?, OpenOptions::default().create(true).write(true), ) .await?; @@ -307,14 +312,18 @@ mod tests { { let mut file = fs .open_options( - &Path::from_absolute_path(&work_file_path)?, + &Url::from_file_path(&work_file_path).map_err(|_| Error::InvalidLocalPath { + path: work_file_path.clone(), + })?, OpenOptions::default().write(true), ) .await?; file.write_all("Hello! fusio".as_bytes()).await.0?; let mut file = fs .open_options( - &Path::from_absolute_path(&work_file_path)?, + &Url::from_file_path(&work_file_path).map_err(|_| Error::InvalidLocalPath { + path: work_file_path.clone(), + })?, OpenOptions::default().write(true), ) .await?; @@ -325,7 +334,9 @@ mod tests { { let mut file = fs .open_options( - &Path::from_absolute_path(&work_file_path)?, + &Url::from_file_path(&work_file_path).map_err(|_| Error::InvalidLocalPath { + path: work_file_path.clone(), + })?, OpenOptions::default().append(true), ) .await?; @@ -336,7 +347,9 @@ mod tests { { let mut file = fs .open_options( - &Path::from_absolute_path(&work_file_path)?, + &Url::from_file_path(&work_file_path).map_err(|_| Error::InvalidLocalPath { + path: work_file_path.clone(), + })?, OpenOptions::default(), ) .await?; @@ -346,27 +359,39 @@ mod tests { &file.read_to_end(Vec::new()).await? ) } - fs.remove(&Path::from_filesystem_path(&work_file_path)?) - .await?; + fs.remove( + &Url::from_file_path(&work_file_path).map_err(|_| Error::InvalidLocalPath { + path: work_file_path.clone(), + })?, + ) + .await?; assert!(!work_file_path.exists()); let mut file_set = HashSet::new(); for i in 0..10 { + let path = work_dir_path.join(i.to_string()); let _ = fs .open_options( - &Path::from_absolute_path(work_dir_path.join(i.to_string()))?, + &Url::from_file_path(&path) + .map_err(|_| Error::InvalidLocalPath { path: path.clone() })?, OpenOptions::default().create(true).write(true), ) .await?; file_set.insert(i.to_string()); } - let path = Path::from_filesystem_path(&work_dir_path)?; - let mut file_stream = Box::pin(fs.list(&path).await?); + let url = Url::from_file_path(&work_dir_path).map_err(|_| Error::InvalidLocalPath { + path: work_dir_path.clone(), + })?; + let mut file_stream = Box::pin(fs.list(&url).await?); while let Some(file_meta) = file_stream.next().await { - if let Some(file_name) = file_meta?.path.filename() { - assert!(file_set.remove(file_name)); + let path = file_meta? + .url + .to_file_path() + .map_err(|_| Error::InvalidLocalUrl { url: url.clone() })?; + if let Some(file_name) = path.file_name() { + assert!(file_set.remove(file_name.to_str().unwrap())); } } assert!(file_set.is_empty()); diff --git a/fusio/src/local/monoio/fs.rs b/fusio/src/local/monoio/fs.rs index fb9d447..28e8acd 100644 --- a/fusio/src/local/monoio/fs.rs +++ b/fusio/src/local/monoio/fs.rs @@ -2,6 +2,7 @@ use std::fs::create_dir_all; use async_stream::stream; use futures_core::Stream; +use url::Url; use super::MonoioFile; use crate::{ @@ -15,8 +16,10 @@ pub struct MonoIoFs; impl Fs for MonoIoFs { type File = MonoioFile; - async fn open_options(&self, path: &Path, options: OpenOptions) -> Result<Self::File, Error> { - let local_path = path_to_local(path)?; + async fn open_options(&self, url: &Url, options: OpenOptions) -> Result<Self::File, Error> { + let path = url + .to_file_path() + .map_err(|_| Error::InvalidLocalUrl { url: url.clone() })?; Ok(MonoioFile::from( monoio::fs::OpenOptions::new() @@ -25,35 +28,44 @@ impl Fs for MonoIoFs { .create(options.create) .append(options.write == Some(WriteMode::Append)) .truncate(options.write == Some(WriteMode::Overwrite)) - .open(&local_path) + .open(&path) .await?, )) } - async fn create_dir_all(path: &Path) -> Result<(), Error> { - let path = path_to_local(path)?; + async fn create_dir_all(url: &Url) -> Result<(), Error> { + let path = url + .to_file_path() + .map_err(|_| Error::InvalidLocalUrl { url: url.clone() })?; + create_dir_all(path)?; Ok(()) } - async fn list( - &self, - path: &Path, - ) -> Result<impl Stream<Item = Result<FileMeta, Error>>, Error> { - let path = path_to_local(path)?; - let dir = path.read_dir()?; + async fn list(&self, url: &Url) -> Result<impl Stream<Item = Result<FileMeta, Error>>, Error> { + let path = url + .to_file_path() + .map_err(|_| Error::InvalidLocalUrl { url: url.clone() })?; Ok(stream! { - for entry in dir { + for entry in path.read_dir()? { let entry = entry?; - yield Ok(FileMeta { path: Path::from_filesystem_path(entry.path())?, size: entry.metadata()?.len() }); + let path = entry.path(); + + let url = Url::from_file_path(&path) + .map_err(|_| Error::InvalidLocalPath { path })?; + + + yield Ok(FileMeta { url, size: entry.metadata()?.len() }); } }) } - async fn remove(&self, path: &Path) -> Result<(), Error> { - let path = path_to_local(path)?; + async fn remove(&self, url: &Url) -> Result<(), Error> { + let path = url + .to_file_path() + .map_err(|_| Error::InvalidLocalUrl { url: url.clone() })?; Ok(std::fs::remove_file(path)?) } diff --git a/fusio/src/local/tokio/fs.rs b/fusio/src/local/tokio/fs.rs index 7783b2b..fd812c0 100644 --- a/fusio/src/local/tokio/fs.rs +++ b/fusio/src/local/tokio/fs.rs @@ -6,10 +6,10 @@ use tokio::{ fs::{create_dir_all, remove_file, File}, task::spawn_blocking, }; +use url::Url; use crate::{ fs::{FileMeta, Fs, OpenOptions, WriteMode}, - path::{path_to_local, Path}, Error, }; @@ -18,8 +18,10 @@ pub struct TokioFs; impl Fs for TokioFs { type File = File; - async fn open_options(&self, path: &Path, options: OpenOptions) -> Result<Self::File, Error> { - let local_path = path_to_local(path)?; + async fn open_options(&self, url: &Url, options: OpenOptions) -> Result<Self::File, Error> { + let path = url + .to_file_path() + .map_err(|_| Error::InvalidLocalUrl { url: url.clone() })?; Ok(tokio::fs::OpenOptions::new() .read(options.read) @@ -27,29 +29,36 @@ impl Fs for TokioFs { .create(options.create) .append(options.write == Some(WriteMode::Append)) .truncate(options.write == Some(WriteMode::Overwrite)) - .open(&local_path) + .open(path) .await?) } - async fn create_dir_all(path: &Path) -> Result<(), Error> { - let path = path_to_local(path)?; + async fn create_dir_all(url: &Url) -> Result<(), Error> { + let path = url + .to_file_path() + .map_err(|_| Error::InvalidLocalUrl { url: url.clone() })?; + create_dir_all(path).await?; Ok(()) } - async fn list( - &self, - path: &Path, - ) -> Result<impl Stream<Item = Result<FileMeta, Error>>, Error> { - let path = path_to_local(path)?; + async fn list(&self, url: &Url) -> Result<impl Stream<Item = Result<FileMeta, Error>>, Error> { + let path = url + .to_file_path() + .map_err(|_| Error::InvalidLocalUrl { url: url.clone() })?; spawn_blocking(move || { let entries = path.read_dir()?; Ok::<_, Error>(stream! { for entry in entries { let entry = entry?; - yield Ok(FileMeta { path: Path::from_filesystem_path(entry.path())?, size: entry.metadata()?.len() }); + let path = entry.path(); + + let url = Url::from_file_path(&path) + .map_err(|_| Error::InvalidLocalPath { path })?; + + yield Ok(FileMeta { url, size: entry.metadata()?.len() }); } }) }) @@ -57,10 +66,13 @@ impl Fs for TokioFs { .map_err(io::Error::from)? } - async fn remove(&self, path: &Path) -> Result<(), Error> { - let path = path_to_local(path)?; + async fn remove(&self, url: &Url) -> Result<(), Error> { + let path = url + .to_file_path() + .map_err(|_| Error::InvalidLocalUrl { url: url.clone() })?; remove_file(&path).await?; + Ok(()) } } diff --git a/fusio/src/local/tokio_uring/fs.rs b/fusio/src/local/tokio_uring/fs.rs index 4121548..7478a81 100644 --- a/fusio/src/local/tokio_uring/fs.rs +++ b/fusio/src/local/tokio_uring/fs.rs @@ -22,7 +22,7 @@ impl Fs for TokioUringFs { let dir = path.as_ref().read_dir()?; Ok(stream! { for entry in dir { - yield Ok(crate::fs::FileMeta { path: entry?.path() }); + yield Ok(crate::fs::FileMeta { url: entry?.path() }); } }) } diff --git a/fusio/src/path/mod.rs b/fusio/src/path/mod.rs deleted file mode 100644 index bd60280..0000000 --- a/fusio/src/path/mod.rs +++ /dev/null @@ -1,518 +0,0 @@ -use std::{fmt::Formatter, path::PathBuf}; - -use itertools::Itertools; -use percent_encoding::percent_decode; -use thiserror::Error; -use url::Url; - -/// The delimiter to separate object namespaces, creating a directory structure. -pub const DELIMITER: &str = "/"; - -/// The path delimiter as a single byte -pub const DELIMITER_BYTE: u8 = DELIMITER.as_bytes()[0]; - -mod parts; - -pub use parts::{InvalidPart, PathPart}; - -#[derive(Debug, Error)] -#[error(transparent)] -pub enum Error { - #[error("Path \"{path}\" contained empty path segment")] - EmptySegment { path: String }, - #[error("Error parsing Path \"{path}\": {source}")] - BadSegment { path: String, source: InvalidPart }, - #[error("Failed to canonicalize path \"{path}\": {source}")] - Canonicalize { - path: std::path::PathBuf, - source: std::io::Error, - }, - #[error("Unable to convert path \"{path}\" to URL")] - InvalidPath { path: PathBuf }, - #[error("Unable to convert url \"{url}\" to Path")] - InvalidUrl { url: Url }, - #[error("Path \"{path}\" contained non-unicode characters: {source}")] - NonUnicode { - path: String, - source: std::str::Utf8Error, - }, - #[error("Path {path} does not start with prefix {prefix}")] - PrefixMismatch { path: String, prefix: String }, -} - -#[derive(Debug, Clone, Default, PartialEq, Eq, Hash, Ord, PartialOrd)] -pub struct Path { - raw: String, -} - -impl Path { - pub fn parse(path: impl AsRef<str>) -> Result<Self, Error> { - let path = path.as_ref(); - - let stripped = path.strip_prefix(DELIMITER).unwrap_or(path); - if stripped.is_empty() { - return Ok(Default::default()); - } - - let stripped = stripped.strip_suffix(DELIMITER).unwrap_or(stripped); - - for segment in stripped.split(DELIMITER) { - if segment.is_empty() { - return Err(Error::EmptySegment { - path: path.to_string(), - }); - } - PathPart::parse(segment).map_err(|err| Error::BadSegment { - path: path.to_string(), - source: err, - })?; - } - - Ok(Self { - raw: stripped.to_string(), - }) - } - - pub fn from_filesystem_path(path: impl AsRef<std::path::Path>) -> Result<Self, Error> { - let absolute = std::fs::canonicalize(&path).map_err(|err| Error::Canonicalize { - path: path.as_ref().to_path_buf(), - source: err, - })?; - - Self::from_absolute_path(absolute) - } - - pub fn from_absolute_path(path: impl AsRef<std::path::Path>) -> Result<Self, Error> { - Self::from_absolute_path_with_base(path, None) - } - - pub(crate) fn from_absolute_path_with_base( - path: impl AsRef<std::path::Path>, - base: Option<&url::Url>, - ) -> Result<Self, Error> { - let url = absolute_path_to_url(path)?; - let path = match base { - Some(prefix) => { - url.path() - .strip_prefix(prefix.path()) - .ok_or_else(|| Error::PrefixMismatch { - path: url.path().to_string(), - prefix: prefix.to_string(), - })? - } - None => url.path(), - }; - - // Reverse any percent encoding performed by conversion to URL - Self::from_url_path(path) - } - - pub fn from_url_path(path: impl AsRef<str>) -> Result<Self, Error> { - let path = path.as_ref(); - let decoded = percent_decode(path.as_bytes()) - .decode_utf8() - .map_err(|err| Error::NonUnicode { - path: path.to_string(), - source: err, - })?; - - Self::parse(decoded) - } - - pub fn parts(&self) -> impl Iterator<Item = PathPart<'_>> { - self.raw - .split_terminator(DELIMITER) - .map(|s| PathPart { raw: s.into() }) - } - - pub fn filename(&self) -> Option<&str> { - match self.raw.is_empty() { - true => None, - false => self.raw.rsplit(DELIMITER).next(), - } - } - - pub fn extension(&self) -> Option<&str> { - self.filename() - .and_then(|f| f.rsplit_once('.')) - .and_then(|(_, extension)| { - if extension.is_empty() { - None - } else { - Some(extension) - } - }) - } - - pub fn prefix_match(&self, prefix: &Self) -> Option<impl Iterator<Item = PathPart<'_>> + '_> { - let mut stripped = self.raw.strip_prefix(&prefix.raw)?; - if !stripped.is_empty() && !prefix.raw.is_empty() { - stripped = stripped.strip_prefix(DELIMITER)?; - } - let iter = stripped - .split_terminator(DELIMITER) - .map(|x| PathPart { raw: x.into() }); - Some(iter) - } - - pub fn prefix_matches(&self, prefix: &Self) -> bool { - self.prefix_match(prefix).is_some() - } - - pub fn child<'a>(&self, child: impl Into<PathPart<'a>>) -> Self { - let raw = match self.raw.is_empty() { - true => format!("{}", child.into().raw), - false => format!("{}{}{}", self.raw, DELIMITER, child.into().raw), - }; - - Self { raw } - } -} - -#[cfg(feature = "object_store")] -impl From<Path> for object_store::path::Path { - fn from(value: Path) -> Self { - object_store::path::Path::from(value.as_ref()) - } -} - -#[cfg(feature = "object_store")] -impl From<object_store::path::Path> for Path { - fn from(value: object_store::path::Path) -> Self { - Self::from(value.as_ref()) - } -} - -impl AsRef<str> for Path { - fn as_ref(&self) -> &str { - &self.raw - } -} - -impl From<&str> for Path { - fn from(path: &str) -> Self { - Self::from_iter(path.split(DELIMITER)) - } -} - -impl From<String> for Path { - fn from(path: String) -> Self { - Self::from_iter(path.split(DELIMITER)) - } -} - -impl From<Path> for String { - fn from(path: Path) -> Self { - path.raw - } -} - -impl std::fmt::Display for Path { - fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { - self.raw.fmt(f) - } -} - -impl<'a, I> FromIterator<I> for Path -where - I: Into<PathPart<'a>>, -{ - fn from_iter<T: IntoIterator<Item = I>>(iter: T) -> Self { - let raw = T::into_iter(iter) - .map(|s| s.into()) - .filter(|s| !s.raw.is_empty()) - .map(|s| s.raw) - .join(DELIMITER); - - Self { raw } - } -} - -pub(crate) fn absolute_path_to_url(path: impl AsRef<std::path::Path>) -> Result<Url, Error> { - Url::from_file_path(&path).map_err(|_| Error::InvalidPath { - path: path.as_ref().into(), - }) -} - -pub fn path_to_local(location: &Path) -> Result<PathBuf, Error> { - let mut url = Url::parse("file:///").unwrap(); - url.path_segments_mut() - .expect("url path") - // technically not necessary as Path ignores empty segments - // but avoids creating paths with "//" which look odd in error messages. - .pop_if_empty() - .extend(location.parts()); - - let path = url.to_file_path().map_err(|_| Error::InvalidUrl { url })?; - - #[cfg(target_os = "windows")] - let path = { - let path = path.to_string_lossy(); - - // Assume the first char is the drive letter and the next is a colon. - let mut out = String::new(); - let drive = &path[..2]; // The drive letter and colon (e.g., "C:") - let filepath = &path[2..].replace(':', "%3A"); // Replace subsequent colons - out.push_str(drive); - out.push_str(filepath); - PathBuf::from(out) - }; - - Ok(path) -} - -#[cfg(test)] -mod tests { - use tempfile::NamedTempFile; - - use super::*; - - #[test] - fn cloud_prefix_with_trailing_delimiter() { - let prefix = Path::from_iter(["test"]); - assert_eq!(prefix.as_ref(), "test"); - } - - #[test] - fn push_encodes() { - let location = Path::from_iter(["foo/bar", "baz%2Ftest"]); - assert_eq!(location.as_ref(), "foo%2Fbar/baz%252Ftest"); - } - - #[test] - fn test_parse() { - assert_eq!(Path::parse("/").unwrap().as_ref(), ""); - assert_eq!(Path::parse("").unwrap().as_ref(), ""); - - let err = Path::parse("//").unwrap_err(); - assert!(matches!(err, Error::EmptySegment { .. })); - - assert_eq!(Path::parse("/foo/bar/").unwrap().as_ref(), "foo/bar"); - assert_eq!(Path::parse("foo/bar/").unwrap().as_ref(), "foo/bar"); - assert_eq!(Path::parse("foo/bar").unwrap().as_ref(), "foo/bar"); - - let err = Path::parse("foo///bar").unwrap_err(); - assert!(matches!(err, Error::EmptySegment { .. })); - } - - #[test] - fn convert_raw_before_partial_eq() { - // dir and file_name - let cloud = Path::from("test_dir/test_file.json"); - let built = Path::from_iter(["test_dir", "test_file.json"]); - - assert_eq!(built, cloud); - - // dir and file_name w/o dot - let cloud = Path::from("test_dir/test_file"); - let built = Path::from_iter(["test_dir", "test_file"]); - - assert_eq!(built, cloud); - - // dir, no file - let cloud = Path::from("test_dir/"); - let built = Path::from_iter(["test_dir"]); - assert_eq!(built, cloud); - - // file_name, no dir - let cloud = Path::from("test_file.json"); - let built = Path::from_iter(["test_file.json"]); - assert_eq!(built, cloud); - - // empty - let cloud = Path::from(""); - let built = Path::from_iter(["", ""]); - - assert_eq!(built, cloud); - } - - #[test] - fn parts_after_prefix_behavior() { - let existing_path = Path::from("apple/bear/cow/dog/egg.json"); - - // Prefix with one directory - let prefix = Path::from("apple"); - let expected_parts: Vec<PathPart<'_>> = vec!["bear", "cow", "dog", "egg.json"] - .into_iter() - .map(Into::into) - .collect(); - let parts: Vec<_> = existing_path.prefix_match(&prefix).unwrap().collect(); - assert_eq!(parts, expected_parts); - - // Prefix with two directories - let prefix = Path::from("apple/bear"); - let expected_parts: Vec<PathPart<'_>> = vec!["cow", "dog", "egg.json"] - .into_iter() - .map(Into::into) - .collect(); - let parts: Vec<_> = existing_path.prefix_match(&prefix).unwrap().collect(); - assert_eq!(parts, expected_parts); - - // Not a prefix - let prefix = Path::from("cow"); - assert!(existing_path.prefix_match(&prefix).is_none()); - - // Prefix with a partial directory - let prefix = Path::from("ap"); - assert!(existing_path.prefix_match(&prefix).is_none()); - - // Prefix matches but there aren't any parts after it - let existing = Path::from("apple/bear/cow/dog"); - - assert_eq!(existing.prefix_match(&existing).unwrap().count(), 0); - assert_eq!(Path::default().parts().count(), 0); - } - - #[test] - fn prefix_matches() { - let haystack = Path::from_iter(["foo/bar", "baz%2Ftest", "something"]); - // self starts with self - assert!( - haystack.prefix_matches(&haystack), - "{haystack:?} should have started with {haystack:?}" - ); - - // a longer prefix doesn't match - let needle = haystack.child("longer now"); - assert!( - !haystack.prefix_matches(&needle), - "{haystack:?} shouldn't have started with {needle:?}" - ); - - // one dir prefix matches - let needle = Path::from_iter(["foo/bar"]); - assert!( - haystack.prefix_matches(&needle), - "{haystack:?} should have started with {needle:?}" - ); - - // two dir prefix matches - let needle = needle.child("baz%2Ftest"); - assert!( - haystack.prefix_matches(&needle), - "{haystack:?} should have started with {needle:?}" - ); - - // partial dir prefix doesn't match - let needle = Path::from_iter(["f"]); - assert!( - !haystack.prefix_matches(&needle), - "{haystack:?} should not have started with {needle:?}" - ); - - // one dir and one partial dir doesn't match - let needle = Path::from_iter(["foo/bar", "baz"]); - assert!( - !haystack.prefix_matches(&needle), - "{haystack:?} should not have started with {needle:?}" - ); - - // empty prefix matches - let needle = Path::from(""); - assert!( - haystack.prefix_matches(&needle), - "{haystack:?} should have started with {needle:?}" - ); - } - - #[test] - fn prefix_matches_with_file_name() { - let haystack = Path::from_iter(["foo/bar", "baz%2Ftest", "something", "foo.segment"]); - - // All directories match and file name is a prefix - let needle = Path::from_iter(["foo/bar", "baz%2Ftest", "something", "foo"]); - - assert!( - !haystack.prefix_matches(&needle), - "{haystack:?} should not have started with {needle:?}" - ); - - // All directories match but file name is not a prefix - let needle = Path::from_iter(["foo/bar", "baz%2Ftest", "something", "e"]); - - assert!( - !haystack.prefix_matches(&needle), - "{haystack:?} should not have started with {needle:?}" - ); - - // Not all directories match; file name is a prefix of the next directory; this - // does not match - let needle = Path::from_iter(["foo/bar", "baz%2Ftest", "s"]); - - assert!( - !haystack.prefix_matches(&needle), - "{haystack:?} should not have started with {needle:?}" - ); - - // Not all directories match; file name is NOT a prefix of the next directory; - // no match - let needle = Path::from_iter(["foo/bar", "baz%2Ftest", "p"]); - - assert!( - !haystack.prefix_matches(&needle), - "{haystack:?} should not have started with {needle:?}" - ); - } - - #[test] - fn path_containing_spaces() { - let a = Path::from_iter(["foo bar", "baz"]); - let b = Path::from("foo bar/baz"); - let c = Path::parse("foo bar/baz").unwrap(); - - assert_eq!(a.raw, "foo bar/baz"); - assert_eq!(a.raw, b.raw); - assert_eq!(b.raw, c.raw); - } - - #[test] - fn from_url_path() { - let a = Path::from_url_path("foo%20bar").unwrap(); - let b = Path::from_url_path("foo/%2E%2E/bar").unwrap_err(); - let c = Path::from_url_path("foo%2F%252E%252E%2Fbar").unwrap(); - let d = Path::from_url_path("foo/%252E%252E/bar").unwrap(); - let e = Path::from_url_path("%48%45%4C%4C%4F").unwrap(); - let f = Path::from_url_path("foo/%FF/as").unwrap_err(); - - assert_eq!(a.raw, "foo bar"); - assert!(matches!(b, Error::BadSegment { .. })); - assert_eq!(c.raw, "foo/%2E%2E/bar"); - assert_eq!(d.raw, "foo/%2E%2E/bar"); - assert_eq!(e.raw, "HELLO"); - assert!(matches!(f, Error::NonUnicode { .. })); - } - - #[test] - fn filename_from_path() { - let a = Path::from("foo/bar"); - let b = Path::from("foo/bar.baz"); - let c = Path::from("foo.bar/baz"); - - assert_eq!(a.filename(), Some("bar")); - assert_eq!(b.filename(), Some("bar.baz")); - assert_eq!(c.filename(), Some("baz")); - } - - #[test] - fn file_extension() { - let a = Path::from("foo/bar"); - let b = Path::from("foo/bar.baz"); - let c = Path::from("foo.bar/baz"); - let d = Path::from("foo.bar/baz.qux"); - - assert_eq!(a.extension(), None); - assert_eq!(b.extension(), Some("baz")); - assert_eq!(c.extension(), None); - assert_eq!(d.extension(), Some("qux")); - } - - #[test] - fn test_path_to_local() { - let temp_file = NamedTempFile::new().unwrap(); - - let this_path = Path::from_filesystem_path(temp_file.path()).unwrap(); - let std_path = path_to_local(&this_path).unwrap(); - - assert_eq!(std_path, temp_file.path()); - } -} diff --git a/fusio/src/path/parts.rs b/fusio/src/path/parts.rs deleted file mode 100644 index c705b27..0000000 --- a/fusio/src/path/parts.rs +++ /dev/null @@ -1,149 +0,0 @@ -use std::borrow::Cow; - -use percent_encoding::{percent_encode, AsciiSet, CONTROLS}; -use thiserror::Error; - -use crate::path::DELIMITER_BYTE; - -#[derive(Debug, Error)] -#[error( - "Encountered illegal character sequence \"{segment}\" whilst parsing path segment \ - \"{illegal}\"" -)] -#[allow(missing_copy_implementations)] -pub struct InvalidPart { - segment: String, - illegal: String, -} - -#[derive(Clone, PartialEq, Eq, PartialOrd, Ord, Debug, Default, Hash)] -pub struct PathPart<'a> { - pub(super) raw: Cow<'a, str>, -} - -impl<'a> PathPart<'a> { - pub fn parse(segment: &'a str) -> Result<Self, InvalidPart> { - if segment == "." || segment == ".." { - return Err(InvalidPart { - segment: segment.to_string(), - illegal: segment.to_string(), - }); - } - - for c in segment.chars() { - if c.is_ascii_control() || c == '/' { - return Err(InvalidPart { - segment: segment.to_string(), - // This is correct as only single byte characters up to this point - illegal: c.to_string(), - }); - } - } - - Ok(Self { - raw: segment.into(), - }) - } -} - -const INVALID: &AsciiSet = &CONTROLS - // The delimiter we are reserving for internal hierarchy - .add(DELIMITER_BYTE) - // Characters AWS recommends avoiding for object keys - // https://docs.aws.amazon.com/AmazonS3/latest/dev/UsingMetadata.html - .add(b'\\') - .add(b'{') - .add(b'^') - .add(b'}') - .add(b'%') - .add(b'`') - .add(b']') - .add(b'"') // " <-- my editor is confused about double quotes within single quotes - .add(b'>') - .add(b'[') - .add(b'~') - .add(b'<') - .add(b'#') - .add(b'|') - // Characters Google Cloud Storage recommends avoiding for object names - // https://cloud.google.com/storage/docs/naming-objects - .add(b'\r') - .add(b'\n') - .add(b'*') - .add(b'?'); - -impl<'a> From<&'a [u8]> for PathPart<'a> { - fn from(v: &'a [u8]) -> Self { - let inner = match v { - // We don't want to encode `.` generally, but we do want to disallow parts of paths - // to be equal to `.` or `..` to prevent file system traversal shenanigans. - b"." => "%2E".into(), - b".." => "%2E%2E".into(), - other => percent_encode(other, INVALID).into(), - }; - Self { raw: inner } - } -} - -impl<'a> From<&'a str> for PathPart<'a> { - fn from(v: &'a str) -> Self { - Self::from(v.as_bytes()) - } -} - -impl From<String> for PathPart<'static> { - fn from(s: String) -> Self { - Self { - raw: Cow::Owned(PathPart::from(s.as_str()).raw.into_owned()), - } - } -} - -impl<'a> AsRef<str> for PathPart<'a> { - fn as_ref(&self) -> &str { - self.raw.as_ref() - } -} - -#[cfg(test)] -mod tests { - use super::*; - - #[test] - fn path_part_delimiter_gets_encoded() { - let part: PathPart<'_> = "foo/bar".into(); - assert_eq!(part.raw, "foo%2Fbar"); - } - - #[test] - fn path_part_given_already_encoded_string() { - let part: PathPart<'_> = "foo%2Fbar".into(); - assert_eq!(part.raw, "foo%252Fbar"); - } - - #[test] - fn path_part_cant_be_one_dot() { - let part: PathPart<'_> = ".".into(); - assert_eq!(part.raw, "%2E"); - } - - #[test] - fn path_part_cant_be_two_dots() { - let part: PathPart<'_> = "..".into(); - assert_eq!(part.raw, "%2E%2E"); - } - - #[test] - fn path_part_parse() { - PathPart::parse("foo").unwrap(); - PathPart::parse("foo/bar").unwrap_err(); - - // Test percent-encoded path - PathPart::parse("foo%2Fbar").unwrap(); - PathPart::parse("L%3ABC.parquet").unwrap(); - - // Test path containing bad escape sequence - PathPart::parse("%Z").unwrap(); - PathPart::parse("%%").unwrap(); - } -} diff --git a/fusio/src/remotes/aws/s3.rs b/fusio/src/remotes/aws/s3.rs index a30a373..f935c21 100644 --- a/fusio/src/remotes/aws/s3.rs +++ b/fusio/src/remotes/aws/s3.rs @@ -8,11 +8,11 @@ use http::{ }; use http_body_util::{BodyExt, Empty, Full}; use percent_encoding::utf8_percent_encode; +use url::Url; use super::{options::S3Options, STRICT_PATH_ENCODE_SET}; use crate::{ buf::IoBufMut, - path::Path, remotes::{ aws::sign::Sign, http::{DynHttpClient, HttpClient as _}, @@ -22,17 +22,17 @@ use crate::{ pub struct S3File { options: Arc<S3Options>, - path: Path, + url: Url, pos: u64, client: Arc<dyn DynHttpClient>, } impl S3File { - pub(crate) fn new(options: Arc<S3Options>, path: Path, client: Arc<dyn DynHttpClient>) -> Self { + pub(crate) fn new(options: Arc<S3Options>, url: Url, client: Arc<dyn DynHttpClient>) -> Self { Self { options, - path, + url, pos: 0, client, } @@ -42,7 +42,7 @@ impl S3File { let url = format!( "{}/{}", self.options.endpoint, - utf8_percent_encode(self.path.as_ref(), &STRICT_PATH_ENCODE_SET) + utf8_percent_encode(self.url.path(), &STRICT_PATH_ENCODE_SET) ); Request::builder().method(method).uri(url) diff --git a/fusio/src/remotes/object_store/fs.rs b/fusio/src/remotes/object_store/fs.rs index db230ba..12cc103 100644 --- a/fusio/src/remotes/object_store/fs.rs +++ b/fusio/src/remotes/object_store/fs.rs @@ -3,11 +3,11 @@ use std::sync::Arc; use async_stream::stream; use futures_core::Stream; use futures_util::stream::StreamExt; -use object_store::{aws::AmazonS3, ObjectStore}; +use object_store::{aws::AmazonS3, path::Path, ObjectStore}; +use url::Url; use crate::{ fs::{FileMeta, Fs, OpenOptions, WriteMode}, - path::Path, remotes::object_store::S3File, Error, }; @@ -19,37 +19,34 @@ pub struct S3Store { impl Fs for S3Store { type File = S3File; - async fn open_options(&self, path: &Path, options: OpenOptions) -> Result<Self::File, Error> { + async fn open_options(&self, url: &Url, options: OpenOptions) -> Result<Self::File, Error> { if let Some(WriteMode::Append) = options.write { return Err(Error::Unsupported); } Ok(S3File { inner: self.inner.clone(), - path: path.clone().into(), + path: Path::from_url_path(url)?, pos: 0, }) } - async fn create_dir_all(_: &Path) -> Result<(), Error> { + async fn create_dir_all(_: &Url) -> Result<(), Error> { Ok(()) } - async fn list( - &self, - path: &Path, - ) -> Result<impl Stream<Item = Result<FileMeta, Error>>, Error> { - let path = path.clone().into(); + async fn list(&self, url: &Url) -> Result<impl Stream<Item = Result<FileMeta, Error>>, Error> { + let path = Path::from_url_path(url)?; let mut stream = self.inner.list(Some(&path)); Ok(stream! { while let Some(meta) = stream.next().await.transpose()? { - yield Ok(FileMeta { path: meta.location.into(), size: meta.size as u64 }); + yield Ok(FileMeta { url: meta.location.into(), size: meta.size as u64 }); } }) } - async fn remove(&self, path: &Path) -> Result<(), Error> { - let path = path.clone().into(); + async fn remove(&self, url: &Url) -> Result<(), Error> { + let path = Path::from_url_path(url)?; self.inner.delete(&path).await?; Ok(())