# === nwm_filenames/operational_aws/filename_helpers.py ===
import gevent
import requests
from functools import partial
from tqdm import tqdm


def check_valid_urls(file_list, session=None):
    """Concurrently HEAD-check every URL in *file_list*.

    Returns the subset of URLs that answered HTTP 200.  *session* is
    accepted for backward compatibility but is currently unused.
    """
    progress = tqdm(range(len(file_list)))
    check = partial(check_url, progress)
    # One lightweight greenlet per URL; gevent monkey-patching (done by the
    # importing module) makes the blocking requests calls cooperative.
    jobs = [gevent.spawn(check, url) for url in file_list]
    gevent.joinall(jobs)
    return [job.get() for job in jobs if job.get() is not None]


def check_url(t, file):
    """HEAD-request *file*; return it on HTTP 200, else None.

    *t* is a shared tqdm bar advanced once per URL.  Fix: the progress
    descriptions were f-strings with no placeholder while *filename* was
    computed and never used — the file name is now interpolated.
    """
    filename = file.split("/")[-1]
    try:
        # Timeout keeps one dead host from stalling the whole sweep.
        with requests.head(file, timeout=10) as response:
            if response.status_code == 200:
                t.set_description(f"Found: {filename}")
                t.update(1)
                t.refresh()
                return file
            t.set_description(f"Not Found: {filename}")
            t.update(1)
            t.refresh()
            return None
    except requests.exceptions.RequestException:
        t.set_description(f"Not Found: {filename}")
        t.update(1)
        t.refresh()
        return None


# === nwm_filenames/operational_aws/listofnwmfilenames.py ===
from gevent import monkey

monkey.patch_all()
from dateutil import rrule
from datetime import datetime, timezone
from itertools import product
from filename_helpers import check_valid_urls
import time
# Numeric CLI selections -> NWM product / domain names used in the URLs.
rundict = {
    1: "short_range",
    2: "medium_range",
    3: "medium_range_no_da",
    4: "long_range",
    5: "analysis_assim",
    6: "analysis_assim_extend",
    7: "analysis_assim_extend_no_da",
    8: "analysis_assim_long",
    9: "analysis_assim_long_no_da",
    10: "analysis_assim_no_da",
    11: "short_range_no_da",
}
memdict = {member: f"mem_{member}" for member in range(1, 8)}
vardict = {1: "channel_rt", 2: "land", 3: "reservoir", 4: "terrain_rt", 5: "forcing"}
geodict = {1: "conus", 2: "hawaii", 3: "puertorico"}


def selectvar(vardict, varinput):
    """Resolve a numeric variable selection to its name (KeyError if unknown)."""
    return vardict[varinput]


def selectgeo(geodict, geoinput):
    """Resolve a numeric geography selection to its name (KeyError if unknown)."""
    return geodict[geoinput]


def selectrun(rundict, runinput):
    """Resolve a numeric run selection to its name (KeyError if unknown)."""
    return rundict[runinput]


def makename(
    date,
    run_name,
    var_name,
    fcst_cycle,
    fcst_hour,
    geography,
    run_type,
    fhprefix="",
    runsuffix="",
    varsuffix="",
    run_typesuffix="",
    urlbase_prefix="",
):
    """Assemble one NWM file URL from its preprocessed name components."""
    day_dir = "nwm." + date.strftime("%Y%m%d")
    run_dir = run_type + run_typesuffix
    file_name = (
        f"nwm.t{fcst_cycle:02d}z.{run_name}{runsuffix}"
        f".{var_name}{varsuffix}.{fhprefix}{fcst_hour:03d}.{geography}.nc"
    )
    return "/".join([f"{urlbase_prefix}{day_dir}", run_dir, file_name])
# setting run_type
def run_type(runinput, varinput, geoinput, default=""):
    """Map the numeric run/variable/geography selection to the URL folder name.

    Forcing products (varinput == 5) and the Hawaii / Puerto Rico domains
    have dedicated folders; anything else falls back to *default*.
    Note: an unmatched forcing combination returns None (preserved behavior).
    """
    if varinput == 5:  # forcing
        if runinput == 5 and geoinput == 2:  # analysis_assim, hawaii
            return "forcing_analysis_assim_hawaii"
        elif runinput == 5 and geoinput == 3:  # analysis_assim, puerto rico
            return "forcing_analysis_assim_puertorico"
        elif runinput == 1 and geoinput == 2:  # short_range, hawaii
            return "forcing_short_range_hawaii"
        elif runinput == 1 and geoinput == 3:  # short_range, puerto rico
            return "forcing_short_range_puertorico"
        elif runinput == 5:  # analysis_assim
            return "forcing_analysis_assim"
        elif runinput == 6:  # analysis_assim_extend
            return "forcing_analysis_assim_extend"
        elif runinput == 2:  # medium_range
            return "forcing_medium_range"
        elif runinput == 1:  # short_range
            return "forcing_short_range"
    elif runinput == 5 and geoinput == 3:  # analysis_assim, puerto rico
        return "analysis_assim_puertorico"
    elif runinput == 10 and geoinput == 3:  # analysis_assim_no_da, puerto rico
        return "analysis_assim_puertorico_no_da"
    elif runinput == 1 and geoinput == 3:  # short_range, puerto rico
        return "short_range_puertorico"
    elif runinput == 11 and geoinput == 3:  # short_range_no_da, puerto rico
        return "short_range_puertorico_no_da"
    else:
        return default


def fhprefix(runinput):
    """Prefix for the forecast-hour field of the file name.

    Analysis-and-assimilation products (runinput 5-10) use "tm" (time-minus);
    forecast products use "f".  Fix: the old check ``4 <= runinput <= 10``
    wrongly gave long_range (runinput 4) a "tm" prefix, but real long_range
    files are named ``fNNN``.
    """
    if 5 <= runinput <= 10:
        return "tm"
    return "f"


def varsuffix(meminput):
    """Ensemble-member suffix appended to the variable name ("_1".."_7")."""
    if meminput in range(1, 8):
        return f"_{meminput}"
    return ""


def run_typesuffix(meminput):
    """Ensemble-member suffix appended to the run-type folder ("_mem1".."_mem7")."""
    if meminput in range(1, 8):
        return f"_mem{meminput}"
    return ""


def select_forecast_cycle(fcst_cycle=None, default=None):
    """Return the user-supplied forecast cycles, else *default*.

    Fix: explicit ``is None`` test so falsy-but-meaningful inputs
    (``0``, an empty list) are no longer silently replaced by the default.
    """
    return default if fcst_cycle is None else fcst_cycle


def select_lead_time(lead_time=None, default=None):
    """Return the user-supplied lead times, else *default* (None-safe, see above)."""
    return default if lead_time is None else lead_time


urlbasedict = {
    0: "",
    1: "https://nomads.ncep.noaa.gov/pub/data/nccf/com/nwm/prod/",
    2: "https://nomads.ncep.noaa.gov/pub/data/nccf/com/nwm/post-processed/WMS/",
    3: "https://storage.googleapis.com/national-water-model/",
    4: "https://storage.cloud.google.com/national-water-model/",
    5: "gs://national-water-model/",
    6: "gcs://national-water-model/",
    7: "https://noaa-nwm-pds.s3.amazonaws.com/",
}


def selecturlbase(urlbasedict, urlbaseinput, defaulturlbase=""):
    """Resolve the numeric base-URL selection; unknown keys yield the default."""
    if urlbaseinput in urlbasedict:
        return urlbasedict[urlbaseinput]
    return defaulturlbase
