Skip to content

Commit

Permalink
More
Browse files — browse the repository at this point in the history
  • Loading branch information
viirya committed Jan 30, 2024
1 parent 9e3fdec commit 7462f4b
Show file tree
Hide file tree
Showing 4 changed files with 70 additions and 20 deletions.
4 changes: 4 additions & 0 deletions datafusion/core/src/physical_planner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1072,6 +1072,9 @@ impl DefaultPhysicalPlanner {
_ => None,
}).sorted()
.collect::<Vec<_>>();
println!("filter expr: {:?}", expr);
println!("left_field_indices: {:?}", left_field_indices);
println!("right_field_indices: {:?}", right_field_indices);

// Collect DFFields and Fields required for intermediate schemas
let (filter_df_fields, filter_fields): (Vec<_>, Vec<_>) = left_field_indices.clone()
Expand Down Expand Up @@ -1111,6 +1114,7 @@ impl DefaultPhysicalPlanner {
};

let prefer_hash_join = session_state.config_options().optimizer.prefer_hash_join;

if join_on.is_empty() {
// there is no equal join condition, use the nested loop join
// TODO optimize the plan, and use the config of `target_partitions` and `repartition_joins`
Expand Down
6 changes: 6 additions & 0 deletions datafusion/physical-plan/src/joins/sort_merge_join.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1111,11 +1111,14 @@ impl SMJStream {

// Apply join filter if any
let output_batch = if let Some(f) = &self.filter {
println!("f: {:?}", f);

let filter_result = f
.expression()
.evaluate(&output_batch)?
.into_array(output_batch.num_rows())?;
let mask = datafusion_common::cast::as_boolean_array(&filter_result)?;
println!("mask: {:?}", mask);

compute::filter_record_batch(&output_batch, mask)?
} else {
Expand Down Expand Up @@ -1177,11 +1180,14 @@ impl SMJStream {

// Apply join filter if any
let output_batch = if let Some(f) = &self.filter {
println!("f: {:?}", f);

let filter_result = f
.expression()
.evaluate(&output_batch)?
.into_array(output_batch.num_rows())?;
let mask = datafusion_common::cast::as_boolean_array(&filter_result)?;
println!("mask: {:?}", mask);

compute::filter_record_batch(&output_batch, mask)?
} else {
Expand Down
30 changes: 29 additions & 1 deletion datafusion/sqllogictest/test_files/join.slt
Original file line number Diff line number Diff line change
Expand Up @@ -656,7 +656,35 @@ statement ok
set datafusion.execution.target_partitions = 4;

statement ok
set datafusion.optimizer.repartition_joins = false;
set datafusion.optimizer.repartition_joins = true;

# equijoin_and_other_condition (sort merge join)
statement ok
set datafusion.optimizer.prefer_hash_join = false;

query TT
EXPLAIN SELECT t1.a, t1.b, t2.a, t2.b FROM t1 JOIN t2 ON t1.a = t2.a AND t2.b + 1 > t1.b
----
logical_plan
Inner Join: t1.a = t2.a Filter: CAST(t2.b AS Int64) + Int64(1) > CAST(t1.b AS Int64)
--TableScan: t1 projection=[a, b]
--TableScan: t2 projection=[a, b]
physical_plan
SortMergeJoin: join_type=Inner, on=[(a@0, a@0)], filter=CAST(b@1 AS Int64) + 1 > CAST(b@0 AS Int64)
--SortExec: expr=[a@0 ASC]
----CoalesceBatchesExec: target_batch_size=8192
------RepartitionExec: partitioning=Hash([a@0], 4), input_partitions=1
--------MemoryExec: partitions=1, partition_sizes=[1]
--SortExec: expr=[a@0 ASC]
----CoalesceBatchesExec: target_batch_size=8192
------RepartitionExec: partitioning=Hash([a@0], 4), input_partitions=1
--------MemoryExec: partitions=1, partition_sizes=[1]

query error DataFusion error: Arrow error: Invalid argument error: Invalid comparison operation: Int32 > Utf8
SELECT t1.a, t1.b, t2.a, t2.b FROM t1 JOIN t2 ON t1.a = t2.a AND t2.b > t1.b

statement ok
set datafusion.optimizer.prefer_hash_join = true;

statement ok
DROP TABLE t1;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,12 +57,18 @@ Limit: skip=0, fetch=5
physical_plan
GlobalLimitExec: skip=0, fetch=5
--SortPreservingMergeExec: [a@0 ASC NULLS LAST], fetch=5
----ProjectionExec: expr=[a@1 as a]
------CoalesceBatchesExec: target_batch_size=8192
--------HashJoinExec: mode=CollectLeft, join_type=Inner, on=[(c@0, c@1)]
----------CsvExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/data/window_2.csv]]}, projection=[c], has_header=true
----------RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1
------------CsvExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/data/window_2.csv]]}, projection=[a, c], output_ordering=[a@0 ASC NULLS LAST], has_header=true
----SortExec: TopK(fetch=5), expr=[a@0 ASC NULLS LAST]
------ProjectionExec: expr=[a@1 as a]
--------CoalesceBatchesExec: target_batch_size=8192
----------HashJoinExec: mode=Partitioned, join_type=Inner, on=[(c@0, c@1)]
------------CoalesceBatchesExec: target_batch_size=8192
--------------RepartitionExec: partitioning=Hash([c@0], 4), input_partitions=4
----------------RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1
------------------CsvExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/data/window_2.csv]]}, projection=[c], has_header=true
------------CoalesceBatchesExec: target_batch_size=8192
--------------RepartitionExec: partitioning=Hash([c@1], 4), input_partitions=4
----------------RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1
------------------CsvExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/data/window_2.csv]]}, projection=[a, c], output_ordering=[a@0 ASC NULLS LAST], has_header=true

