From 08d4a247b92e6a85e36228946ead85ec5d37b4a1 Mon Sep 17 00:00:00 2001 From: Shawn Crawley Date: Mon, 5 Aug 2024 14:51:40 -0600 Subject: [PATCH] Refactor to remove dependency on WRDS API --- Core/EC2/DataServices/main.tf | 25 +- .../scripts/utils/restore_db_from_s3.sh.tftpl | 1 + .../python/viz_classes.py | 13 + Core/LAMBDA/viz_functions/main.tf | 172 ++++--- .../max_flows/rfc_categorical_flows.sql | 2 +- .../max_flows/rfc_categorical_stages.sql | 2 +- .../rfc_based_5day_max_streamflow.sql | 2 +- .../viz_test_wrds_db/lambda_function.py | 86 ++++ .../viz_wrds_api_handler/lambda_function.py | 230 --------- .../StepFunctions/ensure_ec2_ready.json.tftpl | 106 +++++ Core/StepFunctions/main.tf | 86 +++- .../restore_db_from_s3_dump.tftpl | 36 ++ .../StepFunctions/sync_wrds_location_db.tftpl | 87 ++++ Core/SyncWrdsLocationDB/main.tf | 443 ------------------ .../wrds_location_api_tests.zip | Bin 1135 -> 0 bytes Core/main.tf | 22 +- 16 files changed, 512 insertions(+), 801 deletions(-) create mode 100644 Core/LAMBDA/viz_functions/viz_test_wrds_db/lambda_function.py delete mode 100644 Core/LAMBDA/viz_functions/viz_wrds_api_handler/lambda_function.py create mode 100644 Core/StepFunctions/ensure_ec2_ready.json.tftpl create mode 100644 Core/StepFunctions/restore_db_from_s3_dump.tftpl create mode 100644 Core/StepFunctions/sync_wrds_location_db.tftpl delete mode 100644 Core/SyncWrdsLocationDB/main.tf delete mode 100644 Core/SyncWrdsLocationDB/wrds_location_api_tests.zip diff --git a/Core/EC2/DataServices/main.tf b/Core/EC2/DataServices/main.tf index b3cd140b..1388d462 100644 --- a/Core/EC2/DataServices/main.tf +++ b/Core/EC2/DataServices/main.tf @@ -69,11 +69,6 @@ variable "private_route_53_zone" { # THIS TF CONFIG IS DEPENDANT ON A SSH KEY THAT CAN ACCESS THE WRDS VLAB REPOS locals { ssh_key_filename = "id_ed25519" - instance_name = "hv-vpp-${var.environment}-data-services" - instance_names = [local.instance_name, format("%s-for-tests", local.instance_name)] - logging_application_name = "data_services" - logging_application_names = [local.logging_application_name, format("test_%s", local.logging_application_name)] - location_db_names = [var.location_db_name, format("%s_ondeck", var.location_db_name)] cloudinit_config_data = { write_files = [ { @@ -99,7 +94,7 @@ locals { permissions = "0777" owner = "ec2-user:ec2-user" content = templatefile("${path.module}/templates/env/location.env.tftpl", { - db_name = "$${location_db_name}}" + db_name = var.location_db_name db_username = jsondecode(var.location_credentials_secret_string)["username"] db_password = jsondecode(var.location_credentials_secret_string)["password"] db_host = var.rds_host @@ -131,7 +126,7 @@ locals { owner = "ec2-user:ec2-user" content = templatefile("${path.module}/templates/env/forecast.env.tftpl", { db_name = var.forecast_db_name - location_db_name = "$${location_db_name}}" + location_db_name = var.location_db_name db_username = jsondecode(var.forecast_credentials_secret_string)["username"] db_password = jsondecode(var.forecast_credentials_secret_string)["password"] db_host = var.rds_host @@ -163,7 +158,6 @@ locals { # Writes the ssh key, .env files, and docker-compose.yml files to EC2 and starts the startup.sh data "cloudinit_config" "startup" { - count = 2 gzip = false base64_encode = false @@ -172,7 +166,7 @@ data "cloudinit_config" "startup" { filename = "cloud-config.yaml" content = <<-END #cloud-config - ${jsonencode(jsondecode(replace(tostring(jsonencode(local.cloudinit_config_data)), "$${location_db_name}}", 
tostring(local.location_db_names[count.index]))))} + ${jsonencode(local.cloudinit_config_data)} END } @@ -185,7 +179,7 @@ data "cloudinit_config" "startup" { location_api_3_0_commit = var.data_services_versions["location_api_3_0_commit"] forecast_api_2_0_commit = var.data_services_versions["forecast_api_2_0_commit"] ssh_key_filename = local.ssh_key_filename - logging_application_name = local.logging_application_names[count.index] + logging_application_name = "data_services" instance = count.index }) } @@ -193,7 +187,6 @@ data "cloudinit_config" "startup" { # EC2 Related Resources resource "aws_instance" "data_services" { - count = 2 ami = data.aws_ami.linux.id iam_instance_profile = var.ec2_instance_profile_name instance_type = "c5.xlarge" @@ -207,12 +200,12 @@ resource "aws_instance" "data_services" { } root_block_device { - encrypted = count.index == 0 - kms_key_id = count.index == 0 ? var.kms_key_arn : "" + encrypted = true + kms_key_id = var.kms_key_arn } tags = { - Name = local.instance_names[count.index] + Name = "hv-vpp-${var.environment}-data-services" OS = "Linux" } @@ -245,7 +238,3 @@ data "aws_ami" "linux" { output "dns_name" { value = aws_route53_record.hydrovis.name } - -output "dataservices-test-instance-id" { - value = aws_instance.data_services[1].id -} diff --git a/Core/EC2/RDSBastion/scripts/utils/restore_db_from_s3.sh.tftpl b/Core/EC2/RDSBastion/scripts/utils/restore_db_from_s3.sh.tftpl index ac3c0d19..00f0d71f 100644 --- a/Core/EC2/RDSBastion/scripts/utils/restore_db_from_s3.sh.tftpl +++ b/Core/EC2/RDSBastion/scripts/utils/restore_db_from_s3.sh.tftpl @@ -24,6 +24,7 @@ elif [ "$db_instance_tag" == "viz" ]; then fi aws s3 cp $s3_uri /tmp/db.sql.gz && \ +psql -c "DROP DATABASE IF EXISTS $db_name;" && \ psql -c "CREATE DATABASE $db_name;" && \ cat /tmp/db.sql.gz | gunzip | psql $db_name && \ rm /tmp/db.sql.gz diff --git a/Core/LAMBDA/layers/viz_lambda_shared_funcs/python/viz_classes.py b/Core/LAMBDA/layers/viz_lambda_shared_funcs/python/viz_classes.py index 6c36f1c3..f7933d4e 100644 --- a/Core/LAMBDA/layers/viz_lambda_shared_funcs/python/viz_classes.py +++ b/Core/LAMBDA/layers/viz_lambda_shared_funcs/python/viz_classes.py @@ -100,6 +100,19 @@ def run_sql_file_in_db(self, sql_file): except Exception as e: raise e self.connection.close() + + ################################### + def execute_sql(self, sql): + if sql.endswith('.sql') and os.path.exists(sql): + sql = open(sql, 'r').read() + with self.connection: + try: + with self.connection.cursor() as cur: + print(f"---> Running provided SQL:\n{sql}") + cur.execute(sql) + except Exception as e: + raise e + self.connection.close() ################################### def run_sql_in_db(self, sql, return_geodataframe=False): diff --git a/Core/LAMBDA/viz_functions/main.tf b/Core/LAMBDA/viz_functions/main.tf index 2aa16463..256cf2c9 100644 --- a/Core/LAMBDA/viz_functions/main.tf +++ b/Core/LAMBDA/viz_functions/main.tf @@ -72,10 +72,6 @@ variable "db_lambda_security_groups" { type = list(any) } -variable "nat_sg_group" { - type = string -} - variable "db_lambda_subnets" { description = "Subnets to use for the db-pipeline lambdas." type = list(any) @@ -119,11 +115,26 @@ variable "viz_db_user_secret_string" { type = string } +variable "viz_db_suser_seceret_string" { + description = "The secret string of the viz_processing data base superuser to write/read data as." + type = string +} + variable "egis_db_user_secret_string" { description = "The secret string for the egis rds database." 
type        = string
}

