Skip to content

Commit

Permalink
refactor: change close semantic for tokio fs and FileSystemSyncAccess…
Browse files Browse the repository at this point in the history
…Handle
  • Loading branch information
crwen committed Jan 24, 2025
1 parent 98d22b3 commit d918d16
Show file tree
Hide file tree
Showing 3 changed files with 70 additions and 57 deletions.
71 changes: 28 additions & 43 deletions fusio/src/impls/disk/opfs/sync/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,30 +7,14 @@ use crate::{disk::opfs::promise, error::wasm_err, Error, IoBuf, IoBufMut, Read,
/// OPFS based on [FileSystemWritableFileStream](https://developer.mozilla.org/en-US/docs/Web/API/FileSystemSyncAccessHandle)
/// This file is only accessible inside dedicated Web Workers.
pub struct OPFSSyncFile {
file_handle: FileSystemFileHandle,
access_handle: Option<FileSystemSyncAccessHandle>,
}

impl OPFSSyncFile {
pub(crate) async fn new(file_handle: FileSystemFileHandle) -> Result<Self, Error> {
let js_promise = file_handle.create_sync_access_handle();
let access_handle = Some(promise::<FileSystemSyncAccessHandle>(js_promise).await?);
Ok(Self {
file_handle,
access_handle,
})
}

pub(crate) async fn access_handle(&mut self) -> &FileSystemSyncAccessHandle {
if self.access_handle.is_none() {
let js_promise = self.file_handle.create_sync_access_handle();
self.access_handle = Some(
promise::<FileSystemSyncAccessHandle>(js_promise)
.await
.unwrap(),
);
}
self.access_handle.as_ref().unwrap()
Ok(Self { access_handle })
}
}

Expand All @@ -40,9 +24,12 @@ impl Write for OPFSSyncFile {
/// No changes are written to the actual file on disk until [`OPFSFile::close`] has been called.
/// See more detail in [write](https://developer.mozilla.org/en-US/docs/Web/API/FileSystemSyncAccessHandle/write)
async fn write_all<B: IoBuf>(&mut self, buf: B) -> (Result<(), Error>, B) {
debug_assert!(self.access_handle.is_some(), "file is already closed");

match self
.access_handle()
.await
.access_handle
.as_ref()
.unwrap()
.write_with_u8_array(buf.as_slice())
{
Ok(_) => (Ok(()), buf),
Expand All @@ -53,10 +40,18 @@ impl Write for OPFSSyncFile {
/// Persists any changes made to the file.
/// See more detail in [write](https://developer.mozilla.org/en-US/docs/Web/API/FileSystemSyncAccessHandle/flush)
async fn flush(&mut self) -> Result<(), Error> {
self.access_handle().await.flush().map_err(wasm_err)
debug_assert!(self.access_handle.is_some(), "file is already closed");

self.access_handle
.as_ref()
.unwrap()
.flush()
.map_err(wasm_err)
}

async fn close(&mut self) -> Result<(), Error> {
debug_assert!(self.access_handle.is_some(), "file is already closed");

if let Some(access_handle) = self.access_handle.take() {
access_handle.close();
}
Expand All @@ -72,11 +67,13 @@ impl Read for OPFSSyncFile {
/// If the operation encounters an "end of file" before completely
/// filling the buffer, it returns an error of [`crate::Error`].
async fn read_exact_at<B: IoBufMut>(&mut self, mut buf: B, pos: u64) -> (Result<(), Error>, B) {
debug_assert!(self.access_handle.is_some(), "file is already closed");

let buf_len = buf.bytes_init() as i32;
let options = FileSystemReadWriteOptions::new();
options.set_at(pos as f64);

let access_handle = self.access_handle().await;
let access_handle = self.access_handle.as_ref().unwrap();
let size = access_handle
.get_size()
.expect("InvalidStateError: file is already closed.");
Expand All @@ -102,10 +99,12 @@ impl Read for OPFSSyncFile {
/// If an error is encountered then the `read_to_end_at` operation
/// immediately completes.
async fn read_to_end_at(&mut self, mut buf: Vec<u8>, pos: u64) -> (Result<(), Error>, Vec<u8>) {
debug_assert!(self.access_handle.is_some(), "file is already closed");

let options = FileSystemReadWriteOptions::new();
options.set_at(pos as f64);

let access_handle = self.access_handle().await;
let access_handle = self.access_handle.as_ref().unwrap();
let size = access_handle
.get_size()
.expect("InvalidStateError: file is already closed.");
Expand All @@ -129,27 +128,13 @@ impl Read for OPFSSyncFile {

/// Return the size of file in bytes.
async fn size(&self) -> Result<u64, Error> {
match self.access_handle.as_ref() {
Some(access_handle) => access_handle
.get_size()
.map(|sz| sz.round() as u64)
.map_err(wasm_err),
None => {
// FIXME: here should throw an error
let js_promise = self.file_handle.create_sync_access_handle();
let access_handle = promise::<FileSystemSyncAccessHandle>(js_promise)
.await
.unwrap();
let result = access_handle
.get_size()
.map(|sz| sz.round() as u64)
.map_err(wasm_err);

access_handle.close();

result
}
}
debug_assert!(self.access_handle.is_some(), "file is already closed");
self.access_handle
.as_ref()
.unwrap()
.get_size()
.map(|sz| sz.round() as u64)
.map_err(wasm_err)
}
}

Expand Down
7 changes: 4 additions & 3 deletions fusio/src/impls/disk/tokio/fs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,12 @@ use std::io;
use async_stream::stream;
use futures_core::Stream;
use tokio::{
fs::{create_dir_all, remove_file, File},
fs::{create_dir_all, remove_file},
task::spawn_blocking,
};

use crate::{
disk::tokio::TokioFile,
fs::{FileMeta, FileSystemTag, Fs, OpenOptions},
path::{path_to_local, Path},
Error,
Expand All @@ -16,7 +17,7 @@ use crate::{
pub struct TokioFs;

impl Fs for TokioFs {
type File = File;
type File = TokioFile;

fn file_system(&self) -> FileSystemTag {
FileSystemTag::Local
Expand All @@ -36,7 +37,7 @@ impl Fs for TokioFs {
file.set_len(0).await?;
}

Ok(file)
Ok(TokioFile::new(file))
}

async fn create_dir_all(path: &Path) -> Result<(), Error> {
Expand Down
49 changes: 38 additions & 11 deletions fusio/src/impls/disk/tokio/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,10 +10,21 @@ use tokio::{

use crate::{buf::IoBufMut, Error, IoBuf, Read, Write};

impl Write for File {
pub struct TokioFile {
file: Option<File>,
}
impl TokioFile {
pub(crate) fn new(file: File) -> Self {
Self { file: Some(file) }
}
}

impl Write for TokioFile {
async fn write_all<B: IoBuf>(&mut self, buf: B) -> (Result<(), Error>, B) {
debug_assert!(self.file.is_some(), "file is already closed");

(
AsyncWriteExt::write_all(self, unsafe {
AsyncWriteExt::write_all(self.file.as_mut().unwrap(), unsafe {
&*slice_from_raw_parts(buf.as_ptr(), buf.bytes_init())
})
.await
Expand All @@ -23,40 +34,56 @@ impl Write for File {
}

async fn flush(&mut self) -> Result<(), Error> {
AsyncWriteExt::flush(self).await.map_err(Error::from)
debug_assert!(self.file.is_some(), "file is already closed");

AsyncWriteExt::flush(self.file.as_mut().unwrap())
.await
.map_err(Error::from)
}

async fn close(&mut self) -> Result<(), Error> {
AsyncWriteExt::flush(self).await.map_err(Error::from)?;
File::shutdown(self).await?;
debug_assert!(self.file.is_some(), "file is already closed");

let file = self.file.as_mut().unwrap();
AsyncWriteExt::flush(file).await.map_err(Error::from)?;
File::shutdown(file).await?;
self.file.take();
Ok(())
}
}

impl Read for File {
impl Read for TokioFile {
async fn read_exact_at<B: IoBufMut>(&mut self, mut buf: B, pos: u64) -> (Result<(), Error>, B) {
debug_assert!(self.file.is_some(), "file is already closed");

let file = self.file.as_mut().unwrap();
// TODO: Use pread instead of seek + read_exact
if let Err(e) = AsyncSeekExt::seek(self, SeekFrom::Start(pos)).await {
if let Err(e) = AsyncSeekExt::seek(file, SeekFrom::Start(pos)).await {
return (Err(Error::Io(e)), buf);
}
match AsyncReadExt::read_exact(self, buf.as_slice_mut()).await {
match AsyncReadExt::read_exact(file, buf.as_slice_mut()).await {
Ok(_) => (Ok(()), buf),
Err(e) => (Err(Error::Io(e)), buf),
}
}

async fn read_to_end_at(&mut self, mut buf: Vec<u8>, pos: u64) -> (Result<(), Error>, Vec<u8>) {
debug_assert!(self.file.is_some(), "file is already closed");

let file = self.file.as_mut().unwrap();
// TODO: Use pread instead of seek + read_exact
if let Err(e) = AsyncSeekExt::seek(self, SeekFrom::Start(pos)).await {
if let Err(e) = AsyncSeekExt::seek(file, SeekFrom::Start(pos)).await {
return (Err(Error::Io(e)), buf);
}
match AsyncReadExt::read_to_end(self, &mut buf).await {
match AsyncReadExt::read_to_end(file, &mut buf).await {
Ok(_) => (Ok(()), buf),
Err(e) => (Err(Error::Io(e)), buf),
}
}

async fn size(&self) -> Result<u64, Error> {
Ok(self.metadata().await?.len())
debug_assert!(self.file.is_some(), "file is already closed");

Ok(self.file.as_ref().unwrap().metadata().await?.len())
}
}

0 comments on commit d918d16

Please sign in to comment.