Skip to content

Commit

Permalink
feat: Fs adds copy & link (#100)
Browse files Browse the repository at this point in the history
* feat: `Fs` adds `copy` & `link`, `TokioFs` & `TokioUringFs` & `Monoio` have been implemented

* feat: impl `copy` for S3 & add `fs::copy` for different filesystems

* fix: `Fs::copy` on S3

* test: diff filesystem copy

* fix: ci fail on opendal

* test: add fs test for monoio & tokio uring

* test: add fs test for monoio & tokio uring

* chore: add `copy` & `link` for `DynFs`

* chore: remove `Fs::link` arg `to_fs`

* fix: fix the semantics of append writing when TokioFs::open_options (write from scratch after reopening)

* fix: MonoioFS execute blocking task without thread pool attached

* fix: test_local_fs_copy_link ci fail

* chore: move copy to dynamic

* clippy

---------

Co-authored-by: Gwo Tzu-Hsing <[email protected]>
  • Loading branch information
KKould and ethe authored Nov 19, 2024
1 parent d7abd2e commit 482b392
Show file tree
Hide file tree
Showing 16 changed files with 645 additions and 38 deletions.
24 changes: 23 additions & 1 deletion fusio-object-store/src/fs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ use std::sync::Arc;

use async_stream::stream;
use fusio::{
fs::{FileMeta, Fs, OpenOptions},
fs::{FileMeta, FileSystemTag, Fs, OpenOptions},
path::Path,
Error,
};
Expand All @@ -27,6 +27,10 @@ impl<O: ObjectStore> From<O> for S3Store<O> {
impl<O: ObjectStore> Fs for S3Store<O> {
type File = S3File<O>;

/// Reports this store as an S3 file system by returning `FileSystemTag::S3`.
fn file_system(&self) -> FileSystemTag {
FileSystemTag::S3
}

async fn open_options(&self, path: &Path, options: OpenOptions) -> Result<Self::File, Error> {
if !options.truncate {
return Err(Error::Unsupported {
Expand Down Expand Up @@ -64,4 +68,22 @@ impl<O: ObjectStore> Fs for S3Store<O> {

Ok(())
}

/// Copies the object stored at `from` to `to` through the wrapped object store.
///
/// Both paths are cloned and converted into the path type expected by the
/// inner store before the copy is issued.
///
/// # Errors
///
/// Any error reported by the inner store's `copy` is boxed and returned.
async fn copy(&self, from: &Path, to: &Path) -> Result<(), Error> {
    let src = from.clone().into();
    let dst = to.clone().into();

    let outcome = self.inner.copy(&src, &dst).await;
    outcome.map_err(BoxedError::from)?;

    Ok(())
}

async fn link(&self, _: &Path, _: &Path) -> Result<(), Error> {
Err(Error::Unsupported {
message: "s3 does not support link file".to_string(),
})
}
}
19 changes: 18 additions & 1 deletion fusio-opendal/src/fs.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
use fusio::{
fs::{FileMeta, Fs, OpenOptions},
fs::{FileMeta, FileSystemTag, Fs, OpenOptions},
path::Path,
Error,
};
Expand All @@ -25,6 +25,10 @@ impl From<Operator> for OpendalFs {
impl Fs for OpendalFs {
type File = OpendalFile;

/// Returns the tag describing the kind of file system behind this operator.
///
/// NOTE(review): this is still a stub — `todo!()` panics whenever the method
/// is called. The cross-file-system `copy` helper in `fusio/src/dynamic/fs.rs`
/// calls `file_system()` on both of its arguments, so passing an `OpendalFs`
/// to it appears to panic before any data is copied. None of the existing
/// `FileSystemTag` variants obviously fits an arbitrary OpenDAL backend —
/// confirm the intended tag before replacing the stub.
fn file_system(&self) -> FileSystemTag {
todo!()
}

async fn open_options(&self, path: &Path, options: OpenOptions) -> Result<Self::File, Error> {
OpendalFile::open(self.op.clone(), path.to_string(), options).await
}
Expand Down Expand Up @@ -57,4 +61,17 @@ impl Fs for OpendalFs {
.await
.map_err(parse_opendal_error)
}

/// Copies `from` to `to` using the underlying OpenDAL operator.
///
/// # Errors
///
/// An error from the operator is translated with `parse_opendal_error`.
async fn copy(&self, from: &Path, to: &Path) -> Result<(), Error> {
    let src: &str = from.as_ref();
    let dst: &str = to.as_ref();

    let outcome = self.op.copy(src, dst).await;
    outcome.map_err(parse_opendal_error)
}

async fn link(&self, _from: &Path, _to: &Path) -> Result<(), Error> {
Err(Error::Unsupported {
message: "opendal does not support link file".to_string(),
})
}
}
2 changes: 1 addition & 1 deletion fusio/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,7 @@ hyper = { version = "1", optional = true, default-features = false, features = [
"http2",
] }
itertools = { version = "0.13" }
monoio = { version = "0.2", optional = true }
monoio = { version = "0.2", optional = true, features = ["sync"] }
object_store = { version = "0.11", optional = true, features = ["aws"] }
percent-encoding = { version = "2", default-features = false }
quick-xml = { version = "0.36", features = [
Expand Down
75 changes: 73 additions & 2 deletions fusio/src/dynamic/fs.rs
Original file line number Diff line number Diff line change
@@ -1,11 +1,11 @@
use std::pin::Pin;
use std::{cmp, pin::Pin, sync::Arc};

use futures_core::Stream;

use super::MaybeSendFuture;
use crate::{
buf::IoBufMut,
fs::{FileMeta, Fs, OpenOptions},
fs::{FileMeta, FileSystemTag, Fs, OpenOptions},
path::Path,
DynRead, DynWrite, Error, IoBuf, MaybeSend, MaybeSync, Read, Write,
};
Expand Down Expand Up @@ -48,6 +48,8 @@ impl<'write> Write for Box<dyn DynFile + 'write> {
}

pub trait DynFs: MaybeSend + MaybeSync {
/// Returns the tag identifying which kind of file system this object is.
fn file_system(&self) -> FileSystemTag;

fn open<'s, 'path: 's>(
&'s self,
path: &'path Path,
Expand Down Expand Up @@ -84,9 +86,25 @@ pub trait DynFs: MaybeSend + MaybeSync {
&'s self,
path: &'path Path,
) -> Pin<Box<dyn MaybeSendFuture<Output = Result<(), Error>> + 's>>;

/// Object-safe counterpart of `Fs::copy`: copies `from` to `to` within this
/// file system and returns the operation as a boxed future.
///
/// Both paths must outlive the borrow of `self` (`'path: 's`) because the
/// returned future may hold on to them.
fn copy<'s, 'path: 's>(
&'s self,
from: &'path Path,
to: &'path Path,
) -> Pin<Box<dyn MaybeSendFuture<Output = Result<(), Error>> + 's>>;

/// Object-safe counterpart of `Fs::link`: links `from` to `to` within this
/// file system and returns the operation as a boxed future.
///
/// Both paths must outlive the borrow of `self` (`'path: 's`) because the
/// returned future may hold on to them.
fn link<'s, 'path: 's>(
&'s self,
from: &'path Path,
to: &'path Path,
) -> Pin<Box<dyn MaybeSendFuture<Output = Result<(), Error>> + 's>>;
}

impl<F: Fs> DynFs for F {
/// Forwards to the `Fs::file_system` implementation of the wrapped type.
fn file_system(&self) -> FileSystemTag {
// Fully qualified so the call resolves to the `Fs` method rather than this one.
Fs::file_system(self)
}

fn open_options<'s, 'path: 's>(
&'s self,
path: &'path Path,
Expand Down Expand Up @@ -130,6 +148,59 @@ impl<F: Fs> DynFs for F {
) -> Pin<Box<dyn MaybeSendFuture<Output = Result<(), Error>> + 's>> {
Box::pin(F::remove(self, path))
}

/// Boxes and pins the future produced by `F::copy` so it can be returned
/// through the object-safe `DynFs` interface.
fn copy<'s, 'path: 's>(
&'s self,
from: &'path Path,
to: &'path Path,
) -> Pin<Box<dyn MaybeSendFuture<Output = Result<(), Error>> + 's>> {
Box::pin(F::copy(self, from, to))
}

/// Boxes and pins the future produced by `F::link` so it can be returned
/// through the object-safe `DynFs` interface.
fn link<'s, 'path: 's>(
&'s self,
from: &'path Path,
to: &'path Path,
) -> Pin<Box<dyn MaybeSendFuture<Output = Result<(), Error>> + 's>> {
Box::pin(F::link(self, from, to))
}
}

/// Copies the file at `from` on `from_fs` to `to` on `to_fs`.
///
/// When both file systems report the same [`FileSystemTag`], the work is
/// delegated to `from_fs.copy`. Otherwise the source is opened for reading,
/// the destination is opened with `create` + `write`, and the content is
/// streamed across in chunks of at most 4 KiB before the destination is closed.
///
/// An empty source performs no reads or writes; the destination is still
/// opened and closed.
///
/// # Errors
///
/// Returns the first error reported by the delegated copy, or while opening,
/// sizing, reading, writing or closing either file.
pub async fn copy(
    from_fs: &Arc<dyn DynFs>,
    from: &Path,
    to_fs: &Arc<dyn DynFs>,
    to: &Path,
) -> Result<(), Error> {
    /// Upper bound on the number of bytes moved per read/write round trip.
    const CHUNK_SIZE: u64 = 4 * 1024;

    if from_fs.file_system() == to_fs.file_system() {
        return from_fs.copy(from, to).await;
    }

    let mut from_file = from_fs
        .open_options(from, OpenOptions::default().read(true))
        .await?;
    // Kept as `u64` so large files are not truncated on 32-bit targets.
    let from_file_size = DynRead::size(&from_file).await?;

    let mut to_file = to_fs
        .open_options(to, OpenOptions::default().create(true).write(true))
        .await?;

    let mut buf = Vec::new();
    let mut read_pos = 0u64;

    // Loop until every byte has been transferred. Comparing against the full
    // size (not `size - 1`) avoids an underflow on empty files and makes sure
    // a trailing single byte is copied as well.
    while read_pos < from_file_size {
        // Always at least 1 here and never more than CHUNK_SIZE, so the cast
        // to `usize` below is lossless.
        let chunk_len = cmp::min(from_file_size - read_pos, CHUNK_SIZE);
        if buf.len() as u64 != chunk_len {
            // `read_exact_at` must fill the whole buffer, so the buffer has to
            // match the bytes that are actually left (first and last chunk).
            buf = vec![0u8; chunk_len as usize];
        }

        let (result, filled) = Read::read_exact_at(&mut from_file, buf, read_pos).await;
        result?;
        read_pos += chunk_len;

        let (result, returned) = Write::write_all(&mut to_file, filled).await;
        result?;
        // Reuse the allocation for the next chunk.
        buf = returned;
    }
    DynWrite::close(&mut to_file).await?;

    Ok(())
}

#[cfg(test)]
Expand Down
97 changes: 97 additions & 0 deletions fusio/src/fs/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,11 +16,21 @@ pub struct FileMeta {
pub size: u64,
}

/// Identifies the kind of file system behind an `Fs` implementation.
///
/// `Fs::file_system` returns one of these tags. The cross-file-system copy
/// helper compares the tags of its source and destination for equality to
/// decide whether the copy can be delegated to a single file system.
#[derive(Debug, Copy, Clone, Eq, PartialEq)]
pub enum FileSystemTag {
// Reported by the local-disk implementations (e.g. `MonoIoFs`).
Local,
// Presumably the browser Origin Private File System — no implementation is
// visible in this part of the code; confirm.
OPFS,
// TODO: Remote needs to check whether endpoint and other remote fs are consistent
// Reported by the S3-backed implementations (e.g. `S3Store`).
S3,
}

pub trait Fs: MaybeSend + MaybeSync {
//! This trait is used to abstract file system operations across different file systems.
type File: Read + Write + MaybeSend + 'static;

/// Returns the tag identifying which kind of file system this is.
fn file_system(&self) -> FileSystemTag;

fn open(&self, path: &Path) -> impl Future<Output = Result<Self::File, Error>> {
self.open_options(path, OpenOptions::default())
}
Expand All @@ -39,4 +49,91 @@ pub trait Fs: MaybeSend + MaybeSync {
) -> impl Future<Output = Result<impl Stream<Item = Result<FileMeta, Error>>, Error>> + MaybeSend;

fn remove(&self, path: &Path) -> impl Future<Output = Result<(), Error>> + MaybeSend;

/// Copies the file at `from` to `to`; both paths belong to this file system.
fn copy(&self, from: &Path, to: &Path) -> impl Future<Output = Result<(), Error>> + MaybeSend;

/// Links `from` to `to` within this file system.
///
/// Not every backend can do this: the object-store S3 and OpenDAL
/// implementations return `Error::Unsupported`.
fn link(&self, from: &Path, to: &Path) -> impl Future<Output = Result<(), Error>> + MaybeSend;
}

#[cfg(test)]
mod tests {
// Copies a file from the local disk to S3 through the cross-file-system
// `dynamic::fs::copy` helper and checks that the content arrives intact.
//
// Ignored by default: it needs an S3-compatible service listening on
// `http://localhost:9000` (see `options` below).
#[ignore]
#[cfg(all(
feature = "tokio-http",
feature = "tokio",
feature = "aws",
not(feature = "completion-based")
))]
#[tokio::test]
async fn test_diff_fs_copy() -> Result<(), crate::Error> {
use std::sync::Arc;

use tempfile::TempDir;

use crate::{
fs::{Fs, OpenOptions},
impls::disk::tokio::fs::TokioFs,
path::Path,
remotes::{
aws::{credential::AwsCredential, fs::AmazonS3, options::S3Options, s3::S3File},
http::tokio::TokioClient,
},
DynFs, Read, Write,
};

// Source lives in a temporary directory; destination is an S3 object key.
let tmp_dir = TempDir::new()?;
let local_path = Path::from_absolute_path(&tmp_dir.as_ref().join("test.file"))?;
let s3_path: Path = "s3_copy_test.file".into();

let key_id = "user".to_string();
let secret_key = "password".to_string();

let client = TokioClient::new();
let region = "ap-southeast-1";
let options = S3Options {
endpoint: "http://localhost:9000/data".into(),
bucket: "data".to_string(),
credential: Some(AwsCredential {
key_id,
secret_key,
token: None,
}),
region: region.into(),
sign_payload: true,
checksum: false,
};

let s3_fs = Arc::new(AmazonS3::new(Box::new(client), options));
let local_fs = Arc::new(TokioFs);

// Step 1: write the payload to the local file and close it.
{
let mut local_file = Fs::open_options(
local_fs.as_ref(),
&local_path,
OpenOptions::default().create(true).write(true),
)
.await?;
local_file
.write_all("🎵never gonna give you up🎵".as_bytes())
.await
.0?;
local_file.close().await.unwrap();
}
// Step 2: copy local -> S3. The `Arc<dyn DynFs>` clones are scoped to this
// block so that `Arc::into_inner(s3_fs)` below sees a single owner.
{
let s3_fs = s3_fs.clone() as Arc<dyn DynFs>;
let local_fs = local_fs.clone() as Arc<dyn DynFs>;
crate::dynamic::fs::copy(&local_fs, &local_path, &s3_fs, &s3_path).await?;
}

// Step 3: read the object back from S3 and compare it with the payload.
let mut s3 = S3File::new(Arc::into_inner(s3_fs).unwrap(), s3_path.clone());

let size = s3.size().await.unwrap();
// 31 = UTF-8 length of the payload: 23 ASCII bytes plus two 4-byte "🎵".
assert_eq!(size, 31);
let buf = Vec::new();
let (result, buf) = s3.read_to_end_at(buf, 0).await;
result.unwrap();
assert_eq!(buf, "🎵never gonna give you up🎵".as_bytes());

Ok(())
}
}
28 changes: 25 additions & 3 deletions fusio/src/impls/disk/monoio/fs.rs
Original file line number Diff line number Diff line change
@@ -1,11 +1,11 @@
use std::fs::create_dir_all;
use std::{fs, fs::create_dir_all};