def create_file_list(
    runinput,
    varinput,
    geoinput,
    meminput,
    start_date=None,
    end_date=None,
    fcst_cycle=None,
    urlbaseinput=None,
    lead_time=None,  # TODO: change this order; placed here to avoid breaking change
):
    """For the given run/variable/geography selections, build the list of NWM
    file URLs covering every requested date, forecast cycle, and lead time.

    Unrecognized numeric selections degrade to "..._error" placeholder
    strings (CLI-friendly, preserved behavior); missing or malformed date
    strings fall back to a single day — today, UTC.
    """
    runsuff = ""

    # Narrowed from bare ``except:``: the selectors fail with KeyError
    # (or TypeError for unhashable input); anything else should surface.
    try:
        geography = selectgeo(geodict, geoinput)
    except (KeyError, TypeError):
        geography = "geography_error"
    try:
        run_name = selectrun(rundict, runinput)
    except (KeyError, TypeError):
        run_name = "run_error"
    try:
        var_name = selectvar(vardict, varinput)
    except (KeyError, TypeError):
        var_name = "variable_error"
    try:
        urlbase_prefix = selecturlbase(urlbasedict, urlbaseinput)
    except (KeyError, TypeError):
        urlbase_prefix = "urlbase_error"

    try:
        _dtstart = datetime.strptime(start_date, "%Y%m%d%H%M")
        _until = datetime.strptime(end_date, "%Y%m%d%H%M")
    except (TypeError, ValueError):
        today = datetime.now(timezone.utc)
        _dtstart = today
        _until = today

    dates = rrule.rrule(rrule.DAILY, dtstart=_dtstart, until=_until)
    run_t = run_type(runinput, varinput, geoinput, run_name)
    fhp = fhprefix(runinput)
    vsuff = varsuffix(meminput)
    rtsuff = run_typesuffix(meminput)

    if runinput == 1:  # short_range
        if varinput == 5:  # forcing
            if geoinput == 2:  # hawaii
                prod = product(
                    dates,
                    select_forecast_cycle(fcst_cycle, range(0, 13, 12)),
                    select_lead_time(lead_time, range(1, 49)),
                )
            elif geoinput == 3:  # puertorico
                # NOTE(review): stops at f047 while the PR no_da run below
                # goes to f048 — confirm the intended upper bound.
                prod = product(
                    dates,
                    select_forecast_cycle(fcst_cycle, [6]),
                    select_lead_time(lead_time, range(1, 48)),
                )
            else:
                prod = product(
                    dates,
                    select_forecast_cycle(fcst_cycle, range(24)),
                    select_lead_time(lead_time, range(1, 19)),
                )
        elif geoinput == 3:  # puerto rico
            prod = product(
                dates,
                select_forecast_cycle(fcst_cycle, range(6, 19, 12)),
                select_lead_time(lead_time, range(1, 48)),
            )
        else:
            prod = product(
                dates,
                select_forecast_cycle(fcst_cycle, range(24)),
                select_lead_time(lead_time, range(1, 19)),
            )
    elif runinput == 2:  # medium_range
        if varinput == 5:  # forcing
            prod = product(
                dates,
                select_forecast_cycle(fcst_cycle, range(0, 19, 6)),
                select_lead_time(lead_time, range(1, 241)),
            )
        else:
            default_fc = range(0, 19, 6)
            if meminput == 1:
                if varinput in {1, 3}:
                    prod = product(
                        dates,
                        select_forecast_cycle(fcst_cycle, default_fc),
                        select_lead_time(lead_time, range(1, 241)),
                    )
                elif varinput in {2, 4}:
                    # land/terrain are only emitted every 3 hours
                    prod = product(
                        dates,
                        select_forecast_cycle(fcst_cycle, default_fc),
                        select_lead_time(lead_time, range(3, 241, 3)),
                    )
                else:
                    raise ValueError("varinput")
            elif meminput in range(2, 8):
                if varinput in {1, 3}:
                    prod = product(
                        dates,
                        select_forecast_cycle(fcst_cycle, default_fc),
                        select_lead_time(lead_time, range(1, 205)),
                    )
                elif varinput in {2, 4}:
                    prod = product(
                        dates,
                        select_forecast_cycle(fcst_cycle, default_fc),
                        select_lead_time(lead_time, range(3, 205, 3)),
                    )
                else:
                    raise ValueError("varinput")
            else:
                raise ValueError("meminput")
    elif runinput == 3:  # medium_range_no_da
        if varinput == 1:
            prod = product(
                dates,
                select_forecast_cycle(fcst_cycle, range(0, 13, 6)),
                select_lead_time(lead_time, range(3, 240, 3)),
            )
        else:
            raise ValueError("only valid variable for a _no_da type run is channel_rt")
    elif runinput == 4:  # long_range
        default_fc = range(0, 19, 6)
        if varinput in {1, 3}:
            prod = product(
                dates,
                select_forecast_cycle(fcst_cycle, default_fc),
                select_lead_time(lead_time, range(6, 721, 6)),
            )
        elif varinput == 2:
            prod = product(
                dates,
                select_forecast_cycle(fcst_cycle, default_fc),
                select_lead_time(lead_time, range(24, 721, 24)),
            )
        else:
            raise ValueError("varinput")
    elif runinput == 5:  # analysis_assim (simplest form)
        if varinput == 5:  # forcing
            if geoinput == 2:  # hawaii
                prod = product(
                    dates,
                    select_forecast_cycle(fcst_cycle, range(19)),
                    select_lead_time(lead_time, range(3)),
                )
            else:
                prod = product(
                    dates,
                    select_forecast_cycle(fcst_cycle, range(20)),
                    select_lead_time(lead_time, range(3)),
                )
        else:
            prod = product(
                dates,
                select_forecast_cycle(fcst_cycle, range(24)),
                select_lead_time(lead_time, range(3)),
            )
    elif runinput == 6:  # analysis_assim_extend
        prod = product(
            dates,
            select_forecast_cycle(fcst_cycle, [16]),
            select_lead_time(lead_time, range(28)),
        )
    elif runinput == 7:  # analysis_assim_extend_no_da
        if varinput == 1:
            prod = product(
                dates,
                select_forecast_cycle(fcst_cycle, [16]),
                select_lead_time(lead_time, range(28)),
            )
        else:
            raise ValueError("only valid variable for a _no_da type run is channel_rt")
    elif runinput == 8:  # analysis_assim_long
        prod = product(
            dates,
            select_forecast_cycle(fcst_cycle, range(0, 24, 6)),
            select_lead_time(lead_time, range(12)),
        )
    elif runinput == 9:  # analysis_assim_long_no_da
        if varinput == 1:
            prod = product(
                dates,
                select_forecast_cycle(fcst_cycle, range(0, 24, 6)),
                select_lead_time(lead_time, range(12)),
            )
        else:
            raise ValueError("only valid variable for a _no_da type run is channel_rt")
    elif runinput == 10:  # analysis_assim_no_da
        if varinput == 1:
            prod = product(
                dates,
                select_forecast_cycle(fcst_cycle, range(21)),
                select_lead_time(lead_time, range(3)),
            )
        else:
            raise ValueError("only valid variable for a _no_da type run is channel_rt")
    elif runinput == 11 and geoinput == 3:  # short_range_puertorico_no_da
        if varinput == 1:
            prod = product(
                dates,
                select_forecast_cycle(fcst_cycle, range(6, 19, 12)),
                select_lead_time(lead_time, range(1, 49)),
            )
        else:
            raise ValueError("only valid variable for a _no_da type run is channel_rt")
    else:
        raise ValueError("run error")

    return [
        makename(
            _dt,
            run_name,
            var_name,
            _fc,
            _fh,
            geography,
            run_t,
            fhp,
            runsuff,
            vsuff,
            rtsuff,
            urlbase_prefix,
        )
        for _dt, _fc, _fh in prod
    ]


def main():
    """Demo driver: build short_range channel_rt URLs for 2018-09-17,
    HEAD-check them, and write the reachable ones to filenamelist.txt."""
    start_date = "201809170000"
    end_date = "201809172300"
    fcst_cycle = list(range(24))
    # NOTE(review): lead_time is defined but never passed to
    # create_file_list below — confirm whether it should be.
    lead_time = [1, 2, 240]
    # fcst_cycle = None  # Retrieves a full day for each day within the range given.
    runinput = 1
    varinput = 1
    geoinput = 1
    meminput = 0
    urlbaseinput = 3

    file_list = create_file_list(
        runinput,
        varinput,
        geoinput,
        meminput,
        start_date,
        end_date,
        fcst_cycle,
        urlbaseinput,
    )
    if len(file_list) == 0:
        print("No files found")
    else:
        print(f"Files: {file_list}\nTotal files: {len(file_list)}")
        valid_file_list = check_valid_urls(file_list)
        print(f"Valid Files: {valid_file_list}\nValid files: {len(valid_file_list)}")

        # Write inside the else branch: valid_file_list does not exist when
        # no files were found (previously a latent NameError).
        with open("filenamelist.txt", "w") as file:
            for item in valid_file_list:
                file.write(f"{item}\n")
