diff --git a/Cargo.toml b/Cargo.toml index f61ed7e58fe37..24bde78b3001b 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -29,6 +29,7 @@ members = [ "datafusion/optimizer", "datafusion/physical-expr-common", "datafusion/physical-expr", + "datafusion/physical-optimizer", "datafusion/physical-plan", "datafusion/proto", "datafusion/proto/gen", @@ -97,6 +98,7 @@ datafusion-functions-array = { path = "datafusion/functions-array", version = "4 datafusion-optimizer = { path = "datafusion/optimizer", version = "40.0.0", default-features = false } datafusion-physical-expr = { path = "datafusion/physical-expr", version = "40.0.0", default-features = false } datafusion-physical-expr-common = { path = "datafusion/physical-expr-common", version = "40.0.0", default-features = false } +datafusion-physical-optimizer = { path = "datafusion/physical-optimizer", version = "40.0.0" } datafusion-physical-plan = { path = "datafusion/physical-plan", version = "40.0.0" } datafusion-proto = { path = "datafusion/proto", version = "40.0.0" } datafusion-proto-common = { path = "datafusion/proto-common", version = "40.0.0" } diff --git a/datafusion-cli/Cargo.lock b/datafusion-cli/Cargo.lock index e48c6b081e1a5..cdf0e7f573163 100644 --- a/datafusion-cli/Cargo.lock +++ b/datafusion-cli/Cargo.lock @@ -387,7 +387,7 @@ checksum = "6e0c28dcc82d7c8ead5cb13beb15405b57b8546e93215673ff8ca0349a028107" dependencies = [ "proc-macro2", "quote", - "syn 2.0.70", + "syn 2.0.71", ] [[package]] @@ -772,9 +772,9 @@ dependencies = [ [[package]] name = "blake3" -version = "1.5.1" +version = "1.5.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "30cca6d3674597c30ddf2c587bf8d9d65c9a84d2326d941cc79c9842dfe0ef52" +checksum = "e9ec96fe9a81b5e365f9db71fe00edc4fe4ca2cc7dcb7861f0603012a7caa210" dependencies = [ "arrayref", "arrayvec", @@ -838,9 +838,9 @@ checksum = "1fd0f2584146f6f2ef48085050886acf353beff7305ebd1ae69500e27c67f64b" [[package]] name = "bytes" -version = "1.6.0" +version = "1.6.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "514de17de45fdb8dc022b1a7975556c53c86f9f0aa5f534b98977b171857c2c9" +checksum = "a12916984aab3fa6e39d655a33e09c0071eb36d6ab3aea5c2d78551f1df6d952" [[package]] name = "bytes-utils" @@ -875,13 +875,12 @@ dependencies = [ [[package]] name = "cc" -version = "1.1.0" +version = "1.1.5" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "eaff6f8ce506b9773fa786672d63fc7a191ffea1be33f72bbd4aeacefca9ffc8" +checksum = "324c74f2155653c90b04f25b2a47a8a631360cb908f92a772695f430c7e31052" dependencies = [ "jobserver", "libc", - "once_cell", ] [[package]] @@ -1105,7 +1104,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "edb49164822f3ee45b17acd4a208cfc1251410cf0cad9a833234c9890774dd9f" dependencies = [ "quote", - "syn 2.0.70", + "syn 2.0.71", ] [[package]] @@ -1154,6 +1153,7 @@ dependencies = [ "datafusion-optimizer", "datafusion-physical-expr", "datafusion-physical-expr-common", + "datafusion-physical-optimizer", "datafusion-physical-plan", "datafusion-sql", "flate2", @@ -1391,6 +1391,14 @@ dependencies = [ "rand", ] +[[package]] +name = "datafusion-physical-optimizer" +version = "40.0.0" +dependencies = [ + "datafusion-common", + "datafusion-physical-plan", +] + [[package]] name = "datafusion-physical-plan" version = "40.0.0" @@ -1694,7 +1702,7 @@ checksum = "87750cf4b7a4c0625b1529e4c543c2182106e4dedc60a2a6455e00d212c489ac" dependencies = [ "proc-macro2", "quote", - "syn 2.0.70", + "syn 2.0.71", ] [[package]] @@ -1908,9 +1916,9 @@ dependencies = [ [[package]] name = "http-body" -version = "1.0.0" +version = "1.0.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "1cac85db508abc24a2e48553ba12a996e87244a0395ce011e62b37158745d643" +checksum = "1efedce1fb8e6913f23e0c92de8e62cd5b772a67e7b3946df930a62566c93184" dependencies = [ "bytes", "http 1.1.0", @@ -1925,7 +1933,7 @@ dependencies = [ "bytes", "futures-util", "http 1.1.0", - "http-body 1.0.0", + "http-body 1.0.1", "pin-project-lite", ] @@ -1982,7 +1990,7 @@ dependencies = [ "futures-util", "h2 0.4.5", "http 1.1.0", - "http-body 1.0.0", + "http-body 1.0.1", "httparse", "itoa", "pin-project-lite", @@ -2034,7 +2042,7 @@ dependencies = [ "futures-channel", "futures-util", "http 1.1.0", - "http-body 1.0.0", + "http-body 1.0.1", "hyper 1.4.1", "pin-project-lite", "socket2", @@ -2707,7 +2715,7 @@ checksum = "2f38a4412a78282e09a2cf38d195ea5420d15ba0602cb375210efbc877243965" dependencies = [ "proc-macro2", "quote", - "syn 2.0.70", + "syn 2.0.71", ] [[package]] @@ -2917,9 +2925,9 @@ dependencies = [ [[package]] name = "redox_syscall" -version = "0.5.2" +version = "0.5.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c82cf8cff14456045f55ec4241383baeff27af886adb72ffb2162f99911de0fd" +checksum = "2a908a6e00f1fdd0dfd9c0eb08ce85126f6d8bbda50017e74bc4a4b7d4a926a4" dependencies = [ "bitflags 2.6.0", ] @@ -2982,7 +2990,7 @@ dependencies = [ "futures-util", "h2 0.4.5", "http 1.1.0", - "http-body 1.0.0", + "http-body 1.0.1", "http-body-util", "hyper 1.4.1", "hyper-rustls 0.27.2", @@ -3269,9 +3277,9 @@ dependencies = [ [[package]] name = "security-framework" -version = "2.11.0" +version = "2.11.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c627723fd09706bacdb5cf41499e95098555af3c3c29d014dc3c458ef6be11c0" +checksum = "897b2245f0b511c87893af39b033e5ca9cce68824c4d7e7630b5a1d339658d02" dependencies = [ "bitflags 2.6.0", "core-foundation", @@ -3282,9 +3290,9 @@ dependencies = [ [[package]] name = "security-framework-sys" -version = "2.11.0" +version = "2.11.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "317936bbbd05227752583946b9e66d7ce3b489f84e11a94a510b4437fef407d7" +checksum = "75da29fe9b9b08fe9d6b22b5b4bcbc75d8db3aa31e639aa56bb62e9d46bfceaf" dependencies = [ "core-foundation-sys", "libc", @@ -3319,7 +3327,7 @@ checksum = "e0cd7e117be63d3c3678776753929474f3b04a43a080c744d6b0ae2a8c28e222" dependencies = [ "proc-macro2", "quote", - "syn 2.0.70", + "syn 2.0.71", ] [[package]] @@ -3454,7 +3462,7 @@ checksum = "01b2e185515564f15375f593fb966b5718bc624ba77fe49fa4616ad619690554" dependencies = [ "proc-macro2", "quote", - "syn 2.0.70", + "syn 2.0.71", ] [[package]] @@ -3500,7 +3508,7 @@ dependencies = [ "proc-macro2", "quote", "rustversion", - "syn 2.0.70", + "syn 2.0.71", ] [[package]] @@ -3513,7 +3521,7 @@ dependencies = [ "proc-macro2", "quote", "rustversion", - "syn 2.0.70", + "syn 2.0.71", ] [[package]] @@ -3535,9 +3543,9 @@ dependencies = [ [[package]] name = "syn" -version = "2.0.70" +version = "2.0.71" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "2f0209b68b3613b093e0ec905354eccaedcfe83b8cb37cbdeae64026c3064c16" +checksum = "b146dcf730474b4bcd16c311627b31ede9ab149045db4d6088b3becaea046462" dependencies = [ "proc-macro2", "quote", @@ -3585,22 +3593,22 @@ checksum = "23d434d3f8967a09480fb04132ebe0a3e088c173e6d0ee7897abbdf4eab0f8b9" [[package]] name = "thiserror" -version = "1.0.61" +version = "1.0.62" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c546c80d6be4bc6a00c0f01730c08df82eaa7a7a61f11d656526506112cc1709" +checksum = "f2675633b1499176c2dff06b0856a27976a8f9d436737b4cf4f312d4d91d8bbb" dependencies = [ "thiserror-impl", ] [[package]] name = "thiserror-impl" -version = "1.0.61" +version = "1.0.62" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "46c3384250002a6d5af4d114f2845d37b57521033f30d5c3f46c4d70e1197533" +checksum = "d20468752b09f49e909e55a5d338caa8bedf615594e9d80bc4c565d30faf798c" dependencies = [ "proc-macro2", "quote", - "syn 2.0.70", + "syn 2.0.71", ] [[package]] @@ -3670,9 +3678,9 @@ checksum = "1f3ccbac311fea05f86f61904b462b55fb3df8837a366dfc601a0161d0532f20" [[package]] name = "tokio" -version = "1.38.0" +version = "1.38.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ba4f4a02a7a80d6f274636f0aa95c7e383b912d41fe721a31f29e29698585a4a" +checksum = "eb2caba9f80616f438e09748d5acda951967e1ea58508ef53d9c6402485a46df" dependencies = [ "backtrace", "bytes", @@ -3695,7 +3703,7 @@ checksum = "5f5ae998a069d4b5aba8ee9dad856af7d520c3699e6159b185c2acd48155d39a" dependencies = [ "proc-macro2", "quote", - "syn 2.0.70", + "syn 2.0.71", ] [[package]] @@ -3792,7 +3800,7 @@ checksum = "34704c8d6ebcbc939824180af020566b01a7c01f80641264eba0999f6c2b6be7" dependencies = [ "proc-macro2", "quote", - "syn 2.0.70", + "syn 2.0.71", ] [[package]] @@ -3837,7 +3845,7 @@ checksum = "f03ca4cb38206e2bef0700092660bb74d696f808514dae47fa1467cbfe26e96e" dependencies = [ "proc-macro2", "quote", - "syn 2.0.70", + "syn 2.0.71", ] [[package]] @@ -3991,7 +3999,7 @@ dependencies = [ "once_cell", "proc-macro2", "quote", - "syn 2.0.70", + "syn 2.0.71", "wasm-bindgen-shared", ] @@ -4025,7 +4033,7 @@ checksum = "e94f17b526d0a461a191c78ea52bbce64071ed5c04c9ffe424dcb38f74171bb7" dependencies = [ "proc-macro2", "quote", - "syn 2.0.70", + "syn 2.0.71", "wasm-bindgen-backend", "wasm-bindgen-shared", ] @@ -4290,7 +4298,7 @@ checksum = "fa4f8080344d4671fb4e831a13ad1e68092748387dfc4f55e356242fae12ce3e" dependencies = [ "proc-macro2", "quote", - "syn 2.0.70", + "syn 2.0.71", ] [[package]] diff --git a/datafusion/core/Cargo.toml b/datafusion/core/Cargo.toml index 532ca8fde9e73..c937a6f6e59a9 100644 --- a/datafusion/core/Cargo.toml +++ b/datafusion/core/Cargo.toml @@ -106,6 +106,7 @@ datafusion-functions-array = { workspace = true, optional = true } datafusion-optimizer = { workspace = true } datafusion-physical-expr = { workspace = true } datafusion-physical-expr-common = { workspace = true } +datafusion-physical-optimizer = { workspace = true } datafusion-physical-plan = { workspace = true } datafusion-sql = { workspace = true } flate2 = { version = "1.0.24", optional = true } diff --git a/datafusion/core/src/execution/session_state.rs b/datafusion/core/src/execution/session_state.rs index bc5062893c867..0824b249b7d15 100644 --- a/datafusion/core/src/execution/session_state.rs +++ b/datafusion/core/src/execution/session_state.rs @@ -39,7 +39,6 @@ use crate::execution::context::{EmptySerializerRegistry, FunctionFactory, QueryP #[cfg(feature = "array_expressions")] use crate::functions_array; use crate::physical_optimizer::optimizer::PhysicalOptimizer; -use crate::physical_optimizer::PhysicalOptimizerRule; use crate::physical_planner::{DefaultPhysicalPlanner, PhysicalPlanner}; use crate::{functions, functions_aggregate}; use arrow_schema::{DataType, SchemaRef}; @@ -74,6 +73,7 @@ use datafusion_optimizer::{ }; use datafusion_physical_expr::create_physical_expr; use datafusion_physical_expr_common::physical_expr::PhysicalExpr; +use datafusion_physical_optimizer::PhysicalOptimizerRule; use datafusion_physical_plan::ExecutionPlan; use datafusion_sql::parser::{DFParser, Statement}; use datafusion_sql::planner::{ContextProvider, ParserOptions, PlannerContext, SqlToRel}; diff --git a/datafusion/core/src/lib.rs b/datafusion/core/src/lib.rs index 81c1c4629a3ad..9b9b1db8ff817 100644 --- a/datafusion/core/src/lib.rs +++ b/datafusion/core/src/lib.rs @@ -199,7 +199,7 @@ //! [`QueryPlanner`]: execution::context::QueryPlanner //! [`OptimizerRule`]: datafusion_optimizer::optimizer::OptimizerRule //! [`AnalyzerRule`]: datafusion_optimizer::analyzer::AnalyzerRule -//! [`PhysicalOptimizerRule`]: crate::physical_optimizer::optimizer::PhysicalOptimizerRule +//! [`PhysicalOptimizerRule`]: crate::physical_optimizer::PhysicalOptimizerRule //! //! ## Query Planning and Execution Overview //! diff --git a/datafusion/core/src/physical_optimizer/aggregate_statistics.rs b/datafusion/core/src/physical_optimizer/aggregate_statistics.rs index 66067d8cb5c42..e412d814239d1 100644 --- a/datafusion/core/src/physical_optimizer/aggregate_statistics.rs +++ b/datafusion/core/src/physical_optimizer/aggregate_statistics.rs @@ -18,7 +18,6 @@ //! Utilizing exact statistics from sources to avoid scanning data use std::sync::Arc; -use super::optimizer::PhysicalOptimizerRule; use crate::config::ConfigOptions; use crate::error::Result; use crate::physical_plan::aggregates::AggregateExec; @@ -29,6 +28,7 @@ use crate::scalar::ScalarValue; use datafusion_common::stats::Precision; use datafusion_common::tree_node::{Transformed, TransformedResult, TreeNode}; use datafusion_expr::utils::COUNT_STAR_EXPANSION; +use datafusion_physical_optimizer::PhysicalOptimizerRule; use datafusion_physical_plan::placeholder_row::PlaceholderRowExec; use datafusion_physical_plan::udaf::AggregateFunctionExpr; diff --git a/datafusion/core/src/physical_optimizer/coalesce_batches.rs b/datafusion/core/src/physical_optimizer/coalesce_batches.rs index 42b7463600dcb..da0e44c8de4e3 100644 --- a/datafusion/core/src/physical_optimizer/coalesce_batches.rs +++ b/datafusion/core/src/physical_optimizer/coalesce_batches.rs @@ -23,7 +23,6 @@ use std::sync::Arc; use crate::{ config::ConfigOptions, error::Result, - physical_optimizer::PhysicalOptimizerRule, physical_plan::{ coalesce_batches::CoalesceBatchesExec, filter::FilterExec, joins::HashJoinExec, repartition::RepartitionExec, Partitioning, @@ -31,6 +30,7 @@ use crate::{ }; use datafusion_common::tree_node::{Transformed, TransformedResult, TreeNode}; +use datafusion_physical_optimizer::PhysicalOptimizerRule; /// Optimizer rule that introduces CoalesceBatchesExec to avoid overhead with small batches that /// are produced by highly selective filters diff --git a/datafusion/core/src/physical_optimizer/combine_partial_final_agg.rs b/datafusion/core/src/physical_optimizer/combine_partial_final_agg.rs index 56cdbd645285d..29148a594f31c 100644 --- a/datafusion/core/src/physical_optimizer/combine_partial_final_agg.rs +++ b/datafusion/core/src/physical_optimizer/combine_partial_final_agg.rs @@ -21,13 +21,13 @@ use std::sync::Arc; use crate::error::Result; -use crate::physical_optimizer::PhysicalOptimizerRule; use crate::physical_plan::aggregates::{AggregateExec, AggregateMode, PhysicalGroupBy}; use crate::physical_plan::ExecutionPlan; use datafusion_common::config::ConfigOptions; use datafusion_common::tree_node::{Transformed, TransformedResult, TreeNode}; use datafusion_physical_expr::{physical_exprs_equal, AggregateExpr, PhysicalExpr}; +use datafusion_physical_optimizer::PhysicalOptimizerRule; /// CombinePartialFinalAggregate optimizer rule combines the adjacent Partial and Final AggregateExecs /// into a Single AggregateExec if their grouping exprs and aggregate exprs equal. diff --git a/datafusion/core/src/physical_optimizer/enforce_distribution.rs b/datafusion/core/src/physical_optimizer/enforce_distribution.rs index 818b2304fe097..afed5dd375351 100644 --- a/datafusion/core/src/physical_optimizer/enforce_distribution.rs +++ b/datafusion/core/src/physical_optimizer/enforce_distribution.rs @@ -31,7 +31,6 @@ use crate::physical_optimizer::utils::{ add_sort_above_with_check, is_coalesce_partitions, is_repartition, is_sort_preserving_merge, }; -use crate::physical_optimizer::PhysicalOptimizerRule; use crate::physical_plan::aggregates::{AggregateExec, AggregateMode, PhysicalGroupBy}; use crate::physical_plan::coalesce_partitions::CoalescePartitionsExec; use crate::physical_plan::joins::{ @@ -56,6 +55,7 @@ use datafusion_physical_expr::{ use datafusion_physical_plan::windows::{get_best_fitting_window, BoundedWindowAggExec}; use datafusion_physical_plan::ExecutionPlanProperties; +use datafusion_physical_optimizer::PhysicalOptimizerRule; use itertools::izip; /// The `EnforceDistribution` rule ensures that distribution requirements are diff --git a/datafusion/core/src/physical_optimizer/enforce_sorting.rs b/datafusion/core/src/physical_optimizer/enforce_sorting.rs index 24306647c6867..e577c5336086a 100644 --- a/datafusion/core/src/physical_optimizer/enforce_sorting.rs +++ b/datafusion/core/src/physical_optimizer/enforce_sorting.rs @@ -49,7 +49,6 @@ use crate::physical_optimizer::utils::{ is_coalesce_partitions, is_limit, is_repartition, is_sort, is_sort_preserving_merge, is_union, is_window, }; -use crate::physical_optimizer::PhysicalOptimizerRule; use crate::physical_plan::coalesce_partitions::CoalescePartitionsExec; use crate::physical_plan::sorts::sort::SortExec; use crate::physical_plan::sorts::sort_preserving_merge::SortPreservingMergeExec; @@ -66,6 +65,7 @@ use datafusion_physical_plan::repartition::RepartitionExec; use datafusion_physical_plan::sorts::partial_sort::PartialSortExec; use datafusion_physical_plan::ExecutionPlanProperties; +use datafusion_physical_optimizer::PhysicalOptimizerRule; use itertools::izip; /// This rule inspects [`SortExec`]'s in the given physical plan and removes the @@ -631,6 +631,7 @@ mod tests { use datafusion_expr::JoinType; use datafusion_physical_expr::expressions::{col, Column, NotExpr}; + use datafusion_physical_optimizer::PhysicalOptimizerRule; use rstest::rstest; fn create_test_schema() -> Result { diff --git a/datafusion/core/src/physical_optimizer/join_selection.rs b/datafusion/core/src/physical_optimizer/join_selection.rs index 6dfe17632a653..b849df88e4aaf 100644 --- a/datafusion/core/src/physical_optimizer/join_selection.rs +++ b/datafusion/core/src/physical_optimizer/join_selection.rs @@ -27,7 +27,6 @@ use std::sync::Arc; use crate::config::ConfigOptions; use crate::error::Result; -use crate::physical_optimizer::PhysicalOptimizerRule; use crate::physical_plan::joins::utils::{ColumnIndex, JoinFilter}; use crate::physical_plan::joins::{ CrossJoinExec, HashJoinExec, NestedLoopJoinExec, PartitionMode, @@ -42,6 +41,7 @@ use datafusion_common::{internal_err, JoinSide, JoinType}; use datafusion_expr::sort_properties::SortProperties; use datafusion_physical_expr::expressions::Column; use datafusion_physical_expr::{PhysicalExpr, PhysicalSortExpr}; +use datafusion_physical_optimizer::PhysicalOptimizerRule; /// The [`JoinSelection`] rule tries to modify a given plan so that it can /// accommodate infinite sources and optimize joins in the plan according to diff --git a/datafusion/core/src/physical_optimizer/limited_distinct_aggregation.rs b/datafusion/core/src/physical_optimizer/limited_distinct_aggregation.rs index f9d5a4c186eee..b5d3f432d84d0 100644 --- a/datafusion/core/src/physical_optimizer/limited_distinct_aggregation.rs +++ b/datafusion/core/src/physical_optimizer/limited_distinct_aggregation.rs @@ -20,7 +20,6 @@ use std::sync::Arc; -use crate::physical_optimizer::PhysicalOptimizerRule; use crate::physical_plan::aggregates::AggregateExec; use crate::physical_plan::limit::{GlobalLimitExec, LocalLimitExec}; use crate::physical_plan::{ExecutionPlan, ExecutionPlanProperties}; @@ -29,6 +28,7 @@ use datafusion_common::config::ConfigOptions; use datafusion_common::tree_node::{Transformed, TransformedResult, TreeNode}; use datafusion_common::Result; +use datafusion_physical_optimizer::PhysicalOptimizerRule; use itertools::Itertools; /// An optimizer rule that passes a `limit` hint into grouped aggregations which don't require all diff --git a/datafusion/core/src/physical_optimizer/mod.rs b/datafusion/core/src/physical_optimizer/mod.rs index 9ad05bf496e59..582f340151ae5 100644 --- a/datafusion/core/src/physical_optimizer/mod.rs +++ b/datafusion/core/src/physical_optimizer/mod.rs @@ -42,4 +42,4 @@ mod utils; #[cfg(test)] pub mod test_utils; -pub use optimizer::PhysicalOptimizerRule; +pub use datafusion_physical_optimizer::*; diff --git a/datafusion/core/src/physical_optimizer/optimizer.rs b/datafusion/core/src/physical_optimizer/optimizer.rs index 2d9744ad23dd3..6449dbea0ddf0 100644 --- a/datafusion/core/src/physical_optimizer/optimizer.rs +++ b/datafusion/core/src/physical_optimizer/optimizer.rs @@ -17,11 +17,11 @@ //! Physical optimizer traits +use datafusion_physical_optimizer::PhysicalOptimizerRule; use std::sync::Arc; use super::projection_pushdown::ProjectionPushdown; use super::update_aggr_exprs::OptimizeAggregateOrder; -use crate::config::ConfigOptions; use crate::physical_optimizer::aggregate_statistics::AggregateStatistics; use crate::physical_optimizer::coalesce_batches::CoalesceBatches; use crate::physical_optimizer::combine_partial_final_agg::CombinePartialFinalAggregate; @@ -32,32 +32,6 @@ use crate::physical_optimizer::limited_distinct_aggregation::LimitedDistinctAggr use crate::physical_optimizer::output_requirements::OutputRequirements; use crate::physical_optimizer::sanity_checker::SanityCheckPlan; use crate::physical_optimizer::topk_aggregation::TopKAggregation; -use crate::{error::Result, physical_plan::ExecutionPlan}; - -/// `PhysicalOptimizerRule` transforms one ['ExecutionPlan'] into another which -/// computes the same results, but in a potentially more efficient way. -/// -/// Use [`SessionState::add_physical_optimizer_rule`] to register additional -/// `PhysicalOptimizerRule`s. -/// -/// [`SessionState::add_physical_optimizer_rule`]: https://docs.rs/datafusion/latest/datafusion/execution/session_state/struct.SessionState.html#method.add_physical_optimizer_rule -pub trait PhysicalOptimizerRule { - /// Rewrite `plan` to an optimized form - fn optimize( - &self, - plan: Arc, - config: &ConfigOptions, - ) -> Result>; - - /// A human readable name for this optimizer rule - fn name(&self) -> &str; - - /// A flag to indicate whether the physical planner should valid the rule will not - /// change the schema of the plan after the rewriting. - /// Some of the optimization rules might change the nullable properties of the schema - /// and should disable the schema check. - fn schema_check(&self) -> bool; -} /// A rule-based physical optimizer. #[derive(Clone)] diff --git a/datafusion/core/src/physical_optimizer/output_requirements.rs b/datafusion/core/src/physical_optimizer/output_requirements.rs index 671bb437d5fa2..cb9a0cb90e6c7 100644 --- a/datafusion/core/src/physical_optimizer/output_requirements.rs +++ b/datafusion/core/src/physical_optimizer/output_requirements.rs @@ -24,7 +24,6 @@ use std::sync::Arc; -use crate::physical_optimizer::PhysicalOptimizerRule; use crate::physical_plan::sorts::sort::SortExec; use crate::physical_plan::{DisplayAs, DisplayFormatType, ExecutionPlan}; @@ -32,6 +31,7 @@ use datafusion_common::config::ConfigOptions; use datafusion_common::tree_node::{Transformed, TransformedResult, TreeNode}; use datafusion_common::{Result, Statistics}; use datafusion_physical_expr::{Distribution, LexRequirement, PhysicalSortRequirement}; +use datafusion_physical_optimizer::PhysicalOptimizerRule; use datafusion_physical_plan::sorts::sort_preserving_merge::SortPreservingMergeExec; use datafusion_physical_plan::{ExecutionPlanProperties, PlanProperties}; diff --git a/datafusion/core/src/physical_optimizer/projection_pushdown.rs b/datafusion/core/src/physical_optimizer/projection_pushdown.rs index 24d9f31687f92..84f898431762b 100644 --- a/datafusion/core/src/physical_optimizer/projection_pushdown.rs +++ b/datafusion/core/src/physical_optimizer/projection_pushdown.rs @@ -24,7 +24,6 @@ use std::collections::HashMap; use std::sync::Arc; use super::output_requirements::OutputRequirementExec; -use super::PhysicalOptimizerRule; use crate::datasource::physical_plan::CsvExec; use crate::error::Result; use crate::physical_plan::coalesce_partitions::CoalescePartitionsExec; @@ -55,6 +54,7 @@ use datafusion_physical_expr::{ use datafusion_physical_plan::streaming::StreamingTableExec; use datafusion_physical_plan::union::UnionExec; +use datafusion_physical_optimizer::PhysicalOptimizerRule; use itertools::Itertools; /// This rule inspects [`ProjectionExec`]'s in the given physical plan and tries to diff --git a/datafusion/core/src/physical_optimizer/sanity_checker.rs b/datafusion/core/src/physical_optimizer/sanity_checker.rs index 46e13c74d667f..4e172fc052e19 100644 --- a/datafusion/core/src/physical_optimizer/sanity_checker.rs +++ b/datafusion/core/src/physical_optimizer/sanity_checker.rs @@ -24,7 +24,6 @@ use std::sync::Arc; use crate::error::Result; -use crate::physical_optimizer::PhysicalOptimizerRule; use crate::physical_plan::ExecutionPlan; use datafusion_common::config::{ConfigOptions, OptimizerOptions}; @@ -36,6 +35,7 @@ use datafusion_physical_plan::{get_plan_string, ExecutionPlanProperties}; use datafusion_physical_plan::sorts::sort::SortExec; use datafusion_physical_plan::union::UnionExec; +use datafusion_physical_optimizer::PhysicalOptimizerRule; use itertools::izip; /// The SanityCheckPlan rule rejects the following query plans: diff --git a/datafusion/core/src/physical_optimizer/topk_aggregation.rs b/datafusion/core/src/physical_optimizer/topk_aggregation.rs index b754ee75ef3e8..82cf44ad77962 100644 --- a/datafusion/core/src/physical_optimizer/topk_aggregation.rs +++ b/datafusion/core/src/physical_optimizer/topk_aggregation.rs @@ -19,7 +19,6 @@ use std::sync::Arc; -use crate::physical_optimizer::PhysicalOptimizerRule; use crate::physical_plan::aggregates::AggregateExec; use crate::physical_plan::coalesce_batches::CoalesceBatchesExec; use crate::physical_plan::filter::FilterExec; @@ -34,6 +33,7 @@ use datafusion_common::Result; use datafusion_physical_expr::expressions::Column; use datafusion_physical_expr::PhysicalSortExpr; +use datafusion_physical_optimizer::PhysicalOptimizerRule; use itertools::Itertools; /// An optimizer rule that passes a `limit` hint to aggregations if the whole result is not needed diff --git a/datafusion/core/src/physical_optimizer/update_aggr_exprs.rs b/datafusion/core/src/physical_optimizer/update_aggr_exprs.rs index d6b4c33384856..f8edf73e3d2af 100644 --- a/datafusion/core/src/physical_optimizer/update_aggr_exprs.rs +++ b/datafusion/core/src/physical_optimizer/update_aggr_exprs.rs @@ -20,14 +20,13 @@ use std::sync::Arc; -use super::PhysicalOptimizerRule; - use datafusion_common::config::ConfigOptions; use datafusion_common::tree_node::{Transformed, TransformedResult, TreeNode}; use datafusion_common::{plan_datafusion_err, Result}; use datafusion_physical_expr::{ reverse_order_bys, AggregateExpr, EquivalenceProperties, PhysicalSortRequirement, }; +use datafusion_physical_optimizer::PhysicalOptimizerRule; use datafusion_physical_plan::aggregates::concat_slices; use datafusion_physical_plan::windows::get_ordered_partition_by_indices; use datafusion_physical_plan::{ diff --git a/datafusion/core/src/physical_planner.rs b/datafusion/core/src/physical_planner.rs index efc83d8f6b5c2..0accf9d83516a 100644 --- a/datafusion/core/src/physical_planner.rs +++ b/datafusion/core/src/physical_planner.rs @@ -38,7 +38,6 @@ use crate::logical_expr::{ }; use crate::logical_expr::{Limit, Values}; use crate::physical_expr::{create_physical_expr, create_physical_exprs}; -use crate::physical_optimizer::optimizer::PhysicalOptimizerRule; use crate::physical_plan::aggregates::{AggregateExec, AggregateMode, PhysicalGroupBy}; use crate::physical_plan::analyze::AnalyzeExec; use crate::physical_plan::empty::EmptyExec; @@ -91,6 +90,7 @@ use datafusion_physical_plan::placeholder_row::PlaceholderRowExec; use datafusion_sql::utils::window_expr_common_partition_keys; use async_trait::async_trait; +use datafusion_physical_optimizer::PhysicalOptimizerRule; use futures::{StreamExt, TryStreamExt}; use itertools::{multiunzip, Itertools}; use log::{debug, trace}; diff --git a/datafusion/physical-optimizer/Cargo.toml b/datafusion/physical-optimizer/Cargo.toml new file mode 100644 index 0000000000000..9c0ee61da52a0 --- /dev/null +++ b/datafusion/physical-optimizer/Cargo.toml @@ -0,0 +1,36 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +[package] +name = "datafusion-physical-optimizer" +description = "DataFusion Physical Optimizer" +keywords = ["datafusion", "query", "optimizer"] +readme = "README.md" +version = { workspace = true } +edition = { workspace = true } +homepage = { workspace = true } +repository = { workspace = true } +license = { workspace = true } +authors = { workspace = true } +rust-version = { workspace = true } + +[lints] +workspace = true + +[dependencies] +datafusion-common = { workspace = true, default-features = true } +datafusion-physical-plan = { workspace = true } diff --git a/datafusion/physical-optimizer/README.md b/datafusion/physical-optimizer/README.md new file mode 100644 index 0000000000000..eb361d3f67792 --- /dev/null +++ b/datafusion/physical-optimizer/README.md @@ -0,0 +1,25 @@ + + +# DataFusion Physical Optimizer + +DataFusion is an extensible query execution framework, written in Rust, +that uses Apache Arrow as its in-memory format. + +This crate contains the physical optimizer for DataFusion. diff --git a/datafusion/physical-optimizer/src/lib.rs b/datafusion/physical-optimizer/src/lib.rs new file mode 100644 index 0000000000000..c5a49216f5fdc --- /dev/null +++ b/datafusion/physical-optimizer/src/lib.rs @@ -0,0 +1,22 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. +// Make cheap clones clear: https://github.com/apache/datafusion/issues/11143 +#![deny(clippy::clone_on_ref_ptr)] + +mod optimizer; + +pub use optimizer::PhysicalOptimizerRule; diff --git a/datafusion/physical-optimizer/src/optimizer.rs b/datafusion/physical-optimizer/src/optimizer.rs new file mode 100644 index 0000000000000..885dc4a95b8c6 --- /dev/null +++ b/datafusion/physical-optimizer/src/optimizer.rs @@ -0,0 +1,48 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +//! Physical optimizer traits + +use datafusion_common::config::ConfigOptions; +use datafusion_common::Result; +use datafusion_physical_plan::ExecutionPlan; +use std::sync::Arc; + +/// `PhysicalOptimizerRule` transforms one ['ExecutionPlan'] into another which +/// computes the same results, but in a potentially more efficient way. +/// +/// Use [`SessionState::add_physical_optimizer_rule`] to register additional +/// `PhysicalOptimizerRule`s. +/// +/// [`SessionState::add_physical_optimizer_rule`]: https://docs.rs/datafusion/latest/datafusion/execution/session_state/struct.SessionState.html#method.add_physical_optimizer_rule +pub trait PhysicalOptimizerRule { + /// Rewrite `plan` to an optimized form + fn optimize( + &self, + plan: Arc, + config: &ConfigOptions, + ) -> Result>; + + /// A human readable name for this optimizer rule + fn name(&self) -> &str; + + /// A flag to indicate whether the physical planner should valid the rule will not + /// change the schema of the plan after the rewriting. + /// Some of the optimization rules might change the nullable properties of the schema + /// and should disable the schema check. + fn schema_check(&self) -> bool; +}