Add support for comparing different data types
bouweandela committed Apr 26, 2024
1 parent 35facc7 commit 6e883da
Showing 2 changed files with 68 additions and 46 deletions.
93 changes: 47 additions & 46 deletions lib/iris/_concatenate.py
@@ -290,8 +290,7 @@ def _hash_array(a: da.Array | np.ndarray) -> np.int64:
     """Calculate a hash representation of the provided array.
 
     Calculates a 64-bit non-cryptographic hash of the provided array, using
-    the extremely fast ``xxhash`` hashing algorithm, and returns the hexdigest
-    string representation of the hash.
+    the fast ``xxhash`` hashing algorithm.
 
     Note that the computed hash depends on how the array is chunked.
@@ -303,12 +302,13 @@ def _hash_array(a: da.Array | np.ndarray) -> np.int64:
     Returns
     -------
     np.int64
-        The string hexadecimal representation of the item's 64-bit hash.
+        The array's hash.
 
     """
 
     def arrayhash(x):
-        value = xxh3_64(x.data.tobytes())
+        value = xxh3_64(np.array(x.shape, dtype=np.uint).tobytes())
+        value.update(x.data.tobytes())
         if is_masked_data(x):
             value.update(x.mask.tobytes())
         return np.frombuffer(value.digest(), dtype=np.int64)
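
Note on the `arrayhash` change above: the hash is now seeded with the array's shape before the data bytes are mixed in, so two blocks whose raw bytes are identical but whose shapes differ no longer collide. A minimal standalone sketch of that idea (`hash_block` is an illustrative name, not the Iris API):

```python
import numpy as np
from xxhash import xxh3_64


def hash_block(x: np.ndarray) -> np.int64:
    # Seed with the shape, then mix in the raw data bytes.
    value = xxh3_64(np.array(x.shape, dtype=np.uint).tobytes())
    value.update(x.data.tobytes())
    return np.frombuffer(value.digest(), dtype=np.int64)[0]


flat = np.array([1, 2], dtype=np.int64)
column = flat.reshape(2, 1)
assert flat.tobytes() == column.tobytes()      # identical raw bytes
assert hash_block(flat) != hash_block(column)  # but distinct hashes
```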
@@ -335,47 +335,34 @@ def __eq__(self, other: "_ArrayHash") -> bool:
         return self.value == other.value
 
 
-def _compute_hashes(
-    cubes: Iterable[iris.cube.Cube],
-    check_aux_coords: bool,
-    check_derived_coords: bool,
-    check_cell_measures: bool,
-    check_ancils: bool,
-) -> dict[str, _ArrayHash]:
+def _compute_hashes(arrays: Iterable[np.ndarray | da.Array]) -> dict[str, _ArrayHash]:
     """Compute hashes for the arrays that will be compared."""
-    arrays = []
-    for cube in cubes:
-        if check_aux_coords:
-            for coord in cube.aux_coords:
-                arrays.append(coord.core_points())
-                if coord.has_bounds():
-                    arrays.append(coord.core_bounds())
-        if check_derived_coords:
-            for coord in cube.derived_coords:
-                arrays.append(coord.core_points())
-                if coord.has_bounds():
-                    arrays.append(coord.core_bounds())
-        if check_cell_measures:
-            for var in cube.cell_measures():
-                arrays.append(var.core_data())
-        if check_ancils:
-            for var in cube.ancillary_variables():
-                arrays.append(var.core_data())
-
     hashes = {}
 
-    def get_shape(a):
-        return a.shape
+    def is_numerical(dtype):
+        return np.issubdtype(dtype, np.bool_) or np.issubdtype(dtype, np.number)
+
+    def group_key(a):
+        if is_numerical(a.dtype):
+            dtype = "numerical"
+        else:
+            dtype = str(a.dtype)
+        return a.shape, dtype
 
-    arrays.sort(key=get_shape)
-    for _, group in itertools.groupby(arrays, key=get_shape):
+    arrays = sorted(arrays, key=group_key)
+    for _, group in itertools.groupby(arrays, key=group_key):
         group = list(group)
-        # TODO: Unify dtype as the hash depends on the dtype
+        # Unify dtype for numerical arrays, as the hash depends on it
+        if is_numerical(group[0].dtype):
+            dtype = np.result_type(*group)
+            same_dtype_arrays = [a.astype(dtype) for a in group]
+        else:
+            same_dtype_arrays = group
         # Unify chunks as the hash depends on the chunks.
         indices = tuple(range(group[0].ndim))[::-1]
-        argpairs = [(a, indices) for a in group]
-        _, rechunked_group = da.core.unify_chunks(*itertools.chain(*argpairs))
-        for array, rechunked in zip(group, rechunked_group):
+        argpairs = [(a, indices) for a in same_dtype_arrays]
+        rechunked_arrays = da.core.unify_chunks(*itertools.chain(*argpairs))[1]
+        for array, rechunked in zip(group, rechunked_arrays):
             hashes[dask.base.tokenize(array)] = (
                 _hash_array(rechunked),
                 rechunked.chunks,
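
The heart of the new behaviour is `group_key`: boolean and numeric arrays of the same shape now fall into a single "numerical" group and are promoted to a common dtype with `np.result_type` before hashing, while `da.core.unify_chunks` aligns chunk boundaries as before. A rough sketch of these two unification steps in isolation (plain dask/numpy, no Iris):

```python
import itertools

import dask.array as da
import numpy as np

arrays = [np.arange(4, dtype=np.int32), da.arange(4, chunks=3)]

# Promote to a common dtype so equal values hash equally.
dtype = np.result_type(*(a.dtype for a in arrays))  # int64 here
promoted = [da.asarray(a).astype(dtype) for a in arrays]

# Align chunk boundaries so the block-wise hashes line up.
indices = tuple(range(promoted[0].ndim))[::-1]
argpairs = [(a, indices) for a in promoted]
rechunked = da.core.unify_chunks(*itertools.chain(*argpairs))[1]
assert rechunked[0].chunks == rechunked[1].chunks
```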
@@ -435,15 +422,29 @@ def concatenate(
     # which requires to be negotiated.
     axis = None
 
-    # Register each cube with its appropriate proto-cube.
-    hashes = _compute_hashes(
-        cubes,
-        check_aux_coords=check_aux_coords,
-        check_cell_measures=check_cell_measures,
-        check_ancils=check_ancils,
-        check_derived_coords=check_derived_coords,
-    )
+    # Compute hashes for parallel array comparison.
+    arrays = []
+    for cube in cubes:
+        if check_aux_coords:
+            for coord in cube.aux_coords:
+                arrays.append(coord.core_points())
+                if coord.has_bounds():
+                    arrays.append(coord.core_bounds())
+        if check_derived_coords:
+            for coord in cube.derived_coords:
+                arrays.append(coord.core_points())
+                if coord.has_bounds():
+                    arrays.append(coord.core_bounds())
+        if check_cell_measures:
+            for var in cube.cell_measures():
+                arrays.append(var.core_data())
+        if check_ancils:
+            for var in cube.ancillary_variables():
+                arrays.append(var.core_data())
+
+    hashes = _compute_hashes(arrays)
+
+    # Register each cube with its appropriate proto-cube.
     for cube in cubes:
         registered = False
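From the caller's side, the refactor separates collecting the arrays from hashing them: `concatenate` gathers everything the `check_*` flags ask for and hands `_compute_hashes` one flat list. A hypothetical use of the refactored helper on plain arrays (the call pattern mirrors the new unit test below):

```python
import dask.array as da
import numpy as np
from dask.base import tokenize

from iris import _concatenate

points = np.linspace(0.0, 1.0, 5)
data = da.ones((5,), chunks=2)

# One hashing pass over everything to compare; results are then
# looked up by each array's dask token.
hashes = _concatenate._compute_hashes([points, data])
assert tokenize(points) in hashes
assert tokenize(data) in hashes
```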
21 changes: 21 additions & 0 deletions lib/iris/tests/unit/concatenate/test_hashing.py
@@ -0,0 +1,21 @@
+import dask.array as da
+from dask.base import tokenize
+import numpy as np
+import pytest
+
+from iris import _concatenate
+
+
+@pytest.mark.parametrize(
+    "a,b,eq",
+    [
+        (np.arange(2), da.arange(2), True),
+        (np.array([1], dtype=np.float32), np.array([1], dtype=bool), True),
+        (np.array([1]), np.array([[1]]), False),
+        (np.ma.array([1, 2], mask=[0, 1]), np.ma.array([1, 2], mask=[0, 1]), True),
+        (da.arange(2, chunks=1), da.arange(2, chunks=2), True),
+    ],
+)
+def test_compute_hashes(a, b, eq):
+    hashes = _concatenate._compute_hashes([a, b])
+    assert eq == (hashes[tokenize(a)].value == hashes[tokenize(b)].value)
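
Read case by case, the parametrization pins down the intended semantics: a NumPy array and its dask equivalent hash equal; `float32` and `bool` arrays holding the same value now compare equal (the new cross-dtype support); the same value in shapes `(1,)` and `(1, 1)` does not (the shape seed); identical masked arrays match; and the same dask data under different chunkings compares equal thanks to chunk unification.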