if __name__ == "__main__":
    start = time.time()
    main()
    print(time.time() - start)


# === nwm_filenames/operational_aws/test_cases.py ===
import pytest
from datetime import datetime
from listofnwmfilenames import (
    selectvar,
    selectgeo,
    selectrun,
    makename,
    run_type,
    fhprefix,
    varsuffix,
    run_typesuffix,
    select_forecast_cycle,
    select_lead_time,
    selecturlbase,
    create_file_list,
)


def test_selectvar():
    """Numeric variable code resolves through the supplied mapping."""
    assert selectvar({1: "channel_rt"}, 1) == "channel_rt"


def test_selectgeo():
    """Numeric geography code resolves through the supplied mapping."""
    assert selectgeo({1: "conus"}, 1) == "conus"


def test_selectrun():
    """Numeric run code resolves through the supplied mapping."""
    assert selectrun({1: "short_range"}, 1) == "short_range"


def test_makename():
    """Full URL assembly with every suffix knob exercised."""
    expected = (
        "https://example.com/nwm.20220101/forcing_test/"
        "nwm.t00z.short_range_test.channel_rt_test.f001.conus.nc"
    )
    assert makename(
        datetime(2022, 1, 1, 0, 0, 0, 0),
        "short_range",
        "channel_rt",
        0,
        1,
        "conus",
        "forcing",
        fhprefix="f",
        runsuffix="_test",
        varsuffix="_test",
        run_typesuffix="_test",
        urlbase_prefix="https://example.com/",
    ) == expected
@pytest.mark.parametrize(
    "runinput, varinput, geoinput, expected_output",
    [
        (5, 5, 2, "forcing_analysis_assim_hawaii"),
        (5, 5, 3, "forcing_analysis_assim_puertorico"),
        (2, 5, 7, "forcing_medium_range"),
        (1, 5, 7, "forcing_short_range"),
        (1, 3, 3, "short_range_puertorico"),
        (1, 5, 2, "forcing_short_range_hawaii"),
        (1, 5, 3, "forcing_short_range_puertorico"),
        (5, 5, 7, "forcing_analysis_assim"),
        (6, 5, 7, "forcing_analysis_assim_extend"),
        (5, 3, 3, "analysis_assim_puertorico"),
        (10, 3, 3, "analysis_assim_puertorico_no_da"),
        (1, 3, 3, "short_range_puertorico"),
        (11, 3, 3, "short_range_puertorico_no_da"),
        (2, 2, 2, "default_value"),  # unmatched combination falls to default
    ],
)
def test_run_type(runinput, varinput, geoinput, expected_output):
    assert run_type(runinput, varinput, geoinput, "default_value") == expected_output


def test_fhprefix():
    # analysis_assim family -> "tm"; forecast products -> "f"
    assert fhprefix(5) == "tm"
    assert fhprefix(1) == "f"
    assert fhprefix(10) == "tm"


def test_varsuffix():
    assert varsuffix(1) == "_1"
    assert varsuffix(7) == "_7"
    assert varsuffix(8) == ""


def test_run_typesuffix():
    assert run_typesuffix(1) == "_mem1"
    assert run_typesuffix(7) == "_mem7"
    assert run_typesuffix(8) == ""


def test_select_forecast_cycle():
    assert select_forecast_cycle(12, 0) == 12
    assert select_forecast_cycle(None, 0) == 0


def test_select_lead_time():
    assert select_lead_time(240, 0) == 240
    assert select_lead_time(None, 0) == 0


def test_selecturlbase():
    assert selecturlbase({1: "https://example.com/"}, 1) == "https://example.com/"
    assert selecturlbase({1: "https://example.com/"}, 2, "default") == "default"


# Shared fixtures for the create_file_list cases below.
fcst_cycle_values = list(range(24))
lead_time_values = [1, 2, 240]
# Every base URL a generated file URL may start with.
valid_base_urls = [
    "",
    "https://nomads.ncep.noaa.gov/pub/data/nccf/com/nwm/prod/",
    "https://storage.googleapis.com/national-water-model/",
    "https://storage.cloud.google.com/national-water-model/",
    "gs://national-water-model/",
    "gcs://national-water-model/",
    "https://noaa-nwm-pds.s3.amazonaws.com/",
    "https://ciroh-nwm-zarr-copy.s3.amazonaws.com/national-water-model/",
]

# Every product folder name a generated URL may contain.
valid_folder_names = [
    "analysis_assim",
    "analysis_assim_alaska",
    "analysis_assim_alaska_no_da",
    "analysis_assim_coastal_atlgulf",
    "analysis_assim_coastal_hawaii",
    "analysis_assim_coastal_pacific",
    "analysis_assim_coastal_puertorico",
    "analysis_assim_extend",
    "analysis_assim_extend_alaska",
    "analysis_assim_extend_alaska_no_da",
    "analysis_assim_extend_coastal_atlgulf",
    "analysis_assim_extend_coastal_pacific",
    "analysis_assim_extend_no_da",
    "analysis_assim_hawaii",
    "analysis_assim_hawaii_no_da",
    "analysis_assim_long",
    "analysis_assim_long_no_da",
    "analysis_assim_no_da",
    "analysis_assim_puertorico",
    "analysis_assim_puertorico_no_da",
    "forcing_analysis_assim",
    "forcing_analysis_assim_alaska",
    "forcing_analysis_assim_extend",
    "forcing_analysis_assim_extend_alaska",
    "forcing_analysis_assim_hawaii",
    "forcing_analysis_assim_puertorico",
    "forcing_medium_range",
    "forcing_medium_range_alaska",
    "forcing_medium_range_blend",
    "forcing_medium_range_blend_alaska",
    "forcing_short_range",
    "forcing_short_range_alaska",
    "forcing_short_range_hawaii",
    "forcing_short_range_puertorico",
    "long_range_mem1",
    "long_range_mem2",
    "long_range_mem3",
    "long_range_mem4",
    "medium_range_alaska_mem1",
    "medium_range_alaska_mem2",
    "medium_range_alaska_mem3",
    "medium_range_alaska_mem4",
    "medium_range_alaska_mem5",
    "medium_range_alaska_mem6",
    "medium_range_alaska_no_da",
    "medium_range_blend",
    "medium_range_blend_alaska",
    "medium_range_blend_coastal_atlgulf",
    "medium_range_blend_coastal_pacific",
    "medium_range_coastal_atlgulf_mem1",
    "short_range",
    "medium_range",
    "long_range_mem7",
    "medium_range_no_da_mem6",
]
def is_valid_url(url):
    """Best-effort reachability check: True iff HEAD returns a non-error status."""
    import requests  # local import: only needed when URL validation is enabled

    try:
        # Timeout added; RequestException also covers Timeout / invalid URLs,
        # which the previous ConnectionError-only handler let propagate.
        response = requests.head(url, timeout=10)
        return response.status_code < 400
    except requests.RequestException:
        return False


