Skip to content

Commit

Permalink
feat: add support for FileSystemSyncAccessHandle
Browse files Browse the repository at this point in the history
  • Loading branch information
crwen committed Dec 13, 2024
1 parent 216eb44 commit ea927f1
Show file tree
Hide file tree
Showing 5 changed files with 186 additions and 3 deletions.
3 changes: 3 additions & 0 deletions fusio/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ opfs = [
"dep:web-sys",
"no-send",
]
sync = []
tokio = ["async-stream", "dep:tokio"]
tokio-http = ["dep:reqwest", "http"]
tokio-uring = ["async-stream", "completion-based", "dep:tokio-uring", "no-send"]
Expand Down Expand Up @@ -116,7 +117,9 @@ web-sys = { version = "0.3", optional = true, features = [
"FileSystemFileHandle",
"FileSystemGetDirectoryOptions",
"FileSystemGetFileOptions",
"FileSystemReadWriteOptions",
"FileSystemRemoveOptions",
"FileSystemSyncAccessHandle",
"FileSystemWritableFileStream",
"Navigator",
"ReadableStream",
Expand Down
3 changes: 3 additions & 0 deletions fusio/src/impls/disk/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,9 @@ pub(crate) mod opfs;
#[cfg(all(feature = "opfs", target_arch = "wasm32", feature = "fs"))]
#[allow(unused)]
pub use opfs::fs::*;
#[cfg(all(feature = "opfs", feature = "sync", target_arch = "wasm32"))]
#[allow(unused)]
pub use opfs::sync::OPFSSyncFile;
#[cfg(all(feature = "opfs", target_arch = "wasm32"))]
#[allow(unused)]
pub use opfs::OPFSFile;
Expand Down
16 changes: 13 additions & 3 deletions fusio/src/impls/disk/opfs/fs.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,3 @@
use std::future::Future;

use async_stream::stream;
use futures_core::Stream;
use futures_util::StreamExt;
Expand All @@ -13,7 +11,10 @@ use web_sys::{
FileSystemGetFileOptions, FileSystemRemoveOptions,
};

#[cfg(not(feature = "sync"))]
use super::OPFSFile;
#[cfg(feature = "sync")]
use crate::disk::OPFSSyncFile;
use crate::{
disk::opfs::{promise, storage},
error::wasm_err,
Expand All @@ -26,6 +27,9 @@ use crate::{
pub struct OPFS;

impl Fs for OPFS {
#[cfg(feature = "sync")]
type File = OPFSSyncFile;
#[cfg(not(feature = "sync"))]
type File = OPFSFile;

fn file_system(&self) -> FileSystemTag {
Expand Down Expand Up @@ -59,7 +63,13 @@ impl Fs for OPFS {
)
.await?;

Ok(OPFSFile::new(file_handle))
cfg_if::cfg_if! {
if #[cfg(feature = "sync")] {
Ok(Self::File::new(file_handle).await?)
} else {
Ok(OPFSFile::new(file_handle))
}
}
}

/// Recursively creates a directory and all of its parent components if they are missing.
Expand Down
5 changes: 5 additions & 0 deletions fusio/src/impls/disk/opfs/mod.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,9 @@
#[cfg(feature = "fs")]
pub mod fs;

#[cfg(feature = "sync")]
pub mod sync;

use std::{io, sync::Arc};

use js_sys::Uint8Array;
Expand Down Expand Up @@ -62,6 +65,7 @@ impl FileHandle {

(result, buf)
}

/// Attempts to write an entire buffer into the stream.
///
/// No changes are written to the actual file on disk until the stream is closed.
Expand Down Expand Up @@ -199,6 +203,7 @@ pub struct OPFSFile {
}

impl OPFSFile {
#[allow(unused)]
pub(crate) fn new(file_handle: FileSystemFileHandle) -> Self {
Self {
file_handle: Some(Arc::new(FileHandle::new(file_handle))),
Expand Down
162 changes: 162 additions & 0 deletions fusio/src/impls/disk/opfs/sync/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,162 @@
use std::io;

use web_sys::{FileSystemFileHandle, FileSystemReadWriteOptions, FileSystemSyncAccessHandle};

use crate::{disk::opfs::promise, error::wasm_err, Error, IoBuf, IoBufMut, Read, Write};

/// OPFS based on [FileSystemWritableFileStream](https://developer.mozilla.org/en-US/docs/Web/API/FileSystemSyncAccessHandle)
/// This file is only accessible inside dedicated Web Workers.
pub struct OPFSSyncFile {
file_handle: FileSystemFileHandle,
access_handle: Option<FileSystemSyncAccessHandle>,
}

impl OPFSSyncFile {
pub(crate) async fn new(file_handle: FileSystemFileHandle) -> Result<Self, Error> {
let js_promise = file_handle.create_sync_access_handle();
let access_handle = Some(promise::<FileSystemSyncAccessHandle>(js_promise).await?);
Ok(Self {
file_handle,
access_handle,
})
}

pub(crate) async fn access_handle(&mut self) -> &FileSystemSyncAccessHandle {
if self.access_handle.is_none() {
let js_promise = self.file_handle.create_sync_access_handle();
self.access_handle = Some(
promise::<FileSystemSyncAccessHandle>(js_promise)
.await
.unwrap(),
);
}
self.access_handle.as_ref().unwrap()
}
}

impl Write for OPFSSyncFile {
/// Attempts to write an entire buffer into the file.
///
/// No changes are written to the actual file on disk until [`OPFSFile::close`] has been called.
/// See more detail in [write](https://developer.mozilla.org/en-US/docs/Web/API/FileSystemSyncAccessHandle/write)
async fn write_all<B: IoBuf>(&mut self, buf: B) -> (Result<(), Error>, B) {
match self
.access_handle()
.await
.write_with_u8_array(buf.as_slice())
{
Ok(_) => (Ok(()), buf),
Err(err) => (Err(wasm_err(err)), buf),
}
}

/// Persists any changes made to the file.
/// See more detail in [write](https://developer.mozilla.org/en-US/docs/Web/API/FileSystemSyncAccessHandle/flush)
async fn flush(&mut self) -> Result<(), Error> {
self.access_handle().await.flush().map_err(wasm_err)
}

async fn close(&mut self) -> Result<(), Error> {
if let Some(access_handle) = self.access_handle.take() {
access_handle.close();
}
Ok(())
}
}

impl Read for OPFSSyncFile {
/// Reads the exact number of bytes required to fill `buf` at `pos`.
///
/// # Errors
///
/// If the operation encounters an "end of file" before completely
/// filling the buffer, it returns an error of [`crate::Error`].
async fn read_exact_at<B: IoBufMut>(&mut self, mut buf: B, pos: u64) -> (Result<(), Error>, B) {
let buf_len = buf.bytes_init() as i32;
let options = FileSystemReadWriteOptions::new();
options.set_at(pos as f64);

let access_handle = self.access_handle().await;
let size = access_handle
.get_size()
.expect("InvalidStateError: file is already closed.");
if (size.round() as u64) < pos + buf_len as u64 {
return (
Err(Error::Io(io::Error::new(
io::ErrorKind::UnexpectedEof,
"Read unexpected eof",
))),
buf,
);
}
match access_handle.read_with_u8_array_and_options(buf.as_slice_mut(), &options) {
Ok(_) => (Ok(()), buf),
Err(err) => (Err(wasm_err(err)), buf),
}
}

/// Reads all bytes until EOF in this source, placing them into `buf`.
///
/// # Errors
///
/// If an error is encountered then the `read_to_end_at` operation
/// immediately completes.
async fn read_to_end_at(&mut self, mut buf: Vec<u8>, pos: u64) -> (Result<(), Error>, Vec<u8>) {
let options = FileSystemReadWriteOptions::new();
options.set_at(pos as f64);

let access_handle = self.access_handle().await;
let size = access_handle
.get_size()
.expect("InvalidStateError: file is already closed.");
let buf_len = size.round() as usize - pos as usize;
if buf_len == 0 {
return (
Err(Error::Io(io::Error::new(
io::ErrorKind::UnexpectedEof,
"Read unexpected eof",
))),
buf,
);
}
buf.resize(buf_len, 0);

match access_handle.read_with_u8_array_and_options(buf.as_slice_mut(), &options) {
Ok(_) => (Ok(()), buf),
Err(err) => (Err(wasm_err(err)), buf),
}
}

/// Return the size of file in bytes.
async fn size(&self) -> Result<u64, Error> {
match self.access_handle.as_ref() {
Some(access_handle) => access_handle
.get_size()
.map(|sz| sz.round() as u64)
.map_err(wasm_err),
None => {
// FIXME: here should throw an error
let js_promise = self.file_handle.create_sync_access_handle();
let access_handle = promise::<FileSystemSyncAccessHandle>(js_promise)
.await
.unwrap();
let result = access_handle
.get_size()
.map(|sz| sz.round() as u64)
.map_err(wasm_err);

access_handle.close();

result
}
}
}
}

impl Drop for OPFSSyncFile {
fn drop(&mut self) {
if let Some(access_handle) = self.access_handle.take() {
access_handle.close();
}
}
}

0 comments on commit ea927f1

Please sign in to comment.