From c1c6fb19f13caf5b265f8aed577a8c0c9e8c0eb8 Mon Sep 17 00:00:00 2001
From: SteNicholas
Date: Thu, 30 Jan 2025 17:44:07 +0800
Subject: [PATCH] [BLAZE-808] Support statistics of ExecutionPlan for WindowExec

---
Notes (ignored by `git am`):

* WindowExec::statistics() replaces the `todo!()` stub. It forwards the input's
  `num_rows`, keeps the input's per-column statistics, appends one unknown
  `ColumnStatistics` per window expression, and reports `total_byte_size` as
  `Precision::Absent`.
* The test assertions compare `num_rows` against the total number of rows in
  the collected output batches. The earlier revision compared it against the
  number of output columns (window exprs + input fields), which only passed
  because the fixtures happen to have 7 rows and 7 output columns.
* The post-image blob id on the `index` line predates the assertion change.

 .../datafusion-ext-plans/src/window_exec.rs   | 121 +++++++++++-------
 1 file changed, 73 insertions(+), 48 deletions(-)

diff --git a/native-engine/datafusion-ext-plans/src/window_exec.rs b/native-engine/datafusion-ext-plans/src/window_exec.rs
index 5af7dcd8..b28594c8 100644
--- a/native-engine/datafusion-ext-plans/src/window_exec.rs
+++ b/native-engine/datafusion-ext-plans/src/window_exec.rs
@@ -20,7 +20,7 @@ use arrow::{
     record_batch::{RecordBatch, RecordBatchOptions},
 };
 use datafusion::{
-    common::{Result, Statistics},
+    common::{stats::Precision, ColumnStatistics, Result, Statistics},
     execution::context::TaskContext,
     physical_expr::{EquivalenceProperties, PhysicalSortExpr},
     physical_plan::{
@@ -131,7 +131,19 @@ impl ExecutionPlan for WindowExec {
     }
 
     fn statistics(&self) -> Result<Statistics> {
-        todo!()
+        let input_stat = self.input.statistics()?;
+        let win_cols = self.context.window_exprs.len();
+        let input_cols = self.input.schema().fields().len();
+        let mut column_statistics = Vec::with_capacity(win_cols + input_cols);
+        column_statistics.extend(input_stat.column_statistics);
+        for _ in 0..win_cols {
+            column_statistics.push(ColumnStatistics::new_unknown())
+        }
+        Ok(Statistics {
+            num_rows: input_stat.num_rows,
+            column_statistics,
+            total_byte_size: Precision::Absent,
+        })
     }
 }
 
@@ -192,6 +204,7 @@ mod test {
     use arrow::{array::*, datatypes::*, record_batch::RecordBatch};
     use datafusion::{
         assert_batches_eq,
+        common::stats::Precision,
         physical_expr::{expressions::Column, PhysicalSortExpr},
         physical_plan::{memory::MemoryExec, ExecutionPlan},
         prelude::SessionContext,
@@ -246,30 +259,31 @@ mod test {
             ("b1", &vec![1, 2, 2, 3, 4, 1, 1]),
             ("c1", &vec![0, 0, 0, 0, 0, 0, 0]),
         );
+        let window_exprs = vec![
+            WindowExpr::new(
+                WindowFunction::RankLike(WindowRankType::RowNumber),
+                vec![],
+                Arc::new(Field::new("b1_row_number", DataType::Int32, false)),
+            ),
+            WindowExpr::new(
+                WindowFunction::RankLike(WindowRankType::Rank),
+                vec![],
+                Arc::new(Field::new("b1_rank", DataType::Int32, false)),
+            ),
+            WindowExpr::new(
+                WindowFunction::RankLike(WindowRankType::DenseRank),
+                vec![],
+                Arc::new(Field::new("b1_dense_rank", DataType::Int32, false)),
+            ),
+            WindowExpr::new(
+                WindowFunction::Agg(AggFunction::Sum),
+                vec![Arc::new(Column::new("b1", 1))],
+                Arc::new(Field::new("b1_sum", DataType::Int64, false)),
+            ),
+        ];
         let window = Arc::new(WindowExec::try_new(
-            input,
-            vec![
-                WindowExpr::new(
-                    WindowFunction::RankLike(WindowRankType::RowNumber),
-                    vec![],
-                    Arc::new(Field::new("b1_row_number", DataType::Int32, false)),
-                ),
-                WindowExpr::new(
-                    WindowFunction::RankLike(WindowRankType::Rank),
-                    vec![],
-                    Arc::new(Field::new("b1_rank", DataType::Int32, false)),
-                ),
-                WindowExpr::new(
-                    WindowFunction::RankLike(WindowRankType::DenseRank),
-                    vec![],
-                    Arc::new(Field::new("b1_dense_rank", DataType::Int32, false)),
-                ),
-                WindowExpr::new(
-                    WindowFunction::Agg(AggFunction::Sum),
-                    vec![Arc::new(Column::new("b1", 1))],
-                    Arc::new(Field::new("b1_sum", DataType::Int64, false)),
-                ),
-            ],
+            input.clone(),
+            window_exprs.clone(),
             vec![Arc::new(Column::new("a1", 0))],
             vec![PhysicalSortExpr {
                 expr: Arc::new(Column::new("b1", 1)),
@@ -278,6 +292,7 @@ mod test {
         )?);
         let stream = window.execute(0, task_ctx.clone())?;
         let batches = datafusion::physical_plan::common::collect(stream).await?;
+        let row_count = window.statistics()?.num_rows;
         let expected = vec![
             "+----+----+----+---------------+---------+---------------+--------+",
             "| a1 | b1 | c1 | b1_row_number | b1_rank | b1_dense_rank | b1_sum |",
@@ -292,6 +307,10 @@ mod test {
             "+----+----+----+---------------+---------+---------------+--------+",
         ];
         assert_batches_eq!(expected, &batches);
+        // num_rows is inherited from the input, so it must equal the number of
+        // rows produced by the window, not the number of output columns
+        let total_rows: usize = batches.iter().map(|b| b.num_rows()).sum();
+        assert_eq!(row_count, Precision::Exact(total_rows));
 
         // test window without partition by clause
         let input = build_table(
@@ -299,30 +318,31 @@ mod test {
             ("b1", &vec![1, 1, 1, 2, 2, 3, 4]),
             ("c1", &vec![0, 0, 0, 0, 0, 0, 0]),
         );
+        let window_exprs = vec![
+            WindowExpr::new(
+                WindowFunction::RankLike(WindowRankType::RowNumber),
+                vec![],
+                Arc::new(Field::new("b1_row_number", DataType::Int32, false)),
+            ),
+            WindowExpr::new(
+                WindowFunction::RankLike(WindowRankType::Rank),
+                vec![],
+                Arc::new(Field::new("b1_rank", DataType::Int32, false)),
+            ),
+            WindowExpr::new(
+                WindowFunction::RankLike(WindowRankType::DenseRank),
+                vec![],
+                Arc::new(Field::new("b1_dense_rank", DataType::Int32, false)),
+            ),
+            WindowExpr::new(
+                WindowFunction::Agg(AggFunction::Sum),
+                vec![Arc::new(Column::new("b1", 1))],
+                Arc::new(Field::new("b1_sum", DataType::Int64, false)),
+            ),
+        ];
         let window = Arc::new(WindowExec::try_new(
-            input,
-            vec![
-                WindowExpr::new(
-                    WindowFunction::RankLike(WindowRankType::RowNumber),
-                    vec![],
-                    Arc::new(Field::new("b1_row_number", DataType::Int32, false)),
-                ),
-                WindowExpr::new(
-                    WindowFunction::RankLike(WindowRankType::Rank),
-                    vec![],
-                    Arc::new(Field::new("b1_rank", DataType::Int32, false)),
-                ),
-                WindowExpr::new(
-                    WindowFunction::RankLike(WindowRankType::DenseRank),
-                    vec![],
-                    Arc::new(Field::new("b1_dense_rank", DataType::Int32, false)),
-                ),
-                WindowExpr::new(
-                    WindowFunction::Agg(AggFunction::Sum),
-                    vec![Arc::new(Column::new("b1", 1))],
-                    Arc::new(Field::new("b1_sum", DataType::Int64, false)),
-                ),
-            ],
+            input.clone(),
+            window_exprs.clone(),
             vec![],
             vec![PhysicalSortExpr {
                 expr: Arc::new(Column::new("b1", 1)),
@@ -331,6 +351,7 @@ mod test {
         )?);
         let stream = window.execute(0, task_ctx.clone())?;
         let batches = datafusion::physical_plan::common::collect(stream).await?;
+        let row_count = window.statistics()?.num_rows;
         let expected = vec![
             "+----+----+----+---------------+---------+---------------+--------+",
             "| a1 | b1 | c1 | b1_row_number | b1_rank | b1_dense_rank | b1_sum |",
@@ -345,6 +366,10 @@ mod test {
             "+----+----+----+---------------+---------+---------------+--------+",
         ];
         assert_batches_eq!(expected, &batches);
+        // num_rows is inherited from the input, so it must equal the number of
+        // rows produced by the window, not the number of output columns
+        let total_rows: usize = batches.iter().map(|b| b.num_rows()).sum();
+        assert_eq!(row_count, Precision::Exact(total_rows));
         Ok(())
     }
 }