summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
Diffstat (limited to 'src')
-rw-r--r--src/main/java/cn/ac/iie/bolt/TimeCalculateBolt.java102
-rw-r--r--src/main/java/cn/ac/iie/common/ConnectionsCountConfig.java1
-rw-r--r--src/main/java/cn/ac/iie/topology/LogConnectionsTopology.java7
-rw-r--r--src/test/java/cn/ac/iie/test/JsonTest.java27
-rw-r--r--src/test/java/cn/ac/iie/test/KafkaTest.java2
5 files changed, 120 insertions, 19 deletions
diff --git a/src/main/java/cn/ac/iie/bolt/TimeCalculateBolt.java b/src/main/java/cn/ac/iie/bolt/TimeCalculateBolt.java
index 91155bd..d90c9bc 100644
--- a/src/main/java/cn/ac/iie/bolt/TimeCalculateBolt.java
+++ b/src/main/java/cn/ac/iie/bolt/TimeCalculateBolt.java
@@ -1,5 +1,6 @@
package cn.ac.iie.bolt;
+import cn.ac.iie.bean.ConnectionTime;
import cn.ac.iie.common.ConnectionsCountConfig;
import cn.ac.iie.utils.TimeUtils;
import cn.ac.iie.utils.system.TupleUtils;
@@ -15,9 +16,7 @@ import org.apache.storm.tuple.Tuple;
import org.apache.storm.tuple.Values;
import java.util.*;
-import java.util.concurrent.atomic.AtomicBoolean;
-import static cn.ac.iie.utils.Statistical.*;
import static cn.ac.iie.utils.TimeUtils.getNowTime;
/**
@@ -42,37 +41,52 @@ public class TimeCalculateBolt extends BaseBasicBolt {
private static final long serialVersionUID = -6628202735196670575L;
private static Logger logger = Logger.getLogger(TimeCalculateBolt.class);
private static Long nowTime;
+ private static Map<Integer, Long> startMap;
+ private static Map<Integer, Long> endMap;
+ private static Map<Integer, Long> intervalMap;
+ private static List<String> list;
@Override
public void prepare(Map stormConf, TopologyContext context) {
nowTime = getNowTime();
+ startMap = new HashMap<>(ConnectionsCountConfig.MAX_NUMBER);
+ endMap = new HashMap<>(ConnectionsCountConfig.MAX_NUMBER);
+ intervalMap = new HashMap<>(ConnectionsCountConfig.MAX_NUMBER);
+ list = new ArrayList<>();
+ for (int i = 0; i < ConnectionsCountConfig.MAX_NUMBER; i++) {
+ startMap.put(i, 0L);
+ endMap.put(i, 0L);
+ intervalMap.put(i, 0L);
+ }
}
@Override
public void execute(Tuple tuple, BasicOutputCollector basicOutputCollector) {
try {
if (TupleUtils.isTick(tuple)) {
- List<String> result = getResult(nowTime);
+ List<String> result = new ArrayList<>(getResult(nowTime));
+ list.clear();
if (result.size() > 0) {
- basicOutputCollector.emit(new Values(getResult(nowTime).toString()));
- clear();
+ basicOutputCollector.emit(new Values(result.toString()));
}
nowTime = getNowTime();
} else {
String message = tuple.getString(0);
JSONObject jsonObject = JSON.parseObject(message);
- int startTime = jsonObject.getInteger("start_time");
- int endTime = jsonObject.getInteger("end_time");
- if (startTime != 0 && endTime != 0) {
- int newConnDifference = TimeUtils.getTimeBetween(nowTime, startTime);
- int endConnDifference = TimeUtils.getTimeBetween(nowTime, endTime);
+ if (message.contains("start_time") && message.contains("end_time")) {
+ int startTime = jsonObject.getInteger("start_time");
+ int endTime = jsonObject.getInteger("end_time");
+ if (startTime != 0 && endTime != 0) {
+ int newConnDifference = TimeUtils.getTimeBetween(nowTime, startTime);
+ int endConnDifference = TimeUtils.getTimeBetween(nowTime, endTime);
// System.out.println(newConnDifference + "\t\t\t" + endConnDifference);
- timeIntervalCount(newConnDifference, endConnDifference);
+ timeIntervalCount(newConnDifference, endConnDifference);
+ }
}
}
} catch (Exception e) {
- logger.error("解析和写入kafka出现异常", e);
+ logger.error("解析和写入kafka出现异常");
e.printStackTrace();
}
}
@@ -89,4 +103,68 @@ public class TimeCalculateBolt extends BaseBasicBolt {
outputFieldsDeclarer.declare(new Fields("value"));
}
+ /**
+ * 对时间间隔进行计算,间隔内的数据进行累加等
+ *
+ * @param newConnDifference 新建连接时间差
+ * @param endConnDifference 关闭连接时间差
+ */
+ private static void timeIntervalCount(int newConnDifference, int endConnDifference) {
+ try {
+ boolean start = false;
+ boolean end = false;
+ if (newConnDifference >= 0 && newConnDifference < ConnectionsCountConfig.MAX_NUMBER) {
+ startMap.put(newConnDifference, startMap.get(newConnDifference) + 1);
+ start = true;
+ }
+
+ if (endConnDifference >= 0 && endConnDifference < ConnectionsCountConfig.MAX_NUMBER) {
+ endMap.put(endConnDifference, endMap.get(endConnDifference) + 1);
+ end = true;
+ }
+
+ if (start && end) {
+ if (newConnDifference == 0 && endConnDifference == 0) {
+ intervalMap.put(0, intervalMap.get(0) + 1);
+ } else if (newConnDifference - endConnDifference > 0) {
+ for (int i = endConnDifference; i <= newConnDifference; i++) {
+ intervalMap.put(i, intervalMap.get(i) + 1);
+ }
+ }
+ } else if (!start && end) {
+ for (int i = ConnectionsCountConfig.MAX_NUMBER - 1; i >= endConnDifference; i--) {
+ intervalMap.put(i, intervalMap.get(i) + 1);
+ }
+ }
+
+ } catch (Exception e) {
+ logger.error("错误的开始和结束" + newConnDifference + endConnDifference);
+ }
+ }
+
+ /**
+ * 获取结果集传入下一bolt
+ *
+ * @return 统计结果集
+ */
+ private static List<String> getResult(Long nowTime) {
+ ConnectionTime connectionTime = new ConnectionTime();
+ for (int key : startMap.keySet()) {
+ if (startMap.get(key) == 0 && intervalMap.get(key) == 0 && endMap.get(key) == 0) {
+// logger.info("结果均为0不予以入库");
+ } else {
+ connectionTime.setNew_conn_num(startMap.get(key));
+ connectionTime.setLive_conn_num(intervalMap.get(key));
+ connectionTime.setClose_conn_num(endMap.get(key));
+ connectionTime.setStat_time(nowTime - 30 * key);
+ list.add(JSONObject.toJSONString(connectionTime));
+ startMap.put(key, 0L);
+ endMap.put(key, 0L);
+ intervalMap.put(key, 0L);
+ }
+ }
+ return list;
+ }
+
+
}
diff --git a/src/main/java/cn/ac/iie/common/ConnectionsCountConfig.java b/src/main/java/cn/ac/iie/common/ConnectionsCountConfig.java
index b0be497..54f8635 100644
--- a/src/main/java/cn/ac/iie/common/ConnectionsCountConfig.java
+++ b/src/main/java/cn/ac/iie/common/ConnectionsCountConfig.java
@@ -13,7 +13,6 @@ public class ConnectionsCountConfig implements Serializable {
public static final String LOG_STRING_SPLITTER = ",";
public static final String EMPTY_OPTION_CHARACTER = "-";
-
public static final Integer MAX_NUMBER = ConnectionsCountConfigurations.getIntProperty(1, "max.number");
public static final Integer SPOUT_PARALLELISM = ConnectionsCountConfigurations.getIntProperty(1, "topology.spout.parallelism");
public static final String RESULT_SOUTPUT_TOPIC = ConnectionsCountConfigurations.getStringProperty(1, "results.output.topics");
diff --git a/src/main/java/cn/ac/iie/topology/LogConnectionsTopology.java b/src/main/java/cn/ac/iie/topology/LogConnectionsTopology.java
index 3d1b9d7..0e847cf 100644
--- a/src/main/java/cn/ac/iie/topology/LogConnectionsTopology.java
+++ b/src/main/java/cn/ac/iie/topology/LogConnectionsTopology.java
@@ -3,7 +3,6 @@ package cn.ac.iie.topology;
import cn.ac.iie.bolt.InsertBolt;
import cn.ac.iie.bolt.TimeCalculateBolt;
-import cn.ac.iie.bolt.TimeCalculateTestBolt;
import cn.ac.iie.common.ConnectionsCountConfig;
import cn.ac.iie.spout.CustomizedKafkaSpout;
import org.apache.log4j.Logger;
@@ -56,10 +55,8 @@ public class LogConnectionsTopology {
private void buildTopology() {
builder = new TopologyBuilder();
builder.setSpout("CustomizedKafkaSpout", new CustomizedKafkaSpout(), ConnectionsCountConfig.SPOUT_PARALLELISM);
-// builder.setBolt("TimeCalculateBolt", new TimeCalculateBolt(), ConnectionsCountConfig.TIME_CALCULATE_BOLT_PARALLELISM).localOrShuffleGrouping("CustomizedKafkaSpout");
-// builder.setBolt("InsertBolt", new InsertBolt(), 1).localOrShuffleGrouping("TimeCalculateBolt");
- builder.setBolt("TimeCalculateTestBolt", new TimeCalculateTestBolt(), ConnectionsCountConfig.TIME_CALCULATE_BOLT_PARALLELISM).localOrShuffleGrouping("CustomizedKafkaSpout");
- builder.setBolt("InsertBolt", new InsertBolt(), 1).localOrShuffleGrouping("TimeCalculateTestBolt");
+ builder.setBolt("TimeCalculateBolt", new TimeCalculateBolt(), ConnectionsCountConfig.TIME_CALCULATE_BOLT_PARALLELISM).localOrShuffleGrouping("CustomizedKafkaSpout");
+ builder.setBolt("InsertBolt", new InsertBolt(), 1).localOrShuffleGrouping("TimeCalculateBolt");
}
diff --git a/src/test/java/cn/ac/iie/test/JsonTest.java b/src/test/java/cn/ac/iie/test/JsonTest.java
new file mode 100644
index 0000000..e19e97a
--- /dev/null
+++ b/src/test/java/cn/ac/iie/test/JsonTest.java
@@ -0,0 +1,27 @@
+package cn.ac.iie.test;
+
+import com.alibaba.fastjson.JSON;
+import com.alibaba.fastjson.JSONObject;
+import org.junit.Test;
+
+import java.util.BitSet;
+
+public class JsonTest {
+
+ @Test
+ public void nullKeyTest() {
+ try {
+ String message = "{\"stream_trace_id\":\"11570861088.164491556\",\"policy_id\":-1,\"action\":0,\"service\":0,\"start_time\":1570861088,\"addr_type\":4,\"client_ip\":\"92.47.150.222\",\"server_ip\":\"203.205.142.154\",\"client_port\":59253,\"server_port\":80,\"trans_proto\":\"IPv4_TCP\",\"entrance_id\":0,\"device_id\":0,\"link_id\":0,\"isp\":\"\",\"encap_type\":0,\"pinningst\":0,\"intercept_state\":0,\"ssl_server_side_latency\":0,\"ssl_client_side_latency\":0,\"ssl_server_side_version\":\"\",\"ssl_client_side_version\":\"\",\"direction\":0,\"stream_dir\":3,\"cap_ip\":\"10.4.39.21\",\"addr_list\":\"\",\"c2s_pkt_num\":4,\"s2c_pkt_num\":2,\"c2s_byte_num\":0,\"s2c_byte_num\":0,\"has_dup_traffic\":0,\"stream_error\":\"e_no_data\"}";
+ JSONObject jsonObject = JSON.parseObject(message);
+ int startTime = 0;
+ int endTime = 0;
+ if (message.contains("start_time") && message.contains("end_time")) {
+ startTime = jsonObject.getInteger("start_time");
+ endTime = jsonObject.getInteger("end_time");
+ }
+ System.out.println("starTime:" + startTime + "\tendTime:" + endTime);
+ } catch (NullPointerException e) {
+ e.printStackTrace();
+ }
+ }
+}
diff --git a/src/test/java/cn/ac/iie/test/KafkaTest.java b/src/test/java/cn/ac/iie/test/KafkaTest.java
index ec74e85..00df19c 100644
--- a/src/test/java/cn/ac/iie/test/KafkaTest.java
+++ b/src/test/java/cn/ac/iie/test/KafkaTest.java
@@ -31,7 +31,7 @@ public class KafkaTest {
urlsBean.setC2s_pkt_num(1000);
urlsBean.setS2c_byte_num(1000);
urlsBean.setS2c_pkt_num(1000);
- kafkaProducer.send(new ProducerRecord<>("test", JSONObject.toJSONString(urlsBean)), new Callback() {
+ kafkaProducer.send(new ProducerRecord<>("JsonTest", JSONObject.toJSONString(urlsBean)), new Callback() {
@Override
public void onCompletion(RecordMetadata recordMetadata, Exception e) {
if (e != null){