Skip to content

Commit

Permalink
Added functionality to input timerange
Browse files Browse the repository at this point in the history
  • Loading branch information
annajungbluth committed Dec 8, 2023
1 parent b6b32b6 commit b9038d5
Showing 1 changed file with 97 additions and 9 deletions.
106 changes: 97 additions & 9 deletions scripts/goes-download.py
Original file line number Diff line number Diff line change
Expand Up @@ -106,14 +106,26 @@
import tqdm
import typer
from loguru import logger
from datetime import datetime, timedelta

from goes2go import GOES

# The cadence depends on the measurement scale
# The full disk is measured every 15 mins
# CONUS is measured every 5 mins
# Mesoscale is measured every 1 min
DOMAIN_TIMESTEP = {
'F': 15,
'C': 5,
'M': 1
}

def goes_download(
list_of_dates: List[str],
# t0: str,
# t1: str,
# dt: str,
start_date: str,
end_date: Optional[str]=None,
start_time: Optional[str]='00:00:00',
end_time: Optional[str]='23:59:00',
time_step: Optional[str]=None,
satellite_number: int=16,
save_dir: Optional[str]=".",
num_files: Optional[int]=None,
Expand All @@ -131,14 +143,37 @@ def goes_download(
_check_satellite_number(satellite_number=satellite_number)
_check_domain(domain=domain)
_check_bands(bands=bands)
# TODO: check bands
data_product = f"{instrument}-{processing_level}-{data_product}"
logger.info(f"Data Product: {data_product}")
_check_data_product_name(data_product=data_product)

if end_date is None:
end_date = start_date

start_datetime_str = start_date + ' ' + start_time
end_datetime_str = end_date + ' ' + end_time
_check_datetime_format(start_datetime_str, end_datetime_str)
# datetime conversion
start_datetime = datetime.strptime(start_datetime_str, "%Y-%m-%d %H:%M:%S")
end_datetime = datetime.strptime(end_datetime_str, "%Y-%m-%d %H:%M:%S")

_check_start_end_times(start_datetime=start_datetime, end_datetime=end_datetime)

if time_step is None:
time_step = '1:00:00'
logger.info("No timedelta specified. Default is 1 hour.")
_check_timedelta_format(time_delta=time_step)

hours, minutes, seconds = convert_str2time(time=time_step)
time_delta = timedelta(hours=hours, minutes=minutes, seconds=seconds)

_check_timedelta(time_delta=time_delta, domain=domain)

# Compile list of dates/times
list_of_dates = np.arange(start_datetime, end_datetime, time_delta).astype(datetime).astype(str)

G = GOES(satellite=satellite_number, product=data_product, domain=domain)
# get list of times
# TODO: get timetimes

if bands == 'all':
list_of_bands = list(np.arange(1, 17))
else:
Expand All @@ -158,13 +193,58 @@ def goes_download(
return files


def _check_datetime_format(start_datetime_str: str, end_datetime_str: str) -> bool:
try:
datetime.strptime(start_datetime_str, "%Y-%m-%d %H:%M:%S")
datetime.strptime(end_datetime_str, "%Y-%m-%d %H:%M:%S")
return True
except Exception as e:
msg = "Please check date/time format"
msg += "\nExpected date format: %Y-%m-%d"
msg += "\nExpected time format: %H:%M:%S"
raise SyntaxError(msg)


def _check_start_end_times(start_datetime: datetime, end_datetime: datetime) -> bool:
""" check end_datetime is after start_datetime """
if start_datetime < end_datetime:
return True
else:
msg = "Start datetime must be before end datetime\n"
msg += f"This does not hold for start = {str(start_datetime)} and end = {str(end_datetime)}"
raise ValueError(msg)

def _check_timedelta_format(time_delta: str) -> bool:
try:
time_list = time_delta.split(":")
assert len(time_list) == 3
assert 0 <= int(time_list[0]) # Check that hours is >= 0, and convertible to int
assert 0 <= int(time_list[1]) < 60 # Check that minutes < 60, and convertible to int
assert 0 <= int(time_list[2]) < 60 # Check that seconds < 60, and convertible to int

except Exception as e:
msg = "Please check time step format"
msg += "\nExpected time format: %H:%M:%S"
raise SyntaxError(msg)


def _check_timedelta(time_delta: datetime, domain: str) -> bool:
if time_delta.days > 0: return True

if time_delta.seconds >= DOMAIN_TIMESTEP[domain] * 60: return True

msg = "Time delta must not be smaller than the time resolution of the data\n"
msg += f"Time delta {str(time_delta)} is too small for domain {domain}\n"
msg += f"The minimum required time delta is {DOMAIN_TIMESTEP[domain]} min"
raise ValueError(msg)

def _check_domain(domain: str) -> bool:
"""checks domain GOES data"""
# TODO: Check mesoscale
if str(domain) in ["F", "C", "M"]:
return True
else:
msg = "Unrecognized domain "
msg = "Unrecognized domain"
msg += f"\nNeeds to be 'F', 'C', 'M'."
msg += "\nOthers are not yet implemented"
raise ValueError(msg)
Expand Down Expand Up @@ -223,7 +303,15 @@ def _check_bands(bands: Union[List[str], str]) -> bool:
result = list(map(criteria, bands))

assert result.sum() == len(bands)


# TODO: Move to different script?
def convert_str2time(time: str):
time_list = time.split(":")
hours = int(time_list[0])
minutes = int(time_list[1])
seconds = int(time_list[2])

return hours, minutes, seconds

def main(input: str):

Expand Down

0 comments on commit b9038d5

Please sign in to comment.