+variable "wrds_db_host" {
+  description = "Hostname of the WRDS location RDS instance."
+  type        = string
+}
+
+variable "wrds_db_user_secret_string" {
+  description = "The secret string of the WRDS location database user to write/read data as."
+  type        = string
+}
+
 variable "egis_portal_password" {
   description = "The password for the egis portal user to publish as."
   type        = string
@@ -169,10 +180,6 @@ variable "viz_lambda_shared_funcs_layer" {
   type = string
 }
 
-variable "dataservices_host" {
-  type = string
-}
-
 variable "viz_pipeline_step_function_arn" {
   type = string
 }
@@ -207,85 +214,6 @@ locals {
   ])
 }
 
-########################################################################################################################################
-########################################################################################################################################
-
-###############################
-## WRDS API Handler Function ##
-###############################
-data "archive_file" "wrds_api_handler_zip" {
-  type = "zip"
-
-  source_file = "${path.module}/viz_wrds_api_handler/lambda_function.py"
-
-  output_path = "${path.module}/temp/viz_wrds_api_handler_${var.environment}_${var.region}.zip"
-}
-
-resource "aws_s3_object" "wrds_api_handler_zip_upload" {
-  bucket      = var.deployment_bucket
-  key         = "terraform_artifacts/${path.module}/viz_wrds_api_handler.zip"
-  source      = data.archive_file.wrds_api_handler_zip.output_path
-  source_hash = filemd5(data.archive_file.wrds_api_handler_zip.output_path)
-}
-
-resource "aws_lambda_function" "viz_wrds_api_handler" {
-  function_name = "hv-vpp-${var.environment}-viz-wrds-api-handler"
-  description   = "Lambda function to ping WRDS API and format outputs for processing."
-  memory_size   = 512
-  timeout       = 900
-  vpc_config {
-    security_group_ids = [var.nat_sg_group]
-    subnet_ids         = var.db_lambda_subnets
-  }
-  environment {
-    variables = {
-      DATASERVICES_HOST            = var.dataservices_host
-      PYTHON_PREPROCESSING_BUCKET  = var.python_preprocessing_bucket
-      PROCESSED_OUTPUT_PREFIX      = "max_stage/ahps"
-      INITIALIZE_PIPELINE_FUNCTION = aws_lambda_function.viz_initialize_pipeline.arn
-    }
-  }
-  s3_bucket        = aws_s3_object.wrds_api_handler_zip_upload.bucket
-  s3_key           = aws_s3_object.wrds_api_handler_zip_upload.key
-  source_code_hash = filebase64sha256(data.archive_file.wrds_api_handler_zip.output_path)
-  runtime          = "python3.9"
-  handler          = "lambda_function.lambda_handler"
-  role             = var.lambda_role
-  layers = [
-    var.arcgis_python_api_layer,
-    var.es_logging_layer,
-    var.viz_lambda_shared_funcs_layer
-  ]
-  tags = {
-    "Name" = "hv-vpp-${var.environment}-viz-wrds-api-handler"
-  }
-}
-
-resource "aws_cloudwatch_event_target" "check_lambda_every_five_minutes" {
-  rule      = var.five_minute_trigger.name
-  target_id = aws_lambda_function.viz_initialize_pipeline.function_name
-  arn       = aws_lambda_function.viz_initialize_pipeline.arn
-  input     = "{\"configuration\":\"rfc\"}"
-}
-
-resource "aws_lambda_permission" "allow_cloudwatch_to_call_check_lambda" {
-  statement_id  = "AllowExecutionFromCloudWatch"
-  action        = "lambda:InvokeFunction"
-  function_name = aws_lambda_function.viz_wrds_api_handler.function_name
-  principal     = "events.amazonaws.com"
-  source_arn    = var.five_minute_trigger.arn
-}
-
-resource "aws_lambda_function_event_invoke_config" "viz_wrds_api_handler" {
-  function_name          = resource.aws_lambda_function.viz_wrds_api_handler.function_name
-  maximum_retry_attempts = 0
-  destination_config {
-    on_failure {
-      destination = var.email_sns_topics["viz_lambda_errors"].arn
-    }
-  }
-}
-
 ##################################
 ## EGIS Health Checker Function ##
 ##################################
