| author | lifengchao <[email protected]> | 2024-07-11 10:31:57 +0800 |
|---|---|---|
| committer | lifengchao <[email protected]> | 2024-07-11 10:31:57 +0800 |
| commit | 0c046806a14576652273f282bc3f1f77ad9a49e6 | |
| tree | dba2cb02c4ff5a9ea62e07a24cb24cfc1f70308b | |
| parent | 5b52a1807f7e3f71dbecc8c19930a3ba5e5f37ad | |
TSG-21732 Druid merge scheduled tasks: automatically add dimensionsSpec and metricsSpec configuration develop-fix-2402
5 files changed, 130 insertions, 23 deletions
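In short: both scheduler jobs and their tests now pass the shared DruidCompactUtil into parseParams, which pre-fills every task param, via the new generateOverrideCompactTaskDefaultParam / extractCompactTaskDefaultParam helpers, with the dimensionsSpec, metricsSpec and granularitySpec taken from the datasource's Druid supervisor spec; the shallow paramTemplate.clone() copies are replaced with JSON round-trip deep copies; and the backfill check window shrinks from two days to one.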
diff --git a/galaxy-job-executor/src/main/java/com/mesalab/executor/jobhandler/DruidCompactJob.java b/galaxy-job-executor/src/main/java/com/mesalab/executor/jobhandler/DruidCompactJob.java
index 0ddfaa4..abb35a3 100644
--- a/galaxy-job-executor/src/main/java/com/mesalab/executor/jobhandler/DruidCompactJob.java
+++ b/galaxy-job-executor/src/main/java/com/mesalab/executor/jobhandler/DruidCompactJob.java
@@ -58,7 +58,7 @@ public class DruidCompactJob {
return ReturnT.SUCCESS;
}
- JSONArray paramArray = parseParams(params);
+ JSONArray paramArray = parseParams(params, druidCompactUtil);
List<JSONObject> submittedTaskParams = new ArrayList<>();
List<DruidWaitedSubmitTaskParam> waitedSubmitTaskParams = new ArrayList<>();
JSONObject paramTemplate;
@@ -67,7 +67,7 @@ public class DruidCompactJob {
for (int i = 0; i < paramArray.size(); i++) {
paramTemplate = paramArray.getJSONObject(i);
- param = paramTemplate.clone();
+ param = JSON.parseObject(JSON.toJSONString(paramTemplate));
String dataSource = param.getString("dataSource");
int maxNumConcurrentSubTasks = param.getJSONObject("tuningConfig").getIntValue("maxNumConcurrentSubTasks", 1);
final String queryGranularity = DruidCompactUtil.extractQueryGranularityFromParam(param);
@@ -111,7 +111,7 @@ public class DruidCompactJob {
for (int i = 0; i < paramArray.size(); i++) {
paramTemplate = paramArray.getJSONObject(i);
- param = paramTemplate.clone();
+ param = JSON.parseObject(JSON.toJSONString(paramTemplate));
String dataSource = param.getString("dataSource");
int maxNumConcurrentSubTasks = param.getJSONObject("tuningConfig").getIntValue("maxNumConcurrentSubTasks", 1);
final String queryGranularity = DruidCompactUtil.extractQueryGranularityFromParam(param);
@@ -122,7 +122,7 @@ public class DruidCompactJob {
String endDate = endTime;
// check and backfill compact tasks for the previous two days
- for (int j = 1; j <= 2; j++) {
+ for (int j = 1; j <= 1; j++) {
startDate = LocalDate.parse(startTime).minusDays(j).toString();
endDate = LocalDate.parse(startDate).plusDays(1).toString();
@@ -172,7 +172,7 @@ public class DruidCompactJob {
return ReturnT.SUCCESS;
}
- public JSONArray parseParams(String params) {
+ public JSONArray parseParams(String params, DruidCompactUtil druidCompactUtil) {
DateTimeFormatter dateFormatter = DateTimeFormatter.ofPattern("yyyy-MM-dd");
LocalDateTime now = LocalDateTime.now(ZoneId.of("UTC")); // the default end time is in UTC
LocalDateTime defaultEndDateTime = now.getHour() >= 1 ? now : now.plusDays(-1); // if less than one hour into the day, push the default end time back one day
@@ -181,6 +181,7 @@ public class DruidCompactJob {
JSONArray paramArray = JSON.parseArray(params);
for (int i = 0; i < paramArray.size(); i++) {
JSONObject param = paramArray.getJSONObject(i);
+ druidCompactUtil.generateOverrideCompactTaskDefaultParam(param);
String paramStr = param.toString();
Preconditions.checkArgument("compact".equals(param.getString("type")), "type must is compact:%s", paramStr);
Preconditions.checkArgument(param.getString("dataSource") != null, "dataSource is required:%s", paramStr);
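A note on the clone() replacement above: fastjson2's JSONObject.clone() copies only the top-level map, so nested values such as tuningConfig or granularitySpec stay shared with the template, and mutating them in one loop iteration would corrupt paramTemplate for later iterations. Serializing and re-parsing yields a fully independent deep copy. A minimal sketch of the difference (made-up data; assumes only fastjson2 on the classpath):

```java
import com.alibaba.fastjson2.JSON;
import com.alibaba.fastjson2.JSONObject;

public class ShallowVsDeepCopy {
    public static void main(String[] args) {
        String json = "{\"dataSource\":\"demo\",\"granularitySpec\":{\"queryGranularity\":\"HOUR\"}}";

        // clone() copies only the top-level map; nested objects are shared
        JSONObject template = JSON.parseObject(json);
        JSONObject shallow = (JSONObject) template.clone();
        shallow.getJSONObject("granularitySpec").put("queryGranularity", "DAY");
        // prints DAY: the template was mutated through the shared nested object
        System.out.println(template.getJSONObject("granularitySpec").getString("queryGranularity"));

        // a serialize/parse round trip yields a fully independent deep copy
        JSONObject template2 = JSON.parseObject(json);
        JSONObject deep = JSON.parseObject(JSON.toJSONString(template2));
        deep.getJSONObject("granularitySpec").put("queryGranularity", "DAY");
        // prints HOUR: the template is untouched
        System.out.println(template2.getJSONObject("granularitySpec").getString("queryGranularity"));
    }
}
```

The round trip costs one extra serialization per task param, which is negligible at this scale and avoids pulling in a dedicated deep-copy utility.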
diff --git a/galaxy-job-executor/src/main/java/com/mesalab/executor/jobhandler/DruidCompactUtil.java b/galaxy-job-executor/src/main/java/com/mesalab/executor/jobhandler/DruidCompactUtil.java
index 6758ddb..8573576 100644
--- a/galaxy-job-executor/src/main/java/com/mesalab/executor/jobhandler/DruidCompactUtil.java
+++ b/galaxy-job-executor/src/main/java/com/mesalab/executor/jobhandler/DruidCompactUtil.java
@@ -4,6 +4,7 @@ import cn.hutool.log.Log;
import com.alibaba.fastjson2.JSON;
import com.alibaba.fastjson2.JSONArray;
import com.alibaba.fastjson2.JSONObject;
+import com.alibaba.fastjson2.JSONPath;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.mesalab.executor.core.utils.HttpClientUtils;
@@ -113,6 +114,92 @@ public class DruidCompactUtil {
}
}
+ public void generateOverrideCompactTaskDefaultParam(JSONObject param){
+ String dataSource = param.getString("dataSource");
+ JSONObject defaultParam = extractCompactTaskDefaultParam(dataSource);
+ Object dimensionsSpec = defaultParam.get("dimensionsSpec");
+ Object metricsSpec = defaultParam.get("metricsSpec");
+ JSONObject granularitySpec = (JSONObject) defaultParam.get("granularitySpec");
+ if(!param.containsKey("dimensionsSpec") && dimensionsSpec != null){
+ param.put("dimensionsSpec", dimensionsSpec);
+ }
+ if(!param.containsKey("metricsSpec") && metricsSpec != null){
+ param.put("metricsSpec", metricsSpec);
+ }
+ if(granularitySpec != null){
+ if(!param.containsKey("granularitySpec")){
+ param.put("granularitySpec", new JSONObject());
+ }
+ JSONObject granularityParam = param.getJSONObject("granularitySpec");
+ if(!granularityParam.containsKey("segmentGranularity") && granularitySpec.containsKey("segmentGranularity")){
+ granularityParam.put("segmentGranularity", granularitySpec.get("segmentGranularity"));
+ }
+ if(!granularityParam.containsKey("queryGranularity") && granularitySpec.containsKey("queryGranularity")){
+ granularityParam.put("queryGranularity", granularitySpec.get("queryGranularity"));
+ }
+ if(!granularityParam.containsKey("rollup") && granularitySpec.containsKey("rollup")){
+ granularityParam.put("rollup", granularitySpec.get("rollup"));
+ }
+ }
+ }
+
+ public JSONObject extractCompactTaskDefaultParam(String dataSource){
+ JSONObject defaultSpec = new JSONObject(8);
+
+ try {
+ String url = UrlUtil.getUrl(server, "/druid/indexer/v1/supervisor/" + dataSource);
+ String response = HttpClientUtils.httpGet(url);
+ JSONObject spec = JSON.parseObject(response);
+
+ // extract the task configuration: it can appear in several different spec layouts
+ String[] dimensionsPaths = new String[]{"$.dataSchema.dimensionsSpec", "$.spec.dataSchema.dimensionsSpec", "$.dataSchema.parser.parseSpec.dimensionsSpec"};
+ String[] metricsPaths = new String[]{"$.dataSchema.metricsSpec", "$.spec.dataSchema.metricsSpec"};
+ String[] granularityPaths = new String[]{"$.dataSchema.granularitySpec", "$.spec.dataSchema.granularitySpec"};
+ JSONObject dimensionsSpec = null;
+ JSONArray metricsSpec = null;
+ JSONObject granularitySpec = null;
+ JSONObject metric;
+ String fieldName;
+ String name;
+
+ for (String dimensionsPath : dimensionsPaths) {
+ dimensionsSpec = (JSONObject) JSONPath.of(dimensionsPath).eval(spec);
+ if(dimensionsSpec != null){
+ break;
+ }
+ }
+ for (String metricsPath : metricsPaths) {
+ metricsSpec = (JSONArray) JSONPath.of(metricsPath).eval(spec);
+ if(metricsSpec != null){
+ for (int i = 0; i < metricsSpec.size(); i++) {
+ metric = metricsSpec.getJSONObject(i);
+ fieldName = metric.getString("fieldName");
+ name = metric.getString("name");
+ if(fieldName != null && !fieldName.equals(name)){
+ metric.put("fieldName", name);
+ }
+ }
+ break;
+ }
+ }
+ for (String granularityPath : granularityPaths) {
+ granularitySpec = (JSONObject) JSONPath.of(granularityPath).eval(spec);
+ if(granularitySpec != null){
+ break;
+ }
+ }
+
+ defaultSpec.put("dimensionsSpec", dimensionsSpec);
+ defaultSpec.put("metricsSpec", metricsSpec);
+ defaultSpec.put("granularitySpec", granularitySpec);
+ } catch (Exception e) {
+ XxlJobLogger.log("extract compact task default param error:" + e.getMessage());
+ XxlJobLogger.log(e);
+ }
+
+ return defaultSpec;
+ }
+
public void submitCompactTask(JSONObject param, List<JSONObject> submittedTaskParams){
String paramJson = param.toJSONString();
XxlJobLogger.log("compact task param:" + paramJson);
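The new extractCompactTaskDefaultParam fetches the datasource's live supervisor spec from the Overlord (GET /druid/indexer/v1/supervisor/{dataSource}) and then probes several JSONPath locations, because ingestion specs can carry dataSchema either at the top level or nested under spec (and, for older parser-based specs, dimensionsSpec under parser.parseSpec). A standalone sketch of that first-match-wins lookup (hypothetical input; only the JSONPath usage mirrors the method above):

```java
import com.alibaba.fastjson2.JSON;
import com.alibaba.fastjson2.JSONObject;
import com.alibaba.fastjson2.JSONPath;

public class JsonPathFallback {
    public static void main(String[] args) {
        // a supervisor spec may nest dataSchema at the top level or under "spec",
        // depending on how the supervisor was submitted
        JSONObject spec = JSON.parseObject(
                "{\"spec\":{\"dataSchema\":{\"granularitySpec\":"
                        + "{\"queryGranularity\":\"HOUR\",\"rollup\":true}}}}");

        String[] granularityPaths = {
                "$.dataSchema.granularitySpec",
                "$.spec.dataSchema.granularitySpec"
        };
        JSONObject granularitySpec = null;
        for (String path : granularityPaths) {
            // eval() returns null when the path does not match, so we simply
            // fall through to the next candidate location
            granularitySpec = (JSONObject) JSONPath.of(path).eval(spec);
            if (granularitySpec != null) {
                break;
            }
        }
        System.out.println(granularitySpec); // {"queryGranularity":"HOUR","rollup":true}
    }
}
```

The metricsSpec loop that rewrites fieldName to name is presumably needed because, once rollup has happened, the stored column already holds the aggregated value under the metric's name, so a re-compaction must aggregate over that column rather than over the original raw input field.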
diff --git a/galaxy-job-executor/src/main/java/com/mesalab/executor/jobhandler/DruidMergeHistoryDataJob.java b/galaxy-job-executor/src/main/java/com/mesalab/executor/jobhandler/DruidMergeHistoryDataJob.java
index c60a253..e040153 100644
--- a/galaxy-job-executor/src/main/java/com/mesalab/executor/jobhandler/DruidMergeHistoryDataJob.java
+++ b/galaxy-job-executor/src/main/java/com/mesalab/executor/jobhandler/DruidMergeHistoryDataJob.java
@@ -54,7 +54,7 @@ public class DruidMergeHistoryDataJob {
return ReturnT.SUCCESS;
}
- JSONArray paramArray = parseParams(params);
+ JSONArray paramArray = parseParams(params, druidCompactUtil);
List<JSONObject> submittedTaskParams = new ArrayList<>();
List<DruidWaitedSubmitTaskParam> waitedSubmitTaskParams = new ArrayList<>();
JSONObject paramTemplate;
@@ -63,7 +63,7 @@ public class DruidMergeHistoryDataJob {
for (int i = 0; i < paramArray.size(); i++) {
paramTemplate = paramArray.getJSONObject(i);
- param = paramTemplate.clone();
+ param = JSON.parseObject(JSON.toJSONString(paramTemplate));
String dataSource = param.getString("dataSource");
int maxNumConcurrentSubTasks = param.getJSONObject("tuningConfig").getIntValue("maxNumConcurrentSubTasks", 1);
final String queryGranularity = DruidCompactUtil.extractQueryGranularityFromParam(param);
@@ -107,7 +107,7 @@ public class DruidMergeHistoryDataJob {
for (int i = 0; i < paramArray.size(); i++) {
paramTemplate = paramArray.getJSONObject(i);
- param = paramTemplate.clone();
+ param = JSON.parseObject(JSON.toJSONString(paramTemplate));
String dataSource = param.getString("dataSource");
int maxNumConcurrentSubTasks = param.getJSONObject("tuningConfig").getIntValue("maxNumConcurrentSubTasks", 1);
final String queryGranularity = DruidCompactUtil.extractQueryGranularityFromParam(param);
@@ -168,13 +168,14 @@ public class DruidMergeHistoryDataJob {
return ReturnT.SUCCESS;
}
- public JSONArray parseParams(String params) {
+ public JSONArray parseParams(String params, DruidCompactUtil druidCompactUtil) {
DateTimeFormatter dateFormatter = DateTimeFormatter.ofPattern("yyyy-MM-dd");
LocalDateTime now = LocalDateTime.now(ZoneId.of("UTC")); // the default end time is in UTC
JSONArray paramArray = JSON.parseArray(params);
for (int i = 0; i < paramArray.size(); i++) {
JSONObject param = paramArray.getJSONObject(i);
+ druidCompactUtil.generateOverrideCompactTaskDefaultParam(param);
String paramStr = param.toString();
Preconditions.checkArgument("compact".equals(param.getString("type")), "type must is compact:%s", paramStr);
Preconditions.checkArgument(param.getString("dataSource") != null, "dataSource is required:%s", paramStr);
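generateOverrideCompactTaskDefaultParam applies the extracted defaults with fill-if-absent semantics: a value is copied only when the job param does not already define the key, so anything set explicitly in the scheduler configuration keeps priority. A compact illustration of that merge rule (hypothetical values; putIfAbsent stands in for the containsKey checks in the method itself, which also handles dimensionsSpec and metricsSpec at the top level):

```java
import com.alibaba.fastjson2.JSON;
import com.alibaba.fastjson2.JSONObject;

public class FillIfAbsentMerge {
    public static void main(String[] args) {
        // scheduler param: the user pinned queryGranularity explicitly
        JSONObject param = JSON.parseObject(
                "{\"type\":\"compact\",\"dataSource\":\"object_statistics\","
                        + "\"granularitySpec\":{\"queryGranularity\":\"DAY\"}}");
        // defaults extracted from the supervisor spec
        JSONObject defaults = JSON.parseObject(
                "{\"granularitySpec\":{\"segmentGranularity\":\"DAY\","
                        + "\"queryGranularity\":\"HOUR\",\"rollup\":true}}");

        JSONObject granularityParam = param.getJSONObject("granularitySpec");
        JSONObject granularityDefault = defaults.getJSONObject("granularitySpec");
        // copy a default only when the key is missing: explicit values win
        for (String key : granularityDefault.keySet()) {
            granularityParam.putIfAbsent(key, granularityDefault.get(key));
        }
        // queryGranularity stays DAY; segmentGranularity and rollup are filled in
        System.out.println(param.toJSONString());
    }
}
```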
diff --git a/galaxy-job-executor/src/test/java/com/mesalab/executor/test/DruidCompactJobTest.java b/galaxy-job-executor/src/test/java/com/mesalab/executor/test/DruidCompactJobTest.java
index 265dec1..077d7fd 100644
--- a/galaxy-job-executor/src/test/java/com/mesalab/executor/test/DruidCompactJobTest.java
+++ b/galaxy-job-executor/src/test/java/com/mesalab/executor/test/DruidCompactJobTest.java
@@ -5,6 +5,7 @@ import cn.hutool.log.Log;
import com.alibaba.fastjson2.JSON;
import com.alibaba.fastjson2.JSONArray;
import com.alibaba.fastjson2.JSONObject;
+import com.alibaba.fastjson2.JSONWriter;
import com.geedgenetworks.utils.StringUtil;
import com.google.common.base.Preconditions;
import com.mesalab.executor.jobhandler.DruidCompactUtil;
@@ -40,6 +41,21 @@ import java.util.List;
public class DruidCompactJobTest {
private Log logger = Log.get();
+ @Test
+ public void extractCompactTaskDefaultParamTest() throws Exception {
+ final String server = "192.168.44.12:8089";
+ DruidCompactUtil druidCompactUtil = DruidCompactUtil.getInstance(server);
+ //JSONObject jsonObject = druidCompactUtil.extractCompactTaskDefaultParam("object_statistics");
+ //System.out.println(jsonObject);
+ String[] tables = new String[]{"sys_storage_log", "statistics_rule", "object_statistics", "application_protocol_stat", "security_rule_hits", "monitor_rule_hits", "traffic_general_stat", "top_server_ips", "traffic_shaping_rule_hits", "service_chaining_rule_hits", "service_function_status", "statistics_rule_hits", "top_client_ips", "top_external_ips", "top_internal_ips", "top_server_domains", "top_server_fqdns", "proxy_rule_hits", "top_server_countries", "top_client_countries", "dos_protection_rule_hits", "dos_protection_rule_metric", "dos_sketch_top_server_ip"};
+ for (String table : tables) {
+ JSONObject jsonObject = druidCompactUtil.extractCompactTaskDefaultParam(table);
+ System.out.println(table + ":");
+ System.out.println(jsonObject.toJSONString(JSONWriter.Feature.PrettyFormat));
+ System.out.println("#################");
+ }
+ }
+
/**
* Compaction test
*
@@ -77,7 +93,7 @@ public class DruidCompactJobTest {
" {\n" +
" \"type\": \"compact\",\n" +
" \"dataSource\": \"object_statistics\",\n" +
- " \"interval\": \"2024-06-14/2024-06-15\",\n" +
+ " \"interval\": \"2024-05-16/2024-05-17\",\n" +
" \"tuningConfig\" : {\n" +
" \"type\" : \"index_parallel\",\n" +
" \"maxNumConcurrentSubTasks\" : 2\n" +
@@ -93,7 +109,7 @@ public class DruidCompactJobTest {
public ReturnT<String> druidCompactSegmentJobHandler(String params) {
// final String server = compactConfig.getAnalyticServer();
- final String server = "192.168.41.29:8088";
+ final String server = "192.168.41.29:8089";
try {
DruidCompactUtil druidCompactUtil = DruidCompactUtil.getInstance(server);
@@ -109,7 +125,7 @@ public class DruidCompactJobTest {
return ReturnT.SUCCESS;
}
- JSONArray paramArray = parseParams(params);
+ JSONArray paramArray = parseParams(params, druidCompactUtil);
List<JSONObject> submittedTaskParams = new ArrayList<>();
List<DruidWaitedSubmitTaskParam> waitedSubmitTaskParams = new ArrayList<>();
JSONObject paramTemplate;
@@ -118,7 +134,7 @@ public class DruidCompactJobTest {
for (int i = 0; i < paramArray.size(); i++) {
paramTemplate = paramArray.getJSONObject(i);
- param = paramTemplate.clone();
+ param = JSON.parseObject(JSON.toJSONString(paramTemplate));
String dataSource = param.getString("dataSource");
int maxNumConcurrentSubTasks = param.getJSONObject("tuningConfig").getIntValue("maxNumConcurrentSubTasks", 1);
final String queryGranularity = DruidCompactUtil.extractQueryGranularityFromParam(param);
@@ -162,7 +178,7 @@ public class DruidCompactJobTest {
for (int i = 0; i < paramArray.size(); i++) {
paramTemplate = paramArray.getJSONObject(i);
- param = paramTemplate.clone();
+ param = JSON.parseObject(JSON.toJSONString(paramTemplate));
String dataSource = param.getString("dataSource");
int maxNumConcurrentSubTasks = param.getJSONObject("tuningConfig").getIntValue("maxNumConcurrentSubTasks", 1);
final String queryGranularity = DruidCompactUtil.extractQueryGranularityFromParam(param);
@@ -173,7 +189,7 @@ public class DruidCompactJobTest {
String endDate = endTime;
// check and backfill compact tasks for the previous two days
- for (int j = 1; j <= 2; j++) {
+ for (int j = 1; j <= 1; j++) {
startDate = LocalDate.parse(startTime).minusDays(j).toString();
endDate = LocalDate.parse(startDate).plusDays(1).toString();
@@ -223,7 +239,7 @@ public class DruidCompactJobTest {
return ReturnT.SUCCESS;
}
- public JSONArray parseParams(String params) {
+ public JSONArray parseParams(String params, DruidCompactUtil druidCompactUtil) {
DateTimeFormatter dateFormatter = DateTimeFormatter.ofPattern("yyyy-MM-dd");
LocalDateTime now = LocalDateTime.now(ZoneId.of("UTC")); // the default end time is in UTC
LocalDateTime defaultEndDateTime = now.getHour() >= 1 ? now : now.plusDays(-1); // if less than one hour into the day, push the default end time back one day
@@ -232,6 +248,7 @@ public class DruidCompactJobTest {
JSONArray paramArray = JSON.parseArray(params);
for (int i = 0; i < paramArray.size(); i++) {
JSONObject param = paramArray.getJSONObject(i);
+ druidCompactUtil.generateOverrideCompactTaskDefaultParam(param);
String paramStr = param.toString();
Preconditions.checkArgument("compact".equals(param.getString("type")), "type must is compact:%s", paramStr);
Preconditions.checkArgument(param.getString("dataSource") != null, "dataSource is required:%s", paramStr);
diff --git a/galaxy-job-executor/src/test/java/com/mesalab/executor/test/DruidMergeHistoryDataJobTest.java b/galaxy-job-executor/src/test/java/com/mesalab/executor/test/DruidMergeHistoryDataJobTest.java
index cfe98ef..a6596f0 100644
--- a/galaxy-job-executor/src/test/java/com/mesalab/executor/test/DruidMergeHistoryDataJobTest.java
+++ b/galaxy-job-executor/src/test/java/com/mesalab/executor/test/DruidMergeHistoryDataJobTest.java
@@ -62,14 +62,14 @@ public class DruidMergeHistoryDataJobTest {
" {\n" +
" \"type\": \"compact\",\n" +
" \"dataSource\": \"object_statistics_test\",\n" +
- " \"interval\": \"2024-05-16/2024-05-17\",\n" +
+ " \"interval\": \"2024-06-14/2024-06-15\",\n" +
" \"tuningConfig\" : {\n" +
" \"type\" : \"index_parallel\",\n" +
" \"maxNumConcurrentSubTasks\" : 2\n" +
" },\n" +
" \"granularitySpec\" : {\n" +
" \"segmentGranularity\": \"DAY\",\n" +
- " \"queryGranularity\": \"HOUR\"\n" +
+ " \"queryGranularity\": \"DAY\"\n" +
" },\n" +
" },\n" +
" {\n" +
@@ -82,7 +82,7 @@ public class DruidMergeHistoryDataJobTest {
" },\n" +
" \"granularitySpec\" : {\n" +
" \"segmentGranularity\": \"DAY\",\n" +
- " \"queryGranularity\": \"HOUR\"\n" +
+ " \"queryGranularity\": \"DAY\"\n" +
" },\n" +
" }\n" +
"]";
@@ -111,7 +111,7 @@ public class DruidMergeHistoryDataJobTest {
return ReturnT.SUCCESS;
}
- JSONArray paramArray = parseParams(params);
+ JSONArray paramArray = parseParams(params, druidCompactUtil);
List<JSONObject> submittedTaskParams = new ArrayList<>();
List<DruidWaitedSubmitTaskParam> waitedSubmitTaskParams = new ArrayList<>();
JSONObject paramTemplate;
@@ -120,7 +120,7 @@ public class DruidMergeHistoryDataJobTest {
for (int i = 0; i < paramArray.size(); i++) {
paramTemplate = paramArray.getJSONObject(i);
- param = paramTemplate.clone();
+ param = JSON.parseObject(JSON.toJSONString(paramTemplate));
String dataSource = param.getString("dataSource");
int maxNumConcurrentSubTasks = param.getJSONObject("tuningConfig").getIntValue("maxNumConcurrentSubTasks", 1);
final String queryGranularity = DruidCompactUtil.extractQueryGranularityFromParam(param);
@@ -164,7 +164,7 @@ public class DruidMergeHistoryDataJobTest {
for (int i = 0; i < paramArray.size(); i++) {
paramTemplate = paramArray.getJSONObject(i);
- param = paramTemplate.clone();
+ param = JSON.parseObject(JSON.toJSONString(paramTemplate));
String dataSource = param.getString("dataSource");
int maxNumConcurrentSubTasks = param.getJSONObject("tuningConfig").getIntValue("maxNumConcurrentSubTasks", 1);
final String queryGranularity = DruidCompactUtil.extractQueryGranularityFromParam(param);
@@ -225,13 +225,14 @@ public class DruidMergeHistoryDataJobTest {
return ReturnT.SUCCESS;
}
- public JSONArray parseParams(String params) {
+ public JSONArray parseParams(String params, DruidCompactUtil druidCompactUtil) {
DateTimeFormatter dateFormatter = DateTimeFormatter.ofPattern("yyyy-MM-dd");
LocalDateTime now = LocalDateTime.now(ZoneId.of("UTC")); // the default end time is in UTC
JSONArray paramArray = JSON.parseArray(params);
for (int i = 0; i < paramArray.size(); i++) {
JSONObject param = paramArray.getJSONObject(i);
+ druidCompactUtil.generateOverrideCompactTaskDefaultParam(param);
String paramStr = param.toString();
Preconditions.checkArgument("compact".equals(param.getString("type")), "type must is compact:%s", paramStr);
Preconditions.checkArgument(param.getString("dataSource") != null, "dataSource is required:%s", paramStr);