@pytest.mark.parametrize(
    "runinput, varinput, geoinput, meminput, start_date, end_date, fcst_cycle, urlbaseinput, lead_time, expected_output",
    [
        (1, 1, 1, 0, "201809170000", "201809172300", fcst_cycle_values, 3, None, ["expected_file_name_1"]),
        (5, 5, 2, 1, "201809170000", "201809171200", fcst_cycle_values, 1, lead_time_values, ["expected_file_name_2"]),
        (2, 5, 3, 3, "201809170600", "201809171800", fcst_cycle_values, 2, lead_time_values, ["expected_file_name_3"]),
        (1, 1, 5, 4, "201809170200", "201809171400", fcst_cycle_values, 4, lead_time_values, ["expected_file_name_4"]),
        (2, 2, 4, 5, "201809170800", "201809172000", fcst_cycle_values, 5, lead_time_values, ["expected_file_name_5"]),
        (3, 1, 5, 6, "201809171000", "201809172200", fcst_cycle_values, 6, lead_time_values, ["expected_file_name_6"]),
        (4, 2, 5, 7, "201809171200", "201809172400", fcst_cycle_values, 7, lead_time_values, ["expected_file_name_7"]),
        (5, 5, 1, 8, "201809171400", "201809172600", fcst_cycle_values, 8, lead_time_values, ["expected_file_name_8"]),
        (6, 1, 16, 9, "201809171600", "201809172800", fcst_cycle_values, 9, lead_time_values, ["expected_file_name_9"]),
        (8, 5, 3, 12, "201809172200", "201809173400", fcst_cycle_values, 12, lead_time_values, ["expected_file_name_12"]),
        (11, 1, 3, 18, "201809173400", "201809174600", fcst_cycle_values, 18, lead_time_values, ["expected_file_name_18"]),
    ],
)
def test_create_file_list(runinput, varinput, geoinput, meminput, start_date, end_date, fcst_cycle, urlbaseinput, lead_time, expected_output):
    """Smoke-test create_file_list: strings out, recognizable folder names in."""
    file_list = create_file_list(runinput, varinput, geoinput, meminput, start_date, end_date, fcst_cycle, urlbaseinput, lead_time)
    assert isinstance(file_list, list)
    assert all(isinstance(file_name, str) for file_name in file_list)
    for url in file_list:
        # assert is_valid_url(url), f"Invalid URL: {url}"
        assert any(substring in url for substring in valid_folder_names), f"No valid folder name found in URL: {url}"

    # Check that all base URLs exist in the predefined list.
    # NOTE(review): valid_base_urls contains "" so startswith("") is always
    # True, making this assertion vacuous — consider dropping the empty entry.
    for url in file_list:
        assert any(url.startswith(base_url) for base_url in valid_base_urls), f"Invalid base URL in generated URL: {url}"
if __name__ == "__main__":
    pytest.main()


# === nwm_filenames/operational_aws/upload.py ===
from concurrent.futures import ProcessPoolExecutor
import requests
import boto3
from kerchunk.hdf import SingleHdf5ToZarr
import fsspec
import json
import os

# SECURITY: AWS credentials must never be committed to source control.  The
# key pair previously hard-coded here must be treated as compromised and
# rotated.  Credentials now come from the environment (falling back to the
# default boto3 credential chain when unset).
AWS_ACCESS_KEY_ID = os.environ.get("AWS_ACCESS_KEY_ID")
AWS_SECRET_ACCESS_KEY = os.environ.get("AWS_SECRET_ACCESS_KEY")
BUCKET = "ciroh-nwm-zarr-copy"


def download_and_convert_and_upload(filename):
    """Download one NWM NetCDF file, kerchunk it into a Zarr-reference JSON,
    and upload that JSON to S3 mirroring the source folder layout.

    *filename* is one line from filenamelist.txt (trailing newline allowed).
    """
    fileurl = filename.replace("\n", "")

    text_split = fileurl.split("/")
    filename = text_split[-1]
    justname = filename[:-3]  # strip the ".nc" extension
    sub_folder = text_split[-2]
    date_folder = text_split[-3]
    bucket_name = text_split[-4]

    s3 = boto3.client(
        "s3",
        aws_access_key_id=AWS_ACCESS_KEY_ID,
        aws_secret_access_key=AWS_SECRET_ACCESS_KEY,
    )

    res = requests.get(fileurl, allow_redirects=True)
    with open(f"./tmp{justname}.nc", "w+b") as file:
        file.write(res.content)
        converted_data = SingleHdf5ToZarr(file, fileurl).translate()
    with open(f"./{justname}.json", "w") as file2:
        json.dump(converted_data, file2)
    # upload_file takes a path — the previous re-open of the JSON file around
    # this call was unnecessary.  The S3 key's "(unknown)" placeholder is
    # restored to the file's base name.
    s3.upload_file(
        f"./{justname}.json",
        BUCKET,
        f"{bucket_name}/{date_folder}/{sub_folder}/{justname}.json",
    )

    os.remove(f"./{justname}.json")
    os.remove(f"./tmp{justname}.nc")


def main():
    """Fan the URL list out to a process pool (conversion is CPU-bound)."""
    with open("./filenamelist.txt") as file:
        lines = file.readlines()

    with ProcessPoolExecutor() as executor:
        executor.map(download_and_convert_and_upload, lines)


if __name__ == "__main__":
    main()
# === nwm_filenames/operational_aws_api/filename_helpers.py ===
import gevent
import requests
from functools import partial
from tqdm import tqdm


def check_valid_urls(file_list, session=None):
    """Concurrently HEAD-check every URL in *file_list*.

    Returns the subset of URLs that answered HTTP 200.  *session* is
    accepted for backward compatibility but is currently unused.
    """
    progress = tqdm(range(len(file_list)))
    check = partial(check_url, progress)
    # One lightweight greenlet per URL; gevent monkey-patching (done by the
    # importing module) makes the blocking requests calls cooperative.
    jobs = [gevent.spawn(check, url) for url in file_list]
    gevent.joinall(jobs)
    return [job.get() for job in jobs if job.get() is not None]


def check_url(t, file):
    """HEAD-request *file*; return it on HTTP 200, else None.

    *t* is a shared tqdm bar advanced once per URL.  Fix: the progress
    descriptions were f-strings with no placeholder while *filename* was
    computed and never used — the file name is now interpolated.
    """
    filename = file.split("/")[-1]
    try:
        # Timeout keeps one dead host from stalling the whole sweep.
        with requests.head(file, timeout=10) as response:
            if response.status_code == 200:
                t.set_description(f"Found: {filename}")
                t.update(1)
                t.refresh()
                return file
            t.set_description(f"Not Found: {filename}")
            t.update(1)
            t.refresh()
            return None
    except requests.exceptions.RequestException:
        t.set_description(f"Not Found: {filename}")
        t.update(1)
        t.refresh()
        return None


# === nwm_filenames/operational_aws_api/listofnwmfilenames.py ===
from gevent import monkey

monkey.patch_all()
import gevent
from dateutil import rrule
from datetime import datetime, timezone
from itertools import product
# from filename_helpers import check_valid_urls
import time
import requests
# Numeric CLI selections -> NWM product / domain names used in the URLs.
rundict = {
    1: "short_range",
    2: "medium_range",
    3: "medium_range_no_da",
    4: "long_range",
    5: "analysis_assim",
    6: "analysis_assim_extend",
    7: "analysis_assim_extend_no_da",
    8: "analysis_assim_long",
    9: "analysis_assim_long_no_da",
    10: "analysis_assim_no_da",
    11: "short_range_no_da",
}
memdict = {member: f"mem_{member}" for member in range(1, 8)}
vardict = {1: "channel_rt", 2: "land", 3: "reservoir", 4: "terrain_rt", 5: "forcing"}
geodict = {1: "conus", 2: "hawaii", 3: "puertorico"}


def selectvar(vardict, varinput):
    """Resolve a numeric variable selection to its name (KeyError if unknown)."""
    return vardict[varinput]


def selectgeo(geodict, geoinput):
    """Resolve a numeric geography selection to its name (KeyError if unknown)."""
    return geodict[geoinput]


def selectrun(rundict, runinput):
    """Resolve a numeric run selection to its name (KeyError if unknown)."""
    return rundict[runinput]


def makename(
    date,
    run_name,
    var_name,
    fcst_cycle,
    fcst_hour,
    geography,
    run_type,
    fhprefix="",
    runsuffix="",
    varsuffix="",
    run_typesuffix="",
    urlbase_prefix="",
):
    """Assemble one NWM file URL from its preprocessed name components."""
    day_dir = "nwm." + date.strftime("%Y%m%d")
    run_dir = run_type + run_typesuffix
    file_name = (
        f"nwm.t{fcst_cycle:02d}z.{run_name}{runsuffix}"
        f".{var_name}{varsuffix}.{fhprefix}{fcst_hour:03d}.{geography}.nc"
    )
    return "/".join([f"{urlbase_prefix}{day_dir}", run_dir, file_name])
