add spark-scala-quickstart #148

Closed
wants to merge 2 commits into from
7 changes: 4 additions & 3 deletions README.md
@@ -4,13 +4,14 @@ This repository contains code and documentation for use with
[Google Cloud Dataproc](https://cloud.google.com/dataproc/).

## Samples in this Repository
* `codelabs/opencv-haarcascade` provides the source code for the [OpenCV Dataproc Codelab](https://codelabs.developers.google.com/codelabs/cloud-dataproc-opencv/index.html), which demonstrates a Spark job that adds facial detection to a set of images.
* `codelabs/spark-bigquery` provides the source code for the [PySpark for Preprocessing BigQuery Data Codelab](https://codelabs.developers.google.com/codelabs/pyspark-bigquery/index.html), which demonstrates using PySpark on Cloud Dataproc to process data from BigQuery.
* `codelabs/spark-nlp` provides the source code for the [PySpark for Natural Language Processing Codelab](https://codelabs.developers.google.com/codelabs/spark-nlp/index.html), which demonstrates using [spark-nlp](https://github.com/JohnSnowLabs/spark-nlp) library for Natural Language Processing.
* `notebooks/python` provides example Jupyter notebooks to demonstrate using PySpark with the [BigQuery Storage Connector](https://github.com/GoogleCloudPlatform/spark-bigquery-connector) and the [Spark GCS Connector](https://github.com/GoogleCloudPlatform/bigdata-interop/tree/master/gcs)
* `spark-scala-quickstart` provides examples and technical guides to run *Spark jobs written in Scala* on Dataproc and orchestrate them with Composer.
* `spark-tensorflow` provides an example of using Spark as a preprocessing toolchain for Tensorflow jobs. Optionally,
it demonstrates the [spark-tensorflow-connector](https://github.com/tensorflow/ecosystem/tree/master/spark/spark-tensorflow-connector) to convert CSV files to TFRecords.
* `spark-translate` provides a simple demo Spark application that translates words using Google's Translation API, running on Cloud Dataproc.

See each directory's README for more information.

56 changes: 56 additions & 0 deletions spark-scala-quickstart/.gitignore
@@ -0,0 +1,56 @@
# Intellij idea
*.log
*.iml
*.ipr
*.iws
.idea

# Eclipse
build/
.classpath
.project
.settings
.worksheet
bin/
.cache

# sbt
.bsp/
project/
target/
dataproc/*/project/
dataproc/*/target/

# Scala
*.class

# Mac
.DS_Store

# Byte-compiled / optimized / DLL files
__pycache__/
*.py[cod]
*$py.class

# C extensions
*.so

# Distribution / packaging
.Python
build/
develop-eggs/
dist/
downloads/
eggs/
.eggs/
lib/
lib64/
parts/
sdist/
var/
wheels/
share/python-wheels/
*.egg-info/
.installed.cfg
*.egg
MANIFEST
98 changes: 98 additions & 0 deletions spark-scala-quickstart/README.md
@@ -0,0 +1,98 @@
# Dataproc - Spark Scala Quickstart

**Dataproc - Spark Scala Quickstart** helps you create Spark jobs written in Scala and run them on [Dataproc](https://cloud.google.com/dataproc). It provides several pre-implemented Spark jobs and technical guides for running them on GCP.

This guide is based on a WordCount ETL example with common sources and sinks (Kafka, GCS, BigQuery, etc.).
It is intended to jump-start your development of Spark jobs written in Scala on Dataproc.

It demonstrates how to run Spark jobs using Dataproc Submit, Dataproc Serverless, and Dataproc Workflows, and how to orchestrate them with [Cloud Composer](https://cloud.google.com/composer).

If you are looking to use Dataproc Templates, please refer to this [repository](https://github.com/GoogleCloudPlatform/dataproc-templates).
Also, check out the [quickstart documentation](https://cloud.google.com/dataproc/docs/quickstarts) for other quickstarts.

![Icons](_docs/images/icons.png)

Recommended versions
```
Scala = 2.12.14
Spark = 3.1.2
sbt = 1.6.1
Python = 3.8.12
Airflow = 2.2.3
Composer = composer-2.0.6-airflow-2.2.3
Dataproc = 2.0-debian10
```

Note: if using Dataproc Serverless (covered in the guides as one of the options for running jobs), recompile the jobs against Spark version 3.2.0.
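
For reference, the sketch below shows how one of the jobs could be run as a Dataproc Serverless batch, assuming the `PROJECT_ID` and `REGION` variables defined in the guides; the bucket, jar, and class names are placeholders rather than the actual artifacts of this quickstart.

```console
# Hypothetical example: run a WordCount jar (built against Spark 3.2.0) as a
# Dataproc Serverless batch. Bucket, jar, and class names are placeholders.
gcloud dataproc batches submit spark \
    --project=${PROJECT_ID} \
    --region=${REGION} \
    --jars=gs://your-bucket/jars/wordcount-assembly.jar \
    --class=com.example.WordCount \
    -- gs://your-bucket/input/ gs://your-bucket/output/
```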

### Before you start

- Be aware that the data format used in this guide for data in GCS is [Parquet](https://parquet.apache.org/).
- Follow the [setup instructions](./_docs/setup.md) for installing, testing and compiling the project.
- This guide is configured to run each jar's main class, although Dataproc also lets you specify a different class when submitting a job (see the sketch below).
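
As a reference, the sketch below submits a job to an existing Dataproc cluster and shows where `--class` goes; the cluster, bucket, jar, and class names are placeholders.

```console
# Hypothetical example: submit a WordCount jar to an existing Dataproc cluster.
# Cluster, bucket, jar, and class names are placeholders. If the jar's manifest
# already defines the main class, use --jar=<main jar> instead of --class/--jars.
gcloud dataproc jobs submit spark \
    --cluster=your-cluster \
    --region=${REGION} \
    --class=com.example.WordCount \
    --jars=gs://your-bucket/jars/wordcount-assembly.jar \
    -- gs://your-bucket/input/ gs://your-bucket/output/
```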

[![Open in Cloud Shell](http://gstatic.com/cloudssh/images/open-btn.svg)](https://console.cloud.google.com/cloudshell/editor)

<hr/>

## Dataproc Spark Use Cases

0) [Create Mock Dataset](_docs/dataproc/0_create-dataset.md)
- Creates input and output mock WordCount datasets in GCS and BQ to use in other examples
1) [Streaming - Kafka to GCS](_docs/dataproc/1_streaming-kafka-gcs.md)
- Runs a Spark Structured Streaming WordCount example from Kafka to GCS
2) [Batch - GCS to GCS](_docs/dataproc/2_batch-gcs-gcs.md)
- Runs a Spark WordCount example from GCS to GCS
- Appendix: Load from GCS to BQ
- Appendix: Create BQ External table pointing to GCS data
3) [Batch - GCS to BQ](_docs/dataproc/3_batch-gcs-bq.md)
- Runs a Spark WordCount example from GCS to BQ

### Orchestrate with Cloud Composer

This part of the guide provides example DAGs to run on [Cloud Composer](https://cloud.google.com/composer) to orchestrate the jobs from the section above. An example of triggering one of these DAGs manually is shown after the list.

A) [Batch - Dataproc Submit - Creating and Deleting Cluster](_docs/composer/A_batch-submit-cluster.md)
B) [Batch - Dataproc Workflow](_docs/composer/B_batch-workflow.md)
C) [Batch - Dataproc Serverless](_docs/composer/C_batch-serverless.md)
D) [Load from GCS to BQ](_docs/composer/D_batch-bq-import.md)
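
For quick manual testing you can also trigger one of these DAGs outside its schedule. A minimal sketch, mirroring the `variables -- set` form used in the guides and assuming the `COMPOSER_ENV` and `COMPOSER_LOCATION` variables defined there; the DAG id is assumed to match the Python file name and may differ in your environment.

```console
# Hypothetical example: trigger the Dataproc Submit DAG once, outside its schedule.
# The DAG id is an assumption; check the Airflow UI for the actual id.
gcloud composer environments run ${COMPOSER_ENV} \
    --location ${COMPOSER_LOCATION} dags -- trigger a_batch_submit_cluster
```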

<hr/>

### Backlog

- Streaming - Kafka to BigQuery
- Streaming - PubSub Lite to GCS
- Streaming - PubSub Lite to BigQuery
- Batch - BigQuery to BigQuery

### References

#### GCP Resources

- [Spark to Dataproc](https://cloud.google.com/architecture/hadoop/migrating-apache-spark-jobs-to-cloud-dataproc)
- [BigQuery Write API](https://cloud.google.com/bigquery/docs/write-api)
- [BigQuery External Tables](https://cloud.google.com/bigquery/docs/external-tables)
- [Dataproc Serverless](https://cloud.google.com/dataproc-serverless/docs/overview)
- [Dataproc Workflows](https://cloud.google.com/dataproc/docs/concepts/workflows/using-yamls)
- [Spark BigQuery Connector](https://github.com/GoogleCloudDataproc/spark-bigquery-connector)
- [Data Lake on GCS Architecture](https://cloud.google.com/architecture/build-a-data-lake-on-gcp)

#### Spark Resources

- [Spark SQL Migration Guide](https://spark.apache.org/docs/latest/sql-migration-guide.html)
- [Spark Structured Streaming](https://spark.apache.org/docs/3.1.2/structured-streaming-programming-guide.html)
- [Spark Kafka Integration](https://spark.apache.org/docs/3.1.2/structured-streaming-kafka-integration.html)

#### Composer Resources

- [Google Cloud Airflow Operators](https://airflow.apache.org/docs/apache-airflow-providers-google/stable/operators/cloud/index.html)

#### Initialisms

```
GCP = Google Cloud Platform
GCS = Google Cloud Storage
BQ = BigQuery
DAG = Directed Acyclic Graph
```
62 changes: 62 additions & 0 deletions spark-scala-quickstart/_docs/composer/A_batch-submit-cluster.md
@@ -0,0 +1,62 @@
## Cloud Composer - Dataproc Submit

This section guides you through creating an Apache Airflow DAG in Cloud Composer, scheduled to run a job with Dataproc Submit on an ephemeral cluster that the DAG creates and deletes.
Batch job example used: [GCS to GCS](../dataproc/2_batch-gcs-gcs.md).
The DAG used in this guide is located [here](../../composer/src/dags/a_batch_submit_cluster.py).

### Step 1 - Setup GCP Environment Variables

```console
export PROJECT_ID="your_project_id"
export REGION="your_region"
```

### Step 2 - Setup Composer Variables

```console
export COMPOSER_ENV="your_composer-env"
export COMPOSER_LOCATION=${REGION}
```

### Step 3 - Setup DAG specific config

Update [composer/src/dags/config/a_batch_submit_cluster.ini](../../composer/src/dags/config/a_batch_submit_cluster.ini) with your desired config values; it is read by the Python DAG file.

### Step 4 - Create a Composer Environment (skip if it is already created)

Follow the [instructions](https://cloud.google.com/composer/docs/composer-2/create-environments) to create a Composer Environment.

### Step 5 - Set Composer Airflow variables

```console
export AIRFLOW_VARIABLE="gcloud composer environments run ${COMPOSER_ENV} \
--location ${COMPOSER_LOCATION} variables -- set"

$AIRFLOW_VARIABLE PROJECT_ID "${PROJECT_ID}" && \
$AIRFLOW_VARIABLE REGION "${REGION}"
```
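
Optionally, confirm that the variables were stored; a minimal check, assuming `variables -- get` is passed through to the Airflow CLI in the same way as `variables -- set` above.

```console
# Optional check: read back one of the Airflow variables that was just set.
gcloud composer environments run ${COMPOSER_ENV} \
    --location ${COMPOSER_LOCATION} variables -- get PROJECT_ID
```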

### Step 6 - Push the local python and conf files to DAGs folder

```console
export LOCAL_DAG_PYTHON="composer/src/dags/a_batch_submit_cluster.py"
export LOCAL_DAG_CONFIG="composer/src/dags/config/a_batch_submit_cluster.ini"

export DAGs_FOLDER=$(gcloud composer environments describe $COMPOSER_ENV \
    --location $COMPOSER_LOCATION \
--format="get(config.dagGcsPrefix)")

gsutil cp $LOCAL_DAG_PYTHON $DAGs_FOLDER/
gsutil cp $LOCAL_DAG_CONFIG $DAGs_FOLDER/config/
```
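
To confirm the upload, you can list the DAGs folder:

```console
# Optional check: confirm the DAG and its config reached the environment's bucket.
gsutil ls "${DAGs_FOLDER}/"
gsutil ls "${DAGs_FOLDER}/config/"
```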

### Result

The job will run as scheduled, creating a Dataproc cluster, running the Spark job, and deleting the cluster.
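
After a scheduled run, you can check the outcome from the command line; note that the ephemeral cluster only exists while the DAG run is in progress, so the jobs list is the more durable check.

```console
# List recent Dataproc jobs and clusters in the region; the ephemeral cluster
# appears only while the DAG run is in progress.
gcloud dataproc jobs list --region=${REGION}
gcloud dataproc clusters list --region=${REGION}
```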

### Code Snippets
All code snippets within this document are provided under the following terms.
```
Copyright 2022 Google. This software is provided as-is, without warranty or representation for any use or purpose. Your use of it is subject to your agreement with Google.
```
77 changes: 77 additions & 0 deletions spark-scala-quickstart/_docs/composer/B_batch-workflow.md
@@ -0,0 +1,77 @@
## Cloud Composer - Dataproc Workflow

This section guides you through creating an Apache Airflow DAG in Cloud Composer.
This example runs a job using a Dataproc Workflow.
Batch job example used: [GCS to GCS](../dataproc/2_batch-gcs-gcs.md).
The DAG used in this guide is located [here](../../composer/src/dags/b_batch_workflow.py).

### Step 1 - Setup GCP Environment Variables

```console
export PROJECT_ID="your_project_id"
export REGION="your_region"
```

### Step 2 - Setup Composer Variables

```console
export COMPOSER_ENV="your_composer-env"
export COMPOSER_LOCATION=${REGION}
```

### Step 3 - Setup DAG specific config

Update [composer/src/dags/config/b_batch_workflow.ini](../../composer/src/dags/config/b_batch_workflow.ini) with your desired config values; it is read by the Python DAG file.

Copy the workflow template from the dataproc folder to the composer folder:
```console
cp dataproc/batch-gcs-gcs/gcp-dataproc-workflow/batch-gcs-gcs-workflow.yaml composer/src/dags/config/batch-gcs-gcs-workflow.yaml
```
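
To sanity-check the template before wiring it into the DAG, you can instantiate it directly with gcloud; a minimal sketch, assuming the template's placeholders have already been filled in as described in the [GCS to GCS](../dataproc/2_batch-gcs-gcs.md) guide.

```console
# Optional check: instantiate the workflow template directly, outside of Airflow.
gcloud dataproc workflow-templates instantiate-from-file \
    --file=composer/src/dags/config/batch-gcs-gcs-workflow.yaml \
    --region=${REGION}
```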

### Step 4 - Create a Composer Environment (skip if it is already created)

Follow the [instructions](https://cloud.google.com/composer/docs/composer-2/create-environments) to create a Composer Environment.

### Step 5 - Install Python dependencies

```console
gcloud composer environments update ${COMPOSER_ENV} \
--update-pypi-package pyyaml \
--location ${COMPOSER_LOCATION}
```

### Step 6 - Set Composer Airflow variables

```console
export AIRFLOW_VARIABLE="gcloud composer environments run ${COMPOSER_ENV} \
--location ${COMPOSER_LOCATION} variables -- set"

$AIRFLOW_VARIABLE PROJECT_ID "${PROJECT_ID}" && \
$AIRFLOW_VARIABLE REGION "${REGION}"
```

### Step 7 - Push the local python, conf and yaml files to DAGs folder

```console
export LOCAL_DAG_PYTHON="composer/src/dags/b_batch_workflow.py"
export LOCAL_DAG_CONFIG="composer/src/dags/config/b_batch_workflow.ini"
export LOCAL_DAG_WORKFLOW="composer/src/dags/config/batch-gcs-gcs-workflow.yaml"

export DAGs_FOLDER=$(gcloud composer environments describe $COMPOSER_ENV \
--location $COMPOSER_LOCATION \
--format="get(config.dagGcsPrefix)")

gsutil cp $LOCAL_DAG_PYTHON $DAGs_FOLDER/
gsutil cp $LOCAL_DAG_CONFIG $DAGs_FOLDER/config/
gsutil cp $LOCAL_DAG_WORKFLOW $DAGs_FOLDER/config/
```

### Result

The job will run as scheduled, running a Dataproc Workflow.

### Code Snippets
All code snippets within this document are provided under the following terms.
```
Copyright 2022 Google. This software is provided as-is, without warranty or representation for any use or purpose. Your use of it is subject to your agreement with Google.
```
62 changes: 62 additions & 0 deletions spark-scala-quickstart/_docs/composer/C_batch-serverless.md
@@ -0,0 +1,62 @@
## Cloud Composer - Dataproc Serverless

This section guides you through creating an Apache Airflow DAG in Cloud Composer.
This example runs a job using Dataproc Serverless.
Batch job example used: [GCS to GCS](../dataproc/2_batch-gcs-gcs.md).
The DAG used in this guide is located [here](../../composer/src/dags/c_batch_serverless.py).

### Step 1 - Setup GCP Environment Variables (for Airflow)

```console
export PROJECT_ID="your_project_id"
export REGION="your_region"
```

### Step 2 - Setup Composer Variables

```console
export COMPOSER_ENV="your_composer-env"
export COMPOSER_LOCATION=${REGION}
```

### Step 3 - Setup DAG specific config

Update [composer/src/dags/config/c_batch_serverless.ini](../../composer/src/dags/config/c_batch_serverless.ini) with your desired config values; it is read by the Python DAG file.

### Step 4 - Create a Composer Environment (skip if it is already created)

Follow the [instructions](https://cloud.google.com/composer/docs/composer-2/create-environments) to create a Composer Environment.

### Step 5 - Set Composer Airflow variables

```console
export AIRFLOW_VARIABLE="gcloud composer environments run ${COMPOSER_ENV} \
--location ${COMPOSER_LOCATION} variables -- set"

$AIRFLOW_VARIABLE PROJECT_ID "${PROJECT_ID}" && \
$AIRFLOW_VARIABLE REGION "${REGION}"
```

### Step 6 - Push the local python and conf files to DAGs folder

```console
export LOCAL_DAG_PYTHON="composer/src/dags/c_batch_serverless.py"
export LOCAL_DAG_CONFIG="composer/src/dags/config/c_batch_serverless.ini"

export DAGs_FOLDER=$(gcloud composer environments describe $COMPOSER_ENV \
    --location $COMPOSER_LOCATION \
--format="get(config.dagGcsPrefix)")

gsutil cp $LOCAL_DAG_PYTHON $DAGs_FOLDER/
gsutil cp $LOCAL_DAG_CONFIG $DAGs_FOLDER/config/
```

### Result

The job will run as scheduled, running the Spark job with Dataproc Serverless.
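
Once a scheduled run has started, you can confirm the Serverless batch from the command line:

```console
# List Dataproc Serverless batch runs in the region.
gcloud dataproc batches list --region=${REGION}
```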

### Code Snippets
All code snippets within this document are provided under the following terms.
```
Copyright 2022 Google. This software is provided as-is, without warranty or representation for any use or purpose. Your use of it is subject to your agreement with Google.
```