From 0173b0c76b1da2eabe367707b3b626454284b723 Mon Sep 17 00:00:00 2001
From: Jesse Bakker
Date: Tue, 5 Mar 2024 10:01:20 +0100
Subject: [PATCH] Make sink batch duration and size configurable

---
 dozer-core/src/executor/sink_node.rs | 37 +++++++++++++++++++---------
 dozer-core/src/node.rs               |  8 ++++++
 dozer-sink-aerospike/src/lib.rs      | 20 ++++++++++++++-
 dozer-types/src/models/sink.rs       |  2 ++
 json_schemas/dozer.json              | 16 ++++++++++++
 5 files changed, 71 insertions(+), 12 deletions(-)

diff --git a/dozer-core/src/executor/sink_node.rs b/dozer-core/src/executor/sink_node.rs
index 042e769c2a..028975aacd 100644
--- a/dozer-core/src/executor/sink_node.rs
+++ b/dozer-core/src/executor/sink_node.rs
@@ -25,15 +25,14 @@ use crate::{
 use super::execution_dag::ExecutionDag;
 use super::{name::Name, receiver_loop::ReceiverLoop};
 
-// TODO: make configurable
-const SCHEDULE_LOOP_INTERVAL: Duration = Duration::from_millis(5);
-const MAX_FLUSH_INTERVAL: Duration = Duration::from_millis(100);
+const DEFAULT_FLUSH_INTERVAL: Duration = Duration::from_millis(20);
 
 struct FlushScheduler {
     receiver: Receiver<Duration>,
     sender: Sender<()>,
     next_schedule: Option<Duration>,
     next_schedule_from: Instant,
+    loop_interval: Duration,
 }
 
 impl FlushScheduler {
@@ -74,7 +73,7 @@
                 self.next_schedule = None;
             } else {
                 let time_to_next_schedule = schedule - self.next_schedule_from.elapsed();
-                std::thread::sleep(SCHEDULE_LOOP_INTERVAL.min(time_to_next_schedule));
+                std::thread::sleep(self.loop_interval.min(time_to_next_schedule));
             }
         }
     }
@@ -98,8 +97,11 @@ pub struct SinkNode {
     /// The metrics labels.
     labels: LabelsAndProgress,
 
+    max_flush_interval: Duration,
+
+    ops_since_flush: u64,
     last_op_if_commit: Option<Epoch>,
-    flush_on_next_commit: bool,
+    flush_scheduled_on_next_commit: bool,
 
     flush_scheduler_sender: Sender<Duration>,
     should_flush_receiver: Receiver<()>,
@@ -131,6 +133,9 @@ impl SinkNode {
             "The pipeline processing latency in seconds"
         );
 
+        let max_flush_interval = sink
+            .max_batch_duration_ms()
+            .map_or(DEFAULT_FLUSH_INTERVAL, Duration::from_millis);
         let (schedule_sender, schedule_receiver) = crossbeam::channel::bounded(10);
         let (should_flush_sender, should_flush_receiver) = crossbeam::channel::bounded(0);
         let mut scheduler = FlushScheduler {
@@ -138,6 +143,7 @@
             sender: should_flush_sender,
             next_schedule: None,
             next_schedule_from: Instant::now(),
+            loop_interval: max_flush_interval / 5,
         };
 
         std::thread::spawn(move || scheduler.run());
@@ -151,10 +157,12 @@
             error_manager: dag.error_manager().clone(),
             labels: dag.labels().clone(),
             last_op_if_commit: None,
-            flush_on_next_commit: false,
+            flush_scheduled_on_next_commit: false,
             flush_scheduler_sender: schedule_sender,
             should_flush_receiver,
             event_sender: dag.event_hub().sender.clone(),
+            max_flush_interval,
+            ops_since_flush: 0,
         }
     }
 
@@ -166,8 +174,9 @@
         if let Err(e) = self.sink.flush_batch() {
             self.error_manager.report(e);
         }
+        self.ops_since_flush = 0;
         self.flush_scheduler_sender
