path: root/src/main/java/cn/ac/iie/bolt/SipInsertBoltDC.java
package cn.ac.iie.bolt;

import cn.ac.iie.common.RealtimeCountConfig;
import cn.ac.iie.dao.KafkaDB;
import cn.ac.iie.utils.HiveDao.HdfsDataLoad_Avro;
import cn.ac.iie.utils.TupleUtils;
import com.zdjizhi.utils.StringUtil;
import org.apache.log4j.Logger;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.BasicOutputCollector;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseBasicBolt;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Tuple;
import org.apache.storm.tuple.Values;

import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.Map;

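/**
 * Storm bolt that buffers completed SIP JSON logs and flushes them to HDFS as
 * Avro. A flush is triggered either by a tick tuple (every {@code tickFreqSecs}
 * seconds) or when the buffer reaches
 * {@code RealtimeCountConfig.BATCH_KAFKA_INSERT_NUM} entries; each batch is
 * written under a daily partition in yyyyMMdd form. VoIP route-relation
 * handling was deprecated on 2019-08-07 in favor of an offline Spark cleaning
 * job and survives below only as commented-out history.
 */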
public class SipInsertBoltDC extends BaseBasicBolt {

    private static final long serialVersionUID = -6795251425357896415L;
    private static final Logger logger = Logger.getLogger(SipInsertBoltDC.class);

    private LinkedList<String> sipOriJsonList;// buffers raw completed SIP log JSON strings
    private LinkedList<String> routeRelationJsonList;// buffers VoIP route-relation log JSON strings

    private HdfsDataLoad_Avro hdfsDataLoadAvro;

    private Integer tickFreqSecs;

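    /**
     * @param tickFreqSecs interval, in seconds, between the tick tuples that
     *                     drive the time-based flush
     */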
    public SipInsertBoltDC(Integer tickFreqSecs) {
        this.tickFreqSecs = tickFreqSecs;
    }

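    /** Obtains the shared HdfsDataLoad_Avro writer and initializes the in-memory buffers. */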
    @SuppressWarnings("rawtypes")
    @Override
    public void prepare(Map stormConf,
                        TopologyContext context) {
        hdfsDataLoadAvro = HdfsDataLoad_Avro.getHdfsInstance();
        sipOriJsonList = new LinkedList<>();
        routeRelationJsonList = new LinkedList<>();
    }

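    /**
     * On a tick tuple, flushes any buffered SIP logs to HDFS. On a data tuple,
     * expects (jsonLog, logType): "origin" logs are buffered and re-emitted
     * downstream, and a size-based flush runs once the buffer reaches the
     * configured batch size.
     */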
    @Override
    public void execute(Tuple tuple, BasicOutputCollector collector) {
        try {
            if (TupleUtils.isTick(tuple)) {
                long time = System.currentTimeMillis() / 1000L;
                SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd");
                String partition = sdf.format(new Date(time * 1000L)).replaceAll("-", "");// format: 20190722
                if (!sipOriJsonList.isEmpty()) {
                    LinkedList<String> tmpListFreq = new LinkedList<>(sipOriJsonList);
                    sipOriJsonList.clear();
                    hdfsDataLoadAvro.dataSipToHdfsAvro(partition, tmpListFreq, RealtimeCountConfig.KAFKA_SIP_COMPLEMENT_TOPIC, "origin", time);
                }

                // Scheduled flush of VoIP route-relation logs -- deprecated 2019-08-07; route relations are now derived by an offline Spark cleaning job
//                if (!routeRelationJsonList.isEmpty()) {
////                    Map<String, Long> tmpMap = new HashMap<String, Long>();
//                    LinkedList<String> tmpFragListFreq = new LinkedList<String>();
//                    tmpFragListFreq.addAll(routeRelationJsonList);
//                    routeRelationJsonList.clear();
//                    kafkaDB.routeRelatLog2KafkaFromSipInsertBoltDC(tmpFragListFreq);
////                    dcl.dfPzFlowBatchStorage(tmpMap);// production path: writes directly to the central HTTP endpoint; verified working
////                    dbl.dfPzFlowBatchStorage2CH(tmpMap);// test write to ClickHouse, 2019-02-20
////                    dbl.dfPzFlowBatchStorage2CH(tmpFragListFreq);// test write to ClickHouse, 2019-02-20
//                }
            } else {
                String jsonLog = tuple.getString(0);
                String logType = tuple.getString(1);
                switch (logType) {
                    case "origin":
                        if (StringUtil.isNotBlank(jsonLog)) {
                            sipOriJsonList.add(jsonLog);
                            collector.emit(new Values(jsonLog));
                        }
                        break;
//                    case "route":// buffer route-relation data -- deprecated 2019-08-07; moved to the offline Spark cleaning job
//                        if (StringUtil.isNotBlank(jsonLog)) {
//                            routeRelationJsonList.add(jsonLog);
//                        }
//                        break;
                    default:
                        logger.error("SipInsertBoltDC received unknown logType: {" + logType + "}");
                        break;
                }
                if (sipOriJsonList.size() >= RealtimeCountConfig.BATCH_KAFKA_INSERT_NUM) {
                    long time = System.currentTimeMillis() / 1000L;
                    SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd");
                    String partition = sdf.format(new Date(time * 1000L)).replaceAll("-", "");// format: 20190722
                    LinkedList<String> tmpList = new LinkedList<>(sipOriJsonList);
                    sipOriJsonList.clear();
                    hdfsDataLoadAvro.dataSipToHdfsAvro(partition, tmpList, RealtimeCountConfig.KAFKA_SIP_COMPLEMENT_TOPIC, "origin", time);
                }
                // Size-based flush of VoIP route-relation logs -- deprecated 2019-08-07; moved to the offline Spark cleaning job
//                if (routeRelationJsonList.size() >= RealtimeCountConfig.BATCH_KAFKA_INSERT_NUM) {
//                    LinkedList<String> tmpRouteList = new LinkedList<String>();
//                    tmpRouteList.addAll(routeRelationJsonList);
//                    routeRelationJsonList.clear();
////                    dbl.dfPzFlowBatchStorage2CH(tmpRouteList);// test write to ClickHouse, 2019-02-20
//                    kafkaDB.routeRelatLog2KafkaFromSipInsertBoltDC(tmpRouteList);
//                }

            }
        } catch (Exception e) {
            logger.error("SipInsertBoltDC insert failed", e);
        }
    }

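    // Increments a per-key counter in the given map. Currently unreferenced;
    // it appears to belong to the commented-out route-relation counting above.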
    private void logCount(String key, Map<String, Long> hm) {
        if (hm.containsKey(key)) {
            hm.put(key, hm.get(key) + 1);
        } else {
            hm.put(key, 1L);
        }
    }

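    /**
     * Registers this bolt for tick tuples so Storm delivers one every
     * {@code tickFreqSecs} seconds, driving the time-based flush in execute().
     */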
    @Override
    public Map<String, Object> getComponentConfiguration() {
        Map<String, Object> conf = new HashMap<String, Object>();
        conf.put(org.apache.storm.Config.TOPOLOGY_TICK_TUPLE_FREQ_SECS, tickFreqSecs);
        return conf;
    }

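    /** Declares the single output field carrying the re-emitted "origin" JSON log. */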
    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("countJsonLog"));
    }
}
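
/*
 * Usage sketch (hypothetical wiring; the spout id "sipSpout", the
 * SipKafkaSpout class, the topology name, and the parallelism hints are
 * assumptions for illustration, not taken from this repository):
 *
 *   TopologyBuilder builder = new TopologyBuilder();
 *   builder.setSpout("sipSpout", new SipKafkaSpout());
 *   builder.setBolt("sipInsertBoltDC", new SipInsertBoltDC(60), 2)
 *          .shuffleGrouping("sipSpout");
 *   StormSubmitter.submitTopology("sipTopology", new Config(), builder.createTopology());
 */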