Skip to content

Commit

Permalink
Merge pull request #18 from bpm-crafters/feature/separate_correlation…
Browse files Browse the repository at this point in the history
…_api

Separate SignalAPI from CorrelationAPI
  • Loading branch information
zambrovski authored Mar 5, 2024
2 parents c57839e + 07e0c9d commit 5138fcc
Show file tree
Hide file tree
Showing 16 changed files with 251 additions and 98 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ import dev.bpmcrafters.processengineapi.RestrictionAware
import java.util.concurrent.Future

/**
* API to correlate messages or signals with running process instances.
* API to correlate messages with running process instances.
* @since 0.0.1
*/
interface CorrelationApi : MetaInfoAware, RestrictionAware {
Expand All @@ -16,11 +16,4 @@ interface CorrelationApi : MetaInfoAware, RestrictionAware {
* @return future to indicate completion.
*/
fun correlateMessage(cmd: CorrelateMessageCmd): Future<Empty>

/**
* Delivers a signal event to process engine.
* @param cmd command to deliver.
* @return future to indicate completion.
*/
fun sendSignal(cmd: SendSignalCmd): Future<Empty>
}
Original file line number Diff line number Diff line change
Expand Up @@ -9,5 +9,5 @@ import dev.bpmcrafters.processengineapi.PayloadSupplier
data class SendSignalCmd(
val signalName: String,
val payloadSupplier: PayloadSupplier,
val correlation: () -> Correlation
val correlation: CorrelationSupplier
) : PayloadSupplier by payloadSupplier
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
package dev.bpmcrafters.processengineapi.correlation

import dev.bpmcrafters.processengineapi.Empty
import dev.bpmcrafters.processengineapi.MetaInfoAware
import dev.bpmcrafters.processengineapi.RestrictionAware
import java.util.concurrent.Future

/**
* API to send signals to running process instances.
* @since 0.0.1
*/
interface SignalApi : MetaInfoAware, RestrictionAware {

/**
* Delivers a signal event to process engine.
* @param cmd command to deliver.
* @return future to indicate completion.
*/
fun sendSignal(cmd: SendSignalCmd): Future<Empty>
}
Original file line number Diff line number Diff line change
Expand Up @@ -26,34 +26,13 @@ class CorrelationApiImpl(
}
}

override fun sendSignal(cmd: SendSignalCmd): Future<Empty> {
return CompletableFuture.supplyAsync {
runtimeService
.createSignalEvent(cmd.signalName)
.buildCorrelation(cmd.correlation)
.setVariables(cmd.payloadSupplier.get())
.send()
Empty
}
}

override fun getSupportedRestrictions(): Set<String> = setOf(
CommonRestrictions.PROCESS_INSTANCE_ID,
CommonRestrictions.TENANT_ID,
CommonRestrictions.BUSINESS_KEY,
"processDefinitionId"
)

private fun SignalEventReceivedBuilder.buildCorrelation(correlation: () -> Correlation) = this.apply {
val restrictions = correlation.invoke().restrictions
ensureSupported(restrictions)
restrictions
.forEach { (key, value) ->
when (key) {
CommonRestrictions.TENANT_ID -> this.tenantId(value)
CommonRestrictions.EXECUTION_ID -> this.executionId(value)
}
}
}

