Skip to content

Commit

Permalink
feat: impl copy for S3 & add fs::copy for different filesystems
Browse files Browse the repository at this point in the history
  • Loading branch information
KKould committed Nov 12, 2024
1 parent 6a34e6a commit 38d70aa
Show file tree
Hide file tree
Showing 11 changed files with 137 additions and 83 deletions.
28 changes: 14 additions & 14 deletions fusio-object-store/src/fs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -69,21 +69,21 @@ impl<O: ObjectStore> Fs for S3Store<O> {
Ok(())
}

fn copy<F: Fs>(
&self,
from: &Path,
to_fs: &F,
to: &Path,
) -> impl Future<Output = Result<(), Error>> + MaybeSend {
todo!()
async fn copy(&self, from: &Path, to: &Path) -> Result<(), Error> {
let from = from.clone().into();
let to = to.clone().into();

self.inner
.copy(&from, &to)
.await
.map_err(BoxedError::from)?;

Ok(())
}

fn link<F: Fs>(
&self,
from: &Path,
to_fs: &F,
to: &Path,
) -> impl Future<Output = Result<(), Error>> + MaybeSend {
todo!()
async fn link<F: Fs>(&self, _: &Path, _: &F, _: &Path) -> Result<(), Error> {
Err(Error::Unsupported {
message: "s3 does not support link file".to_string(),
})
}
}
9 changes: 2 additions & 7 deletions fusio-opendal/src/fs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -64,13 +64,8 @@ impl Fs for OpendalFs {
.map_err(parse_opendal_error)
}

fn copy<F: Fs>(
&self,
from: &Path,
to_fs: &F,
to: &Path,
) -> impl Future<Output = Result<(), Error>> + MaybeSend {
todo!()
fn copy(&self, from: &Path, to: &Path) -> impl Future<Output = Result<(), Error>> + MaybeSend {
self.op.copy(from.as_ref(), to.as_ref())
}

fn link<F: Fs>(
Expand Down
45 changes: 37 additions & 8 deletions fusio/src/fs/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,12 +3,12 @@
mod options;

use std::future::Future;
use std::{cmp, future::Future};

use futures_core::Stream;
pub use options::*;

use crate::{path::Path, Error, MaybeSend, MaybeSync, Read, Write};
use crate::{path::Path, Error, IoBufMut, MaybeSend, MaybeSync, Read, Write};

#[derive(Debug)]
pub struct FileMeta {
Expand Down Expand Up @@ -50,12 +50,7 @@ pub trait Fs: MaybeSend + MaybeSync {

fn remove(&self, path: &Path) -> impl Future<Output = Result<(), Error>> + MaybeSend;

fn copy<F: Fs>(
&self,
from: &Path,
to_fs: &F,
to: &Path,
) -> impl Future<Output = Result<(), Error>> + MaybeSend;
fn copy(&self, from: &Path, to: &Path) -> impl Future<Output = Result<(), Error>> + MaybeSend;

fn link<F: Fs>(
&self,
Expand All @@ -64,3 +59,37 @@ pub trait Fs: MaybeSend + MaybeSync {
to: &Path,
) -> impl Future<Output = Result<(), Error>> + MaybeSend;
}

pub async fn copy<F, T>(from_fs: &F, from: &Path, to_fs: &T, to: &Path) -> Result<(), Error>
where
F: Fs,
T: Fs,
{
if from_fs.file_system() == to_fs.file_system() {
from_fs.copy(from, to).await?;
return Ok(());
}
let mut from_file = from_fs
.open_options(from, OpenOptions::default().read(true))
.await?;
let from_file_size = from_file.size().await? as usize;

let mut to_file = to_fs
.open_options(to, OpenOptions::default().create(true).write(true))
.await?;
let buf_size = cmp::min(from_file_size, 4 * 1024);
let mut buf = vec![0u8; buf_size];
let mut read_pos = 0u64;

while (read_pos as usize) < from_file_size - 1 {
let (result, _) = from_file.read_exact_at(buf.as_slice_mut(), read_pos).await;
result?;
read_pos += buf.len() as u64;

let (result, _) = to_file.write_all(buf.as_slice()).await;
result?;
buf.resize(buf_size, 0);
}

Ok(())
}
14 changes: 5 additions & 9 deletions fusio/src/impls/disk/monoio/fs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -61,15 +61,11 @@ impl Fs for MonoIoFs {
Ok(fs::remove_file(path)?)
}

async fn copy<F: Fs>(&self, from: &Path, to_fs: &F, to: &Path) -> Result<(), Error> {
if self.file_system() == to_fs.file_system() {
let from = path_to_local(from)?;
let to = path_to_local(to)?;

fs::copy(&from, &to)?;
} else {
todo!()
}
async fn copy(&self, from: &Path, to: &Path) -> Result<(), Error> {
let from = path_to_local(from)?;
let to = path_to_local(to)?;

fs::copy(&from, &to)?;

Ok(())
}
Expand Down
14 changes: 2 additions & 12 deletions fusio/src/impls/disk/opfs/fs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -104,23 +104,13 @@ impl Fs for OPFS {
Ok(())
}

fn copy<F: Fs>(
&self,
from: &Path,
to_fs: &F,
to: &Path,
) -> impl Future<Output = Result<(), Error>> + MaybeSend {
async fn copy<F: Fs>(&self, from: &Path, to: &Path) -> Result<(), Error> {
Err(Error::Unsupported {
message: "opfs does not support copy file".to_string(),
})
}

fn link<F: Fs>(
&self,
from: &Path,
to_fs: &F,
to: &Path,
) -> impl Future<Output = Result<(), Error>> + MaybeSend {
async fn link<F: Fs>(&self, from: &Path, to_fs: &F, to: &Path) -> Result<(), Error> {
Err(Error::Unsupported {
message: "opfs does not support link file".to_string(),
})
Expand Down
14 changes: 5 additions & 9 deletions fusio/src/impls/disk/tokio/fs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -72,15 +72,11 @@ impl Fs for TokioFs {
Ok(())
}

async fn copy<F: Fs>(&self, from: &Path, to_fs: &F, to: &Path) -> Result<(), Error> {
if self.file_system() == to_fs.file_system() {
let from = path_to_local(from)?;
let to = path_to_local(to)?;

tokio::fs::copy(&from, &to).await?;
} else {
todo!()
}
async fn copy(&self, from: &Path, to: &Path) -> Result<(), Error> {
let from = path_to_local(from)?;
let to = path_to_local(to)?;

tokio::fs::copy(&from, &to).await?;

Ok(())
}
Expand Down
14 changes: 5 additions & 9 deletions fusio/src/impls/disk/tokio_uring/fs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -65,15 +65,11 @@ impl Fs for TokioUringFs {
Ok(remove_file(path).await?)
}

async fn copy<F: Fs>(&self, from: &Path, to_fs: &F, to: &Path) -> Result<(), Error> {
if self.file_system() == to_fs.file_system() {
let from = path_to_local(from)?;
let to = path_to_local(to)?;

fs::copy(&from, &to)?;
} else {
todo!()
}
async fn copy(&self, from: &Path, to: &Path) -> Result<(), Error> {
let from = path_to_local(from)?;
let to = path_to_local(to)?;

fs::copy(&from, &to)?;

Ok(())
}
Expand Down
32 changes: 26 additions & 6 deletions fusio/src/impls/remotes/aws/fs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,10 +14,13 @@ use crate::{
fs::{FileMeta, FileSystemTag, Fs, OpenOptions},
path::Path,
remotes::{
aws::sign::Sign,
aws::{
multipart_upload::{MultipartUpload, UploadType},
sign::Sign,
},
http::{DynHttpClient, HttpClient, HttpError},
},
Error,
Error, Read,
};

pub struct AmazonS3Builder {
Expand Down Expand Up @@ -238,12 +241,29 @@ impl Fs for AmazonS3 {
Ok(())
}

async fn copy<F: Fs>(&self, from: &Path, to_fs: &F, to: &Path) -> Result<(), Error> {
todo!()
async fn copy(&self, from: &Path, to: &Path) -> Result<(), Error> {
let from_file = S3File::new(self.clone(), from.clone());
let from_file_size = from_file.size().await?;

let upload = MultipartUpload::new(self.clone(), to.clone());
upload
.upload_once(
from_file_size as usize,
UploadType::Copy {
endpoint: self.inner.options.endpoint.clone(),
from: from.clone(),
body: Empty::<Bytes>::new(),
},
)
.await?;

Ok(())
}

async fn link<F: Fs>(&self, from: &Path, to_fs: &F, to: &Path) -> Result<(), Error> {
todo!()
async fn link<F: Fs>(&self, _: &Path, _: &F, _: &Path) -> Result<(), Error> {
Err(Error::Unsupported {
message: "s3 does not support link file".to_string(),
})
}
}

Expand Down
40 changes: 35 additions & 5 deletions fusio/src/impls/remotes/aws/multipart_upload.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,16 @@ pub(crate) struct MultipartUpload {
path: Path,
}

pub enum UploadType<B> {
Write(B),
Copy {
endpoint: String,
from: Path,
// FIXME: for Empty
body: B,
},
}

impl MultipartUpload {
pub fn new(fs: AmazonS3, path: Path) -> Self {
Self { fs, path }
Expand Down Expand Up @@ -67,22 +77,42 @@ impl MultipartUpload {
Self::check_response(response).await
}

pub(crate) async fn upload_once<B>(&self, size: usize, body: B) -> Result<(), Error>
pub(crate) async fn upload_once<B>(
&self,
size: usize,
upload_type: UploadType<B>,
) -> Result<(), Error>
where
B: Body<Data = Bytes> + Clone + Unpin + Send + Sync + 'static,
B::Error: std::error::Error + Send + Sync + 'static,
{
let (body, copy_from) = match upload_type {
UploadType::Write(body) => (body, None),
UploadType::Copy {
endpoint,
from: file_path,
body,
} => {
let from_url = format!(
"{endpoint}/{}",
utf8_percent_encode(file_path.as_ref(), &STRICT_PATH_ENCODE_SET)
);
(body, Some(from_url))
}
};
let url = format!(
"{}/{}",
self.fs.as_ref().options.endpoint,
utf8_percent_encode(self.path.as_ref(), &STRICT_PATH_ENCODE_SET)
);
let request = Request::builder()
let mut builder = Request::builder()
.uri(url)
.method(Method::PUT)
.header(CONTENT_LENGTH, size)
.body(body)
.map_err(|e| Error::Other(e.into()))?;
.header(CONTENT_LENGTH, size);
if let Some(from_url) = copy_from {
builder = builder.header("x-amz-copy-source", from_url);
}
let request = builder.body(body).map_err(|e| Error::Other(e.into()))?;
let _ = self.send_request(request).await?;

Ok(())
Expand Down
9 changes: 6 additions & 3 deletions fusio/src/impls/remotes/aws/writer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,14 @@ use std::{mem, pin::Pin, sync::Arc};

use bytes::{BufMut, BytesMut};
use futures_util::{stream::FuturesOrdered, StreamExt};
use http_body_util::Full;
use http_body_util::{Empty, Full};

use crate::{
dynamic::MaybeSendFuture,
remotes::{aws::multipart_upload::MultipartUpload, serde::MultipartPart},
remotes::{
aws::multipart_upload::{MultipartUpload, UploadType},
serde::MultipartPart,
},
Error, IoBuf, Write,
};

Expand Down Expand Up @@ -90,7 +93,7 @@ impl Write for S3Writer {
let bytes = mem::replace(&mut self.buf, BytesMut::new()).freeze();

self.inner
.upload_once(bytes.len(), Full::new(bytes))
.upload_once(bytes.len(), UploadType::Write(Full::new(bytes)))
.await?;
}
return Ok(());
Expand Down
1 change: 0 additions & 1 deletion fusio/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -436,7 +436,6 @@ mod tests {
src_fs
.copy(
&Path::from_absolute_path(&src_file_path)?,
&dst_fs,
&Path::from_absolute_path(&dst_file_path)?,
)
.await?;
Expand Down

0 comments on commit 38d70aa

Please sign in to comment.