Commit: Use index
karolisg committed Mar 1, 2024
1 parent 2dc46dc commit 27d4b92
Showing 1 changed file with 91 additions and 9 deletions.
dozer-sink-oracle/src/lib.rs
@@ -21,6 +21,8 @@ use oracle::{
Connection,
};

+const TXN_ID_COL: &str = "__txn_id";
+const TXN_SEQ_COL: &str = "__txn_seq";
const METADATA_TABLE: &str = "__replication_metadata";
const META_TXN_ID_COL: &str = "txn_id";
const META_TABLE_COL: &str = "table";
@@ -72,6 +74,7 @@ impl From<oracle::Error> for Error {

#[derive(Debug)]
struct BatchedOperation {
+op_id: Option<OpIdentifier>,
op_kind: OpKind,
params: Record,
}
@@ -277,10 +280,53 @@ impl OracleSinkFactory {

Ok(())
}
-}

+fn create_index(
+&self,
+connection: &Connection,
+table_name: &str,
+schema: &Schema,
+) -> Result<(), Error> {
+let mut columns = schema
+.primary_index
+.iter()
+.map(|ix| schema.fields[*ix].name.clone())
+.collect::<Vec<_>>();
+
+columns.iter_mut().for_each(|col| {
+*col = col.to_uppercase();
+});
+
+let index_name =
+format!("{table_name}_{}_TXN_ID_TXN_SEQ_INDEX", columns.join("_")).replace('#', "");
+
+columns.push(format!("\"{}\"", TXN_ID_COL));
+columns.push(format!("\"{}\"", TXN_SEQ_COL));
+
+let query = "SELECT index_name FROM all_indexes WHERE table_name = :1";
+info!("Index check query {query}");
+
+let mut index_exist = connection.query(query, &[&table_name])?;
+if index_exist.next().is_some() {
info!("Index {index_name} already exist");
+} else {
+let query = format!(
+"CREATE INDEX {index_name} ON {table_name} ({})",
+columns.join(", ")
+);
+info!("### CREATE INDEX #### \n: {index_name}. Query: {query}");
+connection.execute(&query, &[])?;
+}
+
+Ok(())
+}
+}
fn generate_merge_statement(table_name: &str, schema: &Schema) -> String {
-let field_names = schema.fields.iter().map(|field| &field.name);
+let field_names = schema
+.fields
+.iter()
+.map(|field| field.name.as_str())
+.chain([TXN_ID_COL, TXN_SEQ_COL]);

let mut parameter_index = 1usize..;
let input_fields = field_names
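
For illustration, here is a minimal sketch of the DDL the new create_index helper would emit, assuming a hypothetical table "CUSTOMERS" whose primary key is the single column "id" (the table and column names are invented for this example, not taken from the commit). The index covers the primary-key columns plus the two new transaction columns, presumably so the MERGE generated below can resolve rows and compare transaction ids without a full table scan.

fn main() {
    // Sketch only: mirrors the naming logic of create_index for a made-up table.
    let table_name = "CUSTOMERS";
    let mut columns = vec!["ID".to_string()]; // primary-key columns, uppercased
    let index_name =
        format!("{table_name}_{}_TXN_ID_TXN_SEQ_INDEX", columns.join("_")).replace('#', "");
    columns.push("\"__txn_id\"".to_string());
    columns.push("\"__txn_seq\"".to_string());
    let ddl = format!("CREATE INDEX {index_name} ON {table_name} ({})", columns.join(", "));
    assert_eq!(
        ddl,
        r#"CREATE INDEX CUSTOMERS_ID_TXN_ID_TXN_SEQ_INDEX ON CUSTOMERS (ID, "__txn_id", "__txn_seq")"#
    );
}
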
@@ -322,17 +368,24 @@ fn generate_merge_statement(table_name: &str, schema: &Schema) -> String {

let opkind_idx = parameter_index.next().unwrap();

+let opid_select = format!(
+r#"(D."{TXN_ID_COL}" IS NULL
+OR S."{TXN_ID_COL}" > D."{TXN_ID_COL}"
+OR (S."{TXN_ID_COL}" = D."{TXN_ID_COL}" AND S."{TXN_SEQ_COL}" > D."{TXN_SEQ_COL}"))"#
+);
+
// Match on PK and txn_id.
// If the record does not exist and the op is INSERT, do the INSERT
// If the record exists, but the txid is higher than the operation's txid,
// do nothing (if the op is INSERT, the record already exists; if it is UPDATE or DELETE, the operation is stale)
format!(
r#"MERGE INTO "{table_name}" D
USING (SELECT {input_fields}, :{opkind_idx} DOZER_OPKIND FROM DUAL) S
-ON (S.DOZER_OPKIND > 0)
-WHEN NOT MATCHED THEN INSERT ({destination_columns}) VALUES ({source_values})
-WHEN MATCHED THEN UPDATE SET {destination_assign} WHERE {pk_select}
-DELETE WHERE S.DOZER_OPKIND = 2"#
+ON ({pk_select})
+WHEN NOT MATCHED THEN INSERT ({destination_columns}) VALUES ({source_values}) WHERE S.DOZER_OPKIND = 0
+WHEN MATCHED THEN UPDATE SET {destination_assign} WHERE S.DOZER_OPKIND = 1 AND {opid_select}
+DELETE WHERE S.DOZER_OPKIND = 2 AND {opid_select}
+"#
)
}
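
The opid_select guard above is what makes redelivered operations idempotent: an UPDATE or DELETE is applied only when the incoming (txn_id, txn_seq) pair is strictly newer than the pair already stored on the row, or when the row has no transaction id at all. In Rust terms this is a lexicographic tuple comparison; the standalone sketch below shows the same semantics (the function name and the u64 types are illustrative, not taken from the commit).

// Sketch of the ordering check that opid_select encodes in SQL.
fn is_newer(incoming: (u64, u64), stored: Option<(u64, u64)>) -> bool {
    match stored {
        None => true,            // D."__txn_id" IS NULL
        Some(s) => incoming > s, // lexicographic: txn_id first, then txn_seq
    }
}

fn main() {
    assert!(is_newer((5, 3), Some((5, 2)))); // later operation in the same transaction
    assert!(!is_newer((5, 2), Some((5, 2)))); // same operation replayed: skipped
    assert!(!is_newer((4, 9), Some((5, 0)))); // older transaction: skipped
}
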

@@ -372,7 +425,28 @@ impl SinkFactory for OracleSinkFactory {

let table_name = &self.table;

-self.validate_or_create_table(&connection, table_name, &schema)?;
+let mut amended_schema = schema.clone();
+amended_schema.field(
+dozer_types::types::FieldDefinition {
+name: TXN_ID_COL.to_owned(),
+typ: FieldType::UInt,
+nullable: true,
+source: dozer_types::types::SourceDefinition::Dynamic,
+},
+false,
+);
+amended_schema.field(
+dozer_types::types::FieldDefinition {
+name: TXN_SEQ_COL.to_owned(),
+typ: FieldType::UInt,
+nullable: true,
+source: dozer_types::types::SourceDefinition::Dynamic,
+},
+false,
+);
+
+self.validate_or_create_table(&connection, table_name, &amended_schema)?;
+self.create_index(&connection, table_name, &amended_schema)?;
self.validate_or_create_table(
&connection,
METADATA_TABLE,
@@ -400,7 +474,7 @@ impl SinkFactory for OracleSinkFactory {
let insert_append = format!(
//"INSERT /*+ APPEND */ INTO \"{table_name}\" VALUES ({})",
"INSERT INTO \"{table_name}\" VALUES ({})",
-(1..=schema.fields.len())
+(1..=amended_schema.fields.len())
.map(|i| format!(":{i}"))
.collect::<Vec<_>>()
.join(", ")
@@ -499,6 +573,9 @@ impl OracleSink {
{
batch.set(i, &OraField(field, *typ))?;
}
+let (txid, seq_in_tx) = params.op_id.map(|opid| (opid.txid, opid.seq_in_tx)).unzip();
+batch.set(bind_idx.next().unwrap(), &txid)?;
+batch.set(bind_idx.next().unwrap(), &seq_in_tx)?;
batch.set(bind_idx.next().unwrap(), &(params.op_kind as u64))?;
batch.append_row(&[])?;
}
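
The map(...).unzip() call above splits the optional operation id into two independently optional bind values, so a record that carries no OpIdentifier ends up binding NULL for both transaction columns. A small standalone sketch of that behaviour (the values are made up):

fn main() {
    let with_id: Option<(u64, u64)> = Some((42, 7));
    let without_id: Option<(u64, u64)> = None;

    // Option::unzip: Option<(A, B)> -> (Option<A>, Option<B>)
    assert_eq!(with_id.unzip(), (Some(42), Some(7)));
    assert_eq!(without_id.unzip(), (None, None)); // both columns bind as NULL
}
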
@@ -508,11 +585,12 @@

fn batch(
&mut self,
-_op_id: Option<OpIdentifier>,
+op_id: Option<OpIdentifier>,
kind: OpKind,
record: Record,
) -> oracle::Result<()> {
self.batch_params.push(BatchedOperation {
+op_id,
op_kind: kind,
params: record,
});
@@ -587,6 +665,10 @@ impl Sink for OracleSink {
{
batch.set(i, &OraField(field, *typ))?;
}
+let (txid, seq_in_tx) = op.id.map(|id| (id.txid, id.seq_in_tx)).unzip();
+batch.set(bind_idx.next().unwrap(), &txid)?;
+batch.set(bind_idx.next().unwrap(), &seq_in_tx)?;
+
batch.append_row(&[])?;
}
batch.execute()?;
