diff --git a/datafusion/core/src/physical_optimizer/enforce_sorting.rs b/datafusion/core/src/physical_optimizer/enforce_sorting.rs
index cf9d33252ad9d..faf8d01a97fd9 100644
--- a/datafusion/core/src/physical_optimizer/enforce_sorting.rs
+++ b/datafusion/core/src/physical_optimizer/enforce_sorting.rs
@@ -621,6 +621,7 @@ mod tests {
         limit_exec, local_limit_exec, memory_exec, parquet_exec, parquet_exec_sorted,
         repartition_exec, sort_exec, sort_expr, sort_expr_options, sort_merge_join_exec,
         sort_preserving_merge_exec, spr_repartition_exec, union_exec,
+        RequirementsTestExec,
     };
     use crate::physical_plan::{displayable, get_plan_string, Partitioning};
     use crate::prelude::{SessionConfig, SessionContext};
@@ -2346,4 +2347,67 @@ mod tests {
         assert_optimized!(expected_input, expected_no_change, physical_plan, true);
         Ok(())
     }
+
+    #[tokio::test]
+    async fn test_push_with_required_input_ordering_prohibited() -> Result<()> {
+        // SortExec: expr=[b]            <-- can't push this down
+        //   RequiredInputOrder expr=[a] <-- this requires input sorted by a, and preserves the input order
+        //     SortExec: expr=[a]
+        //       MemoryExec
+        let schema = create_test_schema3()?;
+        let sort_exprs_a = vec![sort_expr("a", &schema)];
+        let sort_exprs_b = vec![sort_expr("b", &schema)];
+        let plan = memory_exec(&schema);
+        let plan = sort_exec(sort_exprs_a.clone(), plan);
+        let plan = RequirementsTestExec::new(plan)
+            .with_required_input_ordering(sort_exprs_a)
+            .with_maintains_input_order(true)
+            .into_arc();
+        let plan = sort_exec(sort_exprs_b, plan);
+
+        let expected_input = [
+            "SortExec: expr=[b@1 ASC], preserve_partitioning=[false]",
+            "  RequiredInputOrderingExec",
+            "    SortExec: expr=[a@0 ASC], preserve_partitioning=[false]",
+            "      MemoryExec: partitions=1, partition_sizes=[0]",
+        ];
+        // should not be able to push down the sort
+        let expected_no_change = expected_input;
+        assert_optimized!(expected_input, expected_no_change, plan, true);
+        Ok(())
+    }
+
+    // test when the required input ordering is satisfied so could push through
+    #[tokio::test]
+    async fn test_push_with_required_input_ordering_allowed() -> Result<()> {
+        // SortExec: expr=[a,b]          <-- can push this down (as it is compatible with the required input ordering)
+        //   RequiredInputOrder expr=[a] <-- this requires input sorted by a, and preserves the input order
+        //     SortExec: expr=[a]
+        //       MemoryExec
+        let schema = create_test_schema3()?;
+        let sort_exprs_a = vec![sort_expr("a", &schema)];
+        let sort_exprs_ab = vec![sort_expr("a", &schema), sort_expr("b", &schema)];
+        let plan = memory_exec(&schema);
+        let plan = sort_exec(sort_exprs_a.clone(), plan);
+        let plan = RequirementsTestExec::new(plan)
+            .with_required_input_ordering(sort_exprs_a)
+            .with_maintains_input_order(true)
+            .into_arc();
+        let plan = sort_exec(sort_exprs_ab, plan);
+
+        let expected_input = [
+            "SortExec: expr=[a@0 ASC,b@1 ASC], preserve_partitioning=[false]",
+            "  RequiredInputOrderingExec",
+            "    SortExec: expr=[a@0 ASC], preserve_partitioning=[false]",
+            "      MemoryExec: partitions=1, partition_sizes=[0]",
+        ];
+        // should be able to push down the sort
+        let expected = [
+            "RequiredInputOrderingExec",
+            "  SortExec: expr=[a@0 ASC,b@1 ASC], preserve_partitioning=[false]",
+            "    MemoryExec: partitions=1, partition_sizes=[0]",
+        ];
+        assert_optimized!(expected_input, expected, plan, true);
+        Ok(())
+    }
 }
