diff --git a/datafusion/physical-plan/src/joins/hash_join.rs b/datafusion/physical-plan/src/joins/hash_join.rs index 4e224ea65d196..90a6e05d5cc4b 100644 --- a/datafusion/physical-plan/src/joins/hash_join.rs +++ b/datafusion/physical-plan/src/joins/hash_join.rs @@ -56,11 +56,12 @@ use arrow::array::{ Array, ArrayRef, BooleanArray, BooleanBufferBuilder, UInt32Array, UInt64Array, }; use arrow::compute::kernels::cmp::{eq, not_distinct}; -use arrow::compute::{and, concat_batches, take, FilterBuilder}; +use arrow::compute::{and, concat, concat_batches, take, FilterBuilder}; use arrow::datatypes::{Schema, SchemaRef}; use arrow::record_batch::RecordBatch; use arrow::util::bit_util; use arrow_array::cast::downcast_array; +use arrow_array::new_empty_array; use arrow_schema::ArrowError; use datafusion_common::utils::memory::estimate_memory_size; use datafusion_common::{ @@ -896,16 +897,18 @@ async fn collect_left_input( // This operation performs 2 steps at once: // 1. creates a [JoinHashMap] of all batches from the stream // 2. stores the batches in a vector. - let initial = (Vec::new(), 0, metrics, reservation); - let (batches, num_rows, metrics, mut reservation) = stream + let initial = (Vec::new(), 0, 0, metrics, reservation); + let (batches, num_rows, batches_size, metrics, mut reservation) = stream .try_fold(initial, |mut acc, batch| async { let batch_size = get_record_batch_memory_size(&batch); // Reserve memory for incoming batch - acc.3.try_grow(batch_size)?; + acc.4.try_grow(batch_size)?; // Update metrics - acc.2.build_mem_used.add(batch_size); - acc.2.build_input_batches.add(1); - acc.2.build_input_rows.add(batch.num_rows()); + acc.3.build_mem_used.add(batch_size); + acc.3.build_input_batches.add(1); + acc.3.build_input_rows.add(batch.num_rows()); + // Update total batches size + acc.2 += batch_size; // Update row count acc.1 += batch.num_rows(); // Push batch to output @@ -914,6 +917,56 @@ async fn collect_left_input( }) .await?; + let batches_iter = batches.iter().rev(); + + // Collecting build-side join keys values + let left_values = if batches.is_empty() { + on_left + .iter() + .map(|expr| Ok(new_empty_array(&expr.data_type(&schema)?))) + .collect::>>()? + } else { + on_left + .iter() + .map(|expr| { + // Evaluate build-side join key expression on each batch, gradually + // reserving additional memory + let mut join_key_size = 0; + let join_key_arrays = batches_iter + .clone() + .map(|batch| { + let array: Arc = + expr.evaluate(batch).unwrap().into_array(batch.num_rows())?; + let array_size = array.get_array_memory_size(); + reservation.try_grow(array_size)?; + join_key_size += array_size; + Ok(array) + }) + .collect::>>()?; + + // Merge all keys into a single array, reserving memory for the copy, + // of source arrays (assuming worst case scenario) + reservation.try_grow(join_key_size)?; + let concatenated = concat( + join_key_arrays + .iter() + .map(AsRef::as_ref) + .collect::>() + .as_ref(), + )?; + + // Drop unconcatenated data and adjust memory reservation for single array + drop(join_key_arrays); + let build_side_mem = reservation.size() - join_key_size * 2 + + concatenated.get_array_memory_size(); + reservation.resize(build_side_mem); + metrics.build_mem_used.set(build_side_mem); + + Ok(concatenated) + }) + .collect::>>()? + }; + // Estimation of memory size, required for hashtable, prior to allocation. // Final result can be verified using `RawTable.allocation_info()` let fixed_size = size_of::(); @@ -928,7 +981,6 @@ async fn collect_left_input( let mut offset = 0; // Updating hashmap starting from the last batch - let batches_iter = batches.iter().rev(); for batch in batches_iter.clone() { hashes_buffer.clear(); hashes_buffer.resize(batch.num_rows(), 0); @@ -944,9 +996,19 @@ async fn collect_left_input( )?; offset += batch.num_rows(); } - // Merge all batches into a single batch, so we can directly index into the arrays + + // Merge all batches into a single batch, so we can directly index into the arrays, + // with prior reserving memory for the copy of source batches (assuming worst case scenario) + reservation.try_grow(batches_size)?; let single_batch = concat_batches(&schema, batches_iter)?; + // Drop original batches and adjust memory reservation for a single batch + drop(batches); + let build_side_mem = reservation.size() - batches_size * 2 + + get_record_batch_memory_size(&single_batch); + reservation.resize(build_side_mem); + metrics.build_mem_used.set(build_side_mem); + // Reserve additional memory for visited indices bitmap and create shared builder let visited_indices_bitmap = if with_visited_indices_bitmap { let bitmap_size = bit_util::ceil(single_batch.num_rows(), 8); @@ -960,14 +1022,6 @@ async fn collect_left_input( BooleanBufferBuilder::new(0) }; - let left_values = on_left - .iter() - .map(|c| { - c.evaluate(&single_batch)? - .into_array(single_batch.num_rows()) - }) - .collect::>>()?; - let data = JoinLeftData::new( hashmap, single_batch,