diff --git a/bc_obps/common/management/commands/custom_migrate.py b/bc_obps/common/management/commands/custom_migrate.py index 20dfdbc9ad..8dfc5f018e 100644 --- a/bc_obps/common/management/commands/custom_migrate.py +++ b/bc_obps/common/management/commands/custom_migrate.py @@ -48,7 +48,11 @@ def handle(self, *args, **options): self.stdout.write(self.style.SUCCESS(f'Successfully migrated {app_label}.')) except Exception as e: self.stdout.write(self.style.ERROR(f'Error migrating {app_label}: {e}')) - continue + # Some apps like messages and staticfiles do not have migrations so we can ignore them + if "does not have migrations" in str(e): + continue + else: + raise e self.stdout.write(self.style.SUCCESS('Default migrations completed.')) diff --git a/bc_obps/registration/migrations/0078_make_contacts_from_industry_users.py b/bc_obps/registration/migrations/0078_make_contacts_from_industry_users.py new file mode 100644 index 0000000000..b9041c2d6d --- /dev/null +++ b/bc_obps/registration/migrations/0078_make_contacts_from_industry_users.py @@ -0,0 +1,96 @@ +# Generated by Django 5.0.11 on 2025-02-14 01:29 + +from typing import Dict +from django.db import migrations, models + +""" +One-time forward-only migration to be applied to prod data. +Purpose: create a contact record for each approved industry_user who doesn't already appear in the contact table. +""" + + +def count_stats(User, Contact, UserOperator) -> Dict[str, int]: + """Collects and returns key statistics about the data before and after migration.""" + return { + 'total_users': User.objects.count(), + 'industry_users': User.objects.filter(app_role__role_name="industry_user").count(), + 'contacts': Contact.objects.count(), + 'approved_user_operators': UserOperator.objects.filter(status="Approved").count(), + } + + +def create_contacts_from_prod_industry_users(apps, schema_editor): + """Creates Contact records for approved industry users not already in the Contact table.""" + # Import the required Django models + User = apps.get_model('registration', 'User') + Contact = apps.get_model('registration', 'Contact') + UserOperator = apps.get_model('registration', 'UserOperator') + BusinessRole = apps.get_model('registration', 'BusinessRole') + + before_stats = count_stats(User, Contact, UserOperator) + + # Fetch only approved UserOperator user IDs + approved_user_operators = UserOperator.objects.filter(status="Approved").values_list("user_id", flat=True) + + # Fetch approved industry users who are not already in Contact + approved_industry_users = User.objects.filter( + user_guid__in=approved_user_operators, app_role__role_name="industry_user" + ).exclude(email__in=Contact.objects.values_list("email", flat=True)) + + operation_representative_role = BusinessRole.objects.get(role_name="Operation Representative") + + created_count = 0 + + for user in approved_industry_users: + users_user_operator = user.user_operators.filter(status="Approved").first() + operator = users_user_operator.operator if users_user_operator else None + if operator: + _, created = Contact.objects.get_or_create( + email=user.email, + defaults={ + "first_name": user.first_name, + "last_name": user.last_name, + "phone_number": user.phone_number, + "business_role": operation_representative_role, + "address": None, + "operator": operator, + }, + ) + if created: + created_count += 1 + + print(f'\n\n\n{created_count} new contacts created') + + after_stats = count_stats(User, Contact, UserOperator) + log_migration_summary(before_stats, after_stats, Contact, User) + + +def log_migration_summary(before_stats, after_stats, Contact, User): + print("Migration Summary:") + print(f"Before Migration: {before_stats}") + print(f"After Migration: {after_stats}") + + if after_stats['contacts'] < after_stats['approved_user_operators']: + print("The number of contacts is less than the number of approved user operators.") + + internal_user_contacts = Contact.objects.filter( + email__in=User.objects.filter(app_role__role_name__startswith="cas_").values_list("email", flat=True) + ) + if internal_user_contacts.exists(): + print(f"Internal users (cas_x) found in Contacts table: {list(internal_user_contacts.values('email'))}") + + duplicate_contacts = ( + Contact.objects.values("email", "operator_id").annotate(count=models.Count("id")).filter(count__gt=1) + ) + if duplicate_contacts.exists(): + print(f"Duplicate contacts detected: {list(duplicate_contacts)}") + + +class Migration(migrations.Migration): + dependencies = [ + ('registration', '0077_historicalfacility_well_authorization_numbers_and_more'), + ] + + operations = [ + migrations.RunPython(create_contacts_from_prod_industry_users, migrations.RunPython.noop, elidable=True), + ]