-
Notifications
You must be signed in to change notification settings - Fork 40
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
* Initial version * added dependabot and build job * make async duration configurable
- Loading branch information
1 parent
3e7124d
commit 4f66bbf
Showing
13 changed files
with
543 additions
and
0 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,41 @@ | ||
# Async Service Task Example | ||
|
||
## What is the use case? | ||
|
||
The implementation of a service task might be asynchronous. Usually, you would handle this using the Messaging Pattern (Send - Receive). | ||
|
||
But there might be cases when the requirement to the process is to hide this kind of implementation detail and instead represent the service call as one task. | ||
|
||
In this case, the implementation needs to be able to cover this kind of call by sending a request in an idempotent way and then waiting for the answer. | ||
|
||
## How does it work? | ||
|
||
This implementation will check for a transactionId, create an transactionId on service task scope if missing and uses it to send a request to a service and then check for the answer. | ||
|
||
Then, it checks for the answer being present. | ||
|
||
If not, it defers the polling by failing the job while leaving the amount of retries untouched. | ||
|
||
## What are the constraints of the implementation? | ||
|
||
The constraints of the implementation lie in the `AsyncService`. Here, the assumption is that creation of a transaction requires an ID. | ||
|
||
As soon as complete, the result can be fetched over and over until it is completed (which will remove the transaction from the `AsyncService`). | ||
|
||
## How can I try it out? | ||
|
||
Configure the zeebe connection in the `application.yaml`. By default, it points to a plaintext local zeebe gateway. | ||
|
||
Then, start the app by running | ||
|
||
```shell | ||
mvn spring-boot:run | ||
``` | ||
|
||
On starting up, the example process is deployed. | ||
|
||
After the app is up and running, a process instance can be started with a `POST` to `http://localhost:8080/start`. | ||
|
||
The process instance executes one service task that has an asynchronous implementation. In the logs, you should be able to see that the `AsyncService` is triggered from time to time, only creating a transaction on the first invocation. | ||
|
||
Then, the result is fetched. After the result has been fetched, the next poll will return it and the job is completed. |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,155 @@ | ||
<?xml version="1.0" encoding="UTF-8"?> | ||
<project xmlns="http://maven.apache.org/POM/4.0.0" | ||
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" | ||
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> | ||
<modelVersion>4.0.0</modelVersion> | ||
|
||
<groupId>com.camunda.consulting</groupId> | ||
<artifactId>async-service-task</artifactId> | ||
<version>1.0-SNAPSHOT</version> | ||
|
||
<properties> | ||
<java.version>17</java.version> | ||
<spring-boot.version>3.1.3</spring-boot.version> | ||
<camunda-process-test-coverage.version>2.1.0</camunda-process-test-coverage.version> | ||
<spring-zeebe.version>8.2.4</spring-zeebe.version> | ||
<maven.compiler.source>${java.version}</maven.compiler.source> | ||
<maven.compiler.target>${java.version}</maven.compiler.target> | ||
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding> | ||
</properties> | ||
|
||
<dependencyManagement> | ||
<dependencies> | ||
<dependency> | ||
<groupId>org.springframework.boot</groupId> | ||
<artifactId>spring-boot-dependencies</artifactId> | ||
<version>${spring-boot.version}</version> | ||
<type>pom</type> | ||
<scope>import</scope> | ||
</dependency> | ||
</dependencies> | ||
</dependencyManagement> | ||
|
||
<dependencies> | ||
<dependency> | ||
<groupId>io.camunda.spring</groupId> | ||
<artifactId>spring-boot-starter-camunda</artifactId> | ||
<version>${spring-zeebe.version}</version> | ||
</dependency> | ||
<dependency> | ||
<groupId>org.springframework.boot</groupId> | ||
<artifactId>spring-boot-starter-web</artifactId> | ||
</dependency> | ||
<dependency> | ||
<groupId>io.camunda.spring</groupId> | ||
<artifactId>spring-boot-starter-camunda-test</artifactId> | ||
<version>${spring-zeebe.version}</version> | ||
<scope>test</scope> | ||
</dependency> | ||
<dependency> | ||
<groupId>org.springframework.boot</groupId> | ||
<artifactId>spring-boot-configuration-processor</artifactId> | ||
<optional>true</optional> | ||
</dependency> | ||
</dependencies> | ||
|
||
<build> | ||
<pluginManagement> | ||
<plugins> | ||
<plugin> | ||
<groupId>com.diffplug.spotless</groupId> | ||
<artifactId>spotless-maven-plugin</artifactId> | ||
<version>2.39.0</version> | ||
<configuration> | ||
<formats> | ||
<format> | ||
<includes> | ||
<include>*.md</include> | ||
<include>.gitignore</include> | ||
</includes> | ||
<trimTrailingWhitespace/> | ||
<endWithNewline/> | ||
<indent> | ||
<spaces>true</spaces> | ||
<spacesPerTab>2</spacesPerTab> | ||
</indent> | ||
</format> | ||
</formats> | ||
<java> | ||
<googleJavaFormat/> | ||
</java> | ||
<pom/> | ||
</configuration> | ||
</plugin> | ||
</plugins> | ||
</pluginManagement> | ||
<plugins> | ||
<plugin> | ||
<groupId>org.springframework.boot</groupId> | ||
<artifactId>spring-boot-maven-plugin</artifactId> | ||
<version>${spring-boot.version}</version> | ||
<executions> | ||
<execution> | ||
<goals> | ||
<goal>repackage</goal> | ||
</goals> | ||
</execution> | ||
</executions> | ||
</plugin> | ||
<plugin> | ||
<groupId>org.apache.maven.plugins</groupId> | ||
<artifactId>maven-surefire-plugin</artifactId> | ||
<version>3.1.2</version> | ||
</plugin> | ||
</plugins> | ||
</build> | ||
<profiles> | ||
<!-- profile to auto format --> | ||
<profile> | ||
<id>autoFormat</id> | ||
<activation> | ||
<activeByDefault>true</activeByDefault> | ||
</activation> | ||
<build> | ||
<plugins> | ||
<plugin> | ||
<groupId>com.diffplug.spotless</groupId> | ||
<artifactId>spotless-maven-plugin</artifactId> | ||
<executions> | ||
<execution> | ||
<id>spotless-format</id> | ||
<goals> | ||
<goal>apply</goal> | ||
</goals> | ||
<phase>process-sources</phase> | ||
</execution> | ||
</executions> | ||
</plugin> | ||
</plugins> | ||
</build> | ||
</profile> | ||
|
||
<!-- profile to perform strict validation checks --> | ||
<profile> | ||
<id>checkFormat</id> | ||
<build> | ||
<plugins> | ||
<plugin> | ||
<groupId>com.diffplug.spotless</groupId> | ||
<artifactId>spotless-maven-plugin</artifactId> | ||
<executions> | ||
<execution> | ||
<id>spotless-check</id> | ||
<goals> | ||
<goal>check</goal> | ||
</goals> | ||
<phase>validate</phase> | ||
</execution> | ||
</executions> | ||
</plugin> | ||
</plugins> | ||
</build> | ||
</profile> | ||
</profiles> | ||
|
||
</project> |
84 changes: 84 additions & 0 deletions
84
async-service-task/src/main/java/com/camunda/consulting/AsyncJobWorker.java
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,84 @@ | ||
package com.camunda.consulting; | ||
|
||
import com.camunda.consulting.AsyncService.TransactionResult; | ||
import io.camunda.zeebe.client.ZeebeClient; | ||
import io.camunda.zeebe.client.api.response.ActivatedJob; | ||
import io.camunda.zeebe.spring.client.annotation.JobWorker; | ||
import java.time.Duration; | ||
import java.util.Collections; | ||
import org.springframework.beans.factory.annotation.Autowired; | ||
import org.springframework.stereotype.Component; | ||
|
||
@Component | ||
public class AsyncJobWorker { | ||
private static final String TRANSACTION_ID_VAR_NAME = "transactionId"; | ||
|
||
private final ZeebeClient zeebeClient; | ||
private final AsyncService asyncService; | ||
|
||
@Autowired | ||
public AsyncJobWorker(ZeebeClient zeebeClient, AsyncService asyncService) { | ||
this.zeebeClient = zeebeClient; | ||
this.asyncService = asyncService; | ||
} | ||
|
||
@JobWorker(autoComplete = false, type = "async-job") | ||
public void handle(ActivatedJob job) { | ||
String transactionId = getOrCreateTransactionId(job); | ||
asyncService.startTransaction(transactionId); | ||
TransactionResult result = asyncService.getTransactionResult(transactionId); | ||
if (result.complete()) { | ||
completeJob(job, result.result()); | ||
// this comes afterward as it could be considered optional | ||
asyncService.completeTransaction(transactionId); | ||
} else { | ||
delayJob(job, Duration.ofSeconds(30)); | ||
} | ||
} | ||
|
||
/** | ||
* Completes the job with the given result | ||
* | ||
* @param job the job to complete | ||
* @param result the result to submit | ||
*/ | ||
private void completeJob(ActivatedJob job, String result) { | ||
zeebeClient | ||
.newCompleteCommand(job) | ||
.variables(Collections.singletonMap("result", result)) | ||
.send() | ||
.join(); | ||
} | ||
|
||
/** | ||
* Delays the job with the given backoff | ||
* | ||
* @param job the job to delay | ||
* @param backoff the backoff after which the job will be available again | ||
*/ | ||
private void delayJob(ActivatedJob job, Duration backoff) { | ||
zeebeClient.newFailCommand(job).retries(job.getRetries()).retryBackoff(backoff).send().join(); | ||
} | ||
|
||
/** | ||
* Checks if a transaction id is already present and creates one if not | ||
* | ||
* @param job the job that requires the transaction id | ||
* @return the transaction id from the job or a created one that is saved to the job now | ||
*/ | ||
private String getOrCreateTransactionId(ActivatedJob job) { | ||
|
||
if (!job.getVariablesAsMap().containsKey(TRANSACTION_ID_VAR_NAME)) { | ||
// there is no transaction id present yet, better set one (and send it to zeebe) | ||
String transactionId = asyncService.createTransactionId(); | ||
zeebeClient | ||
.newSetVariablesCommand(job.getElementInstanceKey()) | ||
.variables(Collections.singletonMap(TRANSACTION_ID_VAR_NAME, transactionId)) | ||
.local(true) | ||
.send() | ||
.join(); | ||
return transactionId; | ||
} | ||
return (String) job.getVariablesAsMap().get(TRANSACTION_ID_VAR_NAME); | ||
} | ||
} |
Oops, something went wrong.