From c2c7a96d0c7bdd130280c22c68fbb4597c9757a2 Mon Sep 17 00:00:00 2001
From: Eduard Karacharov
Date: Thu, 21 Mar 2024 22:12:24 +0200
Subject: [PATCH] fix: duplicate output for HashJoinExec in CollectLeft mode

---
 .../physical-plan/src/joins/hash_join.rs      | 135 +++++++++++++++++-
 1 file changed, 131 insertions(+), 4 deletions(-)

diff --git a/datafusion/physical-plan/src/joins/hash_join.rs b/datafusion/physical-plan/src/joins/hash_join.rs
index a1c50a2113baa..ce8a23941e18a 100644
--- a/datafusion/physical-plan/src/joins/hash_join.rs
+++ b/datafusion/physical-plan/src/joins/hash_join.rs
@@ -1640,26 +1640,73 @@ mod tests {
         join_type: &JoinType,
         null_equals_null: bool,
         context: Arc<TaskContext>,
+    ) -> Result<(Vec<String>, Vec<RecordBatch>)> {
+        join_collect_with_partition_mode(
+            left,
+            right,
+            on,
+            join_type,
+            PartitionMode::Partitioned,
+            null_equals_null,
+            context,
+        )
+        .await
+    }
+
+    async fn join_collect_with_partition_mode(
+        left: Arc<dyn ExecutionPlan>,
+        right: Arc<dyn ExecutionPlan>,
+        on: JoinOn,
+        join_type: &JoinType,
+        partition_mode: PartitionMode,
+        null_equals_null: bool,
+        context: Arc<TaskContext>,
     ) -> Result<(Vec<String>, Vec<RecordBatch>)> {
         let partition_count = 4;
 
         let (left_expr, right_expr) =
             on.iter().map(|(l, r)| (l.clone(), r.clone())).unzip();
 
-        let join = HashJoinExec::try_new(
-            Arc::new(RepartitionExec::try_new(
+        let left_repartitioned: Arc<dyn ExecutionPlan> = match partition_mode {
+            PartitionMode::CollectLeft => Arc::new(CoalescePartitionsExec::new(left)),
+            PartitionMode::Partitioned => Arc::new(RepartitionExec::try_new(
                 left,
                 Partitioning::Hash(left_expr, partition_count),
             )?),
-            Arc::new(RepartitionExec::try_new(
+            PartitionMode::Auto => {
+                return internal_err!("Unexpected PartitionMode::Auto in join tests")
+            }
+        };
+
+        let right_repartitioned: Arc<dyn ExecutionPlan> = match partition_mode {
+            PartitionMode::CollectLeft => {
+                let partition_column_name = right.schema().field(0).name().clone();
+                let partition_expr = vec![Arc::new(Column::new_with_schema(
+                    &partition_column_name,
+                    &right.schema(),
+                )?) as _];
+                Arc::new(RepartitionExec::try_new(
+                    right,
+                    Partitioning::Hash(partition_expr, partition_count),
+                )?) as _
+            }
+            PartitionMode::Partitioned => Arc::new(RepartitionExec::try_new(
                 right,
                 Partitioning::Hash(right_expr, partition_count),
             )?),
+            PartitionMode::Auto => {
+                return internal_err!("Unexpected PartitionMode::Auto in join tests")
+            }
+        };
+
+        let join = HashJoinExec::try_new(
+            left_repartitioned,
+            right_repartitioned,
             on,
             None,
             join_type,
             None,
-            PartitionMode::Partitioned,
+            partition_mode,
             null_equals_null,
         )?;
 
@@ -3312,6 +3359,86 @@ mod tests {
         Ok(())
     }
 
+    /// Test for parallelised HashJoinExec with PartitionMode::CollectLeft
+    #[tokio::test]
+    async fn test_collect_left_multiple_partitions_join() -> Result<()> {
+        let task_ctx = Arc::new(TaskContext::default());
+        let left = build_table(
+            ("a1", &vec![1, 2, 3]),
+            ("b1", &vec![4, 5, 7]),
+            ("c1", &vec![7, 8, 9]),
+        );
+        let right = build_table(
+            ("a2", &vec![10, 20, 30]),
+            ("b2", &vec![4, 5, 6]),
+            ("c2", &vec![70, 80, 90]),
+        );
+        let on = vec![(
+            Arc::new(Column::new_with_schema("b1", &left.schema()).unwrap()) as _,
+            Arc::new(Column::new_with_schema("b2", &right.schema()).unwrap()) as _,
+        )];
+
+        let expected_inner = vec![
+            "+----+----+----+----+----+----+",
+            "| a1 | b1 | c1 | a2 | b2 | c2 |",
+            "+----+----+----+----+----+----+",
+            "| 1  | 4  | 7  | 10 | 4  | 70 |",
+            "| 2  | 5  | 8  | 20 | 5  | 80 |",
+            "+----+----+----+----+----+----+",
+        ];
+        let expected_left = vec![
+            "+----+----+----+----+----+----+",
+            "| a1 | b1 | c1 | a2 | b2 | c2 |",
+            "+----+----+----+----+----+----+",
+            "| 1  | 4  | 7  | 10 | 4  | 70 |",
+            "| 2  | 5  | 8  | 20 | 5  | 80 |",
+            "| 3  | 7  | 9  |    |    |    |",
+            "+----+----+----+----+----+----+",
+        ];
+        let expected_right = vec![
+            "+----+----+----+----+----+----+",
+            "| a1 | b1 | c1 | a2 | b2 | c2 |",
+            "+----+----+----+----+----+----+",
+            "|    |    |    | 30 | 6  | 90 |",
+            "| 1  | 4  | 7  | 10 | 4  | 70 |",
+            "| 2  | 5  | 8  | 20 | 5  | 80 |",
+            "+----+----+----+----+----+----+",
+        ];
+        let expected_full = vec![
+            "+----+----+----+----+----+----+",
+            "| a1 | b1 | c1 | a2 | b2 | c2 |",
+            "+----+----+----+----+----+----+",
+            "|    |    |    | 30 | 6  | 90 |",
+            "| 1  | 4  | 7  | 10 | 4  | 70 |",
+            "| 2  | 5  | 8  | 20 | 5  | 80 |",
+            "| 3  | 7  | 9  |    |    |    |",
+            "+----+----+----+----+----+----+",
+        ];
+
+        let test_cases = vec![
+            (JoinType::Inner, expected_inner),
+            (JoinType::Left, expected_left),
+            (JoinType::Right, expected_right),
+            (JoinType::Full, expected_full),
+        ];
+
+        for (join_type, expected) in test_cases {
+            let (_, batches) = join_collect_with_partition_mode(
+                left.clone(),
+                right.clone(),
+                on.clone(),
+                &join_type,
+                PartitionMode::CollectLeft,
+                false,
+                task_ctx.clone(),
+            )
+            .await?;
+            assert_batches_sorted_eq!(expected, &batches);
+        }
+
+        Ok(())
+    }
+
     #[tokio::test]
     async fn join_date32() -> Result<()> {
         let schema = Arc::new(Schema::new(vec![