diff --git a/fusio/src/fs/mod.rs b/fusio/src/fs/mod.rs index 34d54ae..74bfcbe 100644 --- a/fusio/src/fs/mod.rs +++ b/fusio/src/fs/mod.rs @@ -78,18 +78,98 @@ where .open_options(to, OpenOptions::default().create(true).write(true)) .await?; let buf_size = cmp::min(from_file_size, 4 * 1024); - let mut buf = vec![0u8; buf_size]; + let mut buf = Some(vec![0u8; buf_size]); let mut read_pos = 0u64; while (read_pos as usize) < from_file_size - 1 { - let (result, _) = from_file.read_exact_at(buf.as_slice_mut(), read_pos).await; + let tmp = buf.take().unwrap(); + let (result, tmp) = from_file.read_exact_at(tmp, read_pos).await; result?; - read_pos += buf.len() as u64; + read_pos += tmp.len() as u64; - let (result, _) = to_file.write_all(buf.as_slice()).await; + let (result, tmp) = to_file.write_all(tmp).await; result?; - buf.resize(buf_size, 0); + buf = Some(tmp); } + to_file.close().await?; Ok(()) } + +#[cfg(test)] +mod tests { + + #[ignore] + #[cfg(all( + feature = "tokio-http", + feature = "tokio", + feature = "aws", + not(feature = "completion-based") + ))] + #[tokio::test] + async fn test_diff_fs_copy() -> Result<(), crate::Error> { + use std::sync::Arc; + + use tempfile::TempDir; + + use crate::{ + fs, + fs::{Fs, OpenOptions}, + impls::disk::tokio::fs::TokioFs, + path::Path, + remotes::{ + aws::{credential::AwsCredential, fs::AmazonS3, options::S3Options, s3::S3File}, + http::{tokio::TokioClient, DynHttpClient, HttpClient}, + }, + Read, Write, + }; + + let tmp_dir = TempDir::new()?; + let local_path = Path::from_absolute_path(&tmp_dir.as_ref().join("test.file"))?; + let s3_path: Path = "s3_copy_test.file".into(); + + let key_id = "user".to_string(); + let secret_key = "password".to_string(); + + let client = TokioClient::new(); + let region = "ap-southeast-1"; + let options = S3Options { + endpoint: "http://localhost:9000/data".into(), + bucket: "data".to_string(), + credential: Some(AwsCredential { + key_id, + secret_key, + token: None, + }), + region: region.into(), + sign_payload: true, + checksum: false, + }; + + let s3_fs = AmazonS3::new(Box::new(client), options); + let local_fs = TokioFs; + + { + let mut local_file = local_fs + .open_options(&local_path, OpenOptions::default().create(true).write(true)) + .await?; + local_file + .write_all("🎵never gonna give you up🎵".as_bytes()) + .await + .0?; + local_file.close().await.unwrap(); + } + fs::copy(&local_fs, &local_path, &s3_fs, &s3_path).await?; + + let mut s3 = S3File::new(s3_fs, s3_path.clone()); + + let size = s3.size().await.unwrap(); + assert_eq!(size, 31); + let buf = Vec::new(); + let (result, buf) = s3.read_to_end_at(buf, 0).await; + result.unwrap(); + assert_eq!(buf, "🎵never gonna give you up🎵".as_bytes()); + + Ok(()) + } +} diff --git a/fusio/src/impls/remotes/aws/fs.rs b/fusio/src/impls/remotes/aws/fs.rs index 1005b9c..1324948 100644 --- a/fusio/src/impls/remotes/aws/fs.rs +++ b/fusio/src/impls/remotes/aws/fs.rs @@ -125,6 +125,14 @@ pub(super) struct AmazonS3Inner { pub(super) client: Box, } +impl AmazonS3 { + pub fn new(client: Box, options: S3Options) -> Self { + AmazonS3 { + inner: Arc::new(AmazonS3Inner { options, client }), + } + } +} + impl Fs for AmazonS3 { type File = S3File; @@ -245,13 +253,11 @@ impl Fs for AmazonS3 { async fn copy(&self, from: &Path, to: &Path) -> Result<(), Error> { let upload = MultipartUpload::new(self.clone(), to.clone()); upload - .upload_once( - UploadType::Copy { - bucket: self.inner.options.bucket.clone(), - from: from.clone(), - body: Empty::::new(), - }, - ) + .upload_once(UploadType::Copy { + bucket: self.inner.options.bucket.clone(), + from: from.clone(), + body: Empty::::new(), + }) .await?; Ok(()) @@ -293,8 +299,7 @@ pub struct ListResponse { #[cfg(test)] mod tests { - use crate::fs::Fs; - use crate::path::Path; + use crate::{fs::Fs, path::Path}; #[cfg(feature = "tokio-http")] #[tokio::test] diff --git a/fusio/src/impls/remotes/aws/mod.rs b/fusio/src/impls/remotes/aws/mod.rs index a3b842c..3215d38 100644 --- a/fusio/src/impls/remotes/aws/mod.rs +++ b/fusio/src/impls/remotes/aws/mod.rs @@ -4,7 +4,7 @@ mod error; pub mod fs; pub(crate) mod multipart_upload; pub(crate) mod options; -mod s3; +pub(crate) mod s3; pub(crate) mod sign; pub(crate) mod writer; diff --git a/fusio/src/impls/remotes/aws/multipart_upload.rs b/fusio/src/impls/remotes/aws/multipart_upload.rs index c3a5bab..0b1b7eb 100644 --- a/fusio/src/impls/remotes/aws/multipart_upload.rs +++ b/fusio/src/impls/remotes/aws/multipart_upload.rs @@ -80,24 +80,14 @@ impl MultipartUpload { Self::check_response(response).await } - pub(crate) async fn upload_once( - &self, - upload_type: UploadType, - ) -> Result<(), Error> + pub(crate) async fn upload_once(&self, upload_type: UploadType) -> Result<(), Error> where B: Body + Clone + Unpin + Send + Sync + 'static, B::Error: std::error::Error + Send + Sync + 'static, { let (size, body, copy_from) = match upload_type { - UploadType::Write { - size, - body - } => (Some(size), body, None), - UploadType::Copy { - bucket, - from, - body, - } => { + UploadType::Write { size, body } => (Some(size), body, None), + UploadType::Copy { bucket, from, body } => { let from_url = format!( "/{bucket}/{}", utf8_percent_encode(from.as_ref(), &STRICT_PATH_ENCODE_SET) @@ -110,9 +100,7 @@ impl MultipartUpload { self.fs.as_ref().options.endpoint, utf8_percent_encode(self.path.as_ref(), &STRICT_PATH_ENCODE_SET) ); - let mut builder = Request::builder() - .uri(url) - .method(Method::PUT); + let mut builder = Request::builder().uri(url).method(Method::PUT); if let Some(from_url) = copy_from { builder = builder.header("x-amz-copy-source", from_url); }