Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: add support for FileSystemSyncAccessHandle #114

Merged
merged 1 commit into from
Dec 28, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions fusio/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ opfs = [
"dep:web-sys",
"no-send",
]
sync = ["opfs"]
tokio = ["async-stream", "dep:tokio"]
tokio-http = ["dep:reqwest", "http"]
tokio-uring = ["async-stream", "completion-based", "dep:tokio-uring", "no-send"]
Expand Down Expand Up @@ -116,7 +117,9 @@ web-sys = { version = "0.3", optional = true, features = [
"FileSystemFileHandle",
"FileSystemGetDirectoryOptions",
"FileSystemGetFileOptions",
"FileSystemReadWriteOptions",
"FileSystemRemoveOptions",
"FileSystemSyncAccessHandle",
"FileSystemWritableFileStream",
"Navigator",
"ReadableStream",
Expand Down
3 changes: 3 additions & 0 deletions fusio/src/impls/disk/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,9 @@ pub(crate) mod opfs;
#[cfg(all(feature = "opfs", target_arch = "wasm32", feature = "fs"))]
#[allow(unused)]
pub use opfs::fs::*;
#[cfg(all(feature = "opfs", feature = "sync", target_arch = "wasm32"))]
#[allow(unused)]
pub use opfs::sync::OPFSSyncFile;
#[cfg(all(feature = "opfs", target_arch = "wasm32"))]
#[allow(unused)]
pub use opfs::OPFSFile;
Expand Down
16 changes: 13 additions & 3 deletions fusio/src/impls/disk/opfs/fs.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,3 @@
use std::future::Future;

use async_stream::stream;
use futures_core::Stream;
use futures_util::StreamExt;
Expand All @@ -13,7 +11,10 @@ use web_sys::{
FileSystemGetFileOptions, FileSystemRemoveOptions,
};

#[cfg(not(feature = "sync"))]
use super::OPFSFile;
#[cfg(feature = "sync")]
use crate::disk::OPFSSyncFile;
use crate::{
disk::opfs::{promise, storage},
error::wasm_err,
Expand All @@ -26,6 +27,9 @@ use crate::{
pub struct OPFS;

impl Fs for OPFS {
#[cfg(feature = "sync")]
type File = OPFSSyncFile;
#[cfg(not(feature = "sync"))]
type File = OPFSFile;

fn file_system(&self) -> FileSystemTag {
Expand Down Expand Up @@ -59,7 +63,13 @@ impl Fs for OPFS {
)
.await?;

Ok(OPFSFile::new(file_handle))
cfg_if::cfg_if! {
if #[cfg(feature = "sync")] {
Ok(Self::File::new(file_handle).await?)
} else {
Ok(OPFSFile::new(file_handle))
}
}
}

/// Recursively creates a directory and all of its parent components if they are missing.
Expand Down
5 changes: 5 additions & 0 deletions fusio/src/impls/disk/opfs/mod.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,9 @@
#[cfg(feature = "fs")]
pub mod fs;

#[cfg(feature = "sync")]
pub mod sync;

use std::{io, sync::Arc};

use js_sys::Uint8Array;
Expand Down Expand Up @@ -62,6 +65,7 @@ impl FileHandle {

(result, buf)
}

/// Attempts to write an entire buffer into the stream.
///
/// No changes are written to the actual file on disk until the stream is closed.
Expand Down Expand Up @@ -199,6 +203,7 @@ pub struct OPFSFile {
}

impl OPFSFile {
#[allow(unused)]
pub(crate) fn new(file_handle: FileSystemFileHandle) -> Self {
Self {
file_handle: Some(Arc::new(FileHandle::new(file_handle))),
Expand Down
162 changes: 162 additions & 0 deletions fusio/src/impls/disk/opfs/sync/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,162 @@
use std::io;

use web_sys::{FileSystemFileHandle, FileSystemReadWriteOptions, FileSystemSyncAccessHandle};

use crate::{disk::opfs::promise, error::wasm_err, Error, IoBuf, IoBufMut, Read, Write};

/// OPFS based on [FileSystemWritableFileStream](https://developer.mozilla.org/en-US/docs/Web/API/FileSystemSyncAccessHandle)
/// This file is only accessible inside dedicated Web Workers.
pub struct OPFSSyncFile {
file_handle: FileSystemFileHandle,
access_handle: Option<FileSystemSyncAccessHandle>,
}

impl OPFSSyncFile {
pub(crate) async fn new(file_handle: FileSystemFileHandle) -> Result<Self, Error> {
let js_promise = file_handle.create_sync_access_handle();
let access_handle = Some(promise::<FileSystemSyncAccessHandle>(js_promise).await?);
Ok(Self {
file_handle,
access_handle,
})
}

pub(crate) async fn access_handle(&mut self) -> &FileSystemSyncAccessHandle {
if self.access_handle.is_none() {
let js_promise = self.file_handle.create_sync_access_handle();
self.access_handle = Some(
promise::<FileSystemSyncAccessHandle>(js_promise)
.await
.unwrap(),
);
}
self.access_handle.as_ref().unwrap()
}
}

impl Write for OPFSSyncFile {
/// Attempts to write an entire buffer into the file.
///
/// No changes are written to the actual file on disk until [`OPFSFile::close`] has been called.
/// See more detail in [write](https://developer.mozilla.org/en-US/docs/Web/API/FileSystemSyncAccessHandle/write)
async fn write_all<B: IoBuf>(&mut self, buf: B) -> (Result<(), Error>, B) {
match self
.access_handle()
.await
.write_with_u8_array(buf.as_slice())
{
Ok(_) => (Ok(()), buf),
Err(err) => (Err(wasm_err(err)), buf),
}
}

/// Persists any changes made to the file.
/// See more detail in [write](https://developer.mozilla.org/en-US/docs/Web/API/FileSystemSyncAccessHandle/flush)
async fn flush(&mut self) -> Result<(), Error> {
self.access_handle().await.flush().map_err(wasm_err)
}

async fn close(&mut self) -> Result<(), Error> {
if let Some(access_handle) = self.access_handle.take() {
access_handle.close();
}
Ok(())
}
}

impl Read for OPFSSyncFile {
/// Reads the exact number of bytes required to fill `buf` at `pos`.
///
/// # Errors
///
/// If the operation encounters an "end of file" before completely
/// filling the buffer, it returns an error of [`crate::Error`].
async fn read_exact_at<B: IoBufMut>(&mut self, mut buf: B, pos: u64) -> (Result<(), Error>, B) {
let buf_len = buf.bytes_init() as i32;
let options = FileSystemReadWriteOptions::new();
options.set_at(pos as f64);

let access_handle = self.access_handle().await;
let size = access_handle
.get_size()
.expect("InvalidStateError: file is already closed.");
if (size.round() as u64) < pos + buf_len as u64 {
return (
Err(Error::Io(io::Error::new(
io::ErrorKind::UnexpectedEof,
"Read unexpected eof",
))),
buf,
);
}
match access_handle.read_with_u8_array_and_options(buf.as_slice_mut(), &options) {
Ok(_) => (Ok(()), buf),
Err(err) => (Err(wasm_err(err)), buf),
}
}

/// Reads all bytes until EOF in this source, placing them into `buf`.
///
/// # Errors
///
/// If an error is encountered then the `read_to_end_at` operation
/// immediately completes.
async fn read_to_end_at(&mut self, mut buf: Vec<u8>, pos: u64) -> (Result<(), Error>, Vec<u8>) {
let options = FileSystemReadWriteOptions::new();
options.set_at(pos as f64);

let access_handle = self.access_handle().await;
let size = access_handle
.get_size()
.expect("InvalidStateError: file is already closed.");
let buf_len = size.round() as usize - pos as usize;
if buf_len == 0 {
return (
Err(Error::Io(io::Error::new(
io::ErrorKind::UnexpectedEof,
"Read unexpected eof",
))),
buf,
);
}
buf.resize(buf_len, 0);

match access_handle.read_with_u8_array_and_options(buf.as_slice_mut(), &options) {
Ok(_) => (Ok(()), buf),
Err(err) => (Err(wasm_err(err)), buf),
}
}

/// Return the size of file in bytes.
async fn size(&self) -> Result<u64, Error> {
match self.access_handle.as_ref() {
Some(access_handle) => access_handle
.get_size()
.map(|sz| sz.round() as u64)
.map_err(wasm_err),
None => {
// FIXME: here should throw an error
let js_promise = self.file_handle.create_sync_access_handle();
let access_handle = promise::<FileSystemSyncAccessHandle>(js_promise)
.await
.unwrap();
let result = access_handle
.get_size()
.map(|sz| sz.round() as u64)
.map_err(wasm_err);

access_handle.close();

result
}
}
}
}

impl Drop for OPFSSyncFile {
fn drop(&mut self) {
if let Some(access_handle) = self.access_handle.take() {
access_handle.close();
}
}
}
Loading