From edbca6924c02a7297a0d535638f3be819404e7d0 Mon Sep 17 00:00:00 2001 From: crwen <1543720935@qq.com> Date: Wed, 5 Feb 2025 22:30:27 +0800 Subject: [PATCH] feat: append to a file instead of overwriting while opening for disk based file --- fusio/src/impls/disk/monoio/fs.rs | 20 +- fusio/src/impls/disk/monoio/mod.rs | 9 + fusio/src/impls/disk/opfs/fs.rs | 4 +- fusio/src/impls/disk/opfs/mod.rs | 305 ++++++++----------------- fusio/src/impls/disk/opfs/sync/mod.rs | 29 ++- fusio/src/impls/disk/tokio/fs.rs | 1 + fusio/src/impls/disk/tokio_uring/fs.rs | 6 +- fusio/tests/opfs.rs | 37 +-- 8 files changed, 146 insertions(+), 265 deletions(-) diff --git a/fusio/src/impls/disk/monoio/fs.rs b/fusio/src/impls/disk/monoio/fs.rs index 9a7e1b1..2cd601f 100644 --- a/fusio/src/impls/disk/monoio/fs.rs +++ b/fusio/src/impls/disk/monoio/fs.rs @@ -2,6 +2,7 @@ use std::{fs, fs::create_dir_all}; use async_stream::stream; use futures_core::Stream; +use monoio::fs::File; use super::MonoioFile; use crate::{ @@ -21,16 +22,15 @@ impl Fs for MonoIoFs { async fn open_options(&self, path: &Path, options: OpenOptions) -> Result { let local_path = path_to_local(path)?; - - Ok(MonoioFile::from( - monoio::fs::OpenOptions::new() - .read(options.read) - .write(options.write) - .create(options.create) - .truncate(options.truncate) - .open(&local_path) - .await?, - )) + let file = monoio::fs::OpenOptions::new() + .read(options.read) + .write(options.write) + .create(options.create) + .truncate(options.truncate) + .open(&local_path) + .await?; + let metadata = file.metadata().await.expect("monoio: get metadat failed"); + Ok(MonoioFile::new(file, metadata.len())) } async fn create_dir_all(path: &Path) -> Result<(), Error> { diff --git a/fusio/src/impls/disk/monoio/mod.rs b/fusio/src/impls/disk/monoio/mod.rs index ceaa40a..9d98835 100644 --- a/fusio/src/impls/disk/monoio/mod.rs +++ b/fusio/src/impls/disk/monoio/mod.rs @@ -52,6 +52,15 @@ impl From for MonoioFile { } } +impl MonoioFile { + pub(crate) fn new(file: File, pos: u64) -> Self { + Self { + file: Some(file), + pos, + } + } +} + impl Write for MonoioFile { async fn write_all(&mut self, buf: B) -> (Result<(), Error>, B) { let (result, buf) = self diff --git a/fusio/src/impls/disk/opfs/fs.rs b/fusio/src/impls/disk/opfs/fs.rs index 5692f4a..e8ea9fe 100644 --- a/fusio/src/impls/disk/opfs/fs.rs +++ b/fusio/src/impls/disk/opfs/fs.rs @@ -65,9 +65,9 @@ impl Fs for OPFS { cfg_if::cfg_if! { if #[cfg(feature = "sync")] { - Ok(Self::File::new(file_handle).await?) + Ok(Self::File::new(file_handle, options).await?) } else { - Ok(OPFSFile::new(file_handle)) + Ok(OPFSFile::new(file_handle, options).await?) } } } diff --git a/fusio/src/impls/disk/opfs/mod.rs b/fusio/src/impls/disk/opfs/mod.rs index b3ec835..97171ca 100644 --- a/fusio/src/impls/disk/opfs/mod.rs +++ b/fusio/src/impls/disk/opfs/mod.rs @@ -4,7 +4,7 @@ pub mod fs; #[cfg(feature = "sync")] pub mod sync; -use std::{io, sync::Arc}; +use std::io; use js_sys::Uint8Array; use wasm_bindgen_futures::JsFuture; @@ -14,7 +14,7 @@ use web_sys::{ ReadableStreamReadResult, WorkerGlobalScope, }; -use crate::{error::wasm_err, Error, IoBuf, IoBufMut, Read, Write}; +use crate::{error::wasm_err, fs::OpenOptions, Error, IoBuf, IoBufMut, Read, Write}; pub(crate) async fn promise(promise: js_sys::Promise) -> Result where @@ -25,198 +25,62 @@ where js_val.dyn_into::().map_err(|_obj| Error::CastError) } -/// File handle of OPFS file -pub struct FileHandle { +/// OPFS based on [FileSystemWritableFileStream](https://developer.mozilla.org/en-US/docs/Web/API/FileSystemWritableFileStream) +pub struct OPFSFile { file_handle: FileSystemFileHandle, + write_stream: Option, + pos: u64, } -impl FileHandle { - fn new(file_handle: FileSystemFileHandle) -> Self { - Self { file_handle } - } -} - -impl FileHandle { - /// Attempts to write an entire buffer into the file. - /// - /// Unlike [`OPFSFile::write_all`], changes will be written to the actual file. - pub async fn write_at(&self, buf: B, pos: u64) -> (Result<(), Error>, B) { - let options = FileSystemCreateWritableOptions::new(); - options.set_keep_existing_data(true); - - let writer_promise = self.create_writable_with_options(&options); - let writer = match promise::(writer_promise).await { - Ok(writer) => writer, - Err(err) => return (Err(err), buf), +impl OPFSFile { + #[allow(unused)] + pub(crate) async fn new( + file_handle: FileSystemFileHandle, + open_options: OpenOptions, + ) -> Result { + let size = if open_options.truncate { + 0.0 + } else { + let file = promise::(file_handle.get_file()).await?; + file.size() }; + let write_stream = if open_options.truncate || open_options.write { + let options = FileSystemCreateWritableOptions::new(); + options.set_keep_existing_data(!open_options.truncate); - if let Err(err) = JsFuture::from(writer.seek_with_u32(pos as u32).unwrap()).await { - return (Err(wasm_err(err)), buf); - } - - let (result, buf) = self.write_with_stream(buf, &writer).await; - if result.is_err() { - return (result, buf); - } - let result = JsFuture::from(writer.close()) - .await - .map_err(wasm_err) - .map(|_| ()); - - (result, buf) - } + let writer_promise = file_handle.create_writable_with_options(&options); + let write_stream = promise::(writer_promise).await?; + JsFuture::from(write_stream.seek_with_f64(size).unwrap()) + .await + .map_err(wasm_err)?; - /// Attempts to write an entire buffer into the stream. - /// - /// No changes are written to the actual file on disk until the stream is closed. - async fn write_with_stream( - &self, - buf: B, - stream: &FileSystemWritableFileStream, - ) -> (Result<(), Error>, B) { - match JsFuture::from(stream.write_with_u8_array(buf.as_slice()).unwrap()).await { - Ok(_) => (Ok(()), buf), - Err(err) => (Err(wasm_err(err)), buf), - } - } + Some(write_stream) + } else { + None + }; - /// Create a `FileSystemWritableFileStream` and return a JavaScript Promise - fn create_writable_with_options( - &self, - options: &FileSystemCreateWritableOptions, - ) -> js_sys::Promise { - self.file_handle.create_writable_with_options(options) + Ok(Self { + file_handle, + write_stream, + pos: size.round() as u64, + }) } -} -impl FileHandle { - /// Reads all bytes until EOF in this source, placing them into `buf`. - /// - /// # Errors - /// - /// If an error is encountered then the `read_to_end_at` operation - /// immediately completes. - async fn read_to_end_at(&self, mut buf: Vec, pos: u64) -> (Result<(), Error>, Vec) { - let file_promise = self.file_handle.get_file(); - let file = match promise::(file_promise).await { - Ok(file) => file, - Err(err) => return (Err(err), buf), - }; + async fn reader(&self, pos: u64, buf_len: u64) -> Result { + let file = promise::(self.file_handle.get_file()).await?; - if (file.size().round() as u64) < pos + buf.bytes_init() as u64 { - return ( - Err(Error::Io(io::Error::new( - io::ErrorKind::UnexpectedEof, - "Read unexpected eof", - ))), - buf, - ); + if (file.size().round() as u64) < pos + buf_len as u64 { + return Err(Error::Io(io::Error::new( + io::ErrorKind::UnexpectedEof, + "Read unexpected eof", + ))); } let blob = file.slice_with_i32(pos as i32).unwrap(); - let reader = match blob - .stream() + blob.stream() .get_reader() .dyn_into::() .map_err(|_obj| Error::CastError) - { - Ok(reader) => reader, - Err(err) => return (Err(err), buf), - }; - - while let Ok(v) = JsFuture::from(reader.read()).await { - let result = ReadableStreamReadResult::from(v); - if result.get_done().unwrap() { - break; - } - let chunk = result.get_value().dyn_into::().unwrap(); - buf.extend(chunk.to_vec()); - } - - (Ok(()), buf) - } - - /// Reads the exact number of bytes required to fill `buf` at `pos`. - /// - /// # Errors - /// - /// If the operation encounters an "end of file" before completely - /// filling the buffer, it returns an error of [`Error::Io`]. - pub async fn read_exact_at(&self, mut buf: B, pos: u64) -> (Result<(), Error>, B) { - let buf_len = buf.bytes_init() as i32; - let buf_slice = buf.as_slice_mut(); - let end = pos as i32 + buf_len; - - let file = match promise::(self.file_handle.get_file()).await { - Ok(file) => file, - Err(err) => return (Err(err), buf), - }; - - if (file.size().round() as u64) < pos + buf_len as u64 { - return ( - Err(Error::Io(io::Error::new( - io::ErrorKind::UnexpectedEof, - "Read unexpected eof", - ))), - buf, - ); - } - let blob = file.slice_with_i32_and_i32(pos as i32, end).unwrap(); - let reader = match blob - .stream() - .get_reader() - .dyn_into::() - .map_err(|_obj| Error::CastError) - { - Ok(reader) => reader, - Err(err) => return (Err(err), buf), - }; - - let mut offset = 0; - while let Ok(v) = JsFuture::from(reader.read()).await { - let result = ReadableStreamReadResult::from(v); - if result.get_done().unwrap() { - break; - } - - let chunk = result.get_value().dyn_into::().unwrap(); - let chunk_len = chunk.length() as usize; - buf_slice[offset..offset + chunk_len].copy_from_slice(chunk.to_vec().as_slice()); - offset += chunk_len; - } - - (Ok(()), buf) - } - - pub async fn size(&self) -> Result { - let file = promise::(self.file_handle.get_file()).await?; - - Ok(file.size() as u64) - } -} - -/// OPFS based on [FileSystemWritableFileStream](https://developer.mozilla.org/en-US/docs/Web/API/FileSystemWritableFileStream) -pub struct OPFSFile { - file_handle: Option>, - write_stream: Option, - pos: u32, -} - -impl OPFSFile { - #[allow(unused)] - pub(crate) fn new(file_handle: FileSystemFileHandle) -> Self { - Self { - file_handle: Some(Arc::new(FileHandle::new(file_handle))), - write_stream: None, - pos: 0, - } - } - - pub fn file_handle(&self) -> Option> { - match self.file_handle.as_ref() { - None => None, - Some(file_handle) => Some(Arc::clone(file_handle)), - } } } @@ -226,31 +90,22 @@ impl Write for OPFSFile { /// No changes are written to the actual file on disk until [`OPFSFile::close`] has been called. /// See more detail in [write](https://developer.mozilla.org/en-US/docs/Web/API/FileSystemWritableFileStream/write) async fn write_all(&mut self, buf: B) -> (Result<(), Error>, B) { - let file_handle = self.file_handle.as_ref().expect("write file after closed"); - if self.write_stream.is_none() { - let options = FileSystemCreateWritableOptions::new(); - options.set_keep_existing_data(true); - let writer_promise = file_handle.create_writable_with_options(&options); - - let writer = match promise::(writer_promise).await { - Ok(writer) => writer, - Err(err) => return (Err(err), buf), - }; + debug_assert!(self.write_stream.is_some()); - if let Err(err) = JsFuture::from(writer.seek_with_u32(self.pos).unwrap()) - .await - .map_err(wasm_err) - { - return (Err(err), buf); - } - - self.write_stream = Some(writer); - } - - let writer = self.write_stream.as_ref().unwrap(); let len = buf.bytes_init(); - self.pos += len as u32; - file_handle.write_with_stream(buf, writer).await + self.pos += len as u64; + match JsFuture::from( + self.write_stream + .as_ref() + .unwrap() + .write_with_u8_array(buf.as_slice()) + .unwrap(), + ) + .await + { + Ok(_) => (Ok(()), buf), + Err(err) => (Err(wasm_err(err)), buf), + } } async fn flush(&mut self) -> Result<(), Error> { @@ -274,10 +129,29 @@ impl Read for OPFSFile { /// /// If the operation encounters an "end of file" before completely /// filling the buffer, it returns an error of [`Error::Io`]. - async fn read_exact_at(&mut self, buf: B, pos: u64) -> (Result<(), Error>, B) { - let file_handle = self.file_handle.as_ref().expect("read file after closed"); + async fn read_exact_at(&mut self, mut buf: B, pos: u64) -> (Result<(), Error>, B) { + let buf_len = buf.bytes_init() as u64; + let buf_slice = buf.as_slice_mut(); + + let reader = match self.reader(pos, buf_len).await { + Ok(reader) => reader, + Err(err) => return (Err(err), buf), + }; + + let mut offset = 0; + while let Ok(v) = JsFuture::from(reader.read()).await { + let result = ReadableStreamReadResult::from(v); + if result.get_done().unwrap() { + break; + } + + let chunk = result.get_value().dyn_into::().unwrap(); + let chunk_len = chunk.length() as usize; + buf_slice[offset..offset + chunk_len].copy_from_slice(chunk.to_vec().as_slice()); + offset += chunk_len; + } - file_handle.read_exact_at(buf, pos).await + (Ok(()), buf) } /// Reads all bytes until EOF in this source, placing them into `buf`. @@ -286,16 +160,29 @@ impl Read for OPFSFile { /// /// If an error is encountered then the `read_to_end_at` operation /// immediately completes. - async fn read_to_end_at(&mut self, buf: Vec, pos: u64) -> (Result<(), Error>, Vec) { - let file_handle = self.file_handle.as_ref().expect("read file after closed"); + async fn read_to_end_at(&mut self, mut buf: Vec, pos: u64) -> (Result<(), Error>, Vec) { + let reader = match self.reader(pos, buf.bytes_init() as u64).await { + Ok(reader) => reader, + Err(err) => return (Err(err), buf), + }; - file_handle.read_to_end_at(buf, pos).await + while let Ok(v) = JsFuture::from(reader.read()).await { + let result = ReadableStreamReadResult::from(v); + if result.get_done().unwrap() { + break; + } + let chunk = result.get_value().dyn_into::().unwrap(); + buf.extend(chunk.to_vec()); + } + + (Ok(()), buf) } /// Return the size of file in bytes. async fn size(&self) -> Result { - let file_handle = self.file_handle.as_ref().expect("read file after closed"); - file_handle.size().await + let file = promise::(self.file_handle.get_file()).await?; + + Ok(file.size() as u64) } } diff --git a/fusio/src/impls/disk/opfs/sync/mod.rs b/fusio/src/impls/disk/opfs/sync/mod.rs index 74e8b24..022f847 100644 --- a/fusio/src/impls/disk/opfs/sync/mod.rs +++ b/fusio/src/impls/disk/opfs/sync/mod.rs @@ -2,19 +2,32 @@ use std::io; use web_sys::{FileSystemFileHandle, FileSystemReadWriteOptions, FileSystemSyncAccessHandle}; -use crate::{disk::opfs::promise, error::wasm_err, Error, IoBuf, IoBufMut, Read, Write}; +use crate::{ + disk::opfs::promise, error::wasm_err, fs::OpenOptions, Error, IoBuf, IoBufMut, Read, Write, +}; /// OPFS based on [FileSystemWritableFileStream](https://developer.mozilla.org/en-US/docs/Web/API/FileSystemSyncAccessHandle) /// This file is only accessible inside dedicated Web Workers. pub struct OPFSSyncFile { access_handle: Option, + pos: u64, } impl OPFSSyncFile { - pub(crate) async fn new(file_handle: FileSystemFileHandle) -> Result { + pub(crate) async fn new( + file_handle: FileSystemFileHandle, + open_options: OpenOptions, + ) -> Result { let js_promise = file_handle.create_sync_access_handle(); - let access_handle = Some(promise::(js_promise).await?); - Ok(Self { access_handle }) + let access_handle = promise::(js_promise).await?; + if open_options.truncate { + access_handle.truncate_with_u32(0).map_err(wasm_err)?; + } + + Ok(Self { + access_handle: Some(access_handle), + pos: 0, + }) } } @@ -26,11 +39,17 @@ impl Write for OPFSSyncFile { async fn write_all(&mut self, buf: B) -> (Result<(), Error>, B) { debug_assert!(self.access_handle.is_some(), "file is already closed"); + let options = FileSystemReadWriteOptions::new(); + options.set_at(self.pos as f64); + + let len = buf.bytes_init(); + self.pos += len as u64; + match self .access_handle .as_ref() .unwrap() - .write_with_u8_array(buf.as_slice()) + .write_with_u8_array_and_options(buf.as_slice(), &options) { Ok(_) => (Ok(()), buf), Err(err) => (Err(wasm_err(err)), buf), diff --git a/fusio/src/impls/disk/tokio/fs.rs b/fusio/src/impls/disk/tokio/fs.rs index bb7d158..16f3bb1 100644 --- a/fusio/src/impls/disk/tokio/fs.rs +++ b/fusio/src/impls/disk/tokio/fs.rs @@ -29,6 +29,7 @@ impl Fs for TokioFs { let file = tokio::fs::OpenOptions::new() .read(options.read) .write(options.write) + .append(options.write) .create(options.create) .open(&local_path) .await?; diff --git a/fusio/src/impls/disk/tokio_uring/fs.rs b/fusio/src/impls/disk/tokio_uring/fs.rs index e28277f..50eef03 100644 --- a/fusio/src/impls/disk/tokio_uring/fs.rs +++ b/fusio/src/impls/disk/tokio_uring/fs.rs @@ -30,10 +30,10 @@ impl Fs for TokioUringFs { .truncate(options.truncate) .open(&local_path) .await?; - + let stat = file.statx().await?; Ok(TokioUringFile { - file: Some(file), - pos: 0, + file: Some(open_options.open(&local_path).await?), + pos: stat.stx_size, }) } diff --git a/fusio/tests/opfs.rs b/fusio/tests/opfs.rs index 5a2f7f7..2dab3f1 100644 --- a/fusio/tests/opfs.rs +++ b/fusio/tests/opfs.rs @@ -4,7 +4,7 @@ pub(crate) mod tests { wasm_bindgen_test::wasm_bindgen_test_configure!(run_in_browser); - use fusio::{disk::OPFS, dynamic::DynFile, fs::OpenOptions, path::Path, DynFs, Read, Write}; + use fusio::{disk::OPFS, fs::OpenOptions, path::Path, DynFs, Read, Write}; use futures_util::StreamExt; use wasm_bindgen_test::wasm_bindgen_test; @@ -152,41 +152,6 @@ pub(crate) mod tests { remove_all(&fs, &["file"]).await; } - #[wasm_bindgen_test] - async fn test_opfs_handle_write() { - let fs = OPFS; - let mut file = fs - .open_options(&"file".into(), OpenOptions::default().create(true)) - .await - .unwrap(); - let opfs = - unsafe { std::mem::transmute::<&Box, &Box>(&file) }; - - let handle = opfs.file_handle().unwrap(); - - let (result, _) = handle.write_at([1, 2, 3, 4].as_mut(), 0).await; - result.unwrap(); - let (result, _) = handle.write_at([11, 23, 34, 47].as_mut(), 4).await; - result.unwrap(); - let (result, _) = handle.write_at([121, 93, 94, 97].as_mut(), 8).await; - result.unwrap(); - file.close().await.unwrap(); - - let file = fs - .open_options(&"file".into(), OpenOptions::default().create(true)) - .await - .unwrap(); - let opfs = - unsafe { std::mem::transmute::<&Box, &Box>(&file) }; - let handle = opfs.file_handle().unwrap(); - let mut buf = [0; 7]; - - let (result, data) = handle.read_exact_at(buf.as_mut(), 3).await; - result.unwrap(); - assert_eq!(data, [4, 11, 23, 34, 47, 121, 93]); - remove_all(&fs, &["file"]).await; - } - #[wasm_bindgen_test] async fn test_opfs_read_eof() { let fs = OPFS;