Commit f5158cec by 宋珺琪

集成powerjob、入库编码和增删改查

parent 6f7d522a
......@@ -19,9 +19,9 @@ public class CodeYymc implements Serializable {
private static final long serialVersionUID = 1L;
@TableId(type = IdType.INPUT)
/**
* 编码
*/
@ApiModelProperty("主键")
private String id;
@ApiModelProperty("编码")
private String code;
......
......@@ -50,6 +50,21 @@ public class KeyUtilNaotu {
return asd;
}
/**
*
* @Description: 生成唯一的主键 格式:
*/
public static synchronized String getJzKey1(String str) {
SimpleDateFormat sdf = new SimpleDateFormat("yyyyMMdd");
Date date = new Date();
String dateNowStr = sdf.format(date);
int num = UUID.randomUUID().toString().replaceAll("-","").hashCode();
num = num < 0 ? -num:num;
String substring = String.valueOf(num).substring(0, 6);
String asd = str+substring;
return asd;
}
public static void main(String[] args) {
String str ="JZ";
......
......@@ -26,7 +26,7 @@ public class TokenConfig implements WebMvcConfigurer {
@Override
public void addInterceptors(InterceptorRegistry registry) {
String[] excludePatterns = new String[]{"/swagger-resources/**", "/webjars/**", "/v2/**", "/swagger-ui.html/**",
"/api", "/api-docs", "/api-docs/**", "/doc.html/**", "/koTime/**"};
"/api", "/api-docs", "/api-docs/**", "/doc.html/**", "/koTime/**","/code/codeAdd"};
registry.addInterceptor(tokenInterceptor)
.addPathPatterns("/**")//指定该类拦截的url
.excludePathPatterns(skInterceptorController.findAllUrl())
......
......@@ -4,8 +4,14 @@ import com.baomidou.mybatisplus.core.conditions.query.QueryWrapper;
import com.baomidou.mybatisplus.core.conditions.update.UpdateWrapper;
import com.baomidou.mybatisplus.core.metadata.IPage;
import com.baomidou.mybatisplus.extension.plugins.pagination.Page;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.founder.commonutils.model.newPublicEntity.CodeYymc.CodeYymc;
import com.founder.commonutils.model.newPublicEntity.CodeYymc.SysYhczrz;
import com.founder.commonutils.model.newPublicEntity.MapRestResult;
import com.founder.commonutils.model.newPublicEntity.SysUser;
import com.founder.commonutils.util.HttpUtil;
import com.founder.commonutils.util.KeyUtilNaotu;
import com.founder.servicebase.logs.OperLog;
import com.founder.servicebase.logs.OperationType;
import com.founder.servicebase.mapper.mysqlMapper.CodeYymcMapper;
......@@ -13,11 +19,13 @@ import io.swagger.annotations.Api;
import io.swagger.annotations.ApiOperation;
import org.apache.commons.lang.StringUtils;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
import javax.servlet.http.HttpServletRequest;
import java.util.List;
import java.util.stream.Collectors;
......@@ -28,6 +36,12 @@ public class CodeYymcController {
@Autowired
CodeYymcMapper codeYymcMapper;
@Value("${logUrl}")
private String URL ;
@Value("${logtoken}")
private String token ;
@Value("${xzxtlog}")
private String xzxtlog ;
@PostMapping("yymcAdd")
@ApiOperation(value = "添加名称编码")
......@@ -46,6 +60,7 @@ public class CodeYymcController {
} if (codeYymc.getCzlxdm().equals("06")){
codeYymc.setCzlx("导出");
}
codeYymc.setId(KeyUtilNaotu.getJzKey1("cd"));
int insert = codeYymcMapper.insert(codeYymc);
if (insert > 0){
return new MapRestResult(200,"新增成功",codeYymc);
......@@ -59,8 +74,11 @@ public class CodeYymcController {
@ApiOperation(value = "删除名称编码")
@OperLog(message = "删除名称编码", operation = OperationType.DELETE)
public MapRestResult yymcDelete(@RequestBody CodeYymc codeYymc){
codeYymc.setIsDeleted("1");
int i = codeYymcMapper.updateById(codeYymc);
UpdateWrapper<CodeYymc> wrapper = new UpdateWrapper<>();
wrapper.eq("code",codeYymc.getCode());
wrapper.set("is_deleted","1");
int i = codeYymcMapper.update(codeYymc,wrapper);
if (i > 0){
return new MapRestResult(200,"删除成功",codeYymc);
} else {
......@@ -88,8 +106,12 @@ public class CodeYymcController {
@OperLog(message = "查询code编码", operation = OperationType.QUERY)
public MapRestResult yymcSelect(@RequestBody CodeYymc codeYymc){
QueryWrapper<CodeYymc> wrapper = new QueryWrapper<>();
QueryWrapper<CodeYymc> wrapper = new QueryWrapper<>();
// 根据id查询
wrapper.eq(StringUtils.isNotEmpty(codeYymc.getId()),"id",codeYymc.getId());
//根据code模糊
wrapper.like(StringUtils.isNotEmpty(codeYymc.getCode()),"code",codeYymc.getCode());
//根据name模糊
wrapper.like(StringUtils.isNotEmpty(codeYymc.getName()),"name",codeYymc.getName());
//一级名称
......@@ -121,7 +143,9 @@ public class CodeYymcController {
@OperLog(message = "详情", operation = OperationType.QUERY)
public MapRestResult yymcDetail(@RequestBody CodeYymc codeYymc){
CodeYymc codeYymc1 = codeYymcMapper.selectById(codeYymc.getCode());
UpdateWrapper<CodeYymc> wrapper = new UpdateWrapper<>();
wrapper.eq("code",codeYymc.getCode());
CodeYymc codeYymc1 = codeYymcMapper.selectOne(wrapper);
return new MapRestResult(200,"ok",null,codeYymc1);
}
......@@ -142,5 +166,53 @@ public class CodeYymcController {
}
@PostMapping("codeAdd")
@ApiOperation(value = "入200库")
@OperLog(message = "入200库", operation = OperationType.ADD)
public MapRestResult codeAdd( HttpServletRequest request) {
String logcode = request.getHeader("logcode");
// 根据code去查询对应名称;
QueryWrapper wrapper = new QueryWrapper<>();
wrapper.eq("code", logcode);
CodeYymc codeYymc = codeYymcMapper.selectOne(wrapper);
SysUser user = (SysUser) request.getAttribute("user");
SysYhczrz sysYhczrz = new SysYhczrz();
sysYhczrz.setYhGmsfhm(user.getIdentitycard());
sysYhczrz.setYhdwGajgjgdm(user.getUnitcode());
sysYhczrz.setYhdwGajgmc(user.getUnitname());
sysYhczrz.setYhXm(user.getTrueName());
sysYhczrz.setYhIp(user.getIp());
sysYhczrz.setYymcJyqk(codeYymc.getName());
sysYhczrz.setYymcdm(logcode);
sysYhczrz.setCzlxdm(codeYymc.getCzlxdm());
if (codeYymc.getCzlxdm().equals("05")) {
sysYhczrz.setCzxxParam(null);
}
sysYhczrz.setCzlx(codeYymc.getCzlx());
sysYhczrz.setXxdjryXm(user.getLrr());
sysYhczrz.setXxczryGmsfhm(user.getIdentitycard());
sysYhczrz.setXxdjryGmsfhm(user.getIdentitycard());
sysYhczrz.setXxczryXm(user.getTrueName());
sysYhczrz.setXxczdwGajgjgdm(user.getUnitcode());
sysYhczrz.setXxczdwGajgmc(user.getUnitname());
sysYhczrz.setXxdjryLxdh(user.getTelephone());
sysYhczrz.setXxdjdwGajgmc(user.getUnitname());
sysYhczrz.setXxdjdwGajgjgdm(user.getUnitcode());
ObjectMapper mapper = new ObjectMapper();
try {
String json = mapper.writeValueAsString(sysYhczrz);
String s = HttpUtil.doPostJson(URL, json, token);
System.out.println(s);
return new MapRestResult(s);
} catch (JsonProcessingException e) {
e.printStackTrace();
return new MapRestResult("转换异常");
}
}
}
......@@ -12,6 +12,7 @@
<modules>
<module>publicapi</module>
<module>monitoring</module>
<module>powerjob</module>
</modules>
<artifactId>service</artifactId>
<dependencies>
......
# Base image (supports amd64 & arm64), based on Ubuntu 18.04.4 LTS
FROM adoptopenjdk:8-jdk-hotspot
# Maintainer
MAINTAINER tengjiqi@gmail.com
# Download and install Maven
RUN curl -O https://mirrors.tuna.tsinghua.edu.cn/apache/maven/maven-3/3.6.3/binaries/apache-maven-3.6.3-bin.tar.gz
RUN tar -zxvf apache-maven-3.6.3-bin.tar.gz && mv apache-maven-3.6.3 /opt/powerjob-maven && rm -rf apache-maven-3.6.3-bin.tar.gz
# Replace the default Maven settings file
RUN rm -rf /opt/powerjob-maven/conf/settings.xml
COPY settings.xml /opt/powerjob-maven/conf/settings.xml
# Set the Maven env var (the maven invoker reads it to locate Maven)
ENV M2_HOME=/opt/powerjob-maven
# Set the timezone
ENV TZ=Asia/Shanghai
# Other environment variables
ENV APP_NAME=powerjob-server
# Spring Boot launch arguments and JVM options, injectable at container run time
ENV PARAMS=""
ENV JVMOPTIONS=""
# Copy the application jar into the image
COPY powerjob-server.jar /powerjob-server.jar
# Expose ports (HTTP + AKKA + VertX)
EXPOSE 7700 10086 10010
# Create the server working directory (presumably used as the user home dir — confirm)
RUN mkdir -p /root/powerjob-server
# Mount a data volume to surface files on the host (anonymous volume: host path is random)
# VOLUME /root/powerjob
# Launch the application
ENTRYPOINT ["sh","-c","java $JVMOPTIONS -jar /powerjob-server.jar $PARAMS"]
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<artifactId>powerjob-server</artifactId>
<groupId>tech.powerjob</groupId>
<version>4.3.0</version>
<relativePath>../pom.xml</relativePath>
</parent>
<modelVersion>4.0.0</modelVersion>
<artifactId>powerjob-server-common</artifactId>
<version>${project.parent.version}</version>
<properties>
<maven.compiler.source>8</maven.compiler.source>
<maven.compiler.target>8</maven.compiler.target>
</properties>
</project>
\ No newline at end of file
package tech.powerjob.server.common;
/**
 * Minimal mutable container wrapping a single value of type {@code T}.
 * Handy for passing an updatable reference into lambdas or callbacks.
 * Not thread-safe; synchronize externally if shared across threads.
 *
 * @author Echo009
 * @since 2022/10/2
 */
public class Holder<T> {

    /** Currently held value; may be null. */
    private T value;

    public Holder(T value) {
        this.value = value;
    }

    public T get() {
        return value;
    }

    public void set(T value) {
        this.value = value;
    }
}
package tech.powerjob.server.common;
/**
 * Keys of the server's configuration-file properties.
 *
 * @author tjq
 * @since 2020/8/2
 */
public class PowerJobServerConfigKey {

    /**
     * Port for the akka protocol.
     */
    public static final String AKKA_PORT = "oms.akka.port";

    /**
     * Port for the http protocol.
     */
    public static final String HTTP_PORT = "oms.http.port";

    /**
     * Custom database table-name prefix.
     */
    public static final String TABLE_PREFIX = "oms.table-prefix";

    /**
     * Whether MongoDB is enabled.
     */
    public static final String MONGODB_ENABLE = "oms.mongodb.enable";

    /**
     * Whether Swagger-UI is enabled (disabled by default).
     */
    public static final String SWAGGER_UI_ENABLE = "oms.swagger.enable";

    /**
     * DingTalk alarm settings.
     */
    public static final String DING_APP_KEY = "oms.alarm.ding.app-key";

    public static final String DING_APP_SECRET = "oms.alarm.ding.app-secret";

    public static final String DING_AGENT_ID = "oms.alarm.ding.agent-id";

    private PowerJobServerConfigKey() {
        // constants holder, no instances
    }
}
package tech.powerjob.server.common;
import lombok.extern.slf4j.Slf4j;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.atomic.AtomicLong;
/**
 * Factory for the {@link RejectedExecutionHandler} strategies used by server
 * thread pools. Every variant logs the rejection first; they differ only in
 * how the rejected task is then disposed of.
 *
 * @author tjq
 * @since 2020/11/28
 */
@Slf4j
public class RejectedExecutionHandlerFactory {

    /** Sequence used to name the fallback threads spawned by {@link #newThreadRun(String)}. */
    private static final AtomicLong COUNTER = new AtomicLong();

    private RejectedExecutionHandlerFactory() {
        // utility class, no instances
    }

    /**
     * Rejects the task by throwing {@link RejectedExecutionException}.
     *
     * @param source name for log
     * @return A handler for tasks that cannot be executed by ThreadPool
     */
    public static RejectedExecutionHandler newAbort(String source) {
        return (r, e) -> {
            log.error("[{}] ThreadPool[{}] overload, the task[{}] will be Abort, Maybe you need to adjust the ThreadPool config!", source, e, r);
            throw new RejectedExecutionException("Task " + r.toString() +
                    " rejected from " + source);
        };
    }

    /**
     * Silently drops the rejected task (after logging).
     *
     * @param source log name
     * @return A handler for tasks that cannot be executed by ThreadPool
     */
    public static RejectedExecutionHandler newDiscard(String source) {
        return (r, p) -> {
            log.error("[{}] ThreadPool[{}] overload, the task[{}] will be Discard, Maybe you need to adjust the ThreadPool config!", source, p, r);
        };
    }

    /**
     * Runs the task on the submitting thread (back-pressure), unless the pool
     * is already shut down.
     *
     * @param source log name
     * @return A handler for tasks that cannot be executed by ThreadPool
     */
    public static RejectedExecutionHandler newCallerRun(String source) {
        return (r, p) -> {
            log.error("[{}] ThreadPool[{}] overload, the task[{}] will run by caller thread, Maybe you need to adjust the ThreadPool config!", source, p, r);
            if (!p.isShutdown()) {
                r.run();
            }
        };
    }

    /**
     * Runs the task on a brand-new thread, unless the pool is already shut
     * down. Note: unbounded — every rejection spawns one thread.
     *
     * @param source log name
     * @return A handler for tasks that cannot be executed by ThreadPool
     */
    public static RejectedExecutionHandler newThreadRun(String source) {
        return (r, p) -> {
            log.error("[{}] ThreadPool[{}] overload, the task[{}] will run by a new thread!, Maybe you need to adjust the ThreadPool config!", source, p, r);
            if (!p.isShutdown()) {
                String threadName = source + "-T-" + COUNTER.getAndIncrement();
                log.info("[{}] create new thread[{}] to run job", source, threadName);
                new Thread(r, threadName).start();
            }
        };
    }
}
package tech.powerjob.server.common;
import com.google.common.base.Joiner;
import com.google.common.base.Splitter;
/**
 * Shared Guava {@link Splitter} / {@link Joiner} instances for the comma- and
 * pipe-separated formats used across the server (thread-safe, reusable).
 *
 * @author tjq
 * @since 2020/5/27
 */
public class SJ {

    public static final Splitter COMMA_SPLITTER = Splitter.on(",");

    public static final Joiner COMMA_JOINER = Joiner.on(",");

    /** Joins monitor fields with '|', printing "-" for null values. */
    public static final Joiner MONITOR_JOINER = Joiner.on("|").useForNull("-");

    private SJ() {
        // constants holder, no instances
    }
}
package tech.powerjob.server.common.aware;
/**
 * Marker interface for PowerJob server components that receive framework
 * callbacks (see sub-interfaces such as {@code ServerInfoAware}).
 *
 * @author tjq
 * @since 2022/9/12
 */
public interface PowerJobAware {
}
package tech.powerjob.server.common.aware;
import tech.powerjob.server.common.module.ServerInfo;
/**
 * Implemented by components that need to be handed the current server's
 * {@link ServerInfo} once it has been resolved.
 *
 * @author tjq
 * @since 2022/9/12
 */
public interface ServerInfoAware extends PowerJobAware {

    /**
     * Callback delivering the current server info.
     *
     * @param serverInfo resolved info for this server instance
     */
    void setServerInfo(ServerInfo serverInfo);
}
package tech.powerjob.server.common.constants;
import lombok.AllArgsConstructor;
import lombok.Getter;
/**
 * Source type of a container: a pre-built fat jar, or a Git repository.
 * Accessors and the private enum constructor are written out explicitly
 * (equivalent to the Lombok-generated members).
 *
 * @author tjq
 * @since 2020/5/15
 */
public enum ContainerSourceType {

    FatJar(1, "Jar文件"),
    Git(2, "Git代码库");

    private final int v;
    private final String des;

    ContainerSourceType(int v, String des) {
        this.v = v;
        this.des = des;
    }

    public int getV() {
        return v;
    }

    public String getDes() {
        return des;
    }

    /**
     * Looks up the constant carrying the given numeric code.
     *
     * @param v numeric code
     * @return matching constant
     * @throws IllegalArgumentException if no constant carries the code
     */
    public static ContainerSourceType of(int v) {
        for (ContainerSourceType candidate : values()) {
            if (candidate.v == v) {
                return candidate;
            }
        }
        throw new IllegalArgumentException("unknown ContainerSourceType of " + v);
    }
}
package tech.powerjob.server.common.constants;
import lombok.AllArgsConstructor;
import lombok.Getter;
/**
 * Kind of task instance: a plain job run or a workflow run.
 * Constructor and accessor are written out explicitly (equivalent to the
 * Lombok-generated members).
 *
 * @author tjq
 * @since 2020/5/29
 */
public enum InstanceType {

    NORMAL(1),
    WORKFLOW(2);

    private final int v;

    InstanceType(int v) {
        this.v = v;
    }

    public int getV() {
        return v;
    }
}
package tech.powerjob.server.common.constants;
/**
 * Names of the server's dedicated thread pools.
 *
 * @author tjq
 * @since 2022/9/12
 */
public class PJThreadPool {

    /**
     * Pool for timed scheduling work.
     */
    public static final String TIMING_POOL = "PowerJobTimingPool";

    /**
     * Pool for asynchronous background tasks.
     */
    public static final String BACKGROUND_POOL = "PowerJobBackgroundPool";

    /**
     * Pool dedicated to the local (embedded) database.
     */
    public static final String LOCAL_DB_POOL = "PowerJobLocalDbPool";

    private PJThreadPool() {
        // constants holder, no instances
    }
}
package tech.powerjob.server.common.constants;
import lombok.AllArgsConstructor;
import lombok.Getter;
/**
 * On/off-style status shared by jobs (JobStatus) and workflows
 * (WorkflowStatus). Constructor and accessor are written out explicitly
 * (equivalent to the Lombok-generated members).
 *
 * @author tjq
 * @since 2020/4/6
 */
public enum SwitchableStatus {

    ENABLE(1),
    DISABLE(2),
    DELETED(99);

    private final int v;

    SwitchableStatus(int v) {
        this.v = v;
    }

    public int getV() {
        return v;
    }

    /**
     * Looks up the constant carrying the given numeric code.
     *
     * @param v numeric code
     * @return matching constant
     * @throws IllegalArgumentException if no constant carries the code
     */
    public static SwitchableStatus of(int v) {
        for (SwitchableStatus candidate : values()) {
            if (candidate.v == v) {
                return candidate;
            }
        }
        throw new IllegalArgumentException("unknown SwitchableStatus of " + v);
    }
}
package tech.powerjob.server.common.module;
import lombok.Data;
/**
 * Metadata describing the current server instance. Getters, setters,
 * equals/hashCode and toString are generated by Lombok's {@code @Data}.
 *
 * @author tjq
 * @since 2022/9/12
 */
@Data
public class ServerInfo {

    // presumably the persistent record id of this server — confirm against persistence layer
    private Long id;

    // IP address this server is registered under
    private String ip;

    // startup timestamp (epoch ms, judging by the field name — TODO confirm at the write site)
    private long bornTime;

    // server version string; stays "UNKNOWN" until resolved elsewhere
    private String version = "UNKNOWN";
}
package tech.powerjob.server.common.module;
import lombok.Data;
import lombok.extern.slf4j.Slf4j;
import tech.powerjob.common.model.DeployedContainerInfo;
import tech.powerjob.common.model.SystemMetrics;
import tech.powerjob.common.request.WorkerHeartbeat;
import java.util.List;
/**
 * Runtime view of a single worker node, kept up to date from its heartbeats.
 * Getters/setters are Lombok-generated via {@code @Data}.
 *
 * @author tjq
 * @since 2021/2/7
 */
@Data
@Slf4j
public class WorkerInfo {

    // worker address as reported by the heartbeat (presumably host:port — confirm)
    private String address;

    // timestamp of the most recent heartbeat, as reported by the worker
    private long lastActiveTime;

    private String protocol;

    private String client;

    private String tag;

    private int lightTaskTrackerNum;

    private int heavyTaskTrackerNum;

    // timestamp of the most recent heartbeat that reported overload
    private long lastOverloadTime;

    private boolean overloading;

    private SystemMetrics systemMetrics;

    private List<DeployedContainerInfo> containerInfos;

    // a worker is considered timed out after 60 s without a heartbeat
    private static final long WORKER_TIMEOUT_MS = 60000;

    /**
     * Copies all fields from the latest heartbeat into this record and updates
     * the overload flag: set (and timestamped) when the heartbeat reports
     * overload, cleared otherwise.
     *
     * @param workerHeartbeat latest heartbeat received from this worker
     */
    public void refresh(WorkerHeartbeat workerHeartbeat) {
        address = workerHeartbeat.getWorkerAddress();
        lastActiveTime = workerHeartbeat.getHeartbeatTime();
        protocol = workerHeartbeat.getProtocol();
        client = workerHeartbeat.getClient();
        tag = workerHeartbeat.getTag();
        systemMetrics = workerHeartbeat.getSystemMetrics();
        containerInfos = workerHeartbeat.getContainerInfos();
        lightTaskTrackerNum = workerHeartbeat.getLightTaskTrackerNum();
        heavyTaskTrackerNum = workerHeartbeat.getHeavyTaskTrackerNum();
        if (workerHeartbeat.isOverload()) {
            overloading = true;
            lastOverloadTime = workerHeartbeat.getHeartbeatTime();
            log.warn("[WorkerInfo] worker {} is overload!", getAddress());
        } else {
            overloading = false;
        }
    }

    /**
     * @return true if the last heartbeat is older than {@code WORKER_TIMEOUT_MS}
     *         relative to the local clock
     */
    public boolean timeout() {
        long timeout = System.currentTimeMillis() - lastActiveTime;
        return timeout > WORKER_TIMEOUT_MS;
    }

    /**
     * @return true if the most recent heartbeat reported overload
     */
    public boolean overload() {
        return overloading;
    }
}
package tech.powerjob.server.common.thread;
import lombok.extern.slf4j.Slf4j;
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.atomic.AtomicLong;
/**
 * Rejection policy that hands the rejected task to a brand-new thread
 * (unless the pool has already been shut down). Note: unbounded — every
 * rejection spawns one thread.
 *
 * @author Echo009
 * @since 2022/10/12
 */
@Slf4j
public class NewThreadRunRejectedExecutionHandler implements RejectedExecutionHandler {

    /** Sequence used to build unique fallback-thread names. */
    private static final AtomicLong COUNTER = new AtomicLong();

    /** Logical name of the pool, used in log output and thread names. */
    private final String source;

    public NewThreadRunRejectedExecutionHandler(String source) {
        this.source = source;
    }

    @Override
    public void rejectedExecution(Runnable r, ThreadPoolExecutor p) {
        log.error("[{}] ThreadPool[{}] overload, the task[{}] will run by a new thread!, Maybe you need to adjust the ThreadPool config!", source, p, r);
        if (p.isShutdown()) {
            return;
        }
        String fallbackThreadName = source + "-T-" + COUNTER.getAndIncrement();
        log.info("[{}] create new thread[{}] to run job", source, fallbackThreadName);
        new Thread(r, fallbackThreadName).start();
    }
}
package tech.powerjob.server.common.timewheel;
import java.util.Set;
import java.util.concurrent.TimeUnit;
/**
 * Minimal timer abstraction: schedule one-shot tasks after a delay, and stop.
 *
 * @author tjq
 * @since 2020/4/2
 */
public interface Timer {

    /**
     * Schedules a task to run once after the given delay.
     *
     * @param task  task to run
     * @param delay delay amount
     * @param unit  unit of {@code delay}
     * @return handle for querying/cancelling the pending task
     */
    TimerFuture schedule(TimerTask task, long delay, TimeUnit unit);

    /**
     * Stops the timer and all scheduled tasks.
     *
     * @return the tasks that had not yet run (presumably — confirm with the implementation)
     */
    Set<TimerTask> stop();
}
package tech.powerjob.server.common.timewheel;
/**
 * Handle to a task scheduled on a {@link Timer}: allows querying and
 * cancelling the pending execution.
 *
 * @author tjq
 * @since 2020/4/3
 */
public interface TimerFuture {

    /**
     * Returns the task this future tracks.
     *
     * @return the scheduled {@link TimerTask}
     */
    TimerTask getTask();

    /**
     * Attempts to cancel execution of this task. This attempt will
     * fail if the task has already completed, has already been cancelled,
     * or could not be cancelled for some other reason. If successful,
     * and this task has not started when {@code cancel} is called,
     * this task should never run.
     *
     * <p>After this method returns, subsequent calls to {@link #isDone} will
     * always return {@code true}. Subsequent calls to {@link #isCancelled}
     * will always return {@code true} if this method returned {@code true}.
     *
     * @return {@code true} if the task was cancelled
     */
    boolean cancel();

    /**
     * Returns {@code true} if this task was cancelled before it completed
     * normally.
     *
     * @return {@code true} if this task was cancelled before it completed
     */
    boolean isCancelled();

    /**
     * Returns {@code true} if this task completed.
     *
     * Completion may be due to normal termination, an exception, or
     * cancellation -- in all of these cases, this method will return
     * {@code true}.
     *
     * @return {@code true} if this task completed
     */
    boolean isDone();
}
package tech.powerjob.server.common.timewheel;
/**
 * A task schedulable on a {@link Timer}; simply a {@link Runnable}, so it can
 * be supplied as a lambda.
 *
 * @author tjq
 * @since 2020/4/2
 */
@FunctionalInterface
public interface TimerTask extends Runnable {
}
package tech.powerjob.server.common.timewheel.holder;
import tech.powerjob.server.common.timewheel.HashedWheelTimer;
import tech.powerjob.server.common.timewheel.Timer;
/**
 * Singleton holder for shared hashed-wheel timers.
 *
 * @author tjq
 * @since 2020/4/5
 */
public class HashedWheelTimerHolder {

    /**
     * Coarse-grained wheel: one tick every 5 seconds, 16 slots.
     */
    public static final Timer INACCURATE_TIMER = new HashedWheelTimer(5000, 16, 0);

    private HashedWheelTimerHolder() {
    }
}
package tech.powerjob.server.common.timewheel.holder;
import tech.powerjob.server.common.timewheel.HashedWheelTimer;
import tech.powerjob.server.common.timewheel.Timer;
import tech.powerjob.server.common.timewheel.TimerFuture;
import tech.powerjob.server.common.timewheel.TimerTask;
import com.google.common.collect.Maps;
import java.util.Map;
import java.util.concurrent.TimeUnit;
/**
 * Schedules instance trigger tasks on hashed-wheel timers, keeping a registry
 * of cancellable futures keyed by instance id.
 *
 * @author tjq
 * @since 2020/7/25
 */
public class InstanceTimeWheelService {

    // pending futures by unique id, kept so callers can look up / cancel triggers
    private static final Map<Long, TimerFuture> CARGO = Maps.newConcurrentMap();

    /**
     * High-precision wheel: 1 ms per tick, 4096 slots.
     */
    private static final Timer TIMER = new HashedWheelTimer(1, 4096, Runtime.getRuntime().availableProcessors() * 4);

    /**
     * Coarse wheel for long-delay tasks: 10 s per tick.
     */
    private static final Timer SLOW_TIMER = new HashedWheelTimer(10000, 12, 0);

    /**
     * Minimum delay for which a future is tracked in CARGO; shorter tasks
     * cannot be cancelled.
     */
    private static final long MIN_INTERVAL_MS = 1000;

    /**
     * Delays above this threshold first wait on SLOW_TIMER, then move to the
     * precise TIMER for the final stretch.
     */
    private static final long LONG_DELAY_THRESHOLD_MS = 60000;

    /**
     * Schedules a task.
     *
     * @param uniqueId  unique id (snowflake-generated) used as the CARGO key
     * @param delayMS   delay in milliseconds
     * @param timerTask the task to run
     */
    public static void schedule(Long uniqueId, Long delayMS, TimerTask timerTask) {
        if (delayMS <= LONG_DELAY_THRESHOLD_MS) {
            realSchedule(uniqueId, delayMS, timerTask);
            return;
        }
        // long delay: park on the coarse wheel until one threshold remains,
        // then hand off to the precise wheel for accurate firing
        long expectTriggerTime = System.currentTimeMillis() + delayMS;
        TimerFuture longDelayTask = SLOW_TIMER.schedule(() -> {
            CARGO.remove(uniqueId);
            realSchedule(uniqueId, expectTriggerTime - System.currentTimeMillis(), timerTask);
        }, delayMS - LONG_DELAY_THRESHOLD_MS, TimeUnit.MILLISECONDS);
        CARGO.put(uniqueId, longDelayTask);
    }

    /**
     * Looks up the pending future for an id.
     *
     * @param uniqueId unique id used at schedule time
     * @return the tracked TimerFuture, or null if none
     */
    public static TimerFuture fetchTimerFuture(Long uniqueId) {
        return CARGO.get(uniqueId);
    }

    // puts the task on the precise wheel; tracks it in CARGO only when it is
    // far enough in the future to be worth cancelling
    private static void realSchedule(Long uniqueId, Long delayMS, TimerTask timerTask) {
        TimerFuture timerFuture = TIMER.schedule(() -> {
            CARGO.remove(uniqueId);
            timerTask.run();
        }, delayMS, TimeUnit.MILLISECONDS);
        if (delayMS > MIN_INTERVAL_MS) {
            CARGO.put(uniqueId, timerFuture);
        }
    }
}
package tech.powerjob.server.common.utils;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.exception.ExceptionUtils;
import org.aspectj.lang.JoinPoint;
import org.aspectj.lang.ProceedingJoinPoint;
import org.aspectj.lang.Signature;
import org.aspectj.lang.reflect.MethodSignature;
import org.springframework.core.LocalVariableTableParameterNameDiscoverer;
import org.springframework.core.ParameterNameDiscoverer;
import org.springframework.expression.EvaluationContext;
import org.springframework.expression.Expression;
import org.springframework.expression.ExpressionParser;
import org.springframework.expression.spel.standard.SpelExpressionParser;
import org.springframework.expression.spel.support.StandardEvaluationContext;
import java.lang.reflect.Method;
/**
 * AOP helpers: resolve the concrete method behind a join point and evaluate
 * SpEL expressions against its arguments.
 *
 * @author tjq
 * @since 1/16/21
 */
@Slf4j
public class AOPUtils {

    // SpEL parser is thread-safe; a single shared instance suffices
    private static final ExpressionParser PARSER = new SpelExpressionParser();

    // NOTE(review): LocalVariableTableParameterNameDiscoverer is deprecated in newer
    // Spring versions and requires classes compiled with debug info — confirm build flags
    private static final ParameterNameDiscoverer DISCOVERER = new LocalVariableTableParameterNameDiscoverer();

    /**
     * Returns the simple name of the class declaring the intercepted method.
     *
     * @param joinPoint the intercepted invocation
     * @return simple class name of the declaring type
     */
    public static String parseRealClassName(JoinPoint joinPoint) {
        return joinPoint.getSignature().getDeclaringType().getSimpleName();
    }

    /**
     * Resolves the concrete {@link Method} behind a join point. When the
     * signature method is declared on an interface, the implementing method
     * is looked up on the target class instead.
     *
     * @param joinPoint the intercepted invocation
     * @return the implementation method
     * @throws IllegalArgumentException if the join point is not a method call
     */
    public static Method parseMethod(ProceedingJoinPoint joinPoint) {
        Signature pointSignature = joinPoint.getSignature();
        if (!(pointSignature instanceof MethodSignature)) {
            throw new IllegalArgumentException("this annotation should be used on a method!");
        }
        MethodSignature signature = (MethodSignature) pointSignature;
        Method method = signature.getMethod();
        if (method.getDeclaringClass().isInterface()) {
            try {
                method = joinPoint.getTarget().getClass().getDeclaredMethod(pointSignature.getName(), method.getParameterTypes());
            } catch (SecurityException | NoSuchMethodException e) {
                ExceptionUtils.rethrow(e);
            }
        }
        return method;
    }

    /**
     * Evaluates a SpEL expression with the method's parameter names bound to
     * the actual argument values.
     *
     * @param method        the invoked method (used to discover parameter names)
     * @param arguments     actual arguments, positionally matching the parameters
     * @param spEl          the SpEL expression to evaluate
     * @param clazz         expected result type
     * @param defaultResult value returned if parsing or evaluation fails
     * @return the evaluated value, or {@code defaultResult} on any failure
     */
    public static <T> T parseSpEl(Method method, Object[] arguments, String spEl, Class<T> clazz, T defaultResult) {
        String[] params = DISCOVERER.getParameterNames(method);
        // only enforced when the JVM runs with -ea; a null here would otherwise NPE in the loop below
        assert params != null;
        EvaluationContext context = new StandardEvaluationContext();
        for (int len = 0; len < params.length; len++) {
            context.setVariable(params[len], arguments[len]);
        }
        try {
            Expression expression = PARSER.parseExpression(spEl);
            return expression.getValue(context, clazz);
        } catch (Exception e) {
            log.error("[AOPUtils] parse SpEL failed for method[{}], please concat @tjq to fix the bug!", method.getName(), e);
            return defaultResult;
        }
    }
}
package tech.powerjob.server.common.utils;
import tech.powerjob.common.utils.CommonUtils;
import org.apache.commons.lang3.exception.ExceptionUtils;
import org.springframework.util.DigestUtils;
import javax.servlet.http.HttpServletResponse;
import java.io.*;
import java.net.URLEncoder;
/**
 * File utilities that centralise where PowerJob server files live; all paths
 * are rooted at {@code {user.home}/powerjob/server/}.
 *
 * @author tjq
 * @since 2020/5/15
 */
public class OmsFileUtils {

    private static final String USER_HOME = System.getProperty("user.home", "oms");
    private static final String COMMON_PATH = USER_HOME + "/powerjob/server/";

    private OmsFileUtils() {
        // utility class, no instances
    }

    /**
     * Directory for online logs.
     *
     * @return online-log directory path (with trailing slash)
     */
    public static String genLogDirPath() {
        return COMMON_PATH + "online_log/";
    }

    /**
     * Directory for jar files used to build containers.
     *
     * @return container jar directory path
     */
    public static String genContainerJarPath() {
        return COMMON_PATH + "container/";
    }

    /**
     * Fixed temporary directory.
     *
     * @return temporary directory path
     */
    public static String genTemporaryPath() {
        return COMMON_PATH + "temporary/";
    }

    /**
     * Random (non-repeating) temporary work directory; callers must delete it
     * when done.
     *
     * @return unique temporary directory path
     */
    public static String genTemporaryWorkPath() {
        return genTemporaryPath() + CommonUtils.genUUID() + "/";
    }

    /**
     * H2 database base directory.
     *
     * @return H2 base path
     */
    public static String genH2BasePath() {
        return COMMON_PATH + "h2/";
    }

    public static String genH2WorkPath() {
        return genH2BasePath() + CommonUtils.genUUID() + "/";
    }

    /**
     * Writes text to a file, replacing any previous content.
     * NOTE(review): FileWriter uses the platform default charset — presumably
     * fine for locally-consumed files; confirm if files move across hosts.
     *
     * @param content text to write
     * @param file    destination file
     */
    public static void string2File(String content, File file) {
        try(FileWriter fw = new FileWriter(file)) {
            fw.write(content);
        }catch (IOException ie) {
            ExceptionUtils.rethrow(ie);
        }
    }

    /**
     * Streams a file to an HTTP response as an attachment download.
     *
     * @param file     file to send
     * @param response HTTP response to write to
     * @throws IOException on read/write failure
     */
    public static void file2HttpResponse(File file, HttpServletResponse response) throws IOException {
        response.setContentType("application/octet-stream");
        response.setHeader("Content-Disposition", "attachment;filename=" + URLEncoder.encode(file.getName(), "UTF-8"));
        byte[] buffer = new byte[4096];
        try (BufferedOutputStream bos = new BufferedOutputStream(response.getOutputStream());
             BufferedInputStream bis = new BufferedInputStream(new FileInputStream(file))) {
            int read;
            // BUGFIX: write only the bytes actually read. The previous loop wrote the
            // entire 4096-byte buffer on every pass, corrupting the tail of any file
            // whose size is not a multiple of the buffer size (stale bytes re-sent).
            while ((read = bis.read(buffer)) != -1) {
                bos.write(buffer, 0, read);
            }
        }
    }

    /**
     * Computes the MD5 digest of a file.
     *
     * @param f file to hash
     * @return md5 hex string
     * @throws IOException on read failure
     */
    public static String md5(File f) throws IOException {
        String md5;
        try(FileInputStream fis = new FileInputStream(f)) {
            md5 = DigestUtils.md5DigestAsHex(fis);
        }
        return md5;
    }
}
package tech.powerjob.server.common.utils;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.exception.ExceptionUtils;
import java.io.InputStream;
import java.net.URL;
import java.util.Objects;
import java.util.Properties;
/**
* 加载配置文件
*
* @author tjq
* @since 2020/5/18
*/
@Slf4j
public class PropertyUtils {
private static final Properties PROPERTIES = new Properties();
public static Properties getProperties() {
return PROPERTIES;
}
public static void init() {
URL propertiesURL =PropertyUtils.class.getClassLoader().getResource("application.properties");
Objects.requireNonNull(propertiesURL);
try (InputStream is = propertiesURL.openStream()) {
PROPERTIES.load(is);
}catch (Exception e) {
ExceptionUtils.rethrow(e);
}
}
}
package tech.powerjob.server.common.utils;
import org.springframework.beans.BeansException;
import org.springframework.context.ApplicationContext;
import org.springframework.context.ApplicationContextAware;
import org.springframework.stereotype.Component;
/**
 * Static accessor for the Spring {@link ApplicationContext}, enabling bean
 * lookup from code that is not itself Spring-managed.
 *
 * @author tjq
 * @since 2020/4/7
 */
@Component
public class SpringUtils implements ApplicationContextAware {

    // written once by Spring during startup via setApplicationContext; static
    // on purpose so the lookups below are callable from anywhere
    private static ApplicationContext context;

    /**
     * Fetches a bean by type.
     *
     * @param clz bean type
     * @return the matching bean
     */
    public static <T> T getBean(Class<T> clz) {
        return context.getBean(clz);
    }

    /**
     * Fetches a bean by name.
     *
     * @param beanName bean name
     * @return the matching bean
     */
    public static Object getBean(String beanName) {
        return context.getBean(beanName);
    }

    @Override
    public void setApplicationContext(ApplicationContext ctx) throws BeansException {
        // intentional static write from an instance method (standard holder
        // pattern); calling getBean before context startup would NPE
        context = ctx;
    }
}
package tech.powerjob.server.common.utils;
import tech.powerjob.common.RemoteConstant;
import com.google.common.collect.Lists;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.net.ntp.NTPUDPClient;
import org.apache.commons.net.ntp.NtpV3Packet;
import org.apache.commons.net.ntp.TimeInfo;
import java.net.InetAddress;
import java.util.List;
/**
 * Clock-drift checker: compares the local clock against public NTP servers
 * and fails fast when the offset is too large for reliable scheduling.
 *
 * @author tjq
 * @since 2020/5/19
 */
@Slf4j
public class TimeUtils {

    /**
     * NTP servers, tried in order (Aliyun -> SJTU -> Apple).
     */
    private static final List<String> NTP_SERVER_LIST = Lists.newArrayList("ntp.aliyun.com", "ntp.sjtu.edu.cn", "time1.apple.com");

    /**
     * Maximum tolerated offset: 5 seconds.
     */
    private static final long MAX_OFFSET = 5000;

    private TimeUtils() {
        // utility class, no instances
    }

    /**
     * Queries the NTP servers in order and compares their time with the local
     * clock; returns normally as soon as one server answers within tolerance.
     *
     * @throws TimeCheckException if the clock offset exceeds {@code MAX_OFFSET},
     *         or if no NTP server could be reached
     */
    public static void check() throws TimeCheckException {
        NTPUDPClient timeClient = new NTPUDPClient();
        try {
            timeClient.setDefaultTimeout((int) RemoteConstant.DEFAULT_TIMEOUT_MS);
            for (String address : NTP_SERVER_LIST) {
                try {
                    TimeInfo t = timeClient.getTime(InetAddress.getByName(address));
                    NtpV3Packet ntpV3Packet = t.getMessage();
                    log.info("[TimeUtils] use ntp server: {}, request result: {}", address, ntpV3Packet);
                    // RFC-1305: https://tools.ietf.org/html/rfc1305
                    // transit delay (tens of ms) is deliberately ignored — far below MAX_OFFSET
                    long local = System.currentTimeMillis();
                    long ntp = ntpV3Packet.getTransmitTimeStamp().getTime();
                    long offset = local - ntp;
                    if (Math.abs(offset) > MAX_OFFSET) {
                        String msg = String.format("inaccurate server time(local:%d, ntp:%d), please use ntp update to calibration time", local, ntp);
                        throw new TimeCheckException(msg);
                    }
                    return;
                } catch (TimeCheckException tce) {
                    // BUGFIX: previously this deliberate failure fell into the generic
                    // catch below, was logged as "server may down", and the caller saw
                    // only the misleading "no available ntp server" error. Rethrow it.
                    throw tce;
                } catch (Exception ignore) {
                    log.warn("[TimeUtils] ntp server: {} may down!", address);
                }
            }
            throw new TimeCheckException("no available ntp server, maybe alibaba, sjtu and apple are both collapse");
        } finally {
            timeClient.close();
        }
    }

    /** Thrown when the local clock cannot be verified against NTP. */
    public static final class TimeCheckException extends RuntimeException {
        public TimeCheckException(String message) {
            super(message);
        }
    }
}
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<artifactId>powerjob-server</artifactId>
<groupId>tech.powerjob</groupId>
<version>4.3.0</version>
<relativePath>../pom.xml</relativePath>
</parent>
<modelVersion>4.0.0</modelVersion>
<artifactId>powerjob-server-core</artifactId>
<version>${project.parent.version}</version>
<properties>
<maven.compiler.source>8</maven.compiler.source>
<maven.compiler.target>8</maven.compiler.target>
</properties>
<dependencies>
<dependency>
<groupId>tech.powerjob</groupId>
<artifactId>powerjob-server-extension</artifactId>
</dependency>
<dependency>
<groupId>tech.powerjob</groupId>
<artifactId>powerjob-server-remote</artifactId>
</dependency>
<dependency>
<groupId>tech.powerjob</groupId>
<artifactId>powerjob-server-common</artifactId>
</dependency>
<dependency>
<groupId>tech.powerjob</groupId>
<artifactId>powerjob-server-persistence</artifactId>
</dependency>
</dependencies>
</project>
\ No newline at end of file
package tech.powerjob.server.core.container;
import tech.powerjob.common.ContainerConstant;
import net.lingala.zip4j.ZipFile;
import org.apache.commons.io.FileUtils;
import org.apache.commons.lang3.StringUtils;
import tech.powerjob.server.common.utils.OmsFileUtils;
import java.io.*;
import java.util.Objects;
/**
 * Generator for oms-worker container template projects.
 *
 * Unpacks a template archive bundled on the classpath, patches its pom.xml,
 * Spring context file and container properties with the caller-supplied
 * coordinates, then re-zips the patched project.
 *
 * @author tjq
 * @since 2020/5/15
 */
public class ContainerTemplateGenerator {
// base name of the template archive on the classpath (oms-template-origin.zip)
private static final String ORIGIN_FILE_NAME = "oms-template-origin";
/**
 * Generates the template project archive for a container.
 * @param group value for the pom groupId tag
 * @param artifact value for the pom artifactId tag
 * @param name value for the pom name tag
 * @param packageName base Java package of the generated sources
 * @param javaVersion value written into maven.compiler.source / maven.compiler.target
 * @return the generated zip archive
 * @throws IOException on any file-system failure
 */
public static File generate(String group, String artifact, String name, String packageName, Integer javaVersion) throws IOException {
String workerDir = OmsFileUtils.genTemporaryWorkPath();
File originJar = new File(workerDir + "tmp.jar");
String tmpPath = workerDir + "/unzip/";
// On CentOS 7, getResource throws FileNotFoundException for unknown reasons, so copy the resource stream to a temp file instead
try (InputStream is = ContainerTemplateGenerator.class.getClassLoader().getResourceAsStream(ORIGIN_FILE_NAME + ".zip")) {
Objects.requireNonNull(is, "generate container template failed, can't find zip file in classpath.");
FileUtils.copyInputStreamToFile(is, originJar);
}
ZipFile zipFile = new ZipFile(originJar);
zipFile.extractAll(tmpPath);
String rootPath = tmpPath + ORIGIN_FILE_NAME;
// 1. Patch pom.xml (read line by line, substitute placeholder lines while reading, then write back)
String pomPath = rootPath + "/pom.xml";
String line;
StringBuilder buffer = new StringBuilder();
try (BufferedReader br = new BufferedReader(new FileReader(pomPath))) {
while ((line = br.readLine()) != null) {
if (line.contains("<groupId>groupId</groupId>")) {
buffer.append(" <groupId>").append(group).append("</groupId>");
}else if (line.contains("<artifactId>artifactId</artifactId>")) {
buffer.append(" <artifactId>").append(artifact).append("</artifactId>");
}else if (line.contains("<name>name</name>")) {
buffer.append(" <name>").append(name).append("</name>");
}else if (line.contains("<maven.compiler.source>")) {
buffer.append(" <maven.compiler.source>").append(javaVersion).append("</maven.compiler.source>");
}else if (line.contains("<maven.compiler.target>")) {
buffer.append(" <maven.compiler.target>").append(javaVersion).append("</maven.compiler.target>");
} else {
buffer.append(line);
}
buffer.append(System.lineSeparator());
}
}
OmsFileUtils.string2File(buffer.toString(), new File(pomPath));
// 2. Create the source directory matching the requested package
String packagePath = StringUtils.replace(packageName, ".", "/");
String absPath = rootPath + "/src/main/java/" + packagePath;
FileUtils.forceMkdir(new File(absPath));
// 3. Patch the Spring context file so component scanning targets the requested package
String resourcePath = rootPath + "/src/main/resources/";
String springXMLPath = resourcePath + ContainerConstant.SPRING_CONTEXT_FILE_NAME;
buffer.setLength(0);
try (BufferedReader br = new BufferedReader(new FileReader(springXMLPath))) {
while ((line = br.readLine()) != null) {
if (line.contains("<context:component-scan base-package=\"")) {
buffer.append(" <context:component-scan base-package=\"").append(packageName).append("\"/>");
}else {
buffer.append(line);
}
buffer.append(System.lineSeparator());
}
}
OmsFileUtils.string2File(buffer.toString(), new File(springXMLPath));
// 4. Write packageName into the container properties so the container can load user classes
String propertiesPath = resourcePath + ContainerConstant.CONTAINER_PROPERTIES_FILE_NAME;
String properties = ContainerConstant.CONTAINER_PACKAGE_NAME_KEY + "=" + packageName;
OmsFileUtils.string2File(properties, new File(propertiesPath));
// 5. Re-pack the patched project into a zip
String finPath = tmpPath + "template.zip";
ZipFile finZip = new ZipFile(finPath);
finZip.addFolder(new File(rootPath));
// 6. Delete the copied source archive
FileUtils.forceDelete(originJar);
return finZip.getFile();
}
}
package tech.powerjob.server.core.evaluator;
/**
 * Strategy for evaluating a runtime expression against an input object.
 *
 * @author Echo009
 * @since 2021/12/10
 */
public interface Evaluator {
/**
 * Evaluates the given expression with the supplied input.
 *
 * @param expression an executable expression
 * @param input the input made available to the expression
 * @return the evaluation result
 */
Object evaluate(String expression, Object input);
}
package tech.powerjob.server.core.evaluator;
import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;
import javax.script.Bindings;
import javax.script.ScriptEngine;
import javax.script.ScriptEngineManager;
/**
 * {@link Evaluator} backed by the Groovy JSR-223 script engine. The input
 * object is exposed to the script under the variable name {@code context}.
 *
 * @author Echo009
 * @since 2021/12/10
 */
@Slf4j
@Component
public class GroovyEvaluator implements Evaluator {

    /** single shared Groovy engine, resolved once via the JSR-223 manager */
    private static final ScriptEngine ENGINE = new ScriptEngineManager().getEngineByName("groovy");

    @Override
    @SneakyThrows
    public Object evaluate(String expression, Object input) {
        // fresh bindings for each evaluation
        final Bindings scope = ENGINE.createBindings();
        scope.put("context", input);
        return ENGINE.eval(expression, scope);
    }
}
package tech.powerjob.server.core.handler;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.exception.ExceptionUtils;
import org.springframework.beans.BeanUtils;
import org.springframework.core.env.Environment;
import tech.powerjob.common.enums.InstanceStatus;
import tech.powerjob.common.request.*;
import tech.powerjob.common.response.AskResponse;
import tech.powerjob.common.serialize.JsonUtils;
import tech.powerjob.common.utils.NetUtils;
import tech.powerjob.remote.framework.actor.Handler;
import tech.powerjob.remote.framework.actor.ProcessType;
import tech.powerjob.server.common.constants.SwitchableStatus;
import tech.powerjob.server.common.module.WorkerInfo;
import tech.powerjob.server.common.utils.SpringUtils;
import tech.powerjob.server.monitor.MonitorService;
import tech.powerjob.server.monitor.events.w2s.TtReportInstanceStatusEvent;
import tech.powerjob.server.monitor.events.w2s.WorkerHeartbeatEvent;
import tech.powerjob.server.monitor.events.w2s.WorkerLogReportEvent;
import tech.powerjob.server.persistence.remote.model.ContainerInfoDO;
import tech.powerjob.server.persistence.remote.model.JobInfoDO;
import tech.powerjob.server.persistence.remote.repository.ContainerInfoRepository;
import tech.powerjob.server.persistence.remote.repository.JobInfoRepository;
import tech.powerjob.server.remote.worker.WorkerClusterQueryService;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.RejectedExecutionException;
import java.util.stream.Collectors;
import static tech.powerjob.common.RemoteConstant.*;
/**
 * wrapper monitor for IWorkerRequestHandler
 *
 * Template base class: wraps each worker-request handler with monitoring-event
 * construction and reporting, delegating the actual processing to the
 * {@code *0} methods implemented by subclasses.
 *
 * @author tjq
 * @since 2022/9/11
 */
@RequiredArgsConstructor
@Slf4j
public abstract class AbWorkerRequestHandler implements IWorkerRequestHandler {
protected final MonitorService monitorService;
protected final Environment environment;
protected final ContainerInfoRepository containerInfoRepository;
private final WorkerClusterQueryService workerClusterQueryService;
/**
 * Actual heartbeat processing, implemented by the subclass.
 * @param heartbeat heartbeat payload
 * @param event pre-filled monitoring event (reported by the caller after this returns)
 */
protected abstract void processWorkerHeartbeat0(WorkerHeartbeat heartbeat, WorkerHeartbeatEvent event);
/**
 * Actual instance-status-report processing, implemented by the subclass.
 * @param req report request
 * @param event pre-filled monitoring event
 * @return response sent back to the worker (may be null — see implementation)
 * @throws Exception any processing failure; the caller converts it into a failed AskResponse
 */
protected abstract AskResponse processTaskTrackerReportInstanceStatus0(TaskTrackerReportInstanceStatusReq req, TtReportInstanceStatusEvent event) throws Exception;
/**
 * Actual worker-log-report processing, implemented by the subclass.
 * @param req log report request
 * @param event pre-filled monitoring event
 */
protected abstract void processWorkerLogReport0(WorkerLogReportReq req, WorkerLogReportEvent event);
@Override
@Handler(path = S4W_HANDLER_WORKER_HEARTBEAT, processType = ProcessType.NO_BLOCKING)
public void processWorkerHeartbeat(WorkerHeartbeat heartbeat) {
long startMs = System.currentTimeMillis();
// build the monitoring event up front; delayMs measures reporting lag from the worker's clock
WorkerHeartbeatEvent event = new WorkerHeartbeatEvent()
.setAppName(heartbeat.getAppName())
.setAppId(heartbeat.getAppId())
.setVersion(heartbeat.getVersion())
.setProtocol(heartbeat.getProtocol())
.setTag(heartbeat.getTag())
.setWorkerAddress(heartbeat.getWorkerAddress())
.setDelayMs(startMs - heartbeat.getHeartbeatTime())
.setScore(heartbeat.getSystemMetrics().getScore());
processWorkerHeartbeat0(heartbeat, event);
monitorService.monitor(event);
}
@Override
@Handler(path = S4W_HANDLER_REPORT_INSTANCE_STATUS, processType = ProcessType.BLOCKING)
public AskResponse processTaskTrackerReportInstanceStatus(TaskTrackerReportInstanceStatusReq req) {
long startMs = System.currentTimeMillis();
// optimistically mark SUCCESS; downgraded to FAILED in the catch block
TtReportInstanceStatusEvent event = new TtReportInstanceStatusEvent()
.setAppId(req.getAppId())
.setJobId(req.getJobId())
.setInstanceId(req.getInstanceId())
.setWfInstanceId(req.getWfInstanceId())
.setInstanceStatus(InstanceStatus.of(req.getInstanceStatus()))
.setDelayMs(startMs - req.getReportTime())
.setServerProcessStatus(TtReportInstanceStatusEvent.Status.SUCCESS);
try {
return processTaskTrackerReportInstanceStatus0(req, event);
} catch (Exception e) {
event.setServerProcessStatus(TtReportInstanceStatusEvent.Status.FAILED);
log.error("[WorkerRequestHandler] processTaskTrackerReportInstanceStatus failed for request: {}", req, e);
return AskResponse.failed(ExceptionUtils.getMessage(e));
} finally {
// always record processing cost and report the event, success or failure
event.setServerProcessCost(System.currentTimeMillis() - startMs);
monitorService.monitor(event);
}
}
@Override
@Handler(path = S4W_HANDLER_REPORT_LOG, processType = ProcessType.NO_BLOCKING)
public void processWorkerLogReport(WorkerLogReportReq req) {
WorkerLogReportEvent event = new WorkerLogReportEvent()
.setWorkerAddress(req.getWorkerAddress())
.setLogNum(req.getInstanceLogContents().size());
try {
processWorkerLogReport0(req, event);
event.setStatus(WorkerLogReportEvent.Status.SUCCESS);
} catch (RejectedExecutionException re) {
// log-processing pool is saturated; record as REJECTED without logging a stack trace
event.setStatus(WorkerLogReportEvent.Status.REJECTED);
} catch (Throwable t) {
event.setStatus(WorkerLogReportEvent.Status.EXCEPTION);
log.warn("[WorkerRequestHandler] process worker report failed!", t);
} finally {
monitorService.monitor(event);
}
}
/**
 * Returns the sorted list of suitable worker addresses for a job, guarding
 * against cross-app access (the job must belong to the requesting app).
 */
@Override
@Handler(path = S4W_HANDLER_QUERY_JOB_CLUSTER, processType = ProcessType.BLOCKING)
public AskResponse processWorkerQueryExecutorCluster(WorkerQueryExecutorClusterReq req) {
AskResponse askResponse;
Long jobId = req.getJobId();
Long appId = req.getAppId();
JobInfoRepository jobInfoRepository = SpringUtils.getBean(JobInfoRepository.class);
Optional<JobInfoDO> jobInfoOpt = jobInfoRepository.findById(jobId);
if (jobInfoOpt.isPresent()) {
JobInfoDO jobInfo = jobInfoOpt.get();
if (!jobInfo.getAppId().equals(appId)) {
askResponse = AskResponse.failed("Permission Denied!");
}else {
List<String> sortedAvailableWorker = workerClusterQueryService.getSuitableWorkers(jobInfo)
.stream().map(WorkerInfo::getAddress).collect(Collectors.toList());
askResponse = AskResponse.succeed(sortedAvailableWorker);
}
}else {
askResponse = AskResponse.failed("can't find jobInfo by jobId: " + jobId);
}
return askResponse;
}
/**
 * Answers a worker's container-deployment query with the container metadata
 * and a download URL served by this server instance.
 */
@Override
@Handler(path = S4W_HANDLER_WORKER_NEED_DEPLOY_CONTAINER, processType = ProcessType.BLOCKING)
public AskResponse processWorkerNeedDeployContainer(WorkerNeedDeployContainerRequest req) {
String port = environment.getProperty("local.server.port");
Optional<ContainerInfoDO> containerInfoOpt = containerInfoRepository.findById(req.getContainerId());
AskResponse askResponse = new AskResponse();
// only ENABLE-status containers are deployable
if (!containerInfoOpt.isPresent() || containerInfoOpt.get().getStatus() != SwitchableStatus.ENABLE.getV()) {
askResponse.setSuccess(false);
askResponse.setMessage("can't find container by id: " + req.getContainerId());
}else {
ContainerInfoDO containerInfo = containerInfoOpt.get();
askResponse.setSuccess(true);
ServerDeployContainerRequest dpReq = new ServerDeployContainerRequest();
BeanUtils.copyProperties(containerInfo, dpReq);
dpReq.setContainerId(containerInfo.getId());
String downloadURL = String.format("http://%s:%s/container/downloadJar?version=%s", NetUtils.getLocalHost(), port, containerInfo.getVersion());
dpReq.setDownloadURL(downloadURL);
askResponse.setData(JsonUtils.toBytes(dpReq));
}
return askResponse;
}
}
package tech.powerjob.server.core.handler;
import tech.powerjob.common.request.*;
import tech.powerjob.common.response.AskResponse;
/**
 * Defines the protocol handled between server and worker.
 *
 * @author tjq
 * @since 2022/9/10
 */
public interface IWorkerRequestHandler {
/**
 * Processes a heartbeat reported by a worker.
 * @param heartbeat heartbeat payload
 */
void processWorkerHeartbeat(WorkerHeartbeat heartbeat);
/**
 * Processes a TaskTracker's task-instance status report.
 * @param req report request
 * @return response
 */
AskResponse processTaskTrackerReportInstanceStatus(TaskTrackerReportInstanceStatusReq req);
/**
 * Processes a worker's query for the executor cluster of a job.
 * @param req request
 * @return cluster info
 */
AskResponse processWorkerQueryExecutorCluster(WorkerQueryExecutorClusterReq req);
/**
 * Processes a worker's log push request (handled asynchronously via an internal thread pool; non-blocking).
 * @param req request
 */
void processWorkerLogReport(WorkerLogReportReq req);
/**
 * Processes a worker's container deployment request.
 * @param request request
 * @return container deployment info
 */
AskResponse processWorkerNeedDeployContainer(WorkerNeedDeployContainerRequest request);
}
package tech.powerjob.server.core.handler;
import lombok.extern.slf4j.Slf4j;
import org.springframework.core.env.Environment;
import org.springframework.stereotype.Component;
import org.springframework.util.CollectionUtils;
import tech.powerjob.common.RemoteConstant;
import tech.powerjob.common.enums.InstanceStatus;
import tech.powerjob.common.request.TaskTrackerReportInstanceStatusReq;
import tech.powerjob.common.request.WorkerHeartbeat;
import tech.powerjob.common.request.WorkerLogReportReq;
import tech.powerjob.common.response.AskResponse;
import tech.powerjob.remote.framework.actor.Actor;
import tech.powerjob.server.core.instance.InstanceLogService;
import tech.powerjob.server.core.instance.InstanceManager;
import tech.powerjob.server.core.workflow.WorkflowInstanceManager;
import tech.powerjob.server.monitor.MonitorService;
import tech.powerjob.server.monitor.events.w2s.TtReportInstanceStatusEvent;
import tech.powerjob.server.monitor.events.w2s.WorkerHeartbeatEvent;
import tech.powerjob.server.monitor.events.w2s.WorkerLogReportEvent;
import tech.powerjob.server.persistence.remote.repository.ContainerInfoRepository;
import tech.powerjob.server.remote.worker.WorkerClusterManagerService;
import tech.powerjob.server.remote.worker.WorkerClusterQueryService;
/**
 * receive and process worker's request
 *
 * @author tjq
 * @since 2022/9/11
 */
@Slf4j
@Component
@Actor(path = RemoteConstant.S4W_PATH)
public class WorkerRequestHandlerImpl extends AbWorkerRequestHandler {

    private final InstanceManager instanceManager;

    private final WorkflowInstanceManager workflowInstanceManager;

    private final InstanceLogService instanceLogService;

    public WorkerRequestHandlerImpl(InstanceManager instanceManager, WorkflowInstanceManager workflowInstanceManager, InstanceLogService instanceLogService,
                                    MonitorService monitorService, Environment environment, ContainerInfoRepository containerInfoRepository, WorkerClusterQueryService workerClusterQueryService) {
        super(monitorService, environment, containerInfoRepository, workerClusterQueryService);
        this.instanceLogService = instanceLogService;
        this.workflowInstanceManager = workflowInstanceManager;
        this.instanceManager = instanceManager;
    }

    /** A heartbeat simply refreshes the worker-cluster status view. */
    @Override
    protected void processWorkerHeartbeat0(WorkerHeartbeat heartbeat, WorkerHeartbeatEvent event) {
        WorkerClusterManagerService.updateStatus(heartbeat);
    }

    @Override
    protected AskResponse processTaskTrackerReportInstanceStatus0(TaskTrackerReportInstanceStatusReq req, TtReportInstanceStatusEvent event) throws Exception {
        // 2021/02/05 for instances inside a workflow, merge any appended workflow
        // context first, then update the instance status; this call is expected not to throw here
        final Long wfInstanceId = req.getWfInstanceId();
        if (wfInstanceId != null && !CollectionUtils.isEmpty(req.getAppendedWfContext())) {
            workflowInstanceManager.updateWorkflowContext(wfInstanceId, req.getAppendedWfContext());
        }
        instanceManager.updateStatus(req);
        // only terminal states (success / failure) are acknowledged with a reply
        if (InstanceStatus.FINISHED_STATUS.contains(req.getInstanceStatus())) {
            return AskResponse.succeed(null);
        }
        return null;
    }

    @Override
    protected void processWorkerLogReport0(WorkerLogReportReq req, WorkerLogReportEvent event) {
        // expected to be cheap: a few checks plus a Map#get inside the log service
        instanceLogService.submitLogs(req.getWorkerAddress(), req.getInstanceLogContents());
    }
}
package tech.powerjob.server.core.helper;
import tech.powerjob.common.enums.InstanceStatus;
import tech.powerjob.common.enums.WorkflowInstanceStatus;
/**
 * Maps workflow-instance statuses to task-instance statuses.
 *
 * @author Echo009
 * @since 2021/12/13
 */
public class StatusMappingHelper {
// utility class: no instances
private StatusMappingHelper(){
}
/**
 * Converts a workflow instance status into the corresponding task instance status.
 * Statuses without a counterpart fall through to the default branch and map to null.
 * Note: throws NullPointerException when the argument is null (enum switch semantics).
 */
public static InstanceStatus toInstanceStatus(WorkflowInstanceStatus workflowInstanceStatus) {
switch (workflowInstanceStatus) {
case FAILED:
return InstanceStatus.FAILED;
case SUCCEED:
return InstanceStatus.SUCCEED;
case RUNNING:
return InstanceStatus.RUNNING;
case STOPPED:
return InstanceStatus.STOPPED;
default:
return null;
}
}
}
package tech.powerjob.server.core.instance;
import lombok.RequiredArgsConstructor;
import tech.powerjob.server.persistence.remote.model.InstanceInfoDO;
import tech.powerjob.server.persistence.remote.model.JobInfoDO;
import tech.powerjob.server.persistence.remote.repository.InstanceInfoRepository;
import tech.powerjob.server.persistence.remote.repository.JobInfoRepository;
import com.google.common.cache.Cache;
import com.google.common.cache.CacheBuilder;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Service;
import javax.annotation.Resource;
import java.util.Optional;
import java.util.concurrent.ExecutionException;
/**
* 存储 instance 对应的 JobInfo 信息
*
* @author tjq
* @since 2020/6/23
*/
@Service
@RequiredArgsConstructor
public class InstanceMetadataService implements InitializingBean {
private final JobInfoRepository jobInfoRepository;
private final InstanceInfoRepository instanceInfoRepository;
/**
* 缓存,一旦生成任务实例,其对应的 JobInfo 不应该再改变(即使源数据改变)
*/
private Cache<Long, JobInfoDO> instanceId2JobInfoCache;
@Value("${oms.instance.metadata.cache.size}")
private int instanceMetadataCacheSize;
private static final int CACHE_CONCURRENCY_LEVEL = 16;
@Override
public void afterPropertiesSet() throws Exception {
instanceId2JobInfoCache = CacheBuilder.newBuilder()
.concurrencyLevel(CACHE_CONCURRENCY_LEVEL)
.maximumSize(instanceMetadataCacheSize)
.softValues()
.build();
}
/**
* 根据 instanceId 获取 JobInfo
* @param instanceId instanceId
* @return JobInfoDO
* @throws ExecutionException 异常
*/
public JobInfoDO fetchJobInfoByInstanceId(Long instanceId) throws ExecutionException {
return instanceId2JobInfoCache.get(instanceId, () -> {
InstanceInfoDO instanceInfo = instanceInfoRepository.findByInstanceId(instanceId);
if (instanceInfo != null) {
Optional<JobInfoDO> jobInfoOpt = jobInfoRepository.findById(instanceInfo.getJobId());
return jobInfoOpt.orElseThrow(() -> new IllegalArgumentException("can't find JobInfo by jobId: " + instanceInfo.getJobId()));
}
throw new IllegalArgumentException("can't find Instance by instanceId: " + instanceId);
});
}
/**
* 装载缓存
* @param instanceId instanceId
* @param jobInfoDO 原始的任务数据
*/
public void loadJobInfo(Long instanceId, JobInfoDO jobInfoDO) {
instanceId2JobInfoCache.put(instanceId, jobInfoDO);
}
/**
* 失效缓存
* @param instanceId instanceId
*/
public void invalidateJobInfo(Long instanceId) {
instanceId2JobInfoCache.invalidate(instanceId);
}
}
package tech.powerjob.server.core.lock;
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
/**
 * use cached lock to make concurrent safe
 *
 * @author tjq
 * @author Echo009
 * @since 1/16/21
 */
@Target(ElementType.METHOD)
@Retention(RetentionPolicy.RUNTIME)
public @interface UseCacheLock {
/**
 * Lock group name; each distinct type gets its own lock cache
 * (see UseCacheLockAspect's lockContainer).
 */
String type();
/**
 * SpEL expression evaluated against the method arguments to derive the lock key
 * (parsed via AOPUtils.parseSpEl in the aspect; falls back to 1L).
 */
String key();
/**
 * Concurrency level of the underlying Guava lock cache for this type.
 */
int concurrencyLevel();
}
package tech.powerjob.server.core.lock;
import com.alibaba.fastjson.JSON;
import com.google.common.cache.Cache;
import com.google.common.cache.CacheBuilder;
import com.google.common.collect.Maps;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.aspectj.lang.ProceedingJoinPoint;
import org.aspectj.lang.annotation.Around;
import org.aspectj.lang.annotation.Aspect;
import org.springframework.core.annotation.Order;
import org.springframework.stereotype.Component;
import tech.powerjob.server.common.utils.AOPUtils;
import tech.powerjob.server.monitor.MonitorService;
import tech.powerjob.server.monitor.events.lock.SlowLockEvent;
import javax.annotation.Resource;
import java.lang.reflect.Method;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReentrantLock;
/**
 * aspect for @UseCacheLock
 *
 * Serializes concurrent executions of annotated methods: one Guava lock cache
 * per {@code type()}, one ReentrantLock per evaluated {@code key()}. Slow lock
 * acquisitions are reported to the monitor service.
 *
 * @author tjq
 * @since 1/16/21
 */
@Slf4j
@Aspect
@Component
@Order(1)
@RequiredArgsConstructor
public class UseCacheLockAspect {

    private final MonitorService monitorService;

    /** one lock cache per UseCacheLock#type() */
    private final Map<String, Cache<String, ReentrantLock>> lockContainer = Maps.newConcurrentMap();

    /** lock waits longer than this many milliseconds are reported as slow-lock events */
    private static final long SLOW_THRESHOLD = 100;

    /**
     * Wraps the annotated method: acquires the per-key lock before proceeding and
     * always releases it afterwards.
     *
     * FIX: the pointcut expression previously contained a stray trailing ')'
     * ("@annotation(useCacheLock))"), which is not valid AspectJ pointcut syntax.
     *
     * @param point        the intercepted join point
     * @param useCacheLock the annotation instance bound by the pointcut
     * @return the annotated method's result
     * @throws Throwable whatever the annotated method throws (or InterruptedException from the lock wait)
     */
    @Around(value = "@annotation(useCacheLock)")
    public Object execute(ProceedingJoinPoint point, UseCacheLock useCacheLock) throws Throwable {
        Cache<String, ReentrantLock> lockCache = lockContainer.computeIfAbsent(useCacheLock.type(), ignore -> {
            int concurrencyLevel = useCacheLock.concurrencyLevel();
            log.info("[UseSegmentLockAspect] create Lock Cache for [{}] with concurrencyLevel: {}", useCacheLock.type(), concurrencyLevel);
            return CacheBuilder.newBuilder()
                    .initialCapacity(300000)
                    .maximumSize(500000)
                    .concurrencyLevel(concurrencyLevel)
                    .expireAfterWrite(30, TimeUnit.MINUTES)
                    .build();
        });
        final Method method = AOPUtils.parseMethod(point);
        // SpEL key evaluated against the method arguments; falls back to 1L
        Long key = AOPUtils.parseSpEl(method, point.getArgs(), useCacheLock.key(), Long.class, 1L);
        final ReentrantLock reentrantLock = lockCache.get(String.valueOf(key), ReentrantLock::new);
        long start = System.currentTimeMillis();
        reentrantLock.lockInterruptibly();
        try {
            long timeCost = System.currentTimeMillis() - start;
            if (timeCost > SLOW_THRESHOLD) {
                final SlowLockEvent slowLockEvent = new SlowLockEvent()
                        .setType(SlowLockEvent.Type.LOCAL)
                        .setLockType(useCacheLock.type())
                        .setLockKey(String.valueOf(key))
                        .setCallerService(method.getDeclaringClass().getSimpleName())
                        .setCallerMethod(method.getName())
                        .setCost(timeCost);
                monitorService.monitor(slowLockEvent);
                log.warn("[UseSegmentLockAspect] wait lock for method({}#{}) cost {} ms! key = '{}', args = {}, ", method.getDeclaringClass().getSimpleName(), method.getName(), timeCost,
                        key,
                        JSON.toJSONString(point.getArgs()));
            }
            return point.proceed();
        } finally {
            reentrantLock.unlock();
        }
    }
}
package tech.powerjob.server.core.scheduler;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Stopwatch;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.time.DateUtils;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.scheduling.annotation.Async;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Service;
import tech.powerjob.common.enums.InstanceStatus;
import tech.powerjob.common.enums.WorkflowInstanceStatus;
import tech.powerjob.server.common.constants.PJThreadPool;
import tech.powerjob.server.common.utils.OmsFileUtils;
import tech.powerjob.server.extension.LockService;
import tech.powerjob.server.persistence.mongodb.GridFsManager;
import tech.powerjob.server.persistence.remote.repository.InstanceInfoRepository;
import tech.powerjob.server.persistence.remote.repository.WorkflowInstanceInfoRepository;
import tech.powerjob.server.persistence.remote.repository.WorkflowNodeInfoRepository;
import tech.powerjob.server.remote.worker.WorkerClusterManagerService;
import java.io.File;
import java.util.Date;
/**
 * CCO(Chief Clean Officer)
 *
 * Periodic housekeeping: frees local caches and disk space, and (on exactly
 * one server, guarded by a distributed lock) deletes historical database
 * records and expired GridFS files.
 *
 * @author tjq
 * @since 2020/5/18
 */
@Slf4j
@Service
public class CleanService {
private final GridFsManager gridFsManager;
private final InstanceInfoRepository instanceInfoRepository;
private final WorkflowInstanceInfoRepository workflowInstanceInfoRepository;
private final WorkflowNodeInfoRepository workflowNodeInfoRepository;
private final LockService lockService;
// retention, in days, for instance records / local container jars / remote container files
private final int instanceInfoRetentionDay;
private final int localContainerRetentionDay;
private final int remoteContainerRetentionDay;
private static final int TEMPORARY_RETENTION_DAY = 3;
/**
 * Scheduled clean-up runs daily at 3 AM.
 */
private static final String CLEAN_TIME_EXPRESSION = "0 0 3 * * ?";
private static final String HISTORY_DELETE_LOCK = "history_delete_lock";
public CleanService(GridFsManager gridFsManager, InstanceInfoRepository instanceInfoRepository, WorkflowInstanceInfoRepository workflowInstanceInfoRepository,
WorkflowNodeInfoRepository workflowNodeInfoRepository, LockService lockService,
@Value("${oms.instanceinfo.retention}") int instanceInfoRetentionDay,
@Value("${oms.container.retention.local}") int localContainerRetentionDay,
@Value("${oms.container.retention.remote}") int remoteContainerRetentionDay) {
this.gridFsManager = gridFsManager;
this.instanceInfoRepository = instanceInfoRepository;
this.workflowInstanceInfoRepository = workflowInstanceInfoRepository;
this.workflowNodeInfoRepository = workflowNodeInfoRepository;
this.lockService = lockService;
this.instanceInfoRetentionDay = instanceInfoRetentionDay;
this.localContainerRetentionDay = localContainerRetentionDay;
this.remoteContainerRetentionDay = remoteContainerRetentionDay;
}
/**
 * Entry point of the scheduled clean-up, executed asynchronously on the timing pool.
 */
@Async(PJThreadPool.TIMING_POOL)
@Scheduled(cron = CLEAN_TIME_EXPRESSION)
public void timingClean() {
// release local caches
WorkerClusterManagerService.cleanUp();
// free disk space
cleanLocal(OmsFileUtils.genLogDirPath(), instanceInfoRetentionDay);
cleanLocal(OmsFileUtils.genContainerJarPath(), localContainerRetentionDay);
cleanLocal(OmsFileUtils.genTemporaryPath(), TEMPORARY_RETENTION_DAY);
// delete historical database records
cleanByOneServer();
}
/**
 * Operations that must run on only one server are funneled through here.
 */
private void cleanByOneServer() {
// the first server to grab the lock does the work and the others return, so holding the lock for 10 minutes should be enough
boolean lock = lockService.tryLock(HISTORY_DELETE_LOCK, 10 * 60 * 1000L);
if (!lock) {
log.info("[CleanService] clean job is already running, just return.");
return;
}
try {
// delete database run records
cleanInstanceLog();
cleanWorkflowInstanceLog();
// delete orphaned nodes
cleanWorkflowNodeInfo();
// delete expired GridFS files
cleanRemote(GridFsManager.LOG_BUCKET, instanceInfoRetentionDay);
cleanRemote(GridFsManager.CONTAINER_BUCKET, remoteContainerRetentionDay);
} finally {
lockService.unlock(HISTORY_DELETE_LOCK);
}
}
/**
 * Deletes files in a local directory whose last-modified time is older than {@code day} days.
 * NOTE(review): the guard is {@code day < 0} but the message says "<= 0" — with day == 0
 * every file is deleted immediately; confirm which behavior is intended.
 */
@VisibleForTesting
public void cleanLocal(String path, int day) {
if (day < 0) {
log.info("[CleanService] won't clean up {} because of offset day <= 0.", path);
return;
}
Stopwatch stopwatch = Stopwatch.createStarted();
File dir = new File(path);
if (!dir.exists()) {
return;
}
File[] logFiles = dir.listFiles();
if (logFiles == null || logFiles.length == 0) {
return;
}
// maximum allowed age in milliseconds
long maxOffset = day * 24 * 60 * 60 * 1000L;
for (File f : logFiles) {
long offset = System.currentTimeMillis() - f.lastModified();
if (offset >= maxOffset) {
if (!f.delete()) {
log.warn("[CleanService] delete file({}) failed.", f.getName());
}else {
log.info("[CleanService] delete file({}) successfully.", f.getName());
}
}
}
log.info("[CleanService] clean {} successfully, using {}.", path, stopwatch.stop());
}
/**
 * Deletes files older than {@code day} days from a GridFS bucket, if GridFS is available.
 */
@VisibleForTesting
public void cleanRemote(String bucketName, int day) {
if (day < 0) {
log.info("[CleanService] won't clean up bucket({}) because of offset day <= 0.", bucketName);
return;
}
if (gridFsManager.available()) {
Stopwatch stopwatch = Stopwatch.createStarted();
try {
gridFsManager.deleteBefore(bucketName, day);
}catch (Exception e) {
log.warn("[CleanService] clean remote bucket({}) failed.", bucketName, e);
}
log.info("[CleanService] clean remote bucket({}) successfully, using {}.", bucketName, stopwatch.stop());
}
}
/**
 * Deletes finished instance records modified earlier than the retention window.
 */
@VisibleForTesting
public void cleanInstanceLog() {
if (instanceInfoRetentionDay < 0) {
return;
}
try {
Date t = DateUtils.addDays(new Date(), -instanceInfoRetentionDay);
int num = instanceInfoRepository.deleteAllByGmtModifiedBeforeAndStatusIn(t, InstanceStatus.FINISHED_STATUS);
log.info("[CleanService] deleted {} instanceInfo records whose modify time before {}.", num, t);
}catch (Exception e) {
log.warn("[CleanService] clean instanceInfo failed.", e);
}
}
/**
 * Deletes finished workflow-instance records modified earlier than the retention window.
 */
@VisibleForTesting
public void cleanWorkflowInstanceLog() {
if (instanceInfoRetentionDay < 0) {
return;
}
try {
Date t = DateUtils.addDays(new Date(), -instanceInfoRetentionDay);
int num = workflowInstanceInfoRepository.deleteAllByGmtModifiedBeforeAndStatusIn(t, WorkflowInstanceStatus.FINISHED_STATUS);
log.info("[CleanService] deleted {} workflow instanceInfo records whose modify time before {}.", num, t);
}catch (Exception e) {
log.warn("[CleanService] clean workflow instanceInfo failed.", e);
}
}
/**
 * Deletes node records created more than a day ago that never got attached to a workflow.
 */
@VisibleForTesting
public void cleanWorkflowNodeInfo(){
try {
// clean node info created over a day ago that has no workflow ID
Date t = DateUtils.addDays(new Date(), -1);
int num = workflowNodeInfoRepository.deleteAllByWorkflowIdIsNullAndGmtCreateBefore(t);
log.info("[CleanService] deleted {} node records whose create time before {} and workflowId is null.", num, t);
} catch (Exception e) {
log.warn("[CleanService] clean workflow node info failed.", e);
}
}
}
package tech.powerjob.server.core.scheduler;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.DisposableBean;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.stereotype.Service;
import java.util.ArrayList;
import java.util.List;
/**
 * Starts the core scheduling loops as dedicated long-running threads on
 * startup and interrupts them on shutdown.
 *
 * @author Echo009
 * @since 2022/10/12
 */
@Service
@Slf4j
@RequiredArgsConstructor
public class CoreScheduleTaskManager implements InitializingBean, DisposableBean {
private final PowerScheduleService powerScheduleService;
private final InstanceStatusCheckService instanceStatusCheckService;
// all loop threads, kept so destroy() can interrupt them
private final List<Thread> coreThreadContainer = new ArrayList<>();
@SuppressWarnings("AlibabaAvoidManuallyCreateThread")
@Override
public void afterPropertiesSet() {
// timed scheduling loops
coreThreadContainer.add(new Thread(new LoopRunnable("ScheduleCronJob", PowerScheduleService.SCHEDULE_RATE, powerScheduleService::scheduleCronJob), "Thread-ScheduleCronJob"));
coreThreadContainer.add(new Thread(new LoopRunnable("ScheduleCronWorkflow", PowerScheduleService.SCHEDULE_RATE, powerScheduleService::scheduleCronWorkflow), "Thread-ScheduleCronWorkflow"));
coreThreadContainer.add(new Thread(new LoopRunnable("ScheduleFrequentJob", PowerScheduleService.SCHEDULE_RATE, powerScheduleService::scheduleFrequentJob), "Thread-ScheduleFrequentJob"));
// data clean-up loop
coreThreadContainer.add(new Thread(new LoopRunnable("CleanWorkerData", PowerScheduleService.SCHEDULE_RATE, powerScheduleService::cleanData), "Thread-CleanWorkerData"));
// status-check loops
coreThreadContainer.add(new Thread(new LoopRunnable("CheckRunningInstance", InstanceStatusCheckService.CHECK_INTERVAL, instanceStatusCheckService::checkRunningInstance), "Thread-CheckRunningInstance"));
coreThreadContainer.add(new Thread(new LoopRunnable("CheckWaitingDispatchInstance", InstanceStatusCheckService.CHECK_INTERVAL, instanceStatusCheckService::checkWaitingDispatchInstance), "Thread-CheckWaitingDispatchInstance"));
coreThreadContainer.add(new Thread(new LoopRunnable("CheckWaitingWorkerReceiveInstance", InstanceStatusCheckService.CHECK_INTERVAL, instanceStatusCheckService::checkWaitingWorkerReceiveInstance), "Thread-CheckWaitingWorkerReceiveInstance"));
coreThreadContainer.add(new Thread(new LoopRunnable("CheckWorkflowInstance", InstanceStatusCheckService.CHECK_INTERVAL, instanceStatusCheckService::checkWorkflowInstance), "Thread-CheckWorkflowInstance"));
coreThreadContainer.forEach(Thread::start);
}
@Override
public void destroy() {
// interrupting ends each LoopRunnable via its InterruptedException branch
coreThreadContainer.forEach(Thread::interrupt);
}
/**
 * Runs a task repeatedly with a fixed sleep between iterations; exits only on interrupt.
 */
@RequiredArgsConstructor
private static class LoopRunnable implements Runnable {
private final String taskName;
private final Long runningInterval;
private final Runnable innerRunnable;
@SuppressWarnings("BusyWait")
@Override
public void run() {
log.info("start task : {}.", taskName)
;
while (true) {
try {
innerRunnable.run();
Thread.sleep(runningInterval);
} catch (InterruptedException e) {
log.warn("[{}] task has been interrupted!", taskName, e);
break;
} catch (Exception e) {
// non-interrupt failures are logged and the loop keeps going
log.error("[{}] task failed!", taskName, e);
}
}
}
}
}
package tech.powerjob.server.core.scheduler;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.time.DateFormatUtils;
import org.springframework.stereotype.Service;
import tech.powerjob.common.OmsConstant;
import tech.powerjob.common.enums.TimeExpressionType;
import tech.powerjob.common.exception.PowerJobException;
import tech.powerjob.server.core.scheduler.auxiliary.TimingStrategyHandler;
import java.util.*;
import java.util.stream.Collectors;
/**
* @author Echo009
* @since 2022/3/21
*/
@Slf4j
@Service
public class TimingStrategyService {
private static final int NEXT_N_TIMES = 5;
private static final List<String> TIPS = Collections.singletonList("It is valid, but has not trigger time list!");
private final Map<TimeExpressionType, TimingStrategyHandler> strategyContainer;
/**
 * Indexes the supplied handlers (presumably injected by Spring — one per
 * supported expression type) into an EnumMap keyed by supportType().
 */
public TimingStrategyService(List<TimingStrategyHandler> timingStrategyHandlers) {
// init: index handlers by the expression type they declare
strategyContainer = new EnumMap<>(TimeExpressionType.class);
for (TimingStrategyHandler timingStrategyHandler : timingStrategyHandlers) {
strategyContainer.put(timingStrategyHandler.supportType(), timingStrategyHandler);
}
}
/**
 * Computes the next few (up to NEXT_N_TIMES) trigger times of an expression,
 * starting from "now".
 *
 * @param timeExpressionType type of the time expression
 * @param timeExpression     the expression itself
 * @param startTime          start time (inclusive)
 * @param endTime            end time (inclusive)
 * @return formatted trigger times, or the fixed TIPS list when the expression
 *         is valid but yields no further fire times
 */
public List<String> calculateNextTriggerTimes(TimeExpressionType timeExpressionType, String timeExpression, Long startTime, Long endTime) {
    final TimingStrategyHandler handler = getHandler(timeExpressionType);
    final List<Long> upcoming = new ArrayList<>(NEXT_N_TIMES);
    Long cursor = System.currentTimeMillis();
    while (upcoming.size() < NEXT_N_TIMES) {
        cursor = handler.calculateNextTriggerTime(cursor, timeExpression, startTime, endTime);
        if (cursor == null) {
            // no further fire times
            break;
        }
        upcoming.add(cursor);
    }
    if (upcoming.isEmpty()) {
        return TIPS;
    }
    return upcoming.stream().map(t -> DateFormatUtils.format(t, OmsConstant.TIME_PATTERN)).collect(Collectors.toList());
}
/**
* 计算下次的调度时间
*
* @param preTriggerTime 上次触发时间(nullable)
* @param timeExpressionType 定时表达式类型
* @param timeExpression 表达式
* @param startTime 起始时间(include)
* @param endTime 结束时间(include)
* @return 下次的调度时间
*/
public Long calculateNextTriggerTime(Long preTriggerTime, TimeExpressionType timeExpressionType, String timeExpression, Long startTime, Long endTime) {
if (preTriggerTime == null || preTriggerTime < System.currentTimeMillis()) {
preTriggerTime = System.currentTimeMillis();
}
return getHandler(timeExpressionType).calculateNextTriggerTime(preTriggerTime, timeExpression, startTime, endTime);
}
/**
* 计算下次的调度时间并检查校验规则
*
* @param timeExpressionType 定时表达式类型
* @param timeExpression 表达式
* @param startTime 起始时间(include)
* @param endTime 结束时间(include)
* @return 下次的调度时间
*/
public Long calculateNextTriggerTimeWithInspection( TimeExpressionType timeExpressionType, String timeExpression, Long startTime, Long endTime) {
Long nextTriggerTime = calculateNextTriggerTime(null, timeExpressionType, timeExpression, startTime, endTime);
if (TimeExpressionType.INSPECT_TYPES.contains(timeExpressionType.getV()) && nextTriggerTime == null) {
throw new PowerJobException("time expression is out of date: " + timeExpression);
}
return nextTriggerTime;
}
public void validate(TimeExpressionType timeExpressionType, String timeExpression, Long startTime, Long endTime) {
if (endTime != null) {
if (endTime <= System.currentTimeMillis()) {
throw new PowerJobException("lifecycle is out of date!");
}
if (startTime != null && startTime > endTime) {
throw new PowerJobException("lifecycle is invalid! start time must earlier then end time.");
}
}
getHandler(timeExpressionType).validate(timeExpression);
}
private TimingStrategyHandler getHandler(TimeExpressionType timeExpressionType) {
TimingStrategyHandler timingStrategyHandler = strategyContainer.get(timeExpressionType);
if (timingStrategyHandler == null) {
throw new PowerJobException("No matching TimingStrategyHandler for this TimeExpressionType:" + timeExpressionType);
}
return timingStrategyHandler;
}
}
package tech.powerjob.server.core.scheduler.auxiliary;
/**
 * Convenience base class for {@link TimingStrategyHandler} implementations:
 * validation accepts everything and no next trigger time is ever produced,
 * so subclasses only override what they actually need.
 *
 * @author Echo009
 * @since 2022/3/22
 */
public abstract class AbstractTimingStrategyHandler implements TimingStrategyHandler {

    @Override
    public Long calculateNextTriggerTime(Long preTriggerTime, String timeExpression, Long startTime, Long endTime) {
        // no standalone schedule by default
        return null;
    }

    @Override
    public void validate(String timeExpression) {
        // accept any expression by default
    }
}
package tech.powerjob.server.core.scheduler.auxiliary;
import tech.powerjob.common.enums.TimeExpressionType;
/**
 * Strategy for one {@code TimeExpressionType}: validates expressions of that
 * type and computes trigger times for it.
 *
 * @author Echo009
 * @since 2022/2/24
 */
public interface TimingStrategyHandler {
    /**
     * Validate the time expression; implementations throw on invalid input.
     *
     * @param timeExpression the time expression
     */
    void validate(String timeExpression);
    /**
     * Calculate the next trigger time.
     *
     * @param preTriggerTime previous trigger time (not null)
     * @param timeExpression the time expression
     * @param startTime      start time (inclusive), nullable
     * @param endTime        end time (inclusive), nullable
     * @return next trigger time, or null when there is no further trigger
     */
    Long calculateNextTriggerTime(Long preTriggerTime, String timeExpression, Long startTime, Long endTime);
    /**
     * The timing expression type this handler supports.
     *
     * @return TimeExpressionType
     */
    TimeExpressionType supportType();
}
package tech.powerjob.server.core.scheduler.auxiliary.impl;
import org.springframework.stereotype.Component;
import tech.powerjob.common.enums.TimeExpressionType;
import tech.powerjob.server.core.scheduler.auxiliary.AbstractTimingStrategyHandler;
/**
 * Handler for API-triggered jobs: the no-op defaults inherited from
 * {@link AbstractTimingStrategyHandler} (accept-all validation, no next
 * trigger time) are exactly what is needed, so only the type is declared.
 *
 * @author Echo009
 * @since 2022/3/22
 */
@Component
public class ApiTimingStrategyHandler extends AbstractTimingStrategyHandler {
    @Override
    public TimeExpressionType supportType() {
        return TimeExpressionType.API;
    }
}
package tech.powerjob.server.core.scheduler.auxiliary.impl;
import com.cronutils.model.Cron;
import com.cronutils.model.definition.CronDefinition;
import com.cronutils.model.definition.CronDefinitionBuilder;
import com.cronutils.model.time.ExecutionTime;
import com.cronutils.parser.CronParser;
import org.springframework.stereotype.Component;
import tech.powerjob.common.enums.TimeExpressionType;
import tech.powerjob.server.core.scheduler.auxiliary.TimingStrategyHandler;
import java.time.Instant;
import java.time.ZoneId;
import java.time.ZonedDateTime;
import java.util.Optional;
/**
 * Timing strategy for CRON expressions, backed by the cron-utils library.
 *
 * @author Echo009
 * @since 2022/2/24
 */
@Component
public class CronTimingStrategyHandler implements TimingStrategyHandler {
    // Built once in the constructor and reused for every parse.
    private final CronParser cronParser;
    /**
     * @see CronDefinitionBuilder#instanceDefinitionFor
     * <p>
     * Enhanced quartz cron,Support for specifying both a day-of-week and a day-of-month parameter.
     * https://github.com/PowerJob/PowerJob/issues/382
     */
    public CronTimingStrategyHandler() {
        CronDefinition cronDefinition = CronDefinitionBuilder.defineCron()
                .withSeconds().withValidRange(0, 59).and()
                .withMinutes().withValidRange(0, 59).and()
                .withHours().withValidRange(0, 23).and()
                .withDayOfMonth().withValidRange(1, 31).supportsL().supportsW().supportsLW().supportsQuestionMark().and()
                .withMonth().withValidRange(1, 12).and()
                .withDayOfWeek().withValidRange(1, 7).withMondayDoWValue(2).supportsHash().supportsL().supportsQuestionMark().and()
                .withYear().withValidRange(1970, 2099).withStrictRange().optional().and()
                .instance();
        this.cronParser = new CronParser(cronDefinition);
    }
    /** Validation delegates to the parser; an invalid expression makes it throw. */
    @Override
    public void validate(String timeExpression) {
        cronParser.parse(timeExpression);
    }
    /**
     * Next cron fire time after {@code preTriggerTime}, fast-forwarded to
     * {@code startTime} when that is in the future, and null once past
     * {@code endTime}. Results have second granularity: epoch seconds are
     * multiplied by 1000, so milliseconds are always zero.
     */
    @Override
    public Long calculateNextTriggerTime(Long preTriggerTime, String timeExpression, Long startTime, Long endTime) {
        Cron cron = cronParser.parse(timeExpression);
        ExecutionTime executionTime = ExecutionTime.forCron(cron);
        if (startTime != null && startTime > System.currentTimeMillis() && preTriggerTime < startTime) {
            // Compute the latest real cron fire time at or before startTime, so the
            // next-execution query below lands on the first fire time >= startTime.
            Optional<ZonedDateTime> zonedDateTime = executionTime.lastExecution(ZonedDateTime.ofInstant(Instant.ofEpochMilli(startTime), ZoneId.systemDefault()));
            preTriggerTime = zonedDateTime.map(dateTime -> dateTime.toEpochSecond() * 1000).orElse(startTime);
        }
        Instant instant = Instant.ofEpochMilli(preTriggerTime);
        ZonedDateTime preZonedDateTime = ZonedDateTime.ofInstant(instant, ZoneId.systemDefault());
        Optional<ZonedDateTime> opt = executionTime.nextExecution(preZonedDateTime);
        if (opt.isPresent()) {
            long nextTriggerTime = opt.get().toEpochSecond() * 1000;
            if (endTime != null && endTime < nextTriggerTime) {
                // lifecycle over: no further trigger
                return null;
            }
            return nextTriggerTime;
        }
        return null;
    }
    @Override
    public TimeExpressionType supportType() {
        return TimeExpressionType.CRON;
    }
}
package tech.powerjob.server.core.scheduler.auxiliary.impl;
import org.springframework.stereotype.Component;
import tech.powerjob.common.PowerJobDKey;
import tech.powerjob.common.enums.TimeExpressionType;
import tech.powerjob.common.exception.PowerJobException;
import tech.powerjob.server.core.scheduler.auxiliary.AbstractTimingStrategyHandler;
/**
 * Timing strategy for FIX_DELAY jobs: the expression is a delay in
 * milliseconds. Only validation is overridden; next-trigger calculation keeps
 * the inherited no-op default.
 *
 * @author Echo009
 * @since 2022/3/22
 */
@Component
public class FixedDelayTimingStrategyHandler extends AbstractTimingStrategyHandler {

    @Override
    public void validate(String timeExpression) {
        final long delayMs;
        try {
            delayMs = Long.parseLong(timeExpression);
        } catch (Exception e) {
            throw new PowerJobException("invalid timeExpression!");
        }
        // Cap defaults to 120s; longer intervals should use another expression
        // type so the frequency job consumes fewer resources.
        final int maxInterval = Integer.parseInt(System.getProperty(PowerJobDKey.FREQUENCY_JOB_MAX_INTERVAL, "120000"));
        if (delayMs > maxInterval) {
            throw new PowerJobException("the delay must be less than " + maxInterval + "ms");
        }
        if (delayMs <= 0) {
            throw new PowerJobException("the delay must be greater than 0 ms");
        }
    }

    @Override
    public TimeExpressionType supportType() {
        return TimeExpressionType.FIXED_DELAY;
    }
}
package tech.powerjob.server.core.scheduler.auxiliary.impl;
import org.springframework.stereotype.Component;
import tech.powerjob.common.PowerJobDKey;
import tech.powerjob.common.enums.TimeExpressionType;
import tech.powerjob.common.exception.PowerJobException;
import tech.powerjob.server.core.scheduler.auxiliary.AbstractTimingStrategyHandler;
/**
 * Timing strategy for FIXED_RATE jobs: the expression is an interval in
 * milliseconds between consecutive trigger times.
 *
 * @author Echo009
 * @since 2022/3/22
 */
@Component
public class FixedRateTimingStrategyHandler extends AbstractTimingStrategyHandler {
    /**
     * The expression must parse as a positive number of milliseconds no larger
     * than the configured maximum frequency-job interval.
     */
    @Override
    public void validate(String timeExpression) {
        long delay;
        try {
            delay = Long.parseLong(timeExpression);
        } catch (Exception e) {
            throw new PowerJobException("invalid timeExpression!");
        }
        // Cap defaults to 120s; longer intervals should use another expression type to save resources.
        int maxInterval = Integer.parseInt(System.getProperty(PowerJobDKey.FREQUENCY_JOB_MAX_INTERVAL, "120000"));
        if (delay > maxInterval) {
            throw new PowerJobException("the rate must be less than " + maxInterval + "ms");
        }
        if (delay <= 0) {
            throw new PowerJobException("the rate must be greater than 0 ms");
        }
    }
    /**
     * Next trigger = previous trigger + rate, fast-forwarded to startTime when
     * that lies in the future; null once past the end of the lifecycle.
     */
    @Override
    public Long calculateNextTriggerTime(Long preTriggerTime, String timeExpression, Long startTime, Long endTime) {
        long r = startTime != null && startTime > preTriggerTime
                ? startTime : preTriggerTime + Long.parseLong(timeExpression);
        return endTime != null && endTime < r ? null : r;
    }
    @Override
    public TimeExpressionType supportType() {
        return TimeExpressionType.FIXED_RATE;
    }
}
package tech.powerjob.server.core.scheduler.auxiliary.impl;
import org.springframework.stereotype.Component;
import tech.powerjob.common.enums.TimeExpressionType;
import tech.powerjob.server.core.scheduler.auxiliary.AbstractTimingStrategyHandler;
/**
 * Handler for WORKFLOW-typed jobs: nothing to validate and no standalone
 * schedule, so the no-op defaults from {@link AbstractTimingStrategyHandler}
 * apply and only the type is declared.
 *
 * @author Echo009
 * @since 2022/3/22
 */
@Component
public class WorkflowTimingStrategyHandler extends AbstractTimingStrategyHandler {
    @Override
    public TimeExpressionType supportType() {
        return TimeExpressionType.WORKFLOW;
    }
}
package tech.powerjob.server.core.service;
import lombok.RequiredArgsConstructor;
import tech.powerjob.common.exception.PowerJobException;
import tech.powerjob.server.persistence.remote.model.AppInfoDO;
import tech.powerjob.server.persistence.remote.repository.AppInfoRepository;
import org.springframework.stereotype.Service;
import javax.annotation.Resource;
import java.util.Objects;
/**
 * Application info service.
 *
 * @author tjq
 * @since 2020/6/20
 */
@Service
@RequiredArgsConstructor
public class AppInfoService {
    private final AppInfoRepository appInfoRepository;
    /**
     * Verify access permission to an application.
     *
     * @param appName  application name
     * @param password password
     * @return the application's ID when the password matches
     */
    public Long assertApp(String appName, String password) {
        AppInfoDO appInfo = appInfoRepository.findByAppName(appName).orElseThrow(() -> new PowerJobException("can't find appInfo by appName: " + appName));
        // NOTE(review): the stored password is compared verbatim with Objects.equals —
        // looks like a plaintext, non-constant-time comparison; consider hashing plus
        // a constant-time compare (e.g. MessageDigest.isEqual). TODO confirm.
        if (Objects.equals(appInfo.getPassword(), password)) {
            return appInfo.getId();
        }
        throw new PowerJobException("password error!");
    }
}
package tech.powerjob.server.core.service;
import com.google.common.cache.Cache;
import com.google.common.cache.CacheBuilder;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Service;
import tech.powerjob.server.persistence.remote.model.InstanceInfoDO;
import tech.powerjob.server.persistence.remote.model.JobInfoDO;
import tech.powerjob.server.persistence.remote.model.WorkflowInfoDO;
import tech.powerjob.server.persistence.remote.repository.InstanceInfoRepository;
import tech.powerjob.server.persistence.remote.repository.JobInfoRepository;
import tech.powerjob.server.persistence.remote.repository.WorkflowInfoRepository;
import java.time.Duration;
import java.util.Optional;
/**
 * Locally cached lookups for frequently used data (job/workflow names, app IDs).
 * The caches deliberately trade consistency for speed: renamed jobs/workflows
 * may be served stale for up to the 1-minute expiry below.
 *
 * @author tjq
 * @since 2020/4/14
 */
@Slf4j
@Service
public class CacheService {
    private final JobInfoRepository jobInfoRepository;
    private final WorkflowInfoRepository workflowInfoRepository;
    private final InstanceInfoRepository instanceInfoRepository;
    // name caches expire after 1 minute (names can change); ID caches never expire
    // (the mappings are immutable) but are bounded and soft-valued so the GC can
    // reclaim them under memory pressure.
    private final Cache<Long, String> jobId2JobNameCache;
    private final Cache<Long, String> workflowId2WorkflowNameCache;
    private final Cache<Long, Long> instanceId2AppId;
    private final Cache<Long, Long> jobId2AppId;
    public CacheService(JobInfoRepository jobInfoRepository, WorkflowInfoRepository workflowInfoRepository, InstanceInfoRepository instanceInfoRepository) {
        this.jobInfoRepository = jobInfoRepository;
        this.workflowInfoRepository = workflowInfoRepository;
        this.instanceInfoRepository = instanceInfoRepository;
        jobId2JobNameCache = CacheBuilder.newBuilder()
                .expireAfterWrite(Duration.ofMinutes(1))
                .maximumSize(512)
                .softValues()
                .build();
        workflowId2WorkflowNameCache = CacheBuilder.newBuilder()
                .expireAfterWrite(Duration.ofMinutes(1))
                .maximumSize(512)
                .softValues()
                .build();
        instanceId2AppId = CacheBuilder.newBuilder()
                .maximumSize(1024)
                .softValues()
                .build();
        jobId2AppId = CacheBuilder.newBuilder()
                .maximumSize(1024)
                .softValues()
                .build();
    }
    /**
     * Resolve a job name by jobId. Consistency is NOT guaranteed: any rename is
     * invisible until the cache entry expires.
     *
     * @param jobId job ID
     * @return job name; "" is cached for unknown IDs (cache-penetration guard),
     *         null on lookup failure
     */
    public String getJobName(Long jobId) {
        try {
            return jobId2JobNameCache.get(jobId, () -> {
                Optional<JobInfoDO> jobInfoDOOptional = jobInfoRepository.findById(jobId);
                // Guard against cache penetration by caching "" for missing jobs.
                // (A job created later would keep serving "" until expiry — accepted.)
                return jobInfoDOOptional.map(JobInfoDO::getJobName).orElse("");
            });
        }catch (Exception e) {
            log.error("[CacheService] getJobName for {} failed.", jobId, e);
        }
        return null;
    }
    /**
     * Resolve a workflow name by workflowId.
     *
     * @param workflowId workflow ID
     * @return workflow name; "" is cached for unknown IDs, null on lookup failure
     */
    public String getWorkflowName(Long workflowId) {
        try {
            return workflowId2WorkflowNameCache.get(workflowId, () -> {
                Optional<WorkflowInfoDO> jobInfoDOOptional = workflowInfoRepository.findById(workflowId);
                // Same cache-penetration guard as getJobName.
                return jobInfoDOOptional.map(WorkflowInfoDO::getWfName).orElse("");
            });
        }catch (Exception e) {
            log.error("[CacheService] getWorkflowName for {} failed.", workflowId, e);
        }
        return null;
    }
    // Resolve the owning appId of an instance; null when unknown or on failure.
    public Long getAppIdByInstanceId(Long instanceId) {
        try {
            return instanceId2AppId.get(instanceId, () -> {
                // DB failures are logged here; returning null makes the outer
                // cache.get throw, which the outer catch swallows.
                try {
                    InstanceInfoDO instanceLog = instanceInfoRepository.findByInstanceId(instanceId);
                    if (instanceLog != null) {
                        return instanceLog.getAppId();
                    }
                }catch (Exception e) {
                    log.error("[CacheService] getAppId for instanceId:{} failed.", instanceId, e);
                }
                return null;
            });
        }catch (Exception ignore) {
            // Cache load failures (including the null-return case above) are ignored.
        }
        return null;
    }
    // Resolve the owning appId of a job; null when unknown or on failure.
    public Long getAppIdByJobId(Long jobId) {
        try {
            return jobId2AppId.get(jobId, () -> {
                try {
                    Optional<JobInfoDO> jobInfoDOOptional = jobInfoRepository.findById(jobId);
                    return jobInfoDOOptional.map(JobInfoDO::getAppId).orElse(null);
                }catch (Exception e) {
                    log.error("[CacheService] getAppId for job:{} failed.", jobId, e);
                }
                return null;
            });
        } catch (Exception ignore) {
            // Cache load failures are ignored; caller receives null.
        }
        return null;
    }
}
package tech.powerjob.server.core.service;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Service;
import tech.powerjob.common.enums.WorkflowNodeType;
import tech.powerjob.server.core.validator.NodeValidator;
import tech.powerjob.server.core.workflow.algorithm.WorkflowDAG;
import tech.powerjob.server.persistence.remote.model.WorkflowNodeInfoDO;
import java.util.EnumMap;
import java.util.List;
import java.util.Map;
/**
 * Dispatches workflow-node validation to the {@link NodeValidator} registered
 * for the node's type; node types without a validator are accepted as-is.
 *
 * @author Echo009
 * @since 2021/12/14
 */
@Service
@Slf4j
public class NodeValidateService {

    /** Dispatch table: node type -> its validator, filled once at construction. */
    private final Map<WorkflowNodeType, NodeValidator> nodeValidatorMap;

    public NodeValidateService(List<NodeValidator> nodeValidators) {
        nodeValidatorMap = new EnumMap<>(WorkflowNodeType.class);
        for (NodeValidator validator : nodeValidators) {
            nodeValidatorMap.put(validator.matchingType(), validator);
        }
    }

    /** Topology-aware validation of a node within its DAG. */
    public void complexValidate(WorkflowNodeInfoDO node, WorkflowDAG dag) {
        NodeValidator validator = getNodeValidator(node);
        if (validator != null) {
            validator.complexValidate(node, dag);
        }
        // no validator registered -> nothing to check
    }

    /** Standalone validation of a single node. */
    public void simpleValidate(WorkflowNodeInfoDO node) {
        NodeValidator validator = getNodeValidator(node);
        if (validator != null) {
            validator.simpleValidate(node);
        }
        // no validator registered -> nothing to check
    }

    private NodeValidator getNodeValidator(WorkflowNodeInfoDO node) {
        Integer nodeTypeCode = node.getType();
        // Backward compatibility: a missing type code means a plain job node.
        return nodeTypeCode == null
                ? nodeValidatorMap.get(WorkflowNodeType.JOB)
                : nodeValidatorMap.get(WorkflowNodeType.of(nodeTypeCode));
    }
}
package tech.powerjob.server.core.service;
import tech.powerjob.server.persistence.remote.model.UserInfoDO;
import tech.powerjob.server.persistence.remote.repository.UserInfoRepository;
import com.google.common.base.Splitter;
import com.google.common.collect.Lists;
import org.springframework.stereotype.Service;
import org.apache.commons.lang3.StringUtils;
import javax.annotation.Resource;
import java.util.Date;
import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;
/**
 * User service.
 *
 * @author tjq
 * @since 2020/6/12
 */
@Service
public class UserService {
    @Resource
    private UserInfoRepository userInfoRepository;
    /**
     * Save / update a user.
     * NOTE(review): gmtCreate is unconditionally set to "now", so updating an
     * existing user also resets its creation timestamp — confirm this is intended.
     *
     * @param userInfoDO user
     */
    public void save(UserInfoDO userInfoDO) {
        userInfoDO.setGmtCreate(new Date());
        userInfoDO.setGmtModified(userInfoDO.getGmtCreate());
        userInfoRepository.saveAndFlush(userInfoDO);
    }
    /**
     * Fetch detailed user info for a comma-separated list of user IDs.
     *
     * @param userIds comma-separated user IDs
     * @return matching users with the password field cleared; empty list for blank input
     */
    public List<UserInfoDO> fetchNotifyUserList(String userIds) {
        if (StringUtils.isEmpty(userIds)) {
            return Lists.newLinkedList();
        }
        // De-duplicate IDs before querying.
        Set<Long> userIdList = Splitter.on(",").splitToList(userIds).stream().map(Long::valueOf).collect(Collectors.toSet());
        List<UserInfoDO> res = userInfoRepository.findByIdIn(Lists.newLinkedList(userIdList));
        // Never expose the password field to callers.
        res.forEach(x -> x.setPassword(null));
        return res;
    }
}
package tech.powerjob.server.core.service;
import com.alibaba.fastjson.JSON;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Service;
import tech.powerjob.common.enums.WorkflowNodeType;
import tech.powerjob.common.model.PEWorkflowDAG;
import tech.powerjob.common.utils.CommonUtils;
import tech.powerjob.server.core.workflow.hanlder.ControlNodeHandler;
import tech.powerjob.server.core.workflow.hanlder.TaskNodeHandler;
import tech.powerjob.server.core.workflow.hanlder.WorkflowNodeHandlerMarker;
import tech.powerjob.server.persistence.remote.model.WorkflowInstanceInfoDO;
import tech.powerjob.server.persistence.remote.repository.WorkflowInstanceInfoRepository;
import java.util.EnumMap;
import java.util.List;
import java.util.Map;
/**
 * Dispatches workflow-node processing to the handler registered for each
 * node's type (task nodes vs. control nodes).
 *
 * @author Echo009
 * @since 2021/12/9
 */
@Slf4j
@Service
public class WorkflowNodeHandleService {
    /** Dispatch tables: node type -> handler, populated once at construction. */
    private final Map<WorkflowNodeType, ControlNodeHandler> controlNodeHandlerContainer;
    private final Map<WorkflowNodeType, TaskNodeHandler> taskNodeHandlerContainer;
    private final WorkflowInstanceInfoRepository workflowInstanceInfoRepository;
    public WorkflowNodeHandleService(List<ControlNodeHandler> controlNodeHandlerList, List<TaskNodeHandler> taskNodeHandlerList, WorkflowInstanceInfoRepository workflowInstanceInfoRepository) {
        // init
        controlNodeHandlerContainer = new EnumMap<>(WorkflowNodeType.class);
        taskNodeHandlerContainer = new EnumMap<>(WorkflowNodeType.class);
        controlNodeHandlerList.forEach(controlNodeHandler -> controlNodeHandlerContainer.put(controlNodeHandler.matchingType(), controlNodeHandler));
        taskNodeHandlerList.forEach(taskNodeHandler -> taskNodeHandlerContainer.put(taskNodeHandler.matchingType(), taskNodeHandler));
        //
        this.workflowInstanceInfoRepository = workflowInstanceInfoRepository;
    }
    /**
     * Process task nodes: create all instances, persist the updated DAG, then start them.
     * Callers must guarantee taskNodeList is non-empty.
     */
    public void handleTaskNodes(List<PEWorkflowDAG.Node> taskNodeList, PEWorkflowDAG dag, WorkflowInstanceInfoDO wfInstanceInfo) {
        // Phase 1: create task instances for every node.
        taskNodeList.forEach(taskNode -> {
            // All instances must be created before the DAG is persisted: a partial
            // failure here would leave the DAG un-updated and the already-created
            // instance nodes would not show up in the workflow log.
            TaskNodeHandler taskNodeHandler = (TaskNodeHandler) findMatchingHandler(taskNode);
            taskNodeHandler.createTaskInstance(taskNode, dag, wfInstanceInfo);
            log.debug("[Workflow-{}|{}] workflowInstance start to process new node(nodeId={},jobId={})", wfInstanceInfo.getWorkflowId(), wfInstanceInfo.getWfInstanceId(), taskNode.getNodeId(), taskNode.getJobId());
        });
        // Phase 2: persist the workflow instance (with the updated DAG) ...
        wfInstanceInfo.setDag(JSON.toJSONString(dag));
        workflowInstanceInfoRepository.saveAndFlush(wfInstanceInfo);
        // Phase 3: ... and only then start the instances.
        taskNodeList.forEach(taskNode -> {
            TaskNodeHandler taskNodeHandler = (TaskNodeHandler) findMatchingHandler(taskNode);
            taskNodeHandler.startTaskInstance(taskNode);
        });
    }
    /**
     * Process control nodes one by one, in list order.
     * Callers must guarantee controlNodeList is non-empty.
     */
    public void handleControlNodes(List<PEWorkflowDAG.Node> controlNodeList, PEWorkflowDAG dag, WorkflowInstanceInfoDO wfInstanceInfo) {
        for (PEWorkflowDAG.Node node : controlNodeList) {
            handleControlNode(node, dag, wfInstanceInfo);
        }
    }
    /** Process one control node, stamping its start/finish times around the handler call. */
    public void handleControlNode(PEWorkflowDAG.Node node, PEWorkflowDAG dag, WorkflowInstanceInfoDO wfInstanceInfo) {
        ControlNodeHandler controlNodeHandler = (ControlNodeHandler) findMatchingHandler(node);
        node.setStartTime(CommonUtils.formatTime(System.currentTimeMillis()));
        controlNodeHandler.handle(node, dag, wfInstanceInfo);
        node.setFinishedTime(CommonUtils.formatTime(System.currentTimeMillis()));
    }
    // Look up the handler for a node in the task- or control-node table by type.
    private WorkflowNodeHandlerMarker findMatchingHandler(PEWorkflowDAG.Node node) {
        WorkflowNodeType nodeType = WorkflowNodeType.of(node.getNodeType());
        WorkflowNodeHandlerMarker res;
        if (!nodeType.isControlNode()) {
            res = taskNodeHandlerContainer.get(nodeType);
        } else {
            res = controlNodeHandlerContainer.get(nodeType);
        }
        if (res == null) {
            // impossible: every node type is expected to have a registered handler
            throw new UnsupportedOperationException("unsupported node type : " + nodeType);
        }
        return res;
    }
}
package tech.powerjob.server.core.uid;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import tech.powerjob.server.remote.server.self.ServerInfoService;
/**
 * Unique-ID generation service built on the Twitter snowflake algorithm.
 * Data-center ID: fixed to 0 (2 bits); machine ID: taken from ServerInfoService.
 *
 * @author tjq
 * @since 2020/4/6
 */
@Slf4j
@Service
public class IdGenerateService {

    /** Data-center part of the snowflake layout is fixed to 0. */
    private static final int DATA_CENTER_ID = 0;

    private final SnowFlakeIdGenerator snowFlakeIdGenerator;

    public IdGenerateService(ServerInfoService serverInfoService) {
        final long machineId = serverInfoService.fetchServiceInfo().getId();
        this.snowFlakeIdGenerator = new SnowFlakeIdGenerator(DATA_CENTER_ID, machineId);
        log.info("[IdGenerateService] initialize IdGenerateService successfully, ID:{}", machineId);
    }

    /**
     * Allocate a distributed unique ID.
     *
     * @return distributed unique ID
     */
    public long allocate() {
        return snowFlakeIdGenerator.nextId();
    }
}
package tech.powerjob.server.core.uid;
/**
 * Twitter SnowFlake ID generator (Scala -> Java port).
 * Bit layout, low to high: 6-bit sequence | 14-bit machine | 2-bit data center | timestamp.
 *
 * @author tjq
 * @since 2020/4/6
 */
public class SnowFlakeIdGenerator {

    /** Custom epoch the timestamps are measured from (a special day for the author). */
    private static final long START_STAMP = 1555776000000L;

    /** Bits reserved for the per-millisecond sequence number. */
    private static final long SEQUENCE_BIT = 6;

    /** Bits reserved for the machine identifier. */
    private static final long MACHINE_BIT = 14;

    /** Bits reserved for the data-center identifier. */
    private static final long DATA_CENTER_BIT = 2;

    /** Maximum value of each part. */
    private static final long MAX_DATA_CENTER_NUM = ~(-1L << DATA_CENTER_BIT);
    private static final long MAX_MACHINE_NUM = ~(-1L << MACHINE_BIT);
    private static final long MAX_SEQUENCE = ~(-1L << SEQUENCE_BIT);

    /** Left shift of each part within the final id. */
    private static final long MACHINE_LEFT = SEQUENCE_BIT;
    private static final long DATA_CENTER_LEFT = SEQUENCE_BIT + MACHINE_BIT;
    private static final long TIMESTAMP_LEFT = DATA_CENTER_LEFT + DATA_CENTER_BIT;

    /** Data-center identifier, validated against MAX_DATA_CENTER_NUM. */
    private final long dataCenterId;

    /** Machine identifier, validated against MAX_MACHINE_NUM. */
    private final long machineId;

    /** Sequence number within the current millisecond. */
    private long sequence = 0L;

    /** Timestamp used by the previously generated id. */
    private long lastTimestamp = -1L;

    public SnowFlakeIdGenerator(long dataCenterId, long machineId) {
        if (dataCenterId > MAX_DATA_CENTER_NUM || dataCenterId < 0) {
            throw new IllegalArgumentException("dataCenterId can't be greater than MAX_DATA_CENTER_NUM or less than 0");
        }
        if (machineId > MAX_MACHINE_NUM || machineId < 0) {
            throw new IllegalArgumentException("machineId can't be greater than MAX_MACHINE_NUM or less than 0");
        }
        this.dataCenterId = dataCenterId;
        this.machineId = machineId;
    }

    /**
     * Generate the next id. Thread-safe via method-level synchronization.
     */
    public synchronized long nextId() {
        long now = getNewStamp();
        if (now < lastTimestamp) {
            // Clock moved backwards: borrow (logical) future time instead of failing.
            return futureId();
        }
        if (now == lastTimestamp) {
            // Same millisecond: bump the sequence.
            sequence = (sequence + 1) & MAX_SEQUENCE;
            if (sequence == 0L) {
                // Sequence exhausted for this millisecond; wait for the next one.
                now = getNextMill();
            }
        } else {
            // New millisecond: reset the sequence.
            sequence = 0L;
        }
        lastTimestamp = now;
        return (now - START_STAMP) << TIMESTAMP_LEFT   // timestamp part
                | dataCenterId << DATA_CENTER_LEFT     // data-center part
                | machineId << MACHINE_LEFT            // machine part
                | sequence;                            // sequence part
    }

    /**
     * When the wall clock moves backwards, keep generating ids against the
     * logical future time so scheduling and workflows stay usable.
     * Note: this does NOT protect against duplicate ids caused by a clock
     * rollback that happens while the server is stopped.
     */
    private long futureId() {
        sequence = (sequence + 1) & MAX_SEQUENCE;
        if (sequence == 0L) {
            lastTimestamp = lastTimestamp + 1;
        }
        return (lastTimestamp - START_STAMP) << TIMESTAMP_LEFT // timestamp part
                | dataCenterId << DATA_CENTER_LEFT             // data-center part
                | machineId << MACHINE_LEFT                    // machine part
                | sequence;                                    // sequence part
    }

    /** Busy-wait until the clock reaches the millisecond after lastTimestamp. */
    private long getNextMill() {
        long mill = getNewStamp();
        while (mill <= lastTimestamp) {
            mill = getNewStamp();
        }
        return mill;
    }

    private long getNewStamp() {
        return System.currentTimeMillis();
    }
}
package tech.powerjob.server.core.validator;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.springframework.stereotype.Component;
import tech.powerjob.common.enums.WorkflowNodeType;
import tech.powerjob.common.exception.PowerJobException;
import tech.powerjob.common.model.PEWorkflowDAG;
import tech.powerjob.server.core.workflow.algorithm.WorkflowDAG;
import tech.powerjob.server.persistence.remote.model.WorkflowNodeInfoDO;
import java.util.Collection;
/**
 * Validator for DECISION (branch) nodes.
 *
 * @author Echo009
 * @since 2021/12/14
 */
@Component
@Slf4j
public class DecisionNodeValidator implements NodeValidator {

    @Override
    public void complexValidate(WorkflowNodeInfoDO node, WorkflowDAG dag) {
        // A decision node must have exactly two outgoing edges ...
        WorkflowDAG.Node nodeWrapper = dag.getNode(node.getId());
        Collection<PEWorkflowDAG.Edge> edges = nodeWrapper.getSuccessorEdgeMap().values();
        if (edges.size() != 2) {
            throw new PowerJobException("DecisionNode‘s out-degree must be 2,node name : " + node.getNodeName());
        }
        // ... whose properties must cover both branches: one "true" and one "false".
        boolean containFalse = false;
        boolean containTrue = false;
        for (PEWorkflowDAG.Edge edge : edges) {
            if (!isValidBooleanStr(edge.getProperty())) {
                throw new PowerJobException("Illegal property of DecisionNode‘s out-degree edge,node name : " + node.getNodeName());
            }
            boolean b = Boolean.parseBoolean(edge.getProperty());
            if (b) {
                containTrue = true;
            } else {
                containFalse = true;
            }
        }
        if (!containFalse || !containTrue) {
            throw new PowerJobException("Illegal property of DecisionNode‘s out-degree edge,node name : " + node.getNodeName());
        }
    }

    @Override
    public void simpleValidate(WorkflowNodeInfoDO node) {
        // A decision node must carry a non-blank expression in its params.
        String nodeParams = node.getNodeParams();
        if (StringUtils.isBlank(nodeParams)) {
            throw new PowerJobException("DecisionNode‘s param must be not null,node name : " + node.getNodeName());
        }
    }

    /**
     * Whether the string is (case-insensitively, ignoring surrounding whitespace)
     * exactly "true" or "false".
     * FIX: null-safe — the original called {@code str.trim()} before any null check
     * and threw NPE for edges without a property; it also trimmed twice. A null
     * property is now reported as an invalid edge instead of crashing validation.
     */
    public static boolean isValidBooleanStr(String str) {
        if (str == null) {
            return false;
        }
        String trimmed = str.trim();
        return StringUtils.equalsIgnoreCase(trimmed, "true") || StringUtils.equalsIgnoreCase(trimmed, "false");
    }

    @Override
    public WorkflowNodeType matchingType() {
        return WorkflowNodeType.DECISION;
    }
}
package tech.powerjob.server.core.validator;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;
import tech.powerjob.common.enums.WorkflowNodeType;
import tech.powerjob.common.exception.PowerJobException;
import tech.powerjob.server.common.constants.SwitchableStatus;
import tech.powerjob.server.core.workflow.algorithm.WorkflowDAG;
import tech.powerjob.server.persistence.remote.model.JobInfoDO;
import tech.powerjob.server.persistence.remote.model.WorkflowNodeInfoDO;
import tech.powerjob.server.persistence.remote.repository.JobInfoRepository;
import javax.annotation.Resource;
/**
 * Validator for plain JOB nodes.
 *
 * @author Echo009
 * @since 2021/12/14
 */
@Component
@Slf4j
@RequiredArgsConstructor
public class JobNodeValidator implements NodeValidator {

    private final JobInfoRepository jobInfoRepository;

    @Override
    public void complexValidate(WorkflowNodeInfoDO node, WorkflowDAG dag) {
        // nothing topology-related to check for a plain job node
    }

    @Override
    public void simpleValidate(WorkflowNodeInfoDO node) {
        // The referenced job must exist ...
        JobInfoDO jobInfo = jobInfoRepository.findById(node.getJobId())
                .orElseThrow(() -> new PowerJobException("Illegal job node,specified job is not exist,node name : " + node.getNodeName()));
        // ... and must not be logically deleted.
        if (jobInfo.getStatus() == SwitchableStatus.DELETED.getV()) {
            throw new PowerJobException("Illegal job node,specified job has been deleted,node name : " + node.getNodeName());
        }
    }

    @Override
    public WorkflowNodeType matchingType() {
        return WorkflowNodeType.JOB;
    }
}
package tech.powerjob.server.core.validator;
import com.alibaba.fastjson.JSON;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;
import tech.powerjob.common.enums.WorkflowNodeType;
import tech.powerjob.common.exception.PowerJobException;
import tech.powerjob.common.model.PEWorkflowDAG;
import tech.powerjob.server.common.constants.SwitchableStatus;
import tech.powerjob.server.core.workflow.algorithm.WorkflowDAG;
import tech.powerjob.server.persistence.remote.model.WorkflowInfoDO;
import tech.powerjob.server.persistence.remote.model.WorkflowNodeInfoDO;
import tech.powerjob.server.persistence.remote.repository.WorkflowInfoRepository;
import tech.powerjob.server.persistence.remote.repository.WorkflowNodeInfoRepository;
import java.util.Objects;
import java.util.Optional;
/**
 * Validator for nested-workflow nodes. Note: for this node type the referenced
 * workflow's ID is stored in the node's jobId field (see the lookups below).
 *
 * @author Echo009
 * @since 2021/12/14
 */
@Component
@Slf4j
@RequiredArgsConstructor
public class NestedWorkflowNodeValidator implements NodeValidator {
    private final WorkflowInfoRepository workflowInfoRepository;
    private final WorkflowNodeInfoRepository workflowNodeInfoRepository;
    @Override
    public void complexValidate(WorkflowNodeInfoDO node, WorkflowDAG dag) {
        // Reject direct self-reference (a workflow nesting itself).
        if (Objects.equals(node.getJobId(), node.getWorkflowId())) {
            throw new PowerJobException("Illegal nested workflow node,Prohibit circular references!" + node.getNodeName());
        }
    }
    @Override
    public void simpleValidate(WorkflowNodeInfoDO node) {
        // The referenced workflow must exist (jobId holds the nested workflow's ID) ...
        WorkflowInfoDO workflowInfo = workflowInfoRepository.findById(node.getJobId())
                .orElseThrow(() -> new PowerJobException("Illegal nested workflow node,specified workflow is not exist,node name : " + node.getNodeName()));
        // ... and must not be logically deleted.
        if (workflowInfo.getStatus() == SwitchableStatus.DELETED.getV()) {
            throw new PowerJobException("Illegal nested workflow node,specified workflow has been deleted,node name : " + node.getNodeName());
        }
        // Multi-level nesting is forbidden: the referenced workflow must not itself
        // contain a nested-workflow node.
        PEWorkflowDAG peDag = JSON.parseObject(workflowInfo.getPeDAG(), PEWorkflowDAG.class);
        for (PEWorkflowDAG.Node peDagNode : peDag.getNodes()) {
            // Every node of the nested workflow must have its metadata present.
            final Optional<WorkflowNodeInfoDO> nestWfNodeOp = workflowNodeInfoRepository.findById(peDagNode.getNodeId());
            if (!nestWfNodeOp.isPresent()) {
                // The nested workflow is invalid: node metadata is missing.
                throw new PowerJobException("Illegal nested workflow node,specified workflow is invalidate,node name : " + node.getNodeName());
            }
            if (Objects.equals(nestWfNodeOp.get().getType(), WorkflowNodeType.NESTED_WORKFLOW.getCode())) {
                throw new PowerJobException("Illegal nested workflow node,specified workflow must be a simple workflow,node name : " + node.getNodeName());
            }
        }
    }
    @Override
    public WorkflowNodeType matchingType() {
        return WorkflowNodeType.NESTED_WORKFLOW;
    }
}
package tech.powerjob.server.core.validator;
import tech.powerjob.common.enums.WorkflowNodeType;
import tech.powerjob.server.core.workflow.algorithm.WorkflowDAG;
import tech.powerjob.server.persistence.remote.model.WorkflowNodeInfoDO;
/**
 * Validates one type of workflow node.
 *
 * @author Echo009
 * @since 2021/12/14
 */
public interface NodeValidator {
    /**
     * Validate a workflow node in the context of its DAG (topology checks etc.).
     *
     * @param node the node
     * @param dag  the DAG it belongs to
     */
    void complexValidate(WorkflowNodeInfoDO node, WorkflowDAG dag);
    /**
     * Validate a workflow node on its own.
     *
     * @param node the node
     */
    void simpleValidate(WorkflowNodeInfoDO node);
    /**
     * The node type this validator matches.
     *
     * @return node type
     */
    WorkflowNodeType matchingType();
}
package tech.powerjob.server.core.workflow.algorithm;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import lombok.*;
import tech.powerjob.common.model.PEWorkflowDAG;
import java.util.List;
import java.util.Map;
/**
* DAG 工作流对象
* 节点中均记录了上游以及下游的连接关系(无法使用 JSON 来序列化以及反序列化)
*
* @author tjq
* @author Echo009
* @since 2020/5/26
*/
@Data
@ToString(exclude = {"nodeMap"})
@NoArgsConstructor
@AllArgsConstructor
public class WorkflowDAG {

    /**
     * A DAG is allowed to have multiple root (entry) nodes.
     */
    private List<Node> roots;

    // Index of every node in the DAG, keyed by node id.
    private Map<Long, Node> nodeMap;

    /**
     * Looks up a node by id.
     *
     * @param nodeId node id
     * @return the node, or null when the index is absent or contains no such id
     */
    public Node getNode(Long nodeId) {
        if (nodeMap == null) {
            return null;
        }
        return nodeMap.get(nodeId);
    }

    // Upstream/downstream references form cycles between Node instances, hence the
    // exclusions below — equals/hashCode/toString would otherwise recurse forever.
    @Getter
    @Setter
    @EqualsAndHashCode(exclude = {"dependencies", "dependenceEdgeMap", "successorEdgeMap", "holder","successors"})
    @ToString(exclude = {"dependencies", "dependenceEdgeMap", "successorEdgeMap", "holder"})
    @NoArgsConstructor
    public static final class Node {

        /**
         * Wraps a serializable PE node, initializing empty upstream/downstream structures.
         */
        public Node(PEWorkflowDAG.Node node) {
            this.nodeId = node.getNodeId();
            this.holder = node;
            this.dependencies = Lists.newLinkedList();
            this.dependenceEdgeMap = Maps.newHashMap();
            this.successors = Lists.newLinkedList();
            this.successorEdgeMap = Maps.newHashMap();
        }

        /**
         * node id
         *
         * @since 20210128
         */
        private Long nodeId;

        // The underlying serializable node this wrapper decorates.
        private PEWorkflowDAG.Node holder;

        /**
         * Upstream nodes this node depends on.
         */
        private List<Node> dependencies;

        /**
         * Edges connecting this node to its upstream dependencies.
         */
        private Map<Node, PEWorkflowDAG.Edge> dependenceEdgeMap;

        /**
         * Successors, i.e. downstream child nodes.
         */
        private List<Node> successors;

        /**
         * Edges connecting this node to its successors.
         */
        private Map<Node, PEWorkflowDAG.Edge> successorEdgeMap;
    }
}
package tech.powerjob.server.core.workflow.hanlder;
import tech.powerjob.common.model.PEWorkflowDAG;
import tech.powerjob.server.persistence.remote.model.WorkflowInstanceInfoDO;
/**
* @author Echo009
* @since 2021/12/9
*/
public interface ControlNodeHandler extends WorkflowNodeHandlerMarker {

    /**
     * Handles a control node (a node that steers the DAG rather than running a task).
     *
     * @param node           the target node to process
     * @param dag            the DAG the node belongs to
     * @param wfInstanceInfo the workflow instance the node belongs to
     */
    void handle(PEWorkflowDAG.Node node, PEWorkflowDAG dag, WorkflowInstanceInfoDO wfInstanceInfo);
}
package tech.powerjob.server.core.workflow.hanlder;
import tech.powerjob.common.model.PEWorkflowDAG;
import tech.powerjob.server.persistence.remote.model.WorkflowInstanceInfoDO;
/**
* @author Echo009
* @since 2021/12/9
*/
public interface TaskNodeHandler extends WorkflowNodeHandlerMarker {

    /**
     * Creates the task instance for the given node (does not start it).
     *
     * @param node           target node
     * @param dag            the DAG the node belongs to
     * @param wfInstanceInfo the owning workflow instance
     */
    void createTaskInstance(PEWorkflowDAG.Node node, PEWorkflowDAG dag, WorkflowInstanceInfoDO wfInstanceInfo);

    /**
     * Starts (dispatches) the previously created task instance of the given node.
     *
     * @param node target node
     */
    void startTaskInstance(PEWorkflowDAG.Node node);
}
package tech.powerjob.server.core.workflow.hanlder;
import tech.powerjob.common.enums.WorkflowNodeType;
/**
* @author Echo009
* @since 2021/12/9
*/
public interface WorkflowNodeHandlerMarker {

    /**
     * @return the node type this handler is able to process
     */
    WorkflowNodeType matchingType();
}
package tech.powerjob.server.core.workflow.hanlder.impl;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.TypeReference;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.springframework.stereotype.Component;
import tech.powerjob.common.enums.InstanceStatus;
import tech.powerjob.common.enums.WorkflowNodeType;
import tech.powerjob.common.exception.PowerJobException;
import tech.powerjob.common.model.PEWorkflowDAG;
import tech.powerjob.common.serialize.JsonUtils;
import tech.powerjob.server.core.evaluator.GroovyEvaluator;
import tech.powerjob.server.core.workflow.algorithm.WorkflowDAG;
import tech.powerjob.server.core.workflow.algorithm.WorkflowDAGUtils;
import tech.powerjob.server.core.workflow.hanlder.ControlNodeHandler;
import tech.powerjob.server.persistence.remote.model.WorkflowInstanceInfoDO;
import java.util.*;
/**
* @author Echo009
* @since 2021/12/9
*/
@Slf4j
@Component
public class DecisionNodeHandler implements ControlNodeHandler {

    // Evaluator used to run the decision node's Groovy script against the workflow context.
    private final GroovyEvaluator groovyEvaluator = new GroovyEvaluator();

    /**
     * Handles a decision node:
     * 1. evaluate the node's script against the workflow instance context;
     * 2. based on the result, disable the non-matching outgoing edges and downstream nodes.
     *
     * @param node           the decision node to process
     * @param dag            the DAG the node belongs to
     * @param wfInstanceInfo the owning workflow instance
     */
    @Override
    public void handle(PEWorkflowDAG.Node node, PEWorkflowDAG dag, WorkflowInstanceInfoDO wfInstanceInfo) {
        String script = node.getNodeParams();
        if (StringUtils.isBlank(script)) {
            log.error("[Workflow-{}|{}]decision node's param is blank! nodeId:{}", wfInstanceInfo.getWorkflowId(), wfInstanceInfo.getWfInstanceId(), node.getNodeId(), node.getNodeId());
            throw new PowerJobException("decision node's param is blank!");
        }
        // wfContext must be a map (string -> string) in its JSON form.
        HashMap<String, String> wfContext = JSON.parseObject(wfInstanceInfo.getWfContext(), new TypeReference<HashMap<String, String>>() {
        });
        Object result;
        try {
            result = groovyEvaluator.evaluate(script, wfContext);
        } catch (Exception e) {
            log.error("[Workflow-{}|{}]failed to evaluate decision node,nodeId:{}", wfInstanceInfo.getWorkflowId(), wfInstanceInfo.getWfInstanceId(), node.getNodeId(), e);
            throw new PowerJobException("can't evaluate decision node!");
        }
        // Coerce the script result: Boolean is taken as-is, a Number counts as true when > 0.
        boolean finalRes;
        if (result instanceof Boolean) {
            finalRes = ((Boolean) result);
        } else if (result instanceof Number) {
            finalRes = ((Number) result).doubleValue() > 0;
        } else {
            log.error("[Workflow-{}|{}]decision node's return value is illegal,nodeId:{},result:{}", wfInstanceInfo.getWorkflowId(), wfInstanceInfo.getWfInstanceId(), node.getNodeId(), JsonUtils.toJSONString(result));
            throw new PowerJobException("decision node's return value is illegal!");
        }
        handleDag(finalRes, node, dag);
    }

    /**
     * Marks the decision node as succeeded, then disables every outgoing edge whose
     * "property" value does not match the evaluation result (cascading via
     * {@code WorkflowDAGUtils.handleDisableEdges} to downstream nodes).
     *
     * @param res   boolean outcome of the decision script
     * @param node  the decision node
     * @param peDag the serializable DAG containing the node
     */
    private void handleDag(boolean res, PEWorkflowDAG.Node node, PEWorkflowDAG peDag) {
        // Record the result and mark the decision node itself as successful.
        node.setResult(String.valueOf(res));
        node.setStatus(InstanceStatus.SUCCEED.getV());
        WorkflowDAG dag = WorkflowDAGUtils.convert(peDag);
        // Disable the edges that do not match the computed result.
        WorkflowDAG.Node targetNode = dag.getNode(node.getNodeId());
        Collection<PEWorkflowDAG.Edge> edges = targetNode.getSuccessorEdgeMap().values();
        if (edges.isEmpty()) {
            return;
        }
        List<PEWorkflowDAG.Edge> disableEdges = new ArrayList<>(edges.size());
        for (PEWorkflowDAG.Edge edge : edges) {
            // Edge properties on a decision node are "true"/"false" strings, so parsing cannot throw here.
            boolean property = Boolean.parseBoolean(edge.getProperty());
            if (res != property) {
                // disable the edge that contradicts the decision result
                edge.setEnable(false);
                disableEdges.add(edge);
            }
        }
        WorkflowDAGUtils.handleDisableEdges(disableEdges,dag);
    }

    /**
     * This handler applies to decision nodes only.
     */
    @Override
    public WorkflowNodeType matchingType() {
        return WorkflowNodeType.DECISION;
    }
}
package tech.powerjob.server.core.workflow.hanlder.impl;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;
import tech.powerjob.common.enums.InstanceStatus;
import tech.powerjob.common.enums.TimeExpressionType;
import tech.powerjob.common.enums.WorkflowNodeType;
import tech.powerjob.common.model.PEWorkflowDAG;
import tech.powerjob.server.common.utils.SpringUtils;
import tech.powerjob.server.core.DispatchService;
import tech.powerjob.server.core.instance.InstanceService;
import tech.powerjob.server.core.workflow.hanlder.TaskNodeHandler;
import tech.powerjob.server.persistence.remote.model.JobInfoDO;
import tech.powerjob.server.persistence.remote.model.WorkflowInstanceInfoDO;
import tech.powerjob.server.persistence.remote.repository.JobInfoRepository;
import java.util.Optional;
/**
* @author Echo009
* @since 2021/12/9
*/
@Slf4j
@Component
@RequiredArgsConstructor
public class JobNodeHandler implements TaskNodeHandler {

    private final JobInfoRepository jobInfoRepository;

    /**
     * Creates a job instance for the node and marks the node RUNNING.
     * The workflow instance's wfContext is handed to the job instance as its instanceParams.
     */
    @Override
    public void createTaskInstance(PEWorkflowDAG.Node node, PEWorkflowDAG dag, WorkflowInstanceInfoDO wfInstanceInfo) {
        // instanceParams carries the workflow instance's wfContext
        Long instanceId = SpringUtils.getBean(InstanceService.class).create(node.getJobId(), wfInstanceInfo.getAppId(), node.getNodeParams(), wfInstanceInfo.getWfContext(), wfInstanceInfo.getWfInstanceId(), System.currentTimeMillis()).getInstanceId();
        node.setInstanceId(instanceId);
        node.setStatus(InstanceStatus.RUNNING.getV());
        log.info("[Workflow-{}|{}] create readyNode(JOB) instance(nodeId={},jobId={},instanceId={}) successfully~", wfInstanceInfo.getWorkflowId(), wfInstanceInfo.getWfInstanceId(), node.getNodeId(), node.getJobId(), instanceId);
    }

    /**
     * Dispatches the node's previously created job instance.
     */
    @Override
    public void startTaskInstance(PEWorkflowDAG.Node node) {
        // NOTE(review): a missing job silently becomes an empty JobInfoDO — confirm dispatch() tolerates that.
        JobInfoDO jobInfo = jobInfoRepository.findById(node.getJobId()).orElseGet(JobInfoDO::new);
        // Force the time expression type to WORKFLOW so the run is treated as workflow-triggered.
        jobInfo.setTimeExpressionType(TimeExpressionType.WORKFLOW.getV());
        SpringUtils.getBean(DispatchService.class).dispatch(jobInfo, node.getInstanceId(), Optional.empty(), Optional.empty());
    }

    /**
     * This handler applies to JOB nodes only.
     */
    @Override
    public WorkflowNodeType matchingType() {
        return WorkflowNodeType.JOB;
    }
}
package tech.powerjob.server.core.workflow.hanlder.impl;
import com.alibaba.fastjson.JSON;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;
import tech.powerjob.common.SystemInstanceResult;
import tech.powerjob.common.enums.InstanceStatus;
import tech.powerjob.common.enums.WorkflowInstanceStatus;
import tech.powerjob.common.enums.WorkflowNodeType;
import tech.powerjob.common.exception.PowerJobException;
import tech.powerjob.common.model.PEWorkflowDAG;
import tech.powerjob.common.utils.CommonUtils;
import tech.powerjob.server.common.constants.SwitchableStatus;
import tech.powerjob.server.common.utils.SpringUtils;
import tech.powerjob.server.core.workflow.WorkflowInstanceManager;
import tech.powerjob.server.core.workflow.algorithm.WorkflowDAGUtils;
import tech.powerjob.server.core.workflow.hanlder.TaskNodeHandler;
import tech.powerjob.server.persistence.remote.model.WorkflowInfoDO;
import tech.powerjob.server.persistence.remote.model.WorkflowInstanceInfoDO;
import tech.powerjob.server.persistence.remote.repository.WorkflowInfoRepository;
import tech.powerjob.server.persistence.remote.repository.WorkflowInstanceInfoRepository;
import java.util.Date;
/**
* @author Echo009
* @since 2021/12/13
*/
@Component
@Slf4j
@RequiredArgsConstructor
public class NestedWorkflowNodeHandler implements TaskNodeHandler {

    private final WorkflowInfoRepository workflowInfoRepository;

    private final WorkflowInstanceInfoRepository workflowInstanceInfoRepository;

    /**
     * Creates (or, on retry, resets) the sub-workflow instance backing a nested-workflow node,
     * then marks the node RUNNING.
     *
     * @param node           the nested-workflow node (its jobId holds the target workflow id)
     * @param dag            the DAG the node belongs to
     * @param wfInstanceInfo the owning workflow instance
     */
    @Override
    public void createTaskInstance(PEWorkflowDAG.Node node, PEWorkflowDAG dag, WorkflowInstanceInfoDO wfInstanceInfo) {
        // check that the target workflow exists and is not deleted
        Long wfId = node.getJobId();
        WorkflowInfoDO targetWf = workflowInfoRepository.findById(wfId).orElse(null);
        if (targetWf == null || targetWf.getStatus() == SwitchableStatus.DELETED.getV()) {
            if (targetWf == null) {
                log.error("[Workflow-{}|{}] invalid nested workflow node({}),target workflow({}) is not exist!", wfInstanceInfo.getWorkflowId(), wfInstanceInfo.getWfInstanceId(), node.getNodeId(), node.getJobId());
            } else {
                log.error("[Workflow-{}|{}] invalid nested workflow node({}),target workflow({}) has been deleted!", wfInstanceInfo.getWorkflowId(), wfInstanceInfo.getWfInstanceId(), node.getNodeId(), node.getJobId());
            }
            throw new PowerJobException("invalid nested workflow node," + node.getNodeId());
        }
        if (node.getInstanceId() != null) {
            // Retry case: no new instance is created; only the existing instance's status
            // and the statuses of its failed nodes are reset.
            WorkflowInstanceInfoDO wfInstance = workflowInstanceInfoRepository.findByWfInstanceId(node.getInstanceId()).orElse(null);
            if (wfInstance == null) {
                log.error("[Workflow-{}|{}] invalid nested workflow node({}),target workflow instance({}) is not exist!", wfInstanceInfo.getWorkflowId(), wfInstanceInfo.getWfInstanceId(), node.getNodeId(), node.getInstanceId());
                throw new PowerJobException("invalid nested workflow instance id " + node.getInstanceId());
            }
            // No status check needed here: only failed nested-workflow nodes ever get reset.
            // Failed nodes inside the sub-workflow must be reset to "waiting to dispatch".
            try {
                PEWorkflowDAG nodeDag = JSON.parseObject(wfInstance.getDag(), PEWorkflowDAG.class);
                if (!WorkflowDAGUtils.valid(nodeDag)) {
                    throw new PowerJobException(SystemInstanceResult.INVALID_DAG);
                }
                WorkflowDAGUtils.resetRetryableNode(nodeDag);
                wfInstance.setDag(JSON.toJSONString(nodeDag));
                wfInstance.setStatus(WorkflowInstanceStatus.WAITING.getV());
                wfInstance.setGmtModified(new Date());
                workflowInstanceInfoRepository.saveAndFlush(wfInstance);
            } catch (Exception e) {
                log.error("[Workflow-{}|{}] invalid nested workflow node({}),target workflow instance({})'s DAG is illegal!", wfInstanceInfo.getWorkflowId(), wfInstanceInfo.getWfInstanceId(), node.getNodeId(), node.getInstanceId(),e);
                throw new PowerJobException("illegal nested workflow instance, id : "+ node.getInstanceId());
            }
        } else {
            // First run: create a new sub-workflow instance, passing through the current context.
            String wfContext = wfInstanceInfo.getWfContext();
            Long instanceId = SpringUtils.getBean(WorkflowInstanceManager.class).create(targetWf, wfContext, System.currentTimeMillis(), wfInstanceInfo.getWfInstanceId());
            node.setInstanceId(instanceId);
        }
        node.setStartTime(CommonUtils.formatTime(System.currentTimeMillis()));
        node.setStatus(InstanceStatus.RUNNING.getV());
    }

    /**
     * Starts the sub-workflow instance previously created for this node.
     */
    @Override
    public void startTaskInstance(PEWorkflowDAG.Node node) {
        Long wfId = node.getJobId();
        // NOTE(review): no null check here — targetWf is null if the workflow was deleted after
        // createTaskInstance; confirm WorkflowInstanceManager.start() tolerates a null workflow.
        WorkflowInfoDO targetWf = workflowInfoRepository.findById(wfId).orElse(null);
        SpringUtils.getBean(WorkflowInstanceManager.class).start(targetWf, node.getInstanceId());
    }

    /**
     * This handler applies to nested-workflow nodes only.
     */
    @Override
    public WorkflowNodeType matchingType() {
        return WorkflowNodeType.NESTED_WORKFLOW;
    }
}
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<artifactId>powerjob-server</artifactId>
<groupId>tech.powerjob</groupId>
<version>4.3.0</version>
<relativePath>../pom.xml</relativePath>
</parent>
<modelVersion>4.0.0</modelVersion>
<artifactId>powerjob-server-extension</artifactId>
<version>${project.parent.version}</version>
<properties>
<maven.compiler.source>8</maven.compiler.source>
<maven.compiler.target>8</maven.compiler.target>
</properties>
<dependencies>
<dependency>
<groupId>tech.powerjob</groupId>
<artifactId>powerjob-server-persistence</artifactId>
</dependency>
</dependencies>
</project>
\ No newline at end of file
package tech.powerjob.server.extension;
import tech.powerjob.server.persistence.remote.model.UserInfoDO;
import tech.powerjob.server.extension.defaultimpl.alarm.module.Alarm;
import java.util.List;
/**
* 报警接口
*
* @author tjq
* @since 2020/4/19
*/
public interface Alarmable {

    /**
     * Delivers a failure alarm to the given users via this channel.
     *
     * @param alarm          the alarm payload
     * @param targetUserList the users that should be notified
     */
    void onFailed(Alarm alarm, List<UserInfoDO> targetUserList);
}
package tech.powerjob.server.extension;
/**
* 锁服务,所有方法都不允许抛出任何异常!
*
* @author tjq
* @since 2020/4/2
*/
public interface LockService {

    /**
     * Tries to acquire the lock. Returns immediately; never blocks waiting for it.
     *
     * @param name        lock name
     * @param maxLockTime maximum lock-holding time, in milliseconds
     * @return true if the lock was acquired, false otherwise
     */
    boolean tryLock(String name, long maxLockTime);

    /**
     * Releases the lock.
     *
     * @param name lock name
     */
    void unlock(String name);
}
package tech.powerjob.server.extension;
import tech.powerjob.server.persistence.remote.model.JobInfoDO;
import tech.powerjob.server.common.module.WorkerInfo;
/**
* filter worker by system metrics or other info
*
* @author tjq
* @since 2021/2/16
*/
public interface WorkerFilter {

    /**
     * Decides whether a worker should be excluded from the candidate list for a job.
     *
     * @param workerInfo worker info; customized data may live in SystemMetrics#extra
     * @param jobInfoDO  job info
     * @return true to remove the worker from the process list
     */
    boolean filter(WorkerInfo workerInfo, JobInfoDO jobInfoDO);
}
package tech.powerjob.server.extension.defaultimpl;
import tech.powerjob.common.utils.CommonUtils;
import tech.powerjob.common.utils.NetUtils;
import tech.powerjob.server.extension.LockService;
import tech.powerjob.server.persistence.remote.model.OmsLockDO;
import tech.powerjob.server.persistence.remote.repository.OmsLockRepository;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.dao.DataIntegrityViolationException;
import org.springframework.stereotype.Service;
/**
* 基于数据库实现的分布式锁
*
* @author tjq
* @since 2020/4/5
*/
@Slf4j
@Service
public class DatabaseLockService implements LockService {

    /** IP of this server instance, recorded as the owner of every lock it takes. */
    private final String ownerIp;

    private final OmsLockRepository omsLockRepository;

    @Autowired
    public DatabaseLockService(OmsLockRepository omsLockRepository) {
        this.ownerIp = NetUtils.getLocalHost();
        this.omsLockRepository = omsLockRepository;
        // Release every lock owned by this server on JVM shutdown so peers are not blocked.
        Runtime.getRuntime().addShutdownHook(new Thread(() -> {
            int num = omsLockRepository.deleteByOwnerIP(ownerIp);
            log.info("[DatabaseLockService] execute shutdown hook, release all lock(owner={},num={})", ownerIp, num);
        }));
    }

    /**
     * Tries to acquire the lock by inserting a row whose unique key is the lock name.
     * Never blocks; a timed-out lock is force-released and acquisition is retried.
     *
     * @param name        lock name
     * @param maxLockTime maximum lock-holding time in milliseconds
     * @return true if the lock was acquired, false otherwise
     */
    @Override
    public boolean tryLock(String name, long maxLockTime) {
        OmsLockDO newLock = new OmsLockDO(name, ownerIp, maxLockTime);
        try {
            omsLockRepository.saveAndFlush(newLock);
            return true;
        } catch (DataIntegrityViolationException ignore) {
            // unique-key conflict: another server currently holds the lock
        } catch (Exception e) {
            log.warn("[DatabaseLockService] write lock to database failed, lockName = {}.", name, e);
        }
        OmsLockDO omsLockDO = omsLockRepository.findByLockName(name);
        if (omsLockDO == null) {
            // FIX: the holder may release the lock between our failed insert and this query;
            // the old code dereferenced null here, violating the no-throw LockService contract.
            return tryLock(name, maxLockTime);
        }
        long lockedMillions = System.currentTimeMillis() - omsLockDO.getGmtCreate().getTime();
        // The lock timed out: force-release it and retry the acquisition.
        if (lockedMillions > omsLockDO.getMaxLockTime()) {
            log.warn("[DatabaseLockService] The lock[{}] already timeout, will be unlocked now.", omsLockDO);
            unlock(name);
            return tryLock(name, maxLockTime);
        }
        return false;
    }

    /**
     * Releases the lock; retries on failure and never propagates exceptions,
     * per the {@link LockService} contract.
     */
    @Override
    public void unlock(String name) {
        try {
            CommonUtils.executeWithRetry0(() -> omsLockRepository.deleteByLockName(name));
        } catch (Exception e) {
            log.error("[DatabaseLockService] unlock {} failed.", name, e);
        }
    }
}
package tech.powerjob.server.extension.defaultimpl.alarm;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import tech.powerjob.server.extension.defaultimpl.alarm.module.Alarm;
import tech.powerjob.server.extension.Alarmable;
import tech.powerjob.server.persistence.remote.model.UserInfoDO;
import com.google.common.collect.Lists;
import com.google.common.collect.Queues;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import lombok.extern.slf4j.Slf4j;
import java.util.List;
import java.util.concurrent.*;
/**
* 报警服务
*
* @author tjq
* @since 2020/4/19
*/
@Slf4j
@Component
public class AlarmCenter {

    /** Worker pool that delivers alarm notifications asynchronously. */
    private final ExecutorService alarmExecutor;

    /** Every Alarmable implementation registered from the Spring context. */
    private final List<Alarmable> registeredAlarmables = Lists.newLinkedList();

    public AlarmCenter(List<Alarmable> alarmables) {
        final int poolSize = Runtime.getRuntime().availableProcessors();
        final ThreadFactory threadFactory = new ThreadFactoryBuilder().setNameFormat("AlarmPool-%d").build();
        alarmExecutor = new ThreadPoolExecutor(poolSize, poolSize, 5, TimeUnit.MINUTES, Queues.newLinkedBlockingQueue(), threadFactory);
        for (Alarmable bean : alarmables) {
            registeredAlarmables.add(bean);
            log.info("[AlarmCenter] bean(className={},obj={}) register to AlarmCenter successfully!", bean.getClass().getName(), bean);
        }
    }

    /**
     * Fans the alarm out to every registered channel on the async pool.
     * A failing channel is logged and does not prevent the others from firing.
     */
    public void alarmFailed(Alarm alarm, List<UserInfoDO> targetUserList) {
        alarmExecutor.execute(() -> {
            for (Alarmable alarmable : registeredAlarmables) {
                try {
                    alarmable.onFailed(alarm, targetUserList);
                } catch (Exception e) {
                    log.warn("[AlarmCenter] alarm failed.", e);
                }
            }
        });
    }
}
package tech.powerjob.server.extension.defaultimpl.alarm.impl;
import com.google.common.cache.Cache;
import com.google.common.cache.CacheBuilder;
import com.google.common.collect.Lists;
import com.google.common.collect.Sets;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.springframework.core.env.Environment;
import org.springframework.stereotype.Service;
import tech.powerjob.common.OmsConstant;
import tech.powerjob.common.exception.PowerJobException;
import tech.powerjob.common.utils.NetUtils;
import tech.powerjob.server.common.PowerJobServerConfigKey;
import tech.powerjob.server.common.SJ;
import tech.powerjob.server.extension.Alarmable;
import tech.powerjob.server.extension.defaultimpl.alarm.module.Alarm;
import tech.powerjob.server.persistence.remote.model.UserInfoDO;
import javax.annotation.PostConstruct;
import java.util.List;
import java.util.Set;
/**
* 钉钉告警服务
*
* @author tjq
* @since 2020/8/6
*/
@Slf4j
@Service
@RequiredArgsConstructor
public class DingTalkAlarmService implements Alarmable {

    private final Environment environment;

    // DingTalk application agentId, resolved from configuration in init().
    private Long agentId;

    // Remains null when configuration is incomplete, which keeps this service disabled.
    private DingTalkUtils dingTalkUtils;

    // Cache of mobile number -> DingTalk userId lookups.
    private Cache<String, String> mobile2UserIdCache;

    private static final int CACHE_SIZE = 8192;

    /**
     * Placeholder cached for mobiles that resolve to no DingTalk user,
     * preventing repeated remote lookups (cache penetration).
     */
    private static final String EMPTY_TAG = "EMPTY";

    /**
     * Sends the alarm as a DingTalk markdown work-notification to every target user
     * whose mobile number can be resolved to a DingTalk userId. No-op while the
     * service is unconfigured; lookup and delivery errors are swallowed per channel.
     */
    @Override
    public void onFailed(Alarm alarm, List<UserInfoDO> targetUserList) {
        if (dingTalkUtils == null) {
            return;
        }
        Set<String> userIds = Sets.newHashSet();
        targetUserList.forEach(user -> {
            String phone = user.getPhone();
            if (StringUtils.isEmpty(phone)) {
                return;
            }
            try {
                String userId = mobile2UserIdCache.get(phone, () -> {
                    try {
                        return dingTalkUtils.fetchUserIdByMobile(phone);
                    } catch (PowerJobException ignore) {
                        // the mobile has no DingTalk user; cache the sentinel
                        return EMPTY_TAG;
                    } catch (Exception ignore) {
                        return null;
                    }
                });
                if (!EMPTY_TAG.equals(userId)) {
                    userIds.add(userId);
                }
            } catch (Exception ignore) {
            }
        });
        // transient lookup failures may have produced nulls; drop them
        userIds.remove(null);
        if (!userIds.isEmpty()) {
            String userListStr = SJ.COMMA_JOINER.skipNulls().join(userIds);
            List<DingTalkUtils.MarkdownEntity> markdownEntities = Lists.newLinkedList();
            markdownEntities.add(new DingTalkUtils.MarkdownEntity("server", NetUtils.getLocalHost()));
            // DingTalk markdown renders line breaks poorly, so flatten the content onto one line
            String content = alarm.fetchContent().replaceAll(OmsConstant.LINE_SEPARATOR, OmsConstant.COMMA);
            markdownEntities.add(new DingTalkUtils.MarkdownEntity("content", content));
            try {
                dingTalkUtils.sendMarkdownAsync(alarm.fetchTitle(), markdownEntities, userListStr, agentId);
            } catch (Exception e) {
                log.error("[DingTalkAlarmService] send ding message failed, reason is {}", e.getMessage());
            }
        }
    }

    /**
     * Reads agentId / appKey / appSecret from configuration; the service stays
     * disabled (dingTalkUtils == null) unless all three are present and agentId is numeric.
     */
    @PostConstruct
    public void init() {
        String agentId = environment.getProperty(PowerJobServerConfigKey.DING_AGENT_ID);
        String appKey = environment.getProperty(PowerJobServerConfigKey.DING_APP_KEY);
        String appSecret = environment.getProperty(PowerJobServerConfigKey.DING_APP_SECRET);
        log.info("[DingTalkAlarmService] init with appKey:{},appSecret:{},agentId:{}", appKey, appSecret, agentId);
        if (StringUtils.isAnyBlank(agentId, appKey, appSecret)) {
            log.warn("[DingTalkAlarmService] cannot get agentId, appKey, appSecret at the same time, this service is unavailable");
            return;
        }
        if (!StringUtils.isNumeric(agentId)) {
            log.warn("[DingTalkAlarmService] DingTalkAlarmService is unavailable due to invalid agentId: {}", agentId);
            return;
        }
        this.agentId = Long.valueOf(agentId);
        dingTalkUtils = new DingTalkUtils(appKey, appSecret);
        mobile2UserIdCache = CacheBuilder.newBuilder().maximumSize(CACHE_SIZE).softValues().build();
        log.info("[DingTalkAlarmService] init DingTalkAlarmService successfully!");
    }
}
package tech.powerjob.server.extension.defaultimpl.alarm.impl;
import com.dingtalk.api.DefaultDingTalkClient;
import com.dingtalk.api.DingTalkClient;
import com.dingtalk.api.request.OapiGettokenRequest;
import com.dingtalk.api.request.OapiMessageCorpconversationAsyncsendV2Request;
import com.dingtalk.api.request.OapiUserGetByMobileRequest;
import com.dingtalk.api.response.OapiGettokenResponse;
import com.dingtalk.api.response.OapiUserGetByMobileResponse;
import tech.powerjob.common.exception.PowerJobException;
import lombok.AllArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.springframework.http.HttpMethod;
import java.io.Closeable;
import java.io.IOException;
import java.util.List;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
/**
* 钉钉工具类
* 工作通知消息:https://ding-doc.dingtalk.com/doc#/serverapi2/pgoxpy
*
* @author tjq
* @since 2020/8/8
*/
@Slf4j
public class DingTalkUtils implements Closeable {

    /**
     * Current access token. FIX: declared volatile — it is written by the single-threaded
     * refresh scheduler (see the scheduleAtFixedRate call in the constructor) and read by
     * caller threads, so visibility across threads must be guaranteed.
     */
    private volatile String accessToken;

    private final DingTalkClient sendMsgClient;

    private final DingTalkClient accessTokenClient;

    private final DingTalkClient userIdClient;

    /** Single-threaded scheduler that periodically refreshes the access token. */
    private final ScheduledExecutorService scheduledPool;

    /** Token refresh period in seconds; the token itself is valid for 7200 seconds. */
    private static final long FLUSH_ACCESS_TOKEN_RATE = 6000;

    private static final String GET_TOKEN_URL = "https://oapi.dingtalk.com/gettoken";

    private static final String SEND_URL = "https://oapi.dingtalk.com/topapi/message/corpconversation/asyncsend_v2";

    private static final String GET_USER_ID_URL = "https://oapi.dingtalk.com/user/get_by_mobile";

    /**
     * Builds the clients, fetches an initial access token (failing fast when the
     * credentials are wrong) and schedules the periodic token refresh.
     *
     * @param appKey    DingTalk application appKey
     * @param appSecret DingTalk application appSecret
     */
    public DingTalkUtils(String appKey, String appSecret) {
        this.sendMsgClient = new DefaultDingTalkClient(SEND_URL);
        this.accessTokenClient = new DefaultDingTalkClient(GET_TOKEN_URL);
        this.userIdClient = new DefaultDingTalkClient(GET_USER_ID_URL);
        refreshAccessToken(appKey, appSecret);
        if (StringUtils.isEmpty(accessToken)) {
            throw new PowerJobException("fetch AccessToken failed, please check your appKey & appSecret");
        }
        scheduledPool = Executors.newSingleThreadScheduledExecutor();
        scheduledPool.scheduleAtFixedRate(() -> refreshAccessToken(appKey, appSecret), FLUSH_ACCESS_TOKEN_RATE, FLUSH_ACCESS_TOKEN_RATE, TimeUnit.SECONDS);
    }

    /**
     * Fetches a fresh AccessToken — the basis for every other API call. Tokens are
     * valid for 7200 seconds and must be refreshed continuously. Failures are logged,
     * never thrown, leaving the previous token in place.
     *
     * @param appKey    application appKey
     * @param appSecret application appSecret
     */
    private void refreshAccessToken(String appKey, String appSecret) {
        try {
            OapiGettokenRequest req = new OapiGettokenRequest();
            req.setAppkey(appKey);
            req.setAppsecret(appSecret);
            req.setHttpMethod(HttpMethod.GET.name());
            OapiGettokenResponse rsp = accessTokenClient.execute(req);
            if (rsp.isSuccess()) {
                accessToken = rsp.getAccessToken();
            } else {
                log.warn("[DingTalkUtils] flush accessToken failed with req({}),code={},msg={}.", req.getTextParams(), rsp.getErrcode(), rsp.getErrmsg());
            }
        } catch (Exception e) {
            log.warn("[DingTalkUtils] flush accessToken failed.", e);
        }
    }

    /**
     * Resolves a mobile number to the corresponding DingTalk userId.
     *
     * @param mobile the user's mobile number
     * @return the DingTalk userId
     * @throws PowerJobException when the lookup is rejected by DingTalk
     * @throws Exception         on transport failure
     */
    public String fetchUserIdByMobile(String mobile) throws Exception {
        OapiUserGetByMobileRequest request = new OapiUserGetByMobileRequest();
        request.setMobile(mobile);
        OapiUserGetByMobileResponse execute = userIdClient.execute(request, accessToken);
        if (execute.isSuccess()) {
            return execute.getUserid();
        }
        log.info("[DingTalkUtils] fetch userId by mobile({}) failed,reason is {}.", mobile, execute.getErrmsg());
        throw new PowerJobException("fetch userId by phone number failed, reason is " + execute.getErrmsg());
    }

    /**
     * Sends a markdown work-notification to the given users asynchronously.
     *
     * @param title    message title (also rendered as the top-level markdown heading)
     * @param entities key/detail pairs rendered as markdown sections
     * @param userList comma-joined DingTalk userIds
     * @param agentId  DingTalk application agentId
     * @throws Exception on transport failure
     */
    public void sendMarkdownAsync(String title, List<MarkdownEntity> entities, String userList, Long agentId) throws Exception {
        OapiMessageCorpconversationAsyncsendV2Request request = new OapiMessageCorpconversationAsyncsendV2Request();
        request.setUseridList(userList);
        request.setAgentId(agentId);
        request.setToAllUser(false);
        OapiMessageCorpconversationAsyncsendV2Request.Msg msg = new OapiMessageCorpconversationAsyncsendV2Request.Msg();
        StringBuilder mdBuilder = new StringBuilder();
        mdBuilder.append("## ").append(title).append("\n");
        for (MarkdownEntity entity : entities) {
            mdBuilder.append("#### ").append(entity.title).append("\n");
            mdBuilder.append("> ").append(entity.detail).append("\n\n");
        }
        msg.setMsgtype("markdown");
        msg.setMarkdown(new OapiMessageCorpconversationAsyncsendV2Request.Markdown());
        msg.getMarkdown().setTitle(title);
        msg.getMarkdown().setText(mdBuilder.toString());
        request.setMsg(msg);
        sendMsgClient.execute(request, accessToken);
    }

    /**
     * Stops the token-refresh scheduler.
     */
    @Override
    public void close() throws IOException {
        scheduledPool.shutdownNow();
    }

    /** A single markdown section: a heading plus its quoted detail line. */
    @AllArgsConstructor
    public static final class MarkdownEntity {
        private final String title;
        private final String detail;
    }
}
package tech.powerjob.server.extension.defaultimpl.alarm.impl;
import org.springframework.beans.factory.annotation.Value;
import org.apache.commons.lang3.StringUtils;
import tech.powerjob.server.persistence.remote.model.UserInfoDO;
import tech.powerjob.server.extension.defaultimpl.alarm.module.Alarm;
import tech.powerjob.server.extension.Alarmable;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.core.env.Environment;
import org.springframework.mail.SimpleMailMessage;
import org.springframework.mail.javamail.JavaMailSender;
import org.springframework.stereotype.Service;
import org.springframework.util.CollectionUtils;
import javax.annotation.Resource;
import java.util.List;
import java.util.Objects;
/**
* 邮件通知服务
*
* @author tjq
* @since 2020/4/30
*/
@Slf4j
@Service
public class MailAlarmService implements Alarmable {

    @Resource
    private Environment environment;

    // Optional: absent when no mail starter / mail configuration is present.
    private JavaMailSender javaMailSender;

    /**
     * Sender address. FIX: the default is the empty string — the previous default {@code ''}
     * was the literal two-character string "''" (Spring takes everything after ':' verbatim),
     * so the isEmpty guard below never fired when the property was unset and sends were
     * attempted with an invalid From address.
     */
    @Value("${spring.mail.username:}")
    private String from;

    /**
     * Sends the alarm as a plain-text mail to every target user with an email address.
     * No-op when mail is unconfigured or there are no recipients; send failures are
     * logged, never thrown.
     */
    @Override
    public void onFailed(Alarm alarm, List<UserInfoDO> targetUserList) {
        if (CollectionUtils.isEmpty(targetUserList) || javaMailSender == null || StringUtils.isEmpty(from)) {
            return;
        }
        SimpleMailMessage sm = new SimpleMailMessage();
        try {
            sm.setFrom(from);
            sm.setTo(targetUserList.stream().map(UserInfoDO::getEmail).filter(Objects::nonNull).toArray(String[]::new));
            sm.setSubject(alarm.fetchTitle());
            sm.setText(alarm.fetchContent());
            javaMailSender.send(sm);
        } catch (Exception e) {
            log.warn("[MailAlarmService] send mail failed, reason is {}", e.getMessage());
        }
    }

    @Autowired(required = false)
    public void setJavaMailSender(JavaMailSender javaMailSender) {
        this.javaMailSender = javaMailSender;
    }
}
package tech.powerjob.server.extension.defaultimpl.alarm.impl;
import com.alibaba.fastjson.JSONObject;
import tech.powerjob.common.OmsConstant;
import tech.powerjob.common.utils.HttpUtils;
import tech.powerjob.server.persistence.remote.model.UserInfoDO;
import tech.powerjob.server.extension.defaultimpl.alarm.module.Alarm;
import tech.powerjob.server.extension.Alarmable;
import lombok.extern.slf4j.Slf4j;
import okhttp3.MediaType;
import okhttp3.RequestBody;
import org.springframework.stereotype.Service;
import org.springframework.util.CollectionUtils;
import org.apache.commons.lang3.StringUtils;
import java.util.List;
/**
* http 回调报警
*
* @author tjq
* @since 11/14/20
*/
@Slf4j
@Service
public class WebHookAlarmService implements Alarmable {

    private static final String HTTP_PROTOCOL_PREFIX = "http://";

    private static final String HTTPS_PROTOCOL_PREFIX = "https://";

    /**
     * Posts the alarm as JSON to each target user's configured webhook URL.
     * Users without a webhook are skipped; delivery failures are logged per user
     * and never propagated.
     */
    @Override
    public void onFailed(Alarm alarm, List<UserInfoDO> targetUserList) {
        if (CollectionUtils.isEmpty(targetUserList)) {
            return;
        }
        for (UserInfoDO user : targetUserList) {
            String webHook = user.getWebHook();
            if (StringUtils.isEmpty(webHook)) {
                continue;
            }
            // prepend a protocol prefix when the configured URL carries none
            boolean hasProtocol = webHook.startsWith(HTTP_PROTOCOL_PREFIX) || webHook.startsWith(HTTPS_PROTOCOL_PREFIX);
            if (!hasProtocol) {
                webHook = HTTP_PROTOCOL_PREFIX + webHook;
            }
            MediaType jsonType = MediaType.parse(OmsConstant.JSON_MEDIA_TYPE);
            RequestBody requestBody = RequestBody.create(jsonType, JSONObject.toJSONString(alarm));
            try {
                String response = HttpUtils.post(webHook, requestBody);
                log.info("[WebHookAlarmService] invoke webhook[url={}] successfully, response is {}", webHook, response);
            } catch (Exception e) {
                log.warn("[WebHookAlarmService] invoke webhook[url={}] failed!", webHook, e);
            }
        }
    }
}
package tech.powerjob.server.extension.defaultimpl.alarm.module;
import com.alibaba.fastjson.JSONObject;
import tech.powerjob.common.OmsConstant;
import tech.powerjob.common.PowerSerializable;
import tech.powerjob.common.utils.CommonUtils;
import org.apache.commons.lang3.StringUtils;
/**
* 报警内容
*
* @author tjq
* @since 2020/8/1
*/
public interface Alarm extends PowerSerializable {

    /**
     * @return the title of the alarm message
     */
    String fetchTitle();

    /**
     * Renders this alarm as "key: value" lines derived from its JSON form.
     * Long values of keys ending in "time"/"date" are treated as epoch millis
     * and formatted as a human-readable timestamp.
     *
     * @return the alarm body, one field per line
     */
    default String fetchContent() {
        StringBuilder sb = new StringBuilder();
        JSONObject content = JSONObject.parseObject(JSONObject.toJSONString(this));
        content.forEach((key, originWord) -> {
            sb.append(key).append(": ");
            String word = String.valueOf(originWord);
            if (StringUtils.endsWithIgnoreCase(key, "time") || StringUtils.endsWithIgnoreCase(key, "date")) {
                try {
                    if (originWord instanceof Long) {
                        word = CommonUtils.formatTime((Long) originWord);
                    }
                } catch (Exception ignore) {
                    // fall back to the raw value when formatting fails
                }
            }
            sb.append(word).append(OmsConstant.LINE_SEPARATOR);
        });
        return sb.toString();
    }
}
package tech.powerjob.server.extension.defaultimpl.alarm.module;
import lombok.Data;
/**
* 任务执行失败告警对象
*
* @author tjq
* @since 2020/4/30
*/
@Data
public class JobInstanceAlarm implements Alarm {

    /**
     * Application ID
     */
    private long appId;

    /**
     * Job ID
     */
    private long jobId;

    /**
     * Job instance ID
     */
    private long instanceId;

    /**
     * Job name
     */
    private String jobName;

    /**
     * Parameters configured on the job itself
     */
    private String jobParams;

    /**
     * Time expression type (CRON/API/FIX_RATE/FIX_DELAY)
     */
    private Integer timeExpressionType;

    /**
     * Time expression value: CRON string / NULL / LONG / LONG, depending on the type
     */
    private String timeExpression;

    /**
     * Execution type: standalone / broadcast / MapReduce
     */
    private Integer executeType;

    /**
     * Processor type: Java / Shell
     */
    private Integer processorType;

    /**
     * Processor info
     */
    private String processorInfo;

    /**
     * Job instance parameters
     */
    private String instanceParams;

    /**
     * Execution result
     */
    private String result;

    /**
     * Expected trigger time
     */
    private Long expectedTriggerTime;

    /**
     * Actual trigger time
     */
    private Long actualTriggerTime;

    /**
     * Finish time
     */
    private Long finishedTime;

    /**
     * NOTE(review): undocumented in original — the name suggests the address of the
     * TaskTracker that ran this instance; confirm against the producer.
     */
    private String taskTrackerAddress;

    @Override
    public String fetchTitle() {
        return "PowerJob AlarmService: Job Running Failed";
    }
}
package tech.powerjob.server.extension.defaultimpl.alarm.module;
import tech.powerjob.common.model.PEWorkflowDAG;
import lombok.Data;
/**
 * Alarm payload for a failed workflow instance execution.
 *
 * @author tjq
 * @since 2020/6/12
 */
@Data
public class WorkflowInstanceAlarm implements Alarm {

    private String workflowName;
    /**
     * ID of the application the workflow belongs to; denormalized here to
     * speed up queries.
     */
    private Long appId;

    private Long workflowId;
    /**
     * workflowInstanceId (instance tables use a dedicated ID as the primary
     * key to support potential table sharding).
     */
    private Long wfInstanceId;
    /**
     * Workflow status (see WorkflowInstanceStatus).
     */
    private Integer status;

    private PEWorkflowDAG peWorkflowDAG;

    private String result;
    /**
     * Actual trigger time.
     */
    private Long actualTriggerTime;
    /**
     * Finish time.
     */
    private Long finishedTime;
    /**
     * Time expression type (CRON / API / FIX_RATE / FIX_DELAY).
     */
    private Integer timeExpressionType;
    /**
     * Time expression; a CRON string / null / long / long respectively,
     * matching the time expression type above.
     */
    private String timeExpression;

    @Override
    public String fetchTitle() {
        return "PowerJob AlarmService: Workflow Running Failed";
    }
}
package tech.powerjob.server.extension.defaultimpl.workerfilter;
import com.google.common.collect.Sets;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.springframework.stereotype.Component;
import tech.powerjob.server.common.SJ;
import tech.powerjob.server.common.module.WorkerInfo;
import tech.powerjob.server.extension.WorkerFilter;
import tech.powerjob.server.persistence.remote.model.JobInfoDO;
import java.util.Set;
/**
 * Keeps only the workers explicitly designated by the job (if any).
 *
 * @author tjq
 * @since 2021/2/19
 */
@Slf4j
@Component
public class DesignatedWorkerFilter implements WorkerFilter {

    /**
     * Excludes a worker when the job pins execution to specific workers and
     * this worker matches none of them, by tag or by address.
     *
     * @param workerInfo candidate worker
     * @param jobInfo    job whose {@code designatedWorkers} is a comma-separated
     *                   list of tags and/or addresses
     * @return true to filter the worker out, false to keep it
     */
    @Override
    public boolean filter(WorkerInfo workerInfo, JobInfoDO jobInfo) {
        String designatedWorkers = jobInfo.getDesignatedWorkers();

        // nothing designated -> every worker is eligible
        if (StringUtils.isEmpty(designatedWorkers)) {
            return false;
        }

        Set<String> designated = Sets.newHashSet(SJ.COMMA_SPLITTER.splitToList(designatedWorkers));

        // keep the worker only when some designated entry matches its tag or address
        return designated.stream()
                .noneMatch(entry -> entry.equals(workerInfo.getTag()) || entry.equals(workerInfo.getAddress()));
    }
}
package tech.powerjob.server.extension.defaultimpl.workerfilter;
import tech.powerjob.server.extension.WorkerFilter;
import tech.powerjob.server.persistence.remote.model.JobInfoDO;
import tech.powerjob.server.common.module.WorkerInfo;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;
/**
 * Filters out workers whose heartbeat has timed out (disconnected workers).
 *
 * @author tjq
 * @since 2021/2/19
 */
@Slf4j
@Component
public class DisconnectedWorkerFilter implements WorkerFilter {

    /**
     * @param workerInfo candidate worker
     * @param jobInfo    job being dispatched (used only for logging context)
     * @return true to filter the worker out (heartbeat timed out), false to keep it
     */
    @Override
    public boolean filter(WorkerInfo workerInfo, JobInfoDO jobInfo) {
        if (!workerInfo.timeout()) {
            return false;
        }
        log.info("[Job-{}] filter worker[{}] due to timeout(lastActiveTime={})",
                jobInfo.getId(), workerInfo.getAddress(), workerInfo.getLastActiveTime());
        return true;
    }
}
package tech.powerjob.server.extension.defaultimpl.workerfilter;
import tech.powerjob.common.model.SystemMetrics;
import tech.powerjob.server.extension.WorkerFilter;
import tech.powerjob.server.persistence.remote.model.JobInfoDO;
import tech.powerjob.server.common.module.WorkerInfo;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;
/**
 * Filters out workers whose reported system metrics fail the job's minimum
 * CPU / memory / disk requirements.
 *
 * @author tjq
 * @since 2021/2/19
 */
@Slf4j
@Component
public class SystemMetricsWorkerFilter implements WorkerFilter {

    /**
     * @param workerInfo candidate worker carrying its latest {@link SystemMetrics}
     * @param jobInfo    job whose min CPU cores / memory / disk constraints are checked
     * @return true to filter the worker out (requirements not met), false to keep it
     */
    @Override
    public boolean filter(WorkerInfo workerInfo, JobInfoDO jobInfo) {
        SystemMetrics metrics = workerInfo.getSystemMetrics();
        boolean meetsRequirements =
                metrics.available(jobInfo.getMinCpuCores(), jobInfo.getMinMemorySpace(), jobInfo.getMinDiskSpace());
        if (!meetsRequirements) {
            log.info("[Job-{}] filter worker[{}] because the {} do not meet the requirements",
                    jobInfo.getId(), workerInfo.getAddress(), workerInfo.getSystemMetrics());
        }
        return !meetsRequirements;
    }
}
This source diff could not be displayed because it is too large. You can view the blob instead.
This source diff could not be displayed because it is too large. You can view the blob instead.
This source diff could not be displayed because it is too large. You can view the blob instead.
This source diff could not be displayed because it is too large. You can view the blob instead.
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment