diff --git a/README.md b/README.md index bb9aa07..2238c1d 100644 --- a/README.md +++ b/README.md @@ -14,7 +14,7 @@ Then run each command below in separate terminal windows: python serve_model.py # Serve ML model ``` ```bash -python pipeline.py # Run ETL pipeline +python workflow.py # Run ETL pipeline ``` ```bash streamlit run dashboard.py # Serve dashboard diff --git a/environment.yml b/environment.yml index 1d09f6b..37dafa1 100644 --- a/environment.yml +++ b/environment.yml @@ -15,5 +15,7 @@ dependencies: - filelock - streamlit - s3fs - - universal_pathlib + - universal_pathlib <0.2.0 - boto3 + # - awscli + - dask-deltatable diff --git a/pipeline/__init__.py b/pipeline/__init__.py deleted file mode 100644 index e69de29..0000000 diff --git a/pipeline/data.py b/pipeline/data.py index 031638e..9f2eb45 100644 --- a/pipeline/data.py +++ b/pipeline/data.py @@ -5,18 +5,29 @@ import coiled import duckdb import psutil +from dask.distributed import print from prefect import flow, task -from pipeline.settings import LOCAL, REGION, STAGING_JSON_DIR, fs +from .settings import ( + LOCAL, + REGION, + STAGING_JSON_DIR, + STAGING_PARQUET_DIR, + fs, + lock_generate, +) @task(log_prints=True) @coiled.function( + name="data-generation", local=LOCAL, region=REGION, + keepalive="5 minutes", tags={"workflow": "etl-tpch"}, ) def generate(scale: float, path: os.PathLike) -> None: + static_tables = ["customer", "nation", "part", "partsupp", "region", "supplier"] with duckdb.connect() as con: con.install_extension("tpch") con.load_extension("tpch") @@ -55,6 +66,12 @@ def generate(scale: float, path: os.PathLike) -> None: .column("table_name") ) for table in map(str, tables): + if table in static_tables and ( + list((STAGING_JSON_DIR / table).rglob("*.json")) + or list((STAGING_PARQUET_DIR / table).rglob("*.parquet")) + ): + print(f"Static table {table} already exists") + continue print(f"Exporting table: {table}") stmt = f"""select * from {table}""" df = con.sql(stmt).arrow() @@ -77,7 +94,9 @@ def generate(scale: float, path: os.PathLike) -> None: @flow def generate_data(): - generate( - scale=0.01, - path=STAGING_JSON_DIR, - ) + with lock_generate: + generate( + scale=0.01, + path=STAGING_JSON_DIR, + ) + generate.fn.client.restart() diff --git a/pipeline/preprocess.py b/pipeline/preprocess.py index 9aad365..0a64787 100644 --- a/pipeline/preprocess.py +++ b/pipeline/preprocess.py @@ -1,8 +1,8 @@ import coiled -import dask import deltalake import pandas as pd import pyarrow as pa +from dask.distributed import print from prefect import flow, task from prefect.tasks import exponential_backoff @@ -13,11 +13,11 @@ STAGING_JSON_DIR, STAGING_PARQUET_DIR, fs, + lock_compact, + lock_json_to_parquet, storage_options, ) -dask.config.set({"coiled.use_aws_creds_endpoint": False}) - @task( log_prints=True, @@ -26,9 +26,10 @@ retry_jitter_factor=1, ) @coiled.function( + name="data-etl", local=LOCAL, region=REGION, - keepalive="10 minutes", + keepalive="5 minutes", tags={"workflow": "etl-tpch"}, ) def json_file_to_parquet(file): @@ -50,7 +51,6 @@ def archive_json_file(file): outfile = RAW_JSON_DIR / file.relative_to(STAGING_JSON_DIR) fs.makedirs(outfile.parent, exist_ok=True) fs.mv(str(file), str(outfile)) - print(f"Archived {str(outfile)}") return outfile @@ -61,16 +61,20 @@ def list_new_json_files(): @flow(log_prints=True) def json_to_parquet(): - files = list_new_json_files() - files = json_file_to_parquet.map(files) - archive_json_file.map(files) + with lock_json_to_parquet: + files = list_new_json_files() + files = json_file_to_parquet.map(files) + futures = archive_json_file.map(files) + for f in futures: + print(f"Archived {str(f.result())}") @task(log_prints=True) @coiled.function( + name="data-etl", local=LOCAL, region=REGION, - keepalive="10 minutes", + keepalive="5 minutes", tags={"workflow": "etl-tpch"}, ) def compact(table): @@ -79,15 +83,21 @@ def compact(table): t = deltalake.DeltaTable(table, storage_options=storage_options) t.optimize.compact() t.vacuum(retention_hours=0, enforce_retention_duration=False, dry_run=False) + return table @task def list_tables(): - directories = fs.ls(STAGING_PARQUET_DIR) + if not fs.exists(STAGING_PARQUET_DIR): + return [] + directories = fs.ls(STAGING_PARQUET_DIR, refresh=True) return directories @flow(log_prints=True) def compact_tables(): - tables = list_tables() - compact.map(tables) + with lock_compact: + tables = list_tables() + futures = compact.map(tables) + for f in futures: + print(f"Finished compacting {f.result()} table") diff --git a/pipeline/reduce.py b/pipeline/reduce.py index 83087d4..18c0a62 100644 --- a/pipeline/reduce.py +++ b/pipeline/reduce.py @@ -1,16 +1,20 @@ import functools import itertools -import uuid import coiled -import dask import dask_deltatable from dask.distributed import LocalCluster from prefect import flow, task -from .settings import LOCAL, REDUCED_DATA_DIR, REGION, STAGING_PARQUET_DIR, fs - -dask.config.set({"coiled.use_aws_creds_endpoint": False}) +from .settings import ( + LOCAL, + REDUCED_DATA_DIR, + REGION, + STAGING_PARQUET_DIR, + fs, + lock_compact, + storage_options, +) @task @@ -20,22 +24,37 @@ def save_query(region, part_type): cluster = LocalCluster else: cluster = functools.partial( - coiled.Cluster, region=REGION, tags={"workflow": "etl-tpch"} + coiled.Cluster, + name="reduce", + region=REGION, + n_workers=10, + tags={"workflow": "etl-tpch"}, + shutdown_on_close=False, + idle_timeout="5 minutes", + wait_for_workers=True, ) with cluster() as cluster: with cluster.get_client(): size = 15 - region_ds = dask_deltatable.read_deltalake(STAGING_PARQUET_DIR / "region") + region_ds = dask_deltatable.read_deltalake( + str(STAGING_PARQUET_DIR / "region"), + delta_storage_options=storage_options, + ) nation_filtered = dask_deltatable.read_deltalake( - STAGING_PARQUET_DIR / "nation" + str(STAGING_PARQUET_DIR / "nation"), + delta_storage_options=storage_options, ) supplier_filtered = dask_deltatable.read_deltalake( - STAGING_PARQUET_DIR / "supplier" + str(STAGING_PARQUET_DIR / "supplier"), + delta_storage_options=storage_options, + ) + part_filtered = dask_deltatable.read_deltalake( + str(STAGING_PARQUET_DIR / "part"), delta_storage_options=storage_options ) - part_filtered = dask_deltatable.read_deltalake(STAGING_PARQUET_DIR / "part") partsupp_filtered = dask_deltatable.read_deltalake( - STAGING_PARQUET_DIR / "partsupp" + str(STAGING_PARQUET_DIR / "partsupp"), + delta_storage_options=storage_options, ) region_filtered = region_ds[(region_ds["r_name"] == region.upper())] @@ -91,32 +110,29 @@ def save_query(region, part_type): .sort_values( by=[ "s_acctbal", - # "n_name", - # "s_name", - # "p_partkey", + "n_name", + "s_name", + "p_partkey", + ], + ascending=[ + False, + True, + True, + True, ], - # ascending=[ - # False, - # True, - # True, - # True, - # ], ) - .head(100, compute=False) + .head(100) ) - outdir = REDUCED_DATA_DIR / region / part_type - fs.makedirs(outdir, exist_ok=True) - - def name(_): - return f"{uuid.uuid4()}.snappy.parquet" - - result.to_parquet(outdir, compression="snappy", name_function=name) + outfile = REDUCED_DATA_DIR / region / part_type / "result.snappy.parquet" + fs.makedirs(outfile.parent, exist_ok=True) + result.to_parquet(outfile, compression="snappy") @flow def query_reduce(): - regions = ["europe", "africa", "america", "asia", "middle east"] - part_types = ["copper", "brass", "tin", "nickel", "steel"] - for region, part_type in itertools.product(regions, part_types): - save_query(region, part_type) + with lock_compact: + regions = ["europe", "africa", "america", "asia", "middle east"] + part_types = ["copper", "brass", "tin", "nickel", "steel"] + for region, part_type in itertools.product(regions, part_types): + save_query(region, part_type) diff --git a/pipeline/resize.py b/pipeline/resize.py deleted file mode 100644 index 4a2c88d..0000000 --- a/pipeline/resize.py +++ /dev/null @@ -1,63 +0,0 @@ -import itertools -import operator -import uuid - -import coiled -import dask_expr as dd -from filelock import FileLock -from prefect import flow, task - -from .settings import ( - LOCAL, - PROCESSED_DATA_DIR, - RAW_PARQUET_DIR, - REGION, - STAGING_PARQUET_DIR, - fs, -) - -# TODO: Couldn't figure out how to limit concurrent flow runs -# in Prefect, so am using a file lock... -lock = FileLock("resize.lock") - - -@task -@coiled.function( - local=LOCAL, - region=REGION, - tags={"workflow": "etl-tpch"}, -) -def repartition_table(files, table): - df = dd.read_parquet(files) - df = df.repartition(partition_size="128 MiB") - outdir = PROCESSED_DATA_DIR / table - fs.makedirs(outdir, exist_ok=True) - - def name(_): - return f"{table}_{uuid.uuid4()}.snappy.parquet" - - df.to_parquet(outdir, compression="snappy", name_function=name) - - -@task -def archive_parquet_files(files): - # Move original staged files to long-term storage - for file in files: - outfile = RAW_PARQUET_DIR / file.relative_to(STAGING_PARQUET_DIR) - fs.makedirs(outfile.parent, exist_ok=True) - # Need str(...), otherwise, `TypeError: 'S3Path' object is not iterable` - fs.mv(str(file), str(outfile)) - print(f"Archived {str(outfile)}") - - -@flow(log_prints=True) -def resize_parquet(): - """Repartition small Parquet files for future analysis""" - with lock: - files = list(STAGING_PARQUET_DIR.rglob("*.parquet")) - for table, group in itertools.groupby( - files, key=operator.attrgetter("parent.stem") - ): - files_ = list(group) - futures = repartition_table.submit(files_, table=table) - archive_parquet_files.submit(files_, wait_for=futures) diff --git a/pipeline/settings.py b/pipeline/settings.py index 7ec3a95..9824e1f 100644 --- a/pipeline/settings.py +++ b/pipeline/settings.py @@ -1,5 +1,6 @@ import boto3 import fsspec +from filelock import FileLock from upath import UPath as Path LOCAL = True @@ -12,8 +13,8 @@ storage_options = {} else: # TODO: Make the cloud path nicer (e.g. s3://coiled-datasets-rp) - ROOT = Path("s3://oss-scratch-space/jrbourbeau/etl-tpch/data") - fs = fsspec.filesystem("s3") + ROOT = Path("s3://openscapes-scratch/jrbourbeau/etl-tpch/data") + fs = fsspec.filesystem("s3", use_listings_cache=False) # Find cloud region being used bucket = str(ROOT).replace("s3://", "").split("/")[0] resp = boto3.client("s3").get_bucket_location(Bucket=bucket) @@ -28,3 +29,7 @@ REDUCED_DATA_DIR = ROOT / "reduced" MODEL_FILE = ROOT.parent / "model.json" MODEL_SERVER_FILE = ROOT.parent / "serve_model.py" + +lock_generate = FileLock("generate.lock") +lock_json_to_parquet = FileLock("json_to_parquet.lock") +lock_compact = FileLock("compact.lock") diff --git a/pipeline/train.py b/pipeline/train.py index d6db8ef..c3a6f91 100644 --- a/pipeline/train.py +++ b/pipeline/train.py @@ -10,15 +10,16 @@ @task @coiled.function( + name="train", local=LOCAL, region=REGION, tags={"workflow": "etl-tpch"}, ) -def train(): - df = pd.read_parquet(REDUCED_DATA_DIR / "europe" / "brass") +def train(file): + df = pd.read_parquet(file) X = df[["p_partkey", "s_acctbal"]] y = df["n_name"].map( - {"FRANCE": 0, "UNITED KINGDOM": 1, "RUSSIA": 2, "GERMANY": 3, "ROMANIA": 4} + {"GERMANY": 0, "ROMANIA": 1, "RUSSIA": 2, "FRANCE": 3, "UNITED KINGDOM": 4} ) model = xgb.XGBClassifier() if MODEL_FILE.exists(): @@ -30,11 +31,21 @@ def train(): with tempfile.TemporaryDirectory() as tmpdir: out = Path(tmpdir) / "model.json" model.save_model(out) - fs.mv(str(out), str(MODEL_FILE.parent)) + fs.makedirs(MODEL_FILE.parent, exist_ok=True) + fs.put(out, MODEL_FILE) return model +def list_training_data_files(): + data_dir = REDUCED_DATA_DIR / "europe" / "brass" + return list(data_dir.rglob("*.parquet")) + + @flow(log_prints=True) def update_model(): - train() + files = list_training_data_files() + if not files: + print("No training data available") + return + train(files[0]) print(f"Updated model at {MODEL_FILE}") diff --git a/pipeline.py b/workflow.py similarity index 80% rename from pipeline.py rename to workflow.py index dbf914a..318c59d 100644 --- a/pipeline.py +++ b/workflow.py @@ -11,27 +11,27 @@ if __name__ == "__main__": data = generate_data.to_deployment( name="generate_data", - interval=timedelta(seconds=20), + interval=timedelta(seconds=30), ) preprocess = json_to_parquet.to_deployment( name="preprocess", - interval=timedelta(seconds=60), + interval=timedelta(minutes=1), ) compact = compact_tables.to_deployment( name="compact", - interval=timedelta(minutes=5), + interval=timedelta(minutes=30), ) reduce = query_reduce.to_deployment( name="reduce", - interval=timedelta(minutes=1), + interval=timedelta(hours=1), ) train = update_model.to_deployment( name="train", - interval=timedelta(minutes=2), + interval=timedelta(hours=6), ) monitor = check_model_endpoint.to_deployment( name="monitor", - interval=timedelta(seconds=10), + interval=timedelta(minutes=1), ) serve( @@ -40,5 +40,5 @@ compact, reduce, train, - monitor, + # monitor, )