Commit

check
co42 committed Dec 26, 2024
1 parent d5c48ec commit 95a2e3e
Showing 1 changed file with 26 additions and 23 deletions.
49 changes: 26 additions & 23 deletions cas_object/src/cas_chunk_format/deserialize_async.rs
@@ -116,43 +116,46 @@ pub fn deserialize_chunk_to_writer_stream<R: Read>(reader: &mut R) -> Result<Byt
 pub fn deserialize_chunks_to_writer_from_async_read_stream<R: AsyncRead + Unpin + Send + 'static>(
     reader: R,
     writer: Sender<Result<Bytes, CasObjectError>>,
-    start_offset: Option<u64>,
-    end_offset: Option<u64>,
+    _start_offset: Option<u64>,
+    _end_offset: Option<u64>,
 ) -> JoinHandle<()> {
     let mut reader = SyncIoBridge::new(reader);
-    let mut start_offset = start_offset.unwrap_or(0);
-    let mut end_offset = end_offset.unwrap_or(std::u64::MAX);
+    // let mut start_offset = start_offset.unwrap_or(0);
+    // let mut end_offset = end_offset.unwrap_or(std::u64::MAX);
 
     // The deserialization of the chunks is done synchronously and thus needs to happen on a blocking thread
     // Moreover we expect to return from this function right away to be able to read the other end of the stream
     spawn_blocking(move || {
-        let mut uncompressed_len = 0;
+        // let mut uncompressed_len = 0;
         loop {
             match deserialize_chunk_to_writer_stream(&mut reader) {
                 Ok(uncompressed_bytes) => {
-                    let uncompressed_bytes = if start_offset >= uncompressed_bytes.len() as u64 {
-                        // Skip this chunk, it's entirely before the start offset
-                        start_offset -= uncompressed_bytes.len() as u64;
-                        continue;
-                    } else if start_offset > 0 {
-                        // Skip the part of the chunk before the start offset
-                        let offset = start_offset as usize;
-                        start_offset = 0;
-                        uncompressed_bytes.slice(offset..)
-                    } else if end_offset < uncompressed_len + uncompressed_bytes.len() as u64 {
-                        // Skip the part of the chunk after the end offset
-                        let offset = (end_offset - uncompressed_len) as usize;
-                        end_offset = 0;
-                        uncompressed_bytes.slice(..offset)
-                    } else {
-                        uncompressed_bytes
-                    };
+                    // let uncompressed_bytes = if start_offset >= uncompressed_bytes.len() as u64 {
+                    //     // Skip this chunk, it's entirely before the start offset
+                    //     start_offset -= uncompressed_bytes.len() as u64;
+                    //     continue;
+                    // } else if start_offset > 0 {
+                    //     // Skip the part of the chunk before the start offset
+                    //     let offset = start_offset as usize;
+                    //     start_offset = 0;
+                    //     uncompressed_bytes.slice(offset..)
+                    // } else if end_offset < uncompressed_len + uncompressed_bytes.len() as u64 {
+                    //     // Skip the part of the chunk after the end offset
+                    //     let offset = (end_offset - uncompressed_len) as usize;
+                    //     end_offset = 0;
+                    //     uncompressed_bytes.slice(..offset)
+                    // } else {
+                    //     uncompressed_bytes
+                    // };
 
-                    uncompressed_len += uncompressed_bytes.len() as u64;
+                    // uncompressed_len += uncompressed_bytes.len() as u64;
                     if writer.blocking_send(Ok(uncompressed_bytes)).is_err() {
                         // Other end of the channel is closed, we can return
                         break;
                     }
+                    // if end_offset == 0 {
+                    //     break;
+                    // }
                 },
                 Err(CasObjectError::InternalIOError(e)) => {
                     if e.kind() == std::io::ErrorKind::UnexpectedEof {
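Usage note (not part of the commit): the comments in the diff explain that deserialization runs on a blocking thread while the caller drains the channel, and that the function returns immediately. A minimal caller sketch under that reading follows; it assumes a tokio mpsc channel matching the Sender/blocking_send usage above, and the helper name read_all_chunks and the buffer size 8 are illustrative assumptions, not code from this repository.

// Hypothetical caller sketch; assumes tokio mpsc, bytes::Bytes, and this
// crate's CasObjectError. Not part of the commit.
use bytes::Bytes;
use tokio::io::AsyncRead;
use tokio::sync::mpsc;

async fn read_all_chunks<R: AsyncRead + Unpin + Send + 'static>(reader: R) -> Result<Vec<u8>, CasObjectError> {
    // Bounded channel: the blocking producer waits in blocking_send() when the consumer lags.
    let (tx, mut rx) = mpsc::channel::<Result<Bytes, CasObjectError>>(8);

    // Returns immediately; the deserialization loop runs on a blocking thread.
    let handle = deserialize_chunks_to_writer_from_async_read_stream(reader, tx, None, None);

    // Drain decompressed chunks from the other end of the channel.
    let mut out = Vec::new();
    while let Some(chunk) = rx.recv().await {
        out.extend_from_slice(&chunk?);
    }

    // The blocking task exits once the stream ends or the receiver is dropped.
    let _ = handle.await;
    Ok(out)
}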
