From 5aa1a2600070769a74e2be37e6548cbe2522334e Mon Sep 17 00:00:00 2001 From: Martin Durant Date: Tue, 22 Oct 2024 09:55:40 -0400 Subject: [PATCH 1/7] Only return parquet metadata if intending to write --- src/dask_awkward/lib/io/parquet.py | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/src/dask_awkward/lib/io/parquet.py b/src/dask_awkward/lib/io/parquet.py index 2fc47c25..78472527 100644 --- a/src/dask_awkward/lib/io/parquet.py +++ b/src/dask_awkward/lib/io/parquet.py @@ -483,6 +483,7 @@ def __init__( npartitions: int, prefix: str | None = None, storage_options: dict | None = None, + write_metadata: bool = False, **kwargs: Any, ): self.fs = fs @@ -496,6 +497,7 @@ def __init__( if isinstance(self.fs.protocol, str) else self.fs.protocol[0] ) + self.write_metadata = write_metadata self.kwargs = kwargs def __call__(self, data, block_index): @@ -503,9 +505,11 @@ def __call__(self, data, block_index): if self.prefix is not None: filename = f"{self.prefix}-{filename}" filename = self.fs.unstrip_protocol(f"{self.path}{self.fs.sep}{filename}") - return ak.to_parquet( + out = ak.to_parquet( data, filename, **self.kwargs, storage_options=self.storage_options ) + if self.write_metadata: + return out def to_parquet( @@ -667,6 +671,7 @@ def to_parquet( parquet_old_int96_timestamps=parquet_old_int96_timestamps, parquet_compliant_nested=parquet_compliant_nested, parquet_extra_options=parquet_extra_options, + write_metadata=write_metadata, ), array, BlockIndex((array.npartitions,)), From 1dac8d6eabcc710b4a938b62c67cc6917ed3ba3d Mon Sep 17 00:00:00 2001 From: Martin Durant Date: Fri, 8 Nov 2024 11:54:34 -0500 Subject: [PATCH 2/7] Add fire&forget experimental option --- src/dask_awkward/lib/io/parquet.py | 10 ++++++++++ 1 file changed, 10 insertions(+) diff --git a/src/dask_awkward/lib/io/parquet.py b/src/dask_awkward/lib/io/parquet.py index 78472527..c4278c70 100644 --- a/src/dask_awkward/lib/io/parquet.py +++ b/src/dask_awkward/lib/io/parquet.py @@ -540,6 +540,7 @@ def to_parquet( write_metadata: bool = False, compute: bool = True, prefix: str | None = None, + fire_and_forget: bool = False, ) -> Scalar | None: """Write data to Parquet format. 
@@ -682,11 +683,20 @@ def to_parquet( dsk = {} if write_metadata: + if fire_and_forget: + raise ValueError final_name = name + "-metadata" dsk[(final_name, 0)] = (_metadata_file_from_metas, fs, path) + tuple( map_res.__dask_keys__() ) else: + if fire_and_forget: + import distributed + # assume default client for now; will error if doesn't exist + client = distributed.get_client() + futs = client.compute(map_res.to_delayed()) + distributed.fire_and_forget(futs) + return final_name = name + "-finalize" dsk[(final_name, 0)] = (lambda *_: None, map_res.__dask_keys__()) graph = HighLevelGraph.from_collections( From 519846ab138e6c5910a4967d8030b60434d53b38 Mon Sep 17 00:00:00 2001 From: "pre-commit-ci[bot]" <66853113+pre-commit-ci[bot]@users.noreply.github.com> Date: Fri, 8 Nov 2024 16:55:08 +0000 Subject: [PATCH 3/7] [pre-commit.ci] auto fixes from pre-commit.com hooks for more information, see https://pre-commit.ci --- src/dask_awkward/lib/io/parquet.py | 1 + 1 file changed, 1 insertion(+) diff --git a/src/dask_awkward/lib/io/parquet.py b/src/dask_awkward/lib/io/parquet.py index c4278c70..a145be81 100644 --- a/src/dask_awkward/lib/io/parquet.py +++ b/src/dask_awkward/lib/io/parquet.py @@ -692,6 +692,7 @@ def to_parquet( else: if fire_and_forget: import distributed + # assume default client for now; will error if doesn't exist client = distributed.get_client() futs = client.compute(map_res.to_delayed()) From 982e3a3eae4795a680a6af29fc9f0a28c706f15a Mon Sep 17 00:00:00 2001 From: Martin Durant Date: Fri, 8 Nov 2024 13:41:11 -0500 Subject: [PATCH 4/7] tree option --- src/dask_awkward/lib/io/parquet.py | 36 +++++++++++++++++++++++++++--- 1 file changed, 33 insertions(+), 3 deletions(-) diff --git a/src/dask_awkward/lib/io/parquet.py b/src/dask_awkward/lib/io/parquet.py index c4278c70..521ae38b 100644 --- a/src/dask_awkward/lib/io/parquet.py +++ b/src/dask_awkward/lib/io/parquet.py @@ -541,6 +541,7 @@ def to_parquet( compute: bool = True, prefix: str | None = None, fire_and_forget: bool = False, + tree: bool = False, ) -> Scalar | None: """Write data to Parquet format. 
@@ -690,15 +691,40 @@ def to_parquet( map_res.__dask_keys__() ) else: + final_name = name + "-finalize" if fire_and_forget: import distributed + # assume default client for now; will error if doesn't exist client = distributed.get_client() futs = client.compute(map_res.to_delayed()) distributed.fire_and_forget(futs) - return - final_name = name + "-finalize" - dsk[(final_name, 0)] = (lambda *_: None, map_res.__dask_keys__()) + return None + elif tree: + from dask_awkward.layers import AwkwardTreeReductionLayer + + layer = AwkwardTreeReductionLayer( + name=final_name, + concat_func=none_to_none, + tree_node_func=none_to_none, + name_input=map_res.name, + npartitions_input=map_res.npartitions, + finalize_func=none_to_none, + ) + graph = HighLevelGraph.from_collections( + final_name, + layer, + dependencies=[map_res], + ) + out = new_scalar_object(graph, final_name, dtype="f8") + if compute: + out.compute() + return None + else: + return out + + else: + dsk[(final_name, 0)] = (lambda *_: None, map_res.__dask_keys__()) graph = HighLevelGraph.from_collections( final_name, AwkwardMaterializedLayer(dsk, previous_layer_names=[map_res.name]), @@ -710,3 +736,7 @@ def to_parquet( return None else: return out + + +def none_to_none(*_): + return None From 21090a3341d145366503011961b098689c14fbf2 Mon Sep 17 00:00:00 2001 From: Martin Durant Date: Mon, 18 Nov 2024 15:03:45 -0500 Subject: [PATCH 5/7] Tree becomes only way when not making parquet meta file --- src/dask_awkward/lib/io/parquet.py | 64 ++++++++++-------------------- 1 file changed, 22 insertions(+), 42 deletions(-) diff --git a/src/dask_awkward/lib/io/parquet.py b/src/dask_awkward/lib/io/parquet.py index 521ae38b..11b967c5 100644 --- a/src/dask_awkward/lib/io/parquet.py +++ b/src/dask_awkward/lib/io/parquet.py @@ -540,8 +540,6 @@ def to_parquet( write_metadata: bool = False, compute: bool = True, prefix: str | None = None, - fire_and_forget: bool = False, - tree: bool = False, ) -> Scalar | None: """Write data to Parquet format. 
@@ -684,53 +682,35 @@ def to_parquet( dsk = {} if write_metadata: - if fire_and_forget: - raise ValueError final_name = name + "-metadata" dsk[(final_name, 0)] = (_metadata_file_from_metas, fs, path) + tuple( map_res.__dask_keys__() ) + graph = HighLevelGraph.from_collections( + final_name, + AwkwardMaterializedLayer(dsk, previous_layer_names=[map_res.name]), + dependencies=[map_res], + ) + out = new_scalar_object(graph, final_name, dtype="f8") else: final_name = name + "-finalize" - if fire_and_forget: - import distributed - - # assume default client for now; will error if doesn't exist - client = distributed.get_client() - futs = client.compute(map_res.to_delayed()) - distributed.fire_and_forget(futs) - return None - elif tree: - from dask_awkward.layers import AwkwardTreeReductionLayer - - layer = AwkwardTreeReductionLayer( - name=final_name, - concat_func=none_to_none, - tree_node_func=none_to_none, - name_input=map_res.name, - npartitions_input=map_res.npartitions, - finalize_func=none_to_none, - ) - graph = HighLevelGraph.from_collections( - final_name, - layer, - dependencies=[map_res], - ) - out = new_scalar_object(graph, final_name, dtype="f8") - if compute: - out.compute() - return None - else: - return out + from dask_awkward.layers import AwkwardTreeReductionLayer + + layer = AwkwardTreeReductionLayer( + name=final_name, + concat_func=none_to_none, + tree_node_func=none_to_none, + name_input=map_res.name, + npartitions_input=map_res.npartitions, + finalize_func=none_to_none, + ) + graph = HighLevelGraph.from_collections( + final_name, + layer, + dependencies=[map_res], + ) + out = new_scalar_object(graph, final_name, dtype="f8") - else: - dsk[(final_name, 0)] = (lambda *_: None, map_res.__dask_keys__()) - graph = HighLevelGraph.from_collections( - final_name, - AwkwardMaterializedLayer(dsk, previous_layer_names=[map_res.name]), - dependencies=[map_res], - ) - out = new_scalar_object(graph, final_name, dtype="f8") if compute: out.compute() return None From 056d8305450374e2b0bb17978503c498a85e74a8 Mon Sep 17 00:00:00 2001 From: Martin Durant Date: Mon, 18 Nov 2024 15:05:49 -0500 Subject: [PATCH 6/7] add a little documentation --- src/dask_awkward/lib/io/parquet.py | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/src/dask_awkward/lib/io/parquet.py b/src/dask_awkward/lib/io/parquet.py index 11b967c5..9ff2a58d 100644 --- a/src/dask_awkward/lib/io/parquet.py +++ b/src/dask_awkward/lib/io/parquet.py @@ -601,7 +601,10 @@ def to_parquet( storage_options Storage options passed to ``fsspec``. write_metadata - Write Parquet metadata. + Write Parquet metadata. Note, that when this is True, all the + metadata pieces will be pulled into a single finalizer task. When + False, the whole write graph can be evaluated as a more efficient + tree reduction. compute If ``True``, immediately compute the result (write data to disk). 
If ``False`` a Scalar collection will be returned such @@ -719,4 +722,5 @@ def to_parquet( def none_to_none(*_): + """Dummy reduction function where write tasks produce no metadata""" return None From 64b36491ee5fb4b17f41ea6107dde7f8a979f2a3 Mon Sep 17 00:00:00 2001 From: Martin Durant Date: Mon, 18 Nov 2024 16:46:24 -0500 Subject: [PATCH 7/7] allow older awkward-cpp in ci --- .github/workflows/awkward-main.yml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.github/workflows/awkward-main.yml b/.github/workflows/awkward-main.yml index b2ce95db..f1c97d05 100644 --- a/.github/workflows/awkward-main.yml +++ b/.github/workflows/awkward-main.yml @@ -29,7 +29,7 @@ jobs: run: | python3 -m pip install pip wheel -U python3 -m pip install -q --no-cache-dir -e .[complete,test] - python3 -m pip uninstall -y awkward && pip install git+https://github.com/scikit-hep/awkward.git@main + python3 -m pip uninstall -y awkward && pip install git+https://github.com/scikit-hep/awkward.git@main --no-deps - name: Run tests run: | python3 -m pytest