summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorwangwei <[email protected]>2024-09-20 19:06:07 +0800
committerwangwei <[email protected]>2024-09-20 19:06:07 +0800
commita576ea7edd47a4720b6641af1279bbf5c8aff4eb (patch)
treef73b8760ee24c7b1d32a987f7cb8ff1af191f31b
parent16f7ce1b76e8f87bd370a94e2af67ca77ccc18e8 (diff)
[Feature][log-query] 支持日志DSL查询功能(基础功能版)
-rw-r--r--config/flyway/tsg/R__init_datasets.sql1
-rw-r--r--src/main/java/com/mesalab/qgw/controller/QueryController.java37
-rw-r--r--src/main/java/com/mesalab/qgw/model/basic/QueryCache.java31
-rw-r--r--src/main/java/com/mesalab/qgw/service/LogQueryService.java21
-rw-r--r--src/main/java/com/mesalab/qgw/service/QueryJobService.java16
-rw-r--r--src/main/java/com/mesalab/qgw/service/impl/LogQueryServiceImpl.java173
-rw-r--r--src/main/java/com/mesalab/qgw/service/impl/QueryJobServiceImpl.java68
-rw-r--r--src/main/java/com/mesalab/services/configuration/JobConfig.java4
-rw-r--r--src/main/java/com/mesalab/services/service/impl/JobExecuteService.java25
-rw-r--r--src/main/resources/dsl-sql-template.sql6
10 files changed, 362 insertions, 20 deletions
diff --git a/config/flyway/tsg/R__init_datasets.sql b/config/flyway/tsg/R__init_datasets.sql
index c9170853..e9659b1f 100644
--- a/config/flyway/tsg/R__init_datasets.sql
+++ b/config/flyway/tsg/R__init_datasets.sql
@@ -156,6 +156,7 @@ INSERT INTO `dataset` (`identifier_name`, `category`, `backend_engine`, `type`,
INSERT INTO `dataset` (`identifier_name`, `category`, `backend_engine`, `type`, `template`, `description`) VALUES ('datapath-telemetry-record-count', 'datapath_telemetry_record', 'qgw', 'sql', '{ "statement": "SELECT count(1) as count FROM datapath_telemetry_record WHERE recv_time >= UNIX_TIMESTAMP(''${start_time}'') AND recv_time < UNIX_TIMESTAMP(''${end_time}'') AND vsys_id in(${vsys_id}) AND ( ${filter})" }',null);
INSERT INTO `dataset` (`identifier_name`, `category`, `backend_engine`, `type`, `template`, `description`) VALUES ('datapath-telemetry-record-list', 'datapath_telemetry_record', 'qgw', 'sql', '{ "statement": "SELECT ${columns} FROM datapath_telemetry_record WHERE recv_time >= UNIX_TIMESTAMP(''${start_time}'') AND recv_time < UNIX_TIMESTAMP(''${end_time}'') AND vsys_id IN(${vsys_id}) AND (${filter}) ORDER BY timestamp_us ASC LIMIT ${limit}" }',null);
INSERT INTO `dataset` (`identifier_name`, `category`, `backend_engine`, `type`, `template`, `description`) VALUES ('datapath-telemetry-packet-combine', 'datapath_telemetry_record', 'qgw', 'dsl', '{"id":"${job_id}","name":"datapath_telemetry_packet_combine","data_source":"datapath_telemetry_record","filter":"job_id=''${job_id}'' AND vsys_id in (${vsys_id}) AND (${filter})"}',null);
+INSERT INTO `dataset` (`identifier_name`, `category`, `backend_engine`, `type`, `template`, `description`) VALUES ('log-query', 'statistics', 'qgw', 'dsl', '{"name": "log-query", "data_source": "${source}", "filter": "vsys_id in (${vsys_id}) AND (${filter})", "intervals": ["${start_time}/${end_time}" ] }',null);
INSERT INTO `dataset` (`identifier_name`, `category`, `backend_engine`, `type`, `template`, `description`) VALUES ('field-discovery', 'statistics', 'qgw', 'dsl', '{ "name": "field_discovery", "data_source": "${source}", "filter": "vsys_id in (${vsys_id}) AND (${filter})", "custom.field_discovery.metric": "${metric}", "custom.field_discovery.metric.fn": "${fn}", "custom.field_discovery.fields": ["${field_list}"] }',null);
INSERT INTO `dataset` (`identifier_name`, `category`, `backend_engine`, `type`, `template`, `description`) VALUES ('field-statistics-top-values', 'statistics', 'qgw', 'sql', '{ "statement":"SELECT ${column_name}, count(*) as cnt FROM ${source} where recv_time >= UNIX_TIMESTAMP(''${start_time}'') AND recv_time < UNIX_TIMESTAMP(''${end_time}'') AND ${filter} GROUP BY ${column_name} ORDER BY cnt DESC LIMIT ${limit}" }',null);
INSERT INTO `dataset` (`identifier_name`, `category`, `backend_engine`, `type`, `template`, `description`) VALUES ('field-statistics-rare-values', 'statistics', 'qgw', 'sql', '{ "statement":"SELECT ${column_name}, count(*) as cnt FROM ${source} where recv_time >= UNIX_TIMESTAMP(''${start_time}'') AND recv_time < UNIX_TIMESTAMP(''${end_time}'') AND ${filter} GROUP BY ${column_name} ORDER BY cnt ASC LIMIT ${limit}" }',null);
diff --git a/src/main/java/com/mesalab/qgw/controller/QueryController.java b/src/main/java/com/mesalab/qgw/controller/QueryController.java
index be534094..4448cf83 100644
--- a/src/main/java/com/mesalab/qgw/controller/QueryController.java
+++ b/src/main/java/com/mesalab/qgw/controller/QueryController.java
@@ -115,6 +115,43 @@ public class QueryController {
return deferredResult;
}
+ @GetMapping(value = "/v1/query/job/{id}/timeline", consumes = "application/x-www-form-urlencoded")
+ @AuditLog("QueryController.getJobTimelineById")
+ public BaseResult getJobTimelineById(@PathVariable String id, @RequestParam(value = "is_saved_query", required = false, defaultValue = "0") Integer isSavedQuery) {
+ if (BooleanUtil.toBoolean(String.valueOf(isSavedQuery))) {
+ throw new QGWBusinessException(HttpStatusCodeEnum.BAD_REQUEST.getCode(), CommonErrorCode.BAD_REQUEST_PARAM_SYNTAX_EXCEPTION.getCode(),
+ String.format(CommonErrorCode.BAD_REQUEST_PARAM_SYNTAX_EXCEPTION.getMessage(), "Currently, the DSL Saved Query is not supported."));
+ } else {
+ return queryJobService.getAdHocQueryTimelineById(id);
+ }
+ }
+
+ @GetMapping(value = "/v1/query/job/{id}/count", consumes = "application/x-www-form-urlencoded")
+ @AuditLog("QueryController.getJobTimelineById")
+ public BaseResult getJobCountById(@PathVariable String id, @RequestParam(value = "is_saved_query", required = false, defaultValue = "0") Integer isSavedQuery) {
+ if (BooleanUtil.toBoolean(String.valueOf(isSavedQuery))) {
+ throw new QGWBusinessException(HttpStatusCodeEnum.BAD_REQUEST.getCode(), CommonErrorCode.BAD_REQUEST_PARAM_SYNTAX_EXCEPTION.getCode(),
+ String.format(CommonErrorCode.BAD_REQUEST_PARAM_SYNTAX_EXCEPTION.getMessage(), "Currently, the DSL Saved Query is not supported."));
+ } else {
+ return queryJobService.getAdHocQueryCountById(id);
+ }
+ }
+
+ @GetMapping(value = "/v1/query/job/{id}/list", consumes = "application/x-www-form-urlencoded")
+ @AuditLog("QueryController.getJobTimelineById")
+ public BaseResult getJobListById(@PathVariable String id
+ , @RequestParam(value = "is_saved_query", required = false, defaultValue = "0") Integer isSavedQuery
+ , @RequestParam(value = "fields", required = false, defaultValue = "*") String fields
+ , @RequestParam(value = "limit", required = false, defaultValue = "10") Integer limit
+ , @RequestParam(value = "offset", required = false, defaultValue = "0") Integer offset) {
+ if (BooleanUtil.toBoolean(String.valueOf(isSavedQuery))) {
+ throw new QGWBusinessException(HttpStatusCodeEnum.BAD_REQUEST.getCode(), CommonErrorCode.BAD_REQUEST_PARAM_SYNTAX_EXCEPTION.getCode(),
+ String.format(CommonErrorCode.BAD_REQUEST_PARAM_SYNTAX_EXCEPTION.getMessage(), "Currently, the DSL Saved Query is not supported."));
+ } else {
+ return queryJobService.getAdHocQueryListById(id, fields, limit, offset);
+ }
+ }
+
@GetMapping(value = "/v1/query/job/{id}", consumes = "application/x-www-form-urlencoded")
@AuditLog("QueryController.getJobStatusById")
public DeferredResult<BaseResult> getJobStatusById(@PathVariable String id, @RequestParam(value = "is_saved_query", required = false, defaultValue = "0") Integer isSavedQuery) {
diff --git a/src/main/java/com/mesalab/qgw/model/basic/QueryCache.java b/src/main/java/com/mesalab/qgw/model/basic/QueryCache.java
index aed03fd1..4ebab8e7 100644
--- a/src/main/java/com/mesalab/qgw/model/basic/QueryCache.java
+++ b/src/main/java/com/mesalab/qgw/model/basic/QueryCache.java
@@ -6,6 +6,7 @@ import com.mesalab.services.configuration.JobConfig;
import lombok.Data;
import java.io.Serializable;
+import java.util.LinkedHashMap;
import java.util.Map;
/**
@@ -18,10 +19,18 @@ import java.util.Map;
@Data
public class QueryCache implements Serializable {
private String type;
+ private Object requestParam;
private Long latestQueryTimeMs = System.currentTimeMillis();
private BaseResult<Object> baseResult;
- public QueryCache(String jobId) {
+
+ public QueryCache(String jobId, String type) {
this.baseResult = new BaseResult<>();
+ this.type = type;
+ Map<String, Object> job = createJobMap(jobId);
+ this.baseResult.setJob(job);
+ }
+
+ private Map<String, Object> createJobMap(String jobId) {
Map<String, Object> job = Maps.newLinkedHashMap();
job.put(JobConfig.JOB_ID, jobId);
job.put(JobConfig.IS_DONE, false);
@@ -31,10 +40,22 @@ public class QueryCache implements Serializable {
job.put(JobConfig.REASON, null);
job.put(JobConfig.START_TIME, null);
job.put(JobConfig.END_TIME, null);
- Map<String, Object> links = Maps.newLinkedHashMap();
- links.put(JobConfig.LINKS_STATUS, "/v1/query/job/" + jobId);
- links.put(JobConfig.LINKS_RESULT, "/v1/query/job/" + jobId + "/result");
+ Map<String, Object> links = createLinksMap(jobId);
job.put(JobConfig.LINKS, links);
- this.baseResult.setJob(job);
+ return job;
+ }
+
+ private Map<String, Object> createLinksMap(String jobId) {
+ Map<String, Object> links = new LinkedHashMap<>();
+ links.put(JobConfig.LINKS_STATUS, "/v1/query/job/" + jobId);
+ if (JobConfig.LOG_QUERY.equals(type)) {
+ links.put(JobConfig.LINKS_COUNT, "/v1/query/job/" + jobId + "/count");
+ links.put(JobConfig.LINKS_LIST, "/v1/query/job/" + jobId + "/list");
+ links.put(JobConfig.LINKS_TIMELINE, "/v1/query/job/" + jobId + "/timeline");
+ } else {
+ links.put(JobConfig.LINKS_RESULT, "/v1/query/job/" + jobId + "/result");
+ }
+
+ return links;
}
}
diff --git a/src/main/java/com/mesalab/qgw/service/LogQueryService.java b/src/main/java/com/mesalab/qgw/service/LogQueryService.java
new file mode 100644
index 00000000..ac2a0dae
--- /dev/null
+++ b/src/main/java/com/mesalab/qgw/service/LogQueryService.java
@@ -0,0 +1,21 @@
+package com.mesalab.qgw.service;
+
+import com.mesalab.common.entity.BaseResult;
+import com.mesalab.qgw.model.basic.DSLQueryRequestParam;
+import com.mesalab.qgw.model.basic.QueryCache;
+
+/**
+ * TODO
+ *
+ * @Classname LogQueryService
+ * @Date 2024/9/18 11:38
+ * @Author wWei
+ */
+public interface LogQueryService {
+
+ BaseResult run(DSLQueryRequestParam param);
+
+ BaseResult getQueryListByQueryContext(QueryCache queryCache, String fields, int limit, int offset);
+ BaseResult getQueryCountByQueryContext(QueryCache queryCache);
+ BaseResult getQueryTimelineByQueryContext(QueryCache queryCache);
+}
diff --git a/src/main/java/com/mesalab/qgw/service/QueryJobService.java b/src/main/java/com/mesalab/qgw/service/QueryJobService.java
index ecb99f61..a991b043 100644
--- a/src/main/java/com/mesalab/qgw/service/QueryJobService.java
+++ b/src/main/java/com/mesalab/qgw/service/QueryJobService.java
@@ -7,7 +7,6 @@ import com.mesalab.qgw.model.basic.SqlQueryRequestParam;
import java.util.List;
/**
- *
* @Date 2023/12/6 18:09
* @Author wWei
*/
@@ -16,6 +15,7 @@ public interface QueryJobService {
/**
* Create an SQL saved query and return the query job id. It will storage the query job and results in the database.
* The query job will be executed in the background through the SAVED-QUERY-SCHEDULER.
+ *
* @param sqlQueryRequestParam
* @return {@link BaseResult}
* @created by wWei
@@ -26,9 +26,10 @@ public interface QueryJobService {
/**
* Create an SQL ad-hoc query and return the query job id. The query will be executed immediately.
* It Support three modes of execution: NORMAL, BLOCKING, and ONESHOT.
- * NORMAL: The query will be executed in the asynchronous thread pool.
- * BLOCKING: The query will return the jobID when the query is completed (Sync Mode).
- * ONESHOT: The query will return the results in the same call (Sync Mode). Does not return the job ID.
+ * NORMAL: The query will be executed in the asynchronous thread pool.
+ * BLOCKING: The query will return the jobID when the query is completed (Sync Mode).
+ * ONESHOT: The query will return the results in the same call (Sync Mode). Does not return the job ID.
+ *
* @param sqlQueryRequestParam
* @return {@link BaseResult}
* @created by wWei
@@ -39,6 +40,7 @@ public interface QueryJobService {
/**
* Create a DSL ad-hoc query and return the query job id. The query will be executed immediately.
* It Support three modes of execution: NORMAL, BLOCKING, and ONESHOT[Recommend].
+ *
* @param dslQueryRequestParam
* @return {@link BaseResult}
* @return
@@ -47,6 +49,12 @@ public interface QueryJobService {
BaseResult getAdHocQueryResultById(String id);
+ BaseResult getAdHocQueryTimelineById(String id);
+
+ BaseResult getAdHocQueryCountById(String id);
+
+ BaseResult getAdHocQueryListById(String id, String fields, int limit, int offset);
+
BaseResult getSavedQueryResultById(String id);
BaseResult getAdHocQueryStatusById(String id);
diff --git a/src/main/java/com/mesalab/qgw/service/impl/LogQueryServiceImpl.java b/src/main/java/com/mesalab/qgw/service/impl/LogQueryServiceImpl.java
new file mode 100644
index 00000000..8e7a804e
--- /dev/null
+++ b/src/main/java/com/mesalab/qgw/service/impl/LogQueryServiceImpl.java
@@ -0,0 +1,173 @@
+package com.mesalab.qgw.service.impl;
+
+import cn.hutool.core.bean.BeanUtil;
+import cn.hutool.core.date.DatePattern;
+import cn.hutool.core.date.DateTime;
+import cn.hutool.core.date.DateUtil;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import com.jayway.jsonpath.JsonPath;
+import com.mesalab.common.entity.BaseResult;
+import com.mesalab.common.entity.BaseResultGenerator;
+import com.mesalab.common.enums.OutputMode;
+import com.mesalab.qgw.model.basic.DSLQueryContext;
+import com.mesalab.qgw.model.basic.DSLQueryRequestParam;
+import com.mesalab.qgw.model.basic.QueryCache;
+import com.mesalab.qgw.model.basic.SQLQueryContext;
+import com.mesalab.qgw.service.DatabaseService;
+import com.mesalab.qgw.service.LogQueryService;
+import com.mesalab.qgw.service.SQLSyncQueryService;
+import org.joda.time.Period;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.core.env.Environment;
+import org.springframework.stereotype.Service;
+
+import java.util.*;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.stream.Collectors;
+
+/**
+ * TODO
+ *
+ * @Classname LogQueryServiceImpl
+ * @Date 2024/9/18 11:41
+ * @Author wWei
+ */
+@Service("logQueryService")
+public class LogQueryServiceImpl implements LogQueryService {
+ private static final String LOG_DEFAULT_LOGICAL_TYPE = "unix_timestamp";
+ private DatabaseService databaseService;
+ private Environment environment;
+ private SQLSyncQueryService sqlSyncQueryService;
+
+ private static final Map<String, String> TIMELINE_SOURCE_TYPE_MAP = new HashMap<>();
+
+ static {
+ TIMELINE_SOURCE_TYPE_MAP.put("session_record", ", decoded_as AS type");
+ TIMELINE_SOURCE_TYPE_MAP.put("voip_record", ", decoded_as AS type");
+ TIMELINE_SOURCE_TYPE_MAP.put("security_event", ", security_action AS type");
+ TIMELINE_SOURCE_TYPE_MAP.put("proxy_event", ", proxy_action AS type");
+ TIMELINE_SOURCE_TYPE_MAP.put("dos_event", ", attack_type AS type");
+ }
+
+ @Override
+ public BaseResult run(DSLQueryRequestParam param) {
+ String partitionKey = databaseService.getPartitionKey(param.getDataSource());
+ LinkedHashMap<String, Object> schemaByName = databaseService.getSchemaByName(param.getDataSource());
+ List<String> partitionKeyLogicalTypes = JsonPath.read(schemaByName, "$.fields[?(@.name == \"" + partitionKey + "\")].type.logicalType");
+ String partitionKeyLogicalType = partitionKeyLogicalTypes.isEmpty() ? LOG_DEFAULT_LOGICAL_TYPE : partitionKeyLogicalTypes.get(0);
+ DSLQueryContext dslQueryContext = BeanUtil.copyProperties(param, DSLQueryContext.class);
+ String sqlTemplate = dslQueryContext.toSql(
+ environment.getProperty("LOG_QUERY_TIMELINE")
+ , param.getDataSource()
+ , databaseService.getPartitionKey(param.getDataSource())
+ , partitionKeyLogicalType);
+ String timelineType = TIMELINE_SOURCE_TYPE_MAP.getOrDefault(param.getDataSource(), "");
+ String sql = String.format(sqlTemplate, partitionKey, timelineType, timelineType.isEmpty() ? "" : ", type");
+ return sqlSyncQueryService.executeQuery(SQLQueryContext.builder().originalSQL(sql).build());
+ }
+
+ @Override
+ public BaseResult getQueryListByQueryContext(QueryCache queryCache, String fields, int limit, int offset) {
+ BaseResult<Object> baseResult = queryCache.getBaseResult();
+ Object dataObj = baseResult.getData();
+ if (dataObj == null || ((List) dataObj).isEmpty()) {
+ return BaseResultGenerator.success(null, null, OutputMode.JSON.getValue(), null, Lists.newArrayList());
+ }
+ List<Map<String, Object>> dataList = (List<Map<String, Object>>) dataObj;
+ dataList = dataList.stream()
+ .sorted((map1, map2) -> {
+ DateTime time1 = DateUtil.parse(String.valueOf(map1.get("stat_time")));
+ DateTime time2 = DateUtil.parse(String.valueOf(map2.get("stat_time")));
+ return time2.compareTo(time1); // 降序排序
+ })
+ .collect(Collectors.toList());
+ AtomicLong totalCount = new AtomicLong();
+ DateTime cursorStartTime = null;
+ DateTime cursorEndTime = null;
+ for (Map<String, Object> item : dataList) {
+ long itemCount = Long.parseLong(String.valueOf(item.getOrDefault("count", 0)));
+ DateTime itemTime = DateUtil.parse(String.valueOf(item.get("stat_time")));
+ if (itemCount > 0 && totalCount.get() == 0) {
+ cursorEndTime = itemTime;
+ }
+ totalCount.addAndGet(itemCount);
+ cursorStartTime = itemTime;
+ if (totalCount.get() >= (limit + offset)) {
+ break;
+ }
+ }
+ DSLQueryRequestParam requestParam = BeanUtil.copyProperties(queryCache.getRequestParam(), DSLQueryRequestParam.class);
+ String granularity = requestParam.getGranularity();
+ DSLQueryContext dslQueryContextTemp = BeanUtil.copyProperties(requestParam, DSLQueryContext.class);
+ List<String> intervals = dslQueryContextTemp.getIntervals();
+ String[] split = intervals.get(0).split("/");
+ DateTime start = DateUtil.parse(split[0]);
+ DateTime end = DateUtil.parse(split[1]);
+
+ int seconds = Period.parse(granularity).toStandardSeconds().getSeconds();
+ DateTime dateTime = DateUtil.offsetSecond(cursorEndTime, seconds);
+ if (DateUtil.compare(end, dateTime) > 0) {
+ cursorEndTime = dateTime;
+ } else {
+ cursorEndTime = end;
+ }
+
+ if (DateUtil.compare(start, cursorStartTime) > 0) {
+ cursorStartTime = start;
+ }
+ String startFormat = DateUtil.format(cursorStartTime, DatePattern.UTC_PATTERN);
+ String endFormat = DateUtil.format(cursorEndTime, DatePattern.UTC_PATTERN);
+ String dataSource = requestParam.getDataSource();
+ String partitionKey = databaseService.getPartitionKey(requestParam.getDataSource());
+ List<String> list = Lists.newArrayList(startFormat + "/" + endFormat);
+ dslQueryContextTemp.setIntervals(list);
+ dslQueryContextTemp.setLimit(offset + "," + limit);
+
+ String sqlTemplate = environment.getProperty("LOG_QUERY_LIST");
+ LinkedHashMap<String, Object> schemaByName = databaseService.getSchemaByName(requestParam.getDataSource());
+ List<String> partitionKeyLogicalTypes = JsonPath.read(schemaByName, "$.fields[?(@.name == \"" + partitionKey + "\")].type.logicalType");
+ String partitionKeyLogicalType = partitionKeyLogicalTypes.isEmpty() ? null : partitionKeyLogicalTypes.get(0);
+
+ String sqlTemplate2 = dslQueryContextTemp.toSql(sqlTemplate, dataSource, partitionKey, partitionKeyLogicalType);
+ String sql = String.format(sqlTemplate2, fields, partitionKey + " DESC ");
+ return sqlSyncQueryService.executeQuery(SQLQueryContext.builder().originalSQL(sql).build());
+ }
+
+ @Override
+ public BaseResult getQueryCountByQueryContext(QueryCache queryCache) {
+ BaseResult<Object> baseResult = queryCache.getBaseResult();
+ Object data = baseResult.getData();
+
+ long totalCount = (data == null || ((List<?>) data).isEmpty()) ? 0 :
+ ((List<Map<String, Object>>) data).stream()
+ .mapToLong(map -> Long.parseLong(String.valueOf(map.getOrDefault("count", 0))))
+ .sum();
+ List<Map<String, Object>> result = Lists.newArrayList();
+ Map<String, Object> map = Maps.newHashMap();
+ map.put("count", totalCount);
+ result.add(map);
+ return BaseResultGenerator.success(null, null, OutputMode.JSON.getValue(), null, result);
+ }
+
+ @Override
+ public BaseResult getQueryTimelineByQueryContext(QueryCache queryCache) {
+ BaseResult<Object> baseResult = queryCache.getBaseResult();
+ return BaseResultGenerator.success(baseResult.getStatistics(), null, baseResult.getOutputMode(), baseResult.getMeta(), baseResult.getData());
+ }
+
+ @Autowired
+ public void setDatabaseService(DatabaseService databaseService) {
+ this.databaseService = databaseService;
+ }
+
+ @Autowired
+ public void setEnvironment(Environment environment) {
+ this.environment = environment;
+ }
+
+ @Autowired
+ public void setSQLSyncQueryService(SQLSyncQueryService sqlSyncQueryService) {
+ this.sqlSyncQueryService = sqlSyncQueryService;
+ }
+}
diff --git a/src/main/java/com/mesalab/qgw/service/impl/QueryJobServiceImpl.java b/src/main/java/com/mesalab/qgw/service/impl/QueryJobServiceImpl.java
index e69e7496..e495cf34 100644
--- a/src/main/java/com/mesalab/qgw/service/impl/QueryJobServiceImpl.java
+++ b/src/main/java/com/mesalab/qgw/service/impl/QueryJobServiceImpl.java
@@ -54,10 +54,12 @@ public class QueryJobServiceImpl implements QueryJobService, EnvironmentAware {
private SQLSyncQueryService sqlSyncQueryService;
private JobService jobService;
private JobExecuteService jobExecuteService;
+ private DatabaseService databaseService;
private DSLService dslService;
private TrafficSpectrumDslService trafficSpectrumDslService;
private EngineConfigSource engineConfigSource;
private JobConfig jobConfig;
+ private LogQueryService logQueryService;
@Override
@@ -85,7 +87,7 @@ public class QueryJobServiceImpl implements QueryJobService, EnvironmentAware {
.format(sqlQueryRequestParam.getOutputMode().getValue())
.build());
} else if (ExecutionMode.NORMAL.equals(execMode)) {
- HazelcastInstanceMapUtil.put(sqlQueryRequestParam.getId(), new QueryCache(sqlQueryRequestParam.getId()));
+ HazelcastInstanceMapUtil.put(sqlQueryRequestParam.getId(), new QueryCache(sqlQueryRequestParam.getId(), null));
jobExecuteService.addExecutorSql(sqlQueryRequestParam);
return BaseResultGenerator.successCreate(buildJobInfoOfCreated(sqlQueryRequestParam.getId()));
} else if (ExecutionMode.BLOCKING.equals(execMode)) {
@@ -124,8 +126,7 @@ public class QueryJobServiceImpl implements QueryJobService, EnvironmentAware {
|| JobConfig.TRAFFIC_SPECTRUM_APP_DISTRIBUTION.equals(request.getName())
|| JobConfig.TRAFFIC_SPECTRUM_NETWORK_THROUGHPUT_TREND.equals(request.getName())
|| JobConfig.TRAFFIC_SPECTRUM_CLIENT_IP_CONNECT_APPLICATION_USAGE.equals(request.getName())) {
- QueryCache queryCache = new QueryCache(request.getId());
- queryCache.setType(request.getName());
+ QueryCache queryCache = new QueryCache(request.getId(), request.getName());
HazelcastInstanceMapUtil.put(request.getId(), queryCache);
CountDownLatch countDownLatch = new CountDownLatch(1);
jobExecuteService.addDslExecutorTrafficSpectrumWithCache(request, countDownLatch);
@@ -154,9 +155,8 @@ public class QueryJobServiceImpl implements QueryJobService, EnvironmentAware {
return dslService.execDsl(dslQueryContext, request.isDryRun());
}
} else if (ExecutionMode.NORMAL.equals(execMode)) {
- QueryCache queryCache = new QueryCache(request.getId());
+ QueryCache queryCache = new QueryCache(request.getId(), request.getName());
if (JobConfig.FIELD_DISCOVERY.equals(request.getName())) {
- queryCache.setType(JobConfig.FIELD_DISCOVERY);
validFieldDiscovery(request);
HazelcastInstanceMapUtil.put(request.getId(), queryCache);
jobExecuteService.addExecutorFieldDiscovery(request);
@@ -174,6 +174,11 @@ public class QueryJobServiceImpl implements QueryJobService, EnvironmentAware {
} else if (JobConfig.CUSTOMIZED_STATISTICS.equals(request.getName())) {
HazelcastInstanceMapUtil.put(request.getId(), queryCache);
jobExecuteService.addDslExecutorCustomizedStatisticsWithCache(request, null);
+ } else if (JobConfig.LOG_QUERY.equals(request.getName())) {
+ queryCache.setType(JobConfig.LOG_QUERY);
+ queryCache.setRequestParam(request);
+ HazelcastInstanceMapUtil.put(request.getId(), queryCache);
+ jobExecuteService.addDslExecutorLogQueryWithCache(request, null);
} else {
HazelcastInstanceMapUtil.put(request.getId(), queryCache);
jobExecuteService.addExecutorDsl(request);
@@ -184,7 +189,7 @@ public class QueryJobServiceImpl implements QueryJobService, EnvironmentAware {
execAdnWaitFieldDiscoveryDone(request.getId(), request);
return BaseResultGenerator.successCreate(buildJobInfoOfCreated(request.getId()));
} else if (JobConfig.DATAPATH_PACKET_COMBINE.equals(request.getName())) {
- QueryCache queryCache = new QueryCache(request.getId());
+ QueryCache queryCache = new QueryCache(request.getId(), null);
HazelcastInstanceMapUtil.put(request.getId(), queryCache);
CountDownLatch countDownLatch = new CountDownLatch(1);
jobExecuteService.addExecutorDslPacketCombineWithCache(request, countDownLatch);
@@ -203,8 +208,7 @@ public class QueryJobServiceImpl implements QueryJobService, EnvironmentAware {
|| JobConfig.TRAFFIC_SPECTRUM_APP_DISTRIBUTION.equals(request.getName())
|| JobConfig.TRAFFIC_SPECTRUM_NETWORK_THROUGHPUT_TREND.equals(request.getName())
|| JobConfig.TRAFFIC_SPECTRUM_CLIENT_IP_CONNECT_APPLICATION_USAGE.equals(request.getName())) {
- QueryCache queryCache = new QueryCache(request.getId());
- queryCache.setType(request.getName());
+ QueryCache queryCache = new QueryCache(request.getId(), request.getName());
HazelcastInstanceMapUtil.put(request.getId(), queryCache);
CountDownLatch countDownLatch = new CountDownLatch(1);
jobExecuteService.addDslExecutorTrafficSpectrumWithCache(request, countDownLatch);
@@ -231,8 +235,7 @@ public class QueryJobServiceImpl implements QueryJobService, EnvironmentAware {
}
private void execAdnWaitFieldDiscoveryDone(String id, DSLQueryRequestParam request) {
- QueryCache queryCacheStart = new QueryCache(id);
- queryCacheStart.setType(JobConfig.FIELD_DISCOVERY);
+ QueryCache queryCacheStart = new QueryCache(id, JobConfig.FIELD_DISCOVERY);
validFieldDiscovery(request);
HazelcastInstanceMapUtil.put(id, queryCacheStart);
Future<Boolean> booleanFuture = jobExecuteService.addExecutorFieldDiscovery(request);
@@ -245,7 +248,7 @@ public class QueryJobServiceImpl implements QueryJobService, EnvironmentAware {
}
private QueryCache buildQueryCacheOfDone(String id, long start, BaseResult baseResult) {
- QueryCache queryCache = new QueryCache(id);
+ QueryCache queryCache = new QueryCache(id, null);
Map<String, Object> jobInfo = queryCache.getBaseResult().getJob();
jobInfo.put(JobConfig.IS_DONE, true);
jobInfo.put(JobConfig.DONE_PROGRESS, 1);
@@ -270,12 +273,45 @@ public class QueryJobServiceImpl implements QueryJobService, EnvironmentAware {
queryCache = rebuildFieldDiscoveryQueryCache(queryCache);
} else if (JobConfig.TRAFFIC_SPECTRUM_CLIENT_IP_CONNECT_APPLICATION_USAGE.equals(queryCache.getType())) {
queryCache = rebuildTrafficSpectrumCIPConnectAppUsageQueryCache(queryCache);
+ } else if (JobConfig.LOG_QUERY.equals(queryCache.getType())) {
+ throw new QGWBusinessException(HttpStatusCodeEnum.BAD_REQUEST.getCode(), CommonErrorCode.BAD_REQUEST_PARAM_SYNTAX_EXCEPTION.getCode(),
+ String.format(CommonErrorCode.BAD_REQUEST_PARAM_SYNTAX_EXCEPTION.getMessage(), "The log-query DSL does not have a result API."));
}
BaseResult<Object> baseResult = queryCache.getBaseResult();
return BaseResultGenerator.success(baseResult.getStatistics(), baseResult.getJob(), baseResult.getOutputMode(), baseResult.getMeta(), baseResult.getData());
}
@Override
+ public BaseResult getAdHocQueryTimelineById(String id) {
+ QueryCache queryCache = getAdhocQueryCacheAndRefresh(id);
+ if (queryCache == null) {
+ throw new QGWBusinessException(HttpStatusCodeEnum.BAD_REQUEST.getCode(), CommonErrorCode.BAD_REQUEST_PARAM_SYNTAX_EXCEPTION.getCode(),
+ String.format(CommonErrorCode.BAD_REQUEST_PARAM_SYNTAX_EXCEPTION.getMessage(), "No job found for ID " + id + ", or the cache has expired."));
+ }
+ return logQueryService.getQueryTimelineByQueryContext(queryCache);
+ }
+
+ @Override
+ public BaseResult getAdHocQueryCountById(String id) {
+ QueryCache queryCache = getAdhocQueryCacheAndRefresh(id);
+ if (queryCache == null) {
+ throw new QGWBusinessException(HttpStatusCodeEnum.BAD_REQUEST.getCode(), CommonErrorCode.BAD_REQUEST_PARAM_SYNTAX_EXCEPTION.getCode(),
+ String.format(CommonErrorCode.BAD_REQUEST_PARAM_SYNTAX_EXCEPTION.getMessage(), "No job found for ID " + id + ", or the cache has expired."));
+ }
+ return logQueryService.getQueryCountByQueryContext(queryCache);
+ }
+
+ @Override
+ public BaseResult getAdHocQueryListById(String id, String fields, int limit, int offset) {
+ QueryCache queryCache = getAdhocQueryCacheAndRefresh(id);
+ if (queryCache == null) {
+ throw new QGWBusinessException(HttpStatusCodeEnum.BAD_REQUEST.getCode(), CommonErrorCode.BAD_REQUEST_PARAM_SYNTAX_EXCEPTION.getCode(),
+ String.format(CommonErrorCode.BAD_REQUEST_PARAM_SYNTAX_EXCEPTION.getMessage(), "No job found for ID " + id + ", or the cache has expired."));
+ }
+ return logQueryService.getQueryListByQueryContext(queryCache, fields, limit, offset);
+ }
+
+ @Override
public BaseResult getSavedQueryResultById(String id) {
return jobService.getSavedQueryResult(id);
}
@@ -505,4 +541,14 @@ public class QueryJobServiceImpl implements QueryJobService, EnvironmentAware {
public void setJobConfig(JobConfig jobConfig) {
this.jobConfig = jobConfig;
}
+
+ @Autowired
+ public void setDatabaseService(DatabaseService databaseService) {
+ this.databaseService = databaseService;
+ }
+
+ @Autowired
+ public void setDatabaseService(LogQueryService logQueryService) {
+ this.logQueryService = logQueryService;
+ }
} \ No newline at end of file
diff --git a/src/main/java/com/mesalab/services/configuration/JobConfig.java b/src/main/java/com/mesalab/services/configuration/JobConfig.java
index b082ecb8..ee9af771 100644
--- a/src/main/java/com/mesalab/services/configuration/JobConfig.java
+++ b/src/main/java/com/mesalab/services/configuration/JobConfig.java
@@ -32,6 +32,9 @@ public class JobConfig {
public static final String LINKS = "links";
public static final String LINKS_STATUS = "status";
public static final String LINKS_RESULT = "result";
+ public static final String LINKS_TIMELINE = "timeline";
+ public static final String LINKS_LIST = "list";
+ public static final String LINKS_COUNT = "count";
public static final String DURATION_TIME = "duration_time";
public static final String LAST_QUERY_TIME = "last_query_time";
public static final String LONG_TERM_RESULT = "result";
@@ -53,6 +56,7 @@ public class JobConfig {
public static final String QUERY_DATA_SOURCE = "query.data_source";
public static final String SAVED_QUERY = "saved_query";
public static final String FIELD_DISCOVERY = "field_discovery";
+ public static final String LOG_QUERY = "log-query";
public static final String DATAPATH_PACKET_COMBINE = "datapath_telemetry_packet_combine";
public static final String TRAFFIC_SPECTRUM_SUMMARY = "traffic-spectrum-summary";
public static final String TRAFFIC_SPECTRUM_UNIQUE_CLIENT_AND_SERVER_IPS = "traffic-spectrum-unique-client-and-server-ips";
diff --git a/src/main/java/com/mesalab/services/service/impl/JobExecuteService.java b/src/main/java/com/mesalab/services/service/impl/JobExecuteService.java
index 4758e573..cf0ae5d5 100644
--- a/src/main/java/com/mesalab/services/service/impl/JobExecuteService.java
+++ b/src/main/java/com/mesalab/services/service/impl/JobExecuteService.java
@@ -21,6 +21,7 @@ import com.mesalab.common.enums.QueryOption;
import com.mesalab.common.enums.HttpStatusCodeEnum;
import com.mesalab.common.exception.BusinessException;
import com.mesalab.common.utils.HazelcastInstanceMapUtil;
+import com.mesalab.common.utils.sqlparser.AutoPeriodHelper;
import com.mesalab.common.utils.sqlparser.SQLFunctionUtil;
import com.mesalab.common.utils.sqlparser.SQLHelper;
import com.mesalab.qgw.constant.DslIdentifierNameConst;
@@ -79,6 +80,7 @@ public class JobExecuteService implements EnvironmentAware {
private DSLService dslService;
private PacketCombineDslService packetCombineDslService;
private TrafficSpectrumDslService trafficSpectrumDslService;
+ private LogQueryService logQueryService;
private EngineConfigSource engineConfigSource;
private JobConfig jobCfg;
private CustomizedStatisticsService customizedStatisticsService;
@@ -209,6 +211,24 @@ public class JobExecuteService implements EnvironmentAware {
}
}
+ @Async("lightWeightThreadPool")
+ public void addDslExecutorLogQueryWithCache(DSLQueryRequestParam request, CountDownLatch countDownLatch) {
+ try {
+ markJobBegin(request.getId());
+ updateJobResultOnQueryCache(request.getId(), logQueryService.run(request), null);
+ } catch (RuntimeException e) {
+ markJobFailure(request.getId(), e.getMessage());
+ } finally {
+ try {
+ markJobCompletion(request.getId());
+ } finally {
+ if (countDownLatch != null) {
+ countDownLatch.countDown();
+ }
+ }
+ }
+ }
+
public BaseResult addDslExecutorTrafficSpectrumWithoutCache(DSLQueryRequestParam request) {
try {
@@ -970,4 +990,9 @@ public class JobExecuteService implements EnvironmentAware {
public void setJobCfg(CustomizedStatisticsService customizedStatisticsService) {
this.customizedStatisticsService = customizedStatisticsService;
}
+
+ @Autowired
+ public void setLogQueryService(LogQueryService logQueryService) {
+ this.logQueryService = logQueryService;
+ }
}
diff --git a/src/main/resources/dsl-sql-template.sql b/src/main/resources/dsl-sql-template.sql
index e94ba6c6..19d31808 100644
--- a/src/main/resources/dsl-sql-template.sql
+++ b/src/main/resources/dsl-sql-template.sql
@@ -54,4 +54,10 @@ SELECT %s FROM $table WHERE $intervals_and_filter %s %s %s $limit
#end
#sql("CUSTOMIZED_STATISTICS_SUBQUERY_TOPK")
(%s) IN (SELECT %s FROM $table WHERE $intervals_and_filter %s %s $limit)
+#end
+#sql("LOG_QUERY_TIMELINE")
+SELECT FROM_UNIXTIME(TIME_FLOOR_WITH_FILL(%s, '$granularity', 'zero')) AS stat_time, COUNT(1) AS count %s FROM $table WHERE $intervals_and_filter GROUP BY stat_time %s
+#end
+#sql("LOG_QUERY_LIST")
+SELECT %s FROM $table WHERE $intervals_and_filter ORDER BY %s $limit
#end \ No newline at end of file