Skip to content

Commit

Permalink
fix: workflow and task event fire and listen
Browse files Browse the repository at this point in the history
  • Loading branch information
wangqi committed Mar 29, 2024
1 parent 329a6bf commit 4d4aa58
Show file tree
Hide file tree
Showing 4 changed files with 29 additions and 29 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,6 @@ protected CompletableFuture handleEventAsync(WorkflowTaskInstanceEventDTO event)
CompletableFuture<?> future = executorService.submit(new DeployRunner(event)).toCompletableFuture();
future.whenCompleteAsync((unused, throwable) -> {
if (throwable != null) {
log.error("deploy workflow task instance error", throwable);
onFailure(event.getWorkflowTaskInstanceId(), throwable);
} else {
stateMachine.onSuccess(workflowTaskInstanceService.get(event.getWorkflowTaskInstanceId()));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@
import cn.sliew.scaleph.common.dict.workflow.WorkflowInstanceEvent;
import cn.sliew.scaleph.common.dict.workflow.WorkflowInstanceState;
import cn.sliew.scaleph.workflow.queue.Event;
import cn.sliew.scaleph.workflow.service.dto.WorkflowInstanceDTO;
import lombok.Getter;

import java.util.Optional;
Expand All @@ -34,24 +33,24 @@ public class WorkflowInstanceEventDTO implements Event {
private final WorkflowInstanceState nextState;
private final WorkflowInstanceEvent event;
private final Long workflowInstanceId;
private final Optional<Exception> exception;
private final Optional<Throwable> throwable;

public WorkflowInstanceEventDTO(String topic, WorkflowInstanceState state, WorkflowInstanceState nextState, WorkflowInstanceEvent event, Long workflowInstanceId) {
this.topic = topic;
this.state = state;
this.nextState = nextState;
this.event = event;
this.workflowInstanceId = workflowInstanceId;
this.exception = Optional.empty();
this.throwable = Optional.empty();
}

public WorkflowInstanceEventDTO(String topic, WorkflowInstanceState state, WorkflowInstanceState nextState, WorkflowInstanceEvent event, Long workflowInstanceId, Exception exception) {
public WorkflowInstanceEventDTO(String topic, WorkflowInstanceState state, WorkflowInstanceState nextState, WorkflowInstanceEvent event, Long workflowInstanceId, Throwable throwable) {
this.topic = topic;
this.state = state;
this.nextState = nextState;
this.event = event;
this.workflowInstanceId = workflowInstanceId;
this.exception = Optional.ofNullable(exception);
this.throwable = Optional.ofNullable(throwable);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import com.alibaba.cola.statemachine.builder.StateMachineBuilder;
import com.alibaba.cola.statemachine.builder.StateMachineBuilderFactory;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.tuple.Pair;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
Expand Down Expand Up @@ -58,12 +59,12 @@ public class WorkflowInstanceStateMachine implements InitializingBean {
@Autowired
private WorkflowInstanceFailureEventListener workflowInstanceFailureEventListener;

private StateMachine<WorkflowInstanceState, WorkflowInstanceEvent, Long> stateMachine;
private StateMachine<WorkflowInstanceState, WorkflowInstanceEvent, Pair<Long, Throwable>> stateMachine;
private Map<WorkflowInstanceEvent, Queue<WorkflowInstanceEventDTO>> queueMap;

@Override
public void afterPropertiesSet() throws Exception {
StateMachineBuilder<WorkflowInstanceState, WorkflowInstanceEvent, Long> builder = StateMachineBuilderFactory.create();
StateMachineBuilder<WorkflowInstanceState, WorkflowInstanceEvent, Pair<Long, Throwable>> builder = StateMachineBuilderFactory.create();

builder.externalTransition()
.from(WorkflowInstanceState.PENDING)
Expand Down Expand Up @@ -131,38 +132,38 @@ public void afterPropertiesSet() throws Exception {
queueMap.put(WorkflowInstanceEvent.PROCESS_FAILURE, failureQueue);
}

private Action<WorkflowInstanceState, WorkflowInstanceEvent, Long> doPerform() {
return (fromState, toState, eventEnum, workflowInstanceId) -> {
private Action<WorkflowInstanceState, WorkflowInstanceEvent, Pair<Long, Throwable>> doPerform() {
return (fromState, toState, eventEnum, pair) -> {
Queue<WorkflowInstanceEventDTO> queue = queueMap.get(eventEnum);
if (queue != null) {
queue.push(new WorkflowInstanceEventDTO(queue.getName(), fromState, toState, eventEnum, workflowInstanceId));
queue.push(new WorkflowInstanceEventDTO(queue.getName(), fromState, toState, eventEnum, pair.getLeft(), pair.getRight()));
} else {
log.error("queue not found, event:{}", eventEnum.getValue());
}
};
}

public void deploy(WorkflowInstanceDTO workflowInstanceDTO) {
stateMachine.fireEvent(workflowInstanceDTO.getState(), WorkflowInstanceEvent.COMMAND_DEPLOY, workflowInstanceDTO.getId());
stateMachine.fireEvent(workflowInstanceDTO.getState(), WorkflowInstanceEvent.COMMAND_DEPLOY, Pair.of(workflowInstanceDTO.getId(), null));
}

public void shutdown(WorkflowInstanceDTO workflowInstanceDTO) {
stateMachine.fireEvent(workflowInstanceDTO.getState(), WorkflowInstanceEvent.COMMAND_SHUTDOWN, workflowInstanceDTO.getId());
stateMachine.fireEvent(workflowInstanceDTO.getState(), WorkflowInstanceEvent.COMMAND_SHUTDOWN, Pair.of(workflowInstanceDTO.getId(), null));
}

public void suspend(WorkflowInstanceDTO workflowInstanceDTO) {
stateMachine.fireEvent(workflowInstanceDTO.getState(), WorkflowInstanceEvent.COMMAND_SUSPEND, workflowInstanceDTO.getId());
stateMachine.fireEvent(workflowInstanceDTO.getState(), WorkflowInstanceEvent.COMMAND_SUSPEND, Pair.of(workflowInstanceDTO.getId(), null));
}

public void resume(WorkflowInstanceDTO workflowInstanceDTO) {
stateMachine.fireEvent(workflowInstanceDTO.getState(), WorkflowInstanceEvent.COMMAND_RESUME, workflowInstanceDTO.getId());
stateMachine.fireEvent(workflowInstanceDTO.getState(), WorkflowInstanceEvent.COMMAND_RESUME, Pair.of(workflowInstanceDTO.getId(), null));
}

public void onSuccess(WorkflowInstanceDTO workflowInstanceDTO) {
stateMachine.fireEvent(workflowInstanceDTO.getState(), WorkflowInstanceEvent.PROCESS_SUCCESS, workflowInstanceDTO.getId());
stateMachine.fireEvent(workflowInstanceDTO.getState(), WorkflowInstanceEvent.PROCESS_SUCCESS, Pair.of(workflowInstanceDTO.getId(), null));
}

public void onFailure(WorkflowInstanceDTO workflowInstanceDTO, Throwable throwable) {
stateMachine.fireEvent(workflowInstanceDTO.getState(), WorkflowInstanceEvent.PROCESS_FAILURE, workflowInstanceDTO.getId());
stateMachine.fireEvent(workflowInstanceDTO.getState(), WorkflowInstanceEvent.PROCESS_FAILURE, Pair.of(workflowInstanceDTO.getId(), throwable));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -32,12 +32,14 @@
import com.alibaba.cola.statemachine.builder.StateMachineBuilder;
import com.alibaba.cola.statemachine.builder.StateMachineBuilderFactory;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.tuple.Pair;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

import java.util.HashMap;
import java.util.Map;
import java.util.Optional;

@Slf4j
@Component
Expand All @@ -55,13 +57,12 @@ public class WorkflowTaskInstanceStateMachine implements InitializingBean {
@Autowired
private WorkflowTaskInstanceFailureEventListener workflowTaskInstanceFailureEventListener;


private StateMachine<WorkflowTaskInstanceStage, WorkflowTaskInstanceEvent, Long> stateMachine;
private StateMachine<WorkflowTaskInstanceStage, WorkflowTaskInstanceEvent, Pair<Long, Throwable>> stateMachine;
private Map<WorkflowTaskInstanceEvent, Queue<WorkflowTaskInstanceEventDTO>> queueMap;

@Override
public void afterPropertiesSet() throws Exception {
StateMachineBuilder<WorkflowTaskInstanceStage, WorkflowTaskInstanceEvent, Long> builder = StateMachineBuilderFactory.create();
StateMachineBuilder<WorkflowTaskInstanceStage, WorkflowTaskInstanceEvent, Pair<Long, Throwable>> builder = StateMachineBuilderFactory.create();

builder.externalTransition()
.from(WorkflowTaskInstanceStage.PENDING)
Expand Down Expand Up @@ -117,38 +118,38 @@ public void afterPropertiesSet() throws Exception {
queueMap.put(WorkflowTaskInstanceEvent.PROCESS_FAILURE, failureQueue);
}

private Action<WorkflowTaskInstanceStage, WorkflowTaskInstanceEvent, Long> doPerform() {
return (fromState, toState, eventEnum, workflowTaskInstanceId) -> {
private Action<WorkflowTaskInstanceStage, WorkflowTaskInstanceEvent, Pair<Long, Throwable>> doPerform() {
return (fromState, toState, eventEnum, pair) -> {
Queue<WorkflowTaskInstanceEventDTO> queue = queueMap.get(eventEnum);
if (queue != null) {
queue.push(new WorkflowTaskInstanceEventDTO(queue.getName(), fromState, toState, eventEnum, workflowTaskInstanceId));
queue.push(new WorkflowTaskInstanceEventDTO(queue.getName(), fromState, toState, eventEnum, pair.getLeft(), pair.getRight()));
} else {
log.error("queue not found, event: {}", eventEnum.getValue());
}
};
}

public void deploy(WorkflowTaskInstanceDTO workflowTaskInstanceDTO) {
stateMachine.fireEvent(workflowTaskInstanceDTO.getStage(), WorkflowTaskInstanceEvent.COMMAND_DEPLOY, workflowTaskInstanceDTO.getId());
stateMachine.fireEvent(workflowTaskInstanceDTO.getStage(), WorkflowTaskInstanceEvent.COMMAND_DEPLOY, Pair.of(workflowTaskInstanceDTO.getId(), null));
}

public void shutdown(WorkflowTaskInstanceDTO workflowTaskInstanceDTO) {
stateMachine.fireEvent(workflowTaskInstanceDTO.getStage(), WorkflowTaskInstanceEvent.COMMAND_SHUTDOWN, workflowTaskInstanceDTO.getId());
stateMachine.fireEvent(workflowTaskInstanceDTO.getStage(), WorkflowTaskInstanceEvent.COMMAND_SHUTDOWN, Pair.of(workflowTaskInstanceDTO.getId(), null));
}

public void suspend(WorkflowTaskInstanceDTO workflowTaskInstanceDTO) {
stateMachine.fireEvent(workflowTaskInstanceDTO.getStage(), WorkflowTaskInstanceEvent.COMMAND_SUSPEND, workflowTaskInstanceDTO.getId());
stateMachine.fireEvent(workflowTaskInstanceDTO.getStage(), WorkflowTaskInstanceEvent.COMMAND_SUSPEND, Pair.of(workflowTaskInstanceDTO.getId(), null));
}

public void resume(WorkflowTaskInstanceDTO workflowTaskInstanceDTO) {
stateMachine.fireEvent(workflowTaskInstanceDTO.getStage(), WorkflowTaskInstanceEvent.COMMAND_RESUME, workflowTaskInstanceDTO.getId());
stateMachine.fireEvent(workflowTaskInstanceDTO.getStage(), WorkflowTaskInstanceEvent.COMMAND_RESUME, Pair.of(workflowTaskInstanceDTO.getId(), null));
}

public void onSuccess(WorkflowTaskInstanceDTO workflowTaskInstanceDTO) {
stateMachine.fireEvent(workflowTaskInstanceDTO.getStage(), WorkflowTaskInstanceEvent.PROCESS_SUCCESS, workflowTaskInstanceDTO.getId());
stateMachine.fireEvent(workflowTaskInstanceDTO.getStage(), WorkflowTaskInstanceEvent.PROCESS_SUCCESS, Pair.of(workflowTaskInstanceDTO.getId(), null));
}

public void onFailure(WorkflowTaskInstanceDTO workflowTaskInstanceDTO, Throwable throwable) {
stateMachine.fireEvent(workflowTaskInstanceDTO.getStage(), WorkflowTaskInstanceEvent.PROCESS_FAILURE, workflowTaskInstanceDTO.getId());
stateMachine.fireEvent(workflowTaskInstanceDTO.getStage(), WorkflowTaskInstanceEvent.PROCESS_FAILURE, Pair.of(workflowTaskInstanceDTO.getId(), throwable));
}
}

0 comments on commit 4d4aa58

Please sign in to comment.