Skip to content

Commit

Permalink
fix: do not block tokio during logical optimization
Browse files Browse the repository at this point in the history
  • Loading branch information
crepererum committed Apr 18, 2024
1 parent 4fed5a0 commit 920a6f7
Showing 1 changed file with 85 additions and 8 deletions.
93 changes: 85 additions & 8 deletions datafusion/core/src/execution/context/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,7 @@ use async_trait::async_trait;
use chrono::{DateTime, Utc};
use parking_lot::RwLock;
use sqlparser::dialect::dialect_from_str;
use tokio::task::JoinSet;
use url::Url;
use uuid::Uuid;

Expand Down Expand Up @@ -1840,13 +1841,90 @@ impl SessionState {

/// Optimizes the logical plan by applying optimizer rules.
///
/// Runs synchronously on the calling thread: the plan is handed to
/// `optimize_blocking` together with this state's analyzer and optimizer.
///
/// The session state itself is used as the [`OptimizerConfig`]. Because
/// nothing leaves the current thread, the state can simply be borrowed for
/// the duration of the call, which avoids cloning the [`ConfigOptions`] into
/// a temporary configuration object on every invocation.
///
/// # Errors
///
/// Returns whatever error `optimize_blocking` reports.
pub fn optimize(&self, plan: &LogicalPlan) -> Result<LogicalPlan> {
    // `&SessionState` coerces to `&dyn OptimizerConfig`, so no owned snapshot
    // of the configuration is required on the synchronous path.
    Self::optimize_blocking(&self.analyzer, &self.optimizer, self, plan)
}

/// Optimizes the logical plan by applying optimizer rules.
async fn optimize_async(&self, plan: &LogicalPlan) -> Result<LogicalPlan> {
struct Cfg {
query_execution_start_time: DateTime<Utc>,
alias_generator: Arc<AliasGenerator>,
options: ConfigOptions,
}

impl OptimizerConfig for Cfg {
fn query_execution_start_time(&self) -> DateTime<Utc> {
self.query_execution_start_time
}

fn alias_generator(&self) -> Arc<AliasGenerator> {
Arc::clone(&self.alias_generator)
}

fn options(&self) -> &ConfigOptions {
&self.options
}
}

let analyzer = self.analyzer.clone();
let optimizer = self.optimizer.clone();
let optimizer_cfg = Cfg {
query_execution_start_time: self.query_execution_start_time(),
alias_generator: self.alias_generator(),
options: self.options().clone(),
};
let plan = plan.clone();

let mut tasks = JoinSet::new();
tasks.spawn_blocking(move || {
Self::optimize_blocking(&analyzer, &optimizer, &optimizer_cfg, &plan)
});
tasks
.join_next()
.await
.expect("just launched task")
.map_err(|e| DataFusionError::Internal(e.to_string()))?
}

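/// Optimizes the logical plan by applying optimizer rules.
///
/// Synchronous worker shared by the public and async entry points: it runs
/// the given `analyzer` and then the given `optimizer` over `plan`, reading
/// all settings from `optimizer_cfg`. For `EXPLAIN` plans the output of each
/// rule is captured along the way.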
fn optimize_blocking(
analyzer: &Analyzer,
optimizer: &Optimizer,
optimizer_cfg: &dyn OptimizerConfig,
plan: &LogicalPlan,
) -> Result<LogicalPlan> {
if let LogicalPlan::Explain(e) = plan {
let mut stringified_plans = e.stringified_plans.clone();

// analyze & capture output of each rule
let analyzer_result = self.analyzer.execute_and_check(
let analyzer_result = analyzer.execute_and_check(
e.plan.as_ref(),
self.options(),
optimizer_cfg.options(),
|analyzed_plan, analyzer| {
let analyzer_name = analyzer.name().to_string();
let plan_type = PlanType::AnalyzedLogicalPlan { analyzer_name };
Expand Down Expand Up @@ -1876,9 +1954,9 @@ impl SessionState {
.push(analyzed_plan.to_stringified(PlanType::FinalAnalyzedLogicalPlan));

// optimize the child plan, capturing the output of each optimizer
let optimized_plan = self.optimizer.optimize(
let optimized_plan = optimizer.optimize(
&analyzed_plan,
self,
optimizer_cfg,
|optimized_plan, optimizer| {
let optimizer_name = optimizer.name().to_string();
let plan_type = PlanType::OptimizedLogicalPlan { optimizer_name };
Expand All @@ -1905,9 +1983,8 @@ impl SessionState {
}))
} else {
let analyzed_plan =
self.analyzer
.execute_and_check(plan, self.options(), |_, _| {})?;
self.optimizer.optimize(&analyzed_plan, self, |_, _| {})
analyzer.execute_and_check(plan, optimizer_cfg.options(), |_, _| {})?;
optimizer.optimize(&analyzed_plan, optimizer_cfg, |_, _| {})
}
}

Expand All @@ -1922,7 +1999,7 @@ impl SessionState {
&self,
logical_plan: &LogicalPlan,
) -> Result<Arc<dyn ExecutionPlan>> {
let logical_plan = self.optimize(logical_plan)?;
let logical_plan = self.optimize_async(logical_plan).await?;
self.query_planner
.create_physical_plan(&logical_plan, self)
.await
Expand Down

0 comments on commit 920a6f7

Please sign in to comment.