private fun MessageCorrelationBuilder.buildCorrelation(correlation: CorrelationSupplier): MessageCorrelationBuilder = this.apply {
val restrictions = correlation.get().restrictions
ensureSupported(restrictions)
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
package dev.bpmcrafters.processengineapi.adapter.c7.correlation

import dev.bpmcrafters.processengineapi.CommonRestrictions
import dev.bpmcrafters.processengineapi.Empty
import dev.bpmcrafters.processengineapi.MetaInfo
import dev.bpmcrafters.processengineapi.MetaInfoAware
import dev.bpmcrafters.processengineapi.correlation.*
import org.camunda.bpm.engine.RuntimeService
import org.camunda.bpm.engine.runtime.MessageCorrelationBuilder
import org.camunda.bpm.engine.runtime.SignalEventReceivedBuilder
import java.util.concurrent.CompletableFuture
import java.util.concurrent.Future

class SignalApiImpl(
private val runtimeService: RuntimeService
) : SignalApi {

override fun sendSignal(cmd: SendSignalCmd): Future<Empty> {
return CompletableFuture.supplyAsync {
val correlation = cmd.correlation
runtimeService
.createSignalEvent(cmd.signalName)
.buildCorrelation(correlation)
.setVariables(cmd.payloadSupplier.get())
.send()
Empty
}
}

override fun getSupportedRestrictions(): Set<String> = setOf(
CommonRestrictions.PROCESS_INSTANCE_ID,
CommonRestrictions.TENANT_ID,
)

private fun SignalEventReceivedBuilder.buildCorrelation(correlation: CorrelationSupplier) = this.apply {
val restrictions = correlation.get().restrictions
ensureSupported(restrictions)
restrictions
.forEach { (key, value) ->
when (key) {
CommonRestrictions.TENANT_ID -> this.tenantId(value)
CommonRestrictions.EXECUTION_ID -> this.executionId(value)
}
}
}

override fun meta(instance: MetaInfoAware): MetaInfo {
TODO("Not yet implemented")
}

}
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package dev.bpmcrafters.processengineapi.adapter.c7.springboot

import dev.bpmcrafters.processengineapi.adapter.c7.correlation.CorrelationApiImpl
import dev.bpmcrafters.processengineapi.adapter.c7.correlation.SignalApiImpl
import dev.bpmcrafters.processengineapi.adapter.c7.process.StartProcessApiImpl
import dev.bpmcrafters.processengineapi.adapter.c7.task.C7TaskApiImpl
import dev.bpmcrafters.processengineapi.adapter.c7.task.completion.ExternalTaskCompletionStrategy
Expand All @@ -9,6 +10,7 @@ import dev.bpmcrafters.processengineapi.adapter.commons.task.CompletionStrategy
import dev.bpmcrafters.processengineapi.adapter.commons.task.InMemSubscriptionRepository
import dev.bpmcrafters.processengineapi.adapter.commons.task.SubscriptionRepository
import dev.bpmcrafters.processengineapi.correlation.CorrelationApi
import dev.bpmcrafters.processengineapi.correlation.SignalApi
import dev.bpmcrafters.processengineapi.process.StartProcessApi
import dev.bpmcrafters.processengineapi.task.TaskApi
import org.camunda.bpm.engine.ExternalTaskService
Expand Down Expand Up @@ -40,6 +42,11 @@ class AdapterAutoConfiguration {
runtimeService = runtimeService
)

@Bean
fun signalApi(runtimeService: RuntimeService): SignalApi = SignalApiImpl(
runtimeService = runtimeService
)

@Bean
fun subscriptionRepository(): SubscriptionRepository = InMemSubscriptionRepository()

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,18 +33,6 @@ class CorrelationApiImpl(
}
}

override fun sendSignal(cmd: SendSignalCmd): Future<Empty> {
return CompletableFuture.supplyAsync {
zeebeClient
.newBroadcastSignalCommand()
.signalName(cmd.signalName)
.variables(cmd.payloadSupplier.get())
.send()
.get() // FIXME Chain
Empty
}
}

