Skip to content

Commit

Permalink
Use unified stack and concatenate functions
Browse files Browse the repository at this point in the history
  • Loading branch information
bouweandela committed Apr 25, 2024
1 parent be91f69 commit 423533a
Show file tree
Hide file tree
Showing 4 changed files with 88 additions and 27 deletions.
8 changes: 4 additions & 4 deletions lib/iris/_concatenate.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,9 @@
from collections import defaultdict, namedtuple
import warnings

import dask.array as da
import numpy as np

from iris._lazy_data import concatenate as concatenate_arrays
import iris.coords
import iris.cube
import iris.exceptions
Expand Down Expand Up @@ -1112,7 +1112,7 @@ def _build_cell_measures(self):
skton.signature.cell_measures_and_dims[i].coord.data
for skton in skeletons
]
data = np.concatenate(tuple(data), axis=dim)
data = concatenate_arrays(tuple(data), axis=dim)

# Generate the associated metadata.
kwargs = cube_signature.cm_metadata[i].defn._asdict()
Expand Down Expand Up @@ -1152,7 +1152,7 @@ def _build_ancillary_variables(self):
skton.signature.ancillary_variables_and_dims[i].coord.data
for skton in skeletons
]
data = np.concatenate(tuple(data), axis=dim)
data = concatenate_arrays(tuple(data), axis=dim)

# Generate the associated metadata.
kwargs = cube_signature.av_metadata[i].defn._asdict()
Expand Down Expand Up @@ -1245,7 +1245,7 @@ def _build_data(self):
skeletons = self._skeletons
data = [skeleton.data for skeleton in skeletons]

data = da.concatenate(data, self.axis)
data = concatenate_arrays(data, self.axis)

return data

Expand Down
89 changes: 79 additions & 10 deletions lib/iris/_lazy_data.py
Original file line number Diff line number Diff line change
Expand Up @@ -42,14 +42,19 @@ def is_lazy_data(data):
return result


def is_masked_data(data: np.ndarray | da.Array) -> bool:
    """Report whether *data* is a masked array, concrete or lazy.

    Inspects the array's meta via :func:`dask.array.utils.meta_from_array`,
    so lazy arrays are classified without being computed.
    """
    meta = da.utils.meta_from_array(data)
    return isinstance(meta, np.ma.MaskedArray)


def is_lazy_masked_data(data):
    """Determine whether managed data is lazy and masked.

    Return True if the argument is both an Iris 'lazy' data array and the
    underlying array is of masked type. Otherwise return False.
    """
    # Diff residue removed: the superseded inline ma.isMA check preceded this
    # return and made it unreachable; delegate to is_masked_data instead.
    return is_lazy_data(data) and is_masked_data(data)


@lru_cache
Expand Down Expand Up @@ -348,19 +353,83 @@ def as_concrete_data(data):
return data


def _combine(
    arrays: Sequence[da.Array | np.ndarray],
    operation: str,
    **kwargs,
) -> da.Array | np.ndarray:
    """Combine multiple arrays into a single array.

    Parameters
    ----------
    arrays :
        The arrays to combine.
    operation :
        The combination operation to apply, e.g. "concatenate" or "stack".
        Must name a function available on both :mod:`numpy` and
        :mod:`numpy.ma`.
    **kwargs :
        Any keyword arguments to pass to the combination operation.

    Returns
    -------
    The combined array: lazy if any input array was lazy, and masked if
    any input array was masked.
    """
    lazy = any(is_lazy_data(a) for a in arrays)
    masked = any(is_masked_data(a) for a in arrays)

    array_module = np
    if masked:
        if lazy:
            # Avoid inconsistent array type when slicing resulting array
            arrays = tuple(
                a if is_lazy_masked_data(a) else da.ma.masked_array(a) for a in arrays
            )
        else:
            # Avoid dropping the masks
            array_module = np.ma

    # NOTE(review): for lazy inputs the plain numpy function is used here —
    # presumably relying on numpy's array-function dispatch to return a dask
    # array; confirm against supported numpy/dask versions.
    func = getattr(array_module, operation)
    return func(arrays, **kwargs)


