From 482b392a225a8797734aa32aeb97105f48368cfa Mon Sep 17 00:00:00 2001 From: Kould Date: Tue, 19 Nov 2024 16:11:11 +0800 Subject: [PATCH] feat: `Fs` adds `copy` & `link` (#100) * feat: `Fs` adds `copy` & `link`, `TokioFs` & `TokioUringFs` & `Monoio` have been implemented * feat: impl `copy` for S3 & add `fs::copy` for different filesystems * fixe: `Fs::copy` on S3 * test: diff filesystem copy * fix: ci fail on opendal * test: add fs test for monoio & tokio uring * test: add fs test for monoio & tokio uring * chore: add `copy` & `link` for `DynFs` * chore: remove `Fs::link` arg `to_fs` * fix: fix the semantics of append writing when TokioFs::open_options (write from scratch after reopening) * fix: MonoioFS execute blocking task without thread pool attached * fix: test_local_fs_copy_link ci fail * chore: move copy to dynamic * clippy --------- Co-authored-by: Gwo Tzu-Hsing --- fusio-object-store/src/fs.rs | 24 ++- fusio-opendal/src/fs.rs | 19 +- fusio/Cargo.toml | 2 +- fusio/src/dynamic/fs.rs | 75 +++++++- fusio/src/fs/mod.rs | 97 ++++++++++ fusio/src/impls/disk/monoio/fs.rs | 28 ++- fusio/src/impls/disk/opfs/fs.rs | 22 ++- fusio/src/impls/disk/tokio/fs.rs | 26 ++- fusio/src/impls/disk/tokio_uring/fs.rs | 28 ++- fusio/src/impls/remotes/aws/fs.rs | 110 ++++++++++- fusio/src/impls/remotes/aws/mod.rs | 2 +- .../src/impls/remotes/aws/multipart_upload.rs | 40 +++- fusio/src/impls/remotes/aws/options.rs | 1 + fusio/src/impls/remotes/aws/s3.rs | 18 +- fusio/src/impls/remotes/aws/writer.rs | 11 +- fusio/src/lib.rs | 180 +++++++++++++++++- 16 files changed, 645 insertions(+), 38 deletions(-) diff --git a/fusio-object-store/src/fs.rs b/fusio-object-store/src/fs.rs index c53870f..3834d24 100644 --- a/fusio-object-store/src/fs.rs +++ b/fusio-object-store/src/fs.rs @@ -2,7 +2,7 @@ use std::sync::Arc; use async_stream::stream; use fusio::{ - fs::{FileMeta, Fs, OpenOptions}, + fs::{FileMeta, FileSystemTag, Fs, OpenOptions}, path::Path, Error, }; @@ -27,6 +27,10 @@ impl From for S3Store { impl Fs for S3Store { type File = S3File; + fn file_system(&self) -> FileSystemTag { + FileSystemTag::S3 + } + async fn open_options(&self, path: &Path, options: OpenOptions) -> Result { if !options.truncate { return Err(Error::Unsupported { @@ -64,4 +68,22 @@ impl Fs for S3Store { Ok(()) } + + async fn copy(&self, from: &Path, to: &Path) -> Result<(), Error> { + let from = from.clone().into(); + let to = to.clone().into(); + + self.inner + .copy(&from, &to) + .await + .map_err(BoxedError::from)?; + + Ok(()) + } + + async fn link(&self, _: &Path, _: &Path) -> Result<(), Error> { + Err(Error::Unsupported { + message: "s3 does not support link file".to_string(), + }) + } } diff --git a/fusio-opendal/src/fs.rs b/fusio-opendal/src/fs.rs index c03af9e..7e7ef12 100644 --- a/fusio-opendal/src/fs.rs +++ b/fusio-opendal/src/fs.rs @@ -1,5 +1,5 @@ use fusio::{ - fs::{FileMeta, Fs, OpenOptions}, + fs::{FileMeta, FileSystemTag, Fs, OpenOptions}, path::Path, Error, }; @@ -25,6 +25,10 @@ impl From for OpendalFs { impl Fs for OpendalFs { type File = OpendalFile; + fn file_system(&self) -> FileSystemTag { + todo!() + } + async fn open_options(&self, path: &Path, options: OpenOptions) -> Result { OpendalFile::open(self.op.clone(), path.to_string(), options).await } @@ -57,4 +61,17 @@ impl Fs for OpendalFs { .await .map_err(parse_opendal_error) } + + async fn copy(&self, from: &Path, to: &Path) -> Result<(), Error> { + self.op + .copy(from.as_ref(), to.as_ref()) + .await + .map_err(parse_opendal_error) + } + + async fn link(&self, _from: &Path, _to: &Path) -> Result<(), Error> { + Err(Error::Unsupported { + message: "opendal does not support link file".to_string(), + }) + } } diff --git a/fusio/Cargo.toml b/fusio/Cargo.toml index bdc47de..700b312 100644 --- a/fusio/Cargo.toml +++ b/fusio/Cargo.toml @@ -80,7 +80,7 @@ hyper = { version = "1", optional = true, default-features = false, features = [ "http2", ] } itertools = { version = "0.13" } -monoio = { version = "0.2", optional = true } +monoio = { version = "0.2", optional = true, features = ["sync"] } object_store = { version = "0.11", optional = true, features = ["aws"] } percent-encoding = { version = "2", default-features = false } quick-xml = { version = "0.36", features = [ diff --git a/fusio/src/dynamic/fs.rs b/fusio/src/dynamic/fs.rs index 1df009f..5d96bf2 100644 --- a/fusio/src/dynamic/fs.rs +++ b/fusio/src/dynamic/fs.rs @@ -1,11 +1,11 @@ -use std::pin::Pin; +use std::{cmp, pin::Pin, sync::Arc}; use futures_core::Stream; use super::MaybeSendFuture; use crate::{ buf::IoBufMut, - fs::{FileMeta, Fs, OpenOptions}, + fs::{FileMeta, FileSystemTag, Fs, OpenOptions}, path::Path, DynRead, DynWrite, Error, IoBuf, MaybeSend, MaybeSync, Read, Write, }; @@ -48,6 +48,8 @@ impl<'write> Write for Box { } pub trait DynFs: MaybeSend + MaybeSync { + fn file_system(&self) -> FileSystemTag; + fn open<'s, 'path: 's>( &'s self, path: &'path Path, @@ -84,9 +86,25 @@ pub trait DynFs: MaybeSend + MaybeSync { &'s self, path: &'path Path, ) -> Pin> + 's>>; + + fn copy<'s, 'path: 's>( + &'s self, + from: &'path Path, + to: &'path Path, + ) -> Pin> + 's>>; + + fn link<'s, 'path: 's>( + &'s self, + from: &'path Path, + to: &'path Path, + ) -> Pin> + 's>>; } impl DynFs for F { + fn file_system(&self) -> FileSystemTag { + Fs::file_system(self) + } + fn open_options<'s, 'path: 's>( &'s self, path: &'path Path, @@ -130,6 +148,59 @@ impl DynFs for F { ) -> Pin> + 's>> { Box::pin(F::remove(self, path)) } + + fn copy<'s, 'path: 's>( + &'s self, + from: &'path Path, + to: &'path Path, + ) -> Pin> + 's>> { + Box::pin(F::copy(self, from, to)) + } + + fn link<'s, 'path: 's>( + &'s self, + from: &'path Path, + to: &'path Path, + ) -> Pin> + 's>> { + Box::pin(F::link(self, from, to)) + } +} + +pub async fn copy( + from_fs: &Arc, + from: &Path, + to_fs: &Arc, + to: &Path, +) -> Result<(), Error> { + if from_fs.file_system() == to_fs.file_system() { + from_fs.copy(from, to).await?; + return Ok(()); + } + let mut from_file = from_fs + .open_options(from, OpenOptions::default().read(true)) + .await?; + let from_file_size = DynRead::size(&from_file).await? as usize; + + let mut to_file = to_fs + .open_options(to, OpenOptions::default().create(true).write(true)) + .await?; + let buf_size = cmp::min(from_file_size, 4 * 1024); + let mut buf = Some(vec![0u8; buf_size]); + let mut read_pos = 0u64; + + while (read_pos as usize) < from_file_size - 1 { + let tmp = buf.take().unwrap(); + let (result, tmp) = Read::read_exact_at(&mut from_file, tmp, read_pos).await; + result?; + read_pos += tmp.bytes_init() as u64; + + let (result, tmp) = Write::write_all(&mut to_file, tmp).await; + result?; + buf = Some(tmp); + } + DynWrite::close(&mut to_file).await?; + + Ok(()) } #[cfg(test)] diff --git a/fusio/src/fs/mod.rs b/fusio/src/fs/mod.rs index 8105bab..d0589d8 100644 --- a/fusio/src/fs/mod.rs +++ b/fusio/src/fs/mod.rs @@ -16,11 +16,21 @@ pub struct FileMeta { pub size: u64, } +#[derive(Debug, Copy, Clone, Eq, PartialEq)] +pub enum FileSystemTag { + Local, + OPFS, + // TODO: Remote needs to check whether endpoint and other remote fs are consistent + S3, +} + pub trait Fs: MaybeSend + MaybeSync { //! This trait is used to abstract file system operations across different file systems. type File: Read + Write + MaybeSend + 'static; + fn file_system(&self) -> FileSystemTag; + fn open(&self, path: &Path) -> impl Future> { self.open_options(path, OpenOptions::default()) } @@ -39,4 +49,91 @@ pub trait Fs: MaybeSend + MaybeSync { ) -> impl Future>, Error>> + MaybeSend; fn remove(&self, path: &Path) -> impl Future> + MaybeSend; + + fn copy(&self, from: &Path, to: &Path) -> impl Future> + MaybeSend; + + fn link(&self, from: &Path, to: &Path) -> impl Future> + MaybeSend; +} + +#[cfg(test)] +mod tests { + #[ignore] + #[cfg(all( + feature = "tokio-http", + feature = "tokio", + feature = "aws", + not(feature = "completion-based") + ))] + #[tokio::test] + async fn test_diff_fs_copy() -> Result<(), crate::Error> { + use std::sync::Arc; + + use tempfile::TempDir; + + use crate::{ + fs::{Fs, OpenOptions}, + impls::disk::tokio::fs::TokioFs, + path::Path, + remotes::{ + aws::{credential::AwsCredential, fs::AmazonS3, options::S3Options, s3::S3File}, + http::tokio::TokioClient, + }, + DynFs, Read, Write, + }; + + let tmp_dir = TempDir::new()?; + let local_path = Path::from_absolute_path(&tmp_dir.as_ref().join("test.file"))?; + let s3_path: Path = "s3_copy_test.file".into(); + + let key_id = "user".to_string(); + let secret_key = "password".to_string(); + + let client = TokioClient::new(); + let region = "ap-southeast-1"; + let options = S3Options { + endpoint: "http://localhost:9000/data".into(), + bucket: "data".to_string(), + credential: Some(AwsCredential { + key_id, + secret_key, + token: None, + }), + region: region.into(), + sign_payload: true, + checksum: false, + }; + + let s3_fs = Arc::new(AmazonS3::new(Box::new(client), options)); + let local_fs = Arc::new(TokioFs); + + { + let mut local_file = Fs::open_options( + local_fs.as_ref(), + &local_path, + OpenOptions::default().create(true).write(true), + ) + .await?; + local_file + .write_all("🎵never gonna give you up🎵".as_bytes()) + .await + .0?; + local_file.close().await.unwrap(); + } + { + let s3_fs = s3_fs.clone() as Arc; + let local_fs = local_fs.clone() as Arc; + crate::dynamic::fs::copy(&local_fs, &local_path, &s3_fs, &s3_path).await?; + } + + let mut s3 = S3File::new(Arc::into_inner(s3_fs).unwrap(), s3_path.clone()); + + let size = s3.size().await.unwrap(); + assert_eq!(size, 31); + let buf = Vec::new(); + let (result, buf) = s3.read_to_end_at(buf, 0).await; + result.unwrap(); + assert_eq!(buf, "🎵never gonna give you up🎵".as_bytes()); + + Ok(()) + } } diff --git a/fusio/src/impls/disk/monoio/fs.rs b/fusio/src/impls/disk/monoio/fs.rs index c18c587..9a7e1b1 100644 --- a/fusio/src/impls/disk/monoio/fs.rs +++ b/fusio/src/impls/disk/monoio/fs.rs @@ -1,11 +1,11 @@ -use std::fs::create_dir_all; +use std::{fs, fs::create_dir_all}; use async_stream::stream; use futures_core::Stream; use super::MonoioFile; use crate::{ - fs::{FileMeta, Fs, OpenOptions}, + fs::{FileMeta, FileSystemTag, Fs, OpenOptions}, path::{path_to_local, Path}, Error, }; @@ -15,6 +15,10 @@ pub struct MonoIoFs; impl Fs for MonoIoFs { type File = MonoioFile; + fn file_system(&self) -> FileSystemTag { + FileSystemTag::Local + } + async fn open_options(&self, path: &Path, options: OpenOptions) -> Result { let local_path = path_to_local(path)?; @@ -54,6 +58,24 @@ impl Fs for MonoIoFs { async fn remove(&self, path: &Path) -> Result<(), Error> { let path = path_to_local(path)?; - Ok(std::fs::remove_file(path)?) + Ok(fs::remove_file(path)?) + } + + async fn copy(&self, from: &Path, to: &Path) -> Result<(), Error> { + let from = path_to_local(from)?; + let to = path_to_local(to)?; + + monoio::spawn(async move { fs::copy(&from, &to) }).await?; + + Ok(()) + } + + async fn link(&self, from: &Path, to: &Path) -> Result<(), Error> { + let from = path_to_local(from)?; + let to = path_to_local(to)?; + + monoio::spawn(async move { fs::hard_link(&from, &to) }).await?; + + Ok(()) } } diff --git a/fusio/src/impls/disk/opfs/fs.rs b/fusio/src/impls/disk/opfs/fs.rs index fa3901d..3044153 100644 --- a/fusio/src/impls/disk/opfs/fs.rs +++ b/fusio/src/impls/disk/opfs/fs.rs @@ -1,3 +1,5 @@ +use std::future::Future; + use async_stream::stream; use futures_core::Stream; use futures_util::StreamExt; @@ -15,9 +17,9 @@ use super::OPFSFile; use crate::{ disk::opfs::{promise, storage}, error::wasm_err, - fs::{FileMeta, Fs, OpenOptions}, + fs::{FileMeta, FileSystemTag, Fs, OpenOptions}, path::Path, - Error, + Error, MaybeSend, }; pub struct OPFS; @@ -25,6 +27,10 @@ pub struct OPFS; impl Fs for OPFS { type File = OPFSFile; + fn file_system(&self) -> FileSystemTag { + FileSystemTag::Local + } + async fn open_options(&self, path: &Path, options: OpenOptions) -> Result { let segments: Vec<&str> = path.as_ref().trim_matches('/').split("/").collect(); @@ -97,6 +103,18 @@ impl Fs for OPFS { .map_err(wasm_err)?; Ok(()) } + + async fn copy(&self, _: &Path, _: &Path) -> Result<(), Error> { + Err(Error::Unsupported { + message: "opfs does not support copy file".to_string(), + }) + } + + async fn link(&self, _: &Path, _: &Path) -> Result<(), Error> { + Err(Error::Unsupported { + message: "opfs does not support link file".to_string(), + }) + } } impl OPFS { diff --git a/fusio/src/impls/disk/tokio/fs.rs b/fusio/src/impls/disk/tokio/fs.rs index 8fc17e9..2f470d4 100644 --- a/fusio/src/impls/disk/tokio/fs.rs +++ b/fusio/src/impls/disk/tokio/fs.rs @@ -8,7 +8,7 @@ use tokio::{ }; use crate::{ - fs::{FileMeta, Fs, OpenOptions}, + fs::{FileMeta, FileSystemTag, Fs, OpenOptions}, path::{path_to_local, Path}, Error, }; @@ -18,12 +18,16 @@ pub struct TokioFs; impl Fs for TokioFs { type File = File; + fn file_system(&self) -> FileSystemTag { + FileSystemTag::Local + } + async fn open_options(&self, path: &Path, options: OpenOptions) -> Result { let local_path = path_to_local(path)?; let file = tokio::fs::OpenOptions::new() .read(options.read) - .append(options.write) + .write(options.write) .create(options.create) .open(&local_path) .await?; @@ -67,4 +71,22 @@ impl Fs for TokioFs { remove_file(&path).await?; Ok(()) } + + async fn copy(&self, from: &Path, to: &Path) -> Result<(), Error> { + let from = path_to_local(from)?; + let to = path_to_local(to)?; + + tokio::fs::copy(&from, &to).await?; + + Ok(()) + } + + async fn link(&self, from: &Path, to: &Path) -> Result<(), Error> { + let from = path_to_local(from)?; + let to = path_to_local(to)?; + + tokio::fs::hard_link(&from, &to).await?; + + Ok(()) + } } diff --git a/fusio/src/impls/disk/tokio_uring/fs.rs b/fusio/src/impls/disk/tokio_uring/fs.rs index b9ea928..e28277f 100644 --- a/fusio/src/impls/disk/tokio_uring/fs.rs +++ b/fusio/src/impls/disk/tokio_uring/fs.rs @@ -1,12 +1,14 @@ +use std::{fs, future::Future}; + use async_stream::stream; use futures_core::Stream; use tokio_uring::fs::{create_dir_all, remove_file}; use crate::{ disk::tokio_uring::TokioUringFile, - fs::{FileMeta, Fs, OpenOptions}, + fs::{FileMeta, FileSystemTag, Fs, OpenOptions}, path::{path_to_local, Path}, - Error, + Error, MaybeSend, }; pub struct TokioUringFs; @@ -14,6 +16,10 @@ pub struct TokioUringFs; impl Fs for TokioUringFs { type File = TokioUringFile; + fn file_system(&self) -> FileSystemTag { + FileSystemTag::Local + } + async fn open_options(&self, path: &Path, options: OpenOptions) -> Result { let local_path = path_to_local(path)?; @@ -58,4 +64,22 @@ impl Fs for TokioUringFs { Ok(remove_file(path).await?) } + + async fn copy(&self, from: &Path, to: &Path) -> Result<(), Error> { + let from = path_to_local(from)?; + let to = path_to_local(to)?; + + fs::copy(&from, &to)?; + + Ok(()) + } + + async fn link(&self, from: &Path, to: &Path) -> Result<(), Error> { + let from = path_to_local(from)?; + let to = path_to_local(to)?; + + fs::hard_link(&from, &to)?; + + Ok(()) + } } diff --git a/fusio/src/impls/remotes/aws/fs.rs b/fusio/src/impls/remotes/aws/fs.rs index 0546aea..fad6aaa 100644 --- a/fusio/src/impls/remotes/aws/fs.rs +++ b/fusio/src/impls/remotes/aws/fs.rs @@ -11,10 +11,13 @@ use url::Url; use super::{credential::AwsCredential, options::S3Options, S3Error, S3File}; use crate::{ - fs::{FileMeta, Fs, OpenOptions}, + fs::{FileMeta, FileSystemTag, Fs, OpenOptions}, path::Path, remotes::{ - aws::sign::Sign, + aws::{ + multipart_upload::{MultipartUpload, UploadType}, + sign::Sign, + }, http::{DynHttpClient, HttpClient, HttpError}, }, Error, @@ -95,6 +98,7 @@ impl AmazonS3Builder { inner: Arc::new(AmazonS3Inner { options: S3Options { endpoint, + bucket: self.bucket, region: self.region, credential: self.credential, sign_payload: self.sign_payload, @@ -122,9 +126,23 @@ pub(super) struct AmazonS3Inner { pub(super) client: Box, } +impl AmazonS3 { + #[allow(dead_code)] + pub(crate) fn new(client: Box, options: S3Options) -> Self { + AmazonS3 { + #[allow(clippy::arc_with_non_send_sync)] + inner: Arc::new(AmazonS3Inner { options, client }), + } + } +} + impl Fs for AmazonS3 { type File = S3File; + fn file_system(&self) -> FileSystemTag { + FileSystemTag::S3 + } + async fn open_options(&self, path: &Path, _: OpenOptions) -> Result { Ok(S3File::new(self.clone(), path.clone())) } @@ -234,6 +252,25 @@ impl Fs for AmazonS3 { Ok(()) } + + async fn copy(&self, from: &Path, to: &Path) -> Result<(), Error> { + let upload = MultipartUpload::new(self.clone(), to.clone()); + upload + .upload_once(UploadType::Copy { + bucket: self.inner.options.bucket.clone(), + from: from.clone(), + body: Empty::::new(), + }) + .await?; + + Ok(()) + } + + async fn link(&self, _: &Path, _: &Path) -> Result<(), Error> { + Err(Error::Unsupported { + message: "s3 does not support link file".to_string(), + }) + } } #[derive(Debug, Deserialize)] @@ -265,6 +302,9 @@ pub struct ListResponse { #[cfg(test)] mod tests { + #[cfg(feature = "tokio-http")] + use crate::{fs::Fs, path::Path}; + #[cfg(feature = "tokio-http")] #[tokio::test] async fn list_and_remove() { @@ -298,4 +338,70 @@ mod tests { s3.remove(&meta.path).await.unwrap(); } } + + #[ignore] + #[cfg(all(feature = "tokio-http", not(feature = "completion-based")))] + #[tokio::test] + async fn copy() { + use std::sync::Arc; + + use crate::{ + remotes::{ + aws::{ + credential::AwsCredential, + fs::{AmazonS3, AmazonS3Inner}, + options::S3Options, + s3::S3File, + }, + http::{tokio::TokioClient, DynHttpClient}, + }, + Read, Write, + }; + + let key_id = "user".to_string(); + let secret_key = "password".to_string(); + + let client = TokioClient::new(); + let region = "ap-southeast-1"; + let options = S3Options { + endpoint: "http://localhost:9000/data".into(), + bucket: "data".to_string(), + credential: Some(AwsCredential { + key_id, + secret_key, + token: None, + }), + region: region.into(), + sign_payload: true, + checksum: false, + }; + + let s3 = AmazonS3 { + inner: Arc::new(AmazonS3Inner { + options, + client: Box::new(client) as Box, + }), + }; + + let from_path: Path = "read-write.txt".into(); + let to_path: Path = "read-write-copy.txt".into(); + { + let mut s3 = S3File::new(s3.clone(), from_path.clone()); + + let (result, _) = s3 + .write_all(&b"The answer of life, universe and everthing"[..]) + .await; + result.unwrap(); + s3.close().await.unwrap(); + } + s3.copy(&from_path, &to_path).await.unwrap(); + let mut s3 = S3File::new(s3, to_path.clone()); + + let size = s3.size().await.unwrap(); + assert_eq!(size, 42); + let buf = Vec::new(); + let (result, buf) = s3.read_to_end_at(buf, 0).await; + result.unwrap(); + assert_eq!(buf, b"The answer of life, universe and everthing"); + } } diff --git a/fusio/src/impls/remotes/aws/mod.rs b/fusio/src/impls/remotes/aws/mod.rs index a3b842c..3215d38 100644 --- a/fusio/src/impls/remotes/aws/mod.rs +++ b/fusio/src/impls/remotes/aws/mod.rs @@ -4,7 +4,7 @@ mod error; pub mod fs; pub(crate) mod multipart_upload; pub(crate) mod options; -mod s3; +pub(crate) mod s3; pub(crate) mod sign; pub(crate) mod writer; diff --git a/fusio/src/impls/remotes/aws/multipart_upload.rs b/fusio/src/impls/remotes/aws/multipart_upload.rs index 112d0f7..0b1b7eb 100644 --- a/fusio/src/impls/remotes/aws/multipart_upload.rs +++ b/fusio/src/impls/remotes/aws/multipart_upload.rs @@ -27,6 +27,19 @@ pub(crate) struct MultipartUpload { path: Path, } +pub enum UploadType { + Write { + size: usize, + body: B, + }, + Copy { + bucket: String, + from: Path, + // FIXME: for Empty + body: B, + }, +} + impl MultipartUpload { pub fn new(fs: AmazonS3, path: Path) -> Self { Self { fs, path } @@ -67,22 +80,35 @@ impl MultipartUpload { Self::check_response(response).await } - pub(crate) async fn upload_once(&self, size: usize, body: B) -> Result<(), Error> + pub(crate) async fn upload_once(&self, upload_type: UploadType) -> Result<(), Error> where B: Body + Clone + Unpin + Send + Sync + 'static, B::Error: std::error::Error + Send + Sync + 'static, { + let (size, body, copy_from) = match upload_type { + UploadType::Write { size, body } => (Some(size), body, None), + UploadType::Copy { bucket, from, body } => { + let from_url = format!( + "/{bucket}/{}", + utf8_percent_encode(from.as_ref(), &STRICT_PATH_ENCODE_SET) + ); + (None, body, Some(from_url)) + } + }; let url = format!( "{}/{}", self.fs.as_ref().options.endpoint, utf8_percent_encode(self.path.as_ref(), &STRICT_PATH_ENCODE_SET) ); - let request = Request::builder() - .uri(url) - .method(Method::PUT) - .header(CONTENT_LENGTH, size) - .body(body) - .map_err(|e| Error::Other(e.into()))?; + let mut builder = Request::builder().uri(url).method(Method::PUT); + if let Some(from_url) = copy_from { + builder = builder.header("x-amz-copy-source", from_url); + } + // Tips: When the body is empty or the length is less than CONTENT_LENGTH, it may block + if let Some(size) = size { + builder = builder.header(CONTENT_LENGTH, size) + } + let request = builder.body(body).map_err(|e| Error::Other(e.into()))?; let _ = self.send_request(request).await?; Ok(()) diff --git a/fusio/src/impls/remotes/aws/options.rs b/fusio/src/impls/remotes/aws/options.rs index 3e64355..89dd339 100644 --- a/fusio/src/impls/remotes/aws/options.rs +++ b/fusio/src/impls/remotes/aws/options.rs @@ -2,6 +2,7 @@ use super::credential::AwsCredential; pub(crate) struct S3Options { pub(crate) endpoint: String, + pub(crate) bucket: String, pub(crate) region: String, pub(crate) credential: Option, pub(crate) sign_payload: bool, diff --git a/fusio/src/impls/remotes/aws/s3.rs b/fusio/src/impls/remotes/aws/s3.rs index 79636b6..54f1cc9 100644 --- a/fusio/src/impls/remotes/aws/s3.rs +++ b/fusio/src/impls/remotes/aws/s3.rs @@ -254,6 +254,7 @@ impl Write for S3File { #[cfg(test)] mod tests { + #[ignore] #[cfg(all(feature = "tokio-http", not(feature = "completion-based")))] #[tokio::test] @@ -268,7 +269,7 @@ mod tests { options::S3Options, s3::S3File, }, - http::{tokio::TokioClient, DynHttpClient, HttpClient}, + http::{tokio::TokioClient, DynHttpClient}, }, Read, Write, }; @@ -280,6 +281,7 @@ mod tests { let region = "ap-southeast-1"; let options = S3Options { endpoint: "http://localhost:9000/data".into(), + bucket: "data".to_string(), credential: Some(AwsCredential { key_id, secret_key, @@ -297,12 +299,16 @@ mod tests { }), }; - let mut s3 = S3File::new(s3, "read-write.txt".into()); + { + let mut s3 = S3File::new(s3.clone(), "read-write.txt".into()); - let (result, _) = s3 - .write_all(&b"The answer of life, universe and everthing"[..]) - .await; - result.unwrap(); + let (result, _) = s3 + .write_all(&b"The answer of life, universe and everthing"[..]) + .await; + result.unwrap(); + s3.close().await.unwrap(); + } + let mut s3 = S3File::new(s3, "read-write.txt".into()); let size = s3.size().await.unwrap(); assert_eq!(size, 42); diff --git a/fusio/src/impls/remotes/aws/writer.rs b/fusio/src/impls/remotes/aws/writer.rs index 520a18d..6b96d5b 100644 --- a/fusio/src/impls/remotes/aws/writer.rs +++ b/fusio/src/impls/remotes/aws/writer.rs @@ -6,7 +6,10 @@ use http_body_util::Full; use crate::{ dynamic::MaybeSendFuture, - remotes::{aws::multipart_upload::MultipartUpload, serde::MultipartPart}, + remotes::{ + aws::multipart_upload::{MultipartUpload, UploadType}, + serde::MultipartPart, + }, Error, IoBuf, Write, }; @@ -90,7 +93,10 @@ impl Write for S3Writer { let bytes = mem::replace(&mut self.buf, BytesMut::new()).freeze(); self.inner - .upload_once(bytes.len(), Full::new(bytes)) + .upload_once(UploadType::Write { + size: bytes.len(), + body: Full::new(bytes), + }) .await?; } return Ok(()); @@ -140,6 +146,7 @@ mod tests { let region = "ap-southeast-2"; let options = S3Options { endpoint: "http://localhost:9000/data".into(), + bucket: "data".to_string(), credential: Some(AwsCredential { key_id: "user".to_string(), secret_key: "password".to_string(), diff --git a/fusio/src/lib.rs b/fusio/src/lib.rs index 47863d2..a4fd45d 100644 --- a/fusio/src/lib.rs +++ b/fusio/src/lib.rs @@ -327,7 +327,7 @@ mod tests { #[allow(unused)] #[cfg(not(target_arch = "wasm32"))] - async fn test_local_fs(fs: S) -> Result<(), Error> + async fn test_local_fs_read_write(fs: S) -> Result<(), Error> where S: crate::fs::Fs, { @@ -377,10 +377,157 @@ mod tests { ) .await?; file.write_all("Hello! world".as_bytes()).await.0?; - + file.flush().await.unwrap(); file.close().await.unwrap(); - let (result, buf) = file.read_exact_at(vec![0u8; 12], 12).await; + let mut file = fs + .open_options( + &Path::from_absolute_path(&work_file_path)?, + OpenOptions::default().read(true), + ) + .await?; + + let (result, buf) = file.read_exact_at(vec![0u8; 12], 0).await; + result.unwrap(); + assert_eq!(buf.as_slice(), b"Hello! world"); + } + + Ok(()) + } + + #[cfg(not(target_arch = "wasm32"))] + #[allow(unused)] + async fn test_local_fs_copy_link(src_fs: F) -> Result<(), Error> { + use std::collections::HashSet; + + use futures_util::StreamExt; + use tempfile::TempDir; + + use crate::{fs::OpenOptions, path::Path, DynFs}; + + let tmp_dir = TempDir::new()?; + + let work_dir_path = tmp_dir.path().join("work_dir"); + let src_file_path = work_dir_path.join("src_test.file"); + let dst_file_path = work_dir_path.join("dst_test.file"); + + src_fs + .create_dir_all(&Path::from_absolute_path(&work_dir_path)?) + .await?; + + // create files + let _ = src_fs + .open_options( + &Path::from_absolute_path(&src_file_path)?, + OpenOptions::default().create(true), + ) + .await?; + let _ = src_fs + .open_options( + &Path::from_absolute_path(&dst_file_path)?, + OpenOptions::default().create(true), + ) + .await?; + // copy + { + let mut src_file = src_fs + .open_options( + &Path::from_absolute_path(&src_file_path)?, + OpenOptions::default().write(true), + ) + .await?; + src_file.write_all("Hello! fusio".as_bytes()).await.0?; + src_file.close().await?; + + src_fs + .copy( + &Path::from_absolute_path(&src_file_path)?, + &Path::from_absolute_path(&dst_file_path)?, + ) + .await?; + + let mut src_file = src_fs + .open_options( + &Path::from_absolute_path(&src_file_path)?, + OpenOptions::default().write(true).read(true), + ) + .await?; + src_file.write_all("Hello! world".as_bytes()).await.0?; + src_file.flush().await?; + src_file.close().await?; + + let mut src_file = src_fs + .open_options( + &Path::from_absolute_path(&src_file_path)?, + OpenOptions::default().write(true).read(true), + ) + .await?; + + let (result, buf) = src_file.read_exact_at(vec![0u8; 12], 0).await; + result.unwrap(); + assert_eq!(buf.as_slice(), b"Hello! world"); + + let mut dst_file = src_fs + .open_options( + &Path::from_absolute_path(&dst_file_path)?, + OpenOptions::default().read(true), + ) + .await?; + + let (result, buf) = dst_file.read_exact_at(vec![0u8; 12], 0).await; + result.unwrap(); + assert_eq!(buf.as_slice(), b"Hello! fusio"); + } + + src_fs + .remove(&Path::from_absolute_path(&dst_file_path)?) + .await?; + // link + { + let mut src_file = src_fs + .open_options( + &Path::from_absolute_path(&src_file_path)?, + OpenOptions::default().write(true), + ) + .await?; + src_file.write_all("Hello! fusio".as_bytes()).await.0?; + src_file.close().await?; + + src_fs + .link( + &Path::from_absolute_path(&src_file_path)?, + &Path::from_absolute_path(&dst_file_path)?, + ) + .await?; + + let mut src_file = src_fs + .open_options( + &Path::from_absolute_path(&src_file_path)?, + OpenOptions::default().write(true).read(true), + ) + .await?; + src_file.write_all("Hello! world".as_bytes()).await.0?; + src_file.flush().await?; + src_file.close().await?; + + let mut src_file = src_fs + .open_options( + &Path::from_absolute_path(&src_file_path)?, + OpenOptions::default().write(true).read(true), + ) + .await?; + let (result, buf) = src_file.read_exact_at(vec![0u8; 12], 0).await; + result.unwrap(); + assert_eq!(buf.as_slice(), b"Hello! world"); + + let mut dst_file = src_fs + .open_options( + &Path::from_absolute_path(&dst_file_path)?, + OpenOptions::default().read(true), + ) + .await?; + + let (result, buf) = dst_file.read_exact_at(vec![0u8; 12], 0).await; result.unwrap(); assert_eq!(buf.as_slice(), b"Hello! world"); } @@ -388,7 +535,7 @@ mod tests { Ok(()) } - #[cfg(feature = "tokio")] + #[cfg(all(feature = "tokio", not(target_arch = "wasm32")))] #[tokio::test] async fn test_tokio() { use tempfile::tempfile; @@ -405,10 +552,31 @@ mod tests { async fn test_tokio_fs() { use crate::disk::TokioFs; - test_local_fs(TokioFs).await.unwrap(); + test_local_fs_read_write(TokioFs).await.unwrap(); + test_local_fs_copy_link(TokioFs).await.unwrap(); } - #[cfg(feature = "tokio")] + #[cfg(all(feature = "tokio-uring", target_os = "linux"))] + #[test] + fn test_tokio_uring_fs() { + use crate::disk::tokio_uring::fs::TokioUringFs; + + tokio_uring::start(async { + test_local_fs_read_write(TokioUringFs).await.unwrap(); + test_local_fs_copy_link(TokioUringFs).await.unwrap(); + }) + } + + #[cfg(all(feature = "monoio", not(target_arch = "wasm32")))] + #[monoio::test] + async fn test_monoio_fs() { + use crate::disk::monoio::fs::MonoIoFs; + + test_local_fs_read_write(MonoIoFs).await.unwrap(); + test_local_fs_copy_link(MonoIoFs).await.unwrap(); + } + + #[cfg(all(feature = "tokio", not(target_arch = "wasm32")))] #[tokio::test] async fn test_read_exact() { use tempfile::tempfile;