From c1b202a294bf96cb71db3c8fa296201027132241 Mon Sep 17 00:00:00 2001 From: Oussama Saoudi Date: Tue, 10 Dec 2024 11:08:15 -0800 Subject: [PATCH] Helper methods for CDF Physical to Logical Transformation (#579) ## What changes are proposed in this pull request? This PR introduces methods methods that will be useful to read CDF data and transform it from its physical form into the logical schema. The methods are: - `get_cdf_columns`: Generates a map from cdf column name to expression that fills that column - `physical_to_logical_expression`: Generates the physical to logical expression used to transform the engine data - `scan_file_read_schema`: Gets the physical schema. This depends on the cdf scan file type We also introduce the method `Scalar::timestamp_ntz_from_millis` which converts from an `i64` millisecond value to a `Scalar::TimestampNtz`. ## How was this change tested? We test that `physical_to_logical_expression` generates the correct expression for a `CdfScanFile` with: - partition columns - normal selected columns - generate expression for the `_change_type` column in the case of add, cdc, and remove CdfScanFile. --- kernel/src/expressions/scalars.rs | 10 ++ kernel/src/scan/mod.rs | 5 +- kernel/src/table_changes/mod.rs | 12 +- .../src/table_changes/physical_to_logical.rs | 139 ++++++++++++++++++ kernel/src/table_changes/scan.rs | 18 ++- 5 files changed, 179 insertions(+), 5 deletions(-) create mode 100644 kernel/src/table_changes/physical_to_logical.rs diff --git a/kernel/src/expressions/scalars.rs b/kernel/src/expressions/scalars.rs index a923f5e47..a2476a990 100644 --- a/kernel/src/expressions/scalars.rs +++ b/kernel/src/expressions/scalars.rs @@ -151,6 +151,16 @@ impl Scalar { pub fn is_null(&self) -> bool { matches!(self, Self::Null(_)) } + + /// Constructs a Scalar timestamp with no timezone from an `i64` millisecond since unix epoch + pub(crate) fn timestamp_ntz_from_millis(millis: i64) -> DeltaResult { + let Some(timestamp) = DateTime::from_timestamp_millis(millis) else { + return Err(Error::generic(format!( + "Failed to create millisecond timestamp from {millis}" + ))); + }; + Ok(Self::TimestampNtz(timestamp.timestamp_micros())) + } } impl Display for Scalar { diff --git a/kernel/src/scan/mod.rs b/kernel/src/scan/mod.rs index 4785173f8..f03d62cc9 100644 --- a/kernel/src/scan/mod.rs +++ b/kernel/src/scan/mod.rs @@ -384,7 +384,10 @@ pub fn scan_row_schema() -> Schema { log_replay::SCAN_ROW_SCHEMA.as_ref().clone() } -fn parse_partition_value(raw: Option<&String>, data_type: &DataType) -> DeltaResult { +pub(crate) fn parse_partition_value( + raw: Option<&String>, + data_type: &DataType, +) -> DeltaResult { match (raw, data_type.as_primitive_opt()) { (Some(v), Some(primitive)) => primitive.parse_scalar(v), (Some(_), None) => Err(Error::generic(format!( diff --git a/kernel/src/table_changes/mod.rs b/kernel/src/table_changes/mod.rs index 86ba0cc49..a5938082b 100644 --- a/kernel/src/table_changes/mod.rs +++ b/kernel/src/table_changes/mod.rs @@ -16,15 +16,21 @@ use crate::utils::require; use crate::{DeltaResult, Engine, Error, Version}; mod log_replay; +mod physical_to_logical; mod resolve_dvs; pub mod scan; mod scan_file; +static CHANGE_TYPE_COL_NAME: &str = "_change_type"; +static COMMIT_VERSION_COL_NAME: &str = "_commit_version"; +static COMMIT_TIMESTAMP_COL_NAME: &str = "_commit_timestamp"; +static ADD_CHANGE_TYPE: &str = "insert"; +static REMOVE_CHANGE_TYPE: &str = "delete"; static CDF_FIELDS: LazyLock<[StructField; 3]> = LazyLock::new(|| { [ - StructField::new("_change_type", DataType::STRING, false), - StructField::new("_commit_version", DataType::LONG, false), - StructField::new("_commit_timestamp", DataType::TIMESTAMP, false), + StructField::new(CHANGE_TYPE_COL_NAME, DataType::STRING, false), + StructField::new(COMMIT_VERSION_COL_NAME, DataType::LONG, false), + StructField::new(COMMIT_TIMESTAMP_COL_NAME, DataType::TIMESTAMP, false), ] }); diff --git a/kernel/src/table_changes/physical_to_logical.rs b/kernel/src/table_changes/physical_to_logical.rs new file mode 100644 index 000000000..7232e2cf8 --- /dev/null +++ b/kernel/src/table_changes/physical_to_logical.rs @@ -0,0 +1,139 @@ +use std::collections::HashMap; +use std::iter; + +use itertools::Itertools; + +use crate::expressions::Scalar; +use crate::scan::{parse_partition_value, ColumnType}; +use crate::schema::{ColumnName, DataType, SchemaRef, StructField, StructType}; +use crate::{DeltaResult, Error, Expression}; + +use super::scan_file::{CdfScanFile, CdfScanFileType}; +use super::{ + ADD_CHANGE_TYPE, CHANGE_TYPE_COL_NAME, COMMIT_TIMESTAMP_COL_NAME, COMMIT_VERSION_COL_NAME, + REMOVE_CHANGE_TYPE, +}; + +/// Returns a map from change data feed column name to an expression that generates the row data. +#[allow(unused)] +fn get_cdf_columns(scan_file: &CdfScanFile) -> DeltaResult> { + let timestamp = Scalar::timestamp_ntz_from_millis(scan_file.commit_timestamp)?; + let version = scan_file.commit_version; + let change_type: Expression = match scan_file.scan_type { + CdfScanFileType::Cdc => Expression::column([CHANGE_TYPE_COL_NAME]), + CdfScanFileType::Add => ADD_CHANGE_TYPE.into(), + CdfScanFileType::Remove => REMOVE_CHANGE_TYPE.into(), + }; + let expressions = [ + (CHANGE_TYPE_COL_NAME, change_type), + (COMMIT_VERSION_COL_NAME, Expression::literal(version)), + (COMMIT_TIMESTAMP_COL_NAME, timestamp.into()), + ]; + Ok(expressions.into_iter().collect()) +} + +/// Generates the expression used to convert physical data from the `scan_file` path into logical +/// data matching the `logical_schema` +#[allow(unused)] +fn physical_to_logical_expr( + scan_file: &CdfScanFile, + logical_schema: &StructType, + all_fields: &[ColumnType], +) -> DeltaResult { + let mut cdf_columns = get_cdf_columns(scan_file)?; + let all_fields = all_fields + .iter() + .map(|field| match field { + ColumnType::Partition(field_idx) => { + let field = logical_schema.fields.get_index(*field_idx); + let Some((_, field)) = field else { + return Err(Error::generic( + "logical schema did not contain expected field, can't transform data", + )); + }; + let name = field.physical_name(); + let value_expression = + parse_partition_value(scan_file.partition_values.get(name), field.data_type())?; + Ok(value_expression.into()) + } + ColumnType::Selected(field_name) => { + // Remove to take ownership + let generated_column = cdf_columns.remove(field_name.as_str()); + Ok(generated_column.unwrap_or_else(|| ColumnName::new([field_name]).into())) + } + }) + .try_collect()?; + Ok(Expression::Struct(all_fields)) +} + +/// Gets the physical schema that will be used to read data in the `scan_file` path. +#[allow(unused)] +fn scan_file_read_schema(scan_file: &CdfScanFile, read_schema: &StructType) -> SchemaRef { + if scan_file.scan_type == CdfScanFileType::Cdc { + let change_type = StructField::new(CHANGE_TYPE_COL_NAME, DataType::STRING, false); + let fields = read_schema.fields().cloned().chain(iter::once(change_type)); + StructType::new(fields).into() + } else { + read_schema.clone().into() + } +} + +#[cfg(test)] +mod tests { + use std::collections::HashMap; + + use crate::expressions::{column_expr, Expression, Scalar}; + use crate::scan::ColumnType; + use crate::schema::{DataType, StructField, StructType}; + use crate::table_changes::physical_to_logical::physical_to_logical_expr; + use crate::table_changes::scan_file::{CdfScanFile, CdfScanFileType}; + use crate::table_changes::{ + ADD_CHANGE_TYPE, CHANGE_TYPE_COL_NAME, COMMIT_TIMESTAMP_COL_NAME, COMMIT_VERSION_COL_NAME, + REMOVE_CHANGE_TYPE, + }; + + #[test] + fn verify_physical_to_logical_expression() { + let test = |scan_type, expected_expr| { + let scan_file = CdfScanFile { + scan_type, + path: "fake_path".to_string(), + dv_info: Default::default(), + remove_dv: None, + partition_values: HashMap::from([("age".to_string(), "20".to_string())]), + commit_version: 42, + commit_timestamp: 1234, + }; + let logical_schema = StructType::new([ + StructField::new("id", DataType::STRING, true), + StructField::new("age", DataType::LONG, false), + StructField::new(CHANGE_TYPE_COL_NAME, DataType::STRING, false), + StructField::new(COMMIT_VERSION_COL_NAME, DataType::LONG, false), + StructField::new(COMMIT_TIMESTAMP_COL_NAME, DataType::TIMESTAMP, false), + ]); + let all_fields = vec![ + ColumnType::Selected("id".to_string()), + ColumnType::Partition(1), + ColumnType::Selected(CHANGE_TYPE_COL_NAME.to_string()), + ColumnType::Selected(COMMIT_VERSION_COL_NAME.to_string()), + ColumnType::Selected(COMMIT_TIMESTAMP_COL_NAME.to_string()), + ]; + let phys_to_logical_expr = + physical_to_logical_expr(&scan_file, &logical_schema, &all_fields).unwrap(); + let expected_expr = Expression::struct_from([ + column_expr!("id"), + Scalar::Long(20).into(), + expected_expr, + Expression::literal(42i64), + Scalar::TimestampNtz(1234000).into(), // Microsecond is 1000x millisecond + ]); + + assert_eq!(phys_to_logical_expr, expected_expr) + }; + + let cdc_change_type = Expression::column([CHANGE_TYPE_COL_NAME]); + test(CdfScanFileType::Add, ADD_CHANGE_TYPE.into()); + test(CdfScanFileType::Remove, REMOVE_CHANGE_TYPE.into()); + test(CdfScanFileType::Cdc, cdc_change_type); + } +} diff --git a/kernel/src/table_changes/scan.rs b/kernel/src/table_changes/scan.rs index 87fd918f9..980374177 100644 --- a/kernel/src/table_changes/scan.rs +++ b/kernel/src/table_changes/scan.rs @@ -3,6 +3,7 @@ use std::sync::Arc; use itertools::Itertools; use tracing::debug; +use crate::scan::state::GlobalScanState; use crate::scan::ColumnType; use crate::schema::{SchemaRef, StructType}; use crate::{DeltaResult, Engine, ExpressionRef}; @@ -188,6 +189,20 @@ impl TableChangesScan { let schema = self.table_changes.end_snapshot.schema().clone().into(); table_changes_action_iter(engine, commits, schema, self.predicate.clone()) } + + /// Get global state that is valid for the entire scan. This is somewhat expensive so should + /// only be called once per scan. + #[allow(unused)] + fn global_scan_state(&self) -> GlobalScanState { + let end_snapshot = &self.table_changes.end_snapshot; + GlobalScanState { + table_root: self.table_changes.table_root.to_string(), + partition_columns: end_snapshot.metadata().partition_columns.clone(), + logical_schema: self.logical_schema.clone(), + read_schema: self.physical_schema.clone(), + column_mapping_mode: end_snapshot.column_mapping_mode, + } + } } #[cfg(test)] @@ -198,6 +213,7 @@ mod tests { use crate::expressions::{column_expr, Scalar}; use crate::scan::ColumnType; use crate::schema::{DataType, StructField, StructType}; + use crate::table_changes::COMMIT_VERSION_COL_NAME; use crate::{Expression, Table}; #[test] @@ -236,7 +252,7 @@ mod tests { let schema = table_changes .schema() - .project(&["id", "_commit_version"]) + .project(&["id", COMMIT_VERSION_COL_NAME]) .unwrap(); let predicate = Arc::new(Expression::gt(column_expr!("id"), Scalar::from(10))); let scan = table_changes