Skip to content

Commit

Permalink
Refactored EchoData.update_platform to improve readability (OSOcean…
Browse files Browse the repository at this point in the history
…Acoustics#1209)

* refactored update_platform method

* removed extra print statements
  • Loading branch information
praneethratna authored Nov 8, 2023
1 parent 9344fcf commit 71f0250
Show file tree
Hide file tree
Showing 2 changed files with 146 additions and 112 deletions.
119 changes: 7 additions & 112 deletions echopype/echodata/echodata.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
if TYPE_CHECKING:
from ..core import EngineHint, FileFormatHint, PathHint, SonarModelsHint

from ..echodata.utils_platform import _clip_by_time_dim, get_mappings_expanded
from ..utils.coding import sanitize_dtypes, set_time_encodings
from ..utils.io import check_file_existence, delete_zarr_store, sanitize_file_path
from ..utils.log import _init_logger
Expand Down Expand Up @@ -353,61 +354,6 @@ def update_platform(
obs_dim = list(extra_platform_data[time_dim].dims)[0]
extra_platform_data = extra_platform_data.swap_dims({obs_dim: time_dim})

def _extvar_properties(ds, name):
"""Test the external variable for presence and all-nan values,
and extract its time dimension name.
Returns <presence>, <valid values>, <time dim name>
"""
if name in ds:
# Assumes the only dimension in the variable is a time dimension
time_dim_name = ds[name].dims[0] if len(ds[name].dims) > 0 else "scalar"
if not ds[name].isnull().all():
return True, True, time_dim_name
else:
return True, False, time_dim_name
else:
return False, False, None

# clip incoming time to 1 less than min of EchoData["Sonar/Beam_group1"]["ping_time"] and
# 1 greater than max of EchoData["Sonar/Beam_group1"]["ping_time"]
# account for unsorted external time by checking whether each time value is between
# min and max ping_time instead of finding the 2 external times corresponding to the
# min and max ping_time and taking all the times between those indices
def _clip_by_time_dim(external_ds, ext_time_dim_name):
# external_ds is extra_platform_data[vars-list-on-time_dim_name]
sorted_external_time = external_ds[ext_time_dim_name].data
sorted_external_time.sort()
# fmt: off
min_index = max(
np.searchsorted(
sorted_external_time,
self["Sonar/Beam_group1"]["ping_time"].min(),
side="left"
) - 1,
0,
)
# fmt: on
max_index = min(
np.searchsorted(
sorted_external_time,
self["Sonar/Beam_group1"]["ping_time"].max(),
side="right",
),
len(sorted_external_time) - 1,
)
# TODO: this element-wise comparison is expensive and seems an ad-hoc patch
# to deal with potentially reversed timestamps in the external dataset.
# Review at workflow stage to see if to clean up timestamp reversals
# and just find start/end timestamp for slicing.
return external_ds.sel(
{
ext_time_dim_name: np.logical_and(
sorted_external_time[min_index] <= external_ds[ext_time_dim_name],
external_ds[ext_time_dim_name] <= sorted_external_time[max_index],
)
}
)

# History attribute to be included in each updated variable
history_attr = f"{datetime.datetime.utcnow()} +00:00. Added from external platform data"
if extra_platform_data_file_name:
Expand All @@ -418,62 +364,9 @@ def _clip_by_time_dim(external_ds, ext_time_dim_name):
# Retain only variable_mappings items where
# either the Platform group or extra_platform_data
# contain the corresponding variables or contain valid (not all nan) data
mappings_expanded = {}
for platform_var, external_var in variable_mappings.items():
# TODO: instead of using existing Platform group variables, a better practice is to
# define a set of allowable Platform variables (sonar_model dependent) for this check.
# This set can be dynamically generated from an external source like a CDL or yaml.
if platform_var in platform:
platform_validvalues = not platform[platform_var].isnull().all()
ext_present, ext_validvalues, ext_time_dim_name = _extvar_properties(
extra_platform_data, external_var
)
if ext_present and ext_validvalues:
mappings_expanded[platform_var] = dict(
external_var=external_var,
ext_time_dim_name=ext_time_dim_name,
platform_validvalues=platform_validvalues,
)

# Generate warning if mappings_expanded is empty
if not mappings_expanded:
logger.warning(
"No variables will be updated, "
"check variable_mappings to ensure variable names are correctly specified!"
)

# If longitude or latitude are requested, verify that both are present
# and they share the same external time dimension
if "longitude" in mappings_expanded or "latitude" in mappings_expanded:
if "longitude" not in mappings_expanded or "latitude" not in mappings_expanded:
raise ValueError(
"Only one of latitude and longitude are specified. Please include both, or neither." # noqa
)
if (
mappings_expanded["longitude"]["ext_time_dim_name"]
!= mappings_expanded["latitude"]["ext_time_dim_name"]
):
raise ValueError(
"The external latitude and longitude use different time dimensions. "
"They must share the same time dimension."
)

# Generate warnings regarding variables that will be updated
vars_not_handled = set(variable_mappings.keys()).difference(mappings_expanded.keys())
if len(vars_not_handled) > 0:
logger.warning(
f"The following requested variables will not be updated: {', '.join(vars_not_handled)}" # noqa
)

vars_notnan_replaced = [
platform_var
for platform_var, v in mappings_expanded.items()
if v["platform_validvalues"]
]
if len(vars_notnan_replaced) > 0:
logger.warning(
f"Some variables with valid data in the original Platform group will be overwritten: {', '.join(vars_notnan_replaced)}" # noqa
)
mappings_expanded = get_mappings_expanded(
logger, extra_platform_data, variable_mappings, platform
)

# Create names for required new time dimensions
ext_time_dims = list(
Expand All @@ -495,7 +388,9 @@ def _clip_by_time_dim(external_ds, ext_time_dim_name):
k: v for k, v in mappings_expanded.items() if v["ext_time_dim_name"] == ext_time_dim
}
ext_vars = [v["external_var"] for v in mappings_selected.values()]
ext_ds = _clip_by_time_dim(extra_platform_data[ext_vars], ext_time_dim)
ext_ds = _clip_by_time_dim(
extra_platform_data[ext_vars], ext_time_dim, self["Sonar/Beam_group1"]["ping_time"]
)

# Create new time coordinate and dimension
platform = platform.assign_coords(**{time_dim: ext_ds[ext_time_dim].values})
Expand Down
139 changes: 139 additions & 0 deletions echopype/echodata/utils_platform.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,139 @@
import numpy as np


def _extvar_properties(ds, name):
"""Test the external variable for presence and all-nan values,
and extract its time dimension name.
Returns <presence>, <valid values>, <time dim name>
"""
if name in ds:
# Assumes the only dimension in the variable is a time dimension
time_dim_name = ds[name].dims[0] if len(ds[name].dims) > 0 else "scalar"
if not ds[name].isnull().all():
return True, True, time_dim_name
else:
return True, False, time_dim_name
else:
return False, False, None


def _clip_by_time_dim(external_ds, ext_time_dim_name, ping_time):
"""
Clip incoming time to 1 less than min of EchoData["Sonar/Beam_group1"]["ping_time"]
and 1 greater than max of EchoData["Sonar/Beam_group1"]["ping_time"].
Accounts for unsorted external time by checking whether each time value is between
min and max ping_time instead of finding the 2 external times corresponding to the
min and max ping_time and taking all the times between those indices.
"""

sorted_external_time = external_ds[ext_time_dim_name].data
sorted_external_time.sort()

min_index = max(
np.searchsorted(sorted_external_time, ping_time.min(), side="left") - 1,
0,
)

max_index = min(
np.searchsorted(
sorted_external_time,
ping_time.max(),
side="right",
),
len(sorted_external_time) - 1,
)

return external_ds.sel(
{
ext_time_dim_name: np.logical_and(
sorted_external_time[min_index] <= external_ds[ext_time_dim_name],
external_ds[ext_time_dim_name] <= sorted_external_time[max_index],
)
}
)


def get_mappings_expanded(logger, extra_platform_data, variable_mappings, platform):
"""
Generate a dictionary of mappings between Platform group variables and external variables.
Parameters
----------
logger : logging.Logger
A logger object to log warnings and errors.
extra_platform_data : xr.Dataset
An `xr.Dataset` containing the additional platform data to be added
to the `EchoData["Platform"]` group.
variable_mappings: Dict[str,str]
A dictionary mapping Platform variable names (dict key) to the
external-data variable name (dict value).
platform: xr.Dataset
An `xr.Dataset` containing the original Platform data.
Returns
-------
mappings_expanded: Dict[str, Dict[str, Any]]
A dictionary containing mappings between Platform group variables and external variables.
Raises
------
ValueError: If only one of latitude and longitude are specified.
If the external latitude and longitude use different time dimensions.
"""

mappings_expanded = {}
for platform_var, external_var in variable_mappings.items():
# TODO: instead of using existing Platform group variables, a better practice is to
# define a set of allowable Platform variables (sonar_model dependent) for this check.
# This set can be dynamically generated from an external source like a CDL or yaml.
if platform_var in platform:
platform_validvalues = not platform[platform_var].isnull().all()
ext_present, ext_validvalues, ext_time_dim_name = _extvar_properties(
extra_platform_data, external_var
)
if ext_present and ext_validvalues:
mappings_expanded[platform_var] = dict(
external_var=external_var,
ext_time_dim_name=ext_time_dim_name,
platform_validvalues=platform_validvalues,
)

# Generate warning if mappings_expanded is empty
if not mappings_expanded:
logger.warning(
"No variables will be updated, "
"check variable_mappings to ensure variable names are correctly specified!"
)

# If longitude or latitude are requested, verify that both are present
# and they share the same external time dimension
if "longitude" in mappings_expanded or "latitude" in mappings_expanded:
if "longitude" not in mappings_expanded or "latitude" not in mappings_expanded:
raise ValueError(
"Only one of latitude and longitude are specified. Please include both, or neither." # noqa
)
if (
mappings_expanded["longitude"]["ext_time_dim_name"]
!= mappings_expanded["latitude"]["ext_time_dim_name"]
):
raise ValueError(
"The external latitude and longitude use different time dimensions. "
"They must share the same time dimension."
)

# Generate warnings regarding variables that will be updated
vars_not_handled = set(variable_mappings.keys()).difference(mappings_expanded.keys())
if len(vars_not_handled) > 0:
logger.warning(
f"The following requested variables will not be updated: {', '.join(vars_not_handled)}" # noqa
)

vars_notnan_replaced = [
platform_var for platform_var, v in mappings_expanded.items() if v["platform_validvalues"]
]
if len(vars_notnan_replaced) > 0:
logger.warning(
f"Some variables with valid data in the original Platform group will be overwritten: {', '.join(vars_notnan_replaced)}" # noqa
)

return mappings_expanded

0 comments on commit 71f0250

Please sign in to comment.