Skip to content

Commit

Permalink
fix: LimitPushdown rule uncorrect remove some GlobalLimitExec
Browse files Browse the repository at this point in the history
  • Loading branch information
zhuqi-lucas committed Jan 23, 2025
1 parent 0228bee commit 9781cc1
Show file tree
Hide file tree
Showing 3 changed files with 72 additions and 7 deletions.
34 changes: 29 additions & 5 deletions datafusion/core/tests/dataframe/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2217,11 +2217,6 @@ async fn write_parquet_with_order() -> Result<()> {
let df = ctx.sql("SELECT * FROM data").await?;
let results = df.collect().await?;

let df_explain = ctx.sql("explain SELECT a FROM data").await?;
let explain_result = df_explain.collect().await?;

println!("explain_result {:?}", explain_result);

assert_batches_eq!(
&[
"+---+---+",
Expand Down Expand Up @@ -5182,3 +5177,32 @@ async fn register_non_parquet_file() {
"1.json' does not match the expected extension '.parquet'"
);
}

// Test issue: https://github.com/apache/datafusion/issues/14204
#[tokio::test]
async fn test_with_subquery_limit() -> Result<()> {
let ctx = SessionContext::new();
ctx.sql("create table t(a int, b int) as values (1,2), (2,3), (3,4)")
.await?;

let df = ctx
.sql("select * from t as t1 join (select * from t limit 1) limit 10")
.await?
.collect()
.await?;

assert_batches_eq!(
&[
"+---+---+---+---+",
"| a | b | a | b |",
"+---+---+---+---+",
"| 1 | 2 | 1 | 2 |",
"| 2 | 3 | 1 | 2 |",
"| 3 | 4 | 1 | 2 |",
"+---+---+---+---+",
],
&df
);

Ok(())
}
11 changes: 9 additions & 2 deletions datafusion/physical-optimizer/src/limit_pushdown.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,6 @@ use datafusion_physical_plan::coalesce_partitions::CoalescePartitionsExec;
use datafusion_physical_plan::limit::{GlobalLimitExec, LocalLimitExec};
use datafusion_physical_plan::sorts::sort_preserving_merge::SortPreservingMergeExec;
use datafusion_physical_plan::{ExecutionPlan, ExecutionPlanProperties};

/// This rule inspects [`ExecutionPlan`]'s and pushes down the fetch limit from
/// the parent to the child if applicable.
#[derive(Default, Debug)]
Expand Down Expand Up @@ -247,7 +246,15 @@ pub fn pushdown_limit_helper(
}
} else {
// Add fetch or a `LimitExec`:
global_state.satisfied = true;

// If the plan's children have limit, we shouldn't change the global state to true,
// because the children limit will be overridden if the global state is changed.
if pushdown_plan.children().iter().any(|child| {
child.as_any().is::<GlobalLimitExec>()
|| child.as_any().is::<LocalLimitExec>()
}) {
global_state.satisfied = false;
}
pushdown_plan = if let Some(plan_with_fetch) = maybe_fetchable {
if global_skip > 0 {
add_global_limit(plan_with_fetch, global_skip, Some(global_fetch))
Expand Down
34 changes: 34 additions & 0 deletions datafusion/sqllogictest/test_files/limit.slt
Original file line number Diff line number Diff line change
Expand Up @@ -711,3 +711,37 @@ OFFSET 3 LIMIT 2;

statement ok
drop table ordered_table;

# Test limit pushdown with subquery
statement ok
create table testSubQueryLimit (a int, b int) as values (1,2), (2,3), (3,4);

query IIII
select * from testSubQueryLimit as t1 join (select * from testSubQueryLimit limit 1) limit 10;
----
1 2 1 2
2 3 1 2
3 4 1 2

query TT
explain select * from testSubQueryLimit as t1 join (select * from testSubQueryLimit limit 1) limit 10;
----
logical_plan
01)Limit: skip=0, fetch=10
02)--Cross Join:
03)----SubqueryAlias: t1
04)------Limit: skip=0, fetch=10
05)--------TableScan: testsubquerylimit projection=[a, b], fetch=10
06)----Limit: skip=0, fetch=1
07)------TableScan: testsubquerylimit projection=[a, b], fetch=1
physical_plan
01)ProjectionExec: expr=[a@2 as a, b@3 as b, a@0 as a, b@1 as b]
02)--GlobalLimitExec: skip=0, fetch=10
03)----CrossJoinExec
04)------GlobalLimitExec: skip=0, fetch=1
05)--------MemoryExec: partitions=1, partition_sizes=[1]
06)------GlobalLimitExec: skip=0, fetch=10
07)--------MemoryExec: partitions=1, partition_sizes=[1]

statement ok
drop table testSubQueryLimit;

0 comments on commit 9781cc1

Please sign in to comment.