Skip to content

Commit

Permalink
fix: handle LogicalPlan::Limit separately to preserve skip and offset…
Browse files Browse the repository at this point in the history
… in rewrite_table_scans
  • Loading branch information
ewgenius committed Jan 2, 2025
1 parent 4c0ff79 commit 1276595
Showing 1 changed file with 89 additions and 1 deletion.
90 changes: 89 additions & 1 deletion sources/sql/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ use datafusion::{
AggregateFunction, Alias, Exists, InList, InSubquery, ScalarFunction, Sort, Unnest,
WindowFunction,
},
Between, BinaryExpr, Case, Cast, Expr, Extension, GroupingSet, Like, LogicalPlan,
Between, BinaryExpr, Case, Cast, Expr, Extension, GroupingSet, Like, Limit, LogicalPlan,
LogicalPlanBuilder, Projection, Subquery, TryCast,
},
optimizer::analyzer::{Analyzer, AnalyzerRule},
Expand Down Expand Up @@ -180,6 +180,44 @@ fn rewrite_table_scans(
subquery_table_scans,
)
}
LogicalPlan::Limit(limit) => {
let rewritten_skip = limit
.skip
.as_ref()
.map(|skip| {
rewrite_table_scans_in_expr(
*skip.clone(),
known_rewrites,
subquery_uses_partial_path,
subquery_table_scans,
)
.map(Box::new)
})
.transpose()?;

let rewritten_fetch = limit
.fetch
.as_ref()
.map(|fetch| {
rewrite_table_scans_in_expr(
*fetch.clone(),
known_rewrites,
subquery_uses_partial_path,
subquery_table_scans,
)
.map(Box::new)
})
.transpose()?;

// explisitly set fetch and skip
let new_plan = LogicalPlan::Limit(Limit {
skip: rewritten_skip,
fetch: rewritten_fetch,
input: Arc::new(rewritten_inputs[0].clone()),
});
Ok(new_plan)
}

_ => {
let mut new_expressions = vec![];
for expression in plan.expressions() {
Expand Down Expand Up @@ -1558,4 +1596,54 @@ mod tests {

Ok(())
}

#[tokio::test]
async fn test_rewrite_table_scans_limit_offset() -> Result<()> {
init_tracing();
let ctx = get_test_df_context();

let tests = vec![
// Basic LIMIT
(
"SELECT a FROM foo.df_table LIMIT 5",
r#"SELECT remote_table.a FROM remote_table LIMIT 5"#,
),
// Basic OFFSET
(
"SELECT a FROM foo.df_table OFFSET 5",
r#"SELECT remote_table.a FROM remote_table OFFSET 5"#,
),
// OFFSET after LIMIT
(
"SELECT a FROM foo.df_table LIMIT 10 OFFSET 5",
r#"SELECT remote_table.a FROM remote_table LIMIT 10 OFFSET 5"#,
),
// LIMIT after OFFSET
(
"SELECT a FROM foo.df_table OFFSET 5 LIMIT 10",
r#"SELECT remote_table.a FROM remote_table LIMIT 10 OFFSET 5"#,
),
// Zero OFFSET
(
"SELECT a FROM foo.df_table OFFSET 0",
r#"SELECT remote_table.a FROM remote_table OFFSET 0"#,
),
// Zero LIMIT
(
"SELECT a FROM foo.df_table LIMIT 0",
r#"SELECT remote_table.a FROM remote_table LIMIT 0"#,
),
// Zero LIMIT and OFFSET
(
"SELECT a FROM foo.df_table LIMIT 0 OFFSET 0",
r#"SELECT remote_table.a FROM remote_table LIMIT 0 OFFSET 0"#,
),
];

for test in tests {
test_sql(&ctx, test.0, test.1, false).await?;
}

Ok(())
}
}

0 comments on commit 1276595

Please sign in to comment.