Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

viz shared functions cleanup #1043

Merged
merged 4 commits into from
Jan 16, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -661,15 +661,10 @@ def parse_range_token_value(reference_date_file, range_token, existing_list = []


def get_file_tokens(file_pattern):
    """Extract ``{{key:value}}`` template tokens from a file pattern.

    Args:
        file_pattern: Path-like string containing zero or more tokens of the
            form ``{{key:value}}``, e.g. ``nwm.{{datetime:%Y%m%d}}``.

    Returns:
        Dict with keys ``'datetime'``, ``'range'`` and ``'variable'``, each
        mapping to the list of token values found for that key, in order of
        appearance. Keys with no tokens map to an empty list.

    Raises:
        KeyError: If a token uses a key other than the three listed above
            (only those keys are pre-initialized in the result dict).
    """
    # The quantifiers must sit INSIDE the groups ([a-z]+ / [^{]+). A repeated
    # group such as (?P<key>[a-z])+ captures only the LAST character of the
    # key (e.g. 'e' for 'datetime'), which would make the lookup below fail.
    tokens = re.findall(r"{{(?P<key>[a-z]+):(?P<value>[^{]+)}}", file_pattern)
    token_dict = {'datetime': [], 'range': [], 'variable': []}
    for key, value in tokens:
        token_dict[key].append(value)
    return token_dict

def parse_datetime_token_value(input_file, reference_date, datetime_token):
Expand Down Expand Up @@ -726,17 +721,17 @@ def get_formatted_files(file_pattern, token_dict, reference_date):
return reference_date_files

def generate_file_list(file_pattern, file_step, file_window, reference_time):
import pandas as pd
import isodate
file_list = []

file_list = []
if 'common/data/model/com/nwm/prod' in file_pattern and (datetime.today() - timedelta(29)) > reference_time:
file_pattern = file_pattern.replace('common/data/model/com/nwm/prod', 'https://storage.googleapis.com/national-water-model')

if file_window:
if not file_step:
file_step = None
reference_dates = pd.date_range(reference_time-isodate.parse_duration(file_window), reference_time, freq=file_step)
raise ValueError("file_window and file_step must be specified together")
start = reference_time - isodate.parse_duration(file_window)
reference_dates = list(date_range(start, reference_time, isodate.parse_duration(file_step)))
else:
reference_dates = [reference_time]

Expand Down Expand Up @@ -766,4 +761,11 @@ def organize_input_files(fileset_bucket, fileset, download_subfolder):
for file in fileset:
download_path = check_if_file_exists(fileset_bucket, file, download=True, download_subfolder=download_subfolder)
local_files.append(download_path)
return local_files
return local_files


def date_range(start, stop, step):
    """Yield values from ``start`` (inclusive) toward ``stop``, ``step`` apart.

    Works for any types supporting ``+`` and ``<`` with ``step`` (e.g.
    ``datetime`` with ``timedelta``, or plain numbers). The sequence is
    ``start, start + step, ...``; the last yielded value ``v`` satisfies
    ``v + step <= stop``, so ``stop`` itself is never yielded.

    Args:
        start: First value yielded.
        stop: Upper bound of the range (exclusive).
        step: Positive increment between consecutive values.

    Yields:
        Successive values from ``start`` in increments of ``step``.

    Raises:
        ValueError: If ``start`` is not before ``stop``, or ``step`` is not
            positive. (Explicit raise rather than ``assert`` — asserts are
            stripped when Python runs with ``-O``.)
    """
    if not start < stop:
        raise ValueError("start must be before stop")
    if not start + step > start:
        raise ValueError("step must be a positive increment")
    current = start
    while current + step <= stop:
        yield current
        current += step
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ run_times:

python_preprocessing:
- file_format: common/data/model/com/nwm/{{variable:NWM_DATAFLOW_VERSION}}/nwm.{{datetime:%Y%m%d}}/analysis_assim/nwm.t{{datetime:%H}}z.analysis_assim.channel_rt.tm00.conus.nc
file_step: 1H
file_step: PT1H
file_window: P7D
product: anomaly
lambda_ram: 10gb
Expand All @@ -16,7 +16,7 @@ python_preprocessing:
target_table: ingest.ana_7day_anomaly
target_keys: (feature_id)
- file_format: common/data/model/com/nwm/{{variable:NWM_DATAFLOW_VERSION}}/nwm.{{datetime:%Y%m%d}}/analysis_assim/nwm.t{{datetime:%H}}z.analysis_assim.channel_rt.tm00.conus.nc
file_step: 1H
file_step: PT1H
file_window: P14D
product: anomaly
lambda_ram: 10gb
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,15 +7,15 @@ run_times:

python_preprocessing:
- file_format: common/data/model/com/nwm/{{variable:NWM_DATAFLOW_VERSION}}/nwm.{{datetime:%Y%m%d}}/analysis_assim/nwm.t{{datetime:%H}}z.analysis_assim.channel_rt.tm00.conus.nc
file_step: 1H
file_step: PT1H
file_window: P7D
product: max_values
lambda_ram: 3gb
output_file: max_flows/analysis_assim/{{datetime:%Y%m%d}}/ana_7day_00_max_flows.nc
target_table: ingest.nwm_channel_rt_ana_7day_max
target_keys: (feature_id, streamflow)
- file_format: common/data/model/com/nwm/{{variable:NWM_DATAFLOW_VERSION}}/nwm.{{datetime:%Y%m%d}}/analysis_assim/nwm.t{{datetime:%H}}z.analysis_assim.channel_rt.tm00.conus.nc
file_step: 1H
file_step: PT1H
file_window: P14D
product: max_values
lambda_ram: 3gb
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,15 +8,15 @@ run_times:

python_preprocessing:
- file_format: common/data/model/com/nwm/{{variable:NWM_DATAFLOW_VERSION}}/nwm.{{datetime:%Y%m%d}}/analysis_assim/nwm.t{{datetime:%H}}z.analysis_assim.channel_rt.tm00.conus.nc
file_step: 1H
file_step: PT1H
file_window: P7D
product: max_values
lambda_ram: 3gb
output_file: max_flows/analysis_assim/{{datetime:%Y%m%d}}/ana_7day_00_max_flows.nc
target_table: ingest.nwm_channel_rt_ana_7day_max
target_keys: (feature_id, streamflow)
- file_format: common/data/model/com/nwm/{{variable:NWM_DATAFLOW_VERSION}}/nwm.{{datetime:%Y%m%d}}/analysis_assim/nwm.t{{datetime:%H}}z.analysis_assim.channel_rt.tm00.conus.nc
file_step: 1H
file_step: PT1H
file_window: P14D
product: max_values
lambda_ram: 3gb
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ run: true
raster_input_files:
product_file: ana_past_24hr_snow_melt
file_format: common/data/model/com/nwm/{{variable:NWM_DATAFLOW_VERSION}}/nwm.{{datetime:%Y%m%d}}/analysis_assim/nwm.t{{datetime:%H}}z.analysis_assim.land.tm00.conus.nc
file_step: 1D
file_step: P1D
file_window: P1D

postprocess_sql:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ run: true
raster_input_files:
product_file: ana_past_72hr_snow_water_equivalent_change
file_format: common/data/model/com/nwm/{{variable:NWM_DATAFLOW_VERSION}}/nwm.{{datetime:%Y%m%d}}/analysis_assim/nwm.t{{datetime:%H}}z.analysis_assim.land.tm00.conus.nc
file_step: 1D
file_step: P1D
file_window: P3D

postprocess_sql:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ run: true
raster_input_files:
product_file: past_72hr_accum_precip
file_format: common/data/model/com/nwm/{{variable:NWM_DATAFLOW_VERSION}}/nwm.{{datetime:%Y%m%d}}/forcing_analysis_assim/nwm.t{{datetime:%H}}z.analysis_assim.forcing.tm00.conus.nc
file_step: 1H
file_step: PT1H
file_window: P3D

postprocess_sql:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ run: true
raster_input_files:
product_file: past_72hr_accum_precip
file_format: common/data/model/com/nwm/{{variable:NWM_DATAFLOW_VERSION}}/nwm.{{datetime:%Y%m%d}}/forcing_analysis_assim_alaska/nwm.t{{datetime:%H}}z.analysis_assim.forcing.tm00.alaska.nc
file_step: 1H
file_step: PT1H
file_window: P3D

postprocess_sql:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ run: true
raster_input_files:
product_file: past_72hr_accum_precip
file_format: common/data/model/com/nwm/{{variable:NWM_DATAFLOW_VERSION}}/nwm.{{datetime:%Y%m%d}}/forcing_analysis_assim_hawaii/nwm.t{{datetime:%H}}z.analysis_assim.forcing.tm00.hawaii.nc
file_step: 1H
file_step: PT1H
file_window: P3D

postprocess_sql:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ run: true
raster_input_files:
product_file: past_72hr_accum_precip
file_format: common/data/model/com/nwm/{{variable:NWM_DATAFLOW_VERSION}}/nwm.{{datetime:%Y%m%d}}/forcing_analysis_assim_puertorico/nwm.t{{datetime:%H}}z.analysis_assim.forcing.tm00.puertorico.nc
file_step: 1H
file_step: PT1H
file_window: P3D

postprocess_sql:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ run: true

python_preprocessing:
- file_format: common/data/model/com/nwm/{{variable:NWM_DATAFLOW_VERSION}}/nwm.{{datetime:%Y%m%d}}/short_range/nwm.t{{datetime:%H}}z.short_range.channel_rt.f{{range:1,19,1,%03d}}.conus.nc
file_step: 1H
file_step: PT1H
file_window: PT7H
product: high_water_probability
lambda_ram: 3gb
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ run: true

python_preprocessing:
- file_format: common/data/model/com/nwm/{{variable:NWM_DATAFLOW_VERSION}}/nwm.{{datetime:%Y%m%d}}/short_range/nwm.t{{datetime:%H}}z.short_range.channel_rt.f{{range:1,19,1,%03d}}.conus.nc
file_step: 1H
file_step: PT1H
file_window: PT7H
product: rapid_onset_flooding_probability
lambda_ram: 3gb
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ run_times: # (OPTIONAL) List of reference time hours that this product will run

python_preprocessing: # (OPTIONAL) List of preprocessing jobs to run via a lambda function for larger file sets (like max flows or high water probability)
- file_format: file/path/to/date.{{datetime:%Y%m%d}}/data.f{{range:1,19,1,%03d}}.conus.nc # (REQUIRED) File path to data with regex type format for dynamic handling
file_step: 1H # (REQUIRED) If the datetime regex is used, then this determines the time slice that we want. 1H means hourly files, 6H means file every 6 hours, etc
file_step: PT1H # (REQUIRED) If the datetime regex is used, then this determines the time slice that we want, as an ISO 8601 duration. PT1H means hourly files, PT6H means a file every 6 hours, etc
file_window: P7D # (REQUIRED) If the datetime regex is used, then this determines the time range that we want, as an ISO 8601 duration. PT1H means the past hour, P7D means the past 7 days, etc
product: type_of_product # (REQUIRED) The product to use in the python_preprocessing lambda function. Most often this will be max_values.
output_file: output/path/to/date.{{datetime:%Y%m%d}}/data.nc # (REQUIRED) File path to output data. The datetime regex will end up being the reference time of the product run
Expand All @@ -17,7 +17,7 @@ python_preprocessing: # (OPTIONAL) List of preprocessing y) to run via a lambda

ingest_files: # (OPTIONAL) Dictionary dictating what filesets to ingest into the DB
- file_format: file/path/to/date.{{datetime:%Y%m%d}}/data.f{{range:1,19,1,%03d}}.conus.nc # (REQUIRED) File path to data with regex type format for dynamic handling
file_step: 1H # (REQUIRED) If the datetime regex is used, then this determines the time slice that we want. 1H means hourly files, 6H means file every 6 hours, etc
file_step: PT1H # (REQUIRED) If the datetime regex is used, then this determines the time slice that we want, as an ISO 8601 duration. PT1H means hourly files, PT6H means a file every 6 hours, etc
file_window: P7D # (REQUIRED) If the datetime regex is used, then this determines the time range that we want, as an ISO 8601 duration. PT1H means the past hour, P7D means the past 7 days, etc
target_table: db_table_name # (REQUIRED) Name of the DB table where the fileset will be imported
target_cols: ['feature_id', 'streamflow', 'velocity', 'qBucket'] # (OPTIONAL) Name of columns that will exist on the table once the file has been ingested.
Expand All @@ -38,7 +38,7 @@ db_max_flows: # (OPTIONAL) List of max_flows to run in the database directly
raster_input_files: # (OPTIONAL) List of fim configurations to run
product_file: raster_processing_product # (REQUIRED) Name of the product file used for processing
file_format: file/path/to/date.{{datetime:%Y%m%d}}/data.nc # (REQUIRED) File path to data with regex type format for dynamic handling
file_step: 1H # (REQUIRED) If the datetime regex is used, then this determines the time slice that we want. 1H means hourly files, 6H means file every 6 hours, etc
file_step: PT1H # (REQUIRED) If the datetime regex is used, then this determines the time slice that we want, as an ISO 8601 duration. PT1H means hourly files, PT6H means a file every 6 hours, etc
file_window: P7D # (REQUIRED) If the datetime regex is used, then this determines the time range that we want, as an ISO 8601 duration. PT1H means the past hour, P7D means the past 7 days, etc

fim_configs: # (OPTIONAL) List of fim configurations to run
Expand All @@ -48,7 +48,7 @@ fim_configs: # (OPTIONAL) List of fim configurations to run
sql_file: fim_data_prep_sql_file_name # (OPTIONAL) Name of the fim data prep sql file
preprocess: # (OPTIONAL) Dictionary dictating what files to preprocess for the fim config if needed
- file_format: file/path/to/date.{{datetime:%Y%m%d}}/data.nc # (REQUIRED) File path to data with regex type format for dynamic handling
file_step: 1H # (REQUIRED) If the datetime regex is used, then this determines the time slice that we want. 1H means hourly files, 6H means file every 6 hours, etc
file_step: PT1H # (REQUIRED) If the datetime regex is used, then this determines the time slice that we want, as an ISO 8601 duration. PT1H means hourly files, PT6H means a file every 6 hours, etc
file_window: P7D # (REQUIRED) If the datetime regex is used, then this determines the time range that we want, as an ISO 8601 duration. PT1H means the past hour, P7D means the past 7 days, etc
output_file: output/path/to/date.{{datetime:%Y%m%d}}/data.nc # (REQUIRED) File path to output data. The datetime regex will end up being the reference time of the product run
postprocess: # (OPTIONAL) Dictionary dictating what files to postprocess for the fim config if needed
Expand All @@ -66,4 +66,3 @@ product_summaries: # (OPTIONAL) List of dictionaries which provides the names o

services: # (REQUIRED) List of services that will be published after the product data is processed
- service_name_1 # (REQUIRED) Name of service to be published

Loading