Skip to content

Commit

Permalink
fixe: Fs::copy on S3
Browse files Browse the repository at this point in the history
  • Loading branch information
KKould committed Nov 12, 2024
1 parent 38d70aa commit 01423a8
Show file tree
Hide file tree
Showing 6 changed files with 110 additions and 25 deletions.
2 changes: 1 addition & 1 deletion fusio/src/impls/disk/opfs/fs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,7 @@ impl Fs for OPFS {
Ok(())
}

async fn copy<F: Fs>(&self, from: &Path, to: &Path) -> Result<(), Error> {
async fn copy(&self, from: &Path, to: &Path) -> Result<(), Error> {
Err(Error::Unsupported {
message: "opfs does not support copy file".to_string(),
})
Expand Down
76 changes: 71 additions & 5 deletions fusio/src/impls/remotes/aws/fs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,7 @@ impl AmazonS3Builder {
inner: Arc::new(AmazonS3Inner {
options: S3Options {
endpoint,
bucket: self.bucket,
region: self.region,
credential: self.credential,
sign_payload: self.sign_payload,
Expand Down Expand Up @@ -242,15 +243,11 @@ impl Fs for AmazonS3 {
}

async fn copy(&self, from: &Path, to: &Path) -> Result<(), Error> {
let from_file = S3File::new(self.clone(), from.clone());
let from_file_size = from_file.size().await?;

let upload = MultipartUpload::new(self.clone(), to.clone());
upload
.upload_once(
from_file_size as usize,
UploadType::Copy {
endpoint: self.inner.options.endpoint.clone(),
bucket: self.inner.options.bucket.clone(),
from: from.clone(),
body: Empty::<Bytes>::new(),
},
Expand Down Expand Up @@ -296,6 +293,9 @@ pub struct ListResponse {

#[cfg(test)]
mod tests {
use crate::fs::Fs;
use crate::path::Path;

#[cfg(feature = "tokio-http")]
#[tokio::test]
async fn list_and_remove() {
Expand Down Expand Up @@ -329,4 +329,70 @@ mod tests {
s3.remove(&meta.path).await.unwrap();
}
}

#[ignore]
#[cfg(all(feature = "tokio-http", not(feature = "completion-based")))]
#[tokio::test]
async fn copy() {
use std::sync::Arc;

use crate::{
remotes::{
aws::{
credential::AwsCredential,
fs::{AmazonS3, AmazonS3Inner},
options::S3Options,
s3::S3File,
},
http::{tokio::TokioClient, DynHttpClient, HttpClient},
},
Read, Write,
};

let key_id = "user".to_string();
let secret_key = "password".to_string();

let client = TokioClient::new();
let region = "ap-southeast-1";
let options = S3Options {
endpoint: "http://localhost:9000/data".into(),
bucket: "data".to_string(),
credential: Some(AwsCredential {
key_id,
secret_key,
token: None,
}),
region: region.into(),
sign_payload: true,
checksum: false,
};

let s3 = AmazonS3 {
inner: Arc::new(AmazonS3Inner {
options,
client: Box::new(client) as Box<dyn DynHttpClient>,
}),
};

let from_path: Path = "read-write.txt".into();
let to_path: Path = "read-write-copy.txt".into();
{
let mut s3 = S3File::new(s3.clone(), from_path.clone());

let (result, _) = s3
.write_all(&b"The answer of life, universe and everthing"[..])
.await;
result.unwrap();
s3.close().await.unwrap();
}
s3.copy(&from_path, &to_path).await.unwrap();
let mut s3 = S3File::new(s3, to_path.clone());

let size = s3.size().await.unwrap();
assert_eq!(size, 42);
let buf = Vec::new();
let (result, buf) = s3.read_to_end_at(buf, 0).await;
result.unwrap();
assert_eq!(buf, b"The answer of life, universe and everthing");
}
}
32 changes: 20 additions & 12 deletions fusio/src/impls/remotes/aws/multipart_upload.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,9 +28,12 @@ pub(crate) struct MultipartUpload {
}

pub enum UploadType<B> {
Write(B),
Write {
size: usize,
body: B,
},
Copy {
endpoint: String,
bucket: String,
from: Path,
// FIXME: for Empty
body: B,
Expand Down Expand Up @@ -79,25 +82,27 @@ impl MultipartUpload {

pub(crate) async fn upload_once<B>(
&self,
size: usize,
upload_type: UploadType<B>,
) -> Result<(), Error>
where
B: Body<Data = Bytes> + Clone + Unpin + Send + Sync + 'static,
B::Error: std::error::Error + Send + Sync + 'static,
{
let (body, copy_from) = match upload_type {
UploadType::Write(body) => (body, None),
let (size, body, copy_from) = match upload_type {
UploadType::Write {
size,
body
} => (Some(size), body, None),
UploadType::Copy {
endpoint,
from: file_path,
bucket,
from,
body,
} => {
let from_url = format!(
"{endpoint}/{}",
utf8_percent_encode(file_path.as_ref(), &STRICT_PATH_ENCODE_SET)
"/{bucket}/{}",
utf8_percent_encode(from.as_ref(), &STRICT_PATH_ENCODE_SET)
);
(body, Some(from_url))
(None, body, Some(from_url))
}
};
let url = format!(
Expand All @@ -107,11 +112,14 @@ impl MultipartUpload {
);
let mut builder = Request::builder()
.uri(url)
.method(Method::PUT)
.header(CONTENT_LENGTH, size);
.method(Method::PUT);
if let Some(from_url) = copy_from {
builder = builder.header("x-amz-copy-source", from_url);
}
// Tips: When the body is empty or the length is less than CONTENT_LENGTH, it may block
if let Some(size) = size {
builder = builder.header(CONTENT_LENGTH, size)
}
let request = builder.body(body).map_err(|e| Error::Other(e.into()))?;
let _ = self.send_request(request).await?;

Expand Down
1 change: 1 addition & 0 deletions fusio/src/impls/remotes/aws/options.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ use super::credential::AwsCredential;

pub(crate) struct S3Options {
pub(crate) endpoint: String,
pub(crate) bucket: String,
pub(crate) region: String,
pub(crate) credential: Option<AwsCredential>,
pub(crate) sign_payload: bool,
Expand Down
18 changes: 12 additions & 6 deletions fusio/src/impls/remotes/aws/s3.rs
Original file line number Diff line number Diff line change
Expand Up @@ -253,7 +253,8 @@ impl Write for S3File {

#[cfg(test)]
mod tests {
#[ignore]
use crate::Write;

#[cfg(all(feature = "tokio-http", not(feature = "completion-based")))]
#[tokio::test]
async fn write_and_read_s3_file() {
Expand All @@ -279,6 +280,7 @@ mod tests {
let region = "ap-southeast-1";
let options = S3Options {
endpoint: "http://localhost:9000/data".into(),
bucket: "data".to_string(),
credential: Some(AwsCredential {
key_id,
secret_key,
Expand All @@ -296,12 +298,16 @@ mod tests {
}),
};

let mut s3 = S3File::new(s3, "read-write.txt".into());
{
let mut s3 = S3File::new(s3.clone(), "read-write.txt".into());

let (result, _) = s3
.write_all(&b"The answer of life, universe and everthing"[..])
.await;
result.unwrap();
let (result, _) = s3
.write_all(&b"The answer of life, universe and everthing"[..])
.await;
result.unwrap();
s3.close().await.unwrap();
}
let mut s3 = S3File::new(s3, "read-write.txt".into());

let size = s3.size().await.unwrap();
assert_eq!(size, 42);
Expand Down
6 changes: 5 additions & 1 deletion fusio/src/impls/remotes/aws/writer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,10 @@ impl Write for S3Writer {
let bytes = mem::replace(&mut self.buf, BytesMut::new()).freeze();

self.inner
.upload_once(bytes.len(), UploadType::Write(Full::new(bytes)))
.upload_once(UploadType::Write {
size: bytes.len(),
body: Full::new(bytes),
})
.await?;
}
return Ok(());
Expand Down Expand Up @@ -143,6 +146,7 @@ mod tests {
let region = "ap-southeast-2";
let options = S3Options {
endpoint: "http://localhost:9000/data".into(),
bucket: "data".to_string(),
credential: Some(AwsCredential {
key_id: "user".to_string(),
secret_key: "password".to_string(),
Expand Down

0 comments on commit 01423a8

Please sign in to comment.