summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--pom.xml13
-rw-r--r--properties/default_config.properties9
-rw-r--r--properties/service_flow_config.properties19
-rw-r--r--src/main/java/com/zdjizhi/common/VoipRelationConfig.java14
-rw-r--r--src/main/java/com/zdjizhi/operator/group/MergeUniFlowKeyByFunction.java8
-rw-r--r--src/main/java/com/zdjizhi/operator/group/VoipCalibrationKeyByFunction.java11
-rw-r--r--src/main/java/com/zdjizhi/operator/parse/ParseMapFunction.java64
-rw-r--r--src/main/java/com/zdjizhi/operator/window/MergeUniFlowWindowFunction.java99
-rw-r--r--src/main/java/com/zdjizhi/operator/window/VoipCalibrationWindowFunction.java106
-rw-r--r--src/main/java/com/zdjizhi/tools/json/JsonParseUtil.java306
-rw-r--r--src/main/java/com/zdjizhi/tools/json/JsonTypeUtil.java142
-rw-r--r--src/main/java/com/zdjizhi/tools/json/TypeUtils.java162
-rw-r--r--src/main/java/com/zdjizhi/tools/utils/RelationUtils.java33
-rw-r--r--src/main/java/com/zdjizhi/topology/VoIpRelationTopology.java6
-rw-r--r--src/test/java/com/zdjizhi/EncryptorTest.java1
-rw-r--r--src/test/java/com/zdjizhi/json/JsonPathTest.java79
-rw-r--r--src/test/java/com/zdjizhi/json/JsonTransfromTest.java30
-rw-r--r--src/test/java/com/zdjizhi/json/VoipSip.java104
-rw-r--r--src/test/java/com/zdjizhi/nacos/NacosTest.java100
-rw-r--r--src/test/testdata/一条RTP和两条SIP (renamed from src/test/testdata/oneRTPtwoSIP)0
-rw-r--r--src/test/testdata/一条RTP和两条单向流SIP (renamed from src/test/testdata/oneRTPUniFlowSIP)0
-rw-r--r--src/test/testdata/一条SIP两条RTP (renamed from src/test/testdata/oneSIPtwoRTP)0
-rw-r--r--src/test/testdata/一条SIP和两条单向流RTP (renamed from src/test/testdata/oneSIPUniFlowRTP)0
-rw-r--r--src/test/testdata/两条VSYS不同的RTP和一条SIP (renamed from src/test/testdata/differentVSYS)0
-rw-r--r--src/test/testdata/各两条SIP和RTP (renamed from src/test/testdata/twoRTPandSIP)0
-rw-r--r--src/test/testdata/相同VSYS但RTP为单向流 (renamed from src/test/testdata/differentVSYSUniFlowRTP)0
26 files changed, 295 insertions, 1011 deletions
diff --git a/pom.xml b/pom.xml
index e5435ec..9b4493e 100644
--- a/pom.xml
+++ b/pom.xml
@@ -6,7 +6,7 @@
<groupId>com.zdjizhi</groupId>
<artifactId>log-stream-voip-relation</artifactId>
- <version>220914-VSYS</version>
+ <version>230607-FastJson2</version>
<name>log-stream-voip-relation</name>
<url>http://www.example.com</url>
@@ -16,7 +16,7 @@
<repository>
<id>nexus</id>
<name>Team Nexus Repository</name>
- <url>http://192.168.40.125:8099/content/groups/public</url>
+ <url>http://192.168.40.153:8099/content/groups/public</url>
</repository>
<repository>
@@ -39,8 +39,9 @@
<kafka.version>1.0.0</kafka.version>
<hbase.version>2.2.3</hbase.version>
<nacos.version>1.2.0</nacos.version>
- <zdjz.tools.version>1.0.8</zdjz.tools.version>
+ <zdjz.tools.version>1.1.3</zdjz.tools.version>
<scope.type>provided</scope.type>
+ <fastjson.version>2.0.32</fastjson.version>
<!--<scope.type>compile</scope.type>-->
</properties>
@@ -132,6 +133,12 @@
</exclusions>
</dependency>
+ <dependency>
+ <groupId>com.alibaba</groupId>
+ <artifactId>fastjson</artifactId>
+ <version>${fastjson.version}</version>
+ </dependency>
+
<!-- https://mvnrepository.com/artifact/org.apache.flink/flink-table -->
<dependency>
<groupId>org.apache.flink</groupId>
diff --git a/properties/default_config.properties b/properties/default_config.properties
index c750148..c562cad 100644
--- a/properties/default_config.properties
+++ b/properties/default_config.properties
@@ -39,15 +39,6 @@ kafka.user=nsyGpHKGFA4KW0zro9MDdw==
#kafka SASL及SSL验证密码-加密
kafka.pin=6MleDyA3Z73HSaXiKsDJ2k7Ys8YWLhEJ
-#====================nacos default====================#
-#nacos username
-nacos.username=nacos
-
-#nacos password
-nacos.pin=nacos
-
-#nacos group
-nacos.group=Galaxy
diff --git a/properties/service_flow_config.properties b/properties/service_flow_config.properties
index 9cd2e94..7f80b1c 100644
--- a/properties/service_flow_config.properties
+++ b/properties/service_flow_config.properties
@@ -1,5 +1,4 @@
#--------------------------------地址配置------------------------------#
-
#管理kafka地址
source.kafka.servers=192.168.44.12:9094
@@ -10,25 +9,15 @@ sink.kafka.servers=192.168.44.12:9094
#定位库地址
tools.library=D:\\workerspace\\dat\\
-#--------------------------------nacos配置------------------------------#
-#nacos 地址
-nacos.server=192.168.44.12:8848
-
-#nacos namespace
-nacos.schema.namespace=test
-
-#nacos data id
-nacos.data.id=voip_record.json
-
#--------------------------------Kafka消费组信息------------------------------#
#kafka 接收数据topic
-source.kafka.topic=test
+source.kafka.topic=etl-test
#补全数据 输出 topic
-sink.kafka.topic=test-result
+sink.kafka.topic=etl-test-result
#读取topic,存储该spout id的消费offset信息,可通过该拓扑命名;具体存储offset的位置,确定下次读取不重复的数据;
-group.id=mytest-1
+group.id=voip-test-230613-1
#--------------------------------topology配置------------------------------#
#map函数并行度
@@ -44,7 +33,7 @@ one.sided.window.time=10
voip.calibration.window.time=30
#voip二次对准时间 seconds
-sec.combine.sr.cache.secs=60
+sec.combine.sr.cache.secs=10
#check ip is Inner network;0 off, 1 on.
check.inner.network=0 \ No newline at end of file
diff --git a/src/main/java/com/zdjizhi/common/VoipRelationConfig.java b/src/main/java/com/zdjizhi/common/VoipRelationConfig.java
index 758af0f..0e59021 100644
--- a/src/main/java/com/zdjizhi/common/VoipRelationConfig.java
+++ b/src/main/java/com/zdjizhi/common/VoipRelationConfig.java
@@ -20,20 +20,6 @@ public class VoipRelationConfig {
*/
public static final String CORRELATION_STR = "_";
- public static final String VISIBILITY = "disabled";
- public static final String FORMAT_SPLITTER = ",";
-
-
- /**
- * Nacos
- */
- public static final String NACOS_SERVER = VoipRelationConfigurations.getStringProperty(0, "nacos.server");
- public static final String NACOS_SCHEMA_NAMESPACE = VoipRelationConfigurations.getStringProperty(0, "nacos.schema.namespace");
- public static final String NACOS_DATA_ID = VoipRelationConfigurations.getStringProperty(0, "nacos.data.id");
- public static final String NACOS_PIN = VoipRelationConfigurations.getStringProperty(1, "nacos.pin");
- public static final String NACOS_GROUP = VoipRelationConfigurations.getStringProperty(1, "nacos.group");
- public static final String NACOS_USERNAME = VoipRelationConfigurations.getStringProperty(1, "nacos.username");
-
/**
* System
*/
diff --git a/src/main/java/com/zdjizhi/operator/group/MergeUniFlowKeyByFunction.java b/src/main/java/com/zdjizhi/operator/group/MergeUniFlowKeyByFunction.java
index 351bee7..27affa9 100644
--- a/src/main/java/com/zdjizhi/operator/group/MergeUniFlowKeyByFunction.java
+++ b/src/main/java/com/zdjizhi/operator/group/MergeUniFlowKeyByFunction.java
@@ -1,11 +1,9 @@
package com.zdjizhi.operator.group;
-import cn.hutool.log.Log;
-import cn.hutool.log.LogFactory;
+import com.alibaba.fastjson2.JSONObject;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple3;
-import java.util.Map;
/**
* @author qidaijie
@@ -13,9 +11,9 @@ import java.util.Map;
* @Description:
* @date 2022/8/2911:08
*/
-public class MergeUniFlowKeyByFunction implements KeySelector<Tuple3<String, Map<String, Object>, Integer>, Integer> {
+public class MergeUniFlowKeyByFunction implements KeySelector<Tuple3<String, JSONObject, Integer>, Integer> {
@Override
- public Integer getKey(Tuple3<String, Map<String, Object>, Integer> value) throws Exception {
+ public Integer getKey(Tuple3<String, JSONObject, Integer> value) throws Exception {
//vsys id
return value.f2;
}
diff --git a/src/main/java/com/zdjizhi/operator/group/VoipCalibrationKeyByFunction.java b/src/main/java/com/zdjizhi/operator/group/VoipCalibrationKeyByFunction.java
index a184566..063ac0a 100644
--- a/src/main/java/com/zdjizhi/operator/group/VoipCalibrationKeyByFunction.java
+++ b/src/main/java/com/zdjizhi/operator/group/VoipCalibrationKeyByFunction.java
@@ -1,24 +1,19 @@
package com.zdjizhi.operator.group;
-import cn.hutool.log.Log;
-import cn.hutool.log.LogFactory;
-import com.jayway.jsonpath.DocumentContext;
-import com.jayway.jsonpath.JsonPath;
+import com.alibaba.fastjson2.JSONObject;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple4;
-import java.util.Map;
-
/**
* @author qidaijie
* @Package com.zdjizhi.operator.group
* @Description:
* @date 2022/8/2911:08
*/
-public class VoipCalibrationKeyByFunction implements KeySelector<Tuple4<String, String, Map<String, Object>, Integer>, Tuple2<String, Integer>> {
+public class VoipCalibrationKeyByFunction implements KeySelector<Tuple4<String, String, JSONObject, Integer>, Tuple2<String, Integer>> {
@Override
- public Tuple2<String, Integer> getKey(Tuple4<String, String, Map<String, Object>, Integer> value) throws Exception {
+ public Tuple2<String, Integer> getKey(Tuple4<String, String, JSONObject, Integer> value) throws Exception {
String fourKey = value.f0;
Integer vsysId = value.f3;
diff --git a/src/main/java/com/zdjizhi/operator/parse/ParseMapFunction.java b/src/main/java/com/zdjizhi/operator/parse/ParseMapFunction.java
index 3d7ca4b..9fc6981 100644
--- a/src/main/java/com/zdjizhi/operator/parse/ParseMapFunction.java
+++ b/src/main/java/com/zdjizhi/operator/parse/ParseMapFunction.java
@@ -2,15 +2,13 @@ package com.zdjizhi.operator.parse;
import cn.hutool.log.Log;
import cn.hutool.log.LogFactory;
+import com.alibaba.fastjson2.JSONObject;
import com.zdjizhi.common.JsonProConfig;
import com.zdjizhi.tools.utils.RelationUtils;
-import com.zdjizhi.tools.json.JsonParseUtil;
-import com.zdjizhi.utils.JsonMapper;
import com.zdjizhi.utils.StringUtil;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.tuple.Tuple3;
-import java.util.Map;
/**
* @author qidaijie
@@ -18,42 +16,44 @@ import java.util.Map;
* @Description:
* @date 2022/9/916:21
*/
-public class ParseMapFunction implements MapFunction<String, Tuple3<String, Map<String, Object>, Integer>> {
+public class ParseMapFunction implements MapFunction<String, Tuple3<String, JSONObject, Integer>> {
private static final Log logger = LogFactory.get();
@Override
- public Tuple3<String, Map<String, Object>, Integer> map(String input) throws Exception {
+ public Tuple3<String, JSONObject, Integer> map(String input) throws Exception {
try {
if (StringUtil.isNotBlank(input)) {
- Map<String, Object> jsonMap = JsonParseUtil.typeTransform((Map<String, Object>) JsonMapper.fromJsonString(input, Map.class));
+ JSONObject jsonObject = JSONObject.parseObject(input);
- String commonSchemaType = JsonParseUtil.getString(jsonMap, JsonProConfig.SCHEMA_TYPE);
- Integer vsysId = getVsysId(jsonMap);
- String sipCallId = JsonParseUtil.getString(jsonMap, JsonProConfig.SIP_CALL_ID);
+ String commonSchemaType = jsonObject.getString(JsonProConfig.SCHEMA_TYPE);
+ Integer vsysId = getVsysId(jsonObject);
+ String sipCallId = jsonObject.getString(JsonProConfig.SIP_CALL_ID);
//1:c2s,2:s2c;3;double
- int commonStreamDir = JsonParseUtil.getInteger(jsonMap, JsonProConfig.STREAM_DIR);
+ int commonStreamDir = jsonObject.getInteger(JsonProConfig.STREAM_DIR);
- if (JsonProConfig.SIP_MARK.equals(commonSchemaType)) {
- if (StringUtil.isNotBlank(sipCallId)) {
- if (RelationUtils.checkSipCompleteness(jsonMap)) {
- if (commonStreamDir != JsonProConfig.DOUBLE) {
- return new Tuple3<>("sip-single", jsonMap, vsysId);
+ switch (commonSchemaType) {
+ case JsonProConfig.SIP_MARK:
+ if (StringUtil.isNotBlank(sipCallId)) {
+ if (RelationUtils.checkSipCompleteness(jsonObject)) {
+ if (commonStreamDir != JsonProConfig.DOUBLE) {
+ return new Tuple3<>("sip-single", jsonObject, vsysId);
+ } else {
+ return new Tuple3<>("sip-double", jsonObject, vsysId);
+ }
} else {
- return new Tuple3<>("sip-double", jsonMap, vsysId);
+ return new Tuple3<>("violation", jsonObject, vsysId);
}
} else {
- return new Tuple3<>("violation", jsonMap, vsysId);
+ return new Tuple3<>("violation", jsonObject, vsysId);
}
- } else {
- return new Tuple3<>("violation", jsonMap, vsysId);
- }
- } else if (JsonProConfig.RTP_MARK.equals(commonSchemaType)) {
- if (commonStreamDir == JsonProConfig.DOUBLE) {
- return new Tuple3<>("rtp-double", jsonMap, vsysId);
- } else {
- return new Tuple3<>("rtp-single", jsonMap, vsysId);
- }
+ case JsonProConfig.RTP_MARK:
+ if (commonStreamDir == JsonProConfig.DOUBLE) {
+ return new Tuple3<>("rtp-double", jsonObject, vsysId);
+ } else {
+ return new Tuple3<>("rtp-single", jsonObject, vsysId);
+ }
+ default:
}
}
} catch (RuntimeException e) {
@@ -65,17 +65,11 @@ public class ParseMapFunction implements MapFunction<String, Tuple3<String, Map<
/**
* 获取VSYS ID若无该字段,则默认值为1
*
- * @param jsonMap 原始日志
+ * @param jsonObject 原始日志
* @return vsysid
*/
- private static int getVsysId(Map<String, Object> jsonMap) {
- Object value = jsonMap.getOrDefault(JsonProConfig.VSYS_ID, null);
- if (value != null) {
- return Integer.parseInt(value.toString());
- } else {
- return 1;
- }
-
+ private static int getVsysId(JSONObject jsonObject) {
+ return jsonObject.getIntValue(JsonProConfig.VSYS_ID, 1);
}
diff --git a/src/main/java/com/zdjizhi/operator/window/MergeUniFlowWindowFunction.java b/src/main/java/com/zdjizhi/operator/window/MergeUniFlowWindowFunction.java
index cf25763..dba60de 100644
--- a/src/main/java/com/zdjizhi/operator/window/MergeUniFlowWindowFunction.java
+++ b/src/main/java/com/zdjizhi/operator/window/MergeUniFlowWindowFunction.java
@@ -2,22 +2,19 @@ package com.zdjizhi.operator.window;
import cn.hutool.log.Log;
import cn.hutool.log.LogFactory;
+import com.alibaba.fastjson2.JSONObject;
import com.zdjizhi.common.JsonProConfig;
import com.zdjizhi.common.VoipRelationConfig;
import com.zdjizhi.utils.IPUtil;
-import com.zdjizhi.utils.JsonMapper;
import com.zdjizhi.utils.StringUtil;
import com.zdjizhi.tools.utils.RelationUtils;
-import com.zdjizhi.tools.json.JsonParseUtil;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.api.java.tuple.Tuple4;
import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;
-import scala.Int;
import java.util.HashMap;
-import java.util.Map;
/**
* @author qidaijie
@@ -25,38 +22,38 @@ import java.util.Map;
* @Description:
* @date 2022/8/2911:44
*/
-public class MergeUniFlowWindowFunction extends ProcessWindowFunction<Tuple3<String, Map<String, Object>, Integer>, Tuple4<String, String, Map<String, Object>, Integer>, Integer, TimeWindow> {
+public class MergeUniFlowWindowFunction extends ProcessWindowFunction<Tuple3<String, JSONObject, Integer>, Tuple4<String, String, JSONObject, Integer>, Integer, TimeWindow> {
private static final Log logger = LogFactory.get();
/**
* key-sip_call_id;value为sip的具体数据---存放的是SIP未关联的数据(单向流)
*/
- private static HashMap<String, Map<String, Object>> sipOriHmList = new HashMap<>(32);
+ private static HashMap<String, JSONObject> sipOriHmList = new HashMap<>(32);
/**
* key-rtp拼接的四元组;value为rtp的具体数据---存放的是RTP未关联的数据(单向流)
*/
- private static HashMap<String, Map<String, Object>> rtpOriHmList = new HashMap<>(32);
+ private static HashMap<String, JSONObject> rtpOriHmList = new HashMap<>(32);
@Override
- public void process(Integer key, Context context, Iterable<Tuple3<String, Map<String, Object>, Integer>> inputs, Collector<Tuple4<String, String, Map<String, Object>, Integer>> out) throws Exception {
- for (Tuple3<String, Map<String, Object>, Integer> input : inputs) {
+ public void process(Integer key, Context context, Iterable<Tuple3<String, JSONObject, Integer>> inputs, Collector<Tuple4<String, String, JSONObject, Integer>> out) {
+ for (Tuple3<String, JSONObject, Integer> input : inputs) {
if (input != null) {
//已关联的sip,rtp;未关联的sip,rtp;内网的sip
String type = input.f0;
- Map<String, Object> jsonMap = input.f1;
+ JSONObject jsonMap = input.f1;
Integer vsysId = input.f2;
- logger.error("" + type + "----" + jsonMap.toString());
switch (type) {
case "sip-double":
separateInnerIp(jsonMap, out, vsysId);
break;
case "rtp-double":
- String rtpDouble4Key = getFourKey(JsonParseUtil.getString(jsonMap, JsonProConfig.CLIENT_IP),
- JsonParseUtil.getInteger(jsonMap, JsonProConfig.CLIENT_PORT),
- JsonParseUtil.getString(jsonMap, JsonProConfig.SERVER_IP),
- JsonParseUtil.getInteger(jsonMap, JsonProConfig.SERVER_PORT));
+ case "rtp-single":
+ String rtpDouble4Key = getFourKey(jsonMap.getString(JsonProConfig.CLIENT_IP),
+ jsonMap.getInteger(JsonProConfig.CLIENT_PORT),
+ jsonMap.getString(JsonProConfig.SERVER_IP),
+ jsonMap.getInteger(JsonProConfig.SERVER_PORT));
out.collect(new Tuple4<>(rtpDouble4Key, type, jsonMap, vsysId));
break;
@@ -65,27 +62,17 @@ public class MergeUniFlowWindowFunction extends ProcessWindowFunction<Tuple3<Str
break;
//单向流的SIP
case "sip-single":
- String sipCallId = JsonParseUtil.getString(jsonMap, JsonProConfig.SIP_CALL_ID);
- String sipSingle4Key = getFourKey(JsonParseUtil.getString(jsonMap, JsonProConfig.CLIENT_IP),
- JsonParseUtil.getInteger(jsonMap, JsonProConfig.CLIENT_PORT),
- JsonParseUtil.getString(jsonMap, JsonProConfig.SERVER_IP),
- JsonParseUtil.getInteger(jsonMap, JsonProConfig.SERVER_PORT));
+ String sipCallId = jsonMap.getString(JsonProConfig.SIP_CALL_ID);
+ String sipSingle4Key = getFourKey(jsonMap.getString(JsonProConfig.CLIENT_IP),
+ jsonMap.getInteger(JsonProConfig.CLIENT_PORT),
+ jsonMap.getString(JsonProConfig.SERVER_IP),
+ jsonMap.getInteger(JsonProConfig.SERVER_PORT));
if (StringUtil.isNotBlank(sipCallId)) {
putKeyAndMsg(jsonMap, sipCallId + sipSingle4Key, sipOriHmList, "SIP", vsysId, out);
} else {
out.collect(new Tuple4<>("", type, jsonMap, vsysId));
}
break;
- //单向流的RTP
- case "rtp-single":
- String rtpSingle4Key = getFourKey(JsonParseUtil.getString(jsonMap, JsonProConfig.CLIENT_IP),
- JsonParseUtil.getInteger(jsonMap, JsonProConfig.CLIENT_PORT),
- JsonParseUtil.getString(jsonMap, JsonProConfig.SERVER_IP),
- JsonParseUtil.getInteger(jsonMap, JsonProConfig.SERVER_PORT));
- //对rtp单向流进行关联
- putKeyAndMsg(jsonMap, rtpSingle4Key, rtpOriHmList, "RTP", vsysId, out);
- break;
- //内网的SIP
default:
logger.error("type is beyond expectation:" + type);
break;
@@ -95,20 +82,20 @@ public class MergeUniFlowWindowFunction extends ProcessWindowFunction<Tuple3<Str
}
if (sipOriHmList.size() > 0) {
- HashMap<String, Map<String, Object>> tmpSipOriHmList = new HashMap<>(sipOriHmList);
+ HashMap<String, JSONObject> tmpSipOriHmList = new HashMap<>(sipOriHmList);
sipOriHmList.clear();
for (String sipKey : tmpSipOriHmList.keySet()) {
- Map<String, Object> sipSingleMsg = tmpSipOriHmList.get(sipKey);
+ JSONObject sipSingleMsg = tmpSipOriHmList.get(sipKey);
//sipKey为sip_call_id,未关联成功的sip是不能使用的
out.collect(new Tuple4<>(sipKey, "sip-single", sipSingleMsg, 0));
}
}
if (rtpOriHmList.size() > 0) {
- HashMap<String, Map<String, Object>> tmpRtpOriHmList = new HashMap<>(rtpOriHmList);
+ HashMap<String, JSONObject> tmpRtpOriHmList = new HashMap<>(rtpOriHmList);
rtpOriHmList.clear();
for (String rtpKey : tmpRtpOriHmList.keySet()) {
- Map<String, Object> rtpSingleMsg = tmpRtpOriHmList.get(rtpKey);
+ JSONObject rtpSingleMsg = tmpRtpOriHmList.get(rtpKey);
//未关联成功的rtp还可以继续关联,因为有四元组
out.collect(new Tuple4<>(rtpKey, "rtp-single", rtpSingleMsg, 0));
}
@@ -120,32 +107,32 @@ public class MergeUniFlowWindowFunction extends ProcessWindowFunction<Tuple3<Str
* 存放key并关联拼接对应Key
*/
@SuppressWarnings("unchecked")
- private static void putKeyAndMsg(Map<String, Object> secondSipOrRtpLog, String hmStrKey, HashMap<String, Map<String, Object>> hashMapStr, String protocolType, Integer vsysId, Collector<Tuple4<String, String, Map<String, Object>, Integer>> out) {
+ private static void putKeyAndMsg(JSONObject secondSipOrRtpLog, String hmStrKey, HashMap<String, JSONObject> hashMapStr, String protocolType, Integer vsysId, Collector<Tuple4<String, String, JSONObject, Integer>> out) {
//和上次存入的数据关联
if (hashMapStr.containsKey(hmStrKey)) {
- HashMap<String, Object> jsonCommonMap = new HashMap<>(128);
- Map<String, Object> firstSipOrRtpLog = hashMapStr.remove(hmStrKey);
+ JSONObject jsonCommon = new JSONObject();
+ JSONObject firstSipOrRtpLog = hashMapStr.remove(hmStrKey);
//1:c2s,2:s2c;3;double,1表示firstMsg为请求侧(c2s),合并时以它为准
- if (JsonParseUtil.getInteger(firstSipOrRtpLog, JsonProConfig.STREAM_DIR) == 1) {
- jsonCommonMap.putAll(secondSipOrRtpLog);
- jsonCommonMap.putAll(firstSipOrRtpLog);
+ if (firstSipOrRtpLog.getIntValue(JsonProConfig.STREAM_DIR) == 1) {
+ jsonCommon.putAll(secondSipOrRtpLog);
+ jsonCommon.putAll(firstSipOrRtpLog);
} else {
- jsonCommonMap.putAll(firstSipOrRtpLog);
- jsonCommonMap.putAll(secondSipOrRtpLog);
+ jsonCommon.putAll(firstSipOrRtpLog);
+ jsonCommon.putAll(secondSipOrRtpLog);
}
- accumulateMsg(firstSipOrRtpLog, secondSipOrRtpLog, jsonCommonMap);
- jsonCommonMap.put(JsonProConfig.STREAM_DIR, JsonProConfig.DOUBLE);
+ accumulateMsg(firstSipOrRtpLog, secondSipOrRtpLog, jsonCommon);
+ jsonCommon.put(JsonProConfig.STREAM_DIR, JsonProConfig.DOUBLE);
if (JsonProConfig.SIP_MARK.equals(protocolType)) {
//手动关联SIP后区分内外网IP再下发
- separateInnerIp(jsonCommonMap, out, vsysId);
+ separateInnerIp(jsonCommon, out, vsysId);
} else if (JsonProConfig.RTP_MARK.equals(protocolType)) {
//手动关联RTP后按四元组下发
- jsonCommonMap.put(JsonProConfig.RTP_PCAP_PATH, setRtpPacpPath(firstSipOrRtpLog, secondSipOrRtpLog));
- out.collect(new Tuple4<>(hmStrKey, "rtp-double", jsonCommonMap, vsysId));
+ jsonCommon.put(JsonProConfig.RTP_PCAP_PATH, setRtpPacpPath(firstSipOrRtpLog, secondSipOrRtpLog));
+ out.collect(new Tuple4<>(hmStrKey, "rtp-double", jsonCommon, vsysId));
}
} else {
hashMapStr.put(hmStrKey, secondSipOrRtpLog);
@@ -155,12 +142,12 @@ public class MergeUniFlowWindowFunction extends ProcessWindowFunction<Tuple3<Str
/**
* 区分SIP的内外网IP,此时已经关联完成包含四元组,但未区分内外网IP
*/
- private static void separateInnerIp(Map<String, Object> jsonMap, Collector<Tuple4<String, String, Map<String, Object>, Integer>> out, Integer vsysid) {
+ private static void separateInnerIp(JSONObject jsonMap, Collector<Tuple4<String, String, JSONObject, Integer>> out, Integer vsysid) {
- String sipOriginatorIp = JsonParseUtil.getString(jsonMap, JsonProConfig.SIP_ORIGINATOR_IP);
- int sipOriginatorPort = JsonParseUtil.getInteger(jsonMap, JsonProConfig.SIP_ORIGINATOR_PORT);
- String sipResponderIp = JsonParseUtil.getString(jsonMap, JsonProConfig.SIP_RESPONDER_IP);
- int sipResponderPort = JsonParseUtil.getInteger(jsonMap, JsonProConfig.SIP_RESPONDER_PORT);
+ String sipOriginatorIp = jsonMap.getString(JsonProConfig.SIP_ORIGINATOR_IP);
+ int sipOriginatorPort = jsonMap.getInteger(JsonProConfig.SIP_ORIGINATOR_PORT);
+ String sipResponderIp = jsonMap.getString(JsonProConfig.SIP_RESPONDER_IP);
+ int sipResponderPort = jsonMap.getInteger(JsonProConfig.SIP_RESPONDER_PORT);
if (RelationUtils.isInnerIp(sipOriginatorIp) || RelationUtils.isInnerIp(sipResponderIp)) {
/*
@@ -277,7 +264,7 @@ public class MergeUniFlowWindowFunction extends ProcessWindowFunction<Tuple3<Str
* @param secondSipOrRtpLog 第二条单向流日志
* @param sipOrRtpCombin SIP双向流日志集合
*/
- private static void accumulateMsg(Map<String, Object> firstSipOrRtpLog, Map<String, Object> secondSipOrRtpLog, Map<String, Object> sipOrRtpCombin) {
+ private static void accumulateMsg(JSONObject firstSipOrRtpLog, JSONObject secondSipOrRtpLog, JSONObject sipOrRtpCombin) {
//common_sessions
RelationUtils.metricSumSetOtherLog(firstSipOrRtpLog, secondSipOrRtpLog, sipOrRtpCombin, JsonProConfig.SESSIONS);
//common_c2s_pkt_num
@@ -309,10 +296,10 @@ public class MergeUniFlowWindowFunction extends ProcessWindowFunction<Tuple3<Str
* @param secendSipOrRtpLog 第二个单向流日志
* @return 文件路径
*/
- private static String setRtpPacpPath(Map<String, Object> firstSipOrRtpLog, Map<String, Object> secendSipOrRtpLog) {
+ private static String setRtpPacpPath(JSONObject firstSipOrRtpLog, JSONObject secendSipOrRtpLog) {
- String firstPcapPath = JsonParseUtil.getString(firstSipOrRtpLog, JsonProConfig.RTP_PCAP_PATH);
- String secondPcapPath = JsonParseUtil.getString(secendSipOrRtpLog, JsonProConfig.RTP_PCAP_PATH);
+ String firstPcapPath = firstSipOrRtpLog.getString(JsonProConfig.RTP_PCAP_PATH);
+ String secondPcapPath = secendSipOrRtpLog.getString(JsonProConfig.RTP_PCAP_PATH);
if (StringUtil.isNotBlank(firstPcapPath) && StringUtil.isNotBlank(secondPcapPath)) {
if (firstPcapPath.equals(secondPcapPath)) {
diff --git a/src/main/java/com/zdjizhi/operator/window/VoipCalibrationWindowFunction.java b/src/main/java/com/zdjizhi/operator/window/VoipCalibrationWindowFunction.java
index 9929212..946e946 100644
--- a/src/main/java/com/zdjizhi/operator/window/VoipCalibrationWindowFunction.java
+++ b/src/main/java/com/zdjizhi/operator/window/VoipCalibrationWindowFunction.java
@@ -2,12 +2,10 @@ package com.zdjizhi.operator.window;
import cn.hutool.log.Log;
import cn.hutool.log.LogFactory;
+import com.alibaba.fastjson2.JSONObject;
import com.zdjizhi.common.JsonProConfig;
import com.zdjizhi.common.VoipRelationConfig;
import com.zdjizhi.tools.utils.RelationUtils;
-import com.zdjizhi.tools.json.JsonParseUtil;
-import com.zdjizhi.utils.JsonMapper;
-import com.zdjizhi.utils.StringUtil;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple4;
import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
@@ -22,7 +20,7 @@ import java.util.*;
* @Description:
* @date 2022/9/617:46
*/
-public class VoipCalibrationWindowFunction extends ProcessWindowFunction<Tuple4<String, String, Map<String, Object>, Integer>, String, Tuple2<String, Integer>, TimeWindow> {
+public class VoipCalibrationWindowFunction extends ProcessWindowFunction<Tuple4<String, String, JSONObject, Integer>, String, Tuple2<String, Integer>, TimeWindow> {
private static final Log logger = LogFactory.get();
/**
@@ -32,7 +30,7 @@ public class VoipCalibrationWindowFunction extends ProcessWindowFunction<Tuple4<
* 存放数据:rtp-single,rtp-two,sip-two
* 不存放的数据:sip-single与sip-in
*/
- private static HashMap<String, LinkedList<Map<String, Object>>> combineSRHmList = new HashMap<>(32);
+ private static HashMap<String, LinkedList<JSONObject>> combineSRHmList = new HashMap<>(32);
/**
* 二次关联用HashMap
@@ -41,33 +39,33 @@ public class VoipCalibrationWindowFunction extends ProcessWindowFunction<Tuple4<
* 存放数据:rtp-single,rtp-two,sip-two
* 不存放的数据:sip-single与sip-in
*/
- private static HashMap<String, LinkedList<Map<String, Object>>> secCombineSRHmList = new HashMap<>(32);
+ private static HashMap<String, LinkedList<JSONObject>> secCombineSRHmList = new HashMap<>(32);
@Override
- public void process(Tuple2<String, Integer> key, Context context, Iterable<Tuple4<String, String, Map<String, Object>, Integer>> input, Collector<String> out) throws Exception {
- for (Tuple4<String, String, Map<String, Object>, Integer> tuple : input) {
+ public void process(Tuple2<String, Integer> key, Context context, Iterable<Tuple4<String, String, JSONObject, Integer>> input, Collector<String> out) {
+ for (Tuple4<String, String, JSONObject, Integer> tuple : input) {
//拼接的四元组
String fourKey = tuple.f0;
//已关联的sip,rtp;未关联的sip,rtp;内网的sip
String type = tuple.f1;
- Map<String, Object> jsonMap = tuple.f2;
- logger.error("" + fourKey + "----" + type);
+ JSONObject jsonMap = tuple.f2;
+
switch (type) {
//单向流对准后的SIP
case "sip-double":
- //单向流对准后的RTP
+ //双向流RTP
case "rtp-double":
+ //单向流RTP
+ case "rtp-single":
putKeyAndMsg(jsonMap, fourKey, combineSRHmList);
break;
//单向流的SIP
case "sip-single":
- //单向流的RTP
- case "rtp-single":
//内网的SIP
case "sip-in":
//违规的日志
case "violation":
- out.collect(JsonMapper.toJsonString(jsonMap));
+ out.collect(jsonMap.toJSONString());
break;
default:
logger.error("type is beyond expectation:" + type);
@@ -86,14 +84,14 @@ public class VoipCalibrationWindowFunction extends ProcessWindowFunction<Tuple4<
/**
* 存放key并添加对应List
*/
- private static void putKeyAndMsg(Map<String, Object> message, String fourStrKey, HashMap<String, LinkedList<Map<String, Object>>> combineSRHmList) {
+ private static void putKeyAndMsg(JSONObject jsonObject, String fourStrKey, HashMap<String, LinkedList<JSONObject>> combineSRHmList) {
if (combineSRHmList.containsKey(fourStrKey)) {
- LinkedList<Map<String, Object>> tmpList = combineSRHmList.get(fourStrKey);
- tmpList.add(message);
+ LinkedList<JSONObject> tmpList = combineSRHmList.get(fourStrKey);
+ tmpList.add(jsonObject);
combineSRHmList.put(fourStrKey, tmpList);
} else {
- LinkedList<Map<String, Object>> tmpList = new LinkedList<>();
- tmpList.add(message);
+ LinkedList<JSONObject> tmpList = new LinkedList<>();
+ tmpList.add(jsonObject);
combineSRHmList.put(fourStrKey, tmpList);
}
}
@@ -104,24 +102,24 @@ public class VoipCalibrationWindowFunction extends ProcessWindowFunction<Tuple4<
* @param combineHmList
*/
@SuppressWarnings("unchecked")
- private void tickCombineHmList(HashMap<String, LinkedList<Map<String, Object>>> combineHmList, Collector<String> output) {
+ private void tickCombineHmList(HashMap<String, LinkedList<JSONObject>> combineHmList, Collector<String> output) {
if (combineHmList.size() > 0) {
long nowTime = System.currentTimeMillis() / 1000;
- HashMap<String, LinkedList<Map<String, Object>>> tempCombineSRhmList = new HashMap<>(combineHmList);
+ HashMap<String, LinkedList<JSONObject>> tempCombineSRhmList = new HashMap<>(combineHmList);
combineHmList.clear();
for (String fourStrKey : tempCombineSRhmList.keySet()) {
- LinkedList<Map<String, Object>> tempCombineSRList = tempCombineSRhmList.get(fourStrKey);
+ LinkedList<JSONObject> tempCombineSRList = tempCombineSRhmList.get(fourStrKey);
//包含SIP和RTP,集合大于1则可能是sip和rtp,多条sip或多条rtp
int listSize = tempCombineSRList.size();
if (listSize > 1) {
- List<Map<String, Object>> sipBeanArr = new ArrayList<>();
- List<Map<String, Object>> rtpBeanArr = new ArrayList<>();
+ List<JSONObject> sipBeanArr = new ArrayList<>();
+ List<JSONObject> rtpBeanArr = new ArrayList<>();
- for (Map<String, Object> tempSipOrRtpLog : tempCombineSRList) {
- String schemaType = JsonParseUtil.getString(tempSipOrRtpLog, JsonProConfig.SCHEMA_TYPE);
+ for (JSONObject tempSipOrRtpLog : tempCombineSRList) {
+ String schemaType = tempSipOrRtpLog.getString(JsonProConfig.SCHEMA_TYPE);
if (JsonProConfig.SIP_MARK.equals(schemaType)) {
sipBeanArr.add(tempSipOrRtpLog);
} else if (JsonProConfig.RTP_MARK.equals(schemaType)) {
@@ -132,19 +130,19 @@ public class VoipCalibrationWindowFunction extends ProcessWindowFunction<Tuple4<
int sipSize = sipBeanArr.size();
//只允许一对多的情况,其余视为异常数据
if (rtpSize == 1 && sipSize >= 1) {
- for (Map<String, Object> voIpLog : sipBeanArr) {
- Map<String, Object> rtpLog = rtpBeanArr.get(0);
+ for (JSONObject voIpLog : sipBeanArr) {
+ JSONObject rtpLog = rtpBeanArr.get(0);
accumulateVoipMsg(voIpLog, rtpLog);
//四元组,voip,关联后的数据
output.collect(mergeJson(voIpLog, rtpLog));
}
} else if (sipSize == 1 && rtpSize >= 1) {
- for (Map<String, Object> rtpLog : rtpBeanArr) {
- HashMap<String, Object> voIpLog = new HashMap<>(128);
- voIpLog.putAll(sipBeanArr.get(0));
- accumulateVoipMsg(voIpLog, rtpLog);
+ for (JSONObject rtpLog : rtpBeanArr) {
+ JSONObject voipLog = new JSONObject();
+ voipLog.putAll(sipBeanArr.get(0));
+ accumulateVoipMsg(voipLog, rtpLog);
//四元组,voip,关联后的数据
- output.collect(mergeJson(voIpLog, rtpLog));
+ output.collect(mergeJson(voipLog, rtpLog));
}
} else {
logger.error("RTP-listSize is [" + rtpBeanArr.size() + "];SIP-listSize is [" + sipBeanArr.size() + "] in this tempCombineSRHmList! Not logical");
@@ -153,14 +151,14 @@ public class VoipCalibrationWindowFunction extends ProcessWindowFunction<Tuple4<
}
} else {
- Map<String, Object> voIpLog = tempCombineSRList.get(0);
- long commonEndTime = JsonParseUtil.getLong(voIpLog, JsonProConfig.END_TIME);
+ JSONObject voipLog = tempCombineSRList.get(0);
+ long commonEndTime = voipLog.getLong(JsonProConfig.END_TIME);
long intervalTime = nowTime - commonEndTime;
if (intervalTime <= VoipRelationConfig.SEC_COMBINE_SR_CACHE_SECS) {
logger.warn("当前日志时间未超过二次对准时间,进入队列等待再次对准,日志时间差:" + intervalTime);
- putKeyAndMsg(voIpLog, fourStrKey, secCombineSRHmList);
+ putKeyAndMsg(voipLog, fourStrKey, secCombineSRHmList);
} else {
- output.collect(JsonMapper.toJsonString(voIpLog));
+ output.collect(voipLog.toJSONString());
}
}
}
@@ -173,7 +171,7 @@ public class VoipCalibrationWindowFunction extends ProcessWindowFunction<Tuple4<
* @param voIpLog 融合后voip日志
* @param rtpLog RTP日志
*/
- private void accumulateVoipMsg(Map<String, Object> voIpLog, Map<String, Object> rtpLog) {
+ private void accumulateVoipMsg(JSONObject voIpLog, JSONObject rtpLog) {
//common_sessions
RelationUtils.metricSum(voIpLog, rtpLog, JsonProConfig.SESSIONS);
//common_c2s_pkt_num
@@ -201,13 +199,13 @@ public class VoipCalibrationWindowFunction extends ProcessWindowFunction<Tuple4<
/**
* 定时处理中List元素数仅为1的情况
*/
- private void sendDirectlyOneElement(String msg, Map<String, Object> voIpLog, Collector<String> output) {
+ private void sendDirectlyOneElement(String msg, JSONObject voIpLog, Collector<String> output) {
//四元组,sip(一定为双侧)/rtp(可能为单侧也可能为双侧,看单向流字段信息),拿出来的原始数据
- String commonSchemaType = JsonParseUtil.getString(voIpLog, JsonProConfig.SCHEMA_TYPE);
+ String commonSchemaType = voIpLog.getString(JsonProConfig.SCHEMA_TYPE);
if (JsonProConfig.SIP_MARK.equals(commonSchemaType)) {
output.collect(msg);
} else if (JsonProConfig.RTP_MARK.equals(commonSchemaType)) {
- int commonStreamDir = JsonParseUtil.getInteger(voIpLog, JsonProConfig.STREAM_DIR);
+ int commonStreamDir = voIpLog.getInteger(JsonProConfig.STREAM_DIR);
if (commonStreamDir != JsonProConfig.DOUBLE) {
output.collect(msg);
} else {
@@ -219,10 +217,10 @@ public class VoipCalibrationWindowFunction extends ProcessWindowFunction<Tuple4<
/**
* 发送不符合逻辑的日志到kafka
*/
- private static void sendViolationLogs(List<Map<String, Object>> violationLogs, Collector<String> output) {
+ private static void sendViolationLogs(List<JSONObject> violationLogs, Collector<String> output) {
if (violationLogs.size() > 0) {
- for (Map<String, Object> log : violationLogs) {
- output.collect(JsonMapper.toJsonString(log));
+ for (JSONObject log : violationLogs) {
+ output.collect(log.toJSONString());
}
}
}
@@ -230,19 +228,19 @@ public class VoipCalibrationWindowFunction extends ProcessWindowFunction<Tuple4<
/**
* 发送VOIP日志到Kafka
*/
- private static String mergeJson(Map<String, Object> voIpLog, Map<String, Object> rtpLog) {
+ private static String mergeJson(JSONObject voIpLog, JSONObject rtpLog) {
- int rtpPayloadTypeC2s = JsonParseUtil.getInteger(rtpLog, JsonProConfig.RTP_PAYLOAD_TYPE_C2S);
- int rtpPayloadTypeS2c = JsonParseUtil.getInteger(rtpLog, JsonProConfig.RTP_PAYLOAD_TYPE_S2C);
- String rtpPcapPath = JsonParseUtil.getString(rtpLog, JsonProConfig.RTP_PCAP_PATH);
+ int rtpPayloadTypeC2s = rtpLog.getIntValue(JsonProConfig.RTP_PAYLOAD_TYPE_C2S, 0);
+ int rtpPayloadTypeS2c = rtpLog.getIntValue(JsonProConfig.RTP_PAYLOAD_TYPE_S2C, 0);
+ String rtpPcapPath = rtpLog.getString(JsonProConfig.RTP_PCAP_PATH);
- JsonParseUtil.setValue(voIpLog, JsonProConfig.SCHEMA_TYPE, "VoIP");
- JsonParseUtil.setValue(voIpLog, JsonProConfig.RTP_ORIGINATOR_DIR, RelationUtils.judgeDirection(rtpLog, voIpLog));
- JsonParseUtil.setValue(voIpLog, JsonProConfig.RTP_PAYLOAD_TYPE_C2S, rtpPayloadTypeC2s);
- JsonParseUtil.setValue(voIpLog, JsonProConfig.RTP_PAYLOAD_TYPE_S2C, rtpPayloadTypeS2c);
- JsonParseUtil.setValue(voIpLog, JsonProConfig.RTP_PCAP_PATH, rtpPcapPath);
+ voIpLog.put(JsonProConfig.SCHEMA_TYPE, "VoIP");
+ voIpLog.put(JsonProConfig.RTP_ORIGINATOR_DIR, RelationUtils.judgeDirection(rtpLog, voIpLog));
+ voIpLog.put(JsonProConfig.RTP_PAYLOAD_TYPE_C2S, rtpPayloadTypeC2s);
+ voIpLog.put(JsonProConfig.RTP_PAYLOAD_TYPE_S2C, rtpPayloadTypeS2c);
+ voIpLog.put(JsonProConfig.RTP_PCAP_PATH, rtpPcapPath);
- return JsonMapper.toJsonString(voIpLog);
+ return voIpLog.toJSONString();
}
}
diff --git a/src/main/java/com/zdjizhi/tools/json/JsonParseUtil.java b/src/main/java/com/zdjizhi/tools/json/JsonParseUtil.java
deleted file mode 100644
index 4413754..0000000
--- a/src/main/java/com/zdjizhi/tools/json/JsonParseUtil.java
+++ /dev/null
@@ -1,306 +0,0 @@
-package com.zdjizhi.tools.json;
-
-
-import cn.hutool.log.Log;
-import cn.hutool.log.LogFactory;
-import com.alibaba.fastjson.JSON;
-import com.alibaba.fastjson.JSONArray;
-import com.alibaba.fastjson.JSONObject;
-import com.alibaba.nacos.api.NacosFactory;
-import com.alibaba.nacos.api.PropertyKeyConst;
-import com.alibaba.nacos.api.config.listener.Listener;
-import com.alibaba.nacos.api.config.ConfigService;
-
-import com.alibaba.nacos.api.exception.NacosException;
-import com.jayway.jsonpath.JsonPath;
-import com.zdjizhi.common.VoipRelationConfig;
-import com.zdjizhi.utils.JsonMapper;
-import com.zdjizhi.utils.StringUtil;
-import net.sf.cglib.beans.BeanGenerator;
-import net.sf.cglib.beans.BeanMap;
-
-import java.util.*;
-import java.util.concurrent.Executor;
-
-/**
- * 使用FastJson解析json的工具类
- *
- * @author qidaijie
- */
-public class JsonParseUtil {
-
- private static final Log logger = LogFactory.get();
- private static Properties propNacos = new Properties();
-
- /**
- * 获取需要删除字段的列表
- */
- private static ArrayList<String> dropList = new ArrayList<>();
-
-
- /**
- * 在内存中加载反射类用的map
- */
- private static HashMap<String, Class> jsonFieldsMap;
-
-
- static {
- propNacos.setProperty(PropertyKeyConst.SERVER_ADDR, VoipRelationConfig.NACOS_SERVER);
- propNacos.setProperty(PropertyKeyConst.NAMESPACE, VoipRelationConfig.NACOS_SCHEMA_NAMESPACE);
- propNacos.setProperty(PropertyKeyConst.USERNAME, VoipRelationConfig.NACOS_USERNAME);
- propNacos.setProperty(PropertyKeyConst.PASSWORD, VoipRelationConfig.NACOS_PIN);
- try {
- ConfigService configService = NacosFactory.createConfigService(propNacos);
- String dataId = VoipRelationConfig.NACOS_DATA_ID;
- String group = VoipRelationConfig.NACOS_GROUP;
- String schema = configService.getConfig(dataId, group, 5000);
- if (StringUtil.isNotBlank(schema)) {
- jsonFieldsMap = getFieldsFromSchema(schema);
- }
- configService.addListener(dataId, group, new Listener() {
- @Override
- public Executor getExecutor() {
- return null;
- }
-
- @Override
- public void receiveConfigInfo(String configMsg) {
- if (StringUtil.isNotBlank(configMsg)) {
- jsonFieldsMap = getFieldsFromSchema(configMsg);
- }
- }
- });
- } catch (NacosException e) {
- logger.error("Get Schema config from Nacos error,The exception message is :" + e.getMessage());
- }
- }
-
- /**
- * 模式匹配,给定一个类型字符串返回一个类类型
- *
- * @param type 类型
- * @return 类类型
- */
-
- public static Class getClassName(String type) {
- Class clazz;
-
- switch (type) {
- case "int":
- clazz = Integer.class;
- break;
- case "string":
- clazz = String.class;
- break;
- case "long":
- clazz = long.class;
- break;
- case "array":
- clazz = List.class;
- break;
- case "double":
- clazz = double.class;
- break;
- case "float":
- clazz = float.class;
- break;
- case "char":
- clazz = char.class;
- break;
- case "byte":
- clazz = byte.class;
- break;
- case "boolean":
- clazz = boolean.class;
- break;
- case "short":
- clazz = short.class;
- break;
- default:
- clazz = String.class;
- }
- return clazz;
- }
-
- /**
- * 类型转换
- *
- * @param jsonMap 原始日志map
- */
- public static Map<String, Object> typeTransform(Map<String, Object> jsonMap) throws RuntimeException {
- JsonParseUtil.dropJsonField(jsonMap);
- HashMap<String, Object> tmpMap = new HashMap<>(192);
- for (String key : jsonMap.keySet()) {
- if (jsonFieldsMap.containsKey(key)) {
- String simpleName = jsonFieldsMap.get(key).getSimpleName();
- switch (simpleName) {
- case "String":
- tmpMap.put(key, JsonTypeUtil.checkString(jsonMap.get(key)));
- break;
- case "Integer":
- tmpMap.put(key, JsonTypeUtil.getIntValue(jsonMap.get(key)));
- break;
- case "long":
- tmpMap.put(key, JsonTypeUtil.checkLongValue(jsonMap.get(key)));
- break;
- case "List":
- tmpMap.put(key, JsonTypeUtil.checkArray(jsonMap.get(key)));
- break;
- case "Map":
- tmpMap.put(key, JsonTypeUtil.checkObject(jsonMap.get(key)));
- break;
- case "double":
- tmpMap.put(key, JsonTypeUtil.checkDouble(jsonMap.get(key)));
- break;
- default:
- tmpMap.put(key, JsonTypeUtil.checkString(jsonMap.get(key)));
- }
- }
- }
- return tmpMap;
- }
-
- /**
- * 获取属性值的方法
- *
- * @param jsonMap 原始日志
- * @param property key
- * @return 属性的值
- */
- public static Object getValue(Map<String, Object> jsonMap, String property) {
- try {
- return jsonMap.getOrDefault(property, null);
- } catch (RuntimeException e) {
- logger.error("获取json-value异常,异常key:" + property + "异常信息为:" + e);
- return null;
- }
- }
-
- /**
- * long 类型检验转换方法,若为空返回基础值
- *
- * @return Long value
- */
- public static Long getLong(Map<String, Object> jsonMap, String property) {
- Object value = jsonMap.getOrDefault(property, null);
- Long longVal = TypeUtils.castToLong(value);
-
- if (longVal == null) {
- return 0L;
- }
-
- return longVal;
- }
-
- /**
- * int 类型检验转换方法,若为空返回基础值
- *
- * @return int value
- */
- public static int getInteger(Map<String, Object> jsonMap, String property) {
- Object value = jsonMap.getOrDefault(property, null);
- Integer intVal = TypeUtils.castToInt(value);
-
- if (intVal == null) {
- return 0;
- }
-
- return intVal;
- }
-
- public static String getString(Map<String, Object> jsonMap, String property) {
- Object value = jsonMap.getOrDefault(property, null);
- if (value == null) {
- return null;
- }
-
- if (value instanceof Map) {
- return JsonMapper.toJsonString(value);
- }
-
- if (value instanceof List) {
- return JsonMapper.toJsonString(value);
- }
-
- return value.toString();
- }
-
- /**
- * 更新属性值的方法
- *
- * @param jsonMap 原始日志json parse
- * @param property 更新的key
- * @param value 更新的值
- */
- public static void setValue(Map<String, Object> jsonMap, String property, Object value) {
- try {
- jsonMap.put(property, value);
- } catch (RuntimeException e) {
- logger.error("赋予实体类错误类型数据", e);
- }
- }
-
- /**
- * 通过获取String类型的网关schema链接来获取map,用于生成一个Object类型的对象
- * <p>
- * // * @param http 网关schema地址
- *
- * @return 用于反射生成schema类型的对象的一个map集合
- */
- private static HashMap<String, Class> getFieldsFromSchema(String schema) {
- HashMap<String, Class> map = new HashMap<>(16);
-
- //获取fields,并转化为数组,数组的每个元素都是一个name doc type
- JSONObject schemaJson = JSON.parseObject(schema);
- JSONArray fields = (JSONArray) schemaJson.get("fields");
-
- for (Object field : fields) {
- String filedStr = field.toString();
- if (checkKeepField(filedStr)) {
- String name = JsonPath.read(filedStr, "$.name").toString();
- String type = JsonPath.read(filedStr, "$.type").toString();
- if (type.contains("{")) {
- type = JsonPath.read(filedStr, "$.type.type").toString();
- }
- //组合用来生成实体类的map
- map.put(name, getClassName(type));
- } else {
- dropList.add(filedStr);
- }
- }
- return map;
- }
-
-
- /**
- * 判断字段是否需要保留
- *
- * @param message 单个field-json
- * @return true or false
- */
- private static boolean checkKeepField(String message) {
- boolean isKeepField = true;
- boolean isHiveDoc = JSON.parseObject(message).containsKey("doc");
- if (isHiveDoc) {
- boolean isHiveVi = JsonPath.read(message, "$.doc").toString().contains("visibility");
- if (isHiveVi) {
- String visibility = JsonPath.read(message, "$.doc.visibility").toString();
- if (VoipRelationConfig.VISIBILITY.equals(visibility)) {
- isKeepField = false;
- }
- }
- }
- return isKeepField;
- }
-
- /**
- * 删除schema内指定的无效字段(jackson)
- *
- * @param jsonMap
- */
- private static void dropJsonField(Map<String, Object> jsonMap) {
- for (String field : dropList) {
- jsonMap.remove(field);
- }
- }
-} \ No newline at end of file
diff --git a/src/main/java/com/zdjizhi/tools/json/JsonTypeUtil.java b/src/main/java/com/zdjizhi/tools/json/JsonTypeUtil.java
deleted file mode 100644
index 3e56b84..0000000
--- a/src/main/java/com/zdjizhi/tools/json/JsonTypeUtil.java
+++ /dev/null
@@ -1,142 +0,0 @@
-package com.zdjizhi.tools.json;
-
-import cn.hutool.log.Log;
-import cn.hutool.log.LogFactory;
-import com.zdjizhi.utils.JsonMapper;
-import com.zdjizhi.tools.exception.VoipRelationException;
-
-import java.util.List;
-import java.util.Map;
-
-/**
- * @author qidaijie
- * @Package PACKAGE_NAME
- * @Description:
- * @date 2021/7/1217:34
- */
-class JsonTypeUtil {
- private static final Log logger = LogFactory.get();
- /**
- * String 类型检验转换方法
- *
- * @param value json value
- * @return String value
- */
- static String checkString(Object value) {
- if (value == null) {
- return null;
- }
-
- if (value instanceof Map) {
- return JsonMapper.toJsonString(value);
- }
-
- if (value instanceof List) {
- return JsonMapper.toJsonString(value);
- }
-
- return value.toString();
- }
-
- /**
- * array 类型检验转换方法
- *
- * @param value json value
- * @return List value
- */
- static Map checkObject(Object value) {
- if (value == null) {
- return null;
- }
-
- if (value instanceof Map) {
- return (Map) value;
- }
-
- throw new VoipRelationException("can not cast to parse, value : " + value);
- }
-
- /**
- * array 类型检验转换方法
- *
- * @param value json value
- * @return List value
- */
- static List checkArray(Object value) {
- if (value == null) {
- return null;
- }
-
- if (value instanceof List) {
- return (List) value;
- }
-
- throw new VoipRelationException("can not cast to List, value : " + value);
- }
-
- private static Long checkLong(Object value) {
- if (value == null) {
- return null;
- }
-
- return TypeUtils.castToLong(value);
- }
-
- /**
- * long 类型检验转换方法,若为空返回基础值
- *
- * @param value json value
- * @return Long value
- */
- static long checkLongValue(Object value) {
-
- Long longVal = TypeUtils.castToLong(value);
-
- if (longVal == null) {
- return 0L;
- }
-
- return longVal;
- }
-
- /**
- * Double 类型校验转换方法
- *
- * @param value json value
- * @return Double value
- */
- static Double checkDouble(Object value) {
- if (value == null) {
- return null;
- }
-
- return TypeUtils.castToDouble(value);
- }
-
-
- private static Integer checkInt(Object value) {
- if (value == null) {
- return null;
- }
-
- return TypeUtils.castToInt(value);
- }
-
-
- /**
- * int 类型检验转换方法,若为空返回基础值
- *
- * @param value json value
- * @return int value
- */
- static int getIntValue(Object value) {
-
- Integer intVal = TypeUtils.castToInt(value);
-
- if (intVal == null) {
- return 0;
- }
- return intVal;
- }
-
-}
diff --git a/src/main/java/com/zdjizhi/tools/json/TypeUtils.java b/src/main/java/com/zdjizhi/tools/json/TypeUtils.java
deleted file mode 100644
index c2fb497..0000000
--- a/src/main/java/com/zdjizhi/tools/json/TypeUtils.java
+++ /dev/null
@@ -1,162 +0,0 @@
-package com.zdjizhi.tools.json;
-
-import cn.hutool.log.Log;
-import cn.hutool.log.LogFactory;
-import com.zdjizhi.common.VoipRelationConfig;
-import com.zdjizhi.utils.StringUtil;
-import com.zdjizhi.tools.exception.VoipRelationException;
-
-
-/**
- * @author qidaijie
- * @Package PACKAGE_NAME
- * @Description:
- * @date 2021/7/1218:20
- */
-public class TypeUtils {
- private static final Log logger = LogFactory.get();
-
- /**
- * Integer 类型判断方法
- *
- * @param value json value
- * @return Integer value or null
- */
- public static Object castToIfFunction(Object value) {
- if (value == null) {
- return null;
- }
-
- if (value instanceof String) {
- return value.toString();
- }
-
- if (value instanceof Integer) {
- return ((Number) value).intValue();
- }
-
- if (value instanceof Long) {
- return ((Number) value).longValue();
- }
-
- if (value instanceof Boolean) {
- return (Boolean) value ? 1 : 0;
- }
-
- throw new VoipRelationException("can not cast to int, value : " + value);
- }
-
- /**
- * Integer 类型判断方法
- *
- * @param value json value
- * @return Integer value or null
- */
- static Integer castToInt(Object value) {
-
- if (value == null) {
- return null;
- }
-
- if (value instanceof Integer) {
- return (Integer) value;
- }
-
- if (value instanceof String) {
- String strVal = (String) value;
- if (StringUtil.isBlank(strVal)) {
- return null;
- }
-
- //将 10,20 类数据转换为10
- if (strVal.contains(VoipRelationConfig.FORMAT_SPLITTER)) {
- strVal = strVal.split(VoipRelationConfig.FORMAT_SPLITTER)[0];
- }
-
- try {
- return Integer.parseInt(strVal);
- } catch (NumberFormatException ex) {
- logger.error("String change Integer Error,The error Str is:" + strVal);
- }
- }
-
- if (value instanceof Boolean) {
- return (Boolean) value ? 1 : 0;
- }
-
- throw new VoipRelationException("can not cast to int, value : " + value);
- }
-
- /**
- * Double类型判断方法
- *
- * @param value json value
- * @return double value or null
- */
- static Double castToDouble(Object value) {
-
- if (value instanceof Double) {
- return (Double) value;
- }
-
- if (value instanceof String) {
- String strVal = (String) value;
-
- if (StringUtil.isBlank(strVal)) {
- return null;
- }
-
- //将 10,20 类数据转换为10
- if (strVal.contains(VoipRelationConfig.FORMAT_SPLITTER)) {
- strVal = strVal.split(VoipRelationConfig.FORMAT_SPLITTER)[0];
- }
-
- try {
- return Double.parseDouble(strVal);
- } catch (NumberFormatException ex) {
- logger.error("String change Double Error,The error Str is:" + strVal);
- }
- }
-
- throw new VoipRelationException("can not cast to double, value : " + value);
- }
-
- /**
- * Long类型判断方法
- *
- * @param value json value
- * @return (Long)value or null
- */
- static Long castToLong(Object value) {
- if (value == null) {
- return null;
- }
-
-// 此判断数值超范围不抛出异常,会截取成对应类型数值
- if (value instanceof Number) {
- return ((Number) value).longValue();
- }
-
- if (value instanceof String) {
- String strVal = (String) value;
-
- if (StringUtil.isBlank(strVal)) {
- return null;
- }
-
- //将 10,20 类数据转换为10
- if (strVal.contains(VoipRelationConfig.FORMAT_SPLITTER)) {
- strVal = strVal.split(VoipRelationConfig.FORMAT_SPLITTER)[0];
- }
-
- try {
- return Long.parseLong(strVal);
- } catch (NumberFormatException ex) {
- logger.error("String change Long Error,The error Str is:" + strVal);
- }
- }
-
- throw new VoipRelationException("can not cast to long, value : " + value);
- }
-
-}
diff --git a/src/main/java/com/zdjizhi/tools/utils/RelationUtils.java b/src/main/java/com/zdjizhi/tools/utils/RelationUtils.java
index 1498fd9..9176a5f 100644
--- a/src/main/java/com/zdjizhi/tools/utils/RelationUtils.java
+++ b/src/main/java/com/zdjizhi/tools/utils/RelationUtils.java
@@ -1,12 +1,11 @@
package com.zdjizhi.tools.utils;
+import com.alibaba.fastjson2.JSONObject;
import com.zdjizhi.common.JsonProConfig;
import com.zdjizhi.common.VoipRelationConfig;
import com.zdjizhi.utils.IPUtil;
import com.zdjizhi.utils.StringUtil;
-import com.zdjizhi.tools.json.JsonParseUtil;
-import java.util.Map;
/**
* @author qidaijie
@@ -24,13 +23,11 @@ public class RelationUtils {
* @param secondLog B日志
* @param key 指标 json key
*/
- public static void metricSum(Map<String, Object> firstLog, Map<String, Object> secondLog, String key) {
- Long firstMetric = JsonParseUtil.getLong(firstLog, key);
- Long secondMetric = JsonParseUtil.getLong(secondLog, key);
+ public static void metricSum(JSONObject firstLog, JSONObject secondLog, String key) {
+ Long firstMetric = firstLog.getLongValue(key, 0L);
+ Long secondMetric = secondLog.getLongValue(key, 0L);
- Long sum = firstMetric + secondMetric;
-
- JsonParseUtil.setValue(firstLog, key, sum);
+ firstLog.put(key, (firstMetric + secondMetric));
}
@@ -42,13 +39,11 @@ public class RelationUtils {
* @param otherLog C日志
* @param key 指标 json key
*/
- public static void metricSumSetOtherLog(Map<String, Object> firstLog, Map<String, Object> secondLog, Map<String, Object> otherLog, String key) {
- Long firstMetric = JsonParseUtil.getLong(firstLog, key);
- Long secondMetric = JsonParseUtil.getLong(secondLog, key);
-
- Long sum = firstMetric + secondMetric;
+ public static void metricSumSetOtherLog(JSONObject firstLog, JSONObject secondLog, JSONObject otherLog, String key) {
+ Long firstMetric = firstLog.getLongValue(key, 0L);
+ Long secondMetric = secondLog.getLongValue(key, 0L);
- JsonParseUtil.setValue(otherLog, key, sum);
+ otherLog.put(key, (firstMetric + secondMetric));
}
@@ -58,7 +53,7 @@ public class RelationUtils {
* @param sipLog SIP日志
* @return true or false
*/
- public static boolean checkSipCompleteness(Map<String, Object> sipLog) {
+ public static boolean checkSipCompleteness(JSONObject sipLog) {
return sipLog.containsKey(JsonProConfig.SIP_ORIGINATOR_IP) &&
sipLog.containsKey(JsonProConfig.SIP_ORIGINATOR_PORT) &&
sipLog.containsKey(JsonProConfig.SIP_RESPONDER_IP) &&
@@ -117,11 +112,11 @@ public class RelationUtils {
* @param voIpLog 融合后VOIP日志
* @return 方向 0:未知 1:c2s 2:s2c
*/
- public static int judgeDirection(Map<String, Object> rtpLog, Map<String, Object> voIpLog) {
+ public static int judgeDirection(JSONObject rtpLog, JSONObject voIpLog) {
- String ip = JsonParseUtil.getString(rtpLog, JsonProConfig.CLIENT_IP);
- String sipOriginatorIp = JsonParseUtil.getString(voIpLog, JsonProConfig.SIP_ORIGINATOR_IP);
- String sipResponderIp = JsonParseUtil.getString(voIpLog, JsonProConfig.SIP_RESPONDER_IP);
+ String ip = rtpLog.getString(JsonProConfig.CLIENT_IP);
+ String sipOriginatorIp = voIpLog.getString(JsonProConfig.SIP_ORIGINATOR_IP);
+ String sipResponderIp = voIpLog.getString(JsonProConfig.SIP_RESPONDER_IP);
if (StringUtil.isNotBlank(ip) && ip.equals(sipOriginatorIp)) {
return 1;
diff --git a/src/main/java/com/zdjizhi/topology/VoIpRelationTopology.java b/src/main/java/com/zdjizhi/topology/VoIpRelationTopology.java
index 95fbfc1..8ca7c06 100644
--- a/src/main/java/com/zdjizhi/topology/VoIpRelationTopology.java
+++ b/src/main/java/com/zdjizhi/topology/VoIpRelationTopology.java
@@ -2,6 +2,7 @@ package com.zdjizhi.topology;
import cn.hutool.log.Log;
import cn.hutool.log.LogFactory;
+import com.alibaba.fastjson2.JSONObject;
import com.zdjizhi.common.VoipRelationConfig;
import com.zdjizhi.operator.filter.NullFilterFunction;
import com.zdjizhi.operator.group.MergeUniFlowKeyByFunction;
@@ -18,8 +19,6 @@ import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
-import java.util.Map;
-
/**
* @author qidaijie
@@ -35,8 +34,7 @@ public class VoIpRelationTopology {
DataStream<String> streamSource = environment.addSource(KafkaConsumer.getKafkaConsumer());
-
- SingleOutputStreamOperator<Tuple4<String, String, Map<String, Object>, Integer>> mergeUniFlowWindow = streamSource.filter(new NullFilterFunction())
+ SingleOutputStreamOperator<Tuple4<String, String, JSONObject, Integer>> mergeUniFlowWindow = streamSource.filter(new NullFilterFunction())
.map(new ParseMapFunction())
.keyBy(new MergeUniFlowKeyByFunction())
.window(TumblingProcessingTimeWindows.of(Time.seconds(VoipRelationConfig.ONE_SIDED_WINDOW_TIME)))
diff --git a/src/test/java/com/zdjizhi/EncryptorTest.java b/src/test/java/com/zdjizhi/EncryptorTest.java
index 170086c..194369d 100644
--- a/src/test/java/com/zdjizhi/EncryptorTest.java
+++ b/src/test/java/com/zdjizhi/EncryptorTest.java
@@ -32,4 +32,5 @@ public class EncryptorTest {
System.out.println("The pin is: "+rawUser);
}
+
}
diff --git a/src/test/java/com/zdjizhi/json/JsonPathTest.java b/src/test/java/com/zdjizhi/json/JsonPathTest.java
deleted file mode 100644
index 46f6f85..0000000
--- a/src/test/java/com/zdjizhi/json/JsonPathTest.java
+++ /dev/null
@@ -1,79 +0,0 @@
-package com.zdjizhi.json;
-
-import cn.hutool.log.Log;
-import cn.hutool.log.LogFactory;
-import com.alibaba.nacos.api.NacosFactory;
-import com.alibaba.nacos.api.PropertyKeyConst;
-import com.alibaba.nacos.api.config.ConfigService;
-import com.alibaba.nacos.api.config.listener.Listener;
-import com.alibaba.nacos.api.exception.NacosException;
-import com.jayway.jsonpath.DocumentContext;
-import com.jayway.jsonpath.JsonPath;
-import com.zdjizhi.common.VoipRelationConfig;
-import com.zdjizhi.utils.StringUtil;
-import org.junit.Test;
-
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Properties;
-import java.util.concurrent.Executor;
-
-/**
- * @author qidaijie
- * @Package com.zdjizhi.json
- * @Description:
- * @date 2022/3/2410:22
- */
-public class JsonPathTest {
- private static final Log logger = LogFactory.get();
-
- private static Properties propNacos = new Properties();
-
- /**
- * 获取需要删除字段的列表
- */
- private static ArrayList<String> dropList = new ArrayList<>();
-
- /**
- * 在内存中加载反射类用的map
- */
- private static HashMap<String, Class> map;
-
- /**
- * 获取任务列表
- * list的每个元素是一个四元字符串数组 (有format标识的字段,补全的字段,用到的功能函数,用到的参数),例如:
- * (mail_subject mail_subject decode_of_base64 mail_subject_charset)
- */
- private static ArrayList<String[]> jobList;
-
- private static String schema;
-
- static {
- propNacos.setProperty(PropertyKeyConst.SERVER_ADDR, VoipRelationConfig.NACOS_SERVER);
- propNacos.setProperty(PropertyKeyConst.NAMESPACE, VoipRelationConfig.NACOS_SCHEMA_NAMESPACE);
- propNacos.setProperty(PropertyKeyConst.USERNAME, VoipRelationConfig.NACOS_USERNAME);
- propNacos.setProperty(PropertyKeyConst.PASSWORD, VoipRelationConfig.NACOS_PIN);
- try {
- ConfigService configService = NacosFactory.createConfigService(propNacos);
- String dataId = VoipRelationConfig.NACOS_DATA_ID;
- String group = VoipRelationConfig.NACOS_GROUP;
- String config = configService.getConfig(dataId, group, 5000);
- if (StringUtil.isNotBlank(config)) {
- schema = config;
- }
- } catch (NacosException e) {
- logger.error("Get Schema config from Nacos error,The exception message is :" + e.getMessage());
- }
- }
-
- @Test
- public void parseSchemaGetFields() {
- DocumentContext parse = JsonPath.parse(schema);
- List<Object> fields = parse.read("$.fields[*]");
- for (Object field : fields) {
- String name = JsonPath.read(field, "$.name").toString();
- String type = JsonPath.read(field, "$.type").toString();
- }
- }
-}
diff --git a/src/test/java/com/zdjizhi/json/JsonTransfromTest.java b/src/test/java/com/zdjizhi/json/JsonTransfromTest.java
new file mode 100644
index 0000000..889f41b
--- /dev/null
+++ b/src/test/java/com/zdjizhi/json/JsonTransfromTest.java
@@ -0,0 +1,30 @@
+package com.zdjizhi.json;
+
+import com.alibaba.fastjson2.JSONObject;
+import com.alibaba.fastjson2.JSONReader;
+import org.junit.Test;
+
+/**
+ * @author qidaijie
+ * @Package com.zdjizhi.json
+ * @Description:
+ * @date 2023/6/717:38
+ */
+public class JsonTransfromTest {
+
+ @Test
+ public void jsonSizeTest() {
+ String a = "{\"sip_call_id\":\"NjI2NzU4YWZiNTE4YzNkNzAxZGVlMzNhMTMwNGRhMWQ.\",\"sip_originator_description\":\"\\\"1075\\\"<sip:[email protected]>\",\"sip_responder_description\":\"\\\"1078\\\"<sip:[email protected]>\",\"sip_user_agent\":\"eyeBeam release 1011d stamp 40820\",\"sip_originator_sdp_content\":\"v=0\\r\\no=- 8 2 IN IP4 192.50.111.35\\r\\ns=CounterPath eyeBeam 1.5\\r\\nc=IN IP4 192.50.111.35\\r\\nt=0 0\\r\\nm=audio 58593 RTP/AVP 0 8 18 101\\r\\na=alt:1 1 : 78u1b+Nh KPPh5Azo 192.50.111.35 58593\\r\\na=fmtp:18 annexb=no\\r\\na=fmtp:101 0-15\\r\\na=rtpmap:18 G729/8000\\r\\na=rtpmap:101 telephone-event/8000\\r\\na=sendrecv\\r\\na=x-rtp-session-id:2C43D09FE8BD480FB1645410CFEA557A\\r\\n\",\"sip_originator_sdp_connect_ip\":\"192.50.111.35\",\"sip_originator_sdp_media_port\":58593,\"sip_originator_sdp_media_type\":\"18 G729/8000\",\"sip_server\":\"OpenSIPS (2.4.11 (x86_64/linux))\",\"sip_responder_sdp_content\":\"v=0\\r\\no=- 4 2 IN IP4 192.50.194.63\\r\\ns=CounterPath eyeBeam 1.5\\r\\nc=IN IP4 192.50.194.63\\r\\nt=0 0\\r\\nm=audio 35798 RTP/AVP 0 8 18 101\\r\\na=alt:1 1 : W3SaU1GA ukx4xt4M 192.50.194.63 35798\\r\\na=fmtp:18 annexb=no\\r\\na=fmtp:101 0-15\\r\\na=rtpmap:18 G729/8000\\r\\na=rtpmap:101 telephone-event/8000\\r\\na=sendrecv\\r\\na=x-rtp-session-id:76939F3CBC6048849EAC851A4D093C9A\\r\\n\",\"sip_responder_sdp_connect_ip\":\"192.50.194.63\",\"sip_responder_sdp_media_port\":35798,\"sip_responder_sdp_media_type\":\"18 G729/8000\",\"sip_duration_s\":4,\"sip_bye\":\"originator\"}";
+ String b = "{\"sip_originator_description\":\"\\\"1075\\\"<sip:[email protected]>\",\"sip_responder_description\":\"\\\"1078\\\"<sip:[email protected]>\",\"sip_user_agent\":\"eyeBeam release 1011d stamp 40820\",\"sip_originator_sdp_content\":\"v=0\\r\\no=- 8 2 IN IP4 192.50.111.35\\r\\ns=CounterPath eyeBeam 1.5\\r\\nc=IN IP4 192.50.111.35\\r\\nt=0 0\\r\\nm=audio 58593 RTP/AVP 0 8 18 101\\r\\na=alt:1 1 : 78u1b+Nh KPPh5Azo 192.50.111.35 58593\\r\\na=fmtp:18 annexb=no\\r\\na=fmtp:101 0-15\\r\\na=rtpmap:18 G729/8000\\r\\na=rtpmap:101 telephone-event/8000\\r\\na=sendrecv\\r\\na=x-rtp-session-id:2C43D09FE8BD480FB1645410CFEA557A\\r\\n\",\"sip_originator_sdp_connect_ip\":\"192.50.111.35\",\"sip_originator_sdp_media_port\":58593,\"sip_originator_sdp_media_type\":\"18 G729/8000\",\"sip_server\":\"OpenSIPS (2.4.11 (x86_64/linux))\",\"sip_responder_sdp_content\":\"\"}";
+
+
+ JSONObject jsonObject1 = JSONObject.parseObject(a);
+ JSONObject jsonObject2 = JSONObject.parseObject(b);
+ System.out.println(jsonObject1.size());
+ System.out.println(jsonObject2.size());
+
+
+ VoipSip voipSip1 = JSONObject.parseObject(a, VoipSip.class, JSONReader.Feature.IgnoreSetNullValue);
+ VoipSip voipSip2 = JSONObject.parseObject(b, VoipSip.class, JSONReader.Feature.IgnoreSetNullValue);
+ }
+}
diff --git a/src/test/java/com/zdjizhi/json/VoipSip.java b/src/test/java/com/zdjizhi/json/VoipSip.java
new file mode 100644
index 0000000..9054e19
--- /dev/null
+++ b/src/test/java/com/zdjizhi/json/VoipSip.java
@@ -0,0 +1,104 @@
+package com.zdjizhi.json;
+
+/**
+ * @author qidaijie
+ * @Package com.zdjizhi.json
+ * @Description:
+ * @date 2023/6/717:48
+ */
+public class VoipSip {
+ private String sip_call_id;
+ private String sip_originator_description;
+ private String sip_responder_description;
+ private String sip_user_agent;
+ private String sip_server;
+ private String sip_originator_sdp_connect_ip;
+ private int sip_originator_sdp_media_port;
+ private String sip_responder_sdp_connect_ip;
+ private int sip_responder_sdp_media_port;
+
+ public VoipSip(String sip_call_id, String sip_originator_description, String sip_responder_description, String sip_user_agent, String sip_server, String sip_originator_sdp_connect_ip, int sip_originator_sdp_media_port, String sip_responder_sdp_connect_ip, int sip_responder_sdp_media_port) {
+ this.sip_call_id = sip_call_id;
+ this.sip_originator_description = sip_originator_description;
+ this.sip_responder_description = sip_responder_description;
+ this.sip_user_agent = sip_user_agent;
+ this.sip_server = sip_server;
+ this.sip_originator_sdp_connect_ip = sip_originator_sdp_connect_ip;
+ this.sip_originator_sdp_media_port = sip_originator_sdp_media_port;
+ this.sip_responder_sdp_connect_ip = sip_responder_sdp_connect_ip;
+ this.sip_responder_sdp_media_port = sip_responder_sdp_media_port;
+ }
+
+ public String getSip_call_id() {
+ return sip_call_id;
+ }
+
+ public void setSip_call_id(String sip_call_id) {
+ this.sip_call_id = sip_call_id;
+ }
+
+ public String getSip_originator_description() {
+ return sip_originator_description;
+ }
+
+ public void setSip_originator_description(String sip_originator_description) {
+ this.sip_originator_description = sip_originator_description;
+ }
+
+ public String getSip_responder_description() {
+ return sip_responder_description;
+ }
+
+ public void setSip_responder_description(String sip_responder_description) {
+ this.sip_responder_description = sip_responder_description;
+ }
+
+ public String getSip_user_agent() {
+ return sip_user_agent;
+ }
+
+ public void setSip_user_agent(String sip_user_agent) {
+ this.sip_user_agent = sip_user_agent;
+ }
+
+ public String getSip_server() {
+ return sip_server;
+ }
+
+ public void setSip_server(String sip_server) {
+ this.sip_server = sip_server;
+ }
+
+ public String getSip_originator_sdp_connect_ip() {
+ return sip_originator_sdp_connect_ip;
+ }
+
+ public void setSip_originator_sdp_connect_ip(String sip_originator_sdp_connect_ip) {
+ this.sip_originator_sdp_connect_ip = sip_originator_sdp_connect_ip;
+ }
+
+ public int getSip_originator_sdp_media_port() {
+ return sip_originator_sdp_media_port;
+ }
+
+ public void setSip_originator_sdp_media_port(int sip_originator_sdp_media_port) {
+ this.sip_originator_sdp_media_port = sip_originator_sdp_media_port;
+ }
+
+ public String getSip_responder_sdp_connect_ip() {
+ return sip_responder_sdp_connect_ip;
+ }
+
+ public void setSip_responder_sdp_connect_ip(String sip_responder_sdp_connect_ip) {
+ this.sip_responder_sdp_connect_ip = sip_responder_sdp_connect_ip;
+ }
+
+ public int getSip_responder_sdp_media_port() {
+ return sip_responder_sdp_media_port;
+ }
+
+ public void setSip_responder_sdp_media_port(int sip_responder_sdp_media_port) {
+ this.sip_responder_sdp_media_port = sip_responder_sdp_media_port;
+ }
+
+}
diff --git a/src/test/java/com/zdjizhi/nacos/NacosTest.java b/src/test/java/com/zdjizhi/nacos/NacosTest.java
deleted file mode 100644
index 52b99e5..0000000
--- a/src/test/java/com/zdjizhi/nacos/NacosTest.java
+++ /dev/null
@@ -1,100 +0,0 @@
-package com.zdjizhi.nacos;
-
-import com.alibaba.nacos.api.NacosFactory;
-import com.alibaba.nacos.api.PropertyKeyConst;
-import com.alibaba.nacos.api.config.ConfigService;
-import com.alibaba.nacos.api.config.listener.Listener;
-import com.alibaba.nacos.api.exception.NacosException;
-import org.junit.Test;
-
-import java.io.IOException;
-import java.io.StringReader;
-import java.util.Properties;
-import java.util.concurrent.Executor;
-
-
-/**
- * @author qidaijie
- * @Package com.zdjizhi
- * @Description:
- * @date 2022/3/1016:58
- */
-public class NacosTest {
-
- /**
- * <dependency>
- * <groupId>com.alibaba.nacos</groupId>
- * <artifactId>nacos-client</artifactId>
- * <version>1.2.0</version>
- * </dependency>
- */
-
- private static Properties properties = new Properties();
- /**
- * config data id = config name
- */
- private static final String DATA_ID = "test";
- /**
- * config group
- */
- private static final String GROUP = "Galaxy";
-
- private void getProperties() {
- properties.setProperty(PropertyKeyConst.SERVER_ADDR, "192.168.44.12:8848");
- properties.setProperty(PropertyKeyConst.NAMESPACE, "flink");
- properties.setProperty(PropertyKeyConst.USERNAME, "nacos");
- properties.setProperty(PropertyKeyConst.PASSWORD, "nacos");
- }
-
-
- @Test
- public void GetConfigurationTest() {
- try {
- getProperties();
- ConfigService configService = NacosFactory.createConfigService(properties);
- String content = configService.getConfig(DATA_ID, GROUP, 5000);
- Properties nacosConfigMap = new Properties();
- nacosConfigMap.load(new StringReader(content));
- System.out.println(nacosConfigMap.getProperty("source.kafka.servers"));
- } catch (NacosException | IOException e) {
- // TODO Auto-generated catch block
- e.printStackTrace();
- }
-
- }
-
- @Test
- public void ListenerConfigurationTest() {
- getProperties();
- try {
- //first get config
- ConfigService configService = NacosFactory.createConfigService(properties);
- String config = configService.getConfig(DATA_ID, GROUP, 5000);
- System.out.println(config);
-
- //start listenner
- configService.addListener(DATA_ID, GROUP, new Listener() {
- @Override
- public Executor getExecutor() {
- return null;
- }
-
- @Override
- public void receiveConfigInfo(String configMsg) {
- System.out.println(configMsg);
- }
- });
- } catch (NacosException e) {
- e.printStackTrace();
- }
-
- //keep running,change nacos config,print new config
- while (true) {
- try {
- Thread.sleep(5000);
- } catch (InterruptedException e) {
- e.printStackTrace();
- }
- }
- }
-}
diff --git a/src/test/testdata/oneRTPtwoSIP b/src/test/testdata/一条RTP和两条SIP
index 994a4bd..994a4bd 100644
--- a/src/test/testdata/oneRTPtwoSIP
+++ b/src/test/testdata/一条RTP和两条SIP
diff --git a/src/test/testdata/oneRTPUniFlowSIP b/src/test/testdata/一条RTP和两条单向流SIP
index 8aa4f24..8aa4f24 100644
--- a/src/test/testdata/oneRTPUniFlowSIP
+++ b/src/test/testdata/一条RTP和两条单向流SIP
diff --git a/src/test/testdata/oneSIPtwoRTP b/src/test/testdata/一条SIP两条RTP
index fea59fd..fea59fd 100644
--- a/src/test/testdata/oneSIPtwoRTP
+++ b/src/test/testdata/一条SIP两条RTP
diff --git a/src/test/testdata/oneSIPUniFlowRTP b/src/test/testdata/一条SIP和两条单向流RTP
index c2422a7..c2422a7 100644
--- a/src/test/testdata/oneSIPUniFlowRTP
+++ b/src/test/testdata/一条SIP和两条单向流RTP
diff --git a/src/test/testdata/differentVSYS b/src/test/testdata/两条VSYS不同的RTP和一条SIP
index 75ff819..75ff819 100644
--- a/src/test/testdata/differentVSYS
+++ b/src/test/testdata/两条VSYS不同的RTP和一条SIP
diff --git a/src/test/testdata/twoRTPandSIP b/src/test/testdata/各两条SIP和RTP
index 7d05757..7d05757 100644
--- a/src/test/testdata/twoRTPandSIP
+++ b/src/test/testdata/各两条SIP和RTP
diff --git a/src/test/testdata/differentVSYSUniFlowRTP b/src/test/testdata/相同VSYS但RTP为单向流
index 8d3f06f..8d3f06f 100644
--- a/src/test/testdata/differentVSYSUniFlowRTP
+++ b/src/test/testdata/相同VSYS但RTP为单向流