Skip to content

Commit

Permalink
add "can_be_pushed_down" in AggregateFunction
Browse files Browse the repository at this point in the history
  • Loading branch information
lutengda authored and roseboy-liu committed Dec 2, 2024
1 parent 21b2303 commit 37f57fe
Show file tree
Hide file tree
Showing 17 changed files with 46 additions and 4 deletions.
1 change: 1 addition & 0 deletions datafusion/core/src/physical_planner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1650,6 +1650,7 @@ pub fn create_aggregate_expr_with_name_and_maybe_filter(
args,
filter,
order_by,
..
}) => {
let args = args
.iter()
Expand Down
2 changes: 2 additions & 0 deletions datafusion/core/tests/provider_aggregation_pushdown.rs
Original file line number Diff line number Diff line change
Expand Up @@ -249,6 +249,7 @@ impl TableProvider for CustomAggregationProvider {
distinct,
filter,
order_by,
can_be_pushed_down,
}) => {
let support_agg_func = match fun {
aggregate_function::AggregateFunction::Count => true,
Expand All @@ -263,6 +264,7 @@ impl TableProvider for CustomAggregationProvider {
&& !distinct
&& filter.is_none()
&& order_by.is_none()
&& *can_be_pushed_down
}
_ => false,
}
Expand Down
5 changes: 5 additions & 0 deletions datafusion/expr/src/expr.rs
Original file line number Diff line number Diff line change
Expand Up @@ -425,6 +425,8 @@ pub struct AggregateFunction {
pub filter: Option<Box<Expr>>,
/// Optional ordering
pub order_by: Option<Vec<Expr>>,
/// Whether it can be pushed down
pub can_be_pushed_down: bool,
}

impl AggregateFunction {
Expand All @@ -434,13 +436,15 @@ impl AggregateFunction {
distinct: bool,
filter: Option<Box<Expr>>,
order_by: Option<Vec<Expr>>,
can_be_pushed_down: bool,
) -> Self {
Self {
fun,
args,
distinct,
filter,
order_by,
can_be_pushed_down,
}
}
}
Expand Down Expand Up @@ -1364,6 +1368,7 @@ fn create_name(e: &Expr) -> Result<String> {
args,
filter,
order_by,
..
}) => {
let mut name = create_function_name(&fun.to_string(), *distinct, args)?;
if let Some(fe) = filter {
Expand Down
12 changes: 12 additions & 0 deletions datafusion/expr/src/expr_fn.rs
Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,7 @@ pub fn min(expr: Expr) -> Expr {
false,
None,
None,
false,
))
}

Expand All @@ -122,6 +123,7 @@ pub fn max(expr: Expr) -> Expr {
false,
None,
None,
false,
))
}

Expand All @@ -133,6 +135,7 @@ pub fn sum(expr: Expr) -> Expr {
false,
None,
None,
false,
))
}

Expand All @@ -144,6 +147,7 @@ pub fn avg(expr: Expr) -> Expr {
false,
None,
None,
false,
))
}

Expand All @@ -155,6 +159,7 @@ pub fn count(expr: Expr) -> Expr {
false,
None,
None,
true,
))
}

Expand Down Expand Up @@ -211,6 +216,7 @@ pub fn count_distinct(expr: Expr) -> Expr {
true,
None,
None,
false,
))
}

Expand Down Expand Up @@ -263,6 +269,7 @@ pub fn approx_distinct(expr: Expr) -> Expr {
false,
None,
None,
false,
))
}

Expand All @@ -274,6 +281,7 @@ pub fn median(expr: Expr) -> Expr {
false,
None,
None,
false,
))
}

Expand All @@ -285,6 +293,7 @@ pub fn approx_median(expr: Expr) -> Expr {
false,
None,
None,
false,
))
}

Expand All @@ -296,6 +305,7 @@ pub fn approx_percentile_cont(expr: Expr, percentile: Expr) -> Expr {
false,
None,
None,
false,
))
}

Expand All @@ -311,6 +321,7 @@ pub fn approx_percentile_cont_with_weight(
false,
None,
None,
false,
))
}

Expand Down Expand Up @@ -381,6 +392,7 @@ pub fn stddev(expr: Expr) -> Expr {
false,
None,
None,
false,
))
}

