Skip to content

Commit

Permalink
Add perf timestamps
Browse files Browse the repository at this point in the history
  • Loading branch information
karolisg committed Mar 19, 2024
1 parent 896dc51 commit a7fdf85
Show file tree
Hide file tree
Showing 3 changed files with 50 additions and 10 deletions.
33 changes: 32 additions & 1 deletion dozer-ingestion/oracle/src/connector/listing.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
use dozer_ingestion_connector::dozer_types::log::debug;
use std::collections::HashSet;
use dozer_ingestion_connector::dozer_types::log::{debug, info};
use oracle::Connection;

use super::Error;
Expand Down Expand Up @@ -48,6 +49,36 @@ impl TableColumn {
};
columns.push(column);
}

let mut table_names = HashSet::new();
for column in &columns {
table_names.insert(column.table_name.clone());
}
let first_column = &columns.get(0).clone();
if let Some(first_column) = first_column {
let owner = first_column.owner.clone();
for table_name in table_names.iter() {
columns.push(TableColumn {
owner: owner.clone(),
table_name: table_name.clone(),
column_name: "miner_timestamp".to_string(),
data_type: Some("TIMESTAMP".to_string()),
nullable: Some("Y".to_string()),
precision: None,
scale: None,
});
columns.push(TableColumn {
owner: owner.clone(),
table_name: table_name.clone(),
column_name: "ingested_at".to_string(),
data_type: Some("TIMESTAMP".to_string()),
nullable: Some("Y".to_string()),
precision: None,
scale: None,
});
}
}

Ok(columns)
}
}
Expand Down
5 changes: 3 additions & 2 deletions dozer-ingestion/oracle/src/connector/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ use oracle::{
sql_type::{Collection, ObjectType},
Connection,
};
use dozer_ingestion_connector::dozer_types::log::info;

#[derive(Debug, Clone)]
pub struct Connector {
Expand Down Expand Up @@ -261,9 +262,9 @@ impl Connector {
self.connection.execute(sql, &[])?;

for (table_index, (table, schema)) in tables.into_iter().zip(schemas).enumerate() {
let columns = table.column_names.join(", ");
let columns = table.column_names.into_iter().filter(|s| s != "miner_timestamp" && s != "ingested_at").map(|s| format!("\"{s}\"")).collect::<Vec<String>>().join(", ");
let owner = table.schema.unwrap_or_else(|| self.username.clone());
let sql = format!("SELECT {} FROM {}.{}", columns, owner, table.name);
let sql = format!("SELECT {}, NULL as \"miner_timestamp\", NULL as \"ingested_at\" FROM {}.{}", columns, owner, table.name);
debug!("{}", sql);
let rows = self.connection.query(&sql, &[])?;

Expand Down
22 changes: 15 additions & 7 deletions dozer-ingestion/oracle/src/connector/replicate/transaction/map.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,20 +32,20 @@ impl Mapper {
}
}

fn map(&self, operation: ParsedOperation) -> Result<(usize, Operation), Error> {
fn map(&self, operation: ParsedOperation, commit_transaction: DateTime<Utc>) -> Result<(usize, Operation), Error> {
let schema = &self.schemas[operation.table_index];
Ok((
operation.table_index,
match operation.kind {
ParsedOperationKind::Insert(row) => Operation::Insert {
new: map_row(row, schema)?,
new: map_row(row, schema, &commit_transaction)?,
},
ParsedOperationKind::Delete(row) => Operation::Delete {
old: map_row(row, schema)?,
old: map_row(row, schema, &commit_transaction)?,
},
ParsedOperationKind::Update { old, new } => Operation::Update {
old: map_row(old, schema)?,
new: map_row(new, schema)?,
old: map_row(old, schema, &commit_transaction)?,
new: map_row(new, schema, &commit_transaction)?,
},
},
))
Expand All @@ -69,7 +69,7 @@ impl<'a, I: Iterator<Item = Result<ParsedTransaction, Error>>> Iterator for Proc

let mut operations = vec![];
for operation in transaction.operations {
match self.mapper.map(operation) {
match self.mapper.map(operation, transaction.commit_timestamp) {
Ok(operation) => operations.push(operation),
Err(err) => return Some(Err(err)),
}
Expand All @@ -83,9 +83,17 @@ impl<'a, I: Iterator<Item = Result<ParsedTransaction, Error>>> Iterator for Proc
}
}

fn map_row(mut row: ParsedRow, schema: &Schema) -> Result<Record, Error> {
fn map_row(mut row: ParsedRow, schema: &Schema, commit_transaction: &DateTime<Utc>) -> Result<Record, Error> {
let mut values = vec![];
for field in &schema.fields {
if field.name == "miner_timestamp" {
values.push(Field::Timestamp(commit_transaction.clone().with_timezone(&FixedOffset::east(0))));
continue;
}
if field.name == "ingested_at" {
values.push(Field::Timestamp(dozer_ingestion_connector::dozer_types::chrono::offset::Utc::now().with_timezone(&FixedOffset::east(0))));
continue;
}
let value = row
.remove(&field.name)
.ok_or_else(|| Error::FieldNotFound(field.name.clone()))?;
Expand Down

0 comments on commit a7fdf85

Please sign in to comment.