author    lee <[email protected]>  2020-06-10 17:58:31 +0800
committer lee <[email protected]>  2020-06-10 17:58:31 +0800
commit    cbf8c0ca39764a68726f710c75f45c7623fc641a (patch)
tree      c72bfd9a012fcd194bc7f32c1bdf7582430d80ee
parent    cb4ee7544e1c75e15fcc45e800e047ec269816db (diff)
Initial version of pre-aggregation implemented with windows in the plain (non-Trident) form
-rw-r--r--  dependency-reduced-pom.xml  2
-rw-r--r--  pom.xml  2
-rw-r--r--  src/main/java/cn/ac/iie/origion/bean/ConnectionRecordLog.java (renamed from src/main/java/cn/ac/iie/trident/aggregate/bean/ConnectionRecordLog.java)  2
-rw-r--r--  src/main/java/cn/ac/iie/origion/bean/KeyBean.java (renamed from src/main/java/cn/ac/iie/trident/aggregate/bean/KeyBean.java)  2
-rw-r--r--  src/main/java/cn/ac/iie/origion/bean/ValueBean.java (renamed from src/main/java/cn/ac/iie/trident/aggregate/bean/ValueBean.java)  2
-rw-r--r--  src/main/java/cn/ac/iie/origion/bolt/AggregateBolt.java  105
-rw-r--r--  src/main/java/cn/ac/iie/origion/bolt/MyWindowBolt.java  102
-rw-r--r--  src/main/java/cn/ac/iie/origion/bolt/NtcLogSendBolt.java  85
-rw-r--r--  src/main/java/cn/ac/iie/origion/bolt/PrintBolt.java  56
-rw-r--r--  src/main/java/cn/ac/iie/origion/spout/CustomizedKafkaSpout.java  81
-rw-r--r--  src/main/java/cn/ac/iie/origion/topology/LogFlowWriteTopology.java  95
-rw-r--r--  src/main/java/cn/ac/iie/origion/topology/StormRunner.java (renamed from src/main/java/cn/ac/iie/trident/aggregate/topology/StormRunner.java)  2
-rw-r--r--  src/main/java/cn/ac/iie/origion/utils/AggregateUtil.java (renamed from src/main/java/cn/ac/iie/trident/aggregate/ParseJson2KV.java)  31
-rw-r--r--  src/main/java/cn/ac/iie/origion/utils/FlowWriteConfig.java (renamed from src/main/java/cn/ac/iie/trident/aggregate/utils/FlowWriteConfig.java)  2
-rw-r--r--  src/main/java/cn/ac/iie/origion/utils/FlowWriteConfigurations.java (renamed from src/main/java/cn/ac/iie/trident/aggregate/utils/FlowWriteConfigurations.java)  2
-rw-r--r--  src/main/java/cn/ac/iie/origion/utils/KafkaLogNtc.java (renamed from src/main/java/cn/ac/iie/trident/aggregate/utils/KafkaLogNtc.java)  20
-rw-r--r--  src/main/java/cn/ac/iie/origion/utils/TupleUtils.java  23
-rw-r--r--  src/main/java/cn/ac/iie/trident/aggregate/AggCount.java  98
-rw-r--r--  src/main/java/cn/ac/iie/trident/aggregate/AggregateTopology.java  58
-rw-r--r--  src/main/java/cn/ac/iie/trident/aggregate/bolt/KafkaBolt.java  75
-rw-r--r--  src/main/java/cn/ac/iie/trident/aggregate/spout/TridentKafkaSpout.java  40
-rw-r--r--  src/main/java/cn/ac/iie/trident/aggregate/topology/LogFlowWriteTopology.java  111
-rw-r--r--  src/main/java/cn/ac/iie/trident/log4j.properties  23
-rw-r--r--  src/test/java/com/wp/AppTest.java  8
24 files changed, 581 insertions, 446 deletions
diff --git a/dependency-reduced-pom.xml b/dependency-reduced-pom.xml
index cc9ded2..2c51830 100644
--- a/dependency-reduced-pom.xml
+++ b/dependency-reduced-pom.xml
@@ -34,7 +34,7 @@
<configuration>
<transformers>
<transformer>
- <mainClass>cn.ac.iie.trident.aggregate.topology.LogFlowWriteTopology</mainClass>
+ <mainClass>cn.ac.iie.origion.topology.LogFlowWriteTopology</mainClass>
</transformer>
<transformer>
<resource>META-INF/spring.handlers</resource>
diff --git a/pom.xml b/pom.xml
index 8b12d9e..82aca69 100644
--- a/pom.xml
+++ b/pom.xml
@@ -38,7 +38,7 @@
<transformers>
<transformer
implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
- <mainClass>cn.ac.iie.trident.aggregate.topology.LogFlowWriteTopology</mainClass>
+ <mainClass>cn.ac.iie.origion.topology.LogFlowWriteTopology</mainClass>
</transformer>
<transformer
implementation="org.apache.maven.plugins.shade.resource.AppendingTransformer">
diff --git a/src/main/java/cn/ac/iie/trident/aggregate/bean/ConnectionRecordLog.java b/src/main/java/cn/ac/iie/origion/bean/ConnectionRecordLog.java
index 54fa4f3..06b2135 100644
--- a/src/main/java/cn/ac/iie/trident/aggregate/bean/ConnectionRecordLog.java
+++ b/src/main/java/cn/ac/iie/origion/bean/ConnectionRecordLog.java
@@ -1,4 +1,4 @@
-package cn.ac.iie.trident.aggregate.bean;
+package cn.ac.iie.origion.bean;
/**
* @ClassNameConnectionRecordLog
diff --git a/src/main/java/cn/ac/iie/trident/aggregate/bean/KeyBean.java b/src/main/java/cn/ac/iie/origion/bean/KeyBean.java
index 5515c48..00e9e3c 100644
--- a/src/main/java/cn/ac/iie/trident/aggregate/bean/KeyBean.java
+++ b/src/main/java/cn/ac/iie/origion/bean/KeyBean.java
@@ -1,4 +1,4 @@
-package cn.ac.iie.trident.aggregate.bean;
+package cn.ac.iie.origion.bean;
/**
* @ClassNameKeyBean
diff --git a/src/main/java/cn/ac/iie/trident/aggregate/bean/ValueBean.java b/src/main/java/cn/ac/iie/origion/bean/ValueBean.java
index 8ddcc38..cfe2f37 100644
--- a/src/main/java/cn/ac/iie/trident/aggregate/bean/ValueBean.java
+++ b/src/main/java/cn/ac/iie/origion/bean/ValueBean.java
@@ -1,4 +1,4 @@
-package cn.ac.iie.trident.aggregate.bean;
+package cn.ac.iie.origion.bean;
import java.io.Serializable;
diff --git a/src/main/java/cn/ac/iie/origion/bolt/AggregateBolt.java b/src/main/java/cn/ac/iie/origion/bolt/AggregateBolt.java
new file mode 100644
index 0000000..a656606
--- /dev/null
+++ b/src/main/java/cn/ac/iie/origion/bolt/AggregateBolt.java
@@ -0,0 +1,105 @@
+package cn.ac.iie.origion.bolt;
+
+
+import cn.ac.iie.origion.bean.ValueBean;
+import cn.ac.iie.origion.utils.FlowWriteConfig;
+import cn.ac.iie.origion.utils.TupleUtils;
+
+import com.alibaba.fastjson.JSON;
+import org.apache.log4j.Logger;
+import org.apache.storm.task.TopologyContext;
+import org.apache.storm.topology.BasicOutputCollector;
+import org.apache.storm.topology.OutputFieldsDeclarer;
+import org.apache.storm.topology.base.BaseBasicBolt;
+import org.apache.storm.tuple.Fields;
+import org.apache.storm.tuple.Tuple;
+import org.apache.storm.tuple.Values;
+
+import java.util.HashMap;
+import java.util.Map;
+
+
+public class AggregateBolt extends BaseBasicBolt {
+ private final static Logger logger = Logger.getLogger(AggregateBolt.class);
+ private static final long serialVersionUID = 9006119186526123734L;
+
+ private static HashMap<String, Integer> map = new HashMap<>();
+ private static ValueBean valueBean = new ValueBean();
+ private static ValueBean tupleValueBean = new ValueBean();
+ private static String key = "";
+ private static Integer value = 0;
+ private static String message = "";
+
+
+ @Override
+ public void prepare(Map stormConf, TopologyContext context) {
+ System.out.println("prepare方法执行了++++++++++++++++++++++++++");
+
+ logger.error("prepare方法执行了++++++++++++++++++++++++++");
+ }
+
+ @Override
+ public void execute(Tuple tuple, BasicOutputCollector basicOutputCollector) {
+
+ System.out.println("执行了一次====================================" + value + "==========================" + System.currentTimeMillis());
+ try {
+ if (TupleUtils.isTick(tuple)) {
+ System.out.println(this.map);
+ //send the batch to the next bolt
+ basicOutputCollector.emit(new Values(JSON.toJSONString(this.map)));
+
+
+ } else {
+ message = tuple.getString(0);
+ //TODO extract the key and value from this tuple
+
+ //TODO put the aggregated counts into the HashMap; duplicate keys overwrite the old value
+ this.map.put("192", addValueBean(value, 1));
+ }
+ } catch (Exception e) {
+ logger.error(FlowWriteConfig.KAFKA_TOPIC + "接收/解析过程出现异常");
+ e.printStackTrace();
+ }
+ }
+
+ @Override
+ public Map<String, Object> getComponentConfiguration() {
+ Map<String, Object> conf = new HashMap<String, Object>(16);
+ conf.put(org.apache.storm.Config.TOPOLOGY_TICK_TUPLE_FREQ_SECS, 30);
+ return conf;
+ }
+
+
+ @Override
+ public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
+ outputFieldsDeclarer.declare(new Fields("connLog"));
+ }
+
+ /**
+ * Adds the corresponding fields of two ValueBeans
+ *
+ * @param result the accumulator bean
+ * @param value2 the bean to merge in
+ * @return the accumulated bean
+ */
+
+ @SuppressWarnings("all")
+ public ValueBean addValueBean(ValueBean result, ValueBean value2) {
+
+ result.setCommon_s2c_byte_num(result.getCommon_s2c_byte_num() + value2.getCommon_s2c_byte_num());
+ result.setCommon_c2s_byte_num(result.getCommon_c2s_byte_num() + value2.getCommon_c2s_byte_num());
+ result.setCommon_s2c_pkt_num(result.getCommon_s2c_pkt_num() + value2.getCommon_s2c_pkt_num());
+ result.setCommon_c2s_pkt_num(result.getCommon_c2s_pkt_num() + value2.getCommon_c2s_pkt_num());
+ result.setCommon_sessions(result.getCommon_sessions() + 1L);
+
+
+ return result;
+ }
+
+ public Integer addValueBean(Integer result, Integer value2) {
+
+ return result + value2;
+ }
+
+
+}
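AggregateBolt combines two core-Storm idioms: it requests a tick tuple every 30 seconds via TOPOLOGY_TICK_TUPLE_FREQ_SECS, and flushes its accumulated map downstream whenever the tick is detected (the same detection logic that the new TupleUtils.java below factors out). A minimal self-contained sketch of that pattern, using only core Storm APIs — the class name, field name and 30-second interval here are illustrative, not part of this commit:

import org.apache.storm.Config;
import org.apache.storm.Constants;
import org.apache.storm.topology.BasicOutputCollector;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseBasicBolt;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Tuple;
import org.apache.storm.tuple.Values;

import java.util.HashMap;
import java.util.Map;

public class PeriodicFlushBolt extends BaseBasicBolt {
    private final Map<String, Long> counts = new HashMap<>();

    @Override
    public Map<String, Object> getComponentConfiguration() {
        // Ask Storm to deliver a system tick tuple to this bolt every 30 seconds.
        Map<String, Object> conf = new HashMap<>();
        conf.put(Config.TOPOLOGY_TICK_TUPLE_FREQ_SECS, 30);
        return conf;
    }

    @Override
    public void execute(Tuple tuple, BasicOutputCollector collector) {
        boolean isTick = Constants.SYSTEM_COMPONENT_ID.equals(tuple.getSourceComponent())
                && Constants.SYSTEM_TICK_STREAM_ID.equals(tuple.getSourceStreamId());
        if (isTick) {
            // On every tick, flush a snapshot of the accumulated state and reset it.
            collector.emit(new Values(new HashMap<>(counts)));
            counts.clear();
        } else {
            counts.merge(tuple.getString(0), 1L, Long::sum);
        }
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("counts"));
    }
}

Unlike the static fields in AggregateBolt above, the per-instance map keeps state local to each executor, which avoids sharing mutable state between executor threads in the same worker JVM.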
diff --git a/src/main/java/cn/ac/iie/origion/bolt/MyWindowBolt.java b/src/main/java/cn/ac/iie/origion/bolt/MyWindowBolt.java
new file mode 100644
index 0000000..96151ff
--- /dev/null
+++ b/src/main/java/cn/ac/iie/origion/bolt/MyWindowBolt.java
@@ -0,0 +1,102 @@
+package cn.ac.iie.origion.bolt;
+
+import cn.ac.iie.origion.bean.KeyBean;
+import cn.ac.iie.origion.bean.ValueBean;
+import com.alibaba.fastjson.JSONObject;
+import org.apache.storm.task.OutputCollector;
+import org.apache.storm.task.TopologyContext;
+import org.apache.storm.topology.OutputFieldsDeclarer;
+import org.apache.storm.topology.base.BaseWindowedBolt;
+import org.apache.storm.tuple.Fields;
+import org.apache.storm.tuple.Tuple;
+import org.apache.storm.tuple.Values;
+import org.apache.storm.windowing.TupleWindow;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * @ClassNameMyWindowBolt
+ * @Author [email protected]
+ * @Date2020/6/9 14:45
+ * @Version V1.0
+ **/
+public class MyWindowBolt extends BaseWindowedBolt {
+
+
+ private OutputCollector collector;
+
+ private static ArrayList<Integer> list;
+ private static HashMap<String, ValueBean> map;
+ private static String message;
+
+
+ @Override
+ public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
+ this.collector = collector;
+ map = new HashMap<>(16);
+ list = new ArrayList<>();
+ }
+
+ @Override
+ public void declareOutputFields(OutputFieldsDeclarer declarer) {
+
+ declarer.declare(new Fields("map"));
+ }
+
+ @Override
+ public Map<String, Object> getComponentConfiguration() {
+ return super.getComponentConfiguration();
+ }
+
+ @Override
+ public void execute(TupleWindow inputWindow) {
+ //TODO clear the data aggregated in the previous window
+ map.clear();
+
+ //TODO iterate over the tuples in this window
+ for (Tuple tuple : inputWindow.getNew()) {
+
+ //TODO read the tuple payload as a String
+ message = tuple.getStringByField("source");
+
+ //TODO parse the value bean out of the tuple
+ ValueBean valueBean = JSONObject.parseObject(message, ValueBean.class);
+ //TODO parse the key string out of the tuple
+ String key = JSONObject.toJSONString(JSONObject.parseObject(message, KeyBean.class));
+ //TODO look up the value bean already in the map
+ ValueBean mapValueBean = map.getOrDefault(key, new ValueBean());
+ //TODO add the tuple's value onto the map's value
+ mapValueBean = addValueBean(mapValueBean, valueBean);
+
+ //TODO put the accumulated result back into the map
+ map.put(key, mapValueBean);
+ }
+
+ collector.emit(new Values(map));
+ }
+
+ @SuppressWarnings("all")
+ /**
+ * 将两个ValueBean中的相应的属性相加
+ * @param result
+ * @param value2
+ * @return
+ */
+
+ public ValueBean addValueBean(ValueBean result, ValueBean value2) {
+
+ result.setCommon_s2c_byte_num(result.getCommon_s2c_byte_num() + value2.getCommon_s2c_byte_num());
+ result.setCommon_c2s_byte_num(result.getCommon_c2s_byte_num() + value2.getCommon_c2s_byte_num());
+ result.setCommon_s2c_pkt_num(result.getCommon_s2c_pkt_num() + value2.getCommon_s2c_pkt_num());
+ result.setCommon_c2s_pkt_num(result.getCommon_c2s_pkt_num() + value2.getCommon_c2s_pkt_num());
+ result.setCommon_sessions(result.getCommon_sessions() + 1L);
+
+
+ return result;
+ }
+
+
+}
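The window bolt's grouping hinges on one trick: the same JSON message is parsed twice, once into ValueBean (the counters) and once into KeyBean (the grouping dimensions), and the re-serialized KeyBean string becomes the HashMap key, so records that share dimensions collapse into one entry. A self-contained sketch of the idea, with a hypothetical two-field MiniKey standing in for the real KeyBean:

import com.alibaba.fastjson.JSONObject;

public class KeyExtractionSketch {
    // Hypothetical stand-in for KeyBean: only the grouping dimensions.
    public static class MiniKey {
        private String client_ip;
        private String server_ip;
        public String getClient_ip() { return client_ip; }
        public void setClient_ip(String v) { client_ip = v; }
        public String getServer_ip() { return server_ip; }
        public void setServer_ip(String v) { server_ip = v; }
    }

    public static void main(String[] args) {
        String message = "{\"client_ip\":\"10.0.0.1\",\"server_ip\":\"10.0.0.2\",\"bytes\":42}";
        // fastjson ignores JSON fields the target class lacks, so parsing the
        // full record into the key class silently drops the metric fields.
        MiniKey key = JSONObject.parseObject(message, MiniKey.class);
        // Re-serializing yields only the dimension fields: records with the
        // same dimensions map to the same HashMap key regardless of metrics.
        System.out.println(JSONObject.toJSONString(key));
    }
}

fastjson's default bean serialization order is deterministic for a given class, which is what makes the round-tripped string usable as a grouping key.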
diff --git a/src/main/java/cn/ac/iie/origion/bolt/NtcLogSendBolt.java b/src/main/java/cn/ac/iie/origion/bolt/NtcLogSendBolt.java
new file mode 100644
index 0000000..59d7eef
--- /dev/null
+++ b/src/main/java/cn/ac/iie/origion/bolt/NtcLogSendBolt.java
@@ -0,0 +1,85 @@
+package cn.ac.iie.origion.bolt;
+
+
+import cn.ac.iie.origion.bean.ConnectionRecordLog;
+import cn.ac.iie.origion.bean.ValueBean;
+import cn.ac.iie.origion.utils.FlowWriteConfig;
+import cn.ac.iie.origion.utils.KafkaLogNtc;
+import com.alibaba.fastjson.JSONObject;
+import org.apache.log4j.Logger;
+import org.apache.storm.task.TopologyContext;
+import org.apache.storm.topology.BasicOutputCollector;
+import org.apache.storm.topology.OutputFieldsDeclarer;
+import org.apache.storm.topology.base.BaseBasicBolt;
+import org.apache.storm.tuple.Tuple;
+
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * Bolt that sends aggregated logs to Kafka
+ */
+public class NtcLogSendBolt extends BaseBasicBolt {
+
+
+ private static final long serialVersionUID = 3237813470939823159L;
+ private static Logger logger = Logger.getLogger(NtcLogSendBolt.class);
+ private KafkaLogNtc kafkaLogNtc;
+
+ private static ConnectionRecordLog connectionRecordLog;
+
+
+ @Override
+ public void prepare(Map stormConf, TopologyContext context) {
+ kafkaLogNtc = KafkaLogNtc.getInstance();
+ connectionRecordLog = new ConnectionRecordLog();
+ }
+ @Override
+ public void execute(Tuple tuple, BasicOutputCollector basicOutputCollector) {
+ try {
+ HashMap<String,ValueBean> hashMap = (HashMap) tuple.getValue(0);
+ if (hashMap.size() != 0) {
+ for (String key : hashMap.keySet()) {
+ JSONObject keys = JSONObject.parseObject(key);
+
+ ValueBean valueBean = hashMap.get(key);
+ connectionRecordLog.setCommon_recv_time(System.currentTimeMillis());
+ connectionRecordLog.setCommon_policy_id(Integer.parseInt(keys.getString("common_policy_id")));
+ connectionRecordLog.setCommon_action(Integer.parseInt(keys.getString("common_action")));
+ connectionRecordLog.setCommon_sub_action(keys.getString("common_sub_action"));
+ connectionRecordLog.setCommon_client_ip(keys.getString("common_client_ip"));
+ connectionRecordLog.setCommon_client_location(keys.getString("common_client_location"));
+ connectionRecordLog.setCommon_sled_ip(keys.getString("common_sled_ip"));
+ connectionRecordLog.setCommon_device_id(keys.getString("common_device_id"));
+ connectionRecordLog.setCommon_subscriber_id(keys.getString("common_subscriber_id"));
+ connectionRecordLog.setCommon_server_ip(keys.getString("common_server_ip"));
+ connectionRecordLog.setCommon_server_location(keys.getString("common_server_location"));
+ connectionRecordLog.setCommon_server_port(Integer.parseInt(keys.getString("common_server_port")));
+ connectionRecordLog.setCommon_l4_protocol(keys.getString("common_l4_protocol"));
+ connectionRecordLog.setHttp_domain(keys.getString("http_domain"));
+ connectionRecordLog.setSsl_sni(keys.getString("ssl_sni"));
+
+ //TODO populate the value fields
+
+ connectionRecordLog.setCommon_sessions(valueBean.getCommon_sessions());
+ connectionRecordLog.setCommon_c2s_pkt_num(valueBean.getCommon_c2s_pkt_num());
+ connectionRecordLog.setCommon_s2c_pkt_num(valueBean.getCommon_s2c_pkt_num());
+ connectionRecordLog.setCommon_c2s_byte_num(valueBean.getCommon_c2s_byte_num());
+ connectionRecordLog.setCommon_s2c_byte_num(valueBean.getCommon_s2c_byte_num());
+
+ kafkaLogNtc.sendMessage(JSONObject.toJSONString(connectionRecordLog));
+
+ }
+ }
+ } catch (Exception e) {
+ logger.error(FlowWriteConfig.KAFKA_TOPIC + "日志发送Kafka过程出现异常");
+ e.printStackTrace();
+ }
+ }
+
+
+ @Override
+ public void declareOutputFields(OutputFieldsDeclarer declarer) {
+
+ }
+}
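One hazard worth noting in this bolt: connectionRecordLog is a single static instance shared by every NtcLogSendBolt executor in the worker JVM, so with parallelism above one per worker, two executors can interleave their setter calls before sendMessage serializes the bean. A sketch of the safer per-call allocation — a variant, not the project's code, assuming ConnectionRecordLog is a plain mutable bean:

// Inside execute(), once per aggregated key:
ConnectionRecordLog log = new ConnectionRecordLog(); // fresh, thread-confined instance
log.setCommon_recv_time(System.currentTimeMillis());
// ... populate the remaining key and value fields exactly as above ...
kafkaLogNtc.sendMessage(JSONObject.toJSONString(log));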
diff --git a/src/main/java/cn/ac/iie/origion/bolt/PrintBolt.java b/src/main/java/cn/ac/iie/origion/bolt/PrintBolt.java
new file mode 100644
index 0000000..4215d0e
--- /dev/null
+++ b/src/main/java/cn/ac/iie/origion/bolt/PrintBolt.java
@@ -0,0 +1,56 @@
+package cn.ac.iie.origion.bolt;
+
+import cn.ac.iie.origion.bean.ValueBean;
+
+import cn.ac.iie.origion.utils.FlowWriteConfig;
+import com.alibaba.fastjson.JSONObject;
+import org.apache.log4j.Logger;
+import org.apache.storm.task.TopologyContext;
+import org.apache.storm.topology.BasicOutputCollector;
+import org.apache.storm.topology.OutputFieldsDeclarer;
+import org.apache.storm.topology.base.BaseBasicBolt;
+import org.apache.storm.tuple.Tuple;
+
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * @ClassNamePrintBolt
+ * @Author [email protected]
+ * @Date2020/6/9 13:53
+ * @Version V1.0
+ **/
+public class PrintBolt extends BaseBasicBolt {
+
+ private static final long serialVersionUID = -3663610927224396615L;
+ private static Logger logger = Logger.getLogger(PrintBolt.class);
+
+
+ @Override
+ public void prepare(Map stormConf, TopologyContext context) {
+ }
+
+ @SuppressWarnings("all")
+ @Override
+ public void execute(Tuple tuple, BasicOutputCollector basicOutputCollector) {
+ try {
+ HashMap<String,ValueBean> hashMap = (HashMap) tuple.getValue(0);
+
+ if (hashMap.size() != 0) {
+ for (String key : hashMap.keySet()) {
+ System.out.println(key);
+ System.out.println(JSONObject.toJSONString(hashMap.get(key)));
+ }
+
+ }
+ } catch (Exception e) {
+ logger.error(FlowWriteConfig.KAFKA_TOPIC + "日志发送Kafka过程出现异常");
+ e.printStackTrace();
+ }
+ }
+
+
+ @Override
+ public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
+ }
+}
diff --git a/src/main/java/cn/ac/iie/origion/spout/CustomizedKafkaSpout.java b/src/main/java/cn/ac/iie/origion/spout/CustomizedKafkaSpout.java
new file mode 100644
index 0000000..501f2f6
--- /dev/null
+++ b/src/main/java/cn/ac/iie/origion/spout/CustomizedKafkaSpout.java
@@ -0,0 +1,81 @@
+package cn.ac.iie.origion.spout;
+
+
+import cn.ac.iie.origion.utils.FlowWriteConfig;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.consumer.ConsumerRecords;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.log4j.Logger;
+import org.apache.storm.spout.SpoutOutputCollector;
+import org.apache.storm.task.TopologyContext;
+import org.apache.storm.topology.OutputFieldsDeclarer;
+import org.apache.storm.topology.base.BaseRichSpout;
+import org.apache.storm.tuple.Fields;
+import org.apache.storm.tuple.Values;
+
+import java.util.Collections;
+import java.util.Map;
+import java.util.Properties;
+
+/**
+ * kafkaSpout
+ *
+ * @author Administrator
+ */
+public class CustomizedKafkaSpout extends BaseRichSpout {
+ private static final long serialVersionUID = -3363788553406229592L;
+ private KafkaConsumer<String, String> consumer;
+ private SpoutOutputCollector collector = null;
+ private TopologyContext context = null;
+ private final static Logger logger = Logger.getLogger(CustomizedKafkaSpout.class);
+
+
+ private static Properties createConsumerConfig() {
+ Properties props = new Properties();
+ props.put("bootstrap.servers", FlowWriteConfig.BOOTSTRAP_SERVERS);
+ props.put("group.id", FlowWriteConfig.GROUP_ID);
+ props.put("session.timeout.ms", "60000");
+ props.put("max.poll.records", 3000);
+ props.put("max.partition.fetch.bytes", 31457280);
+ props.put("auto.offset.reset", FlowWriteConfig.AUTO_OFFSET_RESET);
+ props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
+ props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
+ return props;
+ }
+
+ @Override
+ public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
+ // TODO Auto-generated method stub
+ this.collector = collector;
+ this.context = context;
+ Properties prop = createConsumerConfig();
+ this.consumer = new KafkaConsumer<>(prop);
+ this.consumer.subscribe(Collections.singletonList(FlowWriteConfig.KAFKA_TOPIC));
+ }
+
+ @Override
+ public void close() {
+ consumer.close();
+ }
+
+ @Override
+ public void nextTuple() {
+ try {
+ // TODO Auto-generated method stub
+ ConsumerRecords<String, String> records = consumer.poll(10000L);
+ Thread.sleep(FlowWriteConfig.TOPOLOGY_SPOUT_SLEEP_TIME);
+ for (ConsumerRecord<String, String> record : records) {
+ this.collector.emit(new Values(record.value()));
+ }
+ } catch (Exception e) {
+ logger.error("KafkaSpout发送消息出现异常!", e);
+ e.printStackTrace();
+ }
+ }
+
+ @Override
+ public void declareOutputFields(OutputFieldsDeclarer declarer) {
+ // TODO Auto-generated method stub
+ declarer.declare(new Fields("source"));
+ }
+}
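A small API note on this spout: consumer.poll(10000L) is the long-millis overload, which kafka-clients deprecates from version 2.0 onward in favor of poll(Duration). If the project's client version allows it, the equivalent call is (a sketch, assuming kafka-clients 2.0+):

import java.time.Duration;

// Inside nextTuple(), instead of consumer.poll(10000L):
ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(10));

Also worth noting: the tuples are emitted without a message ID, so Storm treats them as unanchored and failed tuples will not be replayed.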
diff --git a/src/main/java/cn/ac/iie/origion/topology/LogFlowWriteTopology.java b/src/main/java/cn/ac/iie/origion/topology/LogFlowWriteTopology.java
new file mode 100644
index 0000000..b197b6a
--- /dev/null
+++ b/src/main/java/cn/ac/iie/origion/topology/LogFlowWriteTopology.java
@@ -0,0 +1,95 @@
+package cn.ac.iie.origion.topology;
+
+import cn.ac.iie.origion.bolt.MyWindowBolt;
+import cn.ac.iie.origion.bolt.NtcLogSendBolt;
+import cn.ac.iie.origion.spout.CustomizedKafkaSpout;
+import cn.ac.iie.origion.utils.FlowWriteConfig;
+import org.apache.log4j.Logger;
+import org.apache.storm.Config;
+import org.apache.storm.generated.AlreadyAliveException;
+import org.apache.storm.generated.AuthorizationException;
+import org.apache.storm.generated.InvalidTopologyException;
+import org.apache.storm.topology.TopologyBuilder;
+import org.apache.storm.topology.base.BaseWindowedBolt;
+
+import java.util.concurrent.TimeUnit;
+
+
+/**
+ * Main class of the Storm application
+ *
+ * @author Administrator
+ */
+
+public class LogFlowWriteTopology {
+ private static Logger logger = Logger.getLogger(LogFlowWriteTopology.class);
+ private final String topologyName;
+ private final Config topologyConfig;
+ private TopologyBuilder builder;
+
+ private LogFlowWriteTopology() {
+ this(LogFlowWriteTopology.class.getSimpleName());
+ }
+
+ private LogFlowWriteTopology(String topologyName) {
+ this.topologyName = topologyName;
+ topologyConfig = createTopologConfig();
+ }
+
+ private Config createTopologConfig() {
+ Config conf = new Config();
+ conf.setDebug(false);
+ conf.setMessageTimeoutSecs(60);
+ conf.setMaxSpoutPending(150000);
+ conf.setNumAckers(3);
+ return conf;
+ }
+
+ private void runLocally() throws InterruptedException {
+ topologyConfig.setMaxTaskParallelism(1);
+ StormRunner.runTopologyLocally(builder, topologyName, topologyConfig, 600);
+ }
+
+ private void runRemotely() throws AlreadyAliveException, InvalidTopologyException, AuthorizationException {
+ topologyConfig.setNumWorkers(FlowWriteConfig.TOPOLOGY_WORKERS);
+ //Setting this too high causes many problems, e.g. heartbeat-thread starvation and a sharp drop in throughput
+ topologyConfig.put(Config.TOPOLOGY_EXECUTOR_RECEIVE_BUFFER_SIZE, 8);
+ StormRunner.runTopologyRemotely(builder, topologyName, topologyConfig);
+ }
+
+ private void buildTopology() {
+ builder = new TopologyBuilder();
+ builder.setSpout("LogFlowWriteSpout", new CustomizedKafkaSpout(), 3);
+ builder.setBolt("TEST-CONN", new MyWindowBolt()
+ .withWindow(new BaseWindowedBolt.Duration(30, TimeUnit.SECONDS),
+ new BaseWindowedBolt.Duration(30, TimeUnit.SECONDS)), FlowWriteConfig.DATACENTER_BOLT_PARALLELISM)
+ .localOrShuffleGrouping("LogFlowWriteSpout");
+
+// builder.setBolt("TEST-CONN", new AggregateBolt(),3).localOrShuffleGrouping("LogFlowWriteSpout");
+// builder.setBolt("KAKFA-CONN", new PrintBolt(),3).localOrShuffleGrouping("TEST-CONN");
+ builder.setBolt("KAKFA-CONN", new NtcLogSendBolt(),FlowWriteConfig.KAFKA_BOLT_PARALLELISM).localOrShuffleGrouping("TEST-CONN");
+ }
+
+ public static void main(String[] args) throws Exception {
+ LogFlowWriteTopology csst = null;
+ boolean runLocally = true;
+ String parameter = "remote";
+ int size = 2;
+ if (args.length >= size && parameter.equalsIgnoreCase(args[1])) {
+ runLocally = false;
+ csst = new LogFlowWriteTopology(args[0]);
+ } else {
+ csst = new LogFlowWriteTopology();
+ }
+
+ csst.buildTopology();
+
+ if (runLocally) {
+ logger.info("执行本地模式...");
+ csst.runLocally();
+ } else {
+ logger.info("执行远程部署模式...");
+ csst.runRemotely();
+ }
+ }
+}
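StormRunner is only renamed by this commit, so its body does not appear here; the calls above suggest its shape. A minimal sketch of what such a runner plausibly wraps, assuming the standard LocalCluster/StormSubmitter APIs — the implementation details are assumptions, not the project's actual code:

import org.apache.storm.Config;
import org.apache.storm.LocalCluster;
import org.apache.storm.StormSubmitter;
import org.apache.storm.generated.AlreadyAliveException;
import org.apache.storm.generated.AuthorizationException;
import org.apache.storm.generated.InvalidTopologyException;
import org.apache.storm.topology.TopologyBuilder;

public final class RunnerSketch {

    // Local mode: run in an in-process cluster for the given number of seconds.
    public static void runTopologyLocally(TopologyBuilder builder, String name,
                                          Config conf, int runtimeSecs) throws InterruptedException {
        LocalCluster cluster = new LocalCluster();
        cluster.submitTopology(name, conf, builder.createTopology());
        Thread.sleep((long) runtimeSecs * 1000);
        cluster.killTopology(name);
        cluster.shutdown();
    }

    // Remote mode: submit to the cluster configured in storm.yaml.
    public static void runTopologyRemotely(TopologyBuilder builder, String name, Config conf)
            throws AlreadyAliveException, InvalidTopologyException, AuthorizationException {
        StormSubmitter.submitTopology(name, conf, builder.createTopology());
    }
}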
diff --git a/src/main/java/cn/ac/iie/trident/aggregate/topology/StormRunner.java b/src/main/java/cn/ac/iie/origion/topology/StormRunner.java
index 708f77c..f4ecbcd 100644
--- a/src/main/java/cn/ac/iie/trident/aggregate/topology/StormRunner.java
+++ b/src/main/java/cn/ac/iie/origion/topology/StormRunner.java
@@ -1,4 +1,4 @@
-package cn.ac.iie.trident.aggregate.topology;
+package cn.ac.iie.origion.topology;
import org.apache.storm.Config;
diff --git a/src/main/java/cn/ac/iie/trident/aggregate/ParseJson2KV.java b/src/main/java/cn/ac/iie/origion/utils/AggregateUtil.java
index 2a8191c..a7047ef 100644
--- a/src/main/java/cn/ac/iie/trident/aggregate/ParseJson2KV.java
+++ b/src/main/java/cn/ac/iie/origion/utils/AggregateUtil.java
@@ -1,36 +1,35 @@
-package cn.ac.iie.trident.aggregate;
+package cn.ac.iie.origion.utils;
+
+import cn.ac.iie.origion.bean.KeyBean;
+import cn.ac.iie.origion.bean.ValueBean;
import com.alibaba.fastjson.JSONObject;
-import cn.ac.iie.trident.aggregate.bean.KeyBean;
-import cn.ac.iie.trident.aggregate.bean.ValueBean;
-import org.apache.storm.trident.operation.BaseFunction;
-import org.apache.storm.trident.operation.TridentCollector;
-import org.apache.storm.trident.tuple.TridentTuple;
-import org.apache.storm.tuple.Values;
import java.util.Map;
/**
- * Parses one tuple into
- * @ClassNameToJson
+ * @ClassNameAggregateUtil
- * @Date2020/5/29 16:05
+ * @Date2020/6/9 11:44
* @Version V1.0
**/
-public class ParseJson2KV extends BaseFunction {
+public class AggregateUtil {
private static ValueBean valueBean = new ValueBean();
private static KeyBean keyBean = new KeyBean();
+ /**
+ * Receives a JSON string and parses it into a bean
+ * @param message the raw JSON log message
+ * @return the message, currently returned unchanged
+ */
+ public static String aggregate(String message){
- @Override
- public void execute(TridentTuple tuple, TridentCollector collector) {
-
//TODO read the tuple input and parse it into a map
- Map map = JSONObject.parseObject(tuple.getStringByField("str"));
+ Map map = JSONObject.parseObject(message);
//TODO KEY
@@ -61,7 +60,7 @@ public class ParseJson2KV extends BaseFunction {
valueBean.setCommon_sessions(Long.parseLong(map.getOrDefault("common_sessions",0).toString()));
- collector.emit(new Values(JSONObject.toJSONString(keyBean), JSONObject.toJSONString(valueBean)));
+ return message;
}
}
diff --git a/src/main/java/cn/ac/iie/trident/aggregate/utils/FlowWriteConfig.java b/src/main/java/cn/ac/iie/origion/utils/FlowWriteConfig.java
index 3905580..16a069c 100644
--- a/src/main/java/cn/ac/iie/trident/aggregate/utils/FlowWriteConfig.java
+++ b/src/main/java/cn/ac/iie/origion/utils/FlowWriteConfig.java
@@ -1,4 +1,4 @@
-package cn.ac.iie.trident.aggregate.utils;
+package cn.ac.iie.origion.utils;
/**
diff --git a/src/main/java/cn/ac/iie/trident/aggregate/utils/FlowWriteConfigurations.java b/src/main/java/cn/ac/iie/origion/utils/FlowWriteConfigurations.java
index bf24f34..70c9a6d 100644
--- a/src/main/java/cn/ac/iie/trident/aggregate/utils/FlowWriteConfigurations.java
+++ b/src/main/java/cn/ac/iie/origion/utils/FlowWriteConfigurations.java
@@ -1,4 +1,4 @@
-package cn.ac.iie.trident.aggregate.utils;
+package cn.ac.iie.origion.utils;
import java.util.Properties;
diff --git a/src/main/java/cn/ac/iie/trident/aggregate/utils/KafkaLogNtc.java b/src/main/java/cn/ac/iie/origion/utils/KafkaLogNtc.java
index 22918be..89ea53a 100644
--- a/src/main/java/cn/ac/iie/trident/aggregate/utils/KafkaLogNtc.java
+++ b/src/main/java/cn/ac/iie/origion/utils/KafkaLogNtc.java
@@ -1,4 +1,4 @@
-package cn.ac.iie.trident.aggregate.utils;
+package cn.ac.iie.origion.utils;
import org.apache.kafka.clients.producer.*;
@@ -14,6 +14,7 @@ import java.util.Properties;
*/
public class KafkaLogNtc {
+
private static Logger logger = Logger.getLogger(KafkaLogNtc.class);
/**
@@ -40,15 +41,15 @@ public class KafkaLogNtc {
public void sendMessage(String message) {
final int[] errorSum = {0};
- kafkaProducer.send(new ProducerRecord<>(FlowWriteConfig.RESULTS_OUTPUT_TOPIC, message), new Callback() {
- @Override
- public void onCompletion(RecordMetadata metadata, Exception exception) {
- if (exception != null) {
- logger.error("写入" + FlowWriteConfig.RESULTS_OUTPUT_TOPIC + "出现异常", exception);
- errorSum[0]++;
- }
+ kafkaProducer.send(new ProducerRecord<>(FlowWriteConfig.RESULTS_OUTPUT_TOPIC, message), new Callback() {
+ @Override
+ public void onCompletion(RecordMetadata metadata, Exception exception) {
+ if (exception != null) {
+ logger.error("写入" + FlowWriteConfig.RESULTS_OUTPUT_TOPIC + "出现异常", exception);
+ errorSum[0]++;
}
- });
+ }
+ });
kafkaProducer.flush();
logger.debug("Log sent to National Center successfully!!!!!");
@@ -71,5 +72,4 @@ public class KafkaLogNtc {
kafkaProducer = new KafkaProducer<>(properties);
}
-
}
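One behavioral note on sendMessage(): calling kafkaProducer.flush() after every single send() blocks until the record is acknowledged, which defeats the producer's batching. A sketch of an alternative that keeps delivery asynchronous and flushes only on shutdown — a variant, not the project's code, assuming the same fields as above:

public void sendMessage(String message) {
    // The callback already records failures; let the producer batch sends.
    kafkaProducer.send(new ProducerRecord<>(FlowWriteConfig.RESULTS_OUTPUT_TOPIC, message),
            (metadata, exception) -> {
                if (exception != null) {
                    logger.error("Exception while writing to " + FlowWriteConfig.RESULTS_OUTPUT_TOPIC, exception);
                }
            });
}

// Flush once, e.g. from a shutdown hook or the bolt's cleanup():
public void close() {
    kafkaProducer.flush();
    kafkaProducer.close();
}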
diff --git a/src/main/java/cn/ac/iie/origion/utils/TupleUtils.java b/src/main/java/cn/ac/iie/origion/utils/TupleUtils.java
new file mode 100644
index 0000000..6c0d6bc
--- /dev/null
+++ b/src/main/java/cn/ac/iie/origion/utils/TupleUtils.java
@@ -0,0 +1,23 @@
+package cn.ac.iie.origion.utils;
+
+import org.apache.storm.Constants;
+import org.apache.storm.tuple.Tuple;
+
+/**
+ * Detects whether a tuple was emitted by the system
+ *
+ * @author Administrator
+ */
+public final class TupleUtils {
+ /**
+ * Checks whether a tuple is a system-emitted tick tuple
+ *
+ * @param tuple the tuple to inspect
+ * @return true or false
+ */
+ public static boolean isTick(Tuple tuple) {
+ return tuple != null
+ && Constants.SYSTEM_COMPONENT_ID.equals(tuple.getSourceComponent())
+ && Constants.SYSTEM_TICK_STREAM_ID.equals(tuple.getSourceStreamId());
+ }
+}
diff --git a/src/main/java/cn/ac/iie/trident/aggregate/AggCount.java b/src/main/java/cn/ac/iie/trident/aggregate/AggCount.java
deleted file mode 100644
index 5ac557b..0000000
--- a/src/main/java/cn/ac/iie/trident/aggregate/AggCount.java
+++ /dev/null
@@ -1,98 +0,0 @@
-package cn.ac.iie.trident.aggregate;
-
-
-import com.alibaba.fastjson.JSON;
-import cn.ac.iie.trident.aggregate.bean.ValueBean;
-import org.apache.storm.tuple.Values;
-import org.apache.storm.trident.operation.BaseAggregator;
-import org.apache.storm.trident.operation.TridentCollector;
-import org.apache.storm.trident.tuple.TridentTuple;
-
-
-import java.util.HashMap;
-
-
-/**
- * @ClassNameAggCount
- * @Author [email protected]
- * @Date2020/6/1 10:48
- * @Version V1.0
- **/
-public class AggCount extends BaseAggregator<AggCount.State> {
-
-
- static class State {
- HashMap<String,ValueBean> map = new HashMap();
- ValueBean valueBean = new ValueBean();
- ValueBean tupleValueBean = new ValueBean();
- String key = "";
- }
-
- @Override
- public AggCount.State init(Object batchId, TridentCollector collector) {
-
- return new State();
- }
-
- /**
- * Aggregates a single tuple
- * @param state
- * @param tuple
- * @param collector
- */
- @Override
- public void aggregate(State state, TridentTuple tuple, TridentCollector collector) {
-
-
- //TODO extract the key and value from one tuple
- state.key = tuple.getStringByField("key");
- state.tupleValueBean = JSON.parseObject(tuple.getStringByField("value"),ValueBean.class);
-
-
- //TODO look up this key's value in the HashMap, falling back to a fresh ValueBean when absent
- state.valueBean = state.map.getOrDefault(state.key, new ValueBean());
-
-
- //TODO aggregate the two values
- state.valueBean = addValueBean(state.valueBean,state.tupleValueBean);
-
- //TODO put the aggregated counts into the HashMap; duplicate keys overwrite the old value
- state.map.put(state.key, state.valueBean);
-
-
-
- }
-
- /**
- * Emits the result for a completed batch of tuples
- * @param state
- * @param collector
- */
- @Override
- public void complete(State state, TridentCollector collector) {
-
- collector.emit(new Values(JSON.toJSONString(state.map)));
-
-
- }
-
- /**
- * Adds the corresponding fields of two ValueBeans
- * @param result
- * @param value2
- * @return
- */
-
- public ValueBean addValueBean(ValueBean result, ValueBean value2){
-
- result.setCommon_s2c_byte_num(result.getCommon_s2c_byte_num() + value2.getCommon_s2c_byte_num());
- result.setCommon_c2s_byte_num(result.getCommon_c2s_byte_num() + value2.getCommon_c2s_byte_num());
- result.setCommon_s2c_pkt_num(result.getCommon_s2c_pkt_num() + value2.getCommon_s2c_pkt_num());
- result.setCommon_c2s_pkt_num(result.getCommon_c2s_pkt_num() + value2.getCommon_c2s_pkt_num());
- result.setCommon_sessions(result.getCommon_sessions() + 1L);
-
-
- return result;
- }
-
-}
diff --git a/src/main/java/cn/ac/iie/trident/aggregate/AggregateTopology.java b/src/main/java/cn/ac/iie/trident/aggregate/AggregateTopology.java
deleted file mode 100644
index f63884d..0000000
--- a/src/main/java/cn/ac/iie/trident/aggregate/AggregateTopology.java
+++ /dev/null
@@ -1,58 +0,0 @@
-package cn.ac.iie.trident.aggregate;
-
-import cn.ac.iie.trident.aggregate.bolt.KafkaBolt;
-import cn.ac.iie.trident.aggregate.spout.TridentKafkaSpout;
-import cn.ac.iie.trident.aggregate.utils.FlowWriteConfig;
-import org.apache.storm.Config;
-import org.apache.storm.LocalCluster;
-import org.apache.storm.kafka.trident.OpaqueTridentKafkaSpout;
-import org.apache.storm.topology.base.BaseWindowedBolt;
-import org.apache.storm.trident.TridentTopology;
-import org.apache.storm.trident.windowing.InMemoryWindowsStoreFactory;
-import org.apache.storm.tuple.Fields;
-
-import java.util.concurrent.TimeUnit;
-
-
-/**
- * @ClassNameWcTopo
- * @Author [email protected]
- * @Date2020/5/29 10:38
- * @Version V1.0
- **/
-public class AggregateTopology {
-
-
-
-
- public static void main(String[] args) {
- //TODO create a topology job
- TridentTopology topology = new TridentTopology();
- //TODO bind the spout to the topology
- OpaqueTridentKafkaSpout opaqueTridentKafkaSpout = TridentKafkaSpout.getInstance();
-
- topology.newStream("kafka2storm_opaqueTrident", opaqueTridentKafkaSpout)
- .name("one")
- .parallelismHint(FlowWriteConfig.SPOUT_PARALLELISM) //spout parallelism
- .name("two")
- .each(new Fields("str"), new ParseJson2KV(), new Fields("key", "value"))
- .parallelismHint(FlowWriteConfig.DATACENTER_BOLT_PARALLELISM) //parallelism of the data-processing stage
- .name("three")
- .slidingWindow(new BaseWindowedBolt.Duration(FlowWriteConfig.AGG_TIME, TimeUnit.SECONDS), new BaseWindowedBolt.Duration(FlowWriteConfig.AGG_TIME, TimeUnit.SECONDS), new InMemoryWindowsStoreFactory(), new Fields("key", "value"), new AggCount(), new Fields("map"))
- .name("four")
- .each(new Fields("map"), new KafkaBolt(), new Fields())
- .name("five")
- .parallelismHint(FlowWriteConfig.KAFKA_BOLT_PARALLELISM) //parallelism for writing to kafka
- .name("six");
-
- Config config = new Config();
-// config.put(Config.TOPOLOGY_TICK_TUPLE_FREQ_SECS, 10);
- config.setDebug(false);
- config.setNumWorkers(FlowWriteConfig.TOPOLOGY_WORKERS); //number of workers
- LocalCluster cluster = new LocalCluster();
-
-
- cluster.submitTopology("trident-wordcount", config, topology.build());
-// StormSubmitter.submitTopology("kafka2storm_opaqueTrident_topology", config,topology.build());
- }
-}
diff --git a/src/main/java/cn/ac/iie/trident/aggregate/bolt/KafkaBolt.java b/src/main/java/cn/ac/iie/trident/aggregate/bolt/KafkaBolt.java
deleted file mode 100644
index 3c70f01..0000000
--- a/src/main/java/cn/ac/iie/trident/aggregate/bolt/KafkaBolt.java
+++ /dev/null
@@ -1,75 +0,0 @@
-package cn.ac.iie.trident.aggregate.bolt;
-
-import cn.ac.iie.trident.aggregate.bean.ConnectionRecordLog;
-import cn.ac.iie.trident.aggregate.bean.ValueBean;
-import cn.ac.iie.trident.aggregate.utils.KafkaLogNtc;
-import com.alibaba.fastjson.JSONObject;
-import org.apache.storm.trident.operation.BaseFunction;
-import org.apache.storm.trident.operation.TridentCollector;
-import org.apache.storm.trident.tuple.TridentTuple;
-
-/**
- * @ClassNameKafkaBolt
- * @Author [email protected]
- * @Date2020/6/3 16:50
- * @Version V1.0
- **/
-public class KafkaBolt extends BaseFunction {
-
-
- private static final long serialVersionUID = -2107081139682355171L;
- private static KafkaLogNtc kafkaLogNtc;
- private static ConnectionRecordLog connectionRecordLog = new ConnectionRecordLog();
- private static JSONObject jsonObject;
-
- @Override
- public void execute(TridentTuple tuple, TridentCollector collector) {
-
- if (kafkaLogNtc == null) {
- kafkaLogNtc = KafkaLogNtc.getInstance();
- }
-
- //TODO 解析成json对象,方便以后的遍历
- jsonObject = JSONObject.parseObject(tuple.getStringByField("map"));
-
- for (String key : jsonObject.keySet()) {
-
- //TODO 为Key赋值
-
- JSONObject keys = JSONObject.parseObject(key);
-
- connectionRecordLog.setCommon_recv_time(System.currentTimeMillis());
- connectionRecordLog.setCommon_policy_id(Integer.parseInt(keys.getString("common_policy_id")));
- connectionRecordLog.setCommon_action(Integer.parseInt(keys.getString("common_action")));
- connectionRecordLog.setCommon_sub_action(keys.getString("common_sub_action"));
- connectionRecordLog.setCommon_client_ip(keys.getString("common_client_ip"));
- connectionRecordLog.setCommon_client_location(keys.getString("common_client_location"));
- connectionRecordLog.setCommon_sled_ip(keys.getString("common_sled_ip"));
- connectionRecordLog.setCommon_device_id(keys.getString("common_device_id"));
- connectionRecordLog.setCommon_subscriber_id(keys.getString("common_subscriber_id"));
- connectionRecordLog.setCommon_server_ip(keys.getString("common_server_ip"));
- connectionRecordLog.setCommon_server_location(keys.getString("common_server_location"));
- connectionRecordLog.setCommon_server_port(Integer.parseInt(keys.getString("common_server_port")));
- connectionRecordLog.setCommon_l4_protocol(keys.getString("common_l4_protocol"));
- connectionRecordLog.setHttp_domain(keys.getString("http_domain"));
- connectionRecordLog.setSsl_sni(keys.getString("ssl_sni"));
-
-
-
- //TODO 为Value赋值
-
- ValueBean valueBean = JSONObject.parseObject(jsonObject.get(key).toString(), ValueBean.class);
- connectionRecordLog.setCommon_sessions(valueBean.getCommon_sessions());
- connectionRecordLog.setCommon_c2s_pkt_num(valueBean.getCommon_c2s_pkt_num());
- connectionRecordLog.setCommon_s2c_pkt_num(valueBean.getCommon_s2c_pkt_num());
- connectionRecordLog.setCommon_c2s_byte_num(valueBean.getCommon_c2s_byte_num());
- connectionRecordLog.setCommon_s2c_byte_num(valueBean.getCommon_s2c_byte_num());
-
- kafkaLogNtc.sendMessage(JSONObject.toJSONString(connectionRecordLog));
-
-
- }
-
-
- }
-}
diff --git a/src/main/java/cn/ac/iie/trident/aggregate/spout/TridentKafkaSpout.java b/src/main/java/cn/ac/iie/trident/aggregate/spout/TridentKafkaSpout.java
deleted file mode 100644
index 2583b5b..0000000
--- a/src/main/java/cn/ac/iie/trident/aggregate/spout/TridentKafkaSpout.java
+++ /dev/null
@@ -1,40 +0,0 @@
-package cn.ac.iie.trident.aggregate.spout;
-
-import cn.ac.iie.trident.aggregate.utils.FlowWriteConfig;
-import org.apache.storm.kafka.BrokerHosts;
-import org.apache.storm.kafka.StringScheme;
-import org.apache.storm.kafka.ZkHosts;
-import org.apache.storm.kafka.trident.OpaqueTridentKafkaSpout;
-import org.apache.storm.kafka.trident.TridentKafkaConfig;
-import org.apache.storm.spout.SchemeAsMultiScheme;
-
-/**
- * @ClassNameKafkaSpout
- * @Author [email protected]
- * @Date2020/6/4 11:55
- * @Version V1.0
- **/
-public class TridentKafkaSpout {
-
-
- /**
- * Opaque Trident Kafka spout (singleton)
- */
- private static OpaqueTridentKafkaSpout opaqueTridentKafkaSpout;
-
- public static OpaqueTridentKafkaSpout getInstance() {
- if (opaqueTridentKafkaSpout == null) {
-
- BrokerHosts zkHosts = new ZkHosts(FlowWriteConfig.ZOOKEEPER_SERVERS);
- TridentKafkaConfig kafkaConfig = new TridentKafkaConfig(zkHosts, FlowWriteConfig.KAFKA_TOPIC);
- kafkaConfig.scheme = new SchemeAsMultiScheme(new StringScheme());
- kafkaConfig.startOffsetTime = -1L;
- kafkaConfig.socketTimeoutMs=60000;
-
- //opaque transactional spout
- opaqueTridentKafkaSpout = new OpaqueTridentKafkaSpout(kafkaConfig);
- }
- return opaqueTridentKafkaSpout;
- }
-
-}
diff --git a/src/main/java/cn/ac/iie/trident/aggregate/topology/LogFlowWriteTopology.java b/src/main/java/cn/ac/iie/trident/aggregate/topology/LogFlowWriteTopology.java
deleted file mode 100644
index d981fe1..0000000
--- a/src/main/java/cn/ac/iie/trident/aggregate/topology/LogFlowWriteTopology.java
+++ /dev/null
@@ -1,111 +0,0 @@
-package cn.ac.iie.trident.aggregate.topology;
-
-
-import cn.ac.iie.trident.aggregate.AggCount;
-import cn.ac.iie.trident.aggregate.ParseJson2KV;
-import cn.ac.iie.trident.aggregate.bolt.KafkaBolt;
-import cn.ac.iie.trident.aggregate.spout.TridentKafkaSpout;
-import cn.ac.iie.trident.aggregate.utils.FlowWriteConfig;
-import org.apache.log4j.Logger;
-import org.apache.storm.Config;
-import org.apache.storm.LocalCluster;
-import org.apache.storm.StormSubmitter;
-import org.apache.storm.generated.AlreadyAliveException;
-import org.apache.storm.generated.AuthorizationException;
-import org.apache.storm.generated.InvalidTopologyException;
-import org.apache.storm.generated.StormTopology;
-import org.apache.storm.kafka.trident.OpaqueTridentKafkaSpout;
-import org.apache.storm.topology.TopologyBuilder;
-import org.apache.storm.topology.base.BaseWindowedBolt;
-import org.apache.storm.trident.TridentTopology;
-import org.apache.storm.trident.windowing.InMemoryWindowsStoreFactory;
-import org.apache.storm.tuple.Fields;
-
-import java.util.concurrent.TimeUnit;
-
-/**
- * Main class of the Storm application
- *
- * @author Administrator
- */
-
-public class LogFlowWriteTopology {
- private static Logger logger = Logger.getLogger(LogFlowWriteTopology.class);
-
- private TopologyBuilder builder;
- private static TridentTopology tridentTopology;
-
-
-
-
-
- private static Config createTopologConfig() {
- Config conf = new Config();
- conf.setDebug(false);
- conf.setMessageTimeoutSecs(60);
- conf.setNumWorkers(FlowWriteConfig.TOPOLOGY_WORKERS);
- return conf;
- }
-
-
- private static StormTopology buildTopology() {
-
- OpaqueTridentKafkaSpout opaqueTridentKafkaSpout = TridentKafkaSpout.getInstance();
-
- tridentTopology.newStream("kafka2storm_opaqueTrident", opaqueTridentKafkaSpout)
- .parallelismHint(FlowWriteConfig.SPOUT_PARALLELISM)
- .each(new Fields("str"), new ParseJson2KV(), new Fields("key", "value"))
- .parallelismHint(FlowWriteConfig.DATACENTER_BOLT_PARALLELISM)
- .slidingWindow(new BaseWindowedBolt.Duration(FlowWriteConfig.AGG_TIME, TimeUnit.SECONDS), new BaseWindowedBolt.Duration(FlowWriteConfig.AGG_TIME, TimeUnit.SECONDS), new InMemoryWindowsStoreFactory(), new Fields("key", "value"), new AggCount(), new Fields("map"))
- .each(new Fields("map"), new KafkaBolt(), new Fields());
- return tridentTopology.build();
- }
-
- public static void main(String[] args) throws Exception {
-
-
- Config conf = new Config();
- conf.setDebug(false);
- conf.setMessageTimeoutSecs(60);
- conf.setNumWorkers(FlowWriteConfig.TOPOLOGY_WORKERS);
-
- //TODO create a topology job
- TridentTopology topology = new TridentTopology();
- //TODO bind the spout to the topology
- OpaqueTridentKafkaSpout opaqueTridentKafkaSpout = TridentKafkaSpout.getInstance();
-
- /* topology.newStream("kafka2storm_opaqueTrident", opaqueTridentKafkaSpout)
- .parallelismHint(FlowWriteConfig.SPOUT_PARALLELISM)//6
- .each(new Fields("str"), new ParseJson2KV(), new Fields("key", "value"))
- .parallelismHint(FlowWriteConfig.DATACENTER_BOLT_PARALLELISM)//9
- .slidingWindow(new BaseWindowedBolt.Duration(FlowWriteConfig.AGG_TIME, TimeUnit.SECONDS), new BaseWindowedBolt.Duration(FlowWriteConfig.AGG_TIME, TimeUnit.SECONDS), new InMemoryWindowsStoreFactory(), new Fields("key", "value"), new AggCount(), new Fields("map"))
- .each(new Fields("map"), new KafkaBolt(), new Fields());*/
-
-
- topology.newStream("kafka2storm_opaqueTrident", opaqueTridentKafkaSpout)
- .name("one")
- .parallelismHint(FlowWriteConfig.SPOUT_PARALLELISM) //spout parallelism
- .name("two")
- .each(new Fields("str"), new ParseJson2KV(), new Fields("key", "value"))
- .parallelismHint(FlowWriteConfig.DATACENTER_BOLT_PARALLELISM) //parallelism of the data-processing stage
- .name("three")
- .slidingWindow(new BaseWindowedBolt.Duration(FlowWriteConfig.AGG_TIME, TimeUnit.SECONDS), new BaseWindowedBolt.Duration(FlowWriteConfig.AGG_TIME, TimeUnit.SECONDS), new InMemoryWindowsStoreFactory(), new Fields("key", "value"), new AggCount(), new Fields("map"))
- .name("four")
- .each(new Fields("map"), new KafkaBolt(), new Fields())
- .name("five")
- .parallelismHint(FlowWriteConfig.KAFKA_BOLT_PARALLELISM) //parallelism for writing to kafka
- .name("six");
-
- if(args.length == 0){//run in local mode
-
- LocalCluster cluster = new LocalCluster();
-
- cluster.submitTopology("trident-function", conf, topology.build());
- Thread.sleep(100000);
- cluster.shutdown();
- }else{//run in cluster mode
- StormSubmitter.submitTopology(args[0], conf, topology.build());
- }
-
- }
-}
diff --git a/src/main/java/cn/ac/iie/trident/log4j.properties b/src/main/java/cn/ac/iie/trident/log4j.properties
deleted file mode 100644
index 8d4ada4..0000000
--- a/src/main/java/cn/ac/iie/trident/log4j.properties
+++ /dev/null
@@ -1,23 +0,0 @@
-#Log4j
-log4j.rootLogger=warn,console,file
-# Console appender settings
-log4j.appender.console=org.apache.log4j.ConsoleAppender
-log4j.appender.console.Threshold=info
-log4j.appender.console.layout=org.apache.log4j.PatternLayout
-log4j.appender.console.layout.ConversionPattern=[%d{yyyy-MM-dd HH\:mm\:ss}] [%-5p] [Thread\:%t] %l %x - <%m>%n
-
-# File appender settings
-log4j.appender.file=org.apache.log4j.DailyRollingFileAppender
-log4j.appender.file.Threshold=error
-log4j.appender.file.encoding=UTF-8
-log4j.appender.file.Append=true
-# Use a relative path; after testing, the log file is written under the application directory
-log4j.appender.file.file=storm-topology.log
-log4j.appender.file.DatePattern='.'yyyy-MM-dd
-log4j.appender.file.layout=org.apache.log4j.PatternLayout
-#log4j.appender.file.layout.ConversionPattern=%d{HH:mm:ss} %X{ip} [%t] %5p %c{1} %m%n
-log4j.appender.file.layout.ConversionPattern=[%d{yyyy-MM-dd HH\:mm\:ss}] [%-5p] %X{ip} [Thread\:%t] %l %x - %m%n
-# MyBatis configuration; com.nis.web.dao is the package containing the MyBatis mapper interfaces
-log4j.logger.com.nis.web.dao=debug
-# BoneCP data source configuration
-log4j.category.com.jolbox=debug,console \ No newline at end of file
diff --git a/src/test/java/com/wp/AppTest.java b/src/test/java/com/wp/AppTest.java
index 77c826f..71ab064 100644
--- a/src/test/java/com/wp/AppTest.java
+++ b/src/test/java/com/wp/AppTest.java
@@ -1,12 +1,6 @@
package com.wp;
-import cn.ac.iie.trident.aggregate.bean.ValueBean;
-import com.alibaba.fastjson.JSON;
-import com.alibaba.fastjson.JSONObject;
-import cn.ac.iie.trident.aggregate.utils.FlowWriteConfig;
-import junit.framework.Test;
-import junit.framework.TestCase;
-import junit.framework.TestSuite;
+
import org.apache.storm.Config;
import org.apache.storm.LocalCluster;
import org.apache.storm.trident.TridentTopology;