diff --git a/datafusion/core/tests/physical_optimizer/enforce_sorting.rs b/datafusion/core/tests/physical_optimizer/enforce_sorting.rs index 9ca1a430e8c9..719f9cec373e 100644 --- a/datafusion/core/tests/physical_optimizer/enforce_sorting.rs +++ b/datafusion/core/tests/physical_optimizer/enforce_sorting.rs @@ -20,18 +20,19 @@ use std::sync::Arc; use crate::physical_optimizer::test_utils::{ aggregate_exec, bounded_window_exec, check_integrity, coalesce_batches_exec, coalesce_partitions_exec, create_test_schema, create_test_schema2, - create_test_schema3, create_test_schema4, filter_exec, global_limit_exec, - hash_join_exec, limit_exec, local_limit_exec, memory_exec, parquet_exec, - repartition_exec, sort_exec, sort_expr, sort_expr_options, sort_merge_join_exec, - sort_preserving_merge_exec, sort_preserving_merge_exec_with_fetch, - spr_repartition_exec, stream_exec_ordered, union_exec, RequirementsTestExec, + create_test_schema3, filter_exec, global_limit_exec, hash_join_exec, limit_exec, + local_limit_exec, memory_exec, parquet_exec, repartition_exec, sort_exec, sort_expr, + sort_expr_options, sort_merge_join_exec, sort_preserving_merge_exec, + sort_preserving_merge_exec_with_fetch, spr_repartition_exec, stream_exec_ordered, + union_exec, RequirementsTestExec, }; use datafusion_physical_plan::{displayable, InputOrderMode}; use arrow::compute::SortOptions; use arrow::datatypes::SchemaRef; -use datafusion_common::Result; -use datafusion_expr::JoinType; +use arrow_schema::DataType; +use datafusion_common::{Result, ScalarValue}; +use datafusion_expr::{JoinType, WindowFrame, WindowFrameBound, WindowFrameUnits, WindowFunctionDefinition}; use datafusion_physical_expr::expressions::{col, Column, NotExpr}; use datafusion_physical_optimizer::PhysicalOptimizerRule; use datafusion_physical_expr::Partitioning; diff --git a/datafusion/core/tests/physical_optimizer/test_utils.rs b/datafusion/core/tests/physical_optimizer/test_utils.rs index f55b61d5b85d..6d497f642062 100644 --- a/datafusion/core/tests/physical_optimizer/test_utils.rs +++ b/datafusion/core/tests/physical_optimizer/test_utils.rs @@ -194,33 +194,20 @@ pub fn bounded_window_exec( col_name: &str, sort_exprs: impl IntoIterator, input: Arc, -) -> Arc { - bounded_window_exec_with_partition(col_name, sort_exprs, &[], input, false) -} - -pub fn bounded_window_exec_with_partition( - col_name: &str, - sort_exprs: impl IntoIterator, - partition_by: &[Arc], - input: Arc, - should_reverse: bool, ) -> Arc { let sort_exprs: LexOrdering = sort_exprs.into_iter().collect(); let schema = input.schema(); - let mut window_expr = create_window_expr( + let window_expr = create_window_expr( &WindowFunctionDefinition::AggregateUDF(count_udaf()), "count".to_owned(), &[col(col_name, &schema).unwrap()], - partition_by, + &[], sort_exprs.as_ref(), Arc::new(WindowFrame::new(Some(false))), schema.as_ref(), false, ) .unwrap(); - if should_reverse { - window_expr = window_expr.get_reverse_expr().unwrap(); - } Arc::new( BoundedWindowAggExec::try_new( diff --git a/datafusion/physical-expr/src/equivalence/ordering.rs b/datafusion/physical-expr/src/equivalence/ordering.rs index 6497e0ec46a0..e505a9be5be1 100644 --- a/datafusion/physical-expr/src/equivalence/ordering.rs +++ b/datafusion/physical-expr/src/equivalence/ordering.rs @@ -243,11 +243,7 @@ impl OrderingEquivalenceClass { count += 1; } } - if count == 4 { - true - } else { - false - } + count == 4 } } diff --git a/datafusion/physical-expr/src/equivalence/properties.rs b/datafusion/physical-expr/src/equivalence/properties.rs index c5cd2fde4ef1..3e198320a4e2 100755 --- a/datafusion/physical-expr/src/equivalence/properties.rs +++ b/datafusion/physical-expr/src/equivalence/properties.rs @@ -1464,11 +1464,10 @@ fn update_properties( let normalized_expr = eq_properties .eq_group .normalize_expr(Arc::clone(&node.expr)); - if eq_properties.is_expr_constant(&normalized_expr) { - node.data.sort_properties = SortProperties::Singleton; - } else if eq_properties - .normalized_oeq_class() - .singleton_options(&normalized_expr) + if eq_properties.is_expr_constant(&normalized_expr) + || eq_properties + .normalized_oeq_class() + .singleton_options(&normalized_expr) { node.data.sort_properties = SortProperties::Singleton; } else if let Some(options) = eq_properties diff --git a/datafusion/physical-optimizer/src/enforce_sorting/sort_pushdown.rs b/datafusion/physical-optimizer/src/enforce_sorting/sort_pushdown.rs index 123c89429ce8..2abe259a5f55 100644 --- a/datafusion/physical-optimizer/src/enforce_sorting/sort_pushdown.rs +++ b/datafusion/physical-optimizer/src/enforce_sorting/sort_pushdown.rs @@ -24,14 +24,13 @@ use crate::utils::{ use arrow::datatypes::SchemaRef; use datafusion_common::tree_node::{ - ConcreteTreeNode, Transformed, TreeNode, TreeNodeRecursion, TreeNodeVisitor, + ConcreteTreeNode, Transformed, TreeNode, TreeNodeRecursion, }; use datafusion_common::{plan_err, HashSet, JoinSide, Result}; use datafusion_expr::JoinType; use datafusion_physical_expr::expressions::Column; use datafusion_physical_expr::utils::collect_columns; use datafusion_physical_expr::PhysicalSortRequirement; -use datafusion_physical_expr_common::physical_expr::PhysicalExpr; use datafusion_physical_expr_common::sort_expr::{LexOrdering, LexRequirement}; use datafusion_physical_plan::filter::FilterExec; use datafusion_physical_plan::joins::utils::{ diff --git a/datafusion/physical-plan/src/windows/mod.rs b/datafusion/physical-plan/src/windows/mod.rs index e169ba9e99dd..1679652b0c81 100644 --- a/datafusion/physical-plan/src/windows/mod.rs +++ b/datafusion/physical-plan/src/windows/mod.rs @@ -37,6 +37,7 @@ use crate::{ }; use arrow::datatypes::{DataType, Field, Schema, SchemaRef}; +use arrow_schema::SortOptions; use datafusion_common::{exec_err, Result}; use datafusion_expr::{ PartitionEvaluator, ReversedUDWF, SetMonotonicity, WindowFrame, @@ -359,7 +360,7 @@ pub(crate) fn window_equivalence_properties( partition_by_order.into_iter().multi_cartesian_product(); let mut all_lexs = all_orders_cartesian .into_iter() - .map(|inner| LexOrdering::new(inner)) + .map(LexOrdering::new) .collect::>(); if !expr.partition_by().is_empty() && all_lexs @@ -498,15 +499,15 @@ pub(crate) fn window_equivalence_properties( .map(|pb| { vec![ PhysicalSortExpr::new( - pb.clone(), + Arc::clone(&pb), SortOptions::new(false, false), ), PhysicalSortExpr::new( - pb.clone(), + Arc::clone(&pb), SortOptions::new(false, true), ), PhysicalSortExpr::new( - pb.clone(), + Arc::clone(&pb), SortOptions::new(true, false), ), PhysicalSortExpr::new(pb, SortOptions::new(true, true)),