diff options
| author | doufenghu <[email protected]> | 2020-02-21 17:30:08 +0800 |
|---|---|---|
| committer | doufenghu <[email protected]> | 2020-02-21 17:30:08 +0800 |
| commit | 5350840a9f4c76fbb6c5ec4b3bdf8c75dcc9ccf3 (patch) | |
| tree | bb4cfd98a85e10c6a43340bf8885463a6e5802c0 /xxl-job-core | |
| parent | c71da1bd2069ae9f608bc214aae7d8ea3b2f97c0 (diff) | |
Diffstat (limited to 'xxl-job-core')
82 files changed, 3636 insertions, 0 deletions
diff --git a/xxl-job-core/pom.xml b/xxl-job-core/pom.xml new file mode 100644 index 0000000..417f0fe --- /dev/null +++ b/xxl-job-core/pom.xml @@ -0,0 +1,50 @@ +<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" + xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> + <modelVersion>4.0.0</modelVersion> + <parent> + <groupId>com.xuxueli</groupId> + <artifactId>xxl-job</artifactId> + <version>2.2.0-SNAPSHOT</version> + </parent> + <artifactId>xxl-job-core</artifactId> + <packaging>jar</packaging> + + <name>${project.artifactId}</name> + <description>A distributed task scheduling framework.</description> + <url>https://www.xuxueli.com/</url> + + <dependencies> + + <!-- xxl-rpc-core --> + <dependency> + <groupId>com.xuxueli</groupId> + <artifactId>xxl-rpc-core</artifactId> + <version>${xxl-rpc.version}</version> + </dependency> + + <!-- groovy-all --> + <dependency> + <groupId>org.codehaus.groovy</groupId> + <artifactId>groovy</artifactId> + <version>${groovy.version}</version> + </dependency> + + <!-- spring-context --> + <dependency> + <groupId>org.springframework</groupId> + <artifactId>spring-context</artifactId> + <version>${spring.version}</version> + <scope>provided</scope> + </dependency> + + <!-- junit --> + <dependency> + <groupId>junit</groupId> + <artifactId>junit</artifactId> + <version>${junit.version}</version> + <scope>test</scope> + </dependency> + + </dependencies> + +</project>
\ No newline at end of file diff --git a/xxl-job-core/src/main/java/com/xxl/job/core/biz/AdminBiz.java b/xxl-job-core/src/main/java/com/xxl/job/core/biz/AdminBiz.java new file mode 100644 index 0000000..4b65ddf --- /dev/null +++ b/xxl-job-core/src/main/java/com/xxl/job/core/biz/AdminBiz.java @@ -0,0 +1,44 @@ +package com.xxl.job.core.biz; + +import com.xxl.job.core.biz.model.HandleCallbackParam; +import com.xxl.job.core.biz.model.RegistryParam; +import com.xxl.job.core.biz.model.ReturnT; + +import java.util.List; + +/** + * @author xuxueli 2017-07-27 21:52:49 + */ +public interface AdminBiz { + + + // ---------------------- callback ---------------------- + + /** + * callback + * + * @param callbackParamList + * @return + */ + public ReturnT<String> callback(List<HandleCallbackParam> callbackParamList); + + + // ---------------------- registry ---------------------- + + /** + * registry + * + * @param registryParam + * @return + */ + public ReturnT<String> registry(RegistryParam registryParam); + + /** + * registry remove + * + * @param registryParam + * @return + */ + public ReturnT<String> registryRemove(RegistryParam registryParam); + +} diff --git a/xxl-job-core/src/main/java/com/xxl/job/core/biz/ExecutorBiz.java b/xxl-job-core/src/main/java/com/xxl/job/core/biz/ExecutorBiz.java new file mode 100644 index 0000000..deca9f2 --- /dev/null +++ b/xxl-job-core/src/main/java/com/xxl/job/core/biz/ExecutorBiz.java @@ -0,0 +1,49 @@ +package com.xxl.job.core.biz; + +import com.xxl.job.core.biz.model.LogResult; +import com.xxl.job.core.biz.model.ReturnT; +import com.xxl.job.core.biz.model.TriggerParam; + +/** + * Created by xuxueli on 17/3/1. + */ +public interface ExecutorBiz { + + /** + * beat + * @return + */ + public ReturnT<String> beat(); + + /** + * idle beat + * + * @param jobId + * @return + */ + public ReturnT<String> idleBeat(int jobId); + + /** + * kill + * @param jobId + * @return + */ + public ReturnT<String> kill(int jobId); + + /** + * log + * @param logDateTim + * @param logId + * @param fromLineNum + * @return + */ + public ReturnT<LogResult> log(long logDateTim, long logId, int fromLineNum); + + /** + * run + * @param triggerParam + * @return + */ + public ReturnT<String> run(TriggerParam triggerParam); + +} diff --git a/xxl-job-core/src/main/java/com/xxl/job/core/biz/client/AdminBizClient.java b/xxl-job-core/src/main/java/com/xxl/job/core/biz/client/AdminBizClient.java new file mode 100644 index 0000000..88ca224 --- /dev/null +++ b/xxl-job-core/src/main/java/com/xxl/job/core/biz/client/AdminBizClient.java @@ -0,0 +1,48 @@ +package com.xxl.job.core.biz.client; + +import com.xxl.job.core.biz.AdminBiz; +import com.xxl.job.core.biz.model.HandleCallbackParam; +import com.xxl.job.core.biz.model.RegistryParam; +import com.xxl.job.core.biz.model.ReturnT; +import com.xxl.job.core.util.XxlJobRemotingUtil; + +import java.util.List; + +/** + * admin api test + * + * @author xuxueli 2017-07-28 22:14:52 + */ +public class AdminBizClient implements AdminBiz { + + public AdminBizClient() { + } + public AdminBizClient(String addressUrl, String accessToken) { + this.addressUrl = addressUrl; + this.accessToken = accessToken; + + // valid + if (!this.addressUrl.endsWith("/")) { + this.addressUrl = this.addressUrl + "/"; + } + } + + private String addressUrl ; + private String accessToken; + + + @Override + public ReturnT<String> callback(List<HandleCallbackParam> callbackParamList) { + return XxlJobRemotingUtil.postBody(addressUrl+"api/callback", accessToken, callbackParamList, 3); + } + + @Override + public ReturnT<String> registry(RegistryParam registryParam) { + return XxlJobRemotingUtil.postBody(addressUrl + "api/registry", accessToken, registryParam, 3); + } + + @Override + public ReturnT<String> registryRemove(RegistryParam registryParam) { + return XxlJobRemotingUtil.postBody(addressUrl + "api/registryRemove", accessToken, registryParam, 3); + } +} diff --git a/xxl-job-core/src/main/java/com/xxl/job/core/biz/impl/ExecutorBizImpl.java b/xxl-job-core/src/main/java/com/xxl/job/core/biz/impl/ExecutorBizImpl.java new file mode 100644 index 0000000..1c80960 --- /dev/null +++ b/xxl-job-core/src/main/java/com/xxl/job/core/biz/impl/ExecutorBizImpl.java @@ -0,0 +1,174 @@ +package com.xxl.job.core.biz.impl; + +import com.xxl.job.core.biz.ExecutorBiz; +import com.xxl.job.core.biz.model.LogResult; +import com.xxl.job.core.biz.model.ReturnT; +import com.xxl.job.core.biz.model.TriggerParam; +import com.xxl.job.core.enums.ExecutorBlockStrategyEnum; +import com.xxl.job.core.executor.XxlJobExecutor; +import com.xxl.job.core.glue.GlueFactory; +import com.xxl.job.core.glue.GlueTypeEnum; +import com.xxl.job.core.handler.IJobHandler; +import com.xxl.job.core.handler.impl.GlueJobHandler; +import com.xxl.job.core.handler.impl.ScriptJobHandler; +import com.xxl.job.core.log.XxlJobFileAppender; +import com.xxl.job.core.thread.JobThread; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.Date; + +/** + * Created by xuxueli on 17/3/1. + */ +public class ExecutorBizImpl implements ExecutorBiz { + private static Logger logger = LoggerFactory.getLogger(ExecutorBizImpl.class); + + @Override + public ReturnT<String> beat() { + return ReturnT.SUCCESS; + } + + @Override + public ReturnT<String> idleBeat(int jobId) { + + // isRunningOrHasQueue + boolean isRunningOrHasQueue = false; + JobThread jobThread = XxlJobExecutor.loadJobThread(jobId); + if (jobThread != null && jobThread.isRunningOrHasQueue()) { + isRunningOrHasQueue = true; + } + + if (isRunningOrHasQueue) { + return new ReturnT<String>(ReturnT.FAIL_CODE, "job thread is running or has trigger queue."); + } + return ReturnT.SUCCESS; + } + + @Override + public ReturnT<String> kill(int jobId) { + // kill handlerThread, and create new one + JobThread jobThread = XxlJobExecutor.loadJobThread(jobId); + if (jobThread != null) { + XxlJobExecutor.removeJobThread(jobId, "scheduling center kill job."); + return ReturnT.SUCCESS; + } + + return new ReturnT<String>(ReturnT.SUCCESS_CODE, "job thread already killed."); + } + + @Override + public ReturnT<LogResult> log(long logDateTim, long logId, int fromLineNum) { + // log filename: logPath/yyyy-MM-dd/9999.log + String logFileName = XxlJobFileAppender.makeLogFileName(new Date(logDateTim), logId); + + LogResult logResult = XxlJobFileAppender.readLog(logFileName, fromLineNum); + return new ReturnT<LogResult>(logResult); + } + + @Override + public ReturnT<String> run(TriggerParam triggerParam) { + // load old:jobHandler + jobThread + JobThread jobThread = XxlJobExecutor.loadJobThread(triggerParam.getJobId()); + IJobHandler jobHandler = jobThread!=null?jobThread.getHandler():null; + String removeOldReason = null; + + // valid:jobHandler + jobThread + GlueTypeEnum glueTypeEnum = GlueTypeEnum.match(triggerParam.getGlueType()); + if (GlueTypeEnum.BEAN == glueTypeEnum) { + + // new jobhandler + IJobHandler newJobHandler = XxlJobExecutor.loadJobHandler(triggerParam.getExecutorHandler()); + + // valid old jobThread + if (jobThread!=null && jobHandler != newJobHandler) { + // change handler, need kill old thread + removeOldReason = "change jobhandler or glue type, and terminate the old job thread."; + + jobThread = null; + jobHandler = null; + } + + // valid handler + if (jobHandler == null) { + jobHandler = newJobHandler; + if (jobHandler == null) { + return new ReturnT<String>(ReturnT.FAIL_CODE, "job handler [" + triggerParam.getExecutorHandler() + "] not found."); + } + } + + } else if (GlueTypeEnum.GLUE_GROOVY == glueTypeEnum) { + + // valid old jobThread + if (jobThread != null && + !(jobThread.getHandler() instanceof GlueJobHandler + && ((GlueJobHandler) jobThread.getHandler()).getGlueUpdatetime()==triggerParam.getGlueUpdatetime() )) { + // change handler or gluesource updated, need kill old thread + removeOldReason = "change job source or glue type, and terminate the old job thread."; + + jobThread = null; + jobHandler = null; + } + + // valid handler + if (jobHandler == null) { + try { + IJobHandler originJobHandler = GlueFactory.getInstance().loadNewInstance(triggerParam.getGlueSource()); + jobHandler = new GlueJobHandler(originJobHandler, triggerParam.getGlueUpdatetime()); + } catch (Exception e) { + logger.error(e.getMessage(), e); + return new ReturnT<String>(ReturnT.FAIL_CODE, e.getMessage()); + } + } + } else if (glueTypeEnum!=null && glueTypeEnum.isScript()) { + + // valid old jobThread + if (jobThread != null && + !(jobThread.getHandler() instanceof ScriptJobHandler + && ((ScriptJobHandler) jobThread.getHandler()).getGlueUpdatetime()==triggerParam.getGlueUpdatetime() )) { + // change script or gluesource updated, need kill old thread + removeOldReason = "change job source or glue type, and terminate the old job thread."; + + jobThread = null; + jobHandler = null; + } + + // valid handler + if (jobHandler == null) { + jobHandler = new ScriptJobHandler(triggerParam.getJobId(), triggerParam.getGlueUpdatetime(), triggerParam.getGlueSource(), GlueTypeEnum.match(triggerParam.getGlueType())); + } + } else { + return new ReturnT<String>(ReturnT.FAIL_CODE, "glueType[" + triggerParam.getGlueType() + "] is not valid."); + } + + // executor block strategy + if (jobThread != null) { + ExecutorBlockStrategyEnum blockStrategy = ExecutorBlockStrategyEnum.match(triggerParam.getExecutorBlockStrategy(), null); + if (ExecutorBlockStrategyEnum.DISCARD_LATER == blockStrategy) { + // discard when running + if (jobThread.isRunningOrHasQueue()) { + return new ReturnT<String>(ReturnT.FAIL_CODE, "block strategy effect:"+ExecutorBlockStrategyEnum.DISCARD_LATER.getTitle()); + } + } else if (ExecutorBlockStrategyEnum.COVER_EARLY == blockStrategy) { + // kill running jobThread + if (jobThread.isRunningOrHasQueue()) { + removeOldReason = "block strategy effect:" + ExecutorBlockStrategyEnum.COVER_EARLY.getTitle(); + + jobThread = null; + } + } else { + // just queue trigger + } + } + + // replace thread (new or exists invalid) + if (jobThread == null) { + jobThread = XxlJobExecutor.registJobThread(triggerParam.getJobId(), jobHandler, removeOldReason); + } + + // push data to queue + ReturnT<String> pushResult = jobThread.pushTriggerQueue(triggerParam); + return pushResult; + } + +} diff --git a/xxl-job-core/src/main/java/com/xxl/job/core/biz/model/HandleCallbackParam.java b/xxl-job-core/src/main/java/com/xxl/job/core/biz/model/HandleCallbackParam.java new file mode 100644 index 0000000..034c2a5 --- /dev/null +++ b/xxl-job-core/src/main/java/com/xxl/job/core/biz/model/HandleCallbackParam.java @@ -0,0 +1,56 @@ +package com.xxl.job.core.biz.model; + +import java.io.Serializable; + +/** + * Created by xuxueli on 17/3/2. + */ +public class HandleCallbackParam implements Serializable { + private static final long serialVersionUID = 42L; + + private long logId; + private long logDateTim; + + private ReturnT<String> executeResult; + + public HandleCallbackParam(){} + public HandleCallbackParam(long logId, long logDateTim, ReturnT<String> executeResult) { + this.logId = logId; + this.logDateTim = logDateTim; + this.executeResult = executeResult; + } + + public long getLogId() { + return logId; + } + + public void setLogId(long logId) { + this.logId = logId; + } + + public long getLogDateTim() { + return logDateTim; + } + + public void setLogDateTim(long logDateTim) { + this.logDateTim = logDateTim; + } + + public ReturnT<String> getExecuteResult() { + return executeResult; + } + + public void setExecuteResult(ReturnT<String> executeResult) { + this.executeResult = executeResult; + } + + @Override + public String toString() { + return "HandleCallbackParam{" + + "logId=" + logId + + ", logDateTim=" + logDateTim + + ", executeResult=" + executeResult + + '}'; + } + +} diff --git a/xxl-job-core/src/main/java/com/xxl/job/core/biz/model/LogResult.java b/xxl-job-core/src/main/java/com/xxl/job/core/biz/model/LogResult.java new file mode 100644 index 0000000..1bd4c72 --- /dev/null +++ b/xxl-job-core/src/main/java/com/xxl/job/core/biz/model/LogResult.java @@ -0,0 +1,54 @@ +package com.xxl.job.core.biz.model; + +import java.io.Serializable; + +/** + * Created by xuxueli on 17/3/23. + */ +public class LogResult implements Serializable { + private static final long serialVersionUID = 42L; + + public LogResult(int fromLineNum, int toLineNum, String logContent, boolean isEnd) { + this.fromLineNum = fromLineNum; + this.toLineNum = toLineNum; + this.logContent = logContent; + this.isEnd = isEnd; + } + + private int fromLineNum; + private int toLineNum; + private String logContent; + private boolean isEnd; + + public int getFromLineNum() { + return fromLineNum; + } + + public void setFromLineNum(int fromLineNum) { + this.fromLineNum = fromLineNum; + } + + public int getToLineNum() { + return toLineNum; + } + + public void setToLineNum(int toLineNum) { + this.toLineNum = toLineNum; + } + + public String getLogContent() { + return logContent; + } + + public void setLogContent(String logContent) { + this.logContent = logContent; + } + + public boolean isEnd() { + return isEnd; + } + + public void setEnd(boolean end) { + isEnd = end; + } +} diff --git a/xxl-job-core/src/main/java/com/xxl/job/core/biz/model/RegistryParam.java b/xxl-job-core/src/main/java/com/xxl/job/core/biz/model/RegistryParam.java new file mode 100644 index 0000000..8526c3e --- /dev/null +++ b/xxl-job-core/src/main/java/com/xxl/job/core/biz/model/RegistryParam.java @@ -0,0 +1,54 @@ +package com.xxl.job.core.biz.model; + +import java.io.Serializable; + +/** + * Created by xuxueli on 2017-05-10 20:22:42 + */ +public class RegistryParam implements Serializable { + private static final long serialVersionUID = 42L; + + private String registryGroup; + private String registryKey; + private String registryValue; + + public RegistryParam(){} + public RegistryParam(String registryGroup, String registryKey, String registryValue) { + this.registryGroup = registryGroup; + this.registryKey = registryKey; + this.registryValue = registryValue; + } + + public String getRegistryGroup() { + return registryGroup; + } + + public void setRegistryGroup(String registryGroup) { + this.registryGroup = registryGroup; + } + + public String getRegistryKey() { + return registryKey; + } + + public void setRegistryKey(String registryKey) { + this.registryKey = registryKey; + } + + public String getRegistryValue() { + return registryValue; + } + + public void setRegistryValue(String registryValue) { + this.registryValue = registryValue; + } + + @Override + public String toString() { + return "RegistryParam{" + + "registryGroup='" + registryGroup + '\'' + + ", registryKey='" + registryKey + '\'' + + ", registryValue='" + registryValue + '\'' + + '}'; + } +} diff --git a/xxl-job-core/src/main/java/com/xxl/job/core/biz/model/ReturnT.java b/xxl-job-core/src/main/java/com/xxl/job/core/biz/model/ReturnT.java new file mode 100644 index 0000000..83d7a36 --- /dev/null +++ b/xxl-job-core/src/main/java/com/xxl/job/core/biz/model/ReturnT.java @@ -0,0 +1,57 @@ +package com.xxl.job.core.biz.model; + +import java.io.Serializable; + +/** + * common return + * @author xuxueli 2015-12-4 16:32:31 + * @param <T> + */ +public class ReturnT<T> implements Serializable { + public static final long serialVersionUID = 42L; + + public static final int SUCCESS_CODE = 200; + public static final int FAIL_CODE = 500; + + public static final ReturnT<String> SUCCESS = new ReturnT<String>(null); + public static final ReturnT<String> FAIL = new ReturnT<String>(FAIL_CODE, null); + + private int code; + private String msg; + private T content; + + public ReturnT(){} + public ReturnT(int code, String msg) { + this.code = code; + this.msg = msg; + } + public ReturnT(T content) { + this.code = SUCCESS_CODE; + this.content = content; + } + + public int getCode() { + return code; + } + public void setCode(int code) { + this.code = code; + } + public String getMsg() { + return msg; + } + public void setMsg(String msg) { + this.msg = msg; + } + public T getContent() { + return content; + } + public void setContent(T content) { + this.content = content; + } + + @Override + public String toString() { + return "ReturnT [code=" + code + ", msg=" + msg + ", content=" + content + "]"; + } + +} diff --git a/xxl-job-core/src/main/java/com/xxl/job/core/biz/model/TriggerParam.java b/xxl-job-core/src/main/java/com/xxl/job/core/biz/model/TriggerParam.java new file mode 100644 index 0000000..4f56368 --- /dev/null +++ b/xxl-job-core/src/main/java/com/xxl/job/core/biz/model/TriggerParam.java @@ -0,0 +1,144 @@ +package com.xxl.job.core.biz.model; + +import java.io.Serializable; + +/** + * Created by xuxueli on 16/7/22. + */ +public class TriggerParam implements Serializable{ + private static final long serialVersionUID = 42L; + + private int jobId; + + private String executorHandler; + private String executorParams; + private String executorBlockStrategy; + private int executorTimeout; + + private long logId; + private long logDateTime; + + private String glueType; + private String glueSource; + private long glueUpdatetime; + + private int broadcastIndex; + private int broadcastTotal; + + + public int getJobId() { + return jobId; + } + + public void setJobId(int jobId) { + this.jobId = jobId; + } + + public String getExecutorHandler() { + return executorHandler; + } + + public void setExecutorHandler(String executorHandler) { + this.executorHandler = executorHandler; + } + + public String getExecutorParams() { + return executorParams; + } + + public void setExecutorParams(String executorParams) { + this.executorParams = executorParams; + } + + public String getExecutorBlockStrategy() { + return executorBlockStrategy; + } + + public void setExecutorBlockStrategy(String executorBlockStrategy) { + this.executorBlockStrategy = executorBlockStrategy; + } + + public int getExecutorTimeout() { + return executorTimeout; + } + + public void setExecutorTimeout(int executorTimeout) { + this.executorTimeout = executorTimeout; + } + + public long getLogId() { + return logId; + } + + public void setLogId(long logId) { + this.logId = logId; + } + + public long getLogDateTime() { + return logDateTime; + } + + public void setLogDateTime(long logDateTime) { + this.logDateTime = logDateTime; + } + + public String getGlueType() { + return glueType; + } + + public void setGlueType(String glueType) { + this.glueType = glueType; + } + + public String getGlueSource() { + return glueSource; + } + + public void setGlueSource(String glueSource) { + this.glueSource = glueSource; + } + + public long getGlueUpdatetime() { + return glueUpdatetime; + } + + public void setGlueUpdatetime(long glueUpdatetime) { + this.glueUpdatetime = glueUpdatetime; + } + + public int getBroadcastIndex() { + return broadcastIndex; + } + + public void setBroadcastIndex(int broadcastIndex) { + this.broadcastIndex = broadcastIndex; + } + + public int getBroadcastTotal() { + return broadcastTotal; + } + + public void setBroadcastTotal(int broadcastTotal) { + this.broadcastTotal = broadcastTotal; + } + + + @Override + public String toString() { + return "TriggerParam{" + + "jobId=" + jobId + + ", executorHandler='" + executorHandler + '\'' + + ", executorParams='" + executorParams + '\'' + + ", executorBlockStrategy='" + executorBlockStrategy + '\'' + + ", executorTimeout=" + executorTimeout + + ", logId=" + logId + + ", logDateTime=" + logDateTime + + ", glueType='" + glueType + '\'' + + ", glueSource='" + glueSource + '\'' + + ", glueUpdatetime=" + glueUpdatetime + + ", broadcastIndex=" + broadcastIndex + + ", broadcastTotal=" + broadcastTotal + + '}'; + } + +} diff --git a/xxl-job-core/src/main/java/com/xxl/job/core/enums/ExecutorBlockStrategyEnum.java b/xxl-job-core/src/main/java/com/xxl/job/core/enums/ExecutorBlockStrategyEnum.java new file mode 100644 index 0000000..a9dc1be --- /dev/null +++ b/xxl-job-core/src/main/java/com/xxl/job/core/enums/ExecutorBlockStrategyEnum.java @@ -0,0 +1,35 @@ +package com.xxl.job.core.enums; + +/** + * Created by xuxueli on 17/5/9. + */ +public enum ExecutorBlockStrategyEnum { + + SERIAL_EXECUTION("Serial execution"), + /*CONCURRENT_EXECUTION("并行"),*/ + DISCARD_LATER("Discard Later"), + COVER_EARLY("Cover Early"); + + private String title; + private ExecutorBlockStrategyEnum (String title) { + this.title = title; + } + + public void setTitle(String title) { + this.title = title; + } + public String getTitle() { + return title; + } + + public static ExecutorBlockStrategyEnum match(String name, ExecutorBlockStrategyEnum defaultItem) { + if (name != null) { + for (ExecutorBlockStrategyEnum item:ExecutorBlockStrategyEnum.values()) { + if (item.name().equals(name)) { + return item; + } + } + } + return defaultItem; + } +} diff --git a/xxl-job-core/src/main/java/com/xxl/job/core/enums/RegistryConfig.java b/xxl-job-core/src/main/java/com/xxl/job/core/enums/RegistryConfig.java new file mode 100644 index 0000000..798beae --- /dev/null +++ b/xxl-job-core/src/main/java/com/xxl/job/core/enums/RegistryConfig.java @@ -0,0 +1,13 @@ +package com.xxl.job.core.enums; + +/** + * Created by xuxueli on 17/5/10. + */ +public class RegistryConfig { + + public static final int BEAT_TIMEOUT = 30; + public static final int DEAD_TIMEOUT = BEAT_TIMEOUT * 3; + + public enum RegistType{ EXECUTOR, ADMIN } + +} diff --git a/xxl-job-core/src/main/java/com/xxl/job/core/executor/XxlJobExecutor.java b/xxl-job-core/src/main/java/com/xxl/job/core/executor/XxlJobExecutor.java new file mode 100644 index 0000000..5a38f41 --- /dev/null +++ b/xxl-job-core/src/main/java/com/xxl/job/core/executor/XxlJobExecutor.java @@ -0,0 +1,246 @@ +package com.xxl.job.core.executor; + +import com.xxl.job.core.biz.AdminBiz; +import com.xxl.job.core.biz.ExecutorBiz; +import com.xxl.job.core.biz.client.AdminBizClient; +import com.xxl.job.core.biz.impl.ExecutorBizImpl; +import com.xxl.job.core.handler.IJobHandler; +import com.xxl.job.core.log.XxlJobFileAppender; +import com.xxl.job.core.thread.ExecutorRegistryThread; +import com.xxl.job.core.thread.JobLogFileCleanThread; +import com.xxl.job.core.thread.JobThread; +import com.xxl.job.core.thread.TriggerCallbackThread; +import com.xxl.rpc.registry.ServiceRegistry; +import com.xxl.rpc.remoting.net.impl.netty_http.server.NettyHttpServer; +import com.xxl.rpc.remoting.provider.XxlRpcProviderFactory; +import com.xxl.rpc.serialize.Serializer; +import com.xxl.rpc.serialize.impl.HessianSerializer; +import com.xxl.rpc.util.IpUtil; +import com.xxl.rpc.util.NetUtil; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.*; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; + +/** + * Created by xuxueli on 2016/3/2 21:14. + */ +public class XxlJobExecutor { + private static final Logger logger = LoggerFactory.getLogger(XxlJobExecutor.class); + + // ---------------------- param ---------------------- + private String adminAddresses; + private String appName; + private String ip; + private int port; + private String accessToken; + private String logPath; + private int logRetentionDays; + + public void setAdminAddresses(String adminAddresses) { + this.adminAddresses = adminAddresses; + } + public void setAppName(String appName) { + this.appName = appName; + } + public void setIp(String ip) { + this.ip = ip; + } + public void setPort(int port) { + this.port = port; + } + public void setAccessToken(String accessToken) { + this.accessToken = accessToken; + } + public void setLogPath(String logPath) { + this.logPath = logPath; + } + public void setLogRetentionDays(int logRetentionDays) { + this.logRetentionDays = logRetentionDays; + } + + + // ---------------------- start + stop ---------------------- + public void start() throws Exception { + + // init logpath + XxlJobFileAppender.initLogPath(logPath); + + // init invoker, admin-client + initAdminBizList(adminAddresses, accessToken); + + + // init JobLogFileCleanThread + JobLogFileCleanThread.getInstance().start(logRetentionDays); + + // init TriggerCallbackThread + TriggerCallbackThread.getInstance().start(); + + // init executor-server + port = port>0?port: NetUtil.findAvailablePort(9999); + ip = (ip!=null&&ip.trim().length()>0)?ip: IpUtil.getIp(); + initRpcProvider(ip, port, appName, accessToken); + } + public void destroy(){ + // destory executor-server + stopRpcProvider(); + + // destory jobThreadRepository + if (jobThreadRepository.size() > 0) { + for (Map.Entry<Integer, JobThread> item: jobThreadRepository.entrySet()) { + removeJobThread(item.getKey(), "web container destroy and kill the job."); + } + jobThreadRepository.clear(); + } + jobHandlerRepository.clear(); + + + // destory JobLogFileCleanThread + JobLogFileCleanThread.getInstance().toStop(); + + // destory TriggerCallbackThread + TriggerCallbackThread.getInstance().toStop(); + + } + + + // ---------------------- admin-client (rpc invoker) ---------------------- + private static List<AdminBiz> adminBizList; + private static Serializer serializer = new HessianSerializer(); + private void initAdminBizList(String adminAddresses, String accessToken) throws Exception { + if (adminAddresses!=null && adminAddresses.trim().length()>0) { + for (String address: adminAddresses.trim().split(",")) { + if (address!=null && address.trim().length()>0) { + + AdminBiz adminBiz = new AdminBizClient(address.trim(), accessToken); + + if (adminBizList == null) { + adminBizList = new ArrayList<AdminBiz>(); + } + adminBizList.add(adminBiz); + } + } + } + } + public static List<AdminBiz> getAdminBizList(){ + return adminBizList; + } + public static Serializer getSerializer() { + return serializer; + } + + + // ---------------------- executor-server (rpc provider) ---------------------- + private XxlRpcProviderFactory xxlRpcProviderFactory = null; + + private void initRpcProvider(String ip, int port, String appName, String accessToken) throws Exception { + + // init, provider factory + String address = IpUtil.getIpPort(ip, port); + Map<String, String> serviceRegistryParam = new HashMap<String, String>(); + serviceRegistryParam.put("appName", appName); + serviceRegistryParam.put("address", address); + + xxlRpcProviderFactory = new XxlRpcProviderFactory(); + + xxlRpcProviderFactory.setServer(NettyHttpServer.class); + xxlRpcProviderFactory.setSerializer(HessianSerializer.class); + xxlRpcProviderFactory.setCorePoolSize(20); + xxlRpcProviderFactory.setMaxPoolSize(200); + xxlRpcProviderFactory.setIp(ip); + xxlRpcProviderFactory.setPort(port); + xxlRpcProviderFactory.setAccessToken(accessToken); + xxlRpcProviderFactory.setServiceRegistry(ExecutorServiceRegistry.class); + xxlRpcProviderFactory.setServiceRegistryParam(serviceRegistryParam); + + // add services + xxlRpcProviderFactory.addService(ExecutorBiz.class.getName(), null, new ExecutorBizImpl()); + + // start + xxlRpcProviderFactory.start(); + + } + + public static class ExecutorServiceRegistry extends ServiceRegistry { + + @Override + public void start(Map<String, String> param) { + // start registry + ExecutorRegistryThread.getInstance().start(param.get("appName"), param.get("address")); + } + @Override + public void stop() { + // stop registry + ExecutorRegistryThread.getInstance().toStop(); + } + + @Override + public boolean registry(Set<String> keys, String value) { + return false; + } + @Override + public boolean remove(Set<String> keys, String value) { + return false; + } + @Override + public Map<String, TreeSet<String>> discovery(Set<String> keys) { + return null; + } + @Override + public TreeSet<String> discovery(String key) { + return null; + } + + } + + private void stopRpcProvider() { + // stop provider factory + try { + xxlRpcProviderFactory.stop(); + } catch (Exception e) { + logger.error(e.getMessage(), e); + } + } + + + // ---------------------- job handler repository ---------------------- + private static ConcurrentMap<String, IJobHandler> jobHandlerRepository = new ConcurrentHashMap<String, IJobHandler>(); + public static IJobHandler registJobHandler(String name, IJobHandler jobHandler){ + logger.info(">>>>>>>>>>> xxl-job register jobhandler success, name:{}, jobHandler:{}", name, jobHandler); + return jobHandlerRepository.put(name, jobHandler); + } + public static IJobHandler loadJobHandler(String name){ + return jobHandlerRepository.get(name); + } + + + // ---------------------- job thread repository ---------------------- + private static ConcurrentMap<Integer, JobThread> jobThreadRepository = new ConcurrentHashMap<Integer, JobThread>(); + public static JobThread registJobThread(int jobId, IJobHandler handler, String removeOldReason){ + JobThread newJobThread = new JobThread(jobId, handler); + newJobThread.start(); + logger.info(">>>>>>>>>>> xxl-job regist JobThread success, jobId:{}, handler:{}", new Object[]{jobId, handler}); + + JobThread oldJobThread = jobThreadRepository.put(jobId, newJobThread); // putIfAbsent | oh my god, map's put method return the old value!!! + if (oldJobThread != null) { + oldJobThread.toStop(removeOldReason); + oldJobThread.interrupt(); + } + + return newJobThread; + } + public static void removeJobThread(int jobId, String removeOldReason){ + JobThread oldJobThread = jobThreadRepository.remove(jobId); + if (oldJobThread != null) { + oldJobThread.toStop(removeOldReason); + oldJobThread.interrupt(); + } + } + public static JobThread loadJobThread(int jobId){ + JobThread jobThread = jobThreadRepository.get(jobId); + return jobThread; + } + +} diff --git a/xxl-job-core/src/main/java/com/xxl/job/core/executor/impl/XxlJobSpringExecutor.java b/xxl-job-core/src/main/java/com/xxl/job/core/executor/impl/XxlJobSpringExecutor.java new file mode 100644 index 0000000..205b487 --- /dev/null +++ b/xxl-job-core/src/main/java/com/xxl/job/core/executor/impl/XxlJobSpringExecutor.java @@ -0,0 +1,151 @@ +package com.xxl.job.core.executor.impl; + +import com.xxl.job.core.biz.model.ReturnT; +import com.xxl.job.core.executor.XxlJobExecutor; +import com.xxl.job.core.glue.GlueFactory; +import com.xxl.job.core.handler.IJobHandler; +import com.xxl.job.core.handler.annotation.JobHandler; +import com.xxl.job.core.handler.annotation.XxlJob; +import com.xxl.job.core.handler.impl.MethodJobHandler; +import org.springframework.beans.BeansException; +import org.springframework.beans.factory.DisposableBean; +import org.springframework.beans.factory.InitializingBean; +import org.springframework.context.ApplicationContext; +import org.springframework.context.ApplicationContextAware; +import org.springframework.core.annotation.AnnotationUtils; + +import java.lang.reflect.Method; +import java.util.Map; + +/** + * xxl-job executor (for spring) + * + * @author xuxueli 2018-11-01 09:24:52 + */ +public class XxlJobSpringExecutor extends XxlJobExecutor implements ApplicationContextAware, InitializingBean, DisposableBean { + + + // start + @Override + public void afterPropertiesSet() throws Exception { + + // init JobHandler Repository + initJobHandlerRepository(applicationContext); + + // init JobHandler Repository (for method) + initJobHandlerMethodRepository(applicationContext); + + // refresh GlueFactory + GlueFactory.refreshInstance(1); + + // super start + super.start(); + } + + // destroy + @Override + public void destroy() { + super.destroy(); + } + + + private void initJobHandlerRepository(ApplicationContext applicationContext) { + if (applicationContext == null) { + return; + } + + // init job handler action + Map<String, Object> serviceBeanMap = applicationContext.getBeansWithAnnotation(JobHandler.class); + + if (serviceBeanMap != null && serviceBeanMap.size() > 0) { + for (Object serviceBean : serviceBeanMap.values()) { + if (serviceBean instanceof IJobHandler) { + String name = serviceBean.getClass().getAnnotation(JobHandler.class).value(); + IJobHandler handler = (IJobHandler) serviceBean; + if (loadJobHandler(name) != null) { + throw new RuntimeException("xxl-job jobhandler[" + name + "] naming conflicts."); + } + registJobHandler(name, handler); + } + } + } + } + + private void initJobHandlerMethodRepository(ApplicationContext applicationContext) { + if (applicationContext == null) { + return; + } + + // init job handler from method + String[] beanDefinitionNames = applicationContext.getBeanDefinitionNames(); + if (beanDefinitionNames!=null && beanDefinitionNames.length>0) { + for (String beanDefinitionName : beanDefinitionNames) { + Object bean = applicationContext.getBean(beanDefinitionName); + Method[] methods = bean.getClass().getDeclaredMethods(); + for (Method method: methods) { + XxlJob xxlJob = AnnotationUtils.findAnnotation(method, XxlJob.class); + if (xxlJob != null) { + + // name + String name = xxlJob.value(); + if (name.trim().length() == 0) { + throw new RuntimeException("xxl-job method-jobhandler name invalid, for[" + bean.getClass() + "#"+ method.getName() +"] ."); + } + if (loadJobHandler(name) != null) { + throw new RuntimeException("xxl-job jobhandler[" + name + "] naming conflicts."); + } + + // execute method + if (!(method.getParameterTypes()!=null && method.getParameterTypes().length==1 && method.getParameterTypes()[0].isAssignableFrom(String.class))) { + throw new RuntimeException("xxl-job method-jobhandler param-classtype invalid, for[" + bean.getClass() + "#"+ method.getName() +"] , " + + "The correct method format like \" public ReturnT<String> execute(String param) \" ."); + } + if (!method.getReturnType().isAssignableFrom(ReturnT.class)) { + throw new RuntimeException("xxl-job method-jobhandler return-classtype invalid, for[" + bean.getClass() + "#"+ method.getName() +"] , " + + "The correct method format like \" public ReturnT<String> execute(String param) \" ."); + } + method.setAccessible(true); + + // init and destory + Method initMethod = null; + Method destroyMethod = null; + + if(xxlJob.init().trim().length() > 0) { + try { + initMethod = bean.getClass().getDeclaredMethod(xxlJob.init()); + initMethod.setAccessible(true); + } catch (NoSuchMethodException e) { + throw new RuntimeException("xxl-job method-jobhandler initMethod invalid, for[" + bean.getClass() + "#"+ method.getName() +"] ."); + } + } + if(xxlJob.destroy().trim().length() > 0) { + try { + destroyMethod = bean.getClass().getDeclaredMethod(xxlJob.destroy()); + destroyMethod.setAccessible(true); + } catch (NoSuchMethodException e) { + throw new RuntimeException("xxl-job method-jobhandler destroyMethod invalid, for[" + bean.getClass() + "#"+ method.getName() +"] ."); + } + } + + // registry jobhandler + registJobHandler(name, new MethodJobHandler(bean, method, initMethod, destroyMethod)); + } + } + } + } + + } + + // ---------------------- applicationContext ---------------------- + private static ApplicationContext applicationContext; + + @Override + public void setApplicationContext(ApplicationContext applicationContext) throws BeansException { + this.applicationContext = applicationContext; + } + + public static ApplicationContext getApplicationContext() { + return applicationContext; + } + +} diff --git a/xxl-job-core/src/main/java/com/xxl/job/core/glue/GlueFactory.java b/xxl-job-core/src/main/java/com/xxl/job/core/glue/GlueFactory.java new file mode 100644 index 0000000..79a83a9 --- /dev/null +++ b/xxl-job-core/src/main/java/com/xxl/job/core/glue/GlueFactory.java @@ -0,0 +1,90 @@ +package com.xxl.job.core.glue; + +import com.xxl.job.core.glue.impl.SpringGlueFactory; +import com.xxl.job.core.handler.IJobHandler; +import groovy.lang.GroovyClassLoader; + +import java.math.BigInteger; +import java.security.MessageDigest; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; + +/** + * glue factory, product class/object by name + * + * @author xuxueli 2016-1-2 20:02:27 + */ +public class GlueFactory { + + + private static GlueFactory glueFactory = new GlueFactory(); + public static GlueFactory getInstance(){ + return glueFactory; + } + public static void refreshInstance(int type){ + if (type == 0) { + glueFactory = new GlueFactory(); + } else if (type == 1) { + glueFactory = new SpringGlueFactory(); + } + } + + + /** + * groovy class loader + */ + private GroovyClassLoader groovyClassLoader = new GroovyClassLoader(); + private ConcurrentMap<String, Class<?>> CLASS_CACHE = new ConcurrentHashMap<>(); + + /** + * load new instance, prototype + * + * @param codeSource + * @return + * @throws Exception + */ + public IJobHandler loadNewInstance(String codeSource) throws Exception{ + if (codeSource!=null && codeSource.trim().length()>0) { + Class<?> clazz = getCodeSourceClass(codeSource); + if (clazz != null) { + Object instance = clazz.newInstance(); + if (instance!=null) { + if (instance instanceof IJobHandler) { + this.injectService(instance); + return (IJobHandler) instance; + } else { + throw new IllegalArgumentException(">>>>>>>>>>> xxl-glue, loadNewInstance error, " + + "cannot convert from instance["+ instance.getClass() +"] to IJobHandler"); + } + } + } + } + throw new IllegalArgumentException(">>>>>>>>>>> xxl-glue, loadNewInstance error, instance is null"); + } + private Class<?> getCodeSourceClass(String codeSource){ + try { + // md5 + byte[] md5 = MessageDigest.getInstance("MD5").digest(codeSource.getBytes()); + String md5Str = new BigInteger(1, md5).toString(16); + + Class<?> clazz = CLASS_CACHE.get(md5Str); + if(clazz == null){ + clazz = groovyClassLoader.parseClass(codeSource); + CLASS_CACHE.putIfAbsent(md5Str, clazz); + } + return clazz; + } catch (Exception e) { + return groovyClassLoader.parseClass(codeSource); + } + } + + /** + * inject service of bean field + * + * @param instance + */ + public void injectService(Object instance) { + // do something + } + +} diff --git a/xxl-job-core/src/main/java/com/xxl/job/core/glue/GlueTypeEnum.java b/xxl-job-core/src/main/java/com/xxl/job/core/glue/GlueTypeEnum.java new file mode 100644 index 0000000..a5c835c --- /dev/null +++ b/xxl-job-core/src/main/java/com/xxl/job/core/glue/GlueTypeEnum.java @@ -0,0 +1,53 @@ +package com.xxl.job.core.glue; + +/** + * Created by xuxueli on 17/4/26. + */ +public enum GlueTypeEnum { + + BEAN("BEAN", false, null, null), + GLUE_GROOVY("GLUE(Java)", false, null, null), + GLUE_SHELL("GLUE(Shell)", true, "bash", ".sh"), + GLUE_PYTHON("GLUE(Python)", true, "python", ".py"), + GLUE_PHP("GLUE(PHP)", true, "php", ".php"), + GLUE_NODEJS("GLUE(Nodejs)", true, "node", ".js"), + GLUE_POWERSHELL("GLUE(PowerShell)", true, "powershell", ".ps1"); + + private String desc; + private boolean isScript; + private String cmd; + private String suffix; + + private GlueTypeEnum(String desc, boolean isScript, String cmd, String suffix) { + this.desc = desc; + this.isScript = isScript; + this.cmd = cmd; + this.suffix = suffix; + } + + public String getDesc() { + return desc; + } + + public boolean isScript() { + return isScript; + } + + public String getCmd() { + return cmd; + } + + public String getSuffix() { + return suffix; + } + + public static GlueTypeEnum match(String name){ + for (GlueTypeEnum item: GlueTypeEnum.values()) { + if (item.name().equals(name)) { + return item; + } + } + return null; + } + +} diff --git a/xxl-job-core/src/main/java/com/xxl/job/core/glue/impl/SpringGlueFactory.java b/xxl-job-core/src/main/java/com/xxl/job/core/glue/impl/SpringGlueFactory.java new file mode 100644 index 0000000..37e44d5 --- /dev/null +++ b/xxl-job-core/src/main/java/com/xxl/job/core/glue/impl/SpringGlueFactory.java @@ -0,0 +1,80 @@ +package com.xxl.job.core.glue.impl; + +import com.xxl.job.core.executor.impl.XxlJobSpringExecutor; +import com.xxl.job.core.glue.GlueFactory; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.beans.factory.annotation.Qualifier; +import org.springframework.core.annotation.AnnotationUtils; + +import javax.annotation.Resource; +import java.lang.reflect.Field; +import java.lang.reflect.Modifier; + +/** + * @author xuxueli 2018-11-01 + */ +public class SpringGlueFactory extends GlueFactory { + private static Logger logger = LoggerFactory.getLogger(SpringGlueFactory.class); + + + /** + * inject action of spring + * @param instance + */ + @Override + public void injectService(Object instance){ + if (instance==null) { + return; + } + + if (XxlJobSpringExecutor.getApplicationContext() == null) { + return; + } + + Field[] fields = instance.getClass().getDeclaredFields(); + for (Field field : fields) { + if (Modifier.isStatic(field.getModifiers())) { + continue; + } + + Object fieldBean = null; + // with bean-id, bean could be found by both @Resource and @Autowired, or bean could only be found by @Autowired + + if (AnnotationUtils.getAnnotation(field, Resource.class) != null) { + try { + Resource resource = AnnotationUtils.getAnnotation(field, Resource.class); + if (resource.name()!=null && resource.name().length()>0){ + fieldBean = XxlJobSpringExecutor.getApplicationContext().getBean(resource.name()); + } else { + fieldBean = XxlJobSpringExecutor.getApplicationContext().getBean(field.getName()); + } + } catch (Exception e) { + } + if (fieldBean==null ) { + fieldBean = XxlJobSpringExecutor.getApplicationContext().getBean(field.getType()); + } + } else if (AnnotationUtils.getAnnotation(field, Autowired.class) != null) { + Qualifier qualifier = AnnotationUtils.getAnnotation(field, Qualifier.class); + if (qualifier!=null && qualifier.value()!=null && qualifier.value().length()>0) { + fieldBean = XxlJobSpringExecutor.getApplicationContext().getBean(qualifier.value()); + } else { + fieldBean = XxlJobSpringExecutor.getApplicationContext().getBean(field.getType()); + } + } + + if (fieldBean!=null) { + field.setAccessible(true); + try { + field.set(instance, fieldBean); + } catch (IllegalArgumentException e) { + logger.error(e.getMessage(), e); + } catch (IllegalAccessException e) { + logger.error(e.getMessage(), e); + } + } + } + } + +} diff --git a/xxl-job-core/src/main/java/com/xxl/job/core/handler/IJobHandler.java b/xxl-job-core/src/main/java/com/xxl/job/core/handler/IJobHandler.java new file mode 100644 index 0000000..cc81d32 --- /dev/null +++ b/xxl-job-core/src/main/java/com/xxl/job/core/handler/IJobHandler.java @@ -0,0 +1,49 @@ +package com.xxl.job.core.handler; + +import com.xxl.job.core.biz.model.ReturnT; + +import java.lang.reflect.InvocationTargetException; + +/** + * job handler + * + * @author xuxueli 2015-12-19 19:06:38 + */ +public abstract class IJobHandler { + + + /** success */ + public static final ReturnT<String> SUCCESS = new ReturnT<String>(200, null); + /** fail */ + public static final ReturnT<String> FAIL = new ReturnT<String>(500, null); + /** fail timeout */ + public static final ReturnT<String> FAIL_TIMEOUT = new ReturnT<String>(502, null); + + + /** + * execute handler, invoked when executor receives a scheduling request + * + * @param param + * @return + * @throws Exception + */ + public abstract ReturnT<String> execute(String param) throws Exception; + + + /** + * init handler, invoked when JobThread init + */ + public void init() throws InvocationTargetException, IllegalAccessException { + // do something + } + + + /** + * destroy handler, invoked when JobThread destroy + */ + public void destroy() throws InvocationTargetException, IllegalAccessException { + // do something + } + + +} diff --git a/xxl-job-core/src/main/java/com/xxl/job/core/handler/annotation/JobHandler.java b/xxl-job-core/src/main/java/com/xxl/job/core/handler/annotation/JobHandler.java new file mode 100644 index 0000000..5fdef69 --- /dev/null +++ b/xxl-job-core/src/main/java/com/xxl/job/core/handler/annotation/JobHandler.java @@ -0,0 +1,24 @@ +package com.xxl.job.core.handler.annotation; + +import java.lang.annotation.ElementType; +import java.lang.annotation.Inherited; +import java.lang.annotation.Retention; +import java.lang.annotation.RetentionPolicy; +import java.lang.annotation.Target; + +/** + * annotation for job handler + * + * will be replaced by {@link com.xxl.job.core.handler.annotation.XxlJob} + * + * @author 2016-5-17 21:06:49 + */ +@Target({ElementType.TYPE}) +@Retention(RetentionPolicy.RUNTIME) +@Inherited +@Deprecated +public @interface JobHandler { + + String value(); + +} diff --git a/xxl-job-core/src/main/java/com/xxl/job/core/handler/annotation/XxlJob.java b/xxl-job-core/src/main/java/com/xxl/job/core/handler/annotation/XxlJob.java new file mode 100644 index 0000000..33fb675 --- /dev/null +++ b/xxl-job-core/src/main/java/com/xxl/job/core/handler/annotation/XxlJob.java @@ -0,0 +1,30 @@ +package com.xxl.job.core.handler.annotation; + +import java.lang.annotation.*; + +/** + * annotation for method jobhandler + * + * @author xuxueli 2019-12-11 20:50:13 + */ +@Target({ElementType.METHOD}) +@Retention(RetentionPolicy.RUNTIME) +@Inherited +public @interface XxlJob { + + /** + * jobhandler name + */ + String value(); + + /** + * init handler, invoked when JobThread init + */ + String init() default ""; + + /** + * destroy handler, invoked when JobThread destroy + */ + String destroy() default ""; + +} diff --git a/xxl-job-core/src/main/java/com/xxl/job/core/handler/impl/GlueJobHandler.java b/xxl-job-core/src/main/java/com/xxl/job/core/handler/impl/GlueJobHandler.java new file mode 100644 index 0000000..28c4a91 --- /dev/null +++ b/xxl-job-core/src/main/java/com/xxl/job/core/handler/impl/GlueJobHandler.java @@ -0,0 +1,30 @@ +package com.xxl.job.core.handler.impl; + +import com.xxl.job.core.biz.model.ReturnT; +import com.xxl.job.core.handler.IJobHandler; +import com.xxl.job.core.log.XxlJobLogger; + +/** + * glue job handler + * + * @author xuxueli 2016-5-19 21:05:45 + */ +public class GlueJobHandler extends IJobHandler { + + private long glueUpdatetime; + private IJobHandler jobHandler; + public GlueJobHandler(IJobHandler jobHandler, long glueUpdatetime) { + this.jobHandler = jobHandler; + this.glueUpdatetime = glueUpdatetime; + } + public long getGlueUpdatetime() { + return glueUpdatetime; + } + + @Override + public ReturnT<String> execute(String param) throws Exception { + XxlJobLogger.log("----------- glue.version:"+ glueUpdatetime +" -----------"); + return jobHandler.execute(param); + } + +} diff --git a/xxl-job-core/src/main/java/com/xxl/job/core/handler/impl/MethodJobHandler.java b/xxl-job-core/src/main/java/com/xxl/job/core/handler/impl/MethodJobHandler.java new file mode 100644 index 0000000..620e47b --- /dev/null +++ b/xxl-job-core/src/main/java/com/xxl/job/core/handler/impl/MethodJobHandler.java @@ -0,0 +1,50 @@ +package com.xxl.job.core.handler.impl; + +import com.xxl.job.core.biz.model.ReturnT; +import com.xxl.job.core.handler.IJobHandler; + +import java.lang.reflect.InvocationTargetException; +import java.lang.reflect.Method; + +/** + * @author xuxueli 2019-12-11 21:12:18 + */ +public class MethodJobHandler extends IJobHandler { + + private final Object target; + private final Method method; + private Method initMethod; + private Method destroyMethod; + + public MethodJobHandler(Object target, Method method, Method initMethod, Method destroyMethod) { + this.target = target; + this.method = method; + + this.initMethod =initMethod; + this.destroyMethod =destroyMethod; + } + + @Override + public ReturnT<String> execute(String param) throws Exception { + return (ReturnT<String>) method.invoke(target, new Object[]{param}); + } + + @Override + public void init() throws InvocationTargetException, IllegalAccessException { + if(initMethod != null) { + initMethod.invoke(target); + } + } + + @Override + public void destroy() throws InvocationTargetException, IllegalAccessException { + if(destroyMethod != null) { + destroyMethod.invoke(target); + } + } + + @Override + public String toString() { + return super.toString()+"["+ target.getClass() + "#" + method.getName() +"]"; + } +} diff --git a/xxl-job-core/src/main/java/com/xxl/job/core/handler/impl/ScriptJobHandler.java b/xxl-job-core/src/main/java/com/xxl/job/core/handler/impl/ScriptJobHandler.java new file mode 100644 index 0000000..452c387 --- /dev/null +++ b/xxl-job-core/src/main/java/com/xxl/job/core/handler/impl/ScriptJobHandler.java @@ -0,0 +1,92 @@ +package com.xxl.job.core.handler.impl; + +import com.xxl.job.core.biz.model.ReturnT; +import com.xxl.job.core.glue.GlueTypeEnum; +import com.xxl.job.core.handler.IJobHandler; +import com.xxl.job.core.log.XxlJobFileAppender; +import com.xxl.job.core.log.XxlJobLogger; +import com.xxl.job.core.util.ScriptUtil; +import com.xxl.job.core.util.ShardingUtil; + +import java.io.File; + +/** + * Created by xuxueli on 17/4/27. + */ +public class ScriptJobHandler extends IJobHandler { + + private int jobId; + private long glueUpdatetime; + private String gluesource; + private GlueTypeEnum glueType; + + public ScriptJobHandler(int jobId, long glueUpdatetime, String gluesource, GlueTypeEnum glueType){ + this.jobId = jobId; + this.glueUpdatetime = glueUpdatetime; + this.gluesource = gluesource; + this.glueType = glueType; + + // clean old script file + File glueSrcPath = new File(XxlJobFileAppender.getGlueSrcPath()); + if (glueSrcPath.exists()) { + File[] glueSrcFileList = glueSrcPath.listFiles(); + if (glueSrcFileList!=null && glueSrcFileList.length>0) { + for (File glueSrcFileItem : glueSrcFileList) { + if (glueSrcFileItem.getName().startsWith(String.valueOf(jobId)+"_")) { + glueSrcFileItem.delete(); + } + } + } + } + + } + + public long getGlueUpdatetime() { + return glueUpdatetime; + } + + @Override + public ReturnT<String> execute(String param) throws Exception { + + if (!glueType.isScript()) { + return new ReturnT<String>(IJobHandler.FAIL.getCode(), "glueType["+ glueType +"] invalid."); + } + + // cmd + String cmd = glueType.getCmd(); + + // make script file + String scriptFileName = XxlJobFileAppender.getGlueSrcPath() + .concat(File.separator) + .concat(String.valueOf(jobId)) + .concat("_") + .concat(String.valueOf(glueUpdatetime)) + .concat(glueType.getSuffix()); + File scriptFile = new File(scriptFileName); + if (!scriptFile.exists()) { + ScriptUtil.markScriptFile(scriptFileName, gluesource); + } + + // log file + String logFileName = XxlJobFileAppender.contextHolder.get(); + + // script params:0=param、1=分片序号、2=分片总数 + ShardingUtil.ShardingVO shardingVO = ShardingUtil.getShardingVo(); + String[] scriptParams = new String[3]; + scriptParams[0] = param; + scriptParams[1] = String.valueOf(shardingVO.getIndex()); + scriptParams[2] = String.valueOf(shardingVO.getTotal()); + + // invoke + XxlJobLogger.log("----------- script file:"+ scriptFileName +" -----------"); + int exitValue = ScriptUtil.execToFile(cmd, scriptFileName, logFileName, scriptParams); + + if (exitValue == 0) { + return IJobHandler.SUCCESS; + } else { + return new ReturnT<String>(IJobHandler.FAIL.getCode(), "script exit value("+exitValue+") is failed"); + } + + } + +} diff --git a/xxl-job-core/src/main/java/com/xxl/job/core/log/XxlJobFileAppender.java b/xxl-job-core/src/main/java/com/xxl/job/core/log/XxlJobFileAppender.java new file mode 100644 index 0000000..3fa8c4d --- /dev/null +++ b/xxl-job-core/src/main/java/com/xxl/job/core/log/XxlJobFileAppender.java @@ -0,0 +1,225 @@ +package com.xxl.job.core.log; + +import com.xxl.job.core.biz.model.LogResult; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.*; +import java.text.SimpleDateFormat; +import java.util.Date; + +/** + * store trigger log in each log-file + * @author xuxueli 2016-3-12 19:25:12 + */ +public class XxlJobFileAppender { + private static Logger logger = LoggerFactory.getLogger(XxlJobFileAppender.class); + + // for JobThread (support log for child thread of job handler) + //public static ThreadLocal<String> contextHolder = new ThreadLocal<String>(); + public static final InheritableThreadLocal<String> contextHolder = new InheritableThreadLocal<String>(); + + + /** + * log base path + * + * strut like: + * ---/ + * ---/gluesource/ + * ---/gluesource/10_1514171108000.js + * ---/gluesource/10_1514171108000.js + * ---/2017-12-25/ + * ---/2017-12-25/639.log + * ---/2017-12-25/821.log + * + */ + private static String logBasePath = "/data/applogs/xxl-job/jobhandler"; + private static String glueSrcPath = logBasePath.concat("/gluesource"); + public static void initLogPath(String logPath){ + // init + if (logPath!=null && logPath.trim().length()>0) { + logBasePath = logPath; + } + // mk base dir + File logPathDir = new File(logBasePath); + if (!logPathDir.exists()) { + logPathDir.mkdirs(); + } + logBasePath = logPathDir.getPath(); + + // mk glue dir + File glueBaseDir = new File(logPathDir, "gluesource"); + if (!glueBaseDir.exists()) { + glueBaseDir.mkdirs(); + } + glueSrcPath = glueBaseDir.getPath(); + } + public static String getLogPath() { + return logBasePath; + } + public static String getGlueSrcPath() { + return glueSrcPath; + } + + /** + * log filename, like "logPath/yyyy-MM-dd/9999.log" + * + * @param triggerDate + * @param logId + * @return + */ + public static String makeLogFileName(Date triggerDate, long logId) { + + // filePath/yyyy-MM-dd + SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd"); // avoid concurrent problem, can not be static + File logFilePath = new File(getLogPath(), sdf.format(triggerDate)); + if (!logFilePath.exists()) { + logFilePath.mkdir(); + } + + // filePath/yyyy-MM-dd/9999.log + String logFileName = logFilePath.getPath() + .concat(File.separator) + .concat(String.valueOf(logId)) + .concat(".log"); + return logFileName; + } + + /** + * append log + * + * @param logFileName + * @param appendLog + */ + public static void appendLog(String logFileName, String appendLog) { + + // log file + if (logFileName==null || logFileName.trim().length()==0) { + return; + } + File logFile = new File(logFileName); + + if (!logFile.exists()) { + try { + logFile.createNewFile(); + } catch (IOException e) { + logger.error(e.getMessage(), e); + return; + } + } + + // log + if (appendLog == null) { + appendLog = ""; + } + appendLog += "\r\n"; + + // append file content + FileOutputStream fos = null; + try { + fos = new FileOutputStream(logFile, true); + fos.write(appendLog.getBytes("utf-8")); + fos.flush(); + } catch (Exception e) { + logger.error(e.getMessage(), e); + } finally { + if (fos != null) { + try { + fos.close(); + } catch (IOException e) { + logger.error(e.getMessage(), e); + } + } + } + + } + + /** + * support read log-file + * + * @param logFileName + * @return log content + */ + public static LogResult readLog(String logFileName, int fromLineNum){ + + // valid log file + if (logFileName==null || logFileName.trim().length()==0) { + return new LogResult(fromLineNum, 0, "readLog fail, logFile not found", true); + } + File logFile = new File(logFileName); + + if (!logFile.exists()) { + return new LogResult(fromLineNum, 0, "readLog fail, logFile not exists", true); + } + + // read file + StringBuffer logContentBuffer = new StringBuffer(); + int toLineNum = 0; + LineNumberReader reader = null; + try { + //reader = new LineNumberReader(new FileReader(logFile)); + reader = new LineNumberReader(new InputStreamReader(new FileInputStream(logFile), "utf-8")); + String line = null; + + while ((line = reader.readLine())!=null) { + toLineNum = reader.getLineNumber(); // [from, to], start as 1 + if (toLineNum >= fromLineNum) { + logContentBuffer.append(line).append("\n"); + } + } + } catch (IOException e) { + logger.error(e.getMessage(), e); + } finally { + if (reader != null) { + try { + reader.close(); + } catch (IOException e) { + logger.error(e.getMessage(), e); + } + } + } + + // result + LogResult logResult = new LogResult(fromLineNum, toLineNum, logContentBuffer.toString(), false); + return logResult; + + /* + // it will return the number of characters actually skipped + reader.skip(Long.MAX_VALUE); + int maxLineNum = reader.getLineNumber(); + maxLineNum++; // 最大行号 + */ + } + + /** + * read log data + * @param logFile + * @return log line content + */ + public static String readLines(File logFile){ + BufferedReader reader = null; + try { + reader = new BufferedReader(new InputStreamReader(new FileInputStream(logFile), "utf-8")); + if (reader != null) { + StringBuilder sb = new StringBuilder(); + String line = null; + while ((line = reader.readLine()) != null) { + sb.append(line).append("\n"); + } + return sb.toString(); + } + } catch (IOException e) { + logger.error(e.getMessage(), e); + } finally { + if (reader != null) { + try { + reader.close(); + } catch (IOException e) { + logger.error(e.getMessage(), e); + } + } + } + return null; + } + +} diff --git a/xxl-job-core/src/main/java/com/xxl/job/core/log/XxlJobLogger.java b/xxl-job-core/src/main/java/com/xxl/job/core/log/XxlJobLogger.java new file mode 100644 index 0000000..caf47b9 --- /dev/null +++ b/xxl-job-core/src/main/java/com/xxl/job/core/log/XxlJobLogger.java @@ -0,0 +1,84 @@ +package com.xxl.job.core.log; + +import com.xxl.job.core.util.DateUtil; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.slf4j.helpers.FormattingTuple; +import org.slf4j.helpers.MessageFormatter; + +import java.io.PrintWriter; +import java.io.StringWriter; +import java.util.Date; + +/** + * Created by xuxueli on 17/4/28. + */ +public class XxlJobLogger { + private static Logger logger = LoggerFactory.getLogger("xxl-job logger"); + + /** + * append log + * + * @param callInfo + * @param appendLog + */ + private static void logDetail(StackTraceElement callInfo, String appendLog) { + + + /*// "yyyy-MM-dd HH:mm:ss [ClassName]-[MethodName]-[LineNumber]-[ThreadName] log"; + StackTraceElement[] stackTraceElements = new Throwable().getStackTrace(); + StackTraceElement callInfo = stackTraceElements[1];*/ + + StringBuffer stringBuffer = new StringBuffer(); + stringBuffer.append(DateUtil.formatDateTime(new Date())).append(" ") + .append("["+ callInfo.getClassName() + "#" + callInfo.getMethodName() +"]").append("-") + .append("["+ callInfo.getLineNumber() +"]").append("-") + .append("["+ Thread.currentThread().getName() +"]").append(" ") + .append(appendLog!=null?appendLog:""); + String formatAppendLog = stringBuffer.toString(); + + // appendlog + String logFileName = XxlJobFileAppender.contextHolder.get(); + if (logFileName!=null && logFileName.trim().length()>0) { + XxlJobFileAppender.appendLog(logFileName, formatAppendLog); + } else { + logger.info(">>>>>>>>>>> {}", formatAppendLog); + } + } + + /** + * append log with pattern + * + * @param appendLogPattern like "aaa {} bbb {} ccc" + * @param appendLogArguments like "111, true" + */ + public static void log(String appendLogPattern, Object ... appendLogArguments) { + + FormattingTuple ft = MessageFormatter.arrayFormat(appendLogPattern, appendLogArguments); + String appendLog = ft.getMessage(); + + /*appendLog = appendLogPattern; + if (appendLogArguments!=null && appendLogArguments.length>0) { + appendLog = MessageFormat.format(appendLogPattern, appendLogArguments); + }*/ + + StackTraceElement callInfo = new Throwable().getStackTrace()[1]; + logDetail(callInfo, appendLog); + } + + /** + * append exception stack + * + * @param e + */ + public static void log(Throwable e) { + + StringWriter stringWriter = new StringWriter(); + e.printStackTrace(new PrintWriter(stringWriter)); + String appendLog = stringWriter.toString(); + + StackTraceElement callInfo = new Throwable().getStackTrace()[1]; + logDetail(callInfo, appendLog); + } + +} diff --git a/xxl-job-core/src/main/java/com/xxl/job/core/thread/ExecutorRegistryThread.java b/xxl-job-core/src/main/java/com/xxl/job/core/thread/ExecutorRegistryThread.java new file mode 100644 index 0000000..b7ae944 --- /dev/null +++ b/xxl-job-core/src/main/java/com/xxl/job/core/thread/ExecutorRegistryThread.java @@ -0,0 +1,125 @@ +package com.xxl.job.core.thread; + +import com.xxl.job.core.biz.AdminBiz; +import com.xxl.job.core.biz.model.RegistryParam; +import com.xxl.job.core.biz.model.ReturnT; +import com.xxl.job.core.enums.RegistryConfig; +import com.xxl.job.core.executor.XxlJobExecutor; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.concurrent.TimeUnit; + +/** + * Created by xuxueli on 17/3/2. + */ +public class ExecutorRegistryThread { + private static Logger logger = LoggerFactory.getLogger(ExecutorRegistryThread.class); + + private static ExecutorRegistryThread instance = new ExecutorRegistryThread(); + public static ExecutorRegistryThread getInstance(){ + return instance; + } + + private Thread registryThread; + private volatile boolean toStop = false; + public void start(final String appName, final String address){ + + // valid + if (appName==null || appName.trim().length()==0) { + logger.warn(">>>>>>>>>>> xxl-job, executor registry config fail, appName is null."); + return; + } + if (XxlJobExecutor.getAdminBizList() == null) { + logger.warn(">>>>>>>>>>> xxl-job, executor registry config fail, adminAddresses is null."); + return; + } + + registryThread = new Thread(new Runnable() { + @Override + public void run() { + + // registry + while (!toStop) { + try { + RegistryParam registryParam = new RegistryParam(RegistryConfig.RegistType.EXECUTOR.name(), appName, address); + for (AdminBiz adminBiz: XxlJobExecutor.getAdminBizList()) { + try { + ReturnT<String> registryResult = adminBiz.registry(registryParam); + if (registryResult!=null && ReturnT.SUCCESS_CODE == registryResult.getCode()) { + registryResult = ReturnT.SUCCESS; + logger.debug(">>>>>>>>>>> xxl-job registry success, registryParam:{}, registryResult:{}", new Object[]{registryParam, registryResult}); + break; + } else { + logger.info(">>>>>>>>>>> xxl-job registry fail, registryParam:{}, registryResult:{}", new Object[]{registryParam, registryResult}); + } + } catch (Exception e) { + logger.info(">>>>>>>>>>> xxl-job registry error, registryParam:{}", registryParam, e); + } + + } + } catch (Exception e) { + if (!toStop) { + logger.error(e.getMessage(), e); + } + + } + + try { + if (!toStop) { + TimeUnit.SECONDS.sleep(RegistryConfig.BEAT_TIMEOUT); + } + } catch (InterruptedException e) { + if (!toStop) { + logger.warn(">>>>>>>>>>> xxl-job, executor registry thread interrupted, error msg:{}", e.getMessage()); + } + } + } + + // registry remove + try { + RegistryParam registryParam = new RegistryParam(RegistryConfig.RegistType.EXECUTOR.name(), appName, address); + for (AdminBiz adminBiz: XxlJobExecutor.getAdminBizList()) { + try { + ReturnT<String> registryResult = adminBiz.registryRemove(registryParam); + if (registryResult!=null && ReturnT.SUCCESS_CODE == registryResult.getCode()) { + registryResult = ReturnT.SUCCESS; + logger.info(">>>>>>>>>>> xxl-job registry-remove success, registryParam:{}, registryResult:{}", new Object[]{registryParam, registryResult}); + break; + } else { + logger.info(">>>>>>>>>>> xxl-job registry-remove fail, registryParam:{}, registryResult:{}", new Object[]{registryParam, registryResult}); + } + } catch (Exception e) { + if (!toStop) { + logger.info(">>>>>>>>>>> xxl-job registry-remove error, registryParam:{}", registryParam, e); + } + + } + + } + } catch (Exception e) { + if (!toStop) { + logger.error(e.getMessage(), e); + } + } + logger.info(">>>>>>>>>>> xxl-job, executor registry thread destory."); + + } + }); + registryThread.setDaemon(true); + registryThread.setName("xxl-job, executor ExecutorRegistryThread"); + registryThread.start(); + } + + public void toStop() { + toStop = true; + // interrupt and wait + registryThread.interrupt(); + try { + registryThread.join(); + } catch (InterruptedException e) { + logger.error(e.getMessage(), e); + } + } + +} diff --git a/xxl-job-core/src/main/java/com/xxl/job/core/thread/JobLogFileCleanThread.java b/xxl-job-core/src/main/java/com/xxl/job/core/thread/JobLogFileCleanThread.java new file mode 100644 index 0000000..8b98371 --- /dev/null +++ b/xxl-job-core/src/main/java/com/xxl/job/core/thread/JobLogFileCleanThread.java @@ -0,0 +1,124 @@ +package com.xxl.job.core.thread; + +import com.xxl.job.core.log.XxlJobFileAppender; +import com.xxl.job.core.util.FileUtil; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.File; +import java.text.ParseException; +import java.text.SimpleDateFormat; +import java.util.Calendar; +import java.util.Date; +import java.util.concurrent.TimeUnit; + +/** + * job file clean thread + * + * @author xuxueli 2017-12-29 16:23:43 + */ +public class JobLogFileCleanThread { + private static Logger logger = LoggerFactory.getLogger(JobLogFileCleanThread.class); + + private static JobLogFileCleanThread instance = new JobLogFileCleanThread(); + public static JobLogFileCleanThread getInstance(){ + return instance; + } + + private Thread localThread; + private volatile boolean toStop = false; + public void start(final long logRetentionDays){ + + // limit min value + if (logRetentionDays < 3 ) { + return; + } + + localThread = new Thread(new Runnable() { + @Override + public void run() { + while (!toStop) { + try { + // clean log dir, over logRetentionDays + File[] childDirs = new File(XxlJobFileAppender.getLogPath()).listFiles(); + if (childDirs!=null && childDirs.length>0) { + + // today + Calendar todayCal = Calendar.getInstance(); + todayCal.set(Calendar.HOUR_OF_DAY,0); + todayCal.set(Calendar.MINUTE,0); + todayCal.set(Calendar.SECOND,0); + todayCal.set(Calendar.MILLISECOND,0); + + Date todayDate = todayCal.getTime(); + + for (File childFile: childDirs) { + + // valid + if (!childFile.isDirectory()) { + continue; + } + if (childFile.getName().indexOf("-") == -1) { + continue; + } + + // file create date + Date logFileCreateDate = null; + try { + SimpleDateFormat simpleDateFormat = new SimpleDateFormat("yyyy-MM-dd"); + logFileCreateDate = simpleDateFormat.parse(childFile.getName()); + } catch (ParseException e) { + logger.error(e.getMessage(), e); + } + if (logFileCreateDate == null) { + continue; + } + + if ((todayDate.getTime()-logFileCreateDate.getTime()) >= logRetentionDays * (24 * 60 * 60 * 1000) ) { + FileUtil.deleteRecursively(childFile); + } + + } + } + + } catch (Exception e) { + if (!toStop) { + logger.error(e.getMessage(), e); + } + + } + + try { + TimeUnit.DAYS.sleep(1); + } catch (InterruptedException e) { + if (!toStop) { + logger.error(e.getMessage(), e); + } + } + } + logger.info(">>>>>>>>>>> xxl-job, executor JobLogFileCleanThread thread destory."); + + } + }); + localThread.setDaemon(true); + localThread.setName("xxl-job, executor JobLogFileCleanThread"); + localThread.start(); + } + + public void toStop() { + toStop = true; + + if (localThread == null) { + return; + } + + // interrupt and wait + localThread.interrupt(); + try { + localThread.join(); + } catch (InterruptedException e) { + logger.error(e.getMessage(), e); + } + } + +} diff --git a/xxl-job-core/src/main/java/com/xxl/job/core/thread/JobThread.java b/xxl-job-core/src/main/java/com/xxl/job/core/thread/JobThread.java new file mode 100644 index 0000000..126fd14 --- /dev/null +++ b/xxl-job-core/src/main/java/com/xxl/job/core/thread/JobThread.java @@ -0,0 +1,218 @@ +package com.xxl.job.core.thread; + +import com.xxl.job.core.biz.model.HandleCallbackParam; +import com.xxl.job.core.biz.model.ReturnT; +import com.xxl.job.core.biz.model.TriggerParam; +import com.xxl.job.core.executor.XxlJobExecutor; +import com.xxl.job.core.handler.IJobHandler; +import com.xxl.job.core.log.XxlJobFileAppender; +import com.xxl.job.core.log.XxlJobLogger; +import com.xxl.job.core.util.ShardingUtil; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.PrintWriter; +import java.io.StringWriter; +import java.util.Collections; +import java.util.Date; +import java.util.HashSet; +import java.util.Set; +import java.util.concurrent.*; + + +/** + * handler thread + * @author xuxueli 2016-1-16 19:52:47 + */ +public class JobThread extends Thread{ + private static Logger logger = LoggerFactory.getLogger(JobThread.class); + + private int jobId; + private IJobHandler handler; + private LinkedBlockingQueue<TriggerParam> triggerQueue; + private Set<Long> triggerLogIdSet; // avoid repeat trigger for the same TRIGGER_LOG_ID + + private volatile boolean toStop = false; + private String stopReason; + + private boolean running = false; // if running job + private int idleTimes = 0; // idel times + + + public JobThread(int jobId, IJobHandler handler) { + this.jobId = jobId; + this.handler = handler; + this.triggerQueue = new LinkedBlockingQueue<TriggerParam>(); + this.triggerLogIdSet = Collections.synchronizedSet(new HashSet<Long>()); + } + public IJobHandler getHandler() { + return handler; + } + + /** + * new trigger to queue + * + * @param triggerParam + * @return + */ + public ReturnT<String> pushTriggerQueue(TriggerParam triggerParam) { + // avoid repeat + if (triggerLogIdSet.contains(triggerParam.getLogId())) { + logger.info(">>>>>>>>>>> repeate trigger job, logId:{}", triggerParam.getLogId()); + return new ReturnT<String>(ReturnT.FAIL_CODE, "repeate trigger job, logId:" + triggerParam.getLogId()); + } + + triggerLogIdSet.add(triggerParam.getLogId()); + triggerQueue.add(triggerParam); + return ReturnT.SUCCESS; + } + + /** + * kill job thread + * + * @param stopReason + */ + public void toStop(String stopReason) { + /** + * Thread.interrupt只支持终止线程的阻塞状态(wait、join、sleep), + * 在阻塞出抛出InterruptedException异常,但是并不会终止运行的线程本身; + * 所以需要注意,此处彻底销毁本线程,需要通过共享变量方式; + */ + this.toStop = true; + this.stopReason = stopReason; + } + + /** + * is running job + * @return + */ + public boolean isRunningOrHasQueue() { + return running || triggerQueue.size()>0; + } + + @Override + public void run() { + + // init + try { + handler.init(); + } catch (Throwable e) { + logger.error(e.getMessage(), e); + } + + // execute + while(!toStop){ + running = false; + idleTimes++; + + TriggerParam triggerParam = null; + ReturnT<String> executeResult = null; + try { + // to check toStop signal, we need cycle, so wo cannot use queue.take(), instand of poll(timeout) + triggerParam = triggerQueue.poll(3L, TimeUnit.SECONDS); + if (triggerParam!=null) { + running = true; + idleTimes = 0; + triggerLogIdSet.remove(triggerParam.getLogId()); + + // log filename, like "logPath/yyyy-MM-dd/9999.log" + String logFileName = XxlJobFileAppender.makeLogFileName(new Date(triggerParam.getLogDateTime()), triggerParam.getLogId()); + XxlJobFileAppender.contextHolder.set(logFileName); + ShardingUtil.setShardingVo(new ShardingUtil.ShardingVO(triggerParam.getBroadcastIndex(), triggerParam.getBroadcastTotal())); + + // execute + XxlJobLogger.log("<br>----------- xxl-job job execute start -----------<br>----------- Param:" + triggerParam.getExecutorParams()); + + if (triggerParam.getExecutorTimeout() > 0) { + // limit timeout + Thread futureThread = null; + try { + final TriggerParam triggerParamTmp = triggerParam; + FutureTask<ReturnT<String>> futureTask = new FutureTask<ReturnT<String>>(new Callable<ReturnT<String>>() { + @Override + public ReturnT<String> call() throws Exception { + return handler.execute(triggerParamTmp.getExecutorParams()); + } + }); + futureThread = new Thread(futureTask); + futureThread.start(); + + executeResult = futureTask.get(triggerParam.getExecutorTimeout(), TimeUnit.SECONDS); + } catch (TimeoutException e) { + + XxlJobLogger.log("<br>----------- xxl-job job execute timeout"); + XxlJobLogger.log(e); + + executeResult = new ReturnT<String>(IJobHandler.FAIL_TIMEOUT.getCode(), "job execute timeout "); + } finally { + futureThread.interrupt(); + } + } else { + // just execute + executeResult = handler.execute(triggerParam.getExecutorParams()); + } + + if (executeResult == null) { + executeResult = IJobHandler.FAIL; + } else { + executeResult.setMsg( + (executeResult!=null&&executeResult.getMsg()!=null&&executeResult.getMsg().length()>50000) + ?executeResult.getMsg().substring(0, 50000).concat("...") + :executeResult.getMsg()); + executeResult.setContent(null); // limit obj size + } + XxlJobLogger.log("<br>----------- xxl-job job execute end(finish) -----------<br>----------- ReturnT:" + executeResult); + + } else { + if (idleTimes > 30) { + if(triggerQueue.size() == 0) { // avoid concurrent trigger causes jobId-lost + XxlJobExecutor.removeJobThread(jobId, "excutor idel times over limit."); + } + } + } + } catch (Throwable e) { + if (toStop) { + XxlJobLogger.log("<br>----------- JobThread toStop, stopReason:" + stopReason); + } + + StringWriter stringWriter = new StringWriter(); + e.printStackTrace(new PrintWriter(stringWriter)); + String errorMsg = stringWriter.toString(); + executeResult = new ReturnT<String>(ReturnT.FAIL_CODE, errorMsg); + + XxlJobLogger.log("<br>----------- JobThread Exception:" + errorMsg + "<br>----------- xxl-job job execute end(error) -----------"); + } finally { + if(triggerParam != null) { + // callback handler info + if (!toStop) { + // commonm + TriggerCallbackThread.pushCallBack(new HandleCallbackParam(triggerParam.getLogId(), triggerParam.getLogDateTime(), executeResult)); + } else { + // is killed + ReturnT<String> stopResult = new ReturnT<String>(ReturnT.FAIL_CODE, stopReason + " [job running, killed]"); + TriggerCallbackThread.pushCallBack(new HandleCallbackParam(triggerParam.getLogId(), triggerParam.getLogDateTime(), stopResult)); + } + } + } + } + + // callback trigger request in queue + while(triggerQueue !=null && triggerQueue.size()>0){ + TriggerParam triggerParam = triggerQueue.poll(); + if (triggerParam!=null) { + // is killed + ReturnT<String> stopResult = new ReturnT<String>(ReturnT.FAIL_CODE, stopReason + " [job not executed, in the job queue, killed.]"); + TriggerCallbackThread.pushCallBack(new HandleCallbackParam(triggerParam.getLogId(), triggerParam.getLogDateTime(), stopResult)); + } + } + + // destroy + try { + handler.destroy(); + } catch (Throwable e) { + logger.error(e.getMessage(), e); + } + + logger.info(">>>>>>>>>>> xxl-job JobThread stoped, hashCode:{}", Thread.currentThread()); + } +} diff --git a/xxl-job-core/src/main/java/com/xxl/job/core/thread/TriggerCallbackThread.java b/xxl-job-core/src/main/java/com/xxl/job/core/thread/TriggerCallbackThread.java new file mode 100644 index 0000000..d4f784e --- /dev/null +++ b/xxl-job-core/src/main/java/com/xxl/job/core/thread/TriggerCallbackThread.java @@ -0,0 +1,246 @@ +package com.xxl.job.core.thread; + +import com.xxl.job.core.biz.AdminBiz; +import com.xxl.job.core.biz.model.HandleCallbackParam; +import com.xxl.job.core.biz.model.ReturnT; +import com.xxl.job.core.enums.RegistryConfig; +import com.xxl.job.core.executor.XxlJobExecutor; +import com.xxl.job.core.log.XxlJobFileAppender; +import com.xxl.job.core.log.XxlJobLogger; +import com.xxl.job.core.util.FileUtil; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.File; +import java.util.ArrayList; +import java.util.Date; +import java.util.List; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.TimeUnit; + +/** + * Created by xuxueli on 16/7/22. + */ +public class TriggerCallbackThread { + private static Logger logger = LoggerFactory.getLogger(TriggerCallbackThread.class); + + private static TriggerCallbackThread instance = new TriggerCallbackThread(); + public static TriggerCallbackThread getInstance(){ + return instance; + } + + /** + * job results callback queue + */ + private LinkedBlockingQueue<HandleCallbackParam> callBackQueue = new LinkedBlockingQueue<HandleCallbackParam>(); + public static void pushCallBack(HandleCallbackParam callback){ + getInstance().callBackQueue.add(callback); + logger.debug(">>>>>>>>>>> xxl-job, push callback request, logId:{}", callback.getLogId()); + } + + /** + * callback thread + */ + private Thread triggerCallbackThread; + private Thread triggerRetryCallbackThread; + private volatile boolean toStop = false; + public void start() { + + // valid + if (XxlJobExecutor.getAdminBizList() == null) { + logger.warn(">>>>>>>>>>> xxl-job, executor callback config fail, adminAddresses is null."); + return; + } + + // callback + triggerCallbackThread = new Thread(new Runnable() { + + @Override + public void run() { + + // normal callback + while(!toStop){ + try { + HandleCallbackParam callback = getInstance().callBackQueue.take(); + if (callback != null) { + + // callback list param + List<HandleCallbackParam> callbackParamList = new ArrayList<HandleCallbackParam>(); + int drainToNum = getInstance().callBackQueue.drainTo(callbackParamList); + callbackParamList.add(callback); + + // callback, will retry if error + if (callbackParamList!=null && callbackParamList.size()>0) { + doCallback(callbackParamList); + } + } + } catch (Exception e) { + if (!toStop) { + logger.error(e.getMessage(), e); + } + } + } + + // last callback + try { + List<HandleCallbackParam> callbackParamList = new ArrayList<HandleCallbackParam>(); + int drainToNum = getInstance().callBackQueue.drainTo(callbackParamList); + if (callbackParamList!=null && callbackParamList.size()>0) { + doCallback(callbackParamList); + } + } catch (Exception e) { + if (!toStop) { + logger.error(e.getMessage(), e); + } + } + logger.info(">>>>>>>>>>> xxl-job, executor callback thread destory."); + + } + }); + triggerCallbackThread.setDaemon(true); + triggerCallbackThread.setName("xxl-job, executor TriggerCallbackThread"); + triggerCallbackThread.start(); + + + // retry + triggerRetryCallbackThread = new Thread(new Runnable() { + @Override + public void run() { + while(!toStop){ + try { + retryFailCallbackFile(); + } catch (Exception e) { + if (!toStop) { + logger.error(e.getMessage(), e); + } + + } + try { + TimeUnit.SECONDS.sleep(RegistryConfig.BEAT_TIMEOUT); + } catch (InterruptedException e) { + if (!toStop) { + logger.error(e.getMessage(), e); + } + } + } + logger.info(">>>>>>>>>>> xxl-job, executor retry callback thread destory."); + } + }); + triggerRetryCallbackThread.setDaemon(true); + triggerRetryCallbackThread.start(); + + } + public void toStop(){ + toStop = true; + // stop callback, interrupt and wait + if (triggerCallbackThread != null) { // support empty admin address + triggerCallbackThread.interrupt(); + try { + triggerCallbackThread.join(); + } catch (InterruptedException e) { + logger.error(e.getMessage(), e); + } + } + + // stop retry, interrupt and wait + if (triggerRetryCallbackThread != null) { + triggerRetryCallbackThread.interrupt(); + try { + triggerRetryCallbackThread.join(); + } catch (InterruptedException e) { + logger.error(e.getMessage(), e); + } + } + + } + + /** + * do callback, will retry if error + * @param callbackParamList + */ + private void doCallback(List<HandleCallbackParam> callbackParamList){ + boolean callbackRet = false; + // callback, will retry if error + for (AdminBiz adminBiz: XxlJobExecutor.getAdminBizList()) { + try { + ReturnT<String> callbackResult = adminBiz.callback(callbackParamList); + if (callbackResult!=null && ReturnT.SUCCESS_CODE == callbackResult.getCode()) { + callbackLog(callbackParamList, "<br>----------- xxl-job job callback finish."); + callbackRet = true; + break; + } else { + callbackLog(callbackParamList, "<br>----------- xxl-job job callback fail, callbackResult:" + callbackResult); + } + } catch (Exception e) { + callbackLog(callbackParamList, "<br>----------- xxl-job job callback error, errorMsg:" + e.getMessage()); + } + } + if (!callbackRet) { + appendFailCallbackFile(callbackParamList); + } + } + + /** + * callback log + */ + private void callbackLog(List<HandleCallbackParam> callbackParamList, String logContent){ + for (HandleCallbackParam callbackParam: callbackParamList) { + String logFileName = XxlJobFileAppender.makeLogFileName(new Date(callbackParam.getLogDateTim()), callbackParam.getLogId()); + XxlJobFileAppender.contextHolder.set(logFileName); + XxlJobLogger.log(logContent); + } + } + + + // ---------------------- fail-callback file ---------------------- + + private static String failCallbackFilePath = XxlJobFileAppender.getLogPath().concat(File.separator).concat("callbacklog").concat(File.separator); + private static String failCallbackFileName = failCallbackFilePath.concat("xxl-job-callback-{x}").concat(".log"); + + private void appendFailCallbackFile(List<HandleCallbackParam> callbackParamList){ + // valid + if (callbackParamList==null || callbackParamList.size()==0) { + return; + } + + // append file + byte[] callbackParamList_bytes = XxlJobExecutor.getSerializer().serialize(callbackParamList); + + File callbackLogFile = new File(failCallbackFileName.replace("{x}", String.valueOf(System.currentTimeMillis()))); + if (callbackLogFile.exists()) { + for (int i = 0; i < 100; i++) { + callbackLogFile = new File(failCallbackFileName.replace("{x}", String.valueOf(System.currentTimeMillis()).concat("-").concat(String.valueOf(i)) )); + if (!callbackLogFile.exists()) { + break; + } + } + } + FileUtil.writeFileContent(callbackLogFile, callbackParamList_bytes); + } + + private void retryFailCallbackFile(){ + + // valid + File callbackLogPath = new File(failCallbackFilePath); + if (!callbackLogPath.exists()) { + return; + } + if (callbackLogPath.isFile()) { + callbackLogPath.delete(); + } + if (!(callbackLogPath.isDirectory() && callbackLogPath.list()!=null && callbackLogPath.list().length>0)) { + return; + } + + // load and clear file, retry + for (File callbaclLogFile: callbackLogPath.listFiles()) { + byte[] callbackParamList_bytes = FileUtil.readFileContent(callbaclLogFile); + List<HandleCallbackParam> callbackParamList = (List<HandleCallbackParam>) XxlJobExecutor.getSerializer().deserialize(callbackParamList_bytes, HandleCallbackParam.class); + + callbaclLogFile.delete(); + doCallback(callbackParamList); + } + + } + +} diff --git a/xxl-job-core/src/main/java/com/xxl/job/core/util/DateUtil.java b/xxl-job-core/src/main/java/com/xxl/job/core/util/DateUtil.java new file mode 100644 index 0000000..12fab17 --- /dev/null +++ b/xxl-job-core/src/main/java/com/xxl/job/core/util/DateUtil.java @@ -0,0 +1,148 @@ +package com.xxl.job.core.util; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.text.DateFormat; +import java.text.ParseException; +import java.text.SimpleDateFormat; +import java.util.Calendar; +import java.util.Date; +import java.util.HashMap; +import java.util.Map; + +/** + * date util + * + * @author xuxueli 2018-08-19 01:24:11 + */ +public class DateUtil { + + // ---------------------- format parse ---------------------- + private static Logger logger = LoggerFactory.getLogger(DateUtil.class); + + private static final String DATE_FORMAT = "yyyy-MM-dd"; + private static final String DATETIME_FORMAT = "yyyy-MM-dd HH:mm:ss"; + + private static final ThreadLocal<Map<String, DateFormat>> dateFormatThreadLocal = new ThreadLocal<Map<String, DateFormat>>(); + private static DateFormat getDateFormat(String pattern) { + if (pattern==null || pattern.trim().length()==0) { + throw new IllegalArgumentException("pattern cannot be empty."); + } + + Map<String, DateFormat> dateFormatMap = dateFormatThreadLocal.get(); + if(dateFormatMap!=null && dateFormatMap.containsKey(pattern)){ + return dateFormatMap.get(pattern); + } + + synchronized (dateFormatThreadLocal) { + if (dateFormatMap == null) { + dateFormatMap = new HashMap<String, DateFormat>(); + } + dateFormatMap.put(pattern, new SimpleDateFormat(pattern)); + dateFormatThreadLocal.set(dateFormatMap); + } + + return dateFormatMap.get(pattern); + } + + /** + * format datetime. like "yyyy-MM-dd" + * + * @param date + * @return + * @throws ParseException + */ + public static String formatDate(Date date) { + return format(date, DATE_FORMAT); + } + + /** + * format date. like "yyyy-MM-dd HH:mm:ss" + * + * @param date + * @return + * @throws ParseException + */ + public static String formatDateTime(Date date) { + return format(date, DATETIME_FORMAT); + } + + /** + * format date + * + * @param date + * @param patten + * @return + * @throws ParseException + */ + public static String format(Date date, String patten) { + return getDateFormat(patten).format(date); + } + + /** + * parse date string, like "yyyy-MM-dd HH:mm:s" + * + * @param dateString + * @return + * @throws ParseException + */ + public static Date parseDate(String dateString){ + return parse(dateString, DATE_FORMAT); + } + + /** + * parse datetime string, like "yyyy-MM-dd HH:mm:ss" + * + * @param dateString + * @return + * @throws ParseException + */ + public static Date parseDateTime(String dateString) { + return parse(dateString, DATETIME_FORMAT); + } + + /** + * parse date + * + * @param dateString + * @param pattern + * @return + * @throws ParseException + */ + public static Date parse(String dateString, String pattern) { + try { + Date date = getDateFormat(pattern).parse(dateString); + return date; + } catch (Exception e) { + logger.warn("parse date error, dateString = {}, pattern={}; errorMsg = {}", dateString, pattern, e.getMessage()); + return null; + } + } + + + // ---------------------- add date ---------------------- + + public static Date addYears(final Date date, final int amount) { + return add(date, Calendar.YEAR, amount); + } + + public static Date addMonths(final Date date, final int amount) { + return add(date, Calendar.MONTH, amount); + } + + public static Date addDays(final Date date, final int amount) { + return add(date, Calendar.DAY_OF_MONTH, amount); + } + + private static Date add(final Date date, final int calendarField, final int amount) { + if (date == null) { + return null; + } + final Calendar c = Calendar.getInstance(); + c.setTime(date); + c.add(calendarField, amount); + return c.getTime(); + } + +}
\ No newline at end of file diff --git a/xxl-job-core/src/main/java/com/xxl/job/core/util/FileUtil.java b/xxl-job-core/src/main/java/com/xxl/job/core/util/FileUtil.java new file mode 100644 index 0000000..9d27af7 --- /dev/null +++ b/xxl-job-core/src/main/java/com/xxl/job/core/util/FileUtil.java @@ -0,0 +1,179 @@ +package com.xxl.job.core.util; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.File; +import java.io.FileInputStream; +import java.io.FileOutputStream; +import java.io.IOException; + +/** + * file tool + * + * @author xuxueli 2017-12-29 17:56:48 + */ +public class FileUtil { + private static Logger logger = LoggerFactory.getLogger(FileUtil.class); + + + /** + * delete recursively + * + * @param root + * @return + */ + public static boolean deleteRecursively(File root) { + if (root != null && root.exists()) { + if (root.isDirectory()) { + File[] children = root.listFiles(); + if (children != null) { + for (File child : children) { + deleteRecursively(child); + } + } + } + return root.delete(); + } + return false; + } + + + public static void deleteFile(String fileName) { + // file + File file = new File(fileName); + if (file.exists()) { + file.delete(); + } + } + + + public static void writeFileContent(File file, byte[] data) { + + // file + if (!file.exists()) { + file.getParentFile().mkdirs(); + } + + // append file content + FileOutputStream fos = null; + try { + fos = new FileOutputStream(file); + fos.write(data); + fos.flush(); + } catch (Exception e) { + logger.error(e.getMessage(), e); + } finally { + if (fos != null) { + try { + fos.close(); + } catch (IOException e) { + logger.error(e.getMessage(), e); + } + } + } + + } + + public static byte[] readFileContent(File file) { + Long filelength = file.length(); + byte[] filecontent = new byte[filelength.intValue()]; + + FileInputStream in = null; + try { + in = new FileInputStream(file); + in.read(filecontent); + in.close(); + } catch (Exception e) { + logger.error(e.getMessage(), e); + } finally { + if (in != null) { + try { + in.close(); + } catch (IOException e) { + logger.error(e.getMessage(), e); + } + } + } + return filecontent; + } + + + /*public static void appendFileLine(String fileName, String content) { + + // file + File file = new File(fileName); + if (!file.exists()) { + try { + file.createNewFile(); + } catch (IOException e) { + logger.error(e.getMessage(), e); + return; + } + } + + // content + if (content == null) { + content = ""; + } + content += "\r\n"; + + // append file content + FileOutputStream fos = null; + try { + fos = new FileOutputStream(file, true); + fos.write(content.getBytes("utf-8")); + fos.flush(); + } catch (Exception e) { + logger.error(e.getMessage(), e); + } finally { + if (fos != null) { + try { + fos.close(); + } catch (IOException e) { + logger.error(e.getMessage(), e); + } + } + } + + } + + public static List<String> loadFileLines(String fileName){ + + List<String> result = new ArrayList<>(); + + // valid log file + File file = new File(fileName); + if (!file.exists()) { + return result; + } + + // read file + StringBuffer logContentBuffer = new StringBuffer(); + int toLineNum = 0; + LineNumberReader reader = null; + try { + //reader = new LineNumberReader(new FileReader(logFile)); + reader = new LineNumberReader(new InputStreamReader(new FileInputStream(file), "utf-8")); + String line = null; + while ((line = reader.readLine())!=null) { + if (line!=null && line.trim().length()>0) { + result.add(line); + } + } + } catch (IOException e) { + logger.error(e.getMessage(), e); + } finally { + if (reader != null) { + try { + reader.close(); + } catch (IOException e) { + logger.error(e.getMessage(), e); + } + } + } + + return result; + }*/ + +} diff --git a/xxl-job-core/src/main/java/com/xxl/job/core/util/ScriptUtil.java b/xxl-job-core/src/main/java/com/xxl/job/core/util/ScriptUtil.java new file mode 100644 index 0000000..9fd0b4d --- /dev/null +++ b/xxl-job-core/src/main/java/com/xxl/job/core/util/ScriptUtil.java @@ -0,0 +1,228 @@ +package com.xxl.job.core.util; + +import com.xxl.job.core.log.XxlJobLogger; + +import java.io.FileOutputStream; +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; +import java.util.ArrayList; +import java.util.List; + +/** + * 1、内嵌编译器如"PythonInterpreter"无法引用扩展包,因此推荐使用java调用控制台进程方式"Runtime.getRuntime().exec()"来运行脚本(shell或python); + * 2、因为通过java调用控制台进程方式实现,需要保证目标机器PATH路径正确配置对应编译器; + * 3、暂时脚本执行日志只能在脚本执行结束后一次性获取,无法保证实时性;因此为确保日志实时性,可改为将脚本打印的日志存储在指定的日志文件上; + * 4、python 异常输出优先级高于标准输出,体现在Log文件中,因此推荐通过logging方式打日志保持和异常信息一致;否则用prinf日志顺序会错乱 + * + * Created by xuxueli on 17/2/25. + */ +public class ScriptUtil { + + /** + * make script file + * + * @param scriptFileName + * @param content + * @throws IOException + */ + public static void markScriptFile(String scriptFileName, String content) throws IOException { + // make file, filePath/gluesource/666-123456789.py + FileOutputStream fileOutputStream = null; + try { + fileOutputStream = new FileOutputStream(scriptFileName); + fileOutputStream.write(content.getBytes("UTF-8")); + fileOutputStream.close(); + } catch (Exception e) { + throw e; + }finally{ + if(fileOutputStream != null){ + fileOutputStream.close(); + } + } + } + + /** + * 脚本执行,日志文件实时输出 + * + * @param command + * @param scriptFile + * @param logFile + * @param params + * @return + * @throws IOException + */ + public static int execToFile(String command, String scriptFile, String logFile, String... params) throws IOException { + + FileOutputStream fileOutputStream = null; + Thread inputThread = null; + Thread errThread = null; + try { + // file + fileOutputStream = new FileOutputStream(logFile, true); + + // command + List<String> cmdarray = new ArrayList<>(); + cmdarray.add(command); + cmdarray.add(scriptFile); + if (params!=null && params.length>0) { + for (String param:params) { + cmdarray.add(param); + } + } + String[] cmdarrayFinal = cmdarray.toArray(new String[cmdarray.size()]); + + // process-exec + final Process process = Runtime.getRuntime().exec(cmdarrayFinal); + + // log-thread + final FileOutputStream finalFileOutputStream = fileOutputStream; + inputThread = new Thread(new Runnable() { + @Override + public void run() { + try { + copy(process.getInputStream(), finalFileOutputStream, new byte[1024]); + } catch (IOException e) { + XxlJobLogger.log(e); + } + } + }); + errThread = new Thread(new Runnable() { + @Override + public void run() { + try { + copy(process.getErrorStream(), finalFileOutputStream, new byte[1024]); + } catch (IOException e) { + XxlJobLogger.log(e); + } + } + }); + inputThread.start(); + errThread.start(); + + // process-wait + int exitValue = process.waitFor(); // exit code: 0=success, 1=error + + // log-thread join + inputThread.join(); + errThread.join(); + + return exitValue; + } catch (Exception e) { + XxlJobLogger.log(e); + return -1; + } finally { + if (fileOutputStream != null) { + try { + fileOutputStream.close(); + } catch (IOException e) { + XxlJobLogger.log(e); + } + + } + if (inputThread != null && inputThread.isAlive()) { + inputThread.interrupt(); + } + if (errThread != null && errThread.isAlive()) { + errThread.interrupt(); + } + } + } + + /** + * 数据流Copy(Input自动关闭,Output不处理) + * + * @param inputStream + * @param outputStream + * @param buffer + * @return + * @throws IOException + */ + private static long copy(InputStream inputStream, OutputStream outputStream, byte[] buffer) throws IOException { + try { + long total = 0; + for (;;) { + int res = inputStream.read(buffer); + if (res == -1) { + break; + } + if (res > 0) { + total += res; + if (outputStream != null) { + outputStream.write(buffer, 0, res); + } + } + } + outputStream.flush(); + //out = null; + inputStream.close(); + inputStream = null; + return total; + } finally { + if (inputStream != null) { + inputStream.close(); + } + } + } + + /** + * 脚本执行,日志文件实时输出 + * + * 优点:支持将目标数据实时输出到指定日志文件中去 + * 缺点: + * 标准输出和错误输出优先级固定,可能和脚本中顺序不一致 + * Java无法实时获取 + * + * <!-- commons-exec --> + * <dependency> + * <groupId>org.apache.commons</groupId> + * <artifactId>commons-exec</artifactId> + * <version>${commons-exec.version}</version> + * </dependency> + * + * @param command + * @param scriptFile + * @param logFile + * @param params + * @return + * @throws IOException + */ + /*public static int execToFileB(String command, String scriptFile, String logFile, String... params) throws IOException { + // 标准输出:print (null if watchdog timeout) + // 错误输出:logging + 异常 (still exists if watchdog timeout) + // 标准输入 + + FileOutputStream fileOutputStream = null; // + try { + fileOutputStream = new FileOutputStream(logFile, true); + PumpStreamHandler streamHandler = new PumpStreamHandler(fileOutputStream, fileOutputStream, null); + + // command + CommandLine commandline = new CommandLine(command); + commandline.addArgument(scriptFile); + if (params!=null && params.length>0) { + commandline.addArguments(params); + } + + // exec + DefaultExecutor exec = new DefaultExecutor(); + exec.setExitValues(null); + exec.setStreamHandler(streamHandler); + int exitValue = exec.execute(commandline); // exit code: 0=success, 1=error + return exitValue; + } catch (Exception e) { + XxlJobLogger.log(e); + return -1; + } finally { + if (fileOutputStream != null) { + try { + fileOutputStream.close(); + } catch (IOException e) { + XxlJobLogger.log(e); + } + + } + } + }*/ + +} diff --git a/xxl-job-core/src/main/java/com/xxl/job/core/util/ShardingUtil.java b/xxl-job-core/src/main/java/com/xxl/job/core/util/ShardingUtil.java new file mode 100644 index 0000000..cfd35b9 --- /dev/null +++ b/xxl-job-core/src/main/java/com/xxl/job/core/util/ShardingUtil.java @@ -0,0 +1,46 @@ +package com.xxl.job.core.util; + +/** + * sharding vo + * @author xuxueli 2017-07-25 21:26:38 + */ +public class ShardingUtil { + + private static InheritableThreadLocal<ShardingVO> contextHolder = new InheritableThreadLocal<ShardingVO>(); + + public static class ShardingVO { + + private int index; // sharding index + private int total; // sharding total + + public ShardingVO(int index, int total) { + this.index = index; + this.total = total; + } + + public int getIndex() { + return index; + } + + public void setIndex(int index) { + this.index = index; + } + + public int getTotal() { + return total; + } + + public void setTotal(int total) { + this.total = total; + } + } + + public static void setShardingVo(ShardingVO shardingVo){ + contextHolder.set(shardingVo); + } + + public static ShardingVO getShardingVo(){ + return contextHolder.get(); + } + +} diff --git a/xxl-job-core/src/main/java/com/xxl/job/core/util/XxlJobRemotingUtil.java b/xxl-job-core/src/main/java/com/xxl/job/core/util/XxlJobRemotingUtil.java new file mode 100644 index 0000000..f853c94 --- /dev/null +++ b/xxl-job-core/src/main/java/com/xxl/job/core/util/XxlJobRemotingUtil.java @@ -0,0 +1,162 @@ +package com.xxl.job.core.util; + +import com.xxl.job.core.biz.model.ReturnT; +import com.xxl.registry.client.util.json.BasicJson; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.net.ssl.*; +import java.io.BufferedReader; +import java.io.DataOutputStream; +import java.io.InputStreamReader; +import java.net.HttpURLConnection; +import java.net.URL; +import java.security.cert.CertificateException; +import java.security.cert.X509Certificate; +import java.util.Map; + +/** + * @author xuxueli 2018-11-25 00:55:31 + */ +public class XxlJobRemotingUtil { + private static Logger logger = LoggerFactory.getLogger(XxlJobRemotingUtil.class); + public static String XXL_RPC_ACCESS_TOKEN = "XXL-RPC-ACCESS-TOKEN"; + + + // trust-https start + private static void trustAllHosts(HttpsURLConnection connection) { + try { + SSLContext sc = SSLContext.getInstance("TLS"); + sc.init(null, trustAllCerts, new java.security.SecureRandom()); + SSLSocketFactory newFactory = sc.getSocketFactory(); + + connection.setSSLSocketFactory(newFactory); + } catch (Exception e) { + logger.error(e.getMessage(), e); + } + connection.setHostnameVerifier(new HostnameVerifier() { + public boolean verify(String hostname, SSLSession session) { + return true; + } + }); + } + private static final TrustManager[] trustAllCerts = new TrustManager[]{new X509TrustManager() { + public java.security.cert.X509Certificate[] getAcceptedIssuers() { + return new java.security.cert.X509Certificate[]{}; + } + public void checkClientTrusted(X509Certificate[] chain, String authType) throws CertificateException { + } + public void checkServerTrusted(X509Certificate[] chain, String authType) throws CertificateException { + } + }}; + // trust-https end + + + /** + * post + * + * @param url + * @param accessToken + * @param requestObj + * @return + */ + public static ReturnT<String> postBody(String url, String accessToken, Object requestObj, int timeout) { + HttpURLConnection connection = null; + BufferedReader bufferedReader = null; + try { + // connection + URL realUrl = new URL(url); + connection = (HttpURLConnection) realUrl.openConnection(); + + // trust-https + boolean useHttps = url.startsWith("https"); + if (useHttps) { + HttpsURLConnection https = (HttpsURLConnection) connection; + trustAllHosts(https); + } + + // connection setting + connection.setRequestMethod("POST"); + connection.setDoOutput(true); + connection.setDoInput(true); + connection.setUseCaches(false); + connection.setReadTimeout(timeout * 1000); + connection.setConnectTimeout(3 * 1000); + connection.setRequestProperty("connection", "Keep-Alive"); + connection.setRequestProperty("Content-Type", "application/json;charset=UTF-8"); + connection.setRequestProperty("Accept-Charset", "application/json;charset=UTF-8"); + + if(accessToken!=null && accessToken.trim().length()>0){ + connection.setRequestProperty(XXL_RPC_ACCESS_TOKEN, accessToken); + } + + // do connection + connection.connect(); + + // write requestBody + String requestBody = BasicJson.toJson(requestObj); + + DataOutputStream dataOutputStream = new DataOutputStream(connection.getOutputStream()); + dataOutputStream.write(requestBody.getBytes("UTF-8")); + dataOutputStream.flush(); + dataOutputStream.close(); + + /*byte[] requestBodyBytes = requestBody.getBytes("UTF-8"); + connection.setRequestProperty("Content-Length", String.valueOf(requestBodyBytes.length)); + OutputStream outwritestream = connection.getOutputStream(); + outwritestream.write(requestBodyBytes); + outwritestream.flush(); + outwritestream.close();*/ + + // valid StatusCode + int statusCode = connection.getResponseCode(); + if (statusCode != 200) { + return new ReturnT<String>(ReturnT.FAIL_CODE, "xxl-rpc remoting fail, StatusCode("+ statusCode +") invalid. for url : " + url); + } + + // result + bufferedReader = new BufferedReader(new InputStreamReader(connection.getInputStream())); + StringBuilder result = new StringBuilder(); + String line; + while ((line = bufferedReader.readLine()) != null) { + result.append(line); + } + String resultJson = result.toString(); + + // parse returnT + try { + Map<String, Object> resultMap = BasicJson.parseMap(resultJson); + + ReturnT<String> returnT = new ReturnT<String>(); + if (resultMap==null) { + returnT.setCode(ReturnT.FAIL_CODE); + returnT.setMsg("AdminBizClient Remoting call fail."); + } else { + returnT.setCode(Integer.valueOf(String.valueOf(resultMap.get("code")))); + returnT.setMsg(String.valueOf(resultMap.get("msg"))); + returnT.setContent(String.valueOf(resultMap.get("content"))); + } + return returnT; + } catch (Exception e) { + logger.error("xxl-rpc remoting (url="+url+") response content invalid("+ resultJson +").", e); + return new ReturnT<String>(ReturnT.FAIL_CODE, "xxl-rpc remoting (url="+url+") response content invalid("+ resultJson +")."); + } + + } catch (Exception e) { + logger.error(e.getMessage(), e); + return new ReturnT<String>(ReturnT.FAIL_CODE, "xxl-rpc remoting error("+ e.getMessage() +"), for url : " + url); + } finally { + try { + if (bufferedReader != null) { + bufferedReader.close(); + } + if (connection != null) { + connection.disconnect(); + } + } catch (Exception e2) { + logger.error(e2.getMessage(), e2); + } + } + } + +} diff --git a/xxl-job-core/src/test/java/com/xxl/job/core/biz/impl/ExecutorBizImplTest.java b/xxl-job-core/src/test/java/com/xxl/job/core/biz/impl/ExecutorBizImplTest.java new file mode 100644 index 0000000..b6f9fc9 --- /dev/null +++ b/xxl-job-core/src/test/java/com/xxl/job/core/biz/impl/ExecutorBizImplTest.java @@ -0,0 +1,145 @@ +package com.xxl.job.core.biz.impl; + +import com.xxl.job.core.biz.ExecutorBiz; +import com.xxl.job.core.biz.model.LogResult; +import com.xxl.job.core.biz.model.ReturnT; +import com.xxl.job.core.biz.model.TriggerParam; +import com.xxl.job.core.enums.ExecutorBlockStrategyEnum; +import com.xxl.job.core.executor.XxlJobExecutor; +import com.xxl.job.core.glue.GlueTypeEnum; +import com.xxl.rpc.remoting.invoker.call.CallType; +import com.xxl.rpc.remoting.invoker.reference.XxlRpcReferenceBean; +import com.xxl.rpc.remoting.invoker.route.LoadBalance; +import com.xxl.rpc.remoting.net.impl.netty_http.client.NettyHttpClient; +import com.xxl.rpc.serialize.impl.HessianSerializer; +import org.junit.After; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; + +import java.util.concurrent.TimeUnit; + + +public class ExecutorBizImplTest { + + public XxlJobExecutor xxlJobExecutor = null; + public ExecutorBiz executorBiz = null; + + @Before + public void before() throws Exception { + + // init executor + xxlJobExecutor = new XxlJobExecutor(); + xxlJobExecutor.setAdminAddresses(null); + xxlJobExecutor.setAppName("xxl-job-executor-sample"); + xxlJobExecutor.setIp(null); + xxlJobExecutor.setPort(9999); + xxlJobExecutor.setAccessToken(null); + xxlJobExecutor.setLogPath("/data/applogs/xxl-job/jobhandler"); + xxlJobExecutor.setLogRetentionDays(-1); + + // start executor + xxlJobExecutor.start(); + + TimeUnit.SECONDS.sleep(3); + + // init executor biz proxy + XxlRpcReferenceBean referenceBean = new XxlRpcReferenceBean(); + referenceBean.setClient(NettyHttpClient.class); + referenceBean.setSerializer(HessianSerializer.class); + referenceBean.setCallType(CallType.SYNC); + referenceBean.setLoadBalance(LoadBalance.ROUND); + referenceBean.setIface(ExecutorBiz.class); + referenceBean.setVersion(null); + referenceBean.setTimeout(3000); + referenceBean.setAddress("127.0.0.1:9999"); + referenceBean.setAccessToken(null); + referenceBean.setInvokeCallback(null); + referenceBean.setInvokerFactory(null); + + executorBiz = (ExecutorBiz) referenceBean.getObject(); + } + + @After + public void after(){ + if (xxlJobExecutor != null) { + xxlJobExecutor.destroy(); + } + } + + + @Test + public void beat() { + // Act + final ReturnT<String> retval = executorBiz.beat(); + + // Assert result + Assert.assertNotNull(retval); + Assert.assertNull(((ReturnT<String>) retval).getContent()); + Assert.assertEquals(200, retval.getCode()); + Assert.assertNull(retval.getMsg()); + } + + @Test + public void idleBeat(){ + final int jobId = 0; + + // Act + final ReturnT<String> retval = executorBiz.idleBeat(jobId); + + // Assert result + Assert.assertNotNull(retval); + Assert.assertNull(((ReturnT<String>) retval).getContent()); + Assert.assertEquals(500, retval.getCode()); + Assert.assertEquals("job thread is running or has trigger queue.", retval.getMsg()); + } + + @Test + public void kill(){ + final int jobId = 0; + + // Act + final ReturnT<String> retval = executorBiz.kill(jobId); + + // Assert result + Assert.assertNotNull(retval); + Assert.assertNull(((ReturnT<String>) retval).getContent()); + Assert.assertEquals(200, retval.getCode()); + Assert.assertNull(retval.getMsg()); + } + + @Test + public void log(){ + final long logDateTim = 0L; + final long logId = 0; + final int fromLineNum = 0; + + // Act + final ReturnT<LogResult> retval = executorBiz.log(logDateTim, logId, fromLineNum); + + // Assert result + Assert.assertNotNull(retval); + } + + @Test + public void run(){ + // trigger data + final TriggerParam triggerParam = new TriggerParam(); + triggerParam.setJobId(1); + triggerParam.setExecutorHandler("demoJobHandler"); + triggerParam.setExecutorParams(null); + triggerParam.setExecutorBlockStrategy(ExecutorBlockStrategyEnum.COVER_EARLY.name()); + triggerParam.setGlueType(GlueTypeEnum.BEAN.name()); + triggerParam.setGlueSource(null); + triggerParam.setGlueUpdatetime(System.currentTimeMillis()); + triggerParam.setLogId(1); + triggerParam.setLogDateTime(System.currentTimeMillis()); + + // Act + final ReturnT<String> retval = executorBiz.run(triggerParam); + + // Assert result + Assert.assertNotNull(retval); + } + +} diff --git a/xxl-job-core/target/classes/com/xxl/job/core/biz/AdminBiz.class b/xxl-job-core/target/classes/com/xxl/job/core/biz/AdminBiz.class Binary files differnew file mode 100644 index 0000000..11f20a0 --- /dev/null +++ b/xxl-job-core/target/classes/com/xxl/job/core/biz/AdminBiz.class diff --git a/xxl-job-core/target/classes/com/xxl/job/core/biz/ExecutorBiz.class b/xxl-job-core/target/classes/com/xxl/job/core/biz/ExecutorBiz.class Binary files differnew file mode 100644 index 0000000..e8dddb9 --- /dev/null +++ b/xxl-job-core/target/classes/com/xxl/job/core/biz/ExecutorBiz.class diff --git a/xxl-job-core/target/classes/com/xxl/job/core/biz/client/AdminBizClient.class b/xxl-job-core/target/classes/com/xxl/job/core/biz/client/AdminBizClient.class Binary files differnew file mode 100644 index 0000000..c9d77a7 --- /dev/null +++ b/xxl-job-core/target/classes/com/xxl/job/core/biz/client/AdminBizClient.class diff --git a/xxl-job-core/target/classes/com/xxl/job/core/biz/impl/ExecutorBizImpl.class b/xxl-job-core/target/classes/com/xxl/job/core/biz/impl/ExecutorBizImpl.class Binary files differnew file mode 100644 index 0000000..2161d93 --- /dev/null +++ b/xxl-job-core/target/classes/com/xxl/job/core/biz/impl/ExecutorBizImpl.class diff --git a/xxl-job-core/target/classes/com/xxl/job/core/biz/model/HandleCallbackParam.class b/xxl-job-core/target/classes/com/xxl/job/core/biz/model/HandleCallbackParam.class Binary files differnew file mode 100644 index 0000000..23d45a8 --- /dev/null +++ b/xxl-job-core/target/classes/com/xxl/job/core/biz/model/HandleCallbackParam.class diff --git a/xxl-job-core/target/classes/com/xxl/job/core/biz/model/LogResult.class b/xxl-job-core/target/classes/com/xxl/job/core/biz/model/LogResult.class Binary files differnew file mode 100644 index 0000000..f54989f --- /dev/null +++ b/xxl-job-core/target/classes/com/xxl/job/core/biz/model/LogResult.class diff --git a/xxl-job-core/target/classes/com/xxl/job/core/biz/model/RegistryParam.class b/xxl-job-core/target/classes/com/xxl/job/core/biz/model/RegistryParam.class Binary files differnew file mode 100644 index 0000000..9d0726d --- /dev/null +++ b/xxl-job-core/target/classes/com/xxl/job/core/biz/model/RegistryParam.class diff --git a/xxl-job-core/target/classes/com/xxl/job/core/biz/model/ReturnT.class b/xxl-job-core/target/classes/com/xxl/job/core/biz/model/ReturnT.class Binary files differnew file mode 100644 index 0000000..d96fe33 --- /dev/null +++ b/xxl-job-core/target/classes/com/xxl/job/core/biz/model/ReturnT.class diff --git a/xxl-job-core/target/classes/com/xxl/job/core/biz/model/TriggerParam.class b/xxl-job-core/target/classes/com/xxl/job/core/biz/model/TriggerParam.class Binary files differnew file mode 100644 index 0000000..43531dc --- /dev/null +++ b/xxl-job-core/target/classes/com/xxl/job/core/biz/model/TriggerParam.class diff --git a/xxl-job-core/target/classes/com/xxl/job/core/enums/ExecutorBlockStrategyEnum.class b/xxl-job-core/target/classes/com/xxl/job/core/enums/ExecutorBlockStrategyEnum.class Binary files differnew file mode 100644 index 0000000..95b7690 --- /dev/null +++ b/xxl-job-core/target/classes/com/xxl/job/core/enums/ExecutorBlockStrategyEnum.class diff --git a/xxl-job-core/target/classes/com/xxl/job/core/enums/RegistryConfig$RegistType.class b/xxl-job-core/target/classes/com/xxl/job/core/enums/RegistryConfig$RegistType.class Binary files differnew file mode 100644 index 0000000..fb2da1e --- /dev/null +++ b/xxl-job-core/target/classes/com/xxl/job/core/enums/RegistryConfig$RegistType.class diff --git a/xxl-job-core/target/classes/com/xxl/job/core/enums/RegistryConfig.class b/xxl-job-core/target/classes/com/xxl/job/core/enums/RegistryConfig.class Binary files differnew file mode 100644 index 0000000..4317862 --- /dev/null +++ b/xxl-job-core/target/classes/com/xxl/job/core/enums/RegistryConfig.class diff --git a/xxl-job-core/target/classes/com/xxl/job/core/executor/XxlJobExecutor$ExecutorServiceRegistry.class b/xxl-job-core/target/classes/com/xxl/job/core/executor/XxlJobExecutor$ExecutorServiceRegistry.class Binary files differnew file mode 100644 index 0000000..f52d3c6 --- /dev/null +++ b/xxl-job-core/target/classes/com/xxl/job/core/executor/XxlJobExecutor$ExecutorServiceRegistry.class diff --git a/xxl-job-core/target/classes/com/xxl/job/core/executor/XxlJobExecutor.class b/xxl-job-core/target/classes/com/xxl/job/core/executor/XxlJobExecutor.class Binary files differnew file mode 100644 index 0000000..ae0e6f2 --- /dev/null +++ b/xxl-job-core/target/classes/com/xxl/job/core/executor/XxlJobExecutor.class diff --git a/xxl-job-core/target/classes/com/xxl/job/core/executor/impl/XxlJobSpringExecutor.class b/xxl-job-core/target/classes/com/xxl/job/core/executor/impl/XxlJobSpringExecutor.class Binary files differnew file mode 100644 index 0000000..3471756 --- /dev/null +++ b/xxl-job-core/target/classes/com/xxl/job/core/executor/impl/XxlJobSpringExecutor.class diff --git a/xxl-job-core/target/classes/com/xxl/job/core/glue/GlueFactory.class b/xxl-job-core/target/classes/com/xxl/job/core/glue/GlueFactory.class Binary files differnew file mode 100644 index 0000000..52da126 --- /dev/null +++ b/xxl-job-core/target/classes/com/xxl/job/core/glue/GlueFactory.class diff --git a/xxl-job-core/target/classes/com/xxl/job/core/glue/GlueTypeEnum.class b/xxl-job-core/target/classes/com/xxl/job/core/glue/GlueTypeEnum.class Binary files differnew file mode 100644 index 0000000..f49e359 --- /dev/null +++ b/xxl-job-core/target/classes/com/xxl/job/core/glue/GlueTypeEnum.class diff --git a/xxl-job-core/target/classes/com/xxl/job/core/glue/impl/SpringGlueFactory.class b/xxl-job-core/target/classes/com/xxl/job/core/glue/impl/SpringGlueFactory.class Binary files differnew file mode 100644 index 0000000..78db533 --- /dev/null +++ b/xxl-job-core/target/classes/com/xxl/job/core/glue/impl/SpringGlueFactory.class diff --git a/xxl-job-core/target/classes/com/xxl/job/core/handler/IJobHandler.class b/xxl-job-core/target/classes/com/xxl/job/core/handler/IJobHandler.class Binary files differnew file mode 100644 index 0000000..c38efd7 --- /dev/null +++ b/xxl-job-core/target/classes/com/xxl/job/core/handler/IJobHandler.class diff --git a/xxl-job-core/target/classes/com/xxl/job/core/handler/annotation/JobHandler.class b/xxl-job-core/target/classes/com/xxl/job/core/handler/annotation/JobHandler.class Binary files differnew file mode 100644 index 0000000..c3db3a8 --- /dev/null +++ b/xxl-job-core/target/classes/com/xxl/job/core/handler/annotation/JobHandler.class diff --git a/xxl-job-core/target/classes/com/xxl/job/core/handler/annotation/XxlJob.class b/xxl-job-core/target/classes/com/xxl/job/core/handler/annotation/XxlJob.class Binary files differnew file mode 100644 index 0000000..0130413 --- /dev/null +++ b/xxl-job-core/target/classes/com/xxl/job/core/handler/annotation/XxlJob.class diff --git a/xxl-job-core/target/classes/com/xxl/job/core/handler/impl/GlueJobHandler.class b/xxl-job-core/target/classes/com/xxl/job/core/handler/impl/GlueJobHandler.class Binary files differnew file mode 100644 index 0000000..d9a1788 --- /dev/null +++ b/xxl-job-core/target/classes/com/xxl/job/core/handler/impl/GlueJobHandler.class diff --git a/xxl-job-core/target/classes/com/xxl/job/core/handler/impl/MethodJobHandler.class b/xxl-job-core/target/classes/com/xxl/job/core/handler/impl/MethodJobHandler.class Binary files differnew file mode 100644 index 0000000..9e3a15a --- /dev/null +++ b/xxl-job-core/target/classes/com/xxl/job/core/handler/impl/MethodJobHandler.class diff --git a/xxl-job-core/target/classes/com/xxl/job/core/handler/impl/ScriptJobHandler.class b/xxl-job-core/target/classes/com/xxl/job/core/handler/impl/ScriptJobHandler.class Binary files differnew file mode 100644 index 0000000..98bbb29 --- /dev/null +++ b/xxl-job-core/target/classes/com/xxl/job/core/handler/impl/ScriptJobHandler.class diff --git a/xxl-job-core/target/classes/com/xxl/job/core/log/XxlJobFileAppender.class b/xxl-job-core/target/classes/com/xxl/job/core/log/XxlJobFileAppender.class Binary files differnew file mode 100644 index 0000000..be67b16 --- /dev/null +++ b/xxl-job-core/target/classes/com/xxl/job/core/log/XxlJobFileAppender.class diff --git a/xxl-job-core/target/classes/com/xxl/job/core/log/XxlJobLogger.class b/xxl-job-core/target/classes/com/xxl/job/core/log/XxlJobLogger.class Binary files differnew file mode 100644 index 0000000..dd101cf --- /dev/null +++ b/xxl-job-core/target/classes/com/xxl/job/core/log/XxlJobLogger.class diff --git a/xxl-job-core/target/classes/com/xxl/job/core/thread/ExecutorRegistryThread$1.class b/xxl-job-core/target/classes/com/xxl/job/core/thread/ExecutorRegistryThread$1.class Binary files differnew file mode 100644 index 0000000..37b576d --- /dev/null +++ b/xxl-job-core/target/classes/com/xxl/job/core/thread/ExecutorRegistryThread$1.class diff --git a/xxl-job-core/target/classes/com/xxl/job/core/thread/ExecutorRegistryThread.class b/xxl-job-core/target/classes/com/xxl/job/core/thread/ExecutorRegistryThread.class Binary files differnew file mode 100644 index 0000000..cad07c0 --- /dev/null +++ b/xxl-job-core/target/classes/com/xxl/job/core/thread/ExecutorRegistryThread.class diff --git a/xxl-job-core/target/classes/com/xxl/job/core/thread/JobLogFileCleanThread$1.class b/xxl-job-core/target/classes/com/xxl/job/core/thread/JobLogFileCleanThread$1.class Binary files differnew file mode 100644 index 0000000..33afcb8 --- /dev/null +++ b/xxl-job-core/target/classes/com/xxl/job/core/thread/JobLogFileCleanThread$1.class diff --git a/xxl-job-core/target/classes/com/xxl/job/core/thread/JobLogFileCleanThread.class b/xxl-job-core/target/classes/com/xxl/job/core/thread/JobLogFileCleanThread.class Binary files differnew file mode 100644 index 0000000..78a191c --- /dev/null +++ b/xxl-job-core/target/classes/com/xxl/job/core/thread/JobLogFileCleanThread.class diff --git a/xxl-job-core/target/classes/com/xxl/job/core/thread/JobThread$1.class b/xxl-job-core/target/classes/com/xxl/job/core/thread/JobThread$1.class Binary files differnew file mode 100644 index 0000000..af04aed --- /dev/null +++ b/xxl-job-core/target/classes/com/xxl/job/core/thread/JobThread$1.class diff --git a/xxl-job-core/target/classes/com/xxl/job/core/thread/JobThread.class b/xxl-job-core/target/classes/com/xxl/job/core/thread/JobThread.class Binary files differnew file mode 100644 index 0000000..2bc6854 --- /dev/null +++ b/xxl-job-core/target/classes/com/xxl/job/core/thread/JobThread.class diff --git a/xxl-job-core/target/classes/com/xxl/job/core/thread/TriggerCallbackThread$1.class b/xxl-job-core/target/classes/com/xxl/job/core/thread/TriggerCallbackThread$1.class Binary files differnew file mode 100644 index 0000000..972f190 --- /dev/null +++ b/xxl-job-core/target/classes/com/xxl/job/core/thread/TriggerCallbackThread$1.class diff --git a/xxl-job-core/target/classes/com/xxl/job/core/thread/TriggerCallbackThread$2.class b/xxl-job-core/target/classes/com/xxl/job/core/thread/TriggerCallbackThread$2.class Binary files differnew file mode 100644 index 0000000..a97dfe5 --- /dev/null +++ b/xxl-job-core/target/classes/com/xxl/job/core/thread/TriggerCallbackThread$2.class diff --git a/xxl-job-core/target/classes/com/xxl/job/core/thread/TriggerCallbackThread.class b/xxl-job-core/target/classes/com/xxl/job/core/thread/TriggerCallbackThread.class Binary files differnew file mode 100644 index 0000000..0eeabd6 --- /dev/null +++ b/xxl-job-core/target/classes/com/xxl/job/core/thread/TriggerCallbackThread.class diff --git a/xxl-job-core/target/classes/com/xxl/job/core/util/DateUtil.class b/xxl-job-core/target/classes/com/xxl/job/core/util/DateUtil.class Binary files differnew file mode 100644 index 0000000..7c09752 --- /dev/null +++ b/xxl-job-core/target/classes/com/xxl/job/core/util/DateUtil.class diff --git a/xxl-job-core/target/classes/com/xxl/job/core/util/FileUtil.class b/xxl-job-core/target/classes/com/xxl/job/core/util/FileUtil.class Binary files differnew file mode 100644 index 0000000..02fac97 --- /dev/null +++ b/xxl-job-core/target/classes/com/xxl/job/core/util/FileUtil.class diff --git a/xxl-job-core/target/classes/com/xxl/job/core/util/ScriptUtil$1.class b/xxl-job-core/target/classes/com/xxl/job/core/util/ScriptUtil$1.class Binary files differnew file mode 100644 index 0000000..f984b2b --- /dev/null +++ b/xxl-job-core/target/classes/com/xxl/job/core/util/ScriptUtil$1.class diff --git a/xxl-job-core/target/classes/com/xxl/job/core/util/ScriptUtil$2.class b/xxl-job-core/target/classes/com/xxl/job/core/util/ScriptUtil$2.class Binary files differnew file mode 100644 index 0000000..12c0ee2 --- /dev/null +++ b/xxl-job-core/target/classes/com/xxl/job/core/util/ScriptUtil$2.class diff --git a/xxl-job-core/target/classes/com/xxl/job/core/util/ScriptUtil.class b/xxl-job-core/target/classes/com/xxl/job/core/util/ScriptUtil.class Binary files differnew file mode 100644 index 0000000..f05016f --- /dev/null +++ b/xxl-job-core/target/classes/com/xxl/job/core/util/ScriptUtil.class diff --git a/xxl-job-core/target/classes/com/xxl/job/core/util/ShardingUtil$ShardingVO.class b/xxl-job-core/target/classes/com/xxl/job/core/util/ShardingUtil$ShardingVO.class Binary files differnew file mode 100644 index 0000000..c32bff9 --- /dev/null +++ b/xxl-job-core/target/classes/com/xxl/job/core/util/ShardingUtil$ShardingVO.class diff --git a/xxl-job-core/target/classes/com/xxl/job/core/util/ShardingUtil.class b/xxl-job-core/target/classes/com/xxl/job/core/util/ShardingUtil.class Binary files differnew file mode 100644 index 0000000..9592350 --- /dev/null +++ b/xxl-job-core/target/classes/com/xxl/job/core/util/ShardingUtil.class diff --git a/xxl-job-core/target/classes/com/xxl/job/core/util/XxlJobRemotingUtil$1.class b/xxl-job-core/target/classes/com/xxl/job/core/util/XxlJobRemotingUtil$1.class Binary files differnew file mode 100644 index 0000000..b278a36 --- /dev/null +++ b/xxl-job-core/target/classes/com/xxl/job/core/util/XxlJobRemotingUtil$1.class diff --git a/xxl-job-core/target/classes/com/xxl/job/core/util/XxlJobRemotingUtil$2.class b/xxl-job-core/target/classes/com/xxl/job/core/util/XxlJobRemotingUtil$2.class Binary files differnew file mode 100644 index 0000000..744f175 --- /dev/null +++ b/xxl-job-core/target/classes/com/xxl/job/core/util/XxlJobRemotingUtil$2.class diff --git a/xxl-job-core/target/classes/com/xxl/job/core/util/XxlJobRemotingUtil.class b/xxl-job-core/target/classes/com/xxl/job/core/util/XxlJobRemotingUtil.class Binary files differnew file mode 100644 index 0000000..6f19c3e --- /dev/null +++ b/xxl-job-core/target/classes/com/xxl/job/core/util/XxlJobRemotingUtil.class diff --git a/xxl-job-core/target/test-classes/com/xxl/job/core/biz/impl/ExecutorBizImplTest.class b/xxl-job-core/target/test-classes/com/xxl/job/core/biz/impl/ExecutorBizImplTest.class Binary files differnew file mode 100644 index 0000000..d83333b --- /dev/null +++ b/xxl-job-core/target/test-classes/com/xxl/job/core/biz/impl/ExecutorBizImplTest.class diff --git a/xxl-job-core/xxl-job-core.iml b/xxl-job-core/xxl-job-core.iml new file mode 100644 index 0000000..f3a1170 --- /dev/null +++ b/xxl-job-core/xxl-job-core.iml @@ -0,0 +1,33 @@ +<?xml version="1.0" encoding="UTF-8"?> +<module org.jetbrains.idea.maven.project.MavenProjectsManager.isMavenModule="true" type="JAVA_MODULE" version="4"> + <component name="FacetManager"> + <facet type="Spring" name="Spring"> + <configuration /> + </facet> + </component> + <component name="NewModuleRootManager" LANGUAGE_LEVEL="JDK_1_7"> + <output url="file://$MODULE_DIR$/target/classes" /> + <output-test url="file://$MODULE_DIR$/target/test-classes" /> + <content url="file://$MODULE_DIR$"> + <sourceFolder url="file://$MODULE_DIR$/src/main/java" isTestSource="false" /> + <sourceFolder url="file://$MODULE_DIR$/src/test/java" isTestSource="true" /> + <excludeFolder url="file://$MODULE_DIR$/target" /> + </content> + <orderEntry type="inheritedJdk" /> + <orderEntry type="sourceFolder" forTests="false" /> + <orderEntry type="library" name="Maven: com.xuxueli:xxl-rpc-core:1.5.0" level="project" /> + <orderEntry type="library" name="Maven: io.netty:netty-all:4.1.43.Final" level="project" /> + <orderEntry type="library" name="Maven: com.caucho:hessian:4.0.63" level="project" /> + <orderEntry type="library" name="Maven: com.xuxueli:xxl-registry-client:1.1.0" level="project" /> + <orderEntry type="library" name="Maven: org.slf4j:slf4j-api:1.7.29" level="project" /> + <orderEntry type="library" name="Maven: org.codehaus.groovy:groovy:2.5.8" level="project" /> + <orderEntry type="library" scope="PROVIDED" name="Maven: org.springframework:spring-context:4.3.25.RELEASE" level="project" /> + <orderEntry type="library" scope="PROVIDED" name="Maven: org.springframework:spring-aop:4.3.25.RELEASE" level="project" /> + <orderEntry type="library" scope="PROVIDED" name="Maven: org.springframework:spring-beans:4.3.25.RELEASE" level="project" /> + <orderEntry type="library" scope="PROVIDED" name="Maven: org.springframework:spring-core:4.3.25.RELEASE" level="project" /> + <orderEntry type="library" scope="PROVIDED" name="Maven: commons-logging:commons-logging:1.2" level="project" /> + <orderEntry type="library" scope="PROVIDED" name="Maven: org.springframework:spring-expression:4.3.25.RELEASE" level="project" /> + <orderEntry type="library" scope="TEST" name="Maven: junit:junit:4.12" level="project" /> + <orderEntry type="library" scope="TEST" name="Maven: org.hamcrest:hamcrest-core:1.3" level="project" /> + </component> +</module>
\ No newline at end of file |
