RFC: Pluggable Execution Engine for Stream Processing #17501

Open
mch2 opened this issue Mar 4, 2025 · 1 comment
Labels: discuss, enhancement, RFC, Search:Performance

Comments

@mch2
Member

mch2 commented Mar 4, 2025

Is your feature request related to a problem? Please describe

Overview and Motivation

With the introduction of Arrow streams into OpenSearch, we have been experimenting with the idea of an embeddable execution engine. The idea is to use Lucene for initial doc retrieval and filtering, and to stream doc values out to a pluggable execution engine for analytics. The primary motivation is to bring modern analytics capabilities to OpenSearch along with performance improvements, particularly through vectorized processing of columnar data.

Describe the solution you'd like

I have been using the Rust-based Apache DataFusion as the engine of choice for experiments, but the idea is to make the engine pluggable. I chose DataFusion in particular because it was easy to get off the ground, it uses Arrow as its in-memory format, and it is performant, with efficient vectorized processing. It was also relatively simple to create vectors in the JVM and send them zero-copy to/from DataFusion. Further, its extensible architecture allows us to customize pretty much anything (language front end, planning, execution, etc.) and to use it as more than simply an execution engine.
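To make the "pluggable" part concrete, a minimal engine-facing interface might look something like the sketch below. This is purely illustrative: StreamExecutionEngine, registerStream, executeSql, and executeSubstrait are hypothetical names, not an existing OpenSearch or DataFusion API; only the general shape (hand the engine Arrow batches, get Arrow batches back) comes from this proposal.

```java
// Hypothetical sketch of a pluggable engine SPI -- names are illustrative only.
// Lucene handles retrieval/filtering; doc values are then streamed to whichever
// engine is plugged in (DataFusion in the POC) as Arrow record batches.
import java.util.Iterator;
import org.apache.arrow.vector.VectorSchemaRoot;

public interface StreamExecutionEngine extends AutoCloseable {

    /** Register a stream of Arrow record batches (e.g. one partition per shard). */
    void registerStream(String tableName, Iterator<VectorSchemaRoot> batches);

    /** Execute raw SQL over the registered streams, returning Arrow batches. */
    Iterator<VectorSchemaRoot> executeSql(String sql);

    /** Execute a serialized Substrait plan (e.g. built in Java with Calcite). */
    Iterator<VectorSchemaRoot> executeSubstrait(byte[] plan);
}
```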

Some additional benefits:

  • Reduced JVM Heap Pressure: With more work pushed off heap, pressure on the JVM heap is reduced. DataFusion can also cap its memory pool size and spill to disk when necessary.
  • External Data Integration: Built-in support for various file formats and remote locations enables unions between hot node data and external stores (e.g., Parquet files) with compatible schemas.
  • Flexible Query Support: While not the main motivation, DataFusion is a full query engine; we can send SQL or pre-built Substrait plans constructed in Java (e.g., with Calcite) to DF for execution. A small illustration of these last two points follows this list.
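As a small illustration of the last two bullets, the kind of query the engine could run might union per-shard streams from hot nodes with an external Parquet-backed table of the same schema. The table names and the query itself are made up for illustration; they are not part of the POC.

```java
// Illustrative only: hypothetical table names over the StreamExecutionEngine sketched
// above. "hot_logs" would be backed by per-shard Arrow streams, "warm_logs_parquet" by
// external Parquet files with a compatible schema.
String unionAgg =
      "SELECT status, count(*) AS cnt "
    + "FROM (SELECT status FROM hot_logs "
    + "      UNION ALL "
    + "      SELECT status FROM warm_logs_parquet) "
    + "GROUP BY status";
```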

Use Cases:
The three use cases I have been targeting for the POC are (branches at the bottom of this issue):

  1. Shard fan-out - collate results from individual data node streams at the coordinator.
  2. Execute joins across multiple per-index streams at the coordinator.
  3. Perform aggregations with improved efficiency - this is in line with this RFC for a memory-efficient agg approach.

From an implementation perspective, this is what the execution flow looks like for a terms aggregation in the POC:
[Image: execution flow for a terms aggregation in the POC]

At the coordinator:
The coordinator executes the query phase as normal (not pictured), where each data node returns a stream ticket.

  1. The coordinator then allocates a DataFusion SessionContext & Runtime over JNI.
  2. The coordinator executes the desired agg (steps 1 and 2 are pictured separately but are a single JNI call in reality). This registers a TableProvider with a partition for each shard and creates a logical plan (DataFrame).
  3. On registration, getFlightInfo is invoked, fetching stream metadata (schema) and endpoints.
  4. executeStream is then invoked on the DataFrame, which eventually invokes getStream on the data node’s Flight server to return record batches (see the sketch after this list).
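For orientation, step 4 ultimately reduces to an Arrow Flight getStream call against each data node's Flight server. In the POC that call is driven from DataFusion's Rust side over JNI; the stand-alone Java sketch below only illustrates the ticket-to-record-batch exchange, with the host, port, and ticket bytes as placeholders.

```java
// Minimal Arrow Flight consumer sketch (placeholder endpoint and ticket). In the POC,
// DataFusion (Rust) issues the equivalent getStream call against the data node's
// Flight server; this is shown in Java only to illustrate the exchange.
import org.apache.arrow.flight.FlightClient;
import org.apache.arrow.flight.FlightStream;
import org.apache.arrow.flight.Location;
import org.apache.arrow.flight.Ticket;
import org.apache.arrow.memory.BufferAllocator;
import org.apache.arrow.memory.RootAllocator;

public class StreamTicketConsumer {
    public static void main(String[] args) throws Exception {
        byte[] ticketBytes = args[0].getBytes();  // stream ticket returned during the query phase
        try (BufferAllocator allocator = new RootAllocator();
             FlightClient client = FlightClient.builder(
                     allocator, Location.forGrpcInsecure("data-node-1", 9400)).build();  // placeholders
             FlightStream stream = client.getStream(new Ticket(ticketBytes))) {
            while (stream.next()) {
                // next() loads one record batch into the stream's VectorSchemaRoot.
                System.out.println("rows in batch: " + stream.getRoot().getRowCount());
            }
        }
    }
}
```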

On the data node: This resembles today's approach, where aggregators return per-shard results, but with streaming to reduce memory requirements.

  1. The necessary DF context & runtime are similarly allocated.
  2. The collector writes doc values into vectors up to a certain batch size.
  3. The VectorSchemaRoot for the batch (VSR pictured) is exported to DF via the C Data interface; a sketch of this export follows the list. The reason for two VSRs here is that the input vectors have a different schema than the output in this case. DF then aggregates the batch.
  4. Once a batch has been processed, the aggregated results are written out to the result vector and streamed back to the coordinator.
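A rough sketch of the export in step 3, using Arrow Java's C Data interface: the org.apache.arrow.c classes are real Arrow APIs, while the aggregateBatch native method is a hypothetical stand-in for the POC's JNI call into DataFusion.

```java
// Zero-copy handoff of a batch to native code via the Arrow C Data interface.
// Only two pointers cross the JNI boundary; the vectors themselves are not copied.
import org.apache.arrow.c.ArrowArray;
import org.apache.arrow.c.ArrowSchema;
import org.apache.arrow.c.Data;
import org.apache.arrow.memory.BufferAllocator;
import org.apache.arrow.vector.VectorSchemaRoot;

public final class BatchExporter {

    /** Hypothetical JNI entry point into the DataFusion side of the POC. */
    private static native void aggregateBatch(long arrayAddress, long schemaAddress);

    /** Exports one filled VectorSchemaRoot and hands it to the native engine. */
    public static void export(BufferAllocator allocator, VectorSchemaRoot batch) {
        try (ArrowArray array = ArrowArray.allocateNew(allocator);
             ArrowSchema schema = ArrowSchema.allocateNew(allocator)) {
            Data.exportVectorSchemaRoot(allocator, batch, /* dictionaryProvider = */ null, array, schema);
            aggregateBatch(array.memoryAddress(), schema.memoryAddress());
        }
    }
}
```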

I’ve benchmarked a couple of experiments for aggregations and have seen favorable results in both latency and memory used. Cluster using DF in green; 3 data nodes + 1 dedicated coordinator.

[Image: benchmark comparison, Big5 keyword-terms operation]

Benchmark - Big5 keyword-terms operation.

I will follow up here with a few more benchmarks shortly using the new red-line feature in OSB, but wanted to get this out there to see what people think.

Related:
Join Support: #15185
Streaming aggs: #16774
Pluggable Storage Engine Support: #17341 (comment)
POC branches:
  aggregations (term): https://github.com/mch2/OpenSearch/commits/df-streaming-aggs/
  joins: https://github.com/mch2/OpenSearch/commits/mch2-rishma-join

Related component

Search:Performance

Describe alternatives you've considered

Not do this and rely on pure Java implementations.

Additional context

No response

@mch2 mch2 added discuss, enhancement, RFC, and untriaged labels Mar 4, 2025
@mch2 mch2 changed the title RFC: Embeddable Query Engine for Stream Processing RFC: Pluggable Query Engine for Stream Processing Mar 4, 2025
@mch2 mch2 changed the title RFC: Pluggable Query Engine for Stream Processing RFC: Pluggable Execution Engine for Stream Processing Mar 4, 2025
@ViggoC
Contributor

ViggoC commented Mar 6, 2025

Hi @mch2, I'm so excited to see this proposal; it will elevate the analytics capabilities of OpenSearch to a new level.
But I'm a bit confused about some parts of this RFC. Could you explain more?

The Coordinator executes query phase as normal (not pictured), where each data node returns a stream ticket.

  1. Why do we need to do it in a query-then-fetch way?

executeStream is then invoked on the DataFrame, which eventually invokes getStream from the data node’s flight server to return record batches.

  1. IIUC, the distributed plan is executed in "pull mode": the data node will not start working until the coordinator requests data from it, right? And at what phase is the DF context initialized, the query phase or the fetch phase?

Benchmark - Big5 keyword-terms operation.

  1. How should we read the benchmark result? What do the X and Y axes mean?
