From 3df5d8c51ed11919d37c4e71c48b344344104dc4 Mon Sep 17 00:00:00 2001 From: qidaijie Date: Tue, 19 Apr 2022 14:16:38 +0800 Subject: 集成Nacos动态获取schema功能 MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- pom.xml | 18 +- properties/default_config.properties | 17 +- properties/service_flow_config.properties | 25 +- .../com/zdjizhi/common/VoipRelationConfig.java | 15 +- .../utils/functions/OneSidedWindowFunction.java | 256 ++++++++++----------- .../functions/SipCalibrationWindowFunction.java | 147 ++++-------- .../com/zdjizhi/utils/functions/relationUtils.java | 81 +++++++ src/main/java/com/zdjizhi/utils/ip/IPUtils.java | 101 -------- .../java/com/zdjizhi/utils/json/JsonParseUtil.java | 124 +++++++++- .../java/com/zdjizhi/utils/json/JsonTypeUtil.java | 142 ++++++++++++ .../java/com/zdjizhi/utils/json/JsonTypeUtils.java | 142 ------------ .../com/zdjizhi/utils/kafka/KafkaProducer.java | 2 +- src/test/java/com/zdjizhi/EncryptorTest.java | 35 +++ src/test/java/com/zdjizhi/FunctionTest.java | 40 ++++ src/test/java/com/zdjizhi/json/JsonPathTest.java | 79 +++++++ src/test/java/com/zdjizhi/nacos/NacosTest.java | 100 ++++++++ 16 files changed, 811 insertions(+), 513 deletions(-) create mode 100644 src/main/java/com/zdjizhi/utils/functions/relationUtils.java delete mode 100644 src/main/java/com/zdjizhi/utils/ip/IPUtils.java create mode 100644 src/main/java/com/zdjizhi/utils/json/JsonTypeUtil.java delete mode 100644 src/main/java/com/zdjizhi/utils/json/JsonTypeUtils.java create mode 100644 src/test/java/com/zdjizhi/EncryptorTest.java create mode 100644 src/test/java/com/zdjizhi/FunctionTest.java create mode 100644 src/test/java/com/zdjizhi/json/JsonPathTest.java create mode 100644 src/test/java/com/zdjizhi/nacos/NacosTest.java diff --git a/pom.xml b/pom.xml index 003d904..e063681 100644 --- a/pom.xml +++ b/pom.xml @@ -6,7 +6,7 @@ com.zdjizhi log-stream-voip-relation - 220316-encryption + 220418-Nacos log-stream-voip-relation http://www.example.com @@ -38,6 +38,8 @@ 2.7.1 1.0.0 2.2.3 + 1.2.0 + 1.0.8 provided @@ -113,16 +115,11 @@ - - com.alibaba - fastjson - 1.2.70 - com.zdjizhi galaxy - 1.0.8 + ${zdjz.tools.version} slf4j-log4j12 @@ -236,6 +233,13 @@ test + + + com.alibaba.nacos + nacos-client + ${nacos.version} + + org.jasypt diff --git a/properties/default_config.properties b/properties/default_config.properties index 158577b..da4adca 100644 --- a/properties/default_config.properties +++ b/properties/default_config.properties @@ -27,14 +27,29 @@ buffer.memory=134217728 #这个参数决定了每次发送给Kafka服务器请求的最大大小,默认1048576 #10M max.request.size=10485760 + +#生产者压缩模式 none or snappy +producer.kafka.compression.type=none + +#生产者ack +producer.ack=1 #====================kafka default====================# #kafka SASL验证用户名-加密 kafka.user=nsyGpHKGFA4KW0zro9MDdw== #kafka SASL及SSL验证密码-加密 kafka.pin=6MleDyA3Z73HSaXiKsDJ2k7Ys8YWLhEJ +#====================nacos default====================# +#nacos username +nacos.username=nacos + +#nacos password +nacos.pin=nacos + +#nacos group +nacos.group=Galaxy #====================Topology Default====================# #check ip is Inner network;0 off, 1 on. -check.inner.network=1 +check.inner.network=0 diff --git a/properties/service_flow_config.properties b/properties/service_flow_config.properties index 85d3d17..f875d95 100644 --- a/properties/service_flow_config.properties +++ b/properties/service_flow_config.properties @@ -10,11 +10,17 @@ sink.kafka.servers=192.168.44.12:9094 #定位库地址 tools.library=D:\\workerspace\\dat\\ -#网关的schema位置 -schema.http=http://192.168.44.67:9999/metadata/schema/v1/fields/voip_record +#--------------------------------nacos配置------------------------------# +#nacos 地址 +nacos.server=192.168.44.12:8848 -#--------------------------------Kafka消费组信息------------------------------# +#nacos namespace +nacos.schema.namespace=flink + +#nacos data id +nacos.data.id=voip_record.json +#--------------------------------Kafka消费组信息------------------------------# #kafka 接收数据topic source.kafka.topic=test @@ -24,22 +30,15 @@ sink.kafka.topic=test-result #读取topic,存储该spout id的消费offset信息,可通过该拓扑命名;具体存储offset的位置,确定下次读取不重复的数据; group.id=mytest-1 -#生产者压缩模式 none or snappy -producer.kafka.compression.type=none - -#生产者ack -producer.ack=1 - #--------------------------------topology配置------------------------------# - #map函数并行度 window.parallelism=1 #voip日志对准窗口时间 seconds -voip.calibration.window.time=30 +voip.calibration.window.time=60 #单向流对准窗口时间 seconds -one.sided.window.time=5 +one.sided.window.time=10 #voip二次对准时间 seconds -sec.combine.sr.cache.secs=180 \ No newline at end of file +sec.combine.sr.cache.secs=120 \ No newline at end of file diff --git a/src/main/java/com/zdjizhi/common/VoipRelationConfig.java b/src/main/java/com/zdjizhi/common/VoipRelationConfig.java index 41884b9..61c50a1 100644 --- a/src/main/java/com/zdjizhi/common/VoipRelationConfig.java +++ b/src/main/java/com/zdjizhi/common/VoipRelationConfig.java @@ -23,6 +23,17 @@ public class VoipRelationConfig { public static final String VISIBILITY = "disabled"; public static final String FORMAT_SPLITTER = ","; + + /** + * Nacos + */ + public static final String NACOS_SERVER = VoipRelationConfigurations.getStringProperty(0, "nacos.server"); + public static final String NACOS_SCHEMA_NAMESPACE = VoipRelationConfigurations.getStringProperty(0, "nacos.schema.namespace"); + public static final String NACOS_DATA_ID = VoipRelationConfigurations.getStringProperty(0, "nacos.data.id"); + public static final String NACOS_PIN = VoipRelationConfigurations.getStringProperty(1, "nacos.pin"); + public static final String NACOS_GROUP = VoipRelationConfigurations.getStringProperty(1, "nacos.group"); + public static final String NACOS_USERNAME = VoipRelationConfigurations.getStringProperty(1, "nacos.username"); + /** * System */ @@ -38,8 +49,8 @@ public class VoipRelationConfig { public static final String SOURCE_KAFKA_TOPIC = VoipRelationConfigurations.getStringProperty(0, "source.kafka.topic"); public static final String SINK_KAFKA_TOPIC = VoipRelationConfigurations.getStringProperty(0, "sink.kafka.topic"); public static final String GROUP_ID = VoipRelationConfigurations.getStringProperty(0, "group.id"); - public static final String PRODUCER_ACK = VoipRelationConfigurations.getStringProperty(0, "producer.ack"); - public static final String PRODUCER_KAFKA_COMPRESSION_TYPE = VoipRelationConfigurations.getStringProperty(0, "producer.kafka.compression.type"); + public static final String PRODUCER_ACK = VoipRelationConfigurations.getStringProperty(1, "producer.ack"); + public static final String PRODUCER_KAFKA_COMPRESSION_TYPE = VoipRelationConfigurations.getStringProperty(1, "producer.kafka.compression.type"); public static final String KAFKA_SASL_JAAS_USER = encryptor.decrypt(VoipRelationConfigurations.getStringProperty(1, "kafka.user")); public static final String KAFKA_SASL_JAAS_PIN = encryptor.decrypt(VoipRelationConfigurations.getStringProperty(1, "kafka.pin")); diff --git a/src/main/java/com/zdjizhi/utils/functions/OneSidedWindowFunction.java b/src/main/java/com/zdjizhi/utils/functions/OneSidedWindowFunction.java index 061fa12..0e244c3 100644 --- a/src/main/java/com/zdjizhi/utils/functions/OneSidedWindowFunction.java +++ b/src/main/java/com/zdjizhi/utils/functions/OneSidedWindowFunction.java @@ -2,11 +2,11 @@ package com.zdjizhi.utils.functions; import cn.hutool.log.Log; import cn.hutool.log.LogFactory; -import com.alibaba.fastjson.JSONObject; import com.zdjizhi.common.JsonProConfig; import com.zdjizhi.common.VoipRelationConfig; +import com.zdjizhi.utils.IPUtil; +import com.zdjizhi.utils.JsonMapper; import com.zdjizhi.utils.StringUtil; -import com.zdjizhi.utils.ip.IPUtils; import com.zdjizhi.utils.json.JsonParseUtil; import org.apache.flink.api.java.tuple.Tuple3; import org.apache.flink.streaming.api.functions.windowing.ProcessAllWindowFunction; @@ -27,67 +27,72 @@ public class OneSidedWindowFunction extends ProcessAllWindowFunction sipOriHmList = new HashMap<>(16); + private static HashMap sipOriHmList = new HashMap<>(32); /** * key-rtp拼接的四元组;value为rtp的具体数据---存放的是RTP未关联的数据(单向流) */ - private static HashMap rtpOriHmList = new HashMap<>(16); + private static HashMap rtpOriHmList = new HashMap<>(32); @Override @SuppressWarnings("unchecked") - public void process(Context context, Iterable inputs, Collector> out) throws Exception { - + public void process(Context context, Iterable inputs, Collector> out) { for (String input : inputs) { if (StringUtil.isNotBlank(input)) { - JSONObject object = JSONObject.parseObject(input); - String commonSchemaType = object.getString(JsonProConfig.SCHEMA_TYPE); - String sipCallId = object.getString(JsonProConfig.SIP_CALL_ID); - - //1:c2s,2:s2c;3;double - int commonStreamDir = object.getInteger(JsonProConfig.STREAM_DIR); - - /* - * 针对SIP日志进行处理 - */ - if (JsonProConfig.SIP_MARK.equals(commonSchemaType) && StringUtil.isNotBlank(sipCallId)) { - if (checkSipCompleteness(object)) { - if (commonStreamDir != JsonProConfig.DOUBLE) { - putKeyAndMsg(input, sipCallId, sipOriHmList, "SIP", out); + try { + Map object = JsonParseUtil.typeTransform((Map) JsonMapper.fromJsonString(input, Map.class)); + + String commonSchemaType = JsonParseUtil.getString(object, JsonProConfig.SCHEMA_TYPE); + String sipCallId = JsonParseUtil.getString(object, JsonProConfig.SIP_CALL_ID); + + //1:c2s,2:s2c;3;double + int commonStreamDir = JsonParseUtil.getInteger(object, JsonProConfig.STREAM_DIR); + + /* + * 针对SIP日志进行处理 + */ + if (JsonProConfig.SIP_MARK.equals(commonSchemaType) && StringUtil.isNotBlank(sipCallId)) { + if (relationUtils.checkSipCompleteness(object)) { + if (commonStreamDir != JsonProConfig.DOUBLE) { + putKeyAndMsg(input, sipCallId, sipOriHmList, "SIP", out); + } else { + separateInnerIp(object, out); + } } else { - separateInnerIp(object, out); + out.collect(new Tuple3<>("", "violation", input)); } - } else { - out.collect(new Tuple3<>("", "violation", input)); } - } - /* - * 针对RTP日志进行处理 - */ - if (JsonProConfig.RTP_MARK.equals(commonSchemaType)) { + /* + * 针对RTP日志进行处理 + */ + if (JsonProConfig.RTP_MARK.equals(commonSchemaType)) { + String clientIP = JsonParseUtil.getString(object, JsonProConfig.CLIENT_IP); + int clientPort = JsonParseUtil.getInteger(object, JsonProConfig.CLIENT_PORT); + String ServerIP = JsonParseUtil.getString(object, JsonProConfig.SERVER_IP); + int ServerPort = JsonParseUtil.getInteger(object, JsonProConfig.SERVER_PORT); - String rtpIpPort4Key = getFourKey(object.getString(JsonProConfig.CLIENT_IP), - object.getInteger(JsonProConfig.CLIENT_PORT), - object.getString(JsonProConfig.SERVER_IP), - object.getInteger(JsonProConfig.SERVER_PORT)); + String rtpIpPort4Key = getFourKey(clientIP, clientPort, ServerIP, ServerPort); - if (commonStreamDir != JsonProConfig.DOUBLE) { - //对rtp单向流进行关联 - putKeyAndMsg(input, rtpIpPort4Key, rtpOriHmList, "RTP", out); + if (commonStreamDir != JsonProConfig.DOUBLE) { + //对rtp单向流进行关联 + putKeyAndMsg(input, rtpIpPort4Key, rtpOriHmList, "RTP", out); - } else { - //RTP双向流,按四元组下发 - out.collect(new Tuple3<>(rtpIpPort4Key, "rtp-two", input)); + } else { + //RTP双向流,按四元组下发 + out.collect(new Tuple3<>(rtpIpPort4Key, "rtp-two", input)); + } } + } catch (RuntimeException e) { + logger.error("parsing JSON or Unidirectional data flow fusion has exception! error is :" + e); } } } /* - * 定时发送SIP或RTP未关联上数据 + * 定时发送SIP未关联上数据 */ if (sipOriHmList.size() > 0) { - HashMap tmpSipOriHmList = new HashMap(sipOriHmList); + HashMap tmpSipOriHmList = new HashMap<>(sipOriHmList); sipOriHmList.clear(); for (String sipKey : tmpSipOriHmList.keySet()) { String sipSingleMsg = tmpSipOriHmList.get(sipKey); @@ -95,9 +100,11 @@ public class OneSidedWindowFunction extends ProcessAllWindowFunction(sipKey, "sip-single", sipSingleMsg)); } } - + /* + * 定时发送RTP未关联上数据 + */ if (rtpOriHmList.size() > 0) { - HashMap tmpRtpOriHmList = new HashMap(rtpOriHmList); + HashMap tmpRtpOriHmList = new HashMap<>(rtpOriHmList); rtpOriHmList.clear(); for (String rtpKey : tmpRtpOriHmList.keySet()) { String rtpSingleMsg = tmpRtpOriHmList.get(rtpKey); @@ -110,41 +117,40 @@ public class OneSidedWindowFunction extends ProcessAllWindowFunction hashMapStr, String protocolType, Collector> out) { //和上次存入的数据关联 if (hashMapStr.containsKey(hmStrKey)) { - - JSONObject jsonCombinObject = new JSONObject(); + HashMap jsonCommonMap = new HashMap<>(32); String[] strArr = new String[2]; String firstMsg = hashMapStr.remove(hmStrKey); - - JSONObject firstSipOrRtpLog = JSONObject.parseObject(firstMsg); - JSONObject secendSipOrRtpLog = JSONObject.parseObject(message); + Map firstSipOrRtpLog = (Map) JsonMapper.fromJsonString(firstMsg, Map.class); + Map secondSipOrRtpLog = (Map) JsonMapper.fromJsonString(message, Map.class); //1:c2s,2:s2c;3;double,1表示firstMsg为请求侧(c2s),合并时以它为准 - if (firstSipOrRtpLog.getInteger(JsonProConfig.STREAM_DIR) == 1) { + if (JsonParseUtil.getInteger(firstSipOrRtpLog, JsonProConfig.STREAM_DIR) == 1) { strArr[0] = message; strArr[1] = firstMsg; } else { strArr[0] = firstMsg; strArr[1] = message; } - jsonCombinObject.putAll(JSONObject.parseObject(strArr[0])); - jsonCombinObject.putAll(JSONObject.parseObject(strArr[1])); - String sipTwoMsg = jsonCombinObject.toString(); + jsonCommonMap.putAll((Map) JsonMapper.fromJsonString(strArr[0], Map.class)); + jsonCommonMap.putAll((Map) JsonMapper.fromJsonString(strArr[1], Map.class)); + String sipTwoMsg = jsonCommonMap.toString(); - JSONObject sipOrRtpCombin = JSONObject.parseObject(sipTwoMsg); - accumulateMsg(firstSipOrRtpLog, secendSipOrRtpLog, sipOrRtpCombin); + Map sipOrRtpCombin = (Map) JsonMapper.fromJsonString(sipTwoMsg, Map.class); + accumulateMsg(firstSipOrRtpLog, secondSipOrRtpLog, sipOrRtpCombin); sipOrRtpCombin.put(JsonProConfig.STREAM_DIR, JsonProConfig.DOUBLE); if (JsonProConfig.SIP_MARK.equals(protocolType)) { //手动关联SIP后区分内外网IP再下发 separateInnerIp(sipOrRtpCombin, out); } else if (JsonProConfig.RTP_MARK.equals(protocolType)) { //手动关联RTP后按四元组下发 - sipOrRtpCombin.put(JsonProConfig.RTP_PCAP_PATH, setRtpPath(firstSipOrRtpLog, secendSipOrRtpLog)); - out.collect(new Tuple3<>(hmStrKey, "rtp-two", JSONObject.toJSONString(sipOrRtpCombin))); + sipOrRtpCombin.put(JsonProConfig.RTP_PCAP_PATH, setRtpPacpPath(firstSipOrRtpLog, secondSipOrRtpLog)); + out.collect(new Tuple3<>(hmStrKey, "rtp-two", JsonMapper.toJsonString(sipOrRtpCombin))); } } else { hashMapStr.put(hmStrKey, message); @@ -154,16 +160,15 @@ public class OneSidedWindowFunction extends ProcessAllWindowFunction> out) { + private static void separateInnerIp(Map object, Collector> out) { - String sipOriginatorIp = object.getString(JsonProConfig.SIP_ORIGINATOR_IP); - String sipResponderIp = object.getString(JsonProConfig.SIP_RESPONDER_IP); - int sipOriginatorPort = object.getInteger(JsonProConfig.SIP_ORIGINATOR_PORT); - int sipResponderPort = object.getInteger(JsonProConfig.SIP_RESPONDER_PORT); + String sipOriginatorIp = JsonParseUtil.getString(object, JsonProConfig.SIP_ORIGINATOR_IP); + int sipOriginatorPort = JsonParseUtil.getInteger(object, JsonProConfig.SIP_ORIGINATOR_PORT); + String sipResponderIp = JsonParseUtil.getString(object, JsonProConfig.SIP_RESPONDER_IP); + int sipResponderPort = JsonParseUtil.getInteger(object, JsonProConfig.SIP_RESPONDER_PORT); - if (IPUtils.isInnerIp(sipOriginatorIp) - || IPUtils.isInnerIp(sipResponderIp)) { - /** + if (relationUtils.isInnerIp(sipOriginatorIp) || relationUtils.isInnerIp(sipResponderIp)) { + /* * 按from-ip_from-port_to-ip_to-port */ String sipInnerEmitKey = sipOriginatorIp + VoipRelationConfig.CORRELATION_STR @@ -171,7 +176,7 @@ public class OneSidedWindowFunction extends ProcessAllWindowFunction(sipInnerEmitKey, "sip-in", JSONObject.toJSONString(object))); + out.collect(new Tuple3<>(sipInnerEmitKey, "sip-in", JsonMapper.toJsonString(object))); } else { String sipIpPort4Key = getFourKey(sipOriginatorIp, sipOriginatorPort, @@ -179,7 +184,7 @@ public class OneSidedWindowFunction extends ProcessAllWindowFunction(sipIpPort4Key, "sip-two", JSONObject.toJSONString(object))); + out.collect(new Tuple3<>(sipIpPort4Key, "sip-two", JsonMapper.toJsonString(object))); } } @@ -216,7 +221,7 @@ public class OneSidedWindowFunction extends ProcessAllWindowFunction serverIpNum case 1: - return commonServerIp + VoipRelationConfig.CORRELATION_STR - + commonServerPort + VoipRelationConfig.CORRELATION_STR - + commonClientIp + VoipRelationConfig.CORRELATION_STR - + commonClientPort; + return serverIp + VoipRelationConfig.CORRELATION_STR + + serverPort + VoipRelationConfig.CORRELATION_STR + + clientIp + VoipRelationConfig.CORRELATION_STR + + clientPort; //clientIpNum < serverIpNum case -1: - return commonClientIp + VoipRelationConfig.CORRELATION_STR - + commonClientPort + VoipRelationConfig.CORRELATION_STR - + commonServerIp + VoipRelationConfig.CORRELATION_STR - + commonServerPort; + return clientIp + VoipRelationConfig.CORRELATION_STR + + clientPort + VoipRelationConfig.CORRELATION_STR + + serverIp + VoipRelationConfig.CORRELATION_STR + + serverPort; //clientIpNum = serverIpNum,说明两个IP值一样,即IP异常 case 0: //IP值异常 case -2: default: logger.error("compareNum is error," + - "common_client_ip:" + commonClientIp + "," + - "commonServerIp:" + commonServerIp + "," + - "commonClientPort:" + commonClientPort + "," + - "commonServerPort:" + commonServerPort); + "common_client_ip:" + clientIp + "," + + "commonServerIp:" + serverIp + "," + + "commonClientPort:" + clientPort + "," + + "commonServerPort:" + serverPort); return ""; } } /** - * 计算相关字节信息,主要是累加 + * 对SIP单向流日志的指标数据进行聚合。 * - * @param firstSipOrRtpLog - * @param secendSipOrRtpLog - * @param sipOrRtpCombin + * @param firstSipOrRtpLog 第一条单向流日志 + * @param secondSipOrRtpLog 第二条单向流日志 + * @param sipOrRtpCombin SIP双向流日志集合 */ - private static void accumulateMsg(JSONObject firstSipOrRtpLog, JSONObject secendSipOrRtpLog, JSONObject sipOrRtpCombin) { + private static void accumulateMsg(Map firstSipOrRtpLog, Map secondSipOrRtpLog, Map sipOrRtpCombin) { //common_sessions - sipOrRtpCombin.put(JsonProConfig.SESSIONS, (firstSipOrRtpLog.getLongValue(JsonProConfig.SESSIONS) + secendSipOrRtpLog.getLongValue(JsonProConfig.SESSIONS))); - + relationUtils.metricSumSetOtherLog(firstSipOrRtpLog, secondSipOrRtpLog, sipOrRtpCombin, JsonProConfig.SESSIONS); //common_c2s_pkt_num - sipOrRtpCombin.put(JsonProConfig.C2S_PKT_NUM, (firstSipOrRtpLog.getLongValue(JsonProConfig.C2S_PKT_NUM) + secendSipOrRtpLog.getLongValue(JsonProConfig.C2S_PKT_NUM))); - + relationUtils.metricSumSetOtherLog(firstSipOrRtpLog, secondSipOrRtpLog, sipOrRtpCombin, JsonProConfig.C2S_PKT_NUM); //common_s2c_pkt_num - sipOrRtpCombin.put(JsonProConfig.S2C_PKT_NUM, (firstSipOrRtpLog.getLongValue(JsonProConfig.S2C_PKT_NUM) + secendSipOrRtpLog.getLongValue(JsonProConfig.S2C_PKT_NUM))); - + relationUtils.metricSumSetOtherLog(firstSipOrRtpLog, secondSipOrRtpLog, sipOrRtpCombin, JsonProConfig.S2C_PKT_NUM); //common_c2s_byte_num - sipOrRtpCombin.put(JsonProConfig.C2S_BYTE_NUM, (firstSipOrRtpLog.getLongValue(JsonProConfig.C2S_BYTE_NUM) + secendSipOrRtpLog.getLongValue(JsonProConfig.C2S_BYTE_NUM))); - + relationUtils.metricSumSetOtherLog(firstSipOrRtpLog, secondSipOrRtpLog, sipOrRtpCombin, JsonProConfig.C2S_BYTE_NUM); //common_s2c_byte_num - sipOrRtpCombin.put(JsonProConfig.S2C_BYTE_NUM, (firstSipOrRtpLog.getLongValue(JsonProConfig.S2C_BYTE_NUM) + secendSipOrRtpLog.getLongValue(JsonProConfig.S2C_BYTE_NUM))); - + relationUtils.metricSumSetOtherLog(firstSipOrRtpLog, secondSipOrRtpLog, sipOrRtpCombin, JsonProConfig.S2C_BYTE_NUM); //common_c2s_ipfrag_num - sipOrRtpCombin.put(JsonProConfig.C2S_IPFRAG_NUM, (firstSipOrRtpLog.getLongValue(JsonProConfig.C2S_IPFRAG_NUM) + secendSipOrRtpLog.getLongValue(JsonProConfig.C2S_IPFRAG_NUM))); - + relationUtils.metricSumSetOtherLog(firstSipOrRtpLog, secondSipOrRtpLog, sipOrRtpCombin, JsonProConfig.C2S_IPFRAG_NUM); //common_s2c_ipfrag_num - sipOrRtpCombin.put(JsonProConfig.S2C_IPFRAG_NUM, (firstSipOrRtpLog.getLongValue(JsonProConfig.S2C_IPFRAG_NUM) + secendSipOrRtpLog.getLongValue(JsonProConfig.S2C_IPFRAG_NUM))); - + relationUtils.metricSumSetOtherLog(firstSipOrRtpLog, secondSipOrRtpLog, sipOrRtpCombin, JsonProConfig.S2C_IPFRAG_NUM); //common_c2s_tcp_lostlen - sipOrRtpCombin.put(JsonProConfig.C2S_TCP_LOSTLEN, (firstSipOrRtpLog.getLongValue(JsonProConfig.C2S_TCP_LOSTLEN) + secendSipOrRtpLog.getLongValue(JsonProConfig.C2S_TCP_LOSTLEN))); - + relationUtils.metricSumSetOtherLog(firstSipOrRtpLog, secondSipOrRtpLog, sipOrRtpCombin, JsonProConfig.C2S_TCP_LOSTLEN); //common_s2c_tcp_lostlen - sipOrRtpCombin.put(JsonProConfig.S2C_TCP_LOSTLEN, (firstSipOrRtpLog.getLongValue(JsonProConfig.S2C_TCP_LOSTLEN) + secendSipOrRtpLog.getLongValue(JsonProConfig.S2C_TCP_LOSTLEN))); - + relationUtils.metricSumSetOtherLog(firstSipOrRtpLog, secondSipOrRtpLog, sipOrRtpCombin, JsonProConfig.S2C_TCP_LOSTLEN); //common_c2s_tcp_unorder_num - sipOrRtpCombin.put(JsonProConfig.C2S_TCP_UNORDER_NUM, (firstSipOrRtpLog.getLongValue(JsonProConfig.C2S_TCP_UNORDER_NUM) + secendSipOrRtpLog.getLongValue(JsonProConfig.C2S_TCP_UNORDER_NUM))); - + relationUtils.metricSumSetOtherLog(firstSipOrRtpLog, secondSipOrRtpLog, sipOrRtpCombin, JsonProConfig.C2S_TCP_UNORDER_NUM); //common_s2c_tcp_unorder_num - sipOrRtpCombin.put(JsonProConfig.S2C_TCP_UNORDER_NUM, (firstSipOrRtpLog.getLongValue(JsonProConfig.S2C_TCP_UNORDER_NUM) + secendSipOrRtpLog.getLongValue(JsonProConfig.S2C_TCP_UNORDER_NUM))); + relationUtils.metricSumSetOtherLog(firstSipOrRtpLog, secondSipOrRtpLog, sipOrRtpCombin, JsonProConfig.S2C_TCP_UNORDER_NUM); } /** * int类型 * 比较数字大小,左边>右边-返回:1,左边<右边-返回:-1,左边=右边-返回:0 * - * @param numOne - * @param numTwo + * @param numOne 数值1 + * @param numTwo 数值2 */ private static int compareNum(int numOne, int numTwo) { if (numOne > 0 && numTwo > 0) { @@ -331,8 +326,8 @@ public class OneSidedWindowFunction extends ProcessAllWindowFunction右边-返回:1,左边<右边-返回:-1,左边=右边-返回:0 * - * @param numOne - * @param numTwo + * @param numOne 数值1 + * @param numTwo 数值2 */ private static int compareNum(long numOne, long numTwo) { if (numOne > 0 && numTwo > 0) { @@ -349,28 +344,21 @@ public class OneSidedWindowFunction extends ProcessAllWindowFunction firstSipOrRtpLog, Map secendSipOrRtpLog) { + private static String setRtpPacpPath(Map firstSipOrRtpLog, Map secendSipOrRtpLog) { - String firstRtpPcapPath = JsonParseUtil.getString(firstSipOrRtpLog, JsonProConfig.RTP_PCAP_PATH); - String secendRtpPcapPath = JsonParseUtil.getString(secendSipOrRtpLog, JsonProConfig.RTP_PCAP_PATH); + String firstPcapPath = JsonParseUtil.getString(firstSipOrRtpLog, JsonProConfig.RTP_PCAP_PATH); + String secondPcapPath = JsonParseUtil.getString(secendSipOrRtpLog, JsonProConfig.RTP_PCAP_PATH); - if (StringUtil.isNotBlank(firstRtpPcapPath) && StringUtil.isNotBlank(secendRtpPcapPath)) { - if (firstRtpPcapPath.equals(secendRtpPcapPath)) { - return firstRtpPcapPath; + if (StringUtil.isNotBlank(firstPcapPath) && StringUtil.isNotBlank(secondPcapPath)) { + if (firstPcapPath.equals(secondPcapPath)) { + return firstPcapPath; } else { - return firstRtpPcapPath + ";" + secendRtpPcapPath; + return firstPcapPath + ";" + secondPcapPath; } - } else if (StringUtil.isNotBlank(firstRtpPcapPath)) { - return firstRtpPcapPath; + } else if (StringUtil.isNotBlank(firstPcapPath)) { + return firstPcapPath; } else { - return secendRtpPcapPath; + return secondPcapPath; } } - - private static boolean checkSipCompleteness(JSONObject object) { - return object.containsKey(JsonProConfig.SIP_ORIGINATOR_IP) && - object.containsKey(JsonProConfig.SIP_ORIGINATOR_PORT) && - object.containsKey(JsonProConfig.SIP_RESPONDER_IP) && - object.containsKey(JsonProConfig.SIP_RESPONDER_PORT); - } } diff --git a/src/main/java/com/zdjizhi/utils/functions/SipCalibrationWindowFunction.java b/src/main/java/com/zdjizhi/utils/functions/SipCalibrationWindowFunction.java index 8b92c10..7180419 100644 --- a/src/main/java/com/zdjizhi/utils/functions/SipCalibrationWindowFunction.java +++ b/src/main/java/com/zdjizhi/utils/functions/SipCalibrationWindowFunction.java @@ -2,9 +2,9 @@ package com.zdjizhi.utils.functions; import cn.hutool.log.Log; import cn.hutool.log.LogFactory; -import com.alibaba.fastjson.JSONObject; import com.zdjizhi.common.JsonProConfig; import com.zdjizhi.common.VoipRelationConfig; +import com.zdjizhi.utils.JsonMapper; import com.zdjizhi.utils.StringUtil; import com.zdjizhi.utils.json.JsonParseUtil; import org.apache.flink.api.java.tuple.Tuple3; @@ -12,10 +12,7 @@ import org.apache.flink.streaming.api.functions.windowing.ProcessAllWindowFuncti import org.apache.flink.streaming.api.windowing.windows.TimeWindow; import org.apache.flink.util.Collector; -import java.util.ArrayList; -import java.util.HashMap; -import java.util.LinkedList; -import java.util.List; +import java.util.*; /** * @author qidaijie @@ -26,16 +23,6 @@ import java.util.List; public class SipCalibrationWindowFunction extends ProcessAllWindowFunction, String, TimeWindow> { private static final Log logger = LogFactory.get(); - /** - * 实体类反射map - */ - private static HashMap classMap = JsonParseUtil.getMapFromHttp(VoipRelationConfig.SCHEMA_HTTP); - - /** - * 反射成一个类 - */ - private static Object voipObject = JsonParseUtil.generateObject(classMap); - /** * 关联用HashMap * key---四元组 @@ -96,6 +83,7 @@ public class SipCalibrationWindowFunction extends ProcessAllWindowFunction> combineHmList, Collector output) { if (combineHmList.size() > 0) { long nowTime = System.currentTimeMillis() / 1000; @@ -109,12 +97,11 @@ public class SipCalibrationWindowFunction extends ProcessAllWindowFunction 1) { - List sipBeanArr = new ArrayList<>(); List rtpBeanArr = new ArrayList<>(); for (String message : tempList) { - Object tempSipOrRtpLog = JSONObject.parseObject(message, voipObject.getClass()); + Map tempSipOrRtpLog = (Map) JsonMapper.fromJsonString(message, Map.class); String schemaType = JsonParseUtil.getString(tempSipOrRtpLog, JsonProConfig.SCHEMA_TYPE); if (JsonProConfig.SIP_MARK.equals(schemaType)) { sipBeanArr.add(message); @@ -127,25 +114,23 @@ public class SipCalibrationWindowFunction extends ProcessAllWindowFunction= 1) { for (String sipMessage : sipBeanArr) { - Object rtpLog = JSONObject.parseObject(rtpBeanArr.get(0), voipObject.getClass()); - Object voipLog = JSONObject.parseObject(sipMessage, voipObject.getClass()); - accumulateVoipMsg(voipLog, rtpLog); - JsonParseUtil.setValue(voipLog, JsonProConfig.SCHEMA_TYPE, "VoIP"); - JsonParseUtil.setValue(voipLog, JsonProConfig.RTP_ORIGINATOR_DIR, judgeDirection(rtpLog, voipLog)); + Map rtpLog = (Map) JsonMapper.fromJsonString(rtpBeanArr.get(0), Map.class); + Map voIpLog = (Map) JsonMapper.fromJsonString(sipMessage, Map.class); + accumulateVoipMsg(voIpLog, rtpLog); + JsonParseUtil.setValue(voIpLog, JsonProConfig.SCHEMA_TYPE, "VoIP"); + JsonParseUtil.setValue(voIpLog, JsonProConfig.RTP_ORIGINATOR_DIR, judgeDirection(rtpLog, voIpLog)); //四元组,voip,关联后的数据 - output.collect(mergeJson(voipLog, rtpLog)); -// basicOutputCollector.emit(new Values(fourStrKey, "voip", mergeJson(voipLog, rtpLog))); + output.collect(mergeJson(voIpLog, rtpLog)); } } else if (sipSize == 1 && rtpSize >= 1) { for (String rtpMessage : rtpBeanArr) { - Object rtpLog = JSONObject.parseObject(rtpMessage, voipObject.getClass()); - Object voipLog = JSONObject.parseObject(sipBeanArr.get(0), voipObject.getClass()); - accumulateVoipMsg(voipLog, rtpLog); - JsonParseUtil.setValue(voipLog, JsonProConfig.SCHEMA_TYPE, "VoIP"); - JsonParseUtil.setValue(voipLog, JsonProConfig.RTP_ORIGINATOR_DIR, judgeDirection(rtpLog, voipLog)); + Map rtpLog = (Map) JsonMapper.fromJsonString(rtpMessage, Map.class); + Map voIpLog = (Map) JsonMapper.fromJsonString(sipBeanArr.get(0), Map.class); + accumulateVoipMsg(voIpLog, rtpLog); + JsonParseUtil.setValue(voIpLog, JsonProConfig.SCHEMA_TYPE, "VoIP"); + JsonParseUtil.setValue(voIpLog, JsonProConfig.RTP_ORIGINATOR_DIR, judgeDirection(rtpLog, voIpLog)); //四元组,voip,关联后的数据 - output.collect(mergeJson(voipLog, rtpLog)); -// basicOutputCollector.emit(new Values(fourStrKey, "voip", mergeJson(voipLog, rtpLog))); + output.collect(mergeJson(voIpLog, rtpLog)); } } else { logger.warn("RTP-listSize is [" + rtpBeanArr.size() + "];SIP-listSize is [" + sipBeanArr.size() + "] in this tempCombineSRHmList! Not logical"); @@ -155,15 +140,14 @@ public class SipCalibrationWindowFunction extends ProcessAllWindowFunction voIpLog = (Map) JsonMapper.fromJsonString(msg, Map.class); + long commonEndTime = JsonParseUtil.getLong(voIpLog, JsonProConfig.END_TIME); long intervalTime = nowTime - commonEndTime; logger.error("VoIP日志时间差值记录:" + intervalTime); if (intervalTime <= VoipRelationConfig.SEC_COMBINE_SR_CACHE_SECS) { putKeyAndMsg(msg, fourStrKey, secCombineSRHmList); } else { - sendDirectlyOneElement(msg, voipLog, output); + sendDirectlyOneElement(msg, voIpLog, output); } } } @@ -173,83 +157,44 @@ public class SipCalibrationWindowFunction extends ProcessAllWindowFunction voIpLog, Map rtpLog) { //common_sessions - JsonParseUtil.setValue(voipLog, JsonProConfig.SESSIONS, sumCommonSessions); - + relationUtils.metricSum(voIpLog, rtpLog, JsonProConfig.SESSIONS); //common_c2s_pkt_num - JsonParseUtil.setValue(voipLog, JsonProConfig.C2S_PKT_NUM, sumCommonC2sPktNum); + relationUtils.metricSum(voIpLog, rtpLog, JsonProConfig.C2S_PKT_NUM); //common_s2c_pkt_num - JsonParseUtil.setValue(voipLog, JsonProConfig.S2C_PKT_NUM, sumCommonS2cPktNum); + relationUtils.metricSum(voIpLog, rtpLog, JsonProConfig.S2C_PKT_NUM); //common_c2s_byte_num - JsonParseUtil.setValue(voipLog, JsonProConfig.C2S_BYTE_NUM, sumCommonC2sByteNum); + relationUtils.metricSum(voIpLog, rtpLog, JsonProConfig.C2S_BYTE_NUM); //common_s2c_byte_num - JsonParseUtil.setValue(voipLog, JsonProConfig.S2C_BYTE_NUM, sumCommonS2cByteNum); - + relationUtils.metricSum(voIpLog, rtpLog, JsonProConfig.S2C_BYTE_NUM); //common_c2s_ipfrag_num - JsonParseUtil.setValue(voipLog, JsonProConfig.C2S_IPFRAG_NUM, sumCommonC2sIpfragNum); + relationUtils.metricSum(voIpLog, rtpLog, JsonProConfig.C2S_IPFRAG_NUM); //common_s2c_ipfrag_num - JsonParseUtil.setValue(voipLog, JsonProConfig.S2C_IPFRAG_NUM, sumCommonS2cIpfragNum); - + relationUtils.metricSum(voIpLog, rtpLog, JsonProConfig.S2C_IPFRAG_NUM); //common_c2s_tcp_lostlen - JsonParseUtil.setValue(voipLog, JsonProConfig.C2S_TCP_LOSTLEN, sumCommonC2sTcpLostlen); + relationUtils.metricSum(voIpLog, rtpLog, JsonProConfig.C2S_TCP_LOSTLEN); //common_s2c_tcp_lostlen - JsonParseUtil.setValue(voipLog, JsonProConfig.S2C_TCP_LOSTLEN, sumCommonS2cTcpLostlen); - + relationUtils.metricSum(voIpLog, rtpLog, JsonProConfig.S2C_TCP_LOSTLEN); //common_c2s_tcp_unorder_num - JsonParseUtil.setValue(voipLog, JsonProConfig.C2S_TCP_UNORDER_NUM, sumCommonC2sTcpUnorderNum); + relationUtils.metricSum(voIpLog, rtpLog, JsonProConfig.C2S_TCP_UNORDER_NUM); //common_s2c_tcp_unorder_num - JsonParseUtil.setValue(voipLog, JsonProConfig.S2C_TCP_UNORDER_NUM, sumCommonS2cTcpUnorderNum); - + relationUtils.metricSum(voIpLog, rtpLog, JsonProConfig.S2C_TCP_UNORDER_NUM); } /** * 定时处理中List元素数仅为1的情况 */ - private void sendDirectlyOneElement(String msg, Object voipLog, Collector output) { + private void sendDirectlyOneElement(String msg, Map voIpLog, Collector output) { //四元组,sip(一定为双侧)/rtp(可能为单侧也可能为双侧,看单向流字段信息),拿出来的原始数据 - String commonSchemaType = JsonParseUtil.getString(voipLog, JsonProConfig.SCHEMA_TYPE); + String commonSchemaType = JsonParseUtil.getString(voIpLog, JsonProConfig.SCHEMA_TYPE); if (JsonProConfig.SIP_MARK.equals(commonSchemaType)) { output.collect(msg); } else if (JsonProConfig.RTP_MARK.equals(commonSchemaType)) { - int commonStreamDir = JsonParseUtil.getInteger(voipLog, JsonProConfig.STREAM_DIR); + int commonStreamDir = JsonParseUtil.getInteger(voIpLog, JsonProConfig.STREAM_DIR); if (commonStreamDir != JsonProConfig.DOUBLE) { output.collect(msg); } else { @@ -278,14 +223,14 @@ public class SipCalibrationWindowFunction extends ProcessAllWindowFunction rtpLog, Map voIpLog) { String ip = JsonParseUtil.getString(rtpLog, JsonProConfig.CLIENT_IP); - String sipOriginatorIp = JsonParseUtil.getString(voipLog, JsonProConfig.SIP_ORIGINATOR_IP); - String sipResponderIp = JsonParseUtil.getString(voipLog, JsonProConfig.SIP_RESPONDER_IP); + String sipOriginatorIp = JsonParseUtil.getString(voIpLog, JsonProConfig.SIP_ORIGINATOR_IP); + String sipResponderIp = JsonParseUtil.getString(voIpLog, JsonProConfig.SIP_RESPONDER_IP); if (StringUtil.isNotBlank(ip) && ip.equals(sipOriginatorIp)) { return 1; @@ -308,19 +253,19 @@ public class SipCalibrationWindowFunction extends ProcessAllWindowFunction voIpLog, Map rtpLog) { int rtpPayloadTypeC2s = JsonParseUtil.getInteger(rtpLog, JsonProConfig.RTP_PAYLOAD_TYPE_C2S); int rtpPayloadTypeS2c = JsonParseUtil.getInteger(rtpLog, JsonProConfig.RTP_PAYLOAD_TYPE_S2C); String rtpPcapPath = JsonParseUtil.getString(rtpLog, JsonProConfig.RTP_PCAP_PATH); - JsonParseUtil.setValue(voipLog, JsonProConfig.RTP_PAYLOAD_TYPE_C2S, rtpPayloadTypeC2s); - JsonParseUtil.setValue(voipLog, JsonProConfig.RTP_PAYLOAD_TYPE_S2C, rtpPayloadTypeS2c); - JsonParseUtil.setValue(voipLog, JsonProConfig.RTP_PCAP_PATH, rtpPcapPath); + JsonParseUtil.setValue(voIpLog, JsonProConfig.RTP_PAYLOAD_TYPE_C2S, rtpPayloadTypeC2s); + JsonParseUtil.setValue(voIpLog, JsonProConfig.RTP_PAYLOAD_TYPE_S2C, rtpPayloadTypeS2c); + JsonParseUtil.setValue(voIpLog, JsonProConfig.RTP_PCAP_PATH, rtpPcapPath); - return JSONObject.toJSONString(voipLog); + return JsonMapper.toJsonString(voIpLog); } } diff --git a/src/main/java/com/zdjizhi/utils/functions/relationUtils.java b/src/main/java/com/zdjizhi/utils/functions/relationUtils.java new file mode 100644 index 0000000..c74170d --- /dev/null +++ b/src/main/java/com/zdjizhi/utils/functions/relationUtils.java @@ -0,0 +1,81 @@ +package com.zdjizhi.utils.functions; + +import com.zdjizhi.common.JsonProConfig; +import com.zdjizhi.common.VoipRelationConfig; +import com.zdjizhi.utils.IPUtil; +import com.zdjizhi.utils.StringUtil; +import com.zdjizhi.utils.json.JsonParseUtil; + +import java.util.Map; + +/** + * @author qidaijie + * @Package com.zdjizhi.utils.functions + * @Description: + * @date 2022/4/1911:30 + */ +class relationUtils { + + + /** + * 将A日志内的某指标数据与B日志进行累加计算,并写入A日志中 + * + * @param firstLog A日志 + * @param secondLog B日志 + * @param key 指标 json key + */ + static void metricSum(Map firstLog, Map secondLog, String key) { + Long firstMetric = JsonParseUtil.getLong(firstLog, key); + Long secondMetric = JsonParseUtil.getLong(secondLog, key); + + Long sum = firstMetric + secondMetric; + + JsonParseUtil.setValue(firstLog, key, sum); + } + + + /** + * 将A日志内的某指标数据与B日志进行累加计算,并写入C日志中 + * + * @param firstLog A日志 + * @param secondLog B日志 + * @param otherLog C日志 + * @param key 指标 json key + */ + static void metricSumSetOtherLog(Map firstLog, Map secondLog, Map otherLog, String key) { + Long firstMetric = JsonParseUtil.getLong(firstLog, key); + Long secondMetric = JsonParseUtil.getLong(secondLog, key); + + Long sum = firstMetric + secondMetric; + + JsonParseUtil.setValue(otherLog, key, sum); + } + + + /** + * 校验sip日志,必须包含协商四元组,否则将原样输出不处理 + * + * @param sipLog SIP日志 + * @return true or false + */ + static boolean checkSipCompleteness(Map sipLog) { + return sipLog.containsKey(JsonProConfig.SIP_ORIGINATOR_IP) && + sipLog.containsKey(JsonProConfig.SIP_ORIGINATOR_PORT) && + sipLog.containsKey(JsonProConfig.SIP_RESPONDER_IP) && + sipLog.containsKey(JsonProConfig.SIP_RESPONDER_PORT); + } + + /** + * 是否为内网IP + * + * @param ip ip Address + * @return true or false + */ + static boolean isInnerIp(String ip) { + if (VoipRelationConfig.CHECK_INNER_NETWORK == 1) { + //为空或者为特定IP时也算作内网IP + return StringUtil.isBlank(ip) || IPUtil.internalIp(ip); + } + return false; + } +} diff --git a/src/main/java/com/zdjizhi/utils/ip/IPUtils.java b/src/main/java/com/zdjizhi/utils/ip/IPUtils.java deleted file mode 100644 index 887d9ba..0000000 --- a/src/main/java/com/zdjizhi/utils/ip/IPUtils.java +++ /dev/null @@ -1,101 +0,0 @@ -package com.zdjizhi.utils.ip; - -import cn.hutool.log.Log; -import cn.hutool.log.LogFactory; -import com.zdjizhi.common.VoipRelationConfig; -import com.zdjizhi.utils.IPUtil; -import com.zdjizhi.utils.StringUtil; -import com.zdjizhi.utils.exception.VoipRelationException; - -/** - * IP转换工具类 - * - * @author Colbert - * @date 2021/03/16 - */ -public class IPUtils { - private static final Log logger = LogFactory.get(); - - private static final long A_BEGIN = ipToLong("10.0.0.0"); - private static final long A_END = ipToLong("10.255.255.255"); - private static final long B_BEGIN = ipToLong("172.16.0.0"); - private static final long B_END = ipToLong("172.31.255.255"); - private static final long C_BEGIN = ipToLong("192.168.0.0"); - private static final long C_END = ipToLong("192.168.255.255"); - - /** - * 将127.0.0.1形式的IP地址转换成十进制整数 - * - * @param strIp - * @return - */ - public static long ipToLong(String strIp) { - try { - if (StringUtil.isBlank(strIp)) { - logger.error("IPUtils.ipToLong input IP is null!!!"); - return 0L; - } - long[] ip = new long[4]; - int position1 = strIp.indexOf("."); - int position2 = strIp.indexOf(".", position1 + 1); - int position3 = strIp.indexOf(".", position2 + 1); - - ip[0] = Long.parseLong(strIp.substring(0, position1)); - ip[1] = Long.parseLong(strIp.substring(position1 + 1, position2)); - ip[2] = Long.parseLong(strIp.substring(position2 + 1, position3)); - ip[3] = Long.parseLong(strIp.substring(position3 + 1)); - return (ip[0] << 24) + (ip[1] << 16) + (ip[2] << 8) + ip[3]; - } catch (VoipRelationException e) { - logger.error("IPUtils.ipToLong input IP is:" + strIp + ",convert IP to Long is error:" + e.getMessage()); - return 0L; - } - - } - - /** - * 将十进制整数形式转换成127.0.0.1形式的ip地址 - * - * @param longIp - * @return - */ - public static String longToIp(long longIp) { - StringBuffer sb = new StringBuffer(""); - sb.append(String.valueOf((longIp >>> 24))); - sb.append("."); - sb.append(String.valueOf((longIp & 0x00FFFFFF) >>> 16)); - sb.append("."); - sb.append(String.valueOf((longIp & 0x0000FFFF) >>> 8)); - sb.append("."); - sb.append(String.valueOf((longIp & 0x000000FF))); - return sb.toString(); - } - - - /** - * 是否为内网IP - * - * @param ipAddress - * @return - */ - public static boolean isInnerIp(String ipAddress) { - if (VoipRelationConfig.CHECK_INNER_NETWORK == 1) { - if (StringUtil.isBlank(ipAddress) || IPUtil.internalIp(ipAddress)) { - //为空或者为特定IP时也算作内网IP - return true; - } - - long ipNum = ipToLong(ipAddress); - - return isInner(ipNum, A_BEGIN, A_END) || - isInner(ipNum, B_BEGIN, B_END) || - isInner(ipNum, C_BEGIN, C_END); - } else { - return false; - } - } - - private static boolean isInner(long userIp, long begin, long end) { - return (userIp >= begin) && (userIp <= end); - } - -} \ No newline at end of file diff --git a/src/main/java/com/zdjizhi/utils/json/JsonParseUtil.java b/src/main/java/com/zdjizhi/utils/json/JsonParseUtil.java index 2a3194f..2bfd1f2 100644 --- a/src/main/java/com/zdjizhi/utils/json/JsonParseUtil.java +++ b/src/main/java/com/zdjizhi/utils/json/JsonParseUtil.java @@ -6,15 +6,21 @@ import cn.hutool.log.LogFactory; import com.alibaba.fastjson.JSON; import com.alibaba.fastjson.JSONArray; import com.alibaba.fastjson.JSONObject; -import com.jayway.jsonpath.DocumentContext; +import com.alibaba.nacos.api.NacosFactory; +import com.alibaba.nacos.api.PropertyKeyConst; +import com.alibaba.nacos.api.config.listener.Listener; +import com.alibaba.nacos.api.config.ConfigService; + +import com.alibaba.nacos.api.exception.NacosException; import com.jayway.jsonpath.JsonPath; import com.zdjizhi.common.VoipRelationConfig; import com.zdjizhi.utils.JsonMapper; -import com.zdjizhi.utils.http.HttpClientUtil; +import com.zdjizhi.utils.StringUtil; import net.sf.cglib.beans.BeanGenerator; import net.sf.cglib.beans.BeanMap; import java.util.*; +import java.util.concurrent.Executor; /** * 使用FastJson解析json的工具类 @@ -24,6 +30,50 @@ import java.util.*; public class JsonParseUtil { private static final Log logger = LogFactory.get(); + private static Properties propNacos = new Properties(); + + /** + * 获取需要删除字段的列表 + */ + private static ArrayList dropList = new ArrayList<>(); + + + /** + * 在内存中加载反射类用的map + */ + private static HashMap jsonFieldsMap; + + + static { + propNacos.setProperty(PropertyKeyConst.SERVER_ADDR, VoipRelationConfig.NACOS_SERVER); + propNacos.setProperty(PropertyKeyConst.NAMESPACE, VoipRelationConfig.NACOS_SCHEMA_NAMESPACE); + propNacos.setProperty(PropertyKeyConst.USERNAME, VoipRelationConfig.NACOS_USERNAME); + propNacos.setProperty(PropertyKeyConst.PASSWORD, VoipRelationConfig.NACOS_PIN); + try { + ConfigService configService = NacosFactory.createConfigService(propNacos); + String dataId = VoipRelationConfig.NACOS_DATA_ID; + String group = VoipRelationConfig.NACOS_GROUP; + String schema = configService.getConfig(dataId, group, 5000); + if (StringUtil.isNotBlank(schema)) { + jsonFieldsMap = getFieldsFromSchema(schema); + } + configService.addListener(dataId, group, new Listener() { + @Override + public Executor getExecutor() { + return null; + } + + @Override + public void receiveConfigInfo(String configMsg) { + if (StringUtil.isNotBlank(configMsg)) { + jsonFieldsMap = getFieldsFromSchema(configMsg); + } + } + }); + } catch (NacosException e) { + logger.error("Get Schema config from Nacos error,The exception message is :" + e.getMessage()); + } + } /** * 模式匹配,给定一个类型字符串返回一个类类型 @@ -72,6 +122,44 @@ public class JsonParseUtil { return clazz; } + /** + * 类型转换 + * + * @param jsonMap 原始日志map + */ + public static Map typeTransform(Map jsonMap) throws RuntimeException { + JsonParseUtil.dropJsonField(jsonMap); + HashMap tmpMap = new HashMap<>(192); + for (String key : jsonMap.keySet()) { + if (jsonFieldsMap.containsKey(key)) { + String simpleName = jsonFieldsMap.get(key).getSimpleName(); + switch (simpleName) { + case "String": + tmpMap.put(key, JsonTypeUtil.checkString(jsonMap.get(key))); + break; + case "Integer": + tmpMap.put(key, JsonTypeUtil.getIntValue(jsonMap.get(key))); + break; + case "long": + tmpMap.put(key, JsonTypeUtil.checkLongValue(jsonMap.get(key))); + break; + case "List": + tmpMap.put(key, JsonTypeUtil.checkArray(jsonMap.get(key))); + break; + case "Map": + tmpMap.put(key, JsonTypeUtil.checkObject(jsonMap.get(key))); + break; + case "double": + tmpMap.put(key, JsonTypeUtil.checkDouble(jsonMap.get(key))); + break; + default: + tmpMap.put(key, JsonTypeUtil.checkString(jsonMap.get(key))); + } + } + } + return tmpMap; + } + /** * 获取属性值的方法 @@ -155,7 +243,7 @@ public class JsonParseUtil { if (intVal == null) { return 0; } - return intVal.intValue(); + return intVal; } /** @@ -170,7 +258,7 @@ public class JsonParseUtil { if (intVal == null) { return 0; } - return intVal.intValue(); + return intVal; } public static String getString(Map jsonMap, String property) { @@ -263,18 +351,16 @@ public class JsonParseUtil { /** * 通过获取String类型的网关schema链接来获取map,用于生成一个Object类型的对象 + *

