Skip to content

Commit

Permalink
Helper methods for CDF Physical to Logical Transformation (#579)
Browse files Browse the repository at this point in the history
## What changes are proposed in this pull request?
This PR introduces methods that will be useful to read CDF data
and transform it from its physical form into the logical schema. The
methods are:
- `get_cdf_columns`: Generates a map from cdf column name to expression
that fills that column
- `physical_to_logical_expression`: Generates the physical to logical
expression used to transform the engine data
- `scan_file_read_schema`: Gets the physical schema. This depends on the
cdf scan file type

We also introduce the method `Scalar::timestamp_ntz_from_millis` which
converts from an `i64` millisecond value to a `Scalar::TimestampNtz`.

## How was this change tested?
We test that `physical_to_logical_expression` generates the correct
expression for a `CdfScanFile` with:
- partition columns
- normal selected columns
- the generated expression for the `_change_type` column in the cases of add,
cdc, and remove `CdfScanFile`s.
  • Loading branch information
OussamaSaoudi-db authored Dec 10, 2024
1 parent 5816620 commit c1b202a
Show file tree
Hide file tree
Showing 5 changed files with 179 additions and 5 deletions.
10 changes: 10 additions & 0 deletions kernel/src/expressions/scalars.rs
Original file line number Diff line number Diff line change
Expand Up @@ -151,6 +151,16 @@ impl Scalar {
/// Returns `true` if this scalar is the `Null` variant (of any data type).
pub fn is_null(&self) -> bool {
matches!(self, Self::Null(_))
}

/// Constructs a Scalar timestamp with no timezone from an `i64` millisecond since unix epoch
pub(crate) fn timestamp_ntz_from_millis(millis: i64) -> DeltaResult<Self> {
let Some(timestamp) = DateTime::from_timestamp_millis(millis) else {
return Err(Error::generic(format!(
"Failed to create millisecond timestamp from {millis}"
)));
};
Ok(Self::TimestampNtz(timestamp.timestamp_micros()))
}
}

impl Display for Scalar {
Expand Down
5 changes: 4 additions & 1 deletion kernel/src/scan/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -384,7 +384,10 @@ pub fn scan_row_schema() -> Schema {
log_replay::SCAN_ROW_SCHEMA.as_ref().clone()
}

fn parse_partition_value(raw: Option<&String>, data_type: &DataType) -> DeltaResult<Scalar> {
pub(crate) fn parse_partition_value(
raw: Option<&String>,
data_type: &DataType,
) -> DeltaResult<Scalar> {
match (raw, data_type.as_primitive_opt()) {
(Some(v), Some(primitive)) => primitive.parse_scalar(v),
(Some(_), None) => Err(Error::generic(format!(
Expand Down
12 changes: 9 additions & 3 deletions kernel/src/table_changes/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,15 +16,21 @@ use crate::utils::require;
use crate::{DeltaResult, Engine, Error, Version};

mod log_replay;
mod physical_to_logical;
mod resolve_dvs;
pub mod scan;
mod scan_file;

// Names of the change-data-feed metadata columns appended to each logical row.
static CHANGE_TYPE_COL_NAME: &str = "_change_type";
static COMMIT_VERSION_COL_NAME: &str = "_commit_version";
static COMMIT_TIMESTAMP_COL_NAME: &str = "_commit_timestamp";
// `_change_type` values generated for rows coming from add and remove actions.
static ADD_CHANGE_TYPE: &str = "insert";
static REMOVE_CHANGE_TYPE: &str = "delete";
// Schema fields for the three CDF metadata columns, in their output order.
// NOTE: the diff rendering duplicated these entries (old string-literal lines plus the
// new constant-based lines); only the three constant-based fields belong in the array.
static CDF_FIELDS: LazyLock<[StructField; 3]> = LazyLock::new(|| {
    [
        StructField::new(CHANGE_TYPE_COL_NAME, DataType::STRING, false),
        StructField::new(COMMIT_VERSION_COL_NAME, DataType::LONG, false),
        StructField::new(COMMIT_TIMESTAMP_COL_NAME, DataType::TIMESTAMP, false),
    ]
});

Expand Down
139 changes: 139 additions & 0 deletions kernel/src/table_changes/physical_to_logical.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,139 @@
use std::collections::HashMap;
use std::iter;

use itertools::Itertools;

use crate::expressions::Scalar;
use crate::scan::{parse_partition_value, ColumnType};
use crate::schema::{ColumnName, DataType, SchemaRef, StructField, StructType};
use crate::{DeltaResult, Error, Expression};

use super::scan_file::{CdfScanFile, CdfScanFileType};
use super::{
ADD_CHANGE_TYPE, CHANGE_TYPE_COL_NAME, COMMIT_TIMESTAMP_COL_NAME, COMMIT_VERSION_COL_NAME,
REMOVE_CHANGE_TYPE,
};

/// Returns a map from change data feed column name to an expression that generates the row data.
#[allow(unused)]
fn get_cdf_columns(scan_file: &CdfScanFile) -> DeltaResult<HashMap<&str, Expression>> {
    // `_change_type` is read from the data for CDC files, but is a constant literal
    // ("insert"/"delete") for files coming from add/remove actions.
    let change_type: Expression = match scan_file.scan_type {
        CdfScanFileType::Add => ADD_CHANGE_TYPE.into(),
        CdfScanFileType::Remove => REMOVE_CHANGE_TYPE.into(),
        CdfScanFileType::Cdc => Expression::column([CHANGE_TYPE_COL_NAME]),
    };
    let timestamp = Scalar::timestamp_ntz_from_millis(scan_file.commit_timestamp)?;
    let mut columns = HashMap::with_capacity(3);
    columns.insert(CHANGE_TYPE_COL_NAME, change_type);
    columns.insert(
        COMMIT_VERSION_COL_NAME,
        Expression::literal(scan_file.commit_version),
    );
    columns.insert(COMMIT_TIMESTAMP_COL_NAME, timestamp.into());
    Ok(columns)
}

/// Generates the expression used to convert physical data from the `scan_file` path into logical
/// data matching the `logical_schema`
#[allow(unused)]
fn physical_to_logical_expr(
scan_file: &CdfScanFile,
logical_schema: &StructType,
all_fields: &[ColumnType],
) -> DeltaResult<Expression> {
let mut cdf_columns = get_cdf_columns(scan_file)?;
let all_fields = all_fields
.iter()
.map(|field| match field {
ColumnType::Partition(field_idx) => {
let field = logical_schema.fields.get_index(*field_idx);
let Some((_, field)) = field else {
return Err(Error::generic(
"logical schema did not contain expected field, can't transform data",
));
};
let name = field.physical_name();
let value_expression =
parse_partition_value(scan_file.partition_values.get(name), field.data_type())?;
Ok(value_expression.into())
}
ColumnType::Selected(field_name) => {
// Remove to take ownership
let generated_column = cdf_columns.remove(field_name.as_str());
Ok(generated_column.unwrap_or_else(|| ColumnName::new([field_name]).into()))
}
})
.try_collect()?;
Ok(Expression::Struct(all_fields))
}

/// Gets the physical schema that will be used to read data in the `scan_file` path.
#[allow(unused)]
fn scan_file_read_schema(scan_file: &CdfScanFile, read_schema: &StructType) -> SchemaRef {
    // Non-CDC files are read with the unmodified physical schema.
    if scan_file.scan_type != CdfScanFileType::Cdc {
        return read_schema.clone().into();
    }
    // For CDC files, `_change_type` is part of the physical data, so append it.
    let change_type_field = StructField::new(CHANGE_TYPE_COL_NAME, DataType::STRING, false);
    let extended = read_schema.fields().cloned().chain(iter::once(change_type_field));
    StructType::new(extended).into()
}

#[cfg(test)]
mod tests {
    use std::collections::HashMap;

    use crate::expressions::{column_expr, Expression, Scalar};
    use crate::scan::ColumnType;
    use crate::schema::{DataType, StructField, StructType};
    use crate::table_changes::physical_to_logical::physical_to_logical_expr;
    use crate::table_changes::scan_file::{CdfScanFile, CdfScanFileType};
    use crate::table_changes::{
        ADD_CHANGE_TYPE, CHANGE_TYPE_COL_NAME, COMMIT_TIMESTAMP_COL_NAME, COMMIT_VERSION_COL_NAME,
        REMOVE_CHANGE_TYPE,
    };

    #[test]
    fn verify_physical_to_logical_expression() {
        // Each case pairs a scan-file type with the `_change_type` expression it
        // should produce: literal strings for add/remove, a column ref for cdc.
        let cases = [
            (CdfScanFileType::Add, Expression::from(ADD_CHANGE_TYPE)),
            (CdfScanFileType::Remove, Expression::from(REMOVE_CHANGE_TYPE)),
            (CdfScanFileType::Cdc, Expression::column([CHANGE_TYPE_COL_NAME])),
        ];
        for (scan_type, change_type_expr) in cases {
            let scan_file = CdfScanFile {
                scan_type,
                path: "fake_path".to_string(),
                dv_info: Default::default(),
                remove_dv: None,
                partition_values: HashMap::from([("age".to_string(), "20".to_string())]),
                commit_version: 42,
                commit_timestamp: 1234,
            };
            let logical_schema = StructType::new([
                StructField::new("id", DataType::STRING, true),
                StructField::new("age", DataType::LONG, false),
                StructField::new(CHANGE_TYPE_COL_NAME, DataType::STRING, false),
                StructField::new(COMMIT_VERSION_COL_NAME, DataType::LONG, false),
                StructField::new(COMMIT_TIMESTAMP_COL_NAME, DataType::TIMESTAMP, false),
            ]);
            let all_fields = vec![
                ColumnType::Selected("id".to_string()),
                ColumnType::Partition(1),
                ColumnType::Selected(CHANGE_TYPE_COL_NAME.to_string()),
                ColumnType::Selected(COMMIT_VERSION_COL_NAME.to_string()),
                ColumnType::Selected(COMMIT_TIMESTAMP_COL_NAME.to_string()),
            ];
            let actual =
                physical_to_logical_expr(&scan_file, &logical_schema, &all_fields).unwrap();
            let expected = Expression::struct_from([
                column_expr!("id"),
                Scalar::Long(20).into(),
                change_type_expr,
                Expression::literal(42i64),
                // Timestamps are stored as microseconds: 1234 ms -> 1_234_000 us.
                Scalar::TimestampNtz(1234000).into(),
            ]);
            assert_eq!(actual, expected);
        }
    }
}
18 changes: 17 additions & 1 deletion kernel/src/table_changes/scan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ use std::sync::Arc;
use itertools::Itertools;
use tracing::debug;

use crate::scan::state::GlobalScanState;
use crate::scan::ColumnType;
use crate::schema::{SchemaRef, StructType};
use crate::{DeltaResult, Engine, ExpressionRef};
Expand Down Expand Up @@ -188,6 +189,20 @@ impl TableChangesScan {
let schema = self.table_changes.end_snapshot.schema().clone().into();
table_changes_action_iter(engine, commits, schema, self.predicate.clone())
}

/// Get global state that is valid for the entire scan. This is somewhat expensive so should
/// only be called once per scan.
#[allow(unused)]
fn global_scan_state(&self) -> GlobalScanState {
    let end_snapshot = &self.table_changes.end_snapshot;
    GlobalScanState {
        table_root: self.table_changes.table_root.to_string(),
        // Partition columns and column mapping mode are taken from the end snapshot;
        // schemas come from this scan's own logical/physical projection.
        partition_columns: end_snapshot.metadata().partition_columns.clone(),
        logical_schema: self.logical_schema.clone(),
        read_schema: self.physical_schema.clone(),
        column_mapping_mode: end_snapshot.column_mapping_mode,
    }
}
}

#[cfg(test)]
Expand All @@ -198,6 +213,7 @@ mod tests {
use crate::expressions::{column_expr, Scalar};
use crate::scan::ColumnType;
use crate::schema::{DataType, StructField, StructType};
use crate::table_changes::COMMIT_VERSION_COL_NAME;
use crate::{Expression, Table};

#[test]
Expand Down Expand Up @@ -236,7 +252,7 @@ mod tests {

let schema = table_changes
.schema()
.project(&["id", "_commit_version"])
.project(&["id", COMMIT_VERSION_COL_NAME])
.unwrap();
let predicate = Arc::new(Expression::gt(column_expr!("id"), Scalar::from(10)));
let scan = table_changes
Expand Down

0 comments on commit c1b202a

Please sign in to comment.