Skip to content

Commit

Permalink
fix: do not block tokio runtime during optimization
Browse files Browse the repository at this point in the history
  • Loading branch information
crepererum committed Mar 19, 2024
1 parent 5965d67 commit 863e90a
Showing 1 changed file with 22 additions and 10 deletions.
32 changes: 22 additions & 10 deletions datafusion/core/src/physical_planner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,7 @@ use futures::{FutureExt, StreamExt, TryStreamExt};
use itertools::{multiunzip, Itertools};
use log::{debug, trace};
use sqlparser::ast::NullTreatment;
use tokio::task::JoinSet;

fn create_function_physical_name(
fun: &str,
Expand Down Expand Up @@ -465,7 +466,7 @@ impl PhysicalPlanner for DefaultPhysicalPlanner {
let plan = self
.create_initial_plan(logical_plan, session_state)
.await?;
self.optimize_internal(plan, session_state, |_, _| {})
self.optimize_internal(plan, session_state, |_, _| {}).await
}
}
}
Expand Down Expand Up @@ -1818,19 +1819,17 @@ impl DefaultPhysicalPlanner {
);
}

let optimized_plan = self.optimize_internal(
input,
session_state,
|plan, optimizer| {
let optimized_plan = self
.optimize_internal(input, session_state, |plan, optimizer| {
let optimizer_name = optimizer.name().to_string();
let plan_type = OptimizedPhysicalPlan { optimizer_name };
stringified_plans.push(
displayable(plan)
.set_show_statistics(config.show_statistics)
.to_stringified(e.verbose, plan_type),
);
},
);
})
.await;
match optimized_plan {
Ok(input) => {
// This plan will includes statistics if show_statistics is on
Expand Down Expand Up @@ -1887,7 +1886,7 @@ impl DefaultPhysicalPlanner {

/// Optimize a physical plan by applying each physical optimizer,
/// calling observer(plan, optimizer after each one)
fn optimize_internal<F>(
async fn optimize_internal<F>(
&self,
plan: Arc<dyn ExecutionPlan>,
session_state: &SessionState,
Expand All @@ -1906,14 +1905,27 @@ impl DefaultPhysicalPlanner {
displayable(plan.as_ref()).indent(true)
);

let config_options = Arc::new(session_state.config_options().clone());
let mut tasks = JoinSet::new();

let mut new_plan = plan;
for optimizer in optimizers {
let before_schema = new_plan.schema();
new_plan = optimizer
.optimize(new_plan, session_state.config_options())

let optimizer_captured = Arc::clone(optimizer);
let config_options_captured = Arc::clone(&config_options);
tasks.spawn_blocking(move || {
optimizer_captured.optimize(new_plan, &config_options_captured)
});
new_plan = tasks
.join_next()
.await
.expect("just added task")
.map_err(|e| DataFusionError::Internal(e.to_string()))?
.map_err(|e| {
DataFusionError::Context(optimizer.name().to_string(), Box::new(e))
})?;

if optimizer.schema_check() && new_plan.schema() != before_schema {
let e = DataFusionError::Internal(format!(
"PhysicalOptimizer rule '{}' failed, due to generate a different schema, original schema: {:?}, new schema: {:?}",
Expand Down

0 comments on commit 863e90a

Please sign in to comment.