diff options
| author | qidaijie <[email protected]> | 2022-09-14 16:58:06 +0800 |
|---|---|---|
| committer | qidaijie <[email protected]> | 2022-09-14 16:58:06 +0800 |
| commit | 9d2f2cfd8965441277f9255bc26f6a8ad424ed99 (patch) | |
| tree | 2ee50652a7fef59b9190a38cb37fb0ddeb44f072 /src | |
| parent | 3df5d8c51ed11919d37c4e71c48b344344104dc4 (diff) | |
1:重构VOIP任务结构,使用多window的方式进行融合。
2:增加VSYS融合维度(TSG-11721)。
Diffstat (limited to 'src')
27 files changed, 571 insertions, 624 deletions
diff --git a/src/main/java/com/zdjizhi/common/JsonProConfig.java b/src/main/java/com/zdjizhi/common/JsonProConfig.java index 6e9d437..11de17c 100644 --- a/src/main/java/com/zdjizhi/common/JsonProConfig.java +++ b/src/main/java/com/zdjizhi/common/JsonProConfig.java @@ -1,8 +1,6 @@ package com.zdjizhi.common; -import com.zdjizhi.utils.system.VoipRelationConfigurations; - /** * @author Administrator */ @@ -13,123 +11,103 @@ public class JsonProConfig { public static final int DOUBLE = 3; /** - * + * SIP日志标识 */ public static final String SIP_MARK = "SIP"; /** - * + * RTP日志标识 */ public static final String RTP_MARK = "RTP"; + /** - * + * 所属vsys,缺省为1 + */ + public static final String VSYS_ID = "common_vsys_id"; + + /** + * 日志类型 */ public static final String SCHEMA_TYPE = "common_schema_type"; /** - * + * 会话结束时间 */ public static final String END_TIME = "common_end_time"; /** - * + * 流类型 + * 1:c2s,2:s2c;3;double */ public static final String STREAM_DIR = "common_stream_dir"; + /** - * - */ - public static final String SESSIONS = "common_sessions"; - /** - * - */ - public static final String C2S_PKT_NUM = "common_c2s_pkt_num"; - /** - * - */ - public static final String S2C_PKT_NUM = "common_s2c_pkt_num"; - /** - * - */ - public static final String C2S_BYTE_NUM = "common_c2s_byte_num"; - /** - * - */ - public static final String S2C_BYTE_NUM = "common_s2c_byte_num"; - /** - * - */ - public static final String C2S_IPFRAG_NUM = "common_c2s_ipfrag_num"; - /** - * - */ - public static final String S2C_IPFRAG_NUM = "common_s2c_ipfrag_num"; - /** - * - */ - public static final String C2S_TCP_LOSTLEN = "common_c2s_tcp_lostlen"; - /** - * - */ - public static final String S2C_TCP_LOSTLEN = "common_s2c_tcp_lostlen"; - /** - * - */ - public static final String C2S_TCP_UNORDER_NUM = "common_c2s_tcp_unorder_num"; - /** - * - */ - public static final String S2C_TCP_UNORDER_NUM = "common_s2c_tcp_unorder_num"; - /** - * + * 客户端ip地址 */ public static final String CLIENT_IP = "common_client_ip"; /** - * + * 客户端端口 */ public static final String CLIENT_PORT = "common_client_port"; /** - * + * 服务端ip地址 */ public static final String SERVER_IP = "common_server_ip"; /** - * + * 服务端端口 */ public static final String SERVER_PORT = "common_server_port"; /** - * + * 会话ID */ public static final String SIP_CALL_ID = "sip_call_id"; /** - * + * 协商的主叫语音传输IP */ public static final String SIP_ORIGINATOR_IP = "sip_originator_sdp_connect_ip"; /** - * + * 协商的主叫语音传输端口 */ public static final String SIP_ORIGINATOR_PORT = "sip_originator_sdp_media_port"; /** - * + * 协商的被叫语音传输IP */ public static final String SIP_RESPONDER_IP = "sip_responder_sdp_connect_ip"; /** - * + * 协商的被叫语音传输端口 */ public static final String SIP_RESPONDER_PORT = "sip_responder_sdp_media_port"; /** - * + * RTP原始包文件地址 */ public static final String RTP_PCAP_PATH = "rtp_pcap_path"; /** - * + * 主叫方向 */ public static final String RTP_ORIGINATOR_DIR = "rtp_originator_dir"; /** - * + * c2s编码方式序号(PT) */ public static final String RTP_PAYLOAD_TYPE_C2S = "rtp_payload_type_c2s"; /** - * + * s2c编码方式序号(PT) */ public static final String RTP_PAYLOAD_TYPE_S2C = "rtp_payload_type_s2c"; + /** + * 各类Transmission指标key + */ + public static final String SESSIONS = "common_sessions"; + public static final String C2S_PKT_NUM = "common_c2s_pkt_num"; + public static final String S2C_PKT_NUM = "common_s2c_pkt_num"; + public static final String C2S_BYTE_NUM = "common_c2s_byte_num"; + public static final String S2C_BYTE_NUM = "common_s2c_byte_num"; + public static final String C2S_IPFRAG_NUM = "common_c2s_ipfrag_num"; + public static final String S2C_IPFRAG_NUM = "common_s2c_ipfrag_num"; + public static final String C2S_TCP_LOSTLEN = "common_c2s_tcp_lostlen"; + public static final String S2C_TCP_LOSTLEN = "common_s2c_tcp_lostlen"; + public static final String C2S_TCP_UNORDER_NUM = "common_c2s_tcp_unorder_num"; + public static final String S2C_TCP_UNORDER_NUM = "common_s2c_tcp_unorder_num"; + + }
\ No newline at end of file diff --git a/src/main/java/com/zdjizhi/common/VoipRelationConfig.java b/src/main/java/com/zdjizhi/common/VoipRelationConfig.java index 61c50a1..758af0f 100644 --- a/src/main/java/com/zdjizhi/common/VoipRelationConfig.java +++ b/src/main/java/com/zdjizhi/common/VoipRelationConfig.java @@ -1,7 +1,7 @@ package com.zdjizhi.common; -import com.zdjizhi.utils.system.VoipRelationConfigurations; +import com.zdjizhi.tools.system.VoipRelationConfigurations; import org.jasypt.encryption.pbe.StandardPBEStringEncryptor; /** @@ -39,7 +39,8 @@ public class VoipRelationConfig { */ public static final Integer VOIP_CALIBRATION_WINDOW_TIME = VoipRelationConfigurations.getIntProperty(0, "voip.calibration.window.time"); public static final Integer ONE_SIDED_WINDOW_TIME = VoipRelationConfigurations.getIntProperty(0, "one.sided.window.time"); - public static final Integer WINDOW_PARALLELISM = VoipRelationConfigurations.getIntProperty(0, "window.parallelism"); + public static final Integer MERGE_UNIFLOW_WINDOW_PARALLELISM = VoipRelationConfigurations.getIntProperty(0, "merge.uniflow.window.parallelism"); + public static final Integer CALIBRATION_WINDOW_PARALLELISM = VoipRelationConfigurations.getIntProperty(0, "calibration.window.parallelism"); /** * connection kafka @@ -72,15 +73,11 @@ public class VoipRelationConfig { public static final Integer MAX_REQUEST_SIZE = VoipRelationConfigurations.getIntProperty(1, "max.request.size"); public static final String TOOLS_LIBRARY = VoipRelationConfigurations.getStringProperty(0, "tools.library"); - /** - * http - */ - public static final String SCHEMA_HTTP = VoipRelationConfigurations.getStringProperty(0, "schema.http"); /** * voip */ public static final Integer SEC_COMBINE_SR_CACHE_SECS = VoipRelationConfigurations.getIntProperty(0, "sec.combine.sr.cache.secs"); - public static final Integer CHECK_INNER_NETWORK = VoipRelationConfigurations.getIntProperty(1, "check.inner.network"); + public static final Integer CHECK_INNER_NETWORK = VoipRelationConfigurations.getIntProperty(0, "check.inner.network"); }
\ No newline at end of file diff --git a/src/main/java/com/zdjizhi/operator/filter/NullFilterFunction.java b/src/main/java/com/zdjizhi/operator/filter/NullFilterFunction.java new file mode 100644 index 0000000..8be77f8 --- /dev/null +++ b/src/main/java/com/zdjizhi/operator/filter/NullFilterFunction.java @@ -0,0 +1,17 @@ +package com.zdjizhi.operator.filter; + +import com.zdjizhi.utils.StringUtil; +import org.apache.flink.api.common.functions.FilterFunction; + +/** + * @author qidaijie + * @Package com.zdjizhi.operator.filter + * @Description: + * @date 2022/9/214:59 + */ +public class NullFilterFunction implements FilterFunction<String> { + @Override + public boolean filter(String message) throws Exception { + return StringUtil.isNotBlank(message); + } +} diff --git a/src/main/java/com/zdjizhi/operator/group/MergeUniFlowKeyByFunction.java b/src/main/java/com/zdjizhi/operator/group/MergeUniFlowKeyByFunction.java new file mode 100644 index 0000000..351bee7 --- /dev/null +++ b/src/main/java/com/zdjizhi/operator/group/MergeUniFlowKeyByFunction.java @@ -0,0 +1,22 @@ +package com.zdjizhi.operator.group; + +import cn.hutool.log.Log; +import cn.hutool.log.LogFactory; +import org.apache.flink.api.java.functions.KeySelector; +import org.apache.flink.api.java.tuple.Tuple3; + +import java.util.Map; + +/** + * @author qidaijie + * @Package com.zdjizhi.operator.group + * @Description: + * @date 2022/8/2911:08 + */ +public class MergeUniFlowKeyByFunction implements KeySelector<Tuple3<String, Map<String, Object>, Integer>, Integer> { + @Override + public Integer getKey(Tuple3<String, Map<String, Object>, Integer> value) throws Exception { + //vsys id + return value.f2; + } +} diff --git a/src/main/java/com/zdjizhi/operator/group/VoipCalibrationKeyByFunction.java b/src/main/java/com/zdjizhi/operator/group/VoipCalibrationKeyByFunction.java new file mode 100644 index 0000000..a184566 --- /dev/null +++ b/src/main/java/com/zdjizhi/operator/group/VoipCalibrationKeyByFunction.java @@ -0,0 +1,27 @@ +package com.zdjizhi.operator.group; + +import cn.hutool.log.Log; +import cn.hutool.log.LogFactory; +import com.jayway.jsonpath.DocumentContext; +import com.jayway.jsonpath.JsonPath; +import org.apache.flink.api.java.functions.KeySelector; +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.api.java.tuple.Tuple4; + +import java.util.Map; + +/** + * @author qidaijie + * @Package com.zdjizhi.operator.group + * @Description: + * @date 2022/8/2911:08 + */ +public class VoipCalibrationKeyByFunction implements KeySelector<Tuple4<String, String, Map<String, Object>, Integer>, Tuple2<String, Integer>> { + @Override + public Tuple2<String, Integer> getKey(Tuple4<String, String, Map<String, Object>, Integer> value) throws Exception { + String fourKey = value.f0; + Integer vsysId = value.f3; + + return new Tuple2<>(fourKey, vsysId); + } +} diff --git a/src/main/java/com/zdjizhi/operator/parse/ParseMapFunction.java b/src/main/java/com/zdjizhi/operator/parse/ParseMapFunction.java new file mode 100644 index 0000000..3d7ca4b --- /dev/null +++ b/src/main/java/com/zdjizhi/operator/parse/ParseMapFunction.java @@ -0,0 +1,82 @@ +package com.zdjizhi.operator.parse; + +import cn.hutool.log.Log; +import cn.hutool.log.LogFactory; +import com.zdjizhi.common.JsonProConfig; +import com.zdjizhi.tools.utils.RelationUtils; +import com.zdjizhi.tools.json.JsonParseUtil; +import com.zdjizhi.utils.JsonMapper; +import com.zdjizhi.utils.StringUtil; +import org.apache.flink.api.common.functions.MapFunction; +import org.apache.flink.api.java.tuple.Tuple3; + +import java.util.Map; + +/** + * @author qidaijie + * @Package com.zdjizhi.operator.parse + * @Description: + * @date 2022/9/916:21 + */ +public class ParseMapFunction implements MapFunction<String, Tuple3<String, Map<String, Object>, Integer>> { + private static final Log logger = LogFactory.get(); + + @Override + public Tuple3<String, Map<String, Object>, Integer> map(String input) throws Exception { + try { + if (StringUtil.isNotBlank(input)) { + Map<String, Object> jsonMap = JsonParseUtil.typeTransform((Map<String, Object>) JsonMapper.fromJsonString(input, Map.class)); + + String commonSchemaType = JsonParseUtil.getString(jsonMap, JsonProConfig.SCHEMA_TYPE); + Integer vsysId = getVsysId(jsonMap); + String sipCallId = JsonParseUtil.getString(jsonMap, JsonProConfig.SIP_CALL_ID); + + //1:c2s,2:s2c;3;double + int commonStreamDir = JsonParseUtil.getInteger(jsonMap, JsonProConfig.STREAM_DIR); + + if (JsonProConfig.SIP_MARK.equals(commonSchemaType)) { + if (StringUtil.isNotBlank(sipCallId)) { + if (RelationUtils.checkSipCompleteness(jsonMap)) { + if (commonStreamDir != JsonProConfig.DOUBLE) { + return new Tuple3<>("sip-single", jsonMap, vsysId); + } else { + return new Tuple3<>("sip-double", jsonMap, vsysId); + } + } else { + return new Tuple3<>("violation", jsonMap, vsysId); + } + } else { + return new Tuple3<>("violation", jsonMap, vsysId); + } + } else if (JsonProConfig.RTP_MARK.equals(commonSchemaType)) { + if (commonStreamDir == JsonProConfig.DOUBLE) { + return new Tuple3<>("rtp-double", jsonMap, vsysId); + } else { + return new Tuple3<>("rtp-single", jsonMap, vsysId); + } + } + } + } catch (RuntimeException e) { + logger.error("TransForm logs failed,The exception is :" + e.getMessage()); + } + return null; + } + + /** + * 获取VSYS ID若无该字段,则默认值为1 + * + * @param jsonMap 原始日志 + * @return vsysid + */ + private static int getVsysId(Map<String, Object> jsonMap) { + Object value = jsonMap.getOrDefault(JsonProConfig.VSYS_ID, null); + if (value != null) { + return Integer.parseInt(value.toString()); + } else { + return 1; + } + + } + + +} diff --git a/src/main/java/com/zdjizhi/utils/functions/OneSidedWindowFunction.java b/src/main/java/com/zdjizhi/operator/window/MergeUniFlowWindowFunction.java index 0e244c3..cf25763 100644 --- a/src/main/java/com/zdjizhi/utils/functions/OneSidedWindowFunction.java +++ b/src/main/java/com/zdjizhi/operator/window/MergeUniFlowWindowFunction.java @@ -1,4 +1,4 @@ -package com.zdjizhi.utils.functions; +package com.zdjizhi.operator.window; import cn.hutool.log.Log; import cn.hutool.log.LogFactory; @@ -7,167 +7,162 @@ import com.zdjizhi.common.VoipRelationConfig; import com.zdjizhi.utils.IPUtil; import com.zdjizhi.utils.JsonMapper; import com.zdjizhi.utils.StringUtil; -import com.zdjizhi.utils.json.JsonParseUtil; +import com.zdjizhi.tools.utils.RelationUtils; +import com.zdjizhi.tools.json.JsonParseUtil; import org.apache.flink.api.java.tuple.Tuple3; -import org.apache.flink.streaming.api.functions.windowing.ProcessAllWindowFunction; +import org.apache.flink.api.java.tuple.Tuple4; +import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction; import org.apache.flink.streaming.api.windowing.windows.TimeWindow; import org.apache.flink.util.Collector; +import scala.Int; import java.util.HashMap; import java.util.Map; /** * @author qidaijie - * @Package com.zdjizhi.utils.functions + * @Package com.zdjizhi.operator * @Description: - * @date 2021/8/1818:04 + * @date 2022/8/2911:44 */ -public class OneSidedWindowFunction extends ProcessAllWindowFunction<String, Tuple3<String, String, String>, TimeWindow> { +public class MergeUniFlowWindowFunction extends ProcessWindowFunction<Tuple3<String, Map<String, Object>, Integer>, Tuple4<String, String, Map<String, Object>, Integer>, Integer, TimeWindow> { private static final Log logger = LogFactory.get(); /** * key-sip_call_id;value为sip的具体数据---存放的是SIP未关联的数据(单向流) */ - private static HashMap<String, String> sipOriHmList = new HashMap<>(32); + private static HashMap<String, Map<String, Object>> sipOriHmList = new HashMap<>(32); /** * key-rtp拼接的四元组;value为rtp的具体数据---存放的是RTP未关联的数据(单向流) */ - private static HashMap<String, String> rtpOriHmList = new HashMap<>(32); + private static HashMap<String, Map<String, Object>> rtpOriHmList = new HashMap<>(32); - @Override - @SuppressWarnings("unchecked") - public void process(Context context, Iterable<String> inputs, Collector<Tuple3<String, String, String>> out) { - for (String input : inputs) { - if (StringUtil.isNotBlank(input)) { - try { - Map<String, Object> object = JsonParseUtil.typeTransform((Map<String, Object>) JsonMapper.fromJsonString(input, Map.class)); - String commonSchemaType = JsonParseUtil.getString(object, JsonProConfig.SCHEMA_TYPE); - String sipCallId = JsonParseUtil.getString(object, JsonProConfig.SIP_CALL_ID); + @Override + public void process(Integer key, Context context, Iterable<Tuple3<String, Map<String, Object>, Integer>> inputs, Collector<Tuple4<String, String, Map<String, Object>, Integer>> out) throws Exception { + for (Tuple3<String, Map<String, Object>, Integer> input : inputs) { + if (input != null) { + //已关联的sip,rtp;未关联的sip,rtp;内网的sip + String type = input.f0; + Map<String, Object> jsonMap = input.f1; + Integer vsysId = input.f2; - //1:c2s,2:s2c;3;double - int commonStreamDir = JsonParseUtil.getInteger(object, JsonProConfig.STREAM_DIR); + logger.error("" + type + "----" + jsonMap.toString()); + switch (type) { + case "sip-double": + separateInnerIp(jsonMap, out, vsysId); + break; + case "rtp-double": + String rtpDouble4Key = getFourKey(JsonParseUtil.getString(jsonMap, JsonProConfig.CLIENT_IP), + JsonParseUtil.getInteger(jsonMap, JsonProConfig.CLIENT_PORT), + JsonParseUtil.getString(jsonMap, JsonProConfig.SERVER_IP), + JsonParseUtil.getInteger(jsonMap, JsonProConfig.SERVER_PORT)); - /* - * 针对SIP日志进行处理 - */ - if (JsonProConfig.SIP_MARK.equals(commonSchemaType) && StringUtil.isNotBlank(sipCallId)) { - if (relationUtils.checkSipCompleteness(object)) { - if (commonStreamDir != JsonProConfig.DOUBLE) { - putKeyAndMsg(input, sipCallId, sipOriHmList, "SIP", out); - } else { - separateInnerIp(object, out); - } + out.collect(new Tuple4<>(rtpDouble4Key, type, jsonMap, vsysId)); + break; + case "violation": + out.collect(new Tuple4<>("", type, jsonMap, vsysId)); + break; + //单向流的SIP + case "sip-single": + String sipCallId = JsonParseUtil.getString(jsonMap, JsonProConfig.SIP_CALL_ID); + String sipSingle4Key = getFourKey(JsonParseUtil.getString(jsonMap, JsonProConfig.CLIENT_IP), + JsonParseUtil.getInteger(jsonMap, JsonProConfig.CLIENT_PORT), + JsonParseUtil.getString(jsonMap, JsonProConfig.SERVER_IP), + JsonParseUtil.getInteger(jsonMap, JsonProConfig.SERVER_PORT)); + if (StringUtil.isNotBlank(sipCallId)) { + putKeyAndMsg(jsonMap, sipCallId + sipSingle4Key, sipOriHmList, "SIP", vsysId, out); } else { - out.collect(new Tuple3<>("", "violation", input)); + out.collect(new Tuple4<>("", type, jsonMap, vsysId)); } - } - - /* - * 针对RTP日志进行处理 - */ - if (JsonProConfig.RTP_MARK.equals(commonSchemaType)) { - String clientIP = JsonParseUtil.getString(object, JsonProConfig.CLIENT_IP); - int clientPort = JsonParseUtil.getInteger(object, JsonProConfig.CLIENT_PORT); - String ServerIP = JsonParseUtil.getString(object, JsonProConfig.SERVER_IP); - int ServerPort = JsonParseUtil.getInteger(object, JsonProConfig.SERVER_PORT); + break; + //单向流的RTP + case "rtp-single": + String rtpSingle4Key = getFourKey(JsonParseUtil.getString(jsonMap, JsonProConfig.CLIENT_IP), + JsonParseUtil.getInteger(jsonMap, JsonProConfig.CLIENT_PORT), + JsonParseUtil.getString(jsonMap, JsonProConfig.SERVER_IP), + JsonParseUtil.getInteger(jsonMap, JsonProConfig.SERVER_PORT)); + //对rtp单向流进行关联 + putKeyAndMsg(jsonMap, rtpSingle4Key, rtpOriHmList, "RTP", vsysId, out); + break; + //内网的SIP + default: + logger.error("type is beyond expectation:" + type); + break; - String rtpIpPort4Key = getFourKey(clientIP, clientPort, ServerIP, ServerPort); - - if (commonStreamDir != JsonProConfig.DOUBLE) { - //对rtp单向流进行关联 - putKeyAndMsg(input, rtpIpPort4Key, rtpOriHmList, "RTP", out); - - } else { - //RTP双向流,按四元组下发 - out.collect(new Tuple3<>(rtpIpPort4Key, "rtp-two", input)); - } - } - } catch (RuntimeException e) { - logger.error("parsing JSON or Unidirectional data flow fusion has exception! error is :" + e); } } } - /* - * 定时发送SIP未关联上数据 - */ + if (sipOriHmList.size() > 0) { - HashMap<String, String> tmpSipOriHmList = new HashMap<>(sipOriHmList); + HashMap<String, Map<String, Object>> tmpSipOriHmList = new HashMap<>(sipOriHmList); sipOriHmList.clear(); for (String sipKey : tmpSipOriHmList.keySet()) { - String sipSingleMsg = tmpSipOriHmList.get(sipKey); + Map<String, Object> sipSingleMsg = tmpSipOriHmList.get(sipKey); //sipKey为sip_call_id,未关联成功的sip是不能使用的 - out.collect(new Tuple3<>(sipKey, "sip-single", sipSingleMsg)); + out.collect(new Tuple4<>(sipKey, "sip-single", sipSingleMsg, 0)); } } - /* - * 定时发送RTP未关联上数据 - */ + if (rtpOriHmList.size() > 0) { - HashMap<String, String> tmpRtpOriHmList = new HashMap<>(rtpOriHmList); + HashMap<String, Map<String, Object>> tmpRtpOriHmList = new HashMap<>(rtpOriHmList); rtpOriHmList.clear(); for (String rtpKey : tmpRtpOriHmList.keySet()) { - String rtpSingleMsg = tmpRtpOriHmList.get(rtpKey); + Map<String, Object> rtpSingleMsg = tmpRtpOriHmList.get(rtpKey); //未关联成功的rtp还可以继续关联,因为有四元组 - out.collect(new Tuple3<>(rtpKey, "rtp-single", rtpSingleMsg)); + out.collect(new Tuple4<>(rtpKey, "rtp-single", rtpSingleMsg, 0)); } } + } /** * 存放key并关联拼接对应Key */ @SuppressWarnings("unchecked") - private static void putKeyAndMsg(String message, String hmStrKey, HashMap<String, String> hashMapStr, String protocolType, Collector<Tuple3<String, String, String>> out) { + private static void putKeyAndMsg(Map<String, Object> secondSipOrRtpLog, String hmStrKey, HashMap<String, Map<String, Object>> hashMapStr, String protocolType, Integer vsysId, Collector<Tuple4<String, String, Map<String, Object>, Integer>> out) { //和上次存入的数据关联 if (hashMapStr.containsKey(hmStrKey)) { - HashMap<String, Object> jsonCommonMap = new HashMap<>(32); - String[] strArr = new String[2]; - String firstMsg = hashMapStr.remove(hmStrKey); - Map<String, Object> firstSipOrRtpLog = (Map<String, Object>) JsonMapper.fromJsonString(firstMsg, Map.class); - Map<String, Object> secondSipOrRtpLog = (Map<String, Object>) JsonMapper.fromJsonString(message, Map.class); + HashMap<String, Object> jsonCommonMap = new HashMap<>(128); + Map<String, Object> firstSipOrRtpLog = hashMapStr.remove(hmStrKey); //1:c2s,2:s2c;3;double,1表示firstMsg为请求侧(c2s),合并时以它为准 if (JsonParseUtil.getInteger(firstSipOrRtpLog, JsonProConfig.STREAM_DIR) == 1) { - strArr[0] = message; - strArr[1] = firstMsg; + jsonCommonMap.putAll(secondSipOrRtpLog); + jsonCommonMap.putAll(firstSipOrRtpLog); } else { - strArr[0] = firstMsg; - strArr[1] = message; + jsonCommonMap.putAll(firstSipOrRtpLog); + jsonCommonMap.putAll(secondSipOrRtpLog); } - jsonCommonMap.putAll((Map<String, Object>) JsonMapper.fromJsonString(strArr[0], Map.class)); - jsonCommonMap.putAll((Map<String, Object>) JsonMapper.fromJsonString(strArr[1], Map.class)); - String sipTwoMsg = jsonCommonMap.toString(); + accumulateMsg(firstSipOrRtpLog, secondSipOrRtpLog, jsonCommonMap); + jsonCommonMap.put(JsonProConfig.STREAM_DIR, JsonProConfig.DOUBLE); - Map<String, Object> sipOrRtpCombin = (Map<String, Object>) JsonMapper.fromJsonString(sipTwoMsg, Map.class); - accumulateMsg(firstSipOrRtpLog, secondSipOrRtpLog, sipOrRtpCombin); - sipOrRtpCombin.put(JsonProConfig.STREAM_DIR, JsonProConfig.DOUBLE); if (JsonProConfig.SIP_MARK.equals(protocolType)) { //手动关联SIP后区分内外网IP再下发 - separateInnerIp(sipOrRtpCombin, out); + separateInnerIp(jsonCommonMap, out, vsysId); } else if (JsonProConfig.RTP_MARK.equals(protocolType)) { //手动关联RTP后按四元组下发 - sipOrRtpCombin.put(JsonProConfig.RTP_PCAP_PATH, setRtpPacpPath(firstSipOrRtpLog, secondSipOrRtpLog)); - out.collect(new Tuple3<>(hmStrKey, "rtp-two", JsonMapper.toJsonString(sipOrRtpCombin))); + jsonCommonMap.put(JsonProConfig.RTP_PCAP_PATH, setRtpPacpPath(firstSipOrRtpLog, secondSipOrRtpLog)); + out.collect(new Tuple4<>(hmStrKey, "rtp-double", jsonCommonMap, vsysId)); } } else { - hashMapStr.put(hmStrKey, message); + hashMapStr.put(hmStrKey, secondSipOrRtpLog); } } /** * 区分SIP的内外网IP,此时已经关联完成包含四元组,但未区分内外网IP */ - private static void separateInnerIp(Map<String, Object> object, Collector<Tuple3<String, String, String>> out) { + private static void separateInnerIp(Map<String, Object> jsonMap, Collector<Tuple4<String, String, Map<String, Object>, Integer>> out, Integer vsysid) { - String sipOriginatorIp = JsonParseUtil.getString(object, JsonProConfig.SIP_ORIGINATOR_IP); - int sipOriginatorPort = JsonParseUtil.getInteger(object, JsonProConfig.SIP_ORIGINATOR_PORT); - String sipResponderIp = JsonParseUtil.getString(object, JsonProConfig.SIP_RESPONDER_IP); - int sipResponderPort = JsonParseUtil.getInteger(object, JsonProConfig.SIP_RESPONDER_PORT); + String sipOriginatorIp = JsonParseUtil.getString(jsonMap, JsonProConfig.SIP_ORIGINATOR_IP); + int sipOriginatorPort = JsonParseUtil.getInteger(jsonMap, JsonProConfig.SIP_ORIGINATOR_PORT); + String sipResponderIp = JsonParseUtil.getString(jsonMap, JsonProConfig.SIP_RESPONDER_IP); + int sipResponderPort = JsonParseUtil.getInteger(jsonMap, JsonProConfig.SIP_RESPONDER_PORT); - if (relationUtils.isInnerIp(sipOriginatorIp) || relationUtils.isInnerIp(sipResponderIp)) { + if (RelationUtils.isInnerIp(sipOriginatorIp) || RelationUtils.isInnerIp(sipResponderIp)) { /* * 按from-ip_from-port_to-ip_to-port */ @@ -176,7 +171,7 @@ public class OneSidedWindowFunction extends ProcessAllWindowFunction<String, Tup + sipResponderIp + VoipRelationConfig.CORRELATION_STR + sipResponderPort; //包含内网IP的SIP关联后数据 - out.collect(new Tuple3<>(sipInnerEmitKey, "sip-in", JsonMapper.toJsonString(object))); + out.collect(new Tuple4<>(sipInnerEmitKey, "sip-in", jsonMap, vsysid)); } else { String sipIpPort4Key = getFourKey(sipOriginatorIp, sipOriginatorPort, @@ -184,7 +179,7 @@ public class OneSidedWindowFunction extends ProcessAllWindowFunction<String, Tup sipResponderPort); //按照四元组的Key发送到下一个bolt - out.collect(new Tuple3<>(sipIpPort4Key, "sip-two", JsonMapper.toJsonString(object))); + out.collect(new Tuple4<>(sipIpPort4Key, "sip-double", jsonMap, vsysid)); } } @@ -199,7 +194,7 @@ public class OneSidedWindowFunction extends ProcessAllWindowFunction<String, Tup */ private static String getFourKey(String commonClientIp, int commonClientPort, String commonServerIp, int commonServerPort) { String ipPort4Key = ""; - int comparePortResult = compareNum(commonClientPort, commonServerPort); + int comparePortResult = RelationUtils.compareNum(commonClientPort, commonServerPort); /* * 按端口比较 @@ -247,7 +242,7 @@ public class OneSidedWindowFunction extends ProcessAllWindowFunction<String, Tup private static String compareQuadruple(String clientIp, String serverIp, int clientPort, int serverPort) { long clientIpNum = IPUtil.getIpHostDesimal(clientIp); long serverIpNum = IPUtil.getIpHostDesimal(serverIp); - int compareIpResult = compareNum(clientIpNum, serverIpNum); + int compareIpResult = RelationUtils.compareNum(clientIpNum, serverIpNum); switch (compareIpResult) { //clientIpNum > serverIpNum case 1: @@ -284,57 +279,27 @@ public class OneSidedWindowFunction extends ProcessAllWindowFunction<String, Tup */ private static void accumulateMsg(Map<String, Object> firstSipOrRtpLog, Map<String, Object> secondSipOrRtpLog, Map<String, Object> sipOrRtpCombin) { //common_sessions - relationUtils.metricSumSetOtherLog(firstSipOrRtpLog, secondSipOrRtpLog, sipOrRtpCombin, JsonProConfig.SESSIONS); + RelationUtils.metricSumSetOtherLog(firstSipOrRtpLog, secondSipOrRtpLog, sipOrRtpCombin, JsonProConfig.SESSIONS); //common_c2s_pkt_num - relationUtils.metricSumSetOtherLog(firstSipOrRtpLog, secondSipOrRtpLog, sipOrRtpCombin, JsonProConfig.C2S_PKT_NUM); + RelationUtils.metricSumSetOtherLog(firstSipOrRtpLog, secondSipOrRtpLog, sipOrRtpCombin, JsonProConfig.C2S_PKT_NUM); //common_s2c_pkt_num - relationUtils.metricSumSetOtherLog(firstSipOrRtpLog, secondSipOrRtpLog, sipOrRtpCombin, JsonProConfig.S2C_PKT_NUM); + RelationUtils.metricSumSetOtherLog(firstSipOrRtpLog, secondSipOrRtpLog, sipOrRtpCombin, JsonProConfig.S2C_PKT_NUM); //common_c2s_byte_num - relationUtils.metricSumSetOtherLog(firstSipOrRtpLog, secondSipOrRtpLog, sipOrRtpCombin, JsonProConfig.C2S_BYTE_NUM); + RelationUtils.metricSumSetOtherLog(firstSipOrRtpLog, secondSipOrRtpLog, sipOrRtpCombin, JsonProConfig.C2S_BYTE_NUM); //common_s2c_byte_num - relationUtils.metricSumSetOtherLog(firstSipOrRtpLog, secondSipOrRtpLog, sipOrRtpCombin, JsonProConfig.S2C_BYTE_NUM); + RelationUtils.metricSumSetOtherLog(firstSipOrRtpLog, secondSipOrRtpLog, sipOrRtpCombin, JsonProConfig.S2C_BYTE_NUM); //common_c2s_ipfrag_num - relationUtils.metricSumSetOtherLog(firstSipOrRtpLog, secondSipOrRtpLog, sipOrRtpCombin, JsonProConfig.C2S_IPFRAG_NUM); + RelationUtils.metricSumSetOtherLog(firstSipOrRtpLog, secondSipOrRtpLog, sipOrRtpCombin, JsonProConfig.C2S_IPFRAG_NUM); //common_s2c_ipfrag_num - relationUtils.metricSumSetOtherLog(firstSipOrRtpLog, secondSipOrRtpLog, sipOrRtpCombin, JsonProConfig.S2C_IPFRAG_NUM); + RelationUtils.metricSumSetOtherLog(firstSipOrRtpLog, secondSipOrRtpLog, sipOrRtpCombin, JsonProConfig.S2C_IPFRAG_NUM); //common_c2s_tcp_lostlen - relationUtils.metricSumSetOtherLog(firstSipOrRtpLog, secondSipOrRtpLog, sipOrRtpCombin, JsonProConfig.C2S_TCP_LOSTLEN); + RelationUtils.metricSumSetOtherLog(firstSipOrRtpLog, secondSipOrRtpLog, sipOrRtpCombin, JsonProConfig.C2S_TCP_LOSTLEN); //common_s2c_tcp_lostlen - relationUtils.metricSumSetOtherLog(firstSipOrRtpLog, secondSipOrRtpLog, sipOrRtpCombin, JsonProConfig.S2C_TCP_LOSTLEN); + RelationUtils.metricSumSetOtherLog(firstSipOrRtpLog, secondSipOrRtpLog, sipOrRtpCombin, JsonProConfig.S2C_TCP_LOSTLEN); //common_c2s_tcp_unorder_num - relationUtils.metricSumSetOtherLog(firstSipOrRtpLog, secondSipOrRtpLog, sipOrRtpCombin, JsonProConfig.C2S_TCP_UNORDER_NUM); + RelationUtils.metricSumSetOtherLog(firstSipOrRtpLog, secondSipOrRtpLog, sipOrRtpCombin, JsonProConfig.C2S_TCP_UNORDER_NUM); //common_s2c_tcp_unorder_num - relationUtils.metricSumSetOtherLog(firstSipOrRtpLog, secondSipOrRtpLog, sipOrRtpCombin, JsonProConfig.S2C_TCP_UNORDER_NUM); - } - - /** - * int类型 - * 比较数字大小,左边>右边-返回:1,左边<右边-返回:-1,左边=右边-返回:0 - * - * @param numOne 数值1 - * @param numTwo 数值2 - */ - private static int compareNum(int numOne, int numTwo) { - if (numOne > 0 && numTwo > 0) { - return Integer.compare(numOne, numTwo); - } else { - return -2; - } - } - - /** - * long类型 - * 比较数字大小,左边>右边-返回:1,左边<右边-返回:-1,左边=右边-返回:0 - * - * @param numOne 数值1 - * @param numTwo 数值2 - */ - private static int compareNum(long numOne, long numTwo) { - if (numOne > 0 && numTwo > 0) { - return Long.compare(numOne, numTwo); - } else { - return -2; - } + RelationUtils.metricSumSetOtherLog(firstSipOrRtpLog, secondSipOrRtpLog, sipOrRtpCombin, JsonProConfig.S2C_TCP_UNORDER_NUM); } /** diff --git a/src/main/java/com/zdjizhi/utils/functions/SipCalibrationWindowFunction.java b/src/main/java/com/zdjizhi/operator/window/VoipCalibrationWindowFunction.java index 7180419..9929212 100644 --- a/src/main/java/com/zdjizhi/utils/functions/SipCalibrationWindowFunction.java +++ b/src/main/java/com/zdjizhi/operator/window/VoipCalibrationWindowFunction.java @@ -1,14 +1,16 @@ -package com.zdjizhi.utils.functions; +package com.zdjizhi.operator.window; import cn.hutool.log.Log; import cn.hutool.log.LogFactory; import com.zdjizhi.common.JsonProConfig; import com.zdjizhi.common.VoipRelationConfig; +import com.zdjizhi.tools.utils.RelationUtils; +import com.zdjizhi.tools.json.JsonParseUtil; import com.zdjizhi.utils.JsonMapper; import com.zdjizhi.utils.StringUtil; -import com.zdjizhi.utils.json.JsonParseUtil; -import org.apache.flink.api.java.tuple.Tuple3; -import org.apache.flink.streaming.api.functions.windowing.ProcessAllWindowFunction; +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.api.java.tuple.Tuple4; +import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction; import org.apache.flink.streaming.api.windowing.windows.TimeWindow; import org.apache.flink.util.Collector; @@ -16,11 +18,11 @@ import java.util.*; /** * @author qidaijie - * @Package com.zdjizhi.utils.functions + * @Package com.zdjizhi.operator.window * @Description: - * @date 2021/7/2113:55 + * @date 2022/9/617:46 */ -public class SipCalibrationWindowFunction extends ProcessAllWindowFunction<Tuple3<String, String, String>, String, TimeWindow> { +public class VoipCalibrationWindowFunction extends ProcessWindowFunction<Tuple4<String, String, Map<String, Object>, Integer>, String, Tuple2<String, Integer>, TimeWindow> { private static final Log logger = LogFactory.get(); /** @@ -30,7 +32,7 @@ public class SipCalibrationWindowFunction extends ProcessAllWindowFunction<Tuple * 存放数据:rtp-single,rtp-two,sip-two * 不存放的数据:sip-single与sip-in */ - private static HashMap<String, LinkedList<String>> combineSRHmList = new HashMap<>(16); + private static HashMap<String, LinkedList<Map<String, Object>>> combineSRHmList = new HashMap<>(32); /** * 二次关联用HashMap @@ -39,31 +41,33 @@ public class SipCalibrationWindowFunction extends ProcessAllWindowFunction<Tuple * 存放数据:rtp-single,rtp-two,sip-two * 不存放的数据:sip-single与sip-in */ - private static HashMap<String, LinkedList<String>> secCombineSRHmList = new HashMap<>(16); + private static HashMap<String, LinkedList<Map<String, Object>>> secCombineSRHmList = new HashMap<>(32); @Override - public void process(Context context, Iterable<Tuple3<String, String, String>> input, Collector<String> output) throws Exception { - for (Tuple3<String, String, String> tuple : input) { + public void process(Tuple2<String, Integer> key, Context context, Iterable<Tuple4<String, String, Map<String, Object>, Integer>> input, Collector<String> out) throws Exception { + for (Tuple4<String, String, Map<String, Object>, Integer> tuple : input) { //拼接的四元组 String fourKey = tuple.f0; //已关联的sip,rtp;未关联的sip,rtp;内网的sip String type = tuple.f1; - String msg = tuple.f2; + Map<String, Object> jsonMap = tuple.f2; + logger.error("" + fourKey + "----" + type); switch (type) { //单向流对准后的SIP - case "sip-two": + case "sip-double": //单向流对准后的RTP - case "rtp-two": - //对不上的RTP - case "rtp-single": - putKeyAndMsg(msg, fourKey, combineSRHmList); + case "rtp-double": + putKeyAndMsg(jsonMap, fourKey, combineSRHmList); break; //单向流的SIP case "sip-single": + //单向流的RTP + case "rtp-single": //内网的SIP case "sip-in": + //违规的日志 case "violation": - output.collect(msg); + out.collect(JsonMapper.toJsonString(jsonMap)); break; default: logger.error("type is beyond expectation:" + type); @@ -72,82 +76,91 @@ public class SipCalibrationWindowFunction extends ProcessAllWindowFunction<Tuple } } //初次关联 - tickCombineHmList(combineSRHmList, output); + tickCombineHmList(combineSRHmList, out); //和缓存中的数据二次关联 - tickCombineHmList(secCombineSRHmList, output); + tickCombineHmList(secCombineSRHmList, out); + } /** + * 存放key并添加对应List + */ + private static void putKeyAndMsg(Map<String, Object> message, String fourStrKey, HashMap<String, LinkedList<Map<String, Object>>> combineSRHmList) { + if (combineSRHmList.containsKey(fourStrKey)) { + LinkedList<Map<String, Object>> tmpList = combineSRHmList.get(fourStrKey); + tmpList.add(message); + combineSRHmList.put(fourStrKey, tmpList); + } else { + LinkedList<Map<String, Object>> tmpList = new LinkedList<>(); + tmpList.add(message); + combineSRHmList.put(fourStrKey, tmpList); + } + } + + /** * 定时关联,包括初次关联以及后续二次关联 * * @param combineHmList */ @SuppressWarnings("unchecked") - private void tickCombineHmList(HashMap<String, LinkedList<String>> combineHmList, Collector<String> output) { + private void tickCombineHmList(HashMap<String, LinkedList<Map<String, Object>>> combineHmList, Collector<String> output) { if (combineHmList.size() > 0) { long nowTime = System.currentTimeMillis() / 1000; - HashMap<String, LinkedList<String>> tempCombineSRhmList = new HashMap<>(combineHmList); + HashMap<String, LinkedList<Map<String, Object>>> tempCombineSRhmList = new HashMap<>(combineHmList); combineHmList.clear(); for (String fourStrKey : tempCombineSRhmList.keySet()) { - LinkedList<String> tempList = tempCombineSRhmList.get(fourStrKey); - //包含SIP和RTP - int listSize = tempList.size(); + LinkedList<Map<String, Object>> tempCombineSRList = tempCombineSRhmList.get(fourStrKey); + //包含SIP和RTP,集合大于1则可能是sip和rtp,多条sip或多条rtp + int listSize = tempCombineSRList.size(); if (listSize > 1) { - List<String> sipBeanArr = new ArrayList<>(); - List<String> rtpBeanArr = new ArrayList<>(); + List<Map<String, Object>> sipBeanArr = new ArrayList<>(); + List<Map<String, Object>> rtpBeanArr = new ArrayList<>(); - for (String message : tempList) { - Map<String, Object> tempSipOrRtpLog = (Map<String, Object>) JsonMapper.fromJsonString(message, Map.class); + for (Map<String, Object> tempSipOrRtpLog : tempCombineSRList) { String schemaType = JsonParseUtil.getString(tempSipOrRtpLog, JsonProConfig.SCHEMA_TYPE); if (JsonProConfig.SIP_MARK.equals(schemaType)) { - sipBeanArr.add(message); + sipBeanArr.add(tempSipOrRtpLog); } else if (JsonProConfig.RTP_MARK.equals(schemaType)) { - rtpBeanArr.add(message); + rtpBeanArr.add(tempSipOrRtpLog); } } int rtpSize = rtpBeanArr.size(); int sipSize = sipBeanArr.size(); - + //只允许一对多的情况,其余视为异常数据 if (rtpSize == 1 && sipSize >= 1) { - for (String sipMessage : sipBeanArr) { - Map<String, Object> rtpLog = (Map<String, Object>) JsonMapper.fromJsonString(rtpBeanArr.get(0), Map.class); - Map<String, Object> voIpLog = (Map<String, Object>) JsonMapper.fromJsonString(sipMessage, Map.class); + for (Map<String, Object> voIpLog : sipBeanArr) { + Map<String, Object> rtpLog = rtpBeanArr.get(0); accumulateVoipMsg(voIpLog, rtpLog); - JsonParseUtil.setValue(voIpLog, JsonProConfig.SCHEMA_TYPE, "VoIP"); - JsonParseUtil.setValue(voIpLog, JsonProConfig.RTP_ORIGINATOR_DIR, judgeDirection(rtpLog, voIpLog)); //四元组,voip,关联后的数据 output.collect(mergeJson(voIpLog, rtpLog)); } } else if (sipSize == 1 && rtpSize >= 1) { - for (String rtpMessage : rtpBeanArr) { - Map<String, Object> rtpLog = (Map<String, Object>) JsonMapper.fromJsonString(rtpMessage, Map.class); - Map<String, Object> voIpLog = (Map<String, Object>) JsonMapper.fromJsonString(sipBeanArr.get(0), Map.class); + for (Map<String, Object> rtpLog : rtpBeanArr) { + HashMap<String, Object> voIpLog = new HashMap<>(128); + voIpLog.putAll(sipBeanArr.get(0)); accumulateVoipMsg(voIpLog, rtpLog); - JsonParseUtil.setValue(voIpLog, JsonProConfig.SCHEMA_TYPE, "VoIP"); - JsonParseUtil.setValue(voIpLog, JsonProConfig.RTP_ORIGINATOR_DIR, judgeDirection(rtpLog, voIpLog)); //四元组,voip,关联后的数据 output.collect(mergeJson(voIpLog, rtpLog)); } } else { - logger.warn("RTP-listSize is [" + rtpBeanArr.size() + "];SIP-listSize is [" + sipBeanArr.size() + "] in this tempCombineSRHmList! Not logical"); - sendErrorLogToKafka(sipBeanArr, output); - sendErrorLogToKafka(rtpBeanArr, output); + logger.error("RTP-listSize is [" + rtpBeanArr.size() + "];SIP-listSize is [" + sipBeanArr.size() + "] in this tempCombineSRHmList! Not logical"); + sendViolationLogs(sipBeanArr, output); + sendViolationLogs(rtpBeanArr, output); } } else { - String msg = tempList.get(0); - Map<String, Object> voIpLog = (Map<String, Object>) JsonMapper.fromJsonString(msg, Map.class); + Map<String, Object> voIpLog = tempCombineSRList.get(0); long commonEndTime = JsonParseUtil.getLong(voIpLog, JsonProConfig.END_TIME); long intervalTime = nowTime - commonEndTime; - logger.error("VoIP日志时间差值记录:" + intervalTime); if (intervalTime <= VoipRelationConfig.SEC_COMBINE_SR_CACHE_SECS) { - putKeyAndMsg(msg, fourStrKey, secCombineSRHmList); + logger.warn("当前日志时间未超过二次对准时间,进入队列等待再次对准,日志时间差:" + intervalTime); + putKeyAndMsg(voIpLog, fourStrKey, secCombineSRHmList); } else { - sendDirectlyOneElement(msg, voIpLog, output); + output.collect(JsonMapper.toJsonString(voIpLog)); } } } @@ -162,27 +175,27 @@ public class SipCalibrationWindowFunction extends ProcessAllWindowFunction<Tuple */ private void accumulateVoipMsg(Map<String, Object> voIpLog, Map<String, Object> rtpLog) { //common_sessions - relationUtils.metricSum(voIpLog, rtpLog, JsonProConfig.SESSIONS); + RelationUtils.metricSum(voIpLog, rtpLog, JsonProConfig.SESSIONS); //common_c2s_pkt_num - relationUtils.metricSum(voIpLog, rtpLog, JsonProConfig.C2S_PKT_NUM); + RelationUtils.metricSum(voIpLog, rtpLog, JsonProConfig.C2S_PKT_NUM); //common_s2c_pkt_num - relationUtils.metricSum(voIpLog, rtpLog, JsonProConfig.S2C_PKT_NUM); + RelationUtils.metricSum(voIpLog, rtpLog, JsonProConfig.S2C_PKT_NUM); //common_c2s_byte_num - relationUtils.metricSum(voIpLog, rtpLog, JsonProConfig.C2S_BYTE_NUM); + RelationUtils.metricSum(voIpLog, rtpLog, JsonProConfig.C2S_BYTE_NUM); //common_s2c_byte_num - relationUtils.metricSum(voIpLog, rtpLog, JsonProConfig.S2C_BYTE_NUM); + RelationUtils.metricSum(voIpLog, rtpLog, JsonProConfig.S2C_BYTE_NUM); //common_c2s_ipfrag_num - relationUtils.metricSum(voIpLog, rtpLog, JsonProConfig.C2S_IPFRAG_NUM); + RelationUtils.metricSum(voIpLog, rtpLog, JsonProConfig.C2S_IPFRAG_NUM); //common_s2c_ipfrag_num - relationUtils.metricSum(voIpLog, rtpLog, JsonProConfig.S2C_IPFRAG_NUM); + RelationUtils.metricSum(voIpLog, rtpLog, JsonProConfig.S2C_IPFRAG_NUM); //common_c2s_tcp_lostlen - relationUtils.metricSum(voIpLog, rtpLog, JsonProConfig.C2S_TCP_LOSTLEN); + RelationUtils.metricSum(voIpLog, rtpLog, JsonProConfig.C2S_TCP_LOSTLEN); //common_s2c_tcp_lostlen - relationUtils.metricSum(voIpLog, rtpLog, JsonProConfig.S2C_TCP_LOSTLEN); + RelationUtils.metricSum(voIpLog, rtpLog, JsonProConfig.S2C_TCP_LOSTLEN); //common_c2s_tcp_unorder_num - relationUtils.metricSum(voIpLog, rtpLog, JsonProConfig.C2S_TCP_UNORDER_NUM); + RelationUtils.metricSum(voIpLog, rtpLog, JsonProConfig.C2S_TCP_UNORDER_NUM); //common_s2c_tcp_unorder_num - relationUtils.metricSum(voIpLog, rtpLog, JsonProConfig.S2C_TCP_UNORDER_NUM); + RelationUtils.metricSum(voIpLog, rtpLog, JsonProConfig.S2C_TCP_UNORDER_NUM); } /** @@ -203,51 +216,13 @@ public class SipCalibrationWindowFunction extends ProcessAllWindowFunction<Tuple } } - - /** - * 存放key并添加对应List - */ - private static void putKeyAndMsg(String message, String fourStrKey, HashMap<String, LinkedList<String>> combineSRHmList) { - if (combineSRHmList.containsKey(fourStrKey)) { - LinkedList<String> tmpList = combineSRHmList.get(fourStrKey); - tmpList.add(message); - combineSRHmList.put(fourStrKey, tmpList); - } else { - LinkedList<String> tmpList = new LinkedList<>(); - tmpList.add(message); - combineSRHmList.put(fourStrKey, tmpList); - } - } - - /** - * 判断RTP主叫方向-测试 - * - * @param rtpLog RTP原始日志 - * @param voIpLog 融合后VOIP日志 - * @return 方向 0:未知 1:c2s 2:s2c - */ - private static int judgeDirection(Map<String, Object> rtpLog, Map<String, Object> voIpLog) { - - String ip = JsonParseUtil.getString(rtpLog, JsonProConfig.CLIENT_IP); - String sipOriginatorIp = JsonParseUtil.getString(voIpLog, JsonProConfig.SIP_ORIGINATOR_IP); - String sipResponderIp = JsonParseUtil.getString(voIpLog, JsonProConfig.SIP_RESPONDER_IP); - - if (StringUtil.isNotBlank(ip) && ip.equals(sipOriginatorIp)) { - return 1; - } else if (StringUtil.isNotBlank(ip) && ip.equals(sipResponderIp)) { - return 2; - } - - return 0; - } - /** * 发送不符合逻辑的日志到kafka */ - private static void sendErrorLogToKafka(List<String> logList, Collector<String> output) { - if (logList.size() > 0) { - for (String log : logList) { - output.collect(log); + private static void sendViolationLogs(List<Map<String, Object>> violationLogs, Collector<String> output) { + if (violationLogs.size() > 0) { + for (Map<String, Object> log : violationLogs) { + output.collect(JsonMapper.toJsonString(log)); } } } @@ -261,6 +236,8 @@ public class SipCalibrationWindowFunction extends ProcessAllWindowFunction<Tuple int rtpPayloadTypeS2c = JsonParseUtil.getInteger(rtpLog, JsonProConfig.RTP_PAYLOAD_TYPE_S2C); String rtpPcapPath = JsonParseUtil.getString(rtpLog, JsonProConfig.RTP_PCAP_PATH); + JsonParseUtil.setValue(voIpLog, JsonProConfig.SCHEMA_TYPE, "VoIP"); + JsonParseUtil.setValue(voIpLog, JsonProConfig.RTP_ORIGINATOR_DIR, RelationUtils.judgeDirection(rtpLog, voIpLog)); JsonParseUtil.setValue(voIpLog, JsonProConfig.RTP_PAYLOAD_TYPE_C2S, rtpPayloadTypeC2s); JsonParseUtil.setValue(voIpLog, JsonProConfig.RTP_PAYLOAD_TYPE_S2C, rtpPayloadTypeS2c); JsonParseUtil.setValue(voIpLog, JsonProConfig.RTP_PCAP_PATH, rtpPcapPath); diff --git a/src/main/java/com/zdjizhi/utils/exception/VoipRelationException.java b/src/main/java/com/zdjizhi/tools/exception/VoipRelationException.java index b2ef9e9..7af60ab 100644 --- a/src/main/java/com/zdjizhi/utils/exception/VoipRelationException.java +++ b/src/main/java/com/zdjizhi/tools/exception/VoipRelationException.java @@ -1,4 +1,4 @@ -package com.zdjizhi.utils.exception; +package com.zdjizhi.tools.exception; /** * @author qidaijie diff --git a/src/main/java/com/zdjizhi/utils/json/JsonParseUtil.java b/src/main/java/com/zdjizhi/tools/json/JsonParseUtil.java index 2bfd1f2..4413754 100644 --- a/src/main/java/com/zdjizhi/utils/json/JsonParseUtil.java +++ b/src/main/java/com/zdjizhi/tools/json/JsonParseUtil.java @@ -1,4 +1,4 @@ -package com.zdjizhi.utils.json; +package com.zdjizhi.tools.json; import cn.hutool.log.Log; @@ -160,29 +160,6 @@ public class JsonParseUtil { return tmpMap; } - - /** - * 获取属性值的方法 - * - * @param obj 对象 - * @param property key - * @return 属性的值 - */ - public static Object getValue(Object obj, String property) { - - try { - BeanMap beanMap = BeanMap.create(obj); - if (beanMap.containsKey(property)) { - return beanMap.get(property); - } else { - return null; - } - } catch (RuntimeException e) { - logger.error("获取json-value异常,异常key:" + property + "异常信息为:" + e); - return null; - } - } - /** * 获取属性值的方法 * @@ -216,22 +193,6 @@ public class JsonParseUtil { } /** - * long 类型检验转换方法,若为空返回基础值 - * - * @return Long value - */ - public static Long getLong(Object jsonMap, String property) { - Object value = getValue(jsonMap, property); - Long longVal = TypeUtils.castToLong(value); - - if (longVal == null) { - return 0L; - } - - return longVal; - } - - /** * int 类型检验转换方法,若为空返回基础值 * * @return int value @@ -243,21 +204,7 @@ public class JsonParseUtil { if (intVal == null) { return 0; } - return intVal; - } - - /** - * int 类型检验转换方法,若为空返回基础值 - * - * @return int value - */ - public static int getInteger(Object jsonMap, String property) { - Object value = getValue(jsonMap, property); - Integer intVal = TypeUtils.castToInt(value); - if (intVal == null) { - return 0; - } return intVal; } @@ -279,33 +226,9 @@ public class JsonParseUtil { } /** - * Object 方式获取getString类型字段 - * - * @param jsonObject - * @param property - * @return - */ - public static String getString(Object jsonObject, String property) { - Object value = getValue(jsonObject, property); - if (value == null) { - return null; - } - - if (value instanceof Map) { - return JsonMapper.toJsonString(value); - } - - if (value instanceof List) { - return JsonMapper.toJsonString(value); - } - - return value.toString(); - } - - /** * 更新属性值的方法 * - * @param jsonMap 原始日志json map + * @param jsonMap 原始日志json parse * @param property 更新的key * @param value 更新的值 */ @@ -318,38 +241,6 @@ public class JsonParseUtil { } /** - * 更新属性值的方法 - * - * @param obj 对象 - * @param property 更新的key - * @param value 更新的值 - */ - public static void setValue(Object obj, String property, Object value) { - try { - BeanMap beanMap = BeanMap.create(obj); - beanMap.put(property, value); - } catch (ClassCastException e) { - logger.error("赋予实体类错误类型数据", e); - } - } - - /** - * 根据反射生成对象的方法 - * - * @param properties 反射类用的map - * @return 生成的Object类型的对象 - */ - public static Object generateObject(Map properties) { - BeanGenerator generator = new BeanGenerator(); - Set keySet = properties.keySet(); - for (Object aKeySet : keySet) { - String key = (String) aKeySet; - generator.addProperty(key, (Class) properties.get(key)); - } - return generator.create(); - } - - /** * 通过获取String类型的网关schema链接来获取map,用于生成一个Object类型的对象 * <p> * // * @param http 网关schema地址 diff --git a/src/main/java/com/zdjizhi/utils/json/JsonTypeUtil.java b/src/main/java/com/zdjizhi/tools/json/JsonTypeUtil.java index 48e4620..3e56b84 100644 --- a/src/main/java/com/zdjizhi/utils/json/JsonTypeUtil.java +++ b/src/main/java/com/zdjizhi/tools/json/JsonTypeUtil.java @@ -1,9 +1,9 @@ -package com.zdjizhi.utils.json; +package com.zdjizhi.tools.json; import cn.hutool.log.Log; import cn.hutool.log.LogFactory; import com.zdjizhi.utils.JsonMapper; -import com.zdjizhi.utils.exception.VoipRelationException; +import com.zdjizhi.tools.exception.VoipRelationException; import java.util.List; import java.util.Map; @@ -53,7 +53,7 @@ class JsonTypeUtil { return (Map) value; } - throw new VoipRelationException("can not cast to map, value : " + value); + throw new VoipRelationException("can not cast to parse, value : " + value); } /** diff --git a/src/main/java/com/zdjizhi/utils/json/TypeUtils.java b/src/main/java/com/zdjizhi/tools/json/TypeUtils.java index 1e0f156..c2fb497 100644 --- a/src/main/java/com/zdjizhi/utils/json/TypeUtils.java +++ b/src/main/java/com/zdjizhi/tools/json/TypeUtils.java @@ -1,10 +1,10 @@ -package com.zdjizhi.utils.json; +package com.zdjizhi.tools.json; import cn.hutool.log.Log; import cn.hutool.log.LogFactory; import com.zdjizhi.common.VoipRelationConfig; import com.zdjizhi.utils.StringUtil; -import com.zdjizhi.utils.exception.VoipRelationException; +import com.zdjizhi.tools.exception.VoipRelationException; /** @@ -39,14 +39,6 @@ public class TypeUtils { return ((Number) value).longValue(); } -// if (value instanceof Map) { -// return (Map) value; -// } -// -// if (value instanceof List) { -// return Collections.singletonList(value.toString()); -// } - if (value instanceof Boolean) { return (Boolean) value ? 1 : 0; } @@ -70,11 +62,6 @@ public class TypeUtils { return (Integer) value; } - //此判断数值超范围不抛出异常,会截取成对应类型数值 -// if (value instanceof Number) { -// return ((Number) value).intValue(); -// } - if (value instanceof String) { String strVal = (String) value; if (StringUtil.isBlank(strVal)) { @@ -112,11 +99,6 @@ public class TypeUtils { return (Double) value; } - //此判断数值超范围不抛出异常,会截取成对应类型数值 -// if (value instanceof Number) { -// return ((Number) value).doubleValue(); -// } - if (value instanceof String) { String strVal = (String) value; diff --git a/src/main/java/com/zdjizhi/utils/kafka/CertUtils.java b/src/main/java/com/zdjizhi/tools/kafka/CertUtils.java index 808723a..9e701a7 100644 --- a/src/main/java/com/zdjizhi/utils/kafka/CertUtils.java +++ b/src/main/java/com/zdjizhi/tools/kafka/CertUtils.java @@ -1,4 +1,4 @@ -package com.zdjizhi.utils.kafka; +package com.zdjizhi.tools.kafka; import com.zdjizhi.common.VoipRelationConfig; import org.apache.kafka.common.config.SslConfigs; diff --git a/src/main/java/com/zdjizhi/utils/kafka/KafkaConsumer.java b/src/main/java/com/zdjizhi/tools/kafka/KafkaConsumer.java index ada1477..20f75b6 100644 --- a/src/main/java/com/zdjizhi/utils/kafka/KafkaConsumer.java +++ b/src/main/java/com/zdjizhi/tools/kafka/KafkaConsumer.java @@ -1,9 +1,8 @@ -package com.zdjizhi.utils.kafka; +package com.zdjizhi.tools.kafka; import com.zdjizhi.common.VoipRelationConfig; import org.apache.flink.api.common.serialization.SimpleStringSchema; import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer; -import org.apache.kafka.clients.consumer.ConsumerConfig; import java.util.Properties; diff --git a/src/main/java/com/zdjizhi/utils/kafka/KafkaProducer.java b/src/main/java/com/zdjizhi/tools/kafka/KafkaProducer.java index 2660c12..6939025 100644 --- a/src/main/java/com/zdjizhi/utils/kafka/KafkaProducer.java +++ b/src/main/java/com/zdjizhi/tools/kafka/KafkaProducer.java @@ -1,4 +1,4 @@ -package com.zdjizhi.utils.kafka; +package com.zdjizhi.tools.kafka; import com.zdjizhi.common.VoipRelationConfig; import org.apache.flink.api.common.serialization.SimpleStringSchema; @@ -42,9 +42,6 @@ public class KafkaProducer { //启用此选项将使生产者仅记录失败日志而不是捕获和重新抛出它们 kafkaProducer.setLogFailuresOnly(true); - //写入kafka的消息携带时间戳 -// kafkaProducer.setWriteTimestampToKafka(true); - return kafkaProducer; } } diff --git a/src/main/java/com/zdjizhi/utils/system/VoipRelationConfigurations.java b/src/main/java/com/zdjizhi/tools/system/VoipRelationConfigurations.java index 95e322b..a24292c 100644 --- a/src/main/java/com/zdjizhi/utils/system/VoipRelationConfigurations.java +++ b/src/main/java/com/zdjizhi/tools/system/VoipRelationConfigurations.java @@ -1,7 +1,7 @@ -package com.zdjizhi.utils.system; +package com.zdjizhi.tools.system; import com.zdjizhi.utils.StringUtil; -import com.zdjizhi.utils.exception.VoipRelationException; +import com.zdjizhi.tools.exception.VoipRelationException; import java.io.IOException; import java.util.Locale; diff --git a/src/main/java/com/zdjizhi/tools/utils/RelationUtils.java b/src/main/java/com/zdjizhi/tools/utils/RelationUtils.java new file mode 100644 index 0000000..1498fd9 --- /dev/null +++ b/src/main/java/com/zdjizhi/tools/utils/RelationUtils.java @@ -0,0 +1,134 @@ +package com.zdjizhi.tools.utils; + +import com.zdjizhi.common.JsonProConfig; +import com.zdjizhi.common.VoipRelationConfig; +import com.zdjizhi.utils.IPUtil; +import com.zdjizhi.utils.StringUtil; +import com.zdjizhi.tools.json.JsonParseUtil; + +import java.util.Map; + +/** + * @author qidaijie + * @Package com.zdjizhi.operator + * @Description: + * @date 2022/4/1911:30 + */ +public class RelationUtils { + + + /** + * 将A日志内的某指标数据与B日志进行累加计算,并写入A日志中 + * + * @param firstLog A日志 + * @param secondLog B日志 + * @param key 指标 json key + */ + public static void metricSum(Map<String, Object> firstLog, Map<String, Object> secondLog, String key) { + Long firstMetric = JsonParseUtil.getLong(firstLog, key); + Long secondMetric = JsonParseUtil.getLong(secondLog, key); + + Long sum = firstMetric + secondMetric; + + JsonParseUtil.setValue(firstLog, key, sum); + } + + + /** + * 将A日志内的某指标数据与B日志进行累加计算,并写入C日志中 + * + * @param firstLog A日志 + * @param secondLog B日志 + * @param otherLog C日志 + * @param key 指标 json key + */ + public static void metricSumSetOtherLog(Map<String, Object> firstLog, Map<String, Object> secondLog, Map<String, Object> otherLog, String key) { + Long firstMetric = JsonParseUtil.getLong(firstLog, key); + Long secondMetric = JsonParseUtil.getLong(secondLog, key); + + Long sum = firstMetric + secondMetric; + + JsonParseUtil.setValue(otherLog, key, sum); + } + + + /** + * 校验sip日志,必须包含协商四元组,否则将原样输出不处理 + * + * @param sipLog SIP日志 + * @return true or false + */ + public static boolean checkSipCompleteness(Map<String, Object> sipLog) { + return sipLog.containsKey(JsonProConfig.SIP_ORIGINATOR_IP) && + sipLog.containsKey(JsonProConfig.SIP_ORIGINATOR_PORT) && + sipLog.containsKey(JsonProConfig.SIP_RESPONDER_IP) && + sipLog.containsKey(JsonProConfig.SIP_RESPONDER_PORT); + } + + /** + * 是否为内网IP + * + * @param ip ip Address + * @return true or false + */ + public static boolean isInnerIp(String ip) { + if (VoipRelationConfig.CHECK_INNER_NETWORK == 1) { + //为空或者为特定IP时也算作内网IP + return StringUtil.isBlank(ip) || IPUtil.internalIp(ip); + } + return false; + } + + /** + * int类型 + * 比较数字大小,左边>右边-返回:1,左边<右边-返回:-1,左边=右边-返回:0 + * + * @param numOne 数值1 + * @param numTwo 数值2 + */ + public static int compareNum(int numOne, int numTwo) { + if (numOne > 0 && numTwo > 0) { + return Integer.compare(numOne, numTwo); + } else { + return -2; + } + } + + /** + * long类型 + * 比较数字大小,左边>右边-返回:1,左边<右边-返回:-1,左边=右边-返回:0 + * + * @param numOne 数值1 + * @param numTwo 数值2 + */ + public static int compareNum(long numOne, long numTwo) { + if (numOne > 0 && numTwo > 0) { + return Long.compare(numOne, numTwo); + } else { + return -2; + } + } + + + /** + * 判断RTP主叫方向 + * + * @param rtpLog RTP原始日志 + * @param voIpLog 融合后VOIP日志 + * @return 方向 0:未知 1:c2s 2:s2c + */ + public static int judgeDirection(Map<String, Object> rtpLog, Map<String, Object> voIpLog) { + + String ip = JsonParseUtil.getString(rtpLog, JsonProConfig.CLIENT_IP); + String sipOriginatorIp = JsonParseUtil.getString(voIpLog, JsonProConfig.SIP_ORIGINATOR_IP); + String sipResponderIp = JsonParseUtil.getString(voIpLog, JsonProConfig.SIP_RESPONDER_IP); + + if (StringUtil.isNotBlank(ip) && ip.equals(sipOriginatorIp)) { + return 1; + } else if (StringUtil.isNotBlank(ip) && ip.equals(sipResponderIp)) { + return 2; + } + + return 0; + } +} diff --git a/src/main/java/com/zdjizhi/topology/VoIpRelationTopology.java b/src/main/java/com/zdjizhi/topology/VoIpRelationTopology.java index 80a80ad..95fbfc1 100644 --- a/src/main/java/com/zdjizhi/topology/VoIpRelationTopology.java +++ b/src/main/java/com/zdjizhi/topology/VoIpRelationTopology.java @@ -3,16 +3,23 @@ package com.zdjizhi.topology; import cn.hutool.log.Log; import cn.hutool.log.LogFactory; import com.zdjizhi.common.VoipRelationConfig; -import com.zdjizhi.utils.functions.*; -import com.zdjizhi.utils.kafka.KafkaConsumer; -import com.zdjizhi.utils.kafka.KafkaProducer; -import org.apache.flink.api.java.tuple.Tuple3; +import com.zdjizhi.operator.filter.NullFilterFunction; +import com.zdjizhi.operator.group.MergeUniFlowKeyByFunction; +import com.zdjizhi.operator.group.VoipCalibrationKeyByFunction; +import com.zdjizhi.operator.parse.ParseMapFunction; +import com.zdjizhi.operator.window.MergeUniFlowWindowFunction; +import com.zdjizhi.operator.window.VoipCalibrationWindowFunction; +import com.zdjizhi.tools.kafka.KafkaConsumer; +import com.zdjizhi.tools.kafka.KafkaProducer; +import org.apache.flink.api.java.tuple.Tuple4; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows; import org.apache.flink.streaming.api.windowing.time.Time; +import java.util.Map; + /** * @author qidaijie @@ -28,20 +35,28 @@ public class VoIpRelationTopology { DataStream<String> streamSource = environment.addSource(KafkaConsumer.getKafkaConsumer()); - SingleOutputStreamOperator<Tuple3<String, String, String>> sipCorrelation = - streamSource.windowAll(TumblingProcessingTimeWindows.of(Time.seconds(VoipRelationConfig.ONE_SIDED_WINDOW_TIME))) - .process(new OneSidedWindowFunction()).name("OneSidedWindow"); - SingleOutputStreamOperator<String> window = sipCorrelation.windowAll( - TumblingProcessingTimeWindows.of(Time.seconds(VoipRelationConfig.VOIP_CALIBRATION_WINDOW_TIME))) - .process(new SipCalibrationWindowFunction()).name("SipCalibrationWindowFunction"); + SingleOutputStreamOperator<Tuple4<String, String, Map<String, Object>, Integer>> mergeUniFlowWindow = streamSource.filter(new NullFilterFunction()) + .map(new ParseMapFunction()) + .keyBy(new MergeUniFlowKeyByFunction()) + .window(TumblingProcessingTimeWindows.of(Time.seconds(VoipRelationConfig.ONE_SIDED_WINDOW_TIME))) + .process(new MergeUniFlowWindowFunction()) + .name("MergeUnidirectionalFlowWindow").setParallelism(VoipRelationConfig.MERGE_UNIFLOW_WINDOW_PARALLELISM); + + + SingleOutputStreamOperator<String> mergeUnidirectionalFlowWindow = mergeUniFlowWindow.keyBy(new VoipCalibrationKeyByFunction()) + .window(TumblingProcessingTimeWindows.of(Time.seconds(VoipRelationConfig.VOIP_CALIBRATION_WINDOW_TIME))) + .process(new VoipCalibrationWindowFunction()) + .name("MergeUnidirectionalFlowWindow").setParallelism(VoipRelationConfig.CALIBRATION_WINDOW_PARALLELISM); + - window.addSink(KafkaProducer.getKafkaProducer()).name("VoIpLogSinkKafka"); + mergeUnidirectionalFlowWindow.addSink(KafkaProducer.getKafkaProducer()).name("VoIpLogSinkKafka"); try { environment.execute(args[0]); } catch (Exception e) { - logger.error("This Flink task start ERROR! Exception information is :" + e); + logger.error("This Flink task start ERROR! Exception information is :" + e.getMessage()); + e.printStackTrace(); } } } diff --git a/src/main/java/com/zdjizhi/utils/functions/relationUtils.java b/src/main/java/com/zdjizhi/utils/functions/relationUtils.java deleted file mode 100644 index c74170d..0000000 --- a/src/main/java/com/zdjizhi/utils/functions/relationUtils.java +++ /dev/null @@ -1,81 +0,0 @@ -package com.zdjizhi.utils.functions; - -import com.zdjizhi.common.JsonProConfig; -import com.zdjizhi.common.VoipRelationConfig; -import com.zdjizhi.utils.IPUtil; -import com.zdjizhi.utils.StringUtil; -import com.zdjizhi.utils.json.JsonParseUtil; - -import java.util.Map; - -/** - * @author qidaijie - * @Package com.zdjizhi.utils.functions - * @Description: - * @date 2022/4/1911:30 - */ -class relationUtils { - - - /** - * 将A日志内的某指标数据与B日志进行累加计算,并写入A日志中 - * - * @param firstLog A日志 - * @param secondLog B日志 - * @param key 指标 json key - */ - static void metricSum(Map<String, Object> firstLog, Map<String, Object> secondLog, String key) { - Long firstMetric = JsonParseUtil.getLong(firstLog, key); - Long secondMetric = JsonParseUtil.getLong(secondLog, key); - - Long sum = firstMetric + secondMetric; - - JsonParseUtil.setValue(firstLog, key, sum); - } - - - /** - * 将A日志内的某指标数据与B日志进行累加计算,并写入C日志中 - * - * @param firstLog A日志 - * @param secondLog B日志 - * @param otherLog C日志 - * @param key 指标 json key - */ - static void metricSumSetOtherLog(Map<String, Object> firstLog, Map<String, Object> secondLog, Map<String, Object> otherLog, String key) { - Long firstMetric = JsonParseUtil.getLong(firstLog, key); - Long secondMetric = JsonParseUtil.getLong(secondLog, key); - - Long sum = firstMetric + secondMetric; - - JsonParseUtil.setValue(otherLog, key, sum); - } - - - /** - * 校验sip日志,必须包含协商四元组,否则将原样输出不处理 - * - * @param sipLog SIP日志 - * @return true or false - */ - static boolean checkSipCompleteness(Map<String, Object> sipLog) { - return sipLog.containsKey(JsonProConfig.SIP_ORIGINATOR_IP) && - sipLog.containsKey(JsonProConfig.SIP_ORIGINATOR_PORT) && - sipLog.containsKey(JsonProConfig.SIP_RESPONDER_IP) && - sipLog.containsKey(JsonProConfig.SIP_RESPONDER_PORT); - } - - /** - * 是否为内网IP - * - * @param ip ip Address - * @return true or false - */ - static boolean isInnerIp(String ip) { - if (VoipRelationConfig.CHECK_INNER_NETWORK == 1) { - //为空或者为特定IP时也算作内网IP - return StringUtil.isBlank(ip) || IPUtil.internalIp(ip); - } - return false; - } -} diff --git a/src/main/java/com/zdjizhi/utils/http/HttpClientUtil.java b/src/main/java/com/zdjizhi/utils/http/HttpClientUtil.java deleted file mode 100644 index 1adb1d1..0000000 --- a/src/main/java/com/zdjizhi/utils/http/HttpClientUtil.java +++ /dev/null @@ -1,77 +0,0 @@ -package com.zdjizhi.utils.http; - -import cn.hutool.log.Log; -import cn.hutool.log.LogFactory; -import org.apache.commons.io.IOUtils; -import org.apache.http.HttpEntity; -import org.apache.http.client.methods.CloseableHttpResponse; -import org.apache.http.client.methods.HttpGet; -import org.apache.http.impl.client.CloseableHttpClient; -import org.apache.http.impl.client.HttpClients; - -import java.io.BufferedReader; -import java.io.IOException; -import java.io.InputStreamReader; - -/** - * 获取网关schema的工具类 - * - * @author qidaijie - */ -public class HttpClientUtil { - private static final Log logger = LogFactory.get(); - - /** - * 请求网关获取schema - * - * @param http 网关url - * @return schema - */ - public static String requestByGetMethod(String http) { - CloseableHttpClient httpClient = HttpClients.createDefault(); - StringBuilder entityStringBuilder; - - HttpGet get = new HttpGet(http); - BufferedReader bufferedReader = null; - CloseableHttpResponse httpResponse = null; - try { - httpResponse = httpClient.execute(get); - HttpEntity entity = httpResponse.getEntity(); - entityStringBuilder = new StringBuilder(); - if (null != entity) { - bufferedReader = new BufferedReader(new InputStreamReader(httpResponse.getEntity().getContent(), "UTF-8"), 8 * 1024); - int intC; - while ((intC = bufferedReader.read()) != -1) { - char c = (char) intC; - if (c == '\n') { - break; - } - entityStringBuilder.append(c); - } - - return entityStringBuilder.toString(); - } - } catch (IOException e) { - logger.error("Get Schema from Query engine ERROR! Exception message is:" + e); - } finally { - if (httpClient != null) { - try { - httpClient.close(); - } catch (IOException e) { - logger.error("Close HTTP Client ERROR! Exception messgae is:" + e); - } - } - if (httpResponse != null) { - try { - httpResponse.close(); - } catch (IOException e) { - logger.error("Close httpResponse ERROR! Exception messgae is:" + e); - } - } - if (bufferedReader != null) { - IOUtils.closeQuietly(bufferedReader); - } - } - return ""; - } -} diff --git a/src/test/testdata/differentVSYS b/src/test/testdata/differentVSYS new file mode 100644 index 0000000..75ff819 --- /dev/null +++ b/src/test/testdata/differentVSYS @@ -0,0 +1,3 @@ +{"common_action":0,"common_address_list":"50730-20730-192.168.36.136-192.168.56.27","common_address_type":4,"common_app_behavior":"voice_call","common_app_id":{"UNKNOWN":[{"app_id":4,"app_name":"unknown","packet_sequence":22,"surrogate_id":0}]},"common_app_identify_info":[{"app_name":"unknown","packet_sequence":22}],"common_app_label":"unknown","common_c2s_byte_num":64860,"common_c2s_pkt_num":304,"common_client_ip":"192.168.36.136","common_client_port":50730,"common_con_duration_ms":6264,"common_device_id":"21426003","common_device_tag":"{\"tags\":[{\"tag\":\"device_group\",\"value\":\"group-xxg-three\"},{\"tag\":\"data_center\",\"value\":\"center-xxg-three\"}]}","common_direction":73,"common_end_time":1663123136,"common_l4_protocol":"IPv4_UDP","common_l7_protocol":"STUN.RTP","common_policy_id":0,"common_s2c_byte_num":66144,"common_s2c_pkt_num":310,"common_schema_type":"RTP","common_server_ip":"192.168.56.27","common_server_port":20730,"common_service":4,"common_sled_ip":"192.168.40.81","common_start_time":1663123136,"common_stream_dir":3,"common_stream_trace_id":"869477723735406984","rtp_pcap_path":"http://192.168.44.67:9098/hos/rtp_hos_bucket/192.168.56.27_192.168.36.136_20730_50730_123.pcap","raw_log_status":"CLOSE","common_vsys_id":2} +{"common_action":0,"common_address_list":"50730-20730-192.168.36.136-192.168.56.27","common_address_type":4,"common_app_behavior":"voice_call","common_app_id":{"UNKNOWN":[{"app_id":4,"app_name":"unknown","packet_sequence":22,"surrogate_id":0}]},"common_app_identify_info":[{"app_name":"unknown","packet_sequence":22}],"common_app_label":"unknown","common_c2s_byte_num":64860,"common_c2s_pkt_num":304,"common_client_ip":"192.168.36.136","common_client_port":50730,"common_con_duration_ms":6264,"common_device_id":"21426003","common_device_tag":"{\"tags\":[{\"tag\":\"device_group\",\"value\":\"group-xxg-three\"},{\"tag\":\"data_center\",\"value\":\"center-xxg-three\"}]}","common_direction":73,"common_end_time":1663123136,"common_l4_protocol":"IPv4_UDP","common_l7_protocol":"STUN.RTP","common_policy_id":0,"common_s2c_byte_num":66144,"common_s2c_pkt_num":310,"common_schema_type":"RTP","common_server_ip":"192.168.56.27","common_server_port":20730,"common_service":4,"common_sled_ip":"192.168.40.81","common_start_time":1663123136,"common_stream_dir":3,"common_stream_trace_id":"869477723735406984","rtp_pcap_path":"http://192.168.44.67:9098/hos/rtp_hos_bucket/192.168.56.27_192.168.36.136_20730_50730_123.pcap","raw_log_status":"CLOSE","common_vsys_id":3} +{"common_schema_type":"SIP","common_sessions":1,"sip_call_id":"NDNjYmJkMDJhNGJmYmUyN2EwZjdmYzNjMWE1YTRmYjc.","sip_originator_description":"\"test1\"<sip:[email protected]>","sip_responder_description":"\"test2\"<sip:[email protected]>","sip_user_agent":"eyeBeamrelease1011dstamp40820","sip_originator_sdp_content":"v=0\r\no=-52INIP4192.168.56.27\r\ns=CounterPatheyeBeam1.5\r\nc=INIP4192.168.56.27\r\nt=00\r\nm=audio20730RTP/AVP0818101\r\na=alt:11:UmXquXbwvg/CQxTA192.168.56.2720730\r\na=fmtp:18annexb=no\r\na=fmtp:1010-15\r\na=rtpmap:18G729/8000\r\na=rtpmap:101telephone-event/8000\r\na=sendrecv\r\na=x-rtp-session-id:F934D61F09C84591BB3ECEECE5993BB8\r\n","sip_originator_sdp_connect_ip":"192.168.56.27","sip_originator_sdp_media_port":20730,"sip_originator_sdp_media_type":"18G729/8000","sip_server":"OpenSIPS(2.4.9(x86_64/linux))","sip_responder_sdp_content":"v=0\r\no=-62INIP4192.168.36.136\r\ns=CounterPatheyeBeam1.5\r\nc=INIP4192.168.36.136\r\nt=00\r\nm=audio50730RTP/AVP0818101\r\na=alt:11:sXlLExDmMtTnircb192.168.36.13650730\r\na=fmtp:18annexb=no\r\na=fmtp:1010-15\r\na=rtpmap:18G729/8000\r\na=rtpmap:101telephone-event/8000\r\na=sendrecv\r\na=x-rtp-session-id:4342C45E30624AA085D078ABE24D2D33\r\n","sip_responder_sdp_connect_ip":"192.168.36.136","sip_responder_sdp_media_port":50730,"sip_responder_sdp_media_type":"18G729/8000","sip_duration_s":7,"sip_bye":"responder","common_protocol_label":"ETHERNET.IPv4.UDP","common_c2s_ipfrag_num":0,"common_s2c_ipfrag_num":0,"common_direction":69,"common_l7_protocol":"SIP","common_server_ip":"192.168.40.158","common_client_ip":"192.168.56.27","common_server_port":5060,"common_client_port":32196,"common_stream_dir":3,"common_address_type":4,"common_address_list":"32196-5060-192.168.56.27-192.168.40.158","common_start_time":1663123136,"common_end_time":1663123136,"common_con_duration_ms":17043,"common_s2c_pkt_num":9,"common_s2c_byte_num":5375,"common_c2s_pkt_num":9,"common_c2s_byte_num":6563,"common_stream_trace_id":"869477723735079299","common_l4_protocol":"IPv4_UDP","common_sled_ip":"192.168.40.81","common_device_id":"21426003","common_device_tag":"{\"tags\":[{\"tag\":\"device_group\",\"value\":\"group-xxg-three\"},{\"tag\":\"data_center\",\"value\":\"center-xxg-three\"}]}","common_policy_id":0,"common_service":5,"common_action":0,"common_vsys_id":2}
\ No newline at end of file diff --git a/src/test/testdata/differentVSYSUniFlowRTP b/src/test/testdata/differentVSYSUniFlowRTP new file mode 100644 index 0000000..8d3f06f --- /dev/null +++ b/src/test/testdata/differentVSYSUniFlowRTP @@ -0,0 +1,3 @@ +{"common_action":0,"common_address_list":"50730-20730-192.168.36.136-192.168.56.27","common_address_type":4,"common_app_behavior":"voice_call","common_app_id":{"UNKNOWN":[{"app_id":4,"app_name":"unknown","packet_sequence":22,"surrogate_id":0}]},"common_app_identify_info":[{"app_name":"unknown","packet_sequence":22}],"common_app_label":"unknown","common_c2s_byte_num":64860,"common_c2s_pkt_num":304,"common_client_ip":"192.168.36.136","common_client_port":50730,"common_con_duration_ms":6264,"common_device_id":"21426003","common_device_tag":"{\"tags\":[{\"tag\":\"device_group\",\"value\":\"group-xxg-three\"},{\"tag\":\"data_center\",\"value\":\"center-xxg-three\"}]}","common_direction":73,"common_end_time":1663123136,"common_l4_protocol":"IPv4_UDP","common_l7_protocol":"STUN.RTP","common_policy_id":0,"common_s2c_byte_num":66144,"common_s2c_pkt_num":310,"common_schema_type":"RTP","common_server_ip":"192.168.56.27","common_server_port":20730,"common_service":4,"common_sled_ip":"192.168.40.81","common_start_time":1663123136,"common_stream_dir":1,"common_stream_trace_id":"869477723735406984","rtp_pcap_path":"http://192.168.44.67:9098/hos/rtp_hos_bucket/192.168.56.27_192.168.36.136_20730_50730_123.pcap","raw_log_status":"CLOSE","common_vsys_id":2} +{"common_action":0,"common_address_list":"50730-20730-192.168.36.136-192.168.56.27","common_address_type":4,"common_app_behavior":"voice_call","common_app_id":{"UNKNOWN":[{"app_id":4,"app_name":"unknown","packet_sequence":22,"surrogate_id":0}]},"common_app_identify_info":[{"app_name":"unknown","packet_sequence":22}],"common_app_label":"unknown","common_c2s_byte_num":64860,"common_c2s_pkt_num":304,"common_client_ip":"192.168.36.136","common_client_port":50730,"common_con_duration_ms":6264,"common_device_id":"21426003","common_device_tag":"{\"tags\":[{\"tag\":\"device_group\",\"value\":\"group-xxg-three\"},{\"tag\":\"data_center\",\"value\":\"center-xxg-three\"}]}","common_direction":73,"common_end_time":1663123136,"common_l4_protocol":"IPv4_UDP","common_l7_protocol":"STUN.RTP","common_policy_id":0,"common_s2c_byte_num":66144,"common_s2c_pkt_num":310,"common_schema_type":"RTP","common_server_ip":"192.168.56.27","common_server_port":20730,"common_service":4,"common_sled_ip":"192.168.40.81","common_start_time":1663123136,"common_stream_dir":2,"common_stream_trace_id":"869477723735406984","rtp_pcap_path":"http://192.168.44.67:9098/hos/rtp_hos_bucket/192.168.56.27_192.168.36.136_20730_50730_456.pcap","raw_log_status":"CLOSE","common_vsys_id":2} +{"common_schema_type":"SIP","common_sessions":1,"sip_call_id":"NDNjYmJkMDJhNGJmYmUyN2EwZjdmYzNjMWE1YTRmYjc.","sip_originator_description":"\"test1\"<sip:[email protected]>","sip_responder_description":"\"test2\"<sip:[email protected]>","sip_user_agent":"eyeBeamrelease1011dstamp40820","sip_originator_sdp_content":"v=0\r\no=-52INIP4192.168.56.27\r\ns=CounterPatheyeBeam1.5\r\nc=INIP4192.168.56.27\r\nt=00\r\nm=audio20730RTP/AVP0818101\r\na=alt:11:UmXquXbwvg/CQxTA192.168.56.2720730\r\na=fmtp:18annexb=no\r\na=fmtp:1010-15\r\na=rtpmap:18G729/8000\r\na=rtpmap:101telephone-event/8000\r\na=sendrecv\r\na=x-rtp-session-id:F934D61F09C84591BB3ECEECE5993BB8\r\n","sip_originator_sdp_connect_ip":"192.168.56.27","sip_originator_sdp_media_port":20730,"sip_originator_sdp_media_type":"18G729/8000","sip_server":"OpenSIPS(2.4.9(x86_64/linux))","sip_responder_sdp_content":"v=0\r\no=-62INIP4192.168.36.136\r\ns=CounterPatheyeBeam1.5\r\nc=INIP4192.168.36.136\r\nt=00\r\nm=audio50730RTP/AVP0818101\r\na=alt:11:sXlLExDmMtTnircb192.168.36.13650730\r\na=fmtp:18annexb=no\r\na=fmtp:1010-15\r\na=rtpmap:18G729/8000\r\na=rtpmap:101telephone-event/8000\r\na=sendrecv\r\na=x-rtp-session-id:4342C45E30624AA085D078ABE24D2D33\r\n","sip_responder_sdp_connect_ip":"192.168.36.136","sip_responder_sdp_media_port":50730,"sip_responder_sdp_media_type":"18G729/8000","sip_duration_s":7,"sip_bye":"responder","common_protocol_label":"ETHERNET.IPv4.UDP","common_c2s_ipfrag_num":0,"common_s2c_ipfrag_num":0,"common_direction":69,"common_l7_protocol":"SIP","common_server_ip":"192.168.40.158","common_client_ip":"192.168.56.27","common_server_port":5060,"common_client_port":32196,"common_stream_dir":3,"common_address_type":4,"common_address_list":"32196-5060-192.168.56.27-192.168.40.158","common_start_time":1663123136,"common_end_time":1663123136,"common_con_duration_ms":17043,"common_s2c_pkt_num":9,"common_s2c_byte_num":5375,"common_c2s_pkt_num":9,"common_c2s_byte_num":6563,"common_stream_trace_id":"869477723735079299","common_l4_protocol":"IPv4_UDP","common_sled_ip":"192.168.40.81","common_device_id":"21426003","common_device_tag":"{\"tags\":[{\"tag\":\"device_group\",\"value\":\"group-xxg-three\"},{\"tag\":\"data_center\",\"value\":\"center-xxg-three\"}]}","common_policy_id":0,"common_service":5,"common_action":0,"common_vsys_id":2}
\ No newline at end of file diff --git a/src/test/testdata/oneRTPUniFlowSIP b/src/test/testdata/oneRTPUniFlowSIP new file mode 100644 index 0000000..8aa4f24 --- /dev/null +++ b/src/test/testdata/oneRTPUniFlowSIP @@ -0,0 +1,3 @@ +{"common_action":0,"common_address_list":"50732-20730-192.168.36.136-192.168.56.27","common_address_type":4,"common_app_behavior":"voice_call","common_app_id":{"UNKNOWN":[{"app_id":4,"app_name":"unknown","packet_sequence":22,"surrogate_id":0}]},"common_app_identify_info":[{"app_name":"unknown","packet_sequence":22}],"common_app_label":"unknown","common_c2s_byte_num":64860,"common_c2s_pkt_num":304,"common_client_ip":"192.168.36.136","common_client_port":50732,"common_con_duration_ms":6264,"common_device_id":"21426003","common_device_tag":"{\"tags\":[{\"tag\":\"device_group\",\"value\":\"group-xxg-three\"},{\"tag\":\"data_center\",\"value\":\"center-xxg-three\"}]}","common_direction":73,"common_end_time":1663123136,"common_l4_protocol":"IPv4_UDP","common_l7_protocol":"STUN.RTP","common_policy_id":0,"common_s2c_byte_num":66144,"common_s2c_pkt_num":310,"common_schema_type":"RTP","common_server_ip":"192.168.56.27","common_server_port":20730,"common_service":4,"common_sled_ip":"192.168.40.81","common_start_time":1663123136,"common_stream_dir":3,"common_stream_trace_id":"869477723735406984","rtp_pcap_path":"http://192.168.44.67:9098/hos/rtp_hos_bucket/192.168.56.27_192.168.36.136_20730_50732_123.pcap","raw_log_status":"CLOSE"} +{"common_schema_type":"SIP","common_sessions":1,"sip_call_id":"NDNjYmJkMDJhNGJmYmUyN2EwZjdmYzNjMWE1YTRmYjc.","sip_originator_description":"\"test1\"<sip:[email protected]>","sip_responder_description":"\"test2\"<sip:[email protected]>","sip_user_agent":"eyeBeamrelease1011dstamp40820","sip_originator_sdp_content":"v=0\r\no=-52INIP4192.168.56.27\r\ns=CounterPatheyeBeam1.5\r\nc=INIP4192.168.56.27\r\nt=00\r\nm=audio20730RTP/AVP0818101\r\na=alt:11:UmXquXbwvg/CQxTA192.168.56.2720730\r\na=fmtp:18annexb=no\r\na=fmtp:1010-15\r\na=rtpmap:18G729/8000\r\na=rtpmap:101telephone-event/8000\r\na=sendrecv\r\na=x-rtp-session-id:F934D61F09C84591BB3ECEECE5993BB8\r\n","sip_originator_sdp_connect_ip":"192.168.56.27","sip_originator_sdp_media_port":20730,"sip_originator_sdp_media_type":"18G729/8000","sip_server":"OpenSIPS(2.4.9(x86_64/linux))","sip_responder_sdp_content":"v=0\r\no=-62INIP4192.168.36.136\r\ns=CounterPatheyeBeam1.5\r\nc=INIP4192.168.36.136\r\nt=00\r\nm=audio50732RTP/AVP0818101\r\na=alt:11:sXlLExDmMtTnircb192.168.36.13650732\r\na=fmtp:18annexb=no\r\na=fmtp:1010-15\r\na=rtpmap:18G729/8000\r\na=rtpmap:101telephone-event/8000\r\na=sendrecv\r\na=x-rtp-session-id:4342C45E30624AA085D078ABE24D2D33\r\n","sip_responder_sdp_connect_ip":"192.168.36.136","sip_responder_sdp_media_port":50732,"sip_responder_sdp_media_type":"18G729/8000","sip_duration_s":7,"sip_bye":"responder","common_protocol_label":"ETHERNET.IPv4.UDP","common_c2s_ipfrag_num":0,"common_s2c_ipfrag_num":0,"common_direction":69,"common_l7_protocol":"SIP","common_server_ip":"192.168.40.158","common_client_ip":"192.168.56.27","common_server_port":5060,"common_client_port":32196,"common_stream_dir":1,"common_address_type":4,"common_address_list":"32196-5060-192.168.56.27-192.168.40.158","common_start_time":1663123136,"common_end_time":1663123136,"common_con_duration_ms":17043,"common_s2c_pkt_num":9,"common_s2c_byte_num":5375,"common_c2s_pkt_num":9,"common_c2s_byte_num":6563,"common_stream_trace_id":"869477723735079299","common_l4_protocol":"IPv4_UDP","common_sled_ip":"192.168.40.81","common_device_id":"21426003","common_device_tag":"{\"tags\":[{\"tag\":\"device_group\",\"value\":\"group-xxg-three\"},{\"tag\":\"data_center\",\"value\":\"center-xxg-three\"}]}","common_policy_id":0,"common_service":5,"common_action":0} +{"common_schema_type":"SIP","common_sessions":1,"sip_call_id":"NDNjYmJkMDJhNGJmYmUyN2EwZjdmYzNjMWE1YTRmYjc.","sip_originator_description":"\"test1\"<sip:[email protected]>","sip_responder_description":"\"test2\"<sip:[email protected]>","sip_user_agent":"eyeBeamrelease1011dstamp40820","sip_originator_sdp_content":"v=0\r\no=-52INIP4192.168.56.27\r\ns=CounterPatheyeBeam1.5\r\nc=INIP4192.168.56.27\r\nt=00\r\nm=audio20730RTP/AVP0818101\r\na=alt:11:UmXquXbwvg/CQxTA192.168.56.2720730\r\na=fmtp:18annexb=no\r\na=fmtp:1010-15\r\na=rtpmap:18G729/8000\r\na=rtpmap:101telephone-event/8000\r\na=sendrecv\r\na=x-rtp-session-id:F934D61F09C84591BB3ECEECE5993BB8\r\n","sip_originator_sdp_connect_ip":"192.168.56.27","sip_originator_sdp_media_port":20730,"sip_originator_sdp_media_type":"18G729/8000","sip_server":"OpenSIPS(2.4.9(x86_64/linux))","sip_responder_sdp_content":"v=0\r\no=-62INIP4192.168.36.136\r\ns=CounterPatheyeBeam1.5\r\nc=INIP4192.168.36.136\r\nt=00\r\nm=audio50732RTP/AVP0818101\r\na=alt:11:sXlLExDmMtTnircb192.168.36.13650732\r\na=fmtp:18annexb=no\r\na=fmtp:1010-15\r\na=rtpmap:18G729/8000\r\na=rtpmap:101telephone-event/8000\r\na=sendrecv\r\na=x-rtp-session-id:4342C45E30624AA085D078ABE24D2D33\r\n","sip_responder_sdp_connect_ip":"192.168.36.136","sip_responder_sdp_media_port":50732,"sip_responder_sdp_media_type":"18G729/8000","sip_duration_s":7,"sip_bye":"responder","common_protocol_label":"ETHERNET.IPv4.UDP","common_c2s_ipfrag_num":0,"common_s2c_ipfrag_num":0,"common_direction":69,"common_l7_protocol":"SIP","common_server_ip":"192.168.40.158","common_client_ip":"192.168.56.27","common_server_port":5060,"common_client_port":32196,"common_stream_dir":2,"common_address_type":4,"common_address_list":"32196-5060-192.168.56.27-192.168.40.158","common_start_time":1663123136,"common_end_time":1663123136,"common_con_duration_ms":17043,"common_s2c_pkt_num":9,"common_s2c_byte_num":5375,"common_c2s_pkt_num":9,"common_c2s_byte_num":6563,"common_stream_trace_id":"8694777237350350","common_l4_protocol":"IPv4_UDP","common_sled_ip":"192.168.40.81","common_device_id":"21426003","common_device_tag":"{\"tags\":[{\"tag\":\"device_group\",\"value\":\"group-xxg-three\"},{\"tag\":\"data_center\",\"value\":\"center-xxg-three\"}]}","common_policy_id":0,"common_service":5,"common_action":0}
\ No newline at end of file diff --git a/src/test/testdata/oneRTPtwoSIP b/src/test/testdata/oneRTPtwoSIP new file mode 100644 index 0000000..994a4bd --- /dev/null +++ b/src/test/testdata/oneRTPtwoSIP @@ -0,0 +1,3 @@ +{"common_action":0,"common_address_list":"50731-20730-192.168.36.136-192.168.56.27","common_address_type":4,"common_app_behavior":"voice_call","common_app_id":{"UNKNOWN":[{"app_id":4,"app_name":"unknown","packet_sequence":22,"surrogate_id":0}]},"common_app_identify_info":[{"app_name":"unknown","packet_sequence":22}],"common_app_label":"unknown","common_c2s_byte_num":64860,"common_c2s_pkt_num":304,"common_client_ip":"192.168.36.136","common_client_port":50731,"common_con_duration_ms":6264,"common_device_id":"21426003","common_device_tag":"{\"tags\":[{\"tag\":\"device_group\",\"value\":\"group-xxg-three\"},{\"tag\":\"data_center\",\"value\":\"center-xxg-three\"}]}","common_direction":73,"common_end_time":1663123136,"common_l4_protocol":"IPv4_UDP","common_l7_protocol":"STUN.RTP","common_policy_id":0,"common_s2c_byte_num":66144,"common_s2c_pkt_num":310,"common_schema_type":"RTP","common_server_ip":"192.168.56.27","common_server_port":20730,"common_service":4,"common_sled_ip":"192.168.40.81","common_start_time":1663123136,"common_stream_dir":3,"common_stream_trace_id":"869477723735406984","rtp_pcap_path":"http://192.168.44.67:9098/hos/rtp_hos_bucket/192.168.56.27_192.168.36.136_20730_50731_123.pcap","raw_log_status":"CLOSE"} +{"common_schema_type":"SIP","common_sessions":1,"sip_call_id":"NDNjYmJkMDJhNGJmYmUyN2EwZjdmYzNjMWE1YTRmYjc.","sip_originator_description":"\"test1\"<sip:[email protected]>","sip_responder_description":"\"test2\"<sip:[email protected]>","sip_user_agent":"eyeBeamrelease1011dstamp40820","sip_originator_sdp_content":"v=0\r\no=-52INIP4192.168.56.27\r\ns=CounterPatheyeBeam1.5\r\nc=INIP4192.168.56.27\r\nt=00\r\nm=audio20730RTP/AVP0818101\r\na=alt:11:UmXquXbwvg/CQxTA192.168.56.2720730\r\na=fmtp:18annexb=no\r\na=fmtp:1010-15\r\na=rtpmap:18G729/8000\r\na=rtpmap:101telephone-event/8000\r\na=sendrecv\r\na=x-rtp-session-id:F934D61F09C84591BB3ECEECE5993BB8\r\n","sip_originator_sdp_connect_ip":"192.168.56.27","sip_originator_sdp_media_port":20730,"sip_originator_sdp_media_type":"18G729/8000","sip_server":"OpenSIPS(2.4.9(x86_64/linux))","sip_responder_sdp_content":"v=0\r\no=-62INIP4192.168.36.136\r\ns=CounterPatheyeBeam1.5\r\nc=INIP4192.168.36.136\r\nt=00\r\nm=audio50731RTP/AVP0818101\r\na=alt:11:sXlLExDmMtTnircb192.168.36.13650731\r\na=fmtp:18annexb=no\r\na=fmtp:1010-15\r\na=rtpmap:18G729/8000\r\na=rtpmap:101telephone-event/8000\r\na=sendrecv\r\na=x-rtp-session-id:4342C45E30624AA085D078ABE24D2D33\r\n","sip_responder_sdp_connect_ip":"192.168.36.136","sip_responder_sdp_media_port":50731,"sip_responder_sdp_media_type":"18G729/8000","sip_duration_s":7,"sip_bye":"responder","common_protocol_label":"ETHERNET.IPv4.UDP","common_c2s_ipfrag_num":0,"common_s2c_ipfrag_num":0,"common_direction":69,"common_l7_protocol":"SIP","common_server_ip":"192.168.40.158","common_client_ip":"192.168.56.27","common_server_port":5060,"common_client_port":32196,"common_stream_dir":3,"common_address_type":4,"common_address_list":"32196-5060-192.168.56.27-192.168.40.158","common_start_time":1663123136,"common_end_time":1663123136,"common_con_duration_ms":17043,"common_s2c_pkt_num":9,"common_s2c_byte_num":5375,"common_c2s_pkt_num":9,"common_c2s_byte_num":6563,"common_stream_trace_id":"8694777237350350","common_l4_protocol":"IPv4_UDP","common_sled_ip":"192.168.40.81","common_device_id":"21426003","common_device_tag":"{\"tags\":[{\"tag\":\"device_group\",\"value\":\"group-xxg-three\"},{\"tag\":\"data_center\",\"value\":\"center-xxg-three\"}]}","common_policy_id":0,"common_service":5,"common_action":0} +{"common_schema_type":"SIP","common_sessions":1,"sip_call_id":"NDNjYmJkMDJhNGJmYmUyN2EwZjdmYzNjMWE1YTRmYjc.","sip_originator_description":"\"test1\"<sip:[email protected]>","sip_responder_description":"\"test2\"<sip:[email protected]>","sip_user_agent":"eyeBeamrelease1011dstamp40820","sip_originator_sdp_content":"v=0\r\no=-52INIP4192.168.56.27\r\ns=CounterPatheyeBeam1.5\r\nc=INIP4192.168.56.27\r\nt=00\r\nm=audio20730RTP/AVP0818101\r\na=alt:11:UmXquXbwvg/CQxTA192.168.56.2720730\r\na=fmtp:18annexb=no\r\na=fmtp:1010-15\r\na=rtpmap:18G729/8000\r\na=rtpmap:101telephone-event/8000\r\na=sendrecv\r\na=x-rtp-session-id:F934D61F09C84591BB3ECEECE5993BB8\r\n","sip_originator_sdp_connect_ip":"192.168.56.27","sip_originator_sdp_media_port":20730,"sip_originator_sdp_media_type":"18G729/8000","sip_server":"OpenSIPS(2.4.9(x86_64/linux))","sip_responder_sdp_content":"v=0\r\no=-62INIP4192.168.36.136\r\ns=CounterPatheyeBeam1.5\r\nc=INIP4192.168.36.136\r\nt=00\r\nm=audio50731RTP/AVP0818101\r\na=alt:11:sXlLExDmMtTnircb192.168.36.13650731\r\na=fmtp:18annexb=no\r\na=fmtp:1010-15\r\na=rtpmap:18G729/8000\r\na=rtpmap:101telephone-event/8000\r\na=sendrecv\r\na=x-rtp-session-id:4342C45E30624AA085D078ABE24D2D33\r\n","sip_responder_sdp_connect_ip":"192.168.36.136","sip_responder_sdp_media_port":50731,"sip_responder_sdp_media_type":"18G729/8000","sip_duration_s":7,"sip_bye":"responder","common_protocol_label":"ETHERNET.IPv4.UDP","common_c2s_ipfrag_num":0,"common_s2c_ipfrag_num":0,"common_direction":69,"common_l7_protocol":"SIP","common_server_ip":"192.168.40.158","common_client_ip":"192.168.56.27","common_server_port":5060,"common_client_port":32196,"common_stream_dir":3,"common_address_type":4,"common_address_list":"32196-5060-192.168.56.27-192.168.40.158","common_start_time":1663123136,"common_end_time":1663123136,"common_con_duration_ms":17043,"common_s2c_pkt_num":9,"common_s2c_byte_num":5375,"common_c2s_pkt_num":9,"common_c2s_byte_num":6563,"common_stream_trace_id":"8694777237333333","common_l4_protocol":"IPv4_UDP","common_sled_ip":"192.168.40.81","common_device_id":"21426003","common_device_tag":"{\"tags\":[{\"tag\":\"device_group\",\"value\":\"group-xxg-three\"},{\"tag\":\"data_center\",\"value\":\"center-xxg-three\"}]}","common_policy_id":0,"common_service":5,"common_action":0}
\ No newline at end of file diff --git a/src/test/testdata/oneSIPUniFlowRTP b/src/test/testdata/oneSIPUniFlowRTP new file mode 100644 index 0000000..c2422a7 --- /dev/null +++ b/src/test/testdata/oneSIPUniFlowRTP @@ -0,0 +1,3 @@ +{"common_action":0,"common_address_list":"50734-20730-192.168.36.136-192.168.56.27","common_address_type":4,"common_app_behavior":"voice_call","common_app_id":{"UNKNOWN":[{"app_id":4,"app_name":"unknown","packet_sequence":22,"surrogate_id":0}]},"common_app_identify_info":[{"app_name":"unknown","packet_sequence":22}],"common_app_label":"unknown","common_c2s_byte_num":64860,"common_c2s_pkt_num":304,"common_client_ip":"192.168.36.136","common_client_port":50734,"common_con_duration_ms":6264,"common_device_id":"21426003","common_device_tag":"{\"tags\":[{\"tag\":\"device_group\",\"value\":\"group-xxg-three\"},{\"tag\":\"data_center\",\"value\":\"center-xxg-three\"}]}","common_direction":73,"common_end_time":1663123136,"common_l4_protocol":"IPv4_UDP","common_l7_protocol":"STUN.RTP","common_policy_id":0,"common_s2c_byte_num":66144,"common_s2c_pkt_num":310,"common_schema_type":"RTP","common_server_ip":"192.168.56.27","common_server_port":20730,"common_service":4,"common_sled_ip":"192.168.40.81","common_start_time":1663123136,"common_stream_dir":1,"common_stream_trace_id":"869477723735406984","rtp_pcap_path":"http://192.168.44.67:9098/hos/rtp_hos_bucket/192.168.56.27_192.168.36.136_20730_50734_123.pcap","raw_log_status":"CLOSE"} +{"common_action":0,"common_address_list":"50734-20730-192.168.36.136-192.168.56.27","common_address_type":4,"common_app_behavior":"voice_call","common_app_id":{"UNKNOWN":[{"app_id":4,"app_name":"unknown","packet_sequence":22,"surrogate_id":0}]},"common_app_identify_info":[{"app_name":"unknown","packet_sequence":22}],"common_app_label":"unknown","common_c2s_byte_num":64860,"common_c2s_pkt_num":304,"common_client_ip":"192.168.36.136","common_client_port":50734,"common_con_duration_ms":6264,"common_device_id":"21426003","common_device_tag":"{\"tags\":[{\"tag\":\"device_group\",\"value\":\"group-xxg-three\"},{\"tag\":\"data_center\",\"value\":\"center-xxg-three\"}]}","common_direction":73,"common_end_time":1663123136,"common_l4_protocol":"IPv4_UDP","common_l7_protocol":"STUN.RTP","common_policy_id":0,"common_s2c_byte_num":66144,"common_s2c_pkt_num":310,"common_schema_type":"RTP","common_server_ip":"192.168.56.27","common_server_port":20730,"common_service":4,"common_sled_ip":"192.168.40.81","common_start_time":1663123136,"common_stream_dir":2,"common_stream_trace_id":"869477723735406984","rtp_pcap_path":"http://192.168.44.67:9098/hos/rtp_hos_bucket/192.168.56.27_192.168.36.136_20730_50734_456.pcap","raw_log_status":"CLOSE"} +{"common_schema_type":"SIP","common_sessions":1,"sip_call_id":"NDNjYmJkMDJhNGJmYmUyN2EwZjdmYzNjMWE1YTRmYjc.","sip_originator_description":"\"test1\"<sip:[email protected]>","sip_responder_description":"\"test2\"<sip:[email protected]>","sip_user_agent":"eyeBeamrelease1011dstamp40820","sip_originator_sdp_content":"v=0\r\no=-52INIP4192.168.56.27\r\ns=CounterPatheyeBeam1.5\r\nc=INIP4192.168.56.27\r\nt=00\r\nm=audio20730RTP/AVP0818101\r\na=alt:11:UmXquXbwvg/CQxTA192.168.56.2720730\r\na=fmtp:18annexb=no\r\na=fmtp:1010-15\r\na=rtpmap:18G729/8000\r\na=rtpmap:101telephone-event/8000\r\na=sendrecv\r\na=x-rtp-session-id:F934D61F09C84591BB3ECEECE5993BB8\r\n","sip_originator_sdp_connect_ip":"192.168.56.27","sip_originator_sdp_media_port":20730,"sip_originator_sdp_media_type":"18G729/8000","sip_server":"OpenSIPS(2.4.9(x86_64/linux))","sip_responder_sdp_content":"v=0\r\no=-62INIP4192.168.36.136\r\ns=CounterPatheyeBeam1.5\r\nc=INIP4192.168.36.136\r\nt=00\r\nm=audio50734RTP/AVP0818101\r\na=alt:11:sXlLExDmMtTnircb192.168.36.13650734\r\na=fmtp:18annexb=no\r\na=fmtp:1010-15\r\na=rtpmap:18G729/8000\r\na=rtpmap:101telephone-event/8000\r\na=sendrecv\r\na=x-rtp-session-id:4342C45E30624AA085D078ABE24D2D33\r\n","sip_responder_sdp_connect_ip":"192.168.36.136","sip_responder_sdp_media_port":50734,"sip_responder_sdp_media_type":"18G729/8000","sip_duration_s":7,"sip_bye":"responder","common_protocol_label":"ETHERNET.IPv4.UDP","common_c2s_ipfrag_num":0,"common_s2c_ipfrag_num":0,"common_direction":69,"common_l7_protocol":"SIP","common_server_ip":"192.168.40.158","common_client_ip":"192.168.56.27","common_server_port":5060,"common_client_port":32196,"common_stream_dir":3,"common_address_type":4,"common_address_list":"32196-5060-192.168.56.27-192.168.40.158","common_start_time":1663123136,"common_end_time":1663123136,"common_con_duration_ms":17043,"common_s2c_pkt_num":9,"common_s2c_byte_num":5375,"common_c2s_pkt_num":9,"common_c2s_byte_num":6563,"common_stream_trace_id":"869477723735079299","common_l4_protocol":"IPv4_UDP","common_sled_ip":"192.168.40.81","common_device_id":"21426003","common_device_tag":"{\"tags\":[{\"tag\":\"device_group\",\"value\":\"group-xxg-three\"},{\"tag\":\"data_center\",\"value\":\"center-xxg-three\"}]}","common_policy_id":0,"common_service":5,"common_action":0}
\ No newline at end of file diff --git a/src/test/testdata/oneSIPtwoRTP b/src/test/testdata/oneSIPtwoRTP new file mode 100644 index 0000000..fea59fd --- /dev/null +++ b/src/test/testdata/oneSIPtwoRTP @@ -0,0 +1,3 @@ +{"common_schema_type":"SIP","common_sessions":1,"sip_call_id":"NDNjYmJkMDJhNGJmYmUyN2EwZjdmYzNjMWE1YTRmYjc.","sip_originator_description":"\"test1\"<sip:[email protected]>","sip_responder_description":"\"test2\"<sip:[email protected]>","sip_user_agent":"eyeBeamrelease1011dstamp40820","sip_originator_sdp_content":"v=0\r\no=-52INIP4192.168.56.27\r\ns=CounterPatheyeBeam1.5\r\nc=INIP4192.168.56.27\r\nt=00\r\nm=audio20730RTP/AVP0818101\r\na=alt:11:UmXquXbwvg/CQxTA192.168.56.2720730\r\na=fmtp:18annexb=no\r\na=fmtp:1010-15\r\na=rtpmap:18G729/8000\r\na=rtpmap:101telephone-event/8000\r\na=sendrecv\r\na=x-rtp-session-id:F934D61F09C84591BB3ECEECE5993BB8\r\n","sip_originator_sdp_connect_ip":"192.168.56.27","sip_originator_sdp_media_port":20730,"sip_originator_sdp_media_type":"18G729/8000","sip_server":"OpenSIPS(2.4.9(x86_64/linux))","sip_responder_sdp_content":"v=0\r\no=-62INIP4192.168.36.136\r\ns=CounterPatheyeBeam1.5\r\nc=INIP4192.168.36.136\r\nt=00\r\nm=audio50733RTP/AVP0818101\r\na=alt:11:sXlLExDmMtTnircb192.168.36.13650733\r\na=fmtp:18annexb=no\r\na=fmtp:1010-15\r\na=rtpmap:18G729/8000\r\na=rtpmap:101telephone-event/8000\r\na=sendrecv\r\na=x-rtp-session-id:4342C45E30624AA085D078ABE24D2D33\r\n","sip_responder_sdp_connect_ip":"192.168.36.136","sip_responder_sdp_media_port":50733,"sip_responder_sdp_media_type":"18G729/8000","sip_duration_s":7,"sip_bye":"responder","common_protocol_label":"ETHERNET.IPv4.UDP","common_c2s_ipfrag_num":0,"common_s2c_ipfrag_num":0,"common_direction":69,"common_l7_protocol":"SIP","common_server_ip":"192.168.40.158","common_client_ip":"192.168.56.27","common_server_port":5060,"common_client_port":32196,"common_stream_dir":3,"common_address_type":4,"common_address_list":"32196-5060-192.168.56.27-192.168.40.158","common_start_time":1663123136,"common_end_time":1663123136,"common_con_duration_ms":17043,"common_s2c_pkt_num":9,"common_s2c_byte_num":5375,"common_c2s_pkt_num":9,"common_c2s_byte_num":6563,"common_stream_trace_id":"869477723735079299","common_l4_protocol":"IPv4_UDP","common_sled_ip":"192.168.40.81","common_device_id":"21426003","common_device_tag":"{\"tags\":[{\"tag\":\"device_group\",\"value\":\"group-xxg-three\"},{\"tag\":\"data_center\",\"value\":\"center-xxg-three\"}]}","common_policy_id":0,"common_service":5,"common_action":0} +{"common_action":0,"common_address_list":"50733-20730-192.168.36.136-192.168.56.27","common_address_type":4,"common_app_behavior":"voice_call","common_app_id":{"UNKNOWN":[{"app_id":4,"app_name":"unknown","packet_sequence":22,"surrogate_id":0}]},"common_app_identify_info":[{"app_name":"unknown","packet_sequence":22}],"common_app_label":"unknown","common_c2s_byte_num":64860,"common_c2s_pkt_num":304,"common_client_ip":"192.168.36.136","common_client_port":50733,"common_con_duration_ms":6264,"common_device_id":"21426003","common_device_tag":"{\"tags\":[{\"tag\":\"device_group\",\"value\":\"group-xxg-three\"},{\"tag\":\"data_center\",\"value\":\"center-xxg-three\"}]}","common_direction":73,"common_end_time":1663123136,"common_l4_protocol":"IPv4_UDP","common_l7_protocol":"STUN.RTP","common_policy_id":0,"common_s2c_byte_num":66144,"common_s2c_pkt_num":310,"common_schema_type":"RTP","common_server_ip":"192.168.56.27","common_server_port":20730,"common_service":4,"common_sled_ip":"192.168.40.81","common_start_time":1663123136,"common_stream_dir":3,"common_stream_trace_id":"869477723735406984","rtp_pcap_path":"http://192.168.44.67:9098/hos/rtp_hos_bucket/192.168.56.27_192.168.36.136_20730_50733_123.pcap","raw_log_status":"CLOSE"} +{"common_action":0,"common_address_list":"50733-20730-192.168.36.136-192.168.56.27","common_address_type":4,"common_app_behavior":"voice_call","common_app_id":{"UNKNOWN":[{"app_id":4,"app_name":"unknown","packet_sequence":22,"surrogate_id":0}]},"common_app_identify_info":[{"app_name":"unknown","packet_sequence":22}],"common_app_label":"unknown","common_c2s_byte_num":64860,"common_c2s_pkt_num":304,"common_client_ip":"192.168.36.136","common_client_port":50733,"common_con_duration_ms":6264,"common_device_id":"21426003","common_device_tag":"{\"tags\":[{\"tag\":\"device_group\",\"value\":\"group-xxg-three\"},{\"tag\":\"data_center\",\"value\":\"center-xxg-three\"}]}","common_direction":73,"common_end_time":1663123136,"common_l4_protocol":"IPv4_UDP","common_l7_protocol":"STUN.RTP","common_policy_id":0,"common_s2c_byte_num":66144,"common_s2c_pkt_num":310,"common_schema_type":"RTP","common_server_ip":"192.168.56.27","common_server_port":20730,"common_service":4,"common_sled_ip":"192.168.40.81","common_start_time":1663123136,"common_stream_dir":3,"common_stream_trace_id":"869477723735406984","rtp_pcap_path":"http://192.168.44.67:9098/hos/rtp_hos_bucket/192.168.56.27_192.168.36.136_20730_50733_456.pcap","raw_log_status":"CLOSE"}
\ No newline at end of file diff --git a/src/test/testdata/twoRTPandSIP b/src/test/testdata/twoRTPandSIP new file mode 100644 index 0000000..7d05757 --- /dev/null +++ b/src/test/testdata/twoRTPandSIP @@ -0,0 +1,4 @@ +{"common_action":0,"common_address_list":"50735-20730-192.168.36.136-192.168.56.27","common_address_type":4,"common_app_behavior":"voice_call","common_app_id":{"UNKNOWN":[{"app_id":4,"app_name":"unknown","packet_sequence":22,"surrogate_id":0}]},"common_app_identify_info":[{"app_name":"unknown","packet_sequence":22}],"common_app_label":"unknown","common_c2s_byte_num":64860,"common_c2s_pkt_num":304,"common_client_ip":"192.168.36.136","common_client_port":50735,"common_con_duration_ms":6264,"common_device_id":"21426003","common_device_tag":"{\"tags\":[{\"tag\":\"device_group\",\"value\":\"group-xxg-three\"},{\"tag\":\"data_center\",\"value\":\"center-xxg-three\"}]}","common_direction":73,"common_end_time":1663123136,"common_l4_protocol":"IPv4_UDP","common_l7_protocol":"STUN.RTP","common_policy_id":0,"common_s2c_byte_num":66144,"common_s2c_pkt_num":310,"common_schema_type":"RTP","common_server_ip":"192.168.56.27","common_server_port":20730,"common_service":4,"common_sled_ip":"192.168.40.81","common_start_time":1663123136,"common_stream_dir":3,"common_stream_trace_id":"869477723735406984","rtp_pcap_path":"http://192.168.44.67:9098/hos/rtp_hos_bucket/192.168.56.27_192.168.36.136_20730_50735_123.pcap","raw_log_status":"CLOSE"} +{"common_action":0,"common_address_list":"50735-20730-192.168.36.136-192.168.56.27","common_address_type":4,"common_app_behavior":"voice_call","common_app_id":{"UNKNOWN":[{"app_id":4,"app_name":"unknown","packet_sequence":22,"surrogate_id":0}]},"common_app_identify_info":[{"app_name":"unknown","packet_sequence":22}],"common_app_label":"unknown","common_c2s_byte_num":64860,"common_c2s_pkt_num":304,"common_client_ip":"192.168.36.136","common_client_port":50735,"common_con_duration_ms":6264,"common_device_id":"21426003","common_device_tag":"{\"tags\":[{\"tag\":\"device_group\",\"value\":\"group-xxg-three\"},{\"tag\":\"data_center\",\"value\":\"center-xxg-three\"}]}","common_direction":73,"common_end_time":1663123136,"common_l4_protocol":"IPv4_UDP","common_l7_protocol":"STUN.RTP","common_policy_id":0,"common_s2c_byte_num":66144,"common_s2c_pkt_num":310,"common_schema_type":"RTP","common_server_ip":"192.168.56.27","common_server_port":20730,"common_service":4,"common_sled_ip":"192.168.40.81","common_start_time":1663123136,"common_stream_dir":3,"common_stream_trace_id":"869477723735406984","rtp_pcap_path":"http://192.168.44.67:9098/hos/rtp_hos_bucket/192.168.56.27_192.168.36.136_20730_50735_123.pcap","raw_log_status":"CLOSE"} +{"common_schema_type":"SIP","common_sessions":1,"sip_call_id":"NDNjYmJkMDJhNGJmYmUyN2EwZjdmYzNjMWE1YTRmYjc.","sip_originator_description":"\"test1\"<sip:[email protected]>","sip_responder_description":"\"test2\"<sip:[email protected]>","sip_user_agent":"eyeBeamrelease1011dstamp40820","sip_originator_sdp_content":"v=0\r\no=-52INIP4192.168.56.27\r\ns=CounterPatheyeBeam1.5\r\nc=INIP4192.168.56.27\r\nt=00\r\nm=audio20730RTP/AVP0818101\r\na=alt:11:UmXquXbwvg/CQxTA192.168.56.2720730\r\na=fmtp:18annexb=no\r\na=fmtp:1010-15\r\na=rtpmap:18G729/8000\r\na=rtpmap:101telephone-event/8000\r\na=sendrecv\r\na=x-rtp-session-id:F934D61F09C84591BB3ECEECE5993BB8\r\n","sip_originator_sdp_connect_ip":"192.168.56.27","sip_originator_sdp_media_port":20730,"sip_originator_sdp_media_type":"18G729/8000","sip_server":"OpenSIPS(2.4.9(x86_64/linux))","sip_responder_sdp_content":"v=0\r\no=-62INIP4192.168.36.136\r\ns=CounterPatheyeBeam1.5\r\nc=INIP4192.168.36.136\r\nt=00\r\nm=audio50735RTP/AVP0818101\r\na=alt:11:sXlLExDmMtTnircb192.168.36.13650735\r\na=fmtp:18annexb=no\r\na=fmtp:1010-15\r\na=rtpmap:18G729/8000\r\na=rtpmap:101telephone-event/8000\r\na=sendrecv\r\na=x-rtp-session-id:4342C45E30624AA085D078ABE24D2D33\r\n","sip_responder_sdp_connect_ip":"192.168.36.136","sip_responder_sdp_media_port":50735,"sip_responder_sdp_media_type":"18G729/8000","sip_duration_s":7,"sip_bye":"responder","common_protocol_label":"ETHERNET.IPv4.UDP","common_c2s_ipfrag_num":0,"common_s2c_ipfrag_num":0,"common_direction":69,"common_l7_protocol":"SIP","common_server_ip":"192.168.40.158","common_client_ip":"192.168.56.27","common_server_port":5060,"common_client_port":32196,"common_stream_dir":3,"common_address_type":4,"common_address_list":"32196-5060-192.168.56.27-192.168.40.158","common_start_time":1663123136,"common_end_time":1663123136,"common_con_duration_ms":17043,"common_s2c_pkt_num":9,"common_s2c_byte_num":5375,"common_c2s_pkt_num":9,"common_c2s_byte_num":6563,"common_stream_trace_id":"869477723735079299","common_l4_protocol":"IPv4_UDP","common_sled_ip":"192.168.40.81","common_device_id":"21426003","common_device_tag":"{\"tags\":[{\"tag\":\"device_group\",\"value\":\"group-xxg-three\"},{\"tag\":\"data_center\",\"value\":\"center-xxg-three\"}]}","common_policy_id":0,"common_service":5,"common_action":0} +{"common_schema_type":"SIP","common_sessions":1,"sip_call_id":"NDNjYmJkMDJhNGJmYmUyN2EwZjdmYzNjMWE1YTRmYjc.","sip_originator_description":"\"test1\"<sip:[email protected]>","sip_responder_description":"\"test2\"<sip:[email protected]>","sip_user_agent":"eyeBeamrelease1011dstamp40820","sip_originator_sdp_content":"v=0\r\no=-52INIP4192.168.56.27\r\ns=CounterPatheyeBeam1.5\r\nc=INIP4192.168.56.27\r\nt=00\r\nm=audio20730RTP/AVP0818101\r\na=alt:11:UmXquXbwvg/CQxTA192.168.56.2720730\r\na=fmtp:18annexb=no\r\na=fmtp:1010-15\r\na=rtpmap:18G729/8000\r\na=rtpmap:101telephone-event/8000\r\na=sendrecv\r\na=x-rtp-session-id:F934D61F09C84591BB3ECEECE5993BB8\r\n","sip_originator_sdp_connect_ip":"192.168.56.27","sip_originator_sdp_media_port":20730,"sip_originator_sdp_media_type":"18G729/8000","sip_server":"OpenSIPS(2.4.9(x86_64/linux))","sip_responder_sdp_content":"v=0\r\no=-62INIP4192.168.36.136\r\ns=CounterPatheyeBeam1.5\r\nc=INIP4192.168.36.136\r\nt=00\r\nm=audio50735RTP/AVP0818101\r\na=alt:11:sXlLExDmMtTnircb192.168.36.13650735\r\na=fmtp:18annexb=no\r\na=fmtp:1010-15\r\na=rtpmap:18G729/8000\r\na=rtpmap:101telephone-event/8000\r\na=sendrecv\r\na=x-rtp-session-id:4342C45E30624AA085D078ABE24D2D33\r\n","sip_responder_sdp_connect_ip":"192.168.36.136","sip_responder_sdp_media_port":50735,"sip_responder_sdp_media_type":"18G729/8000","sip_duration_s":7,"sip_bye":"responder","common_protocol_label":"ETHERNET.IPv4.UDP","common_c2s_ipfrag_num":0,"common_s2c_ipfrag_num":0,"common_direction":69,"common_l7_protocol":"SIP","common_server_ip":"192.168.40.158","common_client_ip":"192.168.56.27","common_server_port":5060,"common_client_port":32196,"common_stream_dir":3,"common_address_type":4,"common_address_list":"32196-5060-192.168.56.27-192.168.40.158","common_start_time":1663123136,"common_end_time":1663123136,"common_con_duration_ms":17043,"common_s2c_pkt_num":9,"common_s2c_byte_num":5375,"common_c2s_pkt_num":9,"common_c2s_byte_num":6563,"common_stream_trace_id":"869477723735079299","common_l4_protocol":"IPv4_UDP","common_sled_ip":"192.168.40.81","common_device_id":"21426003","common_device_tag":"{\"tags\":[{\"tag\":\"device_group\",\"value\":\"group-xxg-three\"},{\"tag\":\"data_center\",\"value\":\"center-xxg-three\"}]}","common_policy_id":0,"common_service":5,"common_action":0}
\ No newline at end of file |
