Skip to content

Commit

Permalink
initial commit
Browse files Browse the repository at this point in the history
  • Loading branch information
korowa committed Mar 17, 2024
1 parent 7d3747c commit bbe2c6c
Show file tree
Hide file tree
Showing 5 changed files with 208 additions and 573 deletions.
44 changes: 42 additions & 2 deletions datafusion/core/src/physical_optimizer/join_selection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,8 +30,8 @@ use crate::error::Result;
use crate::physical_optimizer::PhysicalOptimizerRule;
use crate::physical_plan::joins::utils::{ColumnIndex, JoinFilter};
use crate::physical_plan::joins::{
CrossJoinExec, HashJoinExec, PartitionMode, StreamJoinPartitionMode,
SymmetricHashJoinExec,
CrossJoinExec, HashJoinExec, NestedLoopJoinExec, PartitionMode,
StreamJoinPartitionMode, SymmetricHashJoinExec,
};
use crate::physical_plan::projection::ProjectionExec;
use crate::physical_plan::{ExecutionPlan, ExecutionPlanProperties};
Expand Down Expand Up @@ -199,6 +199,38 @@ fn swap_hash_join(
}
}

/// Swaps inputs of `NestedLoopJoinExec` and wraps it into `ProjectionExec` is required
fn swap_nl_join(join: &NestedLoopJoinExec) -> Result<Arc<dyn ExecutionPlan>> {
let new_filter = swap_join_filter(join.filter());
let new_join_type = &swap_join_type(*join.join_type());

let new_join = NestedLoopJoinExec::try_new(
Arc::clone(join.right()),
Arc::clone(join.left()),
new_filter,
new_join_type,
)?;

// For Semi/Anti joins, swap result will produce same output schema,
// no need to wrap them into additional projection
let plan: Arc<dyn ExecutionPlan> = if matches!(
join.join_type(),
JoinType::LeftSemi
| JoinType::RightSemi
| JoinType::LeftAnti
| JoinType::RightAnti
) {
Arc::new(new_join)
} else {
let projection =
swap_reverting_projection(&join.left().schema(), &join.right().schema());

Arc::new(ProjectionExec::try_new(projection, Arc::new(new_join))?)
};

Ok(plan)
}

/// When the order of the join is changed by the optimizer, the columns in
/// the output should not be impacted. This function creates the expressions
/// that will allow to swap back the values from the original left as the first
Expand Down Expand Up @@ -461,6 +493,14 @@ fn statistical_join_selection_subrule(
} else {
None
}
} else if let Some(nl_join) = plan.as_any().downcast_ref::<NestedLoopJoinExec>() {
let left = nl_join.left();
let right = nl_join.right();
if should_swap_join_order(&**left, &**right)? {
swap_nl_join(nl_join).map(Some)?
} else {
None
}
} else {
None
};
Expand Down
Loading

0 comments on commit bbe2c6c

Please sign in to comment.