Skip to content

Commit

Permalink
opfs for parquet
Browse files Browse the repository at this point in the history
  • Loading branch information
crwen committed Nov 6, 2024
1 parent 2276d64 commit 476c5dc
Show file tree
Hide file tree
Showing 10 changed files with 523 additions and 155 deletions.
3 changes: 3 additions & 0 deletions examples/opfs/index.js
Original file line number Diff line number Diff line change
@@ -1 +1,4 @@
import { write_to_opfs, read_from_opfs } from "./pkg";

// Run the two functions imported from "./pkg" strictly in sequence: the
// second call only starts once the promise returned by the first has settled.
// (Presumably the read consumes what the write produced — confirm in ./pkg.)
await write_to_opfs();
await read_from_opfs();
24 changes: 22 additions & 2 deletions fusio-parquet/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -6,13 +6,33 @@ name = "fusio-parquet"
repository.workspace = true
version = "0.2.1"

[features]
default = []
tokio = ["fusio/tokio"]
opfs = ["fusio/opfs"]

[dependencies]
bytes = { workspace = true }
fusio = { version = "0.3.0", path = "../fusio", features = ["bytes", "dyn"] }
cfg-if = "1.0.0"
fusio = { version = "0.3.0", path = "../fusio", features = [
"bytes",
"dyn",
] }
futures = { version = "0.3" }
parquet = { version = "53", features = ["arrow", "async"] }
parquet = { version = "53", default-features = false, features = [
"arrow",
"async",
] }

[target.'cfg(target_arch = "wasm32")'.dependencies]
wasm-bindgen-futures = { version = "0.4.45" }

[dev-dependencies]
arrow = "53"
rand = "0.8"
tempfile = "3.12.0"
tokio = { version = "1.40" }

[target.'cfg(target_arch = "wasm32")'.dev-dependencies]
wasm-bindgen = "0.2.95"
wasm-bindgen-test = "0.3.9"
207 changes: 152 additions & 55 deletions fusio-parquet/src/reader.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
use std::{cmp, ops::Range, sync::Arc};

