From 38d70aa8ede163694c34e780cde4a0a39a9658f6 Mon Sep 17 00:00:00 2001 From: Kould Date: Tue, 12 Nov 2024 15:50:18 +0800 Subject: [PATCH] feat: impl `copy` for S3 & add `fs::copy` for different filesystems --- fusio-object-store/src/fs.rs | 28 ++++++------ fusio-opendal/src/fs.rs | 9 +--- fusio/src/fs/mod.rs | 45 +++++++++++++++---- fusio/src/impls/disk/monoio/fs.rs | 14 +++--- fusio/src/impls/disk/opfs/fs.rs | 14 +----- fusio/src/impls/disk/tokio/fs.rs | 14 +++--- fusio/src/impls/disk/tokio_uring/fs.rs | 14 +++--- fusio/src/impls/remotes/aws/fs.rs | 32 ++++++++++--- .../src/impls/remotes/aws/multipart_upload.rs | 40 ++++++++++++++--- fusio/src/impls/remotes/aws/writer.rs | 9 ++-- fusio/src/lib.rs | 1 - 11 files changed, 137 insertions(+), 83 deletions(-) diff --git a/fusio-object-store/src/fs.rs b/fusio-object-store/src/fs.rs index c54481d..e5340ac 100644 --- a/fusio-object-store/src/fs.rs +++ b/fusio-object-store/src/fs.rs @@ -69,21 +69,21 @@ impl Fs for S3Store { Ok(()) } - fn copy( - &self, - from: &Path, - to_fs: &F, - to: &Path, - ) -> impl Future> + MaybeSend { - todo!() + async fn copy(&self, from: &Path, to: &Path) -> Result<(), Error> { + let from = from.clone().into(); + let to = to.clone().into(); + + self.inner + .copy(&from, &to) + .await + .map_err(BoxedError::from)?; + + Ok(()) } - fn link( - &self, - from: &Path, - to_fs: &F, - to: &Path, - ) -> impl Future> + MaybeSend { - todo!() + async fn link(&self, _: &Path, _: &F, _: &Path) -> Result<(), Error> { + Err(Error::Unsupported { + message: "s3 does not support link file".to_string(), + }) } } diff --git a/fusio-opendal/src/fs.rs b/fusio-opendal/src/fs.rs index b696630..8f28b41 100644 --- a/fusio-opendal/src/fs.rs +++ b/fusio-opendal/src/fs.rs @@ -64,13 +64,8 @@ impl Fs for OpendalFs { .map_err(parse_opendal_error) } - fn copy( - &self, - from: &Path, - to_fs: &F, - to: &Path, - ) -> impl Future> + MaybeSend { - todo!() + fn copy(&self, from: &Path, to: &Path) -> impl Future> + MaybeSend { + self.op.copy(from.as_ref(), to.as_ref()) } fn link( diff --git a/fusio/src/fs/mod.rs b/fusio/src/fs/mod.rs index 2336c46..34d54ae 100644 --- a/fusio/src/fs/mod.rs +++ b/fusio/src/fs/mod.rs @@ -3,12 +3,12 @@ mod options; -use std::future::Future; +use std::{cmp, future::Future}; use futures_core::Stream; pub use options::*; -use crate::{path::Path, Error, MaybeSend, MaybeSync, Read, Write}; +use crate::{path::Path, Error, IoBufMut, MaybeSend, MaybeSync, Read, Write}; #[derive(Debug)] pub struct FileMeta { @@ -50,12 +50,7 @@ pub trait Fs: MaybeSend + MaybeSync { fn remove(&self, path: &Path) -> impl Future> + MaybeSend; - fn copy( - &self, - from: &Path, - to_fs: &F, - to: &Path, - ) -> impl Future> + MaybeSend; + fn copy(&self, from: &Path, to: &Path) -> impl Future> + MaybeSend; fn link( &self, @@ -64,3 +59,37 @@ pub trait Fs: MaybeSend + MaybeSync { to: &Path, ) -> impl Future> + MaybeSend; } + +pub async fn copy(from_fs: &F, from: &Path, to_fs: &T, to: &Path) -> Result<(), Error> +where + F: Fs, + T: Fs, +{ + if from_fs.file_system() == to_fs.file_system() { + from_fs.copy(from, to).await?; + return Ok(()); + } + let mut from_file = from_fs + .open_options(from, OpenOptions::default().read(true)) + .await?; + let from_file_size = from_file.size().await? as usize; + + let mut to_file = to_fs + .open_options(to, OpenOptions::default().create(true).write(true)) + .await?; + let buf_size = cmp::min(from_file_size, 4 * 1024); + let mut buf = vec![0u8; buf_size]; + let mut read_pos = 0u64; + + while (read_pos as usize) < from_file_size - 1 { + let (result, _) = from_file.read_exact_at(buf.as_slice_mut(), read_pos).await; + result?; + read_pos += buf.len() as u64; + + let (result, _) = to_file.write_all(buf.as_slice()).await; + result?; + buf.resize(buf_size, 0); + } + + Ok(()) +} diff --git a/fusio/src/impls/disk/monoio/fs.rs b/fusio/src/impls/disk/monoio/fs.rs index ccc3067..919c1a7 100644 --- a/fusio/src/impls/disk/monoio/fs.rs +++ b/fusio/src/impls/disk/monoio/fs.rs @@ -61,15 +61,11 @@ impl Fs for MonoIoFs { Ok(fs::remove_file(path)?) } - async fn copy(&self, from: &Path, to_fs: &F, to: &Path) -> Result<(), Error> { - if self.file_system() == to_fs.file_system() { - let from = path_to_local(from)?; - let to = path_to_local(to)?; - - fs::copy(&from, &to)?; - } else { - todo!() - } + async fn copy(&self, from: &Path, to: &Path) -> Result<(), Error> { + let from = path_to_local(from)?; + let to = path_to_local(to)?; + + fs::copy(&from, &to)?; Ok(()) } diff --git a/fusio/src/impls/disk/opfs/fs.rs b/fusio/src/impls/disk/opfs/fs.rs index 17d86fc..d7be0ac 100644 --- a/fusio/src/impls/disk/opfs/fs.rs +++ b/fusio/src/impls/disk/opfs/fs.rs @@ -104,23 +104,13 @@ impl Fs for OPFS { Ok(()) } - fn copy( - &self, - from: &Path, - to_fs: &F, - to: &Path, - ) -> impl Future> + MaybeSend { + async fn copy(&self, from: &Path, to: &Path) -> Result<(), Error> { Err(Error::Unsupported { message: "opfs does not support copy file".to_string(), }) } - fn link( - &self, - from: &Path, - to_fs: &F, - to: &Path, - ) -> impl Future> + MaybeSend { + async fn link(&self, from: &Path, to_fs: &F, to: &Path) -> Result<(), Error> { Err(Error::Unsupported { message: "opfs does not support link file".to_string(), }) diff --git a/fusio/src/impls/disk/tokio/fs.rs b/fusio/src/impls/disk/tokio/fs.rs index b5a2e60..c8848a9 100644 --- a/fusio/src/impls/disk/tokio/fs.rs +++ b/fusio/src/impls/disk/tokio/fs.rs @@ -72,15 +72,11 @@ impl Fs for TokioFs { Ok(()) } - async fn copy(&self, from: &Path, to_fs: &F, to: &Path) -> Result<(), Error> { - if self.file_system() == to_fs.file_system() { - let from = path_to_local(from)?; - let to = path_to_local(to)?; - - tokio::fs::copy(&from, &to).await?; - } else { - todo!() - } + async fn copy(&self, from: &Path, to: &Path) -> Result<(), Error> { + let from = path_to_local(from)?; + let to = path_to_local(to)?; + + tokio::fs::copy(&from, &to).await?; Ok(()) } diff --git a/fusio/src/impls/disk/tokio_uring/fs.rs b/fusio/src/impls/disk/tokio_uring/fs.rs index 9783188..02c3ddb 100644 --- a/fusio/src/impls/disk/tokio_uring/fs.rs +++ b/fusio/src/impls/disk/tokio_uring/fs.rs @@ -65,15 +65,11 @@ impl Fs for TokioUringFs { Ok(remove_file(path).await?) } - async fn copy(&self, from: &Path, to_fs: &F, to: &Path) -> Result<(), Error> { - if self.file_system() == to_fs.file_system() { - let from = path_to_local(from)?; - let to = path_to_local(to)?; - - fs::copy(&from, &to)?; - } else { - todo!() - } + async fn copy(&self, from: &Path, to: &Path) -> Result<(), Error> { + let from = path_to_local(from)?; + let to = path_to_local(to)?; + + fs::copy(&from, &to)?; Ok(()) } diff --git a/fusio/src/impls/remotes/aws/fs.rs b/fusio/src/impls/remotes/aws/fs.rs index a8da01c..f15575f 100644 --- a/fusio/src/impls/remotes/aws/fs.rs +++ b/fusio/src/impls/remotes/aws/fs.rs @@ -14,10 +14,13 @@ use crate::{ fs::{FileMeta, FileSystemTag, Fs, OpenOptions}, path::Path, remotes::{ - aws::sign::Sign, + aws::{ + multipart_upload::{MultipartUpload, UploadType}, + sign::Sign, + }, http::{DynHttpClient, HttpClient, HttpError}, }, - Error, + Error, Read, }; pub struct AmazonS3Builder { @@ -238,12 +241,29 @@ impl Fs for AmazonS3 { Ok(()) } - async fn copy(&self, from: &Path, to_fs: &F, to: &Path) -> Result<(), Error> { - todo!() + async fn copy(&self, from: &Path, to: &Path) -> Result<(), Error> { + let from_file = S3File::new(self.clone(), from.clone()); + let from_file_size = from_file.size().await?; + + let upload = MultipartUpload::new(self.clone(), to.clone()); + upload + .upload_once( + from_file_size as usize, + UploadType::Copy { + endpoint: self.inner.options.endpoint.clone(), + from: from.clone(), + body: Empty::::new(), + }, + ) + .await?; + + Ok(()) } - async fn link(&self, from: &Path, to_fs: &F, to: &Path) -> Result<(), Error> { - todo!() + async fn link(&self, _: &Path, _: &F, _: &Path) -> Result<(), Error> { + Err(Error::Unsupported { + message: "s3 does not support link file".to_string(), + }) } } diff --git a/fusio/src/impls/remotes/aws/multipart_upload.rs b/fusio/src/impls/remotes/aws/multipart_upload.rs index 112d0f7..6aabf01 100644 --- a/fusio/src/impls/remotes/aws/multipart_upload.rs +++ b/fusio/src/impls/remotes/aws/multipart_upload.rs @@ -27,6 +27,16 @@ pub(crate) struct MultipartUpload { path: Path, } +pub enum UploadType { + Write(B), + Copy { + endpoint: String, + from: Path, + // FIXME: for Empty + body: B, + }, +} + impl MultipartUpload { pub fn new(fs: AmazonS3, path: Path) -> Self { Self { fs, path } @@ -67,22 +77,42 @@ impl MultipartUpload { Self::check_response(response).await } - pub(crate) async fn upload_once(&self, size: usize, body: B) -> Result<(), Error> + pub(crate) async fn upload_once( + &self, + size: usize, + upload_type: UploadType, + ) -> Result<(), Error> where B: Body + Clone + Unpin + Send + Sync + 'static, B::Error: std::error::Error + Send + Sync + 'static, { + let (body, copy_from) = match upload_type { + UploadType::Write(body) => (body, None), + UploadType::Copy { + endpoint, + from: file_path, + body, + } => { + let from_url = format!( + "{endpoint}/{}", + utf8_percent_encode(file_path.as_ref(), &STRICT_PATH_ENCODE_SET) + ); + (body, Some(from_url)) + } + }; let url = format!( "{}/{}", self.fs.as_ref().options.endpoint, utf8_percent_encode(self.path.as_ref(), &STRICT_PATH_ENCODE_SET) ); - let request = Request::builder() + let mut builder = Request::builder() .uri(url) .method(Method::PUT) - .header(CONTENT_LENGTH, size) - .body(body) - .map_err(|e| Error::Other(e.into()))?; + .header(CONTENT_LENGTH, size); + if let Some(from_url) = copy_from { + builder = builder.header("x-amz-copy-source", from_url); + } + let request = builder.body(body).map_err(|e| Error::Other(e.into()))?; let _ = self.send_request(request).await?; Ok(()) diff --git a/fusio/src/impls/remotes/aws/writer.rs b/fusio/src/impls/remotes/aws/writer.rs index 520a18d..f4b24c1 100644 --- a/fusio/src/impls/remotes/aws/writer.rs +++ b/fusio/src/impls/remotes/aws/writer.rs @@ -2,11 +2,14 @@ use std::{mem, pin::Pin, sync::Arc}; use bytes::{BufMut, BytesMut}; use futures_util::{stream::FuturesOrdered, StreamExt}; -use http_body_util::Full; +use http_body_util::{Empty, Full}; use crate::{ dynamic::MaybeSendFuture, - remotes::{aws::multipart_upload::MultipartUpload, serde::MultipartPart}, + remotes::{ + aws::multipart_upload::{MultipartUpload, UploadType}, + serde::MultipartPart, + }, Error, IoBuf, Write, }; @@ -90,7 +93,7 @@ impl Write for S3Writer { let bytes = mem::replace(&mut self.buf, BytesMut::new()).freeze(); self.inner - .upload_once(bytes.len(), Full::new(bytes)) + .upload_once(bytes.len(), UploadType::Write(Full::new(bytes))) .await?; } return Ok(()); diff --git a/fusio/src/lib.rs b/fusio/src/lib.rs index cc03c17..9babc6a 100644 --- a/fusio/src/lib.rs +++ b/fusio/src/lib.rs @@ -436,7 +436,6 @@ mod tests { src_fs .copy( &Path::from_absolute_path(&src_file_path)?, - &dst_fs, &Path::from_absolute_path(&dst_file_path)?, ) .await?;