From bf0be0a8f328ca79c92b5f091cf5bd70090933a2 Mon Sep 17 00:00:00 2001 From: Matthew Turner Date: Mon, 20 Sep 2021 16:41:46 -0400 Subject: [PATCH 01/16] Add prints --- .../aggregate_statistics.rs | 23 ++++++++++++++++++- testing | 2 +- 2 files changed, 23 insertions(+), 2 deletions(-) diff --git a/datafusion/src/physical_optimizer/aggregate_statistics.rs b/datafusion/src/physical_optimizer/aggregate_statistics.rs index 1b361dd54936..cb6d17d753dd 100644 --- a/datafusion/src/physical_optimizer/aggregate_statistics.rs +++ b/datafusion/src/physical_optimizer/aggregate_statistics.rs @@ -131,17 +131,38 @@ fn take_optimizable_count( agg_expr: &dyn AggregateExpr, stats: &Statistics, ) -> Option<(ScalarValue, &'static str)> { - if let (Some(num_rows), Some(casted_expr)) = ( + println!("{:?}", agg_expr); + println!("{:?}", agg_expr.expressions()); + if let (Some(num_rows), Some(col_stats), Some(casted_expr)) = ( stats.num_rows, + &stats.column_statistics, agg_expr.as_any().downcast_ref::(), ) { // TODO implementing Eq on PhysicalExpr would help a lot here + println!("{:?}", num_rows); + println!("{:?}", col_stats); + println!("{:?}", casted_expr); if casted_expr.expressions().len() == 1 { if let Some(lit_expr) = casted_expr.expressions()[0] .as_any() .downcast_ref::() { + // if let Some(col_expr) = casted_expr.expressions()[0] + // .as_any() + // .downcast_ref::() + // { + // if let ColumnStatistics { + // null_count: Some(null_count), + // .. + // } = &col_stats[col_expr.index()] + // { + // return Some(( + // ScalarValue::UInt64(Some(num_rows as u64 - *null_count as u64)), + // "COUNT(Uint8(1)", + // )); + // } if lit_expr.value() == &ScalarValue::UInt8(Some(1)) { + println!("{:?}", lit_expr); return Some(( ScalarValue::UInt64(Some(num_rows as u64)), "COUNT(Uint8(1))", diff --git a/testing b/testing index a8f7be380531..b658b087767b 160000 --- a/testing +++ b/testing @@ -1 +1 @@ -Subproject commit a8f7be380531758eb7962542a5eb020d8795aa20 +Subproject commit b658b087767b041b2081766814655b4dd5a9a439 From ca4e1c808c5436f969f4b52783d129baf832ed33 Mon Sep 17 00:00:00 2001 From: Matthew Turner Date: Tue, 28 Sep 2021 11:08:50 -0400 Subject: [PATCH 02/16] First cut at optimizing count with nulls --- .../aggregate_statistics.rs | 59 ++++++++++++------- parquet-testing | 2 +- testing | 2 +- 3 files changed, 40 insertions(+), 23 deletions(-) diff --git a/datafusion/src/physical_optimizer/aggregate_statistics.rs b/datafusion/src/physical_optimizer/aggregate_statistics.rs index cb6d17d753dd..eef4a04d1fbf 100644 --- a/datafusion/src/physical_optimizer/aggregate_statistics.rs +++ b/datafusion/src/physical_optimizer/aggregate_statistics.rs @@ -57,7 +57,13 @@ impl PhysicalOptimizerRule for AggregateStatistics { let stats = partial_agg_exec.input().statistics(); let mut projections = vec![]; for expr in partial_agg_exec.aggr_expr() { - if let Some((num_rows, name)) = take_optimizable_count(&**expr, &stats) { + if let Some((non_null_rows, name)) = + take_optimizable_count_with_nulls(&**expr, &stats) + { + projections.push((expressions::lit(non_null_rows), name.to_owned())); + } else if let Some((num_rows, name)) = + take_optimizable_count(&**expr, &stats) + { projections.push((expressions::lit(num_rows), name.to_owned())); } else if let Some((min, name)) = take_optimizable_min(&**expr, &stats) { projections.push((expressions::lit(min), name.to_owned())); @@ -131,38 +137,18 @@ fn take_optimizable_count( agg_expr: &dyn AggregateExpr, stats: &Statistics, ) -> Option<(ScalarValue, &'static str)> { - println!("{:?}", agg_expr); - println!("{:?}", agg_expr.expressions()); if let (Some(num_rows), Some(col_stats), Some(casted_expr)) = ( stats.num_rows, &stats.column_statistics, agg_expr.as_any().downcast_ref::(), ) { // TODO implementing Eq on PhysicalExpr would help a lot here - println!("{:?}", num_rows); - println!("{:?}", col_stats); - println!("{:?}", casted_expr); if casted_expr.expressions().len() == 1 { if let Some(lit_expr) = casted_expr.expressions()[0] .as_any() .downcast_ref::() { - // if let Some(col_expr) = casted_expr.expressions()[0] - // .as_any() - // .downcast_ref::() - // { - // if let ColumnStatistics { - // null_count: Some(null_count), - // .. - // } = &col_stats[col_expr.index()] - // { - // return Some(( - // ScalarValue::UInt64(Some(num_rows as u64 - *null_count as u64)), - // "COUNT(Uint8(1)", - // )); - // } if lit_expr.value() == &ScalarValue::UInt8(Some(1)) { - println!("{:?}", lit_expr); return Some(( ScalarValue::UInt64(Some(num_rows as u64)), "COUNT(Uint8(1))", @@ -174,6 +160,37 @@ fn take_optimizable_count( None } +fn take_optimizable_count_with_nulls( + agg_expr: &dyn AggregateExpr, + stats: &Statistics, +) -> Option<(ScalarValue, String)> { + if let (Some(num_rows), Some(col_stats), Some(casted_expr)) = ( + stats.num_rows, + &stats.column_statistics, + agg_expr.as_any().downcast_ref::(), + ) { + if casted_expr.expressions().len() == 1 { + // TODO optimize with exprs other than Column + if let Some(col_expr) = casted_expr.expressions()[0] + .as_any() + .downcast_ref::() + { + if let ColumnStatistics { + null_count: Some(val), + .. + } = &col_stats[col_expr.index()] + { + return Some(( + ScalarValue::UInt64(Some((num_rows - val) as u64)), + "COUNT(Uint8(1))".to_string(), + )); + } + } + } + } + None +} + /// If this agg_expr is a min that is defined in the statistics, return it fn take_optimizable_min( agg_expr: &dyn AggregateExpr, diff --git a/parquet-testing b/parquet-testing index ddd898958803..d4d485956a64 160000 --- a/parquet-testing +++ b/parquet-testing @@ -1 +1 @@ -Subproject commit ddd898958803cb89b7156c6350584d1cda0fe8de +Subproject commit d4d485956a643c693b5549e1a62d52ca61c170f1 diff --git a/testing b/testing index b658b087767b..1ec12d1f37ec 160000 --- a/testing +++ b/testing @@ -1 +1 @@ -Subproject commit b658b087767b041b2081766814655b4dd5a9a439 +Subproject commit 1ec12d1f37ecd852287d533569e3dd54d729a89c From 472c80afb03d50d364e0abe6a6089092f851758e Mon Sep 17 00:00:00 2001 From: Matthew Turner Date: Tue, 28 Sep 2021 11:11:47 -0400 Subject: [PATCH 03/16] Cleanup old changes --- datafusion/src/physical_optimizer/aggregate_statistics.rs | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/datafusion/src/physical_optimizer/aggregate_statistics.rs b/datafusion/src/physical_optimizer/aggregate_statistics.rs index eef4a04d1fbf..fa069787e590 100644 --- a/datafusion/src/physical_optimizer/aggregate_statistics.rs +++ b/datafusion/src/physical_optimizer/aggregate_statistics.rs @@ -137,9 +137,8 @@ fn take_optimizable_count( agg_expr: &dyn AggregateExpr, stats: &Statistics, ) -> Option<(ScalarValue, &'static str)> { - if let (Some(num_rows), Some(col_stats), Some(casted_expr)) = ( + if let (Some(num_rows), Some(casted_expr)) = ( stats.num_rows, - &stats.column_statistics, agg_expr.as_any().downcast_ref::(), ) { // TODO implementing Eq on PhysicalExpr would help a lot here From ed7c838c94c2ece6ab4781a6604e43ec802db64e Mon Sep 17 00:00:00 2001 From: Matthew Turner Date: Tue, 28 Sep 2021 11:21:50 -0400 Subject: [PATCH 04/16] Uint to UInt --- datafusion/src/physical_optimizer/aggregate_statistics.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/datafusion/src/physical_optimizer/aggregate_statistics.rs b/datafusion/src/physical_optimizer/aggregate_statistics.rs index fa069787e590..da97296b7f7d 100644 --- a/datafusion/src/physical_optimizer/aggregate_statistics.rs +++ b/datafusion/src/physical_optimizer/aggregate_statistics.rs @@ -181,7 +181,7 @@ fn take_optimizable_count_with_nulls( { return Some(( ScalarValue::UInt64(Some((num_rows - val) as u64)), - "COUNT(Uint8(1))".to_string(), + "COUNT(UInt8(1))".to_string(), )); } } From ad5311c3f3e6b2e29a4416b568c86bcb851d3458 Mon Sep 17 00:00:00 2001 From: Matthew Turner Date: Fri, 1 Oct 2021 11:41:06 -0400 Subject: [PATCH 05/16] COUNT with col name --- datafusion/src/physical_optimizer/aggregate_statistics.rs | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/datafusion/src/physical_optimizer/aggregate_statistics.rs b/datafusion/src/physical_optimizer/aggregate_statistics.rs index da97296b7f7d..8f2eae6bd79e 100644 --- a/datafusion/src/physical_optimizer/aggregate_statistics.rs +++ b/datafusion/src/physical_optimizer/aggregate_statistics.rs @@ -159,6 +159,7 @@ fn take_optimizable_count( None } +/// If this agg_expr is a count that can be derived from the statistics, return it fn take_optimizable_count_with_nulls( agg_expr: &dyn AggregateExpr, stats: &Statistics, @@ -179,9 +180,10 @@ fn take_optimizable_count_with_nulls( .. } = &col_stats[col_expr.index()] { + let expr = format!("COUNT({})", col_expr.name()); return Some(( ScalarValue::UInt64(Some((num_rows - val) as u64)), - "COUNT(UInt8(1))".to_string(), + expr, )); } } From 3e28a0f9b3e214bfbb15dcf51aa2ac5e967dc2fc Mon Sep 17 00:00:00 2001 From: Matthew Turner Date: Tue, 12 Oct 2021 14:37:08 -0400 Subject: [PATCH 06/16] Fix count_with_nulls and submodules --- datafusion/src/physical_optimizer/aggregate_statistics.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/datafusion/src/physical_optimizer/aggregate_statistics.rs b/datafusion/src/physical_optimizer/aggregate_statistics.rs index 8f2eae6bd79e..ef4f33ece9bd 100644 --- a/datafusion/src/physical_optimizer/aggregate_statistics.rs +++ b/datafusion/src/physical_optimizer/aggregate_statistics.rs @@ -167,7 +167,7 @@ fn take_optimizable_count_with_nulls( if let (Some(num_rows), Some(col_stats), Some(casted_expr)) = ( stats.num_rows, &stats.column_statistics, - agg_expr.as_any().downcast_ref::(), + agg_expr.as_any().downcast_ref::(), ) { if casted_expr.expressions().len() == 1 { // TODO optimize with exprs other than Column From 597338b7dbfc2d74561518bbb0c474edf22e0685 Mon Sep 17 00:00:00 2001 From: Matthew Turner Date: Tue, 12 Oct 2021 14:47:06 -0400 Subject: [PATCH 07/16] Fix submodules --- parquet-testing | 2 +- testing | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/parquet-testing b/parquet-testing index d4d485956a64..ddd898958803 160000 --- a/parquet-testing +++ b/parquet-testing @@ -1 +1 @@ -Subproject commit d4d485956a643c693b5549e1a62d52ca61c170f1 +Subproject commit ddd898958803cb89b7156c6350584d1cda0fe8de diff --git a/testing b/testing index 1ec12d1f37ec..b658b087767b 160000 --- a/testing +++ b/testing @@ -1 +1 @@ -Subproject commit 1ec12d1f37ecd852287d533569e3dd54d729a89c +Subproject commit b658b087767b041b2081766814655b4dd5a9a439 From fb683c6287362cc39864f29a84d763c5335daa62 Mon Sep 17 00:00:00 2001 From: Matthew Turner Date: Tue, 12 Oct 2021 14:50:02 -0400 Subject: [PATCH 08/16] Fix testing submodule --- testing | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/testing b/testing index b658b087767b..a8f7be380531 160000 --- a/testing +++ b/testing @@ -1 +1 @@ -Subproject commit b658b087767b041b2081766814655b4dd5a9a439 +Subproject commit a8f7be380531758eb7962542a5eb020d8795aa20 From 6dd40ea95af7e918a712191104d7d845941e37ba Mon Sep 17 00:00:00 2001 From: Matthew Turner Date: Wed, 13 Oct 2021 10:28:19 -0400 Subject: [PATCH 09/16] Add tests --- .../aggregate_statistics.rs | 148 ++++++++++++++++++ 1 file changed, 148 insertions(+) diff --git a/datafusion/src/physical_optimizer/aggregate_statistics.rs b/datafusion/src/physical_optimizer/aggregate_statistics.rs index ef4f33ece9bd..c3e74f23d1b0 100644 --- a/datafusion/src/physical_optimizer/aggregate_statistics.rs +++ b/datafusion/src/physical_optimizer/aggregate_statistics.rs @@ -288,6 +288,27 @@ mod tests { )?)) } + fn mock_data_with_nulls() -> Result> { + let schema = Arc::new(Schema::new(vec![ + Field::new("a", DataType::Int32, false), + Field::new("b", DataType::Int32, false), + ])); + + let batch = RecordBatch::try_new( + Arc::clone(&schema), + vec![ + Arc::new(Int32Array::from(vec![Some(1), Some(2), None, Some(3)])), + Arc::new(Int32Array::from(vec![Some(4), None, Some(5), Some(6)])), + ], + )?; + + Ok(Arc::new(MemoryExec::try_new( + &[vec![batch]], + Arc::clone(&schema), + None, + )?)) + } + /// Checks that the count optimization was applied and we still get the right result async fn assert_count_optim_success(plan: HashAggregateExec) -> Result<()> { let conf = ExecutionConfig::new(); @@ -315,6 +336,35 @@ mod tests { Ok(()) } + /// Checks that the count optimization was applied and we still get the right result + async fn assert_count_with_nulls_optim_success( + plan: HashAggregateExec, + ) -> Result<()> { + let conf = ExecutionConfig::new(); + let optimized = AggregateStatistics::new().optimize(Arc::new(plan), &conf)?; + + assert!(optimized.as_any().is::()); + let result = common::collect(optimized.execute(0).await?).await?; + assert_eq!( + result[0].schema(), + Arc::new(Schema::new(vec![Field::new( + "COUNT(Uint8(1))", + DataType::UInt64, + false + )])) + ); + assert_eq!( + result[0] + .column(0) + .as_any() + .downcast_ref::() + .unwrap() + .values(), + &[4] + ); + Ok(()) + } + fn count_expr() -> Arc { Arc::new(Count::new( expressions::lit(ScalarValue::UInt8(Some(1))), @@ -350,6 +400,33 @@ mod tests { Ok(()) } + #[tokio::test] + async fn test_count_partial_with_nulls_direct_child() -> Result<()> { + // basic test case with the aggregation applied on a source with exact statistics + let source = mock_data_with_nulls()?; + let schema = source.schema(); + + let partial_agg = HashAggregateExec::try_new( + AggregateMode::Partial, + vec![], + vec![count_expr()], + source, + Arc::clone(&schema), + )?; + + let final_agg = HashAggregateExec::try_new( + AggregateMode::Final, + vec![], + vec![count_expr()], + Arc::new(partial_agg), + Arc::clone(&schema), + )?; + + assert_count_with_nulls_optim_success(final_agg).await?; + + Ok(()) + } + #[tokio::test] async fn test_count_partial_indirect_child() -> Result<()> { let source = mock_data()?; @@ -379,6 +456,35 @@ mod tests { Ok(()) } + #[tokio::test] + async fn test_count_partial_with_nulls_indirect_child() -> Result<()> { + let source = mock_data_with_nulls()?; + let schema = source.schema(); + + let partial_agg = HashAggregateExec::try_new( + AggregateMode::Partial, + vec![], + vec![count_expr()], + source, + Arc::clone(&schema), + )?; + + // We introduce an intermediate optimization step between the partial and final aggregtator + let coalesce = CoalescePartitionsExec::new(Arc::new(partial_agg)); + + let final_agg = HashAggregateExec::try_new( + AggregateMode::Final, + vec![], + vec![count_expr()], + Arc::new(coalesce), + Arc::clone(&schema), + )?; + + assert_count_with_nulls_optim_success(final_agg).await?; + + Ok(()) + } + #[tokio::test] async fn test_count_inexact_stat() -> Result<()> { let source = mock_data()?; @@ -420,4 +526,46 @@ mod tests { Ok(()) } + + #[tokio::test] + async fn test_count_with_nulls_inexact_stat() -> Result<()> { + let source = mock_data_with_nulls()?; + let schema = source.schema(); + + // adding a filter makes the statistics inexact + let filter = Arc::new(FilterExec::try_new( + expressions::binary( + expressions::col("a", &schema)?, + Operator::Gt, + expressions::lit(ScalarValue::from(1u32)), + &schema, + )?, + source, + )?); + + let partial_agg = HashAggregateExec::try_new( + AggregateMode::Partial, + vec![], + vec![count_expr()], + filter, + Arc::clone(&schema), + )?; + + let final_agg = HashAggregateExec::try_new( + AggregateMode::Final, + vec![], + vec![count_expr()], + Arc::new(partial_agg), + Arc::clone(&schema), + )?; + + let conf = ExecutionConfig::new(); + let optimized = + AggregateStatistics::new().optimize(Arc::new(final_agg), &conf)?; + + // check that the original ExecutionPlan was not replaced + assert!(optimized.as_any().is::()); + + Ok(()) + } } From df032a2a15727dd82348f0dad8d5676de5d0ed9e Mon Sep 17 00:00:00 2001 From: Matthew Turner Date: Thu, 14 Oct 2021 10:02:49 -0400 Subject: [PATCH 10/16] Add comment on test and removed redundant test function --- .../aggregate_statistics.rs | 38 +++---------------- 1 file changed, 5 insertions(+), 33 deletions(-) diff --git a/datafusion/src/physical_optimizer/aggregate_statistics.rs b/datafusion/src/physical_optimizer/aggregate_statistics.rs index c3e74f23d1b0..0a2226720695 100644 --- a/datafusion/src/physical_optimizer/aggregate_statistics.rs +++ b/datafusion/src/physical_optimizer/aggregate_statistics.rs @@ -297,8 +297,8 @@ mod tests { let batch = RecordBatch::try_new( Arc::clone(&schema), vec![ - Arc::new(Int32Array::from(vec![Some(1), Some(2), None, Some(3)])), - Arc::new(Int32Array::from(vec![Some(4), None, Some(5), Some(6)])), + Arc::new(Int32Array::from(vec![Some(1), Some(2), None])), + Arc::new(Int32Array::from(vec![Some(4), None, Some(5)])), ], )?; @@ -314,6 +314,7 @@ mod tests { let conf = ExecutionConfig::new(); let optimized = AggregateStatistics::new().optimize(Arc::new(plan), &conf)?; + // A ProjectionExec is a sign that the count optimization was applied assert!(optimized.as_any().is::()); let result = common::collect(optimized.execute(0).await?).await?; assert_eq!( @@ -336,35 +337,6 @@ mod tests { Ok(()) } - /// Checks that the count optimization was applied and we still get the right result - async fn assert_count_with_nulls_optim_success( - plan: HashAggregateExec, - ) -> Result<()> { - let conf = ExecutionConfig::new(); - let optimized = AggregateStatistics::new().optimize(Arc::new(plan), &conf)?; - - assert!(optimized.as_any().is::()); - let result = common::collect(optimized.execute(0).await?).await?; - assert_eq!( - result[0].schema(), - Arc::new(Schema::new(vec![Field::new( - "COUNT(Uint8(1))", - DataType::UInt64, - false - )])) - ); - assert_eq!( - result[0] - .column(0) - .as_any() - .downcast_ref::() - .unwrap() - .values(), - &[4] - ); - Ok(()) - } - fn count_expr() -> Arc { Arc::new(Count::new( expressions::lit(ScalarValue::UInt8(Some(1))), @@ -422,7 +394,7 @@ mod tests { Arc::clone(&schema), )?; - assert_count_with_nulls_optim_success(final_agg).await?; + assert_count_optim_success(final_agg).await?; Ok(()) } @@ -480,7 +452,7 @@ mod tests { Arc::clone(&schema), )?; - assert_count_with_nulls_optim_success(final_agg).await?; + assert_count_optim_success(final_agg).await?; Ok(()) } From c73d6bc673cdc34b4b5cf84bd44ae3b3f39a4aaa Mon Sep 17 00:00:00 2001 From: Matthew Turner Date: Fri, 22 Oct 2021 11:47:25 -0400 Subject: [PATCH 11/16] Update testing for count nulls --- .../aggregate_statistics.rs | 54 ++++++++++++------- 1 file changed, 36 insertions(+), 18 deletions(-) diff --git a/datafusion/src/physical_optimizer/aggregate_statistics.rs b/datafusion/src/physical_optimizer/aggregate_statistics.rs index 0a2226720695..d1a1a8e690ed 100644 --- a/datafusion/src/physical_optimizer/aggregate_statistics.rs +++ b/datafusion/src/physical_optimizer/aggregate_statistics.rs @@ -60,10 +60,12 @@ impl PhysicalOptimizerRule for AggregateStatistics { if let Some((non_null_rows, name)) = take_optimizable_count_with_nulls(&**expr, &stats) { + println!("Using count with nulls opt"); projections.push((expressions::lit(non_null_rows), name.to_owned())); } else if let Some((num_rows, name)) = take_optimizable_count(&**expr, &stats) { + println!("Using count opt"); projections.push((expressions::lit(num_rows), name.to_owned())); } else if let Some((min, name)) = take_optimizable_min(&**expr, &stats) { projections.push((expressions::lit(min), name.to_owned())); @@ -169,6 +171,10 @@ fn take_optimizable_count_with_nulls( &stats.column_statistics, agg_expr.as_any().downcast_ref::(), ) { + println!( + "Num rows: {}\nCol Stats: {:?}\nExpr: {:?}", + num_rows, col_stats, casted_expr + ); if casted_expr.expressions().len() == 1 { // TODO optimize with exprs other than Column if let Some(col_expr) = casted_expr.expressions()[0] @@ -310,21 +316,23 @@ mod tests { } /// Checks that the count optimization was applied and we still get the right result - async fn assert_count_optim_success(plan: HashAggregateExec) -> Result<()> { + async fn assert_count_optim_success( + plan: HashAggregateExec, + nulls: bool, + ) -> Result<()> { let conf = ExecutionConfig::new(); let optimized = AggregateStatistics::new().optimize(Arc::new(plan), &conf)?; + println!("{:?}", optimized); + + let (col, count) = match nulls { + false => (Field::new("COUNT(Uint8(1))", DataType::UInt64, false), 3), + true => (Field::new("COUNT(a)", DataType::UInt64, false), 2), + }; // A ProjectionExec is a sign that the count optimization was applied assert!(optimized.as_any().is::()); let result = common::collect(optimized.execute(0).await?).await?; - assert_eq!( - result[0].schema(), - Arc::new(Schema::new(vec![Field::new( - "COUNT(Uint8(1))", - DataType::UInt64, - false - )])) - ); + assert_eq!(result[0].schema(), Arc::new(Schema::new(vec![col]))); assert_eq!( result[0] .column(0) @@ -332,7 +340,7 @@ mod tests { .downcast_ref::() .unwrap() .values(), - &[3] + &[count] ); Ok(()) } @@ -345,6 +353,14 @@ mod tests { )) } + fn count_expr_with_nulls(schema: &Schema) -> Arc { + Arc::new(Count::new( + expressions::col("a", schema).unwrap(), + "my_count_alias", + DataType::UInt64, + )) + } + #[tokio::test] async fn test_count_partial_direct_child() -> Result<()> { // basic test case with the aggregation applied on a source with exact statistics @@ -367,7 +383,7 @@ mod tests { Arc::clone(&schema), )?; - assert_count_optim_success(final_agg).await?; + assert_count_optim_success(final_agg, false).await?; Ok(()) } @@ -381,20 +397,22 @@ mod tests { let partial_agg = HashAggregateExec::try_new( AggregateMode::Partial, vec![], - vec![count_expr()], + vec![count_expr_with_nulls(&schema)], source, Arc::clone(&schema), )?; + // println!("{:?}", partial_agg); let final_agg = HashAggregateExec::try_new( AggregateMode::Final, vec![], - vec![count_expr()], + vec![count_expr_with_nulls(&schema)], Arc::new(partial_agg), Arc::clone(&schema), )?; + // println!("{:?}", final_agg); - assert_count_optim_success(final_agg).await?; + assert_count_optim_success(final_agg, true).await?; Ok(()) } @@ -423,7 +441,7 @@ mod tests { Arc::clone(&schema), )?; - assert_count_optim_success(final_agg).await?; + assert_count_optim_success(final_agg, false).await?; Ok(()) } @@ -436,7 +454,7 @@ mod tests { let partial_agg = HashAggregateExec::try_new( AggregateMode::Partial, vec![], - vec![count_expr()], + vec![count_expr_with_nulls(&schema)], source, Arc::clone(&schema), )?; @@ -447,12 +465,12 @@ mod tests { let final_agg = HashAggregateExec::try_new( AggregateMode::Final, vec![], - vec![count_expr()], + vec![count_expr_with_nulls(&schema)], Arc::new(coalesce), Arc::clone(&schema), )?; - assert_count_optim_success(final_agg).await?; + assert_count_optim_success(final_agg, true).await?; Ok(()) } From 62e0eeb203e9c95fcb563acc19ccb61a12450179 Mon Sep 17 00:00:00 2001 From: Matthew Turner Date: Fri, 22 Oct 2021 11:58:48 -0400 Subject: [PATCH 12/16] Update inexact stats test --- datafusion/src/physical_optimizer/aggregate_statistics.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/datafusion/src/physical_optimizer/aggregate_statistics.rs b/datafusion/src/physical_optimizer/aggregate_statistics.rs index d1a1a8e690ed..dbb42e2918d5 100644 --- a/datafusion/src/physical_optimizer/aggregate_statistics.rs +++ b/datafusion/src/physical_optimizer/aggregate_statistics.rs @@ -536,7 +536,7 @@ mod tests { let partial_agg = HashAggregateExec::try_new( AggregateMode::Partial, vec![], - vec![count_expr()], + vec![count_expr_with_nulls(&schema)], filter, Arc::clone(&schema), )?; @@ -544,7 +544,7 @@ mod tests { let final_agg = HashAggregateExec::try_new( AggregateMode::Final, vec![], - vec![count_expr()], + vec![count_expr_with_nulls(&schema)], Arc::new(partial_agg), Arc::clone(&schema), )?; From a47e8125d85d09a906f5c8d87902e09cd9c64bd2 Mon Sep 17 00:00:00 2001 From: Matthew Turner Date: Mon, 25 Oct 2021 14:24:26 -0400 Subject: [PATCH 13/16] Cleanup test utils --- .../aggregate_statistics.rs | 45 ++++++++----------- 1 file changed, 19 insertions(+), 26 deletions(-) diff --git a/datafusion/src/physical_optimizer/aggregate_statistics.rs b/datafusion/src/physical_optimizer/aggregate_statistics.rs index dbb42e2918d5..81c4b4608df1 100644 --- a/datafusion/src/physical_optimizer/aggregate_statistics.rs +++ b/datafusion/src/physical_optimizer/aggregate_statistics.rs @@ -345,20 +345,13 @@ mod tests { Ok(()) } - fn count_expr() -> Arc { - Arc::new(Count::new( - expressions::lit(ScalarValue::UInt8(Some(1))), - "my_count_alias", - DataType::UInt64, - )) - } - - fn count_expr_with_nulls(schema: &Schema) -> Arc { - Arc::new(Count::new( - expressions::col("a", schema).unwrap(), - "my_count_alias", - DataType::UInt64, - )) + fn count_expr(schema: Option<&Schema>, col: Option<&str>) -> Arc { + // Return appropriate expr depending if COUNT is for col or table + let expr = match schema { + None => expressions::lit(ScalarValue::UInt8(Some(1))), + Some(s) => expressions::col(col.unwrap(), s).unwrap(), + }; + Arc::new(Count::new(expr, "my_count_alias", DataType::UInt64)) } #[tokio::test] @@ -370,7 +363,7 @@ mod tests { let partial_agg = HashAggregateExec::try_new( AggregateMode::Partial, vec![], - vec![count_expr()], + vec![count_expr(None, None)], source, Arc::clone(&schema), )?; @@ -378,7 +371,7 @@ mod tests { let final_agg = HashAggregateExec::try_new( AggregateMode::Final, vec![], - vec![count_expr()], + vec![count_expr(None, None)], Arc::new(partial_agg), Arc::clone(&schema), )?; @@ -397,7 +390,7 @@ mod tests { let partial_agg = HashAggregateExec::try_new( AggregateMode::Partial, vec![], - vec![count_expr_with_nulls(&schema)], + vec![count_expr(Some(&schema), Some("a"))], source, Arc::clone(&schema), )?; @@ -406,7 +399,7 @@ mod tests { let final_agg = HashAggregateExec::try_new( AggregateMode::Final, vec![], - vec![count_expr_with_nulls(&schema)], + vec![count_expr(Some(&schema), Some("a"))], Arc::new(partial_agg), Arc::clone(&schema), )?; @@ -425,7 +418,7 @@ mod tests { let partial_agg = HashAggregateExec::try_new( AggregateMode::Partial, vec![], - vec![count_expr()], + vec![count_expr(None, None)], source, Arc::clone(&schema), )?; @@ -436,7 +429,7 @@ mod tests { let final_agg = HashAggregateExec::try_new( AggregateMode::Final, vec![], - vec![count_expr()], + vec![count_expr(None, None)], Arc::new(coalesce), Arc::clone(&schema), )?; @@ -454,7 +447,7 @@ mod tests { let partial_agg = HashAggregateExec::try_new( AggregateMode::Partial, vec![], - vec![count_expr_with_nulls(&schema)], + vec![count_expr(Some(&schema), Some("a"))], source, Arc::clone(&schema), )?; @@ -465,7 +458,7 @@ mod tests { let final_agg = HashAggregateExec::try_new( AggregateMode::Final, vec![], - vec![count_expr_with_nulls(&schema)], + vec![count_expr(Some(&schema), Some("a"))], Arc::new(coalesce), Arc::clone(&schema), )?; @@ -494,7 +487,7 @@ mod tests { let partial_agg = HashAggregateExec::try_new( AggregateMode::Partial, vec![], - vec![count_expr()], + vec![count_expr(None, None)], filter, Arc::clone(&schema), )?; @@ -502,7 +495,7 @@ mod tests { let final_agg = HashAggregateExec::try_new( AggregateMode::Final, vec![], - vec![count_expr()], + vec![count_expr(None, None)], Arc::new(partial_agg), Arc::clone(&schema), )?; @@ -536,7 +529,7 @@ mod tests { let partial_agg = HashAggregateExec::try_new( AggregateMode::Partial, vec![], - vec![count_expr_with_nulls(&schema)], + vec![count_expr(Some(&schema), Some("a"))], filter, Arc::clone(&schema), )?; @@ -544,7 +537,7 @@ mod tests { let final_agg = HashAggregateExec::try_new( AggregateMode::Final, vec![], - vec![count_expr_with_nulls(&schema)], + vec![count_expr(Some(&schema), Some("a"))], Arc::new(partial_agg), Arc::clone(&schema), )?; From 42c01d17dc331928c2728cfb8838bd6aad4353b9 Mon Sep 17 00:00:00 2001 From: Matthew Turner Date: Mon, 25 Oct 2021 15:17:43 -0400 Subject: [PATCH 14/16] Cleanup mock_data and prints --- .../aggregate_statistics.rs | 38 ++----------------- 1 file changed, 4 insertions(+), 34 deletions(-) diff --git a/datafusion/src/physical_optimizer/aggregate_statistics.rs b/datafusion/src/physical_optimizer/aggregate_statistics.rs index 81c4b4608df1..9222c7493e47 100644 --- a/datafusion/src/physical_optimizer/aggregate_statistics.rs +++ b/datafusion/src/physical_optimizer/aggregate_statistics.rs @@ -60,12 +60,10 @@ impl PhysicalOptimizerRule for AggregateStatistics { if let Some((non_null_rows, name)) = take_optimizable_count_with_nulls(&**expr, &stats) { - println!("Using count with nulls opt"); projections.push((expressions::lit(non_null_rows), name.to_owned())); } else if let Some((num_rows, name)) = take_optimizable_count(&**expr, &stats) { - println!("Using count opt"); projections.push((expressions::lit(num_rows), name.to_owned())); } else if let Some((min, name)) = take_optimizable_min(&**expr, &stats) { projections.push((expressions::lit(min), name.to_owned())); @@ -171,10 +169,6 @@ fn take_optimizable_count_with_nulls( &stats.column_statistics, agg_expr.as_any().downcast_ref::(), ) { - println!( - "Num rows: {}\nCol Stats: {:?}\nExpr: {:?}", - num_rows, col_stats, casted_expr - ); if casted_expr.expressions().len() == 1 { // TODO optimize with exprs other than Column if let Some(col_expr) = casted_expr.expressions()[0] @@ -279,32 +273,11 @@ mod tests { Field::new("b", DataType::Int32, false), ])); - let batch = RecordBatch::try_new( - Arc::clone(&schema), - vec![ - Arc::new(Int32Array::from(vec![1, 2, 3])), - Arc::new(Int32Array::from(vec![4, 5, 6])), - ], - )?; - - Ok(Arc::new(MemoryExec::try_new( - &[vec![batch]], - Arc::clone(&schema), - None, - )?)) - } - - fn mock_data_with_nulls() -> Result> { - let schema = Arc::new(Schema::new(vec![ - Field::new("a", DataType::Int32, false), - Field::new("b", DataType::Int32, false), - ])); - let batch = RecordBatch::try_new( Arc::clone(&schema), vec![ Arc::new(Int32Array::from(vec![Some(1), Some(2), None])), - Arc::new(Int32Array::from(vec![Some(4), None, Some(5)])), + Arc::new(Int32Array::from(vec![Some(4), None, Some(6)])), ], )?; @@ -322,7 +295,6 @@ mod tests { ) -> Result<()> { let conf = ExecutionConfig::new(); let optimized = AggregateStatistics::new().optimize(Arc::new(plan), &conf)?; - println!("{:?}", optimized); let (col, count) = match nulls { false => (Field::new("COUNT(Uint8(1))", DataType::UInt64, false), 3), @@ -384,7 +356,7 @@ mod tests { #[tokio::test] async fn test_count_partial_with_nulls_direct_child() -> Result<()> { // basic test case with the aggregation applied on a source with exact statistics - let source = mock_data_with_nulls()?; + let source = mock_data()?; let schema = source.schema(); let partial_agg = HashAggregateExec::try_new( @@ -395,7 +367,6 @@ mod tests { Arc::clone(&schema), )?; - // println!("{:?}", partial_agg); let final_agg = HashAggregateExec::try_new( AggregateMode::Final, vec![], @@ -403,7 +374,6 @@ mod tests { Arc::new(partial_agg), Arc::clone(&schema), )?; - // println!("{:?}", final_agg); assert_count_optim_success(final_agg, true).await?; @@ -441,7 +411,7 @@ mod tests { #[tokio::test] async fn test_count_partial_with_nulls_indirect_child() -> Result<()> { - let source = mock_data_with_nulls()?; + let source = mock_data()?; let schema = source.schema(); let partial_agg = HashAggregateExec::try_new( @@ -512,7 +482,7 @@ mod tests { #[tokio::test] async fn test_count_with_nulls_inexact_stat() -> Result<()> { - let source = mock_data_with_nulls()?; + let source = mock_data()?; let schema = source.schema(); // adding a filter makes the statistics inexact From 1c65b6a03ac1bb8fe7309105b92dcbbfe7842110 Mon Sep 17 00:00:00 2001 From: Matthew Turner Date: Mon, 15 Nov 2021 11:02:57 -0500 Subject: [PATCH 15/16] Update count optimization function names --- datafusion/src/physical_optimizer/aggregate_statistics.rs | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/datafusion/src/physical_optimizer/aggregate_statistics.rs b/datafusion/src/physical_optimizer/aggregate_statistics.rs index 9222c7493e47..7a5345933d0c 100644 --- a/datafusion/src/physical_optimizer/aggregate_statistics.rs +++ b/datafusion/src/physical_optimizer/aggregate_statistics.rs @@ -58,11 +58,11 @@ impl PhysicalOptimizerRule for AggregateStatistics { let mut projections = vec![]; for expr in partial_agg_exec.aggr_expr() { if let Some((non_null_rows, name)) = - take_optimizable_count_with_nulls(&**expr, &stats) + take_optimizable_column_count(&**expr, &stats) { projections.push((expressions::lit(non_null_rows), name.to_owned())); } else if let Some((num_rows, name)) = - take_optimizable_count(&**expr, &stats) + take_optimizable_table_count(&**expr, &stats) { projections.push((expressions::lit(num_rows), name.to_owned())); } else if let Some((min, name)) = take_optimizable_min(&**expr, &stats) { @@ -133,7 +133,7 @@ fn take_optimizable(node: &dyn ExecutionPlan) -> Option> } /// If this agg_expr is a count that is defined in the statistics, return it -fn take_optimizable_count( +fn take_optimizable_table_count( agg_expr: &dyn AggregateExpr, stats: &Statistics, ) -> Option<(ScalarValue, &'static str)> { @@ -160,7 +160,7 @@ fn take_optimizable_count( } /// If this agg_expr is a count that can be derived from the statistics, return it -fn take_optimizable_count_with_nulls( +fn take_optimizable_column_count( agg_expr: &dyn AggregateExpr, stats: &Statistics, ) -> Option<(ScalarValue, String)> { From 5f8c9fa212cb6cc61dce40583fe5e85f8b695da4 Mon Sep 17 00:00:00 2001 From: Matthew Turner Date: Tue, 16 Nov 2021 11:47:49 -0500 Subject: [PATCH 16/16] Update column name --- datafusion/src/physical_optimizer/aggregate_statistics.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/datafusion/src/physical_optimizer/aggregate_statistics.rs b/datafusion/src/physical_optimizer/aggregate_statistics.rs index 7a5345933d0c..2732777de7da 100644 --- a/datafusion/src/physical_optimizer/aggregate_statistics.rs +++ b/datafusion/src/physical_optimizer/aggregate_statistics.rs @@ -150,7 +150,7 @@ fn take_optimizable_table_count( if lit_expr.value() == &ScalarValue::UInt8(Some(1)) { return Some(( ScalarValue::UInt64(Some(num_rows as u64)), - "COUNT(Uint8(1))", + "COUNT(UInt8(1))", )); } } @@ -297,7 +297,7 @@ mod tests { let optimized = AggregateStatistics::new().optimize(Arc::new(plan), &conf)?; let (col, count) = match nulls { - false => (Field::new("COUNT(Uint8(1))", DataType::UInt64, false), 3), + false => (Field::new("COUNT(UInt8(1))", DataType::UInt64, false), 3), true => (Field::new("COUNT(a)", DataType::UInt64, false), 2), };