
Commit

chore: clean up part 2
fraidev committed Oct 14, 2024
1 parent 32cd469 commit ad767ae
Showing 3 changed files with 121 additions and 81 deletions.
119 changes: 68 additions & 51 deletions crates/fluvio/src/producer/accumulator.rs
@@ -13,8 +13,7 @@ use futures_util::{FutureExt, ready};

use fluvio_future::sync::{Mutex, MutexGuard};
use fluvio_future::sync::Condvar;
use fluvio_protocol::record::{Batch, RawRecords};
use fluvio_protocol::Encoder;
use fluvio_protocol::record::Batch;
use fluvio_compression::Compression;
use fluvio_protocol::record::Offset;
use fluvio_protocol::link::ErrorCode;
@@ -111,23 +110,29 @@ impl RecordAccumulator {

// If the last batch is not full, push the record to it
if let Some(batch) = batches.back_mut() {
if let Some(push_record) = batch.push_record(record.clone()) {
if batch.is_full() {
match batch.push_record(record.clone()) {
Ok(Some(push_record)) => {
if batch.is_full() {
batch_events.notify_batch_full().await;
}
return Ok(PushRecord::new(
push_record.into_future_record_metadata(partition_id),
));
}
Ok(None) => {
batch_events.notify_batch_full().await;
}
return Ok(PushRecord::new(
push_record.into_future_record_metadata(partition_id),
));
} else {
batch_events.notify_batch_full().await;
Err(err) => {
return Err(err);
}
}
}

trace!(partition_id, "Creating a new batch");

// Create and push a new batch if needed
let push_record = self
.create_and_new_batch(batch_events, &mut batches, record)
.create_and_new_batch(batch_events, &mut batches, record, 1)
.await?;

Ok(PushRecord::new(
@@ -154,29 +159,22 @@ impl RecordAccumulator {
}
Ok(batches)
}

async fn create_and_new_batch(
&self,
batch_events: &BatchEvents,
batches: &mut VecDeque<ProducerBatch>,
record: Record,
recursion_depth: usize, // New parameter to track recursion depth
) -> Result<PartialFutureRecordMetadata, ProducerError> {
let estimated_size = record.write_size(0)
+ Batch::<RawRecords>::default().write_size(0)
+ Vec::<RawRecords>::default().write_size(0);

if estimated_size > self.max_request_size {
return Err(ProducerError::RecordTooLarge(estimated_size));
if recursion_depth > 2 {
return Err(ProducerError::Internal("Something went wrong".to_string()));
}

let mut batch = if estimated_size > self.batch_size {
ProducerBatch::new(None, self.compression)
} else {
ProducerBatch::new(Some(self.batch_size), self.compression)
};
let mut batch =
ProducerBatch::new(self.max_request_size, self.batch_size, self.compression);

match batch.push_record(record) {
Some(push_record) => {
match batch.push_record(record.clone()) {
Ok(Some(push_record)) => {
batch_events.notify_new_batch().await;

if batch.is_full() {
@@ -186,9 +184,26 @@ impl RecordAccumulator {
batches.push_back(batch);
Ok(push_record)
}
None => Err(ProducerError::RecordTooLarge(self.batch_size)),
Ok(None) => {
if batch.is_full() {
batch_events.notify_batch_full().await;
}

batches.push_back(batch);

// Box the future to avoid infinite size due to recursion
Box::pin(self.create_and_new_batch(
batch_events,
batches,
record,
recursion_depth + 1,
))
.await
}
Err(err) => Err(err),
}
}

pub(crate) async fn batches(&self) -> HashMap<PartitionId, BatchHandler> {
self.batches.read().await.clone()
}
@@ -211,10 +226,10 @@ pub(crate) struct ProducerBatch {
batch: MemoryBatch,
}
impl ProducerBatch {
fn new(write_limit: Option<usize>, compression: Compression) -> Self {
fn new(write_limit: usize, batch_limit: usize, compression: Compression) -> Self {
let (sender, receiver) = async_channel::bounded(1);
let batch_metadata = Arc::new(BatchMetadata::new(receiver));
let batch = MemoryBatch::new(write_limit, compression);
let batch = MemoryBatch::new(write_limit, batch_limit, compression);

Self {
notify: sender,
@@ -226,13 +241,17 @@ impl ProducerBatch {
/// Add a record to the batch.
/// Return ProducerError::BatchFull if record does not fit in the batch, so
/// the RecordAccumulator can create more batches if needed.
fn push_record(&mut self, record: Record) -> Option<PartialFutureRecordMetadata> {
fn push_record(
&mut self,
record: Record,
) -> Result<Option<PartialFutureRecordMetadata>, ProducerError> {
match self.batch.push_record(record) {
None => None,
Some(relative_offset) => Some(PartialFutureRecordMetadata::new(
relative_offset,
Ok(Some(offset)) => Ok(Some(PartialFutureRecordMetadata::new(
offset,
self.batch_metadata.clone(),
)),
))),
Ok(None) => Ok(None),
Err(err) => Err(err),
}
}

@@ -362,22 +381,21 @@ mod test {

// Producer batch that can store three instances of Record::from(("key", "value"))
let mut pb = ProducerBatch::new(
Some(
size * 3
+ 1
+ Batch::<RawRecords>::default().write_size(0)
+ Vec::<RawRecords>::default().write_size(0),
),
size * 3
+ 1
+ Batch::<RawRecords>::default().write_size(0)
+ Vec::<RawRecords>::default().write_size(0),
1_048_576,
Compression::None,
);

assert!(pb.push_record(record.clone()).is_some());
assert!(pb.push_record(record.clone()).is_some());
assert!(pb.push_record(record.clone()).is_some());
assert!(pb.push_record(record.clone()).unwrap().is_some());
assert!(pb.push_record(record.clone()).unwrap().is_some());
assert!(pb.push_record(record.clone()).unwrap().is_some());

assert!(!pb.is_full());

assert!(pb.push_record(record).is_none());
assert!(pb.push_record(record).unwrap().is_none());
}

#[test]
@@ -387,21 +405,20 @@ mod test {

// Producer batch that can store three instances of Record::from(("key", "value"))
let mut pb = ProducerBatch::new(
Some(
size * 3
+ Batch::<RawRecords>::default().write_size(0)
+ Vec::<RawRecords>::default().write_size(0),
),
size * 3
+ Batch::<RawRecords>::default().write_size(0)
+ Vec::<RawRecords>::default().write_size(0),
1_048_576,
Compression::None,
);

assert!(pb.push_record(record.clone()).is_some());
assert!(pb.push_record(record.clone()).is_some());
assert!(pb.push_record(record.clone()).is_some());
assert!(pb.push_record(record.clone()).unwrap().is_some());
assert!(pb.push_record(record.clone()).unwrap().is_some());
assert!(pb.push_record(record.clone()).unwrap().is_some());

assert!(pb.is_full());

assert!(pb.push_record(record).is_none());
assert!(pb.push_record(record).unwrap().is_none());
}

#[fluvio_future::test]
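The recursive call added in create_and_new_batch is wrapped in Box::pin and bounded by the new recursion_depth argument. A minimal sketch of that pattern, with illustrative names rather than the Fluvio types: an async fn that awaits itself directly would have an infinitely sized future, so the recursion goes through a boxed future and a depth guard stops it, mirroring the `recursion_depth > 2` check above.

use std::future::Future;
use std::pin::Pin;

// Illustrative stand-in for the accumulator's retry: boxing keeps the future a
// fixed size even though it awaits itself.
fn place_record(
    remaining_retries: usize,
    depth: usize,
) -> Pin<Box<dyn Future<Output = Result<usize, String>> + Send>> {
    Box::pin(async move {
        if depth > 2 {
            // Mirrors the new guard: give up instead of recursing forever.
            return Err("recursion depth exceeded".to_string());
        }
        if remaining_retries == 0 {
            Ok(depth)
        } else {
            // Recursive await through the boxed future.
            place_record(remaining_retries - 1, depth + 1).await
        }
    })
}

// e.g. futures::executor::block_on(place_record(1, 1)) would yield Ok(2).
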
59 changes: 33 additions & 26 deletions crates/fluvio/src/producer/memory_batch.rs
@@ -10,18 +10,20 @@ use super::*;

pub struct MemoryBatch {
compression: Compression,
write_limit: Option<usize>,
batch_limit: usize,
write_limit: usize,
current_size_uncompressed: usize,
is_full: bool,
create_time: Timestamp,
records: Vec<Record>,
}
impl MemoryBatch {
pub fn new(write_limit: Option<usize>, compression: Compression) -> Self {
pub fn new(write_limit: usize, batch_limit: usize, compression: Compression) -> Self {
let now = Utc::now().timestamp_millis();
Self {
compression,
is_full: false,
batch_limit,
write_limit,
create_time: now,
current_size_uncompressed: Vec::<RawRecords>::default().write_size(0),
@@ -35,7 +37,9 @@ impl MemoryBatch {

/// Add a record to the batch.
/// The value of `Offset` is relative to the `MemoryBatch` instance.
pub fn push_record(&mut self, mut record: Record) -> Option<Offset> {
pub fn push_record(&mut self, mut record: Record) -> Result<Option<Offset>, ProducerError> {
let is_the_first_record = self.records_len() == 0;

let current_offset = self.offset() as i64;
record
.get_mut_header()
@@ -45,33 +49,36 @@ impl MemoryBatch {
record.get_mut_header().set_timestamp_delta(timestamp_delta);

let record_size = record.write_size(0);
let actual_batch_size = self.estimated_size() + record_size;

if let Some(write_limit) = self.write_limit {
if self.estimated_size() + record_size > write_limit {
self.is_full = true;
return None;
}
// Error if the record is too large
if self.estimated_size() + record_size > self.write_limit {
self.is_full = true;
return Err(ProducerError::RecordTooLarge(actual_batch_size));
}

if self.estimated_size() + record_size == write_limit {
// if full but this is the first record, add it to the batch and send that batch directly
// if full and this is not the first record, finish the batch and let this record be added to the next batch
// if not full, add the record to the batch
if is_the_first_record {
if self.estimated_size() + record_size > self.batch_limit {
self.is_full = true;
}
} else {
self.is_full = true;
if self.estimated_size() + record_size > self.write_limit {
self.is_full = true;
return Ok(None);
}
}

self.current_size_uncompressed += record_size;

self.records.push(record);

Some(current_offset)
Ok(Some(current_offset))
}

pub fn is_full(&self) -> bool {
if let Some(write_limit) = self.write_limit {
self.is_full || write_limit <= self.estimated_size()
} else {
self.is_full
}
self.is_full || self.estimated_size() > self.batch_limit
}

pub fn elapsed(&self) -> Timestamp {
@@ -143,21 +150,20 @@ mod test {
let size = record.write_size(0);

let mut mb = MemoryBatch::new(
Some(
size * 4
+ Batch::<RawRecords>::default().write_size(0)
+ Vec::<RawRecords>::default().write_size(0),
),
size * 4
+ Batch::<RawRecords>::default().write_size(0)
+ Vec::<RawRecords>::default().write_size(0),
1_048_576,
Compression::None,
);

assert!(mb.push_record(record).is_some());
assert!(mb.push_record(record).unwrap().is_some());
std::thread::sleep(std::time::Duration::from_millis(100));
let record = Record::from(("key", "value"));
assert!(mb.push_record(record).is_some());
assert!(mb.push_record(record).unwrap().is_some());
std::thread::sleep(std::time::Duration::from_millis(100));
let record = Record::from(("key", "value"));
assert!(mb.push_record(record).is_some());
assert!(mb.push_record(record).unwrap().is_some());

let batch: Batch<MemoryRecords> = mb.into();
assert!(
@@ -198,7 +204,7 @@ mod test {
let memory_batch_compression = Compression::Gzip;

// This MemoryBatch write limit is minimal value to pass test
let mut memory_batch = MemoryBatch::new(Some(360), memory_batch_compression);
let mut memory_batch = MemoryBatch::new(360, 1_048_576, memory_batch_compression);

let mut offset = 0;

Expand All @@ -208,6 +214,7 @@ mod test {
value: RecordData::from(record_data.clone()),
..Default::default()
})
.unwrap()
.expect("Offset should exist");
}

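The reworked push_record now distinguishes two limits: write_limit (the producer's max request size) turns an oversized record into a hard RecordTooLarge error, while the batch-size limit only closes the current batch so the accumulator can open a new one, and the first record is exempt from the batch limit so a single large-but-legal record still ships in its own batch. A condensed sketch of that decision, with illustrative names, the limits passed in as plain integers, and the non-first-record branch written against the batch limit as the comments describe (the real method works on self and returns Result<Option<Offset>, ProducerError>):

// Outcome of trying to add one record; stands in for Result<Option<Offset>, ProducerError>.
#[derive(Debug, PartialEq)]
enum Push {
    Accepted,
    BatchClosed, // caller starts a new batch and retries the record
    RecordTooLarge(usize),
}

fn push(
    current_size: usize,
    record_size: usize,
    is_first_record: bool,
    write_limit: usize,
    batch_limit: usize,
) -> Push {
    let projected = current_size + record_size;
    // Hard error: the record can never fit in any request.
    if projected > write_limit {
        return Push::RecordTooLarge(projected);
    }
    if is_first_record {
        // An over-the-batch-limit but legal record is still accepted;
        // the batch is simply marked full and sent on its own.
        Push::Accepted
    } else if projected > batch_limit {
        // Close this batch; the record goes into the next one.
        Push::BatchClosed
    } else {
        Push::Accepted
    }
}

fn main() {
    assert_eq!(push(0, 900, true, 1_000, 500), Push::Accepted); // first record, own batch
    assert_eq!(push(400, 200, false, 1_000, 500), Push::BatchClosed); // start a new batch
    assert_eq!(push(0, 2_000, true, 1_000, 500), Push::RecordTooLarge(2_000));
}
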
24 changes: 20 additions & 4 deletions tests/cli/fluvio_smoke_tests/produce-batch-size.bats
@@ -41,16 +41,16 @@ teardown_file() {
run bash -c "yes abcdefghijklmnopqrstuvwxyz |head -c 15000000 > $TOPIC_NAME-big.txt"

debug_msg small 25
run bash -c 'timeout 65s "$FLUVIO_BIN" produce "$TOPIC_NAME" --batch-size 5000000 --file $TOPIC_NAME-small.txt --raw --compression gzip'
run bash -c 'timeout 65s "$FLUVIO_BIN" produce "$TOPIC_NAME" --batch-size 5000000 --max-request-size 100000000 --file $TOPIC_NAME-small.txt --raw --compression gzip'
assert_success

debug_msg med 50+1
run bash -c 'timeout 65s "$FLUVIO_BIN" produce "$TOPIC_NAME" --batch-size 5000000 --file $TOPIC_NAME-med.txt --raw --compression gzip'
run bash -c 'timeout 65s "$FLUVIO_BIN" produce "$TOPIC_NAME" --batch-size 5000000 --max-request-size 100000000 --file $TOPIC_NAME-med.txt --raw --compression gzip'
assert_success

# should create a single batch for this record
debug_msg big 150
run bash -c 'timeout 65s "$FLUVIO_BIN" produce "$TOPIC_NAME" --batch-size 5000000 --file $TOPIC_NAME-big.txt --raw --compression gzip'
run bash -c 'timeout 65s "$FLUVIO_BIN" produce "$TOPIC_NAME" --batch-size 5000000 --max-request-size 100000000 --file $TOPIC_NAME-big.txt --raw --compression gzip'
assert_success
}

@@ -71,8 +71,24 @@ teardown_file() {
run bash -c "yes abcdefghijklmnopqrstuvwxyz | head -c 49999$x > $TOPIC_NAME-loop.txt"

debug_msg small 25
run bash -c 'timeout 65s "$FLUVIO_BIN" produce "$TOPIC_NAME" --batch-size 5000000 --file $TOPIC_NAME-loop.txt --raw --compression gzip'
run bash -c 'timeout 65s "$FLUVIO_BIN" produce "$TOPIC_NAME" --batch-size 5000000 --max-request-size 100000000 --file $TOPIC_NAME-loop.txt --raw --compression gzip'
assert_success
done
}

@test "Produce message larger then max request size" {
if [ "$FLUVIO_CLI_RELEASE_CHANNEL" == "stable" ]; then
skip "don't run on fluvio cli stable version"
fi
if [ "$FLUVIO_CLUSTER_RELEASE_CHANNEL" == "stable" ]; then
skip "don't run on cluster stable version"
fi
run bash -c "yes abcdefghijklmnopqrstuvwxyz |head -c 14999000 > $TOPIC_NAME.txt"
run bash -c 'timeout 65s "$FLUVIO_BIN" produce "$TOPIC_NAME" --batch-size 5000000 --max-request-size 15000000 --file $TOPIC_NAME.txt --raw --compression gzip'
assert_success

run bash -c "yes abcdefghijklmnopqrstuvwxyz |head -c 15000000 > $TOPIC_NAME.txt"
run bash -c 'timeout 65s "$FLUVIO_BIN" produce "$TOPIC_NAME" --batch-size 5000000 --max-request-size 15000000 --file $TOPIC_NAME.txt --raw --compression gzip'
assert_failure
}
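
The new test asserts that a ~14,999,000-byte record is accepted with --max-request-size 15000000 while a 15,000,000-byte record is rejected. A rough sketch of the assumed arithmetic (the overhead value is illustrative, not taken from the Fluvio source): the size check adds per-batch header overhead on top of the raw record bytes before comparing against the request limit, so a record exactly at the limit no longer fits.

// Assumed check, mirroring `estimated_size() + record_size > write_limit` above.
fn record_fits(record_len: usize, batch_overhead: usize, max_request_size: usize) -> bool {
    record_len + batch_overhead <= max_request_size
}

fn main() {
    let overhead = 100; // illustrative per-batch overhead in bytes
    assert!(record_fits(14_999_000, overhead, 15_000_000)); // first produce: succeeds
    assert!(!record_fits(15_000_000, overhead, 15_000_000)); // second produce: rejected
}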
