summaryrefslogtreecommitdiff
path: root/src/main
diff options
context:
space:
mode:
authorqidaijie <[email protected]>2023-12-14 10:55:03 +0800
committerqidaijie <[email protected]>2023-12-14 10:55:03 +0800
commit9e2d7350ea3516698510439eead1d5e7c047d73a (patch)
tree4a443f05538f5256ba6a0abff72f4fefd3088ee0 /src/main
parent87a6951c23edaf96ff2fca1e0137ca3a2775d987 (diff)
修改Fields各字段类型
Diffstat (limited to 'src/main')
-rw-r--r--src/main/java/com/zdjizhi/common/config/MergeConfigs.java1
-rw-r--r--src/main/java/com/zdjizhi/common/pojo/Fields.java117
-rw-r--r--src/main/java/com/zdjizhi/utils/functions/filter/DataTypeFilter.java36
-rw-r--r--src/main/java/com/zdjizhi/utils/general/MetricUtil.java20
4 files changed, 71 insertions, 103 deletions
diff --git a/src/main/java/com/zdjizhi/common/config/MergeConfigs.java b/src/main/java/com/zdjizhi/common/config/MergeConfigs.java
index 738537c..8929f6c 100644
--- a/src/main/java/com/zdjizhi/common/config/MergeConfigs.java
+++ b/src/main/java/com/zdjizhi/common/config/MergeConfigs.java
@@ -69,4 +69,5 @@ public class MergeConfigs {
.stringType()
.defaultValue("application_protocol_stat")
.withDescription("The data identification.");
+
} \ No newline at end of file
diff --git a/src/main/java/com/zdjizhi/common/pojo/Fields.java b/src/main/java/com/zdjizhi/common/pojo/Fields.java
index baa5b25..40b3995 100644
--- a/src/main/java/com/zdjizhi/common/pojo/Fields.java
+++ b/src/main/java/com/zdjizhi/common/pojo/Fields.java
@@ -7,28 +7,28 @@ package com.zdjizhi.common.pojo;
* @date 2023/4/2311:47
*/
public class Fields {
- private Long sessions;
- private Long in_bytes;
- private Long out_bytes;
- private Long in_pkts;
- private Long out_pkts;
- private Long c2s_pkts;
- private Long s2c_pkts;
- private Long c2s_bytes;
- private Long s2c_bytes;
- private Long c2s_fragments;
- private Long s2c_fragments;
- private Long c2s_tcp_lost_bytes;
- private Long s2c_tcp_lost_bytes;
- private Long c2s_tcp_ooorder_pkts;
- private Long s2c_tcp_ooorder_pkts;
- private Long c2s_tcp_retransmitted_pkts;
- private Long s2c_tcp_retransmitted_pkts;
- private Long c2s_tcp_retransmitted_bytes;
- private Long s2c_tcp_retransmitted_bytes;
+ private long sessions;
+ private long in_bytes;
+ private long out_bytes;
+ private long in_pkts;
+ private long out_pkts;
+ private long c2s_pkts;
+ private long s2c_pkts;
+ private long c2s_bytes;
+ private long s2c_bytes;
+ private long c2s_fragments;
+ private long s2c_fragments;
+ private long c2s_tcp_lost_bytes;
+ private long s2c_tcp_lost_bytes;
+ private long c2s_tcp_ooorder_pkts;
+ private long s2c_tcp_ooorder_pkts;
+ private long c2s_tcp_retransmitted_pkts;
+ private long s2c_tcp_retransmitted_pkts;
+ private long c2s_tcp_retransmitted_bytes;
+ private long s2c_tcp_retransmitted_bytes;
private String client_ip_sketch;
- public Fields(Long sessions, Long in_bytes, Long out_bytes, Long in_pkts, Long out_pkts, Long c2s_pkts, Long s2c_pkts, Long c2s_bytes, Long s2c_bytes, Long c2s_fragments, Long s2c_fragments, Long c2s_tcp_lost_bytes, Long s2c_tcp_lost_bytes, Long c2s_tcp_ooorder_pkts, Long s2c_tcp_ooorder_pkts, Long c2s_tcp_retransmitted_pkts, Long s2c_tcp_retransmitted_pkts, Long c2s_tcp_retransmitted_bytes, Long s2c_tcp_retransmitted_bytes, String client_ip_sketch) {
+ public Fields(long sessions, long in_bytes, long out_bytes, long in_pkts, long out_pkts, long c2s_pkts, long s2c_pkts, long c2s_bytes, long s2c_bytes, long c2s_fragments, long s2c_fragments, long c2s_tcp_lost_bytes, long s2c_tcp_lost_bytes, long c2s_tcp_ooorder_pkts, long s2c_tcp_ooorder_pkts, long c2s_tcp_retransmitted_pkts, long s2c_tcp_retransmitted_pkts, long c2s_tcp_retransmitted_bytes, long s2c_tcp_retransmitted_bytes, String client_ip_sketch) {
this.sessions = sessions;
this.in_bytes = in_bytes;
this.out_bytes = out_bytes;
@@ -51,155 +51,155 @@ public class Fields {
this.client_ip_sketch = client_ip_sketch;
}
- public Long getSessions() {
+ public long getSessions() {
return sessions;
}
- public void setSessions(Long sessions) {
+ public void setSessions(long sessions) {
this.sessions = sessions;
}
- public Long getIn_bytes() {
+ public long getIn_bytes() {
return in_bytes;
}
- public void setIn_bytes(Long in_bytes) {
+ public void setIn_bytes(long in_bytes) {
this.in_bytes = in_bytes;
}
- public Long getOut_bytes() {
+ public long getOut_bytes() {
return out_bytes;
}
- public void setOut_bytes(Long out_bytes) {
+ public void setOut_bytes(long out_bytes) {
this.out_bytes = out_bytes;
}
- public Long getIn_pkts() {
+ public long getIn_pkts() {
return in_pkts;
}
- public void setIn_pkts(Long in_pkts) {
+ public void setIn_pkts(long in_pkts) {
this.in_pkts = in_pkts;
}
- public Long getOut_pkts() {
+ public long getOut_pkts() {
return out_pkts;
}
- public void setOut_pkts(Long out_pkts) {
+ public void setOut_pkts(long out_pkts) {
this.out_pkts = out_pkts;
}
- public Long getC2s_pkts() {
+ public long getC2s_pkts() {
return c2s_pkts;
}
- public void setC2s_pkts(Long c2s_pkts) {
+ public void setC2s_pkts(long c2s_pkts) {
this.c2s_pkts = c2s_pkts;
}
- public Long getS2c_pkts() {
+ public long getS2c_pkts() {
return s2c_pkts;
}
- public void setS2c_pkts(Long s2c_pkts) {
+ public void setS2c_pkts(long s2c_pkts) {
this.s2c_pkts = s2c_pkts;
}
- public Long getC2s_bytes() {
+ public long getC2s_bytes() {
return c2s_bytes;
}
- public void setC2s_bytes(Long c2s_bytes) {
+ public void setC2s_bytes(long c2s_bytes) {
this.c2s_bytes = c2s_bytes;
}
- public Long getS2c_bytes() {
+ public long getS2c_bytes() {
return s2c_bytes;
}
- public void setS2c_bytes(Long s2c_bytes) {
+ public void setS2c_bytes(long s2c_bytes) {
this.s2c_bytes = s2c_bytes;
}
- public Long getC2s_fragments() {
+ public long getC2s_fragments() {
return c2s_fragments;
}
- public void setC2s_fragments(Long c2s_fragments) {
+ public void setC2s_fragments(long c2s_fragments) {
this.c2s_fragments = c2s_fragments;
}
- public Long getS2c_fragments() {
+ public long getS2c_fragments() {
return s2c_fragments;
}
- public void setS2c_fragments(Long s2c_fragments) {
+ public void setS2c_fragments(long s2c_fragments) {
this.s2c_fragments = s2c_fragments;
}
- public Long getC2s_tcp_lost_bytes() {
+ public long getC2s_tcp_lost_bytes() {
return c2s_tcp_lost_bytes;
}
- public void setC2s_tcp_lost_bytes(Long c2s_tcp_lost_bytes) {
+ public void setC2s_tcp_lost_bytes(long c2s_tcp_lost_bytes) {
this.c2s_tcp_lost_bytes = c2s_tcp_lost_bytes;
}
- public Long getS2c_tcp_lost_bytes() {
+ public long getS2c_tcp_lost_bytes() {
return s2c_tcp_lost_bytes;
}
- public void setS2c_tcp_lost_bytes(Long s2c_tcp_lost_bytes) {
+ public void setS2c_tcp_lost_bytes(long s2c_tcp_lost_bytes) {
this.s2c_tcp_lost_bytes = s2c_tcp_lost_bytes;
}
- public Long getC2s_tcp_ooorder_pkts() {
+ public long getC2s_tcp_ooorder_pkts() {
return c2s_tcp_ooorder_pkts;
}
- public void setC2s_tcp_ooorder_pkts(Long c2s_tcp_ooorder_pkts) {
+ public void setC2s_tcp_ooorder_pkts(long c2s_tcp_ooorder_pkts) {
this.c2s_tcp_ooorder_pkts = c2s_tcp_ooorder_pkts;
}
- public Long getS2c_tcp_ooorder_pkts() {
+ public long getS2c_tcp_ooorder_pkts() {
return s2c_tcp_ooorder_pkts;
}
- public void setS2c_tcp_ooorder_pkts(Long s2c_tcp_ooorder_pkts) {
+ public void setS2c_tcp_ooorder_pkts(long s2c_tcp_ooorder_pkts) {
this.s2c_tcp_ooorder_pkts = s2c_tcp_ooorder_pkts;
}
- public Long getC2s_tcp_retransmitted_pkts() {
+ public long getC2s_tcp_retransmitted_pkts() {
return c2s_tcp_retransmitted_pkts;
}
- public void setC2s_tcp_retransmitted_pkts(Long c2s_tcp_retransmitted_pkts) {
+ public void setC2s_tcp_retransmitted_pkts(long c2s_tcp_retransmitted_pkts) {
this.c2s_tcp_retransmitted_pkts = c2s_tcp_retransmitted_pkts;
}
- public Long getS2c_tcp_retransmitted_pkts() {
+ public long getS2c_tcp_retransmitted_pkts() {
return s2c_tcp_retransmitted_pkts;
}
- public void setS2c_tcp_retransmitted_pkts(Long s2c_tcp_retransmitted_pkts) {
+ public void setS2c_tcp_retransmitted_pkts(long s2c_tcp_retransmitted_pkts) {
this.s2c_tcp_retransmitted_pkts = s2c_tcp_retransmitted_pkts;
}
- public Long getC2s_tcp_retransmitted_bytes() {
+ public long getC2s_tcp_retransmitted_bytes() {
return c2s_tcp_retransmitted_bytes;
}
- public void setC2s_tcp_retransmitted_bytes(Long c2s_tcp_retransmitted_bytes) {
+ public void setC2s_tcp_retransmitted_bytes(long c2s_tcp_retransmitted_bytes) {
this.c2s_tcp_retransmitted_bytes = c2s_tcp_retransmitted_bytes;
}
- public Long getS2c_tcp_retransmitted_bytes() {
+ public long getS2c_tcp_retransmitted_bytes() {
return s2c_tcp_retransmitted_bytes;
}
- public void setS2c_tcp_retransmitted_bytes(Long s2c_tcp_retransmitted_bytes) {
+ public void setS2c_tcp_retransmitted_bytes(long s2c_tcp_retransmitted_bytes) {
this.s2c_tcp_retransmitted_bytes = s2c_tcp_retransmitted_bytes;
}
@@ -210,4 +210,5 @@ public class Fields {
public void setClient_ip_sketch(String client_ip_sketch) {
this.client_ip_sketch = client_ip_sketch;
}
+
}
diff --git a/src/main/java/com/zdjizhi/utils/functions/filter/DataTypeFilter.java b/src/main/java/com/zdjizhi/utils/functions/filter/DataTypeFilter.java
deleted file mode 100644
index cbf9572..0000000
--- a/src/main/java/com/zdjizhi/utils/functions/filter/DataTypeFilter.java
+++ /dev/null
@@ -1,36 +0,0 @@
-package com.zdjizhi.utils.functions.filter;
-
-import cn.hutool.log.Log;
-import cn.hutool.log.LogFactory;
-import com.alibaba.fastjson2.JSONPath;
-import com.alibaba.fastjson2.JSONReader;
-import com.zdjizhi.utils.StringUtil;
-import org.apache.flink.api.common.functions.FilterFunction;
-
-/**
- * @author qidaijie
- * @Package com.zdjizhi.utils.functions.filter
- * @Description:
- * @date 2023/4/1919:02
- */
-public class DataTypeFilter implements FilterFunction<String> {
- private static final Log logger = LogFactory.get();
-
- private static final String dataTypeExpr = "[?(@.name = 'traffic_application_protocol_stat')]";
-
- @Override
- public boolean filter(String message) throws Exception {
- boolean protocolData = false;
- try {
- if (StringUtil.isNotBlank(message)) {
- Object name = JSONPath.eval(message, dataTypeExpr);
- if (name != null) {
- protocolData = true;
- }
- }
- } catch (RuntimeException e) {
- logger.error("Parsing metric data is abnormal! The exception message is:" + e.getMessage());
- }
- return protocolData;
- }
-}
diff --git a/src/main/java/com/zdjizhi/utils/general/MetricUtil.java b/src/main/java/com/zdjizhi/utils/general/MetricUtil.java
index cc8b32c..eeabd72 100644
--- a/src/main/java/com/zdjizhi/utils/general/MetricUtil.java
+++ b/src/main/java/com/zdjizhi/utils/general/MetricUtil.java
@@ -55,6 +55,7 @@ public class MetricUtil {
Long c2sTcpretransmittedBytes = MetricUtil.longSum(cacheData.getC2s_tcp_retransmitted_bytes(), newData.getC2s_tcp_retransmitted_bytes());
Long s2cTcpretransmittedBytes = MetricUtil.longSum(cacheData.getS2c_tcp_retransmitted_bytes(), newData.getS2c_tcp_retransmitted_bytes());
+
// String clientIpSketch = MetricUtil.hllSketchUnion(cacheData.getClient_ip_sketch(), newData.getClient_ip_sketch());
// return new Fields(sessions,
// inBytes, outBytes, inPkts, outPkts,
@@ -80,21 +81,22 @@ public class MetricUtil {
/**
* Long类型的数据求和
*
- * @param value1 第一个值
- * @param value2 第二个值
- * @return value1 + value2
+ * @param cacheData 缓存中的值
+ * @param newData 新来数据的值
+ * @return cacheData + newData
*/
- private static Long longSum(Long value1, Long value2) {
+ private static Long longSum(Long cacheData, Long newData) {
+
Long result;
try {
- if (value1 >= 0 && value2 >= 0) {
- result = value1 + value2;
+ if (cacheData >= 0 && newData >= 0) {
+ result = cacheData + newData;
} else {
- result = value1;
+ result = cacheData;
}
} catch (RuntimeException e) {
- logger.error("Abnormal sending of traffic indicator statistics! The message is:" + e.getMessage());
- result = value1;
+ logger.error("Abnormal sending of traffic indicator statistics! The message is:{}" , e);
+ result = cacheData;
}
return result;