@@ -970,6 +898,72 @@ resource "aws_lambda_function_event_invoke_config" "viz_publish_service_destinat
   }
 }
 
+
+######################
+## VIZ TEST WRDS DB ##
+######################
+data "archive_file" "viz_test_wrds_db_zip" {
+  type        = "zip"
+  output_path = "${path.module}/temp/test_sql_${var.environment}_${var.region}.zip"
+
+  source {
+    content  = file("${path.module}/viz_test_wrds_db/lambda_function.py")
+    filename = "lambda_function.py"
+  }
+
+  dynamic "source" {
+    for_each = fileset("${path.module}", "**/*.sql")
+    content {
+      content  = file("${path.module}/${source.key}")
+      filename = basename(source.key)
+    }
+  }
+}
+
+resource "aws_s3_object" "viz_test_wrds_db_upload" {
+  bucket      = var.deployment_bucket
+  key         = "terraform_artifacts/${path.module}/viz_test_wrds_db.zip"
+  source      = data.archive_file.viz_test_wrds_db_zip.output_path
+  source_hash = filemd5(data.archive_file.viz_test_wrds_db_zip.output_path)
+}
+
+resource "aws_lambda_function" "viz_test_wrds_db" {
+  function_name = "hv-vpp-${var.environment}-viz-test-wrds-db"
+  description   = "Lambda function to test the wrds_location3_ondeck db before it is swapped for the live version"
+  timeout       = 900
+  memory_size   = 5000
+  vpc_config {
+    security_group_ids = var.db_lambda_security_groups
+    subnet_ids         = var.db_lambda_subnets
+  }
+  environment {
+    variables = {
+      WRDS_DB_HOST     = var.wrds_db_host
+      WRDS_DB_USERNAME = jsondecode(var.wrds_db_user_secret_string)["username"]
+      WRDS_DB_PASSWORD = jsondecode(var.wrds_db_user_secret_string)["password"]
+      VIZ_DB_DATABASE  = var.viz_db_name
+      VIZ_DB_HOST      = var.viz_db_host
+      VIZ_DB_USERNAME  = jsondecode(var.viz_db_suser_seceret_string)["username"]
+      VIZ_DB_PASSWORD  = jsondecode(var.viz_db_suser_seceret_string)["password"]
+    }
+  }
+  s3_bucket        = aws_s3_object.viz_test_wrds_db_upload.bucket
+  s3_key           = aws_s3_object.viz_test_wrds_db_upload.key
+  source_code_hash = filebase64sha256(data.archive_file.viz_test_wrds_db_zip.output_path)
+  runtime          = "python3.9"
+  handler          = "lambda_function.lambda_handler"
+  role             = var.lambda_role
+  layers = [
+    var.psycopg2_sqlalchemy_layer,
+    var.viz_lambda_shared_funcs_layer
+  ]
+  tags = {
+    "Name" = "hv-vpp-${var.environment}-viz-test-wrds-db"
+  }
+}
+
+
 #########################
 ## Image Based Lambdas ##
 #########################
@@ -1037,8 +1031,8 @@ output "publish_service" {
   value = aws_lambda_function.viz_publish_service
 }
 
