Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Allow for non-opid-emitting sources to be restartable. #2474

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
34 changes: 26 additions & 8 deletions dozer-cli/src/pipeline/connector_source.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
use dozer_core::event::EventHub;
use dozer_core::node::{OutputPortDef, OutputPortType, PortHandle, Source, SourceFactory};
use dozer_core::node::{
OutputPortDef, OutputPortType, PortHandle, Source, SourceFactory, SourceMessage,
};
use dozer_core::shutdown::ShutdownReceiver;
use dozer_ingestion::{
get_connector, CdcType, Connector, IngestionIterator, TableIdentifier, TableInfo,
Expand All @@ -13,7 +15,7 @@ use dozer_tracing::{emit_event, DozerMonitorContext};
use dozer_types::errors::internal::BoxedError;
use dozer_types::models::connection::Connection;
use dozer_types::models::ingestion_types::IngestionMessage;
use dozer_types::node::OpIdentifier;
use dozer_types::node::SourceState;
use dozer_types::thiserror::{self, Error};
use dozer_types::tracing::info;
use dozer_types::types::{Operation, Schema, SourceDefinition};
Expand Down Expand Up @@ -225,8 +227,8 @@ impl Source for ConnectorSource {

async fn start(
&mut self,
sender: Sender<(PortHandle, IngestionMessage)>,
last_checkpoint: Option<OpIdentifier>,
sender: Sender<SourceMessage>,
last_checkpoint: SourceState,
) -> Result<(), BoxedError> {
let (ingestor, iterator) = Ingestor::initialize_channel(self.ingestion_config.clone());
let connection_name = self.connection_name.clone();
Expand Down Expand Up @@ -305,7 +307,7 @@ impl Source for ConnectorSource {

async fn forward_message_to_pipeline(
mut iterator: IngestionIterator,
sender: Sender<(PortHandle, IngestionMessage)>,
sender: Sender<SourceMessage>,
connection_name: String,
tables: Vec<TableInfo>,
ports: Vec<PortHandle>,
Expand All @@ -325,7 +327,7 @@ async fn forward_message_to_pipeline(
.init();

let mut counter = vec![(0u64, 0u64); tables.len()];
while let Some(message) = iterator.receiver.recv().await {
while let Some((idx, message)) = iterator.receiver.recv().await {
match &message {
IngestionMessage::OperationEvent {
table_index, op, ..
Expand Down Expand Up @@ -374,13 +376,29 @@ async fn forward_message_to_pipeline(
}

// Send message to the pipeline
if sender.send((port, message)).await.is_err() {
if sender
.send(SourceMessage {
id: idx,
port,
message,
})
.await
.is_err()
{
break;
}
}
IngestionMessage::TransactionInfo(_) => {
// For transaction level messages, we can send to any port.
if sender.send((ports[0], message)).await.is_err() {
if sender
.send(SourceMessage {
id: idx,
port: ports[0],
message,
})
.await
.is_err()
{
break;
}
}
Expand Down
10 changes: 5 additions & 5 deletions dozer-cli/src/pipeline/dummy_sink.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,14 +6,14 @@ use dozer_core::{
node::{PortHandle, Sink, SinkFactory},
DEFAULT_PORT_HANDLE,
};
use dozer_types::log::debug;
use dozer_types::{
chrono::Local,
errors::internal::BoxedError,
log::{info, warn},
node::OpIdentifier,
types::{FieldType, Operation, Schema, TableOperation},
};
use dozer_types::{log::debug, node::SourceState};

use crate::async_trait::async_trait;

Expand Down Expand Up @@ -179,15 +179,15 @@ impl Sink for DummySink {
Ok(())
}

fn set_source_state(&mut self, _source_state: &[u8]) -> Result<(), BoxedError> {
fn set_source_state_data(&mut self, _source_state: &[u8]) -> Result<(), BoxedError> {
Ok(())
}

fn get_source_state(&mut self) -> Result<Option<Vec<u8>>, BoxedError> {
fn get_source_state_data(&mut self) -> Result<Option<Vec<u8>>, BoxedError> {
Ok(None)
}

fn get_latest_op_id(&mut self) -> Result<Option<OpIdentifier>, BoxedError> {
Ok(None)
fn get_source_state(&mut self) -> Result<SourceState, BoxedError> {
Ok(SourceState::NotStarted)
}
}
50 changes: 28 additions & 22 deletions dozer-core/src/builder_dag.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ use std::{
use daggy::{petgraph::visit::IntoNodeIdentifiers, NodeIndex};
use dozer_types::{
log::warn,
node::{NodeHandle, OpIdentifier},
node::{NodeHandle, SourceState},
};

use crate::{
Expand All @@ -31,7 +31,7 @@ pub struct NodeType {
pub enum NodeKind {
Source {
source: Box<dyn Source>,
last_checkpoint: Option<OpIdentifier>,
last_checkpoint: SourceState,
},
Processor(Box<dyn Processor>),
Sink(Box<dyn Sink>),
Expand Down Expand Up @@ -76,8 +76,8 @@ impl BuilderDag {
// Build the sinks and load checkpoint.
let event_hub = EventHub::new(event_hub_capacity);
let mut graph = daggy::Dag::new();
let mut source_states = HashMap::new();
let mut source_op_ids = HashMap::new();
let mut source_state_data = HashMap::new();
let mut source_states: HashMap<NodeHandle, SourceState> = HashMap::new();
let mut source_id_to_sinks = HashMap::<NodeHandle, Vec<NodeIndex>>::new();
let mut node_index_map: HashMap<NodeIndex, NodeIndex> = HashMap::new();
for (node_index, node) in nodes.iter_mut().enumerate() {
Expand All @@ -99,9 +99,9 @@ impl BuilderDag {
.await
.map_err(ExecutionError::Factory)?;

let state = sink.get_source_state().map_err(ExecutionError::Sink)?;
let state = sink.get_source_state_data().map_err(ExecutionError::Sink)?;
if let Some(state) = state {
match source_states.entry(source.clone()) {
match source_state_data.entry(source.clone()) {
Entry::Occupied(entry) => {
if entry.get() != &state {
return Err(ExecutionError::SourceStateConflict(source));
Expand All @@ -113,16 +113,18 @@ impl BuilderDag {
}
}

let op_id = sink.get_latest_op_id().map_err(ExecutionError::Sink)?;
if let Some(op_id) = op_id {
match source_op_ids.entry(source.clone()) {
Entry::Occupied(mut entry) => {
*entry.get_mut() = op_id.min(*entry.get());
}
Entry::Vacant(entry) => {
entry.insert(op_id);
let resume_state = sink.get_source_state().map_err(ExecutionError::Sink)?;
match source_states.entry(source.clone()) {
Entry::Occupied(mut entry) => {
if let Some(merged) = entry.get().clone().merge(resume_state) {
*entry.get_mut() = merged;
} else {
return Err(ExecutionError::ResumeStateConflict(source));
}
}
Entry::Vacant(entry) => {
entry.insert(resume_state);
}
}

let new_node_index = graph.add_node(NodeType {
Expand Down Expand Up @@ -151,7 +153,7 @@ impl BuilderDag {
.remove(&node_index)
.expect("we collected all output schemas"),
event_hub.clone(),
source_states.remove(&node.handle),
source_state_data.remove(&node.handle),
)
.map_err(ExecutionError::Factory)?;

Expand All @@ -163,23 +165,27 @@ impl BuilderDag {
let mut checkpoint = None;
for sink in source_id_to_sinks.remove(&node.handle).unwrap_or_default() {
let sink = &mut graph[sink];
let sink_handle = &sink.handle;
let NodeKind::Sink(sink) = &mut sink.kind else {
unreachable!()
};
sink.set_source_state(&state)
sink.set_source_state_data(&state)
.map_err(ExecutionError::Sink)?;
if let Some(sink_checkpoint) = source_op_ids.remove(sink_handle) {
checkpoint =
Some(checkpoint.unwrap_or(sink_checkpoint).min(sink_checkpoint));
}
let resume_state = source_states.remove(&node.handle);
checkpoint =
match (checkpoint, resume_state) {
(None, new) => new,
(old, None) => old,
(Some(old), Some(new)) => Some(old.merge(new).ok_or(
ExecutionError::ResumeStateConflict(node.handle.clone()),
)?),
};
}

NodeType {
handle: node.handle,
kind: NodeKind::Source {
source,
last_checkpoint: checkpoint,
last_checkpoint: checkpoint.unwrap_or(SourceState::NotStarted),
},
}
}
Expand Down
2 changes: 2 additions & 0 deletions dozer-core/src/errors.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,8 @@ pub enum ExecutionError {
Sink(#[source] BoxedError),
#[error("State of {0} is not consistent across sinks")]
SourceStateConflict(NodeHandle),
#[error("Resume state of {0} is not consistent across sinks")]
ResumeStateConflict(NodeHandle),
#[error("File system error {0:?}: {1}")]
FileSystemError(PathBuf, #[source] std::io::Error),
#[error("Checkpoint writer thread panicked")]
Expand Down
35 changes: 18 additions & 17 deletions dozer-core/src/executor/source_node/mod.rs
Original file line number Diff line number Diff line change
@@ -1,23 +1,18 @@
use std::{fmt::Debug, future::Future, pin::pin, sync::Arc, time::SystemTime};

use daggy::petgraph::visit::IntoNodeIdentifiers;
use dozer_types::{
log::debug, models::ingestion_types::TransactionInfo, node::OpIdentifier, types::TableOperation,
};
use dozer_types::{log::debug, models::ingestion_types::TransactionInfo, types::TableOperation};
use dozer_types::{models::ingestion_types::IngestionMessage, node::SourceState};
use futures::{future::Either, StreamExt};
use tokio::{
runtime::Runtime,
sync::mpsc::{channel, Receiver, Sender},
};

use crate::node::SourceMessage;
use crate::{
builder_dag::NodeKind,
epoch::Epoch,
errors::ExecutionError,
executor_operation::ExecutorOperation,
forwarder::ChannelManager,
node::{PortHandle, Source},
builder_dag::NodeKind, epoch::Epoch, errors::ExecutionError,
executor_operation::ExecutorOperation, forwarder::ChannelManager, node::Source,
};

use super::{execution_dag::ExecutionDag, node::Node, ExecutorOptions};
Expand All @@ -30,7 +25,7 @@ pub struct SourceNode<F> {
/// Structs for running a source.
source_runners: Vec<SourceRunner>,
/// Receivers from sources.
receivers: Vec<Receiver<(PortHandle, IngestionMessage)>>,
receivers: Vec<Receiver<SourceMessage>>,
/// The current epoch id.
epoch_id: u64,
/// The shutdown future.
Expand Down Expand Up @@ -68,7 +63,12 @@ impl<F: Future + Unpin> Node for SourceNode<F> {
let next = next.expect("We return just when the stream ends");
self.shutdown = shutdown;
let index = next.0;
let Some((port, message)) = next.1 else {
let Some(SourceMessage {
id: message_id,
port,
message,
}) = next.1
else {
debug!("[{}] quit", self.sources[index].channel_manager.owner().id);
match self.runtime.block_on(
handles[index]
Expand All @@ -92,17 +92,17 @@ impl<F: Future + Unpin> Node for SourceNode<F> {
let source = &mut self.sources[index];
match message {
IngestionMessage::OperationEvent { op, id, .. } => {
source.state = SourceState::NonRestartable;
source.state.set(SourceState::Started);
source
.channel_manager
.send_op(TableOperation { op, id, port })?;
}
IngestionMessage::TransactionInfo(info) => match info {
TransactionInfo::Commit { id, source_time } => {
if let Some(id) = id {
source.state = SourceState::Restartable(id);
source.state.set(SourceState::Restartable(id));
} else {
source.state = SourceState::NonRestartable;
source.state.set(SourceState::Started);
}

let source_states = Arc::new(
Expand All @@ -117,7 +117,8 @@ impl<F: Future + Unpin> Node for SourceNode<F> {
.collect(),
);
let mut epoch =
Epoch::new(self.epoch_id, source_states, SystemTime::now());
Epoch::new(self.epoch_id, source_states, SystemTime::now())
.with_originating_msg(message_id);
if let Some(st) = source_time {
epoch = epoch.with_source_time(st);
}
Expand Down Expand Up @@ -155,8 +156,8 @@ struct RunningSource {
#[derive(Debug)]
struct SourceRunner {
source: Box<dyn Source>,
last_checkpoint: Option<OpIdentifier>,
sender: Sender<(PortHandle, IngestionMessage)>,
last_checkpoint: SourceState,
sender: Sender<SourceMessage>,
}

/// Returns if the operation is sent successfully.
Expand Down
41 changes: 40 additions & 1 deletion dozer-core/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,46 @@ pub mod shutdown;
pub use tokio;

#[cfg(test)]
pub mod tests;
mod tests;

pub mod test_utils {
use std::sync::atomic::AtomicUsize;

use dozer_types::{models::ingestion_types::IngestionMessage, types::PortHandle};
use tokio::sync::mpsc::Sender;

use crate::node::SourceMessage;

pub struct CountingSender {
count: AtomicUsize,
sender: Sender<SourceMessage>,
}

impl CountingSender {
pub fn new(sender: Sender<SourceMessage>) -> Self {
Self {
count: 0.into(),
sender,
}
}

pub async fn send(
&self,
port: PortHandle,
message: IngestionMessage,
) -> Result<(), tokio::sync::mpsc::error::SendError<SourceMessage>> {
let idx = self.count.fetch_add(1, std::sync::atomic::Ordering::AcqRel);
self.sender
.send(SourceMessage {
id: idx,
port,
message,
})
.await?;
Ok(())
}
}
}

pub use daggy::{self, petgraph};
pub use dozer_types::{epoch, event};
Loading
Loading