Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Feature] Workflow Migration #2325

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
"""Workflow Migration changes

Revision ID: 960477800395
Revises: b338018ad0e9
Create Date: 2024-10-29 11:24:12.569841

"""
from alembic import op
import sqlalchemy as sa
from sqlalchemy.sql import text
from collections import Counter

# revision identifiers, used by Alembic.
revision = '960477800395'
down_revision = 'b338018ad0e9'
branch_labels = None
depends_on = None


def upgrade():
# ### commands auto generated by Alembic - please adjust! ###
op.add_column('form_process_mapper', sa.Column('is_migrated', sa.Boolean(), nullable=True, comment="Is workflow migrated", server_default='false'))
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We will also need to set the value to TRUE for workflows (process) with only one form linked, right ?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

As of now, it's done as part of getting by process key API. After fetching from camunda & is_migrated set to True for workflows (process) with only one form.
image

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done. Added is_migrated TRUE for workflows with only one form linked to the migration file
image

# Update process_name of format process_name(process_key) to process_name
# Ex: "Two Step Approval (two-step-approval)" to "Two Step Approval"
update_query = text("""UPDATE public.form_process_mapper
SET process_name = regexp_replace(process_name, '\\s*\\([a-zA-Z0-9_-]+\\)$', '')
WHERE process_name ~ '\\([a-zA-Z0-9_-]+\\)$';""")
# Execute the SQL statement
op.execute(update_query)

# code to set the is_migrated field to TRUE for workflow(process) with only one form.
conn = op.get_bind()
# Subquery to get the latest non-deleted row per `parent_form_id`
latest_rows_sql = """
SELECT process_key, id
FROM (
SELECT process_key,
parent_form_id,
id,
ROW_NUMBER() OVER (PARTITION BY parent_form_id ORDER BY id DESC) AS row_num
FROM public.form_process_mapper
WHERE deleted = false
) AS latest_rows
WHERE row_num = 1
"""
latest_rows = conn.execute(sa.text(latest_rows_sql)).mappings().all()
#Count occurrences of each process_key within the latest rows
process_key_counts = Counter(row["process_key"] for row in latest_rows)

#Update is_migrated for each latest row based on the process_key count
for row in latest_rows:
process_key = row["process_key"]
row_id = row["id"]

# Update is_migrated to true if process_key appears in a single parent_form_id group
if process_key_counts[process_key] == 1:
update_sql = """
UPDATE public.form_process_mapper
SET is_migrated = true
WHERE id = :row_id
"""
conn.execute(sa.text(update_sql), {"row_id": row_id})
# ### end Alembic commands ###


def downgrade():
# ### commands auto generated by Alembic - please adjust! ###
op.drop_column('form_process_mapper', 'is_migrated')
# ### end Alembic commands ###
Original file line number Diff line number Diff line change
Expand Up @@ -15,11 +15,11 @@
branch_labels = None
depends_on = None

conn = op.get_bind()
form_url_exists = conn.execute(sa.text("SELECT EXISTS (SELECT 1 FROM information_schema.columns WHERE table_schema='public' AND table_name='application' AND column_name='form_url');"))
form_url_exists = form_url_exists.fetchone()[0]
def upgrade():
# ### commands auto generated by Alembic - please adjust! ###
conn = op.get_bind()
form_url_exists = conn.execute(sa.text("SELECT EXISTS (SELECT 1 FROM information_schema.columns WHERE table_schema='public' AND table_name='application' AND column_name='form_url');"))
form_url_exists = form_url_exists.fetchone()[0]
op.add_column('application', sa.Column('submission_id', sa.String(length=100), nullable=True))
op.add_column('application', sa.Column('latest_form_id', sa.String(length=100), nullable=True))
if(form_url_exists):
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
"""Added constaints to process table.

Revision ID: e9ce28b78478
Revises: 960477800395
Create Date: 2024-11-04 12:00:58.046099

"""
from alembic import op
import sqlalchemy as sa


# revision identifiers, used by Alembic.
revision = 'e9ce28b78478'
down_revision = '960477800395'
branch_labels = None
depends_on = None


def upgrade():
# ### commands auto generated by Alembic - please adjust! ###
op.create_unique_constraint('uq_tenant_process_key', 'process', ['process_key', 'tenant'])
op.create_unique_constraint('uq_tenant_process_name', 'process', ['name', 'tenant'])
# ### end Alembic commands ###


