From b974981fcb40f2ac4c17d9de2320be699d3b062d Mon Sep 17 00:00:00 2001
From: Hoyt Koepke <56061+hoytak@users.noreply.github.com>
Date: Fri, 31 Jan 2025 12:33:53 -1000
Subject: [PATCH] Avoid uploading empty xorbs. (#161)

Avoids uploading empty xorbs.
---
 data/src/data_processing.rs        |  5 +-
 data/src/parallel_xorb_uploader.rs | 90 ++++++++++++++++--------------
 2 files changed, 51 insertions(+), 44 deletions(-)

diff --git a/data/src/data_processing.rs b/data/src/data_processing.rs
index 0999c58..d0b2585 100644
--- a/data/src/data_processing.rs
+++ b/data/src/data_processing.rs
@@ -194,9 +194,8 @@ impl PointerFileTranslator {
         let new_cas_data = take(cas_data_accumulator.deref_mut());
         drop(cas_data_accumulator); // Release the lock.
 
-        // Note: Only upload if there is new data; the file stuff here isn't really
-        //
-        if !new_cas_data.data.is_empty() {
+        // Upload if there is new data or info
+        if !new_cas_data.is_empty() {
             self.xorb_uploader.register_new_cas_block(new_cas_data).await?;
         }
 
diff --git a/data/src/parallel_xorb_uploader.rs b/data/src/parallel_xorb_uploader.rs
index fe76508..80623d5 100644
--- a/data/src/parallel_xorb_uploader.rs
+++ b/data/src/parallel_xorb_uploader.rs
@@ -89,51 +89,59 @@ impl XorbUpload for ParallelXorbUploader {
         // Only upload a new xorb if there is new data; it may be that an existing new file is formed only
         // from existing chunks.
-        let xorb_data_len = cas_data.data.len();
-
-        let cas_hash = cas_node_hash(&cas_data.chunks[..]);
-
-        // Rate limiting, the acquired permit is dropped after the task completes.
-        // The chosen Semaphore is fair, meaning xorbs added first will be scheduled to upload first.
-        let permit = self
-            .rate_limiter
-            .clone()
-            .acquire_owned()
-            .await
-            .map_err(|e| UploadTaskError(e.to_string()))?;
-
-        let item = (cas_hash, cas_data.data, cas_data.chunks);
-        let shard_manager = self.shard_manager.clone();
-        let cas = self.cas.clone();
-        let cas_prefix = self.cas_prefix.clone();
-
-        let mut upload_tasks = self.upload_tasks.lock().await;
-        let upload_progress_updater = self.upload_progress_updater.clone();
-        upload_tasks.spawn_on(
-            async move {
-                let ret = upload_and_register_xorb(item, shard_manager, cas, cas_prefix).await;
-                if ret.is_ok() {
-                    if let Some(updater) = upload_progress_updater {
-                        updater.update(xorb_data_len as u64);
+        if cas_data.data.is_empty() {
+            // Register any new files if present; ignore xorb uploading in this case.
+            for (fi, _chunk_hash_indices) in cas_data.pending_file_info {
+                debug_assert!(_chunk_hash_indices.is_empty());
+                self.shard_manager.add_file_reconstruction_info(fi).await?;
+            }
+            Ok(MerkleHash::default())
+        } else {
+            let xorb_data_len = cas_data.data.len();
+
+            let cas_hash = cas_node_hash(&cas_data.chunks[..]);
+
+            // Rate limiting, the acquired permit is dropped after the task completes.
+            // The chosen Semaphore is fair, meaning xorbs added first will be scheduled to upload first.
+            let permit = self
+                .rate_limiter
+                .clone()
+                .acquire_owned()
+                .await
+                .map_err(|e| UploadTaskError(e.to_string()))?;
+
+            let item = (cas_hash, cas_data.data, cas_data.chunks);
+            let shard_manager = self.shard_manager.clone();
+            let cas = self.cas.clone();
+            let cas_prefix = self.cas_prefix.clone();
+
+            let mut upload_tasks = self.upload_tasks.lock().await;
+            let upload_progress_updater = self.upload_progress_updater.clone();
+            upload_tasks.spawn_on(
+                async move {
+                    let ret = upload_and_register_xorb(item, shard_manager, cas, cas_prefix).await;
+                    if ret.is_ok() {
+                        if let Some(updater) = upload_progress_updater {
+                            updater.update(xorb_data_len as u64);
+                        }
                     }
+                    drop(permit);
+                    ret
+                },
+                &self.threadpool.handle(),
+            );
+
+            // Now register any new files as needed.
+            for (mut fi, chunk_hash_indices) in cas_data.pending_file_info {
+                for i in chunk_hash_indices {
+                    debug_assert_eq!(fi.segments[i].cas_hash, MerkleHash::default());
+                    fi.segments[i].cas_hash = cas_hash;
                 }
-                drop(permit);
-                ret
-            },
-            &self.threadpool.handle(),
-        );
-
-        // Now register any new files as needed.
-        for (mut fi, chunk_hash_indices) in cas_data.pending_file_info {
-            for i in chunk_hash_indices {
-                debug_assert_eq!(fi.segments[i].cas_hash, MerkleHash::default());
-                fi.segments[i].cas_hash = cas_hash;
-            }
-            self.shard_manager.add_file_reconstruction_info(fi).await?;
+                self.shard_manager.add_file_reconstruction_info(fi).await?;
+            }
+            Ok(cas_hash)
         }
-
-        Ok(cas_hash)
     }
 
     /// Flush makes sure all xorbs added to queue before this call are sent successfully
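
Note (outside the patch): the data_processing.rs hunk switches from !new_cas_data.data.is_empty() to !new_cas_data.is_empty(), i.e. the accumulator is considered non-empty if it holds either xorb bytes or pending file info. Below is a minimal sketch of what such an is_empty() helper could look like. The type names CasDataSketch, MerkleHashSketch, and FileInfoSketch are placeholders invented for illustration; only the field names (data, chunks, pending_file_info) mirror the ones used in the diff above, and the real definitions live elsewhere in the data crate.

    // Hypothetical placeholder types; the real ones are defined elsewhere in the crate.
    pub struct MerkleHashSketch([u8; 32]);
    pub struct FileInfoSketch;

    // Hypothetical shape of the CAS data accumulator referenced as cas_data /
    // new_cas_data in the patch.
    pub struct CasDataSketch {
        pub data: Vec<u8>,                                         // raw xorb bytes
        pub chunks: Vec<(MerkleHashSketch, usize)>,                // chunk hashes and sizes; empty whenever data is
        pub pending_file_info: Vec<(FileInfoSketch, Vec<usize>)>,  // file reconstruction info awaiting registration
    }

    impl CasDataSketch {
        // Empty only when there are no xorb bytes to upload *and* no pending file
        // reconstruction info to register with the shard manager.
        pub fn is_empty(&self) -> bool {
            self.data.is_empty() && self.pending_file_info.is_empty()
        }
    }

Under this reading, register_new_cas_block is still invoked when a fully deduplicated file produces no new xorb bytes, so its pending file reconstruction info is registered (the new is_empty branch in parallel_xorb_uploader.rs) instead of being dropped along with the skipped upload.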