Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

add "can_be_pushed_down" in AggregateFunction #17

Merged
merged 1 commit into from
Dec 2, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions datafusion/core/src/physical_planner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1650,6 +1650,7 @@ pub fn create_aggregate_expr_with_name_and_maybe_filter(
args,
filter,
order_by,
..
}) => {
let args = args
.iter()
Expand Down
2 changes: 2 additions & 0 deletions datafusion/core/tests/provider_aggregation_pushdown.rs
Original file line number Diff line number Diff line change
Expand Up @@ -249,6 +249,7 @@ impl TableProvider for CustomAggregationProvider {
distinct,
filter,
order_by,
can_be_pushed_down,
}) => {
let support_agg_func = match fun {
aggregate_function::AggregateFunction::Count => true,
Expand All @@ -263,6 +264,7 @@ impl TableProvider for CustomAggregationProvider {
&& !distinct
&& filter.is_none()
&& order_by.is_none()
&& *can_be_pushed_down
}
_ => false,
}
Expand Down
5 changes: 5 additions & 0 deletions datafusion/expr/src/expr.rs
Original file line number Diff line number Diff line change
Expand Up @@ -425,6 +425,8 @@ pub struct AggregateFunction {
pub filter: Option<Box<Expr>>,
/// Optional ordering
pub order_by: Option<Vec<Expr>>,
/// Whether it can be pushed down
pub can_be_pushed_down: bool,
}

impl AggregateFunction {
Expand All @@ -434,13 +436,15 @@ impl AggregateFunction {
distinct: bool,
filter: Option<Box<Expr>>,
order_by: Option<Vec<Expr>>,
can_be_pushed_down: bool,
) -> Self {
Self {
fun,
args,
distinct,
filter,
order_by,
can_be_pushed_down,
}
}
}
Expand Down Expand Up @@ -1364,6 +1368,7 @@ fn create_name(e: &Expr) -> Result<String> {
args,
filter,
order_by,
..
}) => {
let mut name = create_function_name(&fun.to_string(), *distinct, args)?;
if let Some(fe) = filter {
Expand Down
12 changes: 12 additions & 0 deletions datafusion/expr/src/expr_fn.rs
Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,7 @@ pub fn min(expr: Expr) -> Expr {
false,
None,
None,
false,
))
}

Expand All @@ -122,6 +123,7 @@ pub fn max(expr: Expr) -> Expr {
false,
None,
None,
false,
))
}

Expand All @@ -133,6 +135,7 @@ pub fn sum(expr: Expr) -> Expr {
false,
None,
None,
false,
))
}

Expand All @@ -144,6 +147,7 @@ pub fn avg(expr: Expr) -> Expr {
false,
None,
None,
false,
))
}

Expand All @@ -155,6 +159,7 @@ pub fn count(expr: Expr) -> Expr {
false,
None,
None,
true,
))
}

Expand Down Expand Up @@ -211,6 +216,7 @@ pub fn count_distinct(expr: Expr) -> Expr {
true,
None,
None,
false,
))
}

Expand Down Expand Up @@ -263,6 +269,7 @@ pub fn approx_distinct(expr: Expr) -> Expr {
false,
None,
None,
false,
))
}

Expand All @@ -274,6 +281,7 @@ pub fn median(expr: Expr) -> Expr {
false,
None,
None,
false,
))
}

Expand All @@ -285,6 +293,7 @@ pub fn approx_median(expr: Expr) -> Expr {
false,
None,
None,
false,
))
}

Expand All @@ -296,6 +305,7 @@ pub fn approx_percentile_cont(expr: Expr, percentile: Expr) -> Expr {
false,
None,
None,
false,
))
}

Expand All @@ -311,6 +321,7 @@ pub fn approx_percentile_cont_with_weight(
false,
None,
None,
false,
))
}

Expand Down Expand Up @@ -381,6 +392,7 @@ pub fn stddev(expr: Expr) -> Expr {
false,
None,
None,
false,
))
}

