Skip to content

Commit

Permalink
add new implementation, fix #9
Browse files Browse the repository at this point in the history
  • Loading branch information
zambrovski committed Feb 27, 2024
1 parent d664018 commit 880fa4f
Show file tree
Hide file tree
Showing 38 changed files with 946 additions and 140 deletions.
5 changes: 0 additions & 5 deletions engine-adapter/camunda-platform-7-core/README.md

This file was deleted.

4 changes: 4 additions & 0 deletions engine-adapter/camunda-platform-7-embedded-core/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
# Camunda Platform 7 Test Adapter

Current purpose of this adapter is to create a simple (not production-ready) implementation of the process engine API.

Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@
<relativePath>../../pom.xml</relativePath>
</parent>

<artifactId>process-engine-api-adapter-camunda-platform-c7-core</artifactId>
<artifactId>process-engine-api-adapter-camunda-platform-c7-embedded-core</artifactId>

<dependencyManagement>
<dependencies>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,16 +20,16 @@ class InMemSubscriptionRepository : SubscriptionRepository {
subscriptions.remove(subscription)
}

override fun getActiveSubscriptionForTask(taskId: String): TaskSubscriptionHandle {
return activeSubscribedHandler[taskId] ?: throw IllegalArgumentException("No active subscription for for task $taskId found.")
override fun getActiveSubscriptionForTask(taskId: String): TaskSubscriptionHandle? {
return activeSubscribedHandler[taskId]
}

override fun activateSubscriptionForTask(taskId: String, subscription: TaskSubscriptionHandle) {
this.activeSubscribedHandler[taskId] = subscription
}

override fun removeSubscriptionForTask(taskId: String): TaskSubscriptionHandle {
return this.activeSubscribedHandler.remove(taskId) ?: throw IllegalArgumentException("No active subscription for for task $taskId found.")
override fun removeSubscriptionForTask(taskId: String): TaskSubscriptionHandle? {
return this.activeSubscribedHandler.remove(taskId)
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,6 @@ interface SubscriptionRepository {
fun deleteTaskSubscription(subscription: TaskSubscriptionHandle)

fun activateSubscriptionForTask(taskId: String, subscription: TaskSubscriptionHandle)
fun getActiveSubscriptionForTask(taskId: String): TaskSubscriptionHandle
fun removeSubscriptionForTask(taskId: String): TaskSubscriptionHandle
fun getActiveSubscriptionForTask(taskId: String): TaskSubscriptionHandle?
fun removeSubscriptionForTask(taskId: String): TaskSubscriptionHandle?
}
Original file line number Diff line number Diff line change
Expand Up @@ -38,13 +38,15 @@ class TaskApiImpl(
// get active subscription
val activeSubscription = subscriptionRepository.getActiveSubscriptionForTask(cmd.taskId)
// find the correct strategy
requireNotNull(activeSubscription) { "Could not complete task ${cmd.taskId}, task not found." }
val strategy = completionStrategies.find { it.supports(restrictions = activeSubscription.restrictions, taskDescriptionKey = activeSubscription.taskDescriptionKey) }
return strategy?.completeTask(cmd) ?: throw IllegalArgumentException("No completion strategy found for task ${cmd.taskId}")
}

override fun completeTaskByError(cmd: CompleteTaskByErrorCmd): Future<Empty> {
// get active subscription
val activeSubscription = subscriptionRepository.getActiveSubscriptionForTask(cmd.taskId)
requireNotNull(activeSubscription) { "Could not complete task ${cmd.taskId} with error, task not found." }
// find the correct strategy
val strategy = completionStrategies.find { it.supports(restrictions = activeSubscription.restrictions, taskDescriptionKey = activeSubscription.taskDescriptionKey) }
return strategy?.completeTaskByError(cmd) ?: throw IllegalArgumentException("No completion strategy found for task ${cmd.taskId}")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,15 +8,14 @@ import dev.bpmcrafters.processengineapi.task.CompleteTaskByErrorCmd
import dev.bpmcrafters.processengineapi.task.CompleteTaskCmd
import mu.KLogging
import org.camunda.bpm.engine.ExternalTaskService
import java.util.*
import java.util.concurrent.CompletableFuture
import java.util.concurrent.Future

/**
* Strategy for completing external tasks using Camunda externalTaskService Java API.
*/
class ExternalTaskCompletionStrategy(
private val workerId: String = UUID.randomUUID().toString(),
private val workerId: String,
private val externalTaskService: ExternalTaskService,
private val subscriptionRepository: SubscriptionRepository
) : CompletionStrategy {
Expand All @@ -43,8 +42,10 @@ class ExternalTaskCompletionStrategy(
workerId,
cmd.get()
)
subscriptionRepository.removeSubscriptionForTask(cmd.taskId).modification.terminated(cmd.taskId)
logger.info { "Successfully completed external task ${cmd.taskId}." }
subscriptionRepository.removeSubscriptionForTask(cmd.taskId)?.apply {
modification.terminated(cmd.taskId)
logger.info { "Successfully completed external task ${cmd.taskId}." }
}
return CompletableFuture.completedFuture(Empty)
}

Expand All @@ -54,8 +55,10 @@ class ExternalTaskCompletionStrategy(
workerId,
cmd.error
)
subscriptionRepository.removeSubscriptionForTask(cmd.taskId).modification.terminated(cmd.taskId)
logger.info { "Completed external task ${cmd.taskId} with error." }
subscriptionRepository.removeSubscriptionForTask(cmd.taskId)?.apply {
modification.terminated(cmd.taskId)
logger.info { "Completed external task ${cmd.taskId} with error." }
}
return CompletableFuture.completedFuture(Empty)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,9 @@ class UserTaskCompletionStrategy(
cmd.taskId,
cmd.get()
)
subscriptionRepository.removeSubscriptionForTask(cmd.taskId).modification.terminated(cmd.taskId)
subscriptionRepository.removeSubscriptionForTask(cmd.taskId)?.apply {
modification.terminated(cmd.taskId)
}
return CompletableFuture.completedFuture(Empty)
}

Expand All @@ -48,7 +50,9 @@ class UserTaskCompletionStrategy(
cmd.taskId,
cmd.error
)
subscriptionRepository.removeSubscriptionForTask(cmd.taskId).modification.terminated(cmd.taskId)
subscriptionRepository.removeSubscriptionForTask(cmd.taskId)?.apply {
modification.terminated(cmd.taskId)
}
return CompletableFuture.completedFuture(Empty)
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
package dev.bpmcrafters.processengineapi.adapter.c7.task.delivery

import dev.bpmcrafters.processengineapi.adapter.c7.task.SubscriptionRepository
import dev.bpmcrafters.processengineapi.adapter.c7.task.TaskSubscriptionHandle
import dev.bpmcrafters.processengineapi.adapter.c7.task.completion.UserTaskCompletionStrategy
import org.camunda.bpm.engine.delegate.DelegateTask

class EmbeddedEventBasedUserTaskDelivery(
private val subscriptionRepository: SubscriptionRepository
) : UserTaskDelivery {

fun userTaskCreated(delegateTask: DelegateTask) {
val subscriptions = subscriptionRepository.getTaskSubscriptions()

subscriptions
.firstOrNull { subscription -> subscription.matches(delegateTask) }
?.let { activeSubscription ->

subscriptionRepository.activateSubscriptionForTask(delegateTask.id, activeSubscription)

val variables = if (activeSubscription.payloadDescription.isEmpty()) {
delegateTask.variables
} else {
delegateTask.variables.filterKeys { key -> activeSubscription.payloadDescription.contains(key) }
}

activeSubscription.action.accept(delegateTask.toTaskInformation(), variables)
}
}

fun userTaskModified(delegateTask: DelegateTask) {
subscriptionRepository.getActiveSubscriptionForTask(delegateTask.id)?.apply {

val variables = if (this.payloadDescription.isEmpty()) {
delegateTask.variables
} else {
delegateTask.variables.filterKeys { key -> this.payloadDescription.contains(key) }
}

modification.modified(delegateTask.toTaskInformation(), variables)
}
}

fun userTaskDeleted(delegateTask: DelegateTask) {
subscriptionRepository.getActiveSubscriptionForTask(delegateTask.id)?.modification?.terminated(delegateTask.id)
}


private fun TaskSubscriptionHandle.matches(task: DelegateTask): Boolean =
UserTaskCompletionStrategy.supports(this.restrictions)
&& (this.taskDescriptionKey == null || this.taskDescriptionKey == task.taskDefinitionKey || this.taskDescriptionKey == task.id)

}
Original file line number Diff line number Diff line change
Expand Up @@ -6,16 +6,20 @@ import dev.bpmcrafters.processengineapi.adapter.c7.task.TaskSubscriptionHandle
import dev.bpmcrafters.processengineapi.adapter.c7.task.completion.ExternalTaskCompletionStrategy
import dev.bpmcrafters.processengineapi.task.TaskInformation
import org.camunda.bpm.engine.ExternalTaskService
import org.camunda.bpm.engine.externaltask.ExternalTaskQueryBuilder
import org.camunda.bpm.engine.externaltask.LockedExternalTask

/**
* Delivers external tasks to subscriptions.
* This implementation uses internal Java API and pulls tasks for delivery.
*/
class ExternalTaskDelivery(
class EmbeddedPullExternalTaskDelivery(
private val externalTaskService: ExternalTaskService,
private val workerId: String,
private val subscriptionRepository: SubscriptionRepository
) {
private val subscriptionRepository: SubscriptionRepository,
private val maxTasks: Int,
private val lockDuration: Long
) : ExternalServiceTaskDelivery {

/**
* Delivers all tasks found in the external service to corresponding subscriptions.
Expand All @@ -24,15 +28,8 @@ class ExternalTaskDelivery(

val subscriptions = subscriptionRepository.getTaskSubscriptions()
externalTaskService
.fetchAndLock(10, workerId)
.apply {
subscriptions
.mapNotNull { it.taskDescriptionKey }
.distinct()
.forEach { topic ->
this.topic(topic, 10) // FIXME -> magic number
}
}
.fetchAndLock(maxTasks, workerId)
.forSubscriptions(subscriptions)
.execute()
.forEach { lockedTask ->
subscriptions
Expand All @@ -52,25 +49,18 @@ class ExternalTaskDelivery(
}
}

fun deliverOne() {
// TODO: @jangalinski could you provide your implementation on delivery of a single task, please?
private fun ExternalTaskQueryBuilder.forSubscriptions(subscriptions: List<TaskSubscriptionHandle>): ExternalTaskQueryBuilder {
subscriptions
.mapNotNull { it.taskDescriptionKey }
.distinct()
.forEach { topic ->
this.topic(topic, lockDuration)
}
return this
}

private fun TaskSubscriptionHandle.matches(task: LockedExternalTask): Boolean {
return ExternalTaskCompletionStrategy.supports(this.restrictions) &&
(this.taskDescriptionKey == null || this.taskDescriptionKey == task.topicName)
}

private fun LockedExternalTask.toTaskInformation(): TaskInformation {
return TaskInformation(
this.id,
mapOf(
CommonRestrictions.PROCESS_DEFINITION_KEY to this.processDefinitionKey,
CommonRestrictions.PROCESS_INSTANCE_ID to this.processInstanceId,
CommonRestrictions.TENANT_ID to this.tenantId,
CommonRestrictions.TASK_TYPE to "service",
CommonRestrictions.ACTIVITY_ID to this.activityId
)
)
}
}
Original file line number Diff line number Diff line change
@@ -1,27 +1,29 @@
package dev.bpmcrafters.processengineapi.adapter.c7.task.delivery

import dev.bpmcrafters.processengineapi.CommonRestrictions
import dev.bpmcrafters.processengineapi.adapter.c7.task.SubscriptionRepository
import dev.bpmcrafters.processengineapi.adapter.c7.task.TaskSubscriptionHandle
import dev.bpmcrafters.processengineapi.adapter.c7.task.completion.UserTaskCompletionStrategy
import dev.bpmcrafters.processengineapi.task.TaskInformation
import org.camunda.bpm.engine.TaskService
import org.camunda.bpm.engine.task.Task

import org.camunda.bpm.engine.task.TaskQuery
/**
* Delivers user tasks to subscriptions.
* Uses internal Java API for pulling tasks.
*/
class UserTaskDelivery(
class EmbeddedPullUserTaskDelivery(
private val taskService: TaskService,
private val subscriptionRepository: SubscriptionRepository
) {
) : UserTaskDelivery {

/**
* Delivers all tasks found in user task service to corresponding subscriptions.
*/
fun deliverAll() {
val subscriptions = subscriptionRepository.getTaskSubscriptions()
taskService.createTaskQuery().active().list() // FIXME: narrow down, for the moment take all tasks
taskService
.createTaskQuery()
.forSubscriptions(subscriptions)
.list()
.forEach { task ->
subscriptions
.firstOrNull { subscription -> subscription.matches(task) }
Expand All @@ -35,27 +37,22 @@ class UserTaskDelivery(
taskService.getVariables(task.id, activeSubscription.payloadDescription)
}

activeSubscription.action.accept(task.toInformation(), variables)
activeSubscription.action.accept(task.toTaskInformation(), variables)
}
}
}

private fun TaskQuery.forSubscriptions(subscriptions: List<TaskSubscriptionHandle>): TaskQuery {
// FIXME: narrow down, for the moment take all tasks
return this
.active()
}


private fun TaskSubscriptionHandle.matches(task: Task): Boolean =
UserTaskCompletionStrategy.supports(this.restrictions) && (
this.taskDescriptionKey == null || this.taskDescriptionKey == task.taskDefinitionKey || this.taskDescriptionKey == task.id
)

private fun Task.toInformation() =
TaskInformation(
taskId = this.id,
meta = mapOf(
CommonRestrictions.TASK_TYPE to "user",
CommonRestrictions.TASK_DEFINITION_KEY to this.taskDefinitionKey,
CommonRestrictions.TENANT_ID to this.tenantId,
CommonRestrictions.PROCESS_INSTANCE_ID to this.processInstanceId,
"taskName" to this.name,
"taskDescription" to this.description,
"assignee" to this.assignee
)
)
}

Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
package dev.bpmcrafters.processengineapi.adapter.c7.task.delivery

/**
* Common interface for external service task delivery.
*/
interface ExternalServiceTaskDelivery
Loading

0 comments on commit 880fa4f

Please sign in to comment.