Skip to content

Commit

Permalink
improve tenant support, fix #156
Browse files Browse the repository at this point in the history
  • Loading branch information
zambrovski committed Jan 9, 2025
1 parent 385931c commit c079821
Show file tree
Hide file tree
Showing 11 changed files with 121 additions and 37 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -7,13 +7,20 @@ package dev.bpmcrafters.processengineapi
object CommonRestrictions {

/**
* FIXME: consider to remove restriction names to adapters.
* Definition attribute of a BPMN activity from XML holding the id of the element.
*/

const val ACTIVITY_ID = "activityId"
const val BUSINESS_KEY = "businessKey"
const val CORRELATION_KEY = "correlationKey"

/**
* Definition attribute of a BPMN process from XML holding the id of the element.
*/
const val PROCESS_DEFINITION_KEY = "processDefinitionKey"

/**
* Id provided by the runtime identifying the process instance.
*/
const val PROCESS_INSTANCE_ID = "processInstanceId"
const val PROCESS_DEFINITION_ID = "processDefinitionId"
const val PROCESS_DEFINITION_VERSION_TAG = "processDefinitionVersionTag"
Expand All @@ -23,7 +30,7 @@ object CommonRestrictions {
const val MESSAGE_ID = "messageId"
const val MESSAGE_TTL = "messageTTL"
const val EXECUTION_ID = "executionId"

/**
* Creates a helper restrictions builder.
*/
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package dev.bpmcrafters.processengineapi.adapter.c7.embedded.task.delivery.event

import dev.bpmcrafters.processengineapi.CommonRestrictions
import dev.bpmcrafters.processengineapi.adapter.c7.embedded.task.delivery.UserTaskDelivery
import dev.bpmcrafters.processengineapi.adapter.commons.task.filterBySubscription
import dev.bpmcrafters.processengineapi.adapter.c7.embedded.task.delivery.pull.EmbeddedPullUserTaskDelivery.Companion.logger
Expand Down Expand Up @@ -48,5 +49,13 @@ class EmbeddedEventBasedUserTaskDelivery(
this.taskDescriptionKey == null
|| this.taskDescriptionKey == task.taskDefinitionKey
|| this.taskDescriptionKey == task.id
)
) && this.restrictions.all {
when (it.key) {
CommonRestrictions.EXECUTION_ID -> it.value == task.executionId
CommonRestrictions.TENANT_ID -> it.value == task.tenantId
CommonRestrictions.PROCESS_INSTANCE_ID -> it.value == task.processInstanceId
CommonRestrictions.PROCESS_DEFINITION_ID -> it.value == task.processDefinitionId
else -> false
}
}
}
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package dev.bpmcrafters.processengineapi.adapter.c7.embedded.task.delivery.job

