Skip to content

Commit

Permalink
add support for opfs (#85)
Browse files Browse the repository at this point in the history
* add support for opfs

* feat: remove tokio fs feature required

* opfs read by stream

* add example

* remove useless code

* opfs for parquet

* add ci for wasm build

* add ci for aws building in wasm

* fix opfs read_exact_at buf too large

* add wasm test ci

* error handling

* chore: bump fusio to 0.3.1 (#86)

* feat: impl `BufReader` (#84)

* feat: Add basic layout for fusio-opendal (#89)

* feat: Add basic layout for fusio-opendal

Signed-off-by: Xuanwo <[email protected]>

* ci: Add CI check for fusio-opendal

Signed-off-by: Xuanwo <[email protected]>

* chore: Remove not needed feature flags

Signed-off-by: Xuanwo <[email protected]>

---------

Signed-off-by: Xuanwo <[email protected]>

* docs: Update Compare to OpenDAL (#90)

* docs: Update Compare to OpenDAL

* Polish the full ecosystem

* feat: support custom endpoints (#87)

* docs: Add example for fusio_opendal (#92)

* docs: Add example for fusio_opendal

Signed-off-by: Xuanwo <[email protected]>

* chore: format

---------

Signed-off-by: Xuanwo <[email protected]>
Co-authored-by: Gwo Tzu-Hsing <[email protected]>

* resolve error

* fix: unexpected tokio required in fusio-parquet

* fix opfs read eof

---------

Signed-off-by: Xuanwo <[email protected]>
Co-authored-by: Gwo Tzu-Hsing <[email protected]>
Co-authored-by: Kould <[email protected]>
Co-authored-by: Xuanwo <[email protected]>
  • Loading branch information
4 people authored Nov 11, 2024
1 parent 1b18476 commit 8038993
Show file tree
Hide file tree
Showing 23 changed files with 1,382 additions and 111 deletions.
57 changes: 57 additions & 0 deletions .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -109,3 +109,60 @@ jobs:
with:
command: fmt
args: -- --check
wasm:
name: WASM
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v4
- name: Install latest
uses: actions-rs/toolchain@v1
with:
toolchain: stable
override: true
components: rustfmt, clippy

- name: Setup for wasm32
run: |
rustup target add wasm32-unknown-unknown
- name: Run cargo build on fusio
uses: actions-rs/cargo@v1
with:
command: build
args: --target wasm32-unknown-unknown --package fusio --features=opfs,aws

- name: Run cargo build on fusio-parquet
uses: actions-rs/cargo@v1
with:
command: build
args: --target wasm32-unknown-unknown --package fusio-parquet --features=opfs

- name: Run cargo build on fusio-dispatch
uses: actions-rs/cargo@v1
with:
command: build
args: --target wasm32-unknown-unknown --package fusio-dispatch --features=opfs,aws

- name: Install Chrome Environment
run: |
mkdir -p /tmp/chrome
wget $(curl https://googlechromelabs.github.io/chrome-for-testing/known-good-versions-with-downloads.json | jq -r '.versions | sort_by(.version) | reverse | .[0] | .downloads.chrome | .[] | select(.platform == "linux64") | .url')
wget $(curl https://googlechromelabs.github.io/chrome-for-testing/known-good-versions-with-downloads.json | jq -r '.versions | sort_by(.version) | reverse | .[0] | .downloads.chromedriver | .[] | select(.platform == "linux64") | .url')
unzip chromedriver-linux64.zip
unzip chrome-linux64.zip
cp -r chrome-linux64/ /tmp/chrome/
cp -r chromedriver-linux64 /tmp/chrome/chromedriver
- name: Setup wasm-pack
run: |
cargo install wasm-pack
- name: Run wasm-pack test on fusion
run: |
export PATH=$PATH:/tmp/chrome/chrome-linux64/:/tmp/chrome/chromedriver-linux64/
wasm-pack test --chrome --headless fusio --features opfs
- name: Run wasm-pack test on fusion-parquet
run: |
export PATH=$PATH:/tmp/chrome/chrome-linux64/:/tmp/chrome/chromedriver-linux64/
wasm-pack test --chrome --headless fusio-parquet --features opfs
1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
[workspace]
members = [
"examples",
"examples/opfs",
"fusio",
"fusio-dispatch",
"fusio-object-store",
Expand Down
5 changes: 5 additions & 0 deletions examples/opfs/.gitignore
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
package-lock.json
pkg
dist
wasm-pack.log
node_modules
28 changes: 28 additions & 0 deletions examples/opfs/Cargo.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
[package]
name = "example-opfs"
edition.workspace = true
license.workspace = true
version = "0.1.0"

[lib]
crate-type = ["cdylib", "rlib"]

[dependencies]
arrow = "53"
fusio = { path = "../../fusio", features = ["opfs"] }
fusio-dispatch = { path = "../../fusio-dispatch", features = ["opfs"] }
fusio-parquet = { path = "../../fusio-parquet", features = ["opfs"] }
futures = { version = "0.3" }
parquet = { version = "53", default-features = false, features = [
"arrow",
"async",
] }
wasm-bindgen = "0.2.95"
wasm-bindgen-futures = { version = "0.4.45" }
web-sys = { version = "0.3", features = [
"console",

] }

[dev-dependencies]
wasm-bindgen-test = "0.3.34"
9 changes: 9 additions & 0 deletions examples/opfs/index.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
import { async_reader, async_writer, read_from_opfs, remove_all_dir, write_to_opfs } from "./pkg";

await write_to_opfs();
await read_from_opfs();

await async_writer();
await async_reader();

await remove_all_dir();
13 changes: 13 additions & 0 deletions examples/opfs/package.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
{
"scripts": {
"build": "webpack",
"serve": "webpack serve"
},
"devDependencies": {
"@wasm-tool/wasm-pack-plugin": "1.5.0",
"html-webpack-plugin": "^5.6.0",
"webpack": "^5.93.0",
"webpack-cli": "^5.1.4",
"webpack-dev-server": "^5.0.4"
}
}
101 changes: 101 additions & 0 deletions examples/opfs/src/lib.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,101 @@
use std::sync::Arc;

use arrow::array::{ArrayRef, Int64Array, RecordBatch};
use fusio::{fs::OpenOptions, path::Path, Read, Write};
use fusio_dispatch::FsOptions;
use fusio_parquet::{reader::AsyncReader, writer::AsyncWriter};
use futures::StreamExt;
use parquet::arrow::{AsyncArrowWriter, ParquetRecordBatchStreamBuilder};
use wasm_bindgen::prelude::*;

#[wasm_bindgen]
pub async fn write_to_opfs() {
let fs_options = FsOptions::Local;
let fs = fs_options.parse().unwrap();
let mut file = fs
.open_options(
&Path::from_opfs_path("foo").unwrap(),
OpenOptions::default().create(true),
)
.await
.unwrap();

let write_buf = "hello, fusio".as_bytes();

let (result, buf) = file.write_all(write_buf).await;
result.unwrap();

file.close().await.unwrap();
web_sys::console::log_1(&format!("write data: {:?}", buf).into());
}

#[wasm_bindgen]
pub async fn read_from_opfs() {
let fs_options = FsOptions::Local;
let fs = fs_options.parse().unwrap();
let mut file = fs
.open(&Path::from_opfs_path("foo").unwrap())
.await
.unwrap();

let (result, read_buf) = file.read_to_end_at(vec![], 0).await;
result.unwrap();
assert_eq!(read_buf.as_slice(), b"hello, fusio");
web_sys::console::log_1(&format!("read data: {:?}", read_buf).into());
}

#[wasm_bindgen]
pub async fn async_writer() {
let fs_options = FsOptions::Local;
let fs = fs_options.parse().unwrap();
let file = fs
.open_options(
&Path::from_opfs_path("bar").unwrap(),
OpenOptions::default().create(true),
)
.await
.unwrap();
let writer = AsyncWriter::new(file);
let col = Arc::new(Int64Array::from_iter_values([1, 2, 3])) as ArrayRef;
let to_write = RecordBatch::try_from_iter([("col", col)]).unwrap();
let mut writer = AsyncArrowWriter::try_new(writer, to_write.schema(), None).unwrap();
writer.write(&to_write).await.unwrap();
writer.close().await.unwrap();
web_sys::console::log_1(&format!("write data: {:?} to parquet", to_write).into());
}

#[wasm_bindgen]
pub async fn async_reader() {
let fs_options = FsOptions::Local;
let fs = fs_options.parse().unwrap();
let file = fs
.open(&Path::from_opfs_path("bar").unwrap())
.await
.unwrap();

let size = file.size().await.unwrap();
let reader = AsyncReader::new(file, size).await.unwrap();
let mut stream = ParquetRecordBatchStreamBuilder::new(reader)
.await
.unwrap()
.build()
.unwrap();

let read = stream.next().await.unwrap().unwrap();

let col = Arc::new(Int64Array::from_iter_values([1, 2, 3])) as ArrayRef;
let expected = RecordBatch::try_from_iter([("col", col)]).unwrap();
assert_eq!(expected, read);
web_sys::console::log_1(&format!("read data: {:?} from parquet", read).into());
}
#[wasm_bindgen]
pub async fn remove_all_dir() {
let fs_options = FsOptions::Local;
let fs = fs_options.parse().unwrap();
fs.remove(&Path::from_opfs_path("foo").unwrap())
.await
.unwrap();
fs.remove(&Path::from_opfs_path("bar").unwrap())
.await
.unwrap();
}
22 changes: 22 additions & 0 deletions examples/opfs/webpack.config.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
const path = require("path");
const HtmlWebpackPlugin = require("html-webpack-plugin");
const webpack = require("webpack");
const WasmPackPlugin = require("@wasm-tool/wasm-pack-plugin");

module.exports = {
entry: "./index.js",
output: {
path: path.resolve(__dirname, "dist"),
filename: "index.js",
},
mode: "development",
plugins: [
new HtmlWebpackPlugin(),
new WasmPackPlugin({
crateDirectory: path.resolve(__dirname, "."),
}),
],
experiments: {
asyncWebAssembly: true,
},
};
1 change: 1 addition & 0 deletions fusio-dispatch/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ version = "0.2.2"
aws = ["fusio/aws"]
default = []
monoio = ["fusio/monoio"]
opfs = ["fusio/opfs"]
object_store = ["dep:fusio-object-store", "object_store/aws"]
tokio = ["fusio/tokio"]

Expand Down
4 changes: 2 additions & 2 deletions fusio-dispatch/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ use fusio::{DynFs, Error};
#[derive(Clone)]
#[non_exhaustive]
pub enum FsOptions {
#[cfg(any(feature = "tokio", feature = "monoio"))]
#[cfg(any(feature = "tokio", feature = "monoio", feature = "opfs"))]
Local,
#[cfg(feature = "aws")]
S3 {
Expand All @@ -21,7 +21,7 @@ pub enum FsOptions {
impl FsOptions {
pub fn parse(self) -> Result<Arc<dyn DynFs>, Error> {
match self {
#[cfg(any(feature = "tokio", feature = "monoio"))]
#[cfg(any(feature = "tokio", feature = "monoio", feature = "opfs"))]
FsOptions::Local => Ok(Arc::new(fusio::disk::LocalFs {})),
#[cfg(feature = "object_store")]
FsOptions::S3 {
Expand Down
26 changes: 19 additions & 7 deletions fusio-parquet/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -6,18 +6,30 @@ name = "fusio-parquet"
repository.workspace = true
version = "0.2.2"

[features]
default = []
opfs = ["fusio/opfs"]
tokio = ["fusio/tokio"]

[dependencies]
bytes = { workspace = true }
fusio = { version = "0.3.2", path = "../fusio", features = [
"bytes",
"dyn",
"tokio",
] }
cfg-if = "1.0.0"
fusio = { version = "0.3.2", path = "../fusio", features = ["bytes", "dyn"] }
futures = { version = "0.3" }
parquet = { version = "53", features = ["arrow", "async"] }
tokio = { version = "1.40" }
parquet = { version = "53", default-features = false, features = [
"arrow",
"async",
] }

[target.'cfg(target_arch = "wasm32")'.dependencies]
wasm-bindgen-futures = { version = "0.4.45" }

[dev-dependencies]
arrow = "53"
rand = "0.8"
tempfile = "3.12.0"
tokio = { version = "1.40" }

[target.'cfg(target_arch = "wasm32")'.dev-dependencies]
wasm-bindgen = "0.2.95"
wasm-bindgen-test = "0.3.9"
Loading

0 comments on commit 8038993

Please sign in to comment.