summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorwangwei <[email protected]>2020-12-08 22:52:42 +0800
committerwangwei <[email protected]>2020-12-08 22:52:42 +0800
commit60e8f06fc3d0d1f2366c181670a6daebba066262 (patch)
tree71fbe47256e9e4212516a79ff1fba6ff7b36c82b
parente74d8cff698b21a9ef9648b9ceb56a8c97ae41e8 (diff)
traffic接口: liveChart 迁移至business-api
-rw-r--r--galaxy-business-api/pom.xml10
-rw-r--r--galaxy-business-api/src/main/java/com/mesalab/api/modules/network/MatchEnum.java45
-rw-r--r--galaxy-business-api/src/main/java/com/mesalab/api/modules/network/TreeUtils.java100
-rw-r--r--galaxy-business-api/src/main/java/com/mesalab/api/modules/network/controller/NetworkMonitorController.java48
-rw-r--r--galaxy-business-api/src/main/java/com/mesalab/api/modules/network/dsl/DSLObject.java40
-rw-r--r--galaxy-business-api/src/main/java/com/mesalab/api/modules/network/dsl/DSLValidate.java132
-rw-r--r--galaxy-business-api/src/main/java/com/mesalab/api/modules/network/dsl/Parameter.java10
-rw-r--r--galaxy-business-api/src/main/java/com/mesalab/api/modules/network/protocol/ProtocolTree.java68
-rw-r--r--galaxy-business-api/src/main/java/com/mesalab/api/modules/network/service/NetworkMonitorService.java18
-rw-r--r--galaxy-business-api/src/main/java/com/mesalab/api/modules/network/service/impl/NetworkMonitorServiceImpl.java371
-rw-r--r--galaxy-gateway/config/application.yml4
11 files changed, 846 insertions, 0 deletions
diff --git a/galaxy-business-api/pom.xml b/galaxy-business-api/pom.xml
index e9d88d3..1bc0040 100644
--- a/galaxy-business-api/pom.xml
+++ b/galaxy-business-api/pom.xml
@@ -62,6 +62,16 @@
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
+ <dependency>
+ <groupId>com.zdjizhi</groupId>
+ <artifactId>galaxy</artifactId>
+ <exclusions>
+ <exclusion>
+ <groupId>org.slf4j</groupId>
+ <artifactId>slf4j-log4j12</artifactId>
+ </exclusion>
+ </exclusions>
+ </dependency>
</dependencies>
<build>
diff --git a/galaxy-business-api/src/main/java/com/mesalab/api/modules/network/MatchEnum.java b/galaxy-business-api/src/main/java/com/mesalab/api/modules/network/MatchEnum.java
new file mode 100644
index 0000000..f974205
--- /dev/null
+++ b/galaxy-business-api/src/main/java/com/mesalab/api/modules/network/MatchEnum.java
@@ -0,0 +1,45 @@
+package com.mesalab.api.modules.network;
+
+import lombok.AllArgsConstructor;
+import lombok.Getter;
+
+/**
+ * 匹配查询
+ * <p>
+ * type:匹配方式,["exactly"|"prefix"|"suffix"|"substring"|"regex"]
+ * <p>
+ * exactly:完全匹配
+ * prefix:前缀匹配
+ * suffix:后缀匹配
+ * substring:子串匹配
+ * regex:正则符号匹配
+ * fieldKey:属性名称,
+ * <p>
+ * fieldValues:属性值
+ * <p>
+ * 注:当type=regex时
+ * <p>
+ * fieldValues值匹配方式符合下列规则,数组内多个值为“或”的关系
+ * <p>
+ * 匹配方式转义
+ * <p>
+ * 以*结尾,不以*或者$开始,无论关键字其他位置是否包含表示匹配方式的字符*和$,即表示左匹配(前缀匹配),例如aaaa*bbb$ccc*$*
+ * 以*开始,不以*结尾,无论关键字其他位置是否包含表示匹配方式的字符*和$,即表示右匹配(后缀匹配),例如*$*aaa*bbb$ccc
+ * 以*开始,以*结尾,无论关键字其他位置是否包含表示匹配方式的字符*和$,即表示子串匹配,例如*aaa$bbb*ccc*
+ * 不以*或者$开头,不以*结尾,无论关键字其他位置是否包含表示匹配方式的字符*和$,即表示子串匹配,例如aaa*bbb$ccc*$
+ * 以$开头,不以*结尾,无论关键字其他位置是否包含表示匹配方式的字符*和$,即表示完整匹配,例如$aaa*bbb$或者$aaa*$bbb
+ */
+@Getter
+@AllArgsConstructor
+public enum MatchEnum {
+
+ EXACTLY("exactly", "\"{0}\""),
+ PREFIX("prefix", "\"{0}%\""),
+ SUFFIX("suffix", "\"%{0}\""),
+ SUBSTRING("substring", "\"%{0}%\""),
+ REGEX("regex", "\"{0}\"");
+
+ private final String type;
+ private final String matchExp;
+
+}
diff --git a/galaxy-business-api/src/main/java/com/mesalab/api/modules/network/TreeUtils.java b/galaxy-business-api/src/main/java/com/mesalab/api/modules/network/TreeUtils.java
new file mode 100644
index 0000000..37aaaab
--- /dev/null
+++ b/galaxy-business-api/src/main/java/com/mesalab/api/modules/network/TreeUtils.java
@@ -0,0 +1,100 @@
+package com.mesalab.api.modules.network;
+
+
+import com.zdjizhi.utils.StringUtil;
+import lombok.extern.slf4j.Slf4j;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Stack;
+import java.util.function.BiConsumer;
+import java.util.function.Consumer;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+
+@Slf4j
+public class TreeUtils {
+
+
+ /**
+ * 遍历树结构
+ * @param root 节点树
+ * @param getChildrenNode 获取当前节点的子节点列表;函数,接收当前节点对象
+ * @param behavior 当前节点操作函数;定义遍历当前节点的操作行为
+ * @param <T> 树的节点对象
+ */
+ public static <T> void traversalTree(List<T> root, Function<T, List<T>> getChildrenNode,
+ Consumer<T> behavior) {
+
+ Stack<T> stack = new Stack<>();
+ root.forEach(stack :: push);
+ while (!stack.isEmpty()) {
+ T o = stack.pop();
+ behavior.accept(o);
+ List<T> childrens = getChildrenNode.apply(o);
+ if (StringUtil.isNotEmpty(childrens)) {
+ childrens.forEach(stack :: push);
+ }
+ }
+
+ }
+
+ /**
+ * 平铺树结构
+ * @param root 节点树
+ * @param getChildrenNode 获取当前节点的子节点列表;函数,接收当前节点对象
+ * @param <T> 树的节点对象
+ * @return 平铺后的节点结构
+ */
+ public static <T> List<T> flatTree(List<T> root,
+ Function<T, List<T>> getChildrenNode) {
+ List<T> list = new ArrayList<>();
+ traversalTree(root, getChildrenNode, list :: add);
+ return list;
+ }
+
+
+
+
+
+ /**
+ * 聚合为树结构
+ * @param list 节点列表数据
+ * @param loadKey 节点的ID;函数,接收一个节点获取唯一ID
+ * @param loadParentKey 节点的父亲ID;函数,接收一个节点获取父亲ID
+ * @param write 节点Children写入函数;函数,接收当前节点和子节点列表,然后将子节点项写入当前节点。
+ * @param <T> 当前节点对象
+ * @param <R> 请求节点返回的属性信息
+ * @return 树结构列表
+ */
+ public static <T, R> List<T> mergeTree(List<T> list,
+ Function<T, R> loadKey,
+ Function<T, R> loadParentKey,
+ BiConsumer<T, List<T>> write) {
+ List<T> root = list.stream().filter(o -> StringUtil.isEmpty(loadParentKey.apply(o))).collect(Collectors.toList());
+ Stack<T> stack = new Stack<T>();
+ root.forEach(stack::push);
+ while (!stack.isEmpty()) {
+ T o = stack.pop();
+ R key = loadKey.apply(o);
+ List<T> childrens = list.stream().filter(k -> key.equals(loadParentKey.apply(k))).collect(Collectors.toList());
+ write.accept(o, childrens);
+ if (childrens.size() > 0) {
+ childrens.forEach(stack :: push);
+ }
+
+ }
+
+ return root;
+ }
+
+
+
+
+
+
+
+
+
+
+}
diff --git a/galaxy-business-api/src/main/java/com/mesalab/api/modules/network/controller/NetworkMonitorController.java b/galaxy-business-api/src/main/java/com/mesalab/api/modules/network/controller/NetworkMonitorController.java
new file mode 100644
index 0000000..9074bbc
--- /dev/null
+++ b/galaxy-business-api/src/main/java/com/mesalab/api/modules/network/controller/NetworkMonitorController.java
@@ -0,0 +1,48 @@
+package com.mesalab.api.modules.network.controller;
+
+import com.mesalab.api.common.base.BaseResult;
+import com.mesalab.api.common.base.BaseResultGenerator;
+import com.mesalab.api.common.enums.ResultCodeEnum;
+import com.mesalab.api.common.enums.ResultStatusEnum;
+import com.mesalab.api.common.exception.BusinessException;
+import com.mesalab.api.modules.network.dsl.DSLObject;
+import com.mesalab.api.modules.network.dsl.DSLValidate;
+import com.mesalab.api.modules.network.service.NetworkMonitorService;
+import lombok.extern.slf4j.Slf4j;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.validation.annotation.Validated;
+import org.springframework.web.bind.annotation.*;
+import org.springframework.web.client.RestTemplate;
+
+import javax.servlet.http.HttpServletRequest;
+
+
+/**
+ * @author wangwei
+ * @version 1.0
+ * @date 2020/6/30 10:38 上午
+ */
+@Slf4j
+@RestController
+@RequestMapping(value = "/")
+public class NetworkMonitorController {
+
+ @Autowired
+ private DSLValidate dslValidate;
+ @Autowired
+ private NetworkMonitorService networkMonitorService;
+ @Autowired
+ RestTemplate restTemplate;
+ private static final String PROTOCOL = "protocol";
+
+ @PostMapping(value = "/traffic/v1/", produces = "application/json")
+ public BaseResult trafficDistribution(HttpServletRequest request, @Validated @RequestBody DSLObject dslObject) {
+ log.info("流量分布接口, 参数: queryString is {},params is {}", request.getQueryString(), dslObject);
+ dslValidate.executeValidate(dslObject);
+ if (PROTOCOL.equalsIgnoreCase(request.getQueryString())) {
+ return networkMonitorService.getTrafficDistributedInfo(dslObject);
+ } else {
+ throw new BusinessException(ResultStatusEnum.FAIL.getCode(), ResultCodeEnum.PARAM_SYNTAX_ERROR.getCode(), "Support protocol only", null);
+ }
+ }
+}
diff --git a/galaxy-business-api/src/main/java/com/mesalab/api/modules/network/dsl/DSLObject.java b/galaxy-business-api/src/main/java/com/mesalab/api/modules/network/dsl/DSLObject.java
new file mode 100644
index 0000000..a1799e2
--- /dev/null
+++ b/galaxy-business-api/src/main/java/com/mesalab/api/modules/network/dsl/DSLObject.java
@@ -0,0 +1,40 @@
+package com.mesalab.api.modules.network.dsl;
+
+import lombok.Data;
+
+import java.io.Serializable;
+import java.util.List;
+
+/**
+ * @Date: 2020-09-18 09:48
+ * @Author : wangwei
+ * @ClassName : DSLObject
+ * @Description :
+ */
+@Data
+public class DSLObject implements Serializable {
+
+ private String clientId;
+ private QueryBean query;
+
+ @Data
+ public static class QueryBean {
+ private String queryType;
+ private String dataSource;
+ private Parameters parameters;
+
+ @Data
+ public static class Parameters {
+ private String granularity;
+ private List<MatchBean> match;
+ private List<String> intervals;
+
+ @Data
+ public static class MatchBean {
+ private String type;
+ private String fieldKey;
+ private List<String> fieldValues;
+ }
+ }
+ }
+}
diff --git a/galaxy-business-api/src/main/java/com/mesalab/api/modules/network/dsl/DSLValidate.java b/galaxy-business-api/src/main/java/com/mesalab/api/modules/network/dsl/DSLValidate.java
new file mode 100644
index 0000000..af11539
--- /dev/null
+++ b/galaxy-business-api/src/main/java/com/mesalab/api/modules/network/dsl/DSLValidate.java
@@ -0,0 +1,132 @@
+package com.mesalab.api.modules.network.dsl;
+
+import com.mesalab.api.common.enums.ResultCodeEnum;
+import com.mesalab.api.common.exception.BusinessException;
+import com.mesalab.api.modules.network.MatchEnum;
+import com.zdjizhi.utils.StringUtil;
+import org.apache.commons.lang.Validate;
+import org.apache.commons.lang3.EnumUtils;
+import org.apache.http.HttpStatus;
+import org.joda.time.format.DateTimeFormat;
+import org.joda.time.format.DateTimeFormatter;
+import org.springframework.stereotype.Component;
+import org.springframework.util.CollectionUtils;
+
+import java.util.List;
+import java.util.regex.Pattern;
+
+/**
+ * @Date: 2020-09-17 18:10
+ * @Author : wangwei
+ * @ClassName : DSLValidate
+ * @Description : DSL格式校验
+ */
+@Component
+public class DSLValidate {
+
+ public static Pattern periodOfPT = Pattern.compile("PT(\\d+)[SMH]", Pattern.CASE_INSENSITIVE);
+ public static Pattern periodOfP = Pattern.compile("P(\\d+)[DWMY]", Pattern.CASE_INSENSITIVE);
+ public static Pattern strFormatDateTime = Pattern.compile("\\d{4}-\\d{2}-\\d{2}\\s+\\d{2}:\\d{2}:\\d{2}", Pattern.CASE_INSENSITIVE);
+
+
+ public void executeValidate(DSLObject dslObject) throws BusinessException {
+ if (StringUtil.isEmpty(dslObject)) {
+ throw new BusinessException(HttpStatus.SC_BAD_REQUEST, ResultCodeEnum.PARAM_SYNTAX_ERROR.getCode(),
+ "DSLObject is invalid");
+ }
+ if (StringUtil.isEmpty(dslObject.getQuery())) {
+ throw new BusinessException(HttpStatus.SC_BAD_REQUEST, ResultCodeEnum.PARAM_SYNTAX_ERROR.getCode(),
+ "DSLObject.query is invalid");
+ }
+ DSLObject.QueryBean.Parameters parameters = dslObject.getQuery().getParameters();
+ if (StringUtil.isEmpty(parameters)) {
+ return;
+ }
+ if (!isValidGranularity(parameters.getGranularity())) {
+ throw new BusinessException(HttpStatus.SC_BAD_REQUEST, ResultCodeEnum.PARAM_SYNTAX_ERROR.getCode(),
+ "query.Granularity value is invalid");
+ }
+ if (!isValidMatch(parameters.getMatch())) {
+ throw new BusinessException(HttpStatus.SC_BAD_REQUEST, ResultCodeEnum.PARAM_SYNTAX_ERROR.getCode(),
+ "query.Match type is invalid");
+ }
+ if (!isValidIntervals(parameters.getIntervals())) {
+ throw new BusinessException(HttpStatus.SC_BAD_REQUEST, ResultCodeEnum.PARAM_SYNTAX_ERROR.getCode(),
+ "query.Intervals is invalid");
+ }
+ }
+
+ /**
+ * 时间粒度校验, 遵循ISO8601 durations 定义
+ *
+ * @param granularity
+ * @return
+ */
+ private boolean isValidGranularity(String granularity) {
+ if (StringUtil.isBlank(granularity)) {
+ return true;
+ }
+ if (periodOfP.matcher(granularity).find() || periodOfPT.matcher(granularity).find()) {
+ return true;
+ }
+ return false;
+ }
+
+ /**
+ * 校验match:
+ * 1.是否属于{@link MatchEnum}限定类型
+ * 2.不能以*开始、或$结尾
+ *
+ * @param matchList
+ * @return
+ */
+ private boolean isValidMatch(List<DSLObject.QueryBean.Parameters.MatchBean> matchList) {
+ if (CollectionUtils.isEmpty(matchList)) {
+ return true;
+ }
+ for (DSLObject.QueryBean.Parameters.MatchBean matchBean : matchList) {
+ Validate.isTrue(EnumUtils.isValidEnum(MatchEnum.class, StringUtil.upperCase(matchBean.getType())), "match type is illegal");
+ for (String fieldValue : matchBean.getFieldValues()) {
+ if (fieldValue.startsWith("*") || fieldValue.endsWith("$")) {
+ throw new BusinessException(HttpStatus.SC_BAD_REQUEST, ResultCodeEnum.PARAM_SYNTAX_ERROR.getCode(), "Match fieldValues cannot startWith '*' or endWith '$'");
+ }
+ }
+ }
+ return true;
+ }
+
+ /**
+ * 校验Interval:
+ * 1.目前只支持between类型: ["2020-01-01 00:00:00/2020-01-02 00:00:00"]
+ * 2.时间区间必须是 开始时间 < 结束时间
+ *
+ * @param intervals
+ */
+ private boolean isValidIntervals(List<String> intervals) {
+ if (CollectionUtils.isEmpty(intervals)) {
+ return true;
+ }
+ if (intervals.size() != 1) {
+ throw new BusinessException(HttpStatus.SC_BAD_REQUEST, ResultCodeEnum.PARAM_SYNTAX_ERROR.getCode(), "Intervals param error");
+ }
+ String[] split = intervals.get(0).split("/");
+ if (split.length != 2) {
+ throw new BusinessException(HttpStatus.SC_BAD_REQUEST, ResultCodeEnum.PARAM_SYNTAX_ERROR.getCode(), "Intervals param error");
+ }
+ for (String dateTimeStr : split) {
+ if (!strFormatDateTime.matcher(dateTimeStr).find()) {
+ throw new BusinessException(HttpStatus.SC_BAD_REQUEST, ResultCodeEnum.PARAM_SYNTAX_ERROR.getCode(), "Time format should be: yyyy-MM-dd HH:mm:ss");
+ }
+ }
+ try {
+ DateTimeFormatter dateTimeFormatter = DateTimeFormat.forPattern("yyyy-MM-dd HH:mm:ss");
+ if (dateTimeFormatter.parseMillis(split[1]) < dateTimeFormatter.parseMillis(split[0])) {
+ throw new RuntimeException("Intervals value should be [start, end]");
+ }
+ } catch (Exception e) {
+ throw new BusinessException(HttpStatus.SC_BAD_REQUEST, ResultCodeEnum.PARAM_SYNTAX_ERROR.getCode(), e.getMessage());
+ }
+ return true;
+ }
+
+}
diff --git a/galaxy-business-api/src/main/java/com/mesalab/api/modules/network/dsl/Parameter.java b/galaxy-business-api/src/main/java/com/mesalab/api/modules/network/dsl/Parameter.java
new file mode 100644
index 0000000..b12bb10
--- /dev/null
+++ b/galaxy-business-api/src/main/java/com/mesalab/api/modules/network/dsl/Parameter.java
@@ -0,0 +1,10 @@
+package com.mesalab.api.modules.network.dsl;
+
+/**
+ * @author wangwei
+ * @description:
+ * @date 2020/9/18 6:31 下午
+ */
+public class Parameter {
+
+}
diff --git a/galaxy-business-api/src/main/java/com/mesalab/api/modules/network/protocol/ProtocolTree.java b/galaxy-business-api/src/main/java/com/mesalab/api/modules/network/protocol/ProtocolTree.java
new file mode 100644
index 0000000..2be6455
--- /dev/null
+++ b/galaxy-business-api/src/main/java/com/mesalab/api/modules/network/protocol/ProtocolTree.java
@@ -0,0 +1,68 @@
+package com.mesalab.api.modules.network.protocol;
+
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import lombok.AllArgsConstructor;
+import lombok.Builder;
+import lombok.Data;
+import lombok.NoArgsConstructor;
+
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+
+@Data
+@Builder
+@NoArgsConstructor
+@AllArgsConstructor
+public class ProtocolTree {
+ private String id;
+ private String name;
+ private String parentId;
+ private List<ProtocolTree> childrens = Lists.newArrayList();
+ private Map<String, Object> metrics = Maps.newLinkedHashMap();
+ private long sentBytes;
+ private long receivedBytes;
+ private static final String HIERARCCY_FLAG = "/";
+
+
+
+ public ProtocolTree(String id, String name, String parentId) {
+ this.id = id;
+ this.name = name;
+ this.parentId = parentId;
+ }
+
+ public ProtocolTree(String id, String name, String parentId, long sentBytes, long receivedBytes) {
+ this.id = id;
+ this.name = name;
+ this.parentId = parentId;
+ this.sentBytes = sentBytes;
+ this.receivedBytes = receivedBytes;
+ }
+
+
+ public String getParentId() {
+ return id.lastIndexOf(HIERARCCY_FLAG) > 0 ?
+ id.substring(0, id.lastIndexOf(HIERARCCY_FLAG)) : null;
+ }
+
+ public void setParentId(String parentId) {
+ this.parentId = parentId;
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) return true;
+ if (o == null || getClass() != o.getClass()) return false;
+ ProtocolTree that = (ProtocolTree) o;
+ return Objects.equals(id, that.id);
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(id, name);
+ }
+
+
+}
diff --git a/galaxy-business-api/src/main/java/com/mesalab/api/modules/network/service/NetworkMonitorService.java b/galaxy-business-api/src/main/java/com/mesalab/api/modules/network/service/NetworkMonitorService.java
new file mode 100644
index 0000000..2730324
--- /dev/null
+++ b/galaxy-business-api/src/main/java/com/mesalab/api/modules/network/service/NetworkMonitorService.java
@@ -0,0 +1,18 @@
+package com.mesalab.api.modules.network.service;
+
+
+import com.mesalab.api.common.base.BaseResult;
+import com.mesalab.api.modules.network.dsl.DSLObject;
+
+/**
+ * @author wangwei
+ * @version 1.0
+ * @date 2020/6/30 4:20 下午
+ */
+
+
+public interface NetworkMonitorService {
+
+ BaseResult getTrafficDistributedInfo(DSLObject dslObject);
+
+}
diff --git a/galaxy-business-api/src/main/java/com/mesalab/api/modules/network/service/impl/NetworkMonitorServiceImpl.java b/galaxy-business-api/src/main/java/com/mesalab/api/modules/network/service/impl/NetworkMonitorServiceImpl.java
new file mode 100644
index 0000000..66ad39d
--- /dev/null
+++ b/galaxy-business-api/src/main/java/com/mesalab/api/modules/network/service/impl/NetworkMonitorServiceImpl.java
@@ -0,0 +1,371 @@
+package com.mesalab.api.modules.network.service.impl;
+
+import com.google.common.collect.Iterables;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import com.mesalab.api.common.base.BaseResult;
+import com.mesalab.api.common.base.BaseResultGenerator;
+import com.mesalab.api.common.enums.ResultCodeEnum;
+import com.mesalab.api.common.exception.BusinessException;
+import com.mesalab.api.modules.network.MatchEnum;
+import com.mesalab.api.modules.network.TreeUtils;
+import com.mesalab.api.modules.network.dsl.DSLObject;
+import com.mesalab.api.modules.network.protocol.ProtocolTree;
+import com.mesalab.api.modules.network.service.NetworkMonitorService;
+import com.mesalab.api.common.enums.*;
+import com.zdjizhi.utils.JsonMapper;
+import com.zdjizhi.utils.StringUtil;
+import lombok.extern.slf4j.Slf4j;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.stereotype.Service;
+import org.springframework.util.CollectionUtils;
+import org.springframework.web.client.RestTemplate;
+
+import javax.annotation.Nullable;
+import java.util.*;
+import java.util.stream.Collectors;
+
+/**
+ * @author wangwei
+ * @version 1.0
+ * @date 2020/6/30 4:24 下午
+ */
+@Slf4j
+@Service("networkMonitorService")
+public class NetworkMonitorServiceImpl implements NetworkMonitorService {
+
+ @Autowired
+ RestTemplate restTemplate;
+
+ private static final String PROTOCOL_NODE = "Protocols/";
+
+ @Override
+ public BaseResult getTrafficDistributedInfo(DSLObject dslObject) {
+ BaseResult baseResult;
+ String sql;
+ if (QueryType.PROTOCOL_TREE_SUMMARY.getValue().equalsIgnoreCase(dslObject.getQuery().getQueryType())) {
+ sql = generateProtocolTreeSql(dslObject.getQuery());
+ baseResult = getResultProtocolTree(sql);
+ } else if (QueryType.PROTOCOL_DATA_RATE_SUMMARY.getValue().equalsIgnoreCase(dslObject.getQuery().getQueryType())) {
+ sql = generateProtocolDataRateSql(dslObject.getQuery());
+ baseResult = getProtocolDataRateSummary(sql);
+ } else if (QueryType.NETWORK_OVERVIEW_SUMMARY.getValue().equalsIgnoreCase(dslObject.getQuery().getQueryType())) {
+ sql = generateOverviewSummarySql(dslObject.getQuery());
+ BaseResult dataRateResult = executeQuery(sql);
+ String statSql = generateStatSql(dslObject.getQuery());
+ BaseResult statResult = executeQuery(statSql);
+ baseResult = buildNetworkOverviewResult(dataRateResult, statResult);
+ } else {
+ baseResult = BaseResultGenerator.failure(ResultStatusEnum.FAIL.getCode(), ResultCodeEnum.PARAM_SYNTAX_ERROR.getCode(), "No match queryType");
+ }
+ if (!baseResult.isSuccess()) {
+ throw new BusinessException(baseResult.getStatus(), baseResult.getCode(), baseResult.getMessage(), null);
+ }
+ return baseResult;
+ }
+
+ private BaseResult buildNetworkOverviewResult(BaseResult dataRateResult, BaseResult currentSessionResult) {
+ BaseResult baseResult;
+ if (dataRateResult.isSuccess() && currentSessionResult.isSuccess()) {
+ List<Map> statData = (List<Map>) currentSessionResult.getData();
+ List<Map> summaryData = (List<Map>) dataRateResult.getData();
+
+ Map<String, Object> data = new HashMap<>(16);
+ if (!statData.isEmpty()) {
+ data.putAll(statData.get(0));
+ }
+ if (statData.size() >= 2) {
+ data.put("current_sessions", statData.get(1).get("total_bytes"));
+ }
+ if (!summaryData.isEmpty()) {
+ data.putAll(summaryData.get(0));
+ }
+ long uniqClientIp = Long.parseLong(String.valueOf(data.get("uniq_client_ip") == null ? 0 : data.get("uniq_client_ip")));
+ long totalSessions = Long.parseLong(String.valueOf(data.get("total_sessions") == null ? 0 : data.get("total_sessions")));
+ long summaryTotalSessions = Long.parseLong(String.valueOf(data.get("summarySessions") == null ? 0 : data.get("summarySessions")));
+ long summaryTotalBytes = Long.parseLong(String.valueOf(data.get("summaryTotalBytes") == null ? 0 : data.get("summaryTotalBytes")));
+ long summaryTotalPackets = Long.parseLong(String.valueOf(data.get("summaryTotalPackets") == null ? 0 : data.get("summaryTotalPackets")));
+ long currentSessions = Long.parseLong(String.valueOf(data.get("current_sessions") == null ? 0 : data.get("current_sessions")));
+ long dataRate = Long.parseLong(String.valueOf(data.get("data_rate") == null ? 0 : data.get("data_rate")));
+ long totalBytes = Long.parseLong(String.valueOf(data.get("total_bytes") == null ? 0 : data.get("total_bytes")));
+ long totalUncategorizedBytes = Long.parseLong(String.valueOf(data.get("total_uncategorized_bytes") == null ? 0 : data.get("total_uncategorized_bytes")));
+ double totalUncategorizedPercent = summaryTotalBytes == 0 ? 0 : totalUncategorizedBytes * 1.0 / summaryTotalBytes;
+ long oneSidedConnections = Long.parseLong(String.valueOf(data.get("one_sided_connections") == null ? 0 : data.get("one_sided_connections")));
+ double oneSidedPercent = summaryTotalSessions == 0 ? 0 : oneSidedConnections * 1.0 / summaryTotalSessions;
+ long sequenceGapLossBytes = Long.parseLong(String.valueOf(data.get("sequence_gap_loss_bytes") == null ? 0 : data.get("sequence_gap_loss_bytes")));
+ double sequenceGapLossBytesPercent = summaryTotalBytes == 0 ? 0 : sequenceGapLossBytes * 1.0 / summaryTotalBytes;
+ long fragmentationPackets = Long.parseLong(String.valueOf(data.get("fragmentation_packets") == null ? 0 : data.get("fragmentation_packets")));
+ double fragmentationPacketsPercent = summaryTotalPackets == 0 ? 0 : fragmentationPackets * 1.0 / summaryTotalPackets;
+
+ List result = new ArrayList<>();
+ Map resultMap = new LinkedHashMap<>();
+ resultMap.put("uniq_client_ip", uniqClientIp);
+ resultMap.put("total_sessions", totalSessions);
+ resultMap.put("current_sessions", currentSessions);
+ resultMap.put("data_rate", dataRate);
+ resultMap.put("total_bytes", totalBytes);
+ resultMap.put("total_uncategorized_bytes", totalUncategorizedBytes);
+ resultMap.put("total_uncategorized_percent", Double.parseDouble(String.format("%.4f", totalUncategorizedPercent)));
+ resultMap.put("one_sided_connections", oneSidedConnections);
+ resultMap.put("one_sided_percent", Double.parseDouble(String.format("%.4f", oneSidedPercent)));
+ resultMap.put("sequence_gap_loss_bytes", sequenceGapLossBytes);
+ resultMap.put("sequence_gap_loss_percent", Double.parseDouble(String.format("%.4f", sequenceGapLossBytesPercent)));
+ resultMap.put("fragmentation_packets", fragmentationPackets);
+ resultMap.put("fragmentation_percent", Double.parseDouble(String.format("%.4f", fragmentationPacketsPercent)));
+ result.add(resultMap);
+
+ Map statistics = dataRateResult.getStatistics();
+ Map statStatistics = currentSessionResult.getStatistics();
+ for (Object key : statistics.keySet()) {
+ if ("result_rows".equals(key.toString())) {
+ continue;
+ }
+ statistics.put(key, Long.parseLong(statistics.get(key).toString()) + Long.parseLong(statStatistics.get(key).toString()));
+ }
+
+ baseResult = BaseResultGenerator.success("ok", result, statistics);
+ } else {
+ baseResult = BaseResultGenerator.error("dataRate result: " + dataRateResult.toString() + ";\n currentSession result is: " + currentSessionResult);
+ }
+ return baseResult;
+ }
+
+ private BaseResult getProtocolDataRateSummary(String sql) {
+ BaseResult baseResult;
+ BaseResult result = executeQuery(sql);
+ if (result.isSuccess()) {
+ List<Map> data = (List<Map>) result.getData();
+ data.forEach(o -> {
+ String[] protocolIds = String.valueOf(o.get("type")).split("/");
+ String protocolId = protocolIds[protocolIds.length - 1];
+ o.put("type", protocolId);
+ });
+ baseResult = BaseResultGenerator.success("ok", data, result.getStatistics());
+ } else {
+ baseResult = BaseResultGenerator.error(result.getMessage());
+ }
+ return baseResult;
+ }
+
+ private BaseResult getResultProtocolTree(String sql) {
+ BaseResult baseResult = executeQuery(sql);
+ if (baseResult.isSuccess()) {
+ List<ProtocolTree> listProtocol = getListProtocol(getProtocolTrees((List<Map>) baseResult.getData()));
+ baseResult = BaseResultGenerator.success("ok", listProtocol, baseResult.getStatistics());
+ } else {
+ baseResult = BaseResultGenerator.error(baseResult.getMessage());
+ }
+ return baseResult;
+ }
+
+ private String generateStatSql(DSLObject.QueryBean queryParam) {
+ String whereOfTime = getWhereOfTime(queryParam.getParameters());
+ String whereOfExactly = getWhereOfExactly(queryParam.getParameters());
+ whereOfExactly = StringUtil.isBlank(whereOfExactly) ? "" : "AND " + whereOfExactly;
+ String[] intervals = getIntervals(queryParam.getParameters().getIntervals());
+ return String.format("(SELECT SUM(c2s_byte_num + s2c_byte_num) as total_bytes, SUM(sessions) as total_sessions, (SUM(c2s_byte_num + s2c_byte_num) * 8)/((TIMESTAMP_TO_MILLIS(TIMESTAMP '%s')-TIMESTAMP_TO_MILLIS(TIMESTAMP '%s'))/1000) AS data_rate " +
+ "FROM traffic_protocol_stat_log " +
+ "WHERE %s %s" +
+ "AND LENGTH(protocol_id) = LENGTH(REPLACE(protocol_id,'/','')) LIMIT 1) " +
+ "UNION ALL " +
+ "( SELECT SUM(sessions), 0, 0 " +
+ "FROM traffic_protocol_stat_log " +
+ "WHERE %s %s" +
+ "AND LENGTH(protocol_id) = LENGTH(REPLACE(protocol_id,'/','')) GROUP BY __time ORDER BY __time DESC LIMIT 1 ) ",
+ intervals[1], intervals[0], whereOfTime, whereOfExactly,
+ whereOfTime, whereOfExactly);
+ }
+
+ private String generateProtocolDataRateSql(DSLObject.QueryBean queryParam) {
+ String whereOfTime = getWhereOfTime(queryParam.getParameters());
+ String whereOfExactly = getWhereOfExactly(queryParam.getParameters());
+ whereOfExactly = StringUtil.isBlank(whereOfExactly) ? "" : "AND " + whereOfExactly;
+ String protocolStr = null;
+ List<DSLObject.QueryBean.Parameters.MatchBean> match = queryParam.getParameters().getMatch();
+ for (DSLObject.QueryBean.Parameters.MatchBean item : match) {
+ if (MatchEnum.PREFIX.getType().equals(item.getType())) {
+ String[] split = item.getFieldValues().get(0).split(",");
+ protocolStr = split[0].replaceFirst(PROTOCOL_NODE, "");
+ }
+ }
+ return String.format("(" +
+ "SELECT TIME_FORMAT(MILLIS_TO_TIMESTAMP( 1000 * TIME_FLOOR_WITH_FILL(TIMESTAMP_TO_MILLIS(__time)/1000, '%s', 'zero')), 'yyyy-MM-dd HH:mm:ss') as stat_time, protocol_id as type, sum(c2s_byte_num + s2c_byte_num) as bytes " +
+ "from %s " +
+ "where %s %s and protocol_id = '%s' " +
+ "group by TIME_FORMAT(MILLIS_TO_TIMESTAMP( 1000 * TIME_FLOOR_WITH_FILL(TIMESTAMP_TO_MILLIS(__time)/1000, '%s', 'zero')), 'yyyy-MM-dd HH:mm:ss'), protocol_id " +
+ "order by stat_time asc" +
+ ") " +
+ "union all " +
+ "(" +
+ "SELECT TIME_FORMAT(MILLIS_TO_TIMESTAMP( 1000 * TIME_FLOOR_WITH_FILL(TIMESTAMP_TO_MILLIS(__time)/1000, '%s', 'zero')), 'yyyy-MM-dd HH:mm:ss') as stat_time, protocol_id as type, sum(c2s_byte_num + s2c_byte_num) as bytes " +
+ "from %s " +
+ "where %s %s and protocol_id like CONCAT('%s','/%s') and LENGTH(protocol_id) = LENGTH(REPLACE(protocol_id,'/','')) + 1 + %s " +
+ "group by TIME_FORMAT(MILLIS_TO_TIMESTAMP( 1000 * TIME_FLOOR_WITH_FILL(TIMESTAMP_TO_MILLIS(__time)/1000, '%s', 'zero')), 'yyyy-MM-dd HH:mm:ss'), protocol_id " +
+ "order by stat_time asc) ",
+ queryParam.getParameters().getGranularity(), queryParam.getDataSource(), whereOfTime, whereOfExactly, protocolStr, queryParam.getParameters().getGranularity(),
+ queryParam.getParameters().getGranularity(), queryParam.getDataSource(), whereOfTime, whereOfExactly, protocolStr, "%",
+ protocolStr.length() - protocolStr.replaceAll("/", "").length(), queryParam.getParameters().getGranularity());
+
+ }
+
+ private String generateOverviewSummarySql(DSLObject.QueryBean queryParam) {
+ String whereOfTime = getWhereOfTime(queryParam.getParameters());
+ String whereOfExactly = getWhereOfExactly(queryParam.getParameters());
+
+ return String.format("SELECT APPROX_COUNT_DISTINCT_DS_HLL(ip_object) AS uniq_client_ip, " +
+ "SUM(one_sided_connections) AS one_sided_connections, " +
+ "SUM(uncategorized_bytes) AS total_uncategorized_bytes, " +
+ "SUM(fragmentation_packets) AS fragmentation_packets, " +
+ "SUM(sequence_gap_loss) AS sequence_gap_loss_bytes, " +
+ "SUM(s2c_byte_num+c2s_byte_num) AS summaryTotalBytes, " +
+ "SUM(s2c_pkt_num+c2s_pkt_num) AS summaryTotalPackets, " +
+ "SUM(sessions) AS summarySessions " +
+ "FROM %s " +
+ "WHERE %s" +
+ " %s LIMIT 1 ",
+ queryParam.getDataSource(), whereOfTime, StringUtil.isBlank(whereOfExactly) ? "" : "AND " + whereOfExactly);
+ }
+
+ private String getWhereOfTime(DSLObject.QueryBean.Parameters parameters) {
+ if (CollectionUtils.isEmpty(parameters.getIntervals())) {
+ return StringUtil.EMPTY;
+ }
+ StringBuffer whereOfTime = new StringBuffer();
+ String[] intervals = getIntervals(parameters.getIntervals());
+ whereOfTime.append("__time >= TIMESTAMP '").append(intervals[0]).append("' AND __time < TIMESTAMP '").append(intervals[1]).append("'");
+ return whereOfTime.toString();
+ }
+
+ private String[] getIntervals(List<String> intervals) {
+ return intervals.get(0).split("/");
+ }
+
+ private String getWhereOfExactly(DSLObject.QueryBean.Parameters parameters) {
+ if (CollectionUtils.isEmpty(parameters.getMatch())) {
+ return StringUtil.EMPTY;
+ }
+ StringBuffer whereOfExactly = new StringBuffer();
+ List<DSLObject.QueryBean.Parameters.MatchBean> match = parameters.getMatch();
+ for (DSLObject.QueryBean.Parameters.MatchBean item : match) {
+ if (MatchEnum.EXACTLY.getType().equals(item.getType()) && !CollectionUtils.isEmpty(item.getFieldValues())) {
+ whereOfExactly.append(item.getFieldKey()).append(" IN ('").append(String.join("','", item.getFieldValues())).append("')");
+ }
+ }
+ return whereOfExactly.toString();
+ }
+
+ private String generateProtocolTreeSql(DSLObject.QueryBean queryParam) {
+ String whereOfTime = getWhereOfTime(queryParam.getParameters());
+ String whereOfExactly = getWhereOfExactly(queryParam.getParameters());
+ whereOfExactly = StringUtil.isBlank(whereOfExactly) ? "" : "AND " + whereOfExactly;
+ return String.format("SELECT protocol_id, SUM(sessions) as sessions,SUM(c2s_byte_num) as c2s_byte_num, SUM(c2s_pkt_num) as c2s_pkt_num, SUM(s2c_byte_num) as s2c_byte_num, SUM(s2c_pkt_num) as s2c_pkt_num " +
+ "FROM %s " +
+ "WHERE %s %s" +
+ "GROUP BY protocol_id ",
+ queryParam.getDataSource(), whereOfTime, whereOfExactly);
+ }
+
+ private List<ProtocolTree> getProtocolTrees(List<Map> data) {
+ Iterable<ProtocolTree> resultConcat = Iterables.concat(new ArrayList<>());
+ for (Map datum : data) {
+ String s = JsonMapper.toJsonString(datum);
+ List<ProtocolTree> nodes = parserRawData(s);
+ resultConcat = Iterables.concat(resultConcat, nodes);
+ }
+ List<ProtocolTree> listNodes = Lists.newArrayList(resultConcat);
+ ProtocolTree root = new ProtocolTree("Protocols", "Protocols", null);
+ List<ProtocolTree> roots = listNodes.stream().filter(o -> StringUtil.isBlank(o.getParentId())).collect(Collectors.toList());
+ roots.forEach(item -> {
+ root.setSentBytes(root.getSentBytes() + item.getSentBytes());
+ root.setReceivedBytes(root.getReceivedBytes() + item.getReceivedBytes());
+ });
+
+ listNodes.forEach(item -> {
+ item.setId(PROTOCOL_NODE + item.getId());
+ });
+
+ listNodes.add(root);
+ return listNodes;
+ }
+
+ private BaseResult executeQuery(String sql) {
+ Map<String, Object> map = new HashMap<>();
+ map.put("query", sql);
+ return restTemplate.getForObject("http://galaxy-query-engine/?query={query}", BaseResult.class, map);
+ }
+
+
+ private List<ProtocolTree> parserRawData(String jsonString) {
+ List<ProtocolTree> lists = Lists.newArrayList();
+ Map<String, Object> results = (Map<String, Object>) JsonMapper.fromJsonString(jsonString, Map.class);
+ long sessions = Long.parseLong(results.get("sessions").toString());
+ long sentBytes = Long.parseLong(results.get("c2s_byte_num").toString());
+ long receivedBytes = Long.parseLong(results.get("s2c_byte_num").toString());
+ long sentPackets = Long.parseLong(results.get("c2s_pkt_num").toString());
+ long receivedPackets = Long.parseLong(results.get("s2c_pkt_num").toString());
+ String protocolId = results.get("protocol_id").toString();
+ List<String> protocolList = Lists.newArrayList(protocolId.split("/"));
+ ProtocolTree protocolTree = new ProtocolTree(protocolId, protocolList.size() <= 0 ? null : protocolList.get(protocolList.size() - 1), null);
+ protocolTree.setSentBytes(sentBytes);
+ protocolTree.setReceivedBytes(receivedBytes);
+ protocolTree.getMetrics().put("sentPackets", sentPackets);
+ protocolTree.getMetrics().put("receivedPackets", receivedPackets);
+ protocolTree.getMetrics().put("sessions", sessions);
+ lists.add(protocolTree);
+ return lists;
+ }
+
+ private List<ProtocolTree> getListProtocol(List<ProtocolTree> listNodes) {
+ Map<String, List<ProtocolTree>> resultMap = listNodes.stream().collect(Collectors.groupingBy(ProtocolTree::getId));
+ Map<String, Object> mergeResultMap =
+ Maps.transformValues(resultMap, new com.google.common.base.Function<List<ProtocolTree>, Object>() {
+ @Nullable
+ @Override
+ public Object apply(@Nullable List<ProtocolTree> input) {
+ if (StringUtil.isEmpty(input)) {
+ return null;
+ }
+ if (input.size() == 1) {
+ return input.get(0);
+ }
+ ProtocolTree firstProtocolTree = input.get(0);
+ ProtocolTree mergeProtocolTree = new ProtocolTree(firstProtocolTree.getId(), firstProtocolTree.getName(),
+ firstProtocolTree.getParentId(), 0, 0);
+ mergeProtocolTree.setMetrics(Maps.newLinkedHashMap());
+
+ for (ProtocolTree protocolTree : input) {
+ mergeProtocolTree.setSentBytes(mergeProtocolTree.getSentBytes() + protocolTree.getSentBytes());
+ mergeProtocolTree.setReceivedBytes(mergeProtocolTree.getReceivedBytes() + protocolTree.getReceivedBytes());
+ for (String key : protocolTree.getMetrics().keySet()) {
+ if (StringUtil.isEmpty(mergeProtocolTree.getMetrics().get(key))) {
+ mergeProtocolTree.getMetrics().put(key, protocolTree.getMetrics().get(key));
+ } else {
+ long sumValue = Long.parseLong(mergeProtocolTree.getMetrics().get(key).toString())
+ + Long.parseLong(protocolTree.getMetrics().get(key).toString());
+ mergeProtocolTree.getMetrics().put(key, sumValue);
+ }
+ }
+
+ }
+
+ return mergeProtocolTree;
+ }
+ });
+ List<ProtocolTree> mergeNodes = new ArrayList(mergeResultMap.values());
+ return TreeUtils.mergeTree(mergeNodes, ProtocolTree::getId, ProtocolTree::getParentId, ProtocolTree::setChildrens);
+ }
+
+ enum QueryType {
+
+ PROTOCOL_TREE_SUMMARY("protocolTreeSummary"), PROTOCOL_DATA_RATE_SUMMARY("protocolDataRateSummary"), NETWORK_OVERVIEW_SUMMARY("networkOverviewSummary");
+ private final String value;
+
+ QueryType(String value) {
+ this.value = value;
+ }
+
+ public String getValue() {
+ return value;
+ }
+ }
+}
diff --git a/galaxy-gateway/config/application.yml b/galaxy-gateway/config/application.yml
index 4b12036..c787a07 100644
--- a/galaxy-gateway/config/application.yml
+++ b/galaxy-gateway/config/application.yml
@@ -39,6 +39,10 @@ spring:
uri: lb://galaxy-business-api
predicates:
- Path=/open-api/**
+ - id: traffic
+ uri: lb://galaxy-business-api
+ predicates:
+ - Path=/traffic/**
- id: knowledge
uri: lb://galaxy-query-engine
predicates: