chore: update readme & examples #15

Merged
merged 2 commits on Apr 16, 2024

79 changes: 33 additions & 46 deletions README.md
@@ -11,69 +11,56 @@ Currently, the Spark Connect client for Rust is **highly experimental** and **should
not be used in any production setting**. This is currently a "proof of concept" to identify the methods
of interacting with a Spark cluster from Rust.

## Quick Start

The `spark-connect-rs` crate aims to provide an entrypoint to [Spark Connect](https://spark.apache.org/docs/latest/spark-connect-overview.html) and to offer *similar* DataFrame API interactions.

## Getting Started

This section explains how to run the Spark Connect client for Rust locally, starting from scratch.

**Step 1**: Install Rust via rustup: https://www.rust-lang.org/tools/install

**Step 2**: Ensure you have [cmake](https://cmake.org/download/) and [protobuf](https://grpc.io/docs/protoc-installation/) installed on your machine

**Step 3**: Run the following commands to clone the repo

```bash
docker compose up --build -d
```
git clone https://github.com/sjrusso8/spark-connect-rs.git
git submodule update --init --recursive

```rust
use spark_connect_rs;

use spark_connect_rs::{SparkSession, SparkSessionBuilder};

use spark_connect_rs::functions as F;

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
let spark: SparkSession = SparkSessionBuilder::remote("sc://127.0.0.1:15002/")
.build()
.await?;

let df = spark
.sql("SELECT * FROM json.`/opt/spark/examples/src/main/resources/employees.json`")
.await?;

df.filter("salary >= 3500")
.select(F::col("name"))
.show(Some(5), None, None)
.await?;

// +-------------+
// | show_string |
// +-------------+
// | +------+ |
// | |name | |
// | +------+ |
// | |Andy | |
// | |Justin| |
// | |Berta | |
// | +------+ |
// | |
// +-------------+

Ok(())
}
cargo build
```

## Getting Started
**Step 4**: Set up the Spark Driver on localhost, either by downloading Spark or with [docker](https://docs.docker.com/engine/install/).

With local spark:

1. [Download a Spark distribution](https://spark.apache.org/downloads.html) (3.4.0+) and unzip the package.

2. Start the Spark Connect server with the following command (make sure to use a package version that matches your Spark distribution):

```
git clone https://github.com/sjrusso8/spark-connect-rs.git
git submodule update --init --recursive
sbin/start-connect-server.sh --packages org.apache.spark:spark-connect_2.12:3.4.0
```

With docker:

1. Start the Spark Connect server using the `docker-compose.yml` provided in this repo. This starts a Spark Connect server listening on port 15002.

```bash
docker compose up --build -d
```

**Step 5**: Run an example from the repo under `/examples`

cargo build && cargo test
```bash
cargo run --example sql
```

## Features

The following section outlines some of the larger functionality that is not yet working with this Spark Connect implementation.

- ![done] TLS authentication & Databricks compatibility
- ![done] TLS authentication & Databricks compatibility via the feature flag `feature = 'tls'`
- ![open] StreamingQueryManager
- ![open] Window and ~~Pivot~~ functions
- ![open] UDFs or any type of functionality that takes a closure (foreach, foreachBatch, etc.)
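As a quick sanity check of the new Getting Started steps, the following sketch connects to the Spark Connect server from Step 4 and runs a trivial query. It reuses only the builder, `sql`, and `show` calls that already appear in this PR, so treat it as illustrative rather than part of the repo:

```rust
use spark_connect_rs::{SparkSession, SparkSessionBuilder};

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    // Connect to the Spark Connect server started in Step 4 (docker or local)
    let spark: SparkSession = SparkSessionBuilder::remote("sc://127.0.0.1:15002/")
        .build()
        .await?;

    // Run a trivial query to confirm the connection works end to end
    let df = spark.sql("select 'apple' as word, 123 as count").await?;

    df.show(Some(5), None, None).await?;

    Ok(())
}
```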
8 changes: 5 additions & 3 deletions examples/databricks.rs
@@ -1,17 +1,19 @@
// This example demonstrates connecting to a Databricks Cluster via a
// tls connection.
// This example demonstrates connecting to a Databricks Cluster via a tls connection.
//
// This demo requires access to a Databricks Workspace, a personal access token,
// and a cluster id. The cluster should be running a 13.3LTS runtime or greater. Populate
// the remote URL string between the `<>` with the appropriate details.
//
// The Databricks workspace instance name is the same as the Server Hostname value for your cluster.
// Get connection details for a Databricks compute resource via https://docs.databricks.com/en/integrations/compute-details.html
//
// To view the connected Spark Session, go to the cluster Spark UI and select the 'Connect' tab.

use spark_connect_rs::{SparkSession, SparkSessionBuilder};

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
let spark: SparkSession = SparkSessionBuilder::remote("sc://<workspace id>:443/;token=<personal access token>;x-databricks-cluster-id=<cluster-id>")
let spark: SparkSession = SparkSessionBuilder::remote("sc://<workspace instance name>:443/;token=<personal access token>;x-databricks-cluster-id=<cluster-id>")
.build()
.await?;

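For readers filling in the `<>` placeholders above, here is a hedged sketch of how the connection string is typically assembled; the workspace hostname, token, and cluster id below are made-up values, and the builder is assumed to accept any `&str`:

```rust
use spark_connect_rs::{SparkSession, SparkSessionBuilder};

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    // All three values are fake examples; substitute your own workspace details
    let workspace = "adb-1234567890123456.7.azuredatabricks.net"; // Server Hostname of the cluster
    let token = "dapi0000000000000000000000000000";               // personal access token
    let cluster_id = "0123-456789-abcdefgh";                      // cluster id (13.3 LTS or greater)

    let remote = format!(
        "sc://{workspace}:443/;token={token};x-databricks-cluster-id={cluster_id}"
    );

    let spark: SparkSession = SparkSessionBuilder::remote(remote.as_str())
        .build()
        .await?;

    // A lightweight query; the session should appear under the cluster's 'Connect' tab
    let df = spark.sql("select 1 as ok").await?;

    df.show(Some(1), None, None).await?;

    Ok(())
}
```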
4 changes: 3 additions & 1 deletion examples/delta.rs
@@ -12,7 +12,9 @@ use spark_connect_rs::dataframe::SaveMode;

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
let spark: SparkSession = SparkSessionBuilder::default().build().await?;
let spark: SparkSession = SparkSessionBuilder::remote("sc://127.0.0.1:15002/")
.build()
.await?;

let paths = ["/opt/spark/examples/src/main/resources/people.csv"];

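The rest of `delta.rs` is collapsed in this view. As a rough idea of the shape such an example takes, the sketch below reads the CSV path shown above and writes it back out in Delta format, reusing only the reader/writer calls visible in the `sql.rs` diff later in this PR; it assumes the connected cluster has the Delta Lake package available and omits the read options the real example may use:

```rust
use spark_connect_rs::dataframe::SaveMode;
use spark_connect_rs::{SparkSession, SparkSessionBuilder};

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    let spark: SparkSession = SparkSessionBuilder::remote("sc://127.0.0.1:15002/")
        .build()
        .await?;

    let paths = ["/opt/spark/examples/src/main/resources/people.csv"];

    // Read the sample CSV shipped with the Spark image (no read options applied here)
    let df = spark.read().format("csv").load(paths)?;

    // Write it back out in Delta format; assumes the Delta Lake package is on the cluster
    df.write()
        .mode(SaveMode::Overwrite)
        .format("delta")
        .save("file:///tmp/spark-connect-delta-example")
        .await?;

    Ok(())
}
```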
7 changes: 4 additions & 3 deletions examples/reader.rs
@@ -1,10 +1,11 @@
// This example demonstrates creating a Spark DataFrame from a CSV with read options
// and then adding transformations for 'select' & 'sort'
// printing the results as "show(...)"

use spark_connect_rs::{SparkSession, SparkSessionBuilder};

use spark_connect_rs::functions as F;

// This example demonstrates creating a Spark DataFrame from a CSV with read options
// and then adding transformations for 'select' & 'sort'
// printing the results as "show(...)"
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
let spark: SparkSession = SparkSessionBuilder::default().build().await?;
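The body of `reader.rs` is also collapsed here. Below is a minimal sketch of the flow its header comment describes (read a CSV, then `show` the result), using only calls that appear elsewhere in this PR; the read options and the `select`/`sort` steps from the real example are omitted because their signatures are not shown in this diff:

```rust
use spark_connect_rs::{SparkSession, SparkSessionBuilder};

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    let spark: SparkSession = SparkSessionBuilder::remote("sc://127.0.0.1:15002/")
        .build()
        .await?;

    // Load the sample CSV shipped with the Spark image and print a few rows;
    // the real example additionally applies read options plus 'select' and 'sort'
    let df = spark
        .read()
        .format("csv")
        .load(["/opt/spark/examples/src/main/resources/people.csv"])?;

    df.show(Some(5), None, None).await?;

    Ok(())
}
```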
2 changes: 1 addition & 1 deletion examples/readstream.rs
@@ -9,7 +9,7 @@ use std::{thread, time};
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
let spark: SparkSession =
SparkSessionBuilder::remote("sc://127.0.0.1:15002/;user_id=example_rs")
SparkSessionBuilder::remote("sc://127.0.0.1:15002/;user_id=stream_example")
.build()
.await?;

53 changes: 29 additions & 24 deletions examples/sql.rs
@@ -1,38 +1,43 @@
use spark_connect_rs;
// This example demonstrates creating a Spark DataFrame from a SQL command
// and saving the results as a parquet file, then reading the new parquet file back

use spark_connect_rs::dataframe::SaveMode;
use spark_connect_rs::{SparkSession, SparkSessionBuilder};

// This example demonstrates creating a Spark DataFrame from a SQL command
// and leveraging &str input for `filter` and `select` to change the dataframe
// Displaying the results as "show(...)"
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
let spark: SparkSession =
SparkSessionBuilder::remote("sc://127.0.0.1:15002/;user_id=example_rs")
.build()
.await?;
let spark: SparkSession = SparkSessionBuilder::remote("sc://127.0.0.1:15002/")
.build()
.await?;

let df = spark
.sql("SELECT * FROM json.`/opt/spark/examples/src/main/resources/employees.json`")
.clone()
.sql("select 'apple' as word, 123 as count")
.await?;

df.filter("salary >= 3500")
.select("*")
.show(Some(5), None, None)
df.write()
.mode(SaveMode::Overwrite)
.format("parquet")
.save("file:///tmp/spark-connect-write-example-output.parquet")
.await?;

// +-----------------+
// | show_string |
// +-----------------+
// | +------+------+ |
// | |name |salary| |
// | +------+------+ |
// | |Andy |4500 | |
// | |Justin|3500 | |
// | |Berta |4000 | |
// | +------+------+ |
// | |
// +-----------------+
let df = spark
.read()
.format("parquet")
.load(["file:///tmp/spark-connect-write-example-output.parquet"])?;

df.show(Some(100), None, None).await?;

// +---------------+
// | show_string |
// +---------------+
// | +-----+-----+ |
// | |word |count| |
// | +-----+-----+ |
// | |apple|123 | |
// | +-----+-----+ |
// | |
// +---------------+

Ok(())
}
4 changes: 3 additions & 1 deletion examples/writer.rs
@@ -11,7 +11,9 @@ use spark_connect_rs::dataframe::SaveMode;
// then reading the csv file back
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
let spark: SparkSession = SparkSessionBuilder::default().build().await?;
let spark: SparkSession = SparkSessionBuilder::remote("sc://127.0.0.1:15002/")
.build()
.await?;

let df = spark
.clone()
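`writer.rs` is truncated above as well. As a hedged sketch of the round trip its comments describe (write a DataFrame out, then read the file back), the following reuses the writer and reader calls shown in the `sql.rs` diff; the CSV format string and output path are illustrative rather than taken from the example itself:

```rust
use spark_connect_rs::dataframe::SaveMode;
use spark_connect_rs::{SparkSession, SparkSessionBuilder};

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    let spark: SparkSession = SparkSessionBuilder::remote("sc://127.0.0.1:15002/")
        .build()
        .await?;

    // Build a tiny DataFrame from a SQL literal
    let df = spark
        .clone()
        .sql("select 'apple' as word, 123 as count")
        .await?;

    // Write it out as CSV ...
    df.write()
        .mode(SaveMode::Overwrite)
        .format("csv")
        .save("file:///tmp/spark-connect-write-example-output.csv")
        .await?;

    // ... then read the file back to confirm the round trip
    let df = spark
        .read()
        .format("csv")
        .load(["file:///tmp/spark-connect-write-example-output.csv"])?;

    df.show(Some(10), None, None).await?;

    Ok(())
}
```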