author	lee <[email protected]>	2020-06-04 15:54:53 +0800
committer	lee <[email protected]>	2020-06-04 15:54:53 +0800
commit	a34f8c4df6280dc0a80b7e500c2869ba97971f9c (patch)
tree	b127ac429614ee7ddd5bc8c711b8e9baace7272c
parent	868364da6b1282c1c1c0dcaf2670a1b3d75a8bae (diff)
Initial version of the OLAP pre-aggregation code
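The commit wires a Trident topology that parses each connection-record JSON into a dimension key (ParseJson2KV) and sums the traffic counters per key inside a windowed aggregator (AggCount). A minimal standalone sketch of that group-by-key-and-sum idea; class and method names here are illustrative, not part of the commit, and the sample numbers come from the test fixture in the removed Aggregate.java:

import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;

// Illustrative sketch of the pre-aggregation in this commit: metrics are
// summed per dimension key. Hypothetical class, not the actual beans.
public class PreAggregationSketch {
    // value layout: {sessions, c2s_byte_num, s2c_byte_num}
    static void accumulate(Map<String, long[]> map, String key, long[] v) {
        long[] acc = map.computeIfAbsent(key, k -> new long[v.length]);
        for (int i = 0; i < v.length; i++) {
            acc[i] += v[i]; // sum each counter for the same key
        }
    }

    public static void main(String[] args) {
        Map<String, long[]> byKey = new HashMap<>();
        String key = "95.56.198.87_62.210.101.44_22"; // client ip _ server ip _ port
        accumulate(byKey, key, new long[]{1, 650, 9921});
        accumulate(byKey, key, new long[]{1, 650, 9921});
        System.out.println(Arrays.toString(byKey.get(key))); // [2, 1300, 19842]
    }
}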
-rw-r--r--  pom.xml  266
-rw-r--r--  properties/service_flow_config.properties  29
-rw-r--r--  src/java/cn/ac/iie/bean/ValueTuple.java  67
-rw-r--r--  src/java/cn/ac/iie/bolt/CompletionBolt.java  54
-rw-r--r--  src/java/cn/ac/iie/bolt/NtcLogSendBolt.java  71
-rw-r--r--  src/java/cn/ac/iie/bolt/radius/RadiusCompletionBolt.java  51
-rw-r--r--  src/java/cn/ac/iie/spout/CustomizedKafkaSpout.java  81
-rw-r--r--  src/java/cn/ac/iie/topology/LogFlowWriteTopology.java  92
-rw-r--r--  src/java/cn/ac/iie/topology/StormRunner.java  35
-rw-r--r--  src/java/cn/ac/iie/utils/general/Aggregate.java  152
-rw-r--r--  src/java/cn/ac/iie/utils/http/HttpClientUtil.java  55
-rw-r--r--  src/java/cn/ac/iie/utils/json/JsonParseUtil.java  204
-rw-r--r--  src/java/cn/ac/iie/utils/system/TupleUtils.java  23
-rw-r--r--  src/java/cn/ac/iie/utils/tuple/ThreeTuple.java  20
-rw-r--r--  src/java/cn/ac/iie/utils/tuple/TupleAggregate.java  195
-rw-r--r--  src/java/cn/ac/iie/utils/tuple/TwoTuple.java  32
-rw-r--r--  src/main/java/cn/ac/iie/trident/aggregate/AggCount.java  98
-rw-r--r--  src/main/java/cn/ac/iie/trident/aggregate/AggregateTopology.java  45
-rw-r--r--  src/main/java/cn/ac/iie/trident/aggregate/ParseJson2KV.java  67
-rw-r--r--  src/main/java/cn/ac/iie/trident/aggregate/bean/ConnectionRecordLog.java (renamed from src/java/cn/ac/iie/bean/KeyTuple.java)  95
-rw-r--r--  src/main/java/cn/ac/iie/trident/aggregate/bean/KeyBean.java  176
-rw-r--r--  src/main/java/cn/ac/iie/trident/aggregate/bean/ValueBean.java  59
-rw-r--r--  src/main/java/cn/ac/iie/trident/aggregate/bolt/KafkaBolt.java  75
-rw-r--r--  src/main/java/cn/ac/iie/trident/aggregate/utils/FlowWriteConfig.java (renamed from src/java/cn/ac/iie/common/FlowWriteConfig.java)  7
-rw-r--r--  src/main/java/cn/ac/iie/trident/aggregate/utils/FlowWriteConfigurations.java (renamed from src/java/cn/ac/iie/utils/system/FlowWriteConfigurations.java)  2
-rw-r--r--  src/main/java/cn/ac/iie/trident/aggregate/utils/KafkaLogNtc.java (renamed from src/java/log4j.properties was src/main/java/cn/ac/iie/trident/log4j.properties; renamed from src/java/cn/ac/iie/utils/kafka/KafkaLogNtc.java)  15
-rw-r--r--  src/main/java/cn/ac/iie/trident/log4j.properties (renamed from src/java/log4j.properties)  2
-rw-r--r--  src/test/java/com/wp/AppTest.java  85
28 files changed, 758 insertions(+), 1395 deletions(-)
diff --git a/pom.xml b/pom.xml
index 11afe9c..f6c2779 100644
--- a/pom.xml
+++ b/pom.xml
@@ -7,149 +7,55 @@
<version>0.0.1-SNAPSHOT</version>
<packaging>jar</packaging>
+
<name>log-stream-aggregation</name>
<url>http://maven.apache.org</url>
- <repositories>
- <repository>
- <id>nexus</id>
- <name>Team Nexus Repository</name>
- <url>http://192.168.40.125:8099/content/groups/public</url>
- </repository>
- </repositories>
+
+ <!--<repositories>-->
+ <!--<repository>-->
+ <!--<id>nexus</id>-->
+ <!--<name>Team Nexus Repository</name>-->
+ <!--<url>http://192.168.40.125:8099/content/groups/public</url>-->
+ <!--</repository>-->
+ <!--</repositories>-->
<build>
<plugins>
-
- <plugin>
- <groupId>org.apache.maven.plugins</groupId>
- <artifactId>maven-shade-plugin</artifactId>
- <version>2.4.2</version>
- <executions>
- <execution>
- <phase>package</phase>
- <goals>
- <goal>shade</goal>
- </goals>
- <configuration>
- <transformers>
- <transformer
- implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
- <mainClass>cn.ac.iie.topology.LogFlowWriteTopology</mainClass>
- </transformer>
- <transformer
- implementation="org.apache.maven.plugins.shade.resource.AppendingTransformer">
- <resource>META-INF/spring.handlers</resource>
- </transformer>
- <transformer
- implementation="org.apache.maven.plugins.shade.resource.AppendingTransformer">
- <resource>META-INF/spring.schemas</resource>
- </transformer>
- </transformers>
- </configuration>
- </execution>
- </executions>
- </plugin>
-
-
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
- <version>2.3.2</version>
<configuration>
- <source>1.8</source>
- <target>1.8</target>
+ <source>8</source>
+ <target>8</target>
</configuration>
</plugin>
</plugins>
- <resources>
- <resource>
- <directory>properties</directory>
- <includes>
- <include>**/*.properties</include>
- </includes>
- <filtering>false</filtering>
- </resource>
- <resource>
- <directory>properties</directory>
- <includes>
- <include>log4j.properties</include>
- </includes>
- <filtering>false</filtering>
- </resource>
- </resources>
</build>
<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
- <kafka.version>1.0.0</kafka.version>
- <storm.version>1.0.2</storm.version>
- <hbase.version>2.2.1</hbase.version>
- <hadoop.version>2.7.1</hadoop.version>
</properties>
<dependencies>
- <!-- https://mvnrepository.com/artifact/org.apache.kafka/kafka -->
<dependency>
- <groupId>org.apache.kafka</groupId>
- <artifactId>kafka_2.11</artifactId>
- <version>1.0.0</version>
- <exclusions>
- <exclusion>
- <artifactId>slf4j-log4j12</artifactId>
- <groupId>org.slf4j</groupId>
- </exclusion>
- </exclusions>
+ <groupId>org.apache.storm</groupId>
+ <artifactId>storm-core</artifactId>
+ <version>1.0.2</version>
</dependency>
<dependency>
<groupId>org.apache.storm</groupId>
- <artifactId>storm-core</artifactId>
- <version>${storm.version}</version>
- <scope>provided</scope>
+ <artifactId>storm-kafka</artifactId>
+ <version>1.0.2</version>
<exclusions>
<exclusion>
- <artifactId>slf4j-log4j12</artifactId>
- <groupId>org.slf4j</groupId>
- </exclusion>
- <exclusion>
- <artifactId>log4j-over-slf4j</artifactId>
- <groupId>org.slf4j</groupId>
+ <groupId>org.apache.kafka</groupId>
+ <artifactId>kafka-clients</artifactId>
</exclusion>
</exclusions>
- </dependency>
- <dependency>
- <groupId>org.influxdb</groupId>
- <artifactId>influxdb-java</artifactId>
- <version>2.1</version>
</dependency>
-
- <dependency>
- <groupId>org.apache.storm</groupId>
- <artifactId>storm-kafka</artifactId>
- <version>${storm.version}</version>
- </dependency>
-
- <dependency>
- <groupId>junit</groupId>
- <artifactId>junit</artifactId>
- <version>4.12</version>
- <scope>test</scope>
- </dependency>
-
- <dependency>
- <groupId>com.alibaba</groupId>
- <artifactId>fastjson</artifactId>
- <version>1.2.59</version>
- </dependency>
-
- <dependency>
- <groupId>cglib</groupId>
- <artifactId>cglib-nodep</artifactId>
- <version>3.2.4</version>
- </dependency>
-
<dependency>
<groupId>com.zdjizhi</groupId>
<artifactId>galaxy</artifactId>
@@ -166,125 +72,83 @@
</exclusions>
</dependency>
- <!-- https://mvnrepository.com/artifact/org.apache.zookeeper/zookeeper -->
<dependency>
- <groupId>org.apache.zookeeper</groupId>
- <artifactId>zookeeper</artifactId>
- <version>3.4.9</version>
+ <groupId>org.apache.kafka</groupId>
+ <artifactId>kafka_2.11</artifactId>
+ <version>0.10.0.1</version>
<exclusions>
<exclusion>
- <artifactId>slf4j-log4j12</artifactId>
- <groupId>org.slf4j</groupId>
+ <groupId>org.apache.zookeeper</groupId>
+ <artifactId>zookeeper</artifactId>
</exclusion>
<exclusion>
- <artifactId>log4j-over-slf4j</artifactId>
<groupId>org.slf4j</groupId>
- </exclusion>
- </exclusions>
- </dependency>
-
- <!-- https://mvnrepository.com/artifact/org.apache.hbase/hbase-client -->
- <dependency>
- <groupId>org.apache.hbase</groupId>
- <artifactId>hbase-client</artifactId>
- <version>${hbase.version}</version>
- <exclusions>
- <exclusion>
<artifactId>slf4j-log4j12</artifactId>
- <groupId>org.slf4j</groupId>
</exclusion>
<exclusion>
- <artifactId>log4j-over-slf4j</artifactId>
- <groupId>org.slf4j</groupId>
+ <groupId>log4j</groupId>
+ <artifactId>log4j</artifactId>
</exclusion>
</exclusions>
</dependency>
- <!-- https://mvnrepository.com/artifact/org.apache.hbase/hbase-server -->
<dependency>
- <groupId>org.apache.hbase</groupId>
- <artifactId>hbase-server</artifactId>
- <version>${hbase.version}</version>
- <exclusions>
- <exclusion>
- <artifactId>slf4j-log4j12</artifactId>
- <groupId>org.slf4j</groupId>
- </exclusion>
- <exclusion>
- <artifactId>log4j-over-slf4j</artifactId>
- <groupId>org.slf4j</groupId>
- </exclusion>
- </exclusions>
+ <groupId>junit</groupId>
+ <artifactId>junit</artifactId>
+ <version>4.11</version>
+ <scope>test</scope>
</dependency>
- <!-- https://mvnrepository.com/artifact/org.apache.hadoop/hadoop-common -->
+ <!-- https://mvnrepository.com/artifact/org.apache.thrift/libthrift -->
<dependency>
- <groupId>org.apache.hadoop</groupId>
- <artifactId>hadoop-common</artifactId>
- <version>${hadoop.version}</version>
- <exclusions>
- <exclusion>
- <artifactId>slf4j-log4j12</artifactId>
- <groupId>org.slf4j</groupId>
- </exclusion>
- <exclusion>
- <artifactId>log4j-over-slf4j</artifactId>
- <groupId>org.slf4j</groupId>
- </exclusion>
- </exclusions>
+ <groupId>org.apache.thrift</groupId>
+ <artifactId>libthrift</artifactId>
+ <version>0.10.0</version>
+ <type>pom</type>
</dependency>
- <!-- https://mvnrepository.com/artifact/org.apache.hadoop/hadoop-client -->
+ <!-- https://mvnrepository.com/artifact/org.apache.thrift.tools/maven-thrift-plugin -->
<dependency>
- <groupId>org.apache.hadoop</groupId>
- <artifactId>hadoop-client</artifactId>
- <version>${hadoop.version}</version>
- <exclusions>
- <exclusion>
- <artifactId>slf4j-log4j12</artifactId>
- <groupId>org.slf4j</groupId>
- </exclusion>
- <exclusion>
- <artifactId>log4j-over-slf4j</artifactId>
- <groupId>org.slf4j</groupId>
- </exclusion>
- </exclusions>
+ <groupId>org.apache.thrift.tools</groupId>
+ <artifactId>maven-thrift-plugin</artifactId>
+ <version>0.1.11</version>
</dependency>
- <!-- https://mvnrepository.com/artifact/org.apache.hadoop/hadoop-hdfs -->
<dependency>
- <groupId>org.apache.hadoop</groupId>
- <artifactId>hadoop-hdfs</artifactId>
- <version>${hadoop.version}</version>
- <exclusions>
- <exclusion>
- <artifactId>slf4j-log4j12</artifactId>
- <groupId>org.slf4j</groupId>
- </exclusion>
- <exclusion>
- <artifactId>log4j-over-slf4j</artifactId>
- <groupId>org.slf4j</groupId>
- </exclusion>
- </exclusions>
+ <groupId>com.alibaba</groupId>
+ <artifactId>fastjson</artifactId>
+ <version>1.2.59</version>
</dependency>
+
<dependency>
- <groupId>org.junit.jupiter</groupId>
- <artifactId>junit-jupiter-api</artifactId>
- <version>5.3.2</version>
- <scope>compile</scope>
+ <groupId>commons-collections</groupId>
+ <artifactId>commons-collections</artifactId>
+ <version>3.2.2</version>
</dependency>
-
-
<dependency>
- <groupId>org.apache.httpcomponents</groupId>
- <artifactId>httpclient</artifactId>
- <version>4.5.2</version>
+ <groupId>commons-codec</groupId>
+ <artifactId>commons-codec</artifactId>
+ <version>1.10</version>
</dependency>
+ <!--&lt;!&ndash; https://mvnrepository.com/artifact/org.jgrapht/jgrapht-core &ndash;&gt;-->
+ <!--<dependency>-->
+ <!--<groupId>org.jgrapht</groupId>-->
+ <!--<artifactId>jgrapht-core</artifactId>-->
+ <!--<version>1.1.0</version>-->
+ <!--</dependency>-->
+ <!-- https://mvnrepository.com/artifact/org.jgrapht/jgrapht-dist -->
+ <dependency>
+ <groupId>org.jgrapht</groupId>
+ <artifactId>jgrapht-dist</artifactId>
+ <version>1.0.1</version>
+ <type>pom</type>
+ </dependency>
<dependency>
- <groupId>org.apache.httpcomponents</groupId>
- <artifactId>httpcore</artifactId>
- <version>4.4.1</version>
+ <groupId>org.junit.jupiter</groupId>
+ <artifactId>junit-jupiter-api</artifactId>
+ <version>5.0.0</version>
+ <scope>compile</scope>
</dependency>
diff --git a/properties/service_flow_config.properties b/properties/service_flow_config.properties
index 8072d1c..ce08511 100644
--- a/properties/service_flow_config.properties
+++ b/properties/service_flow_config.properties
@@ -3,7 +3,7 @@
bootstrap.servers=192.168.40.127:9093
#zookeeper address
-zookeeper.servers=192.168.40.127:2182
+zookeeper.servers=192.168.40.127:2182/kafka-test
#zookeeper.servers=192.168.40.119:2181,192.168.40.122:2181,192.168.40.123:2181
#hbase zookeeper address
@@ -21,13 +21,14 @@ kafka.compression.type=none
#topic name under the kafka broker
#kafka.topic=SECURITY-EVENT-LOG
-kafka.topic=CONNECTION-RECORD-LOG
+kafka.topic=test528
+#kafka.topic=CONNECTION-RECORD-LOG
#When reading the topic, the consumer offsets for this spout id are stored under this name (the topology can be named by it); the stored offset position determines that the next read does not re-consume data;
group.id=lxk-200512
#output topic
-results.output.topic=CONNECTION-RECORD-COMPLETED-LOG
+results.output.topic=agg_test
#results.output.topic=SECURITY-EVENT-COMPLETED-LOG
#storm topology workers
@@ -42,11 +43,6 @@ datacenter.bolt.parallelism=1
#parallelism for writing to kafka 10
kafka.bolt.parallelism=1
-#IP geolocation library path
-#ip.library=/home/ceiec/topology/dat/
-#ip.library=D:\\workerSpace\\K18-Phase2\\3.0.2019115\\log-stream-completion\\
-ip.library=/dat/
-
#kafka batch size
batch.insert.num=2000
@@ -56,14 +52,6 @@ schema.http=http://192.168.40.224:9999/metadata/schema/v1/fields/connection_reco
#data center (UID)
data.center.id.num=15
-#tick frequency in seconds
-topology.tick.tuple.freq.secs=5
-
-#hbase update interval
-hbase.tick.tuple.freq.secs=60
-
-#When bolt throughput is limited, throttle the spout's receive rate; in theory only effective when acking is enabled
-topology.config.max.spout.pending=150000
#ack setting: 1 enables ack, 0 disables ack
topology.num.acks=0
@@ -76,12 +64,3 @@ max.failure.num=20
#default mail charset
mail.default.charset=UTF-8
-
-#influx address
-influx.ip=http://192.168.40.151:8086
-
-#influx username
-influx.username=admin
-
-#influx password
-influx.password=admin \ No newline at end of file
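For context, these properties are read by the renamed FlowWriteConfig/FlowWriteConfigurations utilities, whose bodies are not shown in this diff. A minimal loading sketch with java.util.Properties; the class name is hypothetical and it assumes the properties file ends up on the classpath:

import java.io.InputStream;
import java.util.Properties;

// Hypothetical loader sketch for service_flow_config.properties; the real
// FlowWriteConfigurations implementation is not part of this diff.
public final class ConfigLoadSketch {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        try (InputStream in = ConfigLoadSketch.class.getClassLoader()
                .getResourceAsStream("service_flow_config.properties")) {
            props.load(in); // assumes the file is on the classpath
        }
        String topic = props.getProperty("kafka.topic"); // test528
        int batchSize = Integer.parseInt(props.getProperty("batch.insert.num", "2000"));
        System.out.println("topic=" + topic + ", batch=" + batchSize);
    }
}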
diff --git a/src/java/cn/ac/iie/bean/ValueTuple.java b/src/java/cn/ac/iie/bean/ValueTuple.java
deleted file mode 100644
index 4c12f52..0000000
--- a/src/java/cn/ac/iie/bean/ValueTuple.java
+++ /dev/null
@@ -1,67 +0,0 @@
-package cn.ac.iie.bean;
-
-/**
- * @ClassName ValueTuple
- * @Author [email protected]
- * @Date 2020/5/27 16:18
- * @Version V1.0
- **/
-public class ValueTuple {
-
- private int common_sessions;
- private int common_c2s_pkt_num;
- private int common_s2c_pkt_num;
- private int common_c2s_byte_num;
- private int common_s2c_byte_num;
-
- public int getCommon_sessions() {
- return common_sessions;
- }
-
- public void setCommon_sessions(int common_sessions) {
- this.common_sessions = common_sessions;
- }
-
- public int getCommon_c2s_pkt_num() {
- return common_c2s_pkt_num;
- }
-
- public void setCommon_c2s_pkt_num(int common_c2s_pkt_num) {
- this.common_c2s_pkt_num = common_c2s_pkt_num;
- }
-
- public int getCommon_s2c_pkt_num() {
- return common_s2c_pkt_num;
- }
-
- public void setCommon_s2c_pkt_num(int common_s2c_pkt_num) {
- this.common_s2c_pkt_num = common_s2c_pkt_num;
- }
-
- public int getCommon_c2s_byte_num() {
- return common_c2s_byte_num;
- }
-
- public void setCommon_c2s_byte_num(int common_c2s_byte_num) {
- this.common_c2s_byte_num = common_c2s_byte_num;
- }
-
- public int getCommon_s2c_byte_num() {
- return common_s2c_byte_num;
- }
-
- public void setCommon_s2c_byte_num(int common_s2c_byte_num) {
- this.common_s2c_byte_num = common_s2c_byte_num;
- }
-
- @Override
- public String toString() {
- return "ValueTuple{" +
- "common_sessions=" + common_sessions +
- ", common_c2s_pkt_num=" + common_c2s_pkt_num +
- ", common_s2c_pkt_num=" + common_s2c_pkt_num +
- ", common_c2s_byte_num=" + common_c2s_byte_num +
- ", common_s2c_byte_num=" + common_s2c_byte_num +
- '}';
- }
-}
diff --git a/src/java/cn/ac/iie/bolt/CompletionBolt.java b/src/java/cn/ac/iie/bolt/CompletionBolt.java
deleted file mode 100644
index e34ab88..0000000
--- a/src/java/cn/ac/iie/bolt/CompletionBolt.java
+++ /dev/null
@@ -1,54 +0,0 @@
-package cn.ac.iie.bolt;
-
-import cn.ac.iie.common.FlowWriteConfig;
-import cn.ac.iie.utils.system.TupleUtils;
-import com.zdjizhi.utils.StringUtil;
-import org.apache.log4j.Logger;
-import org.apache.storm.task.TopologyContext;
-import org.apache.storm.topology.BasicOutputCollector;
-import org.apache.storm.topology.OutputFieldsDeclarer;
-import org.apache.storm.topology.base.BaseBasicBolt;
-import org.apache.storm.tuple.Fields;
-import org.apache.storm.tuple.Tuple;
-import org.apache.storm.tuple.Values;
-
-import java.util.HashMap;
-import java.util.Map;
-
-public class CompletionBolt extends BaseBasicBolt {
- private final static Logger logger = Logger.getLogger(CompletionBolt.class);
- private static final long serialVersionUID = 9006119186526123734L;
-
- @Override
- public void prepare(Map stormConf, TopologyContext context) {
- }
-
- @SuppressWarnings("Duplicates")
- @Override
- public void execute(Tuple tuple, BasicOutputCollector basicOutputCollector) {
- try {
- String message = tuple.getString(0);
- if (StringUtil.isNotBlank(message)) {
- basicOutputCollector.emit(new Values(message));
- }
- } catch (Exception e) {
-            logger.error(FlowWriteConfig.KAFKA_TOPIC + " exception while receiving/parsing");
- e.printStackTrace();
- }
- }
-
- @Override
- public Map<String, Object> getComponentConfiguration() {
- Map<String, Object> conf = new HashMap<String, Object>(16);
- conf.put(org.apache.storm.Config.TOPOLOGY_TICK_TUPLE_FREQ_SECS,
- FlowWriteConfig.HBASE_TICK_TUPLE_FREQ_SECS);
- return conf;
- }
-
-
- @Override
- public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
- outputFieldsDeclarer.declare(new Fields("connLog"));
- }
-
-}
diff --git a/src/java/cn/ac/iie/bolt/NtcLogSendBolt.java b/src/java/cn/ac/iie/bolt/NtcLogSendBolt.java
deleted file mode 100644
index fa8fab3..0000000
--- a/src/java/cn/ac/iie/bolt/NtcLogSendBolt.java
+++ /dev/null
@@ -1,71 +0,0 @@
-package cn.ac.iie.bolt;
-
-import cn.ac.iie.common.FlowWriteConfig;
-import cn.ac.iie.utils.kafka.KafkaLogNtc;
-import cn.ac.iie.utils.system.TupleUtils;
-import com.zdjizhi.utils.StringUtil;
-import org.apache.log4j.Logger;
-import org.apache.storm.task.TopologyContext;
-import org.apache.storm.topology.BasicOutputCollector;
-import org.apache.storm.topology.OutputFieldsDeclarer;
-import org.apache.storm.topology.base.BaseBasicBolt;
-import org.apache.storm.tuple.Tuple;
-
-import java.util.HashMap;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.Map;
-
-/**
- * @author qidaijie
- * @date 2018/8/14
- */
-public class NtcLogSendBolt extends BaseBasicBolt {
- private static final long serialVersionUID = -3663610927224396615L;
- private static Logger logger = Logger.getLogger(NtcLogSendBolt.class);
- private List<String> list;
- private KafkaLogNtc kafkaLogNtc;
-
-
- @Override
- public void prepare(Map stormConf, TopologyContext context) {
- list = new LinkedList<>();
- kafkaLogNtc = KafkaLogNtc.getInstance();
- }
-
- @Override
- public void execute(Tuple tuple, BasicOutputCollector basicOutputCollector) {
- try {
- if (TupleUtils.isTick(tuple)) {
- if (list.size() != 0) {
- kafkaLogNtc.sendMessage(list);
- list.clear();
- }
- } else {
- String message = tuple.getString(0);
- if (StringUtil.isNotBlank(message)) {
- list.add(message);
- }
- if (list.size() == FlowWriteConfig.BATCH_INSERT_NUM) {
- kafkaLogNtc.sendMessage(list);
- list.clear();
- }
- }
- } catch (Exception e) {
-            logger.error(FlowWriteConfig.KAFKA_TOPIC + " exception while sending logs to Kafka");
- e.printStackTrace();
- }
- }
-
- @Override
- public Map<String, Object> getComponentConfiguration() {
- Map<String, Object> conf = new HashMap<>(16);
- conf.put(org.apache.storm.Config.TOPOLOGY_TICK_TUPLE_FREQ_SECS, FlowWriteConfig.TOPOLOGY_TICK_TUPLE_FREQ_SECS);
- return conf;
- }
-
- @Override
- public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
- }
-
-}
diff --git a/src/java/cn/ac/iie/bolt/radius/RadiusCompletionBolt.java b/src/java/cn/ac/iie/bolt/radius/RadiusCompletionBolt.java
deleted file mode 100644
index 7dbe16c..0000000
--- a/src/java/cn/ac/iie/bolt/radius/RadiusCompletionBolt.java
+++ /dev/null
@@ -1,51 +0,0 @@
-package cn.ac.iie.bolt.radius;
-
-import cn.ac.iie.common.FlowWriteConfig;
-import com.zdjizhi.utils.StringUtil;
-import org.apache.log4j.Logger;
-import org.apache.storm.task.TopologyContext;
-import org.apache.storm.topology.BasicOutputCollector;
-import org.apache.storm.topology.OutputFieldsDeclarer;
-import org.apache.storm.topology.base.BaseBasicBolt;
-import org.apache.storm.tuple.Fields;
-import org.apache.storm.tuple.Tuple;
-import org.apache.storm.tuple.Values;
-
-import java.util.Map;
-
-
-/**
- * Completion of connection-relationship logs
- *
- * @author qidaijie
- */
-public class RadiusCompletionBolt extends BaseBasicBolt {
-
- private final static Logger logger = Logger.getLogger(RadiusCompletionBolt.class);
- private static final long serialVersionUID = -3657802387129063952L;
-
- @Override
- public void prepare(Map stormConf, TopologyContext context) {
-
- }
- @SuppressWarnings("Duplicates")
- @Override
- public void execute(Tuple tuple, BasicOutputCollector basicOutputCollector) {
- try {
- String message = tuple.getString(0);
- if (StringUtil.isNotBlank(message)) {
- basicOutputCollector.emit(new Values(message));
- }
- } catch (Exception e) {
-            logger.error(FlowWriteConfig.KAFKA_TOPIC + " exception while receiving/parsing");
- e.printStackTrace();
- }
- }
-
-
- @Override
- public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
- outputFieldsDeclarer.declare(new Fields("connLog"));
- }
-
-}
diff --git a/src/java/cn/ac/iie/spout/CustomizedKafkaSpout.java b/src/java/cn/ac/iie/spout/CustomizedKafkaSpout.java
deleted file mode 100644
index 6b2d82b..0000000
--- a/src/java/cn/ac/iie/spout/CustomizedKafkaSpout.java
+++ /dev/null
@@ -1,81 +0,0 @@
-package cn.ac.iie.spout;
-
-import cn.ac.iie.common.FlowWriteConfig;
-import org.apache.kafka.clients.consumer.ConsumerConfig;
-import org.apache.kafka.clients.consumer.ConsumerRecord;
-import org.apache.kafka.clients.consumer.ConsumerRecords;
-import org.apache.kafka.clients.consumer.KafkaConsumer;
-import org.apache.log4j.Logger;
-import org.apache.storm.spout.SpoutOutputCollector;
-import org.apache.storm.task.TopologyContext;
-import org.apache.storm.topology.OutputFieldsDeclarer;
-import org.apache.storm.topology.base.BaseRichSpout;
-import org.apache.storm.tuple.Fields;
-import org.apache.storm.tuple.Values;
-
-import java.util.Collections;
-import java.util.Map;
-import java.util.Properties;
-
-/**
- * kafkaSpout
- *
- * @author Administrator
- */
-public class CustomizedKafkaSpout extends BaseRichSpout {
- private static final long serialVersionUID = -3363788553406229592L;
- private KafkaConsumer<String, String> consumer;
- private SpoutOutputCollector collector = null;
- private TopologyContext context = null;
- private final static Logger logger = Logger.getLogger(CustomizedKafkaSpout.class);
-
-
- private static Properties createConsumerConfig() {
- Properties props = new Properties();
- props.put("bootstrap.servers", FlowWriteConfig.BOOTSTRAP_SERVERS);
- props.put("group.id", FlowWriteConfig.GROUP_ID);
- props.put("session.timeout.ms", "60000");
- props.put("max.poll.records", 3000);
- props.put("max.partition.fetch.bytes", 31457280);
- props.put("auto.offset.reset", FlowWriteConfig.AUTO_OFFSET_RESET);
- props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
- props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
- return props;
- }
-
- @Override
- public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
- // TODO Auto-generated method stub
- this.collector = collector;
- this.context = context;
- Properties prop = createConsumerConfig();
- this.consumer = new KafkaConsumer<>(prop);
- this.consumer.subscribe(Collections.singletonList(FlowWriteConfig.KAFKA_TOPIC));
- }
-
- @Override
- public void close() {
- consumer.close();
- }
-
- @Override
- public void nextTuple() {
- try {
- // TODO Auto-generated method stub
- ConsumerRecords<String, String> records = consumer.poll(10000L);
- Thread.sleep(FlowWriteConfig.TOPOLOGY_SPOUT_SLEEP_TIME);
- for (ConsumerRecord<String, String> record : records) {
- this.collector.emit(new Values(record.value()));
- }
- } catch (Exception e) {
- logger.error("KafkaSpout发送消息出现异常!", e);
- e.printStackTrace();
- }
- }
-
- @Override
- public void declareOutputFields(OutputFieldsDeclarer declarer) {
- // TODO Auto-generated method stub
- declarer.declare(new Fields("source"));
- }
-}
diff --git a/src/java/cn/ac/iie/topology/LogFlowWriteTopology.java b/src/java/cn/ac/iie/topology/LogFlowWriteTopology.java
deleted file mode 100644
index ca90ca6..0000000
--- a/src/java/cn/ac/iie/topology/LogFlowWriteTopology.java
+++ /dev/null
@@ -1,92 +0,0 @@
-package cn.ac.iie.topology;
-
-
-import cn.ac.iie.bolt.CompletionBolt;
-import cn.ac.iie.bolt.NtcLogSendBolt;
-import cn.ac.iie.bolt.radius.RadiusCompletionBolt;
-
-import cn.ac.iie.common.FlowWriteConfig;
-import cn.ac.iie.spout.CustomizedKafkaSpout;
-import org.apache.log4j.Logger;
-import org.apache.storm.Config;
-import org.apache.storm.generated.AlreadyAliveException;
-import org.apache.storm.generated.AuthorizationException;
-import org.apache.storm.generated.InvalidTopologyException;
-import org.apache.storm.topology.TopologyBuilder;
-
-/**
- * Main class of the Storm application
- *
- * @author Administrator
- */
-
-public class LogFlowWriteTopology {
- private static Logger logger = Logger.getLogger(LogFlowWriteTopology.class);
- private final String topologyName;
- private final Config topologyConfig;
- private TopologyBuilder builder;
-
- private LogFlowWriteTopology() {
- this(LogFlowWriteTopology.class.getSimpleName());
- }
-
- private LogFlowWriteTopology(String topologyName) {
- this.topologyName = topologyName;
- topologyConfig = createTopologConfig();
- }
-
- private Config createTopologConfig() {
- Config conf = new Config();
- conf.setDebug(false);
- conf.setMessageTimeoutSecs(60);
- conf.setMaxSpoutPending(FlowWriteConfig.TOPOLOGY_CONFIG_MAX_SPOUT_PENDING);
- conf.setNumAckers(FlowWriteConfig.TOPOLOGY_NUM_ACKS);
- return conf;
- }
-
- private void runLocally() throws InterruptedException {
- topologyConfig.setMaxTaskParallelism(1);
- StormRunner.runTopologyLocally(builder, topologyName, topologyConfig, 600);
- }
-
- private void runRemotely() throws AlreadyAliveException, InvalidTopologyException, AuthorizationException {
- topologyConfig.setNumWorkers(FlowWriteConfig.TOPOLOGY_WORKERS);
-        //Setting this too high causes many problems, such as starving the heartbeat thread and a sharp drop in throughput
- topologyConfig.put(Config.TOPOLOGY_EXECUTOR_RECEIVE_BUFFER_SIZE, 8);
- StormRunner.runTopologyRemotely(builder, topologyName, topologyConfig);
- }
-
- private void buildTopology() {
- builder = new TopologyBuilder();
- builder.setSpout("LogFlowWriteSpout", new CustomizedKafkaSpout(), FlowWriteConfig.SPOUT_PARALLELISM);
-
- builder.setBolt(FlowWriteConfig.KAFKA_TOPIC, new CompletionBolt(), FlowWriteConfig.DATACENTER_BOLT_PARALLELISM).localOrShuffleGrouping("LogFlowWriteSpout");
-
- builder.setBolt("NtcLogSendBolt", new NtcLogSendBolt(), FlowWriteConfig.KAFKA_BOLT_PARALLELISM).localOrShuffleGrouping(FlowWriteConfig.KAFKA_TOPIC);
-
-
- }
-
- public static void main(String[] args) throws Exception {
- LogFlowWriteTopology csst = null;
- boolean runLocally = true;
- String parameter = "remote";
- int size = 2;
- if (args.length >= size && parameter.equalsIgnoreCase(args[1])) {
- runLocally = false;
- csst = new LogFlowWriteTopology(args[0]);
- } else {
- csst = new LogFlowWriteTopology();
- }
-
- csst.buildTopology();
-
- if (runLocally) {
- logger.info("执行本地模式...");
- csst.runLocally();
- } else {
- logger.info("执行远程部署模式...");
- csst.runRemotely();
- }
- }
-}
diff --git a/src/java/cn/ac/iie/topology/StormRunner.java b/src/java/cn/ac/iie/topology/StormRunner.java
deleted file mode 100644
index f5094a4..0000000
--- a/src/java/cn/ac/iie/topology/StormRunner.java
+++ /dev/null
@@ -1,35 +0,0 @@
-package cn.ac.iie.topology;
-
-
-import org.apache.storm.Config;
-import org.apache.storm.LocalCluster;
-import org.apache.storm.StormSubmitter;
-import org.apache.storm.generated.AlreadyAliveException;
-import org.apache.storm.generated.AuthorizationException;
-import org.apache.storm.generated.InvalidTopologyException;
-import org.apache.storm.topology.TopologyBuilder;
-
-/**
- * @author Administrator
- */
-public final class StormRunner{
- private static final int MILLS_IN_SEC = 1000;
-
- private StormRunner() {}
-
- public static void runTopologyLocally(TopologyBuilder builder, String topologyName, Config conf, int runtimeInSeconds) throws InterruptedException {
-
- LocalCluster localCluster = new LocalCluster();
- localCluster.submitTopology(topologyName, conf, builder.createTopology());
- Thread.sleep((long) runtimeInSeconds * MILLS_IN_SEC);
- localCluster.shutdown();
-
- }
-
- public static void runTopologyRemotely(TopologyBuilder builder, String topologyName, Config conf ) throws AlreadyAliveException, InvalidTopologyException, AuthorizationException {
-
- StormSubmitter.submitTopologyWithProgressBar(topologyName, conf, builder.createTopology());
- }
-
-
-}
diff --git a/src/java/cn/ac/iie/utils/general/Aggregate.java b/src/java/cn/ac/iie/utils/general/Aggregate.java
deleted file mode 100644
index e427574..0000000
--- a/src/java/cn/ac/iie/utils/general/Aggregate.java
+++ /dev/null
@@ -1,152 +0,0 @@
-package cn.ac.iie.utils.general;
-
-import cn.ac.iie.bean.ValueTuple;
-import cn.ac.iie.common.FlowWriteConfig;
-import cn.ac.iie.utils.json.JsonParseUtil;
-import cn.ac.iie.utils.tuple.TwoTuple;
-import com.alibaba.fastjson.JSON;
-import com.alibaba.fastjson.JSONObject;
-import com.zdjizhi.utils.StringUtil;
-
-import java.util.HashMap;
-
-public class Aggregate {
-
-
- /**
-     * Map used to build the reflective class, loaded in memory
- */
- private static HashMap<String, Class> map = JsonParseUtil.getMapFromHttp(FlowWriteConfig.SCHEMA_HTTP);
-
- /**
-     * Reflected into a class
- */
- private static Object mapObject = JsonParseUtil.generateObject(map);
-
- private static String key;
-
- private static HashMap<String, ValueTuple> resultMap = new HashMap<>();
-
- private static Object conn;
-
- private static ValueTuple valueTuple = new ValueTuple();
-
-    private static String test = "{\"bgp_as_num\":\"100\",\"bgp_route\":\"192.168.222.0/24\",\"bgp_type\":1,\"common_action\":4,\"common_address_list\":\"\",\"common_address_type\":4,\"common_app_id\":0,\"common_app_label\":\"\",\"common_c2s_byte_num\":650,\"common_c2s_pkt_num\":7,\"common_client_asn\":\"9198\",\"common_client_ip\":\"95.56.198.87\",\"common_client_location\":\"Pervomayskiy,Almaty oblysy,哈萨克斯坦\",\"common_client_port\":13555,\"common_con_duration_ms\":154122,\"common_device_id\":\"2506398\",\"common_direction\":0,\"common_encapsulation\":0,\"common_end_time\":1590388545,\"common_entrance_id\":20,\"common_has_dup_traffic\":1,\"common_isp\":\"CMCC\",\"common_l4_protocol\":\"VLAN\",\"common_link_id\":1,\"common_log_id\":126995036504993794,\"common_policy_id\":0,\"common_protocol_id\":0,\"common_recv_time\":1590388694,\"common_s2c_byte_num\":9921,\"common_s2c_pkt_num\":21,\"common_schema_type\":\"SSL\",\"common_server_asn\":\"12876\",\"common_server_ip\":\"62.210.101.44\",\"common_server_location\":\"法国\",\"common_server_port\":22,\"common_service\":7,\"common_sled_ip\":\"192.168.10.36\",\"common_start_time\":1590388490,\"common_stream_dir\":2,\"common_stream_error\":\"\",\"common_stream_trace_id\":6193492085736674541,\"common_user_region\":\"prysUgOCWSmUYcGRL5rcUvVc8zbI9MOtlb9KOvH8rZCMRVqnIEyQVtQfBp94IIIjha24tGQ4x33qtC3jSx8udADkuezGGzrTrxCB\",\"common_user_tags\":\"9PD3v4GaIgS97l4wQnRtahW00YBi3933RDQg8PpiB8R9ftXhELHploJ0Ocg1Pj0xH06svaPbY2Tp1Chb0usQPttRqhpNbHTkW3En\",\"dns_aa\":0,\"dns_ancount\":64,\"dns_arcount\":22,\"dns_cname\":\"bFh2JvWJMWTCNcVEyuroMimLhoNM3O4effDDiNA9SRlCFdzaev10\",\"dns_message_id\":744559,\"dns_nscount\":59,\"dns_opcode\":0,\"dns_qclass\":2,\"dns_qdcount\":26,\"dns_qname\":\"kankanews.com12041281\",\"dns_qr\":1,\"dns_qtype\":5,\"dns_ra\":0,\"dns_rcode\":9,\"dns_rd\":0,\"dns_rr\":\"{\\\"aEWseVK\\\":\\\"UEUZ4qlk8qOjJBZ4\\\",\\\"9jGNxy0\\\":\\\"s075dZOXDXZ\\\",\\\"yyNXYD9G\\\":\\\"EEKxB99FuYDHH2E6NrV\\\",\\\"al23zn\\\":\\\"4dX1qd5L0A\\\"}\",\"dns_sub\":1,\"dns_tc\":0,\"ftp_account\":\"JXU2RDRCJXU4QkQ1\",\"ftp_content\":\"JXU2RDRCJXU4QkQ1\",\"ftp_url\":\"ftp://test:[email protected]/soft/list.txt\",\"http_content_length\":\"37339\",\"http_content_type\":\"application/x-javascript\",\"http_cookie\":\"BD_UPN=12314753\",\"http_domain\":\"163.com\",\"http_host\":\"v.163.com\",\"http_proxy_flag\":1,\"http_referer\":\"https://www.baidu.com/\",\"http_request_body\":\"\",\"http_request_body_key\":\"\",\"http_request_header\":\"\",\"http_request_line\":\"GET www.baidu.com/ HTTP/1.1\",\"http_response_body\":\"\",\"http_response_body_key\":\"\",\"http_response_header\":\"\",\"http_response_line\":\"HTTP/1.1 200 OK\",\"http_sequence\":9,\"http_set_cookie\":\"delPer=0; path=/; domain=.baidu.com\",\"http_snapshot\":\"\",\"http_url\":\"http://v.163.com/movie/2011/7/0/3/M7B9K1R60_M7BAANT03.html\",\"http_user_agent\":\"Mozilla/5.0 (Windows NT 10.0; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/54.0.2840.87 Safari/537.36\",\"http_version\":\"http1\",\"mail_account\":\"123456789\",\"mail_attachment_content\":\"\",\"mail_attachment_name\":\"%u6D4B%u8BD5%u90AE%u4EF6\",\"mail_attachment_name_charset\":\"UTF-8\",\"mail_bcc\":\"\",\"mail_cc\":\"[email protected]\",\"mail_content\":\"\",\"mail_content_charset\":\"\",\"mail_eml_file\":\"\",\"mail_from\":\"[email protected]\",\"mail_from_cmd\":\"[email protected]\",\"mail_protocol_type\":\"POP3\",\"mail_snapshot\":\"\",\"mail_subject\":\"%u6D4B%u8BD5%u90AE%u4EF6\",\"mail_subject_charset\":\"UTF-8\",\"mail_to\":\"[email protected]\",\"mail_to_cmd\":\"[email protected]\",\"ssl_cert_verify\":0,\"ssl_client_side_latency\":5237,\"ssl_client_side_version\":\"SSLv3\",\"ssl_cn\":\"\",\"ssl_con_latency_ms\":3,\"ssl_error\":\"\",\"ssl_intercept_state\":0,\"ssl_pinningst\":1,\"ssl_san\":\"\",\"ssl_server_side_latency\":4644,\"ssl_server_side_version\":\"TLSv1.1\",\"ssl_sni\":\"cztv.com11547021\",\"ssl_version\":\"V3\",\"streaming_media_protocol\":\"RTP\",\"streaming_media_url\":\"http://home.sogua.com/lujingai/mv/play.aspx?id=30195689\",\"voip_called_account\":\"13307536537\",\"voip_called_number\":\"15301710004\",\"voip_calling_account\":\"15901848931\",\"voip_calling_number\":\"13908208553\"}";
-
-
- public static void main(String[] args) {
-
-
-
- resultMap = aggregateJsonToMap(resultMap, test);
-
- System.out.println("聚合一次之后: " + resultMap.get(key));
-
- resultMap = aggregateJsonToMap(resultMap, test);
-
- System.out.println("聚合两次之后: " + resultMap.get(key));
-
-
- }
-
- /**
-     * Accumulates one new record into the HashMap
- * @param map
- * @param message
- * @return
- */
- public static HashMap<String, ValueTuple> aggregateJsonToMap(HashMap<String,ValueTuple> map, String message) {
-
- ValueTuple valueTuple = JSON.parseObject(message, ValueTuple.class);
-
- key = getKey(message);
-
- map.put(key,addTuple(map.get(key), valueTuple));
-
- return map;
- }
-
- /**
-     * Aggregates the corresponding fields of two ValueTuple objects
- * @param result
- * @param message
- * @return
- */
- public static ValueTuple addTuple(ValueTuple result,ValueTuple message){
-
- if (result == null){
-
- result = new ValueTuple();
- }
-
- result.setCommon_s2c_byte_num(result.getCommon_s2c_byte_num() + message.getCommon_s2c_byte_num());
- result.setCommon_c2s_byte_num(result.getCommon_c2s_byte_num() + message.getCommon_c2s_byte_num());
- result.setCommon_s2c_pkt_num(result.getCommon_s2c_pkt_num() + message.getCommon_s2c_pkt_num());
- result.setCommon_c2s_pkt_num(result.getCommon_c2s_pkt_num() + message.getCommon_c2s_pkt_num());
- result.setCommon_sessions(result.getCommon_sessions() + message.getCommon_sessions());
-
- return result;
- }
-
- public static String getKey(String message){
- Object conn = JSONObject.parseObject(message, mapObject.getClass());
- //TODO key
- Object common_policy_id = JsonParseUtil.getValue(conn, "common_policy_id");
- Object common_action = JsonParseUtil.getValue(conn, "common_action");
- Object common_sub_action = JsonParseUtil.getValue(conn, "common_sub_action");
- Object common_client_ip = JsonParseUtil.getValue(conn, "common_client_ip");
- Object common_client_location = JsonParseUtil.getValue(conn, "common_client_location");
- Object common_sled_ip = JsonParseUtil.getValue(conn, "common_sled_ip");
- Object common_device_id = JsonParseUtil.getValue(conn, "common_device_id");
- Object common_subscriber_id = JsonParseUtil.getValue(conn, "common_subscriber_id");
- Object common_server_ip = JsonParseUtil.getValue(conn, "common_server_ip");
- Object common_server_location = JsonParseUtil.getValue(conn, "common_server_location");
- Object common_server_port = JsonParseUtil.getValue(conn, "common_server_port");
- Object common_l4_protocol = JsonParseUtil.getValue(conn, "common_l4_protocol");
- Object http_domain = JsonParseUtil.getValue(conn, "http_domain");
- Object ssl_sni = JsonParseUtil.getValue(conn, "ssl_sni");
-
-
- StringBuilder builder = new StringBuilder();
- builder.append(common_policy_id).append("_")
- .append(common_action).append("_")
- .append(common_sub_action).append("_")
- .append(common_client_ip).append("_")
- .append(common_client_location).append("_")
- .append(common_sled_ip).append("_")
- .append(common_device_id).append("_")
- .append(common_subscriber_id).append("_")
- .append(common_server_ip).append("_")
- .append(common_server_location).append("_")
- .append(common_server_port).append("_")
- .append(common_l4_protocol).append("_")
- .append(http_domain).append("_")
- .append(ssl_sni);
-
- return builder.toString();
- }
-
-
-/* public static ValueTuple getValueTuple(String message){
-
- conn = JSONObject.parseObject(message, mapObject.getClass());
- Object common_sessions = JsonParseUtil.getValue(conn, "common_sessions");
-
- if (StringUtil.isEmpty(common_sessions)) {
- common_sessions = 0;
- }
- Object common_c2s_pkt_num = JsonParseUtil.getValue(conn, "common_c2s_pkt_num");
- Object common_s2c_pkt_num = JsonParseUtil.getValue(conn, "common_s2c_pkt_num");
- Object common_c2s_byte_num = JsonParseUtil.getValue(conn, "common_c2s_byte_num");
- Object common_s2c_byte_num = JsonParseUtil.getValue(conn, "common_s2c_byte_num");
-
- valueTuple.setCommon_sessions((int) common_sessions);
- valueTuple.setCommon_c2s_pkt_num((int) common_c2s_pkt_num);
- valueTuple.setCommon_s2c_pkt_num((int) common_s2c_pkt_num);
- valueTuple.setCommon_c2s_byte_num((int) common_c2s_byte_num);
- valueTuple.setCommon_s2c_byte_num((int) common_s2c_byte_num);
-
- return valueTuple;
-
- }*/
-}
diff --git a/src/java/cn/ac/iie/utils/http/HttpClientUtil.java b/src/java/cn/ac/iie/utils/http/HttpClientUtil.java
deleted file mode 100644
index 347a69b..0000000
--- a/src/java/cn/ac/iie/utils/http/HttpClientUtil.java
+++ /dev/null
@@ -1,55 +0,0 @@
-package cn.ac.iie.utils.http;
-
-import org.apache.http.HttpEntity;
-import org.apache.http.client.methods.CloseableHttpResponse;
-import org.apache.http.client.methods.HttpGet;
-import org.apache.http.impl.client.CloseableHttpClient;
-import org.apache.http.impl.client.HttpClients;
-
-import java.io.BufferedReader;
-import java.io.IOException;
-import java.io.InputStreamReader;
-
-/**
- * Utility class for fetching the gateway schema
- *
- * @author qidaijie
- */
-public class HttpClientUtil {
-
- /**
-     * Requests the gateway to obtain the schema
-     * @param http gateway url
- * @return schema
- */
- public static String requestByGetMethod(String http) {
- CloseableHttpClient httpClient = HttpClients.createDefault();
- StringBuilder entityStringBuilder = null;
- try {
- HttpGet get = new HttpGet(http);
- try (CloseableHttpResponse httpResponse = httpClient.execute(get)) {
- HttpEntity entity = httpResponse.getEntity();
- entityStringBuilder = new StringBuilder();
- if (null != entity) {
- BufferedReader bufferedReader = new BufferedReader(new InputStreamReader(httpResponse.getEntity().getContent(), "UTF-8"), 8 * 1024);
- String line = null;
- while ((line = bufferedReader.readLine()) != null) {
- entityStringBuilder.append(line);
- }
- }
- }
- } catch (Exception e) {
- e.printStackTrace();
- } finally {
- try {
- if (httpClient != null) {
- httpClient.close();
- }
- } catch (IOException e) {
- e.printStackTrace();
- }
- }
- return entityStringBuilder.toString();
- }
-
-}
diff --git a/src/java/cn/ac/iie/utils/json/JsonParseUtil.java b/src/java/cn/ac/iie/utils/json/JsonParseUtil.java
deleted file mode 100644
index 0350e7f..0000000
--- a/src/java/cn/ac/iie/utils/json/JsonParseUtil.java
+++ /dev/null
@@ -1,204 +0,0 @@
-package cn.ac.iie.utils.json;
-
-import cn.ac.iie.common.FlowWriteConfig;
-import cn.ac.iie.utils.http.HttpClientUtil;
-import com.alibaba.fastjson.JSON;
-import com.alibaba.fastjson.JSONArray;
-import com.alibaba.fastjson.JSONObject;
-import com.zdjizhi.utils.StringUtil;
-import net.sf.cglib.beans.BeanGenerator;
-import net.sf.cglib.beans.BeanMap;
-
-import java.util.*;
-
-/**
- * Utility class for parsing JSON with FastJson
- *
- * @author qidaijie
- */
-public class JsonParseUtil {
-
- /**
-     * Pattern matching: given a type string, returns a class type
-     *
-     * @param type the type
-     * @return the class type
- */
-
- private static Class getClassName(String type) {
- Class clazz;
-
- switch (type) {
- case "int":
- clazz = Integer.class;
- break;
- case "String":
- clazz = String.class;
- break;
- case "long":
- clazz = long.class;
- break;
- case "Integer":
- clazz = Integer.class;
- break;
- case "double":
- clazz = double.class;
- break;
- case "float":
- clazz = float.class;
- break;
- case "char":
- clazz = char.class;
- break;
- case "byte":
- clazz = byte.class;
- break;
- case "boolean":
- clazz = boolean.class;
- break;
- case "short":
- clazz = short.class;
- break;
- default:
- clazz = String.class;
- }
- return clazz;
- }
-
- /**
-     * Generates an object via reflection
-     *
-     * @param properties map used to build the reflective class
-     * @return the generated Object instance
- */
- public static Object generateObject(Map properties) {
- BeanGenerator generator = new BeanGenerator();
- Set keySet = properties.keySet();
- for (Object aKeySet : keySet) {
- String key = (String) aKeySet;
- generator.addProperty(key, (Class) properties.get(key));
- }
- return generator.create();
- }
-
- /**
-     * Gets a property value
-     *
-     * @param obj the object
-     * @param property key
-     * @return the property value
- */
- public static Object getValue(Object obj, String property) {
- BeanMap beanMap = BeanMap.create(obj);
- return beanMap.get(property);
- }
-
- /**
-     * Updates a property value
-     *
-     * @param obj the object
-     * @param property the key to update
-     * @param value the new value
- */
- public static void setValue(Object obj, String property, Object value) {
- BeanMap beanMap = BeanMap.create(obj);
- beanMap.put(property, value);
- }
-
- /**
-     * Fetches the gateway schema from the given URL and builds the map used to generate an Object instance
-     *
-     * @param http gateway schema address
-     * @return a map used to reflectively generate an object of the schema type
- */
- public static HashMap<String, Class> getMapFromHttp(String http) {
- HashMap<String, Class> map = new HashMap<>();
-
- String schema = HttpClientUtil.requestByGetMethod(http);
- Object data = JSON.parseObject(schema).get("data");
-
-        //get the fields and convert them to an array; each element is a name/doc/type triple
- JSONObject schemaJson = JSON.parseObject(data.toString());
- JSONArray fields = (JSONArray) schemaJson.get("fields");
-
- for (Object field : fields) {
- String name = JSON.parseObject(field.toString()).get("name").toString();
- String type = JSON.parseObject(field.toString()).get("type").toString();
-            //assemble the map used to generate the entity class
- map.put(name, getClassName(type));
- }
-
- return map;
- }
-
-
- /**
-     * Fetches the schema from the http link and, after parsing, returns a job list (useList toList funcList paramlist)
-     *
-     * @param http gateway url
-     * @return the job list
- */
- public static ArrayList<String[]> getJobListFromHttp(String http) {
- ArrayList<String[]> list = new ArrayList<>();
-
- String schema = HttpClientUtil.requestByGetMethod(http);
-        //parse data
- Object data = JSON.parseObject(schema).get("data");
-
-        //get the fields and convert them to an array; each element is a name/doc/type triple
- JSONObject schemaJson = JSON.parseObject(data.toString());
- JSONArray fields = (JSONArray) schemaJson.get("fields");
-
- for (Object field : fields) {
-
- if (JSON.parseObject(field.toString()).containsKey("doc")) {
- Object doc = JSON.parseObject(field.toString()).get("doc");
-
- if (JSON.parseObject(doc.toString()).containsKey("format")) {
- String name = JSON.parseObject(field.toString()).get("name").toString();
- Object format = JSON.parseObject(doc.toString()).get("format");
- JSONObject formatObject = JSON.parseObject(format.toString());
-
- String functions = formatObject.get("functions").toString();
- String appendTo = null;
- String params = null;
-
- if (formatObject.containsKey("appendTo")) {
- appendTo = formatObject.get("appendTo").toString();
- }
-
- if (formatObject.containsKey("param")) {
- params = formatObject.get("param").toString();
- }
-
-
- if (StringUtil.isNotBlank(appendTo) && StringUtil.isBlank(params)) {
- String[] functionArray = functions.split(FlowWriteConfig.FORMAT_SPLITTER);
- String[] appendToArray = appendTo.split(FlowWriteConfig.FORMAT_SPLITTER);
-
- for (int i = 0; i < functionArray.length; i++) {
- list.add(new String[]{name, appendToArray[i], functionArray[i], null});
- }
-
- } else if (StringUtil.isNotBlank(appendTo) && StringUtil.isNotBlank(params)) {
- String[] functionArray = functions.split(FlowWriteConfig.FORMAT_SPLITTER);
- String[] appendToArray = appendTo.split(FlowWriteConfig.FORMAT_SPLITTER);
- String[] paramArray = params.split(FlowWriteConfig.FORMAT_SPLITTER);
-
- for (int i = 0; i < functionArray.length; i++) {
- list.add(new String[]{name, appendToArray[i], functionArray[i], paramArray[i]});
-
- }
- } else {
- list.add(new String[]{name, name, functions, params});
- }
-
- }
- }
-
- }
- return list;
- }
-
-
-} \ No newline at end of file
diff --git a/src/java/cn/ac/iie/utils/system/TupleUtils.java b/src/java/cn/ac/iie/utils/system/TupleUtils.java
deleted file mode 100644
index 53e14ca..0000000
--- a/src/java/cn/ac/iie/utils/system/TupleUtils.java
+++ /dev/null
@@ -1,23 +0,0 @@
-package cn.ac.iie.utils.system;
-
-import org.apache.storm.Constants;
-import org.apache.storm.tuple.Tuple;
-
-/**
- * Detects whether a tuple was sent by the system
- *
- * @author Administrator
- */
-public final class TupleUtils {
- /**
-     * Determines whether the tuple was sent automatically by the system
-     *
-     * @param tuple the tuple
- * @return true or false
- */
- public static boolean isTick(Tuple tuple) {
- return tuple != null
- && Constants.SYSTEM_COMPONENT_ID.equals(tuple.getSourceComponent())
- && Constants.SYSTEM_TICK_STREAM_ID.equals(tuple.getSourceStreamId());
- }
-}
diff --git a/src/java/cn/ac/iie/utils/tuple/ThreeTuple.java b/src/java/cn/ac/iie/utils/tuple/ThreeTuple.java
deleted file mode 100644
index 0529035..0000000
--- a/src/java/cn/ac/iie/utils/tuple/ThreeTuple.java
+++ /dev/null
@@ -1,20 +0,0 @@
-package cn.ac.iie.utils.tuple;
-
-public class ThreeTuple<A,B,C> {
-
- public String first;
-
- public long second;
-
- public int third;
-
- public ThreeTuple(String name,long time, int sum){
- first = name;
- second = time;
- third = sum;
- }
-
- public String toString(){
- return "[" + first + ", " + second + ", " + third + "]";
- }
-}
diff --git a/src/java/cn/ac/iie/utils/tuple/TupleAggregate.java b/src/java/cn/ac/iie/utils/tuple/TupleAggregate.java
deleted file mode 100644
index b3d2e71..0000000
--- a/src/java/cn/ac/iie/utils/tuple/TupleAggregate.java
+++ /dev/null
@@ -1,195 +0,0 @@
-package cn.ac.iie.utils.tuple;
-
-
-import com.alibaba.fastjson.JSON;
-
-
-import java.util.HashMap;
-
-public class TupleAggregate {
-
-
- private static TwoTuple<String, Integer> a = new TwoTuple<>("192.168.40.101", 1);
- private static TwoTuple<String, Integer> b = new TwoTuple<>("192.168.40.101", 1);
-
- private static ThreeTuple<String, Long, Integer> a1 = new ThreeTuple<>("lxk", 30L, 2);
- private static ThreeTuple<String, Long, Integer> b1 = new ThreeTuple<>("lxk", 20L, 2);
-
-
- public static TwoTuple<TwoTuple, ThreeTuple> parseJsonToTuple(String json) {
-
- CONN conn = JSON.parseObject(json, CONN.class);
-        //two-tuple key
- TwoTuple key = new TwoTuple<>(conn.getCip(), conn.getNum());
-        //three-tuple value
- ThreeTuple value = new ThreeTuple<>(conn.getName(), conn.getTime(), conn.getSum());
-
-
- return new TwoTuple<>(key, value);
- }
-
- /**
-     * Aggregates two three-tuples
- *
- * @param tuple1
- * @param tuple2
- * @return
- */
- public static ThreeTuple addTuple(ThreeTuple tuple1, ThreeTuple tuple2) {
-
-
- tuple1.second += tuple2.second;
- tuple1.third += tuple2.third;
-
- return tuple1;
-
- }
-
- /**
-     * Accumulates one new record into the HashMap
- *
- * @param map
- * @param json
- * @return map1
- */
- public static HashMap aggregate(HashMap<TwoTuple, ThreeTuple> map, String json) {
-
-
-        //TODO parse the json into an object, extract the key tuple, and aggregate with the entry in the HashMap that has the same key
-
- /**
-         * Remaining problems
-         * 1. the key is an object, so keys with equal field values are still not equal (override hashCode and equals)
-         *
-         * 2. looking the key up in the map can return null, and the aggregation below then throws a NullPointerException
- */
-
-        //one log record ==> a two-tuple
-        TwoTuple<TwoTuple, ThreeTuple> tuple = parseJsonToTuple(json);
-        //extract the key
-        TwoTuple key = tuple.first;
-        //get the value with the same key from the in-memory HashMap
-        ThreeTuple value = map.get(key);
-        //aggregate the two values and assign the result to value
-        value = addTuple(value, tuple.second);
-        //put the aggregated result back into the in-memory HashMap
- map.put(key, value);
-
- return map;
-
- }
-
- public static void main(String[] args) {
-
-// HashMap<TwoTuple, ThreeTuple> map1 = new HashMap<>();
-// a1 = addTuple(a1, b1);
-// System.out.println("聚合成功:" + a1);
-
-
- CONN conn1 = new CONN();
- CONN conn2 = new CONN();
-
- conn1.setCip("192.168.40.101");
- conn2.setCip("192.168.40.101");
-
- conn1.setNum(1);
- conn2.setNum(1);
-
- conn1.setName("lxk");
- conn2.setName("lxk");
-
- conn1.setTime(100L);
- conn2.setTime(200L);
-
- conn1.setSum(10);
- conn2.setSum(20);
-
- System.out.println("conn1" + conn1);
- System.out.println("conn2" + conn2);
-
- String json1 = JSON.toJSONString(conn1);
- String json2 = JSON.toJSONString(conn2);
-
-
- HashMap map = new HashMap<>();
-
- map.put(a, a1);
-
- System.out.println("开始的map:" + map);
-
- map = aggregate(map, json1);
-
- System.out.println("后来的map:" + map);
-
- System.out.println("a的HashCode: " + a.hashCode());
- System.out.println("b的HashCode: " + b.hashCode());
-
-
-
- }
-
-
-}
-
-class CONN {
-
-    //key used by the two-tuple
- private String cip;
- private int num;
-
-    //value used by the three-tuple
- private String name;
- private long time;
- private int sum;
-
- public String getName() {
- return name;
- }
-
- public void setName(String name) {
- this.name = name;
- }
-
- public long getTime() {
- return time;
- }
-
- public void setTime(long time) {
- this.time = time;
- }
-
- public int getSum() {
- return sum;
- }
-
- public void setSum(int sum) {
- this.sum = sum;
- }
-
- public String getCip() {
- return cip;
- }
-
- public void setCip(String cip) {
- this.cip = cip;
- }
-
- public int getNum() {
- return num;
- }
-
- public void setNum(int num) {
- this.num = num;
- }
-
- @Override
- public String toString() {
- return "CONN{" +
- "cip='" + cip + '\'' +
- ", num=" + num +
- ", name='" + name + '\'' +
- ", time=" + time +
- ", sum=" + sum +
- '}';
- }
-} \ No newline at end of file
diff --git a/src/java/cn/ac/iie/utils/tuple/TwoTuple.java b/src/java/cn/ac/iie/utils/tuple/TwoTuple.java
deleted file mode 100644
index 618f3ee..0000000
--- a/src/java/cn/ac/iie/utils/tuple/TwoTuple.java
+++ /dev/null
@@ -1,32 +0,0 @@
-package cn.ac.iie.utils.tuple;
-
-public class TwoTuple<A, B> {
-
- public A first;
-
- public B second;
-
- public TwoTuple(){};
- public TwoTuple(A cip, B num) {
- first = cip;
- second = num;
- }
-
- public String toString() {
- return "[" + first + ", " + second + "]";
- }
-
- @Override
- public int hashCode() {
-
- int result = (first != null && second != null) ? (first.toString() + second).hashCode() : 0;
-
- return result;
- }
-
- @Override
- public boolean equals(Object o) {
-
- return this.first.toString().equals(((TwoTuple) o).first.toString()) && this.first.toString().equals(((TwoTuple) o).first.toString());
- }
-}
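The "remaining problems" note in the removed TupleAggregate explains why these tuple classes needed hashCode/equals overrides to work as HashMap keys; note also that TwoTuple.equals above compares first twice and never looks at second. An illustrative corrected composite key, not part of this commit:

import java.util.Objects;

// Corrected composite-key sketch: equals checks both components, and
// hashCode/equals stay consistent so HashMap lookups behave. Illustrative only.
public final class PairKey<A, B> {
    public final A first;
    public final B second;

    public PairKey(A first, B second) {
        this.first = first;
        this.second = second;
    }

    @Override
    public boolean equals(Object o) {
        if (this == o) return true;
        if (!(o instanceof PairKey)) return false;
        PairKey<?, ?> p = (PairKey<?, ?>) o;
        return Objects.equals(first, p.first) && Objects.equals(second, p.second);
    }

    @Override
    public int hashCode() {
        return Objects.hash(first, second);
    }
}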
diff --git a/src/main/java/cn/ac/iie/trident/aggregate/AggCount.java b/src/main/java/cn/ac/iie/trident/aggregate/AggCount.java
new file mode 100644
index 0000000..5ac557b
--- /dev/null
+++ b/src/main/java/cn/ac/iie/trident/aggregate/AggCount.java
@@ -0,0 +1,98 @@
+package cn.ac.iie.trident.aggregate;
+
+
+import com.alibaba.fastjson.JSON;
+import cn.ac.iie.trident.aggregate.bean.ValueBean;
+import org.apache.storm.tuple.Values;
+import org.apache.storm.trident.operation.BaseAggregator;
+import org.apache.storm.trident.operation.TridentCollector;
+import org.apache.storm.trident.tuple.TridentTuple;
+
+
+import java.util.HashMap;
+
+
+/**
+ * @ClassName AggCount
+ * @Author [email protected]
+ * @Date 2020/6/1 10:48
+ * @Version V1.0
+ **/
+public class AggCount extends BaseAggregator<AggCount.State> {
+
+
+ static class State {
+ HashMap<String,ValueBean> map = new HashMap();
+ ValueBean valueBean = new ValueBean();
+ ValueBean tupleValueBean = new ValueBean();
+ String key = "";
+ }
+
+ @Override
+ public AggCount.State init(Object batchId, TridentCollector collector) {
+
+ return new State();
+ }
+
+ /**
+     * Aggregates a single tuple
+ * @param state
+ * @param tuple
+ * @param collector
+ */
+ @Override
+ public void aggregate(State state, TridentTuple tuple, TridentCollector collector) {
+
+
+        //TODO get the key and value from one tuple
+ state.key = tuple.getStringByField("key");
+ state.tupleValueBean = JSON.parseObject(tuple.getStringByField("value"),ValueBean.class);
+
+
+        //TODO get the value for this key from the HashMap, defaulting to an empty ValueBean if absent
+ state.valueBean = state.map.getOrDefault(state.key, new ValueBean());
+
+
+        //TODO aggregate the two values
+ state.valueBean = addValueBean(state.valueBean,state.tupleValueBean);
+
+        //TODO put the aggregated value back into the HashMap; the duplicate key overwrites the old value
+ state.map.put(state.key, state.valueBean);
+
+
+
+ }
+
+ /**
+     * Completes a batch of tuples
+ * @param state
+ * @param collector
+ */
+ @Override
+ public void complete(State state, TridentCollector collector) {
+
+ collector.emit(new Values(JSON.toJSONString(state.map)));
+
+
+ }
+
+ /**
+     * Adds the corresponding fields of two ValueBeans
+ * @param result
+ * @param value2
+ * @return
+ */
+
+ public ValueBean addValueBean(ValueBean result, ValueBean value2){
+
+ result.setCommon_s2c_byte_num(result.getCommon_s2c_byte_num() + value2.getCommon_s2c_byte_num());
+ result.setCommon_c2s_byte_num(result.getCommon_c2s_byte_num() + value2.getCommon_c2s_byte_num());
+ result.setCommon_s2c_pkt_num(result.getCommon_s2c_pkt_num() + value2.getCommon_s2c_pkt_num());
+ result.setCommon_c2s_pkt_num(result.getCommon_c2s_pkt_num() + value2.getCommon_c2s_pkt_num());
+ result.setCommon_sessions(result.getCommon_sessions() + 1L);
+
+
+ return result;
+ }
+
+}
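A usage sketch for addValueBean, assuming ValueBean's counters are primitive longs (so a fresh bean starts at zero, as getOrDefault above relies on); note common_sessions is bumped by one per aggregated tuple rather than summed from the input:

import cn.ac.iie.trident.aggregate.AggCount;
import cn.ac.iie.trident.aggregate.bean.ValueBean;

// Sketch: two tuples for the same key accumulate into one ValueBean.
public class AggCountDemo {
    public static void main(String[] args) {
        AggCount agg = new AggCount();
        ValueBean acc = new ValueBean();  // zero-initialized accumulator
        ValueBean in = new ValueBean();
        in.setCommon_c2s_byte_num(650L);
        in.setCommon_s2c_byte_num(9921L);
        acc = agg.addValueBean(acc, in);  // sessions=1, c2s=650,  s2c=9921
        acc = agg.addValueBean(acc, in);  // sessions=2, c2s=1300, s2c=19842
    }
}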
diff --git a/src/main/java/cn/ac/iie/trident/aggregate/AggregateTopology.java b/src/main/java/cn/ac/iie/trident/aggregate/AggregateTopology.java
new file mode 100644
index 0000000..cd179cd
--- /dev/null
+++ b/src/main/java/cn/ac/iie/trident/aggregate/AggregateTopology.java
@@ -0,0 +1,45 @@
+package cn.ac.iie.trident.aggregate;
+
+import cn.ac.iie.trident.aggregate.bolt.KafkaBolt;
+import cn.ac.iie.trident.aggregate.spout.TridentKafkaSpout;
+import cn.ac.iie.trident.aggregate.utils.FlowWriteConfig;
+import org.apache.storm.Config;
+import org.apache.storm.LocalCluster;
+import org.apache.storm.kafka.trident.OpaqueTridentKafkaSpout;
+import org.apache.storm.topology.base.BaseWindowedBolt;
+import org.apache.storm.trident.TridentTopology;
+import org.apache.storm.trident.windowing.InMemoryWindowsStoreFactory;
+import org.apache.storm.tuple.Fields;
+
+import java.util.concurrent.TimeUnit;
+
+
+/**
+ * @ClassName WcTopo
+ * @Author [email protected]
+ * @Date 2020/5/29 10:38
+ * @Version V1.0
+ **/
+public class AggregateTopology {
+
+ public static void main(String[] args) {
+        //TODO create a topology
+ TridentTopology topology = new TridentTopology();
+        //TODO bind the spout to the topology
+ OpaqueTridentKafkaSpout opaqueTridentKafkaSpout = TridentKafkaSpout.getInstance();
+
+ topology.newStream("kafka2storm_opaqueTrident", opaqueTridentKafkaSpout)
+ .parallelismHint(FlowWriteConfig.SPOUT_PARALLELISM)
+ .each(new Fields("str"), new ParseJson2KV(), new Fields("key", "value"))
+ .parallelismHint(FlowWriteConfig.DATACENTER_BOLT_PARALLELISM)
+ .slidingWindow(new BaseWindowedBolt.Duration(10, TimeUnit.SECONDS), new BaseWindowedBolt.Duration(10, TimeUnit.SECONDS), new InMemoryWindowsStoreFactory(), new Fields("key", "value"), new AggCount(), new Fields("map"))
+ .each(new Fields("map"), new KafkaBolt(), new Fields());
+
+ Config config = new Config();
+ config.setDebug(false);
+ config.setNumWorkers(5);
+ LocalCluster cluster = new LocalCluster();
+ cluster.submitTopology("trident-wordcount", config, topology.build());
+// StormSubmitter.submitTopology("kafka2storm_opaqueTrident_topology", config,topology.build());
+ }
+}
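Since the window length equals the sliding interval above (10 s / 10 s), the sliding window behaves like a tumbling window; the Trident Stream API in Storm 1.0.x also exposes tumblingWindow directly. A sketch of the equivalent stream definition, under the same imports and helpers as the topology above:

// Equivalent stream definition with an explicit tumbling window; a sketch
// against the Storm 1.0.x Trident Stream API used in this topology.
topology.newStream("kafka2storm_opaqueTrident", opaqueTridentKafkaSpout)
        .parallelismHint(FlowWriteConfig.SPOUT_PARALLELISM)
        .each(new Fields("str"), new ParseJson2KV(), new Fields("key", "value"))
        .tumblingWindow(new BaseWindowedBolt.Duration(10, TimeUnit.SECONDS),
                new InMemoryWindowsStoreFactory(),
                new Fields("key", "value"), new AggCount(), new Fields("map"))
        .each(new Fields("map"), new KafkaBolt(), new Fields());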
diff --git a/src/main/java/cn/ac/iie/trident/aggregate/ParseJson2KV.java b/src/main/java/cn/ac/iie/trident/aggregate/ParseJson2KV.java
new file mode 100644
index 0000000..2a8191c
--- /dev/null
+++ b/src/main/java/cn/ac/iie/trident/aggregate/ParseJson2KV.java
@@ -0,0 +1,67 @@
+package cn.ac.iie.trident.aggregate;
+
+import com.alibaba.fastjson.JSONObject;
+import cn.ac.iie.trident.aggregate.bean.KeyBean;
+import cn.ac.iie.trident.aggregate.bean.ValueBean;
+import org.apache.storm.trident.operation.BaseFunction;
+import org.apache.storm.trident.operation.TridentCollector;
+import org.apache.storm.trident.tuple.TridentTuple;
+import org.apache.storm.tuple.Values;
+
+import java.util.Map;
+
+/**
+ * Parses one tuple into a key/value pair
+ * @ClassName ToJson
+ * @Author [email protected]
+ * @Date 2020/5/29 16:05
+ * @Version V1.0
+ **/
+public class ParseJson2KV extends BaseFunction {
+
+
+ private static ValueBean valueBean = new ValueBean();
+ private static KeyBean keyBean = new KeyBean();
+
+
+
+ @Override
+ public void execute(TridentTuple tuple, TridentCollector collector) {
+
+        // Parse the raw JSON payload of the tuple into a map
+        Map<String, Object> map = JSONObject.parseObject(tuple.getStringByField("str"));
+
+        // Build the grouping key from the parsed record
+        KeyBean keyBean = new KeyBean();
+ keyBean.setCommon_recv_time(0L);
+ keyBean.setCommon_policy_id(Integer.parseInt(map.getOrDefault("common_policy_id","0").toString()));
+ keyBean.setCommon_action(Integer.parseInt(map.getOrDefault("common_action","0").toString()));
+ keyBean.setCommon_sub_action(map.getOrDefault("common_sub_action","").toString());
+ keyBean.setCommon_client_ip(map.getOrDefault("common_client_ip","").toString());
+ keyBean.setCommon_client_location(map.getOrDefault("common_client_location","").toString());
+ keyBean.setCommon_sled_ip(map.getOrDefault("common_sled_ip","").toString());
+ keyBean.setCommon_device_id(map.getOrDefault("common_device_id","").toString());
+ keyBean.setCommon_subscriber_id(map.getOrDefault("common_subscriber_id","").toString());
+ keyBean.setCommon_server_ip(map.getOrDefault("common_server_ip","").toString());
+ keyBean.setCommon_server_location(map.getOrDefault("common_server_location","").toString());
+ keyBean.setCommon_server_port(Integer.parseInt(map.getOrDefault("common_server_port","0" ).toString()));
+ keyBean.setCommon_l4_protocol(map.getOrDefault("common_l4_protocol","").toString());
+ keyBean.setHttp_domain(map.getOrDefault("http_domain","").toString());
+ keyBean.setSsl_sni(map.getOrDefault("ssl_sni","").toString());
+
+
+        // Build the value counters from the parsed record
+        ValueBean valueBean = new ValueBean();
+ valueBean.setCommon_c2s_pkt_num(Long.parseLong(map.getOrDefault("common_c2s_pkt_num",0).toString()));
+ valueBean.setCommon_s2c_pkt_num(Long.parseLong(map.getOrDefault("common_s2c_pkt_num",0).toString()));
+ valueBean.setCommon_c2s_byte_num(Long.parseLong(map.getOrDefault("common_c2s_byte_num",0).toString()));
+ valueBean.setCommon_s2c_byte_num(Long.parseLong(map.getOrDefault("common_s2c_byte_num",0).toString()));
+ valueBean.setCommon_sessions(Long.parseLong(map.getOrDefault("common_sessions",0).toString()));
+
+
+ collector.emit(new Values(JSONObject.toJSONString(keyBean), JSONObject.toJSONString(valueBean)));
+
+ }
+}
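
ParseJson2KV leans on Map.getOrDefault so that absent fields fall back to "" or 0 instead of throwing. A throwaway check of that behaviour (hypothetical snippet, not part of the patch):

    import com.alibaba.fastjson.JSONObject;

    import java.util.Map;

    public class ParseDefaultsCheck {
        public static void main(String[] args) {
            // fastjson's JSONObject implements Map<String, Object>
            Map<String, Object> map = JSONObject.parseObject(
                    "{\"common_client_ip\":\"10.0.0.1\",\"common_c2s_byte_num\":1200}");
            System.out.println(map.getOrDefault("common_c2s_byte_num", 0)); // 1200
            System.out.println(map.getOrDefault("common_sessions", 0));     // 0  (absent key)
            System.out.println(map.getOrDefault("http_domain", ""));        // "" (absent key)
        }
    }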
diff --git a/src/java/cn/ac/iie/bean/KeyTuple.java b/src/main/java/cn/ac/iie/trident/aggregate/bean/ConnectionRecordLog.java
index 8dc6bab..54fa4f3 100644
--- a/src/java/cn/ac/iie/bean/KeyTuple.java
+++ b/src/main/java/cn/ac/iie/trident/aggregate/bean/ConnectionRecordLog.java
@@ -1,28 +1,45 @@
-package cn.ac.iie.bean;
+package cn.ac.iie.trident.aggregate.bean;
/**
- * @ClassNameKeyTuple
+ * @ClassName ConnectionRecordLog
- * @Date2020/5/27 16:18
+ * @Date 2020/5/28 13:44
* @Version V1.0
**/
-public class KeyTuple {
-
- private int common_policy_id;
- private int common_action;
- private String common_sub_action;
- private String common_client_ip;
- private String common_client_location;
- private String common_sled_ip;
- private String common_device_id;
- private String common_subscriber_id;
- private String common_server_ip;
- private String common_server_location;
- private int common_server_port;
- private String common_l4_protocol;
- private String http_domain;
- private String ssl_sni;
+public class ConnectionRecordLog {
+ //key
+ private long common_recv_time;
+ private int common_policy_id;
+ private int common_action;
+ private String common_sub_action;
+ private String common_client_ip;
+ private String common_client_location;
+ private String common_sled_ip;
+ private String common_device_id;
+ private String common_subscriber_id;
+ private String common_server_ip;
+ private String common_server_location;
+ private int common_server_port;
+ private String common_l4_protocol;
+ private String http_domain;
+ private String ssl_sni;
+
+ //value
+ private long common_sessions;
+ private long common_c2s_pkt_num;
+ private long common_s2c_pkt_num;
+ private long common_c2s_byte_num;
+ private long common_s2c_byte_num;
+
+
+ public long getCommon_recv_time() {
+ return common_recv_time;
+ }
+
+ public void setCommon_recv_time(long common_recv_time) {
+ this.common_recv_time = common_recv_time;
+ }
public int getCommon_policy_id() {
return common_policy_id;
@@ -135,4 +152,44 @@ public class KeyTuple {
public void setSsl_sni(String ssl_sni) {
this.ssl_sni = ssl_sni;
}
+
+ public long getCommon_sessions() {
+ return common_sessions;
+ }
+
+ public void setCommon_sessions(long common_sessions) {
+ this.common_sessions = common_sessions;
+ }
+
+ public long getCommon_c2s_pkt_num() {
+ return common_c2s_pkt_num;
+ }
+
+ public void setCommon_c2s_pkt_num(long common_c2s_pkt_num) {
+ this.common_c2s_pkt_num = common_c2s_pkt_num;
+ }
+
+ public long getCommon_s2c_pkt_num() {
+ return common_s2c_pkt_num;
+ }
+
+ public void setCommon_s2c_pkt_num(long common_s2c_pkt_num) {
+ this.common_s2c_pkt_num = common_s2c_pkt_num;
+ }
+
+ public long getCommon_c2s_byte_num() {
+ return common_c2s_byte_num;
+ }
+
+ public void setCommon_c2s_byte_num(long common_c2s_byte_num) {
+ this.common_c2s_byte_num = common_c2s_byte_num;
+ }
+
+ public long getCommon_s2c_byte_num() {
+ return common_s2c_byte_num;
+ }
+
+ public void setCommon_s2c_byte_num(long common_s2c_byte_num) {
+ this.common_s2c_byte_num = common_s2c_byte_num;
+ }
}
diff --git a/src/main/java/cn/ac/iie/trident/aggregate/bean/KeyBean.java b/src/main/java/cn/ac/iie/trident/aggregate/bean/KeyBean.java
new file mode 100644
index 0000000..5515c48
--- /dev/null
+++ b/src/main/java/cn/ac/iie/trident/aggregate/bean/KeyBean.java
@@ -0,0 +1,176 @@
+package cn.ac.iie.trident.aggregate.bean;
+
+/**
+ * @ClassName KeyBean
+ * @Author [email protected]
+ * @Date 2020/6/3 18:52
+ * @Version V1.0
+ **/
+public class KeyBean {
+
+ private long common_recv_time;
+ private int common_policy_id;
+ private int common_action;
+ private String common_sub_action;
+ private String common_client_ip;
+ private String common_client_location;
+ private String common_sled_ip;
+ private String common_device_id;
+ private String common_subscriber_id;
+ private String common_server_ip;
+ private String common_server_location;
+ private int common_server_port;
+ private String common_l4_protocol;
+ private String http_domain;
+ private String ssl_sni;
+
+ public long getCommon_recv_time() {
+ return common_recv_time;
+ }
+
+ public void setCommon_recv_time(long common_recv_time) {
+ this.common_recv_time = common_recv_time;
+ }
+
+ public int getCommon_policy_id() {
+ return common_policy_id;
+ }
+
+ public void setCommon_policy_id(int common_policy_id) {
+ this.common_policy_id = common_policy_id;
+ }
+
+ public int getCommon_action() {
+ return common_action;
+ }
+
+ public void setCommon_action(int common_action) {
+ this.common_action = common_action;
+ }
+
+ public String getCommon_sub_action() {
+ return common_sub_action;
+ }
+
+ public void setCommon_sub_action(String common_sub_action) {
+ this.common_sub_action = common_sub_action;
+ }
+
+ public String getCommon_client_ip() {
+ return common_client_ip;
+ }
+
+ public void setCommon_client_ip(String common_client_ip) {
+ this.common_client_ip = common_client_ip;
+ }
+
+ public String getCommon_client_location() {
+ return common_client_location;
+ }
+
+ public void setCommon_client_location(String common_client_location) {
+ this.common_client_location = common_client_location;
+ }
+
+ public String getCommon_sled_ip() {
+ return common_sled_ip;
+ }
+
+ public void setCommon_sled_ip(String common_sled_ip) {
+ this.common_sled_ip = common_sled_ip;
+ }
+
+ public String getCommon_device_id() {
+ return common_device_id;
+ }
+
+ public void setCommon_device_id(String common_device_id) {
+ this.common_device_id = common_device_id;
+ }
+
+ public String getCommon_subscriber_id() {
+ return common_subscriber_id;
+ }
+
+ public void setCommon_subscriber_id(String common_subscriber_id) {
+ this.common_subscriber_id = common_subscriber_id;
+ }
+
+ public String getCommon_server_ip() {
+ return common_server_ip;
+ }
+
+ public void setCommon_server_ip(String common_server_ip) {
+ this.common_server_ip = common_server_ip;
+ }
+
+ public String getCommon_server_location() {
+ return common_server_location;
+ }
+
+ public void setCommon_server_location(String common_server_location) {
+ this.common_server_location = common_server_location;
+ }
+
+ public int getCommon_server_port() {
+ return common_server_port;
+ }
+
+ public void setCommon_server_port(int common_server_port) {
+ this.common_server_port = common_server_port;
+ }
+
+ public String getCommon_l4_protocol() {
+ return common_l4_protocol;
+ }
+
+ public void setCommon_l4_protocol(String common_l4_protocol) {
+ this.common_l4_protocol = common_l4_protocol;
+ }
+
+ public String getHttp_domain() {
+ return http_domain;
+ }
+
+ public void setHttp_domain(String http_domain) {
+ this.http_domain = http_domain;
+ }
+
+ public String getSsl_sni() {
+ return ssl_sni;
+ }
+
+ public void setSsl_sni(String ssl_sni) {
+ this.ssl_sni = ssl_sni;
+ }
+
+ @Override
+    public int hashCode() {
+        // Hash the key fields directly; concatenating them into one string allocates
+        // per call and can collide when adjacent fields merge ("ab"+"c" == "a"+"bc").
+        return java.util.Objects.hash(common_recv_time, common_policy_id, common_action,
+                common_sub_action, common_client_ip, common_client_location, common_sled_ip,
+                common_device_id, common_subscriber_id, common_server_ip, common_server_location,
+                common_server_port, common_l4_protocol, http_domain, ssl_sni);
+    }
+
+ @Override
+ public boolean equals(Object o) {
+ if (o instanceof KeyBean) {
+ KeyBean keyBean = (KeyBean) o;
+            return common_recv_time == keyBean.common_recv_time &&
+                    common_policy_id == keyBean.common_policy_id &&
+                    common_action == keyBean.common_action &&
+                    common_server_port == keyBean.common_server_port &&
+                    java.util.Objects.equals(common_sub_action, keyBean.common_sub_action) &&
+                    java.util.Objects.equals(common_client_ip, keyBean.common_client_ip) &&
+                    java.util.Objects.equals(common_client_location, keyBean.common_client_location) &&
+                    java.util.Objects.equals(common_sled_ip, keyBean.common_sled_ip) &&
+                    java.util.Objects.equals(common_device_id, keyBean.common_device_id) &&
+                    java.util.Objects.equals(common_subscriber_id, keyBean.common_subscriber_id) &&
+                    java.util.Objects.equals(common_server_ip, keyBean.common_server_ip) &&
+                    java.util.Objects.equals(common_server_location, keyBean.common_server_location) &&
+                    java.util.Objects.equals(common_l4_protocol, keyBean.common_l4_protocol) &&
+                    java.util.Objects.equals(http_domain, keyBean.http_domain) &&
+                    java.util.Objects.equals(ssl_sni, keyBean.ssl_sni); // null-safe, unlike bare equals()
+ }
+ return false;
+ }
+}
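
The equals/hashCode pair is what lets KeyBean act as an aggregation key. A quick check of the contract, using a HashMap the way a grouping step would (hypothetical snippet, relying on the null-safe field comparisons above):

    import cn.ac.iie.trident.aggregate.bean.KeyBean;

    import java.util.HashMap;
    import java.util.Map;

    public class KeyBeanContractCheck {
        public static void main(String[] args) {
            KeyBean a = new KeyBean();
            a.setCommon_client_ip("10.0.0.1");
            KeyBean b = new KeyBean();
            b.setCommon_client_ip("10.0.0.1");

            Map<KeyBean, Long> sessions = new HashMap<>();
            sessions.merge(a, 1L, Long::sum);
            sessions.merge(b, 1L, Long::sum);
            // Equal fields imply equal hash codes, so both merges land in one entry
            System.out.println(sessions.size()); // 1
        }
    }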
diff --git a/src/main/java/cn/ac/iie/trident/aggregate/bean/ValueBean.java b/src/main/java/cn/ac/iie/trident/aggregate/bean/ValueBean.java
new file mode 100644
index 0000000..8ddcc38
--- /dev/null
+++ b/src/main/java/cn/ac/iie/trident/aggregate/bean/ValueBean.java
@@ -0,0 +1,59 @@
+package cn.ac.iie.trident.aggregate.bean;
+
+import java.io.Serializable;
+
+/**
+ * @ClassName ValueBean
+ * @Author [email protected]
+ * @Date 2020/6/2 14:05
+ * @Version V1.0
+ **/
+public class ValueBean implements Serializable {
+
+ private long common_sessions;
+ private long common_c2s_pkt_num;
+ private long common_s2c_pkt_num;
+ private long common_c2s_byte_num;
+ private long common_s2c_byte_num;
+
+
+ public long getCommon_sessions() {
+ return common_sessions;
+ }
+
+ public void setCommon_sessions(long common_sessions) {
+ this.common_sessions = common_sessions;
+ }
+
+ public long getCommon_c2s_pkt_num() {
+ return common_c2s_pkt_num;
+ }
+
+ public void setCommon_c2s_pkt_num(long common_c2s_pkt_num) {
+ this.common_c2s_pkt_num = common_c2s_pkt_num;
+ }
+
+ public long getCommon_s2c_pkt_num() {
+ return common_s2c_pkt_num;
+ }
+
+ public void setCommon_s2c_pkt_num(long common_s2c_pkt_num) {
+ this.common_s2c_pkt_num = common_s2c_pkt_num;
+ }
+
+ public long getCommon_c2s_byte_num() {
+ return common_c2s_byte_num;
+ }
+
+ public void setCommon_c2s_byte_num(long common_c2s_byte_num) {
+ this.common_c2s_byte_num = common_c2s_byte_num;
+ }
+
+ public long getCommon_s2c_byte_num() {
+ return common_s2c_byte_num;
+ }
+
+ public void setCommon_s2c_byte_num(long common_s2c_byte_num) {
+ this.common_s2c_byte_num = common_s2c_byte_num;
+ }
+}
diff --git a/src/main/java/cn/ac/iie/trident/aggregate/bolt/KafkaBolt.java b/src/main/java/cn/ac/iie/trident/aggregate/bolt/KafkaBolt.java
new file mode 100644
index 0000000..3c70f01
--- /dev/null
+++ b/src/main/java/cn/ac/iie/trident/aggregate/bolt/KafkaBolt.java
@@ -0,0 +1,75 @@
+package cn.ac.iie.trident.aggregate.bolt;
+
+import cn.ac.iie.trident.aggregate.bean.ConnectionRecordLog;
+import cn.ac.iie.trident.aggregate.bean.ValueBean;
+import cn.ac.iie.trident.aggregate.utils.KafkaLogNtc;
+import com.alibaba.fastjson.JSONObject;
+import org.apache.storm.trident.operation.BaseFunction;
+import org.apache.storm.trident.operation.TridentCollector;
+import org.apache.storm.trident.tuple.TridentTuple;
+
+/**
+ * @ClassName KafkaBolt
+ * @Author [email protected]
+ * @Date 2020/6/3 16:50
+ * @Version V1.0
+ **/
+public class KafkaBolt extends BaseFunction {
+
+
+ private static final long serialVersionUID = -2107081139682355171L;
+ private static KafkaLogNtc kafkaLogNtc;
+    // The record bean and the parsed JSON are created per call in execute();
+    // static copies would be shared by every task in the same worker JVM.
+
+ @Override
+ public void execute(TridentTuple tuple, TridentCollector collector) {
+
+ if (kafkaLogNtc == null) {
+ kafkaLogNtc = KafkaLogNtc.getInstance();
+ }
+
+        // Parse the aggregated window output (the "map" field) for iteration
+        JSONObject jsonObject = JSONObject.parseObject(tuple.getStringByField("map"));
+
+ for (String key : jsonObject.keySet()) {
+
+            // Rebuild the key fields from the serialized KeyBean
+            ConnectionRecordLog connectionRecordLog = new ConnectionRecordLog();
+            JSONObject keys = JSONObject.parseObject(key);
+
+ connectionRecordLog.setCommon_recv_time(System.currentTimeMillis());
+ connectionRecordLog.setCommon_policy_id(Integer.parseInt(keys.getString("common_policy_id")));
+ connectionRecordLog.setCommon_action(Integer.parseInt(keys.getString("common_action")));
+ connectionRecordLog.setCommon_sub_action(keys.getString("common_sub_action"));
+ connectionRecordLog.setCommon_client_ip(keys.getString("common_client_ip"));
+ connectionRecordLog.setCommon_client_location(keys.getString("common_client_location"));
+ connectionRecordLog.setCommon_sled_ip(keys.getString("common_sled_ip"));
+ connectionRecordLog.setCommon_device_id(keys.getString("common_device_id"));
+ connectionRecordLog.setCommon_subscriber_id(keys.getString("common_subscriber_id"));
+ connectionRecordLog.setCommon_server_ip(keys.getString("common_server_ip"));
+ connectionRecordLog.setCommon_server_location(keys.getString("common_server_location"));
+ connectionRecordLog.setCommon_server_port(Integer.parseInt(keys.getString("common_server_port")));
+ connectionRecordLog.setCommon_l4_protocol(keys.getString("common_l4_protocol"));
+ connectionRecordLog.setHttp_domain(keys.getString("http_domain"));
+ connectionRecordLog.setSsl_sni(keys.getString("ssl_sni"));
+
+            // Copy the aggregated counters from the ValueBean
+ ValueBean valueBean = JSONObject.parseObject(jsonObject.get(key).toString(), ValueBean.class);
+ connectionRecordLog.setCommon_sessions(valueBean.getCommon_sessions());
+ connectionRecordLog.setCommon_c2s_pkt_num(valueBean.getCommon_c2s_pkt_num());
+ connectionRecordLog.setCommon_s2c_pkt_num(valueBean.getCommon_s2c_pkt_num());
+ connectionRecordLog.setCommon_c2s_byte_num(valueBean.getCommon_c2s_byte_num());
+ connectionRecordLog.setCommon_s2c_byte_num(valueBean.getCommon_s2c_byte_num());
+
+ kafkaLogNtc.sendMessage(JSONObject.toJSONString(connectionRecordLog));
+        }
+
+ }
+}
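
KafkaBolt assumes the "map" tuple field is one JSON object whose keys are serialized KeyBeans and whose values deserialize into ValueBeans. A sketch of that contract from the producing side (the class name MapFieldSketch and the field values are illustrative assumptions):

    import cn.ac.iie.trident.aggregate.bean.KeyBean;
    import cn.ac.iie.trident.aggregate.bean.ValueBean;
    import com.alibaba.fastjson.JSONObject;

    public class MapFieldSketch {
        // Builds one entry of the window output: serialized KeyBean -> serialized ValueBean
        public static String buildMapField() {
            KeyBean key = new KeyBean();
            key.setCommon_client_ip("10.0.0.1");
            ValueBean value = new ValueBean();
            value.setCommon_sessions(3L);

            JSONObject out = new JSONObject();
            out.put(JSONObject.toJSONString(key), JSONObject.toJSONString(value));
            return out.toJSONString();
        }
    }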
diff --git a/src/java/cn/ac/iie/common/FlowWriteConfig.java b/src/main/java/cn/ac/iie/trident/aggregate/utils/FlowWriteConfig.java
index f049547..0ab90a5 100644
--- a/src/java/cn/ac/iie/common/FlowWriteConfig.java
+++ b/src/main/java/cn/ac/iie/trident/aggregate/utils/FlowWriteConfig.java
@@ -1,8 +1,6 @@
-package cn.ac.iie.common;
+package cn.ac.iie.trident.aggregate.utils;
-import cn.ac.iie.utils.system.FlowWriteConfigurations;
-
/**
* @author Administrator
*/
@@ -18,9 +16,6 @@ public class FlowWriteConfig {
public static final Integer DATACENTER_BOLT_PARALLELISM = FlowWriteConfigurations.getIntProperty(0, "datacenter.bolt.parallelism");
public static final Integer TOPOLOGY_WORKERS = FlowWriteConfigurations.getIntProperty(0, "topology.workers");
public static final Integer KAFKA_BOLT_PARALLELISM = FlowWriteConfigurations.getIntProperty(0, "kafka.bolt.parallelism");
- public static final Integer TOPOLOGY_TICK_TUPLE_FREQ_SECS = FlowWriteConfigurations.getIntProperty(0, "topology.tick.tuple.freq.secs");
- public static final Integer HBASE_TICK_TUPLE_FREQ_SECS = FlowWriteConfigurations.getIntProperty(0, "hbase.tick.tuple.freq.secs");
- public static final Integer TOPOLOGY_CONFIG_MAX_SPOUT_PENDING = FlowWriteConfigurations.getIntProperty(0, "topology.config.max.spout.pending");
public static final Integer TOPOLOGY_NUM_ACKS = FlowWriteConfigurations.getIntProperty(0, "topology.num.acks");
public static final Integer TOPOLOGY_SPOUT_SLEEP_TIME = FlowWriteConfigurations.getIntProperty(0, "topology.spout.sleep.time");
public static final Integer BATCH_INSERT_NUM = FlowWriteConfigurations.getIntProperty(0, "batch.insert.num");
diff --git a/src/java/cn/ac/iie/utils/system/FlowWriteConfigurations.java b/src/main/java/cn/ac/iie/trident/aggregate/utils/FlowWriteConfigurations.java
index 273a5f8..bf24f34 100644
--- a/src/java/cn/ac/iie/utils/system/FlowWriteConfigurations.java
+++ b/src/main/java/cn/ac/iie/trident/aggregate/utils/FlowWriteConfigurations.java
@@ -1,4 +1,4 @@
-package cn.ac.iie.utils.system;
+package cn.ac.iie.trident.aggregate.utils;
import java.util.Properties;
diff --git a/src/java/cn/ac/iie/utils/kafka/KafkaLogNtc.java b/src/main/java/cn/ac/iie/trident/aggregate/utils/KafkaLogNtc.java
index 09097c2..22918be 100644
--- a/src/java/cn/ac/iie/utils/kafka/KafkaLogNtc.java
+++ b/src/main/java/cn/ac/iie/trident/aggregate/utils/KafkaLogNtc.java
@@ -1,10 +1,9 @@
-package cn.ac.iie.utils.kafka;
+package cn.ac.iie.trident.aggregate.utils;
+
-import cn.ac.iie.common.FlowWriteConfig;
import org.apache.kafka.clients.producer.*;
import org.apache.log4j.Logger;
-import java.util.List;
import java.util.Properties;
/**
@@ -39,10 +38,9 @@ public class KafkaLogNtc {
}
- public void sendMessage(List<String> list) {
+ public void sendMessage(String message) {
final int[] errorSum = {0};
- for (String value : list) {
- kafkaProducer.send(new ProducerRecord<>(FlowWriteConfig.RESULTS_OUTPUT_TOPIC, value), new Callback() {
+ kafkaProducer.send(new ProducerRecord<>(FlowWriteConfig.RESULTS_OUTPUT_TOPIC, message), new Callback() {
@Override
public void onCompletion(RecordMetadata metadata, Exception exception) {
if (exception != null) {
@@ -51,10 +49,7 @@ public class KafkaLogNtc {
}
}
});
- if (errorSum[0] > FlowWriteConfig.MAX_FAILURE_NUM) {
- list.clear();
- }
- }
+
kafkaProducer.flush();
logger.debug("Log sent to National Center successfully!!!!!");
}
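
The producer construction sits outside this hunk. A minimal sketch of the Properties a KafkaProducer<String, String> like this one needs (the settings are standard Kafka client keys; the bootstrap address is a placeholder, and the real class reads its configuration elsewhere in the repo):

    import org.apache.kafka.clients.producer.KafkaProducer;

    import java.util.Properties;

    public final class ProducerFactorySketch {
        // Placeholder address; an assumption, not a value from the patch
        private static final String BOOTSTRAP_SERVERS = "kafka-host:9092";

        public static KafkaProducer<String, String> buildProducer() {
            Properties props = new Properties();
            props.put("bootstrap.servers", BOOTSTRAP_SERVERS);
            props.put("acks", "1");
            props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
            props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
            return new KafkaProducer<>(props);
        }
    }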
diff --git a/src/java/log4j.properties b/src/main/java/cn/ac/iie/trident/log4j.properties
index 17c0e9a..8d4ada4 100644
--- a/src/java/log4j.properties
+++ b/src/main/java/cn/ac/iie/trident/log4j.properties
@@ -1,5 +1,5 @@
#Log4j
-log4j.rootLogger=info,console,file
+log4j.rootLogger=warn,console,file
# Console appender settings
log4j.appender.console=org.apache.log4j.ConsoleAppender
log4j.appender.console.Threshold=info
diff --git a/src/test/java/com/wp/AppTest.java b/src/test/java/com/wp/AppTest.java
new file mode 100644
index 0000000..3202adb
--- /dev/null
+++ b/src/test/java/com/wp/AppTest.java
@@ -0,0 +1,85 @@
+package com.wp;
+
+import cn.ac.iie.trident.aggregate.bean.ValueBean;
+import junit.framework.Test;
+import junit.framework.TestCase;
+import junit.framework.TestSuite;
+
+/**
+ * Unit test for simple App.
+ */
+public class AppTest
+ extends TestCase
+{
+ /**
+ * Create the test case
+ *
+ * @param testName name of the test case
+ */
+ public AppTest( String testName )
+ {
+ super( testName );
+ }
+
+ /**
+ * @return the suite of tests being tested
+ */
+ public static Test suite()
+ {
+ return new TestSuite( AppTest.class );
+ }
+
+ /**
+     * Rigorous Test :-)
+ */
+ public void testApp()
+ {
+ assertTrue( true );
+ }
+
+
+ private static ValueBean valueBean;
+
+ @org.junit.Test
+ public void test(){
+
+ System.out.println(valueBean == null);
+
+ }
+
+ static class Demo{
+ private String a;
+ private String b;
+ private String c;
+
+ public String getA() {
+ return a;
+ }
+
+ public void setA(String a) {
+ this.a = a;
+ }
+
+ public String getB() {
+ return b;
+ }
+
+ public void setB(String b) {
+ this.b = b;
+ }
+
+ public String getC() {
+ return c;
+ }
+
+ public void setC(String c) {
+ this.c = c;
+ }
+ }
+}