Skip to content

Commit

Permalink
chore: remove clones
Browse files Browse the repository at this point in the history
  • Loading branch information
fraidev committed Oct 15, 2024
1 parent 34587e4 commit c1f6e4e
Show file tree
Hide file tree
Showing 2 changed files with 110 additions and 43 deletions.
113 changes: 79 additions & 34 deletions crates/fluvio/src/producer/accumulator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ use crate::producer::ProducerError;
use crate::error::Result;

use super::event::EventHandler;
use super::memory_batch::MemoryBatch;
use super::memory_batch::{MemoryBatch, MemoryBatchStatus};

const RECORD_ENQUEUE_TIMEOUT: Duration = Duration::from_secs(30);

Expand Down Expand Up @@ -110,17 +110,28 @@ impl RecordAccumulator {

// If the last batch is not full, push the record to it
if let Some(batch) = batches.back_mut() {
match batch.push_record(record.clone()) {
Ok(Some(push_record)) => {
match batch.push_record(record) {
Ok(ProduceBatchStatus::Added(push_record)) => {
if batch.is_full() {
batch_events.notify_batch_full().await;
}
return Ok(PushRecord::new(
push_record.into_future_record_metadata(partition_id),
));
}
Ok(None) => {
batch_events.notify_batch_full().await;
Ok(ProduceBatchStatus::NotAdded(record)) => {
if batch.is_full() {
batch_events.notify_batch_full().await;
}

// Create and push a new batch if needed
let push_record = self
.create_and_new_batch(batch_events, &mut batches, record, 1)
.await?;

return Ok(PushRecord::new(
push_record.into_future_record_metadata(partition_id),
));
}
Err(err) => {
return Err(err);
Expand Down Expand Up @@ -159,6 +170,7 @@ impl RecordAccumulator {
}
Ok(batches)
}

async fn create_and_new_batch(
&self,
batch_events: &BatchEvents,
Expand All @@ -176,32 +188,26 @@ impl RecordAccumulator {
let mut batch =
ProducerBatch::new(self.max_request_size, self.batch_size, self.compression);

match batch.push_record(record.clone()) {
Ok(Some(push_record)) => {
match batch.push_record(record) {
Ok(ProduceBatchStatus::Added(push_record)) => {
batch_events.notify_new_batch().await;

if batch.is_full() {
batch_events.notify_batch_full().await;
}

batches.push_back(batch);
Ok(push_record)
}
Ok(None) => {
Ok(ProduceBatchStatus::NotAdded(record)) => {
batch_events.notify_new_batch().await;
if batch.is_full() {
batch_events.notify_batch_full().await;
}

batches.push_back(batch);

// Box the future to avoid infinite size due to recursion
Box::pin(self.create_and_new_batch(
batch_events,
batches,
record,
attempts + 1,
))
.await
Box::pin(self.create_and_new_batch(batch_events, batches, record, attempts + 1))
.await
}
Err(err) => Err(err),
}
Expand All @@ -223,6 +229,23 @@ where {
}
}

pub enum ProduceBatchStatus {
Added(PartialFutureRecordMetadata),
NotAdded(Record),
}

impl ProduceBatchStatus {
#[allow(dead_code)]
pub fn added(&self) -> bool {
matches!(self, ProduceBatchStatus::Added(_))
}

#[allow(dead_code)]
pub fn not_added(&self) -> bool {
matches!(self, ProduceBatchStatus::NotAdded(_))
}
}

pub(crate) struct ProducerBatch {
pub(crate) notify: Sender<ProducePartitionResponseFuture>,
batch_metadata: Arc<BatchMetadata>,
Expand All @@ -244,16 +267,12 @@ impl ProducerBatch {
/// Add a record to the batch.
/// Return ProducerError::BatchFull if record does not fit in the batch, so
/// the RecordAccumulator can create more batches if needed.
fn push_record(
&mut self,
record: Record,
) -> Result<Option<PartialFutureRecordMetadata>, ProducerError> {
fn push_record(&mut self, record: Record) -> Result<ProduceBatchStatus, ProducerError> {
match self.batch.push_record(record) {
Ok(Some(offset)) => Ok(Some(PartialFutureRecordMetadata::new(
offset,
self.batch_metadata.clone(),
))),
Ok(None) => Ok(None),
Ok(MemoryBatchStatus::Added(offset)) => Ok(ProduceBatchStatus::Added(
PartialFutureRecordMetadata::new(offset, self.batch_metadata.clone()),
)),
Ok(MemoryBatchStatus::NotAdded(record)) => Ok(ProduceBatchStatus::NotAdded(record)),
Err(err) => Err(err),
}
}
Expand Down Expand Up @@ -392,13 +411,13 @@ mod test {
Compression::None,
);

assert!(pb.push_record(record.clone()).unwrap().is_some());
assert!(pb.push_record(record.clone()).unwrap().is_some());
assert!(pb.push_record(record.clone()).unwrap().is_some());
assert!(pb.push_record(record.clone()).unwrap().added());
assert!(pb.push_record(record.clone()).unwrap().added());
assert!(pb.push_record(record.clone()).unwrap().added());

assert!(!pb.is_full());

assert!(pb.push_record(record).unwrap().is_none());
assert!(pb.push_record(record).unwrap().not_added());
}

#[test]
Expand All @@ -415,15 +434,41 @@ mod test {
Compression::None,
);

assert!(pb.push_record(record.clone()).unwrap().is_some());
assert!(pb.push_record(record.clone()).unwrap().is_some());
assert!(pb.push_record(record.clone()).unwrap().is_some());
assert!(pb.push_record(record.clone()).unwrap().added());
assert!(pb.push_record(record.clone()).unwrap().added());
assert!(pb.push_record(record.clone()).unwrap().added());

assert!(pb.is_full());

assert!(pb.push_record(record).unwrap().is_none());
assert!(pb.push_record(record).unwrap().not_added());
}

#[test]
fn test_producer_write_limit() {
let record = Record::from(("key", "value"));
let size = record.write_size(0);

// Producer batch that can store three instances of Record::from(("key", "value"))
let mut pb = ProducerBatch::new(
size * 3
+ Batch::<RawRecords>::default().write_size(0)
+ Vec::<RawRecords>::default().write_size(0),
size * 3
+ Batch::<RawRecords>::default().write_size(0)
+ Vec::<RawRecords>::default().write_size(0),
Compression::None,
);

assert!(pb.push_record(record.clone()).unwrap().added());
assert!(pb.push_record(record.clone()).unwrap().added());
assert!(pb.push_record(record.clone()).unwrap().added());

assert!(pb.is_full());

assert!(pb.push_record(record).is_err());
}


#[fluvio_future::test]
async fn test_record_accumulator() {
let record = Record::from(("key", "value"));
Expand Down
40 changes: 31 additions & 9 deletions crates/fluvio/src/producer/memory_batch.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,23 @@ use fluvio_types::Timestamp;

use super::*;

pub enum MemoryBatchStatus {
Added(Offset),
NotAdded(Record),
}

impl MemoryBatchStatus {
#[allow(dead_code)]
fn added(&self) -> bool {
matches!(self, Self::Added(_))
}

#[allow(dead_code)]
fn not_added(&self) -> bool {
matches!(self, Self::NotAdded(_))
}
}

pub struct MemoryBatch {
compression: Compression,
batch_limit: usize,
Expand Down Expand Up @@ -37,7 +54,7 @@ impl MemoryBatch {

/// Add a record to the batch.
/// The value of `Offset` is relative to the `MemoryBatch` instance.
pub fn push_record(&mut self, mut record: Record) -> Result<Option<Offset>, ProducerError> {
pub fn push_record(&mut self, mut record: Record) -> Result<MemoryBatchStatus, ProducerError> {
let is_the_first_record = self.records_len() == 0;

let current_offset = self.offset() as i64;
Expand Down Expand Up @@ -66,19 +83,19 @@ impl MemoryBatch {
}
} else if actual_batch_size > self.batch_limit {
self.is_full = true;
return Ok(None);
return Ok(MemoryBatchStatus::NotAdded(record));
} else if actual_batch_size == self.batch_limit {
self.is_full = true;
}

self.current_size_uncompressed += record_size;
self.records.push(record);

Ok(Some(current_offset))
Ok(MemoryBatchStatus::Added(current_offset))
}

pub fn is_full(&self) -> bool {
self.is_full || self.estimated_size() > self.batch_limit
self.is_full || self.estimated_size() >= self.batch_limit
}

pub fn elapsed(&self) -> Timestamp {
Expand Down Expand Up @@ -157,13 +174,13 @@ mod test {
Compression::None,
);

assert!(mb.push_record(record).unwrap().is_some());
assert!(mb.push_record(record).unwrap().added());
std::thread::sleep(std::time::Duration::from_millis(100));
let record = Record::from(("key", "value"));
assert!(mb.push_record(record).unwrap().is_some());
assert!(mb.push_record(record).unwrap().added());
std::thread::sleep(std::time::Duration::from_millis(100));
let record = Record::from(("key", "value"));
assert!(mb.push_record(record).unwrap().is_some());
assert!(mb.push_record(record).unwrap().added());

let batch: Batch<MemoryRecords> = mb.into();
assert!(
Expand Down Expand Up @@ -209,13 +226,18 @@ mod test {
let mut offset = 0;

for _ in 0..num_records {
offset = memory_batch
let status = memory_batch
.push_record(Record {
value: RecordData::from(record_data.clone()),
..Default::default()
})
.unwrap()
.expect("Offset should exist");

if let MemoryBatchStatus::Added(o) = status {
offset = o;
} else {
panic!("this should not happen");
}
}

let memory_batch_records_len = memory_batch.records_len();
Expand Down

0 comments on commit c1f6e4e

Please sign in to comment.