01/16/2025 Production Deploy #1536

Merged 73 commits on Jan 17, 2025

Changes from all commits (73 commits)
06807cf
change celery pool support from prefork to threads
Nov 25, 2024
738b4f0
add adr
Nov 25, 2024
8c6f7ed
oops add pool=threads to manifest
Nov 25, 2024
dd83a22
merge from main
Dec 23, 2024
d72f553
merge from main
Dec 26, 2024
f8e30f8
add performance testing debug info
Jan 2, 2025
e2d64c0
add number of notifications processed
Jan 2, 2025
0a704ab
merge from main
Jan 2, 2025
cd13984
placeholder
Jan 6, 2025
628c6eb
do minimalistic fix first
Jan 6, 2025
343603c
merge from main
Jan 7, 2025
ef57a10
merge from main
Jan 7, 2025
c8a8daf
merge from main
Jan 7, 2025
2b3c9c8
Change delivery receipts tasks time to help UI lag
Jan 7, 2025
9e4e957
Merge pull request #1437 from GSA/notify-api-1432
ccostino Jan 7, 2025
c65335a
Merge pull request #1514 from GSA/notify-api-1513
terrazoon Jan 7, 2025
7fe2ec2
Merge pull request #1518 from GSA/ui-lag-improvements
ccostino Jan 7, 2025
f466cf2
Merge pull request #1511 from GSA/notify-api-1494
terrazoon Jan 7, 2025
1c67478
Update daily_check GitHub Action
ccostino Jan 7, 2025
3119882
Merge pull request #1519 from GSA/update-daily-check-action
ccostino Jan 7, 2025
0d1a989
cleanup pending notifications
Jan 8, 2025
a2fc970
cleanup pending notifications
Jan 8, 2025
da19e7c
set prefetch multiplier to 2 and increase concurrency to 15
Jan 8, 2025
4293da6
Merge pull request #1522 from GSA/notify-api-1520
terrazoon Jan 9, 2025
6aae2c7
fix db connection pool
Jan 9, 2025
2770f76
cleanup
Jan 9, 2025
7e7d432
cleanup
Jan 9, 2025
874c8ff
try batch inserts
Jan 10, 2025
bbf5bac
add lpop and rpush to notify redis
Jan 10, 2025
64a61f5
cleanup redis commands and flow
Jan 10, 2025
5f7089f
add countdown of 30 seconds for deliveries
Jan 10, 2025
1fbe427
revert behavior for emails, only sms needs optimization
Jan 10, 2025
833146e
fix tests
Jan 10, 2025
1eea4bb
fix tests
Jan 10, 2025
9685b09
fix tests
Jan 10, 2025
6bd044e
fix uuid
Jan 10, 2025
302e3ee
fix uuid
Jan 10, 2025
6f7c7d2
fix uuid
Jan 10, 2025
c6d0987
fix uuid
Jan 10, 2025
3fba382
fix uuid
Jan 10, 2025
7794eb2
fix uuid
Jan 10, 2025
2acd0a8
fix uuid
Jan 10, 2025
bf3fc43
fix uuid
Jan 10, 2025
44ce495
fix uuid
Jan 11, 2025
2847046
fix uuid
Jan 11, 2025
aeb4726
Merge pull request #1525 from GSA/notify-api-1524
terrazoon Jan 13, 2025
67d03dd
Merge pull request #1529 from GSA/connection_pool
terrazoon Jan 13, 2025
f497203
Ensure created_at stamp is correct
Jan 13, 2025
345ae88
Merge branch 'main' of https://github.com/GSA/notifications-api into …
Jan 13, 2025
a92eb91
add a test
Jan 13, 2025
238ec27
more tests
Jan 13, 2025
f9641ae
more tests
Jan 13, 2025
aaddd8c
more tests
Jan 13, 2025
40c3d4a
more tests
Jan 13, 2025
b924302
more tests
Jan 13, 2025
af158bf
more tests
Jan 13, 2025
18debf6
more tests
Jan 13, 2025
510b84b
more tests
Jan 13, 2025
521ed79
more tests
Jan 13, 2025
1ea89ab
more tests
Jan 13, 2025
f4b8c04
more tests
Jan 13, 2025
752e5ca
more tests
Jan 13, 2025
eac2178
clean up
Jan 13, 2025
4965bc2
change countdown from 30 to 60 seconds for message sends to better ma…
Jan 13, 2025
ba4301f
fix bug with created_at
Jan 13, 2025
59dfb05
code review feedback
Jan 14, 2025
0a7ccc5
Merge pull request #1533 from GSA/notify-api-1531
terrazoon Jan 15, 2025
3fd8009
Add error handling for possible string/datetime created at stamps
Jan 15, 2025
f1118d6
Remove print statement
Jan 16, 2025
ee90b24
Merge branch 'main' of https://github.com/GSA/notifications-api into …
Jan 16, 2025
7a7daf8
Remove another print statement
Jan 16, 2025
d7c97d6
Remove hilite imports
Jan 16, 2025
e9206c3
Merge pull request #1534 from GSA/invite-expiration-fix
ccostino Jan 16, 2025
2 changes: 1 addition & 1 deletion .github/workflows/daily_checks.yml
@@ -46,7 +46,7 @@ jobs:
- name: Run scan
run: bandit -r app/ -f txt -o /tmp/bandit-output.txt --confidence-level medium
- name: Upload bandit artifact
uses: actions/upload-artifact@v3
uses: actions/upload-artifact@v4
with:
name: bandit-report
path: /tmp/bandit-output.txt
3 changes: 2 additions & 1 deletion Makefile
@@ -52,7 +52,8 @@ run-celery: ## Run celery, TODO remove purge for staging/prod
-A run_celery.notify_celery worker \
--pidfile="/tmp/celery.pid" \
--loglevel=INFO \
--concurrency=4
--pool=threads
--concurrency=10


.PHONY: dead-code
16 changes: 15 additions & 1 deletion app/__init__.py
@@ -18,6 +18,7 @@
from werkzeug.exceptions import HTTPException as WerkzeugHTTPException
from werkzeug.local import LocalProxy

from app import config
from app.clients import NotificationProviderClients
from app.clients.cloudwatch.aws_cloudwatch import AwsCloudwatchClient
from app.clients.document_download import DocumentDownloadClient
@@ -58,15 +59,28 @@ class SQLAlchemy(_SQLAlchemy):

def apply_driver_hacks(self, app, info, options):
sa_url, options = super().apply_driver_hacks(app, info, options)

if "connect_args" not in options:
options["connect_args"] = {}
options["connect_args"]["options"] = "-c statement_timeout={}".format(
int(app.config["SQLALCHEMY_STATEMENT_TIMEOUT"]) * 1000
)

return (sa_url, options)


db = SQLAlchemy()
# Set db engine settings here for now.
# They were not being set previously (despite environment variables with
# appropriate-sounding names) and were defaulting to low values.
db = SQLAlchemy(
engine_options={
"pool_size": config.Config.SQLALCHEMY_POOL_SIZE,
"max_overflow": 10,
"pool_timeout": config.Config.SQLALCHEMY_POOL_TIMEOUT,
"pool_recycle": config.Config.SQLALCHEMY_POOL_RECYCLE,
"pool_pre_ping": True,
}
)
migrate = Migrate()
ma = Marshmallow()
notify_celery = NotifyCelery()
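For reference, the engine_options dict above maps one-to-one onto SQLAlchemy's create_engine keyword arguments. A minimal, hypothetical equivalent (placeholder connection URL; the values mirror the defaults this PR sets in config.py):

from sqlalchemy import create_engine

engine = create_engine(
    "postgresql://user:pass@localhost/notification_api",  # placeholder URL
    pool_size=40,        # SQLALCHEMY_POOL_SIZE: connections kept open in the pool
    max_overflow=10,     # extra connections allowed on top of pool_size under burst load
    pool_timeout=30,     # SQLALCHEMY_POOL_TIMEOUT: seconds to wait for a free connection
    pool_recycle=300,    # SQLALCHEMY_POOL_RECYCLE: recycle connections older than 5 minutes
    pool_pre_ping=True,  # issue a lightweight ping before reusing a pooled connection
)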
68 changes: 64 additions & 4 deletions app/celery/scheduled_tasks.py
@@ -1,10 +1,11 @@
from datetime import timedelta
import json
from datetime import datetime, timedelta

from flask import current_app
from sqlalchemy import between
from sqlalchemy.exc import SQLAlchemyError

from app import notify_celery, zendesk_client
from app import notify_celery, redis_store, zendesk_client
from app.celery.tasks import (
get_recipient_csv_and_template_and_sender_id,
process_incomplete_jobs,
@@ -24,6 +25,8 @@
find_missing_row_for_job,
)
from app.dao.notifications_dao import (
dao_batch_insert_notifications,
dao_close_out_delivery_receipts,
dao_update_delivery_receipts,
notifications_not_yet_sent,
)
@@ -33,7 +36,7 @@
)
from app.dao.users_dao import delete_codes_older_created_more_than_a_day_ago
from app.enums import JobStatus, NotificationType
from app.models import Job
from app.models import Job, Notification
from app.notifications.process_notifications import send_notification_to_queue
from app.utils import utc_now
from notifications_utils import aware_utcnow
@@ -242,6 +245,8 @@ def check_for_services_with_high_failure_rates_or_sending_to_tv_numbers():
bind=True, max_retries=7, default_retry_delay=3600, name="process-delivery-receipts"
)
def process_delivery_receipts(self):
# If we need to check db settings do it here for convenience
# current_app.logger.info(f"POOL SIZE {app.db.engine.pool.size()}")
"""
Every eight minutes or so (see config.py) we run this task, which searches the last ten
minutes of logs for delivery receipts and batch updates the db with the results. The overlap
@@ -256,7 +261,7 @@ def process_delivery_receipts(self):

cloudwatch = AwsCloudwatchClient()
cloudwatch.init_app(current_app)
start_time = aware_utcnow() - timedelta(minutes=10)
start_time = aware_utcnow() - timedelta(minutes=3)
end_time = aware_utcnow()
delivered_receipts, failed_receipts = cloudwatch.check_delivery_receipts(
start_time, end_time
@@ -278,3 +283,58 @@
current_app.logger.error(
"Failed process delivery receipts after max retries"
)


@notify_celery.task(
bind=True, max_retries=2, default_retry_delay=3600, name="cleanup-delivery-receipts"
)
def cleanup_delivery_receipts(self):
dao_close_out_delivery_receipts()


@notify_celery.task(bind=True, name="batch-insert-notifications")
def batch_insert_notifications(self):
batch = []

# TODO We probably need some way to clear the list if
# things go haywire. A command?

# with redis_store.pipeline():
# while redis_store.llen("message_queue") > 0:
# redis_store.lpop("message_queue")
# current_app.logger.info("EMPTY!")
# return
current_len = redis_store.llen("message_queue")
with redis_store.pipeline():
# since this list is being fed by other processes, just grab what is available when
# this call is made and process that.

count = 0
while count < current_len:
count = count + 1
notification_bytes = redis_store.lpop("message_queue")
notification_dict = json.loads(notification_bytes.decode("utf-8"))
notification_dict["status"] = notification_dict.pop("notification_status")
if not notification_dict.get("created_at"):
notification_dict["created_at"] = utc_now()
elif isinstance(notification_dict["created_at"], list):
notification_dict["created_at"] = notification_dict["created_at"][0]
notification = Notification(**notification_dict)
if notification is not None:
batch.append(notification)
try:
dao_batch_insert_notifications(batch)
except Exception:
current_app.logger.exception("Notification batch insert failed")
for n in batch:
# Use 'created_at' as a TTL so we don't retry infinitely
notification_time = n.created_at
if isinstance(notification_time, str):
notification_time = datetime.fromisoformat(n.created_at)
if notification_time < utc_now() - timedelta(seconds=50):
current_app.logger.warning(
f"Abandoning stale data, could not write to db: {n.serialize_for_redis(n)}"
)
continue
else:
redis_store.rpush("message_queue", json.dumps(n.serialize_for_redis(n)))
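For context, this task is the consumer side of the new Redis-backed flow; the producer side (the lpop/rpush support added to the Notify Redis client in this PR) pushes JSON-serialized notification dicts onto the same list. A rough sketch of that enqueue path, with names assumed from the code above rather than taken from the actual implementation:

import json

def enqueue_notification(redis_store, notification):
    # serialize_for_redis() (added in app/models.py below) returns a plain dict of
    # column values, with *_id fields stringified and sent_at/completed_at nulled.
    payload = notification.serialize_for_redis(notification)
    redis_store.rpush("message_queue", json.dumps(payload))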
2 changes: 1 addition & 1 deletion app/celery/tasks.py
@@ -256,7 +256,7 @@ def save_sms(self, service_id, notification_id, encrypted_notification, sender_i
)
)
provider_tasks.deliver_sms.apply_async(
[str(saved_notification.id)], queue=QueueNames.SEND_SMS
[str(saved_notification.id)], queue=QueueNames.SEND_SMS, countdown=60
)

current_app.logger.debug(
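The only change here is countdown=60 on deliver_sms.apply_async, which keeps the task queued for at least 60 seconds before a worker may run it, presumably so the batched notification insert has landed before delivery is attempted. A standalone sketch of the countdown semantics (task name and broker are hypothetical):

from celery import Celery

celery_app = Celery("example", broker="redis://localhost:6379/0")

@celery_app.task(name="deliver-sms-example")
def deliver_sms_example(notification_id):
    print(f"delivering {notification_id}")

# Queued immediately, but not eligible to execute for at least 60 seconds.
deliver_sms_example.apply_async(["1234"], countdown=60)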
3 changes: 1 addition & 2 deletions app/clients/__init__.py
@@ -13,8 +13,7 @@
"addressing_style": "virtual",
},
use_fips_endpoint=True,
# This is the default but just for doc sake
max_pool_connections=10,
max_pool_connections=50, # This should be equal or greater than our celery concurrency
)


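The new comment pins max_pool_connections to the Celery concurrency: with a threads pool of 10-15 workers sharing one boto3 client, the HTTP connection pool must be at least that large or calls queue behind each other. A hedged sketch of the surrounding botocore Config (only the changed keyword is confirmed by this diff; the rest mirrors the visible arguments):

from botocore.config import Config

AWS_CLIENT_CONFIG = Config(
    s3={"addressing_style": "virtual"},
    use_fips_endpoint=True,
    # >= celery worker concurrency so threads do not contend for HTTP connections
    max_pool_connections=50,
)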
11 changes: 11 additions & 0 deletions app/commands.py
@@ -789,6 +789,17 @@ def _update_template(id, name, template_type, content, subject):
db.session.commit()


@notify_command(name="clear-redis-list")
@click.option("-n", "--name_of_list", required=True)
def clear_redis_list(name_of_list):
my_len_before = redis_store.llen(name_of_list)
redis_store.ltrim(name_of_list, 1, 0)
my_len_after = redis_store.llen(name_of_list)
current_app.logger.info(
f"Cleared redis list {name_of_list}. Before: {my_len_before} after {my_len_after}"
)


@notify_command(name="update-templates")
def update_templates():
with open(current_app.config["CONFIG_FILES"] + "/templates.json") as f:
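The clear-redis-list command empties a list with LTRIM using start=1, end=0; because the start is past the end, Redis resolves that to an empty list and removes the now-empty key. A small demonstration of that behaviour against a hypothetical local Redis:

import json
import redis

r = redis.Redis(host="localhost", port=6379)
r.rpush("message_queue", json.dumps({"id": "example"}))
print(r.llen("message_queue"))  # 1
r.ltrim("message_queue", 1, 0)  # inverted range, so the whole list is trimmed away
print(r.llen("message_queue"))  # 0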
14 changes: 12 additions & 2 deletions app/config.py
@@ -81,7 +81,7 @@ class Config(object):
SQLALCHEMY_DATABASE_URI = cloud_config.database_url
SQLALCHEMY_RECORD_QUERIES = False
SQLALCHEMY_TRACK_MODIFICATIONS = False
SQLALCHEMY_POOL_SIZE = int(getenv("SQLALCHEMY_POOL_SIZE", 5))
SQLALCHEMY_POOL_SIZE = int(getenv("SQLALCHEMY_POOL_SIZE", 40))
SQLALCHEMY_POOL_TIMEOUT = 30
SQLALCHEMY_POOL_RECYCLE = 300
SQLALCHEMY_STATEMENT_TIMEOUT = 1200
@@ -200,7 +200,17 @@ class Config(object):
},
"process-delivery-receipts": {
"task": "process-delivery-receipts",
"schedule": timedelta(minutes=8),
"schedule": timedelta(minutes=2),
"options": {"queue": QueueNames.PERIODIC},
},
"cleanup-delivery-receipts": {
"task": "cleanup-delivery-receipts",
"schedule": timedelta(minutes=82),
"options": {"queue": QueueNames.PERIODIC},
},
"batch-insert-notifications": {
"task": "batch-insert-notifications",
"schedule": 10.0,
"options": {"queue": QueueNames.PERIODIC},
},
"expire-or-delete-invitations": {
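In the beat schedule, a bare number is interpreted by Celery as seconds, so batch-insert-notifications runs every 10 seconds while the other entries keep their timedelta-based cadence. Equivalent spellings, shown only for clarity as a hypothetical standalone dict:

from datetime import timedelta

beat_schedule = {
    "batch-insert-notifications": {
        "task": "batch-insert-notifications",
        "schedule": 10.0,  # plain float: every 10 seconds
    },
    "batch-insert-notifications-explicit": {
        "task": "batch-insert-notifications",
        "schedule": timedelta(seconds=10),  # identical cadence, spelled out
    },
}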
2 changes: 1 addition & 1 deletion app/dao/jobs_dao.py
@@ -45,7 +45,7 @@ def dao_get_job_by_service_id_and_job_id(service_id, job_id):

def dao_get_unfinished_jobs():
stmt = select(Job).filter(Job.processing_finished.is_(None))
return db.session.execute(stmt).all()
return db.session.execute(stmt).scalars().all()


def dao_get_jobs_by_service_id(
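The scalars() change matters because with SQLAlchemy's 2.0-style execution, session.execute(select(Job)).all() returns Row tuples, whereas .scalars().all() returns the Job objects themselves, which is what callers of dao_get_unfinished_jobs() expect. A minimal illustration, reusing the statement from the diff (imports assumed from this repo's layout):

from sqlalchemy import select

from app import db
from app.models import Job

def unfinished_jobs_rows_vs_objects():
    stmt = select(Job).filter(Job.processing_finished.is_(None))
    rows = db.session.execute(stmt).all()            # Row tuples: rows[0] == (Job(...),)
    jobs = db.session.execute(stmt).scalars().all()  # Job instances: jobs[0].processing_finished works
    return rows, jobs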
63 changes: 62 additions & 1 deletion app/dao/notifications_dao.py
@@ -1,5 +1,7 @@
import json
from datetime import timedelta
import os
from datetime import datetime, timedelta
from time import time

from flask import current_app
from sqlalchemy import (
@@ -94,6 +96,32 @@ def dao_create_notification(notification):
# notify-api-1454 insert only if it doesn't exist
if not dao_notification_exists(notification.id):
db.session.add(notification)
# There have been issues with invites expiring.
# Ensure the created at value is set and debug.
if notification.notification_type == "email":
orig_time = notification.created_at
now_time = utc_now()
try:
diff_time = now_time - orig_time
except TypeError:
try:
orig_time = datetime.strptime(orig_time, "%Y-%m-%dT%H:%M:%S.%fZ")
except ValueError:
orig_time = datetime.strptime(orig_time, "%Y-%m-%d")
diff_time = now_time - orig_time
current_app.logger.error(
f"dao_create_notification orig created at: {orig_time} and now created at: {now_time}"
)
if diff_time.total_seconds() > 300:
current_app.logger.error(
"Something is wrong with notification.created_at in email!"
)
if os.getenv("NOTIFY_ENVIRONMENT") not in ["test"]:
notification.created_at = now_time
dao_update_notification(notification)
current_app.logger.error(
f"Email notification created_at reset to {notification.created_at}"
)


def country_records_delivery(phone_prefix):
@@ -727,6 +755,7 @@ def get_service_ids_with_notifications_on_date(notification_type, date):


def dao_update_delivery_receipts(receipts, delivered):
start_time_millis = time() * 1000
new_receipts = []
for r in receipts:
if isinstance(r, str):
@@ -773,3 +802,35 @@ def dao_update_delivery_receipts(receipts, delivered):
)
db.session.execute(stmt)
db.session.commit()
elapsed_time = (time() * 1000) - start_time_millis
current_app.logger.info(
f"#loadtestperformance batch update query time: \
updated {len(receipts)} notifications in {elapsed_time} ms"
)


def dao_close_out_delivery_receipts():
THREE_DAYS_AGO = utc_now() - timedelta(minutes=3)
stmt = (
update(Notification)
.where(
Notification.status == NotificationStatus.PENDING,
Notification.sent_at < THREE_DAYS_AGO,
)
.values(status=NotificationStatus.FAILED, provider_response="Technical Failure")
)
result = db.session.execute(stmt)

db.session.commit()
if result:
current_app.logger.info(
f"Marked {result.rowcount} notifications as technical failures"
)


def dao_batch_insert_notifications(batch):

db.session.bulk_save_objects(batch)
db.session.commit()
current_app.logger.info(f"Batch inserted notifications: {len(batch)}")
return len(batch)
40 changes: 38 additions & 2 deletions app/models.py
@@ -5,7 +5,7 @@
from sqlalchemy import CheckConstraint, Index, UniqueConstraint
from sqlalchemy.dialects.postgresql import JSON, JSONB, UUID
from sqlalchemy.ext.associationproxy import association_proxy
from sqlalchemy.ext.declarative import declared_attr
from sqlalchemy.ext.declarative import DeclarativeMeta, declared_attr
from sqlalchemy.orm import validates
from sqlalchemy.orm.collections import attribute_mapped_collection

@@ -577,7 +577,16 @@ def get_inbound_number(self):
return self.inbound_number.number

def get_default_sms_sender(self):
default_sms_sender = [x for x in self.service_sms_senders if x.is_default]
# notify-api-1513 let's try a minimalistic fix
# to see if we can get the right numbers back
default_sms_sender = [
x
for x in self.service_sms_senders
if x.is_default and x.service_id == self.id
]
current_app.logger.info(
f"#notify-api-1513 senders for service {self.name} are {self.service_sms_senders}"
)
return default_sms_sender[0].sms_sender

def get_default_reply_to_email_address(self):
@@ -1685,6 +1694,33 @@ def get_created_by_email_address(self):
else:
return None

def serialize_for_redis(self, obj):
if isinstance(obj.__class__, DeclarativeMeta):
fields = {}
for column in obj.__table__.columns:
if column.name == "notification_status":
new_name = "status"
value = getattr(obj, new_name)
elif column.name == "created_at":
if isinstance(obj.created_at, str):
value = obj.created_at
else:
value = (obj.created_at.strftime("%Y-%m-%d %H:%M:%S"),)
elif column.name in ["sent_at", "completed_at"]:
value = None
elif column.name.endswith("_id"):
value = getattr(obj, column.name)
value = str(value)
else:
value = getattr(obj, column.name)
if column.name in ["message_id", "api_key_id"]:
pass # do nothing because we don't have the message id yet
else:
fields[column.name] = value

return fields
raise ValueError("Provided object is not a SQLAlchemy instance")

def serialize_for_csv(self):
serialized = {
"row_number": (
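Taken together with the batch-insert task above, serialize_for_redis defines one half of a JSON round trip: the dict it returns is json.dumps-ed onto the "message_queue" list and later rebuilt into a Notification. A hedged sketch of that round trip, with names assumed from the diffs above:

import json

from app.models import Notification

def redis_round_trip(notification):
    payload = notification.serialize_for_redis(notification)  # plain dict of column values
    raw = json.dumps(payload)                                  # what gets rpush-ed to "message_queue"

    restored = json.loads(raw)
    restored["status"] = restored.pop("notification_status")   # consumer-side rename
    # created_at can come back as a one-element list, because serialize_for_redis wraps
    # the strftime() result in a tuple; batch_insert_notifications takes element [0].
    if isinstance(restored.get("created_at"), list):
        restored["created_at"] = restored["created_at"][0]
    return Notification(**restored)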