Skip to content

Commit

Permalink
Improve EnforceSorting docs. (#14673)
Browse files Browse the repository at this point in the history
* chore: add documentation for EnforceSorting

* chore: doc temporary TODO for xudong963's PR

* chore: remove unneeded reference to other PR

* chore: fix docs to appropriately refer to subrules

* chore: fix typos

* chore: cleanup docs after merge from main
  • Loading branch information
wiedld authored Feb 16, 2025
1 parent fefcf5b commit 50c3547
Show file tree
Hide file tree
Showing 2 changed files with 102 additions and 31 deletions.
117 changes: 90 additions & 27 deletions datafusion/physical-optimizer/src/enforce_sorting/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -72,8 +72,8 @@ use datafusion_physical_plan::{ExecutionPlan, ExecutionPlanProperties, InputOrde

use itertools::izip;

/// This rule inspects [`SortExec`]'s in the given physical plan and removes the
/// ones it can prove unnecessary.
/// This rule inspects [`SortExec`]'s in the given physical plan in order to
/// remove unnecessary sorts, and optimize sort performance across the plan.
#[derive(Default, Debug)]
pub struct EnforceSorting {}

Expand All @@ -84,42 +84,54 @@ impl EnforceSorting {
}
}

/// This object is used within the [`EnforceSorting`] rule to track the closest
/// This context object is used within the [`EnforceSorting`] rule to track the closest
/// [`SortExec`] descendant(s) for every child of a plan. The data attribute
/// stores whether the plan is a `SortExec` or is connected to a `SortExec`
/// via its children.
pub type PlanWithCorrespondingSort = PlanContext<bool>;

/// For a given node, update the [`PlanContext.data`] attribute.
///
/// If the node is a `SortExec`, or any of the node's children are a `SortExec`,
/// then set the attribute to true.
///
/// This requires that a bottom-up traversal was previously performed, so that
/// the children's data attributes have already been updated.
fn update_sort_ctx_children_data(
mut node: PlanWithCorrespondingSort,
mut node_and_ctx: PlanWithCorrespondingSort,
data: bool,
) -> Result<PlanWithCorrespondingSort> {
for child_node in node.children.iter_mut() {
let plan = &child_node.plan;
child_node.data = if is_sort(plan) {
// Initiate connection:
// Update `child.data` for all children.
for child_node in node_and_ctx.children.iter_mut() {
let child_plan = &child_node.plan;
child_node.data = if is_sort(child_plan) {
// child is sort
true
} else if is_limit(plan) {
} else if is_limit(child_plan) {
// There is no sort linkage for this path, it starts at a limit.
false
} else {
let is_spm = is_sort_preserving_merge(plan);
let required_orderings = plan.required_input_ordering();
let flags = plan.maintains_input_order();
// Check whether a descendant is a sort, and the child maintains the sort.
let is_spm = is_sort_preserving_merge(child_plan);
let required_orderings = child_plan.required_input_ordering();
let flags = child_plan.maintains_input_order();
// Add parent node to the tree if there is at least one child with
// a sort connection:
izip!(flags, required_orderings).any(|(maintains, required_ordering)| {
let propagates_ordering =
(maintains && required_ordering.is_none()) || is_spm;
// `connected_to_sort` only returns the correct answer with bottom-up traversal
let connected_to_sort =
child_node.children.iter().any(|child| child.data);
propagates_ordering && connected_to_sort
})
}
}

node.data = data;
Ok(node)
// set data attribute on current node
node_and_ctx.data = data;

Ok(node_and_ctx)
}

/// This object is used within the [`EnforceSorting`] rule to track the closest
Expand Down Expand Up @@ -151,10 +163,15 @@ fn update_coalesce_ctx_children(
};
}

/// The boolean flag `repartition_sorts` defined in the config indicates
/// whether we elect to transform [`CoalescePartitionsExec`] + [`SortExec`] cascades
/// into [`SortExec`] + [`SortPreservingMergeExec`] cascades, which enables us to
/// perform sorting in parallel.
/// Performs optimizations based upon a series of subrules.
///
/// Refer to each subrule for detailed descriptions of the optimizations performed:
/// [`ensure_sorting`], [`parallelize_sorts`], [`replace_with_order_preserving_variants()`],
/// and [`pushdown_sorts`].
///
/// Subrule application is ordering dependent.
///
/// The subrule `parallelize_sorts` is only applied if `repartition_sorts` is enabled.
impl PhysicalOptimizerRule for EnforceSorting {
fn optimize(
&self,
Expand Down Expand Up @@ -243,20 +260,66 @@ fn replace_with_partial_sort(
Ok(plan)
}

/// This function turns plans of the form
/// Transform [`CoalescePartitionsExec`] + [`SortExec`] into
/// [`SortExec`] + [`SortPreservingMergeExec`] as illustrated below:
///
/// The [`CoalescePartitionsExec`] + [`SortExec`] cascades
/// combine the partitions first, and then sort:
/// ```text
/// ┌ ─ ─ ─ ─ ─ ┐
/// ┌─┬─┬─┐
/// ││B│A│D│... ├──┐
/// └─┴─┴─┘ │
/// └ ─ ─ ─ ─ ─ ┘ │ ┌────────────────────────┐ ┌ ─ ─ ─ ─ ─ ─ ┐ ┌────────┐ ┌ ─ ─ ─ ─ ─ ─ ─ ┐
/// Partition 1 │ │ Coalesce │ ┌─┬─┬─┬─┬─┐ │ │ ┌─┬─┬─┬─┬─┐
/// ├──▶(no ordering guarantees)│──▶││B│E│A│D│C│...───▶ Sort ├───▶││A│B│C│D│E│... │
/// │ │ │ └─┴─┴─┴─┴─┘ │ │ └─┴─┴─┴─┴─┘
/// ┌ ─ ─ ─ ─ ─ ┐ │ └────────────────────────┘ └ ─ ─ ─ ─ ─ ─ ┘ └────────┘ └ ─ ─ ─ ─ ─ ─ ─ ┘
/// ┌─┬─┐ │ Partition Partition
/// ││E│C│ ... ├──┘
/// └─┴─┘
/// └ ─ ─ ─ ─ ─ ┘
/// Partition 2
/// ```
///
///
/// The [`SortExec`] + [`SortPreservingMergeExec`] cascades
/// sort each partition first, then merge the partitions while retaining the sort:
/// ```text
/// ┌ ─ ─ ─ ─ ─ ┐ ┌────────┐ ┌ ─ ─ ─ ─ ─ ┐
/// ┌─┬─┬─┐ │ │ ┌─┬─┬─┐
/// ││B│A│D│... │──▶│ Sort │──▶││A│B│D│... │──┐
/// └─┴─┴─┘ │ │ └─┴─┴─┘ │
/// └ ─ ─ ─ ─ ─ ┘ └────────┘ └ ─ ─ ─ ─ ─ ┘ │ ┌─────────────────────┐ ┌ ─ ─ ─ ─ ─ ─ ─ ┐
/// Partition 1 Partition 1 │ │ │ ┌─┬─┬─┬─┬─┐
/// ├──▶ SortPreservingMerge ├───▶││A│B│C│D│E│... │
/// │ │ │ └─┴─┴─┴─┴─┘
/// ┌ ─ ─ ─ ─ ─ ┐ ┌────────┐ ┌ ─ ─ ─ ─ ─ ┐ │ └─────────────────────┘ └ ─ ─ ─ ─ ─ ─ ─ ┘
/// ┌─┬─┐ │ │ ┌─┬─┐ │ Partition
/// ││E│C│ ... │──▶│ Sort ├──▶││C│E│ ... │──┘
/// └─┴─┘ │ │ └─┴─┘
/// └ ─ ─ ─ ─ ─ ┘ └────────┘ └ ─ ─ ─ ─ ─ ┘
/// Partition 2 Partition 2
/// ```
///
/// The latter [`SortExec`] + [`SortPreservingMergeExec`] cascade performs the
/// sort first on a per-partition basis, thereby parallelizing the sort.
///
///
/// The outcome is that plans of the form
/// ```text
/// "SortExec: expr=\[a@0 ASC\]",
/// " CoalescePartitionsExec",
/// " RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1",
/// " ...nodes..."
/// " CoalescePartitionsExec",
/// " RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1",
/// ```
/// to
/// are transformed into
/// ```text
/// "SortPreservingMergeExec: \[a@0 ASC\]",
/// " SortExec: expr=\[a@0 ASC\]",
/// " RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1",
/// " ...nodes..."
/// " SortExec: expr=\[a@0 ASC\]",
/// " RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1",
/// ```
/// by following connections from [`CoalescePartitionsExec`]s to [`SortExec`]s.
/// By performing sorting in parallel, we can increase performance in some scenarios.
pub fn parallelize_sorts(
mut requirements: PlanWithCorrespondingCoalescePartitions,
) -> Result<Transformed<PlanWithCorrespondingCoalescePartitions>> {
Expand Down Expand Up @@ -318,7 +381,7 @@ pub fn parallelize_sorts(
}

/// This function enforces sorting requirements and makes optimizations without
/// violating these requirements whenever possible.
/// violating these requirements whenever possible. Requires a bottom-up traversal.
pub fn ensure_sorting(
mut requirements: PlanWithCorrespondingSort,
) -> Result<Transformed<PlanWithCorrespondingSort>> {
Expand Down
16 changes: 12 additions & 4 deletions datafusion/physical-plan/src/tree_node.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,11 +39,17 @@ impl DynTreeNode for dyn ExecutionPlan {
}
}

/// A node object beneficial for writing optimizer rules, encapsulating an [`ExecutionPlan`] node with a payload.
/// Since there are two ways to access child plans—directly from the plan and through child nodes—it's recommended
/// A node context object beneficial for writing optimizer rules.
/// This context encapsulates an [`ExecutionPlan`] node with a payload.
///
/// Since each wrapped node has its children within both the [`PlanContext.plan.children()`],
/// as well as separately within the [`PlanContext.children`] (which are child nodes wrapped in the context),
/// it's important to keep these child plans in sync when performing mutations.
///
/// Since there are two ways to access child plans, it's recommended
/// to perform mutable operations via [`Self::update_plan_from_children`].
/// After update `children`, please do the sync updating for `plan`'s children.
/// Or after creating the `PlanContext`, if you can't guarantee they are consistent, call `update_plan_from_children` to sync.
/// After mutating the `PlanContext.children`, or after creating the `PlanContext`,
/// call `update_plan_from_children` to sync.
#[derive(Debug)]
pub struct PlanContext<T: Sized> {
/// The execution plan associated with this context.
Expand All @@ -63,6 +69,8 @@ impl<T> PlanContext<T> {
}
}

/// Update the [`PlanContext.plan.children()`] from the [`PlanContext.children`],
/// if the `PlanContext.children` have been changed.
pub fn update_plan_from_children(mut self) -> Result<Self> {
let children_plans = self.children.iter().map(|c| Arc::clone(&c.plan)).collect();
self.plan = with_new_children_if_necessary(self.plan, children_plans)?;
Expand Down

0 comments on commit 50c3547

Please sign in to comment.