diff --git a/kogito-serverless-workflow/kogito-serverless-workflow-builder/src/main/java/org/kie/kogito/serverless/workflow/parser/handlers/EventHandler.java b/kogito-serverless-workflow/kogito-serverless-workflow-builder/src/main/java/org/kie/kogito/serverless/workflow/parser/handlers/EventHandler.java
index 25eefae43af..0a9c385b331 100644
--- a/kogito-serverless-workflow/kogito-serverless-workflow-builder/src/main/java/org/kie/kogito/serverless/workflow/parser/handlers/EventHandler.java
+++ b/kogito-serverless-workflow/kogito-serverless-workflow-builder/src/main/java/org/kie/kogito/serverless/workflow/parser/handlers/EventHandler.java
@@ -33,10 +33,14 @@
 import org.kie.kogito.serverless.workflow.parser.ServerlessWorkflowParser;
 
 import io.serverlessworkflow.api.Workflow;
+import io.serverlessworkflow.api.actions.Action;
 import io.serverlessworkflow.api.events.OnEvents;
 import io.serverlessworkflow.api.states.EventState;
 
+import static org.kie.kogito.serverless.workflow.parser.handlers.NodeFactoryUtils.eventBasedSplitNode;
+import static org.kie.kogito.serverless.workflow.parser.handlers.NodeFactoryUtils.joinExclusiveNode;
 import static org.kie.kogito.serverless.workflow.parser.handlers.NodeFactoryUtils.startMessageNode;
+import static org.kie.kogito.serverless.workflow.utils.TimeoutsConfigResolver.resolveEventTimeout;
 
 public class EventHandler extends CompositeContextNodeHandler<EventState> {
 
@@ -51,18 +55,45 @@ public void handleStart() {
 
     @Override
     public MakeNodeResult makeNode(RuleFlowNodeContainerFactory<?, ?> factory) {
-        MakeNodeResult currentBranch = joinNodes(factory, state.getOnEvents(), this::processOnEvent);
-        // ignore timeout for start states
-        return isStartState ? currentBranch : makeTimeoutNode(factory, currentBranch);
+        return joinNodes(factory, state.getOnEvents(), this::processOnEvent);
     }
 
     private MakeNodeResult processOnEvent(RuleFlowNodeContainerFactory<?, ?> factory, OnEvents onEvent) {
-        MakeNodeResult result = joinNodes(factory,
-                onEvent.getEventRefs(), (fact, onEventRef) -> filterAndMergeNode(fact, onEvent.getEventDataFilter(), isStartState ? ServerlessWorkflowParser.DEFAULT_WORKFLOW_VAR : getVarName(),
-                        (f, inputVar, outputVar) -> buildEventNode(f, onEventRef, inputVar, outputVar)));
-        CompositeContextNodeFactory<?> embeddedSubProcess = handleActions(makeCompositeNode(factory), onEvent.getActions());
-        connect(result.getOutgoingNode(), embeddedSubProcess);
-        return new MakeNodeResult(result.getIncomingNode(), embeddedSubProcess);
+        if (isStartState) {
+            MakeNodeResult result = joinNodes(factory,
+                    onEvent.getEventRefs(), (fact, onEventRef) -> filterAndMergeNode(fact, onEvent.getEventDataFilter(), ServerlessWorkflowParser.DEFAULT_WORKFLOW_VAR,
+                            (f, inputVar, outputVar) -> buildEventNode(f, onEventRef, inputVar, outputVar)));
+            CompositeContextNodeFactory<?> embeddedSubProcess = handleActions(makeCompositeNode(factory), onEvent.getActions());
+            connect(result.getOutgoingNode(), embeddedSubProcess);
+            return new MakeNodeResult(result.getIncomingNode(), embeddedSubProcess);
+        } else {
+            String varName = getVarName();
+            CompositeContextNodeFactory<?> embeddedSubProcess = makeCompositeNode(factory);
+            NodeFactory<?, ?> startNode = embeddedSubProcess.startNode(parserContext.newId()).name("EmbeddedStart");
+            JoinFactory<?> joinNode = null;
+            String eventTimeout = resolveEventTimeout(state, workflow);
+            if (eventTimeout != null) {
+                // creating a split-join branch for the timer
+                SplitFactory<?> splitNode = eventBasedSplitNode(embeddedSubProcess.splitNode(parserContext.newId()), Split.TYPE_XAND);
+                joinNode = joinExclusiveNode(embeddedSubProcess.joinNode(parserContext.newId()));
+                startNode = connect(startNode, splitNode);
+                createTimerNode(embeddedSubProcess, splitNode, joinNode, eventTimeout);
+            }
+            MakeNodeResult result = joinNodes(embeddedSubProcess,
+                    onEvent.getEventRefs(), (fact, onEventRef) -> filterAndMergeNode(fact, onEvent.getEventDataFilter(), varName,
+                            (f, inputVar, outputVar) -> buildEventNode(f, onEventRef, inputVar, outputVar)));
+            connect(startNode, result.getIncomingNode());
+            NodeFactory<?, ?> currentNode = result.getOutgoingNode();
+            for (Action action : onEvent.getActions()) {
+                currentNode = connect(currentNode, getActionNode(embeddedSubProcess, action, varName, true));
+            }
+            if (joinNode != null) {
+                currentNode = connect(currentNode, joinNode);
+            }
+            connect(currentNode, embeddedSubProcess.endNode(parserContext.newId()).name("EmbeddedEnd").terminate(true)).done();
+            handleErrors(parserContext.factory(), embeddedSubProcess);
+            return new MakeNodeResult(embeddedSubProcess);
+        }
     }
 
     private <T> MakeNodeResult joinNodes(RuleFlowNodeContainerFactory<?, ?> factory, List<T> events, BiFunction<RuleFlowNodeContainerFactory<?, ?>, T, MakeNodeResult> function) {
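The new processOnEvent branch races the awaited event against a timer: when an eventTimeout is resolved, an event-based split (Split.TYPE_XAND) and an exclusive join are wired inside the embedded composite node, so whichever fires first — the event node or the timer node — decides the path taken. As a plain-JDK sketch of that race (not the jBPM factory API; all names here are illustrative only):

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

public class EventVsTimerRaceSketch {

    // Mirrors the split/join semantics: either the event arrives before the
    // timeout and its actions run, or the timer branch wins and the state
    // completes through the timeout path.
    static String waitForEventOrTimeout(CompletableFuture<String> event, long timeoutMillis) {
        try {
            String payload = event.get(timeoutMillis, TimeUnit.MILLISECONDS); // event branch wins
            return "actions executed with " + payload;
        } catch (TimeoutException e) {
            return "timeout branch taken";                                    // timer branch wins
        } catch (Exception e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        CompletableFuture<String> never = new CompletableFuture<>();   // no "move" event published
        System.out.println(waitForEventOrTimeout(never, 100));         // -> timeout branch taken

        CompletableFuture<String> arrives = CompletableFuture.completedFuture("{}");
        System.out.println(waitForEventOrTimeout(arrives, 100));       // -> actions executed with {}
    }
}
```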
diff --git a/kogito-serverless-workflow/kogito-serverless-workflow-builder/src/main/java/org/kie/kogito/serverless/workflow/parser/handlers/StateHandler.java b/kogito-serverless-workflow/kogito-serverless-workflow-builder/src/main/java/org/kie/kogito/serverless/workflow/parser/handlers/StateHandler.java
index 98eeda115a8..6ef53ad49c3 100644
--- a/kogito-serverless-workflow/kogito-serverless-workflow-builder/src/main/java/org/kie/kogito/serverless/workflow/parser/handlers/StateHandler.java
+++ b/kogito-serverless-workflow/kogito-serverless-workflow-builder/src/main/java/org/kie/kogito/serverless/workflow/parser/handlers/StateHandler.java
@@ -33,7 +33,6 @@
 import org.jbpm.ruleflow.core.Metadata;
 import org.jbpm.ruleflow.core.RuleFlowNodeContainerFactory;
 import org.jbpm.ruleflow.core.RuleFlowProcessFactory;
-import org.jbpm.ruleflow.core.factory.AbstractCompositeNodeFactory;
 import org.jbpm.ruleflow.core.factory.ActionNodeFactory;
 import org.jbpm.ruleflow.core.factory.BoundaryEventNodeFactory;
 import org.jbpm.ruleflow.core.factory.CompositeContextNodeFactory;
@@ -477,25 +476,19 @@ protected final EventDefinition eventDefinition(String eventName) {
     protected final MakeNodeResult makeTimeoutNode(RuleFlowNodeContainerFactory<?, ?> factory, MakeNodeResult notTimerBranch) {
         String eventTimeout = resolveEventTimeout(state, workflow);
         if (eventTimeout != null) {
-            if (notTimerBranch.getIncomingNode() == notTimerBranch.getOutgoingNode() && notTimerBranch.getIncomingNode() instanceof AbstractCompositeNodeFactory) {
-                // reusing composite
-                ((AbstractCompositeNodeFactory) notTimerBranch.getIncomingNode()).timeout(eventTimeout);
-                return notTimerBranch;
-            } else {
-                // creating a split-join branch for the timer
-                SplitFactory<?> splitNode = eventBasedSplitNode(factory.splitNode(parserContext.newId()), Split.TYPE_XAND);
-                JoinFactory<?> joinNode = joinExclusiveNode(factory.joinNode(parserContext.newId()));
-                connect(connect(splitNode, notTimerBranch), joinNode);
-                createTimerNode(factory, splitNode, joinNode, eventTimeout);
-                return new MakeNodeResult(splitNode, joinNode);
-            }
+            // creating a split-join branch for the timer
+            SplitFactory<?> splitNode = eventBasedSplitNode(factory.splitNode(parserContext.newId()), Split.TYPE_XAND);
+            JoinFactory<?> joinNode = joinExclusiveNode(factory.joinNode(parserContext.newId()));
+            connect(connect(splitNode, notTimerBranch), joinNode);
+            createTimerNode(factory, splitNode, joinNode, eventTimeout);
+            return new MakeNodeResult(splitNode, joinNode);
         } else {
             // No timeouts, returning the existing branch.
             return notTimerBranch;
         }
     }
 
-    private void createTimerNode(RuleFlowNodeContainerFactory<?, ?> factory, SplitFactory<?> splitNode, JoinFactory<?> joinNode, String eventTimeout) {
+    protected final void createTimerNode(RuleFlowNodeContainerFactory<?, ?> factory, SplitFactory<?> splitNode, JoinFactory<?> joinNode, String eventTimeout) {
         TimerNodeFactory<?> eventTimeoutTimerNode = timerNode(factory.timerNode(parserContext.newId()), eventTimeout);
         connect(splitNode, eventTimeoutTimerNode);
         connect(eventTimeoutTimerNode, joinNode);
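The eventTimeout string handed to createTimerNode comes from resolveEventTimeout and, in the workflow added below, is an ISO-8601 duration ("PT5S"). A quick, standalone check of that format with the JDK, independent of how the jBPM timer node consumes it:

```java
import java.time.Duration;

public class EventTimeoutFormatCheck {
    public static void main(String[] args) {
        // "PT5S" is the eventTimeout used by eventWithError.sw.json below
        Duration eventTimeout = Duration.parse("PT5S");
        System.out.println(eventTimeout.toMillis()); // 5000
    }
}
```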
diff --git a/quarkus/extensions/kogito-quarkus-serverless-workflow-extension/kogito-quarkus-serverless-workflow-integration-test/src/main/resources/eventWithError.sw.json b/quarkus/extensions/kogito-quarkus-serverless-workflow-extension/kogito-quarkus-serverless-workflow-integration-test/src/main/resources/eventWithError.sw.json
new file mode 100644
index 00000000000..b9e84b6d972
--- /dev/null
+++ b/quarkus/extensions/kogito-quarkus-serverless-workflow-extension/kogito-quarkus-serverless-workflow-integration-test/src/main/resources/eventWithError.sw.json
@@ -0,0 +1,95 @@
+{
+  "id": "eventTimedout",
+  "version": "1.0",
+  "expressionLang": "jsonpath",
+  "name": "Workflow event test",
+  "description": "A test of a non-starting event with timeout error",
+  "start": "printWaitMessage",
+  "events": [
+    {
+      "name": "moveEvent",
+      "source": "",
+      "type": "move"
+    }
+  ],
+  "errors": [
+    {
+      "name": "timeoutError",
+      "code": "TimedOut"
+    }
+  ],
+  "functions": [
+    {
+      "name": "printMessage",
+      "type": "custom",
+      "operation": "sysout"
+    },
+    {
+      "name": "publishTimeoutExpired",
+      "type": "asyncapi",
+      "operation": "specs/callbackResults.yaml#sendTimeoutExpired"
+    }
+  ],
+  "states": [
+    {
+      "name": "printWaitMessage",
+      "type": "operation",
+      "actions": [
+        {
+          "name": "printBeforeEvent",
+          "functionRef": {
+            "refName": "printMessage",
+            "arguments": {
+              "message": "$[*]"
+            }
+          }
+        }
+      ],
+      "transition": "waitForEvent"
+    },
+    {
+      "name": "waitForEvent",
+      "type": "event",
+      "onEvents": [
+        {
+          "eventRefs": [
+            "moveEvent"
+          ],
+          "actions": [
+            {
+              "name": "printAfterEvent",
+              "functionRef": {
+                "refName": "printMessage",
+                "arguments": {
+                  "message": "$[*]"
+                }
+              }
+            }
+          ]
+        }
+      ],
+      "onErrors": [
+        {
+          "errorRef": "timeoutError",
+          "transition": "PublishTimeout"
+        }
+      ],
+      "timeouts": {
+        "eventTimeout": "PT5S"
+      },
+      "end": true
+    },
+    {
+      "name": "PublishTimeout",
+      "type": "operation",
+      "actions": [
+        {
+          "name": "publishTimeoutExpired",
+          "functionRef": "publishTimeoutExpired"
+        }
+      ],
+      "end": true
+    }
+  ]
+}
\ No newline at end of file
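To complete the waitForEvent state before the five-second timeout, a client would publish a CloudEvent of type "move" correlated to the running instance. A minimal sketch with the CloudEvents Java SDK; the "kogitoprocrefid" extension name and the event source are assumptions for illustration, not something this workflow file defines:

```java
import java.net.URI;
import java.nio.charset.StandardCharsets;
import java.util.UUID;

import io.cloudevents.CloudEvent;
import io.cloudevents.core.builder.CloudEventBuilder;

public class MoveEventSketch {

    // Builds the kind of "move" event the waitForEvent state listens for.
    // The kogitoprocrefid extension (assumed here) targets the instance
    // returned when the workflow was started.
    public static CloudEvent moveEvent(String processInstanceId) {
        return CloudEventBuilder.v1()
                .withId(UUID.randomUUID().toString())
                .withType("move")                               // matches events[0].type
                .withSource(URI.create("/eventWithError/test")) // illustrative source
                .withExtension("kogitoprocrefid", processInstanceId)
                .withData("application/json", "{}".getBytes(StandardCharsets.UTF_8))
                .build();
    }
}
```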
"name": "waitForEvent", + "type": "event", + "onEvents": [ + { + "eventRefs": [ + "moveEvent" + ], + "actions": [ + { + "name": "printAfterEvent", + "functionRef": { + "refName": "printMessage", + "arguments": { + "message": "$[*]" + } + } + } + ] + } + ], + "onErrors": [ + { + "errorRef": "timeoutError", + "transition": "PublishTimeout" + } + ], + "timeouts": { + "eventTimeout": "PT5S" + }, + "end":true + }, + { + "name": "PublishTimeout", + "type": "operation", + "actions": [ + { + "name": "publishTimeoutExpired", + "functionRef": "publishTimeoutExpired" + } + ], + "end": "true" + } + ] +} \ No newline at end of file diff --git a/quarkus/extensions/kogito-quarkus-serverless-workflow-extension/kogito-quarkus-serverless-workflow-integration-test/src/test/java/org/kie/kogito/quarkus/workflows/EventTimedoutIT.java b/quarkus/extensions/kogito-quarkus-serverless-workflow-extension/kogito-quarkus-serverless-workflow-integration-test/src/test/java/org/kie/kogito/quarkus/workflows/EventTimedoutIT.java new file mode 100644 index 00000000000..292fbcab110 --- /dev/null +++ b/quarkus/extensions/kogito-quarkus-serverless-workflow-extension/kogito-quarkus-serverless-workflow-integration-test/src/test/java/org/kie/kogito/quarkus/workflows/EventTimedoutIT.java @@ -0,0 +1,96 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */
+package org.kie.kogito.quarkus.workflows;
+
+import java.io.IOException;
+import java.util.Map;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.kafka.common.serialization.ByteArrayDeserializer;
+import org.apache.kafka.common.serialization.ByteArraySerializer;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.kie.kogito.event.Converter;
+import org.kie.kogito.event.cloudevents.CloudEventExtensionConstants;
+import org.kie.kogito.event.impl.ByteArrayCloudEventUnmarshallerFactory;
+import org.kie.kogito.test.quarkus.QuarkusTestProperty;
+import org.kie.kogito.test.quarkus.kafka.KafkaTypedTestClient;
+import org.kie.kogito.testcontainers.quarkus.KafkaQuarkusTestResource;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.datatype.jsr310.JavaTimeModule;
+
+import io.cloudevents.CloudEvent;
+import io.cloudevents.jackson.JsonFormat;
+import io.quarkus.test.common.QuarkusTestResource;
+import io.quarkus.test.junit.QuarkusIntegrationTest;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.kie.kogito.quarkus.workflows.AssuredTestUtils.startProcess;
+
+@QuarkusIntegrationTest
+@QuarkusTestResource(KafkaQuarkusTestResource.class)
+public class EventTimedoutIT {
+
+    private final static Logger logger = LoggerFactory.getLogger(EventTimedoutIT.class);
+
+    @QuarkusTestProperty(name = KafkaQuarkusTestResource.KOGITO_KAFKA_PROPERTY)
+    String kafkaBootstrapServers;
+    private ObjectMapper objectMapper;
+    private KafkaTypedTestClient<byte[], ByteArraySerializer, ByteArrayDeserializer> kafkaClient;
+
+    @BeforeEach
+    void setup() {
+        kafkaClient = new KafkaTypedTestClient<>(kafkaBootstrapServers, ByteArraySerializer.class, ByteArrayDeserializer.class);
+        objectMapper = new ObjectMapper()
+                .registerModule(new JavaTimeModule())
+                .registerModule(JsonFormat.getCloudEventJacksonModule())
+                .disable(com.fasterxml.jackson.databind.SerializationFeature.WRITE_DATES_AS_TIMESTAMPS);
+    }
+
+    @AfterEach
+    void cleanUp() {
+        if (kafkaClient != null) {
+            kafkaClient.shutdown();
+        }
+    }
+
+    @Test
+    void testTimedout() throws InterruptedException {
+        String id = startProcess("eventTimedout");
+        Converter<byte[], CloudEvent> converter = new ByteArrayCloudEventUnmarshallerFactory(objectMapper).unmarshaller(Map.class).cloudEvent();
+        final CountDownLatch countDownLatch = new CountDownLatch(1);
+        kafkaClient.consume("timeout", v -> {
+            try {
+                CloudEvent event = converter.convert(v);
+                if (id.equals(event.getExtension(CloudEventExtensionConstants.PROCESS_INSTANCE_ID))) {
+                    countDownLatch.countDown();
+                }
+            } catch (IOException e) {
+                logger.info("Unmarshall exception", e);
+            }
+        });
+        countDownLatch.await(10, TimeUnit.SECONDS);
+        assertThat(countDownLatch.getCount()).isZero();
+    }
+}
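The Converter obtained from ByteArrayCloudEventUnmarshallerFactory hides the JSON decoding of the consumed Kafka record; conceptually, with the CloudEvents Jackson module registered, it reduces to something like this sketch (an illustration, not the factory's actual implementation):

```java
import java.io.IOException;

import com.fasterxml.jackson.databind.ObjectMapper;

import io.cloudevents.CloudEvent;
import io.cloudevents.jackson.JsonFormat;

public class CloudEventDecodeSketch {

    private static final ObjectMapper MAPPER =
            new ObjectMapper().registerModule(JsonFormat.getCloudEventJacksonModule());

    // Decodes a structured-mode (JSON) CloudEvent from the raw record bytes,
    // after which extensions such as the process instance id can be read.
    public static CloudEvent decode(byte[] record) throws IOException {
        return MAPPER.readValue(record, CloudEvent.class);
    }
}
```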