From 1d1f5160e36b7e9599ba8b8cabe671de1cb5a06e Mon Sep 17 00:00:00 2001 From: James Bourbeau Date: Fri, 16 Feb 2024 10:19:06 -0600 Subject: [PATCH 1/4] Updates --- environment.yml | 4 ++- pipeline.py | 16 +++++------ pipeline/data.py | 41 ++++++++++++++++++++++++++- pipeline/preprocess.py | 28 +++++++++++++------ pipeline/reduce.py | 54 +++++++++++++++++++++++++----------- pipeline/resize.py | 63 ------------------------------------------ pipeline/settings.py | 5 ++-- pipeline/train.py | 15 ++++++++-- 8 files changed, 124 insertions(+), 102 deletions(-) delete mode 100644 pipeline/resize.py diff --git a/environment.yml b/environment.yml index 1d09f6b..37dafa1 100644 --- a/environment.yml +++ b/environment.yml @@ -15,5 +15,7 @@ dependencies: - filelock - streamlit - s3fs - - universal_pathlib + - universal_pathlib <0.2.0 - boto3 + # - awscli + - dask-deltatable diff --git a/pipeline.py b/pipeline.py index dbf914a..0d84b1b 100644 --- a/pipeline.py +++ b/pipeline.py @@ -1,4 +1,4 @@ -from datetime import timedelta +# from datetime import timedelta from prefect import serve @@ -11,27 +11,27 @@ if __name__ == "__main__": data = generate_data.to_deployment( name="generate_data", - interval=timedelta(seconds=20), + # interval=timedelta(seconds=30), ) preprocess = json_to_parquet.to_deployment( name="preprocess", - interval=timedelta(seconds=60), + # interval=timedelta(minutes=1), ) compact = compact_tables.to_deployment( name="compact", - interval=timedelta(minutes=5), + # interval=timedelta(minutes=5), ) reduce = query_reduce.to_deployment( name="reduce", - interval=timedelta(minutes=1), + # interval=timedelta(minutes=30), ) train = update_model.to_deployment( name="train", - interval=timedelta(minutes=2), + # interval=timedelta(minutes=45), ) monitor = check_model_endpoint.to_deployment( name="monitor", - interval=timedelta(seconds=10), + # interval=timedelta(seconds=30), ) serve( @@ -40,5 +40,5 @@ compact, reduce, train, - monitor, + # monitor, ) diff --git a/pipeline/data.py b/pipeline/data.py index 031638e..b8563c9 100644 --- a/pipeline/data.py +++ b/pipeline/data.py @@ -5,6 +5,7 @@ import coiled import duckdb import psutil +import pyarrow.compute as pc from prefect import flow, task from pipeline.settings import LOCAL, REGION, STAGING_JSON_DIR, fs @@ -12,8 +13,10 @@ @task(log_prints=True) @coiled.function( + name="data-generation", local=LOCAL, region=REGION, + keepalive="5 minutes", tags={"workflow": "etl-tpch"}, ) def generate(scale: float, path: os.PathLike) -> None: @@ -49,6 +52,10 @@ def generate(scale: float, path: os.PathLike) -> None: con.sql(query) print("Finished generating data, exporting...") + print("Converting types date -> timestamp_s and decimal -> double") + _alter_tables(con) + print("Done altering tables") + tables = ( con.sql("select * from information_schema.tables") .arrow() @@ -75,9 +82,41 @@ def generate(scale: float, path: os.PathLike) -> None: print("Finished exporting all data") +def _alter_tables(con): + """ + Temporary, used for debugging performance in data types. 
+ + ref discussion here: https://github.com/coiled/benchmarks/pull/1131 + """ + tables = [ + "nation", + "region", + "customer", + "supplier", + "lineitem", + "orders", + "partsupp", + "part", + ] + for table in tables: + schema = con.sql(f"describe {table}").arrow() + + # alter decimals to floats + for column in schema.filter( + pc.match_like(pc.field("column_type"), "DECIMAL%") + ).column("column_name"): + con.sql(f"alter table {table} alter {column} type double") + + # alter date to timestamp_s + for column in schema.filter(pc.field("column_type") == "DATE").column( + "column_name" + ): + con.sql(f"alter table {table} alter {column} type timestamp_s") + + @flow def generate_data(): generate( - scale=0.01, + scale=0.02, path=STAGING_JSON_DIR, ) diff --git a/pipeline/preprocess.py b/pipeline/preprocess.py index 9aad365..9990ae9 100644 --- a/pipeline/preprocess.py +++ b/pipeline/preprocess.py @@ -3,7 +3,9 @@ import deltalake import pandas as pd import pyarrow as pa +from dask.distributed import print from prefect import flow, task +from prefect.concurrency.sync import concurrency from prefect.tasks import exponential_backoff from .settings import ( @@ -26,6 +28,7 @@ retry_jitter_factor=1, ) @coiled.function( + name="preprocessing", local=LOCAL, region=REGION, keepalive="10 minutes", @@ -50,7 +53,6 @@ def archive_json_file(file): outfile = RAW_JSON_DIR / file.relative_to(STAGING_JSON_DIR) fs.makedirs(outfile.parent, exist_ok=True) fs.mv(str(file), str(outfile)) - print(f"Archived {str(outfile)}") return outfile @@ -61,13 +63,17 @@ def list_new_json_files(): @flow(log_prints=True) def json_to_parquet(): - files = list_new_json_files() - files = json_file_to_parquet.map(files) - archive_json_file.map(files) + with concurrency("json_to_parquet", occupy=1): + files = list_new_json_files() + files = json_file_to_parquet.map(files) + futures = archive_json_file.map(files) + for f in futures: + print(f"Archived {str(f.result())}") @task(log_prints=True) @coiled.function( + name="preprocessing", local=LOCAL, region=REGION, keepalive="10 minutes", @@ -78,16 +84,22 @@ def compact(table): table = table if LOCAL else f"s3://{table}" t = deltalake.DeltaTable(table, storage_options=storage_options) t.optimize.compact() - t.vacuum(retention_hours=0, enforce_retention_duration=False, dry_run=False) + # t.vacuum(retention_hours=0, enforce_retention_duration=False, dry_run=False) + return table @task def list_tables(): - directories = fs.ls(STAGING_PARQUET_DIR) + if not fs.exists(STAGING_PARQUET_DIR): + return [] + directories = fs.ls(STAGING_PARQUET_DIR, refresh=True) return directories @flow(log_prints=True) def compact_tables(): - tables = list_tables() - compact.map(tables) + with concurrency("compact", occupy=1): + tables = list_tables() + futures = compact.map(tables) + for f in futures: + print(f"Finished compacting {f.result()} table") diff --git a/pipeline/reduce.py b/pipeline/reduce.py index 83087d4..8696e3b 100644 --- a/pipeline/reduce.py +++ b/pipeline/reduce.py @@ -8,7 +8,14 @@ from dask.distributed import LocalCluster from prefect import flow, task -from .settings import LOCAL, REDUCED_DATA_DIR, REGION, STAGING_PARQUET_DIR, fs +from .settings import ( + LOCAL, + REDUCED_DATA_DIR, + REGION, + STAGING_PARQUET_DIR, + fs, + storage_options, +) dask.config.set({"coiled.use_aws_creds_endpoint": False}) @@ -20,22 +27,37 @@ def save_query(region, part_type): cluster = LocalCluster else: cluster = functools.partial( - coiled.Cluster, region=REGION, tags={"workflow": "etl-tpch"} + coiled.Cluster, + 
name="reduce", + region=REGION, + n_workers=20, + tags={"workflow": "etl-tpch"}, + shutdown_on_close=False, + idle_timeout="3 minutes", + wait_for_workers=True, ) with cluster() as cluster: with cluster.get_client(): size = 15 - region_ds = dask_deltatable.read_deltalake(STAGING_PARQUET_DIR / "region") + region_ds = dask_deltatable.read_deltalake( + str(STAGING_PARQUET_DIR / "region"), + delta_storage_options=storage_options, + ) nation_filtered = dask_deltatable.read_deltalake( - STAGING_PARQUET_DIR / "nation" + str(STAGING_PARQUET_DIR / "nation"), + delta_storage_options=storage_options, ) supplier_filtered = dask_deltatable.read_deltalake( - STAGING_PARQUET_DIR / "supplier" + str(STAGING_PARQUET_DIR / "supplier"), + delta_storage_options=storage_options, + ) + part_filtered = dask_deltatable.read_deltalake( + str(STAGING_PARQUET_DIR / "part"), delta_storage_options=storage_options ) - part_filtered = dask_deltatable.read_deltalake(STAGING_PARQUET_DIR / "part") partsupp_filtered = dask_deltatable.read_deltalake( - STAGING_PARQUET_DIR / "partsupp" + str(STAGING_PARQUET_DIR / "partsupp"), + delta_storage_options=storage_options, ) region_filtered = region_ds[(region_ds["r_name"] == region.upper())] @@ -91,16 +113,16 @@ def save_query(region, part_type): .sort_values( by=[ "s_acctbal", - # "n_name", - # "s_name", - # "p_partkey", + "n_name", + "s_name", + "p_partkey", + ], + ascending=[ + False, + True, + True, + True, ], - # ascending=[ - # False, - # True, - # True, - # True, - # ], ) .head(100, compute=False) ) diff --git a/pipeline/resize.py b/pipeline/resize.py deleted file mode 100644 index 4a2c88d..0000000 --- a/pipeline/resize.py +++ /dev/null @@ -1,63 +0,0 @@ -import itertools -import operator -import uuid - -import coiled -import dask_expr as dd -from filelock import FileLock -from prefect import flow, task - -from .settings import ( - LOCAL, - PROCESSED_DATA_DIR, - RAW_PARQUET_DIR, - REGION, - STAGING_PARQUET_DIR, - fs, -) - -# TODO: Couldn't figure out how to limit concurrent flow runs -# in Prefect, so am using a file lock... -lock = FileLock("resize.lock") - - -@task -@coiled.function( - local=LOCAL, - region=REGION, - tags={"workflow": "etl-tpch"}, -) -def repartition_table(files, table): - df = dd.read_parquet(files) - df = df.repartition(partition_size="128 MiB") - outdir = PROCESSED_DATA_DIR / table - fs.makedirs(outdir, exist_ok=True) - - def name(_): - return f"{table}_{uuid.uuid4()}.snappy.parquet" - - df.to_parquet(outdir, compression="snappy", name_function=name) - - -@task -def archive_parquet_files(files): - # Move original staged files to long-term storage - for file in files: - outfile = RAW_PARQUET_DIR / file.relative_to(STAGING_PARQUET_DIR) - fs.makedirs(outfile.parent, exist_ok=True) - # Need str(...), otherwise, `TypeError: 'S3Path' object is not iterable` - fs.mv(str(file), str(outfile)) - print(f"Archived {str(outfile)}") - - -@flow(log_prints=True) -def resize_parquet(): - """Repartition small Parquet files for future analysis""" - with lock: - files = list(STAGING_PARQUET_DIR.rglob("*.parquet")) - for table, group in itertools.groupby( - files, key=operator.attrgetter("parent.stem") - ): - files_ = list(group) - futures = repartition_table.submit(files_, table=table) - archive_parquet_files.submit(files_, wait_for=futures) diff --git a/pipeline/settings.py b/pipeline/settings.py index 7ec3a95..3ca7f90 100644 --- a/pipeline/settings.py +++ b/pipeline/settings.py @@ -12,8 +12,9 @@ storage_options = {} else: # TODO: Make the cloud path nicer (e.g. 
s3://coiled-datasets-rp) - ROOT = Path("s3://oss-scratch-space/jrbourbeau/etl-tpch/data") - fs = fsspec.filesystem("s3") + ROOT = Path("s3://openscapes-scratch/jrbourbeau/etl-tpch/data-test") + # ROOT = Path("s3://oss-scratch-space/jrbourbeau/etl-tpch/data-test") + fs = fsspec.filesystem("s3", use_listings_cache=False) # Find cloud region being used bucket = str(ROOT).replace("s3://", "").split("/")[0] resp = boto3.client("s3").get_bucket_location(Bucket=bucket) diff --git a/pipeline/train.py b/pipeline/train.py index d6db8ef..1dc5664 100644 --- a/pipeline/train.py +++ b/pipeline/train.py @@ -14,8 +14,8 @@ region=REGION, tags={"workflow": "etl-tpch"}, ) -def train(): - df = pd.read_parquet(REDUCED_DATA_DIR / "europe" / "brass") +def train(files): + df = pd.read_parquet(files) X = df[["p_partkey", "s_acctbal"]] y = df["n_name"].map( {"FRANCE": 0, "UNITED KINGDOM": 1, "RUSSIA": 2, "GERMANY": 3, "ROMANIA": 4} @@ -34,7 +34,16 @@ def train(): return model +def list_training_data_files(): + data_dir = REDUCED_DATA_DIR / "europe" / "brass" + return list(data_dir.rglob("*.parquet")) + + @flow(log_prints=True) def update_model(): - train() + files = list_training_data_files() + if not files: + print("No training data available") + return + train(files) print(f"Updated model at {MODEL_FILE}") From 13a227026d350ebd309438848134935d10fbaad6 Mon Sep 17 00:00:00 2001 From: James Bourbeau Date: Fri, 16 Feb 2024 16:47:04 -0600 Subject: [PATCH 2/4] More --- README.md | 2 +- pipeline/__init__.py | 0 pipeline/data.py | 51 ++++++++------------------------------ pipeline/preprocess.py | 20 +++++++-------- pipeline/reduce.py | 28 ++++++++------------- pipeline/settings.py | 7 ++++-- pipeline/train.py | 9 ++++--- pipeline.py => workflow.py | 14 +++++------ 8 files changed, 49 insertions(+), 82 deletions(-) delete mode 100644 pipeline/__init__.py rename pipeline.py => workflow.py (75%) diff --git a/README.md b/README.md index bb9aa07..2238c1d 100644 --- a/README.md +++ b/README.md @@ -14,7 +14,7 @@ Then run each command below in separate terminal windows: python serve_model.py # Serve ML model ``` ```bash -python pipeline.py # Run ETL pipeline +python workflow.py # Run ETL pipeline ``` ```bash streamlit run dashboard.py # Serve dashboard diff --git a/pipeline/__init__.py b/pipeline/__init__.py deleted file mode 100644 index e69de29..0000000 diff --git a/pipeline/data.py b/pipeline/data.py index b8563c9..1a29b86 100644 --- a/pipeline/data.py +++ b/pipeline/data.py @@ -5,21 +5,22 @@ import coiled import duckdb import psutil -import pyarrow.compute as pc +from dask.distributed import print from prefect import flow, task -from pipeline.settings import LOCAL, REGION, STAGING_JSON_DIR, fs +from .settings import LOCAL, REGION, STAGING_JSON_DIR, STAGING_PARQUET_DIR, fs @task(log_prints=True) @coiled.function( - name="data-generation", + name="data-etl", local=LOCAL, region=REGION, keepalive="5 minutes", tags={"workflow": "etl-tpch"}, ) def generate(scale: float, path: os.PathLike) -> None: + static_tables = ["customer", "nation", "part", "partsupp", "region", "supplier"] with duckdb.connect() as con: con.install_extension("tpch") con.load_extension("tpch") @@ -52,16 +53,18 @@ def generate(scale: float, path: os.PathLike) -> None: con.sql(query) print("Finished generating data, exporting...") - print("Converting types date -> timestamp_s and decimal -> double") - _alter_tables(con) - print("Done altering tables") - tables = ( con.sql("select * from information_schema.tables") .arrow() .column("table_name") ) for table in 
map(str, tables): + if table in static_tables and ( + list((STAGING_JSON_DIR / table).rglob("*.json")) + or list((STAGING_PARQUET_DIR / table).rglob("*.parquet")) + ): + print(f"Static table {table} already exists") + continue print(f"Exporting table: {table}") stmt = f"""select * from {table}""" df = con.sql(stmt).arrow() @@ -82,41 +85,9 @@ def generate(scale: float, path: os.PathLike) -> None: print("Finished exporting all data") -def _alter_tables(con): - """ - Temporary, used for debugging performance in data types. - - ref discussion here: https://github.com/coiled/benchmarks/pull/1131 - """ - tables = [ - "nation", - "region", - "customer", - "supplier", - "lineitem", - "orders", - "partsupp", - "part", - ] - for table in tables: - schema = con.sql(f"describe {table}").arrow() - - # alter decimals to floats - for column in schema.filter( - pc.match_like(pc.field("column_type"), "DECIMAL%") - ).column("column_name"): - con.sql(f"alter table {table} alter {column} type double") - - # alter date to timestamp_s - for column in schema.filter(pc.field("column_type") == "DATE").column( - "column_name" - ): - con.sql(f"alter table {table} alter {column} type timestamp_s") - - @flow def generate_data(): generate( - scale=0.02, + scale=0.01, path=STAGING_JSON_DIR, ) diff --git a/pipeline/preprocess.py b/pipeline/preprocess.py index 9990ae9..0a64787 100644 --- a/pipeline/preprocess.py +++ b/pipeline/preprocess.py @@ -1,11 +1,9 @@ import coiled -import dask import deltalake import pandas as pd import pyarrow as pa from dask.distributed import print from prefect import flow, task -from prefect.concurrency.sync import concurrency from prefect.tasks import exponential_backoff from .settings import ( @@ -15,11 +13,11 @@ STAGING_JSON_DIR, STAGING_PARQUET_DIR, fs, + lock_compact, + lock_json_to_parquet, storage_options, ) -dask.config.set({"coiled.use_aws_creds_endpoint": False}) - @task( log_prints=True, @@ -28,10 +26,10 @@ retry_jitter_factor=1, ) @coiled.function( - name="preprocessing", + name="data-etl", local=LOCAL, region=REGION, - keepalive="10 minutes", + keepalive="5 minutes", tags={"workflow": "etl-tpch"}, ) def json_file_to_parquet(file): @@ -63,7 +61,7 @@ def list_new_json_files(): @flow(log_prints=True) def json_to_parquet(): - with concurrency("json_to_parquet", occupy=1): + with lock_json_to_parquet: files = list_new_json_files() files = json_file_to_parquet.map(files) futures = archive_json_file.map(files) @@ -73,10 +71,10 @@ def json_to_parquet(): @task(log_prints=True) @coiled.function( - name="preprocessing", + name="data-etl", local=LOCAL, region=REGION, - keepalive="10 minutes", + keepalive="5 minutes", tags={"workflow": "etl-tpch"}, ) def compact(table): @@ -84,7 +82,7 @@ def compact(table): table = table if LOCAL else f"s3://{table}" t = deltalake.DeltaTable(table, storage_options=storage_options) t.optimize.compact() - # t.vacuum(retention_hours=0, enforce_retention_duration=False, dry_run=False) + t.vacuum(retention_hours=0, enforce_retention_duration=False, dry_run=False) return table @@ -98,7 +96,7 @@ def list_tables(): @flow(log_prints=True) def compact_tables(): - with concurrency("compact", occupy=1): + with lock_compact: tables = list_tables() futures = compact.map(tables) for f in futures: diff --git a/pipeline/reduce.py b/pipeline/reduce.py index 8696e3b..c5a5928 100644 --- a/pipeline/reduce.py +++ b/pipeline/reduce.py @@ -1,9 +1,7 @@ import functools import itertools -import uuid import coiled -import dask import dask_deltatable from dask.distributed import 
LocalCluster from prefect import flow, task @@ -14,11 +12,10 @@ REGION, STAGING_PARQUET_DIR, fs, + lock_compact, storage_options, ) -dask.config.set({"coiled.use_aws_creds_endpoint": False}) - @task def save_query(region, part_type): @@ -33,7 +30,7 @@ def save_query(region, part_type): n_workers=20, tags={"workflow": "etl-tpch"}, shutdown_on_close=False, - idle_timeout="3 minutes", + idle_timeout="5 minutes", wait_for_workers=True, ) @@ -124,21 +121,18 @@ def save_query(region, part_type): True, ], ) - .head(100, compute=False) + .head(100) ) - outdir = REDUCED_DATA_DIR / region / part_type - fs.makedirs(outdir, exist_ok=True) - - def name(_): - return f"{uuid.uuid4()}.snappy.parquet" - - result.to_parquet(outdir, compression="snappy", name_function=name) + outfile = REDUCED_DATA_DIR / region / part_type / "result.snappy.parquet" + fs.makedirs(outfile.parent, exist_ok=True) + result.to_parquet(outfile, compression="snappy") @flow def query_reduce(): - regions = ["europe", "africa", "america", "asia", "middle east"] - part_types = ["copper", "brass", "tin", "nickel", "steel"] - for region, part_type in itertools.product(regions, part_types): - save_query(region, part_type) + with lock_compact: + regions = ["europe", "africa", "america", "asia", "middle east"] + part_types = ["copper", "brass", "tin", "nickel", "steel"] + for region, part_type in itertools.product(regions, part_types): + save_query(region, part_type) diff --git a/pipeline/settings.py b/pipeline/settings.py index 3ca7f90..db3e8c6 100644 --- a/pipeline/settings.py +++ b/pipeline/settings.py @@ -1,5 +1,6 @@ import boto3 import fsspec +from filelock import FileLock from upath import UPath as Path LOCAL = True @@ -12,8 +13,7 @@ storage_options = {} else: # TODO: Make the cloud path nicer (e.g. 
s3://coiled-datasets-rp) - ROOT = Path("s3://openscapes-scratch/jrbourbeau/etl-tpch/data-test") - # ROOT = Path("s3://oss-scratch-space/jrbourbeau/etl-tpch/data-test") + ROOT = Path("s3://openscapes-scratch/jrbourbeau/etl-tpch/data") fs = fsspec.filesystem("s3", use_listings_cache=False) # Find cloud region being used bucket = str(ROOT).replace("s3://", "").split("/")[0] @@ -29,3 +29,6 @@ REDUCED_DATA_DIR = ROOT / "reduced" MODEL_FILE = ROOT.parent / "model.json" MODEL_SERVER_FILE = ROOT.parent / "serve_model.py" + +lock_json_to_parquet = FileLock("json_to_parquet.lock") +lock_compact = FileLock("compact.lock") diff --git a/pipeline/train.py b/pipeline/train.py index 1dc5664..f5d9f41 100644 --- a/pipeline/train.py +++ b/pipeline/train.py @@ -10,15 +10,16 @@ @task @coiled.function( + name="train", local=LOCAL, region=REGION, tags={"workflow": "etl-tpch"}, ) -def train(files): - df = pd.read_parquet(files) +def train(file): + df = pd.read_parquet(file) X = df[["p_partkey", "s_acctbal"]] y = df["n_name"].map( - {"FRANCE": 0, "UNITED KINGDOM": 1, "RUSSIA": 2, "GERMANY": 3, "ROMANIA": 4} + {"GERMANY": 0, "ROMANIA": 1, "RUSSIA": 2, "FRANCE": 3, "UNITED KINGDOM": 4} ) model = xgb.XGBClassifier() if MODEL_FILE.exists(): @@ -45,5 +46,5 @@ def update_model(): if not files: print("No training data available") return - train(files) + train(files[0]) print(f"Updated model at {MODEL_FILE}") diff --git a/pipeline.py b/workflow.py similarity index 75% rename from pipeline.py rename to workflow.py index 0d84b1b..318c59d 100644 --- a/pipeline.py +++ b/workflow.py @@ -1,4 +1,4 @@ -# from datetime import timedelta +from datetime import timedelta from prefect import serve @@ -11,27 +11,27 @@ if __name__ == "__main__": data = generate_data.to_deployment( name="generate_data", - # interval=timedelta(seconds=30), + interval=timedelta(seconds=30), ) preprocess = json_to_parquet.to_deployment( name="preprocess", - # interval=timedelta(minutes=1), + interval=timedelta(minutes=1), ) compact = compact_tables.to_deployment( name="compact", - # interval=timedelta(minutes=5), + interval=timedelta(minutes=30), ) reduce = query_reduce.to_deployment( name="reduce", - # interval=timedelta(minutes=30), + interval=timedelta(hours=1), ) train = update_model.to_deployment( name="train", - # interval=timedelta(minutes=45), + interval=timedelta(hours=6), ) monitor = check_model_endpoint.to_deployment( name="monitor", - # interval=timedelta(seconds=30), + interval=timedelta(minutes=1), ) serve( From d3d5bd713b453d45d41dc1558d3d15120344358e Mon Sep 17 00:00:00 2001 From: James Bourbeau Date: Fri, 16 Feb 2024 21:47:12 -0600 Subject: [PATCH 3/4] More --- pipeline/reduce.py | 2 +- pipeline/train.py | 3 ++- 2 files changed, 3 insertions(+), 2 deletions(-) diff --git a/pipeline/reduce.py b/pipeline/reduce.py index c5a5928..18c0a62 100644 --- a/pipeline/reduce.py +++ b/pipeline/reduce.py @@ -27,7 +27,7 @@ def save_query(region, part_type): coiled.Cluster, name="reduce", region=REGION, - n_workers=20, + n_workers=10, tags={"workflow": "etl-tpch"}, shutdown_on_close=False, idle_timeout="5 minutes", diff --git a/pipeline/train.py b/pipeline/train.py index f5d9f41..c3a6f91 100644 --- a/pipeline/train.py +++ b/pipeline/train.py @@ -31,7 +31,8 @@ def train(file): with tempfile.TemporaryDirectory() as tmpdir: out = Path(tmpdir) / "model.json" model.save_model(out) - fs.mv(str(out), str(MODEL_FILE.parent)) + fs.makedirs(MODEL_FILE.parent, exist_ok=True) + fs.put(out, MODEL_FILE) return model From 3c6565f9f4c05eeb8dc7dd8b4e46fdef8c9df7e0 Mon Sep 
17 00:00:00 2001 From: James Bourbeau Date: Mon, 19 Feb 2024 09:36:28 -0600 Subject: [PATCH 4/4] More --- pipeline/data.py | 21 +++++++++++++++------ pipeline/settings.py | 1 + 2 files changed, 16 insertions(+), 6 deletions(-) diff --git a/pipeline/data.py b/pipeline/data.py index 1a29b86..9f2eb45 100644 --- a/pipeline/data.py +++ b/pipeline/data.py @@ -8,12 +8,19 @@ from dask.distributed import print from prefect import flow, task -from .settings import LOCAL, REGION, STAGING_JSON_DIR, STAGING_PARQUET_DIR, fs +from .settings import ( + LOCAL, + REGION, + STAGING_JSON_DIR, + STAGING_PARQUET_DIR, + fs, + lock_generate, +) @task(log_prints=True) @coiled.function( - name="data-etl", + name="data-generation", local=LOCAL, region=REGION, keepalive="5 minutes", @@ -87,7 +94,9 @@ def generate(scale: float, path: os.PathLike) -> None: @flow def generate_data(): - generate( - scale=0.01, - path=STAGING_JSON_DIR, - ) + with lock_generate: + generate( + scale=0.01, + path=STAGING_JSON_DIR, + ) + generate.fn.client.restart() diff --git a/pipeline/settings.py b/pipeline/settings.py index db3e8c6..9824e1f 100644 --- a/pipeline/settings.py +++ b/pipeline/settings.py @@ -30,5 +30,6 @@ MODEL_FILE = ROOT.parent / "model.json" MODEL_SERVER_FILE = ROOT.parent / "serve_model.py" +lock_generate = FileLock("generate.lock") lock_json_to_parquet = FileLock("json_to_parquet.lock") lock_compact = FileLock("compact.lock")
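
A note on the concurrency approach this series settles on: the deleted pipeline/resize.py already guarded its flow with a single filelock.FileLock, and patches 2-4 extend that same idea by defining lock_generate, lock_json_to_parquet, and lock_compact in pipeline/settings.py and wrapping each flow body in a `with lock_...:` block, in place of the prefect.concurrency.sync.concurrency context introduced in patch 1. A minimal sketch of that pattern, assuming only that prefect and filelock are installed; the flow name and lock-file name below are illustrative, not taken from the patches:

from filelock import FileLock
from prefect import flow

# One lock object per flow, created at module import time
# (mirrors the module-level locks in pipeline/settings.py).
lock_example = FileLock("example.lock")


@flow(log_prints=True)
def example_flow():
    # Block until no other local run of this flow holds the lock,
    # so overlapping scheduled runs are serialized rather than interleaved.
    with lock_example:
        print("doing work that must not overlap with another run")


if __name__ == "__main__":
    example_flow()

Because FileLock coordinates through a lock file on local disk, this serializes runs only among processes on the machine that runs `python workflow.py` (where serve() hosts all deployments); it does not coordinate runs across separate machines.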