def downgrade():
# ### commands auto generated by Alembic - please adjust! ###
op.drop_constraint('uq_tenant_process_name', 'process', type_='unique')
op.drop_constraint('uq_tenant_process_key', 'process', type_='unique')
# ### end Alembic commands ###
1 change: 1 addition & 0 deletions forms-flow-api/src/formsflow_api/constants/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,7 @@ class BusinessErrorCode(ErrorCodeMixin, Enum):
HTTPStatus.BAD_REQUEST,
)
FORM_VALIDATION_FAILED = "FORM_VALIDATION_FAILED.", HTTPStatus.BAD_REQUEST
INVALID_PROCESS = "Invalid process.", HTTPStatus.BAD_REQUEST
RESTRICT_FORM_DELETE = (
"Can't delete the form that has submissions associated with it.",
HTTPStatus.BAD_REQUEST,
Expand Down
55 changes: 44 additions & 11 deletions forms-flow-api/src/formsflow_api/models/form_process_mapper.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,9 @@
from .db import db


class FormProcessMapper(AuditDateTimeMixin, AuditUserMixin, BaseModel, db.Model):
class FormProcessMapper(
AuditDateTimeMixin, AuditUserMixin, BaseModel, db.Model
): # pylint: disable=too-many-public-methods
"""This class manages form process mapper information."""

id = db.Column(db.Integer, primary_key=True)
Expand Down Expand Up @@ -52,6 +54,9 @@ class FormProcessMapper(AuditDateTimeMixin, AuditUserMixin, BaseModel, db.Model)
version = db.Column(db.Integer, nullable=False, default=1)
description = db.Column(db.String, nullable=True)
prompt_new_version = db.Column(db.Boolean, nullable=True, default=False)
is_migrated = db.Column(
db.Boolean, nullable=True, default=False, comment="Is workflow migrated"
)

__table_args__ = (
UniqueConstraint("form_id", "version", "tenant", name="_form_version_uc"),
Expand All @@ -78,6 +83,7 @@ def create_from_dict(cls, mapper_info: dict) -> FormProcessMapper:
mapper.task_variable = mapper_info.get("task_variable")
mapper.version = mapper_info.get("version")
mapper.description = mapper_info.get("description")
mapper.is_migrated = mapper_info.get("is_migrated", True)
mapper.save()
return mapper
except Exception as err: # pylint: disable=broad-except
Expand Down Expand Up @@ -106,6 +112,7 @@ def update(self, mapper_info: dict):
"process_tenant",
"description",
"prompt_new_version",
"is_migrated",
],
mapper_info,
)
Expand All @@ -123,17 +130,10 @@ def mark_unpublished(self):
self.commit()

@classmethod
def find_all(cls, page_number, limit):
def find_all(cls):
"""Fetch all the form process mappers."""
if page_number == 0:
query = cls.query.order_by(FormProcessMapper.id.desc()).all()
else:
query = (
cls.query.order_by(FormProcessMapper.id.desc())
.paginate(page_number, limit, False)
.items
)
return query
query = cls.tenant_authorization(query=cls.query)
return query.all()

@classmethod
def filter_conditions(cls, **filters):
Expand Down Expand Up @@ -407,3 +407,36 @@ def get_latest_by_parent_form_id(cls, parent_form_id):
.first()
)
return query

@classmethod
@user_context
def get_mappers_by_process_key(cls, process_key=None, mapper_id=None, **kwargs):
"""Get all mappers matching given process key."""
# Define the subquery with the window function to get latest mappers by process_key
user: UserContext = kwargs["user"]
tenant_key: str = user.tenant_key
subquery = (
db.session.query(
cls.process_key,
cls.parent_form_id,
cls.id,
cls.deleted,
cls.form_id,
cls.tenant,
func.row_number() # pylint: disable=not-callable
.over(partition_by=cls.parent_form_id, order_by=cls.id.desc())
.label("row_num"),
).filter(
cls.process_key == process_key,
cls.deleted.is_(False),
cls.id != mapper_id,
cls.tenant == tenant_key,
)
).subquery("latest_mapper_rows_by_process_key")
# Only get the latest row in each parent_formid group
query = (
db.session.query(cls)
.join(subquery, cls.id == subquery.c.id)
.filter(subquery.c.row_num == 1)
)
return query.all()
7 changes: 6 additions & 1 deletion forms-flow-api/src/formsflow_api/models/process.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
from flask_sqlalchemy.query import Query
from formsflow_api_utils.utils import FILTER_MAPS, add_sort_filter
from formsflow_api_utils.utils.user_context import UserContext, user_context
from sqlalchemy import LargeBinary, and_, desc, func, or_
from sqlalchemy import LargeBinary, UniqueConstraint, and_, desc, func, or_
from sqlalchemy.dialects.postgresql import ENUM

from .audit_mixin import AuditDateTimeMixin, AuditUserMixin
Expand Down Expand Up @@ -53,6 +53,11 @@ class Process(AuditDateTimeMixin, AuditUserMixin, BaseModel, db.Model):
is_subflow = db.Column(db.Boolean, default=False)
status_changed = db.Column(db.Boolean, default=False)

__table_args__ = (
UniqueConstraint("process_key", "tenant", name="uq_tenant_process_key"),
UniqueConstraint("name", "tenant", name="uq_tenant_process_name"),
)

@classmethod
def create_from_dict(cls, process_data: dict) -> Process:
"""Create a new process from a dictionary."""
Expand Down
38 changes: 36 additions & 2 deletions forms-flow-api/src/formsflow_api/resources/process.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
profiletime,
)

from formsflow_api.schemas import ProcessDataSchema
from formsflow_api.services import ProcessService

API = Namespace("Process", description="Process")
Expand Down Expand Up @@ -204,7 +205,8 @@ def post():
response = ProcessService.create_process(
process_data=process_data, process_type=process_type, is_subflow=True
)
return response, HTTPStatus.CREATED
response_data = ProcessDataSchema().dump(response)
return response_data, HTTPStatus.CREATED


