| author | wangwei <[email protected]> | 2024-09-20 19:06:07 +0800 |
|---|---|---|
| committer | wangwei <[email protected]> | 2024-09-20 19:06:07 +0800 |
| commit | a576ea7edd47a4720b6641af1279bbf5c8aff4eb | |
| tree | f73b8760ee24c7b1d32a987f7cb8ff1af191f31b | |
| parent | 16f7ce1b76e8f87bd370a94e2af67ca77ccc18e8 | |
[Feature][log-query] Support the log DSL query capability (basic version)
10 files changed, 362 insertions, 20 deletions
diff --git a/config/flyway/tsg/R__init_datasets.sql b/config/flyway/tsg/R__init_datasets.sql
index c9170853..e9659b1f 100644
--- a/config/flyway/tsg/R__init_datasets.sql
+++ b/config/flyway/tsg/R__init_datasets.sql
@@ -156,6 +156,7 @@ INSERT INTO `dataset` (`identifier_name`, `category`, `backend_engine`, `type`,
 INSERT INTO `dataset` (`identifier_name`, `category`, `backend_engine`, `type`, `template`, `description`) VALUES ('datapath-telemetry-record-count', 'datapath_telemetry_record', 'qgw', 'sql', '{ "statement": "SELECT count(1) as count FROM datapath_telemetry_record WHERE recv_time >= UNIX_TIMESTAMP(''${start_time}'') AND recv_time < UNIX_TIMESTAMP(''${end_time}'') AND vsys_id in(${vsys_id}) AND ( ${filter})" }',null);
 INSERT INTO `dataset` (`identifier_name`, `category`, `backend_engine`, `type`, `template`, `description`) VALUES ('datapath-telemetry-record-list', 'datapath_telemetry_record', 'qgw', 'sql', '{ "statement": "SELECT ${columns} FROM datapath_telemetry_record WHERE recv_time >= UNIX_TIMESTAMP(''${start_time}'') AND recv_time < UNIX_TIMESTAMP(''${end_time}'') AND vsys_id IN(${vsys_id}) AND (${filter}) ORDER BY timestamp_us ASC LIMIT ${limit}" }',null);
 INSERT INTO `dataset` (`identifier_name`, `category`, `backend_engine`, `type`, `template`, `description`) VALUES ('datapath-telemetry-packet-combine', 'datapath_telemetry_record', 'qgw', 'dsl', '{"id":"${job_id}","name":"datapath_telemetry_packet_combine","data_source":"datapath_telemetry_record","filter":"job_id=''${job_id}'' AND vsys_id in (${vsys_id}) AND (${filter})"}',null);
+INSERT INTO `dataset` (`identifier_name`, `category`, `backend_engine`, `type`, `template`, `description`) VALUES ('log-query', 'statistics', 'qgw', 'dsl', '{"name": "log-query", "data_source": "${source}", "filter": "vsys_id in (${vsys_id}) AND (${filter})", "intervals": ["${start_time}/${end_time}" ] }',null);
 INSERT INTO `dataset` (`identifier_name`, `category`, `backend_engine`, `type`, `template`, `description`) VALUES ('field-discovery', 'statistics', 'qgw', 'dsl', '{ "name": "field_discovery", "data_source": "${source}", "filter": "vsys_id in (${vsys_id}) AND (${filter})", "custom.field_discovery.metric": "${metric}", "custom.field_discovery.metric.fn": "${fn}", "custom.field_discovery.fields": ["${field_list}"] }',null);
 INSERT INTO `dataset` (`identifier_name`, `category`, `backend_engine`, `type`, `template`, `description`) VALUES ('field-statistics-top-values', 'statistics', 'qgw', 'sql', '{ "statement":"SELECT ${column_name}, count(*) as cnt FROM ${source} where recv_time >= UNIX_TIMESTAMP(''${start_time}'') AND recv_time < UNIX_TIMESTAMP(''${end_time}'') AND ${filter} GROUP BY ${column_name} ORDER BY cnt DESC LIMIT ${limit}" }',null);
 INSERT INTO `dataset` (`identifier_name`, `category`, `backend_engine`, `type`, `template`, `description`) VALUES ('field-statistics-rare-values', 'statistics', 'qgw', 'sql', '{ "statement":"SELECT ${column_name}, count(*) as cnt FROM ${source} where recv_time >= UNIX_TIMESTAMP(''${start_time}'') AND recv_time < UNIX_TIMESTAMP(''${end_time}'') AND ${filter} GROUP BY ${column_name} ORDER BY cnt ASC LIMIT ${limit}" }',null);
@GetMapping(value = "/v1/query/job/{id}/timeline", consumes = "application/x-www-form-urlencoded") + @AuditLog("QueryController.getJobTimelineById") + public BaseResult getJobTimelineById(@PathVariable String id, @RequestParam(value = "is_saved_query", required = false, defaultValue = "0") Integer isSavedQuery) { + if (BooleanUtil.toBoolean(String.valueOf(isSavedQuery))) { + throw new QGWBusinessException(HttpStatusCodeEnum.BAD_REQUEST.getCode(), CommonErrorCode.BAD_REQUEST_PARAM_SYNTAX_EXCEPTION.getCode(), + String.format(CommonErrorCode.BAD_REQUEST_PARAM_SYNTAX_EXCEPTION.getMessage(), "Currently, the DSL Saved Query is not supported.")); + } else { + return queryJobService.getAdHocQueryTimelineById(id); + } + } + + @GetMapping(value = "/v1/query/job/{id}/count", consumes = "application/x-www-form-urlencoded") + @AuditLog("QueryController.getJobTimelineById") + public BaseResult getJobCountById(@PathVariable String id, @RequestParam(value = "is_saved_query", required = false, defaultValue = "0") Integer isSavedQuery) { + if (BooleanUtil.toBoolean(String.valueOf(isSavedQuery))) { + throw new QGWBusinessException(HttpStatusCodeEnum.BAD_REQUEST.getCode(), CommonErrorCode.BAD_REQUEST_PARAM_SYNTAX_EXCEPTION.getCode(), + String.format(CommonErrorCode.BAD_REQUEST_PARAM_SYNTAX_EXCEPTION.getMessage(), "Currently, the DSL Saved Query is not supported.")); + } else { + return queryJobService.getAdHocQueryCountById(id); + } + } + + @GetMapping(value = "/v1/query/job/{id}/list", consumes = "application/x-www-form-urlencoded") + @AuditLog("QueryController.getJobTimelineById") + public BaseResult getJobListById(@PathVariable String id + , @RequestParam(value = "is_saved_query", required = false, defaultValue = "0") Integer isSavedQuery + , @RequestParam(value = "fields", required = false, defaultValue = "*") String fields + , @RequestParam(value = "limit", required = false, defaultValue = "10") Integer limit + , @RequestParam(value = "offset", required = false, defaultValue = "0") Integer offset) { + if (BooleanUtil.toBoolean(String.valueOf(isSavedQuery))) { + throw new QGWBusinessException(HttpStatusCodeEnum.BAD_REQUEST.getCode(), CommonErrorCode.BAD_REQUEST_PARAM_SYNTAX_EXCEPTION.getCode(), + String.format(CommonErrorCode.BAD_REQUEST_PARAM_SYNTAX_EXCEPTION.getMessage(), "Currently, the DSL Saved Query is not supported.")); + } else { + return queryJobService.getAdHocQueryListById(id, fields, limit, offset); + } + } + @GetMapping(value = "/v1/query/job/{id}", consumes = "application/x-www-form-urlencoded") @AuditLog("QueryController.getJobStatusById") public DeferredResult<BaseResult> getJobStatusById(@PathVariable String id, @RequestParam(value = "is_saved_query", required = false, defaultValue = "0") Integer isSavedQuery) { diff --git a/src/main/java/com/mesalab/qgw/model/basic/QueryCache.java b/src/main/java/com/mesalab/qgw/model/basic/QueryCache.java index aed03fd1..4ebab8e7 100644 --- a/src/main/java/com/mesalab/qgw/model/basic/QueryCache.java +++ b/src/main/java/com/mesalab/qgw/model/basic/QueryCache.java @@ -6,6 +6,7 @@ import com.mesalab.services.configuration.JobConfig; import lombok.Data; import java.io.Serializable; +import java.util.LinkedHashMap; import java.util.Map; /** @@ -18,10 +19,18 @@ import java.util.Map; @Data public class QueryCache implements Serializable { private String type; + private Object requestParam; private Long latestQueryTimeMs = System.currentTimeMillis(); private BaseResult<Object> baseResult; - public QueryCache(String jobId) { + + public QueryCache(String jobId, 
diff --git a/src/main/java/com/mesalab/qgw/model/basic/QueryCache.java b/src/main/java/com/mesalab/qgw/model/basic/QueryCache.java
index aed03fd1..4ebab8e7 100644
--- a/src/main/java/com/mesalab/qgw/model/basic/QueryCache.java
+++ b/src/main/java/com/mesalab/qgw/model/basic/QueryCache.java
@@ -6,6 +6,7 @@ import com.mesalab.services.configuration.JobConfig;
 import lombok.Data;
 
 import java.io.Serializable;
+import java.util.LinkedHashMap;
 import java.util.Map;
 
 /**
@@ -18,10 +19,18 @@ import java.util.Map;
 @Data
 public class QueryCache implements Serializable {
     private String type;
+    private Object requestParam;
     private Long latestQueryTimeMs = System.currentTimeMillis();
     private BaseResult<Object> baseResult;
-    public QueryCache(String jobId) {
+
+    public QueryCache(String jobId, String type) {
         this.baseResult = new BaseResult<>();
+        this.type = type;
+        Map<String, Object> job = createJobMap(jobId);
+        this.baseResult.setJob(job);
+    }
+
+    private Map<String, Object> createJobMap(String jobId) {
         Map<String, Object> job = Maps.newLinkedHashMap();
         job.put(JobConfig.JOB_ID, jobId);
         job.put(JobConfig.IS_DONE, false);
@@ -31,10 +40,22 @@ public class QueryCache implements Serializable {
         job.put(JobConfig.REASON, null);
         job.put(JobConfig.START_TIME, null);
         job.put(JobConfig.END_TIME, null);
-        Map<String, Object> links = Maps.newLinkedHashMap();
-        links.put(JobConfig.LINKS_STATUS, "/v1/query/job/" + jobId);
-        links.put(JobConfig.LINKS_RESULT, "/v1/query/job/" + jobId + "/result");
+        Map<String, Object> links = createLinksMap(jobId);
         job.put(JobConfig.LINKS, links);
-        this.baseResult.setJob(job);
+        return job;
+    }
+
+    private Map<String, Object> createLinksMap(String jobId) {
+        Map<String, Object> links = new LinkedHashMap<>();
+        links.put(JobConfig.LINKS_STATUS, "/v1/query/job/" + jobId);
+        if (JobConfig.LOG_QUERY.equals(type)) {
+            links.put(JobConfig.LINKS_COUNT, "/v1/query/job/" + jobId + "/count");
+            links.put(JobConfig.LINKS_LIST, "/v1/query/job/" + jobId + "/list");
+            links.put(JobConfig.LINKS_TIMELINE, "/v1/query/job/" + jobId + "/timeline");
+        } else {
+            links.put(JobConfig.LINKS_RESULT, "/v1/query/job/" + jobId + "/result");
+        }
+
+        return links;
     }
 }
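For illustration, the links block that createLinksMap() produces for a log-query job, re-implemented standalone with plain JDK types; the inlined key strings ("status", "count", "list", "timeline", "result") are the JobConfig constant values introduced later in this commit:

import java.util.LinkedHashMap;
import java.util.Map;

class LinksSketch {
    static Map<String, Object> links(String jobId, boolean isLogQuery) {
        Map<String, Object> links = new LinkedHashMap<>();
        links.put("status", "/v1/query/job/" + jobId);
        if (isLogQuery) {
            // log-query jobs expose count/list/timeline instead of result
            links.put("count", "/v1/query/job/" + jobId + "/count");
            links.put("list", "/v1/query/job/" + jobId + "/list");
            links.put("timeline", "/v1/query/job/" + jobId + "/timeline");
        } else {
            links.put("result", "/v1/query/job/" + jobId + "/result");
        }
        return links;
    }

    public static void main(String[] args) {
        // {status=/v1/query/job/42, count=/v1/query/job/42/count, list=..., timeline=...}
        System.out.println(links("42", true));
    }
}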
diff --git a/src/main/java/com/mesalab/qgw/service/LogQueryService.java b/src/main/java/com/mesalab/qgw/service/LogQueryService.java
new file mode 100644
index 00000000..ac2a0dae
--- /dev/null
+++ b/src/main/java/com/mesalab/qgw/service/LogQueryService.java
@@ -0,0 +1,21 @@
+package com.mesalab.qgw.service;
+
+import com.mesalab.common.entity.BaseResult;
+import com.mesalab.qgw.model.basic.DSLQueryRequestParam;
+import com.mesalab.qgw.model.basic.QueryCache;
+
+/**
+ * TODO
+ *
+ * @Classname LogQueryService
+ * @Date 2024/9/18 11:38
+ * @Author wWei
+ */
+public interface LogQueryService {
+
+    BaseResult run(DSLQueryRequestParam param);
+
+    BaseResult getQueryListByQueryContext(QueryCache queryCache, String fields, int limit, int offset);
+    BaseResult getQueryCountByQueryContext(QueryCache queryCache);
+    BaseResult getQueryTimelineByQueryContext(QueryCache queryCache);
+}
diff --git a/src/main/java/com/mesalab/qgw/service/QueryJobService.java b/src/main/java/com/mesalab/qgw/service/QueryJobService.java
index ecb99f61..a991b043 100644
--- a/src/main/java/com/mesalab/qgw/service/QueryJobService.java
+++ b/src/main/java/com/mesalab/qgw/service/QueryJobService.java
@@ -7,7 +7,6 @@ import com.mesalab.qgw.model.basic.SqlQueryRequestParam;
 import java.util.List;
 
 /**
- *
  * @Date 2023/12/6 18:09
  * @Author wWei
 */
@@ -16,6 +15,7 @@ public interface QueryJobService {
    /**
     * Create an SQL saved query and return the query job id. It will storage the query job and results in the database.
     * The query job will be executed in the background through the SAVED-QUERY-SCHEDULER.
+     *
     * @param sqlQueryRequestParam
     * @return {@link BaseResult}
     * @created by wWei
@@ -26,9 +26,10 @@ public interface QueryJobService {
    /**
     * Create an SQL ad-hoc query and return the query job id. The query will be executed immediately.
     * It Support three modes of execution: NORMAL, BLOCKING, and ONESHOT.
-     * NORMAL: The query will be executed in the asynchronous thread pool.
-     * BLOCKING: The query will return the jobID when the query is completed (Sync Mode).
-     * ONESHOT: The query will return the results in the same call (Sync Mode). Does not return the job ID.
+     *   NORMAL: The query will be executed in the asynchronous thread pool.
+     *   BLOCKING: The query will return the jobID when the query is completed (Sync Mode).
+     *   ONESHOT: The query will return the results in the same call (Sync Mode). Does not return the job ID.
+     *
     * @param sqlQueryRequestParam
     * @return {@link BaseResult}
     * @created by wWei
@@ -39,6 +40,7 @@ public interface QueryJobService {
    /**
     * Create a DSL ad-hoc query and return the query job id. The query will be executed immediately.
     * It Support three modes of execution: NORMAL, BLOCKING, and ONESHOT[Recommend].
+     *
     * @param dslQueryRequestParam
     * @return {@link BaseResult}
     * @return
@@ -47,6 +49,12 @@ public interface QueryJobService {
 
     BaseResult getAdHocQueryResultById(String id);
 
+    BaseResult getAdHocQueryTimelineById(String id);
+
+    BaseResult getAdHocQueryCountById(String id);
+
+    BaseResult getAdHocQueryListById(String id, String fields, int limit, int offset);
+
     BaseResult getSavedQueryResultById(String id);
 
     BaseResult getAdHocQueryStatusById(String id);
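For reference, a log-query request body after the placeholders of the 'log-query' dataset template are filled in might look like the sketch below. The name/data_source/filter/intervals keys come straight from the template; the granularity key and all concrete values are assumptions (LogQueryServiceImpl, next, reads granularity as an ISO-8601 period via Period.parse):

class LogQueryDslExample {
    // Hypothetical instantiation of the 'log-query' dataset template; every
    // concrete value is invented, and "granularity" is inferred from
    // requestParam.getGranularity() in LogQueryServiceImpl below.
    static final String BODY = """
            {
              "name": "log-query",
              "data_source": "session_record",
              "filter": "vsys_id in (1) AND (server_port = 443)",
              "intervals": ["2024-09-20T00:00:00Z/2024-09-20T12:00:00Z"],
              "granularity": "PT1M"
            }
            """;
}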
diff --git a/src/main/java/com/mesalab/qgw/service/impl/LogQueryServiceImpl.java b/src/main/java/com/mesalab/qgw/service/impl/LogQueryServiceImpl.java
new file mode 100644
index 00000000..8e7a804e
--- /dev/null
+++ b/src/main/java/com/mesalab/qgw/service/impl/LogQueryServiceImpl.java
@@ -0,0 +1,173 @@
+package com.mesalab.qgw.service.impl;
+
+import cn.hutool.core.bean.BeanUtil;
+import cn.hutool.core.date.DatePattern;
+import cn.hutool.core.date.DateTime;
+import cn.hutool.core.date.DateUtil;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import com.jayway.jsonpath.JsonPath;
+import com.mesalab.common.entity.BaseResult;
+import com.mesalab.common.entity.BaseResultGenerator;
+import com.mesalab.common.enums.OutputMode;
+import com.mesalab.qgw.model.basic.DSLQueryContext;
+import com.mesalab.qgw.model.basic.DSLQueryRequestParam;
+import com.mesalab.qgw.model.basic.QueryCache;
+import com.mesalab.qgw.model.basic.SQLQueryContext;
+import com.mesalab.qgw.service.DatabaseService;
+import com.mesalab.qgw.service.LogQueryService;
+import com.mesalab.qgw.service.SQLSyncQueryService;
+import org.joda.time.Period;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.core.env.Environment;
+import org.springframework.stereotype.Service;
+
+import java.util.*;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.stream.Collectors;
+
+/**
+ * TODO
+ *
+ * @Classname LogQueryServiceImpl
+ * @Date 2024/9/18 11:41
+ * @Author wWei
+ */
+@Service("logQueryService")
+public class LogQueryServiceImpl implements LogQueryService {
+    private static final String LOG_DEFAULT_LOGICAL_TYPE = "unix_timestamp";
+    private DatabaseService databaseService;
+    private Environment environment;
+    private SQLSyncQueryService sqlSyncQueryService;
+
+    private static final Map<String, String> TIMELINE_SOURCE_TYPE_MAP = new HashMap<>();
+
+    static {
+        TIMELINE_SOURCE_TYPE_MAP.put("session_record", ", decoded_as AS type");
+        TIMELINE_SOURCE_TYPE_MAP.put("voip_record", ", decoded_as AS type");
+        TIMELINE_SOURCE_TYPE_MAP.put("security_event", ", security_action AS type");
+        TIMELINE_SOURCE_TYPE_MAP.put("proxy_event", ", proxy_action AS type");
+        TIMELINE_SOURCE_TYPE_MAP.put("dos_event", ", attack_type AS type");
+    }
+
+    @Override
+    public BaseResult run(DSLQueryRequestParam param) {
+        String partitionKey = databaseService.getPartitionKey(param.getDataSource());
+        LinkedHashMap<String, Object> schemaByName = databaseService.getSchemaByName(param.getDataSource());
+        List<String> partitionKeyLogicalTypes = JsonPath.read(schemaByName, "$.fields[?(@.name == \"" + partitionKey + "\")].type.logicalType");
+        String partitionKeyLogicalType = partitionKeyLogicalTypes.isEmpty() ? LOG_DEFAULT_LOGICAL_TYPE : partitionKeyLogicalTypes.get(0);
+        DSLQueryContext dslQueryContext = BeanUtil.copyProperties(param, DSLQueryContext.class);
+        String sqlTemplate = dslQueryContext.toSql(
+                environment.getProperty("LOG_QUERY_TIMELINE")
+                , param.getDataSource()
+                , databaseService.getPartitionKey(param.getDataSource())
+                , partitionKeyLogicalType);
+        String timelineType = TIMELINE_SOURCE_TYPE_MAP.getOrDefault(param.getDataSource(), "");
+        String sql = String.format(sqlTemplate, partitionKey, timelineType, timelineType.isEmpty() ? "" : ", type");
+        return sqlSyncQueryService.executeQuery(SQLQueryContext.builder().originalSQL(sql).build());
+    }
+
+    @Override
+    public BaseResult getQueryListByQueryContext(QueryCache queryCache, String fields, int limit, int offset) {
+        BaseResult<Object> baseResult = queryCache.getBaseResult();
+        Object dataObj = baseResult.getData();
+        if (dataObj == null || ((List) dataObj).isEmpty()) {
+            return BaseResultGenerator.success(null, null, OutputMode.JSON.getValue(), null, Lists.newArrayList());
+        }
+        List<Map<String, Object>> dataList = (List<Map<String, Object>>) dataObj;
+        dataList = dataList.stream()
+                .sorted((map1, map2) -> {
+                    DateTime time1 = DateUtil.parse(String.valueOf(map1.get("stat_time")));
+                    DateTime time2 = DateUtil.parse(String.valueOf(map2.get("stat_time")));
+                    return time2.compareTo(time1); // sort in descending order
+                })
+                .collect(Collectors.toList());
+        AtomicLong totalCount = new AtomicLong();
+        DateTime cursorStartTime = null;
+        DateTime cursorEndTime = null;
+        for (Map<String, Object> item : dataList) {
+            long itemCount = Long.parseLong(String.valueOf(item.getOrDefault("count", 0)));
+            DateTime itemTime = DateUtil.parse(String.valueOf(item.get("stat_time")));
+            if (itemCount > 0 && totalCount.get() == 0) {
+                cursorEndTime = itemTime;
+            }
+            totalCount.addAndGet(itemCount);
+            cursorStartTime = itemTime;
+            if (totalCount.get() >= (limit + offset)) {
+                break;
+            }
+        }
+        DSLQueryRequestParam requestParam = BeanUtil.copyProperties(queryCache.getRequestParam(), DSLQueryRequestParam.class);
+        String granularity = requestParam.getGranularity();
+        DSLQueryContext dslQueryContextTemp = BeanUtil.copyProperties(requestParam, DSLQueryContext.class);
+        List<String> intervals = dslQueryContextTemp.getIntervals();
+        String[] split = intervals.get(0).split("/");
+        DateTime start = DateUtil.parse(split[0]);
+        DateTime end = DateUtil.parse(split[1]);
+
+        int seconds = Period.parse(granularity).toStandardSeconds().getSeconds();
+        DateTime dateTime = DateUtil.offsetSecond(cursorEndTime, seconds);
+        if (DateUtil.compare(end, dateTime) > 0) {
+            cursorEndTime = dateTime;
+        } else {
+            cursorEndTime = end;
+        }
+
+        if (DateUtil.compare(start, cursorStartTime) > 0) {
+            cursorStartTime = start;
+        }
+        String startFormat = DateUtil.format(cursorStartTime, DatePattern.UTC_PATTERN);
+        String endFormat = DateUtil.format(cursorEndTime, DatePattern.UTC_PATTERN);
+        String dataSource = requestParam.getDataSource();
+        String partitionKey = databaseService.getPartitionKey(requestParam.getDataSource());
+        List<String> list = Lists.newArrayList(startFormat + "/" + endFormat);
+        dslQueryContextTemp.setIntervals(list);
+        dslQueryContextTemp.setLimit(offset + "," + limit);
+
+        String sqlTemplate = environment.getProperty("LOG_QUERY_LIST");
+        LinkedHashMap<String, Object> schemaByName = databaseService.getSchemaByName(requestParam.getDataSource());
+        List<String> partitionKeyLogicalTypes = JsonPath.read(schemaByName, "$.fields[?(@.name == \"" + partitionKey + "\")].type.logicalType");
+        String partitionKeyLogicalType = partitionKeyLogicalTypes.isEmpty() ? null : partitionKeyLogicalTypes.get(0);
+
+        String sqlTemplate2 = dslQueryContextTemp.toSql(sqlTemplate, dataSource, partitionKey, partitionKeyLogicalType);
+        String sql = String.format(sqlTemplate2, fields, partitionKey + " DESC ");
+        return sqlSyncQueryService.executeQuery(SQLQueryContext.builder().originalSQL(sql).build());
+    }
+
+    @Override
+    public BaseResult getQueryCountByQueryContext(QueryCache queryCache) {
+        BaseResult<Object> baseResult = queryCache.getBaseResult();
+        Object data = baseResult.getData();
+
+        long totalCount = (data == null || ((List<?>) data).isEmpty()) ? 0 :
+                ((List<Map<String, Object>>) data).stream()
+                        .mapToLong(map -> Long.parseLong(String.valueOf(map.getOrDefault("count", 0))))
+                        .sum();
+        List<Map<String, Object>> result = Lists.newArrayList();
+        Map<String, Object> map = Maps.newHashMap();
+        map.put("count", totalCount);
+        result.add(map);
+        return BaseResultGenerator.success(null, null, OutputMode.JSON.getValue(), null, result);
+    }
+
+    @Override
+    public BaseResult getQueryTimelineByQueryContext(QueryCache queryCache) {
+        BaseResult<Object> baseResult = queryCache.getBaseResult();
+        return BaseResultGenerator.success(baseResult.getStatistics(), null, baseResult.getOutputMode(), baseResult.getMeta(), baseResult.getData());
+    }
+
+    @Autowired
+    public void setDatabaseService(DatabaseService databaseService) {
+        this.databaseService = databaseService;
+    }
+
+    @Autowired
+    public void setEnvironment(Environment environment) {
+        this.environment = environment;
+    }
+
+    @Autowired
+    public void setSQLSyncQueryService(SQLSyncQueryService sqlSyncQueryService) {
+        this.sqlSyncQueryService = sqlSyncQueryService;
+    }
+}
diff --git a/src/main/java/com/mesalab/qgw/service/impl/QueryJobServiceImpl.java b/src/main/java/com/mesalab/qgw/service/impl/QueryJobServiceImpl.java
index e69e7496..e495cf34 100644
--- a/src/main/java/com/mesalab/qgw/service/impl/QueryJobServiceImpl.java
+++ b/src/main/java/com/mesalab/qgw/service/impl/QueryJobServiceImpl.java
@@ -54,10 +54,12 @@ public class QueryJobServiceImpl implements QueryJobService, EnvironmentAware {
     private SQLSyncQueryService sqlSyncQueryService;
     private JobService jobService;
     private JobExecuteService jobExecuteService;
+    private DatabaseService databaseService;
     private DSLService dslService;
     private TrafficSpectrumDslService trafficSpectrumDslService;
     private EngineConfigSource engineConfigSource;
     private JobConfig jobConfig;
+    private LogQueryService logQueryService;
 
     @Override
@@ -85,7 +87,7 @@ public class QueryJobServiceImpl implements QueryJobService, EnvironmentAware {
                     .format(sqlQueryRequestParam.getOutputMode().getValue())
                     .build());
         } else if (ExecutionMode.NORMAL.equals(execMode)) {
-            HazelcastInstanceMapUtil.put(sqlQueryRequestParam.getId(), new QueryCache(sqlQueryRequestParam.getId()));
+            HazelcastInstanceMapUtil.put(sqlQueryRequestParam.getId(), new QueryCache(sqlQueryRequestParam.getId(), null));
             jobExecuteService.addExecutorSql(sqlQueryRequestParam);
             return BaseResultGenerator.successCreate(buildJobInfoOfCreated(sqlQueryRequestParam.getId()));
         } else if (ExecutionMode.BLOCKING.equals(execMode)) {
@@ -124,8 +126,7 @@ public class QueryJobServiceImpl implements QueryJobService, EnvironmentAware {
                 || JobConfig.TRAFFIC_SPECTRUM_APP_DISTRIBUTION.equals(request.getName())
                 || JobConfig.TRAFFIC_SPECTRUM_NETWORK_THROUGHPUT_TREND.equals(request.getName())
                 || JobConfig.TRAFFIC_SPECTRUM_CLIENT_IP_CONNECT_APPLICATION_USAGE.equals(request.getName())) {
-            QueryCache queryCache = new QueryCache(request.getId());
-            queryCache.setType(request.getName());
+            QueryCache queryCache = new QueryCache(request.getId(), request.getName());
             HazelcastInstanceMapUtil.put(request.getId(), queryCache);
             CountDownLatch countDownLatch = new CountDownLatch(1);
             jobExecuteService.addDslExecutorTrafficSpectrumWithCache(request, countDownLatch);
@@ -154,9 +155,8 @@ public class QueryJobServiceImpl implements QueryJobService, EnvironmentAware {
                 return dslService.execDsl(dslQueryContext, request.isDryRun());
             }
         } else if (ExecutionMode.NORMAL.equals(execMode)) {
-            QueryCache queryCache = new QueryCache(request.getId());
+            QueryCache queryCache = new QueryCache(request.getId(), request.getName());
             if (JobConfig.FIELD_DISCOVERY.equals(request.getName())) {
-                queryCache.setType(JobConfig.FIELD_DISCOVERY);
                 validFieldDiscovery(request);
                 HazelcastInstanceMapUtil.put(request.getId(), queryCache);
                 jobExecuteService.addExecutorFieldDiscovery(request);
@@ -174,6 +174,11 @@ public class QueryJobServiceImpl implements QueryJobService, EnvironmentAware {
             } else if (JobConfig.CUSTOMIZED_STATISTICS.equals(request.getName())) {
                 HazelcastInstanceMapUtil.put(request.getId(), queryCache);
                 jobExecuteService.addDslExecutorCustomizedStatisticsWithCache(request, null);
+            } else if (JobConfig.LOG_QUERY.equals(request.getName())) {
+                queryCache.setType(JobConfig.LOG_QUERY);
+                queryCache.setRequestParam(request);
+                HazelcastInstanceMapUtil.put(request.getId(), queryCache);
+                jobExecuteService.addDslExecutorLogQueryWithCache(request, null);
             } else {
                 HazelcastInstanceMapUtil.put(request.getId(), queryCache);
                 jobExecuteService.addExecutorDsl(request);
@@ -184,7 +189,7 @@ public class QueryJobServiceImpl implements QueryJobService, EnvironmentAware {
             execAdnWaitFieldDiscoveryDone(request.getId(), request);
             return BaseResultGenerator.successCreate(buildJobInfoOfCreated(request.getId()));
         } else if (JobConfig.DATAPATH_PACKET_COMBINE.equals(request.getName())) {
-            QueryCache queryCache = new QueryCache(request.getId());
+            QueryCache queryCache = new QueryCache(request.getId(), null);
             HazelcastInstanceMapUtil.put(request.getId(), queryCache);
             CountDownLatch countDownLatch = new CountDownLatch(1);
             jobExecuteService.addExecutorDslPacketCombineWithCache(request, countDownLatch);
@@ -203,8 +208,7 @@ public class QueryJobServiceImpl implements QueryJobService, EnvironmentAware {
                 || JobConfig.TRAFFIC_SPECTRUM_APP_DISTRIBUTION.equals(request.getName())
                 || JobConfig.TRAFFIC_SPECTRUM_NETWORK_THROUGHPUT_TREND.equals(request.getName())
                 || JobConfig.TRAFFIC_SPECTRUM_CLIENT_IP_CONNECT_APPLICATION_USAGE.equals(request.getName())) {
-            QueryCache queryCache = new QueryCache(request.getId());
-            queryCache.setType(request.getName());
+            QueryCache queryCache = new QueryCache(request.getId(), request.getName());
             HazelcastInstanceMapUtil.put(request.getId(), queryCache);
             CountDownLatch countDownLatch = new CountDownLatch(1);
             jobExecuteService.addDslExecutorTrafficSpectrumWithCache(request, countDownLatch);
@@ -231,8 +235,7 @@ public class QueryJobServiceImpl implements QueryJobService, EnvironmentAware {
     }
 
     private void execAdnWaitFieldDiscoveryDone(String id, DSLQueryRequestParam request) {
-        QueryCache queryCacheStart = new QueryCache(id);
-        queryCacheStart.setType(JobConfig.FIELD_DISCOVERY);
+        QueryCache queryCacheStart = new QueryCache(id, JobConfig.FIELD_DISCOVERY);
         validFieldDiscovery(request);
         HazelcastInstanceMapUtil.put(id, queryCacheStart);
         Future<Boolean> booleanFuture = jobExecuteService.addExecutorFieldDiscovery(request);
@@ -245,7 +248,7 @@ public class QueryJobServiceImpl implements QueryJobService, EnvironmentAware {
     }
 
     private QueryCache buildQueryCacheOfDone(String id, long start, BaseResult baseResult) {
-        QueryCache queryCache = new QueryCache(id);
+        QueryCache queryCache = new QueryCache(id, null);
         Map<String, Object> jobInfo = queryCache.getBaseResult().getJob();
         jobInfo.put(JobConfig.IS_DONE, true);
         jobInfo.put(JobConfig.DONE_PROGRESS, 1);
@@ -270,12 +273,45 @@ public class QueryJobServiceImpl implements QueryJobService, EnvironmentAware {
             queryCache = rebuildFieldDiscoveryQueryCache(queryCache);
         } else if (JobConfig.TRAFFIC_SPECTRUM_CLIENT_IP_CONNECT_APPLICATION_USAGE.equals(queryCache.getType())) {
             queryCache = rebuildTrafficSpectrumCIPConnectAppUsageQueryCache(queryCache);
+        } else if (JobConfig.LOG_QUERY.equals(queryCache.getType())) {
+            throw new QGWBusinessException(HttpStatusCodeEnum.BAD_REQUEST.getCode(), CommonErrorCode.BAD_REQUEST_PARAM_SYNTAX_EXCEPTION.getCode(),
+                    String.format(CommonErrorCode.BAD_REQUEST_PARAM_SYNTAX_EXCEPTION.getMessage(), "The log-query DSL does not have a result API."));
         }
         BaseResult<Object> baseResult = queryCache.getBaseResult();
         return BaseResultGenerator.success(baseResult.getStatistics(), baseResult.getJob(), baseResult.getOutputMode(), baseResult.getMeta(), baseResult.getData());
     }
 
     @Override
+    public BaseResult getAdHocQueryTimelineById(String id) {
+        QueryCache queryCache = getAdhocQueryCacheAndRefresh(id);
+        if (queryCache == null) {
+            throw new QGWBusinessException(HttpStatusCodeEnum.BAD_REQUEST.getCode(), CommonErrorCode.BAD_REQUEST_PARAM_SYNTAX_EXCEPTION.getCode(),
+                    String.format(CommonErrorCode.BAD_REQUEST_PARAM_SYNTAX_EXCEPTION.getMessage(), "No job found for ID " + id + ", or the cache has expired."));
+        }
+        return logQueryService.getQueryTimelineByQueryContext(queryCache);
+    }
+
+    @Override
+    public BaseResult getAdHocQueryCountById(String id) {
+        QueryCache queryCache = getAdhocQueryCacheAndRefresh(id);
+        if (queryCache == null) {
+            throw new QGWBusinessException(HttpStatusCodeEnum.BAD_REQUEST.getCode(), CommonErrorCode.BAD_REQUEST_PARAM_SYNTAX_EXCEPTION.getCode(),
+                    String.format(CommonErrorCode.BAD_REQUEST_PARAM_SYNTAX_EXCEPTION.getMessage(), "No job found for ID " + id + ", or the cache has expired."));
+        }
+        return logQueryService.getQueryCountByQueryContext(queryCache);
+    }
+
+    @Override
+    public BaseResult getAdHocQueryListById(String id, String fields, int limit, int offset) {
+        QueryCache queryCache = getAdhocQueryCacheAndRefresh(id);
+        if (queryCache == null) {
+            throw new QGWBusinessException(HttpStatusCodeEnum.BAD_REQUEST.getCode(), CommonErrorCode.BAD_REQUEST_PARAM_SYNTAX_EXCEPTION.getCode(),
+                    String.format(CommonErrorCode.BAD_REQUEST_PARAM_SYNTAX_EXCEPTION.getMessage(), "No job found for ID " + id + ", or the cache has expired."));
+        }
+        return logQueryService.getQueryListByQueryContext(queryCache, fields, limit, offset);
+    }
+
+    @Override
     public BaseResult getSavedQueryResultById(String id) {
         return jobService.getSavedQueryResult(id);
     }
@@ -505,4 +541,14 @@ public class QueryJobServiceImpl implements QueryJobService, EnvironmentAware {
     public void setJobConfig(JobConfig jobConfig) {
         this.jobConfig = jobConfig;
     }
+
+    @Autowired
+    public void setDatabaseService(DatabaseService databaseService) {
+        this.databaseService = databaseService;
+    }
+
+    @Autowired
+    public void setLogQueryService(LogQueryService logQueryService) {
+        this.logQueryService = logQueryService;
+    }
 }
\ No newline at end of file
diff --git a/src/main/java/com/mesalab/services/configuration/JobConfig.java b/src/main/java/com/mesalab/services/configuration/JobConfig.java
index b082ecb8..ee9af771 100644
--- a/src/main/java/com/mesalab/services/configuration/JobConfig.java
+++ b/src/main/java/com/mesalab/services/configuration/JobConfig.java
@@ -32,6 +32,9 @@ public class JobConfig {
     public static final String LINKS = "links";
     public static final String LINKS_STATUS = "status";
     public static final String LINKS_RESULT = "result";
+    public static final String LINKS_TIMELINE = "timeline";
+    public static final String LINKS_LIST = "list";
+    public static final String LINKS_COUNT = "count";
     public static final String DURATION_TIME = "duration_time";
     public static final String LAST_QUERY_TIME = "last_query_time";
     public static final String LONG_TERM_RESULT = "result";
@@ -53,6 +56,7 @@ public class JobConfig {
     public static final String QUERY_DATA_SOURCE = "query.data_source";
     public static final String SAVED_QUERY = "saved_query";
     public static final String FIELD_DISCOVERY = "field_discovery";
+    public static final String LOG_QUERY = "log-query";
     public static final String DATAPATH_PACKET_COMBINE = "datapath_telemetry_packet_combine";
     public static final String TRAFFIC_SPECTRUM_SUMMARY = "traffic-spectrum-summary";
     public static final String TRAFFIC_SPECTRUM_UNIQUE_CLIENT_AND_SERVER_IPS = "traffic-spectrum-unique-client-and-server-ips";
diff --git a/src/main/java/com/mesalab/services/service/impl/JobExecuteService.java b/src/main/java/com/mesalab/services/service/impl/JobExecuteService.java
index 4758e573..cf0ae5d5 100644
--- a/src/main/java/com/mesalab/services/service/impl/JobExecuteService.java
+++ b/src/main/java/com/mesalab/services/service/impl/JobExecuteService.java
@@ -21,6 +21,7 @@ import com.mesalab.common.enums.QueryOption;
 import com.mesalab.common.enums.HttpStatusCodeEnum;
 import com.mesalab.common.exception.BusinessException;
 import com.mesalab.common.utils.HazelcastInstanceMapUtil;
+import com.mesalab.common.utils.sqlparser.AutoPeriodHelper;
 import com.mesalab.common.utils.sqlparser.SQLFunctionUtil;
 import com.mesalab.common.utils.sqlparser.SQLHelper;
 import com.mesalab.qgw.constant.DslIdentifierNameConst;
@@ -79,6 +80,7 @@ public class JobExecuteService implements EnvironmentAware {
     private DSLService dslService;
     private PacketCombineDslService packetCombineDslService;
     private TrafficSpectrumDslService trafficSpectrumDslService;
+    private LogQueryService logQueryService;
     private EngineConfigSource engineConfigSource;
     private JobConfig jobCfg;
     private CustomizedStatisticsService customizedStatisticsService;
@@ -209,6 +211,24 @@ public class JobExecuteService implements EnvironmentAware {
         }
     }
 
+    @Async("lightWeightThreadPool")
+    public void addDslExecutorLogQueryWithCache(DSLQueryRequestParam request, CountDownLatch countDownLatch) {
+        try {
+            markJobBegin(request.getId());
+            updateJobResultOnQueryCache(request.getId(), logQueryService.run(request), null);
+        } catch (RuntimeException e) {
+            markJobFailure(request.getId(), e.getMessage());
+        } finally {
+            try {
+                markJobCompletion(request.getId());
+            } finally {
+                if (countDownLatch != null) {
+                    countDownLatch.countDown();
+                }
+            }
+        }
+    }
+
     public BaseResult addDslExecutorTrafficSpectrumWithoutCache(DSLQueryRequestParam request) {
         try {
@@ -970,4 +990,9 @@ public class JobExecuteService implements EnvironmentAware {
     public void setJobCfg(CustomizedStatisticsService customizedStatisticsService) {
         this.customizedStatisticsService = customizedStatisticsService;
     }
+
+    @Autowired
+    public void setLogQueryService(LogQueryService logQueryService) {
+        this.logQueryService = logQueryService;
+    }
 }
diff --git a/src/main/resources/dsl-sql-template.sql b/src/main/resources/dsl-sql-template.sql
index e94ba6c6..19d31808 100644
--- a/src/main/resources/dsl-sql-template.sql
+++ b/src/main/resources/dsl-sql-template.sql
@@ -54,4 +54,10 @@ SELECT %s FROM $table WHERE $intervals_and_filter %s %s %s $limit
 #end
 #sql("CUSTOMIZED_STATISTICS_SUBQUERY_TOPK")
 (%s) IN (SELECT %s FROM $table WHERE $intervals_and_filter %s %s $limit)
+#end
+#sql("LOG_QUERY_TIMELINE")
+SELECT FROM_UNIXTIME(TIME_FLOOR_WITH_FILL(%s, '$granularity', 'zero')) AS stat_time, COUNT(1) AS count %s FROM $table WHERE $intervals_and_filter GROUP BY stat_time %s
+#end
+#sql("LOG_QUERY_LIST")
+SELECT %s FROM $table WHERE $intervals_and_filter ORDER BY %s $limit
 #end
\ No newline at end of file
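To make the new templates concrete, here is a hedged sketch of how LogQueryServiceImpl.run fills LOG_QUERY_TIMELINE for a session_record query. The $-variables ($granularity, $table, $intervals_and_filter) are expanded by DSLQueryContext.toSql, whose output is not shown in this commit, so the intermediate string below is an assumption; the %s slots are then filled by String.format with the partition key and the per-source type column, exactly as run() does:

class TimelineTemplateSketch {
    public static void main(String[] args) {
        // Assumed output of DSLQueryContext.toSql() for data source
        // "session_record": the $-placeholders are already expanded and the
        // three %s slots are still open (partition key, extra select column,
        // extra GROUP BY column). Timestamps and filter are invented.
        String sqlTemplate = "SELECT FROM_UNIXTIME(TIME_FLOOR_WITH_FILL(%s, 'PT1M', 'zero')) AS stat_time,"
                + " COUNT(1) AS count %s FROM session_record"
                + " WHERE recv_time >= 1726790400 AND recv_time < 1726833600 AND vsys_id in (1)"
                + " GROUP BY stat_time %s";
        String timelineType = ", decoded_as AS type";  // from TIMELINE_SOURCE_TYPE_MAP
        String sql = String.format(sqlTemplate, "recv_time", timelineType,
                timelineType.isEmpty() ? "" : ", type");
        System.out.println(sql);
    }
}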