Expand Down
2 changes: 2 additions & 0 deletions datafusion/expr/src/tree_node/expr.rs
Original file line number Diff line number Diff line change
Expand Up @@ -299,12 +299,14 @@ impl TreeNode for Expr {
distinct,
filter,
order_by,
can_be_pushed_down,
}) => Expr::AggregateFunction(AggregateFunction::new(
fun,
transform_vec(args, &mut transform)?,
distinct,
transform_option_box(filter, &mut transform)?,
transform_option_vec(order_by, &mut transform)?,
can_be_pushed_down,
)),
Expr::GroupingSet(grouping_set) => match grouping_set {
GroupingSet::Rollup(exprs) => Expr::GroupingSet(GroupingSet::Rollup(
Expand Down
2 changes: 2 additions & 0 deletions datafusion/optimizer/src/analyzer/count_wildcard_rule.rs
Original file line number Diff line number Diff line change
Expand Up @@ -176,13 +176,15 @@ impl TreeNodeRewriter for CountWildcardRewriter {
distinct,
filter,
order_by,
can_be_pushed_down,
}) if args.len() == 1 => match args[0] {
Expr::Wildcard => Expr::AggregateFunction(AggregateFunction {
fun: aggregate_function::AggregateFunction::Count,
args: vec![lit(COUNT_STAR_EXPANSION)],
distinct,
filter,
order_by,
can_be_pushed_down,
}),
_ => old_expr,
},
Expand Down
6 changes: 5 additions & 1 deletion datafusion/optimizer/src/analyzer/type_coercion.rs
Original file line number Diff line number Diff line change
Expand Up @@ -409,6 +409,7 @@ impl TreeNodeRewriter for TypeCoercionRewriter {
distinct,
filter,
order_by,
can_be_pushed_down,
}) => {
let new_expr = coerce_agg_exprs_for_signature(
&fun,
Expand All @@ -417,7 +418,7 @@ impl TreeNodeRewriter for TypeCoercionRewriter {
&aggregate_function::signature(&fun),
)?;
let expr = Expr::AggregateFunction(expr::AggregateFunction::new(
fun, new_expr, distinct, filter, order_by,
fun, new_expr, distinct, filter, order_by, can_be_pushed_down,
));
Ok(expr)
}
Expand Down Expand Up @@ -993,6 +994,7 @@ mod test {
false,
None,
None,
false,
));
let plan = LogicalPlan::Projection(Projection::try_new(vec![agg_expr], empty)?);
let expected = "Projection: AVG(Int64(12))\n EmptyRelation";
Expand All @@ -1006,6 +1008,7 @@ mod test {
false,
None,
None,
false,
));
let plan = LogicalPlan::Projection(Projection::try_new(vec![agg_expr], empty)?);
let expected = "Projection: AVG(a)\n EmptyRelation";
Expand All @@ -1023,6 +1026,7 @@ mod test {
false,
None,
None,
false,
));
let err = Projection::try_new(vec![agg_expr], empty).err().unwrap();
assert_eq!(
Expand Down
1 change: 1 addition & 0 deletions datafusion/optimizer/src/push_down_projection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1064,6 +1064,7 @@ mod tests {
false,
Some(Box::new(col("c").gt(lit(42)))),
None,
false,
));

let plan = LogicalPlanBuilder::from(table_scan)
Expand Down
3 changes: 3 additions & 0 deletions datafusion/optimizer/src/single_distinct_to_groupby.rs
Original file line number Diff line number Diff line change
Expand Up @@ -132,6 +132,7 @@ impl OptimizerRule for SingleDistinctToGroupBy {
args,
filter,
order_by,
can_be_pushed_down,
..
}) => {
// is_single_distinct_agg ensure args.len=1
Expand All @@ -146,6 +147,7 @@ impl OptimizerRule for SingleDistinctToGroupBy {
false, // intentional to remove distinct here
filter.clone(),
order_by.clone(),
can_be_pushed_down.clone(),
)))
}
_ => Ok(aggr_expr.clone()),
Expand Down Expand Up @@ -402,6 +404,7 @@ mod tests {
true,
None,
None,
false,
)),
],
)?
Expand Down
1 change: 1 addition & 0 deletions datafusion/proto/src/logical_plan/from_proto.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1011,6 +1011,7 @@ pub fn parse_expr(
expr.distinct,
parse_optional_expr(expr.filter.as_deref(), registry)?.map(Box::new),
parse_vec_expr(&expr.order_by, registry)?,
false,
)))
}
ExprType::Alias(alias) => Ok(Expr::Alias(
Expand Down
3 changes: 3 additions & 0 deletions datafusion/proto/src/logical_plan/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2570,6 +2570,7 @@ mod roundtrip_tests {
false,
None,
None,
false,
));
let ctx = SessionContext::new();
roundtrip_expr_test(test_expr, ctx);
Expand All @@ -2583,6 +2584,7 @@ mod roundtrip_tests {
true,
None,
None,
false,
));
let ctx = SessionContext::new();
roundtrip_expr_test(test_expr, ctx);
Expand All @@ -2596,6 +2598,7 @@ mod roundtrip_tests {
false,
None,
None,
false,
));

