Skip to content

Commit

Permalink
Clean up spawned task on CoalescePartitionsExec drop
Browse files Browse the repository at this point in the history
  • Loading branch information
crepererum committed Oct 15, 2021
1 parent 4159a5c commit adc1fde
Showing 1 changed file with 36 additions and 4 deletions.
40 changes: 36 additions & 4 deletions datafusion/src/physical_plan/coalesce_partitions.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ use async_trait::async_trait;
use arrow::record_batch::RecordBatch;
use arrow::{datatypes::SchemaRef, error::Result as ArrowResult};

use super::common::AbortOnDropMany;
use super::metrics::{BaselineMetrics, ExecutionPlanMetricsSet, MetricsSet};
use super::{RecordBatchStream, Statistics};
use crate::error::{DataFusionError, Result};
Expand Down Expand Up @@ -129,14 +130,20 @@ impl ExecutionPlan for CoalescePartitionsExec {

// spawn independent tasks whose resulting streams (of batches)
// are sent to the channel for consumption.
let mut join_handles = Vec::with_capacity(input_partitions);
for part_i in 0..input_partitions {
spawn_execution(self.input.clone(), sender.clone(), part_i);
join_handles.push(spawn_execution(
self.input.clone(),
sender.clone(),
part_i,
));
}

Ok(Box::pin(MergeStream {
input: receiver,
schema: self.schema(),
baseline_metrics,
drop_helper: AbortOnDropMany(join_handles),
}))
}
}
Expand Down Expand Up @@ -168,7 +175,8 @@ pin_project! {
schema: SchemaRef,
#[pin]
input: mpsc::Receiver<ArrowResult<RecordBatch>>,
baseline_metrics: BaselineMetrics
baseline_metrics: BaselineMetrics,
drop_helper: AbortOnDropMany<()>,
}
}

Expand All @@ -194,11 +202,15 @@ impl RecordBatchStream for MergeStream {
#[cfg(test)]
mod tests {

use arrow::datatypes::{DataType, Field, Schema};
use futures::FutureExt;

use super::*;
use crate::datasource::object_store::local::LocalFileSystem;
use crate::physical_plan::common;
use crate::physical_plan::file_format::CsvExec;
use crate::test;
use crate::physical_plan::{collect, common};
use crate::test::exec::{assert_strong_count_converges_to_zero, BlockingExec};
use crate::test::{self, assert_is_pending};

#[tokio::test]
async fn merge() -> Result<()> {
Expand Down Expand Up @@ -238,4 +250,24 @@ mod tests {

Ok(())
}

/// Checks that dropping a still-pending `collect` future over a
/// `CoalescePartitionsExec` lets the references handed out by the blocking
/// input be released.
#[tokio::test]
async fn test_drop_cancel() -> Result<()> {
    let field = Field::new("a", DataType::Float32, true);
    let schema = Arc::new(Schema::new(vec![field]));

    // NOTE(review): the `2` is presumably the number of partitions exposed by
    // `BlockingExec` -- confirm against `crate::test::exec`.
    let input = Arc::new(BlockingExec::new(Arc::clone(&schema), 2));
    // Grab the reference handle before the input is moved into the plan.
    let refs = input.refs();
    let coalesce_partitions_exec = Arc::new(CoalescePartitionsExec::new(input));

    let mut pending = collect(coalesce_partitions_exec).boxed();

    // The future must not have completed when it is dropped.
    assert_is_pending(&mut pending);
    drop(pending);
    assert_strong_count_converges_to_zero(refs).await;

    Ok(())
}
}

0 comments on commit adc1fde

Please sign in to comment.