Skip to content

Commit

Permalink
Stream chunks without holding them in memory when downloading via the …
Browse files Browse the repository at this point in the history
…bridge
  • Loading branch information
co42 committed Dec 26, 2024
1 parent 286e663 commit c44855b
Show file tree
Hide file tree
Showing 2 changed files with 97 additions and 6 deletions.
5 changes: 2 additions & 3 deletions cas_object/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,5 @@ bytes = "1.7.2"
rand = "0.8.5"
blake3 = "1.5.4"
futures = { version = "0.3.31" }
tokio-util = {version = "0.7.12", features = ["io"]}
tokio = {version = "1.41.1" }

tokio-util = { version = "0.7.12", features = ["io", "io-util"] }
tokio = { version = "1.41.1" }
98 changes: 95 additions & 3 deletions cas_object/src/cas_chunk_format/deserialize_async.rs
Original file line number Diff line number Diff line change
@@ -1,12 +1,14 @@
use std::io::Write;
use std::io::{Read, Write};
use std::mem::size_of;
use std::slice;

use anyhow::anyhow;
use bytes::Buf;
use bytes::{Buf, Bytes};
use futures::Stream;
use tokio::io::{AsyncRead, AsyncReadExt};
use tokio_util::io::StreamReader;
use tokio::sync::mpsc::Sender;
use tokio::task::{spawn_blocking, JoinHandle};
use tokio_util::io::{StreamReader, SyncIoBridge};

use crate::error::CasObjectError;
use crate::{CASChunkHeader, CAS_CHUNK_HEADER_LENGTH};
Expand Down Expand Up @@ -81,6 +83,96 @@ pub async fn deserialize_chunks_from_async_read<R: AsyncRead + Unpin>(
Ok((buf, chunk_byte_indices))
}

/// Reads one `CASChunkHeader` from `reader` and validates it.
///
/// The header bytes are read straight into the struct's memory, so this
/// assumes `CASChunkHeader` is plain-old-data with a stable byte layout
/// (presumably `#[repr(C)]`/`#[repr(packed)]` — TODO confirm at the type
/// definition).
///
/// # Errors
/// Fails if the reader cannot supply `size_of::<CASChunkHeader>()` bytes
/// (`read_exact` error, converted via `?`) or if `validate()` rejects the
/// header contents.
pub fn deserialize_chunk_header_stream<R: Read>(reader: &mut R) -> Result<CASChunkHeader, CasObjectError> {
let mut result = CASChunkHeader::default();
// SAFETY: `buf` aliases exactly the `size_of::<CASChunkHeader>()` bytes of
// `result`, and the raw borrow is not used elsewhere while `buf` is alive.
// Soundness additionally requires that every bit pattern written by
// `read_exact` is a valid `CASChunkHeader` — NOTE(review): verify the type
// is a `repr(C)` POD with no niches/padding invariants.
unsafe {
let buf = slice::from_raw_parts_mut(&mut result as *mut _ as *mut u8, size_of::<CASChunkHeader>());
reader.read_exact(buf)?;
}
result.validate()?;
Ok(result)
}

/// Reads one chunk (header followed by compressed payload) from `reader`,
/// decompresses it, and returns the uncompressed bytes.
///
/// # Errors
/// Fails if the header or payload cannot be read, if decompression fails, or
/// if the decompressed size does not match the size advertised in the header.
pub fn deserialize_chunk_to_writer_stream<R: Read>(reader: &mut R) -> Result<Bytes, CasObjectError> {
let header = deserialize_chunk_header_stream(reader)?;

// The compressed payload length comes straight from the validated header.
let mut compressed = vec![0u8; header.get_compressed_length() as usize];
reader.read_exact(&mut compressed)?;

// Decompress into a buffer sized from the header's claimed output length.
let expected_len = header.get_uncompressed_length();
let mut decompressed = vec![0u8; expected_len as usize];
let written = super::decompress_chunk_to_writer(header, &mut compressed, &mut decompressed)?;

// Cross-check: the decompressor must produce exactly the advertised size.
if written != expected_len {
return Err(CasObjectError::FormatError(anyhow!(
"chunk is corrupted, uncompressed bytes len doesn't agree with chunk header"
)));
}

Ok(Bytes::from(decompressed))
}

/// Deserializes chunks one by one from `reader` on a blocking thread and
/// sends each decompressed chunk — trimmed to the requested byte range —
/// over `writer`, one message per chunk.
///
/// `start_offset` (inclusive) and `end_offset` (exclusive) are byte offsets
/// into the full uncompressed stream; `None` means "from the beginning" and
/// "to the end" respectively. NOTE(review): absolute-offset semantics assumed
/// from the start-offset handling — confirm against the bridge caller.
///
/// Returns the `JoinHandle` of the spawned blocking task. The task reports
/// failures by sending an `Err` over `writer` before terminating; a closed
/// channel simply stops the task.
pub fn deserialize_chunks_to_writer_from_async_read_stream<R: AsyncRead + Unpin + Send + 'static>(
    reader: R,
    writer: Sender<Result<Bytes, CasObjectError>>,
    start_offset: Option<u64>,
    end_offset: Option<u64>,
) -> JoinHandle<()> {
    let mut reader = SyncIoBridge::new(reader);
    let start_offset = start_offset.unwrap_or(0);
    let end_offset = end_offset.unwrap_or(u64::MAX);

    // The deserialization of the chunks is done synchronously and thus needs
    // to happen on a blocking thread. We return right away so the caller can
    // start draining the other end of the channel.
    spawn_blocking(move || {
        // Absolute position (in uncompressed bytes) of the next chunk's first byte.
        let mut pos: u64 = 0;
        loop {
            match deserialize_chunk_to_writer_stream(&mut reader) {
                Ok(chunk) => {
                    let chunk_start = pos;
                    let chunk_end = pos + chunk.len() as u64;
                    pos = chunk_end;

                    // Chunk lies entirely before the requested range: skip it.
                    if chunk_end <= start_offset {
                        continue;
                    }
                    // Chunk lies entirely at/after the requested range: done.
                    if chunk_start >= end_offset {
                        break;
                    }

                    // Intersect the chunk with [start_offset, end_offset).
                    // Fixes the previous `else if` chain, which skipped the
                    // end truncation whenever the range both started and
                    // ended inside the same chunk, and which sent a spurious
                    // empty message when the range ended on a chunk boundary.
                    let lo = start_offset.saturating_sub(chunk_start) as usize;
                    let hi = (end_offset.min(chunk_end) - chunk_start) as usize;

                    if writer.blocking_send(Ok(chunk.slice(lo..hi))).is_err() {
                        // Other end of the channel is closed, we can return.
                        break;
                    }
                    if chunk_end >= end_offset {
                        break;
                    }
                },
                Err(CasObjectError::InternalIOError(e)) => {
                    // EOF is the normal end-of-stream condition, not an error.
                    if e.kind() != std::io::ErrorKind::UnexpectedEof {
                        let _ = writer.blocking_send(Err(CasObjectError::InternalIOError(e)));
                    }
                    break;
                },
                Err(e) => {
                    let _ = writer.blocking_send(Err(e));
                    break;
                },
            }
        }
    })
}

pub async fn deserialize_chunks_to_writer_from_stream<B, E, S, W>(
stream: S,
writer: &mut W,
Expand Down

0 comments on commit c44855b

Please sign in to comment.