diff options
| author | qidaijie <[email protected]> | 2019-10-18 11:40:58 +0800 |
|---|---|---|
| committer | qidaijie <[email protected]> | 2019-10-18 11:40:58 +0800 |
| commit | fac7c80c5977f82ff92061ca27c77ebf4ee08915 (patch) | |
| tree | 7e946f9d8fc3f80697622846eac91efa26c41499 /src/main | |
| parent | 539803d932cf0c928ab0a174de3383b7ab894347 (diff) | |
Diffstat (limited to 'src/main')
3 files changed, 92 insertions, 18 deletions
diff --git a/src/main/java/cn/ac/iie/bolt/TimeCalculateBolt.java b/src/main/java/cn/ac/iie/bolt/TimeCalculateBolt.java index 91155bd..d90c9bc 100644 --- a/src/main/java/cn/ac/iie/bolt/TimeCalculateBolt.java +++ b/src/main/java/cn/ac/iie/bolt/TimeCalculateBolt.java @@ -1,5 +1,6 @@ package cn.ac.iie.bolt; +import cn.ac.iie.bean.ConnectionTime; import cn.ac.iie.common.ConnectionsCountConfig; import cn.ac.iie.utils.TimeUtils; import cn.ac.iie.utils.system.TupleUtils; @@ -15,9 +16,7 @@ import org.apache.storm.tuple.Tuple; import org.apache.storm.tuple.Values; import java.util.*; -import java.util.concurrent.atomic.AtomicBoolean; -import static cn.ac.iie.utils.Statistical.*; import static cn.ac.iie.utils.TimeUtils.getNowTime; /** @@ -42,37 +41,52 @@ public class TimeCalculateBolt extends BaseBasicBolt { private static final long serialVersionUID = -6628202735196670575L; private static Logger logger = Logger.getLogger(TimeCalculateBolt.class); private static Long nowTime; + private static Map<Integer, Long> startMap; + private static Map<Integer, Long> endMap; + private static Map<Integer, Long> intervalMap; + private static List<String> list; @Override public void prepare(Map stormConf, TopologyContext context) { nowTime = getNowTime(); + startMap = new HashMap<>(ConnectionsCountConfig.MAX_NUMBER); + endMap = new HashMap<>(ConnectionsCountConfig.MAX_NUMBER); + intervalMap = new HashMap<>(ConnectionsCountConfig.MAX_NUMBER); + list = new ArrayList<>(); + for (int i = 0; i < ConnectionsCountConfig.MAX_NUMBER; i++) { + startMap.put(i, 0L); + endMap.put(i, 0L); + intervalMap.put(i, 0L); + } } @Override public void execute(Tuple tuple, BasicOutputCollector basicOutputCollector) { try { if (TupleUtils.isTick(tuple)) { - List<String> result = getResult(nowTime); + List<String> result = new ArrayList<>(getResult(nowTime)); + list.clear(); if (result.size() > 0) { - basicOutputCollector.emit(new Values(getResult(nowTime).toString())); - clear(); + basicOutputCollector.emit(new Values(result.toString())); } nowTime = getNowTime(); } else { String message = tuple.getString(0); JSONObject jsonObject = JSON.parseObject(message); - int startTime = jsonObject.getInteger("start_time"); - int endTime = jsonObject.getInteger("end_time"); - if (startTime != 0 && endTime != 0) { - int newConnDifference = TimeUtils.getTimeBetween(nowTime, startTime); - int endConnDifference = TimeUtils.getTimeBetween(nowTime, endTime); + if (message.contains("start_time") && message.contains("end_time")) { + int startTime = jsonObject.getInteger("start_time"); + int endTime = jsonObject.getInteger("end_time"); + if (startTime != 0 && endTime != 0) { + int newConnDifference = TimeUtils.getTimeBetween(nowTime, startTime); + int endConnDifference = TimeUtils.getTimeBetween(nowTime, endTime); // System.out.println(newConnDifference + "\t\t\t" + endConnDifference); - timeIntervalCount(newConnDifference, endConnDifference); + timeIntervalCount(newConnDifference, endConnDifference); + } } } } catch (Exception e) { - logger.error("解析和写入kafka出现异常", e); + logger.error("解析和写入kafka出现异常"); e.printStackTrace(); } } @@ -89,4 +103,68 @@ public class TimeCalculateBolt extends BaseBasicBolt { outputFieldsDeclarer.declare(new Fields("value")); } + /** + * 对时间间隔进行计算,间隔内的数据进行累加等 + * + * @param newConnDifference 新建连接时间差 + * @param endConnDifference 关闭连接时间差 + */ + private static void timeIntervalCount(int newConnDifference, int endConnDifference) { + try { + boolean start = false; + boolean end = false; + if (newConnDifference >= 0 && newConnDifference < ConnectionsCountConfig.MAX_NUMBER) { + startMap.put(newConnDifference, startMap.get(newConnDifference) + 1); + start = true; + } + + if (endConnDifference >= 0 && endConnDifference < ConnectionsCountConfig.MAX_NUMBER) { + endMap.put(endConnDifference, endMap.get(endConnDifference) + 1); + end = true; + } + + if (start && end) { + if (newConnDifference == 0 && endConnDifference == 0) { + intervalMap.put(0, intervalMap.get(0) + 1); + } else if (newConnDifference - endConnDifference > 0) { + for (int i = endConnDifference; i <= newConnDifference; i++) { + intervalMap.put(i, intervalMap.get(i) + 1); + } + } + } else if (!start && end) { + for (int i = ConnectionsCountConfig.MAX_NUMBER - 1; i >= endConnDifference; i--) { + intervalMap.put(i, intervalMap.get(i) + 1); + } + } + + } catch (Exception e) { + logger.error("错误的开始和结束" + newConnDifference + endConnDifference); + } + } + + /** + * 获取结果集传入下一bolt + * + * @return 统计结果集 + */ + private static List<String> getResult(Long nowTime) { + ConnectionTime connectionTime = new ConnectionTime(); + for (int key : startMap.keySet()) { + if (startMap.get(key) == 0 && intervalMap.get(key) == 0 && endMap.get(key) == 0) { +// logger.info("结果均为0不予以入库"); + } else { + connectionTime.setNew_conn_num(startMap.get(key)); + connectionTime.setLive_conn_num(intervalMap.get(key)); + connectionTime.setClose_conn_num(endMap.get(key)); + connectionTime.setStat_time(nowTime - 30 * key); + list.add(JSONObject.toJSONString(connectionTime)); + startMap.put(key, 0L); + endMap.put(key, 0L); + intervalMap.put(key, 0L); + } + } + return list; + } + + } diff --git a/src/main/java/cn/ac/iie/common/ConnectionsCountConfig.java b/src/main/java/cn/ac/iie/common/ConnectionsCountConfig.java index b0be497..54f8635 100644 --- a/src/main/java/cn/ac/iie/common/ConnectionsCountConfig.java +++ b/src/main/java/cn/ac/iie/common/ConnectionsCountConfig.java @@ -13,7 +13,6 @@ public class ConnectionsCountConfig implements Serializable { public static final String LOG_STRING_SPLITTER = ","; public static final String EMPTY_OPTION_CHARACTER = "-"; - public static final Integer MAX_NUMBER = ConnectionsCountConfigurations.getIntProperty(1, "max.number"); public static final Integer SPOUT_PARALLELISM = ConnectionsCountConfigurations.getIntProperty(1, "topology.spout.parallelism"); public static final String RESULT_SOUTPUT_TOPIC = ConnectionsCountConfigurations.getStringProperty(1, "results.output.topics"); diff --git a/src/main/java/cn/ac/iie/topology/LogConnectionsTopology.java b/src/main/java/cn/ac/iie/topology/LogConnectionsTopology.java index 3d1b9d7..0e847cf 100644 --- a/src/main/java/cn/ac/iie/topology/LogConnectionsTopology.java +++ b/src/main/java/cn/ac/iie/topology/LogConnectionsTopology.java @@ -3,7 +3,6 @@ package cn.ac.iie.topology; import cn.ac.iie.bolt.InsertBolt; import cn.ac.iie.bolt.TimeCalculateBolt; -import cn.ac.iie.bolt.TimeCalculateTestBolt; import cn.ac.iie.common.ConnectionsCountConfig; import cn.ac.iie.spout.CustomizedKafkaSpout; import org.apache.log4j.Logger; @@ -56,10 +55,8 @@ public class LogConnectionsTopology { private void buildTopology() { builder = new TopologyBuilder(); builder.setSpout("CustomizedKafkaSpout", new CustomizedKafkaSpout(), ConnectionsCountConfig.SPOUT_PARALLELISM); -// builder.setBolt("TimeCalculateBolt", new TimeCalculateBolt(), ConnectionsCountConfig.TIME_CALCULATE_BOLT_PARALLELISM).localOrShuffleGrouping("CustomizedKafkaSpout"); -// builder.setBolt("InsertBolt", new InsertBolt(), 1).localOrShuffleGrouping("TimeCalculateBolt"); - builder.setBolt("TimeCalculateTestBolt", new TimeCalculateTestBolt(), ConnectionsCountConfig.TIME_CALCULATE_BOLT_PARALLELISM).localOrShuffleGrouping("CustomizedKafkaSpout"); - builder.setBolt("InsertBolt", new InsertBolt(), 1).localOrShuffleGrouping("TimeCalculateTestBolt"); + builder.setBolt("TimeCalculateBolt", new TimeCalculateBolt(), ConnectionsCountConfig.TIME_CALCULATE_BOLT_PARALLELISM).localOrShuffleGrouping("CustomizedKafkaSpout"); + builder.setBolt("InsertBolt", new InsertBolt(), 1).localOrShuffleGrouping("TimeCalculateBolt"); } |
