Skip to content

Commit

Permalink
feat: Only confirm ESP when sink is flushed (#2436)
Browse files Browse the repository at this point in the history
  • Loading branch information
chubei authored Mar 1, 2024
1 parent f384c4f commit 0f2977e
Show file tree
Hide file tree
Showing 4 changed files with 195 additions and 97 deletions.
25 changes: 12 additions & 13 deletions dozer-core/src/executor/sink_node.rs
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,7 @@ pub struct SinkNode {
/// The metrics labels.
labels: LabelsAndProgress,

last_op_was_commit: bool,
last_op_if_commit: Option<Epoch>,
flush_on_next_commit: bool,
flush_scheduler_sender: Sender<Duration>,
should_flush_receiver: Receiver<()>,
Expand Down Expand Up @@ -150,7 +150,7 @@ impl SinkNode {
sink,
error_manager: dag.error_manager().clone(),
labels: dag.labels().clone(),
last_op_was_commit: false,
last_op_if_commit: None,
flush_on_next_commit: false,
flush_scheduler_sender: schedule_sender,
should_flush_receiver,
Expand All @@ -162,13 +162,17 @@ impl SinkNode {
&self.node_handle
}

fn flush(&mut self) -> Result<(), ExecutionError> {
fn flush(&mut self, epoch: Epoch) -> Result<(), ExecutionError> {
if let Err(e) = self.sink.flush_batch() {
self.error_manager.report(e);
}
self.flush_scheduler_sender
.send(MAX_FLUSH_INTERVAL)
.unwrap();
let _ = self.event_sender.send(Event::SinkFlushed {
node: self.node_handle.clone(),
epoch,
});
Ok(())
}
}
Expand Down Expand Up @@ -212,8 +216,8 @@ impl ReceiverLoop for SinkNode {
let mut sel = init_select(&receivers);
loop {
if self.should_flush_receiver.try_recv().is_ok() {
if self.last_op_was_commit {
self.flush()?;
if let Some(epoch) = self.last_op_if_commit.take() {
self.flush(epoch)?;
} else {
self.flush_on_next_commit = true;
}
Expand Down Expand Up @@ -267,7 +271,7 @@ impl ReceiverLoop for SinkNode {
}

fn on_op(&mut self, _index: usize, op: TableOperation) -> Result<(), ExecutionError> {
self.last_op_was_commit = false;
self.last_op_if_commit = None;
let mut labels = self.labels.labels().clone();
labels.push("table", self.node_handle.id.clone());
const OPERATION_TYPE_LABEL: &str = "operation_type";
Expand Down Expand Up @@ -304,7 +308,7 @@ impl ReceiverLoop for SinkNode {
if let Err(e) = self.sink.commit(&epoch) {
self.error_manager.report(e);
}
self.last_op_was_commit = true;
self.last_op_if_commit = Some(epoch.clone());

if let Ok(duration) = epoch.decision_instant.elapsed() {
let mut labels = self.labels.labels().clone();
Expand All @@ -313,15 +317,10 @@ impl ReceiverLoop for SinkNode {
}

if self.flush_on_next_commit {
self.flush()?;
self.flush(epoch)?;
self.flush_on_next_commit = false;
}

let _ = self.event_sender.send(Event::SinkCommitted {
node: self.node_handle.clone(),
epoch,
});

Ok(())
}

Expand Down
Loading

0 comments on commit 0f2977e

Please sign in to comment.