diff --git a/ngen_forcing/defs.py b/ngen_forcing/defs.py
new file mode 100644
index 0000000..18175c8
--- /dev/null
+++ b/ngen_forcing/defs.py
@@ -0,0 +1,26 @@
+import rasterio.mask as riomask
+
+
+def polymask(dataset, invert=False, all_touched=False):
+    def _polymask(poly):
+        return riomask.raster_geometry_mask(
+            dataset, [poly], invert=invert, all_touched=all_touched, crop=True
+        )
+
+    return _polymask
+
+
+def xr_read_window(ds, window, mask=None):
+    data = ds.isel(window)
+    if mask is None:
+        return data
+    else:
+        return data.where(mask)
+
+
+def xr_read_window_time(ds, window, mask=None, idx=None, time=None):
+    data = ds.isel(window)
+    if mask is None:
+        return idx, time, data
+    else:
+        return idx, time, data.where(mask)
diff --git a/ngen_forcing/process_nwm_forcing_to_ngen.py b/ngen_forcing/process_nwm_forcing_to_ngen.py
new file mode 100644
index 0000000..300b626
--- /dev/null
+++ b/ngen_forcing/process_nwm_forcing_to_ngen.py
@@ -0,0 +1,258 @@
+from defs import xr_read_window, polymask, xr_read_window_time
+from rasterio import _io, windows
+import xarray as xr
+import pandas as pd
+
+
+class MemoryDataset(_io.MemoryDataset, windows.WindowMethodsMixin):
+    pass
+
+
+def get_forcing_dict_newway(
+    feature_index,
+    feature_list,
+    folder_prefix,
+    file_list,
+    var_list,
+):
+    reng = "rasterio"
+
+    _xds_dummy = xr.open_dataset(folder_prefix.joinpath(file_list[0]), engine=reng)
+    _template_arr = _xds_dummy.U2D.values
+    _u2d = MemoryDataset(
+        _template_arr,
+        transform=_xds_dummy.U2D.rio.transform(),
+        gcps=None,
+        rpcs=None,
+        crs=None,
+        copy=False,
+    )
+
+    ds_list = []
+    for _nc_file in file_list:
+        _full_nc_file = folder_prefix.joinpath(_nc_file)
+        ds_list.append(xr.open_dataset(_full_nc_file, engine=reng))
+
+    df_dict = {}
+    for _v in var_list:
+        df_dict[_v] = pd.DataFrame(index=feature_index)
+
+    for i, feature in enumerate(feature_list):
+        print(f"{i}, {round(i/len(feature_list), 5)*100}".ljust(40), end="\r")
+        mask, _, window = polymask(_u2d)(feature)
+        mask = xr.DataArray(mask, dims=["y", "x"])
+        winslices = dict(zip(["y", "x"], window.toslices()))
+        for j, _xds in enumerate(ds_list):
+            time_value = _xds.time.values[0]
+            cropped = xr_read_window(_xds, winslices, mask=mask)
+            stats = cropped.mean()
+            for var in var_list:
+                df_dict[var].loc[i, time_value] = stats[var]
+
+    for ds in ds_list: ds.close()  # plain loop: closing is a side effect, not a comprehension
+    return df_dict
+
+
+def get_forcing_dict_newway_parallel(
+    feature_index,
+    feature_list,
+    folder_prefix,
+    file_list,
+    var_list,
+    para="thread",
+    para_n=2,
+):
+
+    import concurrent.futures
+
+    reng = "rasterio"
+    _xds = xr.open_dataset(folder_prefix.joinpath(file_list[0]), engine=reng)
+    _template_arr = _xds.U2D.values
+    _u2d = MemoryDataset(
+        _template_arr,
+        transform=_xds.U2D.rio.transform(),
+        gcps=None,
+        rpcs=None,
+        crs=None,
+        copy=False,
+    )
+    ds_list = [xr.open_dataset(folder_prefix.joinpath(f)) for f in file_list]
+    # ds_list = [xr.open_dataset(folder_prefix.joinpath(f), engine=reng) for f in file_list]
+    # TODO: figure out why using the rasterio engine DOES NOT WORK with parallel
+    # TODO: figure out why NOT using the rasterio engine produces a different result
+
+    if para == "process":
+        pool = concurrent.futures.ProcessPoolExecutor
+    elif para == "thread":
+        pool = concurrent.futures.ThreadPoolExecutor
+    else:
+        pool = concurrent.futures.ThreadPoolExecutor
+
+    stats = []
+    future_list = []
+
+    with pool(max_workers=para_n) as executor:
+
+        for _i, _m in enumerate(map(polymask(_u2d), feature_list)):
+            print(f"{_i}, {round(_i/len(feature_list), 5)*100}".ljust(40), end="\r")
+            mask, _, window = _m
+            mask = xr.DataArray(mask, dims=["y", "x"])
+            winslices = dict(zip(["y", "x"], window.toslices()))
+            for ds in ds_list:
+                _t = ds.time.values[0]
+                future = executor.submit(
+                    xr_read_window_time, ds, winslices, mask=mask, idx=_i, time=_t
+                )
+                # cropped = xr_read_window(f, winslices, mask=mask)
+                # stats.append(cropped.mean())
+                future_list.append(future)
+        for _f in concurrent.futures.as_completed(future_list):
+            _j, _t, _s = _f.result()
+            stats.append((_j, _t, _s))
+
+    df_dict = {}
+    for _v in var_list:
+        df_dict[_v] = pd.DataFrame(index=feature_index)
+
+    for j, t, s in stats:
+        for var in var_list:
+            df_dict[var].loc[j, t] = s[var].mean()
+
+    for ds in ds_list: ds.close()  # plain loop: closing is a side effect, not a comprehension
+    return df_dict
+
+
+def get_forcing_dict_newway_inverted(
+    feature_index,
+    feature_list,
+    folder_prefix,
+    file_list,
+    var_list,
+):
+    reng = "rasterio"
+
+    _xds_dummy = xr.open_dataset(folder_prefix.joinpath(file_list[0]), engine=reng)
+    _template_arr = _xds_dummy.U2D.values
+    _u2d = MemoryDataset(
+        _template_arr,
+        transform=_xds_dummy.U2D.rio.transform(),
+        gcps=None,
+        rpcs=None,
+        crs=None,
+        copy=False,
+    )
+    ds_list = []
+    for _nc_file in file_list:
+        _full_nc_file = folder_prefix.joinpath(_nc_file)
+        ds_list.append(xr.open_dataset(_full_nc_file, engine=reng))
+
+    stats = []
+    mask_win_list = []
+
+    for i, feature in enumerate(feature_list):
+        print(f"{i}, {round(i/len(feature_list), 5)*100}".ljust(40), end="\r")
+        mask, _, window = polymask(_u2d)(feature)
+        mask = xr.DataArray(mask, dims=["y", "x"])
+        winslices = dict(zip(["y", "x"], window.toslices()))
+        mask_win_list.append((mask, winslices))
+
+    for i, f in enumerate(ds_list):
+        print(f"{i}, {round(i/len(file_list), 2)*100}".ljust(40), end="\r")
+        time_value = f.time.values[0]
+        # TODO: when we read the window, could the time be added as a dimension?
+        for j, (_m, _w) in enumerate(mask_win_list):
+            cropped = xr_read_window(f, _w, mask=_m)
+            stats.append((j, time_value, cropped.mean()))
+
+    df_dict = {}
+    for _v in var_list:
+        df_dict[_v] = pd.DataFrame(index=feature_index)
+
+    for j, t, s in stats:
+        for var in var_list:
+            df_dict[var].loc[j, t] = s[var]
+
+    for ds in ds_list: ds.close()  # plain loop: closing is a side effect, not a comprehension
+    return df_dict
+
+
+def get_forcing_dict_newway_inverted_parallel(
+    feature_index,
+    feature_list,
+    folder_prefix,
+    file_list,
+    var_list,
+    para="thread",
+    para_n=2,
+):
+    import concurrent.futures
+
+    reng = "rasterio"
+    _xds = xr.open_dataset(folder_prefix.joinpath(file_list[0]), engine=reng)
+    _template_arr = _xds.U2D.values
+    _u2d = MemoryDataset(
+        _template_arr,
+        transform=_xds.U2D.rio.transform(),
+        gcps=None,
+        rpcs=None,
+        crs=None,
+        copy=False,
+    )
+
+    # BUG FIX: removed stray ds_list built from hard-coded "data/" paths (ignored folder_prefix, leaked open datasets)
+
+    stats = []
+    future_list = []
+    mask_win_list = []
+
+    for i, feature in enumerate(feature_list):
+        print(f"{i}, {round(i/len(feature_list), 5)*100}".ljust(40), end="\r")
+        mask, _, window = polymask(_u2d)(feature)
+        mask = xr.DataArray(mask, dims=["y", "x"])
+        winslices = dict(zip(["y", "x"], window.toslices()))
+        mask_win_list.append((mask, winslices))
+
+    ds_list = [xr.open_dataset(folder_prefix.joinpath(f)) for f in file_list]
+    # ds_list = [xr.open_dataset(folder_prefix.joinpath(f), engine=reng) for f in file_list]
+    # TODO: figure out why using the rasterio engine DOES NOT WORK with parallel
+    # TODO: figure out why NOT using the rasterio engine produces a different result
+
+    # (duplicate "stats = []" removed — already initialized above)
+    # (duplicate "future_list = []" removed — already initialized above)
+
+    if para == "process":
+        pool = concurrent.futures.ProcessPoolExecutor
+    elif para == "thread":
+        pool = concurrent.futures.ThreadPoolExecutor
+    else:
+        pool = concurrent.futures.ThreadPoolExecutor
+
+    with pool(max_workers=para_n) as executor:
+        # (removed dead df_dict initialization here — it was unconditionally
+        # rebuilt after the futures are collected below, so building it
+        # before submitting work had no effect)
+
+        for j, ds in enumerate(ds_list):
+            print(f"{j}, {round(j/len(file_list), 2)*100}".ljust(40), end="\r")  # BUG FIX: was round(i/...) — stale loop variable
+            _t = ds.time.values[0]
+            for _i, (_m, _w) in enumerate(mask_win_list):
+                future = executor.submit(
+                    xr_read_window_time, ds, _w, mask=_m, idx=_i, time=_t
+                )
+                # cropped = xr_read_window(ds, _w, mask=_m)
+                # stats.append(cropped.mean())
+                future_list.append(future)
+        for _f in concurrent.futures.as_completed(future_list):
+            _j, _t, _s = _f.result()
+            stats.append((_j, _t, _s))
+
+    df_dict = {}
+    for _v in var_list:
+        df_dict[_v] = pd.DataFrame(index=feature_index)
+
+    for j, t, s in stats:
+        for var in var_list:
+            df_dict[var].loc[j, t] = s[var].mean()
+
+    for ds in ds_list: ds.close()  # plain loop: closing is a side effect, not a comprehension
+    return df_dict
diff --git a/ngen_forcing/test_process_nwm_forcing_to_ngen.py b/ngen_forcing/test_process_nwm_forcing_to_ngen.py
new file mode 100644
index 0000000..ed98587
--- /dev/null
+++ b/ngen_forcing/test_process_nwm_forcing_to_ngen.py
@@ -0,0 +1,211 @@
+# import rioxarray as rxr
+import xarray as xr
+import geopandas as gpd
+from rasterstats import zonal_stats
+
+# import rasterio
+import pandas as pd
+
+import time
+
+from process_nwm_forcing_to_ngen import (
+    get_forcing_dict_newway,
+    get_forcing_dict_newway_parallel,
+    get_forcing_dict_newway_inverted,
+    get_forcing_dict_newway_inverted_parallel,
+)
+
+from pathlib import Path
+import warnings
+
+warnings.simplefilter("ignore")
+
+# Read forcing files
+# Generate List of files
+
+# TODO: Add looping through lists of forcing files
+# consider looking at the "listofnwmfilenames.py" in the data_access_examples repository.
+# Integer values for runinput, varinput, etc. are listed at the top of the file
+# and an example is given in the `main` function.
+
+# import listofnwmfilenames
+# create_file_list(
+#     runinput,
+#     varinput,
+#     geoinput,
+#     meminput,
+#     start_date,
+#     end_date,
+#     fcst_cycle,
+# )
+
+"""
+A set of test files can be generated downloading these files
+wget -P data -c https://storage.googleapis.com/national-water-model/nwm.20220824/forcing_medium_range/nwm.t12z.medium_range.forcing.f001.conus.nc
+wget -P data -c https://storage.googleapis.com/national-water-model/nwm.20220824/forcing_medium_range/nwm.t12z.medium_range.forcing.f002.conus.nc
+wget -P data -c https://storage.googleapis.com/national-water-model/nwm.20220824/forcing_medium_range/nwm.t12z.medium_range.forcing.f003.conus.nc
+wget -P 03w -c https://nextgen-hydrofabric.s3.amazonaws.com/v1.2/nextgen_03W.gpkg
+"""
+
+
+def get_forcing_dict(
+    feature_index,
+    feature_list,
+    folder_prefix,
+    filelist,
+    var_list,
+):
+    reng = "rasterio"
+    sum_stat = "mean"
+
+    df_dict = {}
+    for _v in var_list:
+        df_dict[_v] = pd.DataFrame(index=feature_index)
+
+    ds_list = []
+    for _nc_file in filelist:
+        # _nc_file = ("nwm.t00z.medium_range.forcing.f001.conus.nc")
+        _full_nc_file = folder_prefix.joinpath(_nc_file)
+        ds_list.append(xr.open_dataset(_full_nc_file, engine=reng))
+
+    for _i, _nc_file in enumerate(filelist):
+        _xds = ds_list[_i]
+        print(f"{_i}, {round(_i/len(filelist), 5)*100}".ljust(40), end="\r")
+        if 1 == 1:  # FIXME: vacuous guard — dedent the body and drop this line
+            for _v in var_list:
+                _src = _xds[_v]
+                _aff2 = _src.rio.transform()
+                _arr2 = _src.values[0]
+
+                _df_zonal_stats = pd.DataFrame(
+                    zonal_stats(feature_list, _arr2, affine=_aff2)
+                )
+                # if adding statistics back to original GeoDataFrame
+                # gdf3 = pd.concat([gpkg_divides, _df_zonal_stats], axis=1)
+                df_dict[_v][_xds.time.values[0]] = _df_zonal_stats[sum_stat]
+
+    for _xds in ds_list: _xds.close()  # plain loop: closing is a side effect, not a comprehension
+
+    return df_dict
+
+
+# TODO: Convert the output to CSV with something like
+# `gdf3.to_csv`
+
+
+def main():
+    folder_prefix = Path("data")
+    list_of_files = [
+        f"nwm.t12z.medium_range.forcing.f{_r:03}.conus.nc" for _r in range(1, 241)
+    ]
+
+    # Read basin boundary file
+    f_03 = "03w/nextgen_03W.gpkg"
+    gpkg_divides = gpd.read_file(f_03, layer="divides")
+    var_list = [
+        "U2D",
+        "V2D",
+        "LWDOWN",
+        "RAINRATE",
+        "T2D",
+        "Q2D",
+        "PSFC",
+        "SWDOWN",
+    ]
+
+    # file_list = list_of_files[0:30]
+    # gpkg_subset = gpkg_divides[0:2000]
+    file_list = list_of_files[0:3]
+    gpkg_subset = gpkg_divides[0:200]
+    feature_list = gpkg_subset.geometry.to_list()
+
+    # This way is extremely slow for anything more than a
+    # few files, so we comment it out of the test
+
+    start_time = time.time()
+    print("Working on the old (slow) way")
+    fd1 = get_forcing_dict(
+        gpkg_subset.index,
+        feature_list,
+        folder_prefix,
+        file_list,
+        var_list,
+    )
+    print(time.time() - start_time)
+
+    start_time = time.time()
+    print("Working on the new way")
+    fd2 = get_forcing_dict_newway(
+        gpkg_subset.index,
+        feature_list,
+        folder_prefix,
+        file_list,
+        var_list,
+    )
+    print(time.time() - start_time)
+
+    start_time = time.time()
+
+    print("Working on the new way with threading parallel.")
+    fd3t = get_forcing_dict_newway_parallel(
+        gpkg_subset.index,
+        feature_list,
+        folder_prefix,
+        file_list,
+        var_list,
+        para="thread",
+        para_n=16,
+    )
+    print(time.time() - start_time)
+
+    start_time = time.time()
+    print("Working on the new way with process parallel.")
+    fd3p = get_forcing_dict_newway_parallel(
+        gpkg_subset.index,
+        feature_list,
+        folder_prefix,
+        file_list,
+        var_list,
+        para="process",
+        para_n=16,
+    )
+    print(time.time() - start_time)
+    start_time = time.time()
+    print("Working on the new way with loops reversed.")
+    fd4 = get_forcing_dict_newway_inverted(
+        gpkg_subset.index,
+        feature_list,
+        folder_prefix,
+        file_list,
+        var_list,
+    )
+    print(time.time() - start_time)
+
+    start_time = time.time()
+    print("Working on the new way with loops reversed with threading parallel.")
+    fd5t = get_forcing_dict_newway_inverted_parallel(
+        gpkg_subset.index,
+        feature_list,
+        folder_prefix,
+        file_list,
+        var_list,
+        para="thread",
+        para_n=16,
+    )
+    print(time.time() - start_time)
+    start_time = time.time()
+    print("Working on the new way with loops reversed with process parallel.")
+    fd5p = get_forcing_dict_newway_inverted_parallel(
+        gpkg_subset.index,
+        feature_list,
+        folder_prefix,
+        file_list,
+        var_list,
+        para="process",
+        para_n=16,
+    )
+    print(time.time() - start_time)
+
+
+if __name__ == "__main__":
+    main()