-            .send(MAX_FLUSH_INTERVAL)
+            .send(self.max_flush_interval)
             .unwrap();
         let _ = self.event_sender.send(Event::SinkFlushed {
             node: self.node_handle.clone(),
@@ -211,7 +220,7 @@ impl ReceiverLoop for SinkNode {
         let mut epoch_id = initial_epoch_id;
 
         self.flush_scheduler_sender
-            .send(MAX_FLUSH_INTERVAL)
+            .send(self.max_flush_interval)
             .unwrap();
         let mut sel = init_select(&receivers);
         loop {
@@ -219,7 +228,7 @@
             if let Some(epoch) = self.last_op_if_commit.take() {
                 self.flush(epoch)?;
             } else {
-                self.flush_on_next_commit = true;
+                self.flush_scheduled_on_next_commit = true;
             }
         }
         let index = sel.ready();
@@ -294,6 +303,7 @@
             Operation::BatchInsert { new } => new.len() as u64,
             _ => 1,
         };
+        self.ops_since_flush += counter_number;
 
         if let Err(e) = self.sink.process(op) {
             self.error_manager.report(e);
@@ -316,9 +326,14 @@
             gauge!(PIPELINE_LATENCY_GAUGE_NAME, duration.as_secs_f64(), labels);
         }
 
-        if self.flush_on_next_commit {
+        if self
+            .sink
+            .preferred_batch_size()
+            .is_some_and(|batch_size| self.ops_since_flush >= batch_size)
+            || self.flush_scheduled_on_next_commit
+        {
             self.flush(epoch)?;
-            self.flush_on_next_commit = false;
+            self.flush_scheduled_on_next_commit = false;
         }
 
         Ok(())
diff --git a/dozer-core/src/node.rs b/dozer-core/src/node.rs
index 0a57f74d93..0574f9cb11 100644
--- a/dozer-core/src/node.rs
+++ b/dozer-core/src/node.rs
@@ -125,6 +125,14 @@ pub trait Sink: Send + Sync + Debug {
     fn get_source_state(&mut self) -> Result<Option<Vec<u8>>, BoxedError>;
     fn get_latest_op_id(&mut self) -> Result<Option<OpIdentifier>, BoxedError>;
 
+    fn preferred_batch_size(&self) -> Option<u64> {
+        None
+    }
+
+    fn max_batch_duration_ms(&self) -> Option<u64> {
+        None
+    }
+
     /// If the Sink batches operations, flush the batch to the store when this method is called.
     /// This method is guaranteed to only be called on commit boundaries
     fn flush_batch(&mut self) -> Result<(), BoxedError> {
diff --git a/dozer-sink-aerospike/src/lib.rs b/dozer-sink-aerospike/src/lib.rs
index 0ac4262f55..1e8cbc37c6 100644
--- a/dozer-sink-aerospike/src/lib.rs
+++ b/dozer-sink-aerospike/src/lib.rs
@@ -397,6 +397,7 @@ impl SinkFactory for AerospikeSinkFactory {
             });
         }
         Ok(Box::new(AerospikeSink::new(
+            self.config.clone(),
             client,
             tables,
             n_threads.into(),
@@ -448,6 +449,7 @@ impl Drop for AsRecord<'_> {
 struct AerospikeSink {
     sender: Sender<TableOperation>,
     snapshotting_started_instant: HashMap<String, Instant>,
+    config: AerospikeSinkConfig,
 }
 
 #[derive(Debug)]
@@ -516,7 +518,12 @@ struct AerospikeTable {
 }
 
 impl AerospikeSink {
-    fn new(client: Client, tables: Vec<AerospikeTable>, n_threads: usize) -> Self {
+    fn new(
+        config: AerospikeSinkConfig,
+        client: Client,
+        tables: Vec<AerospikeTable>,
+        n_threads: usize,
+    ) -> Self {
         let client = Arc::new(client);
         let mut workers = Vec::with_capacity(n_threads);
         let (sender, receiver) = bounded(n_threads);
@@ -532,6 +539,7 @@ impl AerospikeSink {
         }
 
         Self {
+            config,
             sender,
             snapshotting_started_instant: Default::default(),
         }
@@ -1253,6 +1261,14 @@ impl Sink for AerospikeSink {
     fn get_latest_op_id(&mut self) -> Result<Option<OpIdentifier>, BoxedError> {
         Ok(None)
     }
+
+    fn max_batch_duration_ms(&self) -> Option<u64> {
+        self.config.max_batch_duration_ms
+    }
+
+    fn preferred_batch_size(&self) -> Option<u64> {
+        self.config.preferred_batch_size
+    }
 }
 
 #[cfg(test)]
@@ -1364,6 +1380,8 @@ mod tests {
                 set_name: set.to_owned(),
                 denormalize: vec![],
             }],
+            max_batch_duration_ms: None,
+            preferred_batch_size: None,
         },
     );
     factory
diff --git a/dozer-types/src/models/sink.rs b/dozer-types/src/models/sink.rs
index ec81477552..766d870db8 100644
--- a/dozer-types/src/models/sink.rs
+++ b/dozer-types/src/models/sink.rs
@@ -150,6 +150,8 @@ pub struct AerospikeSinkConfig {
     pub n_threads: Option<NonZeroUsize>,
     #[serde(default, skip_serializing_if = "Vec::is_empty")]
     pub tables: Vec<AerospikeSinkTable>,
+    pub max_batch_duration_ms: Option<u64>,
+    pub preferred_batch_size: Option<u64>,
 }
 
 #[derive(Debug, Serialize, Deserialize, JsonSchema, Eq, PartialEq, Clone)]
diff --git a/json_schemas/dozer.json b/json_schemas/dozer.json
index 21d9f812aa..a6a8ee2d93 100644
--- a/json_schemas/dozer.json
+++ b/json_schemas/dozer.json
@@ -194,6 +194,14 @@
         "connection": {
"type": "string" }, + "max_batch_duration_ms": { + "type": [ + "integer", + "null" + ], + "format": "uint64", + "minimum": 0.0 + }, "n_threads": { "type": [ "integer", @@ -202,6 +210,14 @@ "format": "uint", "minimum": 1.0 }, + "preferred_batch_size": { + "type": [ + "integer", + "null" + ], + "format": "uint64", + "minimum": 0.0 + }, "tables": { "type": "array", "items": {