summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorlee <[email protected]>2020-06-15 15:08:22 +0800
committerlee <[email protected]>2020-06-15 15:08:22 +0800
commit4a8718a458c70d7ea2c087c0248456ae1048f796 (patch)
tree7866629673f501c0c7b6d461bf1b13d2c0c15d98
parentcbf8c0ca39764a68726f710c75f45c7623fc641a (diff)
使用定时器实现二次聚合HEADmaster
-rw-r--r--pom.xml2
-rw-r--r--properties/service_flow_config.properties8
-rw-r--r--src/main/java/cn/ac/iie/origion/bean/KeyBean.java14
-rw-r--r--src/main/java/cn/ac/iie/origion/bolt/AggregateBolt.java52
-rw-r--r--src/main/java/cn/ac/iie/origion/bolt/MyWindowBolt.java26
-rw-r--r--src/main/java/cn/ac/iie/origion/bolt/NtcLogSendBolt.java76
-rw-r--r--src/main/java/cn/ac/iie/origion/topology/LogFlowWriteTopology.java19
-rw-r--r--src/main/java/cn/ac/iie/origion/utils/AggregateUtil.java1
8 files changed, 93 insertions, 105 deletions
diff --git a/pom.xml b/pom.xml
index 82aca69..b318499 100644
--- a/pom.xml
+++ b/pom.xml
@@ -95,7 +95,7 @@
<groupId>org.apache.storm</groupId>
<artifactId>storm-core</artifactId>
<version>1.0.2</version>
- <scope>provided</scope>
+ <!--<scope>provided</scope>-->
</dependency>
<dependency>
diff --git a/properties/service_flow_config.properties b/properties/service_flow_config.properties
index b3eb718..aadcabf 100644
--- a/properties/service_flow_config.properties
+++ b/properties/service_flow_config.properties
@@ -1,9 +1,9 @@
#管理kafka地址
#bootstrap.servers=192.168.40.119:9092,192.168.40.122:9092,192.168.40.123:9092
-bootstrap.servers=192.168.40.127:9093
+bootstrap.servers=192.168.40.207:9092
#zookeeper 地址
-zookeeper.servers=192.168.40.127:2182/kafka-test
+zookeeper.servers=192.168.40.207:2181
#zookeeper.servers=192.168.40.119:2181,192.168.40.122:2181,192.168.40.123:2181
#hbase zookeeper地址
@@ -21,11 +21,11 @@ kafka.compression.type=none
#kafka broker下的topic名称
#kafka.topic=SECURITY-EVENT-LOG
-kafka.topic=test528
+kafka.topic=test615
#kafka.topic=CONNECTION-RECORD-LOG
#读取topic,存储该spout id的消费offset信息,可通过该拓扑命名;具体存储offset的位置,确定下次读取不重复的数据;
-group.id=lxk-200512
+group.id=lxk615
#输出topic
results.output.topic=agg_test
diff --git a/src/main/java/cn/ac/iie/origion/bean/KeyBean.java b/src/main/java/cn/ac/iie/origion/bean/KeyBean.java
index 00e9e3c..6898558 100644
--- a/src/main/java/cn/ac/iie/origion/bean/KeyBean.java
+++ b/src/main/java/cn/ac/iie/origion/bean/KeyBean.java
@@ -8,7 +8,6 @@ package cn.ac.iie.origion.bean;
**/
public class KeyBean {
- private long common_recv_time;
private int common_policy_id;
private int common_action;
private String common_sub_action;
@@ -24,14 +23,6 @@ public class KeyBean {
private String http_domain;
private String ssl_sni;
- public long getCommon_recv_time() {
- return common_recv_time;
- }
-
- public void setCommon_recv_time(long common_recv_time) {
- this.common_recv_time = common_recv_time;
- }
-
public int getCommon_policy_id() {
return common_policy_id;
}
@@ -147,7 +138,7 @@ public class KeyBean {
@Override
public int hashCode() {
- return ("" + getCommon_recv_time() + getCommon_policy_id() + getCommon_action() + getCommon_sub_action() + getCommon_client_ip() + getCommon_client_location() + getCommon_sled_ip() + getCommon_device_id() + getCommon_subscriber_id() + getCommon_server_ip() + getCommon_server_location() + getCommon_server_port() + getCommon_l4_protocol() + getHttp_domain() + getSsl_sni()).hashCode();
+ return ("" + getCommon_policy_id() + getCommon_action() + getCommon_sub_action() + getCommon_client_ip() + getCommon_client_location() + getCommon_sled_ip() + getCommon_device_id() + getCommon_subscriber_id() + getCommon_server_ip() + getCommon_server_location() + getCommon_server_port() + getCommon_l4_protocol() + getHttp_domain() + getSsl_sni()).hashCode();
}
@@ -155,8 +146,7 @@ public class KeyBean {
public boolean equals(Object o) {
if (o instanceof KeyBean) {
KeyBean keyBean = (KeyBean) o;
- return (this.getCommon_recv_time()==(keyBean.getCommon_recv_time()) &&
- this.getCommon_policy_id()==(keyBean.getCommon_policy_id()) &&
+ return (this.getCommon_policy_id()==(keyBean.getCommon_policy_id()) &&
this.getCommon_action()==(keyBean.getCommon_action()) &&
this.getCommon_sub_action().equals(keyBean.getCommon_sub_action()) &&
this.getCommon_client_ip().equals(keyBean.getCommon_client_ip()) &&
diff --git a/src/main/java/cn/ac/iie/origion/bolt/AggregateBolt.java b/src/main/java/cn/ac/iie/origion/bolt/AggregateBolt.java
index a656606..d7f5eb7 100644
--- a/src/main/java/cn/ac/iie/origion/bolt/AggregateBolt.java
+++ b/src/main/java/cn/ac/iie/origion/bolt/AggregateBolt.java
@@ -5,8 +5,8 @@ import cn.ac.iie.origion.bean.ValueBean;
import cn.ac.iie.origion.utils.FlowWriteConfig;
import cn.ac.iie.origion.utils.TupleUtils;
-import com.alibaba.fastjson.JSON;
import org.apache.log4j.Logger;
+import org.apache.storm.Config;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.BasicOutputCollector;
import org.apache.storm.topology.OutputFieldsDeclarer;
@@ -23,38 +23,37 @@ public class AggregateBolt extends BaseBasicBolt {
private final static Logger logger = Logger.getLogger(AggregateBolt.class);
private static final long serialVersionUID = 9006119186526123734L;
- private static HashMap<String, Integer> map = new HashMap();
- private static ValueBean valueBean = new ValueBean();
- private static ValueBean tupleValueBean = new ValueBean();
- private static String key = "";
- private static Integer value = 0;
- private static String message = "";
+ private HashMap<String, ValueBean> map;
+ private String key = "";
+ private ValueBean value;
+ private ValueBean mapValue;
@Override
public void prepare(Map stormConf, TopologyContext context) {
- System.out.println("prepare方法执行了++++++++++++++++++++++++++");
-
- logger.error("prepare方法执行了++++++++++++++++++++++++++");
+ map = new HashMap<>(16);
+ key = "";
+ value = new ValueBean();
+ mapValue = new ValueBean();
}
@Override
public void execute(Tuple tuple, BasicOutputCollector basicOutputCollector) {
-
- System.out.println("执行了一次====================================" + value + "==========================" + System.currentTimeMillis());
try {
if (TupleUtils.isTick(tuple)) {
- System.out.println(this.map);
- //批量发送到下一个bolt
- basicOutputCollector.emit(new Values(JSON.toJSONString(this.map)));
-
-
+ //TODO 发送到kafka的 bolt
+ for (String key : map.keySet()) {
+ basicOutputCollector.emit(new Values(key,map.get(key)));
+ }
+ map.clear();
} else {
- message = tuple.getString(0);
//TODO 获取一条tuple数据的key和value
-
+ key = tuple.getString(0);
+ value = (ValueBean) tuple.getValue(1);
//TODO 两个count聚合后放入HashMap中,利用HashMap的去重功能实现value的覆盖
- this.map.put("192", addValueBean(value, 1));
+ mapValue = map.getOrDefault(key, new ValueBean());
+ mapValue = addValueBean(mapValue, value);
+ map.put(key, mapValue);
}
} catch (Exception e) {
logger.error(FlowWriteConfig.KAFKA_TOPIC + "接收/解析过程出现异常");
@@ -65,14 +64,15 @@ public class AggregateBolt extends BaseBasicBolt {
@Override
public Map<String, Object> getComponentConfiguration() {
Map<String, Object> conf = new HashMap<String, Object>(16);
- conf.put(org.apache.storm.Config.TOPOLOGY_TICK_TUPLE_FREQ_SECS, 30);
+ conf.put(Config.TOPOLOGY_TICK_TUPLE_FREQ_SECS, FlowWriteConfig.AGG_TIME);
return conf;
}
@Override
public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
- outputFieldsDeclarer.declare(new Fields("connLog"));
+// outputFieldsDeclarer.declare(new Fields("map"));
+ outputFieldsDeclarer.declare(new Fields("key","value"));
}
/**
@@ -90,16 +90,10 @@ public class AggregateBolt extends BaseBasicBolt {
result.setCommon_c2s_byte_num(result.getCommon_c2s_byte_num() + value2.getCommon_c2s_byte_num());
result.setCommon_s2c_pkt_num(result.getCommon_s2c_pkt_num() + value2.getCommon_s2c_pkt_num());
result.setCommon_c2s_pkt_num(result.getCommon_c2s_pkt_num() + value2.getCommon_c2s_pkt_num());
- result.setCommon_sessions(result.getCommon_sessions() + 1L);
+ result.setCommon_sessions(result.getCommon_sessions() + value2.getCommon_sessions());
return result;
}
- public Integer addValueBean(Integer result, Integer value2) {
-
- return result + value2;
- }
-
-
}
diff --git a/src/main/java/cn/ac/iie/origion/bolt/MyWindowBolt.java b/src/main/java/cn/ac/iie/origion/bolt/MyWindowBolt.java
index 96151ff..4432255 100644
--- a/src/main/java/cn/ac/iie/origion/bolt/MyWindowBolt.java
+++ b/src/main/java/cn/ac/iie/origion/bolt/MyWindowBolt.java
@@ -12,7 +12,6 @@ import org.apache.storm.tuple.Tuple;
import org.apache.storm.tuple.Values;
import org.apache.storm.windowing.TupleWindow;
-import java.util.ArrayList;
import java.util.HashMap;
import java.util.Map;
@@ -25,11 +24,9 @@ import java.util.Map;
public class MyWindowBolt extends BaseWindowedBolt {
- private OutputCollector collector;
-
- private static ArrayList<Integer> list;
- private static HashMap<String, ValueBean> map;
- private static String message;
+ private static OutputCollector collector;
+ private HashMap<String, ValueBean> map;
+ private String message;
@Override
@@ -37,13 +34,13 @@ public class MyWindowBolt extends BaseWindowedBolt {
public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
this.collector = collector;
map = new HashMap<>(16);
- list = new ArrayList<>();
}
@Override
public void declareOutputFields(OutputFieldsDeclarer declarer) {
- declarer.declare(new Fields("map"));
+ declarer.declare(new Fields("key","value"));
+// declarer.declare(new Fields("map"));
}
@Override
@@ -63,11 +60,11 @@ public class MyWindowBolt extends BaseWindowedBolt {
message = tuple.getStringByField("source");
//TODO 获取Tuple中的value Bean
- ValueBean valueBean = JSONObject.parseObject(message,ValueBean.class);
+ ValueBean valueBean = JSONObject.parseObject(message, ValueBean.class);
//TODO 获取Tuple中的key String
String key = JSONObject.toJSONString(JSONObject.parseObject(message, KeyBean.class));
//TODO 获取map中的value Bean
- ValueBean mapValueBean = map.getOrDefault(key,new ValueBean());
+ ValueBean mapValueBean = map.getOrDefault(key, new ValueBean());
//TODO 将tuple中的value和map中的value做累加
mapValueBean = addValueBean(mapValueBean, valueBean);
@@ -75,7 +72,12 @@ public class MyWindowBolt extends BaseWindowedBolt {
map.put(key, mapValueBean);
}
- collector.emit(new Values(map));
+ //TODO 遍历map将 K V发送出去
+ for (String key : map.keySet()) {
+ collector.emit(new Values(key,map.get(key)));
+ }
+// collector.emit(new Values(map));
+
}
@SuppressWarnings("all")
@@ -93,8 +95,6 @@ public class MyWindowBolt extends BaseWindowedBolt {
result.setCommon_s2c_pkt_num(result.getCommon_s2c_pkt_num() + value2.getCommon_s2c_pkt_num());
result.setCommon_c2s_pkt_num(result.getCommon_c2s_pkt_num() + value2.getCommon_c2s_pkt_num());
result.setCommon_sessions(result.getCommon_sessions() + 1L);
-
-
return result;
}
diff --git a/src/main/java/cn/ac/iie/origion/bolt/NtcLogSendBolt.java b/src/main/java/cn/ac/iie/origion/bolt/NtcLogSendBolt.java
index 59d7eef..40a2070 100644
--- a/src/main/java/cn/ac/iie/origion/bolt/NtcLogSendBolt.java
+++ b/src/main/java/cn/ac/iie/origion/bolt/NtcLogSendBolt.java
@@ -26,58 +26,60 @@ public class NtcLogSendBolt extends BaseBasicBolt {
private static Logger logger = Logger.getLogger(NtcLogSendBolt.class);
private KafkaLogNtc kafkaLogNtc;
- private static ConnectionRecordLog connectionRecordLog;
+ private JSONObject key;
+ private ValueBean valueBean;
+ private ConnectionRecordLog connectionRecordLog;
@Override
public void prepare(Map stormConf, TopologyContext context) {
kafkaLogNtc = KafkaLogNtc.getInstance();
connectionRecordLog = new ConnectionRecordLog();
+ key = new JSONObject();
+ valueBean = new ValueBean();
}
+
@Override
public void execute(Tuple tuple, BasicOutputCollector basicOutputCollector) {
+
+
+// System.out.println(this.getClass() + " 获取的tuple的sessions " + (tuple.getValue(0)) + ((ValueBean)tuple.getValue(1)).getCommon_sessions());
try {
- HashMap<String,ValueBean> hashMap = (HashMap) tuple.getValue(0);
- if (hashMap.size() != 0) {
- for (String key : hashMap.keySet()) {
- JSONObject keys = JSONObject.parseObject(key);
-
- ValueBean valueBean = hashMap.get(key);
- connectionRecordLog.setCommon_recv_time(System.currentTimeMillis());
- connectionRecordLog.setCommon_policy_id(Integer.parseInt(keys.getString("common_policy_id")));
- connectionRecordLog.setCommon_action(Integer.parseInt(keys.getString("common_action")));
- connectionRecordLog.setCommon_sub_action(keys.getString("common_sub_action"));
- connectionRecordLog.setCommon_client_ip(keys.getString("common_client_ip"));
- connectionRecordLog.setCommon_client_location(keys.getString("common_client_location"));
- connectionRecordLog.setCommon_sled_ip(keys.getString("common_sled_ip"));
- connectionRecordLog.setCommon_device_id(keys.getString("common_device_id"));
- connectionRecordLog.setCommon_subscriber_id(keys.getString("common_subscriber_id"));
- connectionRecordLog.setCommon_server_ip(keys.getString("common_server_ip"));
- connectionRecordLog.setCommon_server_location(keys.getString("common_server_location"));
- connectionRecordLog.setCommon_server_port(Integer.parseInt(keys.getString("common_server_port")));
- connectionRecordLog.setCommon_l4_protocol(keys.getString("common_l4_protocol"));
- connectionRecordLog.setHttp_domain(keys.getString("http_domain"));
- connectionRecordLog.setSsl_sni(keys.getString("ssl_sni"));
-
- //TODO 为Value赋值
-
- connectionRecordLog.setCommon_sessions(valueBean.getCommon_sessions());
- connectionRecordLog.setCommon_c2s_pkt_num(valueBean.getCommon_c2s_pkt_num());
- connectionRecordLog.setCommon_s2c_pkt_num(valueBean.getCommon_s2c_pkt_num());
- connectionRecordLog.setCommon_c2s_byte_num(valueBean.getCommon_c2s_byte_num());
- connectionRecordLog.setCommon_s2c_byte_num(valueBean.getCommon_s2c_byte_num());
-
- kafkaLogNtc.sendMessage(JSONObject.toJSONString(connectionRecordLog));
-
- }
- }
- } catch (Exception e) {
+ key = JSONObject.parseObject(tuple.getValue(0).toString());
+
+ valueBean = (ValueBean) (tuple.getValue(1));
+
+ connectionRecordLog.setCommon_recv_time(System.currentTimeMillis());
+ connectionRecordLog.setCommon_policy_id(Integer.parseInt(key.getString("common_policy_id")));
+ connectionRecordLog.setCommon_action(Integer.parseInt(key.getString("common_action")));
+ connectionRecordLog.setCommon_sub_action(key.getString("common_sub_action"));
+ connectionRecordLog.setCommon_client_ip(key.getString("common_client_ip"));
+ connectionRecordLog.setCommon_client_location(key.getString("common_client_location"));
+ connectionRecordLog.setCommon_sled_ip(key.getString("common_sled_ip"));
+ connectionRecordLog.setCommon_device_id(key.getString("common_device_id"));
+ connectionRecordLog.setCommon_subscriber_id(key.getString("common_subscriber_id"));
+ connectionRecordLog.setCommon_server_ip(key.getString("common_server_ip"));
+ connectionRecordLog.setCommon_server_location(key.getString("common_server_location"));
+ connectionRecordLog.setCommon_server_port(Integer.parseInt(key.getString("common_server_port")));
+ connectionRecordLog.setCommon_l4_protocol(key.getString("common_l4_protocol"));
+ connectionRecordLog.setHttp_domain(key.getString("http_domain"));
+ connectionRecordLog.setSsl_sni(key.getString("ssl_sni"));
+
+ //TODO 为Value赋值
+
+ connectionRecordLog.setCommon_sessions(valueBean.getCommon_sessions());
+ connectionRecordLog.setCommon_c2s_pkt_num(valueBean.getCommon_c2s_pkt_num());
+ connectionRecordLog.setCommon_s2c_pkt_num(valueBean.getCommon_s2c_pkt_num());
+ connectionRecordLog.setCommon_c2s_byte_num(valueBean.getCommon_c2s_byte_num());
+ connectionRecordLog.setCommon_s2c_byte_num(valueBean.getCommon_s2c_byte_num());
+
+ kafkaLogNtc.sendMessage(JSONObject.toJSONString(connectionRecordLog));
+ } catch (Exception e) {
logger.error(FlowWriteConfig.KAFKA_TOPIC + "日志发送Kafka过程出现异常");
e.printStackTrace();
}
}
-
@Override
public void declareOutputFields(OutputFieldsDeclarer declarer) {
diff --git a/src/main/java/cn/ac/iie/origion/topology/LogFlowWriteTopology.java b/src/main/java/cn/ac/iie/origion/topology/LogFlowWriteTopology.java
index b197b6a..c8359b0 100644
--- a/src/main/java/cn/ac/iie/origion/topology/LogFlowWriteTopology.java
+++ b/src/main/java/cn/ac/iie/origion/topology/LogFlowWriteTopology.java
@@ -1,5 +1,6 @@
package cn.ac.iie.origion.topology;
+import cn.ac.iie.origion.bolt.AggregateBolt;
import cn.ac.iie.origion.bolt.MyWindowBolt;
import cn.ac.iie.origion.bolt.NtcLogSendBolt;
import cn.ac.iie.origion.spout.CustomizedKafkaSpout;
@@ -11,6 +12,7 @@ import org.apache.storm.generated.AuthorizationException;
import org.apache.storm.generated.InvalidTopologyException;
import org.apache.storm.topology.TopologyBuilder;
import org.apache.storm.topology.base.BaseWindowedBolt;
+import org.apache.storm.tuple.Fields;
import java.util.concurrent.TimeUnit;
@@ -41,13 +43,15 @@ public class LogFlowWriteTopology {
conf.setDebug(false);
conf.setMessageTimeoutSecs(60);
conf.setMaxSpoutPending(150000);
- conf.setNumAckers(3);
+ conf.setNumAckers(FlowWriteConfig.TOPOLOGY_WORKERS);
+// conf.setTopologyWorkerMaxHeapSize(6144);
+ conf.put(Config.WORKER_CHILDOPTS, "-Xmx4G -Xms2G");
return conf;
}
private void runLocally() throws InterruptedException {
topologyConfig.setMaxTaskParallelism(1);
- StormRunner.runTopologyLocally(builder, topologyName, topologyConfig, 600);
+ StormRunner.runTopologyLocally(builder, topologyName, topologyConfig, 6000);
}
private void runRemotely() throws AlreadyAliveException, InvalidTopologyException, AuthorizationException {
@@ -59,15 +63,14 @@ public class LogFlowWriteTopology {
private void buildTopology() {
builder = new TopologyBuilder();
- builder.setSpout("LogFlowWriteSpout", new CustomizedKafkaSpout(), 3);
+ builder.setSpout("LogFlowWriteSpout", new CustomizedKafkaSpout(), FlowWriteConfig.SPOUT_PARALLELISM);
builder.setBolt("TEST-CONN", new MyWindowBolt()
- .withWindow(new BaseWindowedBolt.Duration(30, TimeUnit.SECONDS),
- new BaseWindowedBolt.Duration(30, TimeUnit.SECONDS)),FlowWriteConfig.DATACENTER_BOLT_PARALLELISM)
+ .withWindow(new BaseWindowedBolt.Duration(10, TimeUnit.SECONDS),
+ new BaseWindowedBolt.Duration(10, TimeUnit.SECONDS)), FlowWriteConfig.DATACENTER_BOLT_PARALLELISM)
.localOrShuffleGrouping("LogFlowWriteSpout");
-// builder.setBolt("TEST-CONN", new AggregateBolt(),3).localOrShuffleGrouping("LogFlowWriteSpout");
-// builder.setBolt("KAKFA-CONN", new PrintBolt(),3).localOrShuffleGrouping("TEST-CONN");
- builder.setBolt("KAKFA-CONN", new NtcLogSendBolt(),FlowWriteConfig.KAFKA_BOLT_PARALLELISM).localOrShuffleGrouping("TEST-CONN");
+ builder.setBolt("AGG-BOLT", new AggregateBolt(), FlowWriteConfig.DATACENTER_BOLT_PARALLELISM).fieldsGrouping("TEST-CONN", new Fields("key"));
+ builder.setBolt("KAKFA-CONN", new NtcLogSendBolt(), FlowWriteConfig.KAFKA_BOLT_PARALLELISM).localOrShuffleGrouping("AGG-BOLT");
}
public static void main(String[] args) throws Exception {
diff --git a/src/main/java/cn/ac/iie/origion/utils/AggregateUtil.java b/src/main/java/cn/ac/iie/origion/utils/AggregateUtil.java
index a7047ef..e7d7a35 100644
--- a/src/main/java/cn/ac/iie/origion/utils/AggregateUtil.java
+++ b/src/main/java/cn/ac/iie/origion/utils/AggregateUtil.java
@@ -34,7 +34,6 @@ public class AggregateUtil {
//TODO KEY
- keyBean.setCommon_recv_time(0L);
keyBean.setCommon_policy_id(Integer.parseInt(map.getOrDefault("common_policy_id","0").toString()));
keyBean.setCommon_action(Integer.parseInt(map.getOrDefault("common_action","0").toString()));
keyBean.setCommon_sub_action(map.getOrDefault("common_sub_action","").toString());