Helper methods for CDF Physical to Logical Transformation #579 (Merged)

OussamaSaoudi-db merged 58 commits into delta-io:main from OussamaSaoudi-db:cdf_phys_to_log on Dec 10, 2024.
Changes from all commits (58 commits)
All commits authored by OussamaSaoudi-db:

- bd9585d: scan file implementation for cdf
- e2c2d90: change naming and update documentation for cdf scan files
- 3db127f: fixup some naming
- ec6e629: inline dvinfo creation
- 74eed10: Change name of expression transform
- 9fe16db: Add test for null parttion columns
- 8635e38: Improve testing for scan_file
- a17f4b0: Change visitor, remove (un)resolved cdf scan file
- 5fab00c: Only keep track of remove dv instead of hashmap
- 21553a4: Fix remove dv
- 08867f8: patch rm_dv
- 1228107: Update comment for CdfScanFileVisitor
- 97a5790: Initial cdf read phase with deletion vector resolution
- 4969e44: lazily construct empty treemaps
- dcb17fa: Change treemap handling in kernel, use selection vector, and simplify…
- ebaf225: Rebase onto scan file
- bd49142: update doc comment for dvs
- 6287e6e: fix comment
- 7fe4f4d: prototype of add/rm dv in cdfscanfile
- d4f95d5: Use default for dv info
- 8a8f6bf: shorten tests, check error cases
- eeaabb0: Rename to physical_to_logical, add comment explaining the deletion/se…
- fa13ade: Add note about ordinary scans for dv resolution
- 4dabdaf: Add doc comment to treemap_to_bools_with
- 7fcb531: Update resolve_scan_file_dv docs
- 577b424: Add test and docs
- a970df1: Add documentation
- 5b70aab: Rename to resolve-dvs
- 3e0ead6: Fixup naming and docs
- 0b207c4: address pr comments
- 68a4fb1: Merge branch 'main' into cdf_dv_resolve
- 3259769: Add test comment, ignore sv to bools test
- 2e9c29e: remove ignore from test
- 70fe573: initial schema and expr for phys to log
- 876dd15: physical to logical transform
- 35b38d4: logical to physical
- dfdc491: remove to_string from generated columns
- 020a19d: Add read phase and a test
- 8a92379: factor out test
- 3d2e53a: Add cdf test and text
- 8df172f: Add tests for cdf
- 7846757: remove unneeded import
- 67a9a18: more formatting
- fa042c1: Add some docs
- 4098b67: Removed allow(unused)
- 610d62e: Remove data for next PR
- 4bc5819: Remove dv test file
- 02599e6: appease clippy
- bd43bba: Add expression test
- 61da4c3: Address PR comments
- 65bce5f: Remove read_scan_data
- bea39ba: fix compiler warnings
- 5622874: fix test
- 221b96f: Switch to no timezone
- 857d644: Address pr comments
- 496a69a: Merge branch 'main' into cdf_phys_to_log
- e3031c3: Remove unneeded changes
- 88df7fa: make raw mask private
New file: `table_changes/physical_to_logical.rs` (@@ -0,0 +1,139 @@)
use std::collections::HashMap;
use std::iter;

use itertools::Itertools;

use crate::expressions::Scalar;
use crate::scan::{parse_partition_value, ColumnType};
use crate::schema::{ColumnName, DataType, SchemaRef, StructField, StructType};
use crate::{DeltaResult, Error, Expression};

use super::scan_file::{CdfScanFile, CdfScanFileType};
use super::{
    ADD_CHANGE_TYPE, CHANGE_TYPE_COL_NAME, COMMIT_TIMESTAMP_COL_NAME, COMMIT_VERSION_COL_NAME,
    REMOVE_CHANGE_TYPE,
};

/// Returns a map from change data feed column name to an expression that generates the row data.
#[allow(unused)]
fn get_cdf_columns(scan_file: &CdfScanFile) -> DeltaResult<HashMap<&str, Expression>> {
    let timestamp = Scalar::timestamp_ntz_from_millis(scan_file.commit_timestamp)?;
    let version = scan_file.commit_version;
    let change_type: Expression = match scan_file.scan_type {
        CdfScanFileType::Cdc => Expression::column([CHANGE_TYPE_COL_NAME]),
        CdfScanFileType::Add => ADD_CHANGE_TYPE.into(),
        CdfScanFileType::Remove => REMOVE_CHANGE_TYPE.into(),
    };
    let expressions = [
        (CHANGE_TYPE_COL_NAME, change_type),
        (COMMIT_VERSION_COL_NAME, Expression::literal(version)),
        (COMMIT_TIMESTAMP_COL_NAME, timestamp.into()),
    ];
    Ok(expressions.into_iter().collect())
}
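The `get_cdf_columns` helper decides, per scan file, whether each CDF metadata column is read from the file or injected as a generated value. A minimal standalone sketch of that dispatch, using plain std types in place of the kernel's `Expression` (the `FileType` and `Generated` names, and the "insert"/"delete" literal values, are illustrative assumptions, not the kernel API):

```rust
use std::collections::HashMap;

// Hypothetical stand-in for CdfScanFileType.
enum FileType {
    Add,
    Remove,
    Cdc,
}

// Hypothetical stand-in for the kernel's Expression.
#[derive(Debug, PartialEq)]
enum Generated {
    // Value comes from a physical column already present in the file.
    Column(&'static str),
    // Value is a literal injected by the scan.
    Literal(String),
}

/// Map each CDF metadata column to the value that generates it, mirroring
/// the shape of `get_cdf_columns` above.
fn cdf_columns(file_type: FileType, version: i64, timestamp_ms: i64) -> HashMap<&'static str, Generated> {
    let change_type = match file_type {
        // cdc files carry `_change_type` physically; add/remove files get a literal
        // ("insert"/"delete" are assumed values for illustration).
        FileType::Cdc => Generated::Column("_change_type"),
        FileType::Add => Generated::Literal("insert".to_string()),
        FileType::Remove => Generated::Literal("delete".to_string()),
    };
    HashMap::from([
        ("_change_type", change_type),
        ("_commit_version", Generated::Literal(version.to_string())),
        ("_commit_timestamp", Generated::Literal(timestamp_ms.to_string())),
    ])
}
```
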

/// Generates the expression used to convert physical data from the `scan_file` path into logical
/// data matching the `logical_schema`.
#[allow(unused)]
fn physical_to_logical_expr(
    scan_file: &CdfScanFile,
    logical_schema: &StructType,
    all_fields: &[ColumnType],
) -> DeltaResult<Expression> {
    let mut cdf_columns = get_cdf_columns(scan_file)?;
    let all_fields = all_fields
        .iter()
        .map(|field| match field {
            ColumnType::Partition(field_idx) => {
                let field = logical_schema.fields.get_index(*field_idx);
                let Some((_, field)) = field else {
                    return Err(Error::generic(
                        "logical schema did not contain expected field, can't transform data",
                    ));
                };
                let name = field.physical_name();
                let value_expression =
                    parse_partition_value(scan_file.partition_values.get(name), field.data_type())?;
                Ok(value_expression.into())
            }
            ColumnType::Selected(field_name) => {
                // Remove to take ownership
                let generated_column = cdf_columns.remove(field_name.as_str());
                Ok(generated_column.unwrap_or_else(|| ColumnName::new([field_name]).into()))
            }
        })
        .try_collect()?;
    Ok(Expression::Struct(all_fields))
}
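`physical_to_logical_expr` walks the logical fields in order and emits one expression per field: partition columns become literals parsed from the file's partition values, generated CDF columns come out of the map, and everything else is a plain column reference. A self-contained sketch of that resolution loop (all type and function names here are illustrative stand-ins, not delta-kernel APIs):

```rust
use std::collections::HashMap;

// Illustrative stand-ins; not delta-kernel types.
#[derive(Debug, PartialEq)]
enum Output {
    PhysicalColumn(String), // read straight from the data file
    Literal(String),        // injected: a partition value or CDF metadata
}

enum FieldKind {
    Partition(usize),  // index of the field in the logical schema
    Selected(String),  // a selected column, possibly a generated CDF column
}

/// Resolve each logical field to its source. Generated columns are consumed
/// with `remove` so each is used at most once, mirroring the PR's
/// `cdf_columns.remove(...)`; a missing partition value is an error.
fn resolve_fields(
    fields: &[FieldKind],
    partition_values: &HashMap<usize, String>,
    mut generated: HashMap<String, Output>,
) -> Result<Vec<Output>, String> {
    fields
        .iter()
        .map(|f| match f {
            FieldKind::Partition(idx) => partition_values
                .get(idx)
                .cloned()
                .map(Output::Literal)
                .ok_or_else(|| "missing partition value".to_string()),
            FieldKind::Selected(name) => Ok(generated
                .remove(name)
                .unwrap_or_else(|| Output::PhysicalColumn(name.clone()))),
        })
        .collect()
}
```
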

/// Gets the physical schema that will be used to read data in the `scan_file` path.
#[allow(unused)]
fn scan_file_read_schema(scan_file: &CdfScanFile, read_schema: &StructType) -> SchemaRef {
    if scan_file.scan_type == CdfScanFileType::Cdc {
        let change_type = StructField::new(CHANGE_TYPE_COL_NAME, DataType::STRING, false);
        let fields = read_schema.fields().cloned().chain(iter::once(change_type));
        StructType::new(fields).into()
    } else {
        read_schema.clone().into()
    }
}
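`scan_file_read_schema` captures that cdc files physically store `_change_type` while add/remove files do not, so only cdc reads widen the schema. A toy sketch of the same conditional widening, with column-name strings standing in for `StructField`s (not the kernel API):

```rust
/// Append `_change_type` to the read columns only for cdc files;
/// add/remove files are read with the base schema as-is.
/// (Illustrative sketch of `scan_file_read_schema`'s branching.)
fn read_columns(base: &[&str], is_cdc: bool) -> Vec<String> {
    let mut cols: Vec<String> = base.iter().map(|s| s.to_string()).collect();
    if is_cdc {
        cols.push("_change_type".to_string());
    }
    cols
}
```
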

#[cfg(test)]
mod tests {
    use std::collections::HashMap;

    use crate::expressions::{column_expr, Expression, Scalar};
    use crate::scan::ColumnType;
    use crate::schema::{DataType, StructField, StructType};
    use crate::table_changes::physical_to_logical::physical_to_logical_expr;
    use crate::table_changes::scan_file::{CdfScanFile, CdfScanFileType};
    use crate::table_changes::{
        ADD_CHANGE_TYPE, CHANGE_TYPE_COL_NAME, COMMIT_TIMESTAMP_COL_NAME, COMMIT_VERSION_COL_NAME,
        REMOVE_CHANGE_TYPE,
    };

    #[test]
    fn verify_physical_to_logical_expression() {
        let test = |scan_type, expected_expr| {
            let scan_file = CdfScanFile {
                scan_type,
                path: "fake_path".to_string(),
                dv_info: Default::default(),
                remove_dv: None,
                partition_values: HashMap::from([("age".to_string(), "20".to_string())]),
                commit_version: 42,
                commit_timestamp: 1234,
            };
            let logical_schema = StructType::new([
                StructField::new("id", DataType::STRING, true),
                StructField::new("age", DataType::LONG, false),
                StructField::new(CHANGE_TYPE_COL_NAME, DataType::STRING, false),
                StructField::new(COMMIT_VERSION_COL_NAME, DataType::LONG, false),
                StructField::new(COMMIT_TIMESTAMP_COL_NAME, DataType::TIMESTAMP, false),
            ]);
            let all_fields = vec![
                ColumnType::Selected("id".to_string()),
                ColumnType::Partition(1),
                ColumnType::Selected(CHANGE_TYPE_COL_NAME.to_string()),
                ColumnType::Selected(COMMIT_VERSION_COL_NAME.to_string()),
                ColumnType::Selected(COMMIT_TIMESTAMP_COL_NAME.to_string()),
            ];
            let phys_to_logical_expr =
                physical_to_logical_expr(&scan_file, &logical_schema, &all_fields).unwrap();
            let expected_expr = Expression::struct_from([
                column_expr!("id"),
                Scalar::Long(20).into(),
                expected_expr,
                Expression::literal(42i64),
                Scalar::TimestampNtz(1234000).into(), // Microsecond is 1000x millisecond
            ]);

            assert_eq!(phys_to_logical_expr, expected_expr)
        };

        let cdc_change_type = Expression::column([CHANGE_TYPE_COL_NAME]);
        test(CdfScanFileType::Add, ADD_CHANGE_TYPE.into());
        test(CdfScanFileType::Remove, REMOVE_CHANGE_TYPE.into());
        test(CdfScanFileType::Cdc, cdc_change_type);
    }
}
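The `Scalar::TimestampNtz(1234000)` expectation in the test encodes the millisecond-to-microsecond conversion that `timestamp_ntz_from_millis` performs on `commit_timestamp`. A hedged sketch of just that unit conversion, with overflow checked (the function name is illustrative, not the kernel's implementation):

```rust
/// Convert a commit timestamp in milliseconds to the microsecond value a
/// TIMESTAMP_NTZ scalar stores; returns None on i64 overflow.
/// (Illustrative sketch, not the kernel's actual code.)
fn timestamp_ntz_micros_from_millis(millis: i64) -> Option<i64> {
    millis.checked_mul(1000)
}
```
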
Modified file (adds a `GlobalScanState` helper to `TableChangesScan`):

@@ -3,6 +3,7 @@
use std::sync::Arc;

use itertools::Itertools;
use tracing::debug;

use crate::scan::state::GlobalScanState;
use crate::scan::ColumnType;
use crate::schema::{SchemaRef, StructType};
use crate::{DeltaResult, Engine, ExpressionRef};
@@ -188,6 +189,20 @@ impl TableChangesScan {
        let schema = self.table_changes.end_snapshot.schema().clone().into();
        table_changes_action_iter(engine, commits, schema, self.predicate.clone())
    }

    /// Get global state that is valid for the entire scan. This is somewhat expensive so should
    /// only be called once per scan.
    #[allow(unused)]
    fn global_scan_state(&self) -> GlobalScanState {
        let end_snapshot = &self.table_changes.end_snapshot;
        GlobalScanState {
            table_root: self.table_changes.table_root.to_string(),
            partition_columns: end_snapshot.metadata().partition_columns.clone(),
            logical_schema: self.logical_schema.clone(),
            read_schema: self.physical_schema.clone(),
            column_mapping_mode: end_snapshot.column_mapping_mode,
        }
    }
}
@@ -198,6 +213,7 @@ mod tests {
    use crate::expressions::{column_expr, Scalar};
    use crate::scan::ColumnType;
    use crate::schema::{DataType, StructField, StructType};
    use crate::table_changes::COMMIT_VERSION_COL_NAME;
    use crate::{Expression, Table};

Review comment on the added import: "can just do use super::*?" Author reply: "Doesn't seem like it."
@@ -236,7 +252,7 @@ mod tests {
        let schema = table_changes
            .schema()
-           .project(&["id", "_commit_version"])
+           .project(&["id", COMMIT_VERSION_COL_NAME])
            .unwrap();
        let predicate = Arc::new(Expression::gt(column_expr!("id"), Scalar::from(10)));
        let scan = table_changes

(zachschuermann marked this conversation as resolved.)
Review comment: "nice! i like taking the more constrained data"