feat: Support OnDemandRepartitionExec
#55
base: apache_main
Conversation
```rust
let partition = output_partition_rx.recv().await.map_err(|e| {
    internal_datafusion_err!(
        "Error receiving partition number from output partition: {}",
        e
    )
})?;
```
A channel is used to receive the related partition number from the poll.
```rust
// Send partition number to input partitions
if !self.sender.is_closed() {
    self.sender.send_blocking(self.partition).map_err(|e| {
        internal_datafusion_err!(
            "Error sending partition number to input partitions: {}",
            e
        )
    })?;
}
```
The partition number is sent to the channel when polling.
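Taken together, the two snippets above form a request/serve handshake: an output partition announces its partition number on a shared channel, and the input side serves the next batch to whichever partition asked. A minimal synchronous sketch of that handshake, using `std::sync::mpsc` with hypothetical names (`serve_requests`, string batches) rather than the PR's actual types:

```rust
use std::sync::mpsc;

// Hypothetical sketch: each output partition announces itself on a shared
// channel; the input side serves the next batch to whichever partition asked.
fn serve_requests(
    requests: mpsc::Receiver<usize>,
    batches: Vec<&'static str>,
) -> Vec<(usize, &'static str)> {
    let mut served = Vec::new();
    for batch in batches {
        // Block until some output partition requests data (the "poll").
        let partition = requests.recv().expect("request channel closed");
        served.push((partition, batch));
    }
    served
}

fn main() {
    let (tx, rx) = mpsc::channel();
    // Two output partitions request data in this order.
    for p in [0usize, 1, 0] {
        tx.send(p).unwrap();
    }
    let served = serve_requests(rx, vec!["a", "b", "c"]);
    assert_eq!(served, vec![(0, "a"), (1, "b"), (0, "c")]);
}
```

The key property is that batch-to-partition routing is decided by the order of requests, not by any fixed partitioning rule.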
Force-pushed from 7a75b6a to b4b267a (…obin RepartitionExec).
I was requesting some stylistic changes, but then I realized the current code structure does not satisfy the poll-based criteria of OnDemandRepartitionExec. As far as I can see, it's acting like RepartitionExec, pre-fetching the data to output channels but only sharing the partition info. So, I stopped reviewing there.
Please let me know if I'm missing anything; once the requirement is satisfied we can continue iterating.
Resolved review threads (outdated): datafusion/physical-plan/src/repartition/on_demand_repartition.rs
```rust
// While there are still outputs to send to, keep pulling inputs
let mut batches_until_yield = partitioner.num_partitions();
while !output_channels.is_empty() {
```
This looks like pre-fetching data from the input rather than being poll-based, right? Am I missing anything?
I reordered the process to receive from the channel before polling.
If you'd like to test this feature, you can simply construct a plan like:

```
CoalescePartitionsExec
--RepartitionExec: partitioning=RoundRobinBatch(2), input_partitions=1
----MemoryExec: partitions=1, partition_sizes=[2]
```

and write a debug print at the first lines of …. Once you collect the stream from the root, you should see those prints from ….
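The distinction the reviewers are driving at (pre-fetching versus poll-driven pulling) can be illustrated with plain Rust iterators, which are lazy: a hypothetical `pull_on_demand` fetches from its input only as often as downstream polls, so unconsumed batches are never materialized. The function and counter below are illustrative stand-ins, not part of the PR:

```rust
// Hypothetical sketch: an on-demand operator pulls from its input only when
// downstream polls, so the number of input fetches equals the number of polls.
fn pull_on_demand(polls: usize) -> (Vec<i32>, usize) {
    let mut fetched = 0usize;
    // The "input partition": fetching a batch is observable via the counter.
    let input = (0..8).map(|i| {
        fetched += 1;
        i
    });
    // Only `polls` batches are pulled; the rest are never fetched (no pre-fetch).
    let out: Vec<i32> = input.take(polls).collect();
    (out, fetched)
}

fn main() {
    let (out, fetched) = pull_on_demand(3);
    assert_eq!(out, vec![0, 1, 2]);
    assert_eq!(fetched, 3); // nothing beyond the three polls was fetched
}
```

A pre-fetching operator, by contrast, would drain the input (all 8 items here) into output buffers regardless of how many polls arrive.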
Hi @Weijun-H, can you test with what @berkaysynnada suggested? I believe it will get us to a better state.
I wrote a test for this case:

```rust
use std::sync::Arc;

use arrow::array::{RecordBatch, UInt32Array};
use arrow_schema::{DataType, Field, Schema};
use datafusion::error::Result;
use datafusion::{
    execution::TaskContext,
    physical_plan::{
        coalesce_partitions::CoalescePartitionsExec, common::collect, memory::MemoryExec,
        repartition::on_demand_repartition::OnDemandRepartitionExec, ExecutionPlan,
    },
};
use datafusion_physical_expr::Partitioning;

#[tokio::main]
async fn main() -> Result<()> {
    env_logger::init();
    let schema = Arc::new(Schema::new(vec![Field::new("c0", DataType::UInt32, false)]));
    let batch = RecordBatch::try_new(
        schema.clone(),
        vec![Arc::new(UInt32Array::from(vec![1, 2, 3, 4, 5, 6, 7, 8]))],
    )
    .unwrap();
    let n = 2;
    let partition: Vec<RecordBatch> = (0..n).map(|_| batch.clone()).collect();
    let partitions = vec![partition.clone()];
    let input =
        Arc::new(MemoryExec::try_new(&partitions, Arc::clone(&schema), None).unwrap());
    let exec =
        OnDemandRepartitionExec::try_new(input, Partitioning::OnDemand(2)).unwrap();
    let coalesce_exec =
        CoalescePartitionsExec::new(Arc::new(exec) as Arc<dyn ExecutionPlan>);

    // Execute the plan and collect all batches from the root.
    let task_ctx = Arc::new(TaskContext::default());
    let stream = coalesce_exec.execute(0, task_ctx)?;
    let _batches = collect(stream).await?;
    Ok(())
}
```
```rust
/// Sender state
is_requested: bool,
```
`is_requested` is TRUE when the receiver is waiting for data but has already sent its partition ID.
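A minimal sketch of how such an `is_requested` flag deduplicates requests: the first poll sends the partition ID, later polls while the request is still outstanding are no-ops, and receiving a batch re-arms the flag. `Requester`, `poll`, and the `sent` vector below are hypothetical stand-ins for the PR's stream and channel:

```rust
// Hypothetical sketch of the `is_requested` guard: send the partition ID at
// most once per outstanding request, even if polled repeatedly while Pending.
struct Requester {
    partition: usize,
    is_requested: bool,
    sent: Vec<usize>, // stands in for the real request channel
}

impl Requester {
    // Called on every poll; only the first poll of a request actually sends.
    fn poll(&mut self) {
        if !self.is_requested {
            self.sent.push(self.partition);
            self.is_requested = true;
        }
    }

    // Called when the requested batch finally arrives; re-arms the guard.
    fn on_batch_received(&mut self) {
        self.is_requested = false;
    }
}

fn main() {
    let mut r = Requester { partition: 1, is_requested: false, sent: Vec::new() };
    r.poll();
    r.poll(); // still Pending: no duplicate request
    r.on_batch_received();
    r.poll(); // next batch: request again
    assert_eq!(r.sent, vec![1, 1]);
}
```

Without the guard, every wakeup while the stream is Pending would push another partition ID into the channel, causing the input side to over-deliver to that partition.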
```rust
if !self.sender.is_closed() && !self.is_requested {
    debug!(
```
This avoids repeated requests while the poll returns `Pending`.
Force-pushed from c15eb6a to 8cd65c7.
Rationale for this change

Introduce OnDemandRepartition to support a pull-based, on-demand repartition mode. This operator waits for a poll from its downstream operators before fetching data, without pre-fetching. It does not follow any specific partitioning rule; instead, it sends data to the partition that triggered the poll.

Are these changes tested?

Yes.

Benchmark

Didn't notice a significant reduction or degradation when enabling OnDemandRepartition together with the pull-based SortPreservingMergeExec.