From d5c48ec59207e2b21e286078f56ce95d195dee3c Mon Sep 17 00:00:00 2001
From: Corentin REGAL
Date: Thu, 19 Dec 2024 14:34:06 +0100
Subject: [PATCH] Stream chunks without holding them in memory when
 downloading via the bridge

---
 cas_object/Cargo.toml                         |  5 +-
 .../src/cas_chunk_format/deserialize_async.rs | 97 ++++++++++++++++++-
 2 files changed, 96 insertions(+), 6 deletions(-)

diff --git a/cas_object/Cargo.toml b/cas_object/Cargo.toml
index 5178514a..e46b8453 100644
--- a/cas_object/Cargo.toml
+++ b/cas_object/Cargo.toml
@@ -15,6 +15,5 @@ bytes = "1.7.2"
 rand = "0.8.5"
 blake3 = "1.5.4"
 futures = { version = "0.3.31" }
-tokio-util = {version = "0.7.12", features = ["io"]}
-tokio = {version = "1.41.1" }
-
+tokio-util = { version = "0.7.12", features = ["io", "io-util"] }
+tokio = { version = "1.41.1" }
diff --git a/cas_object/src/cas_chunk_format/deserialize_async.rs b/cas_object/src/cas_chunk_format/deserialize_async.rs
index 6f708672..dc4c2927 100644
--- a/cas_object/src/cas_chunk_format/deserialize_async.rs
+++ b/cas_object/src/cas_chunk_format/deserialize_async.rs
@@ -1,12 +1,14 @@
-use std::io::Write;
+use std::io::{Read, Write};
 use std::mem::size_of;
 use std::slice;
 
 use anyhow::anyhow;
-use bytes::Buf;
+use bytes::{Buf, Bytes};
 use futures::Stream;
 use tokio::io::{AsyncRead, AsyncReadExt};
-use tokio_util::io::StreamReader;
+use tokio::sync::mpsc::Sender;
+use tokio::task::{spawn_blocking, JoinHandle};
+use tokio_util::io::{StreamReader, SyncIoBridge};
 
 use crate::error::CasObjectError;
 use crate::{CASChunkHeader, CAS_CHUNK_HEADER_LENGTH};
@@ -81,6 +83,95 @@ pub async fn deserialize_chunks_from_async_read<R: AsyncRead + Unpin>(
     Ok((buf, chunk_byte_indices))
 }
 
+pub fn deserialize_chunk_header_stream<R: Read>(reader: &mut R) -> Result<CASChunkHeader, CasObjectError> {
+    let mut result = CASChunkHeader::default();
+    unsafe {
+        let buf = slice::from_raw_parts_mut(&mut result as *mut _ as *mut u8, size_of::<CASChunkHeader>());
+        reader.read_exact(buf)?;
+    }
+    result.validate()?;
+    Ok(result)
+}
+
+/// Deserializes a single chunk from the reader, returning its uncompressed bytes
+pub fn deserialize_chunk_to_writer_stream<R: Read>(reader: &mut R) -> Result<Bytes, CasObjectError> {
+    let header = deserialize_chunk_header_stream(reader)?;
+    let mut compressed_buf = vec![0u8; header.get_compressed_length() as usize];
+    reader.read_exact(&mut compressed_buf)?;
+    let mut uncompressed_buf = Vec::with_capacity(header.get_uncompressed_length() as usize);
+
+    let uncompressed_len = super::decompress_chunk_to_writer(header, &mut compressed_buf, &mut uncompressed_buf)?;
+
+    if uncompressed_len != header.get_uncompressed_length() {
+        return Err(CasObjectError::FormatError(anyhow!(
+            "chunk is corrupted, uncompressed bytes len doesn't agree with chunk header"
+        )));
+    }
+
+    Ok(Bytes::from(uncompressed_buf))
+}
+
+/// Deserialize chunks one by one and send them to the channel; one message contains one full chunk,
+/// trimmed to the requested [start_offset, end_offset) byte range. Returns the spawned task's handle.
+pub fn deserialize_chunks_to_writer_from_async_read_stream<R: AsyncRead + Unpin + Send + 'static>(
+    reader: R,
+    writer: Sender<Result<Bytes, CasObjectError>>,
+    start_offset: Option<u64>,
+    end_offset: Option<u64>,
+) -> JoinHandle<()> {
+    let mut reader = SyncIoBridge::new(reader);
+    let mut start_offset = start_offset.unwrap_or(0);
+    let end_offset = end_offset.unwrap_or(u64::MAX);
+
+    // The deserialization of the chunks is done synchronously and thus needs to happen on a blocking thread.
+    // Moreover, we expect to return from this function right away to be able to read the other end of the stream.
+    spawn_blocking(move || {
+        let mut uncompressed_len = 0;
+        loop {
+            match deserialize_chunk_to_writer_stream(&mut reader) {
+                Ok(uncompressed_bytes) => {
+                    let mut last_chunk = false;
+                    let uncompressed_bytes = if start_offset >= uncompressed_bytes.len() as u64 {
+                        // Skip this chunk, it's entirely before the start offset
+                        start_offset -= uncompressed_bytes.len() as u64;
+                        continue;
+                    } else if start_offset > 0 {
+                        // Skip the part of the chunk before the start offset
+                        let offset = start_offset as usize;
+                        start_offset = 0;
+                        uncompressed_bytes.slice(offset..)
+                    } else if end_offset < uncompressed_len + uncompressed_bytes.len() as u64 {
+                        // Skip the part of the chunk after the end offset; this is the last chunk to send
+                        let offset = (end_offset - uncompressed_len) as usize;
+                        last_chunk = true;
+                        uncompressed_bytes.slice(..offset)
+                    } else {
+                        uncompressed_bytes
+                    };
+
+                    uncompressed_len += uncompressed_bytes.len() as u64;
+                    if writer.blocking_send(Ok(uncompressed_bytes)).is_err() || last_chunk {
+                        // Other end of the channel is closed or the requested range is fully sent, we can return
+                        break;
+                    }
+                },
+                Err(CasObjectError::InternalIOError(e)) => {
+                    // A clean EOF means all chunks have been read
+                    if e.kind() == std::io::ErrorKind::UnexpectedEof {
+                        break;
+                    }
+                    let _ = writer.blocking_send(Err(CasObjectError::InternalIOError(e)));
+                    break;
+                },
+                Err(e) => {
+                    let _ = writer.blocking_send(Err(e));
+                    break;
+                },
+            }
+        }
+    })
+}
+
 pub async fn deserialize_chunks_to_writer_from_stream<S, W>(
     stream: S,
     writer: &mut W,
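
A note on consuming the new API (a sketch, not part of the patch): the spawned
blocking task decompresses chunks one at a time and hands them over an mpsc
channel, so with a bounded channel backpressure keeps only a few decompressed
chunks in memory at once. A minimal consumer could look like the following;
the channel capacity, the stream_range helper, and the crate-root re-export of
the function are assumptions for illustration, not part of this diff:

    use bytes::Bytes;
    use tokio::io::AsyncRead;
    use tokio::sync::mpsc;

    // Assumed re-exports; the patch defines these inside the cas_object crate.
    use cas_object::deserialize_chunks_to_writer_from_async_read_stream;
    use cas_object::error::CasObjectError;

    // Hypothetical helper: collect the [start, end) byte range of a serialized
    // CAS object. A real caller would forward each chunk (e.g. into an HTTP
    // response body) instead of collecting into a Vec.
    async fn stream_range<R>(reader: R, start: u64, end: u64) -> Result<Vec<u8>, CasObjectError>
    where
        R: AsyncRead + Unpin + Send + 'static,
    {
        // Bounded channel: the producer blocks in blocking_send once the
        // consumer falls behind, capping memory usage.
        let (tx, mut rx) = mpsc::channel::<Result<Bytes, CasObjectError>>(4);
        let handle = deserialize_chunks_to_writer_from_async_read_stream(reader, tx, Some(start), Some(end));

        let mut out = Vec::new();
        while let Some(chunk) = rx.recv().await {
            // Each message is one full uncompressed chunk, already trimmed
            // to the requested range.
            out.extend_from_slice(&chunk?);
        }
        handle.await.expect("deserialization task panicked");
        Ok(out)
    }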