diff options
| author | wangwei <[email protected]> | 2020-12-08 22:58:28 +0800 |
|---|---|---|
| committer | wangwei <[email protected]> | 2020-12-08 22:58:28 +0800 |
| commit | 886eea35087d567abee452bc7e9a0f3dd606e764 (patch) | |
| tree | 7915b5e075d6e017a62e28308f10194d9c8ead17 | |
| parent | 60e8f06fc3d0d1f2366c181670a6daebba066262 (diff) | |
移除query-engine中的liveChart接口
6 files changed, 0 insertions, 645 deletions
diff --git a/galaxy-query-engine/src/main/java/com/mesalab/network/controller/NetworkMonitorController.java b/galaxy-query-engine/src/main/java/com/mesalab/network/controller/NetworkMonitorController.java deleted file mode 100644 index 2bd44fd..0000000 --- a/galaxy-query-engine/src/main/java/com/mesalab/network/controller/NetworkMonitorController.java +++ /dev/null @@ -1,44 +0,0 @@ -package com.mesalab.network.controller; - -import com.mesalab.common.base.BaseResult; -import com.mesalab.common.enums.ResultCodeEnum; -import com.mesalab.common.enums.ResultStatusEnum; -import com.mesalab.common.exception.BusinessException; -import com.mesalab.network.dsl.DSLObject; -import com.mesalab.network.dsl.DSLValidate; -import com.mesalab.network.service.NetworkMonitorService; -import lombok.extern.slf4j.Slf4j; -import org.springframework.beans.factory.annotation.Autowired; -import org.springframework.validation.annotation.Validated; -import org.springframework.web.bind.annotation.*; - -import javax.servlet.http.HttpServletRequest; - - -/** - * @author wangwei - * @version 1.0 - * @date 2020/6/30 10:38 上午 - */ -@Slf4j -@RestController -@RequestMapping(value = "/") -public class NetworkMonitorController { - - @Autowired - private DSLValidate dslValidate; - @Autowired - private NetworkMonitorService networkMonitorService; - private static final String PROTOCOL = "protocol"; - - @PostMapping(value = "/traffic/v1/", produces = "application/json") - public BaseResult trafficDistribution(HttpServletRequest request, @Validated @RequestBody DSLObject dslObject) { - log.info("流量分布接口, 参数: queryString is {},params is {}", request.getQueryString(), dslObject); - dslValidate.executeValidate(dslObject); - if (PROTOCOL.equalsIgnoreCase(request.getQueryString())) { - return networkMonitorService.getTrafficDistributedInfo(dslObject); - } else { - throw new BusinessException(ResultStatusEnum.FAIL.getCode(), ResultCodeEnum.PARAM_SYNTAX_ERROR.getCode(), "Support protocol only", null); - } - } -} diff --git a/galaxy-query-engine/src/main/java/com/mesalab/network/dsl/DSLObject.java b/galaxy-query-engine/src/main/java/com/mesalab/network/dsl/DSLObject.java deleted file mode 100644 index 4bd98e4..0000000 --- a/galaxy-query-engine/src/main/java/com/mesalab/network/dsl/DSLObject.java +++ /dev/null @@ -1,40 +0,0 @@ -package com.mesalab.network.dsl; - -import lombok.Data; - -import java.io.Serializable; -import java.util.List; - -/** - * @Date: 2020-09-18 09:48 - * @Author : wangwei - * @ClassName : DSLObject - * @Description : - */ -@Data -public class DSLObject implements Serializable { - - private String clientId; - private QueryBean query; - - @Data - public static class QueryBean { - private String queryType; - private String dataSource; - private Parameters parameters; - - @Data - public static class Parameters { - private String granularity; - private List<MatchBean> match; - private List<String> intervals; - - @Data - public static class MatchBean { - private String type; - private String fieldKey; - private List<String> fieldValues; - } - } - } -} diff --git a/galaxy-query-engine/src/main/java/com/mesalab/network/dsl/DSLValidate.java b/galaxy-query-engine/src/main/java/com/mesalab/network/dsl/DSLValidate.java deleted file mode 100644 index 3f32eba..0000000 --- a/galaxy-query-engine/src/main/java/com/mesalab/network/dsl/DSLValidate.java +++ /dev/null @@ -1,132 +0,0 @@ -package com.mesalab.network.dsl; - -import com.mesalab.common.enums.ResultCodeEnum; -import com.mesalab.common.exception.BusinessException; -import com.mesalab.knowledge.enums.MatchEnum; -import com.zdjizhi.utils.StringUtil; -import org.apache.commons.lang.Validate; -import org.apache.commons.lang3.EnumUtils; -import org.apache.http.HttpStatus; -import org.joda.time.format.DateTimeFormat; -import org.joda.time.format.DateTimeFormatter; -import org.springframework.stereotype.Component; -import org.springframework.util.CollectionUtils; - -import java.util.List; -import java.util.regex.Pattern; - -/** - * @Date: 2020-09-17 18:10 - * @Author : wangwei - * @ClassName : DSLValidate - * @Description : DSL格式校验 - */ -@Component -public class DSLValidate { - - public static Pattern periodOfPT = Pattern.compile("PT(\\d+)[SMH]", Pattern.CASE_INSENSITIVE); - public static Pattern periodOfP = Pattern.compile("P(\\d+)[DWMY]", Pattern.CASE_INSENSITIVE); - public static Pattern strFormatDateTime = Pattern.compile("\\d{4}-\\d{2}-\\d{2}\\s+\\d{2}:\\d{2}:\\d{2}", Pattern.CASE_INSENSITIVE); - - - public void executeValidate(DSLObject dslObject) throws BusinessException { - if (StringUtil.isEmpty(dslObject)) { - throw new BusinessException(HttpStatus.SC_BAD_REQUEST, ResultCodeEnum.PARAM_SYNTAX_ERROR.getCode(), - "DSLObject is invalid"); - } - if (StringUtil.isEmpty(dslObject.getQuery())) { - throw new BusinessException(HttpStatus.SC_BAD_REQUEST, ResultCodeEnum.PARAM_SYNTAX_ERROR.getCode(), - "DSLObject.query is invalid"); - } - DSLObject.QueryBean.Parameters parameters = dslObject.getQuery().getParameters(); - if (StringUtil.isEmpty(parameters)) { - return; - } - if (!isValidGranularity(parameters.getGranularity())) { - throw new BusinessException(HttpStatus.SC_BAD_REQUEST, ResultCodeEnum.PARAM_SYNTAX_ERROR.getCode(), - "query.Granularity value is invalid"); - } - if (!isValidMatch(parameters.getMatch())) { - throw new BusinessException(HttpStatus.SC_BAD_REQUEST, ResultCodeEnum.PARAM_SYNTAX_ERROR.getCode(), - "query.Match type is invalid"); - } - if (!isValidIntervals(parameters.getIntervals())) { - throw new BusinessException(HttpStatus.SC_BAD_REQUEST, ResultCodeEnum.PARAM_SYNTAX_ERROR.getCode(), - "query.Intervals is invalid"); - } - } - - /** - * 时间粒度校验, 遵循ISO8601 durations 定义 - * - * @param granularity - * @return - */ - private boolean isValidGranularity(String granularity) { - if (StringUtil.isBlank(granularity)) { - return true; - } - if (periodOfP.matcher(granularity).find() || periodOfPT.matcher(granularity).find()) { - return true; - } - return false; - } - - /** - * 校验match: - * 1.是否属于{@link MatchEnum}限定类型 - * 2.不能以*开始、或$结尾 - * - * @param matchList - * @return - */ - private boolean isValidMatch(List<DSLObject.QueryBean.Parameters.MatchBean> matchList) { - if (CollectionUtils.isEmpty(matchList)) { - return true; - } - for (DSLObject.QueryBean.Parameters.MatchBean matchBean : matchList) { - Validate.isTrue(EnumUtils.isValidEnum(MatchEnum.class, StringUtil.upperCase(matchBean.getType())), "match type is illegal"); - for (String fieldValue : matchBean.getFieldValues()) { - if (fieldValue.startsWith("*") || fieldValue.endsWith("$")) { - throw new BusinessException(HttpStatus.SC_BAD_REQUEST, ResultCodeEnum.PARAM_SYNTAX_ERROR.getCode(), "Match fieldValues cannot startWith '*' or endWith '$'"); - } - } - } - return true; - } - - /** - * 校验Interval: - * 1.目前只支持between类型: ["2020-01-01 00:00:00/2020-01-02 00:00:00"] - * 2.时间区间必须是 开始时间 < 结束时间 - * - * @param intervals - */ - private boolean isValidIntervals(List<String> intervals) { - if (CollectionUtils.isEmpty(intervals)) { - return true; - } - if (intervals.size() != 1) { - throw new BusinessException(HttpStatus.SC_BAD_REQUEST, ResultCodeEnum.PARAM_SYNTAX_ERROR.getCode(), "Intervals param error"); - } - String[] split = intervals.get(0).split("/"); - if (split.length != 2) { - throw new BusinessException(HttpStatus.SC_BAD_REQUEST, ResultCodeEnum.PARAM_SYNTAX_ERROR.getCode(), "Intervals param error"); - } - for (String dateTimeStr : split) { - if (!strFormatDateTime.matcher(dateTimeStr).find()) { - throw new BusinessException(HttpStatus.SC_BAD_REQUEST, ResultCodeEnum.PARAM_SYNTAX_ERROR.getCode(), "Time format should be: yyyy-MM-dd HH:mm:ss"); - } - } - try { - DateTimeFormatter dateTimeFormatter = DateTimeFormat.forPattern("yyyy-MM-dd HH:mm:ss"); - if (dateTimeFormatter.parseMillis(split[1]) < dateTimeFormatter.parseMillis(split[0])) { - throw new RuntimeException("Intervals value should be [start, end]"); - } - } catch (Exception e) { - throw new BusinessException(HttpStatus.SC_BAD_REQUEST, ResultCodeEnum.PARAM_SYNTAX_ERROR.getCode(), e.getMessage()); - } - return true; - } - -} diff --git a/galaxy-query-engine/src/main/java/com/mesalab/network/dsl/Parameter.java b/galaxy-query-engine/src/main/java/com/mesalab/network/dsl/Parameter.java deleted file mode 100644 index 30a1a87..0000000 --- a/galaxy-query-engine/src/main/java/com/mesalab/network/dsl/Parameter.java +++ /dev/null @@ -1,10 +0,0 @@ -package com.mesalab.network.dsl; - -/** - * @author wangwei - * @description: - * @date 2020/9/18 6:31 下午 - */ -public class Parameter { - -} diff --git a/galaxy-query-engine/src/main/java/com/mesalab/network/service/NetworkMonitorService.java b/galaxy-query-engine/src/main/java/com/mesalab/network/service/NetworkMonitorService.java deleted file mode 100644 index 6455a4d..0000000 --- a/galaxy-query-engine/src/main/java/com/mesalab/network/service/NetworkMonitorService.java +++ /dev/null @@ -1,17 +0,0 @@ -package com.mesalab.network.service; - -import com.mesalab.common.base.BaseResult; -import com.mesalab.network.dsl.DSLObject; - -/** - * @author wangwei - * @version 1.0 - * @date 2020/6/30 4:20 下午 - */ - - -public interface NetworkMonitorService { - - BaseResult getTrafficDistributedInfo(DSLObject dslObject); - -} diff --git a/galaxy-query-engine/src/main/java/com/mesalab/network/service/impl/NetworkMonitorServiceImpl.java b/galaxy-query-engine/src/main/java/com/mesalab/network/service/impl/NetworkMonitorServiceImpl.java deleted file mode 100644 index 1792837..0000000 --- a/galaxy-query-engine/src/main/java/com/mesalab/network/service/impl/NetworkMonitorServiceImpl.java +++ /dev/null @@ -1,402 +0,0 @@ -package com.mesalab.network.service.impl; - -import com.google.common.collect.Iterables; -import com.google.common.collect.Lists; -import com.google.common.collect.Maps; -import com.mesalab.common.base.BaseResult; -import com.mesalab.common.base.BaseResultGenerator; -import com.mesalab.common.enums.ResultCodeEnum; -import com.mesalab.common.enums.ResultStatusEnum; -import com.mesalab.common.exception.BusinessException; -import com.mesalab.common.utils.JsonMapper; -import com.mesalab.common.utils.TreeUtils; -import com.mesalab.knowledge.common.utils.HttpConfig; -import com.mesalab.knowledge.enums.MatchEnum; -import com.mesalab.network.dsl.DSLObject; -import com.mesalab.network.service.NetworkMonitorService; -import com.mesalab.qgw.model.protocol.ProtocolTree; -import com.mesalab.qgw.service.impl.HttpClientService; -import com.zdjizhi.utils.StringUtil; -import lombok.extern.slf4j.Slf4j; -import org.apache.http.HttpStatus; -import org.springframework.beans.factory.annotation.Autowired; -import org.springframework.beans.factory.annotation.Value; -import org.springframework.stereotype.Service; -import org.springframework.util.CollectionUtils; - -import javax.annotation.Nullable; -import java.io.UnsupportedEncodingException; -import java.net.InetAddress; -import java.net.URLEncoder; -import java.net.UnknownHostException; -import java.util.*; -import java.util.stream.Collectors; - -/** - * @author wangwei - * @version 1.0 - * @date 2020/6/30 4:24 下午 - */ -@Slf4j -@Service("networkMonitorService") -public class NetworkMonitorServiceImpl implements NetworkMonitorService { - @Autowired - HttpClientService httpClientService; - @Autowired - HttpConfig httpConfig; - @Value("${server.port}") - private int serverPort; - private static String hostAddress; - - private static final String PROTOCOL_NODE = "Protocols/"; - - static { - try { - hostAddress = InetAddress.getLocalHost().getHostAddress(); - } catch (UnknownHostException e) { - log.error("get LocalHost error: ", e); - } - } - - @Override - public BaseResult getTrafficDistributedInfo(DSLObject dslObject) { - BaseResult baseResult; - String sql; - if (QueryType.PROTOCOL_TREE_SUMMARY.getValue().equalsIgnoreCase(dslObject.getQuery().getQueryType())) { - sql = generateProtocolTreeSql(dslObject.getQuery()); - baseResult = getResultProtocolTree(sql); - } else if (QueryType.PROTOCOL_DATA_RATE_SUMMARY.getValue().equalsIgnoreCase(dslObject.getQuery().getQueryType())) { - sql = generateProtocolDataRateSql(dslObject.getQuery()); - baseResult = getProtocolDataRateSummary(sql); - } else if (QueryType.NETWORK_OVERVIEW_SUMMARY.getValue().equalsIgnoreCase(dslObject.getQuery().getQueryType())) { - sql = generateOverviewSummarySql(dslObject.getQuery()); - Map<String, String> dataRateResult = executeQuery(sql); - String statSql = generateStatSql(dslObject.getQuery()); - Map<String, String> statResult = executeQuery(statSql); - baseResult = buildNetworkOverviewResult(dataRateResult, statResult); - } else { - baseResult = BaseResultGenerator.failure(ResultStatusEnum.FAIL.getCode(), ResultCodeEnum.PARAM_SYNTAX_ERROR.getCode(), "No match queryType"); - } - if (!baseResult.isSuccess()) { - throw new BusinessException(baseResult.getStatus(), baseResult.getCode(), baseResult.getMessage(), null); - } - return baseResult; - } - - private BaseResult buildNetworkOverviewResult(Map<String, String> dataRateResult, Map<String, String> currentSessionResult) { - BaseResult baseResult; - if (String.valueOf(HttpStatus.SC_OK).equals(currentSessionResult.get("status")) - && String.valueOf(HttpStatus.SC_OK).equals(dataRateResult.get("status"))) { - Map statDataResult = (Map) JsonMapper.fromJsonString(currentSessionResult.get("result"), Map.class); - List<Map> statData = (List<Map>) statDataResult.get("data"); - Map summaryDataResult = (Map) JsonMapper.fromJsonString(dataRateResult.get("result"), Map.class); - List<Map> summaryData = (List<Map>) summaryDataResult.get("data"); - - Map<String, Object> data = new HashMap<>(16); - if (!statData.isEmpty()) { - data.putAll(statData.get(0)); - } - if (statData.size() >= 2) { - data.put("current_sessions", statData.get(1).get("total_bytes")); - } - if (!summaryData.isEmpty()) { - data.putAll(summaryData.get(0)); - } - long uniqClientIp = Long.parseLong(String.valueOf(data.get("uniq_client_ip") == null ? 0 : data.get("uniq_client_ip"))); - long totalSessions = Long.parseLong(String.valueOf(data.get("total_sessions") == null ? 0 : data.get("total_sessions"))); - long summaryTotalSessions = Long.parseLong(String.valueOf(data.get("summarySessions") == null ? 0 : data.get("summarySessions"))); - long summaryTotalBytes = Long.parseLong(String.valueOf(data.get("summaryTotalBytes") == null ? 0 : data.get("summaryTotalBytes"))); - long summaryTotalPackets = Long.parseLong(String.valueOf(data.get("summaryTotalPackets") == null ? 0 : data.get("summaryTotalPackets"))); - long currentSessions = Long.parseLong(String.valueOf(data.get("current_sessions") == null ? 0 : data.get("current_sessions"))); - long dataRate = Long.parseLong(String.valueOf(data.get("data_rate") == null ? 0 : data.get("data_rate"))); - long totalBytes = Long.parseLong(String.valueOf(data.get("total_bytes") == null ? 0 : data.get("total_bytes"))); - long totalUncategorizedBytes = Long.parseLong(String.valueOf(data.get("total_uncategorized_bytes") == null ? 0 : data.get("total_uncategorized_bytes"))); - double totalUncategorizedPercent = summaryTotalBytes == 0 ? 0 : totalUncategorizedBytes * 1.0 / summaryTotalBytes; - long oneSidedConnections = Long.parseLong(String.valueOf(data.get("one_sided_connections") == null ? 0 : data.get("one_sided_connections"))); - double oneSidedPercent = summaryTotalSessions == 0 ? 0 : oneSidedConnections * 1.0 / summaryTotalSessions; - long sequenceGapLossBytes = Long.parseLong(String.valueOf(data.get("sequence_gap_loss_bytes") == null ? 0 : data.get("sequence_gap_loss_bytes"))); - double sequenceGapLossBytesPercent = summaryTotalBytes == 0 ? 0 : sequenceGapLossBytes * 1.0 / summaryTotalBytes; - long fragmentationPackets = Long.parseLong(String.valueOf(data.get("fragmentation_packets") == null ? 0 : data.get("fragmentation_packets"))); - double fragmentationPacketsPercent = summaryTotalPackets == 0 ? 0 : fragmentationPackets * 1.0 / summaryTotalPackets; - - List result = new ArrayList<>(); - Map resultMap = new LinkedHashMap<>(); - resultMap.put("uniq_client_ip", uniqClientIp); - resultMap.put("total_sessions", totalSessions); - resultMap.put("current_sessions", currentSessions); - resultMap.put("data_rate", dataRate); - resultMap.put("total_bytes", totalBytes); - resultMap.put("total_uncategorized_bytes", totalUncategorizedBytes); - resultMap.put("total_uncategorized_percent", Double.parseDouble(String.format("%.4f", totalUncategorizedPercent))); - resultMap.put("one_sided_connections", oneSidedConnections); - resultMap.put("one_sided_percent", Double.parseDouble(String.format("%.4f", oneSidedPercent))); - resultMap.put("sequence_gap_loss_bytes", sequenceGapLossBytes); - resultMap.put("sequence_gap_loss_percent", Double.parseDouble(String.format("%.4f", sequenceGapLossBytesPercent))); - resultMap.put("fragmentation_packets", fragmentationPackets); - resultMap.put("fragmentation_percent", Double.parseDouble(String.format("%.4f", fragmentationPacketsPercent))); - result.add(resultMap); - - Map statistics = (Map) summaryDataResult.get("statistics"); - Map statStatistics = (Map) statDataResult.get("statistics"); - for (Object key : statistics.keySet()) { - if ("result_rows".equals(key.toString())) { - continue; - } - statistics.put(key, Long.parseLong(statistics.get(key).toString()) + Long.parseLong(statStatistics.get(key).toString())); - } - - baseResult = BaseResultGenerator.success("ok", result, statistics); - } else { - baseResult = BaseResultGenerator.error("dataRate result: " + dataRateResult.toString() + ";\n currentSession result is: " + currentSessionResult); - } - return baseResult; - } - - private BaseResult getProtocolDataRateSummary(String sql) { - BaseResult baseResult; - Map<String, String> result = executeQuery(sql); - if (String.valueOf(HttpStatus.SC_OK).equals(result.get("status"))) { - Map resultMap = (Map) JsonMapper.fromJsonString(result.get("result"), Map.class); - List<Map> data = (List<Map>) resultMap.get("data"); - data.forEach(o -> { - String[] protocolIds = String.valueOf(o.get("type")).split("/"); - String protocolId = protocolIds[protocolIds.length - 1]; - o.put("type", protocolId); - }); - baseResult = BaseResultGenerator.success("ok", data, (Map) resultMap.get("statistics")); - } else { - baseResult = BaseResultGenerator.error(result.get("result")); - } - return baseResult; - } - - private BaseResult getResultProtocolTree(String sql) { - BaseResult baseResult; - Map<String, String> result = executeQuery(sql); - if (String.valueOf(HttpStatus.SC_OK).equals(result.get("status"))) { - Map resultMap = (Map) JsonMapper.fromJsonString(result.get("result"), Map.class); - List<ProtocolTree> listProtocol = getListProtocol(getProtocolTrees((List<Map>) resultMap.get("data"))); - baseResult = BaseResultGenerator.success("ok", listProtocol, (Map) resultMap.get("statistics")); - } else { - baseResult = BaseResultGenerator.error(result.get("result")); - } - return baseResult; - } - - private String generateStatSql(DSLObject.QueryBean queryParam) { - String whereOfTime = getWhereOfTime(queryParam.getParameters()); - String whereOfExactly = getWhereOfExactly(queryParam.getParameters()); - whereOfExactly = StringUtil.isBlank(whereOfExactly) ? "" : "AND " + whereOfExactly; - String[] intervals = getIntervals(queryParam.getParameters().getIntervals()); - return String.format("(SELECT SUM(c2s_byte_num + s2c_byte_num) as total_bytes, SUM(sessions) as total_sessions, (SUM(c2s_byte_num + s2c_byte_num) * 8)/((TIMESTAMP_TO_MILLIS(TIMESTAMP '%s')-TIMESTAMP_TO_MILLIS(TIMESTAMP '%s'))/1000) AS data_rate " + - "FROM traffic_protocol_stat_log " + - "WHERE %s %s" + - "AND LENGTH(protocol_id) = LENGTH(REPLACE(protocol_id,'/','')) LIMIT 1) " + - "UNION ALL " + - "( SELECT SUM(sessions), 0, 0 " + - "FROM traffic_protocol_stat_log " + - "WHERE %s %s" + - "AND LENGTH(protocol_id) = LENGTH(REPLACE(protocol_id,'/','')) GROUP BY __time ORDER BY __time DESC LIMIT 1 ) ", - intervals[1], intervals[0], whereOfTime, whereOfExactly, - whereOfTime, whereOfExactly); - } - - private String generateProtocolDataRateSql(DSLObject.QueryBean queryParam) { - String whereOfTime = getWhereOfTime(queryParam.getParameters()); - String whereOfExactly = getWhereOfExactly(queryParam.getParameters()); - whereOfExactly = StringUtil.isBlank(whereOfExactly) ? "" : "AND " + whereOfExactly; - String protocolStr = null; - List<DSLObject.QueryBean.Parameters.MatchBean> match = queryParam.getParameters().getMatch(); - for (DSLObject.QueryBean.Parameters.MatchBean item : match) { - if (MatchEnum.PREFIX.getType().equals(item.getType())) { - String[] split = item.getFieldValues().get(0).split(","); - protocolStr = split[0].replaceFirst(PROTOCOL_NODE, ""); - } - } - return String.format("(" + - "SELECT TIME_FORMAT(MILLIS_TO_TIMESTAMP( 1000 * TIME_FLOOR_WITH_FILL(TIMESTAMP_TO_MILLIS(__time)/1000, '%s', 'zero')), 'yyyy-MM-dd HH:mm:ss') as stat_time, protocol_id as type, sum(c2s_byte_num + s2c_byte_num) as bytes " + - "from %s " + - "where %s %s and protocol_id = '%s' " + - "group by TIME_FORMAT(MILLIS_TO_TIMESTAMP( 1000 * TIME_FLOOR_WITH_FILL(TIMESTAMP_TO_MILLIS(__time)/1000, '%s', 'zero')), 'yyyy-MM-dd HH:mm:ss'), protocol_id " + - "order by stat_time asc" + - ") " + - "union all " + - "(" + - "SELECT TIME_FORMAT(MILLIS_TO_TIMESTAMP( 1000 * TIME_FLOOR_WITH_FILL(TIMESTAMP_TO_MILLIS(__time)/1000, '%s', 'zero')), 'yyyy-MM-dd HH:mm:ss') as stat_time, protocol_id as type, sum(c2s_byte_num + s2c_byte_num) as bytes " + - "from %s " + - "where %s %s and protocol_id like CONCAT('%s','/%s') and LENGTH(protocol_id) = LENGTH(REPLACE(protocol_id,'/','')) + 1 + %s " + - "group by TIME_FORMAT(MILLIS_TO_TIMESTAMP( 1000 * TIME_FLOOR_WITH_FILL(TIMESTAMP_TO_MILLIS(__time)/1000, '%s', 'zero')), 'yyyy-MM-dd HH:mm:ss'), protocol_id " + - "order by stat_time asc) ", - queryParam.getParameters().getGranularity(), queryParam.getDataSource(), whereOfTime, whereOfExactly, protocolStr, queryParam.getParameters().getGranularity(), - queryParam.getParameters().getGranularity(), queryParam.getDataSource(), whereOfTime, whereOfExactly, protocolStr, "%", - protocolStr.length() - protocolStr.replaceAll("/", "").length(), queryParam.getParameters().getGranularity()); - - } - - private String generateOverviewSummarySql(DSLObject.QueryBean queryParam) { - String whereOfTime = getWhereOfTime(queryParam.getParameters()); - String whereOfExactly = getWhereOfExactly(queryParam.getParameters()); - - return String.format("SELECT APPROX_COUNT_DISTINCT_DS_HLL(ip_object) AS uniq_client_ip, " + - "SUM(one_sided_connections) AS one_sided_connections, " + - "SUM(uncategorized_bytes) AS total_uncategorized_bytes, " + - "SUM(fragmentation_packets) AS fragmentation_packets, " + - "SUM(sequence_gap_loss) AS sequence_gap_loss_bytes, " + - "SUM(s2c_byte_num+c2s_byte_num) AS summaryTotalBytes, " + - "SUM(s2c_pkt_num+c2s_pkt_num) AS summaryTotalPackets, " + - "SUM(sessions) AS summarySessions " + - "FROM %s " + - "WHERE %s" + - " %s LIMIT 1 ", - queryParam.getDataSource(), whereOfTime, StringUtil.isBlank(whereOfExactly) ? "" : "AND " + whereOfExactly); - } - - private String getWhereOfTime(DSLObject.QueryBean.Parameters parameters) { - if (CollectionUtils.isEmpty(parameters.getIntervals())) { - return StringUtil.EMPTY; - } - StringBuffer whereOfTime = new StringBuffer(); - String[] intervals = getIntervals(parameters.getIntervals()); - whereOfTime.append("__time >= TIMESTAMP '").append(intervals[0]).append("' AND __time < TIMESTAMP '").append(intervals[1]).append("'"); - return whereOfTime.toString(); - } - - private String[] getIntervals(List<String> intervals) { - return intervals.get(0).split("/"); - } - - private String getWhereOfExactly(DSLObject.QueryBean.Parameters parameters) { - if (CollectionUtils.isEmpty(parameters.getMatch())) { - return StringUtil.EMPTY; - } - StringBuffer whereOfExactly = new StringBuffer(); - List<DSLObject.QueryBean.Parameters.MatchBean> match = parameters.getMatch(); - for (DSLObject.QueryBean.Parameters.MatchBean item : match) { - if (MatchEnum.EXACTLY.getType().equals(item.getType()) && !CollectionUtils.isEmpty(item.getFieldValues())) { - whereOfExactly.append(item.getFieldKey()).append(" IN ('").append(String.join("','", item.getFieldValues())).append("')"); - } - } - return whereOfExactly.toString(); - } - - private String generateProtocolTreeSql(DSLObject.QueryBean queryParam) { - String whereOfTime = getWhereOfTime(queryParam.getParameters()); - String whereOfExactly = getWhereOfExactly(queryParam.getParameters()); - whereOfExactly = StringUtil.isBlank(whereOfExactly) ? "" : "AND " + whereOfExactly; - return String.format("SELECT protocol_id, SUM(sessions) as sessions,SUM(c2s_byte_num) as c2s_byte_num, SUM(c2s_pkt_num) as c2s_pkt_num, SUM(s2c_byte_num) as s2c_byte_num, SUM(s2c_pkt_num) as s2c_pkt_num " + - "FROM %s " + - "WHERE %s %s" + - "GROUP BY protocol_id ", - queryParam.getDataSource(), whereOfTime, whereOfExactly); - } - - private List<ProtocolTree> getProtocolTrees(List<Map> data) { - Iterable<ProtocolTree> resultConcat = Iterables.concat(new ArrayList<>()); - for (Map datum : data) { - String s = JsonMapper.toJsonString(datum); - List<ProtocolTree> nodes = parserRawData(s); - resultConcat = Iterables.concat(resultConcat, nodes); - } - List<ProtocolTree> listNodes = Lists.newArrayList(resultConcat); - ProtocolTree root = new ProtocolTree("Protocols", "Protocols", null); - List<ProtocolTree> roots = listNodes.stream().filter(o -> StringUtil.isBlank(o.getParentId())).collect(Collectors.toList()); - roots.forEach(item -> { - root.setSentBytes(root.getSentBytes() + item.getSentBytes()); - root.setReceivedBytes(root.getReceivedBytes() + item.getReceivedBytes()); - }); - - listNodes.forEach(item -> { - item.setId(PROTOCOL_NODE + item.getId()); - }); - - listNodes.add(root); - return listNodes; - } - - private Map<String, String> executeQuery(String sql) { - try { - sql = URLEncoder.encode(sql, "utf-8").replaceAll("\\+", "%20"); - } catch (UnsupportedEncodingException e) { - hostAddress = "127.0.0.1"; - log.error("sql Encode error: ", e); - } - String url = "http://" + hostAddress + ":" + serverPort + "/?query="; - int socketTimeOut = httpConfig.getServerResponseTimeOut(); - return httpClientService.httpGet(url + sql, socketTimeOut); - } - - - private List<ProtocolTree> parserRawData(String jsonString) { - List<ProtocolTree> lists = Lists.newArrayList(); - Map<String, Object> results = (Map<String, Object>) JsonMapper.fromJsonString(jsonString, Map.class); - long sessions = Long.parseLong(results.get("sessions").toString()); - long sentBytes = Long.parseLong(results.get("c2s_byte_num").toString()); - long receivedBytes = Long.parseLong(results.get("s2c_byte_num").toString()); - long sentPackets = Long.parseLong(results.get("c2s_pkt_num").toString()); - long receivedPackets = Long.parseLong(results.get("s2c_pkt_num").toString()); - String protocolId = results.get("protocol_id").toString(); - List<String> protocolList = Lists.newArrayList(protocolId.split("/")); - ProtocolTree protocolTree = new ProtocolTree(protocolId, protocolList.size() <= 0 ? null : protocolList.get(protocolList.size() - 1), null); - protocolTree.setSentBytes(sentBytes); - protocolTree.setReceivedBytes(receivedBytes); - protocolTree.getMetrics().put("sentPackets", sentPackets); - protocolTree.getMetrics().put("receivedPackets", receivedPackets); - protocolTree.getMetrics().put("sessions", sessions); - lists.add(protocolTree); - return lists; - } - - private List<ProtocolTree> getListProtocol(List<ProtocolTree> listNodes) { - Map<String, List<ProtocolTree>> resultMap = listNodes.stream().collect(Collectors.groupingBy(ProtocolTree::getId)); - Map<String, Object> mergeResultMap = - Maps.transformValues(resultMap, new com.google.common.base.Function<List<ProtocolTree>, Object>() { - @Nullable - @Override - public Object apply(@Nullable List<ProtocolTree> input) { - if (StringUtil.isEmpty(input)) { - return null; - } - if (input.size() == 1) { - return input.get(0); - } - ProtocolTree firstProtocolTree = input.get(0); - ProtocolTree mergeProtocolTree = new ProtocolTree(firstProtocolTree.getId(), firstProtocolTree.getName(), - firstProtocolTree.getParentId(), 0, 0); - mergeProtocolTree.setMetrics(Maps.newLinkedHashMap()); - - for (ProtocolTree protocolTree : input) { - mergeProtocolTree.setSentBytes(mergeProtocolTree.getSentBytes() + protocolTree.getSentBytes()); - mergeProtocolTree.setReceivedBytes(mergeProtocolTree.getReceivedBytes() + protocolTree.getReceivedBytes()); - for (String key : protocolTree.getMetrics().keySet()) { - if (StringUtil.isEmpty(mergeProtocolTree.getMetrics().get(key))) { - mergeProtocolTree.getMetrics().put(key, protocolTree.getMetrics().get(key)); - } else { - long sumValue = Long.parseLong(mergeProtocolTree.getMetrics().get(key).toString()) - + Long.parseLong(protocolTree.getMetrics().get(key).toString()); - mergeProtocolTree.getMetrics().put(key, sumValue); - } - } - - } - - return mergeProtocolTree; - } - }); - List<ProtocolTree> mergeNodes = new ArrayList(mergeResultMap.values()); - return TreeUtils.mergeTree(mergeNodes, ProtocolTree::getId, ProtocolTree::getParentId, ProtocolTree::setChildrens); - } - - enum QueryType { - - PROTOCOL_TREE_SUMMARY("protocolTreeSummary"), PROTOCOL_DATA_RATE_SUMMARY("protocolDataRateSummary"), NETWORK_OVERVIEW_SUMMARY("networkOverviewSummary"); - private final String value; - - QueryType(String value) { - this.value = value; - } - - public String getValue() { - return value; - } - } -} |
