diff --git a/.github/workflows/docs-ci.yml b/.github/workflows/docs-ci.yml
new file mode 100644
index 00000000..d99540e3
--- /dev/null
+++ b/.github/workflows/docs-ci.yml
@@ -0,0 +1,25 @@
+name: "docs check"
+on:
+  pull_request:
+    branches:
+      - master
+      - public
+
+jobs:
+  docs:
+    runs-on: ubuntu-latest
+    steps:
+      - uses: actions/checkout@v2
+      - uses: actions/setup-python@v2
+        with:
+          python-version: '3.8'
+      - name: Install pandoc
+        run: |
+          sudo apt-get update -y && sudo apt-get install -y pandoc
+      - name: Install dependencies
+        run: |
+          pip install sphinx sphinx-rtd-theme nbsphinx nbsphinx-link
+      - name: Build Sphinx docs
+        working-directory: docs/
+        run: |
+          make html
diff --git a/docs/source/api/neuralhydrology.data.rst b/docs/source/api/neuralhydrology.data.rst
deleted file mode 100644
index 8eed95f4..00000000
--- a/docs/source/api/neuralhydrology.data.rst
+++ /dev/null
@@ -1,20 +0,0 @@
-nh.data
-=======
-
-.. automodule:: neuralhydrology.data
-    :members:
-    :undoc-members:
-    :show-inheritance:
-
-.. toctree::
-    :maxdepth: 4
-
-    neuralhydrology.data.basedataset
-    neuralhydrology.data.camelsus
-    neuralhydrology.data.hourlycamelsus
-    neuralhydrology.data.camelsgb
-    neuralhydrology.data.caravan
-    neuralhydrology.data.climateindices
-    neuralhydrology.data.dischargeinput
-    neuralhydrology.data.pet
-    neuralhydrology.data.utils
diff --git a/docs/source/api/neuralhydrology.data.basedataset.rst b/docs/source/api/neuralhydrology.datasetzoo.basedataset.rst
similarity index 58%
rename from docs/source/api/neuralhydrology.data.basedataset.rst
rename to docs/source/api/neuralhydrology.datasetzoo.basedataset.rst
index 8aaf33f1..bc60db2a 100644
--- a/docs/source/api/neuralhydrology.data.basedataset.rst
+++ b/docs/source/api/neuralhydrology.datasetzoo.basedataset.rst
@@ -1,7 +1,7 @@
 BaseDataset
 ===========
 
-.. automodule:: neuralhydrology.data.basedataset
+.. automodule:: neuralhydrology.datasetzoo.basedataset
     :members:
     :undoc-members:
     :show-inheritance:
\ No newline at end of file
diff --git a/docs/source/api/neuralhydrology.data.camelsgb.rst b/docs/source/api/neuralhydrology.datasetzoo.camelsgb.rst
similarity index 58%
rename from docs/source/api/neuralhydrology.data.camelsgb.rst
rename to docs/source/api/neuralhydrology.datasetzoo.camelsgb.rst
index dffefa3c..5c2bc844 100644
--- a/docs/source/api/neuralhydrology.data.camelsgb.rst
+++ b/docs/source/api/neuralhydrology.datasetzoo.camelsgb.rst
@@ -1,7 +1,7 @@
 CamelsGB
 ========
 
-.. automodule:: neuralhydrology.data.camelsgb
+.. automodule:: neuralhydrology.datasetzoo.camelsgb
     :members:
     :undoc-members:
     :show-inheritance:
\ No newline at end of file
diff --git a/docs/source/api/neuralhydrology.data.camelsus.rst b/docs/source/api/neuralhydrology.datasetzoo.camelsus.rst
similarity index 58%
rename from docs/source/api/neuralhydrology.data.camelsus.rst
rename to docs/source/api/neuralhydrology.datasetzoo.camelsus.rst
index b3540506..8cd08366 100644
--- a/docs/source/api/neuralhydrology.data.camelsus.rst
+++ b/docs/source/api/neuralhydrology.datasetzoo.camelsus.rst
@@ -1,7 +1,7 @@
 CamelsUS
 ========
 
-.. automodule:: neuralhydrology.data.camelsus
+.. automodule:: neuralhydrology.datasetzoo.camelsus
     :members:
     :undoc-members:
     :show-inheritance:
diff --git a/docs/source/api/neuralhydrology.data.hourlycamelsus.rst b/docs/source/api/neuralhydrology.datasetzoo.hourlycamelsus.rst
similarity index 59%
rename from docs/source/api/neuralhydrology.data.hourlycamelsus.rst
rename to docs/source/api/neuralhydrology.datasetzoo.hourlycamelsus.rst
index 7876745f..b1c3a527 100644
--- a/docs/source/api/neuralhydrology.data.hourlycamelsus.rst
+++ b/docs/source/api/neuralhydrology.datasetzoo.hourlycamelsus.rst
@@ -1,7 +1,7 @@
 HourlyCamelsUS
 ==============
 
-.. automodule:: neuralhydrology.data.hourlycamelsus
+.. automodule:: neuralhydrology.datasetzoo.hourlycamelsus
     :members:
     :undoc-members:
     :show-inheritance:
\ No newline at end of file
diff --git a/docs/source/api/neuralhydrology.datasetzoo.rst b/docs/source/api/neuralhydrology.datasetzoo.rst
new file mode 100644
index 00000000..f8f49b58
--- /dev/null
+++ b/docs/source/api/neuralhydrology.datasetzoo.rst
@@ -0,0 +1,15 @@
+nh.datasetzoo
+=============
+
+.. automodule:: neuralhydrology.datasetzoo
+    :members:
+    :undoc-members:
+    :show-inheritance:
+
+.. toctree::
+    :maxdepth: 4
+
+    neuralhydrology.datasetzoo.basedataset
+    neuralhydrology.datasetzoo.camelsus
+    neuralhydrology.datasetzoo.hourlycamelsus
+    neuralhydrology.datasetzoo.camelsgb
diff --git a/docs/source/api/neuralhydrology.data.climateindices.rst b/docs/source/api/neuralhydrology.datautils.climateindices.rst
similarity index 59%
rename from docs/source/api/neuralhydrology.data.climateindices.rst
rename to docs/source/api/neuralhydrology.datautils.climateindices.rst
index 253ed06c..2d6dd2cf 100644
--- a/docs/source/api/neuralhydrology.data.climateindices.rst
+++ b/docs/source/api/neuralhydrology.datautils.climateindices.rst
@@ -1,7 +1,7 @@
 climateindices
 ==============
 
-.. automodule:: neuralhydrology.data.climateindices
+.. automodule:: neuralhydrology.datautils.climateindices
     :members:
     :undoc-members:
     :show-inheritance:
diff --git a/docs/source/api/neuralhydrology.data.dischargeinput.rst b/docs/source/api/neuralhydrology.datautils.dischargeinput.rst
similarity index 59%
rename from docs/source/api/neuralhydrology.data.dischargeinput.rst
rename to docs/source/api/neuralhydrology.datautils.dischargeinput.rst
index 4f470eeb..894ee906 100644
--- a/docs/source/api/neuralhydrology.data.dischargeinput.rst
+++ b/docs/source/api/neuralhydrology.datautils.dischargeinput.rst
@@ -1,7 +1,7 @@
 dischargeinput
 ==============
 
-.. automodule:: neuralhydrology.data.dischargeinput
+.. automodule:: neuralhydrology.datautils.dischargeinput
     :members:
     :undoc-members:
     :show-inheritance:
diff --git a/docs/source/api/neuralhydrology.data.pet.rst b/docs/source/api/neuralhydrology.datautils.pet.rst
similarity index 57%
rename from docs/source/api/neuralhydrology.data.pet.rst
rename to docs/source/api/neuralhydrology.datautils.pet.rst
index 7c05d758..65cb03f1 100644
--- a/docs/source/api/neuralhydrology.data.pet.rst
+++ b/docs/source/api/neuralhydrology.datautils.pet.rst
@@ -1,7 +1,7 @@
 pet
 ===
 
-.. automodule:: neuralhydrology.data.pet
+.. automodule:: neuralhydrology.datautils.pet
     :members:
     :undoc-members:
     :show-inheritance:
diff --git a/docs/source/api/neuralhydrology.datautils.rst b/docs/source/api/neuralhydrology.datautils.rst
new file mode 100644
index 00000000..be25b0b2
--- /dev/null
+++ b/docs/source/api/neuralhydrology.datautils.rst
@@ -0,0 +1,15 @@
+nh.datautils
+============
+
+.. automodule:: neuralhydrology.datautils
+    :members:
+    :undoc-members:
+    :show-inheritance:
+
+.. toctree::
+    :maxdepth: 4
+
+    neuralhydrology.datautils.climateindices
+    neuralhydrology.datautils.dischargeinput
+    neuralhydrology.datautils.pet
+    neuralhydrology.datautils.utils
diff --git a/docs/source/api/neuralhydrology.data.utils.rst b/docs/source/api/neuralhydrology.datautils.utils.rst
similarity index 58%
rename from docs/source/api/neuralhydrology.data.utils.rst
rename to docs/source/api/neuralhydrology.datautils.utils.rst
index 258676ee..f9a5cfe4 100644
--- a/docs/source/api/neuralhydrology.data.utils.rst
+++ b/docs/source/api/neuralhydrology.datautils.utils.rst
@@ -1,7 +1,7 @@
 utils
 =====
 
-.. automodule:: neuralhydrology.data.utils
+.. automodule:: neuralhydrology.datautils.utils
     :members:
     :undoc-members:
     :show-inheritance:
diff --git a/docs/source/api/neuralhydrology.data.caravan.rst b/docs/source/api/neuralhydrology.modelzoo.mtslstm.rst
similarity index 52%
rename from docs/source/api/neuralhydrology.data.caravan.rst
rename to docs/source/api/neuralhydrology.modelzoo.mtslstm.rst
index ef3bab55..ff849478 100644
--- a/docs/source/api/neuralhydrology.data.caravan.rst
+++ b/docs/source/api/neuralhydrology.modelzoo.mtslstm.rst
@@ -1,7 +1,7 @@
-Caravan
+MTSLSTM
 =======
 
-.. automodule:: neuralhydrology.data.caravan
+.. automodule:: neuralhydrology.modelzoo.mtslstm
     :members:
     :undoc-members:
     :show-inheritance:
diff --git a/docs/source/api/neuralhydrology.modelzoo.multifreqlstm.rst b/docs/source/api/neuralhydrology.modelzoo.multifreqlstm.rst
deleted file mode 100644
index 40909071..00000000
--- a/docs/source/api/neuralhydrology.modelzoo.multifreqlstm.rst
+++ /dev/null
@@ -1,7 +0,0 @@
-MultiFreqLSTM
-=============
-
-.. automodule:: neuralhydrology.modelzoo.multifreqlstm
-    :members:
-    :undoc-members:
-    :show-inheritance:
diff --git a/docs/source/api/neuralhydrology.modelzoo.rst b/docs/source/api/neuralhydrology.modelzoo.rst
index 66e50c77..77cee575 100644
--- a/docs/source/api/neuralhydrology.modelzoo.rst
+++ b/docs/source/api/neuralhydrology.modelzoo.rst
@@ -16,6 +16,6 @@ nh.modelzoo
     neuralhydrology.modelzoo.fc
     neuralhydrology.modelzoo.head
     neuralhydrology.modelzoo.lstm
-    neuralhydrology.modelzoo.multifreqlstm
+    neuralhydrology.modelzoo.mtslstm
     neuralhydrology.modelzoo.odelstm
     neuralhydrology.modelzoo.template
diff --git a/docs/source/api/neuralhydrology.rst b/docs/source/api/neuralhydrology.rst
index eddeb281..111ddea4 100644
--- a/docs/source/api/neuralhydrology.rst
+++ b/docs/source/api/neuralhydrology.rst
@@ -10,7 +10,8 @@ neuralhydrology API
 .. toctree::
     :maxdepth: 4
 
-    neuralhydrology.data
+    neuralhydrology.datasetzoo
+    neuralhydrology.datautils
     neuralhydrology.evaluation
     neuralhydrology.modelzoo
     neuralhydrology.training
diff --git a/docs/source/usage/models.rst b/docs/source/usage/models.rst
index e1095ad4..6c137352 100644
--- a/docs/source/usage/models.rst
+++ b/docs/source/usage/models.rst
@@ -39,14 +39,14 @@ activations for all time steps. This class is implemented for exploratory reason
 ``model.copy_weights()`` to copy the weights of a ``CudaLSTM`` model into an ``LSTM`` model. This allows to use the
 fast CUDA implementation for training, and only use this class for inference with more detailed outputs.
 
-MultiFreqLSTM
--------------
-:py:class:`neuralhydrology.modelzoo.multifreqlstm.MultiFreqLSTM` is a newly proposed model by Gauch et al. (pre-print
-published soon). This model allows the training on more than one temporal frequency (e.g. daily and hourly inputs) and
-returns multi-frequency model predictions accordingly. A more detailed tutorial will follow shortly.
+MTS-LSTM
+--------
+:py:class:`neuralhydrology.modelzoo.mtslstm.MTSLSTM` is a newly proposed model by Gauch et al. (pre-print
+published soon). This model allows training on more than one temporal resolution (e.g. daily and hourly inputs) and
+returns multi-timescale model predictions accordingly. A more detailed tutorial will follow shortly.
 
-ODELSTM
--------
+ODE-LSTM
+--------
 :py:class:`neuralhydrology.modelzoo.odelstm.ODELSTM` is a PyTorch implementation of the ODE-LSTM proposed by
 `Lechner and Hasani `_. This model can be used with unevenly sampled inputs and can
 be queried to return predictions for any arbitrary time step.
diff --git a/examples/01-Introduction/1_basin.yml b/examples/01-Introduction/1_basin.yml
index 0b5e3e28..d7119c1b 100644
--- a/examples/01-Introduction/1_basin.yml
+++ b/examples/01-Introduction/1_basin.yml
@@ -35,7 +35,7 @@ metrics:
 
 # --- Model configuration --------------------------------------------------------------------------
 
-# base model type [lstm, ealstm, cudalstm, embcudalstm, multifreqlstm]
+# base model type [lstm, ealstm, cudalstm, embcudalstm, mtslstm]
 # (has to match the if statement in modelzoo/__init__.py)
 model: cudalstm
 
diff --git a/examples/config.yml.example b/examples/config.yml.example
index b7b3e770..0d4b5533 100644
--- a/examples/config.yml.example
+++ b/examples/config.yml.example
@@ -52,7 +52,7 @@ metrics:
 
 # --- Model configuration --------------------------------------------------------------------------
 
-# base model type [lstm, ealstm, cudalstm, embcudalstm, multifreqlstm]
+# base model type [lstm, ealstm, cudalstm, embcudalstm, mtslstm]
 # (has to match the if statement in modelzoo/__init__.py)
 model: cudalstm
 
@@ -79,13 +79,13 @@ embedding_activation: tanh
 # dropout applied to embedding network
 embedding_dropout: 0.0
 
-# ----> MultiFreqLSTM settings <----
+# ----> MTSLSTM settings <----
 
-# Use an individual LSTM per frequencies (True) vs. use a single shared LSTM for all frequencies (False)
-per_frequency_lstm: True
+# Use a single shared LSTM for all frequencies (True) vs. use an individual LSTM per frequency (False)
+shared_mtslstm: True
 
 # how to transfer states from lower to higher frequencies. One of [identity, linear, None].
-transfer_multifreq_states:
+transfer_mtslstm_states:
   h: identity
   c: identity
 
diff --git a/neuralhydrology/__about__.py b/neuralhydrology/__about__.py
index 3f013dc7..bbd0a5bf 100644
--- a/neuralhydrology/__about__.py
+++ b/neuralhydrology/__about__.py
@@ -1 +1 @@
-__version__ = "0.9.1-beta"
+__version__ = "0.9.2-beta"
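Both config examples above select the model by name, and the comment notes that the string has to match the if statement in modelzoo/__init__.py. That dispatch itself is not part of this diff, so the following is only a minimal sketch of how such a factory plausibly resolves the renamed mtslstm entry; the module paths are taken from the renamed rst files above, while get_model and the cfg.model attribute are assumptions:

    from neuralhydrology.modelzoo.cudalstm import CudaLSTM  # assumed module path
    from neuralhydrology.modelzoo.mtslstm import MTSLSTM    # matches the renamed API docs above
    from neuralhydrology.utils.config import Config


    def get_model(cfg: Config):
        # Resolve the `model` key of the run configuration to a model class.
        # This mirrors the "if statement in modelzoo/__init__.py" that the
        # config comments refer to; the exact upstream body may differ.
        if cfg.model == "cudalstm":
            return CudaLSTM(cfg=cfg)
        elif cfg.model == "mtslstm":
            return MTSLSTM(cfg=cfg)
        else:
            raise NotImplementedError(f"No model class implemented for model {cfg.model}")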
diff --git a/neuralhydrology/data/camelsus.py b/neuralhydrology/data/camelsus.py
deleted file mode 100644
index 05a461a6..00000000
--- a/neuralhydrology/data/camelsus.py
+++ /dev/null
@@ -1,106 +0,0 @@
-from typing import Dict, List, Union
-
-import numpy as np
-import pandas as pd
-import xarray
-
-from neuralhydrology.data.basedataset import BaseDataset
-from neuralhydrology.data import utils
-from neuralhydrology.utils.config import Config
-
-
-class CamelsUS(BaseDataset):
-    """Data set class for the CAMELS US data set by [#]_ and [#]_.
-
-    Parameters
-    ----------
-    cfg : Config
-        The run configuration.
-    is_train : bool
-        Defines if the dataset is used for training or evaluating. If True (training), means/stds for each feature
-        are computed and stored to the run directory. If one-hot encoding is used, the mapping for the one-hot encoding
-        is created and also stored to disk. If False, a `scaler` input is expected and similarly the `id_to_int` input
-        if one-hot encoding is used.
-    period : {'train', 'validation', 'test'}
-        Defines the period for which the data will be loaded
-    basin : str, optional
-        If passed, the data for only this basin will be loaded. Otherwise the basin(s) are read from the appropriate
-        basin file, corresponding to the `period`.
-    additional_features : List[Dict[str, pd.DataFrame]], optional
-        List of dictionaries, mapping from a basin id to a pandas DataFrame. This DataFrame will be added to the data
-        loaded from the dataset and all columns are available as 'dynamic_inputs', 'static_inputs' and
-        'target_variables'
-    id_to_int : Dict[str, int], optional
-        If the config argument 'use_basin_id_encoding' is True in the config and period is either 'validation' or
-        'test', this input is required. It is a dictionary, mapping from basin id to an integer (the one-hot encoding).
-    scaler : Dict[str, Union[pd.Series, xarray.DataArray]], optional
-        If period is either 'validation' or 'test', this input is required. It contains the means and standard
-        deviations for each feature and is stored to the run directory during training (train_data/train_data_scaler.p)
-
-    References
-    ----------
-    .. [#] A. J. Newman, M. P. Clark, K. Sampson, A. Wood, L. E. Hay, A. Bock, R. J. Viger, D. Blodgett,
-        L. Brekke, J. R. Arnold, T. Hopson, and Q. Duan: Development of a large-sample watershed-scale
-        hydrometeorological dataset for the contiguous USA: dataset characteristics and assessment of regional
-        variability in hydrologic model performance. Hydrol. Earth Syst. Sci., 19, 209-223,
-        doi:10.5194/hess-19-209-2015, 2015
-    .. [#] Addor, N., Newman, A. J., Mizukami, N. and Clark, M. P.: The CAMELS data set: catchment attributes and
-        meteorology for large-sample studies, Hydrol. Earth Syst. Sci., 21, 5293-5313, doi:10.5194/hess-21-5293-2017,
-        2017.
-    """
-
-    def __init__(self,
-                 cfg: Config,
-                 is_train: bool,
-                 period: str,
-                 basin: str = None,
-                 additional_features: List[Dict[str, pd.DataFrame]] = [],
-                 id_to_int: Dict[str, int] = {},
-                 scaler: Dict[str, Union[pd.Series, xarray.DataArray]] = {}):
-        super(CamelsUS, self).__init__(cfg=cfg,
-                                       is_train=is_train,
-                                       period=period,
-                                       basin=basin,
-                                       additional_features=additional_features,
-                                       id_to_int=id_to_int,
-                                       scaler=scaler)
-
-    def _load_basin_data(self, basin: str) -> pd.DataFrame:
-        """Load input and output data from text files."""
-        # get forcings
-        dfs = []
-        for forcing in self.cfg.forcings:
-            df, area = utils.load_camels_us_forcings(self.cfg.data_dir, basin, forcing)
-
-            # rename columns
-            if len(self.cfg.forcings) > 1:
-                df = df.rename(columns={col: f"{col}_{forcing}" for col in df.columns})
-            dfs.append(df)
-        df = pd.concat(dfs, axis=1)
-
-        # add discharge
-        df['QObs(mm/d)'] = utils.load_camels_us_discharge(self.cfg.data_dir, basin, area)
-
-        # replace invalid discharge values by NaNs
-        qobs_cols = [col for col in df.columns if "qobs" in col.lower()]
-        for col in qobs_cols:
-            df.loc[df[col] < 0, col] = np.nan
-
-        return df
-
-    def _load_attributes(self) -> pd.DataFrame:
-        if self.cfg.camels_attributes:
-            if self.is_train:
-                # sanity check attributes for NaN in per-feature standard deviation
-                utils.attributes_sanity_check(data_dir=self.cfg.data_dir,
-                                              attribute_set=self.cfg.dataset,
-                                              basins=self.basins,
-                                              attribute_list=self.cfg.camels_attributes)
-
-            df = utils.load_camels_us_attributes(self.cfg.data_dir, basins=self.basins)
-
-            # remove all attributes not defined in the config
-            drop_cols = [c for c in df.columns if c not in self.cfg.camels_attributes]
-            df = df.drop(drop_cols, axis=1)
-
-            return df
diff --git a/neuralhydrology/data/dischargeinput.py b/neuralhydrology/data/dischargeinput.py
deleted file mode 100644
index b198a5fa..00000000
--- a/neuralhydrology/data/dischargeinput.py
+++ /dev/null
@@ -1,37 +0,0 @@
-import logging
-import pickle
-import sys
-from pathlib import Path
-
-from tqdm import tqdm
-
-from neuralhydrology.data.utils import load_basin_file, load_camels_us_forcings, load_camels_us_discharge
-
-LOGGER = logging.getLogger(__name__)
-
-
-def create_discharge_files(data_dir: Path, basin_file: Path, out_dir: Path, shift: int = 1):
-
-    out_file = out_dir / f"discharge_input_shift{shift}.p"
-    if out_file.is_file():
-        raise FileExistsError
-
-    basins = load_basin_file(basin_file)
-
-    data = {}
-
-    for basin in tqdm(basins, file=sys.stdout):
-
-        df, area = load_camels_us_forcings(data_dir=data_dir, basin=basin, forcings="daymet")
-        df["QObs(mm/d)"] = load_camels_us_discharge(data_dir=data_dir, basin=basin, area=area)
-
-        df[f"QObs(t-{shift})"] = df["QObs(mm/d)"].shift(shift)
-
-        drop_columns = [col for col in df.columns if col != f"QObs(t-{shift})"]
-
-        data[basin] = df.drop(labels=drop_columns, axis=1)
-
-    with out_file.open("wb") as fp:
-        pickle.dump(data, fp)
-
-    LOGGER.info(f"Data successfully stored at {out_file}")
diff --git a/neuralhydrology/data/utils.py b/neuralhydrology/data/utils.py
deleted file mode 100644
index 9e7e508a..00000000
--- a/neuralhydrology/data/utils.py
+++ /dev/null
@@ -1,543 +0,0 @@
-from pathlib import Path
-from typing import List, Tuple, Union
-
-import numpy as np
-import pandas as pd
-import xarray
-from xarray.core.dataarray import DataArray
-from xarray.core.dataset import Dataset
-
-########################################################################################################################
-# CAMELS US utility functions                                                                                          #
-########################################################################################################################
-
-
-def load_camels_us_attributes(data_dir: Path, basins: List[str] = []) -> pd.DataFrame:
-    """Load CAMELS US attributes from the dataset provided by [#]_
-
-    Parameters
-    ----------
-    data_dir : Path
-        Path to the CAMELS US directory. This folder must contain a 'camels_attributes_v2.0' folder (the original
-        data set) containing the corresponding txt files for each attribute group.
-    basins : List[str], optional
-        If passed, return only attributes for the basins specified in this list. Otherwise, the attributes of all basins
-        are returned.
-
-    Returns
-    -------
-    pandas.DataFrame
-        Basin-indexed DataFrame, containing the attributes as columns.
-
-    References
-    ----------
-    .. [#] Addor, N., Newman, A. J., Mizukami, N. and Clark, M. P.: The CAMELS data set: catchment attributes and
-        meteorology for large-sample studies, Hydrol. Earth Syst. Sci., 21, 5293-5313, doi:10.5194/hess-21-5293-2017,
-        2017.
-    """
-    attributes_path = Path(data_dir) / 'camels_attributes_v2.0'
-
-    if not attributes_path.exists():
-        raise RuntimeError(f"Attribute folder not found at {attributes_path}")
-
-    txt_files = attributes_path.glob('camels_*.txt')
-
-    # Read-in attributes into one big dataframe
-    dfs = []
-    for txt_file in txt_files:
-        df_temp = pd.read_csv(txt_file, sep=';', header=0, dtype={'gauge_id': str})
-        df_temp = df_temp.set_index('gauge_id')
-
-        dfs.append(df_temp)
-
-    df = pd.concat(dfs, axis=1)
-    # convert huc column to double digit strings
-    df['huc'] = df['huc_02'].apply(lambda x: str(x).zfill(2))
-    df = df.drop('huc_02', axis=1)
-
-    if basins:
-        # drop rows of basins not contained in the passed list
-        drop_basins = [b for b in df.index if b not in basins]
-        df = df.drop(drop_basins, axis=0)
-
-    return df
-
-
-def load_camels_us_forcings(data_dir: Path, basin: str, forcings: str) -> Tuple[pd.DataFrame, int]:
-    """Load the forcing data for a basin of the CAMELS US data set.
-
-    Parameters
-    ----------
-    data_dir : Path
-        Path to the CAMELS US directory. This folder must contain a 'basin_mean_forcing' folder containing one
-        subdirectory for each forcing. The forcing directories have to contain 18 subdirectories (for the 18 HUCS) as in
-        the original CAMELS data set. In each HUC folder are the forcing files (.txt), starting with the 8-digit basin
-        id.
-    basin : str
-        8-digit USGS identifier of the basin.
-    forcings : str
-        Can be e.g. 'daymet' or 'nldas', etc. Must match the folder names in the 'basin_mean_forcing' directory.
-
-    Returns
-    -------
-    pd.DataFrame
-        Time-indexed DataFrame, containing the forcing data.
-    int
-        Catchment area (m2), specified in the header of the forcing file.
-    """
-    forcing_path = data_dir / 'basin_mean_forcing' / forcings
-    if not forcing_path.is_dir():
-        raise OSError(f"{forcing_path} does not exist")
-
-    files = list(forcing_path.glob('**/*_forcing_leap.txt'))
-    file_path = [f for f in files if f.name[:8] == basin]
-    if file_path:
-        file_path = file_path[0]
-    else:
-        raise FileNotFoundError(f'No file for Basin {basin} at {file_path}')
-
-    df = pd.read_csv(file_path, sep='\s+', header=3)
-    df["date"] = pd.to_datetime(df.Year.map(str) + "/" + df.Mnth.map(str) + "/" + df.Day.map(str), format="%Y/%m/%d")
-    df = df.set_index("date")
-
-    # load area from header
-    with open(file_path, 'r') as fp:
-        content = fp.readlines()
-        area = int(content[2])
-
-    return df, area
-
-
-def load_camels_us_discharge(data_dir: Path, basin: str, area: int) -> pd.Series:
-    """Load the discharge data for a basin of the CAMELS US data set.
-
-    Parameters
-    ----------
-    data_dir : Path
-        Path to the CAMELS US directory. This folder must contain a 'usgs_streamflow' folder with 18
-        subdirectories (for the 18 HUCS) as in the original CAMELS data set. In each HUC folder are the discharge files
-        (.txt), starting with the 8-digit basin id.
-    basin : str
-        8-digit USGS identifier of the basin.
-    area : int
-        Catchment area (m2), used to normalize the discharge.
-
-    Returns
-    -------
-    pd.Series
-        Time-index pandas.Series of the discharge values (mm/day)
-    """
-
-    discharge_path = data_dir / 'usgs_streamflow'
-    files = list(discharge_path.glob('**/*_streamflow_qc.txt'))
-    file_path = [f for f in files if f.name[:8] == basin]
-    if file_path:
-        file_path = file_path[0]
-    else:
-        raise FileNotFoundError(f'No file for Basin {basin} at {file_path}')
-
-    col_names = ['basin', 'Year', 'Mnth', 'Day', 'QObs', 'flag']
-    df = pd.read_csv(file_path, sep='\s+', header=None, names=col_names)
-    df["date"] = pd.to_datetime(df.Year.map(str) + "/" + df.Mnth.map(str) + "/" + df.Day.map(str), format="%Y/%m/%d")
-    df = df.set_index("date")
-
-    # normalize discharge from cubic feed per second to mm per day
-    df.QObs = 28316846.592 * df.QObs * 86400 / (area * 10**6)
-
-    return df.QObs
-
-
-########################################################################################################################
-# HOURLY CAMELS US utility functions                                                                                   #
-########################################################################################################################
-
-
-def load_hourly_us_forcings(data_dir: Path, basin: str, forcings: str) -> pd.DataFrame:
-    """Load the hourly forcing data for a basin of the CAMELS US data set.
-
-    The hourly forcings are not included in the original data set by Newman et al. (2017).
-
-    Parameters
-    ----------
-    data_dir : Path
-        Path to the CAMELS US directory. This folder must contain an 'hourly' folder containing one subdirectory
-        for each forcing, which contains the forcing files (.csv) for each basin. Files have to contain the 8-digit
-        basin id.
-    basin : str
-        8-digit USGS identifier of the basin.
-    forcings : str
-        Must match the folder names in the 'hourly' directory. E.g. 'nldas_hourly'
-
-    Returns
-    -------
-    pd.DataFrame
-        Time-indexed DataFrame, containing the forcing data.
-    """
-    forcing_path = data_dir / 'hourly' / forcings
-    if not forcing_path.is_dir():
-        raise OSError(f"{forcing_path} does not exist")
-
-    files = list(forcing_path.glob('*.csv'))
-    file_path = [f for f in files if basin in f.stem]
-    if file_path:
-        file_path = file_path[0]
-    else:
-        raise FileNotFoundError(f'No file for Basin {basin} at {forcing_path}')
-
-    return pd.read_csv(file_path, index_col=['date'], parse_dates=['date'])
-
-
-def load_hourly_us_discharge(data_dir: Path, basin: str) -> pd.DataFrame:
-    """Load the hourly discharge data for a basin of the CAMELS US data set.
-
-    Parameters
-    ----------
-    data_dir : Path
-        Path to the CAMELS US directory. This folder must contain a folder called 'hourly' with a subdirectory
-        'usgs_streamflow' which contains the discharge files (.csv) for each basin. File names must contain the 8-digit
-        basin id.
-    basin : str
-        8-digit USGS identifier of the basin.
-
-    Returns
-    -------
-    pd.Series
-        Time-index Series of the discharge values (mm/hour)
-    """
-    discharge_path = data_dir / 'hourly' / 'usgs_streamflow'
-    files = list(discharge_path.glob('**/*usgs-hourly.csv'))
-    file_path = [f for f in files if basin in f.stem]
-    if file_path:
-        file_path = file_path[0]
-    else:
-        raise FileNotFoundError(f'No file for Basin {basin} at {discharge_path}')
-
-    return pd.read_csv(file_path, index_col=['date'], parse_dates=['date'])
-
-
-def load_hourly_us_stage(data_dir: Path, basin: str) -> pd.Series:
-    """Load the hourly stage data for a basin of the CAMELS US data set.
-
-    Parameters
-    ----------
-    data_dir : Path
-        Path to the CAMELS US directory. This folder must contain a folder called 'hourly' with a subdirectory
-        'usgs_stage' which contains the stage files (.csv) for each basin. File names must contain the 8-digit basin id.
-    basin : str
-        8-digit USGS identifier of the basin.
-
-    Returns
-    -------
-    pd.Series
-        Time-index Series of the stage values (m)
-    """
-    stage_path = data_dir / 'hourly' / 'usgs_stage'
-    files = list(stage_path.glob('**/*_utc.csv'))
-    file_path = [f for f in files if basin in f.stem]
-    if file_path:
-        file_path = file_path[0]
-    else:
-        raise FileNotFoundError(f'No file for Basin {basin} at {stage_path}')
-
-    df = pd.read_csv(file_path,
-                     sep=',',
-                     index_col=['datetime'],
-                     parse_dates=['datetime'],
-                     usecols=['datetime', 'gauge_height_ft'])
-    df = df.resample('H').mean()
-    df["gauge_height_m"] = df["gauge_height_ft"] * 0.3048
-
-    return df["gauge_height_m"]
-
-
-def load_hourly_us_netcdf(data_dir: Path, forcings: str) -> xarray.Dataset:
-    """Load hourly forcing and discharge data from preprocessed netCDF file.
-
-    Parameters
-    ----------
-    data_dir : Path
-        Path to the CAMELS US directory. This folder must contain a folder called 'hourly', containing the netCDF file.
-    forcings : str
-        Name of the forcing product. Must match the ending of the netCDF file. E.g. 'nldas_hourly' for
-        'usgs-streamflow-nldas_hourly.nc'
-
-    Returns
-    -------
-    xarray.Dataset
-        Dataset containing the combined discharge and forcing data of all basins (as stored in the netCDF)
-    """
-    netcdf_path = data_dir / 'hourly' / f'usgs-streamflow-{forcings}.nc'
-    if not netcdf_path.is_file():
-        raise FileNotFoundError(f'No NetCDF file for hourly streamflow and {forcings} at {netcdf_path}.')
-
-    return xarray.open_dataset(netcdf_path)
-
-
-########################################################################################################################
-# CAMELS GB utility functions                                                                                          #
-########################################################################################################################
-
-
-def load_camels_gb_attributes(data_dir: Path, basins: List[str] = []) -> pd.DataFrame:
-    """Load CAMELS GB attributes from the dataset provided by [#]_
-
-    Parameters
-    ----------
-    data_dir : Path
-        Path to the CAMELS GB directory. This folder must contain an 'attributes' folder containing the corresponding
-        csv files for each attribute group (ending with _attributes.csv).
-    basins : List[str], optional
-        If passed, return only attributes for the basins specified in this list. Otherwise, the attributes of all basins
-        are returned.
-
-    Returns
-    -------
-    pd.DataFrame
-        Basin-indexed DataFrame, containing the attributes as columns.
-
-    References
-    ----------
-    .. [#] Coxon, G., Addor, N., Bloomfield, J. P., Freer, J., Fry, M., Hannaford, J., Howden, N. J. K., Lane, R.,
-        Lewis, M., Robinson, E. L., Wagener, T., and Woods, R.: CAMELS-GB: Hydrometeorological time series and landscape
-        attributes for 671 catchments in Great Britain, Earth Syst. Sci. Data Discuss.,
-        https://doi.org/10.5194/essd-2020-49, in review, 2020.
-    """
-    attributes_path = Path(data_dir) / 'attributes'
-
-    if not attributes_path.exists():
-        raise RuntimeError(f"Attribute folder not found at {attributes_path}")
-
-    txt_files = attributes_path.glob('*_attributes.csv')
-
-    # Read-in attributes into one big dataframe
-    dfs = []
-    for txt_file in txt_files:
-        df_temp = pd.read_csv(txt_file, sep=',', header=0, dtype={'gauge_id': str})
-        df_temp = df_temp.set_index('gauge_id')
-
-        dfs.append(df_temp)
-
-    df = pd.concat(dfs, axis=1)
-
-    if basins:
-        # drop rows of basins not contained in the passed list
-        drop_basins = [b for b in df.index if b not in basins]
-        df = df.drop(drop_basins, axis=0)
-
-    return df
-
-
-def load_camels_gb_timeseries(data_dir: Path, basin: str) -> pd.DataFrame:
-    """Load the time series data for one basin of the CAMELS GB data set.
-
-    Parameters
-    ----------
-    data_dir : Path
-        Path to the CAMELS GB directory. This folder must contain a folder called 'timeseries' containing the forcing
-        files for each basin as .csv file. The file names have to start with 'CAMELS_GB_hydromet_timeseries'.
-    basin : str
-        Basin identifier number as string.
-
-    Returns
-    -------
-    pd.DataFrame
-        Time-indexed DataFrame, containing the time series data (forcings + discharge) data.
-    """
-    forcing_path = data_dir / 'timeseries'
-    if not forcing_path.is_dir():
-        raise OSError(f"{forcing_path} does not exist")
-
-    files = list(forcing_path.glob('**/CAMELS_GB_hydromet_timeseries*.csv'))
-    file_path = [f for f in files if f"_{basin}_" in f.name]
-    if file_path:
-        file_path = file_path[0]
-    else:
-        raise FileNotFoundError(f'No file for Basin {basin} at {file_path}')
-
-    df = pd.read_csv(file_path, sep=',', header=0, dtype={'date': str})
-    df["date"] = pd.to_datetime(df["date"], format="%Y-%m-%d")
-    df = df.set_index("date")
-
-    return df
-
-
-########################################################################################################################
-# General utility functions                                                                                            #
-########################################################################################################################
-
-
-def load_hydroatlas_attributes(data_dir: Path, basins: List[str] = []) -> pd.DataFrame:
-    """Load HydroATLAS attributes into a pandas DataFrame
-
-    Parameters
-    ----------
-    data_dir : Path
-        Path to the root directory of the dataset. Must contain a folder called 'hydroatlas_attributes' with a file
-        called `attributes.csv`. The attributes file is expected to have one column called `basin_id`.
-    basins : List[str], optional
-        If passed, return only attributes for the basins specified in this list. Otherwise, the attributes of all basins
-        are returned.
-
-    Returns
-    -------
-    pd.DataFrame
-        Basin-indexed DataFrame containing the HydroATLAS attributes.
-    """
-    attribute_file = data_dir / "hydroatlas_attributes" / "attributes.csv"
-    if not attribute_file.is_file():
-        raise FileNotFoundError(attribute_file)
-
-    df = pd.read_csv(attribute_file, dtype={'basin_id': str})
-    df = df.set_index('basin_id')
-
-    if basins:
-        drop_basins = [b for b in df.index if b not in basins]
-        df = df.drop(drop_basins, axis=0)
-
-    return df
-
-
-def load_basin_file(basin_file: Path) -> List[str]:
-    """Load list of basins from text file.
-
-    Parameters
-    ----------
-    basin_file : Path
-        Path to a basin txt file. File has to contain one basin id per row.
-
-    Returns
-    -------
-    List[str]
-        List of basin ids as strings.
-    """
-    with basin_file.open('r') as fp:
-        basins = fp.readlines()
-    basins = sorted(basin.strip() for basin in basins)
-    return basins
-
-
-def attributes_sanity_check(data_dir: Path, attribute_set: str, basins: List[str], attribute_list: List[str]):
-    """Utility function to check if the standard deviation of one (or more) attributes is zero.
-
-    This utility function can be used to check if any attribute has a standard deviation of zero. This would lead to
-    NaN's, when normalizing the features and thus would lead to NaN's when training the model. The function will raise
-    a `RuntimeError` if one or more zeros have been detected and will print the list of corresponding attribute names
-    to the console.
-
-    Parameters
-    ----------
-    data_dir : Path
-        Path to the root directory of the data set
-    attribute_set : {'hydroatlas', 'camels_us', 'hourly_camels_us', 'camels_gb'}
-        Name of the attribute set to check.
-    basins :
-        List of basins to consider in the check.
-    attribute_list :
-        List of attribute names to consider in the check.
-
-    Raises
-    ------
-    ValueError
-        For an unknown 'attribute_set'
-    RuntimeError
-        If one or more attributes have a standard deviation of zero.
-    """
-    if attribute_set == "hydroatlas":
-        df = load_hydroatlas_attributes(data_dir, basins)
-    elif attribute_set in ["camels_us", "hourly_camels_us"]:
-        df = load_camels_us_attributes(data_dir, basins)
-    elif attribute_set == "camels_gb":
-        df = load_camels_gb_attributes(data_dir, basins)
-    else:
-        raise ValueError(f"Unknown 'attribute_set' {attribute_set}")
-    drop_cols = [c for c in df.columns if c not in attribute_list]
-    df = df.drop(drop_cols, axis=1)
-    attributes = []
-    if any(df.std() == 0.0) or any(df.std().isnull()):
-        for k, v in df.std().iteritems():
-            if (v == 0) or (np.isnan(v)):
-                attributes.append(k)
-    if attributes:
-        msg = [
-            "The following attributes have a std of zero or NaN, which results in NaN's ",
-            "when normalizing the features. Remove the attributes from the attribute feature list ",
-            "and restart the run. \n", f"Attributes: {attributes}"
-        ]
-        raise RuntimeError("".join(msg))
-
-
-def sort_frequencies(frequencies: List[str]) -> List[str]:
-    """Sort the passed frequencies from low to high frequencies.
-
-    Use `pandas frequency strings
-    `_
-    to define frequencies. Note: The strings need to include values, e.g., '1D' instead of 'D'.
-
-    Parameters
-    ----------
-    frequencies : List[str]
-        List of pandas frequency identifiers to be sorted.
-
-    Returns
-    -------
-    List[str]
-        Sorted list of pandas frequency identifiers.
-    """
-    deltas = {freq: pd.to_timedelta(freq) for freq in frequencies}
-    return sorted(deltas, key=deltas.get)[::-1]
-
-
-def infer_frequency(index: Union[pd.DatetimeIndex, np.ndarray]) -> str:
-    """Infer the frequency of an index of a pandas DataFrame/Series or xarray DataArray.
-
-    Parameters
-    ----------
-    index : Union[pd.DatetimeIndex, np.ndarray]
-        DatetimeIndex of a DataFrame/Series or array of datetime values.
-
-    Returns
-    -------
-    str
-        Frequency of the index as a `pandas frequency string
-        `_
-
-    Raises
-    ------
-    ValueError
-        If the frequency cannot be inferred from the index or is zero.
-    """
-    native_frequency = pd.infer_freq(index)
-    if native_frequency is None:
-        raise ValueError(f'Cannot infer a legal frequency from dataset: {native_frequency}.')
-    if native_frequency[0] not in '0123456789':  # add a value to the unit so to_timedelta works
-        native_frequency = f'1{native_frequency}'
-    if pd.to_timedelta(native_frequency) == pd.to_timedelta(0):
-        raise ValueError('Inferred dataset frequency is zero.')
-    return native_frequency
-
-
-def infer_datetime_coord(xr: Union[DataArray, Dataset]) -> str:
-    """Checks for coordinate with 'date' in its name and returns the name.
-
-    Parameters
-    ----------
-    xr : Union[DataArray, Dataset]
-        Array to infer coordinate name of.
-
-    Returns
-    -------
-    str
-        Name of datetime coordinate name.
-
-    Raises
-    ------
-    RuntimeError
-        If none or multiple coordinates with 'date' in its name are found.
-    """
-    candidates = [c for c in list(xr.coords) if "date" in c]
-    if len(candidates) > 1:
-        raise RuntimeError("Found multiple coordinates with 'date' in its name.")
-    if not candidates:
-        raise RuntimeError("Did not find any coordinate with 'date' in its name")
-
-    return candidates[0]
diff --git a/neuralhydrology/data/__init__.py b/neuralhydrology/datasetzoo/__init__.py
similarity index 92%
rename from neuralhydrology/data/__init__.py
rename to neuralhydrology/datasetzoo/__init__.py
index 15f0d181..3ef0c1ea 100644
--- a/neuralhydrology/data/__init__.py
+++ b/neuralhydrology/datasetzoo/__init__.py
@@ -1,7 +1,7 @@
-from neuralhydrology.data.basedataset import BaseDataset
-from neuralhydrology.data.camelsus import CamelsUS
-from neuralhydrology.data.camelsgb import CamelsGB
-from neuralhydrology.data.hourlycamelsus import HourlyCamelsUS
+from neuralhydrology.datasetzoo.basedataset import BaseDataset
+from neuralhydrology.datasetzoo.camelsus import CamelsUS
+from neuralhydrology.datasetzoo.camelsgb import CamelsGB
+from neuralhydrology.datasetzoo.hourlycamelsus import HourlyCamelsUS
 
 from neuralhydrology.utils.config import Config
 
@@ -45,7 +45,7 @@ def get_dataset(cfg: Config,
     -------
     BaseDataset
         A new data set instance, depending on the run configuration.
-    
+
     Raises
     ------
     NotImplementedError
diff --git a/neuralhydrology/data/basedataset.py b/neuralhydrology/datasetzoo/basedataset.py
similarity index 98%
rename from neuralhydrology/data/basedataset.py
rename to neuralhydrology/datasetzoo/basedataset.py
index 651c7ea9..4be9d3a3 100644
--- a/neuralhydrology/data/basedataset.py
+++ b/neuralhydrology/datasetzoo/basedataset.py
@@ -13,7 +13,7 @@
 from torch.utils.data import Dataset
 from tqdm import tqdm
 
-from neuralhydrology.data import utils
+from neuralhydrology.datautils import utils
 from neuralhydrology.utils.config import Config
 
 LOGGER = logging.getLogger(__name__)
@@ -383,18 +383,16 @@ def _create_lookup_table(self, xr: xarray.Dataset):
         self.num_samples = len(self.lookup_table)
 
     def _load_hydroatlas_attributes(self):
-        if self.is_train:
-            # sanity check attributes for NaN in per-feature standard deviation
-            utils.attributes_sanity_check(data_dir=self.cfg.data_dir,
-                                          attribute_set="hydroatlas",
-                                          basins=self.basins,
-                                          attribute_list=self.cfg.hydroatlas_attributes)
-
         df = utils.load_hydroatlas_attributes(self.cfg.data_dir, basins=self.basins)
 
         # remove all attributes not defined in the config
         drop_cols = [c for c in df.columns if c not in self.cfg.hydroatlas_attributes]
         df = df.drop(drop_cols, axis=1)
+
+        if self.is_train:
+            # sanity check attributes for NaN in per-feature standard deviation
+            utils.attributes_sanity_check(df=df)
+
         return df
 
     def _load_combined_attributes(self):
files.""" - df = utils.load_camels_gb_timeseries(data_dir=self.cfg.data_dir, basin=basin) + df = load_camels_gb_timeseries(data_dir=self.cfg.data_dir, basin=basin) return df def _load_attributes(self) -> pd.DataFrame: if self.cfg.camels_attributes: - if self.is_train: - # sanity check attributes for NaN in per-feature standard deviation - utils.attributes_sanity_check(data_dir=self.cfg.data_dir, - attribute_set=self.cfg.dataset, - basins=self.basins, - attribute_list=self.cfg.camels_attributes) - df = utils.load_camels_gb_attributes(self.cfg.data_dir, basins=self.basins) + df = load_camels_gb_attributes(self.cfg.data_dir, basins=self.basins) # remove all attributes not defined in the config drop_cols = [c for c in df.columns if c not in self.cfg.camels_attributes] df = df.drop(drop_cols, axis=1) + if self.is_train: + # sanity check attributes for NaN in per-feature standard deviation + utils.attributes_sanity_check(df=df) + return df + + +def load_camels_gb_attributes(data_dir: Path, basins: List[str] = []) -> pd.DataFrame: + """Load CAMELS GB attributes from the dataset provided by [#]_ + + Parameters + ---------- + data_dir : Path + Path to the CAMELS GB directory. This folder must contain an 'attributes' folder containing the corresponding + csv files for each attribute group (ending with _attributes.csv). + basins : List[str], optional + If passed, return only attributes for the basins specified in this list. Otherwise, the attributes of all basins + are returned. + + Returns + ------- + pd.DataFrame + Basin-indexed DataFrame, containing the attributes as columns. + + References + ---------- + .. [#] Coxon, G., Addor, N., Bloomfield, J. P., Freer, J., Fry, M., Hannaford, J., Howden, N. J. K., Lane, R., + Lewis, M., Robinson, E. L., Wagener, T., and Woods, R.: CAMELS-GB: Hydrometeorological time series and landscape + attributes for 671 catchments in Great Britain, Earth Syst. Sci. Data Discuss., + https://doi.org/10.5194/essd-2020-49, in review, 2020. + """ + attributes_path = Path(data_dir) / 'attributes' + + if not attributes_path.exists(): + raise RuntimeError(f"Attribute folder not found at {attributes_path}") + + txt_files = attributes_path.glob('*_attributes.csv') + + # Read-in attributes into one big dataframe + dfs = [] + for txt_file in txt_files: + df_temp = pd.read_csv(txt_file, sep=',', header=0, dtype={'gauge_id': str}) + df_temp = df_temp.set_index('gauge_id') + + dfs.append(df_temp) + + df = pd.concat(dfs, axis=1) + + if basins: + # drop rows of basins not contained in the passed list + drop_basins = [b for b in df.index if b not in basins] + df = df.drop(drop_basins, axis=0) + + return df + + +def load_camels_gb_timeseries(data_dir: Path, basin: str) -> pd.DataFrame: + """Load the time series data for one basin of the CAMELS GB data set. + + Parameters + ---------- + data_dir : Path + Path to the CAMELS GB directory. This folder must contain a folder called 'timeseries' containing the forcing + files for each basin as .csv file. The file names have to start with 'CAMELS_GB_hydromet_timeseries'. + basin : str + Basin identifier number as string. + + Returns + ------- + pd.DataFrame + Time-indexed DataFrame, containing the time series data (forcings + discharge) data. 
+ """ + forcing_path = data_dir / 'timeseries' + if not forcing_path.is_dir(): + raise OSError(f"{forcing_path} does not exist") + + files = list(forcing_path.glob('**/CAMELS_GB_hydromet_timeseries*.csv')) + file_path = [f for f in files if f"_{basin}_" in f.name] + if file_path: + file_path = file_path[0] + else: + raise FileNotFoundError(f'No file for Basin {basin} at {file_path}') + + df = pd.read_csv(file_path, sep=',', header=0, dtype={'date': str}) + df["date"] = pd.to_datetime(df["date"], format="%Y-%m-%d") + df = df.set_index("date") + + return df diff --git a/neuralhydrology/datasetzoo/camelsus.py b/neuralhydrology/datasetzoo/camelsus.py new file mode 100644 index 00000000..44cfe770 --- /dev/null +++ b/neuralhydrology/datasetzoo/camelsus.py @@ -0,0 +1,240 @@ +from pathlib import Path +from typing import Dict, List, Tuple, Union + +import numpy as np +import pandas as pd +import xarray + +from neuralhydrology.datasetzoo.basedataset import BaseDataset +from neuralhydrology.datautils import utils +from neuralhydrology.utils.config import Config + + +class CamelsUS(BaseDataset): + """Data set class for the CAMELS US data set by [#]_ and [#]_. + + Parameters + ---------- + cfg : Config + The run configuration. + is_train : bool + Defines if the dataset is used for training or evaluating. If True (training), means/stds for each feature + are computed and stored to the run directory. If one-hot encoding is used, the mapping for the one-hot encoding + is created and also stored to disk. If False, a `scaler` input is expected and similarly the `id_to_int` input + if one-hot encoding is used. + period : {'train', 'validation', 'test'} + Defines the period for which the data will be loaded + basin : str, optional + If passed, the data for only this basin will be loaded. Otherwise the basin(s) are read from the appropriate + basin file, corresponding to the `period`. + additional_features : List[Dict[str, pd.DataFrame]], optional + List of dictionaries, mapping from a basin id to a pandas DataFrame. This DataFrame will be added to the data + loaded from the dataset and all columns are available as 'dynamic_inputs', 'static_inputs' and + 'target_variables' + id_to_int : Dict[str, int], optional + If the config argument 'use_basin_id_encoding' is True in the config and period is either 'validation' or + 'test', this input is required. It is a dictionary, mapping from basin id to an integer (the one-hot encoding). + scaler : Dict[str, Union[pd.Series, xarray.DataArray]], optional + If period is either 'validation' or 'test', this input is required. It contains the means and standard + deviations for each feature and is stored to the run directory during training (train_data/train_data_scaler.p) + + References + ---------- + .. [#] A. J. Newman, M. P. Clark, K. Sampson, A. Wood, L. E. Hay, A. Bock, R. J. Viger, D. Blodgett, + L. Brekke, J. R. Arnold, T. Hopson, and Q. Duan: Development of a large-sample watershed-scale + hydrometeorological dataset for the contiguous USA: dataset characteristics and assessment of regional + variability in hydrologic model performance. Hydrol. Earth Syst. Sci., 19, 209-223, + doi:10.5194/hess-19-209-2015, 2015 + .. [#] Addor, N., Newman, A. J., Mizukami, N. and Clark, M. P.: The CAMELS data set: catchment attributes and + meteorology for large-sample studies, Hydrol. Earth Syst. Sci., 21, 5293-5313, doi:10.5194/hess-21-5293-2017, + 2017. 
+ """ + + def __init__(self, + cfg: Config, + is_train: bool, + period: str, + basin: str = None, + additional_features: List[Dict[str, pd.DataFrame]] = [], + id_to_int: Dict[str, int] = {}, + scaler: Dict[str, Union[pd.Series, xarray.DataArray]] = {}): + super(CamelsUS, self).__init__(cfg=cfg, + is_train=is_train, + period=period, + basin=basin, + additional_features=additional_features, + id_to_int=id_to_int, + scaler=scaler) + + def _load_basin_data(self, basin: str) -> pd.DataFrame: + """Load input and output data from text files.""" + # get forcings + dfs = [] + for forcing in self.cfg.forcings: + df, area = load_camels_us_forcings(self.cfg.data_dir, basin, forcing) + + # rename columns + if len(self.cfg.forcings) > 1: + df = df.rename(columns={col: f"{col}_{forcing}" for col in df.columns}) + dfs.append(df) + df = pd.concat(dfs, axis=1) + + # add discharge + df['QObs(mm/d)'] = load_camels_us_discharge(self.cfg.data_dir, basin, area) + + # replace invalid discharge values by NaNs + qobs_cols = [col for col in df.columns if "qobs" in col.lower()] + for col in qobs_cols: + df.loc[df[col] < 0, col] = np.nan + + return df + + def _load_attributes(self) -> pd.DataFrame: + if self.cfg.camels_attributes: + + df = load_camels_us_attributes(self.cfg.data_dir, basins=self.basins) + + # remove all attributes not defined in the config + drop_cols = [c for c in df.columns if c not in self.cfg.camels_attributes] + df = df.drop(drop_cols, axis=1) + + if self.is_train: + # sanity check attributes for NaN in per-feature standard deviation + utils.attributes_sanity_check(df=df) + + return df + + +def load_camels_us_attributes(data_dir: Path, basins: List[str] = []) -> pd.DataFrame: + """Load CAMELS US attributes from the dataset provided by [#]_ + + Parameters + ---------- + data_dir : Path + Path to the CAMELS US directory. This folder must contain a 'camels_attributes_v2.0' folder (the original + data set) containing the corresponding txt files for each attribute group. + basins : List[str], optional + If passed, return only attributes for the basins specified in this list. Otherwise, the attributes of all basins + are returned. + + Returns + ------- + pandas.DataFrame + Basin-indexed DataFrame, containing the attributes as columns. + + References + ---------- + .. [#] Addor, N., Newman, A. J., Mizukami, N. and Clark, M. P.: The CAMELS data set: catchment attributes and + meteorology for large-sample studies, Hydrol. Earth Syst. Sci., 21, 5293-5313, doi:10.5194/hess-21-5293-2017, + 2017. + """ + attributes_path = Path(data_dir) / 'camels_attributes_v2.0' + + if not attributes_path.exists(): + raise RuntimeError(f"Attribute folder not found at {attributes_path}") + + txt_files = attributes_path.glob('camels_*.txt') + + # Read-in attributes into one big dataframe + dfs = [] + for txt_file in txt_files: + df_temp = pd.read_csv(txt_file, sep=';', header=0, dtype={'gauge_id': str}) + df_temp = df_temp.set_index('gauge_id') + + dfs.append(df_temp) + + df = pd.concat(dfs, axis=1) + # convert huc column to double digit strings + df['huc'] = df['huc_02'].apply(lambda x: str(x).zfill(2)) + df = df.drop('huc_02', axis=1) + + if basins: + # drop rows of basins not contained in the passed list + drop_basins = [b for b in df.index if b not in basins] + df = df.drop(drop_basins, axis=0) + + return df + + +def load_camels_us_forcings(data_dir: Path, basin: str, forcings: str) -> Tuple[pd.DataFrame, int]: + """Load the forcing data for a basin of the CAMELS US data set. 
+ + Parameters + ---------- + data_dir : Path + Path to the CAMELS US directory. This folder must contain a 'basin_mean_forcing' folder containing one + subdirectory for each forcing. The forcing directories have to contain 18 subdirectories (for the 18 HUCS) as in + the original CAMELS data set. In each HUC folder are the forcing files (.txt), starting with the 8-digit basin + id. + basin : str + 8-digit USGS identifier of the basin. + forcings : str + Can be e.g. 'daymet' or 'nldas', etc. Must match the folder names in the 'basin_mean_forcing' directory. + + Returns + ------- + pd.DataFrame + Time-indexed DataFrame, containing the forcing data. + int + Catchment area (m2), specified in the header of the forcing file. + """ + forcing_path = data_dir / 'basin_mean_forcing' / forcings + if not forcing_path.is_dir(): + raise OSError(f"{forcing_path} does not exist") + + files = list(forcing_path.glob('**/*_forcing_leap.txt')) + file_path = [f for f in files if f.name[:8] == basin] + if file_path: + file_path = file_path[0] + else: + raise FileNotFoundError(f'No file for Basin {basin} at {file_path}') + + df = pd.read_csv(file_path, sep='\s+', header=3) + df["date"] = pd.to_datetime(df.Year.map(str) + "/" + df.Mnth.map(str) + "/" + df.Day.map(str), format="%Y/%m/%d") + df = df.set_index("date") + + # load area from header + with open(file_path, 'r') as fp: + content = fp.readlines() + area = int(content[2]) + + return df, area + + +def load_camels_us_discharge(data_dir: Path, basin: str, area: int) -> pd.Series: + """Load the discharge data for a basin of the CAMELS US data set. + + Parameters + ---------- + data_dir : Path + Path to the CAMELS US directory. This folder must contain a 'usgs_streamflow' folder with 18 + subdirectories (for the 18 HUCS) as in the original CAMELS data set. In each HUC folder are the discharge files + (.txt), starting with the 8-digit basin id. + basin : str + 8-digit USGS identifier of the basin. + area : int + Catchment area (m2), used to normalize the discharge. 
+ + Returns + ------- + pd.Series + Time-index pandas.Series of the discharge values (mm/day) + """ + + discharge_path = data_dir / 'usgs_streamflow' + files = list(discharge_path.glob('**/*_streamflow_qc.txt')) + file_path = [f for f in files if f.name[:8] == basin] + if file_path: + file_path = file_path[0] + else: + raise FileNotFoundError(f'No file for Basin {basin} at {file_path}') + + col_names = ['basin', 'Year', 'Mnth', 'Day', 'QObs', 'flag'] + df = pd.read_csv(file_path, sep='\s+', header=None, names=col_names) + df["date"] = pd.to_datetime(df.Year.map(str) + "/" + df.Mnth.map(str) + "/" + df.Day.map(str), format="%Y/%m/%d") + df = df.set_index("date") + + # normalize discharge from cubic feed per second to mm per day + df.QObs = 28316846.592 * df.QObs * 86400 / (area * 10**6) + + return df.QObs diff --git a/neuralhydrology/data/hourlycamelsus.py b/neuralhydrology/datasetzoo/hourlycamelsus.py similarity index 54% rename from neuralhydrology/data/hourlycamelsus.py rename to neuralhydrology/datasetzoo/hourlycamelsus.py index c02dcfa8..0cd4d8d5 100644 --- a/neuralhydrology/data/hourlycamelsus.py +++ b/neuralhydrology/datasetzoo/hourlycamelsus.py @@ -1,10 +1,12 @@ import logging import pickle +from pathlib import Path import numpy as np import pandas as pd +import xarray -from neuralhydrology.data import utils, CamelsUS +from neuralhydrology.datasetzoo.camelsus import CamelsUS, load_camels_us_forcings, load_camels_us_attributes from neuralhydrology.utils.config import Config LOGGER = logging.getLogger(__name__) @@ -71,7 +73,7 @@ def _load_basin_data(self, basin: str) -> pd.DataFrame: df = self.load_hourly_data(basin, forcing) else: # load daily CAMELS forcings and upsample to hourly - df, _ = utils.load_camels_us_forcings(self.cfg.data_dir, basin, forcing) + df, _ = load_camels_us_forcings(self.cfg.data_dir, basin, forcing) df = df.resample('1H').ffill() if len(self.cfg.forcings) > 1: # rename columns @@ -86,12 +88,12 @@ def _load_basin_data(self, basin: str) -> pd.DataFrame: # add stage, if requested if 'gauge_height_m' in self.cfg.target_variables: - df = df.join(utils.load_hourly_us_stage(self.cfg.data_dir, basin)) + df = df.join(load_hourly_us_stage(self.cfg.data_dir, basin)) df.loc[df['gauge_height_m'] < 0, 'gauge_height_m'] = np.nan # convert discharge to 'synthetic' stage, if requested if 'synthetic_qobs_stage_meters' in self.cfg.target_variables: - attributes = utils.load_camels_us_attributes(data_dir=self.cfg.data_dir, basins=[basin]) + attributes = load_camels_us_attributes(data_dir=self.cfg.data_dir, basins=[basin]) with open(self.cfg.rating_curve_file, 'rb') as f: rating_curves = pickle.load(f) df['synthetic_qobs_stage_meters'] = np.nan @@ -119,7 +121,7 @@ def load_hourly_data(self, basin: str, forcings: str) -> pd.DataFrame: fallback_csv = False try: if self._netcdf_dataset is None: - self._netcdf_dataset = utils.load_hourly_us_netcdf(self.cfg.data_dir, forcings) + self._netcdf_dataset = load_hourly_us_netcdf(self.cfg.data_dir, forcings) df = self._netcdf_dataset.sel(basin=basin).to_dataframe() except FileNotFoundError: fallback_csv = True @@ -130,9 +132,130 @@ def load_hourly_data(self, basin: str, forcings: str) -> pd.DataFrame: fallback_csv = True LOGGER.warning(f'## Warning: NetCDF file does not contain data for {basin}. 
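The same pattern applies to the new datasetzoo.camelsus module. Note the unit conversion in load_camels_us_discharge: streamflow in cubic feet per second is multiplied by 28,316,846.592 (mm^3 per ft^3) and 86,400 (seconds per day), then divided by the catchment area in mm^2 (area in m^2 times 10^6), which yields mm/day. A usage sketch (paths and basin id are placeholders):

    from pathlib import Path

    from neuralhydrology.datasetzoo.camelsus import load_camels_us_discharge, load_camels_us_forcings

    data_dir = Path("/data/CAMELS_US")  # placeholder: root of a local CAMELS US copy
    basin = "01013500"                  # 8-digit USGS gauge id

    # forcings plus the catchment area parsed from the forcing file header
    df, area = load_camels_us_forcings(data_dir, basin, forcings="daymet")

    # discharge, normalized from cubic feet per second to mm/day using the area
    df["QObs(mm/d)"] = load_camels_us_discharge(data_dir, basin, area)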
Trying slower csv files.') if fallback_csv: - df = utils.load_hourly_us_forcings(self.cfg.data_dir, basin, forcings) + df = load_hourly_us_forcings(self.cfg.data_dir, basin, forcings) # add discharge - df = df.join(utils.load_hourly_us_discharge(self.cfg.data_dir, basin)) + df = df.join(load_hourly_us_discharge(self.cfg.data_dir, basin)) return df + + +def load_hourly_us_forcings(data_dir: Path, basin: str, forcings: str) -> pd.DataFrame: + """Load the hourly forcing data for a basin of the CAMELS US data set. + + The hourly forcings are not included in the original data set by Newman et al. (2017). + + Parameters + ---------- + data_dir : Path + Path to the CAMELS US directory. This folder must contain an 'hourly' folder containing one subdirectory + for each forcing, which contains the forcing files (.csv) for each basin. Files have to contain the 8-digit + basin id. + basin : str + 8-digit USGS identifier of the basin. + forcings : str + Must match the folder names in the 'hourly' directory. E.g. 'nldas_hourly' + + Returns + ------- + pd.DataFrame + Time-indexed DataFrame, containing the forcing data. + """ + forcing_path = data_dir / 'hourly' / forcings + if not forcing_path.is_dir(): + raise OSError(f"{forcing_path} does not exist") + + files = list(forcing_path.glob('*.csv')) + file_path = [f for f in files if basin in f.stem] + if file_path: + file_path = file_path[0] + else: + raise FileNotFoundError(f'No file for Basin {basin} at {forcing_path}') + + return pd.read_csv(file_path, index_col=['date'], parse_dates=['date']) + + +def load_hourly_us_discharge(data_dir: Path, basin: str) -> pd.DataFrame: + """Load the hourly discharge data for a basin of the CAMELS US data set. + + Parameters + ---------- + data_dir : Path + Path to the CAMELS US directory. This folder must contain a folder called 'hourly' with a subdirectory + 'usgs_streamflow' which contains the discharge files (.csv) for each basin. File names must contain the 8-digit + basin id. + basin : str + 8-digit USGS identifier of the basin. + + Returns + ------- + pd.Series + Time-index Series of the discharge values (mm/hour) + """ + discharge_path = data_dir / 'hourly' / 'usgs_streamflow' + files = list(discharge_path.glob('**/*usgs-hourly.csv')) + file_path = [f for f in files if basin in f.stem] + if file_path: + file_path = file_path[0] + else: + raise FileNotFoundError(f'No file for Basin {basin} at {discharge_path}') + + return pd.read_csv(file_path, index_col=['date'], parse_dates=['date']) + + +def load_hourly_us_stage(data_dir: Path, basin: str) -> pd.Series: + """Load the hourly stage data for a basin of the CAMELS US data set. + + Parameters + ---------- + data_dir : Path + Path to the CAMELS US directory. This folder must contain a folder called 'hourly' with a subdirectory + 'usgs_stage' which contains the stage files (.csv) for each basin. File names must contain the 8-digit basin id. + basin : str + 8-digit USGS identifier of the basin. 
+
+    Returns
+    -------
+    pd.Series
+        Time-indexed Series of the stage values (m)
+    """
+    stage_path = data_dir / 'hourly' / 'usgs_stage'
+    files = list(stage_path.glob('**/*_utc.csv'))
+    file_path = [f for f in files if basin in f.stem]
+    if file_path:
+        file_path = file_path[0]
+    else:
+        raise FileNotFoundError(f'No file for Basin {basin} at {stage_path}')
+
+    df = pd.read_csv(file_path,
+                     sep=',',
+                     index_col=['datetime'],
+                     parse_dates=['datetime'],
+                     usecols=['datetime', 'gauge_height_ft'])
+    df = df.resample('H').mean()
+    df["gauge_height_m"] = df["gauge_height_ft"] * 0.3048
+
+    return df["gauge_height_m"]
+
+
+def load_hourly_us_netcdf(data_dir: Path, forcings: str) -> xarray.Dataset:
+    """Load hourly forcing and discharge data from preprocessed netCDF file.
+
+    Parameters
+    ----------
+    data_dir : Path
+        Path to the CAMELS US directory. This folder must contain a folder called 'hourly', containing the netCDF file.
+    forcings : str
+        Name of the forcing product. Must match the ending of the netCDF file. E.g. 'nldas_hourly' for
+        'usgs-streamflow-nldas_hourly.nc'
+
+    Returns
+    -------
+    xarray.Dataset
+        Dataset containing the combined discharge and forcing data of all basins (as stored in the netCDF)
+    """
+    netcdf_path = data_dir / 'hourly' / f'usgs-streamflow-{forcings}.nc'
+    if not netcdf_path.is_file():
+        raise FileNotFoundError(f'No NetCDF file for hourly streamflow and {forcings} at {netcdf_path}.')
+
+    return xarray.open_dataset(netcdf_path)
diff --git a/neuralhydrology/datautils/__init__.py b/neuralhydrology/datautils/__init__.py
new file mode 100644
index 00000000..e69de29b
diff --git a/neuralhydrology/data/climateindices.py b/neuralhydrology/datautils/climateindices.py
similarity index 53%
rename from neuralhydrology/data/climateindices.py
rename to neuralhydrology/datautils/climateindices.py
index 6c6b9c32..05a36b0d 100644
--- a/neuralhydrology/data/climateindices.py
+++ b/neuralhydrology/datautils/climateindices.py
@@ -2,36 +2,65 @@
 import pickle
 import sys
 from pathlib import Path
-from typing import List
+from typing import List, Dict
 
 import numpy as np
+import pandas as pd
 from numba import njit
 from tqdm import tqdm
 
-from .pet import get_priestley_taylor_pet
-from .utils import load_camels_us_attributes, load_camels_us_forcings, load_basin_file
+from neuralhydrology.datautils import pet, utils
 
 LOGGER = logging.getLogger(__name__)
 
 
-def precalculate_dyn_climate_indices(data_dir: Path, basin_file: Path, window_length: int, forcings: str):
-    basins = load_basin_file(basin_file=basin_file)
-    camels_attributes = load_camels_us_attributes(data_dir=data_dir, basins=basins)
+def calculate_dyn_climate_indices(data_dir: Path,
+                                  basins: List[str],
+                                  window_length: int,
+                                  forcings: str,
+                                  output_file: Path = None) -> Dict[str, pd.DataFrame]:
+    """Calculate dynamic climate indices.
+
+    Compared to the long-term static climate indices included in the CAMELS data set, this function computes the same
+    climate indices by a moving window approach over the entire data set. That is, for each time step, the climate
+    indices are re-computed from the last `window_length` time steps. The resulting dictionary of DataFrames can be
+    used with the `additional_feature_files` argument.
+
+    Parameters
+    ----------
+    data_dir : Path
+        Path to the CAMELS US directory.
+    basins : List[str]
+        List of basin ids.
+    window_length : int
+        Look-back period to use to compute the climate indices.
+    forcings : str
+        Can be e.g. 'daymet' or 'nldas', etc.
Must match the folder names in the 'basin_mean_forcing' directory. + output_file : Path, optional + If specified, stores the resulting dictionary of DataFrames to this location as a pickle dump. + + Returns + ------- + Dict[str, pd.DataFrame] + Dictionary with one time-indexed DataFrame per basin. By definition, the climate indices for a given day in the + DataFrame are computed from the `window_length` previous time steps (including the given day). + """ + camels_attributes = utils.load_camels_us_attributes(data_dir=data_dir, basins=basins) additional_features = {} new_columns = [ 'p_mean_dyn', 'pet_mean_dyn', 'aridity_dyn', 't_mean_dyn', 'frac_snow_dyn', 'high_prec_freq_dyn', 'high_prec_dur_dyn', 'low_prec_freq_dyn', 'low_prec_dur_dyn' ] for basin in tqdm(basins, file=sys.stdout): - df, _ = load_camels_us_forcings(data_dir=data_dir, basin=basin, forcings=forcings) + df, _ = utils.load_camels_us_forcings(data_dir=data_dir, basin=basin, forcings=forcings) lat = camels_attributes.loc[camels_attributes.index == basin, 'gauge_lat'].values elev = camels_attributes.loc[camels_attributes.index == basin, 'elev_mean'].values - df["PET(mm/d)"] = get_priestley_taylor_pet(t_min=df["tmin(C)"].values, - t_max=df["tmax(C)"].values, - s_rad=df["srad(W/m2)"].values, - lat=lat, - elev=elev, - doy=df.index.dayofyear.values) + df["PET(mm/d)"] = pet.get_priestley_taylor_pet(t_min=df["tmin(C)"].values, + t_max=df["tmax(C)"].values, + s_rad=df["srad(W/m2)"].values, + lat=lat, + elev=elev, + doy=df.index.dayofyear.values) for col in new_columns: df[col] = np.nan @@ -41,7 +70,7 @@ def precalculate_dyn_climate_indices(data_dir: Path, basin_file: Path, window_le df['vp(Pa)'].values, df['PET(mm/d)'].values ]).T - new_features = numba_climate_indexes(x, window_length=window_length) + new_features = _numba_climate_indexes(x, window_length=window_length) if np.sum(np.isnan(new_features)) > 0: raise ValueError(f"NaN in new features of basin {basin}") @@ -62,20 +91,16 @@ def precalculate_dyn_climate_indices(data_dir: Path, basin_file: Path, window_le additional_features[basin] = df - filename = f"dyn_climate_indices_{forcings}_{len(basins)}basins_{window_length}lookback.p" - - output_file = Path(__file__).parent.parent.parent / 'data' / filename - - with output_file.open("wb") as fp: - pickle.dump(additional_features, fp) - - LOGGER.info(f"Precalculated features successfully stored at {output_file}") + if output_file is not None: + with output_file.open("wb") as fp: + pickle.dump(additional_features, fp) + LOGGER.info(f"Precalculated features successfully stored at {output_file}") return additional_features @njit -def numba_climate_indexes(features: np.ndarray, window_length: int) -> np.ndarray: +def _numba_climate_indexes(features: np.ndarray, window_length: int) -> np.ndarray: n_samples = features.shape[0] new_features = np.zeros((n_samples - 365 + 1, 9)) @@ -94,11 +119,11 @@ def numba_climate_indexes(features: np.ndarray, window_length: int) -> np.ndarra low_prec_freq = precip_days[precip_days[:, 0] < 1].shape[0] / precip_days.shape[0] idx = np.where(x[:, 0] < 1)[0] - groups = split_list(idx) + groups = _split_list(idx) low_prec_dur = np.mean(np.array([len(p) for p in groups])) idx = np.where(x[:, 0] >= 5 * p_mean)[0] - groups = split_list(idx) + groups = _split_list(idx) high_prec_dur = np.mean(np.array([len(p) for p in groups])) new_features[i, 0] = p_mean @@ -115,16 +140,15 @@ def numba_climate_indexes(features: np.ndarray, window_length: int) -> np.ndarra @njit -def split_list(alist: List) -> List: - newlist = [] 
+def _split_list(a_list: List) -> List: + new_list = [] start = 0 - end = 0 - for index, value in enumerate(alist): - if index < len(alist) - 1: - if alist[index + 1] > value + 1: + for index, value in enumerate(a_list): + if index < len(a_list) - 1: + if a_list[index + 1] > value + 1: end = index + 1 - newlist.append(alist[start:end]) + new_list.append(a_list[start:end]) start = end else: - newlist.append(alist[start:len(alist)]) - return newlist + new_list.append(a_list[start:len(a_list)]) + return new_list diff --git a/neuralhydrology/datautils/dischargeinput.py b/neuralhydrology/datautils/dischargeinput.py new file mode 100644 index 00000000..e467c995 --- /dev/null +++ b/neuralhydrology/datautils/dischargeinput.py @@ -0,0 +1,73 @@ +import logging +import pickle +import sys +from pathlib import Path +from typing import Dict, List + +import pandas as pd +from tqdm import tqdm + +from neuralhydrology.datautils import utils + +LOGGER = logging.getLogger(__name__) + + +def shift_discharge(data_dir: Path, basins: List[str], dataset: str, shift: int = 1, + output_file: Path = None) -> Dict[str, pd.DataFrame]: + """Return shifted discharge data. + + This function returns the shifted discharge data for each basin in `basins`. Useful when training + models with lagged discharge as input. Use the `output_file` argument to store the resulting dictionary as + pickle dump to disk. This pickle file can be used with the config argument `additional_feature_files` to make the + data available as input feature. + For CAMELS GB, the 'discharge_spec' column is used. + + Parameters + ---------- + data_dir : Path + Path to the dataset directory. + basins : List[str] + List of basin ids. + dataset : {'camels_us', 'camels_gb', 'hourly_camels_us'} + Which data set to use. + shift : int, optional + Number of discharge lag in time steps, by default 1. + output_file : Path, optional + If specified, stores the resulting dictionary of DataFrames to this location as a pickle dump. + + Returns + ------- + Dict[str, pd.DataFrame] + Dictionary with one time-indexed DataFrame per basin. The lagged discharge column is named according to the + discharge column name, with '_t-SHIFT' as suffix, where 'SHIFT' corresponds to the argument `shift`. 
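+
+    Examples
+    --------
+    A minimal usage sketch; the data path is a placeholder for a local copy of CAMELS US
+    and the shown output is illustrative:
+
+    >>> from pathlib import Path
+    >>> lagged = shift_discharge(data_dir=Path('/path/to/CAMELS_US'),
+    ...                          basins=['01022500'], dataset='camels_us', shift=1)
+    >>> list(lagged['01022500'].columns)
+    ['QObs(mm/d)_t-1']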
+ """ + data = {} + for basin in tqdm(basins, file=sys.stdout): + + # load discharge data + if dataset == "camels_us": + df, area = utils.load_camels_us_forcings(data_dir=data_dir, basin=basin, forcings="daymet") + df["QObs(mm/d)"] = utils.load_camels_us_discharge(data_dir=data_dir, basin=basin, area=area) + discharge_col = "QObs(mm/d)" + elif dataset == "camels_gb": + df = utils.load_camels_gb_timeseries(data_dir=data_dir, basin=basin) + discharge_col = "discharge_spec" + elif dataset == "hourly_camels_gb": + df = utils.load_hourly_us_discharge(data_dir=data_dir, basin=basin) + discharge_col = "QObs(mm/h)" + + # shift discharge data by `shift` time steps + df[f"{discharge_col}_t-{shift}"] = df[discharge_col].shift(shift) + + # remove all columns from data set except shifted discharge + drop_columns = [col for col in df.columns if col != f"{discharge_col}_t-{shift}"] + data[basin] = df.drop(labels=drop_columns, axis=1) + + if output_file is not None: + # store pickle dump + with output_file.open("wb") as fp: + pickle.dump(data, fp) + + LOGGER.info(f"Data successfully stored at {output_file}") + + return data diff --git a/neuralhydrology/data/pet.py b/neuralhydrology/datautils/pet.py similarity index 68% rename from neuralhydrology/data/pet.py rename to neuralhydrology/datautils/pet.py index 818b615f..3a721058 100644 --- a/neuralhydrology/data/pet.py +++ b/neuralhydrology/datautils/pet.py @@ -1,19 +1,14 @@ import numpy as np from numba import njit -LAMBDA = 2.45 # Kept constant, MJkg-1 -ALPHA = 1.26 # Calibrated in CAMELS, here static -STEFAN_BOLTZMAN = 4.903e-09 -PI = np.pi - @njit -def get_priestley_taylor_pet(t_min: np.ndarray, t_max: np.ndarray, s_rad: np.ndarray, lat: float, - elev: float, doy: np.ndarray) -> np.ndarray: +def get_priestley_taylor_pet(t_min: np.ndarray, t_max: np.ndarray, s_rad: np.ndarray, lat: float, elev: float, + doy: np.ndarray) -> np.ndarray: """Calculate potential evapotranspiration (PET) as an approximation following the Priestley-Taylor equation. - The ground head flux G is assumed to be 0 at daily time steps (see Newman et al. 2015). The - equation follow FAO-56 (Allen et al. (1998)) + The ground head flux G is assumed to be 0 at daily time steps (see Newman et al., 2015 [#]_). The + equations follow FAO-56 (Allen et al., 1998 [#]_). Parameters ---------- @@ -34,35 +29,47 @@ def get_priestley_taylor_pet(t_min: np.ndarray, t_max: np.ndarray, s_rad: np.nda ------- np.ndarray Array containing PET estimates in mm/day + + References + ---------- + .. [#] A. J. Newman, M. P. Clark, K. Sampson, A. Wood, L. E. Hay, A. Bock, R. J. Viger, D. Blodgett, + L. Brekke, J. R. Arnold, T. Hopson, and Q. Duan: Development of a large-sample watershed-scale + hydrometeorological dataset for the contiguous USA: dataset characteristics and assessment of regional + variability in hydrologic model performance. Hydrol. Earth Syst. Sci., 19, 209-223, + doi:10.5194/hess-19-209-2015, 2015 + .. [#] Allen, R. G., Pereira, L. S., Raes, D., & Smith, M. (1998). Crop evapotranspiration-Guidelines for computing + crop water requirements-FAO Irrigation and drainage paper 56. Fao, Rome, 300(9), D05109. 
""" - lat = lat * (PI / 180) # degree to rad + lat = lat * (np.pi / 180) # degree to rad # Slope of saturation vapour pressure curve t_mean = 0.5 * (t_min + t_max) - slope_svp = get_slope_svp_curve(t_mean) + slope_svp = _get_slope_svp_curve(t_mean) # incoming netto short-wave radiation s_rad = s_rad * 0.0864 # conversion Wm-2 -> MJm-2day-1 - in_sw_rad = get_net_sw_srad(s_rad) + in_sw_rad = _get_net_sw_srad(s_rad) # outgoginng netto long-wave radiation - sol_dec = get_sol_decl(doy) - sha = get_sunset_hour_angle(lat, sol_dec) - ird = get_ird_earth_sun(doy) - et_rad = get_extraterra_rad(lat, sol_dec, sha, ird) - cs_rad = get_clear_sky_rad(elev, et_rad) - a_vp = get_avp_tmin(t_min) - out_lw_rad = get_net_outgoing_lw_rad(t_min, t_max, s_rad, cs_rad, a_vp) + sol_dec = _get_sol_decl(doy) + sha = _get_sunset_hour_angle(lat, sol_dec) + ird = _get_ird_earth_sun(doy) + et_rad = _get_extraterra_rad(lat, sol_dec, sha, ird) + cs_rad = _get_clear_sky_rad(elev, et_rad) + a_vp = _get_avp_tmin(t_min) + out_lw_rad = _get_net_outgoing_lw_rad(t_min, t_max, s_rad, cs_rad, a_vp) # net radiation - net_rad = get_net_rad(in_sw_rad, out_lw_rad) + net_rad = _get_net_rad(in_sw_rad, out_lw_rad) # gamma - atm_pressure = get_atmos_pressure(elev) - gamma = get_psy_const(atm_pressure) + atm_pressure = _get_atmos_pressure(elev) + gamma = _get_psy_const(atm_pressure) # PET MJm-2day-1 - pet = (ALPHA / LAMBDA) * (slope_svp * net_rad) / (slope_svp + gamma) + alpha = 1.26 # Calibrated in CAMELS, here static + _lambda = 2.45 # Kept constant, MJkg-1 + pet = (alpha / _lambda) * (slope_svp * net_rad) / (slope_svp + gamma) # convert energy to evap pet = pet * 0.408 @@ -71,7 +78,7 @@ def get_priestley_taylor_pet(t_min: np.ndarray, t_max: np.ndarray, s_rad: np.nda @njit -def get_slope_svp_curve(t_mean: np.ndarray) -> np.ndarray: +def _get_slope_svp_curve(t_mean: np.ndarray) -> np.ndarray: """Slope of saturation vapour pressure curve Equation 13 FAO-56 Allen et al. (1998) @@ -91,7 +98,7 @@ def get_slope_svp_curve(t_mean: np.ndarray) -> np.ndarray: @njit -def get_net_sw_srad(s_rad: np.ndarray, albedo: float = 0.23) -> np.ndarray: +def _get_net_sw_srad(s_rad: np.ndarray, albedo: float = 0.23) -> np.ndarray: """Calculate net shortwave radiation Equation 38 FAO-56 Allen et al. (1998) @@ -113,7 +120,7 @@ def get_net_sw_srad(s_rad: np.ndarray, albedo: float = 0.23) -> np.ndarray: @njit -def get_sol_decl(doy: np.ndarray) -> np.ndarray: +def _get_sol_decl(doy: np.ndarray) -> np.ndarray: """Get solar declination Equation 24 FAO-56 Allen et al. (1998) @@ -134,7 +141,7 @@ def get_sol_decl(doy: np.ndarray) -> np.ndarray: @njit -def get_sunset_hour_angle(lat: float, sol_dec: np.ndarray) -> np.ndarray: +def _get_sunset_hour_angle(lat: float, sol_dec: np.ndarray) -> np.ndarray: """Sunset hour angle @@ -159,7 +166,7 @@ def get_sunset_hour_angle(lat: float, sol_dec: np.ndarray) -> np.ndarray: @njit -def get_ird_earth_sun(doy: np.ndarray) -> np.ndarray: +def _get_ird_earth_sun(doy: np.ndarray) -> np.ndarray: """Inverse relative distance between Earth and Sun Equation 23 FAO-56 Allen et al. 
(1998) @@ -174,13 +181,12 @@ def get_ird_earth_sun(doy: np.ndarray) -> np.ndarray: np.ndarray Inverse relative distance between Earth and Sun """ - ird = 1 + 0.033 * np.cos((2 * PI) / 365 * doy) + ird = 1 + 0.033 * np.cos((2 * np.pi) / 365 * doy) return ird @njit -def get_extraterra_rad(lat: float, sol_dec: np.ndarray, sha: np.ndarray, - ird: np.ndarray) -> np.ndarray: +def _get_extraterra_rad(lat: float, sol_dec: np.ndarray, sha: np.ndarray, ird: np.ndarray) -> np.ndarray: """Extraterrestrial Radiation Equation 21 FAO-56 Allen et al. (1998) @@ -201,14 +207,14 @@ def get_extraterra_rad(lat: float, sol_dec: np.ndarray, sha: np.ndarray, np.ndarray Extraterrestrial radiation MJm-2day-1 """ - term1 = (24 * 60) / PI * 0.082 * ird + term1 = (24 * 60) / np.pi * 0.082 * ird term2 = sha * np.sin(lat) * np.sin(sol_dec) + np.cos(lat) * np.cos(sol_dec) * np.sin(sha) et_rad = term1 * term2 return et_rad @njit -def get_clear_sky_rad(elev: float, et_rad: np.ndarray) -> np.ndarray: +def _get_clear_sky_rad(elev: float, et_rad: np.ndarray) -> np.ndarray: """Clear sky radiation Equation 37 FAO-56 Allen et al. (1998) @@ -230,7 +236,7 @@ def get_clear_sky_rad(elev: float, et_rad: np.ndarray) -> np.ndarray: @njit -def get_avp_tmin(t_min: np.ndarray) -> np.ndarray: +def _get_avp_tmin(t_min: np.ndarray) -> np.ndarray: """Actual vapor pressure estimated using min temperature Equation 48 FAO-56 Allen et al. (1998) @@ -250,8 +256,8 @@ def get_avp_tmin(t_min: np.ndarray) -> np.ndarray: @njit -def get_net_outgoing_lw_rad(t_min: np.ndarray, t_max: np.ndarray, s_rad: np.ndarray, - cs_rad: np.ndarray, a_vp: np.ndarray) -> np.ndarray: +def _get_net_outgoing_lw_rad(t_min: np.ndarray, t_max: np.ndarray, s_rad: np.ndarray, cs_rad: np.ndarray, + a_vp: np.ndarray) -> np.ndarray: """Net outgoing longwave radiation Expects temperatures in degree and does the conversion in kelvin in the function. @@ -279,12 +285,13 @@ def get_net_outgoing_lw_rad(t_min: np.ndarray, t_max: np.ndarray, s_rad: np.ndar term1 = ((t_max + 273.16)**4 + (t_min + 273.16)**4) / 2 # conversion in K in equation term2 = 0.34 - 0.14 * np.sqrt(a_vp) term3 = 1.35 * s_rad / cs_rad - 0.35 - net_lw = STEFAN_BOLTZMAN * term1 * term2 * term3 + stefan_boltzman = 4.903e-09 + net_lw = stefan_boltzman * term1 * term2 * term3 return net_lw @njit -def get_net_rad(sw_rad: np.ndarray, lw_rad: np.ndarray) -> np.ndarray: +def _get_net_rad(sw_rad: np.ndarray, lw_rad: np.ndarray) -> np.ndarray: """Net radiation Equation 40 FAO-56 Allen et al. (1998) @@ -305,7 +312,7 @@ def get_net_rad(sw_rad: np.ndarray, lw_rad: np.ndarray) -> np.ndarray: @njit -def get_atmos_pressure(elev: float) -> float: +def _get_atmos_pressure(elev: float) -> float: """Atmospheric pressure Equation 7 FAO-56 Allen et al. 
(1998) @@ -325,7 +332,7 @@ def get_atmos_pressure(elev: float) -> float: @njit -def get_psy_const(atm_pressure: float) -> float: +def _get_psy_const(atm_pressure: float) -> float: """Psychometric constant Parameters @@ -342,7 +349,8 @@ def get_psy_const(atm_pressure: float) -> float: @njit -def srad_from_t(et_rad, cs_rad, t_min, t_max, coastal=False): +def _srad_from_t(et_rad, cs_rad, t_min, t_max, coastal=False): + """Estimate solar radiation from temperature""" # equation 50 if coastal: adj = 0.19 diff --git a/neuralhydrology/datautils/utils.py b/neuralhydrology/datautils/utils.py new file mode 100644 index 00000000..e40134fe --- /dev/null +++ b/neuralhydrology/datautils/utils.py @@ -0,0 +1,167 @@ +from pathlib import Path +from typing import List, Union + +import numpy as np +import pandas as pd +from xarray.core.dataarray import DataArray +from xarray.core.dataset import Dataset + + +def load_hydroatlas_attributes(data_dir: Path, basins: List[str] = []) -> pd.DataFrame: + """Load HydroATLAS attributes into a pandas DataFrame + + Parameters + ---------- + data_dir : Path + Path to the root directory of the dataset. Must contain a folder called 'hydroatlas_attributes' with a file + called `attributes.csv`. The attributes file is expected to have one column called `basin_id`. + basins : List[str], optional + If passed, return only attributes for the basins specified in this list. Otherwise, the attributes of all basins + are returned. + + Returns + ------- + pd.DataFrame + Basin-indexed DataFrame containing the HydroATLAS attributes. + """ + attribute_file = data_dir / "hydroatlas_attributes" / "attributes.csv" + if not attribute_file.is_file(): + raise FileNotFoundError(attribute_file) + + df = pd.read_csv(attribute_file, dtype={'basin_id': str}) + df = df.set_index('basin_id') + + if basins: + drop_basins = [b for b in df.index if b not in basins] + df = df.drop(drop_basins, axis=0) + + return df + + +def load_basin_file(basin_file: Path) -> List[str]: + """Load list of basins from text file. + + Parameters + ---------- + basin_file : Path + Path to a basin txt file. File has to contain one basin id per row. + + Returns + ------- + List[str] + List of basin ids as strings. + """ + with basin_file.open('r') as fp: + basins = fp.readlines() + basins = sorted(basin.strip() for basin in basins) + return basins + + +def attributes_sanity_check(df: pd.DataFrame): + """Utility function to check if the standard deviation of one (or more) attributes is zero. + + This utility function can be used to check if any attribute has a standard deviation of zero. This would lead to + NaN's, when normalizing the features and thus would lead to NaN's when training the model. The function will raise + a `RuntimeError` if one or more zeros have been detected and will print the list of corresponding attribute names + to the console. + + Parameters + ---------- + df : pd.DataFrame + DataFrame of catchment attributes as columns. + + Raises + ------ + RuntimeError + If one or more attributes have a standard deviation of zero. + """ + # Iterate over attributes and check for NaNs + attributes = [] + if any(df.std() == 0.0) or any(df.std().isnull()): + for k, v in df.std().iteritems(): + if (v == 0) or (np.isnan(v)): + attributes.append(k) + if attributes: + msg = [ + "The following attributes have a std of zero or NaN, which results in NaN's ", + "when normalizing the features. Remove the attributes from the attribute feature list ", + "and restart the run. 
\n", f"Attributes: {attributes}" + ] + raise RuntimeError("".join(msg)) + + +def sort_frequencies(frequencies: List[str]) -> List[str]: + """Sort the passed frequencies from low to high frequencies. + + Use `pandas frequency strings + `_ + to define frequencies. Note: The strings need to include values, e.g., '1D' instead of 'D'. + + Parameters + ---------- + frequencies : List[str] + List of pandas frequency identifiers to be sorted. + + Returns + ------- + List[str] + Sorted list of pandas frequency identifiers. + """ + deltas = {freq: pd.to_timedelta(freq) for freq in frequencies} + return sorted(deltas, key=deltas.get)[::-1] + + +def infer_frequency(index: Union[pd.DatetimeIndex, np.ndarray]) -> str: + """Infer the frequency of an index of a pandas DataFrame/Series or xarray DataArray. + + Parameters + ---------- + index : Union[pd.DatetimeIndex, np.ndarray] + DatetimeIndex of a DataFrame/Series or array of datetime values. + + Returns + ------- + str + Frequency of the index as a `pandas frequency string + `_ + + Raises + ------ + ValueError + If the frequency cannot be inferred from the index or is zero. + """ + native_frequency = pd.infer_freq(index) + if native_frequency is None: + raise ValueError(f'Cannot infer a legal frequency from dataset: {native_frequency}.') + if native_frequency[0] not in '0123456789': # add a value to the unit so to_timedelta works + native_frequency = f'1{native_frequency}' + if pd.to_timedelta(native_frequency) == pd.to_timedelta(0): + raise ValueError('Inferred dataset frequency is zero.') + return native_frequency + + +def infer_datetime_coord(xr: Union[DataArray, Dataset]) -> str: + """Checks for coordinate with 'date' in its name and returns the name. + + Parameters + ---------- + xr : Union[DataArray, Dataset] + Array to infer coordinate name of. + + Returns + ------- + str + Name of datetime coordinate name. + + Raises + ------ + RuntimeError + If none or multiple coordinates with 'date' in its name are found. 
+ """ + candidates = [c for c in list(xr.coords) if "date" in c] + if len(candidates) > 1: + raise RuntimeError("Found multiple coordinates with 'date' in its name.") + if not candidates: + raise RuntimeError("Did not find any coordinate with 'date' in its name") + + return candidates[0] diff --git a/neuralhydrology/evaluation/metrics.py b/neuralhydrology/evaluation/metrics.py index 32e62944..2a7c9e36 100644 --- a/neuralhydrology/evaluation/metrics.py +++ b/neuralhydrology/evaluation/metrics.py @@ -5,7 +5,7 @@ from scipy import stats, signal from xarray.core.dataarray import DataArray -from neuralhydrology.data import utils +from neuralhydrology.datautils import utils def get_available_metrics() -> List[str]: diff --git a/neuralhydrology/evaluation/signatures.py b/neuralhydrology/evaluation/signatures.py index dcc1e726..b282afee 100644 --- a/neuralhydrology/evaluation/signatures.py +++ b/neuralhydrology/evaluation/signatures.py @@ -8,7 +8,7 @@ from numba import njit from xarray.core.dataarray import DataArray -from neuralhydrology.data import utils +from neuralhydrology.datautils import utils def get_available_signatures() -> List[str]: diff --git a/neuralhydrology/evaluation/tester.py b/neuralhydrology/evaluation/tester.py index e001bc71..6e25c505 100644 --- a/neuralhydrology/evaluation/tester.py +++ b/neuralhydrology/evaluation/tester.py @@ -14,9 +14,9 @@ from torch.utils.data import DataLoader from tqdm import tqdm -from neuralhydrology.data import get_dataset -from neuralhydrology.data.basedataset import BaseDataset -from neuralhydrology.data.utils import load_basin_file, sort_frequencies +from neuralhydrology.datasetzoo import get_dataset +from neuralhydrology.datasetzoo.basedataset import BaseDataset +from neuralhydrology.datautils.utils import load_basin_file, sort_frequencies from neuralhydrology.evaluation import plots from neuralhydrology.evaluation.metrics import calculate_metrics from neuralhydrology.modelzoo import get_model @@ -258,12 +258,12 @@ def evaluate(self, # stack dates and time_steps so we don't just evaluate every 24H when use_frequencies=[1D, 1H] obs = xr.isel(time_step=slice(-frequency_factor, None)) \ .stack(datetime=['date', 'time_step'])[f"{target_variable}_obs"] - obs['datetime'] = [c[0] + c[1] for c in obs.coords['datetime'].values] + obs['datetime'] = obs.coords['date'] + obs.coords['time_step'] # check if not empty (in case no observations exist in this period if not all(obs.isnull()): sim = xr.isel(time_step=slice(-frequency_factor, None)) \ .stack(datetime=['date', 'time_step'])[f"{target_variable}_sim"] - sim['datetime'] = [c[0] + c[1] for c in sim.coords['datetime'].values] + sim['datetime'] = sim.coords['date'] + sim.coords['time_step'] # clip negative predictions to zero, if variable is listed in config 'clip_target_to_zero' if target_variable in self.cfg.clip_targets_to_zero: diff --git a/neuralhydrology/modelzoo/__init__.py b/neuralhydrology/modelzoo/__init__.py index 0a30816e..d2b89a5a 100644 --- a/neuralhydrology/modelzoo/__init__.py +++ b/neuralhydrology/modelzoo/__init__.py @@ -5,7 +5,7 @@ from neuralhydrology.modelzoo.embcudalstm import EmbCudaLSTM from neuralhydrology.modelzoo.lstm import LSTM from neuralhydrology.modelzoo.odelstm import ODELSTM -from neuralhydrology.modelzoo.multifreqlstm import MultiFreqLSTM +from neuralhydrology.modelzoo.mtslstm import MTSLSTM from neuralhydrology.utils.config import Config SINGLE_FREQ_MODELS = ["cudalstm", "ealstm", "lstm", "embcudalstm"] @@ -35,8 +35,8 @@ def get_model(cfg: Config) -> nn.Module: model = 
LSTM(cfg=cfg) elif cfg.model == "embcudalstm": model = EmbCudaLSTM(cfg=cfg) - elif cfg.model == "multifreqlstm": - model = MultiFreqLSTM(cfg=cfg) + elif cfg.model == "mtslstm": + model = MTSLSTM(cfg=cfg) elif cfg.model == "odelstm": model = ODELSTM(cfg=cfg) else: diff --git a/neuralhydrology/modelzoo/basemodel.py b/neuralhydrology/modelzoo/basemodel.py index 212112d0..984fe277 100644 --- a/neuralhydrology/modelzoo/basemodel.py +++ b/neuralhydrology/modelzoo/basemodel.py @@ -7,6 +7,18 @@ class BaseModel(nn.Module): + """Abstract base model class, don't use this class for model training. + + Use subclasses of this class for training/evaluating different models, e.g. use `CudaLSTM` for training a standard + LSTM model or `EA-LSTM` for training an Entity-Aware-LSTM. Refer to + `Documentation/Modelzoo `_ for a full list of + available models and how to integrate a new model. + + Parameters + ---------- + cfg : Config + The run configuration. + """ def __init__(self, cfg: Config): super(BaseModel, self).__init__() @@ -15,13 +27,44 @@ def __init__(self, cfg: Config): self.output_size = len(cfg.target_variables) def forward(self, data: Dict[str, torch.Tensor]) -> Dict[str, torch.Tensor]: + """Perform a forward pass. + + Parameters + ---------- + data : Dict[str, torch.Tensor] + Dictionary, containing input features as key-value pairs. + + Returns + ------- + Dict[str, torch.Tensor] + Model output and potentially any intermediate states and activations as a dictionary. + """ raise NotImplementedError def sample(self, data: Dict[str, torch.Tensor], n_samples: int) -> torch.Tensor: + """Sample model predictions, e.g., for MC-Dropout. + + This function does `n_samples` forward passes for each sample in the batch. Only useful for models with dropout, + to perform MC-Dropout sampling. Make sure to set the model to train mode before calling this function + (`model.train()`), otherwise dropout won't be active. + + Parameters + ---------- + data : Dict[str, torch.Tensor] + Dictionary, containing input features as key-value pairs. + n_samples : int + Number of samples to generate for each input sample. + + Returns + ------- + torch.Tensor + Sampled model outputs for the `predict_last_n` (config argument) time steps of each sequence. The shape of + the output is ``[batch size, predict_last_n, n_samples]``. + """ predict_last_n = self.cfg.predict_last_n samples = torch.zeros(data['x_d'].shape[0], predict_last_n, n_samples) for i in range(n_samples): prediction = self.forward(data) samples[:, -predict_last_n:, i] = prediction['y_hat'][:, -predict_last_n:, 0] - return samples \ No newline at end of file + return samples diff --git a/neuralhydrology/modelzoo/cudalstm.py b/neuralhydrology/modelzoo/cudalstm.py index 47818f6e..6dfc83f1 100644 --- a/neuralhydrology/modelzoo/cudalstm.py +++ b/neuralhydrology/modelzoo/cudalstm.py @@ -12,6 +12,22 @@ class CudaLSTM(BaseModel): + """LSTM model class, which relies on PyTorch's CUDA LSTM class. + + This class implements the standard LSTM combined with a model head, as specified in the config. All features + (time series and static) are concatenated and passed to the LSTM directly. If you want to embedd the static features + prior to the concatenation, use the `EmbCudaLSTM` class. + To control the initial forget gate bias, use the config argument `initial_forget_bias`. Often it is useful to set + this value to a positive value at the start of the model training, to keep the forget gate closed and to facilitate + the gradient flow. 
+    The `CudaLSTM` class only supports single-timescale predictions. Use `MTSLSTM` to train a model and get
+    predictions on multiple temporal resolutions at the same time.
+
+    Parameters
+    ----------
+    cfg : Config
+        The run configuration.
+    """
 
     def __init__(self, cfg: Config):
         super(CudaLSTM, self).__init__(cfg=cfg)
@@ -32,13 +48,29 @@ def __init__(self, cfg: Config):
 
         self.head = get_head(cfg=cfg, n_in=cfg.hidden_size, n_out=self.output_size)
 
-        self.reset_parameters()
+        self._reset_parameters()
 
-    def reset_parameters(self):
+    def _reset_parameters(self):
+        """Special initialization of certain model weights."""
         if self.cfg.initial_forget_bias is not None:
             self.lstm.bias_hh_l0.data[self.cfg.hidden_size:2 * self.cfg.hidden_size] = self.cfg.initial_forget_bias
 
     def forward(self, data: Dict[str, torch.Tensor]) -> Dict[str, torch.Tensor]:
+        """Perform a forward pass on the CudaLSTM model.
+
+        Parameters
+        ----------
+        data : Dict[str, torch.Tensor]
+            Dictionary, containing input features as key-value pairs.
+
+        Returns
+        -------
+        Dict[str, torch.Tensor]
+            Model outputs and intermediate states as a dictionary.
+                - `y_hat`: model predictions of shape [batch size, sequence length, number of target variables].
+                - `h_n`: hidden state at the last time step of the sequence of shape [1, batch size, hidden size].
+                - `c_n`: cell state at the last time step of the sequence of shape [1, batch size, hidden size].
+        """
         # transpose to [seq_length, batch_size, n_features]
         x_d = data['x_d'].transpose(0, 1)
@@ -58,7 +90,7 @@ def forward(self, data: Dict[str, torch.Tensor]) -> Dict[str, torch.Tensor]:
 
         lstm_output, (h_n, c_n) = self.lstm(input=x_d)
 
-        # reshape to [batch_size, seq_length, n_hiddens]
+        # reshape to [1, batch_size, n_hiddens]
         h_n = h_n.transpose(0, 1)
         c_n = c_n.transpose(0, 1)
diff --git a/neuralhydrology/modelzoo/ealstm.py b/neuralhydrology/modelzoo/ealstm.py
index 64c0e4fc..dba26e08 100644
--- a/neuralhydrology/modelzoo/ealstm.py
+++ b/neuralhydrology/modelzoo/ealstm.py
@@ -10,6 +10,28 @@
 class EALSTM(BaseModel):
+    """Entity-Aware LSTM (EA-LSTM) model class.
+
+    This model has been proposed by Kratzert et al. [#]_ as a variant of the standard LSTM. The main difference is that
+    the input gate of the EA-LSTM is modulated using only the static inputs, while the dynamic (time series) inputs are
+    used in all other parts of the model (i.e. forget gate, cell update gate and output gate).
+    To control the initial forget gate bias, use the config argument `initial_forget_bias`. Often it is useful to set
+    this value to a positive value at the start of the model training, to keep the forget gate closed and to facilitate
+    the gradient flow.
+    The `EALSTM` class only supports single-timescale predictions. Use `MTSLSTM` to train an LSTM-based model and
+    get predictions on multiple temporal resolutions at the same time.
+
+    Parameters
+    ----------
+    cfg : Config
+        The run configuration.
+
+    References
+    ----------
+    .. [#] Kratzert, F., Klotz, D., Shalev, G., Klambauer, G., Hochreiter, S., and Nearing, G.: Towards learning
+        universal, regional, and local hydrological behaviors via machine learning applied to large-sample datasets,
+        Hydrol. Earth Syst. Sci., 23, 5089–5110, https://doi.org/10.5194/hess-23-5089-2019, 2019.
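+
+    Examples
+    --------
+    A hedged construction sketch; it assumes `Config` is instantiated from a run
+    configuration YAML, and the path is a placeholder:
+
+    >>> from pathlib import Path
+    >>> from neuralhydrology.utils.config import Config
+    >>> cfg = Config(Path('/path/to/run_config.yml'))
+    >>> model = EALSTM(cfg=cfg)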
+ """ def __init__(self, cfg: Config): super(EALSTM, self).__init__(cfg=cfg) @@ -36,9 +58,10 @@ def __init__(self, cfg: Config): self.head = get_head(cfg=cfg, n_in=cfg.hidden_size, n_out=self.output_size) # initialize parameters - self.reset_parameters() + self._reset_parameters() - def reset_parameters(self): + def _reset_parameters(self): + """Special initialization of certain model weights.""" nn.init.orthogonal_(self.weight_ih.data) weight_hh_data = torch.eye(self.cfg.hidden_size) @@ -51,7 +74,23 @@ def reset_parameters(self): self.bias.data[:self.cfg.hidden_size] = self.cfg.initial_forget_bias def forward(self, data: Dict[str, torch.Tensor]) -> Dict[str, torch.Tensor]: - + """Perform a forward pass on the EA-LSTM model. + + Parameters + ---------- + data : Dict[str, torch.Tensor] + Dictionary, containing input features as key-value pairs. + + Returns + ------- + Dict[str, torch.Tensor] + Model outputs and intermediate states as a dictionary. + - `y_hat`: model predictions of shape [batch size, sequence length, number of target variables]. + - `h_n`: hidden state at the last time step of the sequence of shape + [batch size, sequence length, number of target variables]. + - `c_n`: cell state at the last time step of the sequence of shape + [batch size, sequence length, number of target variables]. + """ if 'x_s' in data and 'x_one_hot' in data: x_s = torch.cat([data['x_s'], data['x_one_hot']], dim=-1) elif 'x_s' in data: diff --git a/neuralhydrology/modelzoo/embcudalstm.py b/neuralhydrology/modelzoo/embcudalstm.py index e098daf2..38ec694e 100644 --- a/neuralhydrology/modelzoo/embcudalstm.py +++ b/neuralhydrology/modelzoo/embcudalstm.py @@ -10,6 +10,23 @@ class EmbCudaLSTM(BaseModel): + """EmbCudaLSTM model class, which adds embedding networks for static inputs to the standard LSTM. + + This class extends the standard `CudaLSTM` class to preprocess the static inputs by an embedding network, prior + to concatenating those values to the dynamic (time series) inputs. Use the config argument `embedding_hiddens` to + specify the architecture of the fully-connected embedding network. No activation function is applied to the outputs + of the embedding network. + To control the initial forget gate bias, use the config argument `initial_forget_bias`. Often it is useful to set + this value to a positive value at the start of the model training, to keep the forget gate closed and to facilitate + the gradient flow. + The `EmbCudaLSTM` class does only support single timescale predictions. Use `MTSLSTM` to train a model and get + predictions on multiple temporal resolutions at the same time. + + Parameters + ---------- + cfg : Config + The run configuration. + """ def __init__(self, cfg: Config): super(EmbCudaLSTM, self).__init__(cfg=cfg) @@ -27,14 +44,29 @@ def __init__(self, cfg: Config): self.head = get_head(cfg=cfg, n_in=cfg.hidden_size, n_out=self.output_size) - self.reset_parameters() + self._reset_parameters() - def reset_parameters(self): + def _reset_parameters(self): + """Special initialization of certain model weights.""" if self.cfg.initial_forget_bias is not None: self.lstm.bias_hh_l0.data[self.cfg.hidden_size:2 * self.cfg.hidden_size] = self.cfg.initial_forget_bias def forward(self, data: Dict[str, torch.Tensor]) -> Dict[str, torch.Tensor]: - + """Perform a forward pass on the EmbCudaLSTM model. + + Parameters + ---------- + data : Dict[str, torch.Tensor] + Dictionary, containing input features as key-value pairs. 
+ + Returns + ------- + Dict[str, torch.Tensor] + Model outputs and intermediate states as a dictionary. + - `y_hat`: model predictions of shape [batch size, sequence length, number of target variables]. + - `h_n`: hidden state at the last time step of the sequence of shape [1, batch size, hidden size]. + - `c_n`: cell state at the last time step of the sequence of shape [1, batch size, hidden size]. + """ x_d = data['x_d'].transpose(0, 1) if 'x_s' in data and 'x_one_hot' in data: diff --git a/neuralhydrology/modelzoo/fc.py b/neuralhydrology/modelzoo/fc.py index d024b0e4..fb5bb175 100644 --- a/neuralhydrology/modelzoo/fc.py +++ b/neuralhydrology/modelzoo/fc.py @@ -6,6 +6,25 @@ class FC(nn.Module): + """Auxiliary class to build (multi-layer) fully-connected networks. + + This class is used in different places of the codebase to build fully-connected networks. E.g., the `EA-LSTM` and + `EmbCudaLSTM` use this class to create embedding networks for the static inputs. + Use the config argument `embedding_hiddens` to specify the hidden neurons of the embedding network. If only one + number is specified the embedding network consists of a single linear layer that maps the input into the specified + dimension. + Use the config argument `embedding_activation` to specify the activation function for intermediate layers. Currently + supported are 'tanh' and 'sigmoid'. + Use the config argument `embedding_dropout` to specify the dropout rate in intermediate layers. + + Parameters + ---------- + cfg : Config + The run configuration. + input_size : int, optional + Number of input features. If not specified, the number of input features is the sum of all static inputs (i.e., + catchment attributes, one-hot-encoding, etc.) + """ def __init__(self, cfg: Config, input_size: int = None): super(FC, self).__init__() @@ -44,7 +63,7 @@ def __init__(self, cfg: Config, input_size: int = None): layers.append(nn.Linear(input_size, output_size)) self.net = nn.Sequential(*layers) - self.reset_parameters() + self._reset_parameters() def _get_activation(self, name: str) -> nn.Module: if name.lower() == "tanh": @@ -55,7 +74,8 @@ def _get_activation(self, name: str) -> nn.Module: raise NotImplementedError(f"{name} currently not supported as activation in this class") return activation - def reset_parameters(self): + def _reset_parameters(self): + """Special initialization of certain model weights.""" for layer in self.net: if isinstance(layer, nn.modules.linear.Linear): n_in = layer.weight.shape[1] @@ -64,4 +84,17 @@ def reset_parameters(self): nn.init.constant_(layer.bias, val=0) def forward(self, x: torch.Tensor) -> torch.Tensor: + """Perform a forward pass on the FC model. + + Parameters + ---------- + x : torch.Tensor + Input data of shape [any, any, input size] + + Returns + ------- + torch.Tensor + Embedded inputs of shape [any, any, output_size], where 'output_size' is the last number specified in the + `embedding_hiddens` config argument. + """ return self.net(x) diff --git a/neuralhydrology/modelzoo/head.py b/neuralhydrology/modelzoo/head.py index cc112de4..7739a8c3 100644 --- a/neuralhydrology/modelzoo/head.py +++ b/neuralhydrology/modelzoo/head.py @@ -1,4 +1,5 @@ import logging +from typing import Dict import torch import torch.nn as nn @@ -34,13 +35,24 @@ def get_head(cfg: Config, n_in: int, n_out: int) -> nn.Module: class Regression(nn.Module): - """ - Regression head with different output activations. + """Single-layer regression head with different output activations. 
+ + Parameters + ---------- + n_in : int + Number of input neurons. + n_out : int + Number of output neurons. + activation: str, optional + Output activation function. Can be specified in the config using the `output_activation` argument. Supported + are {'linear', 'relu', 'softplus'}. If not specified (or an unsupported activation function is specified), will + default to 'linear' activation. """ def __init__(self, n_in: int, n_out: int, activation: str = "linear"): super(Regression, self).__init__() + # TODO: Add multi-layer support layers = [nn.Linear(n_in, n_out)] if activation != "linear": if activation.lower() == "relu": @@ -51,5 +63,16 @@ def __init__(self, n_in: int, n_out: int, activation: str = "linear"): LOGGER.warning(f"## WARNING: Ignored output activation {activation} and used 'linear' instead.") self.net = nn.Sequential(*layers) - def forward(self, x: torch.Tensor): - return {'y_hat': self.net(x)} \ No newline at end of file + def forward(self, x: torch.Tensor) -> Dict[str, torch.Tensor]: + """Perform a forward pass on the Regression head. + + Parameters + ---------- + x : torch.Tensor + + Returns + ------- + Dict[str, torch.Tensor] + Dictionary, containing the model predictions in the 'y_hat' key. + """ + return {'y_hat': self.net(x)} diff --git a/neuralhydrology/modelzoo/lstm.py b/neuralhydrology/modelzoo/lstm.py index edc91d7a..aa478506 100644 --- a/neuralhydrology/modelzoo/lstm.py +++ b/neuralhydrology/modelzoo/lstm.py @@ -17,6 +17,8 @@ class LSTM(BaseModel): The idea of this model is to be able to train an LSTM using the nn.LSTM layer, which uses the optimized CuDNN implementation, and later to copy the weights into this model for a more in-depth network analysis. + Can be used with trained models. For instance, from the `CudaLSTM` or `EmbCudaLSTM` classes, the CudNN LSTM layer can be + accessed as `model.lstm`, where `model` is a `CudaLSTM` or `EmbCudaLSTM` instance. Note: Currently only supports one-layer CuDNN LSTMs @@ -50,11 +52,9 @@ def __init__(self, cfg: Config): self.head = get_head(cfg=cfg, n_in=self._hidden_size, n_out=self.output_size) - def forward(self, - data: Dict[str, torch.Tensor], - h_0: torch.Tensor = None, + def forward(self, data: Dict[str, torch.Tensor], h_0: torch.Tensor = None, c_0: torch.Tensor = None) -> Dict[str, torch.Tensor]: - """Forward pass through the LSTM network + """Perform a forward pass on the LSTM model. 
Parameters ---------- @@ -141,9 +141,10 @@ def __init__(self, input_size: int, hidden_size: int, initial_forget_bias: float self.b_hh = nn.Parameter(torch.FloatTensor(4 * hidden_size)) self.b_ih = nn.Parameter(torch.FloatTensor(4 * hidden_size)) - self.reset_parameters() + self._reset_parameters() - def reset_parameters(self): + def _reset_parameters(self): + """Special initialization of certain model weights.""" stdv = math.sqrt(3 / self.hidden_size) for weight in self.parameters(): if len(weight.shape) > 1: diff --git a/neuralhydrology/modelzoo/multifreqlstm.py b/neuralhydrology/modelzoo/mtslstm.py similarity index 68% rename from neuralhydrology/modelzoo/multifreqlstm.py rename to neuralhydrology/modelzoo/mtslstm.py index fba41871..d2fd9587 100644 --- a/neuralhydrology/modelzoo/multifreqlstm.py +++ b/neuralhydrology/modelzoo/mtslstm.py @@ -5,7 +5,7 @@ import torch import torch.nn as nn -from neuralhydrology.data.utils import sort_frequencies +from neuralhydrology.datautils.utils import sort_frequencies from neuralhydrology.modelzoo.head import get_head from neuralhydrology.modelzoo.basemodel import BaseModel from neuralhydrology.utils.config import Config @@ -13,10 +13,34 @@ LOGGER = logging.getLogger(__name__) -class MultiFreqLSTM(BaseModel): +class MTSLSTM(BaseModel): + """Multi-Timescale LSTM (MTS-LSTM) from Gauch et al. (preprint to be released soon). + + An LSTM architecture that allows simultaneous prediction at multiple timescales within one model. + There are two flavors of this model: MTS-LTSM and sMTS-LSTM (shared MTS-LSTM). The MTS-LSTM processes inputs at + low temporal resolutions up to a point in time. Then, the LSTM splits into one branch for each target timescale. + Each branch processes the inputs at its respective timescale. Finally, one prediction head per timescale generates + the predictions for that timescale based on the LSTM output. + Optionally, one can specify: + - a different hidden size for each LSTM branch (use a dict in the ``hidden_size`` config argument) + - different dynamic input variables for each timescale (use a dict in the ``dynamic_inputs`` config argument) + - the strategy to transfer states from the initial shared low-resolution LSTM to the per-timescale + higher-resolution LSTMs. By default, this is a linear transfer layer, but you can specify 'identity' to use an + identity operation or 'None' to turn off any transfer (via the ``transfer_mtlstm_states`` config argument). + + + The sMTS-LSTM variant has the same overall architecture, but the weights of the per-timescale branches (including + the output heads) are shared. + Thus, unlike MTS-LSTM, the sMTS-LSTM cannot use per-timescale hidden sizes or dynamic input variables. + + Parameters + ---------- + cfg : Config + The run configuration. 
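+
+    Examples
+    --------
+    Sketch of the per-timescale config arguments this model reads (shown as a plain
+    Python dict; in a real run these live in the YAML config). Values are illustrative:
+
+    >>> mts_options = {
+    ...     'model': 'mtslstm',
+    ...     'use_frequencies': ['1D', '1H'],              # low to high timescale
+    ...     'seq_length': {'1D': 365, '1H': 336},
+    ...     'hidden_size': {'1D': 64, '1H': 32},          # per-timescale sizes (MTS-LSTM only)
+    ...     'transfer_mtslstm_states': {'h': 'linear', 'c': 'linear'},
+    ... }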
+ """ def __init__(self, cfg: Config): - super(MultiFreqLSTM, self).__init__(cfg=cfg) + super(MTSLSTM, self).__init__(cfg=cfg) self.lstms = None self.transfer_fcs = None self.heads = None @@ -26,22 +50,22 @@ def __init__(self, cfg: Config): self._frequency_factors = [] self._seq_lengths = cfg.seq_length - self._per_frequency_lstm = self.cfg.per_frequency_lstm # default: a distinct LSTM per frequency - self._transfer_multifreq_states = self.cfg.transfer_multifreq_states # default: linear transfer layer + self._is_shared_mtslstm = self.cfg.shared_mtslstm # default: a distinct LSTM per timescale + self._transfer_mtslstm_states = self.cfg.transfer_mtslstm_states # default: linear transfer layer transfer_modes = [None, "None", "identity", "linear"] - if self._transfer_multifreq_states["h"] not in transfer_modes \ - or self._transfer_multifreq_states["c"] not in transfer_modes: - raise ValueError(f"MultiFreqLSTM supports state transfer modes {transfer_modes}") + if self._transfer_mtslstm_states["h"] not in transfer_modes \ + or self._transfer_mtslstm_states["c"] not in transfer_modes: + raise ValueError(f"MTS-LSTM supports state transfer modes {transfer_modes}") if len(cfg.use_frequencies) < 2: - raise ValueError("MultiFreqLSTM expects more than one input frequency") + raise ValueError("MTS-LSTM expects more than one input frequency") self._frequencies = sort_frequencies(cfg.use_frequencies) # start to count the number of inputs input_sizes = len(cfg.camels_attributes + cfg.hydroatlas_attributes + cfg.static_inputs) - # if not per_frequency_lstm, the LSTM gets an additional frequency flag as input. - if not self._per_frequency_lstm: + # if not is_shared_mtslstm, the LSTM gets an additional frequency flag as input. + if not self._is_shared_mtslstm: input_sizes += len(self._frequencies) if cfg.use_basin_id_encoding: @@ -52,8 +76,8 @@ def __init__(self, cfg: Config): if isinstance(cfg.dynamic_inputs, list): input_sizes = {freq: input_sizes + len(cfg.dynamic_inputs) for freq in self._frequencies} else: - if not self._per_frequency_lstm: - raise ValueError(f'Different inputs not allowed if per_frequency_lstm is False.') + if not self._is_shared_mtslstm: + raise ValueError(f'Different inputs not allowed if shared_mtslstm is False.') input_sizes = {freq: input_sizes + len(cfg.dynamic_inputs[freq]) for freq in self._frequencies} if not isinstance(cfg.hidden_size, dict): @@ -62,15 +86,15 @@ def __init__(self, cfg: Config): else: self._hidden_size = cfg.hidden_size - if (not self._per_frequency_lstm - or self._transfer_multifreq_states["h"] == "identity" - or self._transfer_multifreq_states["c"] == "identity") \ + if (not self._is_shared_mtslstm + or self._transfer_mtslstm_states["h"] == "identity" + or self._transfer_mtslstm_states["c"] == "identity") \ and any(size != self._hidden_size[self._frequencies[0]] for size in self._hidden_size.values()): - raise ValueError("All hidden sizes must be equal if per_frequency_lstm=False or state transfer=identity.") + raise ValueError("All hidden sizes must be equal if shared_mtslstm=False or state transfer=identity.") # create layer depending on selected frequencies self._init_modules(input_sizes) - self.reset_parameters() + self._reset_parameters() # frequency factors are needed to determine the time step of information transfer self._init_frequency_factors_and_slice_timesteps() @@ -83,7 +107,7 @@ def _init_modules(self, input_sizes: Dict[str, int]): for idx, freq in enumerate(self._frequencies): freq_input_size = input_sizes[freq] - if not 
self._per_frequency_lstm and idx > 0: + if not self._is_shared_mtslstm and idx > 0: self.lstms[freq] = self.lstms[self._frequencies[idx - 1]] # same LSTM for all frequencies. self.heads[freq] = self.heads[self._frequencies[idx - 1]] # same head for all frequencies. else: @@ -92,10 +116,10 @@ def _init_modules(self, input_sizes: Dict[str, int]): if idx < len(self._frequencies) - 1: for state in ["c", "h"]: - if self._transfer_multifreq_states[state] == "linear": + if self._transfer_mtslstm_states[state] == "linear": self.transfer_fcs[f"{state}_{freq}"] = nn.Linear(self._hidden_size[freq], self._hidden_size[self._frequencies[idx + 1]]) - elif self._transfer_multifreq_states[state] == "identity": + elif self._transfer_mtslstm_states[state] == "identity": self.transfer_fcs[f"{state}_{freq}"] = nn.Identity() else: pass @@ -112,7 +136,7 @@ def _init_frequency_factors_and_slice_timesteps(self): slice_timestep = int(self._seq_lengths[self._frequencies[idx + 1]] / self._frequency_factors[idx]) self._slice_timestep[freq] = slice_timestep - def reset_parameters(self): + def _reset_parameters(self): if self.cfg.initial_forget_bias is not None: for freq in self._frequencies: hidden_size = self._hidden_size[freq] @@ -138,7 +162,7 @@ def _prepare_inputs(self, data: Dict[str, torch.Tensor], freq: str) -> torch.Ten else: pass - if not self._per_frequency_lstm: + if not self._is_shared_mtslstm: # add frequency one-hot encoding idx = self._frequencies.index(freq) one_hot_freq = torch.zeros(x_d.shape[0], x_d.shape[1], len(self._frequencies)).to(x_d) @@ -148,6 +172,18 @@ def _prepare_inputs(self, data: Dict[str, torch.Tensor], freq: str) -> torch.Ten return x_d def forward(self, data: Dict[str, torch.Tensor]) -> Dict[str, torch.Tensor]: + """Perform a forward pass on the MTS-LSTM model. + + Parameters + ---------- + data : Dict[str, torch.Tensor] + Input data for the forward pass. See the documentation overview of all models for details on the dict keys. + + Returns + ------- + Dict[str, torch.Tensor] + Model predictions for each target timescale. + """ x_d = {freq: self._prepare_inputs(data, freq) for freq in self._frequencies} # initial states for lowest frequencies are set to zeros @@ -165,9 +201,9 @@ def forward(self, data: Dict[str, torch.Tensor]) -> Dict[str, torch.Tensor]: (h_0_transfer, c_0_transfer)) # project the states through a hidden layer to the dimensions of the next LSTM - if self._transfer_multifreq_states["h"] is not None: + if self._transfer_mtslstm_states["h"] is not None: h_0_transfer = self.transfer_fcs[f"h_{freq}"](h_n_slice1) - if self._transfer_multifreq_states["c"] is not None: + if self._transfer_mtslstm_states["c"] is not None: c_0_transfer = self.transfer_fcs[f"c_{freq}"](c_n_slice1) # get predictions of remaining part and concat results diff --git a/neuralhydrology/modelzoo/odelstm.py b/neuralhydrology/modelzoo/odelstm.py index afa31999..248dc6be 100644 --- a/neuralhydrology/modelzoo/odelstm.py +++ b/neuralhydrology/modelzoo/odelstm.py @@ -6,7 +6,7 @@ import torch import torch.nn as nn -from neuralhydrology.data.utils import sort_frequencies +from neuralhydrology.datautils.utils import sort_frequencies from neuralhydrology.modelzoo.basemodel import BaseModel from neuralhydrology.modelzoo.head import get_head from neuralhydrology.modelzoo.lstm import _LSTMCell @@ -42,6 +42,11 @@ class ODELSTM(BaseModel): 5. lowest-frequency steps to generate predict_last_n lowest-frequency predictions. 6. 
repeat steps four and five for the next-higher frequency (using the same random-frequency bounds but generating predictions for the next-higher frequency).
+
+    Parameters
+    ----------
+    cfg : Config
+        The run configuration.
 
     References
     ----------
@@ -57,7 +62,7 @@ def __init__(self, cfg: Config):
             raise ValueError('ODELSTM does not support per-frequency input variables or hidden sizes.')
 
         # Note: be aware that frequency_factors and slice_timesteps have a slightly different meaning here vs. in
-        # multifreqlstm. Here, the frequency_factor is relative to the _lowest_ (not the next-lower) frequency.
+        # MTSLSTM. Here, the frequency_factor is relative to the _lowest_ (not the next-lower) frequency.
         # slice_timesteps[freq] is the input step (counting backwards) in the next-*lower* frequency from where on input
         # data at frequency freq is available.
         self._frequency_factors = {}
@@ -171,6 +176,18 @@ def _randomize_freq(self, x_d: torch.Tensor, low_frequency: str, high_frequency:
         return torch.cat(x_d_randomized, dim=0)
 
     def forward(self, data: Dict[str, torch.Tensor]) -> Dict[str, torch.Tensor]:
+        """Perform a forward pass on the ODE-LSTM model.
+
+        Parameters
+        ----------
+        data : Dict[str, torch.Tensor]
+            Input data for the forward pass. See the documentation overview of all models for details on the dict keys.
+
+        Returns
+        -------
+        Dict[str, torch.Tensor]
+            Model predictions for each target timescale.
+        """
 
         x_d = {freq: self._prepare_inputs(data, freq) for freq in self._frequencies}
@@ -245,34 +262,67 @@ def _run_odelstm(self, input_slice: torch.Tensor, h_0: torch.Tensor,
 
 class _ODERNNCell(nn.Module):
-    """An ODE-RNN cell (Adapted from https://github.com/mlech26l/learning-long-term-irregular-ts). """
+    """An ODE-RNN cell (Adapted from https://github.com/mlech26l/learning-long-term-irregular-ts) [#]_.
+
+    Parameters
+    ----------
+    input_size : int
+        Input dimension
+    hidden_size : int
+        Size of the cell's hidden state
+    num_unfolds : int
+        Number of steps into which each timestep will be broken down to solve the ODE.
+    method : {'euler', 'heun', 'rk4'}
+        Method to use for ODE solving (Euler's method, Heun's method, or Runge-Kutta 4)
+
+    References
+    ----------
+    .. [#] Lechner, M.; Hasani, R.: Learning Long-Term Dependencies in Irregularly-Sampled Time Series. arXiv, 2020,
+        https://arxiv.org/abs/2006.04418.
+    """
 
-    def __init__(self, input_size: int, hidden_size: int, num_unfolds: int, method: str, tau=1):
+    def __init__(self, input_size: int, hidden_size: int, num_unfolds: int, method: str):
         super(_ODERNNCell, self).__init__()
         self.method = {
-            'euler': self.euler,
-            'heun': self.heun,
-            'rk4': self.rk4,
+            'euler': self._euler,
+            'heun': self._heun,
+            'rk4': self._rk4,
         }[method]
         self.input_size = input_size
         self.hidden_size = hidden_size
         self.num_unfolds = num_unfolds
-        self.tau = tau
 
         self.w_ih = nn.Parameter(torch.FloatTensor(hidden_size, input_size))
         self.w_hh = nn.Parameter(torch.FloatTensor(hidden_size, hidden_size))
         self.bias = nn.Parameter(torch.FloatTensor(hidden_size))
         self.scale = nn.Parameter(torch.FloatTensor(hidden_size))
 
-        self.reset_parameters()
+        self._reset_parameters()
 
-    def reset_parameters(self):
+    def _reset_parameters(self):
+        """Reset the parameters of the ODERNNCell."""
         nn.init.orthogonal_(self.w_hh)
         nn.init.xavier_uniform_(self.w_ih)
         nn.init.zeros_(self.bias)
         nn.init.constant_(self.scale, 1.0)
 
     def forward(self, new_hidden_state: torch.Tensor, old_hidden_state: torch.Tensor, elapsed: float) -> torch.Tensor:
+        """Perform a forward pass on the ODERNNCell.
+
+        Parameters
+        ----------
+        new_hidden_state : torch.Tensor
+            The current hidden state to be updated by the ODERNNCell.
+        old_hidden_state : torch.Tensor
+            The previous hidden state.
+        elapsed : float
+            Time elapsed between the old and new hidden states.
+
+        Returns
+        -------
+        torch.Tensor
+            Predicted new hidden state.
+        """
         delta_t = elapsed / self.num_unfolds
 
         hidden_state = old_hidden_state
@@ -280,29 +330,26 @@ def forward(self, new_hidden_state: torch.Tensor, old_hidden_state: torch.Tensor,
             hidden_state = self.method(new_hidden_state, hidden_state, delta_t)
         return hidden_state
 
-    def dfdt(self, inputs: torch.Tensor, hidden_state: torch.Tensor) -> torch.Tensor:
+    def _dfdt(self, inputs: torch.Tensor, hidden_state: torch.Tensor) -> torch.Tensor:
         h_in = torch.matmul(inputs, self.w_ih)
         h_rec = torch.matmul(hidden_state, self.w_hh)
         dh_in = self.scale * torch.tanh(h_in + h_rec + self.bias)
-        if self.tau > 0:
-            dh = dh_in - hidden_state * self.tau
-        else:
-            dh = dh_in
+        dh = dh_in - hidden_state
         return dh
 
-    def euler(self, inputs: torch.Tensor, hidden_state: torch.Tensor, delta_t: float) -> torch.Tensor:
-        dy = self.dfdt(inputs, hidden_state)
+    def _euler(self, inputs: torch.Tensor, hidden_state: torch.Tensor, delta_t: float) -> torch.Tensor:
+        dy = self._dfdt(inputs, hidden_state)
         return hidden_state + delta_t * dy
 
-    def heun(self, inputs: torch.Tensor, hidden_state: torch.Tensor, delta_t: float) -> torch.Tensor:
-        k1 = self.dfdt(inputs, hidden_state)
-        k2 = self.dfdt(inputs, hidden_state + delta_t * k1)
+    def _heun(self, inputs: torch.Tensor, hidden_state: torch.Tensor, delta_t: float) -> torch.Tensor:
+        k1 = self._dfdt(inputs, hidden_state)
+        k2 = self._dfdt(inputs, hidden_state + delta_t * k1)
         return hidden_state + delta_t * 0.5 * (k1 + k2)
 
-    def rk4(self, inputs: torch.Tensor, hidden_state: torch.Tensor, delta_t: float) -> torch.Tensor:
-        k1 = self.dfdt(inputs, hidden_state)
-        k2 = self.dfdt(inputs, hidden_state + k1 * delta_t * 0.5)
-        k3 = self.dfdt(inputs, hidden_state + k2 * delta_t * 0.5)
-        k4 = self.dfdt(inputs, hidden_state + k3 * delta_t)
+    def _rk4(self, inputs: torch.Tensor, hidden_state: torch.Tensor, delta_t: float) -> torch.Tensor:
+        k1 = self._dfdt(inputs, hidden_state)
+        k2 = self._dfdt(inputs, hidden_state + k1 * delta_t * 0.5)
+        k3 = self._dfdt(inputs, hidden_state + k2 * delta_t * 0.5)
+        k4 = self._dfdt(inputs, hidden_state + k3 * delta_t)
         return hidden_state + delta_t * (k1 + 2 * k2 + 2 * k3 + k4) / 6.0
diff --git a/neuralhydrology/training/basetrainer.py b/neuralhydrology/training/basetrainer.py
index 6cdb1ee6..053c64c7 100644
--- a/neuralhydrology/training/basetrainer.py
+++ b/neuralhydrology/training/basetrainer.py
@@ -11,9 +11,9 @@ from tqdm import tqdm
 
 import neuralhydrology.training.loss as loss
-from neuralhydrology.data import get_dataset
-from neuralhydrology.data.basedataset import BaseDataset
-from neuralhydrology.data.utils import load_basin_file
+from neuralhydrology.datasetzoo import get_dataset
+from neuralhydrology.datasetzoo.basedataset import BaseDataset
+from neuralhydrology.datautils.utils import load_basin_file
 from neuralhydrology.evaluation import get_tester
 from neuralhydrology.evaluation.tester import BaseTester
 from neuralhydrology.modelzoo import get_model
diff --git a/neuralhydrology/training/regularization.py b/neuralhydrology/training/regularization.py
index e39485a3..33b7f408 100644
--- a/neuralhydrology/training/regularization.py
+++ b/neuralhydrology/training/regularization.py
@@ -3,7 +3,7 @@
 import pandas as pd
 import torch
 
-from neuralhydrology.data.utils import sort_frequencies
+from neuralhydrology.datautils.utils import sort_frequencies
 from neuralhydrology.utils.config import Config
diff --git a/neuralhydrology/utils/config.py b/neuralhydrology/utils/config.py
index 85abd525..98392af1 100644
--- a/neuralhydrology/utils/config.py
+++ b/neuralhydrology/utils/config.py
@@ -419,10 +419,6 @@ def per_basin_train_periods_file(self) -> Path:
     def per_basin_validation_periods_file(self) -> Path:
         return self._cfg.get("per_basin_validation_periods_file", None)
 
-    @property
-    def per_frequency_lstm(self) -> bool:
-        return self._cfg.get("per_frequency_lstm", True)
-
     @property
     def predict_last_n(self) -> Union[int, Dict[str, int]]:
         return self._get_value_verbose("predict_last_n")
@@ -470,6 +466,10 @@ def seed(self, seed: int):
     def seq_length(self) -> Union[int, Dict[str, int]]:
         return self._get_value_verbose("seq_length")
 
+    @property
+    def shared_mtslstm(self) -> bool:
+        return self._cfg.get("shared_mtslstm", True)
+
     @property
     def static_inputs(self) -> List[str]:
         return self._as_default_list(self._cfg.get("static_inputs", []))
@@ -540,8 +540,8 @@ def train_start_date(self) -> pd.Timestamp:
         return self._get_value_verbose("train_start_date")
 
     @property
-    def transfer_multifreq_states(self) -> Dict[str, str]:
-        return self._cfg.get("transfer_multifreq_states", {'h': 'linear', 'c': 'linear'})
+    def transfer_mtslstm_states(self) -> Dict[str, str]:
+        return self._cfg.get("transfer_mtslstm_states", {'h': 'linear', 'c': 'linear'})
 
     @property
     def umal_extend_batch(self) -> bool:
diff --git a/neuralhydrology/utils/nh_results_ensemble.py b/neuralhydrology/utils/nh_results_ensemble.py
index 255a0089..7abd79fa 100644
--- a/neuralhydrology/utils/nh_results_ensemble.py
+++ b/neuralhydrology/utils/nh_results_ensemble.py
@@ -12,7 +12,7 @@ from tqdm import tqdm
 
 sys.path.append(str(Path(__file__).parent.parent.parent))
-from neuralhydrology.data.utils import sort_frequencies
+from neuralhydrology.datautils.utils import sort_frequencies
 from neuralhydrology.evaluation.metrics import calculate_metrics
 from neuralhydrology.utils.config import Config
diff --git a/test/conftest.py b/test/conftest.py
index feb18906..f204198f 100644
--- a/test/conftest.py
+++ b/test/conftest.py
@@ -11,7 +11,7 @@ def pytest_addoption(parser):
     parser.addoption('--smoke-test',
                      action='store_true',
                      default=False,
-                     help='Skips some tests for faster execution. Out of the single-frequency '
+                     help='Skips some tests for faster execution. Out of the single-timescale '
                           'models and forcings, only test cudalstm on forcings that include daymet.')
@@ -44,13 +44,13 @@ def _get_config(name):
 
 @pytest.fixture(params=['lstm', 'ealstm', 'cudalstm', 'embcudalstm'])
-def single_freq_model(request) -> str:
-    """Fixture that provides models that support predicting only a single frequency.
+def single_timescale_model(request) -> str:
+    """Fixture that provides models that support predicting only a single timescale.
 
     Returns
     -------
     str
-        Name of the single-frequency model.
+        Name of the single-timescale model.
""" if request.config.getoption('--smoke-test') and request.param != 'cudalstm': pytest.skip('--smoke-test skips this test.') @@ -62,7 +62,7 @@ def single_freq_model(request) -> str: (['daymet', 'nldas'], ['prcp(mm/day)_daymet', 'tmax(C)_daymet', 'PRCP(mm/day)_nldas', 'Tmax(C)_nldas'])], ids=lambda param: str(param[0])) -def single_freq_forcings(request) -> Dict[str, Union[str, List[str]]]: +def single_timescale_forcings(request) -> Dict[str, Union[str, List[str]]]: """Fixture that provides daily forcings. Returns @@ -75,14 +75,14 @@ def single_freq_forcings(request) -> Dict[str, Union[str, List[str]]]: return {'forcings': request.param[0], 'variables': request.param[1]} -@pytest.fixture(params=['multifreqlstm', 'odelstm']) -def multi_freq_model(request) -> str: - """Fixture that provides multifrequency models. +@pytest.fixture(params=['mtslstm', 'odelstm']) +def multi_timescale_model(request) -> str: + """Fixture that provides multi-timescale models. Returns ------- str - Name of the multifrequency model. + Name of the multi-timescale model. """ return request.param diff --git a/test/test_config_runs.py b/test/test_config_runs.py index d2f9b600..12c4aeea 100644 --- a/test/test_config_runs.py +++ b/test/test_config_runs.py @@ -8,35 +8,35 @@ import pytest from pytest import approx -from neuralhydrology.data.utils import load_camels_us_forcings, load_camels_us_discharge, load_hourly_us_netcdf +from neuralhydrology.datasetzoo import camelsus, hourlycamelsus from neuralhydrology.evaluation.evaluate import start_evaluation from neuralhydrology.training.train import start_training from neuralhydrology.utils.config import Config from test import Fixture -def test_daily_regression(get_config: Fixture[Callable[[str], dict]], single_freq_model: Fixture[str], - daily_dataset: Fixture[str], single_freq_forcings: Fixture[str]): +def test_daily_regression(get_config: Fixture[Callable[[str], dict]], single_timescale_model: Fixture[str], + daily_dataset: Fixture[str], single_timescale_forcings: Fixture[str]): """Test regression training and evaluation for daily predictions. Parameters ---------- get_config : Fixture[Callable[[str], dict] Method that returns a run configuration to test. - single_freq_model : Fixture[str] + single_timescale_model : Fixture[str] Model to test. daily_dataset : Fixture[str] Daily dataset to use. - single_freq_forcings : Fixture[str] + single_timescale_forcings : Fixture[str] Daily forcings set to use. """ config = get_config('daily_regression') - config.log_only('model', single_freq_model) + config.log_only('model', single_timescale_model) config.log_only('dataset', daily_dataset['dataset']) config.log_only('data_dir', config.data_dir / daily_dataset['dataset']) config.log_only('target_variables', daily_dataset['target']) - config.log_only('forcings', single_freq_forcings['forcings']) - config.log_only('dynamic_inputs', single_freq_forcings['variables']) + config.log_only('forcings', single_timescale_forcings['forcings']) + config.log_only('dynamic_inputs', single_timescale_forcings['variables']) basin = '01022500' test_start_date, test_end_date = _get_test_start_end_dates(config) @@ -88,20 +88,18 @@ def test_daily_regression_additional_features(get_config: Fixture[Callable[[str] assert not pd.isna(results[f'{config.target_variables[0]}_sim']).any() -def test_multifreq_regression(get_config: Fixture[Callable[[str], dict]], multi_freq_model: Fixture[str]): - """Test regression training and evaluation for multifrequency predictions. 
+def test_multi_timescale_regression(get_config: Fixture[Callable[[str], dict]], multi_timescale_model: Fixture[str]):
+    """Test regression training and evaluation for multi-timescale predictions.
 
     Parameters
     ----------
     get_config : Fixture[Callable[[str], dict]
         Method that returns a run configuration to test.
-    multi_freq_model : Fixture[str]
+    multi_timescale_model : Fixture[str]
         Model to test.
-    multi_freq_forcings : Fixture[str]
-        Forcings set to use.
     """
-    config = get_config('multifreq_regression')
-    config.log_only('model', multi_freq_model)
+    config = get_config('multi_timescale_regression')
+    config.log_only('model', multi_timescale_model)
 
     basin = '01022500'
     test_start_date, test_end_date = _get_test_start_end_dates(config)
@@ -110,7 +108,7 @@ def test_multifreq_regression(get_config: Fixture[Callable[[str], dict]], multi_
     start_evaluation(cfg=config, run_dir=config.run_dir, epoch=1, period='test')
 
     results = _get_basin_results(config.run_dir, 1)[basin]
-    discharge = load_hourly_us_netcdf(config.data_dir, config.forcings[0]) \
+    discharge = hourlycamelsus.load_hourly_us_netcdf(config.data_dir, config.forcings[0]) \
         .sel(basin=basin, date=slice(test_start_date, test_end_date))['qobs_mm_per_hour']
 
     hourly_results = results['1H']['xr'].to_dataframe().reset_index()
@@ -149,7 +147,7 @@ def _get_basin_results(run_dir: Path, epoch: int) -> Dict:
 
 def _get_discharge(config: Config, basin: str) -> pd.Series:
     if config.dataset == 'camels_us':
-        _, area = load_camels_us_forcings(config.data_dir, basin, 'daymet')
-        return load_camels_us_discharge(config.data_dir, basin, area)
+        _, area = camelsus.load_camels_us_forcings(config.data_dir, basin, 'daymet')
+        return camelsus.load_camels_us_discharge(config.data_dir, basin, area)
     else:
         raise NotImplementedError
diff --git a/test/test_configs/multifreq_regression.test.yml b/test/test_configs/multi_timescale_regression.test.yml
similarity index 98%
rename from test/test_configs/multifreq_regression.test.yml
rename to test/test_configs/multi_timescale_regression.test.yml
index 69aaf8d0..c6fcca57 100644
--- a/test/test_configs/multifreq_regression.test.yml
+++ b/test/test_configs/multi_timescale_regression.test.yml
@@ -28,7 +28,7 @@ metrics:
 - Beta-NSE
 
 # --- Model configuration --------------------------------------------------------------------------
-model: multifreqlstm
+model: mtslstm
 head: regression
 output_activation: linear
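
Reviewer note (not part of the patch): the renamed solver helpers `_euler`, `_heun`, and `_rk4` all integrate the same `_dfdt` dynamics by splitting the elapsed time into `num_unfolds` fixed sub-steps. The sketch below mirrors that logic outside the module for reference; every name in it is illustrative, and it assumes equal input and hidden sizes, as in the ODE-LSTM use case where the cell's input is itself a hidden state.

    import torch

    hidden_size = 8

    # Illustrative stand-ins for the cell's parameters, initialized as in
    # _reset_parameters: Xavier input weights, orthogonal recurrent weights,
    # zero bias, and a scale of one.
    w_ih = torch.nn.init.xavier_uniform_(torch.empty(hidden_size, hidden_size))
    w_hh = torch.nn.init.orthogonal_(torch.empty(hidden_size, hidden_size))
    bias = torch.zeros(hidden_size)
    scale = torch.ones(hidden_size)

    def dfdt(inputs: torch.Tensor, h: torch.Tensor) -> torch.Tensor:
        # Continuous-time dynamics: a scaled tanh drive minus a decay term that
        # pulls the hidden state toward zero. The removed tau branch made this
        # decay configurable; after this patch it is always on.
        return scale * torch.tanh(inputs @ w_ih + h @ w_hh + bias) - h

    def rk4(inputs: torch.Tensor, h: torch.Tensor, delta_t: float) -> torch.Tensor:
        # One classic fourth-order Runge-Kutta step, mirroring _rk4 above.
        k1 = dfdt(inputs, h)
        k2 = dfdt(inputs, h + k1 * delta_t * 0.5)
        k3 = dfdt(inputs, h + k2 * delta_t * 0.5)
        k4 = dfdt(inputs, h + k3 * delta_t)
        return h + delta_t * (k1 + 2 * k2 + 2 * k3 + k4) / 6.0

    # As in _ODERNNCell.forward: split the elapsed time into num_unfolds sub-steps.
    inputs, h = torch.randn(4, hidden_size), torch.zeros(4, hidden_size)
    num_unfolds, elapsed = 4, 1.0
    delta_t = elapsed / num_unfolds
    for _ in range(num_unfolds):
        h = rk4(inputs, h, delta_t)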
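
Downstream code that imported from the old `neuralhydrology.data` package needs the new paths. A hypothetical before/after sketch, based only on the renames visible in this patch:

    # Before this patch:
    # from neuralhydrology.data import get_dataset
    # from neuralhydrology.data.basedataset import BaseDataset
    # from neuralhydrology.data.utils import load_basin_file, sort_frequencies

    # After this patch: datasets live in datasetzoo, generic helpers in datautils.
    from neuralhydrology.datasetzoo import get_dataset
    from neuralhydrology.datasetzoo.basedataset import BaseDataset
    from neuralhydrology.datautils.utils import load_basin_file, sort_frequencies

    # Dataset-specific loaders moved into their respective dataset modules.
    from neuralhydrology.datasetzoo.camelsus import load_camels_us_forcings, load_camels_us_discharge
    from neuralhydrology.datasetzoo.hourlycamelsus import load_hourly_us_netcdf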
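
Run configurations are affected as well: the model name `multifreqlstm` becomes `mtslstm`, `transfer_multifreq_states` becomes `transfer_mtslstm_states`, and the removed `per_frequency_lstm` option is superseded by the new `shared_mtslstm`. A minimal sketch of checking the renamed options follows; constructing `Config` from a YAML path is assumed from the library's usual pattern, and the path below is simply the renamed test config.

    from pathlib import Path

    from neuralhydrology.utils.config import Config

    cfg = Config(Path('test/test_configs/multi_timescale_regression.test.yml'))

    # Renamed in this patch (previously transfer_multifreq_states).
    print(cfg.transfer_mtslstm_states)  # default: {'h': 'linear', 'c': 'linear'}

    # New option replacing the removed per_frequency_lstm property. Note this is
    # a replacement, not necessarily a one-to-one rename of the old flag.
    print(cfg.shared_mtslstm)  # default: True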