Skip to content

Commit

Permalink
[Feature][scaleph-workspace-seatunnel] upgrade flink、flink kubernetes…
Browse files Browse the repository at this point in the history
… operator and seatunnel (#703)

* feature: add workflow task change event listener

* feature: add workflow task change event listener

* fix: seatunnel connectors plugins lib directory error

* feature: upgrade seatunnel and flink versions

* feature: upgrade seatunnel and flink versions

* feature: upgrade seatunnel and flink versions

* feature: upgrade flink kubernetes operator spec

* fix: mkdir directory failed

* fix: seatunnel plugin_config error

---------

Co-authored-by: wangqi <[email protected]>
  • Loading branch information
kalencaya and wangqi authored Apr 1, 2024
1 parent c0e97aa commit 8fc72a7
Show file tree
Hide file tree
Showing 27 changed files with 277 additions and 45 deletions.
4 changes: 2 additions & 2 deletions .github/workflows/release-manual-docker-seatunnel.yml
Original file line number Diff line number Diff line change
Expand Up @@ -29,10 +29,10 @@ on:
flinkVersion:
description: 'flink version'
required: true
default: '1.15'
default: '1.16'
type: choice
options:
- 1.15
- 1.16
env:
HUB: ghcr.io/flowerfine/scaleph-seatunnel
SEATUNNEL_VERSION: ${{ inputs.seatunnelVersion }}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,8 @@ public enum FlinkVersion implements DictInstance {

V_1_18_0("1.18.0", "1.18.0"),
V_1_18_1("1.18.1", "1.18.1"),

V_1_19_0("1.19.0", "1.19.0"),
;

@JsonCreator
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,12 +29,10 @@
public enum OperatorFlinkVersion implements DictInstance {

v1_15("v1_15", "v1_15"),

v1_16("v1_16", "v1_16"),

v1_17("v1_17", "v1_17"),

v1_18("v1_18", "v1_18"),
v1_19("v1_19", "v1_19"),
;

@JsonCreator
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,8 +32,9 @@ public enum WorkflowInstanceEvent implements DictInstance {
COMMAND_SHUTDOWN("1", "COMMAND_SHUTDOWN"),
COMMAND_SUSPEND("2", "COMMAND_SUSPEND"),
COMMAND_RESUME("3", "COMMAND_RESUME"),
PROCESS_SUCCESS("4", "PROCESS_SUCCESS"),
PROCESS_FAILURE("5", "PROCESS_FAILURE"),
PROCESS_TASK_CHANGE("4", "PROCESS_TASK_CHANGE"),
PROCESS_SUCCESS("5", "PROCESS_SUCCESS"),
PROCESS_FAILURE("6", "PROCESS_FAILURE"),
;

@JsonCreator
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,7 @@ public class FlinkDeploymentSpec extends AbstractFlinkSpec {
/**
* Base pod template for job and task manager pods. Can be overridden by the jobManager and
* taskManager pod templates.
* fixme change to PodTemplateSpec
*/
private Pod podTemplate;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,11 +17,13 @@

package cn.sliew.scaleph.engine.flink.kubernetes.operator.spec;

import io.fabric8.kubernetes.api.model.networking.v1.IngressTLS;
import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
import lombok.NoArgsConstructor;

import java.util.List;
import java.util.Map;

/**
Expand All @@ -47,4 +49,14 @@ public class IngressSpec {
* Ingress annotations.
*/
private Map<String, String> annotations;

/**
* Ingress labels.
*/
private Map<String, String> labels;

/**
* Ingress tls.
*/
private List<IngressTLS> tls;
}
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,13 @@ public class JobSpec implements Diffable<JobSpec> {
*/
private Boolean allowNonRestoredState;

/**
* Nonce used to trigger a full redeployment of the job from the savepoint path specified in
* initialSavepointPath. In order to trigger redeployment, change the number to a different
* non-null value. Rollback is not possible after redeployment.
*/
private Long savepointRedeployNonce;

@Override
public DiffResult<JobSpec> diff(JobSpec right) {
ReflectionDiffBuilder builder = new ReflectionDiffBuilder(this, right, ToStringStyle.DEFAULT_STYLE);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,9 @@ public abstract class CommonStatus<SPEC extends AbstractFlinkSpec> {
*/
private String error;

/** Last observed generation of the FlinkDeployment/FlinkSessionJob. */
private Long observedGeneration;

/**
* Lifecycle state of the Flink resource (including being rolled back, failed etc.).
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,5 +62,5 @@ public class JobStatus {
/**
* Information about pending and last checkpoint for the job.
*/
private Object checkpointInfo = new Object();
private CheckpointInfo checkpointInfo = new CheckpointInfo();
}
Original file line number Diff line number Diff line change
Expand Up @@ -33,11 +33,13 @@ public enum FlinkImageMapping {
JAR_1_16(FlinkJobType.JAR, OperatorFlinkVersion.v1_16, FlinkVersionMapping.V_1_16, "flink:1.16"),
JAR_1_17(FlinkJobType.JAR, OperatorFlinkVersion.v1_17, FlinkVersionMapping.V_1_17, "flink:1.17"),
JAR_1_18(FlinkJobType.JAR, OperatorFlinkVersion.v1_18, FlinkVersionMapping.V_1_18, "flink:1.18"),
JAR_1_19(FlinkJobType.JAR, OperatorFlinkVersion.v1_19, FlinkVersionMapping.V_1_19, "flink:1.19"),

SQL_1_17(FlinkJobType.SQL, OperatorFlinkVersion.v1_17, FlinkVersionMapping.V_1_17, "ghcr.io/flowerfine/scaleph-sql-template:1.17"),
SQL_1_18(FlinkJobType.SQL, OperatorFlinkVersion.v1_18, FlinkVersionMapping.V_1_18, "ghcr.io/flowerfine/scaleph-sql-template:1.18"),

SEATUNNEL_1_15(FlinkJobType.SEATUNNEL, OperatorFlinkVersion.v1_15, FlinkVersionMapping.V_1_15, "ghcr.io/flowerfine/scaleph-seatunnel:2.3.4-flink-1.15"),
SEATUNNEL_1_16(FlinkJobType.SEATUNNEL, OperatorFlinkVersion.v1_16, FlinkVersionMapping.V_1_16, "ghcr.io/flowerfine/scaleph-seatunnel:2.3.4-flink-1.16"),
FLINK_CDC_1_18(FlinkJobType.FLINK_CDC, OperatorFlinkVersion.v1_18, FlinkVersionMapping.V_1_18, "ghcr.io/flowerfine/scaleph-flink-cdc:3.0.0-flink-1.18"),
;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,8 @@
@Getter
public enum FlinkVersionMapping {

V_1_18(OperatorFlinkVersion.v1_18, FlinkVersion.V_1_18_0, FlinkVersion.V_1_18_1, FlinkVersion.V_1_18_0),
V_1_19(OperatorFlinkVersion.v1_19, FlinkVersion.V_1_19_0, FlinkVersion.V_1_19_0),
V_1_18(OperatorFlinkVersion.v1_18, FlinkVersion.V_1_18_1, FlinkVersion.V_1_18_1, FlinkVersion.V_1_18_0),
V_1_17(OperatorFlinkVersion.v1_17, FlinkVersion.V_1_17_2, FlinkVersion.V_1_17_2, FlinkVersion.V_1_17_1, FlinkVersion.V_1_17_0),
V_1_16(OperatorFlinkVersion.v1_16, FlinkVersion.V_1_16_3, FlinkVersion.V_1_16_3, FlinkVersion.V_1_16_2, FlinkVersion.V_1_16_1, FlinkVersion.V_1_16_0),
V_1_15(OperatorFlinkVersion.v1_15, FlinkVersion.V_1_15_4, FlinkVersion.V_1_15_4, FlinkVersion.V_1_15_3, FlinkVersion.V_1_15_2, FlinkVersion.V_1_15_1, FlinkVersion.V_1_15_0),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@

import io.swagger.v3.oas.annotations.media.Schema;
import lombok.Data;
import lombok.EqualsAndHashCode;

import java.io.Serializable;
import java.util.Date;
Expand All @@ -28,10 +29,13 @@
* @author gleiyu
*/
@Data
@EqualsAndHashCode(onlyExplicitlyIncluded = true)
public class BaseDTO implements Serializable {

private static final long serialVersionUID = -3170630380110141492L;

// 仅使用 id 作为 equals 和 hashcode 字段
@EqualsAndHashCode.Include
@Schema(description = "ID")
private Long id;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
package cn.sliew.scaleph.workflow.listener.taskinstance;

import cn.sliew.scaleph.workflow.service.WorkflowTaskInstanceService;
import cn.sliew.scaleph.workflow.statemachine.WorkflowInstanceStateMachine;
import cn.sliew.scaleph.workflow.statemachine.WorkflowTaskInstanceStateMachine;
import lombok.extern.slf4j.Slf4j;
import org.redisson.api.RScheduledExecutorService;
Expand All @@ -41,6 +42,8 @@ public abstract class AbstractWorkflowTaskInstanceEventListener implements Workf
@Autowired
protected WorkflowTaskInstanceService workflowTaskInstanceService;
@Autowired
protected WorkflowInstanceStateMachine workflowInstanceStateMachine;
@Autowired
protected WorkflowTaskInstanceStateMachine stateMachine;
@Autowired
private RedissonClient redissonClient;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

package cn.sliew.scaleph.workflow.listener.taskinstance;

import cn.sliew.scaleph.workflow.service.dto.WorkflowTaskInstanceDTO;
import org.springframework.stereotype.Component;

import java.io.Serializable;
Expand Down Expand Up @@ -45,6 +46,8 @@ public FailureRunner(Long workflowTaskInstanceId, Optional<Throwable> throwable)
@Override
public void run() {
workflowTaskInstanceService.updateFailure(workflowTaskInstanceId, throwable.orElse(null));
WorkflowTaskInstanceDTO workflowTaskInstanceDTO = workflowTaskInstanceService.get(workflowTaskInstanceId);
workflowInstanceStateMachine.onTaskChange(workflowTaskInstanceDTO.getWorkflowInstanceDTO());
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

package cn.sliew.scaleph.workflow.listener.taskinstance;

import cn.sliew.scaleph.workflow.service.dto.WorkflowTaskInstanceDTO;
import org.springframework.stereotype.Component;

import java.io.Serializable;
Expand All @@ -42,6 +43,8 @@ public SuccessRunner(Long workflowTaskInstanceId) {
@Override
public void run() {
workflowTaskInstanceService.updateSuccess(workflowTaskInstanceId);
WorkflowTaskInstanceDTO workflowTaskInstanceDTO = workflowTaskInstanceService.get(workflowTaskInstanceId);
workflowInstanceStateMachine.onTaskChange(workflowTaskInstanceDTO.getWorkflowInstanceDTO());
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,21 +18,21 @@

package cn.sliew.scaleph.workflow.listener.workflowinstance;

import cn.sliew.milky.common.util.JacksonUtil;
import cn.sliew.scaleph.workflow.service.WorkflowInstanceService;
import cn.sliew.scaleph.workflow.service.WorkflowTaskDefinitionService;
import cn.sliew.scaleph.workflow.service.WorkflowTaskInstanceService;
import cn.sliew.scaleph.workflow.service.dto.WorkflowDefinitionDTO;
import cn.sliew.scaleph.workflow.service.dto.WorkflowInstanceDTO;
import cn.sliew.scaleph.workflow.service.dto.WorkflowTaskDefinitionDTO;
import cn.sliew.scaleph.workflow.service.dto.WorkflowTaskInstanceDTO;
import com.google.common.graph.Graph;
import lombok.extern.slf4j.Slf4j;
import org.redisson.api.annotation.RInject;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

import java.io.Serializable;
import java.util.ArrayList;
import java.util.List;
import java.util.Set;
import java.util.concurrent.CompletableFuture;

@Slf4j
Expand All @@ -45,8 +45,6 @@ protected CompletableFuture handleEventAsync(WorkflowInstanceEventDTO event) {
future.whenCompleteAsync((unused, throwable) -> {
if (throwable != null) {
onFailure(event.getWorkflowInstanceId(), throwable);
} else {
stateMachine.onSuccess(workflowInstanceService.get(event.getWorkflowInstanceId()));
}
});
return future;
Expand Down Expand Up @@ -79,11 +77,16 @@ public void run() {
WorkflowInstanceDTO workflowInstanceDTO = workflowInstanceService.get(event.getWorkflowInstanceId());
WorkflowDefinitionDTO workflowDefinitionDTO = workflowInstanceDTO.getWorkflowDefinition();

List<WorkflowTaskDefinitionDTO> workflowTaskDefinitionDTOS = workflowTaskDefinitionService.list(workflowDefinitionDTO.getId());
List<WorkflowTaskInstanceDTO> workflowTaskInstanceDTOS = new ArrayList<>(workflowTaskDefinitionDTOS.size());
// todo 应该是找到 root 节点,批量启动 root 节点
for (WorkflowTaskDefinitionDTO workflowTaskDefinitionDTO : workflowTaskDefinitionDTOS) {
workflowTaskInstanceDTOS.add(workflowTaskInstanceService.deploy(workflowTaskDefinitionDTO.getId(), event.getWorkflowInstanceId()));
// 找到 root 节点,批量启动 root 节点
Graph<WorkflowTaskDefinitionDTO> dag = workflowTaskDefinitionService.getDag(workflowDefinitionDTO.getId());
Set<WorkflowTaskDefinitionDTO> nodes = dag.nodes();
for (WorkflowTaskDefinitionDTO workflowTaskDefinitionDTO : nodes) {
System.out.println("验证是否需要启动 root 节点: " + JacksonUtil.toJsonString(workflowTaskDefinitionDTO));
// root 节点
if (dag.inDegree(workflowTaskDefinitionDTO) == 0) {
workflowTaskInstanceService.deploy(workflowTaskDefinitionDTO.getId(), event.getWorkflowInstanceId());
System.out.println("真正启动 root 节点: " + JacksonUtil.toJsonString(workflowTaskDefinitionDTO));
}
}
// todo 循环检测 workflowTaskInstanceDTOS 状态或接收 workflowTaskInstance 事件,判断是否成功或失败
}
Expand Down
Loading

0 comments on commit 8fc72a7

Please sign in to comment.