From 423533a31cf9efd46243108fdf1a22f4dee1fddc Mon Sep 17 00:00:00 2001
From: Bouwe Andela
Date: Thu, 25 Apr 2024 12:38:58 +0200
Subject: [PATCH] Use unified stack and concatenate functions

---
 lib/iris/_concatenate.py |  8 ++--
 lib/iris/_lazy_data.py   | 89 +++++++++++++++++++++++++++++++++++-----
 lib/iris/aux_factory.py  |  3 +-
 lib/iris/cube.py         | 15 ++-----
 4 files changed, 88 insertions(+), 27 deletions(-)

diff --git a/lib/iris/_concatenate.py b/lib/iris/_concatenate.py
index 8d929c2af2..b25698da89 100644
--- a/lib/iris/_concatenate.py
+++ b/lib/iris/_concatenate.py
@@ -7,9 +7,9 @@
 from collections import defaultdict, namedtuple
 import warnings
 
-import dask.array as da
 import numpy as np
 
+from iris._lazy_data import concatenate as concatenate_arrays
 import iris.coords
 import iris.cube
 import iris.exceptions
@@ -1112,7 +1112,7 @@ def _build_cell_measures(self):
                 skton.signature.cell_measures_and_dims[i].coord.data
                 for skton in skeletons
             ]
-            data = np.concatenate(tuple(data), axis=dim)
+            data = concatenate_arrays(tuple(data), axis=dim)
 
             # Generate the associated metadata.
             kwargs = cube_signature.cm_metadata[i].defn._asdict()
@@ -1152,7 +1152,7 @@ def _build_ancillary_variables(self):
                 skton.signature.ancillary_variables_and_dims[i].coord.data
                 for skton in skeletons
             ]
-            data = np.concatenate(tuple(data), axis=dim)
+            data = concatenate_arrays(tuple(data), axis=dim)
 
             # Generate the associated metadata.
             kwargs = cube_signature.av_metadata[i].defn._asdict()
@@ -1245,7 +1245,7 @@ def _build_data(self):
         skeletons = self._skeletons
         data = [skeleton.data for skeleton in skeletons]
 
-        data = da.concatenate(data, self.axis)
+        data = concatenate_arrays(data, self.axis)
 
         return data
 
diff --git a/lib/iris/_lazy_data.py b/lib/iris/_lazy_data.py
index 1a2047cbd7..9bb5293d25 100644
--- a/lib/iris/_lazy_data.py
+++ b/lib/iris/_lazy_data.py
@@ -42,6 +42,11 @@ def is_lazy_data(data):
     return result
 
 
+def is_masked_data(data: np.ndarray | da.Array) -> bool:
+    """Return whether the argument is a masked array."""
+    return isinstance(da.utils.meta_from_array(data), np.ma.MaskedArray)
+
+
 def is_lazy_masked_data(data):
     """Determine whether managed data is lazy and masked.
 
     Return True if the argument is a dask array and the
@@ -49,7 +54,7 @@ def is_lazy_masked_data(data):
     underlying array is of masked type.  Otherwise return False.
 
     """
-    return is_lazy_data(data) and ma.isMA(da.utils.meta_from_array(data))
+    return is_lazy_data(data) and is_masked_data(data)
 
 
 @lru_cache
@@ -348,19 +353,83 @@ def as_concrete_data(data):
     return data
 
 
-def stack(seq: Sequence[da.Array | np.ndarray]) -> da.Array:
-    """Stack arrays along a new axis.
+def _combine(
+    arrays: Sequence[da.Array | np.ndarray],
+    operation: str,
+    **kwargs,
+) -> da.Array | np.ndarray:
+    """Combine multiple arrays into a single array.
+
+    Parameters
+    ----------
+    arrays :
+        The arrays to combine.
+    operation :
+        The combination operation to apply.
+    **kwargs :
+        Any keyword arguments to pass to the combination operation.
+
+    """
+    lazy = any(is_lazy_data(a) for a in arrays)
+    masked = any(is_masked_data(a) for a in arrays)
+
+    array_module = np
+    if masked:
+        if lazy:
+            # Avoid inconsistent array type when slicing resulting array
+            arrays = tuple(
+                a if is_lazy_masked_data(a) else da.ma.masked_array(a) for a in arrays
+            )
+        else:
+            # Avoid dropping the masks
+            array_module = np.ma
+
+    func = getattr(array_module, operation)
+    return func(arrays, **kwargs)
+
+
+def concatenate(
+    arrays: Sequence[da.Array | np.ndarray],
+    axis: int = 0,
+) -> da.Array | np.ndarray:
+    """Concatenate a sequence of arrays along an existing axis.
+
+    Parameters
+    ----------
+    arrays :
+        The arrays must have the same shape, except in the dimension
+        corresponding to `axis` (the first, by default).
+    axis :
+        Dimension along which to align all of the arrays. If axis is None,
+        arrays are flattened before use.
+
+    Returns
+    -------
+    The concatenated array.
 
-    This version of :func:`da.stack` ensures all slices of the resulting array
-    are masked arrays if any of the input arrays is masked.
     """
+    return _combine(arrays, operation="concatenate", axis=axis)
+
 
-    def is_masked(a):
-        return isinstance(da.utils.meta_from_array(a), np.ma.MaskedArray)
+def stack(
+    arrays: Sequence[da.Array | np.ndarray],
+    axis: int = 0,
+) -> da.Array | np.ndarray:
+    """Stack a sequence of arrays along a new axis.
 
-    if any(is_masked(a) for a in seq):
-        seq = [a if is_masked(a) else da.ma.masked_array(a) for a in seq]
-    return da.stack(seq)
+    Parameters
+    ----------
+    arrays :
+        The arrays must have the same shape.
+    axis :
+        Dimension along which to align all of the arrays.
+
+    Returns
+    -------
+    The stacked array.
+
+    """
+    return _combine(arrays, operation="stack", axis=axis)
 
 
 def multidim_lazy_stack(arr):
diff --git a/lib/iris/aux_factory.py b/lib/iris/aux_factory.py
index cd59575f93..41e1e9f573 100644
--- a/lib/iris/aux_factory.py
+++ b/lib/iris/aux_factory.py
@@ -11,6 +11,7 @@
 import dask.array as da
 import numpy as np
 
+from iris._lazy_data import concatenate
 from iris.common import CFVariableMixin, CoordMetadata, metadata_manager_factory
 import iris.coords
 from iris.warnings import IrisIgnoringBoundsWarning
@@ -1076,7 +1077,7 @@ def _derive(self, sigma, eta, depth, depth_c, zlev, nsigma, coord_dims_func):
         result_rest_levs = zlev[z_slices_rest] * ones_full_result[z_slices_rest]
 
         # Combine nsigma and 'rest' levels for the final result.
-        result = da.concatenate([result_nsigma_levs, result_rest_levs], axis=z_dim)
+        result = concatenate([result_nsigma_levs, result_rest_levs], axis=z_dim)
         return result
 
     def make_coord(self, coord_dims_func):
diff --git a/lib/iris/cube.py b/lib/iris/cube.py
index fecbcaf0da..8418a630b5 100644
--- a/lib/iris/cube.py
+++ b/lib/iris/cube.py
@@ -25,7 +25,6 @@
 import zlib
 
 from cf_units import Unit
-import dask.array as da
 import numpy as np
 import numpy.ma as ma
 
@@ -3134,12 +3133,7 @@ def make_chunk(key):
             result = chunks[0]
         else:
             chunk_data = [chunk.core_data() for chunk in chunks]
-            if self.has_lazy_data():
-                func = da.concatenate
-            else:
-                module = ma if ma.isMaskedArray(self.data) else np
-                func = module.concatenate
-            data = func(chunk_data, dim)
+            data = _lazy.concatenate(chunk_data, axis=dim)
 
             result = iris.cube.Cube(data)
             result.metadata = deepcopy(self.metadata)
@@ -4432,13 +4426,10 @@ def aggregated_by(self, coords, aggregator, climatological=False, **kwargs):
 
         # Choose appropriate data and functions for data aggregation.
        if aggregator.lazy_func is not None and self.has_lazy_data():
-            stack = da.stack
             input_data = self.lazy_data()
             agg_method = aggregator.lazy_aggregate
         else:
             input_data = self.data
-            # Note numpy.stack does not preserve masks.
-            stack = ma.stack if ma.isMaskedArray(input_data) else np.stack
             agg_method = aggregator.aggregate
 
         # Create data and weights slices.
@@ -4475,11 +4466,11 @@ def aggregated_by(self, coords, aggregator, climatological=False, **kwargs):
         # before combining the different slices.
         if return_weights:
             result, weights_result = list(zip(*result))
-            aggregateby_weights = stack(weights_result, axis=dimension_to_groupby)
+            aggregateby_weights = _lazy.stack(weights_result, axis=dimension_to_groupby)
         else:
             aggregateby_weights = None
 
-        aggregateby_data = stack(result, axis=dimension_to_groupby)
+        aggregateby_data = _lazy.stack(result, axis=dimension_to_groupby)
         # Ensure plain ndarray is output if plain ndarray was input.
         if ma.isMaskedArray(aggregateby_data) and not ma.isMaskedArray(input_data):
             aggregateby_data = ma.getdata(aggregateby_data)
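
Usage sketch (not part of the patch): a minimal example of how the unified helpers in iris._lazy_data are expected to behave once this change is applied, assuming the patched module is importable. Note that iris._lazy_data is a private module, so this is illustrative only; the expected outputs follow from the _combine logic above.

    import dask.array as da
    import numpy as np

    from iris._lazy_data import concatenate, stack

    realised_masked = np.ma.masked_array([1.0, 2.0], mask=[False, True])
    lazy_plain = da.arange(2, dtype=float)

    # Mixing a realised masked array with a lazy array: _combine wraps both
    # inputs with da.ma.masked_array, so the result is a lazy array whose
    # meta is masked and the mask survives computation.
    joined = concatenate([realised_masked, lazy_plain], axis=0)
    print(type(joined))      # a dask array
    print(joined.compute())  # masked array; the second element stays masked

    # Purely realised inputs stay realised: np.ma.stack is used instead of
    # np.stack, so the mask is not dropped.
    stacked = stack([realised_masked, np.arange(2.0)], axis=0)
    print(type(stacked))     # numpy.ma.MaskedArray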