Expand Down
2 changes: 2 additions & 0 deletions datafusion/expr/src/tree_node/expr.rs
Original file line number Diff line number Diff line change
Expand Up @@ -299,12 +299,14 @@ impl TreeNode for Expr {
distinct,
filter,
order_by,
can_be_pushed_down,
}) => Expr::AggregateFunction(AggregateFunction::new(
fun,
transform_vec(args, &mut transform)?,
distinct,
transform_option_box(filter, &mut transform)?,
transform_option_vec(order_by, &mut transform)?,
can_be_pushed_down,
)),
Expr::GroupingSet(grouping_set) => match grouping_set {
GroupingSet::Rollup(exprs) => Expr::GroupingSet(GroupingSet::Rollup(
Expand Down
2 changes: 2 additions & 0 deletions datafusion/optimizer/src/analyzer/count_wildcard_rule.rs
Original file line number Diff line number Diff line change
Expand Up @@ -176,13 +176,15 @@ impl TreeNodeRewriter for CountWildcardRewriter {
distinct,
filter,
order_by,
can_be_pushed_down,
}) if args.len() == 1 => match args[0] {
Expr::Wildcard => Expr::AggregateFunction(AggregateFunction {
fun: aggregate_function::AggregateFunction::Count,
args: vec![lit(COUNT_STAR_EXPANSION)],
distinct,
filter,
order_by,
can_be_pushed_down,
}),
_ => old_expr,
},
Expand Down
6 changes: 5 additions & 1 deletion datafusion/optimizer/src/analyzer/type_coercion.rs
Original file line number Diff line number Diff line change
Expand Up @@ -409,6 +409,7 @@ impl TreeNodeRewriter for TypeCoercionRewriter {
distinct,
filter,
order_by,
can_be_pushed_down,
}) => {
let new_expr = coerce_agg_exprs_for_signature(
&fun,
Expand All @@ -417,7 +418,7 @@ impl TreeNodeRewriter for TypeCoercionRewriter {
&aggregate_function::signature(&fun),
)?;
let expr = Expr::AggregateFunction(expr::AggregateFunction::new(
fun, new_expr, distinct, filter, order_by,
fun, new_expr, distinct, filter, order_by, can_be_pushed_down,
));
Ok(expr)
}
Expand Down Expand Up @@ -993,6 +994,7 @@ mod test {
false,
None,
None,
false,
));
let plan = LogicalPlan::Projection(Projection::try_new(vec![agg_expr], empty)?);
let expected = "Projection: AVG(Int64(12))\n EmptyRelation";
Expand All @@ -1006,6 +1008,7 @@ mod test {
false,
None,
None,
false,
));
let plan = LogicalPlan::Projection(Projection::try_new(vec![agg_expr], empty)?);
let expected = "Projection: AVG(a)\n EmptyRelation";
Expand All @@ -1023,6 +1026,7 @@ mod test {
false,
None,
None,
false,
));
let err = Projection::try_new(vec![agg_expr], empty).err().unwrap();
assert_eq!(
Expand Down
1 change: 1 addition & 0 deletions datafusion/optimizer/src/push_down_projection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1064,6 +1064,7 @@ mod tests {
false,
Some(Box::new(col("c").gt(lit(42)))),
None,
false,
));

let plan = LogicalPlanBuilder::from(table_scan)
Expand Down
3 changes: 3 additions & 0 deletions datafusion/optimizer/src/single_distinct_to_groupby.rs
Original file line number Diff line number Diff line change
Expand Up @@ -132,6 +132,7 @@ impl OptimizerRule for SingleDistinctToGroupBy {
args,
filter,
order_by,
can_be_pushed_down,
..
}) => {
// is_single_distinct_agg ensure args.len=1
Expand All @@ -146,6 +147,7 @@ impl OptimizerRule for SingleDistinctToGroupBy {
false, // intentional to remove distinct here
filter.clone(),
order_by.clone(),
can_be_pushed_down.clone(),
)))
}
_ => Ok(aggr_expr.clone()),
Expand Down Expand Up @@ -402,6 +404,7 @@ mod tests {
true,
None,
None,
false,
)),
],
)?
Expand Down
1 change: 1 addition & 0 deletions datafusion/proto/src/logical_plan/from_proto.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1011,6 +1011,7 @@ pub fn parse_expr(
expr.distinct,
parse_optional_expr(expr.filter.as_deref(), registry)?.map(Box::new),
parse_vec_expr(&expr.order_by, registry)?,
false,
)))
}
ExprType::Alias(alias) => Ok(Expr::Alias(
Expand Down
3 changes: 3 additions & 0 deletions datafusion/proto/src/logical_plan/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2570,6 +2570,7 @@ mod roundtrip_tests {
false,
None,
None,
false,
));
let ctx = SessionContext::new();
roundtrip_expr_test(test_expr, ctx);
Expand All @@ -2583,6 +2584,7 @@ mod roundtrip_tests {
true,
None,
None,
false,
));
let ctx = SessionContext::new();
roundtrip_expr_test(test_expr, ctx);
Expand All @@ -2596,6 +2598,7 @@ mod roundtrip_tests {
false,
None,
None,
false,
));

