From 8038993675591f87dd65b88ffdade31dc0a254b7 Mon Sep 17 00:00:00 2001 From: crwen <1543720935@qq.com> Date: Mon, 11 Nov 2024 15:31:56 +0800 Subject: [PATCH] add support for opfs (#85) * add support for opfs * feat: remove tokio fs feature required * opfs read by stream * add example * remove useless code * opfs for parquet * add ci for wasm build * add ci for aws building in wasm * fix opfs read_exact_at buf too large * add wasm test ci * error handling * chore: bump fusio to 0.3.1 (#86) * feat: impl `BufReader` (#84) * feat: Add basic layout for fusio-opendal (#89) * feat: Add basic layout for fusio-opendal Signed-off-by: Xuanwo * ci: Add CI check for fusio-opendal Signed-off-by: Xuanwo * chore: Remove not needed feature flags Signed-off-by: Xuanwo --------- Signed-off-by: Xuanwo * docs: Update Compare to OpenDAL (#90) * docs: Update Compare to OpenDAL * Polish the full ecosystem * feat: support custom endpoints (#87) * docs: Add example for fusio_opendal (#92) * docs: Add example for fusio_opendal Signed-off-by: Xuanwo * chore: format --------- Signed-off-by: Xuanwo Co-authored-by: Gwo Tzu-Hsing * resolve error * fix: unexpected tokio required in fusio-parquet * fix opfs read eof --------- Signed-off-by: Xuanwo Co-authored-by: Gwo Tzu-Hsing Co-authored-by: Kould Co-authored-by: Xuanwo --- .github/workflows/ci.yml | 57 +++++++ Cargo.toml | 1 + examples/opfs/.gitignore | 5 + examples/opfs/Cargo.toml | 28 ++++ examples/opfs/index.js | 9 ++ examples/opfs/package.json | 13 ++ examples/opfs/src/lib.rs | 101 ++++++++++++ examples/opfs/webpack.config.js | 22 +++ fusio-dispatch/Cargo.toml | 1 + fusio-dispatch/src/lib.rs | 4 +- fusio-parquet/Cargo.toml | 26 ++- fusio-parquet/src/reader.rs | 207 +++++++++++++++++------- fusio-parquet/src/writer.rs | 78 +++++++-- fusio-parquet/tests/opfs.rs | 165 +++++++++++++++++++ fusio/Cargo.toml | 47 +++++- fusio/src/error.rs | 13 ++ fusio/src/impls/buffered.rs | 1 + fusio/src/impls/disk/mod.rs | 12 ++ fusio/src/impls/disk/opfs/fs.rs | 
154 ++++++++++++++++++ fusio/src/impls/disk/opfs/mod.rs | 267 +++++++++++++++++++++++++++++++ fusio/src/lib.rs | 1 + fusio/src/path/mod.rs | 64 ++++---- fusio/tests/opfs.rs | 217 +++++++++++++++++++++++++ 23 files changed, 1382 insertions(+), 111 deletions(-) create mode 100644 examples/opfs/.gitignore create mode 100644 examples/opfs/Cargo.toml create mode 100644 examples/opfs/index.js create mode 100644 examples/opfs/package.json create mode 100644 examples/opfs/src/lib.rs create mode 100644 examples/opfs/webpack.config.js create mode 100644 fusio-parquet/tests/opfs.rs create mode 100644 fusio/src/impls/disk/opfs/fs.rs create mode 100644 fusio/src/impls/disk/opfs/mod.rs create mode 100644 fusio/tests/opfs.rs diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 6171af5..16326d8 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -109,3 +109,60 @@ jobs: with: command: fmt args: -- --check + wasm: + name: WASM + runs-on: ubuntu-latest + steps: + - uses: actions/checkout@v4 + - name: Install latest + uses: actions-rs/toolchain@v1 + with: + toolchain: stable + override: true + components: rustfmt, clippy + + - name: Setup for wasm32 + run: | + rustup target add wasm32-unknown-unknown + + - name: Run cargo build on fusio + uses: actions-rs/cargo@v1 + with: + command: build + args: --target wasm32-unknown-unknown --package fusio --features=opfs,aws + + - name: Run cargo build on fusio-parquet + uses: actions-rs/cargo@v1 + with: + command: build + args: --target wasm32-unknown-unknown --package fusio-parquet --features=opfs + + - name: Run cargo build on fusio-dispatch + uses: actions-rs/cargo@v1 + with: + command: build + args: --target wasm32-unknown-unknown --package fusio-dispatch --features=opfs,aws + + - name: Install Chrome Environment + run: | + mkdir -p /tmp/chrome + wget $(curl https://googlechromelabs.github.io/chrome-for-testing/known-good-versions-with-downloads.json | jq -r '.versions | sort_by(.version) | reverse | 
.[0] | .downloads.chrome | .[] | select(.platform == "linux64") | .url') + wget $(curl https://googlechromelabs.github.io/chrome-for-testing/known-good-versions-with-downloads.json | jq -r '.versions | sort_by(.version) | reverse | .[0] | .downloads.chromedriver | .[] | select(.platform == "linux64") | .url') + unzip chromedriver-linux64.zip + unzip chrome-linux64.zip + cp -r chrome-linux64/ /tmp/chrome/ + cp -r chromedriver-linux64 /tmp/chrome/chromedriver + + - name: Setup wasm-pack + run: | + cargo install wasm-pack + + - name: Run wasm-pack test on fusio + run: | + export PATH=$PATH:/tmp/chrome/chrome-linux64/:/tmp/chrome/chromedriver-linux64/ + wasm-pack test --chrome --headless fusio --features opfs + + - name: Run wasm-pack test on fusio-parquet + run: | + export PATH=$PATH:/tmp/chrome/chrome-linux64/:/tmp/chrome/chromedriver-linux64/ + wasm-pack test --chrome --headless fusio-parquet --features opfs diff --git a/Cargo.toml b/Cargo.toml index a1f5294..95ddb3c 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,6 +1,7 @@ [workspace] members = [ "examples", + "examples/opfs", "fusio", "fusio-dispatch", "fusio-object-store", diff --git a/examples/opfs/.gitignore b/examples/opfs/.gitignore new file mode 100644 index 0000000..67dbedb --- /dev/null +++ b/examples/opfs/.gitignore @@ -0,0 +1,5 @@ +package-lock.json +pkg +dist +wasm-pack.log +node_modules diff --git a/examples/opfs/Cargo.toml b/examples/opfs/Cargo.toml new file mode 100644 index 0000000..913dcd6 --- /dev/null +++ b/examples/opfs/Cargo.toml @@ -0,0 +1,28 @@ +[package] +name = "example-opfs" +edition.workspace = true +license.workspace = true +version = "0.1.0" + +[lib] +crate-type = ["cdylib", "rlib"] + +[dependencies] +arrow = "53" +fusio = { path = "../../fusio", features = ["opfs"] } +fusio-dispatch = { path = "../../fusio-dispatch", features = ["opfs"] } +fusio-parquet = { path = "../../fusio-parquet", features = ["opfs"] } +futures = { version = "0.3" } +parquet = { version = "53", default-features 
= false, features = [ + "arrow", + "async", +] } +wasm-bindgen = "0.2.95" +wasm-bindgen-futures = { version = "0.4.45" } +web-sys = { version = "0.3", features = [ + "console", + +] } + +[dev-dependencies] +wasm-bindgen-test = "0.3.34" diff --git a/examples/opfs/index.js b/examples/opfs/index.js new file mode 100644 index 0000000..690e096 --- /dev/null +++ b/examples/opfs/index.js @@ -0,0 +1,9 @@ +import { async_reader, async_writer, read_from_opfs, remove_all_dir, write_to_opfs } from "./pkg"; + +await write_to_opfs(); +await read_from_opfs(); + +await async_writer(); +await async_reader(); + +await remove_all_dir(); diff --git a/examples/opfs/package.json b/examples/opfs/package.json new file mode 100644 index 0000000..5dc619b --- /dev/null +++ b/examples/opfs/package.json @@ -0,0 +1,13 @@ +{ + "scripts": { + "build": "webpack", + "serve": "webpack serve" + }, + "devDependencies": { + "@wasm-tool/wasm-pack-plugin": "1.5.0", + "html-webpack-plugin": "^5.6.0", + "webpack": "^5.93.0", + "webpack-cli": "^5.1.4", + "webpack-dev-server": "^5.0.4" + } +} diff --git a/examples/opfs/src/lib.rs b/examples/opfs/src/lib.rs new file mode 100644 index 0000000..e537434 --- /dev/null +++ b/examples/opfs/src/lib.rs @@ -0,0 +1,101 @@ +use std::sync::Arc; + +use arrow::array::{ArrayRef, Int64Array, RecordBatch}; +use fusio::{fs::OpenOptions, path::Path, Read, Write}; +use fusio_dispatch::FsOptions; +use fusio_parquet::{reader::AsyncReader, writer::AsyncWriter}; +use futures::StreamExt; +use parquet::arrow::{AsyncArrowWriter, ParquetRecordBatchStreamBuilder}; +use wasm_bindgen::prelude::*; + +#[wasm_bindgen] +pub async fn write_to_opfs() { + let fs_options = FsOptions::Local; + let fs = fs_options.parse().unwrap(); + let mut file = fs + .open_options( + &Path::from_opfs_path("foo").unwrap(), + OpenOptions::default().create(true), + ) + .await + .unwrap(); + + let write_buf = "hello, fusio".as_bytes(); + + let (result, buf) = file.write_all(write_buf).await; + result.unwrap(); + + 
file.close().await.unwrap(); + web_sys::console::log_1(&format!("write data: {:?}", buf).into()); +} + +#[wasm_bindgen] +pub async fn read_from_opfs() { + let fs_options = FsOptions::Local; + let fs = fs_options.parse().unwrap(); + let mut file = fs + .open(&Path::from_opfs_path("foo").unwrap()) + .await + .unwrap(); + + let (result, read_buf) = file.read_to_end_at(vec![], 0).await; + result.unwrap(); + assert_eq!(read_buf.as_slice(), b"hello, fusio"); + web_sys::console::log_1(&format!("read data: {:?}", read_buf).into()); +} + +#[wasm_bindgen] +pub async fn async_writer() { + let fs_options = FsOptions::Local; + let fs = fs_options.parse().unwrap(); + let file = fs + .open_options( + &Path::from_opfs_path("bar").unwrap(), + OpenOptions::default().create(true), + ) + .await + .unwrap(); + let writer = AsyncWriter::new(file); + let col = Arc::new(Int64Array::from_iter_values([1, 2, 3])) as ArrayRef; + let to_write = RecordBatch::try_from_iter([("col", col)]).unwrap(); + let mut writer = AsyncArrowWriter::try_new(writer, to_write.schema(), None).unwrap(); + writer.write(&to_write).await.unwrap(); + writer.close().await.unwrap(); + web_sys::console::log_1(&format!("write data: {:?} to parquet", to_write).into()); +} + +#[wasm_bindgen] +pub async fn async_reader() { + let fs_options = FsOptions::Local; + let fs = fs_options.parse().unwrap(); + let file = fs + .open(&Path::from_opfs_path("bar").unwrap()) + .await + .unwrap(); + + let size = file.size().await.unwrap(); + let reader = AsyncReader::new(file, size).await.unwrap(); + let mut stream = ParquetRecordBatchStreamBuilder::new(reader) + .await + .unwrap() + .build() + .unwrap(); + + let read = stream.next().await.unwrap().unwrap(); + + let col = Arc::new(Int64Array::from_iter_values([1, 2, 3])) as ArrayRef; + let expected = RecordBatch::try_from_iter([("col", col)]).unwrap(); + assert_eq!(expected, read); + web_sys::console::log_1(&format!("read data: {:?} from parquet", read).into()); +} +#[wasm_bindgen] +pub 
async fn remove_all_dir() { + let fs_options = FsOptions::Local; + let fs = fs_options.parse().unwrap(); + fs.remove(&Path::from_opfs_path("foo").unwrap()) + .await + .unwrap(); + fs.remove(&Path::from_opfs_path("bar").unwrap()) + .await + .unwrap(); +} diff --git a/examples/opfs/webpack.config.js b/examples/opfs/webpack.config.js new file mode 100644 index 0000000..80540e4 --- /dev/null +++ b/examples/opfs/webpack.config.js @@ -0,0 +1,22 @@ +const path = require("path"); +const HtmlWebpackPlugin = require("html-webpack-plugin"); +const webpack = require("webpack"); +const WasmPackPlugin = require("@wasm-tool/wasm-pack-plugin"); + +module.exports = { + entry: "./index.js", + output: { + path: path.resolve(__dirname, "dist"), + filename: "index.js", + }, + mode: "development", + plugins: [ + new HtmlWebpackPlugin(), + new WasmPackPlugin({ + crateDirectory: path.resolve(__dirname, "."), + }), + ], + experiments: { + asyncWebAssembly: true, + }, +}; diff --git a/fusio-dispatch/Cargo.toml b/fusio-dispatch/Cargo.toml index 73e4d1c..05df8a9 100644 --- a/fusio-dispatch/Cargo.toml +++ b/fusio-dispatch/Cargo.toml @@ -10,6 +10,7 @@ version = "0.2.2" aws = ["fusio/aws"] default = [] monoio = ["fusio/monoio"] +opfs = ["fusio/opfs"] object_store = ["dep:fusio-object-store", "object_store/aws"] tokio = ["fusio/tokio"] diff --git a/fusio-dispatch/src/lib.rs b/fusio-dispatch/src/lib.rs index 54b770e..7fa7f34 100644 --- a/fusio-dispatch/src/lib.rs +++ b/fusio-dispatch/src/lib.rs @@ -5,7 +5,7 @@ use fusio::{DynFs, Error}; #[derive(Clone)] #[non_exhaustive] pub enum FsOptions { - #[cfg(any(feature = "tokio", feature = "monoio"))] + #[cfg(any(feature = "tokio", feature = "monoio", feature = "opfs"))] Local, #[cfg(feature = "aws")] S3 { @@ -21,7 +21,7 @@ pub enum FsOptions { impl FsOptions { pub fn parse(self) -> Result<Arc<dyn DynFs>, Error> { match self { - #[cfg(any(feature = "tokio", feature = "monoio"))] + #[cfg(any(feature = "tokio", feature = "monoio", feature = "opfs"))] FsOptions::Local => 
Ok(Arc::new(fusio::disk::LocalFs {})), #[cfg(feature = "object_store")] FsOptions::S3 { diff --git a/fusio-parquet/Cargo.toml b/fusio-parquet/Cargo.toml index a991d0a..5b2c903 100644 --- a/fusio-parquet/Cargo.toml +++ b/fusio-parquet/Cargo.toml @@ -6,18 +6,30 @@ name = "fusio-parquet" repository.workspace = true version = "0.2.2" +[features] +default = [] +opfs = ["fusio/opfs"] +tokio = ["fusio/tokio"] + [dependencies] bytes = { workspace = true } -fusio = { version = "0.3.2", path = "../fusio", features = [ - "bytes", - "dyn", - "tokio", -] } +cfg-if = "1.0.0" +fusio = { version = "0.3.2", path = "../fusio", features = ["bytes", "dyn"] } futures = { version = "0.3" } -parquet = { version = "53", features = ["arrow", "async"] } -tokio = { version = "1.40" } +parquet = { version = "53", default-features = false, features = [ + "arrow", + "async", +] } + +[target.'cfg(target_arch = "wasm32")'.dependencies] +wasm-bindgen-futures = { version = "0.4.45" } [dev-dependencies] arrow = "53" rand = "0.8" tempfile = "3.12.0" +tokio = { version = "1.40" } + +[target.'cfg(target_arch = "wasm32")'.dev-dependencies] +wasm-bindgen = "0.2.95" +wasm-bindgen-test = "0.3.9" diff --git a/fusio-parquet/src/reader.rs b/fusio-parquet/src/reader.rs index 840de5c..f18a2b1 100644 --- a/fusio-parquet/src/reader.rs +++ b/fusio-parquet/src/reader.rs @@ -1,6 +1,7 @@ use std::{cmp, ops::Range, sync::Arc}; use bytes::{Bytes, BytesMut}; +#[allow(unused)] use fusio::{dynamic::DynFile, Read}; use futures::{future::BoxFuture, FutureExt}; use parquet::{ @@ -21,6 +22,8 @@ pub struct AsyncReader { prefetch_footer_size: usize, } +unsafe impl Send for AsyncReader {} + fn set_prefetch_footer_size(footer_size: usize, content_size: u64) -> usize { let footer_size = cmp::max(footer_size, FOOTER_SIZE); cmp::min(footer_size as u64, content_size) as usize @@ -43,72 +46,166 @@ impl AsyncReader { impl AsyncFileReader for AsyncReader { fn get_bytes(&mut self, range: Range<usize>) -> BoxFuture<'_, parquet::errors::Result<Bytes>> { 
- async move { - let len = range.end - range.start; - let mut buf = BytesMut::with_capacity(len); - buf.resize(len, 0); - - let (result, buf) = self.inner.read_exact_at(buf, range.start as u64).await; - result.map_err(|err| ParquetError::External(Box::new(err)))?; - Ok(buf.freeze()) + let len = range.end - range.start; + let mut buf = BytesMut::with_capacity(len); + buf.resize(len, 0); + + cfg_if::cfg_if! { + if #[cfg(all(feature = "opfs", target_arch = "wasm32"))] { + let (sender, receiver) = + futures::channel::oneshot::channel::>(); + let opfs = unsafe { + std::mem::transmute::<&Box, &Box>(&self.inner) + }; + let file_handle = opfs.file_handle().unwrap(); + + wasm_bindgen_futures::spawn_local(async move { + + let (result, buf) = file_handle.read_exact_at(buf, range.start as u64).await; + + let ret = match result { + Ok(_) => Ok(buf.freeze()), + Err(err) => Err(ParquetError::External(Box::new(err))) + }; + let _ = sender.send(ret); + }); + + async move { + receiver.await.unwrap() + } + .boxed() + } else { + async move { + let (result, buf) = self.inner.read_exact_at(buf, range.start as u64).await; + result.map_err(|err| ParquetError::External(Box::new(err)))?; + Ok(buf.freeze()) + } + .boxed() + } } - .boxed() } fn get_metadata(&mut self) -> BoxFuture<'_, parquet::errors::Result>> { - async move { - if self.content_length == 0 { - return Err(ParquetError::EOF("file empty".to_string())); - } + if self.content_length == 0 { + return async { Err(ParquetError::EOF("file empty".to_string())) }.boxed(); + } + let footer_size = self.prefetch_footer_size; + let content_length = self.content_length; + + cfg_if::cfg_if! 
{ + if #[cfg(all(feature = "opfs", target_arch = "wasm32"))] { + let mut buf = BytesMut::with_capacity(footer_size); + buf.resize(footer_size, 0); + let (sender, receiver) = + futures::channel::oneshot::channel::, ParquetError>>(); + let opfs = unsafe { + std::mem::transmute::<&Box, &Box>(&self.inner) + }; + let file_handle = opfs.file_handle().unwrap(); - let footer_size = self.prefetch_footer_size; - let mut buf = BytesMut::with_capacity(footer_size); - buf.resize(footer_size, 0); - - let (result, prefetched_footer_content) = self - .inner - .read_exact_at(buf, self.content_length - footer_size as u64) - .await; - result.map_err(|err| ParquetError::External(Box::new(err)))?; - let prefetched_footer_slice = prefetched_footer_content.as_ref(); - let prefetched_footer_length = prefetched_footer_slice.len(); - - // Decode the metadata length from the last 8 bytes of the file. - let metadata_length = { - let buf = &prefetched_footer_slice - [(prefetched_footer_length - FOOTER_SIZE)..prefetched_footer_length]; - debug_assert!(buf.len() == FOOTER_SIZE); - ParquetMetaDataReader::decode_footer(buf.try_into().unwrap())? - }; - - // Try to read the metadata from the `prefetched_footer_content`. - // Otherwise, fetch exact metadata from the remote. 
- if prefetched_footer_length >= metadata_length + FOOTER_SIZE { - let buf = &prefetched_footer_slice[(prefetched_footer_length - - metadata_length - - FOOTER_SIZE) - ..(prefetched_footer_length - FOOTER_SIZE)]; - Ok(Arc::new(ParquetMetaDataReader::decode_metadata(buf)?)) + wasm_bindgen_futures::spawn_local(async move { + let (result, prefetched_footer_content) = file_handle + .read_exact_at(buf, content_length - footer_size as u64) + .await; + if let Err(err) = result { + let _ = sender.send(Err(ParquetError::External(Box::new(err)))); + return ; + } + let prefetched_footer_slice = prefetched_footer_content.as_ref(); + let prefetched_footer_length = prefetched_footer_slice.len(); + + // Decode the metadata length from the last 8 bytes of the file. + let metadata_length = { + let buf = &prefetched_footer_slice + [(prefetched_footer_length - FOOTER_SIZE)..prefetched_footer_length]; + debug_assert!(buf.len() == FOOTER_SIZE); + ParquetMetaDataReader::decode_footer(buf.try_into().unwrap()).unwrap() + }; + + // Try to read the metadata from the `prefetched_footer_content`. + // Otherwise, fetch exact metadata from the remote. 
+ if prefetched_footer_length >= metadata_length + FOOTER_SIZE { + let buf = &prefetched_footer_slice[(prefetched_footer_length + - metadata_length + - FOOTER_SIZE) + ..(prefetched_footer_length - FOOTER_SIZE)]; + + + let _ = sender.send(ParquetMetaDataReader::decode_metadata(buf) + .map(|meta| Arc::new(meta))); + } else { + let mut buf = BytesMut::with_capacity(metadata_length); + buf.resize(metadata_length, 0); + + let (result, bytes) = file_handle + .read_exact_at( + buf, + content_length - metadata_length as u64 - FOOTER_SIZE as u64, + ) + .await; + if let Err(err) = result { + let _ = sender.send(Err(ParquetError::External(Box::new(err)))); + return ; + } + + let _ = sender.send(ParquetMetaDataReader::decode_metadata(&bytes) + .map(|meta| Arc::new(meta))); + } + }); + async move { + receiver.await.unwrap() + }.boxed() } else { - let mut buf = BytesMut::with_capacity(metadata_length); - buf.resize(metadata_length, 0); - - let (result, bytes) = self - .inner - .read_exact_at( - buf, - self.content_length - metadata_length as u64 - FOOTER_SIZE as u64, - ) - .await; - result.map_err(|err| ParquetError::External(Box::new(err)))?; - - Ok(Arc::new(ParquetMetaDataReader::decode_metadata(&bytes)?)) + async move { + let mut buf = BytesMut::with_capacity(footer_size); + buf.resize(footer_size, 0); + let (result, prefetched_footer_content) = self + .inner + .read_exact_at(buf, content_length - footer_size as u64) + .await; + result.map_err(|err| ParquetError::External(Box::new(err)))?; + let prefetched_footer_slice = prefetched_footer_content.as_ref(); + let prefetched_footer_length = prefetched_footer_slice.len(); + + // Decode the metadata length from the last 8 bytes of the file. + let metadata_length = { + let buf = &prefetched_footer_slice + [(prefetched_footer_length - FOOTER_SIZE)..prefetched_footer_length]; + debug_assert!(buf.len() == FOOTER_SIZE); + ParquetMetaDataReader::decode_footer(buf.try_into().unwrap())? 
+ }; + + // Try to read the metadata from the `prefetched_footer_content`. + // Otherwise, fetch exact metadata from the remote. + if prefetched_footer_length >= metadata_length + FOOTER_SIZE { + let buf = &prefetched_footer_slice[(prefetched_footer_length + - metadata_length + - FOOTER_SIZE) + ..(prefetched_footer_length - FOOTER_SIZE)]; + Ok(Arc::new(ParquetMetaDataReader::decode_metadata(buf)?)) + } else { + let mut buf = BytesMut::with_capacity(metadata_length); + buf.resize(metadata_length, 0); + + let (result, bytes) = self + .inner + .read_exact_at( + buf, + content_length - metadata_length as u64 - FOOTER_SIZE as u64, + ) + .await; + result.map_err(|err| ParquetError::External(Box::new(err)))?; + + Ok(Arc::new(ParquetMetaDataReader::decode_metadata(&bytes)?)) + } + } + .boxed() } } - .boxed() } } +#[cfg(feature = "tokio")] #[cfg(test)] mod tests { use std::{ diff --git a/fusio-parquet/src/writer.rs b/fusio-parquet/src/writer.rs index d1279e7..1c910d0 100644 --- a/fusio-parquet/src/writer.rs +++ b/fusio-parquet/src/writer.rs @@ -1,45 +1,93 @@ use bytes::Bytes; +#[allow(unused)] use fusio::{dynamic::DynFile, Write}; use futures::future::BoxFuture; use parquet::{arrow::async_writer::AsyncFileWriter, errors::ParquetError}; pub struct AsyncWriter { inner: Option>, + #[cfg(feature = "opfs")] + pos: u64, } +unsafe impl Send for AsyncWriter {} impl AsyncWriter { pub fn new(writer: Box) -> Self { Self { inner: Some(writer), + #[cfg(feature = "opfs")] + pos: 0, } } } impl AsyncFileWriter for AsyncWriter { fn write(&mut self, bs: Bytes) -> BoxFuture<'_, parquet::errors::Result<()>> { - Box::pin(async move { - if let Some(writer) = self.inner.as_mut() { - let (result, _) = writer.write_all(bs).await; - result.map_err(|err| ParquetError::External(Box::new(err)))?; + cfg_if::cfg_if! 
{ + if #[cfg(all(feature = "opfs", target_arch = "wasm32"))] { + match self.inner.as_mut() { + Some(writer) => { + let pos = self.pos; + self.pos += bs.len() as u64; + let (sender, receiver) = + futures::channel::oneshot::channel::>(); + let opfs = unsafe { + std::mem::transmute::<&Box, &Box>(writer) + }; + let handle = opfs.file_handle().unwrap(); + + wasm_bindgen_futures::spawn_local(async move { + let (result, _) = handle.write_at(bs, pos).await; + let _ = sender.send(result + .map_err(|err| ParquetError::External(Box::new(err)))); + }); + + Box::pin(async move { + receiver.await.unwrap()?; + Ok(()) + }) + + }, + None => Box::pin(async move { + Ok(()) + }) + + } + } else { + Box::pin(async move { + if let Some(writer) = self.inner.as_mut() { + let (result, _) = writer.write_all(bs).await; + result.map_err(|err| ParquetError::External(Box::new(err)))?; + } + Ok(()) + }) } - Ok(()) - }) + } } fn complete(&mut self) -> BoxFuture<'_, parquet::errors::Result<()>> { - Box::pin(async move { - if let Some(mut writer) = self.inner.take() { - writer - .close() - .await - .map_err(|err| ParquetError::External(Box::new(err)))?; + cfg_if::cfg_if! 
{ + if #[cfg(all(feature = "opfs", target_arch = "wasm32"))] { + Box::pin(async move { + Ok(()) + }) + } else { + Box::pin(async move { + if let Some(mut writer) = self.inner.take() { + writer + .close() + .await + .map_err(|err| ParquetError::External(Box::new(err)))?; + } + + Ok(()) + }) } - - Ok(()) - }) + } } } +#[cfg(feature = "tokio")] #[cfg(test)] mod tests { use std::{ diff --git a/fusio-parquet/tests/opfs.rs b/fusio-parquet/tests/opfs.rs new file mode 100644 index 0000000..6a18d6b --- /dev/null +++ b/fusio-parquet/tests/opfs.rs @@ -0,0 +1,165 @@ +#[cfg(test)] +#[cfg(all(feature = "opfs", target_arch = "wasm32"))] +pub(crate) mod tests { + + wasm_bindgen_test::wasm_bindgen_test_configure!(run_in_browser); + + use std::sync::Arc; + + use arrow::array::{ArrayRef, Int64Array, RecordBatch}; + use bytes::Bytes; + use fusio::{ + disk::LocalFs, + fs::{Fs, OpenOptions}, + path::Path, + Read, + }; + use fusio_parquet::{reader::AsyncReader, writer::AsyncWriter}; + use futures::StreamExt; + use parquet::{ + arrow::{ + arrow_reader::ParquetRecordBatchReaderBuilder, async_writer::AsyncFileWriter, + AsyncArrowWriter, ParquetRecordBatchStreamBuilder, + }, + file::{metadata::KeyValue, properties::WriterProperties}, + }; + use rand::{distributions::Alphanumeric, Rng}; + use wasm_bindgen_test::wasm_bindgen_test; + + #[wasm_bindgen_test] + async fn test_opfs_basic_write() { + let fs = LocalFs {}; + let path = Path::from_opfs_path("basic_write_file").unwrap(); + let options = OpenOptions::default().create(true).write(true); + let mut writer = AsyncWriter::new(Box::new(fs.open_options(&path, options).await.unwrap())); + + let bytes = Bytes::from_static(b"Hello "); + writer.write(bytes).await.unwrap(); + let bytes = Bytes::from_static(b"world!"); + writer.write(bytes).await.unwrap(); + writer.complete().await.unwrap(); + + let buf = Vec::new(); + let mut file = fs + .open_options(&path, OpenOptions::default()) + .await + .unwrap(); + + let (result, buf) = 
file.read_to_end_at(buf, 0).await; + result.unwrap(); + assert_eq!(buf.as_slice(), b"Hello world!"); + + fs.remove(&path).await.unwrap(); + } + + #[wasm_bindgen_test] + async fn test_opfs_async_writer() { + let fs = LocalFs {}; + let path = Path::from_opfs_path("async_writer_file").unwrap(); + + let options = OpenOptions::default().create(true).write(true); + let writer = AsyncWriter::new(Box::new(fs.open_options(&path, options).await.unwrap())); + + let col = Arc::new(Int64Array::from_iter_values([1, 2, 3])) as ArrayRef; + let to_write = RecordBatch::try_from_iter([("col", col)]).unwrap(); + let mut writer = AsyncArrowWriter::try_new(writer, to_write.schema(), None).unwrap(); + writer.write(&to_write).await.unwrap(); + writer.close().await.unwrap(); + + let buf = Vec::new(); + let mut file = fs + .open_options(&path, OpenOptions::default()) + .await + .unwrap(); + let (result, buf) = file.read_to_end_at(buf, 0).await; + result.unwrap(); + let mut reader = ParquetRecordBatchReaderBuilder::try_new(Bytes::from(buf)) + .unwrap() + .build() + .unwrap(); + let read = reader.next().unwrap().unwrap(); + assert_eq!(to_write, read); + + fs.remove(&path).await.unwrap(); + } + + struct TestCase { + metadata_size: usize, + prefetch: Option, + } + + fn gen_fixed_string(size: usize) -> String { + rand::thread_rng() + .sample_iter(&Alphanumeric) + .take(size) + .map(char::from) + .collect() + } + + #[wasm_bindgen_test] + async fn test_opfs_async_reader_with_prefetch_footer_size() { + for case in [ + TestCase { + metadata_size: 256 * 1024, + prefetch: None, + }, + TestCase { + metadata_size: 1024 * 1024, + prefetch: None, + }, + TestCase { + metadata_size: 256 * 1024, + prefetch: Some(4), + }, + TestCase { + metadata_size: 1024 * 1024, + prefetch: Some(4), + }, + ] { + let fs = LocalFs {}; + let path = Path::from_opfs_path("async_reader_file").unwrap(); + + let options = OpenOptions::default().create(true).write(true); + let writer = 
AsyncWriter::new(Box::new(fs.open_options(&path, options).await.unwrap())); + + let col = Arc::new(Int64Array::from_iter_values([1, 2, 3])) as ArrayRef; + let to_write = RecordBatch::try_from_iter([("col", col)]).unwrap(); + let mut writer = AsyncArrowWriter::try_new( + writer, + to_write.schema(), + Some( + WriterProperties::builder() + .set_key_value_metadata(Some(vec![KeyValue { + key: "__metadata".to_string(), + value: Some(gen_fixed_string(case.metadata_size)), + }])) + .build(), + ), + ) + .unwrap(); + + writer.write(&to_write).await.unwrap(); + writer.close().await.unwrap(); + + let file = fs + .open_options(&path, OpenOptions::default()) + .await + .unwrap(); + let size = file.size().await.unwrap(); + + let mut reader = AsyncReader::new(Box::new(file), size).await.unwrap(); + if let Some(footer_size) = case.prefetch { + reader = reader.with_prefetch_footer_size(footer_size); + } + + let mut stream = ParquetRecordBatchStreamBuilder::new(reader) + .await + .unwrap() + .build() + .unwrap(); + let read = stream.next().await.unwrap().unwrap(); + assert_eq!(to_write, read); + let _ = fs.remove(&path).await; + } + } +} diff --git a/fusio/Cargo.toml b/fusio/Cargo.toml index 76cb09a..6dff917 100644 --- a/fusio/Cargo.toml +++ b/fusio/Cargo.toml @@ -25,7 +25,7 @@ bytes = ["dep:bytes"] completion-based = [] default = ["dyn", "fs"] dyn = [] -fs = ["tokio?/rt"] +fs = ["tokio?/fs", "tokio?/rt"] http = [ "async-stream", "bytes", @@ -39,10 +39,16 @@ http = [ monoio = ["async-stream", "completion-based", "dep:monoio", "no-send"] monoio-http = ["h2", "http", "hyper"] no-send = [] +opfs = [ + "async-stream", + "dep:js-sys", + "dep:wasm-bindgen-futures", + "dep:web-sys", + "no-send", +] tokio = ["async-stream", "dep:tokio"] tokio-http = ["dep:reqwest", "http"] tokio-uring = ["async-stream", "completion-based", "dep:tokio-uring", "no-send"] - [[bench]] harness = false name = "tokio" @@ -86,23 +92,54 @@ serde_json = { version = "1", optional = true } serde_urlencoded = { version 
= "0.7", optional = true } thiserror = "1" tokio = { version = "1", optional = true, default-features = false, features = [ - "fs", "io-util", ] } url = { version = "2.5.3", default-features = false, features = ["std"] } +[target.'cfg(target_arch = "wasm32")'.dependencies] +getrandom = { version = "0.2", features = ["js"] } +js-sys = { version = "0.3.72", optional = true } +wasm-bindgen-futures = { version = "0.4.45", optional = true, features = [ + "futures-core-03-stream", +] } +web-sys = { version = "0.3", optional = true, features = [ + "Blob", + "File", + "FileSystemCreateWritableOptions", + "FileSystemDirectoryHandle", + "FileSystemFileHandle", + "FileSystemGetDirectoryOptions", + "FileSystemGetFileOptions", + "FileSystemRemoveOptions", + "FileSystemWritableFileStream", + "Navigator", + "ReadableStream", + "ReadableStreamDefaultReader", + "ReadableStreamReadResult", + "Storage", + "StorageManager", + "Window", + +] } + [target.'cfg(target_os = "linux")'.dependencies] tokio-uring = { version = "0.5", default-features = false, optional = true } [dev-dependencies] +rand = "0.8" +tempfile = "3" + +[target.'cfg(not(target_arch = "wasm32"))'.dev-dependencies] criterion = { version = "0.5", features = ["async_tokio", "html_reports"] } futures-util = { version = "0.3" } hyper = { version = "1", features = ["full"] } hyper-util = { version = "0.1", features = ["full"] } monoio = { version = "0.2" } -rand = "0.8" -tempfile = "3" tokio = { version = "1", features = ["full"] } [target.'cfg(target_os = "linux")'.dev-dependencies] tokio-uring = { version = "0.5" } + +[target.'cfg(target_arch = "wasm32")'.dev-dependencies] +wasm-bindgen = "0.2.95" +wasm-bindgen-test = "0.3.9" diff --git a/fusio/src/error.rs b/fusio/src/error.rs index acf1629..550709a 100644 --- a/fusio/src/error.rs +++ b/fusio/src/error.rs @@ -16,8 +16,21 @@ pub enum Error { Unsupported { message: String, }, + #[error("Performs dynamic cast failed.")] + CastError, + #[error("Error occurs in wasm: 
{message}")] + Wasm { + message: String, + }, #[error(transparent)] Other(#[from] BoxedError), } pub type BoxedError = Box; + +#[cfg(all(feature = "opfs", target_arch = "wasm32"))] +pub(crate) fn wasm_err(js_val: js_sys::wasm_bindgen::JsValue) -> Error { + Error::Wasm { + message: format!("{js_val:?}"), + } +} diff --git a/fusio/src/impls/buffered.rs b/fusio/src/impls/buffered.rs index 6f8d7fd..2328cf0 100644 --- a/fusio/src/impls/buffered.rs +++ b/fusio/src/impls/buffered.rs @@ -152,6 +152,7 @@ impl Write for BufWriter { } } +#[cfg(feature = "tokio")] #[cfg(test)] pub(crate) mod tests { use tokio::io::AsyncWriteExt; diff --git a/fusio/src/impls/disk/mod.rs b/fusio/src/impls/disk/mod.rs index 1e00cf7..c7e8016 100644 --- a/fusio/src/impls/disk/mod.rs +++ b/fusio/src/impls/disk/mod.rs @@ -2,6 +2,16 @@ pub(crate) mod monoio; #[cfg(feature = "tokio")] pub(crate) mod tokio; + +#[cfg(all(feature = "opfs", target_arch = "wasm32"))] +pub(crate) mod opfs; +#[cfg(all(feature = "opfs", target_arch = "wasm32", feature = "fs"))] +#[allow(unused)] +pub use opfs::fs::*; +#[cfg(all(feature = "opfs", target_arch = "wasm32"))] +#[allow(unused)] +pub use opfs::OPFSFile; + #[cfg(all(feature = "tokio-uring", target_os = "linux"))] pub(crate) mod tokio_uring; @@ -29,5 +39,7 @@ cfg_if::cfg_if! 
{ pub type LocalFs = MonoIoFs; } else if #[cfg(feature = "tokio-uring")] { pub type LocalFs = TokioUringFs; + } else if #[cfg(all(feature = "opfs", target_arch = "wasm32"))] { + pub type LocalFs = OPFS; } } diff --git a/fusio/src/impls/disk/opfs/fs.rs b/fusio/src/impls/disk/opfs/fs.rs new file mode 100644 index 0000000..fa3901d --- /dev/null +++ b/fusio/src/impls/disk/opfs/fs.rs @@ -0,0 +1,154 @@ +use async_stream::stream; +use futures_core::Stream; +use futures_util::StreamExt; +use js_sys::{ + wasm_bindgen::{JsCast, JsValue}, + Array, JsString, +}; +use wasm_bindgen_futures::{stream::JsStream, JsFuture}; +use web_sys::{ + FileSystemDirectoryHandle, FileSystemFileHandle, FileSystemGetDirectoryOptions, + FileSystemGetFileOptions, FileSystemRemoveOptions, +}; + +use super::OPFSFile; +use crate::{ + disk::opfs::{promise, storage}, + error::wasm_err, + fs::{FileMeta, Fs, OpenOptions}, + path::Path, + Error, +}; + +pub struct OPFS; + +impl Fs for OPFS { + type File = OPFSFile; + + async fn open_options(&self, path: &Path, options: OpenOptions) -> Result { + let segments: Vec<&str> = path.as_ref().trim_matches('/').split("/").collect(); + + if segments.len() == 1 && segments[0].is_empty() { + return Err(Error::PathError(crate::path::Error::EmptySegment { + path: path.to_string(), + })); + } + + let dir_options = FileSystemGetDirectoryOptions::new(); + dir_options.set_create(options.create); + let parent = Self::access_parent_dir(path, &dir_options).await?; + + let file_name = segments.last().unwrap(); + let option = FileSystemGetFileOptions::new(); + option.set_create(options.create); + + let file_handle = promise::( + parent.get_file_handle_with_options(file_name, &option), + ) + .await?; + + Ok(OPFSFile::new(file_handle)) + } + + async fn create_dir_all(path: &Path) -> Result<(), Error> { + let options = FileSystemGetDirectoryOptions::new(); + options.set_create(true); + + Self::access_dir(path, &options).await?; + + Ok(()) + } + + async fn list( + &self, + path: 
&Path, + ) -> Result>, Error> { + let dir_options = FileSystemGetDirectoryOptions::new(); + dir_options.set_create(false); + + let dir = Self::access_dir(path, &dir_options).await?; + + let entries = JsStream::from(dir.entries()) + .map(|x| { + let array: Vec = x.unwrap().dyn_into::().unwrap().to_vec(); + let path: String = array[0].clone().dyn_into::().unwrap().into(); + path + }) + .collect::>() + .await; + + Ok(stream! { + for path in entries { + yield Ok(FileMeta{ path: path.into(), size: 0 }) + } + }) + } + + async fn remove(&self, path: &Path) -> Result<(), Error> { + let dir_options = FileSystemGetDirectoryOptions::new(); + dir_options.set_create(false); + let parent = Self::access_parent_dir(path, &dir_options).await?; + + let removed_entry = path.as_ref().trim_matches('/').split("/").last().unwrap(); + let options = FileSystemRemoveOptions::new(); + options.set_recursive(true); + JsFuture::from(parent.remove_entry_with_options(removed_entry, &options)) + .await + .map_err(wasm_err)?; + Ok(()) + } +} + +impl OPFS { + async fn access_dir( + path: &Path, + options: &FileSystemGetDirectoryOptions, + ) -> Result { + let mut parent = storage().await; + let segments: Vec<&str> = path.as_ref().trim_matches('/').split("/").collect(); + + if segments.len() == 1 && segments[0].is_empty() { + // "" case, return the root directory + return Ok(parent); + } + for segment in segments { + if segment.is_empty() { + return Err(Error::PathError(crate::path::Error::EmptySegment { + path: path.to_string(), + })); + } + parent = promise::( + parent.get_directory_handle_with_options(segment.as_ref(), options), + ) + .await?; + } + Ok(parent) + } + + /// Return the handle of parent directory. e.g. 
path "a/b/c" will return the handle of b + async fn access_parent_dir( + path: &Path, + options: &FileSystemGetDirectoryOptions, + ) -> Result { + let mut parent = storage().await; + let segments: Vec<&str> = path.as_ref().trim_matches('/').split("/").collect(); + let part_len = segments.len(); + + if part_len == 1 && segments[0].is_empty() { + // "" case, return the root directory + return Ok(parent); + } + for segment in &segments[0..part_len - 1] { + if segment.is_empty() { + return Err(Error::PathError(crate::path::Error::EmptySegment { + path: path.to_string(), + })); + } + parent = promise::( + parent.get_directory_handle_with_options(segment.as_ref(), options), + ) + .await?; + } + Ok(parent) + } +} diff --git a/fusio/src/impls/disk/opfs/mod.rs b/fusio/src/impls/disk/opfs/mod.rs new file mode 100644 index 0000000..95f9ff4 --- /dev/null +++ b/fusio/src/impls/disk/opfs/mod.rs @@ -0,0 +1,267 @@ +#[cfg(feature = "fs")] +pub mod fs; + +use std::{io, sync::Arc}; + +use js_sys::Uint8Array; +use wasm_bindgen_futures::JsFuture; +use web_sys::{ + wasm_bindgen::JsCast, window, File, FileSystemCreateWritableOptions, FileSystemDirectoryHandle, + FileSystemFileHandle, FileSystemWritableFileStream, ReadableStreamDefaultReader, + ReadableStreamReadResult, +}; + +use crate::{error::wasm_err, Error, IoBuf, IoBufMut, Read, Write}; + +pub(crate) async fn promise(promise: js_sys::Promise) -> Result +where + T: JsCast, +{ + let js_val = JsFuture::from(promise).await.map_err(wasm_err)?; + + js_val.dyn_into::().map_err(|_obj| Error::CastError) +} + +pub struct FileHandle { + file_handle: FileSystemFileHandle, +} + +impl FileHandle { + fn new(file_handle: FileSystemFileHandle) -> Self { + Self { file_handle } + } +} + +impl FileHandle { + pub async fn write_at(&self, buf: B, pos: u64) -> (Result<(), Error>, B) { + let options = FileSystemCreateWritableOptions::new(); + options.set_keep_existing_data(true); + + let writer_promise = self.create_writable_with_options(&options); + let 
writer = match promise::(writer_promise).await { + Ok(writer) => writer, + Err(err) => return (Err(err), buf), + }; + + if let Err(err) = JsFuture::from(writer.seek_with_u32(pos as u32).unwrap()).await { + return (Err(wasm_err(err)), buf); + } + + let (result, buf) = self.write_with_stream(buf, &writer).await; + if result.is_err() { + return (result, buf); + } + let result = JsFuture::from(writer.close()) + .await + .map_err(wasm_err) + .map(|_| ()); + + (result, buf) + } + + pub async fn write_with_stream( + &self, + buf: B, + stream: &FileSystemWritableFileStream, + ) -> (Result<(), Error>, B) { + match JsFuture::from(stream.write_with_u8_array(buf.as_slice()).unwrap()).await { + Ok(_) => (Ok(()), buf), + Err(err) => (Err(wasm_err(err)), buf), + } + } + + fn create_writable_with_options( + &self, + options: &FileSystemCreateWritableOptions, + ) -> js_sys::Promise { + self.file_handle.create_writable_with_options(options) + } +} + +impl FileHandle { + async fn read_to_end_at(&self, mut buf: Vec, pos: u64) -> (Result<(), Error>, Vec) { + let file_promise = self.file_handle.get_file(); + let file = match promise::(file_promise).await { + Ok(file) => file, + Err(err) => return (Err(err), buf), + }; + + if (file.size().round() as u64) < pos + buf.bytes_init() as u64 { + return ( + Err(Error::Io(io::Error::new( + io::ErrorKind::UnexpectedEof, + "Read unexpected eof", + ))), + buf, + ); + } + + let blob = file.slice_with_i32(pos as i32).unwrap(); + let reader = match blob + .stream() + .get_reader() + .dyn_into::() + .map_err(|_obj| Error::CastError) + { + Ok(reader) => reader, + Err(err) => return (Err(err), buf), + }; + + while let Ok(v) = JsFuture::from(reader.read()).await { + let result = ReadableStreamReadResult::from(v); + if result.get_done().unwrap() { + break; + } + let chunk = result.get_value().dyn_into::().unwrap(); + buf.extend(chunk.to_vec()); + } + + (Ok(()), buf) + } + + pub async fn read_exact_at(&self, mut buf: B, pos: u64) -> (Result<(), Error>, B) { 
+ let buf_len = buf.bytes_init() as i32; + let buf_slice = buf.as_slice_mut(); + let end = pos as i32 + buf_len; + + let file = match promise::(self.file_handle.get_file()).await { + Ok(file) => file, + Err(err) => return (Err(err), buf), + }; + + if (file.size().round() as u64) < pos + buf_len as u64 { + return ( + Err(Error::Io(io::Error::new( + io::ErrorKind::UnexpectedEof, + "Read unexpected eof", + ))), + buf, + ); + } + let blob = file.slice_with_i32_and_i32(pos as i32, end).unwrap(); + let reader = match blob + .stream() + .get_reader() + .dyn_into::() + .map_err(|_obj| Error::CastError) + { + Ok(reader) => reader, + Err(err) => return (Err(err), buf), + }; + + let mut offset = 0; + while let Ok(v) = JsFuture::from(reader.read()).await { + let result = ReadableStreamReadResult::from(v); + if result.get_done().unwrap() { + break; + } + + let chunk = result.get_value().dyn_into::().unwrap(); + let chunk_len = chunk.length() as usize; + buf_slice[offset..offset + chunk_len].copy_from_slice(chunk.to_vec().as_slice()); + offset += chunk_len; + } + + (Ok(()), buf) + } + + pub async fn size(&self) -> Result { + let file = promise::(self.file_handle.get_file()).await?; + + Ok(file.size() as u64) + } +} + +pub struct OPFSFile { + file_handle: Option>, + write_stream: Option, + pos: u32, +} + +impl OPFSFile { + pub fn new(file_handle: FileSystemFileHandle) -> Self { + Self { + file_handle: Some(Arc::new(FileHandle::new(file_handle))), + write_stream: None, + pos: 0, + } + } + + pub fn file_handle(&self) -> Option> { + match self.file_handle.as_ref() { + None => None, + Some(file_handle) => Some(Arc::clone(file_handle)), + } + } +} + +impl Write for OPFSFile { + async fn write_all(&mut self, buf: B) -> (Result<(), Error>, B) { + let file_handle = self.file_handle.as_ref().expect("write file after closed"); + if self.write_stream.is_none() { + let options = FileSystemCreateWritableOptions::new(); + options.set_keep_existing_data(true); + let writer_promise = 
file_handle.create_writable_with_options(&options); + + let writer = match promise::(writer_promise).await { + Ok(writer) => writer, + Err(err) => return (Err(err), buf), + }; + + if let Err(err) = JsFuture::from(writer.seek_with_u32(self.pos).unwrap()) + .await + .map_err(wasm_err) + { + return (Err(err), buf); + } + + self.write_stream = Some(writer); + } + + let writer = self.write_stream.as_ref().unwrap(); + let len = buf.bytes_init(); + self.pos += len as u32; + file_handle.write_with_stream(buf, writer).await + } + + async fn flush(&mut self) -> Result<(), Error> { + Ok(()) + } + + async fn close(&mut self) -> Result<(), Error> { + let writer = self.write_stream.take(); + if let Some(writer) = writer { + JsFuture::from(writer.close()).await.map_err(wasm_err)?; + } + self.file_handle.take(); + Ok(()) + } +} + +impl Read for OPFSFile { + async fn read_exact_at(&mut self, buf: B, pos: u64) -> (Result<(), Error>, B) { + let file_handle = self.file_handle.as_ref().expect("read file after closed"); + + file_handle.read_exact_at(buf, pos).await + } + + async fn read_to_end_at(&mut self, buf: Vec, pos: u64) -> (Result<(), Error>, Vec) { + let file_handle = self.file_handle.as_ref().expect("read file after closed"); + + file_handle.read_to_end_at(buf, pos).await + } + + async fn size(&self) -> Result { + let file_handle = self.file_handle.as_ref().expect("read file after closed"); + file_handle.size().await + } +} + +pub(crate) async fn storage() -> FileSystemDirectoryHandle { + let storage_promise = window().unwrap().navigator().storage().get_directory(); + JsFuture::from(storage_promise) + .await + .unwrap() + .dyn_into::() + .unwrap() +} diff --git a/fusio/src/lib.rs b/fusio/src/lib.rs index 7f823a9..a53450a 100644 --- a/fusio/src/lib.rs +++ b/fusio/src/lib.rs @@ -208,6 +208,7 @@ impl Write for &mut W { } #[cfg(test)] +#[cfg(not(target_arch = "wasm32"))] mod tests { use super::{Read, Write}; use crate::{buf::IoBufMut, Error, IoBuf}; diff --git 
a/fusio/src/path/mod.rs b/fusio/src/path/mod.rs index 0e9f4d1..00aee60 100644 --- a/fusio/src/path/mod.rs +++ b/fusio/src/path/mod.rs @@ -47,34 +47,8 @@ pub struct Path { raw: String, } +#[cfg(not(target_arch = "wasm32"))] impl Path { - pub fn parse(path: impl AsRef) -> Result { - let path = path.as_ref(); - - let stripped = path.strip_prefix(DELIMITER).unwrap_or(path); - if stripped.is_empty() { - return Ok(Default::default()); - } - - let stripped = stripped.strip_suffix(DELIMITER).unwrap_or(stripped); - - for segment in stripped.split(DELIMITER) { - if segment.is_empty() { - return Err(Error::EmptySegment { - path: path.to_string(), - }); - } - PathPart::parse(segment).map_err(|err| Error::BadSegment { - path: path.to_string(), - source: err, - })?; - } - - Ok(Self { - raw: stripped.to_string(), - }) - } - pub fn from_filesystem_path(path: impl AsRef) -> Result { let absolute = std::fs::canonicalize(&path).map_err(|err| Error::Canonicalize { path: path.as_ref().to_path_buf(), @@ -108,6 +82,39 @@ impl Path { // Reverse any percent encoding performed by conversion to URL Self::from_url_path(path) } +} + +impl Path { + pub fn parse(path: impl AsRef) -> Result { + let path = path.as_ref(); + + let stripped = path.strip_prefix(DELIMITER).unwrap_or(path); + if stripped.is_empty() { + return Ok(Default::default()); + } + + let stripped = stripped.strip_suffix(DELIMITER).unwrap_or(stripped); + + for segment in stripped.split(DELIMITER) { + if segment.is_empty() { + return Err(Error::EmptySegment { + path: path.to_string(), + }); + } + PathPart::parse(segment).map_err(|err| Error::BadSegment { + path: path.to_string(), + source: err, + })?; + } + + Ok(Self { + raw: stripped.to_string(), + }) + } + + pub fn from_opfs_path(path: impl AsRef) -> Result { + Self::parse(path.as_ref().to_str().unwrap()) + } pub fn from_url_path(path: impl AsRef) -> Result { let path = path.as_ref(); @@ -230,12 +237,14 @@ where } } +#[cfg(not(target_arch = "wasm32"))] pub(crate) fn 
absolute_path_to_url(path: impl AsRef) -> Result { Url::from_file_path(&path).map_err(|_| Error::InvalidPath { path: path.as_ref().into(), }) } +#[cfg(not(target_arch = "wasm32"))] pub fn path_to_local(location: &Path) -> Result { let mut url = Url::parse("file:///").unwrap(); url.path_segments_mut() @@ -264,6 +273,7 @@ pub fn path_to_local(location: &Path) -> Result { } #[cfg(test)] +#[cfg(not(target_arch = "wasm32"))] mod tests { use std::fs::canonicalize; diff --git a/fusio/tests/opfs.rs b/fusio/tests/opfs.rs new file mode 100644 index 0000000..f4682a0 --- /dev/null +++ b/fusio/tests/opfs.rs @@ -0,0 +1,217 @@ +#[cfg(test)] +#[cfg(all(feature = "opfs", target_arch = "wasm32"))] +pub(crate) mod tests { + + wasm_bindgen_test::wasm_bindgen_test_configure!(run_in_browser); + + use fusio::{disk::OPFS, dynamic::DynFile, fs::OpenOptions, path::Path, DynFs, Read, Write}; + use futures_util::StreamExt; + use wasm_bindgen_test::wasm_bindgen_test; + + async fn remove_all(fs: &OPFS, pathes: &[&str]) { + for path in pathes { + fs.remove(&(*path).into()).await.unwrap(); + } + } + + #[wasm_bindgen_test] + async fn test_opfs_list() { + let fs = OPFS; + let path = "test_opfs_dir".to_string(); + fs.create_dir_all(&path.into()).await.unwrap(); + let _ = fs + .open_options(&"file1".into(), OpenOptions::default().create(true)) + .await + .unwrap(); + let _ = fs + .open_options(&"file2".into(), OpenOptions::default().create(true)) + .await; + + let base_path = Path::from_opfs_path("/".to_string()).unwrap(); + let mut stream = fs.list(&base_path).await.unwrap(); + let mut result_len = 0; + let expected = ["test_opfs_dir", "file1", "file2"]; + while let Some(Ok(meta)) = stream.next().await { + assert!(expected.contains(&meta.path.as_ref())); + result_len += 1; + } + assert_eq!(result_len, 3); + + remove_all(&fs, &["test_opfs_dir", "file1", "file2"]).await; + } + + #[wasm_bindgen_test] + async fn test_create_nested_entry() { + let fs = OPFS; + let path = 
Path::from_opfs_path("test_opfs_dir/sub_dir".to_string()).unwrap(); + fs.create_dir_all(&path).await.unwrap(); + let _ = fs + .open_options( + &Path::from_opfs_path("test_opfs_dir/file").unwrap(), + OpenOptions::default().create(true), + ) + .await + .unwrap(); + let _ = fs + .open_options( + &Path::from_opfs_path("test_opfs_dir/sub_dir/sub_file").unwrap(), + OpenOptions::default().create(true), + ) + .await + .unwrap(); + + let base_path = Path::from_opfs_path("test_opfs_dir".to_string()).unwrap(); + let mut stream = fs.list(&base_path).await.unwrap(); + let expected = ["sub_dir", "file"]; + let mut result_len = 0; + while let Some(Ok(meta)) = stream.next().await { + assert!(expected.contains(&meta.path.as_ref())); + result_len += 1; + } + assert_eq!(result_len, 2); + + fs.remove(&Path::from_opfs_path("test_opfs_dir/file").unwrap()) + .await + .unwrap(); + + let expected = ["sub_dir"]; + let mut result_len = 0; + let mut stream = fs.list(&base_path).await.unwrap(); + while let Some(Ok(meta)) = stream.next().await { + assert!(expected.contains(&meta.path.as_ref())); + result_len += 1; + } + assert_eq!(result_len, 1); + + remove_all(&fs, &["test_opfs_dir"]).await; + } + + #[wasm_bindgen_test] + async fn test_opfs_read_write() { + let fs = OPFS; + let mut file = fs + .open_options(&"file".into(), OpenOptions::default().create(true)) + .await + .unwrap(); + let (result, _) = file.write_all([1, 2, 3, 4].as_mut()).await; + result.unwrap(); + let (result, _) = file.write_all([11, 23, 34, 47].as_mut()).await; + result.unwrap(); + let (result, _) = file.write_all([121, 93, 94, 97].as_mut()).await; + result.unwrap(); + file.close().await.unwrap(); + + let mut file = fs + .open_options(&"file".into(), OpenOptions::default().create(true)) + .await + .unwrap(); + let expected = [1_u8, 2, 3, 4, 11, 23, 34, 47, 121, 93, 94, 97]; + for i in 0..12 { + let (result, data) = file.read_to_end_at(vec![], i).await; + result.unwrap(); + assert_eq!(data.as_slice(), &expected[i as 
usize..]); + } + + let mut buf = [0; 7]; + let (result, data) = file.read_exact_at(buf.as_mut(), 3).await; + result.unwrap(); + assert_eq!(data, [4, 11, 23, 34, 47, 121, 93]); + remove_all(&fs, &["file"]).await; + } + + #[wasm_bindgen_test] + async fn test_opfs_read_write_utf16() { + let fs = OPFS; + let mut file = fs + .open_options(&"file".into(), OpenOptions::default().create(true)) + .await + .unwrap(); + let utf16_bytes: &[u8] = &[ + 0x00, 0x48, 0x20, 0xAC, 0x00, 0x6c, 0x00, 0x6c, 0x00, 0x6f, 0x00, 0x20, 0x00, 0x77, + 0x00, 0x6f, 0x00, 0x72, 0x00, 0x6c, 0x00, 0x64, 0x00, 0x21, + ]; + + let (result, _) = file.write_all(utf16_bytes).await; + result.unwrap(); + file.close().await.unwrap(); + + let mut file = fs + .open_options(&"file".into(), OpenOptions::default().create(true)) + .await + .unwrap(); + let (result, data) = file.read_to_end_at(vec![], 0).await; + result.unwrap(); + assert_eq!( + data, + [ + 0x00, 0x48, 0x20, 0xAC, 0x00, 0x6c, 0x00, 0x6c, 0x00, 0x6f, 0x00, 0x20, 0x00, 0x77, + 0x00, 0x6f, 0x00, 0x72, 0x00, 0x6c, 0x00, 0x64, 0x00, 0x21, + ] + ); + + remove_all(&fs, &["file"]).await; + } + + #[wasm_bindgen_test] + async fn test_opfs_handle_write() { + let fs = OPFS; + let mut file = fs + .open_options(&"file".into(), OpenOptions::default().create(true)) + .await + .unwrap(); + let opfs = + unsafe { std::mem::transmute::<&Box, &Box>(&file) }; + + let handle = opfs.file_handle().unwrap(); + + let (result, _) = handle.write_at([1, 2, 3, 4].as_mut(), 0).await; + result.unwrap(); + let (result, _) = handle.write_at([11, 23, 34, 47].as_mut(), 4).await; + result.unwrap(); + let (result, _) = handle.write_at([121, 93, 94, 97].as_mut(), 8).await; + result.unwrap(); + file.close().await.unwrap(); + + let file = fs + .open_options(&"file".into(), OpenOptions::default().create(true)) + .await + .unwrap(); + let opfs = + unsafe { std::mem::transmute::<&Box, &Box>(&file) }; + let handle = opfs.file_handle().unwrap(); + let mut buf = [0; 7]; + + let (result, data) = 
handle.read_exact_at(buf.as_mut(), 3).await; + result.unwrap(); + assert_eq!(data, [4, 11, 23, 34, 47, 121, 93]); + remove_all(&fs, &["file"]).await; + } + + #[wasm_bindgen_test] + async fn test_opfs_read_eof() { + let fs = OPFS; + let mut file = fs + .open_options(&"file".into(), OpenOptions::default().create(true)) + .await + .unwrap(); + + let mut buf = [0; 1]; + let (result, _) = file.read_exact_at(buf.as_mut(), 0).await; + assert!(result.is_err()); + + let (result, _) = file.write_all([1, 2, 3, 4].as_mut()).await; + result.unwrap(); + file.close().await.unwrap(); + + let mut file = fs + .open_options(&"file".into(), OpenOptions::default().create(true)) + .await + .unwrap(); + + let mut buf = [0; 1]; + let (result, data) = file.read_exact_at(buf.as_mut(), 5).await; + assert!(result.is_err()); + assert_eq!(data, [0]); + remove_all(&fs, &["file"]).await; + } +}