From 2fd558fdd0fa95e70d6ae5135776d164119f3536 Mon Sep 17 00:00:00 2001
From: Dmitrii Blaginin
Date: Fri, 28 Feb 2025 15:26:50 +0000
Subject: [PATCH] Set projection before configuring the source (#14685)

* Set projection before configuring the source
* Refresh source manually
* Update statistics on proj / partition columns update
* Use `FileScanConfig` own `source`
* Extend test to ensure stats are different
* Unify names
* Comment `total_byte_size` in tests
* Use source stats as a base
* Return correct stats in the `ParquetSource` mock
* Revert test changes, add follow on ticket
* Revert statistics total_byte_count change
* Update test

---------

Co-authored-by: Andrew Lamb
---
 datafusion-testing                            |   2 +-
 .../src/datasource/file_format/parquet.rs     |   2 +
 .../core/src/datasource/listing/table.rs      |   1 +
 .../core/tests/parquet/file_statistics.rs     |   9 +-
 datafusion/datasource/src/file_scan_config.rs | 165 +++++++++++-------
 .../tests/cases/roundtrip_physical_plan.rs    |  72 +++++---
 .../sqllogictest/test_files/explain.slt       |  26 +--
 7 files changed, 177 insertions(+), 100 deletions(-)

diff --git a/datafusion-testing b/datafusion-testing
index 3462eaa78745..a169d8ded675 160000
--- a/datafusion-testing
+++ b/datafusion-testing
@@ -1 +1 @@
-Subproject commit 3462eaa787459957e38df267a4a21f5bea605807
+Subproject commit a169d8ded67529174f08e22fe88c014397a4d967
diff --git a/datafusion/core/src/datasource/file_format/parquet.rs b/datafusion/core/src/datasource/file_format/parquet.rs
index 98aa24ad00cb..4a24871aeef7 100644
--- a/datafusion/core/src/datasource/file_format/parquet.rs
+++ b/datafusion/core/src/datasource/file_format/parquet.rs
@@ -1934,6 +1934,7 @@ mod tests {
 
         // test metadata
         assert_eq!(exec.statistics()?.num_rows, Precision::Exact(8));
+        // TODO correct byte size: https://github.com/apache/datafusion/issues/14936
         assert_eq!(exec.statistics()?.total_byte_size, Precision::Exact(671));
 
         Ok(())
@@ -1976,6 +1977,7 @@ mod tests {
 
         // note: even if the limit is set, the executor rounds up to the batch size
         assert_eq!(exec.statistics()?.num_rows, Precision::Exact(8));
+        // TODO correct byte size: https://github.com/apache/datafusion/issues/14936
         assert_eq!(exec.statistics()?.total_byte_size, Precision::Exact(671));
         let batches = collect(exec, task_ctx).await?;
         assert_eq!(1, batches.len());
diff --git a/datafusion/core/src/datasource/listing/table.rs b/datafusion/core/src/datasource/listing/table.rs
index adef02c38d73..41e939d60b08 100644
--- a/datafusion/core/src/datasource/listing/table.rs
+++ b/datafusion/core/src/datasource/listing/table.rs
@@ -1239,6 +1239,7 @@ mod tests {
 
         let exec = table.scan(&state, None, &[], None).await?;
         assert_eq!(exec.statistics()?.num_rows, Precision::Exact(8));
+        // TODO correct byte size: https://github.com/apache/datafusion/issues/14936
         assert_eq!(exec.statistics()?.total_byte_size, Precision::Exact(671));
 
         Ok(())
diff --git a/datafusion/core/tests/parquet/file_statistics.rs b/datafusion/core/tests/parquet/file_statistics.rs
index 4c1d17c8426e..7e98ebed6c9a 100644
--- a/datafusion/core/tests/parquet/file_statistics.rs
+++ b/datafusion/core/tests/parquet/file_statistics.rs
@@ -82,7 +82,8 @@ async fn load_table_stats_with_session_level_cache() {
     assert_eq!(exec1.statistics().unwrap().num_rows, Precision::Exact(8));
     assert_eq!(
         exec1.statistics().unwrap().total_byte_size,
-        Precision::Exact(671)
+        // TODO correct byte size: https://github.com/apache/datafusion/issues/14936
+        Precision::Exact(671),
     );
     assert_eq!(get_static_cache_size(&state1), 1);
 
@@ -93,7 +94,8 @@ async fn load_table_stats_with_session_level_cache() {
     assert_eq!(exec2.statistics().unwrap().num_rows, Precision::Exact(8));
     assert_eq!(
         exec2.statistics().unwrap().total_byte_size,
-        Precision::Exact(671)
+        // TODO correct byte size: https://github.com/apache/datafusion/issues/14936
+        Precision::Exact(671),
     );
     assert_eq!(get_static_cache_size(&state2), 1);
 
@@ -104,7 +106,8 @@ async fn load_table_stats_with_session_level_cache() {
     assert_eq!(exec3.statistics().unwrap().num_rows, Precision::Exact(8));
     assert_eq!(
         exec3.statistics().unwrap().total_byte_size,
-        Precision::Exact(671)
+        // TODO correct byte size: https://github.com/apache/datafusion/issues/14936
+        Precision::Exact(671),
     );
     // List same file no increase
     assert_eq!(get_static_cache_size(&state1), 1);
diff --git a/datafusion/datasource/src/file_scan_config.rs b/datafusion/datasource/src/file_scan_config.rs
index df38464f1b00..79279b5c8231 100644
--- a/datafusion/datasource/src/file_scan_config.rs
+++ b/datafusion/datasource/src/file_scan_config.rs
@@ -31,9 +31,7 @@ use arrow::{
     buffer::Buffer,
     datatypes::{ArrowNativeType, DataType, Field, Schema, SchemaRef, UInt16Type},
 };
-use datafusion_common::{
-    exec_err, stats::Precision, ColumnStatistics, Constraints, Result, Statistics,
-};
+use datafusion_common::{exec_err, ColumnStatistics, Constraints, Result, Statistics};
 use datafusion_common::{DataFusionError, ScalarValue};
 use datafusion_execution::{
     object_store::ObjectStoreUrl, SendableRecordBatchStream, TaskContext,
@@ -86,20 +84,22 @@ use crate::{
 /// # Field::new("c4", DataType::Int32, false),
 /// # ]));
 /// # // Note: create mock ParquetSource, as ParquetSource is not in the datasource crate
-/// # struct ParquetSource {};
+/// # struct ParquetSource {
+/// #     projected_statistics: Option<Statistics>
+/// # };
 /// # impl FileSource for ParquetSource {
 /// #  fn create_file_opener(&self, _: Arc<dyn ObjectStore>, _: &FileScanConfig, _: usize) -> Arc<dyn FileOpener> { unimplemented!() }
 /// #  fn as_any(&self) -> &dyn Any { self }
 /// #  fn with_batch_size(&self, _: usize) -> Arc<dyn FileSource> { unimplemented!() }
 /// #  fn with_schema(&self, _: SchemaRef) -> Arc<dyn FileSource> { unimplemented!() }
 /// #  fn with_projection(&self, _: &FileScanConfig) -> Arc<dyn FileSource> { unimplemented!() }
-/// #  fn with_statistics(&self, _: Statistics) -> Arc<dyn FileSource> { Arc::new(Self::new()) }
+/// #  fn with_statistics(&self, statistics: Statistics) -> Arc<dyn FileSource> { Arc::new(Self {projected_statistics: Some(statistics)} ) }
 /// #  fn metrics(&self) -> &ExecutionPlanMetricsSet { unimplemented!() }
-/// #  fn statistics(&self) -> datafusion_common::Result<Statistics> { unimplemented!() }
+/// #  fn statistics(&self) -> datafusion_common::Result<Statistics> { Ok(self.projected_statistics.clone().expect("projected_statistics should be set")) }
 /// #  fn file_type(&self) -> &str { "parquet" }
 /// # }
 /// # impl ParquetSource {
-/// #  fn new() -> Self { Self{} }
+/// #  fn new() -> Self { Self {projected_statistics: None} }
 /// # }
 /// // create FileScan config for reading parquet files from file://
 /// let object_store_url = ObjectStoreUrl::local_filesystem();
@@ -244,7 +244,7 @@ impl DataSource for FileScanConfig {
     }
 
     fn statistics(&self) -> Result<Statistics> {
-        self.file_source.statistics()
+        Ok(self.projected_stats())
     }
 
     fn with_fetch(&self, limit: Option<usize>) -> Option<Arc<dyn DataSource>> {
@@ -324,13 +324,7 @@ impl FileScanConfig {
 
     /// Set the file source
     pub fn with_source(mut self, file_source: Arc<dyn FileSource>) -> Self {
-        let (
-            _projected_schema,
-            _constraints,
-            projected_statistics,
-            _projected_output_ordering,
-        ) = self.project();
-        self.file_source = file_source.with_statistics(projected_statistics);
+        self.file_source = file_source.with_statistics(self.statistics.clone());
         self
     }
 
@@ -342,10 +336,75 @@ impl FileScanConfig {
 
     /// Set the statistics of the files
     pub fn with_statistics(mut self, statistics: Statistics) -> Self {
-        self.statistics = statistics;
+        self.statistics = statistics.clone();
+        self.file_source = self.file_source.with_statistics(statistics);
         self
     }
 
+    fn projection_indices(&self) -> Vec<usize> {
+        match &self.projection {
+            Some(proj) => proj.clone(),
+            None => (0..self.file_schema.fields().len()
+                + self.table_partition_cols.len())
+                .collect(),
+        }
+    }
+
+    fn projected_stats(&self) -> Statistics {
+        let statistics = self
+            .file_source
+            .statistics()
+            .unwrap_or(self.statistics.clone());
+
+        let table_cols_stats = self
+            .projection_indices()
+            .into_iter()
+            .map(|idx| {
+                if idx < self.file_schema.fields().len() {
+                    statistics.column_statistics[idx].clone()
+                } else {
+                    // TODO provide accurate stat for partition column (#1186)
+                    ColumnStatistics::new_unknown()
+                }
+            })
+            .collect();
+
+        Statistics {
+            num_rows: statistics.num_rows,
+            // TODO correct byte size: https://github.com/apache/datafusion/issues/14936
+            total_byte_size: statistics.total_byte_size,
+            column_statistics: table_cols_stats,
+        }
+    }
+
+    fn projected_schema(&self) -> Arc<Schema> {
+        let table_fields: Vec<_> = self
+            .projection_indices()
+            .into_iter()
+            .map(|idx| {
+                if idx < self.file_schema.fields().len() {
+                    self.file_schema.field(idx).clone()
+                } else {
+                    let partition_idx = idx - self.file_schema.fields().len();
+                    self.table_partition_cols[partition_idx].clone()
+                }
+            })
+            .collect();
+
+        Arc::new(Schema::new_with_metadata(
+            table_fields,
+            self.file_schema.metadata().clone(),
+        ))
+    }
+
+    fn projected_constraints(&self) -> Constraints {
+        let indexes = self.projection_indices();
+
+        self.constraints
+            .project(&indexes)
+            .unwrap_or_else(Constraints::empty)
+    }
+
     /// Set the projection of the files
     pub fn with_projection(mut self, projection: Option<Vec<usize>>) -> Self {
         self.projection = projection;
@@ -433,54 +492,13 @@ impl FileScanConfig {
             );
         }
 
-        let proj_indices = if let Some(proj) = &self.projection {
-            proj
-        } else {
-            let len = self.file_schema.fields().len() + self.table_partition_cols.len();
-            &(0..len).collect::<Vec<_>>()
-        };
-
-        let mut table_fields = vec![];
-        let mut table_cols_stats = vec![];
-        for idx in proj_indices {
-            if *idx < self.file_schema.fields().len() {
-                let field = self.file_schema.field(*idx);
-                table_fields.push(field.clone());
-                table_cols_stats.push(self.statistics.column_statistics[*idx].clone())
-            } else {
-                let partition_idx = idx - self.file_schema.fields().len();
-                table_fields.push(self.table_partition_cols[partition_idx].to_owned());
-                // TODO provide accurate stat for partition column (#1186)
-                table_cols_stats.push(ColumnStatistics::new_unknown())
-            }
-        }
-
-        let table_stats = Statistics {
-            num_rows: self.statistics.num_rows,
-            // TODO correct byte size?
-            total_byte_size: Precision::Absent,
-            column_statistics: table_cols_stats,
-        };
-
-        let projected_schema = Arc::new(Schema::new_with_metadata(
-            table_fields,
-            self.file_schema.metadata().clone(),
-        ));
+        let schema = self.projected_schema();
+        let constraints = self.projected_constraints();
+        let stats = self.projected_stats();
 
-        let projected_constraints = self
-            .constraints
-            .project(proj_indices)
-            .unwrap_or_else(Constraints::empty);
+        let output_ordering = get_projected_output_ordering(self, &schema);
 
-        let projected_output_ordering =
-            get_projected_output_ordering(self, &projected_schema);
-
-        (
-            projected_schema,
-            projected_constraints,
-            table_stats,
-            projected_output_ordering,
-        )
+        (schema, constraints, stats, output_ordering)
     }
 
     #[cfg_attr(not(feature = "avro"), allow(unused))] // Only used by avro
@@ -1048,6 +1066,7 @@ mod tests {
         compute::SortOptions,
     };
 
+    use datafusion_common::stats::Precision;
     use datafusion_common::{assert_batches_eq, DFSchema};
     use datafusion_expr::{execution_props::ExecutionProps, SortExpr};
     use datafusion_physical_expr::create_physical_expr;
@@ -1203,6 +1222,12 @@ mod tests {
             ),
         ];
         // create a projected schema
+        let statistics = Statistics {
+            num_rows: Precision::Inexact(3),
+            total_byte_size: Precision::Absent,
+            column_statistics: Statistics::unknown_column(&file_batch.schema()),
+        };
+
         let conf = config_for_projection(
             file_batch.schema(),
             // keep all cols from file and 2 from partitioning
             Some(vec![
                 0,
                 1,
                 2,
                 file_batch.schema().fields().len(),
                 file_batch.schema().fields().len() + 2,
             ]),
-            Statistics::new_unknown(&file_batch.schema()),
+            statistics.clone(),
             to_partition_cols(partition_cols.clone()),
         );
+
+        let source_statistics = conf.file_source.statistics().unwrap();
+        let conf_stats = conf.statistics().unwrap();
+
+        // projection should be reflected in the file source statistics
+        assert_eq!(conf_stats.num_rows, Precision::Inexact(3));
+
+        // 3 original statistics + 2 partition statistics
+        assert_eq!(conf_stats.column_statistics.len(), 5);
+
+        // file statistics should not be modified
+        assert_eq!(source_statistics, statistics);
+        assert_eq!(source_statistics.column_statistics.len(), 3);
+
         let (proj_schema, ..) = conf.project();
 
         // create a projector for that projected schema
         let mut proj = PartitionColumnProjector::new(
diff --git a/datafusion/proto/tests/cases/roundtrip_physical_plan.rs b/datafusion/proto/tests/cases/roundtrip_physical_plan.rs
index 54c4946a2c9a..a8ee21365308 100644
--- a/datafusion/proto/tests/cases/roundtrip_physical_plan.rs
+++ b/datafusion/proto/tests/cases/roundtrip_physical_plan.rs
@@ -48,7 +48,7 @@ use datafusion::datasource::listing::{ListingTableUrl, PartitionedFile};
 use datafusion::datasource::object_store::ObjectStoreUrl;
 use datafusion::datasource::physical_plan::{
     wrap_partition_type_in_dict, wrap_partition_value_in_dict, FileScanConfig,
-    FileSinkConfig, ParquetSource,
+    FileSinkConfig, FileSource, ParquetSource,
 };
 use datafusion::execution::FunctionRegistry;
 use datafusion::functions_aggregate::sum::sum_udaf;
@@ -778,24 +778,16 @@ async fn roundtrip_parquet_exec_with_table_partition_cols() -> Result<()> {
     let schema = Arc::new(Schema::new(vec![Field::new("col", DataType::Utf8, false)]));
 
     let source = Arc::new(ParquetSource::default());
-    let scan_config = FileScanConfig {
-        object_store_url: ObjectStoreUrl::local_filesystem(),
-        file_groups: vec![vec![file_group]],
-        constraints: Constraints::empty(),
-        statistics: Statistics::new_unknown(&schema),
-        file_schema: schema,
-        projection: Some(vec![0, 1]),
-        limit: None,
-        table_partition_cols: vec![Field::new(
-            "part".to_string(),
-            wrap_partition_type_in_dict(DataType::Int16),
-            false,
-        )],
-        output_ordering: vec![],
-        file_compression_type: FileCompressionType::UNCOMPRESSED,
-        new_lines_in_values: false,
-        file_source: source,
-    };
+    let scan_config =
+        FileScanConfig::new(ObjectStoreUrl::local_filesystem(), schema, source)
+            .with_projection(Some(vec![0, 1]))
+            .with_file_group(vec![file_group])
+            .with_table_partition_cols(vec![Field::new(
+                "part".to_string(),
+                wrap_partition_type_in_dict(DataType::Int16),
+                false,
+            )])
+            .with_newlines_in_values(false);
 
     roundtrip_test(scan_config.build())
 }
@@ -1605,6 +1597,47 @@ async fn roundtrip_coalesce() -> Result<()> {
     Ok(())
 }
 
+#[tokio::test]
+async fn roundtrip_projection_source() -> Result<()> {
+    let schema = Arc::new(Schema::new(Fields::from([
+        Arc::new(Field::new("a", DataType::Utf8, false)),
+        Arc::new(Field::new("b", DataType::Utf8, false)),
+        Arc::new(Field::new("c", DataType::Int32, false)),
+        Arc::new(Field::new("d", DataType::Int32, false)),
+    ])));
+
+    let statistics = Statistics::new_unknown(&schema);
+
+    let source = ParquetSource::default().with_statistics(statistics.clone());
+    let scan_config = FileScanConfig {
+        object_store_url: ObjectStoreUrl::local_filesystem(),
+        file_groups: vec![vec![PartitionedFile::new(
+            "/path/to/file.parquet".to_string(),
+            1024,
+        )]],
+        constraints: Constraints::empty(),
+        statistics,
+        file_schema: schema.clone(),
+        projection: Some(vec![0, 1, 2]),
+        limit: None,
+        table_partition_cols: vec![],
+        output_ordering: vec![],
+        file_compression_type: FileCompressionType::UNCOMPRESSED,
+        new_lines_in_values: false,
+        file_source: source,
+    };
+
+    let filter = Arc::new(
+        FilterExec::try_new(
+            Arc::new(BinaryExpr::new(col("c", &schema)?, Operator::Eq, lit(1))),
+            scan_config.build(),
+        )?
+        .with_projection(Some(vec![0, 1]))?,
+    );
+
+    roundtrip_test(filter)
+}
+
 #[tokio::test]
 async fn roundtrip_parquet_select_star() -> Result<()> {
     let ctx = all_types_context().await?;
@@ -1626,7 +1659,6 @@ async fn roundtrip_parquet_select_star_predicate() -> Result<()> {
     roundtrip_test_sql_with_context(sql, &ctx).await
 }
 
-#[ignore = "Test failing due to https://github.com/apache/datafusion/issues/14679"]
 #[tokio::test]
 async fn roundtrip_parquet_select_projection_predicate() -> Result<()> {
     let ctx = all_types_context().await?;
diff --git a/datafusion/sqllogictest/test_files/explain.slt b/datafusion/sqllogictest/test_files/explain.slt
index 455b9a439776..16c61a1db6ee 100644
--- a/datafusion/sqllogictest/test_files/explain.slt
+++ b/datafusion/sqllogictest/test_files/explain.slt
@@ -294,22 +294,22 @@ CREATE EXTERNAL TABLE alltypes_plain STORED AS PARQUET LOCATION '../../parquet-t
 query TT
 EXPLAIN SELECT * FROM alltypes_plain limit 10;
 ----
-physical_plan DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/parquet-testing/data/alltypes_plain.parquet]]}, projection=[id, bool_col, tinyint_col, smallint_col, int_col, bigint_col, float_col, double_col, date_string_col, string_col, timestamp_col], limit=10, file_type=parquet, statistics=[Rows=Exact(8), Bytes=Absent, [(Col[0]:),(Col[1]:),(Col[2]:),(Col[3]:),(Col[4]:),(Col[5]:),(Col[6]:),(Col[7]:),(Col[8]:),(Col[9]:),(Col[10]:)]]
+physical_plan DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/parquet-testing/data/alltypes_plain.parquet]]}, projection=[id, bool_col, tinyint_col, smallint_col, int_col, bigint_col, float_col, double_col, date_string_col, string_col, timestamp_col], limit=10, file_type=parquet, statistics=[Rows=Exact(8), Bytes=Exact(671), [(Col[0]:),(Col[1]:),(Col[2]:),(Col[3]:),(Col[4]:),(Col[5]:),(Col[6]:),(Col[7]:),(Col[8]:),(Col[9]:),(Col[10]:)]]
 
 # explain verbose with both collect & show statistics on
 query TT
 EXPLAIN VERBOSE SELECT * FROM alltypes_plain limit 10;
 ----
 initial_physical_plan
-01)GlobalLimitExec: skip=0, fetch=10, statistics=[Rows=Exact(8), Bytes=Absent, [(Col[0]:),(Col[1]:),(Col[2]:),(Col[3]:),(Col[4]:),(Col[5]:),(Col[6]:),(Col[7]:),(Col[8]:),(Col[9]:),(Col[10]:)]]
-02)--DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/parquet-testing/data/alltypes_plain.parquet]]}, projection=[id, bool_col, tinyint_col, smallint_col, int_col, bigint_col, float_col, double_col, date_string_col, string_col, timestamp_col], limit=10, file_type=parquet, statistics=[Rows=Exact(8), Bytes=Absent, [(Col[0]:),(Col[1]:),(Col[2]:),(Col[3]:),(Col[4]:),(Col[5]:),(Col[6]:),(Col[7]:),(Col[8]:),(Col[9]:),(Col[10]:)]]
+01)GlobalLimitExec: skip=0, fetch=10, statistics=[Rows=Exact(8), Bytes=Exact(671), [(Col[0]:),(Col[1]:),(Col[2]:),(Col[3]:),(Col[4]:),(Col[5]:),(Col[6]:),(Col[7]:),(Col[8]:),(Col[9]:),(Col[10]:)]]
+02)--DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/parquet-testing/data/alltypes_plain.parquet]]}, projection=[id, bool_col, tinyint_col, smallint_col, int_col, bigint_col, float_col, double_col, date_string_col, string_col, timestamp_col], limit=10, file_type=parquet, statistics=[Rows=Exact(8), Bytes=Exact(671), [(Col[0]:),(Col[1]:),(Col[2]:),(Col[3]:),(Col[4]:),(Col[5]:),(Col[6]:),(Col[7]:),(Col[8]:),(Col[9]:),(Col[10]:)]]
 initial_physical_plan_with_schema
 01)GlobalLimitExec: skip=0, fetch=10, schema=[id:Int32;N, bool_col:Boolean;N, tinyint_col:Int32;N, smallint_col:Int32;N, int_col:Int32;N, bigint_col:Int64;N, float_col:Float32;N, double_col:Float64;N, date_string_col:BinaryView;N, string_col:BinaryView;N, timestamp_col:Timestamp(Nanosecond, None);N]
 02)--DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/parquet-testing/data/alltypes_plain.parquet]]}, projection=[id, bool_col, tinyint_col, smallint_col, int_col, bigint_col, float_col, double_col, date_string_col, string_col, timestamp_col], limit=10, file_type=parquet, schema=[id:Int32;N, bool_col:Boolean;N, tinyint_col:Int32;N, smallint_col:Int32;N, int_col:Int32;N, bigint_col:Int64;N, float_col:Float32;N, double_col:Float64;N, date_string_col:BinaryView;N, string_col:BinaryView;N, timestamp_col:Timestamp(Nanosecond, None);N]
 physical_plan after OutputRequirements
-01)OutputRequirementExec, statistics=[Rows=Exact(8), Bytes=Absent, [(Col[0]:),(Col[1]:),(Col[2]:),(Col[3]:),(Col[4]:),(Col[5]:),(Col[6]:),(Col[7]:),(Col[8]:),(Col[9]:),(Col[10]:)]]
-02)--GlobalLimitExec: skip=0, fetch=10, statistics=[Rows=Exact(8), Bytes=Absent, [(Col[0]:),(Col[1]:),(Col[2]:),(Col[3]:),(Col[4]:),(Col[5]:),(Col[6]:),(Col[7]:),(Col[8]:),(Col[9]:),(Col[10]:)]]
-03)----DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/parquet-testing/data/alltypes_plain.parquet]]}, projection=[id, bool_col, tinyint_col, smallint_col, int_col, bigint_col, float_col, double_col, date_string_col, string_col, timestamp_col], limit=10, file_type=parquet, statistics=[Rows=Exact(8), Bytes=Absent, [(Col[0]:),(Col[1]:),(Col[2]:),(Col[3]:),(Col[4]:),(Col[5]:),(Col[6]:),(Col[7]:),(Col[8]:),(Col[9]:),(Col[10]:)]]
+01)OutputRequirementExec, statistics=[Rows=Exact(8), Bytes=Exact(671), [(Col[0]:),(Col[1]:),(Col[2]:),(Col[3]:),(Col[4]:),(Col[5]:),(Col[6]:),(Col[7]:),(Col[8]:),(Col[9]:),(Col[10]:)]]
+02)--GlobalLimitExec: skip=0, fetch=10, statistics=[Rows=Exact(8), Bytes=Exact(671), [(Col[0]:),(Col[1]:),(Col[2]:),(Col[3]:),(Col[4]:),(Col[5]:),(Col[6]:),(Col[7]:),(Col[8]:),(Col[9]:),(Col[10]:)]]
+03)----DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/parquet-testing/data/alltypes_plain.parquet]]}, projection=[id, bool_col, tinyint_col, smallint_col, int_col, bigint_col, float_col, double_col, date_string_col, string_col, timestamp_col], limit=10, file_type=parquet, statistics=[Rows=Exact(8), Bytes=Exact(671), [(Col[0]:),(Col[1]:),(Col[2]:),(Col[3]:),(Col[4]:),(Col[5]:),(Col[6]:),(Col[7]:),(Col[8]:),(Col[9]:),(Col[10]:)]]
 physical_plan after aggregate_statistics SAME TEXT AS ABOVE
 physical_plan after join_selection SAME TEXT AS ABOVE
 physical_plan after LimitedDistinctAggregation SAME TEXT AS ABOVE
@@ -320,13 +320,13 @@
 physical_plan after OptimizeAggregateOrder SAME TEXT AS ABOVE
 physical_plan after ProjectionPushdown SAME TEXT AS ABOVE
 physical_plan after coalesce_batches SAME TEXT AS ABOVE
 physical_plan after OutputRequirements
-01)GlobalLimitExec: skip=0, fetch=10, statistics=[Rows=Exact(8), Bytes=Absent, [(Col[0]:),(Col[1]:),(Col[2]:),(Col[3]:),(Col[4]:),(Col[5]:),(Col[6]:),(Col[7]:),(Col[8]:),(Col[9]:),(Col[10]:)]]
-02)--DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/parquet-testing/data/alltypes_plain.parquet]]}, projection=[id, bool_col, tinyint_col, smallint_col, int_col, bigint_col, float_col, double_col, date_string_col, string_col, timestamp_col], limit=10, file_type=parquet, statistics=[Rows=Exact(8), Bytes=Absent, [(Col[0]:),(Col[1]:),(Col[2]:),(Col[3]:),(Col[4]:),(Col[5]:),(Col[6]:),(Col[7]:),(Col[8]:),(Col[9]:),(Col[10]:)]]
+01)GlobalLimitExec: skip=0, fetch=10, statistics=[Rows=Exact(8), Bytes=Exact(671), [(Col[0]:),(Col[1]:),(Col[2]:),(Col[3]:),(Col[4]:),(Col[5]:),(Col[6]:),(Col[7]:),(Col[8]:),(Col[9]:),(Col[10]:)]]
+02)--DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/parquet-testing/data/alltypes_plain.parquet]]}, projection=[id, bool_col, tinyint_col, smallint_col, int_col, bigint_col, float_col, double_col, date_string_col, string_col, timestamp_col], limit=10, file_type=parquet, statistics=[Rows=Exact(8), Bytes=Exact(671), [(Col[0]:),(Col[1]:),(Col[2]:),(Col[3]:),(Col[4]:),(Col[5]:),(Col[6]:),(Col[7]:),(Col[8]:),(Col[9]:),(Col[10]:)]]
 physical_plan after LimitAggregation SAME TEXT AS ABOVE
 physical_plan after ProjectionPushdown SAME TEXT AS ABOVE
-physical_plan after LimitPushdown DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/parquet-testing/data/alltypes_plain.parquet]]}, projection=[id, bool_col, tinyint_col, smallint_col, int_col, bigint_col, float_col, double_col, date_string_col, string_col, timestamp_col], limit=10, file_type=parquet, statistics=[Rows=Exact(8), Bytes=Absent, [(Col[0]:),(Col[1]:),(Col[2]:),(Col[3]:),(Col[4]:),(Col[5]:),(Col[6]:),(Col[7]:),(Col[8]:),(Col[9]:),(Col[10]:)]]
+physical_plan after LimitPushdown DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/parquet-testing/data/alltypes_plain.parquet]]}, projection=[id, bool_col, tinyint_col, smallint_col, int_col, bigint_col, float_col, double_col, date_string_col, string_col, timestamp_col], limit=10, file_type=parquet, statistics=[Rows=Exact(8), Bytes=Exact(671), [(Col[0]:),(Col[1]:),(Col[2]:),(Col[3]:),(Col[4]:),(Col[5]:),(Col[6]:),(Col[7]:),(Col[8]:),(Col[9]:),(Col[10]:)]]
 physical_plan after SanityCheckPlan SAME TEXT AS ABOVE
-physical_plan DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/parquet-testing/data/alltypes_plain.parquet]]}, projection=[id, bool_col, tinyint_col, smallint_col, int_col, bigint_col, float_col, double_col, date_string_col, string_col, timestamp_col], limit=10, file_type=parquet, statistics=[Rows=Exact(8), Bytes=Absent, [(Col[0]:),(Col[1]:),(Col[2]:),(Col[3]:),(Col[4]:),(Col[5]:),(Col[6]:),(Col[7]:),(Col[8]:),(Col[9]:),(Col[10]:)]]
+physical_plan DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/parquet-testing/data/alltypes_plain.parquet]]}, projection=[id, bool_col, tinyint_col, smallint_col, int_col, bigint_col, float_col, double_col, date_string_col, string_col, timestamp_col], limit=10, file_type=parquet, statistics=[Rows=Exact(8), Bytes=Exact(671), [(Col[0]:),(Col[1]:),(Col[2]:),(Col[3]:),(Col[4]:),(Col[5]:),(Col[6]:),(Col[7]:),(Col[8]:),(Col[9]:),(Col[10]:)]]
 physical_plan_with_schema DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/parquet-testing/data/alltypes_plain.parquet]]}, projection=[id, bool_col, tinyint_col, smallint_col, int_col, bigint_col, float_col, double_col, date_string_col, string_col, timestamp_col], limit=10, file_type=parquet, schema=[id:Int32;N, bool_col:Boolean;N, tinyint_col:Int32;N, smallint_col:Int32;N, int_col:Int32;N, bigint_col:Int64;N, float_col:Float32;N, double_col:Float64;N, date_string_col:BinaryView;N, string_col:BinaryView;N, timestamp_col:Timestamp(Nanosecond, None);N]
 
@@ -341,8 +341,8 @@ initial_physical_plan
 01)GlobalLimitExec: skip=0, fetch=10
 02)--DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/parquet-testing/data/alltypes_plain.parquet]]}, projection=[id, bool_col, tinyint_col, smallint_col, int_col, bigint_col, float_col, double_col, date_string_col, string_col, timestamp_col], limit=10, file_type=parquet
 initial_physical_plan_with_stats
-01)GlobalLimitExec: skip=0, fetch=10, statistics=[Rows=Exact(8), Bytes=Absent, [(Col[0]:),(Col[1]:),(Col[2]:),(Col[3]:),(Col[4]:),(Col[5]:),(Col[6]:),(Col[7]:),(Col[8]:),(Col[9]:),(Col[10]:)]]
-02)--DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/parquet-testing/data/alltypes_plain.parquet]]}, projection=[id, bool_col, tinyint_col, smallint_col, int_col, bigint_col, float_col, double_col, date_string_col, string_col, timestamp_col], limit=10, file_type=parquet, statistics=[Rows=Exact(8), Bytes=Absent, [(Col[0]:),(Col[1]:),(Col[2]:),(Col[3]:),(Col[4]:),(Col[5]:),(Col[6]:),(Col[7]:),(Col[8]:),(Col[9]:),(Col[10]:)]]
+01)GlobalLimitExec: skip=0, fetch=10, statistics=[Rows=Exact(8), Bytes=Exact(671), [(Col[0]:),(Col[1]:),(Col[2]:),(Col[3]:),(Col[4]:),(Col[5]:),(Col[6]:),(Col[7]:),(Col[8]:),(Col[9]:),(Col[10]:)]]
+02)--DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/parquet-testing/data/alltypes_plain.parquet]]}, projection=[id, bool_col, tinyint_col, smallint_col, int_col, bigint_col, float_col, double_col, date_string_col, string_col, timestamp_col], limit=10, file_type=parquet, statistics=[Rows=Exact(8), Bytes=Exact(671), [(Col[0]:),(Col[1]:),(Col[2]:),(Col[3]:),(Col[4]:),(Col[5]:),(Col[6]:),(Col[7]:),(Col[8]:),(Col[9]:),(Col[10]:)]]
 initial_physical_plan_with_schema
 01)GlobalLimitExec: skip=0, fetch=10, schema=[id:Int32;N, bool_col:Boolean;N, tinyint_col:Int32;N, smallint_col:Int32;N, int_col:Int32;N, bigint_col:Int64;N, float_col:Float32;N, double_col:Float64;N, date_string_col:BinaryView;N, string_col:BinaryView;N, timestamp_col:Timestamp(Nanosecond, None);N]
 02)--DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/parquet-testing/data/alltypes_plain.parquet]]}, projection=[id, bool_col, tinyint_col, smallint_col, int_col, bigint_col, float_col, double_col, date_string_col, string_col, timestamp_col], limit=10, file_type=parquet, schema=[id:Int32;N, bool_col:Boolean;N, tinyint_col:Int32;N, smallint_col:Int32;N, int_col:Int32;N, bigint_col:Int64;N, float_col:Float32;N, double_col:Float64;N, date_string_col:BinaryView;N, string_col:BinaryView;N, timestamp_col:Timestamp(Nanosecond, None);N]
@@ -367,7 +367,7 @@
 physical_plan after ProjectionPushdown SAME TEXT AS ABOVE
 physical_plan after LimitPushdown DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/parquet-testing/data/alltypes_plain.parquet]]}, projection=[id, bool_col, tinyint_col, smallint_col, int_col, bigint_col, float_col, double_col, date_string_col, string_col, timestamp_col], limit=10, file_type=parquet
 physical_plan after SanityCheckPlan SAME TEXT AS ABOVE
 physical_plan DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/parquet-testing/data/alltypes_plain.parquet]]}, projection=[id, bool_col, tinyint_col, smallint_col, int_col, bigint_col, float_col, double_col, date_string_col, string_col, timestamp_col], limit=10, file_type=parquet
-physical_plan_with_stats DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/parquet-testing/data/alltypes_plain.parquet]]}, projection=[id, bool_col, tinyint_col, smallint_col, int_col, bigint_col, float_col, double_col, date_string_col, string_col, timestamp_col], limit=10, file_type=parquet, statistics=[Rows=Exact(8), Bytes=Absent, [(Col[0]:),(Col[1]:),(Col[2]:),(Col[3]:),(Col[4]:),(Col[5]:),(Col[6]:),(Col[7]:),(Col[8]:),(Col[9]:),(Col[10]:)]]
+physical_plan_with_stats DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/parquet-testing/data/alltypes_plain.parquet]]}, projection=[id, bool_col, tinyint_col, smallint_col, int_col, bigint_col, float_col, double_col, date_string_col, string_col, timestamp_col], limit=10, file_type=parquet, statistics=[Rows=Exact(8), Bytes=Exact(671), [(Col[0]:),(Col[1]:),(Col[2]:),(Col[3]:),(Col[4]:),(Col[5]:),(Col[6]:),(Col[7]:),(Col[8]:),(Col[9]:),(Col[10]:)]]
 physical_plan_with_schema DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/parquet-testing/data/alltypes_plain.parquet]]}, projection=[id, bool_col, tinyint_col, smallint_col, int_col, bigint_col, float_col, double_col, date_string_col, string_col, timestamp_col], limit=10, file_type=parquet, schema=[id:Int32;N, bool_col:Boolean;N, tinyint_col:Int32;N, smallint_col:Int32;N, int_col:Int32;N, bigint_col:Int64;N, float_col:Float32;N, double_col:Float64;N, date_string_col:BinaryView;N, string_col:BinaryView;N, timestamp_col:Timestamp(Nanosecond, None);N]
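
For context, the behavioral core of this patch: `FileScanConfig` now derives the statistics it reports on demand by projecting the file source's unprojected statistics (`projected_stats`), instead of baking projected statistics into the source inside `with_source`. The following is a minimal, standalone Rust sketch of just that mapping logic, using simplified stand-in types (`Stats`, `ColStats`) invented for illustration; it is not the DataFusion `Statistics`/`ColumnStatistics` API.

    // Standalone sketch of the projected-statistics mapping: projected file
    // columns keep their per-column statistics, while partition columns
    // (indices past the file schema) are padded with "unknown" statistics.
    #[derive(Clone, Debug, PartialEq)]
    struct ColStats(Option<u64>); // stand-in: Some(known stat) or None = unknown

    #[derive(Clone, Debug)]
    struct Stats {
        num_rows: Option<u64>,
        column_statistics: Vec<ColStats>,
    }

    // Mirrors the patch's `projection_indices`: the explicit projection if set,
    // otherwise all file columns followed by all partition columns.
    fn projection_indices(projection: Option<&[usize]>, n_file: usize, n_part: usize) -> Vec<usize> {
        match projection {
            Some(p) => p.to_vec(),
            None => (0..n_file + n_part).collect(),
        }
    }

    // Mirrors the patch's `projected_stats` over the stand-in types.
    fn projected_stats(file_stats: &Stats, projection: Option<&[usize]>, n_part: usize) -> Stats {
        let n_file = file_stats.column_statistics.len();
        let column_statistics = projection_indices(projection, n_file, n_part)
            .into_iter()
            .map(|idx| {
                if idx < n_file {
                    // projected file column: reuse the file-level statistics
                    file_stats.column_statistics[idx].clone()
                } else {
                    // partition column: no accurate per-column stats yet
                    ColStats(None)
                }
            })
            .collect();
        Stats { num_rows: file_stats.num_rows, column_statistics }
    }

    fn main() {
        // 3 file columns with known stats; project cols 0 and 2 plus one
        // partition column (index 3).
        let file_stats = Stats { num_rows: Some(3), column_statistics: vec![ColStats(Some(0)); 3] };
        let out = projected_stats(&file_stats, Some(&[0, 2, 3]), 1);
        assert_eq!(out.column_statistics.len(), 3); // 2 file cols + 1 partition col
        assert_eq!(out.column_statistics[2], ColStats(None)); // partition col unknown
        println!("{out:?}");
    }

This mirrors the invariant the new `config_for_projection` test asserts: the file source keeps its original, unprojected statistics, and the projection is applied only when statistics are read back through the config.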