diff --git a/datafusion/physical-plan/src/joins/hash_join.rs b/datafusion/physical-plan/src/joins/hash_join.rs index 408a2b13c57b..adcac4bca064 100644 --- a/datafusion/physical-plan/src/joins/hash_join.rs +++ b/datafusion/physical-plan/src/joins/hash_join.rs @@ -26,7 +26,7 @@ use std::{any::Any, usize, vec}; use crate::joins::utils::{ adjust_indices_by_join_type, apply_join_filter_to_indices, build_batch_from_indices, calculate_join_output_ordering, get_final_indices_from_bit_map, - need_produce_result_in_final, JoinHashMap, JoinHashMapType, + need_produce_result_in_final, JoinHashMap, JoinHashMapOffset, JoinHashMapType, }; use crate::{ coalesce_partitions::CoalescePartitionsExec, @@ -61,7 +61,8 @@ use arrow::util::bit_util; use arrow_array::cast::downcast_array; use arrow_schema::ArrowError; use datafusion_common::{ - exec_err, internal_err, plan_err, DataFusionError, JoinSide, JoinType, Result, + internal_datafusion_err, internal_err, plan_err, DataFusionError, JoinSide, JoinType, + Result, }; use datafusion_execution::memory_pool::{MemoryConsumer, MemoryReservation}; use datafusion_execution::TaskContext; @@ -911,16 +912,10 @@ enum HashJoinStreamState { Completed, } -/// Container for HashJoinStreamState::ProcessProbeBatch related data -struct ProcessProbeBatchState { - /// Current probe-side batch - batch: RecordBatch, -} - impl HashJoinStreamState { /// Tries to extract ProcessProbeBatchState from HashJoinStreamState enum. /// Returns an error if state is not ProcessProbeBatchState. - fn try_as_process_probe_batch(&self) -> Result<&ProcessProbeBatchState> { + fn try_as_process_probe_batch_mut(&mut self) -> Result<&mut ProcessProbeBatchState> { match self { HashJoinStreamState::ProcessProbeBatch(state) => Ok(state), _ => internal_err!("Expected hash join stream in ProcessProbeBatch state"), @@ -928,6 +923,25 @@ impl HashJoinStreamState { } } +/// Container for HashJoinStreamState::ProcessProbeBatch related data +struct ProcessProbeBatchState { + /// Current probe-side batch + batch: RecordBatch, + /// Matching offset + offset: JoinHashMapOffset, + /// Max joined probe-side index from current batch + joined_probe_idx: Option, +} + +impl ProcessProbeBatchState { + fn advance(&mut self, offset: JoinHashMapOffset, joined_probe_idx: Option) { + self.offset = offset; + if joined_probe_idx.is_some() { + self.joined_probe_idx = joined_probe_idx; + } + } +} + /// [`Stream`] for [`HashJoinExec`] that does the actual join. /// /// This stream: @@ -973,7 +987,9 @@ impl RecordBatchStream for HashJoinStream { } } -/// Returns build/probe indices satisfying the equality condition. +/// Lookups by hash agaist JoinHashMap and resolves potential hash collisions. +/// Returns build/probe indices satisfying the equality condition, along with +/// starting point for next iteration. /// /// # Example /// @@ -1019,7 +1035,7 @@ impl RecordBatchStream for HashJoinStream { /// Probe indices: 3, 3, 4, 5 /// ``` #[allow(clippy::too_many_arguments)] -pub(crate) fn build_equal_condition_join_indices( +fn lookup_join_hashmap( build_hashmap: &T, build_input_buffer: &RecordBatch, probe_batch: &RecordBatch, @@ -1027,12 +1043,9 @@ pub(crate) fn build_equal_condition_join_indices( probe_on: &[Column], random_state: &RandomState, null_equals_null: bool, - hashes_buffer: &mut Vec, - filter: Option<&JoinFilter>, - build_side: JoinSide, - deleted_offset: Option, - fifo_hashmap: bool, -) -> Result<(UInt64Array, UInt32Array)> { + limit: usize, + offset: JoinHashMapOffset, +) -> Result<(UInt64Array, UInt32Array, Option)> { let keys_values = probe_on .iter() .map(|c| c.evaluate(probe_batch)?.into_array(probe_batch.num_rows())) @@ -1044,78 +1057,32 @@ pub(crate) fn build_equal_condition_join_indices( .into_array(build_input_buffer.num_rows()) }) .collect::>>()?; - hashes_buffer.clear(); - hashes_buffer.resize(probe_batch.num_rows(), 0); - let hash_values = create_hashes(&keys_values, random_state, hashes_buffer)?; - // In case build-side input has not been inverted while JoinHashMap creation, the chained list algorithm - // will return build indices for each probe row in a reverse order as such: - // Build Indices: [5, 4, 3] - // Probe Indices: [1, 1, 1] - // - // This affects the output sequence. Hypothetically, it's possible to preserve the lexicographic order on the build side. - // Let's consider probe rows [0,1] as an example: - // - // When the probe iteration sequence is reversed, the following pairings can be derived: - // - // For probe row 1: - // (5, 1) - // (4, 1) - // (3, 1) - // - // For probe row 0: - // (5, 0) - // (4, 0) - // (3, 0) - // - // After reversing both sets of indices, we obtain reversed indices: - // - // (3,0) - // (4,0) - // (5,0) - // (3,1) - // (4,1) - // (5,1) - // - // With this approach, the lexicographic order on both the probe side and the build side is preserved. - let (mut probe_indices, mut build_indices) = if fifo_hashmap { - build_hashmap.get_matched_indices(hash_values.iter().enumerate(), deleted_offset) - } else { - let (mut matched_probe, mut matched_build) = build_hashmap - .get_matched_indices(hash_values.iter().enumerate().rev(), deleted_offset); + let mut hashes_buffer = vec![0; probe_batch.num_rows()]; + let hash_values = create_hashes(&keys_values, random_state, &mut hashes_buffer)?; - matched_probe.as_slice_mut().reverse(); - matched_build.as_slice_mut().reverse(); - - (matched_probe, matched_build) - }; - - let left: UInt64Array = PrimitiveArray::new(build_indices.finish().into(), None); - let right: UInt32Array = PrimitiveArray::new(probe_indices.finish().into(), None); + let (mut probe_builder, mut build_builder, next_offset) = build_hashmap + .get_matched_indices_with_limit_offset( + hash_values.iter().enumerate(), + None, + limit, + offset, + ); - let (left, right) = if let Some(filter) = filter { - // Filter the indices which satisfy the non-equal join condition, like `left.b1 = 10` - apply_join_filter_to_indices( - build_input_buffer, - probe_batch, - left, - right, - filter, - build_side, - )? - } else { - (left, right) - }; + let build_indices: UInt64Array = + PrimitiveArray::new(build_builder.finish().into(), None); + let probe_indices: UInt32Array = + PrimitiveArray::new(probe_builder.finish().into(), None); - let matched_indices = equal_rows_arr( - &left, - &right, + let (build_indices, probe_indices) = equal_rows_arr( + &build_indices, + &probe_indices, &build_join_values, &keys_values, null_equals_null, )?; - Ok((matched_indices.0, matched_indices.1)) + Ok((build_indices, probe_indices, next_offset)) } // version of eq_dyn supporting equality on null arrays @@ -1263,6 +1230,8 @@ impl HashJoinStream { self.state = HashJoinStreamState::ProcessProbeBatch(ProcessProbeBatchState { batch, + offset: (0, None), + joined_probe_idx: None, }); } Some(Err(err)) => return Poll::Ready(Err(err)), @@ -1277,16 +1246,15 @@ impl HashJoinStream { fn process_probe_batch( &mut self, ) -> Result>> { - let state = self.state.try_as_process_probe_batch()?; + let state = self.state.try_as_process_probe_batch_mut()?; let build_side = self.build_side.try_as_ready_mut()?; self.join_metrics.input_batches.add(1); self.join_metrics.input_rows.add(state.batch.num_rows()); let timer = self.join_metrics.join_time.timer(); - let mut hashes_buffer = vec![]; - // get the matched two indices for the on condition - let left_right_indices = build_equal_condition_join_indices( + // get the matched by join keys indices + let (left_indices, right_indices, next_offset) = lookup_join_hashmap( build_side.left_data.hash_map(), build_side.left_data.batch(), &state.batch, @@ -1294,53 +1262,102 @@ impl HashJoinStream { &self.on_right, &self.random_state, self.null_equals_null, - &mut hashes_buffer, - self.filter.as_ref(), - JoinSide::Left, - None, - true, - ); + self.batch_size, + state.offset, + )?; - let result = match left_right_indices { - Ok((left_side, right_side)) => { - // set the left bitmap - // and only left, full, left semi, left anti need the left bitmap - if need_produce_result_in_final(self.join_type) { - left_side.iter().flatten().for_each(|x| { - build_side.visited_left_side.set_bit(x as usize, true); - }); - } + // apply join filters if exists + let (left_indices, right_indices) = if let Some(filter) = &self.filter { + // Filter the indices which satisfy the non-equal join condition, like `left.b1 = 10` + apply_join_filter_to_indices( + build_side.left_data.batch(), + &state.batch, + left_indices, + right_indices, + filter, + JoinSide::Left, + )? + } else { + (left_indices, right_indices) + }; - // adjust the two side indices base on the join type - let (left_side, right_side) = adjust_indices_by_join_type( - left_side, - right_side, - 0..state.batch.num_rows(), - self.join_type, - ); + // mark joined left-side indices as visited, if required by join type + if need_produce_result_in_final(self.join_type) { + left_indices.iter().flatten().for_each(|x| { + build_side.visited_left_side.set_bit(x as usize, true); + }); + } - let result = build_batch_from_indices( - &self.schema, - build_side.left_data.batch(), - &state.batch, - &left_side, - &right_side, - &self.column_indices, - JoinSide::Left, - ); - self.join_metrics.output_batches.add(1); - self.join_metrics.output_rows.add(state.batch.num_rows()); - result - } - Err(err) => { - exec_err!("Fail to build join indices in HashJoinExec, error:{err}") - } + // check if probe batch scanned based on `next_offset` returned from lookup function + let probe_batch_scanned = next_offset.is_none() + || next_offset.is_some_and(|(probe_idx, build_idx)| { + probe_idx + 1 >= state.batch.num_rows() + && build_idx.is_some_and(|v| v == 0) + }); + + // The goals of index alignment for different join types are: + // + // 1) Right & FullJoin -- to append all missing probe-side indices between + // previous (excluding) and current joined indices. + // 2) SemiJoin -- deduplicate probe indices in range between previous + // (excluding) and current joined indices. + // 3) AntiJoin -- return only missing indices in range between + // previous and current joined indices. + // Inclusion/exclusion of the indices themselves don't matter + // + // As a summary -- alignment range can be produced based only on + // joined (matched with filters applied) probe side indices, excluding starting one + // (left from previous iteration). + + // if any rows have been joined -- get last joined probe-side (right) row + // it's important that index counts as "joined" after hash collisions checks + // and join filters applied. + let last_joined_right_idx = match right_indices.len() { + 0 => None, + n => Some(right_indices.value(n - 1) as usize), + }; + + // Calculate range and perform alignment. + // In case probe batch has been processed -- align all remaining rows. + let index_alignment_range_start = state.joined_probe_idx.map_or(0, |v| v + 1); + let index_alignment_range_end = if probe_batch_scanned { + state.batch.num_rows() + } else { + last_joined_right_idx.map_or(0, |v| v + 1) }; + + let (left_indices, right_indices) = adjust_indices_by_join_type( + left_indices, + right_indices, + index_alignment_range_start..index_alignment_range_end, + self.join_type, + ); + + let result = build_batch_from_indices( + &self.schema, + build_side.left_data.batch(), + &state.batch, + &left_indices, + &right_indices, + &self.column_indices, + JoinSide::Left, + )?; + + self.join_metrics.output_batches.add(1); + self.join_metrics.output_rows.add(state.batch.num_rows()); timer.done(); - self.state = HashJoinStreamState::FetchProbeBatch; + if probe_batch_scanned { + self.state = HashJoinStreamState::FetchProbeBatch; + } else { + state.advance( + next_offset + .ok_or_else(|| internal_datafusion_err!("unexpected None offset"))?, + last_joined_right_idx, + ) + }; - Ok(StatefulStreamResult::Ready(Some(result?))) + Ok(StatefulStreamResult::Ready(Some(result))) } /// Processes unmatched build-side rows for certain join types and produces output batch @@ -1406,15 +1423,15 @@ mod tests { use super::*; use crate::{ - common, expressions::Column, hash_utils::create_hashes, - joins::hash_join::build_equal_condition_join_indices, memory::MemoryExec, + common, expressions::Column, hash_utils::create_hashes, memory::MemoryExec, repartition::RepartitionExec, test::build_table_i32, test::exec::MockExec, }; use arrow::array::{ArrayRef, Date32Array, Int32Array, UInt32Builder, UInt64Builder}; use arrow::datatypes::{DataType, Field, Schema}; use datafusion_common::{ - assert_batches_eq, assert_batches_sorted_eq, assert_contains, ScalarValue, + assert_batches_eq, assert_batches_sorted_eq, assert_contains, exec_err, + ScalarValue, }; use datafusion_execution::config::SessionConfig; use datafusion_execution::runtime_env::{RuntimeConfig, RuntimeEnv}; @@ -2914,7 +2931,7 @@ mod tests { let join_hash_map = JoinHashMap::new(hashmap_left, next); - let (l, r) = build_equal_condition_join_indices( + let (l, r, _) = lookup_join_hashmap( &join_hash_map, &left, &right, @@ -2922,11 +2939,8 @@ mod tests { &[Column::new("a", 0)], &random_state, false, - &mut vec![0; right.num_rows()], - None, - JoinSide::Left, - None, - false, + 8192, + (0, None), )?; let mut left_ids = UInt64Builder::with_capacity(0); @@ -3314,26 +3328,26 @@ mod tests { "+----+----+----+----+----+----+", "| a1 | b1 | c1 | a2 | b2 | c2 |", "+----+----+----+----+----+----+", - "| 4 | 1 | 0 | 10 | 1 | 0 |", - "| 3 | 1 | 0 | 10 | 1 | 0 |", - "| 2 | 1 | 0 | 10 | 1 | 0 |", "| 1 | 1 | 0 | 10 | 1 | 0 |", - "| 4 | 1 | 0 | 20 | 1 | 0 |", - "| 3 | 1 | 0 | 20 | 1 | 0 |", - "| 2 | 1 | 0 | 20 | 1 | 0 |", + "| 2 | 1 | 0 | 10 | 1 | 0 |", + "| 3 | 1 | 0 | 10 | 1 | 0 |", + "| 4 | 1 | 0 | 10 | 1 | 0 |", "| 1 | 1 | 0 | 20 | 1 | 0 |", - "| 4 | 1 | 0 | 30 | 1 | 0 |", - "| 3 | 1 | 0 | 30 | 1 | 0 |", - "| 2 | 1 | 0 | 30 | 1 | 0 |", + "| 2 | 1 | 0 | 20 | 1 | 0 |", + "| 3 | 1 | 0 | 20 | 1 | 0 |", + "| 4 | 1 | 0 | 20 | 1 | 0 |", "| 1 | 1 | 0 | 30 | 1 | 0 |", - "| 4 | 1 | 0 | 40 | 1 | 0 |", - "| 3 | 1 | 0 | 40 | 1 | 0 |", - "| 2 | 1 | 0 | 40 | 1 | 0 |", + "| 2 | 1 | 0 | 30 | 1 | 0 |", + "| 3 | 1 | 0 | 30 | 1 | 0 |", + "| 4 | 1 | 0 | 30 | 1 | 0 |", "| 1 | 1 | 0 | 40 | 1 | 0 |", - "| 4 | 1 | 0 | 50 | 1 | 0 |", - "| 3 | 1 | 0 | 50 | 1 | 0 |", - "| 2 | 1 | 0 | 50 | 1 | 0 |", + "| 2 | 1 | 0 | 40 | 1 | 0 |", + "| 3 | 1 | 0 | 40 | 1 | 0 |", + "| 4 | 1 | 0 | 40 | 1 | 0 |", "| 1 | 1 | 0 | 50 | 1 | 0 |", + "| 2 | 1 | 0 | 50 | 1 | 0 |", + "| 3 | 1 | 0 | 50 | 1 | 0 |", + "| 4 | 1 | 0 | 50 | 1 | 0 |", "+----+----+----+----+----+----+", ]; let left_batch = [ diff --git a/datafusion/physical-plan/src/joins/symmetric_hash_join.rs b/datafusion/physical-plan/src/joins/symmetric_hash_join.rs index 7a3db04d8255..e7c267817708 100644 --- a/datafusion/physical-plan/src/joins/symmetric_hash_join.rs +++ b/datafusion/physical-plan/src/joins/symmetric_hash_join.rs @@ -32,7 +32,7 @@ use std::task::Poll; use std::{usize, vec}; use crate::common::SharedMemoryReservation; -use crate::joins::hash_join::{build_equal_condition_join_indices, update_hash}; +use crate::joins::hash_join::{equal_rows_arr, update_hash}; use crate::joins::stream_join_utils::{ calculate_filter_expr_intervals, combine_two_batches, convert_sort_expr_with_filter_schema, get_pruning_anti_indices, @@ -41,22 +41,26 @@ use crate::joins::stream_join_utils::{ StreamJoinMetrics, }; use crate::joins::utils::{ - build_batch_from_indices, build_join_schema, check_join_is_valid, - partitioned_join_output_partitioning, ColumnIndex, JoinFilter, JoinOn, - StatefulStreamResult, + apply_join_filter_to_indices, build_batch_from_indices, build_join_schema, + check_join_is_valid, partitioned_join_output_partitioning, ColumnIndex, JoinFilter, + JoinHashMapType, JoinOn, StatefulStreamResult, }; use crate::{ expressions::{Column, PhysicalSortExpr}, joins::StreamJoinPartitionMode, metrics::{ExecutionPlanMetricsSet, MetricsSet}, DisplayAs, DisplayFormatType, Distribution, EquivalenceProperties, ExecutionPlan, - Partitioning, RecordBatchStream, SendableRecordBatchStream, Statistics, + Partitioning, PhysicalExpr, RecordBatchStream, SendableRecordBatchStream, Statistics, }; -use arrow::array::{ArrowPrimitiveType, NativeAdapter, PrimitiveArray, PrimitiveBuilder}; +use arrow::array::{ + ArrowPrimitiveType, NativeAdapter, PrimitiveArray, PrimitiveBuilder, UInt32Array, + UInt64Array, +}; use arrow::compute::concat_batches; use arrow::datatypes::{Schema, SchemaRef}; use arrow::record_batch::RecordBatch; +use datafusion_common::hash_utils::create_hashes; use datafusion_common::utils::bisect; use datafusion_common::{ internal_err, plan_err, DataFusionError, JoinSide, JoinType, Result, @@ -759,7 +763,7 @@ pub(crate) fn join_with_probe_batch( if build_hash_joiner.input_buffer.num_rows() == 0 || probe_batch.num_rows() == 0 { return Ok(None); } - let (build_indices, probe_indices) = build_equal_condition_join_indices( + let (build_indices, probe_indices) = lookup_join_hashmap( &build_hash_joiner.hashmap, &build_hash_joiner.input_buffer, probe_batch, @@ -768,12 +772,22 @@ pub(crate) fn join_with_probe_batch( random_state, null_equals_null, &mut build_hash_joiner.hashes_buffer, - filter, - build_hash_joiner.build_side, Some(build_hash_joiner.deleted_offset), - false, )?; + let (build_indices, probe_indices) = if let Some(filter) = filter { + apply_join_filter_to_indices( + &build_hash_joiner.input_buffer, + probe_batch, + build_indices, + probe_indices, + filter, + build_hash_joiner.build_side, + )? + } else { + (build_indices, probe_indices) + }; + if need_to_produce_result_in_final(build_hash_joiner.build_side, join_type) { record_visited_indices( &mut build_hash_joiner.visited_rows, @@ -810,6 +824,103 @@ pub(crate) fn join_with_probe_batch( } } +/// This method performs lookups against JoinHashMap by hash values of join-key columns, and handles potential +/// hash collisions. +/// +/// # Arguments +/// +/// * `build_hashmap` - hashmap collected from build side data. +/// * `build_batch` - Build side record batch. +/// * `probe_batch` - Probe side record batch. +/// * `build_on` - An array of columns on which the join will be performed. The columns are from the build side of the join. +/// * `probe_on` - An array of columns on which the join will be performed. The columns are from the probe side of the join. +/// * `random_state` - The random state for the join. +/// * `null_equals_null` - A boolean indicating whether NULL values should be treated as equal when joining. +/// * `hashes_buffer` - Buffer used for probe side keys hash calculation. +/// * `probe_batch` - The second record batch to be joined. +/// * `column_indices` - An array of columns to be selected for the result of the join. +/// * `deleted_offset` - deleted offset for build side data. +/// +/// # Returns +/// +/// A [Result] containing a tuple with two equal length arrays, representing indices of rows from build and probe side, +/// matched by join key columns. +#[allow(clippy::too_many_arguments)] +fn lookup_join_hashmap( + build_hashmap: &PruningJoinHashMap, + build_batch: &RecordBatch, + probe_batch: &RecordBatch, + build_on: &[Column], + probe_on: &[Column], + random_state: &RandomState, + null_equals_null: bool, + hashes_buffer: &mut Vec, + deleted_offset: Option, +) -> Result<(UInt64Array, UInt32Array)> { + let keys_values = probe_on + .iter() + .map(|c| c.evaluate(probe_batch)?.into_array(probe_batch.num_rows())) + .collect::>>()?; + let build_join_values = build_on + .iter() + .map(|c| c.evaluate(build_batch)?.into_array(build_batch.num_rows())) + .collect::>>()?; + hashes_buffer.clear(); + hashes_buffer.resize(probe_batch.num_rows(), 0); + let hash_values = create_hashes(&keys_values, random_state, hashes_buffer)?; + + // As SymmetricHashJoin uses LIFO JoinHashMap, the chained list algorithm + // will return build indices for each probe row in a reverse order as such: + // Build Indices: [5, 4, 3] + // Probe Indices: [1, 1, 1] + // + // This affects the output sequence. Hypothetically, it's possible to preserve the lexicographic order on the build side. + // Let's consider probe rows [0,1] as an example: + // + // When the probe iteration sequence is reversed, the following pairings can be derived: + // + // For probe row 1: + // (5, 1) + // (4, 1) + // (3, 1) + // + // For probe row 0: + // (5, 0) + // (4, 0) + // (3, 0) + // + // After reversing both sets of indices, we obtain reversed indices: + // + // (3,0) + // (4,0) + // (5,0) + // (3,1) + // (4,1) + // (5,1) + // + // With this approach, the lexicographic order on both the probe side and the build side is preserved. + let (mut matched_probe, mut matched_build) = build_hashmap + .get_matched_indices(hash_values.iter().enumerate().rev(), deleted_offset); + + matched_probe.as_slice_mut().reverse(); + matched_build.as_slice_mut().reverse(); + + let build_indices: UInt64Array = + PrimitiveArray::new(matched_build.finish().into(), None); + let probe_indices: UInt32Array = + PrimitiveArray::new(matched_probe.finish().into(), None); + + let (build_indices, probe_indices) = equal_rows_arr( + &build_indices, + &probe_indices, + &build_join_values, + &keys_values, + null_equals_null, + )?; + + Ok((build_indices, probe_indices)) +} + pub struct OneSideHashJoiner { /// Build side build_side: JoinSide, diff --git a/datafusion/physical-plan/src/joins/utils.rs b/datafusion/physical-plan/src/joins/utils.rs index 36c7143ee0d8..a6d784502cdc 100644 --- a/datafusion/physical-plan/src/joins/utils.rs +++ b/datafusion/physical-plan/src/joins/utils.rs @@ -138,6 +138,8 @@ impl JoinHashMap { } } +pub(crate) type JoinHashMapOffset = (usize, Option); + // Trait defining methods that must be implemented by a hash map type to be used for joins. pub trait JoinHashMapType { /// The type of list used to store the next list @@ -226,6 +228,85 @@ pub trait JoinHashMapType { (input_indices, match_indices) } + + /// Matches hashes with taking limit and offset into account. + /// Returns pairs of matched indices along with the starting point for next + /// matching iteration (`None` if limit has not been reached). + /// + /// This method only compares hashes, so additional further check for actual values + /// equality may be required. + fn get_matched_indices_with_limit_offset<'a>( + &self, + iter: impl Iterator, + deleted_offset: Option, + limit: usize, + offset: JoinHashMapOffset, + ) -> ( + UInt32BufferBuilder, + UInt64BufferBuilder, + Option, + ) { + let mut input_indices = UInt32BufferBuilder::new(0); + let mut match_indices = UInt64BufferBuilder::new(0); + + let mut output_tuples = 0_usize; + let mut next_offset = None; + + let hash_map: &RawTable<(u64, u64)> = self.get_map(); + let next_chain = self.get_list(); + + let (initial_idx, initial_next_idx) = offset; + 'probe: for (row_idx, hash_value) in iter.skip(initial_idx) { + let index = if initial_next_idx.is_some() && row_idx == initial_idx { + // If `initial_next_idx` is zero, then input index has been processed + // during previous iteration, and it can be skipped now + if let Some(0) = initial_next_idx { + continue; + } + // Otherwise, use `initial_next_idx` as-is + initial_next_idx + } else if let Some((_, index)) = + hash_map.get(*hash_value, |(hash, _)| *hash_value == *hash) + { + Some(*index) + } else { + None + }; + + if let Some(index) = index { + let mut i = index - 1; + loop { + let match_row_idx = if let Some(offset) = deleted_offset { + // This arguments means that we prune the next index way before here. + if i < offset as u64 { + // End of the list due to pruning + break; + } + i - offset as u64 + } else { + i + }; + match_indices.append(match_row_idx); + input_indices.append(row_idx as u32); + output_tuples += 1; + // Follow the chain to get the next index value + let next = next_chain[match_row_idx as usize]; + + if output_tuples >= limit { + next_offset = Some((row_idx, Some(next))); + break 'probe; + } + if next == 0 { + // end of list + break; + } + i = next - 1; + } + } + } + + (input_indices, match_indices, next_offset) + } } /// Implementation of `JoinHashMapType` for `JoinHashMap`. diff --git a/datafusion/sqllogictest/test_files/join_disable_repartition_joins.slt b/datafusion/sqllogictest/test_files/join_disable_repartition_joins.slt index e5d4c25f48c8..1312f2916ed6 100644 --- a/datafusion/sqllogictest/test_files/join_disable_repartition_joins.slt +++ b/datafusion/sqllogictest/test_files/join_disable_repartition_joins.slt @@ -72,11 +72,11 @@ SELECT t1.a, t1.b, t1.c, t2.a as a2 ON t1.d = t2.d ORDER BY a2, t2.b LIMIT 5 ---- -1 3 95 0 -1 3 93 0 -1 3 92 0 -1 3 81 0 -1 3 76 0 +0 0 0 0 +0 0 2 0 +0 0 3 0 +0 0 6 0 +0 0 20 0 query TT EXPLAIN SELECT t2.a as a2, t2.b