Updates from long-running tests #19

Merged 4 commits on Feb 19, 2024
README.md (2 changes: 1 addition & 1 deletion)
@@ -14,7 +14,7 @@ Then run each command below in separate terminal windows:
python serve_model.py # Serve ML model
```
```bash
python pipeline.py # Run ETL pipeline
python workflow.py # Run ETL pipeline
```
```bash
streamlit run dashboard.py # Serve dashboard
environment.yml (4 changes: 3 additions & 1 deletion)
@@ -15,5 +15,7 @@ dependencies:
- filelock
- streamlit
- s3fs
- universal_pathlib
- universal_pathlib <0.2.0
- boto3
# - awscli
- dask-deltatable
pipeline/__init__.py (empty file removed)
pipeline/data.py (29 changes: 24 additions & 5 deletions)
@@ -5,18 +5,29 @@
import coiled
import duckdb
import psutil
from dask.distributed import print
from prefect import flow, task

from pipeline.settings import LOCAL, REGION, STAGING_JSON_DIR, fs
from .settings import (
LOCAL,
REGION,
STAGING_JSON_DIR,
STAGING_PARQUET_DIR,
fs,
lock_generate,
)


@task(log_prints=True)
@coiled.function(
name="data-generation",
local=LOCAL,
region=REGION,
keepalive="5 minutes",
tags={"workflow": "etl-tpch"},
)
def generate(scale: float, path: os.PathLike) -> None:
static_tables = ["customer", "nation", "part", "partsupp", "region", "supplier"]
with duckdb.connect() as con:
con.install_extension("tpch")
con.load_extension("tpch")
@@ -55,6 +66,12 @@ def generate(scale: float, path: os.PathLike) -> None:
.column("table_name")
)
for table in map(str, tables):
if table in static_tables and (
list((STAGING_JSON_DIR / table).rglob("*.json"))
or list((STAGING_PARQUET_DIR / table).rglob("*.parquet"))
):
print(f"Static table {table} already exists")
continue
print(f"Exporting table: {table}")
stmt = f"""select * from {table}"""
df = con.sql(stmt).arrow()
@@ -77,7 +94,9 @@ def generate(scale: float, path: os.PathLike) -> None:

@flow
def generate_data():
generate(
scale=0.01,
path=STAGING_JSON_DIR,
)
with lock_generate:
generate(
scale=0.01,
path=STAGING_JSON_DIR,
)
generate.fn.client.restart()
pipeline/preprocess.py (34 changes: 22 additions & 12 deletions)
@@ -1,8 +1,8 @@
import coiled
import dask
import deltalake
import pandas as pd
import pyarrow as pa
from dask.distributed import print
from prefect import flow, task
from prefect.tasks import exponential_backoff

@@ -13,11 +13,11 @@
STAGING_JSON_DIR,
STAGING_PARQUET_DIR,
fs,
lock_compact,
lock_json_to_parquet,
storage_options,
)

dask.config.set({"coiled.use_aws_creds_endpoint": False})


@task(
log_prints=True,
@@ -26,9 +26,10 @@
retry_jitter_factor=1,
)
@coiled.function(
name="data-etl",
local=LOCAL,
region=REGION,
keepalive="10 minutes",
keepalive="5 minutes",
tags={"workflow": "etl-tpch"},
)
def json_file_to_parquet(file):
@@ -50,7 +51,6 @@ def archive_json_file(file):
outfile = RAW_JSON_DIR / file.relative_to(STAGING_JSON_DIR)
fs.makedirs(outfile.parent, exist_ok=True)
fs.mv(str(file), str(outfile))
print(f"Archived {str(outfile)}")

return outfile

@@ -61,16 +61,20 @@ def list_new_json_files():

@flow(log_prints=True)
def json_to_parquet():
files = list_new_json_files()
files = json_file_to_parquet.map(files)
archive_json_file.map(files)
with lock_json_to_parquet:
files = list_new_json_files()
files = json_file_to_parquet.map(files)
futures = archive_json_file.map(files)
for f in futures:
print(f"Archived {str(f.result())}")


@task(log_prints=True)
@coiled.function(
name="data-etl",
local=LOCAL,
region=REGION,
keepalive="10 minutes",
keepalive="5 minutes",
tags={"workflow": "etl-tpch"},
)
def compact(table):
@@ -79,15 +83,21 @@ def compact(table):
t = deltalake.DeltaTable(table, storage_options=storage_options)
t.optimize.compact()
t.vacuum(retention_hours=0, enforce_retention_duration=False, dry_run=False)
return table


@task
def list_tables():
directories = fs.ls(STAGING_PARQUET_DIR)
if not fs.exists(STAGING_PARQUET_DIR):
return []
directories = fs.ls(STAGING_PARQUET_DIR, refresh=True)
return directories


@flow(log_prints=True)
def compact_tables():
tables = list_tables()
compact.map(tables)
with lock_compact:
tables = list_tables()
futures = compact.map(tables)
for f in futures:
print(f"Finished compacting {f.result()} table")
pipeline/reduce.py (80 changes: 48 additions & 32 deletions)
@@ -1,16 +1,20 @@
import functools
import itertools
import uuid

import coiled
import dask
import dask_deltatable
from dask.distributed import LocalCluster
from prefect import flow, task

from .settings import LOCAL, REDUCED_DATA_DIR, REGION, STAGING_PARQUET_DIR, fs

dask.config.set({"coiled.use_aws_creds_endpoint": False})
from .settings import (
LOCAL,
REDUCED_DATA_DIR,
REGION,
STAGING_PARQUET_DIR,
fs,
lock_compact,
storage_options,
)


@task
@@ -20,22 +24,37 @@ def save_query(region, part_type):
cluster = LocalCluster
else:
cluster = functools.partial(
coiled.Cluster, region=REGION, tags={"workflow": "etl-tpch"}
coiled.Cluster,
name="reduce",
region=REGION,
n_workers=10,
tags={"workflow": "etl-tpch"},
shutdown_on_close=False,
idle_timeout="5 minutes",
wait_for_workers=True,
)

with cluster() as cluster:
with cluster.get_client():
size = 15
region_ds = dask_deltatable.read_deltalake(STAGING_PARQUET_DIR / "region")
region_ds = dask_deltatable.read_deltalake(
str(STAGING_PARQUET_DIR / "region"),
delta_storage_options=storage_options,
)
nation_filtered = dask_deltatable.read_deltalake(
STAGING_PARQUET_DIR / "nation"
str(STAGING_PARQUET_DIR / "nation"),
delta_storage_options=storage_options,
)
supplier_filtered = dask_deltatable.read_deltalake(
STAGING_PARQUET_DIR / "supplier"
str(STAGING_PARQUET_DIR / "supplier"),
delta_storage_options=storage_options,
)
part_filtered = dask_deltatable.read_deltalake(
str(STAGING_PARQUET_DIR / "part"), delta_storage_options=storage_options
)
part_filtered = dask_deltatable.read_deltalake(STAGING_PARQUET_DIR / "part")
partsupp_filtered = dask_deltatable.read_deltalake(
STAGING_PARQUET_DIR / "partsupp"
str(STAGING_PARQUET_DIR / "partsupp"),
delta_storage_options=storage_options,
)

region_filtered = region_ds[(region_ds["r_name"] == region.upper())]
@@ -91,32 +110,29 @@ def save_query(region, part_type):
.sort_values(
by=[
"s_acctbal",
# "n_name",
# "s_name",
# "p_partkey",
"n_name",
"s_name",
"p_partkey",
],
ascending=[
False,
True,
True,
True,
],
# ascending=[
# False,
# True,
# True,
# True,
# ],
)
.head(100, compute=False)
.head(100)
)

outdir = REDUCED_DATA_DIR / region / part_type
fs.makedirs(outdir, exist_ok=True)

def name(_):
return f"{uuid.uuid4()}.snappy.parquet"

result.to_parquet(outdir, compression="snappy", name_function=name)
outfile = REDUCED_DATA_DIR / region / part_type / "result.snappy.parquet"
fs.makedirs(outfile.parent, exist_ok=True)
result.to_parquet(outfile, compression="snappy")


@flow
def query_reduce():
regions = ["europe", "africa", "america", "asia", "middle east"]
part_types = ["copper", "brass", "tin", "nickel", "steel"]
for region, part_type in itertools.product(regions, part_types):
save_query(region, part_type)
with lock_compact:
regions = ["europe", "africa", "america", "asia", "middle east"]
part_types = ["copper", "brass", "tin", "nickel", "steel"]
for region, part_type in itertools.product(regions, part_types):
save_query(region, part_type)
pipeline/resize.py (63 changes: 0 additions & 63 deletions)

This file was deleted.

pipeline/settings.py (9 changes: 7 additions & 2 deletions)
@@ -1,5 +1,6 @@
import boto3
import fsspec
from filelock import FileLock
from upath import UPath as Path

LOCAL = True
@@ -12,8 +13,8 @@
storage_options = {}
else:
# TODO: Make the cloud path nicer (e.g. s3://coiled-datasets-rp)
ROOT = Path("s3://oss-scratch-space/jrbourbeau/etl-tpch/data")
fs = fsspec.filesystem("s3")
ROOT = Path("s3://openscapes-scratch/jrbourbeau/etl-tpch/data")
fs = fsspec.filesystem("s3", use_listings_cache=False)
# Find cloud region being used
bucket = str(ROOT).replace("s3://", "").split("/")[0]
resp = boto3.client("s3").get_bucket_location(Bucket=bucket)
@@ -28,3 +29,7 @@
REDUCED_DATA_DIR = ROOT / "reduced"
MODEL_FILE = ROOT.parent / "model.json"
MODEL_SERVER_FILE = ROOT.parent / "serve_model.py"

lock_generate = FileLock("generate.lock")
lock_json_to_parquet = FileLock("json_to_parquet.lock")
lock_compact = FileLock("compact.lock")

Member commented:
I guess these work even in the cloud setting because this is all on the Prefect VM, right? But they have to be file locks rather than threading.Locks because Prefect will create many processes?
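
As an illustration (not part of the diff), here is a minimal sketch of why the file-based lock coordinates separate processes while a threading.Lock would not. The lock file name matches the one in settings.py, but the guarded function is a hypothetical placeholder:

```python
from filelock import FileLock

# Every process that constructs a FileLock for the same path shares one lock,
# because the lock state lives in the "compact.lock" file on disk rather than
# in any single process's memory (unlike threading.Lock).
lock_compact = FileLock("compact.lock")

with lock_compact:  # blocks until no other process holds compact.lock
    run_compaction()  # hypothetical placeholder for the guarded work
```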

@jrbourbeau (Member, Author) commented on Feb 19, 2024:
That's correct.

Also, I was using Prefect's built-in concurrency limiting functionality (https://docs.prefect.io/latest/guides/global-concurrency-limits/), which I like the idea of more than using filelocks. But we would sporadically run into deadlocks, which is why I switched back to filelocks.

I also opened PrefectHQ/prefect#12015 as a possible alternative.
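
For reference, the global concurrency limit approach mentioned above looks roughly like the sketch below. It assumes a limit named "etl-tpch" with a single slot has already been created through the Prefect UI or CLI; the limit name and flow body are placeholders, not the code from this PR:

```python
from prefect import flow
from prefect.concurrency.sync import concurrency


@flow(log_prints=True)
def compact_tables():
    # Occupy the single slot of the hypothetical "etl-tpch" global concurrency
    # limit; concurrent flow runs wait here until the slot is released.
    with concurrency("etl-tpch", occupy=1):
        print("Running compaction while holding the concurrency slot")
```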

Member commented:
Probably what we actually want here is something like ...

```python
if lock.acquired:
    return

with lock:
    ...
```

Because we don't want things to pile up.

Not saying that we should actually do this, just mentioning it. Short term, we could also make our own limiting decorator. My guess is that this makes more sense / is a lot easier with prefect serve than it does with other Prefect deployment strategies.
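
A rough sketch of that skip-if-busy behavior using filelock's non-blocking acquire; the lock file name comes from settings.py, while the guarded function is a hypothetical placeholder:

```python
from filelock import FileLock, Timeout

lock_compact = FileLock("compact.lock")

try:
    # timeout=0 makes acquire() try once and raise Timeout if another process
    # already holds the lock, instead of queueing up behind it.
    with lock_compact.acquire(timeout=0):
        run_compaction()  # hypothetical placeholder for the guarded work
except Timeout:
    print("Another run already holds compact.lock; skipping this run")
```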
