diff options
| author | 刘永强 <[email protected]> | 2020-08-28 18:14:44 +0800 |
|---|---|---|
| committer | 刘永强 <[email protected]> | 2020-08-28 18:14:44 +0800 |
| commit | f34a66a121a77a30539e7e519b9dc0ecdb413206 (patch) | |
| tree | eb2d76b1e8553062e3f9c144a2ea2e1bea0cb6a6 | |
| parent | 6fe6e50eb2d7dbb7417a99535a144da63376f912 (diff) | |
add TestCaseController
6 files changed, 942 insertions, 0 deletions
// ---------------------------------------------------------------------------
// galaxy-common/src/main/java/com/mesalab/common/enums/EnvironmentEnum.java
// ---------------------------------------------------------------------------
package com.mesalab.common.enums;

import lombok.Getter;

/**
 * Runtime environment enumeration.
 *
 * <p>Each constant carries the lowercase profile name exactly as it appears in
 * {@code spring.profiles.active} (see {@code EnvironmentGroupEnum.isRuntime},
 * which matches on {@link #getName()}).
 *
 * @author dazzlzy
 * @date 2018/5/26
 */
@Getter
public enum EnvironmentEnum {

    /**
     * Development environment.
     */
    DEV("dev"),
    /**
     * Production environment.
     */
    PROD("prod");
    // fixed: original had "PROD("prod"),;" — a stray comma before the semicolon

    /** Profile name as used in spring.profiles.active; immutable by design. */
    private final String name;

    EnvironmentEnum(String name) {
        this.name = name;
    }
}

// ---------------------------------------------------------------------------
// galaxy-common/src/main/java/com/mesalab/common/enums/EnvironmentGroupEnum.java
// ---------------------------------------------------------------------------
package com.mesalab.common.enums;

import lombok.Getter;

/**
 * Runtime environment group enumeration.
 *
 * @author dazzlzy
 * @date 2018/5/26
 */
@Getter
public enum EnvironmentGroupEnum {

    /**
     * RUNTIME environment group:
     * 1. DEV (development environment)
     * 2. PROD (production environment)
     */
    RUNTIME(new EnvironmentEnum[]{EnvironmentEnum.DEV, EnvironmentEnum.PROD});

    /**
     * Environments belonging to this group. NOTE(review): the Lombok getter
     * exposes the internal array directly; callers must not mutate it.
     */
    private final EnvironmentEnum[] environments;

    EnvironmentGroupEnum(EnvironmentEnum[] environments) {
        this.environments = environments;
    }

    /**
     * Whether the given profile name belongs to the RUNTIME environment group.
     *
     * @param s profile name (e.g. "dev", "prod")
     * @return {@code true} if {@code s} equals the name of any RUNTIME environment
     */
    public static boolean isRuntime(String s) {
        // Iterate the field directly instead of going through the getter;
        // same elements, avoids the accessor indirection.
        for (EnvironmentEnum environmentEnum : RUNTIME.environments) {
            if (environmentEnum.getName().equals(s)) {
                return true;
            }
        }
        return false;
    }
}

// ---------------------------------------------------------------------------
// galaxy-data-engine/src/main/java/com/mesalab/engine/component/configuration/ProjectAuthorProperties.java
// ---------------------------------------------------------------------------
package com.mesalab.engine.component.configuration;

import lombok.Data;

/**
 * Project author contact information, bound from configuration under the
 * {@code project.author} prefix (see {@link ProjectProperties#getAuthor()}).
 * Mutable with setters because Spring's relaxed binding populates it.
 *
 * @author dazzlzy
 * @date 2018/5/26
 */
@Data
public class ProjectAuthorProperties {

    /** Author display name. */
    private String name;

    /** Author homepage URL. */
    private String url;

    /** Author contact e-mail. */
    private String email;

}

// ---------------------------------------------------------------------------
// galaxy-data-engine/src/main/java/com/mesalab/engine/component/configuration/ProjectProperties.java
// ---------------------------------------------------------------------------
package com.mesalab.engine.component.configuration;

import com.mesalab.common.enums.EnvironmentEnum;
import com.mesalab.common.enums.EnvironmentGroupEnum;
import lombok.Data;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.core.env.Environment;
import org.springframework.stereotype.Component;

import java.util.ArrayList;
import java.util.List;

/**
 * Project configuration, bound from properties under the {@code project} prefix.
 *
 * <p>Registered as a plain {@code @Component}: the original used
 * {@code @Repository}, which is the DAO stereotype and additionally enables
 * persistence-exception translation — misleading for a configuration bean.
 * Bean registration and naming are unchanged.
 *
 * @author dazzlzy
 * @date 2018/5/26
 */
@Data
@Component
@ConfigurationProperties("project")
public class ProjectProperties {

    /** Project name. */
    private String name;

    /** Project version. */
    private String version;

    /** Project description. */
    private String description;

    /** Maven group identifier. */
    private String groupId;

    /** Maven artifact identifier. */
    private String artifactId;

    /** Project root directory. */
    private String basedir;

    /** Core project package. */
    private String corePackage;

    /** Business/service project package. */
    private String servicePackage;

    /** Active profile values, copied from the Spring environment at construction. */
    private String[] env;

    /** Project author contact details (bound from project.author.*). */
    private ProjectAuthorProperties author;

    /** Injected Spring environment context. */
    private final Environment environment;

    @Autowired
    public ProjectProperties(Environment environment) {
        this.environment = environment;
        this.env = environment.getActiveProfiles();
    }

    /**
     * Whether the application is running in production.
     *
     * <p>Reads the LAST runtime profile from the active profiles: e.g. for
     * {@code spring.profiles.active=dev,prod,mysql} the runtime profiles are
     * {@code dev,prod}, the last one is {@code prod}, so this is production.
     *
     * @return {@code true} if the last runtime profile is "prod"
     */
    public boolean isProduct() {
        List<String> runtimeEnvs = new ArrayList<>();
        for (String s : this.env) {
            if (EnvironmentGroupEnum.isRuntime(s)) {
                runtimeEnvs.add(s);
            }
        }
        if (runtimeEnvs.isEmpty()) {
            // No runtime profile active at all — treat as non-production.
            return false;
        }
        String last = runtimeEnvs.get(runtimeEnvs.size() - 1);
        return EnvironmentEnum.PROD.getName().equals(last);
    }

}
// ---------------------------------------------------------------------------
// galaxy-data-engine/src/main/java/com/mesalab/engine/controller/TestCaseController.java
// ---------------------------------------------------------------------------
package com.mesalab.engine.controller;

import com.mesalab.common.dto.results.BaseResult;
import com.mesalab.common.util.BaseResultUtil;
import com.mesalab.engine.component.configuration.ProjectProperties;
import com.mesalab.engine.service.impl.TestSqlServiceImpl;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.core.env.Environment;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

import java.util.Arrays;

/**
 * Test controller.
 *
 * <p>{@code @RestController} implies {@code @ResponseBody} on every handler, so
 * return values are serialized to JSON automatically.
 *
 * @author darnell
 * @date 2019/5/13
 */
@Slf4j
@RestController
@RequestMapping(value = "test")
public class TestCaseController {

    private final ProjectProperties projectProperties;

    private final Environment environment;

    /**
     * Field-injected to preserve the original wiring; narrowed from
     * package-private to private — no other class should touch it.
     * NOTE(review): depends on the implementation class rather than an
     * interface; consider extracting a TestSqlService interface.
     */
    @Autowired
    private TestSqlServiceImpl testSqlService;

    @Autowired
    public TestCaseController(ProjectProperties projectProperties, Environment environment) {
        this.projectProperties = projectProperties;
        this.environment = environment;
    }

    /**
     * Smoke-test endpoint.
     *
     * <p>{@code @GetMapping} is shorthand for
     * {@code @RequestMapping(method = RequestMethod.GET)}.
     *
     * <p>Intentionally returns only a success envelope: projectProperties is a
     * Spring-injected bean carrying dynamic-proxy internals, and serializing it
     * through Lombok's {@code @Data}-generated accessors produced an oversized,
     * failing response.
     *
     * @return BaseResult success envelope without data
     */
    @GetMapping(value = "projectProperties")
    public BaseResult projectProperties() {
        return BaseResultUtil.success();
    }

    /**
     * Returns the active profiles read directly from the Spring
     * {@link Environment} ({@code String[] activeProfiles}).
     *
     * @return BaseResult wrapping the active profile names
     */
    @GetMapping(value = "activeProfiles")
    public BaseResult activeProfiles() {
        String[] activeProfiles = environment.getActiveProfiles();
        log.info("Active Profiles: {}", Arrays.toString(activeProfiles));
        return BaseResultUtil.success(activeProfiles);
    }

    /**
     * Returns the runtime environment values that {@link ProjectProperties}
     * copied from {@code Environment.getActiveProfiles()} at construction.
     *
     * @return BaseResult wrapping the project's env values
     */
    @GetMapping(value = "env")
    public BaseResult env() {
        String[] env = projectProperties.getEnv();
        log.info("Project env: {}", Arrays.toString(env));
        return BaseResultUtil.success(env);
    }

    /**
     * Reports whether the current runtime environment is production, as decided
     * by {@link ProjectProperties#isProduct()}.
     *
     * @return BaseResult wrapping a human-readable message
     */
    @GetMapping(value = "isProduct")
    public BaseResult isProduct() {
        boolean isProduct = projectProperties.isProduct();
        String msg = "Current Environment is" + (isProduct ? "" : " not") + " product";
        log.info(msg);
        return BaseResultUtil.success(msg);
    }

    /**
     * Authorization check probe; reaching this handler at all is the check.
     *
     * @return BaseResult success envelope
     */
    @GetMapping(value = "checkAuthc")
    public BaseResult checkAuthc() {
        return BaseResultUtil.success();
    }

    /**
     * Runs the SQL test cases.
     *
     * @return BaseResult produced by {@link TestSqlServiceImpl#runSql()}
     */
    @GetMapping(value = "runSql")
    public BaseResult runSql() {
        // Return directly — the intermediate local added nothing.
        return testSqlService.runSql();
    }

