diff --git a/ffi/src/scan.rs b/ffi/src/scan.rs index 51ca075b1..d5695c130 100644 --- a/ffi/src/scan.rs +++ b/ffi/src/scan.rs @@ -96,7 +96,7 @@ pub unsafe extern "C" fn get_global_read_schema( state: Handle, ) -> Handle { let state = unsafe { state.as_ref() }; - state.read_schema.clone().into() + state.physical_schema.clone().into() } /// Free a global read schema diff --git a/kernel/examples/read-table-multi-threaded/src/main.rs b/kernel/examples/read-table-multi-threaded/src/main.rs index 69770a1aa..d97b6c2d3 100644 --- a/kernel/examples/read-table-multi-threaded/src/main.rs +++ b/kernel/examples/read-table-multi-threaded/src/main.rs @@ -256,7 +256,7 @@ fn do_work( ) { // get the type for the function calls let engine: &dyn Engine = engine.as_ref(); - let read_schema = scan_state.read_schema.clone(); + let physical_schema = scan_state.physical_schema.clone(); // in a loop, try and get a ScanFile. Note that `recv` will return an `Err` when the other side // hangs up, which indicates there's no more data to process. while let Ok(scan_file) = scan_file_rx.recv() { @@ -287,7 +287,7 @@ fn do_work( // vector let read_results = engine .get_parquet_handler() - .read_parquet_files(&[meta], read_schema.clone(), None) + .read_parquet_files(&[meta], physical_schema.clone(), None) .unwrap(); for read_result in read_results { diff --git a/kernel/src/scan/mod.rs b/kernel/src/scan/mod.rs index 9ccfa8a76..360e2bcb1 100644 --- a/kernel/src/scan/mod.rs +++ b/kernel/src/scan/mod.rs @@ -113,7 +113,7 @@ impl ScanBuilder { logical_schema, physical_schema: Arc::new(StructType::new(state_info.read_fields)), physical_predicate, - all_fields: state_info.all_fields, + all_fields: Arc::new(state_info.all_fields), have_partition_cols: state_info.have_partition_cols, }) } @@ -329,7 +329,7 @@ pub struct Scan { logical_schema: SchemaRef, physical_schema: SchemaRef, physical_predicate: PhysicalPredicate, - all_fields: Vec, + all_fields: Arc>, have_partition_cols: bool, } @@ -411,7 +411,7 @@ impl Scan { table_root: self.snapshot.table_root.to_string(), partition_columns: self.snapshot.metadata().partition_columns.clone(), logical_schema: self.logical_schema.clone(), - read_schema: self.physical_schema.clone(), + physical_schema: self.physical_schema.clone(), column_mapping_mode: self.snapshot.column_mapping_mode, } } @@ -427,7 +427,7 @@ impl Scan { pub fn execute( &self, engine: Arc, - ) -> DeltaResult> + '_> { + ) -> DeltaResult>> { struct ScanFile { path: String, size: i64, @@ -456,6 +456,11 @@ impl Scan { ); let global_state = Arc::new(self.global_scan_state()); + let table_root = self.snapshot.table_root.clone(); + let physical_predicate = self.physical_predicate(); + let all_fields = self.all_fields.clone(); + let have_partition_cols = self.have_partition_cols; + let scan_data = self.scan_data(engine.as_ref())?; let scan_files_iter = scan_data .map(|res| { @@ -469,10 +474,10 @@ impl Scan { let result = scan_files_iter .map(move |scan_file| -> DeltaResult<_> { let scan_file = scan_file?; - let file_path = self.snapshot.table_root.join(&scan_file.path)?; + let file_path = table_root.join(&scan_file.path)?; let mut selection_vector = scan_file .dv_info - .get_selection_vector(engine.as_ref(), &self.snapshot.table_root)?; + .get_selection_vector(engine.as_ref(), &table_root)?; let meta = FileMeta { last_modified: 0, size: scan_file.size as usize, @@ -485,13 +490,14 @@ impl Scan { // https://github.com/delta-io/delta-kernel-rs/issues/434 for more details. let read_result_iter = engine.get_parquet_handler().read_parquet_files( &[meta], - global_state.read_schema.clone(), - self.physical_predicate(), + global_state.physical_schema.clone(), + physical_predicate.clone(), )?; // Arc clones let engine = engine.clone(); let global_state = global_state.clone(); + let all_fields = all_fields.clone(); Ok(read_result_iter.map(move |read_result| -> DeltaResult<_> { let read_result = read_result?; // to transform the physical data into the correct logical form @@ -500,8 +506,8 @@ impl Scan { read_result, &global_state, &scan_file.partition_values, - &self.all_fields, - self.have_partition_cols, + &all_fields, + have_partition_cols, ); let len = logical.as_ref().map_or(0, |res| res.len()); // need to split the dv_mask. what's left in dv_mask covers this result, and rest @@ -651,7 +657,7 @@ fn transform_to_logical_internal( all_fields: &[ColumnType], have_partition_cols: bool, ) -> DeltaResult> { - let read_schema = global_state.read_schema.clone(); + let physical_schema = global_state.physical_schema.clone(); if !have_partition_cols && global_state.column_mapping_mode == ColumnMappingMode::None { return Ok(data); } @@ -678,7 +684,7 @@ fn transform_to_logical_internal( let result = engine .get_expression_handler() .get_evaluator( - read_schema, + physical_schema, read_expression, global_state.logical_schema.clone().into(), ) diff --git a/kernel/src/scan/state.rs b/kernel/src/scan/state.rs index 12bbed552..b57f0c120 100644 --- a/kernel/src/scan/state.rs +++ b/kernel/src/scan/state.rs @@ -24,7 +24,7 @@ pub struct GlobalScanState { pub table_root: String, pub partition_columns: Vec, pub logical_schema: SchemaRef, - pub read_schema: SchemaRef, + pub physical_schema: SchemaRef, pub column_mapping_mode: ColumnMappingMode, } diff --git a/kernel/src/table_changes/scan.rs b/kernel/src/table_changes/scan.rs index b40eaa4c6..627f1de93 100644 --- a/kernel/src/table_changes/scan.rs +++ b/kernel/src/table_changes/scan.rs @@ -208,7 +208,7 @@ impl TableChangesScan { table_root: self.table_changes.table_root.to_string(), partition_columns: end_snapshot.metadata().partition_columns.clone(), logical_schema: self.logical_schema.clone(), - read_schema: self.physical_schema.clone(), + physical_schema: self.physical_schema.clone(), column_mapping_mode: end_snapshot.column_mapping_mode, } } @@ -276,7 +276,8 @@ fn read_scan_file( let physical_to_logical_expr = physical_to_logical_expr(&scan_file, global_state.logical_schema.as_ref(), all_fields)?; - let physical_schema = scan_file_physical_schema(&scan_file, global_state.read_schema.as_ref()); + let physical_schema = + scan_file_physical_schema(&scan_file, global_state.physical_schema.as_ref()); let phys_to_logical_eval = engine.get_expression_handler().get_evaluator( physical_schema.clone(), physical_to_logical_expr, diff --git a/kernel/tests/read.rs b/kernel/tests/read.rs index 7a674ce57..ae49b70e2 100644 --- a/kernel/tests/read.rs +++ b/kernel/tests/read.rs @@ -389,7 +389,7 @@ fn read_with_scan_data( .get_parquet_handler() .read_parquet_files( &[meta], - global_state.read_schema.clone(), + global_state.physical_schema.clone(), scan.physical_predicate().clone(), ) .unwrap();