# preserve_inner_join
query IIII nosort
Expand All @@ -72,11 +78,11 @@ SELECT t1.a, t1.b, t1.c, t2.a as a2
ON t1.d = t2.d ORDER BY a2, t2.b
LIMIT 5
----
0 0 0 0
0 0 2 0
0 0 3 0
0 0 6 0
0 0 20 0
0 0 7 0
0 0 11 0
0 0 12 0
0 0 14 0
0 0 1 0

query TT
EXPLAIN SELECT t2.a as a2, t2.b
Expand All @@ -100,14 +106,20 @@ Limit: skip=0, fetch=10
physical_plan
GlobalLimitExec: skip=0, fetch=10
--SortPreservingMergeExec: [a2@0 ASC NULLS LAST,b@1 ASC NULLS LAST], fetch=10
----ProjectionExec: expr=[a@0 as a2, b@1 as b]
------CoalesceBatchesExec: target_batch_size=8192
--------HashJoinExec: mode=CollectLeft, join_type=RightSemi, on=[(d@1, d@3), (c@0, c@2)]
----------CsvExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/data/window_2.csv]]}, projection=[c, d], has_header=true
----------CoalesceBatchesExec: target_batch_size=8192
------------FilterExec: d@3 = 3
--------------RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1
----------------CsvExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/data/window_2.csv]]}, projection=[a, b, c, d], output_ordering=[a@0 ASC NULLS LAST, b@1 ASC NULLS LAST, c@2 ASC NULLS LAST], has_header=true
----SortExec: TopK(fetch=10), expr=[a2@0 ASC NULLS LAST,b@1 ASC NULLS LAST]
------ProjectionExec: expr=[a@0 as a2, b@1 as b]
--------CoalesceBatchesExec: target_batch_size=8192
----------HashJoinExec: mode=Partitioned, join_type=RightSemi, on=[(d@1, d@3), (c@0, c@2)]
------------CoalesceBatchesExec: target_batch_size=8192
--------------RepartitionExec: partitioning=Hash([d@1, c@0], 4), input_partitions=4
----------------RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1
------------------CsvExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/data/window_2.csv]]}, projection=[c, d], has_header=true
------------CoalesceBatchesExec: target_batch_size=8192
--------------RepartitionExec: partitioning=Hash([d@3, c@2], 4), input_partitions=4
----------------CoalesceBatchesExec: target_batch_size=8192
------------------FilterExec: d@3 = 3
--------------------RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1
----------------------CsvExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/data/window_2.csv]]}, projection=[a, b, c, d], output_ordering=[a@0 ASC NULLS LAST, b@1 ASC NULLS LAST, c@2 ASC NULLS LAST], has_header=true

# preserve_right_semi_join
query II nosort
Expand Down

0 comments on commit 7462f4b

Please sign in to comment.