diff --git a/fusio/Cargo.toml b/fusio/Cargo.toml index 89d0c5c..a31b43f 100644 --- a/fusio/Cargo.toml +++ b/fusio/Cargo.toml @@ -49,6 +49,7 @@ opfs = [ "dep:web-sys", "no-send", ] +sync = [] tokio = ["async-stream", "dep:tokio"] tokio-http = ["dep:reqwest", "http"] tokio-uring = ["async-stream", "completion-based", "dep:tokio-uring", "no-send"] @@ -116,7 +117,9 @@ web-sys = { version = "0.3", optional = true, features = [ "FileSystemFileHandle", "FileSystemGetDirectoryOptions", "FileSystemGetFileOptions", + "FileSystemReadWriteOptions", "FileSystemRemoveOptions", + "FileSystemSyncAccessHandle", "FileSystemWritableFileStream", "Navigator", "ReadableStream", diff --git a/fusio/src/impls/disk/mod.rs b/fusio/src/impls/disk/mod.rs index c7e8016..ae1867d 100644 --- a/fusio/src/impls/disk/mod.rs +++ b/fusio/src/impls/disk/mod.rs @@ -8,6 +8,9 @@ pub(crate) mod opfs; #[cfg(all(feature = "opfs", target_arch = "wasm32", feature = "fs"))] #[allow(unused)] pub use opfs::fs::*; +#[cfg(all(feature = "opfs", feature = "sync", target_arch = "wasm32"))] +#[allow(unused)] +pub use opfs::sync::OPFSSyncFile; #[cfg(all(feature = "opfs", target_arch = "wasm32"))] #[allow(unused)] pub use opfs::OPFSFile; diff --git a/fusio/src/impls/disk/opfs/fs.rs b/fusio/src/impls/disk/opfs/fs.rs index 6fda353..1307d0c 100644 --- a/fusio/src/impls/disk/opfs/fs.rs +++ b/fusio/src/impls/disk/opfs/fs.rs @@ -1,5 +1,3 @@ -use std::future::Future; - use async_stream::stream; use futures_core::Stream; use futures_util::StreamExt; @@ -13,7 +11,10 @@ use web_sys::{ FileSystemGetFileOptions, FileSystemRemoveOptions, }; +#[cfg(not(feature = "sync"))] use super::OPFSFile; +#[cfg(feature = "sync")] +use crate::disk::OPFSSyncFile; use crate::{ disk::opfs::{promise, storage}, error::wasm_err, @@ -26,6 +27,9 @@ use crate::{ pub struct OPFS; impl Fs for OPFS { + #[cfg(feature = "sync")] + type File = OPFSSyncFile; + #[cfg(not(feature = "sync"))] type File = OPFSFile; fn file_system(&self) -> FileSystemTag { @@ -59,7 +63,13 @@ impl Fs for OPFS { ) .await?; - Ok(OPFSFile::new(file_handle)) + cfg_if::cfg_if! { + if #[cfg(feature = "sync")] { + Ok(Self::File::new(file_handle).await?) + } else { + Ok(OPFSFile::new(file_handle)) + } + } } /// Recursively creates a directory and all of its parent components if they are missing. diff --git a/fusio/src/impls/disk/opfs/mod.rs b/fusio/src/impls/disk/opfs/mod.rs index 584f10a..b3ec835 100644 --- a/fusio/src/impls/disk/opfs/mod.rs +++ b/fusio/src/impls/disk/opfs/mod.rs @@ -1,6 +1,9 @@ #[cfg(feature = "fs")] pub mod fs; +#[cfg(feature = "sync")] +pub mod sync; + use std::{io, sync::Arc}; use js_sys::Uint8Array; @@ -62,6 +65,7 @@ impl FileHandle { (result, buf) } + /// Attempts to write an entire buffer into the stream. /// /// No changes are written to the actual file on disk until the stream is closed. @@ -199,6 +203,7 @@ pub struct OPFSFile { } impl OPFSFile { + #[allow(unused)] pub(crate) fn new(file_handle: FileSystemFileHandle) -> Self { Self { file_handle: Some(Arc::new(FileHandle::new(file_handle))), diff --git a/fusio/src/impls/disk/opfs/sync/mod.rs b/fusio/src/impls/disk/opfs/sync/mod.rs new file mode 100644 index 0000000..da3be87 --- /dev/null +++ b/fusio/src/impls/disk/opfs/sync/mod.rs @@ -0,0 +1,162 @@ +use std::io; + +use web_sys::{FileSystemFileHandle, FileSystemReadWriteOptions, FileSystemSyncAccessHandle}; + +use crate::{disk::opfs::promise, error::wasm_err, Error, IoBuf, IoBufMut, Read, Write}; + +/// OPFS based on [FileSystemWritableFileStream](https://developer.mozilla.org/en-US/docs/Web/API/FileSystemSyncAccessHandle) +/// This file is only accessible inside dedicated Web Workers. +pub struct OPFSSyncFile { + file_handle: FileSystemFileHandle, + access_handle: Option, +} + +impl OPFSSyncFile { + pub(crate) async fn new(file_handle: FileSystemFileHandle) -> Result { + let js_promise = file_handle.create_sync_access_handle(); + let access_handle = Some(promise::(js_promise).await?); + Ok(Self { + file_handle, + access_handle, + }) + } + + pub(crate) async fn access_handle(&mut self) -> &FileSystemSyncAccessHandle { + if self.access_handle.is_none() { + let js_promise = self.file_handle.create_sync_access_handle(); + self.access_handle = Some( + promise::(js_promise) + .await + .unwrap(), + ); + } + self.access_handle.as_ref().unwrap() + } +} + +impl Write for OPFSSyncFile { + /// Attempts to write an entire buffer into the file. + /// + /// No changes are written to the actual file on disk until [`OPFSFile::close`] has been called. + /// See more detail in [write](https://developer.mozilla.org/en-US/docs/Web/API/FileSystemSyncAccessHandle/write) + async fn write_all(&mut self, buf: B) -> (Result<(), Error>, B) { + match self + .access_handle() + .await + .write_with_u8_array(buf.as_slice()) + { + Ok(_) => (Ok(()), buf), + Err(err) => (Err(wasm_err(err)), buf), + } + } + + /// Persists any changes made to the file. + /// See more detail in [write](https://developer.mozilla.org/en-US/docs/Web/API/FileSystemSyncAccessHandle/flush) + async fn flush(&mut self) -> Result<(), Error> { + self.access_handle().await.flush().map_err(wasm_err) + } + + async fn close(&mut self) -> Result<(), Error> { + if let Some(access_handle) = self.access_handle.take() { + access_handle.close(); + } + Ok(()) + } +} + +impl Read for OPFSSyncFile { + /// Reads the exact number of bytes required to fill `buf` at `pos`. + /// + /// # Errors + /// + /// If the operation encounters an "end of file" before completely + /// filling the buffer, it returns an error of [`crate::Error`]. + async fn read_exact_at(&mut self, mut buf: B, pos: u64) -> (Result<(), Error>, B) { + let buf_len = buf.bytes_init() as i32; + let options = FileSystemReadWriteOptions::new(); + options.set_at(pos as f64); + + let access_handle = self.access_handle().await; + let size = access_handle + .get_size() + .expect("InvalidStateError: file is already closed."); + if (size.round() as u64) < pos + buf_len as u64 { + return ( + Err(Error::Io(io::Error::new( + io::ErrorKind::UnexpectedEof, + "Read unexpected eof", + ))), + buf, + ); + } + match access_handle.read_with_u8_array_and_options(buf.as_slice_mut(), &options) { + Ok(_) => (Ok(()), buf), + Err(err) => (Err(wasm_err(err)), buf), + } + } + + /// Reads all bytes until EOF in this source, placing them into `buf`. + /// + /// # Errors + /// + /// If an error is encountered then the `read_to_end_at` operation + /// immediately completes. + async fn read_to_end_at(&mut self, mut buf: Vec, pos: u64) -> (Result<(), Error>, Vec) { + let options = FileSystemReadWriteOptions::new(); + options.set_at(pos as f64); + + let access_handle = self.access_handle().await; + let size = access_handle + .get_size() + .expect("InvalidStateError: file is already closed."); + let buf_len = size.round() as usize - pos as usize; + if buf_len == 0 { + return ( + Err(Error::Io(io::Error::new( + io::ErrorKind::UnexpectedEof, + "Read unexpected eof", + ))), + buf, + ); + } + buf.resize(buf_len, 0); + + match access_handle.read_with_u8_array_and_options(buf.as_slice_mut(), &options) { + Ok(_) => (Ok(()), buf), + Err(err) => (Err(wasm_err(err)), buf), + } + } + + /// Return the size of file in bytes. + async fn size(&self) -> Result { + match self.access_handle.as_ref() { + Some(access_handle) => access_handle + .get_size() + .map(|sz| sz.round() as u64) + .map_err(wasm_err), + None => { + // FIXME: here should throw an error + let js_promise = self.file_handle.create_sync_access_handle(); + let access_handle = promise::(js_promise) + .await + .unwrap(); + let result = access_handle + .get_size() + .map(|sz| sz.round() as u64) + .map_err(wasm_err); + + access_handle.close(); + + result + } + } + } +} + +impl Drop for OPFSSyncFile { + fn drop(&mut self) { + if let Some(access_handle) = self.access_handle.take() { + access_handle.close(); + } + } +}