Skip to content

Commit

Permalink
chore: use result instead enum for push_record
Browse files Browse the repository at this point in the history
  • Loading branch information
fraidev committed Sep 29, 2024
1 parent d9bc1e2 commit 97c1bc1
Show file tree
Hide file tree
Showing 2 changed files with 69 additions and 104 deletions.
92 changes: 46 additions & 46 deletions crates/fluvio/src/producer/accumulator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ use crate::producer::ProducerError;
use crate::error::Result;

use super::event::EventHandler;
use super::memory_batch::{BatchRecordStatus, BatchSize, MemoryBatch};
use super::memory_batch::{BatchSize, MemoryBatch};

const RECORD_ENQUEUE_TIMEOUT: Duration = Duration::from_secs(30);

Expand Down Expand Up @@ -117,26 +117,20 @@ impl RecordAccumulator {
batches = guard;
}
if let Some(batch) = batches.back_mut() {
// there is a batch not full yet for this partition
// let's reuse it
match batch.push_record(record.clone()) {
PushRecordStatus::Ok(push_record) => {
Ok(push_record) => {
if batch.is_full() {
batch_events.notify_batch_full().await;
}
return Ok(PushRecord::new(
push_record.into_future_record_metadata(partition_id),
));
}
PushRecordStatus::DontFit(batch_size, record) => {
Err(_) => {
// the record is too large for the actual batch we need to create a new batch
batch_events.notify_batch_full().await;
return self
.create_batch_single_record(
batch_size,
record,
&mut batches,
batch_events,
partition_id,
)
.await;
}
}
}
Expand All @@ -146,10 +140,11 @@ impl RecordAccumulator {
"Batch is full. Creating a new batch for partition"
);

// check if the record fits in a new batch
// if not, send the record directly
let mut batch = ProducerBatch::new(self.batch_size, self.compression);

match batch.push_record(record) {
PushRecordStatus::Ok(push_record) => {
Ok(push_record) => {
batch_events.notify_new_batch().await;
if batch.is_full() {
batch_events.notify_batch_full().await;
Expand All @@ -159,7 +154,7 @@ impl RecordAccumulator {
push_record.into_future_record_metadata(partition_id),
))
}
PushRecordStatus::DontFit(size, record) => {
Err((size, record)) => {
// send the single record without batch size limit
self.create_batch_single_record(
size,
Expand All @@ -182,18 +177,17 @@ impl RecordAccumulator {
partition_id: PartitionId,
) -> Result<PushRecord, ProducerError> {
let mut batch = ProducerBatch::new(batch_size, self.compression);
match batch.push_record(record) {
PushRecordStatus::Ok(push_record) => {
match batch.push_record_directly(record) {
Ok(push_record) => {
batch_events.notify_new_batch().await;
batches.push_back(batch);
Ok(PushRecord::new(
push_record.into_future_record_metadata(partition_id),
))
}
PushRecordStatus::DontFit(size, _record) => {
// This should never happen, as we are creating a batch with a single record, with
// the same size as the record
// Only if we add a max_size option to the record (not batch)
Err((size, _record)) => {
// this should never happen, as we are creating a batch with a single record,
// with the same size as the record
Err(ProducerError::RecordTooLarge(size))
}
}
Expand All @@ -215,23 +209,6 @@ where {
}
}

pub enum PushRecordStatus {
Ok(PartialFutureRecordMetadata),
DontFit(BatchSize, Record),
}

impl PushRecordStatus {
#[allow(dead_code)]
pub fn is_ok(&self) -> bool {
matches!(self, Self::Ok(_))
}

#[allow(dead_code)]
pub fn dont_fit(&self) -> bool {
matches!(self, Self::DontFit(_, _))
}
}

pub(crate) struct ProducerBatch {
pub(crate) notify: Sender<ProducePartitionResponseFuture>,
batch_metadata: Arc<BatchMetadata>,
Expand All @@ -253,13 +230,36 @@ impl ProducerBatch {
/// Add a record to the batch.
/// Return ProducerError::BatchFull if record does not fit in the batch, so
/// the RecordAccumulator can create more batches if needed.
fn push_record(&mut self, record: Record) -> PushRecordStatus {
fn push_record(
&mut self,
record: Record,
) -> Result<PartialFutureRecordMetadata, (BatchSize, Record)> {
let mut record = record;
self.add_record_metadata(&mut record);

match self.batch.push_record(record) {
Ok(relative_offset) => Ok(PartialFutureRecordMetadata::new(
relative_offset,
self.batch_metadata.clone(),
)),
Err((size, record)) => Err((size, record)),
}
}

fn add_record_metadata(&mut self, record: &mut Record) {
self.batch.add_record_metadata(record);
}

fn push_record_directly(
&mut self,
record: Record,
) -> Result<PartialFutureRecordMetadata, (BatchSize, Record)> {
match self.batch.push_record(record) {
BatchRecordStatus::NotFull(relative_offset)
| BatchRecordStatus::Full(relative_offset) => PushRecordStatus::Ok(
PartialFutureRecordMetadata::new(relative_offset, self.batch_metadata.clone()),
),
BatchRecordStatus::DontFit(size, record) => PushRecordStatus::DontFit(size, record),
Ok(relative_offset) => Ok(PartialFutureRecordMetadata::new(
relative_offset,
self.batch_metadata.clone(),
)),
Err((size, record)) => Err((size, record)),
}
}

Expand Down Expand Up @@ -402,7 +402,7 @@ mod test {

assert!(!pb.is_full());

assert!(pb.push_record(record).dont_fit());
assert!(pb.push_record(record).is_err());
}

#[test]
Expand All @@ -424,7 +424,7 @@ mod test {

assert!(pb.is_full());

assert!(pb.push_record(record).dont_fit());
assert!(pb.push_record(record).is_err());
}

#[fluvio_future::test]
Expand Down
81 changes: 23 additions & 58 deletions crates/fluvio/src/producer/memory_batch.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,37 +9,6 @@ use fluvio_types::Timestamp;
use super::*;

pub type BatchSize = usize;
pub enum BatchRecordStatus {
DontFit(BatchSize, Record), // Actual record does not fit in the batch
Full(Offset), // Actual record fits in the batch and the batch is full.
NotFull(Offset), // Actual record fits in the batch and the batch is not full.
}

impl BatchRecordStatus {
pub fn fit(&self) -> bool {
match self {
BatchRecordStatus::DontFit(_, _) => false,
BatchRecordStatus::Full(_) => true,
BatchRecordStatus::NotFull(_) => true,
}
}

pub fn is_full(&self) -> bool {
match self {
BatchRecordStatus::DontFit(_, _) => false,
BatchRecordStatus::Full(_) => true,
BatchRecordStatus::NotFull(_) => false,
}
}

pub fn not_full(&self) -> bool {
match self {
BatchRecordStatus::DontFit(_, _) => false,
BatchRecordStatus::Full(_) => false,
BatchRecordStatus::NotFull(_) => true,
}
}
}

pub struct MemoryBatch {
compression: Compression,
Expand Down Expand Up @@ -68,31 +37,33 @@ impl MemoryBatch {

/// Add a record to the batch.
/// The value of `Offset` is relative to the `MemoryBatch` instance.
pub fn push_record(&mut self, mut record: Record) -> BatchRecordStatus {
pub fn push_record(&mut self, record: Record) -> Result<Offset, (BatchSize, Record)> {
let current_offset = self.offset() as i64;
record
.get_mut_header()
.set_offset_delta(current_offset as Offset);

let timestamp_delta = self.elapsed();
record.get_mut_header().set_timestamp_delta(timestamp_delta);

let record_size = record.write_size(0);
let est_size = self.estimated_size() + record_size;

if est_size > self.write_limit {
return BatchRecordStatus::DontFit(est_size, record);
self.is_full = true;
return Err((est_size, record));
}

self.current_size_uncompressed += record_size;
self.records.push(record);

if est_size == self.write_limit {
self.is_full = true;
return BatchRecordStatus::Full(current_offset);
}

BatchRecordStatus::NotFull(current_offset)
Ok(current_offset)
}

pub fn add_record_metadata(&mut self, record: &mut Record) {
let current_offset = self.offset() as i64;
record
.get_mut_header()
.set_offset_delta(current_offset as Offset);
let timestamp_delta = self.elapsed();
record.get_mut_header().set_timestamp_delta(timestamp_delta);
}

pub fn is_full(&self) -> bool {
Expand All @@ -105,7 +76,7 @@ impl MemoryBatch {
std::cmp::max(0, now - self.create_time)
}

fn estimated_size(&self) -> usize {
pub fn estimated_size(&self) -> usize {
self.current_size_uncompressed + Batch::<RawRecords>::default().write_size(0)
}

Expand Down Expand Up @@ -174,13 +145,13 @@ mod test {
Compression::None,
);

assert!(mb.push_record(record).not_full());
assert!(mb.push_record(record).is_ok());
std::thread::sleep(std::time::Duration::from_millis(100));
let record = Record::from(("key", "value"));
assert!(mb.push_record(record).not_full());
assert!(mb.push_record(record).is_ok());
std::thread::sleep(std::time::Duration::from_millis(100));
let record = Record::from(("key", "value"));
assert!(mb.push_record(record).not_full());
assert!(mb.push_record(record).is_ok());

let batch: Batch<MemoryRecords> = mb.into();
assert!(
Expand Down Expand Up @@ -226,18 +197,12 @@ mod test {
let mut offset = 0;

for _ in 0..num_records {
let status = memory_batch.push_record(Record {
value: RecordData::from(record_data.clone()),
..Default::default()
});

if let BatchRecordStatus::Full(o) = status {
offset = o;
} else if let BatchRecordStatus::NotFull(o) = status {
offset = o;
} else {
panic!("Offset should exist");
}
offset = memory_batch
.push_record(Record {
value: RecordData::from(record_data.clone()),
..Default::default()
})
.expect("Offset should exist");
}

let memory_batch_records_len = memory_batch.records_len();
Expand Down

0 comments on commit 97c1bc1

Please sign in to comment.