use async_stream::stream;
use futures_core::Stream;

use super::MonoioFile;
use crate::{
fs::{FileMeta, Fs, OpenOptions},
fs::{FileMeta, FileSystemTag, Fs, OpenOptions},
path::{path_to_local, Path},
Error,
};
Expand All @@ -15,6 +15,10 @@ pub struct MonoIoFs;
impl Fs for MonoIoFs {
type File = MonoioFile;

/// Reports this implementation as a local file system by returning
/// `FileSystemTag::Local`.
fn file_system(&self) -> FileSystemTag {
FileSystemTag::Local
}

async fn open_options(&self, path: &Path, options: OpenOptions) -> Result<Self::File, Error> {
let local_path = path_to_local(path)?;

Expand Down Expand Up @@ -54,6 +58,24 @@ impl Fs for MonoIoFs {
async fn remove(&self, path: &Path) -> Result<(), Error> {
let path = path_to_local(path)?;

Ok(std::fs::remove_file(path)?)
Ok(fs::remove_file(path)?)
}

/// Copies the local file at `from` to `to`.
///
/// Both paths are first converted to local paths. The copy itself is the
/// synchronous `std::fs::copy`, executed inside a task started with
/// `monoio::spawn`; the number of bytes it reports is discarded.
///
/// # Errors
///
/// Fails if either path cannot be converted or if `std::fs::copy` fails.
async fn copy(&self, from: &Path, to: &Path) -> Result<(), Error> {
    let src = path_to_local(from)?;
    let dst = path_to_local(to)?;

    let task = monoio::spawn(async move { fs::copy(&src, &dst) });
    task.await?;

    Ok(())
}

/// Creates a hard link at `to` that refers to the local file at `from`.
///
/// Both paths are first converted to local paths. The link is made with the
/// synchronous `std::fs::hard_link`, executed inside a task started with
/// `monoio::spawn`.
///
/// # Errors
///
/// Fails if either path cannot be converted or if `std::fs::hard_link` fails.
async fn link(&self, from: &Path, to: &Path) -> Result<(), Error> {
    let src = path_to_local(from)?;
    let dst = path_to_local(to)?;

    let task = monoio::spawn(async move { fs::hard_link(&src, &dst) });
    task.await?;

    Ok(())
}
}
Loading

0 comments on commit 482b392

Please sign in to comment.