Skip to content

Commit

Permalink
test: diff filesystem copy
Browse files Browse the repository at this point in the history
  • Loading branch information
KKould committed Nov 12, 2024
1 parent 01423a8 commit a47cac2
Show file tree
Hide file tree
Showing 4 changed files with 104 additions and 31 deletions.
90 changes: 85 additions & 5 deletions fusio/src/fs/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -78,18 +78,98 @@ where
.open_options(to, OpenOptions::default().create(true).write(true))
.await?;
let buf_size = cmp::min(from_file_size, 4 * 1024);
let mut buf = vec![0u8; buf_size];
let mut buf = Some(vec![0u8; buf_size]);
let mut read_pos = 0u64;

while (read_pos as usize) < from_file_size - 1 {
let (result, _) = from_file.read_exact_at(buf.as_slice_mut(), read_pos).await;
let tmp = buf.take().unwrap();
let (result, tmp) = from_file.read_exact_at(tmp, read_pos).await;
result?;
read_pos += buf.len() as u64;
read_pos += tmp.len() as u64;

let (result, _) = to_file.write_all(buf.as_slice()).await;
let (result, tmp) = to_file.write_all(tmp).await;
result?;
buf.resize(buf_size, 0);
buf = Some(tmp);
}
to_file.close().await?;

Ok(())
}

#[cfg(test)]
mod tests {

#[ignore]
#[cfg(all(
feature = "tokio-http",
feature = "tokio",
feature = "aws",
not(feature = "completion-based")
))]
#[tokio::test]
async fn test_diff_fs_copy() -> Result<(), crate::Error> {
use std::sync::Arc;

use tempfile::TempDir;

use crate::{
fs,
fs::{Fs, OpenOptions},
impls::disk::tokio::fs::TokioFs,
path::Path,
remotes::{
aws::{credential::AwsCredential, fs::AmazonS3, options::S3Options, s3::S3File},
http::{tokio::TokioClient, DynHttpClient, HttpClient},
},
Read, Write,
};

let tmp_dir = TempDir::new()?;
let local_path = Path::from_absolute_path(&tmp_dir.as_ref().join("test.file"))?;
let s3_path: Path = "s3_copy_test.file".into();

let key_id = "user".to_string();
let secret_key = "password".to_string();

let client = TokioClient::new();
let region = "ap-southeast-1";
let options = S3Options {
endpoint: "http://localhost:9000/data".into(),
bucket: "data".to_string(),
credential: Some(AwsCredential {
key_id,
secret_key,
token: None,
}),
region: region.into(),
sign_payload: true,
checksum: false,
};

let s3_fs = AmazonS3::new(Box::new(client), options);
let local_fs = TokioFs;

{
let mut local_file = local_fs
.open_options(&local_path, OpenOptions::default().create(true).write(true))
.await?;
local_file
.write_all("🎵never gonna give you up🎵".as_bytes())
.await
.0?;
local_file.close().await.unwrap();
}
fs::copy(&local_fs, &local_path, &s3_fs, &s3_path).await?;

let mut s3 = S3File::new(s3_fs, s3_path.clone());

let size = s3.size().await.unwrap();
assert_eq!(size, 31);
let buf = Vec::new();
let (result, buf) = s3.read_to_end_at(buf, 0).await;
result.unwrap();
assert_eq!(buf, "🎵never gonna give you up🎵".as_bytes());

Ok(())
}
}
23 changes: 14 additions & 9 deletions fusio/src/impls/remotes/aws/fs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -125,6 +125,14 @@ pub(super) struct AmazonS3Inner {
pub(super) client: Box<dyn DynHttpClient>,
}

impl AmazonS3 {
pub fn new(client: Box<dyn DynHttpClient>, options: S3Options) -> Self {
AmazonS3 {
inner: Arc::new(AmazonS3Inner { options, client }),
}
}
}

impl Fs for AmazonS3 {
type File = S3File;

Expand Down Expand Up @@ -245,13 +253,11 @@ impl Fs for AmazonS3 {
async fn copy(&self, from: &Path, to: &Path) -> Result<(), Error> {
let upload = MultipartUpload::new(self.clone(), to.clone());
upload
.upload_once(
UploadType::Copy {
bucket: self.inner.options.bucket.clone(),
from: from.clone(),
body: Empty::<Bytes>::new(),
},
)
.upload_once(UploadType::Copy {
bucket: self.inner.options.bucket.clone(),
from: from.clone(),
body: Empty::<Bytes>::new(),
})
.await?;

Ok(())
Expand Down Expand Up @@ -293,8 +299,7 @@ pub struct ListResponse {

#[cfg(test)]
mod tests {
use crate::fs::Fs;
use crate::path::Path;
use crate::{fs::Fs, path::Path};

#[cfg(feature = "tokio-http")]
#[tokio::test]
Expand Down
2 changes: 1 addition & 1 deletion fusio/src/impls/remotes/aws/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ mod error;
pub mod fs;
pub(crate) mod multipart_upload;
pub(crate) mod options;
mod s3;
pub(crate) mod s3;
pub(crate) mod sign;
pub(crate) mod writer;

Expand Down
20 changes: 4 additions & 16 deletions fusio/src/impls/remotes/aws/multipart_upload.rs
Original file line number Diff line number Diff line change
Expand Up @@ -80,24 +80,14 @@ impl MultipartUpload {
Self::check_response(response).await
}

pub(crate) async fn upload_once<B>(
&self,
upload_type: UploadType<B>,
) -> Result<(), Error>
pub(crate) async fn upload_once<B>(&self, upload_type: UploadType<B>) -> Result<(), Error>
where
B: Body<Data = Bytes> + Clone + Unpin + Send + Sync + 'static,
B::Error: std::error::Error + Send + Sync + 'static,
{
let (size, body, copy_from) = match upload_type {
UploadType::Write {
size,
body
} => (Some(size), body, None),
UploadType::Copy {
bucket,
from,
body,
} => {
UploadType::Write { size, body } => (Some(size), body, None),
UploadType::Copy { bucket, from, body } => {
let from_url = format!(
"/{bucket}/{}",
utf8_percent_encode(from.as_ref(), &STRICT_PATH_ENCODE_SET)
Expand All @@ -110,9 +100,7 @@ impl MultipartUpload {
self.fs.as_ref().options.endpoint,
utf8_percent_encode(self.path.as_ref(), &STRICT_PATH_ENCODE_SET)
);
let mut builder = Request::builder()
.uri(url)
.method(Method::PUT);
let mut builder = Request::builder().uri(url).method(Method::PUT);
if let Some(from_url) = copy_from {
builder = builder.header("x-amz-copy-source", from_url);
}
Expand Down

0 comments on commit a47cac2

Please sign in to comment.