Skip to content

Commit

Permalink
refactor: wrap parquet reader/writer with Arc and Mutex
Browse files Browse the repository at this point in the history
  • Loading branch information
crwen committed Dec 6, 2024
1 parent 7721cfa commit f173695
Show file tree
Hide file tree
Showing 2 changed files with 64 additions and 40 deletions.
47 changes: 26 additions & 21 deletions fusio-parquet/src/reader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,9 @@ use parquet::{
const PREFETCH_FOOTER_SIZE: usize = 512 * 1024;

pub struct AsyncReader {
#[cfg(feature = "opfs")]
inner: Arc<futures::lock::Mutex<Box<dyn DynFile>>>,
#[cfg(not(feature = "opfs"))]
inner: Box<dyn DynFile>,
content_length: u64,
// The prefetch size for fetching file footer.
Expand All @@ -31,6 +34,9 @@ fn set_prefetch_footer_size(footer_size: usize, content_size: u64) -> usize {

impl AsyncReader {
pub async fn new(reader: Box<dyn DynFile>, content_length: u64) -> Result<Self, fusio::Error> {
#[cfg(feature = "opfs")]
#[allow(clippy::arc_with_non_send_sync)]
let reader = Arc::new(futures::lock::Mutex::new(reader));
Ok(Self {
inner: reader,
content_length,
Expand All @@ -52,22 +58,20 @@ impl AsyncFileReader for AsyncReader {

cfg_if::cfg_if! {
if #[cfg(all(feature = "opfs", target_arch = "wasm32"))] {
let (sender, receiver) =
futures::channel::oneshot::channel::<Result<Bytes, ParquetError>>();
let opfs = unsafe {
std::mem::transmute::<&Box<dyn DynFile>, &Box<fusio::disk::OPFSFile>>(&self.inner)
};
let file_handle = opfs.file_handle().unwrap();
let (sender, receiver) = futures::channel::oneshot::channel::<Result<Bytes, ParquetError>>();

let reader = self.inner.clone();
wasm_bindgen_futures::spawn_local(async move {

let (result, buf) = file_handle.read_exact_at(buf, range.start as u64).await;

let ret = match result {
Ok(_) => Ok(buf.freeze()),
Err(err) => Err(ParquetError::External(Box::new(err)))
let result = {
let mut guard = reader.lock().await;
let (result, buf) = guard.read_exact_at(buf, range.start as u64).await;
match result {
Ok(_) => Ok(buf.freeze()),
Err(err) => Err(ParquetError::External(Box::new(err)))
}
};
let _ = sender.send(ret);

let _ = sender.send(result);
});

async move {
Expand Down Expand Up @@ -96,15 +100,12 @@ impl AsyncFileReader for AsyncReader {
if #[cfg(all(feature = "opfs", target_arch = "wasm32"))] {
let mut buf = BytesMut::with_capacity(footer_size);
buf.resize(footer_size, 0);
let (sender, receiver) =
futures::channel::oneshot::channel::<Result<Arc<ParquetMetaData>, ParquetError>>();
let opfs = unsafe {
std::mem::transmute::<&Box<dyn DynFile>, &Box<fusio::disk::OPFSFile>>(&self.inner)
};
let file_handle = opfs.file_handle().unwrap();
let (sender, receiver) = futures::channel::oneshot::channel::<Result<Arc<ParquetMetaData>, ParquetError>>();

let reader = self.inner.clone();
wasm_bindgen_futures::spawn_local(async move {
let (result, prefetched_footer_content) = file_handle
let mut guard = reader.lock().await;
let (result, prefetched_footer_content) = guard
.read_exact_at(buf, content_length - footer_size as u64)
.await;
if let Err(err) = result {
Expand All @@ -130,19 +131,23 @@ impl AsyncFileReader for AsyncReader {
- FOOTER_SIZE)
..(prefetched_footer_length - FOOTER_SIZE)];

drop(guard);

let _ = sender.send(ParquetMetaDataReader::decode_metadata(buf)
.map(|meta| Arc::new(meta)));
} else {
let mut buf = BytesMut::with_capacity(metadata_length);
buf.resize(metadata_length, 0);

let (result, bytes) = file_handle
let (result, bytes) = guard
.read_exact_at(
buf,
content_length - metadata_length as u64 - FOOTER_SIZE as u64,
)
.await;

drop(guard);

if let Err(err) = result {
let _ = sender.send(Err(ParquetError::External(Box::new(err))));
return ;
Expand Down
57 changes: 38 additions & 19 deletions fusio-parquet/src/writer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,18 +5,23 @@ use futures::future::BoxFuture;
use parquet::{arrow::async_writer::AsyncFileWriter, errors::ParquetError};

pub struct AsyncWriter {
inner: Option<Box<dyn DynFile>>,
#[cfg(feature = "opfs")]
pos: u64,
#[allow(clippy::arc_with_non_send_sync)]
inner: Option<std::sync::Arc<futures::lock::Mutex<Box<dyn DynFile>>>>,
#[cfg(not(feature = "opfs"))]
inner: Option<Box<dyn DynFile>>,
}

unsafe impl Send for AsyncWriter {}
impl AsyncWriter {
pub fn new(writer: Box<dyn DynFile>) -> Self {
Self {
inner: Some(writer),
#[cfg(feature = "opfs")]
pos: 0,
#[cfg(feature = "opfs")]
#[allow(clippy::arc_with_non_send_sync)]
let writer = std::sync::Arc::new(futures::lock::Mutex::new(writer));
{
Self {
inner: Some(writer),
}
}
}
}
Expand All @@ -27,17 +32,15 @@ impl AsyncFileWriter for AsyncWriter {
if #[cfg(all(feature = "opfs", target_arch = "wasm32"))] {
match self.inner.as_mut() {
Some(writer) => {
let pos = self.pos;
self.pos += bs.len() as u64;
let (sender, receiver) =
futures::channel::oneshot::channel::<Result<(), ParquetError>>();
let opfs = unsafe {
std::mem::transmute::<&Box<dyn DynFile>, &Box<fusio::disk::OPFSFile>>(writer)
};
let handle = opfs.file_handle().unwrap();

let (sender, receiver) = futures::channel::oneshot::channel::<Result<(), ParquetError>>();
let writer = writer.clone();
wasm_bindgen_futures::spawn_local(async move {
let (result, _) = handle.write_at(bs, pos).await;
let result = {
let mut guard = writer.lock().await;
let (result, _) = guard.write_all(bs).await;
result
};

let _ = sender.send(result
.map_err(|err| ParquetError::External(Box::new(err))));
});
Expand Down Expand Up @@ -68,9 +71,25 @@ impl AsyncFileWriter for AsyncWriter {
fn complete(&mut self) -> BoxFuture<'_, parquet::errors::Result<()>> {
cfg_if::cfg_if! {
if #[cfg(all(feature = "opfs", target_arch = "wasm32"))] {
Box::pin(async move {
Ok(())
})
match self.inner.take() {
Some(writer) => {
let (sender, receiver) = futures::channel::oneshot::channel::<Result<(), ParquetError>>();
wasm_bindgen_futures::spawn_local(async move {
let result = {
let mut guard = writer.lock().await;
guard.close().await
};
let _ = sender.send(result
.map_err(|err| ParquetError::External(Box::new(err))));
});
Box::pin(async move {
receiver.await.unwrap()
})
}
None => Box::pin(async move {
Ok(())
})
}
} else {
Box::pin(async move {
if let Some(mut writer) = self.inner.take() {
Expand Down

0 comments on commit f173695

Please sign in to comment.