def concatenate(
    arrays: Sequence[da.Array | np.ndarray],
    axis: int = 0,
) -> da.Array | np.ndarray:
    """Concatenate a sequence of arrays along an existing axis.

    Parameters
    ----------
    arrays :
        The arrays must have the same shape, except in the dimension
        corresponding to `axis` (the first, by default).
    axis :
        Dimension along which to align all of the arrays. If axis is None,
        arrays are flattened before use.

    Returns
    -------
    The concatenated array: lazy if any input array was lazy, and masked
    if any input array was masked.
    """
    return _combine(arrays, operation="concatenate", axis=axis)


def is_masked(a):
    # NOTE(review): this looks like leftover residue of the pre-refactor
    # helper — it duplicates is_masked_data above (same meta_from_array
    # check). Confirm it is unused and remove.
    return isinstance(da.utils.meta_from_array(a), np.ma.MaskedArray)
def stack(
    arrays: Sequence[da.Array | np.ndarray],
    axis: int = 0,
) -> da.Array | np.ndarray:
    """Stack a sequence of arrays along a new axis.

    Parameters
    ----------
    arrays :
        The arrays must have the same shape.
    axis :
        Dimension along which to align all of the arrays.

    Returns
    -------
    The stacked array: lazy if any input array was lazy, and masked if any
    input array was masked.
    """
    # Diff residue removed: lines of the superseded stack body (the inline
    # is_masked/da.stack handling) were interleaved inside this docstring.
    return _combine(arrays, operation="stack", axis=axis)


def multidim_lazy_stack(arr):
Expand Down
3 changes: 2 additions & 1 deletion lib/iris/aux_factory.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
import dask.array as da
import numpy as np

from iris._lazy_data import concatenate
from iris.common import CFVariableMixin, CoordMetadata, metadata_manager_factory
import iris.coords
from iris.warnings import IrisIgnoringBoundsWarning
Expand Down Expand Up @@ -1076,7 +1077,7 @@ def _derive(self, sigma, eta, depth, depth_c, zlev, nsigma, coord_dims_func):
result_rest_levs = zlev[z_slices_rest] * ones_full_result[z_slices_rest]

# Combine nsigma and 'rest' levels for the final result.
result = da.concatenate([result_nsigma_levs, result_rest_levs], axis=z_dim)
result = concatenate([result_nsigma_levs, result_rest_levs], axis=z_dim)
return result

def make_coord(self, coord_dims_func):
Expand Down
15 changes: 3 additions & 12 deletions lib/iris/cube.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@
import zlib

from cf_units import Unit
import dask.array as da
import numpy as np
import numpy.ma as ma

Expand Down Expand Up @@ -3134,12 +3133,7 @@ def make_chunk(key):
result = chunks[0]
else:
chunk_data = [chunk.core_data() for chunk in chunks]
if self.has_lazy_data():
func = da.concatenate
else:
module = ma if ma.isMaskedArray(self.data) else np
func = module.concatenate
data = func(chunk_data, dim)
data = _lazy.concatenate(chunk_data, axis=dim)
result = iris.cube.Cube(data)
result.metadata = deepcopy(self.metadata)

Expand Down Expand Up @@ -4432,13 +4426,10 @@ def aggregated_by(self, coords, aggregator, climatological=False, **kwargs):

# Choose appropriate data and functions for data aggregation.
if aggregator.lazy_func is not None and self.has_lazy_data():
stack = da.stack
input_data = self.lazy_data()
agg_method = aggregator.lazy_aggregate
else:
input_data = self.data
# Note numpy.stack does not preserve masks.
stack = ma.stack if ma.isMaskedArray(input_data) else np.stack
agg_method = aggregator.aggregate

# Create data and weights slices.
Expand Down Expand Up @@ -4475,11 +4466,11 @@ def aggregated_by(self, coords, aggregator, climatological=False, **kwargs):
# before combining the different slices.
if return_weights:
result, weights_result = list(zip(*result))
aggregateby_weights = stack(weights_result, axis=dimension_to_groupby)
aggregateby_weights = _lazy.stack(weights_result, axis=dimension_to_groupby)
else:
aggregateby_weights = None

aggregateby_data = stack(result, axis=dimension_to_groupby)
aggregateby_data = _lazy.stack(result, axis=dimension_to_groupby)
# Ensure plain ndarray is output if plain ndarray was input.
if ma.isMaskedArray(aggregateby_data) and not ma.isMaskedArray(input_data):
aggregateby_data = ma.getdata(aggregateby_data)
Expand Down

0 comments on commit 423533a

Please sign in to comment.