    /**
     * Validates the schemas.
     *
     * @return BaseResult produced by {@link TestSqlServiceImpl#runSchema()}
     */
    @GetMapping(value = "runSchema")
    public BaseResult runSchema() {
        return testSqlService.runSchema();
    }

}
com.zdjizhi.utils.StringUtil; +import lombok.extern.slf4j.Slf4j; +import org.apache.avro.JsonProperties; +import org.apache.avro.Schema; +import org.apache.avro.data.Json; +import org.apache.http.HttpStatus; +import org.apache.http.NameValuePair; +import org.apache.http.client.utils.URLEncodedUtils; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.http.HttpHeaders; +import org.springframework.http.MediaType; +import org.springframework.stereotype.Service; + +import java.io.File; +import java.net.URLDecoder; +import java.net.URLEncoder; +import java.nio.charset.Charset; +import java.text.SimpleDateFormat; +import java.util.*; + +@Slf4j +@Service +public class TestSqlServiceImpl { + + @Autowired + HttpClientService httpClientService; + @Autowired + ClickHouseConfig clickHouseConfig; + @Autowired + SqlEngineServiceImpl sqlEngineService; + @Autowired + DruidConfig druidConfig; + @Autowired + SchemaCache schemaCache; + + List errorDate; + + /** + * 运行测试用例 + * + * @return + */ + public BaseResult runSql() { + long start = System.currentTimeMillis(); + List list = ckSql(); + List listDruid = druidSql(); + list.addAll(listDruid); + Map<String, String> resultTemp; + BaseResult result = null; + List<Object> data = new ArrayList<>(); + int countSuccess = 0; + SimpleDateFormat format1 = new SimpleDateFormat("yyyy-MM-dd HH:00:00"); + SimpleDateFormat format2 = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss"); + Date date = new Date(System.currentTimeMillis()); + for (int i = 0; i < list.size(); i++) { + String sql = String.valueOf(list.get(i)); + try { + sql = sql.replaceAll("\\$start_time", "'" + format1.format(date) + "'"); + sql = sql.replaceAll("\\$end_time", "'" + format2.format(date) + "'"); + sql = URLEncoder.encode(sql, "utf-8").replaceAll("\\+", "%20"); + } catch (Exception e) { + e.printStackTrace(); + log.error("测试用例sql编码错误", e); + } + resultTemp = httpClientService.httpGet("http://127.0.0.1:9999/?query=" + sql); + if 
(resultTemp.get("status").equals(String.valueOf(HttpStatus.SC_OK))) { + resultTemp.remove("result"); + countSuccess++; + } else { + try { + resultTemp.put("query", URLDecoder.decode(sql, "UTF-8")); + } catch (Exception e) { + e.printStackTrace(); + log.error("sql查询结果处理异常", e); + } + } + data.add(resultTemp); + } + + Collections.sort(data, (o1, o2) -> { + Map<String, Object> oneMap = (Map<String, Object>) o1; + Map<String, Object> twoMap = (Map<String, Object>) o2; + return Integer.valueOf(twoMap.get("status").toString()).compareTo(Integer.valueOf(oneMap.get("status").toString())); + }); + long cost = System.currentTimeMillis() - start; + HashMap<String, Object> statistics = new HashMap<>(); + statistics.put("elapsed", String.format("%.2f", (double) cost / 1000)); + statistics.put("total", list.size()); + statistics.put("success", countSuccess); + String message = countSuccess == list.size() ? "ok" : "error"; + result = BaseResultUtil.success(message, data, statistics); + return result; + } + + /** + * 运行schema校验 + * + * @return + */ + public BaseResult runSchema() { + schemaCache.removeAll(); + long start = System.currentTimeMillis(); + errorDate = new ArrayList(); + checkCKSchema("/schema/clickhouse"); + checkDruidSchema("/schema/druid"); + checkSchema("/schema/clickhouse"); + checkSchema("/schema/druid"); + long cost = System.currentTimeMillis() - start; + HashMap<String, Object> statistics = new HashMap<>(); + statistics.put("elapsed", String.format("%.2f", (double) cost / 1000)); + String message = errorDate.size() == 0 ? 
"ok" : "error"; + return BaseResultUtil.success(message, errorDate, statistics); + } + + + public List ckSql() { + List<String> list = new ArrayList<>(); + list.add("select toDateTime(common_recv_time) as common_recv_time,common_subscriber_id, common_address_type, common_l4_protocol, common_client_ip, common_server_ip, common_server_port, ssl_sni, ssl_version from security_event_log where common_recv_time >= toDateTime($start_time) and common_recv_time< toDateTime($end_time) and common_client_ip like '49.7%' order by common_recv_time desc limit 0,20"); + list.add("select FROM_UNIXTIME(common_recv_time) as common_recv_time,common_subscriber_id, common_address_type, common_l4_protocol, common_client_ip, common_server_ip, common_server_port, ssl_sni, ssl_version from security_event_log where common_recv_time >= UNIX_TIMESTAMP($start_time) and common_recv_time< UNIX_TIMESTAMP($end_time) and common_client_ip like '49.7%' order by common_recv_time desc limit 0,20"); + list.add("select toDateTime(common_recv_time) as common_recv_time,common_subscriber_id, common_address_type, common_l4_protocol, common_client_ip, common_server_ip, common_server_port, http_host,http_url,http_user_agent from proxy_event_log where common_recv_time >= toDateTime($start_time) and common_recv_time< toDateTime($end_time) and http_host like '%joy.cn%' order by common_recv_time desc limit 0,20"); + list.add("select FROM_UNIXTIME(common_recv_time) as common_recv_time,common_subscriber_id, common_address_type, common_l4_protocol, common_client_ip, common_server_ip, common_server_port, http_host,http_url,http_user_agent from proxy_event_log where common_recv_time >= UNIX_TIMESTAMP($start_time) and common_recv_time< UNIX_TIMESTAMP($end_time) and http_host like '%joy.cn%' order by common_recv_time desc limit 0,20"); + list.add("select toDateTime(common_recv_time) as common_recv_time, common_address_type, common_l4_protocol, common_client_ip, common_server_ip, common_server_port, radius_framed_ip, 
radius_account from radius_record_log where common_recv_time >= toDateTime($start_time) and common_recv_time< toDateTime($start_time) and radius_account='T1yRd' order by common_recv_time desc limit 0,20"); + list.add("select FROM_UNIXTIME(common_recv_time) as common_recv_time,common_subscriber_id, common_address_type, common_l4_protocol, common_client_ip, common_server_ip, common_server_port, http_host,http_url,http_user_agent from proxy_event_log where common_recv_time >= UNIX_TIMESTAMP($start_time) and common_recv_time< UNIX_TIMESTAMP($end_time) and http_host like '%joy.cn%' order by common_recv_time desc limit 0,20"); + list.add("select toDateTime(common_recv_time) as common_recv_time, common_address_type, common_l4_protocol, common_client_ip, common_server_ip, common_server_port, radius_framed_ip, radius_account from radius_record_log where common_recv_time >= toDateTime($start_time) and common_recv_time< toDateTime($start_time) and radius_account='T1yRd' order by common_recv_time desc limit 0,20"); + list.add("select FROM_UNIXTIME(common_recv_time) as common_recv_time, common_address_type, common_l4_protocol, common_client_ip, common_server_ip, common_server_port, radius_framed_ip, radius_account from radius_record_log where common_recv_time >= UNIX_TIMESTAMP($start_time) and common_recv_time< UNIX_TIMESTAMP($end_time) and radius_account='T1yRd' order by common_recv_time desc limit 0,20"); + + list.add("select count(1) from connection_record_log"); + list.add("select count(*) from connection_record_log"); + list.add("select * from " + clickHouseConfig.getDbName() + ".connection_record_log limit 1"); + + //Radius 用户分析 + //1. 
radius账户申请客户端IP变化 + list.add("select framed_ip, arraySlice(groupUniqArray(concat(toString(event_timestamp),':', if(acct_status_type=1,'start','stop'))),1,100000) as timeseries \n" + + "from radius_onff_log where event_timestamp >=toDateTime($start_time) and event_timestamp <toDateTime($end_time) and account='admin' group by framed_ip"); + list.add("select framed_ip, arraySlice(groupUniqArray(concat(toString(event_timestamp),':', if(acct_status_type=1,'start','stop'))),1,100000) as timeseries \n" + + "from radius_onff_log where event_timestamp >=$start_time and event_timestamp < $end_time and account='admin' group by framed_ip"); + //2. 用户IP承载用户变化 + list.add("select account, arraySlice(groupUniqArray(concat(toString(event_timestamp),':', if(acct_status_type=1,'start','stop'))),1,100000) as timeseries \n" + + "from radius_onff_log where event_timestamp >= $start_time and event_timestamp < $end_time and framed_ip='127.0.0.1' group by account"); + list.add("select account, arraySlice(groupUniqArray(concat(toString(event_timestamp),':', if(acct_status_type=1,'start','stop'))),1,100000) as timeseries \n" + + "from radius_onff_log where event_timestamp >= $start_time and event_timestamp < $end_time and framed_ip='127.0.0.1' group by account"); + //自定义报告API + //一. 预置Internal Hosts 报告 + list.add("select common_client_ip, count(*) as sessions from connection_record_log where common_recv_time>= toStartOfDay(toDateTime($start_time))-604800 and common_recv_time< toStartOfDay(toDateTime($end_time)) group by common_client_ip order by sessions desc limit 0,100"); + //二. 预置External Hosts 报告 + list.add("select common_server_ip, count(*) as sessions from connection_record_log where common_recv_time>= toStartOfDay(toDateTime($start_time))-604800 and common_recv_time< toStartOfDay(toDateTime($start_time)) group by common_server_ip order by sessions desc limit 0,100"); + //三. 
预置Domains报告 + list.add("select http_domain AS domain,SUM(coalesce(common_c2s_byte_num, 0)) AS sent_bytes,SUM(coalesce(common_s2c_byte_num, 0)) AS received_bytes,SUM(coalesce(common_c2s_byte_num, 0)+coalesce(common_s2c_byte_num, 0)) AS bytes FROM connection_record_log WHERE common_recv_time >= toStartOfDay(toDateTime($start_time))-86400 AND common_recv_time < toStartOfDay(toDateTime($start_time)) and notEmpty(domain) GROUP BY domain ORDER BY bytes DESC LIMIT 100"); + list.add("select toDateTime(intDiv(toUInt32(toDateTime(toDateTime(common_recv_time))), 300)*300) as stat_time, http_domain, uniq (common_client_ip) as nums from connection_record_log where common_recv_time >= toStartOfDay(toDateTime($start_time))-86400 AND common_recv_time < toStartOfDay(toDateTime($start_time)) and http_domain in (select http_domain from connection_record_log where common_recv_time >= toStartOfDay(toDateTime($start_time))-86400 AND common_recv_time < toStartOfDay(toDateTime($start_time)) and notEmpty(http_domain) group by http_domain order by SUM(coalesce(common_c2s_byte_num, 0)+coalesce(common_s2c_byte_num, 0)) desc limit 10 ) group by toDateTime(intDiv(toUInt32(toDateTime(toDateTime(common_recv_time))), 300)*300), http_domain order by stat_time asc limit 500\n"); + list.add("SELECT http_host as host, SUM(coalesce(common_c2s_byte_num, 0)) AS sent_bytes,SUM(coalesce(common_s2c_byte_num, 0)) AS received_bytes,SUM(coalesce(common_c2s_byte_num, 0)+coalesce(common_s2c_byte_num, 0)) AS bytes FROM connection_record_log WHERE common_recv_time>= toStartOfDay(toDateTime($start_time))-604800 and common_recv_time< toStartOfDay(toDateTime($start_time)) and notEmpty(http_host) GROUP BY host ORDER BY bytes DESC " + + "union all " + + "SELECT 'totals' as host, SUM(coalesce(common_c2s_byte_num, 0)) AS sent_bytes, SUM(coalesce(common_s2c_byte_num, 0)) AS received_bytes, SUM(coalesce(common_c2s_byte_num, 0)+coalesce(common_s2c_byte_num, 0)) AS bytes from connection_record_log where common_recv_time>= 
toStartOfDay(toDateTime($start_time))-604800 and common_recv_time< toStartOfDay(toDateTime($start_time)) and notEmpty(http_host)"); + //四. 预置HTTP/HTTPS URLS报告 + list.add("SELECT http_url AS url,count(*) AS sessions FROM proxy_event_log WHERE common_recv_time >= toStartOfDay(toDateTime($start_time))-86400 AND common_recv_time < toStartOfDay(toDateTime($start_time)) and notEmpty(http_url) GROUP BY url ORDER BY sessions DESC LIMIT 100"); + list.add("select toDateTime(intDiv(toUInt32(toDateTime(toDateTime(common_recv_time))), 300)*300) as stat_time, http_url, count(distinct(common_client_ip)) as nums from proxy_event_log where common_recv_time >= toStartOfDay(toDateTime($start_time))-86400 AND common_recv_time < toStartOfDay(toDateTime($start_time)) and http_url IN (select http_url from proxy_event_log where common_recv_time >= toStartOfDay(toDateTime($start_time))-86400 AND common_recv_time < toStartOfDay(toDateTime($start_time)) and notEmpty(http_url) group by http_url order by count(*) desc limit 10 )group by toDateTime(intDiv(toUInt32(toDateTime(toDateTime(common_recv_time))), 300)*300), http_url order by stat_time asc limit 500"); + list.add("select common_subscriber_id as user, count(*) as sessions from connection_record_log where common_recv_time>= toStartOfDay(toDateTime($start_time))-604800 and common_recv_time< toStartOfDay(toDateTime($start_time)) and notEmpty(user) group by common_subscriber_id order by sessions desc limit 0,100"); + list.add("SELECT common_subscriber_id as user,SUM(coalesce(common_c2s_byte_num, 0)) AS sent_bytes,SUM(coalesce(common_s2c_byte_num, 0)) AS received_bytes,SUM(coalesce(common_c2s_byte_num, 0)+coalesce(common_s2c_byte_num, 0)) AS bytes FROM connection_record_log WHERE common_recv_time>= toStartOfDay(toDateTime($start_time))-604800 and common_recv_time< toStartOfDay(toDateTime($start_time)) and notEmpty(user) GROUP BY user ORDER BY bytes DESC LIMIT 100"); + + //RADIUS账户总计 + list.add("select count(distinct(framed_ip)) as 
active_ip_num , sum(acct_session_time) as online_duration from (select any(framed_ip) as framed_ip ,max(acct_session_time) as acct_session_time from radius_onff_log where account='000jS' and event_timestamp >= $start_time and event_timestamp < $end_time group by acct_session_id)"); + //RADIUS账户IP详情 + list.add("select distinct(framed_ip) as framed_ip from radius_onff_log where account='000iS' and event_timestamp >= $start_time and event_timestamp < $end_time"); + //RADIUS账户访问详情 + list.add("select max(if(acct_status_type=1,event_timestamp,0)) as start_time,max(if(acct_status_type=2,event_timestamp,0)) as end_time, any(framed_ip) as ip,max(acct_session_time) as online_duration from radius_onff_log where account='000jS' and event_timestamp >= $start_time and event_timestamp < $end_time group by acct_session_id order by start_time desc limit 200"); + //目标资源分析 + list.add("SELECT domain, groupUniqArrayMerge(ip_list) as ip_list, groupUniqArrayMerge(cdn_list) as cdn_list, groupUniqArrayMerge(protocol_type_list) as protocol_type_list, groupUniqArrayMerge(port_list) as port_list FROM security_website_domain_info where stat_time >= $start_time and stat_time < $end_time and policy_id=0 group by domain"); + //某IP的关联域名关系 + list.add("SELECT ip, FROM_UNIXTIME(max(stat_time)) as last_time, FROM_UNIXTIME(min(stat_time)) as first_time, groupUniqArrayMerge(domain_list) as domain_list, groupUniqArrayMerge(port_list) as port_list FROM security_ip_info where stat_time >= $start_time and stat_time < $end_time and ip='192.168.50.70' group by ip"); + //自定义函数测试 + list.add("SELECT policy_id, APPROX_COUNT_DISTINCT_DS_HLL(isp) as num FROM proxy_event_hits_log where __time >= '2020-04-05 00:00:00' and __time < '2020-05-05 00:00:00' and policy_id=0 group by policy_id"); + list.add("SELECT ip, IP_TO_CITY(ip) as location, IP_TO_GEO(ip) as geo from (SELECT policy_id, arrayJoin(groupUniqArrayMerge(ip_list)) as ip FROM proxy_ip_info where stat_time >= $start_time and stat_time < $end_time and 
policy_id=0 group by policy_id)"); + list.add("select TIME_FLOOR_WITH_FILL(common_recv_time,'PT5M','previous') as stat_time from connection_record_log where common_recv_time > $start_time and common_recv_time < $end_time group by stat_time"); + + return list; + } + + public List druidSql() { + List<String> list = new ArrayList<>(); + //安全策略命中统计 + //1. 某策略命中计数 + list.add("select policy_id, sum(hits) as hits from security_event_hits_log where __time >$start_time and __time <$end_time and policy_id=40 group by policy_id"); + list.add("select policy_id, sum(hits) as hits from security_event_hits_log where __time >$start_time and __time <$end_time and policy_id in (9,10,88,45) group by policy_id"); + //2. 某策略命中计数趋势 + list.add("select TIME_FORMAT(time_floor(__time,'PT5M'),'yyyy-MM-dd HH:mm:ss') as start_time, sum(hits) as hits from security_event_hits_log where __time >= TIMESTAMP $start_time and __time < TIMESTAMP $end_time and policy_id=10 group by TIME_FORMAT(time_floor(__time,'PT5M'),'yyyy-MM-dd HH:mm:ss') limit 100"); + list.add("select DATE_FORMAT(FROM_UNIXTIME(FLOOR(UNIX_TIMESTAMP(__time)/300)*300),'%Y-%m-%d %H:%i:%s') as start_time, sum(hits) as hits from security_event_hits_log where __time >= $start_time and __time < $end_time and policy_id=10 group by DATE_FORMAT(FROM_UNIXTIME(FLOOR(UNIX_TIMESTAMP(__time)/300)*300),'%Y-%m-%d %H:%i:%s') limit 100"); + //3. 某策略命中时间(首次和最近一次) + list.add("select policy_id,TIME_FORMAT(min(__time) ,'yyyy-MM-dd HH:mm:ss') as first_used, TIME_FORMAT(max(__time) ,'yyyy-MM-dd HH:mm:ss') as last_used from security_event_hits_log where policy_id in (100,101 ,105, 102) group by policy_id"); + list.add("select policy_id, DATE_FORMAT(min(__time) ,'%Y-%m-%d %H:%i:%s') as first_used, DATE_FORMAT(max(__time) ,'%Y-%m-%d %H:%i:%s') as last_used from security_event_hits_log where policy_id in (100,101 ,105, 102) group by policy_id"); + //4. 
TopN 命中策略 + list.add("select policy_id, action, sum(hits) as hits from security_event_hits_log where __time >=TIMESTAMP $start_time and __time <TIMESTAMP $end_time group by policy_id, action order by hits desc limit 200"); + list.add("select policy_id, action, sum(hits) as hits from security_event_hits_log where __time >=$start_time and __time <$end_time group by policy_id, action order by hits desc limit 200"); + //二、 代理策略命中统计 + //1. 某策略命中计数 + list.add("select policy_id, sum(hits) as hits from proxy_event_hits_log where __time >=$start_time and __time <$end_time and policy_id=100 group by policy_id"); + list.add("select policy_id, sum(hits) as hits from proxy_event_hits_log where __time >=$start_time and __time <$end_time and policy_id=100 group by policy_id"); + //2. 某策略命中趋势 + list.add("select TIME_FORMAT(time_floor(__time,'PT5M'),'yyyy-MM-dd HH:mm:ss') as start_time, sum(hits) as hits from proxy_event_hits_log where __time >= TIMESTAMP $start_time and __time <TIMESTAMP $end_time and policy_id=100 group by TIME_FORMAT(time_floor(__time,'PT5M'),'yyyy-MM-dd HH:mm:ss') limit 101"); + list.add("select FROM_UNIXTIME(FLOOR(UNIX_TIMESTAMP(__time)/300)*300) as start_time, sum(hits) as hits from proxy_event_hits_log where __time >= $start_time and __time < $end_time and policy_id=100 group by FROM_UNIXTIME(FLOOR(UNIX_TIMESTAMP(__time)/300)*300) limit 101"); + //3.某策略命中时间(首次和最近一次 + list.add("select policy_id,TIME_FORMAT(min(__time) ,'yyyy-MM-dd HH:mm:ss') as first_used, TIME_FORMAT(max(__time) ,'yyyy-MM-dd HH:mm:ss') as last_used from proxy_event_hits_log where policy_id in (100,101,102,105) group by policy_id"); + list.add("select policy_id, DATE_FORMAT(min(__time) ,'%Y-%m-%d %H:%i:%s') as first_used, DATE_FORMAT(max(__time) ,'%Y-%m-%d %H:%i:%s') as last_used from proxy_event_hits_log where policy_id in (100,101,102,105) group by policy_id"); + //4. 
TopN 命中策略 + list.add("select policy_id, sub_action as action, sum(hits) as hits from proxy_event_hits_log where __time >=TIMESTAMP $start_time and __time <TIMESTAMP $end_time group by policy_id, sub_action order by hits desc limit 200"); + list.add("select policy_id, sub_action as action, sum(hits) as hits from proxy_event_hits_log where __time >=$start_time and __time <$end_time group by policy_id, sub_action order by hits desc limit 200"); + //5. Proxy 操纵动作命中计数 + list.add("select sub_action as action, sum(hits) as hits from proxy_event_hits_log where __time >= TIMESTAMP $start_time and __time < TIMESTAMP $end_time group by sub_action"); + list.add("select sub_action as action, sum(hits) as hits from proxy_event_hits_log where __time >= $start_time and __time < $end_time group by sub_action"); + //6. Proxy 操纵动作命中趋势 + list.add("select TIME_FORMAT(time_floor(__time,'PT5M'),'yyyy-MM-dd HH:mm:ss') as start_time, sub_action as action, sum(hits) as hits from proxy_event_hits_log where __time >=TIMESTAMP $start_time and __time <TIMESTAMP $end_time group by TIME_FORMAT(time_floor(__time,'PT5M'),'yyyy-MM-dd HH:mm:ss') , sub_action limit 100\n"); + list.add("select DATE_FORMAT(FROM_UNIXTIME(FLOOR(UNIX_TIMESTAMP(__time)/300)*300),'%Y-%m-%d %H:%i:%s') as start_time, sub_action as action, sum(hits) as hits from proxy_event_hits_log where __time >= $start_time and __time < $end_time group by DATE_FORMAT(FROM_UNIXTIME(FLOOR(UNIX_TIMESTAMP(__time)/300)*300),'%Y-%m-%d %H:%i:%s'), sub_action limit 100\n"); + //7. 
Proxy Pinning TIMESTAMP 计数 + list.add("select sum(hits) as hits, 'not_pinning_num' as type from security_event_hits_log where __time >= $start_time and __time < $end_time and pinningst=0 union all select sum(hits) as hits, 'pinning_num' as type from security_event_hits_log where __time >= $start_time and __time < $end_time and pinningst=1 union all select sum(hits) as hits, 'maybe_pinning_num' as type from security_event_hits_log where __time >=TIMESTAMP $start_time and __time < TIMESTAMP $end_time and pinningst=2"); + list.add("select sum(hits) as hits, 'not_pinning_num' as type from security_event_hits_log where __time >= $start_time and __time < $end_time and pinningst=0 union all select sum(hits) as hits, 'pinning_num' as type from security_event_hits_log where __time >= $start_time and __time < $end_time and pinningst=1 union all select sum(hits) as hits, 'maybe_pinning_num' as type from security_event_hits_log where __time >= $start_time and __time < $end_time and pinningst=2"); + //8.Proxy Pinning计数趋势 + list.add("select TIME_FORMAT(time_floor(__time,'PT5M'),'yyyy-MM-dd HH:mm:ss') as start_time, 'not_pinning_num' as type, sum(hits) as hits from security_event_hits_log where __time >=TIMESTAMP $start_time and __time <TIMESTAMP $end_time and pinningst=0 group by TIME_FORMAT(time_floor(__time,'PT5M'),'yyyy-MM-dd HH:mm:ss')\n" + + "union all\n" + + "select TIME_FORMAT(time_floor(__time,'PT5M'),'yyyy-MM-dd HH:mm:ss') as start_time, 'pinning_num' as type, sum(hits) as hits from security_event_hits_log where __time >=TIMESTAMP $start_time and __time <TIMESTAMP $end_time and pinningst=1 group by TIME_FORMAT(time_floor(__time,'PT5M'),'yyyy-MM-dd HH:mm:ss')\n" + + "union all\n" + + "select TIME_FORMAT(time_floor(__time,'PT5M'),'yyyy-MM-dd HH:mm:ss') as start_time, 'maybe_pinning_num' as type, sum(hits) as hits from security_event_hits_log where __time >=TIMESTAMP $start_time and __time <TIMESTAMP $end_time and pinningst=2 group by 
TIME_FORMAT(time_floor(__time,'PT5M'),'yyyy-MM-dd HH:mm:ss')"); + list.add("select DATE_FORMAT(FROM_UNIXTIME(FLOOR(UNIX_TIMESTAMP(__time)/300)*300),'%Y-%m-%d %H:%i:%s') as start_time, 'not_pinning_num' as type, sum(hits) as hits from security_event_hits_log where __time >= $start_time and __time < $end_time and pinningst=0 group by DATE_FORMAT(FROM_UNIXTIME(FLOOR(UNIX_TIMESTAMP(__time)/300)*300),'%Y-%m-%d %H:%i:%s')\n" + + "union all\n" + + "select DATE_FORMAT(FROM_UNIXTIME(FLOOR(UNIX_TIMESTAMP(__time)/300)*300),'%Y-%m-%d %H:%i:%s') as start_time, 'pinning_num' as type, sum(hits) as hits from security_event_hits_log where __time >= $start_time and __time < $end_time and pinningst=1 group by DATE_FORMAT(FROM_UNIXTIME(FLOOR(UNIX_TIMESTAMP(__time)/300)*300),'%Y-%m-%d %H:%i:%s')\n" + + "union all\n" + + "select DATE_FORMAT(FROM_UNIXTIME(FLOOR(UNIX_TIMESTAMP(__time)/300)*300),'%Y-%m-%d %H:%i:%s') as start_time, 'maybe_pinning_num' as type, sum(hits) as hits from security_event_hits_log where __time >= $start_time and __time < $end_time and pinningst=2 group by DATE_FORMAT(FROM_UNIXTIME(FLOOR(UNIX_TIMESTAMP(__time)/300)*300),'%Y-%m-%d %H:%i:%s')"); + //三、 Traffics-带宽统计 + //1. 
Traffic IN/OUT 计数 + //Bytes + list.add("select sum(total_in_bytes) as traffic_in_bytes, sum(total_out_bytes) as traffic_out_bytes from traffic_metrics_log where __time >=TIMESTAMP $start_time and __time <TIMESTAMP $end_time"); + list.add("select sum(total_in_bytes) as traffic_in_bytes, sum(total_out_bytes) as traffic_out_bytes from traffic_metrics_log where __time >= $start_time and __time < $end_time"); + //Packets + list.add("select sum(total_in_packets) as traffic_in_packets, sum(total_out_packets) as traffic_out_packets from traffic_metrics_log where __time >=TIMESTAMP $start_time and __time <TIMESTAMP $end_time"); + list.add("select sum(total_in_packets) as traffic_in_packets, sum(total_out_packets) as traffic_out_packets from traffic_metrics_log where __time >= $start_time and __time < $end_time"); + //Sessions + list.add("select sum(new_conn_num) as sessions from traffic_metrics_log where __time >=TIMESTAMP $start_time and __time <TIMESTAMP $end_time"); + list.add("select sum(new_conn_num) as sessions from traffic_metrics_log where __time >= $start_time and __time < $end_time"); + //2. 
Traffic IN/OUT 带宽趋势 + //Bytes + list.add("select TIME_FORMAT(time_floor(__time,'PT30S'),'yyyy-MM-dd HH:mm:ss') as stat_time, 'traffic_in_bytes' as type, sum(total_in_bytes) as bytes from traffic_metrics_log where __time >= $start_time and __time < $end_time group by TIME_FORMAT(time_floor(__time,'PT30S'),'yyyy-MM-dd HH:mm:ss')\n" + + "union all\n" + + "select TIME_FORMAT(time_floor(__time,'PT30S'),'yyyy-MM-dd HH:mm:ss') as stat_time, 'traffic_out_bytes' as type, sum(total_out_bytes) as bytes from traffic_metrics_log where __time >= $start_time and __time < $end_time group by TIME_FORMAT(time_floor(__time,'PT30S'),'yyyy-MM-dd HH:mm:ss')"); + list.add("select DATE_FORMAT(FROM_UNIXTIME(FLOOR(UNIX_TIMESTAMP(__time)/30)*30),'%Y-%m-%d %H:%i:%s') as stat_time, 'traffic_in_bytes' as type, sum(total_in_bytes) as bytes from traffic_metrics_log where __time >= $start_time and __time < $end_time group by DATE_FORMAT(FROM_UNIXTIME(FLOOR(UNIX_TIMESTAMP(__time)/30)*30),'%Y-%m-%d %H:%i:%s')\n" + + "union all\n" + + "select DATE_FORMAT(FROM_UNIXTIME(FLOOR(UNIX_TIMESTAMP(__time)/30)*30),'%Y-%m-%d %H:%i:%s') as stat_time, 'traffic_out_bytes' as type, sum(total_out_bytes) as bytes from traffic_metrics_log where __time >= $start_time and __time < $end_time group by DATE_FORMAT(FROM_UNIXTIME(FLOOR(UNIX_TIMESTAMP(__time)/30)*30),'%Y-%m-%d %H:%i:%s')"); + //Packets + list.add("select TIME_FORMAT(time_floor(__time,'PT30S'),'yyyy-MM-dd HH:mm:ss') as stat_time, 'traffic_in_packets' as type, sum(total_in_packets) as packets from traffic_metrics_log where __time >=TIMESTAMP $start_time and __time <TIMESTAMP $end_time group by TIME_FORMAT(time_floor(__time,'PT30S'),'yyyy-MM-dd HH:mm:ss')\n" + + "union all\n" + + "select TIME_FORMAT(time_floor(__time,'PT30S'),'yyyy-MM-dd HH:mm:ss') as stat_time, 'traffic_out_packets' as type, sum(total_out_packets) as packets from traffic_metrics_log where __time >=TIMESTAMP $start_time and __time <TIMESTAMP $end_time group by 
TIME_FORMAT(time_floor(__time,'PT30S'),'yyyy-MM-dd HH:mm:ss')"); + list.add("select DATE_FORMAT(FROM_UNIXTIME(FLOOR(UNIX_TIMESTAMP(__time)/30)*30),'%Y-%m-%d %H:%i:%s') as stat_time, 'traffic_in_packets' as type, sum(total_in_packets) as packets from traffic_metrics_log where __time >= $start_time and __time < $end_time group by DATE_FORMAT(FROM_UNIXTIME(FLOOR(UNIX_TIMESTAMP(__time)/30)*30),'%Y-%m-%d %H:%i:%s')\n" + + "union all\n" + + "select DATE_FORMAT(FROM_UNIXTIME(FLOOR(UNIX_TIMESTAMP(__time)/30)*30),'%Y-%m-%d %H:%i:%s') as stat_time, 'traffic_out_packets' as type, sum(total_out_packets) as packets from traffic_metrics_log where __time >= $start_time and __time < $end_time group by DATE_FORMAT(FROM_UNIXTIME(FLOOR(UNIX_TIMESTAMP(__time)/30)*30),'%Y-%m-%d %H:%i:%s')"); + //Packets + list.add("select TIME_FORMAT(time_floor(__time,'PT30S'),'yyyy-MM-dd HH:mm:ss') as stat_time, 'traffic_in_packets' as type, sum(total_in_packets) as packets from traffic_metrics_log where __time >= $start_time and __time < $end_time group by TIME_FORMAT(time_floor(__time,'PT30S'),'yyyy-MM-dd HH:mm:ss')\n" + + "union all\n" + + "select TIME_FORMAT(time_floor(__time,'PT30S'),'yyyy-MM-dd HH:mm:ss') as stat_time, 'traffic_out_packets' as type, sum(total_out_packets) as packets from traffic_metrics_log where __time >= $start_time and __time < $end_time group by TIME_FORMAT(time_floor(__time,'PT30S'),'yyyy-MM-dd HH:mm:ss')"); + list.add("select DATE_FORMAT(FROM_UNIXTIME(FLOOR(UNIX_TIMESTAMP(__time)/30)*30),'%Y-%m-%d %H:%i:%s') as stat_time, 'traffic_in_packets' as type, sum(total_in_packets) as packets from traffic_metrics_log where __time >= $start_time and __time < $end_time group by DATE_FORMAT(FROM_UNIXTIME(FLOOR(UNIX_TIMESTAMP(__time)/30)*30),'%Y-%m-%d %H:%i:%s')\n" + + "union all\n" + + "select DATE_FORMAT(FROM_UNIXTIME(FLOOR(UNIX_TIMESTAMP(__time)/30)*30),'%Y-%m-%d %H:%i:%s') as stat_time, 'traffic_out_packets' as type, sum(total_out_packets) as packets from traffic_metrics_log where 
__time >= $start_time and __time < $end_time group by DATE_FORMAT(FROM_UNIXTIME(FLOOR(UNIX_TIMESTAMP(__time)/30)*30),'%Y-%m-%d %H:%i:%s')"); + //Sessions + list.add("select TIME_FORMAT(time_floor(__time,'PT30S'),'yyyy-MM-dd HH:mm:ss') as stat_time, 'new_conn_num' as type, sum(new_conn_num) as sessions from traffic_metrics_log where __time >= $start_time and __time < $end_time group by TIME_FORMAT(time_floor(__time,'PT30S'),'yyyy-MM-dd HH:mm:ss')"); + list.add("select DATE_FORMAT(FROM_UNIXTIME(FLOOR(UNIX_TIMESTAMP(__time)/30)*30),'%Y-%m-%d %H:%i:%s') as stat_time, 'new_conn_num' as type, sum(new_conn_num) as sessions from traffic_metrics_log where __time >= $start_time and __time < $end_time group by DATE_FORMAT(FROM_UNIXTIME(FLOOR(UNIX_TIMESTAMP(__time)/30)*30),'%Y-%m-%d %H:%i:%s')"); + //四、 Traffics-计数统计 + //1. 新建与活跃链接计数 + list.add("select sum(new_conn_num) as new_conn_num, sum(established_conn_num) as established_conn_num from traffic_metrics_log where __time >=TIMESTAMP $start_time and __time <TIMESTAMP $end_time"); + list.add("select sum(new_conn_num) as new_conn_num, sum(established_conn_num) as established_conn_num from traffic_metrics_log where __time >= $start_time and __time < $end_time"); + //2.新建与活跃链接计数趋势 + list.add("select TIME_FORMAT(time_floor(__time,'PT30S'),'yyyy-MM-dd HH:mm:ss') as stat_time, 'new_conn_num' as type, sum(new_conn_num) as sessions from traffic_metrics_log where __time >=TIMESTAMP $start_time and __time < TIMESTAMP $end_time group by TIME_FORMAT(time_floor(__time,'PT30S'),'yyyy-MM-dd HH:mm:ss')\n" + + "union all\n" + + "select TIME_FORMAT(time_floor(__time,'PT30S'),'yyyy-MM-dd HH:mm:ss') as stat_time, 'established_conn_num' as type, sum(established_conn_num) as sessions from traffic_metrics_log where __time >= TIMESTAMP $start_time and __time < TIMESTAMP $end_time group by TIME_FORMAT(time_floor(__time,'PT30S'),'yyyy-MM-dd HH:mm:ss')"); + list.add("select DATE_FORMAT(FROM_UNIXTIME(FLOOR(UNIX_TIMESTAMP(__time)/30)*30),'%Y-%m-%d 
%H:%i:%s') as stat_time, 'new_conn_num' as type, sum(new_conn_num) as sessions from traffic_metrics_log where __time >= $start_time and __time < $end_time group by DATE_FORMAT(FROM_UNIXTIME(FLOOR(UNIX_TIMESTAMP(__time)/30)*30),'%Y-%m-%d %H:%i:%s')\n" + + "union all\n" + + "select DATE_FORMAT(FROM_UNIXTIME(FLOOR(UNIX_TIMESTAMP(__time)/30)*30),'%Y-%m-%d %H:%i:%s') as stat_time, 'established_conn_num' as type, sum(established_conn_num) as sessions from traffic_metrics_log where __time >= $start_time and __time < $end_time group by DATE_FORMAT(FROM_UNIXTIME(FLOOR(UNIX_TIMESTAMP(__time)/30)*30),'%Y-%m-%d %H:%i:%s')"); + //3. 安全策略动作计数 + //Bytes + list.add("select sum(default_in_bytes+default_out_bytes) as default_bytes, sum(allow_in_bytes+allow_out_bytes) as allow_bytes, sum(deny_in_bytes+deny_out_bytes) as deny_bytes, sum(monitor_in_bytes+monitor_out_bytes) as monitor_bytes, sum(intercept_in_bytes+intercept_out_bytes) as intercept_bytes from traffic_metrics_log where __time >=TIMESTAMP $start_time and __time < TIMESTAMP $end_time"); + list.add("select sum(default_in_bytes+default_out_bytes) as default_bytes, sum(allow_in_bytes+allow_out_bytes) as allow_bytes, sum(deny_in_bytes+deny_out_bytes) as deny_bytes, sum(monitor_in_bytes+monitor_out_bytes) as monitor_bytes, sum(intercept_in_bytes+intercept_out_bytes) as intercept_bytes from traffic_metrics_log where __time >= $start_time and __time < $end_time"); + //Packets + list.add("select sum(default_in_packets+default_out_packets) as default_packets, sum(allow_in_packets+allow_in_packets) as allow_packets, sum(deny_in_packets+deny_out_packets) as deny_packets, sum(monitor_in_packets+monitor_out_packets) as monitor_packets, sum(intercept_in_packets+intercept_out_packets) as intercept_packets from traffic_metrics_log where __time >=TIMESTAMP $start_time and __time <TIMESTAMP $end_time\n"); + list.add("select sum(default_in_packets+default_out_packets) as default_packets, sum(allow_in_packets+allow_in_packets) as 
allow_packets, sum(deny_in_packets+deny_out_packets) as deny_packets, sum(monitor_in_packets+monitor_out_packets) as monitor_packets, sum(intercept_in_packets+intercept_out_packets) as intercept_packets from traffic_metrics_log where __time >= $start_time and __time < $end_time\n"); + //Sessions + list.add("select sum(default_conn_num) as default_sessions, sum(allow_conn_num) as allow_sessions, sum(deny_conn_num) as deny_sessions, sum(monitor_conn_num) as monitor_sessions, sum(intercept_conn_num) as intercept_sessions from traffic_metrics_log where __time >=TIMESTAMP $start_time and __time <TIMESTAMP $end_time"); + list.add("select sum(default_conn_num) as default_sessions, sum(allow_conn_num) as allow_sessions, sum(deny_conn_num) as deny_sessions, sum(monitor_conn_num) as monitor_sessions, sum(intercept_conn_num) as intercept_sessions from traffic_metrics_log where __time >= $start_time and __time < $end_time"); + //4. 安全策略动作趋势 + //Bytes + list.add("select TIME_FORMAT(time_floor(__time,'PT5M'),'yyyy-MM-dd HH:mm:ss') as stat_time, 'default_bytes' as type, sum(default_in_bytes+default_out_bytes) as bytes from traffic_metrics_log where __time >=TIMESTAMP $start_time and __time <TIMESTAMP $end_time group by TIME_FORMAT(time_floor(__time,'PT5M'),'yyyy-MM-dd HH:mm:ss')\n" + + "union all\n" + + "select TIME_FORMAT(time_floor(__time,'PT5M'),'yyyy-MM-dd HH:mm:ss') as stat_time, 'allow_bytes' as type, sum(allow_in_bytes+allow_out_bytes) as bytes from traffic_metrics_log where __time >=TIMESTAMP $start_time and __time <TIMESTAMP $end_time group by TIME_FORMAT(time_floor(__time,'PT5M'),'yyyy-MM-dd HH:mm:ss')\n" + + "union all\n" + + "select TIME_FORMAT(time_floor(__time,'PT5M'),'yyyy-MM-dd HH:mm:ss') as stat_time, 'deny_bytes' as type, sum(deny_in_bytes+deny_out_bytes) as bytes from traffic_metrics_log where __time >= TIMESTAMP $start_time and __time <TIMESTAMP $end_time group by TIME_FORMAT(time_floor(__time,'PT5M'),'yyyy-MM-dd HH:mm:ss') \n" + + "union all\n" + + "select 
TIME_FORMAT(time_floor(__time,'PT5M'),'yyyy-MM-dd HH:mm:ss') as stat_time, 'monitor_bytes' as type, sum(monitor_in_bytes+monitor_out_bytes) as bytes from traffic_metrics_log where __time >=TIMESTAMP $start_time and __time <TIMESTAMP $end_time group by TIME_FORMAT(time_floor(__time,'PT5M'),'yyyy-MM-dd HH:mm:ss')\n" + + "union all\n" + + "select TIME_FORMAT(time_floor(__time,'PT5M'),'yyyy-MM-dd HH:mm:ss') as stat_time, 'intercept_bytes' as type, sum(intercept_in_bytes+intercept_out_bytes) as bytes from traffic_metrics_log where __time >= $start_time and __time < $end_time group by TIME_FORMAT(time_floor(__time,'PT5M'),'yyyy-MM-dd HH:mm:ss')"); + list.add("select DATE_FORMAT(FROM_UNIXTIME(FLOOR(UNIX_TIMESTAMP(__time)/300)*300),'%Y-%m-%d %H:%i:%s') as stat_time, 'default_bytes' as type, sum(default_in_bytes+default_out_bytes) as bytes from traffic_metrics_log where __time >= $start_time and __time < $end_time group by DATE_FORMAT(FROM_UNIXTIME(FLOOR(UNIX_TIMESTAMP(__time)/300)*300),'%Y-%m-%d %H:%i:%s')\n" + + "union all\n" + + "select DATE_FORMAT(FROM_UNIXTIME(FLOOR(UNIX_TIMESTAMP(__time)/300)*300),'%Y-%m-%d %H:%i:%s') as stat_time, 'allow_bytes' as type, sum(allow_in_bytes+allow_out_bytes) as bytes from traffic_metrics_log where __time >= $start_time and __time < $end_time group by DATE_FORMAT(FROM_UNIXTIME(FLOOR(UNIX_TIMESTAMP(__time)/300)*300),'%Y-%m-%d %H:%i:%s')\n" + + "union all\n" + + "select DATE_FORMAT(FROM_UNIXTIME(FLOOR(UNIX_TIMESTAMP(__time)/300)*300),'%Y-%m-%d %H:%i:%s') as stat_time, 'deny_bytes' as type, sum(deny_in_bytes+deny_out_bytes) as bytes from traffic_metrics_log where __time >= $start_time and __time < $end_time group by DATE_FORMAT(FROM_UNIXTIME(FLOOR(UNIX_TIMESTAMP(__time)/300)*300),'%Y-%m-%d %H:%i:%s') \n" + + "union all\n" + + "select DATE_FORMAT(FROM_UNIXTIME(FLOOR(UNIX_TIMESTAMP(__time)/300)*300),'%Y-%m-%d %H:%i:%s') as stat_time, 'monitor_bytes' as type, sum(monitor_in_bytes+monitor_out_bytes) as bytes from traffic_metrics_log where 
__time >= $start_time and __time < $end_time group by DATE_FORMAT(FROM_UNIXTIME(FLOOR(UNIX_TIMESTAMP(__time)/300)*300),'%Y-%m-%d %H:%i:%s')\n" + + "union all\n" + + "select DATE_FORMAT(FROM_UNIXTIME(FLOOR(UNIX_TIMESTAMP(__time)/300)*300),'%Y-%m-%d %H:%i:%s') as stat_time, 'intercept_bytes' as type, sum(intercept_in_bytes+intercept_out_bytes) as bytes from traffic_metrics_log where __time >= $start_time and __time < $end_time group by DATE_FORMAT(FROM_UNIXTIME(FLOOR(UNIX_TIMESTAMP(__time)/300)*300),'%Y-%m-%d %H:%i:%s')"); + //packets + list.add("select TIME_FORMAT(time_floor(__time,'PT5M'),'yyyy-MM-dd HH:mm:ss') as stat_time, 'default_packets' as type, sum(default_in_packets+default_out_packets) as packets from traffic_metrics_log where __time >=TIMESTAMP $start_time and __time <TIMESTAMP $end_time group by TIME_FORMAT(time_floor(__time,'PT5M'),'yyyy-MM-dd HH:mm:ss')\n" + + "union all\n" + + "select TIME_FORMAT(time_floor(__time,'PT5M'),'yyyy-MM-dd HH:mm:ss') as stat_time, 'allow_packets' as type, sum(allow_in_packets+allow_out_packets) as packets from traffic_metrics_log where __time >=TIMESTAMP $start_time and __time <TIMESTAMP $end_time group by TIME_FORMAT(time_floor(__time,'PT5M'),'yyyy-MM-dd HH:mm:ss')\n" + + "union all\n" + + "select TIME_FORMAT(time_floor(__time,'PT5M'),'yyyy-MM-dd HH:mm:ss') as stat_time, 'deny_packets' as type, sum(deny_in_packets+deny_out_packets) as packets from traffic_metrics_log where __time >=TIMESTAMP $start_time and __time <TIMESTAMP $end_time group by TIME_FORMAT(time_floor(__time,'PT5M'),'yyyy-MM-dd HH:mm:ss') \n" + + "union all\n" + + "select TIME_FORMAT(time_floor(__time,'PT5M'),'yyyy-MM-dd HH:mm:ss') as stat_time, 'monitor_packets' as type, sum(monitor_in_packets+monitor_out_packets) as packets from traffic_metrics_log where __time >=TIMESTAMP $start_time and __time <TIMESTAMP $end_time group by TIME_FORMAT(time_floor(__time,'PT5M'),'yyyy-MM-dd HH:mm:ss')\n" + + "union all\n" + + "select 
TIME_FORMAT(time_floor(__time,'PT5M'),'yyyy-MM-dd HH:mm:ss') as stat_time, 'intercept_packets' as type, sum(intercept_in_packets+intercept_out_packets) as packets from traffic_metrics_log where __time >=TIMESTAMP $start_time and __time <TIMESTAMP $end_time group by TIME_FORMAT(time_floor(__time,'PT5M'),'yyyy-MM-dd HH:mm:ss')"); + list.add("select DATE_FORMAT(FROM_UNIXTIME(FLOOR(UNIX_TIMESTAMP(__time)/300)*300),'%Y-%m-%d %H:%i:%s') as stat_time, 'default_packets' as type, sum(default_in_packets+default_out_packets) as packets from traffic_metrics_log where __time >= $start_time and __time < $end_time group by DATE_FORMAT(FROM_UNIXTIME(FLOOR(UNIX_TIMESTAMP(__time)/300)*300),'%Y-%m-%d %H:%i:%s')\n" + + "union all\n" + + "select DATE_FORMAT(FROM_UNIXTIME(FLOOR(UNIX_TIMESTAMP(__time)/300)*300),'%Y-%m-%d %H:%i:%s') as stat_time, 'allow_packets' as type, sum(allow_in_packets+allow_out_packets) as packets from traffic_metrics_log where __time >= $start_time and __time < $end_time group by DATE_FORMAT(FROM_UNIXTIME(FLOOR(UNIX_TIMESTAMP(__time)/300)*300),'%Y-%m-%d %H:%i:%s')\n" + + "union all\n" + + "select DATE_FORMAT(FROM_UNIXTIME(FLOOR(UNIX_TIMESTAMP(__time)/300)*300),'%Y-%m-%d %H:%i:%s') as stat_time, 'deny_packets' as type, sum(deny_in_packets+deny_out_packets) as packets from traffic_metrics_log where __time >= $start_time and __time < $end_time group by DATE_FORMAT(FROM_UNIXTIME(FLOOR(UNIX_TIMESTAMP(__time)/300)*300),'%Y-%m-%d %H:%i:%s') \n" + + "union all\n" + + "select DATE_FORMAT(FROM_UNIXTIME(FLOOR(UNIX_TIMESTAMP(__time)/300)*300),'%Y-%m-%d %H:%i:%s') as stat_time, 'monitor_packets' as type, sum(monitor_in_packets+monitor_out_packets) as packets from traffic_metrics_log where __time >= $start_time and __time < $end_time group by DATE_FORMAT(FROM_UNIXTIME(FLOOR(UNIX_TIMESTAMP(__time)/300)*300),'%Y-%m-%d %H:%i:%s')\n" + + "union all\n" + + "select DATE_FORMAT(FROM_UNIXTIME(FLOOR(UNIX_TIMESTAMP(__time)/300)*300),'%Y-%m-%d %H:%i:%s') as stat_time, 'intercept_packets' 
as type, sum(intercept_in_packets+intercept_out_packets) as packets from traffic_metrics_log where __time >= $start_time and __time < $end_time group by DATE_FORMAT(FROM_UNIXTIME(FLOOR(UNIX_TIMESTAMP(__time)/300)*300),'%Y-%m-%d %H:%i:%s')"); + //sessions + list.add("select TIME_FORMAT(time_floor(__time,'PT5M'),'yyyy-MM-dd HH:mm:ss') as stat_time, 'default_conn_num' as type, sum(default_conn_num) as sessions from traffic_metrics_log where __time >=TIMESTAMP $start_time and __time <TIMESTAMP $end_time group by TIME_FORMAT(time_floor(__time,'PT5M'),'yyyy-MM-dd HH:mm:ss')\n" + + "union all\n" + + "select TIME_FORMAT(time_floor(__time,'PT5M'),'yyyy-MM-dd HH:mm:ss') as stat_time, 'allow_conn_num' as type, sum(allow_conn_num) as sessions from traffic_metrics_log where __time >=TIMESTAMP $start_time and __time <TIMESTAMP $end_time group by TIME_FORMAT(time_floor(__time,'PT5M'),'yyyy-MM-dd HH:mm:ss')\n" + + "union all\n" + + "select TIME_FORMAT(time_floor(__time,'PT5M'),'yyyy-MM-dd HH:mm:ss') as stat_time, 'deny_conn_num' as type, sum(deny_conn_num) as sessions from traffic_metrics_log where __time >=TIMESTAMP $start_time and __time <TIMESTAMP $end_time group by TIME_FORMAT(time_floor(__time,'PT5M'),'yyyy-MM-dd HH:mm:ss') \n" + + "union all\n" + + "select TIME_FORMAT(time_floor(__time,'PT5M'),'yyyy-MM-dd HH:mm:ss') as stat_time, 'monitor_conn_num' as type, sum(monitor_conn_num) as sessions from traffic_metrics_log where __time >=TIMESTAMP $start_time and __time <TIMESTAMP $end_time group by TIME_FORMAT(time_floor(__time,'PT5M'),'yyyy-MM-dd HH:mm:ss')\n" + + "union all\n" + + "select TIME_FORMAT(time_floor(__time,'PT5M'),'yyyy-MM-dd HH:mm:ss') as stat_time, 'intercept_conn_num' as type, sum(intercept_conn_num) as sessions from traffic_metrics_log where __time >=TIMESTAMP $start_time and __time <TIMESTAMP $end_time group by TIME_FORMAT(time_floor(__time,'PT5M'),'yyyy-MM-dd HH:mm:ss')"); + list.add("select 
DATE_FORMAT(FROM_UNIXTIME(FLOOR(UNIX_TIMESTAMP(__time)/300)*300),'%Y-%m-%d %H:%i:%s') as stat_time, 'default_conn_num' as type, sum(default_conn_num) as sessions from traffic_metrics_log where __time >= $start_time and __time < $end_time group by DATE_FORMAT(FROM_UNIXTIME(FLOOR(UNIX_TIMESTAMP(__time)/300)*300),'%Y-%m-%d %H:%i:%s')\n" + + "union all\n" + + "select DATE_FORMAT(FROM_UNIXTIME(FLOOR(UNIX_TIMESTAMP(__time)/300)*300),'%Y-%m-%d %H:%i:%s') as stat_time, 'allow_conn_num' as type, sum(allow_conn_num) as sessions from traffic_metrics_log where __time >= $start_time and __time < $end_time group by DATE_FORMAT(FROM_UNIXTIME(FLOOR(UNIX_TIMESTAMP(__time)/300)*300),'%Y-%m-%d %H:%i:%s')\n" + + "union all\n" + + "select DATE_FORMAT(FROM_UNIXTIME(FLOOR(UNIX_TIMESTAMP(__time)/300)*300),'%Y-%m-%d %H:%i:%s') as stat_time, 'deny_conn_num' as type, sum(deny_conn_num) as sessions from traffic_metrics_log where __time >= $start_time and __time < $end_time group by DATE_FORMAT(FROM_UNIXTIME(FLOOR(UNIX_TIMESTAMP(__time)/300)*300),'%Y-%m-%d %H:%i:%s') \n" + + "union all\n" + + "select DATE_FORMAT(FROM_UNIXTIME(FLOOR(UNIX_TIMESTAMP(__time)/300)*300),'%Y-%m-%d %H:%i:%s') as stat_time, 'monitor_conn_num' as type, sum(monitor_conn_num) as sessions from traffic_metrics_log where __time >= $start_time and __time < $end_time group by DATE_FORMAT(FROM_UNIXTIME(FLOOR(UNIX_TIMESTAMP(__time)/300)*300),'%Y-%m-%d %H:%i:%s')\n" + + "union all\n" + + "select DATE_FORMAT(FROM_UNIXTIME(FLOOR(UNIX_TIMESTAMP(__time)/300)*300),'%Y-%m-%d %H:%i:%s') as stat_time, 'intercept_conn_num' as type, sum(intercept_conn_num) as sessions from traffic_metrics_log where __time >= $start_time and __time < $end_time group by DATE_FORMAT(FROM_UNIXTIME(FLOOR(UNIX_TIMESTAMP(__time)/300)*300),'%Y-%m-%d %H:%i:%s')"); + //五. Traffics-TOPN统计 + //1. 
TopN 源IP + list.add("select source as client_ip, sum(session_num) as sessions, sum(c2s_byte_num) as sent_bytes, sum(s2c_byte_num) as received_bytes, sum(c2s_byte_num + s2c_byte_num) as bytes, sum(c2s_pkt_num) as sent_packets ,sum(s2c_pkt_num) as received_packets, sum(c2s_pkt_num+s2c_pkt_num) as packets from top_internal_host_log where __time >=TIMESTAMP $start_time and __time <TIMESTAMP $end_time and order_by=source group by source order by source desc limit 100"); + list.add("select source as client_ip, sum(session_num) as sessions, sum(c2s_byte_num) as sent_bytes, sum(s2c_byte_num) as received_bytes, sum(c2s_byte_num + s2c_byte_num) as bytes, sum(c2s_pkt_num) as sent_packets ,sum(s2c_pkt_num) as received_packets, sum(c2s_pkt_num+s2c_pkt_num) as packets from top_internal_host_log where __time >= $start_time and __time < $end_time and order_by=source group by source order by source desc limit 100"); + //2. TopN 目的IP + list.add("select destination as server_ip, sum(session_num) as sessions, sum(c2s_byte_num) as sent_bytes, sum(s2c_byte_num) as received_bytes, sum(c2s_byte_num + s2c_byte_num) as bytes, sum(c2s_pkt_num) as sent_packets ,sum(s2c_pkt_num) as received_packets, sum(c2s_pkt_num+s2c_pkt_num) as packets from top_external_host_log where __time >=TIMESTAMP $start_time and __time <TIMESTAMP $end_time and order_by=destination group by destination order by destination desc limit 100"); + list.add("select destination as server_ip, sum(session_num) as sessions, sum(c2s_byte_num) as sent_bytes, sum(s2c_byte_num) as received_bytes, sum(c2s_byte_num + s2c_byte_num) as bytes, sum(c2s_pkt_num) as sent_packets ,sum(s2c_pkt_num) as received_packets, sum(c2s_pkt_num+s2c_pkt_num) as packets from top_external_host_log where __time >= $start_time and __time < $end_time and order_by=destination group by destination order by destination desc limit 100"); + //3. 
TopN 域名 + list.add("select domain, sum(session_num) as sessions, sum(c2s_byte_num) as sent_bytes, sum(s2c_byte_num) as received_bytes, sum(c2s_byte_num + s2c_byte_num) as bytes, sum(c2s_pkt_num) as sent_packets ,sum(s2c_pkt_num) as received_packets, sum(c2s_pkt_num+s2c_pkt_num) as packets from top_website_domain_log where __time >=TIMESTAMP $start_time and __time <TIMESTAMP $end_time and order_by=domain group by domain order by domain desc limit 200"); + list.add("select domain, sum(session_num) as sessions, sum(c2s_byte_num) as sent_bytes, sum(s2c_byte_num) as received_bytes, sum(c2s_byte_num + s2c_byte_num) as bytes, sum(c2s_pkt_num) as sent_packets ,sum(s2c_pkt_num) as received_packets, sum(c2s_pkt_num+s2c_pkt_num) as packets from top_website_domain_log where __time >= $start_time and __time < $end_time and order_by=domain group by domain order by domain desc limit 200"); + //4.TopN 用户 + list.add("select subscriber_id, sum(session_num) as sessions, sum(c2s_byte_num) as sent_bytes, sum(s2c_byte_num) as received_bytes, sum(c2s_byte_num + s2c_byte_num) as bytes, sum(c2s_pkt_num) as sent_packets ,sum(s2c_pkt_num) as received_packets, sum(c2s_pkt_num+s2c_pkt_num) as packets from top_user_log where __time >=TIMESTAMP $start_time and __time <TIMESTAMP $end_time and order_by=subscriber_id group by subscriber_id order by subscriber_id desc limit 200"); + list.add("select subscriber_id, sum(session_num) as sessions, sum(c2s_byte_num) as sent_bytes, sum(s2c_byte_num) as received_bytes, sum(c2s_byte_num + s2c_byte_num) as bytes, sum(c2s_pkt_num) as sent_packets ,sum(s2c_pkt_num) as received_packets, sum(c2s_pkt_num+s2c_pkt_num) as packets from top_user_log where __time >= $start_time and __time < $end_time and order_by=subscriber_id group by subscriber_id order by subscriber_id desc limit 200"); + //六、命中URL统计 + list.add("select url,sum(session_num) as hits from top_urls_log where __time >=TIMESTAMP $start_time and __time <TIMESTAMP $end_time group by url order by hits desc 
limit 100"); + list.add("select url,sum(session_num) as hits from top_urls_log where __time >= $start_time and __time < $end_time group by url order by hits desc limit 100"); + + //国家客户端数量 + list.add("SELECT country, APPROX_COUNT_DISTINCT_DS_HLL(ip_object) as num FROM source_country_ip_num_log group by country order by num desc limit 200"); + //域名推荐 + list.add("SELECT domain, protocol_type, ip_num, session_num FROM top_domain_recommend_daily_log where __time = now()"); + //推荐URL + list.add("SELECT url, content_length, ip_num, session_num FROM top_website_urls_daily_log where __time = now()"); + //劫持客户端数量 + list.add("SELECT policy_id, APPROX_COUNT_DISTINCT_DS_HLL(isp) as num FROM proxy_event_hits_log where __time >= $start_time and __time < $end_time and policy_id=0 group by policy_id"); + //劫持客户端IP地域分布 + list.add("select ip, IP_TO_CITY(ip) as location, IP_TO_GEO(ip) as geo from (SELECT policy_id, arrayJoin(groupUniqArrayMerge(ip_list)) as ip FROM proxy_ip_info where stat_time >= $start_time and stat_time < $end_time and policy_id=0 group by policy_id)"); + //注入流量统计 + list.add("select DATE_FORMAT(FROM_UNIXTIME(FLOOR(UNIX_TIMESTAMP(__time)/30)*30),'%Y-%m-%d %H:%i:%s') as stat_time, sum(bytes) as bytes from injection_metrics_log where __time >= $start_time and __time < $end_time group by DATE_FORMAT(FROM_UNIXTIME(FLOOR(UNIX_TIMESTAMP(__time)/30)*30),'%Y-%m-%d %H:%i:%s') limit 100"); + + return list; + } + + + /** + * 查询clickhouse数据库 + * + * @param query + * @return + */ + private List queryCKDynamic(String query) { + StringBuilder urlBuilder = new StringBuilder(clickHouseConfig.getQueryUrl()).append("/?"); + StringBuilder queryparamBuilder = new StringBuilder("user=") + .append(clickHouseConfig.getRealTimeUserName()).append("&") + .append("password=").append(clickHouseConfig.getRealTimePassword()).append("&") + .append("database=") + .append(clickHouseConfig.getDbName()).append("&") + .append("query=").append(query) + .append(" FORMAT JSON;"); + + log.info("Query 
Session and Events Query : {}", urlBuilder.toString() + Encodes.urlDecode(queryparamBuilder.toString())); + List<NameValuePair> values = URLEncodedUtils.parse(queryparamBuilder.toString(), Charset.forName("UTF-8")); + String s = urlBuilder.toString() + URLEncodedUtils.format(values, "utf-8"); + Map<String, String> resultMap = httpClientService.httpGet(s); + List result = new ArrayList<>(); + if (resultMap.get("status").equals(String.valueOf(HttpStatus.SC_OK))) { + Map<String, Object> maps = (Map<String, Object>) JsonMapper.fromJsonString(resultMap.get("result"), Map.class); + List data = (List) maps.get("data"); + Iterator iterator = data.iterator(); + while (iterator.hasNext()) { + Map next = (Map) iterator.next(); + result.add(String.valueOf(next.get("name"))); + } + return result; + } + + return null; + } + + + /** + * 查询druid数据库 + * + * @param query + * @return + */ + private List queryDruidDynamic(String query) { + //headers + HttpHeaders requestHeaders = new HttpHeaders(); + requestHeaders.setContentType(MediaType.APPLICATION_JSON); + //body + Map<String, Object> params = Maps.newHashMap(); + params.put("query", query); + Map<String, String> resultMap = httpClientService.httpPost(druidConfig.getQueryUrl(), JsonMapper.toJsonString(params)); + if (resultMap.get("status").equals(String.valueOf(HttpStatus.SC_OK))) { + List<String> list = new ArrayList<>(); + String result = resultMap.get("result"); + List body = (List) Json.parseJson(result); + Iterator iterator = body.iterator(); + while (iterator.hasNext()) { + Map next = (Map) iterator.next(); + String name = String.valueOf(next.get("name")); + list.add(name); + } + return list; + } + return null; + } + + + private void checkDruidSchema(String path) { + Map<String, JsonProperties> druidSchemaInfo = getSchemaInfo(path); + Set<String> name = druidSchemaInfo.keySet(); + Iterator<String> iterator = name.iterator(); + while (iterator.hasNext()) {//遍历表,以Avro为基础 + String tableName = iterator.next(); + Map resultMap = 
(Map) com.zdjizhi.utils.JsonMapper.fromJsonString(String.valueOf(druidSchemaInfo.get(tableName)), Map.class); + if (resultMap == null) { + throw new BusinessException(ResultStatusEnum.FAIL.getCode(), "schema未注册此表: " + tableName, null); + } + //查数据库 + List resultSql = queryDruidDynamic("SELECT COLUMN_NAME as name FROM INFORMATION_SCHEMA.COLUMNS WHERE TABLE_NAME = '" + tableName + "'"); + + List resultAvro = new ArrayList(); + + //schema为基础 + List<Map> fields = (List<Map>) resultMap.get("fields");//遍历列 + Iterator<Map> fieldsAvro = fields.iterator(); + while (fieldsAvro.hasNext()) { + Map next = fieldsAvro.next(); + resultAvro.add(String.valueOf(next.get("name"))); + boolean b = resultSql.contains(next.get("name")); + if (!b) { + String str = String.format("%s.shcema中注册多:%s", tableName, next.get("name")); + errorDate.add(str); + } + } + + //Schema为基础 + Iterator iteratorSql = resultSql.iterator(); + while (iteratorSql.hasNext()) { + String next = String.valueOf(iteratorSql.next()); + boolean b = resultAvro.contains(next); + if (!b) { + String str = String.format("%s.avro中注册缺少:%s", tableName, next); + errorDate.add(str); + } + } + } + } + + + private void checkCKSchema(String path) { + Map<String, JsonProperties> ckSchemaInfo = getSchemaInfo(path); + Set<String> name = ckSchemaInfo.keySet(); + Iterator<String> iterator = name.iterator(); + while (iterator.hasNext()) {//遍历表,以Avro为基础 + String tableName = iterator.next(); + Map resultMap = (Map) com.zdjizhi.utils.JsonMapper.fromJsonString(String.valueOf(ckSchemaInfo.get(tableName)), Map.class); + if (resultMap == null) { + throw new BusinessException(ResultStatusEnum.FAIL.getCode(), "schema未注册此表: " + tableName, null); + } + //查数据库 + List resultSql = queryCKDynamic("describe " + tableName); + if (StringUtil.isEmpty(resultSql)) { + String str = String.format("shcema中注册多:%s.avsc", tableName); + errorDate.add(str); + continue; + } + + List resultAvro = new ArrayList(); + + //avro为基础 + List<Map> fields = (List<Map>) 
resultMap.get("fields");//遍历列 + Iterator<Map> fieldsAvro = fields.iterator(); + while (fieldsAvro.hasNext()) { + Map next = fieldsAvro.next(); + resultAvro.add(String.valueOf(next.get("name"))); + boolean b = resultSql.contains(next.get("name")); + if (!b) { + String str = String.format("%s.shcema中注册多:%s", tableName, next.get("name")); + errorDate.add(str); + } + } + + //db为基础 + Iterator iteratorSql = resultSql.iterator(); + while (iteratorSql.hasNext()) { + String next = String.valueOf(iteratorSql.next()); + boolean b = resultAvro.contains(next); + if (!b) { + String str = String.format("%s.avro中注册缺少:%s", tableName, next); + errorDate.add(str); + } + } + } + } + + + /** + * 获取avro文件 + * + * @param path + * @return + */ + private Map<String, JsonProperties> getSchemaInfo(String path) { + Map<String, JsonProperties> schema = new HashMap<>(); + String pathRoot = null; + try { + pathRoot = new File("").getCanonicalPath(); + File file = new File(pathRoot + path); + File[] files = file.listFiles(); + if (files != null) { + for (File f : files) { + if (!f.getName().toLowerCase().endsWith(".avsc")) { + continue; + } + try { + Schema ckSchema = new Schema.Parser().parse(new File(f.getPath())); + schema.put(ckSchema.getName(), ckSchema); + } catch (Exception ex) { + log.error("遍历" + path + "路径下文件失败: " + f.getName(), ex); + String str = String.format("%s读取文件失败:%s", path, f.getName()); + errorDate.add(str); + } + } + } + } catch (Exception e) { + log.error("获取 项目路径/{}失败", path); + String str = String.format("获取项目路径%s失败", path); + errorDate.add(str); + } + return schema; + } + + + /** + * 核对avro文件 + * + * @param path 路径 + */ + private void checkSchema(String path) { + Map<String, JsonProperties> ckeckSchemaInfo = getSchemaInfo(path); + Set<String> name = ckeckSchemaInfo.keySet(); + Iterator<String> iterator = name.iterator(); + while (iterator.hasNext()) {//遍历表 + String tableName = iterator.next(); + Map resultMap = (Map) 
com.zdjizhi.utils.JsonMapper.fromJsonString(String.valueOf(ckeckSchemaInfo.get(tableName)), Map.class); + if (resultMap == null) { + throw new BusinessException(ResultStatusEnum.FAIL.getCode(), "schema未注册此表: " + tableName, null); + } + List<Map> fields = (List<Map>) resultMap.get("fields");//遍历列 + Iterator<Map> fieldsIterator = fields.iterator(); + Map mapDoc = null; + while (fieldsIterator.hasNext()) { + Map next = fieldsIterator.next(); + try { + mapDoc = (Map) com.zdjizhi.utils.JsonMapper.fromJsonString(String.valueOf(next.get("doc")), Map.class); + } catch (Exception ex) { + log.error("该字段的 doc 为非JSON字符串,不需类型转换"); + String str = String.format("%s.avro中doc非json格式字段%s", tableName, next.get("name")); + errorDate.add(str); + } + if (mapDoc != null) { + next.put("doc", mapDoc); + } + if (StringUtil.isEmpty(mapDoc) && !StringUtil.isEmpty(next.get("doc"))) { + log.error("该字段的 doc 为非JSON字符串,不需类型转换"); + String str = String.format("%s.avro中doc非json格式字段%s", tableName, next.get("name")); + errorDate.add(str); + } + } + } + } +} |
