diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 7ab1621..6171af5 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -59,6 +59,12 @@ jobs: command: build args: --package fusio-object-store + - name: Run cargo build on fusio-opendal + uses: actions-rs/cargo@v1 + with: + command: build + args: --package fusio-opendal + - name: Run cargo build on fusio-parquet uses: actions-rs/cargo@v1 with: diff --git a/Cargo.toml b/Cargo.toml index 4e99579..a1f5294 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -4,6 +4,7 @@ members = [ "fusio", "fusio-dispatch", "fusio-object-store", + "fusio-opendal", "fusio-parquet", ] resolver = "2" diff --git a/fusio-opendal/Cargo.toml b/fusio-opendal/Cargo.toml new file mode 100644 index 0000000..cbf2ea7 --- /dev/null +++ b/fusio-opendal/Cargo.toml @@ -0,0 +1,13 @@ +[package] +description = "the Apache OpenDAL integration of Fusio." +edition.workspace = true +license.workspace = true +name = "fusio-opendal" +repository.workspace = true +version = "0.1.0" + +[dependencies] +fusio = { version = "0.3.0", path = "../fusio", features = [ "bytes" ] } +futures-core = { version = "0.3" } +futures-util = { version = "0.3" } +opendal = { version ="0.50.1", default-features = false } diff --git a/fusio-opendal/README.md b/fusio-opendal/README.md new file mode 100644 index 0000000..51a4de8 --- /dev/null +++ b/fusio-opendal/README.md @@ -0,0 +1,3 @@ +# fusio-opendal + +`fusio-opendal` is the Apache OpenDAL integration of Fusio. diff --git a/fusio-opendal/src/file.rs b/fusio-opendal/src/file.rs new file mode 100644 index 0000000..99e3042 --- /dev/null +++ b/fusio-opendal/src/file.rs @@ -0,0 +1,151 @@ +use fusio::{fs::OpenOptions, Error, IoBuf, IoBufMut, Read, Write}; +use opendal::{Operator, Reader, Writer}; + +use crate::utils::parse_opendal_error; + +/// OpendalFile is the file created by OpendalFs. 
+pub struct OpendalFile {
+    /// Operator used by `size` to `stat` the file.
+    op: Operator,
+    /// Path handed to the operator for every call.
+    path: String,
+    /// Reader or writer chosen by the open options.
+    state: FileState,
+}
+
+/// Access mode of an [`OpendalFile`]; a file is either readable or writable.
+enum FileState {
+    Read(Reader),
+    Write(Writer),
+}
+
+impl OpendalFile {
+    /// Open a new file with given options.
+    ///
+    /// Only three flag combinations are accepted: `read` alone (reader),
+    /// `write + create + truncate` (fresh writer) and `write` alone
+    /// (appending writer). Anything else yields [`Error::Unsupported`].
+    ///
+    /// We mark this function as `pub(crate)` to make sure users can only create it from OpendalFs.
+    pub(crate) async fn open(
+        op: Operator,
+        path: String,
+        options: OpenOptions,
+    ) -> Result<Self, Error> {
+        let flags = (options.read, options.write, options.create, options.truncate);
+        let state = match flags {
+            // open as read
+            (true, false, false, false) => {
+                FileState::Read(op.reader(&path).await.map_err(parse_opendal_error)?)
+            }
+            // open as truncate write
+            //
+            // TODO: we only support `create && truncate` for now, maybe we can check if the
+            // file exists before writing.
+            (false, true, true, true) => {
+                FileState::Write(op.writer(&path).await.map_err(parse_opendal_error)?)
+            }
+            // FIXME: `truncate` has different semantics with `append`, we should re-think it.
+            (false, true, false, false) => FileState::Write(
+                op.writer_with(&path)
+                    .append(true)
+                    .await
+                    .map_err(parse_opendal_error)?,
+            ),
+            _ => {
+                return Err(Error::Unsupported {
+                    message: format!("open options {:?} is not supported", options),
+                })
+            }
+        };
+
+        Ok(Self { op, path, state })
+    }
+}
+
+impl Read for OpendalFile {
+    /// Fill `buf` completely with the bytes starting at `pos`.
+    ///
+    /// Fails if the file was not opened for reading, or with an
+    /// `UnexpectedEof` I/O error when fewer bytes than `buf` holds are available.
+    async fn read_exact_at<B: IoBufMut>(&mut self, mut buf: B, pos: u64) -> (Result<(), Error>, B) {
+        let FileState::Read(r) = &mut self.state else {
+            return (
+                Err(Error::Other("file is not open as read mode".into())),
+                buf,
+            );
+        };
+        let size = buf.as_slice_mut().len() as u64;
+        let read = r.read_into(&mut buf.as_slice_mut(), pos..pos + size).await;
+        // `read_into` reports how many bytes it copied; "exact" means all of them.
+        let res = match read {
+            Ok(n) if n as u64 == size => Ok(()),
+            Ok(n) => Err(Error::Io(std::io::Error::new(
+                std::io::ErrorKind::UnexpectedEof,
+                format!("read {n} bytes at offset {pos}, expected {size}"),
+            ))),
+            Err(e) => Err(parse_opendal_error(e)),
+        };
+        (res, buf)
+    }
+
+    /// Read everything from `pos` to the end of the file into `buf`.
+    async fn read_to_end_at(&mut self, mut buf: Vec<u8>, pos: u64) -> (Result<(), Error>, Vec<u8>) {
+        let FileState::Read(r) = &mut self.state else {
+            return (
+                Err(Error::Other("file is not open as read mode".into())),
+                buf,
+            );
+        };
+        let res = r
+            .read_into(&mut buf, pos..)
+            .await
+            .map(|_| ())
+            .map_err(parse_opendal_error);
+
+        (res, buf)
+    }
+
+    /// Current length of the file, obtained by `stat`-ing its path.
+    async fn size(&self) -> Result<u64, Error> {
+        let meta = self
+            .op
+            .stat(&self.path)
+            .await
+            .map_err(parse_opendal_error)?;
+        Ok(meta.content_length())
+    }
+}
+
+impl Write for OpendalFile {
+    /// Write the whole of `buf` through the underlying writer.
+    ///
+    /// TODO: opendal has native buffer support, maybe we can tune it while open writer.
+    async fn write_all<B: IoBuf>(&mut self, buf: B) -> (Result<(), Error>, B) {
+        let FileState::Write(w) = &mut self.state else {
+            return (
+                Err(Error::Other("file is not open as write mode".into())),
+                buf,
+            );
+        };
+        let res = w
+            .write(buf.as_bytes())
+            .await
+            .map(|_| ())
+            .map_err(parse_opendal_error);
+        (res, buf)
+    }
+
+    /// flush is no-op on opendal.
+    async fn flush(&mut self) -> Result<(), Error> {
+        Ok(())
+    }
+
+    /// Close the writer; fails if the file is not open as write mode.
+    async fn close(&mut self) -> Result<(), Error> {
+        let FileState::Write(w) = &mut self.state else {
+            return Err(Error::Other("file is not open as write mode".into()));
+        };
+        w.close().await.map_err(parse_opendal_error)
+    }
+}
diff --git a/fusio-opendal/src/fs.rs b/fusio-opendal/src/fs.rs
new file mode 100644
index 0000000..c03af9e
--- /dev/null
+++ b/fusio-opendal/src/fs.rs
@@ -0,0 +1,65 @@
+use fusio::{
+    fs::{FileMeta, Fs, OpenOptions},
+    path::Path,
+    Error,
+};
+use futures_core::Stream;
+use futures_util::TryStreamExt;
+use opendal::{Metakey, Operator};
+
+use crate::{utils::parse_opendal_error, OpendalFile};
+
+/// OpendalFs is a [`fusio::Fs`] compatible file system implementation.
+///
+/// It can be built from an [`opendal::Operator`] and provides file operations on top of it.
+pub struct OpendalFs {
+    op: Operator,
+}
+
+impl From<Operator> for OpendalFs {
+    fn from(op: Operator) -> Self {
+        Self { op }
+    }
+}
+
+impl Fs for OpendalFs {
+    type File = OpendalFile;
+
+    /// Open `path` on a clone of the operator with the given `options`.
+    async fn open_options(&self, path: &Path, options: OpenOptions) -> Result<Self::File, Error> {
+        OpendalFile::open(self.op.clone(), path.to_string(), options).await
+    }
+
+    /// Currently a no-op that always succeeds.
+    ///
+    /// FIXME: we need operator to perform this operation.
+    async fn create_dir_all(_: &Path) -> Result<(), Error> {
+        Ok(())
+    }
+
+    /// Stream the entries under `path` as [`FileMeta`], requesting content length for each.
+    async fn list(
+        &self,
+        path: &Path,
+    ) -> Result<impl Stream<Item = Result<FileMeta, Error>>, Error> {
+        Ok(self
+            .op
+            .lister_with(path.as_ref())
+            .metakey(Metakey::ContentLength)
+            .await
+            .map_err(parse_opendal_error)?
+            .map_ok(|e| FileMeta {
+                path: e.path().into(),
+                size: e.metadata().content_length(),
+            })
+            .map_err(parse_opendal_error))
+    }
+
+    /// Delete `path` through the operator.
+    async fn remove(&self, path: &Path) -> Result<(), Error> {
+        self.op
+            .delete(path.as_ref())
+            .await
+            .map_err(parse_opendal_error)
+    }
+}
diff --git a/fusio-opendal/src/lib.rs b/fusio-opendal/src/lib.rs
new file mode 100644
index 0000000..17edd82
--- /dev/null
+++ b/fusio-opendal/src/lib.rs
@@ -0,0 +1,8 @@
+//! The Apache OpenDAL integration of Fusio.
+
+mod file;
+pub use file::OpendalFile;
+mod fs;
+pub use fs::OpendalFs;
+
+mod utils;
diff --git a/fusio-opendal/src/utils.rs b/fusio-opendal/src/utils.rs
new file mode 100644
index 0000000..0b0063d
--- /dev/null
+++ b/fusio-opendal/src/utils.rs
@@ -0,0 +1,17 @@
+use std::io;
+
+use fusio::Error;
+
+/// Convert an `opendal::Error` into a `fusio::Error`.
+///
+/// `AlreadyExists`, `PermissionDenied` and `NotFound` become [`Error::Io`] with the
+/// matching [`io::ErrorKind`]; every other kind is boxed into [`Error::Other`].
+pub fn parse_opendal_error(e: opendal::Error) -> Error {
+    let kind = match e.kind() {
+        opendal::ErrorKind::AlreadyExists => io::ErrorKind::AlreadyExists,
+        opendal::ErrorKind::PermissionDenied => io::ErrorKind::PermissionDenied,
+        opendal::ErrorKind::NotFound => io::ErrorKind::NotFound,
+        _ => return Error::Other(Box::new(e)),
+    };
+    Error::Io(io::Error::new(kind, e))
+}
diff --git a/fusio/src/fs/options.rs b/fusio/src/fs/options.rs
index 5c3af53..6718c98 100644
--- a/fusio/src/fs/options.rs
+++ b/fusio/src/fs/options.rs
@@ -1,3 +1,4 @@
+#[derive(Debug)]
 pub struct OpenOptions {
     pub read: bool,
     pub write: bool,