Helper methods for CDF Physical to Logical Transformation #579

Merged · 58 commits · Dec 10, 2024
Commits
bd9585d
scan file implementation for cdf
OussamaSaoudi-db Nov 25, 2024
e2c2d90
change naming and update documentation for cdf scan files
OussamaSaoudi-db Dec 6, 2024
3db127f
fixup some naming
OussamaSaoudi-db Dec 6, 2024
ec6e629
inline dvinfo creation
OussamaSaoudi-db Dec 7, 2024
74eed10
Change name of expression transform
OussamaSaoudi-db Dec 7, 2024
9fe16db
Add test for null partition columns
OussamaSaoudi-db Dec 7, 2024
8635e38
Improve testing for scan_file
OussamaSaoudi-db Dec 8, 2024
a17f4b0
Change visitor, remove (un)resolved cdf scan file
OussamaSaoudi-db Dec 8, 2024
5fab00c
Only keep track of remove dv instead of hashmap
OussamaSaoudi-db Dec 8, 2024
21553a4
Fix remove dv
OussamaSaoudi-db Dec 8, 2024
08867f8
patch rm_dv
OussamaSaoudi-db Dec 8, 2024
1228107
Update comment for CdfScanFileVisitor
OussamaSaoudi-db Dec 8, 2024
97a5790
Initial cdf read phase with deletion vector resolution
OussamaSaoudi-db Dec 6, 2024
4969e44
lazily construct empty treemaps
OussamaSaoudi-db Dec 6, 2024
dcb17fa
Change treemap handling in kernel, use selection vector, and simplify…
OussamaSaoudi-db Dec 6, 2024
ebaf225
Rebase onto scan file
OussamaSaoudi-db Dec 6, 2024
bd49142
update doc comment for dvs
OussamaSaoudi-db Dec 6, 2024
6287e6e
fix comment
OussamaSaoudi-db Dec 7, 2024
7fe4f4d
prototype of add/rm dv in cdfscanfile
OussamaSaoudi-db Dec 8, 2024
d4f95d5
Use default for dv info
OussamaSaoudi-db Dec 8, 2024
8a8f6bf
shorten tests, check error cases
OussamaSaoudi-db Dec 8, 2024
eeaabb0
Rename to physical_to_logical, add comment explaining the deletion/se…
OussamaSaoudi-db Dec 8, 2024
fa13ade
Add note about ordinary scans for dv resolution
OussamaSaoudi-db Dec 8, 2024
4dabdaf
Add doc comment to treemap_to_bools_with
OussamaSaoudi-db Dec 8, 2024
7fcb531
Update resolve_scan_file_dv docs
OussamaSaoudi-db Dec 8, 2024
577b424
Add test and docs
OussamaSaoudi-db Dec 8, 2024
a970df1
Add documentation
OussamaSaoudi-db Dec 8, 2024
5b70aab
Rename to resolve-dvs
OussamaSaoudi-db Dec 8, 2024
3e0ead6
Fixup naming and docs
OussamaSaoudi-db Dec 9, 2024
0b207c4
address pr comments
OussamaSaoudi-db Dec 9, 2024
68a4fb1
Merge branch 'main' into cdf_dv_resolve
OussamaSaoudi-db Dec 9, 2024
3259769
Add test comment, ignore sv to bools test
OussamaSaoudi-db Dec 10, 2024
2e9c29e
remove ignore from test
OussamaSaoudi-db Dec 10, 2024
70fe573
initial schema and expr for phys to log
OussamaSaoudi-db Dec 6, 2024
876dd15
physical to logical transform
OussamaSaoudi-db Dec 8, 2024
35b38d4
logical to physical
OussamaSaoudi-db Dec 8, 2024
dfdc491
remove to_string from generated columns
OussamaSaoudi-db Dec 8, 2024
020a19d
Add read phase and a test
OussamaSaoudi-db Dec 9, 2024
8a92379
factor out test
OussamaSaoudi-db Dec 9, 2024
3d2e53a
Add cdf test and text
OussamaSaoudi-db Dec 9, 2024
8df172f
Add tests for cdf
OussamaSaoudi-db Dec 9, 2024
7846757
remove unneeded import
OussamaSaoudi-db Dec 9, 2024
67a9a18
more formatting
OussamaSaoudi-db Dec 9, 2024
fa042c1
Add some docs
OussamaSaoudi-db Dec 9, 2024
4098b67
Removed allow(unused)
OussamaSaoudi-db Dec 9, 2024
610d62e
Remove data for next PR
OussamaSaoudi-db Dec 9, 2024
4bc5819
Remove dv test file
OussamaSaoudi-db Dec 9, 2024
02599e6
appease clippy
OussamaSaoudi-db Dec 9, 2024
bd43bba
Add expression test
OussamaSaoudi-db Dec 9, 2024
61da4c3
Address PR comments
OussamaSaoudi-db Dec 10, 2024
65bce5f
Remove read_scan_data
OussamaSaoudi-db Dec 10, 2024
bea39ba
fix compiler warnings
OussamaSaoudi-db Dec 10, 2024
5622874
fix test
OussamaSaoudi-db Dec 10, 2024
221b96f
Switch to no timezone
OussamaSaoudi-db Dec 10, 2024
857d644
Address pr comments
OussamaSaoudi-db Dec 10, 2024
496a69a
Merge branch 'main' into cdf_phys_to_log
OussamaSaoudi-db Dec 10, 2024
e3031c3
Remove unneeded changes
OussamaSaoudi-db Dec 10, 2024
88df7fa
make raw mask private
OussamaSaoudi-db Dec 10, 2024
Files changed (changes from 6 commits)
7 changes: 4 additions & 3 deletions kernel/src/expressions/scalars.rs
@@ -152,14 +152,14 @@ impl Scalar {
matches!(self, Self::Null(_))
}

/// Constructs a Scalar timestamp from `i64` milliseconds since the Unix epoch
pub fn timestamp_from_millis(millis: i64) -> DeltaResult<Self> {
/// Constructs a Scalar timestamp with no timezone from `i64` milliseconds since the Unix epoch
pub(crate) fn timestamp_ntz_from_millis(millis: i64) -> DeltaResult<Self> {
let Some(timestamp) = DateTime::from_timestamp_millis(millis) else {
return Err(Error::generic(format!(
"Failed to create millisecond timestamp from {millis}"
)));
};
Ok(Self::Timestamp(timestamp.timestamp_micros()))
Ok(Self::TimestampNtz(timestamp.timestamp_micros()))
}
}
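As a quick illustration of the unit conversion this hunk introduces: a `TimestampNtz` value stores microseconds since the Unix epoch, so the millisecond input is widened by a factor of 1000, and out-of-range inputs are rejected. A minimal standalone sketch of the same conversion, assuming only chrono 0.4's public API (which the diff above already uses); the function name here is hypothetical, not kernel API:

```rust
use chrono::DateTime;

// Sketch of the millis-to-micros conversion behind timestamp_ntz_from_millis.
fn ntz_micros_from_millis(millis: i64) -> Option<i64> {
    // `from_timestamp_millis` returns None for values outside chrono's
    // representable range, mirroring the kernel's error path.
    DateTime::from_timestamp_millis(millis).map(|dt| dt.timestamp_micros())
}

fn main() {
    // 1234 ms past the epoch becomes 1_234_000 µs; the test later in this
    // diff expects Scalar::TimestampNtz(1234000) for the same reason.
    assert_eq!(ntz_micros_from_millis(1234), Some(1_234_000));
    // Extreme inputs fail instead of silently overflowing.
    assert_eq!(ntz_micros_from_millis(i64::MAX), None);
}
```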

@@ -472,6 +472,7 @@ impl PrimitiveType {

#[cfg(test)]
mod tests {
use core::panic;
use std::f32::consts::PI;

use crate::expressions::{column_expr, BinaryOperator};
11 changes: 8 additions & 3 deletions kernel/src/table_changes/mod.rs
@@ -21,11 +21,16 @@ mod resolve_dvs;
pub mod scan;
mod scan_file;

static CHANGE_TYPE_COL_NAME: &str = "_change_type";
static COMMIT_VERSION_COL_NAME: &str = "_commit_version";
static COMMIT_TIMESTAMP_COL_NAME: &str = "_commit_timestamp";
static ADD_CHANGE_TYPE: &str = "insert";
static REMOVE_CHANGE_TYPE: &str = "delete";
static CDF_FIELDS: LazyLock<[StructField; 3]> = LazyLock::new(|| {
[
StructField::new("_change_type", DataType::STRING, false),
StructField::new("_commit_version", DataType::LONG, false),
StructField::new("_commit_timestamp", DataType::TIMESTAMP, false),
StructField::new(CHANGE_TYPE_COL_NAME, DataType::STRING, false),
StructField::new(COMMIT_VERSION_COL_NAME, DataType::LONG, false),
StructField::new(COMMIT_TIMESTAMP_COL_NAME, DataType::TIMESTAMP, false),
]
});
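The `LazyLock` wrapping `CDF_FIELDS` is the standard-library pattern for statics whose values need runtime construction. A minimal standalone sketch of the same pattern, with illustrative names only:

```rust
use std::sync::LazyLock;

// A static array whose elements require allocation at runtime (String here,
// StructField in the kernel), built once on first access and shared after.
static CDF_COLUMN_NAMES: LazyLock<[String; 3]> = LazyLock::new(|| {
    ["_change_type", "_commit_version", "_commit_timestamp"].map(String::from)
});

fn main() {
    // The closure runs on the first dereference; later accesses are cheap.
    assert_eq!(CDF_COLUMN_NAMES[1], "_commit_version");
}
```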

159 changes: 79 additions & 80 deletions kernel/src/table_changes/physical_to_logical.rs
@@ -2,51 +2,50 @@ use std::collections::HashMap;
use std::iter;

use itertools::Itertools;
use url::Url;

use crate::actions::deletion_vector::split_vector;
use crate::expressions::{column_expr, Scalar};
use crate::scan::state::GlobalScanState;
use crate::scan::{parse_partition_value, ColumnType, ScanResult};
use crate::expressions::Scalar;
use crate::scan::{parse_partition_value, ColumnType};
use crate::schema::{ColumnName, DataType, SchemaRef, StructField, StructType};
use crate::{DeltaResult, Engine, Error, Expression, ExpressionRef, FileMeta};
use crate::{DeltaResult, Error, Expression};

use super::resolve_dvs::ResolvedCdfScanFile;
use super::scan_file::{CdfScanFile, CdfScanFileType};
use super::{
ADD_CHANGE_TYPE, CHANGE_TYPE_COL_NAME, COMMIT_TIMESTAMP_COL_NAME, COMMIT_VERSION_COL_NAME,
REMOVE_CHANGE_TYPE,
};

/// Returns a map from change data feed column name to an expression that generates the row data.
#[allow(unused)]
fn get_generated_columns(scan_file: &CdfScanFile) -> DeltaResult<HashMap<&str, Expression>> {
let timestamp = Scalar::timestamp_from_millis(scan_file.commit_timestamp)?;
fn get_cdf_columns(scan_file: &CdfScanFile) -> DeltaResult<HashMap<&str, Expression>> {
let timestamp = Scalar::timestamp_ntz_from_millis(scan_file.commit_timestamp)?;
let version = scan_file.commit_version;
let change_type: Expression = match scan_file.scan_type {
CdfScanFileType::Cdc => column_expr!("_change_type"),
CdfScanFileType::Add => "insert".into(),

CdfScanFileType::Remove => "delete".into(),
CdfScanFileType::Cdc => Expression::column([CHANGE_TYPE_COL_NAME]),
CdfScanFileType::Add => ADD_CHANGE_TYPE.into(),
CdfScanFileType::Remove => REMOVE_CHANGE_TYPE.into(),
};
let expressions = [
("_change_type", change_type),
("_commit_version", Expression::literal(version)),
("_commit_timestamp", timestamp.into()),
(CHANGE_TYPE_COL_NAME, change_type),
(COMMIT_VERSION_COL_NAME, Expression::literal(version)),
(COMMIT_TIMESTAMP_COL_NAME, timestamp.into()),
];
Ok(expressions.into_iter().collect())
}
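To make the mapping concrete, here is a simplified standalone sketch of the rule `get_cdf_columns` implements, with plain strings standing in for the kernel's `Expression` type; the enum and function below are illustrative, not kernel API:

```rust
use std::collections::HashMap;

#[derive(Clone, Copy)]
enum ScanType { Add, Remove, Cdc }

// CDC files carry _change_type physically, so it is read through as a column
// reference; add/remove files synthesize "insert"/"delete" literals instead.
fn cdf_columns(scan_type: ScanType, version: i64, millis: i64) -> HashMap<&'static str, String> {
    let change_type = match scan_type {
        ScanType::Cdc => "col(_change_type)".to_string(),
        ScanType::Add => "lit(insert)".to_string(),
        ScanType::Remove => "lit(delete)".to_string(),
    };
    HashMap::from([
        ("_change_type", change_type),
        ("_commit_version", format!("lit({version})")),
        // Timestamps widen from milliseconds to microseconds (TimestampNtz).
        ("_commit_timestamp", format!("lit_ntz({})", millis * 1000)),
    ])
}

fn main() {
    assert_eq!(cdf_columns(ScanType::Add, 1, 0)["_change_type"], "lit(insert)");
    assert_eq!(cdf_columns(ScanType::Cdc, 1, 0)["_change_type"], "col(_change_type)");
    let cols = cdf_columns(ScanType::Remove, 42, 1234);
    assert_eq!(cols["_change_type"], "lit(delete)");
    assert_eq!(cols["_commit_timestamp"], "lit_ntz(1234000)");
}
```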

/// Generates the expression used to convert physical data from the `scan_file` path into logical
/// data matching the `global_state.logical_schema`
/// data matching the `logical_schema`
#[allow(unused)]
fn get_expression(
fn physical_to_logical_expr(
scan_file: &CdfScanFile,
global_state: &GlobalScanState,
logical_schema: &StructType,
Review comment (Collaborator): nice! i like taking the more constrained data

all_fields: &[ColumnType],
) -> DeltaResult<Expression> {
let mut generated_columns = get_generated_columns(scan_file)?;
let mut cdf_columns = get_cdf_columns(scan_file)?;
let all_fields = all_fields
.iter()
.map(|field| match field {
ColumnType::Partition(field_idx) => {
let field = global_state.logical_schema.fields.get_index(*field_idx);
let field = logical_schema.fields.get_index(*field_idx);
let Some((_, field)) = field else {
return Err(Error::generic(
"logical schema did not contain expected field, can't transform data",
@@ -59,7 +58,7 @@ fn get_expression(
}
ColumnType::Selected(field_name) => {
// Remove to take ownership
let generated_column = generated_columns.remove(field_name.as_str());
let generated_column = cdf_columns.remove(field_name.as_str());
Ok(generated_column.unwrap_or_else(|| ColumnName::new([field_name]).into()))
}
})
@@ -69,72 +68,72 @@

/// Gets the physical schema that will be used to read data in the `scan_file` path.
#[allow(unused)]
fn get_read_schema(scan_file: &CdfScanFile, global_scan_state: &GlobalScanState) -> SchemaRef {
fn scan_file_read_schema(scan_file: &CdfScanFile, read_schema: &StructType) -> SchemaRef {
if scan_file.scan_type == CdfScanFileType::Cdc {
let change_type = StructField::new("_change_type", DataType::STRING, false);
let fields = global_scan_state
.read_schema
.fields()
.cloned()
.chain(iter::once(change_type));
let change_type = StructField::new(CHANGE_TYPE_COL_NAME, DataType::STRING, false);
let fields = read_schema.fields().cloned().chain(iter::once(change_type));
StructType::new(fields).into()
} else {
global_scan_state.read_schema.clone()
read_schema.clone().into()
}
}
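The schema rule itself is small enough to sketch standalone: only CDC files carry a physical `_change_type` column, so it is appended to the read schema for them and omitted otherwise. Column names are plain strings here; the kernel uses `StructField`s with nullability and data types:

```rust
#[derive(PartialEq)]
enum ScanType { Add, Remove, Cdc }

// Sketch of scan_file_read_schema's rule with simplified types: CDC files
// are read with an extra _change_type column appended to the base schema.
fn read_columns(scan_type: &ScanType, base: &[&'static str]) -> Vec<&'static str> {
    let mut cols = base.to_vec();
    if *scan_type == ScanType::Cdc {
        cols.push("_change_type");
    }
    cols
}

fn main() {
    assert_eq!(read_columns(&ScanType::Cdc, &["id", "age"]), vec!["id", "age", "_change_type"]);
    assert_eq!(read_columns(&ScanType::Add, &["id", "age"]), vec!["id", "age"]);
    assert_eq!(read_columns(&ScanType::Remove, &["id"]), vec!["id"]);
}
```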

/// Reads the data at the `resolved_scan_file` and transforms the data from physical to logical.
/// The result is a fallible iterator of [`ScanResult`] containing the logical data.
#[allow(unused)]
pub(crate) fn read_scan_data(
engine: &dyn Engine,
resolved_scan_file: ResolvedCdfScanFile,
global_state: &GlobalScanState,
all_fields: &[ColumnType],
predicate: Option<ExpressionRef>,
) -> DeltaResult<impl Iterator<Item = DeltaResult<ScanResult>>> {
let ResolvedCdfScanFile {
scan_file,
mut selection_vector,
} = resolved_scan_file;

let expression = get_expression(&scan_file, global_state, all_fields)?;
let schema = get_read_schema(&scan_file, global_state);
let evaluator = engine.get_expression_handler().get_evaluator(
schema.clone(),
expression,
global_state.logical_schema.clone().into(),
);
#[cfg(test)]
mod tests {
use std::collections::HashMap;

let table_root = Url::parse(&global_state.table_root)?;
let location = table_root.join(&scan_file.path)?;
let file = FileMeta {
last_modified: 0,
size: 0,
location,
use crate::expressions::{column_expr, Expression, Scalar};
use crate::scan::ColumnType;
use crate::schema::{DataType, StructField, StructType};
use crate::table_changes::physical_to_logical::physical_to_logical_expr;
use crate::table_changes::scan_file::{CdfScanFile, CdfScanFileType};
use crate::table_changes::{
ADD_CHANGE_TYPE, CHANGE_TYPE_COL_NAME, COMMIT_TIMESTAMP_COL_NAME, COMMIT_VERSION_COL_NAME,
REMOVE_CHANGE_TYPE,
};
let read_result_iter =
engine
.get_parquet_handler()
.read_parquet_files(&[file], schema, predicate)?;

let result = read_result_iter.map(move |batch| -> DeltaResult<_> {
let batch = batch?;
// to transform the physical data into the correct logical form
let logical = evaluator.evaluate(batch.as_ref());
let len = logical.as_ref().map_or(0, |res| res.len());
// need to split the dv_mask. what's left in dv_mask covers this result, and rest
// will cover the following results. we `take()` out of `selection_vector` to avoid
// trying to return a captured variable. We're going to reassign `selection_vector`
// to `rest` in a moment anyway
let mut sv = selection_vector.take();
let rest = split_vector(sv.as_mut(), len, None);
let result = ScanResult {
raw_data: logical,
raw_mask: sv,
#[test]
fn verify_physical_to_logical_expression() {
let test = |scan_type, expected_expr| {
let scan_file = CdfScanFile {
scan_type,
path: "fake_path".to_string(),
dv_info: Default::default(),
remove_dv: None,
partition_values: HashMap::from([("age".to_string(), "20".to_string())]),
commit_version: 42,
commit_timestamp: 1234,
};
let logical_schema = StructType::new([
StructField::new("id", DataType::STRING, true),
StructField::new("age", DataType::LONG, false),
StructField::new(CHANGE_TYPE_COL_NAME, DataType::STRING, false),
StructField::new(COMMIT_VERSION_COL_NAME, DataType::LONG, false),
StructField::new(COMMIT_TIMESTAMP_COL_NAME, DataType::TIMESTAMP, false),
]);
let all_fields = vec![
ColumnType::Selected("id".to_string()),
ColumnType::Partition(1),
ColumnType::Selected(CHANGE_TYPE_COL_NAME.to_string()),
ColumnType::Selected(COMMIT_VERSION_COL_NAME.to_string()),
ColumnType::Selected(COMMIT_TIMESTAMP_COL_NAME.to_string()),
];
let phys_to_logical_expr =
physical_to_logical_expr(&scan_file, &logical_schema, &all_fields).unwrap();
let expected_expr = Expression::struct_from([
column_expr!("id"),
Scalar::Long(20).into(),
expected_expr,
Expression::literal(42i64),
Scalar::TimestampNtz(1234000).into(), // Microsecond is 1000x millisecond
]);

assert_eq!(phys_to_logical_expr, expected_expr)
};
selection_vector = rest;
Ok(result)
});
Ok(result)

let cdc_change_type = Expression::column([CHANGE_TYPE_COL_NAME]);
test(CdfScanFileType::Add, ADD_CHANGE_TYPE.into());
test(CdfScanFileType::Remove, REMOVE_CHANGE_TYPE.into());
test(CdfScanFileType::Cdc, cdc_change_type);
}
}
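One detail worth noting in the test above: the partition value arrives as the string "20" but the expected expression contains `Scalar::Long(20)`, because partition values are parsed against the logical field's data type. A hedged standalone sketch of that coercion step (the kernel's real `parse_partition_value` handles many more types; everything below is illustrative):

```rust
// Serialized partition strings are parsed into typed scalars according to
// the schema's data type for that column.
#[derive(Debug, PartialEq)]
enum Scalar { Long(i64), String(String) }

enum DataType { Long, String }

fn parse_partition_value(raw: &str, data_type: &DataType) -> Result<Scalar, String> {
    match data_type {
        DataType::Long => raw
            .parse::<i64>()
            .map(Scalar::Long)
            .map_err(|e| format!("bad partition value {raw:?}: {e}")),
        DataType::String => Ok(Scalar::String(raw.to_string())),
    }
}

fn main() {
    assert_eq!(parse_partition_value("20", &DataType::Long), Ok(Scalar::Long(20)));
    assert_eq!(
        parse_partition_value("x", &DataType::String),
        Ok(Scalar::String("x".into()))
    );
}
```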
20 changes: 10 additions & 10 deletions kernel/src/table_changes/scan.rs
@@ -4,14 +4,11 @@ use itertools::Itertools;
use tracing::debug;

use crate::scan::state::GlobalScanState;
use crate::scan::{ColumnType, ScanResult};
use crate::scan::ColumnType;
use crate::schema::{SchemaRef, StructType};
use crate::{DeltaResult, Engine, ExpressionRef};

use super::log_replay::{table_changes_action_iter, TableChangesScanData};
use super::physical_to_logical::read_scan_data;
use super::resolve_dvs::resolve_scan_file_dv;
use super::scan_file::scan_data_to_scan_file;
use super::{TableChanges, CDF_FIELDS};

/// The result of building a [`TableChanges`] scan over a table. This can be used to get a change
@@ -216,6 +213,9 @@ mod tests {
use crate::expressions::{column_expr, Scalar};
use crate::scan::ColumnType;
use crate::schema::{DataType, StructField, StructType};
use crate::table_changes::{
CHANGE_TYPE_COL_NAME, COMMIT_TIMESTAMP_COL_NAME, COMMIT_VERSION_COL_NAME,
};
use crate::{Expression, Table};

#[test]
@@ -234,9 +234,9 @@
vec![
ColumnType::Selected("part".to_string()),
ColumnType::Selected("id".to_string()),
ColumnType::Selected("_change_type".to_string()),
ColumnType::Selected("_commit_version".to_string()),
ColumnType::Selected("_commit_timestamp".to_string()),
ColumnType::Selected(CHANGE_TYPE_COL_NAME.to_string()),
ColumnType::Selected(COMMIT_VERSION_COL_NAME.to_string()),
ColumnType::Selected(COMMIT_TIMESTAMP_COL_NAME.to_string()),
Review comment (Collaborator): actually (and sorry for the churn) I think I like the literal strings here: this lets us check that they are the actual strings we expect in the protocol (the failure case we prevent: someone changes the const, and then this test would change and still pass)

Review comment (Author): aha, that makes sense. So we get to assert more about the code through this test 👍

]
);
assert_eq!(scan.predicate, None);
@@ -254,7 +254,7 @@

let schema = table_changes
.schema()
.project(&["id", "_commit_version"])
.project(&["id", COMMIT_VERSION_COL_NAME])
.unwrap();
let predicate = Arc::new(Expression::gt(column_expr!("id"), Scalar::from(10)));
let scan = table_changes
@@ -267,14 +267,14 @@
scan.all_fields,
vec![
ColumnType::Selected("id".to_string()),
ColumnType::Selected("_commit_version".to_string()),
ColumnType::Selected(COMMIT_VERSION_COL_NAME.to_string()),
]
);
assert_eq!(
scan.logical_schema,
StructType::new([
StructField::new("id", DataType::INTEGER, true),
StructField::new("_commit_version", DataType::LONG, false),
StructField::new(COMMIT_VERSION_COL_NAME, DataType::LONG, false),
])
.into()
);