author李玺康 <[email protected]>2020-10-23 14:42:51 +0800
committer李玺康 <[email protected]>2020-10-23 14:42:51 +0800
commitbc8cb5c68c785d9a20a4d0d2cd7917639e1c1e68 (patch)
tree527dd55953a9dabc733a43eff11fbc2ed79c4bd0
parent4a8718a458c70d7ea2c087c0248456ae1048f796 (diff)
20.11-rc1代码提交storm-20.11-rc1
-rw-r--r--  src/main/java/cn/ac/iie/origion/bean/ConnectionRecordLog.java  195
-rw-r--r--  src/main/java/cn/ac/iie/origion/bean/KeyBean.java  166
-rw-r--r--  src/main/java/cn/ac/iie/origion/bean/ValueBean.java  59
-rw-r--r--  src/main/java/cn/ac/iie/origion/bolt/AggregateBolt.java  99
-rw-r--r--  src/main/java/cn/ac/iie/origion/bolt/MyWindowBolt.java  102
-rw-r--r--  src/main/java/cn/ac/iie/origion/bolt/NtcLogSendBolt.java  87
-rw-r--r--  src/main/java/cn/ac/iie/origion/bolt/PrintBolt.java  56
-rw-r--r--  src/main/java/cn/ac/iie/origion/utils/AggregateUtil.java  65
-rw-r--r--  src/main/java/cn/ac/iie/origion/utils/FlowWriteConfig.java  48
-rw-r--r--  src/main/java/cn/ac/iie/origion/utils/TupleUtils.java  23
-rw-r--r--  src/main/java/cn/ac/iie/storm/bolt/AggregateBolt.java  172
-rw-r--r--  src/main/java/cn/ac/iie/storm/bolt/ParseKvBolt.java  168
-rw-r--r--  src/main/java/cn/ac/iie/storm/bolt/ResultSendBolt.java  44
-rw-r--r--  src/main/java/cn/ac/iie/storm/bolt/change/FilterBolt.java  68
-rw-r--r--  src/main/java/cn/ac/iie/storm/bolt/print/PrintBolt.java  40
-rw-r--r--  src/main/java/cn/ac/iie/storm/spout/CustomizedKafkaSpout.java (renamed from src/main/java/cn/ac/iie/origion/spout/CustomizedKafkaSpout.java)  15
-rw-r--r--  src/main/java/cn/ac/iie/storm/topology/StormRunner.java (renamed from src/main/java/cn/ac/iie/origion/topology/StormRunner.java)  2
-rw-r--r--  src/main/java/cn/ac/iie/storm/topology/StreamAggregateTopology.java (renamed from src/main/java/cn/ac/iie/origion/topology/LogFlowWriteTopology.java)  75
-rw-r--r--  src/main/java/cn/ac/iie/storm/utils/combine/AggregateUtils.java  204
-rw-r--r--  src/main/java/cn/ac/iie/storm/utils/common/LogSendKafka.java (renamed from src/main/java/cn/ac/iie/origion/utils/KafkaLogNtc.java)  27
-rw-r--r--  src/main/java/cn/ac/iie/storm/utils/file/StreamAggregateConfig.java  45
-rw-r--r--  src/main/java/cn/ac/iie/storm/utils/file/StreamAggregateConfigurations.java (renamed from src/main/java/cn/ac/iie/origion/utils/FlowWriteConfigurations.java)  6
-rw-r--r--  src/main/java/cn/ac/iie/storm/utils/http/HttpClientUtil.java  55
-rw-r--r--  src/main/main.iml  11
-rw-r--r--  src/test/java/com/wp/AppIdTest.java  54
-rw-r--r--  src/test/java/com/wp/AppTest.java  51
-rw-r--r--  src/test/java/com/wp/FilterBolt.java  133
-rw-r--r--  src/test/java/com/wp/SchemaTest.java  43
-rw-r--r--  src/test/test.iml  12
29 files changed, 1114 insertions, 1011 deletions
diff --git a/src/main/java/cn/ac/iie/origion/bean/ConnectionRecordLog.java b/src/main/java/cn/ac/iie/origion/bean/ConnectionRecordLog.java
deleted file mode 100644
index 06b2135..0000000
--- a/src/main/java/cn/ac/iie/origion/bean/ConnectionRecordLog.java
+++ /dev/null
@@ -1,195 +0,0 @@
-package cn.ac.iie.origion.bean;
-
-/**
- * @ClassName ConnectionRecordLog
- * @Author [email protected]
- * @Date 2020/5/28 13:44
- * @Version V1.0
- **/
-public class ConnectionRecordLog {
-
- //key
- private long common_recv_time;
- private int common_policy_id;
- private int common_action;
- private String common_sub_action;
- private String common_client_ip;
- private String common_client_location;
- private String common_sled_ip;
- private String common_device_id;
- private String common_subscriber_id;
- private String common_server_ip;
- private String common_server_location;
- private int common_server_port;
- private String common_l4_protocol;
- private String http_domain;
- private String ssl_sni;
-
- //value
- private long common_sessions;
- private long common_c2s_pkt_num;
- private long common_s2c_pkt_num;
- private long common_c2s_byte_num;
- private long common_s2c_byte_num;
-
-
- public long getCommon_recv_time() {
- return common_recv_time;
- }
-
- public void setCommon_recv_time(long common_recv_time) {
- this.common_recv_time = common_recv_time;
- }
-
- public int getCommon_policy_id() {
- return common_policy_id;
- }
-
- public void setCommon_policy_id(int common_policy_id) {
- this.common_policy_id = common_policy_id;
- }
-
- public int getCommon_action() {
- return common_action;
- }
-
- public void setCommon_action(int common_action) {
- this.common_action = common_action;
- }
-
- public String getCommon_sub_action() {
- return common_sub_action;
- }
-
- public void setCommon_sub_action(String common_sub_action) {
- this.common_sub_action = common_sub_action;
- }
-
- public String getCommon_client_ip() {
- return common_client_ip;
- }
-
- public void setCommon_client_ip(String common_client_ip) {
- this.common_client_ip = common_client_ip;
- }
-
- public String getCommon_client_location() {
- return common_client_location;
- }
-
- public void setCommon_client_location(String common_client_location) {
- this.common_client_location = common_client_location;
- }
-
- public String getCommon_sled_ip() {
- return common_sled_ip;
- }
-
- public void setCommon_sled_ip(String common_sled_ip) {
- this.common_sled_ip = common_sled_ip;
- }
-
- public String getCommon_device_id() {
- return common_device_id;
- }
-
- public void setCommon_device_id(String common_device_id) {
- this.common_device_id = common_device_id;
- }
-
- public String getCommon_subscriber_id() {
- return common_subscriber_id;
- }
-
- public void setCommon_subscriber_id(String common_subscriber_id) {
- this.common_subscriber_id = common_subscriber_id;
- }
-
- public String getCommon_server_ip() {
- return common_server_ip;
- }
-
- public void setCommon_server_ip(String common_server_ip) {
- this.common_server_ip = common_server_ip;
- }
-
- public String getCommon_server_location() {
- return common_server_location;
- }
-
- public void setCommon_server_location(String common_server_location) {
- this.common_server_location = common_server_location;
- }
-
- public int getCommon_server_port() {
- return common_server_port;
- }
-
- public void setCommon_server_port(int common_server_port) {
- this.common_server_port = common_server_port;
- }
-
- public String getCommon_l4_protocol() {
- return common_l4_protocol;
- }
-
- public void setCommon_l4_protocol(String common_l4_protocol) {
- this.common_l4_protocol = common_l4_protocol;
- }
-
- public String getHttp_domain() {
- return http_domain;
- }
-
- public void setHttp_domain(String http_domain) {
- this.http_domain = http_domain;
- }
-
- public String getSsl_sni() {
- return ssl_sni;
- }
-
- public void setSsl_sni(String ssl_sni) {
- this.ssl_sni = ssl_sni;
- }
-
- public long getCommon_sessions() {
- return common_sessions;
- }
-
- public void setCommon_sessions(long common_sessions) {
- this.common_sessions = common_sessions;
- }
-
- public long getCommon_c2s_pkt_num() {
- return common_c2s_pkt_num;
- }
-
- public void setCommon_c2s_pkt_num(long common_c2s_pkt_num) {
- this.common_c2s_pkt_num = common_c2s_pkt_num;
- }
-
- public long getCommon_s2c_pkt_num() {
- return common_s2c_pkt_num;
- }
-
- public void setCommon_s2c_pkt_num(long common_s2c_pkt_num) {
- this.common_s2c_pkt_num = common_s2c_pkt_num;
- }
-
- public long getCommon_c2s_byte_num() {
- return common_c2s_byte_num;
- }
-
- public void setCommon_c2s_byte_num(long common_c2s_byte_num) {
- this.common_c2s_byte_num = common_c2s_byte_num;
- }
-
- public long getCommon_s2c_byte_num() {
- return common_s2c_byte_num;
- }
-
- public void setCommon_s2c_byte_num(long common_s2c_byte_num) {
- this.common_s2c_byte_num = common_s2c_byte_num;
- }
-}
diff --git a/src/main/java/cn/ac/iie/origion/bean/KeyBean.java b/src/main/java/cn/ac/iie/origion/bean/KeyBean.java
deleted file mode 100644
index 6898558..0000000
--- a/src/main/java/cn/ac/iie/origion/bean/KeyBean.java
+++ /dev/null
@@ -1,166 +0,0 @@
-package cn.ac.iie.origion.bean;
-
-/**
- * @ClassName KeyBean
- * @Author [email protected]
- * @Date 2020/6/3 18:52
- * @Version V1.0
- **/
-public class KeyBean {
-
- private int common_policy_id;
- private int common_action;
- private String common_sub_action;
- private String common_client_ip;
- private String common_client_location;
- private String common_sled_ip;
- private String common_device_id;
- private String common_subscriber_id;
- private String common_server_ip;
- private String common_server_location;
- private int common_server_port;
- private String common_l4_protocol;
- private String http_domain;
- private String ssl_sni;
-
- public int getCommon_policy_id() {
- return common_policy_id;
- }
-
- public void setCommon_policy_id(int common_policy_id) {
- this.common_policy_id = common_policy_id;
- }
-
- public int getCommon_action() {
- return common_action;
- }
-
- public void setCommon_action(int common_action) {
- this.common_action = common_action;
- }
-
- public String getCommon_sub_action() {
- return common_sub_action;
- }
-
- public void setCommon_sub_action(String common_sub_action) {
- this.common_sub_action = common_sub_action;
- }
-
- public String getCommon_client_ip() {
- return common_client_ip;
- }
-
- public void setCommon_client_ip(String common_client_ip) {
- this.common_client_ip = common_client_ip;
- }
-
- public String getCommon_client_location() {
- return common_client_location;
- }
-
- public void setCommon_client_location(String common_client_location) {
- this.common_client_location = common_client_location;
- }
-
- public String getCommon_sled_ip() {
- return common_sled_ip;
- }
-
- public void setCommon_sled_ip(String common_sled_ip) {
- this.common_sled_ip = common_sled_ip;
- }
-
- public String getCommon_device_id() {
- return common_device_id;
- }
-
- public void setCommon_device_id(String common_device_id) {
- this.common_device_id = common_device_id;
- }
-
- public String getCommon_subscriber_id() {
- return common_subscriber_id;
- }
-
- public void setCommon_subscriber_id(String common_subscriber_id) {
- this.common_subscriber_id = common_subscriber_id;
- }
-
- public String getCommon_server_ip() {
- return common_server_ip;
- }
-
- public void setCommon_server_ip(String common_server_ip) {
- this.common_server_ip = common_server_ip;
- }
-
- public String getCommon_server_location() {
- return common_server_location;
- }
-
- public void setCommon_server_location(String common_server_location) {
- this.common_server_location = common_server_location;
- }
-
- public int getCommon_server_port() {
- return common_server_port;
- }
-
- public void setCommon_server_port(int common_server_port) {
- this.common_server_port = common_server_port;
- }
-
- public String getCommon_l4_protocol() {
- return common_l4_protocol;
- }
-
- public void setCommon_l4_protocol(String common_l4_protocol) {
- this.common_l4_protocol = common_l4_protocol;
- }
-
- public String getHttp_domain() {
- return http_domain;
- }
-
- public void setHttp_domain(String http_domain) {
- this.http_domain = http_domain;
- }
-
- public String getSsl_sni() {
- return ssl_sni;
- }
-
- public void setSsl_sni(String ssl_sni) {
- this.ssl_sni = ssl_sni;
- }
-
- @Override
- public int hashCode() {
-
- return ("" + getCommon_policy_id() + getCommon_action() + getCommon_sub_action() + getCommon_client_ip() + getCommon_client_location() + getCommon_sled_ip() + getCommon_device_id() + getCommon_subscriber_id() + getCommon_server_ip() + getCommon_server_location() + getCommon_server_port() + getCommon_l4_protocol() + getHttp_domain() + getSsl_sni()).hashCode();
-
- }
-
- @Override
- public boolean equals(Object o) {
- if (o instanceof KeyBean) {
- KeyBean keyBean = (KeyBean) o;
- return (this.getCommon_policy_id()==(keyBean.getCommon_policy_id()) &&
- this.getCommon_action()==(keyBean.getCommon_action()) &&
- this.getCommon_sub_action().equals(keyBean.getCommon_sub_action()) &&
- this.getCommon_client_ip().equals(keyBean.getCommon_client_ip()) &&
- this.getCommon_client_location().equals(keyBean.getCommon_client_location()) &&
- this.getCommon_sled_ip().equals(keyBean.getCommon_sled_ip()) &&
- this.getCommon_device_id().equals(keyBean.getCommon_device_id()) &&
- this.getCommon_subscriber_id().equals(keyBean.getCommon_subscriber_id()) &&
- this.getCommon_server_ip().equals(keyBean.getCommon_server_ip()) &&
- this.getCommon_server_location().equals(keyBean.getCommon_server_location()) &&
- this.getCommon_server_port()==(keyBean.getCommon_server_port()) &&
- this.getCommon_l4_protocol().equals(keyBean.getCommon_l4_protocol()) &&
- this.getHttp_domain().equals(keyBean.getHttp_domain()) &&
- this.getSsl_sni().equals(keyBean.getSsl_sni()));
- }
- return false;
- }
-}
diff --git a/src/main/java/cn/ac/iie/origion/bean/ValueBean.java b/src/main/java/cn/ac/iie/origion/bean/ValueBean.java
deleted file mode 100644
index cfe2f37..0000000
--- a/src/main/java/cn/ac/iie/origion/bean/ValueBean.java
+++ /dev/null
@@ -1,59 +0,0 @@
-package cn.ac.iie.origion.bean;
-
-import java.io.Serializable;
-
-/**
- * @ClassName ValueBean
- * @Author [email protected]
- * @Date 2020/6/2 14:05
- * @Version V1.0
- **/
-public class ValueBean implements Serializable {
-
- private long common_sessions;
- private long common_c2s_pkt_num;
- private long common_s2c_pkt_num;
- private long common_c2s_byte_num;
- private long common_s2c_byte_num;
-
-
- public long getCommon_sessions() {
- return common_sessions;
- }
-
- public void setCommon_sessions(long common_sessions) {
- this.common_sessions = common_sessions;
- }
-
- public long getCommon_c2s_pkt_num() {
- return common_c2s_pkt_num;
- }
-
- public void setCommon_c2s_pkt_num(long common_c2s_pkt_num) {
- this.common_c2s_pkt_num = common_c2s_pkt_num;
- }
-
- public long getCommon_s2c_pkt_num() {
- return common_s2c_pkt_num;
- }
-
- public void setCommon_s2c_pkt_num(long common_s2c_pkt_num) {
- this.common_s2c_pkt_num = common_s2c_pkt_num;
- }
-
- public long getCommon_c2s_byte_num() {
- return common_c2s_byte_num;
- }
-
- public void setCommon_c2s_byte_num(long common_c2s_byte_num) {
- this.common_c2s_byte_num = common_c2s_byte_num;
- }
-
- public long getCommon_s2c_byte_num() {
- return common_s2c_byte_num;
- }
-
- public void setCommon_s2c_byte_num(long common_s2c_byte_num) {
- this.common_s2c_byte_num = common_s2c_byte_num;
- }
-}
diff --git a/src/main/java/cn/ac/iie/origion/bolt/AggregateBolt.java b/src/main/java/cn/ac/iie/origion/bolt/AggregateBolt.java
deleted file mode 100644
index d7f5eb7..0000000
--- a/src/main/java/cn/ac/iie/origion/bolt/AggregateBolt.java
+++ /dev/null
@@ -1,99 +0,0 @@
-package cn.ac.iie.origion.bolt;
-
-
-import cn.ac.iie.origion.bean.ValueBean;
-import cn.ac.iie.origion.utils.FlowWriteConfig;
-import cn.ac.iie.origion.utils.TupleUtils;
-
-import org.apache.log4j.Logger;
-import org.apache.storm.Config;
-import org.apache.storm.task.TopologyContext;
-import org.apache.storm.topology.BasicOutputCollector;
-import org.apache.storm.topology.OutputFieldsDeclarer;
-import org.apache.storm.topology.base.BaseBasicBolt;
-import org.apache.storm.tuple.Fields;
-import org.apache.storm.tuple.Tuple;
-import org.apache.storm.tuple.Values;
-
-import java.util.HashMap;
-import java.util.Map;
-
-
-public class AggregateBolt extends BaseBasicBolt {
- private final static Logger logger = Logger.getLogger(AggregateBolt.class);
- private static final long serialVersionUID = 9006119186526123734L;
-
- private HashMap<String, ValueBean> map;
- private String key = "";
- private ValueBean value;
- private ValueBean mapValue;
-
-
- @Override
- public void prepare(Map stormConf, TopologyContext context) {
- map = new HashMap<>(16);
- key = "";
- value = new ValueBean();
- mapValue = new ValueBean();
- }
-
- @Override
- public void execute(Tuple tuple, BasicOutputCollector basicOutputCollector) {
- try {
- if (TupleUtils.isTick(tuple)) {
- //TODO Forward to the Kafka-sending bolt
- for (String key : map.keySet()) {
- basicOutputCollector.emit(new Values(key,map.get(key)));
- }
- map.clear();
- } else {
- //TODO Extract the key and value from one tuple
- key = tuple.getString(0);
- value = (ValueBean) tuple.getValue(1);
- //TODO Aggregate the two counts and put the result into the HashMap; the map's key uniqueness overwrites the previous value
- mapValue = map.getOrDefault(key, new ValueBean());
- mapValue = addValueBean(mapValue, value);
- map.put(key, mapValue);
- }
- } catch (Exception e) {
- logger.error(FlowWriteConfig.KAFKA_TOPIC + "接收/解析过程出现异常");
- e.printStackTrace();
- }
- }
-
- @Override
- public Map<String, Object> getComponentConfiguration() {
- Map<String, Object> conf = new HashMap<String, Object>(16);
- conf.put(Config.TOPOLOGY_TICK_TUPLE_FREQ_SECS, FlowWriteConfig.AGG_TIME);
- return conf;
- }
-
-
- @Override
- public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
-// outputFieldsDeclarer.declare(new Fields("map"));
- outputFieldsDeclarer.declare(new Fields("key","value"));
- }
-
- /**
- * Adds the corresponding fields of two ValueBeans.
- *
- * @param result the accumulating bean
- * @param value2 the bean whose fields are added to result
- * @return result, with each counter field summed
- */
-
- @SuppressWarnings("all")
- public ValueBean addValueBean(ValueBean result, ValueBean value2) {
-
- result.setCommon_s2c_byte_num(result.getCommon_s2c_byte_num() + value2.getCommon_s2c_byte_num());
- result.setCommon_c2s_byte_num(result.getCommon_c2s_byte_num() + value2.getCommon_c2s_byte_num());
- result.setCommon_s2c_pkt_num(result.getCommon_s2c_pkt_num() + value2.getCommon_s2c_pkt_num());
- result.setCommon_c2s_pkt_num(result.getCommon_c2s_pkt_num() + value2.getCommon_c2s_pkt_num());
- result.setCommon_sessions(result.getCommon_sessions() + value2.getCommon_sessions());
-
-
- return result;
- }
-
-}
diff --git a/src/main/java/cn/ac/iie/origion/bolt/MyWindowBolt.java b/src/main/java/cn/ac/iie/origion/bolt/MyWindowBolt.java
deleted file mode 100644
index 4432255..0000000
--- a/src/main/java/cn/ac/iie/origion/bolt/MyWindowBolt.java
+++ /dev/null
@@ -1,102 +0,0 @@
-package cn.ac.iie.origion.bolt;
-
-import cn.ac.iie.origion.bean.KeyBean;
-import cn.ac.iie.origion.bean.ValueBean;
-import com.alibaba.fastjson.JSONObject;
-import org.apache.storm.task.OutputCollector;
-import org.apache.storm.task.TopologyContext;
-import org.apache.storm.topology.OutputFieldsDeclarer;
-import org.apache.storm.topology.base.BaseWindowedBolt;
-import org.apache.storm.tuple.Fields;
-import org.apache.storm.tuple.Tuple;
-import org.apache.storm.tuple.Values;
-import org.apache.storm.windowing.TupleWindow;
-
-import java.util.HashMap;
-import java.util.Map;
-
-/**
- * @ClassName MyWindowBolt
- * @Author [email protected]
- * @Date 2020/6/9 14:45
- * @Version V1.0
- **/
-public class MyWindowBolt extends BaseWindowedBolt {
-
-
- private static OutputCollector collector;
- private HashMap<String, ValueBean> map;
- private String message;
-
-
- @Override
-
- public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
- this.collector = collector;
- map = new HashMap<>(16);
- }
-
- @Override
- public void declareOutputFields(OutputFieldsDeclarer declarer) {
-
- declarer.declare(new Fields("key","value"));
-// declarer.declare(new Fields("map"));
- }
-
- @Override
- public Map<String, Object> getComponentConfiguration() {
- return super.getComponentConfiguration();
- }
-
- @Override
- public void execute(TupleWindow inputWindow) {
- //TODO Clear the data aggregated in the previous window
- map.clear();
-
- //TODO Iterate over the tuples in this window
- for (Tuple tuple : inputWindow.getNew()) {
-
- //TODO Parse one Tuple into a String
- message = tuple.getStringByField("source");
-
- //TODO Extract the value bean from the Tuple
- ValueBean valueBean = JSONObject.parseObject(message, ValueBean.class);
- //TODO Extract the key String from the Tuple
- String key = JSONObject.toJSONString(JSONObject.parseObject(message, KeyBean.class));
- //TODO Look up the value bean in the map
- ValueBean mapValueBean = map.getOrDefault(key, new ValueBean());
- //TODO Add the tuple's value onto the map's value
- mapValueBean = addValueBean(mapValueBean, valueBean);
-
- //TODO Put the accumulated result back into the map
- map.put(key, mapValueBean);
- }
-
- //TODO Iterate over the map and emit each K/V pair
- for (String key : map.keySet()) {
- collector.emit(new Values(key,map.get(key)));
- }
-// collector.emit(new Values(map));
-
- }
-
- @SuppressWarnings("all")
- /**
- * Adds the corresponding fields of two ValueBeans.
- * @param result the accumulating bean
- * @param value2 the bean whose fields are added to result
- * @return result, with each counter field summed
- */
-
- public ValueBean addValueBean(ValueBean result, ValueBean value2) {
-
- result.setCommon_s2c_byte_num(result.getCommon_s2c_byte_num() + value2.getCommon_s2c_byte_num());
- result.setCommon_c2s_byte_num(result.getCommon_c2s_byte_num() + value2.getCommon_c2s_byte_num());
- result.setCommon_s2c_pkt_num(result.getCommon_s2c_pkt_num() + value2.getCommon_s2c_pkt_num());
- result.setCommon_c2s_pkt_num(result.getCommon_c2s_pkt_num() + value2.getCommon_c2s_pkt_num());
- result.setCommon_sessions(result.getCommon_sessions() + 1L);
- return result;
- }
-
-
-}
diff --git a/src/main/java/cn/ac/iie/origion/bolt/NtcLogSendBolt.java b/src/main/java/cn/ac/iie/origion/bolt/NtcLogSendBolt.java
deleted file mode 100644
index 40a2070..0000000
--- a/src/main/java/cn/ac/iie/origion/bolt/NtcLogSendBolt.java
+++ /dev/null
@@ -1,87 +0,0 @@
-package cn.ac.iie.origion.bolt;
-
-
-import cn.ac.iie.origion.bean.ConnectionRecordLog;
-import cn.ac.iie.origion.bean.ValueBean;
-import cn.ac.iie.origion.utils.FlowWriteConfig;
-import cn.ac.iie.origion.utils.KafkaLogNtc;
-import com.alibaba.fastjson.JSONObject;
-import org.apache.log4j.Logger;
-import org.apache.storm.task.TopologyContext;
-import org.apache.storm.topology.BasicOutputCollector;
-import org.apache.storm.topology.OutputFieldsDeclarer;
-import org.apache.storm.topology.base.BaseBasicBolt;
-import org.apache.storm.tuple.Tuple;
-
-import java.util.HashMap;
-import java.util.Map;
-
-/**
- * Bolt that sends results to Kafka
- */
-public class NtcLogSendBolt extends BaseBasicBolt {
-
-
- private static final long serialVersionUID = 3237813470939823159L;
- private static Logger logger = Logger.getLogger(NtcLogSendBolt.class);
- private KafkaLogNtc kafkaLogNtc;
-
- private JSONObject key;
- private ValueBean valueBean;
- private ConnectionRecordLog connectionRecordLog;
-
-
- @Override
- public void prepare(Map stormConf, TopologyContext context) {
- kafkaLogNtc = KafkaLogNtc.getInstance();
- connectionRecordLog = new ConnectionRecordLog();
- key = new JSONObject();
- valueBean = new ValueBean();
- }
-
- @Override
- public void execute(Tuple tuple, BasicOutputCollector basicOutputCollector) {
-
-
-// System.out.println(this.getClass() + " 获取的tuple的sessions " + (tuple.getValue(0)) + ((ValueBean)tuple.getValue(1)).getCommon_sessions());
- try {
- key = JSONObject.parseObject(tuple.getValue(0).toString());
-
- valueBean = (ValueBean) (tuple.getValue(1));
-
- connectionRecordLog.setCommon_recv_time(System.currentTimeMillis());
- connectionRecordLog.setCommon_policy_id(Integer.parseInt(key.getString("common_policy_id")));
- connectionRecordLog.setCommon_action(Integer.parseInt(key.getString("common_action")));
- connectionRecordLog.setCommon_sub_action(key.getString("common_sub_action"));
- connectionRecordLog.setCommon_client_ip(key.getString("common_client_ip"));
- connectionRecordLog.setCommon_client_location(key.getString("common_client_location"));
- connectionRecordLog.setCommon_sled_ip(key.getString("common_sled_ip"));
- connectionRecordLog.setCommon_device_id(key.getString("common_device_id"));
- connectionRecordLog.setCommon_subscriber_id(key.getString("common_subscriber_id"));
- connectionRecordLog.setCommon_server_ip(key.getString("common_server_ip"));
- connectionRecordLog.setCommon_server_location(key.getString("common_server_location"));
- connectionRecordLog.setCommon_server_port(Integer.parseInt(key.getString("common_server_port")));
- connectionRecordLog.setCommon_l4_protocol(key.getString("common_l4_protocol"));
- connectionRecordLog.setHttp_domain(key.getString("http_domain"));
- connectionRecordLog.setSsl_sni(key.getString("ssl_sni"));
-
- //TODO Populate the value fields
-
- connectionRecordLog.setCommon_sessions(valueBean.getCommon_sessions());
- connectionRecordLog.setCommon_c2s_pkt_num(valueBean.getCommon_c2s_pkt_num());
- connectionRecordLog.setCommon_s2c_pkt_num(valueBean.getCommon_s2c_pkt_num());
- connectionRecordLog.setCommon_c2s_byte_num(valueBean.getCommon_c2s_byte_num());
- connectionRecordLog.setCommon_s2c_byte_num(valueBean.getCommon_s2c_byte_num());
-
- kafkaLogNtc.sendMessage(JSONObject.toJSONString(connectionRecordLog));
- } catch (Exception e) {
- logger.error(FlowWriteConfig.KAFKA_TOPIC + "日志发送Kafka过程出现异常");
- e.printStackTrace();
- }
- }
-
- @Override
- public void declareOutputFields(OutputFieldsDeclarer declarer) {
-
- }
-}
diff --git a/src/main/java/cn/ac/iie/origion/bolt/PrintBolt.java b/src/main/java/cn/ac/iie/origion/bolt/PrintBolt.java
deleted file mode 100644
index 4215d0e..0000000
--- a/src/main/java/cn/ac/iie/origion/bolt/PrintBolt.java
+++ /dev/null
@@ -1,56 +0,0 @@
-package cn.ac.iie.origion.bolt;
-
-import cn.ac.iie.origion.bean.ValueBean;
-
-import cn.ac.iie.origion.utils.FlowWriteConfig;
-import com.alibaba.fastjson.JSONObject;
-import org.apache.log4j.Logger;
-import org.apache.storm.task.TopologyContext;
-import org.apache.storm.topology.BasicOutputCollector;
-import org.apache.storm.topology.OutputFieldsDeclarer;
-import org.apache.storm.topology.base.BaseBasicBolt;
-import org.apache.storm.tuple.Tuple;
-
-import java.util.HashMap;
-import java.util.Map;
-
-/**
- * @ClassName PrintBolt
- * @Author [email protected]
- * @Date 2020/6/9 13:53
- * @Version V1.0
- **/
-public class PrintBolt extends BaseBasicBolt {
-
- private static final long serialVersionUID = -3663610927224396615L;
- private static Logger logger = Logger.getLogger(NtcLogSendBolt.class);
-
-
- @Override
- public void prepare(Map stormConf, TopologyContext context) {
- }
-
- @SuppressWarnings("all")
- @Override
- public void execute(Tuple tuple, BasicOutputCollector basicOutputCollector) {
- try {
- HashMap<String,ValueBean> hashMap = (HashMap) tuple.getValue(0);
-
- if (hashMap.size() != 0) {
- for (String key : hashMap.keySet()) {
- System.out.println(key);
- System.out.println(JSONObject.toJSONString(hashMap.get(key)));
- }
-
- }
- } catch (Exception e) {
- logger.error(FlowWriteConfig.KAFKA_TOPIC + "日志发送Kafka过程出现异常");
- e.printStackTrace();
- }
- }
-
-
- @Override
- public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
- }
-}
diff --git a/src/main/java/cn/ac/iie/origion/utils/AggregateUtil.java b/src/main/java/cn/ac/iie/origion/utils/AggregateUtil.java
deleted file mode 100644
index e7d7a35..0000000
--- a/src/main/java/cn/ac/iie/origion/utils/AggregateUtil.java
+++ /dev/null
@@ -1,65 +0,0 @@
-package cn.ac.iie.origion.utils;
-
-
-import cn.ac.iie.origion.bean.KeyBean;
-import cn.ac.iie.origion.bean.ValueBean;
-import com.alibaba.fastjson.JSONObject;
-
-import java.util.Map;
-
-/**
- * @ClassName AggregateUtil
- * @Author [email protected]
- * @Date 2020/6/9 11:44
- * @Version V1.0
- **/
-public class AggregateUtil {
-
-
- private static ValueBean valueBean = new ValueBean();
- private static KeyBean keyBean = new KeyBean();
-
- /**
- * Receives a JSON string and parses it into beans.
- * @param message the raw JSON log line
- * @return the original message, unchanged
- */
- public static String aggregate(String message){
-
-
-
- //TODO Parse the tuple's input content into a map
- Map map = JSONObject.parseObject(message);
-
-
- //TODO KEY
-
- keyBean.setCommon_policy_id(Integer.parseInt(map.getOrDefault("common_policy_id","0").toString()));
- keyBean.setCommon_action(Integer.parseInt(map.getOrDefault("common_action","0").toString()));
- keyBean.setCommon_sub_action(map.getOrDefault("common_sub_action","").toString());
- keyBean.setCommon_client_ip(map.getOrDefault("common_client_ip","").toString());
- keyBean.setCommon_client_location(map.getOrDefault("common_client_location","").toString());
- keyBean.setCommon_sled_ip(map.getOrDefault("common_sled_ip","").toString());
- keyBean.setCommon_device_id(map.getOrDefault("common_device_id","").toString());
- keyBean.setCommon_subscriber_id(map.getOrDefault("common_subscriber_id","").toString());
- keyBean.setCommon_server_ip(map.getOrDefault("common_server_ip","").toString());
- keyBean.setCommon_server_location(map.getOrDefault("common_server_location","").toString());
- keyBean.setCommon_server_port(Integer.parseInt(map.getOrDefault("common_server_port","0" ).toString()));
- keyBean.setCommon_l4_protocol(map.getOrDefault("common_l4_protocol","").toString());
- keyBean.setHttp_domain(map.getOrDefault("http_domain","").toString());
- keyBean.setSsl_sni(map.getOrDefault("ssl_sni","").toString());
-
- //TODO VALUE
-
-
- valueBean.setCommon_c2s_pkt_num(Long.parseLong(map.getOrDefault("common_c2s_pkt_num",0).toString()));
- valueBean.setCommon_s2c_pkt_num(Long.parseLong(map.getOrDefault("common_s2c_pkt_num",0).toString()));
- valueBean.setCommon_c2s_byte_num(Long.parseLong(map.getOrDefault("common_c2s_byte_num",0).toString()));
- valueBean.setCommon_s2c_byte_num(Long.parseLong(map.getOrDefault("common_s2c_byte_num",0).toString()));
- valueBean.setCommon_sessions(Long.parseLong(map.getOrDefault("common_sessions",0).toString()));
-
-
-
- return message;
- }
-}
diff --git a/src/main/java/cn/ac/iie/origion/utils/FlowWriteConfig.java b/src/main/java/cn/ac/iie/origion/utils/FlowWriteConfig.java
deleted file mode 100644
index 16a069c..0000000
--- a/src/main/java/cn/ac/iie/origion/utils/FlowWriteConfig.java
+++ /dev/null
@@ -1,48 +0,0 @@
-package cn.ac.iie.origion.utils;
-
-
-/**
- * @author Administrator
- */
-public class FlowWriteConfig {
-
- public static final int IPV4_TYPE = 1;
- public static final int IPV6_TYPE = 2;
- public static final String FORMAT_SPLITTER = ",";
- /**
- * System
- */
- public static final Integer SPOUT_PARALLELISM = FlowWriteConfigurations.getIntProperty(0, "spout.parallelism");
- public static final Integer DATACENTER_BOLT_PARALLELISM = FlowWriteConfigurations.getIntProperty(0, "datacenter.bolt.parallelism");
- public static final Integer TOPOLOGY_WORKERS = FlowWriteConfigurations.getIntProperty(0, "topology.workers");
- public static final Integer KAFKA_BOLT_PARALLELISM = FlowWriteConfigurations.getIntProperty(0, "kafka.bolt.parallelism");
- public static final Integer TOPOLOGY_NUM_ACKS = FlowWriteConfigurations.getIntProperty(0, "topology.num.acks");
- public static final Integer TOPOLOGY_SPOUT_SLEEP_TIME = FlowWriteConfigurations.getIntProperty(0, "topology.spout.sleep.time");
- public static final Integer BATCH_INSERT_NUM = FlowWriteConfigurations.getIntProperty(0, "batch.insert.num");
- public static final Integer DATA_CENTER_ID_NUM = FlowWriteConfigurations.getIntProperty(0, "data.center.id.num");
- public static final Integer MAX_FAILURE_NUM = FlowWriteConfigurations.getIntProperty(0, "max.failure.num");
- public static final String MAIL_DEFAULT_CHARSET = FlowWriteConfigurations.getStringProperty(0, "mail.default.charset");
-
-
- public static final Integer AGG_TIME = FlowWriteConfigurations.getIntProperty(0, "agg.time");
-
-
- /**
- * kafka
- */
- public static final String BOOTSTRAP_SERVERS = FlowWriteConfigurations.getStringProperty(0, "bootstrap.servers");
- public static final String ZOOKEEPER_SERVERS = FlowWriteConfigurations.getStringProperty(0, "zookeeper.servers");
- public static final String HBASE_ZOOKEEPER_SERVERS = FlowWriteConfigurations.getStringProperty(0, "hbase.zookeeper.servers");
- public static final String HBASE_TABLE_NAME = FlowWriteConfigurations.getStringProperty(0, "hbase.table.name");
- public static final String GROUP_ID = FlowWriteConfigurations.getStringProperty(0, "group.id");
- public static final String RESULTS_OUTPUT_TOPIC = FlowWriteConfigurations.getStringProperty(0, "results.output.topic");
- public static final String KAFKA_TOPIC = FlowWriteConfigurations.getStringProperty(0, "kafka.topic");
- public static final String AUTO_OFFSET_RESET = FlowWriteConfigurations.getStringProperty(0, "auto.offset.reset");
- public static final String KAFKA_COMPRESSION_TYPE = FlowWriteConfigurations.getStringProperty(0, "kafka.compression.type");
-
- /**
- * http
- */
- public static final String SCHEMA_HTTP = FlowWriteConfigurations.getStringProperty(0, "schema.http");
-
-} \ No newline at end of file
diff --git a/src/main/java/cn/ac/iie/origion/utils/TupleUtils.java b/src/main/java/cn/ac/iie/origion/utils/TupleUtils.java
deleted file mode 100644
index 6c0d6bc..0000000
--- a/src/main/java/cn/ac/iie/origion/utils/TupleUtils.java
+++ /dev/null
@@ -1,23 +0,0 @@
-package cn.ac.iie.origion.utils;
-
-import org.apache.storm.Constants;
-import org.apache.storm.tuple.Tuple;
-
-/**
- * Detects whether a tuple was emitted by the system.
- *
- * @author Administrator
- */
-public final class TupleUtils {
- /**
- * Checks whether the tuple is a system-emitted tick tuple.
- *
- * @param tuple the tuple to check
- * @return true or false
- */
- public static boolean isTick(Tuple tuple) {
- return tuple != null
- && Constants.SYSTEM_COMPONENT_ID.equals(tuple.getSourceComponent())
- && Constants.SYSTEM_TICK_STREAM_ID.equals(tuple.getSourceStreamId());
- }
-}
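The tick mechanism used by the bolts on this page pairs a per-component Config.TOPOLOGY_TICK_TUPLE_FREQ_SECS setting with an isTick check (the helper above, or org.apache.storm.utils.TupleUtils in the new code below). A minimal sketch of the pattern, assuming a Storm 1.x classpath; the class name TickFlushBolt and the 10-second interval are illustrative only:

import org.apache.storm.Config;
import org.apache.storm.topology.BasicOutputCollector;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseBasicBolt;
import org.apache.storm.tuple.Tuple;
import org.apache.storm.utils.TupleUtils;

import java.util.HashMap;
import java.util.Map;

public class TickFlushBolt extends BaseBasicBolt {

    @Override
    public void execute(Tuple tuple, BasicOutputCollector collector) {
        if (TupleUtils.isTick(tuple)) {
            //tick tuple: flush the accumulated state here
        } else {
            //ordinary tuple: fold it into the accumulated state here
        }
    }

    @Override
    public Map<String, Object> getComponentConfiguration() {
        //ask Storm to deliver one tick tuple to this bolt every 10 seconds
        Map<String, Object> conf = new HashMap<>(16);
        conf.put(Config.TOPOLOGY_TICK_TUPLE_FREQ_SECS, 10);
        return conf;
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
    }
}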
diff --git a/src/main/java/cn/ac/iie/storm/bolt/AggregateBolt.java b/src/main/java/cn/ac/iie/storm/bolt/AggregateBolt.java
new file mode 100644
index 0000000..0c88a7f
--- /dev/null
+++ b/src/main/java/cn/ac/iie/storm/bolt/AggregateBolt.java
@@ -0,0 +1,172 @@
+package cn.ac.iie.storm.bolt;
+
+import cn.ac.iie.storm.utils.combine.AggregateUtils;
+import cn.ac.iie.storm.utils.file.StreamAggregateConfig;
+import cn.ac.iie.storm.utils.http.HttpClientUtil;
+import com.alibaba.fastjson.JSONObject;
+import org.apache.log4j.Logger;
+import org.apache.storm.Config;
+import org.apache.storm.task.TopologyContext;
+import org.apache.storm.topology.BasicOutputCollector;
+import org.apache.storm.topology.OutputFieldsDeclarer;
+import org.apache.storm.topology.base.BaseBasicBolt;
+import org.apache.storm.tuple.Fields;
+import org.apache.storm.tuple.Tuple;
+import org.apache.storm.tuple.Values;
+import org.apache.storm.utils.TupleUtils;
+
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * @ClassName AggregateBolt
+ * @Author [email protected]
+ * @Date 2020/6/24 13:39
+ * @Version V1.0
+ **/
+public class AggregateBolt extends BaseBasicBolt {
+ private final static Logger logger = Logger.getLogger(AggregateBolt.class);
+ private static final long serialVersionUID = -7666031217706448622L;
+ private HashMap<String, JSONObject> metricsMap;
+ private HashMap<String, String[]> actionMap;
+ private HashMap<String, JSONObject> cacheMap;
+ private static String timestamp;
+
+ /**
+ * Executed only once, when the topology initializes.
+ *
+ * @param stormConf topology configuration
+ * @param context topology context
+ */
+ @Override
+ public void prepare(Map stormConf, TopologyContext context) {
+// timestampValue = System.currentTimeMillis() / 1000;
+ String schema = HttpClientUtil.requestByGetMethod(StreamAggregateConfig.SCHEMA_HTTP);
+ timestamp = AggregateUtils.getTimeMetric(schema);
+ cacheMap = new HashMap<>(32);
+
+ // TODO Fetch the contents of the action section
+ actionMap = AggregateUtils.getActionMap(schema);
+ metricsMap = AggregateUtils.getMetircsMap(schema);
+
+ }
+
+ @Override
+ public void execute(Tuple input, BasicOutputCollector collector) {
+ try {
+ if (TupleUtils.isTick(input)) {
+ long timestampValue = System.currentTimeMillis() / 1000;
+ for (String s : cacheMap.keySet()) {
+ JSONObject result = JSONObject.parseObject(s);
+ result.put(timestamp, timestampValue);
+ result.putAll(cacheMap.get(s));
+ collector.emit(new Values(result.toString()));
+ }
+ cacheMap.clear();
+
+ } else {
+ String label = input.getStringByField("label");
+ //all functions for this protocol in action; fall back to Default when absent
+ String[] metrics = actionMap.getOrDefault(label, actionMap.get("Default"));
+
+ String dimensions = input.getStringByField("dimensions");
+ String message = input.getStringByField("message");
+
+ //one record
+ JSONObject event = JSONObject.parseObject(message);
+ //the record's key fields (protocol,device_id,isp)
+ //cached entry for this key; may be absent, in which case a fresh object is created
+ JSONObject cacheMessage = cacheMap.getOrDefault(dimensions, new JSONObject());
+ //TODO Iterate over all the functions, look each one up in the metrics map, and apply it
+ for (String metric : metrics) {
+ String name = metricsMap.get(metric).getString("name");
+ //may be null
+ String fieldName = metricsMap.get(name).getString("fieldName");
+ String nameValue = cacheMessage.getString(name);
+ //field value held in the cache
+ nameValue = (nameValue == null) ? "0" : nameValue;
+
+ String fieldNameValue = event.getString(fieldName);
+ //field value from the record
+ fieldNameValue = (fieldNameValue == null) ? "0" : fieldNameValue;
+
+ //TODO This code must be modified whenever a new function is added
+ functionSet(name, cacheMessage, nameValue, fieldNameValue);
+ }
+ cacheMap.put(dimensions, cacheMessage);
+
+ }
+ } catch (Exception e) {
+ logger.error("计算节点异常,异常信息:" + e);
+ }
+
+ }
+
+ @Override
+ public void declareOutputFields(OutputFieldsDeclarer declarer) {
+ declarer.declare(new Fields("message"));
+ }
+
+ @Override
+ public Map<String, Object> getComponentConfiguration() {
+ Map<String, Object> conf = new HashMap<String, Object>(16);
+ conf.put(Config.TOPOLOGY_TICK_TUPLE_FREQ_SECS, StreamAggregateConfig.AGG_TIME);
+ return conf;
+ }
+
+ /**
+ * Set of functions that operate on the corresponding fields according to the schema description.
+ *
+ * @param name function name
+ * @param cacheMessage result set
+ * @param nameValue current value
+ * @param fieldNameValue value to add
+ */
+ private static void functionSet(String name, JSONObject cacheMessage, String nameValue, String fieldNameValue) {
+ switch (name) {
+ case "sessions":
+ cacheMessage.put(name, AggregateUtils.longSum(Long.parseLong(nameValue), Long.parseLong(fieldNameValue)));
+ break;
+ case "c2s_byte_num":
+ cacheMessage.put(name, AggregateUtils.longSum(Long.parseLong(nameValue), Long.parseLong(fieldNameValue)));
+ break;
+ case "s2c_byte_num":
+ cacheMessage.put(name, AggregateUtils.longSum(Long.parseLong(nameValue), Long.parseLong(fieldNameValue)));
+ break;
+ case "c2s_pkt_num":
+ cacheMessage.put(name, AggregateUtils.longSum(Long.parseLong(nameValue), Long.parseLong(fieldNameValue)));
+ break;
+ case "s2c_pkt_num":
+ cacheMessage.put(name, AggregateUtils.longSum(Long.parseLong(nameValue), Long.parseLong(fieldNameValue)));
+ break;
+ case "c2s_ipfrag_num":
+ cacheMessage.put(name, AggregateUtils.longSum(Long.parseLong(nameValue), Long.parseLong(fieldNameValue)));
+ break;
+ case "s2c_ipfrag_num":
+ cacheMessage.put(name, AggregateUtils.longSum(Long.parseLong(nameValue), Long.parseLong(fieldNameValue)));
+ break;
+ case "s2c_tcp_lostlen":
+ cacheMessage.put(name, AggregateUtils.longSum(Long.parseLong(nameValue), Long.parseLong(fieldNameValue)));
+ break;
+ case "c2s_tcp_lostlen":
+ cacheMessage.put(name, AggregateUtils.longSum(Long.parseLong(nameValue), Long.parseLong(fieldNameValue)));
+ break;
+ case "c2s_tcp_unorder_num":
+ cacheMessage.put(name, AggregateUtils.longSum(Long.parseLong(nameValue), Long.parseLong(fieldNameValue)));
+ break;
+ case "s2c_tcp_unorder_num":
+ cacheMessage.put(name, AggregateUtils.longSum(Long.parseLong(nameValue), Long.parseLong(fieldNameValue)));
+ break;
+
+ case "unique_sip_num":
+ //TODO
+ //cacheMessage.put(name, AggregateUtils.)
+ break;
+ case "unique_cip_num":
+ //TODO
+ break;
+ default:
+ break;
+ }
+ }
+}
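Every populated case in functionSet above reduces to the same longSum call, and the TODO notes that the switch must be edited for each new function. A hypothetical table-driven alternative, with the illustrative names MetricDispatch, SUM_METRICS and applyMetric (not part of this commit), would keep the metric list in data instead:

import java.util.Arrays;
import java.util.HashSet;
import java.util.Set;

final class MetricDispatch {

    //metrics that aggregate as a plain long sum (the same set as the switch above)
    private static final Set<String> SUM_METRICS = new HashSet<>(Arrays.asList(
            "sessions", "c2s_byte_num", "s2c_byte_num", "c2s_pkt_num", "s2c_pkt_num",
            "c2s_ipfrag_num", "s2c_ipfrag_num", "s2c_tcp_lostlen", "c2s_tcp_lostlen",
            "c2s_tcp_unorder_num", "s2c_tcp_unorder_num"));

    //applies the aggregation for a known sum metric; unknown names keep the current value
    static long applyMetric(String name, long current, long incoming) {
        return SUM_METRICS.contains(name) ? current + incoming : current;
    }
}

Adding another sum metric then means adding one string; distinct-count metrics such as unique_sip_num could register their own operator instead of a new case label.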
diff --git a/src/main/java/cn/ac/iie/storm/bolt/ParseKvBolt.java b/src/main/java/cn/ac/iie/storm/bolt/ParseKvBolt.java
new file mode 100644
index 0000000..367d62b
--- /dev/null
+++ b/src/main/java/cn/ac/iie/storm/bolt/ParseKvBolt.java
@@ -0,0 +1,168 @@
+package cn.ac.iie.storm.bolt;
+
+
+import cn.ac.iie.storm.utils.combine.AggregateUtils;
+import cn.ac.iie.storm.utils.file.StreamAggregateConfig;
+import cn.ac.iie.storm.utils.http.HttpClientUtil;
+import com.alibaba.fastjson.JSONArray;
+import com.alibaba.fastjson.JSONObject;
+import com.zdjizhi.utils.StringUtil;
+import org.apache.log4j.Logger;
+import org.apache.storm.task.TopologyContext;
+import org.apache.storm.topology.BasicOutputCollector;
+import org.apache.storm.topology.OutputFieldsDeclarer;
+import org.apache.storm.topology.base.BaseBasicBolt;
+import org.apache.storm.tuple.Fields;
+import org.apache.storm.tuple.Tuple;
+import org.apache.storm.tuple.Values;
+import org.apache.storm.utils.TupleUtils;
+
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.Map;
+
+import static cn.ac.iie.storm.utils.combine.AggregateUtils.transDimensions;
+import static cn.ac.iie.storm.utils.combine.AggregateUtils.updateAppRelation;
+
+/**
+ * @ClassName ParseKvBolt
+ * @Author [email protected]
+ * @Date 2020/6/9 14:45
+ * @Version V1.0
+ **/
+public class ParseKvBolt extends BaseBasicBolt {
+ private final static Logger logger = Logger.getLogger(ParseKvBolt.class);
+ private static final long serialVersionUID = -999382396035310355L;
+ private JSONArray transforms;
+ private JSONArray dimensions;
+ private static HashMap<Long, String> appMap = new HashMap<>(32);
+
+
+ /**
+ * Executed only once, at topology startup, for initialization.
+ *
+ * @param stormConf Map
+ * @param context TopologyContext
+ */
+ @Override
+ public void prepare(Map stormConf, TopologyContext context) {
+ String schema = HttpClientUtil.requestByGetMethod(StreamAggregateConfig.SCHEMA_HTTP);
+ JSONObject jsonObject = JSONObject.parseObject(schema);
+ String data = JSONObject.parseObject(jsonObject.getString("data")).getString("data");
+ //TODO Parse the schema
+ transforms = JSONObject.parseArray(JSONObject.parseObject(data).getString("transforms"));
+
+ //TODO Extract the dimensions
+ dimensions = JSONObject.parseArray(JSONObject.parseObject(data).getString("dimensions"));
+ updateAppRelation(appMap);
+
+ }
+
+ @Override
+ public void execute(Tuple tuple, BasicOutputCollector collector) {
+ try {
+ if (TupleUtils.isTick(tuple)) {
+ updateAppRelation(appMap);
+ } else {
+ //TODO Parse the tuple's message
+ JSONObject message = JSONObject.parseObject(tuple.getStringByField("source"));
+
+ //TODO Build a new JSON object for the dimensions
+ JSONObject dimensionsObj = transDimensions(dimensions, message);
+
+ for (Object transform : transforms) {
+ JSONObject transformObj = JSONObject.parseObject(transform.toString());
+ String function = transformObj.getString("function");
+ String name = transformObj.getString("name");
+ String fieldName = transformObj.getString("fieldName");
+ String parameters = transformObj.getString("parameters");
+
+ switch (function) {
+ case "alignment":
+ if (StringUtil.isNotBlank(parameters)) {
+ if (message.containsKey(fieldName)) {
+ alignmentUtils(parameters, message, name, fieldName);
+ }
+ }
+ break;
+ case "combination":
+ if (StringUtil.isNotBlank(parameters)) {
+ combinationUtils(parameters, message, name, fieldName, dimensionsObj);
+ }
+ break;
+ case "hierarchy":
+ String hierarchyValue = message.getString(fieldName);
+ //TODO Perform the split
+ if (StringUtil.isNotBlank(hierarchyValue) && StringUtil.isNotBlank(parameters)) {
+ String[] hierarchyPars = parameters.split(StreamAggregateConfig.FORMAT_SPLITTER);
+ String[] dimensionsArr = hierarchyValue.split(hierarchyPars[0]);
+ //TODO Recursively assemble tuples and emit them
+ AggregateUtils.reSend(1, dimensionsArr, "", collector, message, dimensionsObj, name);
+ }
+ break;
+ default:
+ //emit the record unchanged
+ collector.emit(new Values(null, null, message.toString()));
+ break;
+ }
+ }
+ }
+ } catch (Exception e) {
+ logger.error("上层解析原始日志/拼接计算日志发送异常,异常信息:" + e);
+ e.printStackTrace();
+ }
+
+ }
+
+ @Override
+ public Map<String, Object> getComponentConfiguration() {
+ Map<String, Object> conf = new HashMap<>(16);
+ conf.put(org.apache.storm.Config.TOPOLOGY_TICK_TUPLE_FREQ_SECS, StreamAggregateConfig.UPDATE_APP_ID_TIME);
+ return conf;
+ }
+
+ @Override
+ public void declareOutputFields(OutputFieldsDeclarer declarer) {
+
+ declarer.declare(new Fields("label", "dimensions", "message"));
+ }
+
+ /**
+ * alignment: ID substitution.
+ *
+ * @param parameters parameter list
+ * @param message raw log
+ * @param name column name in the result log
+ * @param fieldName column name in the raw log
+ */
+ private static void alignmentUtils(String parameters, JSONObject message, String name, String fieldName) {
+ String[] alignmentPars = parameters.split(StreamAggregateConfig.FORMAT_SPLITTER);
+ String data = message.getString(fieldName);
+ System.out.println("alignmentPars=" + Arrays.toString(alignmentPars) + "data=" + data);
+ int subscript = Integer.parseInt(alignmentPars[0]);
+ String[] fieldSplit = data.split(alignmentPars[1]);
+ Long appID = Long.valueOf(fieldSplit[subscript]);
+ int length = fieldSplit[subscript].length();
+ StringBuilder sb = new StringBuilder(data);
+ message.put(name, sb.replace(0, length, appMap.get(appID)));
+ }
+
+ /**
+ * combination: concatenation.
+ *
+ * @param parameters parameter list
+ * @param message raw log
+ * @param name column name in the result log
+ * @param fieldName column name in the raw log
+ * @param dimensionsObj result set
+ */
+ private static void combinationUtils(String parameters, JSONObject message, String name, String fieldName, JSONObject dimensionsObj) {
+ String[] combinationPars = parameters.split(StreamAggregateConfig.FORMAT_SPLITTER);
+ String parameter0Value = message.getString(combinationPars[0]);
+ if (StringUtil.isNotBlank(parameter0Value)) {
+ String combinationValue = parameter0Value + combinationPars[1] + message.getString(fieldName);
+ message.put(fieldName, combinationValue);
+ dimensionsObj.put(name, combinationValue);
+ }
+ }
+}
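The hierarchy branch above splits a field value and hands the segments to AggregateUtils.reSend. Judging from the commented-out version of reSend near the end of this page, the recursion walks the array tail-first and joins segments with "/". A standalone sketch of that behavior, with the illustrative class HierarchyDemo and an emit callback standing in for the Storm collector:

import java.util.function.BiConsumer;

final class HierarchyDemo {

    //mirrors the recursion in AggregateUtils.reSend: consume splitArr from the end,
    //accumulate a "/"-joined path, and emit a (label, path) pair at every step
    static void reSend(int headIndex, String[] splitArr, String initStr,
                       BiConsumer<String, String> emit) {
        if (splitArr.length == headIndex - 1) {
            return;
        }
        String segment = splitArr[splitArr.length - headIndex];
        initStr = "".equals(initStr) ? segment : initStr + "/" + segment;
        emit.accept(segment, initStr);
        reSend(headIndex + 1, splitArr, initStr, emit);
    }

    public static void main(String[] args) {
        //a value such as "www.example.com" split on "." yields three emissions:
        //  (com, com), (example, com/example), (www, com/example/www)
        reSend(1, "www.example.com".split("\\."), "",
                (label, dims) -> System.out.println(label + " -> " + dims));
    }
}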
diff --git a/src/main/java/cn/ac/iie/storm/bolt/ResultSendBolt.java b/src/main/java/cn/ac/iie/storm/bolt/ResultSendBolt.java
new file mode 100644
index 0000000..36e57a6
--- /dev/null
+++ b/src/main/java/cn/ac/iie/storm/bolt/ResultSendBolt.java
@@ -0,0 +1,44 @@
+package cn.ac.iie.storm.bolt;
+
+
+import cn.ac.iie.storm.utils.common.LogSendKafka;
+import cn.ac.iie.storm.utils.file.StreamAggregateConfig;
+import org.apache.log4j.Logger;
+import org.apache.storm.task.TopologyContext;
+import org.apache.storm.topology.BasicOutputCollector;
+import org.apache.storm.topology.OutputFieldsDeclarer;
+import org.apache.storm.topology.base.BaseBasicBolt;
+import org.apache.storm.tuple.Tuple;
+
+import java.util.Map;
+
+/**
+ * Bolt that sends results to Kafka.
+ *
+ * @author qidaijie
+ */
+public class ResultSendBolt extends BaseBasicBolt {
+
+ private static final long serialVersionUID = 3237813470939823159L;
+ private static Logger logger = Logger.getLogger(ResultSendBolt.class);
+ private LogSendKafka logSendKafka;
+
+ @Override
+ public void prepare(Map stormConf, TopologyContext context) {
+ logSendKafka = LogSendKafka.getInstance();
+ }
+
+ @Override
+ public void execute(Tuple tuple, BasicOutputCollector basicOutputCollector) {
+ try {
+ logSendKafka.sendMessage(tuple.getStringByField("message"));
+ } catch (Exception e) {
+ logger.error(StreamAggregateConfig.RESULTS_OUTPUT_TOPIC + " exception while sending logs to Kafka: " + e);
+ }
+ }
+
+ @Override
+ public void declareOutputFields(OutputFieldsDeclarer declarer) {
+
+ }
+}
diff --git a/src/main/java/cn/ac/iie/storm/bolt/change/FilterBolt.java b/src/main/java/cn/ac/iie/storm/bolt/change/FilterBolt.java
new file mode 100644
index 0000000..6093a94
--- /dev/null
+++ b/src/main/java/cn/ac/iie/storm/bolt/change/FilterBolt.java
@@ -0,0 +1,68 @@
+package cn.ac.iie.storm.bolt.change;
+
+import com.alibaba.fastjson.JSONArray;
+import com.alibaba.fastjson.JSONObject;
+import org.apache.storm.topology.BasicOutputCollector;
+import org.apache.storm.topology.OutputFieldsDeclarer;
+import org.apache.storm.topology.base.BaseBasicBolt;
+import org.apache.storm.tuple.Fields;
+import org.apache.storm.tuple.Tuple;
+import org.apache.storm.tuple.Values;
+
+/**
+ * @ClassName FilterBolt
+ * @Author [email protected]
+ * @Date 2020/7/1 12:02
+ * @Version V1.0
+ **/
+public class FilterBolt extends BaseBasicBolt {
+ @Override
+ public void execute(Tuple input, BasicOutputCollector collector) {
+ JSONObject source = JSONObject.parseObject(input.getStringByField("source"));
+ String schema = "";
+
+ String data = JSONObject.parseObject(schema).getString("data");
+
+ String filters = JSONObject.parseObject(data).getString("filters");
+
+ boolean flag = true;
+ String type = JSONObject.parseObject(filters).getString("type");
+ JSONArray fieldsArr = JSONObject.parseArray(JSONObject.parseObject(filters).getString("fields"));
+ if ("and".equals(type)) {
+ for (int i = 0; i < fieldsArr.size(); i++) {
+
+ JSONObject field = JSONObject.parseObject(fieldsArr.get(i).toString());
+ String name = field.getString("fieldName");
+ String fieldType = field.getString("type");
+ Object values = field.get("values");
+
+ Object nameValue = source.get(name);
+
+ System.out.println("nameValue ========" +nameValue);
+
+ if ("not".equals(fieldType)) {
+
+ if (nameValue == values) {
+ //满足过滤条件
+ flag = false;
+ }
+
+ } else if ("in".equals(fieldType)) {
+ if (!values.toString().contains(nameValue.toString())) {
+ //满足过滤条件
+ flag = false;
+ }
+ }
+ }}
+
+
+
+ collector.emit(new Values(source));
+
+ }
+
+ @Override
+ public void declareOutputFields(OutputFieldsDeclarer declarer) {
+ declarer.declare(new Fields("filter"));
+ }
+}
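For reference, the parsing above implies a filters section of roughly the following shape (field names and values here are hypothetical; the real schema is fetched over HTTP at runtime, and the schema string in this bolt is still empty in this commit):

{
  "type": "and",
  "fields": [
    { "fieldName": "common_l4_protocol", "type": "in", "values": "TCP,UDP" },
    { "fieldName": "common_policy_id", "type": "not", "values": 0 }
  ]
}

With type "not" a record is flagged when its value equals values; with "in" it is flagged when its value is absent from values.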
diff --git a/src/main/java/cn/ac/iie/storm/bolt/print/PrintBolt.java b/src/main/java/cn/ac/iie/storm/bolt/print/PrintBolt.java
new file mode 100644
index 0000000..e251f21
--- /dev/null
+++ b/src/main/java/cn/ac/iie/storm/bolt/print/PrintBolt.java
@@ -0,0 +1,40 @@
+package cn.ac.iie.storm.bolt.print;
+
+import org.apache.log4j.Logger;
+import org.apache.storm.topology.BasicOutputCollector;
+import org.apache.storm.topology.OutputFieldsDeclarer;
+import org.apache.storm.topology.base.BaseBasicBolt;
+import org.apache.storm.tuple.Tuple;
+
+/**
+ * @ClassName PrintBolt
+ * @Author [email protected]
+ * @Date 2020/6/28 15:34
+ * @Version V1.0
+ **/
+public class PrintBolt extends BaseBasicBolt {
+ private final static Logger logger = Logger.getLogger(PrintBolt.class);
+ private static long a;
+ private long b;
+ public static long c;
+
+ @Override
+ public void execute(Tuple input, BasicOutputCollector collector) {
+ logger.error("==================================一批数据=========================");
+
+ a= System.currentTimeMillis();
+ b= System.currentTimeMillis();
+ c= System.currentTimeMillis();
+
+
+ logger.error(Thread.currentThread() + "private static long a======:" + a);
+ logger.error(Thread.currentThread() + "private long b======:" + b);
+ logger.error(Thread.currentThread() + "public static long c======:" + c);
+
+ }
+
+ @Override
+ public void declareOutputFields(OutputFieldsDeclarer declarer) {
+
+ }
+}
diff --git a/src/main/java/cn/ac/iie/origion/spout/CustomizedKafkaSpout.java b/src/main/java/cn/ac/iie/storm/spout/CustomizedKafkaSpout.java
index 501f2f6..fd6dfba 100644
--- a/src/main/java/cn/ac/iie/origion/spout/CustomizedKafkaSpout.java
+++ b/src/main/java/cn/ac/iie/storm/spout/CustomizedKafkaSpout.java
@@ -1,7 +1,6 @@
-package cn.ac.iie.origion.spout;
+package cn.ac.iie.storm.spout;
-
-import cn.ac.iie.origion.utils.FlowWriteConfig;
+import cn.ac.iie.storm.utils.file.StreamAggregateConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
@@ -32,12 +31,12 @@ public class CustomizedKafkaSpout extends BaseRichSpout {
private static Properties createConsumerConfig() {
Properties props = new Properties();
- props.put("bootstrap.servers", FlowWriteConfig.BOOTSTRAP_SERVERS);
- props.put("group.id", FlowWriteConfig.GROUP_ID);
+ props.put("bootstrap.servers", StreamAggregateConfig.BOOTSTRAP_SERVERS);
+ props.put("group.id", StreamAggregateConfig.GROUP_ID);
props.put("session.timeout.ms", "60000");
props.put("max.poll.records", 3000);
props.put("max.partition.fetch.bytes", 31457280);
- props.put("auto.offset.reset", FlowWriteConfig.AUTO_OFFSET_RESET);
+ props.put("auto.offset.reset", StreamAggregateConfig.AUTO_OFFSET_RESET);
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
return props;
@@ -50,7 +49,7 @@ public class CustomizedKafkaSpout extends BaseRichSpout {
this.context = context;
Properties prop = createConsumerConfig();
this.consumer = new KafkaConsumer<>(prop);
- this.consumer.subscribe(Collections.singletonList(FlowWriteConfig.KAFKA_TOPIC));
+ this.consumer.subscribe(Collections.singletonList(StreamAggregateConfig.KAFKA_TOPIC));
}
@Override
@@ -63,7 +62,7 @@ public class CustomizedKafkaSpout extends BaseRichSpout {
try {
// TODO Auto-generated method stub
ConsumerRecords<String, String> records = consumer.poll(10000L);
- Thread.sleep(FlowWriteConfig.TOPOLOGY_SPOUT_SLEEP_TIME);
+ Thread.sleep(StreamAggregateConfig.TOPOLOGY_SPOUT_SLEEP_TIME);
for (ConsumerRecord<String, String> record : records) {
this.collector.emit(new Values(record.value()));
}
diff --git a/src/main/java/cn/ac/iie/origion/topology/StormRunner.java b/src/main/java/cn/ac/iie/storm/topology/StormRunner.java
index f4ecbcd..6e77c66 100644
--- a/src/main/java/cn/ac/iie/origion/topology/StormRunner.java
+++ b/src/main/java/cn/ac/iie/storm/topology/StormRunner.java
@@ -1,4 +1,4 @@
-package cn.ac.iie.origion.topology;
+package cn.ac.iie.storm.topology;
import org.apache.storm.Config;
diff --git a/src/main/java/cn/ac/iie/origion/topology/LogFlowWriteTopology.java b/src/main/java/cn/ac/iie/storm/topology/StreamAggregateTopology.java
index c8359b0..ed38bce 100644
--- a/src/main/java/cn/ac/iie/origion/topology/LogFlowWriteTopology.java
+++ b/src/main/java/cn/ac/iie/storm/topology/StreamAggregateTopology.java
@@ -1,51 +1,52 @@
-package cn.ac.iie.origion.topology;
+package cn.ac.iie.storm.topology;
-import cn.ac.iie.origion.bolt.AggregateBolt;
-import cn.ac.iie.origion.bolt.MyWindowBolt;
-import cn.ac.iie.origion.bolt.NtcLogSendBolt;
-import cn.ac.iie.origion.spout.CustomizedKafkaSpout;
-import cn.ac.iie.origion.utils.FlowWriteConfig;
+import cn.ac.iie.storm.bolt.AggregateBolt;
+import cn.ac.iie.storm.bolt.ResultSendBolt;
+import cn.ac.iie.storm.bolt.ParseKvBolt;
+import cn.ac.iie.storm.spout.CustomizedKafkaSpout;
+import cn.ac.iie.storm.utils.file.StreamAggregateConfig;
import org.apache.log4j.Logger;
import org.apache.storm.Config;
import org.apache.storm.generated.AlreadyAliveException;
import org.apache.storm.generated.AuthorizationException;
import org.apache.storm.generated.InvalidTopologyException;
import org.apache.storm.topology.TopologyBuilder;
-import org.apache.storm.topology.base.BaseWindowedBolt;
import org.apache.storm.tuple.Fields;
-import java.util.concurrent.TimeUnit;
-
-
/**
- * Storm program main class
- *
- * @author Administrator
- */
+ * @ClassName StreamAggregateTopology
+ * @Author [email protected]
+ * @Date 2020/6/24 10:14
+ * @Version V1.0
+ **/
+public class StreamAggregateTopology {
-public class LogFlowWriteTopology {
- private static Logger logger = Logger.getLogger(LogFlowWriteTopology.class);
+
+ private static Logger logger = Logger.getLogger(StreamAggregateTopology.class);
private final String topologyName;
private final Config topologyConfig;
private TopologyBuilder builder;
- private LogFlowWriteTopology() {
- this(LogFlowWriteTopology.class.getSimpleName());
+ private StreamAggregateTopology() {
+ this(StreamAggregateTopology.class.getSimpleName());
}
- private LogFlowWriteTopology(String topologyName) {
+ private StreamAggregateTopology(String topologyName) {
this.topologyName = topologyName;
topologyConfig = createTopologConfig();
}
+ /**
+ * Test configuration:
+ * conf.setTopologyWorkerMaxHeapSize(6144);
+ * conf.put(Config.WORKER_CHILDOPTS, "-Xmx4G -Xms2G");
+ */
private Config createTopologConfig() {
Config conf = new Config();
conf.setDebug(false);
conf.setMessageTimeoutSecs(60);
- conf.setMaxSpoutPending(150000);
- conf.setNumAckers(FlowWriteConfig.TOPOLOGY_WORKERS);
-// conf.setTopologyWorkerMaxHeapSize(6144);
- conf.put(Config.WORKER_CHILDOPTS, "-Xmx4G -Xms2G");
+ conf.setMaxSpoutPending(StreamAggregateConfig.SPOUT_PARALLELISM);
+ conf.setNumAckers(StreamAggregateConfig.TOPOLOGY_NUM_ACKS);
return conf;
}
@@ -55,7 +56,7 @@ public class LogFlowWriteTopology {
}
private void runRemotely() throws AlreadyAliveException, InvalidTopologyException, AuthorizationException {
- topologyConfig.setNumWorkers(FlowWriteConfig.TOPOLOGY_WORKERS);
+ topologyConfig.setNumWorkers(StreamAggregateConfig.TOPOLOGY_WORKERS);
//Setting this too high causes many problems, e.g. heartbeat-thread starvation and a sharp drop in throughput
topologyConfig.put(Config.TOPOLOGY_EXECUTOR_RECEIVE_BUFFER_SIZE, 8);
StormRunner.runTopologyRemotely(builder, topologyName, topologyConfig);
@@ -63,26 +64,30 @@ public class LogFlowWriteTopology {
private void buildTopology() {
builder = new TopologyBuilder();
- builder.setSpout("LogFlowWriteSpout", new CustomizedKafkaSpout(), FlowWriteConfig.SPOUT_PARALLELISM);
- builder.setBolt("TEST-CONN", new MyWindowBolt()
- .withWindow(new BaseWindowedBolt.Duration(10, TimeUnit.SECONDS),
- new BaseWindowedBolt.Duration(10, TimeUnit.SECONDS)), FlowWriteConfig.DATACENTER_BOLT_PARALLELISM)
- .localOrShuffleGrouping("LogFlowWriteSpout");
-
- builder.setBolt("AGG-BOLT", new AggregateBolt(), FlowWriteConfig.DATACENTER_BOLT_PARALLELISM).fieldsGrouping("TEST-CONN", new Fields("key"));
- builder.setBolt("KAKFA-CONN", new NtcLogSendBolt(), FlowWriteConfig.KAFKA_BOLT_PARALLELISM).localOrShuffleGrouping("AGG-BOLT");
+ builder.setSpout("CustomizedKafkaSpout", new CustomizedKafkaSpout(), StreamAggregateConfig.SPOUT_PARALLELISM);
+
+ builder.setBolt("ParseKvBolt", new ParseKvBolt(), StreamAggregateConfig.DATACENTER_BOLT_PARALLELISM)
+ .localOrShuffleGrouping("CustomizedKafkaSpout");
+
+ builder.setBolt("AggregateBolt", new AggregateBolt(), StreamAggregateConfig.DATACENTER_BOLT_PARALLELISM)
+ .fieldsGrouping("ParseKvBolt", new Fields("dimensions"));
+
+ builder.setBolt("ResultSendBolt", new ResultSendBolt(), StreamAggregateConfig.KAFKA_BOLT_PARALLELISM)
+ .localOrShuffleGrouping("AggregateBolt");
+
}
public static void main(String[] args) throws Exception {
- LogFlowWriteTopology csst = null;
+ StreamAggregateTopology csst = null;
boolean runLocally = true;
String parameter = "remote";
int size = 2;
if (args.length >= size && parameter.equalsIgnoreCase(args[1])) {
runLocally = false;
- csst = new LogFlowWriteTopology(args[0]);
+ csst = new StreamAggregateTopology(args[0]);
} else {
- csst = new LogFlowWriteTopology();
+ csst = new StreamAggregateTopology();
}
csst.buildTopology();
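Note on the wiring above: fieldsGrouping on "dimensions" guarantees that every tuple carrying the same dimensions string reaches the same AggregateBolt task, which is what makes per-key in-memory aggregation safe. A minimal sketch of an upstream bolt honoring that contract — class and field names are illustrative, not the actual ParseKvBolt from this commit:

    import org.apache.storm.topology.BasicOutputCollector;
    import org.apache.storm.topology.OutputFieldsDeclarer;
    import org.apache.storm.topology.base.BaseBasicBolt;
    import org.apache.storm.tuple.Fields;
    import org.apache.storm.tuple.Tuple;
    import org.apache.storm.tuple.Values;

    // Hypothetical upstream bolt: emits a "dimensions" field for fieldsGrouping to hash on.
    public class DimensionsEmitterSketch extends BaseBasicBolt {
        @Override
        public void execute(Tuple input, BasicOutputCollector collector) {
            String dimensions = input.getStringByField("dimensions"); // grouping key
            String message = input.getStringByField("message");
            collector.emit(new Values(dimensions, message));
        }

        @Override
        public void declareOutputFields(OutputFieldsDeclarer declarer) {
            // Field names must match what the downstream fieldsGrouping references.
            declarer.declare(new Fields("dimensions", "message"));
        }
    }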
diff --git a/src/main/java/cn/ac/iie/storm/utils/combine/AggregateUtils.java b/src/main/java/cn/ac/iie/storm/utils/combine/AggregateUtils.java
new file mode 100644
index 0000000..fec3592
--- /dev/null
+++ b/src/main/java/cn/ac/iie/storm/utils/combine/AggregateUtils.java
@@ -0,0 +1,204 @@
+package cn.ac.iie.storm.utils.combine;
+
+import cn.ac.iie.storm.utils.file.StreamAggregateConfig;
+import cn.ac.iie.storm.utils.http.HttpClientUtil;
+import com.alibaba.fastjson.JSONArray;
+import com.alibaba.fastjson.JSONObject;
+import org.apache.log4j.Logger;
+import org.apache.storm.topology.BasicOutputCollector;
+import org.apache.storm.tuple.Values;
+
+
+import java.util.HashMap;
+
+/**
+ * @ClassName AggregateUtils
+ * @Author [email protected]
+ * @Date 2020/6/23 14:04
+ * @Version V1.0
+ **/
+public class AggregateUtils {
+ private final static Logger logger = Logger.getLogger(AggregateUtils.class);
+
+    /**
+     * Sum two Long values.
+     *
+     * @param value1 first value
+     * @param value2 second value
+     * @return value1 + value2
+     */
+    public static Long longSum(Long value1, Long value2) {
+        return value1 + value2;
+    }
+
+    /**
+     * Increment a count.
+     *
+     * @param count current count value
+     * @return count + 1
+     */
+    public static Long count(Long count) {
+        return count + 1;
+    }
+
+
+    /**
+     * Build a map of metric columns keyed by metric name.
+     *
+     * @param schema dynamically fetched schema
+     * @return metric column map, e.g.
+     * (c2s_byte_len, { "function" : "c2s_byte_sum", "name" : "c2s_byte_len", "fieldName" : "common_c2s_byte_num" })
+     */
+ public static HashMap<String, JSONObject> getMetircsMap(String schema) {
+ HashMap<String, JSONObject> metricsMap = new HashMap<>(16);
+
+ JSONObject jsonObject = JSONObject.parseObject(schema);
+ String data = jsonObject.getString("data");
+ JSONArray metrics = JSONObject.parseArray(JSONObject.parseObject(JSONObject.parseObject(data).getString("data")).getString("metrics"));
+
+ for (Object metric : metrics) {
+ JSONObject json = JSONObject.parseObject(metric.toString());
+ String name = json.getString("name");
+ metricsMap.put(name, json);
+ }
+
+ return metricsMap;
+ }
+
+
+    /**
+     * Recursively emit one tuple per level of a "/"-separated hierarchy,
+     * building the path tail-first (see the trace sketch after this file's diff).
+     *
+     * @param headIndex    1-based offset from the end of splitArr for the segment being processed
+     * @param splitArr     hierarchy segments from splitting the source field
+     * @param initStr      path accumulated so far ("" on the first call)
+     * @param collector    collector used to emit one tuple per level
+     * @param message      original log message
+     * @param dimesionsObj dimensions object updated with the accumulated path
+     * @param name         dimension name under which the path is stored
+     */
+ public static void reSend(int headIndex, String[] splitArr, String initStr, BasicOutputCollector collector, JSONObject message, JSONObject dimesionsObj, String name) {
+
+        // Recursively build the concatenated path
+        if (splitArr.length != headIndex - 1) {
+            // Core of the recursion: extend the path by one tail segment
+ if ("".equals(initStr)) {
+ initStr = splitArr[splitArr.length - headIndex];
+ } else {
+ initStr = initStr + "/" + splitArr[splitArr.length - headIndex];
+ }
+ dimesionsObj.put(name, initStr);
+
+ collector.emit(new Values(splitArr[splitArr.length - headIndex], dimesionsObj.toString(), message.toString()));
+ reSend(headIndex + 1, splitArr, initStr, collector, message, dimesionsObj, name);
+ }
+ }
+
+
+    /**
+     * Build the action map from the schema.
+     *
+     * @param schema dynamically fetched schema
+     * @return map of (action label, metrics array), e.g. (HTTP, metrics array)
+     */
+ public static HashMap<String, String[]> getActionMap(String schema) {
+ JSONObject jsonObject = JSONObject.parseObject(schema);
+ String data = jsonObject.getString("data");
+ JSONArray actions = JSONObject.parseArray(JSONObject.parseObject(JSONObject.parseObject(data).getString("data")).getString("action"));
+ HashMap<String, String[]> map = new HashMap<>(16);
+
+ for (Object action : actions) {
+ JSONObject json = JSONObject.parseObject(action.toString());
+
+ String label = json.getString("label");
+ String[] metrics = json.getString("metrics").split(",");
+
+ map.put(label, metrics);
+ }
+ return map;
+ }
+
+
+    /**
+     * Get the timestamp column name.
+     *
+     * @param schema dynamically fetched schema
+     * @return timestamp column name
+     */
+ public static String getTimeMetric(String schema) {
+ JSONObject jsonObject = JSONObject.parseObject(schema);
+ String data = jsonObject.getString("data");
+
+ return JSONObject.parseObject(JSONObject.parseObject(JSONObject.parseObject(data)
+ .getString("data"))
+ .getString("timestamp"))
+ .getString("name");
+ }
+
+    /**
+     * Refresh the cached app-ID-to-name mapping.
+     *
+     * @param hashMap current cached mapping
+     */
+ public static void updateAppRelation(HashMap<Long, String> hashMap) {
+ try {
+ Long begin = System.currentTimeMillis();
+ String schema = HttpClientUtil.requestByGetMethod(StreamAggregateConfig.APP_ID_HTTP);
+ String data = JSONObject.parseObject(schema).getString("data");
+ JSONArray objects = JSONArray.parseArray(data);
+ for (Object object : objects) {
+ JSONArray jsonArray = JSONArray.parseArray(object.toString());
+ Long key = jsonArray.getLong(0);
+ String value = jsonArray.getString(1);
+                // put() overwrites, so no containsKey/equals check is needed
+                hashMap.put(key, value);
+            }
+            logger.warn("Refreshed cached APP-ID mappings; fetched " + objects.size()
+                    + " entries in " + (System.currentTimeMillis() - begin) + " ms");
+ } catch (Exception e) {
+            logger.error("Failed to refresh cached APP-ID mappings", e);
+ }
+ }
+
+    /**
+     * Resolve the dimensions field set against a log message.
+     *
+     * @param dimensions dimension definitions from the schema
+     * @param message    original log message
+     * @return resolved dimensions object
+     */
+ public static JSONObject transDimensions(JSONArray dimensions, JSONObject message) {
+ JSONObject dimensionsObj = new JSONObject();
+        for (Object dimension : dimensions) {
+            // Parse each dimension definition once instead of twice
+            JSONObject dim = JSONObject.parseObject(dimension.toString());
+            dimensionsObj.put(dim.getString("name"), message.get(dim.getString("fieldName")));
+        }
+ return dimensionsObj;
+ }
+}
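To make the reSend recursion above easier to follow, here is a standalone trace with the emit replaced by a print; the input value is an assumed example, not data from this commit:

    public class ReSendTrace {
        public static void main(String[] args) {
            trace(1, "ETH/IPV4/TCP/HTTP".split("/"), "");
        }

        // Same tail-first expansion as AggregateUtils.reSend, minus the Storm plumbing.
        static void trace(int headIndex, String[] splitArr, String initStr) {
            if (splitArr.length != headIndex - 1) {
                initStr = "".equals(initStr)
                        ? splitArr[splitArr.length - headIndex]
                        : initStr + "/" + splitArr[splitArr.length - headIndex];
                System.out.println(splitArr[splitArr.length - headIndex] + " -> " + initStr);
                trace(headIndex + 1, splitArr, initStr);
            }
        }
    }

It prints HTTP -> HTTP, TCP -> HTTP/TCP, IPV4 -> HTTP/TCP/IPV4, ETH -> HTTP/TCP/IPV4/ETH: one emitted tuple per hierarchy level, with the path built leaf-first.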
diff --git a/src/main/java/cn/ac/iie/origion/utils/KafkaLogNtc.java b/src/main/java/cn/ac/iie/storm/utils/common/LogSendKafka.java
index 89ea53a..378fea6 100644
--- a/src/main/java/cn/ac/iie/origion/utils/KafkaLogNtc.java
+++ b/src/main/java/cn/ac/iie/storm/utils/common/LogSendKafka.java
@@ -1,6 +1,7 @@
-package cn.ac.iie.origion.utils;
+package cn.ac.iie.storm.utils.common;
+import cn.ac.iie.storm.utils.file.StreamAggregateConfig;
import org.apache.kafka.clients.producer.*;
import org.apache.log4j.Logger;
@@ -13,9 +14,9 @@ import java.util.Properties;
* @create 2018-08-13 15:11
*/
-public class KafkaLogNtc {
+public class LogSendKafka {
- private static Logger logger = Logger.getLogger(KafkaLogNtc.class);
+ private static Logger logger = Logger.getLogger(LogSendKafka.class);
/**
     * Kafka producer used to send messages to Kafka
@@ -25,27 +26,27 @@ public class KafkaLogNtc {
/**
     * Kafka producer adapter (singleton) that proxies message sending through the producer
*/
- private static KafkaLogNtc kafkaLogNtc;
+ private static LogSendKafka logSendKafka;
- private KafkaLogNtc() {
+ private LogSendKafka() {
initKafkaProducer();
}
- public static KafkaLogNtc getInstance() {
- if (kafkaLogNtc == null) {
- kafkaLogNtc = new KafkaLogNtc();
+    public static synchronized LogSendKafka getInstance() {
+ if (logSendKafka == null) {
+ logSendKafka = new LogSendKafka();
}
- return kafkaLogNtc;
+ return logSendKafka;
}
public void sendMessage(String message) {
final int[] errorSum = {0};
- kafkaProducer.send(new ProducerRecord<>(FlowWriteConfig.RESULTS_OUTPUT_TOPIC, message), new Callback() {
+ kafkaProducer.send(new ProducerRecord<>(StreamAggregateConfig.RESULTS_OUTPUT_TOPIC, message), new Callback() {
@Override
public void onCompletion(RecordMetadata metadata, Exception exception) {
if (exception != null) {
-                    logger.error("Failed writing to " + FlowWriteConfig.RESULTS_OUTPUT_TOPIC, exception);
+                    logger.error("Failed writing to " + StreamAggregateConfig.RESULTS_OUTPUT_TOPIC, exception);
errorSum[0]++;
}
}
@@ -60,7 +61,7 @@ public class KafkaLogNtc {
*/
private void initKafkaProducer() {
Properties properties = new Properties();
- properties.put("bootstrap.servers", FlowWriteConfig.BOOTSTRAP_SERVERS);
+ properties.put("bootstrap.servers", StreamAggregateConfig.RESULTS_BOOTSTRAP_SERVERS);
properties.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
properties.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
properties.put("acks", "1");
@@ -68,7 +69,7 @@ public class KafkaLogNtc {
properties.put("request.timeout.ms", 30000);
properties.put("batch.size", 262144);
properties.put("buffer.memory", 33554432);
- properties.put("compression.type", FlowWriteConfig.KAFKA_COMPRESSION_TYPE);
+ properties.put("compression.type", StreamAggregateConfig.KAFKA_COMPRESSION_TYPE);
kafkaProducer = new KafkaProducer<>(properties);
}
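For orientation, the presumed call site — ResultSendBolt's body is not shown in this diff, so this is an assumption about usage, not code from the commit:

    // Hypothetical helper: forward a serialized aggregation result through the singleton.
    // sendMessage is asynchronous; failures are logged by the callback shown above.
    public static void forwardResult(String resultJson) {
        LogSendKafka.getInstance().sendMessage(resultJson);
    }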
diff --git a/src/main/java/cn/ac/iie/storm/utils/file/StreamAggregateConfig.java b/src/main/java/cn/ac/iie/storm/utils/file/StreamAggregateConfig.java
new file mode 100644
index 0000000..a768ce1
--- /dev/null
+++ b/src/main/java/cn/ac/iie/storm/utils/file/StreamAggregateConfig.java
@@ -0,0 +1,45 @@
+package cn.ac.iie.storm.utils.file;
+
+
+/**
+ * @author Administrator
+ */
+public class StreamAggregateConfig {
+
+ public static final String FORMAT_SPLITTER = ",";
+ /**
+ * System
+ */
+ public static final Integer SPOUT_PARALLELISM = StreamAggregateConfigurations.getIntProperty(0, "spout.parallelism");
+ public static final Integer DATACENTER_BOLT_PARALLELISM = StreamAggregateConfigurations.getIntProperty(0, "datacenter.bolt.parallelism");
+ public static final Integer TOPOLOGY_WORKERS = StreamAggregateConfigurations.getIntProperty(0, "topology.workers");
+ public static final Integer KAFKA_BOLT_PARALLELISM = StreamAggregateConfigurations.getIntProperty(0, "kafka.bolt.parallelism");
+ public static final Integer TOPOLOGY_NUM_ACKS = StreamAggregateConfigurations.getIntProperty(0, "topology.num.acks");
+ public static final Integer TOPOLOGY_SPOUT_SLEEP_TIME = StreamAggregateConfigurations.getIntProperty(0, "topology.spout.sleep.time");
+ public static final Integer BATCH_INSERT_NUM = StreamAggregateConfigurations.getIntProperty(0, "batch.insert.num");
+ public static final Integer DATA_CENTER_ID_NUM = StreamAggregateConfigurations.getIntProperty(0, "data.center.id.num");
+ public static final Integer MAX_FAILURE_NUM = StreamAggregateConfigurations.getIntProperty(0, "max.failure.num");
+
+
+ public static final Integer AGG_TIME = StreamAggregateConfigurations.getIntProperty(0, "agg.time");
+ public static final Integer UPDATE_APP_ID_TIME = StreamAggregateConfigurations.getIntProperty(0, "update.app.id.time");
+
+
+ /**
+ * kafka
+ */
+ public static final String BOOTSTRAP_SERVERS = StreamAggregateConfigurations.getStringProperty(0, "bootstrap.servers");
+ public static final String RESULTS_BOOTSTRAP_SERVERS = StreamAggregateConfigurations.getStringProperty(0, "results.bootstrap.servers");
+ public static final String GROUP_ID = StreamAggregateConfigurations.getStringProperty(0, "group.id");
+ public static final String RESULTS_OUTPUT_TOPIC = StreamAggregateConfigurations.getStringProperty(0, "results.output.topic");
+ public static final String KAFKA_TOPIC = StreamAggregateConfigurations.getStringProperty(0, "kafka.topic");
+ public static final String AUTO_OFFSET_RESET = StreamAggregateConfigurations.getStringProperty(0, "auto.offset.reset");
+ public static final String KAFKA_COMPRESSION_TYPE = StreamAggregateConfigurations.getStringProperty(0, "kafka.compression.type");
+
+ /**
+ * http
+ */
+ public static final String SCHEMA_HTTP = StreamAggregateConfigurations.getStringProperty(0, "schema.http");
+ public static final String APP_ID_HTTP = StreamAggregateConfigurations.getStringProperty(0, "app.id.http");
+
+} \ No newline at end of file
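These keys are read from service_flow_config.properties (loaded by StreamAggregateConfigurations below). An illustrative fragment of that file; every value here is an assumption for demonstration, not taken from this commit:

    spout.parallelism=4
    datacenter.bolt.parallelism=8
    kafka.bolt.parallelism=4
    topology.workers=4
    topology.num.acks=4
    bootstrap.servers=kafka1:9092,kafka2:9092
    results.output.topic=TRAFFIC-PROTOCOL-STAT-LOG
    kafka.compression.type=snappy
    app.id.http=http://192.168.44.12:9999/open-api/appDicList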
diff --git a/src/main/java/cn/ac/iie/origion/utils/FlowWriteConfigurations.java b/src/main/java/cn/ac/iie/storm/utils/file/StreamAggregateConfigurations.java
index 70c9a6d..03f67c0 100644
--- a/src/main/java/cn/ac/iie/origion/utils/FlowWriteConfigurations.java
+++ b/src/main/java/cn/ac/iie/storm/utils/file/StreamAggregateConfigurations.java
@@ -1,4 +1,4 @@
-package cn.ac.iie.origion.utils;
+package cn.ac.iie.storm.utils.file;
import java.util.Properties;
@@ -7,7 +7,7 @@ import java.util.Properties;
* @author Administrator
*/
-public final class FlowWriteConfigurations {
+public final class StreamAggregateConfigurations {
// private static Properties propCommon = new Properties();
private static Properties propService = new Properties();
@@ -56,7 +56,7 @@ public final class FlowWriteConfigurations {
static {
try {
- propService.load(FlowWriteConfigurations.class.getClassLoader().getResourceAsStream("service_flow_config.properties"));
+ propService.load(StreamAggregateConfigurations.class.getClassLoader().getResourceAsStream("service_flow_config.properties"));
} catch (Exception e) {
// propCommon = null;
propService = null;
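The accessors used throughout StreamAggregateConfig are only partially visible in this diff. A hypothetical sketch of getIntProperty, with the signature inferred from the call sites (first argument as the fallback default):

    public static int getIntProperty(int defaultValue, String key) {
        // propService is the Properties object loaded in the static block above.
        String value = (propService == null) ? null : propService.getProperty(key);
        return (value == null || value.trim().isEmpty())
                ? defaultValue
                : Integer.parseInt(value.trim());
    }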
diff --git a/src/main/java/cn/ac/iie/storm/utils/http/HttpClientUtil.java b/src/main/java/cn/ac/iie/storm/utils/http/HttpClientUtil.java
new file mode 100644
index 0000000..2e00efd
--- /dev/null
+++ b/src/main/java/cn/ac/iie/storm/utils/http/HttpClientUtil.java
@@ -0,0 +1,55 @@
+package cn.ac.iie.storm.utils.http;
+
+import org.apache.http.HttpEntity;
+import org.apache.http.client.methods.CloseableHttpResponse;
+import org.apache.http.client.methods.HttpGet;
+import org.apache.http.impl.client.CloseableHttpClient;
+import org.apache.http.impl.client.HttpClients;
+
+import java.io.BufferedReader;
+import java.io.InputStreamReader;
+import java.nio.charset.StandardCharsets;
+
+/**
+ * Utility for fetching the gateway schema over HTTP.
+ *
+ * @author qidaijie
+ */
+public class HttpClientUtil {
+
+    /**
+     * Request the schema from the gateway.
+     *
+     * @param http gateway URL
+     * @return schema string, or "" if the request fails
+     */
+    public static String requestByGetMethod(String http) {
+        // Initialize up front so a failed request returns "" instead of throwing NPE
+        StringBuilder entityStringBuilder = new StringBuilder();
+        try (CloseableHttpClient httpClient = HttpClients.createDefault();
+             CloseableHttpResponse httpResponse = httpClient.execute(new HttpGet(http))) {
+            HttpEntity entity = httpResponse.getEntity();
+            if (null != entity) {
+                BufferedReader bufferedReader = new BufferedReader(
+                        new InputStreamReader(entity.getContent(), StandardCharsets.UTF_8), 8 * 1024);
+                String line;
+                while ((line = bufferedReader.readLine()) != null) {
+                    entityStringBuilder.append(line);
+                }
+            }
+        } catch (Exception e) {
+            e.printStackTrace();
+        }
+        return entityStringBuilder.toString();
+    }
+
+}
diff --git a/src/main/main.iml b/src/main/main.iml
new file mode 100644
index 0000000..908ad4f
--- /dev/null
+++ b/src/main/main.iml
@@ -0,0 +1,11 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<module type="JAVA_MODULE" version="4">
+ <component name="NewModuleRootManager" inherit-compiler-output="true">
+ <exclude-output />
+ <content url="file://$MODULE_DIR$">
+ <sourceFolder url="file://$MODULE_DIR$/java" isTestSource="false" />
+ </content>
+ <orderEntry type="inheritedJdk" />
+ <orderEntry type="sourceFolder" forTests="false" />
+ </component>
+</module> \ No newline at end of file
diff --git a/src/test/java/com/wp/AppIdTest.java b/src/test/java/com/wp/AppIdTest.java
new file mode 100644
index 0000000..39c9348
--- /dev/null
+++ b/src/test/java/com/wp/AppIdTest.java
@@ -0,0 +1,54 @@
+package com.wp;
+
+import cn.ac.iie.storm.utils.file.StreamAggregateConfig;
+import cn.ac.iie.storm.utils.http.HttpClientUtil;
+import com.alibaba.fastjson.JSONArray;
+import com.alibaba.fastjson.JSONObject;
+import org.junit.Test;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+
+/**
+ * @author qidaijie
+ * @Package com.wp
+ * @date 2020/9/22 15:09
+ */
+public class AppIdTest {
+
+ @Test
+ public void appTest() {
+ //http://192.168.44.12:9999/open-api/appDicList
+ String schema = HttpClientUtil.requestByGetMethod(StreamAggregateConfig.APP_ID_HTTP);
+ JSONObject jsonObject = JSONObject.parseObject(schema);
+ String data = jsonObject.getString("data");
+ HashMap<Long, String> map = new HashMap<>(16);
+ JSONArray objects = JSONArray.parseArray(data);
+ for (Object object : objects) {
+ JSONArray jsonArray = JSONArray.parseArray(object.toString());
+ map.put(jsonArray.getLong(0), jsonArray.getString(1));
+ }
+ System.out.println(map.toString());
+
+ System.out.println(map.size());
+ }
+
+ @Test
+ public void changeApp() {
+ String a = "QQ";
+ String[] alignmentPars = "0,/".split(StreamAggregateConfig.FORMAT_SPLITTER);
+ String data = "100/HTTP";
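+        // alignmentPars = {index of the field to replace, field separator}; the test
+        // swaps the numeric app id prefix for its name: "100/HTTP" -> "QQ/HTTP"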
+ int subscript = Integer.parseInt(alignmentPars[0]);
+ String[] fieldSplit = data.split(alignmentPars[1]);
+        int length = fieldSplit[subscript].length();
+        StringBuilder sb = new StringBuilder(data);
+
+        System.out.println(sb.replace(0, length, a));
+    }
+}
diff --git a/src/test/java/com/wp/AppTest.java b/src/test/java/com/wp/AppTest.java
deleted file mode 100644
index 71ab064..0000000
--- a/src/test/java/com/wp/AppTest.java
+++ /dev/null
@@ -1,51 +0,0 @@
-package com.wp;
-
-
-import org.apache.storm.Config;
-import org.apache.storm.LocalCluster;
-import org.apache.storm.trident.TridentTopology;
-import org.apache.storm.trident.operation.builtin.Debug;
-import org.apache.storm.trident.testing.FixedBatchSpout;
-import org.apache.storm.tuple.Fields;
-import org.apache.storm.tuple.Values;
-
-/**
- * Unit test for simple App.
- */
-public class AppTest{
-
-
- @org.junit.Test
- public void test(){
-
- Config conf = new Config();
-// conf.setDebug(false);
- conf.setMessageTimeoutSecs(60);
- conf.setNumWorkers(1);
-
- FixedBatchSpout spout = new FixedBatchSpout(new Fields("user", "score"), 3,
- new Values("nickt1", 4),
- new Values("nickt2", 7),
- new Values("nickt3", 8),
- new Values("nickt4", 9),
- new Values("nickt5", 7),
- new Values("nickt6", 11),
- new Values("nickt7", 5)
- );
- spout.setCycle(true);
- TridentTopology topology = new TridentTopology();
- topology.newStream("spout1", spout)
- .batchGlobal()
- .each(new Fields("user"),new Debug("print:"))
- .parallelismHint(5);
-
- LocalCluster cluster = new LocalCluster();
-
- cluster.submitTopology("trident-function", conf, topology.build());
-
- }
-
-}
-
-
-
diff --git a/src/test/java/com/wp/FilterBolt.java b/src/test/java/com/wp/FilterBolt.java
new file mode 100644
index 0000000..a3a9ee7
--- /dev/null
+++ b/src/test/java/com/wp/FilterBolt.java
@@ -0,0 +1,133 @@
+package com.wp;
+
+import com.alibaba.fastjson.JSONArray;
+import com.alibaba.fastjson.JSONObject;
+
+import java.util.Objects;
+
+/**
+ * @ClassName FilterBolt
+ * @Author [email protected]
+ * @Date 2020/7/1 14:53
+ * @Version V1.0
+ **/
+public class FilterBolt {
+ @SuppressWarnings("all")
+ public static void main(String[] args) {
+ JSONObject source = new JSONObject();
+
+
+ String schema = "{\n" +
+ " \"task\": \"Application-Protocol-Distribution\",\n" +
+ " \"in\": \"CONNECTION-SKETCH-COMPLETED\",\n" +
+ " \"out\": \"TRAFFIC-PROTOCOL-STAT-LOG\",\n" +
+ " \"data\": {\n" +
+ " \"timestamp\": {\n" +
+ " \"name\": \"stat_time\"\n" +
+ " },\n" +
+ " \"dimensions\": [\n" +
+ " {\n" +
+ " \"name\": \"protocol_id\",\n" +
+ " \"fieldName\": \"common_protocol_label\",\n" +
+ " \"type\": \"String\"\n" +
+ " },\n" +
+ " {\n" +
+ " \"name\": \"device_id\",\n" +
+ " \"fieldName\": \"common_device_id\",\n" +
+ " \"type\": \"String\"\n" +
+ " },\n" +
+ " {\n" +
+ " \"name\": \"isp\",\n" +
+ " \"fieldName\": \"common_isp\",\n" +
+ " \"type\": \"String\"\n" +
+ " }\n" +
+ " ],\n" +
+ " \"metrics\": [\n" +
+ " { \"function\" : \"sessions_count\", \"name\" : \"sessions\"},\n" +
+ " { \"function\" : \"c2s_byte_sum\", \"name\" : \"c2s_byte_len\", \"fieldName\" : \"common_c2s_byte_num\" },\n" +
+ " { \"function\" : \"s2c_byte_sum\", \"name\" : \"s2c_byte_len\", \"fieldName\" : \"common_s2c_byte_num\" },\n" +
+ " { \"function\" : \"c2s_pkt_sum\", \"name\" : \"c2s_pkt_num\", \"fieldName\" : \"common_c2s_pkt_num\" },\n" +
+ " { \"function\" : \"s2c_pkt_sum\", \"name\" : \"s2c_pkt_num\", \"fieldName\" : \"common_s2c_pkt_num\" },\n" +
+ " { \"function\" : \"sip_disCount\", \"name\" : \"unique_sip_num\", \"fieldName\" : \"common_server_ip\" },\n" +
+ " { \"function\" : \"cip_disCount\", \"name\" : \"unique_cip_num\", \"fieldName\" : \"common_client_ip\" }\n" +
+ " ],\n" +
+ " \"filters\": {\n" +
+ " \"type\": \"and\",\n" +
+ " \"fields\": [\n" +
+ " {\n" +
+ " \"fieldName\": \"common_device_id\",\n" +
+ " \"type\": \"not\",\n" +
+ " \"values\": null\n" +
+ " },\n" +
+ " {\n" +
+ " \"fieldName\": \"common_protocol_label\",\n" +
+ " \"type\": \"not\",\n" +
+ " \"values\": null\n" +
+ " },\n" +
+ " {\n" +
+ " \"fieldName\": \"common_isp\",\n" +
+ " \"type\": \"not\",\n" +
+ " \"values\": null\n" +
+ " }\n" +
+ " ]\n" +
+ " },\n" +
+ " \"transforms\":[\n" +
+ " {\"function\":\"combination\",\"name\":\"protocol_id\",\"fieldName\":\"common_protocol_label\",\"parameters\": \"common_app_label,/\"},\n" +
+ " {\"function\":\"hierarchy\",\"name\":\"protocol_id\",\"fieldName\":\"common_protocol_label\",\"parameters\": \"/\"}\n" +
+ " ],\n" +
+ " \"action\":[\n" +
+ " {\"label\": \"Default\", \"metrics\": \"sessions,c2s_byte_len,s2c_byte_len,c2s_pkt_num,s2c_pkt_num\"},\n" +
+ " {\"label\": \"HTTP\", \"metrics\": \"sessions,c2s_byte_len,s2c_byte_len,c2s_pkt_num,s2c_pkt_num,unique_sip_num,unique_cip_num\"}\n" +
+ " ],\n" +
+ " \"granularity\":{\n" +
+ " \"type\": \"period\",\n" +
+ " \"period\": \"5M\"\n" +
+ " }\n" +
+ " }\n" +
+ "}";
+
+
+ source.put("common_protocol_label", "HTTP");
+ source.put("common_isp", "Unicom");
+ source.put("common_device_id", "1");
+ String data = JSONObject.parseObject(schema).getString("data");
+
+ String filters = JSONObject.parseObject(data).getString("filters");
+
+ boolean flag = true;
+ String type = JSONObject.parseObject(filters).getString("type");
+ JSONArray fieldsArr = JSONObject.parseArray(JSONObject.parseObject(filters).getString("fields"));
+ if ("and".equals(type)) {
+ for (int i = 0; i < fieldsArr.size(); i++) {
+
+ JSONObject field = JSONObject.parseObject(fieldsArr.get(i).toString());
+ String name = field.getString("fieldName");
+ String fieldType = field.getString("type");
+ Object values = field.get("values");
+
+ Object nameValue = source.get(name);
+
+
+ if ("not".equals(fieldType)) {
+
+ if (nameValue == values) {
+ //满足过滤条件
+ flag = false;
+ }
+
+ } else if ("in".equals(fieldType)) {
+ if (!values.toString().contains(nameValue.toString())) {
+                    // Filter condition matched: drop this message
+ flag = false;
+ }
+ }
+ }
+
+            if (flag) {
+                System.out.println("Emit to the next bolt");
+            } else {
+                System.out.println("This message was filtered out");
+            }
+
+ }
+ }
+}
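With the sample source above (all three filtered fields non-null), none of the "not" conditions match, flag stays true, and the program takes the emit branch.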
diff --git a/src/test/java/com/wp/SchemaTest.java b/src/test/java/com/wp/SchemaTest.java
new file mode 100644
index 0000000..f275592
--- /dev/null
+++ b/src/test/java/com/wp/SchemaTest.java
@@ -0,0 +1,43 @@
+package com.wp;
+
+import com.alibaba.fastjson.JSONObject;
+import com.zdjizhi.utils.StringUtil;
+
+/**
+ * @ClassName SchemaTest
+ * @Author [email protected]
+ * @Date 2020/6/28 10:11
+ * @Version V1.0
+ **/
+public class SchemaTest {
+
+ static String str = "";
+
+ public static void main(String[] args) {
+
+
+ String str1 = null;
+ String str2 = " ";
+
+
+ System.out.println(StringUtil.isNotBlank(str1));
+ System.out.println(StringUtil.isNotEmpty(str1));
+
+ System.out.println(StringUtil.isNotBlank(str2));
+ System.out.println(StringUtil.isNotEmpty(str2));
+
+ }
+
+    public static void reAdd(int m, String[] split, String str) {
+        // Recursively concatenate the segments, head-first
+        if (0 != m) {
+            // Core of the recursion
+            str = str + split[m - 1] + "/";
+            reAdd(m - 1, split, str);
+        }
+    }
+}
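Note that reAdd discards its work: Java passes references by value and String is immutable, so the reassigned str never reaches the caller. A returning variant, as a sketch:

    // Sketch: the same head-first traversal, but the accumulated path is returned.
    static String reAddReturning(int m, String[] split, String str) {
        if (m == 0) {
            return str;
        }
        return reAddReturning(m - 1, split, str + split[m - 1] + "/");
    }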
diff --git a/src/test/test.iml b/src/test/test.iml
new file mode 100644
index 0000000..5ebc6f4
--- /dev/null
+++ b/src/test/test.iml
@@ -0,0 +1,12 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<module type="JAVA_MODULE" version="4">
+ <component name="NewModuleRootManager" inherit-compiler-output="true">
+ <exclude-output />
+ <content url="file://$MODULE_DIR$">
+ <sourceFolder url="file://$MODULE_DIR$/java" isTestSource="true" />
+ </content>
+ <orderEntry type="inheritedJdk" />
+ <orderEntry type="sourceFolder" forTests="false" />
+ <orderEntry type="module" module-name="main" />
+ </component>
+</module> \ No newline at end of file