diff --git a/src/persist-client/src/internal/compact.rs b/src/persist-client/src/internal/compact.rs
index 29b8627fa67d4..26922db118b59 100644
--- a/src/persist-client/src/internal/compact.rs
+++ b/src/persist-client/src/internal/compact.rs
@@ -727,30 +727,16 @@ where
         let mut val_vec = vec![];
         loop {
             let fetch_start = Instant::now();
-            let Some(updates) = consolidator
-                .next_chunk(cfg.compaction_yield_after_n_updates)
-                .await?
-            else {
+            let Some(updates) = consolidator.next().await? else {
                 break;
             };
             timings.part_fetching += fetch_start.elapsed();
-            // We now have a full set of consolidated columnar data here, but no way to push it
-            // into the batch builder yet. Instead, iterate over the codec records and push those
-            // in directly.
-            for ((k, v), t, d) in updates.records().iter() {
+            for ((k, v), t, d) in updates.take(cfg.compaction_yield_after_n_updates) {
                 key_vec.clear();
                 key_vec.extend_from_slice(k);
                 val_vec.clear();
                 val_vec.extend_from_slice(v);
-                batch
-                    .add(
-                        &real_schemas,
-                        &key_vec,
-                        &val_vec,
-                        &T::decode(t),
-                        &D::decode(d),
-                    )
-                    .await?;
+                batch.add(&real_schemas, &key_vec, &val_vec, &t, &d).await?;
             }
             tokio::task::yield_now().await;
         }
diff --git a/src/persist-client/src/iter.rs b/src/persist-client/src/iter.rs
index e7f839ecec16d..8210c81295607 100644
--- a/src/persist-client/src/iter.rs
+++ b/src/persist-client/src/iter.rs
@@ -555,6 +555,7 @@ where
     /// Wait until data is available, then return an iterator over the next
     /// consolidated chunk of output. If this method returns `None`, then all the data has been
     /// exhausted and the full consolidated dataset has been returned.
+    #[allow(dead_code)]
    pub(crate) async fn next_chunk(
        &mut self,
        max_len: usize,
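
Below is a minimal sketch of the consumption pattern the compact.rs hunk adopts: pull one consolidated chunk at a time, process at most N updates before pausing, and `yield_now()` between chunks so a long compaction stays cooperative on the runtime. The `Consolidator` type, its `next()` method, and `yield_after_n_updates` are illustrative stand-ins for this sketch, not the real persist-client API; the point is that the iterator now yields already-decoded `(t, d)` values, so the per-record `T::decode`/`D::decode` calls from the old loop body disappear.

```rust
use std::time::{Duration, Instant};

/// Illustrative stand-in for the persist-client consolidator: yields
/// consolidated chunks of `((key, val), ts, diff)` tuples with the
/// timestamp and diff already decoded, as in the change above.
struct Consolidator {
    chunks: Vec<Vec<((Vec<u8>, Vec<u8>), u64, i64)>>,
}

impl Consolidator {
    /// Like `next()` in the diff: return the next consolidated chunk,
    /// or `None` once all data has been exhausted.
    async fn next(&mut self) -> Option<Vec<((Vec<u8>, Vec<u8>), u64, i64)>> {
        self.chunks.pop()
    }
}

#[tokio::main]
async fn main() {
    let mut consolidator = Consolidator {
        chunks: vec![
            vec![((b"a".to_vec(), b"1".to_vec()), 0, 1)],
            vec![((b"b".to_vec(), b"2".to_vec()), 1, 1)],
        ],
    };
    // Analogous to `cfg.compaction_yield_after_n_updates`: an upper bound on
    // how many updates we process from a chunk before yielding.
    let yield_after_n_updates = 4096;

    let mut part_fetching = Duration::ZERO;
    let mut processed = 0usize;

    loop {
        let fetch_start = Instant::now();
        // Stop once the consolidator reports the data is exhausted.
        let Some(updates) = consolidator.next().await else {
            break;
        };
        part_fetching += fetch_start.elapsed();

        // `t` and `d` arrive already decoded, so there is no per-record
        // `T::decode`/`D::decode` call as in the old loop body.
        for ((k, v), t, d) in updates.into_iter().take(yield_after_n_updates) {
            // ... push (k, v, t, d) into a batch builder ...
            let _ = (k, v, t, d);
            processed += 1;
        }

        // Yield between chunks so a long-running compaction does not
        // starve other tasks on the same runtime.
        tokio::task::yield_now().await;
    }

    println!("processed {processed} updates; fetch time {part_fetching:?}");
}
```

The second hunk keeps the now-unused `next_chunk` entry point alive behind `#[allow(dead_code)]` rather than deleting it, which matches the compact.rs side of the change no longer calling it.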