Skip to content

Commit

Permalink
viz shared functions cleanup (#1043)
Browse files Browse the repository at this point in the history
Some improvements to the shared lambda functions.
1. `file_step` now uses ISO 8601 duration formatting (e.g. `PT1H` instead of `1H`)
2. Remove pandas dependency in `generate_file_list`
3. Made tokenizing regex more robust
  • Loading branch information
groutr authored Jan 16, 2025
1 parent 27c5919 commit 0f4007f
Show file tree
Hide file tree
Showing 13 changed files with 34 additions and 33 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -661,15 +661,10 @@ def parse_range_token_value(reference_date_file, range_token, existing_list = []


def get_file_tokens(file_pattern):
    """Extract ``{{key:value}}`` template tokens from a file pattern.

    Args:
        file_pattern: Path template containing tokens such as
            ``{{datetime:%Y%m%d}}``, ``{{range:1,19,1,%03d}}`` or
            ``{{variable:NWM_DATAFLOW_VERSION}}``.

    Returns:
        Dict mapping each known token key (``'datetime'``, ``'range'``,
        ``'variable'``) to the list of token values found for that key,
        in order of appearance. Keys with no matches map to empty lists.

    Raises:
        KeyError: If the pattern contains a token key outside the three
            known keys.
    """
    # The quantifiers must be INSIDE the capture groups: with
    # "(?P<key>[a-z])+" each group would capture only its LAST repeated
    # character (e.g. 'e' for 'datetime'), breaking the dict lookup below.
    tokens = re.findall(r"{{(?P<key>[a-z]+):(?P<value>[^{]+)}}", file_pattern)
    token_dict = {'datetime': [], 'range': [], 'variable': []}
    for key, value in tokens:
        token_dict[key].append(value)
    return token_dict

def parse_datetime_token_value(input_file, reference_date, datetime_token):
Expand Down Expand Up @@ -726,17 +721,17 @@ def get_formatted_files(file_pattern, token_dict, reference_date):
return reference_date_files

def generate_file_list(file_pattern, file_step, file_window, reference_time):
import pandas as pd
import isodate
file_list = []

file_list = []
if 'common/data/model/com/nwm/prod' in file_pattern and (datetime.today() - timedelta(29)) > reference_time:
file_pattern = file_pattern.replace('common/data/model/com/nwm/prod', 'https://storage.googleapis.com/national-water-model')

if file_window:
if not file_step:
file_step = None
reference_dates = pd.date_range(reference_time-isodate.parse_duration(file_window), reference_time, freq=file_step)
raise ValueError("file_window and file_step must be specified together")
start = reference_time - isodate.parse_duration(file_window)
reference_dates = list(date_range(start, reference_time, isodate.parse_duration(file_step)))
else:
reference_dates = [reference_time]

Expand Down Expand Up @@ -766,4 +761,11 @@ def organize_input_files(fileset_bucket, fileset, download_subfolder):
for file in fileset:
download_path = check_if_file_exists(fileset_bucket, file, download=True, download_subfolder=download_subfolder)
local_files.append(download_path)
return local_files
return local_files


def date_range(start, stop, step):
    """Yield successive values from *start*, stepping by *step*, up to *stop*.

    Works with any types supporting ``<`` and ``+`` (e.g. ``datetime`` start/stop
    with a ``timedelta`` step, or plain numbers).

    Yields ``start, start + step, start + 2*step, ...`` while the NEXT value
    would still be <= *stop*; *stop* itself is never yielded.
    NOTE(review): pandas.date_range (which this replaced) includes the
    endpoint — confirm callers do not rely on *stop* being emitted.

    Args:
        start: First candidate value (inclusive).
        stop: Upper bound (exclusive as a yielded value).
        step: Positive increment added each iteration.

    Raises:
        ValueError: If ``start >= stop`` or *step* is not strictly positive.
    """
    # Explicit validation instead of `assert`: asserts are stripped under
    # `python -O`, which would turn bad arguments into an infinite loop
    # (non-positive step) or an empty generator.
    if not start < stop:
        raise ValueError("start must be strictly less than stop")
    if not start + step > start:
        raise ValueError("step must be strictly positive")
    while start + step <= stop:
        yield start
        start += step
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ run_times:

python_preprocessing:
- file_format: common/data/model/com/nwm/{{variable:NWM_DATAFLOW_VERSION}}/nwm.{{datetime:%Y%m%d}}/analysis_assim/nwm.t{{datetime:%H}}z.analysis_assim.channel_rt.tm00.conus.nc
file_step: 1H
file_step: PT1H
file_window: P7D
product: anomaly
lambda_ram: 10gb
Expand All @@ -16,7 +16,7 @@ python_preprocessing:
target_table: ingest.ana_7day_anomaly
target_keys: (feature_id)
- file_format: common/data/model/com/nwm/{{variable:NWM_DATAFLOW_VERSION}}/nwm.{{datetime:%Y%m%d}}/analysis_assim/nwm.t{{datetime:%H}}z.analysis_assim.channel_rt.tm00.conus.nc
file_step: 1H
file_step: PT1H
file_window: P14D
product: anomaly
lambda_ram: 10gb
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,15 +7,15 @@ run_times:

python_preprocessing:
- file_format: common/data/model/com/nwm/{{variable:NWM_DATAFLOW_VERSION}}/nwm.{{datetime:%Y%m%d}}/analysis_assim/nwm.t{{datetime:%H}}z.analysis_assim.channel_rt.tm00.conus.nc
file_step: 1H
file_step: PT1H
file_window: P7D
product: max_values
lambda_ram: 3gb
output_file: max_flows/analysis_assim/{{datetime:%Y%m%d}}/ana_7day_00_max_flows.nc
target_table: ingest.nwm_channel_rt_ana_7day_max
target_keys: (feature_id, streamflow)
- file_format: common/data/model/com/nwm/{{variable:NWM_DATAFLOW_VERSION}}/nwm.{{datetime:%Y%m%d}}/analysis_assim/nwm.t{{datetime:%H}}z.analysis_assim.channel_rt.tm00.conus.nc
file_step: 1H
file_step: PT1H
file_window: P14D
product: max_values
lambda_ram: 3gb
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,15 +8,15 @@ run_times:

python_preprocessing:
- file_format: common/data/model/com/nwm/{{variable:NWM_DATAFLOW_VERSION}}/nwm.{{datetime:%Y%m%d}}/analysis_assim/nwm.t{{datetime:%H}}z.analysis_assim.channel_rt.tm00.conus.nc
file_step: 1H
file_step: PT1H
file_window: P7D
product: max_values
lambda_ram: 3gb
output_file: max_flows/analysis_assim/{{datetime:%Y%m%d}}/ana_7day_00_max_flows.nc
target_table: ingest.nwm_channel_rt_ana_7day_max
target_keys: (feature_id, streamflow)
- file_format: common/data/model/com/nwm/{{variable:NWM_DATAFLOW_VERSION}}/nwm.{{datetime:%Y%m%d}}/analysis_assim/nwm.t{{datetime:%H}}z.analysis_assim.channel_rt.tm00.conus.nc
file_step: 1H
file_step: PT1H
file_window: P14D
product: max_values
lambda_ram: 3gb
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ run: true
raster_input_files:
product_file: ana_past_24hr_snow_melt
file_format: common/data/model/com/nwm/{{variable:NWM_DATAFLOW_VERSION}}/nwm.{{datetime:%Y%m%d}}/analysis_assim/nwm.t{{datetime:%H}}z.analysis_assim.land.tm00.conus.nc
file_step: 1D
file_step: P1D
file_window: P1D

postprocess_sql:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ run: true
raster_input_files:
product_file: ana_past_72hr_snow_water_equivalent_change
file_format: common/data/model/com/nwm/{{variable:NWM_DATAFLOW_VERSION}}/nwm.{{datetime:%Y%m%d}}/analysis_assim/nwm.t{{datetime:%H}}z.analysis_assim.land.tm00.conus.nc
file_step: 1D
file_step: P1D
file_window: P3D

postprocess_sql:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ run: true
raster_input_files:
product_file: past_72hr_accum_precip
file_format: common/data/model/com/nwm/{{variable:NWM_DATAFLOW_VERSION}}/nwm.{{datetime:%Y%m%d}}/forcing_analysis_assim/nwm.t{{datetime:%H}}z.analysis_assim.forcing.tm00.conus.nc
file_step: 1H
file_step: PT1H
file_window: P3D

postprocess_sql:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ run: true
raster_input_files:
product_file: past_72hr_accum_precip
file_format: common/data/model/com/nwm/{{variable:NWM_DATAFLOW_VERSION}}/nwm.{{datetime:%Y%m%d}}/forcing_analysis_assim_alaska/nwm.t{{datetime:%H}}z.analysis_assim.forcing.tm00.alaska.nc
file_step: 1H
file_step: PT1H
file_window: P3D

postprocess_sql:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ run: true
raster_input_files:
product_file: past_72hr_accum_precip
file_format: common/data/model/com/nwm/{{variable:NWM_DATAFLOW_VERSION}}/nwm.{{datetime:%Y%m%d}}/forcing_analysis_assim_hawaii/nwm.t{{datetime:%H}}z.analysis_assim.forcing.tm00.hawaii.nc
file_step: 1H
file_step: PT1H
file_window: P3D

postprocess_sql:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ run: true
raster_input_files:
product_file: past_72hr_accum_precip
file_format: common/data/model/com/nwm/{{variable:NWM_DATAFLOW_VERSION}}/nwm.{{datetime:%Y%m%d}}/forcing_analysis_assim_puertorico/nwm.t{{datetime:%H}}z.analysis_assim.forcing.tm00.puertorico.nc
file_step: 1H
file_step: PT1H
file_window: P3D

postprocess_sql:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ run: true

python_preprocessing:
- file_format: common/data/model/com/nwm/{{variable:NWM_DATAFLOW_VERSION}}/nwm.{{datetime:%Y%m%d}}/short_range/nwm.t{{datetime:%H}}z.short_range.channel_rt.f{{range:1,19,1,%03d}}.conus.nc
file_step: 1H
file_step: PT1H
file_window: PT7H
product: high_water_probability
lambda_ram: 3gb
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ run: true

python_preprocessing:
- file_format: common/data/model/com/nwm/{{variable:NWM_DATAFLOW_VERSION}}/nwm.{{datetime:%Y%m%d}}/short_range/nwm.t{{datetime:%H}}z.short_range.channel_rt.f{{range:1,19,1,%03d}}.conus.nc
file_step: 1H
file_step: PT1H
file_window: PT7H
product: rapid_onset_flooding_probability
lambda_ram: 3gb
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ run_times: # (OPTIONAL) List of reference time hours that this product will run

python_preprocessing: # (OPTIONAL) List of preprocessing y) to run via a lambda function for larger file sets (like max flows or high water probability)
- file_format: file/path/to/date.{{datetime:%Y%m%d}}/data.f{{range:1,19,1,%03d}}.conus.nc # (REQUIRED) File path to data with regex type format for dynamic handling
file_step: 1H # (REQUIRED) If the datetime regex is used, then this determines the time slice that we want. 1H means hourly files, 6H means file every 6 hours, etc
file_step: PT1H # (REQUIRED) If the datetime regex is used, then this determines the time slice that we want, as an ISO 8601 duration. PT1H means hourly files, PT6H means a file every 6 hours, etc
file_window: P7D # (REQUIRED) If the datetime regex is used, then this determines the time range that we want. P1H means the past hour, P7D means the past 7 days, etc
product: type_of_product # (REQUIRED) The product to use in the python_preprocessing lambda function. Most often this will be max_values.
output_file: output/path/to/date.{{datetime:%Y%m%d}}/data.nc # (REQUIRED) File path to output data. The datetime regex will end up being the reference time of the product run
Expand All @@ -17,7 +17,7 @@ python_preprocessing: # (OPTIONAL) List of preprocessing y) to run via a lambda

ingest_files: # (OPTIONAL) Dictionary dictating what filesets to ingest into the DB
- file_format: file/path/to/date.{{datetime:%Y%m%d}}/data.f{{range:1,19,1,%03d}}.conus.nc # (REQUIRED) File path to data with regex type format for dynamic handling
file_step: 1H # (REQUIRED) If the datetime regex is used, then this determines the time slice that we want. 1H means hourly files, 6H means file every 6 hours, etc
file_step: PT1H # (REQUIRED) If the datetime regex is used, then this determines the time slice that we want, as an ISO 8601 duration. PT1H means hourly files, PT6H means a file every 6 hours, etc
file_window: P7D # (REQUIRED) If the datetime regex is used, then this determines the time range that we want. P1H means the past hour, P7D means the past 7 days, etc
target_table: db_table_name # (REQUIRED) Name of the DB table where the fileset will be imported
target_cols: ['feature_id', 'streamflow', 'velocity', 'qBucket'] # (OPTIONAL) Name of columns that will exist on the table once the file has been ingested.
Expand All @@ -38,7 +38,7 @@ db_max_flows: # (OPTIONAL) List of max_flows to run in the database directly
raster_input_files: # (OPTIONAL) List of fim configurations to run
product_file: raster_processing_product # (REQUIRED) Name of the product file used for processing
file_format: file/path/to/date.{{datetime:%Y%m%d}}/data.nc # (REQUIRED) File path to data with regex type format for dynamic handling
file_step: 1H # (REQUIRED) If the datetime regex is used, then this determines the time slice that we want. 1H means hourly files, 6H means file every 6 hours, etc
file_step: PT1H # (REQUIRED) If the datetime regex is used, then this determines the time slice that we want, as an ISO 8601 duration. PT1H means hourly files, PT6H means a file every 6 hours, etc
file_window: P7D # (REQUIRED) If the datetime regex is used, then this determines the time range that we want. P1H means the past hour, P7D means the past 7 days, etc

fim_configs: # (OPTIONAL) List of fim configurations to run
Expand All @@ -48,7 +48,7 @@ fim_configs: # (OPTIONAL) List of fim configurations to run
sql_file: fim_data_prep_sql_file_name # (OPTIONAL) Name of the fim data prep sql file
preprocess: # (OPTIONAL) Dictionary dictating what files to preprocess for the fim config if needed
- file_format: file/path/to/date.{{datetime:%Y%m%d}}/data.nc # (REQUIRED) File path to data with regex type format for dynamic handling
file_step: 1H # (REQUIRED) If the datetime regex is used, then this determines the time slice that we want. 1H means hourly files, 6H means file every 6 hours, etc
file_step: PT1H # (REQUIRED) If the datetime regex is used, then this determines the time slice that we want, as an ISO 8601 duration. PT1H means hourly files, PT6H means a file every 6 hours, etc
file_window: P7D # (REQUIRED) If the datetime regex is used, then this determines the time range that we want. P1H means the past hour, P7D means the past 7 days, etc
output_file: output/path/to/date.{{datetime:%Y%m%d}}/data.nc # (REQUIRED) File path to output data. The datetime regex will end up being the reference time of the product run
postprocess: # (OPTIONAL) Dictionary dictating what files to postprocess for the fim config if needed
Expand All @@ -66,4 +66,3 @@ product_summaries: # (OPTIONAL) List of dictionaries which provides the names o

services: # (REQUIRED) List of services that will be published after the product data is processed
- service_name_1 # (REQUIRED) Name of service to be published

0 comments on commit 0f4007f

Please sign in to comment.