chore: use result instead enum for push_record
fraidev committed Sep 27, 2024
1 parent 6945ab5 commit f53eeee
Showing 2 changed files with 30 additions and 82 deletions.
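
For orientation, a minimal sketch of the shape of this refactor, using simplified stand-in types rather than the actual Fluvio producer types: the custom status enums are replaced with Result, and the Err variant hands the estimated size and the rejected record back to the caller so the record can be retried in a fresh batch.

```rust
// Simplified, hypothetical sketch; not the real fluvio producer types.
struct Record(Vec<u8>);

struct MiniBatch {
    write_limit: usize,
    used: usize,
    is_full: bool,
    records: Vec<Record>,
}

impl MiniBatch {
    fn new(write_limit: usize) -> Self {
        Self { write_limit, used: 0, is_full: false, records: Vec::new() }
    }

    /// Ok(relative offset) if the record fits; Err((estimated size, record))
    /// if it does not, returning ownership of the record to the caller.
    fn push_record(&mut self, record: Record) -> Result<usize, (usize, Record)> {
        let est_size = self.used + record.0.len();
        if est_size > self.write_limit {
            return Err((est_size, record));
        }
        let offset = self.records.len();
        self.used = est_size;
        self.records.push(record);
        if est_size == self.write_limit {
            // Fullness is a flag queried separately, not a return variant.
            self.is_full = true;
        }
        Ok(offset)
    }

    fn is_full(&self) -> bool {
        self.is_full || self.used >= self.write_limit
    }
}

fn main() {
    let mut batch = MiniBatch::new(8);
    assert!(batch.push_record(Record(vec![0; 4])).is_ok());

    // A record that would overflow the limit is rejected and handed back.
    match batch.push_record(Record(vec![0; 16])) {
        Err((size, record)) => {
            // The caller still owns `record` and can open a batch sized for it.
            let mut fresh = MiniBatch::new(size);
            assert!(fresh.push_record(record).is_ok());
        }
        Ok(_) => unreachable!("record should not fit"),
    }
}
```

With this shape there is no DontFit/Full/NotFull bookkeeping to maintain; the updated tests below simply use is_ok, is_err, and expect.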
50 changes: 18 additions & 32 deletions crates/fluvio/src/producer/accumulator.rs
@@ -27,7 +27,7 @@ use crate::producer::ProducerError;
use crate::error::Result;

use super::event::EventHandler;
use super::memory_batch::{BatchRecordStatus, BatchSize, MemoryBatch};
use super::memory_batch::{BatchSize, MemoryBatch};

const RECORD_ENQUEUE_TIMEOUT: Duration = Duration::from_secs(30);

@@ -118,15 +118,15 @@ impl RecordAccumulator {
}
if let Some(batch) = batches.back_mut() {
match batch.push_record(record.clone()) {
PushRecordStatus::Ok(push_record) => {
Ok(push_record) => {
if batch.is_full() {
batch_events.notify_batch_full().await;
}
return Ok(PushRecord::new(
push_record.into_future_record_metadata(partition_id),
));
}
PushRecordStatus::DontFit(batch_size, record) => {
Err((batch_size, record)) => {
batch_events.notify_batch_full().await;
return self
.create_batch_single_record(
@@ -149,7 +149,7 @@
let mut batch = ProducerBatch::new(self.batch_size, self.compression);

match batch.push_record(record) {
PushRecordStatus::Ok(push_record) => {
Ok(push_record) => {
batch_events.notify_new_batch().await;
if batch.is_full() {
batch_events.notify_batch_full().await;
@@ -159,7 +159,7 @@
push_record.into_future_record_metadata(partition_id),
))
}
PushRecordStatus::DontFit(size, record) => {
Err((size, record)) => {
// send the single record without batch size limit
self.create_batch_single_record(
size,
@@ -183,14 +183,14 @@
) -> Result<PushRecord, ProducerError> {
let mut batch = ProducerBatch::new(batch_size, self.compression);
match batch.push_record(record) {
PushRecordStatus::Ok(push_record) => {
Ok(push_record) => {
batch_events.notify_new_batch().await;
batches.push_back(batch);
Ok(PushRecord::new(
push_record.into_future_record_metadata(partition_id),
))
}
PushRecordStatus::DontFit(size, _record) => {
Err((size, _record)) => {
// This should never happen, as we are creating a batch with a single record, with
// the same size as the record
// Only if we add a max_size option to the record (not batch)
@@ -215,23 +215,6 @@ where {
}
}

pub enum PushRecordStatus {
Ok(PartialFutureRecordMetadata),
DontFit(BatchSize, Record),
}

impl PushRecordStatus {
#[allow(dead_code)]
pub fn is_ok(&self) -> bool {
matches!(self, Self::Ok(_))
}

#[allow(dead_code)]
pub fn dont_fit(&self) -> bool {
matches!(self, Self::DontFit(_, _))
}
}

pub(crate) struct ProducerBatch {
pub(crate) notify: Sender<ProducePartitionResponseFuture>,
batch_metadata: Arc<BatchMetadata>,
@@ -253,13 +236,16 @@ impl ProducerBatch {
/// Add a record to the batch.
/// Return ProducerError::BatchFull if record does not fit in the batch, so
/// the RecordAccumulator can create more batches if needed.
fn push_record(&mut self, record: Record) -> PushRecordStatus {
fn push_record(
&mut self,
record: Record,
) -> Result<PartialFutureRecordMetadata, (BatchSize, Record)> {
match self.batch.push_record(record) {
BatchRecordStatus::NotFull(relative_offset)
| BatchRecordStatus::Full(relative_offset) => PushRecordStatus::Ok(
PartialFutureRecordMetadata::new(relative_offset, self.batch_metadata.clone()),
),
BatchRecordStatus::DontFit(size, record) => PushRecordStatus::DontFit(size, record),
Ok(relative_offset) => Ok(PartialFutureRecordMetadata::new(
relative_offset,
self.batch_metadata.clone(),
)),
Err((size, record)) => Err((size, record)),
}
}

@@ -402,7 +388,7 @@ mod test {

assert!(!pb.is_full());

assert!(pb.push_record(record).dont_fit());
assert!(pb.push_record(record).is_err());
}

#[test]
@@ -424,7 +410,7 @@

assert!(pb.is_full());

assert!(pb.push_record(record).dont_fit());
assert!(pb.push_record(record).is_err());
}

#[fluvio_future::test]
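On the caller side in accumulator.rs, the Err payload is retry data rather than a failure, which is why it is matched explicitly instead of being propagated with the ? operator. A rough sketch of that flow, reusing the hypothetical MiniBatch type from the sketch above (the real RecordAccumulator is async and also notifies batch events, which is omitted here):

```rust
// Hypothetical caller sketch reusing MiniBatch from above; not the real
// RecordAccumulator, which is async and signals batch-full events.
fn append(batches: &mut Vec<MiniBatch>, write_limit: usize, record: Record) -> usize {
    // Try the most recent batch first; on Err we get the record back.
    let record = match batches.last_mut() {
        Some(batch) => match batch.push_record(record) {
            Ok(offset) => return offset,
            Err((_size, record)) => record,
        },
        None => record,
    };

    // Does not fit (or no batch yet): start a new batch with the configured
    // limit, falling back to a record-sized batch for oversized records.
    let mut batch = MiniBatch::new(write_limit);
    let offset = match batch.push_record(record) {
        Ok(offset) => offset,
        Err((size, record)) => {
            // Mirror of the "single record without batch size limit" path.
            batch = MiniBatch::new(size);
            batch
                .push_record(record)
                .expect("a batch sized for the record accepts it")
        }
    };
    batches.push(batch);
    offset
}
```

Returning ownership of the record in the Err variant is what makes the fallback paths possible: the rejected record is simply handed to the next batch.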
62 changes: 12 additions & 50 deletions crates/fluvio/src/producer/memory_batch.rs
@@ -9,37 +9,6 @@ use fluvio_types::Timestamp;
use super::*;

pub type BatchSize = usize;
pub enum BatchRecordStatus {
DontFit(BatchSize, Record), // Actual record does not fit in the batch
Full(Offset), // Actual record fits in the batch and the batch is full.
NotFull(Offset), // Actual record fits in the batch and the batch is not full.
}

impl BatchRecordStatus {
pub fn fit(&self) -> bool {
match self {
BatchRecordStatus::DontFit(_, _) => false,
BatchRecordStatus::Full(_) => true,
BatchRecordStatus::NotFull(_) => true,
}
}

pub fn is_full(&self) -> bool {
match self {
BatchRecordStatus::DontFit(_, _) => false,
BatchRecordStatus::Full(_) => true,
BatchRecordStatus::NotFull(_) => false,
}
}

pub fn not_full(&self) -> bool {
match self {
BatchRecordStatus::DontFit(_, _) => false,
BatchRecordStatus::Full(_) => false,
BatchRecordStatus::NotFull(_) => true,
}
}
}

pub struct MemoryBatch {
compression: Compression,
@@ -68,7 +37,7 @@ impl MemoryBatch {

/// Add a record to the batch.
/// The value of `Offset` is relative to the `MemoryBatch` instance.
pub fn push_record(&mut self, mut record: Record) -> BatchRecordStatus {
pub fn push_record(&mut self, mut record: Record) -> Result<Offset, (BatchSize, Record)> {
let current_offset = self.offset() as i64;
record
.get_mut_header()
@@ -81,18 +50,17 @@
let est_size = self.estimated_size() + record_size;

if est_size > self.write_limit {
return BatchRecordStatus::DontFit(est_size, record);
return Err((est_size, record));
}

self.current_size_uncompressed += record_size;
self.records.push(record);

if est_size == self.write_limit {
self.is_full = true;
return BatchRecordStatus::Full(current_offset);
}

BatchRecordStatus::NotFull(current_offset)
Ok(current_offset)
}

pub fn is_full(&self) -> bool {
@@ -174,13 +142,13 @@ mod test {
Compression::None,
);

assert!(mb.push_record(record).not_full());
assert!(mb.push_record(record).is_ok());
std::thread::sleep(std::time::Duration::from_millis(100));
let record = Record::from(("key", "value"));
assert!(mb.push_record(record).not_full());
assert!(mb.push_record(record).is_ok());
std::thread::sleep(std::time::Duration::from_millis(100));
let record = Record::from(("key", "value"));
assert!(mb.push_record(record).not_full());
assert!(mb.push_record(record).is_ok());

let batch: Batch<MemoryRecords> = mb.into();
assert!(
@@ -226,18 +194,12 @@
let mut offset = 0;

for _ in 0..num_records {
let status = memory_batch.push_record(Record {
value: RecordData::from(record_data.clone()),
..Default::default()
});

if let BatchRecordStatus::Full(o) = status {
offset = o;
} else if let BatchRecordStatus::NotFull(o) = status {
offset = o;
} else {
panic!("Offset should exist");
}
offset = memory_batch
.push_record(Record {
value: RecordData::from(record_data.clone()),
..Default::default()
})
.expect("Offset should exist");
}

let memory_batch_records_len = memory_batch.records_len();
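
With BatchRecordStatus gone, what the memory_batch.rs tests can assert also changes: an exact-fit push still returns Ok (fullness is observed via is_full()), and a push that does not fit returns Err with the record. A small hypothetical check against the MiniBatch sketch above mirrors the updated assertions:

```rust
// Hypothetical test against the MiniBatch sketch above, mirroring the
// assertions in the updated memory_batch.rs tests.
#[test]
fn rejects_record_that_does_not_fit() {
    // Limit sized for exactly two 4-byte records.
    let mut mb = MiniBatch::new(8);

    assert!(mb.push_record(Record(vec![0; 4])).is_ok());
    assert!(!mb.is_full());

    // Exact fit: still Ok, but the batch reports itself as full.
    assert!(mb.push_record(Record(vec![0; 4])).is_ok());
    assert!(mb.is_full());

    // No room left: the push fails and hands the record back.
    assert!(mb.push_record(Record(vec![0; 1])).is_err());
}
```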
