diff --git a/datafusion/physical-optimizer/src/enforce_sorting/mod.rs b/datafusion/physical-optimizer/src/enforce_sorting/mod.rs index dd705cb5311c..420c080f09c2 100644 --- a/datafusion/physical-optimizer/src/enforce_sorting/mod.rs +++ b/datafusion/physical-optimizer/src/enforce_sorting/mod.rs @@ -72,8 +72,8 @@ use datafusion_physical_plan::{ExecutionPlan, ExecutionPlanProperties, InputOrde use itertools::izip; -/// This rule inspects [`SortExec`]'s in the given physical plan and removes the -/// ones it can prove unnecessary. +/// This rule inspects [`SortExec`]'s in the given physical plan in order to +/// remove unnecessary sorts, and optimize sort performance across the plan. #[derive(Default, Debug)] pub struct EnforceSorting {} @@ -84,33 +84,43 @@ impl EnforceSorting { } } -/// This object is used within the [`EnforceSorting`] rule to track the closest +/// This context object is used within the [`EnforceSorting`] rule to track the closest /// [`SortExec`] descendant(s) for every child of a plan. The data attribute /// stores whether the plan is a `SortExec` or is connected to a `SortExec` /// via its children. pub type PlanWithCorrespondingSort = PlanContext; +/// For a given node, update the [`PlanContext.data`] attribute. +/// +/// If the node is a `SortExec`, or any of the node's children are a `SortExec`, +/// then set the attribute to true. +/// +/// This requires a bottom-up traversal was previously performed, updating the +/// children previously. fn update_sort_ctx_children_data( - mut node: PlanWithCorrespondingSort, + mut node_and_ctx: PlanWithCorrespondingSort, data: bool, ) -> Result { - for child_node in node.children.iter_mut() { - let plan = &child_node.plan; - child_node.data = if is_sort(plan) { - // Initiate connection: + // Update `child.data` for all children. + for child_node in node_and_ctx.children.iter_mut() { + let child_plan = &child_node.plan; + child_node.data = if is_sort(child_plan) { + // child is sort true - } else if is_limit(plan) { + } else if is_limit(child_plan) { // There is no sort linkage for this path, it starts at a limit. false } else { - let is_spm = is_sort_preserving_merge(plan); - let required_orderings = plan.required_input_ordering(); - let flags = plan.maintains_input_order(); + // If a descendent is a sort, and the child maintains the sort. + let is_spm = is_sort_preserving_merge(child_plan); + let required_orderings = child_plan.required_input_ordering(); + let flags = child_plan.maintains_input_order(); // Add parent node to the tree if there is at least one child with // a sort connection: izip!(flags, required_orderings).any(|(maintains, required_ordering)| { let propagates_ordering = (maintains && required_ordering.is_none()) || is_spm; + // `connected_to_sort` only returns the correct answer with bottom-up traversal let connected_to_sort = child_node.children.iter().any(|child| child.data); propagates_ordering && connected_to_sort @@ -118,8 +128,10 @@ fn update_sort_ctx_children_data( } } - node.data = data; - Ok(node) + // set data attribute on current node + node_and_ctx.data = data; + + Ok(node_and_ctx) } /// This object is used within the [`EnforceSorting`] rule to track the closest @@ -151,10 +163,15 @@ fn update_coalesce_ctx_children( }; } -/// The boolean flag `repartition_sorts` defined in the config indicates -/// whether we elect to transform [`CoalescePartitionsExec`] + [`SortExec`] cascades -/// into [`SortExec`] + [`SortPreservingMergeExec`] cascades, which enables us to -/// perform sorting in parallel. +/// Performs optimizations based upon a series of subrules. +/// +/// Refer to each subrule for detailed descriptions of the optimizations performed: +/// [`ensure_sorting`], [`parallelize_sorts`], [`replace_with_order_preserving_variants()`], +/// and [`pushdown_sorts`]. +/// +/// Subrule application is ordering dependent. +/// +/// The subrule `parallelize_sorts` is only applied if `repartition_sorts` is enabled. impl PhysicalOptimizerRule for EnforceSorting { fn optimize( &self, @@ -243,20 +260,66 @@ fn replace_with_partial_sort( Ok(plan) } -/// This function turns plans of the form +/// Transform [`CoalescePartitionsExec`] + [`SortExec`] into +/// [`SortExec`] + [`SortPreservingMergeExec`] as illustrated below: +/// +/// The [`CoalescePartitionsExec`] + [`SortExec`] cascades +/// combine the partitions first, and then sort: +/// ```text +/// ┌ ─ ─ ─ ─ ─ ┐ +/// ┌─┬─┬─┐ +/// ││B│A│D│... ├──┐ +/// └─┴─┴─┘ │ +/// └ ─ ─ ─ ─ ─ ┘ │ ┌────────────────────────┐ ┌ ─ ─ ─ ─ ─ ─ ┐ ┌────────┐ ┌ ─ ─ ─ ─ ─ ─ ─ ┐ +/// Partition 1 │ │ Coalesce │ ┌─┬─┬─┬─┬─┐ │ │ ┌─┬─┬─┬─┬─┐ +/// ├──▶(no ordering guarantees)│──▶││B│E│A│D│C│...───▶ Sort ├───▶││A│B│C│D│E│... │ +/// │ │ │ └─┴─┴─┴─┴─┘ │ │ └─┴─┴─┴─┴─┘ +/// ┌ ─ ─ ─ ─ ─ ┐ │ └────────────────────────┘ └ ─ ─ ─ ─ ─ ─ ┘ └────────┘ └ ─ ─ ─ ─ ─ ─ ─ ┘ +/// ┌─┬─┐ │ Partition Partition +/// ││E│C│ ... ├──┘ +/// └─┴─┘ +/// └ ─ ─ ─ ─ ─ ┘ +/// Partition 2 +/// ``` +/// +/// +/// The [`SortExec`] + [`SortPreservingMergeExec`] cascades +/// sorts each partition first, then merge partitions while retaining the sort: +/// ```text +/// ┌ ─ ─ ─ ─ ─ ┐ ┌────────┐ ┌ ─ ─ ─ ─ ─ ┐ +/// ┌─┬─┬─┐ │ │ ┌─┬─┬─┐ +/// ││B│A│D│... │──▶│ Sort │──▶││A│B│D│... │──┐ +/// └─┴─┴─┘ │ │ └─┴─┴─┘ │ +/// └ ─ ─ ─ ─ ─ ┘ └────────┘ └ ─ ─ ─ ─ ─ ┘ │ ┌─────────────────────┐ ┌ ─ ─ ─ ─ ─ ─ ─ ┐ +/// Partition 1 Partition 1 │ │ │ ┌─┬─┬─┬─┬─┐ +/// ├──▶ SortPreservingMerge ├───▶││A│B│C│D│E│... │ +/// │ │ │ └─┴─┴─┴─┴─┘ +/// ┌ ─ ─ ─ ─ ─ ┐ ┌────────┐ ┌ ─ ─ ─ ─ ─ ┐ │ └─────────────────────┘ └ ─ ─ ─ ─ ─ ─ ─ ┘ +/// ┌─┬─┐ │ │ ┌─┬─┐ │ Partition +/// ││E│C│ ... │──▶│ Sort ├──▶││C│E│ ... │──┘ +/// └─┴─┘ │ │ └─┴─┘ +/// └ ─ ─ ─ ─ ─ ┘ └────────┘ └ ─ ─ ─ ─ ─ ┘ +/// Partition 2 Partition 2 +/// ``` +/// +/// The latter [`SortExec`] + [`SortPreservingMergeExec`] cascade performs the +/// sort first on a per-partition basis, thereby parallelizing the sort. +/// +/// +/// The outcome is that plans of the form /// ```text /// "SortExec: expr=\[a@0 ASC\]", -/// " CoalescePartitionsExec", -/// " RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1", +/// " ...nodes..." +/// " CoalescePartitionsExec", +/// " RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1", /// ``` -/// to +/// are transformed into /// ```text /// "SortPreservingMergeExec: \[a@0 ASC\]", -/// " SortExec: expr=\[a@0 ASC\]", -/// " RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1", +/// " ...nodes..." +/// " SortExec: expr=\[a@0 ASC\]", +/// " RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1", /// ``` -/// by following connections from [`CoalescePartitionsExec`]s to [`SortExec`]s. -/// By performing sorting in parallel, we can increase performance in some scenarios. pub fn parallelize_sorts( mut requirements: PlanWithCorrespondingCoalescePartitions, ) -> Result> { @@ -318,7 +381,7 @@ pub fn parallelize_sorts( } /// This function enforces sorting requirements and makes optimizations without -/// violating these requirements whenever possible. +/// violating these requirements whenever possible. Requires a bottom-up traversal. pub fn ensure_sorting( mut requirements: PlanWithCorrespondingSort, ) -> Result> { diff --git a/datafusion/physical-plan/src/tree_node.rs b/datafusion/physical-plan/src/tree_node.rs index 7a5edc7aaddd..69b0a165315e 100644 --- a/datafusion/physical-plan/src/tree_node.rs +++ b/datafusion/physical-plan/src/tree_node.rs @@ -39,11 +39,17 @@ impl DynTreeNode for dyn ExecutionPlan { } } -/// A node object beneficial for writing optimizer rules, encapsulating an [`ExecutionPlan`] node with a payload. -/// Since there are two ways to access child plans—directly from the plan and through child nodes—it's recommended +/// A node context object beneficial for writing optimizer rules. +/// This context encapsulating an [`ExecutionPlan`] node with a payload. +/// +/// Since each wrapped node has it's children within both the [`PlanContext.plan.children()`], +/// as well as separately within the [`PlanContext.children`] (which are child nodes wrapped in the context), +/// it's important to keep these child plans in sync when performing mutations. +/// +/// Since there are two ways to access child plans directly -— it's recommended /// to perform mutable operations via [`Self::update_plan_from_children`]. -/// After update `children`, please do the sync updating for `plan`'s children. -/// Or after creating the `PlanContext`, if you can't guarantee they are consistent, call `update_plan_from_children` to sync. +/// After mutating the `PlanContext.children`, or after creating the `PlanContext`, +/// call `update_plan_from_children` to sync. #[derive(Debug)] pub struct PlanContext { /// The execution plan associated with this context. @@ -63,6 +69,8 @@ impl PlanContext { } } + /// Update the [`PlanContext.plan.children()`] from the [`PlanContext.children`], + /// if the `PlanContext.children` have been changed. pub fn update_plan_from_children(mut self) -> Result { let children_plans = self.children.iter().map(|c| Arc::clone(&c.plan)).collect(); self.plan = with_new_children_if_necessary(self.plan, children_plans)?;