| author | wangwei <[email protected]> | 2024-09-23 18:38:58 +0800 |
|---|---|---|
| committer | wangwei <[email protected]> | 2024-09-23 18:38:58 +0800 |
| commit | babd0ebc5bd8767cb1bca1f38ee763f9cf6c56b5 (patch) | |
| tree | fde167961d6f00c2a099d76230a0b48edc88902c /src | |
| parent | a576ea7edd47a4720b6641af1279bbf5c8aff4eb (diff) | |
[Fix][field-discovery] Unify the Field Discovery DSL request parameter structure (TSG-21935)
Diffstat (limited to 'src')
10 files changed, 632 insertions, 42 deletions
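
For context, the unified request shape checked by the new FieldDiscoveryServiceImpl.validate can be sketched as follows. This is a minimal, hypothetical example: the JSON property names are guessed from the getters exercised in this diff (getId, getDataSource, getIntervals, getFilter, getCustomRequestParam), the actual JSON binding of DSLQueryRequestParam is not part of this commit, and every value is illustrative. fastjson2 is used only because the diff itself parses templates with com.alibaba.fastjson2.JSON.

import com.alibaba.fastjson2.JSON;
import com.mesalab.qgw.model.basic.DSLQueryRequestParam;

public class FieldDiscoveryRequestSketch {
    public static void main(String[] args) {
        // Hypothetical payload -- property names and values are assumptions, not from this commit.
        String body = "{"
                + "\"id\":\"job-001\","
                + "\"dataSource\":\"demo_log\","
                // asyncRun() splits each interval on '/' and parses both halves with DateUtil.parse
                + "\"intervals\":[\"2024-09-23 00:00:00/2024-09-23 01:00:00\"],"
                + "\"filter\":\"1 = 1\","
                + "\"customRequestParam\":{"
                + "\"custom.statistics.dimensions\":[{\"dimension_name\":\"server_ip\"}],"
                + "\"custom.statistics.metrics\":[{\"metric_name\":\"throughput\",\"function\":{\"name\":\"sum\"}}]"
                + "}}";
        DSLQueryRequestParam request = JSON.parseObject(body, DSLQueryRequestParam.class);
        System.out.println(request.getDataSource());
    }
}
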
diff --git a/src/main/java/com/mesalab/qgw/benchmark/Writer.java b/src/main/java/com/mesalab/qgw/benchmark/Writer.java
index 06ca964f..59988b3b 100644
--- a/src/main/java/com/mesalab/qgw/benchmark/Writer.java
+++ b/src/main/java/com/mesalab/qgw/benchmark/Writer.java
@@ -5,25 +5,19 @@ import cn.hutool.core.io.file.FileWriter;
 import cn.hutool.log.Log;
 import cn.hutool.log.LogFactory;
 import com.alibaba.fastjson2.JSON;
-import com.alibaba.nacos.client.config.impl.LocalConfigInfoProcessor;
 import com.google.common.collect.Lists;
-import com.google.common.collect.Maps;
-import com.jayway.jsonpath.JsonPath;
-import com.jayway.jsonpath.JsonPathException;
 import com.mesalab.common.nacos.NacosConfig;
 import com.mesalab.common.nacos.NacosConst;
 import com.geedgenetworks.utils.StringUtil;
 import com.mesalab.qgw.service.DatabaseService;
 import com.mesalab.qgw.service.DatasetService;
 import org.springframework.beans.factory.annotation.Autowired;
-import org.springframework.beans.factory.annotation.Value;
 import org.springframework.stereotype.Component;
 
 import java.io.File;
 import java.io.IOException;
 import java.util.LinkedHashMap;
 import java.util.List;
-import java.util.Map;
 
 @Component
 public abstract class Writer {
@@ -57,7 +51,7 @@ public abstract class Writer {
         }
         List<String> resultLines = Lists.newArrayList();
         for (String line : list) {
-            String sql = datasetService.buildExecSQL(variables, line);
+            String sql = datasetService.buildExecDSL(variables, line);
             resultLines.add(sql);
         }
         writer.appendLines(resultLines);
diff --git a/src/main/java/com/mesalab/qgw/service/DatasetService.java b/src/main/java/com/mesalab/qgw/service/DatasetService.java
index 80666408..8500bed1 100644
--- a/src/main/java/com/mesalab/qgw/service/DatasetService.java
+++ b/src/main/java/com/mesalab/qgw/service/DatasetService.java
@@ -1,6 +1,5 @@
 package com.mesalab.qgw.service;
 
-import com.google.common.collect.Lists;
 import com.mesalab.common.entity.BaseResult;
 
 import java.util.LinkedHashMap;
@@ -52,6 +51,6 @@ public interface DatasetService
      * @param
      * @return
      */
-    String buildExecSQL(List<LinkedHashMap> variables, String sql);
+    String buildExecDSL(List<LinkedHashMap> variables, String sql);
 
 }
diff --git a/src/main/java/com/mesalab/qgw/service/FieldDiscoveryService.java b/src/main/java/com/mesalab/qgw/service/FieldDiscoveryService.java
new file mode 100644
index 00000000..18157e0c
--- /dev/null
+++ b/src/main/java/com/mesalab/qgw/service/FieldDiscoveryService.java
@@ -0,0 +1,19 @@
+package com.mesalab.qgw.service;
+
+import com.mesalab.common.entity.BaseResult;
+import com.mesalab.qgw.model.basic.DSLQueryRequestParam;
+
+import java.util.concurrent.ExecutionException;
+
+/**
+ * TODO
+ *
+ * @Classname FieldDiscoveryService
+ * @Date 2024/9/23 14:04
+ * @Author wWei
+ */
+public interface FieldDiscoveryService {
+    void asyncRun(DSLQueryRequestParam request) throws ExecutionException;
+
+    void validate(DSLQueryRequestParam request);
+}
diff --git a/src/main/java/com/mesalab/qgw/service/impl/DatasetServiceImp.java b/src/main/java/com/mesalab/qgw/service/impl/DatasetServiceImp.java
index b923b28e..2ee571be 100644
--- a/src/main/java/com/mesalab/qgw/service/impl/DatasetServiceImp.java
+++ b/src/main/java/com/mesalab/qgw/service/impl/DatasetServiceImp.java
@@ -63,11 +63,11 @@ public class DatasetServiceImp
             String key = String.valueOf(variable.get("key"));
             String def = String.valueOf(variable.get("default"));
             Date currentDate = DateUtils.convertStringToDate(DateUtils.getCurrentDate(DateUtils.YYYY_MM_DD_HH24_MM_SS), DateUtils.YYYY_MM_DD_HH24_MM_SS);
-            if ("start_time".equals(key)&& StringUtil.isBlank(def)){
+            if ("start_time".equals(key) && StringUtil.isBlank(def)) {
                 def = DateUtils.getFormatDate(DateUtils.getSomeHour(currentDate, -1), DateUtils.YYYY_MM_DD_HH24_MM_SS);
                 variable.put("default", def);
             }
-            if ("end_time".equals(key)&& StringUtil.isBlank(def)){
+            if ("end_time".equals(key) && StringUtil.isBlank(def)) {
                 def = DateUtils.getFormatDate(currentDate, DateUtils.YYYY_MM_DD_HH24_MM_SS);
                 variable.put("default", def);
             }
@@ -82,7 +82,7 @@ public class DatasetServiceImp
         List<Record> list = Db.find(buildGetDatasetSQL(Lists.newArrayList(datasetId), null, null));
         if (CollectionUtils.isEmpty(list)) {
             throw new QGWBusinessException(HttpStatusCodeEnum.BAD_REQUEST.getCode(), CommonErrorCode.BAD_REQUEST_PARAM_SYNTAX_EXCEPTION.getCode(),
-                    String.format(CommonErrorCode.BAD_REQUEST_PARAM_SYNTAX_EXCEPTION.getMessage(),QGWMessageConst.DATASET_ID_NOT_EXIST));
+                    String.format(CommonErrorCode.BAD_REQUEST_PARAM_SYNTAX_EXCEPTION.getMessage(), QGWMessageConst.DATASET_ID_NOT_EXIST));
         }
         List<Map<String, Object>> results = formatResultData(list);
         Map<String, Object> result = results.get(0);
@@ -114,7 +114,7 @@ public class DatasetServiceImp
     @Override
     public BaseResult getPreview(String datasetId) {
         Map<String, Object> dataset = getDataset(datasetId);
-        String template = buildExecSQL(getVariable(), String.valueOf(dataset.get(TEMPLATE)));
+        String template = buildExecDSL(getVariable(), String.valueOf(dataset.get(TEMPLATE)));
         String datasetType = String.valueOf(dataset.get(DATASET_TYPE));
         if (datasetType.equalsIgnoreCase("sql")) {
             SqlQueryRequestParam sqlQueryRequest = JSON.parseObject(template, SqlQueryRequestParam.class);
@@ -134,22 +134,24 @@ public class DatasetServiceImp
     @Override
-    public String buildExecSQL(List<LinkedHashMap> variables, String sql) {
-        Matcher matcher = pLeftRightFlag.matcher(sql);
+    public String buildExecDSL(List<LinkedHashMap> variables, String template) {
+        Matcher matcher = pLeftRightFlag.matcher(template);
         while (matcher.find()) {
-            sql = processOptionalClause(sql, "[[", "]]");
+            template = processOptionalClause(template, "[[", "]]");
         }
-        sql = processFieldVariable(sql);
         for (LinkedHashMap linkedHashMap : variables) {
             String variable = String.valueOf(linkedHashMap.get("key"));
-            String def = String.valueOf(linkedHashMap.get("default"));
-            if ("filter".equals(variable) && StringUtil.isBlank(def)) {
-                def = " 1 = 1";
+            Object def = linkedHashMap.get("default");
+            String parameter;
+            if (def instanceof Map) {
+                parameter = "\"${".concat(variable).concat("}\"");
+            } else {
+                parameter = "${".concat(variable).concat("}");
             }
-            String parameter = "${".concat(variable).concat("}");
-            sql = sql.replace(parameter, def);
+            template = template.replace(parameter, String.valueOf(def));
         }
-        return sql;
+        template = processFieldVariable(template);
+        return template;
     }
 
     private String buildGetDatasetSQL(List<String> ids, String category, String backendEngine) {
diff --git a/src/main/java/com/mesalab/qgw/service/impl/FieldDiscoveryServiceImpl.java b/src/main/java/com/mesalab/qgw/service/impl/FieldDiscoveryServiceImpl.java
new file mode 100644
index 00000000..fc4317f2
--- /dev/null
+++ b/src/main/java/com/mesalab/qgw/service/impl/FieldDiscoveryServiceImpl.java
@@ -0,0 +1,525 @@
+package com.mesalab.qgw.service.impl;
+
+import cn.hutool.core.collection.CollectionUtil;
+import cn.hutool.core.date.DateTime;
+import cn.hutool.core.date.DateUtil;
+import cn.hutool.core.util.BooleanUtil;
+import cn.hutool.core.util.NumberUtil;
+import cn.hutool.core.util.StrUtil;
+import cn.hutool.log.Log;
+import cn.hutool.log.LogFactory;
+import com.geedgenetworks.utils.StringUtil;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import com.jayway.jsonpath.JsonPath;
+import com.mesalab.common.entity.BaseResult;
+import com.mesalab.common.entity.DataTypeMapping;
+import com.mesalab.common.enums.HttpStatusCodeEnum;
+import com.mesalab.common.exception.BusinessException;
+import com.mesalab.common.exception.CommonErrorCode;
+import com.mesalab.common.utils.HazelcastInstanceMapUtil;
+import com.mesalab.qgw.exception.QGWBusinessException;
+import com.mesalab.qgw.model.basic.DSLQueryRequestParam;
+import com.mesalab.qgw.model.basic.EngineConfigSource;
+import com.mesalab.qgw.model.basic.QueryCache;
+import com.mesalab.qgw.model.basic.SQLQueryContext;
+import com.mesalab.qgw.service.DatabaseService;
+import com.mesalab.qgw.service.FieldDiscoveryService;
+import com.mesalab.qgw.service.SQLSyncQueryService;
+import com.mesalab.services.common.enums.MetricFunction;
+import com.mesalab.services.common.enums.MetricType;
+import com.mesalab.services.configuration.JobConfig;
+import com.mesalab.services.service.impl.TaskExecuteService;
+import lombok.Data;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.core.env.Environment;
+import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
+import org.springframework.stereotype.Service;
+
+import java.text.DecimalFormat;
+import java.util.*;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
+import java.util.stream.Collectors;
+
+import static com.mesalab.services.service.impl.JobExecuteService.getDecimalFormat;
+import static com.mesalab.services.service.impl.JobExecuteService.markJobFailure;
+
+/**
+ * TODO
+ *
+ * @Classname FieldDiscoveryServiceImpl
+ * @Date 2024/9/23 14:06
+ * @Author wWei
+ */
+@Service
+public class FieldDiscoveryServiceImpl implements FieldDiscoveryService {
+    private static final Log log = LogFactory.get();
+    private DatabaseService databaseService;
+    private Environment environment;
+    private EngineConfigSource engineConfigSource;
+    private TaskExecuteService taskExecuteService;
+    private JobConfig jobCfg;
+    private SQLSyncQueryService sqlSyncQueryService;
+
+    private final ThreadPoolTaskExecutor taskThreadPool;
+
+    public FieldDiscoveryServiceImpl(ThreadPoolTaskExecutor taskThreadPool) {
+        this.taskThreadPool = taskThreadPool;
+    }
+
+
+    @Override
+    public void asyncRun(DSLQueryRequestParam request) throws ExecutionException {
+        JobInfo jobInfo = new JobInfo(request.getId());
+        String partitionKey = databaseService.getPartitionKey(request.getDataSource());
+        List<String> intervals = request.getIntervals();
+        String[] split = intervals.get(0).split("/");
+        DateTime start = DateUtil.parse(split[0]);
+        DateTime end = DateUtil.parse(split[1]);
+        Interval interval = new Interval();
+        interval.setStart(start.getTime() / 1000);
+        interval.setEnd(end.getTime() / 1000);
+        try {
+            if (jobCfg.isTimeSlicingEnabled() && interval.isValid()) {
+                executeSlicingFieldDiscoveryJob(request.getId(), jobInfo, request, interval, partitionKey);
+            } else {
+                String filter = String.format(buildFilterFormatSql(),
+                        request.getFilter(),
+                        partitionKey, interval.getStart(), partitionKey, interval.getEnd());
+                executeFieldDiscoveryJob(request.getId(), jobInfo, request, filter);
+            }
+        } catch (InterruptedException e) {
+            // Handle the InterruptedException and preserve the interrupt status
+            Thread.currentThread().interrupt();
+            markJobFailure(request.getId(), "Job was interrupted: " + e.getMessage());
+        }
+
+    }
+
+    @Override
+    public void validate(DSLQueryRequestParam request) {
+        Object filter = request.getFilter();
+        if (StrUtil.isEmptyIfStr(filter)) {
+            throw new QGWBusinessException(HttpStatusCodeEnum.BAD_REQUEST.getCode(), CommonErrorCode.PARAMETER_ERROR.getCode(),
+                    String.format(CommonErrorCode.PARAMETER_ERROR.getMessage(), "Filter cannot be null or empty."));
+        }
+        Object dataSource = request.getDataSource();
+        if (StrUtil.isEmptyIfStr(dataSource)) {
+            throw new QGWBusinessException(HttpStatusCodeEnum.BAD_REQUEST.getCode(), CommonErrorCode.PARAMETER_ERROR.getCode(),
+                    String.format(CommonErrorCode.PARAMETER_ERROR.getMessage(), "Data Source cannot be null or empty."));
+        }
+        List<String> intervals = request.getIntervals();
+        if (intervals == null || intervals.size() == 0) {
+            throw new QGWBusinessException(HttpStatusCodeEnum.BAD_REQUEST.getCode(), CommonErrorCode.PARAMETER_ERROR.getCode(),
+                    String.format(CommonErrorCode.PARAMETER_ERROR.getMessage(), "Intervals cannot be null or empty."));
+
+        }
+
+        Map<String, Object> customRequestParam = request.getCustomRequestParam();
+        if (customRequestParam == null || customRequestParam.size() == 0) {
+            throw new QGWBusinessException(HttpStatusCodeEnum.BAD_REQUEST.getCode(), CommonErrorCode.PARAMETER_ERROR.getCode(),
+                    String.format(CommonErrorCode.PARAMETER_ERROR.getMessage(), "custom.statistics.dimensions cannot be null or empty."));
+
+        }
+
+        Object dimensions = customRequestParam.get("custom.statistics.dimensions");
+        if (!(dimensions instanceof List)) {
+            throw new QGWBusinessException(HttpStatusCodeEnum.BAD_REQUEST.getCode(), CommonErrorCode.PARAMETER_ERROR.getCode(),
+                    String.format(CommonErrorCode.PARAMETER_ERROR.getMessage(), "custom.statistics.dimensions cannot be null or empty."));
+        }
+        List<Map<String, Object>> collect = ((List<Map<String, Object>>) dimensions).stream().distinct().collect(Collectors.toList());
+        request.getCustomRequestParam().put("custom.statistics.dimensions", collect);
+
+        Object metrics = customRequestParam.get("custom.statistics.metrics");
+        if ((metrics instanceof List) && ((List<Map<String, Object>>) metrics).size() == 1) {
+            Map<String, Object> o = ((List<Map<String, Object>>) metrics).get(0);
+            Object metric = o.get("metric_name");
+            boolean validMetric = MetricType.isValid(String.valueOf(metric));
+            Map<String, Object> function = (Map<String, Object>) o.get("function");
+            Object funName = function.get("name");
+            boolean validFn = MetricFunction.isValid(String.valueOf(funName));
+            if (!validMetric || !validFn) {
+                throw new QGWBusinessException(HttpStatusCodeEnum.BAD_REQUEST.getCode(), CommonErrorCode.PARAMETER_ERROR.getCode(),
+                        String.format(CommonErrorCode.PARAMETER_ERROR.getMessage(), "custom.statistics.metrics.* illegal."));
+            }
+        }
+
+    }
+
+    private void executeFieldDiscoveryJob(String id, JobInfo jobInfo, DSLQueryRequestParam request, String segmentFilter) throws ExecutionException, InterruptedException {
+        if (isCancel(jobInfo.getId())) {
+            return;
+        }
+        String dataSource = request.getDataSource();
+        String totalMetric = "count(*)";
+        Map<String, Object> customRequestParam = request.getCustomRequestParam();
+        Object metrics = customRequestParam.get("custom.statistics.metrics");
+        Object metric = null;
+        Object fn = null;
+        if (StringUtil.isNotEmpty(metrics)) {
+            List<Map<String, Object>> metricsList = (List<Map<String, Object>>) metrics;
+            Map<String, Object> map = metricsList.get(0);
+            metric = map.get("metric_name");
+            Map<String, Object> function = (Map<String, Object>) map.get("function");
+            fn = function.get("name");
+            String measurements = databaseService.getValueByKeyInSchemaDoc(dataSource, "measurements");
+            List<Map<String, String>> read = JsonPath.read(measurements, "$.field_discovery_metric['" + metric + "'][?(@.fn == '" + fn + "')]");
+            if (!read.isEmpty()) {
+                String column = read.get(0).get("column");
+                totalMetric = read.get(0).get("fn") + "(" + column + ")";
+            }
+        }
+
+        Map<String, Long> map = getCount(dataSource, segmentFilter, totalMetric);
+        long currentCount = map.get("logCount");
+        if (currentCount == 0) {
+            return;
+        }
+        Map<String, Future<List<Map<String, Object>>>> taskCallbackList = Maps.newHashMap();
+        List<String> fields = Lists.newArrayList();
+        List<Map<String, Object>> o = (List<Map<String, Object>>) customRequestParam.get("custom.statistics.dimensions");
+        o.forEach(x -> fields.add(String.valueOf(x.get("dimension_name"))));
+
+        for (String field : fields) {
+            Object finalMetric = metric;
+            Object finalFn = fn;
+            taskCallbackList.put(field, CompletableFuture.supplyAsync(() -> taskExecuteService.executeFieldDiscovery(id, request.getDataSource(), field
+                    , finalMetric
+                    , finalFn
+                    , segmentFilter), taskThreadPool));
+        }
+        waitAsyncResultAndUpdateProcess(id, jobInfo, taskCallbackList, request, map, metric, fn);
+    }
+
+    private void executeSlicingFieldDiscoveryJob(String id, JobInfo jobInfo, DSLQueryRequestParam request, Interval interval, String partitionKey) throws ExecutionException, InterruptedException {
+        jobInfo.setTotalTimes((int) ((interval.getEnd() - interval.getStart()) / jobCfg.getTimeSlicingInterval() + 1));
+        while (true) {
+            jobInfo.setCurrentTimes(jobInfo.getCurrentTimes() + 1);
+            if (jobInfo.isTimeout()) {
+                log.error("execute job timeout: job-{}", jobInfo);
+                markJobFailure(id, "execute job timeout");
+                break;
+            }
+            QueryCache queryCache = HazelcastInstanceMapUtil.get(id);
+            if (StrUtil.isEmptyIfStr(queryCache)) {
+                log.warn("cache expiration: job-{}", jobInfo);
+                break;
+            }
+            Map<String, Object> job = queryCache.getBaseResult().getJob();
+            if (BooleanUtil.toBoolean(String.valueOf(job.get(JobConfig.IS_DONE)))
+                    || BooleanUtil.toBoolean(String.valueOf(job.get(JobConfig.IS_FAILED)))
+                    || BooleanUtil.toBoolean(String.valueOf(job.get(JobConfig.IS_CANCELED)))) {
+                log.warn("job done or interrupt: job obj-{}", job);
+                break;
+            }
+            long start = interval.getEnd() - jobCfg.getTimeSlicingInterval();
+            if (start > interval.getStart()) {
+                interval.setEnd(start);
+                String newFilter = String.format(buildFilterFormatSql(),
+                        request.getFilter(),
+                        partitionKey, interval.getEnd(), partitionKey, interval.getEnd() + jobCfg.getTimeSlicingInterval());
+                executeFieldDiscoveryJob(id, jobInfo, request, newFilter);
+            } else {
+                String where = String.format(buildFilterFormatSql(),
+                        request.getFilter(),
+                        partitionKey, interval.getStart(), partitionKey, interval.getEnd());
+                executeFieldDiscoveryJob(id, jobInfo, request, where);
+                break;
+            }
+        }
+    }
+
+    private static String buildFilterFormatSql() {
+        return "(%s) AND (%s >= %s AND %s < %s)";
+    }
+
+    private boolean isCancel(String id) {
+        QueryCache queryCache = HazelcastInstanceMapUtil.get(id);
+        if (queryCache == null) {
+            log.info("Can't find the cache, job id is: {}", id);
+            return true;
+        }
+        long currentTime = System.currentTimeMillis();
+        if (currentTime - queryCache.getLatestQueryTimeMs() > jobCfg.getInteractiveTimeout()) {
+            queryCache.getBaseResult().getJob().put(JobConfig.IS_CANCELED, true);
+            HazelcastInstanceMapUtil.put(id, queryCache);
+            log.info("Interactive timeout, job id is: {}", id);
+            return true;
+        }
+        return false;
+    }
+
+    private void waitAsyncResultAndUpdateProcess(String id, JobInfo jobInfo, Map<String, Future<List<Map<String, Object>>>> taskCallbackList, DSLQueryRequestParam request, Map<String, Long> mapCurrent
+            , Object metric, Object fn) throws ExecutionException, InterruptedException {
+        List<String> updatedResult = Lists.newArrayList();
+        while (true) {
+            if (jobInfo.isTimeout()) {
+                log.error("execute job timeout, job id is: {}", id);
+                markJobFailure(id, "execute job timeout");
+                taskCallbackList.keySet().forEach(o -> taskCallbackList.get(o).cancel(true));
+                break;
+            }
+            if (updatedResult.size() >= taskCallbackList.size()) {
+                break;
+            }
+            long completedTaskCount = taskCallbackList.keySet().stream().filter(o -> taskCallbackList.get(o).isDone()).count();
+            if ((completedTaskCount - updatedResult.size()) * 1.0 / taskCallbackList.size() < 0.01 && completedTaskCount < taskCallbackList.size()) {
+                continue;
+            }
+            log.info("id :{}, done size: {}, task size: {}", id, completedTaskCount, taskCallbackList.size());
+            DecimalFormat format = getDecimalFormat();
+            double jobCompletedProcess = Double.parseDouble(format.format(jobInfo.getCurrentTimes() * 1.0 / jobInfo.getTotalTimes() - ((1 - completedTaskCount * 1.0 / taskCallbackList.size()) / jobInfo.getTotalTimes())));
+            List<String> queryDoneField = taskCallbackList.keySet().stream().filter(o -> taskCallbackList.get(o).isDone()).collect(Collectors.toList());
+            for (String field : queryDoneField) {
+                if (updatedResult.contains(field)) {
+                    continue;
+                }
+                updatedResult.add(field);
+                Long currentTotalMetric = mapCurrent.get("totalMetric");
+                String fnDefault = "count";
+                String value = "count";
+                if (StringUtil.isNotEmpty(metric)) {
+                    String measurements = databaseService.getValueByKeyInSchemaDoc(request.getDataSource(), "measurements");
+                    List<Map<String, String>> read = JsonPath.read(measurements, "$.field_discovery_metric['" + metric + "'][?(@.fn == '" + fn + "')]");
+                    if (!read.isEmpty()) {
+                        fnDefault = fn.toString();
+                        value = read.get(0).get("value");
+                    }
+                }
+                Future<List<Map<String, Object>>> listFuture = taskCallbackList.get(field);
+                List<Map<String, Object>> data = listFuture.get();
+                data.forEach(x -> {
+                    for (String k : x.keySet()) {
+                        if (k.startsWith(JobConfig.FIELD_DISCOVERY_TOPK_METRIC_PREFIX)) {
+                            Object v = x.get(k);
+                            x.put(k.replace(JobConfig.FIELD_DISCOVERY_TOPK_METRIC_PREFIX, ""), v);
+                            x.remove(k);
+                        }
+                    }
+                });
+                QueryCache queryCache = HazelcastInstanceMapUtil.get(id);
+                if (StrUtil.isEmptyIfStr(queryCache)) {
+                    markJobFailure(id, "Cache expiration.");
+                    log.error("field_discovery task query lastTopK Error: Cache expiration.");
+                    return;
+                }
+                Object lastTotalMetric = 0;
+                if (StringUtil.isNotEmpty(queryCache.getBaseResult().getData())) {
+                    List<Map> map = (List<Map>) queryCache.getBaseResult().getData();
+                    for (Map map1 : map) {
+                        if (CollectionUtil.isNotEmpty(map1) && map1.containsKey(field) && !StrUtil.isEmptyIfStr(map1.get(field))) {
+                            Map item = (Map) map1.get(field);
+                            List<Map<String, Object>> topk = (List<Map<String, Object>>) item.get("topk");
+                            lastTotalMetric = item.get(value);
+                            Map<String, String> metrics = Maps.newHashMap();
+                            metrics.put(value, "count".equalsIgnoreCase(fnDefault) ? "sum" : fnDefault);
+                            data = mergeData(data, topk, Lists.newArrayList("value"), metrics);
+                        }
+
+                    }
+                }
+                data = sortDataAndSetMaxSize(data, value, DataTypeMapping.LONG, false);
+                updateResult(id, field, currentTotalMetric, lastTotalMetric, value, data, jobCompletedProcess);
+
+            }
+            if (completedTaskCount >= taskCallbackList.size()) {
+                break;
+            }
+        }
+    }
+
+    private List<Map<String, Object>> mergeData(List<Map<String, Object>> data1, List<Map<String, Object>> date2, List<String> dimensions, Map<String, String> metrics) {
+        Collection<Map<String, Object>> data = CollectionUtil.addAll(data1, date2);
+        Map<String, Map<String, Object>> result = Maps.newHashMap();
+        for (Map<String, Object> datum : data) {
+            StringBuilder key = new StringBuilder();
+            dimensions.forEach(k -> key.append(datum.get(k)));
+            if (!result.containsKey(key.toString())) {
+                result.put(key.toString(), datum);
+                continue;
+            }
+            Map<String, Object> item = result.get(key.toString());
+            for (String k : metrics.keySet()) {
+                String action = metrics.get(k);
+                Object o1 = item.get(k);
+                Object o2 = datum.get(k);
+                if (StringUtil.isEmpty(o1) || StringUtil.isEmpty(o2)) {
+                    item.put(k, (StringUtil.isEmpty(o1) && StringUtil.isEmpty(o2)) ? null : StringUtil.isEmpty(o1) ? o2 : o1);
+                    continue;
+                }
+                Number number1 = NumberUtil.parseNumber(o1.toString());
+                Number number2 = NumberUtil.parseNumber(o2.toString());
+                if (action.equals("sum")) {
+                    item.put(k, NumberUtil.add(number1, number2));
+                    continue;
+                }
+                if (NumberUtil.isDouble(o1.toString()) || NumberUtil.isDouble(o2.toString())) {
+                    double v1 = Double.parseDouble(number1.toString());
+                    double v2 = Double.parseDouble(number2.toString());
+                    switch (action) {
+                        case "max":
+                            item.put(k, NumberUtil.max(v1, v2));
+                            break;
+                        case "min":
+                            item.put(k, NumberUtil.min(v1, v2));
+                            break;
+                        case "avg":
+                            item.put(k, NumberUtil.div(NumberUtil.add(v1, v2), 2));
+                            break;
+                    }
+                } else if (NumberUtil.isLong(o1.toString()) || NumberUtil.isLong(o2.toString())) {
+                    long v1 = Long.parseLong(number1.toString());
+                    long v2 = Long.parseLong(number2.toString());
+                    switch (action) {
+                        case "max":
+                            item.put(k, NumberUtil.max(v1, v2));
+                            break;
+                        case "min":
+                            item.put(k, NumberUtil.min(v1, v2));
+                            break;
+                        case "avg":
+                            item.put(k, NumberUtil.div(NumberUtil.add(v1, v2), 2));
+                            break;
+                    }
+                } else {
+                    log.error("task merge data error, data is: {}, {}", data1, date2);
+                    throw new BusinessException("task merge data error.");
+                }
+            }
+        }
+        return new ArrayList<>(result.values());
+    }
+
+    private void updateResult(String id, String field, long currentTotalMetric, Object lastTotalMetric, String value, List<Map<String, Object>> topList, double jobCompletedProcess) {
+        Map<String, Object> map = Maps.newHashMap();
+        map.put("topk", topList);
+        map.put("distinct_count", Math.min(topList.size(), 100));
+        map.put(value, currentTotalMetric + Long.parseLong(lastTotalMetric.toString()));
+        try {
+            HazelcastInstanceMapUtil.retrieveMap().lock(id);
+            QueryCache queryCache = HazelcastInstanceMapUtil.get(id);
+            BaseResult<Object> baseResult = queryCache.getBaseResult();
+            if (jobCompletedProcess < 1) {
+                Map<String, Object> job = queryCache.getBaseResult().getJob();
+                job.put(JobConfig.DONE_PROGRESS, jobCompletedProcess);
+            }
+
+            Object data1 = baseResult.getData();
+            if (StrUtil.isEmptyIfStr(data1)) {
+                List<Object> list = Lists.newArrayList();
+                Map<Object, Object> item = Maps.newLinkedHashMap();
+                item.put(field, map);
+                list.add(item);
+                baseResult.setData(list);
+                HazelcastInstanceMapUtil.put(id, queryCache);
+                return;
+            }
+            List<Map<String, Object>> list = (List<Map<String, Object>>) data1;
+            for (int i = 0; i < list.size(); i++) {
+                Map<String, Object> map1 = list.get(i);
+                if (map1.containsKey(field)) {
+                    map1.put(field, map);
+                    break;
+                }
+            }
+            if (list.stream().noneMatch(o -> o.containsKey(field))) {
+                Map<String, Object> objectObjectHashMap = Maps.newHashMap();
+                objectObjectHashMap.put(field, map);
+                list.add(objectObjectHashMap);
+            }
+            HazelcastInstanceMapUtil.put(id, queryCache);
+        } finally {
+            HazelcastInstanceMapUtil.retrieveMap().unlock(id);
+        }
+    }
+
+    private Map<String, Long> getCount(String logType, String filter, String totalMetric) {
+        String sql = String.format(Objects.requireNonNull(environment.getProperty("JOB_LOG_COUNT")),
+                totalMetric, logType, StrUtil.isBlankIfStr(filter) ? "" : "WHERE ".concat(filter));
+        BaseResult baseResult = sqlSyncQueryService.executeQuery(SQLQueryContext.builder().originalSQL(sql).build());
+        if (!baseResult.isSuccess()) {
+            log.error("Job-get log count failed, message: {}", baseResult.getMessage());
+            throw new QGWBusinessException(baseResult.getStatus(), baseResult.getCode(), "Job-get log count failed, message: " + baseResult.getMessage());
+        }
+        List<Map<String, Object>> data = (List<Map<String, Object>>) baseResult.getData();
+        Map<String, Long> map = Maps.newHashMap();
+        map.put("logCount", Long.parseLong(data.get(0).get("logCount").toString()));
+        map.put("totalMetric", Long.parseLong(data.get(0).get("totalMetric").toString()));
+        return map;
+    }
+
+    private List<Map<String, Object>> sortDataAndSetMaxSize(List<Map<String, Object>> data, String sortElement, String sortType, boolean isAsc) {
+        if (DataTypeMapping.INT.equalsIgnoreCase(sortType) || DataTypeMapping.LONG.equalsIgnoreCase(sortType)) {
+            data.sort(Comparator.comparing(o -> Long.valueOf(StringUtil.isEmpty(o.get(sortElement)) ? Long.MIN_VALUE + "" : o.get(sortElement).toString())));
+        } else if (DataTypeMapping.FLOAT.equalsIgnoreCase(sortType) || DataTypeMapping.DOUBLE.equalsIgnoreCase(sortType)) {
+            data.sort(Comparator.comparing(o -> Double.valueOf(StringUtil.isEmpty(o.get(sortElement)) ? Long.MIN_VALUE + "" : o.get(sortElement).toString())));
+        } else {
+            data.sort(Comparator.comparing(o -> String.valueOf(o.get(sortElement))));
+        }
+        if (!isAsc) {
+            Collections.reverse(data);
+        }
+        return data.size() > engineConfigSource.getMaxCacheNum() ? data.subList(0, engineConfigSource.getMaxCacheNum()) : data;
+    }
+
+    @Data
+    class Interval {
+        private Long start;
+        private Long end;
+
+        boolean isValid() {
+            return StringUtil.isNotEmpty(this.start)
+                    && StringUtil.isNotEmpty(this.end)
+                    && this.end - this.start > jobCfg.getTimeSlicingInterval();
+        }
+    }
+
+    @Data
+    class JobInfo {
+        private String id;
+        private long startTime = System.currentTimeMillis();
+        private long count = 0;
+        private int totalTimes = 1;
+        private int currentTimes = 1;
+
+        public JobInfo(String id) {
+            this.id = id;
+        }
+
+        public boolean isTimeout() {
+            return System.currentTimeMillis() - startTime > jobCfg.getExecutionTimeout();
+        }
+    }
+
+    @Autowired
+    public void setDatabaseService(DatabaseService databaseService) {
+        this.databaseService = databaseService;
+    }
+
+    @Autowired
+    public void setJobCfg(JobConfig jobCfg) {
+        this.jobCfg = jobCfg;
+    }
+
+    @Autowired
+    public void setTaskExecuteService(TaskExecuteService taskExecuteService) {
+        this.taskExecuteService = taskExecuteService;
+    }
+
+    @Autowired
+    public void setSqlSyncQueryService(SQLSyncQueryService sqlSyncQueryService) {
+        this.sqlSyncQueryService = sqlSyncQueryService;
+    }
+
+    @Autowired
+    public void setEnvironment(Environment environment) {
+        this.environment = environment;
+    }
+
+    @Autowired
+    public void setEngineConfigSource(EngineConfigSource engineConfigSource) {
+        this.engineConfigSource = engineConfigSource;
+    }
+}
diff --git a/src/main/java/com/mesalab/qgw/service/impl/QueryJobServiceImpl.java b/src/main/java/com/mesalab/qgw/service/impl/QueryJobServiceImpl.java
index e495cf34..27cb64fc 100644
--- a/src/main/java/com/mesalab/qgw/service/impl/QueryJobServiceImpl.java
+++ b/src/main/java/com/mesalab/qgw/service/impl/QueryJobServiceImpl.java
@@ -107,7 +107,19 @@ public class QueryJobServiceImpl implements QueryJobService, EnvironmentAware {
         long start = System.currentTimeMillis();
         ExecutionMode execMode = request.getExecutionMode();
         if (ExecutionMode.ONESHOT.equals(execMode)) {
-            if (JobConfig.FIELD_DISCOVERY.equals(request.getName())) {
+            if (JobConfig.FIELD_DISCOVERY_DEPRECATED_V2410.equals(request.getName())) {
+                execAdnWaitFieldDiscoveryDoneDeprecatedV2410(request.getId(), request);
+                QueryCache queryCache = rebuildFieldDiscoveryQueryCache(Objects.requireNonNull(getAdhocQueryCacheAndRefresh(request.getId())));
+                HazelcastInstanceMapUtil.remove(request.getId());
+                BaseResult<Object> baseResult = queryCache.getBaseResult();
+                Map<String, Object> job = baseResult.getJob();
+                if (!BooleanUtil.toBoolean(String.valueOf(job.get(JobConfig.IS_CANCELED)))
+                        && !BooleanUtil.toBoolean(String.valueOf(job.get(JobConfig.IS_FAILED)))
+                        && BooleanUtil.toBoolean(String.valueOf(job.get(JobConfig.IS_DONE)))) {
+                    return BaseResultGenerator.success(baseResult.getStatistics(), null, baseResult.getOutputMode(), baseResult.getMeta(), baseResult.getData());
+                }
+                return BaseResultGenerator.error(baseResult.getMessage());
+            } else if (JobConfig.FIELD_DISCOVERY.equals(request.getName())) {
                 execAdnWaitFieldDiscoveryDone(request.getId(), request);
                 QueryCache queryCache = rebuildFieldDiscoveryQueryCache(Objects.requireNonNull(getAdhocQueryCacheAndRefresh(request.getId())));
                 HazelcastInstanceMapUtil.remove(request.getId());
@@ -156,9 +168,12 @@
             }
         } else if (ExecutionMode.NORMAL.equals(execMode)) {
             QueryCache queryCache = new QueryCache(request.getId(), request.getName());
-            if (JobConfig.FIELD_DISCOVERY.equals(request.getName())) {
+            if (JobConfig.FIELD_DISCOVERY_DEPRECATED_V2410.equals(request.getName())) {
                 validFieldDiscovery(request);
                 HazelcastInstanceMapUtil.put(request.getId(), queryCache);
+                jobExecuteService.addExecutorFieldDiscoveryDeprecatedV2410(request);
+            } else if (JobConfig.FIELD_DISCOVERY.equals(request.getName())) {
+                HazelcastInstanceMapUtil.put(request.getId(), queryCache);
                 jobExecuteService.addExecutorFieldDiscovery(request);
             } else if (JobConfig.DATAPATH_PACKET_COMBINE.equals(request.getName())) {
                 HazelcastInstanceMapUtil.put(request.getId(), queryCache);
@@ -185,7 +200,10 @@
             }
             return BaseResultGenerator.successCreate(buildJobInfoOfCreated(request.getId()));
         } else if (ExecutionMode.BLOCKING.equals(execMode)) {
-            if (JobConfig.FIELD_DISCOVERY.equals(request.getName())) {
+            if (JobConfig.FIELD_DISCOVERY_DEPRECATED_V2410.equals(request.getName())) {
+                execAdnWaitFieldDiscoveryDoneDeprecatedV2410(request.getId(), request);
+                return BaseResultGenerator.successCreate(buildJobInfoOfCreated(request.getId()));
+            } else if (JobConfig.FIELD_DISCOVERY.equals(request.getName())) {
                 execAdnWaitFieldDiscoveryDone(request.getId(), request);
                 return BaseResultGenerator.successCreate(buildJobInfoOfCreated(request.getId()));
             } else if (JobConfig.DATAPATH_PACKET_COMBINE.equals(request.getName())) {
@@ -234,10 +252,22 @@
                 String.format(CommonErrorCode.BAD_REQUEST_PARAM_SYNTAX_EXCEPTION.getMessage(), "not Supported"));
     }
 
-    private void execAdnWaitFieldDiscoveryDone(String id, DSLQueryRequestParam request) {
-        QueryCache queryCacheStart = new QueryCache(id, JobConfig.FIELD_DISCOVERY);
+    private void execAdnWaitFieldDiscoveryDoneDeprecatedV2410(String id, DSLQueryRequestParam request) {
+        QueryCache queryCacheStart = new QueryCache(id, JobConfig.FIELD_DISCOVERY_DEPRECATED_V2410);
         validFieldDiscovery(request);
         HazelcastInstanceMapUtil.put(id, queryCacheStart);
+        Future<Boolean> booleanFuture = jobExecuteService.addExecutorFieldDiscoveryDeprecatedV2410(request);
+        while (true) {
+            if (!booleanFuture.isDone()) {
+                continue;
+            }
+            break;
+        }
+    }
+
+    private void execAdnWaitFieldDiscoveryDone(String id, DSLQueryRequestParam request) {
+        QueryCache queryCacheStart = new QueryCache(id, request.getName());
+        HazelcastInstanceMapUtil.put(id, queryCacheStart);
         Future<Boolean> booleanFuture = jobExecuteService.addExecutorFieldDiscovery(request);
         while (true) {
             if (!booleanFuture.isDone()) {
@@ -247,6 +277,7 @@
         }
     }
 
+
     private QueryCache buildQueryCacheOfDone(String id, long start, BaseResult baseResult) {
         QueryCache queryCache = new QueryCache(id, null);
         Map<String, Object> jobInfo = queryCache.getBaseResult().getJob();
@@ -269,7 +300,9 @@
         if (queryCache == null) {
             return BaseResultGenerator.success(Lists.newArrayList());
         }
-        if (JobConfig.FIELD_DISCOVERY.equals(queryCache.getType())) {
+        if (JobConfig.FIELD_DISCOVERY_DEPRECATED_V2410.equals(queryCache.getType())) {
+            queryCache = rebuildFieldDiscoveryQueryCache(queryCache);
+        } else if (JobConfig.FIELD_DISCOVERY.equals(queryCache.getType())) {
             queryCache = rebuildFieldDiscoveryQueryCache(queryCache);
         } else if (JobConfig.TRAFFIC_SPECTRUM_CLIENT_IP_CONNECT_APPLICATION_USAGE.equals(queryCache.getType())) {
             queryCache = rebuildTrafficSpectrumCIPConnectAppUsageQueryCache(queryCache);
@@ -345,7 +378,7 @@
             if (queryCache == null) {
                 continue;
             }
-            if (JobConfig.FIELD_DISCOVERY.equals(queryCache.getType())) {
+            if (JobConfig.FIELD_DISCOVERY_DEPRECATED_V2410.equals(queryCache.getType())) {
                 queryCache = rebuildFieldDiscoveryQueryCache(queryCache);
             }
             baseResult = queryCache.getBaseResult();
diff --git a/src/main/java/com/mesalab/qgw/service/impl/TroubleshootingServiceImp.java b/src/main/java/com/mesalab/qgw/service/impl/TroubleshootingServiceImp.java
index f2b7687b..c5e23600 100644
--- a/src/main/java/com/mesalab/qgw/service/impl/TroubleshootingServiceImp.java
+++ b/src/main/java/com/mesalab/qgw/service/impl/TroubleshootingServiceImp.java
@@ -272,7 +272,7 @@ public class TroubleshootingServiceImp implements TroubleshootingService, Enviro
         datasetList.forEach(record -> {
             Map<String, String> map = Maps.newHashMap();
             map.put(IDENTIFIER_NAME, String.valueOf(record.getColumns().get(IDENTIFIER_NAME)));
-            map.put(TEMPLATE, String.valueOf(datasetService.buildExecSQL(variables, String.valueOf(record.getColumns().get(TEMPLATE)))));
+            map.put(TEMPLATE, String.valueOf(datasetService.buildExecDSL(variables, String.valueOf(record.getColumns().get(TEMPLATE)))));
             if (record.getColumns().get(DATASET_TYPE).equals("sql")) {
                 sqlDatasetList.add(map);
             } else if (record.getColumns().get(DATASET_TYPE).equals("dsl")) {
@@ -346,7 +346,7 @@ public class TroubleshootingServiceImp implements TroubleshootingService, Enviro
             Map<String, String> map = Maps.newHashMap();
             map.put(IDENTIFIER_NAME, identifierName);
             map.put(TYPE, type);
-            map.put(TEMPLATE, String.valueOf(datasetService.buildExecSQL(variables, template)));
+            map.put(TEMPLATE, String.valueOf(datasetService.buildExecDSL(variables, template)));
             if (DBEngineType.DRUID.getValue().equalsIgnoreCase(backendEngine)) {
                 druidDatasetList.add(map);
             } else if (DBEngineType.CLICKHOUSE.getValue().equalsIgnoreCase(backendEngine)) {
diff --git a/src/main/java/com/mesalab/services/configuration/JobConfig.java b/src/main/java/com/mesalab/services/configuration/JobConfig.java
index ee9af771..9ce87bc7 100644
--- a/src/main/java/com/mesalab/services/configuration/JobConfig.java
+++ b/src/main/java/com/mesalab/services/configuration/JobConfig.java
@@ -55,7 +55,8 @@ public class JobConfig {
     public static final String FILTER = "filter";
     public static final String QUERY_DATA_SOURCE = "query.data_source";
     public static final String SAVED_QUERY = "saved_query";
-    public static final String FIELD_DISCOVERY = "field_discovery";
+    public static final String FIELD_DISCOVERY_DEPRECATED_V2410 = "field_discovery";
+    public static final String FIELD_DISCOVERY = "field-discovery";
     public static final String LOG_QUERY = "log-query";
     public static final String DATAPATH_PACKET_COMBINE = "datapath_telemetry_packet_combine";
     public static final String TRAFFIC_SPECTRUM_SUMMARY = "traffic-spectrum-summary";
diff --git a/src/main/java/com/mesalab/services/service/impl/JobExecuteService.java b/src/main/java/com/mesalab/services/service/impl/JobExecuteService.java
index cf0ae5d5..85f0a04f 100644
--- a/src/main/java/com/mesalab/services/service/impl/JobExecuteService.java
+++ b/src/main/java/com/mesalab/services/service/impl/JobExecuteService.java
@@ -14,14 +14,12 @@ import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
 import com.jayway.jsonpath.JsonPath;
 import com.mesalab.common.entity.BaseResult;
-import com.mesalab.common.entity.BaseResultGenerator;
 import com.mesalab.common.entity.DataTypeMapping;
 import com.mesalab.common.enums.DBEngineType;
 import com.mesalab.common.enums.QueryOption;
 import com.mesalab.common.enums.HttpStatusCodeEnum;
 import com.mesalab.common.exception.BusinessException;
 import com.mesalab.common.utils.HazelcastInstanceMapUtil;
-import com.mesalab.common.utils.sqlparser.AutoPeriodHelper;
 import com.mesalab.common.utils.sqlparser.SQLFunctionUtil;
 import com.mesalab.common.utils.sqlparser.SQLHelper;
 import com.mesalab.qgw.constant.DslIdentifierNameConst;
@@ -52,7 +50,6 @@ import org.springframework.scheduling.annotation.AsyncResult;
 import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
 import org.springframework.stereotype.Service;
 
-import java.io.Serializable;
 import java.math.RoundingMode;
 import java.text.DecimalFormat;
 import java.util.*;
@@ -79,6 +76,7 @@ public class JobExecuteService implements EnvironmentAware {
     private DatabaseService databaseService;
     private DSLService dslService;
     private PacketCombineDslService packetCombineDslService;
+    private FieldDiscoveryService fieldDiscoveryService;
     private TrafficSpectrumDslService trafficSpectrumDslService;
     private LogQueryService logQueryService;
     private EngineConfigSource engineConfigSource;
@@ -95,7 +93,7 @@
     }
 
     @Async("lightWeightThreadPool")
-    public Future<Boolean> addExecutorFieldDiscovery(DSLQueryRequestParam request) {
+    public Future<Boolean> addExecutorFieldDiscoveryDeprecatedV2410(DSLQueryRequestParam request) {
         try {
             markJobBegin(request.getId());
             JobInfo jobInfo = new JobInfo(request.getId());
@@ -118,6 +116,20 @@
         return new AsyncResult<>(true);
     }
 
+    @Async("lightWeightThreadPool")
+    public Future<Boolean> addExecutorFieldDiscovery(DSLQueryRequestParam request) {
+        try {
+            markJobBegin(request.getId());
+            fieldDiscoveryService.validate(request);
+            fieldDiscoveryService.asyncRun(request);
+        } catch (Exception e) {
+            markJobFailure(request.getId(), e.getMessage());
+        } finally {
+            markJobCompletion(request.getId());
+        }
+        return new AsyncResult<>(true);
+    }
+
     @Deprecated
     @Async("lightWeightThreadPool")
     public void addExecutorStatistics(String id, HashMap<String, Object> param, Map<String, Object> property) {
@@ -823,7 +835,7 @@
         return map;
     }
 
-    private static DecimalFormat getDecimalFormat() {
+    public static DecimalFormat getDecimalFormat() {
         DecimalFormat format = new DecimalFormat("#0.####");
         format.applyPattern("0.0000");
         format.setRoundingMode(RoundingMode.FLOOR);
@@ -995,4 +1007,9 @@
     public void setLogQueryService(LogQueryService logQueryService) {
         this.logQueryService = logQueryService;
     }
+
+    @Autowired
+    public void setFieldDiscoveryService(FieldDiscoveryService fieldDiscoveryService) {
+        this.fieldDiscoveryService = fieldDiscoveryService;
+    }
 }
diff --git a/src/main/java/com/mesalab/services/service/impl/JobServiceImpl.java b/src/main/java/com/mesalab/services/service/impl/JobServiceImpl.java
index 8f45d0f1..4470ce68 100644
--- a/src/main/java/com/mesalab/services/service/impl/JobServiceImpl.java
+++ b/src/main/java/com/mesalab/services/service/impl/JobServiceImpl.java
@@ -110,7 +110,7 @@ public class JobServiceImpl implements JobService, EnvironmentAware {
         }
         log.info("Add Hoc Job, ID is {}, params is {}", id, JSON.toJSONString(reqBody));
         Map<String, Object> property = Maps.newLinkedHashMap();
-        if (JobConfig.FIELD_DISCOVERY.equals(queryType)) {
+        if (JobConfig.FIELD_DISCOVERY_DEPRECATED_V2410.equals(queryType)) {
             initFieldDiscoveryJob(id, reqBody, property);
             //jobExecuteService.addExecutorFieldDiscovery(id, reqBody);
         } else if (JobConfig.STATISTICS.equals(queryType)) {
@@ -394,7 +394,7 @@
     }
 
     private void initFieldDiscoveryJob(String id, HashMap<String, Object> body, Map<String, Object> property) {
-        property.put(JobConfig.JOB_PROPERTY_TYPE, JobConfig.FIELD_DISCOVERY);
+        property.put(JobConfig.JOB_PROPERTY_TYPE, JobConfig.FIELD_DISCOVERY_DEPRECATED_V2410);
         List<String> fields = (List<String>) body.get(JobConfig.KEY_CUSTOM_FIELD_DISCOVERY_FIELDS);
         List<String> interim = Lists.newArrayList(fields);
         BaseResult baseResult = sqlSyncQueryService.executeQuery(SQLQueryContext.builder().originalSQL(env.getProperty("JOB_GET_FIELD_DISCOVERY_ALL_COLUMN")).option(QueryOption.REAL_TIME.getValue()).build());
@@ -407,7 +407,7 @@
         meta.forEach(m -> metaName.add(m.get("name").toString()));
         interim.removeAll(metaName);
         for (String field : interim) {
-            execute(String.format(Objects.requireNonNull(env.getProperty("JOB_ADD_COLUMN")), JobConfig.FIELD_DISCOVERY, field));
+            execute(String.format(Objects.requireNonNull(env.getProperty("JOB_ADD_COLUMN")), JobConfig.FIELD_DISCOVERY_DEPRECATED_V2410, field));
        }
         property.put(JobConfig.JOB_PROPERTY_K_RESULT_FIELD, String.join(",", fields));
         execute(String.format(Objects.requireNonNull(env.getProperty("JOB_INIT")), id, false, 0, false, false, System.currentTimeMillis(), JSON.toJSONString(property), 0, 0));
@@ -599,7 +599,7 @@
     private void validateAdHocQuery(HashMap<String, Object> body) {
         Object type = body.get(JobConfig.KEY_QUERY_TYPE);
-        if (JobConfig.FIELD_DISCOVERY.equals(type)) {
+        if (JobConfig.FIELD_DISCOVERY_DEPRECATED_V2410.equals(type)) {
             //validCustomParamsOfFieldDiscovery(body);
             //List<String> fields = (List<String>) body.get(JobConfig.KEY_CUSTOM_FIELD_DISCOVERY_FIELDS);
             //body.put(JobConfig.KEY_CUSTOM_FIELD_DISCOVERY_FIELDS, fields.stream().distinct().collect(Collectors.toList()));
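
Net effect on job routing: the literal "field_discovery" now selects the deprecated code path, while the new JobConfig.FIELD_DISCOVERY constant maps to "field-discovery" and dispatches to FieldDiscoveryService. A small illustrative sketch of that branch order (constants and executor names are taken from the diff; the return values are just labels, and the surrounding service plumbing is omitted):

import com.mesalab.services.configuration.JobConfig;

public class FieldDiscoveryRoutingSketch {
    // Mirrors the branch order added to QueryJobServiceImpl in this commit.
    static String route(String jobName) {
        if (JobConfig.FIELD_DISCOVERY_DEPRECATED_V2410.equals(jobName)) {
            return "addExecutorFieldDiscoveryDeprecatedV2410"; // legacy "field_discovery"
        } else if (JobConfig.FIELD_DISCOVERY.equals(jobName)) {
            return "addExecutorFieldDiscovery";                // new "field-discovery"
        }
        return "other job types";
    }

    public static void main(String[] args) {
        System.out.println(route("field_discovery"));  // deprecated path, kept for compatibility
        System.out.println(route("field-discovery"));  // path introduced by this commit
    }
}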
