diff --git a/crates/fluvio-protocol/src/record/data.rs b/crates/fluvio-protocol/src/record/data.rs
index 3f8831a9d2f..79900fbdb25 100644
--- a/crates/fluvio-protocol/src/record/data.rs
+++ b/crates/fluvio-protocol/src/record/data.rs
@@ -667,6 +667,8 @@ mod test {
         assert_eq!(value.len(), 3);
         assert_eq!(value[0], 0x64);
+        let hdr = record.get_header();
+        assert_eq!(hdr.attributes, 0i8);
 
         Ok(())
     }
 
diff --git a/crates/fluvio/src/producer/accumulator.rs b/crates/fluvio/src/producer/accumulator.rs
index 21ecbdb368b..6c79ed33e17 100644
--- a/crates/fluvio/src/producer/accumulator.rs
+++ b/crates/fluvio/src/producer/accumulator.rs
@@ -27,7 +27,7 @@ use crate::producer::ProducerError;
 use crate::error::Result;
 
 use super::event::EventHandler;
-use super::memory_batch::MemoryBatch;
+use super::memory_batch::{BatchRecordStatus, BatchSize, MemoryBatch};
 
 const RECORD_ENQUEUE_TIMEOUT: Duration = Duration::from_secs(30);
 
@@ -117,15 +117,27 @@ impl RecordAccumulator {
             batches = guard;
         }
         if let Some(batch) = batches.back_mut() {
-            if let Some(push_record) = batch.push_record(record.clone()) {
-                if batch.is_full() {
+            match batch.push_record(record.clone()) {
+                PushRecordStatus::Ok(push_record) => {
+                    if batch.is_full() {
+                        batch_events.notify_batch_full().await;
+                    }
+                    return Ok(PushRecord::new(
+                        push_record.into_future_record_metadata(partition_id),
+                    ));
+                }
+                PushRecordStatus::DontFit(batch_size, record) => {
                     batch_events.notify_batch_full().await;
+                    return self
+                        .recreate_batch_single_record(
+                            batch_size,
+                            record,
+                            &mut batches,
+                            batch_events,
+                            partition_id,
+                        )
+                        .await;
                 }
-                return Ok(PushRecord::new(
-                    push_record.into_future_record_metadata(partition_id),
-                ));
-            } else {
-                batch_events.notify_batch_full().await;
             }
         }
 
@@ -137,20 +149,54 @@
         let mut batch = ProducerBatch::new(self.batch_size, self.compression);
 
         match batch.push_record(record) {
-            Some(push_record) => {
+            PushRecordStatus::Ok(push_record) => {
                 batch_events.notify_new_batch().await;
-
                 if batch.is_full() {
                     batch_events.notify_batch_full().await;
                 }
-
                 batches.push_back(batch);
+                Ok(PushRecord::new(
+                    push_record.into_future_record_metadata(partition_id),
+                ))
+            }
+            PushRecordStatus::DontFit(size, record) => {
+                // Send the single record without the batch size limit
+                self.recreate_batch_single_record(
+                    size,
+                    record,
+                    &mut batches,
+                    batch_events,
+                    partition_id,
+                )
+                .await
+            }
+        }
+    }
+
+    /// Recreate a batch with a single record
+    pub async fn recreate_batch_single_record(
+        &self,
+        batch_size: usize,
+        record: Record,
+        batches: &mut VecDeque<ProducerBatch>,
+        batch_events: &BatchEvents,
+        partition_id: PartitionId,
+    ) -> Result<PushRecord> {
+        let mut batch = ProducerBatch::new(batch_size, self.compression);
+        match batch.push_record(record) {
+            PushRecordStatus::Ok(push_record) => {
+                batch_events.notify_new_batch().await;
+                batches.push_back(batch);
                 Ok(PushRecord::new(
                     push_record.into_future_record_metadata(partition_id),
                 ))
             }
-            None => Err(ProducerError::RecordTooLarge(self.batch_size)),
+            PushRecordStatus::DontFit(size, _record) => {
+                // This should never happen: the batch is created with a capacity equal
+                // to the record size, so a single record always fits. It could only
+                // occur if a max_size option were added to the record (not the batch).
+                Err(ProducerError::RecordTooLarge(size))
+            }
         }
     }
 
@@ -170,6 +216,23 @@ where
 {
 }
 
+pub enum PushRecordStatus {
+    Ok(PartialFutureRecordMetadata),
+    DontFit(BatchSize, Record),
+}
+
+impl PushRecordStatus {
+    #[allow(dead_code)]
+    pub fn is_ok(&self) -> bool {
+        matches!(self, Self::Ok(_))
+    }
+
+    #[allow(dead_code)]
+    pub fn dont_fit(&self) -> bool {
+        matches!(self, Self::DontFit(_, _))
+    }
+}
+
 pub(crate) struct ProducerBatch {
     pub(crate) notify: Sender,
     batch_metadata: Arc,
@@ -191,13 +254,13 @@ impl ProducerBatch {
     /// Add a record to the batch.
-    /// Return ProducerError::BatchFull if record does not fit in the batch, so
-    /// the RecordAccumulator can create more batches if needed.
-    fn push_record(&mut self, record: Record) -> Option<PartialFutureRecordMetadata> {
+    /// Return `PushRecordStatus::DontFit` if the record does not fit in the batch,
+    /// so the RecordAccumulator can create more batches if needed.
+    fn push_record(&mut self, record: Record) -> PushRecordStatus {
         match self.batch.push_record(record) {
-            None => None,
-            Some(relative_offset) => Some(PartialFutureRecordMetadata::new(
-                relative_offset,
-                self.batch_metadata.clone(),
-            )),
+            BatchRecordStatus::NotFull(relative_offset)
+            | BatchRecordStatus::Full(relative_offset) => PushRecordStatus::Ok(
+                PartialFutureRecordMetadata::new(relative_offset, self.batch_metadata.clone()),
+            ),
+            BatchRecordStatus::DontFit(size, record) => PushRecordStatus::DontFit(size, record),
         }
     }
 
@@ -334,13 +397,13 @@ mod test {
             Compression::None,
         );
 
-        assert!(pb.push_record(record.clone()).is_some());
-        assert!(pb.push_record(record.clone()).is_some());
-        assert!(pb.push_record(record.clone()).is_some());
+        assert!(pb.push_record(record.clone()).is_ok());
+        assert!(pb.push_record(record.clone()).is_ok());
+        assert!(pb.push_record(record.clone()).is_ok());
 
         assert!(!pb.is_full());
 
-        assert!(pb.push_record(record).is_none());
+        assert!(pb.push_record(record).dont_fit());
     }
 
     #[test]
@@ -356,13 +419,13 @@ mod test {
             Compression::None,
        );
 
-        assert!(pb.push_record(record.clone()).is_some());
-        assert!(pb.push_record(record.clone()).is_some());
-        assert!(pb.push_record(record.clone()).is_some());
+        assert!(pb.push_record(record.clone()).is_ok());
+        assert!(pb.push_record(record.clone()).is_ok());
+        assert!(pb.push_record(record.clone()).is_ok());
 
         assert!(pb.is_full());
 
-        assert!(pb.push_record(record).is_none());
+        assert!(pb.push_record(record).dont_fit());
     }
 
     #[fluvio_future::test]
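
The accumulator change above is the core of the fix: instead of answering `Option<PartialFutureRecordMetadata>`, `push_record` now hands an oversized record back through `PushRecordStatus::DontFit`, and the accumulator gives it a dedicated batch sized to the record rather than failing with `RecordTooLarge`. A minimal, self-contained sketch of that ownership round-trip, using simplified stand-in types (`Batch`, `Record`, and `PushStatus` here are not the fluvio types):

```rust
use std::collections::VecDeque;

// Stand-ins for the real types; sizes are plain byte counts.
struct Record(Vec<u8>);

enum PushStatus {
    Ok,                     // accepted by the batch
    DontFit(usize, Record), // required size, plus the record handed back
}

struct Batch {
    limit: usize,
    used: usize,
    records: Vec<Record>,
}

impl Batch {
    fn new(limit: usize) -> Self {
        Self { limit, used: 0, records: Vec::new() }
    }

    // Mirrors MemoryBatch::push_record: hand the record back instead of
    // dropping it when it does not fit.
    fn push(&mut self, record: Record) -> PushStatus {
        let needed = self.used + record.0.len();
        if needed > self.limit {
            return PushStatus::DontFit(needed, record);
        }
        self.used = needed;
        self.records.push(record);
        PushStatus::Ok
    }
}

// Mirrors RecordAccumulator::push_record + recreate_batch_single_record:
// an oversized record gets its own batch sized to the record itself.
fn accumulate(batches: &mut VecDeque<Batch>, batch_size: usize, record: Record) {
    if let Some(open) = batches.back_mut() {
        match open.push(record) {
            PushStatus::Ok => return,
            PushStatus::DontFit(needed, record) => {
                let mut single = Batch::new(needed);
                assert!(matches!(single.push(record), PushStatus::Ok));
                batches.push_back(single);
            }
        }
    } else {
        let mut batch = Batch::new(batch_size);
        if let PushStatus::DontFit(needed, record) = batch.push(record) {
            batch = Batch::new(needed);
            let _ = batch.push(record);
        }
        batches.push_back(batch);
    }
}

fn main() {
    let mut batches = VecDeque::new();
    accumulate(&mut batches, 10, Record(vec![0u8; 4]));  // opens the shared batch
    accumulate(&mut batches, 10, Record(vec![0u8; 25])); // too big: dedicated batch
    assert_eq!(batches.len(), 2);
}
```

Returning the record inside `DontFit` is what lets the accumulator retry without cloning: ownership makes a round trip through the status enum, exactly as `BatchRecordStatus::DontFit(BatchSize, Record)` does in the diff.
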
diff --git a/crates/fluvio/src/producer/memory_batch.rs b/crates/fluvio/src/producer/memory_batch.rs
index c4178b444ab..003ff2b0cb8 100644
--- a/crates/fluvio/src/producer/memory_batch.rs
+++ b/crates/fluvio/src/producer/memory_batch.rs
@@ -8,6 +8,39 @@
 use fluvio_types::Timestamp;
 
 use super::*;
 
+pub type BatchSize = usize;
+
+pub enum BatchRecordStatus {
+    DontFit(BatchSize, Record), // The record does not fit in the batch
+    Full(Offset),               // The record fits and the batch is now full
+    NotFull(Offset),            // The record fits and the batch still has room
+}
+
+impl BatchRecordStatus {
+    pub fn fit(&self) -> bool {
+        match self {
+            BatchRecordStatus::DontFit(_, _) => false,
+            BatchRecordStatus::Full(_) => true,
+            BatchRecordStatus::NotFull(_) => true,
+        }
+    }
+
+    pub fn is_full(&self) -> bool {
+        match self {
+            BatchRecordStatus::DontFit(_, _) => false,
+            BatchRecordStatus::Full(_) => true,
+            BatchRecordStatus::NotFull(_) => false,
+        }
+    }
+
+    pub fn not_full(&self) -> bool {
+        match self {
+            BatchRecordStatus::DontFit(_, _) => false,
+            BatchRecordStatus::Full(_) => false,
+            BatchRecordStatus::NotFull(_) => true,
+        }
+    }
+}
+
 pub struct MemoryBatch {
     compression: Compression,
     write_limit: usize,
@@ -35,7 +68,7 @@ impl MemoryBatch {
 
     /// Add a record to the batch.
     /// The value of `Offset` is relative to the `MemoryBatch` instance.
-    pub fn push_record(&mut self, mut record: Record) -> Option<Offset> {
+    pub fn push_record(&mut self, mut record: Record) -> BatchRecordStatus {
         let current_offset = self.offset() as i64;
         record
             .get_mut_header()
@@ -45,21 +78,21 @@
         record.get_mut_header().set_timestamp_delta(timestamp_delta);
 
         let record_size = record.write_size(0);
+        let est_size = self.estimated_size() + record_size;
 
-        if self.estimated_size() + record_size > self.write_limit {
-            self.is_full = true;
-            return None;
-        }
-
-        if self.estimated_size() + record_size == self.write_limit {
-            self.is_full = true;
+        if est_size > self.write_limit {
+            return BatchRecordStatus::DontFit(est_size, record);
         }
 
         self.current_size_uncompressed += record_size;
-
         self.records.push(record);
 
-        Some(current_offset)
+        if est_size == self.write_limit {
+            self.is_full = true;
+            return BatchRecordStatus::Full(current_offset);
+        }
+
+        BatchRecordStatus::NotFull(current_offset)
     }
 
     pub fn is_full(&self) -> bool {
@@ -73,23 +106,7 @@
     }
 
     fn estimated_size(&self) -> usize {
-        (self.current_size_uncompressed as f32 * self.compression_coefficient()) as usize
-            + Batch::<RawRecords>::default().write_size(0)
-    }
-
-    fn compression_coefficient(&self) -> f32 {
-        cfg_if::cfg_if! {
-            if #[cfg(feature = "compress")] {
-                match self.compression {
-                    Compression::None => 1.0,
-                    Compression::Gzip | Compression::Snappy | Compression::Lz4 | Compression::Zstd => {
-                        0.5
-                    }
-                }
-            } else {
-                1.0
-            }
-        }
+        self.current_size_uncompressed + Batch::<RawRecords>::default().write_size(0)
     }
 
     pub fn records_len(&self) -> usize {
@@ -157,13 +174,13 @@ mod test {
             Compression::None,
         );
 
-        assert!(mb.push_record(record).is_some());
+        assert!(mb.push_record(record).not_full());
         std::thread::sleep(std::time::Duration::from_millis(100));
         let record = Record::from(("key", "value"));
-        assert!(mb.push_record(record).is_some());
+        assert!(mb.push_record(record).not_full());
         std::thread::sleep(std::time::Duration::from_millis(100));
         let record = Record::from(("key", "value"));
-        assert!(mb.push_record(record).is_some());
+        assert!(mb.push_record(record).not_full());
 
         let batch: Batch = mb.into();
 
         assert!(
@@ -209,12 +226,14 @@ mod test {
         let mut offset = 0;
 
         for _ in 0..num_records {
-            offset = memory_batch
-                .push_record(Record {
-                    value: RecordData::from(record_data.clone()),
-                    ..Default::default()
-                })
-                .expect("Offset should exist");
+            let status = memory_batch.push_record(Record {
+                value: RecordData::from(record_data.clone()),
+                ..Default::default()
+            });
+
+            if let BatchRecordStatus::NotFull(_) = status {
+                offset += 1;
+            } else {
+                panic!("Offset should exist");
+            }
         }
 
         let memory_batch_records_len = memory_batch.records_len();
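
Dropping `compression_coefficient` is the substance of the fix for #4161: batch admission is now judged on the actual uncompressed size, and anything over the limit becomes `DontFit` (and ships in its own batch) instead of an error. A rough before/after of the admission check, using the 5 MB `--batch-size` and the gzip file sizes from the smoke test below (the 0.5 factor is the deleted gzip coefficient; batch/record header overhead is ignored):

```rust
fn main() {
    let write_limit = 5_000_000usize; // --batch-size 5000000
    for uncompressed in [2_500_000usize, 5_000_000, 15_000_000] {
        // Old check: every codec except None was assumed to compress 2:1.
        let old_estimate = (uncompressed as f32 * 0.5) as usize;
        let old_outcome = if old_estimate > write_limit {
            "rejected with RecordTooLarge"
        } else {
            "accepted into the shared batch"
        };
        // New check: the real uncompressed size is compared against the limit.
        let new_outcome = if uncompressed > write_limit {
            "sent as a dedicated single-record batch"
        } else {
            "accepted into the shared batch"
        };
        println!("{uncompressed:>10} bytes: old {old_outcome}, new {new_outcome}");
    }
}
```

The old heuristic failed in both directions: the 15,000,000-byte record was refused outright (7,500,000 estimated, still over the limit) even though gzip might have squeezed it under, while up to twice `write_limit` of uncompressed data could be packed into one batch on the assumption it would shrink.
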
diff --git a/tests/cli/fluvio_smoke_tests/produce-batch-size.bats b/tests/cli/fluvio_smoke_tests/produce-batch-size.bats
new file mode 100644
index 00000000000..10b494b1c97
--- /dev/null
+++ b/tests/cli/fluvio_smoke_tests/produce-batch-size.bats
@@ -0,0 +1,48 @@
+#!/usr/bin/env bats
+
+TEST_HELPER_DIR="$BATS_TEST_DIRNAME/../test_helper"
+export TEST_HELPER_DIR
+
+load "$TEST_HELPER_DIR"/tools_check.bash
+load "$TEST_HELPER_DIR"/fluvio_dev.bash
+load "$TEST_HELPER_DIR"/bats-support/load.bash
+load "$TEST_HELPER_DIR"/bats-assert/load.bash
+
+setup_file() {
+    TOPIC_NAME=$(random_string)
+    export TOPIC_NAME
+    debug_msg "Topic name: $TOPIC_NAME"
+}
+
+teardown_file() {
+    run timeout 15s "$FLUVIO_BIN" topic delete "$TOPIC_NAME"
+    run rm $TOPIC_NAME-*.txt
+}
+
+# Create topic
+@test "Create topic for test" {
+    debug_msg "topic: $TOPIC_NAME"
+    run timeout 15s "$FLUVIO_BIN" topic create "$TOPIC_NAME"
+    assert_success
+}
for test" { + debug_msg "topic: $TOPIC_NAME" + run timeout 15s "$FLUVIO_BIN" topic create "$TOPIC_NAME" + assert_success +} + +# regression test issue https://github.com/infinyon/fluvio/issues/4161 +# Checking for max batch size ignores compression +@test "Produce uncompressed message larger than batch size" { + run bash -c "yes abcdefghijklmnopqrstuvwxyz |head -c 2500000 > $TOPIC_NAME-small.txt" + run bash -c "yes abcdefghijklmnopqrstuvwxyz |head -c 5000000 > $TOPIC_NAME-med.txt" + run bash -c "yes abcdefghijklmnopqrstuvwxyz |head -c 15000000 > $TOPIC_NAME-big.txt" + + debug_msg small 25 + run bash -c 'timeout 65s "$FLUVIO_BIN" produce "$TOPIC_NAME" --batch-size 5000000 --file $TOPIC_NAME-small.txt --raw --compression gzip' + assert_success + + debug_msg med 50+1 + run bash -c 'timeout 65s "$FLUVIO_BIN" produce "$TOPIC_NAME" --batch-size 5000000 --file $TOPIC_NAME-med.txt --raw --compression gzip' + assert_success + + # this will fail if using estimated compression + debug_msg big 150 + run bash -c 'timeout 65s "$FLUVIO_BIN" produce "$TOPIC_NAME" --batch-size 5000000 --file $TOPIC_NAME-big.txt --raw --compression gzip' + assert_success +}