Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merge main to develop #1413

Merged
merged 8 commits into develop from main
Jun 17, 2024
6 changes: 6 additions & 0 deletions src/apps/shared/commands/patch_commands.py
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,12 @@
description="Set proportion.enabled=True to Maki's applets",
manage_session=False,
)
# Registers the M2-6968 backfill patch (see patches/m2_6968_create_flows_old_versions.py).
# manage_session=False: the patch's main() receives the session and is responsible
# for commit/rollback itself rather than being wrapped by the patch runner.
PatchRegister.register(
file_path="m2_6968_create_flows_old_versions.py",
task_id="M2-6968",
description="Create flow history records for particular applets",
manage_session=False,
)

PatchRegister.register(
file_path="m2_6879_create_deleted_respondents.py",
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
import asyncio
import os
import uuid

from rich import print
from sqlalchemy import func, select
from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy.orm import Query
Expand Down Expand Up @@ -130,17 +132,19 @@ async def main(session: AsyncSession, *args, **kwargs):
print(f"Workspace#{i + 1} DB already processed, skip...")
continue
processed.add(arb_uri)
session_maker = session_manager.get_session(arb_uri)
async with session_maker() as arb_session:
try:
await find_and_create_missing_roles_arbitrary(session, arb_session, workspace.user_id)
await arb_session.commit()
print(f"Processing workspace#{i + 1} {workspace.id} " f"finished")
except Exception:
await arb_session.rollback()
print(f"[bold red]Workspace#{i + 1} {workspace.id} " f"processing error[/bold red]")
raise

try:
session_maker = session_manager.get_session(arb_uri)
async with session_maker() as arb_session:
try:
await find_and_create_missing_roles_arbitrary(session, arb_session, workspace.user_id)
await arb_session.commit()
print(f"Processing workspace#{i + 1} {workspace.id} " f"finished")
except Exception:
await arb_session.rollback()
print(f"[bold red]Error: Workspace#{i + 1} {workspace.id} processing error[/bold red]")
raise
except asyncio.TimeoutError:
print(f"[bold red]Error: Workspace#{i + 1} {workspace.id} Timeout error, skipping...[/bold red]")
except Exception as ex:
await session.rollback()
raise ex
106 changes: 106 additions & 0 deletions src/apps/shared/commands/patches/m2_6968_create_flows_old_versions.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,106 @@
from rich import print
from sqlalchemy.ext.asyncio import AsyncSession

# Backfills flow_histories rows for applet versions that are missing them.
# Placeholders (filled via str.format, values come from the hard-coded
# applet_versions constant, so no injection risk here):
#   {applet_id}            - applet UUID
#   {from_version_padded}  - minimum version, each segment zero-padded to 2
#                            digits ("MM.mm.pp") so string comparison orders
#                            versions correctly.
# History rows get created_at = applet history's created_at truncated to the
# minute plus one minute, and copy the applet's *current* flow data into every
# qualifying historical version.
SQL_FLOW_HISTORY_CREATE = """
with applet_versions as (
select
ah.id,
ah.id_version,
ah.version,
date_trunc('minute', ah.created_at) + interval '1 min' as created_at
from applet_histories ah
left join flow_histories fh on fh.applet_id = ah.id_version
where 1 = 1
and ah.id = '{applet_id}'
and format(
'%s.%s.%s',
lpad(split_part(ah."version", '.', 1), 2, '0'),
lpad(split_part(ah."version", '.', 2), 2, '0'),
lpad(split_part(ah."version", '.', 3), 2, '0')
) >= '{from_version_padded}'
and fh.id is null
),
last_flow_data as (
select f.*
from flows f
where f.applet_id = '{applet_id}'
)
insert into flow_histories
select
av.created_at,
av.created_at as updated_at,
lf.is_deleted,
lf."name",
lf.description,
lf.is_single_report,
lf.hide_badge,
lf."order",
format('%s_%s', lf.id::text, av.version) as id_version,
av.id_version as applet_id,
lf.id,
lf.is_hidden,
lf.report_included_activity_name,
lf.report_included_item_name,
lf.extra_fields
from last_flow_data lf
cross join applet_versions av;
"""

# Companion to SQL_FLOW_HISTORY_CREATE: backfills flow_item_histories rows for
# the same applet versions. Same {applet_id} / {from_version_padded}
# placeholders. The extra "and exists (...)" join condition skips versions
# whose flow histories already have item histories, so this statement is safe
# to run after (or independently of) the flow-history backfill.
SQL_FLOW_ITEM_HISTORY_CREATE = """
with applet_versions as (
select
ah.id,
ah.id_version,
ah.version,
date_trunc('minute', ah.created_at) + interval '1 min' as created_at
from applet_histories ah
left join flow_histories fh on fh.applet_id = ah.id_version
and exists (select 1 from flow_item_histories fih where fih.activity_flow_id = fh.id_version)
where 1 = 1
and ah.id = '{applet_id}'
and format(
'%s.%s.%s',
lpad(split_part(ah."version", '.', 1), 2, '0'),
lpad(split_part(ah."version", '.', 2), 2, '0'),
lpad(split_part(ah."version", '.', 3), 2, '0')
) >= '{from_version_padded}'
and fh.id is null
),
last_flow_item_data as (
select fi.*
from flows f
join flow_items fi on fi.activity_flow_id = f.id
where f.applet_id = '{applet_id}'
)
insert into flow_item_histories
select
av.created_at,
av.created_at as updated_at,
lfi.is_deleted,
lfi."order",
format('%s_%s', lfi.id::text, av.version) as id_version,
format('%s_%s', lfi.activity_flow_id::text, av.version) as activity_flow_id,
format('%s_%s', lfi.activity_id::text, av.version) as activity_id,
lfi.id
from last_flow_item_data lfi
cross join applet_versions av;
"""

# (applet_id, from_version_padded) pairs to backfill. The version is
# zero-padded per segment ("MM.mm.pp") to match the comparison format built
# by the SQL above.
applet_versions: tuple[tuple[str, str], ...] = (
("62b21984-b90b-7f2b-a9e1-c51a00000000", "10.00.00"),
("7bb7b30e-0d8a-4b13-bc1c-6a733ccc689a", "02.00.00"),
)


async def main(session: AsyncSession, *args, **kwargs):
    """Backfill flow history records for the applets in ``applet_versions``.

    For each (applet_id, padded_version) pair, executes the flow-history
    backfill statement and then the flow-item-history backfill statement,
    logging each statement before it runs and "Done" after it completes.

    The caller owns the transaction (the patch is registered with
    manage_session=False), so no commit happens here.
    """
    for applet_id, version_padded in applet_versions:
        # Backfill flow_histories.
        sql = SQL_FLOW_HISTORY_CREATE.format(applet_id=applet_id, from_version_padded=version_padded)
        print("Execute:")
        print(sql)
        await session.execute(sql)
        print("Done")

        # Backfill flow_item_histories.
        # Bug fix: the original printed the *previous* statement here and
        # logged "Done" before the execute ran; format -> log -> execute ->
        # "Done" is the intended order.
        sql = SQL_FLOW_ITEM_HISTORY_CREATE.format(applet_id=applet_id, from_version_padded=version_padded)
        print("Execute:")
        print(sql)
        await session.execute(sql)
        print("Done")
18 changes: 15 additions & 3 deletions src/infrastructure/database/migrations_arbitrary/env.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import asyncio
import logging
import os
import traceback
import uuid
from logging import getLogger
from logging.config import fileConfig
Expand Down Expand Up @@ -38,6 +39,7 @@ async def get_all_servers(connection):
SELECT uw.database_uri, uw.user_id
FROM users_workspaces as uw
WHERE uw.database_uri is not null and uw.database_uri <> ''
ORDER BY uw.created_at
"""
)
rows = await connection.execute(query)
Expand Down Expand Up @@ -84,9 +86,19 @@ async def migrate_arbitrary():
future=True,
)
)
async with connectable.connect() as connection:
await connection.run_sync(do_run_migrations, arbitrary_meta, config)
await connectable.dispose()
try:
async with connectable.connect() as connection:
await connection.run_sync(do_run_migrations, arbitrary_meta, config)
migration_log.info(f"Success: {owner_id} successfully migrated")
except asyncio.TimeoutError:
migration_log.error(f"!!! Error during migration of {owner_id}")
migration_log.error("Connection timeout")
except Exception as e:
migration_log.error(f"!!! Error during migration of {owner_id}")
migration_log.error(e)
traceback.print_exception(e)
finally:
await connectable.dispose()


def run_migrations_offline() -> None:
Expand Down
Loading