+ * // * @param http 网关schema地址 * - * @param http 网关schema地址 * @return 用于反射生成schema类型的对象的一个map集合 */ - public static HashMap getMapFromHttp(String http) { - HashMap map = new HashMap<>(); - - String schema = HttpClientUtil.requestByGetMethod(http); - Object data = JSON.parseObject(schema).get("data"); + private static HashMap getFieldsFromSchema(String schema) { + HashMap map = new HashMap<>(16); //获取fields,并转化为数组,数组的每个元素都是一个name doc type - JSONObject schemaJson = JSON.parseObject(data.toString()); + JSONObject schemaJson = JSON.parseObject(schema); JSONArray fields = (JSONArray) schemaJson.get("fields"); for (Object field : fields) { @@ -282,13 +368,19 @@ public class JsonParseUtil { if (checkKeepField(filedStr)) { String name = JsonPath.read(filedStr, "$.name").toString(); String type = JsonPath.read(filedStr, "$.type").toString(); + if (type.contains("{")) { + type = JsonPath.read(filedStr, "$.type.type").toString(); + } //组合用来生成实体类的map map.put(name, getClassName(type)); + } else { + dropList.add(filedStr); } } return map; } + /** * 判断字段是否需要保留 * @@ -310,4 +402,14 @@ public class JsonParseUtil { return isKeepField; } + /** + * 删除schema内指定的无效字段(jackson) + * + * @param jsonMap + */ + private static void dropJsonField(Map jsonMap) { + for (String field : dropList) { + jsonMap.remove(field); + } + } } \ No newline at end of file diff --git a/src/main/java/com/zdjizhi/utils/json/JsonTypeUtil.java b/src/main/java/com/zdjizhi/utils/json/JsonTypeUtil.java new file mode 100644 index 0000000..48e4620 --- /dev/null +++ b/src/main/java/com/zdjizhi/utils/json/JsonTypeUtil.java @@ -0,0 +1,142 @@ +package com.zdjizhi.utils.json; + +import cn.hutool.log.Log; +import cn.hutool.log.LogFactory; +import com.zdjizhi.utils.JsonMapper; +import com.zdjizhi.utils.exception.VoipRelationException; + +import java.util.List; +import java.util.Map; + +/** + * @author qidaijie + * @Package PACKAGE_NAME + * @Description: + * @date 2021/7/1217:34 + */ +class JsonTypeUtil { + private static final Log logger = LogFactory.get(); + /** + * String 类型检验转换方法 + * + * @param value json value + * @return String value + */ + static String checkString(Object value) { + if (value == null) { + return null; + } + + if (value instanceof Map) { + return JsonMapper.toJsonString(value); + } + + if (value instanceof List) { + return JsonMapper.toJsonString(value); + } + + return value.toString(); + } + + /** + * array 类型检验转换方法 + * + * @param value json value + * @return List value + */ + static Map checkObject(Object value) { + if (value == null) { + return null; + } + + if (value instanceof Map) { + return (Map) value; + } + + throw new VoipRelationException("can not cast to map, value : " + value); + } + + /** + * array 类型检验转换方法 + * + * @param value json value + * @return List value + */ + static List checkArray(Object value) { + if (value == null) { + return null; + } + + if (value instanceof List) { + return (List) value; + } + + throw new VoipRelationException("can not cast to List, value : " + value); + } + + private static Long checkLong(Object value) { + if (value == null) { + return null; + } + + return TypeUtils.castToLong(value); + } + + /** + * long 类型检验转换方法,若为空返回基础值 + * + * @param value json value + * @return Long value + */ + static long checkLongValue(Object value) { + + Long longVal = TypeUtils.castToLong(value); + + if (longVal == null) { + return 0L; + } + + return longVal; + } + + /** + * Double 类型校验转换方法 + * + * @param value json value + * @return Double value + */ + static Double checkDouble(Object value) { + if (value == null) { + return null; + } + + return TypeUtils.castToDouble(value); + } + + + private static Integer checkInt(Object value) { + if (value == null) { + return null; + } + + return TypeUtils.castToInt(value); + } + + + /** + * int 类型检验转换方法,若为空返回基础值 + * + * @param value json value + * @return int value + */ + static int getIntValue(Object value) { + + Integer intVal = TypeUtils.castToInt(value); + + if (intVal == null) { + return 0; + } + return intVal; + } + +} diff --git a/src/main/java/com/zdjizhi/utils/json/JsonTypeUtils.java b/src/main/java/com/zdjizhi/utils/json/JsonTypeUtils.java deleted file mode 100644 index 8562679..0000000 --- a/src/main/java/com/zdjizhi/utils/json/JsonTypeUtils.java +++ /dev/null @@ -1,142 +0,0 @@ -package com.zdjizhi.utils.json; - -import cn.hutool.log.Log; -import cn.hutool.log.LogFactory; -import com.zdjizhi.utils.JsonMapper; -import com.zdjizhi.utils.exception.VoipRelationException; - -import java.util.List; -import java.util.Map; - -/** - * @author qidaijie - * @Package PACKAGE_NAME - * @Description: - * @date 2021/7/1217:34 - */ -public class JsonTypeUtils { - private static final Log logger = LogFactory.get(); - /** - * String 类型检验转换方法 - * - * @param value json value - * @return String value - */ - public static String checkString(Object value) { - if (value == null) { - return null; - } - - if (value instanceof Map) { - return JsonMapper.toJsonString(value); - } - - if (value instanceof List) { - return JsonMapper.toJsonString(value); - } - - return value.toString(); - } - - /** - * array 类型检验转换方法 - * - * @param value json value - * @return List value - */ - private static Map checkObject(Object value) { - if (value == null) { - return null; - } - - if (value instanceof Map) { - return (Map) value; - } - - throw new VoipRelationException("can not cast to map, value : " + value); - } - - /** - * array 类型检验转换方法 - * - * @param value json value - * @return List value - */ - private static List checkArray(Object value) { - if (value == null) { - return null; - } - - if (value instanceof List) { - return (List) value; - } - - throw new VoipRelationException("can not cast to List, value : " + value); - } - - private static Long checkLong(Object value) { - if (value == null) { - return null; - } - - return TypeUtils.castToLong(value); - } - - /** - * long 类型检验转换方法,若为空返回基础值 - * - * @param value json value - * @return Long value - */ - public static long checkLongValue(Object value) { - - Long longVal = TypeUtils.castToLong(value); - - if (longVal == null) { - return 0L; - } - - return longVal; - } - - /** - * Double 类型校验转换方法 - * - * @param value json value - * @return Double value - */ - private static Double checkDouble(Object value) { - if (value == null) { - return null; - } - - return TypeUtils.castToDouble(value); - } - - - private static Integer checkInt(Object value) { - if (value == null) { - return null; - } - - return TypeUtils.castToInt(value); - } - - - /** - * int 类型检验转换方法,若为空返回基础值 - * - * @param value json value - * @return int value - */ - private static int getIntValue(Object value) { - - Integer intVal = TypeUtils.castToInt(value); - - if (intVal == null) { - return 0; - } - return intVal; - } - -} diff --git a/src/main/java/com/zdjizhi/utils/kafka/KafkaProducer.java b/src/main/java/com/zdjizhi/utils/kafka/KafkaProducer.java index 65e253a..2660c12 100644 --- a/src/main/java/com/zdjizhi/utils/kafka/KafkaProducer.java +++ b/src/main/java/com/zdjizhi/utils/kafka/KafkaProducer.java @@ -40,7 +40,7 @@ public class KafkaProducer { createProducerConfig(), Optional.empty()); //启用此选项将使生产者仅记录失败日志而不是捕获和重新抛出它们 - kafkaProducer.setLogFailuresOnly(false); + kafkaProducer.setLogFailuresOnly(true); //写入kafka的消息携带时间戳 // kafkaProducer.setWriteTimestampToKafka(true); diff --git a/src/test/java/com/zdjizhi/EncryptorTest.java b/src/test/java/com/zdjizhi/EncryptorTest.java new file mode 100644 index 0000000..170086c --- /dev/null +++ b/src/test/java/com/zdjizhi/EncryptorTest.java @@ -0,0 +1,35 @@ +package com.zdjizhi; + +import org.jasypt.encryption.pbe.StandardPBEStringEncryptor; +import org.junit.Test; + +/** + * @author qidaijie + * @Package com.zdjizhi + * @Description: + * @date 2022/3/1610:55 + */ +public class EncryptorTest { + + + @Test + public void passwordTest(){ + StandardPBEStringEncryptor encryptor = new StandardPBEStringEncryptor(); + // 配置加密解密的密码/salt值 + encryptor.setPassword("galaxy"); + // 对"raw_password"进行加密:S5kR+Y7CI8k7MaecZpde25yK8NKUnd6p + String pin = "galaxy2019"; + String encPin = encryptor.encrypt(pin); + String user = "admin"; + String encUser = encryptor.encrypt(user); + System.out.println(encPin); + System.out.println(encUser); + // 再进行解密:raw_password + String rawPwd = encryptor.decrypt("6MleDyA3Z73HSaXiKsDJ2k7Ys8YWLhEJ"); + String rawUser = encryptor.decrypt("nsyGpHKGFA4KW0zro9MDdw=="); + + System.out.println("The username is: "+rawPwd); + System.out.println("The pin is: "+rawUser); + } + +} diff --git a/src/test/java/com/zdjizhi/FunctionTest.java b/src/test/java/com/zdjizhi/FunctionTest.java new file mode 100644 index 0000000..8403c45 --- /dev/null +++ b/src/test/java/com/zdjizhi/FunctionTest.java @@ -0,0 +1,40 @@ +package com.zdjizhi; + +import com.zdjizhi.common.VoipRelationConfig; +import com.zdjizhi.utils.IpLookupV2; +import org.junit.Test; + +import java.util.Calendar; + +/** + * @author qidaijie + * @Package com.zdjizhi + * @Description: + * @date 2021/11/611:38 + */ +public class FunctionTest { + + private static IpLookupV2 ipLookup = new IpLookupV2.Builder(false) + .loadDataFileV4(VoipRelationConfig.TOOLS_LIBRARY + "ip_v4_built_in.mmdb") + .loadDataFileV6(VoipRelationConfig.TOOLS_LIBRARY + "ip_v6_built_in.mmdb") + .loadDataFilePrivateV4(VoipRelationConfig.TOOLS_LIBRARY + "ip_v4_user_defined.mmdb") + .loadDataFilePrivateV6(VoipRelationConfig.TOOLS_LIBRARY + "ip_v6_user_defined.mmdb") + .loadAsnDataFile(VoipRelationConfig.TOOLS_LIBRARY + "asn_v4.mmdb") + .loadAsnDataFileV6(VoipRelationConfig.TOOLS_LIBRARY + "asn_v6.mmdb") + .build(); + + @Test + public void ipLookupTest() { + String ip = "61.144.36.144"; + System.out.println(ipLookup.cityLookupDetail(ip)); + System.out.println(ipLookup.countryLookup(ip)); + } + + @Test + public void timestampTest(){ + Calendar cal = Calendar.getInstance(); + Long utcTime=cal.getTimeInMillis(); + System.out.println(utcTime); + System.out.println(System.currentTimeMillis()); + } +} diff --git a/src/test/java/com/zdjizhi/json/JsonPathTest.java b/src/test/java/com/zdjizhi/json/JsonPathTest.java new file mode 100644 index 0000000..46f6f85 --- /dev/null +++ b/src/test/java/com/zdjizhi/json/JsonPathTest.java @@ -0,0 +1,79 @@ +package com.zdjizhi.json; + +import cn.hutool.log.Log; +import cn.hutool.log.LogFactory; +import com.alibaba.nacos.api.NacosFactory; +import com.alibaba.nacos.api.PropertyKeyConst; +import com.alibaba.nacos.api.config.ConfigService; +import com.alibaba.nacos.api.config.listener.Listener; +import com.alibaba.nacos.api.exception.NacosException; +import com.jayway.jsonpath.DocumentContext; +import com.jayway.jsonpath.JsonPath; +import com.zdjizhi.common.VoipRelationConfig; +import com.zdjizhi.utils.StringUtil; +import org.junit.Test; + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Properties; +import java.util.concurrent.Executor; + +/** + * @author qidaijie + * @Package com.zdjizhi.json + * @Description: + * @date 2022/3/2410:22 + */ +public class JsonPathTest { + private static final Log logger = LogFactory.get(); + + private static Properties propNacos = new Properties(); + + /** + * 获取需要删除字段的列表 + */ + private static ArrayList dropList = new ArrayList<>(); + + /** + * 在内存中加载反射类用的map + */ + private static HashMap map; + + /** + * 获取任务列表 + * list的每个元素是一个四元字符串数组 (有format标识的字段,补全的字段,用到的功能函数,用到的参数),例如: + * (mail_subject mail_subject decode_of_base64 mail_subject_charset) + */ + private static ArrayList jobList; + + private static String schema; + + static { + propNacos.setProperty(PropertyKeyConst.SERVER_ADDR, VoipRelationConfig.NACOS_SERVER); + propNacos.setProperty(PropertyKeyConst.NAMESPACE, VoipRelationConfig.NACOS_SCHEMA_NAMESPACE); + propNacos.setProperty(PropertyKeyConst.USERNAME, VoipRelationConfig.NACOS_USERNAME); + propNacos.setProperty(PropertyKeyConst.PASSWORD, VoipRelationConfig.NACOS_PIN); + try { + ConfigService configService = NacosFactory.createConfigService(propNacos); + String dataId = VoipRelationConfig.NACOS_DATA_ID; + String group = VoipRelationConfig.NACOS_GROUP; + String config = configService.getConfig(dataId, group, 5000); + if (StringUtil.isNotBlank(config)) { + schema = config; + } + } catch (NacosException e) { + logger.error("Get Schema config from Nacos error,The exception message is :" + e.getMessage()); + } + } + + @Test + public void parseSchemaGetFields() { + DocumentContext parse = JsonPath.parse(schema); + List fields = parse.read("$.fields[*]"); + for (Object field : fields) { + String name = JsonPath.read(field, "$.name").toString(); + String type = JsonPath.read(field, "$.type").toString(); + } + } +} diff --git a/src/test/java/com/zdjizhi/nacos/NacosTest.java b/src/test/java/com/zdjizhi/nacos/NacosTest.java new file mode 100644 index 0000000..52b99e5 --- /dev/null +++ b/src/test/java/com/zdjizhi/nacos/NacosTest.java @@ -0,0 +1,100 @@ +package com.zdjizhi.nacos; + +import com.alibaba.nacos.api.NacosFactory; +import com.alibaba.nacos.api.PropertyKeyConst; +import com.alibaba.nacos.api.config.ConfigService; +import com.alibaba.nacos.api.config.listener.Listener; +import com.alibaba.nacos.api.exception.NacosException; +import org.junit.Test; + +import java.io.IOException; +import java.io.StringReader; +import java.util.Properties; +import java.util.concurrent.Executor; + + +/** + * @author qidaijie + * @Package com.zdjizhi + * @Description: + * @date 2022/3/1016:58 + */ +public class NacosTest { + + /** + * + * com.alibaba.nacos + * nacos-client + * 1.2.0 + * + */ + + private static Properties properties = new Properties(); + /** + * config data id = config name + */ + private static final String DATA_ID = "test"; + /** + * config group + */ + private static final String GROUP = "Galaxy"; + + private void getProperties() { + properties.setProperty(PropertyKeyConst.SERVER_ADDR, "192.168.44.12:8848"); + properties.setProperty(PropertyKeyConst.NAMESPACE, "flink"); + properties.setProperty(PropertyKeyConst.USERNAME, "nacos"); + properties.setProperty(PropertyKeyConst.PASSWORD, "nacos"); + } + + + @Test + public void GetConfigurationTest() { + try { + getProperties(); + ConfigService configService = NacosFactory.createConfigService(properties); + String content = configService.getConfig(DATA_ID, GROUP, 5000); + Properties nacosConfigMap = new Properties(); + nacosConfigMap.load(new StringReader(content)); + System.out.println(nacosConfigMap.getProperty("source.kafka.servers")); + } catch (NacosException | IOException e) { + // TODO Auto-generated catch block + e.printStackTrace(); + } + + } + + @Test + public void ListenerConfigurationTest() { + getProperties(); + try { + //first get config + ConfigService configService = NacosFactory.createConfigService(properties); + String config = configService.getConfig(DATA_ID, GROUP, 5000); + System.out.println(config); + + //start listenner + configService.addListener(DATA_ID, GROUP, new Listener() { + @Override + public Executor getExecutor() { + return null; + } + + @Override + public void receiveConfigInfo(String configMsg) { + System.out.println(configMsg); + } + }); + } catch (NacosException e) { + e.printStackTrace(); + } + + //keep running,change nacos config,print new config + while (true) { + try { + Thread.sleep(5000); + } catch (InterruptedException e) { + e.printStackTrace(); + } + } + } +} -- cgit v1.2.3