From 65a8672bca05de9f6f2076a55ab0d55ae6279a86 Mon Sep 17 00:00:00 2001
From: Matthew Rocklin
Date: Tue, 12 Mar 2024 14:11:59 -0500
Subject: [PATCH] Matt's edits

---
 README.md | 102 ++++++++++++++++++++++++++----------------------------
 1 file changed, 50 insertions(+), 52 deletions(-)

diff --git a/README.md b/README.md
index 8b6c5d4..edc11f4 100644
--- a/README.md
+++ b/README.md
@@ -1,25 +1,23 @@
-# Example Production ETL System - TPC-H
+# Easy Scalable Production ETL - TPC-H
 
-People often want to run regular jobs in a data pipeline, on the cloud, at scale. However, many pipelines either don't scale well or are overly complex.
+**This repository is a lightweight, scalable example pipeline that runs large Python jobs on a schedule in the cloud.**
 
-**This repository contains a lightweight, scalable example data pipeline that easily runs regular Python jobs on the cloud.**
+We hope this example is easy to copy and modify for your needs.
 
-We hope this example demonstrates a path for addressing common pain points we see in real-world applications, and can serve as a template for your own use case.
+## Data Pipelines
 
-## Common data pipeline steps
-
-_People want to run regular Python jobs._
+*How do I process large-scale data continuously in production?*
 
 While specific details vary across use cases, we often see these four steps:
 
-- Step 1: **Real world generates raw data**
+- Step 1: **Acquire raw data** from the real world
   - _Example:_ A customer buys a loaf of bread at your grocery store
   - _Example:_ A satellite measures sea temperature in Antarctica
-- Step 2: **Regularly process that raw data**
+- Step 2: **Regularly process** that raw data
   - _Example:_ Apply data quality cuts followed by a feature engineering step
   - _Example:_ Convert to a different storage format
   - _Extra requirement_: Retry on failures, run close to data in the cloud, alert me if processing fails
-- Step 3: **Address business needs using your data**
+- Step 3: **Scalably process** that data periodically
   - _Example:_ Large scale query of your data -- "How many purchases were returned at our store in NYC?"
   - _Example:_ Train an ML model for weather forecasting
   - _Extra requirement_: Data is too large to fit on a single machine
@@ -30,19 +28,19 @@ While specific details vary across use cases, we often see these four steps:
 Running these steps on a regular cadence can be handled by most modern workflow management systems (e.g. Prefect, Dagster, Airflow, Argo). Managing cloud infrastructure and scalability is where most people tend to struggle.
 
-## Common data pipeline pain points
+## Pain points of common data pipelines
 
-_Many pipelines either don't scale well or are overly complex._
+_Pipelines usually either don't scale well or are overly complex._
 
 Most modern workflow management systems (e.g. Prefect, Dagster, Airflow, Argo) are able to address workflow orchestration needs well. Instead, where we tend to see groups struggle is with:
 
-- **Complexity around managing cloud infrastructure**:
+- **Complexity** around managing cloud infrastructure:
   - Provisioning / deprovisioning cloud machines
   - Software environment management
   - Handling cloud data access
   - Easy access to logs
   - Cost monitoring and spending limits
-- **Lack of scalability**:
+- **Scale** limitations:
   - Scaling existing Python code across many cloud machines in parallel
   - Computing on larger-than-memory datasets (e.g. 100 TB Parquet dataset in S3)
 
@@ -51,17 +49,17 @@ Because of these issues, it's common for systems to be overly complex or very ex
 
 Below we show an example that connects Prefect and Coiled to run a lightweight, scalable data pipeline on the cloud.
 
-## Example scalable pipeline
+## Solution in this repository
 
-_It's easy to run regularly scheduled jobs on the cloud at scale with Coiled and Prefect._
+_Coiled and Prefect together make it easy to run regularly scheduled jobs on the cloud at scale._
 
 Our example data pipeline looks like the following:
 
 - Step 1: **Data generation** — New [TPC-H dataset](https://www.tpc.org/tpch/) JSON files with customer orders and supplier information appear in an S3 bucket (every 15 minutes)
-- Step 2: **Data processing**
+- Step 2: **Regular processing**
   - JSON gets transformed into Parquet / Delta (every 15 minutes)
   - Data compaction of small Parquet files into larger ones for efficient downstream usage (every 6 hours)
-- Step 3: **Business query** — Run large scale multi-table analysis query to monitor unshipped, high-revenue orders (every 24 hours)
+- Step 3: **Scalable processing** — Run a large-scale, multi-table analysis query to monitor unshipped, high-revenue orders (every 24 hours)
 - Step 4: **Serve dashboard** — Results from the latest business query are served on a dashboard (always on)
 
 ![ETL Pipeline](images/excalidraw.png)
 
@@ -72,7 +70,33 @@ _We combine Prefect's workflow orchestration with Coiled's easy cloud infrastruc
 
 We think this is a lightweight approach that addresses common pain points and works well for most people.
 
-### Prefect for workflow orchestration
+### Define tasks and hardware in Python
+
+Data-intensive tasks, like those in [pipeline/preprocess.py](pipeline/preprocess.py), are combined with Coiled Functions for remote execution on cloud VMs.
+
+Coiled makes it easy to deploy Prefect tasks on cloud hardware of our choosing.
+
+```python
+# pipeline/preprocess.py
+
+import coiled
+from prefect import task, flow
+import pandas as pd
+
+@task
+@coiled.function(region="us-east-2", memory="64 GiB")  # <--- Define hardware in script
+def json_to_parquet(filename):
+    df = pd.read_json(filename)  # <--- Use simple pandas functions
+    df.to_parquet(OUTDIR / filename.split(".")[-2] + ".parquet")
+
+@flow
+def convert_all_files():
+    files = list_files()
+    json_to_parquet.map(files)  # <--- Map across many files at once
+```
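+
+We don't show the 6-hourly compaction job here. A rough sketch of the same pattern (the function name, paths, and sizes below are illustrative, not the repository's actual code) might look like:
+
+```python
+# Illustrative sketch only -- not the repository's actual compaction code
+import coiled
+import dask.dataframe as dd
+from prefect import task
+
+@task
+@coiled.function(region="us-east-2", memory="64 GiB")  # <--- Same pattern as json_to_parquet above
+def compact_partitions(directory):
+    # Read the many small Parquet files written every 15 minutes...
+    df = dd.read_parquet(directory)
+    # ...and rewrite them as fewer, larger files for efficient downstream reads
+    df.repartition(partition_size="128MiB").to_parquet(f"{directory}-compacted")
+```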
+
+
+### Run jobs regularly
 
 The file [workflow.py](workflow.py) defines a Prefect flow for each step in our pipeline and runs them regularly at different cadences.
 
 Prefect makes it easy to schedule regular jobs and manage workflow orchestration.
 
@@ -84,17 +108,13 @@
 ```python
 # workflow.py
 
 from datetime import timedelta
 from prefect import serve
 
-data = generate_data.to_deployment(
-    name="generate_data",
-    interval=timedelta(minutes=15),
-)
 preprocess = json_to_parquet.to_deployment(
     name="preprocess",
-    interval=timedelta(minutes=15),
+    interval=timedelta(minutes=15),  # <--- Run job on a regular schedule
 )
 reduce = query_reduce.to_deployment(
     name="reduce",
-    interval=timedelta(hours=24),
+    interval=timedelta(hours=24),  # <--- Different jobs at different schedules
 )
 
 ...
 
 serve(
     ...
 )
 ```
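+
+The always-on dashboard from Step 4 simply serves whatever the large-scale query (shown in the next section) last produced. We don't include the dashboard code in this README; a minimal sketch (illustrative only; it assumes a small Streamlit app and a made-up results path) could look like:
+
+```python
+# dashboard.py -- illustrative sketch, not the repository's actual dashboard
+import pandas as pd
+import streamlit as st
+
+st.title("Unshipped, high-revenue orders")
+df = pd.read_parquet("results/unshipped_orders.parquet")  # <--- Hypothetical output written by the reduce job
+st.dataframe(df)
+```
+
+Something like `streamlit run dashboard.py` keeps it running and re-reads the results file each time the page is loaded.
+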
-### Coiled Functions for deploying Prefect tasks
-
-Data-intensive tasks, like those in [pipeline/preprocess.py](pipeline/preprocess.py), are combined with Coiled Functions for remote execution on cloud VMs.
-
-Coiled makes it easy to deploy Prefect tasks on cloud hardware of our choosing.
-
-```python
-# pipeline/preprocess.py
-
-import coiled
-from prefect import task, flow
-import pandas as pd
-
-@task
-@coiled.function(region="us-east-2", memory="64 GiB")
-def json_to_parquet(filename):
-    df = pd.read_json(filename)
-    df.to_parquet(OUTDIR / filename.split(".")[-2] + ".parquet")
-
-@flow
-def convert_all_files():
-    files = list_files()
-    json_to_parquet.map(files)
-```
-### Dask Clusters for large-scale jobs
+### Scale with Dask Clusters
 
 The large-scale, multi-table analysis query in [pipeline/reduce.py](pipeline/reduce.py) uses Coiled to create an on-demand Dask cluster to handle this large-scale processing.
 
@@ -147,7 +143,10 @@
 ```python
 ...
 import dask.dataframe as dd
 
 @task
 def unshipped_orders_by_revenue(bucket):
-    with coiled.Cluster(n_workers=20, region="us-west-2") as cluster:
+    with coiled.Cluster(
+        n_workers=200,  # <--- Ask for hundreds of workers
+        region="us-west-2",  # <--- Run anywhere on any hardware
+    ) as cluster:
         with cluster.get_client() as client:
             df = dd.read_parquet(bucket)
             result = ...  # Complex query
@@ -188,4 +187,3 @@ coiled prefect serve \
 -e AWS_SECRET_ACCESS_KEY=$AWS_SECRET_ACCESS_KEY \
 workflow.py
 ```
-