diff options
Diffstat (limited to 'src')
5 files changed, 120 insertions, 19 deletions
diff --git a/src/main/java/cn/ac/iie/bolt/TimeCalculateBolt.java b/src/main/java/cn/ac/iie/bolt/TimeCalculateBolt.java index 91155bd..d90c9bc 100644 --- a/src/main/java/cn/ac/iie/bolt/TimeCalculateBolt.java +++ b/src/main/java/cn/ac/iie/bolt/TimeCalculateBolt.java @@ -1,5 +1,6 @@ package cn.ac.iie.bolt; +import cn.ac.iie.bean.ConnectionTime; import cn.ac.iie.common.ConnectionsCountConfig; import cn.ac.iie.utils.TimeUtils; import cn.ac.iie.utils.system.TupleUtils; @@ -15,9 +16,7 @@ import org.apache.storm.tuple.Tuple; import org.apache.storm.tuple.Values; import java.util.*; -import java.util.concurrent.atomic.AtomicBoolean; -import static cn.ac.iie.utils.Statistical.*; import static cn.ac.iie.utils.TimeUtils.getNowTime; /** @@ -42,37 +41,52 @@ public class TimeCalculateBolt extends BaseBasicBolt { private static final long serialVersionUID = -6628202735196670575L; private static Logger logger = Logger.getLogger(TimeCalculateBolt.class); private static Long nowTime; + private static Map<Integer, Long> startMap; + private static Map<Integer, Long> endMap; + private static Map<Integer, Long> intervalMap; + private static List<String> list; @Override public void prepare(Map stormConf, TopologyContext context) { nowTime = getNowTime(); + startMap = new HashMap<>(ConnectionsCountConfig.MAX_NUMBER); + endMap = new HashMap<>(ConnectionsCountConfig.MAX_NUMBER); + intervalMap = new HashMap<>(ConnectionsCountConfig.MAX_NUMBER); + list = new ArrayList<>(); + for (int i = 0; i < ConnectionsCountConfig.MAX_NUMBER; i++) { + startMap.put(i, 0L); + endMap.put(i, 0L); + intervalMap.put(i, 0L); + } } @Override public void execute(Tuple tuple, BasicOutputCollector basicOutputCollector) { try { if (TupleUtils.isTick(tuple)) { - List<String> result = getResult(nowTime); + List<String> result = new ArrayList<>(getResult(nowTime)); + list.clear(); if (result.size() > 0) { - basicOutputCollector.emit(new Values(getResult(nowTime).toString())); - clear(); + basicOutputCollector.emit(new Values(result.toString())); } nowTime = getNowTime(); } else { String message = tuple.getString(0); JSONObject jsonObject = JSON.parseObject(message); - int startTime = jsonObject.getInteger("start_time"); - int endTime = jsonObject.getInteger("end_time"); - if (startTime != 0 && endTime != 0) { - int newConnDifference = TimeUtils.getTimeBetween(nowTime, startTime); - int endConnDifference = TimeUtils.getTimeBetween(nowTime, endTime); + if (message.contains("start_time") && message.contains("end_time")) { + int startTime = jsonObject.getInteger("start_time"); + int endTime = jsonObject.getInteger("end_time"); + if (startTime != 0 && endTime != 0) { + int newConnDifference = TimeUtils.getTimeBetween(nowTime, startTime); + int endConnDifference = TimeUtils.getTimeBetween(nowTime, endTime); // System.out.println(newConnDifference + "\t\t\t" + endConnDifference); - timeIntervalCount(newConnDifference, endConnDifference); + timeIntervalCount(newConnDifference, endConnDifference); + } } } } catch (Exception e) { - logger.error("解析和写入kafka出现异常", e); + logger.error("解析和写入kafka出现异常"); e.printStackTrace(); } } @@ -89,4 +103,68 @@ public class TimeCalculateBolt extends BaseBasicBolt { outputFieldsDeclarer.declare(new Fields("value")); } + /** + * 对时间间隔进行计算,间隔内的数据进行累加等 + * + * @param newConnDifference 新建连接时间差 + * @param endConnDifference 关闭连接时间差 + */ + private static void timeIntervalCount(int newConnDifference, int endConnDifference) { + try { + boolean start = false; + boolean end = false; + if (newConnDifference >= 0 && newConnDifference < ConnectionsCountConfig.MAX_NUMBER) { + startMap.put(newConnDifference, startMap.get(newConnDifference) + 1); + start = true; + } + + if (endConnDifference >= 0 && endConnDifference < ConnectionsCountConfig.MAX_NUMBER) { + endMap.put(endConnDifference, endMap.get(endConnDifference) + 1); + end = true; + } + + if (start && end) { + if (newConnDifference == 0 && endConnDifference == 0) { + intervalMap.put(0, intervalMap.get(0) + 1); + } else if (newConnDifference - endConnDifference > 0) { + for (int i = endConnDifference; i <= newConnDifference; i++) { + intervalMap.put(i, intervalMap.get(i) + 1); + } + } + } else if (!start && end) { + for (int i = ConnectionsCountConfig.MAX_NUMBER - 1; i >= endConnDifference; i--) { + intervalMap.put(i, intervalMap.get(i) + 1); + } + } + + } catch (Exception e) { + logger.error("错误的开始和结束" + newConnDifference + endConnDifference); + } + } + + /** + * 获取结果集传入下一bolt + * + * @return 统计结果集 + */ + private static List<String> getResult(Long nowTime) { + ConnectionTime connectionTime = new ConnectionTime(); + for (int key : startMap.keySet()) { + if (startMap.get(key) == 0 && intervalMap.get(key) == 0 && endMap.get(key) == 0) { +// logger.info("结果均为0不予以入库"); + } else { + connectionTime.setNew_conn_num(startMap.get(key)); + connectionTime.setLive_conn_num(intervalMap.get(key)); + connectionTime.setClose_conn_num(endMap.get(key)); + connectionTime.setStat_time(nowTime - 30 * key); + list.add(JSONObject.toJSONString(connectionTime)); + startMap.put(key, 0L); + endMap.put(key, 0L); + intervalMap.put(key, 0L); + } + } + return list; + } + + } diff --git a/src/main/java/cn/ac/iie/common/ConnectionsCountConfig.java b/src/main/java/cn/ac/iie/common/ConnectionsCountConfig.java index b0be497..54f8635 100644 --- a/src/main/java/cn/ac/iie/common/ConnectionsCountConfig.java +++ b/src/main/java/cn/ac/iie/common/ConnectionsCountConfig.java @@ -13,7 +13,6 @@ public class ConnectionsCountConfig implements Serializable { public static final String LOG_STRING_SPLITTER = ","; public static final String EMPTY_OPTION_CHARACTER = "-"; - public static final Integer MAX_NUMBER = ConnectionsCountConfigurations.getIntProperty(1, "max.number"); public static final Integer SPOUT_PARALLELISM = ConnectionsCountConfigurations.getIntProperty(1, "topology.spout.parallelism"); public static final String RESULT_SOUTPUT_TOPIC = ConnectionsCountConfigurations.getStringProperty(1, "results.output.topics"); diff --git a/src/main/java/cn/ac/iie/topology/LogConnectionsTopology.java b/src/main/java/cn/ac/iie/topology/LogConnectionsTopology.java index 3d1b9d7..0e847cf 100644 --- a/src/main/java/cn/ac/iie/topology/LogConnectionsTopology.java +++ b/src/main/java/cn/ac/iie/topology/LogConnectionsTopology.java @@ -3,7 +3,6 @@ package cn.ac.iie.topology; import cn.ac.iie.bolt.InsertBolt; import cn.ac.iie.bolt.TimeCalculateBolt; -import cn.ac.iie.bolt.TimeCalculateTestBolt; import cn.ac.iie.common.ConnectionsCountConfig; import cn.ac.iie.spout.CustomizedKafkaSpout; import org.apache.log4j.Logger; @@ -56,10 +55,8 @@ public class LogConnectionsTopology { private void buildTopology() { builder = new TopologyBuilder(); builder.setSpout("CustomizedKafkaSpout", new CustomizedKafkaSpout(), ConnectionsCountConfig.SPOUT_PARALLELISM); -// builder.setBolt("TimeCalculateBolt", new TimeCalculateBolt(), ConnectionsCountConfig.TIME_CALCULATE_BOLT_PARALLELISM).localOrShuffleGrouping("CustomizedKafkaSpout"); -// builder.setBolt("InsertBolt", new InsertBolt(), 1).localOrShuffleGrouping("TimeCalculateBolt"); - builder.setBolt("TimeCalculateTestBolt", new TimeCalculateTestBolt(), ConnectionsCountConfig.TIME_CALCULATE_BOLT_PARALLELISM).localOrShuffleGrouping("CustomizedKafkaSpout"); - builder.setBolt("InsertBolt", new InsertBolt(), 1).localOrShuffleGrouping("TimeCalculateTestBolt"); + builder.setBolt("TimeCalculateBolt", new TimeCalculateBolt(), ConnectionsCountConfig.TIME_CALCULATE_BOLT_PARALLELISM).localOrShuffleGrouping("CustomizedKafkaSpout"); + builder.setBolt("InsertBolt", new InsertBolt(), 1).localOrShuffleGrouping("TimeCalculateBolt"); } diff --git a/src/test/java/cn/ac/iie/test/JsonTest.java b/src/test/java/cn/ac/iie/test/JsonTest.java new file mode 100644 index 0000000..e19e97a --- /dev/null +++ b/src/test/java/cn/ac/iie/test/JsonTest.java @@ -0,0 +1,27 @@ +package cn.ac.iie.test; + +import com.alibaba.fastjson.JSON; +import com.alibaba.fastjson.JSONObject; +import org.junit.Test; + +import java.util.BitSet; + +public class JsonTest { + + @Test + public void nullKeyTest() { + try { + String message = "{\"stream_trace_id\":\"11570861088.164491556\",\"policy_id\":-1,\"action\":0,\"service\":0,\"start_time\":1570861088,\"addr_type\":4,\"client_ip\":\"92.47.150.222\",\"server_ip\":\"203.205.142.154\",\"client_port\":59253,\"server_port\":80,\"trans_proto\":\"IPv4_TCP\",\"entrance_id\":0,\"device_id\":0,\"link_id\":0,\"isp\":\"\",\"encap_type\":0,\"pinningst\":0,\"intercept_state\":0,\"ssl_server_side_latency\":0,\"ssl_client_side_latency\":0,\"ssl_server_side_version\":\"\",\"ssl_client_side_version\":\"\",\"direction\":0,\"stream_dir\":3,\"cap_ip\":\"10.4.39.21\",\"addr_list\":\"\",\"c2s_pkt_num\":4,\"s2c_pkt_num\":2,\"c2s_byte_num\":0,\"s2c_byte_num\":0,\"has_dup_traffic\":0,\"stream_error\":\"e_no_data\"}"; + JSONObject jsonObject = JSON.parseObject(message); + int startTime = 0; + int endTime = 0; + if (message.contains("start_time") && message.contains("end_time")) { + startTime = jsonObject.getInteger("start_time"); + endTime = jsonObject.getInteger("end_time"); + } + System.out.println("starTime:" + startTime + "\tendTime:" + endTime); + } catch (NullPointerException e) { + e.printStackTrace(); + } + } +} diff --git a/src/test/java/cn/ac/iie/test/KafkaTest.java b/src/test/java/cn/ac/iie/test/KafkaTest.java index ec74e85..00df19c 100644 --- a/src/test/java/cn/ac/iie/test/KafkaTest.java +++ b/src/test/java/cn/ac/iie/test/KafkaTest.java @@ -31,7 +31,7 @@ public class KafkaTest { urlsBean.setC2s_pkt_num(1000); urlsBean.setS2c_byte_num(1000); urlsBean.setS2c_pkt_num(1000); - kafkaProducer.send(new ProducerRecord<>("test", JSONObject.toJSONString(urlsBean)), new Callback() { + kafkaProducer.send(new ProducerRecord<>("JsonTest", JSONObject.toJSONString(urlsBean)), new Callback() { @Override public void onCompletion(RecordMetadata recordMetadata, Exception e) { if (e != null){ |
