From 61582087826005370a3c457ec9d727d53666a733 Mon Sep 17 00:00:00 2001 From: Lindsey Gray Date: Wed, 4 Dec 2024 09:11:30 -0600 Subject: [PATCH 01/29] fix: adapt to new Task spec in dask, now used in blockwise --- src/dask_awkward/layers/layers.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/src/dask_awkward/layers/layers.py b/src/dask_awkward/layers/layers.py index 92441443..35f2ddb7 100644 --- a/src/dask_awkward/layers/layers.py +++ b/src/dask_awkward/layers/layers.py @@ -4,6 +4,7 @@ from collections.abc import Callable, Mapping from typing import TYPE_CHECKING, Any, Literal, Protocol, TypeVar, Union, cast +from dask._task_spec import Task from dask.blockwise import Blockwise, BlockwiseDepDict, blockwise_token from dask.highlevelgraph import MaterializedLayer from dask.layers import DataFrameTreeReduction @@ -163,7 +164,7 @@ def __init__( super().__init__( output=self.name, output_indices="i", - dsk={name: (self.io_func, blockwise_token(0))}, + task=Task(name, self.io_func, blockwise_token(0)), indices=[(io_arg_map, "i")], numblocks={}, annotations=None, From d8cc3e4c1bcfb6e27577df22f36d7eb3b7d86d83 Mon Sep 17 00:00:00 2001 From: Lindsey Gray Date: Wed, 4 Dec 2024 09:20:28 -0600 Subject: [PATCH 02/29] drop py3.8 from tests --- .github/workflows/pypi-tests.yml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.github/workflows/pypi-tests.yml b/.github/workflows/pypi-tests.yml index 49d09f4e..04ff4c1a 100644 --- a/.github/workflows/pypi-tests.yml +++ b/.github/workflows/pypi-tests.yml @@ -18,7 +18,7 @@ jobs: fail-fast: false matrix: platform: [ubuntu-latest, macos-latest, windows-latest] - python-version: ["3.8", "3.9", "3.10", "3.11", "3.12"] + python-version: ["3.9", "3.10", "3.11", "3.12"] runs-on: ${{matrix.platform}} steps: - name: Checkout From 7312b3185593eb0283ab2a5ede366c3caf810fc2 Mon Sep 17 00:00:00 2001 From: Lindsey Gray Date: Wed, 4 Dec 2024 09:23:16 -0600 Subject: [PATCH 03/29] ah, we need version check logic instead, great... --- .github/workflows/pypi-tests.yml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.github/workflows/pypi-tests.yml b/.github/workflows/pypi-tests.yml index 04ff4c1a..4296f8b7 100644 --- a/.github/workflows/pypi-tests.yml +++ b/.github/workflows/pypi-tests.yml @@ -18,7 +18,7 @@ jobs: fail-fast: false matrix: platform: [ubuntu-latest, macos-latest, windows-latest] - python-version: ["3.9", "3.10", "3.11", "3.12"] + python-version: ["3.8","3.9", "3.10", "3.11", "3.12"] runs-on: ${{matrix.platform}} steps: - name: Checkout From f3461bb10999812332ef224abeb79ff4d8c531f3 Mon Sep 17 00:00:00 2001 From: Lindsey Gray Date: Wed, 4 Dec 2024 09:24:10 -0600 Subject: [PATCH 04/29] whitespace --- .github/workflows/pypi-tests.yml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.github/workflows/pypi-tests.yml b/.github/workflows/pypi-tests.yml index 4296f8b7..49d09f4e 100644 --- a/.github/workflows/pypi-tests.yml +++ b/.github/workflows/pypi-tests.yml @@ -18,7 +18,7 @@ jobs: fail-fast: false matrix: platform: [ubuntu-latest, macos-latest, windows-latest] - python-version: ["3.8","3.9", "3.10", "3.11", "3.12"] + python-version: ["3.8", "3.9", "3.10", "3.11", "3.12"] runs-on: ${{matrix.platform}} steps: - name: Checkout From e100c459a1a810438799aefc70a24dad52a32ae4 Mon Sep 17 00:00:00 2001 From: Lindsey Gray Date: Wed, 4 Dec 2024 09:28:18 -0600 Subject: [PATCH 05/29] guard against missing _task_spec and Task classes in older dask --- src/dask_awkward/layers/layers.py | 33 ++++++++++++++++++++++--------- 1 file changed, 24 insertions(+), 9 deletions(-) diff --git a/src/dask_awkward/layers/layers.py b/src/dask_awkward/layers/layers.py index 35f2ddb7..c15189a8 100644 --- a/src/dask_awkward/layers/layers.py +++ b/src/dask_awkward/layers/layers.py @@ -4,7 +4,12 @@ from collections.abc import Callable, Mapping from typing import TYPE_CHECKING, Any, Literal, Protocol, TypeVar, Union, cast -from dask._task_spec import Task +_dask_uses_tasks = True +try: + from dask._task_spec import Task +except ModuleNotFoundError as _: + _dask_uses_tasks = False + from dask.blockwise import Blockwise, BlockwiseDepDict, blockwise_token from dask.highlevelgraph import MaterializedLayer from dask.layers import DataFrameTreeReduction @@ -161,14 +166,24 @@ def __init__( produces_tasks=self.produces_tasks, ) - super().__init__( - output=self.name, - output_indices="i", - task=Task(name, self.io_func, blockwise_token(0)), - indices=[(io_arg_map, "i")], - numblocks={}, - annotations=None, - ) + if _dask_uses_tasks: + super().__init__( + output=self.name, + output_indices="i", + task=Task(name, self.io_func, blockwise_token(0)), + indices=[(io_arg_map, "i")], + numblocks={}, + annotations=None, + ) + else: + super().__init__( + output=self.name, + output_indices="i", + dsk={name: (self.io_func, blockwise_token(0))}, + indices=[(io_arg_map, "i")], + numblocks={}, + annotations=None, + ) def __repr__(self) -> str: return f"AwkwardInputLayer<{self.output}>" From fc464733a9b888e0428edfff6a31cb81afcc8fb7 Mon Sep 17 00:00:00 2001 From: Lindsey Gray Date: Wed, 4 Dec 2024 09:51:03 -0600 Subject: [PATCH 06/29] adjust min version requirements --- pyproject.toml | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/pyproject.toml b/pyproject.toml index d5514159..cdbcd00c 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -37,7 +37,8 @@ classifiers = [ ] dependencies = [ "awkward >=2.5.1", - "dask >=2023.04.0", + "dask >=2024.12.0;python_version>'3.9'" + "dask >=2023.04.0;python_version<'3.10'", "cachetools", "typing_extensions >=4.8.0", ] From 341500abf2fcb6280f626d94e4ce602f4d2b2aad Mon Sep 17 00:00:00 2001 From: "pre-commit-ci[bot]" <66853113+pre-commit-ci[bot]@users.noreply.github.com> Date: Wed, 4 Dec 2024 15:51:20 +0000 Subject: [PATCH 07/29] [pre-commit.ci] auto fixes from pre-commit.com hooks for more information, see https://pre-commit.ci --- src/dask_awkward/__init__.py | 113 ++++++----------------- src/dask_awkward/layers/__init__.py | 18 ++-- src/dask_awkward/lib/__init__.py | 110 ++++++---------------- src/dask_awkward/lib/core.py | 37 ++------ src/dask_awkward/lib/io/columnar.py | 22 ++--- src/dask_awkward/lib/io/io.py | 28 ++---- src/dask_awkward/lib/io/json.py | 16 +--- src/dask_awkward/lib/io/parquet.py | 3 +- src/dask_awkward/lib/io/text.py | 7 +- src/dask_awkward/lib/operations.py | 18 ++-- src/dask_awkward/lib/structure.py | 22 ++--- src/dask_awkward/lib/unproject_layout.py | 39 ++------ src/dask_awkward/lib/utils.py | 3 +- tests/test_core.py | 25 ++--- tests/test_getitem.py | 3 +- tests/test_parquet.py | 3 +- tests/test_utils.py | 8 +- 17 files changed, 135 insertions(+), 340 deletions(-) diff --git a/src/dask_awkward/__init__.py b/src/dask_awkward/__init__.py index 34b5c4f5..14a68bc3 100644 --- a/src/dask_awkward/__init__.py +++ b/src/dask_awkward/__init__.py @@ -11,98 +11,39 @@ import dask_awkward.lib.utils as utils from dask_awkward.lib.core import Array, PartitionCompatibility, Record, Scalar from dask_awkward.lib.core import _type as type -from dask_awkward.lib.core import ( - compatible_partitions, - dask_method, - dask_property, - map_partitions, - partition_compatibility, -) +from dask_awkward.lib.core import (compatible_partitions, dask_method, + dask_property, map_partitions, + partition_compatibility) from dask_awkward.lib.describe import backend, fields -from dask_awkward.lib.inspect import ( - report_necessary_buffers, - report_necessary_columns, - sample, -) +from dask_awkward.lib.inspect import (report_necessary_buffers, + report_necessary_columns, sample) necessary_columns = report_necessary_columns # Export for backwards compatibility. -from dask_awkward.lib.io.io import ( - from_awkward, - from_dask_array, - from_delayed, - from_lists, - from_map, - to_dask_array, - to_dask_bag, - to_dataframe, - to_delayed, -) +from dask_awkward.lib.io.io import (from_awkward, from_dask_array, + from_delayed, from_lists, from_map, + to_dask_array, to_dask_bag, to_dataframe, + to_delayed) from dask_awkward.lib.io.json import from_json, layout_to_jsonschema, to_json from dask_awkward.lib.io.parquet import from_parquet, to_parquet from dask_awkward.lib.io.text import from_text from dask_awkward.lib.operations import concatenate -from dask_awkward.lib.reducers import ( - all, - any, - argmax, - argmin, - corr, - count, - count_nonzero, - covar, - linear_fit, - max, - mean, - min, - moment, - prod, - ptp, - softmax, - std, - sum, - var, -) -from dask_awkward.lib.structure import ( - argcartesian, - argcombinations, - argsort, - broadcast_arrays, - cartesian, - combinations, - copy, - drop_none, - fill_none, - firsts, - flatten, - from_regular, - full_like, - is_none, - isclose, - local_index, - mask, - nan_to_num, - num, - ones_like, - pad_none, - ravel, - run_lengths, - singletons, - sort, - strings_astype, - to_list, - to_packed, - to_regular, - unflatten, - unzip, - values_astype, - where, - with_field, - with_name, - with_parameter, - without_field, - without_parameters, - zeros_like, - zip, -) +from dask_awkward.lib.reducers import (all, any, argmax, argmin, corr, count, + count_nonzero, covar, linear_fit, max, + mean, min, moment, prod, ptp, softmax, + std, sum, var) +from dask_awkward.lib.structure import (argcartesian, argcombinations, argsort, + broadcast_arrays, cartesian, + combinations, copy, drop_none, + fill_none, firsts, flatten, + from_regular, full_like, is_none, + isclose, local_index, mask, nan_to_num, + num, ones_like, pad_none, ravel, + run_lengths, singletons, sort, + strings_astype, to_list, to_packed, + to_regular, unflatten, unzip, + values_astype, where, with_field, + with_name, with_parameter, + without_field, without_parameters, + zeros_like, zip) from dask_awkward.version import __version__ diff --git a/src/dask_awkward/layers/__init__.py b/src/dask_awkward/layers/__init__.py index d4ba4c5e..491f4702 100644 --- a/src/dask_awkward/layers/__init__.py +++ b/src/dask_awkward/layers/__init__.py @@ -1,13 +1,11 @@ -from dask_awkward.layers.layers import ( - AwkwardBlockwiseLayer, - AwkwardInputLayer, - AwkwardMaterializedLayer, - AwkwardTreeReductionLayer, - ImplementsIOFunction, - ImplementsProjection, - IOFunctionWithMocking, - io_func_implements_projection, -) +from dask_awkward.layers.layers import (AwkwardBlockwiseLayer, + AwkwardInputLayer, + AwkwardMaterializedLayer, + AwkwardTreeReductionLayer, + ImplementsIOFunction, + ImplementsProjection, + IOFunctionWithMocking, + io_func_implements_projection) __all__ = ( "AwkwardInputLayer", diff --git a/src/dask_awkward/lib/__init__.py b/src/dask_awkward/lib/__init__.py index 74d16d6c..699fddd3 100644 --- a/src/dask_awkward/lib/__init__.py +++ b/src/dask_awkward/lib/__init__.py @@ -2,92 +2,34 @@ import dask_awkward.lib.utils as utils from dask_awkward.lib.core import Array, PartitionCompatibility, Record, Scalar from dask_awkward.lib.core import _type as type -from dask_awkward.lib.core import ( - compatible_partitions, - map_partitions, - partition_compatibility, -) +from dask_awkward.lib.core import (compatible_partitions, map_partitions, + partition_compatibility) from dask_awkward.lib.describe import backend, fields -from dask_awkward.lib.inspect import ( - report_necessary_buffers, - report_necessary_columns, - sample, -) -from dask_awkward.lib.io.io import ( - from_awkward, - from_dask_array, - from_delayed, - from_lists, - from_map, - to_dask_array, - to_dask_bag, - to_dataframe, - to_delayed, -) +from dask_awkward.lib.inspect import (report_necessary_buffers, + report_necessary_columns, sample) +from dask_awkward.lib.io.io import (from_awkward, from_dask_array, + from_delayed, from_lists, from_map, + to_dask_array, to_dask_bag, to_dataframe, + to_delayed) from dask_awkward.lib.io.json import from_json, to_json from dask_awkward.lib.io.parquet import from_parquet, to_parquet from dask_awkward.lib.io.text import from_text from dask_awkward.lib.operations import concatenate -from dask_awkward.lib.reducers import ( - all, - any, - argmax, - argmin, - corr, - count, - count_nonzero, - covar, - linear_fit, - max, - mean, - min, - moment, - prod, - ptp, - softmax, - std, - sum, - var, -) -from dask_awkward.lib.structure import ( - argcartesian, - argcombinations, - argsort, - broadcast_arrays, - cartesian, - combinations, - copy, - drop_none, - fill_none, - firsts, - flatten, - from_regular, - full_like, - is_none, - isclose, - local_index, - mask, - nan_to_num, - num, - ones_like, - pad_none, - ravel, - run_lengths, - singletons, - sort, - strings_astype, - to_list, - to_packed, - to_regular, - unflatten, - unzip, - values_astype, - where, - with_field, - with_name, - with_parameter, - without_field, - without_parameters, - zeros_like, - zip, -) +from dask_awkward.lib.reducers import (all, any, argmax, argmin, corr, count, + count_nonzero, covar, linear_fit, max, + mean, min, moment, prod, ptp, softmax, + std, sum, var) +from dask_awkward.lib.structure import (argcartesian, argcombinations, argsort, + broadcast_arrays, cartesian, + combinations, copy, drop_none, + fill_none, firsts, flatten, + from_regular, full_like, is_none, + isclose, local_index, mask, nan_to_num, + num, ones_like, pad_none, ravel, + run_lengths, singletons, sort, + strings_astype, to_list, to_packed, + to_regular, unflatten, unzip, + values_astype, where, with_field, + with_name, with_parameter, + without_field, without_parameters, + zeros_like, zip) diff --git a/src/dask_awkward/lib/core.py b/src/dask_awkward/lib/core.py index d7c1a4e0..92d1b235 100644 --- a/src/dask_awkward/lib/core.py +++ b/src/dask_awkward/lib/core.py @@ -21,20 +21,10 @@ import numpy as np from awkward._do import remove_structure as ak_do_remove_structure from awkward.highlevel import NDArrayOperatorsMixin, _dir_pattern -from awkward.typetracer import ( - MaybeNone, - OneOf, - TypeTracerArray, - create_unknown_scalar, - is_unknown_scalar, -) -from dask.base import ( - DaskMethodsMixin, - dont_optimize, - is_dask_collection, - tokenize, - unpack_collections, -) +from awkward.typetracer import (MaybeNone, OneOf, TypeTracerArray, + create_unknown_scalar, is_unknown_scalar) +from dask.base import (DaskMethodsMixin, dont_optimize, is_dask_collection, + tokenize, unpack_collections) from dask.blockwise import BlockwiseDep from dask.blockwise import blockwise as dask_blockwise from dask.context import globalmethod @@ -47,15 +37,10 @@ from dask_awkward.layers import AwkwardBlockwiseLayer, AwkwardMaterializedLayer from dask_awkward.lib.optimize import all_optimizations -from dask_awkward.utils import ( - ConcretizationTypeError, - DaskAwkwardNotImplemented, - IncompatiblePartitions, - field_access_to_front, - first, - hyphenize, - is_empty_slice, -) +from dask_awkward.utils import (ConcretizationTypeError, + DaskAwkwardNotImplemented, + IncompatiblePartitions, field_access_to_front, + first, hyphenize, is_empty_slice) if TYPE_CHECKING: from awkward.contents.content import Content @@ -996,10 +981,8 @@ def repartition( (npartitions=1) is a special case of this. """ from dask_awkward.layers import AwkwardMaterializedLayer - from dask_awkward.lib.structure import ( - repartition_layer, - simple_repartition_layer, - ) + from dask_awkward.lib.structure import (repartition_layer, + simple_repartition_layer) if ( sum( diff --git a/src/dask_awkward/lib/io/columnar.py b/src/dask_awkward/lib/io/columnar.py index 32ffb3ae..9dea26f1 100644 --- a/src/dask_awkward/lib/io/columnar.py +++ b/src/dask_awkward/lib/io/columnar.py @@ -8,21 +8,13 @@ from awkward.forms import Form from awkward.typetracer import typetracer_from_form, typetracer_with_report -from dask_awkward.layers.layers import ( - BackendT, - ImplementsIOFunction, - ImplementsNecessaryColumns, -) -from dask_awkward.lib.utils import ( - METADATA_ATTRIBUTES, - FormStructure, - buffer_keys_required_to_compute_shapes, - form_with_unique_keys, - parse_buffer_key, - render_buffer_key, - trace_form_structure, - walk_graph_depth_first, -) +from dask_awkward.layers.layers import (BackendT, ImplementsIOFunction, + ImplementsNecessaryColumns) +from dask_awkward.lib.utils import (METADATA_ATTRIBUTES, FormStructure, + buffer_keys_required_to_compute_shapes, + form_with_unique_keys, parse_buffer_key, + render_buffer_key, trace_form_structure, + walk_graph_depth_first) if TYPE_CHECKING: from awkward._nplikes.typetracer import TypeTracerReport diff --git a/src/dask_awkward/lib/io/io.py b/src/dask_awkward/lib/io/io.py index d8fd60b1..3fd86fd3 100644 --- a/src/dask_awkward/lib/io/io.py +++ b/src/dask_awkward/lib/io/io.py @@ -18,24 +18,16 @@ from dask.utils import funcname, is_integer, parse_bytes from fsspec.utils import infer_compression -from dask_awkward.layers.layers import ( - AwkwardBlockwiseLayer, - AwkwardInputLayer, - AwkwardMaterializedLayer, - AwkwardTreeReductionLayer, - ImplementsMocking, - ImplementsReport, - IOFunctionWithMocking, - io_func_implements_mocking, - io_func_implements_report, -) -from dask_awkward.lib.core import ( - Array, - empty_typetracer, - map_partitions, - new_array_object, - typetracer_array, -) +from dask_awkward.layers.layers import (AwkwardBlockwiseLayer, + AwkwardInputLayer, + AwkwardMaterializedLayer, + AwkwardTreeReductionLayer, + ImplementsMocking, ImplementsReport, + IOFunctionWithMocking, + io_func_implements_mocking, + io_func_implements_report) +from dask_awkward.lib.core import (Array, empty_typetracer, map_partitions, + new_array_object, typetracer_array) from dask_awkward.lib.io.columnar import ColumnProjectionMixin from dask_awkward.utils import first, second diff --git a/src/dask_awkward/lib/io/json.py b/src/dask_awkward/lib/io/json.py index 9b63a493..9a3c2156 100644 --- a/src/dask_awkward/lib/io/json.py +++ b/src/dask_awkward/lib/io/json.py @@ -18,19 +18,11 @@ from fsspec.utils import infer_compression, read_block from dask_awkward.layers.layers import AwkwardMaterializedLayer -from dask_awkward.lib.core import ( - Array, - Scalar, - map_partitions, - new_scalar_object, - typetracer_array, -) +from dask_awkward.lib.core import (Array, Scalar, map_partitions, + new_scalar_object, typetracer_array) from dask_awkward.lib.io.columnar import ColumnProjectionMixin -from dask_awkward.lib.io.io import ( - _bytes_with_sample, - _BytesReadingInstructions, - from_map, -) +from dask_awkward.lib.io.io import (_bytes_with_sample, + _BytesReadingInstructions, from_map) if TYPE_CHECKING: from awkward.contents.content import Content diff --git a/src/dask_awkward/lib/io/parquet.py b/src/dask_awkward/lib/io/parquet.py index 9ff2a58d..2ffd4d2a 100644 --- a/src/dask_awkward/lib/io/parquet.py +++ b/src/dask_awkward/lib/io/parquet.py @@ -19,7 +19,8 @@ from fsspec.core import get_fs_token_paths, url_to_fs from dask_awkward.layers.layers import AwkwardMaterializedLayer -from dask_awkward.lib.core import Array, Scalar, map_partitions, new_scalar_object +from dask_awkward.lib.core import (Array, Scalar, map_partitions, + new_scalar_object) from dask_awkward.lib.io.columnar import ColumnProjectionMixin from dask_awkward.lib.io.io import from_map from dask_awkward.lib.unproject_layout import unproject_layout diff --git a/src/dask_awkward/lib/io/text.py b/src/dask_awkward/lib/io/text.py index f38c9f35..1834e39f 100644 --- a/src/dask_awkward/lib/io/text.py +++ b/src/dask_awkward/lib/io/text.py @@ -13,11 +13,8 @@ from fsspec.utils import infer_compression, read_block from dask_awkward.lib.core import Array -from dask_awkward.lib.io.io import ( - _bytes_with_sample, - _BytesReadingInstructions, - from_map, -) +from dask_awkward.lib.io.io import (_bytes_with_sample, + _BytesReadingInstructions, from_map) def _string_array_from_bytestring(bytestring: bytes, delimiter: bytes) -> ak.Array: diff --git a/src/dask_awkward/lib/operations.py b/src/dask_awkward/lib/operations.py index 6f1da8e5..66a5faca 100644 --- a/src/dask_awkward/lib/operations.py +++ b/src/dask_awkward/lib/operations.py @@ -4,22 +4,18 @@ from typing import TYPE_CHECKING, Any import awkward as ak -from awkward.operations.ak_concatenate import ( - enforce_concatenated_form as enforce_layout_to_concatenated_form, -) +from awkward.operations.ak_concatenate import \ + enforce_concatenated_form as enforce_layout_to_concatenated_form from awkward.typetracer import typetracer_from_form from dask.base import tokenize from dask.highlevelgraph import HighLevelGraph from dask_awkward.layers import AwkwardMaterializedLayer -from dask_awkward.lib.core import ( - Array, - PartitionCompatibility, - map_partitions, - new_array_object, - partition_compatibility, -) -from dask_awkward.utils import DaskAwkwardNotImplemented, IncompatiblePartitions +from dask_awkward.lib.core import (Array, PartitionCompatibility, + map_partitions, new_array_object, + partition_compatibility) +from dask_awkward.utils import (DaskAwkwardNotImplemented, + IncompatiblePartitions) if TYPE_CHECKING: from awkward.forms import Form diff --git a/src/dask_awkward/lib/structure.py b/src/dask_awkward/lib/structure.py index 6b70c8b9..0bfd5135 100644 --- a/src/dask_awkward/lib/structure.py +++ b/src/dask_awkward/lib/structure.py @@ -15,21 +15,13 @@ from dask.highlevelgraph import HighLevelGraph from dask_awkward.layers import AwkwardMaterializedLayer -from dask_awkward.lib.core import ( - Array, - PartitionCompatibility, - _map_partitions, - map_partitions, - new_known_scalar, - new_scalar_object, - partition_compatibility, -) -from dask_awkward.utils import ( - DaskAwkwardNotImplemented, - IncompatiblePartitions, - borrow_docstring, - first, -) +from dask_awkward.lib.core import (Array, PartitionCompatibility, + _map_partitions, map_partitions, + new_known_scalar, new_scalar_object, + partition_compatibility) +from dask_awkward.utils import (DaskAwkwardNotImplemented, + IncompatiblePartitions, borrow_docstring, + first) if TYPE_CHECKING: from numpy.typing import DTypeLike diff --git a/src/dask_awkward/lib/unproject_layout.py b/src/dask_awkward/lib/unproject_layout.py index 493e3150..b92234bc 100644 --- a/src/dask_awkward/lib/unproject_layout.py +++ b/src/dask_awkward/lib/unproject_layout.py @@ -5,36 +5,15 @@ import awkward as ak import numpy as np -from awkward.contents import ( - BitMaskedArray, - ByteMaskedArray, - Content, - EmptyArray, - IndexedArray, - IndexedOptionArray, - ListArray, - ListOffsetArray, - NumpyArray, - RecordArray, - RegularArray, - UnionArray, - UnmaskedArray, -) -from awkward.forms import ( - BitMaskedForm, - ByteMaskedForm, - EmptyForm, - Form, - IndexedForm, - IndexedOptionForm, - ListForm, - ListOffsetForm, - NumpyForm, - RecordForm, - RegularForm, - UnionForm, - UnmaskedForm, -) +from awkward.contents import (BitMaskedArray, ByteMaskedArray, Content, + EmptyArray, IndexedArray, IndexedOptionArray, + ListArray, ListOffsetArray, NumpyArray, + RecordArray, RegularArray, UnionArray, + UnmaskedArray) +from awkward.forms import (BitMaskedForm, ByteMaskedForm, EmptyForm, Form, + IndexedForm, IndexedOptionForm, ListForm, + ListOffsetForm, NumpyForm, RecordForm, RegularForm, + UnionForm, UnmaskedForm) from awkward.typetracer import PlaceholderArray, unknown_length index_of = { diff --git a/src/dask_awkward/lib/utils.py b/src/dask_awkward/lib/utils.py index 7b067386..9d78f3e1 100644 --- a/src/dask_awkward/lib/utils.py +++ b/src/dask_awkward/lib/utils.py @@ -2,7 +2,8 @@ __all__ = ("trace_form_structure", "buffer_keys_required_to_compute_shapes") -from collections.abc import Callable, Iterable, Iterator, Mapping, MutableMapping +from collections.abc import (Callable, Iterable, Iterator, Mapping, + MutableMapping) from contextlib import contextmanager from typing import TYPE_CHECKING, TypedDict, TypeVar diff --git a/tests/test_core.py b/tests/test_core.py index 3ebf0777..0e0c788f 100644 --- a/tests/test_core.py +++ b/tests/test_core.py @@ -16,23 +16,14 @@ from dask.delayed import delayed import dask_awkward as dak -from dask_awkward.lib.core import ( - Record, - Scalar, - calculate_known_divisions, - compute_typetracer, - empty_typetracer, - is_typetracer, - map_partitions, - meta_or_identity, - new_array_object, - new_known_scalar, - new_record_object, - new_scalar_object, - normalize_single_outer_inner_index, - to_meta, - typetracer_array, -) +from dask_awkward.lib.core import (Record, Scalar, calculate_known_divisions, + compute_typetracer, empty_typetracer, + is_typetracer, map_partitions, + meta_or_identity, new_array_object, + new_known_scalar, new_record_object, + new_scalar_object, + normalize_single_outer_inner_index, to_meta, + typetracer_array) from dask_awkward.lib.testutils import assert_eq from dask_awkward.utils import ConcretizationTypeError, IncompatiblePartitions diff --git a/tests/test_getitem.py b/tests/test_getitem.py index e7dd3f8d..1e756d44 100644 --- a/tests/test_getitem.py +++ b/tests/test_getitem.py @@ -9,7 +9,8 @@ import dask_awkward as dak import dask_awkward.lib.core as dakc -from dask_awkward.lib.core import DaskAwkwardNotImplemented, IncompatiblePartitions +from dask_awkward.lib.core import (DaskAwkwardNotImplemented, + IncompatiblePartitions) from dask_awkward.lib.testutils import assert_eq diff --git a/tests/test_parquet.py b/tests/test_parquet.py index 3206703a..45be133d 100644 --- a/tests/test_parquet.py +++ b/tests/test_parquet.py @@ -15,7 +15,8 @@ import pyarrow.dataset as pad import dask_awkward as dak -from dask_awkward.lib.io.parquet import _metadata_file_from_data_files, to_parquet +from dask_awkward.lib.io.parquet import (_metadata_file_from_data_files, + to_parquet) from dask_awkward.lib.testutils import assert_eq data = [[1, 2, 3], [4, None], None] diff --git a/tests/test_utils.py b/tests/test_utils.py index 3d1b38c7..e7d2ca10 100644 --- a/tests/test_utils.py +++ b/tests/test_utils.py @@ -3,12 +3,8 @@ import pytest from dask_awkward.lib.utils import typetracer_nochecks -from dask_awkward.utils import ( - LazyInputsDict, - field_access_to_front, - hyphenize, - is_empty_slice, -) +from dask_awkward.utils import (LazyInputsDict, field_access_to_front, + hyphenize, is_empty_slice) def test_is_empty_slice() -> None: From 8780daeb4d64cb601899c00dacbb4af2a52dcb1b Mon Sep 17 00:00:00 2001 From: Lindsey Gray Date: Wed, 4 Dec 2024 09:51:23 -0600 Subject: [PATCH 08/29] commas are good things --- pyproject.toml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pyproject.toml b/pyproject.toml index cdbcd00c..2e4adb1d 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -37,7 +37,7 @@ classifiers = [ ] dependencies = [ "awkward >=2.5.1", - "dask >=2024.12.0;python_version>'3.9'" + "dask >=2024.12.0;python_version>'3.9'", "dask >=2023.04.0;python_version<'3.10'", "cachetools", "typing_extensions >=4.8.0", From 6f96f4fcaa8735e77b8629b3075f24583a9be410 Mon Sep 17 00:00:00 2001 From: "pre-commit-ci[bot]" <66853113+pre-commit-ci[bot]@users.noreply.github.com> Date: Wed, 4 Dec 2024 15:51:39 +0000 Subject: [PATCH 09/29] [pre-commit.ci] auto fixes from pre-commit.com hooks for more information, see https://pre-commit.ci --- src/dask_awkward/__init__.py | 113 +++++++++++++++++------ src/dask_awkward/layers/__init__.py | 18 ++-- src/dask_awkward/lib/__init__.py | 110 ++++++++++++++++------ src/dask_awkward/lib/core.py | 37 ++++++-- src/dask_awkward/lib/io/columnar.py | 22 +++-- src/dask_awkward/lib/io/io.py | 28 ++++-- src/dask_awkward/lib/io/json.py | 16 +++- src/dask_awkward/lib/io/parquet.py | 3 +- src/dask_awkward/lib/io/text.py | 7 +- src/dask_awkward/lib/operations.py | 18 ++-- src/dask_awkward/lib/structure.py | 22 +++-- src/dask_awkward/lib/unproject_layout.py | 39 ++++++-- src/dask_awkward/lib/utils.py | 3 +- tests/test_core.py | 25 +++-- tests/test_getitem.py | 3 +- tests/test_parquet.py | 3 +- tests/test_utils.py | 8 +- 17 files changed, 340 insertions(+), 135 deletions(-) diff --git a/src/dask_awkward/__init__.py b/src/dask_awkward/__init__.py index 14a68bc3..34b5c4f5 100644 --- a/src/dask_awkward/__init__.py +++ b/src/dask_awkward/__init__.py @@ -11,39 +11,98 @@ import dask_awkward.lib.utils as utils from dask_awkward.lib.core import Array, PartitionCompatibility, Record, Scalar from dask_awkward.lib.core import _type as type -from dask_awkward.lib.core import (compatible_partitions, dask_method, - dask_property, map_partitions, - partition_compatibility) +from dask_awkward.lib.core import ( + compatible_partitions, + dask_method, + dask_property, + map_partitions, + partition_compatibility, +) from dask_awkward.lib.describe import backend, fields -from dask_awkward.lib.inspect import (report_necessary_buffers, - report_necessary_columns, sample) +from dask_awkward.lib.inspect import ( + report_necessary_buffers, + report_necessary_columns, + sample, +) necessary_columns = report_necessary_columns # Export for backwards compatibility. -from dask_awkward.lib.io.io import (from_awkward, from_dask_array, - from_delayed, from_lists, from_map, - to_dask_array, to_dask_bag, to_dataframe, - to_delayed) +from dask_awkward.lib.io.io import ( + from_awkward, + from_dask_array, + from_delayed, + from_lists, + from_map, + to_dask_array, + to_dask_bag, + to_dataframe, + to_delayed, +) from dask_awkward.lib.io.json import from_json, layout_to_jsonschema, to_json from dask_awkward.lib.io.parquet import from_parquet, to_parquet from dask_awkward.lib.io.text import from_text from dask_awkward.lib.operations import concatenate -from dask_awkward.lib.reducers import (all, any, argmax, argmin, corr, count, - count_nonzero, covar, linear_fit, max, - mean, min, moment, prod, ptp, softmax, - std, sum, var) -from dask_awkward.lib.structure import (argcartesian, argcombinations, argsort, - broadcast_arrays, cartesian, - combinations, copy, drop_none, - fill_none, firsts, flatten, - from_regular, full_like, is_none, - isclose, local_index, mask, nan_to_num, - num, ones_like, pad_none, ravel, - run_lengths, singletons, sort, - strings_astype, to_list, to_packed, - to_regular, unflatten, unzip, - values_astype, where, with_field, - with_name, with_parameter, - without_field, without_parameters, - zeros_like, zip) +from dask_awkward.lib.reducers import ( + all, + any, + argmax, + argmin, + corr, + count, + count_nonzero, + covar, + linear_fit, + max, + mean, + min, + moment, + prod, + ptp, + softmax, + std, + sum, + var, +) +from dask_awkward.lib.structure import ( + argcartesian, + argcombinations, + argsort, + broadcast_arrays, + cartesian, + combinations, + copy, + drop_none, + fill_none, + firsts, + flatten, + from_regular, + full_like, + is_none, + isclose, + local_index, + mask, + nan_to_num, + num, + ones_like, + pad_none, + ravel, + run_lengths, + singletons, + sort, + strings_astype, + to_list, + to_packed, + to_regular, + unflatten, + unzip, + values_astype, + where, + with_field, + with_name, + with_parameter, + without_field, + without_parameters, + zeros_like, + zip, +) from dask_awkward.version import __version__ diff --git a/src/dask_awkward/layers/__init__.py b/src/dask_awkward/layers/__init__.py index 491f4702..d4ba4c5e 100644 --- a/src/dask_awkward/layers/__init__.py +++ b/src/dask_awkward/layers/__init__.py @@ -1,11 +1,13 @@ -from dask_awkward.layers.layers import (AwkwardBlockwiseLayer, - AwkwardInputLayer, - AwkwardMaterializedLayer, - AwkwardTreeReductionLayer, - ImplementsIOFunction, - ImplementsProjection, - IOFunctionWithMocking, - io_func_implements_projection) +from dask_awkward.layers.layers import ( + AwkwardBlockwiseLayer, + AwkwardInputLayer, + AwkwardMaterializedLayer, + AwkwardTreeReductionLayer, + ImplementsIOFunction, + ImplementsProjection, + IOFunctionWithMocking, + io_func_implements_projection, +) __all__ = ( "AwkwardInputLayer", diff --git a/src/dask_awkward/lib/__init__.py b/src/dask_awkward/lib/__init__.py index 699fddd3..74d16d6c 100644 --- a/src/dask_awkward/lib/__init__.py +++ b/src/dask_awkward/lib/__init__.py @@ -2,34 +2,92 @@ import dask_awkward.lib.utils as utils from dask_awkward.lib.core import Array, PartitionCompatibility, Record, Scalar from dask_awkward.lib.core import _type as type -from dask_awkward.lib.core import (compatible_partitions, map_partitions, - partition_compatibility) +from dask_awkward.lib.core import ( + compatible_partitions, + map_partitions, + partition_compatibility, +) from dask_awkward.lib.describe import backend, fields -from dask_awkward.lib.inspect import (report_necessary_buffers, - report_necessary_columns, sample) -from dask_awkward.lib.io.io import (from_awkward, from_dask_array, - from_delayed, from_lists, from_map, - to_dask_array, to_dask_bag, to_dataframe, - to_delayed) +from dask_awkward.lib.inspect import ( + report_necessary_buffers, + report_necessary_columns, + sample, +) +from dask_awkward.lib.io.io import ( + from_awkward, + from_dask_array, + from_delayed, + from_lists, + from_map, + to_dask_array, + to_dask_bag, + to_dataframe, + to_delayed, +) from dask_awkward.lib.io.json import from_json, to_json from dask_awkward.lib.io.parquet import from_parquet, to_parquet from dask_awkward.lib.io.text import from_text from dask_awkward.lib.operations import concatenate -from dask_awkward.lib.reducers import (all, any, argmax, argmin, corr, count, - count_nonzero, covar, linear_fit, max, - mean, min, moment, prod, ptp, softmax, - std, sum, var) -from dask_awkward.lib.structure import (argcartesian, argcombinations, argsort, - broadcast_arrays, cartesian, - combinations, copy, drop_none, - fill_none, firsts, flatten, - from_regular, full_like, is_none, - isclose, local_index, mask, nan_to_num, - num, ones_like, pad_none, ravel, - run_lengths, singletons, sort, - strings_astype, to_list, to_packed, - to_regular, unflatten, unzip, - values_astype, where, with_field, - with_name, with_parameter, - without_field, without_parameters, - zeros_like, zip) +from dask_awkward.lib.reducers import ( + all, + any, + argmax, + argmin, + corr, + count, + count_nonzero, + covar, + linear_fit, + max, + mean, + min, + moment, + prod, + ptp, + softmax, + std, + sum, + var, +) +from dask_awkward.lib.structure import ( + argcartesian, + argcombinations, + argsort, + broadcast_arrays, + cartesian, + combinations, + copy, + drop_none, + fill_none, + firsts, + flatten, + from_regular, + full_like, + is_none, + isclose, + local_index, + mask, + nan_to_num, + num, + ones_like, + pad_none, + ravel, + run_lengths, + singletons, + sort, + strings_astype, + to_list, + to_packed, + to_regular, + unflatten, + unzip, + values_astype, + where, + with_field, + with_name, + with_parameter, + without_field, + without_parameters, + zeros_like, + zip, +) diff --git a/src/dask_awkward/lib/core.py b/src/dask_awkward/lib/core.py index 92d1b235..d7c1a4e0 100644 --- a/src/dask_awkward/lib/core.py +++ b/src/dask_awkward/lib/core.py @@ -21,10 +21,20 @@ import numpy as np from awkward._do import remove_structure as ak_do_remove_structure from awkward.highlevel import NDArrayOperatorsMixin, _dir_pattern -from awkward.typetracer import (MaybeNone, OneOf, TypeTracerArray, - create_unknown_scalar, is_unknown_scalar) -from dask.base import (DaskMethodsMixin, dont_optimize, is_dask_collection, - tokenize, unpack_collections) +from awkward.typetracer import ( + MaybeNone, + OneOf, + TypeTracerArray, + create_unknown_scalar, + is_unknown_scalar, +) +from dask.base import ( + DaskMethodsMixin, + dont_optimize, + is_dask_collection, + tokenize, + unpack_collections, +) from dask.blockwise import BlockwiseDep from dask.blockwise import blockwise as dask_blockwise from dask.context import globalmethod @@ -37,10 +47,15 @@ from dask_awkward.layers import AwkwardBlockwiseLayer, AwkwardMaterializedLayer from dask_awkward.lib.optimize import all_optimizations -from dask_awkward.utils import (ConcretizationTypeError, - DaskAwkwardNotImplemented, - IncompatiblePartitions, field_access_to_front, - first, hyphenize, is_empty_slice) +from dask_awkward.utils import ( + ConcretizationTypeError, + DaskAwkwardNotImplemented, + IncompatiblePartitions, + field_access_to_front, + first, + hyphenize, + is_empty_slice, +) if TYPE_CHECKING: from awkward.contents.content import Content @@ -981,8 +996,10 @@ def repartition( (npartitions=1) is a special case of this. """ from dask_awkward.layers import AwkwardMaterializedLayer - from dask_awkward.lib.structure import (repartition_layer, - simple_repartition_layer) + from dask_awkward.lib.structure import ( + repartition_layer, + simple_repartition_layer, + ) if ( sum( diff --git a/src/dask_awkward/lib/io/columnar.py b/src/dask_awkward/lib/io/columnar.py index 9dea26f1..32ffb3ae 100644 --- a/src/dask_awkward/lib/io/columnar.py +++ b/src/dask_awkward/lib/io/columnar.py @@ -8,13 +8,21 @@ from awkward.forms import Form from awkward.typetracer import typetracer_from_form, typetracer_with_report -from dask_awkward.layers.layers import (BackendT, ImplementsIOFunction, - ImplementsNecessaryColumns) -from dask_awkward.lib.utils import (METADATA_ATTRIBUTES, FormStructure, - buffer_keys_required_to_compute_shapes, - form_with_unique_keys, parse_buffer_key, - render_buffer_key, trace_form_structure, - walk_graph_depth_first) +from dask_awkward.layers.layers import ( + BackendT, + ImplementsIOFunction, + ImplementsNecessaryColumns, +) +from dask_awkward.lib.utils import ( + METADATA_ATTRIBUTES, + FormStructure, + buffer_keys_required_to_compute_shapes, + form_with_unique_keys, + parse_buffer_key, + render_buffer_key, + trace_form_structure, + walk_graph_depth_first, +) if TYPE_CHECKING: from awkward._nplikes.typetracer import TypeTracerReport diff --git a/src/dask_awkward/lib/io/io.py b/src/dask_awkward/lib/io/io.py index 3fd86fd3..d8fd60b1 100644 --- a/src/dask_awkward/lib/io/io.py +++ b/src/dask_awkward/lib/io/io.py @@ -18,16 +18,24 @@ from dask.utils import funcname, is_integer, parse_bytes from fsspec.utils import infer_compression -from dask_awkward.layers.layers import (AwkwardBlockwiseLayer, - AwkwardInputLayer, - AwkwardMaterializedLayer, - AwkwardTreeReductionLayer, - ImplementsMocking, ImplementsReport, - IOFunctionWithMocking, - io_func_implements_mocking, - io_func_implements_report) -from dask_awkward.lib.core import (Array, empty_typetracer, map_partitions, - new_array_object, typetracer_array) +from dask_awkward.layers.layers import ( + AwkwardBlockwiseLayer, + AwkwardInputLayer, + AwkwardMaterializedLayer, + AwkwardTreeReductionLayer, + ImplementsMocking, + ImplementsReport, + IOFunctionWithMocking, + io_func_implements_mocking, + io_func_implements_report, +) +from dask_awkward.lib.core import ( + Array, + empty_typetracer, + map_partitions, + new_array_object, + typetracer_array, +) from dask_awkward.lib.io.columnar import ColumnProjectionMixin from dask_awkward.utils import first, second diff --git a/src/dask_awkward/lib/io/json.py b/src/dask_awkward/lib/io/json.py index 9a3c2156..9b63a493 100644 --- a/src/dask_awkward/lib/io/json.py +++ b/src/dask_awkward/lib/io/json.py @@ -18,11 +18,19 @@ from fsspec.utils import infer_compression, read_block from dask_awkward.layers.layers import AwkwardMaterializedLayer -from dask_awkward.lib.core import (Array, Scalar, map_partitions, - new_scalar_object, typetracer_array) +from dask_awkward.lib.core import ( + Array, + Scalar, + map_partitions, + new_scalar_object, + typetracer_array, +) from dask_awkward.lib.io.columnar import ColumnProjectionMixin -from dask_awkward.lib.io.io import (_bytes_with_sample, - _BytesReadingInstructions, from_map) +from dask_awkward.lib.io.io import ( + _bytes_with_sample, + _BytesReadingInstructions, + from_map, +) if TYPE_CHECKING: from awkward.contents.content import Content diff --git a/src/dask_awkward/lib/io/parquet.py b/src/dask_awkward/lib/io/parquet.py index 2ffd4d2a..9ff2a58d 100644 --- a/src/dask_awkward/lib/io/parquet.py +++ b/src/dask_awkward/lib/io/parquet.py @@ -19,8 +19,7 @@ from fsspec.core import get_fs_token_paths, url_to_fs from dask_awkward.layers.layers import AwkwardMaterializedLayer -from dask_awkward.lib.core import (Array, Scalar, map_partitions, - new_scalar_object) +from dask_awkward.lib.core import Array, Scalar, map_partitions, new_scalar_object from dask_awkward.lib.io.columnar import ColumnProjectionMixin from dask_awkward.lib.io.io import from_map from dask_awkward.lib.unproject_layout import unproject_layout diff --git a/src/dask_awkward/lib/io/text.py b/src/dask_awkward/lib/io/text.py index 1834e39f..f38c9f35 100644 --- a/src/dask_awkward/lib/io/text.py +++ b/src/dask_awkward/lib/io/text.py @@ -13,8 +13,11 @@ from fsspec.utils import infer_compression, read_block from dask_awkward.lib.core import Array -from dask_awkward.lib.io.io import (_bytes_with_sample, - _BytesReadingInstructions, from_map) +from dask_awkward.lib.io.io import ( + _bytes_with_sample, + _BytesReadingInstructions, + from_map, +) def _string_array_from_bytestring(bytestring: bytes, delimiter: bytes) -> ak.Array: diff --git a/src/dask_awkward/lib/operations.py b/src/dask_awkward/lib/operations.py index 66a5faca..6f1da8e5 100644 --- a/src/dask_awkward/lib/operations.py +++ b/src/dask_awkward/lib/operations.py @@ -4,18 +4,22 @@ from typing import TYPE_CHECKING, Any import awkward as ak -from awkward.operations.ak_concatenate import \ - enforce_concatenated_form as enforce_layout_to_concatenated_form +from awkward.operations.ak_concatenate import ( + enforce_concatenated_form as enforce_layout_to_concatenated_form, +) from awkward.typetracer import typetracer_from_form from dask.base import tokenize from dask.highlevelgraph import HighLevelGraph from dask_awkward.layers import AwkwardMaterializedLayer -from dask_awkward.lib.core import (Array, PartitionCompatibility, - map_partitions, new_array_object, - partition_compatibility) -from dask_awkward.utils import (DaskAwkwardNotImplemented, - IncompatiblePartitions) +from dask_awkward.lib.core import ( + Array, + PartitionCompatibility, + map_partitions, + new_array_object, + partition_compatibility, +) +from dask_awkward.utils import DaskAwkwardNotImplemented, IncompatiblePartitions if TYPE_CHECKING: from awkward.forms import Form diff --git a/src/dask_awkward/lib/structure.py b/src/dask_awkward/lib/structure.py index 0bfd5135..6b70c8b9 100644 --- a/src/dask_awkward/lib/structure.py +++ b/src/dask_awkward/lib/structure.py @@ -15,13 +15,21 @@ from dask.highlevelgraph import HighLevelGraph from dask_awkward.layers import AwkwardMaterializedLayer -from dask_awkward.lib.core import (Array, PartitionCompatibility, - _map_partitions, map_partitions, - new_known_scalar, new_scalar_object, - partition_compatibility) -from dask_awkward.utils import (DaskAwkwardNotImplemented, - IncompatiblePartitions, borrow_docstring, - first) +from dask_awkward.lib.core import ( + Array, + PartitionCompatibility, + _map_partitions, + map_partitions, + new_known_scalar, + new_scalar_object, + partition_compatibility, +) +from dask_awkward.utils import ( + DaskAwkwardNotImplemented, + IncompatiblePartitions, + borrow_docstring, + first, +) if TYPE_CHECKING: from numpy.typing import DTypeLike diff --git a/src/dask_awkward/lib/unproject_layout.py b/src/dask_awkward/lib/unproject_layout.py index b92234bc..493e3150 100644 --- a/src/dask_awkward/lib/unproject_layout.py +++ b/src/dask_awkward/lib/unproject_layout.py @@ -5,15 +5,36 @@ import awkward as ak import numpy as np -from awkward.contents import (BitMaskedArray, ByteMaskedArray, Content, - EmptyArray, IndexedArray, IndexedOptionArray, - ListArray, ListOffsetArray, NumpyArray, - RecordArray, RegularArray, UnionArray, - UnmaskedArray) -from awkward.forms import (BitMaskedForm, ByteMaskedForm, EmptyForm, Form, - IndexedForm, IndexedOptionForm, ListForm, - ListOffsetForm, NumpyForm, RecordForm, RegularForm, - UnionForm, UnmaskedForm) +from awkward.contents import ( + BitMaskedArray, + ByteMaskedArray, + Content, + EmptyArray, + IndexedArray, + IndexedOptionArray, + ListArray, + ListOffsetArray, + NumpyArray, + RecordArray, + RegularArray, + UnionArray, + UnmaskedArray, +) +from awkward.forms import ( + BitMaskedForm, + ByteMaskedForm, + EmptyForm, + Form, + IndexedForm, + IndexedOptionForm, + ListForm, + ListOffsetForm, + NumpyForm, + RecordForm, + RegularForm, + UnionForm, + UnmaskedForm, +) from awkward.typetracer import PlaceholderArray, unknown_length index_of = { diff --git a/src/dask_awkward/lib/utils.py b/src/dask_awkward/lib/utils.py index 9d78f3e1..7b067386 100644 --- a/src/dask_awkward/lib/utils.py +++ b/src/dask_awkward/lib/utils.py @@ -2,8 +2,7 @@ __all__ = ("trace_form_structure", "buffer_keys_required_to_compute_shapes") -from collections.abc import (Callable, Iterable, Iterator, Mapping, - MutableMapping) +from collections.abc import Callable, Iterable, Iterator, Mapping, MutableMapping from contextlib import contextmanager from typing import TYPE_CHECKING, TypedDict, TypeVar diff --git a/tests/test_core.py b/tests/test_core.py index 0e0c788f..3ebf0777 100644 --- a/tests/test_core.py +++ b/tests/test_core.py @@ -16,14 +16,23 @@ from dask.delayed import delayed import dask_awkward as dak -from dask_awkward.lib.core import (Record, Scalar, calculate_known_divisions, - compute_typetracer, empty_typetracer, - is_typetracer, map_partitions, - meta_or_identity, new_array_object, - new_known_scalar, new_record_object, - new_scalar_object, - normalize_single_outer_inner_index, to_meta, - typetracer_array) +from dask_awkward.lib.core import ( + Record, + Scalar, + calculate_known_divisions, + compute_typetracer, + empty_typetracer, + is_typetracer, + map_partitions, + meta_or_identity, + new_array_object, + new_known_scalar, + new_record_object, + new_scalar_object, + normalize_single_outer_inner_index, + to_meta, + typetracer_array, +) from dask_awkward.lib.testutils import assert_eq from dask_awkward.utils import ConcretizationTypeError, IncompatiblePartitions diff --git a/tests/test_getitem.py b/tests/test_getitem.py index 1e756d44..e7dd3f8d 100644 --- a/tests/test_getitem.py +++ b/tests/test_getitem.py @@ -9,8 +9,7 @@ import dask_awkward as dak import dask_awkward.lib.core as dakc -from dask_awkward.lib.core import (DaskAwkwardNotImplemented, - IncompatiblePartitions) +from dask_awkward.lib.core import DaskAwkwardNotImplemented, IncompatiblePartitions from dask_awkward.lib.testutils import assert_eq diff --git a/tests/test_parquet.py b/tests/test_parquet.py index 45be133d..3206703a 100644 --- a/tests/test_parquet.py +++ b/tests/test_parquet.py @@ -15,8 +15,7 @@ import pyarrow.dataset as pad import dask_awkward as dak -from dask_awkward.lib.io.parquet import (_metadata_file_from_data_files, - to_parquet) +from dask_awkward.lib.io.parquet import _metadata_file_from_data_files, to_parquet from dask_awkward.lib.testutils import assert_eq data = [[1, 2, 3], [4, None], None] diff --git a/tests/test_utils.py b/tests/test_utils.py index e7d2ca10..3d1b38c7 100644 --- a/tests/test_utils.py +++ b/tests/test_utils.py @@ -3,8 +3,12 @@ import pytest from dask_awkward.lib.utils import typetracer_nochecks -from dask_awkward.utils import (LazyInputsDict, field_access_to_front, - hyphenize, is_empty_slice) +from dask_awkward.utils import ( + LazyInputsDict, + field_access_to_front, + hyphenize, + is_empty_slice, +) def test_is_empty_slice() -> None: From d7b3d9f96762cc0cc5f30a2dbf14260502e0f091 Mon Sep 17 00:00:00 2001 From: Lindsey Gray Date: Wed, 4 Dec 2024 09:53:44 -0600 Subject: [PATCH 10/29] would you kindly... --- src/dask_awkward/layers/layers.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/dask_awkward/layers/layers.py b/src/dask_awkward/layers/layers.py index c15189a8..89eaf4d2 100644 --- a/src/dask_awkward/layers/layers.py +++ b/src/dask_awkward/layers/layers.py @@ -167,7 +167,7 @@ def __init__( ) if _dask_uses_tasks: - super().__init__( + super().__init__( # type: ignore output=self.name, output_indices="i", task=Task(name, self.io_func, blockwise_token(0)), @@ -176,7 +176,7 @@ def __init__( annotations=None, ) else: - super().__init__( + super().__init__( # type: ignore output=self.name, output_indices="i", dsk={name: (self.io_func, blockwise_token(0))}, From 883b23e680d1ef734757a16161225a865f51ff3c Mon Sep 17 00:00:00 2001 From: "pre-commit-ci[bot]" <66853113+pre-commit-ci[bot]@users.noreply.github.com> Date: Wed, 4 Dec 2024 15:53:59 +0000 Subject: [PATCH 11/29] [pre-commit.ci] auto fixes from pre-commit.com hooks for more information, see https://pre-commit.ci --- src/dask_awkward/layers/layers.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/dask_awkward/layers/layers.py b/src/dask_awkward/layers/layers.py index 89eaf4d2..381b75dd 100644 --- a/src/dask_awkward/layers/layers.py +++ b/src/dask_awkward/layers/layers.py @@ -167,7 +167,7 @@ def __init__( ) if _dask_uses_tasks: - super().__init__( # type: ignore + super().__init__( # type: ignore output=self.name, output_indices="i", task=Task(name, self.io_func, blockwise_token(0)), @@ -176,7 +176,7 @@ def __init__( annotations=None, ) else: - super().__init__( # type: ignore + super().__init__( # type: ignore output=self.name, output_indices="i", dsk={name: (self.io_func, blockwise_token(0))}, From c25e1f6506ad74f6d285f3163319e8011f96c81d Mon Sep 17 00:00:00 2001 From: Lindsey Gray Date: Wed, 4 Dec 2024 09:55:34 -0600 Subject: [PATCH 12/29] appease mypy's insatiable lust for perfect correctness --- src/dask_awkward/layers/layers.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/dask_awkward/layers/layers.py b/src/dask_awkward/layers/layers.py index 381b75dd..ab8e19bd 100644 --- a/src/dask_awkward/layers/layers.py +++ b/src/dask_awkward/layers/layers.py @@ -176,7 +176,7 @@ def __init__( annotations=None, ) else: - super().__init__( # type: ignore + super().__init__( output=self.name, output_indices="i", dsk={name: (self.io_func, blockwise_token(0))}, From 7c2174cd1aaef5e1e3dd7a383e7613376ba989c7 Mon Sep 17 00:00:00 2001 From: Lindsey Gray Date: Wed, 4 Dec 2024 10:10:03 -0600 Subject: [PATCH 13/29] cleaner way of dealing with it --- src/dask_awkward/layers/layers.py | 35 +++++++++++++++---------------- 1 file changed, 17 insertions(+), 18 deletions(-) diff --git a/src/dask_awkward/layers/layers.py b/src/dask_awkward/layers/layers.py index ab8e19bd..98b1e677 100644 --- a/src/dask_awkward/layers/layers.py +++ b/src/dask_awkward/layers/layers.py @@ -6,8 +6,10 @@ _dask_uses_tasks = True try: - from dask._task_spec import Task + from dask._task_spec import convert_legacy_graph except ModuleNotFoundError as _: + def convert_legacy_graph(_): + return _ _dask_uses_tasks = False from dask.blockwise import Blockwise, BlockwiseDepDict, blockwise_token @@ -166,24 +168,21 @@ def __init__( produces_tasks=self.produces_tasks, ) + super_kwargs = { + "output": self.name, + "output_indices": "i", + "dsk": {name: (self.io_func, blockwise_token(0))}, + "indices": [(io_arg_map, "i")], + "numblocks": {}, + "annotations": None, + } + if _dask_uses_tasks: - super().__init__( # type: ignore - output=self.name, - output_indices="i", - task=Task(name, self.io_func, blockwise_token(0)), - indices=[(io_arg_map, "i")], - numblocks={}, - annotations=None, - ) - else: - super().__init__( - output=self.name, - output_indices="i", - dsk={name: (self.io_func, blockwise_token(0))}, - indices=[(io_arg_map, "i")], - numblocks={}, - annotations=None, - ) + task = convert_legacy_graph(super_args["dsk"]) + super_args["task"] = task + super_args.pop("dsk") + + super().__init__(**super_kwargs) def __repr__(self) -> str: return f"AwkwardInputLayer<{self.output}>" From 2e6da4e43328221b38c2db2a04c19326e51b0b81 Mon Sep 17 00:00:00 2001 From: "pre-commit-ci[bot]" <66853113+pre-commit-ci[bot]@users.noreply.github.com> Date: Wed, 4 Dec 2024 16:11:41 +0000 Subject: [PATCH 14/29] [pre-commit.ci] auto fixes from pre-commit.com hooks for more information, see https://pre-commit.ci --- src/dask_awkward/layers/layers.py | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/src/dask_awkward/layers/layers.py b/src/dask_awkward/layers/layers.py index 98b1e677..d56ffa01 100644 --- a/src/dask_awkward/layers/layers.py +++ b/src/dask_awkward/layers/layers.py @@ -8,8 +8,10 @@ try: from dask._task_spec import convert_legacy_graph except ModuleNotFoundError as _: + def convert_legacy_graph(_): return _ + _dask_uses_tasks = False from dask.blockwise import Blockwise, BlockwiseDepDict, blockwise_token @@ -176,12 +178,12 @@ def __init__( "numblocks": {}, "annotations": None, } - + if _dask_uses_tasks: task = convert_legacy_graph(super_args["dsk"]) super_args["task"] = task super_args.pop("dsk") - + super().__init__(**super_kwargs) def __repr__(self) -> str: From 06a475cf364d1c44adb2216797938adde0c2a846 Mon Sep 17 00:00:00 2001 From: Lindsey Gray Date: Wed, 4 Dec 2024 10:12:49 -0600 Subject: [PATCH 15/29] forgot to update names --- src/dask_awkward/layers/layers.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/src/dask_awkward/layers/layers.py b/src/dask_awkward/layers/layers.py index d56ffa01..b573e642 100644 --- a/src/dask_awkward/layers/layers.py +++ b/src/dask_awkward/layers/layers.py @@ -180,9 +180,9 @@ def __init__( } if _dask_uses_tasks: - task = convert_legacy_graph(super_args["dsk"]) - super_args["task"] = task - super_args.pop("dsk") + task = convert_legacy_graph(super_kwargs["dsk"]) + super_kwargs["task"] = task + super_kwargs.pop("dsk") super().__init__(**super_kwargs) From ef10929ed77b355ccb0649d03a6b7b20fc084540 Mon Sep 17 00:00:00 2001 From: Lindsey Gray Date: Wed, 4 Dec 2024 10:16:07 -0600 Subject: [PATCH 16/29] mypy --- src/dask_awkward/layers/layers.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/src/dask_awkward/layers/layers.py b/src/dask_awkward/layers/layers.py index b573e642..7488d4ae 100644 --- a/src/dask_awkward/layers/layers.py +++ b/src/dask_awkward/layers/layers.py @@ -9,8 +9,8 @@ from dask._task_spec import convert_legacy_graph except ModuleNotFoundError as _: - def convert_legacy_graph(_): - return _ + def convert_legacy_graph(dsk: Mapping, all_keys: Container | None = None): + return dsk _dask_uses_tasks = False @@ -184,7 +184,7 @@ def __init__( super_kwargs["task"] = task super_kwargs.pop("dsk") - super().__init__(**super_kwargs) + super().__init__(**super_kwargs) # type: ignore def __repr__(self) -> str: return f"AwkwardInputLayer<{self.output}>" From 954f6e6dbc308e217676f120c04d849261aa177c Mon Sep 17 00:00:00 2001 From: Lindsey Gray Date: Wed, 4 Dec 2024 10:16:41 -0600 Subject: [PATCH 17/29] missing types --- src/dask_awkward/layers/layers.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/dask_awkward/layers/layers.py b/src/dask_awkward/layers/layers.py index 7488d4ae..db7dcc12 100644 --- a/src/dask_awkward/layers/layers.py +++ b/src/dask_awkward/layers/layers.py @@ -1,7 +1,7 @@ from __future__ import annotations import copy -from collections.abc import Callable, Mapping +from collections.abc import Callable, Container, Mapping from typing import TYPE_CHECKING, Any, Literal, Protocol, TypeVar, Union, cast _dask_uses_tasks = True From 78d65036b320a4e0287231fac675b48ec038ffdf Mon Sep 17 00:00:00 2001 From: Lindsey Gray Date: Wed, 4 Dec 2024 10:18:33 -0600 Subject: [PATCH 18/29] mypy... --- src/dask_awkward/layers/layers.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/dask_awkward/layers/layers.py b/src/dask_awkward/layers/layers.py index db7dcc12..7ab3c73b 100644 --- a/src/dask_awkward/layers/layers.py +++ b/src/dask_awkward/layers/layers.py @@ -9,7 +9,7 @@ from dask._task_spec import convert_legacy_graph except ModuleNotFoundError as _: - def convert_legacy_graph(dsk: Mapping, all_keys: Container | None = None): + def convert_legacy_graph(dsk, all_keys=None): return dsk _dask_uses_tasks = False From 3a88ec61aa0e4e84c5aa8a476fb2306bdad10f7d Mon Sep 17 00:00:00 2001 From: Lindsey Gray Date: Wed, 4 Dec 2024 10:21:06 -0600 Subject: [PATCH 19/29] ... mypy --- src/dask_awkward/layers/layers.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/dask_awkward/layers/layers.py b/src/dask_awkward/layers/layers.py index 7ab3c73b..90bdfd92 100644 --- a/src/dask_awkward/layers/layers.py +++ b/src/dask_awkward/layers/layers.py @@ -1,7 +1,7 @@ from __future__ import annotations import copy -from collections.abc import Callable, Container, Mapping +from collections.abc import Callable, Mapping from typing import TYPE_CHECKING, Any, Literal, Protocol, TypeVar, Union, cast _dask_uses_tasks = True @@ -180,7 +180,7 @@ def __init__( } if _dask_uses_tasks: - task = convert_legacy_graph(super_kwargs["dsk"]) + task = convert_legacy_graph(super_kwargs["dsk"]) # type: ignore super_kwargs["task"] = task super_kwargs.pop("dsk") From 01b0a45c3b6c11ace6750d60bc3b26a6c9008862 Mon Sep 17 00:00:00 2001 From: Lindsey Gray Date: Wed, 4 Dec 2024 10:21:51 -0600 Subject: [PATCH 20/29] just ignore it all --- src/dask_awkward/layers/layers.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/dask_awkward/layers/layers.py b/src/dask_awkward/layers/layers.py index 90bdfd92..b925e35a 100644 --- a/src/dask_awkward/layers/layers.py +++ b/src/dask_awkward/layers/layers.py @@ -9,7 +9,7 @@ from dask._task_spec import convert_legacy_graph except ModuleNotFoundError as _: - def convert_legacy_graph(dsk, all_keys=None): + def convert_legacy_graph(dsk, all_keys=None): # type: ignore return dsk _dask_uses_tasks = False From 536dfc49b05fa6d8a32e3bca38b04ed592a0a5bc Mon Sep 17 00:00:00 2001 From: pfackeldey Date: Fri, 6 Dec 2024 08:45:33 -0500 Subject: [PATCH 21/29] update rewrite_layer_chains with new dask Tasks --- src/dask_awkward/layers/__init__.py | 2 ++ src/dask_awkward/layers/layers.py | 23 +++++++---------- src/dask_awkward/lib/optimize.py | 39 +++++++++++++++++++++++------ 3 files changed, 43 insertions(+), 21 deletions(-) diff --git a/src/dask_awkward/layers/__init__.py b/src/dask_awkward/layers/__init__.py index d4ba4c5e..098bbf2e 100644 --- a/src/dask_awkward/layers/__init__.py +++ b/src/dask_awkward/layers/__init__.py @@ -6,6 +6,7 @@ ImplementsIOFunction, ImplementsProjection, IOFunctionWithMocking, + _dask_uses_tasks, io_func_implements_projection, ) @@ -18,4 +19,5 @@ "ImplementsIOFunction", "IOFunctionWithMocking", "io_func_implements_projection", + "_dask_uses_tasks", ) diff --git a/src/dask_awkward/layers/layers.py b/src/dask_awkward/layers/layers.py index b925e35a..24b33c99 100644 --- a/src/dask_awkward/layers/layers.py +++ b/src/dask_awkward/layers/layers.py @@ -4,15 +4,9 @@ from collections.abc import Callable, Mapping from typing import TYPE_CHECKING, Any, Literal, Protocol, TypeVar, Union, cast -_dask_uses_tasks = True -try: - from dask._task_spec import convert_legacy_graph -except ModuleNotFoundError as _: +import dask - def convert_legacy_graph(dsk, all_keys=None): # type: ignore - return dsk - - _dask_uses_tasks = False +_dask_uses_tasks = dask.__version__ >= "2024.12.0" from dask.blockwise import Blockwise, BlockwiseDepDict, blockwise_token from dask.highlevelgraph import MaterializedLayer @@ -170,21 +164,22 @@ def __init__( produces_tasks=self.produces_tasks, ) - super_kwargs = { + super_kwargs: dict[str, Any] = { "output": self.name, "output_indices": "i", - "dsk": {name: (self.io_func, blockwise_token(0))}, "indices": [(io_arg_map, "i")], "numblocks": {}, "annotations": None, } if _dask_uses_tasks: - task = convert_legacy_graph(super_kwargs["dsk"]) # type: ignore - super_kwargs["task"] = task - super_kwargs.pop("dsk") + from dask._task_spec import Task, TaskRef + + super_kwargs["task"] = Task(name, self.io_func, TaskRef(blockwise_token(0))) + else: + super_kwargs["dsk"] = {name: (self.io_func, blockwise_token(0))} - super().__init__(**super_kwargs) # type: ignore + super().__init__(**super_kwargs) def __repr__(self) -> str: return f"AwkwardInputLayer<{self.output}>" diff --git a/src/dask_awkward/lib/optimize.py b/src/dask_awkward/lib/optimize.py index 6ad2e132..d75d51e3 100644 --- a/src/dask_awkward/lib/optimize.py +++ b/src/dask_awkward/lib/optimize.py @@ -13,7 +13,11 @@ from dask.highlevelgraph import HighLevelGraph from dask.local import get_sync -from dask_awkward.layers import AwkwardBlockwiseLayer, AwkwardInputLayer +from dask_awkward.layers import ( + AwkwardBlockwiseLayer, + AwkwardInputLayer, + _dask_uses_tasks, +) from dask_awkward.lib.utils import typetracer_nochecks from dask_awkward.utils import first @@ -340,7 +344,10 @@ def rewrite_layer_chains(dsk: HighLevelGraph, keys: Sequence[Key]) -> HighLevelG deps[outkey] = deps[chain[0]] [deps.pop(ch) for ch in chain[:-1]] - subgraph = layer0.dsk.copy() # mypy: ignore + if _dask_uses_tasks: + all_tasks = [layer0.task] + else: + subgraph = layer0.dsk.copy() indices = list(layer0.indices) parent = chain[0] @@ -349,14 +356,27 @@ def rewrite_layer_chains(dsk: HighLevelGraph, keys: Sequence[Key]) -> HighLevelG layer = dsk.layers[chain_member] for k in layer.io_deps: # mypy: ignore outlayer.io_deps[k] = layer.io_deps[k] - func, *args = layer.dsk[chain_member] # mypy: ignore - args2 = _recursive_replace(args, layer, parent, indices) - subgraph[chain_member] = (func,) + tuple(args2) + + if _dask_uses_tasks: + from dask._task_spec import Task + + func = layer.task.func + args = layer.task.dependencies + # how to do this with `.substitute(...)`? + args2 = _recursive_replace(args, layer, parent, indices) + all_tasks.append(Task(chain_member, func, *args2)) + else: + func, *args = layer.dsk[chain_member] # mypy: ignore + args2 = _recursive_replace(args, layer, parent, indices) + subgraph[chain_member] = (func,) + tuple(args2) parent = chain_member outlayer.numblocks = { i[0]: (numblocks,) for i in indices if i[1] is not None } # mypy: ignore - outlayer.dsk = subgraph # mypy: ignore + if _dask_uses_tasks: + outlayer.task = Task.fuse(*all_tasks) + else: + outlayer.dsk = subgraph # mypy: ignore if hasattr(outlayer, "_dims"): del outlayer._dims outlayer.indices = tuple( # mypy: ignore @@ -379,7 +399,12 @@ def _recursive_replace(args, layer, parent, indices): args2.append(layer.indices[ind][0]) elif layer.indices[ind][0] == parent: # arg refers to output of previous layer - args2.append(parent) + if _dask_uses_tasks: + from dask._task_spec import TaskRef + + args2.append(TaskRef(parent)) + else: + args2.append(parent) else: # arg refers to things defined in io_deps indices.append(layer.indices[ind]) From 90cf27beba964e1f00166b13024b4a340449584f Mon Sep 17 00:00:00 2001 From: pfackeldey Date: Fri, 6 Dec 2024 10:30:49 -0500 Subject: [PATCH 22/29] fix args preparation for rewrite_layer_chains and update _mock_output() --- src/dask_awkward/lib/optimize.py | 38 +++++++++++++++++++++++--------- tests/test_io_json.py | 8 +++++-- 2 files changed, 34 insertions(+), 12 deletions(-) diff --git a/src/dask_awkward/lib/optimize.py b/src/dask_awkward/lib/optimize.py index d75d51e3..e830a67b 100644 --- a/src/dask_awkward/lib/optimize.py +++ b/src/dask_awkward/lib/optimize.py @@ -238,14 +238,23 @@ def _touch_all_data(*args, **kwargs): def _mock_output(layer): """Update a layer to run the _touch_all_data.""" - assert len(layer.dsk) == 1 + if _dask_uses_tasks: + new_layer = copy.deepcopy(layer) + task = new_layer.task.copy() + # replace the original function with _touch_all_data + # and keep the rest of the task the same + task.func = _touch_all_data + new_layer.task = task + return new_layer + else: + assert len(layer.dsk) == 1 - new_layer = copy.deepcopy(layer) - mp = new_layer.dsk.copy() - for k in iter(mp.keys()): - mp[k] = (_touch_all_data,) + mp[k][1:] - new_layer.dsk = mp - return new_layer + new_layer = copy.deepcopy(layer) + mp = new_layer.dsk.copy() + for k in iter(mp.keys()): + mp[k] = (_touch_all_data,) + mp[k][1:] + new_layer.dsk = mp + return new_layer @no_type_check @@ -358,10 +367,13 @@ def rewrite_layer_chains(dsk: HighLevelGraph, keys: Sequence[Key]) -> HighLevelG outlayer.io_deps[k] = layer.io_deps[k] if _dask_uses_tasks: - from dask._task_spec import Task + from dask._task_spec import GraphNode, Task func = layer.task.func - args = layer.task.dependencies + args = [ + arg.key if isinstance(arg, GraphNode) else arg + for arg in layer.task.args + ] # how to do this with `.substitute(...)`? args2 = _recursive_replace(args, layer, parent, indices) all_tasks.append(Task(chain_member, func, *args2)) @@ -408,7 +420,13 @@ def _recursive_replace(args, layer, parent, indices): else: # arg refers to things defined in io_deps indices.append(layer.indices[ind]) - args2.append(f"__dask_blockwise__{len(indices) - 1}") + arg2 = f"__dask_blockwise__{len(indices) - 1}" + if _dask_uses_tasks: + from dask._task_spec import TaskRef + + args2.append(TaskRef(arg2)) + else: + args2.append(arg2) elif isinstance(arg, list): args2.append(_recursive_replace(arg, layer, parent, indices)) elif isinstance(arg, tuple): diff --git a/tests/test_io_json.py b/tests/test_io_json.py index 688fb550..f7192aae 100644 --- a/tests/test_io_json.py +++ b/tests/test_io_json.py @@ -10,6 +10,7 @@ import pytest import dask_awkward as dak +from dask_awkward.layers import _dask_uses_tasks from dask_awkward.lib.core import Array from dask_awkward.lib.optimize import optimize as dak_optimize from dask_awkward.lib.testutils import assert_eq @@ -94,8 +95,11 @@ def input_layer_array_partition0(collection: Array) -> ak.Array: optimized_hlg = dak_optimize(collection.dask, collection.keys) # type: ignore layers = list(optimized_hlg.layers) # type: ignore layer_name = [name for name in layers if name.startswith("from-json")][0] - sgc, arg = optimized_hlg[(layer_name, 0)] - array = sgc.dsk[layer_name][0](arg) + if _dask_uses_tasks: + array = optimized_hlg[(layer_name, 0)]() + else: + sgc, arg = optimized_hlg[(layer_name, 0)] + array = sgc.dsk[layer_name][0](arg) return array From bdb42e7dc18f7d5af85d86df814ecbf882f104d1 Mon Sep 17 00:00:00 2001 From: Lindsey Gray Date: Sat, 7 Dec 2024 14:17:34 -0600 Subject: [PATCH 23/29] use TaskRef to pass test - may not be correct --- src/dask_awkward/lib/core.py | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/src/dask_awkward/lib/core.py b/src/dask_awkward/lib/core.py index d7c1a4e0..9c5aa280 100644 --- a/src/dask_awkward/lib/core.py +++ b/src/dask_awkward/lib/core.py @@ -1928,7 +1928,11 @@ def partitionwise_layer( pairs.extend([arg.name, "i"]) numblocks[arg.name] = (1,) elif isinstance(arg, Delayed): - pairs.extend([arg.key, None]) + if _dask_uses_tasks: + from dask._task_spec import TaskRef + pairs.extend([TaskRef(arg.key), None]) + else: + pairs.extend([arg.key, None]) elif is_dask_collection(arg): raise DaskAwkwardNotImplemented( "Use of Array with other Dask collections is currently unsupported." From 91e489017e2157a6ce3e8adf3ba70f548304ebab Mon Sep 17 00:00:00 2001 From: "pre-commit-ci[bot]" <66853113+pre-commit-ci[bot]@users.noreply.github.com> Date: Sat, 7 Dec 2024 20:17:50 +0000 Subject: [PATCH 24/29] [pre-commit.ci] auto fixes from pre-commit.com hooks for more information, see https://pre-commit.ci --- src/dask_awkward/lib/core.py | 1 + 1 file changed, 1 insertion(+) diff --git a/src/dask_awkward/lib/core.py b/src/dask_awkward/lib/core.py index 9c5aa280..957d1ccb 100644 --- a/src/dask_awkward/lib/core.py +++ b/src/dask_awkward/lib/core.py @@ -1930,6 +1930,7 @@ def partitionwise_layer( elif isinstance(arg, Delayed): if _dask_uses_tasks: from dask._task_spec import TaskRef + pairs.extend([TaskRef(arg.key), None]) else: pairs.extend([arg.key, None]) From 6420dcc299e6d7525eb5148713d7741febee7fda Mon Sep 17 00:00:00 2001 From: Lindsey Gray Date: Sat, 7 Dec 2024 14:19:45 -0600 Subject: [PATCH 25/29] import _dask_uses_tasks --- src/dask_awkward/lib/core.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/dask_awkward/lib/core.py b/src/dask_awkward/lib/core.py index 957d1ccb..1f24f655 100644 --- a/src/dask_awkward/lib/core.py +++ b/src/dask_awkward/lib/core.py @@ -45,7 +45,7 @@ from dask.utils import OperatorMethodMixin as DaskOperatorMethodMixin from dask.utils import funcname, is_arraylike, key_split -from dask_awkward.layers import AwkwardBlockwiseLayer, AwkwardMaterializedLayer +from dask_awkward.layers import AwkwardBlockwiseLayer, AwkwardMaterializedLayer, _dask_uses_tasks from dask_awkward.lib.optimize import all_optimizations from dask_awkward.utils import ( ConcretizationTypeError, From cee57c87378e22af6d87bee03e4db20bad09cb42 Mon Sep 17 00:00:00 2001 From: "pre-commit-ci[bot]" <66853113+pre-commit-ci[bot]@users.noreply.github.com> Date: Sat, 7 Dec 2024 20:20:00 +0000 Subject: [PATCH 26/29] [pre-commit.ci] auto fixes from pre-commit.com hooks for more information, see https://pre-commit.ci --- src/dask_awkward/lib/core.py | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/src/dask_awkward/lib/core.py b/src/dask_awkward/lib/core.py index 1f24f655..0ce57c19 100644 --- a/src/dask_awkward/lib/core.py +++ b/src/dask_awkward/lib/core.py @@ -45,7 +45,11 @@ from dask.utils import OperatorMethodMixin as DaskOperatorMethodMixin from dask.utils import funcname, is_arraylike, key_split -from dask_awkward.layers import AwkwardBlockwiseLayer, AwkwardMaterializedLayer, _dask_uses_tasks +from dask_awkward.layers import ( + AwkwardBlockwiseLayer, + AwkwardMaterializedLayer, + _dask_uses_tasks, +) from dask_awkward.lib.optimize import all_optimizations from dask_awkward.utils import ( ConcretizationTypeError, From 57a247273b0357dbbf6889a4d44eb4df10a72910 Mon Sep 17 00:00:00 2001 From: Lindsey Gray Date: Sat, 7 Dec 2024 14:21:09 -0600 Subject: [PATCH 27/29] don't import in the function call --- src/dask_awkward/lib/core.py | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/src/dask_awkward/lib/core.py b/src/dask_awkward/lib/core.py index 0ce57c19..786ff2bb 100644 --- a/src/dask_awkward/lib/core.py +++ b/src/dask_awkward/lib/core.py @@ -61,6 +61,9 @@ is_empty_slice, ) +if _dask_uses_tasks: + from dask._task_spec import TaskRef + if TYPE_CHECKING: from awkward.contents.content import Content from awkward.forms.form import Form @@ -1933,8 +1936,6 @@ def partitionwise_layer( numblocks[arg.name] = (1,) elif isinstance(arg, Delayed): if _dask_uses_tasks: - from dask._task_spec import TaskRef - pairs.extend([TaskRef(arg.key), None]) else: pairs.extend([arg.key, None]) From 6a19642cb61ce542e6d3df834fa3f7b269911c40 Mon Sep 17 00:00:00 2001 From: Lindsey Gray Date: Sat, 7 Dec 2024 14:26:16 -0600 Subject: [PATCH 28/29] better check for dask._task_spec --- src/dask_awkward/layers/layers.py | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/src/dask_awkward/layers/layers.py b/src/dask_awkward/layers/layers.py index 24b33c99..b5c54523 100644 --- a/src/dask_awkward/layers/layers.py +++ b/src/dask_awkward/layers/layers.py @@ -6,7 +6,7 @@ import dask -_dask_uses_tasks = dask.__version__ >= "2024.12.0" +_dask_uses_tasks = hasattr(dask, "_task_spec") from dask.blockwise import Blockwise, BlockwiseDepDict, blockwise_token from dask.highlevelgraph import MaterializedLayer @@ -15,6 +15,9 @@ from dask_awkward.utils import LazyInputsDict +if _dask_uses_tasks: + from dask._task_spec import Task, TaskRef + if TYPE_CHECKING: from awkward import Array as AwkwardArray from awkward._nplikes.typetracer import TypeTracerReport @@ -173,8 +176,6 @@ def __init__( } if _dask_uses_tasks: - from dask._task_spec import Task, TaskRef - super_kwargs["task"] = Task(name, self.io_func, TaskRef(blockwise_token(0))) else: super_kwargs["dsk"] = {name: (self.io_func, blockwise_token(0))} From 5d01fef8d8c94ca82e79e481345e91f79c59c5dc Mon Sep 17 00:00:00 2001 From: Lindsey Gray Date: Sat, 7 Dec 2024 15:16:09 -0600 Subject: [PATCH 29/29] avoid imports in loops --- src/dask_awkward/lib/optimize.py | 9 +++------ 1 file changed, 3 insertions(+), 6 deletions(-) diff --git a/src/dask_awkward/lib/optimize.py b/src/dask_awkward/lib/optimize.py index e830a67b..4b9dd6cf 100644 --- a/src/dask_awkward/lib/optimize.py +++ b/src/dask_awkward/lib/optimize.py @@ -21,6 +21,9 @@ from dask_awkward.lib.utils import typetracer_nochecks from dask_awkward.utils import first +if _dask_uses_tasks: + from dask._task_spec import GraphNode, Task, TaskRef + if TYPE_CHECKING: from awkward._nplikes.typetracer import TypeTracerReport from dask.typing import Key @@ -367,8 +370,6 @@ def rewrite_layer_chains(dsk: HighLevelGraph, keys: Sequence[Key]) -> HighLevelG outlayer.io_deps[k] = layer.io_deps[k] if _dask_uses_tasks: - from dask._task_spec import GraphNode, Task - func = layer.task.func args = [ arg.key if isinstance(arg, GraphNode) else arg @@ -412,8 +413,6 @@ def _recursive_replace(args, layer, parent, indices): elif layer.indices[ind][0] == parent: # arg refers to output of previous layer if _dask_uses_tasks: - from dask._task_spec import TaskRef - args2.append(TaskRef(parent)) else: args2.append(parent) @@ -422,8 +421,6 @@ def _recursive_replace(args, layer, parent, indices): indices.append(layer.indices[ind]) arg2 = f"__dask_blockwise__{len(indices) - 1}" if _dask_uses_tasks: - from dask._task_spec import TaskRef - args2.append(TaskRef(arg2)) else: args2.append(arg2)