# setting run_type
def run_type(runinput, varinput, geoinput, default=""):
    """Map the numeric run/variable/geography selection to the URL folder name.

    Forcing products (varinput == 5) and the Hawaii / Puerto Rico domains
    have dedicated folders; anything else falls back to *default*.
    Note: an unmatched forcing combination returns None (preserved behavior).
    """
    if varinput == 5:  # forcing
        if runinput == 5 and geoinput == 2:  # analysis_assim, hawaii
            return "forcing_analysis_assim_hawaii"
        elif runinput == 5 and geoinput == 3:  # analysis_assim, puerto rico
            return "forcing_analysis_assim_puertorico"
        elif runinput == 1 and geoinput == 2:  # short_range, hawaii
            return "forcing_short_range_hawaii"
        elif runinput == 1 and geoinput == 3:  # short_range, puerto rico
            return "forcing_short_range_puertorico"
        elif runinput == 5:  # analysis_assim
            return "forcing_analysis_assim"
        elif runinput == 6:  # analysis_assim_extend
            return "forcing_analysis_assim_extend"
        elif runinput == 2:  # medium_range
            return "forcing_medium_range"
        elif runinput == 1:  # short_range
            return "forcing_short_range"
    elif runinput == 5 and geoinput == 3:  # analysis_assim, puerto rico
        return "analysis_assim_puertorico"
    elif runinput == 10 and geoinput == 3:  # analysis_assim_no_da, puerto rico
        return "analysis_assim_puertorico_no_da"
    elif runinput == 1 and geoinput == 3:  # short_range, puerto rico
        return "short_range_puertorico"
    elif runinput == 11 and geoinput == 3:  # short_range_no_da, puerto rico
        return "short_range_puertorico_no_da"
    else:
        return default


def fhprefix(runinput):
    """Prefix for the forecast-hour field of the file name.

    Analysis-and-assimilation products (runinput 5-10) use "tm" (time-minus);
    forecast products use "f".  Fix: the old check ``4 <= runinput <= 10``
    wrongly gave long_range (runinput 4) a "tm" prefix, but real long_range
    files are named ``fNNN``.
    """
    if 5 <= runinput <= 10:
        return "tm"
    return "f"


def varsuffix(meminput):
    """Ensemble-member suffix appended to the variable name ("_1".."_7")."""
    if meminput in range(1, 8):
        return f"_{meminput}"
    return ""


def run_typesuffix(meminput):
    """Ensemble-member suffix appended to the run-type folder ("_mem1".."_mem7")."""
    if meminput in range(1, 8):
        return f"_mem{meminput}"
    return ""


def select_forecast_cycle(fcst_cycle=None, default=None):
    """Return the user-supplied forecast cycles, else *default*.

    Fix: explicit ``is None`` test so falsy-but-meaningful inputs
    (``0``, an empty list) are no longer silently replaced by the default.
    """
    return default if fcst_cycle is None else fcst_cycle


def select_lead_time(lead_time=None, default=None):
    """Return the user-supplied lead times, else *default* (None-safe, see above)."""
    return default if lead_time is None else lead_time


urlbasedict = {
    0: "",
    1: "https://nomads.ncep.noaa.gov/pub/data/nccf/com/nwm/prod/",
    2: "https://nomads.ncep.noaa.gov/pub/data/nccf/com/nwm/post-processed/WMS/",
    3: "https://storage.googleapis.com/national-water-model/",
    4: "https://storage.cloud.google.com/national-water-model/",
    5: "gs://national-water-model/",
    6: "gcs://national-water-model/",
    7: "https://noaa-nwm-pds.s3.amazonaws.com/",
}


def selecturlbase(urlbasedict, urlbaseinput, defaulturlbase=""):
    """Resolve the numeric base-URL selection; unknown keys yield the default."""
    if urlbaseinput in urlbasedict:
        return urlbasedict[urlbaseinput]
    return defaulturlbase
def create_file_list(
    runinput,
    varinput,
    geoinput,
    meminput,
    start_date=None,
    end_date=None,
    fcst_cycle=None,
    urlbaseinput=None,
    lead_time=None,  # TODO: change this order; placed here to avoid breaking change
):
    """For the given run/variable/geography selections, build the list of NWM
    file URLs covering every requested date, forecast cycle, and lead time.

    Unrecognized numeric selections degrade to "..._error" placeholder
    strings; missing or malformed date strings fall back to today (UTC).
    """
    runsuff = ""

    # Narrowed from bare ``except:``: the selectors fail with KeyError
    # (or TypeError for unhashable input); anything else should surface.
    try:
        geography = selectgeo(geodict, geoinput)
    except (KeyError, TypeError):
        geography = "geography_error"
    try:
        run_name = selectrun(rundict, runinput)
    except (KeyError, TypeError):
        run_name = "run_error"
    try:
        var_name = selectvar(vardict, varinput)
    except (KeyError, TypeError):
        var_name = "variable_error"
    try:
        urlbase_prefix = selecturlbase(urlbasedict, urlbaseinput)
    except (KeyError, TypeError):
        urlbase_prefix = "urlbase_error"

    try:
        _dtstart = datetime.strptime(start_date, "%Y%m%d%H%M")
        _until = datetime.strptime(end_date, "%Y%m%d%H%M")
    except (TypeError, ValueError):
        today = datetime.now(timezone.utc)
        _dtstart = today
        _until = today

    dates = rrule.rrule(rrule.DAILY, dtstart=_dtstart, until=_until)
    run_t = run_type(runinput, varinput, geoinput, run_name)
    fhp = fhprefix(runinput)
    vsuff = varsuffix(meminput)
    rtsuff = run_typesuffix(meminput)

    if runinput == 1:  # short_range
        if varinput == 5:  # forcing
            if geoinput == 2:  # hawaii
                prod = product(
                    dates,
                    select_forecast_cycle(fcst_cycle, range(0, 13, 12)),
                    select_lead_time(lead_time, range(1, 49)),
                )
            elif geoinput == 3:  # puertorico
                prod = product(
                    dates,
                    select_forecast_cycle(fcst_cycle, [6]),
                    select_lead_time(lead_time, range(1, 48)),
                )
            else:
                prod = product(
                    dates,
                    select_forecast_cycle(fcst_cycle, range(24)),
                    select_lead_time(lead_time, range(1, 19)),
                )
        elif geoinput == 3:  # puerto rico
            prod = product(
                dates,
                select_forecast_cycle(fcst_cycle, range(6, 19, 12)),
                select_lead_time(lead_time, range(1, 48)),
            )
        else:
            prod = product(
                dates,
                select_forecast_cycle(fcst_cycle, range(24)),
                select_lead_time(lead_time, range(1, 19)),
            )
    elif runinput == 2:  # medium_range
        if varinput == 5:  # forcing
            prod = product(
                dates,
                select_forecast_cycle(fcst_cycle, range(0, 19, 6)),
                select_lead_time(lead_time, range(1, 241)),
            )
        else:
            default_fc = range(0, 19, 6)
            if meminput == 1:
                if varinput in {1, 3}:
                    prod = product(
                        dates,
                        select_forecast_cycle(fcst_cycle, default_fc),
                        select_lead_time(lead_time, range(1, 241)),
                    )
                elif varinput in {2, 4}:
                    # land/terrain are only emitted every 3 hours
                    prod = product(
                        dates,
                        select_forecast_cycle(fcst_cycle, default_fc),
                        select_lead_time(lead_time, range(3, 241, 3)),
                    )
                else:
                    raise ValueError("varinput")
            elif meminput in range(2, 8):
                if varinput in {1, 3}:
                    prod = product(
                        dates,
                        select_forecast_cycle(fcst_cycle, default_fc),
                        select_lead_time(lead_time, range(1, 205)),
                    )
                elif varinput in {2, 4}:
                    prod = product(
                        dates,
                        select_forecast_cycle(fcst_cycle, default_fc),
                        select_lead_time(lead_time, range(3, 205, 3)),
                    )
                else:
                    raise ValueError("varinput")
            else:
                raise ValueError("meminput")
    elif runinput == 3:  # medium_range_no_da
        if varinput == 1:
            prod = product(
                dates,
                select_forecast_cycle(fcst_cycle, range(0, 13, 6)),
                select_lead_time(lead_time, range(3, 240, 3)),
            )
        else:
            raise ValueError("only valid variable for a _no_da type run is channel_rt")
    elif runinput == 4:  # long_range
        default_fc = range(0, 19, 6)
        if varinput in {1, 3}:
            prod = product(
                dates,
                select_forecast_cycle(fcst_cycle, default_fc),
                select_lead_time(lead_time, range(6, 721, 6)),
            )
        elif varinput == 2:
            prod = product(
                dates,
                select_forecast_cycle(fcst_cycle, default_fc),
                select_lead_time(lead_time, range(24, 721, 24)),
            )
        else:
            raise ValueError("varinput")
    elif runinput == 5:  # analysis_assim (simplest form)
        if varinput == 5:  # forcing
            if geoinput == 2:  # hawaii
                prod = product(
                    dates,
                    select_forecast_cycle(fcst_cycle, range(19)),
                    select_lead_time(lead_time, range(3)),
                )
            else:
                prod = product(
                    dates,
                    select_forecast_cycle(fcst_cycle, range(20)),
                    select_lead_time(lead_time, range(3)),
                )
        else:
            prod = product(
                dates,
                select_forecast_cycle(fcst_cycle, range(24)),
                select_lead_time(lead_time, range(3)),
            )
    elif runinput == 6:  # analysis_assim_extend
        prod = product(
            dates,
            select_forecast_cycle(fcst_cycle, [16]),
            select_lead_time(lead_time, range(28)),
        )
    elif runinput == 7:  # analysis_assim_extend_no_da
        if varinput == 1:
            prod = product(
                dates,
                select_forecast_cycle(fcst_cycle, [16]),
                select_lead_time(lead_time, range(28)),
            )
        else:
            raise ValueError("only valid variable for a _no_da type run is channel_rt")
    elif runinput == 8:  # analysis_assim_long
        prod = product(
            dates,
            select_forecast_cycle(fcst_cycle, range(0, 24, 6)),
            select_lead_time(lead_time, range(12)),
        )
    elif runinput == 9:  # analysis_assim_long_no_da
        if varinput == 1:
            prod = product(
                dates,
                select_forecast_cycle(fcst_cycle, range(0, 24, 6)),
                select_lead_time(lead_time, range(12)),
            )
        else:
            raise ValueError("only valid variable for a _no_da type run is channel_rt")
    elif runinput == 10:  # analysis_assim_no_da
        if varinput == 1:
            prod = product(
                dates,
                select_forecast_cycle(fcst_cycle, range(21)),
                select_lead_time(lead_time, range(3)),
            )
        else:
            raise ValueError("only valid variable for a _no_da type run is channel_rt")
    elif runinput == 11 and geoinput == 3:  # short_range_puertorico_no_da
        if varinput == 1:
            prod = product(
                dates,
                select_forecast_cycle(fcst_cycle, range(6, 19, 12)),
                select_lead_time(lead_time, range(1, 49)),
            )
        else:
            raise ValueError("only valid variable for a _no_da type run is channel_rt")
    else:
        raise ValueError("run error")

    return [
        makename(
            _dt,
            run_name,
            var_name,
            _fc,
            _fh,
            geography,
            run_t,
            fhp,
            runsuff,
            vsuff,
            rtsuff,
            urlbase_prefix,
        )
        for _dt, _fc, _fh in prod
    ]


