Skip to content

Commit

Permalink
feat: Fs adds copy & link, TokioFs & TokioUringFs & `Monoio…
Browse files Browse the repository at this point in the history
…` have been implemented
  • Loading branch information
KKould committed Nov 11, 2024
1 parent 8038993 commit d4856f8
Show file tree
Hide file tree
Showing 9 changed files with 375 additions and 16 deletions.
28 changes: 25 additions & 3 deletions fusio-object-store/src/fs.rs
Original file line number Diff line number Diff line change
@@ -1,10 +1,10 @@
use std::sync::Arc;
use std::{future::Future, sync::Arc};

use async_stream::stream;
use fusio::{
fs::{FileMeta, Fs, OpenOptions},
fs::{FileMeta, FileSystemTag, Fs, OpenOptions},
path::Path,
Error,
Error, MaybeSend,
};
use futures_core::Stream;
use futures_util::stream::StreamExt;
Expand All @@ -27,6 +27,10 @@ impl<O: ObjectStore> From<O> for S3Store<O> {
impl<O: ObjectStore> Fs for S3Store<O> {
type File = S3File<O>;

fn file_system(&self) -> FileSystemTag {
FileSystemTag::S3
}

async fn open_options(&self, path: &Path, options: OpenOptions) -> Result<Self::File, Error> {
if !options.truncate {
return Err(Error::Unsupported {
Expand Down Expand Up @@ -64,4 +68,22 @@ impl<O: ObjectStore> Fs for S3Store<O> {

Ok(())
}

fn copy<F: Fs>(
&self,
from: &Path,
to_fs: &F,
to: &Path,
) -> impl Future<Output = Result<(), Error>> + MaybeSend {
todo!()
}

fn link<F: Fs>(
&self,
from: &Path,
to_fs: &F,
to: &Path,
) -> impl Future<Output = Result<(), Error>> + MaybeSend {
todo!()
}
}
28 changes: 26 additions & 2 deletions fusio-opendal/src/fs.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,9 @@
use std::future::Future;

use fusio::{
fs::{FileMeta, Fs, OpenOptions},
fs::{FileMeta, FileSystemTag, Fs, OpenOptions},
path::Path,
Error,
Error, MaybeSend,
};
use futures_core::Stream;
use futures_util::TryStreamExt;
Expand All @@ -25,6 +27,10 @@ impl From<Operator> for OpendalFs {
impl Fs for OpendalFs {
type File = OpendalFile;

fn file_system(&self) -> FileSystemTag {
todo!()
}

async fn open_options(&self, path: &Path, options: OpenOptions) -> Result<Self::File, Error> {
OpendalFile::open(self.op.clone(), path.to_string(), options).await
}
Expand Down Expand Up @@ -57,4 +63,22 @@ impl Fs for OpendalFs {
.await
.map_err(parse_opendal_error)
}

fn copy<F: Fs>(
&self,
from: &Path,
to_fs: &F,
to: &Path,
) -> impl Future<Output = Result<(), Error>> + MaybeSend {
todo!()
}

fn link<F: Fs>(
&self,
from: &Path,
to_fs: &F,
to: &Path,
) -> impl Future<Output = Result<(), Error>> + MaybeSend {
todo!()
}
}
24 changes: 24 additions & 0 deletions fusio/src/fs/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,11 +16,21 @@ pub struct FileMeta {
pub size: u64,
}

#[derive(Debug, Copy, Clone, Eq, PartialEq)]
pub enum FileSystemTag {
Local,
OPFS,
// TODO: Remote needs to check whether endpoint and other remote fs are consistent
S3,
}

pub trait Fs: MaybeSend + MaybeSync {
//! This trait is used to abstract file system operations across different file systems.
type File: Read + Write + MaybeSend + 'static;

fn file_system(&self) -> FileSystemTag;

fn open(&self, path: &Path) -> impl Future<Output = Result<Self::File, Error>> {
self.open_options(path, OpenOptions::default())
}
Expand All @@ -39,4 +49,18 @@ pub trait Fs: MaybeSend + MaybeSync {
) -> impl Future<Output = Result<impl Stream<Item = Result<FileMeta, Error>>, Error>> + MaybeSend;

fn remove(&self, path: &Path) -> impl Future<Output = Result<(), Error>> + MaybeSend;

fn copy<F: Fs>(
&self,
from: &Path,
to_fs: &F,
to: &Path,
) -> impl Future<Output = Result<(), Error>> + MaybeSend;

fn link<F: Fs>(
&self,
from: &Path,
to_fs: &F,
to: &Path,
) -> impl Future<Output = Result<(), Error>> + MaybeSend;
}
37 changes: 34 additions & 3 deletions fusio/src/impls/disk/monoio/fs.rs
Original file line number Diff line number Diff line change
@@ -1,11 +1,11 @@
use std::fs::create_dir_all;
use std::{fs, fs::create_dir_all};

use async_stream::stream;
use futures_core::Stream;

use super::MonoioFile;
use crate::{
fs::{FileMeta, Fs, OpenOptions},
fs::{FileMeta, FileSystemTag, Fs, OpenOptions},
path::{path_to_local, Path},
Error,
};
Expand All @@ -15,6 +15,10 @@ pub struct MonoIoFs;
impl Fs for MonoIoFs {
type File = MonoioFile;

fn file_system(&self) -> FileSystemTag {
FileSystemTag::Local
}

async fn open_options(&self, path: &Path, options: OpenOptions) -> Result<Self::File, Error> {
let local_path = path_to_local(path)?;

Expand Down Expand Up @@ -54,6 +58,33 @@ impl Fs for MonoIoFs {
async fn remove(&self, path: &Path) -> Result<(), Error> {
let path = path_to_local(path)?;

Ok(std::fs::remove_file(path)?)
Ok(fs::remove_file(path)?)
}

async fn copy<F: Fs>(&self, from: &Path, to_fs: &F, to: &Path) -> Result<(), Error> {
if self.file_system() == to_fs.file_system() {
let from = path_to_local(from)?;
let to = path_to_local(to)?;

fs::copy(&from, &to).await?;
} else {
todo!()
}

Ok(())
}

async fn link<F: Fs>(&self, from: &Path, to_fs: &F, to: &Path) -> Result<(), Error> {
if self.file_system() != to_fs.file_system() {
return Err(Error::Unsupported {
message: "file system is inconsistent".to_string(),
});
}
let from = path_to_local(from)?;
let to = path_to_local(to)?;

fs::hard_link(&from, &to).await?;

Ok(())
}
}
32 changes: 30 additions & 2 deletions fusio/src/impls/disk/opfs/fs.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
use std::future::Future;

use async_stream::stream;
use futures_core::Stream;
use futures_util::StreamExt;
Expand All @@ -15,16 +17,20 @@ use super::OPFSFile;
use crate::{
disk::opfs::{promise, storage},
error::wasm_err,
fs::{FileMeta, Fs, OpenOptions},
fs::{FileMeta, FileSystemTag, Fs, OpenOptions},
path::Path,
Error,
Error, MaybeSend,
};

pub struct OPFS;

impl Fs for OPFS {
type File = OPFSFile;

fn file_system(&self) -> FileSystemTag {
FileSystemTag::Local
}

async fn open_options(&self, path: &Path, options: OpenOptions) -> Result<Self::File, Error> {
let segments: Vec<&str> = path.as_ref().trim_matches('/').split("/").collect();

Expand Down Expand Up @@ -151,4 +157,26 @@ impl OPFS {
}
Ok(parent)
}

fn copy<F: Fs>(
&self,
from: &Path,
to_fs: &F,
to: &Path,
) -> impl Future<Output = Result<(), Error>> + MaybeSend {
Err(Error::Unsupported {
message: "opfs does not support copy file".to_string(),
})
}

fn link<F: Fs>(
&self,
from: &Path,
to_fs: &F,
to: &Path,
) -> impl Future<Output = Result<(), Error>> + MaybeSend {
Err(Error::Unsupported {
message: "opfs does not support link file".to_string(),
})
}
}
33 changes: 32 additions & 1 deletion fusio/src/impls/disk/tokio/fs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ use tokio::{
};

use crate::{
fs::{FileMeta, Fs, OpenOptions},
fs::{FileMeta, FileSystemTag, Fs, OpenOptions},
path::{path_to_local, Path},
Error,
};
Expand All @@ -18,6 +18,10 @@ pub struct TokioFs;
impl Fs for TokioFs {
type File = File;

fn file_system(&self) -> FileSystemTag {
FileSystemTag::Local
}

async fn open_options(&self, path: &Path, options: OpenOptions) -> Result<Self::File, Error> {
let local_path = path_to_local(path)?;

Expand Down Expand Up @@ -67,4 +71,31 @@ impl Fs for TokioFs {
remove_file(&path).await?;
Ok(())
}

async fn copy<F: Fs>(&self, from: &Path, to_fs: &F, to: &Path) -> Result<(), Error> {
if self.file_system() == to_fs.file_system() {
let from = path_to_local(from)?;
let to = path_to_local(to)?;

tokio::fs::copy(&from, &to).await?;
} else {
todo!()
}

Ok(())
}

async fn link<F: Fs>(&self, from: &Path, to_fs: &F, to: &Path) -> Result<(), Error> {
if self.file_system() != to_fs.file_system() {
return Err(Error::Unsupported {
message: "file system is inconsistent".to_string(),
});
}
let from = path_to_local(from)?;
let to = path_to_local(to)?;

tokio::fs::hard_link(&from, &to).await?;

Ok(())
}
}
37 changes: 35 additions & 2 deletions fusio/src/impls/disk/tokio_uring/fs.rs
Original file line number Diff line number Diff line change
@@ -1,19 +1,25 @@
use std::{fs, future::Future};

use async_stream::stream;
use futures_core::Stream;
use tokio_uring::fs::{create_dir_all, remove_file};

use crate::{
disk::tokio_uring::TokioUringFile,
fs::{FileMeta, Fs, OpenOptions},
fs::{FileMeta, FileSystemTag, Fs, OpenOptions},
path::{path_to_local, Path},
Error,
Error, MaybeSend,
};

pub struct TokioUringFs;

impl Fs for TokioUringFs {
type File = TokioUringFile;

fn file_system(&self) -> FileSystemTag {
FileSystemTag::Local
}

async fn open_options(&self, path: &Path, options: OpenOptions) -> Result<Self::File, Error> {
let local_path = path_to_local(path)?;

Expand Down Expand Up @@ -58,4 +64,31 @@ impl Fs for TokioUringFs {

Ok(remove_file(path).await?)
}

async fn copy<F: Fs>(&self, from: &Path, to_fs: &F, to: &Path) -> Result<(), Error> {
if self.file_system() == to_fs.file_system() {
let from = path_to_local(from)?;
let to = path_to_local(to)?;

fs::copy(&from, &to).await?;
} else {
todo!()
}

Ok(())
}

async fn link<F: Fs>(&self, from: &Path, to_fs: &F, to: &Path) -> Result<(), Error> {
if self.file_system() != to_fs.file_system() {
return Err(Error::Unsupported {
message: "file system is inconsistent".to_string(),
});
}
let from = path_to_local(from)?;
let to = path_to_local(to)?;

fs::hard_link(&from, &to).await?;

Ok(())
}
}
14 changes: 13 additions & 1 deletion fusio/src/impls/remotes/aws/fs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ use url::Url;

use super::{credential::AwsCredential, options::S3Options, S3Error, S3File};
use crate::{
fs::{FileMeta, Fs, OpenOptions},
fs::{FileMeta, FileSystemTag, Fs, OpenOptions},
path::Path,
remotes::{
aws::sign::Sign,
Expand Down Expand Up @@ -124,6 +124,10 @@ pub(super) struct AmazonS3Inner {
impl Fs for AmazonS3 {
type File = S3File;

fn file_system(&self) -> FileSystemTag {
FileSystemTag::S3
}

async fn open_options(&self, path: &Path, _: OpenOptions) -> Result<Self::File, crate::Error> {
Ok(S3File::new(self.clone(), path.clone()))
}
Expand Down Expand Up @@ -233,6 +237,14 @@ impl Fs for AmazonS3 {

Ok(())
}

async fn copy<F: Fs>(&self, from: &Path, to_fs: &F, to: &Path) -> Result<(), Error> {
todo!()
}

async fn link<F: Fs>(&self, from: &Path, to_fs: &F, to: &Path) -> Result<(), Error> {
todo!()
}
}

#[derive(Debug, Deserialize)]
Expand Down
Loading

0 comments on commit d4856f8

Please sign in to comment.