summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorzhangshuai <[email protected]>2024-11-20 16:47:19 +0800
committerzhangshuai <[email protected]>2024-11-20 16:47:19 +0800
commit70feac12fcb6d7e6cdb2f78c70aee38ebe26e20a (patch)
treee677a0f0cafa3506840de302a107625e06abb028
parent0133dc72b20743eb799223a09775fed57410191f (diff)
fix: 调整 job 执行流程
-rw-r--r--src/main/java/net/geedge/asw/common/config/SetupRunner.java109
-rw-r--r--src/main/java/net/geedge/asw/module/runner/controller/JobController.java8
-rw-r--r--src/main/java/net/geedge/asw/module/runner/job/JobPlaybookExecutor.java209
-rw-r--r--src/main/java/net/geedge/asw/module/runner/util/JobQueueManager.java61
4 files changed, 299 insertions, 88 deletions
diff --git a/src/main/java/net/geedge/asw/common/config/SetupRunner.java b/src/main/java/net/geedge/asw/common/config/SetupRunner.java
new file mode 100644
index 0000000..116566d
--- /dev/null
+++ b/src/main/java/net/geedge/asw/common/config/SetupRunner.java
@@ -0,0 +1,109 @@
+package net.geedge.asw.common.config;
+
+import cn.hutool.core.io.FileUtil;
+import cn.hutool.http.HttpRequest;
+import cn.hutool.http.HttpResponse;
+import cn.hutool.http.Method;
+import cn.hutool.json.JSONObject;
+import cn.hutool.log.Log;
+import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
+import com.baomidou.mybatisplus.core.conditions.update.LambdaUpdateWrapper;
+import net.geedge.asw.common.util.Constants;
+import net.geedge.asw.common.util.T;
+import net.geedge.asw.module.environment.entity.EnvironmentEntity;
+import net.geedge.asw.module.environment.entity.EnvironmentSessionEntity;
+import net.geedge.asw.module.environment.service.IEnvironmentService;
+import net.geedge.asw.module.environment.service.IEnvironmentSessionService;
+import net.geedge.asw.module.runner.entity.JobEntity;
+import net.geedge.asw.module.runner.util.JobQueueManager;
+import net.geedge.asw.module.runner.service.IJobService;
+import net.geedge.asw.module.runner.util.RunnerConstant;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.boot.CommandLineRunner;
+import org.springframework.stereotype.Component;
+
+import java.util.List;
+
+
+/**
+ * setup初始化操作
+ */
+@Component
+public class SetupRunner implements CommandLineRunner{
+ private static final Log log = Log.get();
+
+ @Autowired
+ private IJobService jobService;
+
+ @Autowired
+ private JobQueueManager jobQueueManager;
+
+ @Autowired
+ private IEnvironmentService environmentService;
+
+ @Autowired
+ private IEnvironmentSessionService environmentSessionService;
+
+ @Override
+ public void run(String... args) throws Exception {
+ log.info("Setup inited");
+ List<JobEntity> pendingJobs = jobService.list(new LambdaQueryWrapper<JobEntity>().eq(JobEntity::getStatus, RunnerConstant.JobStatus.PENDING.getValue()));
+ pendingJobs.forEach(jobQueueManager::addJob);
+ log.info("[SetupRunner] [init pending job to JobQueueManager]");
+
+
+ log.info("[SetupRunner] [begin interrupted running job]");
+ List<JobEntity> runningJobs = jobService.list(new LambdaQueryWrapper<JobEntity>().eq(JobEntity::getStatus, RunnerConstant.JobStatus.RUNNING.getValue()));
+ for (JobEntity runningJob : runningJobs) {
+ String id = runningJob.getId();
+ EnvironmentEntity environment = environmentService.getById(runningJob.getEnvId());
+
+ JSONObject paramJSONObject = environment.getParamJSONObject();
+ String url = paramJSONObject.getStr("url");
+ String token = paramJSONObject.getStr("token");
+
+ HttpRequest requestStatus = T.HttpUtil.createGet(String.format("%s/api/v1/env/playbook/%s", url, runningJob.getId()));
+ requestStatus.header("Authorization", token);
+
+ HttpResponse response = requestStatus.execute();
+ if (response.isOk()){
+ String body = response.body();
+ JSONObject result = T.JSONUtil.toBean(body, JSONObject.class);
+ JSONObject data = result.getJSONObject("data");
+ String status = data.getStr("status");
+ if (RunnerConstant.JobStatus.RUNNING.getValue().equals(status)){
+ HttpRequest request = T.HttpUtil.createRequest(Method.DELETE, String.format("%s/api/v1/env/playbook/%s", url, runningJob.getId()));
+ request.header("Authorization", token);
+ request.execute();
+ }
+ }
+
+ Thread runningThread = Constants.RUNNING_JOB_THREAD.get(id);
+ if (runningThread != null) {
+ runningThread.interrupt();
+ }
+
+ Thread resultThread = Constants.RESULT_JOB_THREAD.get(id);
+ if (resultThread != null) {
+ resultThread.interrupt();
+ }
+ EnvironmentSessionEntity session = environmentSessionService.getOne(new LambdaQueryWrapper<EnvironmentSessionEntity>()
+ .eq(EnvironmentSessionEntity::getJobId, id)
+ .eq(EnvironmentSessionEntity::getStatus, 1));
+
+ if (T.ObjectUtil.isNotEmpty(session)) {
+ environmentService.removeSession(session.getId());
+ }
+
+ T.FileUtil.appendString("Job execution interrupted.", FileUtil.file(runningJob.getLogPath()), "UTF-8");
+
+ // update state
+ jobService.update(new LambdaUpdateWrapper<JobEntity>()
+ .eq(JobEntity::getId, id)
+ .set(JobEntity::getStatus, "failed")
+ );
+ }
+
+ log.info("[SetupRunner] [interrupted running job end!]");
+ }
+}
diff --git a/src/main/java/net/geedge/asw/module/runner/controller/JobController.java b/src/main/java/net/geedge/asw/module/runner/controller/JobController.java
index 461e1b9..2e13f42 100644
--- a/src/main/java/net/geedge/asw/module/runner/controller/JobController.java
+++ b/src/main/java/net/geedge/asw/module/runner/controller/JobController.java
@@ -17,6 +17,7 @@ import net.geedge.asw.module.environment.entity.EnvironmentSessionEntity;
import net.geedge.asw.module.environment.service.IEnvironmentService;
import net.geedge.asw.module.environment.service.IEnvironmentSessionService;
import net.geedge.asw.module.runner.entity.JobEntity;
+import net.geedge.asw.module.runner.util.JobQueueManager;
import net.geedge.asw.module.runner.service.IJobService;
import net.geedge.asw.module.runner.util.RunnerConstant;
import org.springframework.beans.factory.annotation.Autowired;
@@ -41,6 +42,9 @@ public class JobController {
@Autowired
private IEnvironmentSessionService sessionService;
+ @Autowired
+ private JobQueueManager jobQueueManager;
+
@GetMapping("/{workspaceId}/job/{id}")
public R detail(@PathVariable("workspaceId") String workspaceId,
@PathVariable("id") String id) {
@@ -117,6 +121,10 @@ public class JobController {
log.info("[cancelJob] [request env stop playbook] [status: {}]", response.body());
}
+ if (job.getStatus().contains(RunnerConstant.JobStatus.PENDING.getValue())){
+ jobQueueManager.requeueJob(job);
+ }
+
Thread runningThread = Constants.RUNNING_JOB_THREAD.get(id);
if (runningThread != null) {
runningThread.interrupt();
diff --git a/src/main/java/net/geedge/asw/module/runner/job/JobPlaybookExecutor.java b/src/main/java/net/geedge/asw/module/runner/job/JobPlaybookExecutor.java
index 3e9907c..43843aa 100644
--- a/src/main/java/net/geedge/asw/module/runner/job/JobPlaybookExecutor.java
+++ b/src/main/java/net/geedge/asw/module/runner/job/JobPlaybookExecutor.java
@@ -7,7 +7,6 @@ import cn.hutool.log.Log;
import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
import com.baomidou.mybatisplus.core.conditions.update.LambdaUpdateWrapper;
import net.geedge.asw.common.util.Constants;
-import net.geedge.asw.common.util.RCode;
import net.geedge.asw.common.util.T;
import net.geedge.asw.module.app.entity.PackageEntity;
import net.geedge.asw.module.app.service.IPackageService;
@@ -19,6 +18,7 @@ import net.geedge.asw.module.runner.entity.JobEntity;
import net.geedge.asw.module.runner.entity.PlaybookEntity;
import net.geedge.asw.module.runner.service.IJobService;
import net.geedge.asw.module.runner.service.IPlaybookService;
+import net.geedge.asw.module.runner.util.JobQueueManager;
import net.geedge.asw.module.runner.util.RunnerConstant;
import org.apache.commons.lang3.time.StopWatch;
import org.quartz.DisallowConcurrentExecution;
@@ -30,10 +30,9 @@ import org.springframework.transaction.annotation.Transactional;
import java.io.File;
import java.util.List;
import java.util.Map;
-import java.util.stream.Collectors;
-
@DisallowConcurrentExecution
+@SuppressWarnings("all")
public class JobPlaybookExecutor extends QuartzJobBean {
private static final Log log = Log.get();
@@ -53,6 +52,9 @@ public class JobPlaybookExecutor extends QuartzJobBean {
@Autowired
private IEnvironmentSessionService environmentSessionService;
+ @Autowired
+ private JobQueueManager jobQueueManager;
+
@Override
protected void executeInternal(JobExecutionContext context) {
Thread.currentThread().setName("JobPlaybookExecutor");
@@ -72,121 +74,152 @@ public class JobPlaybookExecutor extends QuartzJobBean {
@Transactional(rollbackFor = Exception.class)
public void playbookExecutor() {
- List<JobEntity> createdList = jobService.list(new LambdaQueryWrapper<JobEntity>().eq(JobEntity::getStatus, RunnerConstant.JobStatus.CREATED.getValue()));
- Map<String, List<JobEntity>> jobByEnvList = createdList.stream().collect(Collectors.groupingBy(JobEntity::getEnvId));
- for (Map.Entry<String, List<JobEntity>> jobByEnv : jobByEnvList.entrySet()) {
- String envId = jobByEnv.getKey();
- List<JobEntity> jobList = jobByEnv.getValue();
- T.ThreadUtil.execAsync(() -> {
- for (JobEntity job : jobList) {
- List<JobEntity> JobRunList = jobService.list(new LambdaQueryWrapper<JobEntity>()
- .eq(JobEntity::getStatus, RunnerConstant.JobStatus.RUNNING.getValue())
- .eq(JobEntity::getEnvId, envId));
- if (T.CollUtil.isNotEmpty(JobRunList)) {
- continue;
- }
-
- EnvironmentEntity environment = environmentService.getById(envId);
- if (!environment.getStatus().equals(1)) {
- if (log.isDebugEnabled()) {
- log.debug("[playbookExecutor] [environment is not available] [jobId: {}] [envId: {}]", job.getId(), environment.getId());
- }
- continue;
- }
-
- List<EnvironmentSessionEntity> sessionList = environmentSessionService.list(new LambdaQueryWrapper<EnvironmentSessionEntity>()
- .eq(EnvironmentSessionEntity::getStatus, "1")
- .eq(EnvironmentSessionEntity::getEnvId, envId));
- if (T.CollUtil.isNotEmpty(sessionList)) {
- if (log.isDebugEnabled()) {
- log.debug("[playbookExecutor] [environment is in used] [jobId: {}] [envId: {}]", job.getId(), environment.getId());
- }
- continue;
- }
-
- // update job status running
- jobService.update(new LambdaUpdateWrapper<JobEntity>()
- .set(JobEntity::getStatus, RunnerConstant.JobStatus.RUNNING.getValue())
- .set(JobEntity::getStartTimestamp, System.currentTimeMillis())
- .eq(JobEntity::getId, job.getId())
- );
-
- // add session
- EnvironmentSessionEntity session = new EnvironmentSessionEntity();
- session.setEnvId(envId);
- session.setJobId(job.getId());
- session.setStatus(1);
- session.setUserId("system");
- session.setWorkspaceId(job.getWorkspaceId());
- session.setStartTimestamp(System.currentTimeMillis());
- environmentSessionService.save(session);
-
- HttpResponse response = requestEnvPlaybook(job, environment);
- log.info("[playbookExecutor] [job id: {}] [env: {}] [status: {}]", job.getId(), environment.getId(), response.getStatus());
- if (!response.isOk()) {
-
- String result = response.body();
- if (log.isDebugEnabled()) {
- log.debug("[playbookExecutor] [env: {}] [result: {}]", environment.getId(), result);
- }
-
- File logFile = T.FileUtil.file(job.getLogPath());
- T.FileUtil.appendString(String.format("ERROR: Request %s environment error \n", environment.getId()), logFile, "UTF-8");
- T.FileUtil.appendString(String.format("Result: %s", result), logFile, "UTF-8");
-
- // update job status, starTime, updateTimestamp
- jobService.update(new LambdaUpdateWrapper<JobEntity>()
- .set(JobEntity::getStatus, RunnerConstant.JobStatus.FAILED.getValue())
- .set(JobEntity::getEndTimestamp, System.currentTimeMillis())
- .eq(JobEntity::getId, job.getId()));
-
- // remove session
- environmentService.removeSession(session.getId());
- log.info("[playbookExecutor] [request env exec playbook error] [job id: {}]", job.getId());
- }
+ List<JobEntity> createdJobs = jobService.list(
+ new LambdaQueryWrapper<JobEntity>()
+ .eq(JobEntity::getStatus, RunnerConstant.JobStatus.CREATED.getValue())
+ .orderByAsc(JobEntity::getCreateTimestamp)
+ );
+
+ if (T.CollUtil.isNotEmpty(createdJobs)) {
+ log.info("[JobPlaybookExecutor] [playbookExecutor] [fetching created jobs] [size: {}]", createdJobs.size());
+ // 将 CREATED 任务加入队列
+ createdJobs.forEach(jobQueueManager::addJob);
+ // 更新 createdJobs 状态为 pending
+ createdJobs.forEach(x -> x.setStatus(RunnerConstant.JobStatus.PENDING.getValue()));
+ jobService.updateBatchById(createdJobs);
+ }
+
+ // 处理队列中的任务
+ if (!jobQueueManager.isAllQueuesEmpty()) {
+ List<JobEntity> nextJobList = jobQueueManager.fetchNextJob();
+ for (JobEntity nextJob : nextJobList) {
+ String envId = nextJob.getEnvId();
+ log.info("[JobPlaybookExecutor] [playbookExecutor] [Processing jobId: {}] [envId: {}]", nextJob.getId(), envId);
+
+ EnvironmentEntity environment = environmentService.getById(envId);
+ if (!environment.getStatus().equals(1)) {
+ log.warn("[JobPlaybookExecutor] [playbookExecutor] [environment is not available] [jobId: {}] [envId: {}]", nextJob.getId(), environment.getId());
+ jobQueueManager.requeueJob(nextJob); // 将任务放回队列
+ continue;
+ }
+
+ List<EnvironmentSessionEntity> sessionList = environmentSessionService.list(new LambdaQueryWrapper<EnvironmentSessionEntity>()
+ .eq(EnvironmentSessionEntity::getStatus, "1")
+ .eq(EnvironmentSessionEntity::getEnvId, envId));
+ if (T.CollUtil.isNotEmpty(sessionList)) {
+ log.warn("[JobPlaybookExecutor] [playbookExecutor] [environment is in used] [jobId: {}] [envId: {}]", nextJob.getId(), environment.getId());
+ jobQueueManager.requeueJob(nextJob); // 将任务放回队列
+ continue;
}
- });
+
+ // update job status running
+ jobService.update(new LambdaUpdateWrapper<JobEntity>()
+ .set(JobEntity::getStatus, RunnerConstant.JobStatus.RUNNING.getValue())
+ .set(JobEntity::getStartTimestamp, System.currentTimeMillis())
+ .eq(JobEntity::getId, nextJob.getId())
+ );
+
+ // add session
+ EnvironmentSessionEntity session = new EnvironmentSessionEntity();
+ session.setEnvId(envId);
+ session.setJobId(nextJob.getId());
+ session.setStatus(1);
+ session.setUserId("system");
+ session.setWorkspaceId(nextJob.getWorkspaceId());
+ session.setStartTimestamp(System.currentTimeMillis());
+ environmentSessionService.save(session);
+
+ // 执行任务
+ processJobAsync(nextJob, environment, session);
+ }
}
}
- private HttpResponse requestEnvPlaybook(JobEntity job, EnvironmentEntity environment) {
+ private void processJobAsync(JobEntity job, EnvironmentEntity environment, EnvironmentSessionEntity session) {
+ T.ThreadUtil.execAsync(() -> {
+ log.info("[JobPlaybookExecutor] [processJobAsync] [start jobId: {}]", job.getId());
+ try {
+ // 执行请求
+ HttpResponse response = requestEnvironment(job, environment);
+ if (!response.isOk()) {
+ String result = response.body();
+ log.warn("[JobPlaybookExecutor] [processJobAsync] [envId: {}] [result: {}]", environment.getId(), result);
+
+ File logFile = T.FileUtil.file(job.getLogPath());
+ T.FileUtil.appendString(String.format("ERROR: Request %s environment error! msg: %s.\n", environment.getName(), result), logFile, "UTF-8");
+
+ // update job status, starTime, updateTimestamp
+ jobService.update(new LambdaUpdateWrapper<JobEntity>()
+ .set(JobEntity::getStatus, RunnerConstant.JobStatus.FAILED.getValue())
+ .set(JobEntity::getEndTimestamp, System.currentTimeMillis())
+ .eq(JobEntity::getId, job.getId()));
+
+ // remove session
+ environmentService.removeSession(session.getId());
+ }
+ log.info("[JobPlaybookExecutor] [processJobAsync] [Finished jobId: {}]", job.getId());
+ } catch (Exception e) {
+
+ // update job status, starTime, updateTimestamp
+ jobService.update(new LambdaUpdateWrapper<JobEntity>()
+ .set(JobEntity::getStatus, RunnerConstant.JobStatus.FAILED.getValue())
+ .set(JobEntity::getEndTimestamp, System.currentTimeMillis())
+ .eq(JobEntity::getId, job.getId()));
+
+ // remove session
+ environmentService.removeSession(session.getId());
+ throw new RuntimeException(e);
+ }
+ });
+ }
+
+ private HttpResponse requestEnvironment(JobEntity job, EnvironmentEntity environment) {
File zipFile = null;
try {
String playbookId = job.getPlaybookId();
String packageId = job.getPackageId();
+ // package playbook file
PackageEntity packageEntity = packageService.getById(packageId);
File packageFile = T.FileUtil.file(packageEntity.getPath());
+ PlaybookEntity playbook = playbookService.getById(playbookId);
+ File playbookFile = T.FileUtil.file(playbook.getPath());
+
+ // zip
+ zipFile = T.FileUtil.file(Constants.TEMP_PATH, T.StrUtil.concat(true, job.getId(), ".zip"));
+ T.ZipUtil.zip(zipFile, true, packageFile, playbookFile);
+
+ JSONObject paramJSONObject = environment.getParamJSONObject();
+ String url = paramJSONObject.getStr("url");
+ String token = paramJSONObject.getStr("token");
+
+ // parameters
String packageName = packageEntity.getIdentifier();
String parameters = job.getParameters();
Map<String, Object> params = T.MapUtil.newHashMap();
if (T.StrUtil.isNotEmpty(parameters)) {
params = T.JSONUtil.toBean(parameters, Map.class);
- }else {
+ } else {
params.put("reInstall", true);
params.put("clearCache", true);
params.put("unInstall", true);
}
- PlaybookEntity playbook = playbookService.getById(playbookId);
- File playbookFile = T.FileUtil.file(playbook.getPath());
-
- log.info("[playbookExecutor] [jobId: {}] [envId: {}] [playbookId: {}] [packageId: {}]", job.getId(), environment.getId(), playbookId, packageId);
- JSONObject paramJSONObject = environment.getParamJSONObject();
- String url = paramJSONObject.getStr("url");
- String token = paramJSONObject.getStr("token");
- zipFile = T.FileUtil.file(Constants.TEMP_PATH, T.StrUtil.concat(true, job.getId(), ".zip"));
+ // build request
+ log.info("[JobPlaybookExecutor] [requestEnvironment] [jobId: {}] [envId: {}] [playbookId: {}] [packageId: {}]", job.getId(), environment.getId(), playbookId, packageId);
HttpRequest request = T.HttpUtil.createPost(String.format("%s/api/v1/env/playbook", url));
- T.ZipUtil.zip(zipFile, true, packageFile, playbookFile);
+ request.header("Authorization", token);
request.form("file", zipFile);
request.form("id", job.getId());
request.form("packageName", packageName);
- request.header("Authorization", token);
for (Map.Entry<String, Object> param : params.entrySet()) {
request.form(param.getKey(), param.getValue());
}
HttpResponse response = request.execute();
return response;
+ } catch (Exception e) {
+ log.error("[JobPlaybookExecutor] [requestEnvironment] [error] [jobId: {}]", job.getId(), e);
+ throw new RuntimeException(e);
} finally {
T.FileUtil.del(zipFile);
}
diff --git a/src/main/java/net/geedge/asw/module/runner/util/JobQueueManager.java b/src/main/java/net/geedge/asw/module/runner/util/JobQueueManager.java
new file mode 100644
index 0000000..a0dd96c
--- /dev/null
+++ b/src/main/java/net/geedge/asw/module/runner/util/JobQueueManager.java
@@ -0,0 +1,61 @@
+package net.geedge.asw.module.runner.util;
+
+import net.geedge.asw.common.util.T;
+import net.geedge.asw.module.runner.entity.JobEntity;
+import org.springframework.stereotype.Component;
+
+import java.util.*;
+import java.util.concurrent.ConcurrentHashMap;
+
+/**
+ * job 任务队列管理
+ */
+
+@Component
+public class JobQueueManager {
+
+ // 每个环境对应一个任务队列,以创建时间排序
+ private final Map<String, PriorityQueue<JobEntity>> groupedQueues = new ConcurrentHashMap<>();
+
+ // 添加任务到队列
+ public synchronized void addJob(JobEntity job) {
+ boolean contains = groupedQueues.containsKey(job.getEnvId());
+ if (!contains) {
+ PriorityQueue<JobEntity> queue = new PriorityQueue<>(Comparator.comparing(JobEntity::getCreateTimestamp));
+ queue.offer(job);
+ groupedQueues.put(job.getEnvId(), queue);
+ } else {
+ groupedQueues.get(job.getEnvId()).offer(job);
+ }
+ }
+
+ // 获取每个环境下的任务
+ public synchronized List<JobEntity> fetchNextJob() {
+ List<JobEntity> list = T.ListUtil.list(false);
+ for (Map.Entry<String, PriorityQueue<JobEntity>> queueEntry : groupedQueues.entrySet()) {
+ PriorityQueue<JobEntity> queue = queueEntry.getValue();
+ if (queue == null || queue.isEmpty()) {
+ continue; // 如果该环境下没有任务则跳过
+ }
+ list.add(queue.poll());
+ }
+ return list;
+ }
+
+ // 检查所有队列是否为空
+ public synchronized boolean isAllQueuesEmpty() {
+ return groupedQueues.values().stream().allMatch(PriorityQueue::isEmpty);
+ }
+
+ // 重新添加队列
+ public synchronized void requeueJob(JobEntity job) {
+ PriorityQueue<JobEntity> queue = groupedQueues.get(job.getEnvId());
+ queue.offer(job);
+ }
+
+ // 删除
+ public synchronized void removeJob(JobEntity job) {
+ PriorityQueue<JobEntity> queue = groupedQueues.get(job.getEnvId());
+ queue.remove(job);
+ }
+}