Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Remove lifetime requirement on Scan::execute #588

Merged
merged 3 commits into from
Dec 11, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion ffi/src/scan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,7 @@ pub unsafe extern "C" fn get_global_read_schema(
state: Handle<SharedGlobalScanState>,
) -> Handle<SharedSchema> {
let state = unsafe { state.as_ref() };
state.read_schema.clone().into()
state.physical_schema.clone().into()
}

/// Free a global read schema
Expand Down
4 changes: 2 additions & 2 deletions kernel/examples/read-table-multi-threaded/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -256,7 +256,7 @@ fn do_work(
) {
// get the type for the function calls
let engine: &dyn Engine = engine.as_ref();
let read_schema = scan_state.read_schema.clone();
let physical_schema = scan_state.physical_schema.clone();
// in a loop, try and get a ScanFile. Note that `recv` will return an `Err` when the other side
// hangs up, which indicates there's no more data to process.
while let Ok(scan_file) = scan_file_rx.recv() {
Expand Down Expand Up @@ -287,7 +287,7 @@ fn do_work(
// vector
let read_results = engine
.get_parquet_handler()
.read_parquet_files(&[meta], read_schema.clone(), None)
.read_parquet_files(&[meta], physical_schema.clone(), None)
.unwrap();

for read_result in read_results {
Expand Down
30 changes: 18 additions & 12 deletions kernel/src/scan/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -113,7 +113,7 @@ impl ScanBuilder {
logical_schema,
physical_schema: Arc::new(StructType::new(state_info.read_fields)),
physical_predicate,
all_fields: state_info.all_fields,
all_fields: Arc::new(state_info.all_fields),
have_partition_cols: state_info.have_partition_cols,
})
}
Expand Down Expand Up @@ -329,7 +329,7 @@ pub struct Scan {
logical_schema: SchemaRef,
physical_schema: SchemaRef,
physical_predicate: PhysicalPredicate,
all_fields: Vec<ColumnType>,
all_fields: Arc<Vec<ColumnType>>,
have_partition_cols: bool,
}

Expand Down Expand Up @@ -411,7 +411,7 @@ impl Scan {
table_root: self.snapshot.table_root.to_string(),
partition_columns: self.snapshot.metadata().partition_columns.clone(),
logical_schema: self.logical_schema.clone(),
read_schema: self.physical_schema.clone(),
physical_schema: self.physical_schema.clone(),
column_mapping_mode: self.snapshot.column_mapping_mode,
}
}
Expand All @@ -427,7 +427,7 @@ impl Scan {
pub fn execute(
&self,
engine: Arc<dyn Engine>,
) -> DeltaResult<impl Iterator<Item = DeltaResult<ScanResult>> + '_> {
) -> DeltaResult<impl Iterator<Item = DeltaResult<ScanResult>>> {
struct ScanFile {
path: String,
size: i64,
Expand Down Expand Up @@ -456,6 +456,11 @@ impl Scan {
);

let global_state = Arc::new(self.global_scan_state());
let table_root = self.snapshot.table_root.clone();
let physical_predicate = self.physical_predicate();
let all_fields = self.all_fields.clone();
let have_partition_cols = self.have_partition_cols;

let scan_data = self.scan_data(engine.as_ref())?;
let scan_files_iter = scan_data
.map(|res| {
Expand All @@ -469,10 +474,10 @@ impl Scan {
let result = scan_files_iter
.map(move |scan_file| -> DeltaResult<_> {
let scan_file = scan_file?;
let file_path = self.snapshot.table_root.join(&scan_file.path)?;
let file_path = table_root.join(&scan_file.path)?;
let mut selection_vector = scan_file
.dv_info
.get_selection_vector(engine.as_ref(), &self.snapshot.table_root)?;
.get_selection_vector(engine.as_ref(), &table_root)?;
let meta = FileMeta {
last_modified: 0,
size: scan_file.size as usize,
Expand All @@ -485,13 +490,14 @@ impl Scan {
// https://github.com/delta-io/delta-kernel-rs/issues/434 for more details.
let read_result_iter = engine.get_parquet_handler().read_parquet_files(
&[meta],
global_state.read_schema.clone(),
self.physical_predicate(),
global_state.physical_schema.clone(),
physical_predicate.clone(),
)?;

// Arc clones
let engine = engine.clone();
let global_state = global_state.clone();
let all_fields = all_fields.clone();
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We clone `all_fields` both here and above: can we get by with just one clone?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think so 😔 Both closures are `move` closures, so each one needs ownership of its own `all_fields`. We'll eventually be able to avoid the clone by moving the construction of the physical-to-logical (P2L) expression evaluator out of the inner loop.

Ok(read_result_iter.map(move |read_result| -> DeltaResult<_> {
let read_result = read_result?;
// to transform the physical data into the correct logical form
Expand All @@ -500,8 +506,8 @@ impl Scan {
read_result,
&global_state,
&scan_file.partition_values,
&self.all_fields,
self.have_partition_cols,
&all_fields,
have_partition_cols,
);
let len = logical.as_ref().map_or(0, |res| res.len());
// need to split the dv_mask. what's left in dv_mask covers this result, and rest
Expand Down Expand Up @@ -651,7 +657,7 @@ fn transform_to_logical_internal(
all_fields: &[ColumnType],
have_partition_cols: bool,
) -> DeltaResult<Box<dyn EngineData>> {
let read_schema = global_state.read_schema.clone();
let physical_schema = global_state.physical_schema.clone();
if !have_partition_cols && global_state.column_mapping_mode == ColumnMappingMode::None {
return Ok(data);
}
Expand All @@ -678,7 +684,7 @@ fn transform_to_logical_internal(
let result = engine
.get_expression_handler()
.get_evaluator(
read_schema,
physical_schema,
read_expression,
global_state.logical_schema.clone().into(),
)
Expand Down
2 changes: 1 addition & 1 deletion kernel/src/scan/state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ pub struct GlobalScanState {
pub table_root: String,
pub partition_columns: Vec<String>,
pub logical_schema: SchemaRef,
pub read_schema: SchemaRef,
pub physical_schema: SchemaRef,
pub column_mapping_mode: ColumnMappingMode,
}

Expand Down
5 changes: 3 additions & 2 deletions kernel/src/table_changes/scan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -208,7 +208,7 @@ impl TableChangesScan {
table_root: self.table_changes.table_root.to_string(),
partition_columns: end_snapshot.metadata().partition_columns.clone(),
logical_schema: self.logical_schema.clone(),
read_schema: self.physical_schema.clone(),
physical_schema: self.physical_schema.clone(),
column_mapping_mode: end_snapshot.column_mapping_mode,
}
}
Expand Down Expand Up @@ -276,7 +276,8 @@ fn read_scan_file(

let physical_to_logical_expr =
physical_to_logical_expr(&scan_file, global_state.logical_schema.as_ref(), all_fields)?;
let physical_schema = scan_file_physical_schema(&scan_file, global_state.read_schema.as_ref());
let physical_schema =
scan_file_physical_schema(&scan_file, global_state.physical_schema.as_ref());
let phys_to_logical_eval = engine.get_expression_handler().get_evaluator(
physical_schema.clone(),
physical_to_logical_expr,
Expand Down
2 changes: 1 addition & 1 deletion kernel/tests/read.rs
Original file line number Diff line number Diff line change
Expand Up @@ -389,7 +389,7 @@ fn read_with_scan_data(
.get_parquet_handler()
.read_parquet_files(
&[meta],
global_state.read_schema.clone(),
global_state.physical_schema.clone(),
scan.physical_predicate().clone(),
)
.unwrap();
Expand Down
Loading