Skip to content

Commit

Permalink
remove unnecessary computations
Browse files Browse the repository at this point in the history
  • Loading branch information
berkaysynnada committed Jan 29, 2025
1 parent 29af731 commit f1777ef
Show file tree
Hide file tree
Showing 3 changed files with 10 additions and 25 deletions.
11 changes: 2 additions & 9 deletions datafusion/physical-plan/src/windows/bounded_window_agg_exec.rs
Original file line number Diff line number Diff line change
Expand Up @@ -126,8 +126,7 @@ impl BoundedWindowAggExec {
vec![]
}
};
let cache =
Self::compute_properties(&input, &schema, &window_expr, window_expr_indices);
let cache = Self::compute_properties(&input, &schema, &window_expr);
Ok(Self {
input,
window_expr,
Expand Down Expand Up @@ -195,15 +194,9 @@ impl BoundedWindowAggExec {
input: &Arc<dyn ExecutionPlan>,
schema: &SchemaRef,
window_exprs: &[Arc<dyn WindowExpr>],
window_expr_indices: Vec<usize>,
) -> PlanProperties {
// Calculate equivalence properties:
let eq_properties = window_equivalence_properties(
schema,
input,
window_exprs,
window_expr_indices,
);
let eq_properties = window_equivalence_properties(schema, input, window_exprs);

// As we can have repartitioning using the partition keys, this can
// be either one or more than one, depending on the presence of
Expand Down
8 changes: 6 additions & 2 deletions datafusion/physical-plan/src/windows/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -337,21 +337,25 @@ pub(crate) fn window_equivalence_properties(
schema: &SchemaRef,
input: &Arc<dyn ExecutionPlan>,
window_exprs: &[Arc<dyn WindowExpr>],
window_expr_indices: Vec<usize>,
) -> EquivalenceProperties {
// We need to update the schema, so we can not directly use
// `input.equivalence_properties()`.
let mut window_eq_properties = EquivalenceProperties::new(Arc::clone(schema))
.extend(input.equivalence_properties().clone());

let schema_len = schema.fields.len();
let window_expr_indices = (schema_len..(schema_len - window_exprs.len()))
.rev()
.collect::<Vec<_>>();

for (i, expr) in window_exprs.iter().enumerate() {
let window_expr_index = window_expr_indices[i];
if let Some(udf_window_expr) = expr.as_any().downcast_ref::<StandardWindowExpr>()
{
udf_window_expr.add_equal_orderings(&mut window_eq_properties);
} else if let Some(aggregate_udf_window_expr) =
expr.as_any().downcast_ref::<PlainAggregateWindowExpr>()
{
let window_expr_index = window_expr_indices[i];
aggregate_udf_window_expr
.add_equal_orderings(&mut window_eq_properties, window_expr_index);
}
Expand Down
16 changes: 2 additions & 14 deletions datafusion/physical-plan/src/windows/window_agg_exec.rs
Original file line number Diff line number Diff line change
Expand Up @@ -79,16 +79,10 @@ impl WindowAggExec {
let old_fields_len = input.schema().fields.len();
let schema = create_schema(&input.schema(), &window_expr)?;
let schema = Arc::new(schema);
let window_expr_indices = (old_fields_len..schema.fields.len()).collect_vec();

let ordered_partition_by_indices =
get_ordered_partition_by_indices(window_expr[0].partition_by(), &input);
let cache = Self::compute_properties(
Arc::clone(&schema),
&input,
&window_expr,
window_expr_indices,
);
let cache = Self::compute_properties(Arc::clone(&schema), &input, &window_expr);
Ok(Self {
input,
window_expr,
Expand Down Expand Up @@ -129,15 +123,9 @@ impl WindowAggExec {
schema: SchemaRef,
input: &Arc<dyn ExecutionPlan>,
window_exprs: &[Arc<dyn WindowExpr>],
window_expr_indices: Vec<usize>,
) -> PlanProperties {
// Calculate equivalence properties:
let eq_properties = window_equivalence_properties(
&schema,
input,
window_exprs,
window_expr_indices,
);
let eq_properties = window_equivalence_properties(&schema, input, window_exprs);

// Get output partitioning:
// Because we can have repartitioning using the partition keys this
Expand Down

0 comments on commit f1777ef

Please sign in to comment.