Skip to content

Commit

Permalink
opfs read by stream
Browse files Browse the repository at this point in the history
  • Loading branch information
crwen committed Nov 4, 2024
1 parent 1845bc7 commit 9fbca47
Show file tree
Hide file tree
Showing 4 changed files with 57 additions and 125 deletions.
3 changes: 3 additions & 0 deletions fusio/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,9 @@ web-sys = { version = "0.3", optional = true, features = [
"FileSystemRemoveOptions",
"FileSystemWritableFileStream",
"Navigator",
"ReadableStream",
"ReadableStreamDefaultReader",
"ReadableStreamReadResult",
"Storage",
"StorageManager",
"Window",
Expand Down
5 changes: 5 additions & 0 deletions fusio/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,3 +21,8 @@ pub enum Error {
}

pub type BoxedError = Box<dyn std::error::Error + Send + Sync + 'static>;

#[cfg(all(feature = "opfs", target_arch = "wasm32"))]
pub(crate) fn wasm_err(js_val: js_sys::wasm_bindgen::JsValue) -> BoxedError {
format!("{js_val:?}").into()
}
132 changes: 40 additions & 92 deletions fusio/src/impls/disk/opfs/mod.rs
Original file line number Diff line number Diff line change
@@ -1,22 +1,20 @@
#[cfg(feature = "fs")]
pub mod fs;

use std::usize;

use js_sys::JsString;
use js_sys::Uint8Array;
use wasm_bindgen_futures::JsFuture;
use web_sys::{
wasm_bindgen::JsCast, window, File, FileSystemCreateWritableOptions, FileSystemDirectoryHandle,
FileSystemFileHandle, FileSystemWritableFileStream,
FileSystemFileHandle, FileSystemWritableFileStream, ReadableStreamDefaultReader,
ReadableStreamReadResult,
};

use crate::{Error, IoBuf, IoBufMut, Read, Write};
use crate::{error::wasm_err, Error, IoBuf, IoBufMut, Read, Write};

pub struct OPFSFile {
file_handle: Option<FileSystemFileHandle>,
write_stream: Option<FileSystemWritableFileStream>,
pos: u32,
remain: Option<u8>,
}

impl OPFSFile {
Expand All @@ -25,7 +23,6 @@ impl OPFSFile {
file_handle,
write_stream: None,
pos: 0,
remain: None,
}
}
}
Expand All @@ -43,24 +40,18 @@ impl Write for OPFSFile {
.dyn_into::<FileSystemWritableFileStream>()
.unwrap();

JsFuture::from(writer.seek_with_u32(self.pos as u32).unwrap())
JsFuture::from(writer.seek_with_u32(self.pos).unwrap())
.await
.unwrap();
self.write_stream = Some(writer);
}

let writer = self.write_stream.as_ref().unwrap();
let (s, remain) = Self::encode(buf.as_slice(), self.remain.take());

let len = s.len();
self.remain = remain;
JsFuture::from(
writer
.write_with_u8_array(s.into_bytes().as_slice())
.unwrap(),
)
.await
.unwrap();

let len = buf.bytes_init();
JsFuture::from(writer.write_with_u8_array(buf.as_slice()).unwrap())
.await
.unwrap();
self.pos += len as u32;
}
(Ok(()), buf)
Expand All @@ -73,12 +64,7 @@ impl Write for OPFSFile {
async fn close(&mut self) -> Result<(), Error> {
let writer = self.write_stream.take();
if let Some(writer) = writer {
if let Some(remain) = self.remain.take() {
JsFuture::from(writer.write_with_u8_array(&[remain]).unwrap())
.await
.unwrap();
}
JsFuture::from(writer.close()).await.unwrap();
JsFuture::from(writer.close()).await.map_err(wasm_err)?;
}
Ok(())
}
Expand All @@ -87,38 +73,30 @@ impl Write for OPFSFile {
impl Read for OPFSFile {
async fn read_exact_at<B: IoBufMut>(&mut self, mut buf: B, pos: u64) -> (Result<(), Error>, B) {
if let Some(file_handle) = self.file_handle.as_ref() {
let buf_len = buf.bytes_init();
let start = 2_i32.max(pos as i32) - 2;
let end = pos as i32 + 2 * buf_len as i32 + 2;
let buf_len = buf.bytes_init() as i32;
let buf_slice = buf.as_slice_mut();
let end = pos as i32 + buf_len;

let file = JsFuture::from(file_handle.get_file())
.await
.unwrap()
.dyn_into::<File>()
.map_err(wasm_err)
.unwrap();
let blob = file.slice_with_i32_and_i32(start, end).unwrap();
let text = JsFuture::from(blob.text())
.await
.unwrap()
.dyn_into::<JsString>()
let blob = file.slice_with_i32_and_i32(pos as i32, end).unwrap();
let reader = blob
.stream()
.get_reader()
.dyn_into::<ReadableStreamDefaultReader>()
.unwrap();

let data = Self::decode(&text);

let start = (pos - start as u64) as usize;
let end = start + buf_len;
if end >= data.len() {
return (
Err(Error::Io(std::io::Error::new(
std::io::ErrorKind::UnexpectedEof,
"Read end of file",
))),
buf,
);
while let Ok(v) = JsFuture::from(reader.read()).await {
let result = ReadableStreamReadResult::from(v);
if result.get_done().unwrap() {
break;
}
let chunk = result.get_value().dyn_into::<Uint8Array>().unwrap();
buf_slice.copy_from_slice(chunk.to_vec().as_slice());
}

let buf_slice = buf.as_slice_mut();
buf_slice.copy_from_slice(&data[start..end]);
}
(Ok(()), buf)
}
Expand All @@ -131,17 +109,20 @@ impl Read for OPFSFile {
.dyn_into::<File>()
.unwrap();

let start = 2_i32.max(pos as i32) - 2;
let blob = file.slice_with_i32(start).unwrap();
let text = JsFuture::from(blob.text())
.await
.unwrap()
.dyn_into::<JsString>()
let blob = file.slice_with_i32(pos as i32).unwrap();
let reader = blob
.stream()
.get_reader()
.dyn_into::<ReadableStreamDefaultReader>()
.unwrap();

let mut data = Self::decode(&text);
let start = (pos - start as u64) as usize;
buf.extend_from_slice(&mut data[start..]);
while let Ok(v) = JsFuture::from(reader.read()).await {
let result = ReadableStreamReadResult::from(v);
if result.get_done().unwrap() {
break;
}
let chunk = result.get_value().dyn_into::<Uint8Array>().unwrap();
buf.extend(chunk.to_vec());
}
}
(Ok(()), buf)
}
Expand All @@ -151,39 +132,6 @@ impl Read for OPFSFile {
}
}

impl OPFSFile {
fn encode(src: &[u8], mut remain: Option<u8>) -> (String, Option<u8>) {
let mut data = Vec::new();
let mut iter = src.iter();
let mut remain_data = None;
while let Some(num) = iter.next() {
let (low, high) = match remain.take() {
Some(low) => (low, Some(num)),
None => (*num, iter.next()),
};

if high.is_none() {
remain_data = Some(low);
break;
}
data.push(((*high.unwrap() as u16) << 8) | low as u16);
}
let s = String::from_utf16(&data).unwrap();
(s, remain_data)
}

fn decode(s: &JsString) -> Vec<u8> {
let mut data = Vec::with_capacity(s.length() as usize);
for n in s.iter() {
let low = n & 0xFF;
let high = n >> 8;
data.push(low as u8);
data.push(high as u8);
}
data
}
}

pub(crate) async fn storage() -> FileSystemDirectoryHandle {
let storage_promise = window().unwrap().navigator().storage().get_directory();
JsFuture::from(storage_promise)
Expand Down
42 changes: 9 additions & 33 deletions fusio/tests/opfs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -102,14 +102,17 @@ pub(crate) mod tests {
result.unwrap();
file.close().await.unwrap();

let (result, data) = file.read_to_end_at(vec![], 3).await;
result.unwrap();
assert_eq!(data, [3, 4, 11, 23, 34, 47, 121, 93, 94, 97]);
let expected = [1_u8, 2, 3, 4, 11, 23, 34, 47, 121, 93, 94, 97];
for i in 0..12 {
let (result, data) = file.read_to_end_at(vec![], i).await;
result.unwrap();
assert_eq!(data.as_slice(), &expected[i as usize..]);
}

let mut buf = [0; 6];
let (result, data) = file.read_exact_at(buf.as_mut(), 0).await;
let mut buf = [0; 7];
let (result, data) = file.read_exact_at(buf.as_mut(), 3).await;
result.unwrap();
assert_eq!(data, [1, 2, 3, 4, 11, 23]);
assert_eq!(data, [4, 11, 23, 34, 47, 121, 93]);
remove_all(&fs, &["file"]).await;
}

Expand Down Expand Up @@ -141,31 +144,4 @@ pub(crate) mod tests {

remove_all(&fs, &["file"]).await;
}

// #[wasm_bindgen_test]
// async fn test_opfs_write_padding() {
// let fs = OPFS;
// let mut file = fs
// .open_options(&"file".into(), OpenOptions::default().create(true))
// .await
// .unwrap();
// let (result, _) = file.write_all([1, 2, 3].as_mut()).await;
// result.unwrap();
// let (result, _) = file.write_all([11, 23, 34].as_mut()).await;
// result.unwrap();
// // file.close().await.unwrap();
// let (result, _) = file.write_all([121, 93, 94].as_mut()).await;
// result.unwrap();
// file.close().await.unwrap();
//
// let (result, data) = file.read_to_end_at(vec![], 0).await;
// result.unwrap();
// assert_eq!(data, [1, 2, 3, 11, 23, 34, 121, 93, 94]);
//
// let mut buf = [0; 4];
// let (result, data) = file.read_exact_at(buf.as_mut(), 0).await;
// result.unwrap();
// assert_eq!(data, [1, 2, 3, 11]);
// remove_all(&fs, &["file"]).await;
// }
}

0 comments on commit 9fbca47

Please sign in to comment.