Make sink batch duration and size configurable
Jesse-Bakker committed Mar 5, 2024
1 parent 45c6b61 commit 0173b0c
Showing 5 changed files with 71 additions and 12 deletions.
37 changes: 26 additions & 11 deletions dozer-core/src/executor/sink_node.rs
@@ -25,15 +25,14 @@ use crate::{
use super::execution_dag::ExecutionDag;
use super::{name::Name, receiver_loop::ReceiverLoop};

// TODO: make configurable
const SCHEDULE_LOOP_INTERVAL: Duration = Duration::from_millis(5);
const MAX_FLUSH_INTERVAL: Duration = Duration::from_millis(100);
const DEFAULT_FLUSH_INTERVAL: Duration = Duration::from_millis(20);

struct FlushScheduler {
receiver: Receiver<Duration>,
sender: Sender<()>,
next_schedule: Option<Duration>,
next_schedule_from: Instant,
loop_interval: Duration,
}

impl FlushScheduler {
@@ -74,7 +73,7 @@ impl FlushScheduler {
self.next_schedule = None;
} else {
let time_to_next_schedule = schedule - self.next_schedule_from.elapsed();
std::thread::sleep(SCHEDULE_LOOP_INTERVAL.min(time_to_next_schedule));
std::thread::sleep(self.loop_interval.min(time_to_next_schedule));
}
}
}
@@ -98,8 +97,11 @@ pub struct SinkNode {
/// The metrics labels.
labels: LabelsAndProgress,

max_flush_interval: Duration,

ops_since_flush: u64,
last_op_if_commit: Option<Epoch>,
flush_on_next_commit: bool,
flush_scheduled_on_next_commit: bool,
flush_scheduler_sender: Sender<Duration>,
should_flush_receiver: Receiver<()>,

@@ -131,13 +133,17 @@ impl SinkNode {
"The pipeline processing latency in seconds"
);

let max_flush_interval = sink
.max_batch_duration_ms()
.map_or(DEFAULT_FLUSH_INTERVAL, Duration::from_millis);
let (schedule_sender, schedule_receiver) = crossbeam::channel::bounded(10);
let (should_flush_sender, should_flush_receiver) = crossbeam::channel::bounded(0);
let mut scheduler = FlushScheduler {
receiver: schedule_receiver,
sender: should_flush_sender,
next_schedule: None,
next_schedule_from: Instant::now(),
loop_interval: max_flush_interval / 5,
};

std::thread::spawn(move || scheduler.run());
@@ -151,10 +157,12 @@ impl SinkNode {
error_manager: dag.error_manager().clone(),
labels: dag.labels().clone(),
last_op_if_commit: None,
flush_on_next_commit: false,
flush_scheduled_on_next_commit: false,
flush_scheduler_sender: schedule_sender,
should_flush_receiver,
event_sender: dag.event_hub().sender.clone(),
max_flush_interval,
ops_since_flush: 0,
}
}

@@ -166,8 +174,9 @@ impl SinkNode {
if let Err(e) = self.sink.flush_batch() {
self.error_manager.report(e);
}
self.ops_since_flush = 0;
self.flush_scheduler_sender
.send(MAX_FLUSH_INTERVAL)
.send(self.max_flush_interval)
.unwrap();
let _ = self.event_sender.send(Event::SinkFlushed {
node: self.node_handle.clone(),
@@ -211,15 +220,15 @@ impl ReceiverLoop for SinkNode {
let mut epoch_id = initial_epoch_id;

self.flush_scheduler_sender
.send(MAX_FLUSH_INTERVAL)
.send(self.max_flush_interval)
.unwrap();
let mut sel = init_select(&receivers);
loop {
if self.should_flush_receiver.try_recv().is_ok() {
if let Some(epoch) = self.last_op_if_commit.take() {
self.flush(epoch)?;
} else {
self.flush_on_next_commit = true;
self.flush_scheduled_on_next_commit = true;
}
}
let index = sel.ready();
@@ -294,6 +303,7 @@ impl ReceiverLoop for SinkNode {
Operation::BatchInsert { new } => new.len() as u64,
_ => 1,
};
self.ops_since_flush += counter_number;

if let Err(e) = self.sink.process(op) {
self.error_manager.report(e);
@@ -316,9 +326,14 @@ impl ReceiverLoop for SinkNode {
gauge!(PIPELINE_LATENCY_GAUGE_NAME, duration.as_secs_f64(), labels);
}

if self.flush_on_next_commit {
if self
.sink
.preferred_batch_size()
.is_some_and(|batch_size| self.ops_since_flush >= batch_size)
|| self.flush_scheduled_on_next_commit
{
self.flush(epoch)?;
self.flush_on_next_commit = false;
self.flush_scheduled_on_next_commit = false;
}

Ok(())
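Note on the flushing policy above: a batch is now flushed at a commit boundary when either the sink's preferred_batch_size has been reached (tracked via ops_since_flush) or the FlushScheduler fires after max_flush_interval, which comes from the sink's max_batch_duration_ms and falls back to the 20 ms default. The scheduler also polls at max_flush_interval / 5 rather than a fixed 5 ms, so the polling granularity scales with the configured interval. Below is a minimal sketch of the combined trigger; the Sink trait here is a simplified stand-in, and an elapsed-time check approximates the scheduler thread used in the real code.

use std::time::{Duration, Instant};

// Simplified stand-in for the dozer-core Sink trait; only the two new
// hooks are modeled here.
trait Sink {
    fn preferred_batch_size(&self) -> Option<u64> {
        None
    }
    fn max_batch_duration_ms(&self) -> Option<u64> {
        None
    }
}

const DEFAULT_FLUSH_INTERVAL: Duration = Duration::from_millis(20);

struct FlushPolicy<S: Sink> {
    sink: S,
    ops_since_flush: u64,
    last_flush: Instant,
}

impl<S: Sink> FlushPolicy<S> {
    // Evaluated at commit boundaries, mirroring the logic in sink_node.rs.
    fn should_flush(&self) -> bool {
        let max_interval = self
            .sink
            .max_batch_duration_ms()
            .map_or(DEFAULT_FLUSH_INTERVAL, Duration::from_millis);
        // Size trigger: the sink asked for a preferred batch size and we hit it.
        let size_hit = self
            .sink
            .preferred_batch_size()
            .is_some_and(|n| self.ops_since_flush >= n);
        // Time trigger: in the real code a scheduler thread signals this over
        // a channel; an elapsed-time check stands in for it here.
        let time_hit = self.last_flush.elapsed() >= max_interval;
        size_hit || time_hit
    }
}

fn main() {
    struct Demo;
    impl Sink for Demo {
        fn preferred_batch_size(&self) -> Option<u64> {
            Some(1_000)
        }
    }
    let policy = FlushPolicy {
        sink: Demo,
        ops_since_flush: 1_000,
        last_flush: Instant::now(),
    };
    assert!(policy.should_flush()); // size trigger reached
}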
8 changes: 8 additions & 0 deletions dozer-core/src/node.rs
@@ -125,6 +125,14 @@ pub trait Sink: Send + Sync + Debug {
fn get_source_state(&mut self) -> Result<Option<Vec<u8>>, BoxedError>;
fn get_latest_op_id(&mut self) -> Result<Option<OpIdentifier>, BoxedError>;

fn preferred_batch_size(&self) -> Option<u64> {
None
}

fn max_batch_duration_ms(&self) -> Option<u64> {
None
}

/// If the Sink batches operations, flush the batch to the store when this method is called.
/// This method is guaranteed to only be called on commit boundaries
fn flush_batch(&mut self) -> Result<(), BoxedError> {
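Both new hooks default to None, so every existing Sink implementation compiles unchanged and sinks opt in individually. A hedged sketch of an implementor overriding them; BulkSink and its values are hypothetical, and the trait below is a trimmed stand-in (the real trait has further required methods such as process and flush_batch):

// Trimmed stand-in for dozer-core's Sink trait showing only the new hooks.
trait BatchHints {
    fn preferred_batch_size(&self) -> Option<u64> {
        None
    }
    fn max_batch_duration_ms(&self) -> Option<u64> {
        None
    }
}

// Hypothetical sink that prefers large, infrequent batches.
struct BulkSink;

impl BatchHints for BulkSink {
    fn preferred_batch_size(&self) -> Option<u64> {
        Some(10_000) // flush once 10k operations have accumulated...
    }
    fn max_batch_duration_ms(&self) -> Option<u64> {
        Some(250) // ...or at latest 250 ms after the previous flush
    }
}

fn main() {
    let sink = BulkSink;
    assert_eq!(sink.preferred_batch_size(), Some(10_000));
    assert_eq!(sink.max_batch_duration_ms(), Some(250));
}

A sink that leaves both hooks at None gets no size trigger and the default 20 ms time-based flushing only.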
20 changes: 19 additions & 1 deletion dozer-sink-aerospike/src/lib.rs
@@ -397,6 +397,7 @@ impl SinkFactory for AerospikeSinkFactory {
});
}
Ok(Box::new(AerospikeSink::new(
self.config.clone(),
client,
tables,
n_threads.into(),
@@ -448,6 +449,7 @@ impl Drop for AsRecord<'_> {
struct AerospikeSink {
sender: Sender<TableOperation>,
snapshotting_started_instant: HashMap<String, Instant>,
config: AerospikeSinkConfig,
}

#[derive(Debug)]
@@ -516,7 +518,12 @@ struct AerospikeTable {
}

impl AerospikeSink {
fn new(client: Client, tables: Vec<AerospikeTable>, n_threads: usize) -> Self {
fn new(
config: AerospikeSinkConfig,
client: Client,
tables: Vec<AerospikeTable>,
n_threads: usize,
) -> Self {
let client = Arc::new(client);
let mut workers = Vec::with_capacity(n_threads);
let (sender, receiver) = bounded(n_threads);
@@ -532,6 +539,7 @@ impl AerospikeSink {
}

Self {
config,
sender,
snapshotting_started_instant: Default::default(),
}
@@ -1253,6 +1261,14 @@ impl Sink for AerospikeSink {
fn get_latest_op_id(&mut self) -> Result<Option<OpIdentifier>, BoxedError> {
Ok(None)
}

fn max_batch_duration_ms(&self) -> Option<u64> {
self.config.max_batch_duration_ms
}

fn preferred_batch_size(&self) -> Option<u64> {
self.config.preferred_batch_size
}
}

#[cfg(test)]
@@ -1364,6 +1380,8 @@ mod tests {
set_name: set.to_owned(),
denormalize: vec![],
}],
max_batch_duration_ms: None,
preferred_batch_size: None,
},
);
factory
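The Aerospike sink itself adds no batching logic: the factory passes the full AerospikeSinkConfig down, the sink stores it, and the two trait methods simply forward the config fields. A sketch of that forwarding pattern with a trimmed config struct (the real AerospikeSinkConfig also carries connection, n_threads, and tables):

// Trimmed mirror of AerospikeSinkConfig; only the new fields are modeled.
#[derive(Debug, Clone, Default)]
struct SinkBatchConfig {
    max_batch_duration_ms: Option<u64>,
    preferred_batch_size: Option<u64>,
}

struct AerospikeLikeSink {
    config: SinkBatchConfig,
}

impl AerospikeLikeSink {
    // Mirrors the two one-line trait methods added to AerospikeSink above.
    fn max_batch_duration_ms(&self) -> Option<u64> {
        self.config.max_batch_duration_ms
    }
    fn preferred_batch_size(&self) -> Option<u64> {
        self.config.preferred_batch_size
    }
}

fn main() {
    let sink = AerospikeLikeSink {
        config: SinkBatchConfig {
            max_batch_duration_ms: Some(100),
            preferred_batch_size: Some(50_000),
        },
    };
    assert_eq!(sink.max_batch_duration_ms(), Some(100));
    assert_eq!(sink.preferred_batch_size(), Some(50_000));
}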
2 changes: 2 additions & 0 deletions dozer-types/src/models/sink.rs
@@ -150,6 +150,8 @@ pub struct AerospikeSinkConfig {
pub n_threads: Option<NonZeroUsize>,
#[serde(default, skip_serializing_if = "Vec::is_empty")]
pub tables: Vec<AerospikeSinkTable>,
pub max_batch_duration_ms: Option<u64>,
pub preferred_batch_size: Option<u64>,
}

#[derive(Debug, Serialize, Deserialize, JsonSchema, Eq, PartialEq, Clone)]
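Because both additions are Option fields, serde's derived Deserialize fills them with None when the keys are absent, so existing configs keep parsing without the new settings. A quick check against a trimmed mirror struct (assumes serde and serde_json are available; the real struct lives in dozer-types/src/models/sink.rs):

use serde::Deserialize;

// Trimmed mirror of AerospikeSinkConfig with only the new fields.
#[derive(Debug, Deserialize, PartialEq)]
struct BatchOpts {
    max_batch_duration_ms: Option<u64>,
    preferred_batch_size: Option<u64>,
}

fn main() {
    // Old configs without the new keys still deserialize: serde fills
    // missing Option fields with None.
    let old: BatchOpts = serde_json::from_str("{}").unwrap();
    assert_eq!(
        old,
        BatchOpts {
            max_batch_duration_ms: None,
            preferred_batch_size: None,
        }
    );

    // New configs can set either or both knobs.
    let new: BatchOpts = serde_json::from_str(
        r#"{"max_batch_duration_ms": 100, "preferred_batch_size": 50000}"#,
    )
    .unwrap();
    assert_eq!(new.preferred_batch_size, Some(50_000));
}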
16 changes: 16 additions & 0 deletions json_schemas/dozer.json
@@ -194,6 +194,14 @@
"connection": {
"type": "string"
},
"max_batch_duration_ms": {
"type": [
"integer",
"null"
],
"format": "uint64",
"minimum": 0.0
},
"n_threads": {
"type": [
"integer",
"null"
],
@@ -202,6 +210,14 @@
"format": "uint",
"minimum": 1.0
},
"preferred_batch_size": {
"type": [
"integer",
"null"
],
"format": "uint64",
"minimum": 0.0
},
"tables": {
"type": "array",
"items": {
Expand Down
