summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorzhangshuai <[email protected]>2024-11-14 14:34:49 +0800
committerzhangshuai <[email protected]>2024-11-14 14:34:49 +0800
commitd14bc9864acd3908d9fca507280660276d62a280 (patch)
tree1aad385e07abf3c52fa262740a5778782f9d715a
parentcba1251bf34f4899b7c8be6979d1d7e80df8efdc (diff)
fix: ASW-164 修复 job被取消未释放Environment资源
-rw-r--r--src/main/java/net/geedge/asw/module/runner/controller/JobController.java71
-rw-r--r--src/main/java/net/geedge/asw/module/runner/job/JobPlaybookExecResultChecker.java9
2 files changed, 77 insertions, 3 deletions
diff --git a/src/main/java/net/geedge/asw/module/runner/controller/JobController.java b/src/main/java/net/geedge/asw/module/runner/controller/JobController.java
index c244469..3805119 100644
--- a/src/main/java/net/geedge/asw/module/runner/controller/JobController.java
+++ b/src/main/java/net/geedge/asw/module/runner/controller/JobController.java
@@ -1,26 +1,46 @@
package net.geedge.asw.module.runner.controller;
+import cn.hutool.http.HttpRequest;
+import cn.hutool.http.HttpResponse;
+import cn.hutool.http.Method;
+import cn.hutool.json.JSONObject;
+import cn.hutool.log.Log;
+import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
import com.baomidou.mybatisplus.core.conditions.update.LambdaUpdateWrapper;
import com.baomidou.mybatisplus.extension.plugins.pagination.Page;
+import net.geedge.asw.common.util.Constants;
import net.geedge.asw.common.util.R;
import net.geedge.asw.common.util.RCode;
import net.geedge.asw.common.util.T;
+import net.geedge.asw.module.environment.entity.EnvironmentEntity;
+import net.geedge.asw.module.environment.entity.EnvironmentSessionEntity;
+import net.geedge.asw.module.environment.service.IEnvironmentService;
+import net.geedge.asw.module.environment.service.IEnvironmentSessionService;
import net.geedge.asw.module.runner.entity.JobEntity;
import net.geedge.asw.module.runner.service.IJobService;
+import net.geedge.asw.module.runner.util.RunnerConstant;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.*;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
+import java.util.stream.Collectors;
@RestController
@RequestMapping("/api/v1/workspace")
public class JobController {
+ private static final Log log = Log.get();
@Autowired
private IJobService jobService;
+ @Autowired
+ private IEnvironmentService environmentService;
+
+ @Autowired
+ private IEnvironmentSessionService sessionService;
+
@GetMapping("/{workspaceId}/job/{id}")
public R detail(@PathVariable("workspaceId") String workspaceId,
@PathVariable("id") String id) {
@@ -65,14 +85,59 @@ public class JobController {
@RequestParam String ids) {
T.VerifyUtil.is(ids).notEmpty();
List<String> idList = Arrays.asList(ids.split(","));
- // TODO 其他处理
+ for (String id : idList) {
+ cancelJob(id);
+ }
+ return R.ok();
+ }
+
+ private void cancelJob(String id) {
+ JobEntity job = jobService.getById(id);
+ EnvironmentEntity environment = environmentService.getById(job.getEnvId());
+ log.info("[cancelJob] [jobId: {}]", id);
+
+ JSONObject paramJSONObject = environment.getParamJSONObject();
+ String url = paramJSONObject.getStr("url");
+ String token = paramJSONObject.getStr("token");
+
+ HttpRequest requestStatus = T.HttpUtil.createGet(String.format("%s/api/v1/env/playbook/%s", url, id));
+ requestStatus.header("Authorization", token);
+
+ if (job.getStatus().contains(RunnerConstant.JobStatus.RUNNING.getValue())){
+ while (true){
+ HttpResponse response = requestStatus.execute();
+ if (response.isOk()){
+ break;
+ }
+ }
+ HttpRequest request = T.HttpUtil.createRequest(Method.DELETE, String.format("%s/api/v1/env/playbook/%s", url, id));
+ request.header("Authorization", token);
+ HttpResponse response = request.execute();
+ log.info("[cancelJob] [request env stop playbook] [status: {}]", response.body());
+ }
+
+ Thread runningThread = Constants.RUNNING_JOB_THREAD.get(id);
+ if (runningThread != null) {
+ runningThread.interrupt();
+ }
+
+ Thread resultThread = Constants.RESULT_JOB_THREAD.get(id);
+ if (resultThread != null) {
+ resultThread.interrupt();
+ }
+ EnvironmentSessionEntity session = sessionService.getOne(new LambdaQueryWrapper<EnvironmentSessionEntity>()
+ .eq(EnvironmentSessionEntity::getJobId, id)
+ .eq(EnvironmentSessionEntity::getStatus, 1));
+
+ if (T.ObjectUtil.isNotEmpty(session)) {
+ environmentService.removeSession(session.getId());
+ }
// update state
jobService.update(new LambdaUpdateWrapper<JobEntity>()
- .in(JobEntity::getId, idList)
+ .eq(JobEntity::getId, id)
.set(JobEntity::getStatus, "cancel")
);
- return R.ok();
}
diff --git a/src/main/java/net/geedge/asw/module/runner/job/JobPlaybookExecResultChecker.java b/src/main/java/net/geedge/asw/module/runner/job/JobPlaybookExecResultChecker.java
index a1ae15c..b58a638 100644
--- a/src/main/java/net/geedge/asw/module/runner/job/JobPlaybookExecResultChecker.java
+++ b/src/main/java/net/geedge/asw/module/runner/job/JobPlaybookExecResultChecker.java
@@ -132,6 +132,7 @@ public class JobPlaybookExecResultChecker extends QuartzJobBean {
Thread.sleep(2000); // 每 2 秒执行一次
}
} catch (InterruptedException e) {
+ log.info("[playbookExecResultChecker] [startGetJobLogThread] [stop thread] [job id: {}]", job.getId());
Constants.RUNNING_JOB_THREAD.remove(job.getId());
Thread.currentThread().interrupt(); // 恢复中断状态
}
@@ -212,6 +213,11 @@ public class JobPlaybookExecResultChecker extends QuartzJobBean {
zipFile = new ZipFile(destination);
List<FileHeader> fileHeaders = zipFile.getFileHeaders();
for (FileHeader fileHeader : fileHeaders) {
+ // 检查中断状态
+ if (Thread.currentThread().isInterrupted()) {
+ log.info("[playbookExecResultChecker] [startGetJobResultThread] [stop thread] [job id: {}]", job.getId());
+ return; // 中断线程,退出循环
+ }
// 处理 pcap 文件
if (fileHeader.getFileName().endsWith("pcap")) {
PackageEntity packageEntity = packageService.getById(job.getPackageId());
@@ -269,6 +275,9 @@ public class JobPlaybookExecResultChecker extends QuartzJobBean {
log.info("[playbookExecResultChecker] [startGetJobResultThread] [finshed] [job id: {}]", job.getId());
}
} catch (Exception e) {
+ if (e.getMessage().contains("Closed by interrupt")) {
+ Thread.currentThread().interrupt(); // 恢复中断状态
+ }
log.error("[playbookExecResultChecker] [startGetJobResultThread] [error]", e);
} finally {
T.IoUtil.close(zipFile);