Skip to content

Commit

Permalink
Memory limited nested-loop join (#5564)
Browse files Browse the repository at this point in the history
* memory limited nl join

* shared reservations as structs
  • Loading branch information
korowa authored Mar 14, 2023
1 parent 26eb406 commit 612eb1d
Show file tree
Hide file tree
Showing 6 changed files with 329 additions and 72 deletions.
5 changes: 0 additions & 5 deletions datafusion/core/src/physical_plan/common.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,11 +42,6 @@ use tokio::task::JoinHandle;
/// [`MemoryReservation`] used across query execution streams
pub(crate) type SharedMemoryReservation = Arc<Mutex<MemoryReservation>>;

/// [`MemoryReservation`] used at query operator level
/// `Option` wrapper allows to initialize empty reservation in operator constructor,
/// and set it to actual reservation at stream level.
pub(crate) type OperatorMemoryReservation = Arc<Mutex<Option<SharedMemoryReservation>>>;

/// Stream of record batches
pub struct SizedRecordBatchStream {
schema: SchemaRef,
Expand Down
28 changes: 7 additions & 21 deletions datafusion/core/src/physical_plan/joins/cross_join.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,8 +26,7 @@ use arrow::datatypes::{Schema, SchemaRef};
use arrow::record_batch::RecordBatch;

use crate::execution::context::TaskContext;
use crate::execution::memory_pool::MemoryConsumer;
use crate::physical_plan::common::{OperatorMemoryReservation, SharedMemoryReservation};
use crate::execution::memory_pool::{SharedOptionalMemoryReservation, TryGrow};
use crate::physical_plan::metrics::{ExecutionPlanMetricsSet, MetricsSet};
use crate::physical_plan::{
coalesce_batches::concat_batches, coalesce_partitions::CoalescePartitionsExec,
Expand All @@ -38,7 +37,6 @@ use crate::physical_plan::{
use crate::{error::Result, scalar::ScalarValue};
use async_trait::async_trait;
use datafusion_common::DataFusionError;
use parking_lot::Mutex;

use super::utils::{
adjust_right_output_partitioning, cross_join_equivalence_properties,
Expand All @@ -61,7 +59,7 @@ pub struct CrossJoinExec {
/// Build-side data
left_fut: OnceAsync<JoinLeftData>,
/// Memory reservation for build-side data
reservation: OperatorMemoryReservation,
reservation: SharedOptionalMemoryReservation,
/// Execution plan metrics
metrics: ExecutionPlanMetricsSet,
}
Expand Down Expand Up @@ -106,7 +104,7 @@ async fn load_left_input(
left: Arc<dyn ExecutionPlan>,
context: Arc<TaskContext>,
metrics: BuildProbeJoinMetrics,
reservation: SharedMemoryReservation,
reservation: SharedOptionalMemoryReservation,
) -> Result<JoinLeftData> {
// merge all left parts into a single stream
let merge = {
Expand All @@ -125,7 +123,7 @@ async fn load_left_input(
|mut acc, batch| async {
let batch_size = batch.get_array_memory_size();
// Reserve memory for incoming batch
acc.3.lock().try_grow(batch_size)?;
acc.3.try_grow(batch_size)?;
// Update metrics
acc.2.build_mem_used.add(batch_size);
acc.2.build_input_batches.add(1);
Expand Down Expand Up @@ -226,27 +224,15 @@ impl ExecutionPlan for CrossJoinExec {
let join_metrics = BuildProbeJoinMetrics::new(partition, &self.metrics);

// Initialization of operator-level reservation
{
let mut reservation_lock = self.reservation.lock();
if reservation_lock.is_none() {
*reservation_lock = Some(Arc::new(Mutex::new(
MemoryConsumer::new("CrossJoinExec").register(context.memory_pool()),
)));
};
}

let reservation = self.reservation.lock().clone().ok_or_else(|| {
DataFusionError::Internal(
"Operator-level memory reservation is not initialized".to_string(),
)
})?;
self.reservation
.initialize("CrossJoinExec", context.memory_pool());

let left_fut = self.left_fut.once(|| {
load_left_input(
self.left.clone(),
context,
join_metrics.clone(),
reservation,
self.reservation.clone(),
)
});

Expand Down
43 changes: 17 additions & 26 deletions datafusion/core/src/physical_plan/joins/hash_join.rs
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,6 @@ use hashbrown::raw::RawTable;
use crate::physical_plan::{
coalesce_batches::concat_batches,
coalesce_partitions::CoalescePartitionsExec,
common::{OperatorMemoryReservation, SharedMemoryReservation},
expressions::Column,
expressions::PhysicalSortExpr,
hash_utils::create_hashes,
Expand All @@ -78,7 +77,12 @@ use crate::logical_expr::JoinType;

use crate::arrow::array::BooleanBufferBuilder;
use crate::arrow::datatypes::TimeUnit;
use crate::execution::{context::TaskContext, memory_pool::MemoryConsumer};
use crate::execution::{
context::TaskContext,
memory_pool::{
MemoryConsumer, SharedMemoryReservation, SharedOptionalMemoryReservation, TryGrow,
},
};

use super::{
utils::{OnceAsync, OnceFut},
Expand All @@ -88,7 +92,6 @@ use crate::physical_plan::joins::utils::{
adjust_indices_by_join_type, apply_join_filter_to_indices, build_batch_from_indices,
get_final_indices_from_bit_map, need_produce_result_in_final, JoinSide,
};
use parking_lot::Mutex;
use std::fmt;
use std::task::Poll;

Expand Down Expand Up @@ -137,7 +140,7 @@ pub struct HashJoinExec {
/// Build-side data
left_fut: OnceAsync<JoinLeftData>,
/// Operator-level memory reservation for left data
reservation: OperatorMemoryReservation,
reservation: SharedOptionalMemoryReservation,
/// Shares the `RandomState` for the hashing algorithm
random_state: RandomState,
/// Partitioning mode to use
Expand Down Expand Up @@ -378,26 +381,14 @@ impl ExecutionPlan for HashJoinExec {
let join_metrics = BuildProbeJoinMetrics::new(partition, &self.metrics);

// Initialization of operator-level reservation
{
let mut operator_reservation_lock = self.reservation.lock();
if operator_reservation_lock.is_none() {
*operator_reservation_lock = Some(Arc::new(Mutex::new(
MemoryConsumer::new("HashJoinExec").register(context.memory_pool()),
)));
};
}

let operator_reservation = self.reservation.lock().clone().ok_or_else(|| {
DataFusionError::Internal(
"Operator-level memory reservation is not initialized".to_string(),
)
})?;
self.reservation
.initialize("HashJoinExec", context.memory_pool());

// Inititalization of stream-level reservation
let reservation = Arc::new(Mutex::new(
let reservation = SharedMemoryReservation::from(
MemoryConsumer::new(format!("HashJoinStream[{partition}]"))
.register(context.memory_pool()),
));
);

// Memory reservation for left-side data depends on PartitionMode:
// - operator-level for `CollectLeft` mode
Expand All @@ -415,7 +406,7 @@ impl ExecutionPlan for HashJoinExec {
on_left.clone(),
context.clone(),
join_metrics.clone(),
operator_reservation.clone(),
Arc::new(self.reservation.clone()),
)
}),
PartitionMode::Partitioned => OnceFut::new(collect_left_input(
Expand All @@ -425,7 +416,7 @@ impl ExecutionPlan for HashJoinExec {
on_left.clone(),
context.clone(),
join_metrics.clone(),
reservation.clone(),
Arc::new(reservation.clone()),
)),
PartitionMode::Auto => {
return Err(DataFusionError::Plan(format!(
Expand Down Expand Up @@ -497,7 +488,7 @@ async fn collect_left_input(
on_left: Vec<Column>,
context: Arc<TaskContext>,
metrics: BuildProbeJoinMetrics,
reservation: SharedMemoryReservation,
reservation: Arc<dyn TryGrow>,
) -> Result<JoinLeftData> {
let schema = left.schema();

Expand Down Expand Up @@ -526,7 +517,7 @@ async fn collect_left_input(
.try_fold(initial, |mut acc, batch| async {
let batch_size = batch.get_array_memory_size();
// Reserve memory for incoming batch
acc.3.lock().try_grow(batch_size)?;
acc.3.try_grow(batch_size)?;
// Update metrics
acc.2.build_mem_used.add(batch_size);
acc.2.build_input_batches.add(1);
Expand Down Expand Up @@ -555,7 +546,7 @@ async fn collect_left_input(
// + 16 bytes fixed
let estimated_hastable_size = 32 * estimated_buckets + estimated_buckets + 16;

reservation.lock().try_grow(estimated_hastable_size)?;
reservation.try_grow(estimated_hastable_size)?;
metrics.build_mem_used.add(estimated_hastable_size);

let mut hashmap = JoinHashMap(RawTable::with_capacity(num_rows));
Expand Down Expand Up @@ -1157,7 +1148,7 @@ impl HashJoinStream {
// TODO: Replace `ceil` wrapper with stable `div_cell` after
// https://github.com/rust-lang/rust/issues/88581
let visited_bitmap_size = bit_util::ceil(left_data.1.num_rows(), 8);
self.reservation.lock().try_grow(visited_bitmap_size)?;
self.reservation.try_grow(visited_bitmap_size)?;
self.join_metrics.build_mem_used.add(visited_bitmap_size);
}

Expand Down
Loading

0 comments on commit 612eb1d

Please sign in to comment.