From ac4625dae91b9c2ca1d28b1e212b0e6bf62d1f33 Mon Sep 17 00:00:00 2001 From: Bei Chu <914745487@qq.com> Date: Fri, 5 Jan 2024 21:14:18 +0800 Subject: [PATCH] chore: Move `last_checkpoint` from `Source::start` to `SourceFactory::build` (#2264) --- dozer-cli/src/pipeline/connector_source.rs | 46 +++++++------------ dozer-core/src/builder_dag.rs | 13 ++---- dozer-core/src/executor/source_node.rs | 13 ++---- dozer-core/src/node.rs | 7 +-- dozer-core/src/tests/app.rs | 3 +- .../src/tests/dag_base_create_errors.rs | 2 + dozer-core/src/tests/dag_base_errors.rs | 7 +-- dozer-core/src/tests/dag_ports.rs | 2 + dozer-core/src/tests/dag_schemas.rs | 4 +- dozer-core/src/tests/sources.rs | 32 ++++++------- dozer-sql/src/tests/builder_test.rs | 11 ++--- dozer-tests/src/sql_tests/helper/pipeline.rs | 7 +-- 12 files changed, 57 insertions(+), 90 deletions(-) diff --git a/dozer-cli/src/pipeline/connector_source.rs b/dozer-cli/src/pipeline/connector_source.rs index 5fd3a3194d..0e2792efeb 100644 --- a/dozer-cli/src/pipeline/connector_source.rs +++ b/dozer-cli/src/pipeline/connector_source.rs @@ -181,14 +181,20 @@ impl SourceFactory for ConnectorSourceFactory { fn build( &self, _output_schemas: HashMap, + mut last_checkpoint: SourceState, ) -> Result, BoxedError> { + // Construct the tables to ingest. let tables = self .tables .iter() - .map(|table| TableInfo { - schema: table.schema_name.clone(), - name: table.name.clone(), - column_names: table.columns.clone(), + .map(|table| { + let state = last_checkpoint.remove(&table.port).flatten(); + TableToIngest { + schema: table.schema_name.clone(), + name: table.name.clone(), + column_names: table.columns.clone(), + state, + } }) .collect(); let ports = self.tables.iter().map(|table| table.port).collect(); @@ -221,7 +227,7 @@ impl SourceFactory for ConnectorSourceFactory { #[derive(Debug)] pub struct ConnectorSource { - tables: Vec, + tables: Vec, ports: Vec, connector: Box, runtime: Arc, @@ -235,11 +241,7 @@ pub struct ConnectorSource { const SOURCE_OPERATION_COUNTER_NAME: &str = "source_operation"; impl Source for ConnectorSource { - fn start( - &self, - fw: &mut dyn SourceChannelForwarder, - last_checkpoint: SourceState, - ) -> Result<(), BoxedError> { + fn start(&self, fw: &mut dyn SourceChannelForwarder) -> Result<(), BoxedError> { thread::scope(|scope| { describe_counter!( SOURCE_OPERATION_COUNTER_NAME, @@ -256,22 +258,6 @@ impl Source for ConnectorSource { let shutdown_future = self.shutdown.create_shutdown_future(); let (abort_handle, abort_registration) = AbortHandle::new_pair(); - // Construct the tables to ingest. - let tables = self - .tables - .iter() - .zip(&self.ports) - .map(|(table, port)| { - let state = last_checkpoint.get(port).cloned().flatten(); - TableToIngest { - schema: table.schema.clone(), - name: table.name.clone(), - column_names: table.column_names.clone(), - state, - } - }) - .collect::>(); - // Abort the connector when we shut down // TODO: pass a `CancellationToken` to the connector to allow // it to gracefully shut down. @@ -281,9 +267,11 @@ impl Source for ConnectorSource { abort_handle.abort(); eprintln!("Aborted connector {}", name); }); - let result = - Abortable::new(self.connector.start(&ingestor, tables), abort_registration) - .await; + let result = Abortable::new( + self.connector.start(&ingestor, self.tables.clone()), + abort_registration, + ) + .await; match result { Ok(Ok(_)) => {} Ok(Err(e)) => { diff --git a/dozer-core/src/builder_dag.rs b/dozer-core/src/builder_dag.rs index a4d1d82b19..071ea2faef 100644 --- a/dozer-core/src/builder_dag.rs +++ b/dozer-core/src/builder_dag.rs @@ -10,7 +10,7 @@ use crate::{ checkpoint::OptionCheckpoint, dag_schemas::{DagHaveSchemas, DagSchemas, EdgeType}, errors::ExecutionError, - node::{Processor, Sink, Source, SourceState}, + node::{Processor, Sink, Source}, NodeKind as DagNodeKind, }; @@ -26,10 +26,7 @@ pub struct NodeType { #[derive(Debug)] /// Node kind, source, processor or sink. Source has a checkpoint to start from. pub enum NodeKind { - Source { - source: Box, - last_checkpoint: SourceState, - }, + Source(Box), Processor(Box), Sink(Box), } @@ -91,15 +88,13 @@ impl BuilderDag { output_schemas .remove(&node_index) .expect("we collected all output schemas"), + last_checkpoint, ) .map_err(ExecutionError::Factory)?; NodeType { handle: node.handle, - kind: NodeKind::Source { - source, - last_checkpoint, - }, + kind: NodeKind::Source(source), } } DagNodeKind::Processor(processor) => { diff --git a/dozer-core/src/executor/source_node.rs b/dozer-core/src/executor/source_node.rs index 6f2a11644b..157ca16b97 100644 --- a/dozer-core/src/executor/source_node.rs +++ b/dozer-core/src/executor/source_node.rs @@ -17,7 +17,7 @@ use crate::{ dag_schemas::EdgeKind, errors::ExecutionError, forwarder::SourceChannelManager, - node::{PortHandle, Source, SourceState}, + node::{PortHandle, Source}, }; use super::{execution_dag::ExecutionDag, node::Node, ExecutorOptions}; @@ -35,8 +35,6 @@ pub struct SourceSenderNode { node_handle: NodeHandle, /// The source. source: Box, - /// Last checkpointed output data sequence numbers. - last_checkpoint: SourceState, /// The forwarder that will be passed to the source for outputting data. forwarder: InternalChannelSourceForwarder, } @@ -49,7 +47,7 @@ impl SourceSenderNode { impl Node for SourceSenderNode { fn run(mut self) -> Result<(), ExecutionError> { - let result = self.source.start(&mut self.forwarder, self.last_checkpoint); + let result = self.source.start(&mut self.forwarder); debug!("[{}-sender] Quit", self.node_handle); result.map_err(ExecutionError::Source) } @@ -144,11 +142,7 @@ pub async fn create_source_nodes( panic!("Must pass in a node") }; let node_handle = node.handle; - let NodeKind::Source { - source, - last_checkpoint, - } = node.kind - else { + let NodeKind::Source(source) = node.kind else { panic!("Must pass in a source node"); }; let port_names = dag @@ -171,7 +165,6 @@ pub async fn create_source_nodes( let source_sender_node = SourceSenderNode { node_handle: node_handle.clone(), source, - last_checkpoint, forwarder, }; diff --git a/dozer-core/src/node.rs b/dozer-core/src/node.rs index 90b494d288..01bccb0fac 100644 --- a/dozer-core/src/node.rs +++ b/dozer-core/src/node.rs @@ -51,17 +51,14 @@ pub trait SourceFactory: Send + Sync + Debug { fn build( &self, output_schemas: HashMap, + last_checkpoint: SourceState, ) -> Result, BoxedError>; } pub type SourceState = HashMap>; pub trait Source: Send + Sync + Debug { - fn start( - &self, - fw: &mut dyn SourceChannelForwarder, - last_checkpoint: SourceState, - ) -> Result<(), BoxedError>; + fn start(&self, fw: &mut dyn SourceChannelForwarder) -> Result<(), BoxedError>; } #[async_trait] diff --git a/dozer-core/src/tests/app.rs b/dozer-core/src/tests/app.rs index 7ba9abb365..be7290fd0d 100644 --- a/dozer-core/src/tests/app.rs +++ b/dozer-core/src/tests/app.rs @@ -2,7 +2,7 @@ use crate::app::{App, AppPipeline, PipelineEntryPoint}; use crate::appsource::{AppSourceManager, AppSourceMappings}; use crate::checkpoint::create_checkpoint_for_test; use crate::executor::DagExecutor; -use crate::node::{OutputPortDef, PortHandle, Source, SourceFactory}; +use crate::node::{OutputPortDef, PortHandle, Source, SourceFactory, SourceState}; use crate::tests::dag_base_run::{ NoopJoinProcessorFactory, NOOP_JOIN_LEFT_INPUT_PORT, NOOP_JOIN_RIGHT_INPUT_PORT, }; @@ -40,6 +40,7 @@ impl SourceFactory for NoneSourceFactory { fn build( &self, _output_schemas: HashMap, + _last_checkpoint: SourceState, ) -> Result, BoxedError> { todo!() } diff --git a/dozer-core/src/tests/dag_base_create_errors.rs b/dozer-core/src/tests/dag_base_create_errors.rs index 8d146f153b..05e6a2b7da 100644 --- a/dozer-core/src/tests/dag_base_create_errors.rs +++ b/dozer-core/src/tests/dag_base_create_errors.rs @@ -2,6 +2,7 @@ use crate::checkpoint::create_checkpoint_for_test; use crate::executor::DagExecutor; use crate::node::{ OutputPortDef, OutputPortType, PortHandle, Processor, ProcessorFactory, Source, SourceFactory, + SourceState, }; use crate::{Dag, Endpoint, DEFAULT_PORT_HANDLE}; @@ -60,6 +61,7 @@ impl SourceFactory for CreateErrSourceFactory { fn build( &self, _output_schemas: HashMap, + _last_checkpoint: SourceState, ) -> Result, BoxedError> { if self.panic { panic!("Generated error"); diff --git a/dozer-core/src/tests/dag_base_errors.rs b/dozer-core/src/tests/dag_base_errors.rs index d57db2bcfa..867ffa5acd 100644 --- a/dozer-core/src/tests/dag_base_errors.rs +++ b/dozer-core/src/tests/dag_base_errors.rs @@ -344,6 +344,7 @@ impl SourceFactory for ErrGeneratorSourceFactory { fn build( &self, _output_schemas: HashMap, + _last_checkpoint: SourceState, ) -> Result, BoxedError> { Ok(Box::new(ErrGeneratorSource { count: self.count, @@ -359,11 +360,7 @@ pub(crate) struct ErrGeneratorSource { } impl Source for ErrGeneratorSource { - fn start( - &self, - fw: &mut dyn SourceChannelForwarder, - _checkpoint: SourceState, - ) -> Result<(), BoxedError> { + fn start(&self, fw: &mut dyn SourceChannelForwarder) -> Result<(), BoxedError> { for n in 1..(self.count + 1) { if n == self.err_at { return Err("Generated Error".to_string().into()); diff --git a/dozer-core/src/tests/dag_ports.rs b/dozer-core/src/tests/dag_ports.rs index b07bee9e4a..d1874d5194 100644 --- a/dozer-core/src/tests/dag_ports.rs +++ b/dozer-core/src/tests/dag_ports.rs @@ -1,5 +1,6 @@ use crate::node::{ OutputPortDef, OutputPortType, PortHandle, Processor, ProcessorFactory, Source, SourceFactory, + SourceState, }; use crate::{Dag, Endpoint, DEFAULT_PORT_HANDLE}; use dozer_recordstore::ProcessorRecordStoreDeserializer; @@ -38,6 +39,7 @@ impl SourceFactory for DynPortsSourceFactory { fn build( &self, _input_schemas: HashMap, + _last_checkpoint: SourceState, ) -> Result, BoxedError> { todo!() } diff --git a/dozer-core/src/tests/dag_schemas.rs b/dozer-core/src/tests/dag_schemas.rs index b774d366f8..f2aef9902b 100644 --- a/dozer-core/src/tests/dag_schemas.rs +++ b/dozer-core/src/tests/dag_schemas.rs @@ -1,7 +1,7 @@ use crate::dag_schemas::{DagHaveSchemas, DagSchemas}; use crate::node::{ OutputPortDef, OutputPortType, PortHandle, Processor, ProcessorFactory, SinkFactory, Source, - SourceFactory, + SourceFactory, SourceState, }; use crate::{Dag, Endpoint, DEFAULT_PORT_HANDLE}; @@ -63,6 +63,7 @@ impl SourceFactory for TestUsersSourceFactory { fn build( &self, _input_schemas: HashMap, + _last_checkpoint: SourceState, ) -> Result, BoxedError> { todo!() } @@ -109,6 +110,7 @@ impl SourceFactory for TestCountriesSourceFactory { fn build( &self, _input_schemas: HashMap, + _last_checkpoint: SourceState, ) -> Result, BoxedError> { todo!() } diff --git a/dozer-core/src/tests/sources.rs b/dozer-core/src/tests/sources.rs index 449ef66507..d7cc55f246 100644 --- a/dozer-core/src/tests/sources.rs +++ b/dozer-core/src/tests/sources.rs @@ -75,8 +75,16 @@ impl SourceFactory for GeneratorSourceFactory { fn build( &self, _input_schemas: HashMap, + last_checkpoint: SourceState, ) -> Result, BoxedError> { + let state = last_checkpoint.values().next().and_then(|state| { + state + .as_ref() + .map(|state| u64::from_be_bytes(state.0.as_slice().try_into().unwrap())) + }); + let start = state.map(|state| state + 1).unwrap_or(0); Ok(Box::new(GeneratorSource { + start, count: self.count, running: self.running.clone(), })) @@ -85,24 +93,14 @@ impl SourceFactory for GeneratorSourceFactory { #[derive(Debug)] pub(crate) struct GeneratorSource { + start: u64, count: u64, running: Arc, } impl Source for GeneratorSource { - fn start( - &self, - fw: &mut dyn SourceChannelForwarder, - last_checkpoint: SourceState, - ) -> Result<(), BoxedError> { - let state = last_checkpoint.values().next().and_then(|state| { - state - .as_ref() - .map(|state| u64::from_be_bytes(state.0.as_slice().try_into().unwrap())) - }); - let start = state.map(|state| state + 1).unwrap_or(0); - - for n in start..(start + self.count) { + fn start(&self, fw: &mut dyn SourceChannelForwarder) -> Result<(), BoxedError> { + for n in self.start..(self.start + self.count) { fw.send( IngestionMessage::OperationEvent { table_index: 0, @@ -205,6 +203,7 @@ impl SourceFactory for DualPortGeneratorSourceFactory { fn build( &self, _input_schemas: HashMap, + _last_checkpoint: SourceState, ) -> Result, BoxedError> { Ok(Box::new(DualPortGeneratorSource { count: self.count, @@ -220,11 +219,7 @@ pub(crate) struct DualPortGeneratorSource { } impl Source for DualPortGeneratorSource { - fn start( - &self, - fw: &mut dyn SourceChannelForwarder, - _last_checkpoint: SourceState, - ) -> Result<(), BoxedError> { + fn start(&self, fw: &mut dyn SourceChannelForwarder) -> Result<(), BoxedError> { for n in 1..(self.count + 1) { fw.send( IngestionMessage::OperationEvent { @@ -285,6 +280,7 @@ impl SourceFactory for ConnectivityTestSourceFactory { fn build( &self, _output_schemas: HashMap, + _last_checkpoint: SourceState, ) -> Result, BoxedError> { unimplemented!("This struct is for connectivity test, only output ports are defined") } diff --git a/dozer-sql/src/tests/builder_test.rs b/dozer-sql/src/tests/builder_test.rs index e8cbdce76e..11fe027a6d 100644 --- a/dozer-sql/src/tests/builder_test.rs +++ b/dozer-sql/src/tests/builder_test.rs @@ -97,6 +97,7 @@ impl SourceFactory for TestSourceFactory { fn build( &self, _output_schemas: HashMap, + _last_checkpoint: SourceState, ) -> Result, BoxedError> { Ok(Box::new(TestSource {})) } @@ -106,12 +107,8 @@ impl SourceFactory for TestSourceFactory { pub struct TestSource {} impl Source for TestSource { - fn start( - &self, - fw: &mut dyn SourceChannelForwarder, - _last_checkpoint: SourceState, - ) -> Result<(), BoxedError> { - for n in 0..10u64 { + fn start(&self, fw: &mut dyn SourceChannelForwarder) -> Result<(), BoxedError> { + for _ in 0..10 { fw.send( IngestionMessage::OperationEvent { table_index: 0, @@ -125,7 +122,7 @@ impl Source for TestSource { ), ]), }, - state: Some(n.to_be_bytes().to_vec().into()), + state: None, }, DEFAULT_PORT_HANDLE, ) diff --git a/dozer-tests/src/sql_tests/helper/pipeline.rs b/dozer-tests/src/sql_tests/helper/pipeline.rs index 2998fdf3cc..ce603273ae 100644 --- a/dozer-tests/src/sql_tests/helper/pipeline.rs +++ b/dozer-tests/src/sql_tests/helper/pipeline.rs @@ -107,6 +107,7 @@ impl SourceFactory for TestSourceFactory { fn build( &self, _output_schemas: HashMap, + _last_checkpoint: SourceState, ) -> Result, BoxedError> { Ok(Box::new(TestSource { name_to_port: self.name_to_port.to_owned(), @@ -122,11 +123,7 @@ pub struct TestSource { } impl Source for TestSource { - fn start( - &self, - fw: &mut dyn SourceChannelForwarder, - _last_checkpoint: SourceState, - ) -> Result<(), BoxedError> { + fn start(&self, fw: &mut dyn SourceChannelForwarder) -> Result<(), BoxedError> { while let Ok(Some((schema_name, op))) = self.receiver.recv() { let port = self.name_to_port.get(&schema_name).expect("port not found"); fw.send(