Skip to content

Commit

Permalink
Remove obsolete code and comments
Browse files Browse the repository at this point in the history
  • Loading branch information
mosyp committed Jun 11, 2021
1 parent 8b34ee3 commit 9ac5f16
Show file tree
Hide file tree
Showing 2 changed files with 17 additions and 19 deletions.
34 changes: 15 additions & 19 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -496,23 +496,25 @@ impl KafkaJsonToDelta {
Ok((values, partition_offsets))
}

async fn build_actions(
fn build_actions(
&self,
partition_offsets: &HashMap<DataTypePartition, DataTypeOffset>,
add: Add,
) -> Vec<Action> {
partition_offsets.iter().map(|(partition, offset)| {
action::Action::txn(action::Txn {
app_id: self.app_id_for_partition(*partition),
version: *offset,
last_updated: std::time::SystemTime::now()
.duration_since(std::time::UNIX_EPOCH)
.unwrap()
.as_millis() as i64,
partition_offsets
.iter()
.map(|(partition, offset)| {
action::Action::txn(action::Txn {
app_id: self.app_id_for_partition(*partition),
version: *offset,
last_updated: std::time::SystemTime::now()
.duration_since(std::time::UNIX_EPOCH)
.unwrap()
.as_millis() as i64,
})
})
})
.chain(std::iter::once(Action::add(add)))
.collect::Vec<_>()
.chain(std::iter::once(Action::add(add)))
.collect()
}

async fn complete_file(
Expand All @@ -531,10 +533,6 @@ impl KafkaJsonToDelta {
// TODO remove it if we got conflict error? or it'll be considered as tombstone
let add = state.delta_writer.write_parquet_file().await?;

// update the table to get the latest version
// the consecutive updates will be made within error handles
state.delta_writer.update_table().await?;

let mut attempt_number: u32 = 0;

loop {
Expand All @@ -548,7 +546,7 @@ impl KafkaJsonToDelta {
}

let version = state.delta_writer.table_version() + 1;
let actions = self.build_actions(&partition_offsets, add.clone()).await;
let actions = self.build_actions(&partition_offsets, add.clone());
let commit_result = state.delta_writer.commit_version(version, actions).await;

match commit_result {
Expand Down Expand Up @@ -592,8 +590,6 @@ impl KafkaJsonToDelta {
.last_transaction_version(&self.app_id_for_partition(*partition));

if let Some(version) = version {
// if messages in kafka are consecutive then offset should always be `version+1` for
// safe commit, but since kafka does not guarantee contiguous offsets, we only check for `less than`.
if *offset != version {
info!(
"Conflict offset for partition {}: state={:?}, delta={:?}",
Expand Down
2 changes: 2 additions & 0 deletions tests/helpers/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,8 @@ pub async fn read_files_from_s3(paths: Vec<String>) -> Vec<i32> {
}
}

std::fs::remove_file(tmp).unwrap();

list.sort();
list
}

0 comments on commit 9ac5f16

Please sign in to comment.