diff --git a/datafusion/core/src/physical_optimizer/sort_pushdown.rs b/datafusion/core/src/physical_optimizer/sort_pushdown.rs
index 36ac4b22d5942..3577e109b0697 100644
--- a/datafusion/core/src/physical_optimizer/sort_pushdown.rs
+++ b/datafusion/core/src/physical_optimizer/sort_pushdown.rs
@@ -176,6 +176,7 @@ fn pushdown_requirement_to_children(
         || plan.as_any().is::()
         || is_limit(plan)
         || plan.as_any().is::()
+        || pushdown_would_violate_requirements(parent_required, plan.as_ref())
     {
         // If the current plan is a leaf node or can not maintain any of the input ordering, can not pushed down requirements.
         // For RepartitionExec, we always choose to not push down the sort requirements even the RepartitionExec(input_partition=1) could maintain input ordering.
@@ -211,6 +212,29 @@ fn pushdown_requirement_to_children(
     // TODO: Add support for Projection push down
 }
 
+/// Return true if pushing the sort requirements through a node would violate
+/// the input sorting requirements for the plan
+fn pushdown_would_violate_requirements(
+    parent_required: LexRequirementRef,
+    child: &dyn ExecutionPlan,
+) -> bool {
+    child
+        .required_input_ordering()
+        .iter()
+        .any(|child_required| {
+            let Some(child_required) = child_required.as_ref() else {
+                // no requirements, so pushing down would not violate anything
+                return false;
+            };
+            // check if the plan's requirements would still be satisfied if we pushed
+            // down the parent requirements
+            child_required
+                .iter()
+                .zip(parent_required.iter())
+                .all(|(c, p)| !c.compatible(p))
+        })
+}
+
 /// Determine children requirements:
 /// - If children requirements are more specific, do not push down parent
 /// requirements.
diff --git a/datafusion/core/src/physical_optimizer/test_utils.rs b/datafusion/core/src/physical_optimizer/test_utils.rs
index 5320938d2eb88..55a0fa8145527 100644
--- a/datafusion/core/src/physical_optimizer/test_utils.rs
+++ b/datafusion/core/src/physical_optimizer/test_utils.rs
@@ -17,6 +17,8 @@
 
 //! Collection of testing utility functions that are leveraged by the query optimizer rules
 
+use std::any::Any;
+use std::fmt::Formatter;
 use std::sync::Arc;
 
 use crate::datasource::listing::PartitionedFile;
@@ -47,10 +49,14 @@ use datafusion_expr::{WindowFrame, WindowFunctionDefinition};
 use datafusion_functions_aggregate::count::count_udaf;
 use datafusion_physical_expr::expressions::col;
 use datafusion_physical_expr::{PhysicalExpr, PhysicalSortExpr};
-use datafusion_physical_plan::displayable;
 use datafusion_physical_plan::tree_node::PlanContext;
+use datafusion_physical_plan::{
+    displayable, DisplayAs, DisplayFormatType, PlanProperties,
+};
 
 use async_trait::async_trait;
+use datafusion_execution::{SendableRecordBatchStream, TaskContext};
+use datafusion_physical_expr_common::sort_expr::PhysicalSortRequirement;
 
 async fn register_current_csv(
     ctx: &SessionContext,
@@ -354,6 +360,97 @@ pub fn sort_exec(
     Arc::new(SortExec::new(sort_exprs, input))
 }
 
+/// A test [`ExecutionPlan`] whose requirements can be configured.
+#[derive(Debug)]
+pub struct RequirementsTestExec {
+    required_input_ordering: Vec<PhysicalSortExpr>,
+    maintains_input_order: bool,
+    input: Arc<dyn ExecutionPlan>,
+}
+
+impl RequirementsTestExec {
+    pub fn new(input: Arc<dyn ExecutionPlan>) -> Self {
+        Self {
+            required_input_ordering: vec![],
+            maintains_input_order: true,
+            input,
+        }
+    }
+
+    /// sets the required input ordering
+    pub fn with_required_input_ordering(
+        mut self,
+        required_input_ordering: Vec<PhysicalSortExpr>,
+    ) -> Self {
+        self.required_input_ordering = required_input_ordering;
+        self
+    }
+
+    /// set the maintains_input_order flag
+    pub fn with_maintains_input_order(mut self, maintains_input_order: bool) -> Self {
+        self.maintains_input_order = maintains_input_order;
+        self
+    }
+
+    /// returns this ExecutionPlan as an Arc<dyn ExecutionPlan>
+    pub fn into_arc(self) -> Arc<dyn ExecutionPlan> {
+        Arc::new(self)
+    }
+}
+
+impl DisplayAs for RequirementsTestExec {
+    fn fmt_as(&self, _t: DisplayFormatType, f: &mut Formatter) -> std::fmt::Result {
+        write!(f, "RequiredInputOrderingExec")
+    }
+}
+
+impl ExecutionPlan for RequirementsTestExec {
+    fn name(&self) -> &str {
+        "RequiredInputOrderingExec"
+    }
+
+    fn as_any(&self) -> &dyn Any {
+        self
+    }
+
+    fn properties(&self) -> &PlanProperties {
+        self.input.properties()
+    }
+
+    fn required_input_ordering(&self) -> Vec<Option<Vec<PhysicalSortRequirement>>> {
+        let requirement =
+            PhysicalSortRequirement::from_sort_exprs(&self.required_input_ordering);
+        vec![Some(requirement)]
+    }
+
+    fn maintains_input_order(&self) -> Vec<bool> {
+        vec![self.maintains_input_order]
+    }
+
+    fn children(&self) -> Vec<&Arc<dyn ExecutionPlan>> {
+        vec![&self.input]
+    }
+
+    fn with_new_children(
+        self: Arc<Self>,
+        children: Vec<Arc<dyn ExecutionPlan>>,
+    ) -> Result<Arc<dyn ExecutionPlan>> {
+        assert_eq!(children.len(), 1);
+        Ok(RequirementsTestExec::new(children[0].clone())
+            .with_required_input_ordering(self.required_input_ordering.clone())
+            .with_maintains_input_order(self.maintains_input_order)
+            .into_arc())
+    }
+
+    fn execute(
+        &self,
+        _partition: usize,
+        _context: Arc<TaskContext>,
+    ) -> Result<SendableRecordBatchStream> {
+        unimplemented!("Test exec does not support execution")
+    }
+}
+
 /// A [`PlanContext`] object is susceptible to being left in an inconsistent state after
 /// untested mutable operations. It is crucial that there be no discrepancies between a plan
 /// associated with the root node and the plan generated after traversing all nodes