From 01423a8acc5fbea5ee1252a023ef79f9cf48889b Mon Sep 17 00:00:00 2001 From: kkould <2435992353@qq.com> Date: Tue, 12 Nov 2024 08:58:45 +0000 Subject: [PATCH] fixe: `Fs::copy` on S3 --- fusio/src/impls/disk/opfs/fs.rs | 2 +- fusio/src/impls/remotes/aws/fs.rs | 76 +++++++++++++++++-- .../src/impls/remotes/aws/multipart_upload.rs | 32 +++++--- fusio/src/impls/remotes/aws/options.rs | 1 + fusio/src/impls/remotes/aws/s3.rs | 18 +++-- fusio/src/impls/remotes/aws/writer.rs | 6 +- 6 files changed, 110 insertions(+), 25 deletions(-) diff --git a/fusio/src/impls/disk/opfs/fs.rs b/fusio/src/impls/disk/opfs/fs.rs index d7be0ac..a1bc0d3 100644 --- a/fusio/src/impls/disk/opfs/fs.rs +++ b/fusio/src/impls/disk/opfs/fs.rs @@ -104,7 +104,7 @@ impl Fs for OPFS { Ok(()) } - async fn copy(&self, from: &Path, to: &Path) -> Result<(), Error> { + async fn copy(&self, from: &Path, to: &Path) -> Result<(), Error> { Err(Error::Unsupported { message: "opfs does not support copy file".to_string(), }) diff --git a/fusio/src/impls/remotes/aws/fs.rs b/fusio/src/impls/remotes/aws/fs.rs index f15575f..1005b9c 100644 --- a/fusio/src/impls/remotes/aws/fs.rs +++ b/fusio/src/impls/remotes/aws/fs.rs @@ -97,6 +97,7 @@ impl AmazonS3Builder { inner: Arc::new(AmazonS3Inner { options: S3Options { endpoint, + bucket: self.bucket, region: self.region, credential: self.credential, sign_payload: self.sign_payload, @@ -242,15 +243,11 @@ impl Fs for AmazonS3 { } async fn copy(&self, from: &Path, to: &Path) -> Result<(), Error> { - let from_file = S3File::new(self.clone(), from.clone()); - let from_file_size = from_file.size().await?; - let upload = MultipartUpload::new(self.clone(), to.clone()); upload .upload_once( - from_file_size as usize, UploadType::Copy { - endpoint: self.inner.options.endpoint.clone(), + bucket: self.inner.options.bucket.clone(), from: from.clone(), body: Empty::::new(), }, @@ -296,6 +293,9 @@ pub struct ListResponse { #[cfg(test)] mod tests { + use crate::fs::Fs; + use crate::path::Path; + #[cfg(feature = "tokio-http")] #[tokio::test] async fn list_and_remove() { @@ -329,4 +329,70 @@ mod tests { s3.remove(&meta.path).await.unwrap(); } } + + #[ignore] + #[cfg(all(feature = "tokio-http", not(feature = "completion-based")))] + #[tokio::test] + async fn copy() { + use std::sync::Arc; + + use crate::{ + remotes::{ + aws::{ + credential::AwsCredential, + fs::{AmazonS3, AmazonS3Inner}, + options::S3Options, + s3::S3File, + }, + http::{tokio::TokioClient, DynHttpClient, HttpClient}, + }, + Read, Write, + }; + + let key_id = "user".to_string(); + let secret_key = "password".to_string(); + + let client = TokioClient::new(); + let region = "ap-southeast-1"; + let options = S3Options { + endpoint: "http://localhost:9000/data".into(), + bucket: "data".to_string(), + credential: Some(AwsCredential { + key_id, + secret_key, + token: None, + }), + region: region.into(), + sign_payload: true, + checksum: false, + }; + + let s3 = AmazonS3 { + inner: Arc::new(AmazonS3Inner { + options, + client: Box::new(client) as Box, + }), + }; + + let from_path: Path = "read-write.txt".into(); + let to_path: Path = "read-write-copy.txt".into(); + { + let mut s3 = S3File::new(s3.clone(), from_path.clone()); + + let (result, _) = s3 + .write_all(&b"The answer of life, universe and everthing"[..]) + .await; + result.unwrap(); + s3.close().await.unwrap(); + } + s3.copy(&from_path, &to_path).await.unwrap(); + let mut s3 = S3File::new(s3, to_path.clone()); + + let size = s3.size().await.unwrap(); + assert_eq!(size, 42); + let buf = Vec::new(); + let (result, buf) = s3.read_to_end_at(buf, 0).await; + result.unwrap(); + assert_eq!(buf, b"The answer of life, universe and everthing"); + } } diff --git a/fusio/src/impls/remotes/aws/multipart_upload.rs b/fusio/src/impls/remotes/aws/multipart_upload.rs index 6aabf01..c3a5bab 100644 --- a/fusio/src/impls/remotes/aws/multipart_upload.rs +++ b/fusio/src/impls/remotes/aws/multipart_upload.rs @@ -28,9 +28,12 @@ pub(crate) struct MultipartUpload { } pub enum UploadType { - Write(B), + Write { + size: usize, + body: B, + }, Copy { - endpoint: String, + bucket: String, from: Path, // FIXME: for Empty body: B, @@ -79,25 +82,27 @@ impl MultipartUpload { pub(crate) async fn upload_once( &self, - size: usize, upload_type: UploadType, ) -> Result<(), Error> where B: Body + Clone + Unpin + Send + Sync + 'static, B::Error: std::error::Error + Send + Sync + 'static, { - let (body, copy_from) = match upload_type { - UploadType::Write(body) => (body, None), + let (size, body, copy_from) = match upload_type { + UploadType::Write { + size, + body + } => (Some(size), body, None), UploadType::Copy { - endpoint, - from: file_path, + bucket, + from, body, } => { let from_url = format!( - "{endpoint}/{}", - utf8_percent_encode(file_path.as_ref(), &STRICT_PATH_ENCODE_SET) + "/{bucket}/{}", + utf8_percent_encode(from.as_ref(), &STRICT_PATH_ENCODE_SET) ); - (body, Some(from_url)) + (None, body, Some(from_url)) } }; let url = format!( @@ -107,11 +112,14 @@ impl MultipartUpload { ); let mut builder = Request::builder() .uri(url) - .method(Method::PUT) - .header(CONTENT_LENGTH, size); + .method(Method::PUT); if let Some(from_url) = copy_from { builder = builder.header("x-amz-copy-source", from_url); } + // Tips: When the body is empty or the length is less than CONTENT_LENGTH, it may block + if let Some(size) = size { + builder = builder.header(CONTENT_LENGTH, size) + } let request = builder.body(body).map_err(|e| Error::Other(e.into()))?; let _ = self.send_request(request).await?; diff --git a/fusio/src/impls/remotes/aws/options.rs b/fusio/src/impls/remotes/aws/options.rs index 3e64355..89dd339 100644 --- a/fusio/src/impls/remotes/aws/options.rs +++ b/fusio/src/impls/remotes/aws/options.rs @@ -2,6 +2,7 @@ use super::credential::AwsCredential; pub(crate) struct S3Options { pub(crate) endpoint: String, + pub(crate) bucket: String, pub(crate) region: String, pub(crate) credential: Option, pub(crate) sign_payload: bool, diff --git a/fusio/src/impls/remotes/aws/s3.rs b/fusio/src/impls/remotes/aws/s3.rs index 3c7aa70..eb0f834 100644 --- a/fusio/src/impls/remotes/aws/s3.rs +++ b/fusio/src/impls/remotes/aws/s3.rs @@ -253,7 +253,8 @@ impl Write for S3File { #[cfg(test)] mod tests { - #[ignore] + use crate::Write; + #[cfg(all(feature = "tokio-http", not(feature = "completion-based")))] #[tokio::test] async fn write_and_read_s3_file() { @@ -279,6 +280,7 @@ mod tests { let region = "ap-southeast-1"; let options = S3Options { endpoint: "http://localhost:9000/data".into(), + bucket: "data".to_string(), credential: Some(AwsCredential { key_id, secret_key, @@ -296,12 +298,16 @@ mod tests { }), }; - let mut s3 = S3File::new(s3, "read-write.txt".into()); + { + let mut s3 = S3File::new(s3.clone(), "read-write.txt".into()); - let (result, _) = s3 - .write_all(&b"The answer of life, universe and everthing"[..]) - .await; - result.unwrap(); + let (result, _) = s3 + .write_all(&b"The answer of life, universe and everthing"[..]) + .await; + result.unwrap(); + s3.close().await.unwrap(); + } + let mut s3 = S3File::new(s3, "read-write.txt".into()); let size = s3.size().await.unwrap(); assert_eq!(size, 42); diff --git a/fusio/src/impls/remotes/aws/writer.rs b/fusio/src/impls/remotes/aws/writer.rs index f4b24c1..9d12dc0 100644 --- a/fusio/src/impls/remotes/aws/writer.rs +++ b/fusio/src/impls/remotes/aws/writer.rs @@ -93,7 +93,10 @@ impl Write for S3Writer { let bytes = mem::replace(&mut self.buf, BytesMut::new()).freeze(); self.inner - .upload_once(bytes.len(), UploadType::Write(Full::new(bytes))) + .upload_once(UploadType::Write { + size: bytes.len(), + body: Full::new(bytes), + }) .await?; } return Ok(()); @@ -143,6 +146,7 @@ mod tests { let region = "ap-southeast-2"; let options = S3Options { endpoint: "http://localhost:9000/data".into(), + bucket: "data".to_string(), credential: Some(AwsCredential { key_id: "user".to_string(), secret_key: "password".to_string(),