Skip to content

Commit

Permalink
fix: Limits are not applied correctly (#14418)
Browse files Browse the repository at this point in the history
* fix: Limits are not applied correctly

* Add easy fix

* Add fix

* Add slt testing

* Address comments
  • Loading branch information
zhuqi-lucas authored Feb 4, 2025
1 parent ea788c7 commit 0d9f845
Show file tree
Hide file tree
Showing 2 changed files with 113 additions and 0 deletions.
9 changes: 9 additions & 0 deletions datafusion/physical-optimizer/src/limit_pushdown.rs
Original file line number Diff line number Diff line change
Expand Up @@ -146,6 +146,15 @@ pub fn pushdown_limit_helper(
global_state.skip = skip;
global_state.fetch = fetch;

if limit_exec.input().as_any().is::<CoalescePartitionsExec>() {
// If the child is a `CoalescePartitionsExec`, we should not remove this limit:
// pushing the limit down through the `CoalescePartitionsExec` applies it to
// each partition separately, which does not guarantee the overall limit.
// TODO: we may have a better solution if we can support with_fetch for the limit inside CoalescePartitionsExec.
// Follow-up issue: https://github.com/apache/datafusion/issues/14446
global_state.satisfied = true;
return Ok((Transformed::no(pushdown_plan), global_state));
}

// Now the global state has the most recent information, we can remove
// the `LimitExec` plan. We will decide later if we should add it again
// or not.
Expand Down
104 changes: 104 additions & 0 deletions datafusion/sqllogictest/test_files/limit.slt
Original file line number Diff line number Diff line change
Expand Up @@ -772,3 +772,107 @@ physical_plan

statement ok
drop table testSubQueryLimit;


# Regression test for limit pushdown with more than one partition
# (https://github.com/apache/datafusion/issues/14418): a limit sitting above a
# `CoalescePartitionsExec` must be kept, since applying the limit only
# per-partition does not guarantee the overall limit.

# Test push down limit with more than one partition
statement ok
set datafusion.explain.logical_plan_only = false;

# Set up 3 partitions
statement ok
set datafusion.execution.target_partitions = 3;

# Automatically repartition all files over 1 byte, so the scan uses all partitions
statement ok
set datafusion.optimizer.repartition_file_min_size = 1;

# Create a table as a data source
statement ok
CREATE TABLE src_table (
part_key INT,
value INT
) AS VALUES(1, 0), (1, 1), (1, 100), (2, 0), (2, 2), (2, 2), (2, 100), (3, 4), (3, 5), (3, 6);


# Setup 3 files, i.e., as many as there are partitions:

# File 1:
query I
COPY (SELECT * FROM src_table where part_key = 1)
TO 'test_files/scratch/parquet/test_limit_with_partitions/part-0.parquet'
STORED AS PARQUET;
----
3

# File 2:
query I
COPY (SELECT * FROM src_table where part_key = 2)
TO 'test_files/scratch/parquet/test_limit_with_partitions/part-1.parquet'
STORED AS PARQUET;
----
4

# File 3:
query I
COPY (SELECT * FROM src_table where part_key = 3)
TO 'test_files/scratch/parquet/test_limit_with_partitions/part-2.parquet'
STORED AS PARQUET;
----
3

statement ok
CREATE EXTERNAL TABLE test_limit_with_partitions
(
part_key INT,
value INT
)
STORED AS PARQUET
LOCATION 'test_files/scratch/parquet/test_limit_with_partitions/';

# The physical plan must keep the `GlobalLimitExec` above the
# `CoalescePartitionsExec`: pushing fetch=1 into each partition alone would
# not bound the coalesced output to a single row.
query TT
explain
with selection as (
select *
from test_limit_with_partitions
limit 1
)
select 1 as foo
from selection
order by part_key
limit 1000;
----
logical_plan
01)Projection: foo
02)--Sort: selection.part_key ASC NULLS LAST, fetch=1000
03)----Projection: Int64(1) AS foo, selection.part_key
04)------SubqueryAlias: selection
05)--------Limit: skip=0, fetch=1
06)----------TableScan: test_limit_with_partitions projection=[part_key], fetch=1
physical_plan
01)ProjectionExec: expr=[foo@0 as foo]
02)--SortExec: TopK(fetch=1000), expr=[part_key@1 ASC NULLS LAST], preserve_partitioning=[false]
03)----ProjectionExec: expr=[1 as foo, part_key@0 as part_key]
04)------GlobalLimitExec: skip=0, fetch=1
05)--------CoalescePartitionsExec
06)----------ParquetExec: file_groups={3 groups: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/parquet/test_limit_with_partitions/part-0.parquet:0..794], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/parquet/test_limit_with_partitions/part-1.parquet:0..794], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/parquet/test_limit_with_partitions/part-2.parquet:0..794]]}, projection=[part_key], limit=1

# The query must return exactly one row, confirming the limit is applied
# after the partitions are coalesced.
query I
with selection as (
select *
from test_limit_with_partitions
limit 1
)
select 1 as foo
from selection
order by part_key
limit 1000;
----
1

# Tear down test_limit_with_partitions table:
statement ok
DROP TABLE test_limit_with_partitions;

# Tear down src_table table:
statement ok
DROP TABLE src_table;

0 comments on commit 0d9f845

Please sign in to comment.