Update workflow #24

Merged 3 commits on Mar 12, 2024
Changes from 1 commit
102 changes: 57 additions & 45 deletions dashboard.py
@@ -1,62 +1,74 @@
import dask.dataframe as dd
import plotly.express as px
import datetime
import time

import pandas as pd
import streamlit as st

from pipeline.settings import RESULTS_DIR


@st.cache_data
def get_data(region, part_type):
return dd.read_parquet(
RESULTS_DIR / region / part_type.upper() / "*.parquet"
).compute()

def get_data(segment):
return pd.read_parquet(RESULTS_DIR / f"{segment.lower()}.snappy.parquet")

description = """
### Recommended Suppliers
_Some text that explains the business problem being addressed..._

This query finds which supplier should be selected to place an order for a given part in a given region.
st.markdown(
"""
### Top Unshipped Orders
_Top 50 unshipped orders with the highest revenue._
"""
st.markdown(description)
regions = list(map(str.title, ["EUROPE", "AFRICA", "AMERICA", "ASIA", "MIDDLE EAST"]))
region = st.selectbox(
"Region",
regions,
index=None,
placeholder="Please select a region...",
)
part_types = list(map(str.title, ["COPPER", "BRASS", "TIN", "NICKEL", "STEEL"]))
part_type = st.selectbox(
"Part Type",
part_types,

SEGMENTS = ["automobile", "building", "furniture", "machinery", "household"]


def files_exist():
# Do we have all the files needed for the dashboard?
files = list(RESULTS_DIR.rglob("*.snappy.parquet"))
return len(files) == len(SEGMENTS)


with st.spinner("Waiting for data..."):
while not files_exist():
time.sleep(5)

segments = list(
map(str.title, ["automobile", "building", "furniture", "machinery", "household"])
)
segment = st.selectbox(
"Segment",
segments,
index=None,
placeholder="Please select a part type...",
placeholder="Please select a product segment...",
)
if region and part_type:
df = get_data(region, part_type)
if segment:
df = get_data(segment)
df = df.drop(columns="o_shippriority")
df["l_orderkey"] = df["l_orderkey"].map(lambda x: f"{x:09}")
df["revenue"] = df["revenue"].round(2)
now = datetime.datetime.now()
dt = now.date() - datetime.date(1995, 3, 15)
df["o_orderdate"] = (df["o_orderdate"] + dt).dt.date
df = df.rename(
columns={
"n_name": "Country",
"s_name": "Supplier",
"s_acctbal": "Balance",
"p_partkey": "Part ID",
"l_orderkey": "Order ID",
"o_orderdate": "Date Ordered",
"revenue": "Revenue",
}
)
maxes = df.groupby("Country").Balance.idxmax()
data = df.loc[maxes]
figure = px.choropleth(
data,
locationmode="country names",
locations="Country",
featureidkey="Supplier",
color="Balance",
color_continuous_scale="viridis",
hover_data=["Country", "Supplier", "Balance"],

df = df.set_index("Order ID")
st.dataframe(
df.style.format({"Revenue": "${:,}"}),
column_config={
"Date Ordered": st.column_config.DateColumn(
"Date Ordered",
format="MM/DD/YYYY",
help="Date order was placed",
),
"Revenue": st.column_config.NumberColumn(
"Revenue (in USD)",
help="Total revenue of order",
),
},
)
st.plotly_chart(figure, theme="streamlit", use_container_width=True)
on = st.toggle("Show data")
if on:
st.write(
df[["Country", "Supplier", "Balance", "Part ID"]], use_container_width=True
)
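
For local smoke-testing of the new dashboard, something like the following could pre-populate `RESULTS_DIR` so that `files_exist()` passes and the table renders. This is a sketch and not part of the PR: it assumes `RESULTS_DIR` is a local directory and guesses a minimal schema (`l_orderkey`, `o_orderdate`, `o_shippriority`, `revenue`) from the new code above.

```python
# Hypothetical helper (not in this PR): write one tiny parquet file per segment
# so the dashboard's files_exist() check passes and the table renders locally.
import pandas as pd

from pipeline.settings import RESULTS_DIR

SEGMENTS = ["automobile", "building", "furniture", "machinery", "household"]


def write_dummy_results() -> None:
    RESULTS_DIR.mkdir(parents=True, exist_ok=True)  # assumes a local path
    for segment in SEGMENTS:
        df = pd.DataFrame(
            {
                "l_orderkey": [1, 2, 3],
                "o_orderdate": pd.to_datetime(
                    ["1995-03-01", "1995-03-10", "1995-03-14"]
                ),
                "o_shippriority": [0, 0, 0],
                "revenue": [1234.56, 987.65, 45678.90],
            }
        )
        # Filename matches the dashboard's get_data():
        # RESULTS_DIR / f"{segment.lower()}.snappy.parquet"
        df.to_parquet(
            RESULTS_DIR / f"{segment}.snappy.parquet", compression="snappy"
        )


if __name__ == "__main__":
    write_dummy_results()
```

After running it, `streamlit run dashboard.py` should show the three dummy orders.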
5 changes: 4 additions & 1 deletion environment.yml
@@ -17,4 +17,7 @@ dependencies:
- s3fs
- universal_pathlib <0.2.0
- boto3
- dask-deltatable
# - dask-deltatable
- pip
- pip:
- git+https://github.com/fjetter/dask-deltatable.git@dask_expr
14 changes: 9 additions & 5 deletions pipeline/data.py
@@ -15,7 +15,7 @@
name="data-generation",
local=LOCAL,
region=REGION,
keepalive="5 minutes",
vm_type="m6i.2xlarge",
tags={"workflow": "etl-tpch"},
)
def generate(scale: float, path: os.PathLike) -> None:
@@ -51,15 +51,20 @@ def generate(scale: float, path: os.PathLike) -> None:
continue
print(f"Exporting table: {table}")
stmt = f"""select * from {table}"""
df = con.sql(stmt).arrow()
df = con.sql(stmt).df()
# TODO: Increment the order key in the `lineitem` and `orders`
# tables each time the flow is run to produce unique transactions
# xref https://discourse.prefect.io/t/how-to-get-flow-count/3996
# if table in ["lineitem", "orders"]:
# df[f"{table[0]}_orderkey"] += counter
Review comment (Member):
We might want to do more than just add now - 1995 or whatever. We might want to do an affine transform so that the entire previous range (like 1985-1995) gets squeezed into the last hour.


outfile = (
path
/ table
/ f"{table}_{datetime.datetime.now().isoformat().split('.')[0]}.json"
)
fs.makedirs(outfile.parent, exist_ok=True)
df.to_pandas().to_json(
df.to_json(
outfile,
date_format="iso",
orient="records",
@@ -73,7 +78,6 @@ def generate(scale: float, path: os.PathLike) -> None:
def generate_data():
with lock_generate:
generate(
scale=0.01,
scale=1,
path=STAGING_DIR,
)
generate.fn.client.restart(wait_for_workers=False)
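
On the review comment above about squeezing the historical date range into the last hour: a minimal sketch of that affine transform, assuming pandas timestamps and a one-hour target window. The function name and window size are illustrative, not part of this PR.

```python
# Sketch of the affine transform suggested in the review comment: map the full
# historical order-date range onto [now - window, now], preserving relative spacing.
import pandas as pd


def squeeze_into_last_hour(
    dates: pd.Series, window: pd.Timedelta = pd.Timedelta(hours=1)
) -> pd.Series:
    dates = pd.to_datetime(dates)
    now = pd.Timestamp.now()
    start, end = dates.min(), dates.max()
    span = end - start
    if span == pd.Timedelta(0):
        # All dates identical; put everything at "now" to avoid dividing by zero.
        return pd.Series(now, index=dates.index)
    fraction = (dates - start) / span          # floats in [0, 1]
    return (now - window) + fraction * window  # timestamps in [now - window, now]
```

If applied during data generation, something like `df["o_orderdate"] = squeeze_into_last_hour(df["o_orderdate"])` for the `orders` table could replace the dashboard's fixed `now - 1995-03-15` shift, with the commented-out order-key increment handling uniqueness separately.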
9 changes: 0 additions & 9 deletions pipeline/monitor.py

This file was deleted.

25 changes: 8 additions & 17 deletions pipeline/preprocess.py
@@ -7,7 +7,6 @@
from prefect.tasks import exponential_backoff

from .settings import (
ARCHIVE_DIR,
LOCAL,
PROCESSED_DIR,
REGION,
@@ -29,32 +28,26 @@
name="data-etl",
local=LOCAL,
region=REGION,
keepalive="5 minutes",
vm_type="m6i.2xlarge",
tags={"workflow": "etl-tpch"},
)
def json_file_to_parquet(file):
"""Convert raw JSON data file to Parquet."""
print(f"Processing {file}")
df = pd.read_json(file, lines=True)
cols = [c for c in df.columns if "date" in c]
for col in cols:
df[col] = pd.to_datetime(df[col])
outfile = PROCESSED_DIR / file.parent.name
fs.makedirs(outfile.parent, exist_ok=True)
data = pa.Table.from_pandas(df, preserve_index=False)
deltalake.write_deltalake(
outfile, data, mode="append", storage_options=storage_options
)
print(f"Saved {outfile}")
fs.rm(str(file))
return file


@task
def archive_json_file(file):
outfile = ARCHIVE_DIR / file.relative_to(STAGING_DIR)
fs.makedirs(outfile.parent, exist_ok=True)
fs.mv(str(file), str(outfile))

return outfile


def list_new_json_files():
return list(STAGING_DIR.rglob("*.json"))

@@ -63,18 +56,16 @@ def list_new_json_files():
def json_to_parquet():
with lock_json_to_parquet:
files = list_new_json_files()
files = json_file_to_parquet.map(files)
futures = archive_json_file.map(files)
futures = json_file_to_parquet.map(files)
for f in futures:
print(f"Archived {str(f.result())}")
print(f"Processed {str(f.result())}")


@task(log_prints=True)
@coiled.function(
name="data-etl",
local=LOCAL,
region=REGION,
keepalive="5 minutes",
vm_type="m6i.xlarge",
tags={"workflow": "etl-tpch"},
)
def compact(table):