def main():
    """Build candidate URLs, then confirm their existence with the GCS
    object-listing API instead of per-file HEAD requests."""
    start_date = "201809170000"
    end_date = "201809172300"
    fcst_cycle = list(range(24))
    # NOTE(review): lead_time is defined but never passed to
    # create_file_list below — confirm whether it should be.
    lead_time = [1, 2, 240]
    # fcst_cycle = None  # Retrieves a full day for each day within the range given.
    runinput = 1
    varinput = 5
    geoinput = 1
    meminput = 0
    urlbaseinput = 3

    file_list = create_file_list(
        runinput,
        varinput,
        geoinput,
        meminput,
        start_date,
        end_date,
        fcst_cycle,
        urlbaseinput,
    )
    if len(file_list) == 0:
        print("No files found")
    else:
        # print(f"Files: {file_list}\nTotal files: {len(file_list)}")
        gcs_base = "https://storage.googleapis.com/national-water-model/"
        # One listing request per unique date/run folder.
        paths = link_search_algo(file_list)
        tasks = [gevent.spawn(check_directory, path) for path in paths]
        gevent.joinall(tasks)
        all_links = []
        for task in tasks:
            for link in task.value:
                # Keep only listed objects we actually asked for.
                if gcs_base + link in file_list:
                    all_links.append(gcs_base + link)

        print(f"{len(all_links)} files found")
        # valid_file_list = check_valid_urls(file_list)
        # print(f"Valid Files: {valid_file_list}\nValid files: {len(valid_file_list)}")

        # Write inside the else branch: all_links does not exist when no
        # files were generated (previously a latent NameError).
        with open("filenamelist.txt", "w") as file:
            for item in all_links:
                file.write(f"{item}\n")
        print("Success")
