diff --git a/crates/fluvio/src/producer/accumulator.rs b/crates/fluvio/src/producer/accumulator.rs
index 620d7f0f5b..17a2e7a52f 100644
--- a/crates/fluvio/src/producer/accumulator.rs
+++ b/crates/fluvio/src/producer/accumulator.rs
@@ -13,8 +13,7 @@
 use futures_util::{FutureExt, ready};
 use fluvio_future::sync::{Mutex, MutexGuard};
 use fluvio_future::sync::Condvar;
-use fluvio_protocol::record::{Batch, RawRecords};
-use fluvio_protocol::Encoder;
+use fluvio_protocol::record::Batch;
 use fluvio_compression::Compression;
 use fluvio_protocol::record::Offset;
 use fluvio_protocol::link::ErrorCode;
@@ -111,15 +110,21 @@ impl RecordAccumulator {
         // If the last batch is not full, push the record to it
         if let Some(batch) = batches.back_mut() {
-            if let Some(push_record) = batch.push_record(record.clone()) {
-                if batch.is_full() {
+            match batch.push_record(record.clone()) {
+                Ok(Some(push_record)) => {
+                    if batch.is_full() {
+                        batch_events.notify_batch_full().await;
+                    }
+                    return Ok(PushRecord::new(
+                        push_record.into_future_record_metadata(partition_id),
+                    ));
+                }
+                Ok(None) => {
                     batch_events.notify_batch_full().await;
                 }
-                return Ok(PushRecord::new(
-                    push_record.into_future_record_metadata(partition_id),
-                ));
-            } else {
-                batch_events.notify_batch_full().await;
+                Err(err) => {
+                    return Err(err);
+                }
             }
         }
@@ -127,7 +132,7 @@ impl RecordAccumulator {
 
         // Create and push a new batch if needed
         let push_record = self
-            .create_and_new_batch(batch_events, &mut batches, record)
+            .create_and_new_batch(batch_events, &mut batches, record, 1)
             .await?;
 
         Ok(PushRecord::new(
@@ -154,29 +159,25 @@ impl RecordAccumulator {
         }
         Ok(batches)
     }
-
     async fn create_and_new_batch(
         &self,
         batch_events: &BatchEvents,
         batches: &mut VecDeque<ProducerBatch>,
         record: Record,
+        attempts: usize,
    ) -> Result<PartialFutureRecordMetadata, ProducerError> {
-        let estimated_size = record.write_size(0)
-            + Batch::<RawRecords>::default().write_size(0)
-            + Vec::<RawRecords>::default().write_size(0);
-
-        if estimated_size > self.max_request_size {
-            return Err(ProducerError::RecordTooLarge(estimated_size));
+        if attempts > 2 {
+            // This should never happen, but if it does, we should stop the recursion
+            return Err(ProducerError::Internal(
+                "Attempts exceeded while creating a new batch".to_string(),
+            ));
         }
-        let mut batch = if estimated_size > self.batch_size {
-            ProducerBatch::new(None, self.compression)
-        } else {
-            ProducerBatch::new(Some(self.batch_size), self.compression)
-        };
+        let mut batch =
+            ProducerBatch::new(self.max_request_size, self.batch_size, self.compression);
 
-        match batch.push_record(record) {
-            Some(push_record) => {
+        match batch.push_record(record.clone()) {
+            Ok(Some(push_record)) => {
                 batch_events.notify_new_batch().await;
 
                 if batch.is_full() {
@@ -186,9 +187,26 @@ impl RecordAccumulator {
                 batches.push_back(batch);
                 Ok(push_record)
             }
-            None => Err(ProducerError::RecordTooLarge(self.batch_size)),
+            Ok(None) => {
+                if batch.is_full() {
+                    batch_events.notify_batch_full().await;
+                }
+
+                batches.push_back(batch);
+
+                // Box the future to avoid infinite size due to recursion
+                Box::pin(self.create_and_new_batch(
+                    batch_events,
+                    batches,
+                    record,
+                    attempts + 1,
+                ))
+                .await
+            }
+            Err(err) => Err(err),
         }
     }
+
     pub(crate) async fn batches(&self) -> HashMap<PartitionId, BatchHandler> {
         self.batches.read().await.clone()
     }
@@ -211,10 +229,10 @@ pub(crate) struct ProducerBatch {
     batch: MemoryBatch,
 }
 
 impl ProducerBatch {
-    fn new(write_limit: Option<usize>, compression: Compression) -> Self {
+    fn new(write_limit: usize, batch_limit: usize, compression: Compression) -> Self {
         let (sender, receiver) = async_channel::bounded(1);
         let batch_metadata = Arc::new(BatchMetadata::new(receiver));
-        let batch = MemoryBatch::new(write_limit, compression);
+        let batch = MemoryBatch::new(write_limit, batch_limit, compression);
 
         Self {
             notify: sender,
@@ -226,13 +244,17 @@ impl ProducerBatch {
     /// Add a record to the batch.
     /// Return ProducerError::BatchFull if record does not fit in the batch, so
     /// the RecordAccumulator can create more batches if needed.
-    fn push_record(&mut self, record: Record) -> Option<PartialFutureRecordMetadata> {
+    fn push_record(
+        &mut self,
+        record: Record,
+    ) -> Result<Option<PartialFutureRecordMetadata>, ProducerError> {
         match self.batch.push_record(record) {
-            None => None,
-            Some(relative_offset) => Some(PartialFutureRecordMetadata::new(
-                relative_offset,
+            Ok(Some(offset)) => Ok(Some(PartialFutureRecordMetadata::new(
+                offset,
                 self.batch_metadata.clone(),
-            )),
+            ))),
+            Ok(None) => Ok(None),
+            Err(err) => Err(err),
         }
     }
@@ -362,22 +384,21 @@ mod test {
 
         // Producer batch that can store three instances of Record::from(("key", "value"))
         let mut pb = ProducerBatch::new(
-            Some(
-                size * 3
-                    + 1
-                    + Batch::<RawRecords>::default().write_size(0)
-                    + Vec::<RawRecords>::default().write_size(0),
-            ),
+            1_048_576,
+            size * 3
+                + 1
+                + Batch::<RawRecords>::default().write_size(0)
+                + Vec::<RawRecords>::default().write_size(0),
             Compression::None,
         );
 
-        assert!(pb.push_record(record.clone()).is_some());
-        assert!(pb.push_record(record.clone()).is_some());
-        assert!(pb.push_record(record.clone()).is_some());
+        assert!(pb.push_record(record.clone()).unwrap().is_some());
+        assert!(pb.push_record(record.clone()).unwrap().is_some());
+        assert!(pb.push_record(record.clone()).unwrap().is_some());
 
         assert!(!pb.is_full());
 
-        assert!(pb.push_record(record).is_none());
+        assert!(pb.push_record(record).unwrap().is_none());
     }
 
     #[test]
@@ -387,21 +408,20 @@
 
         // Producer batch that can store three instances of Record::from(("key", "value"))
         let mut pb = ProducerBatch::new(
-            Some(
-                size * 3
-                    + Batch::<RawRecords>::default().write_size(0)
-                    + Vec::<RawRecords>::default().write_size(0),
-            ),
+            1_048_576,
+            size * 3
+                + Batch::<RawRecords>::default().write_size(0)
+                + Vec::<RawRecords>::default().write_size(0),
             Compression::None,
         );
 
-        assert!(pb.push_record(record.clone()).is_some());
-        assert!(pb.push_record(record.clone()).is_some());
-        assert!(pb.push_record(record.clone()).is_some());
+        assert!(pb.push_record(record.clone()).unwrap().is_some());
+        assert!(pb.push_record(record.clone()).unwrap().is_some());
+        assert!(pb.push_record(record.clone()).unwrap().is_some());
 
         assert!(pb.is_full());
 
-        assert!(pb.push_record(record).is_none());
+        assert!(pb.push_record(record).unwrap().is_none());
     }
 
     #[fluvio_future::test]
diff --git a/crates/fluvio/src/producer/memory_batch.rs b/crates/fluvio/src/producer/memory_batch.rs
index 1a23237ce5..98c9a0ee05 100644
--- a/crates/fluvio/src/producer/memory_batch.rs
+++ b/crates/fluvio/src/producer/memory_batch.rs
@@ -10,18 +10,20 @@ use super::*;
 
 pub struct MemoryBatch {
     compression: Compression,
-    write_limit: Option<usize>,
+    batch_limit: usize,
+    write_limit: usize,
     current_size_uncompressed: usize,
     is_full: bool,
     create_time: Timestamp,
     records: Vec<Record>,
 }
 
 impl MemoryBatch {
-    pub fn new(write_limit: Option<usize>, compression: Compression) -> Self {
+    pub fn new(write_limit: usize, batch_limit: usize, compression: Compression) -> Self {
         let now = Utc::now().timestamp_millis();
         Self {
             compression,
             is_full: false,
+            batch_limit,
             write_limit,
             create_time: now,
             current_size_uncompressed: Vec::<RawRecords>::default().write_size(0),
@@ -35,7 +37,9 @@ impl MemoryBatch {
 
     /// Add a record to the batch.
     /// The value of `Offset` is relative to the `MemoryBatch` instance.
-    pub fn push_record(&mut self, mut record: Record) -> Option<Offset> {
+    pub fn push_record(&mut self, mut record: Record) -> Result<Option<Offset>, ProducerError> {
+        let is_the_first_record = self.records_len() == 0;
+
         let current_offset = self.offset() as i64;
         record
             .get_mut_header()
             .set_offset_delta(current_offset);
@@ -45,33 +49,36 @@
         record.get_mut_header().set_timestamp_delta(timestamp_delta);
 
         let record_size = record.write_size(0);
+        let actual_batch_size = self.estimated_size() + record_size;
 
-        if let Some(write_limit) = self.write_limit {
-            if self.estimated_size() + record_size > write_limit {
-                self.is_full = true;
-                return None;
-            }
+        // Error if the record is too large
+        if actual_batch_size > self.write_limit {
+            self.is_full = true;
+            return Err(ProducerError::RecordTooLarge(actual_batch_size));
+        }
 
-            if self.estimated_size() + record_size == write_limit {
+        // If the batch would be full but this is the first record, add it anyway;
+        // it will be sent directly in its own batch.
+        // If it would be full and this is not the first record, close this batch
+        // and let the record be added to the next batch.
+        // Otherwise, just add the record to this batch.
+        if is_the_first_record {
+            if actual_batch_size > self.batch_limit {
                 self.is_full = true;
             }
-        } else {
+        } else if actual_batch_size > self.batch_limit {
+            self.is_full = true;
+            return Ok(None);
+        } else if actual_batch_size == self.batch_limit {
             self.is_full = true;
         }
 
         self.current_size_uncompressed += record_size;
-
         self.records.push(record);
 
-        Some(current_offset)
+        Ok(Some(current_offset))
     }
 
     pub fn is_full(&self) -> bool {
-        if let Some(write_limit) = self.write_limit {
-            self.is_full || write_limit <= self.estimated_size()
-        } else {
-            self.is_full
-        }
+        self.is_full || self.estimated_size() > self.batch_limit
     }
 
     pub fn elapsed(&self) -> Timestamp {
@@ -143,21 +150,20 @@ mod test {
         let size = record.write_size(0);
 
         let mut mb = MemoryBatch::new(
-            Some(
-                size * 4
-                    + Batch::<RawRecords>::default().write_size(0)
-                    + Vec::<RawRecords>::default().write_size(0),
-            ),
+            size * 4
+                + Batch::<RawRecords>::default().write_size(0)
+                + Vec::<RawRecords>::default().write_size(0),
+            1_048_576,
             Compression::None,
         );
 
-        assert!(mb.push_record(record).is_some());
+        assert!(mb.push_record(record).unwrap().is_some());
         std::thread::sleep(std::time::Duration::from_millis(100));
         let record = Record::from(("key", "value"));
-        assert!(mb.push_record(record).is_some());
+        assert!(mb.push_record(record).unwrap().is_some());
         std::thread::sleep(std::time::Duration::from_millis(100));
         let record = Record::from(("key", "value"));
-        assert!(mb.push_record(record).is_some());
+        assert!(mb.push_record(record).unwrap().is_some());
 
         let batch: Batch = mb.into();
         assert!(
@@ -198,7 +204,7 @@ mod test {
         let memory_batch_compression = Compression::Gzip;
 
         // This MemoryBatch write limit is minimal value to pass test
-        let mut memory_batch = MemoryBatch::new(Some(360), memory_batch_compression);
+        let mut memory_batch = MemoryBatch::new(360, 1_048_576, memory_batch_compression);
 
         let mut offset = 0;
 
@@ -208,6 +214,7 @@
             value: RecordData::from(record_data.clone()),
             ..Default::default()
         })
+        .unwrap()
        .expect("Offset should exist");
     }
 
diff --git a/tests/cli/fluvio_smoke_tests/produce-batch-size.bats b/tests/cli/fluvio_smoke_tests/produce-batch-size.bats
index a88283f7a4..28a280aef0 100644
--- a/tests/cli/fluvio_smoke_tests/produce-batch-size.bats
+++ b/tests/cli/fluvio_smoke_tests/produce-batch-size.bats
@@ -41,16 +41,16 @@ teardown_file() {
     run bash -c "yes abcdefghijklmnopqrstuvwxyz |head -c 15000000 > $TOPIC_NAME-big.txt"
 
     debug_msg small 25
-    run bash -c 'timeout 65s "$FLUVIO_BIN" produce "$TOPIC_NAME" --batch-size 5000000 --file $TOPIC_NAME-small.txt --raw --compression gzip'
"$TOPIC_NAME" --batch-size 5000000 --file $TOPIC_NAME-small.txt --raw --compression gzip' + run bash -c 'timeout 65s "$FLUVIO_BIN" produce "$TOPIC_NAME" --batch-size 5000000 --max-request-size 100000000 --file $TOPIC_NAME-small.txt --raw --compression gzip' assert_success debug_msg med 50+1 - run bash -c 'timeout 65s "$FLUVIO_BIN" produce "$TOPIC_NAME" --batch-size 5000000 --file $TOPIC_NAME-med.txt --raw --compression gzip' + run bash -c 'timeout 65s "$FLUVIO_BIN" produce "$TOPIC_NAME" --batch-size 5000000 --max-request-size 100000000 --file $TOPIC_NAME-med.txt --raw --compression gzip' assert_success # should create a single batch for this record debug_msg big 150 - run bash -c 'timeout 65s "$FLUVIO_BIN" produce "$TOPIC_NAME" --batch-size 5000000 --file $TOPIC_NAME-big.txt --raw --compression gzip' + run bash -c 'timeout 65s "$FLUVIO_BIN" produce "$TOPIC_NAME" --batch-size 5000000 --max-request-size 100000000 --file $TOPIC_NAME-big.txt --raw --compression gzip' assert_success } @@ -71,8 +71,24 @@ teardown_file() { run bash -c "yes abcdefghijklmnopqrstuvwxyz | head -c 49999$x > $TOPIC_NAME-loop.txt" debug_msg small 25 - run bash -c 'timeout 65s "$FLUVIO_BIN" produce "$TOPIC_NAME" --batch-size 5000000 --file $TOPIC_NAME-loop.txt --raw --compression gzip' + run bash -c 'timeout 65s "$FLUVIO_BIN" produce "$TOPIC_NAME" --batch-size 5000000 --max-request-size 100000000 --file $TOPIC_NAME-loop.txt --raw --compression gzip' assert_success done } +@test "Produce message larger then max request size" { + if [ "$FLUVIO_CLI_RELEASE_CHANNEL" == "stable" ]; then + skip "don't run on fluvio cli stable version" + fi + if [ "$FLUVIO_CLUSTER_RELEASE_CHANNEL" == "stable" ]; then + skip "don't run on cluster stable version" + fi + run bash -c "yes abcdefghijklmnopqrstuvwxyz |head -c 14999000 > $TOPIC_NAME.txt" + run bash -c 'timeout 65s "$FLUVIO_BIN" produce "$TOPIC_NAME" --batch-size 5000000 --max-request-size 15000000 --file $TOPIC_NAME.txt --raw --compression gzip' + assert_success + + run bash -c "yes abcdefghijklmnopqrstuvwxyz |head -c 15000000 > $TOPIC_NAME.txt" + run bash -c 'timeout 65s "$FLUVIO_BIN" produce "$TOPIC_NAME" --batch-size 5000000 --max-request-size 15000000 --file $TOPIC_NAME.txt --raw --compression gzip' + assert_failure +} +