From e8214d5a4e512d65172d1a75467ce520fd740b92 Mon Sep 17 00:00:00 2001 From: vshvechko Date: Wed, 12 Jun 2024 11:58:53 +0300 Subject: [PATCH 1/5] feat: patch to create flows for particular applets (M2-6968) (#1394) --- src/apps/shared/commands/patch_commands.py | 6 + .../m2_6968_create_flows_old_versions.py | 106 ++++++++++++++++++ 2 files changed, 112 insertions(+) create mode 100644 src/apps/shared/commands/patches/m2_6968_create_flows_old_versions.py diff --git a/src/apps/shared/commands/patch_commands.py b/src/apps/shared/commands/patch_commands.py index 4faaf84c161..26d1c5a230a 100644 --- a/src/apps/shared/commands/patch_commands.py +++ b/src/apps/shared/commands/patch_commands.py @@ -46,6 +46,12 @@ description="Set proportion.enabled=True to Maki's applets", manage_session=False, ) +PatchRegister.register( + file_path="m2_6968_create_flows_old_versions.py", + task_id="M2-6968", + description="Create flow history records for particular applets", + manage_session=False, +) PatchRegister.register( file_path="m2_6879_create_deleted_respondents.py", diff --git a/src/apps/shared/commands/patches/m2_6968_create_flows_old_versions.py b/src/apps/shared/commands/patches/m2_6968_create_flows_old_versions.py new file mode 100644 index 00000000000..dad345121d7 --- /dev/null +++ b/src/apps/shared/commands/patches/m2_6968_create_flows_old_versions.py @@ -0,0 +1,106 @@ +from rich import print +from sqlalchemy.ext.asyncio import AsyncSession + +SQL_FLOW_HISTORY_CREATE = """ + with applet_versions as ( + select + ah.id, + ah.id_version, + ah.version, + date_trunc('minute', ah.created_at) + interval '1 min' as created_at + from applet_histories ah + left join flow_histories fh on fh.applet_id = ah.id_version + where 1 = 1 + and ah.id = '{applet_id}' + and format( + '%s.%s.%s', + lpad(split_part(ah."version", '.', 1), 2, '0'), + lpad(split_part(ah."version", '.', 2), 2, '0'), + lpad(split_part(ah."version", '.', 3), 2, '0') + ) >= '{from_version_padded}' + and fh.id is null + ), 
+ last_flow_data as ( + select f.* + from flows f + where f.applet_id = '{applet_id}' + ) + insert into flow_histories + select + av.created_at, + av.created_at as updated_at, + lf.is_deleted, + lf."name", + lf.description, + lf.is_single_report, + lf.hide_badge, + lf."order", + format('%s_%s', lf.id::text, av.version) as id_version, + av.id_version as applet_id, + lf.id, + lf.is_hidden, + lf.report_included_activity_name, + lf.report_included_item_name, + lf.extra_fields + from last_flow_data lf + cross join applet_versions av; +""" + +SQL_FLOW_ITEM_HISTORY_CREATE = """ + with applet_versions as ( + select + ah.id, + ah.id_version, + ah.version, + date_trunc('minute', ah.created_at) + interval '1 min' as created_at + from applet_histories ah + left join flow_histories fh on fh.applet_id = ah.id_version + and exists (select 1 from flow_item_histories fih where fih.activity_flow_id = fh.id_version) + where 1 = 1 + and ah.id = '{applet_id}' + and format( + '%s.%s.%s', + lpad(split_part(ah."version", '.', 1), 2, '0'), + lpad(split_part(ah."version", '.', 2), 2, '0'), + lpad(split_part(ah."version", '.', 3), 2, '0') + ) >= '{from_version_padded}' + and fh.id is null + ), + last_flow_item_data as ( + select fi.* + from flows f + join flow_items fi on fi.activity_flow_id = f.id + where f.applet_id = '{applet_id}' + ) + insert into flow_item_histories + select + av.created_at, + av.created_at as updated_at, + lfi.is_deleted, + lfi."order", + format('%s_%s', lfi.id::text, av.version) as id_version, + format('%s_%s', lfi.activity_flow_id::text, av.version) as activity_flow_id, + format('%s_%s', lfi.activity_id::text, av.version) as activity_id, + lfi.id + from last_flow_item_data lfi + cross join applet_versions av; +""" + +applet_versions = ( + ("62b21984-b90b-7f2b-a9e1-c51a00000000", "10.00.00"), + ("7bb7b30e-0d8a-4b13-bc1c-6a733ccc689a", "02.00.00"), +) + + +async def main(session: AsyncSession, *args, **kwargs): + for applet_id, version_padded in applet_versions: + sql 
= SQL_FLOW_HISTORY_CREATE.format(applet_id=applet_id, from_version_padded=version_padded) + print("Execute:") + print(sql) + await session.execute(sql) + print("Done") + print("Execute:") + print(sql) + sql = SQL_FLOW_ITEM_HISTORY_CREATE.format(applet_id=applet_id, from_version_padded=version_padded) + print("Done") + await session.execute(sql) From 4404bd03263d9b2ec66edd019afda602ab568e4a Mon Sep 17 00:00:00 2001 From: vshvechko Date: Thu, 13 Jun 2024 16:22:05 +0300 Subject: [PATCH 2/5] fix(patch): skip not responding arbitrary servers (M2-6879) (#1411) --- .../m2_6879_create_deleted_respondents.py | 26 +++++++++++-------- 1 file changed, 15 insertions(+), 11 deletions(-) diff --git a/src/apps/shared/commands/patches/m2_6879_create_deleted_respondents.py b/src/apps/shared/commands/patches/m2_6879_create_deleted_respondents.py index ad6bc992c2d..7e3e29f8f93 100644 --- a/src/apps/shared/commands/patches/m2_6879_create_deleted_respondents.py +++ b/src/apps/shared/commands/patches/m2_6879_create_deleted_respondents.py @@ -1,6 +1,8 @@ +import asyncio import os import uuid +from rich import print from sqlalchemy import func, select from sqlalchemy.ext.asyncio import AsyncSession from sqlalchemy.orm import Query @@ -130,17 +132,19 @@ async def main(session: AsyncSession, *args, **kwargs): print(f"Workspace#{i + 1} DB already processed, skip...") continue processed.add(arb_uri) - session_maker = session_manager.get_session(arb_uri) - async with session_maker() as arb_session: - try: - await find_and_create_missing_roles_arbitrary(session, arb_session, workspace.user_id) - await arb_session.commit() - print(f"Processing workspace#{i + 1} {workspace.id} " f"finished") - except Exception: - await arb_session.rollback() - print(f"[bold red]Workspace#{i + 1} {workspace.id} " f"processing error[/bold red]") - raise - + try: + session_maker = session_manager.get_session(arb_uri) + async with session_maker() as arb_session: + try: + await 
find_and_create_missing_roles_arbitrary(session, arb_session, workspace.user_id) + await arb_session.commit() + print(f"Processing workspace#{i + 1} {workspace.id} " f"finished") + except Exception: + await arb_session.rollback() + print(f"[bold red]Error: Workspace#{i + 1} {workspace.id} processing error[/bold red]") + raise + except asyncio.TimeoutError: + print(f"[bold red]Error: Workspace#{i + 1} {workspace.id} Timeout error, skipping...[/bold red]") except Exception as ex: await session.rollback() raise ex From e9751670b13c6750de2fa8ba6e57b082cf3c6cea Mon Sep 17 00:00:00 2001 From: vshvechko Date: Thu, 13 Jun 2024 16:28:50 +0300 Subject: [PATCH 3/5] fix(migration): Properly handle not responding connections (M2-7024) (#1410) --- .../database/migrations_arbitrary/env.py | 18 +++++++++++++++--- 1 file changed, 15 insertions(+), 3 deletions(-) diff --git a/src/infrastructure/database/migrations_arbitrary/env.py b/src/infrastructure/database/migrations_arbitrary/env.py index 85cf24804bb..d7722aee764 100644 --- a/src/infrastructure/database/migrations_arbitrary/env.py +++ b/src/infrastructure/database/migrations_arbitrary/env.py @@ -1,6 +1,7 @@ import asyncio import logging import os +import traceback import uuid from logging import getLogger from logging.config import fileConfig @@ -38,6 +39,7 @@ async def get_all_servers(connection): SELECT uw.database_uri, uw.user_id FROM users_workspaces as uw WHERE uw.database_uri is not null and uw.database_uri <> '' + ORDER BY uw.created_at """ ) rows = await connection.execute(query) @@ -84,9 +86,19 @@ async def migrate_arbitrary(): future=True, ) ) - async with connectable.connect() as connection: - await connection.run_sync(do_run_migrations, arbitrary_meta, config) - await connectable.dispose() + try: + async with connectable.connect() as connection: + await connection.run_sync(do_run_migrations, arbitrary_meta, config) + migration_log.info(f"Success: {owner_id} successfully migrated") + except asyncio.TimeoutError: + 
migration_log.error(f"!!! Error during migration of {owner_id}") + migration_log.error("Connection timeout") + except Exception as e: + migration_log.error(f"!!! Error during migration of {owner_id}") + migration_log.error(e) + traceback.print_exception(e) + finally: + await connectable.dispose() def run_migrations_offline() -> None: From bf63307bdb59844874479bdbaa2bb33253bc7661 Mon Sep 17 00:00:00 2001 From: vshvechko Date: Mon, 17 Jun 2024 11:14:56 +0300 Subject: [PATCH 4/5] fix(patch): skip not responding arbitrary servers for answer update patch (M2-7052) (#1414) --- .../patches/m2_4611_add_answer_subjects.py | 24 +++++++++++-------- 1 file changed, 14 insertions(+), 10 deletions(-) diff --git a/src/apps/shared/commands/patches/m2_4611_add_answer_subjects.py b/src/apps/shared/commands/patches/m2_4611_add_answer_subjects.py index 34ee0796d8f..ec33cc8e685 100644 --- a/src/apps/shared/commands/patches/m2_4611_add_answer_subjects.py +++ b/src/apps/shared/commands/patches/m2_4611_add_answer_subjects.py @@ -1,3 +1,4 @@ +import asyncio import uuid from rich import print @@ -39,13 +40,16 @@ async def main( print(f"Workspace#{i + 1} DB already processed, skip...") continue processed.add(arb_uri) - session_maker = session_manager.get_session(arb_uri) - async with session_maker() as arb_session: - try: - await update_answers(arb_session) - await arb_session.commit() - print(f"Processing workspace#{i + 1} {workspace.id} " f"finished") - except Exception: - await arb_session.rollback() - print(f"[bold red]Workspace#{i + 1} {workspace.id} " f"processing error[/bold red]") - raise + try: + session_maker = session_manager.get_session(arb_uri) + async with session_maker() as arb_session: + try: + await update_answers(arb_session) + await arb_session.commit() + print(f"Processing workspace#{i + 1} {workspace.id} " f"finished") + except Exception: + await arb_session.rollback() + print(f"[bold red]Error: Workspace#{i + 1} {workspace.id} processing error[/bold red]") + raise + 
+        except asyncio.TimeoutError:
+            print(f"[bold red]Error: Workspace#{i + 1} {workspace.id} Timeout error, skipping...[/bold red]")

From f6295bcbe76f0e2c9ed38d190bb15a2b990b71f7 Mon Sep 17 00:00:00 2001
From: vshvechko
Date: Mon, 17 Jun 2024 13:54:28 +0300
Subject: [PATCH 5/5] fix(patch): fix duplication error in patch "M2-6879" for arbitrary DBs (M2-7055) (#1415)

---
 .../m2_6879_create_deleted_respondents.py          | 17 ++++++++++++-----
 1 file changed, 12 insertions(+), 5 deletions(-)

diff --git a/src/apps/shared/commands/patches/m2_6879_create_deleted_respondents.py b/src/apps/shared/commands/patches/m2_6879_create_deleted_respondents.py
index 7e3e29f8f93..279014bd5b9 100644
--- a/src/apps/shared/commands/patches/m2_6879_create_deleted_respondents.py
+++ b/src/apps/shared/commands/patches/m2_6879_create_deleted_respondents.py
@@ -71,10 +71,12 @@ async def get_answers_applets_respondents(
 
 
 async def get_missing_applet_respondent(
-    session: AsyncSession, owner_id: uuid.UUID, arbitrary_applet_respondents: set[tuple[uuid.UUID, uuid.UUID]]
+    session: AsyncSession, applet_ids: list[uuid.UUID], arbitrary_applet_respondents: set[tuple[uuid.UUID, uuid.UUID]]
 ) -> list[tuple[uuid.UUID, uuid.UUID]]:
     query: Query = select(UserAppletAccessSchema.user_id, UserAppletAccessSchema.applet_id)
-    query = query.where(UserAppletAccessSchema.owner_id == owner_id, UserAppletAccessSchema.role == Role.RESPONDENT)
+    query = query.where(
+        UserAppletAccessSchema.applet_id.in_(applet_ids), UserAppletAccessSchema.role == Role.RESPONDENT
+    )
     db_result = await session.execute(query)
     roles_users_applets = db_result.all()
     return list(arbitrary_applet_respondents - set(roles_users_applets))
@@ -93,7 +95,12 @@ async def find_and_create_missing_roles_arbitrary(
     roles = []
     for offset in range(0, count, limit):
         arbitrary_applet_respondents = await get_answers_applets_respondents(arbitrary_session, limit, offset)
-        missing_users_applets = await get_missing_applet_respondent(session, owner_id, 
arbitrary_applet_respondents) + + applet_ids = {x[1] for x in arbitrary_applet_respondents} + + missing_users_applets = await get_missing_applet_respondent( + session, list(applet_ids), arbitrary_applet_respondents + ) for user_id, applet_id in missing_users_applets: schema = UserAppletAccessSchema( user_id=user_id, @@ -137,10 +144,10 @@ async def main(session: AsyncSession, *args, **kwargs): async with session_maker() as arb_session: try: await find_and_create_missing_roles_arbitrary(session, arb_session, workspace.user_id) - await arb_session.commit() + await session.commit() print(f"Processing workspace#{i + 1} {workspace.id} " f"finished") except Exception: - await arb_session.rollback() + await session.rollback() print(f"[bold red]Error: Workspace#{i + 1} {workspace.id} processing error[/bold red]") raise except asyncio.TimeoutError: