From 6d43dc2d729e48e298a7af198d6d0d95fe14fe23 Mon Sep 17 00:00:00 2001
From: wiedld
Date: Wed, 4 Dec 2024 13:54:28 -0800
Subject: [PATCH 01/17] minor(13525): perform LP validation before and after each possible mutation

---
 datafusion/optimizer/src/analyzer/mod.rs | 19 ++++++++++++-
 datafusion/optimizer/src/optimizer.rs    | 36 ++++++++++++++++++++++--
 2 files changed, 52 insertions(+), 3 deletions(-)

diff --git a/datafusion/optimizer/src/analyzer/mod.rs b/datafusion/optimizer/src/analyzer/mod.rs
index a9fd4900b2f4..720fc7be5e4c 100644
--- a/datafusion/optimizer/src/analyzer/mod.rs
+++ b/datafusion/optimizer/src/analyzer/mod.rs
@@ -137,6 +137,19 @@ impl Analyzer {
     where
         F: FnMut(&LogicalPlan, &dyn AnalyzerRule),
     {
+        /* current test failure:
+            External error: statement is expected to fail with error:
+                (regex) DataFusion error: check_analyzed_plan\ncaused by\nError during planning: Scalar subquery should only return one column, but found 2: t2.t2_id, t2.t2_name
+            but got error:
+                DataFusion error: Error during planning: Scalar subquery should only return one column, but found 2: t2.t2_id, t2.t2_name
+            [SQL] SELECT t1_id, t1_name, t1_int, (select t2_id, t2_name FROM t2 WHERE t2.t2_id = t1.t1_int) FROM t1
+            at test_files/subquery.slt:436
+        */
+        // verify at the start, before the first LP analyzer pass.
+        // check_plan(&plan).map_err(|e| {
+        //     DataFusionError::Context("check_plan_before_analyzers".to_string(), Box::new(e))
+        // })?;
+
         let start_time = Instant::now();
         let mut new_plan = plan;
 
@@ -174,7 +187,11 @@ impl Analyzer {
     }
 }
 
-/// Do necessary check and fail the invalid plan
+/// These are invariants to hold true for each logical plan.
+/// Do necessary check and fail the invalid plan.
+///
+/// Checks for elements which are immutable across analyzer passes.
+/// Does not check elements which are mutated by the analyzers (e.g. the schema).
 fn check_plan(plan: &LogicalPlan) -> Result<()> {
     plan.apply_with_subqueries(|plan: &LogicalPlan| {
         plan.apply_expressions(|expr| {
diff --git a/datafusion/optimizer/src/optimizer.rs b/datafusion/optimizer/src/optimizer.rs
index dfdd0c110c22..2e35bb0c426b 100644
--- a/datafusion/optimizer/src/optimizer.rs
+++ b/datafusion/optimizer/src/optimizer.rs
@@ -355,6 +355,16 @@ impl Optimizer {
     where
         F: FnMut(&LogicalPlan, &dyn OptimizerRule),
     {
+        // verify at the start, before the first LP optimizer pass.
+        check_plan("before_optimizers", &plan, Arc::clone(plan.schema())).map_err(
+            |e| {
+                DataFusionError::Context(
+                    "check_plan_before_optimizers".to_string(),
+                    Box::new(e),
+                )
+            },
+        )?;
+
         let start_time = Instant::now();
         let options = config.options();
         let mut new_plan = plan;
@@ -384,9 +394,15 @@ impl Optimizer {
                 // rule handles recursion itself
                 None => optimize_plan_node(new_plan, rule.as_ref(), config),
             }
-            // verify the rule didn't change the schema
             .and_then(|tnr| {
-                assert_schema_is_the_same(rule.name(), &starting_schema, &tnr.data)?;
+                // verify after each optimizer pass.
+                check_plan(rule.name(), &tnr.data, starting_schema).map_err(|e| {
+                    DataFusionError::Context(
+                        "check_optimized_plan".to_string(),
+                        Box::new(e),
+                    )
+                })?;
+
                 Ok(tnr)
             });
 
@@ -451,6 +467,22 @@ impl Optimizer {
     }
 }
 
+/// These are invariants to hold true for each logical plan.
+/// Do necessary check and fail the invalid plan.
+///
+/// Checks for elements which are immutable across optimizer passes.
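// A minimal sketch of the property that the `check_plan` helper added just below
// enforces for every optimizer rule: the rule may rewrite the plan, but the
// rewritten plan must keep name/type-equivalent output fields. This is not taken
// from the patch itself; it only reuses `equivalent_names_and_types`, the same
// comparison the patch relies on.
use datafusion_expr::LogicalPlan;

fn schema_is_preserved(before: &LogicalPlan, after: &LogicalPlan) -> bool {
    after
        .schema()
        .equivalent_names_and_types(before.schema())
}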
+fn check_plan(
+    check_name: &str,
+    plan: &LogicalPlan,
+    prev_schema: Arc<DFSchema>,
+) -> Result<()> {
+    // verify invariant: optimizer rule didn't change the schema
+    assert_schema_is_the_same(check_name, &prev_schema, plan)?;
+
+    // TODO: trait API and provide extension on the Optimizer to define own validations?
+    Ok(())
+}
+
 /// Returns an error if `new_plan`'s schema is different than `prev_schema`
 ///
 /// It ignores metadata and nullability.

From a855811209c3a2640def01af939540e3672c7a66 Mon Sep 17 00:00:00 2001
From: wiedld
Date: Wed, 4 Dec 2024 14:59:12 -0800
Subject: [PATCH 02/17] minor(13525): validate unique field names on query and subquery schemas, after each optimizer pass

---
 datafusion/optimizer/src/optimizer.rs | 19 ++++++++++++++++++-
 1 file changed, 18 insertions(+), 1 deletion(-)

diff --git a/datafusion/optimizer/src/optimizer.rs b/datafusion/optimizer/src/optimizer.rs
index 2e35bb0c426b..4d876807cd72 100644
--- a/datafusion/optimizer/src/optimizer.rs
+++ b/datafusion/optimizer/src/optimizer.rs
@@ -27,7 +27,7 @@ use log::{debug, warn};
 use datafusion_common::alias::AliasGenerator;
 use datafusion_common::config::ConfigOptions;
 use datafusion_common::instant::Instant;
-use datafusion_common::tree_node::{Transformed, TreeNodeRewriter};
+use datafusion_common::tree_node::{Transformed, TreeNodeRecursion, TreeNodeRewriter};
 use datafusion_common::{internal_err, DFSchema, DataFusionError, HashSet, Result};
 
 use datafusion_expr::logical_plan::LogicalPlan;
@@ -479,6 +479,9 @@ fn check_plan(
     // verify invariant: optimizer rule didn't change the schema
     assert_schema_is_the_same(check_name, &prev_schema, plan)?;
 
+    // verify invariant: fields must have unique names
+    assert_unique_field_names(plan)?;
+
     // TODO: trait API and provide extension on the Optimizer to define own validations?
     Ok(())
 }
@@ -508,6 +511,20 @@ pub(crate) fn assert_schema_is_the_same(
     }
 }
 
+/// Returns an error if plan, and subplans, do not have unique fields.
+///
+/// This invariant is subject to change.
+/// refer: +fn assert_unique_field_names(plan: &LogicalPlan) -> Result<()> { + plan.schema().check_names()?; + + plan.apply_with_subqueries(|plan: &LogicalPlan| { + plan.schema().check_names()?; + Ok(TreeNodeRecursion::Continue) + }) + .map(|_| ()) +} + #[cfg(test)] mod tests { use std::sync::{Arc, Mutex}; From 0163a407a3fd3198e6f7cae397f7459eddb62b40 Mon Sep 17 00:00:00 2001 From: wiedld Date: Wed, 4 Dec 2024 15:50:30 -0800 Subject: [PATCH 03/17] minor(13525): validate union after each optimizer passes --- datafusion/optimizer/src/optimizer.rs | 27 +++++++++++++++++++++++++++ 1 file changed, 27 insertions(+) diff --git a/datafusion/optimizer/src/optimizer.rs b/datafusion/optimizer/src/optimizer.rs index 4d876807cd72..4522615facea 100644 --- a/datafusion/optimizer/src/optimizer.rs +++ b/datafusion/optimizer/src/optimizer.rs @@ -22,6 +22,7 @@ use std::sync::Arc; use chrono::{DateTime, Utc}; use datafusion_expr::registry::FunctionRegistry; +use datafusion_expr::Union; use log::{debug, warn}; use datafusion_common::alias::AliasGenerator; @@ -482,6 +483,14 @@ fn check_plan( // verify invariant: fields must have unique names assert_unique_field_names(plan)?; + /* This current fails for: + - execution::context::tests::cross_catalog_access + - at test_files/string/string.slt:46 + External error: query failed: DataFusion error: Optimizer rule 'eliminate_nested_union' failed + */ + // verify invariant: equivalent schema across union inputs + // assert_unions_are_valid(check_name, plan)?; + // TODO: trait API and provide extension on the Optimizer to define own validations? Ok(()) } @@ -525,6 +534,24 @@ fn assert_unique_field_names(plan: &LogicalPlan) -> Result<()> { .map(|_| ()) } +/// Returns an error if any union nodes are invalid. +#[allow(dead_code)] +fn assert_unions_are_valid(rule_name: &str, plan: &LogicalPlan) -> Result<()> { + plan.apply_with_subqueries(|plan: &LogicalPlan| { + if let LogicalPlan::Union(Union { schema, inputs }) = plan { + inputs.iter().try_for_each(|subplan| { + assert_schema_is_the_same( + format!("{rule_name}:union_check").as_str(), + schema, + subplan, + ) + })?; + } + Ok(TreeNodeRecursion::Continue) + }) + .map(|_| ()) +} + #[cfg(test)] mod tests { use std::sync::{Arc, Mutex}; From bee7e9247db12ef4f3d0ee9c99a886776d71a0c3 Mon Sep 17 00:00:00 2001 From: wiedld Date: Sun, 15 Dec 2024 20:58:24 -0800 Subject: [PATCH 04/17] refactor: make explicit what is an invariant of the logical plan, versus assertions made after a given analyzer or optimizer pass --- datafusion/expr/src/logical_plan/mod.rs | 2 + datafusion/expr/src/logical_plan/plan.rs | 8 ++ datafusion/optimizer/src/analyzer/mod.rs | 45 ++++--- datafusion/optimizer/src/analyzer/subquery.rs | 4 +- datafusion/optimizer/src/optimizer.rs | 116 +++++------------- 5 files changed, 68 insertions(+), 107 deletions(-) diff --git a/datafusion/expr/src/logical_plan/mod.rs b/datafusion/expr/src/logical_plan/mod.rs index 5d613d4e80db..f0b1efded741 100644 --- a/datafusion/expr/src/logical_plan/mod.rs +++ b/datafusion/expr/src/logical_plan/mod.rs @@ -20,6 +20,8 @@ mod ddl; pub mod display; pub mod dml; mod extension; +pub(crate) mod invariants; +pub use invariants::assert_expected_schema; mod plan; mod statement; pub mod tree_node; diff --git a/datafusion/expr/src/logical_plan/plan.rs b/datafusion/expr/src/logical_plan/plan.rs index a1875012eea7..cd57c203b290 100644 --- a/datafusion/expr/src/logical_plan/plan.rs +++ b/datafusion/expr/src/logical_plan/plan.rs @@ -24,6 +24,7 @@ use std::hash::{Hash, Hasher}; use std::sync::{Arc, 
OnceLock}; use super::dml::CopyTo; +use super::invariants::assert_unique_field_names; use super::DdlStatement; use crate::builder::{change_redundant_column, unnest_with_options}; use crate::expr::{Placeholder, Sort as SortExpr, WindowFunction}; @@ -1125,6 +1126,13 @@ impl LogicalPlan { } } + /// These are invariants to hold true for each logical plan. + pub fn assert_invariants(&self) -> Result<()> { + assert_unique_field_names(self)?; + + Ok(()) + } + /// Helper for [Self::with_new_exprs] to use when no expressions are expected. #[inline] #[allow(clippy::needless_pass_by_value)] // expr is moved intentionally to ensure it's not used again diff --git a/datafusion/optimizer/src/analyzer/mod.rs b/datafusion/optimizer/src/analyzer/mod.rs index 720fc7be5e4c..b4626db55232 100644 --- a/datafusion/optimizer/src/analyzer/mod.rs +++ b/datafusion/optimizer/src/analyzer/mod.rs @@ -137,18 +137,13 @@ impl Analyzer { where F: FnMut(&LogicalPlan, &dyn AnalyzerRule), { - /* current test failure: - External error: statement is expected to fail with error: - (regex) DataFusion error: check_analyzed_plan\ncaused by\nError during planning: Scalar subquery should only return one column, but found 2: t2.t2_id, t2.t2_name - but got error: - DataFusion error: Error during planning: Scalar subquery should only return one column, but found 2: t2.t2_id, t2.t2_name - [SQL] SELECT t1_id, t1_name, t1_int, (select t2_id, t2_name FROM t2 WHERE t2.t2_id = t1.t1_int) FROM t1 - at test_files/subquery.slt:436 - */ // verify at the start, before the first LP analyzer pass. - // check_plan(&plan).map_err(|e| { - // DataFusionError::Context("check_plan_before_analyzers".to_string(), Box::new(e)) - // })?; + assert_valid_semantic_plan(&plan).map_err(|e| { + DataFusionError::Context( + "check_plan_before_analyzers".to_string(), + Box::new(e), + ) + })?; let start_time = Instant::now(); let mut new_plan = plan; @@ -177,22 +172,38 @@ impl Analyzer { log_plan(rule.name(), &new_plan); observer(&new_plan, rule.as_ref()); } - // for easier display in explain output - check_plan(&new_plan).map_err(|e| { + + // verify at the end, after the last LP analyzer pass. + assert_valid_semantic_plan(&new_plan).map_err(|e| { DataFusionError::Context("check_analyzed_plan".to_string(), Box::new(e)) })?; + log_plan("Final analyzed plan", &new_plan); debug!("Analyzer took {} ms", start_time.elapsed().as_millis()); Ok(new_plan) } } -/// These are invariants to hold true for each logical plan. -/// Do necessary check and fail the invalid plan. +/// These are invariants which should hold true before and after each analyzer. +/// +/// This differs from [`LogicalPlan::assert_invariants`], which addresses if a singular +/// LogicalPlan is valid. Instead this address if the analyzer (before and after) +/// is valid based upon permitted changes. /// -/// Checks for elements which are immutable across analyzer passes. /// Does not check elements which are mutated by the analyzers (e.g. the schema). -fn check_plan(plan: &LogicalPlan) -> Result<()> { +fn assert_valid_semantic_plan(plan: &LogicalPlan) -> Result<()> { + plan.assert_invariants()?; + + // TODO: should this be moved to LogicalPlan::assert_invariants? + assert_subqueries_are_valid(plan)?; + + Ok(()) +} + +/// Asserts that the subqueries are structured properly with valid node placement. +/// +/// Refer to [`check_subquery_expr`] for more details. 
+fn assert_subqueries_are_valid(plan: &LogicalPlan) -> Result<()> { plan.apply_with_subqueries(|plan: &LogicalPlan| { plan.apply_expressions(|expr| { // recursively look for subqueries diff --git a/datafusion/optimizer/src/analyzer/subquery.rs b/datafusion/optimizer/src/analyzer/subquery.rs index fee06eeb9f75..3addb9f95f5b 100644 --- a/datafusion/optimizer/src/analyzer/subquery.rs +++ b/datafusion/optimizer/src/analyzer/subquery.rs @@ -15,7 +15,7 @@ // specific language governing permissions and limitations // under the License. -use crate::analyzer::check_plan; +use crate::analyzer::assert_subqueries_are_valid; use crate::utils::collect_subquery_cols; use recursive::recursive; @@ -37,7 +37,7 @@ pub fn check_subquery_expr( inner_plan: &LogicalPlan, expr: &Expr, ) -> Result<()> { - check_plan(inner_plan)?; + assert_subqueries_are_valid(inner_plan)?; if let Expr::ScalarSubquery(subquery) = expr { // Scalar subquery should only return one column if subquery.subquery.schema().fields().len() > 1 { diff --git a/datafusion/optimizer/src/optimizer.rs b/datafusion/optimizer/src/optimizer.rs index 4522615facea..7abf82978751 100644 --- a/datafusion/optimizer/src/optimizer.rs +++ b/datafusion/optimizer/src/optimizer.rs @@ -21,14 +21,14 @@ use std::fmt::Debug; use std::sync::Arc; use chrono::{DateTime, Utc}; +use datafusion_expr::assert_expected_schema; use datafusion_expr::registry::FunctionRegistry; -use datafusion_expr::Union; use log::{debug, warn}; use datafusion_common::alias::AliasGenerator; use datafusion_common::config::ConfigOptions; use datafusion_common::instant::Instant; -use datafusion_common::tree_node::{Transformed, TreeNodeRecursion, TreeNodeRewriter}; +use datafusion_common::tree_node::{Transformed, TreeNodeRewriter}; use datafusion_common::{internal_err, DFSchema, DataFusionError, HashSet, Result}; use datafusion_expr::logical_plan::LogicalPlan; @@ -356,15 +356,13 @@ impl Optimizer { where F: FnMut(&LogicalPlan, &dyn OptimizerRule), { - // verify at the start, before the first LP optimizer pass. - check_plan("before_optimizers", &plan, Arc::clone(plan.schema())).map_err( - |e| { - DataFusionError::Context( - "check_plan_before_optimizers".to_string(), - Box::new(e), - ) - }, - )?; + // verify LP is valid, before the first LP optimizer pass. + plan.assert_invariants().map_err(|e| { + DataFusionError::Context( + "check_plan_before_optimizers".to_string(), + Box::new(e), + ) + })?; let start_time = Instant::now(); let options = config.options(); @@ -397,7 +395,8 @@ impl Optimizer { } .and_then(|tnr| { // verify after each optimizer pass. - check_plan(rule.name(), &tnr.data, starting_schema).map_err(|e| { + assert_valid_optimization(rule.name(), &tnr.data, &starting_schema) + .map_err(|e| { DataFusionError::Context( "check_optimized_plan".to_string(), Box::new(e), @@ -462,96 +461,37 @@ impl Optimizer { } i += 1; } + + // verify LP is valid, after the last optimizer pass. + new_plan.assert_invariants().map_err(|e| { + DataFusionError::Context( + "check_plan_after_optimizers".to_string(), + Box::new(e), + ) + })?; + log_plan("Final optimized plan", &new_plan); debug!("Optimizer took {} ms", start_time.elapsed().as_millis()); Ok(new_plan) } } -/// These are invariants to hold true for each logical plan. -/// Do necessary check and fail the invalid plan. +/// These are invariants which should hold true before and after each optimization. /// -/// Checks for elements which are immutable across optimizer passes. 
-fn check_plan( - check_name: &str, +/// This differs from [`LogicalPlan::assert_invariants`], which addresses if a singular +/// LogicalPlan is valid. Instead this address if the optimization (before and after) +/// is valid based upon permitted changes. +fn assert_valid_optimization( + rule_name: &str, plan: &LogicalPlan, - prev_schema: Arc, + prev_schema: &Arc, ) -> Result<()> { // verify invariant: optimizer rule didn't change the schema - assert_schema_is_the_same(check_name, &prev_schema, plan)?; - - // verify invariant: fields must have unique names - assert_unique_field_names(plan)?; - - /* This current fails for: - - execution::context::tests::cross_catalog_access - - at test_files/string/string.slt:46 - External error: query failed: DataFusion error: Optimizer rule 'eliminate_nested_union' failed - */ - // verify invariant: equivalent schema across union inputs - // assert_unions_are_valid(check_name, plan)?; + assert_expected_schema(rule_name, prev_schema, plan)?; - // TODO: trait API and provide extension on the Optimizer to define own validations? Ok(()) } -/// Returns an error if `new_plan`'s schema is different than `prev_schema` -/// -/// It ignores metadata and nullability. -pub(crate) fn assert_schema_is_the_same( - rule_name: &str, - prev_schema: &DFSchema, - new_plan: &LogicalPlan, -) -> Result<()> { - let equivalent = new_plan.schema().equivalent_names_and_types(prev_schema); - - if !equivalent { - let e = DataFusionError::Internal(format!( - "Failed due to a difference in schemas, original schema: {:?}, new schema: {:?}", - prev_schema, - new_plan.schema() - )); - Err(DataFusionError::Context( - String::from(rule_name), - Box::new(e), - )) - } else { - Ok(()) - } -} - -/// Returns an error if plan, and subplans, do not have unique fields. -/// -/// This invariant is subject to change. -/// refer: -fn assert_unique_field_names(plan: &LogicalPlan) -> Result<()> { - plan.schema().check_names()?; - - plan.apply_with_subqueries(|plan: &LogicalPlan| { - plan.schema().check_names()?; - Ok(TreeNodeRecursion::Continue) - }) - .map(|_| ()) -} - -/// Returns an error if any union nodes are invalid. -#[allow(dead_code)] -fn assert_unions_are_valid(rule_name: &str, plan: &LogicalPlan) -> Result<()> { - plan.apply_with_subqueries(|plan: &LogicalPlan| { - if let LogicalPlan::Union(Union { schema, inputs }) = plan { - inputs.iter().try_for_each(|subplan| { - assert_schema_is_the_same( - format!("{rule_name}:union_check").as_str(), - schema, - subplan, - ) - })?; - } - Ok(TreeNodeRecursion::Continue) - }) - .map(|_| ()) -} - #[cfg(test)] mod tests { use std::sync::{Arc, Mutex}; From 4eee9c4fff38947cb476eba6a03a90e14a49e835 Mon Sep 17 00:00:00 2001 From: wiedld Date: Sun, 15 Dec 2024 21:43:52 -0800 Subject: [PATCH 05/17] chore: add link to invariant docs --- datafusion/expr/src/logical_plan/plan.rs | 1 + datafusion/optimizer/src/optimizer.rs | 1 + 2 files changed, 2 insertions(+) diff --git a/datafusion/expr/src/logical_plan/plan.rs b/datafusion/expr/src/logical_plan/plan.rs index cd57c203b290..60355357d753 100644 --- a/datafusion/expr/src/logical_plan/plan.rs +++ b/datafusion/expr/src/logical_plan/plan.rs @@ -1128,6 +1128,7 @@ impl LogicalPlan { /// These are invariants to hold true for each logical plan. 
pub fn assert_invariants(&self) -> Result<()> { + // Refer to assert_unique_field_names(self)?; Ok(()) diff --git a/datafusion/optimizer/src/optimizer.rs b/datafusion/optimizer/src/optimizer.rs index 7abf82978751..0899ea0dbd50 100644 --- a/datafusion/optimizer/src/optimizer.rs +++ b/datafusion/optimizer/src/optimizer.rs @@ -487,6 +487,7 @@ fn assert_valid_optimization( prev_schema: &Arc, ) -> Result<()> { // verify invariant: optimizer rule didn't change the schema + // Refer to assert_expected_schema(rule_name, prev_schema, plan)?; Ok(()) From a7d97704c2ec895143acbb5475323b00bbf37489 Mon Sep 17 00:00:00 2001 From: wiedld Date: Mon, 16 Dec 2024 09:26:17 -0800 Subject: [PATCH 06/17] fix: add new invariants module --- .../expr/src/logical_plan/invariants.rs | 60 +++++++++++++++++++ 1 file changed, 60 insertions(+) create mode 100644 datafusion/expr/src/logical_plan/invariants.rs diff --git a/datafusion/expr/src/logical_plan/invariants.rs b/datafusion/expr/src/logical_plan/invariants.rs new file mode 100644 index 000000000000..551f9dda81e1 --- /dev/null +++ b/datafusion/expr/src/logical_plan/invariants.rs @@ -0,0 +1,60 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use datafusion_common::{ + tree_node::TreeNodeRecursion, DFSchemaRef, DataFusionError, Result, +}; + +use super::LogicalPlan; + +/// Returns an error if plan, and subplans, do not have unique fields. +/// +/// This invariant is subject to change. +/// refer: +pub fn assert_unique_field_names(plan: &LogicalPlan) -> Result<()> { + plan.schema().check_names()?; + + plan.apply_with_subqueries(|plan: &LogicalPlan| { + plan.schema().check_names()?; + Ok(TreeNodeRecursion::Continue) + }) + .map(|_| ()) +} + +/// Returns an error if the plan does not have the expected schema. +/// Ignores metadata and nullability. 
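// An illustrative sketch (not from the patch) of the "ignores metadata and
// nullability" behaviour documented above, exercised on DFSchema directly:
// two schemas that differ only in nullability still compare as equivalent.
use arrow::datatypes::{DataType, Field, Schema};
use datafusion_common::{DFSchema, Result};

fn nullability_is_ignored() -> Result<bool> {
    let required = DFSchema::try_from(Schema::new(vec![Field::new(
        "x",
        DataType::Int64,
        false,
    )]))?;
    let nullable = DFSchema::try_from(Schema::new(vec![Field::new(
        "x",
        DataType::Int64,
        true,
    )]))?;
    Ok(required.equivalent_names_and_types(&nullable))
}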
+pub fn assert_expected_schema( + rule_name: &str, + schema: &DFSchemaRef, + plan: &LogicalPlan, +) -> Result<()> { + let equivalent = plan.schema().equivalent_names_and_types(schema); + + if !equivalent { + let e = DataFusionError::Internal(format!( + "Failed due to a difference in schemas, original schema: {:?}, new schema: {:?}", + schema, + plan.schema() + )); + Err(DataFusionError::Context( + String::from(rule_name), + Box::new(e), + )) + } else { + Ok(()) + } +} From 2002b1a099ef8237dd8e3d47dff7c10155050417 Mon Sep 17 00:00:00 2001 From: wiedld Date: Tue, 17 Dec 2024 13:21:32 -0800 Subject: [PATCH 07/17] refactor: move all LP invariant checking into LP, delineate executable (valid semantic plan) vs basic LP invariants --- .../expr/src/logical_plan/invariants.rs | 415 +++++++++++++++++- datafusion/expr/src/logical_plan/mod.rs | 2 +- datafusion/expr/src/logical_plan/plan.rs | 16 +- datafusion/optimizer/src/analyzer/mod.rs | 72 +-- datafusion/optimizer/src/analyzer/subquery.rs | 351 --------------- datafusion/optimizer/src/optimizer.rs | 31 +- 6 files changed, 458 insertions(+), 429 deletions(-) delete mode 100644 datafusion/optimizer/src/analyzer/subquery.rs diff --git a/datafusion/expr/src/logical_plan/invariants.rs b/datafusion/expr/src/logical_plan/invariants.rs index 551f9dda81e1..d6cd889afc2b 100644 --- a/datafusion/expr/src/logical_plan/invariants.rs +++ b/datafusion/expr/src/logical_plan/invariants.rs @@ -15,17 +15,54 @@ // specific language governing permissions and limitations // under the License. +use std::collections::BTreeSet; + +use recursive::recursive; + use datafusion_common::{ - tree_node::TreeNodeRecursion, DFSchemaRef, DataFusionError, Result, + plan_err, + tree_node::{TreeNode, TreeNodeRecursion}, + Column, DFSchema, DFSchemaRef, DataFusionError, Result, }; -use super::LogicalPlan; +use crate::{ + expr::{Exists, InSubquery}, + expr_rewriter::strip_outer_reference, + utils::split_conjunction, + Aggregate, Expr, Filter, Join, JoinType, LogicalPlan, Window, +}; + +pub enum InvariantLevel { + /// Invariants that are always true in DataFusion `LogicalPlan`s + /// such as the number of expected children and no duplicated output fields + Always, + /// Invariants that must hold true for the plan to be "executable" + /// such as the type and number of function arguments are correct and + /// that wildcards have been expanded + /// + /// To ensure a LogicalPlan satisfies the `Executable` inariants, run the + /// `Analyzer` + Executable, +} + +pub fn assert_required_invariants(plan: &LogicalPlan) -> Result<()> { + // Refer to + assert_unique_field_names(plan)?; + + Ok(()) +} + +pub fn assert_executable_invariants(plan: &LogicalPlan) -> Result<()> { + assert_required_invariants(plan)?; + assert_valid_semantic_plan(plan)?; + Ok(()) +} /// Returns an error if plan, and subplans, do not have unique fields. /// /// This invariant is subject to change. /// refer: -pub fn assert_unique_field_names(plan: &LogicalPlan) -> Result<()> { +fn assert_unique_field_names(plan: &LogicalPlan) -> Result<()> { plan.schema().check_names()?; plan.apply_with_subqueries(|plan: &LogicalPlan| { @@ -35,6 +72,13 @@ pub fn assert_unique_field_names(plan: &LogicalPlan) -> Result<()> { .map(|_| ()) } +/// Returns an error if the plan is not sematically valid. +fn assert_valid_semantic_plan(plan: &LogicalPlan) -> Result<()> { + assert_subqueries_are_valid(plan)?; + + Ok(()) +} + /// Returns an error if the plan does not have the expected schema. /// Ignores metadata and nullability. 
pub fn assert_expected_schema( @@ -58,3 +102,368 @@ pub fn assert_expected_schema( Ok(()) } } + +/// Asserts that the subqueries are structured properly with valid node placement. +/// +/// Refer to [`check_subquery_expr`] for more details. +fn assert_subqueries_are_valid(plan: &LogicalPlan) -> Result<()> { + plan.apply_with_subqueries(|plan: &LogicalPlan| { + plan.apply_expressions(|expr| { + // recursively look for subqueries + expr.apply(|expr| { + match expr { + Expr::Exists(Exists { subquery, .. }) + | Expr::InSubquery(InSubquery { subquery, .. }) + | Expr::ScalarSubquery(subquery) => { + check_subquery_expr(plan, &subquery.subquery, expr)?; + } + _ => {} + }; + Ok(TreeNodeRecursion::Continue) + }) + }) + }) + .map(|_| ()) +} + +/// Do necessary check on subquery expressions and fail the invalid plan +/// 1) Check whether the outer plan is in the allowed outer plans list to use subquery expressions, +/// the allowed while list: [Projection, Filter, Window, Aggregate, Join]. +/// 2) Check whether the inner plan is in the allowed inner plans list to use correlated(outer) expressions. +/// 3) Check and validate unsupported cases to use the correlated(outer) expressions inside the subquery(inner) plans/inner expressions. +/// For example, we do not want to support to use correlated expressions as the Join conditions in the subquery plan when the Join +/// is a Full Out Join +pub fn check_subquery_expr( + outer_plan: &LogicalPlan, + inner_plan: &LogicalPlan, + expr: &Expr, +) -> Result<()> { + assert_subqueries_are_valid(inner_plan)?; + if let Expr::ScalarSubquery(subquery) = expr { + // Scalar subquery should only return one column + if subquery.subquery.schema().fields().len() > 1 { + return plan_err!( + "Scalar subquery should only return one column, but found {}: {}", + subquery.subquery.schema().fields().len(), + subquery.subquery.schema().field_names().join(", ") + ); + } + // Correlated scalar subquery must be aggregated to return at most one row + if !subquery.outer_ref_columns.is_empty() { + match strip_inner_query(inner_plan) { + LogicalPlan::Aggregate(agg) => { + check_aggregation_in_scalar_subquery(inner_plan, agg) + } + LogicalPlan::Filter(Filter { input, .. 
}) + if matches!(input.as_ref(), LogicalPlan::Aggregate(_)) => + { + if let LogicalPlan::Aggregate(agg) = input.as_ref() { + check_aggregation_in_scalar_subquery(inner_plan, agg) + } else { + Ok(()) + } + } + _ => { + if inner_plan + .max_rows() + .filter(|max_row| *max_row <= 1) + .is_some() + { + Ok(()) + } else { + plan_err!( + "Correlated scalar subquery must be aggregated to return at most one row" + ) + } + } + }?; + match outer_plan { + LogicalPlan::Projection(_) + | LogicalPlan::Filter(_) => Ok(()), + LogicalPlan::Aggregate(Aggregate {group_expr, aggr_expr,..}) => { + if group_expr.contains(expr) && !aggr_expr.contains(expr) { + // TODO revisit this validation logic + plan_err!( + "Correlated scalar subquery in the GROUP BY clause must also be in the aggregate expressions" + ) + } else { + Ok(()) + } + }, + _ => plan_err!( + "Correlated scalar subquery can only be used in Projection, Filter, Aggregate plan nodes" + ) + }?; + } + check_correlations_in_subquery(inner_plan) + } else { + if let Expr::InSubquery(subquery) = expr { + // InSubquery should only return one column + if subquery.subquery.subquery.schema().fields().len() > 1 { + return plan_err!( + "InSubquery should only return one column, but found {}: {}", + subquery.subquery.subquery.schema().fields().len(), + subquery.subquery.subquery.schema().field_names().join(", ") + ); + } + } + match outer_plan { + LogicalPlan::Projection(_) + | LogicalPlan::Filter(_) + | LogicalPlan::Window(_) + | LogicalPlan::Aggregate(_) + | LogicalPlan::Join(_) => Ok(()), + _ => plan_err!( + "In/Exist subquery can only be used in \ + Projection, Filter, Window functions, Aggregate and Join plan nodes, \ + but was used in [{}]", + outer_plan.display() + ), + }?; + check_correlations_in_subquery(inner_plan) + } +} + +// Recursively check the unsupported outer references in the sub query plan. +fn check_correlations_in_subquery(inner_plan: &LogicalPlan) -> Result<()> { + check_inner_plan(inner_plan, true) +} + +// Recursively check the unsupported outer references in the sub query plan. +#[recursive] +fn check_inner_plan(inner_plan: &LogicalPlan, can_contain_outer_ref: bool) -> Result<()> { + if !can_contain_outer_ref && inner_plan.contains_outer_reference() { + return plan_err!("Accessing outer reference columns is not allowed in the plan"); + } + // We want to support as many operators as possible inside the correlated subquery + match inner_plan { + LogicalPlan::Aggregate(_) => { + inner_plan.apply_children(|plan| { + check_inner_plan(plan, can_contain_outer_ref)?; + Ok(TreeNodeRecursion::Continue) + })?; + Ok(()) + } + LogicalPlan::Filter(Filter { input, .. }) => { + check_inner_plan(input, can_contain_outer_ref) + } + LogicalPlan::Window(window) => { + check_mixed_out_refer_in_window(window)?; + inner_plan.apply_children(|plan| { + check_inner_plan(plan, can_contain_outer_ref)?; + Ok(TreeNodeRecursion::Continue) + })?; + Ok(()) + } + LogicalPlan::Projection(_) + | LogicalPlan::Distinct(_) + | LogicalPlan::Sort(_) + | LogicalPlan::Union(_) + | LogicalPlan::TableScan(_) + | LogicalPlan::EmptyRelation(_) + | LogicalPlan::Limit(_) + | LogicalPlan::Values(_) + | LogicalPlan::Subquery(_) + | LogicalPlan::SubqueryAlias(_) + | LogicalPlan::Unnest(_) => { + inner_plan.apply_children(|plan| { + check_inner_plan(plan, can_contain_outer_ref)?; + Ok(TreeNodeRecursion::Continue) + })?; + Ok(()) + } + LogicalPlan::Join(Join { + left, + right, + join_type, + .. 
+ }) => match join_type { + JoinType::Inner => { + inner_plan.apply_children(|plan| { + check_inner_plan(plan, can_contain_outer_ref)?; + Ok(TreeNodeRecursion::Continue) + })?; + Ok(()) + } + JoinType::Left + | JoinType::LeftSemi + | JoinType::LeftAnti + | JoinType::LeftMark => { + check_inner_plan(left, can_contain_outer_ref)?; + check_inner_plan(right, false) + } + JoinType::Right | JoinType::RightSemi | JoinType::RightAnti => { + check_inner_plan(left, false)?; + check_inner_plan(right, can_contain_outer_ref) + } + JoinType::Full => { + inner_plan.apply_children(|plan| { + check_inner_plan(plan, false)?; + Ok(TreeNodeRecursion::Continue) + })?; + Ok(()) + } + }, + LogicalPlan::Extension(_) => Ok(()), + _ => plan_err!("Unsupported operator in the subquery plan."), + } +} + +fn check_aggregation_in_scalar_subquery( + inner_plan: &LogicalPlan, + agg: &Aggregate, +) -> Result<()> { + if agg.aggr_expr.is_empty() { + return plan_err!( + "Correlated scalar subquery must be aggregated to return at most one row" + ); + } + if !agg.group_expr.is_empty() { + let correlated_exprs = get_correlated_expressions(inner_plan)?; + let inner_subquery_cols = + collect_subquery_cols(&correlated_exprs, agg.input.schema())?; + let mut group_columns = agg + .group_expr + .iter() + .map(|group| Ok(group.column_refs().into_iter().cloned().collect::>())) + .collect::>>()? + .into_iter() + .flatten(); + + if !group_columns.all(|group| inner_subquery_cols.contains(&group)) { + // Group BY columns must be a subset of columns in the correlated expressions + return plan_err!( + "A GROUP BY clause in a scalar correlated subquery cannot contain non-correlated columns" + ); + } + } + Ok(()) +} + +fn strip_inner_query(inner_plan: &LogicalPlan) -> &LogicalPlan { + match inner_plan { + LogicalPlan::Projection(projection) => { + strip_inner_query(projection.input.as_ref()) + } + LogicalPlan::SubqueryAlias(alias) => strip_inner_query(alias.input.as_ref()), + other => other, + } +} + +fn get_correlated_expressions(inner_plan: &LogicalPlan) -> Result> { + let mut exprs = vec![]; + inner_plan.apply_with_subqueries(|plan| { + if let LogicalPlan::Filter(Filter { predicate, .. 
}) = plan { + let (correlated, _): (Vec<_>, Vec<_>) = split_conjunction(predicate) + .into_iter() + .partition(|e| e.contains_outer()); + + for expr in correlated { + exprs.push(strip_outer_reference(expr.clone())); + } + } + Ok(TreeNodeRecursion::Continue) + })?; + Ok(exprs) +} + +/// Check whether the window expressions contain a mixture of out reference columns and inner columns +fn check_mixed_out_refer_in_window(window: &Window) -> Result<()> { + let mixed = window + .window_expr + .iter() + .any(|win_expr| win_expr.contains_outer() && win_expr.any_column_refs()); + if mixed { + plan_err!( + "Window expressions should not contain a mixed of outer references and inner columns" + ) + } else { + Ok(()) + } +} + +fn collect_subquery_cols( + exprs: &[Expr], + subquery_schema: &DFSchema, +) -> Result> { + exprs.iter().try_fold(BTreeSet::new(), |mut cols, expr| { + let mut using_cols: Vec = vec![]; + for col in expr.column_refs().into_iter() { + if subquery_schema.has_column(col) { + using_cols.push(col.clone()); + } + } + + cols.extend(using_cols); + Result::<_>::Ok(cols) + }) +} + +#[cfg(test)] +mod test { + use std::cmp::Ordering; + use std::sync::Arc; + + use crate::{Extension, UserDefinedLogicalNodeCore}; + use datafusion_common::{DFSchema, DFSchemaRef}; + + use super::*; + + #[derive(Debug, PartialEq, Eq, Hash)] + struct MockUserDefinedLogicalPlan { + empty_schema: DFSchemaRef, + } + + impl PartialOrd for MockUserDefinedLogicalPlan { + fn partial_cmp(&self, _other: &Self) -> Option { + None + } + } + + impl UserDefinedLogicalNodeCore for MockUserDefinedLogicalPlan { + fn name(&self) -> &str { + "MockUserDefinedLogicalPlan" + } + + fn inputs(&self) -> Vec<&LogicalPlan> { + vec![] + } + + fn schema(&self) -> &DFSchemaRef { + &self.empty_schema + } + + fn expressions(&self) -> Vec { + vec![] + } + + fn fmt_for_explain(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result { + write!(f, "MockUserDefinedLogicalPlan") + } + + fn with_exprs_and_inputs( + &self, + _exprs: Vec, + _inputs: Vec, + ) -> Result { + Ok(Self { + empty_schema: Arc::clone(&self.empty_schema), + }) + } + + fn supports_limit_pushdown(&self) -> bool { + false // Disallow limit push-down by default + } + } + + #[test] + fn wont_fail_extension_plan() { + let plan = LogicalPlan::Extension(Extension { + node: Arc::new(MockUserDefinedLogicalPlan { + empty_schema: DFSchemaRef::new(DFSchema::empty()), + }), + }); + + check_inner_plan(&plan, true).unwrap(); + } +} diff --git a/datafusion/expr/src/logical_plan/mod.rs b/datafusion/expr/src/logical_plan/mod.rs index f0b1efded741..404941378663 100644 --- a/datafusion/expr/src/logical_plan/mod.rs +++ b/datafusion/expr/src/logical_plan/mod.rs @@ -21,7 +21,7 @@ pub mod display; pub mod dml; mod extension; pub(crate) mod invariants; -pub use invariants::assert_expected_schema; +pub use invariants::{assert_expected_schema, check_subquery_expr, InvariantLevel}; mod plan; mod statement; pub mod tree_node; diff --git a/datafusion/expr/src/logical_plan/plan.rs b/datafusion/expr/src/logical_plan/plan.rs index eb5ed294e8e2..0bc8c5bcf264 100644 --- a/datafusion/expr/src/logical_plan/plan.rs +++ b/datafusion/expr/src/logical_plan/plan.rs @@ -24,7 +24,9 @@ use std::hash::{Hash, Hasher}; use std::sync::{Arc, LazyLock}; use super::dml::CopyTo; -use super::invariants::assert_unique_field_names; +use super::invariants::{ + assert_executable_invariants, assert_required_invariants, InvariantLevel, +}; use super::DdlStatement; use crate::builder::{change_redundant_column, unnest_with_options}; use 
crate::expr::{Placeholder, Sort as SortExpr, WindowFunction}; @@ -1127,12 +1129,12 @@ impl LogicalPlan { } } - /// These are invariants to hold true for each logical plan. - pub fn assert_invariants(&self) -> Result<()> { - // Refer to - assert_unique_field_names(self)?; - - Ok(()) + /// checks that the plan conforms to the listed invariant level, returning an Error if not + pub fn check_invariants(&self, check: InvariantLevel) -> Result<()> { + match check { + InvariantLevel::Always => assert_required_invariants(self), + InvariantLevel::Executable => assert_executable_invariants(self), + } } /// Helper for [Self::with_new_exprs] to use when no expressions are expected. diff --git a/datafusion/optimizer/src/analyzer/mod.rs b/datafusion/optimizer/src/analyzer/mod.rs index 680584b288a8..b845b44de617 100644 --- a/datafusion/optimizer/src/analyzer/mod.rs +++ b/datafusion/optimizer/src/analyzer/mod.rs @@ -24,18 +24,14 @@ use log::debug; use datafusion_common::config::ConfigOptions; use datafusion_common::instant::Instant; -use datafusion_common::tree_node::{TreeNode, TreeNodeRecursion}; use datafusion_common::{DataFusionError, Result}; -use datafusion_expr::expr::Exists; -use datafusion_expr::expr::InSubquery; use datafusion_expr::expr_rewriter::FunctionRewrite; -use datafusion_expr::{Expr, LogicalPlan}; +use datafusion_expr::{InvariantLevel, LogicalPlan}; use crate::analyzer::count_wildcard_rule::CountWildcardRule; use crate::analyzer::expand_wildcard_rule::ExpandWildcardRule; use crate::analyzer::inline_table_scan::InlineTableScan; use crate::analyzer::resolve_grouping_function::ResolveGroupingFunction; -use crate::analyzer::subquery::check_subquery_expr; use crate::analyzer::type_coercion::TypeCoercion; use crate::utils::log_plan; @@ -46,9 +42,16 @@ pub mod expand_wildcard_rule; pub mod function_rewrite; pub mod inline_table_scan; pub mod resolve_grouping_function; -pub mod subquery; pub mod type_coercion; +pub mod subquery { + #[deprecated( + since = "44.0.0", + note = "please use `datafusion_expr::check_subquery_expr` instead" + )] + pub use datafusion_expr::check_subquery_expr; +} + /// [`AnalyzerRule`]s transform [`LogicalPlan`]s in some way to make /// the plan valid prior to the rest of the DataFusion optimization process. /// @@ -56,7 +59,7 @@ pub mod type_coercion; /// which must preserve the semantics of the `LogicalPlan`, while computing /// results in a more optimal way. /// -/// For example, an `AnalyzerRule` may resolve [`Expr`]s into more specific +/// For example, an `AnalyzerRule` may resolve [`Expr`](datafusion_expr::Expr)s into more specific /// forms such as a subquery reference, or do type coercion to ensure the types /// of operands are correct. /// @@ -140,10 +143,10 @@ impl Analyzer { where F: FnMut(&LogicalPlan, &dyn AnalyzerRule), { - // verify at the start, before the first LP analyzer pass. - assert_valid_semantic_plan(&plan).map_err(|e| { + // verify the logical plan required invariants at the start, before analyzer + plan.check_invariants(InvariantLevel::Always).map_err(|e| { DataFusionError::Context( - "check_plan_before_analyzers".to_string(), + "assert_lp_invariants_before_analyzers".to_string(), Box::new(e), ) })?; @@ -176,52 +179,15 @@ impl Analyzer { observer(&new_plan, rule.as_ref()); } - // verify at the end, after the last LP analyzer pass. 
- assert_valid_semantic_plan(&new_plan).map_err(|e| { - DataFusionError::Context("check_analyzed_plan".to_string(), Box::new(e)) - })?; + // verify at the end, after the last LP analyzer pass, that the plan is executable. + new_plan + .check_invariants(InvariantLevel::Executable) + .map_err(|e| { + DataFusionError::Context("check_analyzed_plan".to_string(), Box::new(e)) + })?; log_plan("Final analyzed plan", &new_plan); debug!("Analyzer took {} ms", start_time.elapsed().as_millis()); Ok(new_plan) } } - -/// These are invariants which should hold true before and after each analyzer. -/// -/// This differs from [`LogicalPlan::assert_invariants`], which addresses if a singular -/// LogicalPlan is valid. Instead this address if the analyzer (before and after) -/// is valid based upon permitted changes. -/// -/// Does not check elements which are mutated by the analyzers (e.g. the schema). -fn assert_valid_semantic_plan(plan: &LogicalPlan) -> Result<()> { - plan.assert_invariants()?; - - // TODO: should this be moved to LogicalPlan::assert_invariants? - assert_subqueries_are_valid(plan)?; - - Ok(()) -} - -/// Asserts that the subqueries are structured properly with valid node placement. -/// -/// Refer to [`check_subquery_expr`] for more details. -fn assert_subqueries_are_valid(plan: &LogicalPlan) -> Result<()> { - plan.apply_with_subqueries(|plan: &LogicalPlan| { - plan.apply_expressions(|expr| { - // recursively look for subqueries - expr.apply(|expr| { - match expr { - Expr::Exists(Exists { subquery, .. }) - | Expr::InSubquery(InSubquery { subquery, .. }) - | Expr::ScalarSubquery(subquery) => { - check_subquery_expr(plan, &subquery.subquery, expr)?; - } - _ => {} - }; - Ok(TreeNodeRecursion::Continue) - }) - }) - }) - .map(|_| ()) -} diff --git a/datafusion/optimizer/src/analyzer/subquery.rs b/datafusion/optimizer/src/analyzer/subquery.rs deleted file mode 100644 index 3addb9f95f5b..000000000000 --- a/datafusion/optimizer/src/analyzer/subquery.rs +++ /dev/null @@ -1,351 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. - -use crate::analyzer::assert_subqueries_are_valid; -use crate::utils::collect_subquery_cols; -use recursive::recursive; - -use datafusion_common::tree_node::{TreeNode, TreeNodeRecursion}; -use datafusion_common::{plan_err, Result}; -use datafusion_expr::expr_rewriter::strip_outer_reference; -use datafusion_expr::utils::split_conjunction; -use datafusion_expr::{Aggregate, Expr, Filter, Join, JoinType, LogicalPlan, Window}; - -/// Do necessary check on subquery expressions and fail the invalid plan -/// 1) Check whether the outer plan is in the allowed outer plans list to use subquery expressions, -/// the allowed while list: [Projection, Filter, Window, Aggregate, Join]. 
-/// 2) Check whether the inner plan is in the allowed inner plans list to use correlated(outer) expressions. -/// 3) Check and validate unsupported cases to use the correlated(outer) expressions inside the subquery(inner) plans/inner expressions. -/// For example, we do not want to support to use correlated expressions as the Join conditions in the subquery plan when the Join -/// is a Full Out Join -pub fn check_subquery_expr( - outer_plan: &LogicalPlan, - inner_plan: &LogicalPlan, - expr: &Expr, -) -> Result<()> { - assert_subqueries_are_valid(inner_plan)?; - if let Expr::ScalarSubquery(subquery) = expr { - // Scalar subquery should only return one column - if subquery.subquery.schema().fields().len() > 1 { - return plan_err!( - "Scalar subquery should only return one column, but found {}: {}", - subquery.subquery.schema().fields().len(), - subquery.subquery.schema().field_names().join(", ") - ); - } - // Correlated scalar subquery must be aggregated to return at most one row - if !subquery.outer_ref_columns.is_empty() { - match strip_inner_query(inner_plan) { - LogicalPlan::Aggregate(agg) => { - check_aggregation_in_scalar_subquery(inner_plan, agg) - } - LogicalPlan::Filter(Filter { input, .. }) - if matches!(input.as_ref(), LogicalPlan::Aggregate(_)) => - { - if let LogicalPlan::Aggregate(agg) = input.as_ref() { - check_aggregation_in_scalar_subquery(inner_plan, agg) - } else { - Ok(()) - } - } - _ => { - if inner_plan - .max_rows() - .filter(|max_row| *max_row <= 1) - .is_some() - { - Ok(()) - } else { - plan_err!( - "Correlated scalar subquery must be aggregated to return at most one row" - ) - } - } - }?; - match outer_plan { - LogicalPlan::Projection(_) - | LogicalPlan::Filter(_) => Ok(()), - LogicalPlan::Aggregate(Aggregate {group_expr, aggr_expr,..}) => { - if group_expr.contains(expr) && !aggr_expr.contains(expr) { - // TODO revisit this validation logic - plan_err!( - "Correlated scalar subquery in the GROUP BY clause must also be in the aggregate expressions" - ) - } else { - Ok(()) - } - }, - _ => plan_err!( - "Correlated scalar subquery can only be used in Projection, Filter, Aggregate plan nodes" - ) - }?; - } - check_correlations_in_subquery(inner_plan) - } else { - if let Expr::InSubquery(subquery) = expr { - // InSubquery should only return one column - if subquery.subquery.subquery.schema().fields().len() > 1 { - return plan_err!( - "InSubquery should only return one column, but found {}: {}", - subquery.subquery.subquery.schema().fields().len(), - subquery.subquery.subquery.schema().field_names().join(", ") - ); - } - } - match outer_plan { - LogicalPlan::Projection(_) - | LogicalPlan::Filter(_) - | LogicalPlan::Window(_) - | LogicalPlan::Aggregate(_) - | LogicalPlan::Join(_) => Ok(()), - _ => plan_err!( - "In/Exist subquery can only be used in \ - Projection, Filter, Window functions, Aggregate and Join plan nodes, \ - but was used in [{}]", - outer_plan.display() - ), - }?; - check_correlations_in_subquery(inner_plan) - } -} - -// Recursively check the unsupported outer references in the sub query plan. -fn check_correlations_in_subquery(inner_plan: &LogicalPlan) -> Result<()> { - check_inner_plan(inner_plan, true) -} - -// Recursively check the unsupported outer references in the sub query plan. 
-#[recursive] -fn check_inner_plan(inner_plan: &LogicalPlan, can_contain_outer_ref: bool) -> Result<()> { - if !can_contain_outer_ref && inner_plan.contains_outer_reference() { - return plan_err!("Accessing outer reference columns is not allowed in the plan"); - } - // We want to support as many operators as possible inside the correlated subquery - match inner_plan { - LogicalPlan::Aggregate(_) => { - inner_plan.apply_children(|plan| { - check_inner_plan(plan, can_contain_outer_ref)?; - Ok(TreeNodeRecursion::Continue) - })?; - Ok(()) - } - LogicalPlan::Filter(Filter { input, .. }) => { - check_inner_plan(input, can_contain_outer_ref) - } - LogicalPlan::Window(window) => { - check_mixed_out_refer_in_window(window)?; - inner_plan.apply_children(|plan| { - check_inner_plan(plan, can_contain_outer_ref)?; - Ok(TreeNodeRecursion::Continue) - })?; - Ok(()) - } - LogicalPlan::Projection(_) - | LogicalPlan::Distinct(_) - | LogicalPlan::Sort(_) - | LogicalPlan::Union(_) - | LogicalPlan::TableScan(_) - | LogicalPlan::EmptyRelation(_) - | LogicalPlan::Limit(_) - | LogicalPlan::Values(_) - | LogicalPlan::Subquery(_) - | LogicalPlan::SubqueryAlias(_) - | LogicalPlan::Unnest(_) => { - inner_plan.apply_children(|plan| { - check_inner_plan(plan, can_contain_outer_ref)?; - Ok(TreeNodeRecursion::Continue) - })?; - Ok(()) - } - LogicalPlan::Join(Join { - left, - right, - join_type, - .. - }) => match join_type { - JoinType::Inner => { - inner_plan.apply_children(|plan| { - check_inner_plan(plan, can_contain_outer_ref)?; - Ok(TreeNodeRecursion::Continue) - })?; - Ok(()) - } - JoinType::Left - | JoinType::LeftSemi - | JoinType::LeftAnti - | JoinType::LeftMark => { - check_inner_plan(left, can_contain_outer_ref)?; - check_inner_plan(right, false) - } - JoinType::Right | JoinType::RightSemi | JoinType::RightAnti => { - check_inner_plan(left, false)?; - check_inner_plan(right, can_contain_outer_ref) - } - JoinType::Full => { - inner_plan.apply_children(|plan| { - check_inner_plan(plan, false)?; - Ok(TreeNodeRecursion::Continue) - })?; - Ok(()) - } - }, - LogicalPlan::Extension(_) => Ok(()), - _ => plan_err!("Unsupported operator in the subquery plan."), - } -} - -fn check_aggregation_in_scalar_subquery( - inner_plan: &LogicalPlan, - agg: &Aggregate, -) -> Result<()> { - if agg.aggr_expr.is_empty() { - return plan_err!( - "Correlated scalar subquery must be aggregated to return at most one row" - ); - } - if !agg.group_expr.is_empty() { - let correlated_exprs = get_correlated_expressions(inner_plan)?; - let inner_subquery_cols = - collect_subquery_cols(&correlated_exprs, agg.input.schema())?; - let mut group_columns = agg - .group_expr - .iter() - .map(|group| Ok(group.column_refs().into_iter().cloned().collect::>())) - .collect::>>()? 
- .into_iter() - .flatten(); - - if !group_columns.all(|group| inner_subquery_cols.contains(&group)) { - // Group BY columns must be a subset of columns in the correlated expressions - return plan_err!( - "A GROUP BY clause in a scalar correlated subquery cannot contain non-correlated columns" - ); - } - } - Ok(()) -} - -fn strip_inner_query(inner_plan: &LogicalPlan) -> &LogicalPlan { - match inner_plan { - LogicalPlan::Projection(projection) => { - strip_inner_query(projection.input.as_ref()) - } - LogicalPlan::SubqueryAlias(alias) => strip_inner_query(alias.input.as_ref()), - other => other, - } -} - -fn get_correlated_expressions(inner_plan: &LogicalPlan) -> Result> { - let mut exprs = vec![]; - inner_plan.apply_with_subqueries(|plan| { - if let LogicalPlan::Filter(Filter { predicate, .. }) = plan { - let (correlated, _): (Vec<_>, Vec<_>) = split_conjunction(predicate) - .into_iter() - .partition(|e| e.contains_outer()); - - for expr in correlated { - exprs.push(strip_outer_reference(expr.clone())); - } - } - Ok(TreeNodeRecursion::Continue) - })?; - Ok(exprs) -} - -/// Check whether the window expressions contain a mixture of out reference columns and inner columns -fn check_mixed_out_refer_in_window(window: &Window) -> Result<()> { - let mixed = window - .window_expr - .iter() - .any(|win_expr| win_expr.contains_outer() && win_expr.any_column_refs()); - if mixed { - plan_err!( - "Window expressions should not contain a mixed of outer references and inner columns" - ) - } else { - Ok(()) - } -} - -#[cfg(test)] -mod test { - use std::cmp::Ordering; - use std::sync::Arc; - - use datafusion_common::{DFSchema, DFSchemaRef}; - use datafusion_expr::{Extension, UserDefinedLogicalNodeCore}; - - use super::*; - - #[derive(Debug, PartialEq, Eq, Hash)] - struct MockUserDefinedLogicalPlan { - empty_schema: DFSchemaRef, - } - - impl PartialOrd for MockUserDefinedLogicalPlan { - fn partial_cmp(&self, _other: &Self) -> Option { - None - } - } - - impl UserDefinedLogicalNodeCore for MockUserDefinedLogicalPlan { - fn name(&self) -> &str { - "MockUserDefinedLogicalPlan" - } - - fn inputs(&self) -> Vec<&LogicalPlan> { - vec![] - } - - fn schema(&self) -> &DFSchemaRef { - &self.empty_schema - } - - fn expressions(&self) -> Vec { - vec![] - } - - fn fmt_for_explain(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result { - write!(f, "MockUserDefinedLogicalPlan") - } - - fn with_exprs_and_inputs( - &self, - _exprs: Vec, - _inputs: Vec, - ) -> Result { - Ok(Self { - empty_schema: Arc::clone(&self.empty_schema), - }) - } - - fn supports_limit_pushdown(&self) -> bool { - false // Disallow limit push-down by default - } - } - - #[test] - fn wont_fail_extension_plan() { - let plan = LogicalPlan::Extension(Extension { - node: Arc::new(MockUserDefinedLogicalPlan { - empty_schema: DFSchemaRef::new(DFSchema::empty()), - }), - }); - - check_inner_plan(&plan, true).unwrap(); - } -} diff --git a/datafusion/optimizer/src/optimizer.rs b/datafusion/optimizer/src/optimizer.rs index 0899ea0dbd50..4a19bcd39838 100644 --- a/datafusion/optimizer/src/optimizer.rs +++ b/datafusion/optimizer/src/optimizer.rs @@ -21,8 +21,8 @@ use std::fmt::Debug; use std::sync::Arc; use chrono::{DateTime, Utc}; -use datafusion_expr::assert_expected_schema; use datafusion_expr::registry::FunctionRegistry; +use datafusion_expr::{assert_expected_schema, InvariantLevel}; use log::{debug, warn}; use datafusion_common::alias::AliasGenerator; @@ -357,12 +357,13 @@ impl Optimizer { F: FnMut(&LogicalPlan, &dyn OptimizerRule), { // verify LP is valid, 
before the first LP optimizer pass. - plan.assert_invariants().map_err(|e| { - DataFusionError::Context( - "check_plan_before_optimizers".to_string(), - Box::new(e), - ) - })?; + plan.check_invariants(InvariantLevel::Executable) + .map_err(|e| { + DataFusionError::Context( + "check_plan_before_optimizers".to_string(), + Box::new(e), + ) + })?; let start_time = Instant::now(); let options = config.options(); @@ -463,12 +464,14 @@ impl Optimizer { } // verify LP is valid, after the last optimizer pass. - new_plan.assert_invariants().map_err(|e| { - DataFusionError::Context( - "check_plan_after_optimizers".to_string(), - Box::new(e), - ) - })?; + new_plan + .check_invariants(InvariantLevel::Executable) + .map_err(|e| { + DataFusionError::Context( + "check_plan_after_optimizers".to_string(), + Box::new(e), + ) + })?; log_plan("Final optimized plan", &new_plan); debug!("Optimizer took {} ms", start_time.elapsed().as_millis()); @@ -478,7 +481,7 @@ impl Optimizer { /// These are invariants which should hold true before and after each optimization. /// -/// This differs from [`LogicalPlan::assert_invariants`], which addresses if a singular +/// This differs from [`LogicalPlan::check_invariants`], which addresses if a singular /// LogicalPlan is valid. Instead this address if the optimization (before and after) /// is valid based upon permitted changes. fn assert_valid_optimization( From fbc9c46b168c6a4ea608119380da1990c479019d Mon Sep 17 00:00:00 2001 From: wiedld Date: Tue, 17 Dec 2024 15:10:14 -0800 Subject: [PATCH 08/17] test: update test for slight error message change --- datafusion/optimizer/src/optimizer.rs | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/datafusion/optimizer/src/optimizer.rs b/datafusion/optimizer/src/optimizer.rs index 4a19bcd39838..c191e4811213 100644 --- a/datafusion/optimizer/src/optimizer.rs +++ b/datafusion/optimizer/src/optimizer.rs @@ -549,7 +549,9 @@ mod tests { let err = opt.optimize(plan, &config, &observe).unwrap_err(); assert_eq!( "Optimizer rule 'get table_scan rule' failed\n\ - caused by\nget table_scan rule\ncaused by\n\ + caused by\ncheck_optimized_plan\n\ + caused by\nget table_scan rule\n\ + caused by\n\ Internal error: Failed due to a difference in schemas, \ original schema: DFSchema { inner: Schema { \ fields: [], \ From e52187e162965a51d7e00ae1cccd066a3c5dc034 Mon Sep 17 00:00:00 2001 From: wiedld Date: Tue, 17 Dec 2024 15:24:00 -0800 Subject: [PATCH 09/17] fix: push_down_filter optimization pass can push a IN() into a TableScan's filter clause --- datafusion/expr/src/logical_plan/invariants.rs | 3 ++- datafusion/sqllogictest/test_files/subquery.slt | 2 +- 2 files changed, 3 insertions(+), 2 deletions(-) diff --git a/datafusion/expr/src/logical_plan/invariants.rs b/datafusion/expr/src/logical_plan/invariants.rs index d6cd889afc2b..7ffe6fee3356 100644 --- a/datafusion/expr/src/logical_plan/invariants.rs +++ b/datafusion/expr/src/logical_plan/invariants.rs @@ -210,12 +210,13 @@ pub fn check_subquery_expr( match outer_plan { LogicalPlan::Projection(_) | LogicalPlan::Filter(_) + | LogicalPlan::TableScan(_) | LogicalPlan::Window(_) | LogicalPlan::Aggregate(_) | LogicalPlan::Join(_) => Ok(()), _ => plan_err!( "In/Exist subquery can only be used in \ - Projection, Filter, Window functions, Aggregate and Join plan nodes, \ + Projection, Filter, TableScan, Window functions, Aggregate and Join plan nodes, \ but was used in [{}]", outer_plan.display() ), diff --git a/datafusion/sqllogictest/test_files/subquery.slt 
b/datafusion/sqllogictest/test_files/subquery.slt index 027b5ca8dcfb..e08389204461 100644 --- a/datafusion/sqllogictest/test_files/subquery.slt +++ b/datafusion/sqllogictest/test_files/subquery.slt @@ -438,7 +438,7 @@ SELECT t1_id, t1_name, t1_int, (select t2_id, t2_name FROM t2 WHERE t2.t2_id = t #subquery_not_allowed #In/Exist Subquery is not allowed in ORDER BY clause. -statement error DataFusion error: check_analyzed_plan\ncaused by\nError during planning: In/Exist subquery can only be used in Projection, Filter, Window functions, Aggregate and Join plan nodes, but was used in \[Sort: t1.t1_int IN \(\) ASC NULLS LAST\] +statement error DataFusion error: check_analyzed_plan\ncaused by\nError during planning: In/Exist subquery can only be used in Projection, Filter, TableScan, Window functions, Aggregate and Join plan nodes, but was used in \[Sort: t1.t1_int IN \(\) ASC NULLS LAST\] SELECT t1_id, t1_name, t1_int FROM t1 order by t1_int in (SELECT t2_int FROM t2 WHERE t1.t1_id > t1.t1_int) #non_aggregated_correlated_scalar_subquery From ad1a1f8913e0a9e351d2fe15217297be4785ae28 Mon Sep 17 00:00:00 2001 From: wiedld Date: Mon, 23 Dec 2024 10:02:14 -0800 Subject: [PATCH 10/17] refactor: move collect_subquery_cols() to common utils crate --- .../expr/src/logical_plan/invariants.rs | 23 ++----------------- datafusion/expr/src/utils.rs | 20 +++++++++++++++- datafusion/optimizer/src/decorrelate.rs | 5 ++-- datafusion/optimizer/src/utils.rs | 17 -------------- 4 files changed, 24 insertions(+), 41 deletions(-) diff --git a/datafusion/expr/src/logical_plan/invariants.rs b/datafusion/expr/src/logical_plan/invariants.rs index 46aee72fb1af..49c4e87473f3 100644 --- a/datafusion/expr/src/logical_plan/invariants.rs +++ b/datafusion/expr/src/logical_plan/invariants.rs @@ -15,18 +15,16 @@ // specific language governing permissions and limitations // under the License. -use std::collections::BTreeSet; - use datafusion_common::{ plan_err, tree_node::{TreeNode, TreeNodeRecursion}, - Column, DFSchema, DFSchemaRef, DataFusionError, Result, + DFSchemaRef, DataFusionError, Result, }; use crate::{ expr::{Exists, InSubquery}, expr_rewriter::strip_outer_reference, - utils::split_conjunction, + utils::{collect_subquery_cols, split_conjunction}, Aggregate, Expr, Filter, Join, JoinType, LogicalPlan, Window, }; @@ -381,23 +379,6 @@ fn check_mixed_out_refer_in_window(window: &Window) -> Result<()> { } } -fn collect_subquery_cols( - exprs: &[Expr], - subquery_schema: &DFSchema, -) -> Result> { - exprs.iter().try_fold(BTreeSet::new(), |mut cols, expr| { - let mut using_cols: Vec = vec![]; - for col in expr.column_refs().into_iter() { - if subquery_schema.has_column(col) { - using_cols.push(col.clone()); - } - } - - cols.extend(using_cols); - Result::<_>::Ok(cols) - }) -} - #[cfg(test)] mod test { use std::cmp::Ordering; diff --git a/datafusion/expr/src/utils.rs b/datafusion/expr/src/utils.rs index 6f7c5d379260..2591d9e65aa4 100644 --- a/datafusion/expr/src/utils.rs +++ b/datafusion/expr/src/utils.rs @@ -18,7 +18,7 @@ //! Expression utilities use std::cmp::Ordering; -use std::collections::HashSet; +use std::collections::{BTreeSet, HashSet}; use std::ops::Deref; use std::sync::Arc; @@ -1402,6 +1402,24 @@ pub fn format_state_name(name: &str, state_name: &str) -> String { format!("{name}[{state_name}]") } +/// Determine the set of [`Column`]s produced by the subquery. 
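// A hypothetical usage sketch of the helper being moved into `datafusion_expr::utils`
// below: only column references that actually exist in the subquery schema are
// collected. The schema and expressions here are made up for illustration.
use arrow::datatypes::{DataType, Field, Schema};
use datafusion_common::{DFSchema, Result};
use datafusion_expr::{col, utils::collect_subquery_cols};

fn collects_only_known_columns() -> Result<()> {
    // the subquery schema exposes a single column "a"
    let subquery_schema = DFSchema::try_from(Schema::new(vec![Field::new(
        "a",
        DataType::Int32,
        true,
    )]))?;
    // the predicate references both "a" and an outer column "b"
    let exprs = vec![col("a").eq(col("b"))];
    let cols = collect_subquery_cols(&exprs, &subquery_schema)?;
    // only "a" is reported, since "b" is not part of the subquery schema
    assert_eq!(cols.len(), 1);
    Ok(())
}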
+pub fn collect_subquery_cols( + exprs: &[Expr], + subquery_schema: &DFSchema, +) -> Result> { + exprs.iter().try_fold(BTreeSet::new(), |mut cols, expr| { + let mut using_cols: Vec = vec![]; + for col in expr.column_refs().into_iter() { + if subquery_schema.has_column(col) { + using_cols.push(col.clone()); + } + } + + cols.extend(using_cols); + Result::<_>::Ok(cols) + }) +} + #[cfg(test)] mod tests { use super::*; diff --git a/datafusion/optimizer/src/decorrelate.rs b/datafusion/optimizer/src/decorrelate.rs index b5726d999137..ee6ea08b43bf 100644 --- a/datafusion/optimizer/src/decorrelate.rs +++ b/datafusion/optimizer/src/decorrelate.rs @@ -22,7 +22,6 @@ use std::ops::Deref; use std::sync::Arc; use crate::simplify_expressions::ExprSimplifier; -use crate::utils::collect_subquery_cols; use datafusion_common::tree_node::{ Transformed, TransformedResult, TreeNode, TreeNodeRecursion, TreeNodeRewriter, @@ -30,7 +29,9 @@ use datafusion_common::tree_node::{ use datafusion_common::{plan_err, Column, DFSchemaRef, HashMap, Result, ScalarValue}; use datafusion_expr::expr::Alias; use datafusion_expr::simplify::SimplifyContext; -use datafusion_expr::utils::{conjunction, find_join_exprs, split_conjunction}; +use datafusion_expr::utils::{ + collect_subquery_cols, conjunction, find_join_exprs, split_conjunction, +}; use datafusion_expr::{ expr, lit, BinaryExpr, Cast, EmptyRelation, Expr, FetchType, LogicalPlan, LogicalPlanBuilder, Operator, diff --git a/datafusion/optimizer/src/utils.rs b/datafusion/optimizer/src/utils.rs index 9f325bc01b1d..39f8cf285d17 100644 --- a/datafusion/optimizer/src/utils.rs +++ b/datafusion/optimizer/src/utils.rs @@ -87,23 +87,6 @@ pub(crate) fn has_all_column_refs(expr: &Expr, schema_cols: &HashSet) -> == column_refs.len() } -pub(crate) fn collect_subquery_cols( - exprs: &[Expr], - subquery_schema: &DFSchema, -) -> Result> { - exprs.iter().try_fold(BTreeSet::new(), |mut cols, expr| { - let mut using_cols: Vec = vec![]; - for col in expr.column_refs().into_iter() { - if subquery_schema.has_column(col) { - using_cols.push(col.clone()); - } - } - - cols.extend(using_cols); - Result::<_>::Ok(cols) - }) -} - pub(crate) fn replace_qualified_name( expr: Expr, cols: &BTreeSet, From 1164a7b4faa9826e55b1b42372d0ce6c163dee52 Mon Sep 17 00:00:00 2001 From: wiedld Date: Mon, 23 Dec 2024 10:53:30 -0800 Subject: [PATCH 11/17] refactor: clarify the purpose of assert_valid_optimization(), runs after all optimizer passes, except in debug mode it runs after each pass. --- .../expr/src/logical_plan/invariants.rs | 14 ++--- datafusion/optimizer/src/optimizer.rs | 52 +++++++++++++------ 2 files changed, 38 insertions(+), 28 deletions(-) diff --git a/datafusion/expr/src/logical_plan/invariants.rs b/datafusion/expr/src/logical_plan/invariants.rs index 49c4e87473f3..1b214582afc9 100644 --- a/datafusion/expr/src/logical_plan/invariants.rs +++ b/datafusion/expr/src/logical_plan/invariants.rs @@ -77,23 +77,15 @@ fn assert_valid_semantic_plan(plan: &LogicalPlan) -> Result<()> { /// Returns an error if the plan does not have the expected schema. /// Ignores metadata and nullability. 
-pub fn assert_expected_schema( - rule_name: &str, - schema: &DFSchemaRef, - plan: &LogicalPlan, -) -> Result<()> { +pub fn assert_expected_schema(schema: &DFSchemaRef, plan: &LogicalPlan) -> Result<()> { let equivalent = plan.schema().equivalent_names_and_types(schema); if !equivalent { - let e = DataFusionError::Internal(format!( + Err(DataFusionError::Internal(format!( "Failed due to a difference in schemas, original schema: {:?}, new schema: {:?}", schema, plan.schema() - )); - Err(DataFusionError::Context( - String::from(rule_name), - Box::new(e), - )) + ))) } else { Ok(()) } diff --git a/datafusion/optimizer/src/optimizer.rs b/datafusion/optimizer/src/optimizer.rs index c191e4811213..bed4c3cb844c 100644 --- a/datafusion/optimizer/src/optimizer.rs +++ b/datafusion/optimizer/src/optimizer.rs @@ -360,7 +360,7 @@ impl Optimizer { plan.check_invariants(InvariantLevel::Executable) .map_err(|e| { DataFusionError::Context( - "check_plan_before_optimizers".to_string(), + "check_plan_is_executable before optimizers".to_string(), Box::new(e), ) })?; @@ -372,6 +372,8 @@ impl Optimizer { let mut previous_plans = HashSet::with_capacity(16); previous_plans.insert(LogicalPlanSignature::new(&new_plan)); + let starting_schema = Arc::clone(new_plan.schema()); + let mut i = 0; while i < options.optimizer.max_passes { log_plan(&format!("Optimizer input (pass {i})"), &new_plan); @@ -384,6 +386,7 @@ impl Optimizer { .skip_failed_rules .then(|| new_plan.clone()); + #[cfg(debug_assertions)] let starting_schema = Arc::clone(new_plan.schema()); let result = match rule.apply_order() { @@ -395,14 +398,23 @@ impl Optimizer { None => optimize_plan_node(new_plan, rule.as_ref(), config), } .and_then(|tnr| { - // verify after each optimizer pass. - assert_valid_optimization(rule.name(), &tnr.data, &starting_schema) + // in debug mode, run checks are each optimer pass + #[cfg(debug_assertions)] + assert_valid_optimization(&tnr.data, &starting_schema) + .map_err(|e| { + DataFusionError::Context( + format!("check_optimizer_specific_invariants after optimizer pass: {}", rule.name()), + Box::new(e), + ) + })?; + #[cfg(debug_assertions)] + tnr.data.check_invariants(InvariantLevel::Executable) .map_err(|e| { - DataFusionError::Context( - "check_optimized_plan".to_string(), - Box::new(e), - ) - })?; + DataFusionError::Context( + format!("check_plan_is_executable after optimizer pass: {}", rule.name()), + Box::new(e), + ) + })?; Ok(tnr) }); @@ -463,12 +475,20 @@ impl Optimizer { i += 1; } + // verify that the optimizer passes only mutated what was permitted. + assert_valid_optimization(&new_plan, &starting_schema).map_err(|e| { + DataFusionError::Context( + "check_optimizer_specific_invariants after all passes".to_string(), + Box::new(e), + ) + })?; + // verify LP is valid, after the last optimizer pass. new_plan .check_invariants(InvariantLevel::Executable) .map_err(|e| { DataFusionError::Context( - "check_plan_after_optimizers".to_string(), + "check_plan_is_executable after optimizers".to_string(), Box::new(e), ) })?; @@ -479,19 +499,17 @@ impl Optimizer { } } -/// These are invariants which should hold true before and after each optimization. +/// These are invariants which should hold true before and after [`LogicalPlan`] optimization. /// /// This differs from [`LogicalPlan::check_invariants`], which addresses if a singular -/// LogicalPlan is valid. Instead this address if the optimization (before and after) -/// is valid based upon permitted changes. +/// LogicalPlan is valid. 
Instead this address if the optimization was valid based upon permitted changes. fn assert_valid_optimization( - rule_name: &str, plan: &LogicalPlan, prev_schema: &Arc, ) -> Result<()> { - // verify invariant: optimizer rule didn't change the schema + // verify invariant: optimizer passes should not change the schema // Refer to - assert_expected_schema(rule_name, prev_schema, plan)?; + assert_expected_schema(prev_schema, plan)?; Ok(()) } @@ -549,8 +567,8 @@ mod tests { let err = opt.optimize(plan, &config, &observe).unwrap_err(); assert_eq!( "Optimizer rule 'get table_scan rule' failed\n\ - caused by\ncheck_optimized_plan\n\ - caused by\nget table_scan rule\n\ + caused by\n\ + check_optimizer_specific_invariants after optimizer pass: get table_scan rule\n\ caused by\n\ Internal error: Failed due to a difference in schemas, \ original schema: DFSchema { inner: Schema { \ From 7ad0b74ab5db38846b3f1a236de10d513bc21234 Mon Sep 17 00:00:00 2001 From: wiedld Date: Mon, 23 Dec 2024 22:58:27 -0800 Subject: [PATCH 12/17] refactor: based upon performance tests, run the maximum number of checks without impa ct: * assert_valid_optimization can run each optimizer pass * remove the recursive cehck_fields, which caused the performance regression * the full LP Invariants::Executable can only run in debug --- datafusion/expr/src/logical_plan/invariants.rs | 8 +------- datafusion/optimizer/src/optimizer.rs | 6 +++--- 2 files changed, 4 insertions(+), 10 deletions(-) diff --git a/datafusion/expr/src/logical_plan/invariants.rs b/datafusion/expr/src/logical_plan/invariants.rs index 1b214582afc9..92e43577ea46 100644 --- a/datafusion/expr/src/logical_plan/invariants.rs +++ b/datafusion/expr/src/logical_plan/invariants.rs @@ -59,13 +59,7 @@ pub fn assert_executable_invariants(plan: &LogicalPlan) -> Result<()> { /// This invariant is subject to change. /// refer: fn assert_unique_field_names(plan: &LogicalPlan) -> Result<()> { - plan.schema().check_names()?; - - plan.apply_with_subqueries(|plan: &LogicalPlan| { - plan.schema().check_names()?; - Ok(TreeNodeRecursion::Continue) - }) - .map(|_| ()) + plan.schema().check_names() } /// Returns an error if the plan is not sematically valid. 
diff --git a/datafusion/optimizer/src/optimizer.rs b/datafusion/optimizer/src/optimizer.rs index bed4c3cb844c..e0b32cfbbbe1 100644 --- a/datafusion/optimizer/src/optimizer.rs +++ b/datafusion/optimizer/src/optimizer.rs @@ -386,7 +386,6 @@ impl Optimizer { .skip_failed_rules .then(|| new_plan.clone()); - #[cfg(debug_assertions)] let starting_schema = Arc::clone(new_plan.schema()); let result = match rule.apply_order() { @@ -398,8 +397,7 @@ impl Optimizer { None => optimize_plan_node(new_plan, rule.as_ref(), config), } .and_then(|tnr| { - // in debug mode, run checks are each optimer pass - #[cfg(debug_assertions)] + // run checks optimizer invariant checks, per pass assert_valid_optimization(&tnr.data, &starting_schema) .map_err(|e| { DataFusionError::Context( @@ -407,6 +405,8 @@ impl Optimizer { Box::new(e), ) })?; + + // run LP invariant checks only in debug #[cfg(debug_assertions)] tnr.data.check_invariants(InvariantLevel::Executable) .map_err(|e| { From 911d4b824cc2a011d1fe39966f06a70f7ad5ae5c Mon Sep 17 00:00:00 2001 From: wiedld Date: Tue, 24 Dec 2024 14:30:00 -0800 Subject: [PATCH 13/17] chore: update error naming and terminology used in code comments --- datafusion/expr/src/logical_plan/invariants.rs | 6 +++--- datafusion/expr/src/logical_plan/plan.rs | 4 ++-- datafusion/optimizer/src/analyzer/mod.rs | 5 ++++- .../src/decorrelate_predicate_subquery.rs | 4 ++-- datafusion/optimizer/src/optimizer.rs | 14 +++++++------- .../optimizer/src/scalar_subquery_to_join.rs | 4 ++-- datafusion/sqllogictest/test_files/subquery.slt | 12 ++++++------ 7 files changed, 26 insertions(+), 23 deletions(-) diff --git a/datafusion/expr/src/logical_plan/invariants.rs b/datafusion/expr/src/logical_plan/invariants.rs index 92e43577ea46..c55d85fa0731 100644 --- a/datafusion/expr/src/logical_plan/invariants.rs +++ b/datafusion/expr/src/logical_plan/invariants.rs @@ -36,12 +36,12 @@ pub enum InvariantLevel { /// such as the type and number of function arguments are correct and /// that wildcards have been expanded /// - /// To ensure a LogicalPlan satisfies the `Executable` inariants, run the + /// To ensure a LogicalPlan satisfies the `Executable` invariants, run the /// `Analyzer` Executable, } -pub fn assert_required_invariants(plan: &LogicalPlan) -> Result<()> { +pub fn assert_always_invariants(plan: &LogicalPlan) -> Result<()> { // Refer to assert_unique_field_names(plan)?; @@ -49,7 +49,7 @@ pub fn assert_required_invariants(plan: &LogicalPlan) -> Result<()> { } pub fn assert_executable_invariants(plan: &LogicalPlan) -> Result<()> { - assert_required_invariants(plan)?; + assert_always_invariants(plan)?; assert_valid_semantic_plan(plan)?; Ok(()) } diff --git a/datafusion/expr/src/logical_plan/plan.rs b/datafusion/expr/src/logical_plan/plan.rs index ddd35bf3f92d..41b6d351749e 100644 --- a/datafusion/expr/src/logical_plan/plan.rs +++ b/datafusion/expr/src/logical_plan/plan.rs @@ -25,7 +25,7 @@ use std::sync::{Arc, LazyLock}; use super::dml::CopyTo; use super::invariants::{ - assert_executable_invariants, assert_required_invariants, InvariantLevel, + assert_always_invariants, assert_executable_invariants, InvariantLevel, }; use super::DdlStatement; use crate::builder::{change_redundant_column, unnest_with_options}; @@ -1133,7 +1133,7 @@ impl LogicalPlan { /// checks that the plan conforms to the listed invariant level, returning an Error if not pub fn check_invariants(&self, check: InvariantLevel) -> Result<()> { match check { - InvariantLevel::Always => assert_required_invariants(self), + 
InvariantLevel::Always => assert_always_invariants(self), InvariantLevel::Executable => assert_executable_invariants(self), } } diff --git a/datafusion/optimizer/src/analyzer/mod.rs b/datafusion/optimizer/src/analyzer/mod.rs index b845b44de617..70554ba47160 100644 --- a/datafusion/optimizer/src/analyzer/mod.rs +++ b/datafusion/optimizer/src/analyzer/mod.rs @@ -183,7 +183,10 @@ impl Analyzer { new_plan .check_invariants(InvariantLevel::Executable) .map_err(|e| { - DataFusionError::Context("check_analyzed_plan".to_string(), Box::new(e)) + DataFusionError::Context( + "Invalid plan after Analyzer".to_string(), + Box::new(e), + ) })?; log_plan("Final analyzed plan", &new_plan); diff --git a/datafusion/optimizer/src/decorrelate_predicate_subquery.rs b/datafusion/optimizer/src/decorrelate_predicate_subquery.rs index 7fdad5ba4b6e..437b9b180cdf 100644 --- a/datafusion/optimizer/src/decorrelate_predicate_subquery.rs +++ b/datafusion/optimizer/src/decorrelate_predicate_subquery.rs @@ -835,7 +835,7 @@ mod tests { .build()?; // Maybe okay if the table only has a single column? - let expected = "check_analyzed_plan\ + let expected = "Invalid plan after Analyzer\ \ncaused by\ \nError during planning: InSubquery should only return one column, but found 4"; assert_analyzer_check_err(vec![], plan, expected); @@ -930,7 +930,7 @@ mod tests { .project(vec![col("customer.c_custkey")])? .build()?; - let expected = "check_analyzed_plan\ + let expected = "Invalid plan after Analyzer\ \ncaused by\ \nError during planning: InSubquery should only return one column"; assert_analyzer_check_err(vec![], plan, expected); diff --git a/datafusion/optimizer/src/optimizer.rs b/datafusion/optimizer/src/optimizer.rs index e0b32cfbbbe1..1fed1868c177 100644 --- a/datafusion/optimizer/src/optimizer.rs +++ b/datafusion/optimizer/src/optimizer.rs @@ -360,7 +360,7 @@ impl Optimizer { plan.check_invariants(InvariantLevel::Executable) .map_err(|e| { DataFusionError::Context( - "check_plan_is_executable before optimizers".to_string(), + "Invalid plan before LP Optimizers".to_string(), Box::new(e), ) })?; @@ -397,21 +397,21 @@ impl Optimizer { None => optimize_plan_node(new_plan, rule.as_ref(), config), } .and_then(|tnr| { - // run checks optimizer invariant checks, per pass + // run checks optimizer invariant checks, per optimizer rule applied assert_valid_optimization(&tnr.data, &starting_schema) .map_err(|e| { DataFusionError::Context( - format!("check_optimizer_specific_invariants after optimizer pass: {}", rule.name()), + format!("check_optimizer_specific_invariants after optimizer rule: {}", rule.name()), Box::new(e), ) })?; - // run LP invariant checks only in debug + // run LP invariant checks only in debug mode for performance reasons #[cfg(debug_assertions)] tnr.data.check_invariants(InvariantLevel::Executable) .map_err(|e| { DataFusionError::Context( - format!("check_plan_is_executable after optimizer pass: {}", rule.name()), + format!("check_plan_is_executable after optimizer rule: {}", rule.name()), Box::new(e), ) })?; @@ -488,7 +488,7 @@ impl Optimizer { .check_invariants(InvariantLevel::Executable) .map_err(|e| { DataFusionError::Context( - "check_plan_is_executable after optimizers".to_string(), + "Invalid plan after LP Optimizers".to_string(), Box::new(e), ) })?; @@ -568,7 +568,7 @@ mod tests { assert_eq!( "Optimizer rule 'get table_scan rule' failed\n\ caused by\n\ - check_optimizer_specific_invariants after optimizer pass: get table_scan rule\n\ + check_optimizer_specific_invariants after optimizer rule: get table_scan 
rule\n\ caused by\n\ Internal error: Failed due to a difference in schemas, \ original schema: DFSchema { inner: Schema { \ diff --git a/datafusion/optimizer/src/scalar_subquery_to_join.rs b/datafusion/optimizer/src/scalar_subquery_to_join.rs index 2e2c8fb1d6f8..fa7068b1c5d3 100644 --- a/datafusion/optimizer/src/scalar_subquery_to_join.rs +++ b/datafusion/optimizer/src/scalar_subquery_to_join.rs @@ -731,7 +731,7 @@ mod tests { .project(vec![col("customer.c_custkey")])? .build()?; - let expected = "check_analyzed_plan\ + let expected = "Invalid plan after Analyzer\ \ncaused by\ \nError during planning: Scalar subquery should only return one column"; assert_analyzer_check_err(vec![], plan, expected); @@ -793,7 +793,7 @@ mod tests { .project(vec![col("customer.c_custkey")])? .build()?; - let expected = "check_analyzed_plan\ + let expected = "Invalid plan after Analyzer\ \ncaused by\ \nError during planning: Scalar subquery should only return one column"; assert_analyzer_check_err(vec![], plan, expected); diff --git a/datafusion/sqllogictest/test_files/subquery.slt b/datafusion/sqllogictest/test_files/subquery.slt index e08389204461..aad0c1e549e7 100644 --- a/datafusion/sqllogictest/test_files/subquery.slt +++ b/datafusion/sqllogictest/test_files/subquery.slt @@ -433,16 +433,16 @@ logical_plan 08)----------TableScan: t1 projection=[t1_int] #invalid_scalar_subquery -statement error DataFusion error: check_analyzed_plan\ncaused by\nError during planning: Scalar subquery should only return one column, but found 2: t2.t2_id, t2.t2_name +statement error DataFusion error: Invalid plan after Analyzer\ncaused by\nError during planning: Scalar subquery should only return one column, but found 2: t2.t2_id, t2.t2_name SELECT t1_id, t1_name, t1_int, (select t2_id, t2_name FROM t2 WHERE t2.t2_id = t1.t1_int) FROM t1 #subquery_not_allowed #In/Exist Subquery is not allowed in ORDER BY clause. 
-statement error DataFusion error: check_analyzed_plan\ncaused by\nError during planning: In/Exist subquery can only be used in Projection, Filter, TableScan, Window functions, Aggregate and Join plan nodes, but was used in \[Sort: t1.t1_int IN \(\) ASC NULLS LAST\] +statement error DataFusion error: Invalid plan after Analyzer\ncaused by\nError during planning: In/Exist subquery can only be used in Projection, Filter, TableScan, Window functions, Aggregate and Join plan nodes, but was used in \[Sort: t1.t1_int IN \(\) ASC NULLS LAST\] SELECT t1_id, t1_name, t1_int FROM t1 order by t1_int in (SELECT t2_int FROM t2 WHERE t1.t1_id > t1.t1_int) #non_aggregated_correlated_scalar_subquery -statement error DataFusion error: check_analyzed_plan\ncaused by\nError during planning: Correlated scalar subquery must be aggregated to return at most one row +statement error DataFusion error: Invalid plan after Analyzer\ncaused by\nError during planning: Correlated scalar subquery must be aggregated to return at most one row SELECT t1_id, (SELECT t2_int FROM t2 WHERE t2.t2_int = t1.t1_int) as t2_int from t1 #non_aggregated_correlated_scalar_subquery_unique @@ -456,11 +456,11 @@ SELECT t1_id, (SELECT t3_int FROM t3 WHERE t3.t3_id = t1.t1_id) as t3_int from t #non_aggregated_correlated_scalar_subquery -statement error DataFusion error: check_analyzed_plan\ncaused by\nError during planning: Correlated scalar subquery must be aggregated to return at most one row +statement error DataFusion error: Invalid plan after Analyzer\ncaused by\nError during planning: Correlated scalar subquery must be aggregated to return at most one row SELECT t1_id, (SELECT t2_int FROM t2 WHERE t2.t2_int = t1_int group by t2_int) as t2_int from t1 #non_aggregated_correlated_scalar_subquery_with_limit -statement error DataFusion error: check_analyzed_plan\ncaused by\nError during planning: Correlated scalar subquery must be aggregated to return at most one row +statement error DataFusion error: Invalid plan after Analyzer\ncaused by\nError during planning: Correlated scalar subquery must be aggregated to return at most one row SELECT t1_id, (SELECT t2_int FROM t2 WHERE t2.t2_int = t1.t1_int limit 2) as t2_int from t1 #non_aggregated_correlated_scalar_subquery_with_single_row @@ -523,7 +523,7 @@ logical_plan 07)--TableScan: t1 projection=[t1_id] #aggregated_correlated_scalar_subquery_with_extra_group_by_columns -statement error DataFusion error: check_analyzed_plan\ncaused by\nError during planning: A GROUP BY clause in a scalar correlated subquery cannot contain non-correlated columns +statement error DataFusion error: Invalid plan after Analyzer\ncaused by\nError during planning: A GROUP BY clause in a scalar correlated subquery cannot contain non-correlated columns SELECT t1_id, (SELECT sum(t2_int) FROM t2 WHERE t2.t2_id = t1.t1_id group by t2_name) as t2_sum from t1 #support_agg_correlated_columns From 810246d212499c7639d276fc2e5700781250f0c6 Mon Sep 17 00:00:00 2001 From: wiedld Date: Tue, 24 Dec 2024 14:52:35 -0800 Subject: [PATCH 14/17] refactor: use proper error methods --- .../expr/src/logical_plan/invariants.rs | 8 ++--- datafusion/optimizer/src/analyzer/mod.rs | 23 ++++--------- datafusion/optimizer/src/optimizer.rs | 33 +++---------------- 3 files changed, 16 insertions(+), 48 deletions(-) diff --git a/datafusion/expr/src/logical_plan/invariants.rs b/datafusion/expr/src/logical_plan/invariants.rs index c55d85fa0731..bb721c4b06ef 100644 --- a/datafusion/expr/src/logical_plan/invariants.rs +++ 
b/datafusion/expr/src/logical_plan/invariants.rs @@ -16,9 +16,9 @@ // under the License. use datafusion_common::{ - plan_err, + internal_err, plan_err, tree_node::{TreeNode, TreeNodeRecursion}, - DFSchemaRef, DataFusionError, Result, + DFSchemaRef, Result, }; use crate::{ @@ -75,11 +75,11 @@ pub fn assert_expected_schema(schema: &DFSchemaRef, plan: &LogicalPlan) -> Resul let equivalent = plan.schema().equivalent_names_and_types(schema); if !equivalent { - Err(DataFusionError::Internal(format!( + internal_err!( "Failed due to a difference in schemas, original schema: {:?}, new schema: {:?}", schema, plan.schema() - ))) + ) } else { Ok(()) } diff --git a/datafusion/optimizer/src/analyzer/mod.rs b/datafusion/optimizer/src/analyzer/mod.rs index 70554ba47160..813351bf3a98 100644 --- a/datafusion/optimizer/src/analyzer/mod.rs +++ b/datafusion/optimizer/src/analyzer/mod.rs @@ -24,7 +24,7 @@ use log::debug; use datafusion_common::config::ConfigOptions; use datafusion_common::instant::Instant; -use datafusion_common::{DataFusionError, Result}; +use datafusion_common::Result; use datafusion_expr::expr_rewriter::FunctionRewrite; use datafusion_expr::{InvariantLevel, LogicalPlan}; @@ -144,12 +144,8 @@ impl Analyzer { F: FnMut(&LogicalPlan, &dyn AnalyzerRule), { // verify the logical plan required invariants at the start, before analyzer - plan.check_invariants(InvariantLevel::Always).map_err(|e| { - DataFusionError::Context( - "assert_lp_invariants_before_analyzers".to_string(), - Box::new(e), - ) - })?; + plan.check_invariants(InvariantLevel::Always) + .map_err(|e| e.context("assert_lp_invariants_before_analyzers"))?; let start_time = Instant::now(); let mut new_plan = plan; @@ -172,9 +168,9 @@ impl Analyzer { // TODO add common rule executor for Analyzer and Optimizer for rule in rules { - new_plan = rule.analyze(new_plan, config).map_err(|e| { - DataFusionError::Context(rule.name().to_string(), Box::new(e)) - })?; + new_plan = rule + .analyze(new_plan, config) + .map_err(|e| e.context(rule.name()))?; log_plan(rule.name(), &new_plan); observer(&new_plan, rule.as_ref()); } @@ -182,12 +178,7 @@ impl Analyzer { // verify at the end, after the last LP analyzer pass, that the plan is executable. new_plan .check_invariants(InvariantLevel::Executable) - .map_err(|e| { - DataFusionError::Context( - "Invalid plan after Analyzer".to_string(), - Box::new(e), - ) - })?; + .map_err(|e| e.context("Invalid plan after Analyzer"))?; log_plan("Final analyzed plan", &new_plan); debug!("Analyzer took {} ms", start_time.elapsed().as_millis()); diff --git a/datafusion/optimizer/src/optimizer.rs b/datafusion/optimizer/src/optimizer.rs index 1fed1868c177..3b4f434438cd 100644 --- a/datafusion/optimizer/src/optimizer.rs +++ b/datafusion/optimizer/src/optimizer.rs @@ -358,12 +358,7 @@ impl Optimizer { { // verify LP is valid, before the first LP optimizer pass. 
plan.check_invariants(InvariantLevel::Executable) - .map_err(|e| { - DataFusionError::Context( - "Invalid plan before LP Optimizers".to_string(), - Box::new(e), - ) - })?; + .map_err(|e| e.context("Invalid plan before LP Optimizers"))?; let start_time = Instant::now(); let options = config.options(); @@ -399,22 +394,12 @@ impl Optimizer { .and_then(|tnr| { // run checks optimizer invariant checks, per optimizer rule applied assert_valid_optimization(&tnr.data, &starting_schema) - .map_err(|e| { - DataFusionError::Context( - format!("check_optimizer_specific_invariants after optimizer rule: {}", rule.name()), - Box::new(e), - ) - })?; + .map_err(|e| e.context(format!("check_optimizer_specific_invariants after optimizer rule: {}", rule.name())))?; // run LP invariant checks only in debug mode for performance reasons #[cfg(debug_assertions)] tnr.data.check_invariants(InvariantLevel::Executable) - .map_err(|e| { - DataFusionError::Context( - format!("check_plan_is_executable after optimizer rule: {}", rule.name()), - Box::new(e), - ) - })?; + .map_err(|e| e.context(format!("check_plan_is_executable after optimizer rule: {}", rule.name())))?; Ok(tnr) }); @@ -477,21 +462,13 @@ impl Optimizer { // verify that the optimizer passes only mutated what was permitted. assert_valid_optimization(&new_plan, &starting_schema).map_err(|e| { - DataFusionError::Context( - "check_optimizer_specific_invariants after all passes".to_string(), - Box::new(e), - ) + e.context("check_optimizer_specific_invariants after all passes") })?; // verify LP is valid, after the last optimizer pass. new_plan .check_invariants(InvariantLevel::Executable) - .map_err(|e| { - DataFusionError::Context( - "Invalid plan after LP Optimizers".to_string(), - Box::new(e), - ) - })?; + .map_err(|e| e.context("Invalid plan after LP Optimizers"))?; log_plan("Final optimized plan", &new_plan); debug!("Optimizer took {} ms", start_time.elapsed().as_millis()); From 9842d19b4baf36eca7d9bbedce45cb9a2f69ac6b Mon Sep 17 00:00:00 2001 From: wiedld Date: Tue, 24 Dec 2024 15:23:04 -0800 Subject: [PATCH 15/17] chore: more cleanup of error messages --- datafusion/optimizer/src/analyzer/mod.rs | 4 +- .../src/decorrelate_predicate_subquery.rs | 4 +- datafusion/optimizer/src/optimizer.rs | 14 ++++--- .../optimizer/src/scalar_subquery_to_join.rs | 4 +- .../sqllogictest/test_files/subquery.slt | 42 ++++++++++++++++--- 5 files changed, 50 insertions(+), 18 deletions(-) diff --git a/datafusion/optimizer/src/analyzer/mod.rs b/datafusion/optimizer/src/analyzer/mod.rs index 813351bf3a98..9d0ac6b54cf4 100644 --- a/datafusion/optimizer/src/analyzer/mod.rs +++ b/datafusion/optimizer/src/analyzer/mod.rs @@ -145,7 +145,7 @@ impl Analyzer { { // verify the logical plan required invariants at the start, before analyzer plan.check_invariants(InvariantLevel::Always) - .map_err(|e| e.context("assert_lp_invariants_before_analyzers"))?; + .map_err(|e| e.context("Invalid input plan passed to Analyzer"))?; let start_time = Instant::now(); let mut new_plan = plan; @@ -178,7 +178,7 @@ impl Analyzer { // verify at the end, after the last LP analyzer pass, that the plan is executable. 
new_plan .check_invariants(InvariantLevel::Executable) - .map_err(|e| e.context("Invalid plan after Analyzer"))?; + .map_err(|e| e.context("Invalid (non-executable) plan after Analyzer"))?; log_plan("Final analyzed plan", &new_plan); debug!("Analyzer took {} ms", start_time.elapsed().as_millis()); diff --git a/datafusion/optimizer/src/decorrelate_predicate_subquery.rs b/datafusion/optimizer/src/decorrelate_predicate_subquery.rs index 437b9b180cdf..500154090cc9 100644 --- a/datafusion/optimizer/src/decorrelate_predicate_subquery.rs +++ b/datafusion/optimizer/src/decorrelate_predicate_subquery.rs @@ -835,7 +835,7 @@ mod tests { .build()?; // Maybe okay if the table only has a single column? - let expected = "Invalid plan after Analyzer\ + let expected = "Invalid (non-executable) plan after Analyzer\ \ncaused by\ \nError during planning: InSubquery should only return one column, but found 4"; assert_analyzer_check_err(vec![], plan, expected); @@ -930,7 +930,7 @@ mod tests { .project(vec![col("customer.c_custkey")])? .build()?; - let expected = "Invalid plan after Analyzer\ + let expected = "Invalid (non-executable) plan after Analyzer\ \ncaused by\ \nError during planning: InSubquery should only return one column"; assert_analyzer_check_err(vec![], plan, expected); diff --git a/datafusion/optimizer/src/optimizer.rs b/datafusion/optimizer/src/optimizer.rs index 3b4f434438cd..f319191574cb 100644 --- a/datafusion/optimizer/src/optimizer.rs +++ b/datafusion/optimizer/src/optimizer.rs @@ -358,7 +358,7 @@ impl Optimizer { { // verify LP is valid, before the first LP optimizer pass. plan.check_invariants(InvariantLevel::Executable) - .map_err(|e| e.context("Invalid plan before LP Optimizers"))?; + .map_err(|e| e.context("Invalid input plan before LP Optimizers"))?; let start_time = Instant::now(); let options = config.options(); @@ -394,12 +394,12 @@ impl Optimizer { .and_then(|tnr| { // run checks optimizer invariant checks, per optimizer rule applied assert_valid_optimization(&tnr.data, &starting_schema) - .map_err(|e| e.context(format!("check_optimizer_specific_invariants after optimizer rule: {}", rule.name())))?; + .map_err(|e| e.context(format!("Check optimizer-specific invariants after optimizer rule: {}", rule.name())))?; // run LP invariant checks only in debug mode for performance reasons #[cfg(debug_assertions)] tnr.data.check_invariants(InvariantLevel::Executable) - .map_err(|e| e.context(format!("check_plan_is_executable after optimizer rule: {}", rule.name())))?; + .map_err(|e| e.context(format!("Invalid (non-executable) plan after Optimizer rule: {}", rule.name())))?; Ok(tnr) }); @@ -462,13 +462,15 @@ impl Optimizer { // verify that the optimizer passes only mutated what was permitted. assert_valid_optimization(&new_plan, &starting_schema).map_err(|e| { - e.context("check_optimizer_specific_invariants after all passes") + e.context("Check optimizer-specific invariants after all passes") })?; // verify LP is valid, after the last optimizer pass. 
new_plan .check_invariants(InvariantLevel::Executable) - .map_err(|e| e.context("Invalid plan after LP Optimizers"))?; + .map_err(|e| { + e.context("Invalid (non-executable) plan after LP Optimizers") + })?; log_plan("Final optimized plan", &new_plan); debug!("Optimizer took {} ms", start_time.elapsed().as_millis()); @@ -545,7 +547,7 @@ mod tests { assert_eq!( "Optimizer rule 'get table_scan rule' failed\n\ caused by\n\ - check_optimizer_specific_invariants after optimizer rule: get table_scan rule\n\ + Check optimizer-specific invariants after optimizer rule: get table_scan rule\n\ caused by\n\ Internal error: Failed due to a difference in schemas, \ original schema: DFSchema { inner: Schema { \ diff --git a/datafusion/optimizer/src/scalar_subquery_to_join.rs b/datafusion/optimizer/src/scalar_subquery_to_join.rs index fa7068b1c5d3..d656b7772951 100644 --- a/datafusion/optimizer/src/scalar_subquery_to_join.rs +++ b/datafusion/optimizer/src/scalar_subquery_to_join.rs @@ -731,7 +731,7 @@ mod tests { .project(vec![col("customer.c_custkey")])? .build()?; - let expected = "Invalid plan after Analyzer\ + let expected = "Invalid (non-executable) plan after Analyzer\ \ncaused by\ \nError during planning: Scalar subquery should only return one column"; assert_analyzer_check_err(vec![], plan, expected); @@ -793,7 +793,7 @@ mod tests { .project(vec![col("customer.c_custkey")])? .build()?; - let expected = "Invalid plan after Analyzer\ + let expected = "Invalid (non-executable) plan after Analyzer\ \ncaused by\ \nError during planning: Scalar subquery should only return one column"; assert_analyzer_check_err(vec![], plan, expected); diff --git a/datafusion/sqllogictest/test_files/subquery.slt b/datafusion/sqllogictest/test_files/subquery.slt index aad0c1e549e7..e91ab55e58eb 100644 --- a/datafusion/sqllogictest/test_files/subquery.slt +++ b/datafusion/sqllogictest/test_files/subquery.slt @@ -433,17 +433,32 @@ logical_plan 08)----------TableScan: t1 projection=[t1_int] #invalid_scalar_subquery -statement error DataFusion error: Invalid plan after Analyzer\ncaused by\nError during planning: Scalar subquery should only return one column, but found 2: t2.t2_id, t2.t2_name +statement error SELECT t1_id, t1_name, t1_int, (select t2_id, t2_name FROM t2 WHERE t2.t2_id = t1.t1_int) FROM t1 +---- +DataFusion error: Invalid (non-executable) plan after Analyzer +caused by +Error during planning: Scalar subquery should only return one column, but found 2: t2.t2_id, t2.t2_name + #subquery_not_allowed #In/Exist Subquery is not allowed in ORDER BY clause. 
-statement error DataFusion error: Invalid plan after Analyzer\ncaused by\nError during planning: In/Exist subquery can only be used in Projection, Filter, TableScan, Window functions, Aggregate and Join plan nodes, but was used in \[Sort: t1.t1_int IN \(\) ASC NULLS LAST\] +statement error SELECT t1_id, t1_name, t1_int FROM t1 order by t1_int in (SELECT t2_int FROM t2 WHERE t1.t1_id > t1.t1_int) +---- +DataFusion error: Invalid (non-executable) plan after Analyzer +caused by +Error during planning: In/Exist subquery can only be used in Projection, Filter, TableScan, Window functions, Aggregate and Join plan nodes, but was used in [Sort: t1.t1_int IN () ASC NULLS LAST] + #non_aggregated_correlated_scalar_subquery -statement error DataFusion error: Invalid plan after Analyzer\ncaused by\nError during planning: Correlated scalar subquery must be aggregated to return at most one row +statement error SELECT t1_id, (SELECT t2_int FROM t2 WHERE t2.t2_int = t1.t1_int) as t2_int from t1 +---- +DataFusion error: Invalid (non-executable) plan after Analyzer +caused by +Error during planning: Correlated scalar subquery must be aggregated to return at most one row + #non_aggregated_correlated_scalar_subquery_unique query II rowsort @@ -456,12 +471,22 @@ SELECT t1_id, (SELECT t3_int FROM t3 WHERE t3.t3_id = t1.t1_id) as t3_int from t #non_aggregated_correlated_scalar_subquery -statement error DataFusion error: Invalid plan after Analyzer\ncaused by\nError during planning: Correlated scalar subquery must be aggregated to return at most one row +statement error SELECT t1_id, (SELECT t2_int FROM t2 WHERE t2.t2_int = t1_int group by t2_int) as t2_int from t1 +---- +DataFusion error: Invalid (non-executable) plan after Analyzer +caused by +Error during planning: Correlated scalar subquery must be aggregated to return at most one row + #non_aggregated_correlated_scalar_subquery_with_limit -statement error DataFusion error: Invalid plan after Analyzer\ncaused by\nError during planning: Correlated scalar subquery must be aggregated to return at most one row +statement error SELECT t1_id, (SELECT t2_int FROM t2 WHERE t2.t2_int = t1.t1_int limit 2) as t2_int from t1 +---- +DataFusion error: Invalid (non-executable) plan after Analyzer +caused by +Error during planning: Correlated scalar subquery must be aggregated to return at most one row + #non_aggregated_correlated_scalar_subquery_with_single_row query TT @@ -523,8 +548,13 @@ logical_plan 07)--TableScan: t1 projection=[t1_id] #aggregated_correlated_scalar_subquery_with_extra_group_by_columns -statement error DataFusion error: Invalid plan after Analyzer\ncaused by\nError during planning: A GROUP BY clause in a scalar correlated subquery cannot contain non-correlated columns +statement error SELECT t1_id, (SELECT sum(t2_int) FROM t2 WHERE t2.t2_id = t1.t1_id group by t2_name) as t2_sum from t1 +---- +DataFusion error: Invalid (non-executable) plan after Analyzer +caused by +Error during planning: A GROUP BY clause in a scalar correlated subquery cannot contain non-correlated columns + #support_agg_correlated_columns query TT From 9bca470fb2c66cec0577b683c69565a8b45a39ff Mon Sep 17 00:00:00 2001 From: wiedld Date: Tue, 24 Dec 2024 16:23:28 -0800 Subject: [PATCH 16/17] chore: handle option trailer to error message --- datafusion/optimizer/src/optimizer.rs | 8 +++----- 1 file changed, 3 insertions(+), 5 deletions(-) diff --git a/datafusion/optimizer/src/optimizer.rs b/datafusion/optimizer/src/optimizer.rs index f319191574cb..49bce3c1ce82 100644 --- 
a/datafusion/optimizer/src/optimizer.rs +++ b/datafusion/optimizer/src/optimizer.rs @@ -544,7 +544,7 @@ mod tests { schema: Arc::new(DFSchema::empty()), }); let err = opt.optimize(plan, &config, &observe).unwrap_err(); - assert_eq!( + assert!(err.strip_backtrace().starts_with( "Optimizer rule 'get table_scan rule' failed\n\ caused by\n\ Check optimizer-specific invariants after optimizer rule: get table_scan rule\n\ @@ -564,10 +564,8 @@ mod tests { ], \ metadata: {} }, \ field_qualifiers: [Some(Bare { table: \"test\" }), Some(Bare { table: \"test\" }), Some(Bare { table: \"test\" })], \ - functional_dependencies: FunctionalDependencies { deps: [] } }.\n\ - This was likely caused by a bug in DataFusion's code and we would welcome that you file an bug report in our issue tracker", - err.strip_backtrace() - ); + functional_dependencies: FunctionalDependencies { deps: [] } }", + )); } #[test] From 529ac3ecac7baa1cd2314ca9089de5fda93a9181 Mon Sep 17 00:00:00 2001 From: wiedld Date: Thu, 26 Dec 2024 11:14:09 -0800 Subject: [PATCH 17/17] test: update sqllogictests tests to not use multiline --- .../sqllogictest/test_files/subquery.slt | 42 +++---------------- 1 file changed, 6 insertions(+), 36 deletions(-) diff --git a/datafusion/sqllogictest/test_files/subquery.slt b/datafusion/sqllogictest/test_files/subquery.slt index e91ab55e58eb..25fe4c7b0390 100644 --- a/datafusion/sqllogictest/test_files/subquery.slt +++ b/datafusion/sqllogictest/test_files/subquery.slt @@ -433,32 +433,17 @@ logical_plan 08)----------TableScan: t1 projection=[t1_int] #invalid_scalar_subquery -statement error +statement error DataFusion error: Invalid \(non-executable\) plan after Analyzer\ncaused by\nError during planning: Scalar subquery should only return one column, but found 2: t2.t2_id, t2.t2_name SELECT t1_id, t1_name, t1_int, (select t2_id, t2_name FROM t2 WHERE t2.t2_id = t1.t1_int) FROM t1 ----- -DataFusion error: Invalid (non-executable) plan after Analyzer -caused by -Error during planning: Scalar subquery should only return one column, but found 2: t2.t2_id, t2.t2_name - #subquery_not_allowed #In/Exist Subquery is not allowed in ORDER BY clause. 
-statement error +statement error DataFusion error: Invalid \(non-executable\) plan after Analyzer\ncaused by\nError during planning: In/Exist subquery can only be used in Projection, Filter, TableScan, Window functions, Aggregate and Join plan nodes, but was used in \[Sort: t1.t1_int IN \(\) ASC NULLS LAST\] SELECT t1_id, t1_name, t1_int FROM t1 order by t1_int in (SELECT t2_int FROM t2 WHERE t1.t1_id > t1.t1_int) ----- -DataFusion error: Invalid (non-executable) plan after Analyzer -caused by -Error during planning: In/Exist subquery can only be used in Projection, Filter, TableScan, Window functions, Aggregate and Join plan nodes, but was used in [Sort: t1.t1_int IN () ASC NULLS LAST] - #non_aggregated_correlated_scalar_subquery -statement error +statement error DataFusion error: Invalid \(non-executable\) plan after Analyzer\ncaused by\nError during planning: Correlated scalar subquery must be aggregated to return at most one row SELECT t1_id, (SELECT t2_int FROM t2 WHERE t2.t2_int = t1.t1_int) as t2_int from t1 ----- -DataFusion error: Invalid (non-executable) plan after Analyzer -caused by -Error during planning: Correlated scalar subquery must be aggregated to return at most one row - #non_aggregated_correlated_scalar_subquery_unique query II rowsort @@ -471,22 +456,12 @@ SELECT t1_id, (SELECT t3_int FROM t3 WHERE t3.t3_id = t1.t1_id) as t3_int from t #non_aggregated_correlated_scalar_subquery -statement error +statement error DataFusion error: Invalid \(non-executable\) plan after Analyzer\ncaused by\nError during planning: Correlated scalar subquery must be aggregated to return at most one row SELECT t1_id, (SELECT t2_int FROM t2 WHERE t2.t2_int = t1_int group by t2_int) as t2_int from t1 ----- -DataFusion error: Invalid (non-executable) plan after Analyzer -caused by -Error during planning: Correlated scalar subquery must be aggregated to return at most one row - #non_aggregated_correlated_scalar_subquery_with_limit -statement error +statement error DataFusion error: Invalid \(non-executable\) plan after Analyzer\ncaused by\nError during planning: Correlated scalar subquery must be aggregated to return at most one row SELECT t1_id, (SELECT t2_int FROM t2 WHERE t2.t2_int = t1.t1_int limit 2) as t2_int from t1 ----- -DataFusion error: Invalid (non-executable) plan after Analyzer -caused by -Error during planning: Correlated scalar subquery must be aggregated to return at most one row - #non_aggregated_correlated_scalar_subquery_with_single_row query TT @@ -548,13 +523,8 @@ logical_plan 07)--TableScan: t1 projection=[t1_id] #aggregated_correlated_scalar_subquery_with_extra_group_by_columns -statement error +statement error DataFusion error: Invalid \(non-executable\) plan after Analyzer\ncaused by\nError during planning: A GROUP BY clause in a scalar correlated subquery cannot contain non-correlated columns SELECT t1_id, (SELECT sum(t2_int) FROM t2 WHERE t2.t2_id = t1.t1_id group by t2_name) as t2_sum from t1 ----- -DataFusion error: Invalid (non-executable) plan after Analyzer -caused by -Error during planning: A GROUP BY clause in a scalar correlated subquery cannot contain non-correlated columns - #support_agg_correlated_columns query TT