use bytes::{Bytes, BytesMut};
#[allow(unused)]
use fusio::{dynamic::DynFile, Read};
use futures::{future::BoxFuture, FutureExt};
use parquet::{
Expand All @@ -21,6 +22,8 @@ pub struct AsyncReader {
prefetch_footer_size: usize,
}

// SAFETY: NOTE(review) — this asserts `Send` unconditionally; the compiler no
// longer checks it. Presumably it is needed so the futures returned by the
// `AsyncFileReader` impl can be boxed as `BoxFuture` (which requires `Send`)
// even when the wrapped file is not `Send`. That would only be sound if the
// reader never actually crosses threads — TODO confirm for every target this
// crate builds on.
unsafe impl Send for AsyncReader {}

fn set_prefetch_footer_size(footer_size: usize, content_size: u64) -> usize {
let footer_size = cmp::max(footer_size, FOOTER_SIZE);
cmp::min(footer_size as u64, content_size) as usize
Expand All @@ -43,72 +46,166 @@ impl AsyncReader {

impl AsyncFileReader for AsyncReader {
fn get_bytes(&mut self, range: Range<usize>) -> BoxFuture<'_, parquet::errors::Result<Bytes>> {
async move {
let len = range.end - range.start;
let mut buf = BytesMut::with_capacity(len);
buf.resize(len, 0);

let (result, buf) = self.inner.read_exact_at(buf, range.start as u64).await;
result.map_err(|err| ParquetError::External(Box::new(err)))?;
Ok(buf.freeze())
let len = range.end - range.start;
let mut buf = BytesMut::with_capacity(len);
buf.resize(len, 0);

cfg_if::cfg_if! {
if #[cfg(all(feature = "opfs", target_arch = "wasm32"))] {
let (sender, receiver) =
futures::channel::oneshot::channel::<Result<Bytes, ParquetError>>();
let opfs = unsafe {
std::mem::transmute::<&Box<dyn DynFile>, &Box<fusio::disk::OPFSFile>>(&self.inner)
};
let file_handle = opfs.file_handle().unwrap();

wasm_bindgen_futures::spawn_local(async move {

let (result, buf) = file_handle.read_exact_at(buf, range.start as u64).await;

let ret = match result {
Ok(_) => Ok(buf.freeze()),
Err(err) => Err(ParquetError::External(Box::new(err)))
};
let _ = sender.send(ret);
});

async move {
receiver.await.unwrap()
}
.boxed()
} else {
async move {
let (result, buf) = self.inner.read_exact_at(buf, range.start as u64).await;
result.map_err(|err| ParquetError::External(Box::new(err)))?;
Ok(buf.freeze())
}
.boxed()
}
}
.boxed()
}

fn get_metadata(&mut self) -> BoxFuture<'_, parquet::errors::Result<Arc<ParquetMetaData>>> {
async move {
if self.content_length == 0 {
return Err(ParquetError::EOF("file empty".to_string()));
}
if self.content_length == 0 {
return async { Err(ParquetError::EOF("file empty".to_string())) }.boxed();
}
let footer_size = self.prefetch_footer_size;
let content_length = self.content_length;

cfg_if::cfg_if! {
if #[cfg(all(feature = "opfs", target_arch = "wasm32"))] {
let mut buf = BytesMut::with_capacity(footer_size);
buf.resize(footer_size, 0);
let (sender, receiver) =
futures::channel::oneshot::channel::<Result<Arc<ParquetMetaData>, ParquetError>>();
let opfs = unsafe {
std::mem::transmute::<&Box<dyn DynFile>, &Box<fusio::disk::OPFSFile>>(&self.inner)
};
let file_handle = opfs.file_handle().unwrap();

let footer_size = self.prefetch_footer_size;
let mut buf = BytesMut::with_capacity(footer_size);
buf.resize(footer_size, 0);

let (result, prefetched_footer_content) = self
.inner
.read_exact_at(buf, self.content_length - footer_size as u64)
.await;
result.map_err(|err| ParquetError::External(Box::new(err)))?;
let prefetched_footer_slice = prefetched_footer_content.as_ref();
let prefetched_footer_length = prefetched_footer_slice.len();

// Decode the metadata length from the last 8 bytes of the file.
let metadata_length = {
let buf = &prefetched_footer_slice
[(prefetched_footer_length - FOOTER_SIZE)..prefetched_footer_length];
debug_assert!(buf.len() == FOOTER_SIZE);
ParquetMetaDataReader::decode_footer(buf.try_into().unwrap())?
};

// Try to read the metadata from the `prefetched_footer_content`.
// Otherwise, fetch exact metadata from the remote.
if prefetched_footer_length >= metadata_length + FOOTER_SIZE {
let buf = &prefetched_footer_slice[(prefetched_footer_length
- metadata_length
- FOOTER_SIZE)
..(prefetched_footer_length - FOOTER_SIZE)];
Ok(Arc::new(ParquetMetaDataReader::decode_metadata(buf)?))
wasm_bindgen_futures::spawn_local(async move {
let (result, prefetched_footer_content) = file_handle
.read_exact_at(buf, content_length - footer_size as u64)
.await;
if let Err(err) = result {
let _ = sender.send(Err(ParquetError::External(Box::new(err))));
return ;
}
let prefetched_footer_slice = prefetched_footer_content.as_ref();
let prefetched_footer_length = prefetched_footer_slice.len();

// Decode the metadata length from the last 8 bytes of the file.
let metadata_length = {
let buf = &prefetched_footer_slice
[(prefetched_footer_length - FOOTER_SIZE)..prefetched_footer_length];
debug_assert!(buf.len() == FOOTER_SIZE);
ParquetMetaDataReader::decode_footer(buf.try_into().unwrap()).unwrap()
};

// Try to read the metadata from the `prefetched_footer_content`.
// Otherwise, fetch exact metadata from the remote.
if prefetched_footer_length >= metadata_length + FOOTER_SIZE {
let buf = &prefetched_footer_slice[(prefetched_footer_length
- metadata_length
- FOOTER_SIZE)
..(prefetched_footer_length - FOOTER_SIZE)];


let _ = sender.send(ParquetMetaDataReader::decode_metadata(buf)
.map(|meta| Arc::new(meta)));
} else {
let mut buf = BytesMut::with_capacity(metadata_length);
buf.resize(metadata_length, 0);

let (result, bytes) = file_handle
.read_exact_at(
buf,
content_length - metadata_length as u64 - FOOTER_SIZE as u64,
)
.await;
if let Err(err) = result {
let _ = sender.send(Err(ParquetError::External(Box::new(err))));
return ;
}

let _ = sender.send(ParquetMetaDataReader::decode_metadata(&bytes)
.map(|meta| Arc::new(meta)));
}
});
async move {
receiver.await.unwrap()
}.boxed()
} else {
let mut buf = BytesMut::with_capacity(metadata_length);
buf.resize(metadata_length, 0);

let (result, bytes) = self
.inner
.read_exact_at(
buf,
self.content_length - metadata_length as u64 - FOOTER_SIZE as u64,
)
.await;
result.map_err(|err| ParquetError::External(Box::new(err)))?;

Ok(Arc::new(ParquetMetaDataReader::decode_metadata(&bytes)?))
async move {
let mut buf = BytesMut::with_capacity(footer_size);
buf.resize(footer_size, 0);
let (result, prefetched_footer_content) = self
.inner
.read_exact_at(buf, content_length - footer_size as u64)
.await;
result.map_err(|err| ParquetError::External(Box::new(err)))?;
let prefetched_footer_slice = prefetched_footer_content.as_ref();
let prefetched_footer_length = prefetched_footer_slice.len();

// Decode the metadata length from the last 8 bytes of the file.
let metadata_length = {
let buf = &prefetched_footer_slice
[(prefetched_footer_length - FOOTER_SIZE)..prefetched_footer_length];
debug_assert!(buf.len() == FOOTER_SIZE);
ParquetMetaDataReader::decode_footer(buf.try_into().unwrap())?
};

// Try to read the metadata from the `prefetched_footer_content`.
// Otherwise, fetch exact metadata from the remote.
if prefetched_footer_length >= metadata_length + FOOTER_SIZE {
let buf = &prefetched_footer_slice[(prefetched_footer_length
- metadata_length
- FOOTER_SIZE)
..(prefetched_footer_length - FOOTER_SIZE)];
Ok(Arc::new(ParquetMetaDataReader::decode_metadata(buf)?))
} else {
let mut buf = BytesMut::with_capacity(metadata_length);
buf.resize(metadata_length, 0);

let (result, bytes) = self
.inner
.read_exact_at(
buf,
content_length - metadata_length as u64 - FOOTER_SIZE as u64,
)
.await;
result.map_err(|err| ParquetError::External(Box::new(err)))?;

Ok(Arc::new(ParquetMetaDataReader::decode_metadata(&bytes)?))
}
}
.boxed()
}
}
.boxed()
}
}

#[cfg(feature = "tokio")]
#[cfg(test)]
mod tests {
use std::{
Expand Down
78 changes: 63 additions & 15 deletions fusio-parquet/src/writer.rs
Original file line number Diff line number Diff line change
@@ -1,45 +1,93 @@
use bytes::Bytes;
#[allow(unused)]
use fusio::{dynamic::DynFile, Write};
use futures::future::BoxFuture;
use parquet::{arrow::async_writer::AsyncFileWriter, errors::ParquetError};

/// Adapter that lets a boxed `DynFile` be used as a parquet `AsyncFileWriter`.
pub struct AsyncWriter {
/// The wrapped file. `new` stores it as `Some`; the non-OPFS `complete`
/// takes it out before closing it.
inner: Option<Box<dyn DynFile>>,
/// Offset passed to the next OPFS `write_at`; starts at 0 and grows by the
/// length of every chunk handed to `write`.
// NOTE(review): the field is gated on `feature = "opfs"` alone, while the code
// that uses it is additionally gated on `target_arch = "wasm32"`.
#[cfg(feature = "opfs")]
pos: u64,
}

// SAFETY: NOTE(review) — this asserts `Send` unconditionally; the compiler no
// longer checks it. Presumably it exists so the futures returned by the
// `AsyncFileWriter` impl can be boxed as `BoxFuture` (which requires `Send`)
// even when the wrapped file is not `Send`. That would only be sound if the
// writer never actually crosses threads — TODO confirm for every target this
// crate builds on.
unsafe impl Send for AsyncWriter {}
impl AsyncWriter {
pub fn new(writer: Box<dyn DynFile>) -> Self {
Self {
inner: Some(writer),
#[cfg(feature = "opfs")]
pos: 0,
}
}
}

impl AsyncFileWriter for AsyncWriter {
fn write(&mut self, bs: Bytes) -> BoxFuture<'_, parquet::errors::Result<()>> {
Box::pin(async move {
if let Some(writer) = self.inner.as_mut() {
let (result, _) = writer.write_all(bs).await;
result.map_err(|err| ParquetError::External(Box::new(err)))?;
cfg_if::cfg_if! {
if #[cfg(all(feature = "opfs", target_arch = "wasm32"))] {
match self.inner.as_mut() {
Some(writer) => {
let pos = self.pos;
self.pos += bs.len() as u64;
let (sender, receiver) =
futures::channel::oneshot::channel::<Result<(), ParquetError>>();
let opfs = unsafe {
std::mem::transmute::<&Box<dyn DynFile>, &Box<fusio::disk::OPFSFile>>(writer)
};
let handle = opfs.file_handle().unwrap();

wasm_bindgen_futures::spawn_local(async move {
let (result, _) = handle.write_at(bs, pos).await;
let _ = sender.send(result
.map_err(|err| ParquetError::External(Box::new(err))));
});

Box::pin(async move {
receiver.await.unwrap()?;
Ok(())
})

},
None => Box::pin(async move {
Ok(())
})

}
} else {
Box::pin(async move {
if let Some(writer) = self.inner.as_mut() {
let (result, _) = writer.write_all(bs).await;
result.map_err(|err| ParquetError::External(Box::new(err)))?;
}
Ok(())
})
}
Ok(())
})
}
}

fn complete(&mut self) -> BoxFuture<'_, parquet::errors::Result<()>> {
Box::pin(async move {
if let Some(mut writer) = self.inner.take() {
writer
.close()
.await
.map_err(|err| ParquetError::External(Box::new(err)))?;
cfg_if::cfg_if! {
if #[cfg(all(feature = "opfs", target_arch = "wasm32"))] {
Box::pin(async move {
Ok(())
})
} else {
Box::pin(async move {
if let Some(mut writer) = self.inner.take() {
writer
.close()
.await
.map_err(|err| ParquetError::External(Box::new(err)))?;
}

Ok(())
})
}

Ok(())
})
}
}
}

#[cfg(feature = "tokio")]
#[cfg(test)]
mod tests {
use std::{
Expand Down
Loading

0 comments on commit 476c5dc

Please sign in to comment.