diff options
| author | wangwei <[email protected]> | 2020-12-08 22:52:42 +0800 |
|---|---|---|
| committer | wangwei <[email protected]> | 2020-12-08 22:52:42 +0800 |
| commit | 60e8f06fc3d0d1f2366c181670a6daebba066262 (patch) | |
| tree | 71fbe47256e9e4212516a79ff1fba6ff7b36c82b | |
| parent | e74d8cff698b21a9ef9648b9ceb56a8c97ae41e8 (diff) | |
traffic接口: liveChart 迁移至business-api
11 files changed, 846 insertions, 0 deletions
diff --git a/galaxy-business-api/pom.xml b/galaxy-business-api/pom.xml index e9d88d3..1bc0040 100644 --- a/galaxy-business-api/pom.xml +++ b/galaxy-business-api/pom.xml @@ -62,6 +62,16 @@ <artifactId>spring-boot-starter-test</artifactId> <scope>test</scope> </dependency> + <dependency> + <groupId>com.zdjizhi</groupId> + <artifactId>galaxy</artifactId> + <exclusions> + <exclusion> + <groupId>org.slf4j</groupId> + <artifactId>slf4j-log4j12</artifactId> + </exclusion> + </exclusions> + </dependency> </dependencies> <build> diff --git a/galaxy-business-api/src/main/java/com/mesalab/api/modules/network/MatchEnum.java b/galaxy-business-api/src/main/java/com/mesalab/api/modules/network/MatchEnum.java new file mode 100644 index 0000000..f974205 --- /dev/null +++ b/galaxy-business-api/src/main/java/com/mesalab/api/modules/network/MatchEnum.java @@ -0,0 +1,45 @@ +package com.mesalab.api.modules.network; + +import lombok.AllArgsConstructor; +import lombok.Getter; + +/** + * 匹配查询 + * <p> + * type:匹配方式,["exactly"|"prefix"|"suffix"|"substring"|"regex"] + * <p> + * exactly:完全匹配 + * prefix:前缀匹配 + * suffix:后缀匹配 + * substring:子串匹配 + * regex:正则符号匹配 + * fieldKey:属性名称, + * <p> + * fieldValues:属性值 + * <p> + * 注:当type=regex时 + * <p> + * fieldValues值匹配方式符合下列规则,数组内多个值为“或”的关系 + * <p> + * 匹配方式转义 + * <p> + * 以*结尾,不以*或者$开始,无论关键字其他位置是否包含表示匹配方式的字符*和$,即表示左匹配(前缀匹配),例如aaaa*bbb$ccc*$* + * 以*开始,不以*结尾,无论关键字其他位置是否包含表示匹配方式的字符*和$,即表示右匹配(后缀匹配),例如*$*aaa*bbb$ccc + * 以*开始,以*结尾,无论关键字其他位置是否包含表示匹配方式的字符*和$,即表示子串匹配,例如*aaa$bbb*ccc* + * 不以*或者$开头,不以*结尾,无论关键字其他位置是否包含表示匹配方式的字符*和$,即表示子串匹配,例如aaa*bbb$ccc*$ + * 以$开头,不以*结尾,无论关键字其他位置是否包含表示匹配方式的字符*和$,即表示完整匹配,例如$aaa*bbb$或者$aaa*$bbb + */ +@Getter +@AllArgsConstructor +public enum MatchEnum { + + EXACTLY("exactly", "\"{0}\""), + PREFIX("prefix", "\"{0}%\""), + SUFFIX("suffix", "\"%{0}\""), + SUBSTRING("substring", "\"%{0}%\""), + REGEX("regex", "\"{0}\""); + + private final String type; + private final String matchExp; + +} diff --git a/galaxy-business-api/src/main/java/com/mesalab/api/modules/network/TreeUtils.java b/galaxy-business-api/src/main/java/com/mesalab/api/modules/network/TreeUtils.java new file mode 100644 index 0000000..37aaaab --- /dev/null +++ b/galaxy-business-api/src/main/java/com/mesalab/api/modules/network/TreeUtils.java @@ -0,0 +1,100 @@ +package com.mesalab.api.modules.network; + + +import com.zdjizhi.utils.StringUtil; +import lombok.extern.slf4j.Slf4j; + +import java.util.ArrayList; +import java.util.List; +import java.util.Stack; +import java.util.function.BiConsumer; +import java.util.function.Consumer; +import java.util.function.Function; +import java.util.stream.Collectors; + +@Slf4j +public class TreeUtils { + + + /** + * 遍历树结构 + * @param root 节点树 + * @param getChildrenNode 获取当前节点的子节点列表;函数,接收当前节点对象 + * @param behavior 当前节点操作函数;定义遍历当前节点的操作行为 + * @param <T> 树的节点对象 + */ + public static <T> void traversalTree(List<T> root, Function<T, List<T>> getChildrenNode, + Consumer<T> behavior) { + + Stack<T> stack = new Stack<>(); + root.forEach(stack :: push); + while (!stack.isEmpty()) { + T o = stack.pop(); + behavior.accept(o); + List<T> childrens = getChildrenNode.apply(o); + if (StringUtil.isNotEmpty(childrens)) { + childrens.forEach(stack :: push); + } + } + + } + + /** + * 平铺树结构 + * @param root 节点树 + * @param getChildrenNode 获取当前节点的子节点列表;函数,接收当前节点对象 + * @param <T> 树的节点对象 + * @return 平铺后的节点结构 + */ + public static <T> List<T> flatTree(List<T> root, + Function<T, List<T>> getChildrenNode) { + List<T> list = new ArrayList<>(); + traversalTree(root, getChildrenNode, list :: add); + return list; + } + + + + + + /** + * 聚合为树结构 + * @param list 节点列表数据 + * @param loadKey 节点的ID;函数,接收一个节点获取唯一ID + * @param loadParentKey 节点的父亲ID;函数,接收一个节点获取父亲ID + * @param write 节点Children写入函数;函数,接收当前节点和子节点列表,然后将子节点项写入当前节点。 + * @param <T> 当前节点对象 + * @param <R> 请求节点返回的属性信息 + * @return 树结构列表 + */ + public static <T, R> List<T> mergeTree(List<T> list, + Function<T, R> loadKey, + Function<T, R> loadParentKey, + BiConsumer<T, List<T>> write) { + List<T> root = list.stream().filter(o -> StringUtil.isEmpty(loadParentKey.apply(o))).collect(Collectors.toList()); + Stack<T> stack = new Stack<T>(); + root.forEach(stack::push); + while (!stack.isEmpty()) { + T o = stack.pop(); + R key = loadKey.apply(o); + List<T> childrens = list.stream().filter(k -> key.equals(loadParentKey.apply(k))).collect(Collectors.toList()); + write.accept(o, childrens); + if (childrens.size() > 0) { + childrens.forEach(stack :: push); + } + + } + + return root; + } + + + + + + + + + + +} diff --git a/galaxy-business-api/src/main/java/com/mesalab/api/modules/network/controller/NetworkMonitorController.java b/galaxy-business-api/src/main/java/com/mesalab/api/modules/network/controller/NetworkMonitorController.java new file mode 100644 index 0000000..9074bbc --- /dev/null +++ b/galaxy-business-api/src/main/java/com/mesalab/api/modules/network/controller/NetworkMonitorController.java @@ -0,0 +1,48 @@ +package com.mesalab.api.modules.network.controller; + +import com.mesalab.api.common.base.BaseResult; +import com.mesalab.api.common.base.BaseResultGenerator; +import com.mesalab.api.common.enums.ResultCodeEnum; +import com.mesalab.api.common.enums.ResultStatusEnum; +import com.mesalab.api.common.exception.BusinessException; +import com.mesalab.api.modules.network.dsl.DSLObject; +import com.mesalab.api.modules.network.dsl.DSLValidate; +import com.mesalab.api.modules.network.service.NetworkMonitorService; +import lombok.extern.slf4j.Slf4j; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.validation.annotation.Validated; +import org.springframework.web.bind.annotation.*; +import org.springframework.web.client.RestTemplate; + +import javax.servlet.http.HttpServletRequest; + + +/** + * @author wangwei + * @version 1.0 + * @date 2020/6/30 10:38 上午 + */ +@Slf4j +@RestController +@RequestMapping(value = "/") +public class NetworkMonitorController { + + @Autowired + private DSLValidate dslValidate; + @Autowired + private NetworkMonitorService networkMonitorService; + @Autowired + RestTemplate restTemplate; + private static final String PROTOCOL = "protocol"; + + @PostMapping(value = "/traffic/v1/", produces = "application/json") + public BaseResult trafficDistribution(HttpServletRequest request, @Validated @RequestBody DSLObject dslObject) { + log.info("流量分布接口, 参数: queryString is {},params is {}", request.getQueryString(), dslObject); + dslValidate.executeValidate(dslObject); + if (PROTOCOL.equalsIgnoreCase(request.getQueryString())) { + return networkMonitorService.getTrafficDistributedInfo(dslObject); + } else { + throw new BusinessException(ResultStatusEnum.FAIL.getCode(), ResultCodeEnum.PARAM_SYNTAX_ERROR.getCode(), "Support protocol only", null); + } + } +} diff --git a/galaxy-business-api/src/main/java/com/mesalab/api/modules/network/dsl/DSLObject.java b/galaxy-business-api/src/main/java/com/mesalab/api/modules/network/dsl/DSLObject.java new file mode 100644 index 0000000..a1799e2 --- /dev/null +++ b/galaxy-business-api/src/main/java/com/mesalab/api/modules/network/dsl/DSLObject.java @@ -0,0 +1,40 @@ +package com.mesalab.api.modules.network.dsl; + +import lombok.Data; + +import java.io.Serializable; +import java.util.List; + +/** + * @Date: 2020-09-18 09:48 + * @Author : wangwei + * @ClassName : DSLObject + * @Description : + */ +@Data +public class DSLObject implements Serializable { + + private String clientId; + private QueryBean query; + + @Data + public static class QueryBean { + private String queryType; + private String dataSource; + private Parameters parameters; + + @Data + public static class Parameters { + private String granularity; + private List<MatchBean> match; + private List<String> intervals; + + @Data + public static class MatchBean { + private String type; + private String fieldKey; + private List<String> fieldValues; + } + } + } +} diff --git a/galaxy-business-api/src/main/java/com/mesalab/api/modules/network/dsl/DSLValidate.java b/galaxy-business-api/src/main/java/com/mesalab/api/modules/network/dsl/DSLValidate.java new file mode 100644 index 0000000..af11539 --- /dev/null +++ b/galaxy-business-api/src/main/java/com/mesalab/api/modules/network/dsl/DSLValidate.java @@ -0,0 +1,132 @@ +package com.mesalab.api.modules.network.dsl; + +import com.mesalab.api.common.enums.ResultCodeEnum; +import com.mesalab.api.common.exception.BusinessException; +import com.mesalab.api.modules.network.MatchEnum; +import com.zdjizhi.utils.StringUtil; +import org.apache.commons.lang.Validate; +import org.apache.commons.lang3.EnumUtils; +import org.apache.http.HttpStatus; +import org.joda.time.format.DateTimeFormat; +import org.joda.time.format.DateTimeFormatter; +import org.springframework.stereotype.Component; +import org.springframework.util.CollectionUtils; + +import java.util.List; +import java.util.regex.Pattern; + +/** + * @Date: 2020-09-17 18:10 + * @Author : wangwei + * @ClassName : DSLValidate + * @Description : DSL格式校验 + */ +@Component +public class DSLValidate { + + public static Pattern periodOfPT = Pattern.compile("PT(\\d+)[SMH]", Pattern.CASE_INSENSITIVE); + public static Pattern periodOfP = Pattern.compile("P(\\d+)[DWMY]", Pattern.CASE_INSENSITIVE); + public static Pattern strFormatDateTime = Pattern.compile("\\d{4}-\\d{2}-\\d{2}\\s+\\d{2}:\\d{2}:\\d{2}", Pattern.CASE_INSENSITIVE); + + + public void executeValidate(DSLObject dslObject) throws BusinessException { + if (StringUtil.isEmpty(dslObject)) { + throw new BusinessException(HttpStatus.SC_BAD_REQUEST, ResultCodeEnum.PARAM_SYNTAX_ERROR.getCode(), + "DSLObject is invalid"); + } + if (StringUtil.isEmpty(dslObject.getQuery())) { + throw new BusinessException(HttpStatus.SC_BAD_REQUEST, ResultCodeEnum.PARAM_SYNTAX_ERROR.getCode(), + "DSLObject.query is invalid"); + } + DSLObject.QueryBean.Parameters parameters = dslObject.getQuery().getParameters(); + if (StringUtil.isEmpty(parameters)) { + return; + } + if (!isValidGranularity(parameters.getGranularity())) { + throw new BusinessException(HttpStatus.SC_BAD_REQUEST, ResultCodeEnum.PARAM_SYNTAX_ERROR.getCode(), + "query.Granularity value is invalid"); + } + if (!isValidMatch(parameters.getMatch())) { + throw new BusinessException(HttpStatus.SC_BAD_REQUEST, ResultCodeEnum.PARAM_SYNTAX_ERROR.getCode(), + "query.Match type is invalid"); + } + if (!isValidIntervals(parameters.getIntervals())) { + throw new BusinessException(HttpStatus.SC_BAD_REQUEST, ResultCodeEnum.PARAM_SYNTAX_ERROR.getCode(), + "query.Intervals is invalid"); + } + } + + /** + * 时间粒度校验, 遵循ISO8601 durations 定义 + * + * @param granularity + * @return + */ + private boolean isValidGranularity(String granularity) { + if (StringUtil.isBlank(granularity)) { + return true; + } + if (periodOfP.matcher(granularity).find() || periodOfPT.matcher(granularity).find()) { + return true; + } + return false; + } + + /** + * 校验match: + * 1.是否属于{@link MatchEnum}限定类型 + * 2.不能以*开始、或$结尾 + * + * @param matchList + * @return + */ + private boolean isValidMatch(List<DSLObject.QueryBean.Parameters.MatchBean> matchList) { + if (CollectionUtils.isEmpty(matchList)) { + return true; + } + for (DSLObject.QueryBean.Parameters.MatchBean matchBean : matchList) { + Validate.isTrue(EnumUtils.isValidEnum(MatchEnum.class, StringUtil.upperCase(matchBean.getType())), "match type is illegal"); + for (String fieldValue : matchBean.getFieldValues()) { + if (fieldValue.startsWith("*") || fieldValue.endsWith("$")) { + throw new BusinessException(HttpStatus.SC_BAD_REQUEST, ResultCodeEnum.PARAM_SYNTAX_ERROR.getCode(), "Match fieldValues cannot startWith '*' or endWith '$'"); + } + } + } + return true; + } + + /** + * 校验Interval: + * 1.目前只支持between类型: ["2020-01-01 00:00:00/2020-01-02 00:00:00"] + * 2.时间区间必须是 开始时间 < 结束时间 + * + * @param intervals + */ + private boolean isValidIntervals(List<String> intervals) { + if (CollectionUtils.isEmpty(intervals)) { + return true; + } + if (intervals.size() != 1) { + throw new BusinessException(HttpStatus.SC_BAD_REQUEST, ResultCodeEnum.PARAM_SYNTAX_ERROR.getCode(), "Intervals param error"); + } + String[] split = intervals.get(0).split("/"); + if (split.length != 2) { + throw new BusinessException(HttpStatus.SC_BAD_REQUEST, ResultCodeEnum.PARAM_SYNTAX_ERROR.getCode(), "Intervals param error"); + } + for (String dateTimeStr : split) { + if (!strFormatDateTime.matcher(dateTimeStr).find()) { + throw new BusinessException(HttpStatus.SC_BAD_REQUEST, ResultCodeEnum.PARAM_SYNTAX_ERROR.getCode(), "Time format should be: yyyy-MM-dd HH:mm:ss"); + } + } + try { + DateTimeFormatter dateTimeFormatter = DateTimeFormat.forPattern("yyyy-MM-dd HH:mm:ss"); + if (dateTimeFormatter.parseMillis(split[1]) < dateTimeFormatter.parseMillis(split[0])) { + throw new RuntimeException("Intervals value should be [start, end]"); + } + } catch (Exception e) { + throw new BusinessException(HttpStatus.SC_BAD_REQUEST, ResultCodeEnum.PARAM_SYNTAX_ERROR.getCode(), e.getMessage()); + } + return true; + } + +} diff --git a/galaxy-business-api/src/main/java/com/mesalab/api/modules/network/dsl/Parameter.java b/galaxy-business-api/src/main/java/com/mesalab/api/modules/network/dsl/Parameter.java new file mode 100644 index 0000000..b12bb10 --- /dev/null +++ b/galaxy-business-api/src/main/java/com/mesalab/api/modules/network/dsl/Parameter.java @@ -0,0 +1,10 @@ +package com.mesalab.api.modules.network.dsl; + +/** + * @author wangwei + * @description: + * @date 2020/9/18 6:31 下午 + */ +public class Parameter { + +} diff --git a/galaxy-business-api/src/main/java/com/mesalab/api/modules/network/protocol/ProtocolTree.java b/galaxy-business-api/src/main/java/com/mesalab/api/modules/network/protocol/ProtocolTree.java new file mode 100644 index 0000000..2be6455 --- /dev/null +++ b/galaxy-business-api/src/main/java/com/mesalab/api/modules/network/protocol/ProtocolTree.java @@ -0,0 +1,68 @@ +package com.mesalab.api.modules.network.protocol; + +import com.google.common.collect.Lists; +import com.google.common.collect.Maps; +import lombok.AllArgsConstructor; +import lombok.Builder; +import lombok.Data; +import lombok.NoArgsConstructor; + +import java.util.List; +import java.util.Map; +import java.util.Objects; + +@Data +@Builder +@NoArgsConstructor +@AllArgsConstructor +public class ProtocolTree { + private String id; + private String name; + private String parentId; + private List<ProtocolTree> childrens = Lists.newArrayList(); + private Map<String, Object> metrics = Maps.newLinkedHashMap(); + private long sentBytes; + private long receivedBytes; + private static final String HIERARCCY_FLAG = "/"; + + + + public ProtocolTree(String id, String name, String parentId) { + this.id = id; + this.name = name; + this.parentId = parentId; + } + + public ProtocolTree(String id, String name, String parentId, long sentBytes, long receivedBytes) { + this.id = id; + this.name = name; + this.parentId = parentId; + this.sentBytes = sentBytes; + this.receivedBytes = receivedBytes; + } + + + public String getParentId() { + return id.lastIndexOf(HIERARCCY_FLAG) > 0 ? + id.substring(0, id.lastIndexOf(HIERARCCY_FLAG)) : null; + } + + public void setParentId(String parentId) { + this.parentId = parentId; + } + + @Override + public boolean equals(Object o) { + if (this == o) return true; + if (o == null || getClass() != o.getClass()) return false; + ProtocolTree that = (ProtocolTree) o; + return Objects.equals(id, that.id); + } + + @Override + public int hashCode() { + return Objects.hash(id, name); + } + + +} diff --git a/galaxy-business-api/src/main/java/com/mesalab/api/modules/network/service/NetworkMonitorService.java b/galaxy-business-api/src/main/java/com/mesalab/api/modules/network/service/NetworkMonitorService.java new file mode 100644 index 0000000..2730324 --- /dev/null +++ b/galaxy-business-api/src/main/java/com/mesalab/api/modules/network/service/NetworkMonitorService.java @@ -0,0 +1,18 @@ +package com.mesalab.api.modules.network.service; + + +import com.mesalab.api.common.base.BaseResult; +import com.mesalab.api.modules.network.dsl.DSLObject; + +/** + * @author wangwei + * @version 1.0 + * @date 2020/6/30 4:20 下午 + */ + + +public interface NetworkMonitorService { + + BaseResult getTrafficDistributedInfo(DSLObject dslObject); + +} diff --git a/galaxy-business-api/src/main/java/com/mesalab/api/modules/network/service/impl/NetworkMonitorServiceImpl.java b/galaxy-business-api/src/main/java/com/mesalab/api/modules/network/service/impl/NetworkMonitorServiceImpl.java new file mode 100644 index 0000000..66ad39d --- /dev/null +++ b/galaxy-business-api/src/main/java/com/mesalab/api/modules/network/service/impl/NetworkMonitorServiceImpl.java @@ -0,0 +1,371 @@ +package com.mesalab.api.modules.network.service.impl; + +import com.google.common.collect.Iterables; +import com.google.common.collect.Lists; +import com.google.common.collect.Maps; +import com.mesalab.api.common.base.BaseResult; +import com.mesalab.api.common.base.BaseResultGenerator; +import com.mesalab.api.common.enums.ResultCodeEnum; +import com.mesalab.api.common.exception.BusinessException; +import com.mesalab.api.modules.network.MatchEnum; +import com.mesalab.api.modules.network.TreeUtils; +import com.mesalab.api.modules.network.dsl.DSLObject; +import com.mesalab.api.modules.network.protocol.ProtocolTree; +import com.mesalab.api.modules.network.service.NetworkMonitorService; +import com.mesalab.api.common.enums.*; +import com.zdjizhi.utils.JsonMapper; +import com.zdjizhi.utils.StringUtil; +import lombok.extern.slf4j.Slf4j; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.stereotype.Service; +import org.springframework.util.CollectionUtils; +import org.springframework.web.client.RestTemplate; + +import javax.annotation.Nullable; +import java.util.*; +import java.util.stream.Collectors; + +/** + * @author wangwei + * @version 1.0 + * @date 2020/6/30 4:24 下午 + */ +@Slf4j +@Service("networkMonitorService") +public class NetworkMonitorServiceImpl implements NetworkMonitorService { + + @Autowired + RestTemplate restTemplate; + + private static final String PROTOCOL_NODE = "Protocols/"; + + @Override + public BaseResult getTrafficDistributedInfo(DSLObject dslObject) { + BaseResult baseResult; + String sql; + if (QueryType.PROTOCOL_TREE_SUMMARY.getValue().equalsIgnoreCase(dslObject.getQuery().getQueryType())) { + sql = generateProtocolTreeSql(dslObject.getQuery()); + baseResult = getResultProtocolTree(sql); + } else if (QueryType.PROTOCOL_DATA_RATE_SUMMARY.getValue().equalsIgnoreCase(dslObject.getQuery().getQueryType())) { + sql = generateProtocolDataRateSql(dslObject.getQuery()); + baseResult = getProtocolDataRateSummary(sql); + } else if (QueryType.NETWORK_OVERVIEW_SUMMARY.getValue().equalsIgnoreCase(dslObject.getQuery().getQueryType())) { + sql = generateOverviewSummarySql(dslObject.getQuery()); + BaseResult dataRateResult = executeQuery(sql); + String statSql = generateStatSql(dslObject.getQuery()); + BaseResult statResult = executeQuery(statSql); + baseResult = buildNetworkOverviewResult(dataRateResult, statResult); + } else { + baseResult = BaseResultGenerator.failure(ResultStatusEnum.FAIL.getCode(), ResultCodeEnum.PARAM_SYNTAX_ERROR.getCode(), "No match queryType"); + } + if (!baseResult.isSuccess()) { + throw new BusinessException(baseResult.getStatus(), baseResult.getCode(), baseResult.getMessage(), null); + } + return baseResult; + } + + private BaseResult buildNetworkOverviewResult(BaseResult dataRateResult, BaseResult currentSessionResult) { + BaseResult baseResult; + if (dataRateResult.isSuccess() && currentSessionResult.isSuccess()) { + List<Map> statData = (List<Map>) currentSessionResult.getData(); + List<Map> summaryData = (List<Map>) dataRateResult.getData(); + + Map<String, Object> data = new HashMap<>(16); + if (!statData.isEmpty()) { + data.putAll(statData.get(0)); + } + if (statData.size() >= 2) { + data.put("current_sessions", statData.get(1).get("total_bytes")); + } + if (!summaryData.isEmpty()) { + data.putAll(summaryData.get(0)); + } + long uniqClientIp = Long.parseLong(String.valueOf(data.get("uniq_client_ip") == null ? 0 : data.get("uniq_client_ip"))); + long totalSessions = Long.parseLong(String.valueOf(data.get("total_sessions") == null ? 0 : data.get("total_sessions"))); + long summaryTotalSessions = Long.parseLong(String.valueOf(data.get("summarySessions") == null ? 0 : data.get("summarySessions"))); + long summaryTotalBytes = Long.parseLong(String.valueOf(data.get("summaryTotalBytes") == null ? 0 : data.get("summaryTotalBytes"))); + long summaryTotalPackets = Long.parseLong(String.valueOf(data.get("summaryTotalPackets") == null ? 0 : data.get("summaryTotalPackets"))); + long currentSessions = Long.parseLong(String.valueOf(data.get("current_sessions") == null ? 0 : data.get("current_sessions"))); + long dataRate = Long.parseLong(String.valueOf(data.get("data_rate") == null ? 0 : data.get("data_rate"))); + long totalBytes = Long.parseLong(String.valueOf(data.get("total_bytes") == null ? 0 : data.get("total_bytes"))); + long totalUncategorizedBytes = Long.parseLong(String.valueOf(data.get("total_uncategorized_bytes") == null ? 0 : data.get("total_uncategorized_bytes"))); + double totalUncategorizedPercent = summaryTotalBytes == 0 ? 0 : totalUncategorizedBytes * 1.0 / summaryTotalBytes; + long oneSidedConnections = Long.parseLong(String.valueOf(data.get("one_sided_connections") == null ? 0 : data.get("one_sided_connections"))); + double oneSidedPercent = summaryTotalSessions == 0 ? 0 : oneSidedConnections * 1.0 / summaryTotalSessions; + long sequenceGapLossBytes = Long.parseLong(String.valueOf(data.get("sequence_gap_loss_bytes") == null ? 0 : data.get("sequence_gap_loss_bytes"))); + double sequenceGapLossBytesPercent = summaryTotalBytes == 0 ? 0 : sequenceGapLossBytes * 1.0 / summaryTotalBytes; + long fragmentationPackets = Long.parseLong(String.valueOf(data.get("fragmentation_packets") == null ? 0 : data.get("fragmentation_packets"))); + double fragmentationPacketsPercent = summaryTotalPackets == 0 ? 0 : fragmentationPackets * 1.0 / summaryTotalPackets; + + List result = new ArrayList<>(); + Map resultMap = new LinkedHashMap<>(); + resultMap.put("uniq_client_ip", uniqClientIp); + resultMap.put("total_sessions", totalSessions); + resultMap.put("current_sessions", currentSessions); + resultMap.put("data_rate", dataRate); + resultMap.put("total_bytes", totalBytes); + resultMap.put("total_uncategorized_bytes", totalUncategorizedBytes); + resultMap.put("total_uncategorized_percent", Double.parseDouble(String.format("%.4f", totalUncategorizedPercent))); + resultMap.put("one_sided_connections", oneSidedConnections); + resultMap.put("one_sided_percent", Double.parseDouble(String.format("%.4f", oneSidedPercent))); + resultMap.put("sequence_gap_loss_bytes", sequenceGapLossBytes); + resultMap.put("sequence_gap_loss_percent", Double.parseDouble(String.format("%.4f", sequenceGapLossBytesPercent))); + resultMap.put("fragmentation_packets", fragmentationPackets); + resultMap.put("fragmentation_percent", Double.parseDouble(String.format("%.4f", fragmentationPacketsPercent))); + result.add(resultMap); + + Map statistics = dataRateResult.getStatistics(); + Map statStatistics = currentSessionResult.getStatistics(); + for (Object key : statistics.keySet()) { + if ("result_rows".equals(key.toString())) { + continue; + } + statistics.put(key, Long.parseLong(statistics.get(key).toString()) + Long.parseLong(statStatistics.get(key).toString())); + } + + baseResult = BaseResultGenerator.success("ok", result, statistics); + } else { + baseResult = BaseResultGenerator.error("dataRate result: " + dataRateResult.toString() + ";\n currentSession result is: " + currentSessionResult); + } + return baseResult; + } + + private BaseResult getProtocolDataRateSummary(String sql) { + BaseResult baseResult; + BaseResult result = executeQuery(sql); + if (result.isSuccess()) { + List<Map> data = (List<Map>) result.getData(); + data.forEach(o -> { + String[] protocolIds = String.valueOf(o.get("type")).split("/"); + String protocolId = protocolIds[protocolIds.length - 1]; + o.put("type", protocolId); + }); + baseResult = BaseResultGenerator.success("ok", data, result.getStatistics()); + } else { + baseResult = BaseResultGenerator.error(result.getMessage()); + } + return baseResult; + } + + private BaseResult getResultProtocolTree(String sql) { + BaseResult baseResult = executeQuery(sql); + if (baseResult.isSuccess()) { + List<ProtocolTree> listProtocol = getListProtocol(getProtocolTrees((List<Map>) baseResult.getData())); + baseResult = BaseResultGenerator.success("ok", listProtocol, baseResult.getStatistics()); + } else { + baseResult = BaseResultGenerator.error(baseResult.getMessage()); + } + return baseResult; + } + + private String generateStatSql(DSLObject.QueryBean queryParam) { + String whereOfTime = getWhereOfTime(queryParam.getParameters()); + String whereOfExactly = getWhereOfExactly(queryParam.getParameters()); + whereOfExactly = StringUtil.isBlank(whereOfExactly) ? "" : "AND " + whereOfExactly; + String[] intervals = getIntervals(queryParam.getParameters().getIntervals()); + return String.format("(SELECT SUM(c2s_byte_num + s2c_byte_num) as total_bytes, SUM(sessions) as total_sessions, (SUM(c2s_byte_num + s2c_byte_num) * 8)/((TIMESTAMP_TO_MILLIS(TIMESTAMP '%s')-TIMESTAMP_TO_MILLIS(TIMESTAMP '%s'))/1000) AS data_rate " + + "FROM traffic_protocol_stat_log " + + "WHERE %s %s" + + "AND LENGTH(protocol_id) = LENGTH(REPLACE(protocol_id,'/','')) LIMIT 1) " + + "UNION ALL " + + "( SELECT SUM(sessions), 0, 0 " + + "FROM traffic_protocol_stat_log " + + "WHERE %s %s" + + "AND LENGTH(protocol_id) = LENGTH(REPLACE(protocol_id,'/','')) GROUP BY __time ORDER BY __time DESC LIMIT 1 ) ", + intervals[1], intervals[0], whereOfTime, whereOfExactly, + whereOfTime, whereOfExactly); + } + + private String generateProtocolDataRateSql(DSLObject.QueryBean queryParam) { + String whereOfTime = getWhereOfTime(queryParam.getParameters()); + String whereOfExactly = getWhereOfExactly(queryParam.getParameters()); + whereOfExactly = StringUtil.isBlank(whereOfExactly) ? "" : "AND " + whereOfExactly; + String protocolStr = null; + List<DSLObject.QueryBean.Parameters.MatchBean> match = queryParam.getParameters().getMatch(); + for (DSLObject.QueryBean.Parameters.MatchBean item : match) { + if (MatchEnum.PREFIX.getType().equals(item.getType())) { + String[] split = item.getFieldValues().get(0).split(","); + protocolStr = split[0].replaceFirst(PROTOCOL_NODE, ""); + } + } + return String.format("(" + + "SELECT TIME_FORMAT(MILLIS_TO_TIMESTAMP( 1000 * TIME_FLOOR_WITH_FILL(TIMESTAMP_TO_MILLIS(__time)/1000, '%s', 'zero')), 'yyyy-MM-dd HH:mm:ss') as stat_time, protocol_id as type, sum(c2s_byte_num + s2c_byte_num) as bytes " + + "from %s " + + "where %s %s and protocol_id = '%s' " + + "group by TIME_FORMAT(MILLIS_TO_TIMESTAMP( 1000 * TIME_FLOOR_WITH_FILL(TIMESTAMP_TO_MILLIS(__time)/1000, '%s', 'zero')), 'yyyy-MM-dd HH:mm:ss'), protocol_id " + + "order by stat_time asc" + + ") " + + "union all " + + "(" + + "SELECT TIME_FORMAT(MILLIS_TO_TIMESTAMP( 1000 * TIME_FLOOR_WITH_FILL(TIMESTAMP_TO_MILLIS(__time)/1000, '%s', 'zero')), 'yyyy-MM-dd HH:mm:ss') as stat_time, protocol_id as type, sum(c2s_byte_num + s2c_byte_num) as bytes " + + "from %s " + + "where %s %s and protocol_id like CONCAT('%s','/%s') and LENGTH(protocol_id) = LENGTH(REPLACE(protocol_id,'/','')) + 1 + %s " + + "group by TIME_FORMAT(MILLIS_TO_TIMESTAMP( 1000 * TIME_FLOOR_WITH_FILL(TIMESTAMP_TO_MILLIS(__time)/1000, '%s', 'zero')), 'yyyy-MM-dd HH:mm:ss'), protocol_id " + + "order by stat_time asc) ", + queryParam.getParameters().getGranularity(), queryParam.getDataSource(), whereOfTime, whereOfExactly, protocolStr, queryParam.getParameters().getGranularity(), + queryParam.getParameters().getGranularity(), queryParam.getDataSource(), whereOfTime, whereOfExactly, protocolStr, "%", + protocolStr.length() - protocolStr.replaceAll("/", "").length(), queryParam.getParameters().getGranularity()); + + } + + private String generateOverviewSummarySql(DSLObject.QueryBean queryParam) { + String whereOfTime = getWhereOfTime(queryParam.getParameters()); + String whereOfExactly = getWhereOfExactly(queryParam.getParameters()); + + return String.format("SELECT APPROX_COUNT_DISTINCT_DS_HLL(ip_object) AS uniq_client_ip, " + + "SUM(one_sided_connections) AS one_sided_connections, " + + "SUM(uncategorized_bytes) AS total_uncategorized_bytes, " + + "SUM(fragmentation_packets) AS fragmentation_packets, " + + "SUM(sequence_gap_loss) AS sequence_gap_loss_bytes, " + + "SUM(s2c_byte_num+c2s_byte_num) AS summaryTotalBytes, " + + "SUM(s2c_pkt_num+c2s_pkt_num) AS summaryTotalPackets, " + + "SUM(sessions) AS summarySessions " + + "FROM %s " + + "WHERE %s" + + " %s LIMIT 1 ", + queryParam.getDataSource(), whereOfTime, StringUtil.isBlank(whereOfExactly) ? "" : "AND " + whereOfExactly); + } + + private String getWhereOfTime(DSLObject.QueryBean.Parameters parameters) { + if (CollectionUtils.isEmpty(parameters.getIntervals())) { + return StringUtil.EMPTY; + } + StringBuffer whereOfTime = new StringBuffer(); + String[] intervals = getIntervals(parameters.getIntervals()); + whereOfTime.append("__time >= TIMESTAMP '").append(intervals[0]).append("' AND __time < TIMESTAMP '").append(intervals[1]).append("'"); + return whereOfTime.toString(); + } + + private String[] getIntervals(List<String> intervals) { + return intervals.get(0).split("/"); + } + + private String getWhereOfExactly(DSLObject.QueryBean.Parameters parameters) { + if (CollectionUtils.isEmpty(parameters.getMatch())) { + return StringUtil.EMPTY; + } + StringBuffer whereOfExactly = new StringBuffer(); + List<DSLObject.QueryBean.Parameters.MatchBean> match = parameters.getMatch(); + for (DSLObject.QueryBean.Parameters.MatchBean item : match) { + if (MatchEnum.EXACTLY.getType().equals(item.getType()) && !CollectionUtils.isEmpty(item.getFieldValues())) { + whereOfExactly.append(item.getFieldKey()).append(" IN ('").append(String.join("','", item.getFieldValues())).append("')"); + } + } + return whereOfExactly.toString(); + } + + private String generateProtocolTreeSql(DSLObject.QueryBean queryParam) { + String whereOfTime = getWhereOfTime(queryParam.getParameters()); + String whereOfExactly = getWhereOfExactly(queryParam.getParameters()); + whereOfExactly = StringUtil.isBlank(whereOfExactly) ? "" : "AND " + whereOfExactly; + return String.format("SELECT protocol_id, SUM(sessions) as sessions,SUM(c2s_byte_num) as c2s_byte_num, SUM(c2s_pkt_num) as c2s_pkt_num, SUM(s2c_byte_num) as s2c_byte_num, SUM(s2c_pkt_num) as s2c_pkt_num " + + "FROM %s " + + "WHERE %s %s" + + "GROUP BY protocol_id ", + queryParam.getDataSource(), whereOfTime, whereOfExactly); + } + + private List<ProtocolTree> getProtocolTrees(List<Map> data) { + Iterable<ProtocolTree> resultConcat = Iterables.concat(new ArrayList<>()); + for (Map datum : data) { + String s = JsonMapper.toJsonString(datum); + List<ProtocolTree> nodes = parserRawData(s); + resultConcat = Iterables.concat(resultConcat, nodes); + } + List<ProtocolTree> listNodes = Lists.newArrayList(resultConcat); + ProtocolTree root = new ProtocolTree("Protocols", "Protocols", null); + List<ProtocolTree> roots = listNodes.stream().filter(o -> StringUtil.isBlank(o.getParentId())).collect(Collectors.toList()); + roots.forEach(item -> { + root.setSentBytes(root.getSentBytes() + item.getSentBytes()); + root.setReceivedBytes(root.getReceivedBytes() + item.getReceivedBytes()); + }); + + listNodes.forEach(item -> { + item.setId(PROTOCOL_NODE + item.getId()); + }); + + listNodes.add(root); + return listNodes; + } + + private BaseResult executeQuery(String sql) { + Map<String, Object> map = new HashMap<>(); + map.put("query", sql); + return restTemplate.getForObject("http://galaxy-query-engine/?query={query}", BaseResult.class, map); + } + + + private List<ProtocolTree> parserRawData(String jsonString) { + List<ProtocolTree> lists = Lists.newArrayList(); + Map<String, Object> results = (Map<String, Object>) JsonMapper.fromJsonString(jsonString, Map.class); + long sessions = Long.parseLong(results.get("sessions").toString()); + long sentBytes = Long.parseLong(results.get("c2s_byte_num").toString()); + long receivedBytes = Long.parseLong(results.get("s2c_byte_num").toString()); + long sentPackets = Long.parseLong(results.get("c2s_pkt_num").toString()); + long receivedPackets = Long.parseLong(results.get("s2c_pkt_num").toString()); + String protocolId = results.get("protocol_id").toString(); + List<String> protocolList = Lists.newArrayList(protocolId.split("/")); + ProtocolTree protocolTree = new ProtocolTree(protocolId, protocolList.size() <= 0 ? null : protocolList.get(protocolList.size() - 1), null); + protocolTree.setSentBytes(sentBytes); + protocolTree.setReceivedBytes(receivedBytes); + protocolTree.getMetrics().put("sentPackets", sentPackets); + protocolTree.getMetrics().put("receivedPackets", receivedPackets); + protocolTree.getMetrics().put("sessions", sessions); + lists.add(protocolTree); + return lists; + } + + private List<ProtocolTree> getListProtocol(List<ProtocolTree> listNodes) { + Map<String, List<ProtocolTree>> resultMap = listNodes.stream().collect(Collectors.groupingBy(ProtocolTree::getId)); + Map<String, Object> mergeResultMap = + Maps.transformValues(resultMap, new com.google.common.base.Function<List<ProtocolTree>, Object>() { + @Nullable + @Override + public Object apply(@Nullable List<ProtocolTree> input) { + if (StringUtil.isEmpty(input)) { + return null; + } + if (input.size() == 1) { + return input.get(0); + } + ProtocolTree firstProtocolTree = input.get(0); + ProtocolTree mergeProtocolTree = new ProtocolTree(firstProtocolTree.getId(), firstProtocolTree.getName(), + firstProtocolTree.getParentId(), 0, 0); + mergeProtocolTree.setMetrics(Maps.newLinkedHashMap()); + + for (ProtocolTree protocolTree : input) { + mergeProtocolTree.setSentBytes(mergeProtocolTree.getSentBytes() + protocolTree.getSentBytes()); + mergeProtocolTree.setReceivedBytes(mergeProtocolTree.getReceivedBytes() + protocolTree.getReceivedBytes()); + for (String key : protocolTree.getMetrics().keySet()) { + if (StringUtil.isEmpty(mergeProtocolTree.getMetrics().get(key))) { + mergeProtocolTree.getMetrics().put(key, protocolTree.getMetrics().get(key)); + } else { + long sumValue = Long.parseLong(mergeProtocolTree.getMetrics().get(key).toString()) + + Long.parseLong(protocolTree.getMetrics().get(key).toString()); + mergeProtocolTree.getMetrics().put(key, sumValue); + } + } + + } + + return mergeProtocolTree; + } + }); + List<ProtocolTree> mergeNodes = new ArrayList(mergeResultMap.values()); + return TreeUtils.mergeTree(mergeNodes, ProtocolTree::getId, ProtocolTree::getParentId, ProtocolTree::setChildrens); + } + + enum QueryType { + + PROTOCOL_TREE_SUMMARY("protocolTreeSummary"), PROTOCOL_DATA_RATE_SUMMARY("protocolDataRateSummary"), NETWORK_OVERVIEW_SUMMARY("networkOverviewSummary"); + private final String value; + + QueryType(String value) { + this.value = value; + } + + public String getValue() { + return value; + } + } +} diff --git a/galaxy-gateway/config/application.yml b/galaxy-gateway/config/application.yml index 4b12036..c787a07 100644 --- a/galaxy-gateway/config/application.yml +++ b/galaxy-gateway/config/application.yml @@ -39,6 +39,10 @@ spring: uri: lb://galaxy-business-api predicates: - Path=/open-api/** + - id: traffic + uri: lb://galaxy-business-api + predicates: + - Path=/traffic/** - id: knowledge uri: lb://galaxy-query-engine predicates: |
