ANA Anomaly Lambda Storage Fix (#627)
This is a simple bug fix that changes the way the ANA Anomaly service
loops through files in the python_preprocessing lambda function.
Downloaded NWM files are now deleted as their data is loaded into an
in-memory dataframe, so that the lambda's ephemeral storage does not
run out. RAM usage remains stable at ~7 GB (of 10 GB).

This fix was needed because recent heavier weather was causing the
function to fail with a "no space left on device" error.
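
In essence, the change swaps an up-front bulk download for a download-read-delete loop. Below is a minimal sketch of that pattern, assuming a plain boto3 client in place of the repo's check_if_file_exists helper; the function name sum_streamflow and its arguments are illustrative, not part of the commit:

import os
import tempfile

import boto3
import pandas as pd
import xarray as xr

def sum_streamflow(bucket, keys):
    """Sum streamflow across NWM files one at a time, deleting each
    download before the next so /tmp never holds more than one file."""
    s3 = boto3.client("s3")
    df = pd.DataFrame()
    for key in keys:
        # Download to the lambda's ephemeral /tmp volume
        download_path = os.path.join(tempfile.gettempdir(), os.path.basename(key))
        s3.download_file(bucket, key, download_path)
        try:
            # The context manager closes the dataset (and its file handle),
            # so the file can be removed safely afterward.
            with xr.open_dataset(download_path) as ds:
                df_file = ds["streamflow"].to_dataframe()
                df_file["streamflow"] *= 35.3147  # convert cms to cfs
            if df.empty:
                df = df_file.rename(columns={"streamflow": "streamflow_sum"})
            else:
                df["streamflow_sum"] += df_file["streamflow"]
        finally:
            os.remove(download_path)  # free disk before the next download
    return df

With this shape, peak disk usage tracks the largest single file rather than the whole fileset; only the accumulated dataframe grows, which is why RAM (not disk) becomes the binding resource.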
TylerSchrag-NOAA authored and nickchadwick-noaa committed Feb 21, 2024
1 parent ed76fa3 commit f7fa6e8
Showing 1 changed file with 19 additions and 18 deletions.
@@ -4,7 +4,7 @@
 import tempfile
 import boto3
 import os
-from viz_lambda_shared_funcs import check_if_file_exists, organize_input_files
+from viz_lambda_shared_funcs import check_if_file_exists
 
 INSUFFICIENT_DATA_ERROR_CODE = -9998
 PERCENTILE_TABLE_5TH = "viz_authoritative_data/derived_data/nwm_v21_7_day_average_percentiles/final_7day_all_5th_perc.nc"
@@ -43,30 +43,31 @@ def run_anomaly(reference_time, fileset_bucket, fileset, output_file_bucket, out
         percentile_95 = check_if_file_exists(auth_data_bucket, PERCENTILE_14_TABLE_95TH, download=True, download_subfolder=download_subfolder)
     else:
         raise Exception("Anomaly config must be 7 or 14 for the appropriate percentile files")
 
-    print("Downloading NWM Data")
-    input_files = organize_input_files(fileset_bucket, fileset, download_subfolder=reference_time.strftime('%Y%m%d'))
-
     #Get NWM version from first file
-    with xr.open_dataset(input_files[0]) as first_file:
+    first_file_path = check_if_file_exists(fileset_bucket, fileset[0], download=True, download_subfolder=reference_time.strftime('%Y%m%d'))
+    with xr.open_dataset(first_file_path) as first_file:
         nwm_vers = first_file.NWM_version_number.replace("v","")
+    os.remove(first_file_path)
 
-    # Import Feature IDs
+    # Loop through filepaths, download file, and import data into pandas - we have to delete files as we go on anomaly, or else the lambda storage will fill up.
     print("-->Looping through files to get streamflow sum")
     df = pd.DataFrame()
-    for file in input_files:
-        ds_file = xr.open_dataset(file)
-        df_file = ds_file['streamflow'].to_dataframe()
-        df_file['streamflow'] = df_file['streamflow'] * 35.3147 # convert streamflow from cms to cfs
-
-        if df.empty:
-            df = df_file
-            df = df.rename(columns={"streamflow": "streamflow_sum"})
-        else:
-            df['streamflow_sum'] += df_file['streamflow']
-        os.remove(file)
+    for file in fileset:
+        download_path = check_if_file_exists(fileset_bucket, file, download=True, download_subfolder=reference_time.strftime('%Y%m%d'))
+
+        with xr.open_dataset(download_path) as ds_file:
+            df_file = ds_file['streamflow'].to_dataframe()
+            df_file['streamflow'] = df_file['streamflow'] * 35.3147 # convert streamflow from cms to cfs
+
+        if df.empty:
+            df = df_file
+            df = df.rename(columns={"streamflow": "streamflow_sum"})
+        else:
+            df['streamflow_sum'] += df_file['streamflow']
+        os.remove(download_path)
 
-    df[average_flow_col] = df['streamflow_sum'] / len(input_files)
+    df[average_flow_col] = df['streamflow_sum'] / len(fileset)
     df = df.drop(columns=['streamflow_sum'])
     df[average_flow_col] = df[average_flow_col].round(2)
