Set projection before configuring the source #14685

Merged (16 commits), Feb 28, 2025

Changes from 9 commits
2 changes: 1 addition & 1 deletion datafusion-testing
6 changes: 4 additions & 2 deletions datafusion/core/src/datasource/file_format/parquet.rs
@@ -1934,7 +1934,8 @@ mod tests {

// test metadata
assert_eq!(exec.statistics()?.num_rows, Precision::Exact(8));
assert_eq!(exec.statistics()?.total_byte_size, Precision::Exact(671));
// assert_eq!(exec.statistics()?.total_byte_size, Precision::Exact(671));
// todo: uncomment when FileScanConfig::projection_stats sets the byte size
Contributor Author:

I don't think those lines were correct, because the tests had the same bug: the projection was set after the source, so the source statistics (which were used here) were never actually updated.

A proper way to fix this is to set the byte size in projection_stats.

Since that's a separate, pre-existing todo, I think it's better to address it in a follow-up PR on top of this one.
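
For reference, a minimal sketch of what that follow-up could look like. This is only an assumption about the eventual fix (not part of this PR), and `projected_byte_size` is a hypothetical helper: since per-column byte sizes aren't tracked in the column statistics, it scales the source's `total_byte_size` by the fraction of columns kept, which can only ever be an estimate because columns have different widths.

```rust
use datafusion_common::stats::Precision;
use datafusion_common::Statistics;

/// Hypothetical helper (follow-up sketch): approximate the byte size of a
/// projected scan by scaling the source's `total_byte_size` by the fraction
/// of columns kept. Columns differ in width, so the result is always
/// reported as `Precision::Inexact`, never `Precision::Exact`.
fn projected_byte_size(
    source: &Statistics,
    projected_cols: usize,
    total_cols: usize,
) -> Precision<usize> {
    match source.total_byte_size {
        Precision::Exact(n) | Precision::Inexact(n) if total_cols > 0 => {
            Precision::Inexact(n * projected_cols / total_cols)
        }
        // No byte size on the source (or no columns): nothing to estimate.
        _ => Precision::Absent,
    }
}
```

`projection_stats` could then populate `total_byte_size` with such an estimate instead of `Precision::Absent`, and the assertions commented out above could be restored with `Precision::Inexact` expectations.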


Ok(())
}
@@ -1976,7 +1977,8 @@ mod tests {

// note: even if the limit is set, the executor rounds up to the batch size
assert_eq!(exec.statistics()?.num_rows, Precision::Exact(8));
assert_eq!(exec.statistics()?.total_byte_size, Precision::Exact(671));
// assert_eq!(exec.statistics()?.total_byte_size, Precision::Exact(671));
// todo: uncomment when FileScanConfig::projection_stats sets the byte size
let batches = collect(exec, task_ctx).await?;
assert_eq!(1, batches.len());
assert_eq!(11, batches[0].num_columns());
7 changes: 5 additions & 2 deletions datafusion/core/src/datasource/listing/table.rs
@@ -1213,7 +1213,8 @@ mod tests {

// test metadata
assert_eq!(exec.statistics()?.num_rows, Precision::Exact(8));
assert_eq!(exec.statistics()?.total_byte_size, Precision::Exact(671));
// assert_eq!(exec.statistics()?.total_byte_size, Precision::Exact(671));
// todo: uncomment when FileScanConfig::projection_stats sets the byte size

Ok(())
}
@@ -1239,7 +1240,9 @@

let exec = table.scan(&state, None, &[], None).await?;
assert_eq!(exec.statistics()?.num_rows, Precision::Exact(8));
assert_eq!(exec.statistics()?.total_byte_size, Precision::Exact(671));

// assert_eq!(exec.statistics()?.total_byte_size, Precision::Exact(671));
// todo: uncomment when FileScanConfig::projection_stats sets the byte size

Ok(())
}
172 changes: 102 additions & 70 deletions datafusion/datasource/src/file_scan_config.rs
@@ -244,7 +244,7 @@ impl DataSource for FileScanConfig {
}

fn statistics(&self) -> Result<Statistics> {
self.file_source.statistics()
Ok(self.projection_stats())
}

fn with_fetch(&self, limit: Option<usize>) -> Option<Arc<dyn DataSource>> {
@@ -324,13 +324,7 @@ impl FileScanConfig {

/// Set the file source
pub fn with_source(mut self, file_source: Arc<dyn FileSource>) -> Self {
let (
_projected_schema,
_constraints,
projected_statistics,
_projected_output_ordering,
) = self.project();
self.file_source = file_source.with_statistics(projected_statistics);
self.file_source = file_source;
self
}

@@ -346,6 +340,65 @@
self
}

fn projection_indices(&self) -> Vec<usize> {
match &self.projection {
Some(proj) => proj.clone(),
None => (0..self.file_schema.fields().len()
+ self.table_partition_cols.len())
.collect(),
}
}

fn projection_stats(&self) -> Statistics {
let table_cols_stats = self
.projection_indices()
.into_iter()
.map(|idx| {
if idx < self.file_schema.fields().len() {
self.statistics.column_statistics[idx].clone()
} else {
// TODO provide accurate stat for partition column (#1186)
ColumnStatistics::new_unknown()
}
})
.collect();

Statistics {
num_rows: self.statistics.num_rows,
// TODO correct byte size?
total_byte_size: Precision::Absent,
column_statistics: table_cols_stats,
}
}

fn projection_schema(&self) -> Arc<Schema> {
let table_fields: Vec<_> = self
.projection_indices()
.into_iter()
.map(|idx| {
if idx < self.file_schema.fields().len() {
self.file_schema.field(idx).clone()
} else {
let partition_idx = idx - self.file_schema.fields().len();
self.table_partition_cols[partition_idx].clone()
}
})
.collect();

Arc::new(Schema::new_with_metadata(
table_fields,
self.file_schema.metadata().clone(),
))
}

fn projection_constraints(&self) -> Constraints {
let indexes = self.projection_indices();

self.constraints
.project(&indexes)
.unwrap_or_else(Constraints::empty)
}

/// Set the projection of the files
pub fn with_projection(mut self, projection: Option<Vec<usize>>) -> Self {
self.projection = projection;
@@ -433,54 +486,13 @@ impl FileScanConfig {
);
}

let proj_indices = if let Some(proj) = &self.projection {
proj
} else {
let len = self.file_schema.fields().len() + self.table_partition_cols.len();
&(0..len).collect::<Vec<_>>()
};

let mut table_fields = vec![];
let mut table_cols_stats = vec![];
for idx in proj_indices {
if *idx < self.file_schema.fields().len() {
let field = self.file_schema.field(*idx);
table_fields.push(field.clone());
table_cols_stats.push(self.statistics.column_statistics[*idx].clone())
} else {
let partition_idx = idx - self.file_schema.fields().len();
table_fields.push(self.table_partition_cols[partition_idx].to_owned());
// TODO provide accurate stat for partition column (#1186)
table_cols_stats.push(ColumnStatistics::new_unknown())
}
}

let table_stats = Statistics {
num_rows: self.statistics.num_rows,
// TODO correct byte size?
total_byte_size: Precision::Absent,
column_statistics: table_cols_stats,
};

let projected_schema = Arc::new(Schema::new_with_metadata(
table_fields,
self.file_schema.metadata().clone(),
));

let projected_constraints = self
.constraints
.project(proj_indices)
.unwrap_or_else(Constraints::empty);
let schema = self.projection_schema();
let constraints = self.projection_constraints();
let stats = self.projection_stats();

let projected_output_ordering =
get_projected_output_ordering(self, &projected_schema);
let output_ordering = get_projected_output_ordering(self, &schema);

(
projected_schema,
projected_constraints,
table_stats,
projected_output_ordering,
)
(schema, constraints, stats, output_ordering)
}

#[cfg_attr(not(feature = "avro"), allow(unused))] // Only used by avro
@@ -660,17 +672,17 @@ pub struct PartitionColumnProjector {
/// insert the partition columns in the target record batch.
projected_partition_indexes: Vec<(usize, usize)>,
/// The schema of the table once the projection was applied.
projected_schema: SchemaRef,
projection_schema: SchemaRef,
}

impl PartitionColumnProjector {
// Create a projector to insert the partitioning columns into batches read from files
// - `projected_schema`: the target schema with both file and partitioning columns
// - `projection_schema`: the target schema with both file and partitioning columns
// - `table_partition_cols`: all the partitioning column names
pub fn new(projected_schema: SchemaRef, table_partition_cols: &[String]) -> Self {
pub fn new(projection_schema: SchemaRef, table_partition_cols: &[String]) -> Self {
let mut idx_map = HashMap::new();
for (partition_idx, partition_name) in table_partition_cols.iter().enumerate() {
if let Ok(schema_idx) = projected_schema.index_of(partition_name) {
if let Ok(schema_idx) = projection_schema.index_of(partition_name) {
idx_map.insert(partition_idx, schema_idx);
}
}
@@ -681,21 +693,21 @@ impl PartitionColumnProjector {
Self {
projected_partition_indexes,
key_buffer_cache: Default::default(),
projected_schema,
projection_schema,
}
}

// Transform the batch read from the file by inserting the partitioning columns
// to the right positions as deduced from `projected_schema`
// to the right positions as deduced from `projection_schema`
// - `file_batch`: batch read from the file, with internal projection applied
// - `partition_values`: the list of partition values, one for each partition column
pub fn project(
&mut self,
file_batch: RecordBatch,
partition_values: &[ScalarValue],
) -> Result<RecordBatch> {
let expected_cols =
self.projected_schema.fields().len() - self.projected_partition_indexes.len();
let expected_cols = self.projection_schema.fields().len()
- self.projected_partition_indexes.len();

if file_batch.columns().len() != expected_cols {
return exec_err!(
@@ -717,7 +729,7 @@
let mut partition_value = Cow::Borrowed(p_value);

// check if user forgot to dict-encode the partition value
let field = self.projected_schema.field(sidx);
let field = self.projection_schema.field(sidx);
let expected_data_type = field.data_type();
let actual_data_type = partition_value.data_type();
if let DataType::Dictionary(key_type, _) = expected_data_type {
Expand All @@ -741,7 +753,7 @@ impl PartitionColumnProjector {
}

RecordBatch::try_new_with_options(
Arc::clone(&self.projected_schema),
Arc::clone(&self.projection_schema),
cols,
&RecordBatchOptions::new().with_row_count(Some(file_batch.num_rows())),
)
@@ -953,15 +965,15 @@ fn create_output_array(
///```
fn get_projected_output_ordering(
base_config: &FileScanConfig,
projected_schema: &SchemaRef,
projection_schema: &SchemaRef,
) -> Vec<LexOrdering> {
let mut all_orderings = vec![];
for output_ordering in &base_config.output_ordering {
let mut new_ordering = LexOrdering::default();
for PhysicalSortExpr { expr, options } in output_ordering.iter() {
if let Some(col) = expr.as_any().downcast_ref::<Column>() {
let name = col.name();
if let Some((idx, _)) = projected_schema.column_with_name(name) {
if let Some((idx, _)) = projection_schema.column_with_name(name) {
// Compute the new sort expression (with correct index) after projection:
new_ordering.push(PhysicalSortExpr {
expr: Arc::new(Column::new(name, idx)),
@@ -970,7 +982,7 @@
continue;
}
}
// Cannot find expression in the projected_schema, stop iterating
// Cannot find expression in the projection_schema, stop iterating
// since rest of the orderings are violated
break;
}
@@ -990,7 +1002,7 @@

let statistics = match MinMaxStatistics::new_from_files(
&new_ordering,
projected_schema,
projection_schema,
base_config.projection.as_deref(),
group,
) {
@@ -1203,6 +1215,12 @@ mod tests {
),
];
// create a projected schema
let statistics = Statistics {
num_rows: Precision::Inexact(3),
total_byte_size: Precision::Absent,
column_statistics: Statistics::unknown_column(&file_batch.schema()),
};

let conf = config_for_projection(
file_batch.schema(),
// keep all cols from file and 2 from partitioning
@@ -1213,9 +1231,23 @@
file_batch.schema().fields().len(),
file_batch.schema().fields().len() + 2,
]),
Statistics::new_unknown(&file_batch.schema()),
statistics.clone(),
to_partition_cols(partition_cols.clone()),
);

let source_statistics = conf.file_source.statistics().unwrap();
let conf_stats = conf.statistics().unwrap();

// the projection should be reflected in the scan config statistics
assert_eq!(conf_stats.num_rows, Precision::Inexact(3));

// 3 original statistics + 2 partition statistics
assert_eq!(conf_stats.column_statistics.len(), 5);

// file source statistics should not be modified
assert_eq!(source_statistics, statistics);
assert_eq!(source_statistics.column_statistics.len(), 3);

let (proj_schema, ..) = conf.project();
// create a projector for that projected schema
let mut proj = PartitionColumnProjector::new(
@@ -1679,7 +1711,7 @@ mod tests {
FileScanConfig::new(
ObjectStoreUrl::parse("test:///").unwrap(),
file_schema,
Arc::new(MockSource::default()),
MockSource::default().with_statistics(statistics.clone()),
)
.with_projection(projection)
.with_statistics(statistics)
43 changes: 42 additions & 1 deletion datafusion/proto/tests/cases/roundtrip_physical_plan.rs
@@ -48,7 +48,7 @@ use datafusion::datasource::listing::{ListingTableUrl, PartitionedFile};
use datafusion::datasource::object_store::ObjectStoreUrl;
use datafusion::datasource::physical_plan::{
wrap_partition_type_in_dict, wrap_partition_value_in_dict, FileScanConfig,
FileSinkConfig, ParquetSource,
FileSinkConfig, FileSource, ParquetSource,
};
use datafusion::execution::FunctionRegistry;
use datafusion::functions_aggregate::sum::sum_udaf;
@@ -1579,3 +1579,44 @@ async fn roundtrip_coalesce() -> Result<()> {

Ok(())
}

#[tokio::test]
async fn roundtrip_projection_source() -> Result<()> {
let schema = Arc::new(Schema::new(Fields::from([
Arc::new(Field::new("a", DataType::Utf8, false)),
Arc::new(Field::new("b", DataType::Utf8, false)),
Arc::new(Field::new("c", DataType::Int32, false)),
Arc::new(Field::new("d", DataType::Int32, false)),
])));

let statistics = Statistics::new_unknown(&schema);

let source = ParquetSource::default().with_statistics(statistics.clone());
let scan_config = FileScanConfig {
object_store_url: ObjectStoreUrl::local_filesystem(),
file_groups: vec![vec![PartitionedFile::new(
"/path/to/file.parquet".to_string(),
1024,
)]],
constraints: Constraints::empty(),
statistics,
file_schema: schema.clone(),
projection: Some(vec![0, 1, 2]),
limit: None,
table_partition_cols: vec![],
output_ordering: vec![],
file_compression_type: FileCompressionType::UNCOMPRESSED,
new_lines_in_values: false,
file_source: source,
};

let filter = Arc::new(
FilterExec::try_new(
Arc::new(BinaryExpr::new(col("c", &schema)?, Operator::Eq, lit(1))),
scan_config.build(),
)?
.with_projection(Some(vec![0, 1]))?,
);

roundtrip_test(filter)
}