From c957dd33fbe2fcf39f4de2a8fd5af353d78c47a0 Mon Sep 17 00:00:00 2001 From: auslin-aot <99173163+auslin-aot@users.noreply.github.com> Date: Mon, 4 Nov 2024 14:14:21 +0530 Subject: [PATCH 1/4] FWF-3718: [Feature] Workflow Migration --- ...960477800395_workflow_migration_changes.py | 35 +++ ..._added_submission_id_and_latest_form_id.py | 6 +- .../e9ce28b78478_added_contraints_process.py | 30 ++ .../models/form_process_mapper.py | 55 +++- .../src/formsflow_api/models/process.py | 7 +- .../src/formsflow_api/resources/process.py | 38 ++- .../src/formsflow_api/schemas/__init__.py | 1 + .../src/formsflow_api/schemas/process.py | 12 + .../services/form_process_mapper.py | 9 +- .../formsflow_api/services/import_support.py | 2 +- .../src/formsflow_api/services/process.py | 285 ++++++++++++++++-- 11 files changed, 431 insertions(+), 49 deletions(-) create mode 100644 forms-flow-api/migrations/versions/960477800395_workflow_migration_changes.py create mode 100644 forms-flow-api/migrations/versions/e9ce28b78478_added_contraints_process.py diff --git a/forms-flow-api/migrations/versions/960477800395_workflow_migration_changes.py b/forms-flow-api/migrations/versions/960477800395_workflow_migration_changes.py new file mode 100644 index 0000000000..45b718d63c --- /dev/null +++ b/forms-flow-api/migrations/versions/960477800395_workflow_migration_changes.py @@ -0,0 +1,35 @@ +"""Workflow Migration changes + +Revision ID: 960477800395 +Revises: 8feb43e1e408 +Create Date: 2024-10-29 11:24:12.569841 + +""" +from alembic import op +import sqlalchemy as sa +from sqlalchemy.sql import text + +# revision identifiers, used by Alembic. +revision = '960477800395' +down_revision = '8feb43e1e408' +branch_labels = None +depends_on = None + + +def upgrade(): + # ### commands auto generated by Alembic - please adjust! ### + op.add_column('form_process_mapper', sa.Column('is_migrated', sa.Boolean(), nullable=True, comment="Is workflow migrated", server_default='false')) + # Update process_name of format process_name(process_key) to process_name + # Ex: "Two Step Approval (two-step-approval)" to "Two Step Approval" + update_query = text("""UPDATE public.form_process_mapper + SET process_name = regexp_replace(process_name, '\\s*\\([a-zA-Z0-9_-]+\\)$', '') + WHERE process_name ~ '\\([a-zA-Z0-9_-]+\\)$';""") + # Execute the SQL statement + op.execute(update_query) + # ### end Alembic commands ### + + +def downgrade(): + # ### commands auto generated by Alembic - please adjust! ### + op.drop_column('form_process_mapper', 'is_migrated') + # ### end Alembic commands ### diff --git a/forms-flow-api/migrations/versions/d457f753885f_added_submission_id_and_latest_form_id.py b/forms-flow-api/migrations/versions/d457f753885f_added_submission_id_and_latest_form_id.py index 5039bafdb3..d8fe1ac478 100644 --- a/forms-flow-api/migrations/versions/d457f753885f_added_submission_id_and_latest_form_id.py +++ b/forms-flow-api/migrations/versions/d457f753885f_added_submission_id_and_latest_form_id.py @@ -15,11 +15,11 @@ branch_labels = None depends_on = None -conn = op.get_bind() -form_url_exists = conn.execute(sa.text("SELECT EXISTS (SELECT 1 FROM information_schema.columns WHERE table_schema='public' AND table_name='application' AND column_name='form_url');")) -form_url_exists = form_url_exists.fetchone()[0] def upgrade(): # ### commands auto generated by Alembic - please adjust! ### + conn = op.get_bind() + form_url_exists = conn.execute(sa.text("SELECT EXISTS (SELECT 1 FROM information_schema.columns WHERE table_schema='public' AND table_name='application' AND column_name='form_url');")) + form_url_exists = form_url_exists.fetchone()[0] op.add_column('application', sa.Column('submission_id', sa.String(length=100), nullable=True)) op.add_column('application', sa.Column('latest_form_id', sa.String(length=100), nullable=True)) if(form_url_exists): diff --git a/forms-flow-api/migrations/versions/e9ce28b78478_added_contraints_process.py b/forms-flow-api/migrations/versions/e9ce28b78478_added_contraints_process.py new file mode 100644 index 0000000000..10cbaf229a --- /dev/null +++ b/forms-flow-api/migrations/versions/e9ce28b78478_added_contraints_process.py @@ -0,0 +1,30 @@ +"""Added constaints to process table. + +Revision ID: e9ce28b78478 +Revises: 960477800395 +Create Date: 2024-11-04 12:00:58.046099 + +""" +from alembic import op +import sqlalchemy as sa + + +# revision identifiers, used by Alembic. +revision = 'e9ce28b78478' +down_revision = '960477800395' +branch_labels = None +depends_on = None + + +def upgrade(): + # ### commands auto generated by Alembic - please adjust! ### + op.create_unique_constraint('uq_tenant_process_key', 'process', ['process_key', 'tenant']) + op.create_unique_constraint('uq_tenant_process_name', 'process', ['name', 'tenant']) + # ### end Alembic commands ### + + +def downgrade(): + # ### commands auto generated by Alembic - please adjust! ### + op.drop_constraint('uq_tenant_process_name', 'process', type_='unique') + op.drop_constraint('uq_tenant_process_key', 'process', type_='unique') + # ### end Alembic commands ### diff --git a/forms-flow-api/src/formsflow_api/models/form_process_mapper.py b/forms-flow-api/src/formsflow_api/models/form_process_mapper.py index afbadf61ac..573c40a2e5 100644 --- a/forms-flow-api/src/formsflow_api/models/form_process_mapper.py +++ b/forms-flow-api/src/formsflow_api/models/form_process_mapper.py @@ -22,7 +22,9 @@ from .db import db -class FormProcessMapper(AuditDateTimeMixin, AuditUserMixin, BaseModel, db.Model): +class FormProcessMapper( + AuditDateTimeMixin, AuditUserMixin, BaseModel, db.Model +): # pylint: disable=too-many-public-methods """This class manages form process mapper information.""" id = db.Column(db.Integer, primary_key=True) @@ -52,6 +54,9 @@ class FormProcessMapper(AuditDateTimeMixin, AuditUserMixin, BaseModel, db.Model) version = db.Column(db.Integer, nullable=False, default=1) description = db.Column(db.String, nullable=True) prompt_new_version = db.Column(db.Boolean, nullable=True, default=False) + is_migrated = db.Column( + db.Boolean, nullable=True, default=False, comment="Is workflow migrated" + ) __table_args__ = ( UniqueConstraint("form_id", "version", "tenant", name="_form_version_uc"), @@ -78,6 +83,7 @@ def create_from_dict(cls, mapper_info: dict) -> FormProcessMapper: mapper.task_variable = mapper_info.get("task_variable") mapper.version = mapper_info.get("version") mapper.description = mapper_info.get("description") + mapper.is_migrated = mapper_info.get("is_migrated", True) mapper.save() return mapper except Exception as err: # pylint: disable=broad-except @@ -106,6 +112,7 @@ def update(self, mapper_info: dict): "process_tenant", "description", "prompt_new_version", + "is_migrated", ], mapper_info, ) @@ -123,17 +130,10 @@ def mark_unpublished(self): self.commit() @classmethod - def find_all(cls, page_number, limit): + def find_all(cls): """Fetch all the form process mappers.""" - if page_number == 0: - query = cls.query.order_by(FormProcessMapper.id.desc()).all() - else: - query = ( - cls.query.order_by(FormProcessMapper.id.desc()) - .paginate(page_number, limit, False) - .items - ) - return query + query = cls.tenant_authorization(query=cls.query) + return query.all() @classmethod def filter_conditions(cls, **filters): @@ -412,3 +412,36 @@ def get_latest_by_parent_form_id(cls, parent_form_id): .first() ) return query + + @classmethod + @user_context + def get_mappers_by_process_key(cls, process_key=None, mapper_id=None, **kwargs): + """Get all mappers matching given process key.""" + # Define the subquery with the window function to get latest mappers by process_key + user: UserContext = kwargs["user"] + tenant_key: str = user.tenant_key + subquery = ( + db.session.query( + cls.process_key, + cls.parent_form_id, + cls.id, + cls.deleted, + cls.form_id, + cls.tenant, + func.row_number() # pylint: disable=not-callable + .over(partition_by=cls.parent_form_id, order_by=cls.id.desc()) + .label("row_num"), + ).filter( + cls.process_key == process_key, + cls.deleted.is_(False), + cls.id != mapper_id, + cls.tenant == tenant_key, + ) + ).subquery("latest_mapper_rows_by_process_key") + # Only get the latest row in each parent_formid group + query = ( + db.session.query(cls) + .join(subquery, cls.id == subquery.c.id) + .filter(subquery.c.row_num == 1) + ) + return query.all() diff --git a/forms-flow-api/src/formsflow_api/models/process.py b/forms-flow-api/src/formsflow_api/models/process.py index 7de9e59b5c..7b872697cd 100644 --- a/forms-flow-api/src/formsflow_api/models/process.py +++ b/forms-flow-api/src/formsflow_api/models/process.py @@ -8,7 +8,7 @@ from flask_sqlalchemy.query import Query from formsflow_api_utils.utils import FILTER_MAPS, add_sort_filter from formsflow_api_utils.utils.user_context import UserContext, user_context -from sqlalchemy import LargeBinary, and_, desc, func, or_ +from sqlalchemy import LargeBinary, UniqueConstraint, and_, desc, func, or_ from sqlalchemy.dialects.postgresql import ENUM from .audit_mixin import AuditDateTimeMixin, AuditUserMixin @@ -53,6 +53,11 @@ class Process(AuditDateTimeMixin, AuditUserMixin, BaseModel, db.Model): is_subflow = db.Column(db.Boolean, default=False) status_changed = db.Column(db.Boolean, default=False) + __table_args__ = ( + UniqueConstraint("process_key", "tenant", name="uq_tenant_process_key"), + UniqueConstraint("name", "tenant", name="uq_tenant_process_name"), + ) + @classmethod def create_from_dict(cls, process_data: dict) -> Process: """Create a new process from a dictionary.""" diff --git a/forms-flow-api/src/formsflow_api/resources/process.py b/forms-flow-api/src/formsflow_api/resources/process.py index 4a9f5cd33a..b7e6bf3de8 100644 --- a/forms-flow-api/src/formsflow_api/resources/process.py +++ b/forms-flow-api/src/formsflow_api/resources/process.py @@ -11,6 +11,7 @@ profiletime, ) +from formsflow_api.schemas import ProcessDataSchema from formsflow_api.services import ProcessService API = Namespace("Process", description="Process") @@ -201,7 +202,8 @@ def post(): response = ProcessService.create_process( process_data=process_data, process_type=process_type, is_subflow=True ) - return response, HTTPStatus.CREATED + response_data = ProcessDataSchema().dump(response) + return response_data, HTTPStatus.CREATED @cors_preflight("GET, PUT, DELETE, OPTIONS") @@ -418,5 +420,37 @@ class ProcessResourceByProcessKey(Resource): ) def get(process_key: str): """Get process data by process key.""" - response, status = ProcessService.get_process_by_key(process_key), HTTPStatus.OK + response, status = ( + ProcessService.get_process_by_key(process_key, request), + HTTPStatus.OK, + ) return response, status + + +@cors_preflight("POST,OPTIONS") +@API.route("/migrate", methods=["POST", "OPTIONS"]) +class MigrateResource(Resource): + """Resource to support migration.""" + + @staticmethod + @auth.has_one_of_roles([CREATE_DESIGNS]) + @profiletime + @API.response(200, "OK:- Successful request.") + @API.response( + 400, + "BAD_REQUEST:- Invalid request.", + ) + @API.response( + 401, + "UNAUTHORIZED:- Authorization header not provided or an invalid token passed.", + ) + @API.response( + 403, + "FORBIDDEN:- Authorization will not help.", + ) + def post(): + """Migrate by process_key.""" + return ( + ProcessService.migrate(request), + HTTPStatus.OK, + ) diff --git a/forms-flow-api/src/formsflow_api/schemas/__init__.py b/forms-flow-api/src/formsflow_api/schemas/__init__.py index f4705aa1d7..5dd41c3f2e 100644 --- a/forms-flow-api/src/formsflow_api/schemas/__init__.py +++ b/forms-flow-api/src/formsflow_api/schemas/__init__.py @@ -37,6 +37,7 @@ form_workflow_schema, ) from .process import ( + MigrateRequestSchema, ProcessDataSchema, ProcessListRequestSchema, ProcessListSchema, diff --git a/forms-flow-api/src/formsflow_api/schemas/process.py b/forms-flow-api/src/formsflow_api/schemas/process.py index 1585dd356d..563622ada6 100644 --- a/forms-flow-api/src/formsflow_api/schemas/process.py +++ b/forms-flow-api/src/formsflow_api/schemas/process.py @@ -130,3 +130,15 @@ def load(self, data, *args, **kwargs): if process_type and process_type.upper() == "LOWCODE" and process_data: data["processData"] = json.dumps(process_data) return super().load(data, *args, **kwargs) + + +class MigrateRequestSchema(Schema): + """This class manages migrate request schema.""" + + class Meta: # pylint: disable=too-few-public-methods + """Exclude unknown fields in the deserialized output.""" + + unknown = EXCLUDE + + process_key = fields.Str(data_key="processKey", required=True) + mapper_id = fields.Str(data_key="mapperId", required=True) diff --git a/forms-flow-api/src/formsflow_api/services/form_process_mapper.py b/forms-flow-api/src/formsflow_api/services/form_process_mapper.py index b805ec4876..9789046ebc 100644 --- a/forms-flow-api/src/formsflow_api/services/form_process_mapper.py +++ b/forms-flow-api/src/formsflow_api/services/form_process_mapper.py @@ -284,13 +284,6 @@ def check_tenant_authorization_by_formid(form_id: int, **kwargs) -> int: raise BusinessException(BusinessErrorCode.PERMISSION_DENIED) return - @staticmethod - def clean_form_name(name): - """Remove invalid characters from form_name before setting as process key.""" - # Remove non-letters at the start, and any invalid characters elsewhere - name = re.sub(r"^[^a-zA-Z]+|[^a-zA-Z0-9\-_]", "", name) - return name - @staticmethod def validate_process_and_update_mapper(name, mapper): """Validate process name/key exists, if exists update name & update mapper.""" @@ -363,7 +356,7 @@ def create_form(data, is_designer): process_name = response.get("name") # process key/Id doesn't support numbers & special characters at start # special characters anywhere so clean them before setting as process key - process_name = FormProcessMapperService.clean_form_name(process_name) + process_name = ProcessService.clean_form_name(process_name) mapper_data = { "formId": form_id, "formName": response.get("title"), diff --git a/forms-flow-api/src/formsflow_api/services/import_support.py b/forms-flow-api/src/formsflow_api/services/import_support.py index d0005712a7..caa3056164 100644 --- a/forms-flow-api/src/formsflow_api/services/import_support.py +++ b/forms-flow-api/src/formsflow_api/services/import_support.py @@ -310,7 +310,7 @@ def import_new_form_workflow( process_name = form_response.get("name") # process key/Id doesn't support numbers & special characters at start # special characters anywhere so clean them before setting as process key - process_name = FormProcessMapperService.clean_form_name(process_name) + process_name = ProcessService.clean_form_name(process_name) mapper_data = { "form_id": form_id, "form_name": form_response.get("title"), diff --git a/forms-flow-api/src/formsflow_api/services/process.py b/forms-flow-api/src/formsflow_api/services/process.py index 7d41042047..404414150e 100644 --- a/forms-flow-api/src/formsflow_api/services/process.py +++ b/forms-flow-api/src/formsflow_api/services/process.py @@ -1,15 +1,24 @@ """This exposes process service.""" import json +import re +from collections import Counter from flask import current_app from formsflow_api_utils.exceptions import BusinessException +from formsflow_api_utils.services.external import FormioService from formsflow_api_utils.utils.user_context import UserContext, user_context from lxml import etree from formsflow_api.constants import BusinessErrorCode, default_flow_xml_data -from formsflow_api.models import Process, ProcessStatus, ProcessType +from formsflow_api.models import ( + FormProcessMapper, + Process, + ProcessStatus, + ProcessType, +) from formsflow_api.schemas import ( + MigrateRequestSchema, ProcessDataSchema, ProcessHistorySchema, ProcessListRequestSchema, @@ -19,7 +28,7 @@ processSchema = ProcessDataSchema() -class ProcessService: # pylint: disable=too-few-public-methods +class ProcessService: # pylint: disable=too-few-public-methods,too-many-public-methods """This class manages process service.""" @classmethod @@ -29,6 +38,113 @@ def xml_parser(cls, process_data): parser = etree.XMLParser(resolve_entities=False) return etree.fromstring(process_data.encode("utf-8"), parser=parser) + @classmethod + def remove_duplicate_multitenant(cls, process_list, process_type): + """Remove duplicates on default workflows provided.""" + # Incase of multitenant env, there's possiblity of duplicate workflow with tenant & without tenant + # Exclude workflow without tenant in this scenario while migrating + default_bpm_list = [ + "Defaultflow", + "onestepapproval", + "two-step-approval", + "EmailNotification", + ] + default_dmn_list = ["email-template-example"] + default_process_list = ( + default_dmn_list + if process_type == ProcessType.DMN.value + else default_bpm_list + ) + # Count occurrences of each key in default_list in process list + key_counts = Counter( + process["key"] + for process in process_list + if process["key"] in default_process_list + ) + + # Filter process_list based on key counts and tenant condition + filtered_process_list = [ + process + for process in process_list + if process["key"] not in default_process_list + or key_counts[process["key"]] == 1 + or process["tenant"] is not None + ] + return filtered_process_list + + @classmethod + def check_duplicate_names(cls, process_list): + """Check for duplicate bpmn/dmn names before migrate.""" + # DMN/BPMN keys will be unique but names can be duplicate + # To avoid exists error before migrate to process table make it unique + current_app.logger.info("Check for duplicate bpmn/dmn names...") + name_tenant_counts = {} + name_tenant_suffix_tracker = {} + + # count occurrences and prepare suffix tracker + for item in process_list: + key = (item["name"], item["tenantId"]) + name_tenant_counts[key] = name_tenant_counts.get(key, 0) + 1 + + # create the unique names based on the counts + unique_name_process_list = [] + for item in process_list: + key = (item["name"], item["tenantId"]) + if name_tenant_counts[key] > 1: + # Increment suffix count and create new name + name_tenant_suffix_tracker[key] = ( + name_tenant_suffix_tracker.get(key, 0) + 1 + ) + new_name = f"{item['name']}_{name_tenant_suffix_tracker[key] - 1}" + else: + new_name = item["name"] + + # Append item with the unique name to the final list + unique_name_process_list.append({**item, "name": new_name}) + return unique_name_process_list + + @classmethod + @user_context + def get_subflows_dmns(cls, process_type, **kwargs): + """Fetch subflows & dmns from camunda & save to process table.""" + current_app.logger.debug(f"Fetching DMN/BPMN...{process_type}") + user: UserContext = kwargs["user"] + tenant_key = user.tenant_key + token = user.bearer_token + process_list = [] + mapper_process_keys = [] + if process_type == ProcessType.BPMN.value: + mappers = FormProcessMapper.find_all() + mapper_process_keys = [mapper.process_key for mapper in mappers] + current_app.logger.debug(f"mapper_process_keys...{mapper_process_keys}") + url_path = "&includeProcessDefinitionsWithoutTenantId=true" + process_list = BPMService.get_all_process(token, url_path) + elif process_type == ProcessType.DMN.value: + url_path = ( + "?latestVersion=true&includeDecisionDefinitionsWithoutTenantId=true" + ) + process_list = BPMService.get_decision(token, url_path) + if process_list: + if current_app.config.get("MULTI_TENANCY_ENABLED"): + process_list = cls.remove_duplicate_multitenant( + process_list, process_type + ) + process_list = cls.check_duplicate_names(process_list) + # Exclude process keys from mapper to exclude any keys present in unique_mapper_keys + filtered_processes = [ + (process["key"], process["name"]) + for process in process_list + if process["key"] not in set(mapper_process_keys) + ] + for process_key, process_name in filtered_processes: + cls.fetch_save_xml( + process_key, + tenant_key=tenant_key, + process_type=process_type, + is_subflow=True, + process_name=process_name, + ) + @classmethod def get_all_process(cls, request_args): # pylint:disable=too-many-locals """Get all process list.""" @@ -52,22 +168,31 @@ def get_all_process(cls, request_args): # pylint:disable=too-many-locals sort_order = dict_data.get("sort_order", "") sort_by = sort_by.split(",") sort_order = sort_order.split(",") - process, count = Process.find_all_process( - created_from=created_from_date, - created_to=created_to_date, - modified_from=modified_from_date, - modified_to=modified_to_date, - sort_by=sort_by, - is_subflow=True, # now only for subflow listing - sort_order=sort_order, - created_by=created_by, - id=process_id, - process_name=process_name, - process_status=status, - process_type=process_type, - page_no=page_no, - limit=limit, - ) + + def list_process(): + process, count = Process.find_all_process( + created_from=created_from_date, + created_to=created_to_date, + modified_from=modified_from_date, + modified_to=modified_to_date, + sort_by=sort_by, + is_subflow=True, # now only for subflow listing + sort_order=sort_order, + created_by=created_by, + id=process_id, + process_name=process_name, + process_status=status, + process_type=process_type, + page_no=page_no, + limit=limit, + ) + return process, count + + process, count = list_process() + # If process empty consider it as subflows not migrated, so fetch from camunda + if not process: + cls.get_subflows_dmns(process_type) + process, count = list_process() return ( ProcessDataSchema(exclude=["process_data"]).dump(process, many=True), count, @@ -107,6 +232,7 @@ def create_process( # pylint: disable=too-many-arguments, too-many-positional-a process_name=None, process_key=None, is_subflow=False, + is_migrate=False, **kwargs, ): """Save process data.""" @@ -124,6 +250,7 @@ def create_process( # pylint: disable=too-many-arguments, too-many-positional-a is_subflow=is_subflow, process_name=process_name, process_key=process_key, + is_migrate=is_migrate, ) # Check if the process already exists if it is a subflow @@ -150,9 +277,7 @@ def create_process( # pylint: disable=too-many-arguments, too-many-positional-a "parent_process_key": process_key, } process = Process.create_from_dict(process_dict) - - # Return the serialized process data - return processSchema.dump(process) + return process @staticmethod def get_process_by_type(root, process_type): @@ -190,12 +315,13 @@ def _process_data_name_and_key( # pylint: disable=too-many-arguments, too-many- is_subflow=False, process_name=None, process_key=None, + is_migrate=False, ): """Process data name key.""" # if the process is not a subflow, update the process name and ID in the XML data # if the process is a subflow, parse the XML data to extract the process name and key # if the process is of type LOWCODE, convert the process data to JSON format - if is_subflow and process_type.upper() != "LOWCODE": + if is_subflow and process_type.upper() != "LOWCODE" and not is_migrate: # Parse the XML data to extract process name and key for subflows root = cls.xml_parser(process_data) process = cls.get_process_by_type(root, process_type) @@ -216,11 +342,78 @@ def _process_data_name_and_key( # pylint: disable=too-many-arguments, too-many- ) return process_data, process_name, process_key + @staticmethod + def clean_form_name(name): + """Remove invalid characters from form_name before setting as process key.""" + # Remove non-letters at the start, and any invalid characters elsewhere + name = re.sub(r"(^[^a-zA-Z]+)|([^a-zA-Z0-9\-_])", "", name) + return name + @classmethod - def get_process_by_key(cls, process_key): + @user_context + def fetch_save_xml( # pylint: disable=too-many-arguments, too-many-positional-arguments + cls, + process_key, + tenant_key, + process_type=ProcessType.BPMN.value, + updated_process_key=None, + is_subflow=False, + process_name=None, + **kwargs, + ): + """Fetch process xml from camunda & save in process.""" + current_app.logger.debug(f"Fetching xml for process: {process_key}") + user: UserContext = kwargs["user"] + if process_type == ProcessType.DMN.value: + xml_data = BPMService.decision_definition_xml( + process_key, user.bearer_token, tenant_key + ).get("dmnXml") + else: + xml_data = BPMService.process_definition_xml( + process_key, user.bearer_token, tenant_key + ).get("bpmn20Xml") + current_app.logger.debug(f"Completed fetching xml for process: {process_key}") + # Incase of migration we need to use the filtered form name as process key + process_key = updated_process_key if updated_process_key else process_key + current_app.logger.debug(f"Create process: {process_key}") + process = cls.create_process( + process_data=xml_data, + process_type=process_type, + process_key=process_key, + process_name=process_name if process_name else process_key, + is_subflow=is_subflow, + is_migrate=True, + ) + current_app.logger.debug(f"Completed fetch &save process {process_key}") + return process + + @classmethod + def get_process_by_key(cls, process_key, request): """Get process by key.""" current_app.logger.debug(f"Get process data for process key: {process_key}") process = Process.get_latest_version_by_key(process_key) + mapper_id = request.args.get("mapperId") + # If process is not found check whether it's migrated + if not process and mapper_id: + current_app.logger.debug(f"Checking mapper_id is_migrated: {mapper_id}") + mapper = FormProcessMapper.find_form_by_id(mapper_id) + if mapper and mapper.is_migrated is False: + current_app.logger.debug(f"Mapper Id: {mapper_id} is not migrated") + process = cls.fetch_save_xml(mapper.process_key, mapper.process_tenant) + # Check for mappers with process_key exists + mappers = FormProcessMapper.get_mappers_by_process_key( + mapper.process_key, mapper_id + ) + current_app.logger.debug( + f"Found mappers: {mappers} with same process_key." + ) + # If there are no mappers with the process key update is_migrated=True + if not mappers: + current_app.logger.debug( + f"Updating mapper_id is_migrated: {mapper_id}" + ) + mapper.is_migrated = True + mapper.save() if process: process_data = processSchema.dump(process) # Determine version numbers based on the process status @@ -481,3 +674,49 @@ def get_process_by_id(cls, process_id): if process: return processSchema.dump(process) raise BusinessException(BusinessErrorCode.PROCESS_ID_NOT_FOUND) + + @classmethod + def migrate(cls, request): + """Migrate by process key.""" + current_app.logger.debug("Migrate process started..") + data = MigrateRequestSchema().load(request.get_json()) + process_key = data.get("process_key") + mapper_id = data.get("mapper_id") + mappers = FormProcessMapper.get_mappers_by_process_key(process_key, mapper_id) + current_app.logger.debug(f"Mappers found..{mappers}") + if mappers: + for mapper in mappers: + formio_service = FormioService() + form_io_token = formio_service.get_formio_access_token() + form_json = formio_service.get_form_by_id(mapper.form_id, form_io_token) + form_name = form_json.get("name") + # process key doesn't support numbers & special characters at start + # special characters anywhere so clean them before setting as process key + updated_process_key = cls.clean_form_name(form_name) + if updated_process_key: + # validate process key already exists, if exists append mapper id to process_key. + process = Process.find_process_by_name_key( + name=updated_process_key, process_key=updated_process_key + ) + if process: + updated_process_key = f"{updated_process_key}_{mapper.id}" + # This is to avoid empty process_key after clean form name + else: + updated_process_key = f"{process_key}_migrate_{mapper.id}" + cls.fetch_save_xml( + process_key, + mapper.process_tenant, + updated_process_key=updated_process_key, + ) + # Update mapper with new process key & is_migrated as True + mapper.update( + { + "is_migrated": True, + "process_key": updated_process_key, + "process_name": updated_process_key, + } + ) + # Update is_migrated to main mapper by id. + mapper = FormProcessMapper.find_form_by_id(mapper_id) + mapper.update({"is_migrated": True}) + return {} From 1ac5851aa03e7a0ff1a293f40f8615110e7c0818 Mon Sep 17 00:00:00 2001 From: auslin-aot <99173163+auslin-aot@users.noreply.github.com> Date: Tue, 5 Nov 2024 14:18:16 +0530 Subject: [PATCH 2/4] FWF-3718: [Feature] Add testcase --- .../src/formsflow_api/constants/__init__.py | 1 + .../src/formsflow_api/services/process.py | 5 +- forms-flow-api/tests/unit/api/test_process.py | 79 +++++++++++++++++++ 3 files changed, 84 insertions(+), 1 deletion(-) diff --git a/forms-flow-api/src/formsflow_api/constants/__init__.py b/forms-flow-api/src/formsflow_api/constants/__init__.py index 4898356016..ecfa9addb1 100644 --- a/forms-flow-api/src/formsflow_api/constants/__init__.py +++ b/forms-flow-api/src/formsflow_api/constants/__init__.py @@ -110,6 +110,7 @@ class BusinessErrorCode(ErrorCodeMixin, Enum): HTTPStatus.BAD_REQUEST, ) FORM_VALIDATION_FAILED = "FORM_VALIDATION_FAILED.", HTTPStatus.BAD_REQUEST + INVALID_PROCESS = "Invalid process.", HTTPStatus.BAD_REQUEST def __new__(cls, message, status_code): """Constructor.""" diff --git a/forms-flow-api/src/formsflow_api/services/process.py b/forms-flow-api/src/formsflow_api/services/process.py index 404414150e..fa83c5f129 100644 --- a/forms-flow-api/src/formsflow_api/services/process.py +++ b/forms-flow-api/src/formsflow_api/services/process.py @@ -682,6 +682,10 @@ def migrate(cls, request): data = MigrateRequestSchema().load(request.get_json()) process_key = data.get("process_key") mapper_id = data.get("mapper_id") + mapper = FormProcessMapper.find_form_by_id(mapper_id) + # If the process_key in the mapper is different from the process_key in the payload + if mapper.process_key != process_key: + raise BusinessException(BusinessErrorCode.INVALID_PROCESS) mappers = FormProcessMapper.get_mappers_by_process_key(process_key, mapper_id) current_app.logger.debug(f"Mappers found..{mappers}") if mappers: @@ -717,6 +721,5 @@ def migrate(cls, request): } ) # Update is_migrated to main mapper by id. - mapper = FormProcessMapper.find_form_by_id(mapper_id) mapper.update({"is_migrated": True}) return {} diff --git a/forms-flow-api/tests/unit/api/test_process.py b/forms-flow-api/tests/unit/api/test_process.py index 5d2be754cd..43a7ccad4c 100644 --- a/forms-flow-api/tests/unit/api/test_process.py +++ b/forms-flow-api/tests/unit/api/test_process.py @@ -15,6 +15,20 @@ ) +def mapper_payload(form_id, form_name): + """Mapper payload.""" + return { + "formId": form_id, + "formName": form_name, + "processKey": "onestepapproval", + "processName": "One Step Approval", + "status": "inactive", + "formType": "form", + "parentFormId": form_id, + "is_migrated": False, + } + + def ensure_process_data_binary(process_id): """Convert process_data to binary if string.""" process = Process.query.get(process_id) @@ -568,3 +582,68 @@ def test_get_process_by_key_invalid_key(self, app, client, session, jwt): headers=headers, ) assert response.status_code == 400 + + +class MigrateProcess: + """Test suite for the migrate process.""" + + def migrate_process_success(self, app, client, session, jwt, create_mapper_custom): + """Migrate process with success.""" + rv = create_mapper_custom( + mapper_payload(form_id="1234", form_name="Sample form1") + ) + mapper_id = rv["id"] + create_mapper_custom(mapper_payload(form_id="12345", form_name="Sample form2")) + token = get_token(jwt, role=CREATE_DESIGNS, username="designer") + headers = { + "Authorization": f"Bearer {token}", + "content-type": "application/json", + } + rv = client.post( + "/process/migrate", + headers=headers, + json={"mapperId": mapper_id, "processKey": "onestepapproval"}, + ) + assert rv.status_code == 200 + + def migrate_process_unauthorized( + self, app, client, session, jwt, create_mapper_custom + ): + """Migrate process without proper authorization.""" + rv = create_mapper_custom( + mapper_payload(form_id="1234", form_name="Sample form1") + ) + mapper_id = rv["id"] + create_mapper_custom(mapper_payload(form_id="12345", form_name="Sample form2")) + rv = client.post( + "/process/migrate", + json={"mapperId": mapper_id, "processKey": "onestepapproval"}, + ) + assert rv.status_code == 401 + + def migrate_process_invalid(self, app, client, session, jwt, create_mapper_custom): + """Migrate process with invalid data.""" + rv = create_mapper_custom( + mapper_payload(form_id="1234", form_name="Sample form1") + ) + mapper_id = rv["id"] + create_mapper_custom(mapper_payload(form_id="12345", form_name="Sample form2")) + token = get_token(jwt, role=CREATE_DESIGNS, username="designer") + headers = { + "Authorization": f"Bearer {token}", + "content-type": "application/json", + } + # Test with different process_key other than mapper process_key + rv = client.post( + "/process/migrate", + headers=headers, + json={"mapperId": mapper_id, "processKey": "twostepapproval"}, + ) + assert rv.status_code == 400 + # Test with invalid mapper Id + rv = client.post( + "/process/migrate", + headers=headers, + json={"mapperId": 99, "processKey": "twostepapproval"}, + ) + assert rv.status_code == 400 From 6dd55c2dde68d85e8f37f6b848c54dc3fca81bed Mon Sep 17 00:00:00 2001 From: auslin-aot <99173163+auslin-aot@users.noreply.github.com> Date: Thu, 7 Nov 2024 14:13:05 +0530 Subject: [PATCH 3/4] FWF-3718: [Feature] Avoid multiple fetches of XML data in migrate api. --- .../src/formsflow_api/services/process.py | 44 +++++++++++++------ 1 file changed, 30 insertions(+), 14 deletions(-) diff --git a/forms-flow-api/src/formsflow_api/services/process.py b/forms-flow-api/src/formsflow_api/services/process.py index fa83c5f129..4de979dc71 100644 --- a/forms-flow-api/src/formsflow_api/services/process.py +++ b/forms-flow-api/src/formsflow_api/services/process.py @@ -191,6 +191,7 @@ def list_process(): process, count = list_process() # If process empty consider it as subflows not migrated, so fetch from camunda if not process: + current_app.logger.debug("Fetching subflows...") cls.get_subflows_dmns(process_type) process, count = list_process() return ( @@ -359,20 +360,25 @@ def fetch_save_xml( # pylint: disable=too-many-arguments, too-many-positional-a updated_process_key=None, is_subflow=False, process_name=None, + xml_data=None, **kwargs, ): """Fetch process xml from camunda & save in process.""" - current_app.logger.debug(f"Fetching xml for process: {process_key}") + current_app.logger.debug(f"Fetch & save for process: {process_key}") user: UserContext = kwargs["user"] - if process_type == ProcessType.DMN.value: - xml_data = BPMService.decision_definition_xml( - process_key, user.bearer_token, tenant_key - ).get("dmnXml") - else: - xml_data = BPMService.process_definition_xml( - process_key, user.bearer_token, tenant_key - ).get("bpmn20Xml") - current_app.logger.debug(f"Completed fetching xml for process: {process_key}") + if not xml_data: + current_app.logger.debug(f"Fetching xml for process: {process_key}") + if process_type == ProcessType.DMN.value: + xml_data = BPMService.decision_definition_xml( + process_key, user.bearer_token, tenant_key + ).get("dmnXml") + else: + xml_data = BPMService.process_definition_xml( + process_key, user.bearer_token, tenant_key + ).get("bpmn20Xml") + current_app.logger.debug( + f"Completed fetching xml for process: {process_key}" + ) # Incase of migration we need to use the filtered form name as process key process_key = updated_process_key if updated_process_key else process_key current_app.logger.debug(f"Create process: {process_key}") @@ -676,19 +682,28 @@ def get_process_by_id(cls, process_id): raise BusinessException(BusinessErrorCode.PROCESS_ID_NOT_FOUND) @classmethod - def migrate(cls, request): + @user_context + def migrate(cls, request, **kwargs): # pylint:disable=too-many-locals """Migrate by process key.""" current_app.logger.debug("Migrate process started..") data = MigrateRequestSchema().load(request.get_json()) process_key = data.get("process_key") mapper_id = data.get("mapper_id") - mapper = FormProcessMapper.find_form_by_id(mapper_id) + request_mapper = FormProcessMapper.find_form_by_id(mapper_id) # If the process_key in the mapper is different from the process_key in the payload - if mapper.process_key != process_key: + if request_mapper.process_key != process_key: raise BusinessException(BusinessErrorCode.INVALID_PROCESS) mappers = FormProcessMapper.get_mappers_by_process_key(process_key, mapper_id) current_app.logger.debug(f"Mappers found..{mappers}") if mappers: + xml_data = None + if not current_app.config.get("MULTI_TENANCY_ENABLED"): + # Incase of non multitenant env, fetch once the process xml data + user: UserContext = kwargs["user"] + current_app.logger.debug("Fetching process..") + xml_data = BPMService.process_definition_xml( + process_key, user.bearer_token, user.tenant_key + ).get("bpmn20Xml") for mapper in mappers: formio_service = FormioService() form_io_token = formio_service.get_formio_access_token() @@ -711,6 +726,7 @@ def migrate(cls, request): process_key, mapper.process_tenant, updated_process_key=updated_process_key, + xml_data=xml_data, ) # Update mapper with new process key & is_migrated as True mapper.update( @@ -721,5 +737,5 @@ def migrate(cls, request): } ) # Update is_migrated to main mapper by id. - mapper.update({"is_migrated": True}) + request_mapper.update({"is_migrated": True}) return {} From f844de56a26bd7a768cd3cf8fccbf3a708596a28 Mon Sep 17 00:00:00 2001 From: auslin-aot <99173163+auslin-aot@users.noreply.github.com> Date: Fri, 8 Nov 2024 12:43:51 +0530 Subject: [PATCH 4/4] FWF-3718: [Feature] Added is_migrated TRUE for workflows with only one form linked to migration file --- ...960477800395_workflow_migration_changes.py | 34 +++++++++++++++++++ .../services/form_process_mapper.py | 3 ++ .../formsflow_api/services/import_support.py | 1 + .../src/formsflow_api/services/process.py | 25 ++++---------- 4 files changed, 44 insertions(+), 19 deletions(-) diff --git a/forms-flow-api/migrations/versions/960477800395_workflow_migration_changes.py b/forms-flow-api/migrations/versions/960477800395_workflow_migration_changes.py index 45b718d63c..5b7b0fcac4 100644 --- a/forms-flow-api/migrations/versions/960477800395_workflow_migration_changes.py +++ b/forms-flow-api/migrations/versions/960477800395_workflow_migration_changes.py @@ -8,6 +8,7 @@ from alembic import op import sqlalchemy as sa from sqlalchemy.sql import text +from collections import Counter # revision identifiers, used by Alembic. revision = '960477800395' @@ -26,6 +27,39 @@ def upgrade(): WHERE process_name ~ '\\([a-zA-Z0-9_-]+\\)$';""") # Execute the SQL statement op.execute(update_query) + + # code to set the is_migrated field to TRUE for workflow(process) with only one form. + conn = op.get_bind() + # Subquery to get the latest non-deleted row per `parent_form_id` + latest_rows_sql = """ + SELECT process_key, id + FROM ( + SELECT process_key, + parent_form_id, + id, + ROW_NUMBER() OVER (PARTITION BY parent_form_id ORDER BY id DESC) AS row_num + FROM public.form_process_mapper + WHERE deleted = false + ) AS latest_rows + WHERE row_num = 1 + """ + latest_rows = conn.execute(sa.text(latest_rows_sql)).mappings().all() + #Count occurrences of each process_key within the latest rows + process_key_counts = Counter(row["process_key"] for row in latest_rows) + + #Update is_migrated for each latest row based on the process_key count + for row in latest_rows: + process_key = row["process_key"] + row_id = row["id"] + + # Update is_migrated to true if process_key appears in a single parent_form_id group + if process_key_counts[process_key] == 1: + update_sql = """ + UPDATE public.form_process_mapper + SET is_migrated = true + WHERE id = :row_id + """ + conn.execute(sa.text(update_sql), {"row_id": row_id}) # ### end Alembic commands ### diff --git a/forms-flow-api/src/formsflow_api/services/form_process_mapper.py b/forms-flow-api/src/formsflow_api/services/form_process_mapper.py index fd1effb48b..699ca603f3 100644 --- a/forms-flow-api/src/formsflow_api/services/form_process_mapper.py +++ b/forms-flow-api/src/formsflow_api/services/form_process_mapper.py @@ -360,6 +360,7 @@ def create_form(data, is_designer): # pylint:disable=too-many-locals anonymous = False description = data.get("description", "") task_variable = [] + is_migrated = True current_app.logger.info(f"Creating new form {is_new_form}") # If creating new version for a existing form, fetch process key, name from mapper if not is_new_form: @@ -370,6 +371,7 @@ def create_form(data, is_designer): # pylint:disable=too-many-locals anonymous = mapper.is_anonymous description = mapper.description task_variable = json.loads(mapper.task_variable) + is_migrated = mapper.is_migrated else: # if new form, form name is kept as process_name & process key process_name = response.get("name") @@ -391,6 +393,7 @@ def create_form(data, is_designer): # pylint:disable=too-many-locals "status": FormProcessMapperStatus.INACTIVE.value, "anonymous": anonymous, "task_variable": task_variable, + "is_migrated": is_migrated, } mapper = FormProcessMapperService.mapper_create(mapper_data) diff --git a/forms-flow-api/src/formsflow_api/services/import_support.py b/forms-flow-api/src/formsflow_api/services/import_support.py index 6f87b3fa80..8c127ea384 100644 --- a/forms-flow-api/src/formsflow_api/services/import_support.py +++ b/forms-flow-api/src/formsflow_api/services/import_support.py @@ -417,6 +417,7 @@ def import_form( "titleChanged": title_changed, "anonymousChanged": anonymous_changed, "description": mapper.description if form_only else description, + "is_migrated": mapper.is_migrated, } FormProcessMapperService.mapper_create(mapper_data) FormProcessMapperService.mark_unpublished(mapper.id) diff --git a/forms-flow-api/src/formsflow_api/services/process.py b/forms-flow-api/src/formsflow_api/services/process.py index 4de979dc71..a36e6b921a 100644 --- a/forms-flow-api/src/formsflow_api/services/process.py +++ b/forms-flow-api/src/formsflow_api/services/process.py @@ -259,6 +259,9 @@ def create_process( # pylint: disable=too-many-arguments, too-many-positional-a if Process.find_process_by_name_key( name=process_name, process_key=process_key ): + current_app.logger.debug( + f"Process already exists..{process_name}:-{process_key}" + ) raise BusinessException(BusinessErrorCode.PROCESS_EXISTS) # Initialize version numbers for the new process @@ -399,27 +402,11 @@ def get_process_by_key(cls, process_key, request): current_app.logger.debug(f"Get process data for process key: {process_key}") process = Process.get_latest_version_by_key(process_key) mapper_id = request.args.get("mapperId") - # If process is not found check whether it's migrated + # If process is not found, fetch & save to process table. if not process and mapper_id: - current_app.logger.debug(f"Checking mapper_id is_migrated: {mapper_id}") + current_app.logger.debug("Process not found in db. Fetching & save it.") mapper = FormProcessMapper.find_form_by_id(mapper_id) - if mapper and mapper.is_migrated is False: - current_app.logger.debug(f"Mapper Id: {mapper_id} is not migrated") - process = cls.fetch_save_xml(mapper.process_key, mapper.process_tenant) - # Check for mappers with process_key exists - mappers = FormProcessMapper.get_mappers_by_process_key( - mapper.process_key, mapper_id - ) - current_app.logger.debug( - f"Found mappers: {mappers} with same process_key." - ) - # If there are no mappers with the process key update is_migrated=True - if not mappers: - current_app.logger.debug( - f"Updating mapper_id is_migrated: {mapper_id}" - ) - mapper.is_migrated = True - mapper.save() + process = cls.fetch_save_xml(mapper.process_key, mapper.process_tenant) if process: process_data = processSchema.dump(process) # Determine version numbers based on the process status