Skip to content

Commit

Permalink
feat: Add basic layout for fusio-opendal (#89)
Browse files Browse the repository at this point in the history
* feat: Add basic layout for fusio-opendal

Signed-off-by: Xuanwo <[email protected]>

* ci: Add CI check for fusio-opendal

Signed-off-by: Xuanwo <[email protected]>

* chore: Remove not needed feature flags

Signed-off-by: Xuanwo <[email protected]>

---------

Signed-off-by: Xuanwo <[email protected]>
  • Loading branch information
Xuanwo authored Nov 1, 2024
1 parent a407ef1 commit f313093
Show file tree
Hide file tree
Showing 9 changed files with 258 additions and 0 deletions.
6 changes: 6 additions & 0 deletions .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,12 @@ jobs:
command: build
args: --package fusio-object-store

- name: Run cargo build on fusio-opendal
uses: actions-rs/cargo@v1
with:
command: build
args: --package fusio-opendal

- name: Run cargo build on fusio-parquet
uses: actions-rs/cargo@v1
with:
Expand Down
1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ members = [
"fusio",
"fusio-dispatch",
"fusio-object-store",
"fusio-opendal",
"fusio-parquet",
]
resolver = "2"
Expand Down
13 changes: 13 additions & 0 deletions fusio-opendal/Cargo.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
[package]
description = "the Apache OpenDAL integration of Fusio."
edition.workspace = true
license.workspace = true
name = "fusio-opendal"
repository.workspace = true
version = "0.1.0"

[dependencies]
fusio = { version = "0.3.0", path = "../fusio", features = [ "bytes" ] }
futures-core = { version = "0.3" }
futures-util = { version = "0.3" }
opendal = { version ="0.50.1", default-features = false }
3 changes: 3 additions & 0 deletions fusio-opendal/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
# fusio-opendal

`fusion-opendal` is the Apache OpenDAL integration of Fusio.
151 changes: 151 additions & 0 deletions fusio-opendal/src/file.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,151 @@
use fusio::{fs::OpenOptions, Error, IoBuf, IoBufMut, Read, Write};
use opendal::{Operator, Reader, Writer};

use crate::utils::parse_opendal_error;

/// OpendalFile is the file created by OpendalFs.
pub struct OpendalFile {
op: Operator,
path: String,

state: FileState,
}

enum FileState {
Read(Reader),
Write(Writer),
}

impl OpendalFile {
/// Open a new file with given options.
///
/// We mark this function as `pub(crate)` to make sure users can only create it from OpendalFs.
pub(crate) async fn open(
op: Operator,
path: String,
options: OpenOptions,
) -> Result<Self, Error> {
// open as read
if options.read && !options.write && !options.create && !options.truncate {
let r = op
.reader(path.as_ref())
.await
.map_err(parse_opendal_error)?;

return Ok(Self {
op,
path,
state: FileState::Read(r),
});
}

// open as truncate write
//
// TODO: we only support `create && truncate` for now, maybe we can check if the file exists
// before writing.
if !options.read && options.write && options.create && options.truncate {
let w = op
.writer(path.as_ref())
.await
.map_err(parse_opendal_error)?;
return Ok(Self {
op,
path,
state: FileState::Write(w),
});
}

// FIXME: `truncate` has different semantics with `append`, we should re-think it.
if !options.read && options.write && !options.create && !options.truncate {
let w = op
.writer_with(path.as_ref())
.append(true)
.await
.map_err(parse_opendal_error)?;
return Ok(Self {
op,
path,
state: FileState::Write(w),
});
}

Err(Error::Unsupported {
message: format!("open options {:?} is not supported", options),
})
}
}

impl Read for OpendalFile {
async fn read_exact_at<B: IoBufMut>(&mut self, mut buf: B, pos: u64) -> (Result<(), Error>, B) {
let FileState::Read(r) = &mut self.state else {
return (
Err(Error::Other("file is not open as read mode".into())),
buf,
);
};
let size = buf.as_slice_mut().len() as u64;
let res = r
.read_into(&mut buf.as_slice_mut(), pos..pos + size)
.await
.map(|_| ())
.map_err(parse_opendal_error);

(res, buf)
}

async fn read_to_end_at(&mut self, mut buf: Vec<u8>, pos: u64) -> (Result<(), Error>, Vec<u8>) {
let FileState::Read(r) = &mut self.state else {
return (
Err(Error::Other("file is not open as read mode".into())),
buf,
);
};
let res = r
.read_into(&mut buf, pos..)
.await
.map(|_| ())
.map_err(parse_opendal_error);

(res, buf)
}

async fn size(&self) -> Result<u64, Error> {
let meta = self
.op
.stat(&self.path)
.await
.map_err(parse_opendal_error)?;
Ok(meta.content_length())
}
}

impl Write for OpendalFile {
/// TODO: opendal has native buffer support, maybe we can tune it while open writer.
async fn write_all<B: IoBuf>(&mut self, buf: B) -> (Result<(), Error>, B) {
let FileState::Write(w) = &mut self.state else {
return (
Err(Error::Other("file is not open as write mode".into())),
buf,
);
};

let res = w
.write(buf.as_bytes())
.await
.map(|_| ())
.map_err(parse_opendal_error);
(res, buf)
}

/// flush is no-op on opendal.
async fn flush(&mut self) -> Result<(), Error> {
Ok(())
}

async fn close(&mut self) -> Result<(), Error> {
let FileState::Write(w) = &mut self.state else {
return Err(Error::Other("file is not open as write mode".into()));
};
w.close().await.map_err(parse_opendal_error)
}
}
60 changes: 60 additions & 0 deletions fusio-opendal/src/fs.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
use fusio::{
fs::{FileMeta, Fs, OpenOptions},
path::Path,
Error,
};
use futures_core::Stream;
use futures_util::TryStreamExt;
use opendal::{Metakey, Operator};

use crate::{utils::parse_opendal_error, OpendalFile};

/// OpendalFs is a [`fusio::Fs`] compatible file system implementation.
///
/// It can be built from an [`opendal::Operator`] and provides file operations on top of it.
pub struct OpendalFs {
op: Operator,
}

impl From<Operator> for OpendalFs {
fn from(op: Operator) -> Self {
Self { op }
}
}

impl Fs for OpendalFs {
type File = OpendalFile;

async fn open_options(&self, path: &Path, options: OpenOptions) -> Result<Self::File, Error> {
OpendalFile::open(self.op.clone(), path.to_string(), options).await
}

/// FIXME: we need operator to perform this operation.
async fn create_dir_all(_: &Path) -> Result<(), Error> {
Ok(())
}

async fn list(
&self,
path: &Path,
) -> Result<impl Stream<Item = Result<FileMeta, Error>>, Error> {
Ok(self
.op
.lister_with(path.as_ref())
.metakey(Metakey::ContentLength)
.await
.map_err(parse_opendal_error)?
.map_ok(|e| FileMeta {
path: e.path().into(),
size: e.metadata().content_length(),
})
.map_err(parse_opendal_error))
}

async fn remove(&self, path: &Path) -> Result<(), Error> {
self.op
.delete(path.as_ref())
.await
.map_err(parse_opendal_error)
}
}
6 changes: 6 additions & 0 deletions fusio-opendal/src/lib.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
mod file;
pub use file::OpendalFile;
mod fs;
pub use fs::OpendalFs;

mod utils;
17 changes: 17 additions & 0 deletions fusio-opendal/src/utils.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
use std::io;

use fusio::Error;

/// Convert an `opendal::Error` into a `fusio::Error`.
pub fn parse_opendal_error(e: opendal::Error) -> Error {
match e.kind() {
opendal::ErrorKind::AlreadyExists => {
Error::Io(io::Error::new(io::ErrorKind::AlreadyExists, e))
}
opendal::ErrorKind::PermissionDenied => {
Error::Io(io::Error::new(io::ErrorKind::PermissionDenied, e))
}
opendal::ErrorKind::NotFound => Error::Io(io::Error::new(io::ErrorKind::NotFound, e)),
_ => Error::Other(Box::new(e)),
}
}
1 change: 1 addition & 0 deletions fusio/src/fs/options.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
#[derive(Debug)]
pub struct OpenOptions {
pub read: bool,
pub write: bool,
Expand Down

0 comments on commit f313093

Please sign in to comment.