Skip to content

Commit

Permalink
Merge pull request #28944 from bkirwi/iter-revert
Browse files Browse the repository at this point in the history
[persist] Temporarily switch back to the non-chunked iter in compaction
  • Loading branch information
bkirwi authored Aug 12, 2024
2 parents 7b8acc8 + ecfce9a commit f575f26
Show file tree
Hide file tree
Showing 2 changed files with 4 additions and 17 deletions.
20 changes: 3 additions & 17 deletions src/persist-client/src/internal/compact.rs
Original file line number Diff line number Diff line change
Expand Up @@ -727,30 +727,16 @@ where
let mut val_vec = vec![];
loop {
let fetch_start = Instant::now();
let Some(updates) = consolidator
.next_chunk(cfg.compaction_yield_after_n_updates)
.await?
else {
let Some(updates) = consolidator.next().await? else {
break;
};
timings.part_fetching += fetch_start.elapsed();
// We now have a full set of consolidated columnar data here, but no way to push it
// into the batch builder yet. Instead, iterate over the codec records and push those
// in directly.
for ((k, v), t, d) in updates.records().iter() {
for ((k, v), t, d) in updates.take(cfg.compaction_yield_after_n_updates) {
key_vec.clear();
key_vec.extend_from_slice(k);
val_vec.clear();
val_vec.extend_from_slice(v);
batch
.add(
&real_schemas,
&key_vec,
&val_vec,
&T::decode(t),
&D::decode(d),
)
.await?;
batch.add(&real_schemas, &key_vec, &val_vec, &t, &d).await?;
}
tokio::task::yield_now().await;
}
Expand Down
1 change: 1 addition & 0 deletions src/persist-client/src/iter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -555,6 +555,7 @@ where
/// Wait until data is available, then return an iterator over the next
/// consolidated chunk of output. If this method returns `None`, that all the data has been
/// exhausted and the full consolidated dataset has been returned.
#[allow(dead_code)]
pub(crate) async fn next_chunk(
&mut self,
max_len: usize,
Expand Down

0 comments on commit f575f26

Please sign in to comment.