Skip to content

Commit

Permalink
refactor: Make source state source wide instead of per table (#2344)
Browse files Browse the repository at this point in the history
* refactor: Make source state source wide instead of per table

* refactor: Separate connection level state and operation level state

* refactor: Store binlog position in tuple

* refactor: Fetch prefix from mysql binlogs list

* Don't store `.` in prefix

---------

Co-authored-by: Karolis Gudiškis <[email protected]>
  • Loading branch information
chubei and karolisg authored Jan 24, 2024
1 parent 026e273 commit c328376
Show file tree
Hide file tree
Showing 56 changed files with 668 additions and 588 deletions.
2 changes: 1 addition & 1 deletion dozer-cli/src/pipeline/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,7 @@ impl<'a> PipelineBuilder<'a> {

let mut connector_map = HashMap::new();
for connection in self.connections {
let connector = get_connector(runtime.clone(), connection.clone())
let connector = get_connector(runtime.clone(), connection.clone(), None)
.map_err(|e| ConnectorSourceFactoryError::Connector(e.into()))?;

if let Some(info_table) = get_connector_info_table(connection) {
Expand Down
59 changes: 26 additions & 33 deletions dozer-cli/src/pipeline/connector_source.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,7 @@
use dozer_api::shutdown::ShutdownReceiver;
use dozer_core::node::{
OutputPortDef, OutputPortType, PortHandle, Source, SourceFactory, SourceState,
};
use dozer_core::node::{OutputPortDef, OutputPortType, PortHandle, Source, SourceFactory};
use dozer_ingestion::{
get_connector, CdcType, Connector, IngestionIterator, TableIdentifier, TableInfo, TableToIngest,
get_connector, CdcType, Connector, IngestionIterator, TableIdentifier, TableInfo,
};
use dozer_ingestion::{IngestionConfig, Ingestor};

Expand All @@ -12,7 +10,7 @@ use dozer_types::errors::internal::BoxedError;
use dozer_types::log::{error, info};
use dozer_types::models::connection::Connection;
use dozer_types::models::ingestion_types::IngestionMessage;
use dozer_types::parking_lot::Mutex;
use dozer_types::node::OpIdentifier;
use dozer_types::thiserror::{self, Error};
use dozer_types::tracing::{span, Level};
use dozer_types::types::{Operation, Schema, SourceDefinition};
Expand Down Expand Up @@ -46,10 +44,9 @@ pub enum ConnectorSourceFactoryError {

#[derive(Debug)]
pub struct ConnectorSourceFactory {
connection_name: String,
connection: Connection,
runtime: Arc<Runtime>,
tables: Vec<Table>,
/// Will be moved to `ConnectorSource` in `build`.
connector: Mutex<Option<Box<dyn Connector>>>,
labels: LabelsAndProgress,
shutdown: ShutdownReceiver,
}
Expand All @@ -66,9 +63,7 @@ impl ConnectorSourceFactory {
labels: LabelsAndProgress,
shutdown: ShutdownReceiver,
) -> Result<Self, ConnectorSourceFactoryError> {
let connection_name = connection.name.clone();

let connector = get_connector(runtime.clone(), connection)
let connector = get_connector(runtime.clone(), connection.clone(), None)
.map_err(|e| ConnectorSourceFactoryError::Connector(e.into()))?;

// Fill column names if not provided.
Expand Down Expand Up @@ -116,9 +111,9 @@ impl ConnectorSourceFactory {
}

Ok(Self {
connection_name,
connection,
runtime,
tables,
connector: Mutex::new(Some(connector)),
labels,
shutdown,
})
Expand All @@ -138,7 +133,7 @@ impl SourceFactory for ConnectorSourceFactory {
// Add source information to the schema.
for field in &mut schema.fields {
field.source = SourceDefinition::Table {
connection: self.connection_name.clone(),
connection: self.connection.name.clone(),
name: table_name.clone(),
};
}
Expand Down Expand Up @@ -174,35 +169,27 @@ impl SourceFactory for ConnectorSourceFactory {
fn build(
&self,
_output_schemas: HashMap<PortHandle, Schema>,
mut last_checkpoint: SourceState,
state: Option<Vec<u8>>,
) -> Result<Box<dyn Source>, BoxedError> {
// Construct the tables to ingest.
// Construct table info.
let tables = self
.tables
.iter()
.map(|table| {
let state = last_checkpoint.remove(&table.port).flatten();
TableToIngest {
schema: table.schema_name.clone(),
name: table.name.clone(),
column_names: table.columns.clone(),
state,
}
.map(|table| TableInfo {
schema: table.schema_name.clone(),
name: table.name.clone(),
column_names: table.columns.clone(),
})
.collect();
let ports = self.tables.iter().map(|table| table.port).collect();

let connector = self
.connector
.lock()
.take()
.expect("ConnectorSource was already built");
let connector = get_connector(self.runtime.clone(), self.connection.clone(), state)?;

Ok(Box::new(ConnectorSource {
tables,
ports,
connector,
connection_name: self.connection_name.clone(),
connection_name: self.connection.name.clone(),
labels: self.labels.clone(),
shutdown: self.shutdown.clone(),
ingestion_config: IngestionConfig::default(),
Expand All @@ -212,7 +199,7 @@ impl SourceFactory for ConnectorSourceFactory {

#[derive(Debug)]
pub struct ConnectorSource {
tables: Vec<TableToIngest>,
tables: Vec<TableInfo>,
ports: Vec<PortHandle>,
connector: Box<dyn Connector>,
connection_name: String,
Expand All @@ -223,9 +210,14 @@ pub struct ConnectorSource {

#[async_trait]
impl Source for ConnectorSource {
async fn serialize_state(&self) -> Result<Vec<u8>, BoxedError> {
self.connector.serialize_state().await
}

async fn start(
&self,
sender: Sender<(PortHandle, IngestionMessage)>,
last_checkpoint: Option<OpIdentifier>,
) -> Result<(), BoxedError> {
let (ingestor, iterator) = Ingestor::initialize_channel(self.ingestion_config.clone());
let connection_name = self.connection_name.clone();
Expand Down Expand Up @@ -254,7 +246,8 @@ impl Source for ConnectorSource {
eprintln!("Aborted connector {}", name);
});
let result = Abortable::new(
self.connector.start(&ingestor, self.tables.clone()),
self.connector
.start(&ingestor, self.tables.clone(), last_checkpoint),
abort_registration,
)
.await;
Expand All @@ -281,7 +274,7 @@ async fn forward_message_to_pipeline(
mut iterator: IngestionIterator,
sender: Sender<(PortHandle, IngestionMessage)>,
connection_name: String,
tables: Vec<TableToIngest>,
tables: Vec<TableInfo>,
ports: Vec<PortHandle>,
labels: LabelsAndProgress,
) {
Expand Down
3 changes: 2 additions & 1 deletion dozer-cli/src/simple/orchestrator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -372,7 +372,8 @@ impl SimpleOrchestrator {
) -> Result<HashMap<String, (Vec<TableInfo>, Vec<SourceSchema>)>, OrchestrationError> {
let mut schema_map = HashMap::new();
for connection in &self.config.connections {
let connector = get_connector(self.runtime.clone(), connection.clone())
// We're not really going to start ingestion, so passing `None` as state here is OK.
let connector = get_connector(self.runtime.clone(), connection.clone(), None)
.map_err(|e| ConnectorSourceFactoryError::Connector(e.into()))?;
let schema_tuples = connector
.list_all_schemas()
Expand Down
29 changes: 11 additions & 18 deletions dozer-core/src/builder_dag.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ use daggy::{
petgraph::visit::{IntoNodeIdentifiers, IntoNodeReferences},
NodeIndex,
};
use dozer_types::node::NodeHandle;
use dozer_types::node::{NodeHandle, OpIdentifier};

use crate::{
checkpoint::OptionCheckpoint,
Expand All @@ -26,7 +26,10 @@ pub struct NodeType {
#[derive(Debug)]
/// Node kind, source, processor or sink. Source has a checkpoint to start from.
pub enum NodeKind {
Source(Box<dyn Source>),
Source {
source: Box<dyn Source>,
last_checkpoint: Option<OpIdentifier>,
},
Processor(Box<dyn Processor>),
Sink(Box<dyn Sink>),
}
Expand Down Expand Up @@ -69,32 +72,22 @@ impl BuilderDag {
let node = node.weight;
let node = match node.kind {
DagNodeKind::Source(source) => {
let mut last_checkpoint_by_name = checkpoint.get_source_state(&node.handle)?;
let mut last_checkpoint = HashMap::new();
for port_def in source.get_output_ports() {
let port_name = source.get_output_port_name(&port_def.handle);
last_checkpoint.insert(
port_def.handle,
last_checkpoint_by_name
.as_mut()
.and_then(|last_checkpoint| {
last_checkpoint.remove(&port_name).flatten().cloned()
}),
);
}

let source_state = checkpoint.get_source_state(&node.handle)?;
let source = source
.build(
output_schemas
.remove(&node_index)
.expect("we collected all output schemas"),
last_checkpoint,
source_state.map(|state| state.0.to_vec()),
)
.map_err(ExecutionError::Factory)?;

NodeType {
handle: node.handle,
kind: NodeKind::Source(source),
kind: NodeKind::Source {
source,
last_checkpoint: source_state.map(|state| state.1),
},
}
}
DagNodeKind::Processor(processor) => {
Expand Down
30 changes: 11 additions & 19 deletions dozer-core/src/checkpoint/mod.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use std::{collections::HashMap, ops::Deref, sync::Arc};
use std::{ops::Deref, sync::Arc};

use dozer_log::{
camino::Utf8Path,
Expand All @@ -12,7 +12,7 @@ use dozer_types::{
bincode,
log::{error, info},
models::app_config::{DataStorage, RecordStore},
node::{NodeHandle, RestartableState, SourceStates, TableState},
node::{NodeHandle, OpIdentifier, SourceState, SourceStates},
parking_lot::Mutex,
tonic::codegen::tokio_stream::StreamExt,
types::Field,
Expand Down Expand Up @@ -114,29 +114,21 @@ impl OptionCheckpoint {
pub fn get_source_state(
&self,
node_handle: &NodeHandle,
) -> Result<Option<HashMap<String, Option<&RestartableState>>>, ExecutionError> {
) -> Result<Option<(&[u8], OpIdentifier)>, ExecutionError> {
let Some(checkpoint) = self.checkpoint.as_ref() else {
return Ok(None);
};
let Some(source_state) = checkpoint.source_states.get(node_handle) else {
let Some(state) = checkpoint.source_states.get(node_handle) else {
return Ok(None);
};

let mut result = HashMap::new();
for (table_name, state) in source_state {
let state = match state {
TableState::NotStarted => None,
TableState::NonRestartable => {
return Err(ExecutionError::SourceCannotRestart {
source_name: node_handle.clone(),
table_name: table_name.clone(),
});
}
TableState::Restartable(state) => Some(state),
};
result.insert(table_name.clone(), state);
match state {
SourceState::NotStarted => Ok(None),
SourceState::NonRestartable => {
Err(ExecutionError::SourceCannotRestart(node_handle.clone()))
}
SourceState::Restartable { state, checkpoint } => Ok(Some((state, *checkpoint))),
}
Ok(Some(result))
}

pub async fn load_processor_data(
Expand Down Expand Up @@ -374,7 +366,7 @@ pub async fn create_checkpoint_factory_for_test(
let epoch_id = 42;
let source_states: SourceStates = [(
NodeHandle::new(Some(1), "id".to_string()),
Default::default(),
SourceState::NotStarted,
)]
.into_iter()
.collect();
Expand Down
19 changes: 8 additions & 11 deletions dozer-core/src/epoch/manager.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,7 @@
use dozer_recordstore::ProcessorRecordStore;
use dozer_types::log::info;
use dozer_types::node::{NodeHandle, SourceStates, TableState};
use dozer_types::node::{NodeHandle, SourceState, SourceStates};
use dozer_types::parking_lot::Mutex;
use std::collections::HashMap;
use std::ops::DerefMut;
use std::sync::{Arc, Barrier};
use std::thread::sleep;
Expand Down Expand Up @@ -162,7 +161,7 @@ impl EpochManager {
/// - `request_commit`: Whether the source wants to commit. The `EpochManager` checks if any source wants to commit and returns `Some` if so.
pub fn wait_for_epoch_close(
&self,
source_state: (NodeHandle, HashMap<String, TableState>),
source_state: (NodeHandle, SourceState),
request_termination: bool,
request_commit: bool,
) -> ClosedEpoch {
Expand Down Expand Up @@ -300,11 +299,9 @@ impl EpochManager {
}

fn is_restartable(source_states: &SourceStates) -> bool {
source_states.values().all(|table_states| {
table_states
.values()
.all(|table_state| table_state != &TableState::NonRestartable)
})
source_states
.values()
.all(|source_state| source_state != &SourceState::NonRestartable)
}

#[cfg(test)]
Expand Down Expand Up @@ -336,7 +333,7 @@ mod tests {
epoch_manager: &EpochManager,
termination_gen: &(impl Fn(u16) -> bool + Sync),
commit_gen: &(impl Fn(u16) -> bool + Sync),
source_state_gen: &(impl Fn(u16) -> (NodeHandle, HashMap<String, TableState>) + Sync),
source_state_gen: &(impl Fn(u16) -> (NodeHandle, SourceState) + Sync),
) -> ClosedEpoch {
scope(|scope| {
let handles = (0..NUM_THREADS)
Expand Down Expand Up @@ -368,10 +365,10 @@ mod tests {
})
}

fn generate_source_state(index: u16) -> (NodeHandle, HashMap<String, TableState>) {
fn generate_source_state(index: u16) -> (NodeHandle, SourceState) {
(
NodeHandle::new(Some(index), index.to_string()),
Default::default(),
SourceState::NotStarted,
)
}

Expand Down
7 changes: 2 additions & 5 deletions dozer-core/src/errors.rs
Original file line number Diff line number Diff line change
Expand Up @@ -49,11 +49,8 @@ pub enum ExecutionError {
CheckpointedLogReader(#[from] CheckpointedLogReaderError),
#[error("Cannot deserialize checkpoint: {0}")]
CorruptedCheckpoint(#[source] bincode::error::DecodeError),
#[error("Table {table_name} of source {source_name} cannot restart. You have to clean data from previous runs by running `dozer clean`")]
SourceCannotRestart {
source_name: NodeHandle,
table_name: String,
},
#[error("Source {0} cannot restart. You have to clean data from previous runs by running `dozer clean`")]
SourceCannotRestart(NodeHandle),
#[error("Failed to create checkpoint: {0}")]
FailedToCreateCheckpoint(BoxedError),
#[error("Failed to serialize record writer: {0}")]
Expand Down
2 changes: 1 addition & 1 deletion dozer-core/src/executor/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -129,7 +129,7 @@ impl DagExecutor {
running.clone(),
runtime.clone(),
)
.await;
.await?;
join_handles.push(start_source(source_node)?);
}
NodeKind::Processor(_) => {
Expand Down
Loading

0 comments on commit c328376

Please sign in to comment.