import dev.bpmcrafters.processengineapi.CommonRestrictions
import dev.bpmcrafters.processengineapi.adapter.c7.embedded.task.delivery.job.EmbeddedTaskDeliveryJobHandler.EmbeddedTaskDeliveryJobHandlerConfiguration.Companion.OPERATION_CREATE
import dev.bpmcrafters.processengineapi.adapter.c7.embedded.task.delivery.job.EmbeddedTaskDeliveryJobHandler.EmbeddedTaskDeliveryJobHandlerConfiguration.Companion.OPERATION_DELETE
import dev.bpmcrafters.processengineapi.adapter.c7.embedded.task.delivery.job.EmbeddedTaskDeliveryJobHandler.EmbeddedTaskDeliveryJobHandlerConfiguration.Companion.TYPE_SERVICE
Expand Down Expand Up @@ -90,9 +91,27 @@ class EmbeddedTaskDeliveryJobHandler(

private fun TaskSubscriptionHandle.matches(taskEntity: TaskEntity): Boolean =
this.taskType == TaskType.USER && (this.taskDescriptionKey == null || this.taskDescriptionKey == taskEntity.taskDefinitionKey || this.taskDescriptionKey == taskEntity.id)
&& this.restrictions.all {
when (it.key) {
CommonRestrictions.EXECUTION_ID -> it.value == taskEntity.executionId
CommonRestrictions.TENANT_ID -> it.value == taskEntity.tenantId
CommonRestrictions.PROCESS_INSTANCE_ID -> it.value == taskEntity.processInstanceId
CommonRestrictions.PROCESS_DEFINITION_ID -> it.value == taskEntity.processDefinitionId
else -> false
}
}

private fun TaskSubscriptionHandle.matches(taskEntity: LockedExternalTask): Boolean =
this.taskType == TaskType.EXTERNAL && (this.taskDescriptionKey == null || this.taskDescriptionKey == taskEntity.topicName)
&& this.restrictions.all {
when (it.key) {
CommonRestrictions.EXECUTION_ID -> it.value == taskEntity.executionId
CommonRestrictions.TENANT_ID -> it.value == taskEntity.tenantId
CommonRestrictions.PROCESS_INSTANCE_ID -> it.value == taskEntity.processInstanceId
CommonRestrictions.PROCESS_DEFINITION_ID -> it.value == taskEntity.processDefinitionId
else -> false
}
}

/**
* Job configuration.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -73,11 +73,13 @@ class EmbeddedPullServiceTaskDelivery(

private fun ExternalTaskQueryBuilder.forSubscriptions(subscriptions: List<TaskSubscriptionHandle>): ExternalTaskQueryBuilder {
subscriptions
.mapNotNull { it.taskDescriptionKey }
.distinct()
.forEach { topic ->
this.topic(topic, lockDuration)
.filter { it.taskDescriptionKey != null }
.distinctBy { it.taskDescriptionKey }
.forEach { subscription ->
this
.topic(subscription.taskDescriptionKey, lockDuration)
.enableCustomObjectDeserialization()
// FIXME -> consider complex tent filtering
}
return this
}
Expand All @@ -88,7 +90,7 @@ class EmbeddedPullServiceTaskDelivery(
&& this.restrictions.all {
when (it.key) {
CommonRestrictions.EXECUTION_ID -> it.value == task.executionId
CommonRestrictions.ACTIVITY_ID -> it.value == task.activityId
CommonRestrictions.ACTIVITY_ID -> it.value == task.activityInstanceId // FIXME task.activityId?
CommonRestrictions.BUSINESS_KEY -> it.value == task.businessKey
CommonRestrictions.TENANT_ID -> it.value == task.tenantId
CommonRestrictions.PROCESS_INSTANCE_ID -> it.value == task.processInstanceId
Expand Down
Original file line number Diff line number Diff line change
@@ -1,11 +1,12 @@
package dev.bpmcrafters.processengineapi.adapter.c7.embedded.task.delivery.pull

import dev.bpmcrafters.processengineapi.CommonRestrictions
import dev.bpmcrafters.processengineapi.adapter.c7.embedded.task.delivery.UserTaskDelivery
import dev.bpmcrafters.processengineapi.adapter.commons.task.filterBySubscription
import dev.bpmcrafters.processengineapi.adapter.c7.embedded.task.delivery.toTaskInformation
import dev.bpmcrafters.processengineapi.adapter.commons.task.RefreshableDelivery
import dev.bpmcrafters.processengineapi.adapter.commons.task.SubscriptionRepository
import dev.bpmcrafters.processengineapi.adapter.commons.task.TaskSubscriptionHandle
import dev.bpmcrafters.processengineapi.adapter.commons.task.filterBySubscription
import dev.bpmcrafters.processengineapi.task.TaskType
import mu.KLogging
import org.camunda.bpm.engine.RepositoryService
Expand Down Expand Up @@ -65,11 +66,11 @@ class EmbeddedPullUserTaskDelivery(
}
}

@Suppress("UNUSED_PARAMETER")
private fun TaskQuery.forSubscriptions(subscriptions: List<TaskSubscriptionHandle>): TaskQuery {
// TODO: narrow down, for the moment take all tasks
private fun TaskQuery.forSubscriptions(@Suppress("UNUSED_PARAMETER") subscriptions: List<TaskSubscriptionHandle>): TaskQuery {
// TODO: narrow down, for the moment take all tasks matching tenants
return this
.active()
// FIXME -> consider complex tent filtering
}


Expand All @@ -78,6 +79,14 @@ class EmbeddedPullUserTaskDelivery(
this.taskDescriptionKey == null
|| this.taskDescriptionKey == task.taskDefinitionKey
|| this.taskDescriptionKey == task.id
)
) && this.restrictions.all {
when (it.key) {
CommonRestrictions.EXECUTION_ID -> it.value == task.executionId
CommonRestrictions.TENANT_ID -> it.value == task.tenantId
CommonRestrictions.PROCESS_INSTANCE_ID -> it.value == task.processInstanceId
CommonRestrictions.PROCESS_DEFINITION_ID -> it.value == task.processDefinitionId
else -> false
}
}
}

Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,7 @@ class RemotePullServiceTaskDelivery(
.forEach { topic ->
this.topic(topic, lockDuration)
.enableCustomObjectDeserialization()
// FIXME -> consider complex tent filtering
}
return this
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,7 @@ class RemotePullUserTaskDelivery(
// FIXME: narrow down, for the moment take all tasks
return this
.active()
// FIXME -> consider complex tent filtering
}


Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package dev.bpmcrafters.processengineapi.adapter.c7.remote.task.delivery.subscribe

import dev.bpmcrafters.processengineapi.CommonRestrictions
import dev.bpmcrafters.processengineapi.adapter.c7.remote.task.delivery.toTaskInformation
import dev.bpmcrafters.processengineapi.adapter.commons.task.SubscriptionRepository
import dev.bpmcrafters.processengineapi.adapter.commons.task.TaskSubscriptionHandle
Expand Down Expand Up @@ -84,7 +85,20 @@ class SubscribingClientServiceTaskDelivery(
private fun TaskSubscriptionHandle.matches(externalTask: ExternalTask): Boolean {
return this.taskType == TaskType.EXTERNAL && (
this.taskDescriptionKey == null || this.taskDescriptionKey == externalTask.topicName
)
) && this.restrictions.all {
when (it.key) {
CommonRestrictions.EXECUTION_ID -> it.value == externalTask.executionId
CommonRestrictions.ACTIVITY_ID -> it.value == externalTask.activityId
CommonRestrictions.BUSINESS_KEY -> it.value == externalTask.businessKey
CommonRestrictions.TENANT_ID -> it.value == externalTask.tenantId
CommonRestrictions.PROCESS_INSTANCE_ID -> it.value == externalTask.processInstanceId
CommonRestrictions.PROCESS_DEFINITION_KEY -> it.value == externalTask.processDefinitionKey
CommonRestrictions.PROCESS_DEFINITION_ID -> it.value == externalTask.processDefinitionId
CommonRestrictions.PROCESS_DEFINITION_VERSION_TAG -> it.value == externalTask.processDefinitionVersionTag
else -> false
}
}

// FIXME: analyze this! check restrictions, etc..
}

Expand All @@ -97,5 +111,6 @@ class SubscribingClientServiceTaskDelivery(
} else {
this
}
// FIXME -> consider complex tent filtering
}
}
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package dev.bpmcrafters.processengineapi.adapter.c8.task.delivery

import dev.bpmcrafters.processengineapi.CommonRestrictions
import dev.bpmcrafters.processengineapi.adapter.commons.task.RefreshableDelivery
import dev.bpmcrafters.processengineapi.adapter.commons.task.SubscriptionRepository
import dev.bpmcrafters.processengineapi.adapter.commons.task.TaskSubscriptionHandle
Expand Down Expand Up @@ -58,14 +59,23 @@ class PullUserTaskDelivery(
}

private fun TaskSearch.forSubscriptions(subscriptions: List<TaskSubscriptionHandle>): TaskSearch {
// FIXME -> support tenant on subscription
subscriptions
.filter { it.taskType == TaskType.USER } // only user task subscriptions
.map { it.taskDescriptionKey to it.restrictions }

// FIXME -> consider complex tent filtering
return this
}

private fun TaskSubscriptionHandle.matches(task: Task): Boolean =
this.taskType == TaskType.USER
&& (this.taskDescriptionKey == null || this.taskDescriptionKey == task.taskDefinitionId)
&& this.restrictions.all {
when (it.key) {
CommonRestrictions.TENANT_ID -> it.value == task.tenantId
CommonRestrictions.PROCESS_INSTANCE_ID -> it.value == task.processInstanceKey
CommonRestrictions.PROCESS_DEFINITION_ID -> it.value == task.processDefinitionKey
else -> false
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -153,32 +153,41 @@ class SubscribingRefreshingUserTaskDelivery(
}

private fun ActivateJobsCommandStep3.forSubscription(subscription: TaskSubscriptionHandle): ActivateJobsCommandStep3 {
// FIXME -> tenantId
// FIXME -> more to setup from props
return if (subscription.payloadDescription != null && subscription.payloadDescription!!.isNotEmpty()) {
this.fetchVariables(subscription.payloadDescription!!.toList())
} else {
this
return this.apply {
val payloadDescription = subscription.payloadDescription
if (!payloadDescription.isNullOrEmpty()) {
this.fetchVariables(payloadDescription.toList())
}
if (subscription.restrictions.containsKey(CommonRestrictions.TENANT_ID)) {
this.tenantId(subscription.restrictions[CommonRestrictions.TENANT_ID])
}
}
}

private fun JobWorkerBuilderStep3.forSubscription(subscription: TaskSubscriptionHandle): JobWorkerBuilderStep3 {
// FIXME -> tenantId
// FIXME -> more to setup from props
return if (subscription.payloadDescription != null && subscription.payloadDescription!!.isNotEmpty()) {
this.fetchVariables(subscription.payloadDescription!!.toList())
} else {
this
return this.apply {
val payloadDescription = subscription.payloadDescription
if (!payloadDescription.isNullOrEmpty()) {
this.fetchVariables(payloadDescription.toList())
}
if (subscription.restrictions.containsKey(CommonRestrictions.TENANT_ID)) {
this.tenantId(subscription.restrictions[CommonRestrictions.TENANT_ID])
}
}
}

private fun StreamJobsCommandStep3.forSubscription(subscription: TaskSubscriptionHandle): StreamJobsCommandStep3 {
// FIXME -> tenantId
// FIXME -> more to setup from props
return if (subscription.payloadDescription != null && subscription.payloadDescription!!.isNotEmpty()) {
this.fetchVariables(subscription.payloadDescription!!.toList())
} else {
this
return this.apply {
val payloadDescription = subscription.payloadDescription
if (!payloadDescription.isNullOrEmpty()) {
this.fetchVariables(payloadDescription.toList())
}
if (subscription.restrictions.containsKey(CommonRestrictions.TENANT_ID)) {
this.tenantId(subscription.restrictions[CommonRestrictions.TENANT_ID])
}
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -89,13 +89,15 @@ class SubscribingServiceTaskDelivery(
}

private fun JobWorkerBuilderStep3.forSubscription(subscription: TaskSubscriptionHandle): JobWorkerBuilderStep3 {
// FIXME -> tenantId
// FIXME -> more to setup from props
return if (subscription.payloadDescription != null && subscription.payloadDescription!!.isNotEmpty()) {
this
.fetchVariables(subscription.payloadDescription!!.toList())
} else {
this
return this.apply {
val payloadDescription = subscription.payloadDescription
if (!payloadDescription.isNullOrEmpty()) {
this.fetchVariables(payloadDescription.toList())
}
if (subscription.restrictions.containsKey(CommonRestrictions.TENANT_ID)) {
this.tenantId(subscription.restrictions[CommonRestrictions.TENANT_ID])
}
}
}
}

0 comments on commit c079821

Please sign in to comment.