Skip to content

Commit

Permalink
refactor: Change OpIdentifier to RestartableState (#2263)
Browse files Browse the repository at this point in the history
* refactor: Change `OpIdentifier` to `RestartableState`

* clippy fix

* fmt
  • Loading branch information
chubei authored Jan 3, 2024
1 parent 0dc64b4 commit 0455c07
Show file tree
Hide file tree
Showing 56 changed files with 277 additions and 306 deletions.
2 changes: 1 addition & 1 deletion dozer-cli/src/cli/helper.rs
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,7 @@ async fn load_config(
ignore_pipe: bool,
) -> Result<(Config, Vec<String>), CliError> {
let read_stdin = atty::isnt(Stream::Stdin) && !ignore_pipe;
let first_config_path = config_url_or_paths.get(0);
let first_config_path = config_url_or_paths.first();
match first_config_path {
None => Err(ConfigurationFilePathNotProvided),
Some(path) => {
Expand Down
4 changes: 2 additions & 2 deletions dozer-cli/src/pipeline/connector_source.rs
Original file line number Diff line number Diff line change
Expand Up @@ -262,12 +262,12 @@ impl Source for ConnectorSource {
.iter()
.zip(&self.ports)
.map(|(table, port)| {
let checkpoint = last_checkpoint.get(port).copied().flatten();
let state = last_checkpoint.get(port).cloned().flatten();
TableToIngest {
schema: table.schema.clone(),
name: table.name.clone(),
column_names: table.column_names.clone(),
checkpoint,
state,
}
})
.collect::<Vec<_>>();
Expand Down
2 changes: 1 addition & 1 deletion dozer-core/src/builder_dag.rs
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@ impl BuilderDag {
last_checkpoint_by_name
.as_mut()
.and_then(|last_checkpoint| {
last_checkpoint.remove(&port_name).flatten()
last_checkpoint.remove(&port_name).flatten().cloned()
}),
);
}
Expand Down
10 changes: 5 additions & 5 deletions dozer-core/src/checkpoint/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ use dozer_types::{
bincode,
log::{error, info},
models::app_config::{DataStorage, RecordStore},
node::{NodeHandle, OpIdentifier, SourceStates, TableState},
node::{NodeHandle, RestartableState, SourceStates, TableState},
parking_lot::Mutex,
tonic::codegen::tokio_stream::StreamExt,
types::Field,
Expand Down Expand Up @@ -114,7 +114,7 @@ impl OptionCheckpoint {
pub fn get_source_state(
&self,
node_handle: &NodeHandle,
) -> Result<Option<HashMap<String, Option<OpIdentifier>>>, ExecutionError> {
) -> Result<Option<HashMap<String, Option<&RestartableState>>>, ExecutionError> {
let Some(checkpoint) = self.checkpoint.as_ref() else {
return Ok(None);
};
Expand All @@ -124,17 +124,17 @@ impl OptionCheckpoint {

let mut result = HashMap::new();
for (table_name, state) in source_state {
let id = match state {
let state = match state {
TableState::NotStarted => None,
TableState::NonRestartable => {
return Err(ExecutionError::SourceCannotRestart {
source_name: node_handle.clone(),
table_name: table_name.clone(),
});
}
TableState::Restartable(id) => Some(*id),
TableState::Restartable(state) => Some(state),
};
result.insert(table_name.clone(), id);
result.insert(table_name.clone(), state);
}
Ok(Some(result))
}
Expand Down
6 changes: 3 additions & 3 deletions dozer-core/src/forwarder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -222,12 +222,12 @@ impl SourceChannelManager {
request_termination: bool,
) -> Result<bool, ExecutionError> {
match message {
IngestionMessage::OperationEvent { op, id, .. } => {
IngestionMessage::OperationEvent { op, state, .. } => {
let port_name = self.port_names[&port].clone();
self.current_op_ids.insert(
port_name,
if let Some(id) = id {
TableState::Restartable(id)
if let Some(state) = state {
TableState::Restartable(state)
} else {
TableState::NonRestartable
},
Expand Down
4 changes: 2 additions & 2 deletions dozer-core/src/node.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ use dozer_recordstore::{ProcessorRecordStore, ProcessorRecordStoreDeserializer};

use dozer_log::storage::{Object, Queue};
use dozer_types::errors::internal::BoxedError;
use dozer_types::node::OpIdentifier;
use dozer_types::node::RestartableState;
use dozer_types::serde::{Deserialize, Serialize};
use dozer_types::tonic::async_trait;
use dozer_types::types::Schema;
Expand Down Expand Up @@ -54,7 +54,7 @@ pub trait SourceFactory: Send + Sync + Debug {
) -> Result<Box<dyn Source>, BoxedError>;
}

pub type SourceState = HashMap<PortHandle, Option<OpIdentifier>>;
pub type SourceState = HashMap<PortHandle, Option<RestartableState>>;

pub trait Source: Send + Sync + Debug {
fn start(
Expand Down
4 changes: 2 additions & 2 deletions dozer-core/src/tests/dag_base_errors.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ use dozer_log::tokio;
use dozer_recordstore::{ProcessorRecordStore, ProcessorRecordStoreDeserializer};
use dozer_types::errors::internal::BoxedError;
use dozer_types::models::ingestion_types::IngestionMessage;
use dozer_types::node::{NodeHandle, OpIdentifier};
use dozer_types::node::NodeHandle;
use dozer_types::tonic::async_trait;
use dozer_types::types::{
Field, FieldDefinition, FieldType, Operation, Record, Schema, SourceDefinition,
Expand Down Expand Up @@ -378,7 +378,7 @@ impl Source for ErrGeneratorSource {
Field::String(format!("value_{n}")),
]),
},
id: Some(OpIdentifier::new(n, 0)),
state: Some(n.to_be_bytes().to_vec().into()),
},
GENERATOR_SOURCE_OUTPUT_PORT,
)?;
Expand Down
22 changes: 10 additions & 12 deletions dozer-core/src/tests/sources.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ use crate::node::{OutputPortDef, OutputPortType, PortHandle, Source, SourceFacto
use crate::DEFAULT_PORT_HANDLE;
use dozer_types::errors::internal::BoxedError;
use dozer_types::models::ingestion_types::IngestionMessage;
use dozer_types::node::OpIdentifier;
use dozer_types::types::{
Field, FieldDefinition, FieldType, Operation, Record, Schema, SourceDefinition,
};
Expand Down Expand Up @@ -96,15 +95,14 @@ impl Source for GeneratorSource {
fw: &mut dyn SourceChannelForwarder,
last_checkpoint: SourceState,
) -> Result<(), BoxedError> {
let start = last_checkpoint
.values()
.copied()
.next()
.flatten()
.unwrap_or(OpIdentifier::new(0, 0))
.txid;
let state = last_checkpoint.values().next().and_then(|state| {
state
.as_ref()
.map(|state| u64::from_be_bytes(state.0.as_slice().try_into().unwrap()))
});
let start = state.map(|state| state + 1).unwrap_or(0);

for n in start + 1..(start + self.count + 1) {
for n in start..(start + self.count) {
fw.send(
IngestionMessage::OperationEvent {
table_index: 0,
Expand All @@ -114,7 +112,7 @@ impl Source for GeneratorSource {
Field::String(format!("value_{n}")),
]),
},
id: Some(OpIdentifier::new(n, 0)),
state: Some(n.to_be_bytes().to_vec().into()),
},
GENERATOR_SOURCE_OUTPUT_PORT,
)?;
Expand Down Expand Up @@ -237,7 +235,7 @@ impl Source for DualPortGeneratorSource {
Field::String(format!("value_{n}")),
]),
},
id: Some(OpIdentifier::new(n, 0)),
state: Some(n.to_be_bytes().to_vec().into()),
},
DUAL_PORT_GENERATOR_SOURCE_OUTPUT_PORT_1,
)?;
Expand All @@ -250,7 +248,7 @@ impl Source for DualPortGeneratorSource {
Field::String(format!("value_{n}")),
]),
},
id: Some(OpIdentifier::new(n, 0)),
state: Some(n.to_be_bytes().to_vec().into()),
},
DUAL_PORT_GENERATOR_SOURCE_OUTPUT_PORT_2,
)?;
Expand Down
6 changes: 3 additions & 3 deletions dozer-ingestion/connector/src/ingestor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -96,15 +96,15 @@ mod tests {
.handle_message(IngestionMessage::OperationEvent {
table_index: 0,
op: operation.clone(),
id: None,
state: None,
})
.await
.unwrap();
ingestor
.handle_message(IngestionMessage::OperationEvent {
table_index: 0,
op: operation2.clone(),
id: None,
state: None,
})
.await
.unwrap();
Expand All @@ -121,7 +121,7 @@ mod tests {
IngestionMessage::OperationEvent {
table_index: 0,
op,
id: None
state: None
},
msg
);
Expand Down
8 changes: 4 additions & 4 deletions dozer-ingestion/connector/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
use std::fmt::Debug;

use dozer_types::errors::internal::BoxedError;
use dozer_types::node::OpIdentifier;
use dozer_types::node::RestartableState;
use dozer_types::serde;
use dozer_types::serde::{Deserialize, Serialize};
pub use dozer_types::tonic::async_trait;
Expand Down Expand Up @@ -146,8 +146,8 @@ pub struct TableToIngest {
pub name: String,
/// The column names to be mapped.
pub column_names: Vec<String>,
/// The checkpoint to start after.
pub checkpoint: Option<OpIdentifier>,
/// The state to restart after.
pub state: Option<RestartableState>,
}

impl TableToIngest {
Expand All @@ -156,7 +156,7 @@ impl TableToIngest {
schema: table_info.schema,
name: table_info.name,
column_names: table_info.column_names,
checkpoint: None,
state: None,
}
}
}
4 changes: 2 additions & 2 deletions dozer-ingestion/deltalake/src/reader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ impl DeltaLakeReader {
table: &TableToIngest,
ingestor: &Ingestor,
) -> Result<(), BoxedError> {
assert!(table.checkpoint.is_none());
assert!(table.state.is_none());

let table_path = table_path(&self.config, &table.name)?;
let ctx = SessionContext::new();
Expand Down Expand Up @@ -75,7 +75,7 @@ impl DeltaLakeReader {
lifetime: None,
},
},
id: None,
state: None,
})
.await
.unwrap();
Expand Down
27 changes: 21 additions & 6 deletions dozer-ingestion/dozer/src/connector.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ use dozer_ingestion_connector::{
default_buffer_size, default_log_batch_size, default_timeout, IngestionMessage,
NestedDozerConfig, NestedDozerLogOptions,
},
node::OpIdentifier,
node::RestartableState,
serde_json,
tonic::{async_trait, transport::Channel},
types::{FieldType, Operation, Record, Schema},
Expand Down Expand Up @@ -208,10 +208,11 @@ async fn read_table(
reader_builder: LogReaderBuilder,
sender: Sender<IngestionMessage>,
) -> Result<(), NestedDozerConnectorError> {
let starting_point = table_info
.checkpoint
.map(|checkpoint| checkpoint.seq_in_tx + 1)
.unwrap_or(0);
let state = table_info
.state
.map(|state| decode_state(&state))
.transpose()?;
let starting_point = state.map(|pos| pos + 1).unwrap_or(0);
let mut reader = reader_builder.build(starting_point);
let schema = reader.schema.schema.clone();
let map = SchemaMapper::new(schema, &table_info.column_names)?;
Expand Down Expand Up @@ -243,12 +244,26 @@ async fn read_table(
.send(IngestionMessage::OperationEvent {
table_index,
op,
id: Some(OpIdentifier::new(0, op_and_pos.pos)),
state: Some(encode_state(op_and_pos.pos)),
})
.await;
}
}

fn encode_state(pos: u64) -> RestartableState {
pos.to_be_bytes().to_vec().into()
}

fn decode_state(state: &RestartableState) -> Result<u64, NestedDozerConnectorError> {
Ok(u64::from_be_bytes(
state
.0
.as_slice()
.try_into()
.map_err(|_| NestedDozerConnectorError::CorruptedState)?,
))
}

struct SchemaMapper {
source_schema: Schema,
fields: Vec<usize>,
Expand Down
3 changes: 3 additions & 0 deletions dozer-ingestion/dozer/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,9 @@ use dozer_log::errors::{ReaderBuilderError, ReaderError};

#[derive(Error, Debug)]
enum NestedDozerConnectorError {
#[error("Failed to parse checkpoint state")]
CorruptedState,

#[error("Failed to connect to upstream dozer at {0}: {1:?}")]
ConnectionError(String, #[source] dozer_types::tonic::transport::Error),

Expand Down
2 changes: 1 addition & 1 deletion dozer-ingestion/ethereum/src/log/connector.rs
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ impl EthLogConnector {
.map(|t| vec![H256::from_str(t).unwrap()])
.collect();
builder.topics(
topics.get(0).cloned(),
topics.first().cloned(),
topics.get(1).cloned(),
topics.get(2).cloned(),
topics.get(3).cloned(),
Expand Down
2 changes: 1 addition & 1 deletion dozer-ingestion/ethereum/src/log/helper.rs
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ pub fn decode_event(
// Topics 0, 1, 2 should be name, buyer, seller in most cases
let name = log
.topics
.get(0)
.first()
.expect("name is expected")
.to_owned()
.to_string();
Expand Down
4 changes: 2 additions & 2 deletions dozer-ingestion/ethereum/src/log/sender.rs
Original file line number Diff line number Diff line change
Expand Up @@ -222,7 +222,7 @@ async fn process_log(details: Arc<EthDetails<'_>>, msg: Log) {
.handle_message(IngestionMessage::OperationEvent {
table_index,
op,
id: None,
state: None,
})
.await
.is_err()
Expand All @@ -245,7 +245,7 @@ async fn process_log(details: Arc<EthDetails<'_>>, msg: Log) {
.handle_message(IngestionMessage::OperationEvent {
table_index,
op,
id: None,
state: None,
})
.await;
} else {
Expand Down
2 changes: 1 addition & 1 deletion dozer-ingestion/ethereum/src/trace/connector.rs
Original file line number Diff line number Diff line change
Expand Up @@ -155,7 +155,7 @@ pub async fn run(
.handle_message(IngestionMessage::OperationEvent {
table_index: 0, // We have only one table
op,
id: None,
state: None,
})
.await
.is_err()
Expand Down
2 changes: 1 addition & 1 deletion dozer-ingestion/grpc/src/adapter/arrow.rs
Original file line number Diff line number Diff line change
Expand Up @@ -115,7 +115,7 @@ pub async fn handle_message(
.handle_message(IngestionMessage::OperationEvent {
table_index,
op,
id: None,
state: None,
})
.await
.is_err()
Expand Down
2 changes: 1 addition & 1 deletion dozer-ingestion/grpc/src/adapter/default.rs
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,7 @@ pub async fn handle_message(
.handle_message(IngestionMessage::OperationEvent {
table_index,
op,
id: None,
state: None,
})
.await;
Ok(())
Expand Down
2 changes: 1 addition & 1 deletion dozer-ingestion/javascript/src/js_extension/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -120,7 +120,7 @@ async fn send(ingestor: Ingestor, val: JsMessage) -> Result<(), Error> {
IngestionMessage::OperationEvent {
table_index: 0,
op,
id: None,
state: None,
}
}
};
Expand Down
Loading

0 comments on commit 0455c07

Please sign in to comment.