let ctx = SessionContext::new();
Expand Down
1 change: 1 addition & 0 deletions datafusion/proto/src/logical_plan/to_proto.rs
Original file line number Diff line number Diff line change
Expand Up @@ -632,6 +632,7 @@ impl TryFrom<&Expr> for protobuf::LogicalExprNode {
ref distinct,
ref filter,
ref order_by,
..
}) => {
let aggr_function = match fun {
AggregateFunction::ApproxDistinct => {
Expand Down
2 changes: 1 addition & 1 deletion datafusion/sql/src/expr/function.rs
Original file line number Diff line number Diff line change
Expand Up @@ -135,7 +135,7 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> {
self.function_args_to_expr(function.args, schema, planner_context)?;

return Ok(Expr::AggregateFunction(expr::AggregateFunction::new(
fun, args, distinct, None, order_by,
fun, args, distinct, None, order_by, true,
)));
};

Expand Down
4 changes: 3 additions & 1 deletion datafusion/sql/src/expr/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -365,7 +365,7 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> {
// next, aggregate built-ins
let fun = AggregateFunction::ArrayAgg;
Ok(Expr::AggregateFunction(expr::AggregateFunction::new(
fun, args, distinct, None, order_by,
fun, args, distinct, None, order_by, false,
)))
}

Expand Down Expand Up @@ -500,6 +500,7 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> {
args,
distinct,
order_by,
can_be_pushed_down,
..
}) => Ok(Expr::AggregateFunction(expr::AggregateFunction::new(
fun,
Expand All @@ -511,6 +512,7 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> {
planner_context,
)?)),
order_by,
can_be_pushed_down,
))),
_ => Err(DataFusionError::Plan(
"AggregateExpressionWithFilter expression was not an AggregateFunction"
Expand Down
2 changes: 2 additions & 0 deletions datafusion/sql/src/utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -167,6 +167,7 @@ where
distinct,
filter,
order_by,
can_be_pushed_down,
}) => Ok(Expr::AggregateFunction(AggregateFunction::new(
fun.clone(),
args.iter()
Expand All @@ -175,6 +176,7 @@ where
*distinct,
filter.clone(),
order_by.clone(),
can_be_pushed_down.clone(),
))),
Expr::WindowFunction(WindowFunction {
fun,
Expand Down
1 change: 1 addition & 0 deletions datafusion/substrait/src/logical_plan/consumer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -665,6 +665,7 @@ pub async fn from_substrait_agg_func(
distinct,
filter,
order_by,
can_be_pushed_down: false,
})))
}

Expand Down
2 changes: 1 addition & 1 deletion datafusion/substrait/src/logical_plan/producer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -472,7 +472,7 @@ pub fn to_substrait_agg_measure(
),
) -> Result<Measure> {
match expr {
Expr::AggregateFunction(expr::AggregateFunction { fun, args, distinct, filter, order_by }) => {
Expr::AggregateFunction(expr::AggregateFunction { fun, args, distinct, filter, order_by, .. }) => {
let sorts = if let Some(order_by) = order_by {
order_by.iter().map(|expr| to_substrait_sort_field(expr, schema, extension_info)).collect::<Result<Vec<_>>>()?
} else {
Expand Down

0 comments on commit 37f57fe

Please sign in to comment.