diff --git a/datafusion/core/tests/physical_optimizer/projection_pushdown.rs b/datafusion/core/tests/physical_optimizer/projection_pushdown.rs
index 836758b21318..77c32f562355 100644
--- a/datafusion/core/tests/physical_optimizer/projection_pushdown.rs
+++ b/datafusion/core/tests/physical_optimizer/projection_pushdown.rs
@@ -1382,3 +1382,54 @@ fn test_union_after_projection() -> Result<()> {
 
     Ok(())
 }
+
+#[test]
+fn test_partition_col_projection_pushdown() -> Result<()> {
+    let file_schema = Arc::new(Schema::new(vec![
+        Field::new("int_col", DataType::Int32, true),
+        Field::new("string_col", DataType::Utf8, true),
+    ]));
+
+    let partitioned_schema = Arc::new(Schema::new(vec![
+        Field::new("int_col", DataType::Int32, true),
+        Field::new("string_col", DataType::Utf8, true),
+        Field::new("partition_col", DataType::Utf8, true),
+    ]));
+
+    let source = FileScanConfig::new(
+        ObjectStoreUrl::parse("test:///").unwrap(),
+        file_schema.clone(),
+        Arc::new(CsvSource::default()),
+    )
+    .with_file(PartitionedFile::new("x".to_string(), 100))
+    .with_table_partition_cols(vec![Field::new("partition_col", DataType::Utf8, true)])
+    .with_projection(Some(vec![0, 1, 2]))
+    .build();
+
+    let projection = Arc::new(ProjectionExec::try_new(
+        vec![
+            (
+                col("string_col", partitioned_schema.as_ref())?,
+                "string_col".to_string(),
+            ),
+            (
+                col("partition_col", partitioned_schema.as_ref())?,
+                "partition_col".to_string(),
+            ),
+            (
+                col("int_col", partitioned_schema.as_ref())?,
+                "int_col".to_string(),
+            ),
+        ],
+        source,
+    )?);
+
+    let after_optimize =
+        ProjectionPushdown::new().optimize(projection, &ConfigOptions::new())?;
+
+    let expected = ["ProjectionExec: expr=[string_col@1 as string_col, partition_col@2 as partition_col, int_col@0 as int_col]",
+        "  DataSourceExec: file_groups={1 group: [[x]]}, projection=[int_col, string_col, partition_col], file_type=csv, has_header=false"];
+    assert_eq!(get_plan_string(&after_optimize), expected);
+
+    Ok(())
+}
diff --git a/datafusion/datasource/src/file_scan_config.rs b/datafusion/datasource/src/file_scan_config.rs
index 79279b5c8231..bee74e042f22 100644
--- a/datafusion/datasource/src/file_scan_config.rs
+++ b/datafusion/datasource/src/file_scan_config.rs
@@ -266,22 +266,33 @@ impl DataSource for FileScanConfig {
     ) -> Result<Option<Arc<dyn ExecutionPlan>>> {
         // If there is any non-column or alias-carrier expression, Projection should not be removed.
         // This process can be moved into CsvExec, but it would be an overlap of their responsibility.
-        Ok(all_alias_free_columns(projection.expr()).then(|| {
-            let file_scan = self.clone();
-            let source = Arc::clone(&file_scan.file_source);
-            let new_projections = new_projections_for_columns(
-                projection,
-                &file_scan
-                    .projection
-                    .clone()
-                    .unwrap_or((0..self.file_schema.fields().len()).collect()),
-            );
-            file_scan
-                // Assign projected statistics to source
-                .with_projection(Some(new_projections))
-                .with_source(source)
-                .build() as _
-        }))
+
+        let partitioned_columns_in_proj = projection.expr().iter().any(|(expr, _)| {
+            expr.as_any()
+                .downcast_ref::<Column>()
+                .map(|expr| expr.index() >= self.file_schema.fields().len())
+                .unwrap_or(false)
+        });
+
+        Ok(
+            (all_alias_free_columns(projection.expr()) && !partitioned_columns_in_proj)
+                .then(|| {
+                    let file_scan = self.clone();
+                    let source = Arc::clone(&file_scan.file_source);
+                    let new_projections = new_projections_for_columns(
+                        projection,
+                        &file_scan
+                            .projection
+                            .clone()
+                            .unwrap_or((0..self.file_schema.fields().len()).collect()),
+                    );
+                    file_scan
+                        // Assign projected statistics to source
+                        .with_projection(Some(new_projections))
+                        .with_source(source)
+                        .build() as _
+                }),
+        )
     }
 }
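
Reviewer note (not part of the patch): as far as I can tell from the change, the new guard keeps the ProjectionExec in place whenever it projects a table partition column, because FileScanConfig::projection indexes only the file schema; partition columns occupy the indices after the file fields and are appended by the scan, so pushing such an index down would refer past the file schema. A minimal standalone sketch of that index check follows; the helper and parameter names are hypothetical and chosen for illustration only.

/// Hypothetical helper mirroring the patch's guard: `file_field_count` is the
/// number of fields in the file schema; any projected index at or beyond it
/// refers to a table partition column appended by the scan.
fn projects_partition_column(proj_indices: &[usize], file_field_count: usize) -> bool {
    proj_indices.iter().any(|&i| i >= file_field_count)
}

fn main() {
    // Mirrors the new test: file schema = [int_col, string_col], index 2 = partition_col.
    assert!(projects_partition_column(&[1, 2, 0], 2)); // partition column projected -> keep ProjectionExec
    assert!(!projects_partition_column(&[0, 1], 2)); // only file columns -> safe to push down
}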