diff --git a/datafusion/physical-optimizer/src/enforce_distribution.rs b/datafusion/physical-optimizer/src/enforce_distribution.rs index 00aa1be73089..70846cb0098a 100644 --- a/datafusion/physical-optimizer/src/enforce_distribution.rs +++ b/datafusion/physical-optimizer/src/enforce_distribution.rs @@ -1071,10 +1071,20 @@ fn replace_round_robin_repartition_with_on_demand( if let Some(repartition) = context.plan.as_any().downcast_ref::() { if let Partitioning::RoundRobinBatch(n) = repartition.partitioning() { let child_plan = Arc::clone(&context.children[0].plan); - context.plan = Arc::new(OnDemandRepartitionExec::try_new( - child_plan, - Partitioning::OnDemand(*n), - )?); + context.plan = if repartition.preserve_order() { + Arc::new( + OnDemandRepartitionExec::try_new( + child_plan, + Partitioning::OnDemand(*n), + )? + .with_preserve_order(), + ) + } else { + Arc::new(OnDemandRepartitionExec::try_new( + child_plan, + Partitioning::OnDemand(*n), + )?) + }; return Ok(context); } } diff --git a/datafusion/physical-plan/src/repartition/on_demand_repartition.rs b/datafusion/physical-plan/src/repartition/on_demand_repartition.rs index ea7516aeaa71..6f10b29b315f 100644 --- a/datafusion/physical-plan/src/repartition/on_demand_repartition.rs +++ b/datafusion/physical-plan/src/repartition/on_demand_repartition.rs @@ -239,7 +239,6 @@ impl ExecutionPlan for OnDemandRepartitionExec { let stream = futures::stream::once(async move { let num_input_partitions = input.output_partitioning().partition_count(); - let input_captured = Arc::clone(&input); let metrics_captured = metrics.clone(); let name_captured = name.clone(); @@ -483,8 +482,8 @@ impl OnDemandRepartitionExec { partition, buffer_tx, Arc::clone(&context), - metrics.fetch_time, - metrics.send_buffer_time, + metrics.fetch_time.clone(), + metrics.send_buffer_time.clone(), )); // While there are still outputs to send to, keep pulling inputs @@ -621,10 +620,11 @@ impl Stream for OnDemandPerPartitionStream { cx: &mut Context<'_>, ) -> Poll> { if !self.is_requested && !self.sender.is_closed() { - self.sender.try_send(self.partition).map_err(|_| { + self.sender.send_blocking(self.partition).map_err(|e| { internal_datafusion_err!( - "Error sending partition number to the receiver for partition {}", - self.partition + "Error sending partition number to the receiver for partition {}: {}", + self.partition, + e ) })?; self.is_requested = true; @@ -693,10 +693,11 @@ impl Stream for OnDemandRepartitionStream { loop { // Send partition number to input partitions if !self.is_requested && !self.sender.is_closed() { - self.sender.try_send(self.partition).map_err(|_| { + self.sender.send_blocking(self.partition).map_err(|e| { internal_datafusion_err!( - "Error sending partition number to the receiver for partition {}", - self.partition + "Error sending partition number to the receiver for partition {}: {}", + self.partition, + e ) })?; self.is_requested = true;