diff --git a/api/src/main/kotlin/dev/bpmcrafters/processengineapi/CommonRestrictions.kt b/api/src/main/kotlin/dev/bpmcrafters/processengineapi/CommonRestrictions.kt index 3ea7210f..482d87db 100644 --- a/api/src/main/kotlin/dev/bpmcrafters/processengineapi/CommonRestrictions.kt +++ b/api/src/main/kotlin/dev/bpmcrafters/processengineapi/CommonRestrictions.kt @@ -7,13 +7,20 @@ package dev.bpmcrafters.processengineapi object CommonRestrictions { /** - * FIXME: consider to remove restriction names to adapters. + * Definition attribute of a BPMN activity from XML holding the id of the element. */ - const val ACTIVITY_ID = "activityId" const val BUSINESS_KEY = "businessKey" const val CORRELATION_KEY = "correlationKey" + + /** + * Definition attribute of a BPMN process from XML holding the id of the element. + */ const val PROCESS_DEFINITION_KEY = "processDefinitionKey" + + /** + * Id provided by the runtime identifying the process instance. + */ const val PROCESS_INSTANCE_ID = "processInstanceId" const val PROCESS_DEFINITION_ID = "processDefinitionId" const val PROCESS_DEFINITION_VERSION_TAG = "processDefinitionVersionTag" @@ -23,7 +30,7 @@ object CommonRestrictions { const val MESSAGE_ID = "messageId" const val MESSAGE_TTL = "messageTTL" const val EXECUTION_ID = "executionId" - + /** * Creates a helper restrictions builder. */ diff --git a/engine-adapter/camunda-platform-7-embedded-core/src/main/kotlin/dev/bpmcrafters/processengineapi/adapter/c7/embedded/task/delivery/event/EmbeddedEventBasedUserTaskDelivery.kt b/engine-adapter/camunda-platform-7-embedded-core/src/main/kotlin/dev/bpmcrafters/processengineapi/adapter/c7/embedded/task/delivery/event/EmbeddedEventBasedUserTaskDelivery.kt index 760c3f3a..383f75dc 100644 --- a/engine-adapter/camunda-platform-7-embedded-core/src/main/kotlin/dev/bpmcrafters/processengineapi/adapter/c7/embedded/task/delivery/event/EmbeddedEventBasedUserTaskDelivery.kt +++ b/engine-adapter/camunda-platform-7-embedded-core/src/main/kotlin/dev/bpmcrafters/processengineapi/adapter/c7/embedded/task/delivery/event/EmbeddedEventBasedUserTaskDelivery.kt @@ -1,5 +1,6 @@ package dev.bpmcrafters.processengineapi.adapter.c7.embedded.task.delivery.event +import dev.bpmcrafters.processengineapi.CommonRestrictions import dev.bpmcrafters.processengineapi.adapter.c7.embedded.task.delivery.UserTaskDelivery import dev.bpmcrafters.processengineapi.adapter.commons.task.filterBySubscription import dev.bpmcrafters.processengineapi.adapter.c7.embedded.task.delivery.pull.EmbeddedPullUserTaskDelivery.Companion.logger @@ -48,5 +49,13 @@ class EmbeddedEventBasedUserTaskDelivery( this.taskDescriptionKey == null || this.taskDescriptionKey == task.taskDefinitionKey || this.taskDescriptionKey == task.id - ) + ) && this.restrictions.all { + when (it.key) { + CommonRestrictions.EXECUTION_ID -> it.value == task.executionId + CommonRestrictions.TENANT_ID -> it.value == task.tenantId + CommonRestrictions.PROCESS_INSTANCE_ID -> it.value == task.processInstanceId + CommonRestrictions.PROCESS_DEFINITION_ID -> it.value == task.processDefinitionId + else -> false + } + } } diff --git a/engine-adapter/camunda-platform-7-embedded-core/src/main/kotlin/dev/bpmcrafters/processengineapi/adapter/c7/embedded/task/delivery/job/EmbeddedTaskDeliveryJobHandler.kt b/engine-adapter/camunda-platform-7-embedded-core/src/main/kotlin/dev/bpmcrafters/processengineapi/adapter/c7/embedded/task/delivery/job/EmbeddedTaskDeliveryJobHandler.kt index c1dfbd4d..031bebeb 100644 --- a/engine-adapter/camunda-platform-7-embedded-core/src/main/kotlin/dev/bpmcrafters/processengineapi/adapter/c7/embedded/task/delivery/job/EmbeddedTaskDeliveryJobHandler.kt +++ b/engine-adapter/camunda-platform-7-embedded-core/src/main/kotlin/dev/bpmcrafters/processengineapi/adapter/c7/embedded/task/delivery/job/EmbeddedTaskDeliveryJobHandler.kt @@ -1,5 +1,6 @@ package dev.bpmcrafters.processengineapi.adapter.c7.embedded.task.delivery.job +import dev.bpmcrafters.processengineapi.CommonRestrictions import dev.bpmcrafters.processengineapi.adapter.c7.embedded.task.delivery.job.EmbeddedTaskDeliveryJobHandler.EmbeddedTaskDeliveryJobHandlerConfiguration.Companion.OPERATION_CREATE import dev.bpmcrafters.processengineapi.adapter.c7.embedded.task.delivery.job.EmbeddedTaskDeliveryJobHandler.EmbeddedTaskDeliveryJobHandlerConfiguration.Companion.OPERATION_DELETE import dev.bpmcrafters.processengineapi.adapter.c7.embedded.task.delivery.job.EmbeddedTaskDeliveryJobHandler.EmbeddedTaskDeliveryJobHandlerConfiguration.Companion.TYPE_SERVICE @@ -90,9 +91,27 @@ class EmbeddedTaskDeliveryJobHandler( private fun TaskSubscriptionHandle.matches(taskEntity: TaskEntity): Boolean = this.taskType == TaskType.USER && (this.taskDescriptionKey == null || this.taskDescriptionKey == taskEntity.taskDefinitionKey || this.taskDescriptionKey == taskEntity.id) + && this.restrictions.all { + when (it.key) { + CommonRestrictions.EXECUTION_ID -> it.value == taskEntity.executionId + CommonRestrictions.TENANT_ID -> it.value == taskEntity.tenantId + CommonRestrictions.PROCESS_INSTANCE_ID -> it.value == taskEntity.processInstanceId + CommonRestrictions.PROCESS_DEFINITION_ID -> it.value == taskEntity.processDefinitionId + else -> false + } + } private fun TaskSubscriptionHandle.matches(taskEntity: LockedExternalTask): Boolean = this.taskType == TaskType.EXTERNAL && (this.taskDescriptionKey == null || this.taskDescriptionKey == taskEntity.topicName) + && this.restrictions.all { + when (it.key) { + CommonRestrictions.EXECUTION_ID -> it.value == taskEntity.executionId + CommonRestrictions.TENANT_ID -> it.value == taskEntity.tenantId + CommonRestrictions.PROCESS_INSTANCE_ID -> it.value == taskEntity.processInstanceId + CommonRestrictions.PROCESS_DEFINITION_ID -> it.value == taskEntity.processDefinitionId + else -> false + } + } /** * Job configuration. diff --git a/engine-adapter/camunda-platform-7-embedded-core/src/main/kotlin/dev/bpmcrafters/processengineapi/adapter/c7/embedded/task/delivery/pull/EmbeddedPullServiceTaskDelivery.kt b/engine-adapter/camunda-platform-7-embedded-core/src/main/kotlin/dev/bpmcrafters/processengineapi/adapter/c7/embedded/task/delivery/pull/EmbeddedPullServiceTaskDelivery.kt index da6fb750..0a137466 100644 --- a/engine-adapter/camunda-platform-7-embedded-core/src/main/kotlin/dev/bpmcrafters/processengineapi/adapter/c7/embedded/task/delivery/pull/EmbeddedPullServiceTaskDelivery.kt +++ b/engine-adapter/camunda-platform-7-embedded-core/src/main/kotlin/dev/bpmcrafters/processengineapi/adapter/c7/embedded/task/delivery/pull/EmbeddedPullServiceTaskDelivery.kt @@ -73,11 +73,13 @@ class EmbeddedPullServiceTaskDelivery( private fun ExternalTaskQueryBuilder.forSubscriptions(subscriptions: List): ExternalTaskQueryBuilder { subscriptions - .mapNotNull { it.taskDescriptionKey } - .distinct() - .forEach { topic -> - this.topic(topic, lockDuration) + .filter { it.taskDescriptionKey != null } + .distinctBy { it.taskDescriptionKey } + .forEach { subscription -> + this + .topic(subscription.taskDescriptionKey, lockDuration) .enableCustomObjectDeserialization() + // FIXME -> consider complex tent filtering } return this } @@ -88,7 +90,7 @@ class EmbeddedPullServiceTaskDelivery( && this.restrictions.all { when (it.key) { CommonRestrictions.EXECUTION_ID -> it.value == task.executionId - CommonRestrictions.ACTIVITY_ID -> it.value == task.activityId + CommonRestrictions.ACTIVITY_ID -> it.value == task.activityInstanceId // FIXME task.activityId? CommonRestrictions.BUSINESS_KEY -> it.value == task.businessKey CommonRestrictions.TENANT_ID -> it.value == task.tenantId CommonRestrictions.PROCESS_INSTANCE_ID -> it.value == task.processInstanceId diff --git a/engine-adapter/camunda-platform-7-embedded-core/src/main/kotlin/dev/bpmcrafters/processengineapi/adapter/c7/embedded/task/delivery/pull/EmbeddedPullUserTaskDelivery.kt b/engine-adapter/camunda-platform-7-embedded-core/src/main/kotlin/dev/bpmcrafters/processengineapi/adapter/c7/embedded/task/delivery/pull/EmbeddedPullUserTaskDelivery.kt index b6a233b9..4f11793d 100644 --- a/engine-adapter/camunda-platform-7-embedded-core/src/main/kotlin/dev/bpmcrafters/processengineapi/adapter/c7/embedded/task/delivery/pull/EmbeddedPullUserTaskDelivery.kt +++ b/engine-adapter/camunda-platform-7-embedded-core/src/main/kotlin/dev/bpmcrafters/processengineapi/adapter/c7/embedded/task/delivery/pull/EmbeddedPullUserTaskDelivery.kt @@ -1,11 +1,12 @@ package dev.bpmcrafters.processengineapi.adapter.c7.embedded.task.delivery.pull +import dev.bpmcrafters.processengineapi.CommonRestrictions import dev.bpmcrafters.processengineapi.adapter.c7.embedded.task.delivery.UserTaskDelivery -import dev.bpmcrafters.processengineapi.adapter.commons.task.filterBySubscription import dev.bpmcrafters.processengineapi.adapter.c7.embedded.task.delivery.toTaskInformation import dev.bpmcrafters.processengineapi.adapter.commons.task.RefreshableDelivery import dev.bpmcrafters.processengineapi.adapter.commons.task.SubscriptionRepository import dev.bpmcrafters.processengineapi.adapter.commons.task.TaskSubscriptionHandle +import dev.bpmcrafters.processengineapi.adapter.commons.task.filterBySubscription import dev.bpmcrafters.processengineapi.task.TaskType import mu.KLogging import org.camunda.bpm.engine.RepositoryService @@ -65,11 +66,11 @@ class EmbeddedPullUserTaskDelivery( } } - @Suppress("UNUSED_PARAMETER") - private fun TaskQuery.forSubscriptions(subscriptions: List): TaskQuery { - // TODO: narrow down, for the moment take all tasks + private fun TaskQuery.forSubscriptions(@Suppress("UNUSED_PARAMETER") subscriptions: List): TaskQuery { + // TODO: narrow down, for the moment take all tasks matching tenants return this .active() + // FIXME -> consider complex tent filtering } @@ -78,6 +79,14 @@ class EmbeddedPullUserTaskDelivery( this.taskDescriptionKey == null || this.taskDescriptionKey == task.taskDefinitionKey || this.taskDescriptionKey == task.id - ) + ) && this.restrictions.all { + when (it.key) { + CommonRestrictions.EXECUTION_ID -> it.value == task.executionId + CommonRestrictions.TENANT_ID -> it.value == task.tenantId + CommonRestrictions.PROCESS_INSTANCE_ID -> it.value == task.processInstanceId + CommonRestrictions.PROCESS_DEFINITION_ID -> it.value == task.processDefinitionId + else -> false + } + } } diff --git a/engine-adapter/camunda-platform-7-remote-core/src/main/kotlin/dev/bpmcrafters/processengineapi/adapter/c7/remote/task/delivery/pull/RemotePullServiceTaskDelivery.kt b/engine-adapter/camunda-platform-7-remote-core/src/main/kotlin/dev/bpmcrafters/processengineapi/adapter/c7/remote/task/delivery/pull/RemotePullServiceTaskDelivery.kt index f961b1d7..edc461a2 100644 --- a/engine-adapter/camunda-platform-7-remote-core/src/main/kotlin/dev/bpmcrafters/processengineapi/adapter/c7/remote/task/delivery/pull/RemotePullServiceTaskDelivery.kt +++ b/engine-adapter/camunda-platform-7-remote-core/src/main/kotlin/dev/bpmcrafters/processengineapi/adapter/c7/remote/task/delivery/pull/RemotePullServiceTaskDelivery.kt @@ -77,6 +77,7 @@ class RemotePullServiceTaskDelivery( .forEach { topic -> this.topic(topic, lockDuration) .enableCustomObjectDeserialization() + // FIXME -> consider complex tent filtering } return this } diff --git a/engine-adapter/camunda-platform-7-remote-core/src/main/kotlin/dev/bpmcrafters/processengineapi/adapter/c7/remote/task/delivery/pull/RemotePullUserTaskDelivery.kt b/engine-adapter/camunda-platform-7-remote-core/src/main/kotlin/dev/bpmcrafters/processengineapi/adapter/c7/remote/task/delivery/pull/RemotePullUserTaskDelivery.kt index 11027417..8cc86d3f 100644 --- a/engine-adapter/camunda-platform-7-remote-core/src/main/kotlin/dev/bpmcrafters/processengineapi/adapter/c7/remote/task/delivery/pull/RemotePullUserTaskDelivery.kt +++ b/engine-adapter/camunda-platform-7-remote-core/src/main/kotlin/dev/bpmcrafters/processengineapi/adapter/c7/remote/task/delivery/pull/RemotePullUserTaskDelivery.kt @@ -67,6 +67,7 @@ class RemotePullUserTaskDelivery( // FIXME: narrow down, for the moment take all tasks return this .active() + // FIXME -> consider complex tent filtering } diff --git a/engine-adapter/camunda-platform-7-remote-core/src/main/kotlin/dev/bpmcrafters/processengineapi/adapter/c7/remote/task/delivery/subscribe/SubscribingClientServiceTaskDelivery.kt b/engine-adapter/camunda-platform-7-remote-core/src/main/kotlin/dev/bpmcrafters/processengineapi/adapter/c7/remote/task/delivery/subscribe/SubscribingClientServiceTaskDelivery.kt index 4b2d39e4..bfe8f01a 100644 --- a/engine-adapter/camunda-platform-7-remote-core/src/main/kotlin/dev/bpmcrafters/processengineapi/adapter/c7/remote/task/delivery/subscribe/SubscribingClientServiceTaskDelivery.kt +++ b/engine-adapter/camunda-platform-7-remote-core/src/main/kotlin/dev/bpmcrafters/processengineapi/adapter/c7/remote/task/delivery/subscribe/SubscribingClientServiceTaskDelivery.kt @@ -1,5 +1,6 @@ package dev.bpmcrafters.processengineapi.adapter.c7.remote.task.delivery.subscribe +import dev.bpmcrafters.processengineapi.CommonRestrictions import dev.bpmcrafters.processengineapi.adapter.c7.remote.task.delivery.toTaskInformation import dev.bpmcrafters.processengineapi.adapter.commons.task.SubscriptionRepository import dev.bpmcrafters.processengineapi.adapter.commons.task.TaskSubscriptionHandle @@ -84,7 +85,20 @@ class SubscribingClientServiceTaskDelivery( private fun TaskSubscriptionHandle.matches(externalTask: ExternalTask): Boolean { return this.taskType == TaskType.EXTERNAL && ( this.taskDescriptionKey == null || this.taskDescriptionKey == externalTask.topicName - ) + ) && this.restrictions.all { + when (it.key) { + CommonRestrictions.EXECUTION_ID -> it.value == externalTask.executionId + CommonRestrictions.ACTIVITY_ID -> it.value == externalTask.activityId + CommonRestrictions.BUSINESS_KEY -> it.value == externalTask.businessKey + CommonRestrictions.TENANT_ID -> it.value == externalTask.tenantId + CommonRestrictions.PROCESS_INSTANCE_ID -> it.value == externalTask.processInstanceId + CommonRestrictions.PROCESS_DEFINITION_KEY -> it.value == externalTask.processDefinitionKey + CommonRestrictions.PROCESS_DEFINITION_ID -> it.value == externalTask.processDefinitionId + CommonRestrictions.PROCESS_DEFINITION_VERSION_TAG -> it.value == externalTask.processDefinitionVersionTag + else -> false + } + } + // FIXME: analyze this! check restrictions, etc.. } @@ -97,5 +111,6 @@ class SubscribingClientServiceTaskDelivery( } else { this } + // FIXME -> consider complex tent filtering } } diff --git a/engine-adapter/camunda-platform-8-core/src/main/kotlin/dev/bpmcrafters/processengineapi/adapter/c8/task/delivery/PullUserTaskDelivery.kt b/engine-adapter/camunda-platform-8-core/src/main/kotlin/dev/bpmcrafters/processengineapi/adapter/c8/task/delivery/PullUserTaskDelivery.kt index 8d5cfa90..45092dfa 100644 --- a/engine-adapter/camunda-platform-8-core/src/main/kotlin/dev/bpmcrafters/processengineapi/adapter/c8/task/delivery/PullUserTaskDelivery.kt +++ b/engine-adapter/camunda-platform-8-core/src/main/kotlin/dev/bpmcrafters/processengineapi/adapter/c8/task/delivery/PullUserTaskDelivery.kt @@ -1,5 +1,6 @@ package dev.bpmcrafters.processengineapi.adapter.c8.task.delivery +import dev.bpmcrafters.processengineapi.CommonRestrictions import dev.bpmcrafters.processengineapi.adapter.commons.task.RefreshableDelivery import dev.bpmcrafters.processengineapi.adapter.commons.task.SubscriptionRepository import dev.bpmcrafters.processengineapi.adapter.commons.task.TaskSubscriptionHandle @@ -58,14 +59,23 @@ class PullUserTaskDelivery( } private fun TaskSearch.forSubscriptions(subscriptions: List): TaskSearch { + // FIXME -> support tenant on subscription subscriptions .filter { it.taskType == TaskType.USER } // only user task subscriptions .map { it.taskDescriptionKey to it.restrictions } - + // FIXME -> consider complex tent filtering return this } private fun TaskSubscriptionHandle.matches(task: Task): Boolean = this.taskType == TaskType.USER && (this.taskDescriptionKey == null || this.taskDescriptionKey == task.taskDefinitionId) + && this.restrictions.all { + when (it.key) { + CommonRestrictions.TENANT_ID -> it.value == task.tenantId + CommonRestrictions.PROCESS_INSTANCE_ID -> it.value == task.processInstanceKey + CommonRestrictions.PROCESS_DEFINITION_ID -> it.value == task.processDefinitionKey + else -> false + } + } } diff --git a/engine-adapter/camunda-platform-8-core/src/main/kotlin/dev/bpmcrafters/processengineapi/adapter/c8/task/delivery/SubscribingRefreshingUserTaskDelivery.kt b/engine-adapter/camunda-platform-8-core/src/main/kotlin/dev/bpmcrafters/processengineapi/adapter/c8/task/delivery/SubscribingRefreshingUserTaskDelivery.kt index df733644..2ef84744 100644 --- a/engine-adapter/camunda-platform-8-core/src/main/kotlin/dev/bpmcrafters/processengineapi/adapter/c8/task/delivery/SubscribingRefreshingUserTaskDelivery.kt +++ b/engine-adapter/camunda-platform-8-core/src/main/kotlin/dev/bpmcrafters/processengineapi/adapter/c8/task/delivery/SubscribingRefreshingUserTaskDelivery.kt @@ -153,32 +153,41 @@ class SubscribingRefreshingUserTaskDelivery( } private fun ActivateJobsCommandStep3.forSubscription(subscription: TaskSubscriptionHandle): ActivateJobsCommandStep3 { - // FIXME -> tenantId // FIXME -> more to setup from props - return if (subscription.payloadDescription != null && subscription.payloadDescription!!.isNotEmpty()) { - this.fetchVariables(subscription.payloadDescription!!.toList()) - } else { - this + return this.apply { + val payloadDescription = subscription.payloadDescription + if (!payloadDescription.isNullOrEmpty()) { + this.fetchVariables(payloadDescription.toList()) + } + if (subscription.restrictions.containsKey(CommonRestrictions.TENANT_ID)) { + this.tenantId(subscription.restrictions[CommonRestrictions.TENANT_ID]) + } } } private fun JobWorkerBuilderStep3.forSubscription(subscription: TaskSubscriptionHandle): JobWorkerBuilderStep3 { - // FIXME -> tenantId // FIXME -> more to setup from props - return if (subscription.payloadDescription != null && subscription.payloadDescription!!.isNotEmpty()) { - this.fetchVariables(subscription.payloadDescription!!.toList()) - } else { - this + return this.apply { + val payloadDescription = subscription.payloadDescription + if (!payloadDescription.isNullOrEmpty()) { + this.fetchVariables(payloadDescription.toList()) + } + if (subscription.restrictions.containsKey(CommonRestrictions.TENANT_ID)) { + this.tenantId(subscription.restrictions[CommonRestrictions.TENANT_ID]) + } } } private fun StreamJobsCommandStep3.forSubscription(subscription: TaskSubscriptionHandle): StreamJobsCommandStep3 { - // FIXME -> tenantId // FIXME -> more to setup from props - return if (subscription.payloadDescription != null && subscription.payloadDescription!!.isNotEmpty()) { - this.fetchVariables(subscription.payloadDescription!!.toList()) - } else { - this + return this.apply { + val payloadDescription = subscription.payloadDescription + if (!payloadDescription.isNullOrEmpty()) { + this.fetchVariables(payloadDescription.toList()) + } + if (subscription.restrictions.containsKey(CommonRestrictions.TENANT_ID)) { + this.tenantId(subscription.restrictions[CommonRestrictions.TENANT_ID]) + } } } diff --git a/engine-adapter/camunda-platform-8-core/src/main/kotlin/dev/bpmcrafters/processengineapi/adapter/c8/task/delivery/SubscribingServiceTaskDelivery.kt b/engine-adapter/camunda-platform-8-core/src/main/kotlin/dev/bpmcrafters/processengineapi/adapter/c8/task/delivery/SubscribingServiceTaskDelivery.kt index 18f73ce6..48eb0f5b 100644 --- a/engine-adapter/camunda-platform-8-core/src/main/kotlin/dev/bpmcrafters/processengineapi/adapter/c8/task/delivery/SubscribingServiceTaskDelivery.kt +++ b/engine-adapter/camunda-platform-8-core/src/main/kotlin/dev/bpmcrafters/processengineapi/adapter/c8/task/delivery/SubscribingServiceTaskDelivery.kt @@ -89,13 +89,15 @@ class SubscribingServiceTaskDelivery( } private fun JobWorkerBuilderStep3.forSubscription(subscription: TaskSubscriptionHandle): JobWorkerBuilderStep3 { - // FIXME -> tenantId // FIXME -> more to setup from props - return if (subscription.payloadDescription != null && subscription.payloadDescription!!.isNotEmpty()) { - this - .fetchVariables(subscription.payloadDescription!!.toList()) - } else { - this + return this.apply { + val payloadDescription = subscription.payloadDescription + if (!payloadDescription.isNullOrEmpty()) { + this.fetchVariables(payloadDescription.toList()) + } + if (subscription.restrictions.containsKey(CommonRestrictions.TENANT_ID)) { + this.tenantId(subscription.restrictions[CommonRestrictions.TENANT_ID]) + } } } }