Skip to content

Latest commit

 

History

History
195 lines (139 loc) · 7.47 KB

README.md

File metadata and controls

195 lines (139 loc) · 7.47 KB

Ballista: Distributed Scheduler for Apache Arrow DataFusion

Ballista is a distributed compute platform primarily implemented in Rust, and powered by Apache Arrow and DataFusion. It is built on an architecture that allows other programming languages (such as Python, C++, and Java) to be supported as first-class citizens without paying a penalty for serialization costs.

logo

Ballista is a distributed query execution engine that enhances Apache DataFusion by enabling the parallelized execution of workloads across multiple nodes in a distributed environment.

Existing DataFusion application:

use datafusion::prelude::*;

#[tokio::main]
async fn main() -> datafusion::error::Result<()> {
  // datafusion context
  let ctx = SessionContext::new();

  // register the table
  ctx.register_csv("example", "tests/data/example.csv", CsvReadOptions::new()).await?;

  // create a plan to run a SQL query
  let df = ctx.sql("SELECT a, MIN(b) FROM example WHERE a <= b GROUP BY a LIMIT 100").await?;

  // execute and print results
  df.show().await?;
  Ok(())
}

can be distributed with few lines changed:

use ballista::prelude::*;
use datafusion::prelude::*;

#[tokio::main]
async fn main() -> datafusion::error::Result<()> {
    // create SessionContext with ballista support
    // standalone context will start all required
    // ballista infrastructure in the background as well
    let ctx = SessionContext::standalone().await?;

    // everything else remains the same

    // register the table
    ctx.register_csv("example", "tests/data/example.csv", CsvReadOptions::new())
        .await?;

    // create a plan to run a SQL query
    let df = ctx
        .sql("SELECT a, MIN(b) FROM example WHERE a <= b GROUP BY a LIMIT 100")
        .await?;

    // execute and print results
    df.show().await?;
    Ok(())
}

Starting a cluster

architecture

A simple way to start a local cluster for testing purposes is to use cargo to install the scheduler and executor crates.

cargo install --locked ballista-scheduler
cargo install --locked ballista-executor

With these crates installed, it is now possible to start a scheduler process.

RUST_LOG=info ballista-scheduler

The scheduler will bind to port 50050 by default.

Next, start an executor processes in a new terminal session with the specified concurrency level.

RUST_LOG=info ballista-executor -c 4

The executor will bind to port 50051 by default. Additional executors can be started by manually specifying a bind port.

For full documentation, refer to the deployment section of the Ballista User Guide

Executing a Query

Ballista provides a custom SessionContext as a starting point for creating queries. DataFrames can be created by invoking the read_csv, read_parquet, and sql methods.

To build a simple ballista example, run the following command to add the dependencies to your Cargo.toml file:

cargo add ballista datafusion tokio
use ballista::prelude::*;
use datafusion::common::Result;
use datafusion::prelude::{col, SessionContext, ParquetReadOptions};
use datafusion::functions_aggregate::{min_max::min, min_max::max, sum::sum, average::avg};

#[tokio::main]
async fn main() -> Result<()> {

    // connect to Ballista scheduler
    let ctx = SessionContext::remote("df://localhost:50050").await?;

    let filename = "testdata/yellow_tripdata_2022-01.parquet";

    // define the query using the DataFrame trait
    let df = ctx
        .read_parquet(filename, ParquetReadOptions::default())
        .await?
        .select_columns(&["passenger_count", "fare_amount"])?
        .aggregate(
            vec![col("passenger_count")],
            vec![
                min(col("fare_amount")),
                max(col("fare_amount")),
                avg(col("fare_amount")),
                sum(col("fare_amount")),
            ],
        )?
        .sort(vec![col("passenger_count").sort(true, true)])?;

    df.show().await?;

    Ok(())
}

The output should look similar to the following table.

+-----------------+--------------------------+--------------------------+--------------------------+--------------------------+
| passenger_count | MIN(?table?.fare_amount) | MAX(?table?.fare_amount) | AVG(?table?.fare_amount) | SUM(?table?.fare_amount) |
+-----------------+--------------------------+--------------------------+--------------------------+--------------------------+
|                 | -159.5                   | 285.2                    | 17.60577640099004        | 1258865.829999991        |
| 0               | -115                     | 500                      | 11.794859107585335       | 614052.1600000001        |
| 1               | -480                     | 401092.32                | 12.61028389876563        | 22623542.879999973       |
| 2               | -250                     | 640.5                    | 13.79501011585127        | 4732047.139999998        |
| 3               | -130                     | 480                      | 13.473184817311106       | 1139427.2400000002       |
| 4               | -250                     | 464                      | 14.232650547832726       | 502711.4499999997        |
+-----------------+--------------------------+--------------------------+--------------------------+--------------------------+

More examples can be found in the arrow-ballista repository.

Performance

We run some simple benchmarks comparing Ballista with Apache Spark to track progress with performance optimizations.

These are benchmarks derived from TPC-H and not official TPC-H benchmarks. These results are from running individual queries at scale factor 100 (100 GB) on a single node with a single executor and 8 concurrent tasks.

Overall Speedup

The overall speedup is 2.9x

benchmarks

Per Query Comparison

benchmarks

Relative Speedup

benchmarks

Absolute Speedup

benchmarks