Skip to content

Commit

Permalink
feat: add max_request_size
Browse files Browse the repository at this point in the history
  • Loading branch information
fraidev committed Oct 14, 2024
1 parent 17a4675 commit 68d2982
Show file tree
Hide file tree
Showing 10 changed files with 59 additions and 11 deletions.
13 changes: 12 additions & 1 deletion crates/fluvio-cli/src/client/produce/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -100,10 +100,14 @@ mod cmd {
#[arg(long, value_parser=parse_duration)]
pub linger: Option<Duration>,

/// Max amount of bytes accumulated before sending
/// Max amount of bytes accumulated in a batch before sending
#[arg(long)]
pub batch_size: Option<usize>,

/// Max amount of bytes accumulated before sending
#[arg(long)]
pub max_request_size: Option<usize>,

/// Isolation level that producer must respect.
/// Supported values: read_committed (ReadCommitted) - wait for records to be committed before response,
/// read_uncommitted (ReadUncommitted) - just wait for leader to accept records.
Expand Down Expand Up @@ -212,6 +216,13 @@ mod cmd {
config_builder
};

// Max request size
let config_builder = if let Some(max_request_size) = self.max_request_size {
config_builder.max_request_size(max_request_size)
} else {
config_builder
};

// Isolation
let config_builder = if let Some(isolation) = self.isolation {
config_builder.isolation(isolation)
Expand Down
2 changes: 2 additions & 0 deletions crates/fluvio-storage/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -225,6 +225,7 @@ pub struct SharedReplicaConfig {
pub flush_write_count: SharedConfigU32Value,
pub flush_idle_msec: SharedConfigU32Value,
pub max_batch_size: SharedConfigU32Value,
pub max_request_size: SharedConfigU32Value,
pub update_hw: bool, // if true, enable hw update
pub retention_seconds: SharedConfigU32Value,
pub max_partition_size: SharedConfigU64Value,
Expand All @@ -240,6 +241,7 @@ impl From<ReplicaConfig> for SharedReplicaConfig {
flush_write_count: SharedConfigU32Value::new(config.flush_write_count),
flush_idle_msec: SharedConfigU32Value::new(config.flush_idle_msec),
max_batch_size: SharedConfigU32Value::new(config.max_batch_size),
max_request_size: SharedConfigU32Value::new(config.max_request_size),
update_hw: config.update_hw,
retention_seconds: SharedConfigU32Value::new(config.retention_seconds),
max_partition_size: SharedConfigU64Value::new(config.max_partition_size),
Expand Down
6 changes: 3 additions & 3 deletions crates/fluvio-storage/src/replica.rs
Original file line number Diff line number Diff line change
Expand Up @@ -132,7 +132,7 @@ impl ReplicaStorage for FileReplica {
records: &mut RecordSet<R>,
update_highwatermark: bool,
) -> Result<usize> {
let max_batch_size = self.option.max_batch_size.get() as usize;
let max_request_size = self.option.max_request_size.get() as usize;
let max_segment_size = self.option.segment_max_bytes.get() as usize;
let mut total_size = 0;
// check if any of the records's batch exceed max length
Expand All @@ -146,8 +146,8 @@ impl ReplicaStorage for FileReplica {
.into());
}
total_size += batch_size;
if batch_size > max_batch_size {
return Err(StorageError::BatchTooBig(max_batch_size).into());
if batch_size > max_request_size {
return Err(StorageError::BatchTooBig(max_request_size).into());
}
}

Expand Down
2 changes: 1 addition & 1 deletion crates/fluvio/src/producer/accumulator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,14 +7,14 @@ use std::time::Duration;

use async_channel::Sender;
use async_lock::RwLock;
use fluvio_protocol::Encoder;
use tracing::trace;
use futures_util::future::{BoxFuture, Either, Shared};
use futures_util::{FutureExt, ready};

use fluvio_future::sync::Mutex;
use fluvio_future::sync::Condvar;
use fluvio_protocol::record::{Batch, RawRecords};
use fluvio_protocol::Encoder;
use fluvio_compression::Compression;
use fluvio_protocol::record::Offset;
use fluvio_protocol::link::ErrorCode;
Expand Down
13 changes: 13 additions & 0 deletions crates/fluvio/src/producer/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ const DEFAULT_LINGER_MS: u64 = 100;
const DEFAULT_TIMEOUT_MS: u64 = 1500;
const DEFAULT_BATCH_SIZE_BYTES: usize = 16_384;
const DEFAULT_BATCH_QUEUE_SIZE: usize = 100;
const DEFAULT_MAX_REQUEST_SIZE: usize = 1_048_576;

const DEFAULT_RETRIES_TIMEOUT: Duration = Duration::from_secs(300);
const DEFAULT_INITIAL_DELAY: Duration = Duration::from_millis(20);
Expand All @@ -27,6 +28,10 @@ fn default_batch_size() -> usize {
DEFAULT_BATCH_SIZE_BYTES
}

fn default_max_request_size() -> usize {
DEFAULT_MAX_REQUEST_SIZE
}

fn default_batch_queue_size() -> usize {
DEFAULT_BATCH_QUEUE_SIZE
}
Expand Down Expand Up @@ -68,6 +73,9 @@ pub struct TopicProducerConfig {
/// Maximum amount of bytes accumulated by the records before sending the batch.
#[builder(default = "default_batch_size()")]
pub(crate) batch_size: usize,
/// Maximum amount of bytes that the server is allowed to process in a single request.
#[builder(default = "default_max_request_size()")]
pub(crate) max_request_size: usize,
/// Maximum amount of batches waiting in the queue before sending to the SPU.
#[builder(default = "default_batch_queue_size()")]
pub(crate) batch_queue_size: usize,
Expand Down Expand Up @@ -118,6 +126,10 @@ impl TopicProducerConfig {
self.batch_size
}

pub fn max_request_size(&self) -> usize {
self.max_request_size
}

pub fn batch_queue_size(&self) -> usize {
self.batch_queue_size
}
Expand Down Expand Up @@ -148,6 +160,7 @@ impl Default for TopicProducerConfig {
Self {
linger: default_linger_duration(),
batch_size: default_batch_size(),
max_request_size: default_max_request_size(),
batch_queue_size: default_batch_queue_size(),
partitioner: default_partitioner(),
compression: None,
Expand Down
2 changes: 1 addition & 1 deletion crates/fluvio/src/producer/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ use crate::producer::PartitionId;
#[derive(thiserror::Error, Debug, Clone)]
#[non_exhaustive]
pub enum ProducerError {
#[error("the given record is larger than the buffer max_size ({0} bytes)")]
#[error("the given record is larger than the max_request_size ({0} bytes)")]
RecordTooLarge(usize),
#[error("failed to send record metadata: {0}")]
SendRecordMetadata(#[from] async_channel::SendError<RecordMetadata>),
Expand Down
8 changes: 8 additions & 0 deletions crates/fluvio/src/producer/partition_producer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -190,6 +190,7 @@ where
};

let mut batch_notifiers = vec![];
let mut request_size = 0;

for p_batch in batches_ready {
let mut partition_request = DefaultPartitionRequest {
Expand All @@ -204,12 +205,19 @@ where
let producer_metrics = self.metrics.producer_client();
producer_metrics.add_records(raw_batch.records_len() as u64);
producer_metrics.add_bytes(raw_batch.batch_len() as u64);
request_size += raw_batch.batch_len();

partition_request.records.batches.push(raw_batch);
batch_notifiers.push(notify);
topic_request.partitions.push(partition_request);
}

if request_size as usize > self.config.max_request_size {
return Err(FluvioError::Producer(ProducerError::RecordTooLarge(
request_size as usize,
)));
}

request.isolation = self.config.isolation;
request.timeout = self.config.timeout;
request.smartmodules.clone_from(&self.config.smartmodules);
Expand Down
19 changes: 17 additions & 2 deletions tests/cli/fluvio_smoke_tests/consume-batch-size.bats
Original file line number Diff line number Diff line change
Expand Up @@ -25,14 +25,29 @@ teardown_file() {
run timeout 15s "$FLUVIO_BIN" topic create "$TOPIC_NAME"
}

@test "Produce message with batch large" {
@test "Produce message with max request size" {
if [ "$FLUVIO_CLI_RELEASE_CHANNEL" == "stable" ]; then
skip "don't run on fluvio cli stable version"
fi
if [ "$FLUVIO_CLUSTER_RELEASE_CHANNEL" == "stable" ]; then
skip "don't run on cluster stable version"
fi

run bash -c "yes abcdefghijklmnopqrstuvwxyz |head -c 15000000 > $TOPIC_NAME.txt"
run bash -c 'timeout 65s "$FLUVIO_BIN" produce "$TOPIC_NAME" --batch-size 15000000 --file $TOPIC_NAME.txt --linger 30s'
run bash -c 'timeout 65s "$FLUVIO_BIN" produce "$TOPIC_NAME" --max-request-size 15000000 --file $TOPIC_NAME.txt --linger 30s'

assert_success
}

# Consume message and compare message
@test "Consume message" {
if [ "$FLUVIO_CLI_RELEASE_CHANNEL" == "stable" ]; then
skip "don't run on fluvio cli stable version"
fi
if [ "$FLUVIO_CLUSTER_RELEASE_CHANNEL" == "stable" ]; then
skip "don't run on cluster stable version"
fi

run timeout 15s "$FLUVIO_BIN" consume "$TOPIC_NAME" -B -d --maxbytes 16000000
assert_output --partial "abcdefghijklmnopqrstuvwxyz"
assert_success
Expand Down
3 changes: 1 addition & 2 deletions tests/cli/fluvio_smoke_tests/produce-batch-size.bats
Original file line number Diff line number Diff line change
Expand Up @@ -37,8 +37,7 @@ teardown_file() {
skip "don't run on cluster stable version"
fi
run bash -c "yes abcdefghijklmnopqrstuvwxyz |head -c 2500000 > $TOPIC_NAME-small.txt"
# run bash -c "yes abcdefghijklmnopqrstuvwxyz |head -c 5000000 > $TOPIC_NAME-med.txt"
run bash -c "yes abcdefghijklmnopqrstuvwxyz |head -c 4999989 > $TOPIC_NAME-med.txt"
run bash -c "yes abcdefghijklmnopqrstuvwxyz |head -c 5000000 > $TOPIC_NAME-med.txt"
run bash -c "yes abcdefghijklmnopqrstuvwxyz |head -c 15000000 > $TOPIC_NAME-big.txt"

debug_msg small 25
Expand Down
2 changes: 1 addition & 1 deletion tests/cli/fluvio_smoke_tests/produce-error.bats
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ teardown_file() {
skip "don't check output on stable version"
fi

assert_output --partial "the message is too large to send"
assert_output --partial "the given record is larger than the max_request_size"
}

# This should fail due to wrong compression algorithm
Expand Down

0 comments on commit 68d2982

Please sign in to comment.