diff options
| author | shizhendong <[email protected]> | 2024-07-05 16:38:12 +0800 |
|---|---|---|
| committer | shizhendong <[email protected]> | 2024-07-05 16:38:12 +0800 |
| commit | a8a0fa4a08baf242fa531dae24f9854754d0804b (patch) | |
| tree | 495dc40ad0abc5b2c11b0c323013c539357a47ca | |
| parent | 6698150959ccc888631e3161fb998ce2ebb0f115 (diff) | |
feat: ASW-4 新增 runner exexutor 线程
| -rw-r--r-- | src/main/java/net/geedge/ASWRunner.java | 17 | ||||
| -rw-r--r-- | src/main/java/net/geedge/executor/Executor.java | 493 | ||||
| -rw-r--r-- | src/main/java/net/geedge/executor/JobInfo.java | 18 | ||||
| -rw-r--r-- | src/main/java/net/geedge/executor/LogUploader.java | 82 | ||||
| -rw-r--r-- | src/main/java/net/geedge/util/PlaybookYml.java | 16 | ||||
| -rw-r--r-- | src/main/java/net/geedge/util/RunnerContext.java | 92 | ||||
| -rw-r--r-- | src/main/java/net/geedge/util/RunnerYml.java | 5 | ||||
| -rw-r--r-- | src/main/java/net/geedge/util/T.java | 107 |
8 files changed, 734 insertions, 96 deletions
diff --git a/src/main/java/net/geedge/ASWRunner.java b/src/main/java/net/geedge/ASWRunner.java index 9f338e0..f592319 100644 --- a/src/main/java/net/geedge/ASWRunner.java +++ b/src/main/java/net/geedge/ASWRunner.java @@ -1,15 +1,5 @@ package net.geedge; -import java.io.File; -import java.util.Map; - -import org.apache.commons.cli.CommandLine; -import org.apache.commons.cli.CommandLineParser; -import org.apache.commons.cli.DefaultParser; -import org.apache.commons.cli.HelpFormatter; -import org.apache.commons.cli.Option; -import org.apache.commons.cli.Options; - import cn.hutool.core.lang.Dict; import cn.hutool.http.HttpRequest; import cn.hutool.http.HttpResponse; @@ -22,6 +12,9 @@ import net.geedge.util.RunnerContext.Parameters; import net.geedge.util.RunnerYml; import net.geedge.util.RunnerYml.ExecutorConfig; import net.geedge.util.T; +import org.apache.commons.cli.*; + +import java.util.Map; /** * 程序主要包括两个线程 </br> @@ -111,10 +104,6 @@ public class ASWRunner { /** * 注册登录 - * - * @param url - * @param token - * @return */ private static void register() { while (RunnerContext.getRunFlag()) { diff --git a/src/main/java/net/geedge/executor/Executor.java b/src/main/java/net/geedge/executor/Executor.java index 494ef2f..85cee12 100644 --- a/src/main/java/net/geedge/executor/Executor.java +++ b/src/main/java/net/geedge/executor/Executor.java @@ -1,39 +1,470 @@ package net.geedge.executor; +import cn.hutool.core.io.FileUtil; +import cn.hutool.core.io.IoUtil; +import cn.hutool.core.lang.Dict; +import cn.hutool.http.HttpException; +import cn.hutool.http.HttpRequest; +import cn.hutool.http.HttpResponse; +import cn.hutool.http.Method; +import cn.hutool.json.JSONObject; import cn.hutool.log.Log; +import cn.hutool.setting.yaml.YamlUtil; import lombok.Data; +import net.geedge.util.PlaybookYml; import net.geedge.util.RunnerContext; import net.geedge.util.RunnerYml.ExecutorConfig; +import net.geedge.util.T; + +import java.io.File; +import java.io.InputStream; +import java.nio.file.Path; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Queue; +import java.util.concurrent.ConcurrentLinkedQueue; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; +import java.util.stream.Collectors; @Data -public class Executor implements Runnable{ - private ExecutorConfig executorConfig; - private JobInfo jobInfo; - Log log = Log.get(); - - @Override - public void run() { - Thread.currentThread().setName("job-excutor-"+jobInfo.getId()); - log.info("job executor start"); - RunnerContext.addActiveExecutors(this); - try { - log.info("job executor run job start"); - this.runJob(); - log.info("job executor run job end"); - } finally { - RunnerContext.removeActiveExecutors(this); - log.info("job executor run jobend"); - } - } - - /** - * 运行playbook - * @param path playbook文件路径 - * @param config executor配置 - */ - void runJob() { - - }; - - -} +public class Executor implements Runnable { + private Log log = Log.get(); + + private static final String WORK_DIR_PREFIX = "/builds"; + private static final String TMP_DIR_PREFIX = "/tmp"; + + private JobInfo jobInfo; + private ExecutorConfig executorConfig; + + private LogUploader logUploader; + private final Queue<String> logQueue = new ConcurrentLinkedQueue<>(); + private final ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1); + + private Path getJobWorkPath() { + return Path.of(WORK_DIR_PREFIX, jobInfo.getId()); + } + + private Path getTmpDirPath() { + return Path.of(TMP_DIR_PREFIX, jobInfo.getId()); + } + + private void init() { + // 清空工作目录 + T.FileUtil.del(this.getJobWorkPath()); + T.FileUtil.del(this.getTmpDirPath()); + + // runnerContext + RunnerContext.addActiveExecutors(this); + + // logUploader + logUploader = new LogUploader(jobInfo.getId(), logQueue); + scheduler.scheduleAtFixedRate(logUploader, 0, 5, TimeUnit.SECONDS); + } + + @Override + public void run() { + Thread.currentThread().setName("job-excutor-" + jobInfo.getId()); + log.info("job executor start"); + if (log.isDebugEnabled()) { + log.debug("jobInfo: {}", T.JSONUtil.toJsonStr(jobInfo)); + log.debug("executorConfig: {}", T.JSONUtil.toJsonStr(executorConfig)); + } + try { + log.info("job executor run job start"); + // init + this.init(); + // run + this.runJob(); + log.info("job executor run job end"); + } catch (Exception e) { + log.error(e, "job executor run error"); + } finally { + // destroy + this.destroy(); + log.info("job executor end"); + } + } + + void runJob() { + // 下载 package playbook + this.downloadPkgAndPb(); + + String platform = executorConfig.getPlatform(); + switch (platform) { + case "android" -> { + new AndroidJob().run(); + } + case "ios" -> { + // TODO + } + case "windows" -> { + // TODO + } + } + } + + /** + * 下载 package 和 playbook 文件 + */ + private void downloadPkgAndPb() { + // package + JobInfo.Package pkg = jobInfo.getPkg(); + // http://{host}/api/v1/package/download/{id} + String pkgUrl = String.format("%s/%s", RunnerContext.getDownloadPkgPath(), pkg.getId()); + this.downloadFileFromUrl(pkgUrl, pkg.getFileName()); + + // playbook + JobInfo.Playbook pb = jobInfo.getPlaybook(); + // http://{host}/api/v1/playbook/download/{id} + String pbUrl = String.format("%s/%s", RunnerContext.getDownloadPbPath(), pb.getId()); + String pbFilename = pb.getName(); + this.downloadFileFromUrl(pbUrl, pbFilename); + + // docker image + if (T.StrUtil.equals("docker", executorConfig.getType()) && T.StrUtil.isNotEmpty(executorConfig.getImage())) { + String result = this.execCmdForStr(String.format("docker image inspect %s", executorConfig.getImage())); + boolean exist = false; + if (T.JSONUtil.isTypeJSON(result)) { + List<JSONObject> list = T.JSONUtil.toList(result, JSONObject.class); + if (T.ObjectUtil.isNotEmpty(T.CollUtil.getFirst(list))) { + exist = true; + log.info("Using docker image {} fot {}", T.CollUtil.getFirst(list).getStr("Id"), executorConfig.getImage()); + } + } + if (!exist) { + log.warn("docker image not exist, download file and load image"); + // http://{host}/api/v1/docker/image/download/{name} + String imgUrl = String.format("%s/%s", RunnerContext.getDownloadDockerImgPath(), executorConfig.getImage()); + String imgFilename = T.FileNameUtil.cleanInvalid(executorConfig.getImage()); + this.downloadFileFromUrl(imgUrl, imgFilename); + // load image + String imagePath = T.FileUtil.file(this.getJobWorkPath().toFile(), imgFilename).getPath(); + String loadResult = this.execCmdForStr(String.format("docker load -i %s", imagePath)); + log.info("docker load image finshed. result: {}", loadResult); + if (!T.StrUtil.contains(loadResult, executorConfig.getImage())) { + throw new RuntimeException(T.StrUtil.format("docker load image error, image: {} ", executorConfig.getImage())); + } + } + } + } + + private void downloadFileFromUrl(String url, String filename) { + log.info("download file start, url:{}", url); + HttpRequest request = T.HttpUtil.createGet(url); + request.addHeaders(T.MapUtil.of("Authorization", RunnerContext.getRegisterToken())); + HttpResponse response = request.timeout(-1).executeAsync(); + if (response.isOk()) { + File file = response.writeBodyForFile(T.FileUtil.file(this.getJobWorkPath().toFile(), filename), null); + log.info("download file finshed, filename: {}, file md5: {}", filename, T.DigestUtil.md5Hex(file)); + } else { + log.error("download file error, status code: {}", response.getStatus()); + throw new HttpException("download file error, status code: {}", response.getStatus()); + } + } + + private String execCmdForStr(String cmd) { + return this.execCmdForStr(cmd, null, null); + } + private String execCmdForStr(String cmd, Map<String, String> envp) { + return this.execCmdForStr(cmd, envp, null); + } + private String execCmdForStr(String cmd, File directory) { + return this.execCmdForStr(cmd, null, directory); + } + + private String execCmdForStr(String cmd, Map<String, String> envp, File directory) { + String result = ""; + InputStream in = null; + try { + ProcessBuilder processBuilder = new ProcessBuilder("/bin/sh", "-c", cmd); + // docker cmd + if (T.StrUtil.startWith(cmd, "docker ")) { + processBuilder = new ProcessBuilder(T.CommandLineUtil.translateCommandline(cmd)); + } + if (T.MapUtil.isNotEmpty(envp)) { + processBuilder.environment().putAll(envp); + } + if (T.ObjectUtil.isNotEmpty(directory)) { + processBuilder.directory(directory); + } + // 将标准错误流重定向到标准输出流 + processBuilder.redirectErrorStream(true); + Process process = processBuilder.start(); + in = process.getInputStream(); + result = IoUtil.read(in, T.CharsetUtil.CHARSET_UTF_8); + + // 命令&结果 + logQueue.add(T.StrUtil.concat(true, "$ ", cmd, "\n")); + logQueue.add(result); + int exitCode = process.waitFor(); + if (exitCode != 0) { + if (log.isDebugEnabled()) { + log.debug("cmd exited with non-zero.job id: {}, cmd: {}, status: {}", jobInfo.getId(), cmd, exitCode); + } + } + } catch (Exception e) { + log.error(e, "exec cmd error. job id: {}, cmd: {}", jobInfo.getId(), cmd); + } finally { + IoUtil.close(in); + } + return result; + } + + private void destroy() { + try { + // stop logUploader + logUploader.stop(); + // 等待 5 秒,确保最后的日志上传 + T.ThreadUtil.sleep(5, TimeUnit.SECONDS); + try { + scheduler.shutdown(); + if (!scheduler.awaitTermination(5, TimeUnit.SECONDS)) { + scheduler.shutdownNow(); + } + } catch (Exception e) { + scheduler.shutdownNow(); + } + + // removeActiveExecutors + RunnerContext.removeActiveExecutors(this); + + // destroy work dir + T.FileUtil.del(this.getJobWorkPath()); + T.FileUtil.del(this.getTmpDirPath()); + } catch (Exception e) { + log.error(e, "destroy error. job id: {}", jobInfo.getId()); + } + } + + abstract class Job { + abstract void run(); + } + + class AndroidJob extends Job { + + @Override + void run() { + String type = executorConfig.getType(); + switch (type) { + case "local" -> { + this.runLocalJob(); + } + case "docker" -> { + try { + this.runDockerJob(); + } finally { + String containerName = jobInfo.getId(); + try { + // destroy container + execCmdForStr(String.format("docker stop %s", containerName)); + execCmdForStr(String.format("docker rm -f %s", containerName)); + } catch (Exception e) { + // do nothing + log.error("destroy container error. name: {}", containerName); + } + } + } + } + } + + private void runLocalJob() { + // default device + String device = "127.0.0.1:5555"; + // adb + List<String> cmdList = T.ListUtil.list(true); + cmdList.add("adb version"); + cmdList.add("adb disconnect"); + cmdList.add("adb connect " + device); + cmdList.add("adb root"); + cmdList.add("adb devices -l"); + + // install pkg + JobInfo.Package pkg = jobInfo.getPkg(); + String pkgFilePath = FileUtil.file(getJobWorkPath().toString(), pkg.getFileName()).getAbsolutePath(); + cmdList.add(String.format("adb -s %s install %s", device, pkgFilePath)); + cmdList.add(String.format("adb -s %s shell pm list packages -3", device)); + + // exec + cmdList.stream().forEach(cmd -> execCmdForStr(cmd)); + + // userId + String userId = T.StrUtil.trim(execCmdForStr(String.format("adb -s %s shell dumpsys package %s | grep userId= | awk -F 'userId=' '{print $2}'| head -n 1", device, pkg.getIdentifier()))); + + // iptables + cmdList.clear(); + cmdList.add(String.format("adb -s %s shell iptables -F", device)); + cmdList.add(String.format("adb -s %s shell iptables -X", device)); + cmdList.add(String.format("adb -s %s shell iptables -w 10 -A OUTPUT -m owner --uid-owner %s -j CONNMARK --set-mark %s", device, userId, userId)); + cmdList.add(String.format("adb -s %s shell iptables -w 10 -A INPUT -m connmark --mark %s -j NFLOG --nflog-group %s", device, userId, userId)); + cmdList.add(String.format("adb -s %s shell iptables -w 10 -A OUTPUT -m connmark --mark %s -j NFLOG --nflog-group %s", device, userId, userId)); + cmdList.add(String.format("adb -s %s shell iptables -L", device)); + + // exec + cmdList.stream().forEach(cmd -> execCmdForStr(cmd)); + + // tcpdump + String pcapFilePath = String.format("/data/data/%s/cache/%s.pcap", pkg.getIdentifier(), pkg.getId()); + String tcpdump = String.format("adb -s %s shell tcpdump -i nflog:%s -w %s &", device, userId, pcapFilePath); + logQueue.add(T.StrUtil.concat(true, "$ ", tcpdump, "\n")); + T.RuntimeUtil.exec(tcpdump); + + // playbook + JobInfo.Playbook pb = jobInfo.getPlaybook(); + String pbWorkDir = String.format("%s/playbook", getJobWorkPath()); + + String pbFilePath = FileUtil.file(getJobWorkPath().toString(), pb.getName()).getAbsolutePath(); + execCmdForStr(String.format("mkdir -p %s/playbook", getTmpDirPath())); + execCmdForStr(String.format("unzip -o %s -d %s/playbook", pbFilePath, getTmpDirPath())); + execCmdForStr(String.format("mv %s/playbook/* %s", getTmpDirPath(), pbWorkDir)); + execCmdForStr(String.format("ls -hl %s", pbWorkDir)); + + // read main.yml + Dict dict = YamlUtil.loadByPath(Path.of(pbWorkDir, "main.yml").toString()); + PlaybookYml playbookYml = T.BeanUtil.copyProperties(dict, PlaybookYml.class); + + // script + cmdList.clear(); + cmdList.addAll(playbookYml.getBeforeScript()); + cmdList.addAll(playbookYml.getScript()); + cmdList.addAll(playbookYml.getAfterScript()); + + // exec + cmdList.stream().forEach(cmd -> execCmdForStr(cmd, playbookYml.getVariables(), T.FileUtil.file(pbWorkDir))); + + // kill tcpdump + execCmdForStr(String.format("adb -s %s shell \"ps -ef | grep tcpdump | grep -v grep | awk '{print \\$2}' | xargs kill -INT \"", device)); + + // pull pcap file + String localPcapFilePath = FileUtil.file(getJobWorkPath().toFile(), pkg.getId() + ".pcap").toString(); + execCmdForStr(String.format("adb -s %s pull %s %s", device, pcapFilePath, localPcapFilePath)); + + // upload pcap file + this.uploadPcapFile(localPcapFilePath); + } + + private void runDockerJob() { + // docker container + String containerName = jobInfo.getId(); + String dockerRun = String.format("docker run -d -e EMULATOR_DEVICE=\"Samsung Galaxy S10\" -e WEB_VNC=true --device /dev/kvm -v %s:%s --name %s %s", getJobWorkPath(), getJobWorkPath(), containerName, executorConfig.getImage()); + execCmdForStr(dockerRun); + + String prefix = String.format("docker exec -u0 %s ", containerName); + // device_status + for (int i = 1; i <= 10; i++) { + T.ThreadUtil.sleep(30, TimeUnit.SECONDS); + String deviceStatus = execCmdForStr(prefix + "cat device_status"); + log.info("docker emulator device: {}", deviceStatus); + if (T.StrUtil.equalsIgnoreCase("READY", deviceStatus)) { + break; + } + } + + // adb + List<String> cmdList = T.ListUtil.list(true); + cmdList.add(prefix + "adb version"); + cmdList.add(prefix + "adb root"); + cmdList.add(prefix + "adb devices -l"); + + // install pkg + JobInfo.Package pkg = jobInfo.getPkg(); + String pkgFilePath = FileUtil.file(getJobWorkPath().toString(), pkg.getFileName()).getAbsolutePath(); + cmdList.add(prefix + String.format("adb install %s", pkgFilePath)); + cmdList.add(prefix + String.format("adb shell pm list packages -3")); + + // exec + cmdList.stream().forEach(cmd -> execCmdForStr(cmd)); + + // userId + String getUserIdCmd = String.format("adb shell dumpsys package %s | grep userId= | awk -F \"'userId='\" \"'{print $2}'\" | head -n 1", pkg.getIdentifier()); + String userId = T.StrUtil.trim(execCmdForStr(prefix + getUserIdCmd)); + + // iptables + cmdList.clear(); + cmdList.add(prefix + String.format("adb shell iptables -F")); + cmdList.add(prefix + String.format("adb shell iptables -X")); + cmdList.add(prefix + String.format("adb shell iptables -w 10 -A OUTPUT -m owner --uid-owner %s -j CONNMARK --set-mark %s", userId, userId)); + cmdList.add(prefix + String.format("adb shell iptables -w 10 -A INPUT -m connmark --mark %s -j NFLOG --nflog-group %s", userId, userId)); + cmdList.add(prefix + String.format("adb shell iptables -w 10 -A OUTPUT -m connmark --mark %s -j NFLOG --nflog-group %s", userId, userId)); + cmdList.add(prefix + String.format("adb shell iptables -L")); + + // exec + cmdList.stream().forEach(cmd -> execCmdForStr(cmd)); + + // tcpdump + String pcapFilePath = String.format("/data/data/%s/cache/%s.pcap", pkg.getIdentifier(), pkg.getId()); + String tcpdump = prefix + String.format("adb shell tcpdump -i nflog:%s -w %s &", userId, pcapFilePath); + logQueue.add(T.StrUtil.concat(true, "$ ", tcpdump, "\n")); + T.RuntimeUtil.exec(tcpdump); + + // playbook + JobInfo.Playbook pb = jobInfo.getPlaybook(); + String pbWorkDir = String.format("%s/playbook", getJobWorkPath()); + + String pbFilePath = FileUtil.file(getJobWorkPath().toString(), pb.getName()).getAbsolutePath(); + execCmdForStr(String.format("mkdir -p %s/playbook", getTmpDirPath())); + execCmdForStr(String.format("unzip -o %s -d %s/playbook", pbFilePath, getTmpDirPath())); + execCmdForStr(String.format("mv %s/playbook/* %s", getTmpDirPath(), pbWorkDir)); + execCmdForStr(String.format("ls -hl %s", pbWorkDir)); + + // read main.yml + Dict dict = YamlUtil.loadByPath(Path.of(pbWorkDir, "main.yml").toString()); + PlaybookYml pbYml = T.BeanUtil.copyProperties(dict, PlaybookYml.class); + + Map<String, String> variables = T.MapUtil.defaultIfEmpty(pbYml.getVariables(), new HashMap<>()); + String dockerEnvArgs = variables.entrySet().stream().map(entry -> "-e " + entry.getKey().trim() + "=" + entry.getValue().trim()).collect(Collectors.joining(" ")); + + // playbook cmd prefix + String pbPrefix = String.format("docker exec -u0 -w %s %s %s ", pbWorkDir, dockerEnvArgs, containerName); + + // script + List<String> beforeScript = pbYml.getBeforeScript().stream().map(item -> pbPrefix + item).collect(Collectors.toList()); + List<String> script = pbYml.getScript().stream().map(item -> pbPrefix + item).collect(Collectors.toList()); + List<String> afterScript = pbYml.getAfterScript().stream().map(item -> pbPrefix + item).collect(Collectors.toList()); + + cmdList.clear(); + cmdList.addAll(beforeScript); + cmdList.addAll(script); + cmdList.addAll(afterScript); + + // exec + cmdList.stream().forEach(cmd -> execCmdForStr(cmd)); + + // kill tcpdump + execCmdForStr(prefix + "adb shell \"ps -ef | grep tcpdump | grep -v grep | awk '{print $2}' | xargs kill -INT \""); + + // pull pcap file + String localPcapFilePath = FileUtil.file(getJobWorkPath().toFile(), pkg.getId() + ".pcap").toString(); + execCmdForStr(prefix + String.format("adb pull %s %s", pcapFilePath, localPcapFilePath)); + + // upload pcap file + this.uploadPcapFile(localPcapFilePath); + } + + private void uploadPcapFile(String pcapFilePath) { + String jobId = jobInfo.getId(); + try { + String url = String.format("%s/%s", RunnerContext.getUploadResultPath(), jobId); + String token = RunnerContext.getRegisterToken(); + + HttpRequest request = T.HttpUtil.createRequest(Method.PUT, url); + request.addHeaders(T.MapUtil.of("Authorization", token)); + + // file data + request.form("file", FileUtil.file(pcapFilePath)); + request.contentType("multipart/form-data"); + + HttpResponse response = request.execute(); + if (response.isOk()) { + log.info("upload pcap success. job id: {}", jobId); + } else { + log.warn("upload pcap error. job id: {} status: {}", jobId, response.getStatus()); + } + } catch (Exception e) { + log.error(e, "upload pcap error. job id: {}", jobId); + } + } + } +}
\ No newline at end of file diff --git a/src/main/java/net/geedge/executor/JobInfo.java b/src/main/java/net/geedge/executor/JobInfo.java index 7fcedaa..becf53c 100644 --- a/src/main/java/net/geedge/executor/JobInfo.java +++ b/src/main/java/net/geedge/executor/JobInfo.java @@ -1,8 +1,9 @@ package net.geedge.executor; -import java.util.List; - import lombok.Data; +import net.geedge.util.T; + +import java.util.List; @Data public class JobInfo { @@ -17,6 +18,10 @@ public class JobInfo { String platform; String version; String identifier; + + public String getFileName() { + return T.StrUtil.concat(true, this.identifier, "_", this.platform, "_", this.version, ".apk"); + } } @Data @@ -32,4 +37,13 @@ public class JobInfo { String name; String conditions; } + + public Package newPkgInstance() { + return new Package(); + } + + + public Playbook newPbInstance() { + return new Playbook(); + } } diff --git a/src/main/java/net/geedge/executor/LogUploader.java b/src/main/java/net/geedge/executor/LogUploader.java new file mode 100644 index 0000000..4e71feb --- /dev/null +++ b/src/main/java/net/geedge/executor/LogUploader.java @@ -0,0 +1,82 @@ +package net.geedge.executor; + +import cn.hutool.core.io.FileUtil; +import cn.hutool.http.HttpRequest; +import cn.hutool.http.HttpResponse; +import cn.hutool.http.Method; +import cn.hutool.log.Log; +import net.geedge.util.RunnerContext; +import net.geedge.util.T; + +import java.io.File; +import java.nio.file.Path; +import java.util.Queue; + +public class LogUploader implements Runnable { + private final Log log = Log.get(); + private final String jobId; + private final Queue<String> logQueue; + private final File failedLogFile; + private volatile boolean running = true; + + private Path getJobWorkPath() { + return Path.of("/builds", jobId); + } + + public LogUploader(String jobId, Queue<String> logQueue) { + this.jobId = jobId; + this.logQueue = logQueue; + failedLogFile = FileUtil.file(this.getJobWorkPath().toFile(), "failed_logs.txt"); + } + + @Override + public void run() { + Thread.currentThread().setName("log-uploader-" + jobId); + if (this.running) { + this.uploadLogs(); + } + } + + public void stop() { + log.info("log uploader thread prepare shundown, last upload"); + running = false; + // 最后上传一次 + this.uploadLogs(); + } + + private void uploadLogs() { + StringBuilder content = new StringBuilder(); + // 检查是否有未上传的日志文件 + if (FileUtil.exist(failedLogFile)) { + String str = FileUtil.readString(failedLogFile, T.CharsetUtil.CHARSET_UTF_8); + content.append(str); + // 删除临时文件 + FileUtil.del(failedLogFile); + } + + String line; + while ((line = logQueue.poll()) != null) { + content.append(line); + } + + if (content.length() > 0) { + try { + String url = String.format("%s/%s", RunnerContext.getTraceLogPath(), jobId); + String token = RunnerContext.getRegisterToken(); + + HttpRequest request = T.HttpUtil.createRequest(Method.PUT, url); + request.addHeaders(T.MapUtil.of("Authorization", token)); + request.body(T.StrUtil.str(content), "application/octet-stream"); + + HttpResponse response = request.execute(); + if (!response.isOk()) { + log.warn("upload logs error. job id: {} status: {}", jobId, response.getStatus()); + FileUtil.appendUtf8String(T.StrUtil.str(content), failedLogFile); + } + } catch (Exception e) { + log.error("upload logs error. job id: {} msg: {}", jobId, e.getMessage()); + FileUtil.appendUtf8String(T.StrUtil.str(content), failedLogFile); + } + } + } +}
\ No newline at end of file diff --git a/src/main/java/net/geedge/util/PlaybookYml.java b/src/main/java/net/geedge/util/PlaybookYml.java new file mode 100644 index 0000000..0aeeb30 --- /dev/null +++ b/src/main/java/net/geedge/util/PlaybookYml.java @@ -0,0 +1,16 @@ +package net.geedge.util; + +import lombok.Data; + +import java.util.List; +import java.util.Map; + +@Data +public class PlaybookYml { + + private Map<String, String> variables; + private List<String> beforeScript; + private List<String> script; + private List<String> afterScript; + +} diff --git a/src/main/java/net/geedge/util/RunnerContext.java b/src/main/java/net/geedge/util/RunnerContext.java index f3c093f..9207508 100644 --- a/src/main/java/net/geedge/util/RunnerContext.java +++ b/src/main/java/net/geedge/util/RunnerContext.java @@ -1,13 +1,11 @@ package net.geedge.util; -import java.util.Iterator; -import java.util.List; -import java.util.Map; -import java.util.concurrent.Executors; - import net.geedge.executor.Executor; import net.geedge.util.RunnerYml.ExecutorConfig; +import java.util.List; +import java.util.Map; + public class RunnerContext { /** * 默认配置文件路径 @@ -17,11 +15,37 @@ public class RunnerContext { * 默认心跳间隔 5s */ public static final Integer DEFAULT_HEARTBEAT_INTERVAL = 5000; - public static final String REGISTER_PATH = "/register"; + public static final String REGISTER_PATH = "/api/v1/runner/register"; /** * 心跳接口路径 */ - private static final String HEARTBEAT_PATH = "/heartbeat"; + private static final String HEARTBEAT_PATH = "/api/v1/runner/heartbeat"; + /** + * 下载 package 接口路径 + * http://{host}/api/v1/package/download/{id} + */ + private static final String DOWNLOAD_PKG_PATH = "/api/v1/package/download"; + /** + * 下载 playbook 接口路径 + * http://{host}/api/v1/playbook/download/{id} + */ + private static final String DOWNLOAD_PB_PATH = "/api/v1/playbook/download"; + /** + * 下载 docker image 接口路径 + * http://{host}/api/v1/docker/image/download/{name} + */ + private static final String DOWNLOAD_DOCKER_IMG_PATH = "/api/v1/docker/image/download"; + + /** + * 上传任务执行日志 接口路径 + * http://{host}/api/v1/runner/trace/{jobId} + */ + private static final String TRACE_LOG_PATH = "/api/v1/runner/trace"; + /** + * 上传任务执行结果 接口路径 + * http://{host}/api/v1/runner/uploadResult/{jobId} + */ + private static final String UPLOAD_RESULT_PATH = "/api/v1/runner/uploadResult"; /** * 程序运行标识 */ @@ -52,15 +76,35 @@ public class RunnerContext { } public static String getRegisterPath() { - return REGISTER_PATH; + return T.URLUtil.completeUrl(runnerYml.getRegister().getUrl(), REGISTER_PATH); } public static String getheartbeatPath() { - return HEARTBEAT_PATH; + return T.URLUtil.completeUrl(runnerYml.getRegister().getUrl(), HEARTBEAT_PATH); + } + + public static String getDownloadPkgPath() { + return T.URLUtil.completeUrl(runnerYml.getRegister().getUrl(), DOWNLOAD_PKG_PATH); + } + + public static String getDownloadPbPath() { + return T.URLUtil.completeUrl(runnerYml.getRegister().getUrl(), DOWNLOAD_PB_PATH); + } + + public static String getDownloadDockerImgPath() { + return T.URLUtil.completeUrl(runnerYml.getRegister().getUrl(), DOWNLOAD_DOCKER_IMG_PATH); + } + + public static String getTraceLogPath() { + return T.URLUtil.completeUrl(runnerYml.getRegister().getUrl(), TRACE_LOG_PATH); + } + + public static String getUploadResultPath() { + return T.URLUtil.completeUrl(runnerYml.getRegister().getUrl(), UPLOAD_RESULT_PATH); } public static String getRegisterToken() { - return HEARTBEAT_PATH; + return runnerYml.getRegister().getToken(); } public static void setParameters(Parameters param) { @@ -95,17 +139,17 @@ public class RunnerContext { * @return */ public static List<ExecutorConfig> getFreeExecutor() { - List<ExecutorConfig> result = T.ListUtil.list(false); // 记录每个平台可以提供几个运行实例 - runnerYml.getExecutors().forEach(e -> { - result.add(e); - }); + List<ExecutorConfig> result = T.BeanUtil.copyToList(runnerYml.getExecutors(), ExecutorConfig.class); // 减去现有的运行实例 activeExecutors.forEach(ae -> { ExecutorConfig config = ae.getExecutorConfig(); result.forEach(e -> { - if (e.getType().equals(config.getType()) && e.getPlatform().equals(config.getPlatform())) { - e.setConcurrent(e.getConcurrent() - 1); + if (T.ObjectUtil.equals(e.getType(), config.getType()) + && T.ObjectUtil.equals(e.getPlatform(), config.getPlatform()) + && T.ObjectUtil.equals(e.getImage(), config.getImage())) { + int num = e.getConcurrent() == 0 ? 0 : e.getConcurrent() - 1; + e.setConcurrent(num); } }); }); @@ -115,7 +159,7 @@ public class RunnerContext { public static ExecutorConfig getFreeExecutorByPlatform(String platform) { List<ExecutorConfig> freeExecutor = getFreeExecutor(); for (ExecutorConfig e : freeExecutor) { - if(e.getPlatform().equals(platform)) { + if (e.getPlatform().equals(platform) && e.getConcurrent() > 0) { return e; } } @@ -131,18 +175,14 @@ public class RunnerContext { }); return result; } - + public static void addActiveExecutors(Executor e) { activeExecutors.add(e); } - + public static void removeActiveExecutors(Executor e) { - Iterator<Executor> ite = activeExecutors.iterator(); - while(ite.hasNext()) { - Executor next = ite.next(); - if(next.getJobInfo().getId().equals(e.getJobInfo().getId())) { - ite.remove(); - } - } + activeExecutors.removeIf(next -> + T.ObjectUtil.equals(next.getJobInfo().getId(), e.getJobInfo().getId()) + ); } } diff --git a/src/main/java/net/geedge/util/RunnerYml.java b/src/main/java/net/geedge/util/RunnerYml.java index 1ed0cfe..8ebec63 100644 --- a/src/main/java/net/geedge/util/RunnerYml.java +++ b/src/main/java/net/geedge/util/RunnerYml.java @@ -1,9 +1,8 @@ package net.geedge.util; -import java.util.List; - import lombok.Data; -import lombok.EqualsAndHashCode; + +import java.util.List; @Data public class RunnerYml { diff --git a/src/main/java/net/geedge/util/T.java b/src/main/java/net/geedge/util/T.java index 0f659b1..23ff032 100644 --- a/src/main/java/net/geedge/util/T.java +++ b/src/main/java/net/geedge/util/T.java @@ -1,12 +1,15 @@ package net.geedge.util; -import java.awt.Graphics; -import java.awt.Robot; -import java.lang.ref.PhantomReference; -import java.lang.ref.Reference; -import java.lang.ref.ReferenceQueue; -import java.lang.ref.SoftReference; -import java.lang.ref.WeakReference; +import cn.hutool.core.date.DateTime; + +import javax.crypto.Cipher; +import javax.crypto.KeyGenerator; +import javax.crypto.SecretKey; +import javax.crypto.spec.SecretKeySpec; +import javax.tools.JavaCompiler; +import javax.tools.JavaFileObject; +import java.awt.*; +import java.lang.ref.*; import java.lang.reflect.Type; import java.math.BigDecimal; import java.net.Socket; @@ -16,22 +19,10 @@ import java.security.SecureRandom; import java.time.LocalDateTime; import java.time.temporal.Temporal; import java.time.temporal.TemporalAccessor; -import java.util.Calendar; -import java.util.Collection; -import java.util.Iterator; -import java.util.Spliterator; +import java.util.*; import java.util.regex.Matcher; import java.util.regex.Pattern; -import javax.crypto.Cipher; -import javax.crypto.KeyGenerator; -import javax.crypto.SecretKey; -import javax.crypto.spec.SecretKeySpec; -import javax.tools.JavaCompiler; -import javax.tools.JavaFileObject; - -import cn.hutool.core.date.DateTime; - public class T { public static final Pattern REGEX_SPECIAL_DATE = Pattern @@ -1480,4 +1471,80 @@ public class T { } + /** + * CommandLineUtil + * + * @version org.apache.commons.commons-exec:1.3 + * @apiNote copy from rg.apache.commons.exec.CommandLine.translateCommandline + */ + public static class CommandLineUtil { + /** + * translateCommandline + * + * @param toProcess + * @return + */ + public static String[] translateCommandline(String toProcess) { + if (toProcess != null && toProcess.length() != 0) { + int state = 0; + StringTokenizer tok = new StringTokenizer(toProcess, "\"' ", true); + ArrayList<String> list = new ArrayList(); + StringBuilder current = new StringBuilder(); + boolean lastTokenHasBeenQuoted = false; + + while (true) { + while (tok.hasMoreTokens()) { + String nextTok = tok.nextToken(); + switch (state) { + case 1: + if ("'".equals(nextTok)) { + lastTokenHasBeenQuoted = true; + state = 0; + } else { + current.append(nextTok); + } + continue; + case 2: + if ("\"".equals(nextTok)) { + lastTokenHasBeenQuoted = true; + state = 0; + } else { + current.append(nextTok); + } + continue; + } + + if ("'".equals(nextTok)) { + state = 1; + } else if ("\"".equals(nextTok)) { + state = 2; + } else if (" ".equals(nextTok)) { + if (lastTokenHasBeenQuoted || current.length() != 0) { + list.add(current.toString()); + current = new StringBuilder(); + } + } else { + current.append(nextTok); + } + + lastTokenHasBeenQuoted = false; + } + + if (lastTokenHasBeenQuoted || current.length() != 0) { + list.add(current.toString()); + } + + if (state != 1 && state != 2) { + String[] args = new String[list.size()]; + return (String[]) list.toArray(args); + } + + throw new IllegalArgumentException("Unbalanced quotes in " + toProcess); + } + } else { + return new String[0]; + } + } + } + } |
