
Prepared physical plan reusage #14342

Open · askalt opened this issue Jan 28, 2025 · 4 comments

askalt (Contributor) commented Jan 28, 2025

Problem

While implementing support for saving prepared statements in our DataFusion-based storage, we encountered the following issue:

It is inefficient to save the logical plan and rebuild the physical plan from it at execution time.
The physical planner and optimizer are quite heavy, and for large queries with many columns, building the physical plan can take 100-200 ms.

After reading the optimizer's code, I realized that it contains a lot of complex logic that is hard to speed up. Moreover, the time required for plan construction is effectively unbounded, since the optimizations can be arbitrarily complex in order to produce a good plan.

The current API does not allow reusing physical plans for multiple executions due to the following reasons:

  1. Metrics. Since metrics are stored within the plans, streams are forced to share a pointer to the same metrics during execution. If physical plans are reused, streams will compete for writes to the metrics, which hurts scalability (see the sketch after this list).

  2. Lack of physical placeholders. This makes it impossible to substitute parameters during the execute(...) stage.
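
To make point 1 concrete, here is a minimal, self-contained sketch (plain Rust, not DataFusion code) of the scalability difference between streams sharing one metrics object and each execution owning its own counters:

```rust
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::Arc;
use std::thread;

fn main() {
    // (1) Shared metrics: every "stream" updates the same counter, so
    // all cores contend on a single cache line -- this is the situation
    // when a reused plan node owns the metrics.
    let shared = Arc::new(AtomicU64::new(0));
    let handles: Vec<_> = (0..8)
        .map(|_| {
            let m = Arc::clone(&shared);
            thread::spawn(move || {
                for _ in 0..1_000_000 {
                    m.fetch_add(1, Ordering::Relaxed);
                }
            })
        })
        .collect();
    handles.into_iter().for_each(|h| h.join().unwrap());

    // (2) Per-execution metrics: each thread owns a local counter and
    // the results are merged once at the end -- no write contention.
    let handles: Vec<_> = (0..8)
        .map(|_| {
            thread::spawn(|| {
                let mut local = 0u64;
                for _ in 0..1_000_000 {
                    local += 1;
                }
                local
            })
        })
        .collect();
    let merged: u64 = handles.into_iter().map(|h| h.join().unwrap()).sum();
    println!("shared = {}, merged = {}", shared.load(Ordering::Relaxed), merged);
}
```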

Possible solution

To address these issues, I created experimental patches that:

  1. Move metric storage out of ExecutionPlans into the TaskContext. Each set of metrics remains associated with a node of the ExecutionPlan; for example, metrics can be associated with a ProjectionExec.

  2. Introduce physical placeholders implemented via the PhysicalExpr trait. These placeholders are resolved at the execute(...) stage, fetching parameters from the TaskContext (a sketch of the idea follows below).

The experimental patches can be found here:

https://github.com/tarantool/datafusion/commits/askalt/physical-placehdolders/
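
To give a rough idea of the second patch, here is a minimal, self-contained sketch of deferred placeholder resolution; `Expr`, `ParamValues`, and `resolve` are illustrative names, not the actual types used in the patches:

```rust
use std::collections::HashMap;

#[derive(Debug, Clone)]
enum Expr {
    Literal(i64),
    Placeholder(String), // e.g. "$1", bound only when execution starts
    Add(Box<Expr>, Box<Expr>),
}

/// Parameter values carried by something like the TaskContext.
type ParamValues = HashMap<String, i64>;

/// Rewrite the expression tree, replacing placeholders with literals,
/// so the optimized plan itself is never mutated and can be shared.
fn resolve(expr: &Expr, params: &ParamValues) -> Result<Expr, String> {
    match expr {
        Expr::Literal(v) => Ok(Expr::Literal(*v)),
        Expr::Placeholder(id) => params
            .get(id)
            .map(|v| Expr::Literal(*v))
            .ok_or_else(|| format!("no value bound for {id}")),
        Expr::Add(l, r) => Ok(Expr::Add(
            Box::new(resolve(l, params)?),
            Box::new(resolve(r, params)?),
        )),
    }
}

fn main() {
    // The expression can be prepared once and reused; only the cheap
    // resolution step runs per execution.
    let expr = Expr::Add(
        Box::new(Expr::Placeholder("$1".into())),
        Box::new(Expr::Literal(2)),
    );
    let params: ParamValues = [("$1".to_string(), 40)].into();
    println!("{:?}", resolve(&expr, &params)); // Add(Literal(40), Literal(2))
}
```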

Challenges related to physical placeholders

For example, the values of placeholders are not known during the optimizer passes, so certain optimizations that depend on these values cannot be performed. In some cases, rebuilding the plan based on the parameter values could be beneficial.

These issues can either be addressed by the user or resolved directly in DataFusion at a later stage.

In summary

I’m creating this issue to discuss:

  1. How do you view the problems mentioned above?
  2. Could the experimental patches be useful for upstream (I’m ready to refine them if needed)?

askalt (Contributor, Author) commented Jan 28, 2025

Regarding the physical placeholder challenges: PostgreSQL, for example, uses the following heuristic to choose between "use the generic plan" and "rebuild the plan with inlined parameters once they become known":

https://github.com/postgres/postgres/blob/master/src/backend/utils/cache/plancache.c#L1045-L1093
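
A condensed sketch of that heuristic (illustrative Rust, not a translation of the C code; the threshold of five custom plans and the cost comparison mirror choose_custom_plan):

```rust
struct PlanCacheEntry {
    num_custom_plans: u64,     // how many custom plans were built so far
    total_custom_cost: f64,    // sum of their estimated costs
    generic_cost: Option<f64>, // estimated cost of the generic plan, if built
}

impl PlanCacheEntry {
    /// true  => replan with the concrete parameter values (custom plan)
    /// false => reuse the cached generic plan
    fn choose_custom_plan(&self) -> bool {
        // Gather cost statistics from the first five executions,
        // as PostgreSQL does.
        if self.num_custom_plans < 5 {
            return true;
        }
        let Some(generic_cost) = self.generic_cost else {
            return true;
        };
        let avg_custom_cost = self.total_custom_cost / self.num_custom_plans as f64;
        // PostgreSQL also charges the custom path a replanning fudge
        // factor; omitted here for brevity.
        generic_cost > avg_custom_cost
    }
}

fn main() {
    let entry = PlanCacheEntry {
        num_custom_plans: 5,
        total_custom_cost: 50.0, // average custom cost = 10.0
        generic_cost: Some(12.0),
    };
    // The generic plan looks worse than the average custom plan,
    // so keep replanning with inlined parameter values.
    assert!(entry.choose_custom_plan());
}
```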

alamb (Contributor) commented Jan 30, 2025

> It is inefficient to save the logical plan and rebuild the physical plan from it at execution time.
100%

One thing that might be quite a bit faster than replanning the entire query is to make a (deep) copy of the plan via https://docs.rs/datafusion/latest/datafusion/physical_plan/trait.ExecutionPlan.html#tymethod.with_new_children

Here is an example that @jonahgao wrote when restarting a recursive query:

```rust
/// Some plans will change their internal states after execution, making them unable to be executed again.
/// This function uses `ExecutionPlan::with_new_children` to fork a new plan with initial states.
///
/// An example is `CrossJoinExec`, which loads the left table into memory and stores it in the plan.
/// However, if the data of the left table is derived from the work table, it will become outdated
/// as the work table changes. When the next iteration executes this plan again, we must clear the left table.
fn reset_plan_states(plan: Arc<dyn ExecutionPlan>) -> Result<Arc<dyn ExecutionPlan>> {
    plan.transform_up(|plan| {
        // WorkTableExec's states have already been updated correctly.
        if plan.as_any().is::<WorkTableExec>() {
            Ok(Transformed::no(plan))
        } else {
            let new_plan = Arc::clone(&plan)
                .with_new_children(plan.children().into_iter().cloned().collect())?;
            Ok(Transformed::yes(new_plan))
        }
    })
    .data()
}
```

I believe this will also reset the metrics.

> The physical planner and optimizer are quite heavy, and for large queries with many columns, building the physical plan can take 100-200 ms.

BTW, if you have examples of such queries it would be great to add them to our planning benchmarks.

> Introduce physical placeholders implemented via the PhysicalExpr trait. These placeholders are resolved at the execute(...) stage, fetching parameters from the TaskContext.

This sounds like a reasonable idea -- however, some things like expression simplification happen only on LogicalPlans, so if you defer placeholder resolution as you describe, some optimizations might not work as well.

As long as there is a way to choose the behavior (early replacement or deferred replacement), I think that would be good.

askalt (Contributor, Author) commented Jan 30, 2025

Yes, we can deep copy a physical plan to solve the problem with metrics. But this is not a solution for me, because a physical plan can hold a lot of heap-allocated resources (for example, ProjectionExec currently keeps all projected column names), so every deep copy requires many allocations. If a query contains thousands of columns, this is noticeable.

So, what do you think about the idea of extracting the metrics to some place like the TaskContext and associating them with physical plan nodes in some way?
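
For illustration, a hypothetical sketch of what that could look like (none of these names exist in DataFusion today; a stable per-node id is assumed):

```rust
use std::collections::HashMap;
use std::sync::{Arc, Mutex};

/// Per-node metrics, owned by the task context rather than the plan node.
#[derive(Default, Debug)]
struct NodeMetrics {
    output_rows: u64,
    elapsed_nanos: u64,
}

/// One registry per execution; keyed by a stable plan node id, so
/// concurrent executions of one shared plan never touch each other's
/// counters.
#[derive(Default)]
struct MetricsRegistry {
    by_node: Mutex<HashMap<usize, Arc<Mutex<NodeMetrics>>>>,
}

impl MetricsRegistry {
    /// Fetch (or lazily create) the metrics slot for a plan node.
    fn for_node(&self, node_id: usize) -> Arc<Mutex<NodeMetrics>> {
        let mut map = self.by_node.lock().unwrap();
        Arc::clone(map.entry(node_id).or_default())
    }
}

fn main() {
    // Each execute(...) call would create a fresh registry in its
    // TaskContext; the shared plan stays immutable.
    let registry = MetricsRegistry::default();
    registry.for_node(0).lock().unwrap().output_rows += 100;
    println!("{:?}", registry.for_node(0).lock().unwrap());
}
```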

> BTW, if you have examples of such queries it would be great to add them to our planning benchmarks.

Nice idea, I'll take it up.

> As long as there is a way to choose the behavior (early replacement or deferred replacement), I think that would be good.

Yes, exactly. I don't want to force the user to use physical placeholders, just to add the ability to have them in the plan and resolve them from the context when execution begins.

alamb (Contributor) commented Jan 30, 2025

> So, what do you think about the idea of extracting the metrics to some place like the TaskContext and associating them with physical plan nodes in some way?

I think this will be challenging, and it isn't clear to me how it would work. It seems like it would mean that when you call execute you would still have to update the physical plan with the new metrics. And if you have to modify the plan nodes anyway, moving the data to another structure doesn't seem to help -- but I may misunderstand.

> Yes, we can deep copy a physical plan to solve the problem with metrics. But this is not a solution for me, because a physical plan can hold a lot of heap-allocated resources (for example, ProjectionExec currently keeps all projected column names), so every deep copy requires many allocations. If a query contains thousands of columns, this is noticeable.

It seems like this field is Arc'd, so cloning it should not require any allocations:

```rust
pub(crate) expr: Vec<(Arc<dyn PhysicalExpr>, String)>,
```

Maybe we can apply the same Arc idea to other expensive fields (for example, the plan properties might be expensive to clone 🤔).

Another possibility might be to add a method like the following to the ExecutionPlan trait:

```rust
trait ExecutionPlan {
    fn reset_metrics(&mut self) -> Result<()> {
        not_impl_err!("Plan does not implement resetting metrics")
    }
    // ...
}
```

Since execution plans are behind Arcs, you would likely have to do something like Arc::unwrap_or_clone (https://doc.rust-lang.org/std/sync/struct.Arc.html#method.unwrap_or_clone) and walk up the tree.

Another possibility would be to put more of ProjectionExec behind Arcs.
