feat: Support OnDemandRepartitionExec #55

Open
wants to merge 10 commits into
base: apache_main

Weijun-H
@Weijun-H Weijun-H commented Dec 26, 2024

Rationale for this change

Introduce OnDemandRepartitionExec to support a pull-based, on-demand repartition mode. The operator waits for a poll from a downstream operator before fetching data and does no pre-fetching. It does not follow a fixed partitioning rule; instead, it sends each batch to the partition that triggered the poll.
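
A minimal sketch of the pull-based flow described above, using only std threads and channels rather than the async implementation in this PR. The names `on_demand_repartition`, `Batch`, `request_tx`, and `data_txs` are illustrative assumptions, not identifiers from the PR. Each consumer announces its partition number, and the producer touches the input only after such a request has arrived.

```rust
use std::sync::mpsc;
use std::thread;

/// Hypothetical stand-in for a record batch.
type Batch = u32;

/// Toy on-demand repartition: `num_partitions` consumer threads each request
/// one batch at a time, and a single producer pulls from `input` only after
/// a request has arrived. Returns the batches each partition received.
fn on_demand_repartition(input: Vec<Batch>, num_partitions: usize) -> Vec<Vec<Batch>> {
    // Consumers announce "partition p wants a batch" on this shared channel.
    let (request_tx, request_rx) = mpsc::channel::<usize>();

    let mut data_txs = Vec::new();
    let mut consumers = Vec::new();
    for partition in 0..num_partitions {
        // One data channel per output partition; `None` marks end of input.
        let (data_tx, data_rx) = mpsc::channel::<Option<Batch>>();
        data_txs.push(data_tx);
        let request_tx = request_tx.clone();
        consumers.push(thread::spawn(move || {
            let mut received = Vec::new();
            loop {
                // The "poll": ask for exactly one batch, then wait for it.
                if request_tx.send(partition).is_err() {
                    break;
                }
                match data_rx.recv() {
                    Ok(Some(batch)) => received.push(batch),
                    // `None` or a closed channel: the input is exhausted.
                    _ => break,
                }
            }
            received
        }));
    }
    // Drop the original sender so the producer loop ends once every
    // consumer has finished and dropped its clone.
    drop(request_tx);

    let mut input = input.into_iter();
    while let Ok(partition) = request_rx.recv() {
        // The input is advanced only here, after a request was received,
        // and the batch goes to the partition that asked for it.
        let next = input.next();
        let _ = data_txs[partition].send(next);
    }

    consumers.into_iter().map(|c| c.join().unwrap()).collect()
}
```

Calling `on_demand_repartition((0..100).collect(), 4)` hands out every batch exactly once; which partition receives which batch depends on which consumer asked first, so there is no fixed partitioning rule.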

Are these changes tested?

Yes

Benchmark

Enabling OnDemandRepartition together with the pull-based SortPreservingMergeExec did not noticeably reduce the performance degradation: in the TPC-H runs below, most queries are still slower than on main.

$ ./benchmarks/bench.sh compare main on-demand-repartition
Comparing main and on-demand-repartition
--------------------
Benchmark tpch_mem_sf1.json
--------------------
┏━━━━━━━━━━━━━━┳━━━━━━━━━━┳━━━━━━━━━━━━━━━━━━━━━━━┳━━━━━━━━━━━━━━┓
┃ Query        ┃     main ┃ on-demand-repartition ┃       Change ┃
┡━━━━━━━━━━━━━━╇━━━━━━━━━━╇━━━━━━━━━━━━━━━━━━━━━━━╇━━━━━━━━━━━━━━┩
│ QQuery 1     │  71.95ms │               77.18ms │ 1.07x slower │
│ QQuery 2     │  12.61ms │               15.92ms │ 1.26x slower │
│ QQuery 3     │  21.49ms │               39.75ms │ 1.85x slower │
│ QQuery 4     │  11.73ms │               21.28ms │ 1.81x slower │
│ QQuery 5     │  34.53ms │               66.94ms │ 1.94x slower │
│ QQuery 6     │   4.49ms │                4.64ms │    no change │
│ QQuery 7     │  64.83ms │               72.47ms │ 1.12x slower │
│ QQuery 8     │  15.42ms │               24.25ms │ 1.57x slower │
│ QQuery 9     │  37.82ms │               69.19ms │ 1.83x slower │
│ QQuery 10    │  30.39ms │               49.11ms │ 1.62x slower │
│ QQuery 11    │   5.78ms │                7.36ms │ 1.27x slower │
│ QQuery 12    │  20.93ms │               35.99ms │ 1.72x slower │
│ QQuery 13    │  15.65ms │               27.11ms │ 1.73x slower │
│ QQuery 14    │   5.22ms │                5.15ms │    no change │
│ QQuery 15    │  11.38ms │               12.29ms │ 1.08x slower │
│ QQuery 16    │  12.59ms │               17.70ms │ 1.41x slower │
│ QQuery 17    │  55.50ms │               55.65ms │    no change │
│ QQuery 18    │ 118.56ms │              248.33ms │ 2.09x slower │
│ QQuery 19    │  23.99ms │               24.27ms │    no change │
│ QQuery 20    │  19.93ms │               24.66ms │ 1.24x slower │
│ QQuery 21    │  82.93ms │              166.59ms │ 2.01x slower │
│ QQuery 22    │  18.77ms │               41.70ms │ 2.22x slower │
└──────────────┴──────────┴───────────────────────┴──────────────┘
┏━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┳━━━━━━━━━━━┓
┃ Benchmark Summary                    ┃           ┃
┡━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━╇━━━━━━━━━━━┩
│ Total Time (main)                    │  696.48ms │
│ Total Time (on-demand-repartition)   │ 1107.51ms │
│ Average Time (main)                  │   31.66ms │
│ Average Time (on-demand-repartition) │   50.34ms │
│ Queries Faster                       │         0 │
│ Queries Slower                       │        18 │
│ Queries with No Change               │         4 │
└──────────────────────────────────────┴───────────┘
--------------------
Benchmark tpch_mem_sf10.json
--------------------
┏━━━━━━━━━━━━━━┳━━━━━━━━━━━┳━━━━━━━━━━━━━━━━━━━━━━━┳━━━━━━━━━━━━━━━┓
┃ Query        ┃      main ┃ on-demand-repartition ┃        Change ┃
┡━━━━━━━━━━━━━━╇━━━━━━━━━━━╇━━━━━━━━━━━━━━━━━━━━━━━╇━━━━━━━━━━━━━━━┩
│ QQuery 1     │ 1385.78ms │             1545.95ms │  1.12x slower │
│ QQuery 2     │  136.45ms │              174.08ms │  1.28x slower │
│ QQuery 3     │  236.80ms │              412.59ms │  1.74x slower │
│ QQuery 4     │  141.06ms │              191.22ms │  1.36x slower │
│ QQuery 5     │  706.85ms │             1084.75ms │  1.53x slower │
│ QQuery 6     │  451.56ms │              506.01ms │  1.12x slower │
│ QQuery 7     │ 1550.27ms │             2115.31ms │  1.36x slower │
│ QQuery 8     │  305.93ms │              840.47ms │  2.75x slower │
│ QQuery 9     │ 1438.25ms │             2349.85ms │  1.63x slower │
│ QQuery 10    │  363.86ms │              807.39ms │  2.22x slower │
│ QQuery 11    │  109.83ms │              103.27ms │ +1.06x faster │
│ QQuery 12    │  328.10ms │              342.77ms │     no change │
│ QQuery 13    │  340.69ms │              383.25ms │  1.12x slower │
│ QQuery 14    │   82.82ms │              139.05ms │  1.68x slower │
│ QQuery 15    │  129.81ms │              129.59ms │     no change │
│ QQuery 16    │  110.76ms │              190.93ms │  1.72x slower │
│ QQuery 17    │  893.68ms │              865.50ms │     no change │
│ QQuery 18    │ 4222.16ms │             6652.17ms │  1.58x slower │
│ QQuery 19    │  808.09ms │              794.23ms │     no change │
│ QQuery 20    │  201.36ms │              289.73ms │  1.44x slower │
│ QQuery 21    │ 1914.64ms │             3058.73ms │  1.60x slower │
│ QQuery 22    │   98.37ms │              160.41ms │  1.63x slower │
└──────────────┴───────────┴───────────────────────┴───────────────┘
┏━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┳━━━━━━━━━━━━┓
┃ Benchmark Summary                    ┃            ┃
┡━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━╇━━━━━━━━━━━━┩
│ Total Time (main)                    │ 15957.11ms │
│ Total Time (on-demand-repartition)   │ 23137.26ms │
│ Average Time (main)                  │   725.32ms │
│ Average Time (on-demand-repartition) │  1051.69ms │
│ Queries Faster                       │          1 │
│ Queries Slower                       │         17 │
│ Queries with No Change               │          4 │
└──────────────────────────────────────┴────────────┘
--------------------
Benchmark tpch_sf1.json
--------------------
┏━━━━━━━━━━━━━━┳━━━━━━━━━━┳━━━━━━━━━━━━━━━━━━━━━━━┳━━━━━━━━━━━━━━┓
┃ Query        ┃     main ┃ on-demand-repartition ┃       Change ┃
┡━━━━━━━━━━━━━━╇━━━━━━━━━━╇━━━━━━━━━━━━━━━━━━━━━━━╇━━━━━━━━━━━━━━┩
│ QQuery 1     │  91.68ms │              109.49ms │ 1.19x slower │
│ QQuery 2     │  19.80ms │               22.99ms │ 1.16x slower │
│ QQuery 3     │  37.11ms │               39.42ms │ 1.06x slower │
│ QQuery 4     │  21.37ms │               43.64ms │ 2.04x slower │
│ QQuery 5     │  53.92ms │              112.58ms │ 2.09x slower │
│ QQuery 6     │  17.88ms │               18.33ms │    no change │
│ QQuery 7     │  81.01ms │              147.40ms │ 1.82x slower │
│ QQuery 8     │  52.87ms │               81.27ms │ 1.54x slower │
│ QQuery 9     │  66.90ms │              112.11ms │ 1.68x slower │
│ QQuery 10    │  54.90ms │               88.33ms │ 1.61x slower │
│ QQuery 11    │  13.98ms │               14.90ms │ 1.07x slower │
│ QQuery 12    │  34.88ms │               53.64ms │ 1.54x slower │
│ QQuery 13    │  32.31ms │               54.75ms │ 1.69x slower │
│ QQuery 14    │  28.94ms │               29.07ms │    no change │
│ QQuery 15    │  41.60ms │               43.71ms │ 1.05x slower │
│ QQuery 16    │  14.79ms │               20.56ms │ 1.39x slower │
│ QQuery 17    │  93.26ms │               89.71ms │    no change │
│ QQuery 18    │ 122.28ms │              235.23ms │ 1.92x slower │
│ QQuery 19    │  49.44ms │               48.62ms │    no change │
│ QQuery 20    │  42.75ms │               59.40ms │ 1.39x slower │
│ QQuery 21    │ 109.62ms │              139.85ms │ 1.28x slower │
│ QQuery 22    │  15.79ms │               18.19ms │ 1.15x slower │
└──────────────┴──────────┴───────────────────────┴──────────────┘
┏━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┳━━━━━━━━━━━┓
┃ Benchmark Summary                    ┃           ┃
┡━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━╇━━━━━━━━━━━┩
│ Total Time (main)                    │ 1097.07ms │
│ Total Time (on-demand-repartition)   │ 1583.19ms │
│ Average Time (main)                  │   49.87ms │
│ Average Time (on-demand-repartition) │   71.96ms │
│ Queries Faster                       │         0 │
│ Queries Slower                       │        18 │
│ Queries with No Change               │         4 │
└──────────────────────────────────────┴───────────┘
--------------------
Benchmark tpch_sf10.json
--------------------
┏━━━━━━━━━━━━━━┳━━━━━━━━━━━┳━━━━━━━━━━━━━━━━━━━━━━━┳━━━━━━━━━━━━━━┓
┃ Query        ┃      main ┃ on-demand-repartition ┃       Change ┃
┡━━━━━━━━━━━━━━╇━━━━━━━━━━━╇━━━━━━━━━━━━━━━━━━━━━━━╇━━━━━━━━━━━━━━┩
│ QQuery 1     │  842.97ms │              966.72ms │ 1.15x slower │
│ QQuery 2     │  121.65ms │              164.88ms │ 1.36x slower │
│ QQuery 3     │  395.51ms │              716.07ms │ 1.81x slower │
│ QQuery 4     │  198.98ms │              374.92ms │ 1.88x slower │
│ QQuery 5     │  594.04ms │             1014.70ms │ 1.71x slower │
│ QQuery 6     │  138.74ms │              144.43ms │    no change │
│ QQuery 7     │  869.16ms │             1604.63ms │ 1.85x slower │
│ QQuery 8     │  623.21ms │             1136.40ms │ 1.82x slower │
│ QQuery 9     │ 1001.38ms │             1777.71ms │ 1.78x slower │
│ QQuery 10    │  553.44ms │              882.55ms │ 1.59x slower │
│ QQuery 11    │   84.11ms │               89.23ms │ 1.06x slower │
│ QQuery 12    │  300.43ms │              482.21ms │ 1.61x slower │
│ QQuery 13    │  421.05ms │              753.82ms │ 1.79x slower │
│ QQuery 14    │  239.22ms │              237.19ms │    no change │
│ QQuery 15    │  392.86ms │              404.59ms │    no change │
│ QQuery 16    │   93.17ms │              141.12ms │ 1.51x slower │
│ QQuery 17    │ 1051.32ms │             1087.79ms │    no change │
│ QQuery 18    │ 1568.21ms │             3020.59ms │ 1.93x slower │
│ QQuery 19    │  387.46ms │              385.81ms │    no change │
│ QQuery 20    │  384.23ms │              598.62ms │ 1.56x slower │
│ QQuery 21    │ 1327.44ms │             2440.54ms │ 1.84x slower │
│ QQuery 22    │  126.51ms │              232.05ms │ 1.83x slower │
└──────────────┴───────────┴───────────────────────┴──────────────┘
┏━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┳━━━━━━━━━━━━┓
┃ Benchmark Summary                    ┃            ┃
┡━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━╇━━━━━━━━━━━━┩
│ Total Time (main)                    │ 11715.08ms │
│ Total Time (on-demand-repartition)   │ 18656.57ms │
│ Average Time (main)                  │   532.50ms │
│ Average Time (on-demand-repartition) │   848.03ms │
│ Queries Faster                       │          0 │
│ Queries Slower                       │         17 │
│ Queries with No Change               │          5 │
└──────────────────────────────────────┴────────────┘
--------------------
Benchmark tpch_sf50.json
--------------------
┏━━━━━━━━━━━━━━┳━━━━━━━━━━━━┳━━━━━━━━━━━━━━━━━━━━━━━┳━━━━━━━━━━━━━━┓
┃ Query        ┃       main ┃ on-demand-repartition ┃       Change ┃
┡━━━━━━━━━━━━━━╇━━━━━━━━━━━━╇━━━━━━━━━━━━━━━━━━━━━━━╇━━━━━━━━━━━━━━┩
│ QQuery 1     │  4041.39ms │             6221.13ms │ 1.54x slower │
│ QQuery 2     │   812.40ms │             1323.46ms │ 1.63x slower │
│ QQuery 3     │  1999.75ms │             4012.13ms │ 2.01x slower │
│ QQuery 4     │   961.16ms │             1919.61ms │ 2.00x slower │
│ QQuery 5     │  3312.35ms │             6580.98ms │ 1.99x slower │
│ QQuery 6     │   681.90ms │              650.67ms │    no change │
│ QQuery 7     │ 11748.47ms │            22405.71ms │ 1.91x slower │
│ QQuery 8     │  3338.18ms │             6077.53ms │ 1.82x slower │
│ QQuery 9     │  6539.49ms │            11200.39ms │ 1.71x slower │
│ QQuery 10    │  2736.24ms │             4932.34ms │ 1.80x slower │
│ QQuery 11    │   775.18ms │              804.16ms │    no change │
│ QQuery 12    │  1335.43ms │             2440.56ms │ 1.83x slower │
│ QQuery 13    │  2446.79ms │             4541.79ms │ 1.86x slower │
│ QQuery 14    │  1030.11ms │             1111.42ms │ 1.08x slower │
│ QQuery 15    │  2407.94ms │             2505.93ms │    no change │
│ QQuery 16    │   403.22ms │              452.64ms │ 1.12x slower │
│ QQuery 17    │  5891.41ms │             5953.27ms │    no change │
│ QQuery 18    │ 15268.20ms │            25646.59ms │ 1.68x slower │
│ QQuery 19    │  1773.60ms │             1879.60ms │ 1.06x slower │
│ QQuery 20    │  2257.16ms │             3310.47ms │ 1.47x slower │
│ QQuery 21    │  8515.76ms │            16584.11ms │ 1.95x slower │
│ QQuery 22    │   657.94ms │              997.05ms │ 1.52x slower │
└──────────────┴────────────┴───────────────────────┴──────────────┘
┏━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┳━━━━━━━━━━━━━┓
┃ Benchmark Summary                    ┃             ┃
┡━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━╇━━━━━━━━━━━━━┩
│ Total Time (main)                    │  78934.06ms │
│ Total Time (on-demand-repartition)   │ 131551.55ms │
│ Average Time (main)                  │   3587.91ms │
│ Average Time (on-demand-repartition) │   5979.62ms │
│ Queries Faster                       │           0 │
│ Queries Slower                       │          18 │
│ Queries with No Change               │           4 │
└──────────────────────────────────────┴─────────────┘
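
To put the five summaries side by side, the overall slowdown per benchmark can be computed as the branch total divided by the main total. The sketch below only re-uses the "Total Time" figures from the tables above; the names `TOTALS_MS`, `slowdown_ratios`, and `print_slowdowns` are made up for illustration.

```rust
/// Total times in milliseconds, copied from the "Benchmark Summary" tables:
/// (benchmark, main, on-demand-repartition).
const TOTALS_MS: [(&str, f64, f64); 5] = [
    ("tpch_mem_sf1", 696.48, 1107.51),
    ("tpch_mem_sf10", 15957.11, 23137.26),
    ("tpch_sf1", 1097.07, 1583.19),
    ("tpch_sf10", 11715.08, 18656.57),
    ("tpch_sf50", 78934.06, 131551.55),
];

/// Returns (benchmark, branch_total / main_total) for each run.
/// A ratio above 1.0 means the branch is slower than main.
fn slowdown_ratios() -> Vec<(&'static str, f64)> {
    TOTALS_MS
        .iter()
        .map(|&(name, main_ms, branch_ms)| (name, branch_ms / main_ms))
        .collect()
}

/// Prints one line per benchmark with the ratio rounded to two decimals.
fn print_slowdowns() {
    for (name, ratio) in slowdown_ratios() {
        println!("{name}: {ratio:.2}x");
    }
}
```

On these totals, every ratio falls between roughly 1.4x and 1.7x, with the largest slowdown on tpch_sf50.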

Comment on lines 545 to 550
let partition = output_partition_rx.recv().await.map_err(|e| {
    internal_datafusion_err!(
        "Error receiving partition number from output partition: {}",
        e
    )
})?;
Author

This channel receives the number of the output partition that issued the poll.

Comment on lines 737 to 745
// Send partition number to input partitions
if !self.sender.is_closed() {
    self.sender.send_blocking(self.partition).map_err(|e| {
        internal_datafusion_err!(
            "Error sending partition number to input partitions: {}",
            e
        )
    })?;
}
Author

When the output stream is polled, it sends its partition number to the channel.

@Weijun-H Weijun-H marked this pull request as ready for review December 29, 2024 16:53
@mertak-synnada mertak-synnada left a comment

I was going to request some stylistic changes, but then I realized that the current code structure does not satisfy the poll-based requirement for OnDemandRepartitionExec. As far as I can see, it acts like RepartitionExec: it still pre-fetches data into the output channels, and the only difference is that the partition info is shared. So I stopped reviewing there.

Please let me know if I'm missing anything. Once the requirement is satisfied, we can continue iterating.


// While there are still outputs to send to, keep pulling inputs
let mut batches_until_yield = partitioner.num_partitions();
while !output_channels.is_empty() {

This looks like pre-fetching data from input and not poll-based, right? Am I missing anything?

Author

I reordered the logic so that the partition number is received from the channel before the input is polled.

@berkaysynnada
Collaborator

berkaysynnada commented Dec 31, 2024

If you'd like to test this feature, you can construct a plan like

CoalescePartitionsExec
--RepartitionExec: partitioning=RoundRobinBatch(2), input_partitions=1
----MemoryExec: partitions=1, partition_sizes=[2]

and add a debug print as the first line of poll_next() in both CoalescePartitionsExec's stream and MemoryExec's stream.

When you collect the stream from the root, the prints from MemoryExec's stream should appear only after the prints from CoalescePartitionsExec's stream.
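
The ordering check can be illustrated without DataFusion. The sketch below uses plain iterators as stand-ins for the two streams and a shared log in place of debug prints; `Logged`, `poll_order`, and `is_pull_based` are hypothetical names, and "every source poll is immediately preceded by a root poll" is one strict reading of the criterion above, not a rule taken from the codebase.

```rust
use std::cell::RefCell;
use std::rc::Rc;

/// Shared log standing in for the debug prints.
type Log = Rc<RefCell<Vec<&'static str>>>;

/// Iterator wrapper that records its name every time it is polled.
struct Logged<I> {
    name: &'static str,
    inner: I,
    log: Log,
}

impl<I: Iterator> Iterator for Logged<I> {
    type Item = I::Item;

    fn next(&mut self) -> Option<I::Item> {
        // Equivalent of a debug print on the first line of poll_next().
        self.log.borrow_mut().push(self.name);
        self.inner.next()
    }
}

/// Builds root -> source, drains the root, and returns the poll order.
fn poll_order(num_batches: u32) -> Vec<&'static str> {
    let log: Log = Rc::new(RefCell::new(Vec::new()));
    let source = Logged { name: "source", inner: 0..num_batches, log: Rc::clone(&log) };
    let root = Logged { name: "root", inner: source, log: Rc::clone(&log) };
    let _batches: Vec<u32> = root.collect();
    let order = log.borrow().clone();
    order
}

/// True if every "source" poll comes directly after a "root" poll,
/// i.e. the source is never polled ahead of demand.
fn is_pull_based(order: &[&str]) -> bool {
    order
        .iter()
        .enumerate()
        .all(|(i, name)| *name != "source" || (i > 0 && order[i - 1] == "root"))
}
```

A purely lazy chain like this one passes the check, while a log in which the source is polled twice in a row fails it, which would indicate pre-fetching.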

@mertak-synnada

Hi @Weijun-H ,

Can you test with what @berkaysynnada suggested? I believe it will get us to a better state

@Weijun-H
Author

Weijun-H commented Jan 12, 2025

> Hi @Weijun-H ,
>
> Can you test with what @berkaysynnada suggested? I believe it will get us to a better state

I wrote a test for this case:

use std::sync::Arc;

use arrow::array::{RecordBatch, UInt32Array};
use arrow_schema::{DataType, Field, Schema};
use datafusion::error::Result;
use datafusion::{
    execution::TaskContext,
    physical_plan::{
        coalesce_partitions::CoalescePartitionsExec, common::collect, memory::MemoryExec,
        repartition::on_demand_repartition::OnDemandRepartitionExec, ExecutionPlan,
    },
};
use datafusion_physical_expr::Partitioning;

#[tokio::main]
async fn main() -> Result<()> {
    env_logger::init();
    let schema = Arc::new(Schema::new(vec![Field::new("c0", DataType::UInt32, false)]));
    let batch = RecordBatch::try_new(
        schema.clone(),
        vec![Arc::new(UInt32Array::from(vec![1, 2, 3, 4, 5, 6, 7, 8]))],
    )
    .unwrap();
    let n = 2;
    let partition: Vec<RecordBatch> = (0..n).map(|_| batch.clone()).collect();
    let partitions = vec![partition.clone()];
    let input =
        Arc::new(MemoryExec::try_new(&partitions, Arc::clone(&schema), None).unwrap());
    let exec =
        OnDemandRepartitionExec::try_new(input, Partitioning::OnDemand(2)).unwrap();

    let coalesce_exec =
        CoalescePartitionsExec::new(Arc::new(exec) as Arc<dyn ExecutionPlan>);

    // execute the plan
    let task_ctx = Arc::new(TaskContext::default());
    let stream = coalesce_exec.execute(0, task_ctx)?;
    let batches = collect(stream).await?;

    Ok(())
}
[2025-01-13T05:43:42Z DEBUG datafusion_physical_plan::stream] CoalescePartitionExec poll_next
[2025-01-13T05:43:42Z DEBUG datafusion_physical_plan::memory] MemoryStream::poll_next
[2025-01-13T05:43:42Z DEBUG datafusion_physical_plan::memory] MemoryStream::poll_next
[2025-01-13T05:43:42Z DEBUG datafusion_physical_plan::stream] CoalescePartitionExec poll_next
[2025-01-13T05:43:42Z DEBUG datafusion_physical_plan::memory] MemoryStream::poll_next
[2025-01-13T05:43:42Z DEBUG datafusion_physical_plan::stream] CoalescePartitionExec poll_next
[2025-01-13T05:43:42Z DEBUG datafusion_physical_plan::stream] CoalescePartitionExec poll_next
[2025-01-13T05:43:42Z DEBUG datafusion_physical_plan::stream] CoalescePartitionExec poll_next
[2025-01-13T05:43:42Z DEBUG datafusion_physical_plan::stream] CoalescePartitionExec poll_next

@github-actions github-actions bot added the proto label Jan 13, 2025
Comment on lines +566 to +577
/// Sender state
is_requested: bool,
Author

is_requested is true when the receiver has already sent its partition ID and is still waiting for the data.

Comment on lines 581 to 592
if !self.sender.is_closed() && !self.is_requested {
debug!(
Author

This avoids sending repeated requests while the stream is Pending.
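
A simplified, synchronous sketch of that guard, assuming std channels in place of the async ones used in the PR. `OnDemandOutput`, `PollResult`, `request_tx`, and `data_rx` are hypothetical names; only `is_requested` and the idea of sending the partition number come from the snippets above.

```rust
use std::sync::mpsc::{Receiver, Sender, TryRecvError};

/// Simplified poll outcome; a stand-in for the async `Poll` type.
#[derive(Debug, PartialEq)]
enum PollResult {
    Ready(u32),
    Pending,
    Done,
}

/// Hypothetical, simplified output stream for one partition.
struct OnDemandOutput {
    partition: usize,
    request_tx: Sender<usize>,
    data_rx: Receiver<u32>,
    /// True while a request has been sent and no batch has arrived yet.
    is_requested: bool,
}

impl OnDemandOutput {
    fn poll_next(&mut self) -> PollResult {
        // Send the partition number at most once per batch, so repeated
        // polls that return Pending do not queue duplicate requests.
        if !self.is_requested {
            if self.request_tx.send(self.partition).is_err() {
                return PollResult::Done;
            }
            self.is_requested = true;
        }
        match self.data_rx.try_recv() {
            Ok(batch) => {
                // The request was fulfilled; the next poll may ask again.
                self.is_requested = false;
                PollResult::Ready(batch)
            }
            Err(TryRecvError::Empty) => PollResult::Pending,
            Err(TryRecvError::Disconnected) => PollResult::Done,
        }
    }
}
```

Without the flag, every poll that returns Pending would enqueue another request for the same partition, and the producer could then push several batches to a consumer that asked for only one.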

@Weijun-H Weijun-H force-pushed the on-demand-repartition branch from c15eb6a to 8cd65c7 Compare January 13, 2025 14:58