-output "wrds_api_handler" {
-  value = aws_lambda_function.viz_wrds_api_handler
+output "test_wrds_db" {
+  value = aws_lambda_function.viz_test_wrds_db
 }
 
 output "egis_health_checker" {
diff --git a/Core/LAMBDA/viz_functions/viz_db_postprocess_sql/max_flows/rfc_categorical_flows.sql b/Core/LAMBDA/viz_functions/viz_db_postprocess_sql/max_flows/rfc_categorical_flows.sql
index a46bb92a..f0756be2 100644
--- a/Core/LAMBDA/viz_functions/viz_db_postprocess_sql/max_flows/rfc_categorical_flows.sql
+++ b/Core/LAMBDA/viz_functions/viz_db_postprocess_sql/max_flows/rfc_categorical_flows.sql
@@ -3,7 +3,7 @@ DROP TABLE IF EXISTS cache.rfc_categorical_flows;
 
 -- Create temporary routelink tables (dropped at end)
 SELECT * INTO ingest.nwm_routelink
-FROM external.nwm_routelink;
+FROM external.nwm_routelink_3_0_conus;
 
 SELECT
     main.nwm_feature_id,
diff --git a/Core/LAMBDA/viz_functions/viz_db_postprocess_sql/max_flows/rfc_categorical_stages.sql b/Core/LAMBDA/viz_functions/viz_db_postprocess_sql/max_flows/rfc_categorical_stages.sql
index 79a75874..353b2064 
100644 --- a/Core/LAMBDA/viz_functions/viz_db_postprocess_sql/max_flows/rfc_categorical_stages.sql +++ b/Core/LAMBDA/viz_functions/viz_db_postprocess_sql/max_flows/rfc_categorical_stages.sql @@ -4,7 +4,7 @@ DROP TABLE IF EXISTS cache.rfc_categorical_stages; -- calculation SELECT * INTO ingest.routelink -FROM external.nwm_routelink; +FROM external.nwm_routelink_3_0_conus; SELECT main.nwm_feature_id, diff --git a/Core/LAMBDA/viz_functions/viz_db_postprocess_sql/products/replace_route/rfc_based_5day_max_streamflow.sql b/Core/LAMBDA/viz_functions/viz_db_postprocess_sql/products/replace_route/rfc_based_5day_max_streamflow.sql index 90fb8641..e5b98cc9 100644 --- a/Core/LAMBDA/viz_functions/viz_db_postprocess_sql/products/replace_route/rfc_based_5day_max_streamflow.sql +++ b/Core/LAMBDA/viz_functions/viz_db_postprocess_sql/products/replace_route/rfc_based_5day_max_streamflow.sql @@ -21,7 +21,7 @@ max_flows_station_xwalk AS ( ELSE FALSE END AS is_waterbody FROM cache.max_flows_rnr mf - LEFT JOIN external.nwm_routelink rl + LEFT JOIN external.nwm_routelink_3_0_conus rl ON rl.nwm_feature_id = mf.feature_id LEFT JOIN rnr.nwm_crosswalk xwalk ON xwalk.nwm_feature_id = mf.feature_id diff --git a/Core/LAMBDA/viz_functions/viz_test_wrds_db/lambda_function.py b/Core/LAMBDA/viz_functions/viz_test_wrds_db/lambda_function.py new file mode 100644 index 00000000..23be49e5 --- /dev/null +++ b/Core/LAMBDA/viz_functions/viz_test_wrds_db/lambda_function.py @@ -0,0 +1,86 @@ +from pathlib import Path +import re +import os + +from viz_classes import database + + +THIS_DIR = Path(__file__).parent +FILES_DIR = THIS_DIR / 'files' +IGNORE_FILES = ['dba_stuff.sql'] +IGNORE_TABLES = ['building_footprints_fema'] + +def lambda_handler(event, context): + db = database(db_type='viz') + connection = db.get_db_connection() + + wrds_db_host = os.getenv('WRDS_DB_HOST') + wrds_db_username = os.getenv('WRDS_DB_USERNAME') + wrds_db_password = os.getenv('WRDS_DB_PASSWORD') + + # SETUP FOREIGN SCHEMA POINTING TO wrds_location3_ondeck DB + sql = f''' + DROP SERVER IF EXISTS test_wrds_location CASCADE; + DROP SCHEMA IF EXISTS automated_test CASCADE; + DROP SCHEMA IF EXISTS test_external CASCADE; + CREATE SCHEMA test_external; + CREATE SCHEMA automated_test; + CREATE SERVER test_wrds_location FOREIGN DATA WRAPPER postgres_fdw OPTIONS (host '{wrds_db_host}', dbname 'wrds_location3_ondeck', port '5432'); + CREATE USER MAPPING FOR {wrds_db_username} SERVER test_wrds_location OPTIONS (user '{wrds_db_username}', password '{wrds_db_password}'); + IMPORT FOREIGN SCHEMA public FROM SERVER test_wrds_location INTO test_external; + ALTER SERVER test_wrds_location OPTIONS (fetch_size '150000'); + ''' + + _execute_sql(connection, sql) + + for fname in FILES_DIR.iterdir(): + if fname.name in IGNORE_FILES: continue + with open(fname, 'r', encoding="utf8") as f: + sql = f.read() + if 'external.' 
not in sql: continue
+
+        external_table_matches = set(re.findall('external\.([A-Za-z0-9_-]+)', sql, flags=re.IGNORECASE))
+        if len(external_table_matches) == 1 and list(external_table_matches)[0] in IGNORE_TABLES:
+            print(f"Skipping {fname.name}...")
+            continue
+
+        print(f"Rewriting {fname.name} for test environment...")
+        for table in external_table_matches:
+            if table not in IGNORE_TABLES:
+                sql = re.sub(f'external.{table}', f'test_external.{table}', sql, flags=re.IGNORECASE)
+
+        into_matches = re.findall('INTO ([A-Za-z0-9_-]+)\.([A-Za-z0-9_-]+)', sql, flags=re.IGNORECASE)
+        for into_match in into_matches:
+            table = '.'.join(into_match)
+            reference_matches = re.findall(f'(FROM|JOIN) {table}', sql, flags=re.IGNORECASE)
+            into_replace = ''
+            if reference_matches:
+                # This means an intermediate table is created in the script and then subsequently used.
+                # Thus, the INTO clause shouldn't be removed, but the table written to should be moved into the
+                # automated_test schema, and the places where it is referenced (i.e. the FROM/JOIN clauses) should
+                # be updated to point to that automated_test schema as well.
+                into_replace = f'INTO automated_test.{into_match[0]}_{into_match[1]}'
+                sql = re.sub(f'(FROM|JOIN) {table}\\b', f'\g<1> automated_test.{into_match[0]}_{into_match[1]}', sql, flags=re.IGNORECASE)
+            sql = re.sub(f'INTO {table}\\b', into_replace, sql, flags=re.IGNORECASE)
+            sql = re.sub(f'DROP TABLE IF EXISTS {table}\\b;?', '', sql, flags=re.IGNORECASE)
+
+        print(f"Executing {fname.name} in test environment...")
+        _execute_sql(connection, sql)
+
+    sql = f'''
+        DROP SERVER IF EXISTS test_wrds_location CASCADE;
+        DROP SCHEMA IF EXISTS automated_test CASCADE;
+        DROP SCHEMA IF EXISTS test_external CASCADE;
+    '''
+    _execute_sql(connection, sql)
+
+    connection.close()
+
+def _execute_sql(connection, sql):
+    with connection:
+        with connection.cursor() as cur:
+            try:
+                cur.execute(sql)
+            except Exception as e:
+                print(sql)
+                raise e
diff --git a/Core/LAMBDA/viz_functions/viz_wrds_api_handler/lambda_function.py b/Core/LAMBDA/viz_functions/viz_wrds_api_handler/lambda_function.py
deleted file mode 100644
index 69a9cf20..00000000
--- a/Core/LAMBDA/viz_functions/viz_wrds_api_handler/lambda_function.py
+++ /dev/null
@@ -1,230 +0,0 @@
-import requests
-import os
-import pandas as pd
-from datetime import datetime, timedelta
-import boto3
-import botocore
-import json
-
-from viz_lambda_shared_funcs import check_s3_file_existence
-
-from es_logging import get_elasticsearch_logger
-es_logger = get_elasticsearch_logger()
-
-PYTHON_PREPROCESSING_BUCKET = os.environ['PYTHON_PREPROCESSING_BUCKET']
-PROCESSED_OUTPUT_PREFIX = os.environ['PROCESSED_OUTPUT_PREFIX']
-WRDS_HOST = os.environ["DATASERVICES_HOST"]
-CACHE_DAYS = os.environ.get('CACHE_DAYS') if os.environ.get('CACHE_DAYS') else 30
-INITIALIZE_PIPELINE_FUNCTION = os.environ['INITIALIZE_PIPELINE_FUNCTION']
-
-s3 = boto3.client('s3')
-
-
-def lambda_handler(event, context):
-    """
-    The lambda handler is the function that is kicked off with the lambda. This function will take a forecast file,
-    extract features with streamflows above 1.5 year threshold, and then kick off lambdas for each HUC with valid
-    data.
- - Args: - event(event object): An event is a JSON-formatted document that contains data for a Lambda function to - process - context(object): Provides methods and properties that provide information about the invocation, function, - and runtime environment - """ - reference_date = datetime.strptime(event['time'], "%Y-%m-%dT%H:%M:%SZ") - reference_time = reference_date.strftime("%Y-%m-%d %H:%M:%S") - es_logger.info(f"Retrieving AHPS forecast data for reference time {event['time']}") - - df = get_recent_rfc_forecasts() - - missing_locations = df[df['latitude'].isna()].index.tolist() - if missing_locations: - df_locations = get_location_metadata(missing_locations) - df.update(df_locations) - - df_meta = df.drop(columns=['members']) - meta_key = f"{PROCESSED_OUTPUT_PREFIX}/{reference_date.strftime('%Y%m%d')}/{reference_date.strftime('%H')}_{reference_date.strftime('%M')}_ahps_metadata.csv" - upload_df_to_s3(df_meta, PYTHON_PREPROCESSING_BUCKET, meta_key) - - df_forecasts = extract_and_flatten_rfc_forecasts(df[['members']]) - forecast_key = f"{PROCESSED_OUTPUT_PREFIX}/{reference_date.strftime('%Y%m%d')}/{reference_date.strftime('%H')}_{reference_date.strftime('%M')}_ahps_forecasts.csv" - upload_df_to_s3(df_forecasts, PYTHON_PREPROCESSING_BUCKET, forecast_key) - - es_logger.info(f"Triggering DB ingest for ahps forecast and metadata for {reference_time}") - trigger_db_ingest("ahps", reference_time, PYTHON_PREPROCESSING_BUCKET, forecast_key) - - cleanup_cache(reference_date) - -def get_recent_rfc_forecasts(hour_range=48): - ahps_call = f"http://{WRDS_HOST}/api/rfc_forecast/v2.0/forecast/stage/nws_lid/all/?returnNewestForecast=true&excludePast=true&minForecastStatus=action" # noqa - print(f"Fetching AHPS forecast data from {ahps_call}") - res = requests.get(ahps_call) - - print("Parsing forecast data") - res = res.json() - - df = pd.DataFrame(res['forecasts']) - df['issuedTime'] = pd.to_datetime(df['issuedTime'], format='%Y-%m-%dT%H:%M:%SZ') - - date_range = datetime.utcnow() - timedelta(hours=48) - df = df[pd.to_datetime(df['issuedTime']) > date_range] - - df = pd.concat([df.drop(['location'], axis=1), df['location'].apply(pd.Series)], axis=1) - df = pd.concat([df.drop(['names'], axis=1), df['names'].apply(pd.Series)], axis=1) - df = pd.concat([df.drop(['nws_coordinates'], axis=1), df['nws_coordinates'].apply(pd.Series)], axis=1) - df = df.drop(['units'], axis=1) - df = pd.concat([df.drop(['thresholds'], axis=1), df['thresholds'].apply(pd.Series)], axis=1) - - rename_columns = { - "action": "action_threshold", - "minor": "minor_threshold", - "moderate": "moderate_threshold", - "major": "major_threshold", - "record": "record_threshold", - "stage": "unit", - "nwsLid": "nws_lid", - "usgsSiteCode": "usgs_sitecode", - "nwsName": "nws_name", - "usgsName": "usgs_name", - "nwm_feature_id": "feature_id" - } - df = df.rename(columns=rename_columns) - - columns_to_keep = [ - "producer", "issuer", "issuedTime", "generationTime", "members", "nws_lid", - "usgs_sitecode", "feature_id", "nws_name", "usgs_name", "latitude", - "longitude", "units", "action_threshold", "minor_threshold", "moderate_threshold", - "major_threshold", "record_threshold" - ] - - drop_columns = [column for column in df.columns if column not in columns_to_keep] - df = df.drop(columns=drop_columns) - - df.loc[df['feature_id'].isnull(), 'feature_id'] = -9999 - df['feature_id'] = df['feature_id'].astype(int) - - df = df.set_index('nws_lid') - - df = df[[ - 'producer', 'issuer', 'issuedTime', 'generationTime', 'members', - 'usgs_sitecode', 
'feature_id', 'nws_name', 'usgs_name', 'latitude', - 'longitude', 'action_threshold', 'minor_threshold', - 'moderate_threshold', 'major_threshold', 'record_threshold', 'units' - ]] - - return df - -def get_location_metadata(nws_lid_list): - nws_lid_list = ",".join(nws_lid_list) - - location_url = f"http://{WRDS_HOST}/api/location/v3.0/metadata/nws_lid/{nws_lid_list}" - location_res = requests.get(location_url, verify=False) - location_res = location_res.json() - - df_locations = pd.DataFrame(location_res['locations']) - df_locations = pd.concat([df_locations.drop(['identifiers'], axis=1), df_locations['identifiers'].apply(pd.Series)], axis=1) - df_locations = pd.concat([df_locations.drop(['nws_data'], axis=1), df_locations['nws_data'].apply(pd.Series)], axis=1) - - drop_columns = [ - 'usgs_data', 'nwm_feature_data', 'env_can_gage_data', 'nws_preferred', 'usgs_preferred', 'goes_id', 'env_can_gage_id', - 'geo_rfc', 'map_link', 'horizontal_datum_name', 'county', 'county_code', 'huc', 'hsa', - 'zero_datum', 'vertical_datum_name', 'rfc_forecast_point', 'rfc_defined_fcst_point', 'riverpoint' - ] - rename_columns = { - "usgs_site_code": "usgs_sitecode", - "name": "usgs_name", - "nwm_feature_id": "feature_id", - "wfo": "issuer", - "rfc": "producer", - "issuedTime": "issued_time", - "generationTime": "generation_time" - } - df_locations = df_locations.drop(columns=drop_columns) - df_locations = df_locations.rename(columns=rename_columns) - - df_locations = df_locations.set_index("nws_lid") - - return df_locations - -def extract_and_flatten_rfc_forecasts(df_forecasts): - df_forecasts = pd.concat([df_forecasts.drop(['members'], axis=1), df_forecasts['members'].apply(pd.Series)], axis=1) - df_forecasts = pd.concat([df_forecasts.drop([0], axis=1), df_forecasts[0].apply(pd.Series)], axis=1) - df_forecasts = pd.concat([df_forecasts.drop(['dataPointsList'], axis=1), df_forecasts['dataPointsList'].apply(pd.Series)], axis=1) # noqa - - forecasts = df_forecasts[0].apply(pd.Series).reset_index().melt(id_vars='nws_lid') - forecasts = forecasts.dropna()[['nws_lid', 'value']].set_index('nws_lid') - - df_forecasts = pd.merge(df_forecasts, forecasts, left_index=True, right_index=True) - - df_forecasts = df_forecasts.drop(columns=[0, 'identifier', 'forecast_status']) - - df_forecasts = pd.concat([df_forecasts.drop(['value'], axis=1), df_forecasts['value'].apply(pd.Series)], axis=1) - df_forecasts = df_forecasts.rename(columns={"value": "stage"}) - - return df_forecasts - -def upload_df_to_s3(df, bucket, key): - print("Saving dataframe to csv output") - tmp_csv = f"/tmp/{os.path.basename(key)}" - df.to_csv(tmp_csv) - - es_logger.info(f"Uploading csv to {key}") - s3.upload_file( - tmp_csv, bucket, key, - ExtraArgs={'ServerSideEncryption': 'aws:kms'} - ) - os.remove(tmp_csv) - -def cleanup_cache(reference_date, buffer_days=.25): - """ - Cleans up old max flows files that are beyond the cache - - Args: - configuration(str): The configuration for the forecast being processed - date(str): The date for the forecast being processed - short_hand_config(str): The short-hand NWM configuration for the forecast being processed - hour(str): The hour for the forecast being processed - buffer_days(int): The number of days of max flows files to keep - """ - print(f"Cleaning up files older than {CACHE_DAYS} days") - s3_resource = boto3.resource('s3') - - # Determine the date threshold for keeping max flows files - buffer_increments = int(buffer_days*24*4) - cache_date = reference_date - timedelta(days=int(CACHE_DAYS)) - - # Loop 
through the buffer hours to try to delete old files, starting from the cache_date - for fifteen_min_increments in range(1, buffer_increments+1): - buffer_date = cache_date - timedelta(minutes=15*fifteen_min_increments) - - metadata_csv_key = f"{PROCESSED_OUTPUT_PREFIX}/{buffer_date.strftime('%Y%m%d')}/{buffer_date.strftime('%H')}_{buffer_date.strftime('%M')}_ahps_metadata.csv" # noqa - forecast_csv_key = f"{PROCESSED_OUTPUT_PREFIX}/{buffer_date.strftime('%Y%m%d')}/{buffer_date.strftime('%H')}_{buffer_date.strftime('%M')}_ahps_forecasts.csv" # noqa - - if check_s3_file_existence(PYTHON_PREPROCESSING_BUCKET, metadata_csv_key): - s3_resource.Object(PYTHON_PREPROCESSING_BUCKET, metadata_csv_key).delete() - print(f"Deleted file {metadata_csv_key} from {PYTHON_PREPROCESSING_BUCKET}") - - if check_s3_file_existence(PYTHON_PREPROCESSING_BUCKET, forecast_csv_key): - s3_resource.Object(PYTHON_PREPROCESSING_BUCKET, forecast_csv_key).delete() - print(f"Deleted file {forecast_csv_key} from {PYTHON_PREPROCESSING_BUCKET}") - - -def trigger_db_ingest(configuration, reference_time, bucket, s3_file_path): - """ - Triggers the db_ingest lambda function to ingest a specific file into the vizprocessing db. - - Args: - configuration(str): The configuration for the forecast being processed - reference_time (datetime): The reference time of the originating forecast - bucket (str): The s3 bucket containing the file. - s3_file_path (str): The s3 path to ingest into the db. - """ - boto_config = botocore.client.Config(max_pool_connections=1, connect_timeout=60, read_timeout=600) - lambda_client = boto3.client('lambda', config=boto_config) - - payload = {"data_key": s3_file_path, "data_bucket": bucket, "invocation_type": "event"} - print(payload) - lambda_client.invoke(FunctionName=INITIALIZE_PIPELINE_FUNCTION, - InvocationType='Event', - Payload=json.dumps(payload)) diff --git a/Core/StepFunctions/ensure_ec2_ready.json.tftpl b/Core/StepFunctions/ensure_ec2_ready.json.tftpl new file mode 100644 index 00000000..5ff0eec2 --- /dev/null +++ b/Core/StepFunctions/ensure_ec2_ready.json.tftpl @@ -0,0 +1,106 @@ +{ + "Comment": "A description of my state machine", + "StartAt": "Which arg provided?", + "States": { + "Which arg provided?": { + "Type": "Choice", + "Choices": [ + { + "Variable": "$.InstanceId", + "IsPresent": true, + "Next": "Get Machine Status" + }, + { + "Variable": "$.InstanceName", + "IsPresent": true, + "Next": "Get InstanceId" + } + ], + "Default": "Fail" + }, + "Get InstanceId": { + "Type": "Task", + "Parameters": { + "Filters": [ + { + "Name": "tag:Name", + "Values.$": "States.Array(States.Format($.InstanceName))" + } + ] + }, + "Resource": "arn:aws:states:::aws-sdk:ec2:describeInstances", + "Next": "Get Machine Status", + "ResultSelector": { + "InstanceId.$": "$.Reservations[0].Instances[0].InstanceId" + } + }, + "Get Machine Status": { + "Type": "Task", + "Parameters": { + "InstanceIds.$": "States.Array(States.Format($.InstanceId))" + }, + "Resource": "arn:aws:states:::aws-sdk:ec2:describeInstanceStatus", + "Next": "Machine Running?", + "ResultPath": "$.result" + }, + "Machine Running?": { + "Type": "Choice", + "Choices": [ + { + "Variable": "$.result.InstanceStatuses[0]", + "IsPresent": true, + "Next": "Machine Accessible?" 
+ } + ], + "Default": "StartInstances" + }, + "Machine Accessible?": { + "Type": "Choice", + "Choices": [ + { + "And": [ + { + "Variable": "$.result.InstanceStatuses[0].InstanceStatus.Status", + "StringEquals": "ok" + }, + { + "Variable": "$.result.InstanceStatuses[0].SystemStatus.Status", + "StringEquals": "ok" + } + ], + "Next": "DescribeInstances" + } + ], + "Default": "Wait" + }, + "DescribeInstances": { + "Type": "Task", + "Next": "Success", + "Parameters": { + "InstanceIds.$": "States.Array(States.Format($.InstanceId))" + }, + "Resource": "arn:aws:states:::aws-sdk:ec2:describeInstances", + "OutputPath": "$.Reservations[0].Instances[0]" + }, + "StartInstances": { + "Type": "Task", + "Next": "Wait", + "Parameters": { + "InstanceIds.$": "States.Array(States.Format($.InstanceId))" + }, + "Resource": "arn:aws:states:::aws-sdk:ec2:startInstances", + "ResultPath": null + }, + "Success": { + "Type": "Succeed" + }, + "Wait": { + "Type": "Wait", + "Seconds": 60, + "Next": "Get Machine Status" + }, + "Fail": { + "Type": "Fail" + } + } +} \ No newline at end of file diff --git a/Core/StepFunctions/main.tf b/Core/StepFunctions/main.tf index 7f108bb5..23a04b8f 100644 --- a/Core/StepFunctions/main.tf +++ b/Core/StepFunctions/main.tf @@ -89,6 +89,15 @@ variable "viz_processing_pipeline_log_group" { type = string } +variable "rds_bastion_id" { + description = "ID of the RDS Bastion EC2 machine that the DB deploys will be executed from." + type = string +} + +variable "test_wrds_db_lambda_arn" { + type = string +} + ######################################### ## Replace Route Step Function ## ######################################### @@ -216,13 +225,88 @@ resource "aws_cloudwatch_event_rule" "viz_pipeline_step_function_failure" { EOF } -resource "aws_cloudwatch_event_target" "step_function_failure_sns" { +resource "aws_cloudwatch_event_target" "viz_pipeline_step_function_failure_sns" { rule = aws_cloudwatch_event_rule.viz_pipeline_step_function_failure.name target_id = "SendToSNS" arn = var.email_sns_topics["viz_lambda_errors"].arn input_path = "$.detail.name" } +#################################################### +## Ensure EC2 Ready For Use Step Function ## +#################################################### + +resource "aws_sfn_state_machine" "ensure_ec2_ready_for_use_step_function" { + name = "hv-vpp-${var.environment}-ensure-ec2-ready-for-use" + role_arn = var.viz_lambda_role + + definition = templatefile("${path.module}/ensure_ec2_ready.json.tftpl", {}) +} + +################################################### +## Restore DB From S3 Dump Step Function ## +################################################### + +resource "aws_sfn_state_machine" "restore_db_from_s3_dump_step_function" { + name = "hv-vpp-${var.environment}-restore-db-from-s3" + role_arn = var.viz_lambda_role + + definition = templatefile("${path.module}/restore_db_from_s3_dump.json.tftpl", { + ensure_ec2_ready_step_function_arn = aws_sfn_state_machine.ensure_ec2_ready_for_use_step_function.arn + rds_bastion_id = var.rds_bastion_id + region = var.region + }) +} + +################################################# +## Sync WRDS Location DB Step Function ## +################################################# + +resource "aws_sfn_state_machine" "sync_wrds_location_db_step_function" { + name = "hv-vpp-${var.environment}-sync-wrds-location-db" + role_arn = var.viz_lambda_role + + definition = templatefile("${path.module}/sync_wrds_location_db.json.tftpl", { + restore_db_from_s3_dump_step_function_arn = 
aws_sfn_state_machine.restore_db_from_s3_dump_step_function.arn
+    test_wrds_db_lambda_arn                   = var.test_wrds_db_lambda_arn
+    rds_bastion_id                            = var.rds_bastion_id
+    region                                    = var.region
+  })
+}
+
+####### Step Function Failure / Time Out SNS #######
+resource "aws_cloudwatch_event_rule" "sync_wrds_location_db_step_function_failure" {
+  name        = "hv-vpp-${var.environment}-sync-wrds-location-db-step-function-failure"
+  description = "Alert when the sync wrds location db step function times out or fails."
+
+  event_pattern = <<EOF
diff --git a/Core/SyncWrdsLocationDB/wrds_location_api_tests.zip b/Core/SyncWrdsLocationDB/wrds_location_api_tests.zip
deleted file mode 100644
Binary files a/Core/SyncWrdsLocationDB/wrds_location_api_tests.zip and /dev/null differ
diff --git a/Core/main.tf b/Core/main.tf
index ed42f21c..4610fbab 100644
--- a/Core/main.tf
+++ b/Core/main.tf
@@ -608,16 +608,17 @@ module "viz-lambda-functions" {
   dask_layer                     = module.lambda-layers.dask.arn
   viz_lambda_shared_funcs_layer  = module.lambda-layers.viz_lambda_shared_funcs.arn
   db_lambda_security_groups      = [module.security-groups.rds.id, module.security-groups.egis_overlord.id]
-  nat_sg_group                   = module.security-groups.vpc_access.id
   db_lambda_subnets              = [module.vpc.subnet_private_a.id, module.vpc.subnet_private_b.id]
   viz_db_host                    = module.rds-viz.dns_name
   viz_db_name                    = local.env.viz_db_name
   viz_db_user_secret_string      = module.secrets-manager.secret_strings["viz-proc-admin-rw-user"]
+  viz_db_suser_seceret_string    = module.secrets-manager.secret_strings["viz-processing-pg-rdssecret"]
+  wrds_db_host                   = module.rds-ingest.dns_name
+  wrds_db_user_secret_string     = module.secrets-manager.secret_strings["ingest-pg-rdssecret"]
   egis_db_host                   = module.rds-egis.dns_name
   egis_db_name                   = local.env.egis_db_name
   egis_db_user_secret_string     = module.secrets-manager.secret_strings["egis-pg-rds-secret"]
   egis_portal_password           = local.env.viz_ec2_hydrovis_egis_pass
-  dataservices_host              = module.data-services.dns_name
   viz_pipeline_step_function_arn = module.step-functions.viz_pipeline_step_function.arn
   default_tags                   = local.env.tags
   nwm_dataflow_version           = local.env.nwm_dataflow_version
@@ -649,6 +650,8 @@ module "step-functions" {
   aws_instances_to_reboot           = [module.rnr.ec2.id]
   fifteen_minute_trigger            = module.eventbridge.fifteen_minute_eventbridge
   viz_processing_pipeline_log_group = module.cloudwatch.viz_processing_pipeline_log_group.name
+  rds_bastion_id                    = module.rds-bastion.instance-id
+  test_wrds_db_lambda_arn           = module.viz-lambda-functions.test_wrds_db.arn
 }
 
 # Event Bridge
@@ -692,18 +695,3 @@ module "viz-ec2" {
   private_route_53_zone = module.private-route53.zone
   nwm_dataflow_version  = local.env.nwm_dataflow_version
 }
-
-module "sync-wrds-location-db" {
-  source = "./SyncWrdsLocationDB"
-
-  environment            = local.env.environment
-  region                 = local.env.region
-  iam_role_arn           = module.iam-roles.role_sync_wrds_location_db.arn
-  email_sns_topics       = module.sns.email_sns_topics
-  requests_lambda_layer  = module.lambda-layers.requests.arn
-  rds_bastion_id         = module.rds-bastion.instance-id
-  test_data_services_id  = module.data-services.dataservices-test-instance-id
-  lambda_security_groups = 
[module.security-groups.rds.id] - lambda_subnets = [module.vpc.subnet_private_a.id, module.vpc.subnet_private_b.id] - db_dumps_bucket = module.s3.buckets["deployment"].bucket -}