Skip to content

Commit

Permalink
Avoid uploading empty xorbs. (#161)
Browse files Browse the repository at this point in the history
Avoids uploading empty xorbs.
  • Loading branch information
hoytak authored Jan 31, 2025
1 parent 5cf29c1 commit b974981
Show file tree
Hide file tree
Showing 2 changed files with 51 additions and 44 deletions.
5 changes: 2 additions & 3 deletions data/src/data_processing.rs
Original file line number Diff line number Diff line change
Expand Up @@ -194,9 +194,8 @@ impl PointerFileTranslator {
let new_cas_data = take(cas_data_accumulator.deref_mut());
drop(cas_data_accumulator); // Release the lock.

// Note: Only upload if there is new data; the file stuff here isn't really
//
if !new_cas_data.data.is_empty() {
// Upload if there is new data or info
if !new_cas_data.is_empty() {
self.xorb_uploader.register_new_cas_block(new_cas_data).await?;
}

Expand Down
90 changes: 49 additions & 41 deletions data/src/parallel_xorb_uploader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -89,51 +89,59 @@ impl XorbUpload for ParallelXorbUploader {

// Only upload a new xorb if there is new data; it may be that an existing new file is formed only
// from existing chunks.
let xorb_data_len = cas_data.data.len();

let cas_hash = cas_node_hash(&cas_data.chunks[..]);

// Rate limiting, the acquired permit is dropped after the task completes.
// The chosen Semaphore is fair, meaning xorbs added first will be scheduled to upload first.
let permit = self
.rate_limiter
.clone()
.acquire_owned()
.await
.map_err(|e| UploadTaskError(e.to_string()))?;

let item = (cas_hash, cas_data.data, cas_data.chunks);
let shard_manager = self.shard_manager.clone();
let cas = self.cas.clone();
let cas_prefix = self.cas_prefix.clone();

let mut upload_tasks = self.upload_tasks.lock().await;
let upload_progress_updater = self.upload_progress_updater.clone();
upload_tasks.spawn_on(
async move {
let ret = upload_and_register_xorb(item, shard_manager, cas, cas_prefix).await;
if ret.is_ok() {
if let Some(updater) = upload_progress_updater {
updater.update(xorb_data_len as u64);
if cas_data.data.is_empty() {
// Register any new files if present; ignore xorb uploading in this case.
for (fi, _chunk_hash_indices) in cas_data.pending_file_info {
debug_assert!(_chunk_hash_indices.is_empty());
self.shard_manager.add_file_reconstruction_info(fi).await?;
}
Ok(MerkleHash::default())
} else {
let xorb_data_len = cas_data.data.len();

let cas_hash = cas_node_hash(&cas_data.chunks[..]);

// Rate limiting, the acquired permit is dropped after the task completes.
// The chosen Semaphore is fair, meaning xorbs added first will be scheduled to upload first.
let permit = self
.rate_limiter
.clone()
.acquire_owned()
.await
.map_err(|e| UploadTaskError(e.to_string()))?;

let item = (cas_hash, cas_data.data, cas_data.chunks);
let shard_manager = self.shard_manager.clone();
let cas = self.cas.clone();
let cas_prefix = self.cas_prefix.clone();

let mut upload_tasks = self.upload_tasks.lock().await;
let upload_progress_updater = self.upload_progress_updater.clone();
upload_tasks.spawn_on(
async move {
let ret = upload_and_register_xorb(item, shard_manager, cas, cas_prefix).await;
if ret.is_ok() {
if let Some(updater) = upload_progress_updater {
updater.update(xorb_data_len as u64);
}
}
drop(permit);
ret
},
&self.threadpool.handle(),
);

// Now register any new files as needed.
for (mut fi, chunk_hash_indices) in cas_data.pending_file_info {
for i in chunk_hash_indices {
debug_assert_eq!(fi.segments[i].cas_hash, MerkleHash::default());
fi.segments[i].cas_hash = cas_hash;
}
drop(permit);
ret
},
&self.threadpool.handle(),
);

// Now register any new files as needed.
for (mut fi, chunk_hash_indices) in cas_data.pending_file_info {
for i in chunk_hash_indices {
debug_assert_eq!(fi.segments[i].cas_hash, MerkleHash::default());
fi.segments[i].cas_hash = cas_hash;
}

self.shard_manager.add_file_reconstruction_info(fi).await?;
self.shard_manager.add_file_reconstruction_info(fi).await?;
}
Ok(cas_hash)
}

Ok(cas_hash)
}

/// Flush makes sure all xorbs added to queue before this call are sent successfully
Expand Down

0 comments on commit b974981

Please sign in to comment.