Commit

sync all users in file (#3364)
chopkinsmade authored Nov 20, 2024
1 parent 23c9e4d commit 1506ff1
Showing 2 changed files with 35 additions and 268 deletions.
64 changes: 15 additions & 49 deletions dataworkspace/dataworkspace/apps/applications/utils.py
@@ -7,8 +7,6 @@
 import urllib.parse
 from collections import defaultdict
 from typing import Dict, List
-from dateutil import parser
-from dateutil.tz import tzlocal
 
 import boto3
 import botocore
@@ -1087,6 +1085,7 @@ def create_user_from_sso(
     user.profile.sso_status = sso_status
     try:
         user.save()
+        logger.info("User %s with email %s has been created", user.username, user.email)
     except IntegrityError:
         # A concurrent request may have overtaken this one and created a user
         user = get_user_by_sso_id(sso_id)
@@ -1177,10 +1176,8 @@ def _do_get_staff_sso_s3_object_summaries(s3_bucket):
     return sorted_files
 
 
-def _process_staff_sso_file(
-    client, source_key, last_processed_datetime
-) -> tuple[list[int], datetime.datetime]:
-    new_last_processed_datetime = last_processed_datetime
+def _process_staff_sso_file(client, source_key) -> list[int]:
+
     seen_user_ids = []
 
     with smart_open(
@@ -1200,29 +1197,15 @@ def _process_staff_sso_file(
             user_obj = user["object"]
             user_id = user_obj.get("dit:StaffSSO:User:userId")
 
-            published = user.get("published")
-
-            published_date = parser.parse(published)
-
-            if published_date < last_processed_datetime:
-                # This items published date is before the last processed date, can be ignored
-                seen_user_ids.append(user_id)
-                continue
-
             emails = user_obj.get("dit:emailAddress", [])
             primary_email = user_obj.get("dit:StaffSSO:User:contactEmailAddress") or emails[0]
             first_name = user_obj.get("dit:firstName")
             last_name = user_obj.get("dit:lastName")
             status = user_obj.get("dit:StaffSSO:User:status")
 
-            if published_date > new_last_processed_datetime:
-                new_last_processed_datetime = published_date
-
             logger.info(
-                "sync_s3_sso_users: User id %s published date %s is after previous date %s, creating the user from sso",
+                "sync_s3_sso_users: Processing user id %s",
                 user_id,
-                published_date,
-                last_processed_datetime,
             )
             try:
                 user = create_user_from_sso(
@@ -1239,22 +1222,16 @@
 
             seen_user_ids.append(user_id)
 
-    return seen_user_ids, new_last_processed_datetime
+    return seen_user_ids
 
 
-def _get_seen_ids_and_last_processed(
-    files, client, last_processed_datetime
-) -> tuple[list[int], datetime.datetime]:
+def _get_seen_ids(files, client) -> list[int]:
     seen_user_ids = list[int]()
-    latest_processed_datetime = last_processed_datetime
 
     for file in files:
-        seen_ids_in_file, new_last_processed = _process_staff_sso_file(
-            client, file.source_key, last_processed_datetime
-        )
+        seen_ids_in_file = _process_staff_sso_file(client, file.source_key)
         seen_user_ids.extend(seen_ids_in_file)
-        latest_processed_datetime = new_last_processed
-    return list(set(seen_user_ids)), latest_processed_datetime
+    return list(set(seen_user_ids))
 
 
 def _is_full_sync(files):
@@ -1264,26 +1241,19 @@
 
 
 def _do_sync_s3_sso_users():
-    last_published = cache.get(
-        "s3_sso_sync_last_published",
-        datetime.datetime.fromtimestamp(0, tz=tzlocal()),
-    )
-    logger.info("sync_s3_sso_users: Starting with last published date of %s", last_published)
-
-    new_last_processed = last_published
+    logger.info("sync_s3_sso_users: Starting sync of users in S3 file")
 
     s3_resource = get_s3_resource()
     bucket = s3_resource.Bucket(settings.AWS_UPLOADS_BUCKET)
     files = _do_get_staff_sso_s3_object_summaries(bucket)
 
     if len(files) > 0:
-        seen_result = _get_seen_ids_and_last_processed(
-            files, s3_resource.meta.client, new_last_processed
-        )
-        new_last_processed = seen_result[1]
-        if len(seen_result[0]) > 0 and _is_full_sync(files):
+        seen_ids = _get_seen_ids(files, s3_resource.meta.client)
+
+        if len(seen_ids) > 0 and _is_full_sync(files):
             unseen_user_profiles = (
-                Profile.objects.exclude(user__username__in=seen_result[0])
+                Profile.objects.exclude(user__username__in=seen_ids)
                 .filter(sso_status="active")
                 .select_related("user")
             )
@@ -1294,20 +1264,16 @@ def _do_sync_s3_sso_users():
 
             unseen_user_profiles.update(sso_status="inactive")
 
-        logger.info("sync_s3_sso_users: New last_published date for cache %s", new_last_processed)
-
         # At the end of the loop, delete all loaded files
         delete_keys = [{"Key": file.key} for file in files]
         logger.info("sync_s3_sso_users: Deleting keys %s", delete_keys)
         bucket.delete_objects(Delete={"Objects": delete_keys})
 
+        logger.info("sync_s3_sso_users: Finished sync of users in S3 file")
+
     else:
         logger.info("sync_s3_sso_users: No files to process")
 
-    # Always reset the cache
-    logger.info("sync_s3_sso_users: New last_published date for cache %s", new_last_processed)
-    cache.set("s3_sso_sync_last_published", new_last_processed, timeout=30 * 60)  # 30 minutes
-
 
 def fetch_visualisation_log_events(log_group, log_stream):
     client = boto3.client("logs")
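For reference, a condensed sketch of how the sync flow reads after this commit, assembled from the hunks above. Regions collapsed in the diff (the smart_open and create_user_from_sso arguments, and the logging around the deactivation step) are elided, and the deactivation queryset is shown inlined rather than line for line, so this is not a verbatim copy of utils.py.

# Condensed post-commit sketch. logger, settings, Profile, get_s3_resource,
# _do_get_staff_sso_s3_object_summaries, _is_full_sync and _process_staff_sso_file
# all come from the surrounding dataworkspace module.

def _get_seen_ids(files, client) -> list[int]:
    seen_user_ids = list[int]()

    for file in files:
        # Every user in every file is now processed; the published-date cut-off is gone.
        seen_user_ids.extend(_process_staff_sso_file(client, file.source_key))
    return list(set(seen_user_ids))


def _do_sync_s3_sso_users():
    logger.info("sync_s3_sso_users: Starting sync of users in S3 file")

    s3_resource = get_s3_resource()
    bucket = s3_resource.Bucket(settings.AWS_UPLOADS_BUCKET)
    files = _do_get_staff_sso_s3_object_summaries(bucket)

    if len(files) > 0:
        seen_ids = _get_seen_ids(files, s3_resource.meta.client)

        if len(seen_ids) > 0 and _is_full_sync(files):
            # Active profiles not present in a full export are marked inactive.
            Profile.objects.exclude(user__username__in=seen_ids).filter(
                sso_status="active"
            ).update(sso_status="inactive")

        # Processed files are deleted so they are not re-read on the next run;
        # the s3_sso_sync_last_published cache entry is no longer kept.
        bucket.delete_objects(Delete={"Objects": [{"Key": file.key} for file in files]})
        logger.info("sync_s3_sso_users: Finished sync of users in S3 file")
    else:
        logger.info("sync_s3_sso_users: No files to process")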
