diff --git a/Core/LAMBDA/layers/viz_lambda_shared_funcs/python/viz_lambda_shared_funcs.py b/Core/LAMBDA/layers/viz_lambda_shared_funcs/python/viz_lambda_shared_funcs.py index d3b4b0387..ed178f0ec 100644 --- a/Core/LAMBDA/layers/viz_lambda_shared_funcs/python/viz_lambda_shared_funcs.py +++ b/Core/LAMBDA/layers/viz_lambda_shared_funcs/python/viz_lambda_shared_funcs.py @@ -661,15 +661,10 @@ def parse_range_token_value(reference_date_file, range_token, existing_list = [] def get_file_tokens(file_pattern): - token_dict = {} - tokens = re.findall("{{[a-z]*:[^{]*}}", file_pattern) + tokens = re.findall("{{(?P[a-z])+:(?P[^{])+}}", file_pattern) token_dict = {'datetime': [], 'range': [], 'variable': []} - for token in tokens: - token_key = token.split(":")[0][2:] - token_value = token.split(":")[1][:-2] - - token_dict[token_key].append(token_value) - + for (key, value) in tokens: + token_dict[key].append(value) return token_dict def parse_datetime_token_value(input_file, reference_date, datetime_token): @@ -726,17 +721,17 @@ def get_formatted_files(file_pattern, token_dict, reference_date): return reference_date_files def generate_file_list(file_pattern, file_step, file_window, reference_time): - import pandas as pd import isodate - - file_list = [] + + file_list = [] if 'common/data/model/com/nwm/prod' in file_pattern and (datetime.today() - timedelta(29)) > reference_time: file_pattern = file_pattern.replace('common/data/model/com/nwm/prod', 'https://storage.googleapis.com/national-water-model') if file_window: if not file_step: - file_step = None - reference_dates = pd.date_range(reference_time-isodate.parse_duration(file_window), reference_time, freq=file_step) + raise ValueError("file_window and file_step must be specified together") + start = reference_time - isodate.parse_duration(file_window) + reference_dates = list(date_range(start, reference_time, isodate.parse_duration(file_step))) else: reference_dates = [reference_time] @@ -766,4 +761,11 @@ def organize_input_files(fileset_bucket, fileset, download_subfolder): for file in fileset: download_path = check_if_file_exists(fileset_bucket, file, download=True, download_subfolder=download_subfolder) local_files.append(download_path) - return local_files \ No newline at end of file + return local_files + + +def date_range(start, stop, step): + assert start < stop and start + step > start + while start + step <= stop: + yield start + start += step diff --git a/Core/LAMBDA/viz_functions/viz_initialize_pipeline/product_configs/analysis_assim/ana_anomaly.yml b/Core/LAMBDA/viz_functions/viz_initialize_pipeline/product_configs/analysis_assim/ana_anomaly.yml index 2b20eb9ec..860621d2c 100644 --- a/Core/LAMBDA/viz_functions/viz_initialize_pipeline/product_configs/analysis_assim/ana_anomaly.yml +++ b/Core/LAMBDA/viz_functions/viz_initialize_pipeline/product_configs/analysis_assim/ana_anomaly.yml @@ -7,7 +7,7 @@ run_times: python_preprocessing: - file_format: common/data/model/com/nwm/{{variable:NWM_DATAFLOW_VERSION}}/nwm.{{datetime:%Y%m%d}}/analysis_assim/nwm.t{{datetime:%H}}z.analysis_assim.channel_rt.tm00.conus.nc - file_step: 1H + file_step: PT1H file_window: P7D product: anomaly lambda_ram: 10gb @@ -16,7 +16,7 @@ python_preprocessing: target_table: ingest.ana_7day_anomaly target_keys: (feature_id) - file_format: common/data/model/com/nwm/{{variable:NWM_DATAFLOW_VERSION}}/nwm.{{datetime:%Y%m%d}}/analysis_assim/nwm.t{{datetime:%H}}z.analysis_assim.channel_rt.tm00.conus.nc - file_step: 1H + file_step: PT1H file_window: P14D product: anomaly lambda_ram: 10gb diff --git a/Core/LAMBDA/viz_functions/viz_initialize_pipeline/product_configs/analysis_assim/ana_past_14day_max_high_flow_magnitude.yml b/Core/LAMBDA/viz_functions/viz_initialize_pipeline/product_configs/analysis_assim/ana_past_14day_max_high_flow_magnitude.yml index ac11ad6f1..dfad3a9fa 100644 --- a/Core/LAMBDA/viz_functions/viz_initialize_pipeline/product_configs/analysis_assim/ana_past_14day_max_high_flow_magnitude.yml +++ b/Core/LAMBDA/viz_functions/viz_initialize_pipeline/product_configs/analysis_assim/ana_past_14day_max_high_flow_magnitude.yml @@ -7,7 +7,7 @@ run_times: python_preprocessing: - file_format: common/data/model/com/nwm/{{variable:NWM_DATAFLOW_VERSION}}/nwm.{{datetime:%Y%m%d}}/analysis_assim/nwm.t{{datetime:%H}}z.analysis_assim.channel_rt.tm00.conus.nc - file_step: 1H + file_step: PT1H file_window: P7D product: max_values lambda_ram: 3gb @@ -15,7 +15,7 @@ python_preprocessing: target_table: ingest.nwm_channel_rt_ana_7day_max target_keys: (feature_id, streamflow) - file_format: common/data/model/com/nwm/{{variable:NWM_DATAFLOW_VERSION}}/nwm.{{datetime:%Y%m%d}}/analysis_assim/nwm.t{{datetime:%H}}z.analysis_assim.channel_rt.tm00.conus.nc - file_step: 1H + file_step: PT1H file_window: P14D product: max_values lambda_ram: 3gb diff --git a/Core/LAMBDA/viz_functions/viz_initialize_pipeline/product_configs/analysis_assim/ana_past_14day_max_inundation.yml b/Core/LAMBDA/viz_functions/viz_initialize_pipeline/product_configs/analysis_assim/ana_past_14day_max_inundation.yml index ea03d9fa2..7e31e0df8 100644 --- a/Core/LAMBDA/viz_functions/viz_initialize_pipeline/product_configs/analysis_assim/ana_past_14day_max_inundation.yml +++ b/Core/LAMBDA/viz_functions/viz_initialize_pipeline/product_configs/analysis_assim/ana_past_14day_max_inundation.yml @@ -8,7 +8,7 @@ run_times: python_preprocessing: - file_format: common/data/model/com/nwm/{{variable:NWM_DATAFLOW_VERSION}}/nwm.{{datetime:%Y%m%d}}/analysis_assim/nwm.t{{datetime:%H}}z.analysis_assim.channel_rt.tm00.conus.nc - file_step: 1H + file_step: PT1H file_window: P7D product: max_values lambda_ram: 3gb @@ -16,7 +16,7 @@ python_preprocessing: target_table: ingest.nwm_channel_rt_ana_7day_max target_keys: (feature_id, streamflow) - file_format: common/data/model/com/nwm/{{variable:NWM_DATAFLOW_VERSION}}/nwm.{{datetime:%Y%m%d}}/analysis_assim/nwm.t{{datetime:%H}}z.analysis_assim.channel_rt.tm00.conus.nc - file_step: 1H + file_step: PT1H file_window: P14D product: max_values lambda_ram: 3gb diff --git a/Core/LAMBDA/viz_functions/viz_initialize_pipeline/product_configs/analysis_assim/ana_past_24hr_snow_melt.yml b/Core/LAMBDA/viz_functions/viz_initialize_pipeline/product_configs/analysis_assim/ana_past_24hr_snow_melt.yml index d894a57df..1a5acad75 100644 --- a/Core/LAMBDA/viz_functions/viz_initialize_pipeline/product_configs/analysis_assim/ana_past_24hr_snow_melt.yml +++ b/Core/LAMBDA/viz_functions/viz_initialize_pipeline/product_configs/analysis_assim/ana_past_24hr_snow_melt.yml @@ -6,7 +6,7 @@ run: true raster_input_files: product_file: ana_past_24hr_snow_melt file_format: common/data/model/com/nwm/{{variable:NWM_DATAFLOW_VERSION}}/nwm.{{datetime:%Y%m%d}}/analysis_assim/nwm.t{{datetime:%H}}z.analysis_assim.land.tm00.conus.nc - file_step: 1D + file_step: P1D file_window: P1D postprocess_sql: diff --git a/Core/LAMBDA/viz_functions/viz_initialize_pipeline/product_configs/analysis_assim/ana_past_72hr_snow_water_equivalent_change.yml b/Core/LAMBDA/viz_functions/viz_initialize_pipeline/product_configs/analysis_assim/ana_past_72hr_snow_water_equivalent_change.yml index c8ab4380a..9cbd4a59f 100644 --- a/Core/LAMBDA/viz_functions/viz_initialize_pipeline/product_configs/analysis_assim/ana_past_72hr_snow_water_equivalent_change.yml +++ b/Core/LAMBDA/viz_functions/viz_initialize_pipeline/product_configs/analysis_assim/ana_past_72hr_snow_water_equivalent_change.yml @@ -6,7 +6,7 @@ run: true raster_input_files: product_file: ana_past_72hr_snow_water_equivalent_change file_format: common/data/model/com/nwm/{{variable:NWM_DATAFLOW_VERSION}}/nwm.{{datetime:%Y%m%d}}/analysis_assim/nwm.t{{datetime:%H}}z.analysis_assim.land.tm00.conus.nc - file_step: 1D + file_step: P1D file_window: P3D postprocess_sql: diff --git a/Core/LAMBDA/viz_functions/viz_initialize_pipeline/product_configs/forcing_analysis_assim/ana_past_72hr_accum_precip.yml b/Core/LAMBDA/viz_functions/viz_initialize_pipeline/product_configs/forcing_analysis_assim/ana_past_72hr_accum_precip.yml index 3efcd9193..7fff7e8cb 100644 --- a/Core/LAMBDA/viz_functions/viz_initialize_pipeline/product_configs/forcing_analysis_assim/ana_past_72hr_accum_precip.yml +++ b/Core/LAMBDA/viz_functions/viz_initialize_pipeline/product_configs/forcing_analysis_assim/ana_past_72hr_accum_precip.yml @@ -6,7 +6,7 @@ run: true raster_input_files: product_file: past_72hr_accum_precip file_format: common/data/model/com/nwm/{{variable:NWM_DATAFLOW_VERSION}}/nwm.{{datetime:%Y%m%d}}/forcing_analysis_assim/nwm.t{{datetime:%H}}z.analysis_assim.forcing.tm00.conus.nc - file_step: 1H + file_step: PT1H file_window: P3D postprocess_sql: diff --git a/Core/LAMBDA/viz_functions/viz_initialize_pipeline/product_configs/forcing_analysis_assim_alaska/ana_past_72hr_accum_precip_ak.yml b/Core/LAMBDA/viz_functions/viz_initialize_pipeline/product_configs/forcing_analysis_assim_alaska/ana_past_72hr_accum_precip_ak.yml index 13474f6a6..7a88ee155 100644 --- a/Core/LAMBDA/viz_functions/viz_initialize_pipeline/product_configs/forcing_analysis_assim_alaska/ana_past_72hr_accum_precip_ak.yml +++ b/Core/LAMBDA/viz_functions/viz_initialize_pipeline/product_configs/forcing_analysis_assim_alaska/ana_past_72hr_accum_precip_ak.yml @@ -6,7 +6,7 @@ run: true raster_input_files: product_file: past_72hr_accum_precip file_format: common/data/model/com/nwm/{{variable:NWM_DATAFLOW_VERSION}}/nwm.{{datetime:%Y%m%d}}/forcing_analysis_assim_alaska/nwm.t{{datetime:%H}}z.analysis_assim.forcing.tm00.alaska.nc - file_step: 1H + file_step: PT1H file_window: P3D postprocess_sql: diff --git a/Core/LAMBDA/viz_functions/viz_initialize_pipeline/product_configs/forcing_analysis_assim_hawaii/ana_past_72hr_accum_precip_hi.yml b/Core/LAMBDA/viz_functions/viz_initialize_pipeline/product_configs/forcing_analysis_assim_hawaii/ana_past_72hr_accum_precip_hi.yml index b15d25823..066ace7be 100644 --- a/Core/LAMBDA/viz_functions/viz_initialize_pipeline/product_configs/forcing_analysis_assim_hawaii/ana_past_72hr_accum_precip_hi.yml +++ b/Core/LAMBDA/viz_functions/viz_initialize_pipeline/product_configs/forcing_analysis_assim_hawaii/ana_past_72hr_accum_precip_hi.yml @@ -6,7 +6,7 @@ run: true raster_input_files: product_file: past_72hr_accum_precip file_format: common/data/model/com/nwm/{{variable:NWM_DATAFLOW_VERSION}}/nwm.{{datetime:%Y%m%d}}/forcing_analysis_assim_hawaii/nwm.t{{datetime:%H}}z.analysis_assim.forcing.tm00.hawaii.nc - file_step: 1H + file_step: PT1H file_window: P3D postprocess_sql: diff --git a/Core/LAMBDA/viz_functions/viz_initialize_pipeline/product_configs/forcing_analysis_assim_puertorico/ana_past_72hr_accum_precip_prvi.yml b/Core/LAMBDA/viz_functions/viz_initialize_pipeline/product_configs/forcing_analysis_assim_puertorico/ana_past_72hr_accum_precip_prvi.yml index 39bff21ff..405a6c171 100644 --- a/Core/LAMBDA/viz_functions/viz_initialize_pipeline/product_configs/forcing_analysis_assim_puertorico/ana_past_72hr_accum_precip_prvi.yml +++ b/Core/LAMBDA/viz_functions/viz_initialize_pipeline/product_configs/forcing_analysis_assim_puertorico/ana_past_72hr_accum_precip_prvi.yml @@ -6,7 +6,7 @@ run: true raster_input_files: product_file: past_72hr_accum_precip file_format: common/data/model/com/nwm/{{variable:NWM_DATAFLOW_VERSION}}/nwm.{{datetime:%Y%m%d}}/forcing_analysis_assim_puertorico/nwm.t{{datetime:%H}}z.analysis_assim.forcing.tm00.puertorico.nc - file_step: 1H + file_step: PT1H file_window: P3D postprocess_sql: diff --git a/Core/LAMBDA/viz_functions/viz_initialize_pipeline/product_configs/short_range/srf_12hr_max_high_water_probability.yml b/Core/LAMBDA/viz_functions/viz_initialize_pipeline/product_configs/short_range/srf_12hr_max_high_water_probability.yml index f329b6922..b34d466de 100644 --- a/Core/LAMBDA/viz_functions/viz_initialize_pipeline/product_configs/short_range/srf_12hr_max_high_water_probability.yml +++ b/Core/LAMBDA/viz_functions/viz_initialize_pipeline/product_configs/short_range/srf_12hr_max_high_water_probability.yml @@ -5,7 +5,7 @@ run: true python_preprocessing: - file_format: common/data/model/com/nwm/{{variable:NWM_DATAFLOW_VERSION}}/nwm.{{datetime:%Y%m%d}}/short_range/nwm.t{{datetime:%H}}z.short_range.channel_rt.f{{range:1,19,1,%03d}}.conus.nc - file_step: 1H + file_step: PT1H file_window: PT7H product: high_water_probability lambda_ram: 3gb diff --git a/Core/LAMBDA/viz_functions/viz_initialize_pipeline/product_configs/short_range/srf_12hr_rapid_onset_flooding_probability.yml b/Core/LAMBDA/viz_functions/viz_initialize_pipeline/product_configs/short_range/srf_12hr_rapid_onset_flooding_probability.yml index 1f911a455..31fcdf8dc 100644 --- a/Core/LAMBDA/viz_functions/viz_initialize_pipeline/product_configs/short_range/srf_12hr_rapid_onset_flooding_probability.yml +++ b/Core/LAMBDA/viz_functions/viz_initialize_pipeline/product_configs/short_range/srf_12hr_rapid_onset_flooding_probability.yml @@ -5,7 +5,7 @@ run: true python_preprocessing: - file_format: common/data/model/com/nwm/{{variable:NWM_DATAFLOW_VERSION}}/nwm.{{datetime:%Y%m%d}}/short_range/nwm.t{{datetime:%H}}z.short_range.channel_rt.f{{range:1,19,1,%03d}}.conus.nc - file_step: 1H + file_step: PT1H file_window: PT7H product: rapid_onset_flooding_probability lambda_ram: 3gb diff --git a/Core/LAMBDA/viz_functions/viz_initialize_pipeline/product_configs/template.yml b/Core/LAMBDA/viz_functions/viz_initialize_pipeline/product_configs/template.yml index cbee9966f..dc4a7bf44 100644 --- a/Core/LAMBDA/viz_functions/viz_initialize_pipeline/product_configs/template.yml +++ b/Core/LAMBDA/viz_functions/viz_initialize_pipeline/product_configs/template.yml @@ -8,7 +8,7 @@ run_times: # (OPTIONAL) List of reference time hours that this product will run python_preprocessing: # (OPTIONAL) List of preprocessing y) to run via a lambda function for larger file sets (like max flows or high water probability) - file_format: file/path/to/date.{{datetime:%Y%m%d}}/data.f{{range:1,19,1,%03d}}.conus.nc # (REQUIRED) File path to data with regex type format for dynamic handling - file_step: 1H # (REQUIRED) If the datetime regex is used, then this determines the time slice that we want. 1H means hourly files, 6H means file every 6 hours, etc + file_step: PT1H # (REQUIRED) If the datetime regex is used, then this determines the time slice that we want. 1H means hourly files, 6H means file every 6 hours, etc file_window: P7D # (REQUIRED) If the datetime regex is used, then this determines the time range that we want. P1H means the past hour, P7D means the past 7 days, etc product: type_of_product # (REQUIRED) The product to use in the python_preprocessing lambda function. Most often this will be max_values. output_file: output/path/to/date.{{datetime:%Y%m%d}}/data.nc # (REQUIRED) File path to output data. The datetime regex will end up being the reference time of the product run @@ -17,7 +17,7 @@ python_preprocessing: # (OPTIONAL) List of preprocessing y) to run via a lambda ingest_files: # (OPTIONAL) Dictionary dictating what filesets to ingest into the DB - file_format: file/path/to/date.{{datetime:%Y%m%d}}/data.f{{range:1,19,1,%03d}}.conus.nc # (REQUIRED) File path to data with regex type format for dynamic handling - file_step: 1H # (REQUIRED) If the datetime regex is used, then this determines the time slice that we want. 1H means hourly files, 6H means file every 6 hours, etc + file_step: PT1H # (REQUIRED) If the datetime regex is used, then this determines the time slice that we want. 1H means hourly files, 6H means file every 6 hours, etc file_window: P7D # (REQUIRED) If the datetime regex is used, then this determines the time range that we want. P1H means the past hour, P7D means the past 7 days, etc target_table: db_table_name # (REQUIRED) Name of the DB table where the fileset will be imported target_cols: ['feature_id', 'streamflow', 'velocity', 'qBucket'] # (OPTIONAL) Name of columns that will exist on the table once the file has been ingested. @@ -38,7 +38,7 @@ db_max_flows: # (OPTIONAL) List of max_flows to run in the database directly raster_input_files: # (OPTIONAL) List of fim configurations to run product_file: raster_processing_product # (REQUIRED) Name of the product file used for processing file_format: file/path/to/date.{{datetime:%Y%m%d}}/data.nc # (REQUIRED) File path to data with regex type format for dynamic handling - file_step: 1H # (REQUIRED) If the datetime regex is used, then this determines the time slice that we want. 1H means hourly files, 6H means file every 6 hours, etc + file_step: PT1H # (REQUIRED) If the datetime regex is used, then this determines the time slice that we want. 1H means hourly files, 6H means file every 6 hours, etc file_window: P7D # (REQUIRED) If the datetime regex is used, then this determines the time range that we want. P1H means the past hour, P7D means the past 7 days, etc fim_configs: # (OPTIONAL) List of fim configurations to run @@ -48,7 +48,7 @@ fim_configs: # (OPTIONAL) List of fim configurations to run sql_file: fim_data_prep_sql_file_name # (OPTIONAL) Name of the fim data prep sql file preprocess: # (OPTIONAL) Dictionary dictating what files to preprocess for the fim config if needed - file_format: file/path/to/date.{{datetime:%Y%m%d}}/data.nc # (REQUIRED) File path to data with regex type format for dynamic handling - file_step: 1H # (REQUIRED) If the datetime regex is used, then this determines the time slice that we want. 1H means hourly files, 6H means file every 6 hours, etc + file_step: PT1H # (REQUIRED) If the datetime regex is used, then this determines the time slice that we want. 1H means hourly files, 6H means file every 6 hours, etc file_window: P7D # (REQUIRED) If the datetime regex is used, then this determines the time range that we want. P1H means the past hour, P7D means the past 7 days, etc output_file: output/path/to/date.{{datetime:%Y%m%d}}/data.nc # (REQUIRED) File path to output data. The datetime regex will end up being the reference time of the product run postprocess: # (OPTIONAL) Dictionary dictating what files to postprocess for the fim config if needed @@ -66,4 +66,3 @@ product_summaries: # (OPTIONAL) List of dictionaries which provides the names o services: # (REQUIRED) List of services that will be published after the product data is processed - service_name_1 # (REQUIRED) Name of service to be published -