Hydra integration #39

Merged
merged 38 commits into from Apr 9, 2024

Changes from 1 commit (of 38 commits)
11ca362
added function to convert units
annajungbluth Mar 19, 2024
d762b2c
added function to convert units
annajungbluth Mar 19, 2024
e5ddbac
added goes16 downloader for pipeline
annajungbluth Mar 19, 2024
765517e
added downloader for terra
annajungbluth Mar 19, 2024
d7a3323
tested terra aqua downloader
annajungbluth Mar 19, 2024
637d3f8
tested goes downloader
annajungbluth Mar 19, 2024
23fcfb0
added and tested msg downloader
annajungbluth Mar 19, 2024
49a3900
First Merge to Hydra-Integration (#35)
jejjohnson Mar 20, 2024
e4925c7
added GOES filename parser
annajungbluth Mar 21, 2024
3849a45
added msg filename parser
annajungbluth Mar 21, 2024
968e9ef
added filename parser for msg
annajungbluth Mar 21, 2024
4402f04
started developing msg pipeline
annajungbluth Mar 21, 2024
0a72a6d
continued developing msg pipeline
annajungbluth Mar 21, 2024
cee8a13
continued working on msg pipeline
annajungbluth Mar 22, 2024
7fa2144
continued hydra integration and finished download components
annajungbluth Mar 22, 2024
6ecc687
added examples to main.py
annajungbluth Mar 22, 2024
75d0db6
added goes geoprocessor to repo, added docstrings
annajungbluth Mar 27, 2024
2828f05
deleted obsolete scripts
annajungbluth Mar 27, 2024
3089ef0
tested goes geoprocessor
annajungbluth Mar 27, 2024
0618df4
updated default args
annajungbluth Mar 27, 2024
a5c3928
notebooks/dev/goes/1.4-GOES-geoprocess-val.ipynb
annajungbluth Mar 27, 2024
cad391b
tested goes geoprocessor
annajungbluth Mar 27, 2024
ef9fa9a
renamed script for consistency
annajungbluth Mar 28, 2024
4ffdf1f
work in progress modis geoprocessing
annajungbluth Mar 28, 2024
2b33e92
merged with other branch
annajungbluth Mar 28, 2024
951d280
continued developing modis geoprocessor
annajungbluth Mar 28, 2024
ba94f25
tested modis geoprocessor
annajungbluth Mar 28, 2024
a04d71e
fixed errors, standardized code, and started with msg geoprocessor
annajungbluth Apr 1, 2024
def165a
fixed modis problem
annajungbluth Apr 1, 2024
075758d
fixed modis naming issue
annajungbluth Apr 2, 2024
34f04d9
fixed small errors and tested MSG geoprocessor
annajungbluth Apr 2, 2024
0729507
fixed default arguments
annajungbluth Apr 2, 2024
10a0314
finished hydra integration of geoprocessing
annajungbluth Apr 4, 2024
7d814d2
started developing prepatcher
annajungbluth Apr 4, 2024
5eb9175
work in progress patcher
annajungbluth Apr 4, 2024
7953222
fixed coordinate, dimension problem
annajungbluth Apr 5, 2024
31b0366
tested patcher and fixed small errors
annajungbluth Apr 5, 2024
4b95a40
integrated patcher into hydra pipeline
annajungbluth Apr 5, 2024
fixed errors, standardized code, and started with msg geoprocessor
annajungbluth committed Apr 1, 2024
commit a04d71ebe9321bc566b37261f26c626b29bdea93
1,476 changes: 1,421 additions & 55 deletions notebooks/1.2-pipeline-msg.ipynb

Large diffs are not rendered by default.

84 changes: 42 additions & 42 deletions notebooks/dev/goes/1.4-GOES-geoprocess-val.ipynb

Large diffs are not rendered by default.

65 changes: 64 additions & 1 deletion notebooks/dev/modis/1.1-MODIS-geoprocess-val.ipynb
@@ -1141,12 +1141,75 @@
"cell_type": "code",
"execution_count": 27,
"metadata": {},
"outputs": [],
"outputs": [
{
"data": {
"text/plain": [
"<matplotlib.collections.PathCollection at 0x180179c50>"
]
},
"execution_count": 27,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"# Select a specific band and plot it\n",
"ds.sel(band='1').plot.scatter(x='longitude', y='latitude')"
]
},
{
"cell_type": "code",
"execution_count": 1,
"metadata": {},
"outputs": [],
"source": [
"import pandas as pd"
]
},
{
"cell_type": "code",
"execution_count": 2,
"metadata": {},
"outputs": [],
"source": [
"str_time = \"2018274.1545\""
]
},
{
"cell_type": "code",
"execution_count": 5,
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"20181001154500\n"
]
}
],
"source": [
"from datetime import datetime, timedelta\n",
"\n",
"# Split the string into date and time parts\n",
"date_str, time_str = str_time.split(\".\")\n",
"\n",
"# Convert the date part to a datetime object\n",
"date = datetime.strptime(date_str, \"%Y%j\")\n",
"\n",
"# Convert the time part to a timedelta object\n",
"time = timedelta(hours=int(time_str[:2]), minutes=int(time_str[2:]))\n",
"\n",
"# Add the date and time parts to get a datetime object\n",
"dt = date + time\n",
"\n",
"# Convert the datetime object to a string in the format \"YYYYMMDDHHMMSS\"\n",
"str_time_new = dt.strftime(\"%Y%m%d%H%M%S\")\n",
"\n",
"print(str_time_new)"
]
},
{
"cell_type": "code",
"execution_count": null,
1,347 changes: 1,332 additions & 15 deletions notebooks/dev/msg/0.1-MSG-download.ipynb

Large diffs are not rendered by default.

942 changes: 942 additions & 0 deletions notebooks/dev/msg/1.1-MSG-geoprocess-val.ipynb

Large diffs are not rendered by default.

43 changes: 42 additions & 1 deletion rs_tools/_src/geoprocessing/goes/__init__.py
@@ -1,3 +1,6 @@
from datetime import datetime, timedelta
from pathlib import Path

# All wavelengths in micrometers
GOES_WAVELENGTHS = {
"1": 0.47,
@@ -16,4 +19,42 @@
"14": 11.2,
"15": 12.3,
"16": 13.3,
}
}

def parse_goes16_dates_from_file(file: str):
"""
Parses the date and time information from a GOES-16 file name.
Args:
file (str): The file name to parse.
Returns:
str: The parsed date and time in the format 'YYYYJJJHHMMSS'.
"""
timestamp = Path(file).name.replace("-", "_").split("_")

return datetime.strptime(timestamp[-3][1:], "%Y%j%H%M%S%f").strftime("%Y%j%H%M%S")

def format_goes_dates(time: str) -> str:
"""
Function to format the date/time string.
Args:
time (str): The time string to be formatted.
Returns:
str: The formatted time string.
"""

# Split the string into date and time parts
date_str, time_str = time[:7], time[7:]
# Convert the date part to a datetime object
date = datetime.strptime(date_str, "%Y%j")
# Convert the time part to a timedelta object
time = timedelta(hours=int(time_str[:2]), minutes=int(time_str[2:4]), seconds=int(time_str[4:6]))
# Add the date and time parts to get a datetime object
dt = date + time
# Convert the datetime object to a string in the format "YYYYMMDDHHMMSS"
str_time = dt.strftime("%Y%m%d%H%M%S")

return str_time
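
Taken together, these two helpers turn a raw GOES-16 filename into the compact timestamp used for saved files. A minimal sketch of the round trip, assuming a standard ABI L1b naming pattern (the filename below is hypothetical; the "s..." field is the scan start time):

from rs_tools._src.geoprocessing.goes import parse_goes16_dates_from_file, format_goes_dates

# hypothetical GOES-16 ABI L1b filename
fname = "OR_ABI-L1b-RadF-M6C01_G16_s20202530000176_e20202530009484_c20202530009539.nc"

itime = parse_goes16_dates_from_file(fname)
print(itime)                     # 2020253000017 (YYYYJJJHHMMSS)
print(format_goes_dates(itime))  # 20200909000017 (YYYYMMDDHHMMSS)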
27 changes: 5 additions & 22 deletions rs_tools/_src/geoprocessing/goes/geoprocessor_goes16.py
@@ -10,38 +10,20 @@
import typer
from loguru import logger
import xarray as xr
import datetime
from rs_tools._src.geoprocessing.interp import resample_rioxarray
from rs_tools._src.geoprocessing.goes import parse_goes16_dates_from_file, format_goes_dates
from rs_tools._src.geoprocessing.goes.validation import correct_goes16_bands, correct_goes16_satheight
from rs_tools._src.geoprocessing.goes.reproject import add_goes16_crs
from rs_tools._src.geoprocessing.reproject import convert_lat_lon_to_x_y, calc_latlon
from rs_tools._src.geoprocessing.utils import check_sat_FOV
import pandas as pd
from datetime import datetime
from functools import partial
import dask
import warnings

dask.config.set(**{'array.slicing.split_large_chunks': False})
warnings.filterwarnings('ignore', category=FutureWarning)

from datetime import datetime
from pathlib import Path

def parse_goes16_dates_from_file(file: str):
"""
Parses the date and time information from a GOES-16 file name.
Args:
file (str): The file name to parse.
Returns:
str: The parsed date and time in the format 'YYYYJJJHHMM'.
"""
timestamp = Path(file).name.replace("-", "_").split("_")
return datetime.strptime(timestamp[-2][1:], "%Y%j%H%M%S%f").strftime("%Y%j%H%M")


# TODO: Add unit conversion
@dataclass
class GOES16GeoProcessing:
"""
@@ -304,9 +286,10 @@ def preprocess_files(self):
# check if save path exists, and create if not
if not os.path.exists(self.save_path):
os.makedirs(self.save_path)

# remove file if it already exists
save_filename = Path(self.save_path).joinpath(f"{itime}_goes16.nc")
itime_name = format_goes_dates(itime)
save_filename = Path(self.save_path).joinpath(f"{itime_name}_goes16.nc")
if os.path.exists(save_filename):
logger.info(f"File already exists. Overwriting file: {save_filename}")
os.remove(save_filename)
43 changes: 43 additions & 0 deletions rs_tools/_src/geoprocessing/match.py
@@ -0,0 +1,43 @@
from typing import Union, List, Dict, Tuple
import numpy as np
import pandas as pd
from loguru import logger

def match_timestamps(times_data: List[str], times_clouds: List[str], cutoff: int=15) -> pd.DataFrame:
"""
Matches timestamps of data and cloud mask files that were not measured at exactly the same time.
Args:
times_data (List[str]): Timestamps of data files.
times_clouds (List[str]): Timestamps of the cloud mask files.
cutoff (int, optional): Maximum time difference in minutes to consider a match. Defaults to 15.
Returns:
pd.DataFrame: A DataFrame with the matched timestamps.
"""
# Convert timestamps to datetime objects
timestamps_data = pd.to_datetime(times_data)
timestamps_clouds = pd.to_datetime(times_clouds)

matches_data = []
matches_clouds = []

# Loop through timestamps of data files
for time in timestamps_data:
# Find closest timestamp in cloud mask files
closest_time = timestamps_clouds[
np.abs((timestamps_clouds - time).total_seconds()).argmin()
]
# Check if the closest timestamp is within the cutoff time
if np.abs((closest_time - time).total_seconds()) <= pd.Timedelta(f'{cutoff}min').total_seconds():
matches_data.append(time.strftime("%Y%m%d%H%M%S"))
matches_clouds.append(closest_time.strftime("%Y%m%d%H%M%S"))
else:
logger.info(f"No matching cloud mask found for {time}")

matched_times = pd.DataFrame({
"timestamps_data": matches_data,
"timestamps_cloudmask": matches_clouds
})

return matched_times
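
A short usage sketch for match_timestamps (the timestamps below are made up; any strings pd.to_datetime can parse will work):

times_data = ["2020-10-01 12:00:00", "2020-10-01 12:45:00"]
times_clouds = ["2020-10-01 12:03:00", "2020-10-01 15:00:00"]

df = match_timestamps(times_data, times_clouds, cutoff=15)
# one matched row: 20201001120000 / 20201001120300;
# the 12:45 scene is logged and dropped, since its closest mask (12:03) is 42 minutes away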
45 changes: 44 additions & 1 deletion rs_tools/_src/geoprocessing/modis/__init__.py
@@ -1,4 +1,5 @@

from pathlib import Path
from datetime import datetime, timedelta

MODIS_1KM_VARIABLES = {
"EV_1KM_Emissive": [20,21,22,23,24,25,27,28,29,30,31,32,33,34,35,36],
@@ -49,3 +50,45 @@
"35": 13.935,
"36": 14.235,
}

def parse_modis_dates_from_file(file: str):
"""
Parses the date and time information from a MODIS file name.
Args:
file (str): The file name to parse.
Returns:
str: The parsed date and time in the format 'YYYYJJJ.HHMM'.
"""
# get the date from the file
date = Path(file).name.split(".")[1][1:]
# get the time from the file
time = Path(file).name.split(".")[2]
datetime_str = f"{date}.{time}"

return datetime_str

def format_modis_dates(time: str) -> str:
"""
Function to format the date/time string.
Args:
time (str): The time string to be formatted.
Returns:
str: The formatted time string.
"""
# Split the string into date and time parts
date_str, time_str = time.split(".")
# Convert the date part to a datetime object
date = datetime.strptime(date_str, "%Y%j")
# Convert the time part to a timedelta object
time = timedelta(hours=int(time_str[:2]), minutes=int(time_str[2:]))
# Add the date and time parts to get a datetime object
dt = date + time
# Convert the datetime object to a string in the format "YYYYMMDDHHMMSS"
str_time = dt.strftime("%Y%m%d%H%M%S")

return str_time
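
As with the GOES helpers above, these two functions chain into the compact timestamp used for saved files. A sketch with a hypothetical MYD021KM filename (the output matches the conversion validated in the MODIS notebook):

from rs_tools._src.geoprocessing.modis import parse_modis_dates_from_file, format_modis_dates

# hypothetical MODIS Aqua L1b filename
fname = "MYD021KM.A2018274.1545.061.2018275180534.hdf"

itime = parse_modis_dates_from_file(fname)
print(itime)                      # 2018274.1545 (YYYYJJJ.HHMM)
print(format_modis_dates(itime))  # 20181001154500 (YYYYMMDDHHMMSS)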
49 changes: 20 additions & 29 deletions rs_tools/_src/geoprocessing/modis/geoprocessor_modis.py
@@ -18,34 +18,16 @@
import datetime
from rs_tools._src.data.modis import MODISFileName, MODIS_ID_TO_NAME, MODIS_NAME_TO_ID, get_modis_paired_files
from rs_tools._src.geoprocessing.modis.reproject import add_modis_crs
from rs_tools._src.geoprocessing.modis import MODIS_WAVELENGTHS
from rs_tools._src.geoprocessing.modis import MODIS_WAVELENGTHS, parse_modis_dates_from_file, format_modis_dates
import pandas as pd
from datetime import datetime
from pathlib import Path
import dask
import warnings

dask.config.set(**{'array.slicing.split_large_chunks': False})
warnings.filterwarnings('ignore', category=FutureWarning)

from pathlib import Path

def parse_modis_dates_from_file(file: str):
"""
Parses the date and time information from a MODIS file name.
Args:
file (str): The file name to parse.
Returns:
str: The parsed date and time in the format 'YYYYJJJHHMM'.
"""
# get the date from the file
date = Path(file).name.split(".")[1][1:]
# get the time from the file
time = Path(file).name.split(".")[2]
datetime_str = f"{date}.{time}"
return datetime_str

@dataclass
class MODISGeoProcessing:
satellite: str
@@ -125,13 +107,12 @@ def preprocess_fn_radiances(self, file: List[str]) -> xr.Dataset:
# do core preprocess function (e.g. resample, add crs etc.)
ds = self.preprocess_fn(ds)

channels = get_modis_channel_numbers()
# Store the attributes in a dict before concatenation
attrs_dict = {x: ds[x].attrs for x in channels}

# concatenate in new band dimension
# NOTE: Concatenation overwrites attrs of bands.
ds = xr.concat(list(map(lambda x: ds[x], channels)), dim="band")
ds = ds.assign(Rad=xr.concat(list(map(lambda x: ds[x], channels)), dim="band"))
# rename band dimensions
ds = ds.assign_coords(band=list(map(lambda x: x, channels)))

@@ -144,11 +125,11 @@ def preprocess_fn_radiances(self, file: List[str]) -> xr.Dataset:
# NOTE: Keep only certain relevant attributes
ds.attrs = {}
ds.attrs = dict(
calibration=attrs_dict['1']["calibration"],
standard_name=attrs_dict['1']["standard_name"],
platform_name=attrs_dict['1']["platform_name"],
sensor=attrs_dict['1']["sensor"],
units=attrs_dict['1']["units"],
calibration=attrs_dict[list(attrs_dict.keys())[0]]["calibration"],
standard_name=attrs_dict[list(attrs_dict.keys())[0]]["standard_name"],
platform_name=attrs_dict[list(attrs_dict.keys())[0]]["platform_name"],
sensor=attrs_dict[list(attrs_dict.keys())[0]]["sensor"],
units=attrs_dict[list(attrs_dict.keys())[0]]["units"],
)

# TODO: Correct wavelength assignment. This attaches 36++ wavelengths to each band.
@@ -226,6 +207,7 @@ def preprocess_cloud_mask(self, files: List[str]) -> xr.Dataset:
logger.info(f"Number of cloud mask files: {len(file)}")
assert len(file) == 1

# load file using satpy, convert to xarray dataset, and preprocess
ds = self.preprocess_fn_cloudmask(file)

return ds
@@ -251,7 +233,7 @@ def preprocess_files(self):
# load radiances
ds = self.preprocess_radiances(files)
except AssertionError:
logger.error(f"Skipping {itime} due to missing bands")
logger.error(f"Skipping {itime} due to error loading")
continue
try:
# load cloud mask
@@ -264,10 +246,17 @@ def preprocess_files(self):
ds = ds.assign_coords({"cloud_mask": (("y", "x"), ds_clouds.values)})
# add cloud mask attrs to dataset
ds["cloud_mask"].attrs = ds_clouds.attrs

# remove attrs that cause netcdf error
for attr in ["start_time", "end_time", "area", "_satpy_id"]:
ds["cloud_mask"].attrs.pop(attr)

for var in ds.data_vars:
ds[var].attrs.pop('start_time', None)
ds[var].attrs.pop('end_time', None)
ds[var].attrs.pop('area', None)
ds[var].attrs.pop('_satpy_id', None)

# remove crs from dataset
ds = ds.drop_vars("crs")

@@ -276,10 +265,12 @@ def preprocess_files(self):
os.makedirs(self.save_path)

# remove file if it already exists
save_filename = Path(self.save_path).joinpath(f"{itime}_{self.satellite}.nc")
itime_name = format_modis_dates(itime)
save_filename = Path(self.save_path).joinpath(f"{itime_name}_{self.satellite}.nc")
if os.path.exists(save_filename):
logger.info(f"File already exists. Overwriting file: {save_filename}")
os.remove(save_filename)

# save to netcdf
ds.to_netcdf(save_filename, engine="netcdf4")

373 changes: 373 additions & 0 deletions rs_tools/_src/geoprocessing/msg/geoprocessor_msg.py
@@ -0,0 +1,373 @@
import autoroot
import os
import numpy as np
import rioxarray
from pathlib import Path
from dataclasses import dataclass
from typing import Optional, List, Union, Tuple
from tqdm import tqdm
from rs_tools._src.utils.io import get_list_filenames
import typer
import pygrib
from loguru import logger
import xarray as xr
import datetime
from satpy import Scene
from rs_tools._src.geoprocessing.interp import resample_rioxarray
from rs_tools._src.geoprocessing.msg.reproject import add_msg_crs
from rs_tools._src.geoprocessing.reproject import convert_lat_lon_to_x_y, calc_latlon
from rs_tools._src.geoprocessing.utils import check_sat_FOV
from rs_tools._src.geoprocessing.match import match_timestamps
from rs_tools._src.geoprocessing.msg import MSG_WAVELENGTHS
import pandas as pd
from datetime import datetime
from functools import partial
import dask
import warnings

dask.config.set(**{'array.slicing.split_large_chunks': False})
warnings.filterwarnings('ignore', category=FutureWarning)
warnings.filterwarnings('ignore', category=UserWarning)

from datetime import datetime
from pathlib import Path

# TODO: Add unit conversion

def parse_msg_dates_from_file(file: str):
"""
Parses the date and time information from a MSG file name.
Args:
file (str): The file name to parse.
Returns:
str: The parsed date and time in the format 'YYYYMMDDHHMMSS'.
"""
timestamp = Path(file).name.split("-")[-2]
timestamp = timestamp.split(".")[0]
return timestamp


@dataclass
class MSGGeoProcessing:
"""
A class for geoprocessing MSG data.
Attributes:
resolution (float): The resolution in meters.
read_path (str): The path to read the files from.
save_path (str): The path to save the processed files to.
region (Tuple[int, int, int, int]): The region of interest defined by the bounding box coordinates (lon_min, lat_min, lon_max, lat_max).
resample_method (str): The resampling method to use.
Methods:
msg_files(self) -> List[str]: Returns a list of all MSG files in the read path.
preprocess_fn(self, ds: xr.Dataset) -> Tuple[xr.Dataset, xr.Dataset]: Preprocesses the input dataset by applying corrections, subsetting, and resampling.
preprocess_fn_radiances(self, ds: xr.Dataset) -> xr.Dataset: Preprocesses the input dataset for radiances.
preprocess_fn_cloudmask(self, ds: xr.Dataset) -> xr.Dataset: Preprocesses the input dataset for cloud mask.
preprocess_files(self): Preprocesses the files in the read path and saves the processed files to the save path.
preprocess_radiances(self, files: List[str]) -> xr.Dataset: Preprocesses the radiances from the input files.
preprocess_cloud_mask(self, files: List[str]) -> xr.Dataset: Preprocesses the cloud mask from the input files.
"""
resolution: float
read_path: str
save_path: str
region: Optional[Tuple[int, int, int, int]]
resample_method: str

@property
def msg_files(self) -> List[str]:
"""
Returns a list of all MSG files in the read path.
Returns:
List[str]: A list of file paths.
"""
# get a list of all MSG radiance files from specified path
files_radiances = get_list_filenames(self.read_path, ".nat")
# get a list of all MSG cloud mask files from specified path
files_cloudmask = get_list_filenames(self.read_path, ".grb")
return files_radiances, files_cloudmask

def preprocess_fn(self, ds: xr.Dataset) -> Tuple[xr.Dataset, xr.Dataset]:
"""
Preprocesses the input dataset by applying corrections, subsetting, and resampling etc.
Args:
ds (xr.Dataset): The input dataset.
Returns:
Tuple[xr.Dataset, xr.Dataset]: The preprocessed dataset and the original dataset.
"""
# copy to avoid modifying original dataset
ds = ds.copy()

# assign coordinate reference system
ds = add_msg_crs(ds)

if self.region != (None, None, None, None):
logger.info(f"Subsetting data to region: {self.region}")
# subset data
lon_bnds = (self.region[0], self.region[2])
lat_bnds = (self.region[1], self.region[3])
# convert lat lon bounds to x y (in meters)
x_bnds, y_bnds = convert_lat_lon_to_x_y(ds.FOV.crs, lon=lon_bnds, lat=lat_bnds, )
# check that region is within the satellite field of view
# compile satellite FOV
satellite_FOV = (min(ds.x.values), min(ds.y.values), max(ds.x.values), max(ds.y.values))
# compile region bounds in x y
region_xy = (x_bnds[0], y_bnds[0], x_bnds[1], y_bnds[1])
if not check_sat_FOV(region_xy, FOV=satellite_FOV):
raise ValueError("Region is not within the satellite field of view")

ds = ds.sortby("x").sortby("y")
# slice based on x y bounds
ds = ds.sel(y=slice(y_bnds[0], y_bnds[1]), x=slice(x_bnds[0], x_bnds[1]))

if self.resolution is not None:
logger.info(f"Resampling data to resolution: {self.resolution} m")
# resampling
ds_subset = resample_rioxarray(ds, resolution=self.resolution, method=self.resample_method)
else:
ds_subset = ds

# assign coordinates
ds_subset = calc_latlon(ds_subset)

return ds_subset, ds

def preprocess_fn_radiances(self, file: List[str], cloud_mask: np.ndarray) -> xr.Dataset:
"""
Preprocesses the MSG radiance dataset.
Args:
file (List[str]): The input file.
cloud_mask (np.ndarray): The cloud mask values to attach before preprocessing.
Returns:
xr.Dataset: The preprocessed dataset.
"""

# Load file using satpy scenes
scn = Scene(
reader="seviri_l1b_native",
filenames=file
)
# Load radiance bands
channels = [x for x in scn.available_dataset_names() if x!='HRV']
assert len(channels) == 11, "Number of channels is not 11"

scn.load(channels, generate=False, calibration='radiance')

# change to xarray data
ds = scn.to_xarray()

# attach cloud mask as data variable before preprocessing
ds = ds.assign_coords({"cloud_mask": (("y", "x"), cloud_mask)})

# do core preprocess function (e.g. resample, add crs etc.)
ds_subset, ds = self.preprocess_fn(ds)

# Store the attributes in a dict before concatenation
attrs_dict = {x: ds_subset[x].attrs for x in channels}

# concatenate in new band dimension
# NOTE: Concatenation overwrites attrs of bands.
ds_subset = ds_subset.assign(Rad=xr.concat(list(map(lambda x: ds_subset[x], channels)), dim="band"))
# rename band dimensions
ds_subset = ds_subset.assign_coords(band=list(map(lambda x: x, channels)))
ds_subset = ds_subset.drop(list(map(lambda x: x, channels)))

# drop variables that will no longer be needed
ds_subset = ds_subset.drop(list(map(lambda x: f'{x}_acq_time', channels)))

# extract measurement time
time_stamp = attrs_dict[list(attrs_dict.keys())[0]]['start_time']
# assign bands and time data to each variable
ds_subset = ds_subset.expand_dims({"time": [time_stamp]})

# NOTE: Keep only certain relevant attributes
ds_subset.attrs = {}
ds_subset.attrs = dict(
calibration=attrs_dict[list(attrs_dict.keys())[0]]["calibration"],
standard_name=attrs_dict[list(attrs_dict.keys())[0]]["standard_name"],
platform_name=attrs_dict[list(attrs_dict.keys())[0]]["platform_name"],
sensor=attrs_dict[list(attrs_dict.keys())[0]]["sensor"],
units=attrs_dict[list(attrs_dict.keys())[0]]["units"],
orbital_parameters=attrs_dict[list(attrs_dict.keys())[0]]["orbital_parameters"]
)

# TODO: Correct wavelength assignment. This attaches 36++ wavelengths to each band.
# assign band wavelengths
ds_subset = ds_subset.expand_dims({"band_wavelength": list(MSG_WAVELENGTHS.values())})

# rename coord for coordinate reference system
ds_subset = ds_subset.rename({'msg_seviri_fes_3km': 'CRS'})

return ds_subset

def preprocess_fn_cloudmask(self, file: List[str]) -> np.ndarray:
"""
Preprocesses the input dataset for MSG cloud masks.
Args:
file (List[str]): The input file.
Returns:
np.ndarray: The cloud mask values extracted from the GRIB file.
"""

grbs = pygrib.open(file[0])
# Loop over all messages in the GRIB file
for grb in grbs:
if grb.name == 'Cloud mask':
# Extract values from grb and return np.array
cloud_mask = grb.values
return cloud_mask

def preprocess_radiances(self, files: List[str], cloud_mask: np.ndarray) -> xr.Dataset:
"""
Preprocesses radiances from the input files.
Args:
files (List[str]): The list of file paths.
cloud_mask (np.ndarray): The cloud mask values to attach to the radiance dataset.
Returns:
xr.Dataset: The preprocessed dataset.
"""
# Check that all files contain radiance data
file = list(filter(lambda x: ".nat" in x, files))

# Check that only one file is selected
logger.info(f"Number of radiance files: {len(file)}")
assert len(file) == 1

# load file using satpy, convert to xarray dataset, and preprocess
ds = self.preprocess_fn_radiances(file, cloud_mask=cloud_mask)

return ds

def preprocess_cloud_mask(self, files: List[str]) -> np.ndarray:
"""
Preprocesses cloud mask from the input files.
Args:
files (List[str]): The list of file paths.
Returns:
np.ndarray: The preprocessed cloud mask values.
"""
# Check that all files contain cloud mask data
file = list(filter(lambda x: "CLMK" in x, files))

# Check that only one file is present
logger.info(f"Number of cloud mask files: {len(file)}")
assert len(file) == 1

# load file using pygrib and extract cloud mask values
ds = self.preprocess_fn_cloudmask(file)

return ds

def preprocess_files(self):
"""
Preprocesses multiple files in read path and saves processed files to save path.
"""
# get unique times from read path
files_radiances, files_cloudmask = self.msg_files
unique_times_radiances = list(set(map(parse_msg_dates_from_file, files_radiances)))
unique_times_cloudmask = list(set(map(parse_msg_dates_from_file, files_cloudmask)))

df_matches = match_timestamps(unique_times_radiances, unique_times_cloudmask, cutoff=15)

pbar_time = tqdm(df_matches["timestamps_data"].values)

for itime in pbar_time:

pbar_time.set_description(f"Processing: {itime}")

# get cloud mask file for specific time
itime_cloud = df_matches.loc[df_matches["timestamps_data"] == itime, "timestamps_cloudmask"].values[0]
files_cloud = list(filter(lambda x: itime_cloud in x, files_cloudmask))

try:
# load cloud mask
cloud_mask = self.preprocess_cloud_mask(files_cloud)
except AssertionError:
logger.error(f"Skipping {itime} due to missing cloud mask")
continue

# get data files for specific time
files = list(filter(lambda x: itime in x, files_radiances))

try:
# load radiances and attach cloud mask
ds = self.preprocess_radiances(files, cloud_mask=cloud_mask)
except AssertionError:
logger.error(f"Skipping {itime} due to error loading")
continue

# check if save path exists, and create if not
if not os.path.exists(self.save_path):
os.makedirs(self.save_path)

# remove file if it already exists
save_filename = Path(self.save_path).joinpath(f"{itime}_msg.nc")
if os.path.exists(save_filename):
logger.info(f"File already exists. Overwriting file: {save_filename}")
os.remove(save_filename)

# remove attrs that cause netcdf error
for var in ds.data_vars:
ds[var].attrs.pop('grid_mapping', None)

# save to netcdf
ds.to_netcdf(save_filename, engine="netcdf4")


def geoprocess_msg(
resolution: float = None, # defined in meters
read_path: str = "/Users/anna.jungbluth/Desktop/git/rs_tools/data/msg",
save_path: str = "/Users/anna.jungbluth/Desktop/git/rs_tools/data/msg/geoprocessed",
region: Tuple[int, int, int, int] = (None, None, None, None),
resample_method: str = "bilinear",
):
"""
Geoprocesses MSG files
Args:
resolution (float, optional): The resolution of the downloaded files in meters. Defaults to None.
read_path (str, optional): The path to read the files from. Defaults to "./".
save_path (str, optional): The path to save the downloaded files. Defaults to "./".
region (Tuple[int, int, int, int], optional): The geographic region to download files for. Defaults to None.
resample_method (str, optional): The resampling method to use. Defaults to "bilinear".
Returns:
None
"""
# Initialize MSG GeoProcessor
logger.info(f"Initializing MSG GeoProcessor...")
msg_geoprocessor = MSGGeoProcessing(
resolution=resolution,
read_path=read_path,
save_path=save_path,
region=region,
resample_method=resample_method
)
logger.info(f"GeoProcessing Files...")
msg_geoprocessor.preprocess_files()

logger.info(f"Finished MSG GeoProcessing Script...!")


if __name__ == '__main__':
"""
# =========================
# Test Cases
# =========================
# ====================
# FAILURE TEST CASES
# ====================
"""
typer.run(geoprocess_msg)
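
Since the entry point is a plain function wrapped by typer, it can also be called directly from Python; a sketch with illustrative paths and region bounds:

from rs_tools._src.geoprocessing.msg.geoprocessor_msg import geoprocess_msg

geoprocess_msg(
    resolution=3000.0,               # resample to 3 km
    read_path="data/msg",            # directory holding the .nat radiance and CLMK .grb files
    save_path="data/msg/geoprocessed",
    region=(-10, 35, 5, 45),         # lon_min, lat_min, lon_max, lat_max (within the MSG FOV)
    resample_method="bilinear",
)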
1 change: 1 addition & 0 deletions rs_tools/_src/geoprocessing/utils.py
@@ -1,6 +1,7 @@
from typing import Union, List, Dict, Tuple
import numpy as np
import xarray as xr
import pandas as pd

def check_sat_FOV(region: Tuple[int, int, int, int], FOV: Tuple[int, int, int, int]) -> bool:
"""