diff options
Diffstat (limited to 'src')
18 files changed, 2346 insertions, 0 deletions
diff --git a/src/main/java/com/zdjizhi/common/JsonProConfig.java b/src/main/java/com/zdjizhi/common/JsonProConfig.java new file mode 100644 index 0000000..6e9d437 --- /dev/null +++ b/src/main/java/com/zdjizhi/common/JsonProConfig.java @@ -0,0 +1,135 @@ +package com.zdjizhi.common; + + +import com.zdjizhi.utils.system.VoipRelationConfigurations; + +/** + * @author Administrator + */ +public class JsonProConfig { + /** + * 双向流标识 + */ + public static final int DOUBLE = 3; + + /** + * + */ + public static final String SIP_MARK = "SIP"; + + /** + * + */ + public static final String RTP_MARK = "RTP"; + /** + * + */ + public static final String SCHEMA_TYPE = "common_schema_type"; + /** + * + */ + public static final String END_TIME = "common_end_time"; + /** + * + */ + public static final String STREAM_DIR = "common_stream_dir"; + /** + * + */ + public static final String SESSIONS = "common_sessions"; + /** + * + */ + public static final String C2S_PKT_NUM = "common_c2s_pkt_num"; + /** + * + */ + public static final String S2C_PKT_NUM = "common_s2c_pkt_num"; + /** + * + */ + public static final String C2S_BYTE_NUM = "common_c2s_byte_num"; + /** + * + */ + public static final String S2C_BYTE_NUM = "common_s2c_byte_num"; + /** + * + */ + public static final String C2S_IPFRAG_NUM = "common_c2s_ipfrag_num"; + /** + * + */ + public static final String S2C_IPFRAG_NUM = "common_s2c_ipfrag_num"; + /** + * + */ + public static final String C2S_TCP_LOSTLEN = "common_c2s_tcp_lostlen"; + /** + * + */ + public static final String S2C_TCP_LOSTLEN = "common_s2c_tcp_lostlen"; + /** + * + */ + public static final String C2S_TCP_UNORDER_NUM = "common_c2s_tcp_unorder_num"; + /** + * + */ + public static final String S2C_TCP_UNORDER_NUM = "common_s2c_tcp_unorder_num"; + /** + * + */ + public static final String CLIENT_IP = "common_client_ip"; + /** + * + */ + public static final String CLIENT_PORT = "common_client_port"; + /** + * + */ + public static final String SERVER_IP = "common_server_ip"; + /** + * + */ + public static final String SERVER_PORT = "common_server_port"; + + /** + * + */ + public static final String SIP_CALL_ID = "sip_call_id"; + /** + * + */ + public static final String SIP_ORIGINATOR_IP = "sip_originator_sdp_connect_ip"; + /** + * + */ + public static final String SIP_ORIGINATOR_PORT = "sip_originator_sdp_media_port"; + /** + * + */ + public static final String SIP_RESPONDER_IP = "sip_responder_sdp_connect_ip"; + /** + * + */ + public static final String SIP_RESPONDER_PORT = "sip_responder_sdp_media_port"; + /** + * + */ + public static final String RTP_PCAP_PATH = "rtp_pcap_path"; + /** + * + */ + public static final String RTP_ORIGINATOR_DIR = "rtp_originator_dir"; + /** + * + */ + public static final String RTP_PAYLOAD_TYPE_C2S = "rtp_payload_type_c2s"; + /** + * + */ + public static final String RTP_PAYLOAD_TYPE_S2C = "rtp_payload_type_s2c"; + + +}
\ No newline at end of file diff --git a/src/main/java/com/zdjizhi/common/VoipRelationConfig.java b/src/main/java/com/zdjizhi/common/VoipRelationConfig.java new file mode 100644 index 0000000..abd41c7 --- /dev/null +++ b/src/main/java/com/zdjizhi/common/VoipRelationConfig.java @@ -0,0 +1,62 @@ +package com.zdjizhi.common; + + +import com.zdjizhi.utils.system.VoipRelationConfigurations; + +/** + * @author Administrator + */ +public class VoipRelationConfig { + + /** + * 四元组的拼接连接字符 + */ + public static final String CORRELATION_STR = "_"; + + public static final String VISIBILITY = "disabled"; + public static final String FORMAT_SPLITTER = ","; + + /** + * System + */ + public static final Integer VOIP_CALIBRATION_WINDOW_TIME = VoipRelationConfigurations.getIntProperty(0, "voip.calibration.window.time"); + public static final Integer ONE_SIDED_WINDOW_TIME = VoipRelationConfigurations.getIntProperty(0, "one.sided.window.time"); + public static final Integer WINDOW_PARALLELISM = VoipRelationConfigurations.getIntProperty(0, "window.parallelism"); + + /** + * connection kafka + */ + public static final String INPUT_KAFKA_SERVERS = VoipRelationConfigurations.getStringProperty(0, "input.kafka.servers"); + public static final String OUTPUT_KAFKA_SERVERS = VoipRelationConfigurations.getStringProperty(0, "output.kafka.servers"); + public static final String INPUT_KAFKA_TOPIC = VoipRelationConfigurations.getStringProperty(0, "input.kafka.topic"); + public static final String OUTPUT_KAFKA_TOPIC = VoipRelationConfigurations.getStringProperty(0, "output.kafka.topic"); + public static final String GROUP_ID = VoipRelationConfigurations.getStringProperty(0, "group.id"); + public static final String PRODUCER_ACK = VoipRelationConfigurations.getStringProperty(0, "producer.ack"); + public static final String PRODUCER_KAFKA_COMPRESSION_TYPE = VoipRelationConfigurations.getStringProperty(0, "producer.kafka.compression.type"); + public static final String KAFKA_USER = VoipRelationConfigurations.getStringProperty(1, "kafka.user"); + public static final String KAFKA_PIN = VoipRelationConfigurations.getStringProperty(1, "kafka.pin"); + + /** + * kafka sink + */ + public static final String RETRIES = VoipRelationConfigurations.getStringProperty(1, "retries"); + public static final String LINGER_MS = VoipRelationConfigurations.getStringProperty(1, "linger.ms"); + public static final Integer REQUEST_TIMEOUT_MS = VoipRelationConfigurations.getIntProperty(1, "request.timeout.ms"); + public static final Integer BATCH_SIZE = VoipRelationConfigurations.getIntProperty(1, "batch.size"); + public static final Integer BUFFER_MEMORY = VoipRelationConfigurations.getIntProperty(1, "buffer.memory"); + public static final Integer MAX_REQUEST_SIZE = VoipRelationConfigurations.getIntProperty(1, "max.request.size"); + public static final String KAFKA_SOURCE_PROTOCOL = VoipRelationConfigurations.getStringProperty(1, "kafka.source.protocol"); + public static final String KAFKA_SINK_PROTOCOL = VoipRelationConfigurations.getStringProperty(1, "kafka.sink.protocol"); + public static final String TOOLS_LIBRARY = VoipRelationConfigurations.getStringProperty(0, "tools.library"); + + /** + * http + */ + public static final String SCHEMA_HTTP = VoipRelationConfigurations.getStringProperty(0, "schema.http"); + + /** + * voip + */ + public static final Integer SEC_COMBINE_SR_CACHE_SECS = VoipRelationConfigurations.getIntProperty(0, "sec.combine.sr.cache.secs"); + +}
\ No newline at end of file diff --git a/src/main/java/com/zdjizhi/topology/VoIpRelationTopology.java b/src/main/java/com/zdjizhi/topology/VoIpRelationTopology.java new file mode 100644 index 0000000..37b2f18 --- /dev/null +++ b/src/main/java/com/zdjizhi/topology/VoIpRelationTopology.java @@ -0,0 +1,64 @@ +package com.zdjizhi.topology; + +import cn.hutool.log.Log; +import cn.hutool.log.LogFactory; +import com.zdjizhi.common.VoipRelationConfig; +import com.zdjizhi.utils.functions.*; +import com.zdjizhi.utils.kafka.Consumer; +import com.zdjizhi.utils.kafka.Producer; +import org.apache.flink.api.java.tuple.Tuple3; +import org.apache.flink.streaming.api.datastream.DataStream; +import org.apache.flink.streaming.api.datastream.KeyedStream; +import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator; +import org.apache.flink.streaming.api.datastream.WindowedStream; +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows; +import org.apache.flink.streaming.api.windowing.time.Time; +import org.apache.flink.streaming.api.windowing.windows.TimeWindow; + + +/** + * @author qidaijie + * @Package com.zdjizhi.topology + * @Description: + * @date 2021/5/2016:42 + */ +public class VoIpRelationTopology { + private static final Log logger = LogFactory.get(); + + public static void main(String[] args) { + final StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment(); + +// environment.enableCheckpointing(5000); + + DataStream<String> streamSource = environment.addSource(Consumer.getKafkaConsumer()); + + SingleOutputStreamOperator<Tuple3<String, String, String>> sipCorrelation = + streamSource.windowAll(TumblingProcessingTimeWindows.of(Time.seconds(VoipRelationConfig.ONE_SIDED_WINDOW_TIME))) + .process(new OneSidedWindowFunction()).name("OneSidedWindow"); + + SingleOutputStreamOperator<String> window = sipCorrelation.windowAll( + TumblingProcessingTimeWindows.of(Time.seconds(VoipRelationConfig.VOIP_CALIBRATION_WINDOW_TIME))) + .process(new SipCalibrationWindow()).name("SipCalibrationWindow"); + + window.addSink(Producer.getKafkaProducer()).name("VoIpLogSinkKafka"); + +// KeyedStream<Tuple3<String, String, String>, String> keyedStream = sipCorrelation.keyBy(new KeyByFunction()); +// +// WindowedStream<Tuple3<String, String, String>, String, TimeWindow> window = +// keyedStream.window(TumblingProcessingTimeWindows.of(Time.seconds(VoipRelationConfig.VOIP_CALIBRATION_WINDOW_TIME))); +// +// SingleOutputStreamOperator<String> output = window.process(new SipCalibrationWindowFunction()) +// .name("SipCalibrationWindow").setParallelism(VoipRelationConfig.WINDOW_PARALLELISM); + +// output.addSink(Producer.getKafkaProducer()).name("VoIpLogSinkKafka"); + + try { + environment.execute("VOIP-RELATION"); + } catch (Exception e) { + logger.error("This Flink task start ERROR! Exception information is :" + e); + } + + } + +} diff --git a/src/main/java/com/zdjizhi/utils/exception/VoipRelationException.java b/src/main/java/com/zdjizhi/utils/exception/VoipRelationException.java new file mode 100644 index 0000000..b2ef9e9 --- /dev/null +++ b/src/main/java/com/zdjizhi/utils/exception/VoipRelationException.java @@ -0,0 +1,18 @@ +package com.zdjizhi.utils.exception; + +/** + * @author qidaijie + * @Package com.zdjizhi.storm.utils.execption + * @Description: + * @date 2021/3/259:42 + */ +public class VoipRelationException extends RuntimeException { + + public VoipRelationException() { + } + + public VoipRelationException(String message) { + super(message); + } + +} diff --git a/src/main/java/com/zdjizhi/utils/functions/KeyByFunction.java b/src/main/java/com/zdjizhi/utils/functions/KeyByFunction.java new file mode 100644 index 0000000..0b00b3c --- /dev/null +++ b/src/main/java/com/zdjizhi/utils/functions/KeyByFunction.java @@ -0,0 +1,19 @@ +package com.zdjizhi.utils.functions; + +import org.apache.flink.api.java.functions.KeySelector; +import org.apache.flink.api.java.tuple.Tuple3; + +/** + * @author qidaijie + * @Package com.zdjizhi.utils.functions + * @Description: + * @date 2021/7/2112:13 + */ +public class KeyByFunction implements KeySelector<Tuple3<String, String, String>, String> { + + @Override + public String getKey(Tuple3<String, String, String> value) throws Exception { + //以map拼接的key分组 + return value.f1; + } +} diff --git a/src/main/java/com/zdjizhi/utils/functions/OneSidedWindowFunction.java b/src/main/java/com/zdjizhi/utils/functions/OneSidedWindowFunction.java new file mode 100644 index 0000000..38e6bdd --- /dev/null +++ b/src/main/java/com/zdjizhi/utils/functions/OneSidedWindowFunction.java @@ -0,0 +1,367 @@ +package com.zdjizhi.utils.functions; + +import cn.hutool.log.Log; +import cn.hutool.log.LogFactory; +import com.alibaba.fastjson.JSONObject; +import com.zdjizhi.common.JsonProConfig; +import com.zdjizhi.common.VoipRelationConfig; +import com.zdjizhi.utils.StringUtil; +import com.zdjizhi.utils.ip.IPUtils; +import com.zdjizhi.utils.json.JsonParseUtil; +import org.apache.flink.api.java.tuple.Tuple3; +import org.apache.flink.streaming.api.functions.windowing.ProcessAllWindowFunction; +import org.apache.flink.streaming.api.windowing.windows.TimeWindow; +import org.apache.flink.util.Collector; + +import java.util.HashMap; +import java.util.Map; + +/** + * @author qidaijie + * @Package com.zdjizhi.utils.functions + * @Description: + * @date 2021/8/1818:04 + */ +public class OneSidedWindowFunction extends ProcessAllWindowFunction<String, Tuple3<String, String, String>, TimeWindow> { + private static final Log logger = LogFactory.get(); + /** + * key-sip_call_id;value为sip的具体数据---存放的是SIP未关联的数据(单向流) + */ + private static HashMap<String, String> sipOriHmList = new HashMap<>(16); + + /** + * key-rtp拼接的四元组;value为rtp的具体数据---存放的是RTP未关联的数据(单向流) + */ + private static HashMap<String, String> rtpOriHmList = new HashMap<>(16); + + @Override + @SuppressWarnings("unchecked") + public void process(Context context, Iterable<String> inputs, Collector<Tuple3<String, String, String>> out) throws Exception { + + for (String input : inputs) { + if (StringUtil.isNotBlank(input)) { + JSONObject object = JSONObject.parseObject(input); + + String commonSchemaType = object.getString(JsonProConfig.SCHEMA_TYPE); + String sipCallId = object.getString(JsonProConfig.SIP_CALL_ID); + + //1:c2s,2:s2c;3;double + int commonStreamDir = object.getInteger(JsonProConfig.STREAM_DIR); + + /* + * 针对SIP日志进行处理 + */ + if (JsonProConfig.SIP_MARK.equals(commonSchemaType) && StringUtil.isNotBlank(sipCallId)) { + if (commonStreamDir != JsonProConfig.DOUBLE) { + putKeyAndMsg(input, sipCallId, sipOriHmList, "SIP", out); + } else { + separateInnerIp(object, out); + } + } + + /* + * 针对RTP日志进行处理 + */ + if (JsonProConfig.RTP_MARK.equals(commonSchemaType)) { + + String rtpIpPort4Key = getFourKey(object.getString(JsonProConfig.CLIENT_IP), + object.getInteger(JsonProConfig.CLIENT_PORT), + object.getString(JsonProConfig.SERVER_IP), + object.getInteger(JsonProConfig.SERVER_PORT)); + + if (commonStreamDir != JsonProConfig.DOUBLE) { + //对rtp单向流进行关联 + putKeyAndMsg(input, rtpIpPort4Key, rtpOriHmList, "RTP", out); + + } else { + //RTP双向流,按四元组下发 + out.collect(new Tuple3<>(rtpIpPort4Key, "rtp-two", input)); + } + } + } + } + /* + * 定时发送SIP或RTP未关联上数据 + */ + if (sipOriHmList.size() > 0) { + HashMap<String, String> tmpSipOriHmList = new HashMap<String, String>(sipOriHmList); + sipOriHmList.clear(); + for (String sipKey : tmpSipOriHmList.keySet()) { + String sipSingleMsg = tmpSipOriHmList.get(sipKey); + //sipKey为sip_call_id,未关联成功的sip是不能使用的 + out.collect(new Tuple3<>(sipKey, "sip-single", sipSingleMsg)); + } + } + + if (rtpOriHmList.size() > 0) { + HashMap<String, String> tmpRtpOriHmList = new HashMap<String, String>(rtpOriHmList); + rtpOriHmList.clear(); + for (String rtpKey : tmpRtpOriHmList.keySet()) { + String rtpSingleMsg = tmpRtpOriHmList.get(rtpKey); + //未关联成功的rtp还可以继续关联,因为有四元组 + out.collect(new Tuple3<>(rtpKey, "rtp-single", rtpSingleMsg)); + } + } + } + + /** + * 存放key并关联拼接对应Key + */ + private static void putKeyAndMsg(String message, String hmStrKey, HashMap<String, String> hashMapStr, String protocolType, Collector<Tuple3<String, String, String>> out) { + + //和上次存入的数据关联 + if (hashMapStr.containsKey(hmStrKey)) { + + JSONObject jsonCombinObject = new JSONObject(); + String[] strArr = new String[2]; + String firstMsg = hashMapStr.remove(hmStrKey); + + JSONObject firstSipOrRtpLog = JSONObject.parseObject(firstMsg); + JSONObject secendSipOrRtpLog = JSONObject.parseObject(message); + + //1:c2s,2:s2c;3;double,1表示firstMsg为请求侧(c2s),合并时以它为准 + if (firstSipOrRtpLog.getInteger(JsonProConfig.STREAM_DIR) == 1) { + strArr[0] = message; + strArr[1] = firstMsg; + } else { + strArr[0] = firstMsg; + strArr[1] = message; + } + jsonCombinObject.putAll(JSONObject.parseObject(strArr[0])); + jsonCombinObject.putAll(JSONObject.parseObject(strArr[1])); + String sipTwoMsg = jsonCombinObject.toString(); + + + JSONObject sipOrRtpCombin = JSONObject.parseObject(sipTwoMsg); + accumulateMsg(firstSipOrRtpLog, secendSipOrRtpLog, sipOrRtpCombin); + sipOrRtpCombin.put(JsonProConfig.STREAM_DIR, JsonProConfig.DOUBLE); + if (JsonProConfig.SIP_MARK.equals(protocolType)) { + //手动关联SIP后区分内外网IP再下发 + separateInnerIp(sipOrRtpCombin, out); + } else if (JsonProConfig.RTP_MARK.equals(protocolType)) { + //手动关联RTP后按四元组下发 + sipOrRtpCombin.put(JsonProConfig.RTP_PCAP_PATH, setRtpPath(firstSipOrRtpLog, secendSipOrRtpLog)); + out.collect(new Tuple3<>(hmStrKey, "rtp-two", JSONObject.toJSONString(sipOrRtpCombin))); + } + } else { + hashMapStr.put(hmStrKey, message); + } + } + + /** + * 区分SIP的内外网IP,此时已经关联完成包含四元组,但未区分内外网IP + */ + private static void separateInnerIp(JSONObject object, Collector<Tuple3<String, String, String>> out) { + + String sipOriginatorIp = object.getString(JsonProConfig.SIP_ORIGINATOR_IP); + String sipResponderIp = object.getString(JsonProConfig.SIP_RESPONDER_IP); + int sipOriginatorPort = object.getInteger(JsonProConfig.SIP_ORIGINATOR_PORT); + int sipResponderPort = object.getInteger(JsonProConfig.SIP_RESPONDER_PORT); + + if (IPUtils.isInnerIp(sipOriginatorIp) + || IPUtils.isInnerIp(sipResponderIp)) { + /** + * 按from-ip_from-port_to-ip_to-port + */ + String sipInnerEmitKey = sipOriginatorIp + VoipRelationConfig.CORRELATION_STR + + sipOriginatorPort + VoipRelationConfig.CORRELATION_STR + + sipResponderIp + VoipRelationConfig.CORRELATION_STR + + sipResponderPort; + //包含内网IP的SIP关联后数据 + out.collect(new Tuple3<>(sipInnerEmitKey, "sip-in", JSONObject.toJSONString(object))); + } else { + String sipIpPort4Key = getFourKey(sipOriginatorIp, + sipOriginatorPort, + sipResponderIp, + sipResponderPort); + + //按照四元组的Key发送到下一个bolt + out.collect(new Tuple3<>(sipIpPort4Key, "sip-two", JSONObject.toJSONString(object))); + } + } + + /** + * 获得四元组key + * + * @param commonClientIp 客户端IP + * @param commonClientPort 客户端端口 + * @param commonServerIp 服务端IP + * @param commonServerPort 服务端端口 + * @return 比较拼接后的四元组 + */ + private static String getFourKey(String commonClientIp, int commonClientPort, String commonServerIp, int commonServerPort) { + String ipPort4Key = ""; + int comparePortResult = compareNum(commonClientPort, commonServerPort); + + /* + * 按端口比较 + */ + switch (comparePortResult) { + //common_client_port > commonServerPort + case 1: + ipPort4Key = commonServerIp + VoipRelationConfig.CORRELATION_STR + + commonServerPort + VoipRelationConfig.CORRELATION_STR + + commonClientIp + VoipRelationConfig.CORRELATION_STR + + commonClientPort; + break; + //common_client_port < commonServerPort + case -1: + ipPort4Key = commonClientIp + VoipRelationConfig.CORRELATION_STR + + commonClientPort + VoipRelationConfig.CORRELATION_STR + + commonServerIp + VoipRelationConfig.CORRELATION_STR + + commonServerPort; + break; + //common_client_port = commonServerPort,开始按照IP比较 + case 0: + ipPort4Key = compareIp(commonClientIp, commonServerIp, commonClientPort, commonServerPort); + break; + //port端口值异常 + case -2: + default: + logger.error("compareNum is error," + + "common_client_port:" + commonClientPort + "," + + "commonServerPort:" + commonServerPort); + break; + } + + return ipPort4Key; + } + + /** + * 比较IP,并作key的拼接 + * + * @param commonClientIp + * @param commonServerIp + * @param commonClientPort + * @param commonServerPort + * @return + */ + private static String compareIp(String commonClientIp, String commonServerIp, int commonClientPort, int commonServerPort) { + long clientIpNum = IPUtils.ipToLong(commonClientIp); + long serverIpNum = IPUtils.ipToLong(commonServerIp); + int compareIpResult = compareNum(clientIpNum, serverIpNum); + switch (compareIpResult) { + //clientIpNum > serverIpNum + case 1: + return commonServerIp + VoipRelationConfig.CORRELATION_STR + + commonServerPort + VoipRelationConfig.CORRELATION_STR + + commonClientIp + VoipRelationConfig.CORRELATION_STR + + commonClientPort; + //clientIpNum < serverIpNum + case -1: + return commonClientIp + VoipRelationConfig.CORRELATION_STR + + commonClientPort + VoipRelationConfig.CORRELATION_STR + + commonServerIp + VoipRelationConfig.CORRELATION_STR + + commonServerPort; + //clientIpNum = serverIpNum,说明两个IP值一样,即IP异常 + case 0: + //IP值异常 + case -2: + default: + logger.error("compareNum is error," + + "common_client_ip:" + commonClientIp + "," + + "commonServerIp:" + commonServerIp + "," + + "commonClientPort:" + commonClientPort + "," + + "commonServerPort:" + commonServerPort); + return ""; + } + } + + /** + * 计算相关字节信息,主要是累加 + * + * @param firstSipOrRtpLog + * @param secendSipOrRtpLog + * @param sipOrRtpCombin + */ + private static void accumulateMsg(JSONObject firstSipOrRtpLog, JSONObject secendSipOrRtpLog, JSONObject sipOrRtpCombin) { + //common_sessions + sipOrRtpCombin.put(JsonProConfig.SESSIONS, (firstSipOrRtpLog.getLongValue(JsonProConfig.SESSIONS) + secendSipOrRtpLog.getLongValue(JsonProConfig.SESSIONS))); + + //common_c2s_pkt_num + sipOrRtpCombin.put(JsonProConfig.C2S_PKT_NUM, (firstSipOrRtpLog.getLongValue(JsonProConfig.C2S_PKT_NUM) + secendSipOrRtpLog.getLongValue(JsonProConfig.C2S_PKT_NUM))); + + //common_s2c_pkt_num + sipOrRtpCombin.put(JsonProConfig.S2C_PKT_NUM, (firstSipOrRtpLog.getLongValue(JsonProConfig.S2C_PKT_NUM) + secendSipOrRtpLog.getLongValue(JsonProConfig.S2C_PKT_NUM))); + + //common_c2s_byte_num + sipOrRtpCombin.put(JsonProConfig.C2S_BYTE_NUM, (firstSipOrRtpLog.getLongValue(JsonProConfig.C2S_BYTE_NUM) + secendSipOrRtpLog.getLongValue(JsonProConfig.C2S_BYTE_NUM))); + + //common_s2c_byte_num + sipOrRtpCombin.put(JsonProConfig.S2C_BYTE_NUM, (firstSipOrRtpLog.getLongValue(JsonProConfig.S2C_BYTE_NUM) + secendSipOrRtpLog.getLongValue(JsonProConfig.S2C_BYTE_NUM))); + + //common_c2s_ipfrag_num + sipOrRtpCombin.put(JsonProConfig.C2S_IPFRAG_NUM, (firstSipOrRtpLog.getLongValue(JsonProConfig.C2S_IPFRAG_NUM) + secendSipOrRtpLog.getLongValue(JsonProConfig.C2S_IPFRAG_NUM))); + + //common_s2c_ipfrag_num + sipOrRtpCombin.put(JsonProConfig.S2C_IPFRAG_NUM, (firstSipOrRtpLog.getLongValue(JsonProConfig.S2C_IPFRAG_NUM) + secendSipOrRtpLog.getLongValue(JsonProConfig.S2C_IPFRAG_NUM))); + + //common_c2s_tcp_lostlen + sipOrRtpCombin.put(JsonProConfig.C2S_TCP_LOSTLEN, (firstSipOrRtpLog.getLongValue(JsonProConfig.C2S_TCP_LOSTLEN) + secendSipOrRtpLog.getLongValue(JsonProConfig.C2S_TCP_LOSTLEN))); + + //common_s2c_tcp_lostlen + sipOrRtpCombin.put(JsonProConfig.S2C_TCP_LOSTLEN, (firstSipOrRtpLog.getLongValue(JsonProConfig.S2C_TCP_LOSTLEN) + secendSipOrRtpLog.getLongValue(JsonProConfig.S2C_TCP_LOSTLEN))); + + //common_c2s_tcp_unorder_num + sipOrRtpCombin.put(JsonProConfig.C2S_TCP_UNORDER_NUM, (firstSipOrRtpLog.getLongValue(JsonProConfig.C2S_TCP_UNORDER_NUM) + secendSipOrRtpLog.getLongValue(JsonProConfig.C2S_TCP_UNORDER_NUM))); + + //common_s2c_tcp_unorder_num + sipOrRtpCombin.put(JsonProConfig.S2C_TCP_UNORDER_NUM, (firstSipOrRtpLog.getLongValue(JsonProConfig.S2C_TCP_UNORDER_NUM) + secendSipOrRtpLog.getLongValue(JsonProConfig.S2C_TCP_UNORDER_NUM))); + } + + /** + * int类型 + * 比较数字大小,左边>右边-返回:1,左边<右边-返回:-1,左边=右边-返回:0 + * + * @param numOne + * @param numTwo + */ + private static int compareNum(int numOne, int numTwo) { + if (numOne > 0 && numTwo > 0) { + return Integer.compare(numOne, numTwo); + } else { + return -2; + } + } + + /** + * long类型 + * 比较数字大小,左边>右边-返回:1,左边<右边-返回:-1,左边=右边-返回:0 + * + * @param numOne + * @param numTwo + */ + private static int compareNum(long numOne, long numTwo) { + if (numOne > 0 && numTwo > 0) { + return Long.compare(numOne, numTwo); + } else { + return -2; + } + } + + /** + * 判断RTP单向流对准后是否存在多个文件,若相同则返回任意一个,若不同则拼接返回 + * + * @param firstSipOrRtpLog 第一个单向流日志 + * @param secendSipOrRtpLog 第二个单向流日志 + * @return 文件路径 + */ + private static String setRtpPath(Map<String, Object> firstSipOrRtpLog, Map<String, Object> secendSipOrRtpLog) { + + String firstRtpPcapPath = JsonParseUtil.getString(firstSipOrRtpLog, JsonProConfig.RTP_PCAP_PATH); + String secendRtpPcapPath = JsonParseUtil.getString(secendSipOrRtpLog, JsonProConfig.RTP_PCAP_PATH); + + if (StringUtil.isNotBlank(firstRtpPcapPath) && StringUtil.isNotBlank(secendRtpPcapPath)) { + if (firstRtpPcapPath.equals(secendRtpPcapPath)) { + return firstRtpPcapPath; + } else { + return firstRtpPcapPath + ";" + secendRtpPcapPath; + } + } else if (StringUtil.isNotBlank(firstRtpPcapPath)) { + return firstRtpPcapPath; + } else { + return secendRtpPcapPath; + } + } + +} diff --git a/src/main/java/com/zdjizhi/utils/functions/SipCalibrationWindow.java b/src/main/java/com/zdjizhi/utils/functions/SipCalibrationWindow.java new file mode 100644 index 0000000..29b55a1 --- /dev/null +++ b/src/main/java/com/zdjizhi/utils/functions/SipCalibrationWindow.java @@ -0,0 +1,327 @@ +package com.zdjizhi.utils.functions; + +import cn.hutool.log.Log; +import cn.hutool.log.LogFactory; +import com.alibaba.fastjson.JSONObject; +import com.zdjizhi.common.JsonProConfig; +import com.zdjizhi.common.VoipRelationConfig; +import com.zdjizhi.utils.StringUtil; +import com.zdjizhi.utils.json.JsonParseUtil; +import org.apache.flink.api.java.tuple.Tuple3; +import org.apache.flink.streaming.api.functions.windowing.ProcessAllWindowFunction; +import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction; +import org.apache.flink.streaming.api.windowing.windows.TimeWindow; +import org.apache.flink.util.Collector; + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.LinkedList; +import java.util.List; + +/** + * @author qidaijie + * @Package com.zdjizhi.utils.functions + * @Description: + * @date 2021/7/2113:55 + */ +public class SipCalibrationWindow extends ProcessAllWindowFunction<Tuple3<String, String, String>, String, TimeWindow> { + private static final Log logger = LogFactory.get(); + + /** + * 实体类反射map + */ + private static HashMap<String, Class> classMap = JsonParseUtil.getMapFromHttp(VoipRelationConfig.SCHEMA_HTTP); + + /** + * 反射成一个类 + */ + private static Object voipObject = JsonParseUtil.generateObject(classMap); + + /** + * 关联用HashMap + * key---四元组 + * value---List存放对应SIP或者RTP数据 + * 存放数据:rtp-single,rtp-two,sip-two + * 不存放的数据:sip-single与sip-in + */ + private static HashMap<String, LinkedList<String>> combineSRHmList = new HashMap<>(16); + + /** + * 二次关联用HashMap + * key---四元组 + * value---List存放对应SIP或者RTP数据 + * 存放数据:rtp-single,rtp-two,sip-two + * 不存放的数据:sip-single与sip-in + */ + private static HashMap<String, LinkedList<String>> secCombineSRHmList = new HashMap<>(16); + + @Override + public void process(Context context, Iterable<Tuple3<String, String, String>> input, Collector<String> output) throws Exception { + for (Tuple3<String, String, String> tuple : input) { + //拼接的四元组 + String fourKey = tuple.f0; + //已关联的sip,rtp;未关联的sip,rtp;内网的sip + String type = tuple.f1; + String msg = tuple.f2; + switch (type) { + //单向流对准后的SIP + case "sip-two": + //单向流对准后的RTP + case "rtp-two": + //对不上的RTP + case "rtp-single": + putKeyAndMsg(msg, fourKey, combineSRHmList); + break; + //单向流的SIP + case "sip-single": + //内网的SIP + case "sip-in": + output.collect(msg); + break; + default: + logger.error("type is beyond expectation:" + type); + break; + + } + } + //初次关联 + tickCombineHmList(combineSRHmList, output); + //和缓存中的数据二次关联 + tickCombineHmList(secCombineSRHmList, output); + } + + + /** + * 定时关联,包括初次关联以及后续二次关联 + * + * @param combineHmList + */ + private void tickCombineHmList(HashMap<String, LinkedList<String>> combineHmList, Collector<String> output) { + if (combineHmList.size() > 0) { + long nowTime = System.currentTimeMillis() / 1000; + + HashMap<String, LinkedList<String>> tempCombineSRhmList = new HashMap<>(combineHmList); + combineHmList.clear(); + + for (String fourStrKey : tempCombineSRhmList.keySet()) { + + LinkedList<String> tempList = tempCombineSRhmList.get(fourStrKey); + //包含SIP和RTP + int listSize = tempList.size(); + System.out.println(listSize); + if (listSize > 1) { + + List<String> sipBeanArr = new ArrayList<>(); + List<String> rtpBeanArr = new ArrayList<>(); + + for (String message : tempList) { + Object tempSipOrRtpLog = JSONObject.parseObject(message, voipObject.getClass()); + String schemaType = JsonParseUtil.getString(tempSipOrRtpLog, JsonProConfig.SCHEMA_TYPE); + if (JsonProConfig.SIP_MARK.equals(schemaType)) { + sipBeanArr.add(message); + } else if (JsonProConfig.RTP_MARK.equals(schemaType)) { + rtpBeanArr.add(message); + } + } + int rtpSize = rtpBeanArr.size(); + int sipSize = sipBeanArr.size(); + + if (rtpSize == 1 && sipSize >= 1) { + for (String sipMessage : sipBeanArr) { + Object rtpLog = JSONObject.parseObject(rtpBeanArr.get(0), voipObject.getClass()); + Object voipLog = JSONObject.parseObject(sipMessage, voipObject.getClass()); + accumulateVoipMsg(voipLog, rtpLog); + JsonParseUtil.setValue(voipLog, JsonProConfig.SCHEMA_TYPE, "VoIP"); + JsonParseUtil.setValue(voipLog, JsonProConfig.RTP_ORIGINATOR_DIR, judgeDirection(rtpLog, voipLog)); + //四元组,voip,关联后的数据 + output.collect(mergeJson(voipLog, rtpLog)); +// basicOutputCollector.emit(new Values(fourStrKey, "voip", mergeJson(voipLog, rtpLog))); + } + } else if (sipSize == 1 && rtpSize >= 1) { + for (String rtpMessage : rtpBeanArr) { + Object rtpLog = JSONObject.parseObject(rtpMessage, voipObject.getClass()); + Object voipLog = JSONObject.parseObject(sipBeanArr.get(0), voipObject.getClass()); + accumulateVoipMsg(voipLog, rtpLog); + JsonParseUtil.setValue(voipLog, JsonProConfig.SCHEMA_TYPE, "VoIP"); + JsonParseUtil.setValue(voipLog, JsonProConfig.RTP_ORIGINATOR_DIR, judgeDirection(rtpLog, voipLog)); + //四元组,voip,关联后的数据 + output.collect(mergeJson(voipLog, rtpLog)); +// basicOutputCollector.emit(new Values(fourStrKey, "voip", mergeJson(voipLog, rtpLog))); + } + } else { + logger.warn("RTP-listSize is [" + rtpBeanArr.size() + "];SIP-listSize is [" + sipBeanArr.size() + "] in this tempCombineSRHmList! Not logical"); + sendErrorLogToKafka(sipBeanArr, output); + sendErrorLogToKafka(rtpBeanArr, output); + } + + } else { + String msg = tempList.get(0); + + Object voipLog = JSONObject.parseObject(msg, voipObject.getClass()); + long commonEndTime = JsonParseUtil.getLong(voipLog, JsonProConfig.END_TIME); + long intervalTime = nowTime - commonEndTime; + if (intervalTime <= VoipRelationConfig.SEC_COMBINE_SR_CACHE_SECS) { + putKeyAndMsg(msg, fourStrKey, secCombineSRHmList); + } else { + sendDirectlyOneElement(msg, voipLog, output); + } + + } + } + } + } + + /** + * 累加关联后的字节类参数值 + * + * @param voipLog + * @param rtpLog + */ + private void accumulateVoipMsg(Object voipLog, Object rtpLog) { + + Long sumCommonSessions = JsonParseUtil.getLong(voipLog, JsonProConfig.SESSIONS) + + JsonParseUtil.getLong(rtpLog, JsonProConfig.SESSIONS); + + Long sumCommonC2sPktNum = JsonParseUtil.getLong(voipLog, JsonProConfig.C2S_PKT_NUM) + + JsonParseUtil.getLong(rtpLog, JsonProConfig.C2S_PKT_NUM); + + Long sumCommonS2cPktNum = JsonParseUtil.getLong(voipLog, JsonProConfig.S2C_PKT_NUM) + + JsonParseUtil.getLong(rtpLog, JsonProConfig.S2C_PKT_NUM); + + Long sumCommonC2sByteNum = JsonParseUtil.getLong(voipLog, JsonProConfig.C2S_BYTE_NUM) + + JsonParseUtil.getLong(rtpLog, JsonProConfig.C2S_BYTE_NUM); + + Long sumCommonS2cByteNum = JsonParseUtil.getLong(voipLog, JsonProConfig.S2C_BYTE_NUM) + + JsonParseUtil.getLong(rtpLog, JsonProConfig.S2C_BYTE_NUM); + + Long sumCommonC2sIpfragNum = JsonParseUtil.getLong(voipLog, JsonProConfig.C2S_IPFRAG_NUM) + + JsonParseUtil.getLong(rtpLog, JsonProConfig.C2S_IPFRAG_NUM); + + Long sumCommonS2cIpfragNum = JsonParseUtil.getLong(voipLog, JsonProConfig.S2C_IPFRAG_NUM) + + JsonParseUtil.getLong(rtpLog, JsonProConfig.S2C_IPFRAG_NUM); + + Long sumCommonC2sTcpLostlen = JsonParseUtil.getLong(voipLog, JsonProConfig.C2S_TCP_LOSTLEN) + + JsonParseUtil.getLong(rtpLog, JsonProConfig.C2S_TCP_LOSTLEN); + + Long sumCommonS2cTcpLostlen = JsonParseUtil.getLong(voipLog, JsonProConfig.S2C_TCP_LOSTLEN) + + JsonParseUtil.getLong(rtpLog, JsonProConfig.S2C_TCP_LOSTLEN); + + Long sumCommonC2sTcpUnorderNum = JsonParseUtil.getLong(voipLog, JsonProConfig.C2S_TCP_UNORDER_NUM) + + JsonParseUtil.getLong(rtpLog, JsonProConfig.C2S_TCP_UNORDER_NUM); + + Long sumCommonS2cTcpUnorderNum = JsonParseUtil.getLong(voipLog, JsonProConfig.S2C_TCP_UNORDER_NUM) + + JsonParseUtil.getLong(rtpLog, JsonProConfig.S2C_TCP_UNORDER_NUM); + + //common_sessions + JsonParseUtil.setValue(voipLog, JsonProConfig.SESSIONS, sumCommonSessions); + + //common_c2s_pkt_num + JsonParseUtil.setValue(voipLog, JsonProConfig.C2S_PKT_NUM, sumCommonC2sPktNum); + //common_s2c_pkt_num + JsonParseUtil.setValue(voipLog, JsonProConfig.S2C_PKT_NUM, sumCommonS2cPktNum); + //common_c2s_byte_num + JsonParseUtil.setValue(voipLog, JsonProConfig.C2S_BYTE_NUM, sumCommonC2sByteNum); + //common_s2c_byte_num + JsonParseUtil.setValue(voipLog, JsonProConfig.S2C_BYTE_NUM, sumCommonS2cByteNum); + + //common_c2s_ipfrag_num + JsonParseUtil.setValue(voipLog, JsonProConfig.C2S_IPFRAG_NUM, sumCommonC2sIpfragNum); + //common_s2c_ipfrag_num + JsonParseUtil.setValue(voipLog, JsonProConfig.S2C_IPFRAG_NUM, sumCommonS2cIpfragNum); + + //common_c2s_tcp_lostlen + JsonParseUtil.setValue(voipLog, JsonProConfig.C2S_TCP_LOSTLEN, sumCommonC2sTcpLostlen); + //common_s2c_tcp_lostlen + JsonParseUtil.setValue(voipLog, JsonProConfig.S2C_TCP_LOSTLEN, sumCommonS2cTcpLostlen); + + //common_c2s_tcp_unorder_num + JsonParseUtil.setValue(voipLog, JsonProConfig.C2S_TCP_UNORDER_NUM, sumCommonC2sTcpUnorderNum); + //common_s2c_tcp_unorder_num + JsonParseUtil.setValue(voipLog, JsonProConfig.S2C_TCP_UNORDER_NUM, sumCommonS2cTcpUnorderNum); + + } + + /** + * 定时处理中List元素数仅为1的情况 + */ + private void sendDirectlyOneElement(String msg, Object voipLog, Collector<String> output) { + //四元组,sip(一定为双侧)/rtp(可能为单侧也可能为双侧,看单向流字段信息),拿出来的原始数据 + String commonSchemaType = JsonParseUtil.getString(voipLog, JsonProConfig.SCHEMA_TYPE); + if (JsonProConfig.SIP_MARK.equals(commonSchemaType)) { + output.collect(msg); + } else if (JsonProConfig.RTP_MARK.equals(commonSchemaType)) { + int commonStreamDir = JsonParseUtil.getInteger(voipLog, JsonProConfig.STREAM_DIR); + if (commonStreamDir != JsonProConfig.DOUBLE) { + output.collect(msg); + } else { + output.collect(msg); + } + } + } + + + /** + * 存放key并添加对应List + */ + private static void putKeyAndMsg(String message, String fourStrKey, HashMap<String, LinkedList<String>> combineSRHmList) { + if (combineSRHmList.containsKey(fourStrKey)) { + LinkedList<String> tmpList = combineSRHmList.get(fourStrKey); + tmpList.add(message); + combineSRHmList.put(fourStrKey, tmpList); + } else { + LinkedList<String> tmpList = new LinkedList<>(); + tmpList.add(message); + combineSRHmList.put(fourStrKey, tmpList); + } + } + + /** + * 判断RTP主叫方向-测试 + * + * @param rtpLog RTP原始日志 + * @param voipLog 融合后VOIP日志 + * @return 方向 0:未知 1:c2s 2:s2c + */ + private static int judgeDirection(Object rtpLog, Object voipLog) { + + String ip = JsonParseUtil.getString(rtpLog, JsonProConfig.CLIENT_IP); + String sipOriginatorIp = JsonParseUtil.getString(voipLog, JsonProConfig.SIP_ORIGINATOR_IP); + String sipResponderIp = JsonParseUtil.getString(voipLog, JsonProConfig.SIP_RESPONDER_IP); + + if (StringUtil.isNotBlank(ip) && ip.equals(sipOriginatorIp)) { + return 1; + } else if (StringUtil.isNotBlank(ip) && ip.equals(sipResponderIp)) { + return 2; + } + + return 0; + } + + /** + * 发送不符合逻辑的日志到kafka + */ + private static void sendErrorLogToKafka(List<String> logList, Collector<String> output) { + if (logList.size() > 0) { + for (String log : logList) { + output.collect(log); + } + } + } + + /** + * + */ + private static String mergeJson(Object voipLog, Object rtpLog) { + + int rtpPayloadTypeC2s = JsonParseUtil.getInteger(rtpLog, JsonProConfig.RTP_PAYLOAD_TYPE_C2S); + int rtpPayloadTypeS2c = JsonParseUtil.getInteger(rtpLog, JsonProConfig.RTP_PAYLOAD_TYPE_S2C); + String rtpPcapPath = JsonParseUtil.getString(rtpLog, JsonProConfig.RTP_PCAP_PATH); + + JsonParseUtil.setValue(voipLog, JsonProConfig.RTP_PAYLOAD_TYPE_C2S, rtpPayloadTypeC2s); + JsonParseUtil.setValue(voipLog, JsonProConfig.RTP_PAYLOAD_TYPE_S2C, rtpPayloadTypeS2c); + JsonParseUtil.setValue(voipLog, JsonProConfig.RTP_PCAP_PATH, rtpPcapPath); + + return JSONObject.toJSONString(voipLog); + } + +} diff --git a/src/main/java/com/zdjizhi/utils/functions/SipCalibrationWindowFunction.java b/src/main/java/com/zdjizhi/utils/functions/SipCalibrationWindowFunction.java new file mode 100644 index 0000000..2215d5b --- /dev/null +++ b/src/main/java/com/zdjizhi/utils/functions/SipCalibrationWindowFunction.java @@ -0,0 +1,324 @@ +package com.zdjizhi.utils.functions; + +import cn.hutool.log.Log; +import cn.hutool.log.LogFactory; +import com.alibaba.fastjson.JSONObject; +import com.zdjizhi.common.JsonProConfig; +import com.zdjizhi.common.VoipRelationConfig; +import com.zdjizhi.utils.StringUtil; +import com.zdjizhi.utils.json.JsonParseUtil; +import org.apache.flink.api.java.tuple.Tuple3; +import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction; +import org.apache.flink.streaming.api.windowing.windows.TimeWindow; +import org.apache.flink.util.Collector; + +import java.util.*; + +/** + * @author qidaijie + * @Package com.zdjizhi.utils.functions + * @Description: + * @date 2021/7/2113:55 + */ +public class SipCalibrationWindowFunction extends ProcessWindowFunction<Tuple3<String, String, String>, String, String, TimeWindow> { + private static final Log logger = LogFactory.get(); + + /** + * 实体类反射map + */ + private static HashMap<String, Class> classMap = JsonParseUtil.getMapFromHttp(VoipRelationConfig.SCHEMA_HTTP); + + /** + * 反射成一个类 + */ + private static Object voipObject = JsonParseUtil.generateObject(classMap); + + /** + * 关联用HashMap + * key---四元组 + * value---List存放对应SIP或者RTP数据 + * 存放数据:rtp-single,rtp-two,sip-two + * 不存放的数据:sip-single与sip-in + */ + private static HashMap<String, LinkedList<String>> combineSRHmList = new HashMap<>(16); + + /** + * 二次关联用HashMap + * key---四元组 + * value---List存放对应SIP或者RTP数据 + * 存放数据:rtp-single,rtp-two,sip-two + * 不存放的数据:sip-single与sip-in + */ + private static HashMap<String, LinkedList<String>> secCombineSRHmList = new HashMap<>(16); + + @Override + @SuppressWarnings("unchecked") + public void process(String key, Context context, Iterable<Tuple3<String, String, String>> input, Collector<String> output) throws Exception { + for (Tuple3<String, String, String> tuple : input) { + //拼接的四元组 + String fourKey = tuple.f0; + //已关联的sip,rtp;未关联的sip,rtp;内网的sip + String type = tuple.f1; + String msg = tuple.f2; + switch (type) { + //单向流对准后的SIP + case "sip-two": + //单向流对准后的RTP + case "rtp-two": + //对不上的RTP + case "rtp-single": + putKeyAndMsg(msg, fourKey, combineSRHmList); + break; + //单向流的SIP + case "sip-single": + //内网的SIP + case "sip-in": + output.collect(msg); + break; + default: + logger.error("type is beyond expectation:" + type); + break; + + } + } + //初次关联 + tickCombineHmList(combineSRHmList, output); + //和缓存中的数据二次关联 + tickCombineHmList(secCombineSRHmList, output); + } + + /** + * 定时关联,包括初次关联以及后续二次关联 + * + * @param combineHmList + */ + private void tickCombineHmList(HashMap<String, LinkedList<String>> combineHmList, Collector<String> output) { + if (combineHmList.size() > 0) { + long nowTime = System.currentTimeMillis() / 1000; + + HashMap<String, LinkedList<String>> tempCombineSRhmList = new HashMap<>(combineHmList); + combineHmList.clear(); + + for (String fourStrKey : tempCombineSRhmList.keySet()) { + + LinkedList<String> tempList = tempCombineSRhmList.get(fourStrKey); + //包含SIP和RTP + int listSize = tempList.size(); + System.out.println(listSize); + if (listSize > 1) { + + List<String> sipBeanArr = new ArrayList<>(); + List<String> rtpBeanArr = new ArrayList<>(); + + for (String message : tempList) { + Object tempSipOrRtpLog = JSONObject.parseObject(message, voipObject.getClass()); + String schemaType = JsonParseUtil.getString(tempSipOrRtpLog, JsonProConfig.SCHEMA_TYPE); + if (JsonProConfig.SIP_MARK.equals(schemaType)) { + sipBeanArr.add(message); + } else if (JsonProConfig.RTP_MARK.equals(schemaType)) { + rtpBeanArr.add(message); + } + } + int rtpSize = rtpBeanArr.size(); + int sipSize = sipBeanArr.size(); + + if (rtpSize == 1 && sipSize >= 1) { + for (String sipMessage : sipBeanArr) { + Object rtpLog = JSONObject.parseObject(rtpBeanArr.get(0), voipObject.getClass()); + Object voipLog = JSONObject.parseObject(sipMessage, voipObject.getClass()); + accumulateVoipMsg(voipLog, rtpLog); + JsonParseUtil.setValue(voipLog, JsonProConfig.SCHEMA_TYPE, "VoIP"); + JsonParseUtil.setValue(voipLog, JsonProConfig.RTP_ORIGINATOR_DIR, judgeDirection(rtpLog, voipLog)); + //四元组,voip,关联后的数据 + output.collect(mergeJson(voipLog, rtpLog)); +// basicOutputCollector.emit(new Values(fourStrKey, "voip", mergeJson(voipLog, rtpLog))); + } + } else if (sipSize == 1 && rtpSize >= 1) { + for (String rtpMessage : rtpBeanArr) { + Object rtpLog = JSONObject.parseObject(rtpMessage, voipObject.getClass()); + Object voipLog = JSONObject.parseObject(sipBeanArr.get(0), voipObject.getClass()); + accumulateVoipMsg(voipLog, rtpLog); + JsonParseUtil.setValue(voipLog, JsonProConfig.SCHEMA_TYPE, "VoIP"); + JsonParseUtil.setValue(voipLog, JsonProConfig.RTP_ORIGINATOR_DIR, judgeDirection(rtpLog, voipLog)); + //四元组,voip,关联后的数据 + output.collect(mergeJson(voipLog, rtpLog)); +// basicOutputCollector.emit(new Values(fourStrKey, "voip", mergeJson(voipLog, rtpLog))); + } + } else { + logger.warn("RTP-listSize is [" + rtpBeanArr.size() + "];SIP-listSize is [" + sipBeanArr.size() + "] in this tempCombineSRHmList! Not logical"); + sendErrorLogToKafka(sipBeanArr, output); + sendErrorLogToKafka(rtpBeanArr, output); + } + + } else { + String msg = tempList.get(0); + + Object voipLog = JSONObject.parseObject(msg, voipObject.getClass()); + long commonEndTime = JsonParseUtil.getLong(voipLog, JsonProConfig.END_TIME); + long intervalTime = nowTime - commonEndTime; + if (intervalTime <= VoipRelationConfig.SEC_COMBINE_SR_CACHE_SECS) { + putKeyAndMsg(msg, fourStrKey, secCombineSRHmList); + } else { + sendDirectlyOneElement(msg, voipLog, output); + } + + } + } + } + } + + /** + * 累加关联后的字节类参数值 + * + * @param voipLog + * @param rtpLog + */ + private void accumulateVoipMsg(Object voipLog, Object rtpLog) { + + Long sumCommonSessions = JsonParseUtil.getLong(voipLog, JsonProConfig.SESSIONS) + + JsonParseUtil.getLong(rtpLog, JsonProConfig.SESSIONS); + + Long sumCommonC2sPktNum = JsonParseUtil.getLong(voipLog, JsonProConfig.C2S_PKT_NUM) + + JsonParseUtil.getLong(rtpLog, JsonProConfig.C2S_PKT_NUM); + + Long sumCommonS2cPktNum = JsonParseUtil.getLong(voipLog, JsonProConfig.S2C_PKT_NUM) + + JsonParseUtil.getLong(rtpLog, JsonProConfig.S2C_PKT_NUM); + + Long sumCommonC2sByteNum = JsonParseUtil.getLong(voipLog, JsonProConfig.C2S_BYTE_NUM) + + JsonParseUtil.getLong(rtpLog, JsonProConfig.C2S_BYTE_NUM); + + Long sumCommonS2cByteNum = JsonParseUtil.getLong(voipLog, JsonProConfig.S2C_BYTE_NUM) + + JsonParseUtil.getLong(rtpLog, JsonProConfig.S2C_BYTE_NUM); + + Long sumCommonC2sIpfragNum = JsonParseUtil.getLong(voipLog, JsonProConfig.C2S_IPFRAG_NUM) + + JsonParseUtil.getLong(rtpLog, JsonProConfig.C2S_IPFRAG_NUM); + + Long sumCommonS2cIpfragNum = JsonParseUtil.getLong(voipLog, JsonProConfig.S2C_IPFRAG_NUM) + + JsonParseUtil.getLong(rtpLog, JsonProConfig.S2C_IPFRAG_NUM); + + Long sumCommonC2sTcpLostlen = JsonParseUtil.getLong(voipLog, JsonProConfig.C2S_TCP_LOSTLEN) + + JsonParseUtil.getLong(rtpLog, JsonProConfig.C2S_TCP_LOSTLEN); + + Long sumCommonS2cTcpLostlen = JsonParseUtil.getLong(voipLog, JsonProConfig.S2C_TCP_LOSTLEN) + + JsonParseUtil.getLong(rtpLog, JsonProConfig.S2C_TCP_LOSTLEN); + + Long sumCommonC2sTcpUnorderNum = JsonParseUtil.getLong(voipLog, JsonProConfig.C2S_TCP_UNORDER_NUM) + + JsonParseUtil.getLong(rtpLog, JsonProConfig.C2S_TCP_UNORDER_NUM); + + Long sumCommonS2cTcpUnorderNum = JsonParseUtil.getLong(voipLog, JsonProConfig.S2C_TCP_UNORDER_NUM) + + JsonParseUtil.getLong(rtpLog, JsonProConfig.S2C_TCP_UNORDER_NUM); + + //common_sessions + JsonParseUtil.setValue(voipLog, JsonProConfig.SESSIONS, sumCommonSessions); + + //common_c2s_pkt_num + JsonParseUtil.setValue(voipLog, JsonProConfig.C2S_PKT_NUM, sumCommonC2sPktNum); + //common_s2c_pkt_num + JsonParseUtil.setValue(voipLog, JsonProConfig.S2C_PKT_NUM, sumCommonS2cPktNum); + //common_c2s_byte_num + JsonParseUtil.setValue(voipLog, JsonProConfig.C2S_BYTE_NUM, sumCommonC2sByteNum); + //common_s2c_byte_num + JsonParseUtil.setValue(voipLog, JsonProConfig.S2C_BYTE_NUM, sumCommonS2cByteNum); + + //common_c2s_ipfrag_num + JsonParseUtil.setValue(voipLog, JsonProConfig.C2S_IPFRAG_NUM, sumCommonC2sIpfragNum); + //common_s2c_ipfrag_num + JsonParseUtil.setValue(voipLog, JsonProConfig.S2C_IPFRAG_NUM, sumCommonS2cIpfragNum); + + //common_c2s_tcp_lostlen + JsonParseUtil.setValue(voipLog, JsonProConfig.C2S_TCP_LOSTLEN, sumCommonC2sTcpLostlen); + //common_s2c_tcp_lostlen + JsonParseUtil.setValue(voipLog, JsonProConfig.S2C_TCP_LOSTLEN, sumCommonS2cTcpLostlen); + + //common_c2s_tcp_unorder_num + JsonParseUtil.setValue(voipLog, JsonProConfig.C2S_TCP_UNORDER_NUM, sumCommonC2sTcpUnorderNum); + //common_s2c_tcp_unorder_num + JsonParseUtil.setValue(voipLog, JsonProConfig.S2C_TCP_UNORDER_NUM, sumCommonS2cTcpUnorderNum); + + } + + /** + * 定时处理中List元素数仅为1的情况 + */ + private void sendDirectlyOneElement(String msg, Object voipLog, Collector<String> output) { + //四元组,sip(一定为双侧)/rtp(可能为单侧也可能为双侧,看单向流字段信息),拿出来的原始数据 + String commonSchemaType = JsonParseUtil.getString(voipLog, JsonProConfig.SCHEMA_TYPE); + if (JsonProConfig.SIP_MARK.equals(commonSchemaType)) { + output.collect(msg); + } else if (JsonProConfig.RTP_MARK.equals(commonSchemaType)) { + int commonStreamDir = JsonParseUtil.getInteger(voipLog, JsonProConfig.STREAM_DIR); + if (commonStreamDir != JsonProConfig.DOUBLE) { + output.collect(msg); + } else { + output.collect(msg); + } + } + } + + + /** + * 存放key并添加对应List + */ + private static void putKeyAndMsg(String message, String fourStrKey, HashMap<String, LinkedList<String>> combineSRHmList) { + if (combineSRHmList.containsKey(fourStrKey)) { + LinkedList<String> tmpList = combineSRHmList.get(fourStrKey); + tmpList.add(message); + combineSRHmList.put(fourStrKey, tmpList); + } else { + LinkedList<String> tmpList = new LinkedList<>(); + tmpList.add(message); + combineSRHmList.put(fourStrKey, tmpList); + } + } + + /** + * 判断RTP主叫方向-测试 + * + * @param rtpLog RTP原始日志 + * @param voipLog 融合后VOIP日志 + * @return 方向 0:未知 1:c2s 2:s2c + */ + private static int judgeDirection(Object rtpLog, Object voipLog) { + + String ip = JsonParseUtil.getString(rtpLog, JsonProConfig.CLIENT_IP); + String sipOriginatorIp = JsonParseUtil.getString(voipLog, JsonProConfig.SIP_ORIGINATOR_IP); + String sipResponderIp = JsonParseUtil.getString(voipLog, JsonProConfig.SIP_RESPONDER_IP); + + if (StringUtil.isNotBlank(ip) && ip.equals(sipOriginatorIp)) { + return 1; + } else if (StringUtil.isNotBlank(ip) && ip.equals(sipResponderIp)) { + return 2; + } + + return 0; + } + + /** + * 发送不符合逻辑的日志到kafka + */ + private static void sendErrorLogToKafka(List<String> logList, Collector<String> output) { + if (logList.size() > 0) { + for (String log : logList) { + output.collect(log); + } + } + } + + /** + * + */ + private static String mergeJson(Object voipLog, Object rtpLog) { + + long rtpPayloadTypeC2s = JsonParseUtil.getLong(rtpLog, JsonProConfig.RTP_PAYLOAD_TYPE_C2S); + long rtpPayloadTypeS2c = JsonParseUtil.getLong(rtpLog, JsonProConfig.RTP_PAYLOAD_TYPE_S2C); + String rtpPcapPath = JsonParseUtil.getString(rtpLog, JsonProConfig.RTP_PCAP_PATH); + + JsonParseUtil.setValue(voipLog, JsonProConfig.RTP_PAYLOAD_TYPE_C2S, rtpPayloadTypeC2s); + JsonParseUtil.setValue(voipLog, JsonProConfig.RTP_PAYLOAD_TYPE_S2C, rtpPayloadTypeS2c); + JsonParseUtil.setValue(voipLog, JsonProConfig.RTP_PCAP_PATH, rtpPcapPath); + + return JSONObject.toJSONString(voipLog); + } + + +} diff --git a/src/main/java/com/zdjizhi/utils/http/HttpClientUtil.java b/src/main/java/com/zdjizhi/utils/http/HttpClientUtil.java new file mode 100644 index 0000000..1adb1d1 --- /dev/null +++ b/src/main/java/com/zdjizhi/utils/http/HttpClientUtil.java @@ -0,0 +1,77 @@ +package com.zdjizhi.utils.http; + +import cn.hutool.log.Log; +import cn.hutool.log.LogFactory; +import org.apache.commons.io.IOUtils; +import org.apache.http.HttpEntity; +import org.apache.http.client.methods.CloseableHttpResponse; +import org.apache.http.client.methods.HttpGet; +import org.apache.http.impl.client.CloseableHttpClient; +import org.apache.http.impl.client.HttpClients; + +import java.io.BufferedReader; +import java.io.IOException; +import java.io.InputStreamReader; + +/** + * 获取网关schema的工具类 + * + * @author qidaijie + */ +public class HttpClientUtil { + private static final Log logger = LogFactory.get(); + + /** + * 请求网关获取schema + * + * @param http 网关url + * @return schema + */ + public static String requestByGetMethod(String http) { + CloseableHttpClient httpClient = HttpClients.createDefault(); + StringBuilder entityStringBuilder; + + HttpGet get = new HttpGet(http); + BufferedReader bufferedReader = null; + CloseableHttpResponse httpResponse = null; + try { + httpResponse = httpClient.execute(get); + HttpEntity entity = httpResponse.getEntity(); + entityStringBuilder = new StringBuilder(); + if (null != entity) { + bufferedReader = new BufferedReader(new InputStreamReader(httpResponse.getEntity().getContent(), "UTF-8"), 8 * 1024); + int intC; + while ((intC = bufferedReader.read()) != -1) { + char c = (char) intC; + if (c == '\n') { + break; + } + entityStringBuilder.append(c); + } + + return entityStringBuilder.toString(); + } + } catch (IOException e) { + logger.error("Get Schema from Query engine ERROR! Exception message is:" + e); + } finally { + if (httpClient != null) { + try { + httpClient.close(); + } catch (IOException e) { + logger.error("Close HTTP Client ERROR! Exception messgae is:" + e); + } + } + if (httpResponse != null) { + try { + httpResponse.close(); + } catch (IOException e) { + logger.error("Close httpResponse ERROR! Exception messgae is:" + e); + } + } + if (bufferedReader != null) { + IOUtils.closeQuietly(bufferedReader); + } + } + return ""; + } +} diff --git a/src/main/java/com/zdjizhi/utils/ip/IPUtils.java b/src/main/java/com/zdjizhi/utils/ip/IPUtils.java new file mode 100644 index 0000000..1757194 --- /dev/null +++ b/src/main/java/com/zdjizhi/utils/ip/IPUtils.java @@ -0,0 +1,96 @@ +package com.zdjizhi.utils.ip; + +import cn.hutool.log.Log; +import cn.hutool.log.LogFactory; +import com.zdjizhi.utils.IPUtil; +import com.zdjizhi.utils.StringUtil; +import com.zdjizhi.utils.exception.VoipRelationException; + +/** + * IP转换工具类 + * + * @author Colbert + * @date 2021/03/16 + */ +public class IPUtils { + private static final Log logger = LogFactory.get(); + + private static final long A_BEGIN = ipToLong("10.0.0.0"); + private static final long A_END = ipToLong("10.255.255.255"); + private static final long B_BEGIN = ipToLong("172.16.0.0"); + private static final long B_END = ipToLong("172.31.255.255"); + private static final long C_BEGIN = ipToLong("192.168.0.0"); + private static final long C_END = ipToLong("192.168.255.255"); + + /** + * 将127.0.0.1形式的IP地址转换成十进制整数 + * + * @param strIp + * @return + */ + public static long ipToLong(String strIp) { + try { + if (StringUtil.isBlank(strIp)) { + logger.error("IPUtils.ipToLong input IP is null!!!"); + return 0L; + } + long[] ip = new long[4]; + int position1 = strIp.indexOf("."); + int position2 = strIp.indexOf(".", position1 + 1); + int position3 = strIp.indexOf(".", position2 + 1); + + ip[0] = Long.parseLong(strIp.substring(0, position1)); + ip[1] = Long.parseLong(strIp.substring(position1 + 1, position2)); + ip[2] = Long.parseLong(strIp.substring(position2 + 1, position3)); + ip[3] = Long.parseLong(strIp.substring(position3 + 1)); + return (ip[0] << 24) + (ip[1] << 16) + (ip[2] << 8) + ip[3]; + } catch (VoipRelationException e) { + logger.error("IPUtils.ipToLong input IP is:" + strIp + ",convert IP to Long is error:" + e.getMessage()); + return 0L; + } + + } + + /** + * 将十进制整数形式转换成127.0.0.1形式的ip地址 + * + * @param longIp + * @return + */ + public static String longToIp(long longIp) { + StringBuffer sb = new StringBuffer(""); + sb.append(String.valueOf((longIp >>> 24))); + sb.append("."); + sb.append(String.valueOf((longIp & 0x00FFFFFF) >>> 16)); + sb.append("."); + sb.append(String.valueOf((longIp & 0x0000FFFF) >>> 8)); + sb.append("."); + sb.append(String.valueOf((longIp & 0x000000FF))); + return sb.toString(); + } + + + /** + * 是否为内网IP + * + * @param ipAddress + * @return + */ + public static boolean isInnerIp(String ipAddress) { + if (StringUtil.isBlank(ipAddress) || IPUtil.internalIp(ipAddress)) { + //为空或者为特定IP时也算作内网IP + return true; + } + + boolean isInnerIp = false; + long ipNum = ipToLong(ipAddress); + + isInnerIp = isInner(ipNum, A_BEGIN, A_END) || isInner(ipNum, B_BEGIN, B_END) || isInner(ipNum, C_BEGIN, C_END); + return isInnerIp; + } + + private static boolean isInner(long userIp, long begin, long end) { + return (userIp >= begin) && (userIp <= end); + } + +}
\ No newline at end of file diff --git a/src/main/java/com/zdjizhi/utils/json/JsonParseUtil.java b/src/main/java/com/zdjizhi/utils/json/JsonParseUtil.java new file mode 100644 index 0000000..2a3194f --- /dev/null +++ b/src/main/java/com/zdjizhi/utils/json/JsonParseUtil.java @@ -0,0 +1,313 @@ +package com.zdjizhi.utils.json; + + +import cn.hutool.log.Log; +import cn.hutool.log.LogFactory; +import com.alibaba.fastjson.JSON; +import com.alibaba.fastjson.JSONArray; +import com.alibaba.fastjson.JSONObject; +import com.jayway.jsonpath.DocumentContext; +import com.jayway.jsonpath.JsonPath; +import com.zdjizhi.common.VoipRelationConfig; +import com.zdjizhi.utils.JsonMapper; +import com.zdjizhi.utils.http.HttpClientUtil; +import net.sf.cglib.beans.BeanGenerator; +import net.sf.cglib.beans.BeanMap; + +import java.util.*; + +/** + * 使用FastJson解析json的工具类 + * + * @author qidaijie + */ +public class JsonParseUtil { + + private static final Log logger = LogFactory.get(); + + /** + * 模式匹配,给定一个类型字符串返回一个类类型 + * + * @param type 类型 + * @return 类类型 + */ + + public static Class getClassName(String type) { + Class clazz; + + switch (type) { + case "int": + clazz = Integer.class; + break; + case "string": + clazz = String.class; + break; + case "long": + clazz = long.class; + break; + case "array": + clazz = List.class; + break; + case "double": + clazz = double.class; + break; + case "float": + clazz = float.class; + break; + case "char": + clazz = char.class; + break; + case "byte": + clazz = byte.class; + break; + case "boolean": + clazz = boolean.class; + break; + case "short": + clazz = short.class; + break; + default: + clazz = String.class; + } + return clazz; + } + + + /** + * 获取属性值的方法 + * + * @param obj 对象 + * @param property key + * @return 属性的值 + */ + public static Object getValue(Object obj, String property) { + + try { + BeanMap beanMap = BeanMap.create(obj); + if (beanMap.containsKey(property)) { + return beanMap.get(property); + } else { + return null; + } + } catch (RuntimeException e) { + logger.error("获取json-value异常,异常key:" + property + "异常信息为:" + e); + return null; + } + } + + /** + * 获取属性值的方法 + * + * @param jsonMap 原始日志 + * @param property key + * @return 属性的值 + */ + public static Object getValue(Map<String, Object> jsonMap, String property) { + try { + return jsonMap.getOrDefault(property, null); + } catch (RuntimeException e) { + logger.error("获取json-value异常,异常key:" + property + "异常信息为:" + e); + return null; + } + } + + /** + * long 类型检验转换方法,若为空返回基础值 + * + * @return Long value + */ + public static Long getLong(Map<String, Object> jsonMap, String property) { + Object value = jsonMap.getOrDefault(property, null); + Long longVal = TypeUtils.castToLong(value); + + if (longVal == null) { + return 0L; + } + + return longVal; + } + + /** + * long 类型检验转换方法,若为空返回基础值 + * + * @return Long value + */ + public static Long getLong(Object jsonMap, String property) { + Object value = getValue(jsonMap, property); + Long longVal = TypeUtils.castToLong(value); + + if (longVal == null) { + return 0L; + } + + return longVal; + } + + /** + * int 类型检验转换方法,若为空返回基础值 + * + * @return int value + */ + public static int getInteger(Map<String, Object> jsonMap, String property) { + Object value = jsonMap.getOrDefault(property, null); + Integer intVal = TypeUtils.castToInt(value); + + if (intVal == null) { + return 0; + } + return intVal.intValue(); + } + + /** + * int 类型检验转换方法,若为空返回基础值 + * + * @return int value + */ + public static int getInteger(Object jsonMap, String property) { + Object value = getValue(jsonMap, property); + Integer intVal = TypeUtils.castToInt(value); + + if (intVal == null) { + return 0; + } + return intVal.intValue(); + } + + public static String getString(Map<String, Object> jsonMap, String property) { + Object value = jsonMap.getOrDefault(property, null); + if (value == null) { + return null; + } + + if (value instanceof Map) { + return JsonMapper.toJsonString(value); + } + + if (value instanceof List) { + return JsonMapper.toJsonString(value); + } + + return value.toString(); + } + + /** + * Object 方式获取getString类型字段 + * + * @param jsonObject + * @param property + * @return + */ + public static String getString(Object jsonObject, String property) { + Object value = getValue(jsonObject, property); + if (value == null) { + return null; + } + + if (value instanceof Map) { + return JsonMapper.toJsonString(value); + } + + if (value instanceof List) { + return JsonMapper.toJsonString(value); + } + + return value.toString(); + } + + /** + * 更新属性值的方法 + * + * @param jsonMap 原始日志json map + * @param property 更新的key + * @param value 更新的值 + */ + public static void setValue(Map<String, Object> jsonMap, String property, Object value) { + try { + jsonMap.put(property, value); + } catch (RuntimeException e) { + logger.error("赋予实体类错误类型数据", e); + } + } + + /** + * 更新属性值的方法 + * + * @param obj 对象 + * @param property 更新的key + * @param value 更新的值 + */ + public static void setValue(Object obj, String property, Object value) { + try { + BeanMap beanMap = BeanMap.create(obj); + beanMap.put(property, value); + } catch (ClassCastException e) { + logger.error("赋予实体类错误类型数据", e); + } + } + + /** + * 根据反射生成对象的方法 + * + * @param properties 反射类用的map + * @return 生成的Object类型的对象 + */ + public static Object generateObject(Map properties) { + BeanGenerator generator = new BeanGenerator(); + Set keySet = properties.keySet(); + for (Object aKeySet : keySet) { + String key = (String) aKeySet; + generator.addProperty(key, (Class) properties.get(key)); + } + return generator.create(); + } + + /** + * 通过获取String类型的网关schema链接来获取map,用于生成一个Object类型的对象 + * + * @param http 网关schema地址 + * @return 用于反射生成schema类型的对象的一个map集合 + */ + public static HashMap<String, Class> getMapFromHttp(String http) { + HashMap<String, Class> map = new HashMap<>(); + + String schema = HttpClientUtil.requestByGetMethod(http); + Object data = JSON.parseObject(schema).get("data"); + + //获取fields,并转化为数组,数组的每个元素都是一个name doc type + JSONObject schemaJson = JSON.parseObject(data.toString()); + JSONArray fields = (JSONArray) schemaJson.get("fields"); + + for (Object field : fields) { + String filedStr = field.toString(); + if (checkKeepField(filedStr)) { + String name = JsonPath.read(filedStr, "$.name").toString(); + String type = JsonPath.read(filedStr, "$.type").toString(); + //组合用来生成实体类的map + map.put(name, getClassName(type)); + } + } + return map; + } + + /** + * 判断字段是否需要保留 + * + * @param message 单个field-json + * @return true or false + */ + private static boolean checkKeepField(String message) { + boolean isKeepField = true; + boolean isHiveDoc = JSON.parseObject(message).containsKey("doc"); + if (isHiveDoc) { + boolean isHiveVi = JsonPath.read(message, "$.doc").toString().contains("visibility"); + if (isHiveVi) { + String visibility = JsonPath.read(message, "$.doc.visibility").toString(); + if (VoipRelationConfig.VISIBILITY.equals(visibility)) { + isKeepField = false; + } + } + } + return isKeepField; + } + +}
\ No newline at end of file diff --git a/src/main/java/com/zdjizhi/utils/json/JsonTypeUtils.java b/src/main/java/com/zdjizhi/utils/json/JsonTypeUtils.java new file mode 100644 index 0000000..8562679 --- /dev/null +++ b/src/main/java/com/zdjizhi/utils/json/JsonTypeUtils.java @@ -0,0 +1,142 @@ +package com.zdjizhi.utils.json; + +import cn.hutool.log.Log; +import cn.hutool.log.LogFactory; +import com.zdjizhi.utils.JsonMapper; +import com.zdjizhi.utils.exception.VoipRelationException; + +import java.util.List; +import java.util.Map; + +/** + * @author qidaijie + * @Package PACKAGE_NAME + * @Description: + * @date 2021/7/1217:34 + */ +public class JsonTypeUtils { + private static final Log logger = LogFactory.get(); + /** + * String 类型检验转换方法 + * + * @param value json value + * @return String value + */ + public static String checkString(Object value) { + if (value == null) { + return null; + } + + if (value instanceof Map) { + return JsonMapper.toJsonString(value); + } + + if (value instanceof List) { + return JsonMapper.toJsonString(value); + } + + return value.toString(); + } + + /** + * array 类型检验转换方法 + * + * @param value json value + * @return List value + */ + private static Map checkObject(Object value) { + if (value == null) { + return null; + } + + if (value instanceof Map) { + return (Map) value; + } + + throw new VoipRelationException("can not cast to map, value : " + value); + } + + /** + * array 类型检验转换方法 + * + * @param value json value + * @return List value + */ + private static List checkArray(Object value) { + if (value == null) { + return null; + } + + if (value instanceof List) { + return (List) value; + } + + throw new VoipRelationException("can not cast to List, value : " + value); + } + + private static Long checkLong(Object value) { + if (value == null) { + return null; + } + + return TypeUtils.castToLong(value); + } + + /** + * long 类型检验转换方法,若为空返回基础值 + * + * @param value json value + * @return Long value + */ + public static long checkLongValue(Object value) { + + Long longVal = TypeUtils.castToLong(value); + + if (longVal == null) { + return 0L; + } + + return longVal; + } + + /** + * Double 类型校验转换方法 + * + * @param value json value + * @return Double value + */ + private static Double checkDouble(Object value) { + if (value == null) { + return null; + } + + return TypeUtils.castToDouble(value); + } + + + private static Integer checkInt(Object value) { + if (value == null) { + return null; + } + + return TypeUtils.castToInt(value); + } + + + /** + * int 类型检验转换方法,若为空返回基础值 + * + * @param value json value + * @return int value + */ + private static int getIntValue(Object value) { + + Integer intVal = TypeUtils.castToInt(value); + + if (intVal == null) { + return 0; + } + return intVal; + } + +} diff --git a/src/main/java/com/zdjizhi/utils/json/TypeUtils.java b/src/main/java/com/zdjizhi/utils/json/TypeUtils.java new file mode 100644 index 0000000..1e0f156 --- /dev/null +++ b/src/main/java/com/zdjizhi/utils/json/TypeUtils.java @@ -0,0 +1,180 @@ +package com.zdjizhi.utils.json; + +import cn.hutool.log.Log; +import cn.hutool.log.LogFactory; +import com.zdjizhi.common.VoipRelationConfig; +import com.zdjizhi.utils.StringUtil; +import com.zdjizhi.utils.exception.VoipRelationException; + + +/** + * @author qidaijie + * @Package PACKAGE_NAME + * @Description: + * @date 2021/7/1218:20 + */ +public class TypeUtils { + private static final Log logger = LogFactory.get(); + + /** + * Integer 类型判断方法 + * + * @param value json value + * @return Integer value or null + */ + public static Object castToIfFunction(Object value) { + if (value == null) { + return null; + } + + if (value instanceof String) { + return value.toString(); + } + + if (value instanceof Integer) { + return ((Number) value).intValue(); + } + + if (value instanceof Long) { + return ((Number) value).longValue(); + } + +// if (value instanceof Map) { +// return (Map) value; +// } +// +// if (value instanceof List) { +// return Collections.singletonList(value.toString()); +// } + + if (value instanceof Boolean) { + return (Boolean) value ? 1 : 0; + } + + throw new VoipRelationException("can not cast to int, value : " + value); + } + + /** + * Integer 类型判断方法 + * + * @param value json value + * @return Integer value or null + */ + static Integer castToInt(Object value) { + + if (value == null) { + return null; + } + + if (value instanceof Integer) { + return (Integer) value; + } + + //此判断数值超范围不抛出异常,会截取成对应类型数值 +// if (value instanceof Number) { +// return ((Number) value).intValue(); +// } + + if (value instanceof String) { + String strVal = (String) value; + if (StringUtil.isBlank(strVal)) { + return null; + } + + //将 10,20 类数据转换为10 + if (strVal.contains(VoipRelationConfig.FORMAT_SPLITTER)) { + strVal = strVal.split(VoipRelationConfig.FORMAT_SPLITTER)[0]; + } + + try { + return Integer.parseInt(strVal); + } catch (NumberFormatException ex) { + logger.error("String change Integer Error,The error Str is:" + strVal); + } + } + + if (value instanceof Boolean) { + return (Boolean) value ? 1 : 0; + } + + throw new VoipRelationException("can not cast to int, value : " + value); + } + + /** + * Double类型判断方法 + * + * @param value json value + * @return double value or null + */ + static Double castToDouble(Object value) { + + if (value instanceof Double) { + return (Double) value; + } + + //此判断数值超范围不抛出异常,会截取成对应类型数值 +// if (value instanceof Number) { +// return ((Number) value).doubleValue(); +// } + + if (value instanceof String) { + String strVal = (String) value; + + if (StringUtil.isBlank(strVal)) { + return null; + } + + //将 10,20 类数据转换为10 + if (strVal.contains(VoipRelationConfig.FORMAT_SPLITTER)) { + strVal = strVal.split(VoipRelationConfig.FORMAT_SPLITTER)[0]; + } + + try { + return Double.parseDouble(strVal); + } catch (NumberFormatException ex) { + logger.error("String change Double Error,The error Str is:" + strVal); + } + } + + throw new VoipRelationException("can not cast to double, value : " + value); + } + + /** + * Long类型判断方法 + * + * @param value json value + * @return (Long)value or null + */ + static Long castToLong(Object value) { + if (value == null) { + return null; + } + +// 此判断数值超范围不抛出异常,会截取成对应类型数值 + if (value instanceof Number) { + return ((Number) value).longValue(); + } + + if (value instanceof String) { + String strVal = (String) value; + + if (StringUtil.isBlank(strVal)) { + return null; + } + + //将 10,20 类数据转换为10 + if (strVal.contains(VoipRelationConfig.FORMAT_SPLITTER)) { + strVal = strVal.split(VoipRelationConfig.FORMAT_SPLITTER)[0]; + } + + try { + return Long.parseLong(strVal); + } catch (NumberFormatException ex) { + logger.error("String change Long Error,The error Str is:" + strVal); + } + } + + throw new VoipRelationException("can not cast to long, value : " + value); + } + +} diff --git a/src/main/java/com/zdjizhi/utils/kafka/CertUtils.java b/src/main/java/com/zdjizhi/utils/kafka/CertUtils.java new file mode 100644 index 0000000..83a39ad --- /dev/null +++ b/src/main/java/com/zdjizhi/utils/kafka/CertUtils.java @@ -0,0 +1,36 @@ +package com.zdjizhi.utils.kafka; + +import com.zdjizhi.common.VoipRelationConfig; +import org.apache.kafka.common.config.SslConfigs; + +import java.util.Properties; + +/** + * @author qidaijie + * @Package com.zdjizhi.utils.kafka + * @Description: + * @date 2021/9/610:37 + */ +class CertUtils { + static void chooseCert(String type, Properties properties) { + switch (type) { + case "SSL": + properties.put("security.protocol", "SSL"); + properties.put(SslConfigs.SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_CONFIG, ""); + properties.put("ssl.keystore.location", VoipRelationConfig.TOOLS_LIBRARY + "keystore.jks"); + properties.put("ssl.keystore.password", VoipRelationConfig.KAFKA_PIN); + properties.put("ssl.truststore.location", VoipRelationConfig.TOOLS_LIBRARY + "truststore.jks"); + properties.put("ssl.truststore.password", VoipRelationConfig.KAFKA_PIN); + properties.put("ssl.key.password", VoipRelationConfig.KAFKA_PIN); + break; + case "SASL": + properties.put("security.protocol", "SASL_PLAINTEXT"); + properties.put("sasl.mechanism", "PLAIN"); + properties.put("sasl.jaas.config", "org.apache.kafka.common.security.plain.PlainLoginModule required username=" + + VoipRelationConfig.KAFKA_USER + " password=" + VoipRelationConfig.KAFKA_PIN + ";"); + break; + default: + } + + } +} diff --git a/src/main/java/com/zdjizhi/utils/kafka/Consumer.java b/src/main/java/com/zdjizhi/utils/kafka/Consumer.java new file mode 100644 index 0000000..6dde52a --- /dev/null +++ b/src/main/java/com/zdjizhi/utils/kafka/Consumer.java @@ -0,0 +1,40 @@ +package com.zdjizhi.utils.kafka; + +import com.zdjizhi.common.VoipRelationConfig; +import org.apache.flink.api.common.serialization.SimpleStringSchema; +import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer; +import org.apache.kafka.clients.consumer.ConsumerConfig; + +import java.util.Properties; + +/** + * @author qidaijie + * @Package com.zdjizhi.utils.kafka + * @Description: + * @date 2021/6/813:54 + */ +public class Consumer { + private static Properties createConsumerConfig() { + Properties properties = new Properties(); + properties.put("bootstrap.servers", VoipRelationConfig.INPUT_KAFKA_SERVERS); + properties.put("group.id", VoipRelationConfig.GROUP_ID); + properties.put("session.timeout.ms", "60000"); + properties.put("max.poll.records", 3000); + properties.put("max.partition.fetch.bytes", 31457280); + properties.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); + properties.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); + + CertUtils.chooseCert(VoipRelationConfig.KAFKA_SOURCE_PROTOCOL,properties); + return properties; + } + + public static FlinkKafkaConsumer<String> getKafkaConsumer() { + FlinkKafkaConsumer<String> kafkaConsumer = new FlinkKafkaConsumer<>(VoipRelationConfig.INPUT_KAFKA_TOPIC, + new SimpleStringSchema(), createConsumerConfig()); + + kafkaConsumer.setCommitOffsetsOnCheckpoints(true); + kafkaConsumer.setStartFromGroupOffsets(); + + return kafkaConsumer; + } +} diff --git a/src/main/java/com/zdjizhi/utils/kafka/Producer.java b/src/main/java/com/zdjizhi/utils/kafka/Producer.java new file mode 100644 index 0000000..bf63098 --- /dev/null +++ b/src/main/java/com/zdjizhi/utils/kafka/Producer.java @@ -0,0 +1,50 @@ +package com.zdjizhi.utils.kafka; + +import com.zdjizhi.common.VoipRelationConfig; +import org.apache.flink.api.common.serialization.SimpleStringSchema; +import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer; + +import java.util.Optional; +import java.util.Properties; + +/** + * @author qidaijie + * @Package com.zdjizhi.utils.kafka + * @Description: + * @date 2021/6/814:04 + */ +public class Producer { + + private static Properties createProducerConfig() { + Properties properties = new Properties(); + properties.put("bootstrap.servers", VoipRelationConfig.OUTPUT_KAFKA_SERVERS); + properties.put("acks", VoipRelationConfig.PRODUCER_ACK); + properties.put("retries", VoipRelationConfig.RETRIES); + properties.put("linger.ms", VoipRelationConfig.LINGER_MS); + properties.put("request.timeout.ms", VoipRelationConfig.REQUEST_TIMEOUT_MS); + properties.put("batch.size", VoipRelationConfig.BATCH_SIZE); + properties.put("buffer.memory", VoipRelationConfig.BUFFER_MEMORY); + properties.put("max.request.size", VoipRelationConfig.MAX_REQUEST_SIZE); + properties.put("compression.type", VoipRelationConfig.PRODUCER_KAFKA_COMPRESSION_TYPE); + + CertUtils.chooseCert(VoipRelationConfig.KAFKA_SINK_PROTOCOL, properties); + + return properties; + } + + + public static FlinkKafkaProducer<String> getKafkaProducer() { + FlinkKafkaProducer<String> kafkaProducer = new FlinkKafkaProducer<String>( + VoipRelationConfig.OUTPUT_KAFKA_TOPIC, + new SimpleStringSchema(), + createProducerConfig(), Optional.empty()); + + //启用此选项将使生产者仅记录失败日志而不是捕获和重新抛出它们 + kafkaProducer.setLogFailuresOnly(false); + + //写入kafka的消息携带时间戳 +// kafkaProducer.setWriteTimestampToKafka(true); + + return kafkaProducer; + } +} diff --git a/src/main/java/com/zdjizhi/utils/system/VoipRelationConfigurations.java b/src/main/java/com/zdjizhi/utils/system/VoipRelationConfigurations.java new file mode 100644 index 0000000..95e322b --- /dev/null +++ b/src/main/java/com/zdjizhi/utils/system/VoipRelationConfigurations.java @@ -0,0 +1,71 @@ +package com.zdjizhi.utils.system; + +import com.zdjizhi.utils.StringUtil; +import com.zdjizhi.utils.exception.VoipRelationException; + +import java.io.IOException; +import java.util.Locale; +import java.util.Properties; + + +/** + * @author Administrator + */ + +public final class VoipRelationConfigurations { + + private static Properties propDefault = new Properties(); + private static Properties propService = new Properties(); + + + public static String getStringProperty(Integer type, String key) { + if (type == 0) { + return propService.getProperty(key); + } else if (type == 1) { + return propDefault.getProperty(key); + } else { + return null; + } + + } + + public static Integer getIntProperty(Integer type, String key) { + if (type == 0) { + return Integer.parseInt(propService.getProperty(key)); + } else if (type == 1) { + return Integer.parseInt(propDefault.getProperty(key)); + } else { + return null; + } + } + + public static Long getLongProperty(Integer type, String key) { + if (type == 0) { + return Long.parseLong(propService.getProperty(key)); + } else if (type == 1) { + return Long.parseLong(propDefault.getProperty(key)); + } else { + return null; + } + } + + public static Boolean getBooleanProperty(Integer type, String key) { + if (type == 0) { + return StringUtil.equals(propService.getProperty(key).toLowerCase().trim().toUpperCase(Locale.ENGLISH), "true"); + } else if (type == 1) { + return StringUtil.equals(propDefault.getProperty(key).toLowerCase().trim().toUpperCase(Locale.ENGLISH), "true"); + } else { + return null; + } + } + + static { + try { + propService.load(VoipRelationConfigurations.class.getClassLoader().getResourceAsStream("service_flow_config.properties")); + propDefault.load(VoipRelationConfigurations.class.getClassLoader().getResourceAsStream("default_config.properties")); + } catch (IOException | VoipRelationException e) { + propDefault = null; + propService = null; + } + } +} diff --git a/src/main/java/log4j.properties b/src/main/java/log4j.properties new file mode 100644 index 0000000..9d91936 --- /dev/null +++ b/src/main/java/log4j.properties @@ -0,0 +1,25 @@ +#Log4j +log4j.rootLogger=info,console,file +# 控制台日志设置 +log4j.appender.console=org.apache.log4j.ConsoleAppender +log4j.appender.console.Threshold=info +log4j.appender.console.layout=org.apache.log4j.PatternLayout +log4j.appender.console.layout.ConversionPattern=[%d{yyyy-MM-dd HH\:mm\:ss}] [%-5p] [Thread\:%t] %l %x - <%m>%n + +# 文件日志设置 +log4j.appender.file=org.apache.log4j.DailyRollingFileAppender +log4j.appender.file.Threshold=info +log4j.appender.file.encoding=UTF-8 +log4j.appender.file.Append=true +#路径请用相对路径,做好相关测试输出到应用目下 +log4j.appender.file.file=${nis.root}/log/galaxy-name.log +log4j.appender.file.DatePattern='.'yyyy-MM-dd +log4j.appender.file.layout=org.apache.log4j.PatternLayout +#log4j.appender.file.layout.ConversionPattern=%d{HH:mm:ss} %X{ip} [%t] %5p %c{1} %m%n +log4j.appender.file.layout.ConversionPattern=[%d{yyyy-MM-dd HH\:mm\:ss}] [%-5p] %X{ip} [Thread\:%t] %l %x - %m%n +#MyBatis 配置,com.nis.web.dao是mybatis接口所在包 +log4j.logger.com.nis.web.dao=debug +#bonecp数据源配置 +log4j.category.com.jolbox=debug,console + + |
