diff --git a/fusio-object-store/src/fs.rs b/fusio-object-store/src/fs.rs index c53870f..c54481d 100644 --- a/fusio-object-store/src/fs.rs +++ b/fusio-object-store/src/fs.rs @@ -1,10 +1,10 @@ -use std::sync::Arc; +use std::{future::Future, sync::Arc}; use async_stream::stream; use fusio::{ - fs::{FileMeta, Fs, OpenOptions}, + fs::{FileMeta, FileSystemTag, Fs, OpenOptions}, path::Path, - Error, + Error, MaybeSend, }; use futures_core::Stream; use futures_util::stream::StreamExt; @@ -27,6 +27,10 @@ impl From for S3Store { impl Fs for S3Store { type File = S3File; + fn file_system(&self) -> FileSystemTag { + FileSystemTag::S3 + } + async fn open_options(&self, path: &Path, options: OpenOptions) -> Result { if !options.truncate { return Err(Error::Unsupported { @@ -64,4 +68,22 @@ impl Fs for S3Store { Ok(()) } + + fn copy( + &self, + from: &Path, + to_fs: &F, + to: &Path, + ) -> impl Future> + MaybeSend { + todo!() + } + + fn link( + &self, + from: &Path, + to_fs: &F, + to: &Path, + ) -> impl Future> + MaybeSend { + todo!() + } } diff --git a/fusio-opendal/src/fs.rs b/fusio-opendal/src/fs.rs index c03af9e..b696630 100644 --- a/fusio-opendal/src/fs.rs +++ b/fusio-opendal/src/fs.rs @@ -1,7 +1,9 @@ +use std::future::Future; + use fusio::{ - fs::{FileMeta, Fs, OpenOptions}, + fs::{FileMeta, FileSystemTag, Fs, OpenOptions}, path::Path, - Error, + Error, MaybeSend, }; use futures_core::Stream; use futures_util::TryStreamExt; @@ -25,6 +27,10 @@ impl From for OpendalFs { impl Fs for OpendalFs { type File = OpendalFile; + fn file_system(&self) -> FileSystemTag { + todo!() + } + async fn open_options(&self, path: &Path, options: OpenOptions) -> Result { OpendalFile::open(self.op.clone(), path.to_string(), options).await } @@ -57,4 +63,22 @@ impl Fs for OpendalFs { .await .map_err(parse_opendal_error) } + + fn copy( + &self, + from: &Path, + to_fs: &F, + to: &Path, + ) -> impl Future> + MaybeSend { + todo!() + } + + fn link( + &self, + from: &Path, + to_fs: &F, + to: &Path, + ) -> impl Future> + MaybeSend { + todo!() + } } diff --git a/fusio/src/fs/mod.rs b/fusio/src/fs/mod.rs index 8105bab..2336c46 100644 --- a/fusio/src/fs/mod.rs +++ b/fusio/src/fs/mod.rs @@ -16,11 +16,21 @@ pub struct FileMeta { pub size: u64, } +#[derive(Debug, Copy, Clone, Eq, PartialEq)] +pub enum FileSystemTag { + Local, + OPFS, + // TODO: Remote needs to check whether endpoint and other remote fs are consistent + S3, +} + pub trait Fs: MaybeSend + MaybeSync { //! This trait is used to abstract file system operations across different file systems. type File: Read + Write + MaybeSend + 'static; + fn file_system(&self) -> FileSystemTag; + fn open(&self, path: &Path) -> impl Future> { self.open_options(path, OpenOptions::default()) } @@ -39,4 +49,18 @@ pub trait Fs: MaybeSend + MaybeSync { ) -> impl Future>, Error>> + MaybeSend; fn remove(&self, path: &Path) -> impl Future> + MaybeSend; + + fn copy( + &self, + from: &Path, + to_fs: &F, + to: &Path, + ) -> impl Future> + MaybeSend; + + fn link( + &self, + from: &Path, + to_fs: &F, + to: &Path, + ) -> impl Future> + MaybeSend; } diff --git a/fusio/src/impls/disk/monoio/fs.rs b/fusio/src/impls/disk/monoio/fs.rs index c18c587..c667eb4 100644 --- a/fusio/src/impls/disk/monoio/fs.rs +++ b/fusio/src/impls/disk/monoio/fs.rs @@ -1,11 +1,11 @@ -use std::fs::create_dir_all; +use std::{fs, fs::create_dir_all}; use async_stream::stream; use futures_core::Stream; use super::MonoioFile; use crate::{ - fs::{FileMeta, Fs, OpenOptions}, + fs::{FileMeta, FileSystemTag, Fs, OpenOptions}, path::{path_to_local, Path}, Error, }; @@ -15,6 +15,10 @@ pub struct MonoIoFs; impl Fs for MonoIoFs { type File = MonoioFile; + fn file_system(&self) -> FileSystemTag { + FileSystemTag::Local + } + async fn open_options(&self, path: &Path, options: OpenOptions) -> Result { let local_path = path_to_local(path)?; @@ -54,6 +58,33 @@ impl Fs for MonoIoFs { async fn remove(&self, path: &Path) -> Result<(), Error> { let path = path_to_local(path)?; - Ok(std::fs::remove_file(path)?) + Ok(fs::remove_file(path)?) + } + + async fn copy(&self, from: &Path, to_fs: &F, to: &Path) -> Result<(), Error> { + if self.file_system() == to_fs.file_system() { + let from = path_to_local(from)?; + let to = path_to_local(to)?; + + fs::copy(&from, &to).await?; + } else { + todo!() + } + + Ok(()) + } + + async fn link(&self, from: &Path, to_fs: &F, to: &Path) -> Result<(), Error> { + if self.file_system() != to_fs.file_system() { + return Err(Error::Unsupported { + message: "file system is inconsistent".to_string(), + }); + } + let from = path_to_local(from)?; + let to = path_to_local(to)?; + + fs::hard_link(&from, &to).await?; + + Ok(()) } } diff --git a/fusio/src/impls/disk/opfs/fs.rs b/fusio/src/impls/disk/opfs/fs.rs index fa3901d..1390f8c 100644 --- a/fusio/src/impls/disk/opfs/fs.rs +++ b/fusio/src/impls/disk/opfs/fs.rs @@ -1,3 +1,5 @@ +use std::future::Future; + use async_stream::stream; use futures_core::Stream; use futures_util::StreamExt; @@ -15,9 +17,9 @@ use super::OPFSFile; use crate::{ disk::opfs::{promise, storage}, error::wasm_err, - fs::{FileMeta, Fs, OpenOptions}, + fs::{FileMeta, FileSystemTag, Fs, OpenOptions}, path::Path, - Error, + Error, MaybeSend, }; pub struct OPFS; @@ -25,6 +27,10 @@ pub struct OPFS; impl Fs for OPFS { type File = OPFSFile; + fn file_system(&self) -> FileSystemTag { + FileSystemTag::Local + } + async fn open_options(&self, path: &Path, options: OpenOptions) -> Result { let segments: Vec<&str> = path.as_ref().trim_matches('/').split("/").collect(); @@ -151,4 +157,26 @@ impl OPFS { } Ok(parent) } + + fn copy( + &self, + from: &Path, + to_fs: &F, + to: &Path, + ) -> impl Future> + MaybeSend { + Err(Error::Unsupported { + message: "opfs does not support copy file".to_string(), + }) + } + + fn link( + &self, + from: &Path, + to_fs: &F, + to: &Path, + ) -> impl Future> + MaybeSend { + Err(Error::Unsupported { + message: "opfs does not support link file".to_string(), + }) + } } diff --git a/fusio/src/impls/disk/tokio/fs.rs b/fusio/src/impls/disk/tokio/fs.rs index 8fc17e9..b5a2e60 100644 --- a/fusio/src/impls/disk/tokio/fs.rs +++ b/fusio/src/impls/disk/tokio/fs.rs @@ -8,7 +8,7 @@ use tokio::{ }; use crate::{ - fs::{FileMeta, Fs, OpenOptions}, + fs::{FileMeta, FileSystemTag, Fs, OpenOptions}, path::{path_to_local, Path}, Error, }; @@ -18,6 +18,10 @@ pub struct TokioFs; impl Fs for TokioFs { type File = File; + fn file_system(&self) -> FileSystemTag { + FileSystemTag::Local + } + async fn open_options(&self, path: &Path, options: OpenOptions) -> Result { let local_path = path_to_local(path)?; @@ -67,4 +71,31 @@ impl Fs for TokioFs { remove_file(&path).await?; Ok(()) } + + async fn copy(&self, from: &Path, to_fs: &F, to: &Path) -> Result<(), Error> { + if self.file_system() == to_fs.file_system() { + let from = path_to_local(from)?; + let to = path_to_local(to)?; + + tokio::fs::copy(&from, &to).await?; + } else { + todo!() + } + + Ok(()) + } + + async fn link(&self, from: &Path, to_fs: &F, to: &Path) -> Result<(), Error> { + if self.file_system() != to_fs.file_system() { + return Err(Error::Unsupported { + message: "file system is inconsistent".to_string(), + }); + } + let from = path_to_local(from)?; + let to = path_to_local(to)?; + + tokio::fs::hard_link(&from, &to).await?; + + Ok(()) + } } diff --git a/fusio/src/impls/disk/tokio_uring/fs.rs b/fusio/src/impls/disk/tokio_uring/fs.rs index b9ea928..fc44134 100644 --- a/fusio/src/impls/disk/tokio_uring/fs.rs +++ b/fusio/src/impls/disk/tokio_uring/fs.rs @@ -1,12 +1,14 @@ +use std::{fs, future::Future}; + use async_stream::stream; use futures_core::Stream; use tokio_uring::fs::{create_dir_all, remove_file}; use crate::{ disk::tokio_uring::TokioUringFile, - fs::{FileMeta, Fs, OpenOptions}, + fs::{FileMeta, FileSystemTag, Fs, OpenOptions}, path::{path_to_local, Path}, - Error, + Error, MaybeSend, }; pub struct TokioUringFs; @@ -14,6 +16,10 @@ pub struct TokioUringFs; impl Fs for TokioUringFs { type File = TokioUringFile; + fn file_system(&self) -> FileSystemTag { + FileSystemTag::Local + } + async fn open_options(&self, path: &Path, options: OpenOptions) -> Result { let local_path = path_to_local(path)?; @@ -58,4 +64,31 @@ impl Fs for TokioUringFs { Ok(remove_file(path).await?) } + + async fn copy(&self, from: &Path, to_fs: &F, to: &Path) -> Result<(), Error> { + if self.file_system() == to_fs.file_system() { + let from = path_to_local(from)?; + let to = path_to_local(to)?; + + fs::copy(&from, &to).await?; + } else { + todo!() + } + + Ok(()) + } + + async fn link(&self, from: &Path, to_fs: &F, to: &Path) -> Result<(), Error> { + if self.file_system() != to_fs.file_system() { + return Err(Error::Unsupported { + message: "file system is inconsistent".to_string(), + }); + } + let from = path_to_local(from)?; + let to = path_to_local(to)?; + + fs::hard_link(&from, &to).await?; + + Ok(()) + } } diff --git a/fusio/src/impls/remotes/aws/fs.rs b/fusio/src/impls/remotes/aws/fs.rs index 022a968..a8da01c 100644 --- a/fusio/src/impls/remotes/aws/fs.rs +++ b/fusio/src/impls/remotes/aws/fs.rs @@ -11,7 +11,7 @@ use url::Url; use super::{credential::AwsCredential, options::S3Options, S3Error, S3File}; use crate::{ - fs::{FileMeta, Fs, OpenOptions}, + fs::{FileMeta, FileSystemTag, Fs, OpenOptions}, path::Path, remotes::{ aws::sign::Sign, @@ -124,6 +124,10 @@ pub(super) struct AmazonS3Inner { impl Fs for AmazonS3 { type File = S3File; + fn file_system(&self) -> FileSystemTag { + FileSystemTag::S3 + } + async fn open_options(&self, path: &Path, _: OpenOptions) -> Result { Ok(S3File::new(self.clone(), path.clone())) } @@ -233,6 +237,14 @@ impl Fs for AmazonS3 { Ok(()) } + + async fn copy(&self, from: &Path, to_fs: &F, to: &Path) -> Result<(), Error> { + todo!() + } + + async fn link(&self, from: &Path, to_fs: &F, to: &Path) -> Result<(), Error> { + todo!() + } } #[derive(Debug, Deserialize)] diff --git a/fusio/src/lib.rs b/fusio/src/lib.rs index a53450a..cc03c17 100644 --- a/fusio/src/lib.rs +++ b/fusio/src/lib.rs @@ -322,7 +322,7 @@ mod tests { } #[allow(unused)] - async fn test_local_fs(fs: S) -> Result<(), Error> + async fn test_local_fs_read_write(fs: S) -> Result<(), Error> where S: crate::fs::Fs, { @@ -383,6 +383,139 @@ mod tests { Ok(()) } + #[allow(unused)] + async fn test_local_fs_copy_link(src_fs: S, dst_fs: D) -> Result<(), Error> + where + S: crate::fs::Fs, + D: crate::fs::Fs, + { + use std::collections::HashSet; + + use futures_util::StreamExt; + use tempfile::TempDir; + + use crate::{fs::OpenOptions, path::Path, DynFs}; + + let tmp_dir = TempDir::new()?; + + let work_dir_path = tmp_dir.path().join("work_dir"); + let src_file_path = work_dir_path.join("src_test.file"); + let dst_file_path = work_dir_path.join("dst_test.file"); + + src_fs + .create_dir_all(&Path::from_absolute_path(&work_dir_path)?) + .await?; + dst_fs + .create_dir_all(&Path::from_absolute_path(&work_dir_path)?) + .await?; + + // create files + let _ = src_fs + .open_options( + &Path::from_absolute_path(&src_file_path)?, + OpenOptions::default().create(true), + ) + .await?; + let _ = dst_fs + .open_options( + &Path::from_absolute_path(&dst_file_path)?, + OpenOptions::default().create(true), + ) + .await?; + // copy + { + let mut src_file = src_fs + .open_options( + &Path::from_absolute_path(&src_file_path)?, + OpenOptions::default().write(true), + ) + .await?; + src_file.write_all("Hello! fusio".as_bytes()).await.0?; + src_file.close().await?; + + src_fs + .copy( + &Path::from_absolute_path(&src_file_path)?, + &dst_fs, + &Path::from_absolute_path(&dst_file_path)?, + ) + .await?; + + let mut src_file = src_fs + .open_options( + &Path::from_absolute_path(&src_file_path)?, + OpenOptions::default().write(true).read(true), + ) + .await?; + src_file.write_all("Hello! world".as_bytes()).await.0?; + src_file.close().await?; + + let (result, buf) = src_file.read_exact_at(vec![0u8; 12], 12).await; + result.unwrap(); + assert_eq!(buf.as_slice(), b"Hello! world"); + + let mut dst_file = dst_fs + .open_options( + &Path::from_absolute_path(&dst_file_path)?, + OpenOptions::default().read(true), + ) + .await?; + + let (result, buf) = dst_file.read_exact_at(vec![0u8; 12], 0).await; + result.unwrap(); + assert_eq!(buf.as_slice(), b"Hello! fusio"); + } + + dst_fs + .remove(&Path::from_absolute_path(&dst_file_path)?) + .await?; + // link + { + let mut src_file = src_fs + .open_options( + &Path::from_absolute_path(&src_file_path)?, + OpenOptions::default().write(true), + ) + .await?; + src_file.write_all("Hello! fusio".as_bytes()).await.0?; + src_file.close().await?; + + src_fs + .link( + &Path::from_absolute_path(&src_file_path)?, + &dst_fs, + &Path::from_absolute_path(&dst_file_path)?, + ) + .await?; + + let mut src_file = src_fs + .open_options( + &Path::from_absolute_path(&src_file_path)?, + OpenOptions::default().write(true).read(true), + ) + .await?; + src_file.write_all("Hello! world".as_bytes()).await.0?; + src_file.close().await?; + + let (result, buf) = src_file.read_exact_at(vec![0u8; 12], 12).await; + result.unwrap(); + assert_eq!(buf.as_slice(), b"Hello! world"); + + let mut dst_file = dst_fs + .open_options( + &Path::from_absolute_path(&dst_file_path)?, + OpenOptions::default().read(true), + ) + .await?; + + let (result, buf) = dst_file.read_exact_at(vec![0u8; 24], 0).await; + result.unwrap(); + assert_eq!(buf.as_slice(), b"Hello! fusioHello! world"); + } + + Ok(()) + } + #[cfg(feature = "tokio")] #[tokio::test] async fn test_tokio() { @@ -400,7 +533,28 @@ mod tests { async fn test_tokio_fs() { use crate::disk::TokioFs; - test_local_fs(TokioFs).await.unwrap(); + test_local_fs_read_write(TokioFs).await.unwrap(); + test_local_fs_copy_link(TokioFs, TokioFs).await.unwrap(); + } + + #[cfg(all(feature = "tokio-uring", target_os = "linux"))] + #[tokio::test] + async fn test_tokio_uring_fs() { + use crate::disk::tokio_uring::fs::TokioUringFs; + + test_local_fs_read_write(TokioUringFs).await.unwrap(); + test_local_fs_copy_link(TokioUringFs, TokioUringFs) + .await + .unwrap(); + } + + #[cfg(feature = "monoio")] + #[tokio::test] + async fn test_monoio_fs() { + use crate::disk::monoio::fs::MonoIoFs; + + test_local_fs_read_write(MonoIoFs).await.unwrap(); + test_local_fs_copy_link(MonoIoFs, MonoIoFs).await.unwrap(); } #[cfg(feature = "tokio")]