Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Task/script create contacts for existing users/2768 #2844

Open
wants to merge 9 commits into
base: develop
Choose a base branch
from
6 changes: 5 additions & 1 deletion bc_obps/common/management/commands/custom_migrate.py
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,11 @@ def handle(self, *args, **options):
self.stdout.write(self.style.SUCCESS(f'Successfully migrated {app_label}.'))
except Exception as e:
self.stdout.write(self.style.ERROR(f'Error migrating {app_label}: {e}'))
continue
# Some apps like messages and staticfiles do not have migrations so we can ignore them
if "does not have migrations" in str(e):
continue
else:
raise e

self.stdout.write(self.style.SUCCESS('Default migrations completed.'))

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,96 @@
# Generated by Django 5.0.11 on 2025-02-14 01:29

from typing import Dict
from django.db import migrations, models

"""
One-time forward-only migration to be applied to prod data.
Purpose: create a contact record for each approved industry_user who doesn't already appear in the contact table.
"""


def count_stats(User, Contact, UserOperator) -> Dict[str, int]:
"""Collects and returns key statistics about the data before and after migration."""
return {
'total_users': User.objects.count(),
'industry_users': User.objects.filter(app_role__role_name="industry_user").count(),
'contacts': Contact.objects.count(),
'approved_user_operators': UserOperator.objects.filter(status="Approved").count(),
}


def create_contacts_from_prod_industry_users(apps, schema_editor):
"""Creates Contact records for approved industry users not already in the Contact table."""
# Import the required Django models
User = apps.get_model('registration', 'User')
Contact = apps.get_model('registration', 'Contact')
UserOperator = apps.get_model('registration', 'UserOperator')
BusinessRole = apps.get_model('registration', 'BusinessRole')

before_stats = count_stats(User, Contact, UserOperator)

# Fetch only approved UserOperator user IDs
approved_user_operators = UserOperator.objects.filter(status="Approved").values_list("user_id", flat=True)

# Fetch approved industry users who are not already in Contact
approved_industry_users = User.objects.filter(
user_guid__in=approved_user_operators, app_role__role_name="industry_user"
).exclude(email__in=Contact.objects.values_list("email", flat=True))

operation_representative_role = BusinessRole.objects.get(role_name="Operation Representative")

created_count = 0

for user in approved_industry_users:
users_user_operator = user.user_operators.filter(status="Approved").first()
operator = users_user_operator.operator if users_user_operator else None
if operator:
_, created = Contact.objects.get_or_create(
email=user.email,
defaults={
"first_name": user.first_name,
"last_name": user.last_name,
"phone_number": user.phone_number,
"business_role": operation_representative_role,
"address": None,
"operator": operator,
},
)
if created:
created_count += 1

print(f'\n\n\n{created_count} new contacts created')

after_stats = count_stats(User, Contact, UserOperator)
log_migration_summary(before_stats, after_stats, Contact, User)


def log_migration_summary(before_stats, after_stats, Contact, User):
print("Migration Summary:")
print(f"Before Migration: {before_stats}")
print(f"After Migration: {after_stats}")

if after_stats['contacts'] < after_stats['approved_user_operators']:
print("The number of contacts is less than the number of approved user operators.")

internal_user_contacts = Contact.objects.filter(
email__in=User.objects.filter(app_role__role_name__startswith="cas_").values_list("email", flat=True)
)
if internal_user_contacts.exists():
print(f"Internal users (cas_x) found in Contacts table: {list(internal_user_contacts.values('email'))}")

duplicate_contacts = (
Contact.objects.values("email", "operator_id").annotate(count=models.Count("id")).filter(count__gt=1)
)
if duplicate_contacts.exists():
print(f"Duplicate contacts detected: {list(duplicate_contacts)}")


class Migration(migrations.Migration):
dependencies = [
('registration', '0077_historicalfacility_well_authorization_numbers_and_more'),
]

operations = [
migrations.RunPython(create_contacts_from_prod_industry_users, migrations.RunPython.noop, elidable=True),
]
Loading