diff --git a/scaleph-api/src/main/java/cn/sliew/scaleph/api/config/RedissionConfig.java b/scaleph-api/src/main/java/cn/sliew/scaleph/api/config/RedissionConfig.java new file mode 100644 index 000000000..b30db0cbe --- /dev/null +++ b/scaleph-api/src/main/java/cn/sliew/scaleph/api/config/RedissionConfig.java @@ -0,0 +1,35 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package cn.sliew.scaleph.api.config; + +import org.redisson.config.Config; +import org.redisson.spring.starter.RedissonAutoConfiguration; +import org.redisson.spring.starter.RedissonAutoConfigurationCustomizer; +import org.springframework.boot.autoconfigure.AutoConfigureBefore; +import org.springframework.context.annotation.Configuration; + +@Configuration +@AutoConfigureBefore(RedissonAutoConfiguration.class) +public class RedissionConfig implements RedissonAutoConfigurationCustomizer { + + @Override + public void customize(Config config) { + + } +} diff --git a/scaleph-api/src/main/resources/application.conf b/scaleph-api/src/main/resources/application.conf index c8d839f55..356042d10 100644 --- a/scaleph-api/src/main/resources/application.conf +++ b/scaleph-api/src/main/resources/application.conf @@ -1,5 +1,5 @@ pekko { - log-config-on-start = on + log-config-on-start = off actor { provider = "cluster" default-dispatcher { diff --git a/scaleph-support/scaleph-system/src/main/java/cn/sliew/scaleph/system/snowflake/utils/NetUtils.java b/scaleph-support/scaleph-system/src/main/java/cn/sliew/scaleph/system/snowflake/utils/NetUtils.java index 0d751eddd..613e0b307 100644 --- a/scaleph-support/scaleph-system/src/main/java/cn/sliew/scaleph/system/snowflake/utils/NetUtils.java +++ b/scaleph-support/scaleph-system/src/main/java/cn/sliew/scaleph/system/snowflake/utils/NetUtils.java @@ -159,7 +159,7 @@ private static List getNetworkInterfaces() throws SocketExcept continue; } networkInterfaces.add(ni); - log.info("Valid network interface: index = {}, name = {}, mac = {}", + log.debug("Valid network interface: index = {}, name = {}, mac = {}", ni.getIndex(), ni.getName(), bytesToMac(ni.getHardwareAddress())); } if (networkInterfaces.isEmpty()) { diff --git a/scaleph-workflow/scaleph-workflow-api/src/main/java/cn/sliew/scaleph/workflow/listener/taskinstance/AbstractWorkflowTaskInstanceEventListener.java b/scaleph-workflow/scaleph-workflow-api/src/main/java/cn/sliew/scaleph/workflow/listener/taskinstance/AbstractWorkflowTaskInstanceEventListener.java new file mode 100644 index 000000000..69e2e0d6d --- /dev/null +++ b/scaleph-workflow/scaleph-workflow-api/src/main/java/cn/sliew/scaleph/workflow/listener/taskinstance/AbstractWorkflowTaskInstanceEventListener.java @@ -0,0 +1,73 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package cn.sliew.scaleph.workflow.listener.taskinstance; + +import cn.sliew.scaleph.workflow.service.WorkflowTaskInstanceService; +import cn.sliew.scaleph.workflow.statemachine.WorkflowTaskInstanceStateMachine; +import lombok.extern.slf4j.Slf4j; +import org.redisson.api.RScheduledExecutorService; +import org.redisson.api.RedissonClient; +import org.redisson.api.WorkerOptions; +import org.springframework.beans.BeansException; +import org.springframework.beans.factory.BeanFactory; +import org.springframework.beans.factory.BeanFactoryAware; +import org.springframework.beans.factory.InitializingBean; +import org.springframework.beans.factory.annotation.Autowired; + +import java.util.concurrent.CompletableFuture; + +@Slf4j +public abstract class AbstractWorkflowTaskInstanceEventListener implements WorkflowTaskInstanceEventListener, InitializingBean, BeanFactoryAware { + + private BeanFactory beanFactory; + protected RScheduledExecutorService executorService; + + @Autowired + protected WorkflowTaskInstanceService workflowTaskInstanceService; + @Autowired + protected WorkflowTaskInstanceStateMachine stateMachine; + @Autowired + private RedissonClient redissonClient; + + @Override + public void setBeanFactory(BeanFactory beanFactory) throws BeansException { + this.beanFactory = beanFactory; + } + + @Override + public void afterPropertiesSet() throws Exception { + executorService = redissonClient.getExecutorService(WorkflowTaskInstanceStateMachine.EXECUTOR); + executorService.registerWorkers(WorkerOptions.defaults().workers(20).beanFactory(beanFactory)); + } + + @Override + public void onEvent(WorkflowTaskInstanceEventDTO event) { + try { + handleEventAsync(event); + } catch (Throwable throwable) { + onFailure(event.getWorkflowTaskInstanceId(), throwable); + } + } + + protected void onFailure(Long workflowTaskInstanceId, Throwable throwable) { + stateMachine.onFailure(workflowTaskInstanceService.get(workflowTaskInstanceId), throwable); + } + + protected abstract CompletableFuture handleEventAsync(WorkflowTaskInstanceEventDTO event); +} diff --git a/scaleph-workflow/scaleph-workflow-api/src/main/java/cn/sliew/scaleph/workflow/listener/taskinstance/WorkflowTaskInstanceDeployEventListener.java b/scaleph-workflow/scaleph-workflow-api/src/main/java/cn/sliew/scaleph/workflow/listener/taskinstance/WorkflowTaskInstanceDeployEventListener.java new file mode 100644 index 000000000..e7a6032f6 --- /dev/null +++ b/scaleph-workflow/scaleph-workflow-api/src/main/java/cn/sliew/scaleph/workflow/listener/taskinstance/WorkflowTaskInstanceDeployEventListener.java @@ -0,0 +1,69 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package cn.sliew.scaleph.workflow.listener.taskinstance; + +import cn.sliew.scaleph.workflow.service.WorkflowTaskInstanceService; +import lombok.extern.slf4j.Slf4j; +import org.apache.commons.lang3.RandomUtils; +import org.redisson.api.annotation.RInject; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.stereotype.Component; + +import java.io.Serializable; +import java.util.concurrent.CompletableFuture; + +@Slf4j +@Component +public class WorkflowTaskInstanceDeployEventListener extends AbstractWorkflowTaskInstanceEventListener { + + @Override + protected CompletableFuture handleEventAsync(WorkflowTaskInstanceEventDTO event) { + CompletableFuture future = executorService.submit(new DeployRunner(event)).toCompletableFuture(); + future.whenCompleteAsync((unused, throwable) -> { + if (throwable != null) { + onFailure(event.getWorkflowTaskInstanceId(), throwable); + } else { + stateMachine.onSuccess(workflowTaskInstanceService.get(event.getWorkflowTaskInstanceId())); + } + }); + return future; + } + + public static class DeployRunner implements Runnable, Serializable { + + private WorkflowTaskInstanceEventDTO event; + + @RInject + private String taskId; + @Autowired + private WorkflowTaskInstanceService workflowTaskInstanceService; + + public DeployRunner(WorkflowTaskInstanceEventDTO event) { + this.event = event; + } + + @Override + public void run() { + workflowTaskInstanceService.updateState(event.getWorkflowTaskInstanceId(), event.getState(), event.getNextState(), null); + if (RandomUtils.nextInt(0, 100) > 30) { + throw new RuntimeException("部署失败"); + } + } + } +} diff --git a/scaleph-workflow/scaleph-workflow-api/src/main/java/cn/sliew/scaleph/workflow/listener/taskinstance/WorkflowTaskInstanceEventDTO.java b/scaleph-workflow/scaleph-workflow-api/src/main/java/cn/sliew/scaleph/workflow/listener/taskinstance/WorkflowTaskInstanceEventDTO.java index ade4350c5..ad189418f 100644 --- a/scaleph-workflow/scaleph-workflow-api/src/main/java/cn/sliew/scaleph/workflow/listener/taskinstance/WorkflowTaskInstanceEventDTO.java +++ b/scaleph-workflow/scaleph-workflow-api/src/main/java/cn/sliew/scaleph/workflow/listener/taskinstance/WorkflowTaskInstanceEventDTO.java @@ -21,7 +21,6 @@ import cn.sliew.scaleph.common.dict.workflow.WorkflowTaskInstanceEvent; import cn.sliew.scaleph.common.dict.workflow.WorkflowTaskInstanceStage; import cn.sliew.scaleph.workflow.queue.Event; -import cn.sliew.scaleph.workflow.service.dto.WorkflowTaskInstanceDTO; import lombok.Getter; import java.util.Optional; @@ -33,25 +32,25 @@ public class WorkflowTaskInstanceEventDTO implements Event { private final WorkflowTaskInstanceStage state; private final WorkflowTaskInstanceStage nextState; private final WorkflowTaskInstanceEvent event; - private final WorkflowTaskInstanceDTO workflowTaskInstanceDTO; - private final Optional exception; + private final Long workflowTaskInstanceId; + private final Optional throwable; - public WorkflowTaskInstanceEventDTO(String topic, WorkflowTaskInstanceStage state, WorkflowTaskInstanceStage nextState, WorkflowTaskInstanceEvent event, WorkflowTaskInstanceDTO workflowTaskInstanceDTO) { + public WorkflowTaskInstanceEventDTO(String topic, WorkflowTaskInstanceStage state, WorkflowTaskInstanceStage nextState, WorkflowTaskInstanceEvent event, Long workflowTaskInstanceId) { this.topic = topic; this.state = state; this.nextState = nextState; this.event = event; - this.workflowTaskInstanceDTO = workflowTaskInstanceDTO; - this.exception = Optional.empty(); + this.workflowTaskInstanceId = workflowTaskInstanceId; + this.throwable = Optional.empty(); } - public WorkflowTaskInstanceEventDTO(String topic, WorkflowTaskInstanceStage state, WorkflowTaskInstanceStage nextState, WorkflowTaskInstanceEvent event, WorkflowTaskInstanceDTO workflowTaskInstanceDTO, Exception exception) { + public WorkflowTaskInstanceEventDTO(String topic, WorkflowTaskInstanceStage state, WorkflowTaskInstanceStage nextState, WorkflowTaskInstanceEvent event, Long workflowTaskInstanceId, Throwable throwable) { this.topic = topic; this.state = state; this.nextState = nextState; this.event = event; - this.workflowTaskInstanceDTO = workflowTaskInstanceDTO; - this.exception = Optional.ofNullable(exception); + this.workflowTaskInstanceId = workflowTaskInstanceId; + this.throwable = Optional.ofNullable(throwable); } @Override diff --git a/scaleph-workflow/scaleph-workflow-api/src/main/java/cn/sliew/scaleph/workflow/listener/taskinstance/WorkflowTaskInstanceEventListener.java b/scaleph-workflow/scaleph-workflow-api/src/main/java/cn/sliew/scaleph/workflow/listener/taskinstance/WorkflowTaskInstanceEventListener.java index bc0c171a6..fdbeabf2c 100644 --- a/scaleph-workflow/scaleph-workflow-api/src/main/java/cn/sliew/scaleph/workflow/listener/taskinstance/WorkflowTaskInstanceEventListener.java +++ b/scaleph-workflow/scaleph-workflow-api/src/main/java/cn/sliew/scaleph/workflow/listener/taskinstance/WorkflowTaskInstanceEventListener.java @@ -18,7 +18,8 @@ package cn.sliew.scaleph.workflow.listener.taskinstance; -public interface WorkflowTaskInstanceEventListener { +import cn.sliew.scaleph.workflow.queue.EventListener; + +public interface WorkflowTaskInstanceEventListener extends EventListener { - void run(WorkflowTaskInstanceEventDTO eventDTO); } diff --git a/scaleph-workflow/scaleph-workflow-api/src/main/java/cn/sliew/scaleph/workflow/listener/taskinstance/WorkflowTaskInstanceFailureEventListener.java b/scaleph-workflow/scaleph-workflow-api/src/main/java/cn/sliew/scaleph/workflow/listener/taskinstance/WorkflowTaskInstanceFailureEventListener.java new file mode 100644 index 000000000..155a13f13 --- /dev/null +++ b/scaleph-workflow/scaleph-workflow-api/src/main/java/cn/sliew/scaleph/workflow/listener/taskinstance/WorkflowTaskInstanceFailureEventListener.java @@ -0,0 +1,62 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package cn.sliew.scaleph.workflow.listener.taskinstance; + +import cn.sliew.scaleph.workflow.service.WorkflowTaskInstanceService; +import lombok.extern.slf4j.Slf4j; +import org.redisson.api.annotation.RInject; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.stereotype.Component; + +import java.io.Serializable; +import java.util.Optional; +import java.util.concurrent.CompletableFuture; + +@Slf4j +@Component +public class WorkflowTaskInstanceFailureEventListener extends AbstractWorkflowTaskInstanceEventListener { + + @Override + protected CompletableFuture handleEventAsync(WorkflowTaskInstanceEventDTO event) { + return executorService.submit(new FailureRunner(event.getWorkflowTaskInstanceId(), event.getThrowable())).toCompletableFuture(); + } + + public static class FailureRunner implements Runnable, Serializable { + + private Long workflowTaskInstanceId; + private Optional throwable; + + @RInject + private String taskId; + @Autowired + private WorkflowTaskInstanceService workflowTaskInstanceService; + + public FailureRunner(Long workflowTaskInstanceId, Optional throwable) { + this.workflowTaskInstanceId = workflowTaskInstanceId; + this.throwable = throwable; + } + + @Override + public void run() { + workflowTaskInstanceService.updateFailure(workflowTaskInstanceId, throwable.orElse(null)); + log.info("执行子任务失败啦, workflowTaskInstanceId: {}, taskId: {}", workflowTaskInstanceId, taskId, throwable); + } + } + +} diff --git a/scaleph-workflow/scaleph-workflow-api/src/main/java/cn/sliew/scaleph/workflow/listener/taskinstance/WorkflowTaskInstanceSuccessEventListener.java b/scaleph-workflow/scaleph-workflow-api/src/main/java/cn/sliew/scaleph/workflow/listener/taskinstance/WorkflowTaskInstanceSuccessEventListener.java new file mode 100644 index 000000000..ba6b80065 --- /dev/null +++ b/scaleph-workflow/scaleph-workflow-api/src/main/java/cn/sliew/scaleph/workflow/listener/taskinstance/WorkflowTaskInstanceSuccessEventListener.java @@ -0,0 +1,59 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package cn.sliew.scaleph.workflow.listener.taskinstance; + +import cn.sliew.scaleph.workflow.service.WorkflowTaskInstanceService; +import lombok.extern.slf4j.Slf4j; +import org.redisson.api.annotation.RInject; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.stereotype.Component; + +import java.io.Serializable; +import java.util.concurrent.CompletableFuture; + +@Slf4j +@Component +public class WorkflowTaskInstanceSuccessEventListener extends AbstractWorkflowTaskInstanceEventListener { + + @Override + protected CompletableFuture handleEventAsync(WorkflowTaskInstanceEventDTO event) { + return executorService.submit(new SuccessRunner(event.getWorkflowTaskInstanceId())).toCompletableFuture(); + } + + public static class SuccessRunner implements Runnable, Serializable { + + private Long workflowTaskInstanceId; + + @RInject + private String taskId; + @Autowired + private WorkflowTaskInstanceService workflowTaskInstanceService; + + public SuccessRunner(Long workflowTaskInstanceId) { + this.workflowTaskInstanceId = workflowTaskInstanceId; + } + + @Override + public void run() { + workflowTaskInstanceService.updateSuccess(workflowTaskInstanceId); + log.info("执行子任务成功啦, workflowTaskInstanceId: {}, taskId: {}", workflowTaskInstanceId, taskId); + } + } + +} diff --git a/scaleph-workflow/scaleph-workflow-api/src/main/java/cn/sliew/scaleph/workflow/listener/workflowinstance/AbstractWorkflowInstanceEventListener.java b/scaleph-workflow/scaleph-workflow-api/src/main/java/cn/sliew/scaleph/workflow/listener/workflowinstance/AbstractWorkflowInstanceEventListener.java new file mode 100644 index 000000000..8b95598fb --- /dev/null +++ b/scaleph-workflow/scaleph-workflow-api/src/main/java/cn/sliew/scaleph/workflow/listener/workflowinstance/AbstractWorkflowInstanceEventListener.java @@ -0,0 +1,81 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package cn.sliew.scaleph.workflow.listener.workflowinstance; + +import cn.sliew.milky.common.util.JacksonUtil; +import cn.sliew.scaleph.workflow.service.WorkflowInstanceService; +import cn.sliew.scaleph.workflow.statemachine.WorkflowInstanceStateMachine; +import lombok.extern.slf4j.Slf4j; +import org.redisson.api.RScheduledExecutorService; +import org.redisson.api.RedissonClient; +import org.redisson.api.WorkerOptions; +import org.springframework.beans.BeansException; +import org.springframework.beans.factory.BeanFactory; +import org.springframework.beans.factory.BeanFactoryAware; +import org.springframework.beans.factory.InitializingBean; +import org.springframework.beans.factory.annotation.Autowired; + +import java.util.concurrent.CompletableFuture; + +@Slf4j +public abstract class AbstractWorkflowInstanceEventListener implements WorkflowInstanceEventListener, InitializingBean, BeanFactoryAware { + + private BeanFactory beanFactory; + protected RScheduledExecutorService executorService; + + @Autowired + protected WorkflowInstanceService workflowInstanceService; + @Autowired + private WorkflowInstanceStateMachine stateMachine; + @Autowired + private RedissonClient redissonClient; + + @Override + public void setBeanFactory(BeanFactory beanFactory) throws BeansException { + this.beanFactory = beanFactory; + } + + @Override + public void afterPropertiesSet() throws Exception { + executorService = redissonClient.getExecutorService(WorkflowInstanceStateMachine.EXECUTOR); + executorService.registerWorkers(WorkerOptions.defaults().workers(20).beanFactory(beanFactory)); + } + + @Override + public void onEvent(WorkflowInstanceEventDTO event) { + try { + CompletableFuture future = handleEventAsync(event.getWorkflowInstanceId()); + future.whenCompleteAsync((unused, throwable) -> { + if (throwable != null) { + onFailure(event.getWorkflowInstanceId(), throwable); + } else { + workflowInstanceService.updateState(event.getWorkflowInstanceId(), event.getState(), event.getNextState(), null); + } + }); + } catch (Throwable throwable) { + onFailure(event.getWorkflowInstanceId(), throwable); + } + } + + protected void onFailure(Long workflowInstanceId, Throwable throwable) { + stateMachine.onFailure(workflowInstanceService.get(workflowInstanceId), throwable); + } + + protected abstract CompletableFuture handleEventAsync(Long workflowInstanceId); +} diff --git a/scaleph-workflow/scaleph-workflow-api/src/main/java/cn/sliew/scaleph/workflow/listener/workflowinstance/WorkflowInstanceDeployEventListener.java b/scaleph-workflow/scaleph-workflow-api/src/main/java/cn/sliew/scaleph/workflow/listener/workflowinstance/WorkflowInstanceDeployEventListener.java index 6efc00bef..72bed240d 100644 --- a/scaleph-workflow/scaleph-workflow-api/src/main/java/cn/sliew/scaleph/workflow/listener/workflowinstance/WorkflowInstanceDeployEventListener.java +++ b/scaleph-workflow/scaleph-workflow-api/src/main/java/cn/sliew/scaleph/workflow/listener/workflowinstance/WorkflowInstanceDeployEventListener.java @@ -18,68 +18,67 @@ package cn.sliew.scaleph.workflow.listener.workflowinstance; -import cn.sliew.scaleph.workflow.service.WorkflowInstanceService; import cn.sliew.scaleph.workflow.service.WorkflowTaskDefinitionService; import cn.sliew.scaleph.workflow.service.WorkflowTaskInstanceService; import cn.sliew.scaleph.workflow.service.dto.WorkflowDefinitionDTO; import cn.sliew.scaleph.workflow.service.dto.WorkflowInstanceDTO; import cn.sliew.scaleph.workflow.service.dto.WorkflowTaskDefinitionDTO; -import cn.sliew.scaleph.workflow.service.dto.WorkflowTaskInstanceDTO; -import cn.sliew.scaleph.workflow.statemachine.WorkflowInstanceStateMachine; import lombok.extern.slf4j.Slf4j; import org.redisson.api.RExecutorFuture; -import org.redisson.api.RScheduledExecutorService; -import org.redisson.api.RedissonClient; +import org.redisson.api.annotation.RInject; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component; +import java.io.Serializable; import java.util.ArrayList; import java.util.List; +import java.util.concurrent.CompletableFuture; @Slf4j @Component -public class WorkflowInstanceDeployEventListener implements WorkflowInstanceEventListener { +public class WorkflowInstanceDeployEventListener extends AbstractWorkflowInstanceEventListener { - @Autowired - private WorkflowInstanceService workflowInstanceService; @Autowired private WorkflowTaskDefinitionService workflowTaskDefinitionService; - @Autowired - private WorkflowTaskInstanceService workflowTaskInstanceService; - @Autowired - private WorkflowInstanceStateMachine stateMachine; - @Autowired - private RedissonClient redissonClient; @Override - public void onEvent(WorkflowInstanceEventDTO event) { - WorkflowInstanceDTO workflowInstanceDTO = event.getWorkflowInstanceDTO(); + protected CompletableFuture handleEventAsync(Long workflowInstanceId) { + WorkflowInstanceDTO workflowInstanceDTO = workflowInstanceService.get(workflowInstanceId); WorkflowDefinitionDTO workflowDefinitionDTO = workflowInstanceDTO.getWorkflowDefinition(); - try { - // fixme 获取所有 task 的执行结果,执行成功,则发送执行成功事件,否则发送执行失败事件 - doDeploy(workflowDefinitionDTO); - onSuccess(workflowInstanceDTO.getId()); - } catch (Exception e) { - onFailure(workflowInstanceDTO.getId(), e); - } + return doDeploy(workflowDefinitionDTO); } - private void doDeploy(WorkflowDefinitionDTO workflowDefinitionDTO) { - RScheduledExecutorService executorService = redissonClient.getExecutorService("WorkflowTaskInstanceDeploy"); + private CompletableFuture doDeploy(WorkflowDefinitionDTO workflowDefinitionDTO) { List workflowTaskDefinitionDTOS = workflowTaskDefinitionService.list(workflowDefinitionDTO.getId()); - List> futures = new ArrayList<>(workflowTaskDefinitionDTOS.size()); + // todo 应该是找到 root 节点,批量启动 root 节点 + List futures = new ArrayList<>(workflowTaskDefinitionDTOS.size()); for (WorkflowTaskDefinitionDTO workflowTaskDefinitionDTO : workflowTaskDefinitionDTOS) { - RExecutorFuture future = executorService.submit(() -> workflowTaskInstanceService.deploy(workflowTaskDefinitionDTO.getId())); - futures.add(future); - // todo 全部执行完毕,在发送下一步信息 + RExecutorFuture future = executorService.submit(new DeployRunner(workflowTaskDefinitionDTO.getId())); + // RExecutorFuture -> CompletableFuture 类型强转无法监听异步任务执行结果。需转成 CompletableFuture + futures.add(future.toCompletableFuture()); } + return CompletableFuture.allOf(futures.toArray(new CompletableFuture[futures.size()])); } - private void onFailure(Long workflowInstanceId, Exception e) { - stateMachine.onFailure(workflowInstanceService.get(workflowInstanceId), e); - } + /** + * 必须实现 Serializable 接口,无法使用 lambda + */ + public static class DeployRunner implements Runnable, Serializable { - private void onSuccess(Long workflowInstanceId) { - stateMachine.onSuccess(workflowInstanceService.get(workflowInstanceId)); + private Long workflowTaskDefinitionId; + + @RInject + private String taskId; + @Autowired + private WorkflowTaskInstanceService workflowTaskInstanceService; + + public DeployRunner(Long workflowTaskDefinitionId) { + this.workflowTaskDefinitionId = workflowTaskDefinitionId; + } + + @Override + public void run() { + workflowTaskInstanceService.deploy(workflowTaskDefinitionId); + } } } diff --git a/scaleph-workflow/scaleph-workflow-api/src/main/java/cn/sliew/scaleph/workflow/listener/workflowinstance/WorkflowInstanceEventDTO.java b/scaleph-workflow/scaleph-workflow-api/src/main/java/cn/sliew/scaleph/workflow/listener/workflowinstance/WorkflowInstanceEventDTO.java index f9b35ff1f..0cc80725a 100644 --- a/scaleph-workflow/scaleph-workflow-api/src/main/java/cn/sliew/scaleph/workflow/listener/workflowinstance/WorkflowInstanceEventDTO.java +++ b/scaleph-workflow/scaleph-workflow-api/src/main/java/cn/sliew/scaleph/workflow/listener/workflowinstance/WorkflowInstanceEventDTO.java @@ -21,7 +21,6 @@ import cn.sliew.scaleph.common.dict.workflow.WorkflowInstanceEvent; import cn.sliew.scaleph.common.dict.workflow.WorkflowInstanceState; import cn.sliew.scaleph.workflow.queue.Event; -import cn.sliew.scaleph.workflow.service.dto.WorkflowInstanceDTO; import lombok.Getter; import java.util.Optional; @@ -33,25 +32,25 @@ public class WorkflowInstanceEventDTO implements Event { private final WorkflowInstanceState state; private final WorkflowInstanceState nextState; private final WorkflowInstanceEvent event; - private final WorkflowInstanceDTO workflowInstanceDTO; - private final Optional exception; + private final Long workflowInstanceId; + private final Optional throwable; - public WorkflowInstanceEventDTO(String topic, WorkflowInstanceState state, WorkflowInstanceState nextState, WorkflowInstanceEvent event, WorkflowInstanceDTO workflowInstanceDTO) { + public WorkflowInstanceEventDTO(String topic, WorkflowInstanceState state, WorkflowInstanceState nextState, WorkflowInstanceEvent event, Long workflowInstanceId) { this.topic = topic; this.state = state; this.nextState = nextState; this.event = event; - this.workflowInstanceDTO = workflowInstanceDTO; - this.exception = Optional.empty(); + this.workflowInstanceId = workflowInstanceId; + this.throwable = Optional.empty(); } - public WorkflowInstanceEventDTO(String topic, WorkflowInstanceState state, WorkflowInstanceState nextState, WorkflowInstanceEvent event, WorkflowInstanceDTO workflowInstanceDTO, Exception exception) { + public WorkflowInstanceEventDTO(String topic, WorkflowInstanceState state, WorkflowInstanceState nextState, WorkflowInstanceEvent event, Long workflowInstanceId, Throwable throwable) { this.topic = topic; this.state = state; this.nextState = nextState; this.event = event; - this.workflowInstanceDTO = workflowInstanceDTO; - this.exception = Optional.ofNullable(exception); + this.workflowInstanceId = workflowInstanceId; + this.throwable = Optional.ofNullable(throwable); } @Override diff --git a/scaleph-workflow/scaleph-workflow-api/src/main/java/cn/sliew/scaleph/workflow/service/WorkflowInstanceService.java b/scaleph-workflow/scaleph-workflow-api/src/main/java/cn/sliew/scaleph/workflow/service/WorkflowInstanceService.java index dc18239a4..7a9d4d4da 100644 --- a/scaleph-workflow/scaleph-workflow-api/src/main/java/cn/sliew/scaleph/workflow/service/WorkflowInstanceService.java +++ b/scaleph-workflow/scaleph-workflow-api/src/main/java/cn/sliew/scaleph/workflow/service/WorkflowInstanceService.java @@ -18,6 +18,7 @@ package cn.sliew.scaleph.workflow.service; +import cn.sliew.scaleph.common.dict.workflow.WorkflowInstanceState; import cn.sliew.scaleph.workflow.service.dto.WorkflowInstanceDTO; import cn.sliew.scaleph.workflow.service.param.WorkflowInstanceListParam; import com.baomidou.mybatisplus.extension.plugins.pagination.Page; @@ -28,6 +29,8 @@ public interface WorkflowInstanceService { WorkflowInstanceDTO get(Long id); + void updateState(Long id, WorkflowInstanceState state, WorkflowInstanceState nextState, String message); + WorkflowInstanceDTO deploy(Long workflowDefinitionId); void shutdown(Long id); diff --git a/scaleph-workflow/scaleph-workflow-api/src/main/java/cn/sliew/scaleph/workflow/service/WorkflowTaskInstanceService.java b/scaleph-workflow/scaleph-workflow-api/src/main/java/cn/sliew/scaleph/workflow/service/WorkflowTaskInstanceService.java index 6f8813c97..1ebd1f790 100644 --- a/scaleph-workflow/scaleph-workflow-api/src/main/java/cn/sliew/scaleph/workflow/service/WorkflowTaskInstanceService.java +++ b/scaleph-workflow/scaleph-workflow-api/src/main/java/cn/sliew/scaleph/workflow/service/WorkflowTaskInstanceService.java @@ -18,7 +18,7 @@ package cn.sliew.scaleph.workflow.service; -import cn.sliew.scaleph.workflow.service.dto.WorkflowInstanceDTO; +import cn.sliew.scaleph.common.dict.workflow.WorkflowTaskInstanceStage; import cn.sliew.scaleph.workflow.service.dto.WorkflowTaskInstanceDTO; import cn.sliew.scaleph.workflow.service.param.WorkflowTaskInstanceListParam; import com.baomidou.mybatisplus.extension.plugins.pagination.Page; @@ -29,6 +29,12 @@ public interface WorkflowTaskInstanceService { WorkflowTaskInstanceDTO get(Long id); + void updateState(Long id, WorkflowTaskInstanceStage stage, WorkflowTaskInstanceStage nextStage, String message); + + void updateSuccess(Long id); + + void updateFailure(Long id, Throwable throwable); + WorkflowTaskInstanceDTO deploy(Long workflowTaskDefinitionId); void shutdown(Long id); diff --git a/scaleph-workflow/scaleph-workflow-api/src/main/java/cn/sliew/scaleph/workflow/service/impl/WorkflowInstanceServiceImpl.java b/scaleph-workflow/scaleph-workflow-api/src/main/java/cn/sliew/scaleph/workflow/service/impl/WorkflowInstanceServiceImpl.java index 8f066a689..0636336c5 100644 --- a/scaleph-workflow/scaleph-workflow-api/src/main/java/cn/sliew/scaleph/workflow/service/impl/WorkflowInstanceServiceImpl.java +++ b/scaleph-workflow/scaleph-workflow-api/src/main/java/cn/sliew/scaleph/workflow/service/impl/WorkflowInstanceServiceImpl.java @@ -27,6 +27,8 @@ import cn.sliew.scaleph.workflow.service.dto.WorkflowInstanceDTO; import cn.sliew.scaleph.workflow.service.param.WorkflowInstanceListParam; import cn.sliew.scaleph.workflow.statemachine.WorkflowInstanceStateMachine; +import com.baomidou.mybatisplus.core.conditions.update.LambdaUpdateWrapper; +import com.baomidou.mybatisplus.core.toolkit.Wrappers; import com.baomidou.mybatisplus.extension.plugins.pagination.Page; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Service; @@ -60,6 +62,17 @@ public WorkflowInstanceDTO get(Long id) { return WorkflowInstanceVOConvert.INSTANCE.toDto(vo); } + @Override + public void updateState(Long id, WorkflowInstanceState state, WorkflowInstanceState nextState, String message) { + LambdaUpdateWrapper updateWrapper = Wrappers.lambdaUpdate(WorkflowInstance.class) + .eq(WorkflowInstance::getId, id) + .eq(WorkflowInstance::getState, state); + WorkflowInstance record = new WorkflowInstance(); + record.setState(nextState); + record.setMessage(message); + workflowInstanceMapper.update(record, updateWrapper); + } + @Override public WorkflowInstanceDTO deploy(Long workflowDefinitionId) { WorkflowInstance record = new WorkflowInstance(); @@ -72,19 +85,16 @@ public WorkflowInstanceDTO deploy(Long workflowDefinitionId) { @Override public void shutdown(Long id) { - WorkflowInstanceDTO workflowInstanceDTO = get(id); - stateMachine.shutdown(workflowInstanceDTO); + stateMachine.shutdown(get(id)); } @Override public void suspend(Long id) { - WorkflowInstanceDTO workflowInstanceDTO = get(id); - stateMachine.suspend(workflowInstanceDTO); + stateMachine.suspend(get(id)); } @Override public void resume(Long id) { - WorkflowInstanceDTO workflowInstanceDTO = get(id); - stateMachine.resume(workflowInstanceDTO); + stateMachine.resume(get(id)); } } diff --git a/scaleph-workflow/scaleph-workflow-api/src/main/java/cn/sliew/scaleph/workflow/service/impl/WorkflowTaskInstanceServiceImpl.java b/scaleph-workflow/scaleph-workflow-api/src/main/java/cn/sliew/scaleph/workflow/service/impl/WorkflowTaskInstanceServiceImpl.java index 49edf1300..4bda9ba56 100644 --- a/scaleph-workflow/scaleph-workflow-api/src/main/java/cn/sliew/scaleph/workflow/service/impl/WorkflowTaskInstanceServiceImpl.java +++ b/scaleph-workflow/scaleph-workflow-api/src/main/java/cn/sliew/scaleph/workflow/service/impl/WorkflowTaskInstanceServiceImpl.java @@ -22,16 +22,18 @@ import cn.sliew.scaleph.dao.entity.master.workflow.WorkflowTaskInstance; import cn.sliew.scaleph.dao.entity.master.workflow.WorkflowTaskInstanceVO; import cn.sliew.scaleph.dao.mapper.master.workflow.WorkflowTaskInstanceMapper; -import cn.sliew.scaleph.workflow.service.WorkflowInstanceService; import cn.sliew.scaleph.workflow.service.WorkflowTaskInstanceService; import cn.sliew.scaleph.workflow.service.convert.WorkflowTaskInstanceVOConvert; import cn.sliew.scaleph.workflow.service.dto.WorkflowTaskInstanceDTO; import cn.sliew.scaleph.workflow.service.param.WorkflowTaskInstanceListParam; import cn.sliew.scaleph.workflow.statemachine.WorkflowTaskInstanceStateMachine; +import com.baomidou.mybatisplus.core.conditions.update.LambdaUpdateWrapper; +import com.baomidou.mybatisplus.core.toolkit.Wrappers; import com.baomidou.mybatisplus.extension.plugins.pagination.Page; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Service; +import java.util.Date; import java.util.List; import static cn.sliew.milky.common.check.Ensures.checkState; @@ -39,8 +41,6 @@ @Service public class WorkflowTaskInstanceServiceImpl implements WorkflowTaskInstanceService { - @Autowired - private WorkflowInstanceService workflowInstanceService; @Autowired private WorkflowTaskInstanceMapper workflowTaskInstanceMapper; @Autowired @@ -63,6 +63,38 @@ public WorkflowTaskInstanceDTO get(Long id) { return WorkflowTaskInstanceVOConvert.INSTANCE.toDto(vo); } + @Override + public void updateState(Long id, WorkflowTaskInstanceStage stage, WorkflowTaskInstanceStage nextStage, String message) { + LambdaUpdateWrapper updateWrapper = Wrappers.lambdaUpdate(WorkflowTaskInstance.class) + .eq(WorkflowTaskInstance::getId, id) + .eq(WorkflowTaskInstance::getStage, stage); + WorkflowTaskInstance record = new WorkflowTaskInstance(); + record.setStage(nextStage); + record.setMessage(message); + workflowTaskInstanceMapper.update(record, updateWrapper); + } + + @Override + public void updateSuccess(Long id) { + WorkflowTaskInstance record = new WorkflowTaskInstance(); + record.setId(id); + record.setStage(WorkflowTaskInstanceStage.SUCCESS); + record.setEndTime(new Date()); + workflowTaskInstanceMapper.updateById(record); + } + + @Override + public void updateFailure(Long id, Throwable throwable) { + WorkflowTaskInstance record = new WorkflowTaskInstance(); + record.setId(id); + record.setStage(WorkflowTaskInstanceStage.FAILURE); + record.setEndTime(new Date()); + if (throwable != null) { + record.setMessage(throwable.getMessage()); + } + workflowTaskInstanceMapper.updateById(record); + } + @Override public WorkflowTaskInstanceDTO deploy(Long workflowTaskDefinitionId) { WorkflowTaskInstance record = new WorkflowTaskInstance(); @@ -75,19 +107,16 @@ public WorkflowTaskInstanceDTO deploy(Long workflowTaskDefinitionId) { @Override public void shutdown(Long id) { - WorkflowTaskInstanceDTO workflowTaskInstanceDTO = get(id); - stateMachine.shutdown(workflowTaskInstanceDTO); + stateMachine.shutdown(get(id)); } @Override public void suspend(Long id) { - WorkflowTaskInstanceDTO workflowTaskInstanceDTO = get(id); - stateMachine.suspend(workflowTaskInstanceDTO); + stateMachine.suspend(get(id)); } @Override public void resume(Long id) { - WorkflowTaskInstanceDTO workflowTaskInstanceDTO = get(id); - stateMachine.resume(workflowTaskInstanceDTO); + stateMachine.resume(get(id)); } } diff --git a/scaleph-workflow/scaleph-workflow-api/src/main/java/cn/sliew/scaleph/workflow/statemachine/WorkflowInstanceStateMachine.java b/scaleph-workflow/scaleph-workflow-api/src/main/java/cn/sliew/scaleph/workflow/statemachine/WorkflowInstanceStateMachine.java index a2e329c86..72f791178 100644 --- a/scaleph-workflow/scaleph-workflow-api/src/main/java/cn/sliew/scaleph/workflow/statemachine/WorkflowInstanceStateMachine.java +++ b/scaleph-workflow/scaleph-workflow-api/src/main/java/cn/sliew/scaleph/workflow/statemachine/WorkflowInstanceStateMachine.java @@ -29,6 +29,7 @@ import com.alibaba.cola.statemachine.builder.StateMachineBuilder; import com.alibaba.cola.statemachine.builder.StateMachineBuilderFactory; import lombok.extern.slf4j.Slf4j; +import org.apache.commons.lang3.tuple.Pair; import org.springframework.beans.factory.InitializingBean; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component; @@ -40,6 +41,9 @@ @Component public class WorkflowInstanceStateMachine implements InitializingBean { + public static final String CONSUMER_GROUP = "WorkflowInstanceStateMachine"; + public static final String EXECUTOR = "WorkflowInstanceExecute"; + @Autowired private QueueFactory queueFactory; @Autowired @@ -55,12 +59,12 @@ public class WorkflowInstanceStateMachine implements InitializingBean { @Autowired private WorkflowInstanceFailureEventListener workflowInstanceFailureEventListener; - private StateMachine stateMachine; + private StateMachine> stateMachine; private Map> queueMap; @Override public void afterPropertiesSet() throws Exception { - StateMachineBuilder builder = StateMachineBuilderFactory.create(); + StateMachineBuilder> builder = StateMachineBuilderFactory.create(); builder.externalTransition() .from(WorkflowInstanceState.PENDING) @@ -100,39 +104,39 @@ public void afterPropertiesSet() throws Exception { .on(WorkflowInstanceEvent.COMMAND_SHUTDOWN) .perform(doPerform()); - this.stateMachine = builder.build("WorkflowInstanceStateMachine"); + this.stateMachine = builder.build(CONSUMER_GROUP); this.queueMap = new HashMap<>(); - Queue deployQueue = queueFactory.newInstance(WorkflowInstanceEvent.COMMAND_DEPLOY.getValue()); - deployQueue.register("WorkflowInstanceStateMachine", workflowInstanceDeployEventListener); + Queue deployQueue = queueFactory.newInstance("WorkflowInstanceEvent#" + WorkflowInstanceEvent.COMMAND_DEPLOY.getValue()); + deployQueue.register(CONSUMER_GROUP, workflowInstanceDeployEventListener); queueMap.put(WorkflowInstanceEvent.COMMAND_DEPLOY, deployQueue); - Queue shutDownQueue = queueFactory.newInstance(WorkflowInstanceEvent.COMMAND_SHUTDOWN.getValue()); - shutDownQueue.register("WorkflowInstanceStateMachine", workflowInstanceShutdownEventListener); + Queue shutDownQueue = queueFactory.newInstance("WorkflowInstanceEvent#" + WorkflowInstanceEvent.COMMAND_SHUTDOWN.getValue()); + shutDownQueue.register(CONSUMER_GROUP, workflowInstanceShutdownEventListener); queueMap.put(WorkflowInstanceEvent.COMMAND_SHUTDOWN, shutDownQueue); - Queue suspendQueue = queueFactory.newInstance(WorkflowInstanceEvent.COMMAND_SUSPEND.getValue()); - suspendQueue.register("WorkflowInstanceStateMachine", workflowInstanceSuspendEventListener); + Queue suspendQueue = queueFactory.newInstance("WorkflowInstanceEvent#" + WorkflowInstanceEvent.COMMAND_SUSPEND.getValue()); + suspendQueue.register(CONSUMER_GROUP, workflowInstanceSuspendEventListener); queueMap.put(WorkflowInstanceEvent.COMMAND_SUSPEND, suspendQueue); - Queue resumeQueue = queueFactory.newInstance(WorkflowInstanceEvent.COMMAND_RESUME.getValue()); - resumeQueue.register("WorkflowInstanceStateMachine", workflowInstanceResumeEventListener); + Queue resumeQueue = queueFactory.newInstance("WorkflowInstanceEvent#" + WorkflowInstanceEvent.COMMAND_RESUME.getValue()); + resumeQueue.register(CONSUMER_GROUP, workflowInstanceResumeEventListener); queueMap.put(WorkflowInstanceEvent.COMMAND_RESUME, resumeQueue); - Queue successQueue = queueFactory.newInstance(WorkflowInstanceEvent.PROCESS_SUCCESS.getValue()); - successQueue.register("WorkflowInstanceStateMachine", workflowInstanceSuccessEventListener); + Queue successQueue = queueFactory.newInstance("WorkflowInstanceEvent#" + WorkflowInstanceEvent.PROCESS_SUCCESS.getValue()); + successQueue.register(CONSUMER_GROUP, workflowInstanceSuccessEventListener); queueMap.put(WorkflowInstanceEvent.PROCESS_SUCCESS, successQueue); - Queue failureQueue = queueFactory.newInstance(WorkflowInstanceEvent.PROCESS_FAILURE.getValue()); - failureQueue.register("WorkflowInstanceStateMachine", workflowInstanceFailureEventListener); + Queue failureQueue = queueFactory.newInstance("WorkflowInstanceEvent#" + WorkflowInstanceEvent.PROCESS_FAILURE.getValue()); + failureQueue.register(CONSUMER_GROUP, workflowInstanceFailureEventListener); queueMap.put(WorkflowInstanceEvent.PROCESS_FAILURE, failureQueue); } - private Action doPerform() { - return (fromState, toState, eventEnum, workflowInstanceDTO) -> { + private Action> doPerform() { + return (fromState, toState, eventEnum, pair) -> { Queue queue = queueMap.get(eventEnum); if (queue != null) { - queue.push(new WorkflowInstanceEventDTO(eventEnum.getValue(), fromState, toState, eventEnum, workflowInstanceDTO)); + queue.push(new WorkflowInstanceEventDTO(queue.getName(), fromState, toState, eventEnum, pair.getLeft(), pair.getRight())); } else { log.error("queue not found, event:{}", eventEnum.getValue()); } @@ -140,26 +144,26 @@ private Action stateMachine; + private StateMachine> stateMachine; private Map> queueMap; @Override public void afterPropertiesSet() throws Exception { - StateMachineBuilder builder = StateMachineBuilderFactory.create(); + StateMachineBuilder> builder = StateMachineBuilderFactory.create(); builder.externalTransition() .from(WorkflowTaskInstanceStage.PENDING) @@ -88,15 +102,27 @@ public void afterPropertiesSet() throws Exception { .on(WorkflowTaskInstanceEvent.COMMAND_SHUTDOWN) .perform(doPerform()); - this.stateMachine = builder.build("WorkflowTaskInstanceStateMachine"); + this.stateMachine = builder.build(CONSUMER_GROUP); this.queueMap = new HashMap<>(); + + Queue deployQueue = queueFactory.newInstance("WorkflowTaskInstanceEvent#" + WorkflowTaskInstanceEvent.COMMAND_DEPLOY.getValue()); + deployQueue.register(CONSUMER_GROUP, workflowTaskInstanceDeployEventListener); + queueMap.put(WorkflowTaskInstanceEvent.COMMAND_DEPLOY, deployQueue); + + Queue successQueue = queueFactory.newInstance("WorkflowTaskInstanceEvent#" + WorkflowTaskInstanceEvent.PROCESS_SUCCESS.getValue()); + successQueue.register(CONSUMER_GROUP, workflowTaskInstanceSuccessEventListener); + queueMap.put(WorkflowTaskInstanceEvent.PROCESS_SUCCESS, successQueue); + + Queue failureQueue = queueFactory.newInstance("WorkflowTaskInstanceEvent#" + WorkflowTaskInstanceEvent.PROCESS_FAILURE.getValue()); + failureQueue.register(CONSUMER_GROUP, workflowTaskInstanceFailureEventListener); + queueMap.put(WorkflowTaskInstanceEvent.PROCESS_FAILURE, failureQueue); } - private Action doPerform() { - return (fromState, toState, eventEnum, workflowTaskInstanceDTO) -> { + private Action> doPerform() { + return (fromState, toState, eventEnum, pair) -> { Queue queue = queueMap.get(eventEnum); if (queue != null) { - queue.push(new WorkflowTaskInstanceEventDTO(eventEnum.getValue(), fromState, toState, eventEnum, workflowTaskInstanceDTO)); + queue.push(new WorkflowTaskInstanceEventDTO(queue.getName(), fromState, toState, eventEnum, pair.getLeft(), pair.getRight())); } else { log.error("queue not found, event: {}", eventEnum.getValue()); } @@ -104,18 +130,26 @@ private Action