summaryrefslogtreecommitdiff
path: root/src/main/java/cn/ac/iie/bolt
diff options
context:
space:
mode:
Diffstat (limited to 'src/main/java/cn/ac/iie/bolt')
-rw-r--r--src/main/java/cn/ac/iie/bolt/FromSpoutBufferBoltDC.java40
-rw-r--r--src/main/java/cn/ac/iie/bolt/GetSipOriBoltDC.java1027
-rw-r--r--src/main/java/cn/ac/iie/bolt/SipInsertBoltDC.java136
-rw-r--r--src/main/java/cn/ac/iie/bolt/SipRealTimeCountBoltDC.java182
-rw-r--r--src/main/java/cn/ac/iie/bolt/SipRealTimeMergeBoltDC.java604
5 files changed, 1989 insertions, 0 deletions
diff --git a/src/main/java/cn/ac/iie/bolt/FromSpoutBufferBoltDC.java b/src/main/java/cn/ac/iie/bolt/FromSpoutBufferBoltDC.java
new file mode 100644
index 0000000..4b82597
--- /dev/null
+++ b/src/main/java/cn/ac/iie/bolt/FromSpoutBufferBoltDC.java
@@ -0,0 +1,40 @@
+package cn.ac.iie.bolt;
+
+
+import org.apache.log4j.Logger;
+import org.apache.storm.task.TopologyContext;
+import org.apache.storm.topology.BasicOutputCollector;
+import org.apache.storm.topology.OutputFieldsDeclarer;
+import org.apache.storm.topology.base.BaseBasicBolt;
+import org.apache.storm.tuple.Fields;
+import org.apache.storm.tuple.Tuple;
+import org.apache.storm.tuple.Values;
+
+import java.util.Map;
+
+public class FromSpoutBufferBoltDC extends BaseBasicBolt {
+ private static final long serialVersionUID = -106783017834081712L;
+
+ private static Logger logger = Logger.getLogger(FromSpoutBufferBoltDC.class);
+
+ @Override
+ public void prepare(Map stormConf, TopologyContext context) {
+ }
+
+ @Override
+ public void execute(Tuple tuple, BasicOutputCollector collector) {
+ try {
+ collector.emit(new Values(tuple.getString(0)));
+ } catch (Exception e) {
+ logger.error("FromSpoutBufferBoltDC Get Log is error --->" + e);
+ e.printStackTrace();
+ }
+
+ }
+
+ @Override
+ public void declareOutputFields(OutputFieldsDeclarer declarer) {
+ declarer.declare(new Fields("json"));
+ }
+
+}
diff --git a/src/main/java/cn/ac/iie/bolt/GetSipOriBoltDC.java b/src/main/java/cn/ac/iie/bolt/GetSipOriBoltDC.java
new file mode 100644
index 0000000..7e7202b
--- /dev/null
+++ b/src/main/java/cn/ac/iie/bolt/GetSipOriBoltDC.java
@@ -0,0 +1,1027 @@
+package cn.ac.iie.bolt;
+
+import cn.ac.iie.bean.voipSipFromToLog.RouteRelationLog;
+import cn.ac.iie.bean.voipSipOrigin.SipOriginALL;
+import cn.ac.iie.common.CommonService;
+import cn.ac.iie.common.HashTableConfig;
+import cn.ac.iie.common.RealtimeCountConfig;
+import cn.ac.iie.dao.DataBaseLoad;
+import cn.ac.iie.dao.KafkaDB;
+import cn.ac.iie.utils.IPIPLibrary.Ipip;
+import cn.ac.iie.utils.TupleUtils;
+import com.alibaba.fastjson.JSONObject;
+import com.zdjizhi.utils.StringUtil;
+import org.apache.log4j.Logger;
+import org.apache.storm.task.TopologyContext;
+import org.apache.storm.topology.BasicOutputCollector;
+import org.apache.storm.topology.OutputFieldsDeclarer;
+import org.apache.storm.topology.base.BaseBasicBolt;
+import org.apache.storm.tuple.Fields;
+import org.apache.storm.tuple.Tuple;
+import org.apache.storm.tuple.Values;
+
+import java.util.*;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+public class GetSipOriBoltDC extends BaseBasicBolt {
+
+ private static final long serialVersionUID = -5702741658721325473L;
+
+ private static Logger logger = Logger.getLogger(GetSipOriBoltDC.class);
+
+ private static Ipip ipIpLook = new Ipip();
+
+ private Long SipOriginAllCountSumMi = 0L;
+
+ @SuppressWarnings("rawtypes")
+ @Override
+ public void prepare(Map stormConf, TopologyContext context) {
+ RealtimeCountConfig.configShow();
+ //载入ipip库
+ ipIpLook.load(RealtimeCountConfig.IPIP_LIBRARY);
+ }
+
+ @Override
+ public void execute(Tuple tuple, BasicOutputCollector collector) {
+ if (TupleUtils.isTick(tuple)) {
+ logger.warn("Num of SipOriginALL Count(last one minutes): " + SipOriginAllCountSumMi);
+ SipOriginAllCountSumMi = 0L;
+ } else {
+ try {
+ String message = tuple.getString(0);
+ if (StringUtil.isNotBlank(message)) {
+ SipOriginALL sipOriginLogDisable = JSONObject.parseObject(message, SipOriginALL.class);
+ SipOriginALL sipOriginLog = completeAbnorKeyForSipOriLog(sipOriginLogDisable, message);
+ SipOriginAllCountSumMi++;
+ //填充voip日志并发送
+ completeSipOriginAllLogAndemit(sipOriginLog, collector);
+
+ //填充voip路由关系表并发送
+// completeRouteRelationLogAndemit(sipOriginLog, message, collector);
+ }
+ } catch (Exception e) {
+ if (RealtimeCountConfig.ALL_LOG_OUTPUT_CONTROLLER.equals("yes")) {
+ logger.error("GetSipOriBoltDC get " + RealtimeCountConfig.KAFKA_SIP_ORIGIN_TOPIC + " Data is error!!! --> {" + e + "} <--");
+ logger.error("GetSipOriBoltDC the " + RealtimeCountConfig.KAFKA_SIP_ORIGIN_TOPIC + " Data is --> {" + tuple.getString(0) + "} <--");
+ e.printStackTrace();
+ } else {
+ logger.error("GetSipOriBoltDC get " + RealtimeCountConfig.KAFKA_SIP_ORIGIN_TOPIC + " Data is error!!! --> {" + e + "} <--");
+ e.printStackTrace();
+ }
+ }
+ }
+ }
+
+ private SipOriginALL completeAbnorKeyForSipOriLog(SipOriginALL sipOriginLogDisable, String message) {
+ Map mapWithLedge = JSONObject.parseObject(message, Map.class);
+ sipOriginLogDisable.setCall_ID((String) (mapWithLedge.get("Call-ID")));
+ sipOriginLogDisable.setUser_Agent((String) (mapWithLedge.get("User-Agent")));
+ sipOriginLogDisable.setMax_Forwards((String) (mapWithLedge.get("Max-Forwards")));
+ sipOriginLogDisable.setReq_Record_Route((String[]) (mapWithLedge.get("Req_Record-Route")));
+ sipOriginLogDisable.setReq_Content_Type((String) (mapWithLedge.get("Req_Content-Type")));
+ sipOriginLogDisable.setRes_Record_Route((String[]) (mapWithLedge.get("Res_Record-Route")));
+ sipOriginLogDisable.setRes_Content_Type((String) (mapWithLedge.get("Res_Content-Type")));
+
+ //将字符串数组字段转化为json字段便于入库
+ sipOriginLogDisable.setReq_Via_Json(JSONObject.toJSONString(sipOriginLogDisable.getReq_Via()));
+ sipOriginLogDisable.setReq_Record_Route_Json(JSONObject.toJSONString(sipOriginLogDisable.getReq_Record_Route()));
+ sipOriginLogDisable.setReq_Route_Json(JSONObject.toJSONString(sipOriginLogDisable.getReq_Route()));
+ sipOriginLogDisable.setRes_Via_Json(JSONObject.toJSONString(sipOriginLogDisable.getRes_Via()));
+ sipOriginLogDisable.setRes_Record_Route_Json(JSONObject.toJSONString(sipOriginLogDisable.getRes_Record_Route()));
+ sipOriginLogDisable.setRes_Route_Json(JSONObject.toJSONString(sipOriginLogDisable.getRes_Route()));
+
+ return sipOriginLogDisable;
+ }
+
+ //填充SIP_ORIGIN_ALL日志并发送
+ private void completeSipOriginAllLogAndemit(SipOriginALL sipOriginLog, BasicOutputCollector collector) {
+ //获取SRC_IP相关定位
+ if (StringUtil.isNotBlank(sipOriginLog.getSRC_IP())
+ && sipOriginLog.getSRC_IP() != null
+ && sipOriginLog.getSRC_IP().length() != 0
+ && !(sipOriginLog.getSRC_IP().contains(":"))) {
+ String[] src_IpLocation = ipIpLook.find(sipOriginLog.getSRC_IP());
+ String src_Ip_nation = src_IpLocation[0];
+ String src_Ip_region = src_IpLocation[1];
+ sipOriginLog.setSRC_LOCATION_NATION(src_Ip_nation);
+ sipOriginLog.setSRC_LOCATION_REGION(src_Ip_region);
+
+ if (sipOriginLog.getSRC_LOCATION_REGION().equals("香港")
+ || sipOriginLog.getSRC_LOCATION_REGION().equals("台湾")
+ || sipOriginLog.getSRC_LOCATION_REGION().equals("澳门")) {
+ sipOriginLog.setSRC_LOCATION_NATION(sipOriginLog.getSRC_LOCATION_REGION());
+ }
+
+ //设置SRC_IP国家码
+ String src_Ip_nation_code = HashTableConfig.ISO_3166_1_ALPHA_2.get(sipOriginLog.getSRC_LOCATION_NATION());
+ if (StringUtil.isNotBlank(src_Ip_nation_code)
+ && src_Ip_nation_code != null
+ && src_Ip_nation_code.length() != 0) {
+ sipOriginLog.setSRC_LOCATION_NATION_CODE(src_Ip_nation_code);
+ }
+ }
+
+ //获取DST_IP相关定位
+ if (StringUtil.isNotBlank(sipOriginLog.getDST_IP())
+ && sipOriginLog.getDST_IP() != null
+ && sipOriginLog.getDST_IP().length() != 0
+ && !(sipOriginLog.getDST_IP().contains(":"))) {
+ String[] dst_IpLocation = ipIpLook.find(sipOriginLog.getDST_IP());
+ String dst_Ip_nation = dst_IpLocation[0];
+ String dst_Ip_region = dst_IpLocation[1];
+ sipOriginLog.setDST_LOCATION_NATION(dst_Ip_nation);
+ sipOriginLog.setDST_LOCATION_REGION(dst_Ip_region);
+
+ if (sipOriginLog.getDST_LOCATION_REGION().equals("香港")
+ || sipOriginLog.getDST_LOCATION_REGION().equals("台湾")
+ || sipOriginLog.getDST_LOCATION_REGION().equals("澳门")) {
+ sipOriginLog.setDST_LOCATION_NATION(sipOriginLog.getDST_LOCATION_REGION());
+ }
+
+ //设置DST_IP国家码
+ String dst_Ip_nation_code = HashTableConfig.ISO_3166_1_ALPHA_2.get(sipOriginLog.getDST_LOCATION_NATION());
+ if (StringUtil.isNotBlank(dst_Ip_nation_code)
+ && dst_Ip_nation_code != null
+ && dst_Ip_nation_code.length() != 0) {
+ sipOriginLog.setDST_LOCATION_NATION_CODE(dst_Ip_nation_code);
+ }
+ }
+
+ //设置IP_TYPE
+ if (sipOriginLog.getDST_IP().contains(":") && sipOriginLog.getDST_IP().split(":").length > 2) {
+ sipOriginLog.setIP_TYPE("6");
+ } else {
+ sipOriginLog.setIP_TYPE("4");
+ }
+
+ //处理Request_URI
+ if (StringUtil.isNotBlank(sipOriginLog.getRequest_URI())
+ && sipOriginLog.getRequest_URI() != null
+ && sipOriginLog.getRequest_URI().length() != 0) {
+
+ if (!(sipOriginLog.getRequest_URI().contains("sip:"))) {
+ if (sipOriginLog.getRequest_URI().contains("@")) {
+ String[] splitAlte = sipOriginLog.getRequest_URI().split("@");
+ String userName = splitAlte[0];
+ String ServiceDomain = splitAlte[1].split(";")[0];
+ sipOriginLog.setUser_name(userName);
+ sipOriginLog.setService_domain(ServiceDomain);
+
+ if (StringUtil.isNotBlank(sipOriginLog.getService_domain())
+ && sipOriginLog.getService_domain() != null
+ && sipOriginLog.getService_domain().length() != 0) {
+
+ if (sipOriginLog.getService_domain().contains(":")) {
+ if (sipOriginLog.getService_domain().split(":").length <= 2) {
+ if (isIp(sipOriginLog.getService_domain().split(":")[0])) {//0索引为ip或者url
+ String[] splitColon = sipOriginLog.getService_domain().split(":");
+ String service_domain_v4_ip = splitColon[0];
+ String service_domain_v4_port = splitColon[1];
+ boolean judgeInnerIP = isInnerIP(service_domain_v4_ip);
+ if (judgeInnerIP) {
+ String newServiceDomain = sipOriginLog.getDST_IP() + ":" + service_domain_v4_port;
+ sipOriginLog.setService_domain(newServiceDomain);
+ sipOriginLog.setService_domain_valid("1");
+ } else {
+ //外网ip
+ sipOriginLog.setService_domain_valid("0");
+ }
+ } else {
+ //url:端口号,暂时不做操作
+ }
+ } else {
+ //v6,暂时不做操作
+ }
+ } else {
+ boolean ipOrurl = isIp(sipOriginLog.getService_domain());
+ if (ipOrurl) {
+ String ipv4_serviceDomain = sipOriginLog.getService_domain();
+ boolean judgeInnerIP = isInnerIP(ipv4_serviceDomain);
+ if (judgeInnerIP) {
+ String newServiceDomain = sipOriginLog.getDST_IP();
+ sipOriginLog.setService_domain(newServiceDomain);
+ sipOriginLog.setService_domain_valid("1");
+ } else {
+ //外网ip
+ sipOriginLog.setService_domain_valid("0");
+ }
+ } else {
+ //url网址,暂不处理
+ }
+ }
+ }
+ } else if (sipOriginLog.getRequest_URI().contains(".") || sipOriginLog.getRequest_URI().contains(":")) {
+ String ServiceDomain = sipOriginLog.getRequest_URI();
+ sipOriginLog.setService_domain(ServiceDomain);
+
+ if (StringUtil.isNotBlank(sipOriginLog.getService_domain())
+ && sipOriginLog.getService_domain() != null
+ && sipOriginLog.getService_domain().length() != 0) {
+
+ if (sipOriginLog.getService_domain().contains(":")) {
+ if (sipOriginLog.getService_domain().split(":").length <= 2) {
+ if (isIp(sipOriginLog.getService_domain().split(":")[0])) {
+ String[] splitColon = sipOriginLog.getService_domain().split(":");
+ String service_domain_v4_ip = splitColon[0];
+ String service_domain_v4_port = splitColon[1];
+ boolean judgeInnerIP = isInnerIP(service_domain_v4_ip);
+ if (judgeInnerIP) {
+ String newServiceDomain = sipOriginLog.getDST_IP() + ":" + service_domain_v4_port;
+ sipOriginLog.setService_domain(newServiceDomain);
+ sipOriginLog.setService_domain_valid("1");
+ } else {
+ //外网ip
+ sipOriginLog.setService_domain_valid("0");
+ }
+ } else {
+ //url:端口号,暂时不做操作
+ }
+ } else {
+ //v6,暂时不做操作
+ }
+ } else {
+ boolean ipOrurl = isIp(sipOriginLog.getService_domain());
+ if (ipOrurl) {
+ String ipv4_serviceDomain = sipOriginLog.getService_domain();
+ boolean judgeInnerIP = isInnerIP(ipv4_serviceDomain);
+ if (judgeInnerIP) {
+ String newServiceDomain = sipOriginLog.getDST_IP();
+ sipOriginLog.setService_domain(newServiceDomain);
+ sipOriginLog.setService_domain_valid("1");
+ } else {
+ //外网ip
+ sipOriginLog.setService_domain_valid("0");
+ }
+ } else {
+ //url网址,暂不处理
+ }
+ }
+ }
+ } else if (!(sipOriginLog.getRequest_URI().contains(".")) && !(sipOriginLog.getRequest_URI().contains(":"))) {
+ //即是一个字符串或者数字串,没有具体价值,不做处理
+ } else {
+ if (RealtimeCountConfig.PART_LOG_OUTPUT_CONTROLLER.equals("yes")) {
+ logger.error("GetSipOriBoltDC--> " + RealtimeCountConfig.KAFKA_SIP_ORIGIN_TOPIC + " Request_URI no @ and is not ip , Request_URI is ---> " + sipOriginLog.getRequest_URI() + " <---");
+ }
+ }
+ } else {
+ //此处包含sip:的情况已经在前端代码中去除,故这里不做处理
+ }
+ }
+
+ //处理Res_stat_format
+ if (StringUtil.isNotBlank(sipOriginLog.getRes_stat())
+ && sipOriginLog.getRes_stat() != null
+ && sipOriginLog.getRes_stat().length() != 0) {
+
+ String replaceBlank = sipOriginLog.getRes_stat().toLowerCase().trim().replace(" ", "");//去除空格
+ //修整Res_stat_format字段异常格式
+ replaceBlank = repairResStatFormatStr(replaceBlank);
+ sipOriginLog.setRes_stat_format(replaceBlank);
+ }
+
+
+ //处理From
+ if (StringUtil.isNotBlank(sipOriginLog.getFrom())
+ && sipOriginLog.getFrom() != null
+ && sipOriginLog.getFrom().length() != 0) {
+ String useFromStr = sipOriginLog.getFrom().replace(" ", "").replace(",", "");
+
+ if (sipOriginLog.getFrom().contains("\"")) {
+ String nick_name = getNickname(useFromStr, "from");
+ sipOriginLog.setFrom_Nickname(nick_name);
+ } else {
+ //From字段不包含"",此时获取不到用户昵称,但能获取用户名,服务域名,下方统一获取
+ }
+ String user_name = getUserName(useFromStr, "from");
+ sipOriginLog.setFrom_usr_name(user_name);
+ String serDomainAndValid = getSerDomain(useFromStr, sipOriginLog, "from");
+ if (serDomainAndValid.contains("##%##")) {
+ String[] splitSpliter = serDomainAndValid.split("##%##");
+ String serDomainOrNull = splitSpliter[0];
+ String validStr = splitSpliter[1];
+ if ((!(serDomainOrNull.equals("null")))) {
+ sipOriginLog.setFrom_ser_domain(serDomainOrNull);
+ } else {
+ sipOriginLog.setFrom_ser_domain(null);
+ }
+ sipOriginLog.setFrom_ser_domain_valid(validStr);
+ }
+ }
+
+ //处理To
+ if (StringUtil.isNotBlank(sipOriginLog.getTo())
+ && sipOriginLog.getTo() != null
+ && sipOriginLog.getTo().length() != 0) {
+ String useToStr = sipOriginLog.getTo().replace(" ", "").replace(",", "");
+
+ if (sipOriginLog.getTo().contains("\"")) {
+ String nick_name = getNickname(useToStr, "to");
+ sipOriginLog.setTo_Nickname(nick_name);
+ } else {
+ //To字段不包含"",此时获取不到用户昵称,但能获取用户名,服务域名,下方统一获取
+ }
+ String user_name = getUserName(useToStr, "to");
+ sipOriginLog.setTo_usr_name(user_name);
+ String serDomainAndValid = getSerDomain(useToStr, sipOriginLog, "to");
+ if (serDomainAndValid.contains("##%##")) {
+ String[] splitSpliter = serDomainAndValid.split("##%##");
+ String serDomainOrNull = splitSpliter[0];
+ String validStr = splitSpliter[1];
+ if ((!(serDomainOrNull.equals("null")))) {
+ //补丁,针对性修改To_ser_domain格式
+ serDomainOrNull = pointSerDomainPatch(serDomainOrNull);
+ //设置To_ser_domain
+ sipOriginLog.setTo_ser_domain(serDomainOrNull);
+ } else {
+ sipOriginLog.setTo_ser_domain(null);
+ }
+ sipOriginLog.setTo_ser_domain_valid(validStr);
+ }
+ }
+
+ //获取Cseq_method
+ if (StringUtil.isNotBlank(sipOriginLog.getCseq())
+ && sipOriginLog.getCseq() != null
+ && sipOriginLog.getCseq().length() != 0) {
+ String[] splitCseq = sipOriginLog.getCseq().split(" ");
+ if (splitCseq.length == 2) {
+ sipOriginLog.setCseq_method(splitCseq[1]);
+ }
+ }
+
+ //处理Req_Contact
+ if (StringUtil.isNotBlank(sipOriginLog.getReq_Contact())
+ && sipOriginLog.getReq_Contact() != null
+ && sipOriginLog.getReq_Contact().length() != 0) {
+ String useReqConStr = sipOriginLog.getReq_Contact().replace(" ", "").replace(",", "");
+
+ if (sipOriginLog.getReq_Contact().contains("\"")) {
+ String nick_name = getNickname(useReqConStr, "req");
+ sipOriginLog.setReq_Contact_Nickname(nick_name);
+ } else {
+ //字段不包含"",此时获取不到用户昵称,但能获取用户名,服务域名,下方统一获取
+ }
+ String user_name = getUserName(useReqConStr, "req");
+ sipOriginLog.setReq_Contact_usr_name(user_name);
+ String serDomainAndValid = getSerDomain(useReqConStr, sipOriginLog, "req");
+ if (serDomainAndValid.contains("##%##")) {
+ String[] splitSpliter = serDomainAndValid.split("##%##");
+ String serDomainOrNull = splitSpliter[0];
+ String validStr = splitSpliter[1];
+ if ((!(serDomainOrNull.equals("null")))) {
+ sipOriginLog.setReq_Contact_ser_domain(serDomainOrNull);
+ } else {
+ sipOriginLog.setReq_Contact_ser_domain(null);
+ }
+ sipOriginLog.setReq_ser_domain_valid(validStr);
+ }
+ }
+
+ //处理Res_Contact
+ if (StringUtil.isNotBlank(sipOriginLog.getRes_Contact())
+ && sipOriginLog.getRes_Contact() != null
+ && sipOriginLog.getRes_Contact().length() != 0) {
+ String useResConStr = sipOriginLog.getRes_Contact().replace(" ", "").replace(",", "");
+
+ if (sipOriginLog.getRes_Contact().contains("\"")) {
+ String nick_name = getNickname(useResConStr, "res");
+ sipOriginLog.setRes_Contact_Nickname(nick_name);
+ } else {
+ //字段不包含"",此时获取不到用户昵称,但能获取用户名,服务域名,下方统一获取
+ }
+ String user_name = getUserName(useResConStr, "res");
+ sipOriginLog.setRes_Contact_usr_name(user_name);
+ String serDomainAndValid = getSerDomain(useResConStr, sipOriginLog, "res");
+ if (serDomainAndValid.contains("##%##")) {
+ String[] splitSpliter = serDomainAndValid.split("##%##");
+ String serDomainOrNull = splitSpliter[0];
+ String validStr = splitSpliter[1];
+ if ((!(serDomainOrNull.equals("null")))) {
+ sipOriginLog.setRes_Contact_ser_domain(serDomainOrNull);
+ } else {
+ sipOriginLog.setRes_Contact_ser_domain(null);
+ }
+ sipOriginLog.setRes_ser_domain_valid(validStr);
+ }
+ }
+
+ //根据Req_Content_Type设置Req_coding(主叫编码方式)
+ if (StringUtil.isNotBlank(sipOriginLog.getReq_Content_Type())
+ && sipOriginLog.getReq_Content_Type() != null
+ && sipOriginLog.getReq_Content_Type().length() != 0) {
+ if (sipOriginLog.getReq_Content_Type().equals("application/sdp")) {
+ if (StringUtil.isNotBlank(sipOriginLog.getReq_Content())
+ && sipOriginLog.getReq_Content() != null
+ && sipOriginLog.getReq_Content().length() != 0) {
+ String codingS = getCodingS(sipOriginLog.getReq_Content());
+ if (StringUtil.isNotBlank(codingS)) {
+ sipOriginLog.setReq_coding(codingS);
+ }
+ }
+ }
+ }
+
+ //根据Res_Content_Type设置Res_coding(被叫编码方式)
+ if (StringUtil.isNotBlank(sipOriginLog.getRes_Content_Type())
+ && sipOriginLog.getRes_Content_Type() != null
+ && sipOriginLog.getRes_Content_Type().length() != 0) {
+ if (sipOriginLog.getRes_Content_Type().equals("application/sdp")) {
+ if (StringUtil.isNotBlank(sipOriginLog.getRes_Content())
+ && sipOriginLog.getRes_Content() != null
+ && sipOriginLog.getRes_Content().length() != 0) {
+ String codingS = getCodingS(sipOriginLog.getRes_Content());
+ if (StringUtil.isNotBlank(codingS)) {
+ sipOriginLog.setRes_coding(codingS);
+ }
+ }
+ }
+ }
+
+ collector.emit(new Values(JSONObject.toJSONString(sipOriginLog), "origin"));
+ }
+
+ //填充voip路由关系表并发送
+ private void completeRouteRelationLogAndemit(SipOriginALL sipOriginLog, String message, BasicOutputCollector collector) {
+ RouteRelationLog routeRelationLog = new RouteRelationLog();
+ LinkedList<String> emitList = new LinkedList<String>();
+ String src_ip = sipOriginLog.getSRC_IP();
+ String dst_ip = sipOriginLog.getDST_IP();
+
+ if (StringUtil.isNotBlank(sipOriginLog.getMethod())
+ && sipOriginLog.getMethod() != null
+ && sipOriginLog.getMethod().length() != 0) {
+ //请求侧req,顺序 sip->last->….->first->dip
+ if (sipOriginLog.getReq_Route() != null && sipOriginLog.getReq_Route().length != 0) {
+ LinkedList<String> reqRouteList = getRoute(sipOriginLog.getReq_Route());
+ emitList.add(src_ip);
+ if (reqRouteList != null) {
+ if (reqRouteList.size() > 0) {
+ for (int i = reqRouteList.size() - 1; i >= 0; i--) {
+ emitList.add(reqRouteList.get(i));
+ }
+ }
+ }
+ emitList.add(dst_ip);
+ } else if (sipOriginLog.getReq_Via() != null && sipOriginLog.getReq_Via().length != 0) {
+ LinkedList<String> reqViaList = getVia(sipOriginLog.getReq_Via());
+ emitList.add(src_ip);
+ if (reqViaList != null) {
+ if (reqViaList.size() > 0) {
+ for (int i = reqViaList.size() - 1; i >= 0; i--) {
+ emitList.add(reqViaList.get(i));
+ }
+ }
+ }
+ emitList.add(dst_ip);
+ } else if (sipOriginLog.getReq_Record_Route() != null && sipOriginLog.getReq_Record_Route().length != 0) {
+ LinkedList<String> reqRecordRouteList = getRecordRoute(sipOriginLog.getReq_Record_Route());
+ emitList.add(src_ip);
+ if (reqRecordRouteList != null) {
+ if (reqRecordRouteList.size() > 0) {
+ for (int i = reqRecordRouteList.size() - 1; i >= 0; i--) {
+ emitList.add(reqRecordRouteList.get(i));
+ }
+ }
+ }
+ emitList.add(dst_ip);
+ } else {
+ //三个字段都没有则不做处理
+ }
+ } else {
+ //响应侧res,顺序 dip->first->….->last->sip
+ if (sipOriginLog.getRes_Route() != null && sipOriginLog.getRes_Route().length != 0) {
+ LinkedList<String> resRouteList = getRoute(sipOriginLog.getRes_Route());
+ emitList.add(dst_ip);
+ if (resRouteList != null) {
+ if (resRouteList.size() > 0) {
+ for (int i = 0; i < resRouteList.size(); i++) {
+ emitList.add(resRouteList.get(i));
+ }
+ }
+ }
+ emitList.add(src_ip);
+ } else if (sipOriginLog.getRes_Via() != null && sipOriginLog.getRes_Via().length != 0) {
+ LinkedList<String> resViaList = getVia(sipOriginLog.getRes_Via());
+ emitList.add(dst_ip);
+ if (resViaList != null) {
+ if (resViaList.size() > 0) {
+ for (int i = 0; i < resViaList.size(); i++) {
+ emitList.add(resViaList.get(i));
+ }
+ }
+ }
+ emitList.add(src_ip);
+ } else if (sipOriginLog.getRes_Record_Route() != null && sipOriginLog.getRes_Record_Route().length != 0) {
+ LinkedList<String> resRecordRouteList = getRecordRoute(sipOriginLog.getRes_Record_Route());
+ emitList.add(dst_ip);
+ if (resRecordRouteList != null) {
+ if (resRecordRouteList.size() > 0) {
+ for (int i = 0; i < resRecordRouteList.size(); i++) {
+ emitList.add(resRecordRouteList.get(i));
+ }
+ }
+ }
+ emitList.add(src_ip);
+ } else {
+ //三个字段都没有则不做处理
+ }
+ }
+ if (emitList.size() != 0) {
+ for (int i = 0; i < emitList.size(); i++) {
+ if (i != emitList.size() - 1) {
+ routeRelationLog.setFrom_domain(emitList.get(i));
+ routeRelationLog.setTo_domain(emitList.get(i + 1));
+ collector.emit(new Values(JSONObject.toJSONString(routeRelationLog), "route"));
+ }
+ }
+ }
+ }
+
+ private LinkedList<String> getRecordRoute(String[] record_route) {
+ LinkedList<String> recordRouteList = new LinkedList<>();
+ try {
+ for (int i = 0; i < record_route.length; i++) {
+ String str = record_route[i].replace("<", "").replace(">", "");
+ String splitSemi = str.split(";")[0];
+ if (splitSemi.split(":").length <= 3) {
+ String splitColon = splitSemi.split(":")[1];
+ recordRouteList.add(splitColon);
+ } else {
+ String splitSipColon = splitSemi.split("sip:")[1];
+ recordRouteList.add(splitSipColon);
+ }
+ }
+ } catch (Exception e) {
+ if (RealtimeCountConfig.PART_LOG_OUTPUT_CONTROLLER.equals("yes")) {
+ logger.error("GetSipOriBoltDC-->getRoute Split Route error ---> " + e + " <---");
+ logger.error("GetSipOriBoltDC-->getRoute Split Route data is ---> " + Arrays.toString(record_route) + " <---");
+ }
+ }
+ if (recordRouteList.size() != 0) {
+ return recordRouteList;
+ } else {
+ return null;
+ }
+ }
+
+ private LinkedList<String> getVia(String[] via) {
+ LinkedList<String> viaList = new LinkedList<>();
+ try {
+ for (int i = 0; i < via.length; i++) {
+ if (via[i].contains(" ")) {
+ String originDomain = via[i].split(";")[0].split(" ")[1];
+ if (originDomain.contains(":")) {
+ if (originDomain.split(":").length <= 2) {
+ viaList.add(originDomain.split(":")[0]);
+ } else {
+ viaList.add(originDomain);
+ }
+ } else {
+ viaList.add(originDomain);
+ }
+ }
+ }
+ } catch (Exception e) {
+ if (RealtimeCountConfig.PART_LOG_OUTPUT_CONTROLLER.equals("yes")) {
+ logger.error("GetSipOriBoltDC-->getVia Split Via error ---> " + e + " <---");
+ logger.error("GetSipOriBoltDC-->getVia Split Via data is ---> " + Arrays.toString(via) + " <---");
+ }
+ }
+ if (viaList.size() != 0) {
+ return viaList;
+ } else {
+ return null;
+ }
+ }
+
+ private LinkedList<String> getRoute(String[] route) {
+ LinkedList<String> routeList = new LinkedList<>();
+ try {
+ for (int i = 0; i < route.length; i++) {
+ String str = route[i].replace("<", "").replace(">", "");
+ String splitSemi = str.split(";")[0];
+ if (splitSemi.split(":").length <= 3) {
+ String splitColon = splitSemi.split(":")[1];
+ if (splitColon.contains("@") && splitColon.split("@")[1].contains(".")) {//例如"Req_Route": ["<sip:1.228.34.22;lr>", "<sip:114.207.73.139:5060;lr>", "<sip:[email protected]:5067;lr>"]这种形式
+ splitColon = splitColon.split("@")[1];
+ }
+ routeList.add(splitColon);
+ } else {
+ String splitSipColon = splitSemi.split("sip:")[1];
+ routeList.add(splitSipColon);
+ }
+ }
+ } catch (Exception e) {
+ if (RealtimeCountConfig.PART_LOG_OUTPUT_CONTROLLER.equals("yes")) {
+ logger.error("GetSipOriBoltDC-->getRoute Split Route error ---> " + e + " <---");
+ logger.error("GetSipOriBoltDC-->getRoute Split Route data is ---> " + Arrays.toString(route) + " <---");
+ }
+ }
+ if (routeList.size() != 0) {
+ return routeList;
+ } else {
+ return null;
+ }
+ }
+
+ //获取Nickname
+ private String getNickname(String useStr, String type) {
+ String nick_name = useStr.split("\"")[1];
+ return nick_name;
+ }
+
+ //获取UserName
+ private String getUserName(String useStr, String type) {
+ if (useStr.equals("None") || useStr.equals("*")) {
+ return null;
+ }
+ if (useStr.contains("@") && (useStr.contains("sip:") || useStr.contains("sips:"))) {
+ String userName = useStr.replace("sips:", "sip:").split("sip:")[1].split("@")[0];
+ return userName;
+ } else if (!(useStr.contains("@")) && useStr.contains("sip:")) {
+ try {
+ if (useStr.split(":").length <= 3) {
+ String userName = useStr.split(":")[1];
+ return userName;
+ } else {
+ if (useStr.contains(";")) {
+ String userName = useStr.split(";")[0].split(":")[1];
+ return userName;
+ } else {
+ String userName = useStr.replace("<", "")
+ .replace(">", "")
+ .split("sip:")[1];
+ return userName;
+ }
+ }
+ } catch (Exception e) {
+ if (RealtimeCountConfig.PART_LOG_OUTPUT_CONTROLLER.equals("yes")) {
+ logger.error("|~|~|GetSipOriBoltDC===getUserName can not split this data but it has sip: --->" + useStr + "<--- the type is --->" + type + "<---|~|~|");
+ }
+ return null;
+ }
+ } else if (useStr.contains("tel:")) {
+ String userName = useStr.split(">")[0].split("tel:")[1];
+ return userName;
+ } else if ((!(useStr.contains("."))) && (!(useStr.contains(":")))) {
+ return null;
+ } else {
+ if (RealtimeCountConfig.PART_LOG_OUTPUT_CONTROLLER.equals("yes")) {
+ logger.error("|~|~|GetSipOriBoltDC===getUserName can not split this data --->" + useStr + "<--- the type is --->" + type + "<---|~|~|");
+ }
+ return null;
+ }
+ }
+
+ //获取ser_domain
+ private String getSerDomain(String useStr, SipOriginALL sipOriginLog, String type) {
+ if (useStr.equals("None") || useStr.equals("*")) {
+ return "null##%##0";
+ }
+ String serDomain;
+ if (useStr.contains("<") && useStr.contains(">")) {
+ if (useStr.contains("@")) {
+// serDomain = useStr.split("@")[1].split(">")[0];//旧版,由于"From":"\"4903\" <sip:4903@>"以及"To":"\"1441519470648\" <sip:1441519470648@>"报索引越界异常
+ String splitFirst = useStr.split("@")[1];
+ if (!(">".equals(splitFirst))) {
+ serDomain = splitFirst.split(">")[0];
+ } else {
+ serDomain = null;
+ }
+ } else if (useStr.contains(".")) {
+ if (useStr.split(":").length == 3) {
+ String[] split = useStr.split(">")[0].split(":");
+ serDomain = split[1] + ":" + split[2];
+ } else {
+ serDomain = useStr.split(">")[0].split(":")[1];
+ }
+ } else if (useStr.contains("sip:") && useStr.split(":").length > 3) {
+ serDomain = useStr.split("sip:")[1].split(">")[0];
+ } else if (useStr.contains("tel:") && useStr.split(":").length >= 2) {
+ serDomain = useStr.split(">")[0].split("tel:")[1];
+ } else {
+ serDomain = useStr.split(">")[0].split(":")[1];
+ }
+ } else {
+ if (useStr.contains("@")) {
+ serDomain = useStr.split("@")[1];
+ } else if (useStr.contains(".")) {
+ if (useStr.split(":").length == 3) {
+ String[] split = useStr.split(":");
+ serDomain = split[1] + ":" + split[2];
+ } else {
+ serDomain = useStr.split(":")[1];
+ }
+ } else if (useStr.contains("sip:")) {
+ serDomain = useStr.split("sip:")[1];
+ } else if (useStr.contains("tel:")) {
+ if (useStr.contains(";")) {
+ serDomain = useStr.split(";")[0].split("tel:")[1];
+ } else {
+ serDomain = useStr.split("tel:")[1];
+ }
+ } else if ((!(useStr.contains(":"))) && (!(useStr.contains(".")))) {
+ serDomain = null;
+ } else {
+ if (useStr.contains(":")) {
+ serDomain = useStr.split(":")[1];
+ } else {
+ serDomain = null;
+ }
+ }
+ }
+
+ if (serDomain == null || serDomain.length() == 0) {
+ return "null##%##0";
+ }
+ if (serDomain.contains(":")) {
+ if (serDomain.split(":").length <= 2) {
+ String[] splitColon = serDomain.split(":");
+ String ipOrUrl = splitColon[0];
+ String port = splitColon[1];
+ if (isIp(ipOrUrl)) {
+ if (isInnerIP(ipOrUrl)) {
+ if (StringUtil.isNotBlank(sipOriginLog.getMethod())
+ && sipOriginLog.getMethod() != null
+ && sipOriginLog.getMethod().length() != 0) {
+ if (type.equals("from") || type.equals("req")) {
+ String newServiceDomain = sipOriginLog.getSRC_IP() + ":" + port;
+ return newServiceDomain + "##%##1";
+ } else {
+ String newServiceDomain = sipOriginLog.getDST_IP() + ":" + port;
+ return newServiceDomain + "##%##1";
+ }
+ } else {
+ if (type.equals("from") || type.equals("req")) {
+ String newServiceDomain = sipOriginLog.getDST_IP() + ":" + port;
+ return newServiceDomain + "##%##1";
+ } else {
+ String newServiceDomain = sipOriginLog.getSRC_IP() + ":" + port;
+ return newServiceDomain + "##%##1";
+ }
+ }
+ } else {
+ //外网ip
+ return serDomain + "##%##0";
+ }
+ } else {
+ //网址
+ return serDomain + "##%##0";
+ }
+ } else {
+ //v6
+ return serDomain + "##%##0";
+ }
+ } else {
+ boolean ipOrurl = isIp(serDomain);
+ if (ipOrurl) {
+ if (isInnerIP(serDomain)) {
+ if (StringUtil.isNotBlank(sipOriginLog.getMethod())
+ && sipOriginLog.getMethod() != null
+ && sipOriginLog.getMethod().length() != 0) {
+ if (type.equals("from") || type.equals("req")) {
+ String newServiceDomain = sipOriginLog.getSRC_IP();
+ return newServiceDomain + "##%##1";
+ } else {
+ String newServiceDomain = sipOriginLog.getDST_IP();
+ return newServiceDomain + "##%##1";
+ }
+ } else {
+ if (type.equals("from") || type.equals("req")) {
+ String newServiceDomain = sipOriginLog.getDST_IP();
+ return newServiceDomain + "##%##1";
+ } else {
+ String newServiceDomain = sipOriginLog.getSRC_IP();
+ return newServiceDomain + "##%##1";
+ }
+ }
+ } else {
+ return serDomain + "##%##0";
+ }
+ } else {
+ return serDomain + "##%##0";
+ }
+ }
+ }
+
+
+ //通用方法,传入url,返回domain
+ private String getDomainFromUrl(String oriUrl) {
+ String url = oriUrl.split("[?]")[0];
+ if (url.contains("http://") || url.contains("https://")) {
+ if (url.split("//")[1].split("/")[0].split(":").length <= 2) {
+ String v4Domain = url.split("//")[1]
+ .split("/")[0]
+ .split(":")[0];
+ return v4Domain;
+ } else {
+ String v6Domain = url.split("//")[1]
+ .split("/")[0];
+ return v6Domain;
+ }
+ } else {
+ if (url.split("/")[0].split(":").length <= 2) {
+ String v4Domain = url.split("/")[0].split(":")[0];
+ return v4Domain;
+ } else {
+ String v6Domain = url.split("/")[0];
+ return v6Domain;
+ }
+ }
+ }
+
+ //通用方法,传入url,返回file_path
+ private String getFilePathFromUrl(String oriUrl) {
+ String url = oriUrl.split("[?]")[0];
+ if (url.contains("http://") || url.contains("https://")) {
+ String filePath = url.split("//")[1];
+ return filePath;
+ } else {
+ String filePath = url;
+ return filePath;
+ }
+ }
+
+ //判断是否为内网ip
+ private boolean isInnerIP(String ipAddress) {
+ if (ipAddress.equals("1.1.1.1") || ipAddress.equals("127.0.0.1") || ipAddress.equals("0.0.0.0")) {
+ return true;
+ }
+ boolean isInnerIp = false;
+ long ipNum = getIpNum(ipAddress);
+
+ long aBegin = getIpNum("10.0.0.0");
+ long aEnd = getIpNum("10.255.255.255");
+ long bBegin = getIpNum("172.16.0.0");
+ long bEnd = getIpNum("172.31.255.255");
+ long cBegin = getIpNum("192.168.0.0");
+ long cEnd = getIpNum("192.168.255.255");
+ isInnerIp = isInner(ipNum, aBegin, aEnd) || isInner(ipNum, bBegin, bEnd) || isInner(ipNum, cBegin, cEnd);
+ return isInnerIp;
+ }
+
+ private long getIpNum(String ipAddress) {
+ String[] ip = ipAddress.split("\\.");
+ long a = Integer.parseInt(ip[0]);
+ long b = Integer.parseInt(ip[1]);
+ long c = Integer.parseInt(ip[2]);
+ long d = Integer.parseInt(ip[3]);
+
+ long ipNum = a * 256 * 256 * 256 + b * 256 * 256 + c * 256 + d;
+ return ipNum;
+ }
+
+ private boolean isInner(long userIp, long begin, long end) {
+ return (userIp >= begin) && (userIp <= end);
+ }
+
+ //判断是否是一个IP
+ private boolean isIp(String IP) {
+ IP = this.removeBlank(IP);
+ if (IP.length() < 7 || IP.length() > 15) {
+ return false;
+ }
+ if (IP.contains(".")) {
+ String[] arr = IP.split("\\.");
+ if (arr.length != 4) {
+ return false;
+ }
+ for (int i = 0; i < 4; i++) {
+ for (int j = 0; j < arr[i].length(); j++) {
+ char temp = arr[i].charAt(j);
+ if (!(temp >= '0' && temp <= '9')) {
+ return false;
+ }
+ }
+ }
+ for (int i = 0; i < 4; i++) {
+ int temp = Integer.parseInt(arr[i]);
+ if (temp < 0 || temp > 255) {
+ return false;
+ }
+ }
+ return true;
+ } else {
+ return false;
+ }
+ }
+
+ private String removeBlank(String IP) {
+ while (IP.startsWith(" ")) {
+ IP = IP.substring(1, IP.length()).trim();
+ }
+ while (IP.endsWith(" ")) {
+ IP = IP.substring(0, IP.length() - 1).trim();
+ }
+ return IP;
+ }
+
+ //获取编码
+ private String getCodingS(String contentStr) {
+ List<String> list = new ArrayList<String>();
+ Pattern pattern = Pattern.compile("a=rtpmap:(.*?)\\r\\n");
+ Matcher m = pattern.matcher(contentStr);
+ while (m.find()) {
+ list.add(m.group(1).replace(" ", "").toLowerCase());
+ }
+ if (list.size() >= 1) {
+ String[] strArray = list.toArray(new String[list.size()]);
+ return Arrays.toString(strArray);
+ } else {
+ return null;
+ }
+ }
+
+ /**
+ * 针对性修改ser_domain返回值的格式问题,此处主要是处理To_ser_domain字段
+ *
+ * @param serDomainOrNull
+ * @return
+ */
+ private String pointSerDomainPatch(String serDomainOrNull) {
+ if (serDomainOrNull.contains(";")) {
+ serDomainOrNull = serDomainOrNull.split(";")[0];
+ }
+ if (serDomainOrNull.contains("<sip:")) {
+ serDomainOrNull = serDomainOrNull.split("<sip:")[0].replace("\"", "");
+ }
+
+ if (serDomainOrNull.contains("\r") || serDomainOrNull.contains("\n")) {
+ serDomainOrNull = serDomainOrNull.split("[\\r\\n]")[0];
+ }
+ if (serDomainOrNull.contains("\\r") || serDomainOrNull.contains("\\n")) {
+ serDomainOrNull = serDomainOrNull.split("[\\\\r\\\\n]")[0];
+ }
+
+ return serDomainOrNull.replace("+", "")
+ .replace("[", "")
+ .replace("]", "")
+ .replace(" ", "")
+ ;
+ }
+
+ /**
+ * 修整 Res_stat_format 字段格式
+ *
+ * @param replaceBlank
+ * @return
+ */
+ private String repairResStatFormatStr(String replaceBlank) {
+ if (replaceBlank.contains(",")) {
+ replaceBlank = replaceBlank.split(",")[0];
+ } else if (replaceBlank.contains("'")) {
+ replaceBlank = replaceBlank.split("'")[0];
+ if (replaceBlank.contains("-")) {
+ if (replaceBlank.indexOf("-") == (replaceBlank.length() - 1)) {
+ replaceBlank = replaceBlank.replace("-", "");
+ }
+ }
+ } else if (replaceBlank.contains("(")) {
+ replaceBlank = replaceBlank.split("[(]")[0];
+ } else if (replaceBlank.contains("...")) {
+ replaceBlank = replaceBlank.split("\\.\\.\\.")[0];
+ } else if (replaceBlank.contains(".")) {
+ replaceBlank = replaceBlank.split("[.]")[0];
+ } else if (replaceBlank.contains("/")) {
+ replaceBlank = replaceBlank.split("/")[0];
+ } else if (replaceBlank.contains("[")) {
+ replaceBlank = replaceBlank.split("\\[")[0];
+ } else if (replaceBlank.contains("-")) {
+ replaceBlank = replaceBlank.split("-")[0];
+ } else if (replaceBlank.contains(":")) {
+ replaceBlank = replaceBlank.split(":")[0];
+ } else if (replaceBlank.contains(";")) {
+ replaceBlank = replaceBlank.split(";")[0];
+ } else if (replaceBlank.contains("!")) {
+ replaceBlank = replaceBlank.split("[!]")[0];
+ }
+
+ return replaceBlank.replace(" ", "")
+ .replace("-", "")
+ .replace(",", "")
+ .replace(".", "")
+ .replace("[", "")
+ .replace("]", "")
+ .replace("/", "")
+ .replace("!", "")
+ .replace(";", "")
+ .replace(":", "")
+ .replace("@", "")
+ ;
+ }
+
+ @Override
+ public Map<String, Object> getComponentConfiguration() {
+ Map<String, Object> conf = new HashMap<String, Object>();
+ conf.put(org.apache.storm.Config.TOPOLOGY_TICK_TUPLE_FREQ_SECS,
+ RealtimeCountConfig.TOPOLOGY_TICK_TUPLE_COMP_FREQ_SECS);
+ return conf;
+ }
+
+ @Override
+ public void declareOutputFields(OutputFieldsDeclarer declarer) {
+ declarer.declare(new Fields("jsonLog", "logType"));
+ }
+} \ No newline at end of file
diff --git a/src/main/java/cn/ac/iie/bolt/SipInsertBoltDC.java b/src/main/java/cn/ac/iie/bolt/SipInsertBoltDC.java
new file mode 100644
index 0000000..d09243c
--- /dev/null
+++ b/src/main/java/cn/ac/iie/bolt/SipInsertBoltDC.java
@@ -0,0 +1,136 @@
+package cn.ac.iie.bolt;
+
+import cn.ac.iie.common.RealtimeCountConfig;
+import cn.ac.iie.dao.KafkaDB;
+import cn.ac.iie.utils.HiveDao.HdfsDataLoad_Avro;
+import cn.ac.iie.utils.TupleUtils;
+import com.zdjizhi.utils.StringUtil;
+import org.apache.log4j.Logger;
+import org.apache.storm.task.TopologyContext;
+import org.apache.storm.topology.BasicOutputCollector;
+import org.apache.storm.topology.OutputFieldsDeclarer;
+import org.apache.storm.topology.base.BaseBasicBolt;
+import org.apache.storm.tuple.Fields;
+import org.apache.storm.tuple.Tuple;
+import org.apache.storm.tuple.Values;
+
+import java.text.SimpleDateFormat;
+import java.util.Date;
+import java.util.HashMap;
+import java.util.LinkedList;
+import java.util.Map;
+
+public class SipInsertBoltDC extends BaseBasicBolt {
+
+ private static final long serialVersionUID = -6795251425357896415L;
+ private static Logger logger = Logger.getLogger(SipInsertBoltDC.class);
+
+ private LinkedList<String> sipOriJsonList;//存放sip原始补全日志字符串
+ private LinkedList<String> routeRelationJsonList;//存放voip路由关系日志字符串
+
+ private HdfsDataLoad_Avro hdfsDataLoadAvro;
+
+ private Integer tickFreqSecs;
+
+ public SipInsertBoltDC(Integer tickFreqSecs) {
+ this.tickFreqSecs = tickFreqSecs;
+ }
+
+ @SuppressWarnings("rawtypes")
+ @Override
+ public void prepare(Map stormConf,
+ TopologyContext context) {
+ hdfsDataLoadAvro = HdfsDataLoad_Avro.getHdfsInstance();
+ sipOriJsonList = new LinkedList<String>();
+ routeRelationJsonList = new LinkedList<String>();
+ }
+
+ @Override
+ public void execute(Tuple tuple, BasicOutputCollector collector) {
+ try {
+ if (TupleUtils.isTick(tuple)) {
+ long time = System.currentTimeMillis() / 1000L;
+ SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd");
+ String partition = sdf.format(new Date(time * 1000L)).replaceAll("-", "");//格式:20190722
+ if (!sipOriJsonList.isEmpty()) {
+ LinkedList<String> tmpListFreq = new LinkedList<String>();
+ tmpListFreq.addAll(sipOriJsonList);
+ sipOriJsonList.clear();
+ hdfsDataLoadAvro.dataSipToHdfsAvro(partition, tmpListFreq, RealtimeCountConfig.KAFKA_SIP_COMPLEMENT_TOPIC, "origin", time);
+ }
+
+ //定时写入voip路由关系日志---20190807废弃,路由关系转离线spark清洗获取
+// if (!routeRelationJsonList.isEmpty()) {
+//// Map<String, Long> tmpMap = new HashMap<String, Long>();
+// LinkedList<String> tmpFragListFreq = new LinkedList<String>();
+// tmpFragListFreq.addAll(routeRelationJsonList);
+// routeRelationJsonList.clear();
+// kafkaDB.routeRelatLog2KafkaFromSipInsertBoltDC(tmpFragListFreq);
+//// dcl.dfPzFlowBatchStorage(tmpMap);//正式用,直接入中心http,已验证可用
+//// dbl.dfPzFlowBatchStorage2CH(tmpMap);//测试入clickhouse20190220
+//// dbl.dfPzFlowBatchStorage2CH(tmpFragListFreq);//测试入clickhouse20190220
+// }
+ } else {
+ String jsonLog = tuple.getString(0);
+ String logType = tuple.getString(1);
+ switch (logType) {
+ case "origin":
+ if (StringUtil.isNotBlank(jsonLog)) {
+ sipOriJsonList.add(jsonLog);
+ collector.emit(new Values(jsonLog));
+ }
+ break;
+// case "route"://存放路由关系数据---20190807废弃,路由关系转离线spark清洗获取
+// if (StringUtil.isNotBlank(jsonLog)) {
+// routeRelationJsonList.add(jsonLog);
+// }
+// break;
+ default:
+ logger.error("SipInsertBoltDC logType is error !!!This logType is--->{" + logType + "}<---");
+ break;
+ }
+ long time = System.currentTimeMillis() / 1000L;
+ SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd");
+ String partition = sdf.format(new Date(time * 1000L)).replaceAll("-", "");
+ if (sipOriJsonList.size() >= RealtimeCountConfig.BATCH_KAFKA_INSERT_NUM) {
+ LinkedList<String> tmpList = new LinkedList<String>();
+ tmpList.addAll(sipOriJsonList);
+ sipOriJsonList.clear();
+ hdfsDataLoadAvro.dataSipToHdfsAvro(partition, tmpList, RealtimeCountConfig.KAFKA_SIP_COMPLEMENT_TOPIC, "origin", time);
+ }
+ //写入voip路由关系日志---20190807废弃,路由关系转离线spark清洗获取
+// if (routeRelationJsonList.size() >= RealtimeCountConfig.BATCH_KAFKA_INSERT_NUM) {
+// LinkedList<String> tmpRouteList = new LinkedList<String>();
+// tmpRouteList.addAll(routeRelationJsonList);
+// routeRelationJsonList.clear();
+//// dbl.dfPzFlowBatchStorage2CH(tmpRouteList);//测试入clickhouse20190220
+// kafkaDB.routeRelatLog2KafkaFromSipInsertBoltDC(tmpRouteList);
+// }
+
+ }
+ } catch (Exception e) {
+ logger.error("SipInsertBoltDC to insert is error !!!--->{" + e + "}<---");
+ e.printStackTrace();
+ }
+ }
+
+ private void logCount(String key, Map<String, Long> hm) {
+ if (hm.containsKey(key)) {
+ hm.put(key, hm.get(key) + 1);
+ } else {
+ hm.put(key, 1l);
+ }
+ }
+
+ @Override
+ public Map<String, Object> getComponentConfiguration() {
+ Map<String, Object> conf = new HashMap<String, Object>();
+ conf.put(org.apache.storm.Config.TOPOLOGY_TICK_TUPLE_FREQ_SECS, tickFreqSecs);
+ return conf;
+ }
+
+ @Override
+ public void declareOutputFields(OutputFieldsDeclarer declarer) {
+ declarer.declare(new Fields("countJsonLog"));
+ }
+} \ No newline at end of file
diff --git a/src/main/java/cn/ac/iie/bolt/SipRealTimeCountBoltDC.java b/src/main/java/cn/ac/iie/bolt/SipRealTimeCountBoltDC.java
new file mode 100644
index 0000000..00a4f95
--- /dev/null
+++ b/src/main/java/cn/ac/iie/bolt/SipRealTimeCountBoltDC.java
@@ -0,0 +1,182 @@
+package cn.ac.iie.bolt;
+
+import cn.ac.iie.bean.voipSipOrigin.SipOriginALL;
+import cn.ac.iie.utils.TupleUtils;
+import com.alibaba.fastjson.JSON;
+import com.alibaba.fastjson.JSONObject;
+import org.apache.log4j.Logger;
+import org.apache.storm.Config;
+import org.apache.storm.task.TopologyContext;
+import org.apache.storm.topology.BasicOutputCollector;
+import org.apache.storm.topology.OutputFieldsDeclarer;
+import org.apache.storm.topology.base.BaseBasicBolt;
+import org.apache.storm.tuple.Fields;
+import org.apache.storm.tuple.Tuple;
+import org.apache.storm.tuple.Values;
+
+import java.text.SimpleDateFormat;
+import java.util.HashMap;
+import java.util.Map;
+
+
+public class SipRealTimeCountBoltDC extends BaseBasicBolt {
+
+ private static Logger logger = Logger.getLogger(SipRealTimeCountBoltDC.class);
+ private final Integer tickFreqSecs;
+
+ private Map<String, Long> codingCount = null;
+ private Map<String, Long> ipLocationCount = null;
+ private Map<String, Long> ipTypeCount = null;
+ private Map<String, Long> methodCount = null;
+ private Map<String, Long> resStatCount = null;
+ private Map<String, Long> serverCount = null;
+ private Map<String, Long> serviceDomainCount = null;
+ private Map<String, Long> uaCount = null;
+
+
+ public SipRealTimeCountBoltDC(Integer tickFreqSecs) {
+ this.tickFreqSecs = tickFreqSecs;
+ }
+
+ @Override
+ public void prepare(Map stormConf, TopologyContext context) {
+ codingCount = new HashMap<>();
+ ipLocationCount = new HashMap<>();
+ ipTypeCount = new HashMap<>();
+ methodCount = new HashMap<>();
+ resStatCount = new HashMap<>();
+ serverCount = new HashMap<>();
+ serviceDomainCount = new HashMap<>();
+ uaCount = new HashMap<>();
+
+ }
+
+ public void execute(Tuple tuple, BasicOutputCollector collector) {
+ if (TupleUtils.isTick(tuple)) {
+
+ Map<String, Long> tmpServiceMap = new HashMap<String, Long>(serviceDomainCount);
+ serviceDomainCount.clear();
+ sendCount("service", tmpServiceMap, collector);
+
+ Map<String, Long> tmpServerMap = new HashMap<String, Long>(serverCount);
+ serverCount.clear();
+ sendCount("server", tmpServerMap, collector);
+
+ Map<String, Long> tmpUaMap = new HashMap<String, Long>(uaCount);
+ uaCount.clear();
+ sendCount("ua", tmpUaMap, collector);
+
+ Map<String, Long> tmpLocationMap = new HashMap<String, Long>(ipLocationCount);
+ ipLocationCount.clear();
+ sendCount("location", tmpLocationMap, collector);
+
+ Map<String, Long> tmpTypeMap = new HashMap<String, Long>(ipTypeCount);
+ ipTypeCount.clear();
+ sendCount("type", tmpTypeMap, collector);
+
+ Map<String, Long> tmpMethodMap = new HashMap<String, Long>(methodCount);
+ methodCount.clear();
+ sendCount("method", tmpMethodMap, collector);
+
+ Map<String, Long> tmpResStatMap = new HashMap<String, Long>(resStatCount);
+ resStatCount.clear();
+ sendCount("resStat", tmpResStatMap, collector);
+
+ Map<String, Long> tmpCodingMap = new HashMap<String, Long>(codingCount);
+ codingCount.clear();
+ sendCount("coding", tmpCodingMap, collector);
+
+ } else {
+ try {
+
+ String jsonLog = tuple.getString(0);
+
+ SipOriginALL sipOriginLog = JSONObject.parseObject(jsonLog, SipOriginALL.class);
+
+ //预处理cseq
+ String cSeq = "NULL";
+ String rawCSeq = sipOriginLog.getCseq();
+ if(null != rawCSeq) {
+ String[] splitCSeq = rawCSeq.toUpperCase().split("[\\s]+");
+ if(splitCSeq.length > 1) {
+ cSeq = splitCSeq[1];
+ }
+ }
+
+ //提取所需的字段
+ String service = sipOriginLog.getTo_ser_domain();
+ String server = sipOriginLog.getServer();
+ String ua = sipOriginLog.getUser_Agent();
+ String srcCtyReg = sipOriginLog.getSRC_LOCATION_NATION() + "+" + sipOriginLog.getSRC_LOCATION_REGION() + "+" + sipOriginLog.getSRC_LOCATION_NATION_CODE();
+ String dstCtyReg = sipOriginLog.getDST_LOCATION_NATION() + "+" + sipOriginLog.getDST_LOCATION_REGION() + "+" + sipOriginLog.getDST_LOCATION_NATION_CODE();
+ String type = sipOriginLog.getIP_TYPE();
+ String method = sipOriginLog.getMethod();
+ String resStat = sipOriginLog.getRes_stat_format() + "+" + cSeq;
+ String reqCodings = sipOriginLog.getReq_coding();
+ String resCodings = sipOriginLog.getRes_coding();
+
+ //计数
+ logCount(service, serviceDomainCount);
+ logCount(server, serverCount);
+ logCount(ua, uaCount);
+ logCount(srcCtyReg, ipLocationCount);
+ logCount(dstCtyReg, ipLocationCount);
+ logCount(type, ipTypeCount);
+ logCount(method, methodCount);
+ logCount(resStat, resStatCount);
+ if(null != reqCodings) {
+ String[] reqSplit = reqCodings.split("[,\\[\\]]");
+ for(int i = 1; i < reqSplit.length; i++) {
+ logCount(reqSplit[i].trim(), codingCount);
+ }
+ } else {
+ logCount("NULL", codingCount);
+ }
+ if(null != resCodings) {
+ String[] resSplit = resCodings.split("[,\\[\\]]");
+ for(int j = 1; j < resSplit.length; j++) {
+ logCount(resSplit[j].trim(), codingCount);
+ }
+ } else {
+ logCount("NULL", codingCount);
+ }
+ } catch (Exception e) {
+ logger.error("SipRealTimeCountBoltDC error !!!--->{" + e + "}<---");
+ logger.error("SipRealTimeCountBoltDC data is !!!--->{" + tuple.getString(0) + "}<---");
+ e.printStackTrace();
+ }
+ }
+
+ }
+
+ private void logCount(String key, Map<String, Long> hm) {
+ if (hm.containsKey(key)) {
+ hm.put(key, hm.get(key) + 1);
+ } else {
+ hm.put(key, 1l);
+ }
+ }
+
+ private void sendCount(String countType, Map<String, Long> hm, BasicOutputCollector collector) {
+
+ long time = System.currentTimeMillis();
+ SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
+ String currentTime = sdf.format(time);
+
+ String jsonString = JSON.toJSONString(hm);
+ collector.emit(new Values(countType, jsonString, currentTime));
+
+ }
+
+
+ public Map<String, Object> getComponentConfiguration() {
+ Map<String, Object> conf = new HashMap<>();
+ conf.put(Config.TOPOLOGY_TICK_TUPLE_FREQ_SECS, tickFreqSecs);
+ return conf;
+ }
+
+ public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
+ outputFieldsDeclarer.declare(new Fields("countType", "jsonCount", "currentTime"));
+
+ }
+} \ No newline at end of file
diff --git a/src/main/java/cn/ac/iie/bolt/SipRealTimeMergeBoltDC.java b/src/main/java/cn/ac/iie/bolt/SipRealTimeMergeBoltDC.java
new file mode 100644
index 0000000..fcf9af9
--- /dev/null
+++ b/src/main/java/cn/ac/iie/bolt/SipRealTimeMergeBoltDC.java
@@ -0,0 +1,604 @@
+package cn.ac.iie.bolt;
+
+import cn.ac.iie.bean.voipSipCount.*;
+import cn.ac.iie.dao.DbConnect;
+import cn.ac.iie.utils.TupleUtils;
+import com.alibaba.fastjson.JSONObject;
+import org.apache.log4j.Logger;
+import org.apache.storm.Config;
+import org.apache.storm.task.TopologyContext;
+import org.apache.storm.topology.BasicOutputCollector;
+import org.apache.storm.topology.OutputFieldsDeclarer;
+import org.apache.storm.topology.base.BaseBasicBolt;
+import org.apache.storm.tuple.Tuple;
+
+import java.sql.Connection;
+import java.sql.PreparedStatement;
+import java.text.SimpleDateFormat;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.Map;
+
+public class SipRealTimeMergeBoltDC extends BaseBasicBolt {
+
+ private static Logger logger = Logger.getLogger(SipRealTimeMergeBoltDC.class);
+
+ private static DbConnect manager = DbConnect.getInstance();
+ private Connection connection = null;
+ private PreparedStatement pstmt = null;
+
+ private final Integer tickFreqSecs;
+
+ private Map<String, VoipServiceDomain> serviceDomainCountResult;
+ private Map<String, VoipServer> serverCountResult;
+ private Map<String, VoipUa> uaCountResult;
+ private Map<String, VoipIpLocation> ipLocationCountResult;
+ private Map<String, VoipIpType> ipTypeCountResult;
+ private Map<String, VoipMethod> methodCountResult;
+ private Map<String, VoipResStat> resStatCountResult;
+ private Map<String, VoipCoding> codingCountResult;
+
+
+ public SipRealTimeMergeBoltDC(Integer tickFreqSecs) {
+ this.tickFreqSecs = tickFreqSecs;
+ }
+
+
+ @Override
+ public void prepare(Map stormConf, TopologyContext context) {
+
+ serviceDomainCountResult = new HashMap<>();
+ serverCountResult = new HashMap<>();
+ uaCountResult = new HashMap<>();
+ ipLocationCountResult = new HashMap<>();
+ ipTypeCountResult = new HashMap<>();
+ methodCountResult = new HashMap<>();
+ resStatCountResult = new HashMap<>();
+ codingCountResult = new HashMap<>();
+
+ }
+
+ @Override
+ public void execute(Tuple input, BasicOutputCollector collector) {
+
+
+ if (TupleUtils.isTick(input)) {
+
+ long time = System.currentTimeMillis();
+ SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
+ String currentTime = sdf.format(time);
+
+ if (!serviceDomainCountResult.isEmpty()) {
+ Map<String, VoipServiceDomain> tmpMap = new HashMap<String, VoipServiceDomain>(serviceDomainCountResult);
+ serviceDomainCountResult.clear();
+ batchInsertServiceDomain(tmpMap, currentTime);
+ }
+ if (!serverCountResult.isEmpty()) {
+ Map<String, VoipServer> tmpMap = new HashMap<String, VoipServer>(serverCountResult);
+ serverCountResult.clear();
+ batchInsertServer(tmpMap, currentTime);
+ }
+ if (!uaCountResult.isEmpty()) {
+ Map<String, VoipUa> tmpMap = new HashMap<String, VoipUa>(uaCountResult);
+ uaCountResult.clear();
+ batchInsertUa(tmpMap, currentTime);
+ }
+ if (!ipLocationCountResult.isEmpty()) {
+ Map<String, VoipIpLocation> tmpMap = new HashMap<String, VoipIpLocation>(ipLocationCountResult);
+ ipLocationCountResult.clear();
+ batchInsertIpLocation(tmpMap, currentTime);
+ }
+ if (!ipTypeCountResult.isEmpty()) {
+ Map<String, VoipIpType> tmpMap = new HashMap<String, VoipIpType>(ipTypeCountResult);
+ ipTypeCountResult.clear();
+ batchInsertIpType(tmpMap, currentTime);
+ }
+ if (!methodCountResult.isEmpty()) {
+ Map<String, VoipMethod> tmpMap = new HashMap<String, VoipMethod>(methodCountResult);
+ methodCountResult.clear();
+ batchInsertMethod(tmpMap, currentTime);
+ }
+ if (!resStatCountResult.isEmpty()) {
+ Map<String, VoipResStat> tmpMap = new HashMap<String, VoipResStat>(resStatCountResult);
+ resStatCountResult.clear();
+ batchInsertResStat(tmpMap, currentTime);
+ }
+ if (!codingCountResult.isEmpty()) {
+ Map<String, VoipCoding> tmpMap = new HashMap<String, VoipCoding>(codingCountResult);
+ codingCountResult.clear();
+ batchInsertCoding(tmpMap, currentTime);
+ }
+
+ logger.warn("Real time count result Insert Clickhouse execute at " + currentTime);
+
+ } else {
+
+ String countType = input.getStringByField("countType");
+ String jsonCount = input.getStringByField("jsonCount");
+ String currentTime = input.getStringByField("currentTime");
+ JSONObject jsonObject = JSONObject.parseObject(jsonCount);
+ Map<String, Object> hmCount = (Map<String, Object>) jsonObject;
+
+
+ switch (countType) {
+ case "service":
+ mergeServiceDomainResult(hmCount, serviceDomainCountResult, currentTime);
+ break;
+ case "server":
+ mergeServerResult(hmCount, serverCountResult, currentTime);
+ break;
+ case "ua":
+ mergeUaResult(hmCount, uaCountResult, currentTime);
+ break;
+ case "location":
+ mergeIpLocationResult(hmCount, ipLocationCountResult, currentTime);
+ break;
+ case "type":
+ mergeIpTypeResult(hmCount, ipTypeCountResult, currentTime);
+ break;
+ case "method":
+ mergeMethodResult(hmCount, methodCountResult, currentTime);
+ break;
+ case "resStat":
+ mergeResStatResult(hmCount, resStatCountResult, currentTime);
+ break;
+ case "coding":
+ mergeCodingResult(hmCount, codingCountResult, currentTime);
+ break;
+
+ }
+
+ }
+
+ }
+
+ private String removeBlank(String IP) {
+ while (IP.startsWith(" ")) {
+ IP = IP.substring(1, IP.length()).trim();
+ }
+ while (IP.endsWith(" ")) {
+ IP = IP.substring(0, IP.length() - 1).trim();
+ }
+ return IP;
+ }
+
+ /**
+ * 判断IP
+ */
+ private boolean isIp(String IP) {
+ if (null == IP) {
+ return false;
+ }
+ IP = this.removeBlank(IP);
+ if (IP.length() < 7 || IP.length() > 21) {
+ return false;
+ }
+ if (IP.contains(".")) {
+ String[] arr = IP.split("[.:]");
+ if (!(arr.length == 4 || arr.length == 5)) {
+ return false;
+ }
+ for (int i = 0; i < 4; i++) {
+ for (int j = 0; j < arr[i].length(); j++) {
+ char temp = arr[i].charAt(j);
+ if (!(temp >= '0' && temp <= '9')) {
+ return false;
+ }
+ }
+ }
+ for (int i = 0; i < 4; i++) {
+ if("" == arr[i] || arr[i].length() > 3)
+ {
+ return false;
+ }
+ int temp = Integer.parseInt(arr[i]);
+ if (temp < 0 || temp > 255) {
+ return false;
+ }
+ }
+ return true;
+ } else {
+ return false;
+ }
+ }
+
+ private void mergeServiceDomainResult(Map<String, Object> hmCount, Map<String, VoipServiceDomain> hm, String currentTime) {
+
+ Iterator<Map.Entry<String, Object>> iter = hmCount.entrySet().iterator();
+ while (iter.hasNext()) {
+ Map.Entry<String, Object> entry = iter.next();
+ String key = entry.getKey();
+ Object val = entry.getValue();
+ String str = String.valueOf(val);
+ Long count = Long.valueOf(str);
+
+ if (hm.containsKey(key)) {
+ VoipServiceDomain object = hm.get(key);
+ object.setCount(object.getCount() + count);
+ } else {
+ VoipServiceDomain object = new VoipServiceDomain();
+ object.setService(key);
+ if (isIp(key)) {
+ object.setType("0");
+ } else {
+ object.setType("1");
+ }
+ object.setCount(count);
+ object.setInterval_time(currentTime);
+ hm.put(key, object);
+ }
+ }
+ }
+
+ private void mergeServerResult(Map<String, Object> hmCount, Map<String, VoipServer> hm, String currentTime) {
+
+ Iterator<Map.Entry<String, Object>> iter = hmCount.entrySet().iterator();
+ while (iter.hasNext()) {
+ Map.Entry<String, Object> entry = iter.next();
+ String key = entry.getKey();
+ Object val = entry.getValue();
+ String str = String.valueOf(val);
+ Long count = Long.parseLong(str);
+
+ if (hm.containsKey(key)) {
+ VoipServer object = hm.get(key);
+ object.setCount(object.getCount() + count);
+ } else {
+ VoipServer object = new VoipServer();
+ object.setServer(key);
+ object.setCount(count);
+ object.setInterval_time(currentTime);
+ hm.put(key, object);
+ }
+ }
+ }
+
+ private void mergeUaResult(Map<String, Object> hmCount, Map<String, VoipUa> hm, String currentTime) {
+
+ Iterator<Map.Entry<String, Object>> iter = hmCount.entrySet().iterator();
+ while (iter.hasNext()) {
+ Map.Entry<String, Object> entry = iter.next();
+ String key = entry.getKey();
+ Object val = entry.getValue();
+ String str = String.valueOf(val);
+ Long count = Long.valueOf(str);
+
+ if (hm.containsKey(key)) {
+ VoipUa object = hm.get(key);
+ object.setCount(object.getCount() + count);
+ } else {
+ VoipUa object = new VoipUa();
+ object.setUa(key);
+ object.setCount(count);
+ object.setInterval_time(currentTime);
+ hm.put(key, object);
+ }
+ }
+ }
+
+ private void mergeIpLocationResult(Map<String, Object> hmCount, Map<String, VoipIpLocation> hm, String currentTime) {
+
+ Iterator<Map.Entry<String, Object>> iter = hmCount.entrySet().iterator();
+ while (iter.hasNext()) {
+ Map.Entry<String, Object> entry = iter.next();
+ String key = entry.getKey();
+ Object val = entry.getValue();
+ String str = String.valueOf(val);
+ Long count = Long.valueOf(str);
+
+ if (hm.containsKey(key)) {
+ VoipIpLocation object = hm.get(key);
+ object.setCount(object.getCount() + count);
+ } else {
+ VoipIpLocation object = new VoipIpLocation();
+ String[] items = key.split("\\+");
+ String country = items[0];
+ String region = items[1];
+ String code = items[2];
+ object.setCountry(country);
+ object.setRegion(region);
+ object.setNationCode(code);
+ object.setCount(count);
+ object.setInterval_time(currentTime);
+ hm.put(key, object);
+ }
+ }
+ }
+
+ private void mergeIpTypeResult(Map<String, Object> hmCount, Map<String, VoipIpType> hm, String currentTime) {
+
+ Iterator<Map.Entry<String, Object>> iter = hmCount.entrySet().iterator();
+ while (iter.hasNext()) {
+ Map.Entry<String, Object> entry = iter.next();
+ String key = entry.getKey();
+ Object val = entry.getValue();
+ String str = String.valueOf(val);
+ Long count = Long.valueOf(str);
+
+ if (hm.containsKey(key)) {
+ VoipIpType object = hm.get(key);
+ object.setCount(object.getCount() + count);
+ } else {
+ VoipIpType object = new VoipIpType();
+ object.setType(key);
+ object.setCount(count);
+ object.setInterval_time(currentTime);
+ hm.put(key, object);
+ }
+ }
+ }
+
+ private void mergeMethodResult(Map<String, Object> hmCount, Map<String, VoipMethod> hm, String currentTime) {
+
+ Iterator<Map.Entry<String, Object>> iter = hmCount.entrySet().iterator();
+ while (iter.hasNext()) {
+ Map.Entry<String, Object> entry = iter.next();
+ String key = entry.getKey();
+ Object val = entry.getValue();
+ String str = String.valueOf(val);
+ Long count = Long.valueOf(str);
+
+ if (hm.containsKey(key)) {
+ VoipMethod object = hm.get(key);
+ object.setCount(object.getCount() + count);
+ } else {
+ VoipMethod object = new VoipMethod();
+ object.setMethod(key);
+ object.setCount(count);
+ object.setInterval_time(currentTime);
+ hm.put(key, object);
+ }
+ }
+ }
+
+ private void mergeResStatResult(Map<String, Object> hmCount, Map<String, VoipResStat> hm, String currentTime) {
+
+ Iterator<Map.Entry<String, Object>> iter = hmCount.entrySet().iterator();
+ while (iter.hasNext()) {
+ Map.Entry<String, Object> entry = iter.next();
+ String key = entry.getKey();
+ Object val = entry.getValue();
+ String str = String.valueOf(val);
+ Long count = Long.valueOf(str);
+
+ if (hm.containsKey(key)) {
+ VoipResStat object = hm.get(key);
+ object.setCount(object.getCount() + count);
+ } else {
+ VoipResStat object = new VoipResStat();
+ String[] items = key.split("\\+");
+ String res_stat = items[0];
+ String cseq = items[1];
+ object.setRes_stat(res_stat);
+ object.setCseq(cseq);
+ object.setCount(count);
+ object.setInterval_time(currentTime);
+ hm.put(key, object);
+ }
+ }
+ }
+
+ private void mergeCodingResult(Map<String, Object> hmCount, Map<String, VoipCoding> hm, String currentTime) {
+
+ Iterator<Map.Entry<String, Object>> iter = hmCount.entrySet().iterator();
+ while (iter.hasNext()) {
+ Map.Entry<String, Object> entry = iter.next();
+ String key = entry.getKey();
+ Object val = entry.getValue();
+ String str = String.valueOf(val);
+ Long count = Long.valueOf(str);
+
+ if (hm.containsKey(key)) {
+ VoipCoding object = hm.get(key);
+ object.setCount(object.getCount() + count);
+ } else {
+ VoipCoding object = new VoipCoding();
+ object.setCoding(key);
+ object.setCount(count);
+ object.setInterval_time(currentTime);
+ hm.put(key, object);
+ }
+ }
+ }
+
+ private void batchInsertServiceDomain(Map<String, VoipServiceDomain> hm, String currentTime) {
+ String sql = "insert into voip_knowledge.voip_service_domain values(?, ?, ?, ?)";
+ try {
+ connection = manager.getConnection();
+ connection.setAutoCommit(false);
+ pstmt = connection.prepareStatement(sql);
+ for (Map.Entry<String, VoipServiceDomain> entry : hm.entrySet()) {
+ VoipServiceDomain object = entry.getValue();
+ pstmt.setString(1, object.getService());
+ pstmt.setString(2, object.getType());
+ pstmt.setLong(3, object.getCount());
+ pstmt.setString(4, currentTime);
+ pstmt.addBatch();
+ }
+ pstmt.executeBatch();
+ connection.commit();
+ } catch (Exception e) {
+ logger.error("+++++++++insert to voip_service_domain Log write failed!!!+++++++++");
+ e.printStackTrace();
+ } finally {
+ manager.clear(pstmt, connection);
+ }
+ }
+
+ private void batchInsertCoding(Map<String, VoipCoding> hm, String currentTime) {
+ String sql = "insert into voip_knowledge.voip_coding values(?, ?, ?)";
+ try {
+ connection = manager.getConnection();
+ connection.setAutoCommit(false);
+ pstmt = connection.prepareStatement(sql);
+ for (Map.Entry<String, VoipCoding> entry : hm.entrySet()) {
+ VoipCoding object = entry.getValue();
+ pstmt.setString(1, object.getCoding());
+ pstmt.setLong(2, object.getCount());
+ pstmt.setString(3, currentTime);
+ pstmt.addBatch();
+ }
+ pstmt.executeBatch();
+ connection.commit();
+ } catch (Exception e) {
+ logger.error("+++++++++insert to voip_ua Log write failed!!!+++++++++");
+ e.printStackTrace();
+ } finally {
+ manager.clear(pstmt, connection);
+ }
+ }
+
+ private void batchInsertIpLocation(Map<String, VoipIpLocation> hm, String currentTime) {
+ String sql = "insert into voip_knowledge.voip_ip_location values(?, ?, ?, ?, ?)";
+ try {
+ connection = manager.getConnection();
+ connection.setAutoCommit(false);
+ pstmt = connection.prepareStatement(sql);
+ for (Map.Entry<String, VoipIpLocation> entry : hm.entrySet()) {
+ VoipIpLocation object = entry.getValue();
+ pstmt.setString(1, object.getCountry());
+ pstmt.setString(2, object.getRegion());
+ pstmt.setString(3, object.getNationCode());
+ pstmt.setLong(4, object.getCount());
+ pstmt.setString(5, currentTime);
+ pstmt.addBatch();
+ }
+ pstmt.executeBatch();
+ connection.commit();
+ } catch (Exception e) {
+ logger.error("+++++++++insert to voip_ip_location Log write failed!!!+++++++++");
+ e.printStackTrace();
+ } finally {
+ manager.clear(pstmt, connection);
+ }
+ }
+
+ private void batchInsertIpType(Map<String, VoipIpType> hm, String currentTime) {
+ String sql = "insert into voip_knowledge.voip_ip_type values(?, ?, ?)";
+ try {
+ connection = manager.getConnection();
+ connection.setAutoCommit(false);
+ pstmt = connection.prepareStatement(sql);
+ for (Map.Entry<String, VoipIpType> entry : hm.entrySet()) {
+ VoipIpType object = entry.getValue();
+ pstmt.setString(1, object.getType());
+ pstmt.setLong(2, object.getCount());
+ pstmt.setString(3, currentTime);
+ pstmt.addBatch();
+ }
+ pstmt.executeBatch();
+ connection.commit();
+ } catch (Exception e) {
+ logger.error("+++++++++insert to voip_ip_type Log write failed!!!+++++++++");
+ e.printStackTrace();
+ } finally {
+ manager.clear(pstmt, connection);
+ }
+ }
+
+ private void batchInsertMethod(Map<String, VoipMethod> hm, String currentTime) {
+ String sql = "insert into voip_knowledge.voip_method values(?, ?, ?)";
+ try {
+ connection = manager.getConnection();
+ connection.setAutoCommit(false);
+ pstmt = connection.prepareStatement(sql);
+ for (Map.Entry<String, VoipMethod> entry : hm.entrySet()) {
+ VoipMethod object = entry.getValue();
+ pstmt.setString(1, object.getMethod());
+ pstmt.setLong(2, object.getCount());
+ pstmt.setString(3, currentTime);
+ pstmt.addBatch();
+ }
+ pstmt.executeBatch();
+ connection.commit();
+ } catch (Exception e) {
+ logger.error("+++++++++insert to voip_method Log write failed!!!+++++++++");
+ e.printStackTrace();
+ } finally {
+ manager.clear(pstmt, connection);
+ }
+ }
+
+ private void batchInsertResStat(Map<String, VoipResStat> hm, String currentTime) {
+ String sql = "insert into voip_knowledge.voip_res_stat values(?, ?, ?, ?)";
+ try {
+ connection = manager.getConnection();
+ connection.setAutoCommit(false);
+ pstmt = connection.prepareStatement(sql);
+ for (Map.Entry<String, VoipResStat> entry : hm.entrySet()) {
+ VoipResStat object = entry.getValue();
+ pstmt.setString(1, object.getRes_stat());
+ pstmt.setString(2, object.getCseq());
+ pstmt.setLong(3, object.getCount());
+ pstmt.setString(4, currentTime);
+ pstmt.addBatch();
+ }
+ pstmt.executeBatch();
+ connection.commit();
+ } catch (Exception e) {
+ logger.error("+++++++++insert to voip_res_stat Log write failed!!!+++++++++");
+ e.printStackTrace();
+ } finally {
+ manager.clear(pstmt, connection);
+ }
+ }
+
+ private void batchInsertServer(Map<String, VoipServer> hm, String currentTime) {
+ String sql = "insert into voip_knowledge.voip_server values(?, ?, ?)";
+ try {
+ connection = manager.getConnection();
+ connection.setAutoCommit(false);
+ pstmt = connection.prepareStatement(sql);
+ for (Map.Entry<String, VoipServer> entry : hm.entrySet()) {
+ VoipServer object = entry.getValue();
+ pstmt.setString(1, object.getServer());
+ pstmt.setLong(2, object.getCount());
+ pstmt.setString(3, currentTime);
+ pstmt.addBatch();
+ }
+ pstmt.executeBatch();
+ connection.commit();
+ } catch (Exception e) {
+ logger.error("+++++++++insert to voip_server Log write failed!!!+++++++++");
+ e.printStackTrace();
+ } finally {
+ manager.clear(pstmt, connection);
+ }
+ }
+
+ private void batchInsertUa(Map<String, VoipUa> hm, String currentTime) {
+ String sql = "insert into voip_knowledge.voip_ua values(?, ?, ?)";
+ try {
+ connection = manager.getConnection();
+ connection.setAutoCommit(false);
+ pstmt = connection.prepareStatement(sql);
+ for (Map.Entry<String, VoipUa> entry : hm.entrySet()) {
+ VoipUa object = entry.getValue();
+ pstmt.setString(1, object.getUa());
+ pstmt.setLong(2, object.getCount());
+ pstmt.setString(3, currentTime);
+ pstmt.addBatch();
+ }
+ pstmt.executeBatch();
+ connection.commit();
+ } catch (Exception e) {
+ logger.error("+++++++++insert to voip_ua Log write failed!!!+++++++++");
+ e.printStackTrace();
+ } finally {
+ manager.clear(pstmt, connection);
+ }
+ }
+
+
+ public Map<String, Object> getComponentConfiguration() {
+ Map<String, Object> conf = new HashMap<>();
+ conf.put(Config.TOPOLOGY_TICK_TUPLE_FREQ_SECS, tickFreqSecs);
+ return conf;
+ }
+
+
+ @Override
+ public void declareOutputFields(OutputFieldsDeclarer declarer) {
+
+ }
+}