diff options
| author | qidaijie <[email protected]> | 2022-03-09 10:14:59 +0800 |
|---|---|---|
| committer | qidaijie <[email protected]> | 2022-03-09 10:14:59 +0800 |
| commit | 00cfc1a1138b2cc8bf05c3797a062d053d52c679 (patch) | |
| tree | 25ab6900b00f477df3ed1da07a1f70013936d282 /src | |
| parent | e370a0d3dc679e7f9d6d28ce40064b46871b647d (diff) | |
优化Kafka认证方式,删除配置项通过连接端口判断
Diffstat (limited to 'src')
| -rw-r--r-- | src/main/java/com/zdjizhi/common/VoipRelationConfig.java | 16 | ||||
| -rw-r--r-- | src/main/java/com/zdjizhi/topology/VoIpRelationTopology.java | 29 | ||||
| -rw-r--r-- | src/main/java/com/zdjizhi/utils/functions/KeyByFunction.java | 19 | ||||
| -rw-r--r-- | src/main/java/com/zdjizhi/utils/functions/OneSidedWindowFunction.java | 17 | ||||
| -rw-r--r-- | src/main/java/com/zdjizhi/utils/functions/SipCalibrationWindow.java | 327 | ||||
| -rw-r--r-- | src/main/java/com/zdjizhi/utils/functions/SipCalibrationWindowFunction.java | 29 | ||||
| -rw-r--r-- | src/main/java/com/zdjizhi/utils/ip/IPUtils.java | 21 | ||||
| -rw-r--r-- | src/main/java/com/zdjizhi/utils/kafka/CertUtils.java | 48 | ||||
| -rw-r--r-- | src/main/java/com/zdjizhi/utils/kafka/KafkaConsumer.java (renamed from src/main/java/com/zdjizhi/utils/kafka/Consumer.java) | 14 | ||||
| -rw-r--r-- | src/main/java/com/zdjizhi/utils/kafka/KafkaProducer.java (renamed from src/main/java/com/zdjizhi/utils/kafka/Producer.java) | 8 |
10 files changed, 101 insertions, 427 deletions
diff --git a/src/main/java/com/zdjizhi/common/VoipRelationConfig.java b/src/main/java/com/zdjizhi/common/VoipRelationConfig.java index abd41c7..1a2f78e 100644 --- a/src/main/java/com/zdjizhi/common/VoipRelationConfig.java +++ b/src/main/java/com/zdjizhi/common/VoipRelationConfig.java @@ -26,10 +26,10 @@ public class VoipRelationConfig { /** * connection kafka */ - public static final String INPUT_KAFKA_SERVERS = VoipRelationConfigurations.getStringProperty(0, "input.kafka.servers"); - public static final String OUTPUT_KAFKA_SERVERS = VoipRelationConfigurations.getStringProperty(0, "output.kafka.servers"); - public static final String INPUT_KAFKA_TOPIC = VoipRelationConfigurations.getStringProperty(0, "input.kafka.topic"); - public static final String OUTPUT_KAFKA_TOPIC = VoipRelationConfigurations.getStringProperty(0, "output.kafka.topic"); + public static final String SOURCE_KAFKA_SERVERS = VoipRelationConfigurations.getStringProperty(0, "source.kafka.servers"); + public static final String SINK_KAFKA_SERVERS = VoipRelationConfigurations.getStringProperty(0, "sink.kafka.servers"); + public static final String SOURCE_KAFKA_TOPIC = VoipRelationConfigurations.getStringProperty(0, "source.kafka.topic"); + public static final String SINK_KAFKA_TOPIC = VoipRelationConfigurations.getStringProperty(0, "sink.kafka.topic"); public static final String GROUP_ID = VoipRelationConfigurations.getStringProperty(0, "group.id"); public static final String PRODUCER_ACK = VoipRelationConfigurations.getStringProperty(0, "producer.ack"); public static final String PRODUCER_KAFKA_COMPRESSION_TYPE = VoipRelationConfigurations.getStringProperty(0, "producer.kafka.compression.type"); @@ -37,6 +37,13 @@ public class VoipRelationConfig { public static final String KAFKA_PIN = VoipRelationConfigurations.getStringProperty(1, "kafka.pin"); /** + * kafka source config + */ + public static final String SESSION_TIMEOUT_MS = VoipRelationConfigurations.getStringProperty(1, "session.timeout.ms"); + public static final String MAX_POLL_RECORDS = VoipRelationConfigurations.getStringProperty(1, "max.poll.records"); + public static final String MAX_PARTITION_FETCH_BYTES = VoipRelationConfigurations.getStringProperty(1, "max.partition.fetch.bytes"); + + /** * kafka sink */ public static final String RETRIES = VoipRelationConfigurations.getStringProperty(1, "retries"); @@ -58,5 +65,6 @@ public class VoipRelationConfig { * voip */ public static final Integer SEC_COMBINE_SR_CACHE_SECS = VoipRelationConfigurations.getIntProperty(0, "sec.combine.sr.cache.secs"); + public static final Integer CHECK_INNER_NETWORK = VoipRelationConfigurations.getIntProperty(1, "check.inner.network"); }
\ No newline at end of file diff --git a/src/main/java/com/zdjizhi/topology/VoIpRelationTopology.java b/src/main/java/com/zdjizhi/topology/VoIpRelationTopology.java index 37b2f18..80a80ad 100644 --- a/src/main/java/com/zdjizhi/topology/VoIpRelationTopology.java +++ b/src/main/java/com/zdjizhi/topology/VoIpRelationTopology.java @@ -4,17 +4,14 @@ import cn.hutool.log.Log; import cn.hutool.log.LogFactory; import com.zdjizhi.common.VoipRelationConfig; import com.zdjizhi.utils.functions.*; -import com.zdjizhi.utils.kafka.Consumer; -import com.zdjizhi.utils.kafka.Producer; +import com.zdjizhi.utils.kafka.KafkaConsumer; +import com.zdjizhi.utils.kafka.KafkaProducer; import org.apache.flink.api.java.tuple.Tuple3; import org.apache.flink.streaming.api.datastream.DataStream; -import org.apache.flink.streaming.api.datastream.KeyedStream; import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator; -import org.apache.flink.streaming.api.datastream.WindowedStream; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows; import org.apache.flink.streaming.api.windowing.time.Time; -import org.apache.flink.streaming.api.windowing.windows.TimeWindow; /** @@ -29,9 +26,7 @@ public class VoIpRelationTopology { public static void main(String[] args) { final StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment(); -// environment.enableCheckpointing(5000); - - DataStream<String> streamSource = environment.addSource(Consumer.getKafkaConsumer()); + DataStream<String> streamSource = environment.addSource(KafkaConsumer.getKafkaConsumer()); SingleOutputStreamOperator<Tuple3<String, String, String>> sipCorrelation = streamSource.windowAll(TumblingProcessingTimeWindows.of(Time.seconds(VoipRelationConfig.ONE_SIDED_WINDOW_TIME))) @@ -39,26 +34,14 @@ public class VoIpRelationTopology { SingleOutputStreamOperator<String> window = sipCorrelation.windowAll( TumblingProcessingTimeWindows.of(Time.seconds(VoipRelationConfig.VOIP_CALIBRATION_WINDOW_TIME))) - .process(new SipCalibrationWindow()).name("SipCalibrationWindow"); - - window.addSink(Producer.getKafkaProducer()).name("VoIpLogSinkKafka"); - -// KeyedStream<Tuple3<String, String, String>, String> keyedStream = sipCorrelation.keyBy(new KeyByFunction()); -// -// WindowedStream<Tuple3<String, String, String>, String, TimeWindow> window = -// keyedStream.window(TumblingProcessingTimeWindows.of(Time.seconds(VoipRelationConfig.VOIP_CALIBRATION_WINDOW_TIME))); -// -// SingleOutputStreamOperator<String> output = window.process(new SipCalibrationWindowFunction()) -// .name("SipCalibrationWindow").setParallelism(VoipRelationConfig.WINDOW_PARALLELISM); + .process(new SipCalibrationWindowFunction()).name("SipCalibrationWindowFunction"); -// output.addSink(Producer.getKafkaProducer()).name("VoIpLogSinkKafka"); + window.addSink(KafkaProducer.getKafkaProducer()).name("VoIpLogSinkKafka"); try { - environment.execute("VOIP-RELATION"); + environment.execute(args[0]); } catch (Exception e) { logger.error("This Flink task start ERROR! Exception information is :" + e); } - } - } diff --git a/src/main/java/com/zdjizhi/utils/functions/KeyByFunction.java b/src/main/java/com/zdjizhi/utils/functions/KeyByFunction.java deleted file mode 100644 index 0b00b3c..0000000 --- a/src/main/java/com/zdjizhi/utils/functions/KeyByFunction.java +++ /dev/null @@ -1,19 +0,0 @@ -package com.zdjizhi.utils.functions; - -import org.apache.flink.api.java.functions.KeySelector; -import org.apache.flink.api.java.tuple.Tuple3; - -/** - * @author qidaijie - * @Package com.zdjizhi.utils.functions - * @Description: - * @date 2021/7/2112:13 - */ -public class KeyByFunction implements KeySelector<Tuple3<String, String, String>, String> { - - @Override - public String getKey(Tuple3<String, String, String> value) throws Exception { - //以map拼接的key分组 - return value.f1; - } -} diff --git a/src/main/java/com/zdjizhi/utils/functions/OneSidedWindowFunction.java b/src/main/java/com/zdjizhi/utils/functions/OneSidedWindowFunction.java index 38e6bdd..061fa12 100644 --- a/src/main/java/com/zdjizhi/utils/functions/OneSidedWindowFunction.java +++ b/src/main/java/com/zdjizhi/utils/functions/OneSidedWindowFunction.java @@ -41,7 +41,6 @@ public class OneSidedWindowFunction extends ProcessAllWindowFunction<String, Tup for (String input : inputs) { if (StringUtil.isNotBlank(input)) { JSONObject object = JSONObject.parseObject(input); - String commonSchemaType = object.getString(JsonProConfig.SCHEMA_TYPE); String sipCallId = object.getString(JsonProConfig.SIP_CALL_ID); @@ -52,10 +51,14 @@ public class OneSidedWindowFunction extends ProcessAllWindowFunction<String, Tup * 针对SIP日志进行处理 */ if (JsonProConfig.SIP_MARK.equals(commonSchemaType) && StringUtil.isNotBlank(sipCallId)) { - if (commonStreamDir != JsonProConfig.DOUBLE) { - putKeyAndMsg(input, sipCallId, sipOriHmList, "SIP", out); + if (checkSipCompleteness(object)) { + if (commonStreamDir != JsonProConfig.DOUBLE) { + putKeyAndMsg(input, sipCallId, sipOriHmList, "SIP", out); + } else { + separateInnerIp(object, out); + } } else { - separateInnerIp(object, out); + out.collect(new Tuple3<>("", "violation", input)); } } @@ -364,4 +367,10 @@ public class OneSidedWindowFunction extends ProcessAllWindowFunction<String, Tup } } + private static boolean checkSipCompleteness(JSONObject object) { + return object.containsKey(JsonProConfig.SIP_ORIGINATOR_IP) && + object.containsKey(JsonProConfig.SIP_ORIGINATOR_PORT) && + object.containsKey(JsonProConfig.SIP_RESPONDER_IP) && + object.containsKey(JsonProConfig.SIP_RESPONDER_PORT); + } } diff --git a/src/main/java/com/zdjizhi/utils/functions/SipCalibrationWindow.java b/src/main/java/com/zdjizhi/utils/functions/SipCalibrationWindow.java deleted file mode 100644 index 29b55a1..0000000 --- a/src/main/java/com/zdjizhi/utils/functions/SipCalibrationWindow.java +++ /dev/null @@ -1,327 +0,0 @@ -package com.zdjizhi.utils.functions; - -import cn.hutool.log.Log; -import cn.hutool.log.LogFactory; -import com.alibaba.fastjson.JSONObject; -import com.zdjizhi.common.JsonProConfig; -import com.zdjizhi.common.VoipRelationConfig; -import com.zdjizhi.utils.StringUtil; -import com.zdjizhi.utils.json.JsonParseUtil; -import org.apache.flink.api.java.tuple.Tuple3; -import org.apache.flink.streaming.api.functions.windowing.ProcessAllWindowFunction; -import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction; -import org.apache.flink.streaming.api.windowing.windows.TimeWindow; -import org.apache.flink.util.Collector; - -import java.util.ArrayList; -import java.util.HashMap; -import java.util.LinkedList; -import java.util.List; - -/** - * @author qidaijie - * @Package com.zdjizhi.utils.functions - * @Description: - * @date 2021/7/2113:55 - */ -public class SipCalibrationWindow extends ProcessAllWindowFunction<Tuple3<String, String, String>, String, TimeWindow> { - private static final Log logger = LogFactory.get(); - - /** - * 实体类反射map - */ - private static HashMap<String, Class> classMap = JsonParseUtil.getMapFromHttp(VoipRelationConfig.SCHEMA_HTTP); - - /** - * 反射成一个类 - */ - private static Object voipObject = JsonParseUtil.generateObject(classMap); - - /** - * 关联用HashMap - * key---四元组 - * value---List存放对应SIP或者RTP数据 - * 存放数据:rtp-single,rtp-two,sip-two - * 不存放的数据:sip-single与sip-in - */ - private static HashMap<String, LinkedList<String>> combineSRHmList = new HashMap<>(16); - - /** - * 二次关联用HashMap - * key---四元组 - * value---List存放对应SIP或者RTP数据 - * 存放数据:rtp-single,rtp-two,sip-two - * 不存放的数据:sip-single与sip-in - */ - private static HashMap<String, LinkedList<String>> secCombineSRHmList = new HashMap<>(16); - - @Override - public void process(Context context, Iterable<Tuple3<String, String, String>> input, Collector<String> output) throws Exception { - for (Tuple3<String, String, String> tuple : input) { - //拼接的四元组 - String fourKey = tuple.f0; - //已关联的sip,rtp;未关联的sip,rtp;内网的sip - String type = tuple.f1; - String msg = tuple.f2; - switch (type) { - //单向流对准后的SIP - case "sip-two": - //单向流对准后的RTP - case "rtp-two": - //对不上的RTP - case "rtp-single": - putKeyAndMsg(msg, fourKey, combineSRHmList); - break; - //单向流的SIP - case "sip-single": - //内网的SIP - case "sip-in": - output.collect(msg); - break; - default: - logger.error("type is beyond expectation:" + type); - break; - - } - } - //初次关联 - tickCombineHmList(combineSRHmList, output); - //和缓存中的数据二次关联 - tickCombineHmList(secCombineSRHmList, output); - } - - - /** - * 定时关联,包括初次关联以及后续二次关联 - * - * @param combineHmList - */ - private void tickCombineHmList(HashMap<String, LinkedList<String>> combineHmList, Collector<String> output) { - if (combineHmList.size() > 0) { - long nowTime = System.currentTimeMillis() / 1000; - - HashMap<String, LinkedList<String>> tempCombineSRhmList = new HashMap<>(combineHmList); - combineHmList.clear(); - - for (String fourStrKey : tempCombineSRhmList.keySet()) { - - LinkedList<String> tempList = tempCombineSRhmList.get(fourStrKey); - //包含SIP和RTP - int listSize = tempList.size(); - System.out.println(listSize); - if (listSize > 1) { - - List<String> sipBeanArr = new ArrayList<>(); - List<String> rtpBeanArr = new ArrayList<>(); - - for (String message : tempList) { - Object tempSipOrRtpLog = JSONObject.parseObject(message, voipObject.getClass()); - String schemaType = JsonParseUtil.getString(tempSipOrRtpLog, JsonProConfig.SCHEMA_TYPE); - if (JsonProConfig.SIP_MARK.equals(schemaType)) { - sipBeanArr.add(message); - } else if (JsonProConfig.RTP_MARK.equals(schemaType)) { - rtpBeanArr.add(message); - } - } - int rtpSize = rtpBeanArr.size(); - int sipSize = sipBeanArr.size(); - - if (rtpSize == 1 && sipSize >= 1) { - for (String sipMessage : sipBeanArr) { - Object rtpLog = JSONObject.parseObject(rtpBeanArr.get(0), voipObject.getClass()); - Object voipLog = JSONObject.parseObject(sipMessage, voipObject.getClass()); - accumulateVoipMsg(voipLog, rtpLog); - JsonParseUtil.setValue(voipLog, JsonProConfig.SCHEMA_TYPE, "VoIP"); - JsonParseUtil.setValue(voipLog, JsonProConfig.RTP_ORIGINATOR_DIR, judgeDirection(rtpLog, voipLog)); - //四元组,voip,关联后的数据 - output.collect(mergeJson(voipLog, rtpLog)); -// basicOutputCollector.emit(new Values(fourStrKey, "voip", mergeJson(voipLog, rtpLog))); - } - } else if (sipSize == 1 && rtpSize >= 1) { - for (String rtpMessage : rtpBeanArr) { - Object rtpLog = JSONObject.parseObject(rtpMessage, voipObject.getClass()); - Object voipLog = JSONObject.parseObject(sipBeanArr.get(0), voipObject.getClass()); - accumulateVoipMsg(voipLog, rtpLog); - JsonParseUtil.setValue(voipLog, JsonProConfig.SCHEMA_TYPE, "VoIP"); - JsonParseUtil.setValue(voipLog, JsonProConfig.RTP_ORIGINATOR_DIR, judgeDirection(rtpLog, voipLog)); - //四元组,voip,关联后的数据 - output.collect(mergeJson(voipLog, rtpLog)); -// basicOutputCollector.emit(new Values(fourStrKey, "voip", mergeJson(voipLog, rtpLog))); - } - } else { - logger.warn("RTP-listSize is [" + rtpBeanArr.size() + "];SIP-listSize is [" + sipBeanArr.size() + "] in this tempCombineSRHmList! Not logical"); - sendErrorLogToKafka(sipBeanArr, output); - sendErrorLogToKafka(rtpBeanArr, output); - } - - } else { - String msg = tempList.get(0); - - Object voipLog = JSONObject.parseObject(msg, voipObject.getClass()); - long commonEndTime = JsonParseUtil.getLong(voipLog, JsonProConfig.END_TIME); - long intervalTime = nowTime - commonEndTime; - if (intervalTime <= VoipRelationConfig.SEC_COMBINE_SR_CACHE_SECS) { - putKeyAndMsg(msg, fourStrKey, secCombineSRHmList); - } else { - sendDirectlyOneElement(msg, voipLog, output); - } - - } - } - } - } - - /** - * 累加关联后的字节类参数值 - * - * @param voipLog - * @param rtpLog - */ - private void accumulateVoipMsg(Object voipLog, Object rtpLog) { - - Long sumCommonSessions = JsonParseUtil.getLong(voipLog, JsonProConfig.SESSIONS) - + JsonParseUtil.getLong(rtpLog, JsonProConfig.SESSIONS); - - Long sumCommonC2sPktNum = JsonParseUtil.getLong(voipLog, JsonProConfig.C2S_PKT_NUM) - + JsonParseUtil.getLong(rtpLog, JsonProConfig.C2S_PKT_NUM); - - Long sumCommonS2cPktNum = JsonParseUtil.getLong(voipLog, JsonProConfig.S2C_PKT_NUM) - + JsonParseUtil.getLong(rtpLog, JsonProConfig.S2C_PKT_NUM); - - Long sumCommonC2sByteNum = JsonParseUtil.getLong(voipLog, JsonProConfig.C2S_BYTE_NUM) - + JsonParseUtil.getLong(rtpLog, JsonProConfig.C2S_BYTE_NUM); - - Long sumCommonS2cByteNum = JsonParseUtil.getLong(voipLog, JsonProConfig.S2C_BYTE_NUM) - + JsonParseUtil.getLong(rtpLog, JsonProConfig.S2C_BYTE_NUM); - - Long sumCommonC2sIpfragNum = JsonParseUtil.getLong(voipLog, JsonProConfig.C2S_IPFRAG_NUM) - + JsonParseUtil.getLong(rtpLog, JsonProConfig.C2S_IPFRAG_NUM); - - Long sumCommonS2cIpfragNum = JsonParseUtil.getLong(voipLog, JsonProConfig.S2C_IPFRAG_NUM) - + JsonParseUtil.getLong(rtpLog, JsonProConfig.S2C_IPFRAG_NUM); - - Long sumCommonC2sTcpLostlen = JsonParseUtil.getLong(voipLog, JsonProConfig.C2S_TCP_LOSTLEN) - + JsonParseUtil.getLong(rtpLog, JsonProConfig.C2S_TCP_LOSTLEN); - - Long sumCommonS2cTcpLostlen = JsonParseUtil.getLong(voipLog, JsonProConfig.S2C_TCP_LOSTLEN) - + JsonParseUtil.getLong(rtpLog, JsonProConfig.S2C_TCP_LOSTLEN); - - Long sumCommonC2sTcpUnorderNum = JsonParseUtil.getLong(voipLog, JsonProConfig.C2S_TCP_UNORDER_NUM) - + JsonParseUtil.getLong(rtpLog, JsonProConfig.C2S_TCP_UNORDER_NUM); - - Long sumCommonS2cTcpUnorderNum = JsonParseUtil.getLong(voipLog, JsonProConfig.S2C_TCP_UNORDER_NUM) - + JsonParseUtil.getLong(rtpLog, JsonProConfig.S2C_TCP_UNORDER_NUM); - - //common_sessions - JsonParseUtil.setValue(voipLog, JsonProConfig.SESSIONS, sumCommonSessions); - - //common_c2s_pkt_num - JsonParseUtil.setValue(voipLog, JsonProConfig.C2S_PKT_NUM, sumCommonC2sPktNum); - //common_s2c_pkt_num - JsonParseUtil.setValue(voipLog, JsonProConfig.S2C_PKT_NUM, sumCommonS2cPktNum); - //common_c2s_byte_num - JsonParseUtil.setValue(voipLog, JsonProConfig.C2S_BYTE_NUM, sumCommonC2sByteNum); - //common_s2c_byte_num - JsonParseUtil.setValue(voipLog, JsonProConfig.S2C_BYTE_NUM, sumCommonS2cByteNum); - - //common_c2s_ipfrag_num - JsonParseUtil.setValue(voipLog, JsonProConfig.C2S_IPFRAG_NUM, sumCommonC2sIpfragNum); - //common_s2c_ipfrag_num - JsonParseUtil.setValue(voipLog, JsonProConfig.S2C_IPFRAG_NUM, sumCommonS2cIpfragNum); - - //common_c2s_tcp_lostlen - JsonParseUtil.setValue(voipLog, JsonProConfig.C2S_TCP_LOSTLEN, sumCommonC2sTcpLostlen); - //common_s2c_tcp_lostlen - JsonParseUtil.setValue(voipLog, JsonProConfig.S2C_TCP_LOSTLEN, sumCommonS2cTcpLostlen); - - //common_c2s_tcp_unorder_num - JsonParseUtil.setValue(voipLog, JsonProConfig.C2S_TCP_UNORDER_NUM, sumCommonC2sTcpUnorderNum); - //common_s2c_tcp_unorder_num - JsonParseUtil.setValue(voipLog, JsonProConfig.S2C_TCP_UNORDER_NUM, sumCommonS2cTcpUnorderNum); - - } - - /** - * 定时处理中List元素数仅为1的情况 - */ - private void sendDirectlyOneElement(String msg, Object voipLog, Collector<String> output) { - //四元组,sip(一定为双侧)/rtp(可能为单侧也可能为双侧,看单向流字段信息),拿出来的原始数据 - String commonSchemaType = JsonParseUtil.getString(voipLog, JsonProConfig.SCHEMA_TYPE); - if (JsonProConfig.SIP_MARK.equals(commonSchemaType)) { - output.collect(msg); - } else if (JsonProConfig.RTP_MARK.equals(commonSchemaType)) { - int commonStreamDir = JsonParseUtil.getInteger(voipLog, JsonProConfig.STREAM_DIR); - if (commonStreamDir != JsonProConfig.DOUBLE) { - output.collect(msg); - } else { - output.collect(msg); - } - } - } - - - /** - * 存放key并添加对应List - */ - private static void putKeyAndMsg(String message, String fourStrKey, HashMap<String, LinkedList<String>> combineSRHmList) { - if (combineSRHmList.containsKey(fourStrKey)) { - LinkedList<String> tmpList = combineSRHmList.get(fourStrKey); - tmpList.add(message); - combineSRHmList.put(fourStrKey, tmpList); - } else { - LinkedList<String> tmpList = new LinkedList<>(); - tmpList.add(message); - combineSRHmList.put(fourStrKey, tmpList); - } - } - - /** - * 判断RTP主叫方向-测试 - * - * @param rtpLog RTP原始日志 - * @param voipLog 融合后VOIP日志 - * @return 方向 0:未知 1:c2s 2:s2c - */ - private static int judgeDirection(Object rtpLog, Object voipLog) { - - String ip = JsonParseUtil.getString(rtpLog, JsonProConfig.CLIENT_IP); - String sipOriginatorIp = JsonParseUtil.getString(voipLog, JsonProConfig.SIP_ORIGINATOR_IP); - String sipResponderIp = JsonParseUtil.getString(voipLog, JsonProConfig.SIP_RESPONDER_IP); - - if (StringUtil.isNotBlank(ip) && ip.equals(sipOriginatorIp)) { - return 1; - } else if (StringUtil.isNotBlank(ip) && ip.equals(sipResponderIp)) { - return 2; - } - - return 0; - } - - /** - * 发送不符合逻辑的日志到kafka - */ - private static void sendErrorLogToKafka(List<String> logList, Collector<String> output) { - if (logList.size() > 0) { - for (String log : logList) { - output.collect(log); - } - } - } - - /** - * - */ - private static String mergeJson(Object voipLog, Object rtpLog) { - - int rtpPayloadTypeC2s = JsonParseUtil.getInteger(rtpLog, JsonProConfig.RTP_PAYLOAD_TYPE_C2S); - int rtpPayloadTypeS2c = JsonParseUtil.getInteger(rtpLog, JsonProConfig.RTP_PAYLOAD_TYPE_S2C); - String rtpPcapPath = JsonParseUtil.getString(rtpLog, JsonProConfig.RTP_PCAP_PATH); - - JsonParseUtil.setValue(voipLog, JsonProConfig.RTP_PAYLOAD_TYPE_C2S, rtpPayloadTypeC2s); - JsonParseUtil.setValue(voipLog, JsonProConfig.RTP_PAYLOAD_TYPE_S2C, rtpPayloadTypeS2c); - JsonParseUtil.setValue(voipLog, JsonProConfig.RTP_PCAP_PATH, rtpPcapPath); - - return JSONObject.toJSONString(voipLog); - } - -} diff --git a/src/main/java/com/zdjizhi/utils/functions/SipCalibrationWindowFunction.java b/src/main/java/com/zdjizhi/utils/functions/SipCalibrationWindowFunction.java index 2215d5b..3718775 100644 --- a/src/main/java/com/zdjizhi/utils/functions/SipCalibrationWindowFunction.java +++ b/src/main/java/com/zdjizhi/utils/functions/SipCalibrationWindowFunction.java @@ -8,11 +8,14 @@ import com.zdjizhi.common.VoipRelationConfig; import com.zdjizhi.utils.StringUtil; import com.zdjizhi.utils.json.JsonParseUtil; import org.apache.flink.api.java.tuple.Tuple3; -import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction; +import org.apache.flink.streaming.api.functions.windowing.ProcessAllWindowFunction; import org.apache.flink.streaming.api.windowing.windows.TimeWindow; import org.apache.flink.util.Collector; -import java.util.*; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.LinkedList; +import java.util.List; /** * @author qidaijie @@ -20,7 +23,7 @@ import java.util.*; * @Description: * @date 2021/7/2113:55 */ -public class SipCalibrationWindowFunction extends ProcessWindowFunction<Tuple3<String, String, String>, String, String, TimeWindow> { +public class SipCalibrationWindowFunction extends ProcessAllWindowFunction<Tuple3<String, String, String>, String, TimeWindow> { private static final Log logger = LogFactory.get(); /** @@ -52,8 +55,8 @@ public class SipCalibrationWindowFunction extends ProcessWindowFunction<Tuple3<S private static HashMap<String, LinkedList<String>> secCombineSRHmList = new HashMap<>(16); @Override - @SuppressWarnings("unchecked") - public void process(String key, Context context, Iterable<Tuple3<String, String, String>> input, Collector<String> output) throws Exception { + public void process(Context context, Iterable<Tuple3<String, String, String>> input, Collector<String> output) throws Exception { + logger.error("windowall窗口运行"); for (Tuple3<String, String, String> tuple : input) { //拼接的四元组 String fourKey = tuple.f0; @@ -63,16 +66,17 @@ public class SipCalibrationWindowFunction extends ProcessWindowFunction<Tuple3<S switch (type) { //单向流对准后的SIP case "sip-two": - //单向流对准后的RTP + //单向流对准后的RTP case "rtp-two": - //对不上的RTP + //对不上的RTP case "rtp-single": putKeyAndMsg(msg, fourKey, combineSRHmList); break; //单向流的SIP case "sip-single": - //内网的SIP + //内网的SIP case "sip-in": + case "violation": output.collect(msg); break; default: @@ -87,6 +91,7 @@ public class SipCalibrationWindowFunction extends ProcessWindowFunction<Tuple3<S tickCombineHmList(secCombineSRHmList, output); } + /** * 定时关联,包括初次关联以及后续二次关联 * @@ -104,7 +109,6 @@ public class SipCalibrationWindowFunction extends ProcessWindowFunction<Tuple3<S LinkedList<String> tempList = tempCombineSRhmList.get(fourStrKey); //包含SIP和RTP int listSize = tempList.size(); - System.out.println(listSize); if (listSize > 1) { List<String> sipBeanArr = new ArrayList<>(); @@ -156,12 +160,12 @@ public class SipCalibrationWindowFunction extends ProcessWindowFunction<Tuple3<S Object voipLog = JSONObject.parseObject(msg, voipObject.getClass()); long commonEndTime = JsonParseUtil.getLong(voipLog, JsonProConfig.END_TIME); long intervalTime = nowTime - commonEndTime; + logger.error("VoIP日志时间差值记录:" + intervalTime); if (intervalTime <= VoipRelationConfig.SEC_COMBINE_SR_CACHE_SECS) { putKeyAndMsg(msg, fourStrKey, secCombineSRHmList); } else { sendDirectlyOneElement(msg, voipLog, output); } - } } } @@ -309,8 +313,8 @@ public class SipCalibrationWindowFunction extends ProcessWindowFunction<Tuple3<S */ private static String mergeJson(Object voipLog, Object rtpLog) { - long rtpPayloadTypeC2s = JsonParseUtil.getLong(rtpLog, JsonProConfig.RTP_PAYLOAD_TYPE_C2S); - long rtpPayloadTypeS2c = JsonParseUtil.getLong(rtpLog, JsonProConfig.RTP_PAYLOAD_TYPE_S2C); + int rtpPayloadTypeC2s = JsonParseUtil.getInteger(rtpLog, JsonProConfig.RTP_PAYLOAD_TYPE_C2S); + int rtpPayloadTypeS2c = JsonParseUtil.getInteger(rtpLog, JsonProConfig.RTP_PAYLOAD_TYPE_S2C); String rtpPcapPath = JsonParseUtil.getString(rtpLog, JsonProConfig.RTP_PCAP_PATH); JsonParseUtil.setValue(voipLog, JsonProConfig.RTP_PAYLOAD_TYPE_C2S, rtpPayloadTypeC2s); @@ -320,5 +324,4 @@ public class SipCalibrationWindowFunction extends ProcessWindowFunction<Tuple3<S return JSONObject.toJSONString(voipLog); } - } diff --git a/src/main/java/com/zdjizhi/utils/ip/IPUtils.java b/src/main/java/com/zdjizhi/utils/ip/IPUtils.java index 1757194..887d9ba 100644 --- a/src/main/java/com/zdjizhi/utils/ip/IPUtils.java +++ b/src/main/java/com/zdjizhi/utils/ip/IPUtils.java @@ -2,6 +2,7 @@ package com.zdjizhi.utils.ip; import cn.hutool.log.Log; import cn.hutool.log.LogFactory; +import com.zdjizhi.common.VoipRelationConfig; import com.zdjizhi.utils.IPUtil; import com.zdjizhi.utils.StringUtil; import com.zdjizhi.utils.exception.VoipRelationException; @@ -77,16 +78,20 @@ public class IPUtils { * @return */ public static boolean isInnerIp(String ipAddress) { - if (StringUtil.isBlank(ipAddress) || IPUtil.internalIp(ipAddress)) { - //为空或者为特定IP时也算作内网IP - return true; - } + if (VoipRelationConfig.CHECK_INNER_NETWORK == 1) { + if (StringUtil.isBlank(ipAddress) || IPUtil.internalIp(ipAddress)) { + //为空或者为特定IP时也算作内网IP + return true; + } - boolean isInnerIp = false; - long ipNum = ipToLong(ipAddress); + long ipNum = ipToLong(ipAddress); - isInnerIp = isInner(ipNum, A_BEGIN, A_END) || isInner(ipNum, B_BEGIN, B_END) || isInner(ipNum, C_BEGIN, C_END); - return isInnerIp; + return isInner(ipNum, A_BEGIN, A_END) || + isInner(ipNum, B_BEGIN, B_END) || + isInner(ipNum, C_BEGIN, C_END); + } else { + return false; + } } private static boolean isInner(long userIp, long begin, long end) { diff --git a/src/main/java/com/zdjizhi/utils/kafka/CertUtils.java b/src/main/java/com/zdjizhi/utils/kafka/CertUtils.java index 83a39ad..b5273fa 100644 --- a/src/main/java/com/zdjizhi/utils/kafka/CertUtils.java +++ b/src/main/java/com/zdjizhi/utils/kafka/CertUtils.java @@ -12,24 +12,36 @@ import java.util.Properties; * @date 2021/9/610:37 */ class CertUtils { - static void chooseCert(String type, Properties properties) { - switch (type) { - case "SSL": - properties.put("security.protocol", "SSL"); - properties.put(SslConfigs.SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_CONFIG, ""); - properties.put("ssl.keystore.location", VoipRelationConfig.TOOLS_LIBRARY + "keystore.jks"); - properties.put("ssl.keystore.password", VoipRelationConfig.KAFKA_PIN); - properties.put("ssl.truststore.location", VoipRelationConfig.TOOLS_LIBRARY + "truststore.jks"); - properties.put("ssl.truststore.password", VoipRelationConfig.KAFKA_PIN); - properties.put("ssl.key.password", VoipRelationConfig.KAFKA_PIN); - break; - case "SASL": - properties.put("security.protocol", "SASL_PLAINTEXT"); - properties.put("sasl.mechanism", "PLAIN"); - properties.put("sasl.jaas.config", "org.apache.kafka.common.security.plain.PlainLoginModule required username=" - + VoipRelationConfig.KAFKA_USER + " password=" + VoipRelationConfig.KAFKA_PIN + ";"); - break; - default: + /** + * Kafka SASL认证端口 + */ + private static final String SASL_PORT = "9094"; + + /** + * Kafka SSL认证端口 + */ + private static final String SSL_PORT = "9095"; + + /** + * 根据连接信息端口判断认证方式。 + * + * @param servers kafka 连接信息 + * @param properties kafka 连接配置信息 + */ + static void chooseCert(String servers, Properties properties) { + if (servers.contains(SASL_PORT)) { + properties.put("security.protocol", "SASL_PLAINTEXT"); + properties.put("sasl.mechanism", "PLAIN"); + properties.put("sasl.jaas.config", "org.apache.kafka.common.security.plain.PlainLoginModule required username=" + + VoipRelationConfig.KAFKA_USER + " password=" + VoipRelationConfig.KAFKA_PIN + ";"); + } else if (servers.contains(SSL_PORT)) { + properties.put("security.protocol", "SSL"); + properties.put(SslConfigs.SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_CONFIG, ""); + properties.put("ssl.keystore.location", VoipRelationConfig.TOOLS_LIBRARY + "keystore.jks"); + properties.put("ssl.keystore.password", VoipRelationConfig.KAFKA_PIN); + properties.put("ssl.truststore.location", VoipRelationConfig.TOOLS_LIBRARY + "truststore.jks"); + properties.put("ssl.truststore.password", VoipRelationConfig.KAFKA_PIN); + properties.put("ssl.key.password", VoipRelationConfig.KAFKA_PIN); } } diff --git a/src/main/java/com/zdjizhi/utils/kafka/Consumer.java b/src/main/java/com/zdjizhi/utils/kafka/KafkaConsumer.java index 6dde52a..ada1477 100644 --- a/src/main/java/com/zdjizhi/utils/kafka/Consumer.java +++ b/src/main/java/com/zdjizhi/utils/kafka/KafkaConsumer.java @@ -13,23 +13,23 @@ import java.util.Properties; * @Description: * @date 2021/6/813:54 */ -public class Consumer { +public class KafkaConsumer { private static Properties createConsumerConfig() { Properties properties = new Properties(); - properties.put("bootstrap.servers", VoipRelationConfig.INPUT_KAFKA_SERVERS); + properties.put("bootstrap.servers", VoipRelationConfig.SOURCE_KAFKA_SERVERS); properties.put("group.id", VoipRelationConfig.GROUP_ID); - properties.put("session.timeout.ms", "60000"); - properties.put("max.poll.records", 3000); - properties.put("max.partition.fetch.bytes", 31457280); + properties.put("session.timeout.ms", VoipRelationConfig.SESSION_TIMEOUT_MS); + properties.put("max.poll.records", VoipRelationConfig.MAX_POLL_RECORDS); + properties.put("max.partition.fetch.bytes", VoipRelationConfig.MAX_PARTITION_FETCH_BYTES); properties.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); properties.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); - CertUtils.chooseCert(VoipRelationConfig.KAFKA_SOURCE_PROTOCOL,properties); + CertUtils.chooseCert(VoipRelationConfig.SOURCE_KAFKA_SERVERS, properties); return properties; } public static FlinkKafkaConsumer<String> getKafkaConsumer() { - FlinkKafkaConsumer<String> kafkaConsumer = new FlinkKafkaConsumer<>(VoipRelationConfig.INPUT_KAFKA_TOPIC, + FlinkKafkaConsumer<String> kafkaConsumer = new FlinkKafkaConsumer<>(VoipRelationConfig.SOURCE_KAFKA_TOPIC, new SimpleStringSchema(), createConsumerConfig()); kafkaConsumer.setCommitOffsetsOnCheckpoints(true); diff --git a/src/main/java/com/zdjizhi/utils/kafka/Producer.java b/src/main/java/com/zdjizhi/utils/kafka/KafkaProducer.java index bf63098..65e253a 100644 --- a/src/main/java/com/zdjizhi/utils/kafka/Producer.java +++ b/src/main/java/com/zdjizhi/utils/kafka/KafkaProducer.java @@ -13,11 +13,11 @@ import java.util.Properties; * @Description: * @date 2021/6/814:04 */ -public class Producer { +public class KafkaProducer { private static Properties createProducerConfig() { Properties properties = new Properties(); - properties.put("bootstrap.servers", VoipRelationConfig.OUTPUT_KAFKA_SERVERS); + properties.put("bootstrap.servers", VoipRelationConfig.SINK_KAFKA_SERVERS); properties.put("acks", VoipRelationConfig.PRODUCER_ACK); properties.put("retries", VoipRelationConfig.RETRIES); properties.put("linger.ms", VoipRelationConfig.LINGER_MS); @@ -27,7 +27,7 @@ public class Producer { properties.put("max.request.size", VoipRelationConfig.MAX_REQUEST_SIZE); properties.put("compression.type", VoipRelationConfig.PRODUCER_KAFKA_COMPRESSION_TYPE); - CertUtils.chooseCert(VoipRelationConfig.KAFKA_SINK_PROTOCOL, properties); + CertUtils.chooseCert(VoipRelationConfig.SINK_KAFKA_SERVERS, properties); return properties; } @@ -35,7 +35,7 @@ public class Producer { public static FlinkKafkaProducer<String> getKafkaProducer() { FlinkKafkaProducer<String> kafkaProducer = new FlinkKafkaProducer<String>( - VoipRelationConfig.OUTPUT_KAFKA_TOPIC, + VoipRelationConfig.SINK_KAFKA_TOPIC, new SimpleStringSchema(), createProducerConfig(), Optional.empty()); |
