Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Add basic layout for fusio-opendal #89

Merged
merged 3 commits into from
Nov 1, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,12 @@ jobs:
command: build
args: --package fusio-object-store

- name: Run cargo build on fusio-opendal
uses: actions-rs/cargo@v1
with:
command: build
args: --package fusio-opendal

- name: Run cargo build on fusio-parquet
uses: actions-rs/cargo@v1
with:
Expand Down
1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ members = [
"fusio",
"fusio-dispatch",
"fusio-object-store",
"fusio-opendal",
"fusio-parquet",
]
resolver = "2"
Expand Down
13 changes: 13 additions & 0 deletions fusio-opendal/Cargo.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
[package]
description = "the Apache OpenDAL integration of Fusio."
edition.workspace = true
license.workspace = true
name = "fusio-opendal"
repository.workspace = true
version = "0.1.0"

[dependencies]
fusio = { version = "0.3.0", path = "../fusio", features = [ "bytes" ] }
futures-core = { version = "0.3" }
futures-util = { version = "0.3" }
opendal = { version ="0.50.1", default-features = false }
3 changes: 3 additions & 0 deletions fusio-opendal/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
# fusio-opendal

`fusion-opendal` is the Apache OpenDAL integration of Fusio.
151 changes: 151 additions & 0 deletions fusio-opendal/src/file.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,151 @@
use fusio::{fs::OpenOptions, Error, IoBuf, IoBufMut, Read, Write};
use opendal::{Operator, Reader, Writer};

use crate::utils::parse_opendal_error;

/// OpendalFile is the file created by OpendalFs.
pub struct OpendalFile {
op: Operator,
path: String,

state: FileState,
}

enum FileState {
Read(Reader),
Write(Writer),
}

impl OpendalFile {
/// Open a new file with given options.
///
/// We mark this function as `pub(crate)` to make sure users can only create it from OpendalFs.
pub(crate) async fn open(
op: Operator,
path: String,
options: OpenOptions,
) -> Result<Self, Error> {
// open as read
if options.read && !options.write && !options.create && !options.truncate {
let r = op
.reader(path.as_ref())
.await
.map_err(parse_opendal_error)?;

return Ok(Self {
op,
path,
state: FileState::Read(r),
});
}

// open as truncate write
//
// TODO: we only support `create && truncate` for now, maybe we can check if the file exists
// before writing.
if !options.read && options.write && options.create && options.truncate {
let w = op
.writer(path.as_ref())
.await
.map_err(parse_opendal_error)?;
return Ok(Self {
op,
path,
state: FileState::Write(w),
});
}

// FIXME: `truncate` has different semantics with `append`, we should re-think it.
if !options.read && options.write && !options.create && !options.truncate {
let w = op
.writer_with(path.as_ref())
.append(true)
.await
.map_err(parse_opendal_error)?;
return Ok(Self {
op,
path,
state: FileState::Write(w),
});
}

Err(Error::Unsupported {
message: format!("open options {:?} is not supported", options),
})
}
}

impl Read for OpendalFile {
async fn read_exact_at<B: IoBufMut>(&mut self, mut buf: B, pos: u64) -> (Result<(), Error>, B) {
let FileState::Read(r) = &mut self.state else {
return (
Err(Error::Other("file is not open as read mode".into())),
buf,
);
};
let size = buf.as_slice_mut().len() as u64;
let res = r
.read_into(&mut buf.as_slice_mut(), pos..pos + size)
.await
.map(|_| ())
.map_err(parse_opendal_error);

(res, buf)
}

async fn read_to_end_at(&mut self, mut buf: Vec<u8>, pos: u64) -> (Result<(), Error>, Vec<u8>) {
let FileState::Read(r) = &mut self.state else {
return (
Err(Error::Other("file is not open as read mode".into())),
buf,
);
};
let res = r
.read_into(&mut buf, pos..)
.await
.map(|_| ())
.map_err(parse_opendal_error);

(res, buf)
}

async fn size(&self) -> Result<u64, Error> {
let meta = self
.op
.stat(&self.path)
.await
.map_err(parse_opendal_error)?;
Ok(meta.content_length())
}
}

impl Write for OpendalFile {
/// TODO: opendal has native buffer support, maybe we can tune it while open writer.
async fn write_all<B: IoBuf>(&mut self, buf: B) -> (Result<(), Error>, B) {
let FileState::Write(w) = &mut self.state else {
return (
Err(Error::Other("file is not open as write mode".into())),
buf,
);
};

let res = w
.write(buf.as_bytes())
.await
.map(|_| ())
.map_err(parse_opendal_error);
(res, buf)
}

/// flush is no-op on opendal.
async fn flush(&mut self) -> Result<(), Error> {
Ok(())
}

async fn close(&mut self) -> Result<(), Error> {
let FileState::Write(w) = &mut self.state else {
return Err(Error::Other("file is not open as write mode".into()));
};
w.close().await.map_err(parse_opendal_error)
}
}
60 changes: 60 additions & 0 deletions fusio-opendal/src/fs.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
use fusio::{
fs::{FileMeta, Fs, OpenOptions},
path::Path,
Error,
};
use futures_core::Stream;
use futures_util::TryStreamExt;
use opendal::{Metakey, Operator};

use crate::{utils::parse_opendal_error, OpendalFile};

/// OpendalFs is a [`fusio::Fs`] compatible file system implementation.
///
/// It can be built from an [`opendal::Operator`] and provides file operations on top of it.
pub struct OpendalFs {
op: Operator,
}

impl From<Operator> for OpendalFs {
fn from(op: Operator) -> Self {
Self { op }
}
}

impl Fs for OpendalFs {
type File = OpendalFile;

async fn open_options(&self, path: &Path, options: OpenOptions) -> Result<Self::File, Error> {
OpendalFile::open(self.op.clone(), path.to_string(), options).await
}

/// FIXME: we need operator to perform this operation.
async fn create_dir_all(_: &Path) -> Result<(), Error> {
Ok(())
}

async fn list(
&self,
path: &Path,
) -> Result<impl Stream<Item = Result<FileMeta, Error>>, Error> {
Ok(self
.op
.lister_with(path.as_ref())
.metakey(Metakey::ContentLength)
.await
.map_err(parse_opendal_error)?
.map_ok(|e| FileMeta {
path: e.path().into(),
size: e.metadata().content_length(),
})
.map_err(parse_opendal_error))
}

async fn remove(&self, path: &Path) -> Result<(), Error> {
self.op
.delete(path.as_ref())
.await
.map_err(parse_opendal_error)
}
}
6 changes: 6 additions & 0 deletions fusio-opendal/src/lib.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
mod file;
pub use file::OpendalFile;
mod fs;
pub use fs::OpendalFs;

mod utils;
17 changes: 17 additions & 0 deletions fusio-opendal/src/utils.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
use std::io;

use fusio::Error;

/// Convert an `opendal::Error` into a `fusio::Error`.
pub fn parse_opendal_error(e: opendal::Error) -> Error {
match e.kind() {
opendal::ErrorKind::AlreadyExists => {
Error::Io(io::Error::new(io::ErrorKind::AlreadyExists, e))
}
opendal::ErrorKind::PermissionDenied => {
Error::Io(io::Error::new(io::ErrorKind::PermissionDenied, e))
}
opendal::ErrorKind::NotFound => Error::Io(io::Error::new(io::ErrorKind::NotFound, e)),
_ => Error::Other(Box::new(e)),
}
}
1 change: 1 addition & 0 deletions fusio/src/fs/options.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
#[derive(Debug)]
pub struct OpenOptions {
pub read: bool,
pub write: bool,
Expand Down
Loading