summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorqidaijie <[email protected]>2022-09-14 16:58:06 +0800
committerqidaijie <[email protected]>2022-09-14 16:58:06 +0800
commit9d2f2cfd8965441277f9255bc26f6a8ad424ed99 (patch)
tree2ee50652a7fef59b9190a38cb37fb0ddeb44f072 /src
parent3df5d8c51ed11919d37c4e71c48b344344104dc4 (diff)
1:重构VOIP任务结构,使用多window的方式进行融合。
2:增加VSYS融合维度(TSG-11721)。
Diffstat (limited to 'src')
-rw-r--r--src/main/java/com/zdjizhi/common/JsonProConfig.java106
-rw-r--r--src/main/java/com/zdjizhi/common/VoipRelationConfig.java11
-rw-r--r--src/main/java/com/zdjizhi/operator/filter/NullFilterFunction.java17
-rw-r--r--src/main/java/com/zdjizhi/operator/group/MergeUniFlowKeyByFunction.java22
-rw-r--r--src/main/java/com/zdjizhi/operator/group/VoipCalibrationKeyByFunction.java27
-rw-r--r--src/main/java/com/zdjizhi/operator/parse/ParseMapFunction.java82
-rw-r--r--src/main/java/com/zdjizhi/operator/window/MergeUniFlowWindowFunction.java (renamed from src/main/java/com/zdjizhi/utils/functions/OneSidedWindowFunction.java)237
-rw-r--r--src/main/java/com/zdjizhi/operator/window/VoipCalibrationWindowFunction.java (renamed from src/main/java/com/zdjizhi/utils/functions/SipCalibrationWindowFunction.java)183
-rw-r--r--src/main/java/com/zdjizhi/tools/exception/VoipRelationException.java (renamed from src/main/java/com/zdjizhi/utils/exception/VoipRelationException.java)2
-rw-r--r--src/main/java/com/zdjizhi/tools/json/JsonParseUtil.java (renamed from src/main/java/com/zdjizhi/utils/json/JsonParseUtil.java)113
-rw-r--r--src/main/java/com/zdjizhi/tools/json/JsonTypeUtil.java (renamed from src/main/java/com/zdjizhi/utils/json/JsonTypeUtil.java)6
-rw-r--r--src/main/java/com/zdjizhi/tools/json/TypeUtils.java (renamed from src/main/java/com/zdjizhi/utils/json/TypeUtils.java)22
-rw-r--r--src/main/java/com/zdjizhi/tools/kafka/CertUtils.java (renamed from src/main/java/com/zdjizhi/utils/kafka/CertUtils.java)2
-rw-r--r--src/main/java/com/zdjizhi/tools/kafka/KafkaConsumer.java (renamed from src/main/java/com/zdjizhi/utils/kafka/KafkaConsumer.java)3
-rw-r--r--src/main/java/com/zdjizhi/tools/kafka/KafkaProducer.java (renamed from src/main/java/com/zdjizhi/utils/kafka/KafkaProducer.java)5
-rw-r--r--src/main/java/com/zdjizhi/tools/system/VoipRelationConfigurations.java (renamed from src/main/java/com/zdjizhi/utils/system/VoipRelationConfigurations.java)4
-rw-r--r--src/main/java/com/zdjizhi/tools/utils/RelationUtils.java134
-rw-r--r--src/main/java/com/zdjizhi/topology/VoIpRelationTopology.java39
-rw-r--r--src/main/java/com/zdjizhi/utils/functions/relationUtils.java81
-rw-r--r--src/main/java/com/zdjizhi/utils/http/HttpClientUtil.java77
-rw-r--r--src/test/testdata/differentVSYS3
-rw-r--r--src/test/testdata/differentVSYSUniFlowRTP3
-rw-r--r--src/test/testdata/oneRTPUniFlowSIP3
-rw-r--r--src/test/testdata/oneRTPtwoSIP3
-rw-r--r--src/test/testdata/oneSIPUniFlowRTP3
-rw-r--r--src/test/testdata/oneSIPtwoRTP3
-rw-r--r--src/test/testdata/twoRTPandSIP4
27 files changed, 571 insertions, 624 deletions
diff --git a/src/main/java/com/zdjizhi/common/JsonProConfig.java b/src/main/java/com/zdjizhi/common/JsonProConfig.java
index 6e9d437..11de17c 100644
--- a/src/main/java/com/zdjizhi/common/JsonProConfig.java
+++ b/src/main/java/com/zdjizhi/common/JsonProConfig.java
@@ -1,8 +1,6 @@
package com.zdjizhi.common;
-import com.zdjizhi.utils.system.VoipRelationConfigurations;
-
/**
* @author Administrator
*/
@@ -13,123 +11,103 @@ public class JsonProConfig {
public static final int DOUBLE = 3;
/**
- *
+ * SIP日志标识
*/
public static final String SIP_MARK = "SIP";
/**
- *
+ * RTP日志标识
*/
public static final String RTP_MARK = "RTP";
+
/**
- *
+ * 所属vsys,缺省为1
+ */
+ public static final String VSYS_ID = "common_vsys_id";
+
+ /**
+ * 日志类型
*/
public static final String SCHEMA_TYPE = "common_schema_type";
/**
- *
+ * 会话结束时间
*/
public static final String END_TIME = "common_end_time";
/**
- *
+ * 流类型
+ * 1:c2s,2:s2c;3;double
*/
public static final String STREAM_DIR = "common_stream_dir";
+
/**
- *
- */
- public static final String SESSIONS = "common_sessions";
- /**
- *
- */
- public static final String C2S_PKT_NUM = "common_c2s_pkt_num";
- /**
- *
- */
- public static final String S2C_PKT_NUM = "common_s2c_pkt_num";
- /**
- *
- */
- public static final String C2S_BYTE_NUM = "common_c2s_byte_num";
- /**
- *
- */
- public static final String S2C_BYTE_NUM = "common_s2c_byte_num";
- /**
- *
- */
- public static final String C2S_IPFRAG_NUM = "common_c2s_ipfrag_num";
- /**
- *
- */
- public static final String S2C_IPFRAG_NUM = "common_s2c_ipfrag_num";
- /**
- *
- */
- public static final String C2S_TCP_LOSTLEN = "common_c2s_tcp_lostlen";
- /**
- *
- */
- public static final String S2C_TCP_LOSTLEN = "common_s2c_tcp_lostlen";
- /**
- *
- */
- public static final String C2S_TCP_UNORDER_NUM = "common_c2s_tcp_unorder_num";
- /**
- *
- */
- public static final String S2C_TCP_UNORDER_NUM = "common_s2c_tcp_unorder_num";
- /**
- *
+ * 客户端ip地址
*/
public static final String CLIENT_IP = "common_client_ip";
/**
- *
+ * 客户端端口
*/
public static final String CLIENT_PORT = "common_client_port";
/**
- *
+ * 服务端ip地址
*/
public static final String SERVER_IP = "common_server_ip";
/**
- *
+ * 服务端端口
*/
public static final String SERVER_PORT = "common_server_port";
/**
- *
+ * 会话ID
*/
public static final String SIP_CALL_ID = "sip_call_id";
/**
- *
+ * 协商的主叫语音传输IP
*/
public static final String SIP_ORIGINATOR_IP = "sip_originator_sdp_connect_ip";
/**
- *
+ * 协商的主叫语音传输端口
*/
public static final String SIP_ORIGINATOR_PORT = "sip_originator_sdp_media_port";
/**
- *
+ * 协商的被叫语音传输IP
*/
public static final String SIP_RESPONDER_IP = "sip_responder_sdp_connect_ip";
/**
- *
+ * 协商的被叫语音传输端口
*/
public static final String SIP_RESPONDER_PORT = "sip_responder_sdp_media_port";
/**
- *
+ * RTP原始包文件地址
*/
public static final String RTP_PCAP_PATH = "rtp_pcap_path";
/**
- *
+ * 主叫方向
*/
public static final String RTP_ORIGINATOR_DIR = "rtp_originator_dir";
/**
- *
+ * c2s编码方式序号(PT)
*/
public static final String RTP_PAYLOAD_TYPE_C2S = "rtp_payload_type_c2s";
/**
- *
+ * s2c编码方式序号(PT)
*/
public static final String RTP_PAYLOAD_TYPE_S2C = "rtp_payload_type_s2c";
+ /**
+ * 各类Transmission指标key
+ */
+ public static final String SESSIONS = "common_sessions";
+ public static final String C2S_PKT_NUM = "common_c2s_pkt_num";
+ public static final String S2C_PKT_NUM = "common_s2c_pkt_num";
+ public static final String C2S_BYTE_NUM = "common_c2s_byte_num";
+ public static final String S2C_BYTE_NUM = "common_s2c_byte_num";
+ public static final String C2S_IPFRAG_NUM = "common_c2s_ipfrag_num";
+ public static final String S2C_IPFRAG_NUM = "common_s2c_ipfrag_num";
+ public static final String C2S_TCP_LOSTLEN = "common_c2s_tcp_lostlen";
+ public static final String S2C_TCP_LOSTLEN = "common_s2c_tcp_lostlen";
+ public static final String C2S_TCP_UNORDER_NUM = "common_c2s_tcp_unorder_num";
+ public static final String S2C_TCP_UNORDER_NUM = "common_s2c_tcp_unorder_num";
+
+
} \ No newline at end of file
diff --git a/src/main/java/com/zdjizhi/common/VoipRelationConfig.java b/src/main/java/com/zdjizhi/common/VoipRelationConfig.java
index 61c50a1..758af0f 100644
--- a/src/main/java/com/zdjizhi/common/VoipRelationConfig.java
+++ b/src/main/java/com/zdjizhi/common/VoipRelationConfig.java
@@ -1,7 +1,7 @@
package com.zdjizhi.common;
-import com.zdjizhi.utils.system.VoipRelationConfigurations;
+import com.zdjizhi.tools.system.VoipRelationConfigurations;
import org.jasypt.encryption.pbe.StandardPBEStringEncryptor;
/**
@@ -39,7 +39,8 @@ public class VoipRelationConfig {
*/
public static final Integer VOIP_CALIBRATION_WINDOW_TIME = VoipRelationConfigurations.getIntProperty(0, "voip.calibration.window.time");
public static final Integer ONE_SIDED_WINDOW_TIME = VoipRelationConfigurations.getIntProperty(0, "one.sided.window.time");
- public static final Integer WINDOW_PARALLELISM = VoipRelationConfigurations.getIntProperty(0, "window.parallelism");
+ public static final Integer MERGE_UNIFLOW_WINDOW_PARALLELISM = VoipRelationConfigurations.getIntProperty(0, "merge.uniflow.window.parallelism");
+ public static final Integer CALIBRATION_WINDOW_PARALLELISM = VoipRelationConfigurations.getIntProperty(0, "calibration.window.parallelism");
/**
* connection kafka
@@ -72,15 +73,11 @@ public class VoipRelationConfig {
public static final Integer MAX_REQUEST_SIZE = VoipRelationConfigurations.getIntProperty(1, "max.request.size");
public static final String TOOLS_LIBRARY = VoipRelationConfigurations.getStringProperty(0, "tools.library");
- /**
- * http
- */
- public static final String SCHEMA_HTTP = VoipRelationConfigurations.getStringProperty(0, "schema.http");
/**
* voip
*/
public static final Integer SEC_COMBINE_SR_CACHE_SECS = VoipRelationConfigurations.getIntProperty(0, "sec.combine.sr.cache.secs");
- public static final Integer CHECK_INNER_NETWORK = VoipRelationConfigurations.getIntProperty(1, "check.inner.network");
+ public static final Integer CHECK_INNER_NETWORK = VoipRelationConfigurations.getIntProperty(0, "check.inner.network");
} \ No newline at end of file
diff --git a/src/main/java/com/zdjizhi/operator/filter/NullFilterFunction.java b/src/main/java/com/zdjizhi/operator/filter/NullFilterFunction.java
new file mode 100644
index 0000000..8be77f8
--- /dev/null
+++ b/src/main/java/com/zdjizhi/operator/filter/NullFilterFunction.java
@@ -0,0 +1,17 @@
+package com.zdjizhi.operator.filter;
+
+import com.zdjizhi.utils.StringUtil;
+import org.apache.flink.api.common.functions.FilterFunction;
+
+/**
+ * @author qidaijie
+ * @Package com.zdjizhi.operator.filter
+ * @Description:
+ * @date 2022/9/214:59
+ */
+public class NullFilterFunction implements FilterFunction<String> {
+ @Override
+ public boolean filter(String message) throws Exception {
+ return StringUtil.isNotBlank(message);
+ }
+}
diff --git a/src/main/java/com/zdjizhi/operator/group/MergeUniFlowKeyByFunction.java b/src/main/java/com/zdjizhi/operator/group/MergeUniFlowKeyByFunction.java
new file mode 100644
index 0000000..351bee7
--- /dev/null
+++ b/src/main/java/com/zdjizhi/operator/group/MergeUniFlowKeyByFunction.java
@@ -0,0 +1,22 @@
+package com.zdjizhi.operator.group;
+
+import cn.hutool.log.Log;
+import cn.hutool.log.LogFactory;
+import org.apache.flink.api.java.functions.KeySelector;
+import org.apache.flink.api.java.tuple.Tuple3;
+
+import java.util.Map;
+
+/**
+ * @author qidaijie
+ * @Package com.zdjizhi.operator.group
+ * @Description:
+ * @date 2022/8/2911:08
+ */
+public class MergeUniFlowKeyByFunction implements KeySelector<Tuple3<String, Map<String, Object>, Integer>, Integer> {
+ @Override
+ public Integer getKey(Tuple3<String, Map<String, Object>, Integer> value) throws Exception {
+ //vsys id
+ return value.f2;
+ }
+}
diff --git a/src/main/java/com/zdjizhi/operator/group/VoipCalibrationKeyByFunction.java b/src/main/java/com/zdjizhi/operator/group/VoipCalibrationKeyByFunction.java
new file mode 100644
index 0000000..a184566
--- /dev/null
+++ b/src/main/java/com/zdjizhi/operator/group/VoipCalibrationKeyByFunction.java
@@ -0,0 +1,27 @@
+package com.zdjizhi.operator.group;
+
+import cn.hutool.log.Log;
+import cn.hutool.log.LogFactory;
+import com.jayway.jsonpath.DocumentContext;
+import com.jayway.jsonpath.JsonPath;
+import org.apache.flink.api.java.functions.KeySelector;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.tuple.Tuple4;
+
+import java.util.Map;
+
+/**
+ * @author qidaijie
+ * @Package com.zdjizhi.operator.group
+ * @Description:
+ * @date 2022/8/2911:08
+ */
+public class VoipCalibrationKeyByFunction implements KeySelector<Tuple4<String, String, Map<String, Object>, Integer>, Tuple2<String, Integer>> {
+ @Override
+ public Tuple2<String, Integer> getKey(Tuple4<String, String, Map<String, Object>, Integer> value) throws Exception {
+ String fourKey = value.f0;
+ Integer vsysId = value.f3;
+
+ return new Tuple2<>(fourKey, vsysId);
+ }
+}
diff --git a/src/main/java/com/zdjizhi/operator/parse/ParseMapFunction.java b/src/main/java/com/zdjizhi/operator/parse/ParseMapFunction.java
new file mode 100644
index 0000000..3d7ca4b
--- /dev/null
+++ b/src/main/java/com/zdjizhi/operator/parse/ParseMapFunction.java
@@ -0,0 +1,82 @@
+package com.zdjizhi.operator.parse;
+
+import cn.hutool.log.Log;
+import cn.hutool.log.LogFactory;
+import com.zdjizhi.common.JsonProConfig;
+import com.zdjizhi.tools.utils.RelationUtils;
+import com.zdjizhi.tools.json.JsonParseUtil;
+import com.zdjizhi.utils.JsonMapper;
+import com.zdjizhi.utils.StringUtil;
+import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.api.java.tuple.Tuple3;
+
+import java.util.Map;
+
+/**
+ * @author qidaijie
+ * @Package com.zdjizhi.operator.parse
+ * @Description:
+ * @date 2022/9/916:21
+ */
+public class ParseMapFunction implements MapFunction<String, Tuple3<String, Map<String, Object>, Integer>> {
+ private static final Log logger = LogFactory.get();
+
+ @Override
+ public Tuple3<String, Map<String, Object>, Integer> map(String input) throws Exception {
+ try {
+ if (StringUtil.isNotBlank(input)) {
+ Map<String, Object> jsonMap = JsonParseUtil.typeTransform((Map<String, Object>) JsonMapper.fromJsonString(input, Map.class));
+
+ String commonSchemaType = JsonParseUtil.getString(jsonMap, JsonProConfig.SCHEMA_TYPE);
+ Integer vsysId = getVsysId(jsonMap);
+ String sipCallId = JsonParseUtil.getString(jsonMap, JsonProConfig.SIP_CALL_ID);
+
+ //1:c2s,2:s2c;3;double
+ int commonStreamDir = JsonParseUtil.getInteger(jsonMap, JsonProConfig.STREAM_DIR);
+
+ if (JsonProConfig.SIP_MARK.equals(commonSchemaType)) {
+ if (StringUtil.isNotBlank(sipCallId)) {
+ if (RelationUtils.checkSipCompleteness(jsonMap)) {
+ if (commonStreamDir != JsonProConfig.DOUBLE) {
+ return new Tuple3<>("sip-single", jsonMap, vsysId);
+ } else {
+ return new Tuple3<>("sip-double", jsonMap, vsysId);
+ }
+ } else {
+ return new Tuple3<>("violation", jsonMap, vsysId);
+ }
+ } else {
+ return new Tuple3<>("violation", jsonMap, vsysId);
+ }
+ } else if (JsonProConfig.RTP_MARK.equals(commonSchemaType)) {
+ if (commonStreamDir == JsonProConfig.DOUBLE) {
+ return new Tuple3<>("rtp-double", jsonMap, vsysId);
+ } else {
+ return new Tuple3<>("rtp-single", jsonMap, vsysId);
+ }
+ }
+ }
+ } catch (RuntimeException e) {
+ logger.error("TransForm logs failed,The exception is :" + e.getMessage());
+ }
+ return null;
+ }
+
+ /**
+ * 获取VSYS ID若无该字段,则默认值为1
+ *
+ * @param jsonMap 原始日志
+ * @return vsysid
+ */
+ private static int getVsysId(Map<String, Object> jsonMap) {
+ Object value = jsonMap.getOrDefault(JsonProConfig.VSYS_ID, null);
+ if (value != null) {
+ return Integer.parseInt(value.toString());
+ } else {
+ return 1;
+ }
+
+ }
+
+
+}
diff --git a/src/main/java/com/zdjizhi/utils/functions/OneSidedWindowFunction.java b/src/main/java/com/zdjizhi/operator/window/MergeUniFlowWindowFunction.java
index 0e244c3..cf25763 100644
--- a/src/main/java/com/zdjizhi/utils/functions/OneSidedWindowFunction.java
+++ b/src/main/java/com/zdjizhi/operator/window/MergeUniFlowWindowFunction.java
@@ -1,4 +1,4 @@
-package com.zdjizhi.utils.functions;
+package com.zdjizhi.operator.window;
import cn.hutool.log.Log;
import cn.hutool.log.LogFactory;
@@ -7,167 +7,162 @@ import com.zdjizhi.common.VoipRelationConfig;
import com.zdjizhi.utils.IPUtil;
import com.zdjizhi.utils.JsonMapper;
import com.zdjizhi.utils.StringUtil;
-import com.zdjizhi.utils.json.JsonParseUtil;
+import com.zdjizhi.tools.utils.RelationUtils;
+import com.zdjizhi.tools.json.JsonParseUtil;
import org.apache.flink.api.java.tuple.Tuple3;
-import org.apache.flink.streaming.api.functions.windowing.ProcessAllWindowFunction;
+import org.apache.flink.api.java.tuple.Tuple4;
+import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;
+import scala.Int;
import java.util.HashMap;
import java.util.Map;
/**
* @author qidaijie
- * @Package com.zdjizhi.utils.functions
+ * @Package com.zdjizhi.operator
* @Description:
- * @date 2021/8/1818:04
+ * @date 2022/8/2911:44
*/
-public class OneSidedWindowFunction extends ProcessAllWindowFunction<String, Tuple3<String, String, String>, TimeWindow> {
+public class MergeUniFlowWindowFunction extends ProcessWindowFunction<Tuple3<String, Map<String, Object>, Integer>, Tuple4<String, String, Map<String, Object>, Integer>, Integer, TimeWindow> {
private static final Log logger = LogFactory.get();
/**
* key-sip_call_id;value为sip的具体数据---存放的是SIP未关联的数据(单向流)
*/
- private static HashMap<String, String> sipOriHmList = new HashMap<>(32);
+ private static HashMap<String, Map<String, Object>> sipOriHmList = new HashMap<>(32);
/**
* key-rtp拼接的四元组;value为rtp的具体数据---存放的是RTP未关联的数据(单向流)
*/
- private static HashMap<String, String> rtpOriHmList = new HashMap<>(32);
+ private static HashMap<String, Map<String, Object>> rtpOriHmList = new HashMap<>(32);
- @Override
- @SuppressWarnings("unchecked")
- public void process(Context context, Iterable<String> inputs, Collector<Tuple3<String, String, String>> out) {
- for (String input : inputs) {
- if (StringUtil.isNotBlank(input)) {
- try {
- Map<String, Object> object = JsonParseUtil.typeTransform((Map<String, Object>) JsonMapper.fromJsonString(input, Map.class));
- String commonSchemaType = JsonParseUtil.getString(object, JsonProConfig.SCHEMA_TYPE);
- String sipCallId = JsonParseUtil.getString(object, JsonProConfig.SIP_CALL_ID);
+ @Override
+ public void process(Integer key, Context context, Iterable<Tuple3<String, Map<String, Object>, Integer>> inputs, Collector<Tuple4<String, String, Map<String, Object>, Integer>> out) throws Exception {
+ for (Tuple3<String, Map<String, Object>, Integer> input : inputs) {
+ if (input != null) {
+ //已关联的sip,rtp;未关联的sip,rtp;内网的sip
+ String type = input.f0;
+ Map<String, Object> jsonMap = input.f1;
+ Integer vsysId = input.f2;
- //1:c2s,2:s2c;3;double
- int commonStreamDir = JsonParseUtil.getInteger(object, JsonProConfig.STREAM_DIR);
+ logger.error("" + type + "----" + jsonMap.toString());
+ switch (type) {
+ case "sip-double":
+ separateInnerIp(jsonMap, out, vsysId);
+ break;
+ case "rtp-double":
+ String rtpDouble4Key = getFourKey(JsonParseUtil.getString(jsonMap, JsonProConfig.CLIENT_IP),
+ JsonParseUtil.getInteger(jsonMap, JsonProConfig.CLIENT_PORT),
+ JsonParseUtil.getString(jsonMap, JsonProConfig.SERVER_IP),
+ JsonParseUtil.getInteger(jsonMap, JsonProConfig.SERVER_PORT));
- /*
- * 针对SIP日志进行处理
- */
- if (JsonProConfig.SIP_MARK.equals(commonSchemaType) && StringUtil.isNotBlank(sipCallId)) {
- if (relationUtils.checkSipCompleteness(object)) {
- if (commonStreamDir != JsonProConfig.DOUBLE) {
- putKeyAndMsg(input, sipCallId, sipOriHmList, "SIP", out);
- } else {
- separateInnerIp(object, out);
- }
+ out.collect(new Tuple4<>(rtpDouble4Key, type, jsonMap, vsysId));
+ break;
+ case "violation":
+ out.collect(new Tuple4<>("", type, jsonMap, vsysId));
+ break;
+ //单向流的SIP
+ case "sip-single":
+ String sipCallId = JsonParseUtil.getString(jsonMap, JsonProConfig.SIP_CALL_ID);
+ String sipSingle4Key = getFourKey(JsonParseUtil.getString(jsonMap, JsonProConfig.CLIENT_IP),
+ JsonParseUtil.getInteger(jsonMap, JsonProConfig.CLIENT_PORT),
+ JsonParseUtil.getString(jsonMap, JsonProConfig.SERVER_IP),
+ JsonParseUtil.getInteger(jsonMap, JsonProConfig.SERVER_PORT));
+ if (StringUtil.isNotBlank(sipCallId)) {
+ putKeyAndMsg(jsonMap, sipCallId + sipSingle4Key, sipOriHmList, "SIP", vsysId, out);
} else {
- out.collect(new Tuple3<>("", "violation", input));
+ out.collect(new Tuple4<>("", type, jsonMap, vsysId));
}
- }
-
- /*
- * 针对RTP日志进行处理
- */
- if (JsonProConfig.RTP_MARK.equals(commonSchemaType)) {
- String clientIP = JsonParseUtil.getString(object, JsonProConfig.CLIENT_IP);
- int clientPort = JsonParseUtil.getInteger(object, JsonProConfig.CLIENT_PORT);
- String ServerIP = JsonParseUtil.getString(object, JsonProConfig.SERVER_IP);
- int ServerPort = JsonParseUtil.getInteger(object, JsonProConfig.SERVER_PORT);
+ break;
+ //单向流的RTP
+ case "rtp-single":
+ String rtpSingle4Key = getFourKey(JsonParseUtil.getString(jsonMap, JsonProConfig.CLIENT_IP),
+ JsonParseUtil.getInteger(jsonMap, JsonProConfig.CLIENT_PORT),
+ JsonParseUtil.getString(jsonMap, JsonProConfig.SERVER_IP),
+ JsonParseUtil.getInteger(jsonMap, JsonProConfig.SERVER_PORT));
+ //对rtp单向流进行关联
+ putKeyAndMsg(jsonMap, rtpSingle4Key, rtpOriHmList, "RTP", vsysId, out);
+ break;
+ //内网的SIP
+ default:
+ logger.error("type is beyond expectation:" + type);
+ break;
- String rtpIpPort4Key = getFourKey(clientIP, clientPort, ServerIP, ServerPort);
-
- if (commonStreamDir != JsonProConfig.DOUBLE) {
- //对rtp单向流进行关联
- putKeyAndMsg(input, rtpIpPort4Key, rtpOriHmList, "RTP", out);
-
- } else {
- //RTP双向流,按四元组下发
- out.collect(new Tuple3<>(rtpIpPort4Key, "rtp-two", input));
- }
- }
- } catch (RuntimeException e) {
- logger.error("parsing JSON or Unidirectional data flow fusion has exception! error is :" + e);
}
}
}
- /*
- * 定时发送SIP未关联上数据
- */
+
if (sipOriHmList.size() > 0) {
- HashMap<String, String> tmpSipOriHmList = new HashMap<>(sipOriHmList);
+ HashMap<String, Map<String, Object>> tmpSipOriHmList = new HashMap<>(sipOriHmList);
sipOriHmList.clear();
for (String sipKey : tmpSipOriHmList.keySet()) {
- String sipSingleMsg = tmpSipOriHmList.get(sipKey);
+ Map<String, Object> sipSingleMsg = tmpSipOriHmList.get(sipKey);
//sipKey为sip_call_id,未关联成功的sip是不能使用的
- out.collect(new Tuple3<>(sipKey, "sip-single", sipSingleMsg));
+ out.collect(new Tuple4<>(sipKey, "sip-single", sipSingleMsg, 0));
}
}
- /*
- * 定时发送RTP未关联上数据
- */
+
if (rtpOriHmList.size() > 0) {
- HashMap<String, String> tmpRtpOriHmList = new HashMap<>(rtpOriHmList);
+ HashMap<String, Map<String, Object>> tmpRtpOriHmList = new HashMap<>(rtpOriHmList);
rtpOriHmList.clear();
for (String rtpKey : tmpRtpOriHmList.keySet()) {
- String rtpSingleMsg = tmpRtpOriHmList.get(rtpKey);
+ Map<String, Object> rtpSingleMsg = tmpRtpOriHmList.get(rtpKey);
//未关联成功的rtp还可以继续关联,因为有四元组
- out.collect(new Tuple3<>(rtpKey, "rtp-single", rtpSingleMsg));
+ out.collect(new Tuple4<>(rtpKey, "rtp-single", rtpSingleMsg, 0));
}
}
+
}
/**
* 存放key并关联拼接对应Key
*/
@SuppressWarnings("unchecked")
- private static void putKeyAndMsg(String message, String hmStrKey, HashMap<String, String> hashMapStr, String protocolType, Collector<Tuple3<String, String, String>> out) {
+ private static void putKeyAndMsg(Map<String, Object> secondSipOrRtpLog, String hmStrKey, HashMap<String, Map<String, Object>> hashMapStr, String protocolType, Integer vsysId, Collector<Tuple4<String, String, Map<String, Object>, Integer>> out) {
//和上次存入的数据关联
if (hashMapStr.containsKey(hmStrKey)) {
- HashMap<String, Object> jsonCommonMap = new HashMap<>(32);
- String[] strArr = new String[2];
- String firstMsg = hashMapStr.remove(hmStrKey);
- Map<String, Object> firstSipOrRtpLog = (Map<String, Object>) JsonMapper.fromJsonString(firstMsg, Map.class);
- Map<String, Object> secondSipOrRtpLog = (Map<String, Object>) JsonMapper.fromJsonString(message, Map.class);
+ HashMap<String, Object> jsonCommonMap = new HashMap<>(128);
+ Map<String, Object> firstSipOrRtpLog = hashMapStr.remove(hmStrKey);
//1:c2s,2:s2c;3;double,1表示firstMsg为请求侧(c2s),合并时以它为准
if (JsonParseUtil.getInteger(firstSipOrRtpLog, JsonProConfig.STREAM_DIR) == 1) {
- strArr[0] = message;
- strArr[1] = firstMsg;
+ jsonCommonMap.putAll(secondSipOrRtpLog);
+ jsonCommonMap.putAll(firstSipOrRtpLog);
} else {
- strArr[0] = firstMsg;
- strArr[1] = message;
+ jsonCommonMap.putAll(firstSipOrRtpLog);
+ jsonCommonMap.putAll(secondSipOrRtpLog);
}
- jsonCommonMap.putAll((Map<String, Object>) JsonMapper.fromJsonString(strArr[0], Map.class));
- jsonCommonMap.putAll((Map<String, Object>) JsonMapper.fromJsonString(strArr[1], Map.class));
- String sipTwoMsg = jsonCommonMap.toString();
+ accumulateMsg(firstSipOrRtpLog, secondSipOrRtpLog, jsonCommonMap);
+ jsonCommonMap.put(JsonProConfig.STREAM_DIR, JsonProConfig.DOUBLE);
- Map<String, Object> sipOrRtpCombin = (Map<String, Object>) JsonMapper.fromJsonString(sipTwoMsg, Map.class);
- accumulateMsg(firstSipOrRtpLog, secondSipOrRtpLog, sipOrRtpCombin);
- sipOrRtpCombin.put(JsonProConfig.STREAM_DIR, JsonProConfig.DOUBLE);
if (JsonProConfig.SIP_MARK.equals(protocolType)) {
//手动关联SIP后区分内外网IP再下发
- separateInnerIp(sipOrRtpCombin, out);
+ separateInnerIp(jsonCommonMap, out, vsysId);
} else if (JsonProConfig.RTP_MARK.equals(protocolType)) {
//手动关联RTP后按四元组下发
- sipOrRtpCombin.put(JsonProConfig.RTP_PCAP_PATH, setRtpPacpPath(firstSipOrRtpLog, secondSipOrRtpLog));
- out.collect(new Tuple3<>(hmStrKey, "rtp-two", JsonMapper.toJsonString(sipOrRtpCombin)));
+ jsonCommonMap.put(JsonProConfig.RTP_PCAP_PATH, setRtpPacpPath(firstSipOrRtpLog, secondSipOrRtpLog));
+ out.collect(new Tuple4<>(hmStrKey, "rtp-double", jsonCommonMap, vsysId));
}
} else {
- hashMapStr.put(hmStrKey, message);
+ hashMapStr.put(hmStrKey, secondSipOrRtpLog);
}
}
/**
* 区分SIP的内外网IP,此时已经关联完成包含四元组,但未区分内外网IP
*/
- private static void separateInnerIp(Map<String, Object> object, Collector<Tuple3<String, String, String>> out) {
+ private static void separateInnerIp(Map<String, Object> jsonMap, Collector<Tuple4<String, String, Map<String, Object>, Integer>> out, Integer vsysid) {
- String sipOriginatorIp = JsonParseUtil.getString(object, JsonProConfig.SIP_ORIGINATOR_IP);
- int sipOriginatorPort = JsonParseUtil.getInteger(object, JsonProConfig.SIP_ORIGINATOR_PORT);
- String sipResponderIp = JsonParseUtil.getString(object, JsonProConfig.SIP_RESPONDER_IP);
- int sipResponderPort = JsonParseUtil.getInteger(object, JsonProConfig.SIP_RESPONDER_PORT);
+ String sipOriginatorIp = JsonParseUtil.getString(jsonMap, JsonProConfig.SIP_ORIGINATOR_IP);
+ int sipOriginatorPort = JsonParseUtil.getInteger(jsonMap, JsonProConfig.SIP_ORIGINATOR_PORT);
+ String sipResponderIp = JsonParseUtil.getString(jsonMap, JsonProConfig.SIP_RESPONDER_IP);
+ int sipResponderPort = JsonParseUtil.getInteger(jsonMap, JsonProConfig.SIP_RESPONDER_PORT);
- if (relationUtils.isInnerIp(sipOriginatorIp) || relationUtils.isInnerIp(sipResponderIp)) {
+ if (RelationUtils.isInnerIp(sipOriginatorIp) || RelationUtils.isInnerIp(sipResponderIp)) {
/*
* 按from-ip_from-port_to-ip_to-port
*/
@@ -176,7 +171,7 @@ public class OneSidedWindowFunction extends ProcessAllWindowFunction<String, Tup
+ sipResponderIp + VoipRelationConfig.CORRELATION_STR
+ sipResponderPort;
//包含内网IP的SIP关联后数据
- out.collect(new Tuple3<>(sipInnerEmitKey, "sip-in", JsonMapper.toJsonString(object)));
+ out.collect(new Tuple4<>(sipInnerEmitKey, "sip-in", jsonMap, vsysid));
} else {
String sipIpPort4Key = getFourKey(sipOriginatorIp,
sipOriginatorPort,
@@ -184,7 +179,7 @@ public class OneSidedWindowFunction extends ProcessAllWindowFunction<String, Tup
sipResponderPort);
//按照四元组的Key发送到下一个bolt
- out.collect(new Tuple3<>(sipIpPort4Key, "sip-two", JsonMapper.toJsonString(object)));
+ out.collect(new Tuple4<>(sipIpPort4Key, "sip-double", jsonMap, vsysid));
}
}
@@ -199,7 +194,7 @@ public class OneSidedWindowFunction extends ProcessAllWindowFunction<String, Tup
*/
private static String getFourKey(String commonClientIp, int commonClientPort, String commonServerIp, int commonServerPort) {
String ipPort4Key = "";
- int comparePortResult = compareNum(commonClientPort, commonServerPort);
+ int comparePortResult = RelationUtils.compareNum(commonClientPort, commonServerPort);
/*
* 按端口比较
@@ -247,7 +242,7 @@ public class OneSidedWindowFunction extends ProcessAllWindowFunction<String, Tup
private static String compareQuadruple(String clientIp, String serverIp, int clientPort, int serverPort) {
long clientIpNum = IPUtil.getIpHostDesimal(clientIp);
long serverIpNum = IPUtil.getIpHostDesimal(serverIp);
- int compareIpResult = compareNum(clientIpNum, serverIpNum);
+ int compareIpResult = RelationUtils.compareNum(clientIpNum, serverIpNum);
switch (compareIpResult) {
//clientIpNum > serverIpNum
case 1:
@@ -284,57 +279,27 @@ public class OneSidedWindowFunction extends ProcessAllWindowFunction<String, Tup
*/
private static void accumulateMsg(Map<String, Object> firstSipOrRtpLog, Map<String, Object> secondSipOrRtpLog, Map<String, Object> sipOrRtpCombin) {
//common_sessions
- relationUtils.metricSumSetOtherLog(firstSipOrRtpLog, secondSipOrRtpLog, sipOrRtpCombin, JsonProConfig.SESSIONS);
+ RelationUtils.metricSumSetOtherLog(firstSipOrRtpLog, secondSipOrRtpLog, sipOrRtpCombin, JsonProConfig.SESSIONS);
//common_c2s_pkt_num
- relationUtils.metricSumSetOtherLog(firstSipOrRtpLog, secondSipOrRtpLog, sipOrRtpCombin, JsonProConfig.C2S_PKT_NUM);
+ RelationUtils.metricSumSetOtherLog(firstSipOrRtpLog, secondSipOrRtpLog, sipOrRtpCombin, JsonProConfig.C2S_PKT_NUM);
//common_s2c_pkt_num
- relationUtils.metricSumSetOtherLog(firstSipOrRtpLog, secondSipOrRtpLog, sipOrRtpCombin, JsonProConfig.S2C_PKT_NUM);
+ RelationUtils.metricSumSetOtherLog(firstSipOrRtpLog, secondSipOrRtpLog, sipOrRtpCombin, JsonProConfig.S2C_PKT_NUM);
//common_c2s_byte_num
- relationUtils.metricSumSetOtherLog(firstSipOrRtpLog, secondSipOrRtpLog, sipOrRtpCombin, JsonProConfig.C2S_BYTE_NUM);
+ RelationUtils.metricSumSetOtherLog(firstSipOrRtpLog, secondSipOrRtpLog, sipOrRtpCombin, JsonProConfig.C2S_BYTE_NUM);
//common_s2c_byte_num
- relationUtils.metricSumSetOtherLog(firstSipOrRtpLog, secondSipOrRtpLog, sipOrRtpCombin, JsonProConfig.S2C_BYTE_NUM);
+ RelationUtils.metricSumSetOtherLog(firstSipOrRtpLog, secondSipOrRtpLog, sipOrRtpCombin, JsonProConfig.S2C_BYTE_NUM);
//common_c2s_ipfrag_num
- relationUtils.metricSumSetOtherLog(firstSipOrRtpLog, secondSipOrRtpLog, sipOrRtpCombin, JsonProConfig.C2S_IPFRAG_NUM);
+ RelationUtils.metricSumSetOtherLog(firstSipOrRtpLog, secondSipOrRtpLog, sipOrRtpCombin, JsonProConfig.C2S_IPFRAG_NUM);
//common_s2c_ipfrag_num
- relationUtils.metricSumSetOtherLog(firstSipOrRtpLog, secondSipOrRtpLog, sipOrRtpCombin, JsonProConfig.S2C_IPFRAG_NUM);
+ RelationUtils.metricSumSetOtherLog(firstSipOrRtpLog, secondSipOrRtpLog, sipOrRtpCombin, JsonProConfig.S2C_IPFRAG_NUM);
//common_c2s_tcp_lostlen
- relationUtils.metricSumSetOtherLog(firstSipOrRtpLog, secondSipOrRtpLog, sipOrRtpCombin, JsonProConfig.C2S_TCP_LOSTLEN);
+ RelationUtils.metricSumSetOtherLog(firstSipOrRtpLog, secondSipOrRtpLog, sipOrRtpCombin, JsonProConfig.C2S_TCP_LOSTLEN);
//common_s2c_tcp_lostlen
- relationUtils.metricSumSetOtherLog(firstSipOrRtpLog, secondSipOrRtpLog, sipOrRtpCombin, JsonProConfig.S2C_TCP_LOSTLEN);
+ RelationUtils.metricSumSetOtherLog(firstSipOrRtpLog, secondSipOrRtpLog, sipOrRtpCombin, JsonProConfig.S2C_TCP_LOSTLEN);
//common_c2s_tcp_unorder_num
- relationUtils.metricSumSetOtherLog(firstSipOrRtpLog, secondSipOrRtpLog, sipOrRtpCombin, JsonProConfig.C2S_TCP_UNORDER_NUM);
+ RelationUtils.metricSumSetOtherLog(firstSipOrRtpLog, secondSipOrRtpLog, sipOrRtpCombin, JsonProConfig.C2S_TCP_UNORDER_NUM);
//common_s2c_tcp_unorder_num
- relationUtils.metricSumSetOtherLog(firstSipOrRtpLog, secondSipOrRtpLog, sipOrRtpCombin, JsonProConfig.S2C_TCP_UNORDER_NUM);
- }
-
- /**
- * int类型
- * 比较数字大小,左边>右边-返回:1,左边<右边-返回:-1,左边=右边-返回:0
- *
- * @param numOne 数值1
- * @param numTwo 数值2
- */
- private static int compareNum(int numOne, int numTwo) {
- if (numOne > 0 && numTwo > 0) {
- return Integer.compare(numOne, numTwo);
- } else {
- return -2;
- }
- }
-
- /**
- * long类型
- * 比较数字大小,左边>右边-返回:1,左边<右边-返回:-1,左边=右边-返回:0
- *
- * @param numOne 数值1
- * @param numTwo 数值2
- */
- private static int compareNum(long numOne, long numTwo) {
- if (numOne > 0 && numTwo > 0) {
- return Long.compare(numOne, numTwo);
- } else {
- return -2;
- }
+ RelationUtils.metricSumSetOtherLog(firstSipOrRtpLog, secondSipOrRtpLog, sipOrRtpCombin, JsonProConfig.S2C_TCP_UNORDER_NUM);
}
/**
diff --git a/src/main/java/com/zdjizhi/utils/functions/SipCalibrationWindowFunction.java b/src/main/java/com/zdjizhi/operator/window/VoipCalibrationWindowFunction.java
index 7180419..9929212 100644
--- a/src/main/java/com/zdjizhi/utils/functions/SipCalibrationWindowFunction.java
+++ b/src/main/java/com/zdjizhi/operator/window/VoipCalibrationWindowFunction.java
@@ -1,14 +1,16 @@
-package com.zdjizhi.utils.functions;
+package com.zdjizhi.operator.window;
import cn.hutool.log.Log;
import cn.hutool.log.LogFactory;
import com.zdjizhi.common.JsonProConfig;
import com.zdjizhi.common.VoipRelationConfig;
+import com.zdjizhi.tools.utils.RelationUtils;
+import com.zdjizhi.tools.json.JsonParseUtil;
import com.zdjizhi.utils.JsonMapper;
import com.zdjizhi.utils.StringUtil;
-import com.zdjizhi.utils.json.JsonParseUtil;
-import org.apache.flink.api.java.tuple.Tuple3;
-import org.apache.flink.streaming.api.functions.windowing.ProcessAllWindowFunction;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.tuple.Tuple4;
+import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;
@@ -16,11 +18,11 @@ import java.util.*;
/**
* @author qidaijie
- * @Package com.zdjizhi.utils.functions
+ * @Package com.zdjizhi.operator.window
* @Description:
- * @date 2021/7/2113:55
+ * @date 2022/9/617:46
*/
-public class SipCalibrationWindowFunction extends ProcessAllWindowFunction<Tuple3<String, String, String>, String, TimeWindow> {
+public class VoipCalibrationWindowFunction extends ProcessWindowFunction<Tuple4<String, String, Map<String, Object>, Integer>, String, Tuple2<String, Integer>, TimeWindow> {
private static final Log logger = LogFactory.get();
/**
@@ -30,7 +32,7 @@ public class SipCalibrationWindowFunction extends ProcessAllWindowFunction<Tuple
* 存放数据:rtp-single,rtp-two,sip-two
* 不存放的数据:sip-single与sip-in
*/
- private static HashMap<String, LinkedList<String>> combineSRHmList = new HashMap<>(16);
+ private static HashMap<String, LinkedList<Map<String, Object>>> combineSRHmList = new HashMap<>(32);
/**
* 二次关联用HashMap
@@ -39,31 +41,33 @@ public class SipCalibrationWindowFunction extends ProcessAllWindowFunction<Tuple
* 存放数据:rtp-single,rtp-two,sip-two
* 不存放的数据:sip-single与sip-in
*/
- private static HashMap<String, LinkedList<String>> secCombineSRHmList = new HashMap<>(16);
+ private static HashMap<String, LinkedList<Map<String, Object>>> secCombineSRHmList = new HashMap<>(32);
@Override
- public void process(Context context, Iterable<Tuple3<String, String, String>> input, Collector<String> output) throws Exception {
- for (Tuple3<String, String, String> tuple : input) {
+ public void process(Tuple2<String, Integer> key, Context context, Iterable<Tuple4<String, String, Map<String, Object>, Integer>> input, Collector<String> out) throws Exception {
+ for (Tuple4<String, String, Map<String, Object>, Integer> tuple : input) {
//拼接的四元组
String fourKey = tuple.f0;
//已关联的sip,rtp;未关联的sip,rtp;内网的sip
String type = tuple.f1;
- String msg = tuple.f2;
+ Map<String, Object> jsonMap = tuple.f2;
+ logger.error("" + fourKey + "----" + type);
switch (type) {
//单向流对准后的SIP
- case "sip-two":
+ case "sip-double":
//单向流对准后的RTP
- case "rtp-two":
- //对不上的RTP
- case "rtp-single":
- putKeyAndMsg(msg, fourKey, combineSRHmList);
+ case "rtp-double":
+ putKeyAndMsg(jsonMap, fourKey, combineSRHmList);
break;
//单向流的SIP
case "sip-single":
+ //单向流的RTP
+ case "rtp-single":
//内网的SIP
case "sip-in":
+ //违规的日志
case "violation":
- output.collect(msg);
+ out.collect(JsonMapper.toJsonString(jsonMap));
break;
default:
logger.error("type is beyond expectation:" + type);
@@ -72,82 +76,91 @@ public class SipCalibrationWindowFunction extends ProcessAllWindowFunction<Tuple
}
}
//初次关联
- tickCombineHmList(combineSRHmList, output);
+ tickCombineHmList(combineSRHmList, out);
//和缓存中的数据二次关联
- tickCombineHmList(secCombineSRHmList, output);
+ tickCombineHmList(secCombineSRHmList, out);
+
}
/**
+ * 存放key并添加对应List
+ */
+ private static void putKeyAndMsg(Map<String, Object> message, String fourStrKey, HashMap<String, LinkedList<Map<String, Object>>> combineSRHmList) {
+ if (combineSRHmList.containsKey(fourStrKey)) {
+ LinkedList<Map<String, Object>> tmpList = combineSRHmList.get(fourStrKey);
+ tmpList.add(message);
+ combineSRHmList.put(fourStrKey, tmpList);
+ } else {
+ LinkedList<Map<String, Object>> tmpList = new LinkedList<>();
+ tmpList.add(message);
+ combineSRHmList.put(fourStrKey, tmpList);
+ }
+ }
+
+ /**
* 定时关联,包括初次关联以及后续二次关联
*
* @param combineHmList
*/
@SuppressWarnings("unchecked")
- private void tickCombineHmList(HashMap<String, LinkedList<String>> combineHmList, Collector<String> output) {
+ private void tickCombineHmList(HashMap<String, LinkedList<Map<String, Object>>> combineHmList, Collector<String> output) {
if (combineHmList.size() > 0) {
long nowTime = System.currentTimeMillis() / 1000;
- HashMap<String, LinkedList<String>> tempCombineSRhmList = new HashMap<>(combineHmList);
+ HashMap<String, LinkedList<Map<String, Object>>> tempCombineSRhmList = new HashMap<>(combineHmList);
combineHmList.clear();
for (String fourStrKey : tempCombineSRhmList.keySet()) {
- LinkedList<String> tempList = tempCombineSRhmList.get(fourStrKey);
- //包含SIP和RTP
- int listSize = tempList.size();
+ LinkedList<Map<String, Object>> tempCombineSRList = tempCombineSRhmList.get(fourStrKey);
+ //包含SIP和RTP,集合大于1则可能是sip和rtp,多条sip或多条rtp
+ int listSize = tempCombineSRList.size();
if (listSize > 1) {
- List<String> sipBeanArr = new ArrayList<>();
- List<String> rtpBeanArr = new ArrayList<>();
+ List<Map<String, Object>> sipBeanArr = new ArrayList<>();
+ List<Map<String, Object>> rtpBeanArr = new ArrayList<>();
- for (String message : tempList) {
- Map<String, Object> tempSipOrRtpLog = (Map<String, Object>) JsonMapper.fromJsonString(message, Map.class);
+ for (Map<String, Object> tempSipOrRtpLog : tempCombineSRList) {
String schemaType = JsonParseUtil.getString(tempSipOrRtpLog, JsonProConfig.SCHEMA_TYPE);
if (JsonProConfig.SIP_MARK.equals(schemaType)) {
- sipBeanArr.add(message);
+ sipBeanArr.add(tempSipOrRtpLog);
} else if (JsonProConfig.RTP_MARK.equals(schemaType)) {
- rtpBeanArr.add(message);
+ rtpBeanArr.add(tempSipOrRtpLog);
}
}
int rtpSize = rtpBeanArr.size();
int sipSize = sipBeanArr.size();
-
+ //只允许一对多的情况,其余视为异常数据
if (rtpSize == 1 && sipSize >= 1) {
- for (String sipMessage : sipBeanArr) {
- Map<String, Object> rtpLog = (Map<String, Object>) JsonMapper.fromJsonString(rtpBeanArr.get(0), Map.class);
- Map<String, Object> voIpLog = (Map<String, Object>) JsonMapper.fromJsonString(sipMessage, Map.class);
+ for (Map<String, Object> voIpLog : sipBeanArr) {
+ Map<String, Object> rtpLog = rtpBeanArr.get(0);
accumulateVoipMsg(voIpLog, rtpLog);
- JsonParseUtil.setValue(voIpLog, JsonProConfig.SCHEMA_TYPE, "VoIP");
- JsonParseUtil.setValue(voIpLog, JsonProConfig.RTP_ORIGINATOR_DIR, judgeDirection(rtpLog, voIpLog));
//四元组,voip,关联后的数据
output.collect(mergeJson(voIpLog, rtpLog));
}
} else if (sipSize == 1 && rtpSize >= 1) {
- for (String rtpMessage : rtpBeanArr) {
- Map<String, Object> rtpLog = (Map<String, Object>) JsonMapper.fromJsonString(rtpMessage, Map.class);
- Map<String, Object> voIpLog = (Map<String, Object>) JsonMapper.fromJsonString(sipBeanArr.get(0), Map.class);
+ for (Map<String, Object> rtpLog : rtpBeanArr) {
+ HashMap<String, Object> voIpLog = new HashMap<>(128);
+ voIpLog.putAll(sipBeanArr.get(0));
accumulateVoipMsg(voIpLog, rtpLog);
- JsonParseUtil.setValue(voIpLog, JsonProConfig.SCHEMA_TYPE, "VoIP");
- JsonParseUtil.setValue(voIpLog, JsonProConfig.RTP_ORIGINATOR_DIR, judgeDirection(rtpLog, voIpLog));
//四元组,voip,关联后的数据
output.collect(mergeJson(voIpLog, rtpLog));
}
} else {
- logger.warn("RTP-listSize is [" + rtpBeanArr.size() + "];SIP-listSize is [" + sipBeanArr.size() + "] in this tempCombineSRHmList! Not logical");
- sendErrorLogToKafka(sipBeanArr, output);
- sendErrorLogToKafka(rtpBeanArr, output);
+ logger.error("RTP-listSize is [" + rtpBeanArr.size() + "];SIP-listSize is [" + sipBeanArr.size() + "] in this tempCombineSRHmList! Not logical");
+ sendViolationLogs(sipBeanArr, output);
+ sendViolationLogs(rtpBeanArr, output);
}
} else {
- String msg = tempList.get(0);
- Map<String, Object> voIpLog = (Map<String, Object>) JsonMapper.fromJsonString(msg, Map.class);
+ Map<String, Object> voIpLog = tempCombineSRList.get(0);
long commonEndTime = JsonParseUtil.getLong(voIpLog, JsonProConfig.END_TIME);
long intervalTime = nowTime - commonEndTime;
- logger.error("VoIP日志时间差值记录:" + intervalTime);
if (intervalTime <= VoipRelationConfig.SEC_COMBINE_SR_CACHE_SECS) {
- putKeyAndMsg(msg, fourStrKey, secCombineSRHmList);
+ logger.warn("当前日志时间未超过二次对准时间,进入队列等待再次对准,日志时间差:" + intervalTime);
+ putKeyAndMsg(voIpLog, fourStrKey, secCombineSRHmList);
} else {
- sendDirectlyOneElement(msg, voIpLog, output);
+ output.collect(JsonMapper.toJsonString(voIpLog));
}
}
}
@@ -162,27 +175,27 @@ public class SipCalibrationWindowFunction extends ProcessAllWindowFunction<Tuple
*/
private void accumulateVoipMsg(Map<String, Object> voIpLog, Map<String, Object> rtpLog) {
//common_sessions
- relationUtils.metricSum(voIpLog, rtpLog, JsonProConfig.SESSIONS);
+ RelationUtils.metricSum(voIpLog, rtpLog, JsonProConfig.SESSIONS);
//common_c2s_pkt_num
- relationUtils.metricSum(voIpLog, rtpLog, JsonProConfig.C2S_PKT_NUM);
+ RelationUtils.metricSum(voIpLog, rtpLog, JsonProConfig.C2S_PKT_NUM);
//common_s2c_pkt_num
- relationUtils.metricSum(voIpLog, rtpLog, JsonProConfig.S2C_PKT_NUM);
+ RelationUtils.metricSum(voIpLog, rtpLog, JsonProConfig.S2C_PKT_NUM);
//common_c2s_byte_num
- relationUtils.metricSum(voIpLog, rtpLog, JsonProConfig.C2S_BYTE_NUM);
+ RelationUtils.metricSum(voIpLog, rtpLog, JsonProConfig.C2S_BYTE_NUM);
//common_s2c_byte_num
- relationUtils.metricSum(voIpLog, rtpLog, JsonProConfig.S2C_BYTE_NUM);
+ RelationUtils.metricSum(voIpLog, rtpLog, JsonProConfig.S2C_BYTE_NUM);
//common_c2s_ipfrag_num
- relationUtils.metricSum(voIpLog, rtpLog, JsonProConfig.C2S_IPFRAG_NUM);
+ RelationUtils.metricSum(voIpLog, rtpLog, JsonProConfig.C2S_IPFRAG_NUM);
//common_s2c_ipfrag_num
- relationUtils.metricSum(voIpLog, rtpLog, JsonProConfig.S2C_IPFRAG_NUM);
+ RelationUtils.metricSum(voIpLog, rtpLog, JsonProConfig.S2C_IPFRAG_NUM);
//common_c2s_tcp_lostlen
- relationUtils.metricSum(voIpLog, rtpLog, JsonProConfig.C2S_TCP_LOSTLEN);
+ RelationUtils.metricSum(voIpLog, rtpLog, JsonProConfig.C2S_TCP_LOSTLEN);
//common_s2c_tcp_lostlen
- relationUtils.metricSum(voIpLog, rtpLog, JsonProConfig.S2C_TCP_LOSTLEN);
+ RelationUtils.metricSum(voIpLog, rtpLog, JsonProConfig.S2C_TCP_LOSTLEN);
//common_c2s_tcp_unorder_num
- relationUtils.metricSum(voIpLog, rtpLog, JsonProConfig.C2S_TCP_UNORDER_NUM);
+ RelationUtils.metricSum(voIpLog, rtpLog, JsonProConfig.C2S_TCP_UNORDER_NUM);
//common_s2c_tcp_unorder_num
- relationUtils.metricSum(voIpLog, rtpLog, JsonProConfig.S2C_TCP_UNORDER_NUM);
+ RelationUtils.metricSum(voIpLog, rtpLog, JsonProConfig.S2C_TCP_UNORDER_NUM);
}
/**
@@ -203,51 +216,13 @@ public class SipCalibrationWindowFunction extends ProcessAllWindowFunction<Tuple
}
}
-
- /**
- * 存放key并添加对应List
- */
- private static void putKeyAndMsg(String message, String fourStrKey, HashMap<String, LinkedList<String>> combineSRHmList) {
- if (combineSRHmList.containsKey(fourStrKey)) {
- LinkedList<String> tmpList = combineSRHmList.get(fourStrKey);
- tmpList.add(message);
- combineSRHmList.put(fourStrKey, tmpList);
- } else {
- LinkedList<String> tmpList = new LinkedList<>();
- tmpList.add(message);
- combineSRHmList.put(fourStrKey, tmpList);
- }
- }
-
- /**
- * 判断RTP主叫方向-测试
- *
- * @param rtpLog RTP原始日志
- * @param voIpLog 融合后VOIP日志
- * @return 方向 0:未知 1:c2s 2:s2c
- */
- private static int judgeDirection(Map<String, Object> rtpLog, Map<String, Object> voIpLog) {
-
- String ip = JsonParseUtil.getString(rtpLog, JsonProConfig.CLIENT_IP);
- String sipOriginatorIp = JsonParseUtil.getString(voIpLog, JsonProConfig.SIP_ORIGINATOR_IP);
- String sipResponderIp = JsonParseUtil.getString(voIpLog, JsonProConfig.SIP_RESPONDER_IP);
-
- if (StringUtil.isNotBlank(ip) && ip.equals(sipOriginatorIp)) {
- return 1;
- } else if (StringUtil.isNotBlank(ip) && ip.equals(sipResponderIp)) {
- return 2;
- }
-
- return 0;
- }
-
/**
* 发送不符合逻辑的日志到kafka
*/
- private static void sendErrorLogToKafka(List<String> logList, Collector<String> output) {
- if (logList.size() > 0) {
- for (String log : logList) {
- output.collect(log);
+ private static void sendViolationLogs(List<Map<String, Object>> violationLogs, Collector<String> output) {
+ if (violationLogs.size() > 0) {
+ for (Map<String, Object> log : violationLogs) {
+ output.collect(JsonMapper.toJsonString(log));
}
}
}
@@ -261,6 +236,8 @@ public class SipCalibrationWindowFunction extends ProcessAllWindowFunction<Tuple
int rtpPayloadTypeS2c = JsonParseUtil.getInteger(rtpLog, JsonProConfig.RTP_PAYLOAD_TYPE_S2C);
String rtpPcapPath = JsonParseUtil.getString(rtpLog, JsonProConfig.RTP_PCAP_PATH);
+ JsonParseUtil.setValue(voIpLog, JsonProConfig.SCHEMA_TYPE, "VoIP");
+ JsonParseUtil.setValue(voIpLog, JsonProConfig.RTP_ORIGINATOR_DIR, RelationUtils.judgeDirection(rtpLog, voIpLog));
JsonParseUtil.setValue(voIpLog, JsonProConfig.RTP_PAYLOAD_TYPE_C2S, rtpPayloadTypeC2s);
JsonParseUtil.setValue(voIpLog, JsonProConfig.RTP_PAYLOAD_TYPE_S2C, rtpPayloadTypeS2c);
JsonParseUtil.setValue(voIpLog, JsonProConfig.RTP_PCAP_PATH, rtpPcapPath);
diff --git a/src/main/java/com/zdjizhi/utils/exception/VoipRelationException.java b/src/main/java/com/zdjizhi/tools/exception/VoipRelationException.java
index b2ef9e9..7af60ab 100644
--- a/src/main/java/com/zdjizhi/utils/exception/VoipRelationException.java
+++ b/src/main/java/com/zdjizhi/tools/exception/VoipRelationException.java
@@ -1,4 +1,4 @@
-package com.zdjizhi.utils.exception;
+package com.zdjizhi.tools.exception;
/**
* @author qidaijie
diff --git a/src/main/java/com/zdjizhi/utils/json/JsonParseUtil.java b/src/main/java/com/zdjizhi/tools/json/JsonParseUtil.java
index 2bfd1f2..4413754 100644
--- a/src/main/java/com/zdjizhi/utils/json/JsonParseUtil.java
+++ b/src/main/java/com/zdjizhi/tools/json/JsonParseUtil.java
@@ -1,4 +1,4 @@
-package com.zdjizhi.utils.json;
+package com.zdjizhi.tools.json;
import cn.hutool.log.Log;
@@ -160,29 +160,6 @@ public class JsonParseUtil {
return tmpMap;
}
-
- /**
- * 获取属性值的方法
- *
- * @param obj 对象
- * @param property key
- * @return 属性的值
- */
- public static Object getValue(Object obj, String property) {
-
- try {
- BeanMap beanMap = BeanMap.create(obj);
- if (beanMap.containsKey(property)) {
- return beanMap.get(property);
- } else {
- return null;
- }
- } catch (RuntimeException e) {
- logger.error("获取json-value异常,异常key:" + property + "异常信息为:" + e);
- return null;
- }
- }
-
/**
* 获取属性值的方法
*
@@ -216,22 +193,6 @@ public class JsonParseUtil {
}
/**
- * long 类型检验转换方法,若为空返回基础值
- *
- * @return Long value
- */
- public static Long getLong(Object jsonMap, String property) {
- Object value = getValue(jsonMap, property);
- Long longVal = TypeUtils.castToLong(value);
-
- if (longVal == null) {
- return 0L;
- }
-
- return longVal;
- }
-
- /**
* int 类型检验转换方法,若为空返回基础值
*
* @return int value
@@ -243,21 +204,7 @@ public class JsonParseUtil {
if (intVal == null) {
return 0;
}
- return intVal;
- }
-
- /**
- * int 类型检验转换方法,若为空返回基础值
- *
- * @return int value
- */
- public static int getInteger(Object jsonMap, String property) {
- Object value = getValue(jsonMap, property);
- Integer intVal = TypeUtils.castToInt(value);
- if (intVal == null) {
- return 0;
- }
return intVal;
}
@@ -279,33 +226,9 @@ public class JsonParseUtil {
}
/**
- * Object 方式获取getString类型字段
- *
- * @param jsonObject
- * @param property
- * @return
- */
- public static String getString(Object jsonObject, String property) {
- Object value = getValue(jsonObject, property);
- if (value == null) {
- return null;
- }
-
- if (value instanceof Map) {
- return JsonMapper.toJsonString(value);
- }
-
- if (value instanceof List) {
- return JsonMapper.toJsonString(value);
- }
-
- return value.toString();
- }
-
- /**
* 更新属性值的方法
*
- * @param jsonMap 原始日志json map
+ * @param jsonMap 原始日志json parse
* @param property 更新的key
* @param value 更新的值
*/
@@ -318,38 +241,6 @@ public class JsonParseUtil {
}
/**
- * 更新属性值的方法
- *
- * @param obj 对象
- * @param property 更新的key
- * @param value 更新的值
- */
- public static void setValue(Object obj, String property, Object value) {
- try {
- BeanMap beanMap = BeanMap.create(obj);
- beanMap.put(property, value);
- } catch (ClassCastException e) {
- logger.error("赋予实体类错误类型数据", e);
- }
- }
-
- /**
- * 根据反射生成对象的方法
- *
- * @param properties 反射类用的map
- * @return 生成的Object类型的对象
- */
- public static Object generateObject(Map properties) {
- BeanGenerator generator = new BeanGenerator();
- Set keySet = properties.keySet();
- for (Object aKeySet : keySet) {
- String key = (String) aKeySet;
- generator.addProperty(key, (Class) properties.get(key));
- }
- return generator.create();
- }
-
- /**
* 通过获取String类型的网关schema链接来获取map,用于生成一个Object类型的对象
* <p>
* // * @param http 网关schema地址
diff --git a/src/main/java/com/zdjizhi/utils/json/JsonTypeUtil.java b/src/main/java/com/zdjizhi/tools/json/JsonTypeUtil.java
index 48e4620..3e56b84 100644
--- a/src/main/java/com/zdjizhi/utils/json/JsonTypeUtil.java
+++ b/src/main/java/com/zdjizhi/tools/json/JsonTypeUtil.java
@@ -1,9 +1,9 @@
-package com.zdjizhi.utils.json;
+package com.zdjizhi.tools.json;
import cn.hutool.log.Log;
import cn.hutool.log.LogFactory;
import com.zdjizhi.utils.JsonMapper;
-import com.zdjizhi.utils.exception.VoipRelationException;
+import com.zdjizhi.tools.exception.VoipRelationException;
import java.util.List;
import java.util.Map;
@@ -53,7 +53,7 @@ class JsonTypeUtil {
return (Map) value;
}
- throw new VoipRelationException("can not cast to map, value : " + value);
+ throw new VoipRelationException("can not cast to parse, value : " + value);
}
/**
diff --git a/src/main/java/com/zdjizhi/utils/json/TypeUtils.java b/src/main/java/com/zdjizhi/tools/json/TypeUtils.java
index 1e0f156..c2fb497 100644
--- a/src/main/java/com/zdjizhi/utils/json/TypeUtils.java
+++ b/src/main/java/com/zdjizhi/tools/json/TypeUtils.java
@@ -1,10 +1,10 @@
-package com.zdjizhi.utils.json;
+package com.zdjizhi.tools.json;
import cn.hutool.log.Log;
import cn.hutool.log.LogFactory;
import com.zdjizhi.common.VoipRelationConfig;
import com.zdjizhi.utils.StringUtil;
-import com.zdjizhi.utils.exception.VoipRelationException;
+import com.zdjizhi.tools.exception.VoipRelationException;
/**
@@ -39,14 +39,6 @@ public class TypeUtils {
return ((Number) value).longValue();
}
-// if (value instanceof Map) {
-// return (Map) value;
-// }
-//
-// if (value instanceof List) {
-// return Collections.singletonList(value.toString());
-// }
-
if (value instanceof Boolean) {
return (Boolean) value ? 1 : 0;
}
@@ -70,11 +62,6 @@ public class TypeUtils {
return (Integer) value;
}
- //此判断数值超范围不抛出异常,会截取成对应类型数值
-// if (value instanceof Number) {
-// return ((Number) value).intValue();
-// }
-
if (value instanceof String) {
String strVal = (String) value;
if (StringUtil.isBlank(strVal)) {
@@ -112,11 +99,6 @@ public class TypeUtils {
return (Double) value;
}
- //此判断数值超范围不抛出异常,会截取成对应类型数值
-// if (value instanceof Number) {
-// return ((Number) value).doubleValue();
-// }
-
if (value instanceof String) {
String strVal = (String) value;
diff --git a/src/main/java/com/zdjizhi/utils/kafka/CertUtils.java b/src/main/java/com/zdjizhi/tools/kafka/CertUtils.java
index 808723a..9e701a7 100644
--- a/src/main/java/com/zdjizhi/utils/kafka/CertUtils.java
+++ b/src/main/java/com/zdjizhi/tools/kafka/CertUtils.java
@@ -1,4 +1,4 @@
-package com.zdjizhi.utils.kafka;
+package com.zdjizhi.tools.kafka;
import com.zdjizhi.common.VoipRelationConfig;
import org.apache.kafka.common.config.SslConfigs;
diff --git a/src/main/java/com/zdjizhi/utils/kafka/KafkaConsumer.java b/src/main/java/com/zdjizhi/tools/kafka/KafkaConsumer.java
index ada1477..20f75b6 100644
--- a/src/main/java/com/zdjizhi/utils/kafka/KafkaConsumer.java
+++ b/src/main/java/com/zdjizhi/tools/kafka/KafkaConsumer.java
@@ -1,9 +1,8 @@
-package com.zdjizhi.utils.kafka;
+package com.zdjizhi.tools.kafka;
import com.zdjizhi.common.VoipRelationConfig;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
-import org.apache.kafka.clients.consumer.ConsumerConfig;
import java.util.Properties;
diff --git a/src/main/java/com/zdjizhi/utils/kafka/KafkaProducer.java b/src/main/java/com/zdjizhi/tools/kafka/KafkaProducer.java
index 2660c12..6939025 100644
--- a/src/main/java/com/zdjizhi/utils/kafka/KafkaProducer.java
+++ b/src/main/java/com/zdjizhi/tools/kafka/KafkaProducer.java
@@ -1,4 +1,4 @@
-package com.zdjizhi.utils.kafka;
+package com.zdjizhi.tools.kafka;
import com.zdjizhi.common.VoipRelationConfig;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
@@ -42,9 +42,6 @@ public class KafkaProducer {
//启用此选项将使生产者仅记录失败日志而不是捕获和重新抛出它们
kafkaProducer.setLogFailuresOnly(true);
- //写入kafka的消息携带时间戳
-// kafkaProducer.setWriteTimestampToKafka(true);
-
return kafkaProducer;
}
}
diff --git a/src/main/java/com/zdjizhi/utils/system/VoipRelationConfigurations.java b/src/main/java/com/zdjizhi/tools/system/VoipRelationConfigurations.java
index 95e322b..a24292c 100644
--- a/src/main/java/com/zdjizhi/utils/system/VoipRelationConfigurations.java
+++ b/src/main/java/com/zdjizhi/tools/system/VoipRelationConfigurations.java
@@ -1,7 +1,7 @@
-package com.zdjizhi.utils.system;
+package com.zdjizhi.tools.system;
import com.zdjizhi.utils.StringUtil;
-import com.zdjizhi.utils.exception.VoipRelationException;
+import com.zdjizhi.tools.exception.VoipRelationException;
import java.io.IOException;
import java.util.Locale;
diff --git a/src/main/java/com/zdjizhi/tools/utils/RelationUtils.java b/src/main/java/com/zdjizhi/tools/utils/RelationUtils.java
new file mode 100644
index 0000000..1498fd9
--- /dev/null
+++ b/src/main/java/com/zdjizhi/tools/utils/RelationUtils.java
@@ -0,0 +1,134 @@
+package com.zdjizhi.tools.utils;
+
+import com.zdjizhi.common.JsonProConfig;
+import com.zdjizhi.common.VoipRelationConfig;
+import com.zdjizhi.utils.IPUtil;
+import com.zdjizhi.utils.StringUtil;
+import com.zdjizhi.tools.json.JsonParseUtil;
+
+import java.util.Map;
+
+/**
+ * @author qidaijie
+ * @Package com.zdjizhi.operator
+ * @Description:
+ * @date 2022/4/1911:30
+ */
+public class RelationUtils {
+
+
+ /**
+ * 将A日志内的某指标数据与B日志进行累加计算,并写入A日志中
+ *
+ * @param firstLog A日志
+ * @param secondLog B日志
+ * @param key 指标 json key
+ */
+ public static void metricSum(Map<String, Object> firstLog, Map<String, Object> secondLog, String key) {
+ Long firstMetric = JsonParseUtil.getLong(firstLog, key);
+ Long secondMetric = JsonParseUtil.getLong(secondLog, key);
+
+ Long sum = firstMetric + secondMetric;
+
+ JsonParseUtil.setValue(firstLog, key, sum);
+ }
+
+
+ /**
+ * 将A日志内的某指标数据与B日志进行累加计算,并写入C日志中
+ *
+ * @param firstLog A日志
+ * @param secondLog B日志
+ * @param otherLog C日志
+ * @param key 指标 json key
+ */
+ public static void metricSumSetOtherLog(Map<String, Object> firstLog, Map<String, Object> secondLog, Map<String, Object> otherLog, String key) {
+ Long firstMetric = JsonParseUtil.getLong(firstLog, key);
+ Long secondMetric = JsonParseUtil.getLong(secondLog, key);
+
+ Long sum = firstMetric + secondMetric;
+
+ JsonParseUtil.setValue(otherLog, key, sum);
+ }
+
+
+ /**
+ * 校验sip日志,必须包含协商四元组,否则将原样输出不处理
+ *
+ * @param sipLog SIP日志
+ * @return true or false
+ */
+ public static boolean checkSipCompleteness(Map<String, Object> sipLog) {
+ return sipLog.containsKey(JsonProConfig.SIP_ORIGINATOR_IP) &&
+ sipLog.containsKey(JsonProConfig.SIP_ORIGINATOR_PORT) &&
+ sipLog.containsKey(JsonProConfig.SIP_RESPONDER_IP) &&
+ sipLog.containsKey(JsonProConfig.SIP_RESPONDER_PORT);
+ }
+
+ /**
+ * 是否为内网IP
+ *
+ * @param ip ip Address
+ * @return true or false
+ */
+ public static boolean isInnerIp(String ip) {
+ if (VoipRelationConfig.CHECK_INNER_NETWORK == 1) {
+ //为空或者为特定IP时也算作内网IP
+ return StringUtil.isBlank(ip) || IPUtil.internalIp(ip);
+ }
+ return false;
+ }
+
+ /**
+ * int类型
+ * 比较数字大小,左边>右边-返回:1,左边<右边-返回:-1,左边=右边-返回:0
+ *
+ * @param numOne 数值1
+ * @param numTwo 数值2
+ */
+ public static int compareNum(int numOne, int numTwo) {
+ if (numOne > 0 && numTwo > 0) {
+ return Integer.compare(numOne, numTwo);
+ } else {
+ return -2;
+ }
+ }
+
+ /**
+ * long类型
+ * 比较数字大小,左边>右边-返回:1,左边<右边-返回:-1,左边=右边-返回:0
+ *
+ * @param numOne 数值1
+ * @param numTwo 数值2
+ */
+ public static int compareNum(long numOne, long numTwo) {
+ if (numOne > 0 && numTwo > 0) {
+ return Long.compare(numOne, numTwo);
+ } else {
+ return -2;
+ }
+ }
+
+
+ /**
+ * 判断RTP主叫方向
+ *
+ * @param rtpLog RTP原始日志
+ * @param voIpLog 融合后VOIP日志
+ * @return 方向 0:未知 1:c2s 2:s2c
+ */
+ public static int judgeDirection(Map<String, Object> rtpLog, Map<String, Object> voIpLog) {
+
+ String ip = JsonParseUtil.getString(rtpLog, JsonProConfig.CLIENT_IP);
+ String sipOriginatorIp = JsonParseUtil.getString(voIpLog, JsonProConfig.SIP_ORIGINATOR_IP);
+ String sipResponderIp = JsonParseUtil.getString(voIpLog, JsonProConfig.SIP_RESPONDER_IP);
+
+ if (StringUtil.isNotBlank(ip) && ip.equals(sipOriginatorIp)) {
+ return 1;
+ } else if (StringUtil.isNotBlank(ip) && ip.equals(sipResponderIp)) {
+ return 2;
+ }
+
+ return 0;
+ }
+}
diff --git a/src/main/java/com/zdjizhi/topology/VoIpRelationTopology.java b/src/main/java/com/zdjizhi/topology/VoIpRelationTopology.java
index 80a80ad..95fbfc1 100644
--- a/src/main/java/com/zdjizhi/topology/VoIpRelationTopology.java
+++ b/src/main/java/com/zdjizhi/topology/VoIpRelationTopology.java
@@ -3,16 +3,23 @@ package com.zdjizhi.topology;
import cn.hutool.log.Log;
import cn.hutool.log.LogFactory;
import com.zdjizhi.common.VoipRelationConfig;
-import com.zdjizhi.utils.functions.*;
-import com.zdjizhi.utils.kafka.KafkaConsumer;
-import com.zdjizhi.utils.kafka.KafkaProducer;
-import org.apache.flink.api.java.tuple.Tuple3;
+import com.zdjizhi.operator.filter.NullFilterFunction;
+import com.zdjizhi.operator.group.MergeUniFlowKeyByFunction;
+import com.zdjizhi.operator.group.VoipCalibrationKeyByFunction;
+import com.zdjizhi.operator.parse.ParseMapFunction;
+import com.zdjizhi.operator.window.MergeUniFlowWindowFunction;
+import com.zdjizhi.operator.window.VoipCalibrationWindowFunction;
+import com.zdjizhi.tools.kafka.KafkaConsumer;
+import com.zdjizhi.tools.kafka.KafkaProducer;
+import org.apache.flink.api.java.tuple.Tuple4;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
+import java.util.Map;
+
/**
* @author qidaijie
@@ -28,20 +35,28 @@ public class VoIpRelationTopology {
DataStream<String> streamSource = environment.addSource(KafkaConsumer.getKafkaConsumer());
- SingleOutputStreamOperator<Tuple3<String, String, String>> sipCorrelation =
- streamSource.windowAll(TumblingProcessingTimeWindows.of(Time.seconds(VoipRelationConfig.ONE_SIDED_WINDOW_TIME)))
- .process(new OneSidedWindowFunction()).name("OneSidedWindow");
- SingleOutputStreamOperator<String> window = sipCorrelation.windowAll(
- TumblingProcessingTimeWindows.of(Time.seconds(VoipRelationConfig.VOIP_CALIBRATION_WINDOW_TIME)))
- .process(new SipCalibrationWindowFunction()).name("SipCalibrationWindowFunction");
+ SingleOutputStreamOperator<Tuple4<String, String, Map<String, Object>, Integer>> mergeUniFlowWindow = streamSource.filter(new NullFilterFunction())
+ .map(new ParseMapFunction())
+ .keyBy(new MergeUniFlowKeyByFunction())
+ .window(TumblingProcessingTimeWindows.of(Time.seconds(VoipRelationConfig.ONE_SIDED_WINDOW_TIME)))
+ .process(new MergeUniFlowWindowFunction())
+ .name("MergeUnidirectionalFlowWindow").setParallelism(VoipRelationConfig.MERGE_UNIFLOW_WINDOW_PARALLELISM);
+
+
+ SingleOutputStreamOperator<String> mergeUnidirectionalFlowWindow = mergeUniFlowWindow.keyBy(new VoipCalibrationKeyByFunction())
+ .window(TumblingProcessingTimeWindows.of(Time.seconds(VoipRelationConfig.VOIP_CALIBRATION_WINDOW_TIME)))
+ .process(new VoipCalibrationWindowFunction())
+ .name("MergeUnidirectionalFlowWindow").setParallelism(VoipRelationConfig.CALIBRATION_WINDOW_PARALLELISM);
+
- window.addSink(KafkaProducer.getKafkaProducer()).name("VoIpLogSinkKafka");
+ mergeUnidirectionalFlowWindow.addSink(KafkaProducer.getKafkaProducer()).name("VoIpLogSinkKafka");
try {
environment.execute(args[0]);
} catch (Exception e) {
- logger.error("This Flink task start ERROR! Exception information is :" + e);
+ logger.error("This Flink task start ERROR! Exception information is :" + e.getMessage());
+ e.printStackTrace();
}
}
}
diff --git a/src/main/java/com/zdjizhi/utils/functions/relationUtils.java b/src/main/java/com/zdjizhi/utils/functions/relationUtils.java
deleted file mode 100644
index c74170d..0000000
--- a/src/main/java/com/zdjizhi/utils/functions/relationUtils.java
+++ /dev/null
@@ -1,81 +0,0 @@
-package com.zdjizhi.utils.functions;
-
-import com.zdjizhi.common.JsonProConfig;
-import com.zdjizhi.common.VoipRelationConfig;
-import com.zdjizhi.utils.IPUtil;
-import com.zdjizhi.utils.StringUtil;
-import com.zdjizhi.utils.json.JsonParseUtil;
-
-import java.util.Map;
-
-/**
- * @author qidaijie
- * @Package com.zdjizhi.utils.functions
- * @Description:
- * @date 2022/4/1911:30
- */
-class relationUtils {
-
-
- /**
- * 将A日志内的某指标数据与B日志进行累加计算,并写入A日志中
- *
- * @param firstLog A日志
- * @param secondLog B日志
- * @param key 指标 json key
- */
- static void metricSum(Map<String, Object> firstLog, Map<String, Object> secondLog, String key) {
- Long firstMetric = JsonParseUtil.getLong(firstLog, key);
- Long secondMetric = JsonParseUtil.getLong(secondLog, key);
-
- Long sum = firstMetric + secondMetric;
-
- JsonParseUtil.setValue(firstLog, key, sum);
- }
-
-
- /**
- * 将A日志内的某指标数据与B日志进行累加计算,并写入C日志中
- *
- * @param firstLog A日志
- * @param secondLog B日志
- * @param otherLog C日志
- * @param key 指标 json key
- */
- static void metricSumSetOtherLog(Map<String, Object> firstLog, Map<String, Object> secondLog, Map<String, Object> otherLog, String key) {
- Long firstMetric = JsonParseUtil.getLong(firstLog, key);
- Long secondMetric = JsonParseUtil.getLong(secondLog, key);
-
- Long sum = firstMetric + secondMetric;
-
- JsonParseUtil.setValue(otherLog, key, sum);
- }
-
-
- /**
- * 校验sip日志,必须包含协商四元组,否则将原样输出不处理
- *
- * @param sipLog SIP日志
- * @return true or false
- */
- static boolean checkSipCompleteness(Map<String, Object> sipLog) {
- return sipLog.containsKey(JsonProConfig.SIP_ORIGINATOR_IP) &&
- sipLog.containsKey(JsonProConfig.SIP_ORIGINATOR_PORT) &&
- sipLog.containsKey(JsonProConfig.SIP_RESPONDER_IP) &&
- sipLog.containsKey(JsonProConfig.SIP_RESPONDER_PORT);
- }
-
- /**
- * 是否为内网IP
- *
- * @param ip ip Address
- * @return true or false
- */
- static boolean isInnerIp(String ip) {
- if (VoipRelationConfig.CHECK_INNER_NETWORK == 1) {
- //为空或者为特定IP时也算作内网IP
- return StringUtil.isBlank(ip) || IPUtil.internalIp(ip);
- }
- return false;
- }
-}
diff --git a/src/main/java/com/zdjizhi/utils/http/HttpClientUtil.java b/src/main/java/com/zdjizhi/utils/http/HttpClientUtil.java
deleted file mode 100644
index 1adb1d1..0000000
--- a/src/main/java/com/zdjizhi/utils/http/HttpClientUtil.java
+++ /dev/null
@@ -1,77 +0,0 @@
-package com.zdjizhi.utils.http;
-
-import cn.hutool.log.Log;
-import cn.hutool.log.LogFactory;
-import org.apache.commons.io.IOUtils;
-import org.apache.http.HttpEntity;
-import org.apache.http.client.methods.CloseableHttpResponse;
-import org.apache.http.client.methods.HttpGet;
-import org.apache.http.impl.client.CloseableHttpClient;
-import org.apache.http.impl.client.HttpClients;
-
-import java.io.BufferedReader;
-import java.io.IOException;
-import java.io.InputStreamReader;
-
-/**
- * 获取网关schema的工具类
- *
- * @author qidaijie
- */
-public class HttpClientUtil {
- private static final Log logger = LogFactory.get();
-
- /**
- * 请求网关获取schema
- *
- * @param http 网关url
- * @return schema
- */
- public static String requestByGetMethod(String http) {
- CloseableHttpClient httpClient = HttpClients.createDefault();
- StringBuilder entityStringBuilder;
-
- HttpGet get = new HttpGet(http);
- BufferedReader bufferedReader = null;
- CloseableHttpResponse httpResponse = null;
- try {
- httpResponse = httpClient.execute(get);
- HttpEntity entity = httpResponse.getEntity();
- entityStringBuilder = new StringBuilder();
- if (null != entity) {
- bufferedReader = new BufferedReader(new InputStreamReader(httpResponse.getEntity().getContent(), "UTF-8"), 8 * 1024);
- int intC;
- while ((intC = bufferedReader.read()) != -1) {
- char c = (char) intC;
- if (c == '\n') {
- break;
- }
- entityStringBuilder.append(c);
- }
-
- return entityStringBuilder.toString();
- }
- } catch (IOException e) {
- logger.error("Get Schema from Query engine ERROR! Exception message is:" + e);
- } finally {
- if (httpClient != null) {
- try {
- httpClient.close();
- } catch (IOException e) {
- logger.error("Close HTTP Client ERROR! Exception messgae is:" + e);
- }
- }
- if (httpResponse != null) {
- try {
- httpResponse.close();
- } catch (IOException e) {
- logger.error("Close httpResponse ERROR! Exception messgae is:" + e);
- }
- }
- if (bufferedReader != null) {
- IOUtils.closeQuietly(bufferedReader);
- }
- }
- return "";
- }
-}
diff --git a/src/test/testdata/differentVSYS b/src/test/testdata/differentVSYS
new file mode 100644
index 0000000..75ff819
--- /dev/null
+++ b/src/test/testdata/differentVSYS
@@ -0,0 +1,3 @@
+{"common_action":0,"common_address_list":"50730-20730-192.168.36.136-192.168.56.27","common_address_type":4,"common_app_behavior":"voice_call","common_app_id":{"UNKNOWN":[{"app_id":4,"app_name":"unknown","packet_sequence":22,"surrogate_id":0}]},"common_app_identify_info":[{"app_name":"unknown","packet_sequence":22}],"common_app_label":"unknown","common_c2s_byte_num":64860,"common_c2s_pkt_num":304,"common_client_ip":"192.168.36.136","common_client_port":50730,"common_con_duration_ms":6264,"common_device_id":"21426003","common_device_tag":"{\"tags\":[{\"tag\":\"device_group\",\"value\":\"group-xxg-three\"},{\"tag\":\"data_center\",\"value\":\"center-xxg-three\"}]}","common_direction":73,"common_end_time":1663123136,"common_l4_protocol":"IPv4_UDP","common_l7_protocol":"STUN.RTP","common_policy_id":0,"common_s2c_byte_num":66144,"common_s2c_pkt_num":310,"common_schema_type":"RTP","common_server_ip":"192.168.56.27","common_server_port":20730,"common_service":4,"common_sled_ip":"192.168.40.81","common_start_time":1663123136,"common_stream_dir":3,"common_stream_trace_id":"869477723735406984","rtp_pcap_path":"http://192.168.44.67:9098/hos/rtp_hos_bucket/192.168.56.27_192.168.36.136_20730_50730_123.pcap","raw_log_status":"CLOSE","common_vsys_id":2}
+{"common_action":0,"common_address_list":"50730-20730-192.168.36.136-192.168.56.27","common_address_type":4,"common_app_behavior":"voice_call","common_app_id":{"UNKNOWN":[{"app_id":4,"app_name":"unknown","packet_sequence":22,"surrogate_id":0}]},"common_app_identify_info":[{"app_name":"unknown","packet_sequence":22}],"common_app_label":"unknown","common_c2s_byte_num":64860,"common_c2s_pkt_num":304,"common_client_ip":"192.168.36.136","common_client_port":50730,"common_con_duration_ms":6264,"common_device_id":"21426003","common_device_tag":"{\"tags\":[{\"tag\":\"device_group\",\"value\":\"group-xxg-three\"},{\"tag\":\"data_center\",\"value\":\"center-xxg-three\"}]}","common_direction":73,"common_end_time":1663123136,"common_l4_protocol":"IPv4_UDP","common_l7_protocol":"STUN.RTP","common_policy_id":0,"common_s2c_byte_num":66144,"common_s2c_pkt_num":310,"common_schema_type":"RTP","common_server_ip":"192.168.56.27","common_server_port":20730,"common_service":4,"common_sled_ip":"192.168.40.81","common_start_time":1663123136,"common_stream_dir":3,"common_stream_trace_id":"869477723735406984","rtp_pcap_path":"http://192.168.44.67:9098/hos/rtp_hos_bucket/192.168.56.27_192.168.36.136_20730_50730_123.pcap","raw_log_status":"CLOSE","common_vsys_id":3}
+{"common_schema_type":"SIP","common_sessions":1,"sip_call_id":"NDNjYmJkMDJhNGJmYmUyN2EwZjdmYzNjMWE1YTRmYjc.","sip_originator_description":"\"test1\"<sip:[email protected]>","sip_responder_description":"\"test2\"<sip:[email protected]>","sip_user_agent":"eyeBeamrelease1011dstamp40820","sip_originator_sdp_content":"v=0\r\no=-52INIP4192.168.56.27\r\ns=CounterPatheyeBeam1.5\r\nc=INIP4192.168.56.27\r\nt=00\r\nm=audio20730RTP/AVP0818101\r\na=alt:11:UmXquXbwvg/CQxTA192.168.56.2720730\r\na=fmtp:18annexb=no\r\na=fmtp:1010-15\r\na=rtpmap:18G729/8000\r\na=rtpmap:101telephone-event/8000\r\na=sendrecv\r\na=x-rtp-session-id:F934D61F09C84591BB3ECEECE5993BB8\r\n","sip_originator_sdp_connect_ip":"192.168.56.27","sip_originator_sdp_media_port":20730,"sip_originator_sdp_media_type":"18G729/8000","sip_server":"OpenSIPS(2.4.9(x86_64/linux))","sip_responder_sdp_content":"v=0\r\no=-62INIP4192.168.36.136\r\ns=CounterPatheyeBeam1.5\r\nc=INIP4192.168.36.136\r\nt=00\r\nm=audio50730RTP/AVP0818101\r\na=alt:11:sXlLExDmMtTnircb192.168.36.13650730\r\na=fmtp:18annexb=no\r\na=fmtp:1010-15\r\na=rtpmap:18G729/8000\r\na=rtpmap:101telephone-event/8000\r\na=sendrecv\r\na=x-rtp-session-id:4342C45E30624AA085D078ABE24D2D33\r\n","sip_responder_sdp_connect_ip":"192.168.36.136","sip_responder_sdp_media_port":50730,"sip_responder_sdp_media_type":"18G729/8000","sip_duration_s":7,"sip_bye":"responder","common_protocol_label":"ETHERNET.IPv4.UDP","common_c2s_ipfrag_num":0,"common_s2c_ipfrag_num":0,"common_direction":69,"common_l7_protocol":"SIP","common_server_ip":"192.168.40.158","common_client_ip":"192.168.56.27","common_server_port":5060,"common_client_port":32196,"common_stream_dir":3,"common_address_type":4,"common_address_list":"32196-5060-192.168.56.27-192.168.40.158","common_start_time":1663123136,"common_end_time":1663123136,"common_con_duration_ms":17043,"common_s2c_pkt_num":9,"common_s2c_byte_num":5375,"common_c2s_pkt_num":9,"common_c2s_byte_num":6563,"common_stream_trace_id":"869477723735079299","common_l4_protocol":"IPv4_UDP","common_sled_ip":"192.168.40.81","common_device_id":"21426003","common_device_tag":"{\"tags\":[{\"tag\":\"device_group\",\"value\":\"group-xxg-three\"},{\"tag\":\"data_center\",\"value\":\"center-xxg-three\"}]}","common_policy_id":0,"common_service":5,"common_action":0,"common_vsys_id":2} \ No newline at end of file
diff --git a/src/test/testdata/differentVSYSUniFlowRTP b/src/test/testdata/differentVSYSUniFlowRTP
new file mode 100644
index 0000000..8d3f06f
--- /dev/null
+++ b/src/test/testdata/differentVSYSUniFlowRTP
@@ -0,0 +1,3 @@
+{"common_action":0,"common_address_list":"50730-20730-192.168.36.136-192.168.56.27","common_address_type":4,"common_app_behavior":"voice_call","common_app_id":{"UNKNOWN":[{"app_id":4,"app_name":"unknown","packet_sequence":22,"surrogate_id":0}]},"common_app_identify_info":[{"app_name":"unknown","packet_sequence":22}],"common_app_label":"unknown","common_c2s_byte_num":64860,"common_c2s_pkt_num":304,"common_client_ip":"192.168.36.136","common_client_port":50730,"common_con_duration_ms":6264,"common_device_id":"21426003","common_device_tag":"{\"tags\":[{\"tag\":\"device_group\",\"value\":\"group-xxg-three\"},{\"tag\":\"data_center\",\"value\":\"center-xxg-three\"}]}","common_direction":73,"common_end_time":1663123136,"common_l4_protocol":"IPv4_UDP","common_l7_protocol":"STUN.RTP","common_policy_id":0,"common_s2c_byte_num":66144,"common_s2c_pkt_num":310,"common_schema_type":"RTP","common_server_ip":"192.168.56.27","common_server_port":20730,"common_service":4,"common_sled_ip":"192.168.40.81","common_start_time":1663123136,"common_stream_dir":1,"common_stream_trace_id":"869477723735406984","rtp_pcap_path":"http://192.168.44.67:9098/hos/rtp_hos_bucket/192.168.56.27_192.168.36.136_20730_50730_123.pcap","raw_log_status":"CLOSE","common_vsys_id":2}
+{"common_action":0,"common_address_list":"50730-20730-192.168.36.136-192.168.56.27","common_address_type":4,"common_app_behavior":"voice_call","common_app_id":{"UNKNOWN":[{"app_id":4,"app_name":"unknown","packet_sequence":22,"surrogate_id":0}]},"common_app_identify_info":[{"app_name":"unknown","packet_sequence":22}],"common_app_label":"unknown","common_c2s_byte_num":64860,"common_c2s_pkt_num":304,"common_client_ip":"192.168.36.136","common_client_port":50730,"common_con_duration_ms":6264,"common_device_id":"21426003","common_device_tag":"{\"tags\":[{\"tag\":\"device_group\",\"value\":\"group-xxg-three\"},{\"tag\":\"data_center\",\"value\":\"center-xxg-three\"}]}","common_direction":73,"common_end_time":1663123136,"common_l4_protocol":"IPv4_UDP","common_l7_protocol":"STUN.RTP","common_policy_id":0,"common_s2c_byte_num":66144,"common_s2c_pkt_num":310,"common_schema_type":"RTP","common_server_ip":"192.168.56.27","common_server_port":20730,"common_service":4,"common_sled_ip":"192.168.40.81","common_start_time":1663123136,"common_stream_dir":2,"common_stream_trace_id":"869477723735406984","rtp_pcap_path":"http://192.168.44.67:9098/hos/rtp_hos_bucket/192.168.56.27_192.168.36.136_20730_50730_456.pcap","raw_log_status":"CLOSE","common_vsys_id":2}
+{"common_schema_type":"SIP","common_sessions":1,"sip_call_id":"NDNjYmJkMDJhNGJmYmUyN2EwZjdmYzNjMWE1YTRmYjc.","sip_originator_description":"\"test1\"<sip:[email protected]>","sip_responder_description":"\"test2\"<sip:[email protected]>","sip_user_agent":"eyeBeamrelease1011dstamp40820","sip_originator_sdp_content":"v=0\r\no=-52INIP4192.168.56.27\r\ns=CounterPatheyeBeam1.5\r\nc=INIP4192.168.56.27\r\nt=00\r\nm=audio20730RTP/AVP0818101\r\na=alt:11:UmXquXbwvg/CQxTA192.168.56.2720730\r\na=fmtp:18annexb=no\r\na=fmtp:1010-15\r\na=rtpmap:18G729/8000\r\na=rtpmap:101telephone-event/8000\r\na=sendrecv\r\na=x-rtp-session-id:F934D61F09C84591BB3ECEECE5993BB8\r\n","sip_originator_sdp_connect_ip":"192.168.56.27","sip_originator_sdp_media_port":20730,"sip_originator_sdp_media_type":"18G729/8000","sip_server":"OpenSIPS(2.4.9(x86_64/linux))","sip_responder_sdp_content":"v=0\r\no=-62INIP4192.168.36.136\r\ns=CounterPatheyeBeam1.5\r\nc=INIP4192.168.36.136\r\nt=00\r\nm=audio50730RTP/AVP0818101\r\na=alt:11:sXlLExDmMtTnircb192.168.36.13650730\r\na=fmtp:18annexb=no\r\na=fmtp:1010-15\r\na=rtpmap:18G729/8000\r\na=rtpmap:101telephone-event/8000\r\na=sendrecv\r\na=x-rtp-session-id:4342C45E30624AA085D078ABE24D2D33\r\n","sip_responder_sdp_connect_ip":"192.168.36.136","sip_responder_sdp_media_port":50730,"sip_responder_sdp_media_type":"18G729/8000","sip_duration_s":7,"sip_bye":"responder","common_protocol_label":"ETHERNET.IPv4.UDP","common_c2s_ipfrag_num":0,"common_s2c_ipfrag_num":0,"common_direction":69,"common_l7_protocol":"SIP","common_server_ip":"192.168.40.158","common_client_ip":"192.168.56.27","common_server_port":5060,"common_client_port":32196,"common_stream_dir":3,"common_address_type":4,"common_address_list":"32196-5060-192.168.56.27-192.168.40.158","common_start_time":1663123136,"common_end_time":1663123136,"common_con_duration_ms":17043,"common_s2c_pkt_num":9,"common_s2c_byte_num":5375,"common_c2s_pkt_num":9,"common_c2s_byte_num":6563,"common_stream_trace_id":"869477723735079299","common_l4_protocol":"IPv4_UDP","common_sled_ip":"192.168.40.81","common_device_id":"21426003","common_device_tag":"{\"tags\":[{\"tag\":\"device_group\",\"value\":\"group-xxg-three\"},{\"tag\":\"data_center\",\"value\":\"center-xxg-three\"}]}","common_policy_id":0,"common_service":5,"common_action":0,"common_vsys_id":2} \ No newline at end of file
diff --git a/src/test/testdata/oneRTPUniFlowSIP b/src/test/testdata/oneRTPUniFlowSIP
new file mode 100644
index 0000000..8aa4f24
--- /dev/null
+++ b/src/test/testdata/oneRTPUniFlowSIP
@@ -0,0 +1,3 @@
+{"common_action":0,"common_address_list":"50732-20730-192.168.36.136-192.168.56.27","common_address_type":4,"common_app_behavior":"voice_call","common_app_id":{"UNKNOWN":[{"app_id":4,"app_name":"unknown","packet_sequence":22,"surrogate_id":0}]},"common_app_identify_info":[{"app_name":"unknown","packet_sequence":22}],"common_app_label":"unknown","common_c2s_byte_num":64860,"common_c2s_pkt_num":304,"common_client_ip":"192.168.36.136","common_client_port":50732,"common_con_duration_ms":6264,"common_device_id":"21426003","common_device_tag":"{\"tags\":[{\"tag\":\"device_group\",\"value\":\"group-xxg-three\"},{\"tag\":\"data_center\",\"value\":\"center-xxg-three\"}]}","common_direction":73,"common_end_time":1663123136,"common_l4_protocol":"IPv4_UDP","common_l7_protocol":"STUN.RTP","common_policy_id":0,"common_s2c_byte_num":66144,"common_s2c_pkt_num":310,"common_schema_type":"RTP","common_server_ip":"192.168.56.27","common_server_port":20730,"common_service":4,"common_sled_ip":"192.168.40.81","common_start_time":1663123136,"common_stream_dir":3,"common_stream_trace_id":"869477723735406984","rtp_pcap_path":"http://192.168.44.67:9098/hos/rtp_hos_bucket/192.168.56.27_192.168.36.136_20730_50732_123.pcap","raw_log_status":"CLOSE"}
+{"common_schema_type":"SIP","common_sessions":1,"sip_call_id":"NDNjYmJkMDJhNGJmYmUyN2EwZjdmYzNjMWE1YTRmYjc.","sip_originator_description":"\"test1\"<sip:[email protected]>","sip_responder_description":"\"test2\"<sip:[email protected]>","sip_user_agent":"eyeBeamrelease1011dstamp40820","sip_originator_sdp_content":"v=0\r\no=-52INIP4192.168.56.27\r\ns=CounterPatheyeBeam1.5\r\nc=INIP4192.168.56.27\r\nt=00\r\nm=audio20730RTP/AVP0818101\r\na=alt:11:UmXquXbwvg/CQxTA192.168.56.2720730\r\na=fmtp:18annexb=no\r\na=fmtp:1010-15\r\na=rtpmap:18G729/8000\r\na=rtpmap:101telephone-event/8000\r\na=sendrecv\r\na=x-rtp-session-id:F934D61F09C84591BB3ECEECE5993BB8\r\n","sip_originator_sdp_connect_ip":"192.168.56.27","sip_originator_sdp_media_port":20730,"sip_originator_sdp_media_type":"18G729/8000","sip_server":"OpenSIPS(2.4.9(x86_64/linux))","sip_responder_sdp_content":"v=0\r\no=-62INIP4192.168.36.136\r\ns=CounterPatheyeBeam1.5\r\nc=INIP4192.168.36.136\r\nt=00\r\nm=audio50732RTP/AVP0818101\r\na=alt:11:sXlLExDmMtTnircb192.168.36.13650732\r\na=fmtp:18annexb=no\r\na=fmtp:1010-15\r\na=rtpmap:18G729/8000\r\na=rtpmap:101telephone-event/8000\r\na=sendrecv\r\na=x-rtp-session-id:4342C45E30624AA085D078ABE24D2D33\r\n","sip_responder_sdp_connect_ip":"192.168.36.136","sip_responder_sdp_media_port":50732,"sip_responder_sdp_media_type":"18G729/8000","sip_duration_s":7,"sip_bye":"responder","common_protocol_label":"ETHERNET.IPv4.UDP","common_c2s_ipfrag_num":0,"common_s2c_ipfrag_num":0,"common_direction":69,"common_l7_protocol":"SIP","common_server_ip":"192.168.40.158","common_client_ip":"192.168.56.27","common_server_port":5060,"common_client_port":32196,"common_stream_dir":1,"common_address_type":4,"common_address_list":"32196-5060-192.168.56.27-192.168.40.158","common_start_time":1663123136,"common_end_time":1663123136,"common_con_duration_ms":17043,"common_s2c_pkt_num":9,"common_s2c_byte_num":5375,"common_c2s_pkt_num":9,"common_c2s_byte_num":6563,"common_stream_trace_id":"869477723735079299","common_l4_protocol":"IPv4_UDP","common_sled_ip":"192.168.40.81","common_device_id":"21426003","common_device_tag":"{\"tags\":[{\"tag\":\"device_group\",\"value\":\"group-xxg-three\"},{\"tag\":\"data_center\",\"value\":\"center-xxg-three\"}]}","common_policy_id":0,"common_service":5,"common_action":0}
+{"common_schema_type":"SIP","common_sessions":1,"sip_call_id":"NDNjYmJkMDJhNGJmYmUyN2EwZjdmYzNjMWE1YTRmYjc.","sip_originator_description":"\"test1\"<sip:[email protected]>","sip_responder_description":"\"test2\"<sip:[email protected]>","sip_user_agent":"eyeBeamrelease1011dstamp40820","sip_originator_sdp_content":"v=0\r\no=-52INIP4192.168.56.27\r\ns=CounterPatheyeBeam1.5\r\nc=INIP4192.168.56.27\r\nt=00\r\nm=audio20730RTP/AVP0818101\r\na=alt:11:UmXquXbwvg/CQxTA192.168.56.2720730\r\na=fmtp:18annexb=no\r\na=fmtp:1010-15\r\na=rtpmap:18G729/8000\r\na=rtpmap:101telephone-event/8000\r\na=sendrecv\r\na=x-rtp-session-id:F934D61F09C84591BB3ECEECE5993BB8\r\n","sip_originator_sdp_connect_ip":"192.168.56.27","sip_originator_sdp_media_port":20730,"sip_originator_sdp_media_type":"18G729/8000","sip_server":"OpenSIPS(2.4.9(x86_64/linux))","sip_responder_sdp_content":"v=0\r\no=-62INIP4192.168.36.136\r\ns=CounterPatheyeBeam1.5\r\nc=INIP4192.168.36.136\r\nt=00\r\nm=audio50732RTP/AVP0818101\r\na=alt:11:sXlLExDmMtTnircb192.168.36.13650732\r\na=fmtp:18annexb=no\r\na=fmtp:1010-15\r\na=rtpmap:18G729/8000\r\na=rtpmap:101telephone-event/8000\r\na=sendrecv\r\na=x-rtp-session-id:4342C45E30624AA085D078ABE24D2D33\r\n","sip_responder_sdp_connect_ip":"192.168.36.136","sip_responder_sdp_media_port":50732,"sip_responder_sdp_media_type":"18G729/8000","sip_duration_s":7,"sip_bye":"responder","common_protocol_label":"ETHERNET.IPv4.UDP","common_c2s_ipfrag_num":0,"common_s2c_ipfrag_num":0,"common_direction":69,"common_l7_protocol":"SIP","common_server_ip":"192.168.40.158","common_client_ip":"192.168.56.27","common_server_port":5060,"common_client_port":32196,"common_stream_dir":2,"common_address_type":4,"common_address_list":"32196-5060-192.168.56.27-192.168.40.158","common_start_time":1663123136,"common_end_time":1663123136,"common_con_duration_ms":17043,"common_s2c_pkt_num":9,"common_s2c_byte_num":5375,"common_c2s_pkt_num":9,"common_c2s_byte_num":6563,"common_stream_trace_id":"8694777237350350","common_l4_protocol":"IPv4_UDP","common_sled_ip":"192.168.40.81","common_device_id":"21426003","common_device_tag":"{\"tags\":[{\"tag\":\"device_group\",\"value\":\"group-xxg-three\"},{\"tag\":\"data_center\",\"value\":\"center-xxg-three\"}]}","common_policy_id":0,"common_service":5,"common_action":0} \ No newline at end of file
diff --git a/src/test/testdata/oneRTPtwoSIP b/src/test/testdata/oneRTPtwoSIP
new file mode 100644
index 0000000..994a4bd
--- /dev/null
+++ b/src/test/testdata/oneRTPtwoSIP
@@ -0,0 +1,3 @@
+{"common_action":0,"common_address_list":"50731-20730-192.168.36.136-192.168.56.27","common_address_type":4,"common_app_behavior":"voice_call","common_app_id":{"UNKNOWN":[{"app_id":4,"app_name":"unknown","packet_sequence":22,"surrogate_id":0}]},"common_app_identify_info":[{"app_name":"unknown","packet_sequence":22}],"common_app_label":"unknown","common_c2s_byte_num":64860,"common_c2s_pkt_num":304,"common_client_ip":"192.168.36.136","common_client_port":50731,"common_con_duration_ms":6264,"common_device_id":"21426003","common_device_tag":"{\"tags\":[{\"tag\":\"device_group\",\"value\":\"group-xxg-three\"},{\"tag\":\"data_center\",\"value\":\"center-xxg-three\"}]}","common_direction":73,"common_end_time":1663123136,"common_l4_protocol":"IPv4_UDP","common_l7_protocol":"STUN.RTP","common_policy_id":0,"common_s2c_byte_num":66144,"common_s2c_pkt_num":310,"common_schema_type":"RTP","common_server_ip":"192.168.56.27","common_server_port":20730,"common_service":4,"common_sled_ip":"192.168.40.81","common_start_time":1663123136,"common_stream_dir":3,"common_stream_trace_id":"869477723735406984","rtp_pcap_path":"http://192.168.44.67:9098/hos/rtp_hos_bucket/192.168.56.27_192.168.36.136_20730_50731_123.pcap","raw_log_status":"CLOSE"}
+{"common_schema_type":"SIP","common_sessions":1,"sip_call_id":"NDNjYmJkMDJhNGJmYmUyN2EwZjdmYzNjMWE1YTRmYjc.","sip_originator_description":"\"test1\"<sip:[email protected]>","sip_responder_description":"\"test2\"<sip:[email protected]>","sip_user_agent":"eyeBeamrelease1011dstamp40820","sip_originator_sdp_content":"v=0\r\no=-52INIP4192.168.56.27\r\ns=CounterPatheyeBeam1.5\r\nc=INIP4192.168.56.27\r\nt=00\r\nm=audio20730RTP/AVP0818101\r\na=alt:11:UmXquXbwvg/CQxTA192.168.56.2720730\r\na=fmtp:18annexb=no\r\na=fmtp:1010-15\r\na=rtpmap:18G729/8000\r\na=rtpmap:101telephone-event/8000\r\na=sendrecv\r\na=x-rtp-session-id:F934D61F09C84591BB3ECEECE5993BB8\r\n","sip_originator_sdp_connect_ip":"192.168.56.27","sip_originator_sdp_media_port":20730,"sip_originator_sdp_media_type":"18G729/8000","sip_server":"OpenSIPS(2.4.9(x86_64/linux))","sip_responder_sdp_content":"v=0\r\no=-62INIP4192.168.36.136\r\ns=CounterPatheyeBeam1.5\r\nc=INIP4192.168.36.136\r\nt=00\r\nm=audio50731RTP/AVP0818101\r\na=alt:11:sXlLExDmMtTnircb192.168.36.13650731\r\na=fmtp:18annexb=no\r\na=fmtp:1010-15\r\na=rtpmap:18G729/8000\r\na=rtpmap:101telephone-event/8000\r\na=sendrecv\r\na=x-rtp-session-id:4342C45E30624AA085D078ABE24D2D33\r\n","sip_responder_sdp_connect_ip":"192.168.36.136","sip_responder_sdp_media_port":50731,"sip_responder_sdp_media_type":"18G729/8000","sip_duration_s":7,"sip_bye":"responder","common_protocol_label":"ETHERNET.IPv4.UDP","common_c2s_ipfrag_num":0,"common_s2c_ipfrag_num":0,"common_direction":69,"common_l7_protocol":"SIP","common_server_ip":"192.168.40.158","common_client_ip":"192.168.56.27","common_server_port":5060,"common_client_port":32196,"common_stream_dir":3,"common_address_type":4,"common_address_list":"32196-5060-192.168.56.27-192.168.40.158","common_start_time":1663123136,"common_end_time":1663123136,"common_con_duration_ms":17043,"common_s2c_pkt_num":9,"common_s2c_byte_num":5375,"common_c2s_pkt_num":9,"common_c2s_byte_num":6563,"common_stream_trace_id":"8694777237350350","common_l4_protocol":"IPv4_UDP","common_sled_ip":"192.168.40.81","common_device_id":"21426003","common_device_tag":"{\"tags\":[{\"tag\":\"device_group\",\"value\":\"group-xxg-three\"},{\"tag\":\"data_center\",\"value\":\"center-xxg-three\"}]}","common_policy_id":0,"common_service":5,"common_action":0}
+{"common_schema_type":"SIP","common_sessions":1,"sip_call_id":"NDNjYmJkMDJhNGJmYmUyN2EwZjdmYzNjMWE1YTRmYjc.","sip_originator_description":"\"test1\"<sip:[email protected]>","sip_responder_description":"\"test2\"<sip:[email protected]>","sip_user_agent":"eyeBeamrelease1011dstamp40820","sip_originator_sdp_content":"v=0\r\no=-52INIP4192.168.56.27\r\ns=CounterPatheyeBeam1.5\r\nc=INIP4192.168.56.27\r\nt=00\r\nm=audio20730RTP/AVP0818101\r\na=alt:11:UmXquXbwvg/CQxTA192.168.56.2720730\r\na=fmtp:18annexb=no\r\na=fmtp:1010-15\r\na=rtpmap:18G729/8000\r\na=rtpmap:101telephone-event/8000\r\na=sendrecv\r\na=x-rtp-session-id:F934D61F09C84591BB3ECEECE5993BB8\r\n","sip_originator_sdp_connect_ip":"192.168.56.27","sip_originator_sdp_media_port":20730,"sip_originator_sdp_media_type":"18G729/8000","sip_server":"OpenSIPS(2.4.9(x86_64/linux))","sip_responder_sdp_content":"v=0\r\no=-62INIP4192.168.36.136\r\ns=CounterPatheyeBeam1.5\r\nc=INIP4192.168.36.136\r\nt=00\r\nm=audio50731RTP/AVP0818101\r\na=alt:11:sXlLExDmMtTnircb192.168.36.13650731\r\na=fmtp:18annexb=no\r\na=fmtp:1010-15\r\na=rtpmap:18G729/8000\r\na=rtpmap:101telephone-event/8000\r\na=sendrecv\r\na=x-rtp-session-id:4342C45E30624AA085D078ABE24D2D33\r\n","sip_responder_sdp_connect_ip":"192.168.36.136","sip_responder_sdp_media_port":50731,"sip_responder_sdp_media_type":"18G729/8000","sip_duration_s":7,"sip_bye":"responder","common_protocol_label":"ETHERNET.IPv4.UDP","common_c2s_ipfrag_num":0,"common_s2c_ipfrag_num":0,"common_direction":69,"common_l7_protocol":"SIP","common_server_ip":"192.168.40.158","common_client_ip":"192.168.56.27","common_server_port":5060,"common_client_port":32196,"common_stream_dir":3,"common_address_type":4,"common_address_list":"32196-5060-192.168.56.27-192.168.40.158","common_start_time":1663123136,"common_end_time":1663123136,"common_con_duration_ms":17043,"common_s2c_pkt_num":9,"common_s2c_byte_num":5375,"common_c2s_pkt_num":9,"common_c2s_byte_num":6563,"common_stream_trace_id":"8694777237333333","common_l4_protocol":"IPv4_UDP","common_sled_ip":"192.168.40.81","common_device_id":"21426003","common_device_tag":"{\"tags\":[{\"tag\":\"device_group\",\"value\":\"group-xxg-three\"},{\"tag\":\"data_center\",\"value\":\"center-xxg-three\"}]}","common_policy_id":0,"common_service":5,"common_action":0} \ No newline at end of file
diff --git a/src/test/testdata/oneSIPUniFlowRTP b/src/test/testdata/oneSIPUniFlowRTP
new file mode 100644
index 0000000..c2422a7
--- /dev/null
+++ b/src/test/testdata/oneSIPUniFlowRTP
@@ -0,0 +1,3 @@
+{"common_action":0,"common_address_list":"50734-20730-192.168.36.136-192.168.56.27","common_address_type":4,"common_app_behavior":"voice_call","common_app_id":{"UNKNOWN":[{"app_id":4,"app_name":"unknown","packet_sequence":22,"surrogate_id":0}]},"common_app_identify_info":[{"app_name":"unknown","packet_sequence":22}],"common_app_label":"unknown","common_c2s_byte_num":64860,"common_c2s_pkt_num":304,"common_client_ip":"192.168.36.136","common_client_port":50734,"common_con_duration_ms":6264,"common_device_id":"21426003","common_device_tag":"{\"tags\":[{\"tag\":\"device_group\",\"value\":\"group-xxg-three\"},{\"tag\":\"data_center\",\"value\":\"center-xxg-three\"}]}","common_direction":73,"common_end_time":1663123136,"common_l4_protocol":"IPv4_UDP","common_l7_protocol":"STUN.RTP","common_policy_id":0,"common_s2c_byte_num":66144,"common_s2c_pkt_num":310,"common_schema_type":"RTP","common_server_ip":"192.168.56.27","common_server_port":20730,"common_service":4,"common_sled_ip":"192.168.40.81","common_start_time":1663123136,"common_stream_dir":1,"common_stream_trace_id":"869477723735406984","rtp_pcap_path":"http://192.168.44.67:9098/hos/rtp_hos_bucket/192.168.56.27_192.168.36.136_20730_50734_123.pcap","raw_log_status":"CLOSE"}
+{"common_action":0,"common_address_list":"50734-20730-192.168.36.136-192.168.56.27","common_address_type":4,"common_app_behavior":"voice_call","common_app_id":{"UNKNOWN":[{"app_id":4,"app_name":"unknown","packet_sequence":22,"surrogate_id":0}]},"common_app_identify_info":[{"app_name":"unknown","packet_sequence":22}],"common_app_label":"unknown","common_c2s_byte_num":64860,"common_c2s_pkt_num":304,"common_client_ip":"192.168.36.136","common_client_port":50734,"common_con_duration_ms":6264,"common_device_id":"21426003","common_device_tag":"{\"tags\":[{\"tag\":\"device_group\",\"value\":\"group-xxg-three\"},{\"tag\":\"data_center\",\"value\":\"center-xxg-three\"}]}","common_direction":73,"common_end_time":1663123136,"common_l4_protocol":"IPv4_UDP","common_l7_protocol":"STUN.RTP","common_policy_id":0,"common_s2c_byte_num":66144,"common_s2c_pkt_num":310,"common_schema_type":"RTP","common_server_ip":"192.168.56.27","common_server_port":20730,"common_service":4,"common_sled_ip":"192.168.40.81","common_start_time":1663123136,"common_stream_dir":2,"common_stream_trace_id":"869477723735406984","rtp_pcap_path":"http://192.168.44.67:9098/hos/rtp_hos_bucket/192.168.56.27_192.168.36.136_20730_50734_456.pcap","raw_log_status":"CLOSE"}
+{"common_schema_type":"SIP","common_sessions":1,"sip_call_id":"NDNjYmJkMDJhNGJmYmUyN2EwZjdmYzNjMWE1YTRmYjc.","sip_originator_description":"\"test1\"<sip:[email protected]>","sip_responder_description":"\"test2\"<sip:[email protected]>","sip_user_agent":"eyeBeamrelease1011dstamp40820","sip_originator_sdp_content":"v=0\r\no=-52INIP4192.168.56.27\r\ns=CounterPatheyeBeam1.5\r\nc=INIP4192.168.56.27\r\nt=00\r\nm=audio20730RTP/AVP0818101\r\na=alt:11:UmXquXbwvg/CQxTA192.168.56.2720730\r\na=fmtp:18annexb=no\r\na=fmtp:1010-15\r\na=rtpmap:18G729/8000\r\na=rtpmap:101telephone-event/8000\r\na=sendrecv\r\na=x-rtp-session-id:F934D61F09C84591BB3ECEECE5993BB8\r\n","sip_originator_sdp_connect_ip":"192.168.56.27","sip_originator_sdp_media_port":20730,"sip_originator_sdp_media_type":"18G729/8000","sip_server":"OpenSIPS(2.4.9(x86_64/linux))","sip_responder_sdp_content":"v=0\r\no=-62INIP4192.168.36.136\r\ns=CounterPatheyeBeam1.5\r\nc=INIP4192.168.36.136\r\nt=00\r\nm=audio50734RTP/AVP0818101\r\na=alt:11:sXlLExDmMtTnircb192.168.36.13650734\r\na=fmtp:18annexb=no\r\na=fmtp:1010-15\r\na=rtpmap:18G729/8000\r\na=rtpmap:101telephone-event/8000\r\na=sendrecv\r\na=x-rtp-session-id:4342C45E30624AA085D078ABE24D2D33\r\n","sip_responder_sdp_connect_ip":"192.168.36.136","sip_responder_sdp_media_port":50734,"sip_responder_sdp_media_type":"18G729/8000","sip_duration_s":7,"sip_bye":"responder","common_protocol_label":"ETHERNET.IPv4.UDP","common_c2s_ipfrag_num":0,"common_s2c_ipfrag_num":0,"common_direction":69,"common_l7_protocol":"SIP","common_server_ip":"192.168.40.158","common_client_ip":"192.168.56.27","common_server_port":5060,"common_client_port":32196,"common_stream_dir":3,"common_address_type":4,"common_address_list":"32196-5060-192.168.56.27-192.168.40.158","common_start_time":1663123136,"common_end_time":1663123136,"common_con_duration_ms":17043,"common_s2c_pkt_num":9,"common_s2c_byte_num":5375,"common_c2s_pkt_num":9,"common_c2s_byte_num":6563,"common_stream_trace_id":"869477723735079299","common_l4_protocol":"IPv4_UDP","common_sled_ip":"192.168.40.81","common_device_id":"21426003","common_device_tag":"{\"tags\":[{\"tag\":\"device_group\",\"value\":\"group-xxg-three\"},{\"tag\":\"data_center\",\"value\":\"center-xxg-three\"}]}","common_policy_id":0,"common_service":5,"common_action":0} \ No newline at end of file
diff --git a/src/test/testdata/oneSIPtwoRTP b/src/test/testdata/oneSIPtwoRTP
new file mode 100644
index 0000000..fea59fd
--- /dev/null
+++ b/src/test/testdata/oneSIPtwoRTP
@@ -0,0 +1,3 @@
+{"common_schema_type":"SIP","common_sessions":1,"sip_call_id":"NDNjYmJkMDJhNGJmYmUyN2EwZjdmYzNjMWE1YTRmYjc.","sip_originator_description":"\"test1\"<sip:[email protected]>","sip_responder_description":"\"test2\"<sip:[email protected]>","sip_user_agent":"eyeBeamrelease1011dstamp40820","sip_originator_sdp_content":"v=0\r\no=-52INIP4192.168.56.27\r\ns=CounterPatheyeBeam1.5\r\nc=INIP4192.168.56.27\r\nt=00\r\nm=audio20730RTP/AVP0818101\r\na=alt:11:UmXquXbwvg/CQxTA192.168.56.2720730\r\na=fmtp:18annexb=no\r\na=fmtp:1010-15\r\na=rtpmap:18G729/8000\r\na=rtpmap:101telephone-event/8000\r\na=sendrecv\r\na=x-rtp-session-id:F934D61F09C84591BB3ECEECE5993BB8\r\n","sip_originator_sdp_connect_ip":"192.168.56.27","sip_originator_sdp_media_port":20730,"sip_originator_sdp_media_type":"18G729/8000","sip_server":"OpenSIPS(2.4.9(x86_64/linux))","sip_responder_sdp_content":"v=0\r\no=-62INIP4192.168.36.136\r\ns=CounterPatheyeBeam1.5\r\nc=INIP4192.168.36.136\r\nt=00\r\nm=audio50733RTP/AVP0818101\r\na=alt:11:sXlLExDmMtTnircb192.168.36.13650733\r\na=fmtp:18annexb=no\r\na=fmtp:1010-15\r\na=rtpmap:18G729/8000\r\na=rtpmap:101telephone-event/8000\r\na=sendrecv\r\na=x-rtp-session-id:4342C45E30624AA085D078ABE24D2D33\r\n","sip_responder_sdp_connect_ip":"192.168.36.136","sip_responder_sdp_media_port":50733,"sip_responder_sdp_media_type":"18G729/8000","sip_duration_s":7,"sip_bye":"responder","common_protocol_label":"ETHERNET.IPv4.UDP","common_c2s_ipfrag_num":0,"common_s2c_ipfrag_num":0,"common_direction":69,"common_l7_protocol":"SIP","common_server_ip":"192.168.40.158","common_client_ip":"192.168.56.27","common_server_port":5060,"common_client_port":32196,"common_stream_dir":3,"common_address_type":4,"common_address_list":"32196-5060-192.168.56.27-192.168.40.158","common_start_time":1663123136,"common_end_time":1663123136,"common_con_duration_ms":17043,"common_s2c_pkt_num":9,"common_s2c_byte_num":5375,"common_c2s_pkt_num":9,"common_c2s_byte_num":6563,"common_stream_trace_id":"869477723735079299","common_l4_protocol":"IPv4_UDP","common_sled_ip":"192.168.40.81","common_device_id":"21426003","common_device_tag":"{\"tags\":[{\"tag\":\"device_group\",\"value\":\"group-xxg-three\"},{\"tag\":\"data_center\",\"value\":\"center-xxg-three\"}]}","common_policy_id":0,"common_service":5,"common_action":0}
+{"common_action":0,"common_address_list":"50733-20730-192.168.36.136-192.168.56.27","common_address_type":4,"common_app_behavior":"voice_call","common_app_id":{"UNKNOWN":[{"app_id":4,"app_name":"unknown","packet_sequence":22,"surrogate_id":0}]},"common_app_identify_info":[{"app_name":"unknown","packet_sequence":22}],"common_app_label":"unknown","common_c2s_byte_num":64860,"common_c2s_pkt_num":304,"common_client_ip":"192.168.36.136","common_client_port":50733,"common_con_duration_ms":6264,"common_device_id":"21426003","common_device_tag":"{\"tags\":[{\"tag\":\"device_group\",\"value\":\"group-xxg-three\"},{\"tag\":\"data_center\",\"value\":\"center-xxg-three\"}]}","common_direction":73,"common_end_time":1663123136,"common_l4_protocol":"IPv4_UDP","common_l7_protocol":"STUN.RTP","common_policy_id":0,"common_s2c_byte_num":66144,"common_s2c_pkt_num":310,"common_schema_type":"RTP","common_server_ip":"192.168.56.27","common_server_port":20730,"common_service":4,"common_sled_ip":"192.168.40.81","common_start_time":1663123136,"common_stream_dir":3,"common_stream_trace_id":"869477723735406984","rtp_pcap_path":"http://192.168.44.67:9098/hos/rtp_hos_bucket/192.168.56.27_192.168.36.136_20730_50733_123.pcap","raw_log_status":"CLOSE"}
+{"common_action":0,"common_address_list":"50733-20730-192.168.36.136-192.168.56.27","common_address_type":4,"common_app_behavior":"voice_call","common_app_id":{"UNKNOWN":[{"app_id":4,"app_name":"unknown","packet_sequence":22,"surrogate_id":0}]},"common_app_identify_info":[{"app_name":"unknown","packet_sequence":22}],"common_app_label":"unknown","common_c2s_byte_num":64860,"common_c2s_pkt_num":304,"common_client_ip":"192.168.36.136","common_client_port":50733,"common_con_duration_ms":6264,"common_device_id":"21426003","common_device_tag":"{\"tags\":[{\"tag\":\"device_group\",\"value\":\"group-xxg-three\"},{\"tag\":\"data_center\",\"value\":\"center-xxg-three\"}]}","common_direction":73,"common_end_time":1663123136,"common_l4_protocol":"IPv4_UDP","common_l7_protocol":"STUN.RTP","common_policy_id":0,"common_s2c_byte_num":66144,"common_s2c_pkt_num":310,"common_schema_type":"RTP","common_server_ip":"192.168.56.27","common_server_port":20730,"common_service":4,"common_sled_ip":"192.168.40.81","common_start_time":1663123136,"common_stream_dir":3,"common_stream_trace_id":"869477723735406984","rtp_pcap_path":"http://192.168.44.67:9098/hos/rtp_hos_bucket/192.168.56.27_192.168.36.136_20730_50733_456.pcap","raw_log_status":"CLOSE"} \ No newline at end of file
diff --git a/src/test/testdata/twoRTPandSIP b/src/test/testdata/twoRTPandSIP
new file mode 100644
index 0000000..7d05757
--- /dev/null
+++ b/src/test/testdata/twoRTPandSIP
@@ -0,0 +1,4 @@
+{"common_action":0,"common_address_list":"50735-20730-192.168.36.136-192.168.56.27","common_address_type":4,"common_app_behavior":"voice_call","common_app_id":{"UNKNOWN":[{"app_id":4,"app_name":"unknown","packet_sequence":22,"surrogate_id":0}]},"common_app_identify_info":[{"app_name":"unknown","packet_sequence":22}],"common_app_label":"unknown","common_c2s_byte_num":64860,"common_c2s_pkt_num":304,"common_client_ip":"192.168.36.136","common_client_port":50735,"common_con_duration_ms":6264,"common_device_id":"21426003","common_device_tag":"{\"tags\":[{\"tag\":\"device_group\",\"value\":\"group-xxg-three\"},{\"tag\":\"data_center\",\"value\":\"center-xxg-three\"}]}","common_direction":73,"common_end_time":1663123136,"common_l4_protocol":"IPv4_UDP","common_l7_protocol":"STUN.RTP","common_policy_id":0,"common_s2c_byte_num":66144,"common_s2c_pkt_num":310,"common_schema_type":"RTP","common_server_ip":"192.168.56.27","common_server_port":20730,"common_service":4,"common_sled_ip":"192.168.40.81","common_start_time":1663123136,"common_stream_dir":3,"common_stream_trace_id":"869477723735406984","rtp_pcap_path":"http://192.168.44.67:9098/hos/rtp_hos_bucket/192.168.56.27_192.168.36.136_20730_50735_123.pcap","raw_log_status":"CLOSE"}
+{"common_action":0,"common_address_list":"50735-20730-192.168.36.136-192.168.56.27","common_address_type":4,"common_app_behavior":"voice_call","common_app_id":{"UNKNOWN":[{"app_id":4,"app_name":"unknown","packet_sequence":22,"surrogate_id":0}]},"common_app_identify_info":[{"app_name":"unknown","packet_sequence":22}],"common_app_label":"unknown","common_c2s_byte_num":64860,"common_c2s_pkt_num":304,"common_client_ip":"192.168.36.136","common_client_port":50735,"common_con_duration_ms":6264,"common_device_id":"21426003","common_device_tag":"{\"tags\":[{\"tag\":\"device_group\",\"value\":\"group-xxg-three\"},{\"tag\":\"data_center\",\"value\":\"center-xxg-three\"}]}","common_direction":73,"common_end_time":1663123136,"common_l4_protocol":"IPv4_UDP","common_l7_protocol":"STUN.RTP","common_policy_id":0,"common_s2c_byte_num":66144,"common_s2c_pkt_num":310,"common_schema_type":"RTP","common_server_ip":"192.168.56.27","common_server_port":20730,"common_service":4,"common_sled_ip":"192.168.40.81","common_start_time":1663123136,"common_stream_dir":3,"common_stream_trace_id":"869477723735406984","rtp_pcap_path":"http://192.168.44.67:9098/hos/rtp_hos_bucket/192.168.56.27_192.168.36.136_20730_50735_123.pcap","raw_log_status":"CLOSE"}
+{"common_schema_type":"SIP","common_sessions":1,"sip_call_id":"NDNjYmJkMDJhNGJmYmUyN2EwZjdmYzNjMWE1YTRmYjc.","sip_originator_description":"\"test1\"<sip:[email protected]>","sip_responder_description":"\"test2\"<sip:[email protected]>","sip_user_agent":"eyeBeamrelease1011dstamp40820","sip_originator_sdp_content":"v=0\r\no=-52INIP4192.168.56.27\r\ns=CounterPatheyeBeam1.5\r\nc=INIP4192.168.56.27\r\nt=00\r\nm=audio20730RTP/AVP0818101\r\na=alt:11:UmXquXbwvg/CQxTA192.168.56.2720730\r\na=fmtp:18annexb=no\r\na=fmtp:1010-15\r\na=rtpmap:18G729/8000\r\na=rtpmap:101telephone-event/8000\r\na=sendrecv\r\na=x-rtp-session-id:F934D61F09C84591BB3ECEECE5993BB8\r\n","sip_originator_sdp_connect_ip":"192.168.56.27","sip_originator_sdp_media_port":20730,"sip_originator_sdp_media_type":"18G729/8000","sip_server":"OpenSIPS(2.4.9(x86_64/linux))","sip_responder_sdp_content":"v=0\r\no=-62INIP4192.168.36.136\r\ns=CounterPatheyeBeam1.5\r\nc=INIP4192.168.36.136\r\nt=00\r\nm=audio50735RTP/AVP0818101\r\na=alt:11:sXlLExDmMtTnircb192.168.36.13650735\r\na=fmtp:18annexb=no\r\na=fmtp:1010-15\r\na=rtpmap:18G729/8000\r\na=rtpmap:101telephone-event/8000\r\na=sendrecv\r\na=x-rtp-session-id:4342C45E30624AA085D078ABE24D2D33\r\n","sip_responder_sdp_connect_ip":"192.168.36.136","sip_responder_sdp_media_port":50735,"sip_responder_sdp_media_type":"18G729/8000","sip_duration_s":7,"sip_bye":"responder","common_protocol_label":"ETHERNET.IPv4.UDP","common_c2s_ipfrag_num":0,"common_s2c_ipfrag_num":0,"common_direction":69,"common_l7_protocol":"SIP","common_server_ip":"192.168.40.158","common_client_ip":"192.168.56.27","common_server_port":5060,"common_client_port":32196,"common_stream_dir":3,"common_address_type":4,"common_address_list":"32196-5060-192.168.56.27-192.168.40.158","common_start_time":1663123136,"common_end_time":1663123136,"common_con_duration_ms":17043,"common_s2c_pkt_num":9,"common_s2c_byte_num":5375,"common_c2s_pkt_num":9,"common_c2s_byte_num":6563,"common_stream_trace_id":"869477723735079299","common_l4_protocol":"IPv4_UDP","common_sled_ip":"192.168.40.81","common_device_id":"21426003","common_device_tag":"{\"tags\":[{\"tag\":\"device_group\",\"value\":\"group-xxg-three\"},{\"tag\":\"data_center\",\"value\":\"center-xxg-three\"}]}","common_policy_id":0,"common_service":5,"common_action":0}
+{"common_schema_type":"SIP","common_sessions":1,"sip_call_id":"NDNjYmJkMDJhNGJmYmUyN2EwZjdmYzNjMWE1YTRmYjc.","sip_originator_description":"\"test1\"<sip:[email protected]>","sip_responder_description":"\"test2\"<sip:[email protected]>","sip_user_agent":"eyeBeamrelease1011dstamp40820","sip_originator_sdp_content":"v=0\r\no=-52INIP4192.168.56.27\r\ns=CounterPatheyeBeam1.5\r\nc=INIP4192.168.56.27\r\nt=00\r\nm=audio20730RTP/AVP0818101\r\na=alt:11:UmXquXbwvg/CQxTA192.168.56.2720730\r\na=fmtp:18annexb=no\r\na=fmtp:1010-15\r\na=rtpmap:18G729/8000\r\na=rtpmap:101telephone-event/8000\r\na=sendrecv\r\na=x-rtp-session-id:F934D61F09C84591BB3ECEECE5993BB8\r\n","sip_originator_sdp_connect_ip":"192.168.56.27","sip_originator_sdp_media_port":20730,"sip_originator_sdp_media_type":"18G729/8000","sip_server":"OpenSIPS(2.4.9(x86_64/linux))","sip_responder_sdp_content":"v=0\r\no=-62INIP4192.168.36.136\r\ns=CounterPatheyeBeam1.5\r\nc=INIP4192.168.36.136\r\nt=00\r\nm=audio50735RTP/AVP0818101\r\na=alt:11:sXlLExDmMtTnircb192.168.36.13650735\r\na=fmtp:18annexb=no\r\na=fmtp:1010-15\r\na=rtpmap:18G729/8000\r\na=rtpmap:101telephone-event/8000\r\na=sendrecv\r\na=x-rtp-session-id:4342C45E30624AA085D078ABE24D2D33\r\n","sip_responder_sdp_connect_ip":"192.168.36.136","sip_responder_sdp_media_port":50735,"sip_responder_sdp_media_type":"18G729/8000","sip_duration_s":7,"sip_bye":"responder","common_protocol_label":"ETHERNET.IPv4.UDP","common_c2s_ipfrag_num":0,"common_s2c_ipfrag_num":0,"common_direction":69,"common_l7_protocol":"SIP","common_server_ip":"192.168.40.158","common_client_ip":"192.168.56.27","common_server_port":5060,"common_client_port":32196,"common_stream_dir":3,"common_address_type":4,"common_address_list":"32196-5060-192.168.56.27-192.168.40.158","common_start_time":1663123136,"common_end_time":1663123136,"common_con_duration_ms":17043,"common_s2c_pkt_num":9,"common_s2c_byte_num":5375,"common_c2s_pkt_num":9,"common_c2s_byte_num":6563,"common_stream_trace_id":"869477723735079299","common_l4_protocol":"IPv4_UDP","common_sled_ip":"192.168.40.81","common_device_id":"21426003","common_device_tag":"{\"tags\":[{\"tag\":\"device_group\",\"value\":\"group-xxg-three\"},{\"tag\":\"data_center\",\"value\":\"center-xxg-three\"}]}","common_policy_id":0,"common_service":5,"common_action":0} \ No newline at end of file