From 68d29826a4f0a42437df2bd1c67fbb9a35f8559b Mon Sep 17 00:00:00 2001 From: Felipe Cardozo Date: Wed, 2 Oct 2024 20:04:59 -0300 Subject: [PATCH] feat: add max_request_size --- crates/fluvio-cli/src/client/produce/mod.rs | 13 ++++++++++++- crates/fluvio-storage/src/config.rs | 2 ++ crates/fluvio-storage/src/replica.rs | 6 +++--- crates/fluvio/src/producer/accumulator.rs | 2 +- crates/fluvio/src/producer/config.rs | 13 +++++++++++++ crates/fluvio/src/producer/error.rs | 2 +- .../fluvio/src/producer/partition_producer.rs | 8 ++++++++ .../consume-batch-size.bats | 19 +++++++++++++++++-- .../produce-batch-size.bats | 3 +-- .../cli/fluvio_smoke_tests/produce-error.bats | 2 +- 10 files changed, 59 insertions(+), 11 deletions(-) diff --git a/crates/fluvio-cli/src/client/produce/mod.rs b/crates/fluvio-cli/src/client/produce/mod.rs index f8c1ffb633..3c6846330c 100644 --- a/crates/fluvio-cli/src/client/produce/mod.rs +++ b/crates/fluvio-cli/src/client/produce/mod.rs @@ -100,10 +100,14 @@ mod cmd { #[arg(long, value_parser=parse_duration)] pub linger: Option, - /// Max amount of bytes accumulated before sending + /// Max amount of bytes accumulated in a batch before sending #[arg(long)] pub batch_size: Option, + /// Max amount of bytes that the server is allowed to process in a single request + #[arg(long)] + pub max_request_size: Option, + /// Isolation level that producer must respect. /// Supported values: read_committed (ReadCommitted) - wait for records to be committed before response, /// read_uncommitted (ReadUncommitted) - just wait for leader to accept records. 
@@ -212,6 +216,13 @@ mod cmd { config_builder }; + // Max request size + let config_builder = if let Some(max_request_size) = self.max_request_size { + config_builder.max_request_size(max_request_size) + } else { + config_builder + }; + // Isolation let config_builder = if let Some(isolation) = self.isolation { config_builder.isolation(isolation) diff --git a/crates/fluvio-storage/src/config.rs b/crates/fluvio-storage/src/config.rs index dc19937b31..fd51f8b6bb 100644 --- a/crates/fluvio-storage/src/config.rs +++ b/crates/fluvio-storage/src/config.rs @@ -225,6 +225,7 @@ pub struct SharedReplicaConfig { pub flush_write_count: SharedConfigU32Value, pub flush_idle_msec: SharedConfigU32Value, pub max_batch_size: SharedConfigU32Value, + pub max_request_size: SharedConfigU32Value, pub update_hw: bool, // if true, enable hw update pub retention_seconds: SharedConfigU32Value, pub max_partition_size: SharedConfigU64Value, @@ -240,6 +241,7 @@ impl From for SharedReplicaConfig { flush_write_count: SharedConfigU32Value::new(config.flush_write_count), flush_idle_msec: SharedConfigU32Value::new(config.flush_idle_msec), max_batch_size: SharedConfigU32Value::new(config.max_batch_size), + max_request_size: SharedConfigU32Value::new(config.max_batch_size), update_hw: config.update_hw, retention_seconds: SharedConfigU32Value::new(config.retention_seconds), max_partition_size: SharedConfigU64Value::new(config.max_partition_size), diff --git a/crates/fluvio-storage/src/replica.rs b/crates/fluvio-storage/src/replica.rs index 5155408d0e..25f064eee9 100644 --- a/crates/fluvio-storage/src/replica.rs +++ b/crates/fluvio-storage/src/replica.rs @@ -132,7 +132,7 @@ impl ReplicaStorage for FileReplica { records: &mut RecordSet, update_highwatermark: bool, ) -> Result { - let max_batch_size = self.option.max_batch_size.get() as usize; + let max_request_size = self.option.max_request_size.get() as usize; let max_segment_size = self.option.segment_max_bytes.get() as usize; let mut total_size = 0; 
// check if any of the records's batch exceed max length @@ -146,8 +146,8 @@ impl ReplicaStorage for FileReplica { .into()); } total_size += batch_size; - if batch_size > max_batch_size { - return Err(StorageError::BatchTooBig(max_batch_size).into()); + if batch_size > max_request_size { + return Err(StorageError::BatchTooBig(max_request_size).into()); } } diff --git a/crates/fluvio/src/producer/accumulator.rs b/crates/fluvio/src/producer/accumulator.rs index 8a90c5935d..b02f87ba4b 100644 --- a/crates/fluvio/src/producer/accumulator.rs +++ b/crates/fluvio/src/producer/accumulator.rs @@ -7,7 +7,6 @@ use std::time::Duration; use async_channel::Sender; use async_lock::RwLock; -use fluvio_protocol::Encoder; use tracing::trace; use futures_util::future::{BoxFuture, Either, Shared}; use futures_util::{FutureExt, ready}; @@ -15,6 +14,7 @@ use futures_util::{FutureExt, ready}; use fluvio_future::sync::Mutex; use fluvio_future::sync::Condvar; use fluvio_protocol::record::{Batch, RawRecords}; +use fluvio_protocol::Encoder; use fluvio_compression::Compression; use fluvio_protocol::record::Offset; use fluvio_protocol::link::ErrorCode; diff --git a/crates/fluvio/src/producer/config.rs b/crates/fluvio/src/producer/config.rs index d2cd23d552..d8add9816d 100644 --- a/crates/fluvio/src/producer/config.rs +++ b/crates/fluvio/src/producer/config.rs @@ -17,6 +17,7 @@ const DEFAULT_LINGER_MS: u64 = 100; const DEFAULT_TIMEOUT_MS: u64 = 1500; const DEFAULT_BATCH_SIZE_BYTES: usize = 16_384; const DEFAULT_BATCH_QUEUE_SIZE: usize = 100; +const DEFAULT_MAX_REQUEST_SIZE: usize = 1_048_576; const DEFAULT_RETRIES_TIMEOUT: Duration = Duration::from_secs(300); const DEFAULT_INITIAL_DELAY: Duration = Duration::from_millis(20); @@ -27,6 +28,10 @@ fn default_batch_size() -> usize { DEFAULT_BATCH_SIZE_BYTES } +fn default_max_request_size() -> usize { + DEFAULT_MAX_REQUEST_SIZE +} + fn default_batch_queue_size() -> usize { DEFAULT_BATCH_QUEUE_SIZE } @@ -68,6 +73,9 @@ pub struct TopicProducerConfig { 
/// Maximum amount of bytes accumulated by the records before sending the batch. #[builder(default = "default_batch_size()")] pub(crate) batch_size: usize, + /// Maximum amount of bytes that the server is allowed to process in a single request. + #[builder(default = "default_max_request_size()")] + pub(crate) max_request_size: usize, /// Maximum amount of batches waiting in the queue before sending to the SPU. #[builder(default = "default_batch_queue_size()")] pub(crate) batch_queue_size: usize, @@ -118,6 +126,10 @@ impl TopicProducerConfig { self.batch_size } + pub fn max_request_size(&self) -> usize { + self.max_request_size + } + pub fn batch_queue_size(&self) -> usize { self.batch_queue_size } @@ -148,6 +160,7 @@ impl Default for TopicProducerConfig { Self { linger: default_linger_duration(), batch_size: default_batch_size(), + max_request_size: default_max_request_size(), batch_queue_size: default_batch_queue_size(), partitioner: default_partitioner(), compression: None, diff --git a/crates/fluvio/src/producer/error.rs b/crates/fluvio/src/producer/error.rs index d873cf7c6b..4bedac61e2 100644 --- a/crates/fluvio/src/producer/error.rs +++ b/crates/fluvio/src/producer/error.rs @@ -9,7 +9,7 @@ use crate::producer::PartitionId; #[derive(thiserror::Error, Debug, Clone)] #[non_exhaustive] pub enum ProducerError { - #[error("the given record is larger than the buffer max_size ({0} bytes)")] + #[error("the given record is larger than the max_request_size ({0} bytes)")] RecordTooLarge(usize), #[error("failed to send record metadata: {0}")] SendRecordMetadata(#[from] async_channel::SendError), diff --git a/crates/fluvio/src/producer/partition_producer.rs b/crates/fluvio/src/producer/partition_producer.rs index 432106ff3a..a7bc727d6a 100644 --- a/crates/fluvio/src/producer/partition_producer.rs +++ b/crates/fluvio/src/producer/partition_producer.rs @@ -190,6 +190,7 @@ where }; let mut batch_notifiers = vec![]; + let mut request_size = 0; for p_batch in batches_ready { let 
mut partition_request = DefaultPartitionRequest { @@ -204,12 +205,19 @@ where let producer_metrics = self.metrics.producer_client(); producer_metrics.add_records(raw_batch.records_len() as u64); producer_metrics.add_bytes(raw_batch.batch_len() as u64); + request_size += raw_batch.batch_len(); partition_request.records.batches.push(raw_batch); batch_notifiers.push(notify); topic_request.partitions.push(partition_request); } + if request_size > self.config.max_request_size as i32 { + return Err(FluvioError::Producer(ProducerError::RecordTooLarge( + request_size as usize, + ))); + } + request.isolation = self.config.isolation; request.timeout = self.config.timeout; request.smartmodules.clone_from(&self.config.smartmodules); diff --git a/tests/cli/fluvio_smoke_tests/consume-batch-size.bats b/tests/cli/fluvio_smoke_tests/consume-batch-size.bats index 6580bb17c4..3f4273b066 100644 --- a/tests/cli/fluvio_smoke_tests/consume-batch-size.bats +++ b/tests/cli/fluvio_smoke_tests/consume-batch-size.bats @@ -25,14 +25,29 @@ teardown_file() { run timeout 15s "$FLUVIO_BIN" topic create "$TOPIC_NAME" } -@test "Produce message with batch large" { +@test "Produce message with max request size" { + if [ "$FLUVIO_CLI_RELEASE_CHANNEL" == "stable" ]; then + skip "don't run on fluvio cli stable version" + fi + if [ "$FLUVIO_CLUSTER_RELEASE_CHANNEL" == "stable" ]; then + skip "don't run on cluster stable version" + fi + run bash -c "yes abcdefghijklmnopqrstuvwxyz |head -c 15000000 > $TOPIC_NAME.txt" - run bash -c 'timeout 65s "$FLUVIO_BIN" produce "$TOPIC_NAME" --batch-size 15000000 --file $TOPIC_NAME.txt --linger 30s' + run bash -c 'timeout 65s "$FLUVIO_BIN" produce "$TOPIC_NAME" --max-request-size 15000000 --file $TOPIC_NAME.txt --linger 30s' + assert_success } # Consume message and compare message @test "Consume message" { + if [ "$FLUVIO_CLI_RELEASE_CHANNEL" == "stable" ]; then + skip "don't run on fluvio cli stable version" + fi + if [ "$FLUVIO_CLUSTER_RELEASE_CHANNEL" == "stable" ]; 
then + skip "don't run on cluster stable version" + fi + run timeout 15s "$FLUVIO_BIN" consume "$TOPIC_NAME" -B -d --maxbytes 16000000 assert_output --partial "abcdefghijklmnopqrstuvwxyz" assert_success diff --git a/tests/cli/fluvio_smoke_tests/produce-batch-size.bats b/tests/cli/fluvio_smoke_tests/produce-batch-size.bats index 2f9de49554..a88283f7a4 100644 --- a/tests/cli/fluvio_smoke_tests/produce-batch-size.bats +++ b/tests/cli/fluvio_smoke_tests/produce-batch-size.bats @@ -37,8 +37,7 @@ teardown_file() { skip "don't run on cluster stable version" fi run bash -c "yes abcdefghijklmnopqrstuvwxyz |head -c 2500000 > $TOPIC_NAME-small.txt" - # run bash -c "yes abcdefghijklmnopqrstuvwxyz |head -c 5000000 > $TOPIC_NAME-med.txt" - run bash -c "yes abcdefghijklmnopqrstuvwxyz |head -c 4999989 > $TOPIC_NAME-med.txt" + run bash -c "yes abcdefghijklmnopqrstuvwxyz |head -c 5000000 > $TOPIC_NAME-med.txt" run bash -c "yes abcdefghijklmnopqrstuvwxyz |head -c 15000000 > $TOPIC_NAME-big.txt" debug_msg small 25 diff --git a/tests/cli/fluvio_smoke_tests/produce-error.bats b/tests/cli/fluvio_smoke_tests/produce-error.bats index f984bd1cc4..ad48b5130a 100644 --- a/tests/cli/fluvio_smoke_tests/produce-error.bats +++ b/tests/cli/fluvio_smoke_tests/produce-error.bats @@ -49,7 +49,7 @@ teardown_file() { skip "don't check output on stable version" fi - assert_output --partial "the message is too large to send" + assert_output --partial "the given record is larger than the max_request_size" } # This should fail due to wrong compression algorithm