diff --git a/sources/sql/src/lib.rs b/sources/sql/src/lib.rs index 756acea..09a1fb3 100644 --- a/sources/sql/src/lib.rs +++ b/sources/sql/src/lib.rs @@ -19,7 +19,7 @@ use datafusion::{ AggregateFunction, Alias, Exists, InList, InSubquery, ScalarFunction, Sort, Unnest, WindowFunction, }, - Between, BinaryExpr, Case, Cast, Expr, Extension, GroupingSet, Like, LogicalPlan, + Between, BinaryExpr, Case, Cast, Expr, Extension, GroupingSet, Like, Limit, LogicalPlan, LogicalPlanBuilder, Projection, Subquery, TryCast, }, optimizer::analyzer::{Analyzer, AnalyzerRule}, @@ -180,6 +180,44 @@ fn rewrite_table_scans( subquery_table_scans, ) } + LogicalPlan::Limit(limit) => { + let rewritten_skip = limit + .skip + .as_ref() + .map(|skip| { + rewrite_table_scans_in_expr( + *skip.clone(), + known_rewrites, + subquery_uses_partial_path, + subquery_table_scans, + ) + .map(Box::new) + }) + .transpose()?; + + let rewritten_fetch = limit + .fetch + .as_ref() + .map(|fetch| { + rewrite_table_scans_in_expr( + *fetch.clone(), + known_rewrites, + subquery_uses_partial_path, + subquery_table_scans, + ) + .map(Box::new) + }) + .transpose()?; + + // explisitly set fetch and skip + let new_plan = LogicalPlan::Limit(Limit { + skip: rewritten_skip, + fetch: rewritten_fetch, + input: Arc::new(rewritten_inputs[0].clone()), + }); + Ok(new_plan) + } + _ => { let mut new_expressions = vec![]; for expression in plan.expressions() { @@ -1558,4 +1596,54 @@ mod tests { Ok(()) } + + #[tokio::test] + async fn test_rewrite_table_scans_limit_offset() -> Result<()> { + init_tracing(); + let ctx = get_test_df_context(); + + let tests = vec![ + // Basic LIMIT + ( + "SELECT a FROM foo.df_table LIMIT 5", + r#"SELECT remote_table.a FROM remote_table LIMIT 5"#, + ), + // Basic OFFSET + ( + "SELECT a FROM foo.df_table OFFSET 5", + r#"SELECT remote_table.a FROM remote_table OFFSET 5"#, + ), + // OFFSET after LIMIT + ( + "SELECT a FROM foo.df_table LIMIT 10 OFFSET 5", + r#"SELECT remote_table.a FROM remote_table LIMIT 10 OFFSET 5"#, + ), + // LIMIT after OFFSET + ( + "SELECT a FROM foo.df_table OFFSET 5 LIMIT 10", + r#"SELECT remote_table.a FROM remote_table LIMIT 10 OFFSET 5"#, + ), + // Zero OFFSET + ( + "SELECT a FROM foo.df_table OFFSET 0", + r#"SELECT remote_table.a FROM remote_table OFFSET 0"#, + ), + // Zero LIMIT + ( + "SELECT a FROM foo.df_table LIMIT 0", + r#"SELECT remote_table.a FROM remote_table LIMIT 0"#, + ), + // Zero LIMIT and OFFSET + ( + "SELECT a FROM foo.df_table LIMIT 0 OFFSET 0", + r#"SELECT remote_table.a FROM remote_table LIMIT 0 OFFSET 0"#, + ), + ]; + + for test in tests { + test_sql(&ctx, test.0, test.1, false).await?; + } + + Ok(()) + } }