-
Notifications
You must be signed in to change notification settings - Fork 1.6k
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
[docs] add docs for PipesEMRContainers
- Loading branch information
1 parent
cecb481
commit 420f265
Showing
14 changed files
with
247 additions
and
46 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Binary file not shown.
Binary file not shown.
Binary file not shown.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
207 changes: 207 additions & 0 deletions
207
docs/content/concepts/dagster-pipes/aws-emr-containers.mdx
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,207 @@ | ||
--- | ||
title: "Integrating AWS EMR on EKS with Dagster Pipes | Dagster Docs" | ||
description: "Learn to integrate Dagster Pipes with AWS EMR Containers to launch external code from Dagster assets." | ||
--- | ||
|
||
# AWS EMR on EKS & Dagster Pipes | ||
|
||
This tutorial gives a short overview on how to use [Dagster Pipes](/concepts/dagster-pipes) with [AWS EMR on EKS](https://aws.amazon.com/emr/features/eks/) (the corresponding AWS API is called `emr-containers`). | ||
|
||
The [dagster-aws](/\_apidocs/libraries/dagster-aws) integration library provides the <PyObject object="PipesEMRContainersClient" module="dagster_aws.pipes" /> resource, which can be used to launch EMR jobs from Dagster assets and ops. Dagster can receive regular events such as logs, asset checks, or asset materializations from jobs launched with this client. Using it requires minimal code changes to your EMR jobs. | ||
|
||
--- | ||
|
||
## Prerequisites | ||
|
||
- **In the Dagster environment**, you'll need to: | ||
|
||
- Install the following packages: | ||
|
||
```shell | ||
pip install dagster dagster-webserver dagster-aws | ||
``` | ||
|
||
Refer to the [Dagster installation guide](/getting-started/install) for more info. | ||
|
||
- **AWS authentication credentials configured.** If you don't have this set up already, refer to the [boto3 quickstart](https://boto3.amazonaws.com/v1/documentation/api/latest/guide/quickstart.html). | ||
|
||
- **In AWS**: | ||
|
||
- An existing AWS account | ||
- A [EMR Virtual Cluster](https://docs.aws.amazon.com/emr/latest/EMR-on-EKS-DevelopmentGuide/virtual-cluster.html) set up | ||
|
||
--- | ||
|
||
## Step 1: Install the dagster-pipes module in your EMR environment | ||
|
||
There are [a few options](https://aws.github.io/aws-emr-containers-best-practices/submit-applications/docs/spark/pyspark/#python-code-with-python-dependencies) for deploying Python code & dependencies for PySpark jobs. In this tutorial, we are going to build a custom Docker image for this purpose. | ||
|
||
Install `dagster-pipes`, `dagster-aws` and `boto3` Python packages in your image: | ||
|
||
```Dockerfile,file=/guides/dagster/dagster_pipes/emr-containers/Dockerfile | ||
# start from EMR image | ||
FROM public.ecr.aws/emr-containers/spark/emr-7.2.0:latest | ||
USER root | ||
RUN python -m pip install dagster-pipes | ||
# copy the job script | ||
COPY . . | ||
USER hadoop | ||
``` | ||
|
||
<Note> | ||
It's also recommended to upgrade the default Python version included in the | ||
base EMR image (as it has been done in the `Dockerfile` above) | ||
</Note> | ||
|
||
--- | ||
|
||
We copy the EMR job script (`script.py`) to the image in the last step. | ||
|
||
## Step 2: Invoke dagster-pipes in the EMR job script | ||
|
||
Call `open_dagster_pipes` in the EMR script to create a context that can be used to send messages to Dagster: | ||
|
||
```python file=/guides/dagster/dagster_pipes/emr-containers/script.py | ||
import sys | ||
|
||
import boto3 | ||
from dagster_pipes import PipesS3MessageWriter, open_dagster_pipes | ||
from pyspark.sql import SparkSession | ||
|
||
|
||
def main(): | ||
s3_client = boto3.client("s3") | ||
with open_dagster_pipes( | ||
message_writer=PipesS3MessageWriter(client=s3_client), | ||
) as pipes: | ||
pipes.log.info("Hello from AWS EMR Containers!") | ||
|
||
spark = SparkSession.builder.appName("HelloWorld").getOrCreate() | ||
|
||
df = spark.createDataFrame( | ||
[(1, "Alice", 34), (2, "Bob", 45), (3, "Charlie", 56)], | ||
["id", "name", "age"], | ||
) | ||
|
||
# calculate a really important statistic | ||
avg_age = float(df.agg({"age": "avg"}).collect()[0][0]) | ||
|
||
# attach it to the asset materialization in Dagster | ||
pipes.report_asset_materialization( | ||
metadata={"average_age": {"raw_value": avg_age, "type": "float"}}, | ||
data_version="alpha", | ||
) | ||
|
||
print("Hello from stdout!") | ||
print("Hello from stderr!", file=sys.stderr) | ||
|
||
|
||
if __name__ == "__main__": | ||
main() | ||
``` | ||
|
||
<Note> | ||
It's best to use the `PipesS3MessageWriter` with EMR on EKS, because this | ||
message writer has the ability to capture the Spark driver logs and send them | ||
to Dagster. | ||
</Note> | ||
|
||
--- | ||
|
||
## Step 3: Create an asset using the PipesEMRcontainersClient to launch the job | ||
|
||
In the Dagster asset/op code, use the `PipesEMRcontainersClient` resource to launch the job: | ||
|
||
```python file=/guides/dagster/dagster_pipes/emr-containers/dagster_code.py startafter=start_asset_marker endbefore=end_asset_marker | ||
from dagster_aws.pipes import PipesEMRContainersClient | ||
|
||
import dagster as dg | ||
|
||
|
||
@dg.asset | ||
def emr_containers_asset( | ||
context: dg.AssetExecutionContext, | ||
pipes_emr_containers_client: PipesEMRContainersClient, | ||
): | ||
image = ( | ||
... | ||
) # it's likely the image can be taken from context.run_tags["dagster/image"] | ||
|
||
return pipes_emr_containers_client.run( | ||
context=context, | ||
start_job_run_params={ | ||
"releaseLabel": "emr-7.5.0-latest", | ||
"virtualClusterId": ..., | ||
"clientToken": context.run_id, # idempotency identifier for the job run | ||
"executionRoleArn": ..., | ||
"jobDriver": { | ||
"sparkSubmitJobDriver": { | ||
"entryPoint": "local:///app/script.py", | ||
# --conf spark.kubernetes.container.image= | ||
"sparkSubmitParameters": "--conf spark.kubernetes.file.upload.path=/tmp/spark --conf spark.kubernetes.container.image={}", # --conf spark.pyspark.python=/home/hadoop/.local/share/uv/python/cpython-3.9.16-linux-x86_64-gnu/bin/python --conf spark.pyspark.driver.python=/home/hadoop/.local/share/uv/python/cpython-3.9.16-linux-x86_64-gnu/bin/python", | ||
} | ||
}, | ||
"configurationOverrides": { | ||
"monitoringConfiguration": { | ||
"cloudWatchMonitoringConfiguration": { | ||
"logGroupName": "/aws/emr/containers/pipes", | ||
"logStreamNamePrefix": str(context.run_id), | ||
} | ||
}, | ||
}, | ||
}, | ||
).get_materialize_result() | ||
``` | ||
|
||
<Note> | ||
Setting `include_stdio_in_messages` to `True` in the `PipesS3MessageReader` | ||
will allow the driver logs to be forwarded to the Dagster process. | ||
</Note> | ||
|
||
Materializing this sasset will launch the AWS on EKS job and wait for it to complete. If the job fails, the Dagster process will raise an exception. If the Dagster process is interrupted while the job is still running, the job will be terminated. | ||
|
||
--- | ||
|
||
## Step 4: Create Dagster definitions | ||
|
||
Next, add the `PipesEMRContainersClient` resource to your project's <PyObject object="Definitions" /> object: | ||
|
||
```python file=/guides/dagster/dagster_pipes/emr-containers/dagster_code.py startafter=start_definitions_marker endbefore=end_definitions_marker | ||
import boto3 | ||
from dagster_aws.pipes import PipesS3ContextInjector, PipesS3MessageReader | ||
|
||
from dagster import Definitions | ||
|
||
defs = Definitions( | ||
assets=[emr_containers_asset], | ||
resources={ | ||
"pipes_emr_containers_client": PipesEMRContainersClient( | ||
message_reader=PipesS3MessageReader( | ||
client=boto3.client("s3"), | ||
include_stdio_in_messages=True, | ||
), | ||
) | ||
}, | ||
) | ||
``` | ||
|
||
Dagster will now be able to launch the AWS EMR Containers job from the `emr_containers_asset` asset, and receive logs and events from the job. If `include_stdio_in_messages` is set to `True`, the logs will be forwarded to the Dagster process. | ||
|
||
--- | ||
|
||
## Related | ||
|
||
<ArticleList> | ||
<ArticleListItem | ||
title="Dagster Pipes" | ||
href="/concepts/dagster-pipes" | ||
></ArticleListItem> | ||
<ArticleListItem | ||
title="AWS EMR Containers API reference" | ||
href="/_apidocs/libraries/dagster-aws#dagster_aws.pipes.PipesEMRContainersClient" | ||
></ArticleListItem> | ||
</ArticleList> |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Binary file not shown.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Oops, something went wrong.