From 0163c63da6cf4d077ec154330e34b023376df462 Mon Sep 17 00:00:00 2001 From: Oussama Saoudi Date: Tue, 10 Dec 2024 16:07:40 -0800 Subject: [PATCH] add tmp ict fixup for writes --- kernel/src/actions/mod.rs | 14 ++++++++++++-- kernel/src/table_changes/log_replay/tests.rs | 2 +- kernel/src/table_changes/scan_file.rs | 2 +- kernel/src/transaction.rs | 2 ++ 4 files changed, 16 insertions(+), 4 deletions(-) diff --git a/kernel/src/actions/mod.rs b/kernel/src/actions/mod.rs index 6d05aa434..84e5c2e48 100644 --- a/kernel/src/actions/mod.rs +++ b/kernel/src/actions/mod.rs @@ -10,7 +10,7 @@ use std::sync::LazyLock; use self::deletion_vector::DeletionVectorDescriptor; use crate::actions::schemas::GetStructField; -use crate::schema::{SchemaRef, StructType}; +use crate::schema::{DataType, SchemaRef, StructField, StructType}; use crate::table_features::{ ReaderFeatures, WriterFeatures, SUPPORTED_READER_FEATURES, SUPPORTED_WRITER_FEATURES, }; @@ -84,6 +84,13 @@ pub(crate) fn get_log_commit_info_schema() -> &'static SchemaRef { &LOG_COMMIT_INFO_SCHEMA } +pub(crate) fn get_log_commit_info_schema_no_ict() -> &'static SchemaRef { + StructType::new([ + StructField::new("timestamp", DataType::LONG, true), + StructField::new("operation", DataType::STRING, true), + ]) +} + #[derive(Debug, Clone, PartialEq, Eq, Schema)] #[cfg_attr(test, derive(Serialize), serde(rename_all = "camelCase"))] pub struct Format { @@ -331,8 +338,10 @@ where struct CommitInfo { /// The time this logical file was created, as milliseconds since the epoch. /// Read: optional, write: required (that is, kernel always writes). - /// If in-commit timestamps are enabled, this is always required. pub(crate) timestamp: Option, + /// The time this logical file was created, as milliseconds since the epoch. Unlike + /// `timestamp`, this field is guaranteed to be monotonically increase with each commit. + /// If in-commit timestamps are enabled, this is always required. pub(crate) in_commit_timestamp: Option, /// An arbitrary string that identifies the operation associated with this commit. This is /// specified by the engine. Read: optional, write: required (that is, kernel alwarys writes). @@ -695,6 +704,7 @@ mod tests { "commitInfo", StructType::new(vec![ StructField::new("timestamp", DataType::LONG, true), + StructField::new("inCommitTimestamp", DataType::LONG, true), StructField::new("operation", DataType::STRING, true), StructField::new( "operationParameters", diff --git a/kernel/src/table_changes/log_replay/tests.rs b/kernel/src/table_changes/log_replay/tests.rs index 5da05cb83..ae7cbe85e 100644 --- a/kernel/src/table_changes/log_replay/tests.rs +++ b/kernel/src/table_changes/log_replay/tests.rs @@ -615,7 +615,7 @@ async fn table_changes_in_commit_timestamp() { mock_table .commit([ Action::CommitInfo(CommitInfo { - timestamp: Some(timestamp), + in_commit_timestamp: Some(timestamp), ..Default::default() }), Action::Add(Add { diff --git a/kernel/src/table_changes/scan_file.rs b/kernel/src/table_changes/scan_file.rs index 1d86c9fb1..3e5000afe 100644 --- a/kernel/src/table_changes/scan_file.rs +++ b/kernel/src/table_changes/scan_file.rs @@ -320,7 +320,7 @@ mod tests { let cdc_timestamp = 12345678; let commit_info = CommitInfo { - timestamp: Some(cdc_timestamp), + in_commit_timestamp: Some(cdc_timestamp), ..Default::default() }; diff --git a/kernel/src/transaction.rs b/kernel/src/transaction.rs index c6e93ea7b..05b1d00a8 100644 --- a/kernel/src/transaction.rs +++ b/kernel/src/transaction.rs @@ -590,6 +590,7 @@ mod tests { serde_json::json!({ "commitInfo": { "timestamp": 0, + "inCommitTimestamp": 0, "operation": "test operation", "kernelVersion": format!("v{}", env!("CARGO_PKG_VERSION")), "operationParameters": {}, @@ -600,6 +601,7 @@ mod tests { serde_json::json!({ "commitInfo": { "timestamp": 0, + "inCommitTimestamp": 0, "operation": "test operation", "kernelVersion": format!("v{}", env!("CARGO_PKG_VERSION")), "operationParameters": {},