diff options
| author | qidaijie <[email protected]> | 2023-06-13 18:40:37 +0800 |
|---|---|---|
| committer | qidaijie <[email protected]> | 2023-06-13 18:40:37 +0800 |
| commit | f73c71ae2c760e38eec17dba4ab307bc06afaf5f (patch) | |
| tree | 3667dffd08f25da715b3d9855b9a015f032ec1a7 | |
| parent | 9d7f822e0a63bad67563ecf3a8bdd89010eec9b5 (diff) | |
修改JSON解析类库为Fastjson2。(GAL-350)23.05-FastJson2
26 files changed, 295 insertions, 1011 deletions
@@ -6,7 +6,7 @@ <groupId>com.zdjizhi</groupId> <artifactId>log-stream-voip-relation</artifactId> - <version>220914-VSYS</version> + <version>230607-FastJson2</version> <name>log-stream-voip-relation</name> <url>http://www.example.com</url> @@ -16,7 +16,7 @@ <repository> <id>nexus</id> <name>Team Nexus Repository</name> - <url>http://192.168.40.125:8099/content/groups/public</url> + <url>http://192.168.40.153:8099/content/groups/public</url> </repository> <repository> @@ -39,8 +39,9 @@ <kafka.version>1.0.0</kafka.version> <hbase.version>2.2.3</hbase.version> <nacos.version>1.2.0</nacos.version> - <zdjz.tools.version>1.0.8</zdjz.tools.version> + <zdjz.tools.version>1.1.3</zdjz.tools.version> <scope.type>provided</scope.type> + <fastjson.version>2.0.32</fastjson.version> <!--<scope.type>compile</scope.type>--> </properties> @@ -132,6 +133,12 @@ </exclusions> </dependency> + <dependency> + <groupId>com.alibaba</groupId> + <artifactId>fastjson</artifactId> + <version>${fastjson.version}</version> + </dependency> + <!-- https://mvnrepository.com/artifact/org.apache.flink/flink-table --> <dependency> <groupId>org.apache.flink</groupId> diff --git a/properties/default_config.properties b/properties/default_config.properties index c750148..c562cad 100644 --- a/properties/default_config.properties +++ b/properties/default_config.properties @@ -39,15 +39,6 @@ kafka.user=nsyGpHKGFA4KW0zro9MDdw== #kafka SASL及SSL验证密码-加密 kafka.pin=6MleDyA3Z73HSaXiKsDJ2k7Ys8YWLhEJ -#====================nacos default====================# -#nacos username -nacos.username=nacos - -#nacos password -nacos.pin=nacos - -#nacos group -nacos.group=Galaxy diff --git a/properties/service_flow_config.properties b/properties/service_flow_config.properties index 9cd2e94..7f80b1c 100644 --- a/properties/service_flow_config.properties +++ b/properties/service_flow_config.properties @@ -1,5 +1,4 @@ #--------------------------------地址配置------------------------------# - #管理kafka地址 source.kafka.servers=192.168.44.12:9094 @@ -10,25 +9,15 @@ sink.kafka.servers=192.168.44.12:9094 #定位库地址 tools.library=D:\\workerspace\\dat\\ -#--------------------------------nacos配置------------------------------# -#nacos 地址 -nacos.server=192.168.44.12:8848 - -#nacos namespace -nacos.schema.namespace=test - -#nacos data id -nacos.data.id=voip_record.json - #--------------------------------Kafka消费组信息------------------------------# #kafka 接收数据topic -source.kafka.topic=test +source.kafka.topic=etl-test #补全数据 输出 topic -sink.kafka.topic=test-result +sink.kafka.topic=etl-test-result #读取topic,存储该spout id的消费offset信息,可通过该拓扑命名;具体存储offset的位置,确定下次读取不重复的数据; -group.id=mytest-1 +group.id=voip-test-230613-1 #--------------------------------topology配置------------------------------# #map函数并行度 @@ -44,7 +33,7 @@ one.sided.window.time=10 voip.calibration.window.time=30 #voip二次对准时间 seconds -sec.combine.sr.cache.secs=60 +sec.combine.sr.cache.secs=10 #check ip is Inner network;0 off, 1 on. check.inner.network=0
\ No newline at end of file diff --git a/src/main/java/com/zdjizhi/common/VoipRelationConfig.java b/src/main/java/com/zdjizhi/common/VoipRelationConfig.java index 758af0f..0e59021 100644 --- a/src/main/java/com/zdjizhi/common/VoipRelationConfig.java +++ b/src/main/java/com/zdjizhi/common/VoipRelationConfig.java @@ -20,20 +20,6 @@ public class VoipRelationConfig { */ public static final String CORRELATION_STR = "_"; - public static final String VISIBILITY = "disabled"; - public static final String FORMAT_SPLITTER = ","; - - - /** - * Nacos - */ - public static final String NACOS_SERVER = VoipRelationConfigurations.getStringProperty(0, "nacos.server"); - public static final String NACOS_SCHEMA_NAMESPACE = VoipRelationConfigurations.getStringProperty(0, "nacos.schema.namespace"); - public static final String NACOS_DATA_ID = VoipRelationConfigurations.getStringProperty(0, "nacos.data.id"); - public static final String NACOS_PIN = VoipRelationConfigurations.getStringProperty(1, "nacos.pin"); - public static final String NACOS_GROUP = VoipRelationConfigurations.getStringProperty(1, "nacos.group"); - public static final String NACOS_USERNAME = VoipRelationConfigurations.getStringProperty(1, "nacos.username"); - /** * System */ diff --git a/src/main/java/com/zdjizhi/operator/group/MergeUniFlowKeyByFunction.java b/src/main/java/com/zdjizhi/operator/group/MergeUniFlowKeyByFunction.java index 351bee7..27affa9 100644 --- a/src/main/java/com/zdjizhi/operator/group/MergeUniFlowKeyByFunction.java +++ b/src/main/java/com/zdjizhi/operator/group/MergeUniFlowKeyByFunction.java @@ -1,11 +1,9 @@ package com.zdjizhi.operator.group; -import cn.hutool.log.Log; -import cn.hutool.log.LogFactory; +import com.alibaba.fastjson2.JSONObject; import org.apache.flink.api.java.functions.KeySelector; import org.apache.flink.api.java.tuple.Tuple3; -import java.util.Map; /** * @author qidaijie @@ -13,9 +11,9 @@ import java.util.Map; * @Description: * @date 2022/8/2911:08 */ -public class MergeUniFlowKeyByFunction implements KeySelector<Tuple3<String, Map<String, Object>, Integer>, Integer> { +public class MergeUniFlowKeyByFunction implements KeySelector<Tuple3<String, JSONObject, Integer>, Integer> { @Override - public Integer getKey(Tuple3<String, Map<String, Object>, Integer> value) throws Exception { + public Integer getKey(Tuple3<String, JSONObject, Integer> value) throws Exception { //vsys id return value.f2; } diff --git a/src/main/java/com/zdjizhi/operator/group/VoipCalibrationKeyByFunction.java b/src/main/java/com/zdjizhi/operator/group/VoipCalibrationKeyByFunction.java index a184566..063ac0a 100644 --- a/src/main/java/com/zdjizhi/operator/group/VoipCalibrationKeyByFunction.java +++ b/src/main/java/com/zdjizhi/operator/group/VoipCalibrationKeyByFunction.java @@ -1,24 +1,19 @@ package com.zdjizhi.operator.group; -import cn.hutool.log.Log; -import cn.hutool.log.LogFactory; -import com.jayway.jsonpath.DocumentContext; -import com.jayway.jsonpath.JsonPath; +import com.alibaba.fastjson2.JSONObject; import org.apache.flink.api.java.functions.KeySelector; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.api.java.tuple.Tuple4; -import java.util.Map; - /** * @author qidaijie * @Package com.zdjizhi.operator.group * @Description: * @date 2022/8/2911:08 */ -public class VoipCalibrationKeyByFunction implements KeySelector<Tuple4<String, String, Map<String, Object>, Integer>, Tuple2<String, Integer>> { +public class VoipCalibrationKeyByFunction implements KeySelector<Tuple4<String, String, JSONObject, Integer>, Tuple2<String, Integer>> { @Override - public Tuple2<String, Integer> getKey(Tuple4<String, String, Map<String, Object>, Integer> value) throws Exception { + public Tuple2<String, Integer> getKey(Tuple4<String, String, JSONObject, Integer> value) throws Exception { String fourKey = value.f0; Integer vsysId = value.f3; diff --git a/src/main/java/com/zdjizhi/operator/parse/ParseMapFunction.java b/src/main/java/com/zdjizhi/operator/parse/ParseMapFunction.java index 3d7ca4b..9fc6981 100644 --- a/src/main/java/com/zdjizhi/operator/parse/ParseMapFunction.java +++ b/src/main/java/com/zdjizhi/operator/parse/ParseMapFunction.java @@ -2,15 +2,13 @@ package com.zdjizhi.operator.parse; import cn.hutool.log.Log; import cn.hutool.log.LogFactory; +import com.alibaba.fastjson2.JSONObject; import com.zdjizhi.common.JsonProConfig; import com.zdjizhi.tools.utils.RelationUtils; -import com.zdjizhi.tools.json.JsonParseUtil; -import com.zdjizhi.utils.JsonMapper; import com.zdjizhi.utils.StringUtil; import org.apache.flink.api.common.functions.MapFunction; import org.apache.flink.api.java.tuple.Tuple3; -import java.util.Map; /** * @author qidaijie @@ -18,42 +16,44 @@ import java.util.Map; * @Description: * @date 2022/9/916:21 */ -public class ParseMapFunction implements MapFunction<String, Tuple3<String, Map<String, Object>, Integer>> { +public class ParseMapFunction implements MapFunction<String, Tuple3<String, JSONObject, Integer>> { private static final Log logger = LogFactory.get(); @Override - public Tuple3<String, Map<String, Object>, Integer> map(String input) throws Exception { + public Tuple3<String, JSONObject, Integer> map(String input) throws Exception { try { if (StringUtil.isNotBlank(input)) { - Map<String, Object> jsonMap = JsonParseUtil.typeTransform((Map<String, Object>) JsonMapper.fromJsonString(input, Map.class)); + JSONObject jsonObject = JSONObject.parseObject(input); - String commonSchemaType = JsonParseUtil.getString(jsonMap, JsonProConfig.SCHEMA_TYPE); - Integer vsysId = getVsysId(jsonMap); - String sipCallId = JsonParseUtil.getString(jsonMap, JsonProConfig.SIP_CALL_ID); + String commonSchemaType = jsonObject.getString(JsonProConfig.SCHEMA_TYPE); + Integer vsysId = getVsysId(jsonObject); + String sipCallId = jsonObject.getString(JsonProConfig.SIP_CALL_ID); //1:c2s,2:s2c;3;double - int commonStreamDir = JsonParseUtil.getInteger(jsonMap, JsonProConfig.STREAM_DIR); + int commonStreamDir = jsonObject.getInteger(JsonProConfig.STREAM_DIR); - if (JsonProConfig.SIP_MARK.equals(commonSchemaType)) { - if (StringUtil.isNotBlank(sipCallId)) { - if (RelationUtils.checkSipCompleteness(jsonMap)) { - if (commonStreamDir != JsonProConfig.DOUBLE) { - return new Tuple3<>("sip-single", jsonMap, vsysId); + switch (commonSchemaType) { + case JsonProConfig.SIP_MARK: + if (StringUtil.isNotBlank(sipCallId)) { + if (RelationUtils.checkSipCompleteness(jsonObject)) { + if (commonStreamDir != JsonProConfig.DOUBLE) { + return new Tuple3<>("sip-single", jsonObject, vsysId); + } else { + return new Tuple3<>("sip-double", jsonObject, vsysId); + } } else { - return new Tuple3<>("sip-double", jsonMap, vsysId); + return new Tuple3<>("violation", jsonObject, vsysId); } } else { - return new Tuple3<>("violation", jsonMap, vsysId); + return new Tuple3<>("violation", jsonObject, vsysId); } - } else { - return new Tuple3<>("violation", jsonMap, vsysId); - } - } else if (JsonProConfig.RTP_MARK.equals(commonSchemaType)) { - if (commonStreamDir == JsonProConfig.DOUBLE) { - return new Tuple3<>("rtp-double", jsonMap, vsysId); - } else { - return new Tuple3<>("rtp-single", jsonMap, vsysId); - } + case JsonProConfig.RTP_MARK: + if (commonStreamDir == JsonProConfig.DOUBLE) { + return new Tuple3<>("rtp-double", jsonObject, vsysId); + } else { + return new Tuple3<>("rtp-single", jsonObject, vsysId); + } + default: } } } catch (RuntimeException e) { @@ -65,17 +65,11 @@ public class ParseMapFunction implements MapFunction<String, Tuple3<String, Map< /** * 获取VSYS ID若无该字段,则默认值为1 * - * @param jsonMap 原始日志 + * @param jsonObject 原始日志 * @return vsysid */ - private static int getVsysId(Map<String, Object> jsonMap) { - Object value = jsonMap.getOrDefault(JsonProConfig.VSYS_ID, null); - if (value != null) { - return Integer.parseInt(value.toString()); - } else { - return 1; - } - + private static int getVsysId(JSONObject jsonObject) { + return jsonObject.getIntValue(JsonProConfig.VSYS_ID, 1); } diff --git a/src/main/java/com/zdjizhi/operator/window/MergeUniFlowWindowFunction.java b/src/main/java/com/zdjizhi/operator/window/MergeUniFlowWindowFunction.java index cf25763..dba60de 100644 --- a/src/main/java/com/zdjizhi/operator/window/MergeUniFlowWindowFunction.java +++ b/src/main/java/com/zdjizhi/operator/window/MergeUniFlowWindowFunction.java @@ -2,22 +2,19 @@ package com.zdjizhi.operator.window; import cn.hutool.log.Log; import cn.hutool.log.LogFactory; +import com.alibaba.fastjson2.JSONObject; import com.zdjizhi.common.JsonProConfig; import com.zdjizhi.common.VoipRelationConfig; import com.zdjizhi.utils.IPUtil; -import com.zdjizhi.utils.JsonMapper; import com.zdjizhi.utils.StringUtil; import com.zdjizhi.tools.utils.RelationUtils; -import com.zdjizhi.tools.json.JsonParseUtil; import org.apache.flink.api.java.tuple.Tuple3; import org.apache.flink.api.java.tuple.Tuple4; import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction; import org.apache.flink.streaming.api.windowing.windows.TimeWindow; import org.apache.flink.util.Collector; -import scala.Int; import java.util.HashMap; -import java.util.Map; /** * @author qidaijie @@ -25,38 +22,38 @@ import java.util.Map; * @Description: * @date 2022/8/2911:44 */ -public class MergeUniFlowWindowFunction extends ProcessWindowFunction<Tuple3<String, Map<String, Object>, Integer>, Tuple4<String, String, Map<String, Object>, Integer>, Integer, TimeWindow> { +public class MergeUniFlowWindowFunction extends ProcessWindowFunction<Tuple3<String, JSONObject, Integer>, Tuple4<String, String, JSONObject, Integer>, Integer, TimeWindow> { private static final Log logger = LogFactory.get(); /** * key-sip_call_id;value为sip的具体数据---存放的是SIP未关联的数据(单向流) */ - private static HashMap<String, Map<String, Object>> sipOriHmList = new HashMap<>(32); + private static HashMap<String, JSONObject> sipOriHmList = new HashMap<>(32); /** * key-rtp拼接的四元组;value为rtp的具体数据---存放的是RTP未关联的数据(单向流) */ - private static HashMap<String, Map<String, Object>> rtpOriHmList = new HashMap<>(32); + private static HashMap<String, JSONObject> rtpOriHmList = new HashMap<>(32); @Override - public void process(Integer key, Context context, Iterable<Tuple3<String, Map<String, Object>, Integer>> inputs, Collector<Tuple4<String, String, Map<String, Object>, Integer>> out) throws Exception { - for (Tuple3<String, Map<String, Object>, Integer> input : inputs) { + public void process(Integer key, Context context, Iterable<Tuple3<String, JSONObject, Integer>> inputs, Collector<Tuple4<String, String, JSONObject, Integer>> out) { + for (Tuple3<String, JSONObject, Integer> input : inputs) { if (input != null) { //已关联的sip,rtp;未关联的sip,rtp;内网的sip String type = input.f0; - Map<String, Object> jsonMap = input.f1; + JSONObject jsonMap = input.f1; Integer vsysId = input.f2; - logger.error("" + type + "----" + jsonMap.toString()); switch (type) { case "sip-double": separateInnerIp(jsonMap, out, vsysId); break; case "rtp-double": - String rtpDouble4Key = getFourKey(JsonParseUtil.getString(jsonMap, JsonProConfig.CLIENT_IP), - JsonParseUtil.getInteger(jsonMap, JsonProConfig.CLIENT_PORT), - JsonParseUtil.getString(jsonMap, JsonProConfig.SERVER_IP), - JsonParseUtil.getInteger(jsonMap, JsonProConfig.SERVER_PORT)); + case "rtp-single": + String rtpDouble4Key = getFourKey(jsonMap.getString(JsonProConfig.CLIENT_IP), + jsonMap.getInteger(JsonProConfig.CLIENT_PORT), + jsonMap.getString(JsonProConfig.SERVER_IP), + jsonMap.getInteger(JsonProConfig.SERVER_PORT)); out.collect(new Tuple4<>(rtpDouble4Key, type, jsonMap, vsysId)); break; @@ -65,27 +62,17 @@ public class MergeUniFlowWindowFunction extends ProcessWindowFunction<Tuple3<Str break; //单向流的SIP case "sip-single": - String sipCallId = JsonParseUtil.getString(jsonMap, JsonProConfig.SIP_CALL_ID); - String sipSingle4Key = getFourKey(JsonParseUtil.getString(jsonMap, JsonProConfig.CLIENT_IP), - JsonParseUtil.getInteger(jsonMap, JsonProConfig.CLIENT_PORT), - JsonParseUtil.getString(jsonMap, JsonProConfig.SERVER_IP), - JsonParseUtil.getInteger(jsonMap, JsonProConfig.SERVER_PORT)); + String sipCallId = jsonMap.getString(JsonProConfig.SIP_CALL_ID); + String sipSingle4Key = getFourKey(jsonMap.getString(JsonProConfig.CLIENT_IP), + jsonMap.getInteger(JsonProConfig.CLIENT_PORT), + jsonMap.getString(JsonProConfig.SERVER_IP), + jsonMap.getInteger(JsonProConfig.SERVER_PORT)); if (StringUtil.isNotBlank(sipCallId)) { putKeyAndMsg(jsonMap, sipCallId + sipSingle4Key, sipOriHmList, "SIP", vsysId, out); } else { out.collect(new Tuple4<>("", type, jsonMap, vsysId)); } break; - //单向流的RTP - case "rtp-single": - String rtpSingle4Key = getFourKey(JsonParseUtil.getString(jsonMap, JsonProConfig.CLIENT_IP), - JsonParseUtil.getInteger(jsonMap, JsonProConfig.CLIENT_PORT), - JsonParseUtil.getString(jsonMap, JsonProConfig.SERVER_IP), - JsonParseUtil.getInteger(jsonMap, JsonProConfig.SERVER_PORT)); - //对rtp单向流进行关联 - putKeyAndMsg(jsonMap, rtpSingle4Key, rtpOriHmList, "RTP", vsysId, out); - break; - //内网的SIP default: logger.error("type is beyond expectation:" + type); break; @@ -95,20 +82,20 @@ public class MergeUniFlowWindowFunction extends ProcessWindowFunction<Tuple3<Str } if (sipOriHmList.size() > 0) { - HashMap<String, Map<String, Object>> tmpSipOriHmList = new HashMap<>(sipOriHmList); + HashMap<String, JSONObject> tmpSipOriHmList = new HashMap<>(sipOriHmList); sipOriHmList.clear(); for (String sipKey : tmpSipOriHmList.keySet()) { - Map<String, Object> sipSingleMsg = tmpSipOriHmList.get(sipKey); + JSONObject sipSingleMsg = tmpSipOriHmList.get(sipKey); //sipKey为sip_call_id,未关联成功的sip是不能使用的 out.collect(new Tuple4<>(sipKey, "sip-single", sipSingleMsg, 0)); } } if (rtpOriHmList.size() > 0) { - HashMap<String, Map<String, Object>> tmpRtpOriHmList = new HashMap<>(rtpOriHmList); + HashMap<String, JSONObject> tmpRtpOriHmList = new HashMap<>(rtpOriHmList); rtpOriHmList.clear(); for (String rtpKey : tmpRtpOriHmList.keySet()) { - Map<String, Object> rtpSingleMsg = tmpRtpOriHmList.get(rtpKey); + JSONObject rtpSingleMsg = tmpRtpOriHmList.get(rtpKey); //未关联成功的rtp还可以继续关联,因为有四元组 out.collect(new Tuple4<>(rtpKey, "rtp-single", rtpSingleMsg, 0)); } @@ -120,32 +107,32 @@ public class MergeUniFlowWindowFunction extends ProcessWindowFunction<Tuple3<Str * 存放key并关联拼接对应Key */ @SuppressWarnings("unchecked") - private static void putKeyAndMsg(Map<String, Object> secondSipOrRtpLog, String hmStrKey, HashMap<String, Map<String, Object>> hashMapStr, String protocolType, Integer vsysId, Collector<Tuple4<String, String, Map<String, Object>, Integer>> out) { + private static void putKeyAndMsg(JSONObject secondSipOrRtpLog, String hmStrKey, HashMap<String, JSONObject> hashMapStr, String protocolType, Integer vsysId, Collector<Tuple4<String, String, JSONObject, Integer>> out) { //和上次存入的数据关联 if (hashMapStr.containsKey(hmStrKey)) { - HashMap<String, Object> jsonCommonMap = new HashMap<>(128); - Map<String, Object> firstSipOrRtpLog = hashMapStr.remove(hmStrKey); + JSONObject jsonCommon = new JSONObject(); + JSONObject firstSipOrRtpLog = hashMapStr.remove(hmStrKey); //1:c2s,2:s2c;3;double,1表示firstMsg为请求侧(c2s),合并时以它为准 - if (JsonParseUtil.getInteger(firstSipOrRtpLog, JsonProConfig.STREAM_DIR) == 1) { - jsonCommonMap.putAll(secondSipOrRtpLog); - jsonCommonMap.putAll(firstSipOrRtpLog); + if (firstSipOrRtpLog.getIntValue(JsonProConfig.STREAM_DIR) == 1) { + jsonCommon.putAll(secondSipOrRtpLog); + jsonCommon.putAll(firstSipOrRtpLog); } else { - jsonCommonMap.putAll(firstSipOrRtpLog); - jsonCommonMap.putAll(secondSipOrRtpLog); + jsonCommon.putAll(firstSipOrRtpLog); + jsonCommon.putAll(secondSipOrRtpLog); } - accumulateMsg(firstSipOrRtpLog, secondSipOrRtpLog, jsonCommonMap); - jsonCommonMap.put(JsonProConfig.STREAM_DIR, JsonProConfig.DOUBLE); + accumulateMsg(firstSipOrRtpLog, secondSipOrRtpLog, jsonCommon); + jsonCommon.put(JsonProConfig.STREAM_DIR, JsonProConfig.DOUBLE); if (JsonProConfig.SIP_MARK.equals(protocolType)) { //手动关联SIP后区分内外网IP再下发 - separateInnerIp(jsonCommonMap, out, vsysId); + separateInnerIp(jsonCommon, out, vsysId); } else if (JsonProConfig.RTP_MARK.equals(protocolType)) { //手动关联RTP后按四元组下发 - jsonCommonMap.put(JsonProConfig.RTP_PCAP_PATH, setRtpPacpPath(firstSipOrRtpLog, secondSipOrRtpLog)); - out.collect(new Tuple4<>(hmStrKey, "rtp-double", jsonCommonMap, vsysId)); + jsonCommon.put(JsonProConfig.RTP_PCAP_PATH, setRtpPacpPath(firstSipOrRtpLog, secondSipOrRtpLog)); + out.collect(new Tuple4<>(hmStrKey, "rtp-double", jsonCommon, vsysId)); } } else { hashMapStr.put(hmStrKey, secondSipOrRtpLog); @@ -155,12 +142,12 @@ public class MergeUniFlowWindowFunction extends ProcessWindowFunction<Tuple3<Str /** * 区分SIP的内外网IP,此时已经关联完成包含四元组,但未区分内外网IP */ - private static void separateInnerIp(Map<String, Object> jsonMap, Collector<Tuple4<String, String, Map<String, Object>, Integer>> out, Integer vsysid) { + private static void separateInnerIp(JSONObject jsonMap, Collector<Tuple4<String, String, JSONObject, Integer>> out, Integer vsysid) { - String sipOriginatorIp = JsonParseUtil.getString(jsonMap, JsonProConfig.SIP_ORIGINATOR_IP); - int sipOriginatorPort = JsonParseUtil.getInteger(jsonMap, JsonProConfig.SIP_ORIGINATOR_PORT); - String sipResponderIp = JsonParseUtil.getString(jsonMap, JsonProConfig.SIP_RESPONDER_IP); - int sipResponderPort = JsonParseUtil.getInteger(jsonMap, JsonProConfig.SIP_RESPONDER_PORT); + String sipOriginatorIp = jsonMap.getString(JsonProConfig.SIP_ORIGINATOR_IP); + int sipOriginatorPort = jsonMap.getInteger(JsonProConfig.SIP_ORIGINATOR_PORT); + String sipResponderIp = jsonMap.getString(JsonProConfig.SIP_RESPONDER_IP); + int sipResponderPort = jsonMap.getInteger(JsonProConfig.SIP_RESPONDER_PORT); if (RelationUtils.isInnerIp(sipOriginatorIp) || RelationUtils.isInnerIp(sipResponderIp)) { /* @@ -277,7 +264,7 @@ public class MergeUniFlowWindowFunction extends ProcessWindowFunction<Tuple3<Str * @param secondSipOrRtpLog 第二条单向流日志 * @param sipOrRtpCombin SIP双向流日志集合 */ - private static void accumulateMsg(Map<String, Object> firstSipOrRtpLog, Map<String, Object> secondSipOrRtpLog, Map<String, Object> sipOrRtpCombin) { + private static void accumulateMsg(JSONObject firstSipOrRtpLog, JSONObject secondSipOrRtpLog, JSONObject sipOrRtpCombin) { //common_sessions RelationUtils.metricSumSetOtherLog(firstSipOrRtpLog, secondSipOrRtpLog, sipOrRtpCombin, JsonProConfig.SESSIONS); //common_c2s_pkt_num @@ -309,10 +296,10 @@ public class MergeUniFlowWindowFunction extends ProcessWindowFunction<Tuple3<Str * @param secendSipOrRtpLog 第二个单向流日志 * @return 文件路径 */ - private static String setRtpPacpPath(Map<String, Object> firstSipOrRtpLog, Map<String, Object> secendSipOrRtpLog) { + private static String setRtpPacpPath(JSONObject firstSipOrRtpLog, JSONObject secendSipOrRtpLog) { - String firstPcapPath = JsonParseUtil.getString(firstSipOrRtpLog, JsonProConfig.RTP_PCAP_PATH); - String secondPcapPath = JsonParseUtil.getString(secendSipOrRtpLog, JsonProConfig.RTP_PCAP_PATH); + String firstPcapPath = firstSipOrRtpLog.getString(JsonProConfig.RTP_PCAP_PATH); + String secondPcapPath = secendSipOrRtpLog.getString(JsonProConfig.RTP_PCAP_PATH); if (StringUtil.isNotBlank(firstPcapPath) && StringUtil.isNotBlank(secondPcapPath)) { if (firstPcapPath.equals(secondPcapPath)) { diff --git a/src/main/java/com/zdjizhi/operator/window/VoipCalibrationWindowFunction.java b/src/main/java/com/zdjizhi/operator/window/VoipCalibrationWindowFunction.java index 9929212..946e946 100644 --- a/src/main/java/com/zdjizhi/operator/window/VoipCalibrationWindowFunction.java +++ b/src/main/java/com/zdjizhi/operator/window/VoipCalibrationWindowFunction.java @@ -2,12 +2,10 @@ package com.zdjizhi.operator.window; import cn.hutool.log.Log; import cn.hutool.log.LogFactory; +import com.alibaba.fastjson2.JSONObject; import com.zdjizhi.common.JsonProConfig; import com.zdjizhi.common.VoipRelationConfig; import com.zdjizhi.tools.utils.RelationUtils; -import com.zdjizhi.tools.json.JsonParseUtil; -import com.zdjizhi.utils.JsonMapper; -import com.zdjizhi.utils.StringUtil; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.api.java.tuple.Tuple4; import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction; @@ -22,7 +20,7 @@ import java.util.*; * @Description: * @date 2022/9/617:46 */ -public class VoipCalibrationWindowFunction extends ProcessWindowFunction<Tuple4<String, String, Map<String, Object>, Integer>, String, Tuple2<String, Integer>, TimeWindow> { +public class VoipCalibrationWindowFunction extends ProcessWindowFunction<Tuple4<String, String, JSONObject, Integer>, String, Tuple2<String, Integer>, TimeWindow> { private static final Log logger = LogFactory.get(); /** @@ -32,7 +30,7 @@ public class VoipCalibrationWindowFunction extends ProcessWindowFunction<Tuple4< * 存放数据:rtp-single,rtp-two,sip-two * 不存放的数据:sip-single与sip-in */ - private static HashMap<String, LinkedList<Map<String, Object>>> combineSRHmList = new HashMap<>(32); + private static HashMap<String, LinkedList<JSONObject>> combineSRHmList = new HashMap<>(32); /** * 二次关联用HashMap @@ -41,33 +39,33 @@ public class VoipCalibrationWindowFunction extends ProcessWindowFunction<Tuple4< * 存放数据:rtp-single,rtp-two,sip-two * 不存放的数据:sip-single与sip-in */ - private static HashMap<String, LinkedList<Map<String, Object>>> secCombineSRHmList = new HashMap<>(32); + private static HashMap<String, LinkedList<JSONObject>> secCombineSRHmList = new HashMap<>(32); @Override - public void process(Tuple2<String, Integer> key, Context context, Iterable<Tuple4<String, String, Map<String, Object>, Integer>> input, Collector<String> out) throws Exception { - for (Tuple4<String, String, Map<String, Object>, Integer> tuple : input) { + public void process(Tuple2<String, Integer> key, Context context, Iterable<Tuple4<String, String, JSONObject, Integer>> input, Collector<String> out) { + for (Tuple4<String, String, JSONObject, Integer> tuple : input) { //拼接的四元组 String fourKey = tuple.f0; //已关联的sip,rtp;未关联的sip,rtp;内网的sip String type = tuple.f1; - Map<String, Object> jsonMap = tuple.f2; - logger.error("" + fourKey + "----" + type); + JSONObject jsonMap = tuple.f2; + switch (type) { //单向流对准后的SIP case "sip-double": - //单向流对准后的RTP + //双向流RTP case "rtp-double": + //单向流RTP + case "rtp-single": putKeyAndMsg(jsonMap, fourKey, combineSRHmList); break; //单向流的SIP case "sip-single": - //单向流的RTP - case "rtp-single": //内网的SIP case "sip-in": //违规的日志 case "violation": - out.collect(JsonMapper.toJsonString(jsonMap)); + out.collect(jsonMap.toJSONString()); break; default: logger.error("type is beyond expectation:" + type); @@ -86,14 +84,14 @@ public class VoipCalibrationWindowFunction extends ProcessWindowFunction<Tuple4< /** * 存放key并添加对应List */ - private static void putKeyAndMsg(Map<String, Object> message, String fourStrKey, HashMap<String, LinkedList<Map<String, Object>>> combineSRHmList) { + private static void putKeyAndMsg(JSONObject jsonObject, String fourStrKey, HashMap<String, LinkedList<JSONObject>> combineSRHmList) { if (combineSRHmList.containsKey(fourStrKey)) { - LinkedList<Map<String, Object>> tmpList = combineSRHmList.get(fourStrKey); - tmpList.add(message); + LinkedList<JSONObject> tmpList = combineSRHmList.get(fourStrKey); + tmpList.add(jsonObject); combineSRHmList.put(fourStrKey, tmpList); } else { - LinkedList<Map<String, Object>> tmpList = new LinkedList<>(); - tmpList.add(message); + LinkedList<JSONObject> tmpList = new LinkedList<>(); + tmpList.add(jsonObject); combineSRHmList.put(fourStrKey, tmpList); } } @@ -104,24 +102,24 @@ public class VoipCalibrationWindowFunction extends ProcessWindowFunction<Tuple4< * @param combineHmList */ @SuppressWarnings("unchecked") - private void tickCombineHmList(HashMap<String, LinkedList<Map<String, Object>>> combineHmList, Collector<String> output) { + private void tickCombineHmList(HashMap<String, LinkedList<JSONObject>> combineHmList, Collector<String> output) { if (combineHmList.size() > 0) { long nowTime = System.currentTimeMillis() / 1000; - HashMap<String, LinkedList<Map<String, Object>>> tempCombineSRhmList = new HashMap<>(combineHmList); + HashMap<String, LinkedList<JSONObject>> tempCombineSRhmList = new HashMap<>(combineHmList); combineHmList.clear(); for (String fourStrKey : tempCombineSRhmList.keySet()) { - LinkedList<Map<String, Object>> tempCombineSRList = tempCombineSRhmList.get(fourStrKey); + LinkedList<JSONObject> tempCombineSRList = tempCombineSRhmList.get(fourStrKey); //包含SIP和RTP,集合大于1则可能是sip和rtp,多条sip或多条rtp int listSize = tempCombineSRList.size(); if (listSize > 1) { - List<Map<String, Object>> sipBeanArr = new ArrayList<>(); - List<Map<String, Object>> rtpBeanArr = new ArrayList<>(); + List<JSONObject> sipBeanArr = new ArrayList<>(); + List<JSONObject> rtpBeanArr = new ArrayList<>(); - for (Map<String, Object> tempSipOrRtpLog : tempCombineSRList) { - String schemaType = JsonParseUtil.getString(tempSipOrRtpLog, JsonProConfig.SCHEMA_TYPE); + for (JSONObject tempSipOrRtpLog : tempCombineSRList) { + String schemaType = tempSipOrRtpLog.getString(JsonProConfig.SCHEMA_TYPE); if (JsonProConfig.SIP_MARK.equals(schemaType)) { sipBeanArr.add(tempSipOrRtpLog); } else if (JsonProConfig.RTP_MARK.equals(schemaType)) { @@ -132,19 +130,19 @@ public class VoipCalibrationWindowFunction extends ProcessWindowFunction<Tuple4< int sipSize = sipBeanArr.size(); //只允许一对多的情况,其余视为异常数据 if (rtpSize == 1 && sipSize >= 1) { - for (Map<String, Object> voIpLog : sipBeanArr) { - Map<String, Object> rtpLog = rtpBeanArr.get(0); + for (JSONObject voIpLog : sipBeanArr) { + JSONObject rtpLog = rtpBeanArr.get(0); accumulateVoipMsg(voIpLog, rtpLog); //四元组,voip,关联后的数据 output.collect(mergeJson(voIpLog, rtpLog)); } } else if (sipSize == 1 && rtpSize >= 1) { - for (Map<String, Object> rtpLog : rtpBeanArr) { - HashMap<String, Object> voIpLog = new HashMap<>(128); - voIpLog.putAll(sipBeanArr.get(0)); - accumulateVoipMsg(voIpLog, rtpLog); + for (JSONObject rtpLog : rtpBeanArr) { + JSONObject voipLog = new JSONObject(); + voipLog.putAll(sipBeanArr.get(0)); + accumulateVoipMsg(voipLog, rtpLog); //四元组,voip,关联后的数据 - output.collect(mergeJson(voIpLog, rtpLog)); + output.collect(mergeJson(voipLog, rtpLog)); } } else { logger.error("RTP-listSize is [" + rtpBeanArr.size() + "];SIP-listSize is [" + sipBeanArr.size() + "] in this tempCombineSRHmList! Not logical"); @@ -153,14 +151,14 @@ public class VoipCalibrationWindowFunction extends ProcessWindowFunction<Tuple4< } } else { - Map<String, Object> voIpLog = tempCombineSRList.get(0); - long commonEndTime = JsonParseUtil.getLong(voIpLog, JsonProConfig.END_TIME); + JSONObject voipLog = tempCombineSRList.get(0); + long commonEndTime = voipLog.getLong(JsonProConfig.END_TIME); long intervalTime = nowTime - commonEndTime; if (intervalTime <= VoipRelationConfig.SEC_COMBINE_SR_CACHE_SECS) { logger.warn("当前日志时间未超过二次对准时间,进入队列等待再次对准,日志时间差:" + intervalTime); - putKeyAndMsg(voIpLog, fourStrKey, secCombineSRHmList); + putKeyAndMsg(voipLog, fourStrKey, secCombineSRHmList); } else { - output.collect(JsonMapper.toJsonString(voIpLog)); + output.collect(voipLog.toJSONString()); } } } @@ -173,7 +171,7 @@ public class VoipCalibrationWindowFunction extends ProcessWindowFunction<Tuple4< * @param voIpLog 融合后voip日志 * @param rtpLog RTP日志 */ - private void accumulateVoipMsg(Map<String, Object> voIpLog, Map<String, Object> rtpLog) { + private void accumulateVoipMsg(JSONObject voIpLog, JSONObject rtpLog) { //common_sessions RelationUtils.metricSum(voIpLog, rtpLog, JsonProConfig.SESSIONS); //common_c2s_pkt_num @@ -201,13 +199,13 @@ public class VoipCalibrationWindowFunction extends ProcessWindowFunction<Tuple4< /** * 定时处理中List元素数仅为1的情况 */ - private void sendDirectlyOneElement(String msg, Map<String, Object> voIpLog, Collector<String> output) { + private void sendDirectlyOneElement(String msg, JSONObject voIpLog, Collector<String> output) { //四元组,sip(一定为双侧)/rtp(可能为单侧也可能为双侧,看单向流字段信息),拿出来的原始数据 - String commonSchemaType = JsonParseUtil.getString(voIpLog, JsonProConfig.SCHEMA_TYPE); + String commonSchemaType = voIpLog.getString(JsonProConfig.SCHEMA_TYPE); if (JsonProConfig.SIP_MARK.equals(commonSchemaType)) { output.collect(msg); } else if (JsonProConfig.RTP_MARK.equals(commonSchemaType)) { - int commonStreamDir = JsonParseUtil.getInteger(voIpLog, JsonProConfig.STREAM_DIR); + int commonStreamDir = voIpLog.getInteger(JsonProConfig.STREAM_DIR); if (commonStreamDir != JsonProConfig.DOUBLE) { output.collect(msg); } else { @@ -219,10 +217,10 @@ public class VoipCalibrationWindowFunction extends ProcessWindowFunction<Tuple4< /** * 发送不符合逻辑的日志到kafka */ - private static void sendViolationLogs(List<Map<String, Object>> violationLogs, Collector<String> output) { + private static void sendViolationLogs(List<JSONObject> violationLogs, Collector<String> output) { if (violationLogs.size() > 0) { - for (Map<String, Object> log : violationLogs) { - output.collect(JsonMapper.toJsonString(log)); + for (JSONObject log : violationLogs) { + output.collect(log.toJSONString()); } } } @@ -230,19 +228,19 @@ public class VoipCalibrationWindowFunction extends ProcessWindowFunction<Tuple4< /** * 发送VOIP日志到Kafka */ - private static String mergeJson(Map<String, Object> voIpLog, Map<String, Object> rtpLog) { + private static String mergeJson(JSONObject voIpLog, JSONObject rtpLog) { - int rtpPayloadTypeC2s = JsonParseUtil.getInteger(rtpLog, JsonProConfig.RTP_PAYLOAD_TYPE_C2S); - int rtpPayloadTypeS2c = JsonParseUtil.getInteger(rtpLog, JsonProConfig.RTP_PAYLOAD_TYPE_S2C); - String rtpPcapPath = JsonParseUtil.getString(rtpLog, JsonProConfig.RTP_PCAP_PATH); + int rtpPayloadTypeC2s = rtpLog.getIntValue(JsonProConfig.RTP_PAYLOAD_TYPE_C2S, 0); + int rtpPayloadTypeS2c = rtpLog.getIntValue(JsonProConfig.RTP_PAYLOAD_TYPE_S2C, 0); + String rtpPcapPath = rtpLog.getString(JsonProConfig.RTP_PCAP_PATH); - JsonParseUtil.setValue(voIpLog, JsonProConfig.SCHEMA_TYPE, "VoIP"); - JsonParseUtil.setValue(voIpLog, JsonProConfig.RTP_ORIGINATOR_DIR, RelationUtils.judgeDirection(rtpLog, voIpLog)); - JsonParseUtil.setValue(voIpLog, JsonProConfig.RTP_PAYLOAD_TYPE_C2S, rtpPayloadTypeC2s); - JsonParseUtil.setValue(voIpLog, JsonProConfig.RTP_PAYLOAD_TYPE_S2C, rtpPayloadTypeS2c); - JsonParseUtil.setValue(voIpLog, JsonProConfig.RTP_PCAP_PATH, rtpPcapPath); + voIpLog.put(JsonProConfig.SCHEMA_TYPE, "VoIP"); + voIpLog.put(JsonProConfig.RTP_ORIGINATOR_DIR, RelationUtils.judgeDirection(rtpLog, voIpLog)); + voIpLog.put(JsonProConfig.RTP_PAYLOAD_TYPE_C2S, rtpPayloadTypeC2s); + voIpLog.put(JsonProConfig.RTP_PAYLOAD_TYPE_S2C, rtpPayloadTypeS2c); + voIpLog.put(JsonProConfig.RTP_PCAP_PATH, rtpPcapPath); - return JsonMapper.toJsonString(voIpLog); + return voIpLog.toJSONString(); } } diff --git a/src/main/java/com/zdjizhi/tools/json/JsonParseUtil.java b/src/main/java/com/zdjizhi/tools/json/JsonParseUtil.java deleted file mode 100644 index 4413754..0000000 --- a/src/main/java/com/zdjizhi/tools/json/JsonParseUtil.java +++ /dev/null @@ -1,306 +0,0 @@ -package com.zdjizhi.tools.json; - - -import cn.hutool.log.Log; -import cn.hutool.log.LogFactory; -import com.alibaba.fastjson.JSON; -import com.alibaba.fastjson.JSONArray; -import com.alibaba.fastjson.JSONObject; -import com.alibaba.nacos.api.NacosFactory; -import com.alibaba.nacos.api.PropertyKeyConst; -import com.alibaba.nacos.api.config.listener.Listener; -import com.alibaba.nacos.api.config.ConfigService; - -import com.alibaba.nacos.api.exception.NacosException; -import com.jayway.jsonpath.JsonPath; -import com.zdjizhi.common.VoipRelationConfig; -import com.zdjizhi.utils.JsonMapper; -import com.zdjizhi.utils.StringUtil; -import net.sf.cglib.beans.BeanGenerator; -import net.sf.cglib.beans.BeanMap; - -import java.util.*; -import java.util.concurrent.Executor; - -/** - * 使用FastJson解析json的工具类 - * - * @author qidaijie - */ -public class JsonParseUtil { - - private static final Log logger = LogFactory.get(); - private static Properties propNacos = new Properties(); - - /** - * 获取需要删除字段的列表 - */ - private static ArrayList<String> dropList = new ArrayList<>(); - - - /** - * 在内存中加载反射类用的map - */ - private static HashMap<String, Class> jsonFieldsMap; - - - static { - propNacos.setProperty(PropertyKeyConst.SERVER_ADDR, VoipRelationConfig.NACOS_SERVER); - propNacos.setProperty(PropertyKeyConst.NAMESPACE, VoipRelationConfig.NACOS_SCHEMA_NAMESPACE); - propNacos.setProperty(PropertyKeyConst.USERNAME, VoipRelationConfig.NACOS_USERNAME); - propNacos.setProperty(PropertyKeyConst.PASSWORD, VoipRelationConfig.NACOS_PIN); - try { - ConfigService configService = NacosFactory.createConfigService(propNacos); - String dataId = VoipRelationConfig.NACOS_DATA_ID; - String group = VoipRelationConfig.NACOS_GROUP; - String schema = configService.getConfig(dataId, group, 5000); - if (StringUtil.isNotBlank(schema)) { - jsonFieldsMap = getFieldsFromSchema(schema); - } - configService.addListener(dataId, group, new Listener() { - @Override - public Executor getExecutor() { - return null; - } - - @Override - public void receiveConfigInfo(String configMsg) { - if (StringUtil.isNotBlank(configMsg)) { - jsonFieldsMap = getFieldsFromSchema(configMsg); - } - } - }); - } catch (NacosException e) { - logger.error("Get Schema config from Nacos error,The exception message is :" + e.getMessage()); - } - } - - /** - * 模式匹配,给定一个类型字符串返回一个类类型 - * - * @param type 类型 - * @return 类类型 - */ - - public static Class getClassName(String type) { - Class clazz; - - switch (type) { - case "int": - clazz = Integer.class; - break; - case "string": - clazz = String.class; - break; - case "long": - clazz = long.class; - break; - case "array": - clazz = List.class; - break; - case "double": - clazz = double.class; - break; - case "float": - clazz = float.class; - break; - case "char": - clazz = char.class; - break; - case "byte": - clazz = byte.class; - break; - case "boolean": - clazz = boolean.class; - break; - case "short": - clazz = short.class; - break; - default: - clazz = String.class; - } - return clazz; - } - - /** - * 类型转换 - * - * @param jsonMap 原始日志map - */ - public static Map<String, Object> typeTransform(Map<String, Object> jsonMap) throws RuntimeException { - JsonParseUtil.dropJsonField(jsonMap); - HashMap<String, Object> tmpMap = new HashMap<>(192); - for (String key : jsonMap.keySet()) { - if (jsonFieldsMap.containsKey(key)) { - String simpleName = jsonFieldsMap.get(key).getSimpleName(); - switch (simpleName) { - case "String": - tmpMap.put(key, JsonTypeUtil.checkString(jsonMap.get(key))); - break; - case "Integer": - tmpMap.put(key, JsonTypeUtil.getIntValue(jsonMap.get(key))); - break; - case "long": - tmpMap.put(key, JsonTypeUtil.checkLongValue(jsonMap.get(key))); - break; - case "List": - tmpMap.put(key, JsonTypeUtil.checkArray(jsonMap.get(key))); - break; - case "Map": - tmpMap.put(key, JsonTypeUtil.checkObject(jsonMap.get(key))); - break; - case "double": - tmpMap.put(key, JsonTypeUtil.checkDouble(jsonMap.get(key))); - break; - default: - tmpMap.put(key, JsonTypeUtil.checkString(jsonMap.get(key))); - } - } - } - return tmpMap; - } - - /** - * 获取属性值的方法 - * - * @param jsonMap 原始日志 - * @param property key - * @return 属性的值 - */ - public static Object getValue(Map<String, Object> jsonMap, String property) { - try { - return jsonMap.getOrDefault(property, null); - } catch (RuntimeException e) { - logger.error("获取json-value异常,异常key:" + property + "异常信息为:" + e); - return null; - } - } - - /** - * long 类型检验转换方法,若为空返回基础值 - * - * @return Long value - */ - public static Long getLong(Map<String, Object> jsonMap, String property) { - Object value = jsonMap.getOrDefault(property, null); - Long longVal = TypeUtils.castToLong(value); - - if (longVal == null) { - return 0L; - } - - return longVal; - } - - /** - * int 类型检验转换方法,若为空返回基础值 - * - * @return int value - */ - public static int getInteger(Map<String, Object> jsonMap, String property) { - Object value = jsonMap.getOrDefault(property, null); - Integer intVal = TypeUtils.castToInt(value); - - if (intVal == null) { - return 0; - } - - return intVal; - } - - public static String getString(Map<String, Object> jsonMap, String property) { - Object value = jsonMap.getOrDefault(property, null); - if (value == null) { - return null; - } - - if (value instanceof Map) { - return JsonMapper.toJsonString(value); - } - - if (value instanceof List) { - return JsonMapper.toJsonString(value); - } - - return value.toString(); - } - - /** - * 更新属性值的方法 - * - * @param jsonMap 原始日志json parse - * @param property 更新的key - * @param value 更新的值 - */ - public static void setValue(Map<String, Object> jsonMap, String property, Object value) { - try { - jsonMap.put(property, value); - } catch (RuntimeException e) { - logger.error("赋予实体类错误类型数据", e); - } - } - - /** - * 通过获取String类型的网关schema链接来获取map,用于生成一个Object类型的对象 - * <p> - * // * @param http 网关schema地址 - * - * @return 用于反射生成schema类型的对象的一个map集合 - */ - private static HashMap<String, Class> getFieldsFromSchema(String schema) { - HashMap<String, Class> map = new HashMap<>(16); - - //获取fields,并转化为数组,数组的每个元素都是一个name doc type - JSONObject schemaJson = JSON.parseObject(schema); - JSONArray fields = (JSONArray) schemaJson.get("fields"); - - for (Object field : fields) { - String filedStr = field.toString(); - if (checkKeepField(filedStr)) { - String name = JsonPath.read(filedStr, "$.name").toString(); - String type = JsonPath.read(filedStr, "$.type").toString(); - if (type.contains("{")) { - type = JsonPath.read(filedStr, "$.type.type").toString(); - } - //组合用来生成实体类的map - map.put(name, getClassName(type)); - } else { - dropList.add(filedStr); - } - } - return map; - } - - - /** - * 判断字段是否需要保留 - * - * @param message 单个field-json - * @return true or false - */ - private static boolean checkKeepField(String message) { - boolean isKeepField = true; - boolean isHiveDoc = JSON.parseObject(message).containsKey("doc"); - if (isHiveDoc) { - boolean isHiveVi = JsonPath.read(message, "$.doc").toString().contains("visibility"); - if (isHiveVi) { - String visibility = JsonPath.read(message, "$.doc.visibility").toString(); - if (VoipRelationConfig.VISIBILITY.equals(visibility)) { - isKeepField = false; - } - } - } - return isKeepField; - } - - /** - * 删除schema内指定的无效字段(jackson) - * - * @param jsonMap - */ - private static void dropJsonField(Map<String, Object> jsonMap) { - for (String field : dropList) { - jsonMap.remove(field); - } - } -}
\ No newline at end of file diff --git a/src/main/java/com/zdjizhi/tools/json/JsonTypeUtil.java b/src/main/java/com/zdjizhi/tools/json/JsonTypeUtil.java deleted file mode 100644 index 3e56b84..0000000 --- a/src/main/java/com/zdjizhi/tools/json/JsonTypeUtil.java +++ /dev/null @@ -1,142 +0,0 @@ -package com.zdjizhi.tools.json; - -import cn.hutool.log.Log; -import cn.hutool.log.LogFactory; -import com.zdjizhi.utils.JsonMapper; -import com.zdjizhi.tools.exception.VoipRelationException; - -import java.util.List; -import java.util.Map; - -/** - * @author qidaijie - * @Package PACKAGE_NAME - * @Description: - * @date 2021/7/1217:34 - */ -class JsonTypeUtil { - private static final Log logger = LogFactory.get(); - /** - * String 类型检验转换方法 - * - * @param value json value - * @return String value - */ - static String checkString(Object value) { - if (value == null) { - return null; - } - - if (value instanceof Map) { - return JsonMapper.toJsonString(value); - } - - if (value instanceof List) { - return JsonMapper.toJsonString(value); - } - - return value.toString(); - } - - /** - * array 类型检验转换方法 - * - * @param value json value - * @return List value - */ - static Map checkObject(Object value) { - if (value == null) { - return null; - } - - if (value instanceof Map) { - return (Map) value; - } - - throw new VoipRelationException("can not cast to parse, value : " + value); - } - - /** - * array 类型检验转换方法 - * - * @param value json value - * @return List value - */ - static List checkArray(Object value) { - if (value == null) { - return null; - } - - if (value instanceof List) { - return (List) value; - } - - throw new VoipRelationException("can not cast to List, value : " + value); - } - - private static Long checkLong(Object value) { - if (value == null) { - return null; - } - - return TypeUtils.castToLong(value); - } - - /** - * long 类型检验转换方法,若为空返回基础值 - * - * @param value json value - * @return Long value - */ - static long checkLongValue(Object value) { - - Long longVal = TypeUtils.castToLong(value); - - if (longVal == null) { - return 0L; - } - - return longVal; - } - - /** - * Double 类型校验转换方法 - * - * @param value json value - * @return Double value - */ - static Double checkDouble(Object value) { - if (value == null) { - return null; - } - - return TypeUtils.castToDouble(value); - } - - - private static Integer checkInt(Object value) { - if (value == null) { - return null; - } - - return TypeUtils.castToInt(value); - } - - - /** - * int 类型检验转换方法,若为空返回基础值 - * - * @param value json value - * @return int value - */ - static int getIntValue(Object value) { - - Integer intVal = TypeUtils.castToInt(value); - - if (intVal == null) { - return 0; - } - return intVal; - } - -} diff --git a/src/main/java/com/zdjizhi/tools/json/TypeUtils.java b/src/main/java/com/zdjizhi/tools/json/TypeUtils.java deleted file mode 100644 index c2fb497..0000000 --- a/src/main/java/com/zdjizhi/tools/json/TypeUtils.java +++ /dev/null @@ -1,162 +0,0 @@ -package com.zdjizhi.tools.json; - -import cn.hutool.log.Log; -import cn.hutool.log.LogFactory; -import com.zdjizhi.common.VoipRelationConfig; -import com.zdjizhi.utils.StringUtil; -import com.zdjizhi.tools.exception.VoipRelationException; - - -/** - * @author qidaijie - * @Package PACKAGE_NAME - * @Description: - * @date 2021/7/1218:20 - */ -public class TypeUtils { - private static final Log logger = LogFactory.get(); - - /** - * Integer 类型判断方法 - * - * @param value json value - * @return Integer value or null - */ - public static Object castToIfFunction(Object value) { - if (value == null) { - return null; - } - - if (value instanceof String) { - return value.toString(); - } - - if (value instanceof Integer) { - return ((Number) value).intValue(); - } - - if (value instanceof Long) { - return ((Number) value).longValue(); - } - - if (value instanceof Boolean) { - return (Boolean) value ? 1 : 0; - } - - throw new VoipRelationException("can not cast to int, value : " + value); - } - - /** - * Integer 类型判断方法 - * - * @param value json value - * @return Integer value or null - */ - static Integer castToInt(Object value) { - - if (value == null) { - return null; - } - - if (value instanceof Integer) { - return (Integer) value; - } - - if (value instanceof String) { - String strVal = (String) value; - if (StringUtil.isBlank(strVal)) { - return null; - } - - //将 10,20 类数据转换为10 - if (strVal.contains(VoipRelationConfig.FORMAT_SPLITTER)) { - strVal = strVal.split(VoipRelationConfig.FORMAT_SPLITTER)[0]; - } - - try { - return Integer.parseInt(strVal); - } catch (NumberFormatException ex) { - logger.error("String change Integer Error,The error Str is:" + strVal); - } - } - - if (value instanceof Boolean) { - return (Boolean) value ? 1 : 0; - } - - throw new VoipRelationException("can not cast to int, value : " + value); - } - - /** - * Double类型判断方法 - * - * @param value json value - * @return double value or null - */ - static Double castToDouble(Object value) { - - if (value instanceof Double) { - return (Double) value; - } - - if (value instanceof String) { - String strVal = (String) value; - - if (StringUtil.isBlank(strVal)) { - return null; - } - - //将 10,20 类数据转换为10 - if (strVal.contains(VoipRelationConfig.FORMAT_SPLITTER)) { - strVal = strVal.split(VoipRelationConfig.FORMAT_SPLITTER)[0]; - } - - try { - return Double.parseDouble(strVal); - } catch (NumberFormatException ex) { - logger.error("String change Double Error,The error Str is:" + strVal); - } - } - - throw new VoipRelationException("can not cast to double, value : " + value); - } - - /** - * Long类型判断方法 - * - * @param value json value - * @return (Long)value or null - */ - static Long castToLong(Object value) { - if (value == null) { - return null; - } - -// 此判断数值超范围不抛出异常,会截取成对应类型数值 - if (value instanceof Number) { - return ((Number) value).longValue(); - } - - if (value instanceof String) { - String strVal = (String) value; - - if (StringUtil.isBlank(strVal)) { - return null; - } - - //将 10,20 类数据转换为10 - if (strVal.contains(VoipRelationConfig.FORMAT_SPLITTER)) { - strVal = strVal.split(VoipRelationConfig.FORMAT_SPLITTER)[0]; - } - - try { - return Long.parseLong(strVal); - } catch (NumberFormatException ex) { - logger.error("String change Long Error,The error Str is:" + strVal); - } - } - - throw new VoipRelationException("can not cast to long, value : " + value); - } - -} diff --git a/src/main/java/com/zdjizhi/tools/utils/RelationUtils.java b/src/main/java/com/zdjizhi/tools/utils/RelationUtils.java index 1498fd9..9176a5f 100644 --- a/src/main/java/com/zdjizhi/tools/utils/RelationUtils.java +++ b/src/main/java/com/zdjizhi/tools/utils/RelationUtils.java @@ -1,12 +1,11 @@ package com.zdjizhi.tools.utils; +import com.alibaba.fastjson2.JSONObject; import com.zdjizhi.common.JsonProConfig; import com.zdjizhi.common.VoipRelationConfig; import com.zdjizhi.utils.IPUtil; import com.zdjizhi.utils.StringUtil; -import com.zdjizhi.tools.json.JsonParseUtil; -import java.util.Map; /** * @author qidaijie @@ -24,13 +23,11 @@ public class RelationUtils { * @param secondLog B日志 * @param key 指标 json key */ - public static void metricSum(Map<String, Object> firstLog, Map<String, Object> secondLog, String key) { - Long firstMetric = JsonParseUtil.getLong(firstLog, key); - Long secondMetric = JsonParseUtil.getLong(secondLog, key); + public static void metricSum(JSONObject firstLog, JSONObject secondLog, String key) { + Long firstMetric = firstLog.getLongValue(key, 0L); + Long secondMetric = secondLog.getLongValue(key, 0L); - Long sum = firstMetric + secondMetric; - - JsonParseUtil.setValue(firstLog, key, sum); + firstLog.put(key, (firstMetric + secondMetric)); } @@ -42,13 +39,11 @@ public class RelationUtils { * @param otherLog C日志 * @param key 指标 json key */ - public static void metricSumSetOtherLog(Map<String, Object> firstLog, Map<String, Object> secondLog, Map<String, Object> otherLog, String key) { - Long firstMetric = JsonParseUtil.getLong(firstLog, key); - Long secondMetric = JsonParseUtil.getLong(secondLog, key); - - Long sum = firstMetric + secondMetric; + public static void metricSumSetOtherLog(JSONObject firstLog, JSONObject secondLog, JSONObject otherLog, String key) { + Long firstMetric = firstLog.getLongValue(key, 0L); + Long secondMetric = secondLog.getLongValue(key, 0L); - JsonParseUtil.setValue(otherLog, key, sum); + otherLog.put(key, (firstMetric + secondMetric)); } @@ -58,7 +53,7 @@ public class RelationUtils { * @param sipLog SIP日志 * @return true or false */ - public static boolean checkSipCompleteness(Map<String, Object> sipLog) { + public static boolean checkSipCompleteness(JSONObject sipLog) { return sipLog.containsKey(JsonProConfig.SIP_ORIGINATOR_IP) && sipLog.containsKey(JsonProConfig.SIP_ORIGINATOR_PORT) && sipLog.containsKey(JsonProConfig.SIP_RESPONDER_IP) && @@ -117,11 +112,11 @@ public class RelationUtils { * @param voIpLog 融合后VOIP日志 * @return 方向 0:未知 1:c2s 2:s2c */ - public static int judgeDirection(Map<String, Object> rtpLog, Map<String, Object> voIpLog) { + public static int judgeDirection(JSONObject rtpLog, JSONObject voIpLog) { - String ip = JsonParseUtil.getString(rtpLog, JsonProConfig.CLIENT_IP); - String sipOriginatorIp = JsonParseUtil.getString(voIpLog, JsonProConfig.SIP_ORIGINATOR_IP); - String sipResponderIp = JsonParseUtil.getString(voIpLog, JsonProConfig.SIP_RESPONDER_IP); + String ip = rtpLog.getString(JsonProConfig.CLIENT_IP); + String sipOriginatorIp = voIpLog.getString(JsonProConfig.SIP_ORIGINATOR_IP); + String sipResponderIp = voIpLog.getString(JsonProConfig.SIP_RESPONDER_IP); if (StringUtil.isNotBlank(ip) && ip.equals(sipOriginatorIp)) { return 1; diff --git a/src/main/java/com/zdjizhi/topology/VoIpRelationTopology.java b/src/main/java/com/zdjizhi/topology/VoIpRelationTopology.java index 95fbfc1..8ca7c06 100644 --- a/src/main/java/com/zdjizhi/topology/VoIpRelationTopology.java +++ b/src/main/java/com/zdjizhi/topology/VoIpRelationTopology.java @@ -2,6 +2,7 @@ package com.zdjizhi.topology; import cn.hutool.log.Log; import cn.hutool.log.LogFactory; +import com.alibaba.fastjson2.JSONObject; import com.zdjizhi.common.VoipRelationConfig; import com.zdjizhi.operator.filter.NullFilterFunction; import com.zdjizhi.operator.group.MergeUniFlowKeyByFunction; @@ -18,8 +19,6 @@ import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows; import org.apache.flink.streaming.api.windowing.time.Time; -import java.util.Map; - /** * @author qidaijie @@ -35,8 +34,7 @@ public class VoIpRelationTopology { DataStream<String> streamSource = environment.addSource(KafkaConsumer.getKafkaConsumer()); - - SingleOutputStreamOperator<Tuple4<String, String, Map<String, Object>, Integer>> mergeUniFlowWindow = streamSource.filter(new NullFilterFunction()) + SingleOutputStreamOperator<Tuple4<String, String, JSONObject, Integer>> mergeUniFlowWindow = streamSource.filter(new NullFilterFunction()) .map(new ParseMapFunction()) .keyBy(new MergeUniFlowKeyByFunction()) .window(TumblingProcessingTimeWindows.of(Time.seconds(VoipRelationConfig.ONE_SIDED_WINDOW_TIME))) diff --git a/src/test/java/com/zdjizhi/EncryptorTest.java b/src/test/java/com/zdjizhi/EncryptorTest.java index 170086c..194369d 100644 --- a/src/test/java/com/zdjizhi/EncryptorTest.java +++ b/src/test/java/com/zdjizhi/EncryptorTest.java @@ -32,4 +32,5 @@ public class EncryptorTest { System.out.println("The pin is: "+rawUser); } + } diff --git a/src/test/java/com/zdjizhi/json/JsonPathTest.java b/src/test/java/com/zdjizhi/json/JsonPathTest.java deleted file mode 100644 index 46f6f85..0000000 --- a/src/test/java/com/zdjizhi/json/JsonPathTest.java +++ /dev/null @@ -1,79 +0,0 @@ -package com.zdjizhi.json; - -import cn.hutool.log.Log; -import cn.hutool.log.LogFactory; -import com.alibaba.nacos.api.NacosFactory; -import com.alibaba.nacos.api.PropertyKeyConst; -import com.alibaba.nacos.api.config.ConfigService; -import com.alibaba.nacos.api.config.listener.Listener; -import com.alibaba.nacos.api.exception.NacosException; -import com.jayway.jsonpath.DocumentContext; -import com.jayway.jsonpath.JsonPath; -import com.zdjizhi.common.VoipRelationConfig; -import com.zdjizhi.utils.StringUtil; -import org.junit.Test; - -import java.util.ArrayList; -import java.util.HashMap; -import java.util.List; -import java.util.Properties; -import java.util.concurrent.Executor; - -/** - * @author qidaijie - * @Package com.zdjizhi.json - * @Description: - * @date 2022/3/2410:22 - */ -public class JsonPathTest { - private static final Log logger = LogFactory.get(); - - private static Properties propNacos = new Properties(); - - /** - * 获取需要删除字段的列表 - */ - private static ArrayList<String> dropList = new ArrayList<>(); - - /** - * 在内存中加载反射类用的map - */ - private static HashMap<String, Class> map; - - /** - * 获取任务列表 - * list的每个元素是一个四元字符串数组 (有format标识的字段,补全的字段,用到的功能函数,用到的参数),例如: - * (mail_subject mail_subject decode_of_base64 mail_subject_charset) - */ - private static ArrayList<String[]> jobList; - - private static String schema; - - static { - propNacos.setProperty(PropertyKeyConst.SERVER_ADDR, VoipRelationConfig.NACOS_SERVER); - propNacos.setProperty(PropertyKeyConst.NAMESPACE, VoipRelationConfig.NACOS_SCHEMA_NAMESPACE); - propNacos.setProperty(PropertyKeyConst.USERNAME, VoipRelationConfig.NACOS_USERNAME); - propNacos.setProperty(PropertyKeyConst.PASSWORD, VoipRelationConfig.NACOS_PIN); - try { - ConfigService configService = NacosFactory.createConfigService(propNacos); - String dataId = VoipRelationConfig.NACOS_DATA_ID; - String group = VoipRelationConfig.NACOS_GROUP; - String config = configService.getConfig(dataId, group, 5000); - if (StringUtil.isNotBlank(config)) { - schema = config; - } - } catch (NacosException e) { - logger.error("Get Schema config from Nacos error,The exception message is :" + e.getMessage()); - } - } - - @Test - public void parseSchemaGetFields() { - DocumentContext parse = JsonPath.parse(schema); - List<Object> fields = parse.read("$.fields[*]"); - for (Object field : fields) { - String name = JsonPath.read(field, "$.name").toString(); - String type = JsonPath.read(field, "$.type").toString(); - } - } -} diff --git a/src/test/java/com/zdjizhi/json/JsonTransfromTest.java b/src/test/java/com/zdjizhi/json/JsonTransfromTest.java new file mode 100644 index 0000000..889f41b --- /dev/null +++ b/src/test/java/com/zdjizhi/json/JsonTransfromTest.java @@ -0,0 +1,30 @@ +package com.zdjizhi.json; + +import com.alibaba.fastjson2.JSONObject; +import com.alibaba.fastjson2.JSONReader; +import org.junit.Test; + +/** + * @author qidaijie + * @Package com.zdjizhi.json + * @Description: + * @date 2023/6/717:38 + */ +public class JsonTransfromTest { + + @Test + public void jsonSizeTest() { + String a = "{\"sip_call_id\":\"NjI2NzU4YWZiNTE4YzNkNzAxZGVlMzNhMTMwNGRhMWQ.\",\"sip_originator_description\":\"\\\"1075\\\"<sip:[email protected]>\",\"sip_responder_description\":\"\\\"1078\\\"<sip:[email protected]>\",\"sip_user_agent\":\"eyeBeam release 1011d stamp 40820\",\"sip_originator_sdp_content\":\"v=0\\r\\no=- 8 2 IN IP4 192.50.111.35\\r\\ns=CounterPath eyeBeam 1.5\\r\\nc=IN IP4 192.50.111.35\\r\\nt=0 0\\r\\nm=audio 58593 RTP/AVP 0 8 18 101\\r\\na=alt:1 1 : 78u1b+Nh KPPh5Azo 192.50.111.35 58593\\r\\na=fmtp:18 annexb=no\\r\\na=fmtp:101 0-15\\r\\na=rtpmap:18 G729/8000\\r\\na=rtpmap:101 telephone-event/8000\\r\\na=sendrecv\\r\\na=x-rtp-session-id:2C43D09FE8BD480FB1645410CFEA557A\\r\\n\",\"sip_originator_sdp_connect_ip\":\"192.50.111.35\",\"sip_originator_sdp_media_port\":58593,\"sip_originator_sdp_media_type\":\"18 G729/8000\",\"sip_server\":\"OpenSIPS (2.4.11 (x86_64/linux))\",\"sip_responder_sdp_content\":\"v=0\\r\\no=- 4 2 IN IP4 192.50.194.63\\r\\ns=CounterPath eyeBeam 1.5\\r\\nc=IN IP4 192.50.194.63\\r\\nt=0 0\\r\\nm=audio 35798 RTP/AVP 0 8 18 101\\r\\na=alt:1 1 : W3SaU1GA ukx4xt4M 192.50.194.63 35798\\r\\na=fmtp:18 annexb=no\\r\\na=fmtp:101 0-15\\r\\na=rtpmap:18 G729/8000\\r\\na=rtpmap:101 telephone-event/8000\\r\\na=sendrecv\\r\\na=x-rtp-session-id:76939F3CBC6048849EAC851A4D093C9A\\r\\n\",\"sip_responder_sdp_connect_ip\":\"192.50.194.63\",\"sip_responder_sdp_media_port\":35798,\"sip_responder_sdp_media_type\":\"18 G729/8000\",\"sip_duration_s\":4,\"sip_bye\":\"originator\"}"; + String b = "{\"sip_originator_description\":\"\\\"1075\\\"<sip:[email protected]>\",\"sip_responder_description\":\"\\\"1078\\\"<sip:[email protected]>\",\"sip_user_agent\":\"eyeBeam release 1011d stamp 40820\",\"sip_originator_sdp_content\":\"v=0\\r\\no=- 8 2 IN IP4 192.50.111.35\\r\\ns=CounterPath eyeBeam 1.5\\r\\nc=IN IP4 192.50.111.35\\r\\nt=0 0\\r\\nm=audio 58593 RTP/AVP 0 8 18 101\\r\\na=alt:1 1 : 78u1b+Nh KPPh5Azo 192.50.111.35 58593\\r\\na=fmtp:18 annexb=no\\r\\na=fmtp:101 0-15\\r\\na=rtpmap:18 G729/8000\\r\\na=rtpmap:101 telephone-event/8000\\r\\na=sendrecv\\r\\na=x-rtp-session-id:2C43D09FE8BD480FB1645410CFEA557A\\r\\n\",\"sip_originator_sdp_connect_ip\":\"192.50.111.35\",\"sip_originator_sdp_media_port\":58593,\"sip_originator_sdp_media_type\":\"18 G729/8000\",\"sip_server\":\"OpenSIPS (2.4.11 (x86_64/linux))\",\"sip_responder_sdp_content\":\"\"}"; + + + JSONObject jsonObject1 = JSONObject.parseObject(a); + JSONObject jsonObject2 = JSONObject.parseObject(b); + System.out.println(jsonObject1.size()); + System.out.println(jsonObject2.size()); + + + VoipSip voipSip1 = JSONObject.parseObject(a, VoipSip.class, JSONReader.Feature.IgnoreSetNullValue); + VoipSip voipSip2 = JSONObject.parseObject(b, VoipSip.class, JSONReader.Feature.IgnoreSetNullValue); + } +} diff --git a/src/test/java/com/zdjizhi/json/VoipSip.java b/src/test/java/com/zdjizhi/json/VoipSip.java new file mode 100644 index 0000000..9054e19 --- /dev/null +++ b/src/test/java/com/zdjizhi/json/VoipSip.java @@ -0,0 +1,104 @@ +package com.zdjizhi.json; + +/** + * @author qidaijie + * @Package com.zdjizhi.json + * @Description: + * @date 2023/6/717:48 + */ +public class VoipSip { + private String sip_call_id; + private String sip_originator_description; + private String sip_responder_description; + private String sip_user_agent; + private String sip_server; + private String sip_originator_sdp_connect_ip; + private int sip_originator_sdp_media_port; + private String sip_responder_sdp_connect_ip; + private int sip_responder_sdp_media_port; + + public VoipSip(String sip_call_id, String sip_originator_description, String sip_responder_description, String sip_user_agent, String sip_server, String sip_originator_sdp_connect_ip, int sip_originator_sdp_media_port, String sip_responder_sdp_connect_ip, int sip_responder_sdp_media_port) { + this.sip_call_id = sip_call_id; + this.sip_originator_description = sip_originator_description; + this.sip_responder_description = sip_responder_description; + this.sip_user_agent = sip_user_agent; + this.sip_server = sip_server; + this.sip_originator_sdp_connect_ip = sip_originator_sdp_connect_ip; + this.sip_originator_sdp_media_port = sip_originator_sdp_media_port; + this.sip_responder_sdp_connect_ip = sip_responder_sdp_connect_ip; + this.sip_responder_sdp_media_port = sip_responder_sdp_media_port; + } + + public String getSip_call_id() { + return sip_call_id; + } + + public void setSip_call_id(String sip_call_id) { + this.sip_call_id = sip_call_id; + } + + public String getSip_originator_description() { + return sip_originator_description; + } + + public void setSip_originator_description(String sip_originator_description) { + this.sip_originator_description = sip_originator_description; + } + + public String getSip_responder_description() { + return sip_responder_description; + } + + public void setSip_responder_description(String sip_responder_description) { + this.sip_responder_description = sip_responder_description; + } + + public String getSip_user_agent() { + return sip_user_agent; + } + + public void setSip_user_agent(String sip_user_agent) { + this.sip_user_agent = sip_user_agent; + } + + public String getSip_server() { + return sip_server; + } + + public void setSip_server(String sip_server) { + this.sip_server = sip_server; + } + + public String getSip_originator_sdp_connect_ip() { + return sip_originator_sdp_connect_ip; + } + + public void setSip_originator_sdp_connect_ip(String sip_originator_sdp_connect_ip) { + this.sip_originator_sdp_connect_ip = sip_originator_sdp_connect_ip; + } + + public int getSip_originator_sdp_media_port() { + return sip_originator_sdp_media_port; + } + + public void setSip_originator_sdp_media_port(int sip_originator_sdp_media_port) { + this.sip_originator_sdp_media_port = sip_originator_sdp_media_port; + } + + public String getSip_responder_sdp_connect_ip() { + return sip_responder_sdp_connect_ip; + } + + public void setSip_responder_sdp_connect_ip(String sip_responder_sdp_connect_ip) { + this.sip_responder_sdp_connect_ip = sip_responder_sdp_connect_ip; + } + + public int getSip_responder_sdp_media_port() { + return sip_responder_sdp_media_port; + } + + public void setSip_responder_sdp_media_port(int sip_responder_sdp_media_port) { + this.sip_responder_sdp_media_port = sip_responder_sdp_media_port; + } + +} diff --git a/src/test/java/com/zdjizhi/nacos/NacosTest.java b/src/test/java/com/zdjizhi/nacos/NacosTest.java deleted file mode 100644 index 52b99e5..0000000 --- a/src/test/java/com/zdjizhi/nacos/NacosTest.java +++ /dev/null @@ -1,100 +0,0 @@ -package com.zdjizhi.nacos; - -import com.alibaba.nacos.api.NacosFactory; -import com.alibaba.nacos.api.PropertyKeyConst; -import com.alibaba.nacos.api.config.ConfigService; -import com.alibaba.nacos.api.config.listener.Listener; -import com.alibaba.nacos.api.exception.NacosException; -import org.junit.Test; - -import java.io.IOException; -import java.io.StringReader; -import java.util.Properties; -import java.util.concurrent.Executor; - - -/** - * @author qidaijie - * @Package com.zdjizhi - * @Description: - * @date 2022/3/1016:58 - */ -public class NacosTest { - - /** - * <dependency> - * <groupId>com.alibaba.nacos</groupId> - * <artifactId>nacos-client</artifactId> - * <version>1.2.0</version> - * </dependency> - */ - - private static Properties properties = new Properties(); - /** - * config data id = config name - */ - private static final String DATA_ID = "test"; - /** - * config group - */ - private static final String GROUP = "Galaxy"; - - private void getProperties() { - properties.setProperty(PropertyKeyConst.SERVER_ADDR, "192.168.44.12:8848"); - properties.setProperty(PropertyKeyConst.NAMESPACE, "flink"); - properties.setProperty(PropertyKeyConst.USERNAME, "nacos"); - properties.setProperty(PropertyKeyConst.PASSWORD, "nacos"); - } - - - @Test - public void GetConfigurationTest() { - try { - getProperties(); - ConfigService configService = NacosFactory.createConfigService(properties); - String content = configService.getConfig(DATA_ID, GROUP, 5000); - Properties nacosConfigMap = new Properties(); - nacosConfigMap.load(new StringReader(content)); - System.out.println(nacosConfigMap.getProperty("source.kafka.servers")); - } catch (NacosException | IOException e) { - // TODO Auto-generated catch block - e.printStackTrace(); - } - - } - - @Test - public void ListenerConfigurationTest() { - getProperties(); - try { - //first get config - ConfigService configService = NacosFactory.createConfigService(properties); - String config = configService.getConfig(DATA_ID, GROUP, 5000); - System.out.println(config); - - //start listenner - configService.addListener(DATA_ID, GROUP, new Listener() { - @Override - public Executor getExecutor() { - return null; - } - - @Override - public void receiveConfigInfo(String configMsg) { - System.out.println(configMsg); - } - }); - } catch (NacosException e) { - e.printStackTrace(); - } - - //keep running,change nacos config,print new config - while (true) { - try { - Thread.sleep(5000); - } catch (InterruptedException e) { - e.printStackTrace(); - } - } - } -} diff --git a/src/test/testdata/oneRTPtwoSIP b/src/test/testdata/一条RTP和两条SIP index 994a4bd..994a4bd 100644 --- a/src/test/testdata/oneRTPtwoSIP +++ b/src/test/testdata/一条RTP和两条SIP diff --git a/src/test/testdata/oneRTPUniFlowSIP b/src/test/testdata/一条RTP和两条单向流SIP index 8aa4f24..8aa4f24 100644 --- a/src/test/testdata/oneRTPUniFlowSIP +++ b/src/test/testdata/一条RTP和两条单向流SIP diff --git a/src/test/testdata/oneSIPtwoRTP b/src/test/testdata/一条SIP两条RTP index fea59fd..fea59fd 100644 --- a/src/test/testdata/oneSIPtwoRTP +++ b/src/test/testdata/一条SIP两条RTP diff --git a/src/test/testdata/oneSIPUniFlowRTP b/src/test/testdata/一条SIP和两条单向流RTP index c2422a7..c2422a7 100644 --- a/src/test/testdata/oneSIPUniFlowRTP +++ b/src/test/testdata/一条SIP和两条单向流RTP diff --git a/src/test/testdata/differentVSYS b/src/test/testdata/两条VSYS不同的RTP和一条SIP index 75ff819..75ff819 100644 --- a/src/test/testdata/differentVSYS +++ b/src/test/testdata/两条VSYS不同的RTP和一条SIP diff --git a/src/test/testdata/twoRTPandSIP b/src/test/testdata/各两条SIP和RTP index 7d05757..7d05757 100644 --- a/src/test/testdata/twoRTPandSIP +++ b/src/test/testdata/各两条SIP和RTP diff --git a/src/test/testdata/differentVSYSUniFlowRTP b/src/test/testdata/相同VSYS但RTP为单向流 index 8d3f06f..8d3f06f 100644 --- a/src/test/testdata/differentVSYSUniFlowRTP +++ b/src/test/testdata/相同VSYS但RTP为单向流 |