override fun meta(instance: MetaInfoAware): MetaInfo {
TODO("Not yet implemented")
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
package dev.bpmcrafters.processengineapi.adapter.c8.correlation

import dev.bpmcrafters.processengineapi.CommonRestrictions
import dev.bpmcrafters.processengineapi.Empty
import dev.bpmcrafters.processengineapi.MetaInfo
import dev.bpmcrafters.processengineapi.MetaInfoAware
import dev.bpmcrafters.processengineapi.correlation.CorrelateMessageCmd
import dev.bpmcrafters.processengineapi.correlation.CorrelationApi
import dev.bpmcrafters.processengineapi.correlation.SendSignalCmd
import dev.bpmcrafters.processengineapi.correlation.SignalApi
import io.camunda.zeebe.client.ZeebeClient
import io.camunda.zeebe.client.api.command.BroadcastSignalCommandStep1
import io.camunda.zeebe.client.api.command.PublishMessageCommandStep1.PublishMessageCommandStep2
import io.camunda.zeebe.client.api.command.PublishMessageCommandStep1.PublishMessageCommandStep3
import java.util.concurrent.CompletableFuture
import java.util.concurrent.Future

class SignalApiImpl(
private val zeebeClient: ZeebeClient
) : SignalApi {

override fun sendSignal(cmd: SendSignalCmd): Future<Empty> {
return CompletableFuture.supplyAsync {
val restrictions = cmd.correlation.get().restrictions
zeebeClient
.newBroadcastSignalCommand()
.signalName(cmd.signalName)
.buildCorrelation(restrictions)
.variables(cmd.payloadSupplier.get())
.send()
.get() // FIXME Chain
Empty
}
}

override fun getSupportedRestrictions(): Set<String> = setOf(
CommonRestrictions.TENANT_ID,
)

fun BroadcastSignalCommandStep1.BroadcastSignalCommandStep2.buildCorrelation(restrictions: Map<String, String>) = this.apply {
if (restrictions.containsKey(CommonRestrictions.TENANT_ID)) {
this.tenantId(restrictions[CommonRestrictions.TENANT_ID])
}
}

override fun meta(instance: MetaInfoAware): MetaInfo {
TODO("Not yet implemented")
}
}
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package dev.bpmcrafters.processengineapi.adapter.c8.springboot

import dev.bpmcrafters.processengineapi.adapter.c8.correlation.CorrelationApiImpl
import dev.bpmcrafters.processengineapi.adapter.c8.correlation.SignalApiImpl
import dev.bpmcrafters.processengineapi.adapter.c8.process.StartProcessApiImpl
import dev.bpmcrafters.processengineapi.adapter.c8.task.C8TaskApiImpl
import dev.bpmcrafters.processengineapi.adapter.c8.task.completion.ServiceTaskCompletionStrategy
Expand All @@ -9,6 +10,7 @@ import dev.bpmcrafters.processengineapi.adapter.commons.task.CompletionStrategy
import dev.bpmcrafters.processengineapi.adapter.commons.task.InMemSubscriptionRepository
import dev.bpmcrafters.processengineapi.adapter.commons.task.SubscriptionRepository
import dev.bpmcrafters.processengineapi.correlation.CorrelationApi
import dev.bpmcrafters.processengineapi.correlation.SignalApi
import dev.bpmcrafters.processengineapi.process.StartProcessApi
import dev.bpmcrafters.processengineapi.task.TaskApi
import io.camunda.tasklist.CamundaTaskListClient
Expand All @@ -31,8 +33,9 @@ class AdapterAutoConfiguration {


@Bean
fun camundaTaskListClientSaaS(zeebeClientCloudConfigurationProperties: ZeebeClientConfigurationProperties,
c8AdapterProperties: C8AdapterProperties
fun camundaTaskListClientSaaS(
zeebeClientCloudConfigurationProperties: ZeebeClientConfigurationProperties,
c8AdapterProperties: C8AdapterProperties
): CamundaTaskListClient {
/*
val jwtConfig = JwtConfig()
Expand Down Expand Up @@ -74,6 +77,11 @@ class AdapterAutoConfiguration {
zeebeClient = zeebeClient
)

@Bean
fun signalApi(zeebeClient: ZeebeClient): SignalApi = SignalApiImpl(
zeebeClient = zeebeClient
)

@Bean
fun subscriptionRepository(): SubscriptionRepository = InMemSubscriptionRepository()

Expand Down
52 changes: 52 additions & 0 deletions examples/java-c7/simple-process-demo-failed-user.http
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
### Start process
< {%
client.global.clearAll()
%}
POST http://localhost:8080/simple-service-tasks/start-process?value=string&intValue=1

> {%
client.test("Request executed successfully", function () {
client.assert(response.status === 201, "Response status is not 201");
});

client.global.set("instanceId", response.headers.valueOf("Location"));
%}

### Get user tasks

GET http://localhost:8080/simple-service-tasks/tasks
Accept: application/json

> {%
client.test("Request executed successfully", function () {
client.assert(response.status === 200, "Response status is not 201");
});
client.test("Content-Type is application/json", () => {
const contentType = response.headers.valueOf("content-type");
client.assert(contentType == "application/json",
`Expected Content-Type is application/json, but actual is ${contentType}`)
})

const tasks = response.body;
const taskId = jsonPath(tasks, "$[0].taskId");
console.log("Created user task: ", taskId);
client.global.set("taskId", taskId);
%}

### Complete user task
POST http://localhost:8080/simple-service-tasks/tasks/{{ taskId }}/error?value=value-of-user-task-error

> {%
client.test("Request executed successfully", function () {
client.assert(response.status === 204, "Response status is not 204");
});
%}

### Correlate signal
POST http://localhost:8080/simple-service-tasks/signal?value=value-delivered-by-signal

> {%
client.test("Request executed successfully", function () {
client.assert(response.status === 204, "Response status is not 204");
});
%}
Original file line number Diff line number Diff line change
Expand Up @@ -44,26 +44,4 @@ public Future<Void> correlateMessage(String correlationValue, String variableVal
});
return completableFuture;
}

@Override
public Future<Void> deliverSignal(String variableValue) {
CompletableFuture<Void> completableFuture = new CompletableFuture<>();
Executors.newCachedThreadPool().submit(() -> {
try {
correlationApi.sendSignal(
new SendSignalCmd(
"signal1",
() -> Map.of(
"signal-delivered-value", variableValue
),
Correlation.Companion::getEMPTY
)
).get();
completableFuture.complete(null); // FIXME -> chain instead of sync get
} catch (Exception e) {
completableFuture.completeExceptionally(e);
}
});
return completableFuture;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -44,26 +44,4 @@ public Future<Void> correlateMessage(String correlationValue, String variableVal
});
return completableFuture;
}

@Override
public Future<Void> deliverSignal(String variableValue) {
CompletableFuture<Void> completableFuture = new CompletableFuture<>();
Executors.newCachedThreadPool().submit(() -> {
try {
correlationApi.sendSignal(
new SendSignalCmd(
"signal1",
() -> Map.of(
"signal-delivered-value", variableValue
),
Correlation.Companion::getEMPTY
)
).get();
completableFuture.complete(null); // FIXME -> chain instead of sync get
} catch (Exception e) {
completableFuture.completeExceptionally(e);
}
});
return completableFuture;
}
}
Loading

0 comments on commit 5138fcc

Please sign in to comment.