diff --git a/datafusion/core/src/execution/context/mod.rs b/datafusion/core/src/execution/context/mod.rs index 31f390607f04..3b77a1e9faa8 100644 --- a/datafusion/core/src/execution/context/mod.rs +++ b/datafusion/core/src/execution/context/mod.rs @@ -84,6 +84,7 @@ use async_trait::async_trait; use chrono::{DateTime, Utc}; use parking_lot::RwLock; use sqlparser::dialect::dialect_from_str; +use tokio::task::JoinSet; use url::Url; use uuid::Uuid; @@ -1840,13 +1841,90 @@ impl SessionState { /// Optimizes the logical plan by applying optimizer rules. pub fn optimize(&self, plan: &LogicalPlan) -> Result { + struct Cfg { + query_execution_start_time: DateTime, + alias_generator: Arc, + options: ConfigOptions, + } + + impl OptimizerConfig for Cfg { + fn query_execution_start_time(&self) -> DateTime { + self.query_execution_start_time + } + + fn alias_generator(&self) -> Arc { + Arc::clone(&self.alias_generator) + } + + fn options(&self) -> &ConfigOptions { + &self.options + } + } + + let optimizer_cfg = Cfg { + query_execution_start_time: self.query_execution_start_time(), + alias_generator: self.alias_generator(), + options: self.options().clone(), + }; + Self::optimize_blocking(&self.analyzer, &self.optimizer, &optimizer_cfg, plan) + } + + /// Optimizes the logical plan by applying optimizer rules. 
+    ///
+    /// Runs the analyzer and optimizer on tokio's blocking thread pool
+    /// (`spawn_blocking`) instead of on the thread that polls this future, and
+    /// awaits the result.
+    ///
+    /// # Errors
+    ///
+    /// Returns whatever error `optimize_blocking` produces, or
+    /// `DataFusionError::Internal` if the blocking task ended without a result
+    /// for a reason other than a panic (i.e. it was cancelled).
+    ///
+    /// # Panics
+    ///
+    /// A panic raised inside the blocking task is re-raised on the caller, the
+    /// same way it would surface if the optimizer ran inline.
+    async fn optimize_async(&self, plan: &LogicalPlan) -> Result<LogicalPlan> {
+        // Owned snapshot of the optimizer-relevant session settings. The closure
+        // handed to `spawn_blocking` must own everything it captures, so it
+        // cannot borrow these values from `self`.
+        struct Cfg {
+            query_execution_start_time: DateTime<Utc>,
+            alias_generator: Arc<AliasGenerator>,
+            options: ConfigOptions,
+        }
+
+        impl OptimizerConfig for Cfg {
+            fn query_execution_start_time(&self) -> DateTime<Utc> {
+                self.query_execution_start_time
+            }
+
+            fn alias_generator(&self) -> Arc<AliasGenerator> {
+                Arc::clone(&self.alias_generator)
+            }
+
+            fn options(&self) -> &ConfigOptions {
+                &self.options
+            }
+        }
+
+        // Clone everything the blocking task needs before moving it across.
+        let analyzer = self.analyzer.clone();
+        let optimizer = self.optimizer.clone();
+        let optimizer_cfg = Cfg {
+            query_execution_start_time: self.query_execution_start_time(),
+            alias_generator: self.alias_generator(),
+            options: self.options().clone(),
+        };
+        let plan = plan.clone();
+
+        let mut tasks = JoinSet::new();
+        tasks.spawn_blocking(move || {
+            Self::optimize_blocking(&analyzer, &optimizer, &optimizer_cfg, &plan)
+        });
+
+        // Exactly one task was spawned above, so `join_next` cannot yield `None`.
+        match tasks.join_next().await.expect("just launched task") {
+            Ok(optimized) => optimized,
+            Err(join_err) => match join_err.try_into_panic() {
+                // Keep panic semantics identical to running the optimizer inline:
+                // hand the original payload back to the caller instead of
+                // flattening it into an error string.
+                Ok(payload) => std::panic::resume_unwind(payload),
+                Err(join_err) => Err(DataFusionError::Internal(join_err.to_string())),
+            },
+        }
+    }
+
+    /// Optimizes the logical plan by applying optimizer rules.
+ fn optimize_blocking( + analyzer: &Analyzer, + optimizer: &Optimizer, + optimizer_cfg: &dyn OptimizerConfig, + plan: &LogicalPlan, + ) -> Result { if let LogicalPlan::Explain(e) = plan { let mut stringified_plans = e.stringified_plans.clone(); // analyze & capture output of each rule - let analyzer_result = self.analyzer.execute_and_check( + let analyzer_result = analyzer.execute_and_check( e.plan.as_ref(), - self.options(), + optimizer_cfg.options(), |analyzed_plan, analyzer| { let analyzer_name = analyzer.name().to_string(); let plan_type = PlanType::AnalyzedLogicalPlan { analyzer_name }; @@ -1876,9 +1954,9 @@ impl SessionState { .push(analyzed_plan.to_stringified(PlanType::FinalAnalyzedLogicalPlan)); // optimize the child plan, capturing the output of each optimizer - let optimized_plan = self.optimizer.optimize( + let optimized_plan = optimizer.optimize( &analyzed_plan, - self, + optimizer_cfg, |optimized_plan, optimizer| { let optimizer_name = optimizer.name().to_string(); let plan_type = PlanType::OptimizedLogicalPlan { optimizer_name }; @@ -1905,9 +1983,8 @@ impl SessionState { })) } else { let analyzed_plan = - self.analyzer - .execute_and_check(plan, self.options(), |_, _| {})?; - self.optimizer.optimize(&analyzed_plan, self, |_, _| {}) + analyzer.execute_and_check(plan, optimizer_cfg.options(), |_, _| {})?; + optimizer.optimize(&analyzed_plan, optimizer_cfg, |_, _| {}) } } @@ -1922,7 +1999,7 @@ impl SessionState { &self, logical_plan: &LogicalPlan, ) -> Result> { - let logical_plan = self.optimize(logical_plan)?; + let logical_plan = self.optimize_async(logical_plan).await?; self.query_planner .create_physical_plan(&logical_plan, self) .await