diff --git a/README.md b/README.md
index ff37b8fdcf..c971899975 100644
--- a/README.md
+++ b/README.md
@@ -1,88 +1,53 @@
## Overview
-Dozer is a **data platform for building, deploying and maintaining real-time data products.**
-It is ideal for companies with multiple databases, data warehouses and data lakes that are in need of combining, aggregating and transforming data in real time, and create customer facing or internal data applications.
-*Put it simply, Dozer empowers a single developer go from data sources to ready-made APIs in just a few minutes. All with just a with a simple configuration file.*
+Dozer is a **real-time data movement tool that uses Change Data Capture (CDC) to move data from various sources to multiple sinks.**
-## How it works
-Dozer pulls data from various sources like databases, data lakes, and data warehouses using Change Data Capture (CDC) and periodic polling mechanisms. This ensures up-to-date data ingestion in real-time or near-real-time.
-After capturing data, Dozer offers the possibility of combining, transforming and aggregating it
-using its own internal real-time transformation engine. It supports Streaming SQL, WebAssembly (coming soon) and TypeScript (coming soon), as well as ONNX for performing AI predictions in real-time.
-After processing, data is stored and indexed in a low-latency datastore (based on [LMDB](https://github.com/LMDB/lmdb)), queryable using REST and gRPC.
+Dozer is orders of magnitude faster than Debezium+Kafka and natively supports stateless transformations.
+It is primarily used for moving data into warehouses. In our own application, we move data to **ClickHouse** and build data APIs and integrations with LLMs.
## How to use it
-### ① Build
-A Dozer application consists of a YAML file that can be run locally using the Dozer Live UI or Dozer CLI. As YAML is edited,
-changes are immediately reflected on Dozer Live UI.
-
-### ② Test
-Dozer can run the entire infrastructure locally. You can inspect data flowing in in real time or use the built-it API explorer to query data through REST and gRPC. Dozer Live explorer also provides ready-made samples to integrate results into your front-end applications.
-
-### ③ Deploy
-Dozer applications can be self-hosted or deployed in the cloud with a single command. Dozer Cloud (coming soon) provides self-healing and monitoring capabilities, making sure your APIs are always available.
-## Supported Sources and Tranformation Engines
-Dozer currently supports a variety of source databases, data warehouses and object stores. Whenever possible, Dozer leverages Change Data Capture (CDC) to keep data always fresh. For sources that do not support CDC, periodic polling is used.
-Dozer transformations can be executed using Dozer's highly cutomizable streaming SQL engine, which provides UDF supports in WASM (coming soon), TypeScript (coming soon) and ONNX.
-Here is an overview of all supported source types and transformation engines:
-
-## Why Dozer ?
-As teams embark on the journey of implementing real-time data products, they invariably come across a host of challenges that can make the task seem daunting:
-1. **Integration with Various Systems**: Integrating with various data sources can present numerous technical hurdles and interoperability issues.
-2. **Managing Latency**: Ensuring low-latency data access, especially for customer-facing applications, can be a significant challenge.
-3. **Real-Time Data Transformation**: Managing real-time data transformations, especially when dealing with complex queries or large volumes of data, can be difficult and resource-intensive.
-4. **Maintaining Data Freshness**: Keeping the data up-to-date in real-time, particularly when it's sourced from multiple locations like databases, data lakes, or warehouses, can be a daunting task.
-4. **Scalability and High Availability**: Building a data application that can efficiently handle high-volume operations and remain reliable under heavy loads requires advanced architecture design and robust infrastructure.
-To address all the above issues, teams often find themselves stitching together multiple technologies and a significant amount of custom code. This could involve integrating diverse systems like Kafka for real-time data streaming, Redis for low-latency data access and caching, and Spark or Flink for processing and analyzing streaming data.
-
-The complexity of such a setup can become overwhelming. Ensuring that these different technologies communicate effectively, maintaining them, and handling potential failure points requires extensive effort and expertise.
-This is where Dozer steps in, aiming to dramatically simplify this process. Dozer is designed as an all-in-one backend solution that integrates the capabilities of these disparate technologies into a single, streamlined tool. By doing so, Dozer offers the capacity to build an end-to-end real-time data product without the need to manage multiple technologies and extensive custom code.
-Dozer's goal is to empower a single engineer or a small team of engineers to fully manage the entire lifecycle of a Data Product!
-## Getting Started
-Follow the links below to get started with Dozer:
-- [Installation](https://getdozer.io/docs/installation)
-- [Build a sample application using NY Taxi dataset](https://getdozer.io/docs/getting_started)
-For a more comprehensive list of samples check out our [GitHub Samples repo](https://github.com/getdozer/dozer-samples)
+Dozer runs with a single configuration file like the following:
+app_name: dozer-bench
+version: 1
+connections:
+  - name: pg_1
+    config: !Postgres
+      user: user
+      password: postgres
+      host: localhost
+      port: 5432
+      database: customers
+sinks:
+  - name: customers
+    config: !Dummy
+      table_name: customers
+The full configuration reference can be found in [`config.rs`](https://github.com/getdozer/dozer/blob/main/dozer-types/src/models/config.rs#L15).
+## Supported Sources
+| Connector | Extraction | Resuming | Enterprise |
+| -------------------- | ---------- | -------- | ------------------- |
+| Postgres | ✅ | ✅ | ✅ |
+| MySQL | ✅ | ✅ | ✅ |
+| Snowflake | ✅ | ✅ | ✅ |
+| Kafka | ✅ | 🚧 | ✅ |
+| MongoDB | ✅ | 🎯 | ✅ |
+| Amazon S3 | ✅ | 🎯 | ✅ |
+| Google Cloud Storage | ✅ | 🎯 | ✅ |
+| Oracle | ✅ | ✅ | **Enterprise Only** |
+| Aerospike | ✅ | ✅ | **Enterprise Only** |
+## Supported Sinks
+| Database | Connectivity | Enterprise |
+| ---------- | ------------ | ------------------- |
+| ClickHouse | ✅ | |
+| Postgres | ✅ | |
+| MySQL | ✅ | |
+| BigQuery | ✅ | |
+| Oracle | ✅ | **Enterprise Only** |
+| Aerospike | ✅ | **Enterprise Only** |
\ No newline at end of file
- .map(|table| {
- table
- .schema
- .clone()
- .unwrap_or_else(|| self.username.clone())
- })
- .collect::>();
- let table_columns =
- listing::TableColumn::list(&self.connection, &schemas.into_iter().collect::>())?;
- let mut table_to_columns = HashMap::<(String, String), Vec>::new();
- for table_column in table_columns {
- let table_pair = (table_column.owner, table_column.table_name);
- table_to_columns
- .entry(table_pair)
- .or_default()
- .push(table_column.column_name);
- }
- // Collect columns for requested tables.
- let mut result = vec![];
- for table in tables {
- let schema = table
- .schema
- .clone()
- .unwrap_or_else(|| self.username.clone());
- let table_pair = (schema, table.name.clone());
- let column_names = table_to_columns
- .remove(&table_pair)
- .ok_or_else(|| Error::TableNotFound(table.clone()))?;
- result.push(TableInfo {
- schema: table.schema,
- name: table.name,
- column_names,
- });
- }
- self.connection.commit()?;
- Ok(result)
- }
- pub fn get_schemas(
- &mut self,
- table_infos: &[TableInfo],
- ) -> Result>, Error> {
- // Collect all tables and columns.
- let schemas = table_infos
- .iter()
- .map(|table| {
- table
- .schema
- .clone()
- .unwrap_or_else(|| self.username.clone())
- })
- .collect::>()
- .into_iter()
- .collect::>();
- let table_columns = listing::TableColumn::list(&self.connection, &schemas)?;
- let constraint_columns =
- listing::ConstraintColumn::list(&self.connection, &schemas).unwrap();
- let constraints = listing::Constraint::list(&self.connection, &schemas).unwrap();
- let table_columns =
- join::join_columns_constraints(table_columns, constraint_columns, constraints);
- // Map all the columns.
- let mut table_columns = mapping::map_tables(table_columns);
- // Decide `SourceSchemaResult` for each `table_info`
- let mut result = vec![];
- for table_info in table_infos {
- let schema = table_info
- .schema
- .clone()
- .unwrap_or_else(|| self.username.clone());
- let table_pair = (schema, table_info.name.clone());
- let columns = table_columns.remove(&table_pair).ok_or_else(|| {
- Error::TableNotFound(TableIdentifier {
- schema: table_info.schema.clone(),
- name: table_info.name.clone(),
- })
- })?;
- result.push(mapping::decide_schema(
- &self.connection_name,
- table_info.schema.clone(),
- table_pair.1,
- &table_info.column_names,
- columns,
- ));
- }
- self.connection.commit()?;
- Ok(result)
- }
- pub fn snapshot(&mut self, ingestor: &Ingestor, tables: Vec) -> Result {
- let schemas = self
- .get_schemas(&tables)?
- .into_iter()
- .collect::, _>>()?;
- debug!("{}", sql);
- self.connection.execute(sql, &[])?;
- for (table_index, (table, schema)) in tables.into_iter().zip(schemas).enumerate() {
- let columns = table.column_names.join(", ");
- let owner = table.schema.unwrap_or_else(|| self.username.clone());
- let sql = format!("SELECT {} FROM {}.{}", columns, owner, table.name);
- debug!("{}", sql);
- let rows = self.connection.query(&sql, &[])?;
- let mut batch = Vec::with_capacity(self.batch_size);
- for row in rows {
- batch.push(mapping::map_row(&schema.schema, row?)?);
- if batch.len() >= self.batch_size
- && ingestor
- .blocking_handle_message(IngestionMessage::OperationEvent {
- table_index,
- op: Operation::BatchInsert {
- new: std::mem::take(&mut batch),
- },
- id: None,
- })
- .is_err()
- {
- return self.get_scn_and_commit();
- }
- }
- if !batch.is_empty()
- && ingestor
- .blocking_handle_message(IngestionMessage::OperationEvent {
- table_index,
- op: Operation::BatchInsert { new: batch },
- id: None,
- })
- .is_err()
- {
- return self.get_scn_and_commit();
- }
- }
- self.get_scn_and_commit()
- }
- fn get_scn_and_commit(&mut self) -> Result {
- let scn = self.connection.query_row_as::(sql, &[])?;
- self.connection.commit()?;
- Ok(scn)
- }
- pub fn replicate(
- &mut self,
- ingestor: &Ingestor,
- tables: Vec,
- schemas: Vec,
- checkpoint: Scn,
- con_id: Option,
- ) {
- match self.replicator {
- OracleReplicator::LogMiner {
- poll_interval_in_milliseconds,
- } => self.replicate_log_miner(
- ingestor,
- tables,
- schemas,
- checkpoint,
- con_id,
- Duration::from_millis(poll_interval_in_milliseconds),
- ),
- OracleReplicator::DozerLogReader => unimplemented!("dozer log reader"),
- }
- }
- fn replicate_log_miner(
- &mut self,
- ingestor: &Ingestor,
- tables: Vec,
- schemas: Vec,
- checkpoint: Scn,
- con_id: Option,
- poll_interval: Duration,
- ) {
- let start_scn = checkpoint + 1;
- let table_pair_to_index = tables
- .into_iter()
- .enumerate()
- .map(|(index, table)| {
- let schema = table.schema.unwrap_or_else(|| self.username.clone());
- ((schema, table.name), index)
- })
- .collect::>();
- let processor = replicate::Processor::new(start_scn, table_pair_to_index, schemas);
- let (sender, receiver) = std::sync::mpsc::sync_channel(100);
- let handle = {
- let connection = self.connection.clone();
- let ingestor = ingestor.clone();
- std::thread::spawn(move || {
- replicate::log_miner_loop(
- &connection,
- start_scn,
- con_id,
- poll_interval,
- sender,
- &ingestor,
- )
- })
- };
- for transaction in processor.process(receiver) {
- let transaction = match transaction {
- Ok(transaction) => transaction,
- Err(e) => {
- error!("Error during transaction processing: {e}");
- continue;
- }
- };
- for (seq, (table_index, op)) in transaction.operations.into_iter().enumerate() {
- if ingestor
- .blocking_handle_message(IngestionMessage::OperationEvent {
- table_index,
- op,
- id: Some(OpIdentifier::new(transaction.commit_scn, seq as u64)),
- })
- .is_err()
- {
- return;
- };
- }
- if ingestor
- .blocking_handle_message(IngestionMessage::TransactionInfo(
- TransactionInfo::Commit {
- id: Some(OpIdentifier::new(transaction.commit_scn, 0)),
- source_time: Some(SourceTime::from_chrono(
- &transaction.commit_timestamp,
- 1000,
- )),
- },
- ))
- .is_err()
- {
- return;
- }
- }
- handle.join().unwrap();
- }
-mod join;
-mod listing;
-mod mapping;
-mod replicate;
-fn temp_varray_of_vchar2(
- connection: &Connection,
- num_strings: usize,
- max_num_chars: usize,
-) -> Result {
- let sql = format!(
- TEMP_DOZER_TYPE_NAME, num_strings, max_num_chars
- );
- debug!("{}", sql);
- connection.execute(&sql, &[])?;
- connection
- .object_type(TEMP_DOZER_TYPE_NAME)
- .map_err(Into::into)
-fn string_collection(connection: &Connection, strings: &[String]) -> Result {
- let temp_type = temp_varray_of_vchar2(
- connection,
- strings.len(),
- strings.iter().map(|s| s.len()).max().unwrap(),
- )?;
- let mut collection = temp_type.new_collection()?;
- for string in strings {
- collection.push(&str_to_sql!(*string))?;
- }
- Ok(collection)
-mod tests {
- #[test]
- #[ignore]
- fn test_connector() {
- use dozer_ingestion_connector::{
- dozer_types::models::ingestion_types::OracleReplicator, IngestionConfig, Ingestor,
- };
- use dozer_ingestion_connector::{
- dozer_types::{models::ingestion_types::IngestionMessage, types::Operation},
- IngestionIterator,
- };
- use std::time::Instant;
- fn row_count(message: &IngestionMessage) -> usize {
- match message {
- IngestionMessage::OperationEvent { op, .. } => match op {
- Operation::BatchInsert { new } => new.len(),
- Operation::Insert { .. } => 1,
- Operation::Delete { .. } => 1,
- Operation::Update { .. } => 1,
- },
- _ => 0,
- }
- }
- fn estimate_throughput(iterator: IngestionIterator) {
- let mut tic = None;
- let mut count = 0;
- let print_count_interval = 10_000;
- let mut count_mod_interval = 0;
- for message in iterator {
- if tic.is_none() {
- tic = Some(Instant::now());
- }
- count += row_count(&message);
- let new_count_mod_interval = count / print_count_interval;
- if new_count_mod_interval > count_mod_interval {
- count_mod_interval = new_count_mod_interval;
- println!("{} rows in {:?}", count, tic.unwrap().elapsed());
- }
- }
- println!("{} rows in {:?}", count, tic.unwrap().elapsed());
- println!(
- "Throughput: {} rows/s",
- count as f64 / tic.unwrap().elapsed().as_secs_f64()
- );
- }
- env_logger::init();
- let replicate_user = "DOZER";
- let data_user = "DOZER";
- let host = "database-1.cxtwfj9nkwtu.ap-southeast-1.rds.amazonaws.com";
- let sid = "ORCL";
- let mut connector = super::Connector::new(
- "oracle".into(),
- replicate_user.into(),
- "123",
- &format!("{}:{}/{}", host, 1521, sid),
- 100_000,
- OracleReplicator::DozerLogReader,
- )
- .unwrap();
- let tables = connector.list_tables(&[data_user.into()]).unwrap();
- let tables = connector.list_columns(tables).unwrap();
- let schemas = connector.get_schemas(&tables).unwrap();
- let schemas = schemas.into_iter().map(Result::unwrap).collect::>();
- dbg!(&schemas);
- let (ingestor, iterator) = Ingestor::initialize_channel(IngestionConfig::default());
- let handle = {
- let tables = tables.clone();
- std::thread::spawn(move || connector.snapshot(&ingestor, tables))
- };
- estimate_throughput(iterator);
- let checkpoint = handle.join().unwrap().unwrap();
- let mut connector = super::Connector::new(
- "oracle".into(),
- replicate_user.into(),
- "123",
- &format!("{}:{}/{}", host, 1521, sid),
- 1,
- OracleReplicator::LogMiner {
- poll_interval_in_milliseconds: 1000,
- },
- )
- .unwrap();
- let (ingestor, iterator) = Ingestor::initialize_channel(IngestionConfig::default());
- let schemas = schemas.into_iter().map(|schema| schema.schema).collect();
- let handle = std::thread::spawn(move || {
- connector.replicate(&ingestor, tables, schemas, checkpoint, None)
- });
- estimate_throughput(iterator);
- handle.join().unwrap();
- }
diff --git a/dozer-ingestion/oracle/src/connector/replicate/log/listing.rs b/dozer-ingestion/oracle/src/connector/replicate/log/listing.rs
deleted file mode 100644
index 533747130a..0000000000
--- a/dozer-ingestion/oracle/src/connector/replicate/log/listing.rs
+++ /dev/null
@@ -1,152 +0,0 @@
-use dozer_ingestion_connector::dozer_types::log::{debug, warn};
-use oracle::Connection;
-use crate::connector::{Error, Scn};
-#[derive(Debug, Clone)]
-pub struct ArchivedLog {
- pub name: String,
- pub sequence: u32,
- pub first_change: Scn,
- pub next_change: Scn,
-impl ArchivedLog {
- pub fn list(connection: &Connection, start_scn: Scn) -> Result, Error> {
- debug!("{}, {}", sql, start_scn);
- let rows = connection
- .query_as::<(String, u32, Scn, Scn)>(sql, &[&start_scn])
- .unwrap();
- let mut result = vec![];
- for row in rows {
- let (name, sequence, first_change, next_change) = row?;
- let log = ArchivedLog {
- name,
- sequence,
- first_change,
- next_change,
- };
- if is_continuous(result.last(), &log) {
- result.push(log);
- }
- }
- Ok(result)
- }
-#[derive(Debug, Clone, Copy)]
-pub struct Log {
- pub group: u32,
- pub sequence: u32,
- pub first_change: Scn,
- pub next_change: Scn,
-impl Log {
- pub fn list(connection: &Connection, start_scn: Scn) -> Result, Error> {
- debug!("{}, {}", sql, start_scn);
- let rows = connection
- .query_as::<(u32, u32, Scn, Scn)>(sql, &[&start_scn])
- .unwrap();
- let mut result = vec![];
- for row in rows {
- let (group, sequence, first_change, next_change) = row?;
- let log = Log {
- group,
- sequence,
- first_change,
- next_change,
- };
- if is_continuous(result.last(), &log) {
- result.push(log);
- }
- }
- Ok(result)
- }
-#[derive(Debug, Clone)]
-pub struct LogFile {
- pub group: u32,
- pub member: String,
-impl LogFile {
- pub fn list(connection: &Connection) -> Result, Error> {
- debug!("{}", sql);
- let rows = connection.query_as::<(u32, String)>(sql, &[]).unwrap();
- let mut result = vec![];
- for row in rows {
- let (group, member) = row?;
- let log_file = LogFile { group, member };
- result.push(log_file);
- }
- Ok(result)
- }
-pub trait HasLogIdentifier {
- fn sequence(&self) -> u32;
- fn first_change(&self) -> Scn;
- fn next_change(&self) -> Scn;
-impl HasLogIdentifier for ArchivedLog {
- fn sequence(&self) -> u32 {
- self.sequence
- }
- fn first_change(&self) -> Scn {
- self.first_change
- }
- fn next_change(&self) -> Scn {
- self.next_change
- }
-impl HasLogIdentifier for Log {
- fn sequence(&self) -> u32 {
- self.sequence
- }
- fn first_change(&self) -> Scn {
- self.first_change
- }
- fn next_change(&self) -> Scn {
- self.next_change
- }
-pub fn is_continuous(
- last_log: Option<&impl HasLogIdentifier>,
- current_log: &impl HasLogIdentifier,
-) -> bool {
- let Some(last_log) = last_log else {
- return true;
- };
- let sequence_is_continuous = last_log.sequence() + 1 == current_log.sequence();
- let scn_is_continuous = last_log.next_change() == current_log.first_change();
- if sequence_is_continuous != scn_is_continuous {
- warn!(
- "Log {} has next change {}, but log {} has first change {}",
- last_log.sequence(),
- last_log.next_change(),
- current_log.sequence(),
- current_log.first_change()
- );
- }
- sequence_is_continuous && scn_is_continuous
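Review note: the continuity rule deleted with `listing.rs` accepts a log only when both its sequence number and its SCN range follow directly from the previously kept log. The sketch below restates that rule in a self-contained form. `LogId` and `keep_continuous` are hypothetical names introduced for illustration, and `Scn` is assumed to be a plain `u64`; the removed code used the `HasLogIdentifier` trait instead.

```rust
/// Hypothetical stand-in for the deleted `ArchivedLog` / `Log` types.
#[derive(Debug, Clone, Copy, PartialEq)]
struct LogId {
    sequence: u32,
    first_change: u64, // assumption: Scn is an unsigned integer
    next_change: u64,
}

/// A log continues the previous one when the sequence increments by one
/// and its first SCN equals the previous log's next SCN.
fn is_continuous(last: Option<&LogId>, current: &LogId) -> bool {
    let Some(last) = last else {
        return true; // the first log is always accepted
    };
    last.sequence + 1 == current.sequence && last.next_change == current.first_change
}

/// Keeps each log that directly follows the last kept log and skips the rest,
/// mirroring the loop in the deleted `list` functions.
fn keep_continuous(logs: &[LogId]) -> Vec<LogId> {
    let mut result: Vec<LogId> = Vec::new();
    for log in logs {
        if is_continuous(result.last(), log) {
            result.push(*log);
        }
    }
    result
}
```

Unlike the removed function, this sketch does not log a warning when only one of the two conditions holds.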
diff --git a/dozer-ingestion/oracle/src/connector/replicate/log/merge.rs b/dozer-ingestion/oracle/src/connector/replicate/log/merge.rs
deleted file mode 100644
index c51f092bfc..0000000000
--- a/dozer-ingestion/oracle/src/connector/replicate/log/merge.rs
+++ /dev/null
@@ -1,52 +0,0 @@
-use std::collections::HashMap;
-use oracle::Connection;
-use crate::connector::{Error, Scn};
-use super::listing::{is_continuous, ArchivedLog, Log, LogFile};
-pub fn list_and_join_online_log(
- connection: &Connection,
- start_scn: Scn,
-) -> Result, Error> {
- let logs = Log::list(connection, start_scn)?;
- let log_files = LogFile::list(connection)?;
- let mut log_files = log_files
- .into_iter()
- .map(|log_file| (log_file.group, log_file.member))
- .collect::>();
- let mut result = vec![];
- for log in logs {
- if let Some(name) = log_files.remove(&log.group) {
- let archived_log = ArchivedLog {
- name,
- sequence: log.sequence,
- first_change: log.first_change,
- next_change: log.next_change,
- };
- result.push(archived_log);
- } else {
- // We only want continuous logs
- break;
- }
- }
- Ok(result)
-pub fn list_and_merge_archived_log(
- connection: &Connection,
- start_scn: Scn,
- mut online_logs: Vec,
-) -> Result, Error> {
- let mut archived_logs = ArchivedLog::list(connection, start_scn)?;
- let first_continuous_online_log_index = online_logs
- .iter()
- .position(|log| is_continuous(archived_logs.last(), log));
- if let Some(index) = first_continuous_online_log_index {
- archived_logs.extend(online_logs.drain(index..));
- }
- Ok(archived_logs)
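Review note: the removed `list_and_merge_archived_log` appends online logs to the archived list, starting at the first online log that directly continues the last archived one. A minimal sketch of that merge step follows, without any database access. `RedoLog`, `follows`, and `merge_logs` are hypothetical names, and SCNs are assumed to be `u64`.

```rust
/// Hypothetical, simplified log descriptor (the deleted code used `ArchivedLog`).
#[derive(Debug, Clone, PartialEq)]
struct RedoLog {
    name: String,
    sequence: u32,
    first_change: u64, // assumption: Scn is an unsigned integer
    next_change: u64,
}

/// True when `current` directly continues `last`, or when there is no `last`.
fn follows(last: Option<&RedoLog>, current: &RedoLog) -> bool {
    match last {
        None => true,
        Some(last) => {
            last.sequence + 1 == current.sequence && last.next_change == current.first_change
        }
    }
}

/// Appends the online logs to the archived ones, starting from the first online
/// log that continues the archived list. Earlier online logs are dropped because
/// the archived list already covers them.
fn merge_logs(mut archived: Vec<RedoLog>, mut online: Vec<RedoLog>) -> Vec<RedoLog> {
    let first = online.iter().position(|log| follows(archived.last(), log));
    if let Some(index) = first {
        archived.extend(online.drain(index..));
    }
    archived
}
```

If no online log continues the archived list, only the archived logs are returned, which matches the behaviour of the removed function.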
diff --git a/dozer-ingestion/oracle/src/connector/replicate/log/mod.rs b/dozer-ingestion/oracle/src/connector/replicate/log/mod.rs
deleted file mode 100644
index aa0a15a098..0000000000
--- a/dozer-ingestion/oracle/src/connector/replicate/log/mod.rs
+++ /dev/null
@@ -1,176 +0,0 @@
-use std::{sync::mpsc::SyncSender, time::Duration};
-use dozer_ingestion_connector::dozer_types::log::debug;
-use dozer_ingestion_connector::{
- dozer_types::{
- chrono::{DateTime, Utc},
- log::{error, info},
- },
- Ingestor,
-use oracle::Connection;
-use crate::connector::{Error, Scn};
-mod listing;
-mod merge;
-mod redo;
-pub type TransactionId = [u8; 8];
-#[derive(Debug, Clone)]
-/// This is a raw row from V$LOGMNR_CONTENTS
-pub struct LogManagerContent {
- pub scn: Scn,
- pub timestamp: DateTime,
- pub xid: TransactionId,
- pub pxid: TransactionId,
- pub operation_code: u8,
- pub seg_owner: Option,
- pub table_name: Option,
- pub rbasqn: u32,
- pub rbablk: u32,
- pub rbabyte: u16,
- pub sql_redo: Option,
- pub csf: u8,
-/// `ingestor` is only used for checking if ingestion has ended so we can break the loop.
-pub fn log_miner_loop(
- connection: &Connection,
- start_scn: Scn,
- con_id: Option,
- poll_interval: Duration,
- sender: SyncSender,
- ingestor: &Ingestor,
-) {
- log_reader_loop(
- connection,
- start_scn,
- con_id,
- poll_interval,
- redo::LogMiner,
- sender,
- ingestor,
- )
-fn log_reader_loop(
- connection: &Connection,
- mut start_scn: Scn,
- con_id: Option,
- poll_interval: Duration,
- reader: impl redo::RedoReader,
- sender: SyncSender,
- ingestor: &Ingestor,
-) {
- #[derive(Debug, Clone, Copy)]
- struct LastRba {
- sqn: u32,
- blk: u32,
- byte: u16,
- }
- let mut last_rba: Option = None;
- loop {
- debug!(target: "oracle_replication", "Listing logs starting from SCN {}", start_scn);
- let mut logs = match list_logs(connection, start_scn) {
- Ok(logs) => logs,
- Err(e) => {
- if ingestor.is_closed() {
- return;
- }
- error!("Error listing logs: {}. Retrying.", e);
- continue;
- }
- };
- if logs.is_empty() {
- if ingestor.is_closed() {
- return;
- }
- info!("No logs found, retrying after {:?}", poll_interval);
- std::thread::sleep(poll_interval);
- continue;
- }
- 'replicate_logs: while !logs.is_empty() {
- let log = logs.remove(0);
- debug!(target: "oracle_replication",
- "Reading log {} ({}) ({}, {}), starting from {:?}",
- log.name, log.sequence, log.first_change, log.next_change, last_rba
- );
- let iterator = {
- let last_rba = last_rba.and_then(|last_rba| {
- if log.sequence == last_rba.sqn {
- Some((last_rba.blk, last_rba.byte))
- } else {
- None
- }
- });
- match reader.read(connection, &log.name, last_rba, con_id) {
- Ok(iterator) => iterator,
- Err(e) => {
- if ingestor.is_closed() {
- return;
- }
- error!("Error reading log {}: {}. Retrying.", log.name, e);
- break 'replicate_logs;
- }
- }
- };
- for content in iterator {
- let content = match content {
- Ok(content) => content,
- Err(e) => {
- if ingestor.is_closed() {
- return;
- }
- error!("Error reading log {}: {}. Retrying.", log.name, e);
- break 'replicate_logs;
- }
- };
- last_rba = Some(LastRba {
- sqn: content.rbasqn,
- blk: content.rbablk,
- byte: content.rbabyte,
- });
- if sender.send(content).is_err() {
- return;
- }
- }
- if logs.is_empty() {
- if ingestor.is_closed() {
- return;
- }
- debug!(target: "oracle_replication", "Read all logs, retrying after {:?}", poll_interval);
- std::thread::sleep(poll_interval);
- } else {
- // If there are more logs, we need to start from the next log's first change.
- start_scn = log.next_change;
- }
- }
- }
-fn list_logs(connection: &Connection, start_scn: Scn) -> Result, Error> {
- let logs = merge::list_and_join_online_log(connection, start_scn)?;
- if !log_contains_scn(logs.first(), start_scn) {
- info!(
- "Online log is empty or doesn't contain start scn {}, listing and merging archived logs",
- start_scn
- );
- merge::list_and_merge_archived_log(connection, start_scn, logs)
- } else {
- Ok(logs)
- }
-fn log_contains_scn(log: Option<&listing::ArchivedLog>, scn: Scn) -> bool {
- log.map_or(false, |log| {
- log.first_change <= scn && log.next_change > scn
- })
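Review note: the removed `list_logs` falls back to archived logs whenever the first online log does not cover the start SCN, treating each log as a half-open range `[first_change, next_change)`. The sketch below isolates that decision. `ScnRange` and `needs_archived_logs` are hypothetical names, and SCNs are assumed to be `u64`.

```rust
/// Hypothetical SCN range of a single redo log.
#[derive(Debug, Clone, Copy)]
struct ScnRange {
    first_change: u64, // assumption: Scn is an unsigned integer
    next_change: u64,
}

/// A log contains `scn` when first_change <= scn < next_change.
/// A missing log contains nothing.
fn contains_scn(log: Option<&ScnRange>, scn: u64) -> bool {
    log.map_or(false, |log| log.first_change <= scn && scn < log.next_change)
}

/// Archived logs are needed when the online list is empty or its first
/// entry does not cover the start SCN.
fn needs_archived_logs(online: &[ScnRange], start_scn: u64) -> bool {
    !contains_scn(online.first(), start_scn)
}
```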
diff --git a/dozer-ingestion/oracle/src/connector/replicate/log/redo/log_miner.rs b/dozer-ingestion/oracle/src/connector/replicate/log/redo/log_miner.rs
deleted file mode 100644
index 675db2ba58..0000000000
--- a/dozer-ingestion/oracle/src/connector/replicate/log/redo/log_miner.rs
+++ /dev/null
@@ -1,148 +0,0 @@
-use dozer_ingestion_connector::dozer_types::{
- chrono::{DateTime, Utc},
- log::{error, trace},
-use oracle::{Connection, ResultSet, RowValue};
-use crate::connector::{Error, Scn};
-use super::{LogManagerContent, RedoReader};
-#[derive(Debug, Clone, Copy)]
-pub struct LogMiner;
-pub struct LogMinerIter<'a> {
- result_set: ResultSet<'a, LogManagerContent>,
- connection: &'a Connection,
-impl<'a> Drop for LogMinerIter<'a> {
- fn drop(&mut self) {
- trace!("{}", sql);
- if let Err(e) = self.connection.execute(sql, &[]) {
- error!("Failed to end log miner: {}", e);
- }
- }
-impl<'a> Iterator for LogMinerIter<'a> {
- type Item = Result;
- fn next(&mut self) -> Option {
- self.result_set.next().map(|row| row.map_err(Into::into))
- }
-impl RedoReader for LogMiner {
- type Iterator<'a> = LogMinerIter<'a>;
- fn read<'a>(
- &self,
- connection: &'a Connection,
- log_file_name: &str,
- last_rba: Option<(u32, u16)>,
- con_id: Option,
- ) -> Result, Error> {
- let sql =
- trace!("{}, {}", sql, log_file_name);
- connection.execute(sql, &[&str_to_sql!(log_file_name)])?;
- let sql = "
- );
- END;";
- trace!("{}", sql);
- connection.execute(sql, &[])?;
- let rba_filter = "(RBABLK > :last_blk OR (RBABLK = :last_blk AND RBABYTE > :last_byte))";
- let con_id_filter = "SRC_CON_ID = :con_id";
- let result_set = match (last_rba, con_id) {
- (Some((last_blk, last_byte)), Some(con_id)) => {
- let sql = format!("{} WHERE {} AND {}", base_sql, rba_filter, con_id_filter);
- trace!("{}, {}, {}, {}", sql, last_blk, last_byte, con_id);
- connection.query_as_named(
- &sql,
- &[
- ("last_blk", &last_blk),
- ("last_byte", &last_byte),
- ("con_id", &con_id),
- ],
- )
- }
- (Some((last_blk, last_byte)), None) => {
- let sql = format!("{} WHERE {}", base_sql, rba_filter);
- trace!("{}, {}, {}", sql, last_blk, last_byte);
- connection
- .query_as_named(&sql, &[("last_blk", &last_blk), ("last_byte", &last_byte)])
- }
- (None, Some(con_id)) => {
- let sql = format!("{} WHERE {}", base_sql, con_id_filter);
- trace!("{}, {}", sql, con_id);
- connection.query_as_named(&sql, &[("con_id", &con_id)])
- }
- (None, None) => {
- trace!("{}", base_sql);
- connection.query_as(base_sql, &[])
- }
- }?;
- Ok(LogMinerIter {
- result_set,
- connection,
- })
- }
-impl RowValue for LogManagerContent {
- fn get(row: &oracle::Row) -> oracle::Result {
- let (
- scn,
- timestamp,
- xid,
- pxid,
- operation_code,
- seg_owner,
- table_name,
- rbasqn,
- rbablk,
- rbabyte,
- sql_redo,
- csf,
- ) = <(
- Scn,
- DateTime,
- Vec,
- Vec,
- u8,
- Option,
- Option,
- u32,
- u32,
- u16,
- Option,
- u8,
- ) as RowValue>::get(row)?;
- Ok(LogManagerContent {
- scn,
- timestamp,
- xid: xid.try_into().expect("xid must be 8 bytes"),
- pxid: pxid.try_into().expect("pxid must be 8 bytes"),
- operation_code,
- seg_owner,
- table_name,
- rbasqn,
- rbablk,
- rbabyte,
- sql_redo,
- csf,
- })
- }
diff --git a/dozer-ingestion/oracle/src/connector/replicate/log/redo/mod.rs b/dozer-ingestion/oracle/src/connector/replicate/log/redo/mod.rs
deleted file mode 100644
index 7d011dfdab..0000000000
--- a/dozer-ingestion/oracle/src/connector/replicate/log/redo/mod.rs
+++ /dev/null
@@ -1,26 +0,0 @@
-use oracle::Connection;
-use crate::connector::Error;
-/// Given a log file name, a redo reader emits `LogManagerContent` rows
-pub trait RedoReader {
- type Iterator<'a>: Iterator- >;
- /// Reads the `LogManagerContent` rows that have:
- ///
- /// - scn >= start_scn
- /// - rba > last_rba.0 || (rba == last_rba.0 && rbabyte > last_rba.1)
- fn read<'a>(
- &self,
- connection: &'a Connection,
- log_file_name: &str,
- last_rba: Option<(u32, u16)>,
- con_id: Option
- ) -> Result, Error>;
-mod log_miner;
-pub use log_miner::LogMiner;
-use super::LogManagerContent;
diff --git a/dozer-ingestion/oracle/src/connector/replicate/mod.rs b/dozer-ingestion/oracle/src/connector/replicate/mod.rs
deleted file mode 100644
index 284bfee75b..0000000000
--- a/dozer-ingestion/oracle/src/connector/replicate/mod.rs
+++ /dev/null
@@ -1,5 +0,0 @@
-mod log;
-mod transaction;
-pub use log::log_miner_loop;
-pub use transaction::Processor;
diff --git a/dozer-ingestion/oracle/src/connector/replicate/transaction/aggregate/commit.rs b/dozer-ingestion/oracle/src/connector/replicate/transaction/aggregate/commit.rs
deleted file mode 100644
index ea06f2c3db..0000000000
--- a/dozer-ingestion/oracle/src/connector/replicate/transaction/aggregate/commit.rs
+++ /dev/null
@@ -1,39 +0,0 @@
-use dozer_ingestion_connector::dozer_types::{
- chrono::{DateTime, Utc},
- log::warn,
-use crate::connector::{replicate::log::TransactionId, Scn};
-use super::{Transaction, TransactionForest};
-pub fn commit(
- xid: TransactionId,
- pxid: TransactionId,
- scn: Scn,
- timestamp: DateTime,
- transaction_forest: &mut TransactionForest,
-) -> Option {
- let mut operations = vec![];
- transaction_forest.remove_subtree(xid, |_, ops| operations.extend(ops));
- if xid == pxid {
- // This is a top level transaction
- Some(Transaction {
- commit_scn: scn,
- commit_timestamp: timestamp,
- operations,
- })
- } else {
- // This is a sub transaction.
- let Some(parent_operations) = transaction_forest.get_mut(&pxid) else {
- warn!(
- "Parent transaction {:02X?} not found for sub transaction {:02X?}",
- pxid, xid
- );
- return None;
- };
- parent_operations.extend(operations);
- None
- }
diff --git a/dozer-ingestion/oracle/src/connector/replicate/transaction/aggregate/forest.rs b/dozer-ingestion/oracle/src/connector/replicate/transaction/aggregate/forest.rs
deleted file mode 100644
index 97de993ed9..0000000000
--- a/dozer-ingestion/oracle/src/connector/replicate/transaction/aggregate/forest.rs
+++ /dev/null
@@ -1,111 +0,0 @@
-use std::{
- collections::{hash_map::Entry, HashMap},
- hash::Hash,
-#[derive(Debug, Clone)]
-struct Node {
- data: T,
- parent: Option,
- children: Vec,
-impl Default for Node {
- fn default() -> Self {
- Self {
- data: T::default(),
- parent: None,
- children: vec![],
- }
- }
-#[derive(Debug, Clone, Default)]
-pub struct Forest {
- nodes: HashMap>,
-impl Forest {
- pub fn remove_subtree(&mut self, id: Id, mut f: impl FnMut(Id, T)) -> bool {
- let Some(node) = self.nodes.remove(&id) else {
- return false;
- };
- if let Some(parent) = node.parent.as_ref() {
- self.nodes
- .get_mut(parent)
- .unwrap()
- .children
- .retain(|child| child != &id);
- }
- let mut stack = vec![(id, node)];
- while let Some((id, node)) = stack.pop() {
- f(id, node.data);
- for child in node.children {
- let node = self.nodes.remove(&child).unwrap();
- stack.push((child, node));
- }
- }
- true
- }
- pub fn get_mut(&mut self, id: &Id) -> Option<&mut T> {
- self.nodes.get_mut(id).map(|node| &mut node.data)
- }
-impl Forest {
- pub fn insert_or_get_root(&mut self, id: Id) -> &mut T {
- &mut self.nodes.entry(id).or_default().data
- }
-impl<Id: Eq + Hash + Clone, T: Default> Forest<Id, T> {
- pub fn insert_or_get_child(&mut self, parent: Id, child: Id) -> Option<&mut T> {
- if !self.nodes.contains_key(&parent) {
- return None;
- }
- let is_new_child = if let Entry::Vacant(entry) = self.nodes.entry(child.clone()) {
- entry.insert(Node {
- data: T::default(),
- parent: Some(parent.clone()),
- children: vec![],
- });
- true
- } else {
- false
- };
- if is_new_child {
- self.nodes
- .get_mut(&parent)
- .unwrap()
- .children
- .push(child.clone());
- }
- Some(&mut self.nodes.get_mut(&child).unwrap().data)
- }
-mod tests {
- use super::*;
- #[test]
- fn test_transaction_forest() {
- let mut forest = Forest::>::default();
- let node1 = forest.insert_or_get_root(1);
- assert_eq!(node1, &vec![]);
- node1.push(());
- assert_eq!(forest.insert_or_get_root(2), &vec![]);
- assert_eq!(forest.insert_or_get_child(0, 3), None);
- let node3 = forest.insert_or_get_child(1, 3).unwrap();
- assert_eq!(node3, &vec![]);
- node3.extend([(), ()]);
- let mut collected = vec![];
- forest.remove_subtree(1, |_, data| collected.extend(data));
- assert_eq!(collected.len(), 3);
- assert_eq!(forest.insert_or_get_root(1), &vec![]);
- }
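For readers following the removed `forest.rs`, the sketch below restates the idea in a reduced form: operations are buffered per transaction id, sub-transactions hang off their parent, and removing a subtree drains every buffered operation beneath it. It is only an illustration under simplifying assumptions: ids are fixed to `u32`, payloads to `&'static str`, and the names `SketchForest`, `SketchNode`, `root`, `child`, and `drain_subtree` are hypothetical, not the names used in the deleted file.

```rust
use std::collections::HashMap;

/// One buffered transaction (hypothetical, simplified node type).
#[derive(Default)]
struct SketchNode {
    ops: Vec<&'static str>,
    parent: Option<u32>,
    children: Vec<u32>,
}

/// Forest keyed by transaction id; ids and payload types are assumptions.
#[derive(Default)]
struct SketchForest {
    nodes: HashMap<u32, SketchNode>,
}

impl SketchForest {
    /// Get (or create) the operation buffer of a top-level transaction.
    fn root(&mut self, id: u32) -> &mut Vec<&'static str> {
        &mut self.nodes.entry(id).or_default().ops
    }

    /// Get (or create) the buffer of a sub-transaction; `None` if the parent is unknown.
    fn child(&mut self, parent: u32, child: u32) -> Option<&mut Vec<&'static str>> {
        if !self.nodes.contains_key(&parent) {
            return None;
        }
        if !self.nodes.contains_key(&child) {
            let node = SketchNode { ops: vec![], parent: Some(parent), children: vec![] };
            self.nodes.insert(child, node);
            self.nodes.get_mut(&parent)?.children.push(child);
        }
        self.nodes.get_mut(&child).map(|node| &mut node.ops)
    }

    /// Remove `id` and all of its descendants, returning their operations.
    fn drain_subtree(&mut self, id: u32) -> Vec<&'static str> {
        let mut collected = Vec::new();
        let Some(node) = self.nodes.remove(&id) else {
            return collected;
        };
        // Unlink the removed node from its parent, if it has one.
        if let Some(parent) = node.parent {
            if let Some(parent_node) = self.nodes.get_mut(&parent) {
                parent_node.children.retain(|c| *c != id);
            }
        }
        // Depth-first walk over the detached subtree.
        let mut stack = vec![node];
        while let Some(node) = stack.pop() {
            collected.extend(node.ops);
            for child in node.children {
                if let Some(child_node) = self.nodes.remove(&child) {
                    stack.push(child_node);
                }
            }
        }
        collected
    }
}
```

A commit of a top-level transaction would correspond to `drain_subtree` on its id, while a rollback would call it and discard the result.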
diff --git a/dozer-ingestion/oracle/src/connector/replicate/transaction/aggregate/mod.rs b/dozer-ingestion/oracle/src/connector/replicate/transaction/aggregate/mod.rs
deleted file mode 100644
index bf54d3e1ef..0000000000
--- a/dozer-ingestion/oracle/src/connector/replicate/transaction/aggregate/mod.rs
+++ /dev/null
@@ -1,144 +0,0 @@
-use dozer_ingestion_connector::dozer_types::{
- chrono::{DateTime, Utc},
- log::{trace, warn},
-use crate::connector::{
- replicate::log::{LogManagerContent, TransactionId},
- Scn,
-#[derive(Debug, Clone)]
-pub struct Transaction {
- pub commit_scn: Scn,
- pub commit_timestamp: DateTime<Utc>,
- pub operations: Vec<Operation>,
-#[derive(Debug, Clone)]
-pub struct Operation {
- pub seg_owner: String,
- pub table_name: String,
- pub kind: OperationKind,
- pub sql_redo: String,
-#[derive(Debug, Clone, Copy)]
-pub enum OperationKind {
- Insert,
- Delete,
- Update,
-#[derive(Debug, Clone)]
-pub struct Aggregator {
- start_scn: Scn,
-impl Aggregator {
- pub fn new(start_scn: Scn) -> Self {
- Self { start_scn }
- }
- pub fn process(
- &self,
- iterator: impl Iterator<Item = LogManagerContent>,
- ) -> impl Iterator<Item = Transaction> {
- Processor {
- iterator,
- start_scn: self.start_scn,
- transaction_forest: Default::default(),
- }
- }
-type TransactionForest = forest::Forest<TransactionId, Vec<Operation>>;
-struct Processor<I: Iterator<Item = LogManagerContent>> {
- iterator: I,
- start_scn: Scn,
- transaction_forest: TransactionForest,
-impl<I: Iterator<Item = LogManagerContent>> Iterator for Processor<I> {
- type Item = Transaction;
- fn next(&mut self) -> Option<Self::Item> {
- loop {
- let content = self.iterator.next()?;
- if content.operation_code == OP_CODE_COMMIT {
- if let Some(transaction) = commit::commit(
- content.xid,
- content.pxid,
- content.scn,
- content.timestamp,
- &mut self.transaction_forest,
- ) {
- if transaction.commit_scn >= self.start_scn {
- return Some(transaction);
- }
- }
- continue;
- }
- if content.operation_code == OP_CODE_ROLLBACK {
- self.transaction_forest
- .remove_subtree(content.xid, |_, _| ());
- continue;
- }
- let Some(seg_owner) = content.seg_owner else {
- continue;
- };
- let Some(table_name) = content.table_name else {
- continue;
- };
- let (kind, sql_redo) = match content.operation_code {
- OP_CODE_INSERT => (
- OperationKind::Insert,
- content.sql_redo.expect("insert must have redo"),
- ),
- OP_CODE_DELETE => (
- OperationKind::Delete,
- content.sql_redo.expect("delete must have redo"),
- ),
- OP_CODE_UPDATE => (
- OperationKind::Update,
- content.sql_redo.expect("update must have redo"),
- ),
- OP_CODE_DDL => {
- warn!("Ignoring DDL operation: {:?}", content.sql_redo);
- continue;
- }
- _ => {
- trace!("Ignoring operation: {:?}", content.sql_redo);
- continue;
- }
- };
- op::process_operation(
- content.xid,
- content.pxid,
- Operation {
- seg_owner,
- table_name,
- kind,
- sql_redo,
- },
- &mut self.transaction_forest,
- );
- }
- }
-mod commit;
-mod forest;
-mod op;
-const OP_CODE_INSERT: u8 = 1;
-const OP_CODE_DELETE: u8 = 2;
-const OP_CODE_UPDATE: u8 = 3;
-const OP_CODE_DDL: u8 = 5;
-const OP_CODE_COMMIT: u8 = 7;
-const OP_CODE_ROLLBACK: u8 = 36;
diff --git a/dozer-ingestion/oracle/src/connector/replicate/transaction/aggregate/op.rs b/dozer-ingestion/oracle/src/connector/replicate/transaction/aggregate/op.rs
deleted file mode 100644
index ad393b4729..0000000000
--- a/dozer-ingestion/oracle/src/connector/replicate/transaction/aggregate/op.rs
+++ /dev/null
@@ -1,27 +0,0 @@
-use dozer_ingestion_connector::dozer_types::log::warn;
-use crate::connector::replicate::log::TransactionId;
-use super::{Operation, TransactionForest};
-pub fn process_operation(
- xid: TransactionId,
- pxid: TransactionId,
- operation: Operation,
- transaction_forest: &mut TransactionForest,
-) {
- if xid == pxid {
- // This is a top level transaction
- transaction_forest.insert_or_get_root(xid).push(operation);
- } else {
- // This is a sub transaction.
- let Some(operations) = transaction_forest.insert_or_get_child(pxid, xid) else {
- warn!(
- "Parent transaction {:02X?} not found for sub transaction {:02X?}",
- pxid, xid
- );
- return;
- };
- operations.push(operation);
- }
diff --git a/dozer-ingestion/oracle/src/connector/replicate/transaction/csf.rs b/dozer-ingestion/oracle/src/connector/replicate/transaction/csf.rs
deleted file mode 100644
index 3fd5dec366..0000000000
--- a/dozer-ingestion/oracle/src/connector/replicate/transaction/csf.rs
+++ /dev/null
@@ -1,48 +0,0 @@
-//! Handles the Continuation SQL flag in V$LOGMNR_CONTENTS.
-use crate::connector::replicate::log::LogManagerContent;
-/// Output items are guaranteed to have CSF = 0.
-pub fn process(
- iterator: impl Iterator<Item = LogManagerContent>,
-) -> impl Iterator<Item = LogManagerContent> {
- Processor {
- iterator,
- pending: None,
- }
-struct Processor<I: Iterator<Item = LogManagerContent>> {
- iterator: I,
- pending: Option<LogManagerContent>,
-impl<I: Iterator<Item = LogManagerContent>> Iterator for Processor<I> {
- type Item = LogManagerContent;
- fn next(&mut self) -> Option<Self::Item> {
- loop {
- let content = self.iterator.next()?;
- if let Some(mut previous_content) = self.pending.take() {
- previous_content.sql_redo = match (previous_content.sql_redo, content.sql_redo) {
- (Some(mut previous), Some(current)) => {
- previous.push_str(&current);
- Some(previous)
- }
- (previous, current) => previous.or(current),
- };
- if content.csf == 0 {
- previous_content.csf = 0;
- return Some(previous_content);
- } else {
- self.pending = Some(previous_content);
- }
- } else if content.csf == 0 {
- return Some(content);
- } else {
- self.pending = Some(content);
- }
- }
- }
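The continuation handling removed with `csf.rs` can be summarized with a small standalone sketch: rows whose CSF flag is non-zero are held back, their `sql_redo` fragments are concatenated, and a single merged row is emitted once a row with CSF = 0 arrives. The `Row` type and `merge_continuations` function below are hypothetical stand-ins that carry only the two relevant fields; they are not the `LogManagerContent` type or the iterator adapter from the deleted file, and the eager `Vec` output is a simplification of the lazy iterator.

```rust
/// Simplified stand-in for a LogMiner row (hypothetical; only the fields needed here).
#[derive(Debug, Clone, PartialEq)]
struct Row {
    sql_redo: Option<String>,
    csf: u8,
}

/// Merge each run of rows with a non-zero `csf` into the row that ends the run.
/// A trailing run that never sees `csf == 0` is dropped, mirroring an iterator
/// that ends while a row is still pending.
fn merge_continuations(rows: impl IntoIterator<Item = Row>) -> Vec<Row> {
    let mut merged = Vec::new();
    let mut pending: Option<Row> = None;
    for row in rows {
        match pending.take() {
            Some(mut previous) => {
                // Append the current fragment to the pending statement.
                previous.sql_redo = match (previous.sql_redo.take(), row.sql_redo) {
                    (Some(mut head), Some(tail)) => {
                        head.push_str(&tail);
                        Some(head)
                    }
                    (head, tail) => head.or(tail),
                };
                if row.csf == 0 {
                    // The statement is complete; emit it with the flag cleared.
                    previous.csf = 0;
                    merged.push(previous);
                } else {
                    pending = Some(previous);
                }
            }
            None if row.csf == 0 => merged.push(row),
            None => pending = Some(row),
        }
    }
    merged
}
```

Every row returned by this sketch has `csf == 0`, which is the guarantee the removed doc comment describes.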
diff --git a/dozer-ingestion/oracle/src/connector/replicate/transaction/map.rs b/dozer-ingestion/oracle/src/connector/replicate/transaction/map.rs
deleted file mode 100644
index 122ce3b5f9..0000000000
--- a/dozer-ingestion/oracle/src/connector/replicate/transaction/map.rs
+++ /dev/null
@@ -1,161 +0,0 @@
-use dozer_ingestion_connector::dozer_types::{
- chrono::{DateTime, FixedOffset, NaiveDate, NaiveDateTime, ParseError, Utc},
- ordered_float::OrderedFloat,
- rust_decimal::prelude::ToPrimitive,
- types::{Field, FieldType, Operation, Record, Schema},
-use crate::connector::Error;
-use super::{
- parse::{ParsedOperation, ParsedOperationKind, ParsedRow, ParsedTransaction, ParsedValue},
- Transaction,
-#[derive(Debug, Clone)]
-pub struct Mapper {
- schemas: Vec<Schema>,
-impl Mapper {
- pub fn new(schemas: Vec<Schema>) -> Self {
- Self { schemas }
- }
- pub fn process<'a>(
- &'a self,
- iterator: impl Iterator<Item = Result<ParsedTransaction, Error>> + 'a,
- ) -> impl Iterator<Item = Result<Transaction, Error>> + 'a {
- Processor {
- iterator,
- mapper: self,
- }
- }
- fn map(&self, operation: ParsedOperation) -> Result<(usize, Operation), Error> {
- let schema = &self.schemas[operation.table_index];
- Ok((
- operation.table_index,
- match operation.kind {
- ParsedOperationKind::Insert(row) => Operation::Insert {
- new: map_row(row, schema)?,
- },
- ParsedOperationKind::Delete(row) => Operation::Delete {
- old: map_row(row, schema)?,
- },
- ParsedOperationKind::Update { old, new } => Operation::Update {
- old: map_row(old, schema)?,
- new: map_row(new, schema)?,
- },
- },
- ))
- }
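-struct Processor<'a, I: Iterator<Item = Result<ParsedTransaction, Error>>> {
- iterator: I,
- mapper: &'a Mapper,
-impl<'a, I: Iterator<Item = Result<ParsedTransaction, Error>>> Iterator for Processor<'a, I> {
- type Item = Result<Transaction, Error>;
- fn next(&mut self) -> Option<Self::Item> {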
- let transaction = match self.iterator.next()? {
- Ok(transaction) => transaction,
- Err(err) => return Some(Err(err)),
- };
- let mut operations = vec![];
- for operation in transaction.operations {
- match self.mapper.map(operation) {
- Ok(operation) => operations.push(operation),
- Err(err) => return Some(Err(err)),
- }
- }
- Some(Ok(Transaction {
- commit_scn: transaction.commit_scn,
- commit_timestamp: transaction.commit_timestamp,
- operations,
- }))
- }
-fn map_row(mut row: ParsedRow, schema: &Schema) -> Result<Record, Error> {
- let mut values = vec![];
- for field in &schema.fields {
- let value = row
- .remove(&field.name)
- .ok_or_else(|| Error::FieldNotFound(field.name.clone()))?;
- values.push(map_value(value, field.typ, field.nullable, &field.name)?);
- }
- Ok(Record::new(values))
-fn map_value(
- value: ParsedValue,
- typ: FieldType,
- nullable: bool,
- name: &str,
-) -> Result<Field, Error> {
- match (value, typ, nullable) {
- (ParsedValue::Null, _, false) => Err(Error::NullValue(name.to_string())),
- (ParsedValue::Null, _, true) => Ok(Field::Null),
- (ParsedValue::String(string), FieldType::Float, _) => {
- Ok(Field::Float(OrderedFloat(string.parse()?)))
- }
- (ParsedValue::Number(number), FieldType::Float, _) => Ok(Field::Float(OrderedFloat(
- number
- .to_f64()
- .ok_or_else(|| Error::FloatOverflow(number))?,
- ))),
- (ParsedValue::String(string), FieldType::Decimal, _) => Ok(Field::Decimal(string.parse()?)),
- (ParsedValue::Number(number), FieldType::Decimal, _) => Ok(Field::Decimal(number)),
- (ParsedValue::Number(number), FieldType::Int, _) => Ok(Field::Int(
- number
- .to_i64()
- .ok_or_else(|| Error::ParseIntFailed(number))?,
- )),
- (ParsedValue::Number(number), FieldType::UInt, _) => Ok(Field::UInt(
- number
- .to_u64()
- .ok_or_else(|| Error::ParseUIntFailed(number))?,
- )),
- (ParsedValue::String(string), FieldType::String, _) => Ok(Field::String(string)),
- (ParsedValue::Number(_), FieldType::String, _) => Err(Error::TypeMismatch {
- field: name.to_string(),
- expected: FieldType::String,
- actual: FieldType::Decimal,
- }),
- (_, FieldType::Binary, _) => unimplemented!("parse binary from redo sql"),
- (ParsedValue::String(string), FieldType::Date, _) => Ok(Field::Date(
- parse_date(&string).map_err(|e| Error::ParseDateTime(e, string))?,
- )),
- (ParsedValue::Number(_), FieldType::Date, _) => Err(Error::TypeMismatch {
- field: name.to_string(),
- expected: FieldType::Date,
- actual: FieldType::Decimal,
- }),
- (ParsedValue::String(string), FieldType::Timestamp, _) => Ok(Field::Timestamp(
- parse_date_time(&string).map_err(|e| Error::ParseDateTime(e, string))?,
- )),
- (ParsedValue::Number(_), FieldType::Timestamp, _) => Err(Error::TypeMismatch {
- field: name.to_string(),
- expected: FieldType::Timestamp,
- actual: FieldType::Decimal,
- }),
- _ => unreachable!(),
- }
-fn parse_date(string: &str) -> Result<NaiveDate, ParseError> {
- NaiveDate::parse_from_str(string, "%d-%b-%y")
-fn parse_date_time(string: &str) -> Result<DateTime<FixedOffset>, ParseError> {
- let date_time = NaiveDateTime::parse_from_str(string, "%d-%b-%y %I.%M.%S%.6f %p")?;
- Ok(Ok(DateTime::<Utc>::from_naive_utc_and_offset(date_time, Utc))?.fixed_offset())
diff --git a/dozer-ingestion/oracle/src/connector/replicate/transaction/mod.rs b/dozer-ingestion/oracle/src/connector/replicate/transaction/mod.rs
deleted file mode 100644
index d12afc6c81..0000000000
--- a/dozer-ingestion/oracle/src/connector/replicate/transaction/mod.rs
+++ /dev/null
@@ -1,53 +0,0 @@
-use std::collections::HashMap;
-use dozer_ingestion_connector::dozer_types::{
- chrono::{DateTime, Utc},
- types::{Operation, Schema},
-use crate::connector::{Error, Scn};
-use super::log::LogManagerContent;
-#[derive(Debug, Clone)]
-pub struct Transaction {
- pub commit_scn: Scn,
- pub commit_timestamp: DateTime<Utc>,
- pub operations: Vec<(usize, Operation)>,
-#[derive(Debug, Clone)]
-pub struct Processor {
- aggregator: aggregate::Aggregator,
- parser: parse::Parser,
- mapper: map::Mapper,
-impl Processor {
- pub fn new(
- start_scn: Scn,
- table_pair_to_index: HashMap<(String, String), usize>,
- schemas: Vec<Schema>,
- ) -> Self {
- Self {
- aggregator: aggregate::Aggregator::new(start_scn),
- parser: parse::Parser::new(table_pair_to_index),
- mapper: map::Mapper::new(schemas),
- }
- }
- pub fn process<'a>(
- &'a self,
- iterator: impl IntoIterator<Item = LogManagerContent> + 'a,
- ) -> impl Iterator<Item = Result<Transaction, Error>> + 'a {
- let csf = csf::process(iterator.into_iter());
- let transaction = self.aggregator.process(csf);
- let parse = self.parser.process(transaction);
- self.mapper.process(parse)
- }
-mod aggregate;
-mod csf;
-mod map;
-mod parse;
diff --git a/dozer-ingestion/oracle/src/connector/replicate/transaction/parse/delete.rs b/dozer-ingestion/oracle/src/connector/replicate/transaction/parse/delete.rs
deleted file mode 100644
index 8a90dc8f8f..0000000000
--- a/dozer-ingestion/oracle/src/connector/replicate/transaction/parse/delete.rs
+++ /dev/null
@@ -1,62 +0,0 @@
-use dozer_ingestion_connector::dozer_types::log::warn;
-use regex::Regex;
-use crate::connector::Error;
-use super::{row, ParsedRow};
-#[derive(Debug, Clone)]
-pub struct Parser {
- regex: Regex,
- row_parser: row::Parser,
-impl Parser {
- pub fn new() -> Self {
- let regex =
- Regex::new(r#"^delete from "((?:C##)?\w+)"\."(\w+)"\n *where\n(?s)(.+)$"#).unwrap();
- Self {
- regex,
- row_parser: row::Parser::new(" and", ";"),
- }
- }
- pub fn parse(&self, sql_redo: &str, table_pair: &(String, String)) -> Result<ParsedRow, Error> {
- let captures = self
- .regex
- .captures(sql_redo)
- .ok_or_else(|| Error::DeleteFailedToMatch(sql_redo.to_string()))?;
- let owner = captures.get(1).unwrap().as_str();
- let table_name = captures.get(2).unwrap().as_str();
- if owner != table_pair.0 || table_name != table_pair.1 {
- warn!(
- "Table name {}.{} doesn't match {}.{} in log content",
- owner, table_name, table_pair.0, table_pair.1
- );
- }
- self.row_parser.parse(captures.get(3).unwrap().as_str())
- }
-fn test_parse() {
- let parser = Parser::new();
- let sql_redo = r#"delete from "HR"."EMPLOYEES"
- where
- "EMPLOYEE_ID" = 306 and
- "FIRST_NAME" = 'Nandini' and
- "LAST_NAME" = 'Shastry' and
- "EMAIL" = 'NSHASTRY' and
- "PHONE_NUMBER" = '1234567890' and
- "JOB_ID" = 'HR_REP' and
- "SALARY" = 120000 and
- "COMMISSION_PCT" = .05 and
- "MANAGER_ID" = 105 and
- "#;
- let parsed = parser
- .parse(sql_redo, &("HR".to_string(), "EMPLOYEES".to_string()))
- .unwrap();
- assert_eq!(parsed.len(), 10);
diff --git a/dozer-ingestion/oracle/src/connector/replicate/transaction/parse/insert.rs b/dozer-ingestion/oracle/src/connector/replicate/transaction/parse/insert.rs
deleted file mode 100644
index a5ab56daf2..0000000000
--- a/dozer-ingestion/oracle/src/connector/replicate/transaction/parse/insert.rs
+++ /dev/null
@@ -1,63 +0,0 @@
-use dozer_ingestion_connector::dozer_types::log::warn;
-use regex::Regex;
-use crate::connector::Error;
-use super::{row, ParsedRow};
-#[derive(Debug, Clone)]
-pub struct Parser {
- regex: Regex,
- row_parser: row::Parser,
-impl Parser {
- pub fn new() -> Self {
- let regex =
- Regex::new(r#"^insert into "((?:C##)?\w+)"\."(\w+)"\n *values\n(?s)(.+)$"#).unwrap();
- Self {
- regex,
- row_parser: row::Parser::new(",", ";"),
- }
- }
- pub fn parse(&self, sql_redo: &str, table_pair: &(String, String)) -> Result<ParsedRow, Error> {
- let captures = self
- .regex
- .captures(sql_redo)
- .ok_or_else(|| Error::InsertFailedToMatch(sql_redo.to_string()))?;
- let owner = captures.get(1).unwrap().as_str();
- let table_name = captures.get(2).unwrap().as_str();
- if owner != table_pair.0 || table_name != table_pair.1 {
- warn!(
- "Table name {}.{} doesn't match {}.{} in log content",
- owner, table_name, table_pair.0, table_pair.1
- );
- }
- self.row_parser.parse(captures.get(3).unwrap().as_str())
- }
-fn test_parse() {
- let parser = Parser::new();
- let sql_redo = r#"insert into "HR"."EMPLOYEES"
- values
- "EMPLOYEE_ID" = 306,
- "FIRST_NAME" = 'Nandini',
- "LAST_NAME" = 'Shastry',
- "PHONE_NUMBER" = '1234567890',
- "JOB_ID" = 'HR_REP',
- "SALARY" = 120000,
- "MANAGER_ID" = 105,
- "#;
- let parsed = parser
- .parse(sql_redo, &("HR".to_string(), "EMPLOYEES".to_string()))
- .unwrap();
- assert_eq!(parsed.len(), 11);
diff --git a/dozer-ingestion/oracle/src/connector/replicate/transaction/parse/mod.rs b/dozer-ingestion/oracle/src/connector/replicate/transaction/parse/mod.rs
deleted file mode 100644
index 1bbb371ab8..0000000000
--- a/dozer-ingestion/oracle/src/connector/replicate/transaction/parse/mod.rs
+++ /dev/null
@@ -1,141 +0,0 @@
-use std::{collections::HashMap, str::FromStr};
-use dozer_ingestion_connector::dozer_types::{
- chrono::{DateTime, Utc},
- log::trace,
- rust_decimal::Decimal,
-use crate::connector::{Error, Scn};
-use super::aggregate::{Operation, OperationKind, Transaction};
-#[derive(Debug, Clone)]
-pub struct ParsedTransaction {
- pub commit_scn: Scn,
- pub commit_timestamp: DateTime<Utc>,
- pub operations: Vec<ParsedOperation>,
-#[derive(Debug, Clone)]
-pub struct ParsedOperation {
- pub table_index: usize,
- pub kind: ParsedOperationKind,
-pub type ParsedRow = HashMap<String, ParsedValue>;
-#[derive(Debug, Clone)]
-pub enum ParsedOperationKind {
- Insert(ParsedRow),
- Delete(ParsedRow),
- Update { old: ParsedRow, new: ParsedRow },
-#[derive(Debug, Clone, PartialEq)]
-pub enum ParsedValue {
- String(String),
- Number(Decimal),
- Null,
-#[derive(Debug, Clone)]
-pub struct Parser {
- insert_parser: insert::Parser,
- delete_parser: delete::Parser,
- update_parser: update::Parser,
- table_pair_to_index: HashMap<(String, String), usize>,
-impl Parser {
- pub fn new(table_pair_to_index: HashMap<(String, String), usize>) -> Self {
- Self {
- insert_parser: insert::Parser::new(),
- delete_parser: delete::Parser::new(),
- update_parser: update::Parser::new(),
- table_pair_to_index,
- }
- }
- pub fn process<'a>(
- &'a self,
- iterator: impl Iterator<Item = Transaction> + 'a,
- ) -> impl Iterator<Item = Result<ParsedTransaction, Error>> + 'a {
- Processor {
- iterator,
- parser: self,
- }
- }
- fn parse(&self, operation: Operation) -> Result<Option<ParsedOperation>, Error> {
- let table_pair = (operation.seg_owner, operation.table_name);
- let Some(&table_index) = self.table_pair_to_index.get(&table_pair) else {
- trace!(
- "Ignoring operation on table {}.{}",
- table_pair.0,
- table_pair.1
- );
- return Ok(None);
- };
- let kind = match operation.kind {
- OperationKind::Insert => ParsedOperationKind::Insert(
- self.insert_parser.parse(&operation.sql_redo, &table_pair)?,
- ),
- OperationKind::Delete => ParsedOperationKind::Delete(
- self.delete_parser.parse(&operation.sql_redo, &table_pair)?,
- ),
- OperationKind::Update => {
- let (old, new) = self.update_parser.parse(&operation.sql_redo, &table_pair)?;
- ParsedOperationKind::Update { old, new }
- }
- };
- Ok(Some(ParsedOperation { table_index, kind }))
- }
-struct Processor<'a, I: Iterator<Item = Transaction>> {
- iterator: I,
- parser: &'a Parser,
-impl<'a, I: Iterator<Item = Transaction>> Iterator for Processor<'a, I> {
- type Item = Result<ParsedTransaction, Error>;
- fn next(&mut self) -> Option<Self::Item> {
- let transaction = self.iterator.next()?;
- let mut operations = vec![];
- for operation in transaction.operations {
- match self.parser.parse(operation) {
- Ok(Some(operation)) => operations.push(operation),
- Ok(None) => continue,
- Err(err) => return Some(Err(err)),
- }
- }
- Some(Ok(ParsedTransaction {
- commit_scn: transaction.commit_scn,
- commit_timestamp: transaction.commit_timestamp,
- operations,
- }))
- }
-impl FromStr for ParsedValue {
- type Err = Error;
- fn from_str(s: &str) -> Result<Self, Self::Err> {
- if s.starts_with('\'') {
- Ok(ParsedValue::String(s[1..s.len() - 1].to_string()))
- } else {
- Ok(ParsedValue::Number(s.parse()?))
- }
- }
-mod delete;
-mod insert;
-mod row;
-mod update;
diff --git a/dozer-ingestion/oracle/src/connector/replicate/transaction/parse/row.rs b/dozer-ingestion/oracle/src/connector/replicate/transaction/parse/row.rs
deleted file mode 100644
index fcafe6a5ed..0000000000
--- a/dozer-ingestion/oracle/src/connector/replicate/transaction/parse/row.rs
+++ /dev/null
@@ -1,36 +0,0 @@
-use std::collections::HashMap;
-use regex::Regex;
-use crate::connector::Error;
-use super::{ParsedRow, ParsedValue};
-#[derive(Debug, Clone)]
-pub struct Parser {
- regex: Regex,
-impl Parser {
- pub fn new(delimiter: &str, end: &str) -> Self {
- let regex = Regex::new(&format!(
- "\"(\\w+)\" (= (.+)|IS NULL)({} *\\n|{})",
- delimiter, end
- ))
- .unwrap();
- Self { regex }
- }
- pub fn parse(&self, values: &str) -> Result<ParsedRow, Error> {
- let mut result = HashMap::new();
- for cap in self.regex.captures_iter(values) {
- let column = cap.get(1).unwrap().as_str();
- let value = match cap.get(3) {
- Some(value) => value.as_str().parse()?,
- None => ParsedValue::Null,
- };
- result.insert(column.to_string(), value);
- }
- Ok(result)
- }
diff --git a/dozer-ingestion/oracle/src/connector/replicate/transaction/parse/update.rs b/dozer-ingestion/oracle/src/connector/replicate/transaction/parse/update.rs
deleted file mode 100644
index e963938010..0000000000
--- a/dozer-ingestion/oracle/src/connector/replicate/transaction/parse/update.rs
+++ /dev/null
@@ -1,100 +0,0 @@
-use dozer_ingestion_connector::dozer_types::log::warn;
-use regex::Regex;
-use crate::connector::Error;
-use super::{row, ParsedRow};
-#[derive(Debug, Clone)]
-pub struct Parser {
- regex: Regex,
- new_row_parser: row::Parser,
- old_row_parser: row::Parser,
-impl Parser {
- pub fn new() -> Self {
- let regex = Regex::new(
- r#"^update "((?:C##)?\w+)"\."(\w+)"\n *set *\n *(?s)(.+) *where *\n(?s)(.+)$"#,
- )
- .unwrap();
- Self {
- regex,
- new_row_parser: row::Parser::new(",", "\n"),
- old_row_parser: row::Parser::new(" and", ";"),
- }
- }
- pub fn parse(
- &self,
- sql_redo: &str,
- table_pair: &(String, String),
- ) -> Result<(ParsedRow, ParsedRow), Error> {
- let captures = self
- .regex
- .captures(sql_redo)
- .ok_or_else(|| Error::UpdateFailedToMatch(sql_redo.to_string()))?;
- let owner = captures.get(1).unwrap().as_str();
- let table_name = captures.get(2).unwrap().as_str();
- if owner != table_pair.0 || table_name != table_pair.1 {
- warn!(
- "Table name {}.{} doesn't match {}.{} in log content",
- owner, table_name, table_pair.0, table_pair.1
- );
- }
- let mut new_row = self
- .new_row_parser
- .parse(captures.get(3).unwrap().as_str())?;
- let old_row = self
- .old_row_parser
- .parse(captures.get(4).unwrap().as_str())?;
- for (column, old_value) in old_row.iter() {
- if !new_row.contains_key(column) {
- new_row.insert(column.clone(), old_value.clone());
- }
- }
- Ok((old_row, new_row))
- }
-fn test_parse() {
- use super::ParsedValue;
- let parser = Parser::new();
- let sql_redo = r#"update "DOZER"."TRANSACTIONS"
- set
- where
- "TRANSACTION_ID" = 12001 and
- "CUSTOMER_ID" = 63147 and
- "TYPE" = 'Withdrawal' and
- "AMOUNT" = 9691.34 and
- "CURRENCY" = 'USD' and
- "TRANSACTION_DATE" = '28-JAN-24' and
- "STATUS" = 'Completed' and
- "DESCRIPTION" = 'Yeah become language inside purpose.';
- "#;
- let (old, new) = parser
- .parse(sql_redo, &("HR".to_string(), "EMPLOYEES".to_string()))
- .unwrap();
- assert_eq!(old.len(), 8);
- assert_eq!(new.len(), 8);
- assert_eq!(
- old.get("TRANSACTION_ID").unwrap(),
- &ParsedValue::Number("12001".parse().unwrap())
- );
- assert_eq!(
- new.get("TRANSACTION_ID").unwrap(),
- &ParsedValue::Number("12001".parse().unwrap())
- );
- assert_eq!(
- old.get("TYPE").unwrap(),
- &ParsedValue::String("Withdrawal".to_string())
- );
- assert_eq!(
- new.get("TYPE").unwrap(),
- &ParsedValue::String("REBATE".to_string())
- );
diff --git a/dozer-ingestion/oracle/src/lib.rs b/dozer-ingestion/oracle/src/lib.rs
deleted file mode 100644
index cf398cb097..0000000000
--- a/dozer-ingestion/oracle/src/lib.rs
+++ /dev/null
@@ -1,215 +0,0 @@
-use dozer_ingestion_connector::{
- async_trait,
- dozer_types::{
- errors::internal::BoxedError,
- log::info,
- models::ingestion_types::{IngestionMessage, OracleConfig, TransactionInfo},
- node::OpIdentifier,
- types::FieldType,
- },
- tokio, Connector, Ingestor, SourceSchemaResult, TableIdentifier, TableInfo,
-pub struct OracleConnector {
- connection_name: String,
- config: OracleConfig,
- connectors: Option<Connectors>,
-#[derive(Debug, Clone)]
-struct Connectors {
- root_connector: connector::Connector,
- pdb_connector: connector::Connector,
- con_id: Option,
-const DEFAULT_BATCH_SIZE: usize = 100_000;
-impl OracleConnector {
- pub fn new(connection_name: String, config: OracleConfig) -> Self {
- Self {
- connection_name,
- config,
- connectors: None,
- }
- }
- async fn ensure_connection(
- &mut self,
- force_reconnect: bool,
- ) -> Result<Connectors, connector::Error> {
- if self.connectors.is_none() || force_reconnect {
- let connection_name = self.connection_name.clone();
- let config = self.config.clone();
- let pdb = self.config.pdb.clone();
- self.connectors = Some(
- tokio::task::spawn_blocking(move || {
- let root_connect_string =
- format!("{}:{}/{}", config.host, config.port, config.sid);
- let batch_size = config.batch_size.unwrap_or(DEFAULT_BATCH_SIZE);
- let mut root_connector = connector::Connector::new(
- connection_name.clone(),
- config.user.clone(),
- &config.password,
- &root_connect_string,
- batch_size,
- config.replicator,
- )?;
- let (pdb_connector, con_id) = if let Some(pdb) = pdb {
- let pdb_connect_string = format!("{}:{}/{}", config.host, config.port, pdb);
- let pdb_connector = connector::Connector::new(
- connection_name,
- config.user.clone(),
- &config.password,
- &pdb_connect_string,
- batch_size,
- config.replicator,
- )?;
- let con_id = root_connector.get_con_id(&pdb)?;
- (pdb_connector, Some(con_id))
- } else {
- (root_connector.clone(), None)
- };
- Ok::<_, connector::Error>(Connectors {
- root_connector,
- pdb_connector,
- con_id,
- })
- })
- .await
- .unwrap()?,
- );
- }
- Ok(self.connectors.as_ref().unwrap().clone())
- }
-impl Connector for OracleConnector {
- fn types_mapping() -> Vec<(String, Option<FieldType>)>
- where
- Self: Sized,
- {
- todo!()
- }
- async fn validate_connection(&mut self) -> Result<(), BoxedError> {
- self.ensure_connection(false).await?;
- Ok(())
- }
- async fn list_tables(&mut self) -> Result<Vec<TableIdentifier>, BoxedError> {
- let mut connectors = self.ensure_connection(false).await?;
- let schemas = self.config.schemas.clone();
- tokio::task::spawn_blocking(move || connectors.pdb_connector.list_tables(&schemas))
- .await
- .unwrap()
- .map_err(Into::into)
- }
- async fn validate_tables(&mut self, tables: &[TableIdentifier]) -> Result<(), BoxedError> {
- self.list_columns(tables.to_vec()).await?;
- Ok(())
- }
- async fn list_columns(
- &mut self,
- tables: Vec<TableIdentifier>,
- ) -> Result<Vec<TableInfo>, BoxedError> {
- let mut connectors = self.ensure_connection(false).await?;
- tokio::task::spawn_blocking(move || connectors.pdb_connector.list_columns(tables))
- .await
- .unwrap()
- .map_err(Into::into)
- }
- async fn get_schemas(
- &mut self,
- table_infos: &[TableInfo],
- ) -> Result<Vec<SourceSchemaResult>, BoxedError> {
- let mut connectors = self.ensure_connection(false).await?;
- let table_infos = table_infos.to_vec();
- Ok(
- tokio::task::spawn_blocking(move || connectors.pdb_connector.get_schemas(&table_infos))
- .await
- .unwrap()?
- .into_iter()
- .map(|result| result.map_err(Into::into))
- .collect(),
- )
- }
- async fn serialize_state(&self) -> Result<Vec<u8>, BoxedError> {
- Ok(vec![])
- }
- async fn start(
- &mut self,
- ingestor: &Ingestor,
- tables: Vec<TableInfo>,
- last_checkpoint: Option<OpIdentifier>,
- ) -> Result<(), BoxedError> {
- let checkpoint = if let Some(last_checkpoint) = last_checkpoint {
- last_checkpoint.txid
- } else {
- info!("No checkpoint passed, starting snapshotting");
- let ingestor_clone = ingestor.clone();
- let tables = tables.clone();
- let mut connectors = self.ensure_connection(false).await?;
- if ingestor
- .handle_message(IngestionMessage::TransactionInfo(
- TransactionInfo::SnapshottingStarted,
- ))
- .await
- .is_err()
- {
- return Ok(());
- }
- let scn = tokio::task::spawn_blocking(move || {
- connectors.pdb_connector.snapshot(&ingestor_clone, tables)
- })
- .await
- .unwrap()?;
- ingestor
- .handle_message(IngestionMessage::TransactionInfo(
- TransactionInfo::SnapshottingDone {
- id: Some(OpIdentifier {
- txid: scn,
- seq_in_tx: 0,
- }),
- },
- ))
- .await?;
- scn
- };
- info!("Replicating from checkpoint: {}", checkpoint);
- let ingestor = ingestor.clone();
- let schemas = self.get_schemas(&tables).await?;
- let schemas = schemas
- .into_iter()
- .map(|schema| schema.map(|schema| schema.schema))
- .collect::<Result<Vec<_>, _>>()?;
- let mut connectors = self.ensure_connection(false).await?;
- tokio::task::spawn_blocking(move || {
- connectors.root_connector.replicate(
- &ingestor,
- tables,
- schemas,
- checkpoint,
- connectors.con_id,
- )
- })
- .await
- .unwrap();
- Ok(())
- }
-mod connector;
diff --git a/dozer-ingestion/src/errors.rs b/dozer-ingestion/src/errors.rs
index 5f6646c4e2..8668bc6e20 100644
--- a/dozer-ingestion/src/errors.rs
+++ b/dozer-ingestion/src/errors.rs
@@ -33,6 +33,9 @@ pub enum ConnectorError {
#[error("javascript feature is not enabled")]
+ #[error("{0}: This feature is only available in enterprise. Please contact us.")]
+ FeatureNotEnabled(String),
#[error("{0} is not supported as a source connector")]
diff --git a/dozer-ingestion/src/lib.rs b/dozer-ingestion/src/lib.rs
index 72090e531a..934429f0d3 100644
--- a/dozer-ingestion/src/lib.rs
+++ b/dozer-ingestion/src/lib.rs
@@ -1,6 +1,3 @@
-use std::sync::Arc;
-use dozer_ingestion_aerospike::connector::AerospikeConnector;
#[cfg(feature = "ethereum")]
use dozer_ingestion_connector::dozer_types::models::ingestion_types::EthProviderConfig;
use dozer_ingestion_connector::dozer_types::{
@@ -10,7 +7,6 @@ use dozer_ingestion_connector::dozer_types::{
connection::{Connection, ConnectionConfig},
- node::NodeHandle,
#[cfg(feature = "datafusion")]
@@ -27,7 +23,6 @@ use dozer_ingestion_mongodb::MongodbConnector;
use dozer_ingestion_mysql::connector::{mysql_connection_opts_from_url, MySQLConnector};
#[cfg(feature = "datafusion")]
use dozer_ingestion_object_store::connector::ObjectStoreConnector;
-use dozer_ingestion_oracle::OracleConnector;
use dozer_ingestion_postgres::{
connector::{PostgresConfig, PostgresConnector},
@@ -36,6 +31,7 @@ use dozer_ingestion_postgres::{
use dozer_ingestion_snowflake::connector::SnowflakeConnector;
use dozer_ingestion_webhook::connector::WebhookConnector;
use errors::ConnectorError;
+use std::sync::Arc;
use tokio::runtime::Runtime;
pub mod errors;
@@ -157,15 +153,10 @@ pub fn get_connector(
- ConnectionConfig::Aerospike(config) => Ok(Box::new(AerospikeConnector::new(
- config,
- NodeHandle::new(None, connection.name),
- event_hub.receiver,
- ))),
- ConnectionConfig::Oracle(oracle_config) => Ok(Box::new(OracleConnector::new(
- connection.name,
- oracle_config,
- ))),
+ ConnectionConfig::Aerospike(_) => {
+ Err(ConnectorError::FeatureNotEnabled("Aerospike".to_string()))
+ }
+ ConnectionConfig::Oracle(_) => Err(ConnectorError::FeatureNotEnabled("Oracle".to_string())),
diff --git a/dozer-sink-aerospike/Cargo.toml b/dozer-sink-aerospike/Cargo.toml
deleted file mode 100644
index 2f711eaf79..0000000000
--- a/dozer-sink-aerospike/Cargo.toml
+++ /dev/null
@@ -1,14 +0,0 @@
-name = "dozer-sink-aerospike"
-version = "0.1.0"
-edition = "2021"
-license = "AGPL-3.0-or-later"
-# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
-dozer-core = { path = "../dozer-core" }
-dozer-types = { path = "../dozer-types" }
-aerospike-client-sys = { path = "./aerospike-client-sys" }
-itertools = "0.12"
-smallvec = "1.13.1"
diff --git a/dozer-sink-aerospike/aerospike-client-sys/Cargo.lock b/dozer-sink-aerospike/aerospike-client-sys/Cargo.lock
deleted file mode 100644
index 7ae1b029d4..0000000000
--- a/dozer-sink-aerospike/aerospike-client-sys/Cargo.lock
+++ /dev/null
@@ -1,7 +0,0 @@
-# This file is automatically @generated by Cargo.
-# It is not intended for manual editing.
-version = 3
-name = "aerospike-client-sys"
-version = "0.1.0"
diff --git a/dozer-sink-aerospike/aerospike-client-sys/Cargo.toml b/dozer-sink-aerospike/aerospike-client-sys/Cargo.toml
deleted file mode 100644
index ec99384aae..0000000000
--- a/dozer-sink-aerospike/aerospike-client-sys/Cargo.toml
+++ /dev/null
@@ -1,15 +0,0 @@
-name = "aerospike-client-sys"
-version = "0.1.0"
-edition = "2021"
-license = "AGPL-3.0-or-later"
-doctest = false
-# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
-bindgen = "0.69.4"
diff --git a/dozer-sink-aerospike/aerospike-client-sys/aerospike-client-c b/dozer-sink-aerospike/aerospike-client-sys/aerospike-client-c
deleted file mode 160000
index 029db7ac63..0000000000
--- a/dozer-sink-aerospike/aerospike-client-sys/aerospike-client-c
+++ /dev/null
@@ -1 +0,0 @@
-Subproject commit 029db7ac63ba3533150c359e0dec5a51e54914ab
diff --git a/dozer-sink-aerospike/aerospike-client-sys/aerospike_client.h b/dozer-sink-aerospike/aerospike-client-sys/aerospike_client.h
deleted file mode 100644
index c7716020e1..0000000000
--- a/dozer-sink-aerospike/aerospike-client-sys/aerospike_client.h
+++ /dev/null
@@ -1,16 +0,0 @@
diff --git a/dozer-sink-aerospike/aerospike-client-sys/build.rs b/dozer-sink-aerospike/aerospike-client-sys/build.rs
deleted file mode 100644
index cd6143c8fd..0000000000
--- a/dozer-sink-aerospike/aerospike-client-sys/build.rs
+++ /dev/null
@@ -1,74 +0,0 @@
-use core::panic;
-use std::{
- env, fs,
- path::{Path, PathBuf},
- process::Command,
-fn cp_r(dir: &Path, dest: &Path) {
- for entry in fs::read_dir(dir).unwrap() {
- let entry = entry.unwrap();
- let path = entry.path();
- let dst = dest.join(path.file_name().expect("Failed to get filename of path"));
- if fs::metadata(&path).unwrap().is_file() {
- fs::copy(path, dst).unwrap();
- } else {
- fs::create_dir_all(&dst).unwrap();
- cp_r(&path, &dst);
- }
- }
-fn main() {
- let out_dir = PathBuf::from(std::env::var("OUT_DIR").unwrap());
- let build_dir = out_dir.join("build");
- fs::create_dir_all(&build_dir).unwrap();
- let output_dir = build_dir.join("out");
- let lib_dir = output_dir.join("lib");
- let include_dir = output_dir.join("include");
- let make_flags = vec!["TARGET_BASE=out"];
- let current_dir = env::current_dir().unwrap();
- let source_dir = current_dir.join("aerospike-client-c");
- cp_r(&source_dir, &build_dir);
- let mut make = Command::new("make");
- make.args(make_flags)
- .env("MAKEFLAGS", std::env::var("CARGO_MAKEFLAGS").unwrap())
- // The Makefile checks whether DEBUG is defined and cargo always sets it
- // (it's either DEBUG=false or DEBUG=true, but always defined). When DEBUG,
- // it tries to link against gcov, which we don't want
- .env_remove("DEBUG")
- .current_dir(build_dir);
- let out = make.output().unwrap();
- if !out.status.success() {
- panic!(
- "Building aerospike client failed with exit code {}.\nstdout: {}\nstderr: {}",
- out.status.code().unwrap(),
- String::from_utf8_lossy(&out.stdout),
- String::from_utf8_lossy(&out.stderr),
- );
- }
- println!("cargo:rustc-link-search=native={}", lib_dir.display());
- println!("cargo:rustc-link-lib=static=aerospike");
- println!("cargo:rustc-link-lib=ssl");
- println!("cargo:rustc-link-lib=crypto");
- println!("cargo:rustc-link-lib=m");
- println!("cargo:rustc-link-lib=z");
- println!("cargo:rustc-link-lib=pthread");
- println!("cargo:rerun-if-changed=aerospike_client.h");
- println!("cargo:rerun-if-changed=aerospike-client-c");
- let bindings = bindgen::Builder::default()
- .header("aerospike_client.h")
- .allowlist_type("(as|aerospike)_.*")
- .allowlist_type("aerospike")
- .allowlist_function("(as|aerospike)_.*")
- .allowlist_var("(as|AS)_.*")
- .clang_arg(format!("-I{}", include_dir.to_str().unwrap()))
- .generate()
- .expect("Unable to generate bindings");
- bindings
- .write_to_file(out_dir.join("generated.rs"))
- .expect("Failed to write bindings");
diff --git a/dozer-sink-aerospike/aerospike-client-sys/src/lib.rs b/dozer-sink-aerospike/aerospike-client-sys/src/lib.rs
deleted file mode 100644
index e67d2da84b..0000000000
--- a/dozer-sink-aerospike/aerospike-client-sys/src/lib.rs
+++ /dev/null
@@ -1,174 +0,0 @@
-include!(concat!(env!("OUT_DIR"), "/generated.rs"));
-macro_rules! as_exp_build {
- ($func:ident $args:tt ) => {{
- let mut v = Vec::new();
- $crate::as_exp_build_inner!(v, $func $args);
- $crate::as_exp_compile(v.as_mut_ptr(), v.len() as u32)
- }}
-macro_rules! as_exp_build_inner {
- ($v:expr, as_exp_bin_int($bin_name:expr $(,)?)) => {{
- let bin_name: *const i8 = $bin_name;
- $v.push($crate::as_exp_entry {
- op: $crate::as_exp_ops__AS_EXP_CODE_BIN,
- count: 3,
- sz: 0,
- prev_va_args: 0,
- v: std::mem::zeroed(),
- });
- $crate::as_exp_build_inner!($v, as_exp_int($crate::as_exp_type_AS_EXP_TYPE_INT as i64));
- $v.push($crate::as_exp_entry {
- op: $crate::as_exp_ops__AS_EXP_CODE_VAL_RAWSTR,
- v: $crate::as_exp_entry__bindgen_ty_1 { str_val: bin_name },
- count: 0,
- sz: 0,
- prev_va_args: 0,
- });
- }};
- ($v:expr, as_exp_int($val:expr)) => {
- $v.push($crate::as_exp_entry {
- op: $crate::as_exp_ops__AS_EXP_CODE_VAL_INT,
- v: $crate::as_exp_entry__bindgen_ty_1 { int_val: $val },
- count: 0,
- sz: 0,
- prev_va_args: 0,
- })
- };
- ($v:expr, as_exp_uint($val:expr)) => {
- $v.push($crate::as_exp_entry {
- op: $crate::as_exp_ops__AS_EXP_CODE_VAL_UINT,
- v: $crate::as_exp_entry__bindgen_ty_1 { uint_val: $val },
- count: 0,
- sz: 0,
- prev_va_args: 0,
- })
- };
- ($v:expr, as_exp_cmp_eq($left_name:ident $left_args:tt, $right_name:ident $right_args:tt $(,)?)) => {{
- $v.push($crate::as_exp_entry {
- op: $crate::as_exp_ops__AS_EXP_CODE_CMP_EQ,
- count: 3,
- v: std::mem::zeroed(),
- sz: 0,
- prev_va_args: 0,
- });
- $crate::as_exp_build_inner!($v, $left_name $left_args);
- $crate::as_exp_build_inner!($v, $right_name $right_args);
- }};
- ($v:expr, as_exp_cmp_gt($left_name:ident $left_args:tt, $right_name:ident $right_args:tt $(,)?)) => {{
- $v.push($crate::as_exp_entry {
- op: $crate::as_exp_ops__AS_EXP_CODE_CMP_GT,
- count: 3,
- v: std::mem::zeroed(),
- sz: 0,
- prev_va_args: 0,
- });
- $crate::as_exp_build_inner!($v, $left_name $left_args);
- $crate::as_exp_build_inner!($v, $right_name $right_args);
- }};
- ($v:expr, as_exp_cmp_ge($left_name:ident $left_args:tt, $right_name:ident $right_args:tt $(,)?)) => {{
- $v.push($crate::as_exp_entry {
- op: $crate::as_exp_ops__AS_EXP_CODE_CMP_GE,
- count: 3,
- v: std::mem::zeroed(),
- sz: 0,
- prev_va_args: 0,
- });
- $crate::as_exp_build_inner!($v, $left_name $left_args);
- $crate::as_exp_build_inner!($v, $right_name $right_args);
- }};
- ($v:expr, as_exp_cmp_lt($left_name:ident $left_args:tt, $right_name:ident $right_args:tt $(,)?)) => {{
- $v.push($crate::as_exp_entry {
- op: $crate::as_exp_ops__AS_EXP_CODE_CMP_LT,
- count: 3,
- v: std::mem::zeroed(),
- sz: 0,
- prev_va_args: 0,
- });
- $crate::as_exp_build_inner!($v, $left_name $left_args);
- $crate::as_exp_build_inner!($v, $right_name $right_args);
- }};
- ($v:expr, as_exp_cmp_le($left_name:ident $left_args:tt, $right_name:ident $right_args:tt $(,)?)) => {{
- $v.push($crate::as_exp_entry {
- op: $crate::as_exp_ops__AS_EXP_CODE_CMP_LE,
- count: 3,
- v: std::mem::zeroed(),
- sz: 0,
- prev_va_args: 0,
- });
- $crate::as_exp_build_inner!($v, $left_name $left_args);
- $crate::as_exp_build_inner!($v, $right_name $right_args);
- }};
- ($v:expr, as_exp_and($($arg_name:ident $arg_args:tt),*)) => {{
- $v.push($crate::as_exp_entry {
- op: $crate::as_exp_ops__AS_EXP_CODE_AND,
- count: 0,
- v: std::mem::zeroed(),
- sz: 0,
- prev_va_args: 0,
- });
- $($crate::as_exp_build_inner!($v, $arg_name $arg_args));*;
- $v.push($crate::as_exp_entry {
- op: $crate::as_exp_ops__AS_EXP_CODE_END_OF_VA_ARGS,
- count: 0,
- v: std::mem::zeroed(),
- sz: 0,
- prev_va_args: 0,
- });
- }};
-($v:expr, as_exp_or($($arg_name:ident $arg_args:tt),*)) => {{
- $v.push($crate::as_exp_entry {
- op: $crate::as_exp_ops__AS_EXP_CODE_OR,
- count: 0,
- v: std::mem::zeroed(),
- sz: 0,
- prev_va_args: 0,
- });
- $($crate::as_exp_build_inner!($v, $arg_name $arg_args));*;
- $v.push($crate::as_exp_entry {
- op: $crate::as_exp_ops__AS_EXP_CODE_END_OF_VA_ARGS,
- count: 0,
- v: std::mem::zeroed(),
- sz: 0,
- prev_va_args: 0,
- });
- }};
-mod tests {
- use std::ffi::CString;
- use super::*;
- #[test]
- fn test_as_exp_build() {
- // Tested that this results in the same compiled expression as when
- // using the macros from the C library
- let bin_name = CString::new("bin_name").unwrap();
- unsafe {
- let exp = as_exp_build! {
- as_exp_and(
- as_exp_cmp_gt(
- as_exp_bin_int(bin_name.as_ptr()),
- as_exp_int(3)
- ),
- as_exp_cmp_lt(
- as_exp_bin_int(bin_name.as_ptr()),
- as_exp_int(8)
- )
- )
- };
- assert!(!exp.is_null());
- as_exp_destroy(exp);
- }
- }
diff --git a/dozer-sink-aerospike/src/aerospike.rs b/dozer-sink-aerospike/src/aerospike.rs
deleted file mode 100644
index d6d59d9a1e..0000000000
--- a/dozer-sink-aerospike/src/aerospike.rs
+++ /dev/null
@@ -1,1252 +0,0 @@
-use std::time::Instant;
-use std::{
- alloc::{handle_alloc_error, Layout},
- ffi::{c_char, c_void, CStr, CString, NulError},
- fmt::Display,
- mem::MaybeUninit,
- ptr::{addr_of, addr_of_mut, NonNull},
- slice,
-use itertools::Itertools;
-use aerospike_client_sys::*;
-use dozer_types::log::debug;
-use dozer_types::{
- chrono::{DateTime, NaiveDate},
- geo::{Coord, Point},
- json_types::{DestructuredJsonRef, JsonValue},
- ordered_float::OrderedFloat,
- rust_decimal::prelude::*,
- thiserror,
- types::{DozerDuration, DozerPoint, Field, Schema},
-use crate::{denorm_dag::Error, AerospikeSinkError};
-pub struct BinNames {
- storage: Vec<CString>,
- _ptrs: Vec<*mut i8>,
-unsafe impl Send for BinNames {}
-impl Clone for BinNames {
- fn clone(&self) -> Self {
- let storage = self.storage.clone();
- let ptrs = Self::make_ptrs(&storage);
- Self {
- storage,
- _ptrs: ptrs,
- }
- }
-impl BinNames {
- fn make_ptrs(storage: &[CString]) -> Vec<*mut i8> {
- storage
- .iter()
- .map(|name| name.as_ptr() as *mut i8)
- .collect()
- }
- pub(crate) fn _len(&self) -> usize {
- self.storage.len()
- }
- pub(crate) unsafe fn _ptrs(&mut self) -> *mut *mut i8 {
- self._ptrs.as_mut_ptr()
- }
- pub(crate) fn names(&self) -> &[CString] {
- &self.storage
- }
- pub(crate) fn new<'a, I: IntoIterator<Item = &'a str>>(names: I) -> Result<Self, NulError> {
- let storage: Vec<CString> = names
- .into_iter()
- .map(CString::new)
- .collect::<Result<_, _>>()?;
- let ptrs = Self::make_ptrs(&storage);
- Ok(Self {
- storage,
- _ptrs: ptrs,
- })
- }
-#[derive(Debug, thiserror::Error)]
-pub struct AerospikeError {
- pub(crate) code: i32,
- pub(crate) message: String,
-impl AerospikeError {
- pub(crate) fn from_code(value: as_status) -> Self {
- let message = unsafe { as_error_string(value) };
- let message = unsafe { CStr::from_ptr(message) };
- // The message is ASCII (I think?), so this should not fail
- Self {
- code: value,
- message: message.to_str().unwrap().to_owned(),
- }
- }
-impl From