diff --git a/common/util/pom.xml b/common/util/pom.xml index 52dc02a1f..1e0e20d17 100644 --- a/common/util/pom.xml +++ b/common/util/pom.xml @@ -39,6 +39,10 @@ org.apache.commons commons-collections4 + + org.quartz-scheduler + quartz + org.apache.commons commons-lang3 diff --git a/common/util/src/main/java/com/antgroup/openspg/common/util/DateTimeUtils.java b/common/util/src/main/java/com/antgroup/openspg/common/util/DateTimeUtils.java index 0dcfd0919..0b30e842e 100644 --- a/common/util/src/main/java/com/antgroup/openspg/common/util/DateTimeUtils.java +++ b/common/util/src/main/java/com/antgroup/openspg/common/util/DateTimeUtils.java @@ -31,4 +31,17 @@ public static String bizDateByNow() { SimpleDateFormat simpleDateFormat = new SimpleDateFormat(YYYY_MM_DD1); return simpleDateFormat.format(new Date()); } + + /** Date to String by yyyy-MM-dd HH:mm:ss */ + public static String getDate2LongStr(Date date) { + return getDate2Str(YYYY_MM_DD_HH_MM_SS1, date); + } + + /** Date to String by format */ + public static String getDate2Str(String format, Date date) { + if (date == null) { + return ""; + } + return new SimpleDateFormat(format).format(date); + } } diff --git a/pom.xml b/pom.xml index 362ffda3f..31cbafc4a 100644 --- a/pom.xml +++ b/pom.xml @@ -337,6 +337,11 @@ QLExpress 3.3.2 + + org.quartz-scheduler + quartz + 2.3.2 + commons-cli commons-cli diff --git a/server/arks/sofaboot/src/main/resources/config/application-default.properties b/server/arks/sofaboot/src/main/resources/config/application-default.properties index 734b7ad51..29db8c455 100644 --- a/server/arks/sofaboot/src/main/resources/config/application-default.properties +++ b/server/arks/sofaboot/src/main/resources/config/application-default.properties @@ -49,3 +49,11 @@ python.exec=/openspg_venv/bin/python3 python.paths=/openspg_venv/lib/python3.8/site-packages dialog.upload.dir=/upload +# Scheduler +scheduler.handler.type=local +scheduler.metadata.store.type=local +scheduler.execute.instances.period=5 +scheduler.execute.instances.unit=MINUTES +scheduler.generate.instances.period=1 +scheduler.generate.instances.unit=HOURS +scheduler.execute.max.day=10 diff --git a/server/common/model/pom.xml b/server/common/model/pom.xml index ff80ac3c4..392b29640 100644 --- a/server/common/model/pom.xml +++ b/server/common/model/pom.xml @@ -35,5 +35,9 @@ com.google.guava guava + + org.projectlombok + lombok + diff --git a/server/common/model/src/main/java/com/antgroup/openspg/server/common/model/exception/SchedulerException.java b/server/common/model/src/main/java/com/antgroup/openspg/server/common/model/exception/SchedulerException.java new file mode 100644 index 000000000..ba0f27273 --- /dev/null +++ b/server/common/model/src/main/java/com/antgroup/openspg/server/common/model/exception/SchedulerException.java @@ -0,0 +1,21 @@ +/* + * Copyright 2023 OpenSPG Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except + * in compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License + * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express + * or implied. + */ +package com.antgroup.openspg.server.common.model.exception; + +/** Scheduler exception, Custom exception message */ +public class SchedulerException extends OpenSPGException { + + public SchedulerException(String message, Object... args) { + super(null, true, true, message, args); + } +} diff --git a/server/common/model/src/main/java/com/antgroup/openspg/server/common/model/scheduler/SchedulerEnum.java b/server/common/model/src/main/java/com/antgroup/openspg/server/common/model/scheduler/SchedulerEnum.java new file mode 100644 index 000000000..4ff5e9fd8 --- /dev/null +++ b/server/common/model/src/main/java/com/antgroup/openspg/server/common/model/scheduler/SchedulerEnum.java @@ -0,0 +1,93 @@ +/* + * Copyright 2023 OpenSPG Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except + * in compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License + * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express + * or implied. + */ +package com.antgroup.openspg.server.common.model.scheduler; + +/** all scheduler dependent enum */ +public interface SchedulerEnum { + + /** Instance Status enum */ + enum InstanceStatus { + WAITING, + RUNNING, + SKIP, + FINISH, + TERMINATE, + SET_FINISH; + + /** status is Finished */ + public static boolean isFinished(InstanceStatus status) { + return InstanceStatus.FINISH.equals(status) + || InstanceStatus.TERMINATE.equals(status) + || InstanceStatus.SET_FINISH.equals(status) + || InstanceStatus.SKIP.equals(status); + } + } + + /** Life Cycle Enum */ + enum LifeCycle { + PERIOD, + ONCE, + REAL_TIME + } + + /** Dependence Enum */ + enum Dependence { + DEPENDENT, + INDEPENDENT + } + + /** Status Enum */ + enum Status { + ENABLE, + DISABLE + } + + /** Task Status Enum */ + enum TaskStatus { + WAIT, + RUNNING, + FINISH, + ERROR, + SKIP, + TERMINATE, + SET_FINISH; + + /** status is Finished by TaskStatus */ + public static boolean isFinished(TaskStatus status) { + return TaskStatus.FINISH.equals(status) + || TaskStatus.SKIP.equals(status) + || TaskStatus.TERMINATE.equals(status) + || TaskStatus.SET_FINISH.equals(status); + } + + /** status is Running by TaskStatus */ + public static boolean isRunning(TaskStatus status) { + return TaskStatus.RUNNING.equals(status) || TaskStatus.ERROR.equals(status); + } + } + + /** Translate Enum */ + enum TranslateType { + LOCAL_EXAMPLE("localExampleTranslate"); + + private String type; + + TranslateType(String type) { + this.type = type; + } + + public String getType() { + return type; + } + } +} diff --git a/server/common/service/src/main/java/com/antgroup/openspg/server/common/service/spring/SpringContextHolder.java b/server/common/service/src/main/java/com/antgroup/openspg/server/common/service/spring/SpringContextHolder.java index 854192f63..0cdf365b6 100644 --- a/server/common/service/src/main/java/com/antgroup/openspg/server/common/service/spring/SpringContextHolder.java +++ b/server/common/service/src/main/java/com/antgroup/openspg/server/common/service/spring/SpringContextHolder.java @@ -41,6 +41,14 @@ public static T getBean(Class clazz) { return null; } + /** get spring bean by name */ + public static T getBean(String name, Class clazz) { + if (applicationContext != null) { + return applicationContext.getBean(name, clazz); + } + return null; + } + public static List getBeans(Class clazz) { if (applicationContext != null) { return new ArrayList<>(applicationContext.getBeansOfType(clazz).values()); diff --git a/server/core/scheduler/model/pom.xml b/server/core/scheduler/model/pom.xml new file mode 100644 index 000000000..54944dc86 --- /dev/null +++ b/server/core/scheduler/model/pom.xml @@ -0,0 +1,39 @@ + + + + 4.0.0 + + com.antgroup.openspg.server + server-parent + 0.0.1-SNAPSHOT + ../../../pom.xml + + + core-scheduler-model + + + com.antgroup.openspg.server + common-model + + + com.antgroup.openspg + common-util + + + com.alibaba + fastjson + + + diff --git a/server/core/scheduler/model/src/main/java/com/antgroup/openspg/server/core/scheduler/model/service/SchedulerInstance.java b/server/core/scheduler/model/src/main/java/com/antgroup/openspg/server/core/scheduler/model/service/SchedulerInstance.java new file mode 100644 index 000000000..f4204a187 --- /dev/null +++ b/server/core/scheduler/model/src/main/java/com/antgroup/openspg/server/core/scheduler/model/service/SchedulerInstance.java @@ -0,0 +1,88 @@ +/* + * Copyright 2023 OpenSPG Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except + * in compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License + * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express + * or implied. + */ +package com.antgroup.openspg.server.core.scheduler.model.service; + +import com.alibaba.fastjson.JSONObject; +import com.antgroup.openspg.server.common.model.base.BaseModel; +import com.antgroup.openspg.server.common.model.scheduler.SchedulerEnum.Dependence; +import com.antgroup.openspg.server.common.model.scheduler.SchedulerEnum.InstanceStatus; +import com.antgroup.openspg.server.common.model.scheduler.SchedulerEnum.LifeCycle; +import com.antgroup.openspg.server.core.scheduler.model.task.TaskExecuteDag; +import java.util.Date; +import lombok.Getter; +import lombok.Setter; + +/** Scheduler Instance Model */ +@Getter +@Setter +public class SchedulerInstance extends BaseModel { + + private static final long serialVersionUID = -2574666198428196663L; + + /** primary key */ + private Long id; + + /** unique id = jobId+yyyyMMddHHmmss */ + private String uniqueId; + + /** project id */ + private Long projectId; + + /** SchedulerJob Id */ + private Long jobId; + + /** instance type */ + private String type; + + /** status */ + private InstanceStatus status; + + /** progress [0-100] */ + private Long progress; + + /** create User */ + private String createUser; + + /** create time */ + private Date gmtCreate; + + /** modify time */ + private Date gmtModified; + + /** instance begin Running Time */ + private Date beginRunningTime; + + /** instance finish Time */ + private Date finishTime; + + /** job Life Cycle:PERIOD,ONCE,REAL_TIME Enum:LifeCycle */ + private LifeCycle lifeCycle; + + /** Dependent pre task completion */ + private Dependence dependence; + + /** scheduler Date */ + private Date schedulerDate; + + /** version */ + private String version; + + /** extension */ + private JSONObject extension; + + /** task dag Config */ + private TaskExecuteDag taskDag; + + /** start CreateTime Date For Query */ + private transient Date startCreateTime; +} diff --git a/server/core/scheduler/model/src/main/java/com/antgroup/openspg/server/core/scheduler/model/service/SchedulerJob.java b/server/core/scheduler/model/src/main/java/com/antgroup/openspg/server/core/scheduler/model/service/SchedulerJob.java new file mode 100644 index 000000000..4b235fdb7 --- /dev/null +++ b/server/core/scheduler/model/src/main/java/com/antgroup/openspg/server/core/scheduler/model/service/SchedulerJob.java @@ -0,0 +1,79 @@ +/* + * Copyright 2023 OpenSPG Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except + * in compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License + * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express + * or implied. + */ +package com.antgroup.openspg.server.core.scheduler.model.service; + +import com.alibaba.fastjson.JSONObject; +import com.antgroup.openspg.server.common.model.base.BaseModel; +import com.antgroup.openspg.server.common.model.scheduler.SchedulerEnum.Dependence; +import com.antgroup.openspg.server.common.model.scheduler.SchedulerEnum.LifeCycle; +import com.antgroup.openspg.server.common.model.scheduler.SchedulerEnum.Status; +import com.antgroup.openspg.server.common.model.scheduler.SchedulerEnum.TranslateType; +import java.util.Date; +import lombok.Getter; +import lombok.Setter; + +/** Scheduler Job Model */ +@Getter +@Setter +public class SchedulerJob extends BaseModel { + + private static final long serialVersionUID = 3050626766276089001L; + + /** primary key */ + private Long id; + + /** createUser */ + private String createUser; + + /** modifyUser */ + private String modifyUser; + + /** Create time */ + private Date gmtCreate; + + /** Modified time */ + private Date gmtModified; + + /** project id */ + private Long projectId; + + /** job name */ + private String name; + + /** job Life Cycle:PERIOD,ONCE,REAL_TIME */ + private LifeCycle lifeCycle; + + /** translate type */ + private TranslateType translateType; + + /** job Status:ENABLE,DISABLE */ + private Status status; + + /** Dependent pre task completion */ + private Dependence dependence; + + /** Scheduler Cron expression default:0 0 0 * * ? */ + private String schedulerCron; + + /** last Execute Time */ + private Date lastExecuteTime; + + /** invoker id, Primary key of the service table that triggers scheduler */ + private String invokerId; + + /** extension */ + private JSONObject extension; + + /** version */ + private String version; +} diff --git a/server/core/scheduler/model/src/main/java/com/antgroup/openspg/server/core/scheduler/model/service/SchedulerTask.java b/server/core/scheduler/model/src/main/java/com/antgroup/openspg/server/core/scheduler/model/service/SchedulerTask.java new file mode 100644 index 000000000..317ed3b3e --- /dev/null +++ b/server/core/scheduler/model/src/main/java/com/antgroup/openspg/server/core/scheduler/model/service/SchedulerTask.java @@ -0,0 +1,112 @@ +/* + * Copyright 2023 OpenSPG Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except + * in compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License + * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express + * or implied. + */ +package com.antgroup.openspg.server.core.scheduler.model.service; + +import com.alibaba.fastjson.JSONObject; +import com.antgroup.openspg.common.util.DateTimeUtils; +import com.antgroup.openspg.server.common.model.base.BaseModel; +import com.antgroup.openspg.server.common.model.scheduler.SchedulerEnum.TaskStatus; +import com.antgroup.openspg.server.core.scheduler.model.task.TaskExecuteDag; +import java.util.Date; +import lombok.Getter; +import lombok.Setter; +import org.apache.commons.lang3.StringUtils; + +/** Scheduler Task Model */ +@Getter +@Setter +public class SchedulerTask extends BaseModel { + + private static final long serialVersionUID = -5515352651327338741L; + + /** primary key */ + private Long id; + + /** Create time */ + private Date gmtCreate; + + /** Modified time */ + private Date gmtModified; + + /** type */ + private String type; + + /** title */ + private String title; + + /** status */ + private TaskStatus status; + + /** SchedulerJob Id */ + private Long jobId; + + /** instance id */ + private Long instanceId; + + /** execute Num */ + private Integer executeNum; + + /** execute begin Time */ + private Date beginTime; + + /** execute finish Time */ + private Date finishTime; + + /** estimate Finish Time */ + private Date estimateFinishTime; + + /** traceLog */ + private String traceLog; + + /** lock Time */ + private Date lockTime; + + /** resource */ + private String resource; + + /** input */ + private String input; + + /** output */ + private String output; + + /** node id */ + private String nodeId; + + /** extension,JSON */ + private JSONObject extension; + + public SchedulerTask() {} + + /** constructor by instance and dag node */ + public SchedulerTask(SchedulerInstance instance, TaskStatus status, TaskExecuteDag.Node node) { + this.executeNum = 0; + this.beginTime = new Date(); + this.status = status; + this.jobId = instance.getJobId(); + this.instanceId = instance.getId(); + this.nodeId = node.getId(); + this.type = node.getTaskComponent(); + this.title = StringUtils.isNotBlank(node.getName()) ? node.getName() : node.getTaskComponent(); + + if (node.getProperties() != null) { + this.extension = node.getProperties(); + } + + StringBuffer log = new StringBuffer(DateTimeUtils.getDate2LongStr(new Date())); + log.append("Create new Task, Waiting preceding node to complete....."); + log.append(System.getProperty("line.separator")); + + this.traceLog = log.toString(); + } +} diff --git a/server/core/scheduler/model/src/main/java/com/antgroup/openspg/server/core/scheduler/model/task/TaskExecuteContext.java b/server/core/scheduler/model/src/main/java/com/antgroup/openspg/server/core/scheduler/model/task/TaskExecuteContext.java new file mode 100644 index 000000000..3fe3d0dd2 --- /dev/null +++ b/server/core/scheduler/model/src/main/java/com/antgroup/openspg/server/core/scheduler/model/task/TaskExecuteContext.java @@ -0,0 +1,73 @@ +/* + * Copyright 2023 OpenSPG Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except + * in compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License + * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express + * or implied. + */ + +package com.antgroup.openspg.server.core.scheduler.model.task; + +import com.antgroup.openspg.common.util.DateTimeUtils; +import com.antgroup.openspg.common.util.NetworkAddressUtils; +import com.antgroup.openspg.server.core.scheduler.model.service.SchedulerInstance; +import com.antgroup.openspg.server.core.scheduler.model.service.SchedulerJob; +import com.antgroup.openspg.server.core.scheduler.model.service.SchedulerTask; +import java.util.Date; +import lombok.Getter; +import lombok.Setter; +import lombok.ToString; + +/** Scheduler Task Context */ +@Getter +@Setter +@ToString +public class TaskExecuteContext { + + /** Scheduler Job */ + private SchedulerJob job; + + /** Scheduler Instance */ + private SchedulerInstance instance; + + /** Scheduler task */ + private SchedulerTask task; + + /** trace Log */ + private StringBuffer traceLog; + + /** start Time */ + private long startTime; + + /** task is Finish */ + private boolean taskFinish; + + /** constructor by job,instance and task */ + public TaskExecuteContext(SchedulerJob job, SchedulerInstance instance, SchedulerTask task) { + task.setTraceLog(null); + this.job = job; + this.instance = instance; + this.task = task; + this.traceLog = new StringBuffer(); + this.startTime = System.currentTimeMillis(); + this.taskFinish = false; + } + + /** insert TraceLog, Fixed adding Date and IP prefix */ + public void addTraceLog(String message, Object... args) { + int dstOffset = 0; + StringBuffer log = new StringBuffer(DateTimeUtils.getDate2LongStr(new Date())); + log.append("(") + .append(NetworkAddressUtils.LOCAL_IP) + .append("): ") + .append(String.format(message, args)) + .append(System.getProperty("line.separator")); + + traceLog.insert(dstOffset, log); + } +} diff --git a/server/core/scheduler/model/src/main/java/com/antgroup/openspg/server/core/scheduler/model/task/TaskExecuteDag.java b/server/core/scheduler/model/src/main/java/com/antgroup/openspg/server/core/scheduler/model/task/TaskExecuteDag.java new file mode 100644 index 000000000..a704f5b31 --- /dev/null +++ b/server/core/scheduler/model/src/main/java/com/antgroup/openspg/server/core/scheduler/model/task/TaskExecuteDag.java @@ -0,0 +1,79 @@ +/* + * Copyright 2023 OpenSPG Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except + * in compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License + * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express + * or implied. + */ +package com.antgroup.openspg.server.core.scheduler.model.task; + +import com.alibaba.fastjson.JSONObject; +import com.google.common.collect.Lists; +import java.util.Collections; +import java.util.List; +import java.util.stream.Collectors; +import lombok.Getter; +import lombok.Setter; +import lombok.ToString; + +/** Task Dag Model ,Contains nodes and edges,scheduler is executed step by step according to DAG */ +@Getter +@Setter +@ToString +public class TaskExecuteDag { + + /** dag nodes List */ + private List nodes = Collections.emptyList(); + + /** dag edges List */ + private List edges = Collections.emptyList(); + + /** dag extend */ + private String extend; + + /** get Next/Pre Nodes */ + public List getRelatedNodes(String id, boolean next) { + List idList = Lists.newArrayList(); + for (Edge edge : edges) { + if ((next && edge.getFrom().equals(id)) || (!next && edge.getTo().equals(id))) { + idList.add(next ? edge.getTo() : edge.getFrom()); + } + } + return this.nodes.stream() + .filter(node -> idList.contains(node.getId())) + .collect(Collectors.toList()); + } + + @Getter + @Setter + @ToString + public static class Node { + /** id */ + private String id; + + /** name */ + private String name; + + /** JobTask Component name */ + private String taskComponent; + + /** properties */ + private JSONObject properties; + } + + @Getter + @Setter + @ToString + public static class Edge { + /** from id */ + private String from; + + /** to id */ + private String to; + } +} diff --git a/server/core/scheduler/service/pom.xml b/server/core/scheduler/service/pom.xml new file mode 100644 index 000000000..ab110bb5c --- /dev/null +++ b/server/core/scheduler/service/pom.xml @@ -0,0 +1,43 @@ + + + + 4.0.0 + + com.antgroup.openspg.server + server-parent + 0.0.1-SNAPSHOT + ../../../pom.xml + + + core-scheduler-service + + + com.antgroup.openspg.server + core-scheduler-model + + + com.antgroup.openspg.server + common-service + + + org.springframework + spring-tx + + + org.springframework.boot + spring-boot-autoconfigure + + + diff --git a/server/core/scheduler/service/src/main/java/com/antgroup/openspg/server/core/scheduler/service/api/SchedulerService.java b/server/core/scheduler/service/src/main/java/com/antgroup/openspg/server/core/scheduler/service/api/SchedulerService.java new file mode 100644 index 000000000..5c0b5915b --- /dev/null +++ b/server/core/scheduler/service/src/main/java/com/antgroup/openspg/server/core/scheduler/service/api/SchedulerService.java @@ -0,0 +1,66 @@ +/* + * Copyright 2023 OpenSPG Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except + * in compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License + * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express + * or implied. + */ +package com.antgroup.openspg.server.core.scheduler.service.api; + +import com.antgroup.openspg.server.core.scheduler.model.service.SchedulerInstance; +import com.antgroup.openspg.server.core.scheduler.model.service.SchedulerJob; +import com.antgroup.openspg.server.core.scheduler.model.service.SchedulerTask; +import java.util.List; + +/** Scheduler Service:submit,execute,delete and other scheduler interfaces */ +public interface SchedulerService { + /** to save job. Execute once after saving successfully */ + SchedulerJob submitJob(SchedulerJob job); + + /** execute once Job, generate instance */ + Boolean executeJob(Long jobId); + + /** enable Job, set job status to enable */ + Boolean enableJob(Long jobId); + + /** disable Job, set job status to disable */ + Boolean disableJob(Long jobId); + + /** delete Job, delete associated instances and tasks together */ + Boolean deleteJob(Long jobId); + + /** update Job fields */ + boolean updateJob(SchedulerJob job); + + /** get Job details By id */ + SchedulerJob getJobById(Long jobId); + + /** search Jobs by fields */ + List searchJobs(SchedulerJob query); + + /** get Instance details By id */ + SchedulerInstance getInstanceById(Long instanceId); + + /** stop Instance, set instance status to terminate */ + Boolean stopInstance(Long instanceId); + + /** set Instance To Finish, set instance status to finish */ + Boolean setFinishInstance(Long instanceId); + + /** restart Instance, generate a new instance and execute */ + Boolean restartInstance(Long instanceId); + + /** trigger instance to execute once */ + Boolean triggerInstance(Long instanceId); + + /** search Instances by fields */ + List searchInstances(SchedulerInstance query); + + /** search Tasks by fields */ + List searchTasks(SchedulerTask query); +} diff --git a/server/core/scheduler/service/src/main/java/com/antgroup/openspg/server/core/scheduler/service/api/impl/SchedulerServiceImpl.java b/server/core/scheduler/service/src/main/java/com/antgroup/openspg/server/core/scheduler/service/api/impl/SchedulerServiceImpl.java new file mode 100644 index 000000000..684331958 --- /dev/null +++ b/server/core/scheduler/service/src/main/java/com/antgroup/openspg/server/core/scheduler/service/api/impl/SchedulerServiceImpl.java @@ -0,0 +1,245 @@ +/* + * Copyright 2023 OpenSPG Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except + * in compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License + * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express + * or implied. + */ +package com.antgroup.openspg.server.core.scheduler.service.api.impl; + +import com.antgroup.openspg.server.common.model.exception.SchedulerException; +import com.antgroup.openspg.server.common.model.scheduler.SchedulerEnum.InstanceStatus; +import com.antgroup.openspg.server.common.model.scheduler.SchedulerEnum.LifeCycle; +import com.antgroup.openspg.server.common.model.scheduler.SchedulerEnum.Status; +import com.antgroup.openspg.server.common.model.scheduler.SchedulerEnum.TaskStatus; +import com.antgroup.openspg.server.core.scheduler.model.service.SchedulerInstance; +import com.antgroup.openspg.server.core.scheduler.model.service.SchedulerJob; +import com.antgroup.openspg.server.core.scheduler.model.service.SchedulerTask; +import com.antgroup.openspg.server.core.scheduler.service.api.SchedulerService; +import com.antgroup.openspg.server.core.scheduler.service.common.SchedulerCommonService; +import com.antgroup.openspg.server.core.scheduler.service.engine.SchedulerExecuteService; +import com.antgroup.openspg.server.core.scheduler.service.metadata.SchedulerInstanceService; +import com.antgroup.openspg.server.core.scheduler.service.metadata.SchedulerJobService; +import com.antgroup.openspg.server.core.scheduler.service.metadata.SchedulerTaskService; +import com.antgroup.openspg.server.core.scheduler.service.utils.SchedulerUtils; +import com.google.common.collect.Lists; +import java.util.Date; +import java.util.List; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.TimeUnit; +import org.apache.commons.collections4.CollectionUtils; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.stereotype.Service; +import org.springframework.util.Assert; + +/** Scheduler Service, Scheduler public sdk, include submit, delete, update execute, etc. */ +@Service +public class SchedulerServiceImpl implements SchedulerService { + + public static final String DEFAULT_VERSION = "V3"; + + private ThreadPoolExecutor instanceExecutor = + new ThreadPoolExecutor(1, 20, 30, TimeUnit.MINUTES, new LinkedBlockingQueue<>(100)); + + @Autowired SchedulerJobService schedulerJobService; + @Autowired SchedulerInstanceService schedulerInstanceService; + @Autowired SchedulerTaskService schedulerTaskService; + @Autowired SchedulerCommonService schedulerCommonService; + @Autowired SchedulerExecuteService schedulerExecuteService; + + @Override + public SchedulerJob submitJob(SchedulerJob job) { + setJobPropertyDefaultValue(job); + checkJobPropertyValidity(job); + Long id = job.getId(); + id = (id == null ? schedulerJobService.insert(job) : schedulerJobService.update(job)); + this.executeJob(id); + return job; + } + + /** set Job Property Default Value */ + private void setJobPropertyDefaultValue(SchedulerJob job) { + job.setGmtModified(new Date()); + if (job.getGmtCreate() == null) { + job.setGmtCreate(new Date()); + } + + job.setStatus(Status.ENABLE); + job.setVersion(DEFAULT_VERSION); + } + + /** check Job Property is validity */ + private void checkJobPropertyValidity(SchedulerJob job) { + // check not Null + Assert.notNull(job, "job not null"); + Assert.notNull(job.getProjectId(), "ProjectId not null"); + Assert.hasText(job.getName(), "Name not null"); + Assert.hasText(job.getCreateUser(), "CreateUser not null"); + Assert.notNull(job.getLifeCycle(), "LifeCycle not null"); + Assert.notNull(job.getTranslateType(), "TranslateType not null"); + Assert.notNull(job.getDependence(), "MergeMode not null"); + + // check scheduler cron + if (LifeCycle.PERIOD.equals(job.getLifeCycle())) { + Assert.hasText(job.getSchedulerCron(), "SchedulerCron not null"); + SchedulerUtils.getCronExpression(job.getSchedulerCron()); + } + } + + @Override + public Boolean executeJob(Long jobId) { + List instances = Lists.newArrayList(); + SchedulerJob job = schedulerJobService.getById(jobId); + + // generate instance by LifeCycle + if (LifeCycle.REAL_TIME.equals(job.getLifeCycle())) { + stopJobAllInstance(jobId); + instances.add(schedulerCommonService.generateInstance(job)); + } else if (LifeCycle.PERIOD.equals(job.getLifeCycle())) { + instances.addAll(schedulerCommonService.generatePeriodInstance(job)); + } else if (LifeCycle.ONCE.equals(job.getLifeCycle())) { + instances.add(schedulerCommonService.generateInstance(job)); + } + + if (CollectionUtils.isEmpty(instances)) { + return false; + } + + // execute the generated instance + for (SchedulerInstance ins : instances) { + Long instanceId = ins.getId(); + Runnable instanceRunnable = () -> schedulerExecuteService.executeInstance(instanceId); + instanceExecutor.execute(instanceRunnable); + } + return true; + } + + /** stop all not finish instance by job id */ + private void stopJobAllInstance(Long jobId) { + SchedulerInstance query = new SchedulerInstance(); + query.setJobId(jobId); + List instances = schedulerInstanceService.getNotFinishInstance(query); + if (CollectionUtils.isEmpty(instances)) { + return; + } + + for (SchedulerInstance instance : instances) { + stopInstance(instance.getId()); + } + } + + @Override + public Boolean enableJob(Long jobId) { + SchedulerJob job = schedulerJobService.getById(jobId); + SchedulerJob updateJob = new SchedulerJob(); + updateJob.setId(jobId); + updateJob.setStatus(Status.ENABLE); + Long flag = schedulerJobService.update(updateJob); + if (flag <= 0) { + return false; + } + + // execute the job after it is enabled + if (LifeCycle.REAL_TIME.equals(job.getLifeCycle())) { + this.executeJob(jobId); + } + return true; + } + + @Override + public Boolean disableJob(Long jobId) { + SchedulerJob updateJob = new SchedulerJob(); + updateJob.setId(jobId); + updateJob.setStatus(Status.DISABLE); + Long flag = schedulerJobService.update(updateJob); + if (flag <= 0) { + return false; + } + + stopJobAllInstance(jobId); + return true; + } + + @Override + public Boolean deleteJob(Long jobId) { + stopJobAllInstance(jobId); + schedulerJobService.deleteById(jobId); + schedulerInstanceService.deleteByJobId(jobId); + schedulerTaskService.deleteByJobId(jobId); + return true; + } + + @Override + public boolean updateJob(SchedulerJob job) { + Long id = schedulerJobService.update(job); + return id > 0; + } + + @Override + public SchedulerJob getJobById(Long jobId) { + return schedulerJobService.getById(jobId); + } + + @Override + public List searchJobs(SchedulerJob query) { + return schedulerJobService.query(query); + } + + @Override + public SchedulerInstance getInstanceById(Long instanceId) { + return schedulerInstanceService.getById(instanceId); + } + + @Override + public Boolean stopInstance(Long instanceId) { + SchedulerInstance instance = schedulerInstanceService.getById(instanceId); + schedulerCommonService.setInstanceFinish( + instance, InstanceStatus.TERMINATE, TaskStatus.TERMINATE); + return true; + } + + @Override + public Boolean setFinishInstance(Long instanceId) { + SchedulerInstance instance = schedulerInstanceService.getById(instanceId); + schedulerCommonService.setInstanceFinish( + instance, InstanceStatus.SET_FINISH, TaskStatus.SET_FINISH); + return true; + } + + @Override + public Boolean restartInstance(Long instanceId) { + SchedulerInstance instance = schedulerInstanceService.getById(instanceId); + SchedulerJob job = schedulerJobService.getById(instance.getJobId()); + SchedulerInstance reRunInstance = schedulerCommonService.generateInstance(job); + Long id = reRunInstance.getId(); + Runnable instanceRunnable = () -> schedulerExecuteService.executeInstance(id); + instanceExecutor.execute(instanceRunnable); + return true; + } + + @Override + public Boolean triggerInstance(Long instanceId) { + SchedulerInstance instance = schedulerInstanceService.getById(instanceId); + if (InstanceStatus.isFinished(instance.getStatus())) { + throw new SchedulerException("The instance has been finished"); + } + schedulerExecuteService.executeInstance(instanceId); + return true; + } + + @Override + public List searchInstances(SchedulerInstance query) { + return schedulerInstanceService.query(query); + } + + @Override + public List searchTasks(SchedulerTask query) { + return schedulerTaskService.query(query); + } +} diff --git a/server/core/scheduler/service/src/main/java/com/antgroup/openspg/server/core/scheduler/service/common/SchedulerCommonService.java b/server/core/scheduler/service/src/main/java/com/antgroup/openspg/server/core/scheduler/service/common/SchedulerCommonService.java new file mode 100644 index 000000000..51f7f5db8 --- /dev/null +++ b/server/core/scheduler/service/src/main/java/com/antgroup/openspg/server/core/scheduler/service/common/SchedulerCommonService.java @@ -0,0 +1,182 @@ +/* + * Copyright 2023 OpenSPG Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except + * in compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License + * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express + * or implied. + */ +package com.antgroup.openspg.server.core.scheduler.service.common; + +import com.antgroup.openspg.server.common.model.exception.SchedulerException; +import com.antgroup.openspg.server.common.model.scheduler.SchedulerEnum.InstanceStatus; +import com.antgroup.openspg.server.common.model.scheduler.SchedulerEnum.TaskStatus; +import com.antgroup.openspg.server.common.service.spring.SpringContextHolder; +import com.antgroup.openspg.server.core.scheduler.model.service.SchedulerInstance; +import com.antgroup.openspg.server.core.scheduler.model.service.SchedulerJob; +import com.antgroup.openspg.server.core.scheduler.model.service.SchedulerTask; +import com.antgroup.openspg.server.core.scheduler.model.task.TaskExecuteContext; +import com.antgroup.openspg.server.core.scheduler.model.task.TaskExecuteDag; +import com.antgroup.openspg.server.core.scheduler.service.config.SchedulerConfig; +import com.antgroup.openspg.server.core.scheduler.service.metadata.SchedulerInstanceService; +import com.antgroup.openspg.server.core.scheduler.service.metadata.SchedulerJobService; +import com.antgroup.openspg.server.core.scheduler.service.metadata.SchedulerTaskService; +import com.antgroup.openspg.server.core.scheduler.service.task.TaskExecute; +import com.antgroup.openspg.server.core.scheduler.service.task.async.AsyncTaskExecute; +import com.antgroup.openspg.server.core.scheduler.service.translate.TranslatorFactory; +import com.antgroup.openspg.server.core.scheduler.service.utils.SchedulerUtils; +import com.google.common.collect.Lists; +import java.util.Date; +import java.util.List; +import lombok.extern.slf4j.Slf4j; +import org.apache.commons.collections4.CollectionUtils; +import org.apache.commons.lang3.StringUtils; +import org.apache.commons.lang3.time.DateUtils; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.stereotype.Service; +import org.springframework.util.Assert; + +/** Scheduler internal common service, include generate Instance and Modify Instance status etc. */ +@Service +@Slf4j +public class SchedulerCommonService { + + public static final String UNDERLINE_SEPARATOR = "_"; + public static final Long FINISH = 100L; + + @Autowired SchedulerJobService schedulerJobService; + @Autowired SchedulerInstanceService schedulerInstanceService; + @Autowired SchedulerTaskService schedulerTaskService; + @Autowired SchedulerConfig schedulerConfig; + + /** set Instance To Finish, set Status,Progress field to Finish and stop all running tasks */ + public void setInstanceFinish( + SchedulerInstance instance, InstanceStatus instanceStatus, TaskStatus taskStatus) { + Date finishTime = (instance.getFinishTime() == null ? new Date() : instance.getFinishTime()); + SchedulerInstance updateInstance = new SchedulerInstance(); + updateInstance.setId(instance.getId()); + updateInstance.setStatus(instanceStatus); + updateInstance.setProgress(FINISH); + updateInstance.setFinishTime(finishTime); + Long updateNum = schedulerInstanceService.update(updateInstance); + Assert.isTrue(updateNum > 0, "update instance failed " + updateInstance); + + stopRunningTasks(instance); + schedulerTaskService.setStatusByInstanceId(instance.getId(), taskStatus); + } + + /** stop all running tasks by instance */ + private void stopRunningTasks(SchedulerInstance instance) { + List taskList = schedulerTaskService.queryByInstanceId(instance.getId()); + + SchedulerJob job = schedulerJobService.getById(instance.getJobId()); + + for (SchedulerTask task : taskList) { + // Filter non-running tasks + if (!TaskStatus.isRunning(task.getStatus()) || StringUtils.isBlank(task.getType())) { + continue; + } + + // get AsyncTaskExecute by type + String type = task.getType().split(UNDERLINE_SEPARATOR)[0]; + TaskExecute jobTask = SpringContextHolder.getBean(type, TaskExecute.class); + boolean isAsyncTask = (jobTask != null && jobTask instanceof AsyncTaskExecute); + if (!isAsyncTask) { + log.warn("get bean is null or not instance of JobAsyncTask id: {}", task.getId()); + continue; + } + + // transform to jobAsyncTask trigger stop + AsyncTaskExecute jobAsyncTask = (AsyncTaskExecute) jobTask; + TaskExecuteContext context = new TaskExecuteContext(job, instance, task); + jobAsyncTask.stop(context, task.getResource()); + } + } + + /** check Instance is Running within 24H */ + private void checkInstanceRunning(SchedulerJob job) { + SchedulerInstance query = new SchedulerInstance(); + query.setJobId(job.getId()); + query.setStartCreateTime(DateUtils.addDays(new Date(), -1)); + + List instances = schedulerInstanceService.query(query); + for (SchedulerInstance instance : instances) { + if (!InstanceStatus.isFinished(instance.getStatus())) { + throw new SchedulerException("Running {} exist within 24H", instance.getUniqueId()); + } + } + } + + /** generate Period Instance by Cron */ + public List generatePeriodInstance(SchedulerJob job) { + List instances = Lists.newArrayList(); + + // get period instance all execution Dates + List executionDates = SchedulerUtils.getCronExecutionDatesByToday(job.getSchedulerCron()); + for (Date schedulerDate : executionDates) { + String uniqueId = SchedulerUtils.getUniqueId(job.getId(), schedulerDate); + SchedulerInstance instance = generateInstance(job, uniqueId, schedulerDate); + if (instance != null) { + instances.add(instance); + } + } + return instances; + } + + /** generate Once/RealTime Instance */ + public SchedulerInstance generateInstance(SchedulerJob job) { + checkInstanceRunning(job); + String uniqueId = job.getId().toString() + System.currentTimeMillis(); + return generateInstance(job, uniqueId, new Date()); + } + + /** generate Instance by schedulerDate */ + public SchedulerInstance generateInstance(SchedulerJob job, String uniqueId, Date schedulerDate) { + SchedulerInstance existInstance = schedulerInstanceService.getByUniqueId(uniqueId); + if (existInstance != null) { + log.error("generateInstance uniqueId exist jobId:{} uniqueId:{}", job.getId(), uniqueId); + return null; + } + + log.info("generateInstance start jobId:{} uniqueId:{}", job.getId(), uniqueId); + Long progress = 0L; + SchedulerInstance instance = new SchedulerInstance(); + instance.setUniqueId(uniqueId); + instance.setProjectId(job.getProjectId()); + instance.setJobId(job.getId()); + instance.setType(job.getTranslateType().getType()); + instance.setStatus(InstanceStatus.WAITING); + instance.setProgress(progress); + instance.setCreateUser(job.getCreateUser()); + instance.setGmtCreate(new Date()); + instance.setGmtModified(new Date()); + instance.setLifeCycle(job.getLifeCycle()); + instance.setSchedulerDate(schedulerDate); + instance.setDependence(job.getDependence()); + instance.setVersion(job.getVersion()); + TaskExecuteDag taskDag = TranslatorFactory.getTranslator(job.getTranslateType()).translate(job); + instance.setTaskDag(taskDag); + + schedulerInstanceService.insert(instance); + log.info("generateInstance successful jobId:{} uniqueId:{}", job.getId(), uniqueId); + + // Create tasks based on the DAG generated by the translation + for (TaskExecuteDag.Node node : taskDag.getNodes()) { + List pres = taskDag.getRelatedNodes(node.getId(), false); + TaskStatus status = CollectionUtils.isEmpty(pres) ? TaskStatus.RUNNING : TaskStatus.WAIT; + schedulerTaskService.insert(new SchedulerTask(instance, status, node)); + } + + // set job last execute time + SchedulerJob updateJob = new SchedulerJob(); + updateJob.setId(job.getId()); + updateJob.setLastExecuteTime(schedulerDate); + schedulerJobService.update(updateJob); + + return instance; + } +} diff --git a/server/core/scheduler/service/src/main/java/com/antgroup/openspg/server/core/scheduler/service/config/SchedulerConfig.java b/server/core/scheduler/service/src/main/java/com/antgroup/openspg/server/core/scheduler/service/config/SchedulerConfig.java new file mode 100644 index 000000000..123eeb491 --- /dev/null +++ b/server/core/scheduler/service/src/main/java/com/antgroup/openspg/server/core/scheduler/service/config/SchedulerConfig.java @@ -0,0 +1,78 @@ +/* + * Copyright 2023 OpenSPG Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except + * in compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License + * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express + * or implied. + */ + +/* + * Ant Group + * Copyright (c) 2004-2022 All Rights Reserved. + */ +package com.antgroup.openspg.server.core.scheduler.service.config; + +import com.antgroup.openspg.common.util.StringUtils; +import java.util.concurrent.TimeUnit; +import org.springframework.beans.factory.annotation.Value; +import org.springframework.stereotype.Component; + +/** Scheduler Common Value */ +@Component +public class SchedulerConfig { + + @Value("${scheduler.execute.instances.period:}") + private String executeInstancesPeriod; + + @Value("${scheduler.execute.instances.unit:}") + private String executeInstancesUnit; + + @Value("${scheduler.generate.instances.period:}") + private String generateInstancesPeriod; + + @Value("${scheduler.generate.instances.unit:}") + private String generateInstancesUnit; + + @Value("${scheduler.execute.max.day:}") + private String executeMaxDay; + + public Long getExecuteInstancesPeriod() { + if (StringUtils.isBlank(executeInstancesPeriod)) { + return null; + } + return Long.valueOf(executeInstancesPeriod); + } + + public TimeUnit getExecuteInstancesUnit() { + if (StringUtils.isBlank(executeInstancesPeriod)) { + return null; + } + return TimeUnit.valueOf(executeInstancesUnit); + } + + public Long getGenerateInstancesPeriod() { + if (StringUtils.isBlank(generateInstancesPeriod)) { + return null; + } + return Long.valueOf(generateInstancesPeriod); + } + + public TimeUnit getGenerateInstancesUnit() { + if (StringUtils.isBlank(generateInstancesUnit)) { + return null; + } + return TimeUnit.valueOf(generateInstancesUnit); + } + + public Integer getExecuteMaxDay() { + if (StringUtils.isBlank(executeMaxDay)) { + return 10; + } + return Integer.valueOf(executeMaxDay); + } +} diff --git a/server/core/scheduler/service/src/main/java/com/antgroup/openspg/server/core/scheduler/service/engine/SchedulerExecuteService.java b/server/core/scheduler/service/src/main/java/com/antgroup/openspg/server/core/scheduler/service/engine/SchedulerExecuteService.java new file mode 100644 index 000000000..53903a1de --- /dev/null +++ b/server/core/scheduler/service/src/main/java/com/antgroup/openspg/server/core/scheduler/service/engine/SchedulerExecuteService.java @@ -0,0 +1,25 @@ +/* + * Copyright 2023 OpenSPG Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except + * in compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License + * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express + * or implied. + */ +package com.antgroup.openspg.server.core.scheduler.service.engine; + +/** Scheduler Execute Service. execute all instances */ +public interface SchedulerExecuteService { + /** generate instances by period job */ + void generateInstances(); + + /** execute all not finish instances */ + void executeInstances(); + + /** execute instance by id */ + void executeInstance(Long id); +} diff --git a/server/core/scheduler/service/src/main/java/com/antgroup/openspg/server/core/scheduler/service/engine/impl/SchedulerExecuteServiceImpl.java b/server/core/scheduler/service/src/main/java/com/antgroup/openspg/server/core/scheduler/service/engine/impl/SchedulerExecuteServiceImpl.java new file mode 100644 index 000000000..81d4f7b58 --- /dev/null +++ b/server/core/scheduler/service/src/main/java/com/antgroup/openspg/server/core/scheduler/service/engine/impl/SchedulerExecuteServiceImpl.java @@ -0,0 +1,250 @@ +/* + * Copyright 2023 OpenSPG Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except + * in compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License + * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express + * or implied. + */ +package com.antgroup.openspg.server.core.scheduler.service.engine.impl; + +import com.antgroup.openspg.server.common.model.scheduler.SchedulerEnum; +import com.antgroup.openspg.server.common.model.scheduler.SchedulerEnum.InstanceStatus; +import com.antgroup.openspg.server.common.model.scheduler.SchedulerEnum.TaskStatus; +import com.antgroup.openspg.server.common.service.spring.SpringContextHolder; +import com.antgroup.openspg.server.core.scheduler.model.service.SchedulerInstance; +import com.antgroup.openspg.server.core.scheduler.model.service.SchedulerJob; +import com.antgroup.openspg.server.core.scheduler.model.service.SchedulerTask; +import com.antgroup.openspg.server.core.scheduler.model.task.TaskExecuteContext; +import com.antgroup.openspg.server.core.scheduler.model.task.TaskExecuteDag; +import com.antgroup.openspg.server.core.scheduler.service.common.SchedulerCommonService; +import com.antgroup.openspg.server.core.scheduler.service.config.SchedulerConfig; +import com.antgroup.openspg.server.core.scheduler.service.engine.SchedulerExecuteService; +import com.antgroup.openspg.server.core.scheduler.service.metadata.SchedulerInstanceService; +import com.antgroup.openspg.server.core.scheduler.service.metadata.SchedulerJobService; +import com.antgroup.openspg.server.core.scheduler.service.metadata.SchedulerTaskService; +import com.antgroup.openspg.server.core.scheduler.service.task.TaskExecute; +import com.google.common.collect.Lists; +import java.util.Date; +import java.util.List; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.ScheduledThreadPoolExecutor; +import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.TimeUnit; +import java.util.stream.Collectors; +import lombok.extern.slf4j.Slf4j; +import org.apache.commons.collections4.CollectionUtils; +import org.apache.commons.lang3.StringUtils; +import org.apache.commons.lang3.time.DateUtils; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.stereotype.Service; + +/** + * Scheduler Execute Service implementation class. generate period instance and execute instances + */ +@Service +@Slf4j +public class SchedulerExecuteServiceImpl implements SchedulerExecuteService { + + public static final String UNDERLINE_SEPARATOR = "_"; + public static final long DELAY = 10; + + private ConcurrentHashMap instances = new ConcurrentHashMap<>(); + private ScheduledExecutorService executorService = new ScheduledThreadPoolExecutor(10); + + @Autowired SchedulerConfig schedulerConfig; + @Autowired SchedulerJobService schedulerJobService; + @Autowired SchedulerInstanceService schedulerInstanceService; + @Autowired SchedulerTaskService schedulerTaskService; + @Autowired SchedulerCommonService schedulerCommonService; + + /** generate instances by period job */ + @Override + public void generateInstances() { + SchedulerJob record = new SchedulerJob(); + record.setLifeCycle(SchedulerEnum.LifeCycle.PERIOD); + record.setStatus(SchedulerEnum.Status.ENABLE); + List allJob = schedulerJobService.query(record); + log.info("getAllPeriodJob successful size:{}", allJob.size()); + + if (CollectionUtils.isEmpty(allJob)) { + return; + } + + // generate instance for period job + for (SchedulerJob job : allJob) { + try { + schedulerCommonService.generatePeriodInstance(job); + } catch (Exception e) { + log.error("generate error jobId:{}", job.getId(), e); + } + } + } + + /** execute all not finish instances */ + @Override + public void executeInstances() { + List allInstance = getAllNotFinishInstances(); + log.info("getAllNotFinishInstance successful size:{}", allInstance.size()); + if (CollectionUtils.isEmpty(allInstance)) { + return; + } + + for (SchedulerInstance instance : allInstance) { + Runnable instanceRunnable = () -> executeInstance(instance.getId()); + getInstanceExecutor(instance.getType()).execute(instanceRunnable); + log.info("add instanceExecutor successful {}", instance.getUniqueId()); + } + } + + /** execute instance by id */ + @Override + public void executeInstance(Long id) { + try { + SchedulerInstance instance = schedulerInstanceService.getById(id); + List tasks = schedulerTaskService.queryByInstanceId(id); + + // Filter non-running tasks + List runningTasks = + tasks.stream() + .filter(s -> TaskStatus.isRunning(s.getStatus())) + .collect(Collectors.toList()); + + // If no tasks are running, Change next task Status from WAIT to RUNNING + if (CollectionUtils.isEmpty(runningTasks)) { + runningTasks = checkAndUpdateWaitStatus(instance, tasks); + } + if (CollectionUtils.isEmpty(runningTasks)) { + schedulerCommonService.setInstanceFinish( + instance, InstanceStatus.FINISH, TaskStatus.FINISH); + return; + } + executeInstance(instance, runningTasks); + } catch (Exception e) { + log.error("execute instance error id:{}", id, e); + } + } + + /** execute instance by all tasks */ + private void executeInstance(SchedulerInstance instance, List tasks) { + if (InstanceStatus.isFinished(instance.getStatus())) { + log.info("instance:{} status is {} ignore execute", instance.getId(), instance.getStatus()); + return; + } + tasks.forEach(task -> executeTask(instance, task)); + } + + /** execute instance by task */ + private void executeTask(SchedulerInstance instance, SchedulerTask task) { + try { + SchedulerJob job = schedulerJobService.getById(instance.getJobId()); + TaskExecuteContext context = new TaskExecuteContext(job, instance, task); + if (StringUtils.isBlank(task.getType())) { + log.error("task type is null uniqueId:{} taskId:{}", instance.getUniqueId(), task.getId()); + return; + } + + // get TaskExecute by type + String type = task.getType().split(UNDERLINE_SEPARATOR)[0]; + TaskExecute jobTask = SpringContextHolder.getBean(type, TaskExecute.class); + if (jobTask != null) { + jobTask.executeEntry(context); + executeNextTask(context); + } else { + log.error("get bean is null uniqueId:{} type:{}", instance.getUniqueId(), type); + } + } catch (Exception e) { + log.error("process task error task:{}", task.getId(), e); + } + } + + /** execute next task */ + private void executeNextTask(TaskExecuteContext context) { + SchedulerInstance instance = context.getInstance(); + SchedulerTask task = context.getTask(); + // get all next task + List nextNodes = + instance.getTaskDag().getRelatedNodes(task.getNodeId(), true); + if (!context.isTaskFinish() || CollectionUtils.isEmpty(nextNodes)) { + return; + } + List taskList = Lists.newArrayList(); + + // execute all next task + for (TaskExecuteDag.Node nextNode : nextNodes) { + taskList.add( + schedulerTaskService.queryByInstanceIdAndType( + instance.getId(), nextNode.getTaskComponent())); + } + SchedulerInstance ins = schedulerInstanceService.getById(instance.getId()); + Runnable instanceRunnable = () -> executeInstance(ins, taskList); + + executorService.schedule(instanceRunnable, DELAY, TimeUnit.SECONDS); + log.info("executeNextTask successful {}", instance.getUniqueId()); + } + + /** check next task Status is WAIT to RUNNING */ + private List checkAndUpdateWaitStatus( + SchedulerInstance instance, List tasks) { + + List result = Lists.newArrayList(); + for (SchedulerTask task : tasks) { + if (!TaskStatus.WAIT.equals(task.getStatus())) { + continue; + } + List preNodes = + instance.getTaskDag().getRelatedNodes(task.getNodeId(), false); + // If all pre-nodes have completed, set the current task status to Running + if (checkAllNodesFinished(instance.getId(), preNodes)) { + SchedulerTask updateTask = new SchedulerTask(); + updateTask.setId(task.getId()); + updateTask.setStatus(TaskStatus.RUNNING); + schedulerTaskService.update(updateTask); + task.setStatus(TaskStatus.RUNNING); + result.add(task); + } + } + + return result; + } + + /** check all nodes is finished */ + private boolean checkAllNodesFinished(Long instanceId, List nodes) { + for (TaskExecuteDag.Node node : nodes) { + SchedulerTask t = + schedulerTaskService.queryByInstanceIdAndType(instanceId, node.getTaskComponent()); + if (!TaskStatus.isFinished(t.getStatus())) { + return false; + } + } + return true; + } + + /** get all not finish instances */ + private List getAllNotFinishInstances() { + SchedulerInstance record = new SchedulerInstance(); + Integer maxDays = schedulerConfig.getExecuteMaxDay() + 1; + Date startDate = DateUtils.addDays(new Date(), -maxDays); + record.setStartCreateTime(startDate); + List allInstance = schedulerInstanceService.getNotFinishInstance(record); + return allInstance; + } + + /** get instance ThreadPoolExecutor by type */ + private ThreadPoolExecutor getInstanceExecutor(String type) { + if (instances.containsKey(type)) { + return instances.get(type); + } + ThreadPoolExecutor instanceExecutor = + new ThreadPoolExecutor(20, 100, 30, TimeUnit.MINUTES, new LinkedBlockingQueue<>(100000)); + + instances.put(type, instanceExecutor); + return instanceExecutor; + } +} diff --git a/server/core/scheduler/service/src/main/java/com/antgroup/openspg/server/core/scheduler/service/handler/SchedulerHandler.java b/server/core/scheduler/service/src/main/java/com/antgroup/openspg/server/core/scheduler/service/handler/SchedulerHandler.java new file mode 100644 index 000000000..6e2156236 --- /dev/null +++ b/server/core/scheduler/service/src/main/java/com/antgroup/openspg/server/core/scheduler/service/handler/SchedulerHandler.java @@ -0,0 +1,23 @@ +/* + * Copyright 2023 OpenSPG Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except + * in compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License + * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express + * or implied. + */ +package com.antgroup.openspg.server.core.scheduler.service.handler; + +/** Scheduler Handler. To generate and execute Instances */ +public interface SchedulerHandler { + + /** scheduler timer entrance. execute Instances */ + void executeInstances(); + + /** scheduler generate Instances timer */ + void generateInstances(); +} diff --git a/server/core/scheduler/service/src/main/java/com/antgroup/openspg/server/core/scheduler/service/handler/impl/local/LocalSchedulerHandler.java b/server/core/scheduler/service/src/main/java/com/antgroup/openspg/server/core/scheduler/service/handler/impl/local/LocalSchedulerHandler.java new file mode 100644 index 000000000..317657b04 --- /dev/null +++ b/server/core/scheduler/service/src/main/java/com/antgroup/openspg/server/core/scheduler/service/handler/impl/local/LocalSchedulerHandler.java @@ -0,0 +1,92 @@ +/* + * Copyright 2023 OpenSPG Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except + * in compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License + * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express + * or implied. + */ +package com.antgroup.openspg.server.core.scheduler.service.handler.impl.local; + +import com.antgroup.openspg.server.core.scheduler.service.config.SchedulerConfig; +import com.antgroup.openspg.server.core.scheduler.service.engine.SchedulerExecuteService; +import com.antgroup.openspg.server.core.scheduler.service.handler.SchedulerHandler; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.ScheduledThreadPoolExecutor; +import javax.annotation.PostConstruct; +import lombok.extern.slf4j.Slf4j; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; +import org.springframework.stereotype.Service; + +/** Scheduler Handler Local implementation class. To generate and execute Instances */ +@Service +@Slf4j +@ConditionalOnProperty(name = "scheduler.handler.type", havingValue = "local") +public class LocalSchedulerHandler implements SchedulerHandler { + + private static final int corePoolSize = 1; + private static final long initialDelay = 0; + + private static ScheduledExecutorService EXECUTE = new ScheduledThreadPoolExecutor(corePoolSize); + private static ScheduledExecutorService GENERATE = new ScheduledThreadPoolExecutor(corePoolSize); + + @Autowired SchedulerConfig schedulerConfig; + @Autowired SchedulerExecuteService schedulerExecuteService; + + @Override + @PostConstruct + public void executeInstances() { + log.info("start executeInstances"); + EXECUTE.scheduleAtFixedRate( + new ExecuteRunnable(), + initialDelay, + schedulerConfig.getExecuteInstancesPeriod(), + schedulerConfig.getExecuteInstancesUnit()); + } + + @Override + @PostConstruct + public void generateInstances() { + log.info("start generateInstances"); + GENERATE.scheduleAtFixedRate( + new GenerateRunnable(), + initialDelay, + schedulerConfig.getGenerateInstancesPeriod(), + schedulerConfig.getGenerateInstancesUnit()); + } + + /** Execute Instances Runnable */ + class ExecuteRunnable implements Runnable { + @Override + public void run() { + try { + Long startTime = System.currentTimeMillis(); + schedulerExecuteService.executeInstances(); + Long time = System.currentTimeMillis() - startTime; + log.info("run ExecuteInstances end time:{}", time); + } catch (Exception e) { + log.error("run ExecuteInstances Exception", e); + } + } + } + + /** Generate Instances Runnable */ + class GenerateRunnable implements Runnable { + @Override + public void run() { + try { + Long startTime = System.currentTimeMillis(); + schedulerExecuteService.generateInstances(); + Long time = System.currentTimeMillis() - startTime; + log.info("run GenerateInstances end time:{}", time); + } catch (Exception e) { + log.error("run GenerateInstances Exception", e); + } + } + } +} diff --git a/server/core/scheduler/service/src/main/java/com/antgroup/openspg/server/core/scheduler/service/metadata/SchedulerInstanceService.java b/server/core/scheduler/service/src/main/java/com/antgroup/openspg/server/core/scheduler/service/metadata/SchedulerInstanceService.java new file mode 100644 index 000000000..9ccfe2f9d --- /dev/null +++ b/server/core/scheduler/service/src/main/java/com/antgroup/openspg/server/core/scheduler/service/metadata/SchedulerInstanceService.java @@ -0,0 +1,41 @@ +/* + * Copyright 2023 OpenSPG Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except + * in compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License + * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express + * or implied. + */ +package com.antgroup.openspg.server.core.scheduler.service.metadata; + +import com.antgroup.openspg.server.core.scheduler.model.service.SchedulerInstance; +import java.util.List; + +/** Scheduler Instance Service: Add, delete, update, and query instances */ +public interface SchedulerInstanceService { + + /** insert Instance */ + Long insert(SchedulerInstance record); + + /** delete By JobId */ + int deleteByJobId(Long jobId); + + /** update */ + Long update(SchedulerInstance record); + + /** get By id */ + SchedulerInstance getById(Long id); + + /** get By instanceId */ + SchedulerInstance getByUniqueId(String instanceId); + + /** query By Condition */ + List query(SchedulerInstance record); + + /** get Not Finish Instance */ + List getNotFinishInstance(SchedulerInstance record); +} diff --git a/server/core/scheduler/service/src/main/java/com/antgroup/openspg/server/core/scheduler/service/metadata/SchedulerJobService.java b/server/core/scheduler/service/src/main/java/com/antgroup/openspg/server/core/scheduler/service/metadata/SchedulerJobService.java new file mode 100644 index 000000000..ea512362e --- /dev/null +++ b/server/core/scheduler/service/src/main/java/com/antgroup/openspg/server/core/scheduler/service/metadata/SchedulerJobService.java @@ -0,0 +1,35 @@ +/* + * Copyright 2023 OpenSPG Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except + * in compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License + * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express + * or implied. + */ +package com.antgroup.openspg.server.core.scheduler.service.metadata; + +import com.antgroup.openspg.server.core.scheduler.model.service.SchedulerJob; +import java.util.List; + +/** Scheduler Job Service: Add, delete, update, and query Jobs */ +public interface SchedulerJobService { + + /** insert Job */ + Long insert(SchedulerJob record); + + /** delete By Id */ + int deleteById(Long id); + + /** update Job */ + Long update(SchedulerJob record); + + /** get By id */ + SchedulerJob getById(Long id); + + /** query By Condition */ + List query(SchedulerJob record); +} diff --git a/server/core/scheduler/service/src/main/java/com/antgroup/openspg/server/core/scheduler/service/metadata/SchedulerTaskService.java b/server/core/scheduler/service/src/main/java/com/antgroup/openspg/server/core/scheduler/service/metadata/SchedulerTaskService.java new file mode 100644 index 000000000..1c4b4e252 --- /dev/null +++ b/server/core/scheduler/service/src/main/java/com/antgroup/openspg/server/core/scheduler/service/metadata/SchedulerTaskService.java @@ -0,0 +1,54 @@ +/* + * Copyright 2023 OpenSPG Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except + * in compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License + * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express + * or implied. + */ +package com.antgroup.openspg.server.core.scheduler.service.metadata; + +import com.antgroup.openspg.server.common.model.scheduler.SchedulerEnum.TaskStatus; +import com.antgroup.openspg.server.core.scheduler.model.service.SchedulerTask; +import java.util.List; + +/** Scheduler Task Service: Add, delete, update, and query tasks */ +public interface SchedulerTaskService { + + /** insert Task */ + Long insert(SchedulerTask record); + + /** delete By jobId */ + int deleteByJobId(Long jobId); + + /** update By Id */ + Long update(SchedulerTask record); + + /** insert Or Update,id is null to Update */ + Long replace(SchedulerTask record); + + /** get By id */ + SchedulerTask getById(Long id); + + /** query By Condition */ + List query(SchedulerTask record); + + /** query By InstanceId And Type */ + SchedulerTask queryByInstanceIdAndType(Long instanceId, String type); + + /** query By InstanceId */ + List queryByInstanceId(Long instanceId); + + /** set Status By InstanceId */ + int setStatusByInstanceId(Long instanceId, TaskStatus status); + + /** update Lock */ + int updateLock(Long id); + + /** update Unlock */ + int updateUnlock(Long id); +} diff --git a/server/core/scheduler/service/src/main/java/com/antgroup/openspg/server/core/scheduler/service/metadata/impl/local/LocalSchedulerInstanceServiceImpl.java b/server/core/scheduler/service/src/main/java/com/antgroup/openspg/server/core/scheduler/service/metadata/impl/local/LocalSchedulerInstanceServiceImpl.java new file mode 100644 index 000000000..5216afaae --- /dev/null +++ b/server/core/scheduler/service/src/main/java/com/antgroup/openspg/server/core/scheduler/service/metadata/impl/local/LocalSchedulerInstanceServiceImpl.java @@ -0,0 +1,155 @@ +/* + * Copyright 2023 OpenSPG Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except + * in compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License + * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express + * or implied. + */ +package com.antgroup.openspg.server.core.scheduler.service.metadata.impl.local; + +import com.antgroup.openspg.server.common.model.exception.SchedulerException; +import com.antgroup.openspg.server.common.model.scheduler.SchedulerEnum.InstanceStatus; +import com.antgroup.openspg.server.core.scheduler.model.service.SchedulerInstance; +import com.antgroup.openspg.server.core.scheduler.service.metadata.SchedulerInstanceService; +import com.antgroup.openspg.server.core.scheduler.service.metadata.SchedulerTaskService; +import com.antgroup.openspg.server.core.scheduler.service.utils.SchedulerUtils; +import com.google.common.collect.Lists; +import java.util.Date; +import java.util.List; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.atomic.AtomicLong; +import java.util.stream.Collectors; +import org.springframework.beans.BeanUtils; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; +import org.springframework.stereotype.Service; + +/** Scheduler Instance Service implementation class: Add, delete, update, and query instances */ +@Service +@ConditionalOnProperty(name = "scheduler.metadata.store.type", havingValue = "local") +public class LocalSchedulerInstanceServiceImpl implements SchedulerInstanceService { + + private static ConcurrentHashMap instances = new ConcurrentHashMap<>(); + private static AtomicLong maxId = new AtomicLong(0L); + + @Autowired SchedulerTaskService schedulerTaskService; + + @Override + public synchronized Long insert(SchedulerInstance record) { + String uniqueId = record.getUniqueId(); + for (Long id : instances.keySet()) { + SchedulerInstance instance = instances.get(id); + if (uniqueId.equals(instance.getUniqueId())) { + throw new SchedulerException("uniqueId {} already existed", uniqueId); + } + } + Long id = maxId.incrementAndGet(); + record.setId(id); + record.setGmtModified(new Date()); + instances.put(id, record); + return id; + } + + @Override + public synchronized int deleteByJobId(Long jobId) { + List instanceList = Lists.newArrayList(); + for (Long key : instances.keySet()) { + SchedulerInstance instance = instances.get(key); + if (jobId.equals(instance.getJobId())) { + instanceList.add(instance.getId()); + } + } + + for (Long id : instanceList) { + instances.remove(id); + } + return instanceList.size(); + } + + @Override + public synchronized Long update(SchedulerInstance record) { + Long id = record.getId(); + SchedulerInstance old = getById(id); + if (record.getGmtModified() != null && !old.getGmtModified().equals(record.getGmtModified())) { + return 0L; + } + record = SchedulerUtils.merge(old, record); + record.setGmtModified(new Date()); + instances.put(id, record); + return id; + } + + @Override + public SchedulerInstance getById(Long id) { + SchedulerInstance oldInstance = instances.get(id); + if (oldInstance == null) { + throw new SchedulerException("not find id {}", id); + } + SchedulerInstance instance = new SchedulerInstance(); + BeanUtils.copyProperties(oldInstance, instance); + return instance; + } + + @Override + public SchedulerInstance getByUniqueId(String instanceId) { + for (Long key : instances.keySet()) { + SchedulerInstance instance = instances.get(key); + if (instanceId.equals(instance.getUniqueId())) { + SchedulerInstance target = new SchedulerInstance(); + BeanUtils.copyProperties(instance, target); + return target; + } + } + return null; + } + + @Override + public List query(SchedulerInstance record) { + List instanceList = Lists.newArrayList(); + for (Long key : instances.keySet()) { + SchedulerInstance instance = instances.get(key); + // Filter instance by fields + if (!SchedulerUtils.compare(instance.getId(), record.getId(), SchedulerUtils.EQ) + || !SchedulerUtils.compare( + instance.getProjectId(), record.getProjectId(), SchedulerUtils.EQ) + || !SchedulerUtils.compare(instance.getJobId(), record.getJobId(), SchedulerUtils.EQ) + || !SchedulerUtils.compare( + instance.getUniqueId(), record.getUniqueId(), SchedulerUtils.EQ) + || !SchedulerUtils.compare(instance.getType(), record.getType(), SchedulerUtils.EQ) + || !SchedulerUtils.compare(instance.getStatus(), record.getStatus(), SchedulerUtils.EQ) + || !SchedulerUtils.compare( + instance.getLifeCycle(), record.getLifeCycle(), SchedulerUtils.EQ) + || !SchedulerUtils.compare( + instance.getDependence(), record.getDependence(), SchedulerUtils.EQ) + || !SchedulerUtils.compare( + instance.getVersion(), record.getVersion(), SchedulerUtils.EQ)) { + continue; + } + + Date create = instance.getGmtCreate(); + if (!SchedulerUtils.compare(create, record.getStartCreateTime(), SchedulerUtils.LT)) { + continue; + } + + SchedulerInstance target = new SchedulerInstance(); + BeanUtils.copyProperties(instance, target); + instanceList.add(target); + } + return instanceList; + } + + @Override + public List getNotFinishInstance(SchedulerInstance record) { + List instanceList = query(record); + instanceList = + instanceList.stream() + .filter(s -> !InstanceStatus.isFinished(s.getStatus())) + .collect(Collectors.toList()); + return instanceList; + } +} diff --git a/server/core/scheduler/service/src/main/java/com/antgroup/openspg/server/core/scheduler/service/metadata/impl/local/LocalSchedulerJobServiceImpl.java b/server/core/scheduler/service/src/main/java/com/antgroup/openspg/server/core/scheduler/service/metadata/impl/local/LocalSchedulerJobServiceImpl.java new file mode 100644 index 000000000..5db872ab1 --- /dev/null +++ b/server/core/scheduler/service/src/main/java/com/antgroup/openspg/server/core/scheduler/service/metadata/impl/local/LocalSchedulerJobServiceImpl.java @@ -0,0 +1,99 @@ +/* + * Copyright 2023 OpenSPG Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except + * in compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License + * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express + * or implied. + */ +package com.antgroup.openspg.server.core.scheduler.service.metadata.impl.local; + +import com.antgroup.openspg.server.common.model.exception.SchedulerException; +import com.antgroup.openspg.server.core.scheduler.model.service.SchedulerJob; +import com.antgroup.openspg.server.core.scheduler.service.metadata.SchedulerJobService; +import com.antgroup.openspg.server.core.scheduler.service.utils.SchedulerUtils; +import com.google.common.collect.Lists; +import java.util.Date; +import java.util.List; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.atomic.AtomicLong; +import org.springframework.beans.BeanUtils; +import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; +import org.springframework.stereotype.Service; + +/** Scheduler Job Service implementation class: Add, delete, update, and query Jobs */ +@Service +@ConditionalOnProperty(name = "scheduler.metadata.store.type", havingValue = "local") +public class LocalSchedulerJobServiceImpl implements SchedulerJobService { + + private static ConcurrentHashMap jobs = new ConcurrentHashMap<>(); + private static AtomicLong maxId = new AtomicLong(0L); + + @Override + public synchronized Long insert(SchedulerJob record) { + Long id = maxId.incrementAndGet(); + record.setId(id); + record.setGmtModified(new Date()); + jobs.put(id, record); + return id; + } + + @Override + public synchronized int deleteById(Long id) { + SchedulerJob record = jobs.remove(id); + return record == null ? 0 : 1; + } + + @Override + public synchronized Long update(SchedulerJob record) { + Long id = record.getId(); + SchedulerJob old = getById(id); + if (record.getGmtModified() != null && !old.getGmtModified().equals(record.getGmtModified())) { + return 0L; + } + record = SchedulerUtils.merge(old, record); + record.setGmtModified(new Date()); + jobs.put(id, record); + return id; + } + + @Override + public SchedulerJob getById(Long id) { + SchedulerJob oldJob = jobs.get(id); + if (oldJob == null) { + throw new SchedulerException("not find id {}", id); + } + SchedulerJob job = new SchedulerJob(); + BeanUtils.copyProperties(oldJob, job); + return job; + } + + @Override + public List query(SchedulerJob record) { + List jobList = Lists.newArrayList(); + for (Long key : jobs.keySet()) { + SchedulerJob job = jobs.get(key); + + // Filter job by fields + if (!SchedulerUtils.compare(job.getId(), record.getId(), SchedulerUtils.EQ) + || !SchedulerUtils.compare(job.getCreateUser(), record.getCreateUser(), SchedulerUtils.EQ) + || !SchedulerUtils.compare( + job.getTranslateType(), record.getTranslateType(), SchedulerUtils.EQ) + || !SchedulerUtils.compare(job.getLifeCycle(), record.getLifeCycle(), SchedulerUtils.EQ) + || !SchedulerUtils.compare(job.getStatus(), record.getStatus(), SchedulerUtils.EQ) + || !SchedulerUtils.compare(job.getDependence(), record.getDependence(), SchedulerUtils.EQ) + || !SchedulerUtils.compare(job.getName(), record.getName(), SchedulerUtils.IN)) { + continue; + } + + SchedulerJob target = new SchedulerJob(); + BeanUtils.copyProperties(job, target); + jobList.add(target); + } + return jobList; + } +} diff --git a/server/core/scheduler/service/src/main/java/com/antgroup/openspg/server/core/scheduler/service/metadata/impl/local/LocalSchedulerTaskServiceImpl.java b/server/core/scheduler/service/src/main/java/com/antgroup/openspg/server/core/scheduler/service/metadata/impl/local/LocalSchedulerTaskServiceImpl.java new file mode 100644 index 000000000..cf85f6f43 --- /dev/null +++ b/server/core/scheduler/service/src/main/java/com/antgroup/openspg/server/core/scheduler/service/metadata/impl/local/LocalSchedulerTaskServiceImpl.java @@ -0,0 +1,178 @@ +/* + * Copyright 2023 OpenSPG Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except + * in compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License + * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express + * or implied. + */ +package com.antgroup.openspg.server.core.scheduler.service.metadata.impl.local; + +import com.antgroup.openspg.server.common.model.exception.SchedulerException; +import com.antgroup.openspg.server.common.model.scheduler.SchedulerEnum.TaskStatus; +import com.antgroup.openspg.server.core.scheduler.model.service.SchedulerTask; +import com.antgroup.openspg.server.core.scheduler.service.metadata.SchedulerTaskService; +import com.antgroup.openspg.server.core.scheduler.service.utils.SchedulerUtils; +import com.google.common.collect.Lists; +import java.util.Date; +import java.util.List; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.atomic.AtomicLong; +import org.springframework.beans.BeanUtils; +import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; +import org.springframework.stereotype.Service; + +/** Scheduler Task Service implementation class: Add, delete, update, and query tasks */ +@Service +@ConditionalOnProperty(name = "scheduler.metadata.store.type", havingValue = "local") +public class LocalSchedulerTaskServiceImpl implements SchedulerTaskService { + + private static ConcurrentHashMap tasks = new ConcurrentHashMap<>(); + private static AtomicLong maxId = new AtomicLong(0L); + + @Override + public synchronized Long insert(SchedulerTask record) { + Long id = maxId.incrementAndGet(); + record.setId(id); + record.setGmtModified(new Date()); + tasks.put(id, record); + return id; + } + + @Override + public synchronized int deleteByJobId(Long jobId) { + List ids = Lists.newArrayList(); + for (Long key : tasks.keySet()) { + SchedulerTask task = tasks.get(key); + if (jobId.equals(task.getJobId())) { + ids.add(task.getId()); + } + } + for (Long id : ids) { + tasks.remove(id); + } + return ids.size(); + } + + @Override + public synchronized Long update(SchedulerTask record) { + Long id = record.getId(); + SchedulerTask old = getById(id); + if (record.getGmtModified() != null && !old.getGmtModified().equals(record.getGmtModified())) { + return 0L; + } + record = SchedulerUtils.merge(old, record); + record.setGmtModified(new Date()); + tasks.put(id, record); + return id; + } + + @Override + public synchronized Long replace(SchedulerTask record) { + if (record.getId() == null) { + return insert(record); + } else { + return update(record); + } + } + + @Override + public SchedulerTask getById(Long id) { + SchedulerTask oldTask = tasks.get(id); + if (oldTask == null) { + throw new SchedulerException("not find id {}", id); + } + SchedulerTask task = new SchedulerTask(); + BeanUtils.copyProperties(oldTask, task); + return task; + } + + @Override + public List query(SchedulerTask record) { + List taskList = Lists.newArrayList(); + for (Long key : tasks.keySet()) { + SchedulerTask task = tasks.get(key); + + // Filter task by fields + if (!SchedulerUtils.compare(task.getId(), record.getId(), SchedulerUtils.EQ) + || !SchedulerUtils.compare(task.getType(), record.getType(), SchedulerUtils.EQ) + || !SchedulerUtils.compare(task.getTitle(), record.getTitle(), SchedulerUtils.IN) + || !SchedulerUtils.compare(task.getJobId(), record.getJobId(), SchedulerUtils.EQ) + || !SchedulerUtils.compare( + task.getInstanceId(), record.getInstanceId(), SchedulerUtils.EQ)) { + continue; + } + + SchedulerTask target = new SchedulerTask(); + BeanUtils.copyProperties(task, target); + taskList.add(target); + } + return taskList; + } + + @Override + public SchedulerTask queryByInstanceIdAndType(Long instanceId, String type) { + for (Long key : tasks.keySet()) { + SchedulerTask task = tasks.get(key); + if (instanceId.equals(task.getInstanceId()) && type.equalsIgnoreCase(task.getType())) { + SchedulerTask target = new SchedulerTask(); + BeanUtils.copyProperties(task, target); + return target; + } + } + return null; + } + + @Override + public List queryByInstanceId(Long instanceId) { + List taskList = Lists.newArrayList(); + for (Long key : tasks.keySet()) { + SchedulerTask task = tasks.get(key); + if (instanceId.equals(task.getInstanceId())) { + SchedulerTask target = new SchedulerTask(); + BeanUtils.copyProperties(task, target); + taskList.add(target); + } + } + return taskList; + } + + @Override + public int setStatusByInstanceId(Long instanceId, TaskStatus status) { + int flag = 0; + for (Long key : tasks.keySet()) { + SchedulerTask task = tasks.get(key); + if (instanceId.equals(task.getInstanceId())) { + task.setGmtModified(new Date()); + task.setStatus(status); + flag++; + } + } + return flag; + } + + @Override + public int updateLock(Long id) { + SchedulerTask oldRecord = getById(id); + if (oldRecord.getLockTime() != null) { + return 0; + } + oldRecord.setGmtModified(new Date()); + oldRecord.setLockTime(new Date()); + tasks.put(id, oldRecord); + return 1; + } + + @Override + public int updateUnlock(Long id) { + SchedulerTask oldRecord = getById(id); + oldRecord.setGmtModified(new Date()); + oldRecord.setLockTime(null); + tasks.put(id, oldRecord); + return 1; + } +} diff --git a/server/core/scheduler/service/src/main/java/com/antgroup/openspg/server/core/scheduler/service/task/TaskExecute.java b/server/core/scheduler/service/src/main/java/com/antgroup/openspg/server/core/scheduler/service/task/TaskExecute.java new file mode 100644 index 000000000..ab1b9a493 --- /dev/null +++ b/server/core/scheduler/service/src/main/java/com/antgroup/openspg/server/core/scheduler/service/task/TaskExecute.java @@ -0,0 +1,25 @@ +/* + * Copyright 2023 OpenSPG Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except + * in compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License + * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express + * or implied. + */ +package com.antgroup.openspg.server.core.scheduler.service.task; + +import com.antgroup.openspg.server.common.model.scheduler.SchedulerEnum.TaskStatus; +import com.antgroup.openspg.server.core.scheduler.model.task.TaskExecuteContext; + +/** Job Task .execute instance Entry and process task */ +public interface TaskExecute { + /** execute Entry */ + void executeEntry(TaskExecuteContext context); + + /** execute task */ + TaskStatus execute(TaskExecuteContext context); +} diff --git a/server/core/scheduler/service/src/main/java/com/antgroup/openspg/server/core/scheduler/service/task/TaskExecuteTemplate.java b/server/core/scheduler/service/src/main/java/com/antgroup/openspg/server/core/scheduler/service/task/TaskExecuteTemplate.java new file mode 100644 index 000000000..56842510e --- /dev/null +++ b/server/core/scheduler/service/src/main/java/com/antgroup/openspg/server/core/scheduler/service/task/TaskExecuteTemplate.java @@ -0,0 +1,229 @@ +/* + * Copyright 2023 OpenSPG Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except + * in compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License + * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express + * or implied. + */ +package com.antgroup.openspg.server.core.scheduler.service.task; + +import com.antgroup.openspg.common.util.DateTimeUtils; +import com.antgroup.openspg.server.common.model.exception.SchedulerException; +import com.antgroup.openspg.server.common.model.scheduler.SchedulerEnum.InstanceStatus; +import com.antgroup.openspg.server.common.model.scheduler.SchedulerEnum.TaskStatus; +import com.antgroup.openspg.server.core.scheduler.model.service.SchedulerInstance; +import com.antgroup.openspg.server.core.scheduler.model.service.SchedulerTask; +import com.antgroup.openspg.server.core.scheduler.model.task.TaskExecuteContext; +import com.antgroup.openspg.server.core.scheduler.model.task.TaskExecuteDag; +import com.antgroup.openspg.server.core.scheduler.service.common.SchedulerCommonService; +import com.antgroup.openspg.server.core.scheduler.service.metadata.SchedulerTaskService; +import com.antgroup.openspg.server.core.scheduler.service.utils.SchedulerUtils; +import java.util.Date; +import java.util.List; +import lombok.extern.slf4j.Slf4j; +import org.apache.commons.collections4.CollectionUtils; +import org.apache.commons.lang3.exception.ExceptionUtils; +import org.apache.commons.lang3.time.DateUtils; +import org.springframework.beans.factory.annotation.Autowired; + +/** JobTask Template class. execute before,process,finally and other functions */ +@Slf4j +public abstract class TaskExecuteTemplate implements TaskExecute { + + /** lock max time */ + public static final Integer LOCK_TIME_MINUTES = 15; + + @Autowired SchedulerTaskService schedulerTaskService; + @Autowired SchedulerCommonService schedulerCommonService; + + @Override + public final void executeEntry(TaskExecuteContext context) { + TaskStatus status = null; + boolean lock = true; + try { + lock = lockTask(context); + if (lock) { + before(context); + status = execute(context); + context.getTask().setStatus(status); + } + } catch (Throwable e) { + context.getTask().setStatus(TaskStatus.ERROR); + context.addTraceLog("Scheduling execute exception:%s", ExceptionUtils.getStackTrace(e)); + log.error("JobTask process error uniqueId:{}", context.getInstance().getUniqueId(), e); + } + + processStatus(context, status, lock); + } + + public void processStatus(TaskExecuteContext context, TaskStatus status, boolean lock) { + try { + if (TaskStatus.isFinished(status)) { + setTaskFinish(context); + } + } catch (Throwable e) { + context.addTraceLog("Scheduling save status error:%s", ExceptionUtils.getStackTrace(e)); + log.error("process status error uniqueId:{}", context.getInstance().getUniqueId(), e); + } finally { + unlockTask(context, lock); + finallyFunc(context); + } + } + + /** lock task before scheduling */ + private boolean lockTask(TaskExecuteContext context) { + SchedulerTask task = context.getTask(); + if (task.getLockTime() == null) { + if (schedulerTaskService.updateLock(task.getId()) < 1) { + context.addTraceLog("Failed to preempt lock, the lock is already occupied!"); + return false; + } + context.addTraceLog("Lock preempt successful!"); + return true; + } + + Date now = new Date(); + Date unLockTime = DateUtils.addMinutes(task.getLockTime(), LOCK_TIME_MINUTES); + if (now.before(unLockTime)) { + context.addTraceLog( + "Last lock preempt time:%s,The threshold was not exceeded. Wait for the execution to complete", + DateTimeUtils.getDate2LongStr(task.getLockTime())); + return false; + } + + // Timeout release lock + context.addTraceLog( + "Last lock preempt time:%s, The threshold was exceeded. The current process is executed directly", + DateTimeUtils.getDate2LongStr(task.getLockTime())); + unlockTask(context, true); + if (schedulerTaskService.updateLock(task.getId()) < 1) { + context.addTraceLog("Failed to re-preempt lock!"); + return false; + } + context.addTraceLog("Re-preempt lock successfully!"); + return true; + } + + /** Release lock after scheduling is completed */ + private void unlockTask(TaskExecuteContext context, boolean lock) { + if (!lock) { + return; + } + schedulerTaskService.updateUnlock(context.getTask().getId()); + context.addTraceLog("Lock released successfully!"); + } + + public void before(TaskExecuteContext context) { + context.addTraceLog("Start process task!"); + } + + /** the finally Func */ + public void finallyFunc(TaskExecuteContext context) { + long cost = System.currentTimeMillis() - context.getStartTime(); + context.addTraceLog("Task scheduling completed. cost:%s ms !", cost); + + SchedulerTask task = context.getTask(); + SchedulerTask old = schedulerTaskService.getById(task.getId()); + if (TaskStatus.isFinished(old.getStatus())) { + context.addTraceLog("Task has been completed by other threads,status:%s!", old.getStatus()); + task = old; + } + + // update task execute num and trace log + task.setGmtModified(old.getGmtModified()); + task.setExecuteNum(old.getExecuteNum() + 1); + context.getTraceLog().insert(0, System.getProperty("line.separator")); + task.setTraceLog(SchedulerUtils.setRemarkLimit(old.getTraceLog(), context.getTraceLog())); + task.setLockTime(null); + + if (schedulerTaskService.replace(task) <= 0) { + throw new SchedulerException("finally replace task error task {}", task); + } + } + + /** set task to finished */ + public void setTaskFinish(TaskExecuteContext context) { + SchedulerInstance instance = context.getInstance(); + SchedulerTask task = context.getTask(); + task.setFinishTime(new Date()); + + List nextNodes = + instance.getTaskDag().getRelatedNodes(task.getNodeId(), true); + + // Set the instance to complete if all tasks are completed + if (CollectionUtils.isEmpty(nextNodes)) { + List tasks = schedulerTaskService.queryByInstanceId(instance.getId()); + if (checkAllTasksFinished(task, tasks)) { + setInstanceFinished(context, TaskStatus.FINISH, InstanceStatus.FINISH); + } + return; + } + nextNodes.forEach(node -> startNextNode(context, instance.getTaskDag(), node)); + } + + /** start next wait node */ + private void startNextNode( + TaskExecuteContext context, TaskExecuteDag taskDag, TaskExecuteDag.Node nextNode) { + SchedulerTask task = context.getTask(); + + if (!checkAllNodesFinished(task, taskDag.getRelatedNodes(nextNode.getId(), false))) { + return; + } + SchedulerTask nextTask = + schedulerTaskService.queryByInstanceIdAndType( + task.getInstanceId(), nextNode.getTaskComponent()); + SchedulerTask updateTask = new SchedulerTask(); + updateTask.setId(nextTask.getId()); + String name = nextNode.getName(); + context.addTraceLog("current node is completed to trigger next node:%s", name); + if (!TaskStatus.WAIT.equals(nextTask.getStatus())) { + context.addTraceLog("%s status:%s,Only WAIT can be modified", name, nextTask.getStatus()); + return; + } + updateTask.setStatus(TaskStatus.RUNNING); + updateTask.setBeginTime(new Date()); + if (schedulerTaskService.replace(updateTask) <= 0) { + task.setStatus(TaskStatus.ERROR); + throw new SchedulerException("replace task error task {}", updateTask); + } + context.setTaskFinish(true); + } + + /** check all tasks is finished */ + private boolean checkAllTasksFinished(SchedulerTask task, List taskList) { + for (SchedulerTask t : taskList) { + if (!t.getId().equals(task.getId()) && !TaskStatus.isFinished(t.getStatus())) { + return false; + } + } + return true; + } + + /** Check that all nodes are complete */ + private boolean checkAllNodesFinished(SchedulerTask task, List nodes) { + for (TaskExecuteDag.Node node : nodes) { + SchedulerTask t = + schedulerTaskService.queryByInstanceIdAndType( + task.getInstanceId(), node.getTaskComponent()); + if (!node.getId().equals(task.getNodeId()) && !TaskStatus.isFinished(t.getStatus())) { + return false; + } + } + return true; + } + + /** set instance status to finished */ + public void setInstanceFinished( + TaskExecuteContext context, TaskStatus taskStatus, InstanceStatus instanceStatus) { + SchedulerInstance instance = context.getInstance(); + context.addTraceLog( + "Complete instance,Subsequent task status will all be changed to:%s. instance status set to:%s", + taskStatus.name(), instanceStatus.name()); + schedulerCommonService.setInstanceFinish(instance, instanceStatus, taskStatus); + } +} diff --git a/server/core/scheduler/service/src/main/java/com/antgroup/openspg/server/core/scheduler/service/task/async/AsyncTaskExecute.java b/server/core/scheduler/service/src/main/java/com/antgroup/openspg/server/core/scheduler/service/task/async/AsyncTaskExecute.java new file mode 100644 index 000000000..17d4f4a63 --- /dev/null +++ b/server/core/scheduler/service/src/main/java/com/antgroup/openspg/server/core/scheduler/service/task/async/AsyncTaskExecute.java @@ -0,0 +1,28 @@ +/* + * Copyright 2023 OpenSPG Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except + * in compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License + * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express + * or implied. + */ +package com.antgroup.openspg.server.core.scheduler.service.task.async; + +import com.antgroup.openspg.server.common.model.scheduler.SchedulerEnum.TaskStatus; +import com.antgroup.openspg.server.core.scheduler.model.task.TaskExecuteContext; + +/** Async scheduler Task, submit/stop task */ +public interface AsyncTaskExecute { + /** Async submit task, return null and retry */ + String submit(TaskExecuteContext context); + + /** get task Status by resource */ + TaskStatus getStatus(TaskExecuteContext context, String resource); + + /** stop Task */ + Boolean stop(TaskExecuteContext context, String resource); +} diff --git a/server/core/scheduler/service/src/main/java/com/antgroup/openspg/server/core/scheduler/service/task/async/AsyncTaskExecuteTemplate.java b/server/core/scheduler/service/src/main/java/com/antgroup/openspg/server/core/scheduler/service/task/async/AsyncTaskExecuteTemplate.java new file mode 100644 index 000000000..face824c3 --- /dev/null +++ b/server/core/scheduler/service/src/main/java/com/antgroup/openspg/server/core/scheduler/service/task/async/AsyncTaskExecuteTemplate.java @@ -0,0 +1,53 @@ +/* + * Copyright 2023 OpenSPG Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except + * in compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License + * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express + * or implied. + */ +package com.antgroup.openspg.server.core.scheduler.service.task.async; + +import com.antgroup.openspg.server.common.model.scheduler.SchedulerEnum.TaskStatus; +import com.antgroup.openspg.server.core.scheduler.model.service.SchedulerTask; +import com.antgroup.openspg.server.core.scheduler.model.task.TaskExecuteContext; +import com.antgroup.openspg.server.core.scheduler.service.metadata.SchedulerTaskService; +import com.antgroup.openspg.server.core.scheduler.service.task.TaskExecuteTemplate; +import org.apache.commons.lang3.StringUtils; +import org.springframework.beans.factory.annotation.Autowired; + +/** Job Async Task Template class. execute process functions */ +public abstract class AsyncTaskExecuteTemplate extends TaskExecuteTemplate + implements AsyncTaskExecute { + + @Autowired SchedulerTaskService schedulerTaskService; + + @Override + public final TaskStatus execute(TaskExecuteContext context) { + SchedulerTask task = context.getTask(); + String resource = task.getResource(); + // if resource not blank trigger getStatus + if (StringUtils.isNotBlank(resource)) { + context.addTraceLog("Async task submitted! Get task status. resource:%s", resource); + return getStatus(context, resource); + } + + context.addTraceLog("The Async task has not been submit! Go to submit"); + // if resource is blank trigger submit + resource = submit(context); + if (StringUtils.isBlank(resource)) { + return TaskStatus.RUNNING; + } + + context.addTraceLog("Async task submit successful! resource:%s", resource); + SchedulerTask updateTask = new SchedulerTask(); + updateTask.setId(task.getId()); + updateTask.setResource(resource); + schedulerTaskService.update(updateTask); + return TaskStatus.RUNNING; + } +} diff --git a/server/core/scheduler/service/src/main/java/com/antgroup/openspg/server/core/scheduler/service/task/sync/SyncTaskExecute.java b/server/core/scheduler/service/src/main/java/com/antgroup/openspg/server/core/scheduler/service/task/sync/SyncTaskExecute.java new file mode 100644 index 000000000..41e9d0931 --- /dev/null +++ b/server/core/scheduler/service/src/main/java/com/antgroup/openspg/server/core/scheduler/service/task/sync/SyncTaskExecute.java @@ -0,0 +1,23 @@ +/* + * Copyright 2023 OpenSPG Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except + * in compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License + * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express + * or implied. + */ +package com.antgroup.openspg.server.core.scheduler.service.task.sync; + +import com.antgroup.openspg.server.common.model.scheduler.SchedulerEnum.TaskStatus; +import com.antgroup.openspg.server.core.scheduler.model.task.TaskExecuteContext; + +/** Job Sync task, submit task */ +public interface SyncTaskExecute { + + /** Sync submit task */ + TaskStatus submit(TaskExecuteContext context); +} diff --git a/server/core/scheduler/service/src/main/java/com/antgroup/openspg/server/core/scheduler/service/task/sync/SyncTaskExecuteTemplate.java b/server/core/scheduler/service/src/main/java/com/antgroup/openspg/server/core/scheduler/service/task/sync/SyncTaskExecuteTemplate.java new file mode 100644 index 000000000..fce2832d8 --- /dev/null +++ b/server/core/scheduler/service/src/main/java/com/antgroup/openspg/server/core/scheduler/service/task/sync/SyncTaskExecuteTemplate.java @@ -0,0 +1,27 @@ +/* + * Copyright 2023 OpenSPG Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except + * in compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License + * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express + * or implied. + */ +package com.antgroup.openspg.server.core.scheduler.service.task.sync; + +import com.antgroup.openspg.server.common.model.scheduler.SchedulerEnum.TaskStatus; +import com.antgroup.openspg.server.core.scheduler.model.task.TaskExecuteContext; +import com.antgroup.openspg.server.core.scheduler.service.task.TaskExecuteTemplate; + +/** Job Sync task Template class. execute process functions */ +public abstract class SyncTaskExecuteTemplate extends TaskExecuteTemplate + implements SyncTaskExecute { + + @Override + public final TaskStatus execute(TaskExecuteContext context) { + return submit(context); + } +} diff --git a/server/core/scheduler/service/src/main/java/com/antgroup/openspg/server/core/scheduler/service/translate/Translate.java b/server/core/scheduler/service/src/main/java/com/antgroup/openspg/server/core/scheduler/service/translate/Translate.java new file mode 100644 index 000000000..a43ad78be --- /dev/null +++ b/server/core/scheduler/service/src/main/java/com/antgroup/openspg/server/core/scheduler/service/translate/Translate.java @@ -0,0 +1,28 @@ +/* + * Copyright 2023 OpenSPG Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except + * in compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License + * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express + * or implied. + */ +package com.antgroup.openspg.server.core.scheduler.service.translate; + +import com.antgroup.openspg.server.core.scheduler.model.service.SchedulerJob; +import com.antgroup.openspg.server.core.scheduler.model.task.TaskExecuteDag; + +/** scheduler Translate. SchedulerJob to TaskDag */ +public interface Translate { + + /** + * translate to Task Dag + * + * @param job + * @return + */ + TaskExecuteDag translate(SchedulerJob job); +} diff --git a/server/core/scheduler/service/src/main/java/com/antgroup/openspg/server/core/scheduler/service/translate/TranslatorFactory.java b/server/core/scheduler/service/src/main/java/com/antgroup/openspg/server/core/scheduler/service/translate/TranslatorFactory.java new file mode 100644 index 000000000..0e6a50a42 --- /dev/null +++ b/server/core/scheduler/service/src/main/java/com/antgroup/openspg/server/core/scheduler/service/translate/TranslatorFactory.java @@ -0,0 +1,32 @@ +/* + * Copyright 2023 OpenSPG Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except + * in compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License + * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express + * or implied. + */ +package com.antgroup.openspg.server.core.scheduler.service.translate; + +import com.antgroup.openspg.server.common.model.scheduler.SchedulerEnum.TranslateType; +import com.antgroup.openspg.server.common.service.spring.SpringContextHolder; +import lombok.extern.slf4j.Slf4j; + +/** Translator Factory. get Translate Bean by type */ +@Slf4j +public class TranslatorFactory { + + /** get Translate by type */ + public static Translate getTranslator(TranslateType type) { + Translate dagTranslate = SpringContextHolder.getBean(type.getType(), Translate.class); + if (dagTranslate == null) { + log.error("getTranslator bean error type:{}", type); + throw new RuntimeException("not find bean type:" + type); + } + return dagTranslate; + } +} diff --git a/server/core/scheduler/service/src/main/java/com/antgroup/openspg/server/core/scheduler/service/utils/SchedulerUtils.java b/server/core/scheduler/service/src/main/java/com/antgroup/openspg/server/core/scheduler/service/utils/SchedulerUtils.java new file mode 100644 index 000000000..b39df458f --- /dev/null +++ b/server/core/scheduler/service/src/main/java/com/antgroup/openspg/server/core/scheduler/service/utils/SchedulerUtils.java @@ -0,0 +1,138 @@ +/* + * Copyright 2023 OpenSPG Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except + * in compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License + * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express + * or implied. + */ +package com.antgroup.openspg.server.core.scheduler.service.utils; + +import com.antgroup.openspg.common.util.DateTimeUtils; +import java.beans.BeanInfo; +import java.beans.Introspector; +import java.beans.PropertyDescriptor; +import java.text.ParseException; +import java.util.ArrayList; +import java.util.Calendar; +import java.util.Date; +import java.util.List; +import lombok.extern.slf4j.Slf4j; +import org.apache.commons.lang3.time.DateUtils; +import org.quartz.CronExpression; + +/** some scheduler common tools */ +@Slf4j +public class SchedulerUtils { + + public static final String EQ = "eq"; + public static final String IN = "in"; + public static final String LT = "lt"; + + /** merge two bean by discovering differences */ + public static M merge(M dest, M orig) { + if (dest == null || orig == null) { + return (dest == null) ? orig : dest; + } + try { + BeanInfo beanInfo = Introspector.getBeanInfo(dest.getClass()); + for (PropertyDescriptor descriptor : beanInfo.getPropertyDescriptors()) { + if (descriptor.getWriteMethod() == null) { + continue; + } + Object originalValue = descriptor.getReadMethod().invoke(orig); + if (originalValue == null) { + continue; + } + descriptor.getWriteMethod().invoke(dest, originalValue); + } + } catch (Exception e) { + log.error("merge bean exception", e); + } + return dest; + } + + /** Limit remark. sub String To Length */ + public static String setRemarkLimit(String oldRemark, StringBuffer appendRemark) { + Integer start = 0; + Integer length = 100000; + StringBuffer str = appendRemark.append(oldRemark); + String fill = "..."; + if (length >= str.length()) { + return str.toString(); + } + return str.substring(start, length - fill.length()) + fill; + } + + /** get CronExpression */ + public static CronExpression getCronExpression(String cron) { + try { + return new CronExpression(cron); + } catch (ParseException e) { + throw new RuntimeException("Cron ParseException:" + cron, e); + } + } + + /** get Cron Execution Dates By Today */ + public static List getCronExecutionDatesByToday(String cron) { + CronExpression expression = getCronExpression(cron); + List dates = new ArrayList<>(); + Date startDate = DateUtils.truncate(new Date(), Calendar.DAY_OF_MONTH); + Date endDate = DateUtils.addDays(startDate, 1); + + if (expression.isSatisfiedBy(startDate)) { + dates.add(startDate); + } + Date nextDate = expression.getNextValidTimeAfter(startDate); + while (nextDate != null && nextDate.before(endDate)) { + dates.add(nextDate); + nextDate = expression.getNextValidTimeAfter(nextDate); + } + + return dates; + } + + /** get Previous ValidTime */ + public static Date getPreviousValidTime(String cron, Date date) { + CronExpression expression = getCronExpression(cron); + Date endDate = expression.getNextValidTimeAfter(expression.getNextValidTimeAfter(date)); + Long time = 2 * date.getTime() - endDate.getTime(); + + Date nextDate = expression.getNextValidTimeAfter(new Date(time)); + Date preDate = nextDate; + while (nextDate != null && nextDate.before(date)) { + preDate = nextDate; + nextDate = expression.getNextValidTimeAfter(nextDate); + } + return preDate; + } + + /** get Unique Id */ + public static String getUniqueId(Long jobId, Date schedulerDate) { + return jobId + DateTimeUtils.getDate2Str(DateTimeUtils.YYYY_MM_DD_HH_MM_SS2, schedulerDate); + } + + /** content compare key */ + public static boolean compare(Object content, Object key, String type) { + if (key == null) { + return true; + } + if (content == null) { + return false; + } + switch (type) { + case EQ: + return content.equals(key); + case IN: + return ((String) content).contains((String) key); + case LT: + return ((Date) key).before((Date) content); + default: + return false; + } + } +} diff --git a/server/pom.xml b/server/pom.xml index f2ac51222..08185293d 100644 --- a/server/pom.xml +++ b/server/pom.xml @@ -46,6 +46,8 @@ common/service core/schema/model core/schema/service + core/scheduler/model + core/scheduler/service core/reasoner/model core/reasoner/service infra/dao @@ -113,6 +115,31 @@ core-schema-service ${project.version} + + com.antgroup.openspg.server + core-builder-model + ${project.version} + + + com.antgroup.openspg.server + core-builder-service + ${project.version} + + + com.antgroup.openspg.server + core-scheduler-model + ${project.version} + + + com.antgroup.openspg.server + core-scheduler-service + ${project.version} + + + com.antgroup.openspg.server + core-reasoner-model + ${project.version} + com.antgroup.openspg.server core-reasoner-service @@ -160,6 +187,16 @@ springdoc-openapi-ui ${springdoc.version} + + org.springframework.boot + spring-boot-starter-test + 2.7.8 + + + org.springframework + spring-web + 5.3.21 + diff --git a/server/test/pom.xml b/server/test/pom.xml index 1f1ab7043..30eedba41 100644 --- a/server/test/pom.xml +++ b/server/test/pom.xml @@ -34,6 +34,10 @@ org.springframework.boot spring-boot-autoconfigure + + org.springframework.boot + spring-boot-starter-test + org.slf4j slf4j-api @@ -48,5 +52,21 @@ cloudext-impl-graph-store-tugraph test + + com.antgroup.openspg.server + core-scheduler-service + + + com.antgroup.openspg.server + infra-dao + + + org.springframework.boot + spring-boot-starter-test + + + org.springframework + spring-web + diff --git a/server/test/src/test/java/com/antgroup/openspg/test/kgschema/SPGSchemaFacadeTest.groovy b/server/test/src/test/java/com/antgroup/openspg/test/kgschema/SPGSchemaFacadeTest.groovy new file mode 100644 index 000000000..51f3ccb99 --- /dev/null +++ b/server/test/src/test/java/com/antgroup/openspg/test/kgschema/SPGSchemaFacadeTest.groovy @@ -0,0 +1,202 @@ +/* + * Copyright 2023 OpenSPG Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except + * in compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License + * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express + * or implied. + */ + + +package com.antgroup.openspg.test.kgschema + +import com.antgroup.openspg.server.api.facade.ApiResponse +import com.antgroup.openspg.server.api.facade.dto.schema.request.BuiltInPropertyRequest +import com.antgroup.openspg.server.api.facade.dto.schema.request.ConceptRequest +import com.antgroup.openspg.server.api.facade.dto.schema.request.ProjectSchemaRequest +import com.antgroup.openspg.server.api.facade.dto.schema.request.RelationRequest +import com.antgroup.openspg.server.api.facade.dto.schema.request.SPGTypeRequest +import com.antgroup.openspg.server.api.facade.dto.schema.request.SchemaAlterRequest +import com.antgroup.openspg.server.api.http.client.HttpConceptFacade +import com.antgroup.openspg.server.api.http.client.HttpSchemaFacade +import com.antgroup.openspg.server.api.http.client.util.ConnectionInfo +import com.antgroup.openspg.server.api.http.client.util.HttpClientBootstrap +import com.antgroup.openspg.cloudext.impl.graphstore.tugraph.TuGraphStoreClient +import com.antgroup.openspg.server.common.service.datasource.DataSourceService +import com.antgroup.openspg.core.schema.model.alter.SchemaDraft +import com.antgroup.openspg.core.schema.model.predicate.Property +import com.antgroup.openspg.core.schema.model.predicate.Relation +import com.antgroup.openspg.core.schema.model.semantic.request.DefineDynamicTaxonomyRequest +import com.antgroup.openspg.core.schema.model.semantic.request.DefineLogicalCausationRequest +import com.antgroup.openspg.core.schema.model.semantic.request.RemoveDynamicTaxonomyRequest +import com.antgroup.openspg.core.schema.model.semantic.request.RemoveLogicalCausationRequest +import com.antgroup.openspg.core.schema.model.type.BaseSPGType +import com.antgroup.openspg.core.schema.model.type.ConceptList +import com.antgroup.openspg.core.schema.model.type.ProjectSchema +import com.antgroup.openspg.core.schema.model.type.SPGTypeEnum +import com.antgroup.openspg.test.sofaboot.SofaBootTestApplication +import org.mockito.Mockito +import org.springframework.boot.test.autoconfigure.jdbc.AutoConfigureTestDatabase +import org.springframework.boot.test.context.SpringBootTest +import org.springframework.boot.test.mock.mockito.MockBean +import spock.lang.Shared +import spock.lang.Specification + +import static org.junit.jupiter.api.Assertions.assertEquals +import static org.junit.jupiter.api.Assertions.assertNotNull + +@SpringBootTest(classes = SofaBootTestApplication, webEnvironment = SpringBootTest.WebEnvironment.DEFINED_PORT) +@AutoConfigureTestDatabase +class SPGSchemaFacadeTest extends Specification { + @Shared + Long projectId = 1L + @Shared + spgSchemaFacade = new HttpSchemaFacade() + @Shared + conceptFacade = new HttpConceptFacade() + + @MockBean + private DataSourceService dataSourceService + + def setupSpec() { + HttpClientBootstrap.init(new ConnectionInfo("http://127.0.0.1:8887") + .setConnectTimeout(60000).setReadTimeout(60000) + ) + } + + /** + * step 1: query project schema, check system built-in BasicType and Standard is inited; + * step 2: create new StandardType、EntityType、ConceptType、EventType; + * step 3: define taxonomy semantic and logic causation semantic on concept; + * step 4: update or delete some StandardType、EntityType、ConceptType、EventType; + * step 5: query a single spg type, query a single relation; + * step 6: delete all customized StandardType、EntityType、ConceptType、EventType; + */ + def "test"() { + given: + Mockito.doReturn(Mock(TuGraphStoreClient.class)) + .when(dataSourceService) + .buildSharedKgStoreClient() + Mockito.doReturn(Mock(ElasticSearchEngineClient.class)) + .when(dataSourceService) + .buildSharedSearchEngineClient() + + when: + // step 1 + ProjectSchema projectSchema = this.getProjectSchema() + MockSchemaResultValidator.checkInitResult(projectSchema.getSpgTypes()) + + BuiltInPropertyRequest builtInPropertyRequest = new BuiltInPropertyRequest( + spgTypeEnum: SPGTypeEnum.CONCEPT_TYPE.name() + ) + ApiResponse> apiResponse = spgSchemaFacade.queryBuiltInProperty(builtInPropertyRequest) + assertEquals(2, apiResponse.getData().size()) + + // step 2 + SchemaDraft createDraft = MockSchemaDraftFactory.buildCreateDraft() + SchemaAlterRequest schemaAlterRequest = new SchemaAlterRequest( + projectId: projectId, schemaDraft: createDraft) + spgSchemaFacade.alterSchema(schemaAlterRequest) + + projectSchema = this.getProjectSchema() + MockSchemaResultValidator.checkCreateResult(projectSchema.getSpgTypes()) + + //step 3 + DefineDynamicTaxonomyRequest defineDynamicTaxonomyRequest1 = new DefineDynamicTaxonomyRequest( + conceptTypeName: MockSpgTypeNameEnum.DEFAULT_TAXOMOMY_OF_PERSON.getName(), + conceptName: "中产阶级", + dsl: "Define (s:DEFAULT.Person)-[p:belongTo]->(o:`DEFAULT.TaxonomyOfPerson`/`中产阶级`) " + + "{GraphStructure{} Rule{ R1: s.age >= 40 and s.age < 50}}") + conceptFacade.defineDynamicTaxonomy(defineDynamicTaxonomyRequest1) + + DefineDynamicTaxonomyRequest defineDynamicTaxonomyRequest2 = new DefineDynamicTaxonomyRequest( + conceptTypeName: MockSpgTypeNameEnum.DEFAULT_TAXOMOMY_OF_PERSON.getName(), + conceptName: "资产阶级", + dsl: "Define (s:DEFAULT.Person)-[p:belongTo]->" + + "(o:`DEFAULT.TaxonomyOfPerson`/`资产阶级`) " + + "{GraphStructure{} Rule{ R1: s.age >= 50}}") + conceptFacade.defineDynamicTaxonomy(defineDynamicTaxonomyRequest2) + + DefineLogicalCausationRequest defineLogicalCausationRequest = new DefineLogicalCausationRequest( + subjectConceptTypeName: MockSpgTypeNameEnum.DEFAULT_TAXOMOMY_OF_PERSON.getName(), + subjectConceptName: "中产阶级", + objectConceptTypeName: MockSpgTypeNameEnum.DEFAULT_TAXOMOMY_OF_PERSON.getName(), + objectConceptName: "资产阶级", + predicateName: "leadTo", + dsl: "Define (s:`DEFAULT.TaxonomyOfPerson`/`中产阶级`)-[p:leadTo]->" + + "(o:`DEFAULT.TaxonomyOfPerson`/`资产阶级`) " + + "{GraphStructure{} Rule{ R1: s.age=50} \n Action {}}") + conceptFacade.defineLogicalCausation(defineLogicalCausationRequest) + + ConceptRequest conceptRequest = new ConceptRequest( + conceptTypeName: MockSpgTypeNameEnum.DEFAULT_TAXOMOMY_OF_PERSON.getName(), + conceptName: "中产阶级" + ) + ApiResponse conceptResponse = conceptFacade.queryConcept(conceptRequest) + assertEquals(1, conceptResponse.getData().getConcepts().size()) + assertEquals(2, conceptResponse.getData().getConcepts().get(0).getSemantics().size()) + + RemoveDynamicTaxonomyRequest removeDynamicTaxonomyRequest = new RemoveDynamicTaxonomyRequest( + objectConceptTypeName: MockSpgTypeNameEnum.DEFAULT_TAXOMOMY_OF_PERSON.getName()) + conceptFacade.removeDynamicTaxonomy(removeDynamicTaxonomyRequest) + + RemoveLogicalCausationRequest removeLogicalCausationRequest = new RemoveLogicalCausationRequest( + subjectConceptTypeName: MockSpgTypeNameEnum.DEFAULT_TAXOMOMY_OF_PERSON.getName(), + subjectConceptName: "中产阶级", + objectConceptTypeName: MockSpgTypeNameEnum.DEFAULT_TAXOMOMY_OF_PERSON.getName(), + objectConceptName: "资产阶级", + predicateName: "leadTo") + conceptFacade.removeLogicalCausation(removeLogicalCausationRequest) + + conceptResponse = conceptFacade.queryConcept(conceptRequest) + assertEquals(0, conceptResponse.getData().getConcepts().size()) + + // step 4 + SchemaDraft updateDraft = MockSchemaDraftFactory.buildUpdateDraft(projectSchema) + schemaAlterRequest = new SchemaAlterRequest( + projectId: projectId, schemaDraft: updateDraft) + spgSchemaFacade.alterSchema(schemaAlterRequest) + + projectSchema = this.getProjectSchema() + MockSchemaResultValidator.checkUpdateResult(projectSchema.getSpgTypes()) + + // step 5 + SPGTypeRequest request = new SPGTypeRequest(name: MockSpgTypeNameEnum.DEFAULT_ALIPAY_USER.getName()) + ApiResponse response = spgSchemaFacade.querySPGType(request) + assertNotNull(response.getData()) + assertEquals(MockSpgTypeNameEnum.DEFAULT_ALIPAY_USER.getName(), response.getData().getName()) + + RelationRequest relationRequest = new RelationRequest( + sName: MockSpgTypeNameEnum.DEFAULT_ALIPAY_USER.getName(), + relation: "regAddress", + oName: MockSpgTypeNameEnum.DEFAULT_ADMINISTRATION.getName()) + ApiResponse relationResponse = spgSchemaFacade.queryRelation(relationRequest) + assertNotNull(relationResponse.getData()) + + // step 6 + projectSchema = this.getProjectSchema() + SchemaDraft deleteDraft = MockSchemaDraftFactory.buildDeleteDraft(projectSchema) + schemaAlterRequest = new SchemaAlterRequest( + projectId: projectId, schemaDraft: deleteDraft) + spgSchemaFacade.alterSchema(schemaAlterRequest) + + projectSchema = this.getProjectSchema() + MockSchemaResultValidator.checkInitResult(projectSchema.getSpgTypes()) + + then: + assertNotNull(this.getProjectSchema()) + } + + ProjectSchema getProjectSchema() { + ProjectSchemaRequest projectSchemaRequest = new ProjectSchemaRequest(projectId: projectId) + ApiResponse projectSchemaResponse = + spgSchemaFacade.queryProjectSchema(projectSchemaRequest) + + assertNotNull(projectSchemaResponse) + return projectSchemaResponse.getData() + } +} diff --git a/server/test/src/test/java/com/antgroup/openspg/test/scheduler/SchedulerServiceImplTest.java b/server/test/src/test/java/com/antgroup/openspg/test/scheduler/SchedulerServiceImplTest.java new file mode 100644 index 000000000..eddb1693a --- /dev/null +++ b/server/test/src/test/java/com/antgroup/openspg/test/scheduler/SchedulerServiceImplTest.java @@ -0,0 +1,352 @@ +/* + * Copyright 2023 OpenSPG Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except + * in compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License + * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express + * or implied. + */ +package com.antgroup.openspg.test.scheduler; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertTrue; + +import com.antgroup.openspg.common.util.thread.ThreadUtils; +import com.antgroup.openspg.server.api.http.client.util.ConnectionInfo; +import com.antgroup.openspg.server.api.http.client.util.HttpClientBootstrap; +import com.antgroup.openspg.server.common.model.scheduler.SchedulerEnum.Dependence; +import com.antgroup.openspg.server.common.model.scheduler.SchedulerEnum.InstanceStatus; +import com.antgroup.openspg.server.common.model.scheduler.SchedulerEnum.LifeCycle; +import com.antgroup.openspg.server.common.model.scheduler.SchedulerEnum.Status; +import com.antgroup.openspg.server.common.model.scheduler.SchedulerEnum.TranslateType; +import com.antgroup.openspg.server.core.scheduler.model.service.SchedulerInstance; +import com.antgroup.openspg.server.core.scheduler.model.service.SchedulerJob; +import com.antgroup.openspg.server.core.scheduler.model.service.SchedulerTask; +import com.antgroup.openspg.server.core.scheduler.service.api.SchedulerService; +import com.antgroup.openspg.server.core.scheduler.service.engine.SchedulerExecuteService; +import com.antgroup.openspg.server.core.scheduler.service.metadata.SchedulerInstanceService; +import com.antgroup.openspg.test.sofaboot.SofaBootTestApplication; +import java.util.Comparator; +import java.util.List; +import org.apache.commons.collections4.CollectionUtils; +import org.junit.Before; +import org.junit.jupiter.api.Test; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.boot.test.context.SpringBootTest; + +/** Scheduler Service Test */ +@SpringBootTest( + classes = SofaBootTestApplication.class, + webEnvironment = SpringBootTest.WebEnvironment.DEFINED_PORT) +class SchedulerServiceImplTest { + + @Autowired SchedulerService schedulerService; + @Autowired SchedulerInstanceService schedulerInstanceService; + @Autowired SchedulerExecuteService schedulerExecuteService; + + @Before + public void setUp() { + HttpClientBootstrap.init( + new ConnectionInfo("http://127.0.0.1:8887").setConnectTimeout(60000).setReadTimeout(60000)); + } + + /** + * step 1: create Once Job to submit step 2: query all Jobs step 3: offline Job step 4: online Job + * step 5: update Job step 6: execute Job step 7: get Instance to set Finish step 8: reRun + * Instance and to stop step 9: reRun Instance and to trigger step 10: trigger Instance until it + * ends step 11: get tasks step 12: delete Job; + */ + @Test + void submitOnceJob() { + // step 1: create Job to submit + SchedulerJob job = new SchedulerJob(); + job.setProjectId(0L); + job.setName("Once Job"); + job.setCreateUser("andy"); + job.setLifeCycle(LifeCycle.ONCE); + job.setTranslateType(TranslateType.LOCAL_EXAMPLE); + job.setDependence(Dependence.DEPENDENT); + job = schedulerService.submitJob(job); + Long jobId = job.getId(); + assertTrue(jobId > 0); + + SchedulerJob jobQuery = new SchedulerJob(); + jobQuery.setId(jobId); + SchedulerInstance instanceQuery = new SchedulerInstance(); + instanceQuery.setJobId(jobId); + SchedulerTask taskQuery = new SchedulerTask(); + taskQuery.setJobId(jobId); + + try { + // step 2: query Jobs + List jobs = schedulerService.searchJobs(jobQuery); + assertEquals(1, jobs.size()); + + // step 3: offline job + assertTrue(schedulerService.disableJob(jobId)); + job = schedulerService.getJobById(jobId); + assertEquals(Status.DISABLE, job.getStatus()); + List notFinishInstances = + schedulerInstanceService.getNotFinishInstance(instanceQuery); + assertTrue(CollectionUtils.isEmpty(notFinishInstances)); + ThreadUtils.sleep(100); + + // step 4: online Job + assertTrue(schedulerService.enableJob(jobId)); + job = schedulerService.getJobById(jobId); + assertEquals(Status.ENABLE, job.getStatus()); + ThreadUtils.sleep(100); + + // step 5: update Job + String updateName = "Update Test Once Job"; + job.setName(updateName); + assertTrue(schedulerService.updateJob(job)); + job = schedulerService.getJobById(jobId); + assertEquals(updateName, job.getName()); + ThreadUtils.sleep(100); + + // step 6: execute Job + assertTrue(schedulerService.executeJob(jobId)); + notFinishInstances = schedulerInstanceService.getNotFinishInstance(instanceQuery); + assertEquals(1, notFinishInstances.size()); + ThreadUtils.sleep(100); + + // step 7: get Instance to set Finish + List instances = schedulerService.searchInstances(instanceQuery); + assertTrue(instances.size() > 0); + SchedulerInstance instance = notFinishInstances.get(0); + SchedulerInstance ins = schedulerService.getInstanceById(instance.getId()); + assertEquals(ins.getId(), instance.getId()); + assertTrue(schedulerService.setFinishInstance(instance.getId())); + notFinishInstances = schedulerInstanceService.getNotFinishInstance(instanceQuery); + assertTrue(CollectionUtils.isEmpty(notFinishInstances)); + ThreadUtils.sleep(100); + + // step 8: reRun Instance and to stop + assertTrue(schedulerService.restartInstance(instance.getId())); + notFinishInstances = schedulerInstanceService.getNotFinishInstance(instanceQuery); + assertEquals(1, notFinishInstances.size()); + instance = notFinishInstances.get(0); + assertTrue(schedulerService.stopInstance(instance.getId())); + notFinishInstances = schedulerInstanceService.getNotFinishInstance(instanceQuery); + assertTrue(CollectionUtils.isEmpty(notFinishInstances)); + ThreadUtils.sleep(100); + + // step 9: reRun Instance + assertTrue(schedulerService.restartInstance(instance.getId())); + notFinishInstances = schedulerInstanceService.getNotFinishInstance(instanceQuery); + assertEquals(1, notFinishInstances.size()); + instance = notFinishInstances.get(0); + ThreadUtils.sleep(2000); + + // step 10: trigger Instance until it ends + while (!InstanceStatus.isFinished(getInstance(instance.getId()))) { + assertTrue(schedulerService.triggerInstance(instance.getId())); + ThreadUtils.sleep(2000); + } + instance = schedulerService.getInstanceById(instance.getId()); + assertEquals(InstanceStatus.FINISH, instance.getStatus()); + ThreadUtils.sleep(100); + + // step 11: get tasks + List tasks = schedulerService.searchTasks(taskQuery); + assertTrue(tasks.size() > 0); + } finally { + // step 12: delete Job + assertTrue(schedulerService.deleteJob(jobId)); + assertEquals(0, schedulerService.searchJobs(jobQuery).size()); + assertEquals(0, schedulerService.searchInstances(instanceQuery).size()); + assertEquals(0, schedulerService.searchTasks(taskQuery).size()); + } + } + + /** + * step 1: create Period Job to submit step 2: query Jobs and Instances step 3: offline Job step + * 4: online Job step 5: execute Job step 6: trigger first Instance until it ends step 7: trigger + * second Instance until it ends step 8: get tasks step 9: delete Job + */ + @Test + void submitPeriodJob() { + // step 1: create Period Job to submit + SchedulerJob job = new SchedulerJob(); + job.setProjectId(0L); + job.setName("Period Job"); + job.setCreateUser("andy"); + job.setLifeCycle(LifeCycle.PERIOD); + job.setSchedulerCron("0 0 * * * ?"); + job.setTranslateType(TranslateType.LOCAL_EXAMPLE); + job.setDependence(Dependence.DEPENDENT); + job = schedulerService.submitJob(job); + Long jobId = job.getId(); + assertTrue(jobId > 0); + + ThreadUtils.sleep(100); + SchedulerJob jobQuery = new SchedulerJob(); + jobQuery.setId(jobId); + SchedulerInstance instanceQuery = new SchedulerInstance(); + instanceQuery.setJobId(jobId); + SchedulerTask taskQuery = new SchedulerTask(); + taskQuery.setJobId(jobId); + + try { + // step 2: query Jobs and Instances + List jobs = schedulerService.searchJobs(jobQuery); + assertEquals(1, jobs.size()); + List instances = schedulerService.searchInstances(instanceQuery); + assertEquals(24, instances.size()); + ThreadUtils.sleep(100); + + // step 3: offline Period job + assertTrue(schedulerService.disableJob(jobId)); + job = schedulerService.getJobById(jobId); + assertEquals(Status.DISABLE, job.getStatus()); + List notFinishInstances = + schedulerInstanceService.getNotFinishInstance(instanceQuery); + assertTrue(CollectionUtils.isEmpty(notFinishInstances)); + ThreadUtils.sleep(100); + + // step 4: online Period Job + assertTrue(schedulerService.enableJob(jobId)); + job = schedulerService.getJobById(jobId); + assertEquals(Status.ENABLE, job.getStatus()); + ThreadUtils.sleep(100); + + // step 5: execute Job + assertFalse(schedulerService.executeJob(jobId)); + notFinishInstances = schedulerInstanceService.getNotFinishInstance(instanceQuery); + assertTrue(notFinishInstances.size() < 24); + schedulerInstanceService.deleteByJobId(jobId); + assertTrue(schedulerService.executeJob(jobId)); + notFinishInstances = schedulerInstanceService.getNotFinishInstance(instanceQuery); + assertEquals(24, notFinishInstances.size()); + ThreadUtils.sleep(100); + + SchedulerInstance instance = + notFinishInstances.stream() + .min(Comparator.comparing(x -> x.getSchedulerDate())) + .orElse(null); + ThreadUtils.sleep(2000); + + // step 6: trigger first Instance until it ends + while (schedulerInstanceService.getNotFinishInstance(instanceQuery).size() == 24) { + schedulerExecuteService.executeInstances(); + ThreadUtils.sleep(2000); + } + instance = schedulerService.getInstanceById(instance.getId()); + assertEquals(InstanceStatus.FINISH, instance.getStatus()); + + // step 7: trigger second Instance until it ends + notFinishInstances = schedulerInstanceService.getNotFinishInstance(instanceQuery); + assertEquals(23, notFinishInstances.size()); + instance = + notFinishInstances.stream() + .min(Comparator.comparing(x -> x.getSchedulerDate())) + .orElse(null); + while (schedulerInstanceService.getNotFinishInstance(instanceQuery).size() == 23) { + schedulerExecuteService.executeInstances(); + ThreadUtils.sleep(2000); + } + instance = schedulerService.getInstanceById(instance.getId()); + assertEquals(InstanceStatus.FINISH, instance.getStatus()); + + // step 8: get tasks + List tasks = schedulerService.searchTasks(taskQuery); + assertTrue(tasks.size() > 0); + + } finally { + // step 9: delete Job + assertTrue(schedulerService.deleteJob(jobId)); + assertEquals(0, schedulerService.searchJobs(jobQuery).size()); + assertEquals(0, schedulerService.searchInstances(instanceQuery).size()); + assertEquals(0, schedulerService.searchTasks(taskQuery).size()); + } + } + + /** + * step 1: create RealTime Job to submit step 2: query Jobs and Instances step 3: offline Job step + * 4: online Job step 5: trigger Instance step 6: get tasks step 7: delete Job; + */ + @Test + void submitRealTimeJob() { + // step 1: create RealTime Job to submit + SchedulerJob job = new SchedulerJob(); + job.setProjectId(0L); + job.setName("RealTime Job"); + job.setCreateUser("andy"); + job.setLifeCycle(LifeCycle.REAL_TIME); + job.setTranslateType(TranslateType.LOCAL_EXAMPLE); + job.setDependence(Dependence.DEPENDENT); + job = schedulerService.submitJob(job); + Long jobId = job.getId(); + assertTrue(jobId > 0); + + ThreadUtils.sleep(100); + SchedulerJob jobQuery = new SchedulerJob(); + jobQuery.setId(jobId); + SchedulerInstance instanceQuery = new SchedulerInstance(); + instanceQuery.setJobId(jobId); + SchedulerTask taskQuery = new SchedulerTask(); + taskQuery.setJobId(jobId); + + try { + // step 2: query Jobs and Instances + List jobs = schedulerService.searchJobs(jobQuery); + assertEquals(1, jobs.size()); + List instances = schedulerService.searchInstances(instanceQuery); + assertEquals(1, instances.size()); + ThreadUtils.sleep(100); + + // step 3: offline RealTime job + assertTrue(schedulerService.disableJob(jobId)); + job = schedulerService.getJobById(jobId); + assertEquals(Status.DISABLE, job.getStatus()); + List notFinishInstances = + schedulerInstanceService.getNotFinishInstance(instanceQuery); + assertTrue(CollectionUtils.isEmpty(notFinishInstances)); + ThreadUtils.sleep(100); + + // step 4: online RealTime Job + assertTrue(schedulerService.enableJob(jobId)); + job = schedulerService.getJobById(jobId); + assertEquals(Status.ENABLE, job.getStatus()); + notFinishInstances = schedulerInstanceService.getNotFinishInstance(instanceQuery); + assertEquals(1, notFinishInstances.size()); + ThreadUtils.sleep(100); + + SchedulerInstance instance = + notFinishInstances.stream() + .min(Comparator.comparing(x -> x.getSchedulerDate())) + .orElse(null); + ThreadUtils.sleep(2000); + + // step 5: trigger Instance + for (int i = 0; i < 10; i++) { + assertTrue(schedulerService.triggerInstance(instance.getId())); + ThreadUtils.sleep(2000); + } + instance = schedulerService.getInstanceById(instance.getId()); + assertEquals(InstanceStatus.RUNNING, instance.getStatus()); + + // step 6: get tasks + List tasks = schedulerService.searchTasks(taskQuery); + assertTrue(tasks.size() > 0); + + } finally { + // step 7: delete Job + assertTrue(schedulerService.deleteJob(jobId)); + assertEquals(0, schedulerService.searchJobs(jobQuery).size()); + assertEquals(0, schedulerService.searchInstances(instanceQuery).size()); + assertEquals(0, schedulerService.searchTasks(taskQuery).size()); + } + } + + private InstanceStatus getInstance(Long id) { + SchedulerInstance ins = schedulerService.getInstanceById(id); + return ins.getStatus(); + } +} diff --git a/server/test/src/test/java/com/antgroup/openspg/test/scheduler/task/LocalExampleAsyncTaskMock.java b/server/test/src/test/java/com/antgroup/openspg/test/scheduler/task/LocalExampleAsyncTaskMock.java new file mode 100644 index 000000000..b0e6c0351 --- /dev/null +++ b/server/test/src/test/java/com/antgroup/openspg/test/scheduler/task/LocalExampleAsyncTaskMock.java @@ -0,0 +1,52 @@ +/* + * Copyright 2023 OpenSPG Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except + * in compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License + * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express + * or implied. + */ +package com.antgroup.openspg.test.scheduler.task; + +import com.antgroup.openspg.server.common.model.scheduler.SchedulerEnum.LifeCycle; +import com.antgroup.openspg.server.common.model.scheduler.SchedulerEnum.TaskStatus; +import com.antgroup.openspg.server.core.scheduler.model.service.SchedulerInstance; +import com.antgroup.openspg.server.core.scheduler.model.service.SchedulerTask; +import com.antgroup.openspg.server.core.scheduler.model.task.TaskExecuteContext; +import com.antgroup.openspg.server.core.scheduler.service.task.async.AsyncTaskExecuteTemplate; +import java.util.UUID; +import org.springframework.stereotype.Component; + +/** Local Async Task Example */ +@Component("localExampleAsyncTask") +public class LocalExampleAsyncTaskMock extends AsyncTaskExecuteTemplate { + + @Override + public String submit(TaskExecuteContext context) { + String resource = UUID.randomUUID().toString(); + context.addTraceLog("submit a example Task, resource:%s", resource); + return resource; + } + + @Override + public TaskStatus getStatus(TaskExecuteContext context, String resource) { + context.addTraceLog("check example task status, resource:%s", resource); + SchedulerInstance instance = context.getInstance(); + SchedulerTask task = context.getTask(); + if (LifeCycle.REAL_TIME.equals(instance.getLifeCycle())) { + context.addTraceLog("LifeCycle is REAL_TIME, The instance is running continuously..."); + return TaskStatus.RUNNING; + } + return task.getExecuteNum() > 2 ? TaskStatus.FINISH : TaskStatus.RUNNING; + } + + @Override + public Boolean stop(TaskExecuteContext context, String resource) { + context.addTraceLog("stop example Task, resource:%s", resource); + return true; + } +} diff --git a/server/test/src/test/java/com/antgroup/openspg/test/scheduler/task/LocalExampleSyncTaskMock.java b/server/test/src/test/java/com/antgroup/openspg/test/scheduler/task/LocalExampleSyncTaskMock.java new file mode 100644 index 000000000..46f51cafd --- /dev/null +++ b/server/test/src/test/java/com/antgroup/openspg/test/scheduler/task/LocalExampleSyncTaskMock.java @@ -0,0 +1,113 @@ +/* + * Copyright 2023 OpenSPG Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except + * in compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License + * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express + * or implied. + */ +package com.antgroup.openspg.test.scheduler.task; + +import com.antgroup.openspg.common.util.DateTimeUtils; +import com.antgroup.openspg.server.common.model.scheduler.SchedulerEnum.Dependence; +import com.antgroup.openspg.server.common.model.scheduler.SchedulerEnum.InstanceStatus; +import com.antgroup.openspg.server.common.model.scheduler.SchedulerEnum.LifeCycle; +import com.antgroup.openspg.server.common.model.scheduler.SchedulerEnum.TaskStatus; +import com.antgroup.openspg.server.core.scheduler.model.service.SchedulerInstance; +import com.antgroup.openspg.server.core.scheduler.model.service.SchedulerJob; +import com.antgroup.openspg.server.core.scheduler.model.task.TaskExecuteContext; +import com.antgroup.openspg.server.core.scheduler.service.config.SchedulerConfig; +import com.antgroup.openspg.server.core.scheduler.service.metadata.SchedulerInstanceService; +import com.antgroup.openspg.server.core.scheduler.service.task.sync.SyncTaskExecuteTemplate; +import com.antgroup.openspg.server.core.scheduler.service.utils.SchedulerUtils; +import java.util.Date; +import java.util.concurrent.TimeUnit; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.stereotype.Component; + +/** Local Sync Task Example: Pre Check Task */ +@Component("localExampleSyncTask") +public class LocalExampleSyncTaskMock extends SyncTaskExecuteTemplate { + + /** scheduler max days */ + private static final long SCHEDULER_MAX_DAYS = 5; + + @Autowired SchedulerConfig schedulerConfig; + @Autowired SchedulerInstanceService schedulerInstanceService; + + @Override + public TaskStatus submit(TaskExecuteContext context) { + TaskStatus status = getTaskStatus(context); + if (TaskStatus.isFinished(status)) { + SchedulerInstance instance = context.getInstance(); + SchedulerInstance updateInstance = new SchedulerInstance(); + updateInstance.setId(instance.getId()); + updateInstance.setStatus(InstanceStatus.RUNNING); + updateInstance.setGmtModified(instance.getGmtModified()); + schedulerInstanceService.update(updateInstance); + } + return status; + } + + /** period instance pre-check, dependent pre task completion */ + private TaskStatus getTaskStatus(TaskExecuteContext context) { + SchedulerInstance instance = context.getInstance(); + + long days = + TimeUnit.MILLISECONDS.toDays( + System.currentTimeMillis() - instance.getGmtCreate().getTime()); + Integer lastDays = schedulerConfig.getExecuteMaxDay(); + if (days > SCHEDULER_MAX_DAYS) { + context.addTraceLog( + "The pre-check has not passed for more than %s days. It will not be scheduled after more than %s days", + days, lastDays); + } + Date schedulerDate = instance.getSchedulerDate(); + Date now = new Date(); + if (now.before(schedulerDate)) { + context.addTraceLog( + "Execution time not reached! Start scheduling date:%s", + DateTimeUtils.getDate2LongStr(schedulerDate)); + return TaskStatus.RUNNING; + } + LifeCycle lifeCycle = instance.getLifeCycle(); + if (!LifeCycle.PERIOD.equals(lifeCycle)) { + context.addTraceLog("No pre-check required"); + return TaskStatus.FINISH; + } + + if (Dependence.INDEPENDENT.name().equals(instance.getDependence())) { + return processByIndependent(context); + } else { + return processByDependent(context); + } + } + + /** not dependent pre task completion */ + private TaskStatus processByIndependent(TaskExecuteContext context) { + context.addTraceLog("The current task does not depend on the completion of the last instance"); + return TaskStatus.FINISH; + } + + /** dependent pre task completion */ + public TaskStatus processByDependent(TaskExecuteContext context) { + context.addTraceLog("The current task depends on the completion of the last instance"); + SchedulerInstance instance = context.getInstance(); + SchedulerJob job = context.getJob(); + Date preSchedulerDate = + SchedulerUtils.getPreviousValidTime(job.getSchedulerCron(), instance.getSchedulerDate()); + String preUniqueId = SchedulerUtils.getUniqueId(job.getId(), preSchedulerDate); + SchedulerInstance pre = schedulerInstanceService.getByUniqueId(preUniqueId); + + if (null == pre || InstanceStatus.isFinished(pre.getStatus())) { + return TaskStatus.FINISH; + } + + context.addTraceLog("Last instance(%s) has not executed, please wait", pre.getUniqueId()); + return TaskStatus.RUNNING; + } +} diff --git a/server/test/src/test/java/com/antgroup/openspg/test/scheduler/translate/LocalExampleTranslateMock.java b/server/test/src/test/java/com/antgroup/openspg/test/scheduler/translate/LocalExampleTranslateMock.java new file mode 100644 index 000000000..ead50c99f --- /dev/null +++ b/server/test/src/test/java/com/antgroup/openspg/test/scheduler/translate/LocalExampleTranslateMock.java @@ -0,0 +1,62 @@ +/* + * Copyright 2023 OpenSPG Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except + * in compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License + * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express + * or implied. + */ +package com.antgroup.openspg.test.scheduler.translate; + +import com.antgroup.openspg.server.core.scheduler.model.service.SchedulerJob; +import com.antgroup.openspg.server.core.scheduler.model.task.TaskExecuteDag; +import com.antgroup.openspg.server.core.scheduler.service.translate.Translate; +import com.google.common.collect.Lists; +import java.util.List; +import org.springframework.stereotype.Component; + +/** scheduler Translate Local implementation class. SchedulerJob to TaskDag */ +@Component("localExampleTranslate") +public class LocalExampleTranslateMock implements Translate { + + @Override + public TaskExecuteDag translate(SchedulerJob schedulerJob) { + return getTaskDag(); + } + + /** get Local Example TaskDag */ + public TaskExecuteDag getTaskDag() { + + List nodes = Lists.newArrayList(); + List edges = Lists.newArrayList(); + + TaskExecuteDag taskDag = new TaskExecuteDag(); + TaskExecuteDag.Node sync = new TaskExecuteDag.Node(); + String prdId = "1000001"; + sync.setId(prdId); + sync.setName("Local Sync Task Example"); + sync.setTaskComponent("localExampleSyncTask"); + nodes.add(sync); + + TaskExecuteDag.Node async = new TaskExecuteDag.Node(); + String dryRunId = "2000001"; + async.setId(dryRunId); + async.setName("Local Async Task Example"); + async.setTaskComponent("localExampleAsyncTask"); + nodes.add(async); + + TaskExecuteDag.Edge edge = new TaskExecuteDag.Edge(); + edge.setFrom(prdId); + edge.setTo(dryRunId); + edges.add(edge); + + taskDag.setNodes(nodes); + taskDag.setEdges(edges); + + return taskDag; + } +} diff --git a/server/test/src/test/resources/config/application.properties b/server/test/src/test/resources/config/application.properties new file mode 100644 index 000000000..16c7be426 --- /dev/null +++ b/server/test/src/test/resources/config/application.properties @@ -0,0 +1,56 @@ +# Copyright 2023 OpenSPG Authors +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except +# in compliance with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software distributed under the License +# is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express +# or implied. + +# /* ----------------------- * +# | spring | +# * ----------------------- */ +spring.application.name=openspg +spring.servlet.multipart.max-file-size=100GB +spring.servlet.multipart.max-request-size=100GB + +# /* ----------------------- * +# | system | +# * ----------------------- */ +server.port=8887 +logging.level.com.alipay.sofa=info +logging.path=./logs +management.endpoint.components.enable=false +management.endpoint.beans.enable=false + +# /* ----------------------- * +# | core | +# * ----------------------- */ +schema.uri=http://127.0.0.1 +builder.search-engine.enable=true +builder.operator.python.exec=/usr/local/bin/python3.9 +builder.operator.python.paths=/usr/local/lib/python3.9/site-packages;./python; + +# /* ----------------------- * +# | cloudext | +# * ----------------------- */ + +# repository +cloudext.repository.driver=com.antgroup.openspg.server.infra.dao.JdbcRepositoryClientDriver +cloudext.repository.impl.jdbc.url=jdbc:mysql://${cloudext.repository.impl.jdbc.host}:${cloudext.repository.impl.jdbc.port}/openspg?useUnicode=true&characterEncoding=utf8 +cloudext.repository.impl.jdbc.host=127.0.0.1 +cloudext.repository.impl.jdbc.port=3306 +cloudext.repository.impl.jdbc.username=root +cloudext.repository.impl.jdbc.password=openspg +cloudext.repository.impl.jdbc.driver=com.mysql.jdbc.Driver + +# Scheduler +scheduler.handler.type=db +scheduler.metadata.store.type=local +scheduler.execute.instances.period=5 +scheduler.execute.instances.unit=MINUTES +scheduler.generate.instances.period=1 +scheduler.generate.instances.unit=HOURS +scheduler.execute.max.day=10 diff --git a/server/test/src/test/resources/spring/spring-common.xml b/server/test/src/test/resources/spring/spring-common.xml new file mode 100644 index 000000000..1c3b754e1 --- /dev/null +++ b/server/test/src/test/resources/spring/spring-common.xml @@ -0,0 +1,27 @@ + + + + + + + + + + + + +