From f486c4f21c66ee5ae08f94f3d7331e1162a8e534 Mon Sep 17 00:00:00 2001
From: Eduard Karacharov
Date: Thu, 21 Mar 2024 22:12:24 +0200
Subject: [PATCH] fix: duplicate output for HashJoinExec in CollectLeft mode

---
 .../src/physical_optimizer/join_selection.rs  |  73 ++---
 .../physical-plan/src/joins/hash_join.rs      | 290 ++++++++++++++----
 2 files changed, 254 insertions(+), 109 deletions(-)

diff --git a/datafusion/core/src/physical_optimizer/join_selection.rs b/datafusion/core/src/physical_optimizer/join_selection.rs
index 72174b0e6e2f6..f7512cb6d0756 100644
--- a/datafusion/core/src/physical_optimizer/join_selection.rs
+++ b/datafusion/core/src/physical_optimizer/join_selection.rs
@@ -305,11 +305,6 @@ impl PhysicalOptimizerRule for JoinSelection {
 /// `CollectLeft` mode is applicable. Otherwise, it will try to swap the join sides.
 /// When the `ignore_threshold` is false, this function will also check left
 /// and right sizes in bytes or rows.
-///
-/// For [`JoinType::Full`], it can not use `CollectLeft` mode and will return `None`.
-/// For [`JoinType::Left`] and [`JoinType::LeftAnti`], it can not run `CollectLeft`
-/// mode as is, but it can do so by changing the join type to [`JoinType::Right`]
-/// and [`JoinType::RightAnti`], respectively.
 fn try_collect_left(
     hash_join: &HashJoinExec,
     ignore_threshold: bool,
@@ -318,38 +313,20 @@
 ) -> Result<Option<Arc<dyn ExecutionPlan>>> {
     let left = hash_join.left();
     let right = hash_join.right();
-    let join_type = hash_join.join_type();
-    let left_can_collect = match join_type {
-        JoinType::Left | JoinType::Full | JoinType::LeftAnti => false,
-        JoinType::Inner
-        | JoinType::LeftSemi
-        | JoinType::Right
-        | JoinType::RightSemi
-        | JoinType::RightAnti => {
-            ignore_threshold
-                || supports_collect_by_thresholds(
-                    &**left,
-                    threshold_byte_size,
-                    threshold_num_rows,
-                )
-        }
-    };
-    let right_can_collect = match join_type {
-        JoinType::Right | JoinType::Full | JoinType::RightAnti => false,
-        JoinType::Inner
-        | JoinType::RightSemi
-        | JoinType::Left
-        | JoinType::LeftSemi
-        | JoinType::LeftAnti => {
-            ignore_threshold
-                || supports_collect_by_thresholds(
-                    &**right,
-                    threshold_byte_size,
-                    threshold_num_rows,
-                )
-        }
-    };
+    let left_can_collect = ignore_threshold
+        || supports_collect_by_thresholds(
+            &**left,
+            threshold_byte_size,
+            threshold_num_rows,
+        );
+    let right_can_collect = ignore_threshold
+        || supports_collect_by_thresholds(
+            &**right,
+            threshold_byte_size,
+            threshold_num_rows,
+        );
+
     match (left_can_collect, right_can_collect) {
         (true, true) => {
             if should_swap_join_order(&**left, &**right)?
@@ -916,9 +893,9 @@ mod tests_statistical {
     }
 
     #[tokio::test]
-    async fn test_left_join_with_swap() {
+    async fn test_left_join_no_swap() {
         let (big, small) = create_big_and_small();
-        // Left out join should alway swap when the mode is PartitionMode::CollectLeft, even left side is small and right side is large
+
         let join = Arc::new(
             HashJoinExec::try_new(
                 Arc::clone(&small),
@@ -942,32 +919,18 @@ mod tests_statistical {
             .optimize(join.clone(), &ConfigOptions::new())
             .unwrap();
 
-        let swapping_projection = optimized_join
-            .as_any()
-            .downcast_ref::<ProjectionExec>()
-            .expect("A proj is required to swap columns back to their original order");
-
-        assert_eq!(swapping_projection.expr().len(), 2);
-        let (col, name) = &swapping_projection.expr()[0];
-        assert_eq!(name, "small_col");
-        assert_col_expr(col, "small_col", 1);
-        let (col, name) = &swapping_projection.expr()[1];
-        assert_eq!(name, "big_col");
-        assert_col_expr(col, "big_col", 0);
-
-        let swapped_join = swapping_projection
-            .input()
+        let swapped_join = optimized_join
             .as_any()
             .downcast_ref::<HashJoinExec>()
             .expect("The type of the plan should not be changed");
 
         assert_eq!(
             swapped_join.left().statistics().unwrap().total_byte_size,
-            Precision::Inexact(2097152)
+            Precision::Inexact(8192)
         );
         assert_eq!(
             swapped_join.right().statistics().unwrap().total_byte_size,
-            Precision::Inexact(8192)
+            Precision::Inexact(2097152)
         );
         crosscheck_plans(join.clone()).unwrap();
     }
diff --git a/datafusion/physical-plan/src/joins/hash_join.rs b/datafusion/physical-plan/src/joins/hash_join.rs
index a1c50a2113baa..93d32ea1ddc0f 100644
--- a/datafusion/physical-plan/src/joins/hash_join.rs
+++ b/datafusion/physical-plan/src/joins/hash_join.rs
@@ -19,6 +19,7 @@
 use std::fmt;
 use std::mem::size_of;
+use std::sync::atomic::{AtomicUsize, Ordering};
 use std::sync::Arc;
 use std::task::Poll;
 use std::{any::Any, usize, vec};
 
@@ -72,6 +73,9 @@
 use datafusion_physical_expr::{PhysicalExpr, PhysicalExprRef};
 use ahash::RandomState;
 use futures::{ready, Stream, StreamExt, TryStreamExt};
+use parking_lot::Mutex;
+
+type SharedBitmapBuilder = Mutex<BooleanBufferBuilder>;
 
 /// HashTable and input data for the left (build side) of a join
 struct JoinLeftData {
@@ -79,6 +83,11 @@ struct JoinLeftData {
     hash_map: JoinHashMap,
     /// The input rows for the build side
     batch: RecordBatch,
+    /// Shared bitmap builder for visited left indices
+    visited_indices_bitmap: Mutex<BooleanBufferBuilder>,
+    /// Counter of running probe-threads, potentially
+    /// able to update `visited_indices_bitmap`
+    running_threads_counter: AtomicUsize,
     /// Memory reservation that tracks memory used by `hash_map` hash table
     /// `batch`. Cleared on drop.
     #[allow(dead_code)]
@@ -90,20 +99,19 @@ impl JoinLeftData {
     fn new(
         hash_map: JoinHashMap,
         batch: RecordBatch,
+        visited_indices_bitmap: SharedBitmapBuilder,
+        running_threads_counter: AtomicUsize,
         reservation: MemoryReservation,
     ) -> Self {
         Self {
             hash_map,
             batch,
+            visited_indices_bitmap,
+            running_threads_counter,
             reservation,
         }
     }
 
-    /// Returns the number of rows in the build side
-    fn num_rows(&self) -> usize {
-        self.batch.num_rows()
-    }
-
     /// return a reference to the hash map
     fn hash_map(&self) -> &JoinHashMap {
         &self.hash_map
@@ -113,6 +121,17 @@ impl JoinLeftData {
     fn batch(&self) -> &RecordBatch {
         &self.batch
     }
+
+    /// returns a reference to the visited indices bitmap
+    fn visited_indices_bitmap(&self) -> &SharedBitmapBuilder {
+        &self.visited_indices_bitmap
+    }
+
+    /// Decrements the counter of running threads, and returns `true`
+    /// if caller is the last running thread
+    fn report_probe_completed(&self) -> bool {
+        self.running_threads_counter.fetch_sub(1, Ordering::Relaxed) == 1
+    }
 }
 
 /// Join execution plan: Evaluates eqijoin predicates in parallel on multiple
@@ -711,6 +730,8 @@ impl ExecutionPlan for HashJoinExec {
                         context.clone(),
                         join_metrics.clone(),
                         reservation,
+                        need_produce_result_in_final(self.join_type),
+                        self.right().output_partitioning().partition_count(),
                     )
                 }),
             PartitionMode::Partitioned => {
@@ -726,6 +747,8 @@
                     context.clone(),
                     join_metrics.clone(),
                     reservation,
+                    need_produce_result_in_final(self.join_type),
+                    1,
                 ))
             }
             PartitionMode::Auto => {
@@ -738,9 +761,6 @@
         let batch_size = context.session_config().batch_size();
 
-        let reservation = MemoryConsumer::new(format!("HashJoinStream[{partition}]"))
-            .register(context.memory_pool());
-
         // we have the batches and the hash map with their keys. We can how create a stream
         // over the right that uses this information to issue new batches.
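
The two groups of chunks above are the core of the fix: the visited-rows bitmap and a countdown of still-running probe tasks now live in the shared JoinLeftData rather than in each per-partition HashJoinStream, and report_probe_completed() lets exactly one stream observe the counter reaching zero. The pattern can be sketched outside DataFusion roughly as follows; this is an illustration only, using std::sync::Mutex and a plain Vec<bool> in place of parking_lot::Mutex and Arrow's BooleanBufferBuilder, and the SharedBuildSide type and its helpers are hypothetical names, not part of the patch:

use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::{Arc, Mutex};
use std::thread;

/// Simplified stand-in for the shared build side: one "visited" flag per build
/// row plus a countdown of probe partitions that are still running.
struct SharedBuildSide {
    visited: Mutex<Vec<bool>>,
    running_probes: AtomicUsize,
}

impl SharedBuildSide {
    fn new(build_rows: usize, probe_partitions: usize) -> Self {
        Self {
            visited: Mutex::new(vec![false; build_rows]),
            running_probes: AtomicUsize::new(probe_partitions),
        }
    }

    /// Decrement the running-probe counter; only the last caller gets `true`.
    fn report_probe_completed(&self) -> bool {
        self.running_probes.fetch_sub(1, Ordering::Relaxed) == 1
    }
}

fn main() {
    let build = Arc::new(SharedBuildSide::new(8, 4));
    let handles: Vec<_> = (0..4usize)
        .map(|partition| {
            let build = Arc::clone(&build);
            thread::spawn(move || {
                // Each probe partition marks the build rows it managed to join.
                build.visited.lock().unwrap()[partition] = true;
                // Only the last partition to finish emits the unmatched rows,
                // which is what prevents duplicated output in CollectLeft mode.
                if build.report_probe_completed() {
                    let visited = build.visited.lock().unwrap();
                    let unmatched: Vec<usize> =
                        (0..visited.len()).filter(|&i| !visited[i]).collect();
                    println!("unmatched build rows: {unmatched:?}");
                }
            })
        })
        .collect();
    for handle in handles {
        handle.join().unwrap();
    }
}

A relaxed ordering on the counter is enough here for the same reason it is in the patch: the bitmap itself is protected by the mutex, and the counter only has to guarantee that exactly one caller sees it hit zero.
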
         let right_stream = self.right.execute(partition, context)?;
@@ -765,7 +785,6 @@ impl ExecutionPlan for HashJoinExec {
             random_state: self.random_state.clone(),
             join_metrics,
             null_equals_null: self.null_equals_null,
-            reservation,
             state: HashJoinStreamState::WaitBuildSide,
             build_side: BuildSide::Initial(BuildSideInitialState { left_fut }),
             batch_size,
@@ -812,6 +831,8 @@ async fn collect_left_input(
     context: Arc<TaskContext>,
     metrics: BuildProbeJoinMetrics,
     reservation: MemoryReservation,
+    with_visited_indices_bitmap: bool,
+    right_input_partitions: usize,
 ) -> Result<JoinLeftData> {
     let schema = left.schema();
 
@@ -888,10 +909,29 @@ async fn collect_left_input(
         )?;
         offset += batch.num_rows();
     }
-    // Merge all batches into a single batch, so we
-    // can directly index into the arrays
+    // Merge all batches into a single batch, so we can directly index into the arrays
     let single_batch = concat_batches(&schema, batches_iter)?;
-    let data = JoinLeftData::new(hashmap, single_batch, reservation);
+
+    // Reserve additional memory for visited indices bitmap and create shared builder
+    let visited_indices_bitmap = if with_visited_indices_bitmap {
+        let bitmap_size = bit_util::ceil(single_batch.num_rows(), 8);
+        reservation.try_grow(bitmap_size)?;
+        metrics.build_mem_used.add(bitmap_size);
+
+        let mut bitmap_buffer = BooleanBufferBuilder::new(single_batch.num_rows());
+        bitmap_buffer.append_n(num_rows, false);
+        bitmap_buffer
+    } else {
+        BooleanBufferBuilder::new(0)
+    };
+
+    let data = JoinLeftData::new(
+        hashmap,
+        single_batch,
+        Mutex::new(visited_indices_bitmap),
+        AtomicUsize::new(right_input_partitions),
+        reservation,
+    );
 
     Ok(data)
 }
@@ -961,10 +1001,6 @@ struct BuildSideInitialState {
 struct BuildSideReadyState {
     /// Collected build-side data
     left_data: Arc<JoinLeftData>,
-    /// Which build-side rows have been matched while creating output.
-    /// For some OUTER joins, we need to know which rows have not been matched
-    /// to produce the correct output.
-    visited_left_side: BooleanBufferBuilder,
 }
 
 impl BuildSide {
@@ -1083,8 +1119,6 @@ struct HashJoinStream {
     column_indices: Vec<ColumnIndex>,
     /// If null_equals_null is true, null == null else null != null
     null_equals_null: bool,
-    /// Memory reservation
-    reservation: MemoryReservation,
     /// State of the stream
     state: HashJoinStreamState,
     /// Build side
@@ -1246,6 +1280,14 @@ pub fn equal_rows_arr(
     ))
 }
 
+fn get_final_indices_from_shared_bitmap(
+    shared_bitmap: &SharedBitmapBuilder,
+    join_type: JoinType,
+) -> (UInt64Array, UInt32Array) {
+    let bitmap = shared_bitmap.lock();
+    get_final_indices_from_bit_map(&bitmap, join_type)
+}
+
 impl HashJoinStream {
     /// Separate implementation function that unpins the [`HashJoinStream`] so
     /// that partial borrows work correctly
@@ -1288,35 +1330,8 @@ impl HashJoinStream {
             .get_shared(cx))?;
         build_timer.done();
 
-        // Reserving memory for visited_left_side bitmap in case it hasn't been initialized yet
-        // and join_type requires to store it
-        if need_produce_result_in_final(self.join_type) {
-            // TODO: Replace `ceil` wrapper with stable `div_cell` after
-            // https://github.com/rust-lang/rust/issues/88581
-            let visited_bitmap_size = bit_util::ceil(left_data.num_rows(), 8);
-            self.reservation.try_grow(visited_bitmap_size)?;
-            self.join_metrics.build_mem_used.add(visited_bitmap_size);
-        }
-
-        let visited_left_side = if need_produce_result_in_final(self.join_type) {
-            let num_rows = left_data.num_rows();
-            // Some join types need to track which row has be matched or unmatched:
-            // `left semi` join: need to use the bitmap to produce the matched row in the left side
-            // `left` join: need to use the bitmap to produce the unmatched row in the left side with null
-            // `left anti` join: need to use the bitmap to produce the unmatched row in the left side
-            // `full` join: need to use the bitmap to produce the unmatched row in the left side with null
-            let mut buffer = BooleanBufferBuilder::new(num_rows);
-            buffer.append_n(num_rows, false);
-            buffer
-        } else {
-            BooleanBufferBuilder::new(0)
-        };
-
         self.state = HashJoinStreamState::FetchProbeBatch;
-        self.build_side = BuildSide::Ready(BuildSideReadyState {
-            left_data,
-            visited_left_side,
-        });
+        self.build_side = BuildSide::Ready(BuildSideReadyState { left_data });
 
         Poll::Ready(Ok(StatefulStreamResult::Continue))
     }
@@ -1401,8 +1416,9 @@ impl HashJoinStream {
 
         // mark joined left-side indices as visited, if required by join type
         if need_produce_result_in_final(self.join_type) {
+            let mut bitmap = build_side.left_data.visited_indices_bitmap().lock();
             left_indices.iter().flatten().for_each(|x| {
-                build_side.visited_left_side.set_bit(x as usize, true);
+                bitmap.set_bit(x as usize, true);
            });
         }
 
@@ -1481,15 +1497,20 @@ impl HashJoinStream {
 
         if !need_produce_result_in_final(self.join_type) {
             self.state = HashJoinStreamState::Completed;
-
             return Ok(StatefulStreamResult::Continue);
         }
 
         let build_side = self.build_side.try_as_ready()?;
+        if !build_side.left_data.report_probe_completed() {
+            self.state = HashJoinStreamState::Completed;
+            return Ok(StatefulStreamResult::Continue);
+        }
 
         // use the global left bitmap to produce the left indices and right indices
-        let (left_side, right_side) =
-            get_final_indices_from_bit_map(&build_side.visited_left_side, self.join_type);
+        let (left_side, right_side) = get_final_indices_from_shared_bitmap(
+            build_side.left_data.visited_indices_bitmap(),
+            self.join_type,
+        );
         let empty_right_batch = RecordBatch::new_empty(self.right.schema());
         // use the left and right indices to produce the batch result
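
The new get_final_indices_from_shared_bitmap helper only locks the shared bitmap and defers to the pre-existing get_final_indices_from_bit_map. Conceptually, that step turns the accumulated visited flags into the build-side row indices that still have to be emitted once the probe side is exhausted: matched rows for semi joins, unmatched rows (later paired with nulls on the probe side) for left, full, and anti joins. A simplified, self-contained sketch of that selection logic follows; it is illustrative only and not the actual Arrow-based implementation:

/// Simplified version of the "final indices from the shared bitmap" step: given
/// the visited flags accumulated by every probe partition, pick the build-side
/// row indices that still need to be emitted after the probe side is exhausted.
fn final_build_indices(visited: &[bool], emit_matched: bool) -> Vec<u64> {
    visited
        .iter()
        .enumerate()
        .filter(|(_, v)| **v == emit_matched)
        .map(|(i, _)| i as u64)
        .collect()
}

fn main() {
    // Build rows 0 and 2 were matched by some probe partition, row 1 was not.
    let visited = [true, false, true];

    // Semi-join-style output wants the matched rows...
    assert_eq!(final_build_indices(&visited, true), vec![0, 2]);
    // ...while Left/Full/LeftAnti-style output wants the unmatched ones,
    // to be paired with nulls on the probe side.
    assert_eq!(final_build_indices(&visited, false), vec![1]);
    println!("ok");
}
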
         let result = build_batch_from_indices(
@@ -1640,26 +1661,73 @@ mod tests {
         join_type: &JoinType,
         null_equals_null: bool,
         context: Arc<TaskContext>,
+    ) -> Result<(Vec<String>, Vec<RecordBatch>)> {
+        join_collect_with_partition_mode(
+            left,
+            right,
+            on,
+            join_type,
+            PartitionMode::Partitioned,
+            null_equals_null,
+            context,
+        )
+        .await
+    }
+
+    async fn join_collect_with_partition_mode(
+        left: Arc<dyn ExecutionPlan>,
+        right: Arc<dyn ExecutionPlan>,
+        on: JoinOn,
+        join_type: &JoinType,
+        partition_mode: PartitionMode,
+        null_equals_null: bool,
+        context: Arc<TaskContext>,
     ) -> Result<(Vec<String>, Vec<RecordBatch>)> {
         let partition_count = 4;
 
         let (left_expr, right_expr) =
             on.iter().map(|(l, r)| (l.clone(), r.clone())).unzip();
 
-        let join = HashJoinExec::try_new(
-            Arc::new(RepartitionExec::try_new(
+        let left_repartitioned: Arc<dyn ExecutionPlan> = match partition_mode {
+            PartitionMode::CollectLeft => Arc::new(CoalescePartitionsExec::new(left)),
+            PartitionMode::Partitioned => Arc::new(RepartitionExec::try_new(
                 left,
                 Partitioning::Hash(left_expr, partition_count),
             )?),
-            Arc::new(RepartitionExec::try_new(
+            PartitionMode::Auto => {
+                return internal_err!("Unexpected PartitionMode::Auto in join tests")
+            }
+        };
+
+        let right_repartitioned: Arc<dyn ExecutionPlan> = match partition_mode {
+            PartitionMode::CollectLeft => {
+                let partition_column_name = right.schema().field(0).name().clone();
+                let partition_expr = vec![Arc::new(Column::new_with_schema(
+                    &partition_column_name,
+                    &right.schema(),
+                )?) as _];
+                Arc::new(RepartitionExec::try_new(
+                    right,
+                    Partitioning::Hash(partition_expr, partition_count),
+                )?) as _
+            }
+            PartitionMode::Partitioned => Arc::new(RepartitionExec::try_new(
                 right,
                 Partitioning::Hash(right_expr, partition_count),
             )?),
+            PartitionMode::Auto => {
+                return internal_err!("Unexpected PartitionMode::Auto in join tests")
+            }
+        };
+
+        let join = HashJoinExec::try_new(
+            left_repartitioned,
+            right_repartitioned,
             on,
             None,
             join_type,
             None,
-            PartitionMode::Partitioned,
+            partition_mode,
             null_equals_null,
         )?;
 
@@ -3312,6 +3380,120 @@ mod tests {
         Ok(())
     }
 
+    /// Test for parallelised HashJoinExec with PartitionMode::CollectLeft
+    #[tokio::test]
+    async fn test_collect_left_multiple_partitions_join() -> Result<()> {
+        let task_ctx = Arc::new(TaskContext::default());
+        let left = build_table(
+            ("a1", &vec![1, 2, 3]),
+            ("b1", &vec![4, 5, 7]),
+            ("c1", &vec![7, 8, 9]),
+        );
+        let right = build_table(
+            ("a2", &vec![10, 20, 30]),
+            ("b2", &vec![4, 5, 6]),
+            ("c2", &vec![70, 80, 90]),
+        );
+        let on = vec![(
+            Arc::new(Column::new_with_schema("b1", &left.schema()).unwrap()) as _,
+            Arc::new(Column::new_with_schema("b2", &right.schema()).unwrap()) as _,
+        )];
+
+        let expected_inner = vec![
+            "+----+----+----+----+----+----+",
+            "| a1 | b1 | c1 | a2 | b2 | c2 |",
+            "+----+----+----+----+----+----+",
+            "| 1  | 4  | 7  | 10 | 4  | 70 |",
+            "| 2  | 5  | 8  | 20 | 5  | 80 |",
+            "+----+----+----+----+----+----+",
+        ];
+        let expected_left = vec![
+            "+----+----+----+----+----+----+",
+            "| a1 | b1 | c1 | a2 | b2 | c2 |",
+            "+----+----+----+----+----+----+",
+            "| 1  | 4  | 7  | 10 | 4  | 70 |",
+            "| 2  | 5  | 8  | 20 | 5  | 80 |",
+            "| 3  | 7  | 9  |    |    |    |",
+            "+----+----+----+----+----+----+",
+        ];
+        let expected_right = vec![
+            "+----+----+----+----+----+----+",
+            "| a1 | b1 | c1 | a2 | b2 | c2 |",
+            "+----+----+----+----+----+----+",
+            "|    |    |    | 30 | 6  | 90 |",
+            "| 1  | 4  | 7  | 10 | 4  | 70 |",
+            "| 2  | 5  | 8  | 20 | 5  | 80 |",
+            "+----+----+----+----+----+----+",
+        ];
+        let expected_full = vec![
+            "+----+----+----+----+----+----+",
+            "| a1 | b1 | c1 | a2 | b2 | c2 |",
+            "+----+----+----+----+----+----+",
+            "|    |    |    | 30 | 6  | 90 |",
+            "| 1  | 4  | 7  | 10 | 4  | 70 |",
+            "| 2  | 5  | 8  | 20 | 5  | 80 |",
+            "| 3  | 7  | 9  |    |    |    |",
+            "+----+----+----+----+----+----+",
+        ];
+        let expected_left_semi = vec![
+            "+----+----+----+",
+            "| a1 | b1 | c1 |",
+            "+----+----+----+",
+            "| 1  | 4  | 7  |",
+            "| 2  | 5  | 8  |",
+            "+----+----+----+",
+        ];
+        let expected_left_anti = vec![
+            "+----+----+----+",
+            "| a1 | b1 | c1 |",
+            "+----+----+----+",
+            "| 3  | 7  | 9  |",
+            "+----+----+----+",
+        ];
+        let expected_right_semi = vec![
+            "+----+----+----+",
+            "| a2 | b2 | c2 |",
+            "+----+----+----+",
+            "| 10 | 4  | 70 |",
+            "| 20 | 5  | 80 |",
+            "+----+----+----+",
+        ];
+        let expected_right_anti = vec![
+            "+----+----+----+",
+            "| a2 | b2 | c2 |",
+            "+----+----+----+",
+            "| 30 | 6  | 90 |",
+            "+----+----+----+",
+        ];
+
+        let test_cases = vec![
+            (JoinType::Inner, expected_inner),
+            (JoinType::Left, expected_left),
+            (JoinType::Right, expected_right),
+            (JoinType::Full, expected_full),
+            (JoinType::LeftSemi, expected_left_semi),
+            (JoinType::LeftAnti, expected_left_anti),
+            (JoinType::RightSemi, expected_right_semi),
+            (JoinType::RightAnti, expected_right_anti),
+        ];
+
+        for (join_type, expected) in test_cases {
+            let (_, batches) = join_collect_with_partition_mode(
+                left.clone(),
+                right.clone(),
+                on.clone(),
+                &join_type,
+                PartitionMode::CollectLeft,
+                false,
+                task_ctx.clone(),
+            )
+            .await?;
+            assert_batches_sorted_eq!(expected, &batches);
+        }
+
+        Ok(())
+    }
+
     #[tokio::test]
     async fn join_date32() -> Result<()> {
         let schema = Arc::new(Schema::new(vec![
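
The new test_collect_left_multiple_partitions_join targets exactly the failure mode in the subject line: with PartitionMode::CollectLeft and several probe partitions, every partition used to emit the build-side rows recorded in the bitmap, so join types that rely on it (Left, Full, LeftSemi, LeftAnti, and the right-side variants after swapping) could contain duplicates. A tiny standalone sketch of the before/after behaviour the test pins down; the function names here are illustrative and not from the patch:

use std::sync::atomic::{AtomicUsize, Ordering};

/// How many times the unmatched build-side rows are emitted, given a
/// per-partition decision rule. Pre-fix, every CollectLeft probe partition
/// emitted them; post-fix, only the partition that sees the shared counter
/// reach zero does.
fn count_emissions(partitions: usize, mut should_emit: impl FnMut() -> bool) -> usize {
    (0..partitions).filter(|_| should_emit()).count()
}

fn main() {
    let partitions = 4;

    // Old behaviour: every partition reports the unmatched rows -> duplicates.
    assert_eq!(count_emissions(partitions, || true), 4);

    // New behaviour: a shared countdown lets only the last finisher report them.
    let remaining = AtomicUsize::new(partitions);
    let gated = || remaining.fetch_sub(1, Ordering::Relaxed) == 1;
    assert_eq!(count_emissions(partitions, gated), 1);

    println!("ok");
}
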