Skip to content

Commit

Permalink
chore: Move last_checkpoint from Source::start to `SourceFactory::build` (#2264)
Browse files Browse the repository at this point in the history
  • Loading branch information
chubei authored Jan 5, 2024
1 parent aeb48bf commit ac4625d
Show file tree
Hide file tree
Showing 12 changed files with 57 additions and 90 deletions.
46 changes: 17 additions & 29 deletions dozer-cli/src/pipeline/connector_source.rs
Original file line number Diff line number Diff line change
Expand Up @@ -181,14 +181,20 @@ impl SourceFactory for ConnectorSourceFactory {
fn build(
&self,
_output_schemas: HashMap<PortHandle, Schema>,
mut last_checkpoint: SourceState,
) -> Result<Box<dyn Source>, BoxedError> {
// Construct the tables to ingest.
let tables = self
.tables
.iter()
.map(|table| TableInfo {
schema: table.schema_name.clone(),
name: table.name.clone(),
column_names: table.columns.clone(),
.map(|table| {
let state = last_checkpoint.remove(&table.port).flatten();
TableToIngest {
schema: table.schema_name.clone(),
name: table.name.clone(),
column_names: table.columns.clone(),
state,
}
})
.collect();
let ports = self.tables.iter().map(|table| table.port).collect();
Expand Down Expand Up @@ -221,7 +227,7 @@ impl SourceFactory for ConnectorSourceFactory {

#[derive(Debug)]
pub struct ConnectorSource {
tables: Vec<TableInfo>,
tables: Vec<TableToIngest>,
ports: Vec<PortHandle>,
connector: Box<dyn Connector>,
runtime: Arc<Runtime>,
Expand All @@ -235,11 +241,7 @@ pub struct ConnectorSource {
const SOURCE_OPERATION_COUNTER_NAME: &str = "source_operation";

impl Source for ConnectorSource {
fn start(
&self,
fw: &mut dyn SourceChannelForwarder,
last_checkpoint: SourceState,
) -> Result<(), BoxedError> {
fn start(&self, fw: &mut dyn SourceChannelForwarder) -> Result<(), BoxedError> {
thread::scope(|scope| {
describe_counter!(
SOURCE_OPERATION_COUNTER_NAME,
Expand All @@ -256,22 +258,6 @@ impl Source for ConnectorSource {
let shutdown_future = self.shutdown.create_shutdown_future();
let (abort_handle, abort_registration) = AbortHandle::new_pair();

// Construct the tables to ingest.
let tables = self
.tables
.iter()
.zip(&self.ports)
.map(|(table, port)| {
let state = last_checkpoint.get(port).cloned().flatten();
TableToIngest {
schema: table.schema.clone(),
name: table.name.clone(),
column_names: table.column_names.clone(),
state,
}
})
.collect::<Vec<_>>();

// Abort the connector when we shut down
// TODO: pass a `CancellationToken` to the connector to allow
// it to gracefully shut down.
Expand All @@ -281,9 +267,11 @@ impl Source for ConnectorSource {
abort_handle.abort();
eprintln!("Aborted connector {}", name);
});
let result =
Abortable::new(self.connector.start(&ingestor, tables), abort_registration)
.await;
let result = Abortable::new(
self.connector.start(&ingestor, self.tables.clone()),
abort_registration,
)
.await;
match result {
Ok(Ok(_)) => {}
Ok(Err(e)) => {
Expand Down
13 changes: 4 additions & 9 deletions dozer-core/src/builder_dag.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ use crate::{
checkpoint::OptionCheckpoint,
dag_schemas::{DagHaveSchemas, DagSchemas, EdgeType},
errors::ExecutionError,
node::{Processor, Sink, Source, SourceState},
node::{Processor, Sink, Source},
NodeKind as DagNodeKind,
};

Expand All @@ -26,10 +26,7 @@ pub struct NodeType {
#[derive(Debug)]
/// Node kind: source, processor or sink. The source's last checkpoint is passed to `SourceFactory::build` and is not stored here.
pub enum NodeKind {
Source {
source: Box<dyn Source>,
last_checkpoint: SourceState,
},
Source(Box<dyn Source>),
Processor(Box<dyn Processor>),
Sink(Box<dyn Sink>),
}
Expand Down Expand Up @@ -91,15 +88,13 @@ impl BuilderDag {
output_schemas
.remove(&node_index)
.expect("we collected all output schemas"),
last_checkpoint,
)
.map_err(ExecutionError::Factory)?;

NodeType {
handle: node.handle,
kind: NodeKind::Source {
source,
last_checkpoint,
},
kind: NodeKind::Source(source),
}
}
DagNodeKind::Processor(processor) => {
Expand Down
13 changes: 3 additions & 10 deletions dozer-core/src/executor/source_node.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ use crate::{
dag_schemas::EdgeKind,
errors::ExecutionError,
forwarder::SourceChannelManager,
node::{PortHandle, Source, SourceState},
node::{PortHandle, Source},
};

use super::{execution_dag::ExecutionDag, node::Node, ExecutorOptions};
Expand All @@ -35,8 +35,6 @@ pub struct SourceSenderNode {
node_handle: NodeHandle,
/// The source.
source: Box<dyn Source>,
/// Last checkpointed output data sequence numbers.
last_checkpoint: SourceState,
/// The forwarder that will be passed to the source for outputting data.
forwarder: InternalChannelSourceForwarder,
}
Expand All @@ -49,7 +47,7 @@ impl SourceSenderNode {

impl Node for SourceSenderNode {
fn run(mut self) -> Result<(), ExecutionError> {
let result = self.source.start(&mut self.forwarder, self.last_checkpoint);
let result = self.source.start(&mut self.forwarder);
debug!("[{}-sender] Quit", self.node_handle);
result.map_err(ExecutionError::Source)
}
Expand Down Expand Up @@ -144,11 +142,7 @@ pub async fn create_source_nodes(
panic!("Must pass in a node")
};
let node_handle = node.handle;
let NodeKind::Source {
source,
last_checkpoint,
} = node.kind
else {
let NodeKind::Source(source) = node.kind else {
panic!("Must pass in a source node");
};
let port_names = dag
Expand All @@ -171,7 +165,6 @@ pub async fn create_source_nodes(
let source_sender_node = SourceSenderNode {
node_handle: node_handle.clone(),
source,
last_checkpoint,
forwarder,
};

Expand Down
7 changes: 2 additions & 5 deletions dozer-core/src/node.rs
Original file line number Diff line number Diff line change
Expand Up @@ -51,17 +51,14 @@ pub trait SourceFactory: Send + Sync + Debug {
fn build(
&self,
output_schemas: HashMap<PortHandle, Schema>,
last_checkpoint: SourceState,
) -> Result<Box<dyn Source>, BoxedError>;
}

pub type SourceState = HashMap<PortHandle, Option<RestartableState>>;

pub trait Source: Send + Sync + Debug {
fn start(
&self,
fw: &mut dyn SourceChannelForwarder,
last_checkpoint: SourceState,
) -> Result<(), BoxedError>;
fn start(&self, fw: &mut dyn SourceChannelForwarder) -> Result<(), BoxedError>;
}

#[async_trait]
Expand Down
3 changes: 2 additions & 1 deletion dozer-core/src/tests/app.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ use crate::app::{App, AppPipeline, PipelineEntryPoint};
use crate::appsource::{AppSourceManager, AppSourceMappings};
use crate::checkpoint::create_checkpoint_for_test;
use crate::executor::DagExecutor;
use crate::node::{OutputPortDef, PortHandle, Source, SourceFactory};
use crate::node::{OutputPortDef, PortHandle, Source, SourceFactory, SourceState};
use crate::tests::dag_base_run::{
NoopJoinProcessorFactory, NOOP_JOIN_LEFT_INPUT_PORT, NOOP_JOIN_RIGHT_INPUT_PORT,
};
Expand Down Expand Up @@ -40,6 +40,7 @@ impl SourceFactory for NoneSourceFactory {
fn build(
&self,
_output_schemas: HashMap<PortHandle, Schema>,
_last_checkpoint: SourceState,
) -> Result<Box<dyn Source>, BoxedError> {
todo!()
}
Expand Down
2 changes: 2 additions & 0 deletions dozer-core/src/tests/dag_base_create_errors.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ use crate::checkpoint::create_checkpoint_for_test;
use crate::executor::DagExecutor;
use crate::node::{
OutputPortDef, OutputPortType, PortHandle, Processor, ProcessorFactory, Source, SourceFactory,
SourceState,
};
use crate::{Dag, Endpoint, DEFAULT_PORT_HANDLE};

Expand Down Expand Up @@ -60,6 +61,7 @@ impl SourceFactory for CreateErrSourceFactory {
fn build(
&self,
_output_schemas: HashMap<PortHandle, Schema>,
_last_checkpoint: SourceState,
) -> Result<Box<dyn Source>, BoxedError> {
if self.panic {
panic!("Generated error");
Expand Down
7 changes: 2 additions & 5 deletions dozer-core/src/tests/dag_base_errors.rs
Original file line number Diff line number Diff line change
Expand Up @@ -344,6 +344,7 @@ impl SourceFactory for ErrGeneratorSourceFactory {
fn build(
&self,
_output_schemas: HashMap<PortHandle, Schema>,
_last_checkpoint: SourceState,
) -> Result<Box<dyn Source>, BoxedError> {
Ok(Box::new(ErrGeneratorSource {
count: self.count,
Expand All @@ -359,11 +360,7 @@ pub(crate) struct ErrGeneratorSource {
}

impl Source for ErrGeneratorSource {
fn start(
&self,
fw: &mut dyn SourceChannelForwarder,
_checkpoint: SourceState,
) -> Result<(), BoxedError> {
fn start(&self, fw: &mut dyn SourceChannelForwarder) -> Result<(), BoxedError> {
for n in 1..(self.count + 1) {
if n == self.err_at {
return Err("Generated Error".to_string().into());
Expand Down
2 changes: 2 additions & 0 deletions dozer-core/src/tests/dag_ports.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
use crate::node::{
OutputPortDef, OutputPortType, PortHandle, Processor, ProcessorFactory, Source, SourceFactory,
SourceState,
};
use crate::{Dag, Endpoint, DEFAULT_PORT_HANDLE};
use dozer_recordstore::ProcessorRecordStoreDeserializer;
Expand Down Expand Up @@ -38,6 +39,7 @@ impl SourceFactory for DynPortsSourceFactory {
fn build(
&self,
_input_schemas: HashMap<PortHandle, Schema>,
_last_checkpoint: SourceState,
) -> Result<Box<dyn Source>, BoxedError> {
todo!()
}
Expand Down
4 changes: 3 additions & 1 deletion dozer-core/src/tests/dag_schemas.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
use crate::dag_schemas::{DagHaveSchemas, DagSchemas};
use crate::node::{
OutputPortDef, OutputPortType, PortHandle, Processor, ProcessorFactory, SinkFactory, Source,
SourceFactory,
SourceFactory, SourceState,
};
use crate::{Dag, Endpoint, DEFAULT_PORT_HANDLE};

Expand Down Expand Up @@ -63,6 +63,7 @@ impl SourceFactory for TestUsersSourceFactory {
fn build(
&self,
_input_schemas: HashMap<PortHandle, Schema>,
_last_checkpoint: SourceState,
) -> Result<Box<dyn Source>, BoxedError> {
todo!()
}
Expand Down Expand Up @@ -109,6 +110,7 @@ impl SourceFactory for TestCountriesSourceFactory {
fn build(
&self,
_input_schemas: HashMap<PortHandle, Schema>,
_last_checkpoint: SourceState,
) -> Result<Box<dyn Source>, BoxedError> {
todo!()
}
Expand Down
32 changes: 14 additions & 18 deletions dozer-core/src/tests/sources.rs
Original file line number Diff line number Diff line change
Expand Up @@ -75,8 +75,16 @@ impl SourceFactory for GeneratorSourceFactory {
fn build(
&self,
_input_schemas: HashMap<PortHandle, Schema>,
last_checkpoint: SourceState,
) -> Result<Box<dyn Source>, BoxedError> {
let state = last_checkpoint.values().next().and_then(|state| {
state
.as_ref()
.map(|state| u64::from_be_bytes(state.0.as_slice().try_into().unwrap()))
});
let start = state.map(|state| state + 1).unwrap_or(0);
Ok(Box::new(GeneratorSource {
start,
count: self.count,
running: self.running.clone(),
}))
Expand All @@ -85,24 +93,14 @@ impl SourceFactory for GeneratorSourceFactory {

#[derive(Debug)]
pub(crate) struct GeneratorSource {
start: u64,
count: u64,
running: Arc<AtomicBool>,
}

impl Source for GeneratorSource {
fn start(
&self,
fw: &mut dyn SourceChannelForwarder,
last_checkpoint: SourceState,
) -> Result<(), BoxedError> {
let state = last_checkpoint.values().next().and_then(|state| {
state
.as_ref()
.map(|state| u64::from_be_bytes(state.0.as_slice().try_into().unwrap()))
});
let start = state.map(|state| state + 1).unwrap_or(0);

for n in start..(start + self.count) {
fn start(&self, fw: &mut dyn SourceChannelForwarder) -> Result<(), BoxedError> {
for n in self.start..(self.start + self.count) {
fw.send(
IngestionMessage::OperationEvent {
table_index: 0,
Expand Down Expand Up @@ -205,6 +203,7 @@ impl SourceFactory for DualPortGeneratorSourceFactory {
fn build(
&self,
_input_schemas: HashMap<PortHandle, Schema>,
_last_checkpoint: SourceState,
) -> Result<Box<dyn Source>, BoxedError> {
Ok(Box::new(DualPortGeneratorSource {
count: self.count,
Expand All @@ -220,11 +219,7 @@ pub(crate) struct DualPortGeneratorSource {
}

impl Source for DualPortGeneratorSource {
fn start(
&self,
fw: &mut dyn SourceChannelForwarder,
_last_checkpoint: SourceState,
) -> Result<(), BoxedError> {
fn start(&self, fw: &mut dyn SourceChannelForwarder) -> Result<(), BoxedError> {
for n in 1..(self.count + 1) {
fw.send(
IngestionMessage::OperationEvent {
Expand Down Expand Up @@ -285,6 +280,7 @@ impl SourceFactory for ConnectivityTestSourceFactory {
fn build(
&self,
_output_schemas: HashMap<PortHandle, Schema>,
_last_checkpoint: SourceState,
) -> Result<Box<dyn Source>, BoxedError> {
unimplemented!("This struct is for connectivity test, only output ports are defined")
}
Expand Down
11 changes: 4 additions & 7 deletions dozer-sql/src/tests/builder_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,7 @@ impl SourceFactory for TestSourceFactory {
fn build(
&self,
_output_schemas: HashMap<PortHandle, Schema>,
_last_checkpoint: SourceState,
) -> Result<Box<dyn Source>, BoxedError> {
Ok(Box::new(TestSource {}))
}
Expand All @@ -106,12 +107,8 @@ impl SourceFactory for TestSourceFactory {
pub struct TestSource {}

impl Source for TestSource {
fn start(
&self,
fw: &mut dyn SourceChannelForwarder,
_last_checkpoint: SourceState,
) -> Result<(), BoxedError> {
for n in 0..10u64 {
fn start(&self, fw: &mut dyn SourceChannelForwarder) -> Result<(), BoxedError> {
for _ in 0..10 {
fw.send(
IngestionMessage::OperationEvent {
table_index: 0,
Expand All @@ -125,7 +122,7 @@ impl Source for TestSource {
),
]),
},
state: Some(n.to_be_bytes().to_vec().into()),
state: None,
},
DEFAULT_PORT_HANDLE,
)
Expand Down
Loading

0 comments on commit ac4625d

Please sign in to comment.