let ctx = SessionContext::new();
Expand Down
1 change: 1 addition & 0 deletions datafusion/proto/src/logical_plan/to_proto.rs
Original file line number Diff line number Diff line change
Expand Up @@ -632,6 +632,7 @@ impl TryFrom<&Expr> for protobuf::LogicalExprNode {
ref distinct,
ref filter,
ref order_by,
..
}) => {
let aggr_function = match fun {
AggregateFunction::ApproxDistinct => {
Expand Down
2 changes: 1 addition & 1 deletion datafusion/sql/src/expr/function.rs
Original file line number Diff line number Diff line change
Expand Up @@ -135,7 +135,7 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> {
self.function_args_to_expr(function.args, schema, planner_context)?;

return Ok(Expr::AggregateFunction(expr::AggregateFunction::new(
fun, args, distinct, None, order_by,
fun, args, distinct, None, order_by, true,
)));
};

Expand Down
4 changes: 3 additions & 1 deletion datafusion/sql/src/expr/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -365,7 +365,7 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> {
// next, aggregate built-ins
let fun = AggregateFunction::ArrayAgg;
Ok(Expr::AggregateFunction(expr::AggregateFunction::new(
fun, args, distinct, None, order_by,
fun, args, distinct, None, order_by, false,
)))
}

Expand Down Expand Up @@ -500,6 +500,7 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> {
args,
distinct,
order_by,
can_be_pushed_down,
..
}) => Ok(Expr::AggregateFunction(expr::AggregateFunction::new(
fun,
Expand All @@ -511,6 +512,7 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> {
planner_context,
)?)),
order_by,
can_be_pushed_down,
))),
_ => Err(DataFusionError::Plan(
"AggregateExpressionWithFilter expression was not an AggregateFunction"
Expand Down
2 changes: 2 additions & 0 deletions datafusion/sql/src/utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -167,6 +167,7 @@ where
distinct,
filter,
order_by,
can_be_pushed_down,
}) => Ok(Expr::AggregateFunction(AggregateFunction::new(
fun.clone(),
args.iter()
Expand All @@ -175,6 +176,7 @@ where
*distinct,
filter.clone(),
order_by.clone(),
can_be_pushed_down.clone(),
))),
Expr::WindowFunction(WindowFunction {
fun,
Expand Down
1 change: 1 addition & 0 deletions datafusion/substrait/src/logical_plan/consumer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -665,6 +665,7 @@ pub async fn from_substrait_agg_func(
distinct,
filter,
order_by,
can_be_pushed_down: false,
})))
}

Expand Down
2 changes: 1 addition & 1 deletion datafusion/substrait/src/logical_plan/producer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -472,7 +472,7 @@ pub fn to_substrait_agg_measure(
),
) -> Result<Measure> {
match expr {
Expr::AggregateFunction(expr::AggregateFunction { fun, args, distinct, filter, order_by }) => {
Expr::AggregateFunction(expr::AggregateFunction { fun, args, distinct, filter, order_by, .. }) => {
let sorts = if let Some(order_by) = order_by {
order_by.iter().map(|expr| to_substrait_sort_field(expr, schema, extension_info)).collect::<Result<Vec<_>>>()?
} else {
Expand Down
Loading