author     wangwei <[email protected]>  2024-09-23 18:38:58 +0800
committer  wangwei <[email protected]>  2024-09-23 18:38:58 +0800
commit     babd0ebc5bd8767cb1bca1f38ee763f9cf6c56b5 (patch)
tree       fde167961d6f00c2a099d76230a0b48edc88902c /src
parent     a576ea7edd47a4720b6641af1279bbf5c8aff4eb (diff)
[Fix][field-discovery] Unify the Field Discovery DSL request parameter structure (TSG-21935)
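For reference, a minimal self-contained sketch of the request structure that the new FieldDiscoveryServiceImpl.validate() accepts; all concrete values below (field, metric, and function names) are illustrative assumptions, not taken from this commit:

    import java.util.*;

    class UnifiedRequestSketch {
        public static void main(String[] args) {
            // One "start/end" interval string; asyncRun() splits it on "/" and parses both sides.
            List<String> intervals = List.of("2024-09-23 00:00:00/2024-09-23 01:00:00");
            Map<String, Object> custom = new LinkedHashMap<>();
            // Required: validate() rejects a missing list and deduplicates its entries.
            custom.put("custom.statistics.dimensions",
                    List.of(Map.of("dimension_name", "server_ip")));       // placeholder field name
            // Optional: a single entry is checked against MetricType / MetricFunction.
            custom.put("custom.statistics.metrics",
                    List.of(Map.of("metric_name", "traffic",               // placeholder metric name
                            "function", Map.of("name", "sum"))));          // placeholder function name
            System.out.println(intervals + " -> " + custom);
        }
    }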
Diffstat (limited to 'src')
-rw-r--r--  src/main/java/com/mesalab/qgw/benchmark/Writer.java                        |   8
-rw-r--r--  src/main/java/com/mesalab/qgw/service/DatasetService.java                  |   3
-rw-r--r--  src/main/java/com/mesalab/qgw/service/FieldDiscoveryService.java           |  19
-rw-r--r--  src/main/java/com/mesalab/qgw/service/impl/DatasetServiceImp.java          |  30
-rw-r--r--  src/main/java/com/mesalab/qgw/service/impl/FieldDiscoveryServiceImpl.java  | 525
-rw-r--r--  src/main/java/com/mesalab/qgw/service/impl/QueryJobServiceImpl.java        |  47
-rw-r--r--  src/main/java/com/mesalab/qgw/service/impl/TroubleshootingServiceImp.java  |   4
-rw-r--r--  src/main/java/com/mesalab/services/configuration/JobConfig.java            |   3
-rw-r--r--  src/main/java/com/mesalab/services/service/impl/JobExecuteService.java     |  27
-rw-r--r--  src/main/java/com/mesalab/services/service/impl/JobServiceImpl.java        |   8
10 files changed, 632 insertions(+), 42 deletions(-)
diff --git a/src/main/java/com/mesalab/qgw/benchmark/Writer.java b/src/main/java/com/mesalab/qgw/benchmark/Writer.java
index 06ca964f..59988b3b 100644
--- a/src/main/java/com/mesalab/qgw/benchmark/Writer.java
+++ b/src/main/java/com/mesalab/qgw/benchmark/Writer.java
@@ -5,25 +5,19 @@ import cn.hutool.core.io.file.FileWriter;
import cn.hutool.log.Log;
import cn.hutool.log.LogFactory;
import com.alibaba.fastjson2.JSON;
-import com.alibaba.nacos.client.config.impl.LocalConfigInfoProcessor;
import com.google.common.collect.Lists;
-import com.google.common.collect.Maps;
-import com.jayway.jsonpath.JsonPath;
-import com.jayway.jsonpath.JsonPathException;
import com.mesalab.common.nacos.NacosConfig;
import com.mesalab.common.nacos.NacosConst;
import com.geedgenetworks.utils.StringUtil;
import com.mesalab.qgw.service.DatabaseService;
import com.mesalab.qgw.service.DatasetService;
import org.springframework.beans.factory.annotation.Autowired;
-import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;
import java.io.File;
import java.io.IOException;
import java.util.LinkedHashMap;
import java.util.List;
-import java.util.Map;
@Component
public abstract class Writer {
@@ -57,7 +51,7 @@ public abstract class Writer {
}
List<String> resultLines = Lists.newArrayList();
for (String line : list) {
- String sql = datasetService.buildExecSQL(variables, line);
+ String sql = datasetService.buildExecDSL(variables, line);
resultLines.add(sql);
}
writer.appendLines(resultLines);
diff --git a/src/main/java/com/mesalab/qgw/service/DatasetService.java b/src/main/java/com/mesalab/qgw/service/DatasetService.java
index 80666408..8500bed1 100644
--- a/src/main/java/com/mesalab/qgw/service/DatasetService.java
+++ b/src/main/java/com/mesalab/qgw/service/DatasetService.java
@@ -1,6 +1,5 @@
package com.mesalab.qgw.service;
-import com.google.common.collect.Lists;
import com.mesalab.common.entity.BaseResult;
import java.util.LinkedHashMap;
@@ -52,6 +51,6 @@ public interface DatasetService
* @param
* @return
*/
- String buildExecSQL(List<LinkedHashMap> variables, String sql);
+ String buildExecDSL(List<LinkedHashMap> variables, String sql);
}
diff --git a/src/main/java/com/mesalab/qgw/service/FieldDiscoveryService.java b/src/main/java/com/mesalab/qgw/service/FieldDiscoveryService.java
new file mode 100644
index 00000000..18157e0c
--- /dev/null
+++ b/src/main/java/com/mesalab/qgw/service/FieldDiscoveryService.java
@@ -0,0 +1,19 @@
+package com.mesalab.qgw.service;
+
+import com.mesalab.common.entity.BaseResult;
+import com.mesalab.qgw.model.basic.DSLQueryRequestParam;
+
+import java.util.concurrent.ExecutionException;
+
+/**
+ * Field Discovery DSL job contract: validates the unified request, then runs discovery asynchronously.
+ *
+ * @Classname FieldDiscoveryService
+ * @Date 2024/9/23 14:04
+ * @Author wWei
+ */
+public interface FieldDiscoveryService {
+ void asyncRun(DSLQueryRequestParam request) throws ExecutionException;
+
+ void validate(DSLQueryRequestParam request);
+}
diff --git a/src/main/java/com/mesalab/qgw/service/impl/DatasetServiceImp.java b/src/main/java/com/mesalab/qgw/service/impl/DatasetServiceImp.java
index b923b28e..2ee571be 100644
--- a/src/main/java/com/mesalab/qgw/service/impl/DatasetServiceImp.java
+++ b/src/main/java/com/mesalab/qgw/service/impl/DatasetServiceImp.java
@@ -63,11 +63,11 @@ public class DatasetServiceImp
String key = String.valueOf(variable.get("key"));
String def = String.valueOf(variable.get("default"));
Date currentDate = DateUtils.convertStringToDate(DateUtils.getCurrentDate(DateUtils.YYYY_MM_DD_HH24_MM_SS), DateUtils.YYYY_MM_DD_HH24_MM_SS);
- if ("start_time".equals(key)&& StringUtil.isBlank(def)){
+ if ("start_time".equals(key) && StringUtil.isBlank(def)) {
def = DateUtils.getFormatDate(DateUtils.getSomeHour(currentDate, -1), DateUtils.YYYY_MM_DD_HH24_MM_SS);
variable.put("default", def);
}
- if ("end_time".equals(key)&& StringUtil.isBlank(def)){
+ if ("end_time".equals(key) && StringUtil.isBlank(def)) {
def = DateUtils.getFormatDate(currentDate, DateUtils.YYYY_MM_DD_HH24_MM_SS);
variable.put("default", def);
}
@@ -82,7 +82,7 @@ public class DatasetServiceImp
List<Record> list = Db.find(buildGetDatasetSQL(Lists.newArrayList(datasetId), null, null));
if (CollectionUtils.isEmpty(list)) {
throw new QGWBusinessException(HttpStatusCodeEnum.BAD_REQUEST.getCode(), CommonErrorCode.BAD_REQUEST_PARAM_SYNTAX_EXCEPTION.getCode(),
- String.format(CommonErrorCode.BAD_REQUEST_PARAM_SYNTAX_EXCEPTION.getMessage(),QGWMessageConst.DATASET_ID_NOT_EXIST));
+ String.format(CommonErrorCode.BAD_REQUEST_PARAM_SYNTAX_EXCEPTION.getMessage(), QGWMessageConst.DATASET_ID_NOT_EXIST));
}
List<Map<String, Object>> results = formatResultData(list);
Map<String, Object> result = results.get(0);
@@ -114,7 +114,7 @@ public class DatasetServiceImp
@Override
public BaseResult getPreview(String datasetId) {
Map<String, Object> dataset = getDataset(datasetId);
- String template = buildExecSQL(getVariable(), String.valueOf(dataset.get(TEMPLATE)));
+ String template = buildExecDSL(getVariable(), String.valueOf(dataset.get(TEMPLATE)));
String datasetType = String.valueOf(dataset.get(DATASET_TYPE));
if (datasetType.equalsIgnoreCase("sql")) {
SqlQueryRequestParam sqlQueryRequest = JSON.parseObject(template, SqlQueryRequestParam.class);
@@ -134,22 +134,24 @@ public class DatasetServiceImp
@Override
- public String buildExecSQL(List<LinkedHashMap> variables, String sql) {
- Matcher matcher = pLeftRightFlag.matcher(sql);
+ public String buildExecDSL(List<LinkedHashMap> variables, String template) {
+ Matcher matcher = pLeftRightFlag.matcher(template);
while (matcher.find()) {
- sql = processOptionalClause(sql, "[[", "]]");
+ template = processOptionalClause(template, "[[", "]]");
}
- sql = processFieldVariable(sql);
for (LinkedHashMap linkedHashMap : variables) {
String variable = String.valueOf(linkedHashMap.get("key"));
- String def = String.valueOf(linkedHashMap.get("default"));
- if ("filter".equals(variable) && StringUtil.isBlank(def)) {
- def = " 1 = 1";
+ Object def = linkedHashMap.get("default");
+ String parameter;
+ if (def instanceof Map) {
+ parameter = "\"${".concat(variable).concat("}\"");
+ } else {
+ parameter = "${".concat(variable).concat("}");
}
- String parameter = "${".concat(variable).concat("}");
- sql = sql.replace(parameter, def);
+ template = template.replace(parameter, String.valueOf(def));
}
- return sql;
+ template = processFieldVariable(template);
+ return template;
}
private String buildGetDatasetSQL(List<String> ids, String category, String backendEngine) {
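The rename from buildExecSQL to buildExecDSL carries one behavioral change worth noting: a Map-typed default now replaces the quoted placeholder "${key}", so the object's string form lands in the template unquoted, while scalar defaults still replace the bare ${key}. A reduced standalone sketch of just that substitution loop (processOptionalClause and processFieldVariable omitted; the template is an illustrative assumption):

    import java.util.*;

    class BuildExecDSLSketch {
        // Reduced form of the substitution loop in DatasetServiceImp.buildExecDSL.
        static String substitute(String template, List<LinkedHashMap> variables) {
            for (LinkedHashMap v : variables) {
                String key = String.valueOf(v.get("key"));
                Object def = v.get("default");
                // Map defaults consume the surrounding quotes; scalars keep them.
                String placeholder = (def instanceof Map)
                        ? "\"${" + key + "}\""
                        : "${" + key + "}";
                template = template.replace(placeholder, String.valueOf(def));
            }
            return template;
        }

        public static void main(String[] args) {
            LinkedHashMap<String, Object> filter = new LinkedHashMap<>();
            filter.put("key", "filter");
            filter.put("default", "1 = 1");            // scalar default
            List<LinkedHashMap> vars = new ArrayList<>();
            vars.add(filter);
            // Illustrative DSL template, not one shipped with the service.
            System.out.println(substitute("{\"where\": \"${filter}\"}", vars));
            // prints: {"where": "1 = 1"}
        }
    }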
diff --git a/src/main/java/com/mesalab/qgw/service/impl/FieldDiscoveryServiceImpl.java b/src/main/java/com/mesalab/qgw/service/impl/FieldDiscoveryServiceImpl.java
new file mode 100644
index 00000000..fc4317f2
--- /dev/null
+++ b/src/main/java/com/mesalab/qgw/service/impl/FieldDiscoveryServiceImpl.java
@@ -0,0 +1,525 @@
+package com.mesalab.qgw.service.impl;
+
+import cn.hutool.core.collection.CollectionUtil;
+import cn.hutool.core.date.DateTime;
+import cn.hutool.core.date.DateUtil;
+import cn.hutool.core.util.BooleanUtil;
+import cn.hutool.core.util.NumberUtil;
+import cn.hutool.core.util.StrUtil;
+import cn.hutool.log.Log;
+import cn.hutool.log.LogFactory;
+import com.geedgenetworks.utils.StringUtil;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import com.jayway.jsonpath.JsonPath;
+import com.mesalab.common.entity.BaseResult;
+import com.mesalab.common.entity.DataTypeMapping;
+import com.mesalab.common.enums.HttpStatusCodeEnum;
+import com.mesalab.common.exception.BusinessException;
+import com.mesalab.common.exception.CommonErrorCode;
+import com.mesalab.common.utils.HazelcastInstanceMapUtil;
+import com.mesalab.qgw.exception.QGWBusinessException;
+import com.mesalab.qgw.model.basic.DSLQueryRequestParam;
+import com.mesalab.qgw.model.basic.EngineConfigSource;
+import com.mesalab.qgw.model.basic.QueryCache;
+import com.mesalab.qgw.model.basic.SQLQueryContext;
+import com.mesalab.qgw.service.DatabaseService;
+import com.mesalab.qgw.service.FieldDiscoveryService;
+import com.mesalab.qgw.service.SQLSyncQueryService;
+import com.mesalab.services.common.enums.MetricFunction;
+import com.mesalab.services.common.enums.MetricType;
+import com.mesalab.services.configuration.JobConfig;
+import com.mesalab.services.service.impl.TaskExecuteService;
+import lombok.Data;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.core.env.Environment;
+import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
+import org.springframework.stereotype.Service;
+
+import java.text.DecimalFormat;
+import java.util.*;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
+import java.util.stream.Collectors;
+
+import static com.mesalab.services.service.impl.JobExecuteService.getDecimalFormat;
+import static com.mesalab.services.service.impl.JobExecuteService.markJobFailure;
+
+/**
+ * Implements FieldDiscoveryService: validates the unified DSL request and runs the discovery job, time-sliced when enabled.
+ *
+ * @Classname FieldDiscoveryServiceImpl
+ * @Date 2024/9/23 14:06
+ * @Author wWei
+ */
+@Service
+public class FieldDiscoveryServiceImpl implements FieldDiscoveryService {
+ private static final Log log = LogFactory.get();
+ private DatabaseService databaseService;
+ private Environment environment;
+ private EngineConfigSource engineConfigSource;
+ private TaskExecuteService taskExecuteService;
+ private JobConfig jobCfg;
+ private SQLSyncQueryService sqlSyncQueryService;
+
+ private final ThreadPoolTaskExecutor taskThreadPool;
+
+ public FieldDiscoveryServiceImpl(ThreadPoolTaskExecutor taskThreadPool) {
+ this.taskThreadPool = taskThreadPool;
+ }
+
+
+ @Override
+ public void asyncRun(DSLQueryRequestParam request) throws ExecutionException {
+ JobInfo jobInfo = new JobInfo(request.getId());
+ String partitionKey = databaseService.getPartitionKey(request.getDataSource());
+ List<String> intervals = request.getIntervals();
+ String[] split = intervals.get(0).split("/");
+ DateTime start = DateUtil.parse(split[0]);
+ DateTime end = DateUtil.parse(split[1]);
+ Interval interval = new Interval();
+ interval.setStart(start.getTime() / 1000);
+ interval.setEnd(end.getTime() / 1000);
+ try {
+ if (jobCfg.isTimeSlicingEnabled() && interval.isValid()) {
+ executeSlicingFieldDiscoveryJob(request.getId(), jobInfo, request, interval, partitionKey);
+ } else {
+ String filter = String.format(buildFilterFormatSql(),
+ request.getFilter(),
+ partitionKey, interval.getStart(), partitionKey, interval.getEnd());
+ executeFieldDiscoveryJob(request.getId(), jobInfo, request, filter);
+ }
+ } catch (InterruptedException e) {
+ // Handle the InterruptedException and restore the interrupt flag
+ Thread.currentThread().interrupt();
+ markJobFailure(request.getId(), "Job was interrupted: " + e.getMessage());
+ }
+
+ }
+
+ @Override
+ public void validate(DSLQueryRequestParam request) {
+ Object filter = request.getFilter();
+ if (StrUtil.isEmptyIfStr(filter)) {
+ throw new QGWBusinessException(HttpStatusCodeEnum.BAD_REQUEST.getCode(), CommonErrorCode.PARAMETER_ERROR.getCode(),
+ String.format(CommonErrorCode.PARAMETER_ERROR.getMessage(), "Filter cannot be null or empty."));
+ }
+ Object dataSource = request.getDataSource();
+ if (StrUtil.isEmptyIfStr(dataSource)) {
+ throw new QGWBusinessException(HttpStatusCodeEnum.BAD_REQUEST.getCode(), CommonErrorCode.PARAMETER_ERROR.getCode(),
+ String.format(CommonErrorCode.PARAMETER_ERROR.getMessage(), "Data Source cannot be null or empty."));
+ }
+ List<String> intervals = request.getIntervals();
+ if (intervals == null || intervals.size() == 0) {
+ throw new QGWBusinessException(HttpStatusCodeEnum.BAD_REQUEST.getCode(), CommonErrorCode.PARAMETER_ERROR.getCode(),
+ String.format(CommonErrorCode.PARAMETER_ERROR.getMessage(), "Intervals cannot be null or empty."));
+
+ }
+
+ Map<String, Object> customRequestParam = request.getCustomRequestParam();
+ if (customRequestParam == null || customRequestParam.size() == 0) {
+ throw new QGWBusinessException(HttpStatusCodeEnum.BAD_REQUEST.getCode(), CommonErrorCode.PARAMETER_ERROR.getCode(),
+ String.format(CommonErrorCode.PARAMETER_ERROR.getMessage(), "custom.statistics.dimensions cannot be null or empty."));
+
+ }
+
+ Object dimensions = customRequestParam.get("custom.statistics.dimensions");
+ if (!(dimensions instanceof List)) {
+ throw new QGWBusinessException(HttpStatusCodeEnum.BAD_REQUEST.getCode(), CommonErrorCode.PARAMETER_ERROR.getCode(),
+ String.format(CommonErrorCode.PARAMETER_ERROR.getMessage(), "custom.statistics.dimensions cannot be null or empty."));
+ }
+ List<Map<String, Object>> collect = ((List<Map<String, Object>>) dimensions).stream().distinct().collect(Collectors.toList());
+ request.getCustomRequestParam().put("custom.statistics.dimensions", collect);
+
+ Object metrics = customRequestParam.get("custom.statistics.metrics");
+ if ((metrics instanceof List) && ((List<Map<String, Object>>) metrics).size() == 1) {
+ Map<String, Object> o = ((List<Map<String, Object>>) metrics).get(0);
+ Object metric = o.get("metric_name");
+ boolean validMetric = MetricType.isValid(String.valueOf(metric));
+ Map<String, Object> function = (Map<String, Object>) o.get("function");
+ Object funName = function.get("name");
+ boolean validFn = MetricFunction.isValid(String.valueOf(funName));
+ if (!validMetric || !validFn) {
+ throw new QGWBusinessException(HttpStatusCodeEnum.BAD_REQUEST.getCode(), CommonErrorCode.PARAMETER_ERROR.getCode(),
+ String.format(CommonErrorCode.PARAMETER_ERROR.getMessage(), "custom.statistics.metrics.* illegal."));
+ }
+ }
+
+ }
+
+ private void executeFieldDiscoveryJob(String id, JobInfo jobInfo, DSLQueryRequestParam request, String segmentFilter) throws ExecutionException, InterruptedException {
+ if (isCancel(jobInfo.getId())) {
+ return;
+ }
+ String dataSource = request.getDataSource();
+ String totalMetric = "count(*)";
+ Map<String, Object> customRequestParam = request.getCustomRequestParam();
+ Object metrics = customRequestParam.get("custom.statistics.metrics");
+ Object metric = null;
+ Object fn = null;
+ if (StringUtil.isNotEmpty(metrics)) {
+ List<Map<String, Object>> metricsList = (List<Map<String, Object>>) metrics;
+ Map<String, Object> map = metricsList.get(0);
+ metric = map.get("metric_name");
+ Map<String, Object> function = (Map<String, Object>) map.get("function");
+ fn = function.get("name");
+ String measurements = databaseService.getValueByKeyInSchemaDoc(dataSource, "measurements");
+ List<Map<String, String>> read = JsonPath.read(measurements, "$.field_discovery_metric['" + metric + "'][?(@.fn == '" + fn + "')]");
+ if (!read.isEmpty()) {
+ String column = read.get(0).get("column");
+ totalMetric = read.get(0).get("fn") + "(" + column + ")";
+ }
+ }
+
+ Map<String, Long> map = getCount(dataSource, segmentFilter, totalMetric);
+ long currentCount = map.get("logCount");
+ if (currentCount == 0) {
+ return;
+ }
+ Map<String, Future<List<Map<String, Object>>>> taskCallbackList = Maps.newHashMap();
+ List<String> fields = Lists.newArrayList();
+ List<Map<String, Object>> o = (List<Map<String, Object>>) customRequestParam.get("custom.statistics.dimensions");
+ o.forEach(x -> fields.add(String.valueOf(x.get("dimension_name"))));
+
+ for (String field : fields) {
+ Object finalMetric = metric;
+ Object finalFn = fn;
+ taskCallbackList.put(field, CompletableFuture.supplyAsync(() -> taskExecuteService.executeFieldDiscovery(id, request.getDataSource(), field
+ , finalMetric
+ , finalFn
+ , segmentFilter), taskThreadPool));
+ }
+ waitAsyncResultAndUpdateProcess(id, jobInfo, taskCallbackList, request, map, metric, fn);
+ }
+
+ private void executeSlicingFieldDiscoveryJob(String id, JobInfo jobInfo, DSLQueryRequestParam request, Interval interval, String partitionKey) throws ExecutionException, InterruptedException {
+ jobInfo.setTotalTimes((int) ((interval.getEnd() - interval.getStart()) / jobCfg.getTimeSlicingInterval() + 1));
+ while (true) {
+ jobInfo.setCurrentTimes(jobInfo.getCurrentTimes() + 1);
+ if (jobInfo.isTimeout()) {
+ log.error("execute job timeout: job-{}", jobInfo);
+ markJobFailure(id, "execute job timeout");
+ break;
+ }
+ QueryCache queryCache = HazelcastInstanceMapUtil.get(id);
+ if (StrUtil.isEmptyIfStr(queryCache)) {
+ log.warn("cache expiration: job-{}", jobInfo);
+ break;
+ }
+ Map<String, Object> job = queryCache.getBaseResult().getJob();
+ if (BooleanUtil.toBoolean(String.valueOf(job.get(JobConfig.IS_DONE)))
+ || BooleanUtil.toBoolean(String.valueOf(job.get(JobConfig.IS_FAILED)))
+ || BooleanUtil.toBoolean(String.valueOf(job.get(JobConfig.IS_CANCELED)))) {
+ log.warn("job done or interrupt: job obj-{}", job);
+ break;
+ }
+ long start = interval.getEnd() - jobCfg.getTimeSlicingInterval();
+ if (start > interval.getStart()) {
+ interval.setEnd(start);
+ String newFilter = String.format(buildFilterFormatSql(),
+ request.getFilter(),
+ partitionKey, interval.getEnd(), partitionKey, interval.getEnd() + jobCfg.getTimeSlicingInterval());
+ executeFieldDiscoveryJob(id, jobInfo, request, newFilter);
+ } else {
+ String where = String.format(buildFilterFormatSql(),
+ request.getFilter(),
+ partitionKey, interval.getStart(), partitionKey, interval.getEnd());
+ executeFieldDiscoveryJob(id, jobInfo, request, where);
+ break;
+ }
+ }
+ }
+
+ private static String buildFilterFormatSql() {
+ return "(%s) AND (%s >= %s AND %s < %s)";
+ }
+
+ private boolean isCancel(String id) {
+ QueryCache queryCache = HazelcastInstanceMapUtil.get(id);
+ if (queryCache == null) {
+ log.info("Can't find the cache, job id is: {}", id);
+ return true;
+ }
+ long currentTime = System.currentTimeMillis();
+ if (currentTime - queryCache.getLatestQueryTimeMs() > jobCfg.getInteractiveTimeout()) {
+ queryCache.getBaseResult().getJob().put(JobConfig.IS_CANCELED, true);
+ HazelcastInstanceMapUtil.put(id, queryCache);
+ log.info("Interactive timeout, job id is: {}", id);
+ return true;
+ }
+ return false;
+ }
+
+ private void waitAsyncResultAndUpdateProcess(String id, JobInfo jobInfo, Map<String, Future<List<Map<String, Object>>>> taskCallbackList, DSLQueryRequestParam request, Map<String, Long> mapCurrent
+ , Object metric, Object fn) throws ExecutionException, InterruptedException {
+ List<String> updatedResult = Lists.newArrayList();
+ while (true) {
+ if (jobInfo.isTimeout()) {
+ log.error("execute job timeout, job id is: {}", id);
+ markJobFailure(id, "execute job timeout");
+ taskCallbackList.keySet().forEach(o -> taskCallbackList.get(o).cancel(true));
+ break;
+ }
+ if (updatedResult.size() >= taskCallbackList.size()) {
+ break;
+ }
+ long completedTaskCount = taskCallbackList.keySet().stream().filter(o -> taskCallbackList.get(o).isDone()).count();
+ if ((completedTaskCount - updatedResult.size()) * 1.0 / taskCallbackList.size() < 0.01 && completedTaskCount < taskCallbackList.size()) {
+ continue;
+ }
+ log.info("id :{}, done size: {}, task size: {}", id, completedTaskCount, taskCallbackList.size());
+ DecimalFormat format = getDecimalFormat();
+ double jobCompletedProcess = Double.parseDouble(format.format(jobInfo.getCurrentTimes() * 1.0 / jobInfo.getTotalTimes() - ((1 - completedTaskCount * 1.0 / taskCallbackList.size()) / jobInfo.getTotalTimes())));
+ List<String> queryDoneField = taskCallbackList.keySet().stream().filter(o -> taskCallbackList.get(o).isDone()).collect(Collectors.toList());
+ for (String field : queryDoneField) {
+ if (updatedResult.contains(field)) {
+ continue;
+ }
+ updatedResult.add(field);
+ Long currentTotalMetric = mapCurrent.get("totalMetric");
+ String fnDefault = "count";
+ String value = "count";
+ if (StringUtil.isNotEmpty(metric)) {
+ String measurements = databaseService.getValueByKeyInSchemaDoc(request.getDataSource(), "measurements");
+ List<Map<String, String>> read = JsonPath.read(measurements, "$.field_discovery_metric['" + metric + "'][?(@.fn == '" + fn + "')]");
+ if (!read.isEmpty()) {
+ fnDefault = fn.toString();
+ value = read.get(0).get("value");
+ }
+ }
+ Future<List<Map<String, Object>>> listFuture = taskCallbackList.get(field);
+ List<Map<String, Object>> data = listFuture.get();
+ data.forEach(x -> {
+ for (String k : x.keySet()) {
+ if (k.startsWith(JobConfig.FIELD_DISCOVERY_TOPK_METRIC_PREFIX)) {
+ Object v = x.get(k);
+ x.put(k.replace(JobConfig.FIELD_DISCOVERY_TOPK_METRIC_PREFIX, ""), v);
+ x.remove(k);
+ }
+ }
+ });
+ QueryCache queryCache = HazelcastInstanceMapUtil.get(id);
+ if (StrUtil.isEmptyIfStr(queryCache)) {
+ markJobFailure(id, "Cache expiration.");
+ log.error("field_discovery task query lastTopK Error: Cache expiration.");
+ return;
+ }
+ Object lastTotalMetric = 0;
+ if (StringUtil.isNotEmpty(queryCache.getBaseResult().getData())) {
+ List<Map> map = (List<Map>) queryCache.getBaseResult().getData();
+ for (Map map1 : map) {
+ if (CollectionUtil.isNotEmpty(map1) && map1.containsKey(field) && !StrUtil.isEmptyIfStr(map1.get(field))) {
+ Map item = (Map) map1.get(field);
+ List<Map<String, Object>> topk = (List<Map<String, Object>>) item.get("topk");
+ lastTotalMetric = item.get(value);
+ Map<String, String> metrics = Maps.newHashMap();
+ metrics.put(value, "count".equalsIgnoreCase(fnDefault) ? "sum" : fnDefault);
+ data = mergeData(data, topk, Lists.newArrayList("value"), metrics);
+ }
+
+ }
+ }
+ data = sortDataAndSetMaxSize(data, value, DataTypeMapping.LONG, false);
+ updateResult(id, field, currentTotalMetric, lastTotalMetric, value, data, jobCompletedProcess);
+
+ }
+ if (completedTaskCount >= taskCallbackList.size()) {
+ break;
+ }
+ }
+ }
+
+ private List<Map<String, Object>> mergeData(List<Map<String, Object>> data1, List<Map<String, Object>> data2, List<String> dimensions, Map<String, String> metrics) {
+ Collection<Map<String, Object>> data = CollectionUtil.addAll(data1, data2);
+ Map<String, Map<String, Object>> result = Maps.newHashMap();
+ for (Map<String, Object> datum : data) {
+ StringBuilder key = new StringBuilder();
+ dimensions.forEach(k -> key.append(datum.get(k)));
+ if (!result.containsKey(key.toString())) {
+ result.put(key.toString(), datum);
+ continue;
+ }
+ Map<String, Object> item = result.get(key.toString());
+ for (String k : metrics.keySet()) {
+ String action = metrics.get(k);
+ Object o1 = item.get(k);
+ Object o2 = datum.get(k);
+ if (StringUtil.isEmpty(o1) || StringUtil.isEmpty(o2)) {
+ item.put(k, (StringUtil.isEmpty(o1) && StringUtil.isEmpty(o2)) ? null : StringUtil.isEmpty(o1) ? o2 : o1);
+ continue;
+ }
+ Number number1 = NumberUtil.parseNumber(o1.toString());
+ Number number2 = NumberUtil.parseNumber(o2.toString());
+ if (action.equals("sum")) {
+ item.put(k, NumberUtil.add(number1, number2));
+ continue;
+ }
+ if (NumberUtil.isDouble(o1.toString()) || NumberUtil.isDouble(o2.toString())) {
+ double v1 = Double.parseDouble(number1.toString());
+ double v2 = Double.parseDouble(number2.toString());
+ switch (action) {
+ case "max":
+ item.put(k, NumberUtil.max(v1, v2));
+ break;
+ case "min":
+ item.put(k, NumberUtil.min(v1, v2));
+ break;
+ case "avg":
+ item.put(k, NumberUtil.div(NumberUtil.add(v1, v2), 2));
+ break;
+ }
+ } else if (NumberUtil.isLong(o1.toString()) || NumberUtil.isLong(o2.toString())) {
+ long v1 = Long.parseLong(number1.toString());
+ long v2 = Long.parseLong(number2.toString());
+ switch (action) {
+ case "max":
+ item.put(k, NumberUtil.max(v1, v2));
+ break;
+ case "min":
+ item.put(k, NumberUtil.min(v1, v2));
+ break;
+ case "avg":
+ item.put(k, NumberUtil.div(NumberUtil.add(v1, v2), 2));
+ break;
+ }
+ } else {
+ log.error("task merge data error, data is: {}, {}", data1, date2);
+ throw new BusinessException("task merge data error.");
+ }
+ }
+ }
+ return new ArrayList<>(result.values());
+ }
+
+ private void updateResult(String id, String field, long currentTotalMetric, Object lastTotalMetric, String value, List<Map<String, Object>> topList, double jobCompletedProcess) {
+ Map<String, Object> map = Maps.newHashMap();
+ map.put("topk", topList);
+ map.put("distinct_count", Math.min(topList.size(), 100));
+ map.put(value, currentTotalMetric + Long.parseLong(lastTotalMetric.toString()));
+ try {
+ HazelcastInstanceMapUtil.retrieveMap().lock(id);
+ QueryCache queryCache = HazelcastInstanceMapUtil.get(id);
+ BaseResult<Object> baseResult = queryCache.getBaseResult();
+ if (jobCompletedProcess < 1) {
+ Map<String, Object> job = queryCache.getBaseResult().getJob();
+ job.put(JobConfig.DONE_PROGRESS, jobCompletedProcess);
+ }
+
+ Object data1 = baseResult.getData();
+ if (StrUtil.isEmptyIfStr(data1)) {
+ List<Object> list = Lists.newArrayList();
+ Map<Object, Object> item = Maps.newLinkedHashMap();
+ item.put(field, map);
+ list.add(item);
+ baseResult.setData(list);
+ HazelcastInstanceMapUtil.put(id, queryCache);
+ return;
+ }
+ List<Map<String, Object>> list = (List<Map<String, Object>>) data1;
+ for (int i = 0; i < list.size(); i++) {
+ Map<String, Object> map1 = list.get(i);
+ if (map1.containsKey(field)) {
+ map1.put(field, map);
+ break;
+ }
+ }
+ if (list.stream().noneMatch(o -> o.containsKey(field))) {
+ Map<String, Object> item = Maps.newHashMap();
+ item.put(field, map);
+ list.add(item);
+ }
+ HazelcastInstanceMapUtil.put(id, queryCache);
+ } finally {
+ HazelcastInstanceMapUtil.retrieveMap().unlock(id);
+ }
+ }
+
+ private Map<String, Long> getCount(String logType, String filter, String totalMetric) {
+ String sql = String.format(Objects.requireNonNull(environment.getProperty("JOB_LOG_COUNT")),
+ totalMetric, logType, StrUtil.isBlankIfStr(filter) ? "" : "WHERE ".concat(filter));
+ BaseResult baseResult = sqlSyncQueryService.executeQuery(SQLQueryContext.builder().originalSQL(sql).build());
+ if (!baseResult.isSuccess()) {
+ log.error("Job-get log count failed, message: {}", baseResult.getMessage());
+ throw new QGWBusinessException(baseResult.getStatus(), baseResult.getCode(), "Job-get log count failed, message: " + baseResult.getMessage());
+ }
+ List<Map<String, Object>> data = (List<Map<String, Object>>) baseResult.getData();
+ Map<String, Long> map = Maps.newHashMap();
+ map.put("logCount", Long.parseLong(data.get(0).get("logCount").toString()));
+ map.put("totalMetric", Long.parseLong(data.get(0).get("totalMetric").toString()));
+ return map;
+ }
+
+ private List<Map<String, Object>> sortDataAndSetMaxSize(List<Map<String, Object>> data, String sortElement, String sortType, boolean isAsc) {
+ if (DataTypeMapping.INT.equalsIgnoreCase(sortType) || DataTypeMapping.LONG.equalsIgnoreCase(sortType)) {
+ data.sort(Comparator.comparing(o -> Long.valueOf(StringUtil.isEmpty(o.get(sortElement)) ? Long.MIN_VALUE + "" : o.get(sortElement).toString())));
+ } else if (DataTypeMapping.FLOAT.equalsIgnoreCase(sortType) || DataTypeMapping.DOUBLE.equalsIgnoreCase(sortType)) {
+ data.sort(Comparator.comparing(o -> Double.valueOf(StringUtil.isEmpty(o.get(sortElement)) ? Long.MIN_VALUE + "" : o.get(sortElement).toString())));
+ } else {
+ data.sort(Comparator.comparing(o -> String.valueOf(o.get(sortElement))));
+ }
+ if (!isAsc) {
+ Collections.reverse(data);
+ }
+ return data.size() > engineConfigSource.getMaxCacheNum() ? data.subList(0, engineConfigSource.getMaxCacheNum()) : data;
+ }
+
+ @Data
+ class Interval {
+ private Long start;
+ private Long end;
+
+ boolean isValid() {
+ return StringUtil.isNotEmpty(this.start)
+ && StringUtil.isNotEmpty(this.end)
+ && this.end - this.start > jobCfg.getTimeSlicingInterval();
+ }
+ }
+
+ @Data
+ class JobInfo {
+ private String id;
+ private long startTime = System.currentTimeMillis();
+ private long count = 0;
+ private int totalTimes = 1;
+ private int currentTimes = 1;
+
+ public JobInfo(String id) {
+ this.id = id;
+ }
+
+ public boolean isTimeout() {
+ return System.currentTimeMillis() - startTime > jobCfg.getExecutionTimeout();
+ }
+ }
+
+ @Autowired
+ public void setDatabaseService(DatabaseService databaseService) {
+ this.databaseService = databaseService;
+ }
+
+ @Autowired
+ public void setJobCfg(JobConfig jobCfg) {
+ this.jobCfg = jobCfg;
+ }
+
+ @Autowired
+ public void setTaskExecuteService(TaskExecuteService taskExecuteService) {
+ this.taskExecuteService = taskExecuteService;
+ }
+
+ @Autowired
+ public void setSqlSyncQueryService(SQLSyncQueryService sqlSyncQueryService) {
+ this.sqlSyncQueryService = sqlSyncQueryService;
+ }
+
+ @Autowired
+ public void setEnvironment(Environment environment) {
+ this.environment = environment;
+ }
+
+ @Autowired
+ public void setEngineConfigSource(EngineConfigSource engineConfigSource) {
+ this.engineConfigSource = engineConfigSource;
+ }
+}
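For orientation, the slicing loop in executeSlicingFieldDiscoveryJob walks the requested interval backwards from its end in fixed-size steps, issuing one discovery pass per slice. A standalone sketch of just that arithmetic (the epoch values and one-hour step are illustrative; the real step comes from jobCfg.getTimeSlicingInterval()):

    class TimeSlicingSketch {
        public static void main(String[] args) {
            long start = 1_700_000_000L;   // interval start, epoch seconds (illustrative)
            long end   = 1_700_007_200L;   // interval end, epoch seconds (illustrative)
            long step  = 3_600L;           // stands in for jobCfg.getTimeSlicingInterval()

            // Same bookkeeping as the loop: one extra round covers the remainder.
            int totalTimes = (int) ((end - start) / step + 1);
            System.out.println("totalTimes = " + totalTimes);

            while (true) {
                long newEnd = end - step;
                if (newEnd > start) {
                    end = newEnd;
                    // query the slice [end, end + step) against the partition key
                    System.out.printf("slice [%d, %d)%n", end, end + step);
                } else {
                    // final (possibly shorter) slice covers what is left
                    System.out.printf("slice [%d, %d)%n", start, end);
                    break;
                }
            }
        }
    }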
diff --git a/src/main/java/com/mesalab/qgw/service/impl/QueryJobServiceImpl.java b/src/main/java/com/mesalab/qgw/service/impl/QueryJobServiceImpl.java
index e495cf34..27cb64fc 100644
--- a/src/main/java/com/mesalab/qgw/service/impl/QueryJobServiceImpl.java
+++ b/src/main/java/com/mesalab/qgw/service/impl/QueryJobServiceImpl.java
@@ -107,7 +107,19 @@ public class QueryJobServiceImpl implements QueryJobService, EnvironmentAware {
long start = System.currentTimeMillis();
ExecutionMode execMode = request.getExecutionMode();
if (ExecutionMode.ONESHOT.equals(execMode)) {
- if (JobConfig.FIELD_DISCOVERY.equals(request.getName())) {
+ if (JobConfig.FIELD_DISCOVERY_DEPRECATED_V2410.equals(request.getName())) {
+ execAdnWaitFieldDiscoveryDoneDeprecatedV2410(request.getId(), request);
+ QueryCache queryCache = rebuildFieldDiscoveryQueryCache(Objects.requireNonNull(getAdhocQueryCacheAndRefresh(request.getId())));
+ HazelcastInstanceMapUtil.remove(request.getId());
+ BaseResult<Object> baseResult = queryCache.getBaseResult();
+ Map<String, Object> job = baseResult.getJob();
+ if (!BooleanUtil.toBoolean(String.valueOf(job.get(JobConfig.IS_CANCELED)))
+ && !BooleanUtil.toBoolean(String.valueOf(job.get(JobConfig.IS_FAILED)))
+ && BooleanUtil.toBoolean(String.valueOf(job.get(JobConfig.IS_DONE)))) {
+ return BaseResultGenerator.success(baseResult.getStatistics(), null, baseResult.getOutputMode(), baseResult.getMeta(), baseResult.getData());
+ }
+ return BaseResultGenerator.error(baseResult.getMessage());
+ } else if (JobConfig.FIELD_DISCOVERY.equals(request.getName())) {
execAdnWaitFieldDiscoveryDone(request.getId(), request);
QueryCache queryCache = rebuildFieldDiscoveryQueryCache(Objects.requireNonNull(getAdhocQueryCacheAndRefresh(request.getId())));
HazelcastInstanceMapUtil.remove(request.getId());
@@ -156,9 +168,12 @@ public class QueryJobServiceImpl implements QueryJobService, EnvironmentAware {
}
} else if (ExecutionMode.NORMAL.equals(execMode)) {
QueryCache queryCache = new QueryCache(request.getId(), request.getName());
- if (JobConfig.FIELD_DISCOVERY.equals(request.getName())) {
+ if (JobConfig.FIELD_DISCOVERY_DEPRECATED_V2410.equals(request.getName())) {
validFieldDiscovery(request);
HazelcastInstanceMapUtil.put(request.getId(), queryCache);
+ jobExecuteService.addExecutorFieldDiscoveryDeprecatedV2410(request);
+ } else if (JobConfig.FIELD_DISCOVERY.equals(request.getName())) {
+ HazelcastInstanceMapUtil.put(request.getId(), queryCache);
jobExecuteService.addExecutorFieldDiscovery(request);
} else if (JobConfig.DATAPATH_PACKET_COMBINE.equals(request.getName())) {
HazelcastInstanceMapUtil.put(request.getId(), queryCache);
@@ -185,7 +200,10 @@ public class QueryJobServiceImpl implements QueryJobService, EnvironmentAware {
}
return BaseResultGenerator.successCreate(buildJobInfoOfCreated(request.getId()));
} else if (ExecutionMode.BLOCKING.equals(execMode)) {
- if (JobConfig.FIELD_DISCOVERY.equals(request.getName())) {
+ if (JobConfig.FIELD_DISCOVERY_DEPRECATED_V2410.equals(request.getName())) {
+ execAdnWaitFieldDiscoveryDoneDeprecatedV2410(request.getId(), request);
+ return BaseResultGenerator.successCreate(buildJobInfoOfCreated(request.getId()));
+ } else if (JobConfig.FIELD_DISCOVERY.equals(request.getName())) {
execAdnWaitFieldDiscoveryDone(request.getId(), request);
return BaseResultGenerator.successCreate(buildJobInfoOfCreated(request.getId()));
} else if (JobConfig.DATAPATH_PACKET_COMBINE.equals(request.getName())) {
@@ -234,10 +252,22 @@ public class QueryJobServiceImpl implements QueryJobService, EnvironmentAware {
String.format(CommonErrorCode.BAD_REQUEST_PARAM_SYNTAX_EXCEPTION.getMessage(), "not Supported"));
}
- private void execAdnWaitFieldDiscoveryDone(String id, DSLQueryRequestParam request) {
- QueryCache queryCacheStart = new QueryCache(id, JobConfig.FIELD_DISCOVERY);
+ private void execAdnWaitFieldDiscoveryDoneDeprecatedV2410(String id, DSLQueryRequestParam request) {
+ QueryCache queryCacheStart = new QueryCache(id, JobConfig.FIELD_DISCOVERY_DEPRECATED_V2410);
validFieldDiscovery(request);
HazelcastInstanceMapUtil.put(id, queryCacheStart);
+ Future<Boolean> booleanFuture = jobExecuteService.addExecutorFieldDiscoveryDeprecatedV2410(request);
+ while (true) {
+ if (!booleanFuture.isDone()) {
+ continue;
+ }
+ break;
+ }
+ }
+
+ private void execAdnWaitFieldDiscoveryDone(String id, DSLQueryRequestParam request) {
+ QueryCache queryCacheStart = new QueryCache(id, request.getName());
+ HazelcastInstanceMapUtil.put(id, queryCacheStart);
Future<Boolean> booleanFuture = jobExecuteService.addExecutorFieldDiscovery(request);
while (true) {
if (!booleanFuture.isDone()) {
@@ -247,6 +277,7 @@ public class QueryJobServiceImpl implements QueryJobService, EnvironmentAware {
}
}
+
private QueryCache buildQueryCacheOfDone(String id, long start, BaseResult baseResult) {
QueryCache queryCache = new QueryCache(id, null);
Map<String, Object> jobInfo = queryCache.getBaseResult().getJob();
@@ -269,7 +300,9 @@ public class QueryJobServiceImpl implements QueryJobService, EnvironmentAware {
if (queryCache == null) {
return BaseResultGenerator.success(Lists.newArrayList());
}
- if (JobConfig.FIELD_DISCOVERY.equals(queryCache.getType())) {
+ if (JobConfig.FIELD_DISCOVERY_DEPRECATED_V2410.equals(queryCache.getType())) {
+ queryCache = rebuildFieldDiscoveryQueryCache(queryCache);
+ } else if (JobConfig.FIELD_DISCOVERY.equals(queryCache.getType())) {
queryCache = rebuildFieldDiscoveryQueryCache(queryCache);
} else if (JobConfig.TRAFFIC_SPECTRUM_CLIENT_IP_CONNECT_APPLICATION_USAGE.equals(queryCache.getType())) {
queryCache = rebuildTrafficSpectrumCIPConnectAppUsageQueryCache(queryCache);
@@ -345,7 +378,7 @@ public class QueryJobServiceImpl implements QueryJobService, EnvironmentAware {
if (queryCache == null) {
continue;
}
- if (JobConfig.FIELD_DISCOVERY.equals(queryCache.getType())) {
+ if (JobConfig.FIELD_DISCOVERY_DEPRECATED_V2410.equals(queryCache.getType())) {
queryCache = rebuildFieldDiscoveryQueryCache(queryCache);
}
baseResult = queryCache.getBaseResult();
diff --git a/src/main/java/com/mesalab/qgw/service/impl/TroubleshootingServiceImp.java b/src/main/java/com/mesalab/qgw/service/impl/TroubleshootingServiceImp.java
index f2b7687b..c5e23600 100644
--- a/src/main/java/com/mesalab/qgw/service/impl/TroubleshootingServiceImp.java
+++ b/src/main/java/com/mesalab/qgw/service/impl/TroubleshootingServiceImp.java
@@ -272,7 +272,7 @@ public class TroubleshootingServiceImp implements TroubleshootingService, Enviro
datasetList.forEach(record -> {
Map<String, String> map = Maps.newHashMap();
map.put(IDENTIFIER_NAME, String.valueOf(record.getColumns().get(IDENTIFIER_NAME)));
- map.put(TEMPLATE, String.valueOf(datasetService.buildExecSQL(variables, String.valueOf(record.getColumns().get(TEMPLATE)))));
+ map.put(TEMPLATE, String.valueOf(datasetService.buildExecDSL(variables, String.valueOf(record.getColumns().get(TEMPLATE)))));
if (record.getColumns().get(DATASET_TYPE).equals("sql")) {
sqlDatasetList.add(map);
} else if (record.getColumns().get(DATASET_TYPE).equals("dsl")) {
@@ -346,7 +346,7 @@ public class TroubleshootingServiceImp implements TroubleshootingService, Enviro
Map<String, String> map = Maps.newHashMap();
map.put(IDENTIFIER_NAME, identifierName);
map.put(TYPE, type);
- map.put(TEMPLATE, String.valueOf(datasetService.buildExecSQL(variables, template)));
+ map.put(TEMPLATE, String.valueOf(datasetService.buildExecDSL(variables, template)));
if (DBEngineType.DRUID.getValue().equalsIgnoreCase(backendEngine)) {
druidDatasetList.add(map);
} else if (DBEngineType.CLICKHOUSE.getValue().equalsIgnoreCase(backendEngine)) {
diff --git a/src/main/java/com/mesalab/services/configuration/JobConfig.java b/src/main/java/com/mesalab/services/configuration/JobConfig.java
index ee9af771..9ce87bc7 100644
--- a/src/main/java/com/mesalab/services/configuration/JobConfig.java
+++ b/src/main/java/com/mesalab/services/configuration/JobConfig.java
@@ -55,7 +55,8 @@ public class JobConfig {
public static final String FILTER = "filter";
public static final String QUERY_DATA_SOURCE = "query.data_source";
public static final String SAVED_QUERY = "saved_query";
- public static final String FIELD_DISCOVERY = "field_discovery";
+ public static final String FIELD_DISCOVERY_DEPRECATED_V2410 = "field_discovery";
+ public static final String FIELD_DISCOVERY = "field-discovery";
public static final String LOG_QUERY = "log-query";
public static final String DATAPATH_PACKET_COMBINE = "datapath_telemetry_packet_combine";
public static final String TRAFFIC_SPECTRUM_SUMMARY = "traffic-spectrum-summary";
diff --git a/src/main/java/com/mesalab/services/service/impl/JobExecuteService.java b/src/main/java/com/mesalab/services/service/impl/JobExecuteService.java
index cf0ae5d5..85f0a04f 100644
--- a/src/main/java/com/mesalab/services/service/impl/JobExecuteService.java
+++ b/src/main/java/com/mesalab/services/service/impl/JobExecuteService.java
@@ -14,14 +14,12 @@ import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.jayway.jsonpath.JsonPath;
import com.mesalab.common.entity.BaseResult;
-import com.mesalab.common.entity.BaseResultGenerator;
import com.mesalab.common.entity.DataTypeMapping;
import com.mesalab.common.enums.DBEngineType;
import com.mesalab.common.enums.QueryOption;
import com.mesalab.common.enums.HttpStatusCodeEnum;
import com.mesalab.common.exception.BusinessException;
import com.mesalab.common.utils.HazelcastInstanceMapUtil;
-import com.mesalab.common.utils.sqlparser.AutoPeriodHelper;
import com.mesalab.common.utils.sqlparser.SQLFunctionUtil;
import com.mesalab.common.utils.sqlparser.SQLHelper;
import com.mesalab.qgw.constant.DslIdentifierNameConst;
@@ -52,7 +50,6 @@ import org.springframework.scheduling.annotation.AsyncResult;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import org.springframework.stereotype.Service;
-import java.io.Serializable;
import java.math.RoundingMode;
import java.text.DecimalFormat;
import java.util.*;
@@ -79,6 +76,7 @@ public class JobExecuteService implements EnvironmentAware {
private DatabaseService databaseService;
private DSLService dslService;
private PacketCombineDslService packetCombineDslService;
+ private FieldDiscoveryService fieldDiscoveryService;
private TrafficSpectrumDslService trafficSpectrumDslService;
private LogQueryService logQueryService;
private EngineConfigSource engineConfigSource;
@@ -95,7 +93,7 @@ public class JobExecuteService implements EnvironmentAware {
}
@Async("lightWeightThreadPool")
- public Future<Boolean> addExecutorFieldDiscovery(DSLQueryRequestParam request) {
+ public Future<Boolean> addExecutorFieldDiscoveryDeprecatedV2410(DSLQueryRequestParam request) {
try {
markJobBegin(request.getId());
JobInfo jobInfo = new JobInfo(request.getId());
@@ -118,6 +116,20 @@ public class JobExecuteService implements EnvironmentAware {
return new AsyncResult<>(true);
}
+ @Async("lightWeightThreadPool")
+ public Future<Boolean> addExecutorFieldDiscovery(DSLQueryRequestParam request) {
+ try {
+ markJobBegin(request.getId());
+ fieldDiscoveryService.validate(request);
+ fieldDiscoveryService.asyncRun(request);
+ } catch (Exception e) {
+ markJobFailure(request.getId(), e.getMessage());
+ } finally {
+ markJobCompletion(request.getId());
+ }
+ return new AsyncResult<>(true);
+ }
+
@Deprecated
@Async("lightWeightThreadPool")
public void addExecutorStatistics(String id, HashMap<String, Object> param, Map<String, Object> property) {
@@ -823,7 +835,7 @@ public class JobExecuteService implements EnvironmentAware {
return map;
}
- private static DecimalFormat getDecimalFormat() {
+ public static DecimalFormat getDecimalFormat() {
DecimalFormat format = new DecimalFormat("#0.####");
format.applyPattern("0.0000");
format.setRoundingMode(RoundingMode.FLOOR);
@@ -995,4 +1007,9 @@ public class JobExecuteService implements EnvironmentAware {
public void setLogQueryService(LogQueryService logQueryService) {
this.logQueryService = logQueryService;
}
+
+ @Autowired
+ public void setFieldDiscoveryService(FieldDiscoveryService fieldDiscoveryService) {
+ this.fieldDiscoveryService = fieldDiscoveryService;
+ }
}
diff --git a/src/main/java/com/mesalab/services/service/impl/JobServiceImpl.java b/src/main/java/com/mesalab/services/service/impl/JobServiceImpl.java
index 8f45d0f1..4470ce68 100644
--- a/src/main/java/com/mesalab/services/service/impl/JobServiceImpl.java
+++ b/src/main/java/com/mesalab/services/service/impl/JobServiceImpl.java
@@ -110,7 +110,7 @@ public class JobServiceImpl implements JobService, EnvironmentAware {
}
log.info("Add Hoc Job, ID is {}, params is {}", id, JSON.toJSONString(reqBody));
Map<String, Object> property = Maps.newLinkedHashMap();
- if (JobConfig.FIELD_DISCOVERY.equals(queryType)) {
+ if (JobConfig.FIELD_DISCOVERY_DEPRECATED_V2410.equals(queryType)) {
initFieldDiscoveryJob(id, reqBody, property);
//jobExecuteService.addExecutorFieldDiscovery(id, reqBody);
} else if (JobConfig.STATISTICS.equals(queryType)) {
@@ -394,7 +394,7 @@ public class JobServiceImpl implements JobService, EnvironmentAware {
}
private void initFieldDiscoveryJob(String id, HashMap<String, Object> body, Map<String, Object> property) {
- property.put(JobConfig.JOB_PROPERTY_TYPE, JobConfig.FIELD_DISCOVERY);
+ property.put(JobConfig.JOB_PROPERTY_TYPE, JobConfig.FIELD_DISCOVERY_DEPRECATED_V2410);
List<String> fields = (List<String>) body.get(JobConfig.KEY_CUSTOM_FIELD_DISCOVERY_FIELDS);
List<String> interim = Lists.newArrayList(fields);
BaseResult baseResult = sqlSyncQueryService.executeQuery(SQLQueryContext.builder().originalSQL(env.getProperty("JOB_GET_FIELD_DISCOVERY_ALL_COLUMN")).option(QueryOption.REAL_TIME.getValue()).build());
@@ -407,7 +407,7 @@ public class JobServiceImpl implements JobService, EnvironmentAware {
meta.forEach(m -> metaName.add(m.get("name").toString()));
interim.removeAll(metaName);
for (String field : interim) {
- execute(String.format(Objects.requireNonNull(env.getProperty("JOB_ADD_COLUMN")), JobConfig.FIELD_DISCOVERY, field));
+ execute(String.format(Objects.requireNonNull(env.getProperty("JOB_ADD_COLUMN")), JobConfig.FIELD_DISCOVERY_DEPRECATED_V2410, field));
}
property.put(JobConfig.JOB_PROPERTY_K_RESULT_FIELD, String.join(",", fields));
execute(String.format(Objects.requireNonNull(env.getProperty("JOB_INIT")), id, false, 0, false, false, System.currentTimeMillis(), JSON.toJSONString(property), 0, 0));
@@ -599,7 +599,7 @@ public class JobServiceImpl implements JobService, EnvironmentAware {
private void validateAdHocQuery(HashMap<String, Object> body) {
Object type = body.get(JobConfig.KEY_QUERY_TYPE);
- if (JobConfig.FIELD_DISCOVERY.equals(type)) {
+ if (JobConfig.FIELD_DISCOVERY_DEPRECATED_V2410.equals(type)) {
//validCustomParamsOfFieldDiscovery(body);
//List<String> fields = (List<String>) body.get(JobConfig.KEY_CUSTOM_FIELD_DISCOVERY_FIELDS);
//body.put(JobConfig.KEY_CUSTOM_FIELD_DISCOVERY_FIELDS, fields.stream().distinct().collect(Collectors.toList()));