chore: add some tests and fix storage max_request_size
fraidev committed Oct 15, 2024
1 parent 539d7d2 commit fd7fb17
Showing 5 changed files with 100 additions and 47 deletions.
13 changes: 11 additions & 2 deletions crates/fluvio-storage/src/config.rs
@@ -11,7 +11,8 @@ use serde::Deserialize;
 use fluvio_controlplane_metadata::topic::CleanupPolicy;
 use fluvio_types::defaults::{
     SPU_LOG_INDEX_MAX_BYTES, SPU_LOG_BASE_DIR, STORAGE_FLUSH_WRITE_COUNT, STORAGE_FLUSH_IDLE_MSEC,
-    STORAGE_MAX_BATCH_SIZE, STORAGE_RETENTION_SECONDS, SPU_PARTITION_MAX_BYTES,
+    STORAGE_MAX_BATCH_SIZE, STORAGE_MAX_REQUEST_SIZE, STORAGE_RETENTION_SECONDS,
+    SPU_PARTITION_MAX_BYTES,
 };
 use fluvio_types::defaults::SPU_LOG_INDEX_MAX_INTERVAL_BYTES;
 use fluvio_types::defaults::SPU_LOG_SEGMENT_MAX_BYTES;
@@ -44,6 +45,9 @@ pub struct ReplicaConfig {
     #[builder(default = "default_max_batch_size()")]
     #[serde(default = "default_max_batch_size")]
     pub max_batch_size: Size,
+    #[builder(default = "default_max_request_size()")]
+    #[serde(default = "default_max_request_size")]
+    pub max_request_size: Size,
     #[builder(default = "default_update_hw()")]
     #[serde(default = "default_update_hw")]
     pub update_hw: bool, // if true, enable hw update
@@ -120,6 +124,10 @@ const fn default_max_batch_size() -> Size {
     STORAGE_MAX_BATCH_SIZE
 }
 
+const fn default_max_request_size() -> Size {
+    STORAGE_MAX_REQUEST_SIZE
+}
+
 const fn default_retention_seconds() -> Size {
     STORAGE_RETENTION_SECONDS
 }
@@ -149,6 +157,7 @@ impl Default for ReplicaConfig {
             flush_write_count: default_flush_write_count(),
             flush_idle_msec: default_flush_idle_msec(),
             max_batch_size: default_max_batch_size(),
+            max_request_size: default_max_request_size(),
             retention_seconds: default_retention_seconds(),
             max_partition_size: default_max_partition_size(),
             update_hw: true,
@@ -241,7 +250,7 @@ impl From<ReplicaConfig> for SharedReplicaConfig {
             flush_write_count: SharedConfigU32Value::new(config.flush_write_count),
             flush_idle_msec: SharedConfigU32Value::new(config.flush_idle_msec),
             max_batch_size: SharedConfigU32Value::new(config.max_batch_size),
-            max_request_size: SharedConfigU32Value::new(config.max_batch_size),
+            max_request_size: SharedConfigU32Value::new(config.max_request_size),
             update_hw: config.update_hw,
             retention_seconds: SharedConfigU32Value::new(config.retention_seconds),
             max_partition_size: SharedConfigU64Value::new(config.max_partition_size),
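For reference, a minimal usage sketch of the new field (not part of the commit; it assumes only the public ReplicaConfig fields and the Default impl visible in the diff above):

use fluvio_storage::config::ReplicaConfig;

fn example_config() -> ReplicaConfig {
    // Hypothetical values: the request cap can now move independently of the
    // per-batch cap, which the old From<ReplicaConfig> impl conflated.
    ReplicaConfig {
        max_batch_size: 2_097_152,    // 2 MiB per batch
        max_request_size: 33_554_432, // 32 MiB per produce request
        ..ReplicaConfig::default()
    }
}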
4 changes: 2 additions & 2 deletions crates/fluvio-storage/src/replica.rs
@@ -862,9 +862,9 @@ mod tests {
     }
 
     #[fluvio_future::test]
-    async fn test_replica_limit_batch() {
+    async fn test_replica_limit_request_size() {
         let mut option = base_option("test_batch_limit");
-        option.max_batch_size = 100;
+        option.max_request_size = 100;
         option.update_hw = false;
 
         let mut replica = create_replica("test", START_OFFSET, option).await;
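The assertions of this test are collapsed in this view; as a hedged reading (an assumption, not shown in the diff), the renamed test should now exercise the request-size limit rather than the batch-size limit:

// Hedged sketch only; the collapsed test body is not shown in this diff.
let mut option = base_option("test_batch_limit");
option.max_request_size = 100; // requests above 100 bytes must be rejected
option.update_hw = false;
let mut replica = create_replica("test", START_OFFSET, option).await;
// Expectation (assumed): appending records totaling more than 100 bytes
// fails, while max_batch_size no longer participates in that check.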
3 changes: 2 additions & 1 deletion crates/fluvio-types/src/defaults.rs
@@ -49,7 +49,8 @@ pub const STORAGE_RETENTION_SECONDS: u32 = 7 * 24 * 3600;
 pub const STORAGE_RETENTION_SECONDS_MIN: u32 = 10; // crd
 pub const STORAGE_FLUSH_WRITE_COUNT: u32 = 1;
 pub const STORAGE_FLUSH_IDLE_MSEC: u32 = 0;
-pub const STORAGE_MAX_BATCH_SIZE: u32 = 33_554_432;
+pub const STORAGE_MAX_BATCH_SIZE: u32 = 2_097_152;
+pub const STORAGE_MAX_REQUEST_SIZE: u32 = 33_554_432;
 
 pub const SPU_SMARTENGINE_STORE_MAX_BYTES: usize = 1_073_741_824; //1Gb
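The split leaves a single batch capped at 2 MiB (2_097_152 bytes) by default while a whole produce request keeps the old 32 MiB cap (33_554_432 bytes), so one request can carry up to 16 maximally sized batches. A quick standalone check (illustrative only; values copied from the diff above):

const STORAGE_MAX_BATCH_SIZE: u32 = 2_097_152; // 2 MiB
const STORAGE_MAX_REQUEST_SIZE: u32 = 33_554_432; // 32 MiB

fn main() {
    assert_eq!(STORAGE_MAX_REQUEST_SIZE / STORAGE_MAX_BATCH_SIZE, 16);
}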
69 changes: 45 additions & 24 deletions crates/fluvio/src/producer/accumulator.rs
@@ -229,23 +229,11 @@ where {
     }
 }
 
-pub enum ProduceBatchStatus {
+enum ProduceBatchStatus {
     Added(PartialFutureRecordMetadata),
     NotAdded(Record),
 }
 
-impl ProduceBatchStatus {
-    #[allow(dead_code)]
-    pub fn added(&self) -> bool {
-        matches!(self, ProduceBatchStatus::Added(_))
-    }
-
-    #[allow(dead_code)]
-    pub fn not_added(&self) -> bool {
-        matches!(self, ProduceBatchStatus::NotAdded(_))
-    }
-}
-
 pub(crate) struct ProducerBatch {
     pub(crate) notify: Sender<ProducePartitionResponseFuture>,
     batch_metadata: Arc<BatchMetadata>,
@@ -411,13 +399,25 @@ mod test {
             Compression::None,
         );
 
-        assert!(pb.push_record(record.clone()).unwrap().added());
-        assert!(pb.push_record(record.clone()).unwrap().added());
-        assert!(pb.push_record(record.clone()).unwrap().added());
+        assert!(matches!(
+            pb.push_record(record.clone()),
+            Ok(ProduceBatchStatus::Added(_))
+        ));
+        assert!(matches!(
+            pb.push_record(record.clone()),
+            Ok(ProduceBatchStatus::Added(_))
+        ));
+        assert!(matches!(
+            pb.push_record(record.clone()),
+            Ok(ProduceBatchStatus::Added(_))
+        ));
 
         assert!(!pb.is_full());
 
-        assert!(pb.push_record(record).unwrap().not_added());
+        assert!(matches!(
+            pb.push_record(record),
+            Ok(ProduceBatchStatus::NotAdded(_))
+        ));
     }
 
     #[test]
@@ -434,13 +434,25 @@
             Compression::None,
         );
 
-        assert!(pb.push_record(record.clone()).unwrap().added());
-        assert!(pb.push_record(record.clone()).unwrap().added());
-        assert!(pb.push_record(record.clone()).unwrap().added());
+        assert!(matches!(
+            pb.push_record(record.clone()),
+            Ok(ProduceBatchStatus::Added(_))
+        ));
+        assert!(matches!(
+            pb.push_record(record.clone()),
+            Ok(ProduceBatchStatus::Added(_))
+        ));
+        assert!(matches!(
+            pb.push_record(record.clone()),
+            Ok(ProduceBatchStatus::Added(_))
+        ));
 
         assert!(pb.is_full());
 
-        assert!(pb.push_record(record).unwrap().not_added());
+        assert!(matches!(
+            pb.push_record(record),
+            Ok(ProduceBatchStatus::NotAdded(_))
+        ));
     }
 
     #[test]
Expand All @@ -459,9 +471,18 @@ mod test {
Compression::None,
);

assert!(pb.push_record(record.clone()).unwrap().added());
assert!(pb.push_record(record.clone()).unwrap().added());
assert!(pb.push_record(record.clone()).unwrap().added());
assert!(matches!(
pb.push_record(record.clone()),
Ok(ProduceBatchStatus::Added(_))
));
assert!(matches!(
pb.push_record(record.clone()),
Ok(ProduceBatchStatus::Added(_))
));
assert!(matches!(
pb.push_record(record.clone()),
Ok(ProduceBatchStatus::Added(_))
));

assert!(pb.is_full());

Expand Down
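With the added()/not_added() helpers gone, the tests assert enum variants directly through the matches! macro. A self-contained illustration of that pattern (hypothetical Status enum, not Fluvio's types):

#[derive(Clone, Copy)]
enum Status {
    Added(u32),
    NotAdded(u32),
}

fn main() {
    let result: Result<Status, ()> = Ok(Status::Added(1));
    // matches! tests the variant in place, so no helper predicates are needed.
    assert!(matches!(result, Ok(Status::Added(_))));
    assert!(!matches!(result, Ok(Status::NotAdded(_))));
}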
58 changes: 40 additions & 18 deletions crates/fluvio/src/producer/memory_batch.rs
@@ -13,18 +13,6 @@ pub enum MemoryBatchStatus {
     NotAdded(Record),
 }
 
-impl MemoryBatchStatus {
-    #[allow(dead_code)]
-    fn added(&self) -> bool {
-        matches!(self, Self::Added(_))
-    }
-
-    #[allow(dead_code)]
-    fn not_added(&self) -> bool {
-        matches!(self, Self::NotAdded(_))
-    }
-}
-
 pub struct MemoryBatch {
     compression: Compression,
     batch_limit: usize,
@@ -66,7 +54,7 @@ impl MemoryBatch {
         record.get_mut_header().set_timestamp_delta(timestamp_delta);
 
         let record_size = record.write_size(0);
-        let actual_batch_size = self.estimated_size() + record_size;
+        let actual_batch_size = self.raw_size() + record_size;
 
         // Error if the record is too large
         if actual_batch_size > self.write_limit {
@@ -95,7 +83,7 @@
     }
 
     pub fn is_full(&self) -> bool {
-        self.is_full || self.estimated_size() >= self.batch_limit
+        self.is_full || self.raw_size() >= self.batch_limit
     }
 
     pub fn elapsed(&self) -> Timestamp {
@@ -104,7 +92,7 @@
         std::cmp::max(0, now - self.create_time)
     }
 
-    fn estimated_size(&self) -> usize {
+    fn raw_size(&self) -> usize {
         self.current_size_uncompressed + Batch::<RawRecords>::default().write_size(0)
     }
 
@@ -174,13 +162,22 @@ mod test {
             Compression::None,
         );
 
-        assert!(mb.push_record(record).unwrap().added());
+        assert!(matches!(
+            mb.push_record(record.clone()),
+            Ok(MemoryBatchStatus::Added(_))
+        ));
         std::thread::sleep(std::time::Duration::from_millis(100));
         let record = Record::from(("key", "value"));
-        assert!(mb.push_record(record).unwrap().added());
+        assert!(matches!(
+            mb.push_record(record.clone()),
+            Ok(MemoryBatchStatus::Added(_))
+        ));
         std::thread::sleep(std::time::Duration::from_millis(100));
         let record = Record::from(("key", "value"));
-        assert!(mb.push_record(record).unwrap().added());
+        assert!(matches!(
+            mb.push_record(record.clone()),
+            Ok(MemoryBatchStatus::Added(_))
+        ));
 
         let batch: Batch<MemoryRecords> = mb.into();
         assert!(
Expand Down Expand Up @@ -213,6 +210,31 @@ mod test {
);
}

#[test]
fn test_is_the_first_record_from_batch_and_actual_batch_size_larger_then_batch_limit() {
let record = Record::from(("key", "value"));
let size = record.write_size(0);

let mut mb = MemoryBatch::new(
1_048_576,
size / 2
+ Batch::<RawRecords>::default().write_size(0)
+ Vec::<RawRecords>::default().write_size(0),
Compression::None,
);

assert!(matches!(
mb.push_record(record.clone()),
Ok(MemoryBatchStatus::Added(_))
));
std::thread::sleep(std::time::Duration::from_millis(100));
let record = Record::from(("key", "value"));
assert!(matches!(
mb.push_record(record.clone()),
Ok(MemoryBatchStatus::NotAdded(_))
));
}

#[test]
fn test_convert_memory_batch_to_batch() {
let num_records = 10;
Expand Down
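The new test pins down an edge case: when the very first record already exceeds batch_limit (but still fits within write_limit), it is accepted anyway, so a single oversized record can still make progress; only the following record is turned away with NotAdded. A reduced model of that decision (assumed from the diff and the new test, not Fluvio's actual code):

// Reduced model: reject outright only past the write limit; if the record
// merely overflows batch_limit, still add it when the batch is empty.
fn push_decision(
    raw_size: usize, // buffered uncompressed bytes + batch header overhead
    record_size: usize,
    batch_is_empty: bool,
    batch_limit: usize,
    write_limit: usize,
) -> Result<bool, ()> {
    let actual = raw_size + record_size;
    if actual > write_limit {
        return Err(()); // the record can never fit in any request
    }
    Ok(actual <= batch_limit || batch_is_empty) // true = Added, false = NotAdded
}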
