-
Notifications
You must be signed in to change notification settings - Fork 145
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
feat(scheduler): add support for scheduler #26
Changes from 73 commits
2a44680
720a872
7539535
de546c5
403ced6
9bf1a5a
4dcd829
c489e14
f3b06a8
eb6366d
a76184e
93231e5
a5992b8
0ff0d25
ba85058
3c7b3bb
48ebd6e
a9167e5
356f48f
267ccc5
2bfeb14
16307aa
5a57bbe
5d2aaec
a57364c
46245a0
d2ab5dc
5dd3a93
2a9f517
fa9b8ce
50219c5
53025df
64a8a37
dc5f1c4
2abf885
5b866d6
328d1cd
77d62d5
2c675f9
b285c76
c7be6a5
6f24393
0b085bc
6506325
4b997aa
71fe6b8
5f8d579
ed85e26
8290d86
bd150d5
3df74c1
caf32e5
a87d7b2
1bc932f
2c769b0
4701f79
dbf06dc
d32ac4b
9077a50
37286c7
290405b
237ca6c
60a4128
b5e987e
0a59b07
b822e9f
3844bce
124ce0e
781c9bf
6d86218
003efa6
d6e4442
bdaaa59
4d22f56
5db4cca
88e3058
130dc21
b0ce0a1
d24259c
b63484e
7e5ebca
d35de28
2e42a65
ef23c52
8c7126f
aaa6f96
be68c8c
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,160 @@ | ||
/* | ||
* Copyright 2023 Ant Group CO., Ltd. | ||
* | ||
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except | ||
* in compliance with the License. You may obtain a copy of the License at | ||
* | ||
* http://www.apache.org/licenses/LICENSE-2.0 | ||
* | ||
* Unless required by applicable law or agreed to in writing, software distributed under the License | ||
* is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express | ||
* or implied. | ||
*/ | ||
package com.antgroup.openspg.common.util; | ||
|
||
import java.beans.BeanInfo; | ||
import java.beans.Introspector; | ||
import java.beans.PropertyDescriptor; | ||
import java.net.Inet4Address; | ||
import java.net.InetAddress; | ||
import java.net.NetworkInterface; | ||
import java.net.SocketException; | ||
import java.text.ParseException; | ||
import java.util.ArrayList; | ||
import java.util.Calendar; | ||
import java.util.Collections; | ||
import java.util.Date; | ||
import java.util.Enumeration; | ||
import java.util.List; | ||
import java.util.stream.Collectors; | ||
import lombok.extern.slf4j.Slf4j; | ||
import org.apache.commons.lang3.time.DateUtils; | ||
import org.quartz.CronExpression; | ||
|
||
/** some common tools */ | ||
@Slf4j | ||
public class SchedulerUtils { | ||
|
||
public static final String IPS = String.join(",", getLocalIps()); | ||
public static final String EQ = "eq"; | ||
public static final String IN = "in"; | ||
public static final String LT = "lt"; | ||
|
||
/** merge two bean by discovering differences */ | ||
public static <M> M merge(M dest, M orig) { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. 这个方法不是SchedulerUtils?而应该放在BeanUtils呢? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. 调度的本地查询merge逻辑,只有为目标值空才会覆盖,不适合放到BeanUtils |
||
if (dest == null || orig == null) { | ||
return (dest == null) ? orig : dest; | ||
} | ||
try { | ||
BeanInfo beanInfo = Introspector.getBeanInfo(dest.getClass()); | ||
for (PropertyDescriptor descriptor : beanInfo.getPropertyDescriptors()) { | ||
if (descriptor.getWriteMethod() == null) { | ||
continue; | ||
} | ||
Object originalValue = descriptor.getReadMethod().invoke(orig); | ||
if (originalValue == null) { | ||
continue; | ||
} | ||
descriptor.getWriteMethod().invoke(dest, originalValue); | ||
} | ||
} catch (Exception e) { | ||
log.error("merge bean exception", e); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. 这个merge失败要抛异常的,不然异常吞了上游要得到非预期结果 There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. 因为入参都是同一类型M,这里反射不会抛出异常。 |
||
} | ||
return dest; | ||
} | ||
|
||
/** Limit remark. sub String To Length */ | ||
public static String setRemarkLimit(String oldRemark, StringBuffer appendRemark) { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. 这个方法命名感觉也不太对。太定制化了是不是不需要放在common/util?直接scheduler包那里使用就ok了 There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. 静态的工具方法统一放在SchedulerUtils.java类中,供Scheduler统一调用是合理的吧 |
||
Integer start = 0; | ||
Integer length = 100000; | ||
StringBuffer str = appendRemark.append(oldRemark); | ||
String fill = "..."; | ||
if (length >= str.length()) { | ||
return str.toString(); | ||
} | ||
return str.substring(start, length - fill.length()) + fill; | ||
} | ||
|
||
/** get CronExpression */ | ||
public static CronExpression getCronExpression(String cron) { | ||
try { | ||
return new CronExpression(cron); | ||
} catch (ParseException e) { | ||
throw new RuntimeException("Cron ParseException:" + cron, e); | ||
} | ||
} | ||
|
||
/** get Cron Execution Dates By Today */ | ||
public static List<Date> getCronExecutionDatesByToday(String cron) { | ||
CronExpression expression = getCronExpression(cron); | ||
List<Date> dates = new ArrayList<>(); | ||
Date startDate = DateUtils.truncate(new Date(), Calendar.DAY_OF_MONTH); | ||
Date endDate = DateUtils.addDays(startDate, 1); | ||
|
||
if (expression.isSatisfiedBy(startDate)) { | ||
dates.add(startDate); | ||
} | ||
Date nextDate = expression.getNextValidTimeAfter(startDate); | ||
while (nextDate != null && nextDate.before(endDate)) { | ||
dates.add(nextDate); | ||
nextDate = expression.getNextValidTimeAfter(nextDate); | ||
} | ||
|
||
return dates; | ||
} | ||
|
||
/** get Previous ValidTime */ | ||
public static Date getPreviousValidTime(String cron, Date date) { | ||
CronExpression expression = getCronExpression(cron); | ||
Date endDate = expression.getNextValidTimeAfter(expression.getNextValidTimeAfter(date)); | ||
Long time = 2 * date.getTime() - endDate.getTime(); | ||
|
||
Date nextDate = expression.getNextValidTimeAfter(new Date(time)); | ||
Date preDate = nextDate; | ||
while (nextDate != null && nextDate.before(date)) { | ||
preDate = nextDate; | ||
nextDate = expression.getNextValidTimeAfter(nextDate); | ||
} | ||
return preDate; | ||
} | ||
|
||
/** get Unique Id */ | ||
public static String getUniqueId(Long jobId, Date schedulerDate) { | ||
return jobId + DateTimeUtils.getDate2Str(DateTimeUtils.YYYY_MM_DD_HH_MM_SS2, schedulerDate); | ||
} | ||
|
||
/** content compare key */ | ||
public static boolean compare(Object content, Object key, String type) { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. 这个方法太定制化了,不像是common/util呢。key/content有可能是 Date或者String类型。 There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Scheduler用于对象属性比较的,提供给Scheduler的本地查询用,所以统一放在SchedulerUtils中 |
||
if (key == null) { | ||
return true; | ||
} | ||
if (content == null) { | ||
return false; | ||
} | ||
switch (type) { | ||
case EQ: | ||
return content.equals(key); | ||
case IN: | ||
return ((String) content).contains((String) key); | ||
case LT: | ||
return ((Date) key).before((Date) content); | ||
default: | ||
return false; | ||
} | ||
} | ||
|
||
/** get local ips */ | ||
public static List<String> getLocalIps() { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. 这个方法NetworkAddressUtils里面已经有了 |
||
try { | ||
Enumeration<NetworkInterface> networkInterfaces = NetworkInterface.getNetworkInterfaces(); | ||
return Collections.list(networkInterfaces).stream() | ||
.flatMap(network -> Collections.list(network.getInetAddresses()).stream()) | ||
.filter(address -> address instanceof Inet4Address && !address.isLoopbackAddress()) | ||
.map(InetAddress::getHostAddress) | ||
.collect(Collectors.toList()); | ||
} catch (SocketException e) { | ||
log.error("getLocalIps failed.", e); | ||
} | ||
return new ArrayList<>(); | ||
} | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,21 @@ | ||
/* | ||
* Copyright 2023 Ant Group CO., Ltd. | ||
* | ||
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except | ||
* in compliance with the License. You may obtain a copy of the License at | ||
* | ||
* http://www.apache.org/licenses/LICENSE-2.0 | ||
* | ||
* Unless required by applicable law or agreed to in writing, software distributed under the License | ||
* is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express | ||
* or implied. | ||
*/ | ||
package com.antgroup.openspg.server.common.model.exception; | ||
|
||
/** Scheduler exception */ | ||
public class SchedulerException extends OpenSPGException { | ||
|
||
public SchedulerException(String message, Object... args) { | ||
super(null, true, true, message, args); | ||
} | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,93 @@ | ||
/* | ||
* Copyright 2023 Ant Group CO., Ltd. | ||
* | ||
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except | ||
* in compliance with the License. You may obtain a copy of the License at | ||
* | ||
* http://www.apache.org/licenses/LICENSE-2.0 | ||
* | ||
* Unless required by applicable law or agreed to in writing, software distributed under the License | ||
* is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express | ||
* or implied. | ||
*/ | ||
package com.antgroup.openspg.server.common.model.scheduler; | ||
|
||
/** all scheduler dependent enum */ | ||
public interface SchedulerEnum { | ||
|
||
/** Instance Status enum */ | ||
enum InstanceStatus { | ||
WAITING, | ||
RUNNING, | ||
SKIP, | ||
FINISH, | ||
TERMINATE, | ||
SET_FINISH; | ||
|
||
/** status is Finished */ | ||
public static boolean isFinished(InstanceStatus status) { | ||
return InstanceStatus.FINISH.equals(status) | ||
|| InstanceStatus.TERMINATE.equals(status) | ||
|| InstanceStatus.SET_FINISH.equals(status) | ||
|| InstanceStatus.SKIP.equals(status); | ||
} | ||
} | ||
|
||
/** Life Cycle Enum */ | ||
enum LifeCycle { | ||
PERIOD, | ||
ONCE, | ||
REAL_TIME | ||
} | ||
|
||
/** Dependence Enum */ | ||
enum Dependence { | ||
DEPENDENT, | ||
INDEPENDENT | ||
} | ||
|
||
/** Status Enum */ | ||
enum Status { | ||
ENABLE, | ||
DISABLE | ||
} | ||
|
||
/** Task Status Enum */ | ||
enum TaskStatus { | ||
WAIT, | ||
RUNNING, | ||
FINISH, | ||
ERROR, | ||
SKIP, | ||
TERMINATE, | ||
SET_FINISH; | ||
|
||
/** status is Finished by TaskStatus */ | ||
public static boolean isFinished(TaskStatus status) { | ||
return TaskStatus.FINISH.equals(status) | ||
|| TaskStatus.SKIP.equals(status) | ||
|| TaskStatus.TERMINATE.equals(status) | ||
|| TaskStatus.SET_FINISH.equals(status); | ||
} | ||
|
||
/** status is Running by TaskStatus */ | ||
public static boolean isRunning(TaskStatus status) { | ||
return TaskStatus.RUNNING.equals(status) || TaskStatus.ERROR.equals(status); | ||
} | ||
} | ||
|
||
/** Translate Enum */ | ||
enum TranslateType { | ||
LOCAL_EXAMPLE("localExampleTranslate"); | ||
|
||
private String type; | ||
|
||
TranslateType(String type) { | ||
this.type = type; | ||
} | ||
|
||
public String getType() { | ||
return type; | ||
} | ||
} | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,39 @@ | ||
<?xml version="1.0" encoding="UTF-8"?> | ||
<!-- | ||
~ Copyright 2023 Ant Group CO., Ltd. | ||
~ | ||
~ Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except | ||
~ in compliance with the License. You may obtain a copy of the License at | ||
~ | ||
~ http://www.apache.org/licenses/LICENSE-2.0 | ||
~ | ||
~ Unless required by applicable law or agreed to in writing, software distributed under the License | ||
~ is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express | ||
~ or implied. | ||
--> | ||
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" | ||
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> | ||
<modelVersion>4.0.0</modelVersion> | ||
<parent> | ||
<groupId>com.antgroup.openspg.server</groupId> | ||
<artifactId>server-parent</artifactId> | ||
<version>0.0.1-SNAPSHOT</version> | ||
<relativePath>../../../pom.xml</relativePath> | ||
</parent> | ||
|
||
<artifactId>core-scheduler-model</artifactId> | ||
<dependencies> | ||
<dependency> | ||
<groupId>com.antgroup.openspg.server</groupId> | ||
<artifactId>common-model</artifactId> | ||
</dependency> | ||
<dependency> | ||
<groupId>com.antgroup.openspg</groupId> | ||
<artifactId>common-util</artifactId> | ||
</dependency> | ||
<dependency> | ||
<groupId>com.alibaba</groupId> | ||
<artifactId>fastjson</artifactId> | ||
</dependency> | ||
</dependencies> | ||
</project> |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
这个类是不是直接放在core/scheduler/service就ok了?同时需要增加一些单测
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
集成测试中已经包含了该方法的单侧