diff --git a/kernel/src/expressions/scalars.rs b/kernel/src/expressions/scalars.rs index a923f5e47..a2476a990 100644 --- a/kernel/src/expressions/scalars.rs +++ b/kernel/src/expressions/scalars.rs @@ -151,6 +151,16 @@ impl Scalar { pub fn is_null(&self) -> bool { matches!(self, Self::Null(_)) } + + /// Constructs a Scalar timestamp with no timezone from an `i64` of milliseconds since the unix epoch + pub(crate) fn timestamp_ntz_from_millis(millis: i64) -> DeltaResult<Self> { + let Some(timestamp) = DateTime::from_timestamp_millis(millis) else { + return Err(Error::generic(format!( + "Failed to create millisecond timestamp from {millis}" + ))); + }; + Ok(Self::TimestampNtz(timestamp.timestamp_micros())) + } } impl Display for Scalar { diff --git a/kernel/src/scan/mod.rs b/kernel/src/scan/mod.rs index 4785173f8..f03d62cc9 100644 --- a/kernel/src/scan/mod.rs +++ b/kernel/src/scan/mod.rs @@ -384,7 +384,10 @@ pub fn scan_row_schema() -> Schema { log_replay::SCAN_ROW_SCHEMA.as_ref().clone() } -fn parse_partition_value(raw: Option<&String>, data_type: &DataType) -> DeltaResult<Scalar> { +pub(crate) fn parse_partition_value( + raw: Option<&String>, + data_type: &DataType, +) -> DeltaResult<Scalar> { match (raw, data_type.as_primitive_opt()) { (Some(v), Some(primitive)) => primitive.parse_scalar(v), (Some(_), None) => Err(Error::generic(format!( diff --git a/kernel/src/table_changes/mod.rs b/kernel/src/table_changes/mod.rs index 86ba0cc49..a5938082b 100644 --- a/kernel/src/table_changes/mod.rs +++ b/kernel/src/table_changes/mod.rs @@ -16,15 +16,21 @@ use crate::utils::require; use crate::{DeltaResult, Engine, Error, Version}; mod log_replay; +mod physical_to_logical; mod resolve_dvs; pub mod scan; mod scan_file; +static CHANGE_TYPE_COL_NAME: &str = "_change_type"; +static COMMIT_VERSION_COL_NAME: &str = "_commit_version"; +static COMMIT_TIMESTAMP_COL_NAME: &str = "_commit_timestamp"; +static ADD_CHANGE_TYPE: &str = "insert"; +static REMOVE_CHANGE_TYPE: &str = "delete"; static CDF_FIELDS: 
LazyLock<[StructField; 3]> = LazyLock::new(|| { [ - StructField::new("_change_type", DataType::STRING, false), - StructField::new("_commit_version", DataType::LONG, false), - StructField::new("_commit_timestamp", DataType::TIMESTAMP, false), + StructField::new(CHANGE_TYPE_COL_NAME, DataType::STRING, false), + StructField::new(COMMIT_VERSION_COL_NAME, DataType::LONG, false), + StructField::new(COMMIT_TIMESTAMP_COL_NAME, DataType::TIMESTAMP, false), ] }); diff --git a/kernel/src/table_changes/physical_to_logical.rs b/kernel/src/table_changes/physical_to_logical.rs new file mode 100644 index 000000000..7232e2cf8 --- /dev/null +++ b/kernel/src/table_changes/physical_to_logical.rs @@ -0,0 +1,139 @@ +use std::collections::HashMap; +use std::iter; + +use itertools::Itertools; + +use crate::expressions::Scalar; +use crate::scan::{parse_partition_value, ColumnType}; +use crate::schema::{ColumnName, DataType, SchemaRef, StructField, StructType}; +use crate::{DeltaResult, Error, Expression}; + +use super::scan_file::{CdfScanFile, CdfScanFileType}; +use super::{ + ADD_CHANGE_TYPE, CHANGE_TYPE_COL_NAME, COMMIT_TIMESTAMP_COL_NAME, COMMIT_VERSION_COL_NAME, + REMOVE_CHANGE_TYPE, +}; + +/// Returns a map from change data feed column name to an expression that generates the row data. 
+#[allow(unused)] +fn get_cdf_columns(scan_file: &CdfScanFile) -> DeltaResult<HashMap<&'static str, Expression>> { + let timestamp = Scalar::timestamp_ntz_from_millis(scan_file.commit_timestamp)?; + let version = scan_file.commit_version; + let change_type: Expression = match scan_file.scan_type { + CdfScanFileType::Cdc => Expression::column([CHANGE_TYPE_COL_NAME]), + CdfScanFileType::Add => ADD_CHANGE_TYPE.into(), + CdfScanFileType::Remove => REMOVE_CHANGE_TYPE.into(), + }; + let expressions = [ + (CHANGE_TYPE_COL_NAME, change_type), + (COMMIT_VERSION_COL_NAME, Expression::literal(version)), + (COMMIT_TIMESTAMP_COL_NAME, timestamp.into()), + ]; + Ok(expressions.into_iter().collect()) +} + +/// Generates the expression used to convert physical data from the `scan_file` path into logical +/// data matching the `logical_schema` +#[allow(unused)] +fn physical_to_logical_expr( + scan_file: &CdfScanFile, + logical_schema: &StructType, + all_fields: &[ColumnType], +) -> DeltaResult<Expression> { + let mut cdf_columns = get_cdf_columns(scan_file)?; + let all_fields = all_fields + .iter() + .map(|field| match field { + ColumnType::Partition(field_idx) => { + let field = logical_schema.fields.get_index(*field_idx); + let Some((_, field)) = field else { + return Err(Error::generic( + "logical schema did not contain expected field, can't transform data", + )); + }; + let name = field.physical_name(); + let value_expression = + parse_partition_value(scan_file.partition_values.get(name), field.data_type())?; + Ok(value_expression.into()) + } + ColumnType::Selected(field_name) => { + // Remove to take ownership + let generated_column = cdf_columns.remove(field_name.as_str()); + Ok(generated_column.unwrap_or_else(|| ColumnName::new([field_name]).into())) + } + }) + .try_collect()?; + Ok(Expression::Struct(all_fields)) +} + +/// Gets the physical schema that will be used to read data in the `scan_file` path. 
+#[allow(unused)] +fn scan_file_read_schema(scan_file: &CdfScanFile, read_schema: &StructType) -> SchemaRef { + if scan_file.scan_type == CdfScanFileType::Cdc { + let change_type = StructField::new(CHANGE_TYPE_COL_NAME, DataType::STRING, false); + let fields = read_schema.fields().cloned().chain(iter::once(change_type)); + StructType::new(fields).into() + } else { + read_schema.clone().into() + } +} + +#[cfg(test)] +mod tests { + use std::collections::HashMap; + + use crate::expressions::{column_expr, Expression, Scalar}; + use crate::scan::ColumnType; + use crate::schema::{DataType, StructField, StructType}; + use crate::table_changes::physical_to_logical::physical_to_logical_expr; + use crate::table_changes::scan_file::{CdfScanFile, CdfScanFileType}; + use crate::table_changes::{ + ADD_CHANGE_TYPE, CHANGE_TYPE_COL_NAME, COMMIT_TIMESTAMP_COL_NAME, COMMIT_VERSION_COL_NAME, + REMOVE_CHANGE_TYPE, + }; + + #[test] + fn verify_physical_to_logical_expression() { + let test = |scan_type, expected_expr| { + let scan_file = CdfScanFile { + scan_type, + path: "fake_path".to_string(), + dv_info: Default::default(), + remove_dv: None, + partition_values: HashMap::from([("age".to_string(), "20".to_string())]), + commit_version: 42, + commit_timestamp: 1234, + }; + let logical_schema = StructType::new([ + StructField::new("id", DataType::STRING, true), + StructField::new("age", DataType::LONG, false), + StructField::new(CHANGE_TYPE_COL_NAME, DataType::STRING, false), + StructField::new(COMMIT_VERSION_COL_NAME, DataType::LONG, false), + StructField::new(COMMIT_TIMESTAMP_COL_NAME, DataType::TIMESTAMP, false), + ]); + let all_fields = vec![ + ColumnType::Selected("id".to_string()), + ColumnType::Partition(1), + ColumnType::Selected(CHANGE_TYPE_COL_NAME.to_string()), + ColumnType::Selected(COMMIT_VERSION_COL_NAME.to_string()), + ColumnType::Selected(COMMIT_TIMESTAMP_COL_NAME.to_string()), + ]; + let phys_to_logical_expr = + physical_to_logical_expr(&scan_file, &logical_schema, 
&all_fields).unwrap(); + let expected_expr = Expression::struct_from([ + column_expr!("id"), + Scalar::Long(20).into(), + expected_expr, + Expression::literal(42i64), + Scalar::TimestampNtz(1234000).into(), // Microsecond is 1000x millisecond + ]); + + assert_eq!(phys_to_logical_expr, expected_expr) + }; + + let cdc_change_type = Expression::column([CHANGE_TYPE_COL_NAME]); + test(CdfScanFileType::Add, ADD_CHANGE_TYPE.into()); + test(CdfScanFileType::Remove, REMOVE_CHANGE_TYPE.into()); + test(CdfScanFileType::Cdc, cdc_change_type); + } +} diff --git a/kernel/src/table_changes/scan.rs b/kernel/src/table_changes/scan.rs index 87fd918f9..980374177 100644 --- a/kernel/src/table_changes/scan.rs +++ b/kernel/src/table_changes/scan.rs @@ -3,6 +3,7 @@ use std::sync::Arc; use itertools::Itertools; use tracing::debug; +use crate::scan::state::GlobalScanState; use crate::scan::ColumnType; use crate::schema::{SchemaRef, StructType}; use crate::{DeltaResult, Engine, ExpressionRef}; @@ -188,6 +189,20 @@ impl TableChangesScan { let schema = self.table_changes.end_snapshot.schema().clone().into(); table_changes_action_iter(engine, commits, schema, self.predicate.clone()) } + + /// Get global state that is valid for the entire scan. This is somewhat expensive so should + /// only be called once per scan. 
+ #[allow(unused)] + fn global_scan_state(&self) -> GlobalScanState { + let end_snapshot = &self.table_changes.end_snapshot; + GlobalScanState { + table_root: self.table_changes.table_root.to_string(), + partition_columns: end_snapshot.metadata().partition_columns.clone(), + logical_schema: self.logical_schema.clone(), + read_schema: self.physical_schema.clone(), + column_mapping_mode: end_snapshot.column_mapping_mode, + } + } } #[cfg(test)] @@ -198,6 +213,7 @@ mod tests { use crate::expressions::{column_expr, Scalar}; use crate::scan::ColumnType; use crate::schema::{DataType, StructField, StructType}; + use crate::table_changes::COMMIT_VERSION_COL_NAME; use crate::{Expression, Table}; #[test] @@ -236,7 +252,7 @@ mod tests { let schema = table_changes .schema() - .project(&["id", "_commit_version"]) + .project(&["id", COMMIT_VERSION_COL_NAME]) .unwrap(); let predicate = Arc::new(Expression::gt(column_expr!("id"), Scalar::from(10))); let scan = table_changes