Set projection before configuring the source (#14685)
* Set projection before configuring the source

* Refresh source manually

* Update statistics on proj / partition columns update

* Use `FileScanConfig` own `source`

* Extend test to ensure stats are different

* Unify names

* Comment `total_byte_size` in tests

* Use source stats as a base

* Return correct stats in the `ParquetSource` mock

* Revert test changes, add follow on ticket

* Revert statistics total_byte_count change

* Update test

---------

Co-authored-by: Andrew Lamb <[email protected]>
blaginin and alamb authored Feb 28, 2025
1 parent cf2b7e6 commit 2fd558f
Showing 7 changed files with 177 additions and 100 deletions.
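In short, the bug: `with_source` used to snapshot *projected* statistics into the source at attach time, so a projection applied after the source was configured never reached the statistics. The fix stores the full file-level statistics on the source and projects them on demand in `FileScanConfig::statistics()`. A minimal sketch of the difference, using toy types rather than the real DataFusion builder API:

// Eager vs. lazy projection of statistics -- a toy model of the ordering
// bug this commit fixes. Types are hypothetical, not the real DataFusion API.
#[derive(Clone, Debug, PartialEq)]
struct Stats(Vec<u64>); // one value per column

fn project(stats: &Stats, projection: &[usize]) -> Stats {
    Stats(projection.iter().map(|&i| stats.0[i]).collect())
}

// Old behavior: the source snapshots *projected* stats when it is attached,
// so a projection set afterwards is never reflected.
struct EagerSource {
    snapshot: Stats,
}

// New behavior: the source keeps the full file-level stats and the config
// projects them on demand in `statistics()`.
struct LazyConfig {
    full: Stats,
    projection: Vec<usize>,
}

impl LazyConfig {
    fn statistics(&self) -> Stats {
        project(&self.full, &self.projection)
    }
}

fn main() {
    let full = Stats(vec![10, 20, 30]);

    // Eager: snapshot taken while the projection was still [0, 1, 2].
    let eager = EagerSource { snapshot: project(&full, &[0, 1, 2]) };
    // Narrowing the projection to [2, 0] afterwards changes nothing: stale.
    assert_eq!(eager.snapshot, Stats(vec![10, 20, 30]));

    // Lazy: the projection set afterwards is honored at read time.
    let lazy = LazyConfig { full, projection: vec![2, 0] };
    assert_eq!(lazy.statistics(), Stats(vec![30, 10]));
}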
2 changes: 1 addition & 1 deletion datafusion-testing
2 changes: 2 additions & 0 deletions datafusion/core/src/datasource/file_format/parquet.rs
@@ -1934,6 +1934,7 @@ mod tests {

// test metadata
assert_eq!(exec.statistics()?.num_rows, Precision::Exact(8));
// TODO correct byte size: https://github.com/apache/datafusion/issues/14936
assert_eq!(exec.statistics()?.total_byte_size, Precision::Exact(671));

Ok(())
@@ -1976,6 +1977,7 @@

// note: even if the limit is set, the executor rounds up to the batch size
assert_eq!(exec.statistics()?.num_rows, Precision::Exact(8));
// TODO correct byte size: https://github.com/apache/datafusion/issues/14936
assert_eq!(exec.statistics()?.total_byte_size, Precision::Exact(671));
let batches = collect(exec, task_ctx).await?;
assert_eq!(1, batches.len());
1 change: 1 addition & 0 deletions datafusion/core/src/datasource/listing/table.rs
@@ -1239,6 +1239,7 @@ mod tests {

let exec = table.scan(&state, None, &[], None).await?;
assert_eq!(exec.statistics()?.num_rows, Precision::Exact(8));
// TODO correct byte size: https://github.com/apache/datafusion/issues/14936
assert_eq!(exec.statistics()?.total_byte_size, Precision::Exact(671));

Ok(())
9 changes: 6 additions & 3 deletions datafusion/core/tests/parquet/file_statistics.rs
@@ -82,7 +82,8 @@ async fn load_table_stats_with_session_level_cache() {
assert_eq!(exec1.statistics().unwrap().num_rows, Precision::Exact(8));
assert_eq!(
exec1.statistics().unwrap().total_byte_size,
Precision::Exact(671)
// TODO correct byte size: https://github.com/apache/datafusion/issues/14936
Precision::Exact(671),
);
assert_eq!(get_static_cache_size(&state1), 1);

@@ -93,7 +94,8 @@
assert_eq!(exec2.statistics().unwrap().num_rows, Precision::Exact(8));
assert_eq!(
exec2.statistics().unwrap().total_byte_size,
Precision::Exact(671)
// TODO correct byte size: https://github.com/apache/datafusion/issues/14936
Precision::Exact(671),
);
assert_eq!(get_static_cache_size(&state2), 1);

@@ -104,7 +106,8 @@
assert_eq!(exec3.statistics().unwrap().num_rows, Precision::Exact(8));
assert_eq!(
exec3.statistics().unwrap().total_byte_size,
Precision::Exact(671)
// TODO correct byte size: https://github.com/apache/datafusion/issues/14936
Precision::Exact(671),
);
// List same file no increase
assert_eq!(get_static_cache_size(&state1), 1);
165 changes: 102 additions & 63 deletions datafusion/datasource/src/file_scan_config.rs
@@ -31,9 +31,7 @@ use arrow::{
buffer::Buffer,
datatypes::{ArrowNativeType, DataType, Field, Schema, SchemaRef, UInt16Type},
};
use datafusion_common::{
exec_err, stats::Precision, ColumnStatistics, Constraints, Result, Statistics,
};
use datafusion_common::{exec_err, ColumnStatistics, Constraints, Result, Statistics};
use datafusion_common::{DataFusionError, ScalarValue};
use datafusion_execution::{
object_store::ObjectStoreUrl, SendableRecordBatchStream, TaskContext,
@@ -86,20 +84,22 @@ use crate::{
/// # Field::new("c4", DataType::Int32, false),
/// # ]));
/// # // Note: crate mock ParquetSource, as ParquetSource is not in the datasource crate
/// # struct ParquetSource {};
/// # struct ParquetSource {
/// # projected_statistics: Option<Statistics>
/// # };
/// # impl FileSource for ParquetSource {
/// # fn create_file_opener(&self, _: Arc<dyn ObjectStore>, _: &FileScanConfig, _: usize) -> Arc<dyn FileOpener> { unimplemented!() }
/// # fn as_any(&self) -> &dyn Any { self }
/// # fn with_batch_size(&self, _: usize) -> Arc<dyn FileSource> { unimplemented!() }
/// # fn with_schema(&self, _: SchemaRef) -> Arc<dyn FileSource> { unimplemented!() }
/// # fn with_projection(&self, _: &FileScanConfig) -> Arc<dyn FileSource> { unimplemented!() }
/// # fn with_statistics(&self, _: Statistics) -> Arc<dyn FileSource> { Arc::new(Self::new()) }
/// # fn with_statistics(&self, statistics: Statistics) -> Arc<dyn FileSource> { Arc::new(Self {projected_statistics: Some(statistics)} ) }
/// # fn metrics(&self) -> &ExecutionPlanMetricsSet { unimplemented!() }
/// # fn statistics(&self) -> datafusion_common::Result<Statistics> { unimplemented!() }
/// # fn statistics(&self) -> datafusion_common::Result<Statistics> { Ok(self.projected_statistics.clone().expect("projected_statistics should be set")) }
/// # fn file_type(&self) -> &str { "parquet" }
/// # }
/// # impl ParquetSource {
/// # fn new() -> Self { Self{} }
/// # fn new() -> Self { Self {projected_statistics: None} }
/// # }
/// // create FileScan config for reading parquet files from file://
/// let object_store_url = ObjectStoreUrl::local_filesystem();
@@ -244,7 +244,7 @@ impl DataSource for FileScanConfig {
}

fn statistics(&self) -> Result<Statistics> {
self.file_source.statistics()
Ok(self.projected_stats())
}

fn with_fetch(&self, limit: Option<usize>) -> Option<Arc<dyn DataSource>> {
@@ -324,13 +324,7 @@ impl FileScanConfig {

/// Set the file source
pub fn with_source(mut self, file_source: Arc<dyn FileSource>) -> Self {
let (
_projected_schema,
_constraints,
projected_statistics,
_projected_output_ordering,
) = self.project();
self.file_source = file_source.with_statistics(projected_statistics);
self.file_source = file_source.with_statistics(self.statistics.clone());
self
}

@@ -342,10 +336,75 @@ impl FileScanConfig {

/// Set the statistics of the files
pub fn with_statistics(mut self, statistics: Statistics) -> Self {
self.statistics = statistics;
self.statistics = statistics.clone();
self.file_source = self.file_source.with_statistics(statistics);
self
}

fn projection_indices(&self) -> Vec<usize> {
match &self.projection {
Some(proj) => proj.clone(),
None => (0..self.file_schema.fields().len()
+ self.table_partition_cols.len())
.collect(),
}
}

fn projected_stats(&self) -> Statistics {
let statistics = self
.file_source
.statistics()
.unwrap_or(self.statistics.clone());

let table_cols_stats = self
.projection_indices()
.into_iter()
.map(|idx| {
if idx < self.file_schema.fields().len() {
statistics.column_statistics[idx].clone()
} else {
// TODO provide accurate stat for partition column (#1186)
ColumnStatistics::new_unknown()
}
})
.collect();

Statistics {
num_rows: statistics.num_rows,
// TODO correct byte size: https://github.com/apache/datafusion/issues/14936
total_byte_size: statistics.total_byte_size,
column_statistics: table_cols_stats,
}
}

fn projected_schema(&self) -> Arc<Schema> {
let table_fields: Vec<_> = self
.projection_indices()
.into_iter()
.map(|idx| {
if idx < self.file_schema.fields().len() {
self.file_schema.field(idx).clone()
} else {
let partition_idx = idx - self.file_schema.fields().len();
self.table_partition_cols[partition_idx].clone()
}
})
.collect();

Arc::new(Schema::new_with_metadata(
table_fields,
self.file_schema.metadata().clone(),
))
}

fn projected_constraints(&self) -> Constraints {
let indexes = self.projection_indices();

self.constraints
.project(&indexes)
.unwrap_or_else(Constraints::empty)
}

/// Set the projection of the files
pub fn with_projection(mut self, projection: Option<Vec<usize>>) -> Self {
self.projection = projection;
@@ -433,54 +492,13 @@ impl FileScanConfig {
);
}

let proj_indices = if let Some(proj) = &self.projection {
proj
} else {
let len = self.file_schema.fields().len() + self.table_partition_cols.len();
&(0..len).collect::<Vec<_>>()
};

let mut table_fields = vec![];
let mut table_cols_stats = vec![];
for idx in proj_indices {
if *idx < self.file_schema.fields().len() {
let field = self.file_schema.field(*idx);
table_fields.push(field.clone());
table_cols_stats.push(self.statistics.column_statistics[*idx].clone())
} else {
let partition_idx = idx - self.file_schema.fields().len();
table_fields.push(self.table_partition_cols[partition_idx].to_owned());
// TODO provide accurate stat for partition column (#1186)
table_cols_stats.push(ColumnStatistics::new_unknown())
}
}

let table_stats = Statistics {
num_rows: self.statistics.num_rows,
// TODO correct byte size?
total_byte_size: Precision::Absent,
column_statistics: table_cols_stats,
};

let projected_schema = Arc::new(Schema::new_with_metadata(
table_fields,
self.file_schema.metadata().clone(),
));
let schema = self.projected_schema();
let constraints = self.projected_constraints();
let stats = self.projected_stats();

let projected_constraints = self
.constraints
.project(proj_indices)
.unwrap_or_else(Constraints::empty);
let output_ordering = get_projected_output_ordering(self, &schema);

let projected_output_ordering =
get_projected_output_ordering(self, &projected_schema);

(
projected_schema,
projected_constraints,
table_stats,
projected_output_ordering,
)
(schema, constraints, stats, output_ordering)
}

#[cfg_attr(not(feature = "avro"), allow(unused))] // Only used by avro
@@ -1048,6 +1066,7 @@ mod tests {
compute::SortOptions,
};

use datafusion_common::stats::Precision;
use datafusion_common::{assert_batches_eq, DFSchema};
use datafusion_expr::{execution_props::ExecutionProps, SortExpr};
use datafusion_physical_expr::create_physical_expr;
@@ -1203,6 +1222,12 @@ mod tests {
),
];
// create a projected schema
let statistics = Statistics {
num_rows: Precision::Inexact(3),
total_byte_size: Precision::Absent,
column_statistics: Statistics::unknown_column(&file_batch.schema()),
};

let conf = config_for_projection(
file_batch.schema(),
// keep all cols from file and 2 from partitioning
@@ -1213,9 +1238,23 @@
file_batch.schema().fields().len(),
file_batch.schema().fields().len() + 2,
]),
Statistics::new_unknown(&file_batch.schema()),
statistics.clone(),
to_partition_cols(partition_cols.clone()),
);

let source_statistics = conf.file_source.statistics().unwrap();
let conf_stats = conf.statistics().unwrap();

// projection should be reflected in the config's projected statistics
assert_eq!(conf_stats.num_rows, Precision::Inexact(3));

// 3 original statistics + 2 partition statistics
assert_eq!(conf_stats.column_statistics.len(), 5);

// file source statistics should not be modified
assert_eq!(source_statistics, statistics);
assert_eq!(source_statistics.column_statistics.len(), 3);

let (proj_schema, ..) = conf.project();
// created a projector for that projected schema
let mut proj = PartitionColumnProjector::new(
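For reference, the index arithmetic behind the new `projection_indices` / `projected_stats` helpers above, restated as a self-contained toy (hypothetical types; partition-column statistics remain unknown until #1186 is addressed):

// Toy restatement of `projection_indices` + `projected_stats`: indices below
// the file-schema width address file columns and keep their real statistics;
// higher indices address table partition columns, whose statistics stay
// unknown for now (see #1186). Types are hypothetical.
#[derive(Clone, Debug, PartialEq)]
struct ColumnStats {
    null_count: Option<usize>, // None models "unknown"
}

struct MockConfig {
    file_column_stats: Vec<ColumnStats>,
    num_partition_cols: usize,
    projection: Option<Vec<usize>>,
}

impl MockConfig {
    // No explicit projection means "all file columns, then all partition cols".
    fn projection_indices(&self) -> Vec<usize> {
        match &self.projection {
            Some(proj) => proj.clone(),
            None => (0..self.file_column_stats.len() + self.num_partition_cols)
                .collect(),
        }
    }

    fn projected_stats(&self) -> Vec<ColumnStats> {
        self.projection_indices()
            .into_iter()
            .map(|idx| {
                if idx < self.file_column_stats.len() {
                    self.file_column_stats[idx].clone()
                } else {
                    ColumnStats { null_count: None } // partition col: unknown
                }
            })
            .collect()
    }
}

fn main() {
    let config = MockConfig {
        file_column_stats: vec![
            ColumnStats { null_count: Some(0) },
            ColumnStats { null_count: Some(2) },
            ColumnStats { null_count: Some(5) },
        ],
        num_partition_cols: 2,
        projection: Some(vec![2, 0, 3]), // two file columns + one partition col
    };
    let projected = config.projected_stats();
    assert_eq!(projected.len(), 3);
    assert_eq!(projected[0].null_count, Some(5)); // file column at index 2
    assert_eq!(projected[2].null_count, None);    // partition column
    // The file-level statistics themselves are untouched.
    assert_eq!(config.file_column_stats.len(), 3);
}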