Cached FIM - Part 1b - Removal of Redshift & Other Optimizations / Fixes (#620)

This PR marks another iterative step toward stabilizing the new cached
FIM implementation. It includes several minor optimizations and fixes,
along with a couple of major pivots to the infrastructure / workflow:

**Major Changes:**
- **Switch from HydroID, HUC8 & Branch indexing of HAND data to a new
unique 'HAND_ID' integer** - We may need to update this later to match a
coordinated effort on the FIM Dev team, but switching to a single unique
integer ID speeds up the database operations significantly and simplifies
the scripting and join logic (a minimal join sketch follows this list).

- **Removal of Redshift Data Warehouse, implementing in RDS instead** - I
initially chose Redshift for this feature after testing a prototype in
early 2023 that was too large for RDS to run. That prototype was based on
the initial [bad] plan to preprocess all ~440 million HAND hydrotable
geometries in advance (all steps of the synthetic rating curves), which
would have required infrastructure capable of querying that entire dataset
efficiently enough for VPP pipelines. After testing our new lazy-loading
approach for several weeks (a much better idea, suggested by Corey), it
has become apparent that most pipelines use only a small portion of the
full hydrotables, and with the HAND_ID optimization, RDS is likely up to
the task of handling these cache workflows just fine (a sketch of this
lazy-load lookup follows the paragraph below).
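
For illustration, here's a minimal sketch of the join simplification this enables (the table names are hypothetical stand-ins, not the actual schema):

```sql
-- Old join logic: composite key across three columns (assumed table names)
SELECT d.rc_stage_ft, g.geom
FROM fim.hydrotable_cached d
JOIN fim.hydrotable_cached_geo g
  ON g.hydro_id = d.hydro_id
 AND g.huc8     = d.huc8
 AND g.branch   = d.branch;

-- New join logic: a single unique integer key - one join condition, one index
SELECT d.rc_stage_ft, g.geom
FROM fim.hydrotable_cached d
JOIN fim.hydrotable_cached_geo g
  ON g.hand_id = d.hand_id;
```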

While I wish I had thought of these considerations earlier and saved the
work of fully trying out Redshift in the first place, I think this is
ultimately a great pivot that dramatically simplifies and stabilizes this
Cached FIM enhancement. The previous PRs of this series can serve as a
reference for the team should they decide to use Redshift in the
future... but it is worth noting that I still hadn't completely sorted
out some issues Redshift was having with the more complex FIM geometries
that weren't included in my initial testing last year - those may end up
being a full-on deal breaker for Redshift (Aurora may be worth a try
first if/when scaling the RDS instance is no longer a good option).
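
As referenced above, here's a rough sketch of the lazy-load lookup that makes RDS viable here; the table names (ingest.max_flows for the current run's reaches, fim.hydrotable_cached for the cache) are assumptions for illustration:

```sql
-- Only reaches missing from the cache get sent on to HAND processing;
-- everything else is served straight from previously cached records.
SELECT mf.hand_id, mf.streamflow_cms
FROM ingest.max_flows mf
LEFT JOIN fim.hydrotable_cached cache
       ON cache.hand_id = mf.hand_id
WHERE cache.hand_id IS NULL;
```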

**Other Noteworthy Edits:**
- **Public FIM Clipping Optimization** - I changed the code to use a
lookup on the derived.channels table to determine which reaches are in
the public subset, instead of doing a spatial join in every pipeline (the
spatial join is what caused MRF to fail during the heavy mid-January
weather). This is much more performant... but it doesn't clip the FIM
extent shapes to the exact border of the public domain, which is an
enhancement Corey added after a specific user request. I'll look for a
more optimal way to do that, but this will at least keep those pipelines
from failing under load (see the sketch below for the difference in approach).
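
Roughly, the difference looks like this (the extent/domain table names and the public_fim_domain flag are assumed for illustration; derived.channels comes from the description above):

```sql
-- Old approach: spatial join of each pipeline's FIM extents against the
-- public-domain polygon (expensive under load)
-- SELECT f.* FROM publish.fim_extents f
-- JOIN derived.public_domain p ON ST_Intersects(f.geom, p.geom);

-- New approach: attribute lookup on derived.channels to flag public reaches
SELECT f.*
FROM publish.fim_extents f
JOIN derived.channels c ON c.feature_id = f.feature_id
WHERE c.public_fim_domain = TRUE;
```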

**Deployment / DB Dump Considerations:**
- I've added several version 2.1.5 DB dump files to the deployed folder
in hydrovis-ti-deployment-us-east-1 (we could move them outside of this
folder if we want to test them during deployment, of course).
- These will need to be added to the UAT S3 folders before deployment
there.
- I purposely avoided any changes to the ArcGIS mapx files... so
hopefully the SD creation script on the EC2 won't cause issues for this.
Hopefully.

**Update:**
I've added a bunch of misc. fixes and clean-up to this branch while it's
been awaiting deployment, and I think I have resolved all issues with the
regular operational pipelines (at least all that I know of). I still need
to fully implement the AEP and CatFIM pipelines, but these should not
hold up deployment to UAT, since they are only run as one-off products
anyway (they will need to be updated next for the NWM 3.0 Recurrence Flow
Updates and/or the next FIM version update). I'm planning to wrap that up
in early February.

---------

Co-authored-by: Nick Chadwick <[email protected]>
TylerSchrag-NOAA and nickchadwick-noaa authored Jan 25, 2024
1 parent eb7ef8d commit 1350ac7
Showing 77 changed files with 512 additions and 1,038 deletions.
73 changes: 0 additions & 73 deletions Core/EC2/RDSBastion/main.tf
@@ -64,26 +64,6 @@ variable "viz_db_name" {
type = string
}

variable "viz_redshift_master_secret_string" {
type = string
}

variable "viz_redshift_address" {
type = string
}

variable "viz_redshift_port" {
type = string
}

variable "viz_redshift_name" {
type = string
}

variable "viz_redshift_iam_role_arn" {
type = string
}

variable "egis_db_master_secret_string" {
type = string
}
@@ -148,10 +128,6 @@ variable "viz_proc_dev_rw_secret_string" {
type = string
}

variable "viz_redshift_user_secret_string" {
type = string
}

variable "fim_version" {
type = string
}
@@ -186,13 +162,6 @@ locals {
db_username = jsondecode(var.viz_db_secret_string)["username"]
db_password = jsondecode(var.viz_db_secret_string)["password"]
}
viz_redshift = {
db_host = var.viz_redshift_address
db_port = var.viz_redshift_port
db_name = var.viz_redshift_name
db_username = jsondecode(var.viz_redshift_master_secret_string)["username"]
db_password = jsondecode(var.viz_redshift_master_secret_string)["password"]
}
egis = {
db_host = var.egis_db_address
db_port = var.egis_db_port
@@ -367,29 +336,6 @@ data "cloudinit_config" "startup" {
})
}

part {
content_type = "text/x-shellscript"
filename = "2b_viz_redshift_setup.sh"
content = templatefile("${path.module}/scripts/viz/redshift_setup.sh.tftpl", {
viz_redshift_name = local.dbs["viz_redshift"]["db_name"]
viz_redshift_host = local.dbs["viz_redshift"]["db_host"]
viz_redshift_port = local.dbs["viz_redshift"]["db_port"]
viz_redshift_master_username = local.dbs["viz_redshift"]["db_username"]
viz_redshift_master_password = local.dbs["viz_redshift"]["db_password"]
viz_redshift_user_username = jsondecode(var.viz_redshift_user_secret_string)["username"]
viz_redshift_user_password = jsondecode(var.viz_redshift_user_secret_string)["password"]
viz_redshift_iam_role_arn = var.viz_redshift_iam_role_arn
viz_db_name = local.dbs["viz"]["db_name"]
viz_db_host = local.dbs["viz"]["db_host"]
viz_db_port = local.dbs["viz"]["db_port"]
viz_db_username = local.dbs["viz"]["db_username"]
viz_db_password = local.dbs["viz"]["db_password"]
viz_proc_admin_rw_secret_arn = var.viz_proc_admin_rw_secret_arn
viz_proc_admin_rw_username = jsondecode(var.viz_proc_admin_rw_secret_string)["username"]
viz_proc_admin_rw_password = jsondecode(var.viz_proc_admin_rw_secret_string)["password"]
})
}

part {
content_type = "text/x-shellscript"
filename = "3_viz_restore_db_dumps.sh"
@@ -447,25 +393,6 @@ data "cloudinit_config" "startup" {
})
}

part {
content_type = "text/x-shellscript"
filename = "4c_viz_setup_fdw_to_redshift.sh"
content = templatefile("${path.module}/scripts/utils/setup_db_link_fdw_to_redshift.tftpl", {
db_name = local.dbs["viz"]["db_name"]
db_host = local.dbs["viz"]["db_host"]
db_port = local.dbs["viz"]["db_port"]
db_username = local.dbs["viz"]["db_username"]
db_password = local.dbs["viz"]["db_password"]
db_admin_mapping = jsondecode(var.viz_db_secret_string)["username"]
db_user_mapping = jsondecode(var.viz_proc_admin_rw_secret_string)["username"]
viz_redshift_name = local.dbs["viz_redshift"]["db_name"]
viz_redshift_host = local.dbs["viz_redshift"]["db_host"]
viz_redshift_port = local.dbs["viz_redshift"]["db_port"]
viz_redshift_user_username = jsondecode(var.viz_redshift_user_secret_string)["username"]
viz_redshift_user_password = jsondecode(var.viz_redshift_user_secret_string)["password"]
})
}

part {
content_type = "text/x-shellscript"
filename = "5_egis_postgresql_setup.sh"

This file was deleted.

7 changes: 0 additions & 7 deletions Core/EC2/RDSBastion/scripts/viz/postgresql_setup.sh.tftpl
@@ -21,13 +21,6 @@ rm "$${postgres_data_folder}/postgis_setup.sql"
echo "Setting up aws_s3..."
psql -h "${viz_db_host}" -U "${viz_db_username}" -p ${viz_db_port} -d "${viz_db_name}" -qtAc "CREATE EXTENSION IF NOT EXISTS aws_s3 CASCADE;"

echo "Setting up db_link..."
psql -h "${viz_db_host}" -U "${viz_db_username}" -p ${viz_db_port} -d "${viz_db_name}" -qtAc "CREATE EXTENSION IF NOT EXISTS dblink CASCADE;"

# Set password encryption to md5 - we need this to play nice with Redshift foreign data connections
echo "Setting password encyption to md5"
psql -h "${viz_db_host}" -U "${viz_db_username}" -p ${viz_db_port} -d "${viz_db_name}" -qtAc "SET password_encryption = 'md5';"

# Adding users to Viz DB
echo "Adding viz proc user..."
psql -h "${viz_db_host}" -U "${viz_db_username}" -p ${viz_db_port} -d "${viz_db_name}" -qtAc "CREATE ROLE ${viz_proc_admin_rw_username};"
27 changes: 0 additions & 27 deletions Core/EC2/RDSBastion/scripts/viz/redshift_setup.sh.tftpl

This file was deleted.

38 changes: 0 additions & 38 deletions Core/IAM/Roles/main.tf
@@ -26,10 +26,6 @@ variable "nws_shared_account_s3_bucket" {
type = string
}

variable "viz_proc_admin_rw_secret_arn" {
type = string
}

# Autoscaling Role
resource "aws_iam_service_linked_role" "autoscaling" {
aws_service_name = "autoscaling.amazonaws.com"
@@ -203,36 +199,6 @@ resource "aws_iam_role_policy" "rds_s3_export" {
})
}

# Redshift Role
resource "aws_iam_role" "redshift" {
name = "hv-vpp-${var.environment}-${var.region}-redshift"

assume_role_policy = jsonencode({
Version = "2012-10-17"
Statement = [
{
Action = "sts:AssumeRole"
Effect = "Allow"
Sid = ""
Principal = {
Service = "redshift.amazonaws.com"
}
},
]
})
}

resource "aws_iam_role_policy" "redshift" {
name = "hv-vpp-${var.environment}-${var.region}-redshift"
role = aws_iam_role.redshift.id
policy = templatefile("${path.module}/redshift.json.tftpl", {
environment = var.environment
account_id = var.account_id
region = var.region
viz_proc_admin_rw_secret_arn = var.viz_proc_admin_rw_secret_arn
})
}

# data-services Role
resource "aws_iam_role" "data_services" {
name = "hv-vpp-${var.environment}-${var.region}-data-services"
@@ -449,10 +415,6 @@ output "role_rds_s3_export" {
value = aws_iam_role.rds_s3_export
}

output "role_redshift" {
value = aws_iam_role.redshift
}

output "profile_data_services" {
value = aws_iam_instance_profile.data_services
}
28 changes: 0 additions & 28 deletions Core/IAM/Roles/redshift.json.tftpl

This file was deleted.

2 changes: 1 addition & 1 deletion Core/LAMBDA/layers/main.tf
@@ -328,7 +328,7 @@ output "requests" {
}

output "yaml" {
value = resource.aws_lambda_layer_version.dask
value = resource.aws_lambda_layer_version.yaml
}

output "dask" {
@@ -19,7 +19,7 @@ basic_output AS (
base.hydro_id,
base.nws_station_id,
base.time,
COALESCE(ana.maxflow_1hour_cms, fcst.flow_cms) as flow_cms
COALESCE(ana.discharge_cms, fcst.flow_cms) as flow_cms
FROM timeslice_base base
LEFT JOIN rnr.domain_forecasts fcst
ON fcst.valid_time = base.time
@@ -94,9 +94,9 @@ def lambda_handler(event, context):

# Upload zero_stage reaches for tracking / FIM cache
print(f"Adding zero stage data to {db_table}_zero_stage")# Only process inundation configuration if available data
df_zero_stage_records['branch'] = int(branch)
df_zero_stage_records['huc8'] = int(huc8)
df_zero_stage_records.to_sql(f"{db_table}_zero_stage", con=process_db.engine, schema=db_schema, if_exists='append', index=True)
df_zero_stage_records = df_zero_stage_records.reset_index()
df_zero_stage_records.drop(columns=['hydro_id','feature_id'], inplace=True)
df_zero_stage_records.to_sql(f"{db_table}_zero_stage", con=process_db.engine, schema=db_schema, if_exists='append', index=False)

# If no features with above zero stages are present, then just copy an unflood raster instead of processing nothing
if stage_lookup.empty:
@@ -107,16 +107,15 @@ def lambda_handler(event, context):
df_inundation = create_inundation_output(huc8, branch, stage_lookup, reference_time, input_variable)

# Split geometry into seperate table per new schema
df_inundation_geo = df_inundation[['hydro_id', 'feature_id', 'huc8', 'branch', 'rc_stage_ft', 'geom']]
df_inundation_geo = df_inundation[['hand_id', 'rc_stage_ft', 'geom']]
df_inundation.drop(columns=['geom'], inplace=True)

# If records exist in stage_lookup that don't exist in df_inundation, add those to the zero_stage table.
df_no_inundation = stage_lookup.merge(df_inundation.drop_duplicates(), on=['feature_id','hydro_id'],how='left',indicator=True)
df_no_inundation = stage_lookup.merge(df_inundation.drop_duplicates(), on=['hand_id'],how='left',indicator=True)
df_no_inundation = df_no_inundation.loc[df_no_inundation['_merge'] == 'left_only']
if df_no_inundation.empty == False:
df_no_inundation.drop(df_no_inundation.columns.difference(['hydro_id','feature_id','huc8','branch','rc_discharge_cms','note']), axis=1, inplace=True)
df_no_inundation['branch'] = int(branch)
df_no_inundation['huc8'] = int(huc8)
print(f"Adding {len(df_no_inundation)} reaches with NaN inundation to zero_stage table")
df_no_inundation.drop(df_no_inundation.columns.difference(['hand_id','rc_discharge_cms','note']), axis=1, inplace=True)
df_no_inundation['note'] = "Error - No inundation returned from hand processing."
df_no_inundation.to_sql(f"{db_table}_zero_stage", con=process_db.engine, schema=db_schema, if_exists='append', index=False)
# If no records exist for valid inundation, stop.
@@ -125,6 +124,7 @@

print(f"Adding data to {db_fim_table}")# Only process inundation configuration if available data
try:
df_inundation.drop(columns=['hydro_id', 'feature_id'], inplace=True)
df_inundation.to_sql(db_table, con=process_db.engine, schema=db_schema, if_exists='append', index=False)
df_inundation_geo.to_postgis(f"{db_table}_geo", con=process_db.engine, schema=db_schema, if_exists='append')
except Exception as e:
@@ -452,8 +452,6 @@ def process(window):
df_final = df_final.rename(columns={"index": "hydro_id"})
df_final['fim_version'] = FIM_VERSION
df_final['reference_time'] = reference_time
df_final['huc8'] = huc8
df_final['branch'] = branch
df_final['forecast_stage_ft'] = round(df_final['stage_m'] * 3.28084, 2)
df_final['prc_method'] = 'HAND_Processing'

25 changes: 3 additions & 22 deletions Core/LAMBDA/viz_functions/main.tf
@@ -114,16 +114,6 @@ variable "egis_db_name" {
type = string
}

variable "viz_redshift_host" {
description = "Hostname of the viz data warehouse redshift cluster."
type = string
}

variable "viz_redshift_db_name" {
description = "DB Name of the viz data warehouse redshift cluster."
type = string
}

variable "viz_db_user_secret_string" {
description = "The secret string of the viz_processing data base user to write/read data as."
type = string
@@ -134,11 +124,6 @@ variable "egis_db_user_secret_string" {
type = string
}

variable "viz_redshift_user_secret_string" {
description = "The secret string of the viz_processing data base user to write/read data as."
type = string
}

variable "egis_portal_password" {
description = "The password for the egis portal user to publish as."
type = string
@@ -412,7 +397,7 @@ resource "aws_s3_object" "python_preprocessing_zip_upload" {
#### 3GB RAM Version ####
#########################
resource "aws_lambda_function" "viz_python_preprocessing_3GB" {
function_name = "hv-vpp-${var.environment}-viz-python-preprocessing-3GB"
function_name = "hv-vpp-${var.environment}-viz-python-preprocessing"
description = "Lambda function to create max streamflow files for NWM data"
memory_size = 3072
ephemeral_storage {
Expand Down Expand Up @@ -455,7 +440,7 @@ resource "aws_lambda_function" "viz_python_preprocessing_3GB" {
]

tags = {
"Name" = "hv-vpp-${var.environment}-viz_python_preprocessing_3GB"
"Name" = "hv-vpp-${var.environment}-viz-python-preprocessing-3GB"
}
}

@@ -506,7 +491,7 @@ resource "aws_lambda_function" "viz_python_preprocessing_10GB" {
]

tags = {
"Name" = "hv-vpp-${var.environment}-viz_python_preprocessing_10GB"
"Name" = "hv-vpp-${var.environment}-viz-python-preprocessing-10GB"
}
}

@@ -641,10 +626,6 @@ resource "aws_lambda_function" "viz_db_postprocess_sql" {
VIZ_DB_HOST = var.viz_db_host
VIZ_DB_USERNAME = jsondecode(var.viz_db_user_secret_string)["username"]
VIZ_DB_PASSWORD = jsondecode(var.viz_db_user_secret_string)["password"]
REDSHIFT_DB_DATABASE = var.viz_redshift_db_name
REDSHIFT_DB_HOST = var.viz_redshift_host
REDSHIFT_DB_USERNAME = jsondecode(var.viz_redshift_user_secret_string)["username"]
REDSHIFT_DB_PASSWORD = jsondecode(var.viz_redshift_user_secret_string)["password"]
}
}
s3_bucket = aws_s3_object.db_postprocess_sql_zip_upload.bucket
