Set projection before configuring the source #14685

Merged 16 commits on Feb 28, 2025
Changes from 12 commits

2 changes: 1 addition & 1 deletion datafusion-testing
6 changes: 4 additions & 2 deletions datafusion/core/src/datasource/file_format/parquet.rs
@@ -1934,7 +1934,8 @@ mod tests {

// test metadata
assert_eq!(exec.statistics()?.num_rows, Precision::Exact(8));
assert_eq!(exec.statistics()?.total_byte_size, Precision::Exact(671));
// assert_eq!(exec.statistics()?.total_byte_size, Precision::Exact(671));
// todo: uncomment when FileScanConfig::projected_stats populates total_byte_size
Contributor Author commented:

I don't think those lines were correct, because the tests had the same bug: the projection was set after the source, so the source statistics (which were used here) were never properly updated.

The proper way to fix this is to set the byte size in projected_stats.

Since that is a separate, pre-existing todo, I think it's better to address it in a follow-up PR on top of this one. A sketch of the ordering issue follows.
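
A minimal sketch of the ordering issue described above (illustrative only, not the PR's test code; assumes the relevant imports plus `schema: SchemaRef`, `statistics: Statistics`, and `source: Arc<dyn FileSource>` are in scope):

```rust
// Pre-PR behavior: with_source() projected the statistics and handed them
// to the source at construction time, so a projection applied afterwards
// was never reflected in the source's statistics.
let config = FileScanConfig::new(ObjectStoreUrl::local_filesystem(), schema, source)
    .with_statistics(statistics)
    .with_projection(Some(vec![0, 1])); // too late for the already-captured stats

// After this PR, FileScanConfig::statistics() computes projected_stats()
// on demand, so the reported statistics respect the projection regardless
// of builder-call order.
let stats = config.statistics()?;
assert_eq!(stats.column_statistics.len(), 2);
```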

Ok(())
}
@@ -1976,7 +1977,8 @@ mod tests {

// note: even if the limit is set, the executor rounds up to the batch size
assert_eq!(exec.statistics()?.num_rows, Precision::Exact(8));
assert_eq!(exec.statistics()?.total_byte_size, Precision::Exact(671));
// assert_eq!(exec.statistics()?.total_byte_size, Precision::Exact(671));
// todo: uncomment when FileScanConfig::projected_stats populates total_byte_size
let batches = collect(exec, task_ctx).await?;
assert_eq!(1, batches.len());
assert_eq!(11, batches[0].num_columns());
7 changes: 5 additions & 2 deletions datafusion/core/src/datasource/listing/table.rs
@@ -1213,7 +1213,8 @@ mod tests {

// test metadata
assert_eq!(exec.statistics()?.num_rows, Precision::Exact(8));
assert_eq!(exec.statistics()?.total_byte_size, Precision::Exact(671));
// assert_eq!(exec.statistics()?.total_byte_size, Precision::Exact(671));
// todo: uncomment when FileScanConfig::projected_stats populates total_byte_size

Ok(())
}
@@ -1239,7 +1240,9 @@

let exec = table.scan(&state, None, &[], None).await?;
assert_eq!(exec.statistics()?.num_rows, Precision::Exact(8));
assert_eq!(exec.statistics()?.total_byte_size, Precision::Exact(671));

// assert_eq!(exec.statistics()?.total_byte_size, Precision::Exact(671));
// todo: uncomment when FileScanConfig::projected_stats populates total_byte_size

Ok(())
}
6 changes: 3 additions & 3 deletions datafusion/core/tests/parquet/file_statistics.rs
@@ -82,7 +82,7 @@ async fn load_table_stats_with_session_level_cache() {
assert_eq!(exec1.statistics().unwrap().num_rows, Precision::Exact(8));
assert_eq!(
exec1.statistics().unwrap().total_byte_size,
Precision::Exact(671)
Precision::Absent // todo: restore Precision::Exact(671) when projected_stats supports total_byte_size calculation
);
assert_eq!(get_static_cache_size(&state1), 1);

@@ -93,7 +93,7 @@ async fn load_table_stats_with_session_level_cache() {
assert_eq!(exec2.statistics().unwrap().num_rows, Precision::Exact(8));
assert_eq!(
exec2.statistics().unwrap().total_byte_size,
Precision::Exact(671)
Precision::Absent // todo: restore Precision::Exact(671) when projected_stats supports total_byte_size calculation
);
assert_eq!(get_static_cache_size(&state2), 1);

@@ -104,7 +104,7 @@ async fn load_table_stats_with_session_level_cache() {
assert_eq!(exec3.statistics().unwrap().num_rows, Precision::Exact(8));
assert_eq!(
exec3.statistics().unwrap().total_byte_size,
Precision::Exact(671)
Precision::Absent // todo: restore Precision::Exact(671) when projected_stats supports total_byte_size calculation
);
// List same file no increase
assert_eq!(get_static_cache_size(&state1), 1);
160 changes: 100 additions & 60 deletions datafusion/datasource/src/file_scan_config.rs
@@ -86,20 +86,22 @@ use crate::{
/// # Field::new("c4", DataType::Int32, false),
/// # ]));
/// # // Note: crate mock ParquetSource, as ParquetSource is not in the datasource crate
/// # struct ParquetSource {};
/// # struct ParquetSource {
/// # projected_statistics: Option<Statistics>
/// # };
/// # impl FileSource for ParquetSource {
/// # fn create_file_opener(&self, _: Arc<dyn ObjectStore>, _: &FileScanConfig, _: usize) -> Arc<dyn FileOpener> { unimplemented!() }
/// # fn as_any(&self) -> &dyn Any { self }
/// # fn with_batch_size(&self, _: usize) -> Arc<dyn FileSource> { unimplemented!() }
/// # fn with_schema(&self, _: SchemaRef) -> Arc<dyn FileSource> { unimplemented!() }
/// # fn with_projection(&self, _: &FileScanConfig) -> Arc<dyn FileSource> { unimplemented!() }
/// # fn with_statistics(&self, _: Statistics) -> Arc<dyn FileSource> { Arc::new(Self::new()) }
/// # fn with_statistics(&self, statistics: Statistics) -> Arc<dyn FileSource> { Arc::new(Self {projected_statistics: Some(statistics)} ) }
/// # fn metrics(&self) -> &ExecutionPlanMetricsSet { unimplemented!() }
/// # fn statistics(&self) -> datafusion_common::Result<Statistics> { unimplemented!() }
/// # fn statistics(&self) -> datafusion_common::Result<Statistics> { Ok(self.projected_statistics.clone().expect("projected_statistics should be set")) }
/// # fn file_type(&self) -> &str { "parquet" }
/// # }
/// # impl ParquetSource {
/// # fn new() -> Self { Self{} }
/// # fn new() -> Self { Self {projected_statistics: None} }
/// # }
/// // create FileScan config for reading parquet files from file://
/// let object_store_url = ObjectStoreUrl::local_filesystem();
@@ -244,7 +246,7 @@ impl DataSource for FileScanConfig {
}

fn statistics(&self) -> Result<Statistics> {
self.file_source.statistics()
Ok(self.projected_stats())
}

fn with_fetch(&self, limit: Option<usize>) -> Option<Arc<dyn DataSource>> {
@@ -324,13 +326,7 @@ impl FileScanConfig {

/// Set the file source
pub fn with_source(mut self, file_source: Arc<dyn FileSource>) -> Self {
let (
_projected_schema,
_constraints,
projected_statistics,
_projected_output_ordering,
) = self.project();
self.file_source = file_source.with_statistics(projected_statistics);
self.file_source = file_source.with_statistics(self.statistics.clone());
self
}

@@ -342,10 +338,75 @@

/// Set the statistics of the files
pub fn with_statistics(mut self, statistics: Statistics) -> Self {
self.statistics = statistics;
self.statistics = statistics.clone();
self.file_source = self.file_source.with_statistics(statistics);
self
}

fn projection_indices(&self) -> Vec<usize> {
match &self.projection {
Some(proj) => proj.clone(),
None => (0..self.file_schema.fields().len()
+ self.table_partition_cols.len())
.collect(),
}
}

fn projected_stats(&self) -> Statistics {
let statistics = self
.file_source
.statistics()
.unwrap_or(self.statistics.clone());

let table_cols_stats = self
.projection_indices()
.into_iter()
.map(|idx| {
if idx < self.file_schema.fields().len() {
statistics.column_statistics[idx].clone()
} else {
// TODO provide accurate stat for partition column (#1186)
ColumnStatistics::new_unknown()
}
})
.collect();

Statistics {
num_rows: statistics.num_rows,
// TODO correct byte size?
total_byte_size: Precision::Absent,
column_statistics: table_cols_stats,
}
}

fn projected_schema(&self) -> Arc<Schema> {
let table_fields: Vec<_> = self
.projection_indices()
.into_iter()
.map(|idx| {
if idx < self.file_schema.fields().len() {
self.file_schema.field(idx).clone()
} else {
let partition_idx = idx - self.file_schema.fields().len();
self.table_partition_cols[partition_idx].clone()
}
})
.collect();

Arc::new(Schema::new_with_metadata(
table_fields,
self.file_schema.metadata().clone(),
))
}

fn projected_constraints(&self) -> Constraints {
let indexes = self.projection_indices();

self.constraints
.project(&indexes)
.unwrap_or_else(Constraints::empty)
}

/// Set the projection of the files
pub fn with_projection(mut self, projection: Option<Vec<usize>>) -> Self {
self.projection = projection;
@@ -433,54 +494,13 @@ impl FileScanConfig {
);
}

let proj_indices = if let Some(proj) = &self.projection {
proj
} else {
let len = self.file_schema.fields().len() + self.table_partition_cols.len();
&(0..len).collect::<Vec<_>>()
};

let mut table_fields = vec![];
let mut table_cols_stats = vec![];
for idx in proj_indices {
if *idx < self.file_schema.fields().len() {
let field = self.file_schema.field(*idx);
table_fields.push(field.clone());
table_cols_stats.push(self.statistics.column_statistics[*idx].clone())
} else {
let partition_idx = idx - self.file_schema.fields().len();
table_fields.push(self.table_partition_cols[partition_idx].to_owned());
// TODO provide accurate stat for partition column (#1186)
table_cols_stats.push(ColumnStatistics::new_unknown())
}
}

let table_stats = Statistics {
num_rows: self.statistics.num_rows,
// TODO correct byte size?
total_byte_size: Precision::Absent,
column_statistics: table_cols_stats,
};

let projected_schema = Arc::new(Schema::new_with_metadata(
table_fields,
self.file_schema.metadata().clone(),
));

let projected_constraints = self
.constraints
.project(proj_indices)
.unwrap_or_else(Constraints::empty);
let schema = self.projected_schema();
Contributor commented: this looks really nice now

let constraints = self.projected_constraints();
let stats = self.projected_stats();

let projected_output_ordering =
get_projected_output_ordering(self, &projected_schema);
let output_ordering = get_projected_output_ordering(self, &schema);

(
projected_schema,
projected_constraints,
table_stats,
projected_output_ordering,
)
(schema, constraints, stats, output_ordering)
}

#[cfg_attr(not(feature = "avro"), allow(unused))] // Only used by avro
@@ -1203,6 +1223,12 @@ mod tests {
),
];
// create a projected schema
let statistics = Statistics {
num_rows: Precision::Inexact(3),
total_byte_size: Precision::Absent,
column_statistics: Statistics::unknown_column(&file_batch.schema()),
};

let conf = config_for_projection(
file_batch.schema(),
// keep all cols from file and 2 from partitioning
@@ -1213,9 +1239,23 @@
file_batch.schema().fields().len(),
file_batch.schema().fields().len() + 2,
]),
Statistics::new_unknown(&file_batch.schema()),
statistics.clone(),
to_partition_cols(partition_cols.clone()),
);

let source_statistics = conf.file_source.statistics().unwrap();
let conf_stats = conf.statistics().unwrap();

// the projection should be reflected in the config's statistics
assert_eq!(conf_stats.num_rows, Precision::Inexact(3));

// 3 original statistics + 2 partition statistics
assert_eq!(conf_stats.column_statistics.len(), 5);

// file source statistics should not be modified
assert_eq!(source_statistics, statistics);
assert_eq!(source_statistics.column_statistics.len(), 3);

let (proj_schema, ..) = conf.project();
// created a projector for that projected schema
let mut proj = PartitionColumnProjector::new(
72 changes: 52 additions & 20 deletions datafusion/proto/tests/cases/roundtrip_physical_plan.rs
@@ -48,7 +48,7 @@ use datafusion::datasource::listing::{ListingTableUrl, PartitionedFile};
use datafusion::datasource::object_store::ObjectStoreUrl;
use datafusion::datasource::physical_plan::{
wrap_partition_type_in_dict, wrap_partition_value_in_dict, FileScanConfig,
FileSinkConfig, ParquetSource,
FileSinkConfig, FileSource, ParquetSource,
};
use datafusion::execution::FunctionRegistry;
use datafusion::functions_aggregate::sum::sum_udaf;
@@ -778,24 +778,16 @@ async fn roundtrip_parquet_exec_with_table_partition_cols() -> Result<()> {
let schema = Arc::new(Schema::new(vec![Field::new("col", DataType::Utf8, false)]));

let source = Arc::new(ParquetSource::default());
let scan_config = FileScanConfig {
object_store_url: ObjectStoreUrl::local_filesystem(),
file_groups: vec![vec![file_group]],
constraints: Constraints::empty(),
statistics: Statistics::new_unknown(&schema),
file_schema: schema,
projection: Some(vec![0, 1]),
limit: None,
table_partition_cols: vec![Field::new(
"part".to_string(),
wrap_partition_type_in_dict(DataType::Int16),
false,
)],
output_ordering: vec![],
file_compression_type: FileCompressionType::UNCOMPRESSED,
new_lines_in_values: false,
file_source: source,
};
let scan_config =
FileScanConfig::new(ObjectStoreUrl::local_filesystem(), schema, source)
.with_projection(Some(vec![0, 1]))
.with_file_group(vec![file_group])
.with_table_partition_cols(vec![Field::new(
"part".to_string(),
wrap_partition_type_in_dict(DataType::Int16),
false,
)])
.with_newlines_in_values(false);

roundtrip_test(scan_config.build())
}
@@ -1605,6 +1597,47 @@ async fn roundtrip_coalesce() -> Result<()> {
Ok(())
}

#[tokio::test]
async fn roundtrip_projection_source() -> Result<()> {
let schema = Arc::new(Schema::new(Fields::from([
Arc::new(Field::new("a", DataType::Utf8, false)),
Arc::new(Field::new("b", DataType::Utf8, false)),
Arc::new(Field::new("c", DataType::Int32, false)),
Arc::new(Field::new("d", DataType::Int32, false)),
])));

let statistics = Statistics::new_unknown(&schema);

let source = ParquetSource::default().with_statistics(statistics.clone());
let scan_config = FileScanConfig {
object_store_url: ObjectStoreUrl::local_filesystem(),
file_groups: vec![vec![PartitionedFile::new(
"/path/to/file.parquet".to_string(),
1024,
)]],
constraints: Constraints::empty(),
statistics,
file_schema: schema.clone(),
projection: Some(vec![0, 1, 2]),
limit: None,
table_partition_cols: vec![],
output_ordering: vec![],
file_compression_type: FileCompressionType::UNCOMPRESSED,
new_lines_in_values: false,
file_source: source,
};

let filter = Arc::new(
FilterExec::try_new(
Arc::new(BinaryExpr::new(col("c", &schema)?, Operator::Eq, lit(1))),
scan_config.build(),
)?
.with_projection(Some(vec![0, 1]))?,
);

roundtrip_test(filter)
}

#[tokio::test]
async fn roundtrip_parquet_select_star() -> Result<()> {
let ctx = all_types_context().await?;
@@ -1626,7 +1659,6 @@ async fn roundtrip_parquet_select_star_predicate() -> Result<()> {
roundtrip_test_sql_with_context(sql, &ctx).await
}

#[ignore = "Test failing due to https://github.com/apache/datafusion/issues/14679"]
Contributor commented: 🎉

#[tokio::test]
async fn roundtrip_parquet_select_projection_predicate() -> Result<()> {
let ctx = all_types_context().await?;