summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorshizhendong <[email protected]>2024-07-05 16:38:12 +0800
committershizhendong <[email protected]>2024-07-05 16:38:12 +0800
commita8a0fa4a08baf242fa531dae24f9854754d0804b (patch)
tree495dc40ad0abc5b2c11b0c323013c539357a47ca
parent6698150959ccc888631e3161fb998ce2ebb0f115 (diff)
feat: ASW-4 新增 runner exexutor 线程
-rw-r--r--src/main/java/net/geedge/ASWRunner.java17
-rw-r--r--src/main/java/net/geedge/executor/Executor.java493
-rw-r--r--src/main/java/net/geedge/executor/JobInfo.java18
-rw-r--r--src/main/java/net/geedge/executor/LogUploader.java82
-rw-r--r--src/main/java/net/geedge/util/PlaybookYml.java16
-rw-r--r--src/main/java/net/geedge/util/RunnerContext.java92
-rw-r--r--src/main/java/net/geedge/util/RunnerYml.java5
-rw-r--r--src/main/java/net/geedge/util/T.java107
8 files changed, 734 insertions, 96 deletions
diff --git a/src/main/java/net/geedge/ASWRunner.java b/src/main/java/net/geedge/ASWRunner.java
index 9f338e0..f592319 100644
--- a/src/main/java/net/geedge/ASWRunner.java
+++ b/src/main/java/net/geedge/ASWRunner.java
@@ -1,15 +1,5 @@
package net.geedge;
-import java.io.File;
-import java.util.Map;
-
-import org.apache.commons.cli.CommandLine;
-import org.apache.commons.cli.CommandLineParser;
-import org.apache.commons.cli.DefaultParser;
-import org.apache.commons.cli.HelpFormatter;
-import org.apache.commons.cli.Option;
-import org.apache.commons.cli.Options;
-
import cn.hutool.core.lang.Dict;
import cn.hutool.http.HttpRequest;
import cn.hutool.http.HttpResponse;
@@ -22,6 +12,9 @@ import net.geedge.util.RunnerContext.Parameters;
import net.geedge.util.RunnerYml;
import net.geedge.util.RunnerYml.ExecutorConfig;
import net.geedge.util.T;
+import org.apache.commons.cli.*;
+
+import java.util.Map;
/**
* 程序主要包括两个线程 </br>
@@ -111,10 +104,6 @@ public class ASWRunner {
/**
* 注册登录
- *
- * @param url
- * @param token
- * @return
*/
private static void register() {
while (RunnerContext.getRunFlag()) {
diff --git a/src/main/java/net/geedge/executor/Executor.java b/src/main/java/net/geedge/executor/Executor.java
index 494ef2f..85cee12 100644
--- a/src/main/java/net/geedge/executor/Executor.java
+++ b/src/main/java/net/geedge/executor/Executor.java
@@ -1,39 +1,470 @@
package net.geedge.executor;
+import cn.hutool.core.io.FileUtil;
+import cn.hutool.core.io.IoUtil;
+import cn.hutool.core.lang.Dict;
+import cn.hutool.http.HttpException;
+import cn.hutool.http.HttpRequest;
+import cn.hutool.http.HttpResponse;
+import cn.hutool.http.Method;
+import cn.hutool.json.JSONObject;
import cn.hutool.log.Log;
+import cn.hutool.setting.yaml.YamlUtil;
import lombok.Data;
+import net.geedge.util.PlaybookYml;
import net.geedge.util.RunnerContext;
import net.geedge.util.RunnerYml.ExecutorConfig;
+import net.geedge.util.T;
+
+import java.io.File;
+import java.io.InputStream;
+import java.nio.file.Path;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Queue;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
@Data
-public class Executor implements Runnable{
- private ExecutorConfig executorConfig;
- private JobInfo jobInfo;
- Log log = Log.get();
-
- @Override
- public void run() {
- Thread.currentThread().setName("job-excutor-"+jobInfo.getId());
- log.info("job executor start");
- RunnerContext.addActiveExecutors(this);
- try {
- log.info("job executor run job start");
- this.runJob();
- log.info("job executor run job end");
- } finally {
- RunnerContext.removeActiveExecutors(this);
- log.info("job executor run jobend");
- }
- }
-
- /**
- * 运行playbook
- * @param path playbook文件路径
- * @param config executor配置
- */
- void runJob() {
-
- };
-
-
-}
+public class Executor implements Runnable {
+ private Log log = Log.get();
+
+ private static final String WORK_DIR_PREFIX = "/builds";
+ private static final String TMP_DIR_PREFIX = "/tmp";
+
+ private JobInfo jobInfo;
+ private ExecutorConfig executorConfig;
+
+ private LogUploader logUploader;
+ private final Queue<String> logQueue = new ConcurrentLinkedQueue<>();
+ private final ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1);
+
+ private Path getJobWorkPath() {
+ return Path.of(WORK_DIR_PREFIX, jobInfo.getId());
+ }
+
+ private Path getTmpDirPath() {
+ return Path.of(TMP_DIR_PREFIX, jobInfo.getId());
+ }
+
+ private void init() {
+ // 清空工作目录
+ T.FileUtil.del(this.getJobWorkPath());
+ T.FileUtil.del(this.getTmpDirPath());
+
+ // runnerContext
+ RunnerContext.addActiveExecutors(this);
+
+ // logUploader
+ logUploader = new LogUploader(jobInfo.getId(), logQueue);
+ scheduler.scheduleAtFixedRate(logUploader, 0, 5, TimeUnit.SECONDS);
+ }
+
+ @Override
+ public void run() {
+ Thread.currentThread().setName("job-excutor-" + jobInfo.getId());
+ log.info("job executor start");
+ if (log.isDebugEnabled()) {
+ log.debug("jobInfo: {}", T.JSONUtil.toJsonStr(jobInfo));
+ log.debug("executorConfig: {}", T.JSONUtil.toJsonStr(executorConfig));
+ }
+ try {
+ log.info("job executor run job start");
+ // init
+ this.init();
+ // run
+ this.runJob();
+ log.info("job executor run job end");
+ } catch (Exception e) {
+ log.error(e, "job executor run error");
+ } finally {
+ // destroy
+ this.destroy();
+ log.info("job executor end");
+ }
+ }
+
+ void runJob() {
+ // 下载 package playbook
+ this.downloadPkgAndPb();
+
+ String platform = executorConfig.getPlatform();
+ switch (platform) {
+ case "android" -> {
+ new AndroidJob().run();
+ }
+ case "ios" -> {
+ // TODO
+ }
+ case "windows" -> {
+ // TODO
+ }
+ }
+ }
+
+ /**
+ * 下载 package 和 playbook 文件
+ */
+ private void downloadPkgAndPb() {
+ // package
+ JobInfo.Package pkg = jobInfo.getPkg();
+ // http://{host}/api/v1/package/download/{id}
+ String pkgUrl = String.format("%s/%s", RunnerContext.getDownloadPkgPath(), pkg.getId());
+ this.downloadFileFromUrl(pkgUrl, pkg.getFileName());
+
+ // playbook
+ JobInfo.Playbook pb = jobInfo.getPlaybook();
+ // http://{host}/api/v1/playbook/download/{id}
+ String pbUrl = String.format("%s/%s", RunnerContext.getDownloadPbPath(), pb.getId());
+ String pbFilename = pb.getName();
+ this.downloadFileFromUrl(pbUrl, pbFilename);
+
+ // docker image
+ if (T.StrUtil.equals("docker", executorConfig.getType()) && T.StrUtil.isNotEmpty(executorConfig.getImage())) {
+ String result = this.execCmdForStr(String.format("docker image inspect %s", executorConfig.getImage()));
+ boolean exist = false;
+ if (T.JSONUtil.isTypeJSON(result)) {
+ List<JSONObject> list = T.JSONUtil.toList(result, JSONObject.class);
+ if (T.ObjectUtil.isNotEmpty(T.CollUtil.getFirst(list))) {
+ exist = true;
+ log.info("Using docker image {} fot {}", T.CollUtil.getFirst(list).getStr("Id"), executorConfig.getImage());
+ }
+ }
+ if (!exist) {
+ log.warn("docker image not exist, download file and load image");
+ // http://{host}/api/v1/docker/image/download/{name}
+ String imgUrl = String.format("%s/%s", RunnerContext.getDownloadDockerImgPath(), executorConfig.getImage());
+ String imgFilename = T.FileNameUtil.cleanInvalid(executorConfig.getImage());
+ this.downloadFileFromUrl(imgUrl, imgFilename);
+ // load image
+ String imagePath = T.FileUtil.file(this.getJobWorkPath().toFile(), imgFilename).getPath();
+ String loadResult = this.execCmdForStr(String.format("docker load -i %s", imagePath));
+ log.info("docker load image finshed. result: {}", loadResult);
+ if (!T.StrUtil.contains(loadResult, executorConfig.getImage())) {
+ throw new RuntimeException(T.StrUtil.format("docker load image error, image: {} ", executorConfig.getImage()));
+ }
+ }
+ }
+ }
+
+ private void downloadFileFromUrl(String url, String filename) {
+ log.info("download file start, url:{}", url);
+ HttpRequest request = T.HttpUtil.createGet(url);
+ request.addHeaders(T.MapUtil.of("Authorization", RunnerContext.getRegisterToken()));
+ HttpResponse response = request.timeout(-1).executeAsync();
+ if (response.isOk()) {
+ File file = response.writeBodyForFile(T.FileUtil.file(this.getJobWorkPath().toFile(), filename), null);
+ log.info("download file finshed, filename: {}, file md5: {}", filename, T.DigestUtil.md5Hex(file));
+ } else {
+ log.error("download file error, status code: {}", response.getStatus());
+ throw new HttpException("download file error, status code: {}", response.getStatus());
+ }
+ }
+
+ private String execCmdForStr(String cmd) {
+ return this.execCmdForStr(cmd, null, null);
+ }
+ private String execCmdForStr(String cmd, Map<String, String> envp) {
+ return this.execCmdForStr(cmd, envp, null);
+ }
+ private String execCmdForStr(String cmd, File directory) {
+ return this.execCmdForStr(cmd, null, directory);
+ }
+
+ private String execCmdForStr(String cmd, Map<String, String> envp, File directory) {
+ String result = "";
+ InputStream in = null;
+ try {
+ ProcessBuilder processBuilder = new ProcessBuilder("/bin/sh", "-c", cmd);
+ // docker cmd
+ if (T.StrUtil.startWith(cmd, "docker ")) {
+ processBuilder = new ProcessBuilder(T.CommandLineUtil.translateCommandline(cmd));
+ }
+ if (T.MapUtil.isNotEmpty(envp)) {
+ processBuilder.environment().putAll(envp);
+ }
+ if (T.ObjectUtil.isNotEmpty(directory)) {
+ processBuilder.directory(directory);
+ }
+ // 将标准错误流重定向到标准输出流
+ processBuilder.redirectErrorStream(true);
+ Process process = processBuilder.start();
+ in = process.getInputStream();
+ result = IoUtil.read(in, T.CharsetUtil.CHARSET_UTF_8);
+
+ // 命令&结果
+ logQueue.add(T.StrUtil.concat(true, "$ ", cmd, "\n"));
+ logQueue.add(result);
+ int exitCode = process.waitFor();
+ if (exitCode != 0) {
+ if (log.isDebugEnabled()) {
+ log.debug("cmd exited with non-zero.job id: {}, cmd: {}, status: {}", jobInfo.getId(), cmd, exitCode);
+ }
+ }
+ } catch (Exception e) {
+ log.error(e, "exec cmd error. job id: {}, cmd: {}", jobInfo.getId(), cmd);
+ } finally {
+ IoUtil.close(in);
+ }
+ return result;
+ }
+
+ private void destroy() {
+ try {
+ // stop logUploader
+ logUploader.stop();
+ // 等待 5 秒,确保最后的日志上传
+ T.ThreadUtil.sleep(5, TimeUnit.SECONDS);
+ try {
+ scheduler.shutdown();
+ if (!scheduler.awaitTermination(5, TimeUnit.SECONDS)) {
+ scheduler.shutdownNow();
+ }
+ } catch (Exception e) {
+ scheduler.shutdownNow();
+ }
+
+ // removeActiveExecutors
+ RunnerContext.removeActiveExecutors(this);
+
+ // destroy work dir
+ T.FileUtil.del(this.getJobWorkPath());
+ T.FileUtil.del(this.getTmpDirPath());
+ } catch (Exception e) {
+ log.error(e, "destroy error. job id: {}", jobInfo.getId());
+ }
+ }
+
+ abstract class Job {
+ abstract void run();
+ }
+
+ class AndroidJob extends Job {
+
+ @Override
+ void run() {
+ String type = executorConfig.getType();
+ switch (type) {
+ case "local" -> {
+ this.runLocalJob();
+ }
+ case "docker" -> {
+ try {
+ this.runDockerJob();
+ } finally {
+ String containerName = jobInfo.getId();
+ try {
+ // destroy container
+ execCmdForStr(String.format("docker stop %s", containerName));
+ execCmdForStr(String.format("docker rm -f %s", containerName));
+ } catch (Exception e) {
+ // do nothing
+ log.error("destroy container error. name: {}", containerName);
+ }
+ }
+ }
+ }
+ }
+
+ private void runLocalJob() {
+ // default device
+ String device = "127.0.0.1:5555";
+ // adb
+ List<String> cmdList = T.ListUtil.list(true);
+ cmdList.add("adb version");
+ cmdList.add("adb disconnect");
+ cmdList.add("adb connect " + device);
+ cmdList.add("adb root");
+ cmdList.add("adb devices -l");
+
+ // install pkg
+ JobInfo.Package pkg = jobInfo.getPkg();
+ String pkgFilePath = FileUtil.file(getJobWorkPath().toString(), pkg.getFileName()).getAbsolutePath();
+ cmdList.add(String.format("adb -s %s install %s", device, pkgFilePath));
+ cmdList.add(String.format("adb -s %s shell pm list packages -3", device));
+
+ // exec
+ cmdList.stream().forEach(cmd -> execCmdForStr(cmd));
+
+ // userId
+ String userId = T.StrUtil.trim(execCmdForStr(String.format("adb -s %s shell dumpsys package %s | grep userId= | awk -F 'userId=' '{print $2}'| head -n 1", device, pkg.getIdentifier())));
+
+ // iptables
+ cmdList.clear();
+ cmdList.add(String.format("adb -s %s shell iptables -F", device));
+ cmdList.add(String.format("adb -s %s shell iptables -X", device));
+ cmdList.add(String.format("adb -s %s shell iptables -w 10 -A OUTPUT -m owner --uid-owner %s -j CONNMARK --set-mark %s", device, userId, userId));
+ cmdList.add(String.format("adb -s %s shell iptables -w 10 -A INPUT -m connmark --mark %s -j NFLOG --nflog-group %s", device, userId, userId));
+ cmdList.add(String.format("adb -s %s shell iptables -w 10 -A OUTPUT -m connmark --mark %s -j NFLOG --nflog-group %s", device, userId, userId));
+ cmdList.add(String.format("adb -s %s shell iptables -L", device));
+
+ // exec
+ cmdList.stream().forEach(cmd -> execCmdForStr(cmd));
+
+ // tcpdump
+ String pcapFilePath = String.format("/data/data/%s/cache/%s.pcap", pkg.getIdentifier(), pkg.getId());
+ String tcpdump = String.format("adb -s %s shell tcpdump -i nflog:%s -w %s &", device, userId, pcapFilePath);
+ logQueue.add(T.StrUtil.concat(true, "$ ", tcpdump, "\n"));
+ T.RuntimeUtil.exec(tcpdump);
+
+ // playbook
+ JobInfo.Playbook pb = jobInfo.getPlaybook();
+ String pbWorkDir = String.format("%s/playbook", getJobWorkPath());
+
+ String pbFilePath = FileUtil.file(getJobWorkPath().toString(), pb.getName()).getAbsolutePath();
+ execCmdForStr(String.format("mkdir -p %s/playbook", getTmpDirPath()));
+ execCmdForStr(String.format("unzip -o %s -d %s/playbook", pbFilePath, getTmpDirPath()));
+ execCmdForStr(String.format("mv %s/playbook/* %s", getTmpDirPath(), pbWorkDir));
+ execCmdForStr(String.format("ls -hl %s", pbWorkDir));
+
+ // read main.yml
+ Dict dict = YamlUtil.loadByPath(Path.of(pbWorkDir, "main.yml").toString());
+ PlaybookYml playbookYml = T.BeanUtil.copyProperties(dict, PlaybookYml.class);
+
+ // script
+ cmdList.clear();
+ cmdList.addAll(playbookYml.getBeforeScript());
+ cmdList.addAll(playbookYml.getScript());
+ cmdList.addAll(playbookYml.getAfterScript());
+
+ // exec
+ cmdList.stream().forEach(cmd -> execCmdForStr(cmd, playbookYml.getVariables(), T.FileUtil.file(pbWorkDir)));
+
+ // kill tcpdump
+ execCmdForStr(String.format("adb -s %s shell \"ps -ef | grep tcpdump | grep -v grep | awk '{print \\$2}' | xargs kill -INT \"", device));
+
+ // pull pcap file
+ String localPcapFilePath = FileUtil.file(getJobWorkPath().toFile(), pkg.getId() + ".pcap").toString();
+ execCmdForStr(String.format("adb -s %s pull %s %s", device, pcapFilePath, localPcapFilePath));
+
+ // upload pcap file
+ this.uploadPcapFile(localPcapFilePath);
+ }
+
+ private void runDockerJob() {
+ // docker container
+ String containerName = jobInfo.getId();
+ String dockerRun = String.format("docker run -d -e EMULATOR_DEVICE=\"Samsung Galaxy S10\" -e WEB_VNC=true --device /dev/kvm -v %s:%s --name %s %s", getJobWorkPath(), getJobWorkPath(), containerName, executorConfig.getImage());
+ execCmdForStr(dockerRun);
+
+ String prefix = String.format("docker exec -u0 %s ", containerName);
+ // device_status
+ for (int i = 1; i <= 10; i++) {
+ T.ThreadUtil.sleep(30, TimeUnit.SECONDS);
+ String deviceStatus = execCmdForStr(prefix + "cat device_status");
+ log.info("docker emulator device: {}", deviceStatus);
+ if (T.StrUtil.equalsIgnoreCase("READY", deviceStatus)) {
+ break;
+ }
+ }
+
+ // adb
+ List<String> cmdList = T.ListUtil.list(true);
+ cmdList.add(prefix + "adb version");
+ cmdList.add(prefix + "adb root");
+ cmdList.add(prefix + "adb devices -l");
+
+ // install pkg
+ JobInfo.Package pkg = jobInfo.getPkg();
+ String pkgFilePath = FileUtil.file(getJobWorkPath().toString(), pkg.getFileName()).getAbsolutePath();
+ cmdList.add(prefix + String.format("adb install %s", pkgFilePath));
+ cmdList.add(prefix + String.format("adb shell pm list packages -3"));
+
+ // exec
+ cmdList.stream().forEach(cmd -> execCmdForStr(cmd));
+
+ // userId
+ String getUserIdCmd = String.format("adb shell dumpsys package %s | grep userId= | awk -F \"'userId='\" \"'{print $2}'\" | head -n 1", pkg.getIdentifier());
+ String userId = T.StrUtil.trim(execCmdForStr(prefix + getUserIdCmd));
+
+ // iptables
+ cmdList.clear();
+ cmdList.add(prefix + String.format("adb shell iptables -F"));
+ cmdList.add(prefix + String.format("adb shell iptables -X"));
+ cmdList.add(prefix + String.format("adb shell iptables -w 10 -A OUTPUT -m owner --uid-owner %s -j CONNMARK --set-mark %s", userId, userId));
+ cmdList.add(prefix + String.format("adb shell iptables -w 10 -A INPUT -m connmark --mark %s -j NFLOG --nflog-group %s", userId, userId));
+ cmdList.add(prefix + String.format("adb shell iptables -w 10 -A OUTPUT -m connmark --mark %s -j NFLOG --nflog-group %s", userId, userId));
+ cmdList.add(prefix + String.format("adb shell iptables -L"));
+
+ // exec
+ cmdList.stream().forEach(cmd -> execCmdForStr(cmd));
+
+ // tcpdump
+ String pcapFilePath = String.format("/data/data/%s/cache/%s.pcap", pkg.getIdentifier(), pkg.getId());
+ String tcpdump = prefix + String.format("adb shell tcpdump -i nflog:%s -w %s &", userId, pcapFilePath);
+ logQueue.add(T.StrUtil.concat(true, "$ ", tcpdump, "\n"));
+ T.RuntimeUtil.exec(tcpdump);
+
+ // playbook
+ JobInfo.Playbook pb = jobInfo.getPlaybook();
+ String pbWorkDir = String.format("%s/playbook", getJobWorkPath());
+
+ String pbFilePath = FileUtil.file(getJobWorkPath().toString(), pb.getName()).getAbsolutePath();
+ execCmdForStr(String.format("mkdir -p %s/playbook", getTmpDirPath()));
+ execCmdForStr(String.format("unzip -o %s -d %s/playbook", pbFilePath, getTmpDirPath()));
+ execCmdForStr(String.format("mv %s/playbook/* %s", getTmpDirPath(), pbWorkDir));
+ execCmdForStr(String.format("ls -hl %s", pbWorkDir));
+
+ // read main.yml
+ Dict dict = YamlUtil.loadByPath(Path.of(pbWorkDir, "main.yml").toString());
+ PlaybookYml pbYml = T.BeanUtil.copyProperties(dict, PlaybookYml.class);
+
+ Map<String, String> variables = T.MapUtil.defaultIfEmpty(pbYml.getVariables(), new HashMap<>());
+ String dockerEnvArgs = variables.entrySet().stream().map(entry -> "-e " + entry.getKey().trim() + "=" + entry.getValue().trim()).collect(Collectors.joining(" "));
+
+ // playbook cmd prefix
+ String pbPrefix = String.format("docker exec -u0 -w %s %s %s ", pbWorkDir, dockerEnvArgs, containerName);
+
+ // script
+ List<String> beforeScript = pbYml.getBeforeScript().stream().map(item -> pbPrefix + item).collect(Collectors.toList());
+ List<String> script = pbYml.getScript().stream().map(item -> pbPrefix + item).collect(Collectors.toList());
+ List<String> afterScript = pbYml.getAfterScript().stream().map(item -> pbPrefix + item).collect(Collectors.toList());
+
+ cmdList.clear();
+ cmdList.addAll(beforeScript);
+ cmdList.addAll(script);
+ cmdList.addAll(afterScript);
+
+ // exec
+ cmdList.stream().forEach(cmd -> execCmdForStr(cmd));
+
+ // kill tcpdump
+ execCmdForStr(prefix + "adb shell \"ps -ef | grep tcpdump | grep -v grep | awk '{print $2}' | xargs kill -INT \"");
+
+ // pull pcap file
+ String localPcapFilePath = FileUtil.file(getJobWorkPath().toFile(), pkg.getId() + ".pcap").toString();
+ execCmdForStr(prefix + String.format("adb pull %s %s", pcapFilePath, localPcapFilePath));
+
+ // upload pcap file
+ this.uploadPcapFile(localPcapFilePath);
+ }
+
+ private void uploadPcapFile(String pcapFilePath) {
+ String jobId = jobInfo.getId();
+ try {
+ String url = String.format("%s/%s", RunnerContext.getUploadResultPath(), jobId);
+ String token = RunnerContext.getRegisterToken();
+
+ HttpRequest request = T.HttpUtil.createRequest(Method.PUT, url);
+ request.addHeaders(T.MapUtil.of("Authorization", token));
+
+ // file data
+ request.form("file", FileUtil.file(pcapFilePath));
+ request.contentType("multipart/form-data");
+
+ HttpResponse response = request.execute();
+ if (response.isOk()) {
+ log.info("upload pcap success. job id: {}", jobId);
+ } else {
+ log.warn("upload pcap error. job id: {} status: {}", jobId, response.getStatus());
+ }
+ } catch (Exception e) {
+ log.error(e, "upload pcap error. job id: {}", jobId);
+ }
+ }
+ }
+} \ No newline at end of file
diff --git a/src/main/java/net/geedge/executor/JobInfo.java b/src/main/java/net/geedge/executor/JobInfo.java
index 7fcedaa..becf53c 100644
--- a/src/main/java/net/geedge/executor/JobInfo.java
+++ b/src/main/java/net/geedge/executor/JobInfo.java
@@ -1,8 +1,9 @@
package net.geedge.executor;
-import java.util.List;
-
import lombok.Data;
+import net.geedge.util.T;
+
+import java.util.List;
@Data
public class JobInfo {
@@ -17,6 +18,10 @@ public class JobInfo {
String platform;
String version;
String identifier;
+
+ public String getFileName() {
+ return T.StrUtil.concat(true, this.identifier, "_", this.platform, "_", this.version, ".apk");
+ }
}
@Data
@@ -32,4 +37,13 @@ public class JobInfo {
String name;
String conditions;
}
+
+ public Package newPkgInstance() {
+ return new Package();
+ }
+
+
+ public Playbook newPbInstance() {
+ return new Playbook();
+ }
}
diff --git a/src/main/java/net/geedge/executor/LogUploader.java b/src/main/java/net/geedge/executor/LogUploader.java
new file mode 100644
index 0000000..4e71feb
--- /dev/null
+++ b/src/main/java/net/geedge/executor/LogUploader.java
@@ -0,0 +1,82 @@
+package net.geedge.executor;
+
+import cn.hutool.core.io.FileUtil;
+import cn.hutool.http.HttpRequest;
+import cn.hutool.http.HttpResponse;
+import cn.hutool.http.Method;
+import cn.hutool.log.Log;
+import net.geedge.util.RunnerContext;
+import net.geedge.util.T;
+
+import java.io.File;
+import java.nio.file.Path;
+import java.util.Queue;
+
+public class LogUploader implements Runnable {
+ private final Log log = Log.get();
+ private final String jobId;
+ private final Queue<String> logQueue;
+ private final File failedLogFile;
+ private volatile boolean running = true;
+
+ private Path getJobWorkPath() {
+ return Path.of("/builds", jobId);
+ }
+
+ public LogUploader(String jobId, Queue<String> logQueue) {
+ this.jobId = jobId;
+ this.logQueue = logQueue;
+ failedLogFile = FileUtil.file(this.getJobWorkPath().toFile(), "failed_logs.txt");
+ }
+
+ @Override
+ public void run() {
+ Thread.currentThread().setName("log-uploader-" + jobId);
+ if (this.running) {
+ this.uploadLogs();
+ }
+ }
+
+ public void stop() {
+ log.info("log uploader thread prepare shundown, last upload");
+ running = false;
+ // 最后上传一次
+ this.uploadLogs();
+ }
+
+ private void uploadLogs() {
+ StringBuilder content = new StringBuilder();
+ // 检查是否有未上传的日志文件
+ if (FileUtil.exist(failedLogFile)) {
+ String str = FileUtil.readString(failedLogFile, T.CharsetUtil.CHARSET_UTF_8);
+ content.append(str);
+ // 删除临时文件
+ FileUtil.del(failedLogFile);
+ }
+
+ String line;
+ while ((line = logQueue.poll()) != null) {
+ content.append(line);
+ }
+
+ if (content.length() > 0) {
+ try {
+ String url = String.format("%s/%s", RunnerContext.getTraceLogPath(), jobId);
+ String token = RunnerContext.getRegisterToken();
+
+ HttpRequest request = T.HttpUtil.createRequest(Method.PUT, url);
+ request.addHeaders(T.MapUtil.of("Authorization", token));
+ request.body(T.StrUtil.str(content), "application/octet-stream");
+
+ HttpResponse response = request.execute();
+ if (!response.isOk()) {
+ log.warn("upload logs error. job id: {} status: {}", jobId, response.getStatus());
+ FileUtil.appendUtf8String(T.StrUtil.str(content), failedLogFile);
+ }
+ } catch (Exception e) {
+ log.error("upload logs error. job id: {} msg: {}", jobId, e.getMessage());
+ FileUtil.appendUtf8String(T.StrUtil.str(content), failedLogFile);
+ }
+ }
+ }
+} \ No newline at end of file
diff --git a/src/main/java/net/geedge/util/PlaybookYml.java b/src/main/java/net/geedge/util/PlaybookYml.java
new file mode 100644
index 0000000..0aeeb30
--- /dev/null
+++ b/src/main/java/net/geedge/util/PlaybookYml.java
@@ -0,0 +1,16 @@
+package net.geedge.util;
+
+import lombok.Data;
+
+import java.util.List;
+import java.util.Map;
+
+@Data
+public class PlaybookYml {
+
+ private Map<String, String> variables;
+ private List<String> beforeScript;
+ private List<String> script;
+ private List<String> afterScript;
+
+}
diff --git a/src/main/java/net/geedge/util/RunnerContext.java b/src/main/java/net/geedge/util/RunnerContext.java
index f3c093f..9207508 100644
--- a/src/main/java/net/geedge/util/RunnerContext.java
+++ b/src/main/java/net/geedge/util/RunnerContext.java
@@ -1,13 +1,11 @@
package net.geedge.util;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.Executors;
-
import net.geedge.executor.Executor;
import net.geedge.util.RunnerYml.ExecutorConfig;
+import java.util.List;
+import java.util.Map;
+
public class RunnerContext {
/**
* 默认配置文件路径
@@ -17,11 +15,37 @@ public class RunnerContext {
* 默认心跳间隔 5s
*/
public static final Integer DEFAULT_HEARTBEAT_INTERVAL = 5000;
- public static final String REGISTER_PATH = "/register";
+ public static final String REGISTER_PATH = "/api/v1/runner/register";
/**
* 心跳接口路径
*/
- private static final String HEARTBEAT_PATH = "/heartbeat";
+ private static final String HEARTBEAT_PATH = "/api/v1/runner/heartbeat";
+ /**
+ * 下载 package 接口路径
+ * http://{host}/api/v1/package/download/{id}
+ */
+ private static final String DOWNLOAD_PKG_PATH = "/api/v1/package/download";
+ /**
+ * 下载 playbook 接口路径
+ * http://{host}/api/v1/playbook/download/{id}
+ */
+ private static final String DOWNLOAD_PB_PATH = "/api/v1/playbook/download";
+ /**
+ * 下载 docker image 接口路径
+ * http://{host}/api/v1/docker/image/download/{name}
+ */
+ private static final String DOWNLOAD_DOCKER_IMG_PATH = "/api/v1/docker/image/download";
+
+ /**
+ * 上传任务执行日志 接口路径
+ * http://{host}/api/v1/runner/trace/{jobId}
+ */
+ private static final String TRACE_LOG_PATH = "/api/v1/runner/trace";
+ /**
+ * 上传任务执行结果 接口路径
+ * http://{host}/api/v1/runner/uploadResult/{jobId}
+ */
+ private static final String UPLOAD_RESULT_PATH = "/api/v1/runner/uploadResult";
/**
* 程序运行标识
*/
@@ -52,15 +76,35 @@ public class RunnerContext {
}
public static String getRegisterPath() {
- return REGISTER_PATH;
+ return T.URLUtil.completeUrl(runnerYml.getRegister().getUrl(), REGISTER_PATH);
}
public static String getheartbeatPath() {
- return HEARTBEAT_PATH;
+ return T.URLUtil.completeUrl(runnerYml.getRegister().getUrl(), HEARTBEAT_PATH);
+ }
+
+ public static String getDownloadPkgPath() {
+ return T.URLUtil.completeUrl(runnerYml.getRegister().getUrl(), DOWNLOAD_PKG_PATH);
+ }
+
+ public static String getDownloadPbPath() {
+ return T.URLUtil.completeUrl(runnerYml.getRegister().getUrl(), DOWNLOAD_PB_PATH);
+ }
+
+ public static String getDownloadDockerImgPath() {
+ return T.URLUtil.completeUrl(runnerYml.getRegister().getUrl(), DOWNLOAD_DOCKER_IMG_PATH);
+ }
+
+ public static String getTraceLogPath() {
+ return T.URLUtil.completeUrl(runnerYml.getRegister().getUrl(), TRACE_LOG_PATH);
+ }
+
+ public static String getUploadResultPath() {
+ return T.URLUtil.completeUrl(runnerYml.getRegister().getUrl(), UPLOAD_RESULT_PATH);
}
public static String getRegisterToken() {
- return HEARTBEAT_PATH;
+ return runnerYml.getRegister().getToken();
}
public static void setParameters(Parameters param) {
@@ -95,17 +139,17 @@ public class RunnerContext {
* @return
*/
public static List<ExecutorConfig> getFreeExecutor() {
- List<ExecutorConfig> result = T.ListUtil.list(false);
// 记录每个平台可以提供几个运行实例
- runnerYml.getExecutors().forEach(e -> {
- result.add(e);
- });
+ List<ExecutorConfig> result = T.BeanUtil.copyToList(runnerYml.getExecutors(), ExecutorConfig.class);
// 减去现有的运行实例
activeExecutors.forEach(ae -> {
ExecutorConfig config = ae.getExecutorConfig();
result.forEach(e -> {
- if (e.getType().equals(config.getType()) && e.getPlatform().equals(config.getPlatform())) {
- e.setConcurrent(e.getConcurrent() - 1);
+ if (T.ObjectUtil.equals(e.getType(), config.getType())
+ && T.ObjectUtil.equals(e.getPlatform(), config.getPlatform())
+ && T.ObjectUtil.equals(e.getImage(), config.getImage())) {
+ int num = e.getConcurrent() == 0 ? 0 : e.getConcurrent() - 1;
+ e.setConcurrent(num);
}
});
});
@@ -115,7 +159,7 @@ public class RunnerContext {
public static ExecutorConfig getFreeExecutorByPlatform(String platform) {
List<ExecutorConfig> freeExecutor = getFreeExecutor();
for (ExecutorConfig e : freeExecutor) {
- if(e.getPlatform().equals(platform)) {
+ if (e.getPlatform().equals(platform) && e.getConcurrent() > 0) {
return e;
}
}
@@ -131,18 +175,14 @@ public class RunnerContext {
});
return result;
}
-
+
public static void addActiveExecutors(Executor e) {
activeExecutors.add(e);
}
-
+
public static void removeActiveExecutors(Executor e) {
- Iterator<Executor> ite = activeExecutors.iterator();
- while(ite.hasNext()) {
- Executor next = ite.next();
- if(next.getJobInfo().getId().equals(e.getJobInfo().getId())) {
- ite.remove();
- }
- }
+ activeExecutors.removeIf(next ->
+ T.ObjectUtil.equals(next.getJobInfo().getId(), e.getJobInfo().getId())
+ );
}
}
diff --git a/src/main/java/net/geedge/util/RunnerYml.java b/src/main/java/net/geedge/util/RunnerYml.java
index 1ed0cfe..8ebec63 100644
--- a/src/main/java/net/geedge/util/RunnerYml.java
+++ b/src/main/java/net/geedge/util/RunnerYml.java
@@ -1,9 +1,8 @@
package net.geedge.util;
-import java.util.List;
-
import lombok.Data;
-import lombok.EqualsAndHashCode;
+
+import java.util.List;
@Data
public class RunnerYml {
diff --git a/src/main/java/net/geedge/util/T.java b/src/main/java/net/geedge/util/T.java
index 0f659b1..23ff032 100644
--- a/src/main/java/net/geedge/util/T.java
+++ b/src/main/java/net/geedge/util/T.java
@@ -1,12 +1,15 @@
package net.geedge.util;
-import java.awt.Graphics;
-import java.awt.Robot;
-import java.lang.ref.PhantomReference;
-import java.lang.ref.Reference;
-import java.lang.ref.ReferenceQueue;
-import java.lang.ref.SoftReference;
-import java.lang.ref.WeakReference;
+import cn.hutool.core.date.DateTime;
+
+import javax.crypto.Cipher;
+import javax.crypto.KeyGenerator;
+import javax.crypto.SecretKey;
+import javax.crypto.spec.SecretKeySpec;
+import javax.tools.JavaCompiler;
+import javax.tools.JavaFileObject;
+import java.awt.*;
+import java.lang.ref.*;
import java.lang.reflect.Type;
import java.math.BigDecimal;
import java.net.Socket;
@@ -16,22 +19,10 @@ import java.security.SecureRandom;
import java.time.LocalDateTime;
import java.time.temporal.Temporal;
import java.time.temporal.TemporalAccessor;
-import java.util.Calendar;
-import java.util.Collection;
-import java.util.Iterator;
-import java.util.Spliterator;
+import java.util.*;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
-import javax.crypto.Cipher;
-import javax.crypto.KeyGenerator;
-import javax.crypto.SecretKey;
-import javax.crypto.spec.SecretKeySpec;
-import javax.tools.JavaCompiler;
-import javax.tools.JavaFileObject;
-
-import cn.hutool.core.date.DateTime;
-
public class T {
public static final Pattern REGEX_SPECIAL_DATE = Pattern
@@ -1480,4 +1471,80 @@ public class T {
}
+ /**
+ * CommandLineUtil
+ *
+ * @version org.apache.commons.commons-exec:1.3
+ * @apiNote copy from rg.apache.commons.exec.CommandLine.translateCommandline
+ */
+ public static class CommandLineUtil {
+ /**
+ * translateCommandline
+ *
+ * @param toProcess
+ * @return
+ */
+ public static String[] translateCommandline(String toProcess) {
+ if (toProcess != null && toProcess.length() != 0) {
+ int state = 0;
+ StringTokenizer tok = new StringTokenizer(toProcess, "\"' ", true);
+ ArrayList<String> list = new ArrayList();
+ StringBuilder current = new StringBuilder();
+ boolean lastTokenHasBeenQuoted = false;
+
+ while (true) {
+ while (tok.hasMoreTokens()) {
+ String nextTok = tok.nextToken();
+ switch (state) {
+ case 1:
+ if ("'".equals(nextTok)) {
+ lastTokenHasBeenQuoted = true;
+ state = 0;
+ } else {
+ current.append(nextTok);
+ }
+ continue;
+ case 2:
+ if ("\"".equals(nextTok)) {
+ lastTokenHasBeenQuoted = true;
+ state = 0;
+ } else {
+ current.append(nextTok);
+ }
+ continue;
+ }
+
+ if ("'".equals(nextTok)) {
+ state = 1;
+ } else if ("\"".equals(nextTok)) {
+ state = 2;
+ } else if (" ".equals(nextTok)) {
+ if (lastTokenHasBeenQuoted || current.length() != 0) {
+ list.add(current.toString());
+ current = new StringBuilder();
+ }
+ } else {
+ current.append(nextTok);
+ }
+
+ lastTokenHasBeenQuoted = false;
+ }
+
+ if (lastTokenHasBeenQuoted || current.length() != 0) {
+ list.add(current.toString());
+ }
+
+ if (state != 1 && state != 2) {
+ String[] args = new String[list.size()];
+ return (String[]) list.toArray(args);
+ }
+
+ throw new IllegalArgumentException("Unbalanced quotes in " + toProcess);
+ }
+ } else {
+ return new String[0];
+ }
+ }
+ }
+
}