Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

chore: Move last_checkpoint from Source::start to SourceFactory::build #2264

Merged
merged 1 commit into from
Jan 5, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
46 changes: 17 additions & 29 deletions dozer-cli/src/pipeline/connector_source.rs
Original file line number Diff line number Diff line change
Expand Up @@ -181,14 +181,20 @@ impl SourceFactory for ConnectorSourceFactory {
fn build(
&self,
_output_schemas: HashMap<PortHandle, Schema>,
mut last_checkpoint: SourceState,
) -> Result<Box<dyn Source>, BoxedError> {
// Construct the tables to ingest.
let tables = self
.tables
.iter()
.map(|table| TableInfo {
schema: table.schema_name.clone(),
name: table.name.clone(),
column_names: table.columns.clone(),
.map(|table| {
let state = last_checkpoint.remove(&table.port).flatten();
TableToIngest {
schema: table.schema_name.clone(),
name: table.name.clone(),
column_names: table.columns.clone(),
state,
}
})
.collect();
let ports = self.tables.iter().map(|table| table.port).collect();
Expand Down Expand Up @@ -221,7 +227,7 @@ impl SourceFactory for ConnectorSourceFactory {

#[derive(Debug)]
pub struct ConnectorSource {
tables: Vec<TableInfo>,
tables: Vec<TableToIngest>,
ports: Vec<PortHandle>,
connector: Box<dyn Connector>,
runtime: Arc<Runtime>,
Expand All @@ -235,11 +241,7 @@ pub struct ConnectorSource {
/// Name of the counter that `ConnectorSource::start` passes to `describe_counter!`.
const SOURCE_OPERATION_COUNTER_NAME: &str = "source_operation";

impl Source for ConnectorSource {
fn start(
&self,
fw: &mut dyn SourceChannelForwarder,
last_checkpoint: SourceState,
) -> Result<(), BoxedError> {
fn start(&self, fw: &mut dyn SourceChannelForwarder) -> Result<(), BoxedError> {
thread::scope(|scope| {
describe_counter!(
SOURCE_OPERATION_COUNTER_NAME,
Expand All @@ -256,22 +258,6 @@ impl Source for ConnectorSource {
let shutdown_future = self.shutdown.create_shutdown_future();
let (abort_handle, abort_registration) = AbortHandle::new_pair();

// Construct the tables to ingest.
let tables = self
.tables
.iter()
.zip(&self.ports)
.map(|(table, port)| {
let state = last_checkpoint.get(port).cloned().flatten();
TableToIngest {
schema: table.schema.clone(),
name: table.name.clone(),
column_names: table.column_names.clone(),
state,
}
})
.collect::<Vec<_>>();

// Abort the connector when we shut down
// TODO: pass a `CancellationToken` to the connector to allow
// it to gracefully shut down.
Expand All @@ -281,9 +267,11 @@ impl Source for ConnectorSource {
abort_handle.abort();
eprintln!("Aborted connector {}", name);
});
let result =
Abortable::new(self.connector.start(&ingestor, tables), abort_registration)
.await;
let result = Abortable::new(
self.connector.start(&ingestor, self.tables.clone()),
abort_registration,
)
.await;
match result {
Ok(Ok(_)) => {}
Ok(Err(e)) => {
Expand Down
13 changes: 4 additions & 9 deletions dozer-core/src/builder_dag.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ use crate::{
checkpoint::OptionCheckpoint,
dag_schemas::{DagHaveSchemas, DagSchemas, EdgeType},
errors::ExecutionError,
node::{Processor, Sink, Source, SourceState},
node::{Processor, Sink, Source},
NodeKind as DagNodeKind,
};

Expand All @@ -26,10 +26,7 @@ pub struct NodeType {
#[derive(Debug)]
/// Node kind: source, processor, or sink.
pub enum NodeKind {
Source {
source: Box<dyn Source>,
last_checkpoint: SourceState,
},
Source(Box<dyn Source>),
Processor(Box<dyn Processor>),
Sink(Box<dyn Sink>),
}
Expand Down Expand Up @@ -91,15 +88,13 @@ impl BuilderDag {
output_schemas
.remove(&node_index)
.expect("we collected all output schemas"),
last_checkpoint,
)
.map_err(ExecutionError::Factory)?;

NodeType {
handle: node.handle,
kind: NodeKind::Source {
source,
last_checkpoint,
},
kind: NodeKind::Source(source),
}
}
DagNodeKind::Processor(processor) => {
Expand Down
13 changes: 3 additions & 10 deletions dozer-core/src/executor/source_node.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ use crate::{
dag_schemas::EdgeKind,
errors::ExecutionError,
forwarder::SourceChannelManager,
node::{PortHandle, Source, SourceState},
node::{PortHandle, Source},
};

use super::{execution_dag::ExecutionDag, node::Node, ExecutorOptions};
Expand All @@ -35,8 +35,6 @@ pub struct SourceSenderNode {
node_handle: NodeHandle,
/// The source.
source: Box<dyn Source>,
/// Last checkpointed output data sequence numbers.
last_checkpoint: SourceState,
/// The forwarder that will be passed to the source for outputting data.
forwarder: InternalChannelSourceForwarder,
}
Expand All @@ -49,7 +47,7 @@ impl SourceSenderNode {

impl Node for SourceSenderNode {
fn run(mut self) -> Result<(), ExecutionError> {
let result = self.source.start(&mut self.forwarder, self.last_checkpoint);
let result = self.source.start(&mut self.forwarder);
debug!("[{}-sender] Quit", self.node_handle);
result.map_err(ExecutionError::Source)
}
Expand Down Expand Up @@ -144,11 +142,7 @@ pub async fn create_source_nodes(
panic!("Must pass in a node")
};
let node_handle = node.handle;
let NodeKind::Source {
source,
last_checkpoint,
} = node.kind
else {
let NodeKind::Source(source) = node.kind else {
panic!("Must pass in a source node");
};
let port_names = dag
Expand All @@ -171,7 +165,6 @@ pub async fn create_source_nodes(
let source_sender_node = SourceSenderNode {
node_handle: node_handle.clone(),
source,
last_checkpoint,
forwarder,
};

Expand Down
7 changes: 2 additions & 5 deletions dozer-core/src/node.rs
Original file line number Diff line number Diff line change
Expand Up @@ -51,17 +51,14 @@ pub trait SourceFactory: Send + Sync + Debug {
/// Builds the source from its output schemas and the last checkpoint.
///
/// `output_schemas` holds one `Schema` per `PortHandle`. `last_checkpoint` is a
/// `SourceState`, i.e. an optional `RestartableState` per `PortHandle`.
///
/// Returns the boxed `Source`, or a `BoxedError` if it cannot be built.
fn build(
&self,
output_schemas: HashMap<PortHandle, Schema>,
last_checkpoint: SourceState,
) -> Result<Box<dyn Source>, BoxedError>;
}

/// Per-port state of a source: maps each `PortHandle` to an optional `RestartableState`.
///
/// Passed to `SourceFactory::build` as `last_checkpoint`.
pub type SourceState = HashMap<PortHandle, Option<RestartableState>>;

pub trait Source: Send + Sync + Debug {
fn start(
&self,
fw: &mut dyn SourceChannelForwarder,
last_checkpoint: SourceState,
) -> Result<(), BoxedError>;
fn start(&self, fw: &mut dyn SourceChannelForwarder) -> Result<(), BoxedError>;
}

#[async_trait]
Expand Down
3 changes: 2 additions & 1 deletion dozer-core/src/tests/app.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ use crate::app::{App, AppPipeline, PipelineEntryPoint};
use crate::appsource::{AppSourceManager, AppSourceMappings};
use crate::checkpoint::create_checkpoint_for_test;
use crate::executor::DagExecutor;
use crate::node::{OutputPortDef, PortHandle, Source, SourceFactory};
use crate::node::{OutputPortDef, PortHandle, Source, SourceFactory, SourceState};
use crate::tests::dag_base_run::{
NoopJoinProcessorFactory, NOOP_JOIN_LEFT_INPUT_PORT, NOOP_JOIN_RIGHT_INPUT_PORT,
};
Expand Down Expand Up @@ -40,6 +40,7 @@ impl SourceFactory for NoneSourceFactory {
/// Not implemented for this test factory: both arguments are ignored and any call
/// panics via `todo!()`.
fn build(
&self,
_output_schemas: HashMap<PortHandle, Schema>,
_last_checkpoint: SourceState,
) -> Result<Box<dyn Source>, BoxedError> {
todo!()
}
Expand Down
2 changes: 2 additions & 0 deletions dozer-core/src/tests/dag_base_create_errors.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ use crate::checkpoint::create_checkpoint_for_test;
use crate::executor::DagExecutor;
use crate::node::{
OutputPortDef, OutputPortType, PortHandle, Processor, ProcessorFactory, Source, SourceFactory,
SourceState,
};
use crate::{Dag, Endpoint, DEFAULT_PORT_HANDLE};

Expand Down Expand Up @@ -60,6 +61,7 @@ impl SourceFactory for CreateErrSourceFactory {
fn build(
&self,
_output_schemas: HashMap<PortHandle, Schema>,
_last_checkpoint: SourceState,
) -> Result<Box<dyn Source>, BoxedError> {
if self.panic {
panic!("Generated error");
Expand Down
7 changes: 2 additions & 5 deletions dozer-core/src/tests/dag_base_errors.rs
Original file line number Diff line number Diff line change
Expand Up @@ -344,6 +344,7 @@ impl SourceFactory for ErrGeneratorSourceFactory {
fn build(
&self,
_output_schemas: HashMap<PortHandle, Schema>,
_last_checkpoint: SourceState,
) -> Result<Box<dyn Source>, BoxedError> {
Ok(Box::new(ErrGeneratorSource {
count: self.count,
Expand All @@ -359,11 +360,7 @@ pub(crate) struct ErrGeneratorSource {
}

impl Source for ErrGeneratorSource {
fn start(
&self,
fw: &mut dyn SourceChannelForwarder,
_checkpoint: SourceState,
) -> Result<(), BoxedError> {
fn start(&self, fw: &mut dyn SourceChannelForwarder) -> Result<(), BoxedError> {
for n in 1..(self.count + 1) {
if n == self.err_at {
return Err("Generated Error".to_string().into());
Expand Down
2 changes: 2 additions & 0 deletions dozer-core/src/tests/dag_ports.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
use crate::node::{
OutputPortDef, OutputPortType, PortHandle, Processor, ProcessorFactory, Source, SourceFactory,
SourceState,
};
use crate::{Dag, Endpoint, DEFAULT_PORT_HANDLE};
use dozer_recordstore::ProcessorRecordStoreDeserializer;
Expand Down Expand Up @@ -38,6 +39,7 @@ impl SourceFactory for DynPortsSourceFactory {
/// Not implemented for this test factory: both arguments are ignored and any call
/// panics via `todo!()`.
fn build(
&self,
_input_schemas: HashMap<PortHandle, Schema>,
_last_checkpoint: SourceState,
) -> Result<Box<dyn Source>, BoxedError> {
todo!()
}
Expand Down
4 changes: 3 additions & 1 deletion dozer-core/src/tests/dag_schemas.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
use crate::dag_schemas::{DagHaveSchemas, DagSchemas};
use crate::node::{
OutputPortDef, OutputPortType, PortHandle, Processor, ProcessorFactory, SinkFactory, Source,
SourceFactory,
SourceFactory, SourceState,
};
use crate::{Dag, Endpoint, DEFAULT_PORT_HANDLE};

Expand Down Expand Up @@ -63,6 +63,7 @@ impl SourceFactory for TestUsersSourceFactory {
/// Not implemented for this test factory: both arguments are ignored and any call
/// panics via `todo!()`.
fn build(
&self,
_input_schemas: HashMap<PortHandle, Schema>,
_last_checkpoint: SourceState,
) -> Result<Box<dyn Source>, BoxedError> {
todo!()
}
Expand Down Expand Up @@ -109,6 +110,7 @@ impl SourceFactory for TestCountriesSourceFactory {
/// Not implemented for this test factory: both arguments are ignored and any call
/// panics via `todo!()`.
fn build(
&self,
_input_schemas: HashMap<PortHandle, Schema>,
_last_checkpoint: SourceState,
) -> Result<Box<dyn Source>, BoxedError> {
todo!()
}
Expand Down
32 changes: 14 additions & 18 deletions dozer-core/src/tests/sources.rs
Original file line number Diff line number Diff line change
Expand Up @@ -75,8 +75,16 @@ impl SourceFactory for GeneratorSourceFactory {
fn build(
&self,
_input_schemas: HashMap<PortHandle, Schema>,
last_checkpoint: SourceState,
) -> Result<Box<dyn Source>, BoxedError> {
let state = last_checkpoint.values().next().and_then(|state| {
state
.as_ref()
.map(|state| u64::from_be_bytes(state.0.as_slice().try_into().unwrap()))
});
let start = state.map(|state| state + 1).unwrap_or(0);
Ok(Box::new(GeneratorSource {
start,
count: self.count,
running: self.running.clone(),
}))
Expand All @@ -85,24 +93,14 @@ impl SourceFactory for GeneratorSourceFactory {

/// Test source whose `start` method loops over `n` in `start..(start + count)` and
/// calls `fw.send` for each `n`.
#[derive(Debug)]
pub(crate) struct GeneratorSource {
/// First value of `n` in the loop. `GeneratorSourceFactory::build` sets it to the
/// checkpointed value plus one, or to 0 when there is no checkpointed state.
start: u64,
/// Number of consecutive values of `n` the loop covers.
count: u64,
/// Shared flag cloned from the factory's `running` field.
running: Arc<AtomicBool>,
}

impl Source for GeneratorSource {
fn start(
&self,
fw: &mut dyn SourceChannelForwarder,
last_checkpoint: SourceState,
) -> Result<(), BoxedError> {
let state = last_checkpoint.values().next().and_then(|state| {
state
.as_ref()
.map(|state| u64::from_be_bytes(state.0.as_slice().try_into().unwrap()))
});
let start = state.map(|state| state + 1).unwrap_or(0);

for n in start..(start + self.count) {
fn start(&self, fw: &mut dyn SourceChannelForwarder) -> Result<(), BoxedError> {
for n in self.start..(self.start + self.count) {
fw.send(
IngestionMessage::OperationEvent {
table_index: 0,
Expand Down Expand Up @@ -205,6 +203,7 @@ impl SourceFactory for DualPortGeneratorSourceFactory {
fn build(
&self,
_input_schemas: HashMap<PortHandle, Schema>,
_last_checkpoint: SourceState,
) -> Result<Box<dyn Source>, BoxedError> {
Ok(Box::new(DualPortGeneratorSource {
count: self.count,
Expand All @@ -220,11 +219,7 @@ pub(crate) struct DualPortGeneratorSource {
}

impl Source for DualPortGeneratorSource {
fn start(
&self,
fw: &mut dyn SourceChannelForwarder,
_last_checkpoint: SourceState,
) -> Result<(), BoxedError> {
fn start(&self, fw: &mut dyn SourceChannelForwarder) -> Result<(), BoxedError> {
for n in 1..(self.count + 1) {
fw.send(
IngestionMessage::OperationEvent {
Expand Down Expand Up @@ -285,6 +280,7 @@ impl SourceFactory for ConnectivityTestSourceFactory {
/// Intentionally unimplemented: as the panic message states, this factory exists only
/// for connectivity tests and defines output ports only. Any call panics.
fn build(
&self,
_output_schemas: HashMap<PortHandle, Schema>,
_last_checkpoint: SourceState,
) -> Result<Box<dyn Source>, BoxedError> {
unimplemented!("This struct is for connectivity test, only output ports are defined")
}
Expand Down
11 changes: 4 additions & 7 deletions dozer-sql/src/tests/builder_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,7 @@ impl SourceFactory for TestSourceFactory {
/// Builds a boxed `TestSource`, ignoring both the output schemas and the last checkpoint.
fn build(
&self,
_output_schemas: HashMap<PortHandle, Schema>,
_last_checkpoint: SourceState,
) -> Result<Box<dyn Source>, BoxedError> {
Ok(Box::new(TestSource {}))
}
Expand All @@ -106,12 +107,8 @@ impl SourceFactory for TestSourceFactory {
pub struct TestSource {}

impl Source for TestSource {
fn start(
&self,
fw: &mut dyn SourceChannelForwarder,
_last_checkpoint: SourceState,
) -> Result<(), BoxedError> {
for n in 0..10u64 {
fn start(&self, fw: &mut dyn SourceChannelForwarder) -> Result<(), BoxedError> {
for _ in 0..10 {
fw.send(
IngestionMessage::OperationEvent {
table_index: 0,
Expand All @@ -125,7 +122,7 @@ impl Source for TestSource {
),
]),
},
state: Some(n.to_be_bytes().to_vec().into()),
state: None,
},
DEFAULT_PORT_HANDLE,
)
Expand Down
Loading
Loading