@cors_preflight("GET, PUT, DELETE, OPTIONS")
Expand Down Expand Up @@ -421,5 +423,37 @@ class ProcessResourceByProcessKey(Resource):
)
def get(process_key: str):
"""Get process data by process key."""
response, status = ProcessService.get_process_by_key(process_key), HTTPStatus.OK
response, status = (
ProcessService.get_process_by_key(process_key, request),
HTTPStatus.OK,
)
return response, status


@cors_preflight("POST,OPTIONS")
@API.route("/migrate", methods=["POST", "OPTIONS"])
class MigrateResource(Resource):
"""Resource to support migration."""

@staticmethod
@auth.has_one_of_roles([CREATE_DESIGNS])
@profiletime
@API.response(200, "OK:- Successful request.")
@API.response(
400,
"BAD_REQUEST:- Invalid request.",
)
@API.response(
401,
"UNAUTHORIZED:- Authorization header not provided or an invalid token passed.",
)
@API.response(
403,
"FORBIDDEN:- Authorization will not help.",
)
def post():
"""Migrate by process_key."""
return (
ProcessService.migrate(request),
HTTPStatus.OK,
)
1 change: 1 addition & 0 deletions forms-flow-api/src/formsflow_api/schemas/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@
form_workflow_schema,
)
from .process import (
MigrateRequestSchema,
ProcessDataSchema,
ProcessListRequestSchema,
ProcessListSchema,
Expand Down
12 changes: 12 additions & 0 deletions forms-flow-api/src/formsflow_api/schemas/process.py
Original file line number Diff line number Diff line change
Expand Up @@ -130,3 +130,15 @@ def load(self, data, *args, **kwargs):
if process_type and process_type.upper() == "LOWCODE" and process_data:
data["processData"] = json.dumps(process_data)
return super().load(data, *args, **kwargs)


class MigrateRequestSchema(Schema):
"""This class manages migrate request schema."""

class Meta: # pylint: disable=too-few-public-methods
"""Exclude unknown fields in the deserialized output."""

unknown = EXCLUDE

process_key = fields.Str(data_key="processKey", required=True)
mapper_id = fields.Str(data_key="mapperId", required=True)
Original file line number Diff line number Diff line change
Expand Up @@ -297,13 +297,6 @@ def check_tenant_authorization_by_formid(
raise BusinessException(BusinessErrorCode.PERMISSION_DENIED)
return

@staticmethod
def clean_form_name(name):
"""Remove invalid characters from form_name before setting as process key."""
# Remove non-letters at the start, and any invalid characters elsewhere
name = re.sub(r"^[^a-zA-Z]+|[^a-zA-Z0-9\-_]", "", name)
return name

@staticmethod
def validate_process_and_update_mapper(name, mapper):
"""Validate process name/key exists, if exists update name & update mapper."""
Expand Down Expand Up @@ -388,6 +381,7 @@ def create_form(data, is_designer, **kwargs): # pylint:disable=too-many-locals
anonymous = False
description = data.get("description", "")
task_variable = []
is_migrated = True
current_app.logger.info(f"Creating new form {is_new_form}")
# If creating new version for a existing form, fetch process key, name from mapper
if not is_new_form:
Expand All @@ -398,12 +392,13 @@ def create_form(data, is_designer, **kwargs): # pylint:disable=too-many-locals
anonymous = mapper.is_anonymous
description = mapper.description
task_variable = json.loads(mapper.task_variable)
is_migrated = mapper.is_migrated
else:
# if new form, form name is kept as process_name & process key
process_name = response.get("name")
# process key/Id doesn't support numbers & special characters at start
# special characters anywhere so clean them before setting as process key
process_name = FormProcessMapperService.clean_form_name(process_name)
process_name = ProcessService.clean_form_name(process_name)

mapper_data = {
"formId": form_id,
Expand All @@ -419,6 +414,7 @@ def create_form(data, is_designer, **kwargs): # pylint:disable=too-many-locals
"status": FormProcessMapperStatus.INACTIVE.value,
"anonymous": anonymous,
"task_variable": task_variable,
"is_migrated": is_migrated,
}

mapper = FormProcessMapperService.mapper_create(mapper_data)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -312,7 +312,7 @@ def import_new_form_workflow(
process_name = form_response.get("name")
# process key/Id doesn't support numbers & special characters at start
# special characters anywhere so clean them before setting as process key
process_name = FormProcessMapperService.clean_form_name(process_name)
process_name = ProcessService.clean_form_name(process_name)
mapper_data = {
"form_id": form_id,
"form_name": form_response.get("title"),
Expand Down Expand Up @@ -417,6 +417,7 @@ def import_form(
"titleChanged": title_changed,
"anonymousChanged": anonymous_changed,
"description": mapper.description if form_only else description,
"is_migrated": mapper.is_migrated,
}
FormProcessMapperService.mapper_create(mapper_data)
FormProcessMapperService.mark_unpublished(mapper.id)
Expand Down
Loading
Loading