Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Set projection before configuring the source #14685

Merged
merged 16 commits into from
Feb 28, 2025
Merged
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
47 changes: 45 additions & 2 deletions datafusion/core/src/datasource/physical_plan/file_scan_config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -345,6 +345,32 @@ impl FileScanConfig {
/// Set the projection of the files
pub fn with_projection(mut self, projection: Option<Vec<usize>>) -> Self {
self.projection = projection;
self.with_updated_statistics()
}

// Update source statistics with the current projection data
fn with_updated_statistics(mut self) -> Self {
let max_projection_column = *self
.projection
.as_ref()
.and_then(|proj| proj.iter().max())
.unwrap_or(&0);

if max_projection_column
>= self.file_schema.fields().len() + self.table_partition_cols.len()
{
Copy link
Contributor

@berkaysynnada berkaysynnada Feb 26, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

How is this case possible? it seems not obvious to me

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

removed this now. but answering to the question - this could happen if projection set but table_partition_cols isn't yet set (or vice versa).

This whole logic should be much cleaner when we switch to the builder approach (I want to do a PR on top after this one is merged)

// we don't yet have enough information (file schema info or partition column info) to perform projection
return self;
}

let (
_projected_schema,
_constraints,
projected_statistics,
_projected_output_ordering,
) = self.project();

self.source = self.source.with_statistics(projected_statistics);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't fully understand why the source would need projected statistics

I am testing out if the issue is that the FileScanConfig is providing the wrong statistics (like maybe this line should be self.statistics rather than self.source.statistics

https://github.com/apache/datafusion/blob/1c54b38e4a4012fd8d1b4f48e2c3d6d35016bad0/datafusion/core/src/datasource/physical_plan/file_scan_config.rs#L233-L232

Copy link
Contributor Author

@blaginin blaginin Feb 28, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That's a great idea! We can't use self.source.statistics directly, because statistics match projection we're applying - so I had to apply projection before (89ed225).

This made the PR a bit messier, and I had to comment several test lines - LMK if you prefer the old version

self
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

How about

let source = self.source.clone();
self.with_source(source)

}

Expand Down Expand Up @@ -383,7 +409,7 @@ impl FileScanConfig {
/// Set the partitioning columns of the files
pub fn with_table_partition_cols(mut self, table_partition_cols: Vec<Field>) -> Self {
self.table_partition_cols = table_partition_cols;
self
self.with_updated_statistics()
}

/// Set the output ordering of the files
Expand Down Expand Up @@ -737,6 +763,13 @@ mod tests {
),
];
// create a projected schema

let statistics = Statistics {
num_rows: Precision::Inexact(3),
total_byte_size: Precision::Absent,
column_statistics: Statistics::unknown_column(&file_batch.schema()),
};

let conf = config_for_projection(
file_batch.schema(),
// keep all cols from file and 2 from partitioning
Expand All @@ -747,10 +780,20 @@ mod tests {
file_batch.schema().fields().len(),
file_batch.schema().fields().len() + 2,
]),
Statistics::new_unknown(&file_batch.schema()),
statistics.clone(),
to_partition_cols(partition_cols.clone()),
);

let source_statistics = conf.source.statistics().unwrap();

// statistics should be preserved and passed into the source
assert_eq!(source_statistics.num_rows, Precision::Inexact(3));

// 3 original statistics + 2 partition statistics
assert_eq!(source_statistics.column_statistics.len(), 5);

let (proj_schema, ..) = conf.project();

// created a projector for that projected schema
let mut proj = PartitionColumnProjector::new(
proj_schema,
Expand Down
Loading