diff options
| author | lee <[email protected]> | 2020-06-15 15:08:22 +0800 |
|---|---|---|
| committer | lee <[email protected]> | 2020-06-15 15:08:22 +0800 |
| commit | 4a8718a458c70d7ea2c087c0248456ae1048f796 (patch) | |
| tree | 7866629673f501c0c7b6d461bf1b13d2c0c15d98 /src/main | |
| parent | cbf8c0ca39764a68726f710c75f45c7623fc641a (diff) | |
Diffstat (limited to 'src/main')
6 files changed, 88 insertions, 100 deletions
diff --git a/src/main/java/cn/ac/iie/origion/bean/KeyBean.java b/src/main/java/cn/ac/iie/origion/bean/KeyBean.java index 00e9e3c..6898558 100644 --- a/src/main/java/cn/ac/iie/origion/bean/KeyBean.java +++ b/src/main/java/cn/ac/iie/origion/bean/KeyBean.java @@ -8,7 +8,6 @@ package cn.ac.iie.origion.bean; **/ public class KeyBean { - private long common_recv_time; private int common_policy_id; private int common_action; private String common_sub_action; @@ -24,14 +23,6 @@ public class KeyBean { private String http_domain; private String ssl_sni; - public long getCommon_recv_time() { - return common_recv_time; - } - - public void setCommon_recv_time(long common_recv_time) { - this.common_recv_time = common_recv_time; - } - public int getCommon_policy_id() { return common_policy_id; } @@ -147,7 +138,7 @@ public class KeyBean { @Override public int hashCode() { - return ("" + getCommon_recv_time() + getCommon_policy_id() + getCommon_action() + getCommon_sub_action() + getCommon_client_ip() + getCommon_client_location() + getCommon_sled_ip() + getCommon_device_id() + getCommon_subscriber_id() + getCommon_server_ip() + getCommon_server_location() + getCommon_server_port() + getCommon_l4_protocol() + getHttp_domain() + getSsl_sni()).hashCode(); + return ("" + getCommon_policy_id() + getCommon_action() + getCommon_sub_action() + getCommon_client_ip() + getCommon_client_location() + getCommon_sled_ip() + getCommon_device_id() + getCommon_subscriber_id() + getCommon_server_ip() + getCommon_server_location() + getCommon_server_port() + getCommon_l4_protocol() + getHttp_domain() + getSsl_sni()).hashCode(); } @@ -155,8 +146,7 @@ public class KeyBean { public boolean equals(Object o) { if (o instanceof KeyBean) { KeyBean keyBean = (KeyBean) o; - return (this.getCommon_recv_time()==(keyBean.getCommon_recv_time()) && - this.getCommon_policy_id()==(keyBean.getCommon_policy_id()) && + return (this.getCommon_policy_id()==(keyBean.getCommon_policy_id()) && this.getCommon_action()==(keyBean.getCommon_action()) && this.getCommon_sub_action().equals(keyBean.getCommon_sub_action()) && this.getCommon_client_ip().equals(keyBean.getCommon_client_ip()) && diff --git a/src/main/java/cn/ac/iie/origion/bolt/AggregateBolt.java b/src/main/java/cn/ac/iie/origion/bolt/AggregateBolt.java index a656606..d7f5eb7 100644 --- a/src/main/java/cn/ac/iie/origion/bolt/AggregateBolt.java +++ b/src/main/java/cn/ac/iie/origion/bolt/AggregateBolt.java @@ -5,8 +5,8 @@ import cn.ac.iie.origion.bean.ValueBean; import cn.ac.iie.origion.utils.FlowWriteConfig; import cn.ac.iie.origion.utils.TupleUtils; -import com.alibaba.fastjson.JSON; import org.apache.log4j.Logger; +import org.apache.storm.Config; import org.apache.storm.task.TopologyContext; import org.apache.storm.topology.BasicOutputCollector; import org.apache.storm.topology.OutputFieldsDeclarer; @@ -23,38 +23,37 @@ public class AggregateBolt extends BaseBasicBolt { private final static Logger logger = Logger.getLogger(AggregateBolt.class); private static final long serialVersionUID = 9006119186526123734L; - private static HashMap<String, Integer> map = new HashMap(); - private static ValueBean valueBean = new ValueBean(); - private static ValueBean tupleValueBean = new ValueBean(); - private static String key = ""; - private static Integer value = 0; - private static String message = ""; + private HashMap<String, ValueBean> map; + private String key = ""; + private ValueBean value; + private ValueBean mapValue; @Override public void prepare(Map stormConf, TopologyContext context) { - System.out.println("prepare方法执行了++++++++++++++++++++++++++"); - - logger.error("prepare方法执行了++++++++++++++++++++++++++"); + map = new HashMap<>(16); + key = ""; + value = new ValueBean(); + mapValue = new ValueBean(); } @Override public void execute(Tuple tuple, BasicOutputCollector basicOutputCollector) { - - System.out.println("执行了一次====================================" + value + "==========================" + System.currentTimeMillis()); try { if (TupleUtils.isTick(tuple)) { - System.out.println(this.map); - //批量发送到下一个bolt - basicOutputCollector.emit(new Values(JSON.toJSONString(this.map))); - - + //TODO 发送到kafka的 bolt + for (String key : map.keySet()) { + basicOutputCollector.emit(new Values(key,map.get(key))); + } + map.clear(); } else { - message = tuple.getString(0); //TODO 获取一条tuple数据的key和value - + key = tuple.getString(0); + value = (ValueBean) tuple.getValue(1); //TODO 两个count聚合后放入HashMap中,利用HashMap的去重功能实现value的覆盖 - this.map.put("192", addValueBean(value, 1)); + mapValue = map.getOrDefault(key, new ValueBean()); + mapValue = addValueBean(mapValue, value); + map.put(key, mapValue); } } catch (Exception e) { logger.error(FlowWriteConfig.KAFKA_TOPIC + "接收/解析过程出现异常"); @@ -65,14 +64,15 @@ public class AggregateBolt extends BaseBasicBolt { @Override public Map<String, Object> getComponentConfiguration() { Map<String, Object> conf = new HashMap<String, Object>(16); - conf.put(org.apache.storm.Config.TOPOLOGY_TICK_TUPLE_FREQ_SECS, 30); + conf.put(Config.TOPOLOGY_TICK_TUPLE_FREQ_SECS, FlowWriteConfig.AGG_TIME); return conf; } @Override public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) { - outputFieldsDeclarer.declare(new Fields("connLog")); +// outputFieldsDeclarer.declare(new Fields("map")); + outputFieldsDeclarer.declare(new Fields("key","value")); } /** @@ -90,16 +90,10 @@ public class AggregateBolt extends BaseBasicBolt { result.setCommon_c2s_byte_num(result.getCommon_c2s_byte_num() + value2.getCommon_c2s_byte_num()); result.setCommon_s2c_pkt_num(result.getCommon_s2c_pkt_num() + value2.getCommon_s2c_pkt_num()); result.setCommon_c2s_pkt_num(result.getCommon_c2s_pkt_num() + value2.getCommon_c2s_pkt_num()); - result.setCommon_sessions(result.getCommon_sessions() + 1L); + result.setCommon_sessions(result.getCommon_sessions() + value2.getCommon_sessions()); return result; } - public Integer addValueBean(Integer result, Integer value2) { - - return result + value2; - } - - } diff --git a/src/main/java/cn/ac/iie/origion/bolt/MyWindowBolt.java b/src/main/java/cn/ac/iie/origion/bolt/MyWindowBolt.java index 96151ff..4432255 100644 --- a/src/main/java/cn/ac/iie/origion/bolt/MyWindowBolt.java +++ b/src/main/java/cn/ac/iie/origion/bolt/MyWindowBolt.java @@ -12,7 +12,6 @@ import org.apache.storm.tuple.Tuple; import org.apache.storm.tuple.Values; import org.apache.storm.windowing.TupleWindow; -import java.util.ArrayList; import java.util.HashMap; import java.util.Map; @@ -25,11 +24,9 @@ import java.util.Map; public class MyWindowBolt extends BaseWindowedBolt { - private OutputCollector collector; - - private static ArrayList<Integer> list; - private static HashMap<String, ValueBean> map; - private static String message; + private static OutputCollector collector; + private HashMap<String, ValueBean> map; + private String message; @Override @@ -37,13 +34,13 @@ public class MyWindowBolt extends BaseWindowedBolt { public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) { this.collector = collector; map = new HashMap<>(16); - list = new ArrayList<>(); } @Override public void declareOutputFields(OutputFieldsDeclarer declarer) { - declarer.declare(new Fields("map")); + declarer.declare(new Fields("key","value")); +// declarer.declare(new Fields("map")); } @Override @@ -63,11 +60,11 @@ public class MyWindowBolt extends BaseWindowedBolt { message = tuple.getStringByField("source"); //TODO 获取Tuple中的value Bean - ValueBean valueBean = JSONObject.parseObject(message,ValueBean.class); + ValueBean valueBean = JSONObject.parseObject(message, ValueBean.class); //TODO 获取Tuple中的key String String key = JSONObject.toJSONString(JSONObject.parseObject(message, KeyBean.class)); //TODO 获取map中的value Bean - ValueBean mapValueBean = map.getOrDefault(key,new ValueBean()); + ValueBean mapValueBean = map.getOrDefault(key, new ValueBean()); //TODO 将tuple中的value和map中的value做累加 mapValueBean = addValueBean(mapValueBean, valueBean); @@ -75,7 +72,12 @@ public class MyWindowBolt extends BaseWindowedBolt { map.put(key, mapValueBean); } - collector.emit(new Values(map)); + //TODO 遍历map将 K V发送出去 + for (String key : map.keySet()) { + collector.emit(new Values(key,map.get(key))); + } +// collector.emit(new Values(map)); + } @SuppressWarnings("all") @@ -93,8 +95,6 @@ public class MyWindowBolt extends BaseWindowedBolt { result.setCommon_s2c_pkt_num(result.getCommon_s2c_pkt_num() + value2.getCommon_s2c_pkt_num()); result.setCommon_c2s_pkt_num(result.getCommon_c2s_pkt_num() + value2.getCommon_c2s_pkt_num()); result.setCommon_sessions(result.getCommon_sessions() + 1L); - - return result; } diff --git a/src/main/java/cn/ac/iie/origion/bolt/NtcLogSendBolt.java b/src/main/java/cn/ac/iie/origion/bolt/NtcLogSendBolt.java index 59d7eef..40a2070 100644 --- a/src/main/java/cn/ac/iie/origion/bolt/NtcLogSendBolt.java +++ b/src/main/java/cn/ac/iie/origion/bolt/NtcLogSendBolt.java @@ -26,58 +26,60 @@ public class NtcLogSendBolt extends BaseBasicBolt { private static Logger logger = Logger.getLogger(NtcLogSendBolt.class); private KafkaLogNtc kafkaLogNtc; - private static ConnectionRecordLog connectionRecordLog; + private JSONObject key; + private ValueBean valueBean; + private ConnectionRecordLog connectionRecordLog; @Override public void prepare(Map stormConf, TopologyContext context) { kafkaLogNtc = KafkaLogNtc.getInstance(); connectionRecordLog = new ConnectionRecordLog(); + key = new JSONObject(); + valueBean = new ValueBean(); } + @Override public void execute(Tuple tuple, BasicOutputCollector basicOutputCollector) { + + +// System.out.println(this.getClass() + " 获取的tuple的sessions " + (tuple.getValue(0)) + ((ValueBean)tuple.getValue(1)).getCommon_sessions()); try { - HashMap<String,ValueBean> hashMap = (HashMap) tuple.getValue(0); - if (hashMap.size() != 0) { - for (String key : hashMap.keySet()) { - JSONObject keys = JSONObject.parseObject(key); - - ValueBean valueBean = hashMap.get(key); - connectionRecordLog.setCommon_recv_time(System.currentTimeMillis()); - connectionRecordLog.setCommon_policy_id(Integer.parseInt(keys.getString("common_policy_id"))); - connectionRecordLog.setCommon_action(Integer.parseInt(keys.getString("common_action"))); - connectionRecordLog.setCommon_sub_action(keys.getString("common_sub_action")); - connectionRecordLog.setCommon_client_ip(keys.getString("common_client_ip")); - connectionRecordLog.setCommon_client_location(keys.getString("common_client_location")); - connectionRecordLog.setCommon_sled_ip(keys.getString("common_sled_ip")); - connectionRecordLog.setCommon_device_id(keys.getString("common_device_id")); - connectionRecordLog.setCommon_subscriber_id(keys.getString("common_subscriber_id")); - connectionRecordLog.setCommon_server_ip(keys.getString("common_server_ip")); - connectionRecordLog.setCommon_server_location(keys.getString("common_server_location")); - connectionRecordLog.setCommon_server_port(Integer.parseInt(keys.getString("common_server_port"))); - connectionRecordLog.setCommon_l4_protocol(keys.getString("common_l4_protocol")); - connectionRecordLog.setHttp_domain(keys.getString("http_domain")); - connectionRecordLog.setSsl_sni(keys.getString("ssl_sni")); - - //TODO 为Value赋值 - - connectionRecordLog.setCommon_sessions(valueBean.getCommon_sessions()); - connectionRecordLog.setCommon_c2s_pkt_num(valueBean.getCommon_c2s_pkt_num()); - connectionRecordLog.setCommon_s2c_pkt_num(valueBean.getCommon_s2c_pkt_num()); - connectionRecordLog.setCommon_c2s_byte_num(valueBean.getCommon_c2s_byte_num()); - connectionRecordLog.setCommon_s2c_byte_num(valueBean.getCommon_s2c_byte_num()); - - kafkaLogNtc.sendMessage(JSONObject.toJSONString(connectionRecordLog)); - - } - } - } catch (Exception e) { + key = JSONObject.parseObject(tuple.getValue(0).toString()); + + valueBean = (ValueBean) (tuple.getValue(1)); + + connectionRecordLog.setCommon_recv_time(System.currentTimeMillis()); + connectionRecordLog.setCommon_policy_id(Integer.parseInt(key.getString("common_policy_id"))); + connectionRecordLog.setCommon_action(Integer.parseInt(key.getString("common_action"))); + connectionRecordLog.setCommon_sub_action(key.getString("common_sub_action")); + connectionRecordLog.setCommon_client_ip(key.getString("common_client_ip")); + connectionRecordLog.setCommon_client_location(key.getString("common_client_location")); + connectionRecordLog.setCommon_sled_ip(key.getString("common_sled_ip")); + connectionRecordLog.setCommon_device_id(key.getString("common_device_id")); + connectionRecordLog.setCommon_subscriber_id(key.getString("common_subscriber_id")); + connectionRecordLog.setCommon_server_ip(key.getString("common_server_ip")); + connectionRecordLog.setCommon_server_location(key.getString("common_server_location")); + connectionRecordLog.setCommon_server_port(Integer.parseInt(key.getString("common_server_port"))); + connectionRecordLog.setCommon_l4_protocol(key.getString("common_l4_protocol")); + connectionRecordLog.setHttp_domain(key.getString("http_domain")); + connectionRecordLog.setSsl_sni(key.getString("ssl_sni")); + + //TODO 为Value赋值 + + connectionRecordLog.setCommon_sessions(valueBean.getCommon_sessions()); + connectionRecordLog.setCommon_c2s_pkt_num(valueBean.getCommon_c2s_pkt_num()); + connectionRecordLog.setCommon_s2c_pkt_num(valueBean.getCommon_s2c_pkt_num()); + connectionRecordLog.setCommon_c2s_byte_num(valueBean.getCommon_c2s_byte_num()); + connectionRecordLog.setCommon_s2c_byte_num(valueBean.getCommon_s2c_byte_num()); + + kafkaLogNtc.sendMessage(JSONObject.toJSONString(connectionRecordLog)); + } catch (Exception e) { logger.error(FlowWriteConfig.KAFKA_TOPIC + "日志发送Kafka过程出现异常"); e.printStackTrace(); } } - @Override public void declareOutputFields(OutputFieldsDeclarer declarer) { diff --git a/src/main/java/cn/ac/iie/origion/topology/LogFlowWriteTopology.java b/src/main/java/cn/ac/iie/origion/topology/LogFlowWriteTopology.java index b197b6a..c8359b0 100644 --- a/src/main/java/cn/ac/iie/origion/topology/LogFlowWriteTopology.java +++ b/src/main/java/cn/ac/iie/origion/topology/LogFlowWriteTopology.java @@ -1,5 +1,6 @@ package cn.ac.iie.origion.topology; +import cn.ac.iie.origion.bolt.AggregateBolt; import cn.ac.iie.origion.bolt.MyWindowBolt; import cn.ac.iie.origion.bolt.NtcLogSendBolt; import cn.ac.iie.origion.spout.CustomizedKafkaSpout; @@ -11,6 +12,7 @@ import org.apache.storm.generated.AuthorizationException; import org.apache.storm.generated.InvalidTopologyException; import org.apache.storm.topology.TopologyBuilder; import org.apache.storm.topology.base.BaseWindowedBolt; +import org.apache.storm.tuple.Fields; import java.util.concurrent.TimeUnit; @@ -41,13 +43,15 @@ public class LogFlowWriteTopology { conf.setDebug(false); conf.setMessageTimeoutSecs(60); conf.setMaxSpoutPending(150000); - conf.setNumAckers(3); + conf.setNumAckers(FlowWriteConfig.TOPOLOGY_WORKERS); +// conf.setTopologyWorkerMaxHeapSize(6144); + conf.put(Config.WORKER_CHILDOPTS, "-Xmx4G -Xms2G"); return conf; } private void runLocally() throws InterruptedException { topologyConfig.setMaxTaskParallelism(1); - StormRunner.runTopologyLocally(builder, topologyName, topologyConfig, 600); + StormRunner.runTopologyLocally(builder, topologyName, topologyConfig, 6000); } private void runRemotely() throws AlreadyAliveException, InvalidTopologyException, AuthorizationException { @@ -59,15 +63,14 @@ public class LogFlowWriteTopology { private void buildTopology() { builder = new TopologyBuilder(); - builder.setSpout("LogFlowWriteSpout", new CustomizedKafkaSpout(), 3); + builder.setSpout("LogFlowWriteSpout", new CustomizedKafkaSpout(), FlowWriteConfig.SPOUT_PARALLELISM); builder.setBolt("TEST-CONN", new MyWindowBolt() - .withWindow(new BaseWindowedBolt.Duration(30, TimeUnit.SECONDS), - new BaseWindowedBolt.Duration(30, TimeUnit.SECONDS)),FlowWriteConfig.DATACENTER_BOLT_PARALLELISM) + .withWindow(new BaseWindowedBolt.Duration(10, TimeUnit.SECONDS), + new BaseWindowedBolt.Duration(10, TimeUnit.SECONDS)), FlowWriteConfig.DATACENTER_BOLT_PARALLELISM) .localOrShuffleGrouping("LogFlowWriteSpout"); -// builder.setBolt("TEST-CONN", new AggregateBolt(),3).localOrShuffleGrouping("LogFlowWriteSpout"); -// builder.setBolt("KAKFA-CONN", new PrintBolt(),3).localOrShuffleGrouping("TEST-CONN"); - builder.setBolt("KAKFA-CONN", new NtcLogSendBolt(),FlowWriteConfig.KAFKA_BOLT_PARALLELISM).localOrShuffleGrouping("TEST-CONN"); + builder.setBolt("AGG-BOLT", new AggregateBolt(), FlowWriteConfig.DATACENTER_BOLT_PARALLELISM).fieldsGrouping("TEST-CONN", new Fields("key")); + builder.setBolt("KAKFA-CONN", new NtcLogSendBolt(), FlowWriteConfig.KAFKA_BOLT_PARALLELISM).localOrShuffleGrouping("AGG-BOLT"); } public static void main(String[] args) throws Exception { diff --git a/src/main/java/cn/ac/iie/origion/utils/AggregateUtil.java b/src/main/java/cn/ac/iie/origion/utils/AggregateUtil.java index a7047ef..e7d7a35 100644 --- a/src/main/java/cn/ac/iie/origion/utils/AggregateUtil.java +++ b/src/main/java/cn/ac/iie/origion/utils/AggregateUtil.java @@ -34,7 +34,6 @@ public class AggregateUtil { //TODO KEY - keyBean.setCommon_recv_time(0L); keyBean.setCommon_policy_id(Integer.parseInt(map.getOrDefault("common_policy_id","0").toString())); keyBean.setCommon_action(Integer.parseInt(map.getOrDefault("common_action","0").toString())); keyBean.setCommon_sub_action(map.getOrDefault("common_sub_action","").toString()); |