# NOTE(review): fragment — tail of main(), link_search_algo(), GCS constants, check_directory(),
# head of loop_check(). Concrete defects observed, not fixed here (span is cut mid-definition):
#   * check_directory(): if requests.get() raises, the except clause only prints and falls
#     through — local `json` is never bound, so the next line raises NameError instead of
#     returning []. It should return [] (or re-raise) inside the except.
#   * Local name `json` shadows the stdlib module name — rename to e.g. `payload`.
#   * Bare `except:` around the items extraction swallows everything, including KeyboardInterrupt.
#   * link_search_algo(): `if not path in paths` should read `path not in paths`; list membership
#     is O(n) per check — a set (or dict for order) would be linear overall.
#   * main(): `print(f"No files found")` is an f-string with no placeholders; the
#     "https://storage.googleapis.com/national-water-model/" prefix is repeated — hoist it.
#   * main() filters listing results against file_list with a list `in` test — O(n*m);
#     a set of file_list would make it O(n).
# gevent/requests names here rely on the imports + monkey.patch_all() at the top of this file.
= [0,1,2,3,4,5,6,7,8,9,10,11,12,13,14,15,16,17,18,19,20,21,22,23] + lead_time = [1, 2, 240] + # fcst_cycle = None # Retrieves a full day for each day within the range given. + runinput = 1 + varinput = 5 + geoinput = 1 + meminput = 0 + urlbaseinput = 3 + + file_list = create_file_list( + runinput, + varinput, + geoinput, + meminput, + start_date, + end_date, + fcst_cycle, + urlbaseinput, + ) + if len(file_list) == 0: + print(f"No files found") + else: + #print(f"Files: {file_list}\nTotal files: {len(file_list)}") + paths = link_search_algo(file_list) + tasks = [gevent.spawn(check_directory,path) for path in paths] + gevent.joinall(tasks) + all_links = [] + for task in tasks: + l = task.value + for link in l: + if "https://storage.googleapis.com/national-water-model/"+link in file_list: + all_links.append("https://storage.googleapis.com/national-water-model/"+link) + + print(f"{len(all_links)} files found") + #valid_file_list = check_valid_urls(file_list) + #print(f"Valid Files: {valid_file_list}\nValid files: {len(valid_file_list)}") + + with open("filenamelist.txt", "w") as file: + for item in all_links: + file.write(f"{item}\n") + print("Success") + +def link_search_algo(file_list): + paths = [] + for file_name in file_list: + text_sp = file_name.split("/") + path = "/".join(text_sp[-3:-1])+"/" + if not path in paths: + paths.append(path) + return paths + +DOWNLOAD_LINK = "https://storage.googleapis.com/download/storage/v1/b/national-water-model/o/" +URL = "https://storage.googleapis.com/storage/v1/b/national-water-model/o?delimiter=/&prefix=" +TOKEN_PREFIX = "&pageToken=" + +def check_directory(path): + try: + json = requests.get(URL+path).json() + except Exception as e: + print(f"Couldn't fetch {URL}{path}") + try: + found_files = [i["name"] for i in json["items"]] + except: + return [] + + if "nextPageToken" in json: + return loop_check(URL+path,json["nextPageToken"]) + found_files + return found_files + +def loop_check(url,token): + try: + json =
# NOTE(review): fragment — tail of loop_check(), the __main__ guard of listofnwmfilenames.py,
# then the start of test_cases.py (new file in the diff). Defects observed:
#   * loop_check(): same hazard as check_directory() — on a failed requests.get() the except
#     clause only prints, leaving `json` unbound, so the next line raises NameError. Unlike
#     check_directory() there is no fallback `except` here at all, so a missing "items" key
#     would raise KeyError too.
#   * loop_check() recurses once per nextPageToken page — presumably fine for NWM prefixes,
#     but deep pagination could hit the recursion limit; an iterative loop would be safer.
#   * The local name `json` again shadows the stdlib module.
# test_cases.py imports pytest and the functions under test; test_makename pins the exact
# URL layout produced by makename() (nwm.YYYYMMDD/<folder>/nwm.tHHz....nc).
requests.get(url+TOKEN_PREFIX+token).json() + except Exception as e: + print(f"Couldn't fetch {url}") + + found_files = [i["name"] for i in json["items"]] + + if "nextPageToken" in json: + return loop_check(url,json["nextPageToken"]) + found_files + return found_files + + +if __name__ == "__main__": + start = time.time() + main() + print(time.time() - start) diff --git a/nwm_filenames/operational_aws_api/test_cases.py b/nwm_filenames/operational_aws_api/test_cases.py new file mode 100644 index 0000000..83242bf --- /dev/null +++ b/nwm_filenames/operational_aws_api/test_cases.py @@ -0,0 +1,207 @@ +import pytest +from datetime import datetime +from listofnwmfilenames import ( + selectvar, + selectgeo, + selectrun, + makename, + run_type, + fhprefix, + varsuffix, + run_typesuffix, + select_forecast_cycle, + select_lead_time, + selecturlbase, + create_file_list, +) + + +def test_selectvar(): + assert selectvar({1: "channel_rt"}, 1) == "channel_rt" + + +def test_selectgeo(): + assert selectgeo({1: "conus"}, 1) == "conus" + + +def test_selectrun(): + assert selectrun({1: "short_range"}, 1) == "short_range" + + +def test_makename(): + assert makename( + datetime(2022, 1, 1, 0, 0, 0, 0), + "short_range", + "channel_rt", + 0, + 1, + "conus", + "forcing", + fhprefix="f", + runsuffix="_test", + varsuffix="_test", + run_typesuffix="_test", + urlbase_prefix="https://example.com/", + ) == "https://example.com/nwm.20220101/forcing_test/nwm.t00z.short_range_test.channel_rt_test.f001.conus.nc" + +@pytest.mark.parametrize("runinput, varinput, geoinput, expected_output", [ + (5, 5, 2, "forcing_analysis_assim_hawaii"), + (5, 5, 3, "forcing_analysis_assim_puertorico"), + (2, 5, 7, "forcing_medium_range"), + (1, 5, 7, "forcing_short_range"), + (1, 3, 3, "short_range_puertorico"), + (1, 5, 2, "forcing_short_range_hawaii"), + (1, 5, 3, "forcing_short_range_puertorico"), + (5, 5, 7, "forcing_analysis_assim"), + (6, 5, 7, "forcing_analysis_assim_extend"), + (5, 3, 3,
# NOTE(review): fragment — middle of test_cases.py, cut mid-parametrize at both edges.
# Covers: the rest of the run_type parametrize table (including a default-value case
# (2, 2, 2, "default_value") that exercises run_type's `default` parameter), unit tests for
# fhprefix/varsuffix/run_typesuffix/select_forecast_cycle/select_lead_time/selecturlbase,
# and module-level fixture data (fcst_cycle_values, lead_time_values, valid_base_urls,
# head of valid_folder_names). Observations:
#   * (1, 3, 3, "short_range_puertorico") appears twice in the parametrize list — the
#     duplicate adds runtime but no coverage.
#   * The fixture lists duplicate production constants by hand; presumably they must be kept
#     in sync with listofnwmfilenames.py — confirm whether they can be imported instead.
"analysis_assim_puertorico"), + (10, 3, 3, "analysis_assim_puertorico_no_da"), + (1, 3, 3, "short_range_puertorico"), + (11, 3, 3, "short_range_puertorico_no_da"), + (2, 2, 2, "default_value") # Add a test case for default value +]) +def test_run_type(runinput, varinput, geoinput, expected_output): + assert run_type(runinput, varinput, geoinput, "default_value") == expected_output + + +def test_fhprefix(): + assert fhprefix(5) == "tm" + assert fhprefix(1) == "f" + assert fhprefix(10) == "tm" + + +def test_varsuffix(): + assert varsuffix(1) == "_1" + assert varsuffix(7) == "_7" + assert varsuffix(8) == "" + + +def test_run_typesuffix(): + assert run_typesuffix(1) == "_mem1" + assert run_typesuffix(7) == "_mem7" + assert run_typesuffix(8) == "" + + +def test_select_forecast_cycle(): + assert select_forecast_cycle(12, 0) == 12 + assert select_forecast_cycle(None, 0) == 0 + + +def test_select_lead_time(): + assert select_lead_time(240, 0) == 240 + assert select_lead_time(None, 0) == 0 + + +def test_selecturlbase(): + assert selecturlbase({1: "https://example.com/"}, 1) == "https://example.com/" + assert selecturlbase({1: "https://example.com/"}, 2, "default") == "default" + +fcst_cycle_values = [0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 23] +lead_time_values = [1, 2, 240] +valid_base_urls = [ + "", + "https://nomads.ncep.noaa.gov/pub/data/nccf/com/nwm/prod/", + "https://storage.googleapis.com/national-water-model/", + "https://storage.cloud.google.com/national-water-model/", + "gs://national-water-model/", + "gcs://national-water-model/", + "https://noaa-nwm-pds.s3.amazonaws.com/", + "https://ciroh-nwm-zarr-copy.s3.amazonaws.com/national-water-model/", +] + +valid_folder_names = [ + "analysis_assim", + "analysis_assim_alaska", + "analysis_assim_alaska_no_da", + "analysis_assim_coastal_atlgulf", + "analysis_assim_coastal_hawaii", + "analysis_assim_coastal_pacific", + "analysis_assim_coastal_puertorico", + "analysis_assim_extend",
# NOTE(review): fragment — continuation of valid_folder_names, a mid-file `import requests`
# (should sit at the top of test_cases.py with the other imports), is_valid_url(), and the
# head of the test_create_file_list parametrize table (cut mid-tuple at line end).
# Observations:
#   * is_valid_url() catches only requests.ConnectionError — a Timeout or other
#     RequestException would propagate and fail the test run; consider catching
#     requests.exceptions.RequestException. It also issues a live network HEAD request,
#     which makes the suite network-dependent (it is in fact commented out at the call site).
#   * requests.head() here has no timeout — a hung server would stall the suite.
+ "analysis_assim_extend_alaska", + "analysis_assim_extend_alaska_no_da", + "analysis_assim_extend_coastal_atlgulf", + "analysis_assim_extend_coastal_pacific", + "analysis_assim_extend_no_da", + "analysis_assim_hawaii", + "analysis_assim_hawaii_no_da", + "analysis_assim_long", + "analysis_assim_long_no_da", + "analysis_assim_no_da", + "analysis_assim_puertorico", + "analysis_assim_puertorico_no_da", + "forcing_analysis_assim", + "forcing_analysis_assim_alaska", + "forcing_analysis_assim_extend", + "forcing_analysis_assim_extend_alaska", + "forcing_analysis_assim_hawaii", + "forcing_analysis_assim_puertorico", + "forcing_medium_range", + "forcing_medium_range_alaska", + "forcing_medium_range_blend", + "forcing_medium_range_blend_alaska", + "forcing_short_range", + "forcing_short_range_alaska", + "forcing_short_range_hawaii", + "forcing_short_range_puertorico", + "long_range_mem1", + "long_range_mem2", + "long_range_mem3", + "long_range_mem4", + "medium_range_alaska_mem1", + "medium_range_alaska_mem2", + "medium_range_alaska_mem3", + "medium_range_alaska_mem4", + "medium_range_alaska_mem5", + "medium_range_alaska_mem6", + "medium_range_alaska_no_da", + "medium_range_blend", + "medium_range_blend_alaska", + "medium_range_blend_coastal_atlgulf", + "medium_range_blend_coastal_pacific", + "medium_range_coastal_atlgulf_mem1", + "short_range", + "medium_range", + "long_range_mem7", + "medium_range_no_da_mem6" +] +import requests + +def is_valid_url(url): + try: + response = requests.head(url) + return response.status_code < 400 + except requests.ConnectionError: + return False + + +@pytest.mark.parametrize("runinput, varinput, geoinput, meminput, start_date, end_date, fcst_cycle, urlbaseinput, lead_time, expected_output", [ + (1, 1, 1, 0, "201809170000", "201809172300", fcst_cycle_values, 3, None, ["expected_file_name_1"]), + (5, 5, 2, 1, "201809170000", "201809171200", fcst_cycle_values, 1, lead_time_values, ["expected_file_name_2"]), + (2, 5, 3, 3, "201809170600",
# NOTE(review): fragment — tail of the test_create_file_list parametrize table, the test body,
# the pytest __main__ guard (file lacks a trailing newline per the diff), and the diff header
# of upload.py. Observations:
#   * expected_output ("expected_file_name_N") is accepted as a parameter but never asserted
#     against — the test only checks types, folder names, and base URLs, so the expected lists
#     are dead fixtures.
#   * The is_valid_url assertion is commented out, so no network check actually runs.
#   * Several tuples use geoinput/urlbaseinput values (5, 16, 12, 18...) outside the dicts seen
#     in this diff — presumably they exercise error paths; confirm create_file_list's handling.
"201809171800", fcst_cycle_values, 2, lead_time_values, ["expected_file_name_3"]), + (1, 1, 5, 4, "201809170200", "201809171400", fcst_cycle_values, 4, lead_time_values, ["expected_file_name_4"]), + (2, 2, 4, 5, "201809170800", "201809172000", fcst_cycle_values, 5, lead_time_values, ["expected_file_name_5"]), + (3, 1, 5, 6, "201809171000", "201809172200", fcst_cycle_values, 6, lead_time_values, ["expected_file_name_6"]), + (4, 2, 5, 7, "201809171200", "201809172400", fcst_cycle_values, 7, lead_time_values, ["expected_file_name_7"]), + (5, 5, 1, 8, "201809171400", "201809172600", fcst_cycle_values, 8, lead_time_values, ["expected_file_name_8"]), + (6, 1, 16, 9, "201809171600", "201809172800", fcst_cycle_values, 9, lead_time_values, ["expected_file_name_9"]), + (8, 5, 3, 12, "201809172200", "201809173400", fcst_cycle_values, 12, lead_time_values, ["expected_file_name_12"]), + (11, 1, 3, 18, "201809173400", "201809174600", fcst_cycle_values, 18, lead_time_values, ["expected_file_name_18"]), +]) +def test_create_file_list(runinput, varinput, geoinput, meminput, start_date, end_date, fcst_cycle, urlbaseinput, lead_time, expected_output): + file_list = create_file_list(runinput, varinput, geoinput, meminput, start_date, end_date, fcst_cycle, urlbaseinput, lead_time) + assert isinstance(file_list, list) + assert all(isinstance(file_name, str) for file_name in file_list) + for url in file_list: + # assert is_valid_url(url), f"Invalid URL: {url}" + assert any(substring in url for substring in valid_folder_names), f"No valid folder name found in URL: {url}" + + + # Check if all base URLs exist in the predefined list + for url in file_list: + assert any(url.startswith(base_url) for base_url in valid_base_urls), f"Invalid base URL in generated URL: {url}" + + + +if __name__ == "__main__": + pytest.main() \ No newline at end of file diff --git a/nwm_filenames/operational_aws_api/upload.py b/nwm_filenames/operational_aws_api/upload.py new file mode 100644 index 0000000..62fe09f
"""Download NWM NetCDF files, build kerchunk/Zarr JSON indexes, upload them to S3.

Reads one URL per line from ./filenamelist.txt and fans the work out over a
process pool.
"""
from concurrent.futures import ProcessPoolExecutor
import json
import os

import boto3
import requests
from kerchunk.hdf import SingleHdf5ToZarr

# SECURITY FIX: the original committed a live AWS access key id and secret key
# to source control. Those credentials must be considered compromised and
# rotated. Read them from the environment instead (boto3 also falls back to its
# standard credential chain when these are None).
AWS_ACCESS_KEY_ID = os.environ.get("AWS_ACCESS_KEY_ID")
AWS_SECRET_ACCESS_KEY = os.environ.get("AWS_SECRET_ACCESS_KEY")
BUCKET = "ciroh-nwm-zarr-copy"


def download_and_convert_and_upload(filename):
    """Fetch one NetCDF file, translate it to a kerchunk JSON index, upload to S3.

    Parameters
    ----------
    filename : str
        One line from filenamelist.txt — a full https URL to a .nc file,
        possibly with a trailing newline.

    Side effects: writes and removes ./tmp<name>.nc and ./<name>.json scratch
    files; uploads <name>.json to BUCKET under the source URL's trailing
    <bucket>/<date>/<subfolder>/ path.
    """
    fileurl = filename.replace("\n", "")

    # Derive the destination key from the URL's trailing components:
    #   .../<bucket_name>/<date_folder>/<sub_folder>/<filename>.nc
    text_split = fileurl.split("/")
    filename = text_split[-1]
    justname = filename[:-3]  # strip the ".nc" extension
    sub_folder = text_split[-2]
    date_folder = text_split[-3]
    bucket_name = text_split[-4]

    s3 = boto3.client(
        "s3",
        aws_access_key_id=AWS_ACCESS_KEY_ID,
        aws_secret_access_key=AWS_SECRET_ACCESS_KEY,
    )

    res = requests.get(fileurl, allow_redirects=True)
    res.raise_for_status()  # don't convert/upload an HTML error page

    tmp_nc = f"./tmp{justname}.nc"
    json_path = f"./{justname}.json"
    try:
        with open(tmp_nc, "w+b") as file:
            file.write(res.content)
            # BUG FIX: rewind before handing the handle to kerchunk — after
            # write() the position is at EOF, so the original translated an
            # effectively empty stream.
            file.seek(0)
            converted_data = SingleHdf5ToZarr(file, fileurl).translate()
        with open(json_path, "w") as file2:
            json.dump(converted_data, file2)
        # NOTE: the original also opened the JSON file in read mode around this
        # call without using the handle — upload_file takes a path, so the
        # extra open was dead code and has been dropped.
        s3.upload_file(
            json_path,
            BUCKET,
            # Key reconstructed as <justname>.json — the original line was
            # redacted to "(unknown).json"; TODO confirm against the bucket.
            f"{bucket_name}/{date_folder}/{sub_folder}/{justname}.json",
        )
    finally:
        # ROBUSTNESS: the original leaked the scratch files whenever the
        # conversion or upload raised; always clean up.
        for path in (json_path, tmp_nc):
            if os.path.exists(path):
                os.remove(path)


def main():
    """Process every URL in ./filenamelist.txt across a process pool."""
    with open("./filenamelist.txt") as file:
        lines = file.readlines()

    with ProcessPoolExecutor() as executor:
        # Drain the iterator so exceptions raised in workers propagate here
        # instead of being silently discarded.
        list(executor.map(download_and_convert_and_upload, lines))


if __name__ == "__main__":
    main()