From 4d4aa58c9e6c4b6526b7ef4676038579ac1cca7d Mon Sep 17 00:00:00 2001 From: wangqi Date: Fri, 29 Mar 2024 18:51:24 +0800 Subject: [PATCH] fix: workflow and task event fire and listen --- ...rkflowTaskInstanceDeployEventListener.java | 1 - .../WorkflowInstanceEventDTO.java | 9 +++---- .../WorkflowInstanceStateMachine.java | 23 +++++++++-------- .../WorkflowTaskInstanceStateMachine.java | 25 ++++++++++--------- 4 files changed, 29 insertions(+), 29 deletions(-) diff --git a/scaleph-workflow/scaleph-workflow-api/src/main/java/cn/sliew/scaleph/workflow/listener/taskinstance/WorkflowTaskInstanceDeployEventListener.java b/scaleph-workflow/scaleph-workflow-api/src/main/java/cn/sliew/scaleph/workflow/listener/taskinstance/WorkflowTaskInstanceDeployEventListener.java index acc527c0f..e7a6032f6 100644 --- a/scaleph-workflow/scaleph-workflow-api/src/main/java/cn/sliew/scaleph/workflow/listener/taskinstance/WorkflowTaskInstanceDeployEventListener.java +++ b/scaleph-workflow/scaleph-workflow-api/src/main/java/cn/sliew/scaleph/workflow/listener/taskinstance/WorkflowTaskInstanceDeployEventListener.java @@ -37,7 +37,6 @@ protected CompletableFuture handleEventAsync(WorkflowTaskInstanceEventDTO event) CompletableFuture future = executorService.submit(new DeployRunner(event)).toCompletableFuture(); future.whenCompleteAsync((unused, throwable) -> { if (throwable != null) { - log.error("deploy workflow task instance error", throwable); onFailure(event.getWorkflowTaskInstanceId(), throwable); } else { stateMachine.onSuccess(workflowTaskInstanceService.get(event.getWorkflowTaskInstanceId())); diff --git a/scaleph-workflow/scaleph-workflow-api/src/main/java/cn/sliew/scaleph/workflow/listener/workflowinstance/WorkflowInstanceEventDTO.java b/scaleph-workflow/scaleph-workflow-api/src/main/java/cn/sliew/scaleph/workflow/listener/workflowinstance/WorkflowInstanceEventDTO.java index 89981d377..0cc80725a 100644 --- a/scaleph-workflow/scaleph-workflow-api/src/main/java/cn/sliew/scaleph/workflow/listener/workflowinstance/WorkflowInstanceEventDTO.java +++ b/scaleph-workflow/scaleph-workflow-api/src/main/java/cn/sliew/scaleph/workflow/listener/workflowinstance/WorkflowInstanceEventDTO.java @@ -21,7 +21,6 @@ import cn.sliew.scaleph.common.dict.workflow.WorkflowInstanceEvent; import cn.sliew.scaleph.common.dict.workflow.WorkflowInstanceState; import cn.sliew.scaleph.workflow.queue.Event; -import cn.sliew.scaleph.workflow.service.dto.WorkflowInstanceDTO; import lombok.Getter; import java.util.Optional; @@ -34,7 +33,7 @@ public class WorkflowInstanceEventDTO implements Event { private final WorkflowInstanceState nextState; private final WorkflowInstanceEvent event; private final Long workflowInstanceId; - private final Optional exception; + private final Optional throwable; public WorkflowInstanceEventDTO(String topic, WorkflowInstanceState state, WorkflowInstanceState nextState, WorkflowInstanceEvent event, Long workflowInstanceId) { this.topic = topic; @@ -42,16 +41,16 @@ public WorkflowInstanceEventDTO(String topic, WorkflowInstanceState state, Workf this.nextState = nextState; this.event = event; this.workflowInstanceId = workflowInstanceId; - this.exception = Optional.empty(); + this.throwable = Optional.empty(); } - public WorkflowInstanceEventDTO(String topic, WorkflowInstanceState state, WorkflowInstanceState nextState, WorkflowInstanceEvent event, Long workflowInstanceId, Exception exception) { + public WorkflowInstanceEventDTO(String topic, WorkflowInstanceState state, WorkflowInstanceState nextState, WorkflowInstanceEvent event, Long workflowInstanceId, Throwable throwable) { this.topic = topic; this.state = state; this.nextState = nextState; this.event = event; this.workflowInstanceId = workflowInstanceId; - this.exception = Optional.ofNullable(exception); + this.throwable = Optional.ofNullable(throwable); } @Override diff --git a/scaleph-workflow/scaleph-workflow-api/src/main/java/cn/sliew/scaleph/workflow/statemachine/WorkflowInstanceStateMachine.java b/scaleph-workflow/scaleph-workflow-api/src/main/java/cn/sliew/scaleph/workflow/statemachine/WorkflowInstanceStateMachine.java index 36fbdef1f..72f791178 100644 --- a/scaleph-workflow/scaleph-workflow-api/src/main/java/cn/sliew/scaleph/workflow/statemachine/WorkflowInstanceStateMachine.java +++ b/scaleph-workflow/scaleph-workflow-api/src/main/java/cn/sliew/scaleph/workflow/statemachine/WorkflowInstanceStateMachine.java @@ -29,6 +29,7 @@ import com.alibaba.cola.statemachine.builder.StateMachineBuilder; import com.alibaba.cola.statemachine.builder.StateMachineBuilderFactory; import lombok.extern.slf4j.Slf4j; +import org.apache.commons.lang3.tuple.Pair; import org.springframework.beans.factory.InitializingBean; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component; @@ -58,12 +59,12 @@ public class WorkflowInstanceStateMachine implements InitializingBean { @Autowired private WorkflowInstanceFailureEventListener workflowInstanceFailureEventListener; - private StateMachine stateMachine; + private StateMachine> stateMachine; private Map> queueMap; @Override public void afterPropertiesSet() throws Exception { - StateMachineBuilder builder = StateMachineBuilderFactory.create(); + StateMachineBuilder> builder = StateMachineBuilderFactory.create(); builder.externalTransition() .from(WorkflowInstanceState.PENDING) @@ -131,11 +132,11 @@ public void afterPropertiesSet() throws Exception { queueMap.put(WorkflowInstanceEvent.PROCESS_FAILURE, failureQueue); } - private Action doPerform() { - return (fromState, toState, eventEnum, workflowInstanceId) -> { + private Action> doPerform() { + return (fromState, toState, eventEnum, pair) -> { Queue queue = queueMap.get(eventEnum); if (queue != null) { - queue.push(new WorkflowInstanceEventDTO(queue.getName(), fromState, toState, eventEnum, workflowInstanceId)); + queue.push(new WorkflowInstanceEventDTO(queue.getName(), fromState, toState, eventEnum, pair.getLeft(), pair.getRight())); } else { log.error("queue not found, event:{}", eventEnum.getValue()); } @@ -143,26 +144,26 @@ private Action doPerform() { } public void deploy(WorkflowInstanceDTO workflowInstanceDTO) { - stateMachine.fireEvent(workflowInstanceDTO.getState(), WorkflowInstanceEvent.COMMAND_DEPLOY, workflowInstanceDTO.getId()); + stateMachine.fireEvent(workflowInstanceDTO.getState(), WorkflowInstanceEvent.COMMAND_DEPLOY, Pair.of(workflowInstanceDTO.getId(), null)); } public void shutdown(WorkflowInstanceDTO workflowInstanceDTO) { - stateMachine.fireEvent(workflowInstanceDTO.getState(), WorkflowInstanceEvent.COMMAND_SHUTDOWN, workflowInstanceDTO.getId()); + stateMachine.fireEvent(workflowInstanceDTO.getState(), WorkflowInstanceEvent.COMMAND_SHUTDOWN, Pair.of(workflowInstanceDTO.getId(), null)); } public void suspend(WorkflowInstanceDTO workflowInstanceDTO) { - stateMachine.fireEvent(workflowInstanceDTO.getState(), WorkflowInstanceEvent.COMMAND_SUSPEND, workflowInstanceDTO.getId()); + stateMachine.fireEvent(workflowInstanceDTO.getState(), WorkflowInstanceEvent.COMMAND_SUSPEND, Pair.of(workflowInstanceDTO.getId(), null)); } public void resume(WorkflowInstanceDTO workflowInstanceDTO) { - stateMachine.fireEvent(workflowInstanceDTO.getState(), WorkflowInstanceEvent.COMMAND_RESUME, workflowInstanceDTO.getId()); + stateMachine.fireEvent(workflowInstanceDTO.getState(), WorkflowInstanceEvent.COMMAND_RESUME, Pair.of(workflowInstanceDTO.getId(), null)); } public void onSuccess(WorkflowInstanceDTO workflowInstanceDTO) { - stateMachine.fireEvent(workflowInstanceDTO.getState(), WorkflowInstanceEvent.PROCESS_SUCCESS, workflowInstanceDTO.getId()); + stateMachine.fireEvent(workflowInstanceDTO.getState(), WorkflowInstanceEvent.PROCESS_SUCCESS, Pair.of(workflowInstanceDTO.getId(), null)); } public void onFailure(WorkflowInstanceDTO workflowInstanceDTO, Throwable throwable) { - stateMachine.fireEvent(workflowInstanceDTO.getState(), WorkflowInstanceEvent.PROCESS_FAILURE, workflowInstanceDTO.getId()); + stateMachine.fireEvent(workflowInstanceDTO.getState(), WorkflowInstanceEvent.PROCESS_FAILURE, Pair.of(workflowInstanceDTO.getId(), throwable)); } } diff --git a/scaleph-workflow/scaleph-workflow-api/src/main/java/cn/sliew/scaleph/workflow/statemachine/WorkflowTaskInstanceStateMachine.java b/scaleph-workflow/scaleph-workflow-api/src/main/java/cn/sliew/scaleph/workflow/statemachine/WorkflowTaskInstanceStateMachine.java index 18b81a32e..0bdec75f4 100644 --- a/scaleph-workflow/scaleph-workflow-api/src/main/java/cn/sliew/scaleph/workflow/statemachine/WorkflowTaskInstanceStateMachine.java +++ b/scaleph-workflow/scaleph-workflow-api/src/main/java/cn/sliew/scaleph/workflow/statemachine/WorkflowTaskInstanceStateMachine.java @@ -32,12 +32,14 @@ import com.alibaba.cola.statemachine.builder.StateMachineBuilder; import com.alibaba.cola.statemachine.builder.StateMachineBuilderFactory; import lombok.extern.slf4j.Slf4j; +import org.apache.commons.lang3.tuple.Pair; import org.springframework.beans.factory.InitializingBean; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component; import java.util.HashMap; import java.util.Map; +import java.util.Optional; @Slf4j @Component @@ -55,13 +57,12 @@ public class WorkflowTaskInstanceStateMachine implements InitializingBean { @Autowired private WorkflowTaskInstanceFailureEventListener workflowTaskInstanceFailureEventListener; - - private StateMachine stateMachine; + private StateMachine> stateMachine; private Map> queueMap; @Override public void afterPropertiesSet() throws Exception { - StateMachineBuilder builder = StateMachineBuilderFactory.create(); + StateMachineBuilder> builder = StateMachineBuilderFactory.create(); builder.externalTransition() .from(WorkflowTaskInstanceStage.PENDING) @@ -117,11 +118,11 @@ public void afterPropertiesSet() throws Exception { queueMap.put(WorkflowTaskInstanceEvent.PROCESS_FAILURE, failureQueue); } - private Action doPerform() { - return (fromState, toState, eventEnum, workflowTaskInstanceId) -> { + private Action> doPerform() { + return (fromState, toState, eventEnum, pair) -> { Queue queue = queueMap.get(eventEnum); if (queue != null) { - queue.push(new WorkflowTaskInstanceEventDTO(queue.getName(), fromState, toState, eventEnum, workflowTaskInstanceId)); + queue.push(new WorkflowTaskInstanceEventDTO(queue.getName(), fromState, toState, eventEnum, pair.getLeft(), pair.getRight())); } else { log.error("queue not found, event: {}", eventEnum.getValue()); } @@ -129,26 +130,26 @@ private Action doPer } public void deploy(WorkflowTaskInstanceDTO workflowTaskInstanceDTO) { - stateMachine.fireEvent(workflowTaskInstanceDTO.getStage(), WorkflowTaskInstanceEvent.COMMAND_DEPLOY, workflowTaskInstanceDTO.getId()); + stateMachine.fireEvent(workflowTaskInstanceDTO.getStage(), WorkflowTaskInstanceEvent.COMMAND_DEPLOY, Pair.of(workflowTaskInstanceDTO.getId(), null)); } public void shutdown(WorkflowTaskInstanceDTO workflowTaskInstanceDTO) { - stateMachine.fireEvent(workflowTaskInstanceDTO.getStage(), WorkflowTaskInstanceEvent.COMMAND_SHUTDOWN, workflowTaskInstanceDTO.getId()); + stateMachine.fireEvent(workflowTaskInstanceDTO.getStage(), WorkflowTaskInstanceEvent.COMMAND_SHUTDOWN, Pair.of(workflowTaskInstanceDTO.getId(), null)); } public void suspend(WorkflowTaskInstanceDTO workflowTaskInstanceDTO) { - stateMachine.fireEvent(workflowTaskInstanceDTO.getStage(), WorkflowTaskInstanceEvent.COMMAND_SUSPEND, workflowTaskInstanceDTO.getId()); + stateMachine.fireEvent(workflowTaskInstanceDTO.getStage(), WorkflowTaskInstanceEvent.COMMAND_SUSPEND, Pair.of(workflowTaskInstanceDTO.getId(), null)); } public void resume(WorkflowTaskInstanceDTO workflowTaskInstanceDTO) { - stateMachine.fireEvent(workflowTaskInstanceDTO.getStage(), WorkflowTaskInstanceEvent.COMMAND_RESUME, workflowTaskInstanceDTO.getId()); + stateMachine.fireEvent(workflowTaskInstanceDTO.getStage(), WorkflowTaskInstanceEvent.COMMAND_RESUME, Pair.of(workflowTaskInstanceDTO.getId(), null)); } public void onSuccess(WorkflowTaskInstanceDTO workflowTaskInstanceDTO) { - stateMachine.fireEvent(workflowTaskInstanceDTO.getStage(), WorkflowTaskInstanceEvent.PROCESS_SUCCESS, workflowTaskInstanceDTO.getId()); + stateMachine.fireEvent(workflowTaskInstanceDTO.getStage(), WorkflowTaskInstanceEvent.PROCESS_SUCCESS, Pair.of(workflowTaskInstanceDTO.getId(), null)); } public void onFailure(WorkflowTaskInstanceDTO workflowTaskInstanceDTO, Throwable throwable) { - stateMachine.fireEvent(workflowTaskInstanceDTO.getStage(), WorkflowTaskInstanceEvent.PROCESS_FAILURE, workflowTaskInstanceDTO.getId()); + stateMachine.fireEvent(workflowTaskInstanceDTO.getStage(), WorkflowTaskInstanceEvent.PROCESS_FAILURE, Pair.of(workflowTaskInstanceDTO.getId(), throwable)); } }