author    qidaijie <[email protected]>  2022-04-19 14:16:38 +0800
committer qidaijie <[email protected]>  2022-04-19 14:16:38 +0800
commit    3df5d8c51ed11919d37c4e71c48b344344104dc4 (patch)
tree      3963f348ab19a7eed07c5faee53b9537d306b717
parent    2b16bd7fcedc27ffc3b7b040ce6239fff0740b74 (diff)
Integrate dynamic schema retrieval via Nacos
-rw-r--r--  pom.xml  18
-rw-r--r--  properties/default_config.properties  17
-rw-r--r--  properties/service_flow_config.properties  25
-rw-r--r--  src/main/java/com/zdjizhi/common/VoipRelationConfig.java  15
-rw-r--r--  src/main/java/com/zdjizhi/utils/functions/OneSidedWindowFunction.java  256
-rw-r--r--  src/main/java/com/zdjizhi/utils/functions/SipCalibrationWindowFunction.java  147
-rw-r--r--  src/main/java/com/zdjizhi/utils/functions/relationUtils.java  81
-rw-r--r--  src/main/java/com/zdjizhi/utils/ip/IPUtils.java  101
-rw-r--r--  src/main/java/com/zdjizhi/utils/json/JsonParseUtil.java  124
-rw-r--r--  src/main/java/com/zdjizhi/utils/json/JsonTypeUtil.java (renamed from src/main/java/com/zdjizhi/utils/json/JsonTypeUtils.java)  14
-rw-r--r--  src/main/java/com/zdjizhi/utils/kafka/KafkaProducer.java  2
-rw-r--r--  src/test/java/com/zdjizhi/EncryptorTest.java  35
-rw-r--r--  src/test/java/com/zdjizhi/FunctionTest.java  40
-rw-r--r--  src/test/java/com/zdjizhi/json/JsonPathTest.java  79
-rw-r--r--  src/test/java/com/zdjizhi/nacos/NacosTest.java  100
15 files changed, 676 insertions, 378 deletions
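This commit replaces the HTTP-based schema fetch (schema.http) with a schema pulled from Nacos and kept fresh through a config listener, wired up in a static initializer in JsonParseUtil. The snippet below is a minimal, self-contained sketch of that pattern for readers unfamiliar with the nacos-client API; the class name is illustrative only, and the server address, namespace, credentials, data id and group simply mirror the sample values used elsewhere in this commit, not production settings.

    import com.alibaba.nacos.api.NacosFactory;
    import com.alibaba.nacos.api.PropertyKeyConst;
    import com.alibaba.nacos.api.config.ConfigService;
    import com.alibaba.nacos.api.config.listener.Listener;
    import com.alibaba.nacos.api.exception.NacosException;

    import java.util.Properties;
    import java.util.concurrent.Executor;

    public class NacosSchemaLoaderSketch {

        /** Latest schema text; replaced whenever Nacos pushes a new version. */
        private static volatile String schema;

        public static void main(String[] args) throws NacosException {
            Properties props = new Properties();
            // Connection settings; the real values come from service_flow_config.properties / default_config.properties.
            props.setProperty(PropertyKeyConst.SERVER_ADDR, "192.168.44.12:8848");
            props.setProperty(PropertyKeyConst.NAMESPACE, "flink");
            props.setProperty(PropertyKeyConst.USERNAME, "nacos");
            props.setProperty(PropertyKeyConst.PASSWORD, "nacos");

            ConfigService configService = NacosFactory.createConfigService(props);

            // Initial fetch with a 5000 ms timeout, as in the static block added to JsonParseUtil.
            schema = configService.getConfig("voip_record.json", "Galaxy", 5000);

            // Re-read the schema whenever it changes in Nacos.
            configService.addListener("voip_record.json", "Galaxy", new Listener() {
                @Override
                public Executor getExecutor() {
                    // Returning null lets the client deliver notifications on its own thread.
                    return null;
                }

                @Override
                public void receiveConfigInfo(String configInfo) {
                    schema = configInfo;
                }
            });
        }
    }

JsonParseUtil then parses the schema JSON into a field-name-to-type map (getFieldsFromSchema) and uses it in typeTransform to coerce the fields of each incoming log record.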
diff --git a/pom.xml b/pom.xml
index 003d904..e063681 100644
--- a/pom.xml
+++ b/pom.xml
@@ -6,7 +6,7 @@
<groupId>com.zdjizhi</groupId>
<artifactId>log-stream-voip-relation</artifactId>
- <version>220316-encryption</version>
+ <version>220418-Nacos</version>
<name>log-stream-voip-relation</name>
<url>http://www.example.com</url>
@@ -38,6 +38,8 @@
<hadoop.version>2.7.1</hadoop.version>
<kafka.version>1.0.0</kafka.version>
<hbase.version>2.2.3</hbase.version>
+ <nacos.version>1.2.0</nacos.version>
+ <zdjz.tools.version>1.0.8</zdjz.tools.version>
<scope.type>provided</scope.type>
<!--<scope.type>compile</scope.type>-->
</properties>
@@ -113,16 +115,11 @@
<dependencies>
- <dependency>
- <groupId>com.alibaba</groupId>
- <artifactId>fastjson</artifactId>
- <version>1.2.70</version>
- </dependency>
<dependency>
<groupId>com.zdjizhi</groupId>
<artifactId>galaxy</artifactId>
- <version>1.0.8</version>
+ <version>${zdjz.tools.version}</version>
<exclusions>
<exclusion>
<artifactId>slf4j-log4j12</artifactId>
@@ -236,6 +233,13 @@
<scope>test</scope>
</dependency>
+ <!-- https://mvnrepository.com/artifact/com.alibaba.nacos/nacos-client -->
+ <dependency>
+ <groupId>com.alibaba.nacos</groupId>
+ <artifactId>nacos-client</artifactId>
+ <version>${nacos.version}</version>
+ </dependency>
+
<!-- https://mvnrepository.com/artifact/org.jasypt/jasypt -->
<dependency>
<groupId>org.jasypt</groupId>
diff --git a/properties/default_config.properties b/properties/default_config.properties
index 158577b..da4adca 100644
--- a/properties/default_config.properties
+++ b/properties/default_config.properties
@@ -27,14 +27,29 @@ buffer.memory=134217728
#Maximum size of a single request sent to the Kafka broker; default 1048576
#10M
max.request.size=10485760
+
+#Producer compression mode: none or snappy
+producer.kafka.compression.type=none
+
+#Producer acks
+producer.ack=1
#====================kafka default====================#
#Kafka SASL username (encrypted)
kafka.user=nsyGpHKGFA4KW0zro9MDdw==
#Kafka SASL/SSL password (encrypted)
kafka.pin=6MleDyA3Z73HSaXiKsDJ2k7Ys8YWLhEJ
+#====================nacos default====================#
+#nacos username
+nacos.username=nacos
+
+#nacos password
+nacos.pin=nacos
+
+#nacos group
+nacos.group=Galaxy
#====================Topology Default====================#
#check ip is Inner network;0 off, 1 on.
-check.inner.network=1
+check.inner.network=0
diff --git a/properties/service_flow_config.properties b/properties/service_flow_config.properties
index 85d3d17..f875d95 100644
--- a/properties/service_flow_config.properties
+++ b/properties/service_flow_config.properties
@@ -10,11 +10,17 @@ sink.kafka.servers=192.168.44.12:9094
#Path to the IP location databases
tools.library=D:\\workerspace\\dat\\
-#Gateway schema URL
-schema.http=http://192.168.44.67:9999/metadata/schema/v1/fields/voip_record
+#--------------------------------Nacos configuration------------------------------#
+#Nacos server address
+nacos.server=192.168.44.12:8848
-#--------------------------------Kafka consumer group info------------------------------#
+#nacos namespace
+nacos.schema.namespace=flink
+
+#nacos data id
+nacos.data.id=voip_record.json
+#--------------------------------Kafka consumer group info------------------------------#
#Kafka topic to consume data from
source.kafka.topic=test
@@ -24,22 +30,15 @@ sink.kafka.topic=test-result
#Consumer group for reading the topic; stores this spout id's consumption offsets and can be named after the topology. The stored offset position ensures the next read does not consume duplicate data.
group.id=mytest-1
-#Producer compression mode: none or snappy
-producer.kafka.compression.type=none
-
-#Producer acks
-producer.ack=1
-
#--------------------------------Topology configuration------------------------------#
-
#Map function parallelism
window.parallelism=1
#VoIP log alignment window time, in seconds
-voip.calibration.window.time=30
+voip.calibration.window.time=60
#One-sided flow alignment window time, in seconds
-one.sided.window.time=5
+one.sided.window.time=10
#VoIP secondary alignment time, in seconds
-sec.combine.sr.cache.secs=180 \ No newline at end of file
+sec.combine.sr.cache.secs=120 \ No newline at end of file
diff --git a/src/main/java/com/zdjizhi/common/VoipRelationConfig.java b/src/main/java/com/zdjizhi/common/VoipRelationConfig.java
index 41884b9..61c50a1 100644
--- a/src/main/java/com/zdjizhi/common/VoipRelationConfig.java
+++ b/src/main/java/com/zdjizhi/common/VoipRelationConfig.java
@@ -23,6 +23,17 @@ public class VoipRelationConfig {
public static final String VISIBILITY = "disabled";
public static final String FORMAT_SPLITTER = ",";
+
+ /**
+ * Nacos
+ */
+ public static final String NACOS_SERVER = VoipRelationConfigurations.getStringProperty(0, "nacos.server");
+ public static final String NACOS_SCHEMA_NAMESPACE = VoipRelationConfigurations.getStringProperty(0, "nacos.schema.namespace");
+ public static final String NACOS_DATA_ID = VoipRelationConfigurations.getStringProperty(0, "nacos.data.id");
+ public static final String NACOS_PIN = VoipRelationConfigurations.getStringProperty(1, "nacos.pin");
+ public static final String NACOS_GROUP = VoipRelationConfigurations.getStringProperty(1, "nacos.group");
+ public static final String NACOS_USERNAME = VoipRelationConfigurations.getStringProperty(1, "nacos.username");
+
/**
* System
*/
@@ -38,8 +49,8 @@ public class VoipRelationConfig {
public static final String SOURCE_KAFKA_TOPIC = VoipRelationConfigurations.getStringProperty(0, "source.kafka.topic");
public static final String SINK_KAFKA_TOPIC = VoipRelationConfigurations.getStringProperty(0, "sink.kafka.topic");
public static final String GROUP_ID = VoipRelationConfigurations.getStringProperty(0, "group.id");
- public static final String PRODUCER_ACK = VoipRelationConfigurations.getStringProperty(0, "producer.ack");
- public static final String PRODUCER_KAFKA_COMPRESSION_TYPE = VoipRelationConfigurations.getStringProperty(0, "producer.kafka.compression.type");
+ public static final String PRODUCER_ACK = VoipRelationConfigurations.getStringProperty(1, "producer.ack");
+ public static final String PRODUCER_KAFKA_COMPRESSION_TYPE = VoipRelationConfigurations.getStringProperty(1, "producer.kafka.compression.type");
public static final String KAFKA_SASL_JAAS_USER = encryptor.decrypt(VoipRelationConfigurations.getStringProperty(1, "kafka.user"));
public static final String KAFKA_SASL_JAAS_PIN = encryptor.decrypt(VoipRelationConfigurations.getStringProperty(1, "kafka.pin"));
diff --git a/src/main/java/com/zdjizhi/utils/functions/OneSidedWindowFunction.java b/src/main/java/com/zdjizhi/utils/functions/OneSidedWindowFunction.java
index 061fa12..0e244c3 100644
--- a/src/main/java/com/zdjizhi/utils/functions/OneSidedWindowFunction.java
+++ b/src/main/java/com/zdjizhi/utils/functions/OneSidedWindowFunction.java
@@ -2,11 +2,11 @@ package com.zdjizhi.utils.functions;
import cn.hutool.log.Log;
import cn.hutool.log.LogFactory;
-import com.alibaba.fastjson.JSONObject;
import com.zdjizhi.common.JsonProConfig;
import com.zdjizhi.common.VoipRelationConfig;
+import com.zdjizhi.utils.IPUtil;
+import com.zdjizhi.utils.JsonMapper;
import com.zdjizhi.utils.StringUtil;
-import com.zdjizhi.utils.ip.IPUtils;
import com.zdjizhi.utils.json.JsonParseUtil;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.streaming.api.functions.windowing.ProcessAllWindowFunction;
@@ -27,67 +27,72 @@ public class OneSidedWindowFunction extends ProcessAllWindowFunction<String, Tup
/**
* key: sip_call_id; value: the SIP record --- holds SIP data not yet correlated (one-sided flows)
*/
- private static HashMap<String, String> sipOriHmList = new HashMap<>(16);
+ private static HashMap<String, String> sipOriHmList = new HashMap<>(32);
/**
* key: the RTP four-tuple key; value: the RTP record --- holds RTP data not yet correlated (one-sided flows)
*/
- private static HashMap<String, String> rtpOriHmList = new HashMap<>(16);
+ private static HashMap<String, String> rtpOriHmList = new HashMap<>(32);
@Override
@SuppressWarnings("unchecked")
- public void process(Context context, Iterable<String> inputs, Collector<Tuple3<String, String, String>> out) throws Exception {
-
+ public void process(Context context, Iterable<String> inputs, Collector<Tuple3<String, String, String>> out) {
for (String input : inputs) {
if (StringUtil.isNotBlank(input)) {
- JSONObject object = JSONObject.parseObject(input);
- String commonSchemaType = object.getString(JsonProConfig.SCHEMA_TYPE);
- String sipCallId = object.getString(JsonProConfig.SIP_CALL_ID);
-
- //1:c2s,2:s2c;3;double
- int commonStreamDir = object.getInteger(JsonProConfig.STREAM_DIR);
-
- /*
- * Handle SIP logs
- */
- if (JsonProConfig.SIP_MARK.equals(commonSchemaType) && StringUtil.isNotBlank(sipCallId)) {
- if (checkSipCompleteness(object)) {
- if (commonStreamDir != JsonProConfig.DOUBLE) {
- putKeyAndMsg(input, sipCallId, sipOriHmList, "SIP", out);
+ try {
+ Map<String, Object> object = JsonParseUtil.typeTransform((Map<String, Object>) JsonMapper.fromJsonString(input, Map.class));
+
+ String commonSchemaType = JsonParseUtil.getString(object, JsonProConfig.SCHEMA_TYPE);
+ String sipCallId = JsonParseUtil.getString(object, JsonProConfig.SIP_CALL_ID);
+
+ //1:c2s, 2:s2c, 3:double
+ int commonStreamDir = JsonParseUtil.getInteger(object, JsonProConfig.STREAM_DIR);
+
+ /*
+ * Handle SIP logs
+ */
+ if (JsonProConfig.SIP_MARK.equals(commonSchemaType) && StringUtil.isNotBlank(sipCallId)) {
+ if (relationUtils.checkSipCompleteness(object)) {
+ if (commonStreamDir != JsonProConfig.DOUBLE) {
+ putKeyAndMsg(input, sipCallId, sipOriHmList, "SIP", out);
+ } else {
+ separateInnerIp(object, out);
+ }
} else {
- separateInnerIp(object, out);
+ out.collect(new Tuple3<>("", "violation", input));
}
- } else {
- out.collect(new Tuple3<>("", "violation", input));
}
- }
- /*
- * Handle RTP logs
- */
- if (JsonProConfig.RTP_MARK.equals(commonSchemaType)) {
+ /*
+ * Handle RTP logs
+ */
+ if (JsonProConfig.RTP_MARK.equals(commonSchemaType)) {
+ String clientIp = JsonParseUtil.getString(object, JsonProConfig.CLIENT_IP);
+ int clientPort = JsonParseUtil.getInteger(object, JsonProConfig.CLIENT_PORT);
+ String serverIp = JsonParseUtil.getString(object, JsonProConfig.SERVER_IP);
+ int serverPort = JsonParseUtil.getInteger(object, JsonProConfig.SERVER_PORT);
- String rtpIpPort4Key = getFourKey(object.getString(JsonProConfig.CLIENT_IP),
- object.getInteger(JsonProConfig.CLIENT_PORT),
- object.getString(JsonProConfig.SERVER_IP),
- object.getInteger(JsonProConfig.SERVER_PORT));
+ String rtpIpPort4Key = getFourKey(clientIp, clientPort, serverIp, serverPort);
- if (commonStreamDir != JsonProConfig.DOUBLE) {
- //对rtp单向流进行关联
- putKeyAndMsg(input, rtpIpPort4Key, rtpOriHmList, "RTP", out);
+ if (commonStreamDir != JsonProConfig.DOUBLE) {
+ //对rtp单向流进行关联
+ putKeyAndMsg(input, rtpIpPort4Key, rtpOriHmList, "RTP", out);
- } else {
- //RTP双向流,按四元组下发
- out.collect(new Tuple3<>(rtpIpPort4Key, "rtp-two", input));
+ } else {
+ //RTP双向流,按四元组下发
+ out.collect(new Tuple3<>(rtpIpPort4Key, "rtp-two", input));
+ }
}
+ } catch (RuntimeException e) {
+ logger.error("Parsing JSON or merging one-sided data flows failed! error: " + e);
}
}
}
/*
- * Periodically emit SIP or RTP data that was not correlated
+ * Periodically emit SIP data that was not correlated
*/
if (sipOriHmList.size() > 0) {
- HashMap<String, String> tmpSipOriHmList = new HashMap<String, String>(sipOriHmList);
+ HashMap<String, String> tmpSipOriHmList = new HashMap<>(sipOriHmList);
sipOriHmList.clear();
for (String sipKey : tmpSipOriHmList.keySet()) {
String sipSingleMsg = tmpSipOriHmList.get(sipKey);
@@ -95,9 +100,11 @@ public class OneSidedWindowFunction extends ProcessAllWindowFunction<String, Tup
out.collect(new Tuple3<>(sipKey, "sip-single", sipSingleMsg));
}
}
-
+ /*
+ * Periodically emit RTP data that was not correlated
+ */
if (rtpOriHmList.size() > 0) {
- HashMap<String, String> tmpRtpOriHmList = new HashMap<String, String>(rtpOriHmList);
+ HashMap<String, String> tmpRtpOriHmList = new HashMap<>(rtpOriHmList);
rtpOriHmList.clear();
for (String rtpKey : tmpRtpOriHmList.keySet()) {
String rtpSingleMsg = tmpRtpOriHmList.get(rtpKey);
@@ -110,41 +117,40 @@ public class OneSidedWindowFunction extends ProcessAllWindowFunction<String, Tup
/**
* Store the key, or correlate and merge with the record already stored under the same key
*/
+ @SuppressWarnings("unchecked")
private static void putKeyAndMsg(String message, String hmStrKey, HashMap<String, String> hashMapStr, String protocolType, Collector<Tuple3<String, String, String>> out) {
//Correlate with the previously stored record
if (hashMapStr.containsKey(hmStrKey)) {
-
- JSONObject jsonCombinObject = new JSONObject();
+ HashMap<String, Object> jsonCommonMap = new HashMap<>(32);
String[] strArr = new String[2];
String firstMsg = hashMapStr.remove(hmStrKey);
-
- JSONObject firstSipOrRtpLog = JSONObject.parseObject(firstMsg);
- JSONObject secendSipOrRtpLog = JSONObject.parseObject(message);
+ Map<String, Object> firstSipOrRtpLog = (Map<String, Object>) JsonMapper.fromJsonString(firstMsg, Map.class);
+ Map<String, Object> secondSipOrRtpLog = (Map<String, Object>) JsonMapper.fromJsonString(message, Map.class);
//1:c2s, 2:s2c, 3:double; 1 means firstMsg is the request side (c2s) and takes precedence when merging
- if (firstSipOrRtpLog.getInteger(JsonProConfig.STREAM_DIR) == 1) {
+ if (JsonParseUtil.getInteger(firstSipOrRtpLog, JsonProConfig.STREAM_DIR) == 1) {
strArr[0] = message;
strArr[1] = firstMsg;
} else {
strArr[0] = firstMsg;
strArr[1] = message;
}
- jsonCombinObject.putAll(JSONObject.parseObject(strArr[0]));
- jsonCombinObject.putAll(JSONObject.parseObject(strArr[1]));
- String sipTwoMsg = jsonCombinObject.toString();
+ jsonCommonMap.putAll((Map<String, Object>) JsonMapper.fromJsonString(strArr[0], Map.class));
+ jsonCommonMap.putAll((Map<String, Object>) JsonMapper.fromJsonString(strArr[1], Map.class));
+ String sipTwoMsg = JsonMapper.toJsonString(jsonCommonMap);
- JSONObject sipOrRtpCombin = JSONObject.parseObject(sipTwoMsg);
- accumulateMsg(firstSipOrRtpLog, secendSipOrRtpLog, sipOrRtpCombin);
+ Map<String, Object> sipOrRtpCombin = (Map<String, Object>) JsonMapper.fromJsonString(sipTwoMsg, Map.class);
+ accumulateMsg(firstSipOrRtpLog, secondSipOrRtpLog, sipOrRtpCombin);
sipOrRtpCombin.put(JsonProConfig.STREAM_DIR, JsonProConfig.DOUBLE);
if (JsonProConfig.SIP_MARK.equals(protocolType)) {
//After manually correlating SIP, separate internal/external IPs before emitting
separateInnerIp(sipOrRtpCombin, out);
} else if (JsonProConfig.RTP_MARK.equals(protocolType)) {
//After manually correlating RTP, emit keyed by the four-tuple
- sipOrRtpCombin.put(JsonProConfig.RTP_PCAP_PATH, setRtpPath(firstSipOrRtpLog, secendSipOrRtpLog));
- out.collect(new Tuple3<>(hmStrKey, "rtp-two", JSONObject.toJSONString(sipOrRtpCombin)));
+ sipOrRtpCombin.put(JsonProConfig.RTP_PCAP_PATH, setRtpPcapPath(firstSipOrRtpLog, secondSipOrRtpLog));
+ out.collect(new Tuple3<>(hmStrKey, "rtp-two", JsonMapper.toJsonString(sipOrRtpCombin)));
}
} else {
hashMapStr.put(hmStrKey, message);
@@ -154,16 +160,15 @@ public class OneSidedWindowFunction extends ProcessAllWindowFunction<String, Tup
/**
* Separate internal and external IPs for SIP; at this point correlation is complete and the four-tuple is present, but internal/external IPs have not yet been distinguished
*/
- private static void separateInnerIp(JSONObject object, Collector<Tuple3<String, String, String>> out) {
+ private static void separateInnerIp(Map<String, Object> object, Collector<Tuple3<String, String, String>> out) {
- String sipOriginatorIp = object.getString(JsonProConfig.SIP_ORIGINATOR_IP);
- String sipResponderIp = object.getString(JsonProConfig.SIP_RESPONDER_IP);
- int sipOriginatorPort = object.getInteger(JsonProConfig.SIP_ORIGINATOR_PORT);
- int sipResponderPort = object.getInteger(JsonProConfig.SIP_RESPONDER_PORT);
+ String sipOriginatorIp = JsonParseUtil.getString(object, JsonProConfig.SIP_ORIGINATOR_IP);
+ int sipOriginatorPort = JsonParseUtil.getInteger(object, JsonProConfig.SIP_ORIGINATOR_PORT);
+ String sipResponderIp = JsonParseUtil.getString(object, JsonProConfig.SIP_RESPONDER_IP);
+ int sipResponderPort = JsonParseUtil.getInteger(object, JsonProConfig.SIP_RESPONDER_PORT);
- if (IPUtils.isInnerIp(sipOriginatorIp)
- || IPUtils.isInnerIp(sipResponderIp)) {
- /**
+ if (relationUtils.isInnerIp(sipOriginatorIp) || relationUtils.isInnerIp(sipResponderIp)) {
+ /*
* Key format: from-ip_from-port_to-ip_to-port
*/
String sipInnerEmitKey = sipOriginatorIp + VoipRelationConfig.CORRELATION_STR
@@ -171,7 +176,7 @@ public class OneSidedWindowFunction extends ProcessAllWindowFunction<String, Tup
+ sipResponderIp + VoipRelationConfig.CORRELATION_STR
+ sipResponderPort;
//Correlated SIP data that involves an internal IP
- out.collect(new Tuple3<>(sipInnerEmitKey, "sip-in", JSONObject.toJSONString(object)));
+ out.collect(new Tuple3<>(sipInnerEmitKey, "sip-in", JsonMapper.toJsonString(object)));
} else {
String sipIpPort4Key = getFourKey(sipOriginatorIp,
sipOriginatorPort,
@@ -179,7 +184,7 @@ public class OneSidedWindowFunction extends ProcessAllWindowFunction<String, Tup
sipResponderPort);
//Send to the next bolt keyed by the four-tuple
- out.collect(new Tuple3<>(sipIpPort4Key, "sip-two", JSONObject.toJSONString(object)));
+ out.collect(new Tuple3<>(sipIpPort4Key, "sip-two", JsonMapper.toJsonString(object)));
}
}
@@ -216,7 +221,7 @@ public class OneSidedWindowFunction extends ProcessAllWindowFunction<String, Tup
break;
//common_client_port == commonServerPort, fall back to comparing IPs
case 0:
- ipPort4Key = compareIp(commonClientIp, commonServerIp, commonClientPort, commonServerPort);
+ ipPort4Key = compareQuadruple(commonClientIp, commonServerIp, commonClientPort, commonServerPort);
break;
//Invalid port value
case -2:
@@ -231,93 +236,83 @@ public class OneSidedWindowFunction extends ProcessAllWindowFunction<String, Tup
}
/**
- * Compare IPs and build the concatenated key
+ * Compare the four-tuple and concatenate it into a key
*
- * @param commonClientIp
- * @param commonServerIp
- * @param commonClientPort
- * @param commonServerPort
- * @return
+ * @param clientIp client IP
+ * @param serverIp server IP
+ * @param clientPort client port
+ * @param serverPort server port
+ * @return the concatenated four-tuple key after comparison, or an empty string on invalid input
*/
- private static String compareIp(String commonClientIp, String commonServerIp, int commonClientPort, int commonServerPort) {
- long clientIpNum = IPUtils.ipToLong(commonClientIp);
- long serverIpNum = IPUtils.ipToLong(commonServerIp);
+ private static String compareQuadruple(String clientIp, String serverIp, int clientPort, int serverPort) {
+ long clientIpNum = IPUtil.getIpHostDesimal(clientIp);
+ long serverIpNum = IPUtil.getIpHostDesimal(serverIp);
int compareIpResult = compareNum(clientIpNum, serverIpNum);
switch (compareIpResult) {
//clientIpNum > serverIpNum
case 1:
- return commonServerIp + VoipRelationConfig.CORRELATION_STR
- + commonServerPort + VoipRelationConfig.CORRELATION_STR
- + commonClientIp + VoipRelationConfig.CORRELATION_STR
- + commonClientPort;
+ return serverIp + VoipRelationConfig.CORRELATION_STR
+ + serverPort + VoipRelationConfig.CORRELATION_STR
+ + clientIp + VoipRelationConfig.CORRELATION_STR
+ + clientPort;
//clientIpNum < serverIpNum
case -1:
- return commonClientIp + VoipRelationConfig.CORRELATION_STR
- + commonClientPort + VoipRelationConfig.CORRELATION_STR
- + commonServerIp + VoipRelationConfig.CORRELATION_STR
- + commonServerPort;
+ return clientIp + VoipRelationConfig.CORRELATION_STR
+ + clientPort + VoipRelationConfig.CORRELATION_STR
+ + serverIp + VoipRelationConfig.CORRELATION_STR
+ + serverPort;
//clientIpNum == serverIpNum: the two IPs are identical, i.e. abnormal
case 0:
//Invalid IP value
case -2:
default:
logger.error("compareNum is error," +
- "common_client_ip:" + commonClientIp + "," +
- "commonServerIp:" + commonServerIp + "," +
- "commonClientPort:" + commonClientPort + "," +
- "commonServerPort:" + commonServerPort);
+ "common_client_ip:" + clientIp + "," +
+ "commonServerIp:" + serverIp + "," +
+ "commonClientPort:" + clientPort + "," +
+ "commonServerPort:" + serverPort);
return "";
}
}
/**
- * Compute the related byte fields, mainly by accumulation
+ * Aggregate the metric fields of SIP one-sided flow logs.
*
- * @param firstSipOrRtpLog
- * @param secendSipOrRtpLog
- * @param sipOrRtpCombin
+ * @param firstSipOrRtpLog the first one-sided flow log
+ * @param secondSipOrRtpLog the second one-sided flow log
+ * @param sipOrRtpCombin the merged SIP bidirectional flow log
*/
- private static void accumulateMsg(JSONObject firstSipOrRtpLog, JSONObject secendSipOrRtpLog, JSONObject sipOrRtpCombin) {
+ private static void accumulateMsg(Map<String, Object> firstSipOrRtpLog, Map<String, Object> secondSipOrRtpLog, Map<String, Object> sipOrRtpCombin) {
//common_sessions
- sipOrRtpCombin.put(JsonProConfig.SESSIONS, (firstSipOrRtpLog.getLongValue(JsonProConfig.SESSIONS) + secendSipOrRtpLog.getLongValue(JsonProConfig.SESSIONS)));
-
+ relationUtils.metricSumSetOtherLog(firstSipOrRtpLog, secondSipOrRtpLog, sipOrRtpCombin, JsonProConfig.SESSIONS);
//common_c2s_pkt_num
- sipOrRtpCombin.put(JsonProConfig.C2S_PKT_NUM, (firstSipOrRtpLog.getLongValue(JsonProConfig.C2S_PKT_NUM) + secendSipOrRtpLog.getLongValue(JsonProConfig.C2S_PKT_NUM)));
-
+ relationUtils.metricSumSetOtherLog(firstSipOrRtpLog, secondSipOrRtpLog, sipOrRtpCombin, JsonProConfig.C2S_PKT_NUM);
//common_s2c_pkt_num
- sipOrRtpCombin.put(JsonProConfig.S2C_PKT_NUM, (firstSipOrRtpLog.getLongValue(JsonProConfig.S2C_PKT_NUM) + secendSipOrRtpLog.getLongValue(JsonProConfig.S2C_PKT_NUM)));
-
+ relationUtils.metricSumSetOtherLog(firstSipOrRtpLog, secondSipOrRtpLog, sipOrRtpCombin, JsonProConfig.S2C_PKT_NUM);
//common_c2s_byte_num
- sipOrRtpCombin.put(JsonProConfig.C2S_BYTE_NUM, (firstSipOrRtpLog.getLongValue(JsonProConfig.C2S_BYTE_NUM) + secendSipOrRtpLog.getLongValue(JsonProConfig.C2S_BYTE_NUM)));
-
+ relationUtils.metricSumSetOtherLog(firstSipOrRtpLog, secondSipOrRtpLog, sipOrRtpCombin, JsonProConfig.C2S_BYTE_NUM);
//common_s2c_byte_num
- sipOrRtpCombin.put(JsonProConfig.S2C_BYTE_NUM, (firstSipOrRtpLog.getLongValue(JsonProConfig.S2C_BYTE_NUM) + secendSipOrRtpLog.getLongValue(JsonProConfig.S2C_BYTE_NUM)));
-
+ relationUtils.metricSumSetOtherLog(firstSipOrRtpLog, secondSipOrRtpLog, sipOrRtpCombin, JsonProConfig.S2C_BYTE_NUM);
//common_c2s_ipfrag_num
- sipOrRtpCombin.put(JsonProConfig.C2S_IPFRAG_NUM, (firstSipOrRtpLog.getLongValue(JsonProConfig.C2S_IPFRAG_NUM) + secendSipOrRtpLog.getLongValue(JsonProConfig.C2S_IPFRAG_NUM)));
-
+ relationUtils.metricSumSetOtherLog(firstSipOrRtpLog, secondSipOrRtpLog, sipOrRtpCombin, JsonProConfig.C2S_IPFRAG_NUM);
//common_s2c_ipfrag_num
- sipOrRtpCombin.put(JsonProConfig.S2C_IPFRAG_NUM, (firstSipOrRtpLog.getLongValue(JsonProConfig.S2C_IPFRAG_NUM) + secendSipOrRtpLog.getLongValue(JsonProConfig.S2C_IPFRAG_NUM)));
-
+ relationUtils.metricSumSetOtherLog(firstSipOrRtpLog, secondSipOrRtpLog, sipOrRtpCombin, JsonProConfig.S2C_IPFRAG_NUM);
//common_c2s_tcp_lostlen
- sipOrRtpCombin.put(JsonProConfig.C2S_TCP_LOSTLEN, (firstSipOrRtpLog.getLongValue(JsonProConfig.C2S_TCP_LOSTLEN) + secendSipOrRtpLog.getLongValue(JsonProConfig.C2S_TCP_LOSTLEN)));
-
+ relationUtils.metricSumSetOtherLog(firstSipOrRtpLog, secondSipOrRtpLog, sipOrRtpCombin, JsonProConfig.C2S_TCP_LOSTLEN);
//common_s2c_tcp_lostlen
- sipOrRtpCombin.put(JsonProConfig.S2C_TCP_LOSTLEN, (firstSipOrRtpLog.getLongValue(JsonProConfig.S2C_TCP_LOSTLEN) + secendSipOrRtpLog.getLongValue(JsonProConfig.S2C_TCP_LOSTLEN)));
-
+ relationUtils.metricSumSetOtherLog(firstSipOrRtpLog, secondSipOrRtpLog, sipOrRtpCombin, JsonProConfig.S2C_TCP_LOSTLEN);
//common_c2s_tcp_unorder_num
- sipOrRtpCombin.put(JsonProConfig.C2S_TCP_UNORDER_NUM, (firstSipOrRtpLog.getLongValue(JsonProConfig.C2S_TCP_UNORDER_NUM) + secendSipOrRtpLog.getLongValue(JsonProConfig.C2S_TCP_UNORDER_NUM)));
-
+ relationUtils.metricSumSetOtherLog(firstSipOrRtpLog, secondSipOrRtpLog, sipOrRtpCombin, JsonProConfig.C2S_TCP_UNORDER_NUM);
//common_s2c_tcp_unorder_num
- sipOrRtpCombin.put(JsonProConfig.S2C_TCP_UNORDER_NUM, (firstSipOrRtpLog.getLongValue(JsonProConfig.S2C_TCP_UNORDER_NUM) + secendSipOrRtpLog.getLongValue(JsonProConfig.S2C_TCP_UNORDER_NUM)));
+ relationUtils.metricSumSetOtherLog(firstSipOrRtpLog, secondSipOrRtpLog, sipOrRtpCombin, JsonProConfig.S2C_TCP_UNORDER_NUM);
}
/**
* int version
* Compare two numbers: returns 1 if left > right, -1 if left < right, 0 if equal
*
- * @param numOne
- * @param numTwo
+ * @param numOne first value
+ * @param numTwo second value
*/
private static int compareNum(int numOne, int numTwo) {
if (numOne > 0 && numTwo > 0) {
@@ -331,8 +326,8 @@ public class OneSidedWindowFunction extends ProcessAllWindowFunction<String, Tup
* long version
* Compare two numbers: returns 1 if left > right, -1 if left < right, 0 if equal
*
- * @param numOne
- * @param numTwo
+ * @param numOne first value
+ * @param numTwo second value
*/
private static int compareNum(long numOne, long numTwo) {
if (numOne > 0 && numTwo > 0) {
@@ -349,28 +344,21 @@ public class OneSidedWindowFunction extends ProcessAllWindowFunction<String, Tup
* @param secondSipOrRtpLog the second one-sided flow log
* @return pcap file path
*/
- private static String setRtpPath(Map<String, Object> firstSipOrRtpLog, Map<String, Object> secendSipOrRtpLog) {
+ private static String setRtpPcapPath(Map<String, Object> firstSipOrRtpLog, Map<String, Object> secondSipOrRtpLog) {
- String firstRtpPcapPath = JsonParseUtil.getString(firstSipOrRtpLog, JsonProConfig.RTP_PCAP_PATH);
- String secendRtpPcapPath = JsonParseUtil.getString(secendSipOrRtpLog, JsonProConfig.RTP_PCAP_PATH);
+ String firstPcapPath = JsonParseUtil.getString(firstSipOrRtpLog, JsonProConfig.RTP_PCAP_PATH);
+ String secondPcapPath = JsonParseUtil.getString(secondSipOrRtpLog, JsonProConfig.RTP_PCAP_PATH);
- if (StringUtil.isNotBlank(firstRtpPcapPath) && StringUtil.isNotBlank(secendRtpPcapPath)) {
- if (firstRtpPcapPath.equals(secendRtpPcapPath)) {
- return firstRtpPcapPath;
+ if (StringUtil.isNotBlank(firstPcapPath) && StringUtil.isNotBlank(secondPcapPath)) {
+ if (firstPcapPath.equals(secondPcapPath)) {
+ return firstPcapPath;
} else {
- return firstRtpPcapPath + ";" + secendRtpPcapPath;
+ return firstPcapPath + ";" + secondPcapPath;
}
- } else if (StringUtil.isNotBlank(firstRtpPcapPath)) {
- return firstRtpPcapPath;
+ } else if (StringUtil.isNotBlank(firstPcapPath)) {
+ return firstPcapPath;
} else {
- return secendRtpPcapPath;
+ return secondPcapPath;
}
}
-
- private static boolean checkSipCompleteness(JSONObject object) {
- return object.containsKey(JsonProConfig.SIP_ORIGINATOR_IP) &&
- object.containsKey(JsonProConfig.SIP_ORIGINATOR_PORT) &&
- object.containsKey(JsonProConfig.SIP_RESPONDER_IP) &&
- object.containsKey(JsonProConfig.SIP_RESPONDER_PORT);
- }
}
diff --git a/src/main/java/com/zdjizhi/utils/functions/SipCalibrationWindowFunction.java b/src/main/java/com/zdjizhi/utils/functions/SipCalibrationWindowFunction.java
index 8b92c10..7180419 100644
--- a/src/main/java/com/zdjizhi/utils/functions/SipCalibrationWindowFunction.java
+++ b/src/main/java/com/zdjizhi/utils/functions/SipCalibrationWindowFunction.java
@@ -2,9 +2,9 @@ package com.zdjizhi.utils.functions;
import cn.hutool.log.Log;
import cn.hutool.log.LogFactory;
-import com.alibaba.fastjson.JSONObject;
import com.zdjizhi.common.JsonProConfig;
import com.zdjizhi.common.VoipRelationConfig;
+import com.zdjizhi.utils.JsonMapper;
import com.zdjizhi.utils.StringUtil;
import com.zdjizhi.utils.json.JsonParseUtil;
import org.apache.flink.api.java.tuple.Tuple3;
@@ -12,10 +12,7 @@ import org.apache.flink.streaming.api.functions.windowing.ProcessAllWindowFuncti
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.LinkedList;
-import java.util.List;
+import java.util.*;
/**
* @author qidaijie
@@ -27,16 +24,6 @@ public class SipCalibrationWindowFunction extends ProcessAllWindowFunction<Tuple
private static final Log logger = LogFactory.get();
/**
- * Map used to build the reflected entity class
- */
- private static HashMap<String, Class> classMap = JsonParseUtil.getMapFromHttp(VoipRelationConfig.SCHEMA_HTTP);
-
- /**
- * Reflected into a class instance
- */
- private static Object voipObject = JsonParseUtil.generateObject(classMap);
-
- /**
* HashMap used for correlation
* key --- the four-tuple
* value --- a List holding the corresponding SIP or RTP records
@@ -96,6 +83,7 @@ public class SipCalibrationWindowFunction extends ProcessAllWindowFunction<Tuple
*
* @param combineHmList
*/
+ @SuppressWarnings("unchecked")
private void tickCombineHmList(HashMap<String, LinkedList<String>> combineHmList, Collector<String> output) {
if (combineHmList.size() > 0) {
long nowTime = System.currentTimeMillis() / 1000;
@@ -109,12 +97,11 @@ public class SipCalibrationWindowFunction extends ProcessAllWindowFunction<Tuple
//Contains both SIP and RTP
int listSize = tempList.size();
if (listSize > 1) {
-
List<String> sipBeanArr = new ArrayList<>();
List<String> rtpBeanArr = new ArrayList<>();
for (String message : tempList) {
- Object tempSipOrRtpLog = JSONObject.parseObject(message, voipObject.getClass());
+ Map<String, Object> tempSipOrRtpLog = (Map<String, Object>) JsonMapper.fromJsonString(message, Map.class);
String schemaType = JsonParseUtil.getString(tempSipOrRtpLog, JsonProConfig.SCHEMA_TYPE);
if (JsonProConfig.SIP_MARK.equals(schemaType)) {
sipBeanArr.add(message);
@@ -127,25 +114,23 @@ public class SipCalibrationWindowFunction extends ProcessAllWindowFunction<Tuple
if (rtpSize == 1 && sipSize >= 1) {
for (String sipMessage : sipBeanArr) {
- Object rtpLog = JSONObject.parseObject(rtpBeanArr.get(0), voipObject.getClass());
- Object voipLog = JSONObject.parseObject(sipMessage, voipObject.getClass());
- accumulateVoipMsg(voipLog, rtpLog);
- JsonParseUtil.setValue(voipLog, JsonProConfig.SCHEMA_TYPE, "VoIP");
- JsonParseUtil.setValue(voipLog, JsonProConfig.RTP_ORIGINATOR_DIR, judgeDirection(rtpLog, voipLog));
+ Map<String, Object> rtpLog = (Map<String, Object>) JsonMapper.fromJsonString(rtpBeanArr.get(0), Map.class);
+ Map<String, Object> voIpLog = (Map<String, Object>) JsonMapper.fromJsonString(sipMessage, Map.class);
+ accumulateVoipMsg(voIpLog, rtpLog);
+ JsonParseUtil.setValue(voIpLog, JsonProConfig.SCHEMA_TYPE, "VoIP");
+ JsonParseUtil.setValue(voIpLog, JsonProConfig.RTP_ORIGINATOR_DIR, judgeDirection(rtpLog, voIpLog));
//four-tuple, voip, correlated data
- output.collect(mergeJson(voipLog, rtpLog));
-// basicOutputCollector.emit(new Values(fourStrKey, "voip", mergeJson(voipLog, rtpLog)));
+ output.collect(mergeJson(voIpLog, rtpLog));
}
} else if (sipSize == 1 && rtpSize >= 1) {
for (String rtpMessage : rtpBeanArr) {
- Object rtpLog = JSONObject.parseObject(rtpMessage, voipObject.getClass());
- Object voipLog = JSONObject.parseObject(sipBeanArr.get(0), voipObject.getClass());
- accumulateVoipMsg(voipLog, rtpLog);
- JsonParseUtil.setValue(voipLog, JsonProConfig.SCHEMA_TYPE, "VoIP");
- JsonParseUtil.setValue(voipLog, JsonProConfig.RTP_ORIGINATOR_DIR, judgeDirection(rtpLog, voipLog));
+ Map<String, Object> rtpLog = (Map<String, Object>) JsonMapper.fromJsonString(rtpMessage, Map.class);
+ Map<String, Object> voIpLog = (Map<String, Object>) JsonMapper.fromJsonString(sipBeanArr.get(0), Map.class);
+ accumulateVoipMsg(voIpLog, rtpLog);
+ JsonParseUtil.setValue(voIpLog, JsonProConfig.SCHEMA_TYPE, "VoIP");
+ JsonParseUtil.setValue(voIpLog, JsonProConfig.RTP_ORIGINATOR_DIR, judgeDirection(rtpLog, voIpLog));
//four-tuple, voip, correlated data
- output.collect(mergeJson(voipLog, rtpLog));
-// basicOutputCollector.emit(new Values(fourStrKey, "voip", mergeJson(voipLog, rtpLog)));
+ output.collect(mergeJson(voIpLog, rtpLog));
}
} else {
logger.warn("RTP-listSize is [" + rtpBeanArr.size() + "];SIP-listSize is [" + sipBeanArr.size() + "] in this tempCombineSRHmList! Not logical");
@@ -155,15 +140,14 @@ public class SipCalibrationWindowFunction extends ProcessAllWindowFunction<Tuple
} else {
String msg = tempList.get(0);
-
- Object voipLog = JSONObject.parseObject(msg, voipObject.getClass());
- long commonEndTime = JsonParseUtil.getLong(voipLog, JsonProConfig.END_TIME);
+ Map<String, Object> voIpLog = (Map<String, Object>) JsonMapper.fromJsonString(msg, Map.class);
+ long commonEndTime = JsonParseUtil.getLong(voIpLog, JsonProConfig.END_TIME);
long intervalTime = nowTime - commonEndTime;
logger.error("VoIP日志时间差值记录:" + intervalTime);
if (intervalTime <= VoipRelationConfig.SEC_COMBINE_SR_CACHE_SECS) {
putKeyAndMsg(msg, fourStrKey, secCombineSRHmList);
} else {
- sendDirectlyOneElement(msg, voipLog, output);
+ sendDirectlyOneElement(msg, voIpLog, output);
}
}
}
@@ -173,83 +157,44 @@ public class SipCalibrationWindowFunction extends ProcessAllWindowFunction<Tuple
/**
* Accumulate the byte-type metric values after correlation
*
- * @param voipLog
- * @param rtpLog
+ * @param voIpLog the merged VoIP log
+ * @param rtpLog the RTP log
*/
- private void accumulateVoipMsg(Object voipLog, Object rtpLog) {
-
- Long sumCommonSessions = JsonParseUtil.getLong(voipLog, JsonProConfig.SESSIONS)
- + JsonParseUtil.getLong(rtpLog, JsonProConfig.SESSIONS);
-
- Long sumCommonC2sPktNum = JsonParseUtil.getLong(voipLog, JsonProConfig.C2S_PKT_NUM)
- + JsonParseUtil.getLong(rtpLog, JsonProConfig.C2S_PKT_NUM);
-
- Long sumCommonS2cPktNum = JsonParseUtil.getLong(voipLog, JsonProConfig.S2C_PKT_NUM)
- + JsonParseUtil.getLong(rtpLog, JsonProConfig.S2C_PKT_NUM);
-
- Long sumCommonC2sByteNum = JsonParseUtil.getLong(voipLog, JsonProConfig.C2S_BYTE_NUM)
- + JsonParseUtil.getLong(rtpLog, JsonProConfig.C2S_BYTE_NUM);
-
- Long sumCommonS2cByteNum = JsonParseUtil.getLong(voipLog, JsonProConfig.S2C_BYTE_NUM)
- + JsonParseUtil.getLong(rtpLog, JsonProConfig.S2C_BYTE_NUM);
-
- Long sumCommonC2sIpfragNum = JsonParseUtil.getLong(voipLog, JsonProConfig.C2S_IPFRAG_NUM)
- + JsonParseUtil.getLong(rtpLog, JsonProConfig.C2S_IPFRAG_NUM);
-
- Long sumCommonS2cIpfragNum = JsonParseUtil.getLong(voipLog, JsonProConfig.S2C_IPFRAG_NUM)
- + JsonParseUtil.getLong(rtpLog, JsonProConfig.S2C_IPFRAG_NUM);
-
- Long sumCommonC2sTcpLostlen = JsonParseUtil.getLong(voipLog, JsonProConfig.C2S_TCP_LOSTLEN)
- + JsonParseUtil.getLong(rtpLog, JsonProConfig.C2S_TCP_LOSTLEN);
-
- Long sumCommonS2cTcpLostlen = JsonParseUtil.getLong(voipLog, JsonProConfig.S2C_TCP_LOSTLEN)
- + JsonParseUtil.getLong(rtpLog, JsonProConfig.S2C_TCP_LOSTLEN);
-
- Long sumCommonC2sTcpUnorderNum = JsonParseUtil.getLong(voipLog, JsonProConfig.C2S_TCP_UNORDER_NUM)
- + JsonParseUtil.getLong(rtpLog, JsonProConfig.C2S_TCP_UNORDER_NUM);
-
- Long sumCommonS2cTcpUnorderNum = JsonParseUtil.getLong(voipLog, JsonProConfig.S2C_TCP_UNORDER_NUM)
- + JsonParseUtil.getLong(rtpLog, JsonProConfig.S2C_TCP_UNORDER_NUM);
-
+ private void accumulateVoipMsg(Map<String, Object> voIpLog, Map<String, Object> rtpLog) {
//common_sessions
- JsonParseUtil.setValue(voipLog, JsonProConfig.SESSIONS, sumCommonSessions);
-
+ relationUtils.metricSum(voIpLog, rtpLog, JsonProConfig.SESSIONS);
//common_c2s_pkt_num
- JsonParseUtil.setValue(voipLog, JsonProConfig.C2S_PKT_NUM, sumCommonC2sPktNum);
+ relationUtils.metricSum(voIpLog, rtpLog, JsonProConfig.C2S_PKT_NUM);
//common_s2c_pkt_num
- JsonParseUtil.setValue(voipLog, JsonProConfig.S2C_PKT_NUM, sumCommonS2cPktNum);
+ relationUtils.metricSum(voIpLog, rtpLog, JsonProConfig.S2C_PKT_NUM);
//common_c2s_byte_num
- JsonParseUtil.setValue(voipLog, JsonProConfig.C2S_BYTE_NUM, sumCommonC2sByteNum);
+ relationUtils.metricSum(voIpLog, rtpLog, JsonProConfig.C2S_BYTE_NUM);
//common_s2c_byte_num
- JsonParseUtil.setValue(voipLog, JsonProConfig.S2C_BYTE_NUM, sumCommonS2cByteNum);
-
+ relationUtils.metricSum(voIpLog, rtpLog, JsonProConfig.S2C_BYTE_NUM);
//common_c2s_ipfrag_num
- JsonParseUtil.setValue(voipLog, JsonProConfig.C2S_IPFRAG_NUM, sumCommonC2sIpfragNum);
+ relationUtils.metricSum(voIpLog, rtpLog, JsonProConfig.C2S_IPFRAG_NUM);
//common_s2c_ipfrag_num
- JsonParseUtil.setValue(voipLog, JsonProConfig.S2C_IPFRAG_NUM, sumCommonS2cIpfragNum);
-
+ relationUtils.metricSum(voIpLog, rtpLog, JsonProConfig.S2C_IPFRAG_NUM);
//common_c2s_tcp_lostlen
- JsonParseUtil.setValue(voipLog, JsonProConfig.C2S_TCP_LOSTLEN, sumCommonC2sTcpLostlen);
+ relationUtils.metricSum(voIpLog, rtpLog, JsonProConfig.C2S_TCP_LOSTLEN);
//common_s2c_tcp_lostlen
- JsonParseUtil.setValue(voipLog, JsonProConfig.S2C_TCP_LOSTLEN, sumCommonS2cTcpLostlen);
-
+ relationUtils.metricSum(voIpLog, rtpLog, JsonProConfig.S2C_TCP_LOSTLEN);
//common_c2s_tcp_unorder_num
- JsonParseUtil.setValue(voipLog, JsonProConfig.C2S_TCP_UNORDER_NUM, sumCommonC2sTcpUnorderNum);
+ relationUtils.metricSum(voIpLog, rtpLog, JsonProConfig.C2S_TCP_UNORDER_NUM);
//common_s2c_tcp_unorder_num
- JsonParseUtil.setValue(voipLog, JsonProConfig.S2C_TCP_UNORDER_NUM, sumCommonS2cTcpUnorderNum);
-
+ relationUtils.metricSum(voIpLog, rtpLog, JsonProConfig.S2C_TCP_UNORDER_NUM);
}
/**
* Handles the case where the list has only one element during the periodic pass
*/
- private void sendDirectlyOneElement(String msg, Object voipLog, Collector<String> output) {
+ private void sendDirectlyOneElement(String msg, Map<String, Object> voIpLog, Collector<String> output) {
//four-tuple; sip (always bidirectional) / rtp (may be one-sided or bidirectional, per the stream direction field); the original record taken out
- String commonSchemaType = JsonParseUtil.getString(voipLog, JsonProConfig.SCHEMA_TYPE);
+ String commonSchemaType = JsonParseUtil.getString(voIpLog, JsonProConfig.SCHEMA_TYPE);
if (JsonProConfig.SIP_MARK.equals(commonSchemaType)) {
output.collect(msg);
} else if (JsonProConfig.RTP_MARK.equals(commonSchemaType)) {
- int commonStreamDir = JsonParseUtil.getInteger(voipLog, JsonProConfig.STREAM_DIR);
+ int commonStreamDir = JsonParseUtil.getInteger(voIpLog, JsonProConfig.STREAM_DIR);
if (commonStreamDir != JsonProConfig.DOUBLE) {
output.collect(msg);
} else {
@@ -278,14 +223,14 @@ public class SipCalibrationWindowFunction extends ProcessAllWindowFunction<Tuple
* Determine the RTP caller direction (experimental)
*
* @param rtpLog the original RTP log
- * @param voipLog the merged VoIP log
+ * @param voIpLog the merged VoIP log
* @return direction: 0 unknown, 1 c2s, 2 s2c
*/
- private static int judgeDirection(Object rtpLog, Object voipLog) {
+ private static int judgeDirection(Map<String, Object> rtpLog, Map<String, Object> voIpLog) {
String ip = JsonParseUtil.getString(rtpLog, JsonProConfig.CLIENT_IP);
- String sipOriginatorIp = JsonParseUtil.getString(voipLog, JsonProConfig.SIP_ORIGINATOR_IP);
- String sipResponderIp = JsonParseUtil.getString(voipLog, JsonProConfig.SIP_RESPONDER_IP);
+ String sipOriginatorIp = JsonParseUtil.getString(voIpLog, JsonProConfig.SIP_ORIGINATOR_IP);
+ String sipResponderIp = JsonParseUtil.getString(voIpLog, JsonProConfig.SIP_RESPONDER_IP);
if (StringUtil.isNotBlank(ip) && ip.equals(sipOriginatorIp)) {
return 1;
@@ -308,19 +253,19 @@ public class SipCalibrationWindowFunction extends ProcessAllWindowFunction<Tuple
}
/**
- *
+ * Merge the RTP fields into the VoIP log and return it as a JSON string
*/
- private static String mergeJson(Object voipLog, Object rtpLog) {
+ private static String mergeJson(Map<String, Object> voIpLog, Map<String, Object> rtpLog) {
int rtpPayloadTypeC2s = JsonParseUtil.getInteger(rtpLog, JsonProConfig.RTP_PAYLOAD_TYPE_C2S);
int rtpPayloadTypeS2c = JsonParseUtil.getInteger(rtpLog, JsonProConfig.RTP_PAYLOAD_TYPE_S2C);
String rtpPcapPath = JsonParseUtil.getString(rtpLog, JsonProConfig.RTP_PCAP_PATH);
- JsonParseUtil.setValue(voipLog, JsonProConfig.RTP_PAYLOAD_TYPE_C2S, rtpPayloadTypeC2s);
- JsonParseUtil.setValue(voipLog, JsonProConfig.RTP_PAYLOAD_TYPE_S2C, rtpPayloadTypeS2c);
- JsonParseUtil.setValue(voipLog, JsonProConfig.RTP_PCAP_PATH, rtpPcapPath);
+ JsonParseUtil.setValue(voIpLog, JsonProConfig.RTP_PAYLOAD_TYPE_C2S, rtpPayloadTypeC2s);
+ JsonParseUtil.setValue(voIpLog, JsonProConfig.RTP_PAYLOAD_TYPE_S2C, rtpPayloadTypeS2c);
+ JsonParseUtil.setValue(voIpLog, JsonProConfig.RTP_PCAP_PATH, rtpPcapPath);
- return JSONObject.toJSONString(voipLog);
+ return JsonMapper.toJsonString(voIpLog);
}
}
diff --git a/src/main/java/com/zdjizhi/utils/functions/relationUtils.java b/src/main/java/com/zdjizhi/utils/functions/relationUtils.java
new file mode 100644
index 0000000..c74170d
--- /dev/null
+++ b/src/main/java/com/zdjizhi/utils/functions/relationUtils.java
@@ -0,0 +1,81 @@
+package com.zdjizhi.utils.functions;
+
+import com.zdjizhi.common.JsonProConfig;
+import com.zdjizhi.common.VoipRelationConfig;
+import com.zdjizhi.utils.IPUtil;
+import com.zdjizhi.utils.StringUtil;
+import com.zdjizhi.utils.json.JsonParseUtil;
+
+import java.util.Map;
+
+/**
+ * @author qidaijie
+ * @Package com.zdjizhi.utils.functions
+ * @Description:
+ * @date 2022/4/1911:30
+ */
+class relationUtils {
+
+
+ /**
+ * Add the value of a metric in log A to the same metric in log B, and write the sum back into log A
+ *
+ * @param firstLog log A
+ * @param secondLog log B
+ * @param key the metric's JSON key
+ */
+ static void metricSum(Map<String, Object> firstLog, Map<String, Object> secondLog, String key) {
+ Long firstMetric = JsonParseUtil.getLong(firstLog, key);
+ Long secondMetric = JsonParseUtil.getLong(secondLog, key);
+
+ Long sum = firstMetric + secondMetric;
+
+ JsonParseUtil.setValue(firstLog, key, sum);
+ }
+
+
+ /**
+ * Add the value of a metric in log A to the same metric in log B, and write the sum into log C
+ *
+ * @param firstLog log A
+ * @param secondLog log B
+ * @param otherLog log C
+ * @param key the metric's JSON key
+ */
+ static void metricSumSetOtherLog(Map<String, Object> firstLog, Map<String, Object> secondLog, Map<String, Object> otherLog, String key) {
+ Long firstMetric = JsonParseUtil.getLong(firstLog, key);
+ Long secondMetric = JsonParseUtil.getLong(secondLog, key);
+
+ Long sum = firstMetric + secondMetric;
+
+ JsonParseUtil.setValue(otherLog, key, sum);
+ }
+
+
+ /**
+ * Validate the SIP log: it must contain the negotiated four-tuple; otherwise it is emitted as-is without processing
+ *
+ * @param sipLog the SIP log
+ * @return true or false
+ */
+ static boolean checkSipCompleteness(Map<String, Object> sipLog) {
+ return sipLog.containsKey(JsonProConfig.SIP_ORIGINATOR_IP) &&
+ sipLog.containsKey(JsonProConfig.SIP_ORIGINATOR_PORT) &&
+ sipLog.containsKey(JsonProConfig.SIP_RESPONDER_IP) &&
+ sipLog.containsKey(JsonProConfig.SIP_RESPONDER_PORT);
+ }
+
+ /**
+ * Whether the IP is an internal (private) address
+ *
+ * @param ip ip Address
+ * @return true or false
+ */
+ static boolean isInnerIp(String ip) {
+ if (VoipRelationConfig.CHECK_INNER_NETWORK == 1) {
+ //A blank IP or certain special IPs are also treated as internal
+ return StringUtil.isBlank(ip) || IPUtil.internalIp(ip);
+ }
+ return false;
+ }
+}
diff --git a/src/main/java/com/zdjizhi/utils/ip/IPUtils.java b/src/main/java/com/zdjizhi/utils/ip/IPUtils.java
deleted file mode 100644
index 887d9ba..0000000
--- a/src/main/java/com/zdjizhi/utils/ip/IPUtils.java
+++ /dev/null
@@ -1,101 +0,0 @@
-package com.zdjizhi.utils.ip;
-
-import cn.hutool.log.Log;
-import cn.hutool.log.LogFactory;
-import com.zdjizhi.common.VoipRelationConfig;
-import com.zdjizhi.utils.IPUtil;
-import com.zdjizhi.utils.StringUtil;
-import com.zdjizhi.utils.exception.VoipRelationException;
-
-/**
- * IP转换工具类
- *
- * @author Colbert
- * @date 2021/03/16
- */
-public class IPUtils {
- private static final Log logger = LogFactory.get();
-
- private static final long A_BEGIN = ipToLong("10.0.0.0");
- private static final long A_END = ipToLong("10.255.255.255");
- private static final long B_BEGIN = ipToLong("172.16.0.0");
- private static final long B_END = ipToLong("172.31.255.255");
- private static final long C_BEGIN = ipToLong("192.168.0.0");
- private static final long C_END = ipToLong("192.168.255.255");
-
- /**
- * Convert an IP address in 127.0.0.1 form to a decimal integer
- *
- * @param strIp
- * @return
- */
- public static long ipToLong(String strIp) {
- try {
- if (StringUtil.isBlank(strIp)) {
- logger.error("IPUtils.ipToLong input IP is null!!!");
- return 0L;
- }
- long[] ip = new long[4];
- int position1 = strIp.indexOf(".");
- int position2 = strIp.indexOf(".", position1 + 1);
- int position3 = strIp.indexOf(".", position2 + 1);
-
- ip[0] = Long.parseLong(strIp.substring(0, position1));
- ip[1] = Long.parseLong(strIp.substring(position1 + 1, position2));
- ip[2] = Long.parseLong(strIp.substring(position2 + 1, position3));
- ip[3] = Long.parseLong(strIp.substring(position3 + 1));
- return (ip[0] << 24) + (ip[1] << 16) + (ip[2] << 8) + ip[3];
- } catch (VoipRelationException e) {
- logger.error("IPUtils.ipToLong input IP is:" + strIp + ",convert IP to Long is error:" + e.getMessage());
- return 0L;
- }
-
- }
-
- /**
- * Convert a decimal integer back to an IP address in 127.0.0.1 form
- *
- * @param longIp
- * @return
- */
- public static String longToIp(long longIp) {
- StringBuffer sb = new StringBuffer("");
- sb.append(String.valueOf((longIp >>> 24)));
- sb.append(".");
- sb.append(String.valueOf((longIp & 0x00FFFFFF) >>> 16));
- sb.append(".");
- sb.append(String.valueOf((longIp & 0x0000FFFF) >>> 8));
- sb.append(".");
- sb.append(String.valueOf((longIp & 0x000000FF)));
- return sb.toString();
- }
-
-
- /**
- * Whether the IP is an internal (private) address
- *
- * @param ipAddress
- * @return
- */
- public static boolean isInnerIp(String ipAddress) {
- if (VoipRelationConfig.CHECK_INNER_NETWORK == 1) {
- if (StringUtil.isBlank(ipAddress) || IPUtil.internalIp(ipAddress)) {
- //A blank IP or certain special IPs are also treated as internal
- return true;
- }
-
- long ipNum = ipToLong(ipAddress);
-
- return isInner(ipNum, A_BEGIN, A_END) ||
- isInner(ipNum, B_BEGIN, B_END) ||
- isInner(ipNum, C_BEGIN, C_END);
- } else {
- return false;
- }
- }
-
- private static boolean isInner(long userIp, long begin, long end) {
- return (userIp >= begin) && (userIp <= end);
- }
-
-} \ No newline at end of file
diff --git a/src/main/java/com/zdjizhi/utils/json/JsonParseUtil.java b/src/main/java/com/zdjizhi/utils/json/JsonParseUtil.java
index 2a3194f..2bfd1f2 100644
--- a/src/main/java/com/zdjizhi/utils/json/JsonParseUtil.java
+++ b/src/main/java/com/zdjizhi/utils/json/JsonParseUtil.java
@@ -6,15 +6,21 @@ import cn.hutool.log.LogFactory;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONArray;
import com.alibaba.fastjson.JSONObject;
-import com.jayway.jsonpath.DocumentContext;
+import com.alibaba.nacos.api.NacosFactory;
+import com.alibaba.nacos.api.PropertyKeyConst;
+import com.alibaba.nacos.api.config.listener.Listener;
+import com.alibaba.nacos.api.config.ConfigService;
+
+import com.alibaba.nacos.api.exception.NacosException;
import com.jayway.jsonpath.JsonPath;
import com.zdjizhi.common.VoipRelationConfig;
import com.zdjizhi.utils.JsonMapper;
-import com.zdjizhi.utils.http.HttpClientUtil;
+import com.zdjizhi.utils.StringUtil;
import net.sf.cglib.beans.BeanGenerator;
import net.sf.cglib.beans.BeanMap;
import java.util.*;
+import java.util.concurrent.Executor;
/**
* Utility class for parsing JSON with FastJson
@@ -24,6 +30,50 @@ import java.util.*;
public class JsonParseUtil {
private static final Log logger = LogFactory.get();
+ private static Properties propNacos = new Properties();
+
+ /**
+ * List of field names that should be dropped from incoming logs
+ */
+ private static ArrayList<String> dropList = new ArrayList<>();
+
+
+ /**
+ * In-memory map of field name to Java type, used to build the reflected schema class
+ */
+ private static HashMap<String, Class> jsonFieldsMap;
+
+
+ static {
+ propNacos.setProperty(PropertyKeyConst.SERVER_ADDR, VoipRelationConfig.NACOS_SERVER);
+ propNacos.setProperty(PropertyKeyConst.NAMESPACE, VoipRelationConfig.NACOS_SCHEMA_NAMESPACE);
+ propNacos.setProperty(PropertyKeyConst.USERNAME, VoipRelationConfig.NACOS_USERNAME);
+ propNacos.setProperty(PropertyKeyConst.PASSWORD, VoipRelationConfig.NACOS_PIN);
+ try {
+ ConfigService configService = NacosFactory.createConfigService(propNacos);
+ String dataId = VoipRelationConfig.NACOS_DATA_ID;
+ String group = VoipRelationConfig.NACOS_GROUP;
+ String schema = configService.getConfig(dataId, group, 5000);
+ if (StringUtil.isNotBlank(schema)) {
+ jsonFieldsMap = getFieldsFromSchema(schema);
+ }
+ configService.addListener(dataId, group, new Listener() {
+ @Override
+ public Executor getExecutor() {
+ return null;
+ }
+
+ @Override
+ public void receiveConfigInfo(String configMsg) {
+ if (StringUtil.isNotBlank(configMsg)) {
+ jsonFieldsMap = getFieldsFromSchema(configMsg);
+ }
+ }
+ });
+ } catch (NacosException e) {
+ logger.error("Failed to get schema config from Nacos. The exception message is: " + e.getMessage());
+ }
+ }
/**
* Pattern match: given a type string, return the corresponding class type
@@ -72,6 +122,44 @@ public class JsonParseUtil {
return clazz;
}
+ /**
+ * Convert each field of the raw log map to the type declared in the schema
+ *
+ * @param jsonMap the raw log map
+ */
+ public static Map<String, Object> typeTransform(Map<String, Object> jsonMap) throws RuntimeException {
+ JsonParseUtil.dropJsonField(jsonMap);
+ HashMap<String, Object> tmpMap = new HashMap<>(192);
+ for (String key : jsonMap.keySet()) {
+ if (jsonFieldsMap.containsKey(key)) {
+ String simpleName = jsonFieldsMap.get(key).getSimpleName();
+ switch (simpleName) {
+ case "String":
+ tmpMap.put(key, JsonTypeUtil.checkString(jsonMap.get(key)));
+ break;
+ case "Integer":
+ tmpMap.put(key, JsonTypeUtil.getIntValue(jsonMap.get(key)));
+ break;
+ case "long":
+ tmpMap.put(key, JsonTypeUtil.checkLongValue(jsonMap.get(key)));
+ break;
+ case "List":
+ tmpMap.put(key, JsonTypeUtil.checkArray(jsonMap.get(key)));
+ break;
+ case "Map":
+ tmpMap.put(key, JsonTypeUtil.checkObject(jsonMap.get(key)));
+ break;
+ case "double":
+ tmpMap.put(key, JsonTypeUtil.checkDouble(jsonMap.get(key)));
+ break;
+ default:
+ tmpMap.put(key, JsonTypeUtil.checkString(jsonMap.get(key)));
+ }
+ }
+ }
+ return tmpMap;
+ }
+
/**
* Getter for a property value
@@ -155,7 +243,7 @@ public class JsonParseUtil {
if (intVal == null) {
return 0;
}
- return intVal.intValue();
+ return intVal;
}
/**
@@ -170,7 +258,7 @@ public class JsonParseUtil {
if (intVal == null) {
return 0;
}
- return intVal.intValue();
+ return intVal;
}
public static String getString(Map<String, Object> jsonMap, String property) {
@@ -263,18 +351,16 @@ public class JsonParseUtil {
/**
* Build a map of schema fields from the schema text, used to generate an Object-typed instance via reflection
+ *
- * @param http the gateway schema URL
* @return a map used to generate the schema-typed object via reflection
*/
- public static HashMap<String, Class> getMapFromHttp(String http) {
- HashMap<String, Class> map = new HashMap<>();
-
- String schema = HttpClientUtil.requestByGetMethod(http);
- Object data = JSON.parseObject(schema).get("data");
+ private static HashMap<String, Class> getFieldsFromSchema(String schema) {
+ HashMap<String, Class> map = new HashMap<>(16);
//Get the fields and convert them to an array; each element holds a name, doc and type
- JSONObject schemaJson = JSON.parseObject(data.toString());
+ JSONObject schemaJson = JSON.parseObject(schema);
JSONArray fields = (JSONArray) schemaJson.get("fields");
for (Object field : fields) {
@@ -282,13 +368,19 @@ public class JsonParseUtil {
if (checkKeepField(filedStr)) {
String name = JsonPath.read(filedStr, "$.name").toString();
String type = JsonPath.read(filedStr, "$.type").toString();
+ if (type.contains("{")) {
+ type = JsonPath.read(filedStr, "$.type.type").toString();
+ }
//Assemble the map used to generate the entity class
map.put(name, getClassName(type));
+ } else {
+ dropList.add(JsonPath.read(filedStr, "$.name").toString());
}
}
return map;
}
+
/**
* Determine whether a field should be kept
*
@@ -310,4 +402,14 @@ public class JsonParseUtil {
return isKeepField;
}
+ /**
+ * Remove the fields the schema marks as invalid from the log map (Jackson)
+ *
+ * @param jsonMap the raw log map
+ */
+ private static void dropJsonField(Map<String, Object> jsonMap) {
+ for (String field : dropList) {
+ jsonMap.remove(field);
+ }
+ }
} \ No newline at end of file
diff --git a/src/main/java/com/zdjizhi/utils/json/JsonTypeUtils.java b/src/main/java/com/zdjizhi/utils/json/JsonTypeUtil.java
index 8562679..48e4620 100644
--- a/src/main/java/com/zdjizhi/utils/json/JsonTypeUtils.java
+++ b/src/main/java/com/zdjizhi/utils/json/JsonTypeUtil.java
@@ -14,7 +14,7 @@ import java.util.Map;
* @Description:
* @date 2021/7/1217:34
*/
-public class JsonTypeUtils {
+class JsonTypeUtil {
private static final Log logger = LogFactory.get();
/**
* String type check-and-convert method
@@ -22,7 +22,7 @@ public class JsonTypeUtils {
* @param value json value
* @return String value
*/
- public static String checkString(Object value) {
+ static String checkString(Object value) {
if (value == null) {
return null;
}
@@ -44,7 +44,7 @@ public class JsonTypeUtils {
* @param value json value
* @return List value
*/
- private static Map checkObject(Object value) {
+ static Map checkObject(Object value) {
if (value == null) {
return null;
}
@@ -62,7 +62,7 @@ public class JsonTypeUtils {
* @param value json value
* @return List value
*/
- private static List checkArray(Object value) {
+ static List checkArray(Object value) {
if (value == null) {
return null;
}
@@ -88,7 +88,7 @@ public class JsonTypeUtils {
* @param value json value
* @return Long value
*/
- public static long checkLongValue(Object value) {
+ static long checkLongValue(Object value) {
Long longVal = TypeUtils.castToLong(value);
@@ -105,7 +105,7 @@ public class JsonTypeUtils {
* @param value json value
* @return Double value
*/
- private static Double checkDouble(Object value) {
+ static Double checkDouble(Object value) {
if (value == null) {
return null;
}
@@ -129,7 +129,7 @@ public class JsonTypeUtils {
* @param value json value
* @return int value
*/
- private static int getIntValue(Object value) {
+ static int getIntValue(Object value) {
Integer intVal = TypeUtils.castToInt(value);
diff --git a/src/main/java/com/zdjizhi/utils/kafka/KafkaProducer.java b/src/main/java/com/zdjizhi/utils/kafka/KafkaProducer.java
index 65e253a..2660c12 100644
--- a/src/main/java/com/zdjizhi/utils/kafka/KafkaProducer.java
+++ b/src/main/java/com/zdjizhi/utils/kafka/KafkaProducer.java
@@ -40,7 +40,7 @@ public class KafkaProducer {
createProducerConfig(), Optional.empty());
//Enabling this option makes the producer only log failures instead of catching and re-throwing them
- kafkaProducer.setLogFailuresOnly(false);
+ kafkaProducer.setLogFailuresOnly(true);
//Messages written to Kafka carry a timestamp
// kafkaProducer.setWriteTimestampToKafka(true);
diff --git a/src/test/java/com/zdjizhi/EncryptorTest.java b/src/test/java/com/zdjizhi/EncryptorTest.java
new file mode 100644
index 0000000..170086c
--- /dev/null
+++ b/src/test/java/com/zdjizhi/EncryptorTest.java
@@ -0,0 +1,35 @@
+package com.zdjizhi;
+
+import org.jasypt.encryption.pbe.StandardPBEStringEncryptor;
+import org.junit.Test;
+
+/**
+ * @author qidaijie
+ * @Package com.zdjizhi
+ * @Description:
+ * @date 2022/3/1610:55
+ */
+public class EncryptorTest {
+
+
+ @Test
+ public void passwordTest(){
+ StandardPBEStringEncryptor encryptor = new StandardPBEStringEncryptor();
+ // Configure the encryption/decryption password (salt)
+ encryptor.setPassword("galaxy");
+ // Encrypt "raw_password": S5kR+Y7CI8k7MaecZpde25yK8NKUnd6p
+ String pin = "galaxy2019";
+ String encPin = encryptor.encrypt(pin);
+ String user = "admin";
+ String encUser = encryptor.encrypt(user);
+ System.out.println(encPin);
+ System.out.println(encUser);
+ // Then decrypt back to raw_password
+ String rawPwd = encryptor.decrypt("6MleDyA3Z73HSaXiKsDJ2k7Ys8YWLhEJ");
+ String rawUser = encryptor.decrypt("nsyGpHKGFA4KW0zro9MDdw==");
+
+ System.out.println("The username is: " + rawUser);
+ System.out.println("The pin is: " + rawPwd);
+ }
+
+}
diff --git a/src/test/java/com/zdjizhi/FunctionTest.java b/src/test/java/com/zdjizhi/FunctionTest.java
new file mode 100644
index 0000000..8403c45
--- /dev/null
+++ b/src/test/java/com/zdjizhi/FunctionTest.java
@@ -0,0 +1,40 @@
+package com.zdjizhi;
+
+import com.zdjizhi.common.VoipRelationConfig;
+import com.zdjizhi.utils.IpLookupV2;
+import org.junit.Test;
+
+import java.util.Calendar;
+
+/**
+ * @author qidaijie
+ * @Package com.zdjizhi
+ * @Description:
+ * @date 2021/11/611:38
+ */
+public class FunctionTest {
+
+ private static IpLookupV2 ipLookup = new IpLookupV2.Builder(false)
+ .loadDataFileV4(VoipRelationConfig.TOOLS_LIBRARY + "ip_v4_built_in.mmdb")
+ .loadDataFileV6(VoipRelationConfig.TOOLS_LIBRARY + "ip_v6_built_in.mmdb")
+ .loadDataFilePrivateV4(VoipRelationConfig.TOOLS_LIBRARY + "ip_v4_user_defined.mmdb")
+ .loadDataFilePrivateV6(VoipRelationConfig.TOOLS_LIBRARY + "ip_v6_user_defined.mmdb")
+ .loadAsnDataFile(VoipRelationConfig.TOOLS_LIBRARY + "asn_v4.mmdb")
+ .loadAsnDataFileV6(VoipRelationConfig.TOOLS_LIBRARY + "asn_v6.mmdb")
+ .build();
+
+ @Test
+ public void ipLookupTest() {
+ String ip = "61.144.36.144";
+ System.out.println(ipLookup.cityLookupDetail(ip));
+ System.out.println(ipLookup.countryLookup(ip));
+ }
+
+ @Test
+ public void timestampTest(){
+ Calendar cal = Calendar.getInstance();
+ Long utcTime=cal.getTimeInMillis();
+ System.out.println(utcTime);
+ System.out.println(System.currentTimeMillis());
+ }
+}
diff --git a/src/test/java/com/zdjizhi/json/JsonPathTest.java b/src/test/java/com/zdjizhi/json/JsonPathTest.java
new file mode 100644
index 0000000..46f6f85
--- /dev/null
+++ b/src/test/java/com/zdjizhi/json/JsonPathTest.java
@@ -0,0 +1,79 @@
+package com.zdjizhi.json;
+
+import cn.hutool.log.Log;
+import cn.hutool.log.LogFactory;
+import com.alibaba.nacos.api.NacosFactory;
+import com.alibaba.nacos.api.PropertyKeyConst;
+import com.alibaba.nacos.api.config.ConfigService;
+import com.alibaba.nacos.api.config.listener.Listener;
+import com.alibaba.nacos.api.exception.NacosException;
+import com.jayway.jsonpath.DocumentContext;
+import com.jayway.jsonpath.JsonPath;
+import com.zdjizhi.common.VoipRelationConfig;
+import com.zdjizhi.utils.StringUtil;
+import org.junit.Test;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Properties;
+import java.util.concurrent.Executor;
+
+/**
+ * @author qidaijie
+ * @Package com.zdjizhi.json
+ * @Description:
+ * @date 2022/3/2410:22
+ */
+public class JsonPathTest {
+ private static final Log logger = LogFactory.get();
+
+ private static Properties propNacos = new Properties();
+
+ /**
+ * List of field names that should be dropped
+ */
+ private static ArrayList<String> dropList = new ArrayList<>();
+
+ /**
+ * In-memory map of field name to Java type, used to build the reflected schema class
+ */
+ private static HashMap<String, Class> map;
+
+ /**
+ * Task list
+ * Each element is a four-string array (field with a format marker, field to fill in, helper function used, parameter used), for example:
+ * (mail_subject mail_subject decode_of_base64 mail_subject_charset)
+ */
+ private static ArrayList<String[]> jobList;
+
+ private static String schema;
+
+ static {
+ propNacos.setProperty(PropertyKeyConst.SERVER_ADDR, VoipRelationConfig.NACOS_SERVER);
+ propNacos.setProperty(PropertyKeyConst.NAMESPACE, VoipRelationConfig.NACOS_SCHEMA_NAMESPACE);
+ propNacos.setProperty(PropertyKeyConst.USERNAME, VoipRelationConfig.NACOS_USERNAME);
+ propNacos.setProperty(PropertyKeyConst.PASSWORD, VoipRelationConfig.NACOS_PIN);
+ try {
+ ConfigService configService = NacosFactory.createConfigService(propNacos);
+ String dataId = VoipRelationConfig.NACOS_DATA_ID;
+ String group = VoipRelationConfig.NACOS_GROUP;
+ String config = configService.getConfig(dataId, group, 5000);
+ if (StringUtil.isNotBlank(config)) {
+ schema = config;
+ }
+ } catch (NacosException e) {
+ logger.error("Failed to get schema config from Nacos. The exception message is: " + e.getMessage());
+ }
+ }
+
+ @Test
+ public void parseSchemaGetFields() {
+ DocumentContext parse = JsonPath.parse(schema);
+ List<Object> fields = parse.read("$.fields[*]");
+ for (Object field : fields) {
+ String name = JsonPath.read(field, "$.name").toString();
+ String type = JsonPath.read(field, "$.type").toString();
+ }
+ }
+}
diff --git a/src/test/java/com/zdjizhi/nacos/NacosTest.java b/src/test/java/com/zdjizhi/nacos/NacosTest.java
new file mode 100644
index 0000000..52b99e5
--- /dev/null
+++ b/src/test/java/com/zdjizhi/nacos/NacosTest.java
@@ -0,0 +1,100 @@
+package com.zdjizhi.nacos;
+
+import com.alibaba.nacos.api.NacosFactory;
+import com.alibaba.nacos.api.PropertyKeyConst;
+import com.alibaba.nacos.api.config.ConfigService;
+import com.alibaba.nacos.api.config.listener.Listener;
+import com.alibaba.nacos.api.exception.NacosException;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.io.StringReader;
+import java.util.Properties;
+import java.util.concurrent.Executor;
+
+
+/**
+ * @author qidaijie
+ * @Package com.zdjizhi
+ * @Description:
+ * @date 2022/3/1016:58
+ */
+public class NacosTest {
+
+ /**
+ * <dependency>
+ * <groupId>com.alibaba.nacos</groupId>
+ * <artifactId>nacos-client</artifactId>
+ * <version>1.2.0</version>
+ * </dependency>
+ */
+
+ private static Properties properties = new Properties();
+ /**
+ * config data id = config name
+ */
+ private static final String DATA_ID = "test";
+ /**
+ * config group
+ */
+ private static final String GROUP = "Galaxy";
+
+ private void getProperties() {
+ properties.setProperty(PropertyKeyConst.SERVER_ADDR, "192.168.44.12:8848");
+ properties.setProperty(PropertyKeyConst.NAMESPACE, "flink");
+ properties.setProperty(PropertyKeyConst.USERNAME, "nacos");
+ properties.setProperty(PropertyKeyConst.PASSWORD, "nacos");
+ }
+
+
+ @Test
+ public void GetConfigurationTest() {
+ try {
+ getProperties();
+ ConfigService configService = NacosFactory.createConfigService(properties);
+ String content = configService.getConfig(DATA_ID, GROUP, 5000);
+ Properties nacosConfigMap = new Properties();
+ nacosConfigMap.load(new StringReader(content));
+ System.out.println(nacosConfigMap.getProperty("source.kafka.servers"));
+ } catch (NacosException | IOException e) {
+ // TODO Auto-generated catch block
+ e.printStackTrace();
+ }
+
+ }
+
+ @Test
+ public void ListenerConfigurationTest() {
+ getProperties();
+ try {
+ //first get config
+ ConfigService configService = NacosFactory.createConfigService(properties);
+ String config = configService.getConfig(DATA_ID, GROUP, 5000);
+ System.out.println(config);
+
+ //start listener
+ configService.addListener(DATA_ID, GROUP, new Listener() {
+ @Override
+ public Executor getExecutor() {
+ return null;
+ }
+
+ @Override
+ public void receiveConfigInfo(String configMsg) {
+ System.out.println(configMsg);
+ }
+ });
+ } catch (NacosException e) {
+ e.printStackTrace();
+ }
+
+ //keep running; when the Nacos config changes, the new config is printed
+ while (true) {
+ try {
+ Thread.sleep(5000);
+ } catch (InterruptedException e) {
+ e.printStackTrace();
+ }
+ }
+ }
+}