From 1a43e845629bc4eae85e5fe42c4e771c10ebb090 Mon Sep 17 00:00:00 2001 From: Andrew Lamb Date: Mon, 24 Oct 2022 17:24:52 -0400 Subject: [PATCH] Consolidate physical join code into `datafusion/core/src/physical_plan/joins` (#3942) * Consolidate physical join code into `datafusion/core/src/physical_plan/joins` * Update * Update `use` paths * Add RAT --- .../physical_optimizer/coalesce_batches.rs | 5 +-- .../hash_build_probe_order.rs | 9 +++-- .../physical_plan/{ => joins}/cross_join.rs | 19 ++++------ .../physical_plan/{ => joins}/hash_join.rs | 34 ++++++----------- .../core/src/physical_plan/joins/mod.rs | 38 +++++++++++++++++++ .../{ => joins}/sort_merge_join.rs | 8 ++-- .../{join_utils.rs => joins/utils.rs} | 2 +- datafusion/core/src/physical_plan/mod.rs | 5 +-- datafusion/core/src/physical_plan/planner.rs | 8 ++-- datafusion/core/tests/join_fuzz.rs | 3 +- 10 files changed, 77 insertions(+), 54 deletions(-) rename datafusion/core/src/physical_plan/{ => joins}/cross_join.rs (97%) rename datafusion/core/src/physical_plan/{ => joins}/hash_join.rs (99%) create mode 100644 datafusion/core/src/physical_plan/joins/mod.rs rename datafusion/core/src/physical_plan/{ => joins}/sort_merge_join.rs (99%) rename datafusion/core/src/physical_plan/{join_utils.rs => joins/utils.rs} (99%) diff --git a/datafusion/core/src/physical_optimizer/coalesce_batches.rs b/datafusion/core/src/physical_optimizer/coalesce_batches.rs index 51d56d28d9745..9f7e22dbb0857 100644 --- a/datafusion/core/src/physical_optimizer/coalesce_batches.rs +++ b/datafusion/core/src/physical_optimizer/coalesce_batches.rs @@ -22,9 +22,8 @@ use crate::{ error::Result, physical_optimizer::PhysicalOptimizerRule, physical_plan::{ - coalesce_batches::CoalesceBatchesExec, filter::FilterExec, - hash_join::HashJoinExec, repartition::RepartitionExec, - with_new_children_if_necessary, + coalesce_batches::CoalesceBatchesExec, filter::FilterExec, joins::HashJoinExec, + repartition::RepartitionExec, with_new_children_if_necessary, }, }; use std::sync::Arc; diff --git a/datafusion/core/src/physical_optimizer/hash_build_probe_order.rs b/datafusion/core/src/physical_optimizer/hash_build_probe_order.rs index 66dfc6e693d61..6817001d374a5 100644 --- a/datafusion/core/src/physical_optimizer/hash_build_probe_order.rs +++ b/datafusion/core/src/physical_optimizer/hash_build_probe_order.rs @@ -22,10 +22,11 @@ use arrow::datatypes::Schema; use crate::execution::context::SessionConfig; use crate::logical_expr::JoinType; -use crate::physical_plan::cross_join::CrossJoinExec; use crate::physical_plan::expressions::Column; -use crate::physical_plan::hash_join::HashJoinExec; -use crate::physical_plan::join_utils::{ColumnIndex, JoinFilter, JoinSide}; +use crate::physical_plan::joins::{ + utils::{ColumnIndex, JoinFilter, JoinSide}, + CrossJoinExec, HashJoinExec, +}; use crate::physical_plan::projection::ProjectionExec; use crate::physical_plan::{ExecutionPlan, PhysicalExpr}; @@ -197,7 +198,7 @@ impl PhysicalOptimizerRule for HashBuildProbeOrder { mod tests { use crate::{ physical_plan::{ - displayable, hash_join::PartitionMode, ColumnStatistics, Statistics, + displayable, joins::PartitionMode, ColumnStatistics, Statistics, }, test::exec::StatisticsExec, }; diff --git a/datafusion/core/src/physical_plan/cross_join.rs b/datafusion/core/src/physical_plan/joins/cross_join.rs similarity index 97% rename from datafusion/core/src/physical_plan/cross_join.rs rename to datafusion/core/src/physical_plan/joins/cross_join.rs index e3f25fc566abc..7a35116a46585 100644 --- a/datafusion/core/src/physical_plan/cross_join.rs +++ b/datafusion/core/src/physical_plan/joins/cross_join.rs @@ -26,22 +26,19 @@ use arrow::datatypes::{Schema, SchemaRef}; use arrow::error::Result as ArrowResult; use arrow::record_batch::RecordBatch; -use super::expressions::PhysicalSortExpr; -use super::{ - coalesce_partitions::CoalescePartitionsExec, join_utils::check_join_is_valid, - ColumnStatistics, Statistics, +use crate::execution::context::TaskContext; +use crate::physical_plan::{ + coalesce_batches::concat_batches, coalesce_partitions::CoalescePartitionsExec, + ColumnStatistics, DisplayFormatType, ExecutionPlan, Partitioning, RecordBatchStream, + SendableRecordBatchStream, Statistics, }; use crate::{error::Result, scalar::ScalarValue}; use async_trait::async_trait; +use datafusion_physical_expr::PhysicalSortExpr; +use log::debug; use std::time::Instant; -use super::{ - coalesce_batches::concat_batches, DisplayFormatType, ExecutionPlan, Partitioning, - RecordBatchStream, SendableRecordBatchStream, -}; -use crate::execution::context::TaskContext; -use crate::physical_plan::join_utils::{OnceAsync, OnceFut}; -use log::debug; +use super::utils::{check_join_is_valid, OnceAsync, OnceFut}; /// Data of the left side type JoinLeftData = RecordBatch; diff --git a/datafusion/core/src/physical_plan/hash_join.rs b/datafusion/core/src/physical_plan/joins/hash_join.rs similarity index 99% rename from datafusion/core/src/physical_plan/hash_join.rs rename to datafusion/core/src/physical_plan/joins/hash_join.rs index 972f432ccfbba..ff036b78b32e6 100644 --- a/datafusion/core/src/physical_plan/hash_join.rs +++ b/datafusion/core/src/physical_plan/joins/hash_join.rs @@ -55,33 +55,32 @@ use arrow::array::{ use hashbrown::raw::RawTable; -use super::{ +use crate::physical_plan::{ + coalesce_batches::concat_batches, coalesce_partitions::CoalescePartitionsExec, + expressions::Column, expressions::PhysicalSortExpr, - join_utils::{ + hash_utils::create_hashes, + joins::utils::{ build_join_schema, check_join_is_valid, estimate_join_statistics, ColumnIndex, JoinFilter, JoinOn, JoinSide, }, -}; -use super::{ - expressions::Column, metrics::{self, ExecutionPlanMetricsSet, MetricBuilder, MetricsSet}, + DisplayFormatType, ExecutionPlan, Partitioning, PhysicalExpr, RecordBatchStream, + SendableRecordBatchStream, Statistics, }; -use super::{hash_utils::create_hashes, Statistics}; + use crate::error::{DataFusionError, Result}; use crate::logical_expr::JoinType; -use super::{ - DisplayFormatType, ExecutionPlan, Partitioning, RecordBatchStream, - SendableRecordBatchStream, -}; use crate::arrow::array::BooleanBufferBuilder; use crate::arrow::datatypes::TimeUnit; use crate::execution::context::TaskContext; -use crate::physical_plan::coalesce_batches::concat_batches; -use crate::physical_plan::PhysicalExpr; -use crate::physical_plan::join_utils::{OnceAsync, OnceFut}; +use super::{ + utils::{OnceAsync, OnceFut}, + PartitionMode, +}; use log::debug; use std::cmp; use std::fmt; @@ -182,15 +181,6 @@ impl HashJoinMetrics { } } -#[derive(Clone, Copy, Debug, PartialEq, Eq)] -/// Partitioning mode to use for hash join -pub enum PartitionMode { - /// Left/right children are partitioned using the left and right keys - Partitioned, - /// Left side will collected into one partition - CollectLeft, -} - impl HashJoinExec { /// Tries to create a new [HashJoinExec]. /// # Error diff --git a/datafusion/core/src/physical_plan/joins/mod.rs b/datafusion/core/src/physical_plan/joins/mod.rs new file mode 100644 index 0000000000000..ae8f943af491d --- /dev/null +++ b/datafusion/core/src/physical_plan/joins/mod.rs @@ -0,0 +1,38 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +//! DataFusion Join implementations + +mod cross_join; +mod hash_join; +mod sort_merge_join; +pub mod utils; + +#[derive(Clone, Copy, Debug, PartialEq, Eq)] +/// Partitioning mode to use for hash join +pub enum PartitionMode { + /// Left/right children are partitioned using the left and right keys + Partitioned, + /// Left side will collected into one partition + CollectLeft, +} + +pub use cross_join::CrossJoinExec; +pub use hash_join::HashJoinExec; + +// Note: SortMergeJoin is not used in plans yet +pub use sort_merge_join::SortMergeJoinExec; diff --git a/datafusion/core/src/physical_plan/sort_merge_join.rs b/datafusion/core/src/physical_plan/joins/sort_merge_join.rs similarity index 99% rename from datafusion/core/src/physical_plan/sort_merge_join.rs rename to datafusion/core/src/physical_plan/joins/sort_merge_join.rs index 29da01a1474e1..3de712745de4f 100644 --- a/datafusion/core/src/physical_plan/sort_merge_join.rs +++ b/datafusion/core/src/physical_plan/joins/sort_merge_join.rs @@ -42,7 +42,9 @@ use crate::logical_expr::JoinType; use crate::physical_plan::common::combine_batches; use crate::physical_plan::expressions::Column; use crate::physical_plan::expressions::PhysicalSortExpr; -use crate::physical_plan::join_utils::{build_join_schema, check_join_is_valid, JoinOn}; +use crate::physical_plan::joins::utils::{ + build_join_schema, check_join_is_valid, JoinOn, +}; use crate::physical_plan::metrics::{ExecutionPlanMetricsSet, MetricBuilder, MetricsSet}; use crate::physical_plan::{ metrics, DisplayFormatType, ExecutionPlan, Partitioning, RecordBatchStream, @@ -1198,9 +1200,9 @@ mod tests { use crate::error::Result; use crate::logical_expr::JoinType; use crate::physical_plan::expressions::Column; - use crate::physical_plan::join_utils::JoinOn; + use crate::physical_plan::joins::utils::JoinOn; + use crate::physical_plan::joins::SortMergeJoinExec; use crate::physical_plan::memory::MemoryExec; - use crate::physical_plan::sort_merge_join::SortMergeJoinExec; use crate::physical_plan::{common, ExecutionPlan}; use crate::prelude::{SessionConfig, SessionContext}; use crate::test::{build_table_i32, columns}; diff --git a/datafusion/core/src/physical_plan/join_utils.rs b/datafusion/core/src/physical_plan/joins/utils.rs similarity index 99% rename from datafusion/core/src/physical_plan/join_utils.rs rename to datafusion/core/src/physical_plan/joins/utils.rs index 4ce72ccc22f28..f937dc1c42a92 100644 --- a/datafusion/core/src/physical_plan/join_utils.rs +++ b/datafusion/core/src/physical_plan/joins/utils.rs @@ -33,7 +33,7 @@ use std::future::Future; use std::sync::Arc; use std::task::{Context, Poll}; -use super::{ColumnStatistics, ExecutionPlan, Statistics}; +use crate::physical_plan::{ColumnStatistics, ExecutionPlan, Statistics}; /// The on clause of the join, as vector of (left, right) columns. pub type JoinOn = Vec<(Column, Column)>; diff --git a/datafusion/core/src/physical_plan/mod.rs b/datafusion/core/src/physical_plan/mod.rs index b2f6c40f556d4..9e36c3ec80edc 100644 --- a/datafusion/core/src/physical_plan/mod.rs +++ b/datafusion/core/src/physical_plan/mod.rs @@ -520,22 +520,19 @@ pub mod analyze; pub mod coalesce_batches; pub mod coalesce_partitions; pub mod common; -pub mod cross_join; pub mod display; pub mod empty; pub mod explain; pub mod file_format; pub mod filter; -pub mod hash_join; pub mod hash_utils; -pub mod join_utils; +pub mod joins; pub mod limit; pub mod memory; pub mod metrics; pub mod planner; pub mod projection; pub mod repartition; -pub mod sort_merge_join; pub mod sorts; pub mod stream; pub mod udaf; diff --git a/datafusion/core/src/physical_plan/planner.rs b/datafusion/core/src/physical_plan/planner.rs index 4a8399af920c8..c209fd150cc51 100644 --- a/datafusion/core/src/physical_plan/planner.rs +++ b/datafusion/core/src/physical_plan/planner.rs @@ -20,7 +20,7 @@ use super::analyze::AnalyzeExec; use super::sorts::sort_preserving_merge::SortPreservingMergeExec; use super::{ - aggregates, empty::EmptyExec, hash_join::PartitionMode, udaf, union::UnionExec, + aggregates, empty::EmptyExec, joins::PartitionMode, udaf, union::UnionExec, values::ValuesExec, windows, }; use crate::config::{OPT_EXPLAIN_LOGICAL_PLAN_ONLY, OPT_EXPLAIN_PHYSICAL_PLAN_ONLY}; @@ -39,17 +39,17 @@ use crate::logical_expr::{Limit, Values}; use crate::physical_expr::create_physical_expr; use crate::physical_optimizer::optimizer::PhysicalOptimizerRule; use crate::physical_plan::aggregates::{AggregateExec, AggregateMode, PhysicalGroupBy}; -use crate::physical_plan::cross_join::CrossJoinExec; use crate::physical_plan::explain::ExplainExec; use crate::physical_plan::expressions::{Column, PhysicalSortExpr}; use crate::physical_plan::filter::FilterExec; -use crate::physical_plan::hash_join::HashJoinExec; +use crate::physical_plan::joins::CrossJoinExec; +use crate::physical_plan::joins::HashJoinExec; use crate::physical_plan::limit::{GlobalLimitExec, LocalLimitExec}; use crate::physical_plan::projection::ProjectionExec; use crate::physical_plan::repartition::RepartitionExec; use crate::physical_plan::sorts::sort::SortExec; use crate::physical_plan::windows::WindowAggExec; -use crate::physical_plan::{join_utils, Partitioning}; +use crate::physical_plan::{joins::utils as join_utils, Partitioning}; use crate::physical_plan::{AggregateExpr, ExecutionPlan, PhysicalExpr, WindowExpr}; use crate::{ error::{DataFusionError, Result}, diff --git a/datafusion/core/tests/join_fuzz.rs b/datafusion/core/tests/join_fuzz.rs index 9e402896cd09e..c5111a0750bfa 100644 --- a/datafusion/core/tests/join_fuzz.rs +++ b/datafusion/core/tests/join_fuzz.rs @@ -26,9 +26,8 @@ use rand::{Rng, SeedableRng}; use datafusion::physical_plan::collect; use datafusion::physical_plan::expressions::Column; -use datafusion::physical_plan::hash_join::{HashJoinExec, PartitionMode}; +use datafusion::physical_plan::joins::{HashJoinExec, PartitionMode, SortMergeJoinExec}; use datafusion::physical_plan::memory::MemoryExec; -use datafusion::physical_plan::sort_merge_join::SortMergeJoinExec; use datafusion_expr::JoinType; use datafusion::prelude::{SessionConfig, SessionContext};