diff --git a/dashboard.py b/dashboard.py index c54f0a4..5ba391b 100644 --- a/dashboard.py +++ b/dashboard.py @@ -1,62 +1,70 @@ -import dask.dataframe as dd -import plotly.express as px +import time + +import pandas as pd import streamlit as st from pipeline.settings import RESULTS_DIR @st.cache_data -def get_data(region, part_type): - return dd.read_parquet( - RESULTS_DIR / region / part_type.upper() / "*.parquet" - ).compute() - +def get_data(segment): + return pd.read_parquet(RESULTS_DIR / f"{segment.lower()}.snappy.parquet") -description = """ -### Recommended Suppliers -_Some text that explains the business problem being addressed..._ -This query finds which supplier should be selected to place an order for a given part in a given region. +st.markdown( + """ +### Top Unshipped Orders +_Top 50 unshipped orders with the highest revenue._ """ -st.markdown(description) -regions = list(map(str.title, ["EUROPE", "AFRICA", "AMERICA", "ASIA", "MIDDLE EAST"])) -region = st.selectbox( - "Region", - regions, - index=None, - placeholder="Please select a region...", ) -part_types = list(map(str.title, ["COPPER", "BRASS", "TIN", "NICKEL", "STEEL"])) -part_type = st.selectbox( - "Part Type", - part_types, + +SEGMENTS = ["automobile", "building", "furniture", "machinery", "household"] + + +def files_exist(): + # Do we have all the files needed for the dashboard? + files = list(RESULTS_DIR.rglob("*.snappy.parquet")) + return len(files) == len(SEGMENTS) + + +with st.spinner("Waiting for data..."): + while not files_exist(): + time.sleep(5) + +segments = list( + map(str.title, ["automobile", "building", "furniture", "machinery", "household"]) +) +segment = st.selectbox( + "Segment", + segments, index=None, - placeholder="Please select a part type...", + placeholder="Please select a product segment...", ) -if region and part_type: - df = get_data(region, part_type) +if segment: + df = get_data(segment) + df = df.drop(columns="o_shippriority") + df["l_orderkey"] = df["l_orderkey"].map(lambda x: f"{x:09}") + df["revenue"] = df["revenue"].round(2) df = df.rename( columns={ - "n_name": "Country", - "s_name": "Supplier", - "s_acctbal": "Balance", - "p_partkey": "Part ID", + "l_orderkey": "Order ID", + "o_order_time": "Date Ordered", + "revenue": "Revenue", } ) - maxes = df.groupby("Country").Balance.idxmax() - data = df.loc[maxes] - figure = px.choropleth( - data, - locationmode="country names", - locations="Country", - featureidkey="Supplier", - color="Balance", - color_continuous_scale="viridis", - hover_data=["Country", "Supplier", "Balance"], + + df = df.set_index("Order ID") + st.dataframe( + df.style.format({"Revenue": "${:,}"}), + column_config={ + "Date Ordered": st.column_config.DateColumn( + "Date Ordered", + format="MM/DD/YYYY", + help="Date order was placed", + ), + "Revenue": st.column_config.NumberColumn( + "Revenue (in USD)", + help="Total revenue of order", + ), + }, ) - st.plotly_chart(figure, theme="streamlit", use_container_width=True) - on = st.toggle("Show data") - if on: - st.write( - df[["Country", "Supplier", "Balance", "Part ID"]], use_container_width=True - ) diff --git a/environment.yml b/environment.yml index b87a748..aadc382 100644 --- a/environment.yml +++ b/environment.yml @@ -17,4 +17,8 @@ dependencies: - s3fs - universal_pathlib <0.2.0 - boto3 - - dask-deltatable + - deltalake=0.15.3 + # - dask-deltatable + - pip + - pip: + - git+https://github.com/fjetter/dask-deltatable.git@dask_expr diff --git a/pipeline/dashboard.py b/pipeline/dashboard.py new file mode 100644 index 0000000..6d72c3b --- /dev/null +++ b/pipeline/dashboard.py @@ -0,0 +1,61 @@ +import os +import shlex +import subprocess + +import coiled +import requests +from prefect import flow +from rich import print + +from .settings import DASHBOARD_FILE, LOCAL, REGION + +port = 8080 +name = "etl-tpch-dashboard" +subdomain = "etl-tpch" + + +def deploy(): + print("[green]Deploying dashboard...[/green]") + cmd = f"streamlit run {DASHBOARD_FILE} --server.port {port} --server.headless true" + if LOCAL: + subprocess.Popen(shlex.split(cmd), stdout=subprocess.PIPE) + else: + cmd = f""" + coiled run \ + --region {REGION} \ + --vm-type t3.medium \ + -f dashboard.py \ + -f pipeline \ + --subdomain {subdomain} \ + --port {port} \ + -e AWS_ACCESS_KEY_ID={os.environ['AWS_ACCESS_KEY_ID']} \ + -e AWS_SECRET_ACCESS_KEY={os.environ['AWS_SECRET_ACCESS_KEY']} \ + --detach \ + --keepalive '520 weeks' \ + --name {name} \ + -- \ + {cmd} + """ + subprocess.run(shlex.split(cmd)) + print(f"Dashboard is available at [blue]{get_address()}[/blue] :rocket:") + + +def get_address(): + if LOCAL: + return f"http://0.0.0.0:{port}" + else: + with coiled.Cloud() as cloud: + account = cloud.default_account + return f"http://{subdomain}.{account}.dask.host:{port}" + + +@flow(log_prints=True) +def deploy_dashboard(): + address = get_address() + try: + r = requests.get(address) + r.raise_for_status() + except Exception: + deploy() + else: + print("Dashboard is healthy") diff --git a/pipeline/data.py b/pipeline/data.py index b22db73..93f1278 100644 --- a/pipeline/data.py +++ b/pipeline/data.py @@ -1,8 +1,10 @@ import datetime import os +import uuid import coiled import duckdb +import pandas as pd import psutil from dask.distributed import print from prefect import flow, task @@ -10,12 +12,18 @@ from .settings import LOCAL, PROCESSED_DIR, REGION, STAGING_DIR, fs, lock_generate +def new_time(t, t_start=None, t_end=None): + + d = pd.Timestamp("1998-12-31") - pd.Timestamp("1992-01-01") + return t_start + (t - pd.Timestamp("1992-01-01")) * ((t_end - t_start) / d) + + @task(log_prints=True) @coiled.function( name="data-generation", local=LOCAL, region=REGION, - keepalive="5 minutes", + vm_type="m6i.2xlarge", tags={"workflow": "etl-tpch"}, ) def generate(scale: float, path: os.PathLike) -> None: @@ -42,7 +50,8 @@ def generate(scale: float, path: os.PathLike) -> None: .arrow() .column("table_name") ) - for table in map(str, tables): + now = pd.Timestamp.now() + for table in reversed(sorted(map(str, tables))): if table in static_tables and ( list((STAGING_DIR / table).rglob("*.json")) or list((PROCESSED_DIR / table).rglob("*.parquet")) @@ -51,7 +60,41 @@ def generate(scale: float, path: os.PathLike) -> None: continue print(f"Exporting table: {table}") stmt = f"""select * from {table}""" - df = con.sql(stmt).arrow() + df = con.sql(stmt).df() + + # Make order IDs unique across multiple data generation cycles + if table == "orders": + # Generate new, random uuid order IDs + df["o_orderkey_new"] = pd.Series( + (uuid.uuid4().hex for _ in range(df.shape[0])), + dtype="string[pyarrow]", + ) + orderkey_new = df[["o_orderkey", "o_orderkey_new"]].set_index( + "o_orderkey" + ) + df = df.drop(columns="o_orderkey").rename( + columns={"o_orderkey_new": "o_orderkey"} + ) + elif table == "lineitem": + # Join with `orderkey_new` mapping to convert old order IDs to new order IDs + df = ( + df.set_index("l_orderkey") + .join(orderkey_new) + .reset_index(drop=True) + .rename(columns={"o_orderkey_new": "l_orderkey"}) + ) + + # Shift times to be more recent + if table == "lineitem": + df["l_shipdate"] = new_time( + df["l_shipdate"], t_start=now, t_end=now + pd.Timedelta("7 days") + ) + df = df.rename(columns={"l_shipdate": "l_ship_time"}) + cols = [c for c in df.columns if "date" in c] + df[cols] = new_time( + df[cols], t_start=now - pd.Timedelta("15 minutes"), t_end=now + ) + df = df.rename(columns={c: c.replace("date", "_time") for c in cols}) outfile = ( path @@ -59,7 +102,7 @@ def generate(scale: float, path: os.PathLike) -> None: / f"{table}_{datetime.datetime.now().isoformat().split('.')[0]}.json" ) fs.makedirs(outfile.parent, exist_ok=True) - df.to_pandas().to_json( + df.to_json( outfile, date_format="iso", orient="records", @@ -73,7 +116,6 @@ def generate(scale: float, path: os.PathLike) -> None: def generate_data(): with lock_generate: generate( - scale=0.01, + scale=1, path=STAGING_DIR, ) - generate.fn.client.restart(wait_for_workers=False) diff --git a/pipeline/monitor.py b/pipeline/monitor.py deleted file mode 100644 index 9ef2c76..0000000 --- a/pipeline/monitor.py +++ /dev/null @@ -1,9 +0,0 @@ -import requests -from prefect import flow - - -@flow -def check_model_endpoint(): - r = requests.get("http://0.0.0.0:8080/health") - if not r.json() == ["ok"]: - raise ValueError("Model endpoint isn't healthy") diff --git a/pipeline/preprocess.py b/pipeline/preprocess.py index 2c15cf6..0ed1e57 100644 --- a/pipeline/preprocess.py +++ b/pipeline/preprocess.py @@ -7,7 +7,6 @@ from prefect.tasks import exponential_backoff from .settings import ( - ARCHIVE_DIR, LOCAL, PROCESSED_DIR, REGION, @@ -29,7 +28,7 @@ name="data-etl", local=LOCAL, region=REGION, - keepalive="5 minutes", + vm_type="m6i.2xlarge", tags={"workflow": "etl-tpch"}, ) def json_file_to_parquet(file): @@ -42,19 +41,10 @@ def json_file_to_parquet(file): deltalake.write_deltalake( outfile, data, mode="append", storage_options=storage_options ) - print(f"Saved {outfile}") + fs.rm(str(file)) return file -@task -def archive_json_file(file): - outfile = ARCHIVE_DIR / file.relative_to(STAGING_DIR) - fs.makedirs(outfile.parent, exist_ok=True) - fs.mv(str(file), str(outfile)) - - return outfile - - def list_new_json_files(): return list(STAGING_DIR.rglob("*.json")) @@ -63,18 +53,16 @@ def list_new_json_files(): def json_to_parquet(): with lock_json_to_parquet: files = list_new_json_files() - files = json_file_to_parquet.map(files) - futures = archive_json_file.map(files) + futures = json_file_to_parquet.map(files) for f in futures: - print(f"Archived {str(f.result())}") + print(f"Processed {str(f.result())}") @task(log_prints=True) @coiled.function( - name="data-etl", local=LOCAL, region=REGION, - keepalive="5 minutes", + vm_type="m6i.xlarge", tags={"workflow": "etl-tpch"}, ) def compact(table): diff --git a/pipeline/reduce.py b/pipeline/reduce.py index cd9dd0b..4cf5fe0 100644 --- a/pipeline/reduce.py +++ b/pipeline/reduce.py @@ -1,8 +1,9 @@ import functools -import itertools import coiled -import dask_deltatable +import dask +import dask_deltatable as ddt +import pandas as pd from dask.distributed import LocalCluster from prefect import flow, task @@ -16,9 +17,11 @@ storage_options, ) +dask.config.set({"dataframe.query-planning": True}) + @task -def save_query(region, part_type): +def unshipped_orders_by_revenue(segment): if LOCAL: cluster = LocalCluster @@ -30,101 +33,46 @@ def save_query(region, part_type): n_workers=10, tags={"workflow": "etl-tpch"}, shutdown_on_close=False, - idle_timeout="5 minutes", + idle_timeout="1 minute", wait_for_workers=True, ) with cluster() as cluster: with cluster.get_client(): - size = 15 - region_ds = dask_deltatable.read_deltalake( - str(PROCESSED_DIR / "region"), - delta_storage_options=storage_options, - ) - nation_filtered = dask_deltatable.read_deltalake( - str(PROCESSED_DIR / "nation"), + lineitem_ds = ddt.read_deltalake( + str(PROCESSED_DIR / "lineitem"), delta_storage_options=storage_options, ) - supplier_filtered = dask_deltatable.read_deltalake( - str(PROCESSED_DIR / "supplier"), + orders_ds = ddt.read_deltalake( + str(PROCESSED_DIR / "orders"), delta_storage_options=storage_options, ) - part_filtered = dask_deltatable.read_deltalake( - str(PROCESSED_DIR / "part"), delta_storage_options=storage_options - ) - partsupp_filtered = dask_deltatable.read_deltalake( - str(PROCESSED_DIR / "partsupp"), + customer_ds = ddt.read_deltalake( + str(PROCESSED_DIR / "customer"), delta_storage_options=storage_options, ) - region_filtered = region_ds[(region_ds["r_name"] == region.upper())] - r_n_merged = nation_filtered.merge( - region_filtered, - left_on="n_regionkey", - right_on="r_regionkey", - how="inner", - ) - s_r_n_merged = r_n_merged.merge( - supplier_filtered, - left_on="n_nationkey", - right_on="s_nationkey", - how="inner", - ) - ps_s_r_n_merged = s_r_n_merged.merge( - partsupp_filtered, - left_on="s_suppkey", - right_on="ps_suppkey", - how="inner", - ) - part_filtered = part_filtered[ - (part_filtered["p_size"] == size) - & (part_filtered["p_type"].astype(str).str.endswith(part_type.upper())) - ] - merged_df = part_filtered.merge( - ps_s_r_n_merged, left_on="p_partkey", right_on="ps_partkey", how="inner" - ) - min_values = ( - merged_df.groupby("p_partkey")["ps_supplycost"].min().reset_index() - ) - min_values.columns = ["P_PARTKEY_CPY", "MIN_SUPPLYCOST"] - merged_df = merged_df.merge( - min_values, - left_on=["p_partkey", "ps_supplycost"], - right_on=["P_PARTKEY_CPY", "MIN_SUPPLYCOST"], - how="inner", - ) - + date = pd.Timestamp.now() + lsel = lineitem_ds.l_ship_time > date + osel = orders_ds.o_order_time < date + csel = customer_ds.c_mktsegment == segment.upper() + flineitem = lineitem_ds[lsel] + forders = orders_ds[osel] + fcustomer = customer_ds[csel] + jn1 = fcustomer.merge(forders, left_on="c_custkey", right_on="o_custkey") + jn2 = jn1.merge(flineitem, left_on="o_orderkey", right_on="l_orderkey") + jn2["revenue"] = jn2.l_extendedprice * (1 - jn2.l_discount) + total = jn2.groupby(["l_orderkey", "o_order_time", "o_shippriority"])[ + "revenue" + ].sum() result = ( - merged_df[ - [ - "s_acctbal", - "s_name", - "n_name", - "p_partkey", - "p_mfgr", - "s_address", - "s_phone", - "s_comment", - ] + total.reset_index() + .sort_values(["revenue"], ascending=False) + .head(50, compute=False)[ + ["l_orderkey", "revenue", "o_order_time", "o_shippriority"] ] - .sort_values( - by=[ - "s_acctbal", - "n_name", - "s_name", - "p_partkey", - ], - ascending=[ - False, - True, - True, - True, - ], - ) - .head(100) - ) - - outfile = RESULTS_DIR / region / part_type / "result.snappy.parquet" + ).compute() + outfile = RESULTS_DIR / f"{segment}.snappy.parquet" fs.makedirs(outfile.parent, exist_ok=True) result.to_parquet(outfile, compression="snappy") @@ -132,7 +80,6 @@ def save_query(region, part_type): @flow def query_reduce(): with lock_compact: - regions = ["europe", "africa", "america", "asia", "middle east"] - part_types = ["copper", "brass", "tin", "nickel", "steel"] - for region, part_type in itertools.product(regions, part_types): - save_query(region, part_type) + segments = ["automobile", "building", "furniture", "machinery", "household"] + for segment in segments: + unshipped_orders_by_revenue(segment) diff --git a/pipeline/settings.py b/pipeline/settings.py index 137f5a7..e9dced8 100644 --- a/pipeline/settings.py +++ b/pipeline/settings.py @@ -1,3 +1,5 @@ +import os + import boto3 import fsspec import yaml @@ -18,14 +20,18 @@ bucket = str(ROOT).replace("s3://", "").split("/")[0] resp = boto3.client("s3").get_bucket_location(Bucket=bucket) REGION = resp["LocationConstraint"] or "us-east-1" - storage_options = {"AWS_REGION": REGION, "AWS_S3_ALLOW_UNSAFE_RENAME": "true"} + storage_options = { + "AWS_ACCESS_KEY_ID": os.environ["AWS_ACCESS_KEY_ID"], + "AWS_SECRET_ACCESS_KEY": os.environ["AWS_SECRET_ACCESS_KEY"], + "AWS_REGION": REGION, + "AWS_S3_ALLOW_UNSAFE_RENAME": "true", + } STAGING_DIR = ROOT / "staging" # Input JSON files PROCESSED_DIR = ROOT / "processed" # Processed Parquet files RESULTS_DIR = ROOT / "results" # Reduced/aggrgated results ARCHIVE_DIR = ROOT / "archive" # Archived JSON files -MODEL_FILE = ROOT.parent / "model.json" -MODEL_SERVER_FILE = ROOT.parent / "serve_model.py" +DASHBOARD_FILE = Path(__file__).parent.parent / "dashboard.py" lock_dir = Path(__file__).parent.parent / ".locks" lock_generate = FileLock(lock_dir / "generate.lock") diff --git a/pipeline/train.py b/pipeline/train.py deleted file mode 100644 index e96c557..0000000 --- a/pipeline/train.py +++ /dev/null @@ -1,51 +0,0 @@ -import tempfile - -import coiled -import pandas as pd -import xgboost as xgb -from prefect import flow, task - -from .settings import LOCAL, MODEL_FILE, REGION, RESULTS_DIR, Path, fs - - -@task -@coiled.function( - name="train", - local=LOCAL, - region=REGION, - tags={"workflow": "etl-tpch"}, -) -def train(file): - df = pd.read_parquet(file) - X = df[["p_partkey", "s_acctbal"]] - y = df["n_name"].map( - {"GERMANY": 0, "ROMANIA": 1, "RUSSIA": 2, "FRANCE": 3, "UNITED KINGDOM": 4} - ) - model = xgb.XGBClassifier() - if MODEL_FILE.exists(): - with fs.open(MODEL_FILE, mode="rb") as f: - model.load_model(bytearray(f.read())) - model.fit(X, y) - - # `save_model` only accepts local file paths - with tempfile.TemporaryDirectory() as tmpdir: - out = Path(tmpdir) / "model.json" - model.save_model(out) - fs.makedirs(MODEL_FILE.parent, exist_ok=True) - fs.put(out, MODEL_FILE) - return model - - -def list_training_data_files(): - data_dir = RESULTS_DIR / "europe" / "brass" - return list(data_dir.rglob("*.parquet")) - - -@flow(log_prints=True) -def update_model(): - files = list_training_data_files() - if not files: - print("No training data available") - return - train(files[0]) - print(f"Updated model at {MODEL_FILE}") diff --git a/serve_model.py b/serve_model.py deleted file mode 100644 index a99a9df..0000000 --- a/serve_model.py +++ /dev/null @@ -1,49 +0,0 @@ -from pydoc import locate - -import numpy as np -import uvicorn -import xgboost as xgb -from fastapi import FastAPI -from pydantic import BaseModel - -from pipeline.settings import MODEL_FILE, fs - -model = xgb.XGBClassifier() -with fs.open(MODEL_FILE, mode="rb") as f: - model.load_model(bytearray(f.read())) - - -def create_type_instance(type_name: str): - return locate(type_name).__call__() - - -def get_features_dict(model): - feature_names = model.get_booster().feature_names - feature_types = list(map(create_type_instance, model.get_booster().feature_types)) - return dict(zip(feature_names, feature_types)) - - -def create_input_features_class(model): - return type("InputFeatures", (BaseModel,), get_features_dict(model)) - - -InputFeatures = create_input_features_class(model) -app = FastAPI() - - -@app.get("/predict", response_model=list) -async def predict_post(datas: list[InputFeatures]): - return model.predict( - np.asarray([list(data.__dict__.values()) for data in datas]) - ).tolist() - - -@app.get("/health") -async def service_health(): - """Return service health""" - return {"ok"} - - -if __name__ == "__main__": - print(get_features_dict(model)) - uvicorn.run(app, host="0.0.0.0", port=8080) diff --git a/workflow.py b/workflow.py index 318c59d..3d2669f 100644 --- a/workflow.py +++ b/workflow.py @@ -2,43 +2,38 @@ from prefect import serve +from pipeline.dashboard import deploy_dashboard from pipeline.data import generate_data -from pipeline.monitor import check_model_endpoint from pipeline.preprocess import compact_tables, json_to_parquet from pipeline.reduce import query_reduce -from pipeline.train import update_model if __name__ == "__main__": + data = generate_data.to_deployment( name="generate_data", - interval=timedelta(seconds=30), + interval=timedelta(minutes=15), ) preprocess = json_to_parquet.to_deployment( name="preprocess", - interval=timedelta(minutes=1), + interval=timedelta(minutes=15), ) compact = compact_tables.to_deployment( name="compact", - interval=timedelta(minutes=30), + interval=timedelta(hours=6), ) reduce = query_reduce.to_deployment( name="reduce", - interval=timedelta(hours=1), + interval=timedelta(days=1), ) - train = update_model.to_deployment( - name="train", - interval=timedelta(hours=6), + dashboard = deploy_dashboard.to_deployment( + name="dashboard", + interval=timedelta(minutes=5), ) - monitor = check_model_endpoint.to_deployment( - name="monitor", - interval=timedelta(minutes=1), - ) - + deploy_dashboard() serve( data, preprocess, compact, reduce, - train, - # monitor, + dashboard, )