author    qidaijie <[email protected]>  2022-03-09 10:14:59 +0800
committer qidaijie <[email protected]>  2022-03-09 10:14:59 +0800
commit    00cfc1a1138b2cc8bf05c3797a062d053d52c679 (patch)
tree      25ab6900b00f477df3ed1da07a1f70013936d282 /src
parent    e370a0d3dc679e7f9d6d28ce40064b46871b647d (diff)
Optimize Kafka authentication: remove the protocol config options and determine the auth mode from the connection port
Diffstat (limited to 'src')
-rw-r--r-- src/main/java/com/zdjizhi/common/VoipRelationConfig.java | 16
-rw-r--r-- src/main/java/com/zdjizhi/topology/VoIpRelationTopology.java | 29
-rw-r--r-- src/main/java/com/zdjizhi/utils/functions/KeyByFunction.java | 19
-rw-r--r-- src/main/java/com/zdjizhi/utils/functions/OneSidedWindowFunction.java | 17
-rw-r--r-- src/main/java/com/zdjizhi/utils/functions/SipCalibrationWindow.java | 327
-rw-r--r-- src/main/java/com/zdjizhi/utils/functions/SipCalibrationWindowFunction.java | 29
-rw-r--r-- src/main/java/com/zdjizhi/utils/ip/IPUtils.java | 21
-rw-r--r-- src/main/java/com/zdjizhi/utils/kafka/CertUtils.java | 48
-rw-r--r-- src/main/java/com/zdjizhi/utils/kafka/KafkaConsumer.java (renamed from src/main/java/com/zdjizhi/utils/kafka/Consumer.java) | 14
-rw-r--r-- src/main/java/com/zdjizhi/utils/kafka/KafkaProducer.java (renamed from src/main/java/com/zdjizhi/utils/kafka/Producer.java) | 8
10 files changed, 101 insertions(+), 427 deletions(-)
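
The headline change in this commit is in CertUtils.chooseCert (src/main/java/com/zdjizhi/utils/kafka/CertUtils.java below): the authentication mode is no longer read from a dedicated protocol config key but inferred from the port in the Kafka bootstrap string. A minimal standalone sketch of that selection rule, assuming the same contains()-based matching as the new hunk; the class name and hosts are placeholders, and 9094/9095 are the SASL/SSL ports hard-coded in CertUtils:

import java.util.Properties;

// Placeholder sketch of the port-based rule introduced by this commit;
// this is not the project's CertUtils, only an illustration of the idea.
public class AuthByPortSketch {
    private static final String SASL_PORT = "9094";   // SASL_PLAINTEXT listener
    private static final String SSL_PORT = "9095";    // SSL listener

    static void choose(String servers, Properties props) {
        if (servers.contains(SASL_PORT)) {
            props.put("security.protocol", "SASL_PLAINTEXT");
            props.put("sasl.mechanism", "PLAIN");
        } else if (servers.contains(SSL_PORT)) {
            props.put("security.protocol", "SSL");
        }
        // Any other port: nothing is set, so the Kafka client keeps its defaults (PLAINTEXT).
    }

    public static void main(String[] args) {
        Properties props = new Properties();
        choose("kafka-1:9094,kafka-2:9094", props);    // placeholder hosts
        System.out.println(props);                     // prints the SASL settings chosen for port 9094
    }
}
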
diff --git a/src/main/java/com/zdjizhi/common/VoipRelationConfig.java b/src/main/java/com/zdjizhi/common/VoipRelationConfig.java
index abd41c7..1a2f78e 100644
--- a/src/main/java/com/zdjizhi/common/VoipRelationConfig.java
+++ b/src/main/java/com/zdjizhi/common/VoipRelationConfig.java
@@ -26,10 +26,10 @@ public class VoipRelationConfig {
/**
* connection kafka
*/
- public static final String INPUT_KAFKA_SERVERS = VoipRelationConfigurations.getStringProperty(0, "input.kafka.servers");
- public static final String OUTPUT_KAFKA_SERVERS = VoipRelationConfigurations.getStringProperty(0, "output.kafka.servers");
- public static final String INPUT_KAFKA_TOPIC = VoipRelationConfigurations.getStringProperty(0, "input.kafka.topic");
- public static final String OUTPUT_KAFKA_TOPIC = VoipRelationConfigurations.getStringProperty(0, "output.kafka.topic");
+ public static final String SOURCE_KAFKA_SERVERS = VoipRelationConfigurations.getStringProperty(0, "source.kafka.servers");
+ public static final String SINK_KAFKA_SERVERS = VoipRelationConfigurations.getStringProperty(0, "sink.kafka.servers");
+ public static final String SOURCE_KAFKA_TOPIC = VoipRelationConfigurations.getStringProperty(0, "source.kafka.topic");
+ public static final String SINK_KAFKA_TOPIC = VoipRelationConfigurations.getStringProperty(0, "sink.kafka.topic");
public static final String GROUP_ID = VoipRelationConfigurations.getStringProperty(0, "group.id");
public static final String PRODUCER_ACK = VoipRelationConfigurations.getStringProperty(0, "producer.ack");
public static final String PRODUCER_KAFKA_COMPRESSION_TYPE = VoipRelationConfigurations.getStringProperty(0, "producer.kafka.compression.type");
@@ -37,6 +37,13 @@ public class VoipRelationConfig {
public static final String KAFKA_PIN = VoipRelationConfigurations.getStringProperty(1, "kafka.pin");
/**
+ * kafka source config
+ */
+ public static final String SESSION_TIMEOUT_MS = VoipRelationConfigurations.getStringProperty(1, "session.timeout.ms");
+ public static final String MAX_POLL_RECORDS = VoipRelationConfigurations.getStringProperty(1, "max.poll.records");
+ public static final String MAX_PARTITION_FETCH_BYTES = VoipRelationConfigurations.getStringProperty(1, "max.partition.fetch.bytes");
+
+ /**
* kafka sink
*/
public static final String RETRIES = VoipRelationConfigurations.getStringProperty(1, "retries");
@@ -58,5 +65,6 @@ public class VoipRelationConfig {
* voip
*/
public static final Integer SEC_COMBINE_SR_CACHE_SECS = VoipRelationConfigurations.getIntProperty(0, "sec.combine.sr.cache.secs");
+ public static final Integer CHECK_INNER_NETWORK = VoipRelationConfigurations.getIntProperty(1, "check.inner.network");
}
\ No newline at end of file
diff --git a/src/main/java/com/zdjizhi/topology/VoIpRelationTopology.java b/src/main/java/com/zdjizhi/topology/VoIpRelationTopology.java
index 37b2f18..80a80ad 100644
--- a/src/main/java/com/zdjizhi/topology/VoIpRelationTopology.java
+++ b/src/main/java/com/zdjizhi/topology/VoIpRelationTopology.java
@@ -4,17 +4,14 @@ import cn.hutool.log.Log;
import cn.hutool.log.LogFactory;
import com.zdjizhi.common.VoipRelationConfig;
import com.zdjizhi.utils.functions.*;
-import com.zdjizhi.utils.kafka.Consumer;
-import com.zdjizhi.utils.kafka.Producer;
+import com.zdjizhi.utils.kafka.KafkaConsumer;
+import com.zdjizhi.utils.kafka.KafkaProducer;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.streaming.api.datastream.DataStream;
-import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
-import org.apache.flink.streaming.api.datastream.WindowedStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
-import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
/**
@@ -29,9 +26,7 @@ public class VoIpRelationTopology {
public static void main(String[] args) {
final StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment();
-// environment.enableCheckpointing(5000);
-
- DataStream<String> streamSource = environment.addSource(Consumer.getKafkaConsumer());
+ DataStream<String> streamSource = environment.addSource(KafkaConsumer.getKafkaConsumer());
SingleOutputStreamOperator<Tuple3<String, String, String>> sipCorrelation =
streamSource.windowAll(TumblingProcessingTimeWindows.of(Time.seconds(VoipRelationConfig.ONE_SIDED_WINDOW_TIME)))
@@ -39,26 +34,14 @@ public class VoIpRelationTopology {
SingleOutputStreamOperator<String> window = sipCorrelation.windowAll(
TumblingProcessingTimeWindows.of(Time.seconds(VoipRelationConfig.VOIP_CALIBRATION_WINDOW_TIME)))
- .process(new SipCalibrationWindow()).name("SipCalibrationWindow");
-
- window.addSink(Producer.getKafkaProducer()).name("VoIpLogSinkKafka");
-
-// KeyedStream<Tuple3<String, String, String>, String> keyedStream = sipCorrelation.keyBy(new KeyByFunction());
-//
-// WindowedStream<Tuple3<String, String, String>, String, TimeWindow> window =
-// keyedStream.window(TumblingProcessingTimeWindows.of(Time.seconds(VoipRelationConfig.VOIP_CALIBRATION_WINDOW_TIME)));
-//
-// SingleOutputStreamOperator<String> output = window.process(new SipCalibrationWindowFunction())
-// .name("SipCalibrationWindow").setParallelism(VoipRelationConfig.WINDOW_PARALLELISM);
+ .process(new SipCalibrationWindowFunction()).name("SipCalibrationWindowFunction");
-// output.addSink(Producer.getKafkaProducer()).name("VoIpLogSinkKafka");
+ window.addSink(KafkaProducer.getKafkaProducer()).name("VoIpLogSinkKafka");
try {
- environment.execute("VOIP-RELATION");
+ environment.execute(args[0]);
} catch (Exception e) {
logger.error("This Flink task start ERROR! Exception information is :" + e);
}
-
}
-
}
diff --git a/src/main/java/com/zdjizhi/utils/functions/KeyByFunction.java b/src/main/java/com/zdjizhi/utils/functions/KeyByFunction.java
deleted file mode 100644
index 0b00b3c..0000000
--- a/src/main/java/com/zdjizhi/utils/functions/KeyByFunction.java
+++ /dev/null
@@ -1,19 +0,0 @@
-package com.zdjizhi.utils.functions;
-
-import org.apache.flink.api.java.functions.KeySelector;
-import org.apache.flink.api.java.tuple.Tuple3;
-
-/**
- * @author qidaijie
- * @Package com.zdjizhi.utils.functions
- * @Description:
- * @date 2021/7/2112:13
- */
-public class KeyByFunction implements KeySelector<Tuple3<String, String, String>, String> {
-
- @Override
- public String getKey(Tuple3<String, String, String> value) throws Exception {
- //以map拼接的key分组
- return value.f1;
- }
-}
diff --git a/src/main/java/com/zdjizhi/utils/functions/OneSidedWindowFunction.java b/src/main/java/com/zdjizhi/utils/functions/OneSidedWindowFunction.java
index 38e6bdd..061fa12 100644
--- a/src/main/java/com/zdjizhi/utils/functions/OneSidedWindowFunction.java
+++ b/src/main/java/com/zdjizhi/utils/functions/OneSidedWindowFunction.java
@@ -41,7 +41,6 @@ public class OneSidedWindowFunction extends ProcessAllWindowFunction<String, Tup
for (String input : inputs) {
if (StringUtil.isNotBlank(input)) {
JSONObject object = JSONObject.parseObject(input);
-
String commonSchemaType = object.getString(JsonProConfig.SCHEMA_TYPE);
String sipCallId = object.getString(JsonProConfig.SIP_CALL_ID);
@@ -52,10 +51,14 @@ public class OneSidedWindowFunction extends ProcessAllWindowFunction<String, Tup
* 针对SIP日志进行处理
*/
if (JsonProConfig.SIP_MARK.equals(commonSchemaType) && StringUtil.isNotBlank(sipCallId)) {
- if (commonStreamDir != JsonProConfig.DOUBLE) {
- putKeyAndMsg(input, sipCallId, sipOriHmList, "SIP", out);
+ if (checkSipCompleteness(object)) {
+ if (commonStreamDir != JsonProConfig.DOUBLE) {
+ putKeyAndMsg(input, sipCallId, sipOriHmList, "SIP", out);
+ } else {
+ separateInnerIp(object, out);
+ }
} else {
- separateInnerIp(object, out);
+ out.collect(new Tuple3<>("", "violation", input));
}
}
@@ -364,4 +367,10 @@ public class OneSidedWindowFunction extends ProcessAllWindowFunction<String, Tup
}
}
+ private static boolean checkSipCompleteness(JSONObject object) {
+ return object.containsKey(JsonProConfig.SIP_ORIGINATOR_IP) &&
+ object.containsKey(JsonProConfig.SIP_ORIGINATOR_PORT) &&
+ object.containsKey(JsonProConfig.SIP_RESPONDER_IP) &&
+ object.containsKey(JsonProConfig.SIP_RESPONDER_PORT);
+ }
}
diff --git a/src/main/java/com/zdjizhi/utils/functions/SipCalibrationWindow.java b/src/main/java/com/zdjizhi/utils/functions/SipCalibrationWindow.java
deleted file mode 100644
index 29b55a1..0000000
--- a/src/main/java/com/zdjizhi/utils/functions/SipCalibrationWindow.java
+++ /dev/null
@@ -1,327 +0,0 @@
-package com.zdjizhi.utils.functions;
-
-import cn.hutool.log.Log;
-import cn.hutool.log.LogFactory;
-import com.alibaba.fastjson.JSONObject;
-import com.zdjizhi.common.JsonProConfig;
-import com.zdjizhi.common.VoipRelationConfig;
-import com.zdjizhi.utils.StringUtil;
-import com.zdjizhi.utils.json.JsonParseUtil;
-import org.apache.flink.api.java.tuple.Tuple3;
-import org.apache.flink.streaming.api.functions.windowing.ProcessAllWindowFunction;
-import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
-import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
-import org.apache.flink.util.Collector;
-
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.LinkedList;
-import java.util.List;
-
-/**
- * @author qidaijie
- * @Package com.zdjizhi.utils.functions
- * @Description:
- * @date 2021/7/2113:55
- */
-public class SipCalibrationWindow extends ProcessAllWindowFunction<Tuple3<String, String, String>, String, TimeWindow> {
- private static final Log logger = LogFactory.get();
-
- /**
- * 实体类反射map
- */
- private static HashMap<String, Class> classMap = JsonParseUtil.getMapFromHttp(VoipRelationConfig.SCHEMA_HTTP);
-
- /**
- * 反射成一个类
- */
- private static Object voipObject = JsonParseUtil.generateObject(classMap);
-
- /**
- * 关联用HashMap
- * key---四元组
- * value---List存放对应SIP或者RTP数据
- * 存放数据:rtp-single,rtp-two,sip-two
- * 不存放的数据:sip-single与sip-in
- */
- private static HashMap<String, LinkedList<String>> combineSRHmList = new HashMap<>(16);
-
- /**
- * 二次关联用HashMap
- * key---四元组
- * value---List存放对应SIP或者RTP数据
- * 存放数据:rtp-single,rtp-two,sip-two
- * 不存放的数据:sip-single与sip-in
- */
- private static HashMap<String, LinkedList<String>> secCombineSRHmList = new HashMap<>(16);
-
- @Override
- public void process(Context context, Iterable<Tuple3<String, String, String>> input, Collector<String> output) throws Exception {
- for (Tuple3<String, String, String> tuple : input) {
- //拼接的四元组
- String fourKey = tuple.f0;
- //已关联的sip,rtp;未关联的sip,rtp;内网的sip
- String type = tuple.f1;
- String msg = tuple.f2;
- switch (type) {
- //单向流对准后的SIP
- case "sip-two":
- //单向流对准后的RTP
- case "rtp-two":
- //对不上的RTP
- case "rtp-single":
- putKeyAndMsg(msg, fourKey, combineSRHmList);
- break;
- //单向流的SIP
- case "sip-single":
- //内网的SIP
- case "sip-in":
- output.collect(msg);
- break;
- default:
- logger.error("type is beyond expectation:" + type);
- break;
-
- }
- }
- //初次关联
- tickCombineHmList(combineSRHmList, output);
- //和缓存中的数据二次关联
- tickCombineHmList(secCombineSRHmList, output);
- }
-
-
- /**
- * 定时关联,包括初次关联以及后续二次关联
- *
- * @param combineHmList
- */
- private void tickCombineHmList(HashMap<String, LinkedList<String>> combineHmList, Collector<String> output) {
- if (combineHmList.size() > 0) {
- long nowTime = System.currentTimeMillis() / 1000;
-
- HashMap<String, LinkedList<String>> tempCombineSRhmList = new HashMap<>(combineHmList);
- combineHmList.clear();
-
- for (String fourStrKey : tempCombineSRhmList.keySet()) {
-
- LinkedList<String> tempList = tempCombineSRhmList.get(fourStrKey);
- //包含SIP和RTP
- int listSize = tempList.size();
- System.out.println(listSize);
- if (listSize > 1) {
-
- List<String> sipBeanArr = new ArrayList<>();
- List<String> rtpBeanArr = new ArrayList<>();
-
- for (String message : tempList) {
- Object tempSipOrRtpLog = JSONObject.parseObject(message, voipObject.getClass());
- String schemaType = JsonParseUtil.getString(tempSipOrRtpLog, JsonProConfig.SCHEMA_TYPE);
- if (JsonProConfig.SIP_MARK.equals(schemaType)) {
- sipBeanArr.add(message);
- } else if (JsonProConfig.RTP_MARK.equals(schemaType)) {
- rtpBeanArr.add(message);
- }
- }
- int rtpSize = rtpBeanArr.size();
- int sipSize = sipBeanArr.size();
-
- if (rtpSize == 1 && sipSize >= 1) {
- for (String sipMessage : sipBeanArr) {
- Object rtpLog = JSONObject.parseObject(rtpBeanArr.get(0), voipObject.getClass());
- Object voipLog = JSONObject.parseObject(sipMessage, voipObject.getClass());
- accumulateVoipMsg(voipLog, rtpLog);
- JsonParseUtil.setValue(voipLog, JsonProConfig.SCHEMA_TYPE, "VoIP");
- JsonParseUtil.setValue(voipLog, JsonProConfig.RTP_ORIGINATOR_DIR, judgeDirection(rtpLog, voipLog));
- //四元组,voip,关联后的数据
- output.collect(mergeJson(voipLog, rtpLog));
-// basicOutputCollector.emit(new Values(fourStrKey, "voip", mergeJson(voipLog, rtpLog)));
- }
- } else if (sipSize == 1 && rtpSize >= 1) {
- for (String rtpMessage : rtpBeanArr) {
- Object rtpLog = JSONObject.parseObject(rtpMessage, voipObject.getClass());
- Object voipLog = JSONObject.parseObject(sipBeanArr.get(0), voipObject.getClass());
- accumulateVoipMsg(voipLog, rtpLog);
- JsonParseUtil.setValue(voipLog, JsonProConfig.SCHEMA_TYPE, "VoIP");
- JsonParseUtil.setValue(voipLog, JsonProConfig.RTP_ORIGINATOR_DIR, judgeDirection(rtpLog, voipLog));
- //四元组,voip,关联后的数据
- output.collect(mergeJson(voipLog, rtpLog));
-// basicOutputCollector.emit(new Values(fourStrKey, "voip", mergeJson(voipLog, rtpLog)));
- }
- } else {
- logger.warn("RTP-listSize is [" + rtpBeanArr.size() + "];SIP-listSize is [" + sipBeanArr.size() + "] in this tempCombineSRHmList! Not logical");
- sendErrorLogToKafka(sipBeanArr, output);
- sendErrorLogToKafka(rtpBeanArr, output);
- }
-
- } else {
- String msg = tempList.get(0);
-
- Object voipLog = JSONObject.parseObject(msg, voipObject.getClass());
- long commonEndTime = JsonParseUtil.getLong(voipLog, JsonProConfig.END_TIME);
- long intervalTime = nowTime - commonEndTime;
- if (intervalTime <= VoipRelationConfig.SEC_COMBINE_SR_CACHE_SECS) {
- putKeyAndMsg(msg, fourStrKey, secCombineSRHmList);
- } else {
- sendDirectlyOneElement(msg, voipLog, output);
- }
-
- }
- }
- }
- }
-
- /**
- * 累加关联后的字节类参数值
- *
- * @param voipLog
- * @param rtpLog
- */
- private void accumulateVoipMsg(Object voipLog, Object rtpLog) {
-
- Long sumCommonSessions = JsonParseUtil.getLong(voipLog, JsonProConfig.SESSIONS)
- + JsonParseUtil.getLong(rtpLog, JsonProConfig.SESSIONS);
-
- Long sumCommonC2sPktNum = JsonParseUtil.getLong(voipLog, JsonProConfig.C2S_PKT_NUM)
- + JsonParseUtil.getLong(rtpLog, JsonProConfig.C2S_PKT_NUM);
-
- Long sumCommonS2cPktNum = JsonParseUtil.getLong(voipLog, JsonProConfig.S2C_PKT_NUM)
- + JsonParseUtil.getLong(rtpLog, JsonProConfig.S2C_PKT_NUM);
-
- Long sumCommonC2sByteNum = JsonParseUtil.getLong(voipLog, JsonProConfig.C2S_BYTE_NUM)
- + JsonParseUtil.getLong(rtpLog, JsonProConfig.C2S_BYTE_NUM);
-
- Long sumCommonS2cByteNum = JsonParseUtil.getLong(voipLog, JsonProConfig.S2C_BYTE_NUM)
- + JsonParseUtil.getLong(rtpLog, JsonProConfig.S2C_BYTE_NUM);
-
- Long sumCommonC2sIpfragNum = JsonParseUtil.getLong(voipLog, JsonProConfig.C2S_IPFRAG_NUM)
- + JsonParseUtil.getLong(rtpLog, JsonProConfig.C2S_IPFRAG_NUM);
-
- Long sumCommonS2cIpfragNum = JsonParseUtil.getLong(voipLog, JsonProConfig.S2C_IPFRAG_NUM)
- + JsonParseUtil.getLong(rtpLog, JsonProConfig.S2C_IPFRAG_NUM);
-
- Long sumCommonC2sTcpLostlen = JsonParseUtil.getLong(voipLog, JsonProConfig.C2S_TCP_LOSTLEN)
- + JsonParseUtil.getLong(rtpLog, JsonProConfig.C2S_TCP_LOSTLEN);
-
- Long sumCommonS2cTcpLostlen = JsonParseUtil.getLong(voipLog, JsonProConfig.S2C_TCP_LOSTLEN)
- + JsonParseUtil.getLong(rtpLog, JsonProConfig.S2C_TCP_LOSTLEN);
-
- Long sumCommonC2sTcpUnorderNum = JsonParseUtil.getLong(voipLog, JsonProConfig.C2S_TCP_UNORDER_NUM)
- + JsonParseUtil.getLong(rtpLog, JsonProConfig.C2S_TCP_UNORDER_NUM);
-
- Long sumCommonS2cTcpUnorderNum = JsonParseUtil.getLong(voipLog, JsonProConfig.S2C_TCP_UNORDER_NUM)
- + JsonParseUtil.getLong(rtpLog, JsonProConfig.S2C_TCP_UNORDER_NUM);
-
- //common_sessions
- JsonParseUtil.setValue(voipLog, JsonProConfig.SESSIONS, sumCommonSessions);
-
- //common_c2s_pkt_num
- JsonParseUtil.setValue(voipLog, JsonProConfig.C2S_PKT_NUM, sumCommonC2sPktNum);
- //common_s2c_pkt_num
- JsonParseUtil.setValue(voipLog, JsonProConfig.S2C_PKT_NUM, sumCommonS2cPktNum);
- //common_c2s_byte_num
- JsonParseUtil.setValue(voipLog, JsonProConfig.C2S_BYTE_NUM, sumCommonC2sByteNum);
- //common_s2c_byte_num
- JsonParseUtil.setValue(voipLog, JsonProConfig.S2C_BYTE_NUM, sumCommonS2cByteNum);
-
- //common_c2s_ipfrag_num
- JsonParseUtil.setValue(voipLog, JsonProConfig.C2S_IPFRAG_NUM, sumCommonC2sIpfragNum);
- //common_s2c_ipfrag_num
- JsonParseUtil.setValue(voipLog, JsonProConfig.S2C_IPFRAG_NUM, sumCommonS2cIpfragNum);
-
- //common_c2s_tcp_lostlen
- JsonParseUtil.setValue(voipLog, JsonProConfig.C2S_TCP_LOSTLEN, sumCommonC2sTcpLostlen);
- //common_s2c_tcp_lostlen
- JsonParseUtil.setValue(voipLog, JsonProConfig.S2C_TCP_LOSTLEN, sumCommonS2cTcpLostlen);
-
- //common_c2s_tcp_unorder_num
- JsonParseUtil.setValue(voipLog, JsonProConfig.C2S_TCP_UNORDER_NUM, sumCommonC2sTcpUnorderNum);
- //common_s2c_tcp_unorder_num
- JsonParseUtil.setValue(voipLog, JsonProConfig.S2C_TCP_UNORDER_NUM, sumCommonS2cTcpUnorderNum);
-
- }
-
- /**
- * 定时处理中List元素数仅为1的情况
- */
- private void sendDirectlyOneElement(String msg, Object voipLog, Collector<String> output) {
- //四元组,sip(一定为双侧)/rtp(可能为单侧也可能为双侧,看单向流字段信息),拿出来的原始数据
- String commonSchemaType = JsonParseUtil.getString(voipLog, JsonProConfig.SCHEMA_TYPE);
- if (JsonProConfig.SIP_MARK.equals(commonSchemaType)) {
- output.collect(msg);
- } else if (JsonProConfig.RTP_MARK.equals(commonSchemaType)) {
- int commonStreamDir = JsonParseUtil.getInteger(voipLog, JsonProConfig.STREAM_DIR);
- if (commonStreamDir != JsonProConfig.DOUBLE) {
- output.collect(msg);
- } else {
- output.collect(msg);
- }
- }
- }
-
-
- /**
- * 存放key并添加对应List
- */
- private static void putKeyAndMsg(String message, String fourStrKey, HashMap<String, LinkedList<String>> combineSRHmList) {
- if (combineSRHmList.containsKey(fourStrKey)) {
- LinkedList<String> tmpList = combineSRHmList.get(fourStrKey);
- tmpList.add(message);
- combineSRHmList.put(fourStrKey, tmpList);
- } else {
- LinkedList<String> tmpList = new LinkedList<>();
- tmpList.add(message);
- combineSRHmList.put(fourStrKey, tmpList);
- }
- }
-
- /**
- * 判断RTP主叫方向-测试
- *
- * @param rtpLog RTP原始日志
- * @param voipLog 融合后VOIP日志
- * @return 方向 0:未知 1:c2s 2:s2c
- */
- private static int judgeDirection(Object rtpLog, Object voipLog) {
-
- String ip = JsonParseUtil.getString(rtpLog, JsonProConfig.CLIENT_IP);
- String sipOriginatorIp = JsonParseUtil.getString(voipLog, JsonProConfig.SIP_ORIGINATOR_IP);
- String sipResponderIp = JsonParseUtil.getString(voipLog, JsonProConfig.SIP_RESPONDER_IP);
-
- if (StringUtil.isNotBlank(ip) && ip.equals(sipOriginatorIp)) {
- return 1;
- } else if (StringUtil.isNotBlank(ip) && ip.equals(sipResponderIp)) {
- return 2;
- }
-
- return 0;
- }
-
- /**
- * 发送不符合逻辑的日志到kafka
- */
- private static void sendErrorLogToKafka(List<String> logList, Collector<String> output) {
- if (logList.size() > 0) {
- for (String log : logList) {
- output.collect(log);
- }
- }
- }
-
- /**
- *
- */
- private static String mergeJson(Object voipLog, Object rtpLog) {
-
- int rtpPayloadTypeC2s = JsonParseUtil.getInteger(rtpLog, JsonProConfig.RTP_PAYLOAD_TYPE_C2S);
- int rtpPayloadTypeS2c = JsonParseUtil.getInteger(rtpLog, JsonProConfig.RTP_PAYLOAD_TYPE_S2C);
- String rtpPcapPath = JsonParseUtil.getString(rtpLog, JsonProConfig.RTP_PCAP_PATH);
-
- JsonParseUtil.setValue(voipLog, JsonProConfig.RTP_PAYLOAD_TYPE_C2S, rtpPayloadTypeC2s);
- JsonParseUtil.setValue(voipLog, JsonProConfig.RTP_PAYLOAD_TYPE_S2C, rtpPayloadTypeS2c);
- JsonParseUtil.setValue(voipLog, JsonProConfig.RTP_PCAP_PATH, rtpPcapPath);
-
- return JSONObject.toJSONString(voipLog);
- }
-
-}
diff --git a/src/main/java/com/zdjizhi/utils/functions/SipCalibrationWindowFunction.java b/src/main/java/com/zdjizhi/utils/functions/SipCalibrationWindowFunction.java
index 2215d5b..3718775 100644
--- a/src/main/java/com/zdjizhi/utils/functions/SipCalibrationWindowFunction.java
+++ b/src/main/java/com/zdjizhi/utils/functions/SipCalibrationWindowFunction.java
@@ -8,11 +8,14 @@ import com.zdjizhi.common.VoipRelationConfig;
import com.zdjizhi.utils.StringUtil;
import com.zdjizhi.utils.json.JsonParseUtil;
import org.apache.flink.api.java.tuple.Tuple3;
-import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
+import org.apache.flink.streaming.api.functions.windowing.ProcessAllWindowFunction;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;
-import java.util.*;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.LinkedList;
+import java.util.List;
/**
* @author qidaijie
@@ -20,7 +23,7 @@ import java.util.*;
* @Description:
* @date 2021/7/2113:55
*/
-public class SipCalibrationWindowFunction extends ProcessWindowFunction<Tuple3<String, String, String>, String, String, TimeWindow> {
+public class SipCalibrationWindowFunction extends ProcessAllWindowFunction<Tuple3<String, String, String>, String, TimeWindow> {
private static final Log logger = LogFactory.get();
/**
@@ -52,8 +55,8 @@ public class SipCalibrationWindowFunction extends ProcessWindowFunction<Tuple3<S
private static HashMap<String, LinkedList<String>> secCombineSRHmList = new HashMap<>(16);
@Override
- @SuppressWarnings("unchecked")
- public void process(String key, Context context, Iterable<Tuple3<String, String, String>> input, Collector<String> output) throws Exception {
+ public void process(Context context, Iterable<Tuple3<String, String, String>> input, Collector<String> output) throws Exception {
+ logger.error("windowall窗口运行");
for (Tuple3<String, String, String> tuple : input) {
//拼接的四元组
String fourKey = tuple.f0;
@@ -63,16 +66,17 @@ public class SipCalibrationWindowFunction extends ProcessWindowFunction<Tuple3<S
switch (type) {
//单向流对准后的SIP
case "sip-two":
- //单向流对准后的RTP
+ //单向流对准后的RTP
case "rtp-two":
- //对不上的RTP
+ //对不上的RTP
case "rtp-single":
putKeyAndMsg(msg, fourKey, combineSRHmList);
break;
//单向流的SIP
case "sip-single":
- //内网的SIP
+ //内网的SIP
case "sip-in":
+ case "violation":
output.collect(msg);
break;
default:
@@ -87,6 +91,7 @@ public class SipCalibrationWindowFunction extends ProcessWindowFunction<Tuple3<S
tickCombineHmList(secCombineSRHmList, output);
}
+
/**
* 定时关联,包括初次关联以及后续二次关联
*
@@ -104,7 +109,6 @@ public class SipCalibrationWindowFunction extends ProcessWindowFunction<Tuple3<S
LinkedList<String> tempList = tempCombineSRhmList.get(fourStrKey);
//包含SIP和RTP
int listSize = tempList.size();
- System.out.println(listSize);
if (listSize > 1) {
List<String> sipBeanArr = new ArrayList<>();
@@ -156,12 +160,12 @@ public class SipCalibrationWindowFunction extends ProcessWindowFunction<Tuple3<S
Object voipLog = JSONObject.parseObject(msg, voipObject.getClass());
long commonEndTime = JsonParseUtil.getLong(voipLog, JsonProConfig.END_TIME);
long intervalTime = nowTime - commonEndTime;
+ logger.error("VoIP日志时间差值记录:" + intervalTime);
if (intervalTime <= VoipRelationConfig.SEC_COMBINE_SR_CACHE_SECS) {
putKeyAndMsg(msg, fourStrKey, secCombineSRHmList);
} else {
sendDirectlyOneElement(msg, voipLog, output);
}
-
}
}
}
@@ -309,8 +313,8 @@ public class SipCalibrationWindowFunction extends ProcessWindowFunction<Tuple3<S
*/
private static String mergeJson(Object voipLog, Object rtpLog) {
- long rtpPayloadTypeC2s = JsonParseUtil.getLong(rtpLog, JsonProConfig.RTP_PAYLOAD_TYPE_C2S);
- long rtpPayloadTypeS2c = JsonParseUtil.getLong(rtpLog, JsonProConfig.RTP_PAYLOAD_TYPE_S2C);
+ int rtpPayloadTypeC2s = JsonParseUtil.getInteger(rtpLog, JsonProConfig.RTP_PAYLOAD_TYPE_C2S);
+ int rtpPayloadTypeS2c = JsonParseUtil.getInteger(rtpLog, JsonProConfig.RTP_PAYLOAD_TYPE_S2C);
String rtpPcapPath = JsonParseUtil.getString(rtpLog, JsonProConfig.RTP_PCAP_PATH);
JsonParseUtil.setValue(voipLog, JsonProConfig.RTP_PAYLOAD_TYPE_C2S, rtpPayloadTypeC2s);
@@ -320,5 +324,4 @@ public class SipCalibrationWindowFunction extends ProcessWindowFunction<Tuple3<S
return JSONObject.toJSONString(voipLog);
}
-
}
diff --git a/src/main/java/com/zdjizhi/utils/ip/IPUtils.java b/src/main/java/com/zdjizhi/utils/ip/IPUtils.java
index 1757194..887d9ba 100644
--- a/src/main/java/com/zdjizhi/utils/ip/IPUtils.java
+++ b/src/main/java/com/zdjizhi/utils/ip/IPUtils.java
@@ -2,6 +2,7 @@ package com.zdjizhi.utils.ip;
import cn.hutool.log.Log;
import cn.hutool.log.LogFactory;
+import com.zdjizhi.common.VoipRelationConfig;
import com.zdjizhi.utils.IPUtil;
import com.zdjizhi.utils.StringUtil;
import com.zdjizhi.utils.exception.VoipRelationException;
@@ -77,16 +78,20 @@ public class IPUtils {
* @return
*/
public static boolean isInnerIp(String ipAddress) {
- if (StringUtil.isBlank(ipAddress) || IPUtil.internalIp(ipAddress)) {
- //为空或者为特定IP时也算作内网IP
- return true;
- }
+ if (VoipRelationConfig.CHECK_INNER_NETWORK == 1) {
+ if (StringUtil.isBlank(ipAddress) || IPUtil.internalIp(ipAddress)) {
+ //为空或者为特定IP时也算作内网IP
+ return true;
+ }
- boolean isInnerIp = false;
- long ipNum = ipToLong(ipAddress);
+ long ipNum = ipToLong(ipAddress);
- isInnerIp = isInner(ipNum, A_BEGIN, A_END) || isInner(ipNum, B_BEGIN, B_END) || isInner(ipNum, C_BEGIN, C_END);
- return isInnerIp;
+ return isInner(ipNum, A_BEGIN, A_END) ||
+ isInner(ipNum, B_BEGIN, B_END) ||
+ isInner(ipNum, C_BEGIN, C_END);
+ } else {
+ return false;
+ }
}
private static boolean isInner(long userIp, long begin, long end) {
diff --git a/src/main/java/com/zdjizhi/utils/kafka/CertUtils.java b/src/main/java/com/zdjizhi/utils/kafka/CertUtils.java
index 83a39ad..b5273fa 100644
--- a/src/main/java/com/zdjizhi/utils/kafka/CertUtils.java
+++ b/src/main/java/com/zdjizhi/utils/kafka/CertUtils.java
@@ -12,24 +12,36 @@ import java.util.Properties;
* @date 2021/9/610:37
*/
class CertUtils {
- static void chooseCert(String type, Properties properties) {
- switch (type) {
- case "SSL":
- properties.put("security.protocol", "SSL");
- properties.put(SslConfigs.SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_CONFIG, "");
- properties.put("ssl.keystore.location", VoipRelationConfig.TOOLS_LIBRARY + "keystore.jks");
- properties.put("ssl.keystore.password", VoipRelationConfig.KAFKA_PIN);
- properties.put("ssl.truststore.location", VoipRelationConfig.TOOLS_LIBRARY + "truststore.jks");
- properties.put("ssl.truststore.password", VoipRelationConfig.KAFKA_PIN);
- properties.put("ssl.key.password", VoipRelationConfig.KAFKA_PIN);
- break;
- case "SASL":
- properties.put("security.protocol", "SASL_PLAINTEXT");
- properties.put("sasl.mechanism", "PLAIN");
- properties.put("sasl.jaas.config", "org.apache.kafka.common.security.plain.PlainLoginModule required username="
- + VoipRelationConfig.KAFKA_USER + " password=" + VoipRelationConfig.KAFKA_PIN + ";");
- break;
- default:
+ /**
+ * Kafka SASL认证端口
+ */
+ private static final String SASL_PORT = "9094";
+
+ /**
+ * Kafka SSL认证端口
+ */
+ private static final String SSL_PORT = "9095";
+
+ /**
+ * 根据连接信息端口判断认证方式。
+ *
+ * @param servers kafka 连接信息
+ * @param properties kafka 连接配置信息
+ */
+ static void chooseCert(String servers, Properties properties) {
+ if (servers.contains(SASL_PORT)) {
+ properties.put("security.protocol", "SASL_PLAINTEXT");
+ properties.put("sasl.mechanism", "PLAIN");
+ properties.put("sasl.jaas.config", "org.apache.kafka.common.security.plain.PlainLoginModule required username="
+ + VoipRelationConfig.KAFKA_USER + " password=" + VoipRelationConfig.KAFKA_PIN + ";");
+ } else if (servers.contains(SSL_PORT)) {
+ properties.put("security.protocol", "SSL");
+ properties.put(SslConfigs.SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_CONFIG, "");
+ properties.put("ssl.keystore.location", VoipRelationConfig.TOOLS_LIBRARY + "keystore.jks");
+ properties.put("ssl.keystore.password", VoipRelationConfig.KAFKA_PIN);
+ properties.put("ssl.truststore.location", VoipRelationConfig.TOOLS_LIBRARY + "truststore.jks");
+ properties.put("ssl.truststore.password", VoipRelationConfig.KAFKA_PIN);
+ properties.put("ssl.key.password", VoipRelationConfig.KAFKA_PIN);
}
}
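
For context on how the new signature is consumed, here is a hypothetical caller mirroring KafkaConsumer.createConsumerConfig() in the next hunk: the same bootstrap string is used both as bootstrap.servers and as the argument that drives the port-based choice. This example is not part of the commit; chooseCert is package-private, so such a caller would have to live in com.zdjizhi.utils.kafka, and it assumes the project's keystore/user/pin configuration is available at runtime. Hosts are placeholders.

package com.zdjizhi.utils.kafka;   // CertUtils is package-private, so a caller sits in this package

import java.util.Properties;

// Hypothetical caller, not part of this commit: shows the call pattern that
// KafkaConsumer/KafkaProducer use after the change.
public class ChooseCertUsageExample {
    public static void main(String[] args) {
        String servers = "kafka-1:9095,kafka-2:9095";               // placeholder hosts; 9095 selects the SSL branch
        Properties props = new Properties();
        props.put("bootstrap.servers", servers);
        CertUtils.chooseCert(servers, props);                       // fills in security.protocol and the keystore settings
        System.out.println(props.getProperty("security.protocol")); // SSL
    }
}
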
diff --git a/src/main/java/com/zdjizhi/utils/kafka/Consumer.java b/src/main/java/com/zdjizhi/utils/kafka/KafkaConsumer.java
index 6dde52a..ada1477 100644
--- a/src/main/java/com/zdjizhi/utils/kafka/Consumer.java
+++ b/src/main/java/com/zdjizhi/utils/kafka/KafkaConsumer.java
@@ -13,23 +13,23 @@ import java.util.Properties;
* @Description:
* @date 2021/6/813:54
*/
-public class Consumer {
+public class KafkaConsumer {
private static Properties createConsumerConfig() {
Properties properties = new Properties();
- properties.put("bootstrap.servers", VoipRelationConfig.INPUT_KAFKA_SERVERS);
+ properties.put("bootstrap.servers", VoipRelationConfig.SOURCE_KAFKA_SERVERS);
properties.put("group.id", VoipRelationConfig.GROUP_ID);
- properties.put("session.timeout.ms", "60000");
- properties.put("max.poll.records", 3000);
- properties.put("max.partition.fetch.bytes", 31457280);
+ properties.put("session.timeout.ms", VoipRelationConfig.SESSION_TIMEOUT_MS);
+ properties.put("max.poll.records", VoipRelationConfig.MAX_POLL_RECORDS);
+ properties.put("max.partition.fetch.bytes", VoipRelationConfig.MAX_PARTITION_FETCH_BYTES);
properties.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
properties.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
- CertUtils.chooseCert(VoipRelationConfig.KAFKA_SOURCE_PROTOCOL,properties);
+ CertUtils.chooseCert(VoipRelationConfig.SOURCE_KAFKA_SERVERS, properties);
return properties;
}
public static FlinkKafkaConsumer<String> getKafkaConsumer() {
- FlinkKafkaConsumer<String> kafkaConsumer = new FlinkKafkaConsumer<>(VoipRelationConfig.INPUT_KAFKA_TOPIC,
+ FlinkKafkaConsumer<String> kafkaConsumer = new FlinkKafkaConsumer<>(VoipRelationConfig.SOURCE_KAFKA_TOPIC,
new SimpleStringSchema(), createConsumerConfig());
kafkaConsumer.setCommitOffsetsOnCheckpoints(true);
diff --git a/src/main/java/com/zdjizhi/utils/kafka/Producer.java b/src/main/java/com/zdjizhi/utils/kafka/KafkaProducer.java
index bf63098..65e253a 100644
--- a/src/main/java/com/zdjizhi/utils/kafka/Producer.java
+++ b/src/main/java/com/zdjizhi/utils/kafka/KafkaProducer.java
@@ -13,11 +13,11 @@ import java.util.Properties;
* @Description:
* @date 2021/6/814:04
*/
-public class Producer {
+public class KafkaProducer {
private static Properties createProducerConfig() {
Properties properties = new Properties();
- properties.put("bootstrap.servers", VoipRelationConfig.OUTPUT_KAFKA_SERVERS);
+ properties.put("bootstrap.servers", VoipRelationConfig.SINK_KAFKA_SERVERS);
properties.put("acks", VoipRelationConfig.PRODUCER_ACK);
properties.put("retries", VoipRelationConfig.RETRIES);
properties.put("linger.ms", VoipRelationConfig.LINGER_MS);
@@ -27,7 +27,7 @@ public class Producer {
properties.put("max.request.size", VoipRelationConfig.MAX_REQUEST_SIZE);
properties.put("compression.type", VoipRelationConfig.PRODUCER_KAFKA_COMPRESSION_TYPE);
- CertUtils.chooseCert(VoipRelationConfig.KAFKA_SINK_PROTOCOL, properties);
+ CertUtils.chooseCert(VoipRelationConfig.SINK_KAFKA_SERVERS, properties);
return properties;
}
@@ -35,7 +35,7 @@ public class Producer {
public static FlinkKafkaProducer<String> getKafkaProducer() {
FlinkKafkaProducer<String> kafkaProducer = new FlinkKafkaProducer<String>(
- VoipRelationConfig.OUTPUT_KAFKA_TOPIC,
+ VoipRelationConfig.SINK_KAFKA_TOPIC,
new SimpleStringSchema(),
createProducerConfig(), Optional.empty());