Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Feature][scaleph-workflow] update scaleph workflow service #700

Merged
merged 9 commits into from
Mar 29, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package cn.sliew.scaleph.api.config;

import org.redisson.config.Config;
import org.redisson.spring.starter.RedissonAutoConfiguration;
import org.redisson.spring.starter.RedissonAutoConfigurationCustomizer;
import org.springframework.boot.autoconfigure.AutoConfigureBefore;
import org.springframework.context.annotation.Configuration;

@Configuration
@AutoConfigureBefore(RedissonAutoConfiguration.class)
public class RedissionConfig implements RedissonAutoConfigurationCustomizer {

@Override
public void customize(Config config) {

}
}
2 changes: 1 addition & 1 deletion scaleph-api/src/main/resources/application.conf
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
pekko {
log-config-on-start = on
log-config-on-start = off
actor {
provider = "cluster"
default-dispatcher {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -159,7 +159,7 @@ private static List<NetworkInterface> getNetworkInterfaces() throws SocketExcept
continue;
}
networkInterfaces.add(ni);
log.info("Valid network interface: index = {}, name = {}, mac = {}",
log.debug("Valid network interface: index = {}, name = {}, mac = {}",
ni.getIndex(), ni.getName(), bytesToMac(ni.getHardwareAddress()));
}
if (networkInterfaces.isEmpty()) {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package cn.sliew.scaleph.workflow.listener.taskinstance;

import cn.sliew.scaleph.workflow.service.WorkflowTaskInstanceService;
import cn.sliew.scaleph.workflow.statemachine.WorkflowTaskInstanceStateMachine;
import lombok.extern.slf4j.Slf4j;
import org.redisson.api.RScheduledExecutorService;
import org.redisson.api.RedissonClient;
import org.redisson.api.WorkerOptions;
import org.springframework.beans.BeansException;
import org.springframework.beans.factory.BeanFactory;
import org.springframework.beans.factory.BeanFactoryAware;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.beans.factory.annotation.Autowired;

import java.util.concurrent.CompletableFuture;

@Slf4j
public abstract class AbstractWorkflowTaskInstanceEventListener implements WorkflowTaskInstanceEventListener, InitializingBean, BeanFactoryAware {

private BeanFactory beanFactory;
protected RScheduledExecutorService executorService;

@Autowired
protected WorkflowTaskInstanceService workflowTaskInstanceService;
@Autowired
protected WorkflowTaskInstanceStateMachine stateMachine;
@Autowired
private RedissonClient redissonClient;

@Override
public void setBeanFactory(BeanFactory beanFactory) throws BeansException {
this.beanFactory = beanFactory;
}

@Override
public void afterPropertiesSet() throws Exception {
executorService = redissonClient.getExecutorService(WorkflowTaskInstanceStateMachine.EXECUTOR);
executorService.registerWorkers(WorkerOptions.defaults().workers(20).beanFactory(beanFactory));
}

@Override
public void onEvent(WorkflowTaskInstanceEventDTO event) {
try {
handleEventAsync(event);
} catch (Throwable throwable) {
onFailure(event.getWorkflowTaskInstanceId(), throwable);
}
}

protected void onFailure(Long workflowTaskInstanceId, Throwable throwable) {
stateMachine.onFailure(workflowTaskInstanceService.get(workflowTaskInstanceId), throwable);
}

protected abstract CompletableFuture handleEventAsync(WorkflowTaskInstanceEventDTO event);
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package cn.sliew.scaleph.workflow.listener.taskinstance;

import cn.sliew.scaleph.workflow.service.WorkflowTaskInstanceService;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.RandomUtils;
import org.redisson.api.annotation.RInject;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

import java.io.Serializable;
import java.util.concurrent.CompletableFuture;

@Slf4j
@Component
public class WorkflowTaskInstanceDeployEventListener extends AbstractWorkflowTaskInstanceEventListener {

@Override
protected CompletableFuture handleEventAsync(WorkflowTaskInstanceEventDTO event) {
CompletableFuture<?> future = executorService.submit(new DeployRunner(event)).toCompletableFuture();
future.whenCompleteAsync((unused, throwable) -> {
if (throwable != null) {
onFailure(event.getWorkflowTaskInstanceId(), throwable);
} else {
stateMachine.onSuccess(workflowTaskInstanceService.get(event.getWorkflowTaskInstanceId()));
}
});
return future;
}

public static class DeployRunner implements Runnable, Serializable {

private WorkflowTaskInstanceEventDTO event;

@RInject
private String taskId;
@Autowired
private WorkflowTaskInstanceService workflowTaskInstanceService;

public DeployRunner(WorkflowTaskInstanceEventDTO event) {
this.event = event;
}

@Override
public void run() {
workflowTaskInstanceService.updateState(event.getWorkflowTaskInstanceId(), event.getState(), event.getNextState(), null);
if (RandomUtils.nextInt(0, 100) > 30) {
throw new RuntimeException("部署失败");
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@
import cn.sliew.scaleph.common.dict.workflow.WorkflowTaskInstanceEvent;
import cn.sliew.scaleph.common.dict.workflow.WorkflowTaskInstanceStage;
import cn.sliew.scaleph.workflow.queue.Event;
import cn.sliew.scaleph.workflow.service.dto.WorkflowTaskInstanceDTO;
import lombok.Getter;

import java.util.Optional;
Expand All @@ -33,25 +32,25 @@ public class WorkflowTaskInstanceEventDTO implements Event {
private final WorkflowTaskInstanceStage state;
private final WorkflowTaskInstanceStage nextState;
private final WorkflowTaskInstanceEvent event;
private final WorkflowTaskInstanceDTO workflowTaskInstanceDTO;
private final Optional<Exception> exception;
private final Long workflowTaskInstanceId;
private final Optional<Throwable> throwable;

public WorkflowTaskInstanceEventDTO(String topic, WorkflowTaskInstanceStage state, WorkflowTaskInstanceStage nextState, WorkflowTaskInstanceEvent event, WorkflowTaskInstanceDTO workflowTaskInstanceDTO) {
public WorkflowTaskInstanceEventDTO(String topic, WorkflowTaskInstanceStage state, WorkflowTaskInstanceStage nextState, WorkflowTaskInstanceEvent event, Long workflowTaskInstanceId) {
this.topic = topic;
this.state = state;
this.nextState = nextState;
this.event = event;
this.workflowTaskInstanceDTO = workflowTaskInstanceDTO;
this.exception = Optional.empty();
this.workflowTaskInstanceId = workflowTaskInstanceId;
this.throwable = Optional.empty();
}

public WorkflowTaskInstanceEventDTO(String topic, WorkflowTaskInstanceStage state, WorkflowTaskInstanceStage nextState, WorkflowTaskInstanceEvent event, WorkflowTaskInstanceDTO workflowTaskInstanceDTO, Exception exception) {
public WorkflowTaskInstanceEventDTO(String topic, WorkflowTaskInstanceStage state, WorkflowTaskInstanceStage nextState, WorkflowTaskInstanceEvent event, Long workflowTaskInstanceId, Throwable throwable) {
this.topic = topic;
this.state = state;
this.nextState = nextState;
this.event = event;
this.workflowTaskInstanceDTO = workflowTaskInstanceDTO;
this.exception = Optional.ofNullable(exception);
this.workflowTaskInstanceId = workflowTaskInstanceId;
this.throwable = Optional.ofNullable(throwable);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,8 @@

package cn.sliew.scaleph.workflow.listener.taskinstance;

public interface WorkflowTaskInstanceEventListener {
import cn.sliew.scaleph.workflow.queue.EventListener;

public interface WorkflowTaskInstanceEventListener extends EventListener<WorkflowTaskInstanceEventDTO> {

void run(WorkflowTaskInstanceEventDTO eventDTO);
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package cn.sliew.scaleph.workflow.listener.taskinstance;

import cn.sliew.scaleph.workflow.service.WorkflowTaskInstanceService;
import lombok.extern.slf4j.Slf4j;
import org.redisson.api.annotation.RInject;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

import java.io.Serializable;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;

@Slf4j
@Component
public class WorkflowTaskInstanceFailureEventListener extends AbstractWorkflowTaskInstanceEventListener {

@Override
protected CompletableFuture handleEventAsync(WorkflowTaskInstanceEventDTO event) {
return executorService.submit(new FailureRunner(event.getWorkflowTaskInstanceId(), event.getThrowable())).toCompletableFuture();
}

public static class FailureRunner implements Runnable, Serializable {

private Long workflowTaskInstanceId;
private Optional<Throwable> throwable;

@RInject
private String taskId;
@Autowired
private WorkflowTaskInstanceService workflowTaskInstanceService;

public FailureRunner(Long workflowTaskInstanceId, Optional<Throwable> throwable) {
this.workflowTaskInstanceId = workflowTaskInstanceId;
this.throwable = throwable;
}

@Override
public void run() {
workflowTaskInstanceService.updateFailure(workflowTaskInstanceId, throwable.orElse(null));
log.info("执行子任务失败啦, workflowTaskInstanceId: {}, taskId: {}", workflowTaskInstanceId, taskId, throwable);
}
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package cn.sliew.scaleph.workflow.listener.taskinstance;

import cn.sliew.scaleph.workflow.service.WorkflowTaskInstanceService;
import lombok.extern.slf4j.Slf4j;
import org.redisson.api.annotation.RInject;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

import java.io.Serializable;
import java.util.concurrent.CompletableFuture;

@Slf4j
@Component
public class WorkflowTaskInstanceSuccessEventListener extends AbstractWorkflowTaskInstanceEventListener {

@Override
protected CompletableFuture handleEventAsync(WorkflowTaskInstanceEventDTO event) {
return executorService.submit(new SuccessRunner(event.getWorkflowTaskInstanceId())).toCompletableFuture();
}

public static class SuccessRunner implements Runnable, Serializable {

private Long workflowTaskInstanceId;

@RInject
private String taskId;
@Autowired
private WorkflowTaskInstanceService workflowTaskInstanceService;

public SuccessRunner(Long workflowTaskInstanceId) {
this.workflowTaskInstanceId = workflowTaskInstanceId;
}

@Override
public void run() {
workflowTaskInstanceService.updateSuccess(workflowTaskInstanceId);
log.info("执行子任务成功啦, workflowTaskInstanceId: {}, taskId: {}", workflowTaskInstanceId, taskId);
}
}

}
Loading
Loading