From 4f66bbfd35d7f304e7cf4f66efaf3fdb0aa04e87 Mon Sep 17 00:00:00 2001 From: Jonathan Lukas Date: Fri, 15 Sep 2023 10:56:48 +0200 Subject: [PATCH] Example: Async service task (#183) * Initial version * added dependabot and build job * make async duration configurable --- .github/dependabot.yml | 5 + .github/workflows/build.yaml | 12 ++ async-service-task/README.md | 41 +++++ async-service-task/pom.xml | 155 ++++++++++++++++++ .../camunda/consulting/AsyncJobWorker.java | 84 ++++++++++ .../com/camunda/consulting/AsyncService.java | 103 ++++++++++++ .../consulting/AsyncServiceProperties.java | 19 +++ .../consulting/ExampleApplication.java | 13 ++ .../camunda/consulting/ProcessController.java | 25 +++ .../src/main/resources/application.yaml | 7 + .../main/resources/async-service-task.bpmn | 47 ++++++ .../consulting/AsyncJobWorkerTest.java | 30 ++++ .../src/test/resources/application.yaml | 2 + 13 files changed, 543 insertions(+) create mode 100644 async-service-task/README.md create mode 100644 async-service-task/pom.xml create mode 100644 async-service-task/src/main/java/com/camunda/consulting/AsyncJobWorker.java create mode 100644 async-service-task/src/main/java/com/camunda/consulting/AsyncService.java create mode 100644 async-service-task/src/main/java/com/camunda/consulting/AsyncServiceProperties.java create mode 100644 async-service-task/src/main/java/com/camunda/consulting/ExampleApplication.java create mode 100644 async-service-task/src/main/java/com/camunda/consulting/ProcessController.java create mode 100644 async-service-task/src/main/resources/application.yaml create mode 100644 async-service-task/src/main/resources/async-service-task.bpmn create mode 100644 async-service-task/src/test/java/com/camunda/consulting/AsyncJobWorkerTest.java create mode 100644 async-service-task/src/test/resources/application.yaml diff --git a/.github/dependabot.yml b/.github/dependabot.yml index 5fee9240..fccc1793 100644 --- a/.github/dependabot.yml +++ b/.github/dependabot.yml @@ -37,6 +37,11 @@ updates: open-pull-requests-limit: 10 - package-ecosystem: maven directory: "/timer-testing" + schedule: + interval: daily + open-pull-requests-limit: 10 +- package-ecosystem: maven + directory: "/async-service-task" schedule: interval: daily open-pull-requests-limit: 10 \ No newline at end of file diff --git a/.github/workflows/build.yaml b/.github/workflows/build.yaml index 536f5021..69806dbf 100644 --- a/.github/workflows/build.yaml +++ b/.github/workflows/build.yaml @@ -99,3 +99,15 @@ jobs: - name: Build with Maven run: mvn verify -PcheckFormat -B working-directory: timer-testing + build-async-service-task: + runs-on: ubuntu-latest + steps: + - uses: actions/checkout@v3 + - name: Set up JDK + uses: actions/setup-java@v3 + with: + java-version: '17' + distribution: 'adopt' + - name: Build with Maven + run: mvn verify -PcheckFormat -B + working-directory: async-service-task diff --git a/async-service-task/README.md b/async-service-task/README.md new file mode 100644 index 00000000..ead41096 --- /dev/null +++ b/async-service-task/README.md @@ -0,0 +1,41 @@ +# Async Service Task Example + +## What is the use case? + +The implementation of a service task might be asynchronous. Usually, you would handle this using the Messaging Pattern (Send - Receive). + +But there might be cases when the requirement to the process is to hide this kind of implementation detail and instead represent the service call as one task. + +In this case, the implementation needs to be able to cover this kind of call by sending a request in an idempotent way and then waiting for the answer. + +## How does it work? + +This implementation will check for a transactionId, create an transactionId on service task scope if missing and uses it to send a request to a service and then check for the answer. + +Then, it checks for the answer being present. + +If not, it defers the polling by failing the job while leaving the amount of retries untouched. + +## What are the constraints of the implementation? + +The constraints of the implementation lie in the `AsyncService`. Here, the assumption is that creation of a transaction requires an ID. + +As soon as complete, the result can be fetched over and over until it is completed (which will remove the transaction from the `AsyncService`). + +## How can I try it out? + +Configure the zeebe connection in the `application.yaml`. By default, it points to a plaintext local zeebe gateway. + +Then, start the app by running + +```shell +mvn spring-boot:run +``` + +On starting up, the example process is deployed. + +After the app is up and running, a process instance can be started with a `POST` to `http://localhost:8080/start`. + +The process instance executes one service task that has an asynchronous implementation. In the logs, you should be able to see that the `AsyncService` is triggered from time to time, only creating a transaction on the first invocation. + +Then, the result is fetched. After the result has been fetched, the next poll will return it and the job is completed. diff --git a/async-service-task/pom.xml b/async-service-task/pom.xml new file mode 100644 index 00000000..31a1edc4 --- /dev/null +++ b/async-service-task/pom.xml @@ -0,0 +1,155 @@ + + + 4.0.0 + + com.camunda.consulting + async-service-task + 1.0-SNAPSHOT + + + 17 + 3.1.3 + 2.1.0 + 8.2.4 + ${java.version} + ${java.version} + UTF-8 + + + + + + org.springframework.boot + spring-boot-dependencies + ${spring-boot.version} + pom + import + + + + + + + io.camunda.spring + spring-boot-starter-camunda + ${spring-zeebe.version} + + + org.springframework.boot + spring-boot-starter-web + + + io.camunda.spring + spring-boot-starter-camunda-test + ${spring-zeebe.version} + test + + + org.springframework.boot + spring-boot-configuration-processor + true + + + + + + + + com.diffplug.spotless + spotless-maven-plugin + 2.39.0 + + + + + *.md + .gitignore + + + + + true + 2 + + + + + + + + + + + + + + org.springframework.boot + spring-boot-maven-plugin + ${spring-boot.version} + + + + repackage + + + + + + org.apache.maven.plugins + maven-surefire-plugin + 3.1.2 + + + + + + + autoFormat + + true + + + + + com.diffplug.spotless + spotless-maven-plugin + + + spotless-format + + apply + + process-sources + + + + + + + + + + checkFormat + + + + com.diffplug.spotless + spotless-maven-plugin + + + spotless-check + + check + + validate + + + + + + + + + \ No newline at end of file diff --git a/async-service-task/src/main/java/com/camunda/consulting/AsyncJobWorker.java b/async-service-task/src/main/java/com/camunda/consulting/AsyncJobWorker.java new file mode 100644 index 00000000..682b1bf8 --- /dev/null +++ b/async-service-task/src/main/java/com/camunda/consulting/AsyncJobWorker.java @@ -0,0 +1,84 @@ +package com.camunda.consulting; + +import com.camunda.consulting.AsyncService.TransactionResult; +import io.camunda.zeebe.client.ZeebeClient; +import io.camunda.zeebe.client.api.response.ActivatedJob; +import io.camunda.zeebe.spring.client.annotation.JobWorker; +import java.time.Duration; +import java.util.Collections; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.stereotype.Component; + +@Component +public class AsyncJobWorker { + private static final String TRANSACTION_ID_VAR_NAME = "transactionId"; + + private final ZeebeClient zeebeClient; + private final AsyncService asyncService; + + @Autowired + public AsyncJobWorker(ZeebeClient zeebeClient, AsyncService asyncService) { + this.zeebeClient = zeebeClient; + this.asyncService = asyncService; + } + + @JobWorker(autoComplete = false, type = "async-job") + public void handle(ActivatedJob job) { + String transactionId = getOrCreateTransactionId(job); + asyncService.startTransaction(transactionId); + TransactionResult result = asyncService.getTransactionResult(transactionId); + if (result.complete()) { + completeJob(job, result.result()); + // this comes afterward as it could be considered optional + asyncService.completeTransaction(transactionId); + } else { + delayJob(job, Duration.ofSeconds(30)); + } + } + + /** + * Completes the job with the given result + * + * @param job the job to complete + * @param result the result to submit + */ + private void completeJob(ActivatedJob job, String result) { + zeebeClient + .newCompleteCommand(job) + .variables(Collections.singletonMap("result", result)) + .send() + .join(); + } + + /** + * Delays the job with the given backoff + * + * @param job the job to delay + * @param backoff the backoff after which the job will be available again + */ + private void delayJob(ActivatedJob job, Duration backoff) { + zeebeClient.newFailCommand(job).retries(job.getRetries()).retryBackoff(backoff).send().join(); + } + + /** + * Checks if a transaction id is already present and creates one if not + * + * @param job the job that requires the transaction id + * @return the transaction id from the job or a created one that is saved to the job now + */ + private String getOrCreateTransactionId(ActivatedJob job) { + + if (!job.getVariablesAsMap().containsKey(TRANSACTION_ID_VAR_NAME)) { + // there is no transaction id present yet, better set one (and send it to zeebe) + String transactionId = asyncService.createTransactionId(); + zeebeClient + .newSetVariablesCommand(job.getElementInstanceKey()) + .variables(Collections.singletonMap(TRANSACTION_ID_VAR_NAME, transactionId)) + .local(true) + .send() + .join(); + return transactionId; + } + return (String) job.getVariablesAsMap().get(TRANSACTION_ID_VAR_NAME); + } +} diff --git a/async-service-task/src/main/java/com/camunda/consulting/AsyncService.java b/async-service-task/src/main/java/com/camunda/consulting/AsyncService.java new file mode 100644 index 00000000..3b392729 --- /dev/null +++ b/async-service-task/src/main/java/com/camunda/consulting/AsyncService.java @@ -0,0 +1,103 @@ +package com.camunda.consulting; + +import java.time.LocalDateTime; +import java.util.HashMap; +import java.util.Map; +import java.util.UUID; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.stereotype.Service; + +@Service +public class AsyncService { + private static final Logger LOG = LoggerFactory.getLogger(AsyncService.class); + private final Map transactions = new HashMap<>(); + private final AsyncServiceProperties properties; + + public AsyncService(AsyncServiceProperties properties) { + this.properties = properties; + } + + /** + * Creates a transactionId that is guaranteed to be available + * + * @return the transactionId + */ + public String createTransactionId() { + String transactionId = UUID.randomUUID().toString(); + while (transactions.containsKey(transactionId)) { + transactionId = UUID.randomUUID().toString(); + } + return transactionId; + } + + /** + * Starts a transaction for the given id if not already started + * + * @param transactionId the id that identifies the transaction + */ + public void startTransaction(String transactionId) { + if (transactions.containsKey(transactionId)) { + LOG.info("Transaction '{}' already started, request will be ignored", transactionId); + } else { + LOG.info("Creating transaction '{}'", transactionId); + transactions.put(transactionId, LocalDateTime.now().plus(properties.getDuration())); + } + } + + /** + * Returns the current state of a running transaction + * + * @param transactionId the id that identifies the transaction + * @return the transaction result + * @throws IllegalStateException if the transactionId is unknown + */ + public TransactionResult getTransactionResult(String transactionId) { + if (transactions.containsKey(transactionId)) { + LOG.info("Transaction '{}' found, returning result", transactionId); + LocalDateTime result = transactions.get(transactionId); + if (isComplete(result)) { + LOG.info("Transaction complete"); + return new TransactionResult(true, "A"); + } else { + LOG.info("Transaction running"); + return new TransactionResult(false, null); + } + } else { + LOG.error("Transaction '{}' not present", transactionId); + throw new IllegalStateException(String.format("Transaction '%s' not present", transactionId)); + } + } + + private boolean isComplete(LocalDateTime completionTime) { + return completionTime.isBefore(LocalDateTime.now()); + } + + /** + * Completes the transaction by releasing the given transactionId + * + * @param transactionId the ID of the transaction to be completed + * @throws IllegalStateException if the transaction to be complete has no result yet + */ + public void completeTransaction(String transactionId) { + if (transactions.containsKey(transactionId)) { + LocalDateTime result = transactions.get(transactionId); + if (isComplete(result)) { + transactions.remove(transactionId); + LOG.info("Completed transaction '{}'", transactionId); + } else { + LOG.info("Unable to complete transaction '{}'", transactionId); + throw new IllegalStateException( + String.format("Unable to complete transaction '%s'", transactionId)); + } + } + } + + /** + * A transaction result + * + * @param complete indicates whether the transaction is complete + * @param result the result. Only set if complete is true + */ + public record TransactionResult(boolean complete, String result) {} +} diff --git a/async-service-task/src/main/java/com/camunda/consulting/AsyncServiceProperties.java b/async-service-task/src/main/java/com/camunda/consulting/AsyncServiceProperties.java new file mode 100644 index 00000000..1f67338c --- /dev/null +++ b/async-service-task/src/main/java/com/camunda/consulting/AsyncServiceProperties.java @@ -0,0 +1,19 @@ +package com.camunda.consulting; + +import java.time.Duration; +import org.springframework.boot.context.properties.ConfigurationProperties; +import org.springframework.context.annotation.Configuration; + +@ConfigurationProperties("async-service") +@Configuration +public class AsyncServiceProperties { + private Duration duration; + + public Duration getDuration() { + return duration; + } + + public void setDuration(Duration duration) { + this.duration = duration; + } +} diff --git a/async-service-task/src/main/java/com/camunda/consulting/ExampleApplication.java b/async-service-task/src/main/java/com/camunda/consulting/ExampleApplication.java new file mode 100644 index 00000000..26ea803b --- /dev/null +++ b/async-service-task/src/main/java/com/camunda/consulting/ExampleApplication.java @@ -0,0 +1,13 @@ +package com.camunda.consulting; + +import io.camunda.zeebe.spring.client.annotation.Deployment; +import org.springframework.boot.SpringApplication; +import org.springframework.boot.autoconfigure.SpringBootApplication; + +@SpringBootApplication +@Deployment(resources = "classpath*:*.bpmn") +public class ExampleApplication { + public static void main(String[] args) { + SpringApplication.run(ExampleApplication.class, args); + } +} diff --git a/async-service-task/src/main/java/com/camunda/consulting/ProcessController.java b/async-service-task/src/main/java/com/camunda/consulting/ProcessController.java new file mode 100644 index 00000000..29edb6ae --- /dev/null +++ b/async-service-task/src/main/java/com/camunda/consulting/ProcessController.java @@ -0,0 +1,25 @@ +package com.camunda.consulting; + +import io.camunda.zeebe.client.ZeebeClient; +import io.camunda.zeebe.client.api.response.ProcessInstanceEvent; +import org.springframework.web.bind.annotation.PostMapping; +import org.springframework.web.bind.annotation.RestController; + +@RestController +public class ProcessController { + private final ZeebeClient zeebeClient; + + public ProcessController(ZeebeClient zeebeClient) { + this.zeebeClient = zeebeClient; + } + + @PostMapping("/start") + public ProcessInstanceEvent start() { + return zeebeClient + .newCreateInstanceCommand() + .bpmnProcessId("AsyncServiceTaskProcess") + .latestVersion() + .send() + .join(); + } +} diff --git a/async-service-task/src/main/resources/application.yaml b/async-service-task/src/main/resources/application.yaml new file mode 100644 index 00000000..be0487b8 --- /dev/null +++ b/async-service-task/src/main/resources/application.yaml @@ -0,0 +1,7 @@ +zeebe: + client: + security: + plaintext: true + +async-service: + duration: PT1M \ No newline at end of file diff --git a/async-service-task/src/main/resources/async-service-task.bpmn b/async-service-task/src/main/resources/async-service-task.bpmn new file mode 100644 index 00000000..b00bc1da --- /dev/null +++ b/async-service-task/src/main/resources/async-service-task.bpmn @@ -0,0 +1,47 @@ + + + + + Flow_0oz2221 + + + + + + + Flow_0oz2221 + Flow_0j2q5yl + + + Flow_0j2q5yl + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + diff --git a/async-service-task/src/test/java/com/camunda/consulting/AsyncJobWorkerTest.java b/async-service-task/src/test/java/com/camunda/consulting/AsyncJobWorkerTest.java new file mode 100644 index 00000000..62d7e9f1 --- /dev/null +++ b/async-service-task/src/test/java/com/camunda/consulting/AsyncJobWorkerTest.java @@ -0,0 +1,30 @@ +package com.camunda.consulting; + +import static io.camunda.zeebe.spring.test.ZeebeTestThreadSupport.*; + +import io.camunda.zeebe.client.ZeebeClient; +import io.camunda.zeebe.client.api.response.ProcessInstanceEvent; +import io.camunda.zeebe.spring.test.ZeebeSpringTest; +import java.time.Duration; +import org.junit.jupiter.api.Test; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.boot.test.context.SpringBootTest; + +@ZeebeSpringTest +@SpringBootTest +public class AsyncJobWorkerTest { + + @Autowired ZeebeClient zeebeClient; + + @Test + void shouldRun() { + ProcessInstanceEvent process = + zeebeClient + .newCreateInstanceCommand() + .bpmnProcessId("AsyncServiceTaskProcess") + .latestVersion() + .send() + .join(); + waitForProcessInstanceCompleted(process, Duration.ofMinutes(2)); + } +} diff --git a/async-service-task/src/test/resources/application.yaml b/async-service-task/src/test/resources/application.yaml new file mode 100644 index 00000000..dac137c2 --- /dev/null +++ b/async-service-task/src/test/resources/application.yaml @@ -0,0 +1,2 @@ +async-service: + duration: PT5S \ No newline at end of file