summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
author刘永强 <[email protected]>2020-08-28 18:14:44 +0800
committer刘永强 <[email protected]>2020-08-28 18:14:44 +0800
commitf34a66a121a77a30539e7e519b9dc0ecdb413206 (patch)
treeeb2d76b1e8553062e3f9c144a2ea2e1bea0cb6a6
parent6fe6e50eb2d7dbb7417a99535a144da63376f912 (diff)
add TestCaseController
-rw-r--r--galaxy-common/src/main/java/com/mesalab/common/enums/EnvironmentEnum.java30
-rw-r--r--galaxy-common/src/main/java/com/mesalab/common/enums/EnvironmentGroupEnum.java45
-rw-r--r--galaxy-data-engine/src/main/java/com/mesalab/engine/component/configuration/ProjectAuthorProperties.java20
-rw-r--r--galaxy-data-engine/src/main/java/com/mesalab/engine/component/configuration/ProjectProperties.java107
-rw-r--r--galaxy-data-engine/src/main/java/com/mesalab/engine/controller/TestCaseController.java123
-rw-r--r--galaxy-data-engine/src/main/java/com/mesalab/engine/service/impl/TestSqlServiceImpl.java617
6 files changed, 942 insertions, 0 deletions
diff --git a/galaxy-common/src/main/java/com/mesalab/common/enums/EnvironmentEnum.java b/galaxy-common/src/main/java/com/mesalab/common/enums/EnvironmentEnum.java
new file mode 100644
index 0000000..421fb63
--- /dev/null
+++ b/galaxy-common/src/main/java/com/mesalab/common/enums/EnvironmentEnum.java
@@ -0,0 +1,30 @@
+package com.mesalab.common.enums;
+
+import lombok.Getter;
+
+/**
+ * Runtime environment enumeration; values match Spring profile names.
+ *
+ * @author dazzlzy
+ * @date 2018/5/26
+ */
+@Getter
+public enum EnvironmentEnum {
+
+    /**
+     * Development environment.
+     */
+    DEV("dev"),
+    /**
+     * Production environment.
+     */
+    PROD("prod");
+
+    /**
+     * Profile name as it appears in spring.profiles.active; never reassigned.
+     */
+    private final String name;
+
+    EnvironmentEnum(String name) {
+        this.name = name;
+    }
+
+}
diff --git a/galaxy-common/src/main/java/com/mesalab/common/enums/EnvironmentGroupEnum.java b/galaxy-common/src/main/java/com/mesalab/common/enums/EnvironmentGroupEnum.java
new file mode 100644
index 0000000..5e433e0
--- /dev/null
+++ b/galaxy-common/src/main/java/com/mesalab/common/enums/EnvironmentGroupEnum.java
@@ -0,0 +1,45 @@
+package com.mesalab.common.enums;
+
+import lombok.Getter;
+
+/**
+ * Groups of runtime environments.
+ *
+ * @author dazzlzy
+ * @date 2018/5/26
+ */
+@Getter
+public enum EnvironmentGroupEnum {
+
+    /**
+     * RUNTIME environment group:
+     * 1. DEV (development)
+     * 2. PROD (production)
+     */
+    RUNTIME(new EnvironmentEnum[]{EnvironmentEnum.DEV, EnvironmentEnum.PROD});
+
+    /**
+     * Environments belonging to this group; never reassigned after construction.
+     */
+    private final EnvironmentEnum[] environments;
+
+    EnvironmentGroupEnum(EnvironmentEnum[] environments) {
+        this.environments = environments;
+    }
+
+    /**
+     * Whether the given profile name belongs to the RUNTIME group.
+     *
+     * @param s profile name (e.g. "dev", "prod")
+     * @return true if s names a runtime environment
+     */
+    public static boolean isRuntime(String s) {
+        for (EnvironmentEnum environmentEnum : RUNTIME.getEnvironments()) {
+            if (environmentEnum.getName().equals(s)) {
+                return true;
+            }
+        }
+        return false;
+    }
+}
diff --git a/galaxy-data-engine/src/main/java/com/mesalab/engine/component/configuration/ProjectAuthorProperties.java b/galaxy-data-engine/src/main/java/com/mesalab/engine/component/configuration/ProjectAuthorProperties.java
new file mode 100644
index 0000000..7b1ad67
--- /dev/null
+++ b/galaxy-data-engine/src/main/java/com/mesalab/engine/component/configuration/ProjectAuthorProperties.java
@@ -0,0 +1,20 @@
+package com.mesalab.engine.component.configuration;
+
+import lombok.Data;
+
+/**
+ * Project author contact information, bound from configuration as part of
+ * {@code ProjectProperties}.
+ *
+ * @author dazzlzy
+ * @date 2018/5/26
+ */
+@Data
+public class ProjectAuthorProperties {
+
+    // Author display name
+    private String name;
+
+    // Author homepage URL
+    private String url;
+
+    // Author contact email
+    private String email;
+
+}
diff --git a/galaxy-data-engine/src/main/java/com/mesalab/engine/component/configuration/ProjectProperties.java b/galaxy-data-engine/src/main/java/com/mesalab/engine/component/configuration/ProjectProperties.java
new file mode 100644
index 0000000..8ab3250
--- /dev/null
+++ b/galaxy-data-engine/src/main/java/com/mesalab/engine/component/configuration/ProjectProperties.java
@@ -0,0 +1,107 @@
+package com.mesalab.engine.component.configuration;
+
+import com.mesalab.common.enums.EnvironmentEnum;
+import com.mesalab.common.enums.EnvironmentGroupEnum;
+import lombok.Data;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.boot.context.properties.ConfigurationProperties;
+import org.springframework.core.env.Environment;
+import org.springframework.stereotype.Component;
+import org.springframework.stereotype.Repository;
+
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * Project configuration, bound from properties under the "project" prefix.
+ *
+ * @author dazzlzy
+ * @date 2018/5/26
+ */
+@Data
+// @Component, not @Repository: this is a configuration bean, not a data-access
+// component; @Repository would wrongly opt it into persistence exception translation.
+@Component
+@ConfigurationProperties("project")
+public class ProjectProperties {
+
+    /**
+     * Project name.
+     */
+    private String name;
+
+    /**
+     * Project version.
+     */
+    private String version;
+
+    /**
+     * Project description.
+     */
+    private String description;
+
+    /**
+     * Maven groupId.
+     */
+    private String groupId;
+
+    /**
+     * Maven artifactId.
+     */
+    private String artifactId;
+
+    /**
+     * Project base directory.
+     */
+    private String basedir;
+
+    /**
+     * Core project package.
+     */
+    private String corePackage;
+
+    /**
+     * Business/service project package.
+     */
+    private String servicePackage;
+
+    /**
+     * Active profile values for the current run.
+     */
+    private String[] env;
+
+    /**
+     * Project author.
+     */
+    private ProjectAuthorProperties author;
+
+    /**
+     * Injected Spring environment context.
+     */
+    private final Environment environment;
+
+    @Autowired
+    public ProjectProperties(Environment environment) {
+        this.environment = environment;
+        // Snapshot the active profiles once at construction time.
+        this.env = environment.getActiveProfiles();
+    }
+
+    /**
+     * Whether this is the production environment: the LAST runtime profile
+     * decides. E.g. with spring.profiles.active=dev,prod,mysql the runtime
+     * profiles are [dev, prod]; the last one is prod, so this returns true.
+     *
+     * @return boolean true when running in production
+     */
+    public boolean isProduct() {
+        List<String> runtimeEnvs = new ArrayList<>();
+        for (String s : this.env) {
+            if (EnvironmentGroupEnum.isRuntime(s)) {
+                runtimeEnvs.add(s);
+            }
+        }
+        if (runtimeEnvs.isEmpty()) {
+            return false;
+        }
+        String lastRuntimeEnv = runtimeEnvs.get(runtimeEnvs.size() - 1);
+        return EnvironmentEnum.PROD.getName().equals(lastRuntimeEnv);
+    }
+
+}
diff --git a/galaxy-data-engine/src/main/java/com/mesalab/engine/controller/TestCaseController.java b/galaxy-data-engine/src/main/java/com/mesalab/engine/controller/TestCaseController.java
new file mode 100644
index 0000000..64f87dc
--- /dev/null
+++ b/galaxy-data-engine/src/main/java/com/mesalab/engine/controller/TestCaseController.java
@@ -0,0 +1,123 @@
+package com.mesalab.engine.controller;
+
+import com.mesalab.common.dto.results.BaseResult;
+import com.mesalab.common.util.BaseResultUtil;
+import com.mesalab.engine.component.configuration.ProjectProperties;
+import com.mesalab.engine.service.impl.TestSqlServiceImpl;
+import lombok.extern.slf4j.Slf4j;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.core.env.Environment;
+import org.springframework.web.bind.annotation.GetMapping;
+import org.springframework.web.bind.annotation.RequestMapping;
+import org.springframework.web.bind.annotation.RestController;
+
+import java.util.Arrays;
+
+/**
+ * Test controller. @RestController implies @ResponseBody on every handler,
+ * so return values are serialized to JSON automatically.
+ *
+ * @author darnell
+ * @date 2019/5/13
+ */
+@Slf4j
+@RestController
+@RequestMapping(value = "test")
+public class TestCaseController {
+
+    private final ProjectProperties projectProperties;
+
+    private final Environment environment;
+
+    // Constructor injection (rather than field injection) keeps all
+    // dependencies final and consistent with the other two collaborators.
+    private final TestSqlServiceImpl testSqlService;
+
+    @Autowired
+    public TestCaseController(ProjectProperties projectProperties, Environment environment,
+                              TestSqlServiceImpl testSqlService) {
+        this.projectProperties = projectProperties;
+        this.environment = environment;
+        this.testSqlService = testSqlService;
+    }
+
+    /**
+     * Smoke test for configuration binding.
+     * @GetMapping is shorthand for @RequestMapping(method = {RequestMethod.GET}).
+     *
+     * @return BaseResult
+     */
+    @GetMapping(value = "projectProperties")
+    public BaseResult projectProperties() {
+        // The injected projectProperties bean carries Spring proxy internals;
+        // serializing it with lombok @Data-based BaseResult makes the payload
+        // too large and fails, so only success is returned.
+        return BaseResultUtil.success();
+    }
+
+    /**
+     * Returns the active profiles (Environment.getActiveProfiles, String[]).
+     *
+     * @return the current project's active profile values
+     */
+    @GetMapping(value = "activeProfiles")
+    public BaseResult activeProfiles() {
+        String[] activeProfiles = environment.getActiveProfiles();
+        log.info("Active Profiles: {}", Arrays.toString(activeProfiles));
+        return BaseResultUtil.success(activeProfiles);
+    }
+
+    /**
+     * Returns the runtime env values copied from Environment.activeProfiles
+     * into projectProperties at construction time.
+     *
+     * @return the current project's runtime environment values
+     */
+    @GetMapping(value = "env")
+    public BaseResult env() {
+        String[] env = projectProperties.getEnv();
+        log.info("Project env: {}", Arrays.toString(env));
+        return BaseResultUtil.success(env);
+    }
+
+    /**
+     * Whether the current deployment is the production environment.
+     *
+     * @return message describing the current runtime environment
+     */
+    @GetMapping(value = "isProduct")
+    public BaseResult isProduct() {
+        boolean isProduct = projectProperties.isProduct();
+        String msg = "Current Environment is" + (isProduct ? "" : " not") + " product";
+        log.info(msg);
+        return BaseResultUtil.success(msg);
+    }
+
+    /**
+     * Authorization check endpoint; reaching it at all means the caller
+     * passed whatever authentication filters are configured.
+     *
+     * @return success
+     */
+    @GetMapping(value = "checkAuthc")
+    public BaseResult checkAuthc() {
+        return BaseResultUtil.success();
+    }
+
+    /**
+     * Runs the SQL test cases.
+     *
+     * @return result of the test-case run
+     */
+    @GetMapping(value = "runSql")
+    public BaseResult runSql() {
+        return testSqlService.runSql();
+    }
+
+    /**
+     * Validates the schema definitions.
+     *
+     * @return result of the schema validation
+     */
+    @GetMapping(value = "runSchema")
+    public BaseResult runSchema() {
+        return testSqlService.runSchema();
+    }
+
+}
diff --git a/galaxy-data-engine/src/main/java/com/mesalab/engine/service/impl/TestSqlServiceImpl.java b/galaxy-data-engine/src/main/java/com/mesalab/engine/service/impl/TestSqlServiceImpl.java
new file mode 100644
index 0000000..d40afe1
--- /dev/null
+++ b/galaxy-data-engine/src/main/java/com/mesalab/engine/service/impl/TestSqlServiceImpl.java
@@ -0,0 +1,617 @@
+package com.mesalab.engine.service.impl;
+
+import com.google.common.collect.Maps;
+import com.mesalab.common.dto.results.BaseResult;
+import com.mesalab.common.enums.ResultStatusEnum;
+import com.mesalab.common.exception.BusinessException;
+import com.mesalab.common.util.BaseResultUtil;
+import com.mesalab.engine.component.SchemaCache;
+import com.mesalab.engine.component.config.ClickHouseConfig;
+import com.mesalab.engine.component.config.DruidConfig;
+import com.mesalab.engine.service.HttpClientService;
+import com.zdjizhi.utils.Encodes;
+import com.zdjizhi.utils.JsonMapper;
+import com.zdjizhi.utils.StringUtil;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.avro.JsonProperties;
+import org.apache.avro.Schema;
+import org.apache.avro.data.Json;
+import org.apache.http.HttpStatus;
+import org.apache.http.NameValuePair;
+import org.apache.http.client.utils.URLEncodedUtils;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.http.HttpHeaders;
+import org.springframework.http.MediaType;
+import org.springframework.stereotype.Service;
+
+import java.io.File;
+import java.net.URLDecoder;
+import java.net.URLEncoder;
+import java.nio.charset.Charset;
+import java.text.SimpleDateFormat;
+import java.util.*;
+
+@Slf4j
+@Service
+public class TestSqlServiceImpl {
+
+ @Autowired
+ HttpClientService httpClientService;
+ @Autowired
+ ClickHouseConfig clickHouseConfig;
+ @Autowired
+ SqlEngineServiceImpl sqlEngineService;
+ @Autowired
+ DruidConfig druidConfig;
+ @Autowired
+ SchemaCache schemaCache;
+
+ List errorDate;
+
+ /**
+ * 运行测试用例
+ *
+ * @return
+ */
+ public BaseResult runSql() {
+ long start = System.currentTimeMillis();
+ List list = ckSql();
+ List listDruid = druidSql();
+ list.addAll(listDruid);
+ Map<String, String> resultTemp;
+ BaseResult result = null;
+ List<Object> data = new ArrayList<>();
+ int countSuccess = 0;
+ SimpleDateFormat format1 = new SimpleDateFormat("yyyy-MM-dd HH:00:00");
+ SimpleDateFormat format2 = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
+ Date date = new Date(System.currentTimeMillis());
+ for (int i = 0; i < list.size(); i++) {
+ String sql = String.valueOf(list.get(i));
+ try {
+ sql = sql.replaceAll("\\$start_time", "'" + format1.format(date) + "'");
+ sql = sql.replaceAll("\\$end_time", "'" + format2.format(date) + "'");
+ sql = URLEncoder.encode(sql, "utf-8").replaceAll("\\+", "%20");
+ } catch (Exception e) {
+ e.printStackTrace();
+ log.error("测试用例sql编码错误", e);
+ }
+ resultTemp = httpClientService.httpGet("http://127.0.0.1:9999/?query=" + sql);
+ if (resultTemp.get("status").equals(String.valueOf(HttpStatus.SC_OK))) {
+ resultTemp.remove("result");
+ countSuccess++;
+ } else {
+ try {
+ resultTemp.put("query", URLDecoder.decode(sql, "UTF-8"));
+ } catch (Exception e) {
+ e.printStackTrace();
+ log.error("sql查询结果处理异常", e);
+ }
+ }
+ data.add(resultTemp);
+ }
+
+ Collections.sort(data, (o1, o2) -> {
+ Map<String, Object> oneMap = (Map<String, Object>) o1;
+ Map<String, Object> twoMap = (Map<String, Object>) o2;
+ return Integer.valueOf(twoMap.get("status").toString()).compareTo(Integer.valueOf(oneMap.get("status").toString()));
+ });
+ long cost = System.currentTimeMillis() - start;
+ HashMap<String, Object> statistics = new HashMap<>();
+ statistics.put("elapsed", String.format("%.2f", (double) cost / 1000));
+ statistics.put("total", list.size());
+ statistics.put("success", countSuccess);
+ String message = countSuccess == list.size() ? "ok" : "error";
+ result = BaseResultUtil.success(message, data, statistics);
+ return result;
+ }
+
+    /**
+     * Clears the schema cache, then validates the ClickHouse and Druid schema
+     * definitions. The check* helpers (defined later in this class, outside
+     * this view) presumably append failures to {@code errorDate} — confirm.
+     *
+     * @return BaseResult whose message is "ok" only when no errors were collected
+     */
+    public BaseResult runSchema() {
+        schemaCache.removeAll();
+        long start = System.currentTimeMillis();
+        errorDate = new ArrayList();
+        checkCKSchema("/schema/clickhouse");
+        checkDruidSchema("/schema/druid");
+        checkSchema("/schema/clickhouse");
+        checkSchema("/schema/druid");
+        long cost = System.currentTimeMillis() - start;
+        HashMap<String, Object> statistics = new HashMap<>();
+        statistics.put("elapsed", String.format("%.2f", (double) cost / 1000));
+        String message = errorDate.size() == 0 ? "ok" : "error";
+        return BaseResultUtil.success(message, errorDate, statistics);
+    }
+
+
+    /**
+     * Builds the catalogue of ClickHouse test-case SQL statements. The
+     * $start_time / $end_time placeholders are substituted by runSql()
+     * before execution.
+     *
+     * @return list of SQL strings
+     */
+    public List ckSql() {
+        List<String> list = new ArrayList<>();
+        list.add("select toDateTime(common_recv_time) as common_recv_time,common_subscriber_id, common_address_type, common_l4_protocol, common_client_ip, common_server_ip, common_server_port, ssl_sni, ssl_version from security_event_log where common_recv_time >= toDateTime($start_time) and common_recv_time< toDateTime($end_time) and common_client_ip like '49.7%' order by common_recv_time desc limit 0,20");
+        list.add("select FROM_UNIXTIME(common_recv_time) as common_recv_time,common_subscriber_id, common_address_type, common_l4_protocol, common_client_ip, common_server_ip, common_server_port, ssl_sni, ssl_version from security_event_log where common_recv_time >= UNIX_TIMESTAMP($start_time) and common_recv_time< UNIX_TIMESTAMP($end_time) and common_client_ip like '49.7%' order by common_recv_time desc limit 0,20");
+        list.add("select toDateTime(common_recv_time) as common_recv_time,common_subscriber_id, common_address_type, common_l4_protocol, common_client_ip, common_server_ip, common_server_port, http_host,http_url,http_user_agent from proxy_event_log where common_recv_time >= toDateTime($start_time) and common_recv_time< toDateTime($end_time) and http_host like '%joy.cn%' order by common_recv_time desc limit 0,20");
+        list.add("select FROM_UNIXTIME(common_recv_time) as common_recv_time,common_subscriber_id, common_address_type, common_l4_protocol, common_client_ip, common_server_ip, common_server_port, http_host,http_url,http_user_agent from proxy_event_log where common_recv_time >= UNIX_TIMESTAMP($start_time) and common_recv_time< UNIX_TIMESTAMP($end_time) and http_host like '%joy.cn%' order by common_recv_time desc limit 0,20");
+        list.add("select toDateTime(common_recv_time) as common_recv_time, common_address_type, common_l4_protocol, common_client_ip, common_server_ip, common_server_port, radius_framed_ip, radius_account from radius_record_log where common_recv_time >= toDateTime($start_time) and common_recv_time< toDateTime($start_time) and radius_account='T1yRd' order by common_recv_time desc limit 0,20");
+        list.add("select FROM_UNIXTIME(common_recv_time) as common_recv_time,common_subscriber_id, common_address_type, common_l4_protocol, common_client_ip, common_server_ip, common_server_port, http_host,http_url,http_user_agent from proxy_event_log where common_recv_time >= UNIX_TIMESTAMP($start_time) and common_recv_time< UNIX_TIMESTAMP($end_time) and http_host like '%joy.cn%' order by common_recv_time desc limit 0,20");
+        list.add("select toDateTime(common_recv_time) as common_recv_time, common_address_type, common_l4_protocol, common_client_ip, common_server_ip, common_server_port, radius_framed_ip, radius_account from radius_record_log where common_recv_time >= toDateTime($start_time) and common_recv_time< toDateTime($start_time) and radius_account='T1yRd' order by common_recv_time desc limit 0,20");
+        list.add("select FROM_UNIXTIME(common_recv_time) as common_recv_time, common_address_type, common_l4_protocol, common_client_ip, common_server_ip, common_server_port, radius_framed_ip, radius_account from radius_record_log where common_recv_time >= UNIX_TIMESTAMP($start_time) and common_recv_time< UNIX_TIMESTAMP($end_time) and radius_account='T1yRd' order by common_recv_time desc limit 0,20");
+
+        list.add("select count(1) from connection_record_log");
+        list.add("select count(*) from connection_record_log");
+        list.add("select * from " + clickHouseConfig.getDbName() + ".connection_record_log limit 1");
+
+        // Radius user analysis
+        // 1. client IP changes requested by a radius account
+        list.add("select framed_ip, arraySlice(groupUniqArray(concat(toString(event_timestamp),':', if(acct_status_type=1,'start','stop'))),1,100000) as timeseries \n" +
+                "from radius_onff_log where event_timestamp >=toDateTime($start_time) and event_timestamp <toDateTime($end_time) and account='admin' group by framed_ip");
+        list.add("select framed_ip, arraySlice(groupUniqArray(concat(toString(event_timestamp),':', if(acct_status_type=1,'start','stop'))),1,100000) as timeseries \n" +
+                "from radius_onff_log where event_timestamp >=$start_time and event_timestamp < $end_time and account='admin' group by framed_ip");
+        // 2. accounts carried by a given user IP over time
+        list.add("select account, arraySlice(groupUniqArray(concat(toString(event_timestamp),':', if(acct_status_type=1,'start','stop'))),1,100000) as timeseries \n" +
+                "from radius_onff_log where event_timestamp >= $start_time and event_timestamp < $end_time and framed_ip='127.0.0.1' group by account");
+        list.add("select account, arraySlice(groupUniqArray(concat(toString(event_timestamp),':', if(acct_status_type=1,'start','stop'))),1,100000) as timeseries \n" +
+                "from radius_onff_log where event_timestamp >= $start_time and event_timestamp < $end_time and framed_ip='127.0.0.1' group by account");
+        // Custom report API
+        // I. preset Internal Hosts report
+        list.add("select common_client_ip, count(*) as sessions from connection_record_log where common_recv_time>= toStartOfDay(toDateTime($start_time))-604800 and common_recv_time< toStartOfDay(toDateTime($end_time)) group by common_client_ip order by sessions desc limit 0,100");
+        // II. preset External Hosts report
+        list.add("select common_server_ip, count(*) as sessions from connection_record_log where common_recv_time>= toStartOfDay(toDateTime($start_time))-604800 and common_recv_time< toStartOfDay(toDateTime($start_time)) group by common_server_ip order by sessions desc limit 0,100");
+        // III. preset Domains report
+        list.add("select http_domain AS domain,SUM(coalesce(common_c2s_byte_num, 0)) AS sent_bytes,SUM(coalesce(common_s2c_byte_num, 0)) AS received_bytes,SUM(coalesce(common_c2s_byte_num, 0)+coalesce(common_s2c_byte_num, 0)) AS bytes FROM connection_record_log WHERE common_recv_time >= toStartOfDay(toDateTime($start_time))-86400 AND common_recv_time < toStartOfDay(toDateTime($start_time)) and notEmpty(domain) GROUP BY domain ORDER BY bytes DESC LIMIT 100");
+        list.add("select toDateTime(intDiv(toUInt32(toDateTime(toDateTime(common_recv_time))), 300)*300) as stat_time, http_domain, uniq (common_client_ip) as nums from connection_record_log where common_recv_time >= toStartOfDay(toDateTime($start_time))-86400 AND common_recv_time < toStartOfDay(toDateTime($start_time)) and http_domain in (select http_domain from connection_record_log where common_recv_time >= toStartOfDay(toDateTime($start_time))-86400 AND common_recv_time < toStartOfDay(toDateTime($start_time)) and notEmpty(http_domain) group by http_domain order by SUM(coalesce(common_c2s_byte_num, 0)+coalesce(common_s2c_byte_num, 0)) desc limit 10 ) group by toDateTime(intDiv(toUInt32(toDateTime(toDateTime(common_recv_time))), 300)*300), http_domain order by stat_time asc limit 500\n");
+        list.add("SELECT http_host as host, SUM(coalesce(common_c2s_byte_num, 0)) AS sent_bytes,SUM(coalesce(common_s2c_byte_num, 0)) AS received_bytes,SUM(coalesce(common_c2s_byte_num, 0)+coalesce(common_s2c_byte_num, 0)) AS bytes FROM connection_record_log WHERE common_recv_time>= toStartOfDay(toDateTime($start_time))-604800 and common_recv_time< toStartOfDay(toDateTime($start_time)) and notEmpty(http_host) GROUP BY host ORDER BY bytes DESC " +
+                "union all " +
+                "SELECT 'totals' as host, SUM(coalesce(common_c2s_byte_num, 0)) AS sent_bytes, SUM(coalesce(common_s2c_byte_num, 0)) AS received_bytes, SUM(coalesce(common_c2s_byte_num, 0)+coalesce(common_s2c_byte_num, 0)) AS bytes from connection_record_log where common_recv_time>= toStartOfDay(toDateTime($start_time))-604800 and common_recv_time< toStartOfDay(toDateTime($start_time)) and notEmpty(http_host)");
+        // IV. preset HTTP/HTTPS URLs report
+        list.add("SELECT http_url AS url,count(*) AS sessions FROM proxy_event_log WHERE common_recv_time >= toStartOfDay(toDateTime($start_time))-86400 AND common_recv_time < toStartOfDay(toDateTime($start_time)) and notEmpty(http_url) GROUP BY url ORDER BY sessions DESC LIMIT 100");
+        list.add("select toDateTime(intDiv(toUInt32(toDateTime(toDateTime(common_recv_time))), 300)*300) as stat_time, http_url, count(distinct(common_client_ip)) as nums from proxy_event_log where common_recv_time >= toStartOfDay(toDateTime($start_time))-86400 AND common_recv_time < toStartOfDay(toDateTime($start_time)) and http_url IN (select http_url from proxy_event_log where common_recv_time >= toStartOfDay(toDateTime($start_time))-86400 AND common_recv_time < toStartOfDay(toDateTime($start_time)) and notEmpty(http_url) group by http_url order by count(*) desc limit 10 )group by toDateTime(intDiv(toUInt32(toDateTime(toDateTime(common_recv_time))), 300)*300), http_url order by stat_time asc limit 500");
+        list.add("select common_subscriber_id as user, count(*) as sessions from connection_record_log where common_recv_time>= toStartOfDay(toDateTime($start_time))-604800 and common_recv_time< toStartOfDay(toDateTime($start_time)) and notEmpty(user) group by common_subscriber_id order by sessions desc limit 0,100");
+        list.add("SELECT common_subscriber_id as user,SUM(coalesce(common_c2s_byte_num, 0)) AS sent_bytes,SUM(coalesce(common_s2c_byte_num, 0)) AS received_bytes,SUM(coalesce(common_c2s_byte_num, 0)+coalesce(common_s2c_byte_num, 0)) AS bytes FROM connection_record_log WHERE common_recv_time>= toStartOfDay(toDateTime($start_time))-604800 and common_recv_time< toStartOfDay(toDateTime($start_time)) and notEmpty(user) GROUP BY user ORDER BY bytes DESC LIMIT 100");
+
+        // RADIUS account totals
+        list.add("select count(distinct(framed_ip)) as active_ip_num , sum(acct_session_time) as online_duration from (select any(framed_ip) as framed_ip ,max(acct_session_time) as acct_session_time from radius_onff_log where account='000jS' and event_timestamp >= $start_time and event_timestamp < $end_time group by acct_session_id)");
+        // RADIUS account IP details
+        list.add("select distinct(framed_ip) as framed_ip from radius_onff_log where account='000iS' and event_timestamp >= $start_time and event_timestamp < $end_time");
+        // RADIUS account access details
+        list.add("select max(if(acct_status_type=1,event_timestamp,0)) as start_time,max(if(acct_status_type=2,event_timestamp,0)) as end_time, any(framed_ip) as ip,max(acct_session_time) as online_duration from radius_onff_log where account='000jS' and event_timestamp >= $start_time and event_timestamp < $end_time group by acct_session_id order by start_time desc limit 200");
+        // Target resource analysis
+        list.add("SELECT domain, groupUniqArrayMerge(ip_list) as ip_list, groupUniqArrayMerge(cdn_list) as cdn_list, groupUniqArrayMerge(protocol_type_list) as protocol_type_list, groupUniqArrayMerge(port_list) as port_list FROM security_website_domain_info where stat_time >= $start_time and stat_time < $end_time and policy_id=0 group by domain");
+        // Domains associated with a given IP
+        list.add("SELECT ip, FROM_UNIXTIME(max(stat_time)) as last_time, FROM_UNIXTIME(min(stat_time)) as first_time, groupUniqArrayMerge(domain_list) as domain_list, groupUniqArrayMerge(port_list) as port_list FROM security_ip_info where stat_time >= $start_time and stat_time < $end_time and ip='192.168.50.70' group by ip");
+        // Custom function tests
+        list.add("SELECT policy_id, APPROX_COUNT_DISTINCT_DS_HLL(isp) as num FROM proxy_event_hits_log where __time >= '2020-04-05 00:00:00' and __time < '2020-05-05 00:00:00' and policy_id=0 group by policy_id");
+        list.add("SELECT ip, IP_TO_CITY(ip) as location, IP_TO_GEO(ip) as geo from (SELECT policy_id, arrayJoin(groupUniqArrayMerge(ip_list)) as ip FROM proxy_ip_info where stat_time >= $start_time and stat_time < $end_time and policy_id=0 group by policy_id)");
+        list.add("select TIME_FLOOR_WITH_FILL(common_recv_time,'PT5M','previous') as stat_time from connection_record_log where common_recv_time > $start_time and common_recv_time < $end_time group by stat_time");
+
+        return list;
+    }
+
+ public List druidSql() {
+ List<String> list = new ArrayList<>();
+ //安全策略命中统计
+ //1. 某策略命中计数
+ list.add("select policy_id, sum(hits) as hits from security_event_hits_log where __time >$start_time and __time <$end_time and policy_id=40 group by policy_id");
+ list.add("select policy_id, sum(hits) as hits from security_event_hits_log where __time >$start_time and __time <$end_time and policy_id in (9,10,88,45) group by policy_id");
+ //2. 某策略命中计数趋势
+ list.add("select TIME_FORMAT(time_floor(__time,'PT5M'),'yyyy-MM-dd HH:mm:ss') as start_time, sum(hits) as hits from security_event_hits_log where __time >= TIMESTAMP $start_time and __time < TIMESTAMP $end_time and policy_id=10 group by TIME_FORMAT(time_floor(__time,'PT5M'),'yyyy-MM-dd HH:mm:ss') limit 100");
+ list.add("select DATE_FORMAT(FROM_UNIXTIME(FLOOR(UNIX_TIMESTAMP(__time)/300)*300),'%Y-%m-%d %H:%i:%s') as start_time, sum(hits) as hits from security_event_hits_log where __time >= $start_time and __time < $end_time and policy_id=10 group by DATE_FORMAT(FROM_UNIXTIME(FLOOR(UNIX_TIMESTAMP(__time)/300)*300),'%Y-%m-%d %H:%i:%s') limit 100");
+ //3. 某策略命中时间(首次和最近一次)
+ list.add("select policy_id,TIME_FORMAT(min(__time) ,'yyyy-MM-dd HH:mm:ss') as first_used, TIME_FORMAT(max(__time) ,'yyyy-MM-dd HH:mm:ss') as last_used from security_event_hits_log where policy_id in (100,101 ,105, 102) group by policy_id");
+ list.add("select policy_id, DATE_FORMAT(min(__time) ,'%Y-%m-%d %H:%i:%s') as first_used, DATE_FORMAT(max(__time) ,'%Y-%m-%d %H:%i:%s') as last_used from security_event_hits_log where policy_id in (100,101 ,105, 102) group by policy_id");
+ //4. TopN 命中策略
+ list.add("select policy_id, action, sum(hits) as hits from security_event_hits_log where __time >=TIMESTAMP $start_time and __time <TIMESTAMP $end_time group by policy_id, action order by hits desc limit 200");
+ list.add("select policy_id, action, sum(hits) as hits from security_event_hits_log where __time >=$start_time and __time <$end_time group by policy_id, action order by hits desc limit 200");
+ //二、 代理策略命中统计
+ //1. 某策略命中计数
+ list.add("select policy_id, sum(hits) as hits from proxy_event_hits_log where __time >=$start_time and __time <$end_time and policy_id=100 group by policy_id");
+ list.add("select policy_id, sum(hits) as hits from proxy_event_hits_log where __time >=$start_time and __time <$end_time and policy_id=100 group by policy_id");
+ //2. 某策略命中趋势
+ list.add("select TIME_FORMAT(time_floor(__time,'PT5M'),'yyyy-MM-dd HH:mm:ss') as start_time, sum(hits) as hits from proxy_event_hits_log where __time >= TIMESTAMP $start_time and __time <TIMESTAMP $end_time and policy_id=100 group by TIME_FORMAT(time_floor(__time,'PT5M'),'yyyy-MM-dd HH:mm:ss') limit 101");
+ list.add("select FROM_UNIXTIME(FLOOR(UNIX_TIMESTAMP(__time)/300)*300) as start_time, sum(hits) as hits from proxy_event_hits_log where __time >= $start_time and __time < $end_time and policy_id=100 group by FROM_UNIXTIME(FLOOR(UNIX_TIMESTAMP(__time)/300)*300) limit 101");
+ //3.某策略命中时间(首次和最近一次
+ list.add("select policy_id,TIME_FORMAT(min(__time) ,'yyyy-MM-dd HH:mm:ss') as first_used, TIME_FORMAT(max(__time) ,'yyyy-MM-dd HH:mm:ss') as last_used from proxy_event_hits_log where policy_id in (100,101,102,105) group by policy_id");
+ list.add("select policy_id, DATE_FORMAT(min(__time) ,'%Y-%m-%d %H:%i:%s') as first_used, DATE_FORMAT(max(__time) ,'%Y-%m-%d %H:%i:%s') as last_used from proxy_event_hits_log where policy_id in (100,101,102,105) group by policy_id");
+ //4. TopN 命中策略
+ list.add("select policy_id, sub_action as action, sum(hits) as hits from proxy_event_hits_log where __time >=TIMESTAMP $start_time and __time <TIMESTAMP $end_time group by policy_id, sub_action order by hits desc limit 200");
+ list.add("select policy_id, sub_action as action, sum(hits) as hits from proxy_event_hits_log where __time >=$start_time and __time <$end_time group by policy_id, sub_action order by hits desc limit 200");
+ //5. Proxy 操纵动作命中计数
+ list.add("select sub_action as action, sum(hits) as hits from proxy_event_hits_log where __time >= TIMESTAMP $start_time and __time < TIMESTAMP $end_time group by sub_action");
+ list.add("select sub_action as action, sum(hits) as hits from proxy_event_hits_log where __time >= $start_time and __time < $end_time group by sub_action");
+ //6. Proxy 操纵动作命中趋势
+ list.add("select TIME_FORMAT(time_floor(__time,'PT5M'),'yyyy-MM-dd HH:mm:ss') as start_time, sub_action as action, sum(hits) as hits from proxy_event_hits_log where __time >=TIMESTAMP $start_time and __time <TIMESTAMP $end_time group by TIME_FORMAT(time_floor(__time,'PT5M'),'yyyy-MM-dd HH:mm:ss') , sub_action limit 100\n");
+ list.add("select DATE_FORMAT(FROM_UNIXTIME(FLOOR(UNIX_TIMESTAMP(__time)/300)*300),'%Y-%m-%d %H:%i:%s') as start_time, sub_action as action, sum(hits) as hits from proxy_event_hits_log where __time >= $start_time and __time < $end_time group by DATE_FORMAT(FROM_UNIXTIME(FLOOR(UNIX_TIMESTAMP(__time)/300)*300),'%Y-%m-%d %H:%i:%s'), sub_action limit 100\n");
+ //7. Proxy Pinning TIMESTAMP 计数
+ list.add("select sum(hits) as hits, 'not_pinning_num' as type from security_event_hits_log where __time >= $start_time and __time < $end_time and pinningst=0 union all select sum(hits) as hits, 'pinning_num' as type from security_event_hits_log where __time >= $start_time and __time < $end_time and pinningst=1 union all select sum(hits) as hits, 'maybe_pinning_num' as type from security_event_hits_log where __time >=TIMESTAMP $start_time and __time < TIMESTAMP $end_time and pinningst=2");
+ list.add("select sum(hits) as hits, 'not_pinning_num' as type from security_event_hits_log where __time >= $start_time and __time < $end_time and pinningst=0 union all select sum(hits) as hits, 'pinning_num' as type from security_event_hits_log where __time >= $start_time and __time < $end_time and pinningst=1 union all select sum(hits) as hits, 'maybe_pinning_num' as type from security_event_hits_log where __time >= $start_time and __time < $end_time and pinningst=2");
+ //8.Proxy Pinning计数趋势
+ list.add("select TIME_FORMAT(time_floor(__time,'PT5M'),'yyyy-MM-dd HH:mm:ss') as start_time, 'not_pinning_num' as type, sum(hits) as hits from security_event_hits_log where __time >=TIMESTAMP $start_time and __time <TIMESTAMP $end_time and pinningst=0 group by TIME_FORMAT(time_floor(__time,'PT5M'),'yyyy-MM-dd HH:mm:ss')\n" +
+ "union all\n" +
+ "select TIME_FORMAT(time_floor(__time,'PT5M'),'yyyy-MM-dd HH:mm:ss') as start_time, 'pinning_num' as type, sum(hits) as hits from security_event_hits_log where __time >=TIMESTAMP $start_time and __time <TIMESTAMP $end_time and pinningst=1 group by TIME_FORMAT(time_floor(__time,'PT5M'),'yyyy-MM-dd HH:mm:ss')\n" +
+ "union all\n" +
+ "select TIME_FORMAT(time_floor(__time,'PT5M'),'yyyy-MM-dd HH:mm:ss') as start_time, 'maybe_pinning_num' as type, sum(hits) as hits from security_event_hits_log where __time >=TIMESTAMP $start_time and __time <TIMESTAMP $end_time and pinningst=2 group by TIME_FORMAT(time_floor(__time,'PT5M'),'yyyy-MM-dd HH:mm:ss')");
+ list.add("select DATE_FORMAT(FROM_UNIXTIME(FLOOR(UNIX_TIMESTAMP(__time)/300)*300),'%Y-%m-%d %H:%i:%s') as start_time, 'not_pinning_num' as type, sum(hits) as hits from security_event_hits_log where __time >= $start_time and __time < $end_time and pinningst=0 group by DATE_FORMAT(FROM_UNIXTIME(FLOOR(UNIX_TIMESTAMP(__time)/300)*300),'%Y-%m-%d %H:%i:%s')\n" +
+ "union all\n" +
+ "select DATE_FORMAT(FROM_UNIXTIME(FLOOR(UNIX_TIMESTAMP(__time)/300)*300),'%Y-%m-%d %H:%i:%s') as start_time, 'pinning_num' as type, sum(hits) as hits from security_event_hits_log where __time >= $start_time and __time < $end_time and pinningst=1 group by DATE_FORMAT(FROM_UNIXTIME(FLOOR(UNIX_TIMESTAMP(__time)/300)*300),'%Y-%m-%d %H:%i:%s')\n" +
+ "union all\n" +
+ "select DATE_FORMAT(FROM_UNIXTIME(FLOOR(UNIX_TIMESTAMP(__time)/300)*300),'%Y-%m-%d %H:%i:%s') as start_time, 'maybe_pinning_num' as type, sum(hits) as hits from security_event_hits_log where __time >= $start_time and __time < $end_time and pinningst=2 group by DATE_FORMAT(FROM_UNIXTIME(FLOOR(UNIX_TIMESTAMP(__time)/300)*300),'%Y-%m-%d %H:%i:%s')");
+ //三、 Traffics-带宽统计
+ //1. Traffic IN/OUT 计数
+ //Bytes
+ list.add("select sum(total_in_bytes) as traffic_in_bytes, sum(total_out_bytes) as traffic_out_bytes from traffic_metrics_log where __time >=TIMESTAMP $start_time and __time <TIMESTAMP $end_time");
+ list.add("select sum(total_in_bytes) as traffic_in_bytes, sum(total_out_bytes) as traffic_out_bytes from traffic_metrics_log where __time >= $start_time and __time < $end_time");
+ //Packets
+ list.add("select sum(total_in_packets) as traffic_in_packets, sum(total_out_packets) as traffic_out_packets from traffic_metrics_log where __time >=TIMESTAMP $start_time and __time <TIMESTAMP $end_time");
+ list.add("select sum(total_in_packets) as traffic_in_packets, sum(total_out_packets) as traffic_out_packets from traffic_metrics_log where __time >= $start_time and __time < $end_time");
+ //Sessions
+ list.add("select sum(new_conn_num) as sessions from traffic_metrics_log where __time >=TIMESTAMP $start_time and __time <TIMESTAMP $end_time");
+ list.add("select sum(new_conn_num) as sessions from traffic_metrics_log where __time >= $start_time and __time < $end_time");
+ //2. Traffic IN/OUT 带宽趋势
+ //Bytes
+ list.add("select TIME_FORMAT(time_floor(__time,'PT30S'),'yyyy-MM-dd HH:mm:ss') as stat_time, 'traffic_in_bytes' as type, sum(total_in_bytes) as bytes from traffic_metrics_log where __time >= $start_time and __time < $end_time group by TIME_FORMAT(time_floor(__time,'PT30S'),'yyyy-MM-dd HH:mm:ss')\n" +
+ "union all\n" +
+ "select TIME_FORMAT(time_floor(__time,'PT30S'),'yyyy-MM-dd HH:mm:ss') as stat_time, 'traffic_out_bytes' as type, sum(total_out_bytes) as bytes from traffic_metrics_log where __time >= $start_time and __time < $end_time group by TIME_FORMAT(time_floor(__time,'PT30S'),'yyyy-MM-dd HH:mm:ss')");
+ list.add("select DATE_FORMAT(FROM_UNIXTIME(FLOOR(UNIX_TIMESTAMP(__time)/30)*30),'%Y-%m-%d %H:%i:%s') as stat_time, 'traffic_in_bytes' as type, sum(total_in_bytes) as bytes from traffic_metrics_log where __time >= $start_time and __time < $end_time group by DATE_FORMAT(FROM_UNIXTIME(FLOOR(UNIX_TIMESTAMP(__time)/30)*30),'%Y-%m-%d %H:%i:%s')\n" +
+ "union all\n" +
+ "select DATE_FORMAT(FROM_UNIXTIME(FLOOR(UNIX_TIMESTAMP(__time)/30)*30),'%Y-%m-%d %H:%i:%s') as stat_time, 'traffic_out_bytes' as type, sum(total_out_bytes) as bytes from traffic_metrics_log where __time >= $start_time and __time < $end_time group by DATE_FORMAT(FROM_UNIXTIME(FLOOR(UNIX_TIMESTAMP(__time)/30)*30),'%Y-%m-%d %H:%i:%s')");
+ //Packets
+ list.add("select TIME_FORMAT(time_floor(__time,'PT30S'),'yyyy-MM-dd HH:mm:ss') as stat_time, 'traffic_in_packets' as type, sum(total_in_packets) as packets from traffic_metrics_log where __time >=TIMESTAMP $start_time and __time <TIMESTAMP $end_time group by TIME_FORMAT(time_floor(__time,'PT30S'),'yyyy-MM-dd HH:mm:ss')\n" +
+ "union all\n" +
+ "select TIME_FORMAT(time_floor(__time,'PT30S'),'yyyy-MM-dd HH:mm:ss') as stat_time, 'traffic_out_packets' as type, sum(total_out_packets) as packets from traffic_metrics_log where __time >=TIMESTAMP $start_time and __time <TIMESTAMP $end_time group by TIME_FORMAT(time_floor(__time,'PT30S'),'yyyy-MM-dd HH:mm:ss')");
+ list.add("select DATE_FORMAT(FROM_UNIXTIME(FLOOR(UNIX_TIMESTAMP(__time)/30)*30),'%Y-%m-%d %H:%i:%s') as stat_time, 'traffic_in_packets' as type, sum(total_in_packets) as packets from traffic_metrics_log where __time >= $start_time and __time < $end_time group by DATE_FORMAT(FROM_UNIXTIME(FLOOR(UNIX_TIMESTAMP(__time)/30)*30),'%Y-%m-%d %H:%i:%s')\n" +
+ "union all\n" +
+ "select DATE_FORMAT(FROM_UNIXTIME(FLOOR(UNIX_TIMESTAMP(__time)/30)*30),'%Y-%m-%d %H:%i:%s') as stat_time, 'traffic_out_packets' as type, sum(total_out_packets) as packets from traffic_metrics_log where __time >= $start_time and __time < $end_time group by DATE_FORMAT(FROM_UNIXTIME(FLOOR(UNIX_TIMESTAMP(__time)/30)*30),'%Y-%m-%d %H:%i:%s')");
+ //Packets
+ list.add("select TIME_FORMAT(time_floor(__time,'PT30S'),'yyyy-MM-dd HH:mm:ss') as stat_time, 'traffic_in_packets' as type, sum(total_in_packets) as packets from traffic_metrics_log where __time >= $start_time and __time < $end_time group by TIME_FORMAT(time_floor(__time,'PT30S'),'yyyy-MM-dd HH:mm:ss')\n" +
+ "union all\n" +
+ "select TIME_FORMAT(time_floor(__time,'PT30S'),'yyyy-MM-dd HH:mm:ss') as stat_time, 'traffic_out_packets' as type, sum(total_out_packets) as packets from traffic_metrics_log where __time >= $start_time and __time < $end_time group by TIME_FORMAT(time_floor(__time,'PT30S'),'yyyy-MM-dd HH:mm:ss')");
+ list.add("select DATE_FORMAT(FROM_UNIXTIME(FLOOR(UNIX_TIMESTAMP(__time)/30)*30),'%Y-%m-%d %H:%i:%s') as stat_time, 'traffic_in_packets' as type, sum(total_in_packets) as packets from traffic_metrics_log where __time >= $start_time and __time < $end_time group by DATE_FORMAT(FROM_UNIXTIME(FLOOR(UNIX_TIMESTAMP(__time)/30)*30),'%Y-%m-%d %H:%i:%s')\n" +
+ "union all\n" +
+ "select DATE_FORMAT(FROM_UNIXTIME(FLOOR(UNIX_TIMESTAMP(__time)/30)*30),'%Y-%m-%d %H:%i:%s') as stat_time, 'traffic_out_packets' as type, sum(total_out_packets) as packets from traffic_metrics_log where __time >= $start_time and __time < $end_time group by DATE_FORMAT(FROM_UNIXTIME(FLOOR(UNIX_TIMESTAMP(__time)/30)*30),'%Y-%m-%d %H:%i:%s')");
+ //Sessions
+ list.add("select TIME_FORMAT(time_floor(__time,'PT30S'),'yyyy-MM-dd HH:mm:ss') as stat_time, 'new_conn_num' as type, sum(new_conn_num) as sessions from traffic_metrics_log where __time >= $start_time and __time < $end_time group by TIME_FORMAT(time_floor(__time,'PT30S'),'yyyy-MM-dd HH:mm:ss')");
+ list.add("select DATE_FORMAT(FROM_UNIXTIME(FLOOR(UNIX_TIMESTAMP(__time)/30)*30),'%Y-%m-%d %H:%i:%s') as stat_time, 'new_conn_num' as type, sum(new_conn_num) as sessions from traffic_metrics_log where __time >= $start_time and __time < $end_time group by DATE_FORMAT(FROM_UNIXTIME(FLOOR(UNIX_TIMESTAMP(__time)/30)*30),'%Y-%m-%d %H:%i:%s')");
+ //四、 Traffics-计数统计
+ //1. 新建与活跃链接计数
+ list.add("select sum(new_conn_num) as new_conn_num, sum(established_conn_num) as established_conn_num from traffic_metrics_log where __time >=TIMESTAMP $start_time and __time <TIMESTAMP $end_time");
+ list.add("select sum(new_conn_num) as new_conn_num, sum(established_conn_num) as established_conn_num from traffic_metrics_log where __time >= $start_time and __time < $end_time");
+ //2.新建与活跃链接计数趋势
+ list.add("select TIME_FORMAT(time_floor(__time,'PT30S'),'yyyy-MM-dd HH:mm:ss') as stat_time, 'new_conn_num' as type, sum(new_conn_num) as sessions from traffic_metrics_log where __time >=TIMESTAMP $start_time and __time < TIMESTAMP $end_time group by TIME_FORMAT(time_floor(__time,'PT30S'),'yyyy-MM-dd HH:mm:ss')\n" +
+ "union all\n" +
+ "select TIME_FORMAT(time_floor(__time,'PT30S'),'yyyy-MM-dd HH:mm:ss') as stat_time, 'established_conn_num' as type, sum(established_conn_num) as sessions from traffic_metrics_log where __time >= TIMESTAMP $start_time and __time < TIMESTAMP $end_time group by TIME_FORMAT(time_floor(__time,'PT30S'),'yyyy-MM-dd HH:mm:ss')");
+ list.add("select DATE_FORMAT(FROM_UNIXTIME(FLOOR(UNIX_TIMESTAMP(__time)/30)*30),'%Y-%m-%d %H:%i:%s') as stat_time, 'new_conn_num' as type, sum(new_conn_num) as sessions from traffic_metrics_log where __time >= $start_time and __time < $end_time group by DATE_FORMAT(FROM_UNIXTIME(FLOOR(UNIX_TIMESTAMP(__time)/30)*30),'%Y-%m-%d %H:%i:%s')\n" +
+ "union all\n" +
+ "select DATE_FORMAT(FROM_UNIXTIME(FLOOR(UNIX_TIMESTAMP(__time)/30)*30),'%Y-%m-%d %H:%i:%s') as stat_time, 'established_conn_num' as type, sum(established_conn_num) as sessions from traffic_metrics_log where __time >= $start_time and __time < $end_time group by DATE_FORMAT(FROM_UNIXTIME(FLOOR(UNIX_TIMESTAMP(__time)/30)*30),'%Y-%m-%d %H:%i:%s')");
+ //3. 安全策略动作计数
+ //Bytes
+ list.add("select sum(default_in_bytes+default_out_bytes) as default_bytes, sum(allow_in_bytes+allow_out_bytes) as allow_bytes, sum(deny_in_bytes+deny_out_bytes) as deny_bytes, sum(monitor_in_bytes+monitor_out_bytes) as monitor_bytes, sum(intercept_in_bytes+intercept_out_bytes) as intercept_bytes from traffic_metrics_log where __time >=TIMESTAMP $start_time and __time < TIMESTAMP $end_time");
+ list.add("select sum(default_in_bytes+default_out_bytes) as default_bytes, sum(allow_in_bytes+allow_out_bytes) as allow_bytes, sum(deny_in_bytes+deny_out_bytes) as deny_bytes, sum(monitor_in_bytes+monitor_out_bytes) as monitor_bytes, sum(intercept_in_bytes+intercept_out_bytes) as intercept_bytes from traffic_metrics_log where __time >= $start_time and __time < $end_time");
+ //Packets
+ list.add("select sum(default_in_packets+default_out_packets) as default_packets, sum(allow_in_packets+allow_in_packets) as allow_packets, sum(deny_in_packets+deny_out_packets) as deny_packets, sum(monitor_in_packets+monitor_out_packets) as monitor_packets, sum(intercept_in_packets+intercept_out_packets) as intercept_packets from traffic_metrics_log where __time >=TIMESTAMP $start_time and __time <TIMESTAMP $end_time\n");
+ list.add("select sum(default_in_packets+default_out_packets) as default_packets, sum(allow_in_packets+allow_in_packets) as allow_packets, sum(deny_in_packets+deny_out_packets) as deny_packets, sum(monitor_in_packets+monitor_out_packets) as monitor_packets, sum(intercept_in_packets+intercept_out_packets) as intercept_packets from traffic_metrics_log where __time >= $start_time and __time < $end_time\n");
+ //Sessions
+ list.add("select sum(default_conn_num) as default_sessions, sum(allow_conn_num) as allow_sessions, sum(deny_conn_num) as deny_sessions, sum(monitor_conn_num) as monitor_sessions, sum(intercept_conn_num) as intercept_sessions from traffic_metrics_log where __time >=TIMESTAMP $start_time and __time <TIMESTAMP $end_time");
+ list.add("select sum(default_conn_num) as default_sessions, sum(allow_conn_num) as allow_sessions, sum(deny_conn_num) as deny_sessions, sum(monitor_conn_num) as monitor_sessions, sum(intercept_conn_num) as intercept_sessions from traffic_metrics_log where __time >= $start_time and __time < $end_time");
+ //4. 安全策略动作趋势
+ //Bytes
+ list.add("select TIME_FORMAT(time_floor(__time,'PT5M'),'yyyy-MM-dd HH:mm:ss') as stat_time, 'default_bytes' as type, sum(default_in_bytes+default_out_bytes) as bytes from traffic_metrics_log where __time >=TIMESTAMP $start_time and __time <TIMESTAMP $end_time group by TIME_FORMAT(time_floor(__time,'PT5M'),'yyyy-MM-dd HH:mm:ss')\n" +
+ "union all\n" +
+ "select TIME_FORMAT(time_floor(__time,'PT5M'),'yyyy-MM-dd HH:mm:ss') as stat_time, 'allow_bytes' as type, sum(allow_in_bytes+allow_out_bytes) as bytes from traffic_metrics_log where __time >=TIMESTAMP $start_time and __time <TIMESTAMP $end_time group by TIME_FORMAT(time_floor(__time,'PT5M'),'yyyy-MM-dd HH:mm:ss')\n" +
+ "union all\n" +
+ "select TIME_FORMAT(time_floor(__time,'PT5M'),'yyyy-MM-dd HH:mm:ss') as stat_time, 'deny_bytes' as type, sum(deny_in_bytes+deny_out_bytes) as bytes from traffic_metrics_log where __time >= TIMESTAMP $start_time and __time <TIMESTAMP $end_time group by TIME_FORMAT(time_floor(__time,'PT5M'),'yyyy-MM-dd HH:mm:ss') \n" +
+ "union all\n" +
+ "select TIME_FORMAT(time_floor(__time,'PT5M'),'yyyy-MM-dd HH:mm:ss') as stat_time, 'monitor_bytes' as type, sum(monitor_in_bytes+monitor_out_bytes) as bytes from traffic_metrics_log where __time >=TIMESTAMP $start_time and __time <TIMESTAMP $end_time group by TIME_FORMAT(time_floor(__time,'PT5M'),'yyyy-MM-dd HH:mm:ss')\n" +
+ "union all\n" +
+ "select TIME_FORMAT(time_floor(__time,'PT5M'),'yyyy-MM-dd HH:mm:ss') as stat_time, 'intercept_bytes' as type, sum(intercept_in_bytes+intercept_out_bytes) as bytes from traffic_metrics_log where __time >= $start_time and __time < $end_time group by TIME_FORMAT(time_floor(__time,'PT5M'),'yyyy-MM-dd HH:mm:ss')");
+ list.add("select DATE_FORMAT(FROM_UNIXTIME(FLOOR(UNIX_TIMESTAMP(__time)/300)*300),'%Y-%m-%d %H:%i:%s') as stat_time, 'default_bytes' as type, sum(default_in_bytes+default_out_bytes) as bytes from traffic_metrics_log where __time >= $start_time and __time < $end_time group by DATE_FORMAT(FROM_UNIXTIME(FLOOR(UNIX_TIMESTAMP(__time)/300)*300),'%Y-%m-%d %H:%i:%s')\n" +
+ "union all\n" +
+ "select DATE_FORMAT(FROM_UNIXTIME(FLOOR(UNIX_TIMESTAMP(__time)/300)*300),'%Y-%m-%d %H:%i:%s') as stat_time, 'allow_bytes' as type, sum(allow_in_bytes+allow_out_bytes) as bytes from traffic_metrics_log where __time >= $start_time and __time < $end_time group by DATE_FORMAT(FROM_UNIXTIME(FLOOR(UNIX_TIMESTAMP(__time)/300)*300),'%Y-%m-%d %H:%i:%s')\n" +
+ "union all\n" +
+ "select DATE_FORMAT(FROM_UNIXTIME(FLOOR(UNIX_TIMESTAMP(__time)/300)*300),'%Y-%m-%d %H:%i:%s') as stat_time, 'deny_bytes' as type, sum(deny_in_bytes+deny_out_bytes) as bytes from traffic_metrics_log where __time >= $start_time and __time < $end_time group by DATE_FORMAT(FROM_UNIXTIME(FLOOR(UNIX_TIMESTAMP(__time)/300)*300),'%Y-%m-%d %H:%i:%s') \n" +
+ "union all\n" +
+ "select DATE_FORMAT(FROM_UNIXTIME(FLOOR(UNIX_TIMESTAMP(__time)/300)*300),'%Y-%m-%d %H:%i:%s') as stat_time, 'monitor_bytes' as type, sum(monitor_in_bytes+monitor_out_bytes) as bytes from traffic_metrics_log where __time >= $start_time and __time < $end_time group by DATE_FORMAT(FROM_UNIXTIME(FLOOR(UNIX_TIMESTAMP(__time)/300)*300),'%Y-%m-%d %H:%i:%s')\n" +
+ "union all\n" +
+ "select DATE_FORMAT(FROM_UNIXTIME(FLOOR(UNIX_TIMESTAMP(__time)/300)*300),'%Y-%m-%d %H:%i:%s') as stat_time, 'intercept_bytes' as type, sum(intercept_in_bytes+intercept_out_bytes) as bytes from traffic_metrics_log where __time >= $start_time and __time < $end_time group by DATE_FORMAT(FROM_UNIXTIME(FLOOR(UNIX_TIMESTAMP(__time)/300)*300),'%Y-%m-%d %H:%i:%s')");
+ //packets
+ list.add("select TIME_FORMAT(time_floor(__time,'PT5M'),'yyyy-MM-dd HH:mm:ss') as stat_time, 'default_packets' as type, sum(default_in_packets+default_out_packets) as packets from traffic_metrics_log where __time >=TIMESTAMP $start_time and __time <TIMESTAMP $end_time group by TIME_FORMAT(time_floor(__time,'PT5M'),'yyyy-MM-dd HH:mm:ss')\n" +
+ "union all\n" +
+ "select TIME_FORMAT(time_floor(__time,'PT5M'),'yyyy-MM-dd HH:mm:ss') as stat_time, 'allow_packets' as type, sum(allow_in_packets+allow_out_packets) as packets from traffic_metrics_log where __time >=TIMESTAMP $start_time and __time <TIMESTAMP $end_time group by TIME_FORMAT(time_floor(__time,'PT5M'),'yyyy-MM-dd HH:mm:ss')\n" +
+ "union all\n" +
+ "select TIME_FORMAT(time_floor(__time,'PT5M'),'yyyy-MM-dd HH:mm:ss') as stat_time, 'deny_packets' as type, sum(deny_in_packets+deny_out_packets) as packets from traffic_metrics_log where __time >=TIMESTAMP $start_time and __time <TIMESTAMP $end_time group by TIME_FORMAT(time_floor(__time,'PT5M'),'yyyy-MM-dd HH:mm:ss') \n" +
+ "union all\n" +
+ "select TIME_FORMAT(time_floor(__time,'PT5M'),'yyyy-MM-dd HH:mm:ss') as stat_time, 'monitor_packets' as type, sum(monitor_in_packets+monitor_out_packets) as packets from traffic_metrics_log where __time >=TIMESTAMP $start_time and __time <TIMESTAMP $end_time group by TIME_FORMAT(time_floor(__time,'PT5M'),'yyyy-MM-dd HH:mm:ss')\n" +
+ "union all\n" +
+ "select TIME_FORMAT(time_floor(__time,'PT5M'),'yyyy-MM-dd HH:mm:ss') as stat_time, 'intercept_packets' as type, sum(intercept_in_packets+intercept_out_packets) as packets from traffic_metrics_log where __time >=TIMESTAMP $start_time and __time <TIMESTAMP $end_time group by TIME_FORMAT(time_floor(__time,'PT5M'),'yyyy-MM-dd HH:mm:ss')");
+ list.add("select DATE_FORMAT(FROM_UNIXTIME(FLOOR(UNIX_TIMESTAMP(__time)/300)*300),'%Y-%m-%d %H:%i:%s') as stat_time, 'default_packets' as type, sum(default_in_packets+default_out_packets) as packets from traffic_metrics_log where __time >= $start_time and __time < $end_time group by DATE_FORMAT(FROM_UNIXTIME(FLOOR(UNIX_TIMESTAMP(__time)/300)*300),'%Y-%m-%d %H:%i:%s')\n" +
+ "union all\n" +
+ "select DATE_FORMAT(FROM_UNIXTIME(FLOOR(UNIX_TIMESTAMP(__time)/300)*300),'%Y-%m-%d %H:%i:%s') as stat_time, 'allow_packets' as type, sum(allow_in_packets+allow_out_packets) as packets from traffic_metrics_log where __time >= $start_time and __time < $end_time group by DATE_FORMAT(FROM_UNIXTIME(FLOOR(UNIX_TIMESTAMP(__time)/300)*300),'%Y-%m-%d %H:%i:%s')\n" +
+ "union all\n" +
+ "select DATE_FORMAT(FROM_UNIXTIME(FLOOR(UNIX_TIMESTAMP(__time)/300)*300),'%Y-%m-%d %H:%i:%s') as stat_time, 'deny_packets' as type, sum(deny_in_packets+deny_out_packets) as packets from traffic_metrics_log where __time >= $start_time and __time < $end_time group by DATE_FORMAT(FROM_UNIXTIME(FLOOR(UNIX_TIMESTAMP(__time)/300)*300),'%Y-%m-%d %H:%i:%s') \n" +
+ "union all\n" +
+ "select DATE_FORMAT(FROM_UNIXTIME(FLOOR(UNIX_TIMESTAMP(__time)/300)*300),'%Y-%m-%d %H:%i:%s') as stat_time, 'monitor_packets' as type, sum(monitor_in_packets+monitor_out_packets) as packets from traffic_metrics_log where __time >= $start_time and __time < $end_time group by DATE_FORMAT(FROM_UNIXTIME(FLOOR(UNIX_TIMESTAMP(__time)/300)*300),'%Y-%m-%d %H:%i:%s')\n" +
+ "union all\n" +
+ "select DATE_FORMAT(FROM_UNIXTIME(FLOOR(UNIX_TIMESTAMP(__time)/300)*300),'%Y-%m-%d %H:%i:%s') as stat_time, 'intercept_packets' as type, sum(intercept_in_packets+intercept_out_packets) as packets from traffic_metrics_log where __time >= $start_time and __time < $end_time group by DATE_FORMAT(FROM_UNIXTIME(FLOOR(UNIX_TIMESTAMP(__time)/300)*300),'%Y-%m-%d %H:%i:%s')");
+ //sessions
+ list.add("select TIME_FORMAT(time_floor(__time,'PT5M'),'yyyy-MM-dd HH:mm:ss') as stat_time, 'default_conn_num' as type, sum(default_conn_num) as sessions from traffic_metrics_log where __time >=TIMESTAMP $start_time and __time <TIMESTAMP $end_time group by TIME_FORMAT(time_floor(__time,'PT5M'),'yyyy-MM-dd HH:mm:ss')\n" +
+ "union all\n" +
+ "select TIME_FORMAT(time_floor(__time,'PT5M'),'yyyy-MM-dd HH:mm:ss') as stat_time, 'allow_conn_num' as type, sum(allow_conn_num) as sessions from traffic_metrics_log where __time >=TIMESTAMP $start_time and __time <TIMESTAMP $end_time group by TIME_FORMAT(time_floor(__time,'PT5M'),'yyyy-MM-dd HH:mm:ss')\n" +
+ "union all\n" +
+ "select TIME_FORMAT(time_floor(__time,'PT5M'),'yyyy-MM-dd HH:mm:ss') as stat_time, 'deny_conn_num' as type, sum(deny_conn_num) as sessions from traffic_metrics_log where __time >=TIMESTAMP $start_time and __time <TIMESTAMP $end_time group by TIME_FORMAT(time_floor(__time,'PT5M'),'yyyy-MM-dd HH:mm:ss') \n" +
+ "union all\n" +
+ "select TIME_FORMAT(time_floor(__time,'PT5M'),'yyyy-MM-dd HH:mm:ss') as stat_time, 'monitor_conn_num' as type, sum(monitor_conn_num) as sessions from traffic_metrics_log where __time >=TIMESTAMP $start_time and __time <TIMESTAMP $end_time group by TIME_FORMAT(time_floor(__time,'PT5M'),'yyyy-MM-dd HH:mm:ss')\n" +
+ "union all\n" +
+ "select TIME_FORMAT(time_floor(__time,'PT5M'),'yyyy-MM-dd HH:mm:ss') as stat_time, 'intercept_conn_num' as type, sum(intercept_conn_num) as sessions from traffic_metrics_log where __time >=TIMESTAMP $start_time and __time <TIMESTAMP $end_time group by TIME_FORMAT(time_floor(__time,'PT5M'),'yyyy-MM-dd HH:mm:ss')");
+ list.add("select DATE_FORMAT(FROM_UNIXTIME(FLOOR(UNIX_TIMESTAMP(__time)/300)*300),'%Y-%m-%d %H:%i:%s') as stat_time, 'default_conn_num' as type, sum(default_conn_num) as sessions from traffic_metrics_log where __time >= $start_time and __time < $end_time group by DATE_FORMAT(FROM_UNIXTIME(FLOOR(UNIX_TIMESTAMP(__time)/300)*300),'%Y-%m-%d %H:%i:%s')\n" +
+ "union all\n" +
+ "select DATE_FORMAT(FROM_UNIXTIME(FLOOR(UNIX_TIMESTAMP(__time)/300)*300),'%Y-%m-%d %H:%i:%s') as stat_time, 'allow_conn_num' as type, sum(allow_conn_num) as sessions from traffic_metrics_log where __time >= $start_time and __time < $end_time group by DATE_FORMAT(FROM_UNIXTIME(FLOOR(UNIX_TIMESTAMP(__time)/300)*300),'%Y-%m-%d %H:%i:%s')\n" +
+ "union all\n" +
+ "select DATE_FORMAT(FROM_UNIXTIME(FLOOR(UNIX_TIMESTAMP(__time)/300)*300),'%Y-%m-%d %H:%i:%s') as stat_time, 'deny_conn_num' as type, sum(deny_conn_num) as sessions from traffic_metrics_log where __time >= $start_time and __time < $end_time group by DATE_FORMAT(FROM_UNIXTIME(FLOOR(UNIX_TIMESTAMP(__time)/300)*300),'%Y-%m-%d %H:%i:%s') \n" +
+ "union all\n" +
+ "select DATE_FORMAT(FROM_UNIXTIME(FLOOR(UNIX_TIMESTAMP(__time)/300)*300),'%Y-%m-%d %H:%i:%s') as stat_time, 'monitor_conn_num' as type, sum(monitor_conn_num) as sessions from traffic_metrics_log where __time >= $start_time and __time < $end_time group by DATE_FORMAT(FROM_UNIXTIME(FLOOR(UNIX_TIMESTAMP(__time)/300)*300),'%Y-%m-%d %H:%i:%s')\n" +
+ "union all\n" +
+ "select DATE_FORMAT(FROM_UNIXTIME(FLOOR(UNIX_TIMESTAMP(__time)/300)*300),'%Y-%m-%d %H:%i:%s') as stat_time, 'intercept_conn_num' as type, sum(intercept_conn_num) as sessions from traffic_metrics_log where __time >= $start_time and __time < $end_time group by DATE_FORMAT(FROM_UNIXTIME(FLOOR(UNIX_TIMESTAMP(__time)/300)*300),'%Y-%m-%d %H:%i:%s')");
+ //五. Traffics-TOPN统计
+ //1. TopN 源IP
+ list.add("select source as client_ip, sum(session_num) as sessions, sum(c2s_byte_num) as sent_bytes, sum(s2c_byte_num) as received_bytes, sum(c2s_byte_num + s2c_byte_num) as bytes, sum(c2s_pkt_num) as sent_packets ,sum(s2c_pkt_num) as received_packets, sum(c2s_pkt_num+s2c_pkt_num) as packets from top_internal_host_log where __time >=TIMESTAMP $start_time and __time <TIMESTAMP $end_time and order_by=source group by source order by source desc limit 100");
+ list.add("select source as client_ip, sum(session_num) as sessions, sum(c2s_byte_num) as sent_bytes, sum(s2c_byte_num) as received_bytes, sum(c2s_byte_num + s2c_byte_num) as bytes, sum(c2s_pkt_num) as sent_packets ,sum(s2c_pkt_num) as received_packets, sum(c2s_pkt_num+s2c_pkt_num) as packets from top_internal_host_log where __time >= $start_time and __time < $end_time and order_by=source group by source order by source desc limit 100");
+ //2. TopN 目的IP
+ list.add("select destination as server_ip, sum(session_num) as sessions, sum(c2s_byte_num) as sent_bytes, sum(s2c_byte_num) as received_bytes, sum(c2s_byte_num + s2c_byte_num) as bytes, sum(c2s_pkt_num) as sent_packets ,sum(s2c_pkt_num) as received_packets, sum(c2s_pkt_num+s2c_pkt_num) as packets from top_external_host_log where __time >=TIMESTAMP $start_time and __time <TIMESTAMP $end_time and order_by=destination group by destination order by destination desc limit 100");
+ list.add("select destination as server_ip, sum(session_num) as sessions, sum(c2s_byte_num) as sent_bytes, sum(s2c_byte_num) as received_bytes, sum(c2s_byte_num + s2c_byte_num) as bytes, sum(c2s_pkt_num) as sent_packets ,sum(s2c_pkt_num) as received_packets, sum(c2s_pkt_num+s2c_pkt_num) as packets from top_external_host_log where __time >= $start_time and __time < $end_time and order_by=destination group by destination order by destination desc limit 100");
+ //3. TopN 域名
+ list.add("select domain, sum(session_num) as sessions, sum(c2s_byte_num) as sent_bytes, sum(s2c_byte_num) as received_bytes, sum(c2s_byte_num + s2c_byte_num) as bytes, sum(c2s_pkt_num) as sent_packets ,sum(s2c_pkt_num) as received_packets, sum(c2s_pkt_num+s2c_pkt_num) as packets from top_website_domain_log where __time >=TIMESTAMP $start_time and __time <TIMESTAMP $end_time and order_by=domain group by domain order by domain desc limit 200");
+ list.add("select domain, sum(session_num) as sessions, sum(c2s_byte_num) as sent_bytes, sum(s2c_byte_num) as received_bytes, sum(c2s_byte_num + s2c_byte_num) as bytes, sum(c2s_pkt_num) as sent_packets ,sum(s2c_pkt_num) as received_packets, sum(c2s_pkt_num+s2c_pkt_num) as packets from top_website_domain_log where __time >= $start_time and __time < $end_time and order_by=domain group by domain order by domain desc limit 200");
+ //4.TopN 用户
+ list.add("select subscriber_id, sum(session_num) as sessions, sum(c2s_byte_num) as sent_bytes, sum(s2c_byte_num) as received_bytes, sum(c2s_byte_num + s2c_byte_num) as bytes, sum(c2s_pkt_num) as sent_packets ,sum(s2c_pkt_num) as received_packets, sum(c2s_pkt_num+s2c_pkt_num) as packets from top_user_log where __time >=TIMESTAMP $start_time and __time <TIMESTAMP $end_time and order_by=subscriber_id group by subscriber_id order by subscriber_id desc limit 200");
+ list.add("select subscriber_id, sum(session_num) as sessions, sum(c2s_byte_num) as sent_bytes, sum(s2c_byte_num) as received_bytes, sum(c2s_byte_num + s2c_byte_num) as bytes, sum(c2s_pkt_num) as sent_packets ,sum(s2c_pkt_num) as received_packets, sum(c2s_pkt_num+s2c_pkt_num) as packets from top_user_log where __time >= $start_time and __time < $end_time and order_by=subscriber_id group by subscriber_id order by subscriber_id desc limit 200");
+ //六、命中URL统计
+ list.add("select url,sum(session_num) as hits from top_urls_log where __time >=TIMESTAMP $start_time and __time <TIMESTAMP $end_time group by url order by hits desc limit 100");
+ list.add("select url,sum(session_num) as hits from top_urls_log where __time >= $start_time and __time < $end_time group by url order by hits desc limit 100");
+
+ //国家客户端数量
+ list.add("SELECT country, APPROX_COUNT_DISTINCT_DS_HLL(ip_object) as num FROM source_country_ip_num_log group by country order by num desc limit 200");
+ //域名推荐
+ list.add("SELECT domain, protocol_type, ip_num, session_num FROM top_domain_recommend_daily_log where __time = now()");
+ //推荐URL
+ list.add("SELECT url, content_length, ip_num, session_num FROM top_website_urls_daily_log where __time = now()");
+ //劫持客户端数量
+ list.add("SELECT policy_id, APPROX_COUNT_DISTINCT_DS_HLL(isp) as num FROM proxy_event_hits_log where __time >= $start_time and __time < $end_time and policy_id=0 group by policy_id");
+ //劫持客户端IP地域分布
+ list.add("select ip, IP_TO_CITY(ip) as location, IP_TO_GEO(ip) as geo from (SELECT policy_id, arrayJoin(groupUniqArrayMerge(ip_list)) as ip FROM proxy_ip_info where stat_time >= $start_time and stat_time < $end_time and policy_id=0 group by policy_id)");
+ //注入流量统计
+ list.add("select DATE_FORMAT(FROM_UNIXTIME(FLOOR(UNIX_TIMESTAMP(__time)/30)*30),'%Y-%m-%d %H:%i:%s') as stat_time, sum(bytes) as bytes from injection_metrics_log where __time >= $start_time and __time < $end_time group by DATE_FORMAT(FROM_UNIXTIME(FLOOR(UNIX_TIMESTAMP(__time)/30)*30),'%Y-%m-%d %H:%i:%s') limit 100");
+
+ return list;
+ }
+
+
+    /**
+     * Runs a SQL statement against ClickHouse over its HTTP interface and
+     * extracts the "name" field from every row of the JSON result.
+     *
+     * @param query SQL text to execute (sent as the "query" URL parameter,
+     *              with " FORMAT JSON" appended)
+     * @return list of the "name" values, or {@code null} when the HTTP call
+     *         does not return 200 (callers test this with StringUtil.isEmpty)
+     */
+    private List queryCKDynamic(String query) {
+        StringBuilder urlBuilder = new StringBuilder(clickHouseConfig.getQueryUrl()).append("/?");
+        StringBuilder queryparamBuilder = new StringBuilder("user=")
+                .append(clickHouseConfig.getRealTimeUserName()).append("&")
+                .append("password=").append(clickHouseConfig.getRealTimePassword()).append("&")
+                .append("database=")
+                .append(clickHouseConfig.getDbName()).append("&")
+                .append("query=").append(query)
+                .append(" FORMAT JSON;");
+
+        // SECURITY: log only the SQL statement, never the decoded parameter
+        // string — it carries the ClickHouse user name and clear-text password.
+        log.info("Query Session and Events Query : {}", query);
+        List<NameValuePair> values = URLEncodedUtils.parse(queryparamBuilder.toString(), Charset.forName("UTF-8"));
+        String s = urlBuilder.toString() + URLEncodedUtils.format(values, "utf-8");
+        Map<String, String> resultMap = httpClientService.httpGet(s);
+        List result = new ArrayList<>();
+        if (resultMap.get("status").equals(String.valueOf(HttpStatus.SC_OK))) {
+            // ClickHouse JSON output: {"meta": [...], "data": [ {col: val, ...}, ... ]}
+            Map<String, Object> maps = (Map<String, Object>) JsonMapper.fromJsonString(resultMap.get("result"), Map.class);
+            List data = (List) maps.get("data");
+            Iterator iterator = data.iterator();
+            while (iterator.hasNext()) {
+                Map next = (Map) iterator.next();
+                result.add(String.valueOf(next.get("name")));
+            }
+            return result;
+        }
+
+        // Null (not empty list) is part of the existing contract; callers
+        // check it via StringUtil.isEmpty before iterating.
+        return null;
+    }
+
+
+    /**
+     * Runs a SQL statement against Druid's SQL endpoint and extracts the
+     * "name" field from every row of the JSON array result.
+     *
+     * @param query SQL text, posted as the JSON body {"query": ...}
+     * @return list of the "name" values, or {@code null} when the HTTP call
+     *         does not return 200
+     */
+    private List queryDruidDynamic(String query) {
+        // Druid's SQL endpoint expects a JSON object with a single "query"
+        // member. (The previous version also built an HttpHeaders object that
+        // was never passed to the request — dead code, removed.)
+        Map<String, Object> params = Maps.newHashMap();
+        params.put("query", query);
+        Map<String, String> resultMap = httpClientService.httpPost(druidConfig.getQueryUrl(), JsonMapper.toJsonString(params));
+        if (resultMap.get("status").equals(String.valueOf(HttpStatus.SC_OK))) {
+            List<String> list = new ArrayList<>();
+            String result = resultMap.get("result");
+            // Response body is a JSON array of row objects.
+            List body = (List) Json.parseJson(result);
+            Iterator iterator = body.iterator();
+            while (iterator.hasNext()) {
+                Map next = (Map) iterator.next();
+                String name = String.valueOf(next.get("name"));
+                list.add(name);
+            }
+            return list;
+        }
+        // Null (not empty list) signals a failed query to callers.
+        return null;
+    }
+
+
+    /**
+     * Cross-checks the Avro schema files under {@code path} against the
+     * column list Druid reports for each table. Mismatches in either
+     * direction are appended to the shared errorDate collector rather than
+     * aborting the whole scan.
+     *
+     * @param path directory (relative to the project root) holding .avsc files
+     */
+    private void checkDruidSchema(String path) {
+        Map<String, JsonProperties> druidSchemaInfo = getSchemaInfo(path);
+        // Iterate the tables, using the Avro definitions as the reference set.
+        for (String tableName : druidSchemaInfo.keySet()) {
+            Map resultMap = (Map) com.zdjizhi.utils.JsonMapper.fromJsonString(String.valueOf(druidSchemaInfo.get(tableName)), Map.class);
+            if (resultMap == null) {
+                throw new BusinessException(ResultStatusEnum.FAIL.getCode(), "schema未注册此表: " + tableName, null);
+            }
+            // Columns Druid itself knows for this table.
+            List resultSql = queryDruidDynamic("SELECT COLUMN_NAME as name FROM INFORMATION_SCHEMA.COLUMNS WHERE TABLE_NAME = '" + tableName + "'");
+            if (StringUtil.isEmpty(resultSql)) {
+                // Guard added to match checkCKSchema: queryDruidDynamic returns
+                // null on HTTP failure, and the table may not exist in Druid —
+                // previously this fell through to resultSql.contains and NPE'd.
+                String str = String.format("schema中注册多:%s.avsc", tableName);
+                errorDate.add(str);
+                continue;
+            }
+
+            List resultAvro = new ArrayList();
+
+            // Direction 1: every Avro field must exist as a Druid column.
+            List<Map> fields = (List<Map>) resultMap.get("fields");
+            for (Map field : fields) {
+                resultAvro.add(String.valueOf(field.get("name")));
+                if (!resultSql.contains(field.get("name"))) {
+                    errorDate.add(String.format("%s.schema中注册多:%s", tableName, field.get("name")));
+                }
+            }
+
+            // Direction 2: every Druid column must be declared in the Avro file.
+            for (Object column : resultSql) {
+                String next = String.valueOf(column);
+                if (!resultAvro.contains(next)) {
+                    errorDate.add(String.format("%s.avro中注册缺少:%s", tableName, next));
+                }
+            }
+        }
+    }
+
+
+    /**
+     * Cross-checks the Avro schema files under {@code path} against the
+     * columns ClickHouse reports via "describe &lt;table&gt;". Mismatches in
+     * either direction are appended to the shared errorDate collector rather
+     * than aborting the whole scan.
+     *
+     * @param path directory (relative to the project root) holding .avsc files
+     */
+    private void checkCKSchema(String path) {
+        Map<String, JsonProperties> ckSchemaInfo = getSchemaInfo(path);
+        // Iterate the tables, using the Avro definitions as the reference set.
+        for (String tableName : ckSchemaInfo.keySet()) {
+            Map resultMap = (Map) com.zdjizhi.utils.JsonMapper.fromJsonString(String.valueOf(ckSchemaInfo.get(tableName)), Map.class);
+            if (resultMap == null) {
+                throw new BusinessException(ResultStatusEnum.FAIL.getCode(), "schema未注册此表: " + tableName, null);
+            }
+            // Columns ClickHouse knows for this table ("name" column of describe).
+            List resultSql = queryCKDynamic("describe " + tableName);
+            if (StringUtil.isEmpty(resultSql)) {
+                // Table registered in a schema file but absent from the database
+                // (or the query failed). Typo "shcema" fixed in the message.
+                String str = String.format("schema中注册多:%s.avsc", tableName);
+                errorDate.add(str);
+                continue;
+            }
+
+            List resultAvro = new ArrayList();
+
+            // Direction 1: every Avro field must exist as a database column.
+            List<Map> fields = (List<Map>) resultMap.get("fields");
+            for (Map field : fields) {
+                resultAvro.add(String.valueOf(field.get("name")));
+                if (!resultSql.contains(field.get("name"))) {
+                    errorDate.add(String.format("%s.schema中注册多:%s", tableName, field.get("name")));
+                }
+            }
+
+            // Direction 2: every database column must be declared in the Avro file.
+            for (Object column : resultSql) {
+                String next = String.valueOf(column);
+                if (!resultAvro.contains(next)) {
+                    errorDate.add(String.format("%s.avro中注册缺少:%s", tableName, next));
+                }
+            }
+        }
+    }
+
+
+ /**
+ * 获取avro文件
+ *
+ * @param path
+ * @return
+ */
+ private Map<String, JsonProperties> getSchemaInfo(String path) {
+ Map<String, JsonProperties> schema = new HashMap<>();
+ String pathRoot = null;
+ try {
+ pathRoot = new File("").getCanonicalPath();
+ File file = new File(pathRoot + path);
+ File[] files = file.listFiles();
+ if (files != null) {
+ for (File f : files) {
+ if (!f.getName().toLowerCase().endsWith(".avsc")) {
+ continue;
+ }
+ try {
+ Schema ckSchema = new Schema.Parser().parse(new File(f.getPath()));
+ schema.put(ckSchema.getName(), ckSchema);
+ } catch (Exception ex) {
+ log.error("遍历" + path + "路径下文件失败: " + f.getName(), ex);
+ String str = String.format("%s读取文件失败:%s", path, f.getName());
+ errorDate.add(str);
+ }
+ }
+ }
+ } catch (Exception e) {
+ log.error("获取 项目路径/{}失败", path);
+ String str = String.format("获取项目路径%s失败", path);
+ errorDate.add(str);
+ }
+ return schema;
+ }
+
+
+ /**
+ * 核对avro文件
+ *
+ * @param path 路径
+ */
+ private void checkSchema(String path) {
+ Map<String, JsonProperties> ckeckSchemaInfo = getSchemaInfo(path);
+ Set<String> name = ckeckSchemaInfo.keySet();
+ Iterator<String> iterator = name.iterator();
+ while (iterator.hasNext()) {//遍历表
+ String tableName = iterator.next();
+ Map resultMap = (Map) com.zdjizhi.utils.JsonMapper.fromJsonString(String.valueOf(ckeckSchemaInfo.get(tableName)), Map.class);
+ if (resultMap == null) {
+ throw new BusinessException(ResultStatusEnum.FAIL.getCode(), "schema未注册此表: " + tableName, null);
+ }
+ List<Map> fields = (List<Map>) resultMap.get("fields");//遍历列
+ Iterator<Map> fieldsIterator = fields.iterator();
+ Map mapDoc = null;
+ while (fieldsIterator.hasNext()) {
+ Map next = fieldsIterator.next();
+ try {
+ mapDoc = (Map) com.zdjizhi.utils.JsonMapper.fromJsonString(String.valueOf(next.get("doc")), Map.class);
+ } catch (Exception ex) {
+ log.error("该字段的 doc 为非JSON字符串,不需类型转换");
+ String str = String.format("%s.avro中doc非json格式字段%s", tableName, next.get("name"));
+ errorDate.add(str);
+ }
+ if (mapDoc != null) {
+ next.put("doc", mapDoc);
+ }
+ if (StringUtil.isEmpty(mapDoc) && !StringUtil.isEmpty(next.get("doc"))) {
+ log.error("该字段的 doc 为非JSON字符串,不需类型转换");
+ String str = String.format("%s.avro中doc非json格式字段%s", tableName, next.get("name"));
+ errorDate.add(str);
+ }
+ }
+ }
+ }
+}