
feat: fix record size larger than batch size #4195

Merged · Oct 15, 2024 · 6 commits (showing changes from 5 commits)
2 changes: 1 addition & 1 deletion Cargo.lock

(generated file; diff not rendered)

13 changes: 12 additions & 1 deletion crates/fluvio-cli/src/client/produce/mod.rs
@@ -100,10 +100,14 @@ mod cmd {
#[arg(long, value_parser=parse_duration)]
pub linger: Option<Duration>,

/// Max amount of bytes accumulated before sending
/// Max number of records to batch before sending
#[arg(long)]
pub batch_size: Option<usize>,

/// Max amount of bytes accumulated before sending
#[arg(long)]
pub max_request_size: Option<usize>,

/// Isolation level that producer must respect.
/// Supported values: read_committed (ReadCommitted) - wait for records to be committed before response,
/// read_uncommitted (ReadUncommitted) - just wait for leader to accept records.
@@ -212,6 +216,13 @@ mod cmd {
config_builder
};

// Max request size
let config_builder = if let Some(max_request_size) = self.max_request_size {
config_builder.max_request_size(max_request_size)
} else {
config_builder
};

// Isolation
let config_builder = if let Some(isolation) = self.isolation {
config_builder.isolation(isolation)
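For context, a minimal sketch of how the new knob pairs with the existing one from the Rust client. This assumes the public `TopicProducerConfigBuilder` exposes `max_request_size` after this change (the CLI above calls it on its config builder); the other calls are the existing fluvio API:

```rust
use fluvio::{Fluvio, RecordKey, TopicProducerConfigBuilder};

// Illustrative helper, not part of this PR.
async fn produce_with_limits() -> anyhow::Result<()> {
    let config = TopicProducerConfigBuilder::default()
        .batch_size(16_384)           // pre-existing batching threshold
        .max_request_size(1_048_576)  // new: cap on a single produce request
        .build()?;

    let fluvio = Fluvio::connect().await?;
    let producer = fluvio.topic_producer_with_config("my-topic", config).await?;
    producer.send(RecordKey::NULL, "value").await?;
    producer.flush().await?;
    Ok(())
}
```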
2 changes: 2 additions & 0 deletions crates/fluvio-storage/src/config.rs
@@ -225,6 +225,7 @@ pub struct SharedReplicaConfig {
pub flush_write_count: SharedConfigU32Value,
pub flush_idle_msec: SharedConfigU32Value,
pub max_batch_size: SharedConfigU32Value,
pub max_request_size: SharedConfigU32Value,
pub update_hw: bool, // if true, enable hw update
pub retention_seconds: SharedConfigU32Value,
pub max_partition_size: SharedConfigU64Value,
@@ -240,6 +241,7 @@ impl From<ReplicaConfig> for SharedReplicaConfig {
flush_write_count: SharedConfigU32Value::new(config.flush_write_count),
flush_idle_msec: SharedConfigU32Value::new(config.flush_idle_msec),
max_batch_size: SharedConfigU32Value::new(config.max_batch_size),
max_request_size: SharedConfigU32Value::new(config.max_batch_size),
update_hw: config.update_hw,
retention_seconds: SharedConfigU32Value::new(config.retention_seconds),
max_partition_size: SharedConfigU64Value::new(config.max_partition_size),
6 changes: 3 additions & 3 deletions crates/fluvio-storage/src/replica.rs
@@ -132,7 +132,7 @@ impl ReplicaStorage for FileReplica {
records: &mut RecordSet<R>,
update_highwatermark: bool,
) -> Result<usize> {
let max_batch_size = self.option.max_batch_size.get() as usize;
let max_request_size = self.option.max_request_size.get() as usize;
let max_segment_size = self.option.segment_max_bytes.get() as usize;
let mut total_size = 0;
// check if any of the records's batch exceed max length
@@ -146,8 +146,8 @@
.into());
}
total_size += batch_size;
if batch_size > max_batch_size {
return Err(StorageError::BatchTooBig(max_batch_size).into());
if batch_size > max_request_size {
return Err(StorageError::BatchTooBig(max_request_size).into());
}
}

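The storage-side guard now measures each incoming batch against `max_request_size` rather than `max_batch_size`. A standalone sketch of the same check with simplified types (`BatchTooBig` here is a stand-in for `StorageError::BatchTooBig`, not the real error type):

```rust
// Simplified stand-in for StorageError::BatchTooBig(usize).
#[derive(Debug)]
struct BatchTooBig(usize);

// Mirrors the comparison in the FileReplica write path above.
fn validate_batch(batch_size: usize, max_request_size: usize) -> Result<(), BatchTooBig> {
    if batch_size > max_request_size {
        return Err(BatchTooBig(max_request_size));
    }
    Ok(())
}

fn main() {
    assert!(validate_batch(512 * 1024, 1_048_576).is_ok());     // fits in one request
    assert!(validate_batch(2 * 1_048_576, 1_048_576).is_err()); // rejected as too big
}
```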
2 changes: 1 addition & 1 deletion crates/fluvio/Cargo.toml
@@ -1,6 +1,6 @@
[package]
name = "fluvio"
version = "0.23.3"
version = "0.23.4"
edition = "2021"
license = "Apache-2.0"
authors = ["Fluvio Contributors <[email protected]>"]
192 changes: 148 additions & 44 deletions crates/fluvio/src/producer/accumulator.rs
@@ -11,7 +11,7 @@ use tracing::trace;
use futures_util::future::{BoxFuture, Either, Shared};
use futures_util::{FutureExt, ready};

use fluvio_future::sync::Mutex;
use fluvio_future::sync::{Mutex, MutexGuard};
use fluvio_future::sync::Condvar;
use fluvio_protocol::record::Batch;
use fluvio_compression::Compression;
@@ -27,7 +27,7 @@ use crate::producer::ProducerError;
use crate::error::Result;

use super::event::EventHandler;
use super::memory_batch::MemoryBatch;
use super::memory_batch::{MemoryBatch, MemoryBatchStatus};

const RECORD_ENQUEUE_TIMEOUT: Duration = Duration::from_secs(30);

@@ -55,6 +55,7 @@ impl BatchesDeque {
/// The batches are separated by PartitionId
pub(crate) struct RecordAccumulator {
batch_size: usize,
max_request_size: usize,
queue_size: usize,
batches: Arc<RwLock<HashMap<PartitionId, BatchHandler>>>,
compression: Compression,
@@ -63,16 +64,17 @@ pub(crate) struct RecordAccumulator {
impl RecordAccumulator {
pub(crate) fn new(
batch_size: usize,
max_request_size: usize,
queue_size: usize,
partition_n: PartitionCount,
compression: Compression,
) -> Self {
let mut batches = HashMap::new();
for p in 0..partition_n {
batches.insert(p, (BatchEvents::shared(), BatchesDeque::shared()));
}
let batches = (0..partition_n)
.map(|p| (p, (BatchEvents::shared(), BatchesDeque::shared())))
.collect::<HashMap<_, _>>();
Self {
batches: Arc::new(RwLock::new(batches)),
max_request_size,
batch_size,
compression,
queue_size,
@@ -103,6 +105,56 @@ impl RecordAccumulator {
.get(&partition_id)
.ok_or(ProducerError::PartitionNotFound(partition_id))?;

// Wait for space in the batch queue
let mut batches = self.wait_for_space(batches_lock).await?;

// If the last batch is not full, push the record to it
if let Some(batch) = batches.back_mut() {
match batch.push_record(record) {
Ok(ProduceBatchStatus::Added(push_record)) => {
if batch.is_full() {
batch_events.notify_batch_full().await;
}
return Ok(PushRecord::new(
push_record.into_future_record_metadata(partition_id),
));
}
Ok(ProduceBatchStatus::NotAdded(record)) => {
if batch.is_full() {
batch_events.notify_batch_full().await;
}

// Create and push a new batch if needed
let push_record = self
.create_and_new_batch(batch_events, &mut batches, record, 1)
.await?;

return Ok(PushRecord::new(
push_record.into_future_record_metadata(partition_id),
));
}
Err(err) => {
return Err(err);
}
}
}

trace!(partition_id, "Creating a new batch");

// Create and push a new batch if needed
let push_record = self
.create_and_new_batch(batch_events, &mut batches, record, 1)
.await?;

Ok(PushRecord::new(
push_record.into_future_record_metadata(partition_id),
))
}

async fn wait_for_space<'a>(
&self,
batches_lock: &'a Arc<BatchesDeque>,
) -> Result<MutexGuard<'a, VecDeque<ProducerBatch>>, ProducerError> {
let mut batches = batches_lock.batches.lock().await;
if batches.len() >= self.queue_size {
let (guard, wait_result) = batches_lock
@@ -116,41 +168,48 @@ impl RecordAccumulator {
}
batches = guard;
}
if let Some(batch) = batches.back_mut() {
if let Some(push_record) = batch.push_record(record.clone()) {
if batch.is_full() {
batch_events.notify_batch_full().await;
}
return Ok(PushRecord::new(
push_record.into_future_record_metadata(partition_id),
));
} else {
batch_events.notify_batch_full().await;
}
}
Ok(batches)
}

trace!(
partition_id,
"Batch is full. Creating a new batch for partition"
);
async fn create_and_new_batch(
&self,
batch_events: &BatchEvents,
batches: &mut VecDeque<ProducerBatch>,
record: Record,
attempts: usize,
) -> Result<PartialFutureRecordMetadata, ProducerError> {
if attempts > 2 {
// This should never happen, but if it does, we should stop the recursion
return Err(ProducerError::Internal(
"Attempts exceeded while creating a new batch".to_string(),
));
}

let mut batch = ProducerBatch::new(self.batch_size, self.compression);
let mut batch =
ProducerBatch::new(self.max_request_size, self.batch_size, self.compression);

match batch.push_record(record) {
Some(push_record) => {
Ok(ProduceBatchStatus::Added(push_record)) => {
batch_events.notify_new_batch().await;

if batch.is_full() {
batch_events.notify_batch_full().await;
}

batches.push_back(batch);
Ok(push_record)
}
Ok(ProduceBatchStatus::NotAdded(record)) => {
batch_events.notify_new_batch().await;
if batch.is_full() {
batch_events.notify_batch_full().await;
}

Ok(PushRecord::new(
push_record.into_future_record_metadata(partition_id),
))
batches.push_back(batch);
// Box the future to avoid infinite size due to recursion
Box::pin(self.create_and_new_batch(batch_events, batches, record, attempts + 1))
.await
}
None => Err(ProducerError::RecordTooLarge(self.batch_size)),
Err(err) => Err(err),
}
}
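The `Box::pin` in `create_and_new_batch` is load-bearing: a recursive `async fn` otherwise has an infinitely sized future and fails to compile. A standalone illustration of the pattern (unrelated to the accumulator types):

```rust
// Recursion in an async fn requires boxing the recursive call; without
// Box::pin the compiler rejects this with E0733 ("recursion in an async fn
// requires boxing").
async fn countdown(n: u32) {
    if n == 0 {
        return;
    }
    Box::pin(countdown(n - 1)).await;
}
```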

@@ -170,16 +229,33 @@
}
}

pub enum ProduceBatchStatus {
Added(PartialFutureRecordMetadata),
NotAdded(Record),
}

impl ProduceBatchStatus {
#[allow(dead_code)]
pub fn added(&self) -> bool {
matches!(self, ProduceBatchStatus::Added(_))
}

#[allow(dead_code)]
pub fn not_added(&self) -> bool {
matches!(self, ProduceBatchStatus::NotAdded(_))
}
}
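`ProduceBatchStatus` replaces the old `Option` return so a full batch hands the record back to the caller instead of discarding that information. A self-contained mini-model of the handshake (illustrative types, not the fluvio ones):

```rust
// Mini-model of the Added/NotAdded handshake used by push_record.
enum Status {
    Added(usize),     // accepted; end offset of the payload within the batch
    NotAdded(String), // batch full; record handed back so the caller can retry
}

struct Batch {
    used: usize,
    limit: usize,
}

impl Batch {
    fn push(&mut self, rec: String) -> Status {
        if self.used + rec.len() > self.limit {
            return Status::NotAdded(rec); // caller opens a new batch and retries
        }
        self.used += rec.len();
        Status::Added(self.used)
    }
}

fn main() {
    let mut batch = Batch { used: 0, limit: 8 };
    assert!(matches!(batch.push("hello".into()), Status::Added(_)));
    assert!(matches!(batch.push("world!".into()), Status::NotAdded(_)));
}
```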

pub(crate) struct ProducerBatch {
pub(crate) notify: Sender<ProducePartitionResponseFuture>,
batch_metadata: Arc<BatchMetadata>,
batch: MemoryBatch,
}
impl ProducerBatch {
fn new(write_limit: usize, compression: Compression) -> Self {
fn new(write_limit: usize, batch_limit: usize, compression: Compression) -> Self {
let (sender, receiver) = async_channel::bounded(1);
let batch_metadata = Arc::new(BatchMetadata::new(receiver));
let batch = MemoryBatch::new(write_limit, compression);
let batch = MemoryBatch::new(write_limit, batch_limit, compression);

Self {
notify: sender,
Expand All @@ -191,13 +267,13 @@ impl ProducerBatch {
/// Add a record to the batch.
/// Return ProducerError::BatchFull if record does not fit in the batch, so
/// the RecordAccumulator can create more batches if needed.
fn push_record(&mut self, record: Record) -> Option<PartialFutureRecordMetadata> {
fn push_record(&mut self, record: Record) -> Result<ProduceBatchStatus, ProducerError> {
match self.batch.push_record(record) {
None => None,
Some(relative_offset) => Some(PartialFutureRecordMetadata::new(
relative_offset,
self.batch_metadata.clone(),
Ok(MemoryBatchStatus::Added(offset)) => Ok(ProduceBatchStatus::Added(
PartialFutureRecordMetadata::new(offset, self.batch_metadata.clone()),
)),
Ok(MemoryBatchStatus::NotAdded(record)) => Ok(ProduceBatchStatus::NotAdded(record)),
Err(err) => Err(err),
}
}

@@ -327,20 +403,21 @@ mod test {

// Producer batch that can store three instances of Record::from(("key", "value"))
let mut pb = ProducerBatch::new(
1_048_576,
size * 3
+ 1
+ Batch::<RawRecords>::default().write_size(0)
+ Vec::<RawRecords>::default().write_size(0),
Compression::None,
);

assert!(pb.push_record(record.clone()).is_some());
assert!(pb.push_record(record.clone()).is_some());
assert!(pb.push_record(record.clone()).is_some());
assert!(pb.push_record(record.clone()).unwrap().added());
assert!(pb.push_record(record.clone()).unwrap().added());
assert!(pb.push_record(record.clone()).unwrap().added());

assert!(!pb.is_full());

assert!(pb.push_record(record).is_none());
assert!(pb.push_record(record).unwrap().not_added());
}

#[test]
@@ -350,19 +427,45 @@

// Producer batch that can store three instances of Record::from(("key", "value"))
let mut pb = ProducerBatch::new(
1_048_576,
size * 3
+ Batch::<RawRecords>::default().write_size(0)
+ Vec::<RawRecords>::default().write_size(0),
Compression::None,
);

assert!(pb.push_record(record.clone()).unwrap().added());
assert!(pb.push_record(record.clone()).unwrap().added());
assert!(pb.push_record(record.clone()).unwrap().added());

assert!(pb.is_full());

assert!(pb.push_record(record).unwrap().not_added());
}

#[test]
fn test_producer_write_limit() {
let record = Record::from(("key", "value"));
let size = record.write_size(0);

// Producer batch that can store three instances of Record::from(("key", "value"))
let mut pb = ProducerBatch::new(
size * 3
+ Batch::<RawRecords>::default().write_size(0)
+ Vec::<RawRecords>::default().write_size(0),
size * 3
+ Batch::<RawRecords>::default().write_size(0)
+ Vec::<RawRecords>::default().write_size(0),
Compression::None,
);

assert!(pb.push_record(record.clone()).is_some());
assert!(pb.push_record(record.clone()).is_some());
assert!(pb.push_record(record.clone()).is_some());
assert!(pb.push_record(record.clone()).unwrap().added());
assert!(pb.push_record(record.clone()).unwrap().added());
assert!(pb.push_record(record.clone()).unwrap().added());

assert!(pb.is_full());

assert!(pb.push_record(record).is_none());
assert!(pb.push_record(record).is_err());
}

#[fluvio_future::test]
Expand All @@ -374,6 +477,7 @@ mod test {
size * 3
+ Batch::<RawRecords>::default().write_size(0)
+ Vec::<RawRecords>::default().write_size(0),
1_048_576,
10,
1,
Compression::None,