Skip to content

Commit

Permalink
chore
Browse files Browse the repository at this point in the history
  • Loading branch information
Weijun-H committed Feb 6, 2025
1 parent 1cb85dd commit e6c37c5
Show file tree
Hide file tree
Showing 2 changed files with 24 additions and 13 deletions.
18 changes: 14 additions & 4 deletions datafusion/physical-optimizer/src/enforce_distribution.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1071,10 +1071,20 @@ fn replace_round_robin_repartition_with_on_demand(
if let Some(repartition) = context.plan.as_any().downcast_ref::<RepartitionExec>() {
if let Partitioning::RoundRobinBatch(n) = repartition.partitioning() {
let child_plan = Arc::clone(&context.children[0].plan);
context.plan = Arc::new(OnDemandRepartitionExec::try_new(
child_plan,
Partitioning::OnDemand(*n),
)?);
context.plan = if repartition.preserve_order() {
Arc::new(
OnDemandRepartitionExec::try_new(
child_plan,
Partitioning::OnDemand(*n),
)?
.with_preserve_order(),
)
} else {
Arc::new(OnDemandRepartitionExec::try_new(
child_plan,
Partitioning::OnDemand(*n),
)?)
};
return Ok(context);
}
}
Expand Down
19 changes: 10 additions & 9 deletions datafusion/physical-plan/src/repartition/on_demand_repartition.rs
Original file line number Diff line number Diff line change
Expand Up @@ -239,7 +239,6 @@ impl ExecutionPlan for OnDemandRepartitionExec {

let stream = futures::stream::once(async move {
let num_input_partitions = input.output_partitioning().partition_count();

let input_captured = Arc::clone(&input);
let metrics_captured = metrics.clone();
let name_captured = name.clone();
Expand Down Expand Up @@ -483,8 +482,8 @@ impl OnDemandRepartitionExec {
partition,
buffer_tx,
Arc::clone(&context),
metrics.fetch_time,
metrics.send_buffer_time,
metrics.fetch_time.clone(),
metrics.send_buffer_time.clone(),
));

// While there are still outputs to send to, keep pulling inputs
Expand Down Expand Up @@ -621,10 +620,11 @@ impl Stream for OnDemandPerPartitionStream {
cx: &mut Context<'_>,
) -> Poll<Option<Self::Item>> {
if !self.is_requested && !self.sender.is_closed() {
self.sender.try_send(self.partition).map_err(|_| {
self.sender.send_blocking(self.partition).map_err(|e| {
internal_datafusion_err!(
"Error sending partition number to the receiver for partition {}",
self.partition
"Error sending partition number to the receiver for partition {}: {}",
self.partition,
e
)
})?;
self.is_requested = true;
Expand Down Expand Up @@ -693,10 +693,11 @@ impl Stream for OnDemandRepartitionStream {
loop {
// Send partition number to input partitions
if !self.is_requested && !self.sender.is_closed() {
self.sender.try_send(self.partition).map_err(|_| {
self.sender.send_blocking(self.partition).map_err(|e| {
internal_datafusion_err!(
"Error sending partition number to the receiver for partition {}",
self.partition
"Error sending partition number to the receiver for partition {}: {}",
self.partition,
e
)
})?;
self.is_requested = true;
Expand Down

0 comments on commit e6c37c5

Please sign in to comment.