feat: handle records larger than the batch size
fraidev committed Sep 29, 2024
1 parent 062cbf8 commit 0008547
Showing 5 changed files with 128 additions and 41 deletions.
34 changes: 24 additions & 10 deletions crates/fluvio/src/producer/accumulator.rs
@@ -7,13 +7,14 @@ use std::time::Duration;

use async_channel::Sender;
use async_lock::RwLock;
use fluvio_protocol::Encoder;
use tracing::trace;
use futures_util::future::{BoxFuture, Either, Shared};
use futures_util::{FutureExt, ready};

use fluvio_future::sync::Mutex;
use fluvio_future::sync::Condvar;
use fluvio_protocol::record::Batch;
use fluvio_protocol::record::{Batch, RawRecords};
use fluvio_compression::Compression;
use fluvio_protocol::record::Offset;
use fluvio_protocol::link::ErrorCode;
@@ -134,7 +135,16 @@ impl RecordAccumulator
"Batch is full. Creating a new batch for partition"
);

let mut batch = ProducerBatch::new(self.batch_size, self.compression);
println!("record.write_size(0): {}", record.write_size(0));
let estimated_size = record.write_size(0)
+ Batch::<RawRecords>::default().write_size(0)
+ Vec::<RawRecords>::default().write_size(0);
println!("estimated_size: {}", estimated_size);
let mut batch = if estimated_size > self.batch_size {
ProducerBatch::new(None, self.compression)
} else {
ProducerBatch::new(Some(self.batch_size), self.compression)
};

match batch.push_record(record) {
Some(push_record) => {
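The new branching can be read as a small helper. The sketch below is illustrative (the `write_limit_for` name is not in the crate) and reuses the same `write_size(0)` estimate as the hunk above:

```rust
use fluvio_protocol::Encoder;
use fluvio_protocol::record::{Batch, RawRecords, Record};

/// Illustrative helper (not part of this commit): choose the write limit
/// for the batch that will hold `record`. If the record plus the batch
/// and record-vec headers cannot fit in `batch_size`, return `None` so
/// the record gets its own unbounded, single-record batch instead of
/// being rejected.
fn write_limit_for(record: &Record, batch_size: usize) -> Option<usize> {
    let estimated_size = record.write_size(0)
        + Batch::<RawRecords>::default().write_size(0)
        + Vec::<RawRecords>::default().write_size(0);
    if estimated_size > batch_size {
        None
    } else {
        Some(batch_size)
    }
}
```

Returning `None` routes the oversized record into a dedicated batch that is flushed on its own, rather than failing the send with `ProducerError::RecordTooLarge`.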
@@ -176,7 +186,7 @@ pub(crate) struct ProducerBatch {
batch: MemoryBatch,
}
impl ProducerBatch {
fn new(write_limit: usize, compression: Compression) -> Self {
fn new(write_limit: Option<usize>, compression: Compression) -> Self {
let (sender, receiver) = async_channel::bounded(1);
let batch_metadata = Arc::new(BatchMetadata::new(receiver));
let batch = MemoryBatch::new(write_limit, compression);
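A usage sketch of the changed constructor (crate-internal, since `ProducerBatch` is `pub(crate)`; the byte limit shown is arbitrary):

```rust
// Capped: records that would overflow 16 KiB are refused and the batch
// reports full. Uncapped: any single record is accepted, and the batch
// reports full immediately after the first push.
let capped = ProducerBatch::new(Some(16 * 1024), Compression::None);
let uncapped = ProducerBatch::new(None, Compression::None);
```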
@@ -327,10 +337,12 @@ mod test {

// Producer batch that can store three instances of Record::from(("key", "value"))
let mut pb = ProducerBatch::new(
size * 3
+ 1
+ Batch::<RawRecords>::default().write_size(0)
+ Vec::<RawRecords>::default().write_size(0),
Some(
size * 3
+ 1
+ Batch::<RawRecords>::default().write_size(0)
+ Vec::<RawRecords>::default().write_size(0),
),
Compression::None,
);

@@ -350,9 +362,11 @@

// Producer batch that can store three instances of Record::from(("key", "value"))
let mut pb = ProducerBatch::new(
size * 3
+ Batch::<RawRecords>::default().write_size(0)
+ Vec::<RawRecords>::default().write_size(0),
Some(
size * 3
+ Batch::<RawRecords>::default().write_size(0)
+ Vec::<RawRecords>::default().write_size(0),
),
Compression::None,
);

2 changes: 1 addition & 1 deletion crates/fluvio/src/producer/error.rs
@@ -9,7 +9,7 @@ use crate::producer::PartitionId;
#[derive(thiserror::Error, Debug, Clone)]
#[non_exhaustive]
pub enum ProducerError {
#[error("the given record is larger than the buffer max_size ({0} bytes). Try increasing the producer batch size or reducing the record size enabling a compression algorithm")]
#[error("the given record is larger than the buffer max_size ({0} bytes)")]
RecordTooLarge(usize),
#[error("failed to send record metadata: {0}")]
SendRecordMetadata(#[from] async_channel::SendError<RecordMetadata>),
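The shortened message tracks the new behavior: advising the user to raise the batch size or enable compression no longer applies in general, since the accumulator now routes an oversized record into its own unbounded batch; `RecordTooLarge` remains for records the buffer genuinely cannot accept.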
52 changes: 23 additions & 29 deletions crates/fluvio/src/producer/memory_batch.rs
@@ -10,14 +10,14 @@ use super::*;

pub struct MemoryBatch {
compression: Compression,
write_limit: usize,
write_limit: Option<usize>,
current_size_uncompressed: usize,
is_full: bool,
create_time: Timestamp,
records: Vec<Record>,
}
impl MemoryBatch {
pub fn new(write_limit: usize, compression: Compression) -> Self {
pub fn new(write_limit: Option<usize>, compression: Compression) -> Self {
let now = Utc::now().timestamp_millis();
Self {
compression,
@@ -46,12 +46,16 @@ impl MemoryBatch {

let record_size = record.write_size(0);

if self.estimated_size() + record_size > self.write_limit {
self.is_full = true;
return None;
}
if let Some(write_limit) = self.write_limit {
if self.estimated_size() + record_size > write_limit {
self.is_full = true;
return None;
}

if self.estimated_size() + record_size == self.write_limit {
if self.estimated_size() + record_size == write_limit {
self.is_full = true;
}
} else {
self.is_full = true;
}

@@ -63,7 +67,11 @@
}

pub fn is_full(&self) -> bool {
self.is_full || self.write_limit <= self.estimated_size()
if let Some(write_limit) = self.write_limit {
self.is_full || write_limit <= self.estimated_size()
} else {
self.is_full
}
}

pub fn elapsed(&self) -> Timestamp {
@@ -73,23 +81,7 @@
}

fn estimated_size(&self) -> usize {
(self.current_size_uncompressed as f32 * self.compression_coefficient()) as usize
+ Batch::<RawRecords>::default().write_size(0)
}

fn compression_coefficient(&self) -> f32 {
cfg_if::cfg_if! {
if #[cfg(feature = "compress")] {
match self.compression {
Compression::None => 1.0,
Compression::Gzip | Compression::Snappy | Compression::Lz4 | Compression::Zstd => {
0.5
}
}
} else {
1.0
}
}
self.current_size_uncompressed + Batch::<RawRecords>::default().write_size(0)
}

pub fn records_len(&self) -> usize {
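Note the simplification in `estimated_size` above: the old code scaled the uncompressed byte count by a heuristic compression coefficient (0.5 whenever a compression algorithm was enabled), which could under-count and let a poorly compressing batch overshoot its write limit. The estimate now counts raw uncompressed bytes plus the batch header, so the limit check is exact with respect to what is actually buffered; the doubled write limit in the gzip test below (180 to 360) follows directly from dropping that coefficient.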
@@ -151,9 +143,11 @@ mod test {
let size = record.write_size(0);

let mut mb = MemoryBatch::new(
size * 4
+ Batch::<RawRecords>::default().write_size(0)
+ Vec::<RawRecords>::default().write_size(0),
Some(
size * 4
+ Batch::<RawRecords>::default().write_size(0)
+ Vec::<RawRecords>::default().write_size(0),
),
Compression::None,
);

@@ -204,7 +198,7 @@
let memory_batch_compression = Compression::Gzip;

// This MemoryBatch write limit is the minimal value needed to pass the test
let mut memory_batch = MemoryBatch::new(180, memory_batch_compression);
let mut memory_batch = MemoryBatch::new(Some(360), memory_batch_compression);

let mut offset = 0;

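A hedged test sketch (not part of this commit) of the unbounded path in `MemoryBatch`: with `write_limit = None`, `push_record` never refuses a record, and the batch marks itself full after the first push, so the accumulator flushes it as a single-record batch:

```rust
#[test]
fn unbounded_batch_accepts_one_record_then_reports_full() {
    let record = Record::from(("key", "value"));
    let mut mb = MemoryBatch::new(None, Compression::None);
    // No write limit: the record is accepted regardless of its size...
    assert!(mb.push_record(record).is_some());
    // ...and the batch immediately reports itself full, so it holds
    // exactly one record before being flushed.
    assert!(mb.is_full());
}
```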
79 changes: 79 additions & 0 deletions tests/cli/fluvio_smoke_tests/produce-batch-size.bats
@@ -0,0 +1,79 @@
#!/usr/bin/env bats

TEST_HELPER_DIR="$BATS_TEST_DIRNAME/../test_helper"
export TEST_HELPER_DIR

load "$TEST_HELPER_DIR"/tools_check.bash
load "$TEST_HELPER_DIR"/fluvio_dev.bash
load "$TEST_HELPER_DIR"/bats-support/load.bash
load "$TEST_HELPER_DIR"/bats-assert/load.bash

setup_file() {
TOPIC_NAME=$(random_string)
export TOPIC_NAME
debug_msg "Topic name: $TOPIC_NAME"
}

teardown_file() {
run timeout 15s "$FLUVIO_BIN" topic delete "$TOPIC_NAME"
run rm $TOPIC_NAME-small.txt
run rm $TOPIC_NAME-med.txt
run rm $TOPIC_NAME-big.txt
run rm $TOPIC_NAME-loop.txt
}

# Create topic
@test "Create topics for test" {
debug_msg "topic: $TOPIC_NAME"
run timeout 15s "$FLUVIO_BIN" topic create "$TOPIC_NAME"
assert_success
}

# Regression test for https://github.com/infinyon/fluvio/issues/4161
@test "Produce message larger than batch size" {
if [ "$FLUVIO_CLI_RELEASE_CHANNEL" == "stable" ]; then
skip "don't run on fluvio cli stable version"
fi
if [ "$FLUVIO_CLUSTER_RELEASE_CHANNEL" == "stable" ]; then
skip "don't run on cluster stable version"
fi
run bash -c "yes abcdefghijklmnopqrstuvwxyz |head -c 2500000 > $TOPIC_NAME-small.txt"
# run bash -c "yes abcdefghijklmnopqrstuvwxyz |head -c 5000000 > $TOPIC_NAME-med.txt"
run bash -c "yes abcdefghijklmnopqrstuvwxyz |head -c 4999989 > $TOPIC_NAME-med.txt"
run bash -c "yes abcdefghijklmnopqrstuvwxyz |head -c 15000000 > $TOPIC_NAME-big.txt"

debug_msg small 25
run bash -c 'timeout 65s "$FLUVIO_BIN" produce "$TOPIC_NAME" --batch-size 5000000 --file $TOPIC_NAME-small.txt --raw --compression gzip'
assert_success

debug_msg med 50+1
run bash -c 'timeout 65s "$FLUVIO_BIN" produce "$TOPIC_NAME" --batch-size 5000000 --file $TOPIC_NAME-med.txt --raw --compression gzip'
assert_success

# should create a single batch for this record
debug_msg big 150
run bash -c 'timeout 65s "$FLUVIO_BIN" produce "$TOPIC_NAME" --batch-size 5000000 --file $TOPIC_NAME-big.txt --raw --compression gzip'
assert_success
}

# Covers cases where the metadata + record size together exceed the batch size
@test "Produce message larger than batch size with loop" {
if [ "$FLUVIO_CLI_RELEASE_CHANNEL" == "stable" ]; then
skip "don't run on fluvio cli stable version"
fi
if [ "$FLUVIO_CLUSTER_RELEASE_CHANNEL" == "stable" ]; then
skip "don't run on cluster stable version"
fi


for x in $(seq 0 99); do
debug_msg "Running test with $x characters"

# Create the text file with 'x' number of characters
run bash -c "yes abcdefghijklmnopqrstuvwxyz | head -c 49999$x > $TOPIC_NAME-loop.txt"

debug_msg "produce 49999$x bytes"
run bash -c 'timeout 65s "$FLUVIO_BIN" produce "$TOPIC_NAME" --batch-size 5000000 --file $TOPIC_NAME-loop.txt --raw --compression gzip'
assert_success
done
}

2 changes: 1 addition & 1 deletion tests/cli/fluvio_smoke_tests/produce-error.bats
@@ -49,7 +49,7 @@ teardown_file() {
skip "don't check output on stable version"
fi

assert_output --partial "Try increasing the producer batch size"
assert_output --partial "the message is too large to send"
}

# This should fail due to wrong compression algorithm
