# Easy Scalable Production ETL - TPC-H

People often want to run regular jobs in a data pipeline, on the cloud, at scale. However, many pipelines either don't scale well or are overly complex.
**This repository is a lightweight, scalable example pipeline that runs large Python jobs on a schedule in the cloud.**

We hope this example is easy to copy and modify for your needs.

## Data Pipelines

_How do I process large-scale data continuously in production?_

While specific details vary across use cases, we often see these four steps:

- Step 1: **Acquire raw data** from the real world
  - _Example:_ A customer buys a loaf of bread at your grocery store
  - _Example:_ A satellite measures sea temperature in Antarctica
- Step 2: **Regularly process** that raw data
  - _Example:_ Apply data quality cuts followed by a feature engineering step
  - _Example:_ Convert to a different storage format
  - _Extra requirement_: Retry on failures, run close to the data in the cloud, alert me if processing fails
- Step 3: **Scalably process** that data periodically
  - _Example:_ Large-scale query of your data -- "How many purchases were returned at our store in NYC?"
  - _Example:_ Train an ML model for weather forecasting
  - _Extra requirement_: Data is too large to fit on a single machine
- Step 4: **Publish results**
  - _Example:_ Serve a dashboard showing the latest query results

Running these steps on a regular cadence can be handled by most modern workflow management systems (e.g. Prefect, Dagster, Airflow, Argo). Managing cloud infrastructure and scalability is where most people tend to struggle.


## Pain points of common data pipelines

_Pipelines usually either don't scale well or are overly complex._

Most modern workflow management systems (e.g. Prefect, Dagster, Airflow, Argo) handle workflow orchestration well. Where we tend to see groups struggle is with:

- **Complexity** around managing cloud infrastructure:
  - Provisioning / deprovisioning cloud machines
  - Software environment management
  - Handling cloud data access
  - Easy access to logs
  - Cost monitoring and spending limits
- **Scale** limitations:
  - Scaling existing Python code across many cloud machines in parallel
  - Computing on larger-than-memory datasets (e.g. 100 TB Parquet dataset in S3)

Because of these issues, it's common for systems to be overly complex or very expensive.
Below we show an example that connects Prefect and Coiled to run a lightweight, scalable data pipeline on the cloud.


## Solution in this repository

_Coiled and Prefect together make it easy to run regularly scheduled jobs on the cloud at scale._

Our example data pipeline looks like the following:

- Step 1: **Data generation** — New [TPC-H dataset](https://www.tpc.org/tpch/) JSON files with customer orders and supplier information appear in an S3 bucket (every 15 minutes)
- Step 2: **Regular processing**
  - JSON gets transformed into Parquet / Delta (every 15 minutes)
  - Small Parquet files get compacted into larger ones for efficient downstream use (every 6 hours)
- Step 3: **Scalable processing** — Run a large-scale multi-table analysis query to monitor unshipped, high-revenue orders (every 24 hours)
- Step 4: **Serve dashboard** — Results from the latest business query are served on a dashboard (always on)

![ETL Pipeline](images/excalidraw.png)
_We combine Prefect's workflow orchestration with Coiled's easy cloud infrastructure._

We think this is a lightweight approach that addresses common pain points and works well for most people.

### Define tasks and hardware in Python

Data-intensive tasks, like those in [pipeline/preprocess.py](pipeline/preprocess.py), are combined with Coiled Functions for remote execution on cloud VMs.

Coiled makes it easy to deploy Prefect tasks on cloud hardware of our choosing.

```python
# pipeline/preprocess.py

import coiled
from prefect import task, flow
import pandas as pd

@task
@coiled.function(region="us-east-2", memory="64 GiB") # <--- Define hardware in script
def json_to_parquet(filename):
    df = pd.read_json(filename)  # <--- Use simple pandas functions
    df.to_parquet(OUTDIR / (filename.split(".")[-2] + ".parquet"))

@flow
def convert_all_files():
    files = list_files()
    json_to_parquet.map(files)  # <--- Map across many files at once
```
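
For a quick sanity check, the flow can also be invoked directly as a plain Python call; Prefect runs the flow locally while each `json_to_parquet` task still executes on a cloud VM through `coiled.function`. A minimal sketch (the `__main__` guard is hypothetical and not part of the file above):

```python
# Hypothetical local entry point; assumes OUTDIR and list_files() exist in pipeline/preprocess.py
if __name__ == "__main__":
    convert_all_files()  # one flow run now, instead of waiting for the schedule
```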


### Run jobs regularly

The file [workflow.py](workflow.py) defines a Prefect flow for each step in our pipeline and runs them regularly at different cadences.

Prefect makes it easy to schedule regular jobs and manage workflow orchestration.

```python
# workflow.py

from datetime import timedelta
from prefect import serve

preprocess = json_to_parquet.to_deployment(
    name="preprocess",
    interval=timedelta(minutes=15),  # <--- Run job on a regular schedule
)
reduce = query_reduce.to_deployment(
    name="reduce",
    interval=timedelta(hours=24),    # <--- Different jobs at different schedules
)

...
serve(
    preprocess,
    reduce,
)
```
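
Running the script is one way to start everything: `serve` starts a long-lived process that triggers each deployment on its schedule. (The `coiled prefect serve ... workflow.py` command at the end of this README does the same from a cloud VM.)

```bash
# Minimal local run (assumes Prefect is installed and AWS credentials are available)
python workflow.py
```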

### Scale with Dask Clusters

The multi-table analysis query in [pipeline/reduce.py](pipeline/reduce.py) uses Coiled to create an on-demand Dask cluster for large-scale processing.

```python
# pipeline/reduce.py

import coiled
import dask.dataframe as dd
from prefect import task

@task
def unshipped_orders_by_revenue(bucket):
    with coiled.Cluster(
        n_workers=200,           # <--- Ask for hundreds of workers
        region="us-west-2",      # <--- Run anywhere on any hardware
    ) as cluster:
        with cluster.get_client() as client:
            df = dd.read_parquet(bucket)
            result = ...         # Complex query
```
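
To make the `result = ...` placeholder a bit more concrete, a query for unshipped, high-revenue orders typically joins the orders and lineitem tables, filters on dates, and aggregates revenue. Below is a rough sketch with `dask.dataframe`, assuming TPC-H column names, a hypothetical bucket layout and cutoff date, and an already-connected cluster as in the snippet above; the real query lives in [pipeline/reduce.py](pipeline/reduce.py).

```python
# Illustrative only -- not the repository's actual query
import dask.dataframe as dd

bucket = "s3://example-bucket/tpch"   # assumed dataset location
cutoff = "1995-03-15"                 # assumed cutoff date

orders = dd.read_parquet(f"{bucket}/orders")
lineitem = dd.read_parquet(f"{bucket}/lineitem")

ordered = orders[orders.o_orderdate < cutoff]        # placed before the cutoff...
unshipped = lineitem[lineitem.l_shipdate > cutoff]   # ...but not shipped until after it

joined = unshipped.merge(ordered, left_on="l_orderkey", right_on="o_orderkey")
joined["revenue"] = joined.l_extendedprice * (1 - joined.l_discount)

result = (
    joined.groupby(["l_orderkey", "o_orderdate", "o_shippriority"])["revenue"]
    .sum()
    .nlargest(10)    # ten highest-revenue unshipped orders
    .compute()       # runs on the Dask cluster attached above
)
```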

## How to run

The pipeline can be served from a cloud VM with `coiled prefect serve`, forwarding the AWS credentials the flows need to reach S3:

```bash
coiled prefect serve \
    ...
    -e AWS_SECRET_ACCESS_KEY=$AWS_SECRET_ACCESS_KEY \
    workflow.py
```
