diff --git a/crates/fluvio-storage/src/config.rs b/crates/fluvio-storage/src/config.rs
index fd51f8b6bb8..d26b8f95f91 100644
--- a/crates/fluvio-storage/src/config.rs
+++ b/crates/fluvio-storage/src/config.rs
@@ -11,7 +11,8 @@ use serde::Deserialize;
 use fluvio_controlplane_metadata::topic::CleanupPolicy;
 use fluvio_types::defaults::{
     SPU_LOG_INDEX_MAX_BYTES, SPU_LOG_BASE_DIR, STORAGE_FLUSH_WRITE_COUNT, STORAGE_FLUSH_IDLE_MSEC,
-    STORAGE_MAX_BATCH_SIZE, STORAGE_RETENTION_SECONDS, SPU_PARTITION_MAX_BYTES,
+    STORAGE_MAX_BATCH_SIZE, STORAGE_MAX_REQUEST_SIZE, STORAGE_RETENTION_SECONDS,
+    SPU_PARTITION_MAX_BYTES,
 };
 use fluvio_types::defaults::SPU_LOG_INDEX_MAX_INTERVAL_BYTES;
 use fluvio_types::defaults::SPU_LOG_SEGMENT_MAX_BYTES;
@@ -44,6 +45,9 @@ pub struct ReplicaConfig {
     #[builder(default = "default_max_batch_size()")]
     #[serde(default = "default_max_batch_size")]
     pub max_batch_size: Size,
+    #[builder(default = "default_max_request_size()")]
+    #[serde(default = "default_max_request_size")]
+    pub max_request_size: Size,
     #[builder(default = "default_update_hw()")]
     #[serde(default = "default_update_hw")]
     pub update_hw: bool, // if true, enable hw update
@@ -120,6 +124,10 @@ const fn default_max_batch_size() -> Size {
     STORAGE_MAX_BATCH_SIZE
 }
 
+const fn default_max_request_size() -> Size {
+    STORAGE_MAX_REQUEST_SIZE
+}
+
 const fn default_retention_seconds() -> Size {
     STORAGE_RETENTION_SECONDS
 }
@@ -149,6 +157,7 @@ impl Default for ReplicaConfig {
             flush_write_count: default_flush_write_count(),
             flush_idle_msec: default_flush_idle_msec(),
             max_batch_size: default_max_batch_size(),
+            max_request_size: default_max_request_size(),
             retention_seconds: default_retention_seconds(),
             max_partition_size: default_max_partition_size(),
             update_hw: true,
@@ -241,7 +250,7 @@ impl From<ReplicaConfig> for SharedReplicaConfig {
             flush_write_count: SharedConfigU32Value::new(config.flush_write_count),
             flush_idle_msec: SharedConfigU32Value::new(config.flush_idle_msec),
             max_batch_size: SharedConfigU32Value::new(config.max_batch_size),
-            max_request_size: SharedConfigU32Value::new(config.max_batch_size),
+            max_request_size: SharedConfigU32Value::new(config.max_request_size),
             update_hw: config.update_hw,
             retention_seconds: SharedConfigU32Value::new(config.retention_seconds),
             max_partition_size: SharedConfigU64Value::new(config.max_partition_size),
diff --git a/crates/fluvio-storage/src/replica.rs b/crates/fluvio-storage/src/replica.rs
index 25f064eee90..d2feafc0b8c 100644
--- a/crates/fluvio-storage/src/replica.rs
+++ b/crates/fluvio-storage/src/replica.rs
@@ -862,9 +862,9 @@ mod tests {
     }
 
     #[fluvio_future::test]
-    async fn test_replica_limit_batch() {
+    async fn test_replica_limit_request_size() {
         let mut option = base_option("test_batch_limit");
-        option.max_batch_size = 100;
+        option.max_request_size = 100;
         option.update_hw = false;
 
         let mut replica = create_replica("test", START_OFFSET, option).await;
diff --git a/crates/fluvio-types/src/defaults.rs b/crates/fluvio-types/src/defaults.rs
index b369d9aa739..6797dc7a0e8 100644
--- a/crates/fluvio-types/src/defaults.rs
+++ b/crates/fluvio-types/src/defaults.rs
@@ -49,7 +49,8 @@ pub const STORAGE_RETENTION_SECONDS: u32 = 7 * 24 * 3600;
 pub const STORAGE_RETENTION_SECONDS_MIN: u32 = 10; // crd
 pub const STORAGE_FLUSH_WRITE_COUNT: u32 = 1;
 pub const STORAGE_FLUSH_IDLE_MSEC: u32 = 0;
-pub const STORAGE_MAX_BATCH_SIZE: u32 = 33_554_432;
+pub const STORAGE_MAX_BATCH_SIZE: u32 = 2_097_152;
+pub const STORAGE_MAX_REQUEST_SIZE: u32 = 33_554_432;
 
 pub const SPU_SMARTENGINE_STORE_MAX_BYTES: usize = 1_073_741_824; //1Gb
 
diff --git a/crates/fluvio/src/producer/accumulator.rs b/crates/fluvio/src/producer/accumulator.rs
index 5fb219161eb..537910ad3ac 100644
--- a/crates/fluvio/src/producer/accumulator.rs
+++ b/crates/fluvio/src/producer/accumulator.rs
@@ -229,23 +229,11 @@ where
 {
     }
 }
-pub enum ProduceBatchStatus {
+enum ProduceBatchStatus {
     Added(PartialFutureRecordMetadata),
     NotAdded(Record),
 }
 
-impl ProduceBatchStatus {
-    #[allow(dead_code)]
-    pub fn added(&self) -> bool {
-        matches!(self, ProduceBatchStatus::Added(_))
-    }
-
-    #[allow(dead_code)]
-    pub fn not_added(&self) -> bool {
-        matches!(self, ProduceBatchStatus::NotAdded(_))
-    }
-}
-
 pub(crate) struct ProducerBatch {
     pub(crate) notify: Sender<ProducePartitionResponseFuture>,
     batch_metadata: Arc<BatchMetadata>,
@@ -411,13 +399,25 @@ mod test {
             Compression::None,
         );
 
-        assert!(pb.push_record(record.clone()).unwrap().added());
-        assert!(pb.push_record(record.clone()).unwrap().added());
-        assert!(pb.push_record(record.clone()).unwrap().added());
+        assert!(matches!(
+            pb.push_record(record.clone()),
+            Ok(ProduceBatchStatus::Added(_))
+        ));
+        assert!(matches!(
+            pb.push_record(record.clone()),
+            Ok(ProduceBatchStatus::Added(_))
+        ));
+        assert!(matches!(
+            pb.push_record(record.clone()),
+            Ok(ProduceBatchStatus::Added(_))
+        ));
 
         assert!(!pb.is_full());
 
-        assert!(pb.push_record(record).unwrap().not_added());
+        assert!(matches!(
+            pb.push_record(record),
+            Ok(ProduceBatchStatus::NotAdded(_))
+        ));
     }
 
     #[test]
@@ -434,13 +434,25 @@ mod test {
             Compression::None,
         );
 
-        assert!(pb.push_record(record.clone()).unwrap().added());
-        assert!(pb.push_record(record.clone()).unwrap().added());
-        assert!(pb.push_record(record.clone()).unwrap().added());
+        assert!(matches!(
+            pb.push_record(record.clone()),
+            Ok(ProduceBatchStatus::Added(_))
+        ));
+        assert!(matches!(
+            pb.push_record(record.clone()),
+            Ok(ProduceBatchStatus::Added(_))
+        ));
+        assert!(matches!(
+            pb.push_record(record.clone()),
+            Ok(ProduceBatchStatus::Added(_))
+        ));
 
         assert!(pb.is_full());
 
-        assert!(pb.push_record(record).unwrap().not_added());
+        assert!(matches!(
+            pb.push_record(record),
+            Ok(ProduceBatchStatus::NotAdded(_))
+        ));
     }
 
     #[test]
@@ -459,9 +471,18 @@ mod test {
             Compression::None,
         );
 
-        assert!(pb.push_record(record.clone()).unwrap().added());
-        assert!(pb.push_record(record.clone()).unwrap().added());
-        assert!(pb.push_record(record.clone()).unwrap().added());
+        assert!(matches!(
+            pb.push_record(record.clone()),
+            Ok(ProduceBatchStatus::Added(_))
+        ));
+        assert!(matches!(
+            pb.push_record(record.clone()),
+            Ok(ProduceBatchStatus::Added(_))
+        ));
+        assert!(matches!(
+            pb.push_record(record.clone()),
+            Ok(ProduceBatchStatus::Added(_))
+        ));
 
         assert!(pb.is_full());
 
diff --git a/crates/fluvio/src/producer/memory_batch.rs b/crates/fluvio/src/producer/memory_batch.rs
index 9ef27d54b93..286459a3f70 100644
--- a/crates/fluvio/src/producer/memory_batch.rs
+++ b/crates/fluvio/src/producer/memory_batch.rs
@@ -13,18 +13,6 @@ pub enum MemoryBatchStatus {
     NotAdded(Record),
 }
 
-impl MemoryBatchStatus {
-    #[allow(dead_code)]
-    fn added(&self) -> bool {
-        matches!(self, Self::Added(_))
-    }
-
-    #[allow(dead_code)]
-    fn not_added(&self) -> bool {
-        matches!(self, Self::NotAdded(_))
-    }
-}
-
 pub struct MemoryBatch {
     compression: Compression,
     batch_limit: usize,
@@ -66,7 +54,7 @@ impl MemoryBatch {
         record.get_mut_header().set_timestamp_delta(timestamp_delta);
 
         let record_size = record.write_size(0);
-        let actual_batch_size = self.estimated_size() + record_size;
+        let actual_batch_size = self.raw_size() + record_size;
 
         // Error if the record is too large
         if actual_batch_size > self.write_limit {
@@ -95,7 +83,7 @@ impl MemoryBatch {
     }
 
     pub fn is_full(&self) -> bool {
-        self.is_full || self.estimated_size() >= self.batch_limit
+        self.is_full || self.raw_size() >= self.batch_limit
     }
 
     pub fn elapsed(&self) -> Timestamp {
@@ -104,7 +92,7 @@ impl MemoryBatch {
         std::cmp::max(0, now - self.create_time)
     }
 
-    fn estimated_size(&self) -> usize {
+    fn raw_size(&self) -> usize {
         self.current_size_uncompressed + Batch::<RawRecords>::default().write_size(0)
     }
 
@@ -174,13 +162,22 @@ mod test {
             Compression::None,
         );
 
-        assert!(mb.push_record(record).unwrap().added());
+        assert!(matches!(
+            mb.push_record(record.clone()),
+            Ok(MemoryBatchStatus::Added(_))
+        ));
         std::thread::sleep(std::time::Duration::from_millis(100));
         let record = Record::from(("key", "value"));
-        assert!(mb.push_record(record).unwrap().added());
+        assert!(matches!(
+            mb.push_record(record.clone()),
+            Ok(MemoryBatchStatus::Added(_))
+        ));
         std::thread::sleep(std::time::Duration::from_millis(100));
         let record = Record::from(("key", "value"));
-        assert!(mb.push_record(record).unwrap().added());
+        assert!(matches!(
+            mb.push_record(record.clone()),
+            Ok(MemoryBatchStatus::Added(_))
+        ));
 
         let batch: Batch = mb.into();
         assert!(
@@ -213,6 +210,31 @@ mod test {
         );
     }
 
+    #[test]
+    fn test_is_the_first_record_from_batch_and_actual_batch_size_larger_then_batch_limit() {
+        let record = Record::from(("key", "value"));
+        let size = record.write_size(0);
+
+        let mut mb = MemoryBatch::new(
+            1_048_576,
+            size / 2
+                + Batch::<RawRecords>::default().write_size(0)
+                + Vec::<RawRecords>::default().write_size(0),
+            Compression::None,
+        );
+
+        assert!(matches!(
+            mb.push_record(record.clone()),
+            Ok(MemoryBatchStatus::Added(_))
+        ));
+        std::thread::sleep(std::time::Duration::from_millis(100));
+        let record = Record::from(("key", "value"));
+        assert!(matches!(
+            mb.push_record(record.clone()),
+            Ok(MemoryBatchStatus::NotAdded(_))
+        ));
+    }
+
     #[test]
     fn test_convert_memory_batch_to_batch() {
         let num_records = 10;