diff options
| author | lee <[email protected]> | 2020-06-04 15:54:53 +0800 |
|---|---|---|
| committer | lee <[email protected]> | 2020-06-04 15:54:53 +0800 |
| commit | a34f8c4df6280dc0a80b7e500c2869ba97971f9c (patch) | |
| tree | b127ac429614ee7ddd5bc8c711b8e9baace7272c | |
| parent | 868364da6b1282c1c1c0dcaf2670a1b3d75a8bae (diff) | |
OLAP预聚合代码初始版本
28 files changed, 758 insertions, 1395 deletions
@@ -7,149 +7,55 @@ <version>0.0.1-SNAPSHOT</version> <packaging>jar</packaging> + <name>log-stream-aggregation</name> <url>http://maven.apache.org</url> - <repositories> - <repository> - <id>nexus</id> - <name>Team Nexus Repository</name> - <url>http://192.168.40.125:8099/content/groups/public</url> - </repository> - </repositories> + + <!--<repositories>--> + <!--<repository>--> + <!--<id>nexus</id>--> + <!--<name>Team Nexus Repository</name>--> + <!--<url>http://192.168.40.125:8099/content/groups/public</url>--> + <!--</repository>--> + <!--</repositories>--> <build> <plugins> - - <plugin> - <groupId>org.apache.maven.plugins</groupId> - <artifactId>maven-shade-plugin</artifactId> - <version>2.4.2</version> - <executions> - <execution> - <phase>package</phase> - <goals> - <goal>shade</goal> - </goals> - <configuration> - <transformers> - <transformer - implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer"> - <mainClass>cn.ac.iie.topology.LogFlowWriteTopology</mainClass> - </transformer> - <transformer - implementation="org.apache.maven.plugins.shade.resource.AppendingTransformer"> - <resource>META-INF/spring.handlers</resource> - </transformer> - <transformer - implementation="org.apache.maven.plugins.shade.resource.AppendingTransformer"> - <resource>META-INF/spring.schemas</resource> - </transformer> - </transformers> - </configuration> - </execution> - </executions> - </plugin> - - <plugin> <groupId>org.apache.maven.plugins</groupId> <artifactId>maven-compiler-plugin</artifactId> - <version>2.3.2</version> <configuration> - <source>1.8</source> - <target>1.8</target> + <source>8</source> + <target>8</target> </configuration> </plugin> </plugins> - <resources> - <resource> - <directory>properties</directory> - <includes> - <include>**/*.properties</include> - </includes> - <filtering>false</filtering> - </resource> - <resource> - <directory>properties</directory> - <includes> - <include>log4j.properties</include> - </includes> - <filtering>false</filtering> - </resource> - </resources> </build> <properties> <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding> - <kafka.version>1.0.0</kafka.version> - <storm.version>1.0.2</storm.version> - <hbase.version>2.2.1</hbase.version> - <hadoop.version>2.7.1</hadoop.version> </properties> <dependencies> - <!-- https://mvnrepository.com/artifact/org.apache.kafka/kafka --> <dependency> - <groupId>org.apache.kafka</groupId> - <artifactId>kafka_2.11</artifactId> - <version>1.0.0</version> - <exclusions> - <exclusion> - <artifactId>slf4j-log4j12</artifactId> - <groupId>org.slf4j</groupId> - </exclusion> - </exclusions> + <groupId>org.apache.storm</groupId> + <artifactId>storm-core</artifactId> + <version>1.0.2</version> </dependency> <dependency> <groupId>org.apache.storm</groupId> - <artifactId>storm-core</artifactId> - <version>${storm.version}</version> - <scope>provided</scope> + <artifactId>storm-kafka</artifactId> + <version>1.0.2</version> <exclusions> <exclusion> - <artifactId>slf4j-log4j12</artifactId> - <groupId>org.slf4j</groupId> - </exclusion> - <exclusion> - <artifactId>log4j-over-slf4j</artifactId> - <groupId>org.slf4j</groupId> + <groupId>org.apache.kafka</groupId> + <artifactId>kafka-clients</artifactId> </exclusion> </exclusions> - </dependency> - <dependency> - <groupId>org.influxdb</groupId> - <artifactId>influxdb-java</artifactId> - <version>2.1</version> </dependency> - - <dependency> - <groupId>org.apache.storm</groupId> - <artifactId>storm-kafka</artifactId> - <version>${storm.version}</version> - </dependency> - - <dependency> - <groupId>junit</groupId> - <artifactId>junit</artifactId> - <version>4.12</version> - <scope>test</scope> - </dependency> - - <dependency> - <groupId>com.alibaba</groupId> - <artifactId>fastjson</artifactId> - <version>1.2.59</version> - </dependency> - - <dependency> - <groupId>cglib</groupId> - <artifactId>cglib-nodep</artifactId> - <version>3.2.4</version> - </dependency> - <dependency> <groupId>com.zdjizhi</groupId> <artifactId>galaxy</artifactId> @@ -166,125 +72,83 @@ </exclusions> </dependency> - <!-- https://mvnrepository.com/artifact/org.apache.zookeeper/zookeeper --> <dependency> - <groupId>org.apache.zookeeper</groupId> - <artifactId>zookeeper</artifactId> - <version>3.4.9</version> + <groupId>org.apache.kafka</groupId> + <artifactId>kafka_2.11</artifactId> + <version>0.10.0.1</version> <exclusions> <exclusion> - <artifactId>slf4j-log4j12</artifactId> - <groupId>org.slf4j</groupId> + <groupId>org.apache.zookeeper</groupId> + <artifactId>zookeeper</artifactId> </exclusion> <exclusion> - <artifactId>log4j-over-slf4j</artifactId> <groupId>org.slf4j</groupId> - </exclusion> - </exclusions> - </dependency> - - <!-- https://mvnrepository.com/artifact/org.apache.hbase/hbase-client --> - <dependency> - <groupId>org.apache.hbase</groupId> - <artifactId>hbase-client</artifactId> - <version>${hbase.version}</version> - <exclusions> - <exclusion> <artifactId>slf4j-log4j12</artifactId> - <groupId>org.slf4j</groupId> </exclusion> <exclusion> - <artifactId>log4j-over-slf4j</artifactId> - <groupId>org.slf4j</groupId> + <groupId>log4j</groupId> + <artifactId>log4j</artifactId> </exclusion> </exclusions> </dependency> - <!-- https://mvnrepository.com/artifact/org.apache.hbase/hbase-server --> <dependency> - <groupId>org.apache.hbase</groupId> - <artifactId>hbase-server</artifactId> - <version>${hbase.version}</version> - <exclusions> - <exclusion> - <artifactId>slf4j-log4j12</artifactId> - <groupId>org.slf4j</groupId> - </exclusion> - <exclusion> - <artifactId>log4j-over-slf4j</artifactId> - <groupId>org.slf4j</groupId> - </exclusion> - </exclusions> + <groupId>junit</groupId> + <artifactId>junit</artifactId> + <version>4.11</version> + <scope>test</scope> </dependency> - <!-- https://mvnrepository.com/artifact/org.apache.hadoop/hadoop-common --> + <!-- https://mvnrepository.com/artifact/org.apache.thrift/libthrift --> <dependency> - <groupId>org.apache.hadoop</groupId> - <artifactId>hadoop-common</artifactId> - <version>${hadoop.version}</version> - <exclusions> - <exclusion> - <artifactId>slf4j-log4j12</artifactId> - <groupId>org.slf4j</groupId> - </exclusion> - <exclusion> - <artifactId>log4j-over-slf4j</artifactId> - <groupId>org.slf4j</groupId> - </exclusion> - </exclusions> + <groupId>org.apache.thrift</groupId> + <artifactId>libthrift</artifactId> + <version>0.10.0</version> + <type>pom</type> </dependency> - <!-- https://mvnrepository.com/artifact/org.apache.hadoop/hadoop-client --> + <!-- https://mvnrepository.com/artifact/org.apache.thrift.tools/maven-thrift-plugin --> <dependency> - <groupId>org.apache.hadoop</groupId> - <artifactId>hadoop-client</artifactId> - <version>${hadoop.version}</version> - <exclusions> - <exclusion> - <artifactId>slf4j-log4j12</artifactId> - <groupId>org.slf4j</groupId> - </exclusion> - <exclusion> - <artifactId>log4j-over-slf4j</artifactId> - <groupId>org.slf4j</groupId> - </exclusion> - </exclusions> + <groupId>org.apache.thrift.tools</groupId> + <artifactId>maven-thrift-plugin</artifactId> + <version>0.1.11</version> </dependency> - <!-- https://mvnrepository.com/artifact/org.apache.hadoop/hadoop-hdfs --> <dependency> - <groupId>org.apache.hadoop</groupId> - <artifactId>hadoop-hdfs</artifactId> - <version>${hadoop.version}</version> - <exclusions> - <exclusion> - <artifactId>slf4j-log4j12</artifactId> - <groupId>org.slf4j</groupId> - </exclusion> - <exclusion> - <artifactId>log4j-over-slf4j</artifactId> - <groupId>org.slf4j</groupId> - </exclusion> - </exclusions> + <groupId>com.alibaba</groupId> + <artifactId>fastjson</artifactId> + <version>1.2.59</version> </dependency> + <dependency> - <groupId>org.junit.jupiter</groupId> - <artifactId>junit-jupiter-api</artifactId> - <version>5.3.2</version> - <scope>compile</scope> + <groupId>commons-collections</groupId> + <artifactId>commons-collections</artifactId> + <version>3.2.2</version> </dependency> - - <dependency> - <groupId>org.apache.httpcomponents</groupId> - <artifactId>httpclient</artifactId> - <version>4.5.2</version> + <groupId>commons-codec</groupId> + <artifactId>commons-codec</artifactId> + <version>1.10</version> </dependency> + <!--<!– https://mvnrepository.com/artifact/org.jgrapht/jgrapht-core –>--> + <!--<dependency>--> + <!--<groupId>org.jgrapht</groupId>--> + <!--<artifactId>jgrapht-core</artifactId>--> + <!--<version>1.1.0</version>--> + <!--</dependency>--> + <!-- https://mvnrepository.com/artifact/org.jgrapht/jgrapht-dist --> + <dependency> + <groupId>org.jgrapht</groupId> + <artifactId>jgrapht-dist</artifactId> + <version>1.0.1</version> + <type>pom</type> + </dependency> <dependency> - <groupId>org.apache.httpcomponents</groupId> - <artifactId>httpcore</artifactId> - <version>4.4.1</version> + <groupId>org.junit.jupiter</groupId> + <artifactId>junit-jupiter-api</artifactId> + <version>5.0.0</version> + <scope>compile</scope> </dependency> diff --git a/properties/service_flow_config.properties b/properties/service_flow_config.properties index 8072d1c..ce08511 100644 --- a/properties/service_flow_config.properties +++ b/properties/service_flow_config.properties @@ -3,7 +3,7 @@ bootstrap.servers=192.168.40.127:9093 #zookeeper 地址 -zookeeper.servers=192.168.40.127:2182 +zookeeper.servers=192.168.40.127:2182/kafka-test #zookeeper.servers=192.168.40.119:2181,192.168.40.122:2181,192.168.40.123:2181 #hbase zookeeper地址 @@ -21,13 +21,14 @@ kafka.compression.type=none #kafka broker下的topic名称 #kafka.topic=SECURITY-EVENT-LOG -kafka.topic=CONNECTION-RECORD-LOG +kafka.topic=test528 +#kafka.topic=CONNECTION-RECORD-LOG #读取topic,存储该spout id的消费offset信息,可通过该拓扑命名;具体存储offset的位置,确定下次读取不重复的数据; group.id=lxk-200512 #输出topic -results.output.topic=CONNECTION-RECORD-COMPLETED-LOG +results.output.topic=agg_test #results.output.topic=SECURITY-EVENT-COMPLETED-LOG #storm topology workers @@ -42,11 +43,6 @@ datacenter.bolt.parallelism=1 #写入kafka的并行度10 kafka.bolt.parallelism=1 -#定位库地址 -#ip.library=/home/ceiec/topology/dat/ -#ip.library=D:\\workerSpace\\K18-Phase2\\3.0.2019115\\log-stream-completion\\ -ip.library=/dat/ - #kafka批量条数 batch.insert.num=2000 @@ -56,14 +52,6 @@ schema.http=http://192.168.40.224:9999/metadata/schema/v1/fields/connection_reco #数据中心(UID) data.center.id.num=15 -#tick时钟频率 -topology.tick.tuple.freq.secs=5 - -#hbase 更新时间 -hbase.tick.tuple.freq.secs=60 - -#当bolt性能受限时,限制spout接收速度,理论看ack开启才有效 -topology.config.max.spout.pending=150000 #ack设置 1启动ack 0不启动ack topology.num.acks=0 @@ -76,12 +64,3 @@ max.failure.num=20 #邮件默认编码 mail.default.charset=UTF-8 - -#influx地址 -influx.ip=http://192.168.40.151:8086 - -#influx用户名 -influx.username=admin - -#influx密码 -influx.password=admin
\ No newline at end of file diff --git a/src/java/cn/ac/iie/bean/ValueTuple.java b/src/java/cn/ac/iie/bean/ValueTuple.java deleted file mode 100644 index 4c12f52..0000000 --- a/src/java/cn/ac/iie/bean/ValueTuple.java +++ /dev/null @@ -1,67 +0,0 @@ -package cn.ac.iie.bean; - -/** - * @ClassNameValueTuple - * @Author [email protected] - * @Date2020/5/27 16:18 - * @Version V1.0 - **/ -public class ValueTuple { - - private int common_sessions; - private int common_c2s_pkt_num; - private int common_s2c_pkt_num; - private int common_c2s_byte_num; - private int common_s2c_byte_num; - - public int getCommon_sessions() { - return common_sessions; - } - - public void setCommon_sessions(int common_sessions) { - this.common_sessions = common_sessions; - } - - public int getCommon_c2s_pkt_num() { - return common_c2s_pkt_num; - } - - public void setCommon_c2s_pkt_num(int common_c2s_pkt_num) { - this.common_c2s_pkt_num = common_c2s_pkt_num; - } - - public int getCommon_s2c_pkt_num() { - return common_s2c_pkt_num; - } - - public void setCommon_s2c_pkt_num(int common_s2c_pkt_num) { - this.common_s2c_pkt_num = common_s2c_pkt_num; - } - - public int getCommon_c2s_byte_num() { - return common_c2s_byte_num; - } - - public void setCommon_c2s_byte_num(int common_c2s_byte_num) { - this.common_c2s_byte_num = common_c2s_byte_num; - } - - public int getCommon_s2c_byte_num() { - return common_s2c_byte_num; - } - - public void setCommon_s2c_byte_num(int common_s2c_byte_num) { - this.common_s2c_byte_num = common_s2c_byte_num; - } - - @Override - public String toString() { - return "ValueTuple{" + - "common_sessions=" + common_sessions + - ", common_c2s_pkt_num=" + common_c2s_pkt_num + - ", common_s2c_pkt_num=" + common_s2c_pkt_num + - ", common_c2s_byte_num=" + common_c2s_byte_num + - ", common_s2c_byte_num=" + common_s2c_byte_num + - '}'; - } -} diff --git a/src/java/cn/ac/iie/bolt/CompletionBolt.java b/src/java/cn/ac/iie/bolt/CompletionBolt.java deleted file mode 100644 index e34ab88..0000000 --- a/src/java/cn/ac/iie/bolt/CompletionBolt.java +++ /dev/null @@ -1,54 +0,0 @@ -package cn.ac.iie.bolt; - -import cn.ac.iie.common.FlowWriteConfig; -import cn.ac.iie.utils.system.TupleUtils; -import com.zdjizhi.utils.StringUtil; -import org.apache.log4j.Logger; -import org.apache.storm.task.TopologyContext; -import org.apache.storm.topology.BasicOutputCollector; -import org.apache.storm.topology.OutputFieldsDeclarer; -import org.apache.storm.topology.base.BaseBasicBolt; -import org.apache.storm.tuple.Fields; -import org.apache.storm.tuple.Tuple; -import org.apache.storm.tuple.Values; - -import java.util.HashMap; -import java.util.Map; - -public class CompletionBolt extends BaseBasicBolt { - private final static Logger logger = Logger.getLogger(CompletionBolt.class); - private static final long serialVersionUID = 9006119186526123734L; - - @Override - public void prepare(Map stormConf, TopologyContext context) { - } - - @SuppressWarnings("Duplicates") - @Override - public void execute(Tuple tuple, BasicOutputCollector basicOutputCollector) { - try { - String message = tuple.getString(0); - if (StringUtil.isNotBlank(message)) { - basicOutputCollector.emit(new Values(message)); - } - } catch (Exception e) { - logger.error(FlowWriteConfig.KAFKA_TOPIC + "接收/解析过程出现异常"); - e.printStackTrace(); - } - } - - @Override - public Map<String, Object> getComponentConfiguration() { - Map<String, Object> conf = new HashMap<String, Object>(16); - conf.put(org.apache.storm.Config.TOPOLOGY_TICK_TUPLE_FREQ_SECS, - FlowWriteConfig.HBASE_TICK_TUPLE_FREQ_SECS); - return conf; - } - - - @Override - public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) { - outputFieldsDeclarer.declare(new Fields("connLog")); - } - -} diff --git a/src/java/cn/ac/iie/bolt/NtcLogSendBolt.java b/src/java/cn/ac/iie/bolt/NtcLogSendBolt.java deleted file mode 100644 index fa8fab3..0000000 --- a/src/java/cn/ac/iie/bolt/NtcLogSendBolt.java +++ /dev/null @@ -1,71 +0,0 @@ -package cn.ac.iie.bolt; - -import cn.ac.iie.common.FlowWriteConfig; -import cn.ac.iie.utils.kafka.KafkaLogNtc; -import cn.ac.iie.utils.system.TupleUtils; -import com.zdjizhi.utils.StringUtil; -import org.apache.log4j.Logger; -import org.apache.storm.task.TopologyContext; -import org.apache.storm.topology.BasicOutputCollector; -import org.apache.storm.topology.OutputFieldsDeclarer; -import org.apache.storm.topology.base.BaseBasicBolt; -import org.apache.storm.tuple.Tuple; - -import java.util.HashMap; -import java.util.LinkedList; -import java.util.List; -import java.util.Map; - -/** - * @author qidaijie - * @date 2018/8/14 - */ -public class NtcLogSendBolt extends BaseBasicBolt { - private static final long serialVersionUID = -3663610927224396615L; - private static Logger logger = Logger.getLogger(NtcLogSendBolt.class); - private List<String> list; - private KafkaLogNtc kafkaLogNtc; - - - @Override - public void prepare(Map stormConf, TopologyContext context) { - list = new LinkedList<>(); - kafkaLogNtc = KafkaLogNtc.getInstance(); - } - - @Override - public void execute(Tuple tuple, BasicOutputCollector basicOutputCollector) { - try { - if (TupleUtils.isTick(tuple)) { - if (list.size() != 0) { - kafkaLogNtc.sendMessage(list); - list.clear(); - } - } else { - String message = tuple.getString(0); - if (StringUtil.isNotBlank(message)) { - list.add(message); - } - if (list.size() == FlowWriteConfig.BATCH_INSERT_NUM) { - kafkaLogNtc.sendMessage(list); - list.clear(); - } - } - } catch (Exception e) { - logger.error(FlowWriteConfig.KAFKA_TOPIC + "日志发送Kafka过程出现异常"); - e.printStackTrace(); - } - } - - @Override - public Map<String, Object> getComponentConfiguration() { - Map<String, Object> conf = new HashMap<>(16); - conf.put(org.apache.storm.Config.TOPOLOGY_TICK_TUPLE_FREQ_SECS, FlowWriteConfig.TOPOLOGY_TICK_TUPLE_FREQ_SECS); - return conf; - } - - @Override - public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) { - } - -} diff --git a/src/java/cn/ac/iie/bolt/radius/RadiusCompletionBolt.java b/src/java/cn/ac/iie/bolt/radius/RadiusCompletionBolt.java deleted file mode 100644 index 7dbe16c..0000000 --- a/src/java/cn/ac/iie/bolt/radius/RadiusCompletionBolt.java +++ /dev/null @@ -1,51 +0,0 @@ -package cn.ac.iie.bolt.radius; - -import cn.ac.iie.common.FlowWriteConfig; -import com.zdjizhi.utils.StringUtil; -import org.apache.log4j.Logger; -import org.apache.storm.task.TopologyContext; -import org.apache.storm.topology.BasicOutputCollector; -import org.apache.storm.topology.OutputFieldsDeclarer; -import org.apache.storm.topology.base.BaseBasicBolt; -import org.apache.storm.tuple.Fields; -import org.apache.storm.tuple.Tuple; -import org.apache.storm.tuple.Values; - -import java.util.Map; - - -/** - * 通联关系日志补全 - * - * @author qidaijie - */ -public class RadiusCompletionBolt extends BaseBasicBolt { - - private final static Logger logger = Logger.getLogger(RadiusCompletionBolt.class); - private static final long serialVersionUID = -3657802387129063952L; - - @Override - public void prepare(Map stormConf, TopologyContext context) { - - } - @SuppressWarnings("Duplicates") - @Override - public void execute(Tuple tuple, BasicOutputCollector basicOutputCollector) { - try { - String message = tuple.getString(0); - if (StringUtil.isNotBlank(message)) { - basicOutputCollector.emit(new Values(message)); - } - } catch (Exception e) { - logger.error(FlowWriteConfig.KAFKA_TOPIC + "接收/解析过程出现异常"); - e.printStackTrace(); - } - } - - - @Override - public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) { - outputFieldsDeclarer.declare(new Fields("connLog")); - } - -} diff --git a/src/java/cn/ac/iie/spout/CustomizedKafkaSpout.java b/src/java/cn/ac/iie/spout/CustomizedKafkaSpout.java deleted file mode 100644 index 6b2d82b..0000000 --- a/src/java/cn/ac/iie/spout/CustomizedKafkaSpout.java +++ /dev/null @@ -1,81 +0,0 @@ -package cn.ac.iie.spout; - -import cn.ac.iie.common.FlowWriteConfig; -import org.apache.kafka.clients.consumer.ConsumerConfig; -import org.apache.kafka.clients.consumer.ConsumerRecord; -import org.apache.kafka.clients.consumer.ConsumerRecords; -import org.apache.kafka.clients.consumer.KafkaConsumer; -import org.apache.log4j.Logger; -import org.apache.storm.spout.SpoutOutputCollector; -import org.apache.storm.task.TopologyContext; -import org.apache.storm.topology.OutputFieldsDeclarer; -import org.apache.storm.topology.base.BaseRichSpout; -import org.apache.storm.tuple.Fields; -import org.apache.storm.tuple.Values; - -import java.util.Collections; -import java.util.Map; -import java.util.Properties; - -/** - * kafkaSpout - * - * @author Administrator - */ -public class CustomizedKafkaSpout extends BaseRichSpout { - private static final long serialVersionUID = -3363788553406229592L; - private KafkaConsumer<String, String> consumer; - private SpoutOutputCollector collector = null; - private TopologyContext context = null; - private final static Logger logger = Logger.getLogger(CustomizedKafkaSpout.class); - - - private static Properties createConsumerConfig() { - Properties props = new Properties(); - props.put("bootstrap.servers", FlowWriteConfig.BOOTSTRAP_SERVERS); - props.put("group.id", FlowWriteConfig.GROUP_ID); - props.put("session.timeout.ms", "60000"); - props.put("max.poll.records", 3000); - props.put("max.partition.fetch.bytes", 31457280); - props.put("auto.offset.reset", FlowWriteConfig.AUTO_OFFSET_RESET); - props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); - props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); - return props; - } - - @Override - public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) { - // TODO Auto-generated method stub - this.collector = collector; - this.context = context; - Properties prop = createConsumerConfig(); - this.consumer = new KafkaConsumer<>(prop); - this.consumer.subscribe(Collections.singletonList(FlowWriteConfig.KAFKA_TOPIC)); - } - - @Override - public void close() { - consumer.close(); - } - - @Override - public void nextTuple() { - try { - // TODO Auto-generated method stub - ConsumerRecords<String, String> records = consumer.poll(10000L); - Thread.sleep(FlowWriteConfig.TOPOLOGY_SPOUT_SLEEP_TIME); - for (ConsumerRecord<String, String> record : records) { - this.collector.emit(new Values(record.value())); - } - } catch (Exception e) { - logger.error("KafkaSpout发送消息出现异常!", e); - e.printStackTrace(); - } - } - - @Override - public void declareOutputFields(OutputFieldsDeclarer declarer) { - // TODO Auto-generated method stub - declarer.declare(new Fields("source")); - } -} diff --git a/src/java/cn/ac/iie/topology/LogFlowWriteTopology.java b/src/java/cn/ac/iie/topology/LogFlowWriteTopology.java deleted file mode 100644 index ca90ca6..0000000 --- a/src/java/cn/ac/iie/topology/LogFlowWriteTopology.java +++ /dev/null @@ -1,92 +0,0 @@ -package cn.ac.iie.topology; - - -import cn.ac.iie.bolt.CompletionBolt; -import cn.ac.iie.bolt.NtcLogSendBolt; -import cn.ac.iie.bolt.radius.RadiusCompletionBolt; - -import cn.ac.iie.common.FlowWriteConfig; -import cn.ac.iie.spout.CustomizedKafkaSpout; -import org.apache.log4j.Logger; -import org.apache.storm.Config; -import org.apache.storm.generated.AlreadyAliveException; -import org.apache.storm.generated.AuthorizationException; -import org.apache.storm.generated.InvalidTopologyException; -import org.apache.storm.topology.TopologyBuilder; - -/** - * Storm程序主类 - * - * @author Administrator - */ - -public class LogFlowWriteTopology { - private static Logger logger = Logger.getLogger(LogFlowWriteTopology.class); - private final String topologyName; - private final Config topologyConfig; - private TopologyBuilder builder; - - private LogFlowWriteTopology() { - this(LogFlowWriteTopology.class.getSimpleName()); - } - - private LogFlowWriteTopology(String topologyName) { - this.topologyName = topologyName; - topologyConfig = createTopologConfig(); - } - - private Config createTopologConfig() { - Config conf = new Config(); - conf.setDebug(false); - conf.setMessageTimeoutSecs(60); - conf.setMaxSpoutPending(FlowWriteConfig.TOPOLOGY_CONFIG_MAX_SPOUT_PENDING); - conf.setNumAckers(FlowWriteConfig.TOPOLOGY_NUM_ACKS); - return conf; - } - - private void runLocally() throws InterruptedException { - topologyConfig.setMaxTaskParallelism(1); - StormRunner.runTopologyLocally(builder, topologyName, topologyConfig, 600); - } - - private void runRemotely() throws AlreadyAliveException, InvalidTopologyException, AuthorizationException { - topologyConfig.setNumWorkers(FlowWriteConfig.TOPOLOGY_WORKERS); - //设置过高会导致很多问题,如心跳线程饿死、吞吐量大幅下跌 - topologyConfig.put(Config.TOPOLOGY_EXECUTOR_RECEIVE_BUFFER_SIZE, 8); - StormRunner.runTopologyRemotely(builder, topologyName, topologyConfig); - } - - private void buildTopology() { - builder = new TopologyBuilder(); - builder.setSpout("LogFlowWriteSpout", new CustomizedKafkaSpout(), FlowWriteConfig.SPOUT_PARALLELISM); - - builder.setBolt(FlowWriteConfig.KAFKA_TOPIC, new CompletionBolt(), FlowWriteConfig.DATACENTER_BOLT_PARALLELISM).localOrShuffleGrouping("LogFlowWriteSpout"); - - builder.setBolt("NtcLogSendBolt", new NtcLogSendBolt(), FlowWriteConfig.KAFKA_BOLT_PARALLELISM).localOrShuffleGrouping(FlowWriteConfig.KAFKA_TOPIC); - - - } - - public static void main(String[] args) throws Exception { - LogFlowWriteTopology csst = null; - boolean runLocally = true; - String parameter = "remote"; - int size = 2; - if (args.length >= size && parameter.equalsIgnoreCase(args[1])) { - runLocally = false; - csst = new LogFlowWriteTopology(args[0]); - } else { - csst = new LogFlowWriteTopology(); - } - - csst.buildTopology(); - - if (runLocally) { - logger.info("执行本地模式..."); - csst.runLocally(); - } else { - logger.info("执行远程部署模式..."); - csst.runRemotely(); - } - } -} diff --git a/src/java/cn/ac/iie/topology/StormRunner.java b/src/java/cn/ac/iie/topology/StormRunner.java deleted file mode 100644 index f5094a4..0000000 --- a/src/java/cn/ac/iie/topology/StormRunner.java +++ /dev/null @@ -1,35 +0,0 @@ -package cn.ac.iie.topology; - - -import org.apache.storm.Config; -import org.apache.storm.LocalCluster; -import org.apache.storm.StormSubmitter; -import org.apache.storm.generated.AlreadyAliveException; -import org.apache.storm.generated.AuthorizationException; -import org.apache.storm.generated.InvalidTopologyException; -import org.apache.storm.topology.TopologyBuilder; - -/** - * @author Administrator - */ -public final class StormRunner{ - private static final int MILLS_IN_SEC = 1000; - - private StormRunner() {} - - public static void runTopologyLocally(TopologyBuilder builder, String topologyName, Config conf, int runtimeInSeconds) throws InterruptedException { - - LocalCluster localCluster = new LocalCluster(); - localCluster.submitTopology(topologyName, conf, builder.createTopology()); - Thread.sleep((long) runtimeInSeconds * MILLS_IN_SEC); - localCluster.shutdown(); - - } - - public static void runTopologyRemotely(TopologyBuilder builder, String topologyName, Config conf ) throws AlreadyAliveException, InvalidTopologyException, AuthorizationException { - - StormSubmitter.submitTopologyWithProgressBar(topologyName, conf, builder.createTopology()); - } - - -} diff --git a/src/java/cn/ac/iie/utils/general/Aggregate.java b/src/java/cn/ac/iie/utils/general/Aggregate.java deleted file mode 100644 index e427574..0000000 --- a/src/java/cn/ac/iie/utils/general/Aggregate.java +++ /dev/null @@ -1,152 +0,0 @@ -package cn.ac.iie.utils.general; - -import cn.ac.iie.bean.ValueTuple; -import cn.ac.iie.common.FlowWriteConfig; -import cn.ac.iie.utils.json.JsonParseUtil; -import cn.ac.iie.utils.tuple.TwoTuple; -import com.alibaba.fastjson.JSON; -import com.alibaba.fastjson.JSONObject; -import com.zdjizhi.utils.StringUtil; - -import java.util.HashMap; - -public class Aggregate { - - - /** - * 在内存中加载反射类用的map - */ - private static HashMap<String, Class> map = JsonParseUtil.getMapFromHttp(FlowWriteConfig.SCHEMA_HTTP); - - /** - * 反射成一个类 - */ - private static Object mapObject = JsonParseUtil.generateObject(map); - - private static String key; - - private static HashMap<String, ValueTuple> resultMap = new HashMap<>(); - - private static Object conn; - - private static ValueTuple valueTuple = new ValueTuple(); - - private static String test = "{\"bgp_as_num\":\"100\",\"bgp_route\":\"192.168.222.0/24\",\"bgp_type\":1,\"common_action\":4,\"common_address_list\":\"\",\"common_address_type\":4,\"common_app_id\":0,\"common_app_label\":\"\",\"common_c2s_byte_num\":650,\"common_c2s_pkt_num\":7,\"common_client_asn\":\"9198\",\"common_client_ip\":\"95.56.198.87\",\"common_client_location\":\"Pervomayskiy,Almaty oblysy,哈萨克斯坦\",\"common_client_port\":13555,\"common_con_duration_ms\":154122,\"common_device_id\":\"2506398\",\"common_direction\":0,\"common_encapsulation\":0,\"common_end_time\":1590388545,\"common_entrance_id\":20,\"common_has_dup_traffic\":1,\"common_isp\":\"CMCC\",\"common_l4_protocol\":\"VLAN\",\"common_link_id\":1,\"common_log_id\":126995036504993794,\"common_policy_id\":0,\"common_protocol_id\":0,\"common_recv_time\":1590388694,\"common_s2c_byte_num\":9921,\"common_s2c_pkt_num\":21,\"common_schema_type\":\"SSL\",\"common_server_asn\":\"12876\",\"common_server_ip\":\"62.210.101.44\",\"common_server_location\":\"法国\",\"common_server_port\":22,\"common_service\":7,\"common_sled_ip\":\"192.168.10.36\",\"common_start_time\":1590388490,\"common_stream_dir\":2,\"common_stream_error\":\"\",\"common_stream_trace_id\":6193492085736674541,\"common_user_region\":\"prysUgOCWSmUYcGRL5rcUvVc8zbI9MOtlb9KOvH8rZCMRVqnIEyQVtQfBp94IIIjha24tGQ4x33qtC3jSx8udADkuezGGzrTrxCB\",\"common_user_tags\":\"9PD3v4GaIgS97l4wQnRtahW00YBi3933RDQg8PpiB8R9ftXhELHploJ0Ocg1Pj0xH06svaPbY2Tp1Chb0usQPttRqhpNbHTkW3En\",\"dns_aa\":0,\"dns_ancount\":64,\"dns_arcount\":22,\"dns_cname\":\"bFh2JvWJMWTCNcVEyuroMimLhoNM3O4effDDiNA9SRlCFdzaev10\",\"dns_message_id\":744559,\"dns_nscount\":59,\"dns_opcode\":0,\"dns_qclass\":2,\"dns_qdcount\":26,\"dns_qname\":\"kankanews.com12041281\",\"dns_qr\":1,\"dns_qtype\":5,\"dns_ra\":0,\"dns_rcode\":9,\"dns_rd\":0,\"dns_rr\":\"{\\\"aEWseVK\\\":\\\"UEUZ4qlk8qOjJBZ4\\\",\\\"9jGNxy0\\\":\\\"s075dZOXDXZ\\\",\\\"yyNXYD9G\\\":\\\"EEKxB99FuYDHH2E6NrV\\\",\\\"al23zn\\\":\\\"4dX1qd5L0A\\\"}\",\"dns_sub\":1,\"dns_tc\":0,\"ftp_account\":\"JXU2RDRCJXU4QkQ1\",\"ftp_content\":\"JXU2RDRCJXU4QkQ1\",\"ftp_url\":\"ftp://test:[email protected]/soft/list.txt\",\"http_content_length\":\"37339\",\"http_content_type\":\"application/x-javascript\",\"http_cookie\":\"BD_UPN=12314753\",\"http_domain\":\"163.com\",\"http_host\":\"v.163.com\",\"http_proxy_flag\":1,\"http_referer\":\"https://www.baidu.com/\",\"http_request_body\":\"\",\"http_request_body_key\":\"\",\"http_request_header\":\"\",\"http_request_line\":\"GET www.baidu.com/ HTTP/1.1\",\"http_response_body\":\"\",\"http_response_body_key\":\"\",\"http_response_header\":\"\",\"http_response_line\":\"HTTP/1.1 200 OK\",\"http_sequence\":9,\"http_set_cookie\":\"delPer=0; path=/; domain=.baidu.com\",\"http_snapshot\":\"\",\"http_url\":\"http://v.163.com/movie/2011/7/0/3/M7B9K1R60_M7BAANT03.html\",\"http_user_agent\":\"Mozilla/5.0 (Windows NT 10.0; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/54.0.2840.87 Safari/537.36\",\"http_version\":\"http1\",\"mail_account\":\"123456789\",\"mail_attachment_content\":\"\",\"mail_attachment_name\":\"%u6D4B%u8BD5%u90AE%u4EF6\",\"mail_attachment_name_charset\":\"UTF-8\",\"mail_bcc\":\"\",\"mail_cc\":\"[email protected]\",\"mail_content\":\"\",\"mail_content_charset\":\"\",\"mail_eml_file\":\"\",\"mail_from\":\"[email protected]\",\"mail_from_cmd\":\"[email protected]\",\"mail_protocol_type\":\"POP3\",\"mail_snapshot\":\"\",\"mail_subject\":\"%u6D4B%u8BD5%u90AE%u4EF6\",\"mail_subject_charset\":\"UTF-8\",\"mail_to\":\"[email protected]\",\"mail_to_cmd\":\"[email protected]\",\"ssl_cert_verify\":0,\"ssl_client_side_latency\":5237,\"ssl_client_side_version\":\"SSLv3\",\"ssl_cn\":\"\",\"ssl_con_latency_ms\":3,\"ssl_error\":\"\",\"ssl_intercept_state\":0,\"ssl_pinningst\":1,\"ssl_san\":\"\",\"ssl_server_side_latency\":4644,\"ssl_server_side_version\":\"TLSv1.1\",\"ssl_sni\":\"cztv.com11547021\",\"ssl_version\":\"V3\",\"streaming_media_protocol\":\"RTP\",\"streaming_media_url\":\"http://home.sogua.com/lujingai/mv/play.aspx?id=30195689\",\"voip_called_account\":\"13307536537\",\"voip_called_number\":\"15301710004\",\"voip_calling_account\":\"15901848931\",\"voip_calling_number\":\"13908208553\"}"; - - - public static void main(String[] args) { - - - - resultMap = aggregateJsonToMap(resultMap, test); - - System.out.println("聚合一次之后: " + resultMap.get(key)); - - resultMap = aggregateJsonToMap(resultMap, test); - - System.out.println("聚合两次之后: " + resultMap.get(key)); - - - } - - /** - * 将一条新数据累加到HashMap中 - * @param map - * @param message - * @return - */ - public static HashMap<String, ValueTuple> aggregateJsonToMap(HashMap<String,ValueTuple> map, String message) { - - ValueTuple valueTuple = JSON.parseObject(message, ValueTuple.class); - - key = getKey(message); - - map.put(key,addTuple(map.get(key), valueTuple)); - - return map; - } - - /** - * 两个ValueTuple类型的对象做相应属性的聚合 - * @param result - * @param message - * @return - */ - public static ValueTuple addTuple(ValueTuple result,ValueTuple message){ - - if (result == null){ - - result = new ValueTuple(); - } - - result.setCommon_s2c_byte_num(result.getCommon_s2c_byte_num() + message.getCommon_s2c_byte_num()); - result.setCommon_c2s_byte_num(result.getCommon_c2s_byte_num() + message.getCommon_c2s_byte_num()); - result.setCommon_s2c_pkt_num(result.getCommon_s2c_pkt_num() + message.getCommon_s2c_pkt_num()); - result.setCommon_c2s_pkt_num(result.getCommon_c2s_pkt_num() + message.getCommon_c2s_pkt_num()); - result.setCommon_sessions(result.getCommon_sessions() + message.getCommon_sessions()); - - return result; - } - - public static String getKey(String message){ - Object conn = JSONObject.parseObject(message, mapObject.getClass()); - //TODO key - Object common_policy_id = JsonParseUtil.getValue(conn, "common_policy_id"); - Object common_action = JsonParseUtil.getValue(conn, "common_action"); - Object common_sub_action = JsonParseUtil.getValue(conn, "common_sub_action"); - Object common_client_ip = JsonParseUtil.getValue(conn, "common_client_ip"); - Object common_client_location = JsonParseUtil.getValue(conn, "common_client_location"); - Object common_sled_ip = JsonParseUtil.getValue(conn, "common_sled_ip"); - Object common_device_id = JsonParseUtil.getValue(conn, "common_device_id"); - Object common_subscriber_id = JsonParseUtil.getValue(conn, "common_subscriber_id"); - Object common_server_ip = JsonParseUtil.getValue(conn, "common_server_ip"); - Object common_server_location = JsonParseUtil.getValue(conn, "common_server_location"); - Object common_server_port = JsonParseUtil.getValue(conn, "common_server_port"); - Object common_l4_protocol = JsonParseUtil.getValue(conn, "common_l4_protocol"); - Object http_domain = JsonParseUtil.getValue(conn, "http_domain"); - Object ssl_sni = JsonParseUtil.getValue(conn, "ssl_sni"); - - - StringBuilder builder = new StringBuilder(); - builder.append(common_policy_id).append("_") - .append(common_action).append("_") - .append(common_sub_action).append("_") - .append(common_client_ip).append("_") - .append(common_client_location).append("_") - .append(common_sled_ip).append("_") - .append(common_device_id).append("_") - .append(common_subscriber_id).append("_") - .append(common_server_ip).append("_") - .append(common_server_location).append("_") - .append(common_server_port).append("_") - .append(common_l4_protocol).append("_") - .append(http_domain).append("_") - .append(ssl_sni); - - return builder.toString(); - } - - -/* public static ValueTuple getValueTuple(String message){ - - conn = JSONObject.parseObject(message, mapObject.getClass()); - Object common_sessions = JsonParseUtil.getValue(conn, "common_sessions"); - - if (StringUtil.isEmpty(common_sessions)) { - common_sessions = 0; - } - Object common_c2s_pkt_num = JsonParseUtil.getValue(conn, "common_c2s_pkt_num"); - Object common_s2c_pkt_num = JsonParseUtil.getValue(conn, "common_s2c_pkt_num"); - Object common_c2s_byte_num = JsonParseUtil.getValue(conn, "common_c2s_byte_num"); - Object common_s2c_byte_num = JsonParseUtil.getValue(conn, "common_s2c_byte_num"); - - valueTuple.setCommon_sessions((int) common_sessions); - valueTuple.setCommon_c2s_pkt_num((int) common_c2s_pkt_num); - valueTuple.setCommon_s2c_pkt_num((int) common_s2c_pkt_num); - valueTuple.setCommon_c2s_byte_num((int) common_c2s_byte_num); - valueTuple.setCommon_s2c_byte_num((int) common_s2c_byte_num); - - return valueTuple; - - }*/ -} diff --git a/src/java/cn/ac/iie/utils/http/HttpClientUtil.java b/src/java/cn/ac/iie/utils/http/HttpClientUtil.java deleted file mode 100644 index 347a69b..0000000 --- a/src/java/cn/ac/iie/utils/http/HttpClientUtil.java +++ /dev/null @@ -1,55 +0,0 @@ -package cn.ac.iie.utils.http; - -import org.apache.http.HttpEntity; -import org.apache.http.client.methods.CloseableHttpResponse; -import org.apache.http.client.methods.HttpGet; -import org.apache.http.impl.client.CloseableHttpClient; -import org.apache.http.impl.client.HttpClients; - -import java.io.BufferedReader; -import java.io.IOException; -import java.io.InputStreamReader; - -/** - * 获取网关schema的工具类 - * - * @author qidaijie - */ -public class HttpClientUtil { - - /** - * 请求网关获取schema - * @param http 网关url - * @return schema - */ - public static String requestByGetMethod(String http) { - CloseableHttpClient httpClient = HttpClients.createDefault(); - StringBuilder entityStringBuilder = null; - try { - HttpGet get = new HttpGet(http); - try (CloseableHttpResponse httpResponse = httpClient.execute(get)) { - HttpEntity entity = httpResponse.getEntity(); - entityStringBuilder = new StringBuilder(); - if (null != entity) { - BufferedReader bufferedReader = new BufferedReader(new InputStreamReader(httpResponse.getEntity().getContent(), "UTF-8"), 8 * 1024); - String line = null; - while ((line = bufferedReader.readLine()) != null) { - entityStringBuilder.append(line); - } - } - } - } catch (Exception e) { - e.printStackTrace(); - } finally { - try { - if (httpClient != null) { - httpClient.close(); - } - } catch (IOException e) { - e.printStackTrace(); - } - } - return entityStringBuilder.toString(); - } - -} diff --git a/src/java/cn/ac/iie/utils/json/JsonParseUtil.java b/src/java/cn/ac/iie/utils/json/JsonParseUtil.java deleted file mode 100644 index 0350e7f..0000000 --- a/src/java/cn/ac/iie/utils/json/JsonParseUtil.java +++ /dev/null @@ -1,204 +0,0 @@ -package cn.ac.iie.utils.json; - -import cn.ac.iie.common.FlowWriteConfig; -import cn.ac.iie.utils.http.HttpClientUtil; -import com.alibaba.fastjson.JSON; -import com.alibaba.fastjson.JSONArray; -import com.alibaba.fastjson.JSONObject; -import com.zdjizhi.utils.StringUtil; -import net.sf.cglib.beans.BeanGenerator; -import net.sf.cglib.beans.BeanMap; - -import java.util.*; - -/** - * 使用FastJson解析json的工具类 - * - * @author qidaijie - */ -public class JsonParseUtil { - - /** - * 模式匹配,给定一个类型字符串返回一个类类型 - * - * @param type 类型 - * @return 类类型 - */ - - private static Class getClassName(String type) { - Class clazz; - - switch (type) { - case "int": - clazz = Integer.class; - break; - case "String": - clazz = String.class; - break; - case "long": - clazz = long.class; - break; - case "Integer": - clazz = Integer.class; - break; - case "double": - clazz = double.class; - break; - case "float": - clazz = float.class; - break; - case "char": - clazz = char.class; - break; - case "byte": - clazz = byte.class; - break; - case "boolean": - clazz = boolean.class; - break; - case "short": - clazz = short.class; - break; - default: - clazz = String.class; - } - return clazz; - } - - /** - * 根据反射生成对象的方法 - * - * @param properties 反射类用的map - * @return 生成的Object类型的对象 - */ - public static Object generateObject(Map properties) { - BeanGenerator generator = new BeanGenerator(); - Set keySet = properties.keySet(); - for (Object aKeySet : keySet) { - String key = (String) aKeySet; - generator.addProperty(key, (Class) properties.get(key)); - } - return generator.create(); - } - - /** - * 获取属性值的方法 - * - * @param obj 对象 - * @param property key - * @return 属性的值 - */ - public static Object getValue(Object obj, String property) { - BeanMap beanMap = BeanMap.create(obj); - return beanMap.get(property); - } - - /** - * 更新属性值的方法 - * - * @param obj 对象 - * @param property 更新的key - * @param value 更新的值 - */ - public static void setValue(Object obj, String property, Object value) { - BeanMap beanMap = BeanMap.create(obj); - beanMap.put(property, value); - } - - /** - * 通过获取String类型的网关schema链接来获取map,用于生成一个Object类型的对象 - * - * @param http 网关schema地址 - * @return 用于反射生成schema类型的对象的一个map集合 - */ - public static HashMap<String, Class> getMapFromHttp(String http) { - HashMap<String, Class> map = new HashMap<>(); - - String schema = HttpClientUtil.requestByGetMethod(http); - Object data = JSON.parseObject(schema).get("data"); - - //获取fields,并转化为数组,数组的每个元素都是一个name doc type - JSONObject schemaJson = JSON.parseObject(data.toString()); - JSONArray fields = (JSONArray) schemaJson.get("fields"); - - for (Object field : fields) { - String name = JSON.parseObject(field.toString()).get("name").toString(); - String type = JSON.parseObject(field.toString()).get("type").toString(); - //组合用来生成实体类的map - map.put(name, getClassName(type)); - } - - return map; - } - - - /** - * 根据http链接获取schema,解析之后返回一个任务列表 (useList toList funcList paramlist) - * - * @param http 网关url - * @return 任务列表 - */ - public static ArrayList<String[]> getJobListFromHttp(String http) { - ArrayList<String[]> list = new ArrayList<>(); - - String schema = HttpClientUtil.requestByGetMethod(http); - //解析data - Object data = JSON.parseObject(schema).get("data"); - - //获取fields,并转化为数组,数组的每个元素都是一个name doc type - JSONObject schemaJson = JSON.parseObject(data.toString()); - JSONArray fields = (JSONArray) schemaJson.get("fields"); - - for (Object field : fields) { - - if (JSON.parseObject(field.toString()).containsKey("doc")) { - Object doc = JSON.parseObject(field.toString()).get("doc"); - - if (JSON.parseObject(doc.toString()).containsKey("format")) { - String name = JSON.parseObject(field.toString()).get("name").toString(); - Object format = JSON.parseObject(doc.toString()).get("format"); - JSONObject formatObject = JSON.parseObject(format.toString()); - - String functions = formatObject.get("functions").toString(); - String appendTo = null; - String params = null; - - if (formatObject.containsKey("appendTo")) { - appendTo = formatObject.get("appendTo").toString(); - } - - if (formatObject.containsKey("param")) { - params = formatObject.get("param").toString(); - } - - - if (StringUtil.isNotBlank(appendTo) && StringUtil.isBlank(params)) { - String[] functionArray = functions.split(FlowWriteConfig.FORMAT_SPLITTER); - String[] appendToArray = appendTo.split(FlowWriteConfig.FORMAT_SPLITTER); - - for (int i = 0; i < functionArray.length; i++) { - list.add(new String[]{name, appendToArray[i], functionArray[i], null}); - } - - } else if (StringUtil.isNotBlank(appendTo) && StringUtil.isNotBlank(params)) { - String[] functionArray = functions.split(FlowWriteConfig.FORMAT_SPLITTER); - String[] appendToArray = appendTo.split(FlowWriteConfig.FORMAT_SPLITTER); - String[] paramArray = params.split(FlowWriteConfig.FORMAT_SPLITTER); - - for (int i = 0; i < functionArray.length; i++) { - list.add(new String[]{name, appendToArray[i], functionArray[i], paramArray[i]}); - - } - } else { - list.add(new String[]{name, name, functions, params}); - } - - } - } - - } - return list; - } - - -}
\ No newline at end of file diff --git a/src/java/cn/ac/iie/utils/system/TupleUtils.java b/src/java/cn/ac/iie/utils/system/TupleUtils.java deleted file mode 100644 index 53e14ca..0000000 --- a/src/java/cn/ac/iie/utils/system/TupleUtils.java +++ /dev/null @@ -1,23 +0,0 @@ -package cn.ac.iie.utils.system; - -import org.apache.storm.Constants; -import org.apache.storm.tuple.Tuple; - -/** - * 用于检测是否是系统发送的tuple - * - * @author Administrator - */ -public final class TupleUtils { - /** - * 判断是否系统自动发送的Tuple - * - * @param tuple 元组 - * @return true or false - */ - public static boolean isTick(Tuple tuple) { - return tuple != null - && Constants.SYSTEM_COMPONENT_ID.equals(tuple.getSourceComponent()) - && Constants.SYSTEM_TICK_STREAM_ID.equals(tuple.getSourceStreamId()); - } -} diff --git a/src/java/cn/ac/iie/utils/tuple/ThreeTuple.java b/src/java/cn/ac/iie/utils/tuple/ThreeTuple.java deleted file mode 100644 index 0529035..0000000 --- a/src/java/cn/ac/iie/utils/tuple/ThreeTuple.java +++ /dev/null @@ -1,20 +0,0 @@ -package cn.ac.iie.utils.tuple; - -public class ThreeTuple<A,B,C> { - - public String first; - - public long second; - - public int third; - - public ThreeTuple(String name,long time, int sum){ - first = name; - second = time; - third = sum; - } - - public String toString(){ - return "[" + first + ", " + second + ", " + third + "]"; - } -} diff --git a/src/java/cn/ac/iie/utils/tuple/TupleAggregate.java b/src/java/cn/ac/iie/utils/tuple/TupleAggregate.java deleted file mode 100644 index b3d2e71..0000000 --- a/src/java/cn/ac/iie/utils/tuple/TupleAggregate.java +++ /dev/null @@ -1,195 +0,0 @@ -package cn.ac.iie.utils.tuple; - - -import com.alibaba.fastjson.JSON; - - -import java.util.HashMap; - -public class TupleAggregate { - - - private static TwoTuple<String, Integer> a = new TwoTuple<>("192.168.40.101", 1); - private static TwoTuple<String, Integer> b = new TwoTuple<>("192.168.40.101", 1); - - private static ThreeTuple<String, Long, Integer> a1 = new ThreeTuple<>("lxk", 30L, 2); - private static ThreeTuple<String, Long, Integer> b1 = new ThreeTuple<>("lxk", 20L, 2); - - - public static TwoTuple<TwoTuple, ThreeTuple> parseJsonToTuple(String json) { - - CONN conn = JSON.parseObject(json, CONN.class); - //二元组 key - TwoTuple key = new TwoTuple<>(conn.getCip(), conn.getNum()); - //三元组 value - ThreeTuple value = new ThreeTuple<>(conn.getName(), conn.getTime(), conn.getSum()); - - - return new TwoTuple<>(key, value); - } - - /** - * 聚合两个三元组 - * - * @param tuple1 - * @param tuple2 - * @return - */ - public static ThreeTuple addTuple(ThreeTuple tuple1, ThreeTuple tuple2) { - - - tuple1.second += tuple2.second; - tuple1.third += tuple2.third; - - return tuple1; - - } - - /** - * 将一条新数据累加到HashMap中 - * - * @param map - * @param json - * @return map1 - */ - public static HashMap aggregate(HashMap<TwoTuple, ThreeTuple> map, String json) { - - - //TODO json解析成对象,取出key聚合组成tuple 与HashMap中具有相同key的聚合 - - /** - * 还存在的问题 - * 1. key是对象 就算值一样也不会相同 (重写HashCode和equal方法) - * - * 2. 拿key去map中取值,如果为null,后面聚合会报错 空指针异常 - */ - - //一条日志 ==》 两元组 - TwoTuple<TwoTuple, ThreeTuple> tuple = parseJsonToTuple(json); - //取出key - TwoTuple key = tuple.first; - //内存中的HashMap中获取具有相同key的value - ThreeTuple value = map.get(key); - //将两个value聚合,赋值给value - value = addTuple(value, tuple.second); - //聚合的结果放回到内存中的HashMap - map.put(key, value); - - return map; - - } - - public static void main(String[] args) { - -// HashMap<TwoTuple, ThreeTuple> map1 = new HashMap<>(); -// a1 = addTuple(a1, b1); -// System.out.println("聚合成功:" + a1); - - - CONN conn1 = new CONN(); - CONN conn2 = new CONN(); - - conn1.setCip("192.168.40.101"); - conn2.setCip("192.168.40.101"); - - conn1.setNum(1); - conn2.setNum(1); - - conn1.setName("lxk"); - conn2.setName("lxk"); - - conn1.setTime(100L); - conn2.setTime(200L); - - conn1.setSum(10); - conn2.setSum(20); - - System.out.println("conn1" + conn1); - System.out.println("conn2" + conn2); - - String json1 = JSON.toJSONString(conn1); - String json2 = JSON.toJSONString(conn2); - - - HashMap map = new HashMap<>(); - - map.put(a, a1); - - System.out.println("开始的map:" + map); - - map = aggregate(map, json1); - - System.out.println("后来的map:" + map); - - System.out.println("a的HashCode: " + a.hashCode()); - System.out.println("b的HashCode: " + b.hashCode()); - - - - } - - -} - -class CONN { - - //二元组使用的 key - private String cip; - private int num; - - //三元组使用的 value - private String name; - private long time; - private int sum; - - public String getName() { - return name; - } - - public void setName(String name) { - this.name = name; - } - - public long getTime() { - return time; - } - - public void setTime(long time) { - this.time = time; - } - - public int getSum() { - return sum; - } - - public void setSum(int sum) { - this.sum = sum; - } - - public String getCip() { - return cip; - } - - public void setCip(String cip) { - this.cip = cip; - } - - public int getNum() { - return num; - } - - public void setNum(int num) { - this.num = num; - } - - @Override - public String toString() { - return "CONN{" + - "cip='" + cip + '\'' + - ", num=" + num + - ", name='" + name + '\'' + - ", time=" + time + - ", sum=" + sum + - '}'; - } -}
\ No newline at end of file diff --git a/src/java/cn/ac/iie/utils/tuple/TwoTuple.java b/src/java/cn/ac/iie/utils/tuple/TwoTuple.java deleted file mode 100644 index 618f3ee..0000000 --- a/src/java/cn/ac/iie/utils/tuple/TwoTuple.java +++ /dev/null @@ -1,32 +0,0 @@ -package cn.ac.iie.utils.tuple; - -public class TwoTuple<A, B> { - - public A first; - - public B second; - - public TwoTuple(){}; - public TwoTuple(A cip, B num) { - first = cip; - second = num; - } - - public String toString() { - return "[" + first + ", " + second + "]"; - } - - @Override - public int hashCode() { - - int result = (first != null && second != null) ? (first.toString() + second).hashCode() : 0; - - return result; - } - - @Override - public boolean equals(Object o) { - - return this.first.toString().equals(((TwoTuple) o).first.toString()) && this.first.toString().equals(((TwoTuple) o).first.toString()); - } -} diff --git a/src/main/java/cn/ac/iie/trident/aggregate/AggCount.java b/src/main/java/cn/ac/iie/trident/aggregate/AggCount.java new file mode 100644 index 0000000..5ac557b --- /dev/null +++ b/src/main/java/cn/ac/iie/trident/aggregate/AggCount.java @@ -0,0 +1,98 @@ +package cn.ac.iie.trident.aggregate; + + +import com.alibaba.fastjson.JSON; +import cn.ac.iie.trident.aggregate.bean.ValueBean; +import org.apache.storm.tuple.Values; +import org.apache.storm.trident.operation.BaseAggregator; +import org.apache.storm.trident.operation.TridentCollector; +import org.apache.storm.trident.tuple.TridentTuple; + + +import java.util.HashMap; + + +/** + * @ClassNameAggCount + * @Author [email protected] + * @Date2020/6/1 10:48 + * @Version V1.0 + **/ +public class AggCount extends BaseAggregator<AggCount.State> { + + + static class State { + HashMap<String,ValueBean> map = new HashMap(); + ValueBean valueBean = new ValueBean(); + ValueBean tupleValueBean = new ValueBean(); + String key = ""; + } + + @Override + public AggCount.State init(Object batchId, TridentCollector collector) { + + return new State(); + } + + /** + * 聚合一个tuple + * @param state + * @param tuple + * @param collector + */ + @Override + public void aggregate(State state, TridentTuple tuple, TridentCollector collector) { + + + //TODO 获取一条tuple数据的key和value + state.key = tuple.getStringByField("key"); + state.tupleValueBean = JSON.parseObject(tuple.getStringByField("value"),ValueBean.class); + + + //TODO 获取HashMap中的key对应的value,如果没有就默认为null + state.valueBean = state.map.getOrDefault(state.key, new ValueBean()); + + + //TODO 聚合两个value + state.valueBean = addValueBean(state.valueBean,state.tupleValueBean); + + //TODO 两个count聚合后放入HashMap中,利用HashMap的去重功能实现value的覆盖 + state.map.put(state.key, state.valueBean); + + + + } + + /** + * 处理一批tuple + * @param state + * @param collector + */ + @Override + public void complete(State state, TridentCollector collector) { + + collector.emit(new Values(JSON.toJSONString(state.map))); + + + } + + /** + * 将两个ValueBean中的相应的属性相加 + * @param result + * @param value2 + * @return + */ + + public ValueBean addValueBean(ValueBean result, ValueBean value2){ + + result.setCommon_s2c_byte_num(result.getCommon_s2c_byte_num() + value2.getCommon_s2c_byte_num()); + result.setCommon_c2s_byte_num(result.getCommon_c2s_byte_num() + value2.getCommon_c2s_byte_num()); + result.setCommon_s2c_pkt_num(result.getCommon_s2c_pkt_num() + value2.getCommon_s2c_pkt_num()); + result.setCommon_c2s_pkt_num(result.getCommon_c2s_pkt_num() + value2.getCommon_c2s_pkt_num()); + result.setCommon_sessions(result.getCommon_sessions() + 1L); + + + return result; + } + +} diff --git a/src/main/java/cn/ac/iie/trident/aggregate/AggregateTopology.java b/src/main/java/cn/ac/iie/trident/aggregate/AggregateTopology.java new file mode 100644 index 0000000..cd179cd --- /dev/null +++ b/src/main/java/cn/ac/iie/trident/aggregate/AggregateTopology.java @@ -0,0 +1,45 @@ +package cn.ac.iie.trident.aggregate; + +import cn.ac.iie.trident.aggregate.bolt.KafkaBolt; +import cn.ac.iie.trident.aggregate.spout.TridentKafkaSpout; +import cn.ac.iie.trident.aggregate.utils.FlowWriteConfig; +import org.apache.storm.Config; +import org.apache.storm.LocalCluster; +import org.apache.storm.kafka.trident.OpaqueTridentKafkaSpout; +import org.apache.storm.topology.base.BaseWindowedBolt; +import org.apache.storm.trident.TridentTopology; +import org.apache.storm.trident.windowing.InMemoryWindowsStoreFactory; +import org.apache.storm.tuple.Fields; + +import java.util.concurrent.TimeUnit; + + +/** + * @ClassNameWcTopo + * @Author [email protected] + * @Date2020/5/29 10:38 + * @Version V1.0 + **/ +public class AggregateTopology { + + public static void main(String[] args) { + //TODO 创建一个topo任务 + TridentTopology topology = new TridentTopology(); + //TODO 为Topo绑定Spout + OpaqueTridentKafkaSpout opaqueTridentKafkaSpout = TridentKafkaSpout.getInstance(); + + topology.newStream("kafka2storm_opaqueTrident", opaqueTridentKafkaSpout) + .parallelismHint(FlowWriteConfig.SPOUT_PARALLELISM) + .each(new Fields("str"), new ParseJson2KV(), new Fields("key", "value")) + .parallelismHint(FlowWriteConfig.DATACENTER_BOLT_PARALLELISM) + .slidingWindow(new BaseWindowedBolt.Duration(10, TimeUnit.SECONDS), new BaseWindowedBolt.Duration(10, TimeUnit.SECONDS), new InMemoryWindowsStoreFactory(), new Fields("key", "value"), new AggCount(), new Fields("map")) + .each(new Fields("map"), new KafkaBolt(), new Fields()); + + Config config = new Config(); + config.setDebug(false); + config.setNumWorkers(5); + LocalCluster cluster = new LocalCluster(); + cluster.submitTopology("trident-wordcount", config, topology.build()); +// StormSubmitter.submitTopology("kafka2storm_opaqueTrident_topology", config,topology.build()); + } +} diff --git a/src/main/java/cn/ac/iie/trident/aggregate/ParseJson2KV.java b/src/main/java/cn/ac/iie/trident/aggregate/ParseJson2KV.java new file mode 100644 index 0000000..2a8191c --- /dev/null +++ b/src/main/java/cn/ac/iie/trident/aggregate/ParseJson2KV.java @@ -0,0 +1,67 @@ +package cn.ac.iie.trident.aggregate; + +import com.alibaba.fastjson.JSONObject; +import cn.ac.iie.trident.aggregate.bean.KeyBean; +import cn.ac.iie.trident.aggregate.bean.ValueBean; +import org.apache.storm.trident.operation.BaseFunction; +import org.apache.storm.trident.operation.TridentCollector; +import org.apache.storm.trident.tuple.TridentTuple; +import org.apache.storm.tuple.Values; + +import java.util.Map; + +/** + * 把一个tuple解析成 + * @ClassNameToJson + * @Author [email protected] + * @Date2020/5/29 16:05 + * @Version V1.0 + **/ +public class ParseJson2KV extends BaseFunction { + + + private static ValueBean valueBean = new ValueBean(); + private static KeyBean keyBean = new KeyBean(); + + + + @Override + public void execute(TridentTuple tuple, TridentCollector collector) { + + + //TODO 获取tuple输入内容,解析成map + Map map = JSONObject.parseObject(tuple.getStringByField("str")); + + + //TODO KEY + + keyBean.setCommon_recv_time(0L); + keyBean.setCommon_policy_id(Integer.parseInt(map.getOrDefault("common_policy_id","0").toString())); + keyBean.setCommon_action(Integer.parseInt(map.getOrDefault("common_action","0").toString())); + keyBean.setCommon_sub_action(map.getOrDefault("common_sub_action","").toString()); + keyBean.setCommon_client_ip(map.getOrDefault("common_client_ip","").toString()); + keyBean.setCommon_client_location(map.getOrDefault("common_client_location","").toString()); + keyBean.setCommon_sled_ip(map.getOrDefault("common_sled_ip","").toString()); + keyBean.setCommon_device_id(map.getOrDefault("common_device_id","").toString()); + keyBean.setCommon_subscriber_id(map.getOrDefault("common_subscriber_id","").toString()); + keyBean.setCommon_server_ip(map.getOrDefault("common_server_ip","").toString()); + keyBean.setCommon_server_location(map.getOrDefault("common_server_location","").toString()); + keyBean.setCommon_server_port(Integer.parseInt(map.getOrDefault("common_server_port","0" ).toString())); + keyBean.setCommon_l4_protocol(map.getOrDefault("common_l4_protocol","").toString()); + keyBean.setHttp_domain(map.getOrDefault("http_domain","").toString()); + keyBean.setSsl_sni(map.getOrDefault("ssl_sni","").toString()); + + //TODO VALUE + + + valueBean.setCommon_c2s_pkt_num(Long.parseLong(map.getOrDefault("common_c2s_pkt_num",0).toString())); + valueBean.setCommon_s2c_pkt_num(Long.parseLong(map.getOrDefault("common_s2c_pkt_num",0).toString())); + valueBean.setCommon_c2s_byte_num(Long.parseLong(map.getOrDefault("common_c2s_byte_num",0).toString())); + valueBean.setCommon_s2c_byte_num(Long.parseLong(map.getOrDefault("common_s2c_byte_num",0).toString())); + valueBean.setCommon_sessions(Long.parseLong(map.getOrDefault("common_sessions",0).toString())); + + + collector.emit(new Values(JSONObject.toJSONString(keyBean), JSONObject.toJSONString(valueBean))); + + } +} diff --git a/src/java/cn/ac/iie/bean/KeyTuple.java b/src/main/java/cn/ac/iie/trident/aggregate/bean/ConnectionRecordLog.java index 8dc6bab..54fa4f3 100644 --- a/src/java/cn/ac/iie/bean/KeyTuple.java +++ b/src/main/java/cn/ac/iie/trident/aggregate/bean/ConnectionRecordLog.java @@ -1,28 +1,45 @@ -package cn.ac.iie.bean; +package cn.ac.iie.trident.aggregate.bean; /** - * @ClassNameKeyTuple + * @ClassNameConnectionRecordLog * @Author [email protected] - * @Date2020/5/27 16:18 + * @Date2020/5/28 13:44 * @Version V1.0 **/ -public class KeyTuple { - - private int common_policy_id; - private int common_action; - private String common_sub_action; - private String common_client_ip; - private String common_client_location; - private String common_sled_ip; - private String common_device_id; - private String common_subscriber_id; - private String common_server_ip; - private String common_server_location; - private int common_server_port; - private String common_l4_protocol; - private String http_domain; - private String ssl_sni; +public class ConnectionRecordLog { + //key + private long common_recv_time; + private int common_policy_id; + private int common_action; + private String common_sub_action; + private String common_client_ip; + private String common_client_location; + private String common_sled_ip; + private String common_device_id; + private String common_subscriber_id; + private String common_server_ip; + private String common_server_location; + private int common_server_port; + private String common_l4_protocol; + private String http_domain; + private String ssl_sni; + + //value + private long common_sessions; + private long common_c2s_pkt_num; + private long common_s2c_pkt_num; + private long common_c2s_byte_num; + private long common_s2c_byte_num; + + + public long getCommon_recv_time() { + return common_recv_time; + } + + public void setCommon_recv_time(long common_recv_time) { + this.common_recv_time = common_recv_time; + } public int getCommon_policy_id() { return common_policy_id; @@ -135,4 +152,44 @@ public class KeyTuple { public void setSsl_sni(String ssl_sni) { this.ssl_sni = ssl_sni; } + + public long getCommon_sessions() { + return common_sessions; + } + + public void setCommon_sessions(long common_sessions) { + this.common_sessions = common_sessions; + } + + public long getCommon_c2s_pkt_num() { + return common_c2s_pkt_num; + } + + public void setCommon_c2s_pkt_num(long common_c2s_pkt_num) { + this.common_c2s_pkt_num = common_c2s_pkt_num; + } + + public long getCommon_s2c_pkt_num() { + return common_s2c_pkt_num; + } + + public void setCommon_s2c_pkt_num(long common_s2c_pkt_num) { + this.common_s2c_pkt_num = common_s2c_pkt_num; + } + + public long getCommon_c2s_byte_num() { + return common_c2s_byte_num; + } + + public void setCommon_c2s_byte_num(long common_c2s_byte_num) { + this.common_c2s_byte_num = common_c2s_byte_num; + } + + public long getCommon_s2c_byte_num() { + return common_s2c_byte_num; + } + + public void setCommon_s2c_byte_num(long common_s2c_byte_num) { + this.common_s2c_byte_num = common_s2c_byte_num; + } } diff --git a/src/main/java/cn/ac/iie/trident/aggregate/bean/KeyBean.java b/src/main/java/cn/ac/iie/trident/aggregate/bean/KeyBean.java new file mode 100644 index 0000000..5515c48 --- /dev/null +++ b/src/main/java/cn/ac/iie/trident/aggregate/bean/KeyBean.java @@ -0,0 +1,176 @@ +package cn.ac.iie.trident.aggregate.bean; + +/** + * @ClassNameKeyBean + * @Author [email protected] + * @Date2020/6/3 18:52 + * @Version V1.0 + **/ +public class KeyBean { + + private long common_recv_time; + private int common_policy_id; + private int common_action; + private String common_sub_action; + private String common_client_ip; + private String common_client_location; + private String common_sled_ip; + private String common_device_id; + private String common_subscriber_id; + private String common_server_ip; + private String common_server_location; + private int common_server_port; + private String common_l4_protocol; + private String http_domain; + private String ssl_sni; + + public long getCommon_recv_time() { + return common_recv_time; + } + + public void setCommon_recv_time(long common_recv_time) { + this.common_recv_time = common_recv_time; + } + + public int getCommon_policy_id() { + return common_policy_id; + } + + public void setCommon_policy_id(int common_policy_id) { + this.common_policy_id = common_policy_id; + } + + public int getCommon_action() { + return common_action; + } + + public void setCommon_action(int common_action) { + this.common_action = common_action; + } + + public String getCommon_sub_action() { + return common_sub_action; + } + + public void setCommon_sub_action(String common_sub_action) { + this.common_sub_action = common_sub_action; + } + + public String getCommon_client_ip() { + return common_client_ip; + } + + public void setCommon_client_ip(String common_client_ip) { + this.common_client_ip = common_client_ip; + } + + public String getCommon_client_location() { + return common_client_location; + } + + public void setCommon_client_location(String common_client_location) { + this.common_client_location = common_client_location; + } + + public String getCommon_sled_ip() { + return common_sled_ip; + } + + public void setCommon_sled_ip(String common_sled_ip) { + this.common_sled_ip = common_sled_ip; + } + + public String getCommon_device_id() { + return common_device_id; + } + + public void setCommon_device_id(String common_device_id) { + this.common_device_id = common_device_id; + } + + public String getCommon_subscriber_id() { + return common_subscriber_id; + } + + public void setCommon_subscriber_id(String common_subscriber_id) { + this.common_subscriber_id = common_subscriber_id; + } + + public String getCommon_server_ip() { + return common_server_ip; + } + + public void setCommon_server_ip(String common_server_ip) { + this.common_server_ip = common_server_ip; + } + + public String getCommon_server_location() { + return common_server_location; + } + + public void setCommon_server_location(String common_server_location) { + this.common_server_location = common_server_location; + } + + public int getCommon_server_port() { + return common_server_port; + } + + public void setCommon_server_port(int common_server_port) { + this.common_server_port = common_server_port; + } + + public String getCommon_l4_protocol() { + return common_l4_protocol; + } + + public void setCommon_l4_protocol(String common_l4_protocol) { + this.common_l4_protocol = common_l4_protocol; + } + + public String getHttp_domain() { + return http_domain; + } + + public void setHttp_domain(String http_domain) { + this.http_domain = http_domain; + } + + public String getSsl_sni() { + return ssl_sni; + } + + public void setSsl_sni(String ssl_sni) { + this.ssl_sni = ssl_sni; + } + + @Override + public int hashCode() { + + return ("" + getCommon_recv_time() + getCommon_policy_id() + getCommon_action() + getCommon_sub_action() + getCommon_client_ip() + getCommon_client_location() + getCommon_sled_ip() + getCommon_device_id() + getCommon_subscriber_id() + getCommon_server_ip() + getCommon_server_location() + getCommon_server_port() + getCommon_l4_protocol() + getHttp_domain() + getSsl_sni()).hashCode(); + + } + + @Override + public boolean equals(Object o) { + if (o instanceof KeyBean) { + KeyBean keyBean = (KeyBean) o; + return (this.getCommon_recv_time()==(keyBean.getCommon_recv_time()) && + this.getCommon_policy_id()==(keyBean.getCommon_policy_id()) && + this.getCommon_action()==(keyBean.getCommon_action()) && + this.getCommon_sub_action().equals(keyBean.getCommon_sub_action()) && + this.getCommon_client_ip().equals(keyBean.getCommon_client_ip()) && + this.getCommon_client_location().equals(keyBean.getCommon_client_location()) && + this.getCommon_sled_ip().equals(keyBean.getCommon_sled_ip()) && + this.getCommon_device_id().equals(keyBean.getCommon_device_id()) && + this.getCommon_subscriber_id().equals(keyBean.getCommon_subscriber_id()) && + this.getCommon_server_ip().equals(keyBean.getCommon_server_ip()) && + this.getCommon_server_location().equals(keyBean.getCommon_server_location()) && + this.getCommon_server_port()==(keyBean.getCommon_server_port()) && + this.getCommon_l4_protocol().equals(keyBean.getCommon_l4_protocol()) && + this.getHttp_domain().equals(keyBean.getHttp_domain()) && + this.getSsl_sni().equals(keyBean.getSsl_sni())); + } + return false; + } +} diff --git a/src/main/java/cn/ac/iie/trident/aggregate/bean/ValueBean.java b/src/main/java/cn/ac/iie/trident/aggregate/bean/ValueBean.java new file mode 100644 index 0000000..8ddcc38 --- /dev/null +++ b/src/main/java/cn/ac/iie/trident/aggregate/bean/ValueBean.java @@ -0,0 +1,59 @@ +package cn.ac.iie.trident.aggregate.bean; + +import java.io.Serializable; + +/** + * @ClassNameValueBean + * @Author [email protected] + * @Date2020/6/2 14:05 + * @Version V1.0 + **/ +public class ValueBean implements Serializable { + + private long common_sessions; + private long common_c2s_pkt_num; + private long common_s2c_pkt_num; + private long common_c2s_byte_num; + private long common_s2c_byte_num; + + + public long getCommon_sessions() { + return common_sessions; + } + + public void setCommon_sessions(long common_sessions) { + this.common_sessions = common_sessions; + } + + public long getCommon_c2s_pkt_num() { + return common_c2s_pkt_num; + } + + public void setCommon_c2s_pkt_num(long common_c2s_pkt_num) { + this.common_c2s_pkt_num = common_c2s_pkt_num; + } + + public long getCommon_s2c_pkt_num() { + return common_s2c_pkt_num; + } + + public void setCommon_s2c_pkt_num(long common_s2c_pkt_num) { + this.common_s2c_pkt_num = common_s2c_pkt_num; + } + + public long getCommon_c2s_byte_num() { + return common_c2s_byte_num; + } + + public void setCommon_c2s_byte_num(long common_c2s_byte_num) { + this.common_c2s_byte_num = common_c2s_byte_num; + } + + public long getCommon_s2c_byte_num() { + return common_s2c_byte_num; + } + + public void setCommon_s2c_byte_num(long common_s2c_byte_num) { + this.common_s2c_byte_num = common_s2c_byte_num; + } +} diff --git a/src/main/java/cn/ac/iie/trident/aggregate/bolt/KafkaBolt.java b/src/main/java/cn/ac/iie/trident/aggregate/bolt/KafkaBolt.java new file mode 100644 index 0000000..3c70f01 --- /dev/null +++ b/src/main/java/cn/ac/iie/trident/aggregate/bolt/KafkaBolt.java @@ -0,0 +1,75 @@ +package cn.ac.iie.trident.aggregate.bolt; + +import cn.ac.iie.trident.aggregate.bean.ConnectionRecordLog; +import cn.ac.iie.trident.aggregate.bean.ValueBean; +import cn.ac.iie.trident.aggregate.utils.KafkaLogNtc; +import com.alibaba.fastjson.JSONObject; +import org.apache.storm.trident.operation.BaseFunction; +import org.apache.storm.trident.operation.TridentCollector; +import org.apache.storm.trident.tuple.TridentTuple; + +/** + * @ClassNameKafkaBolt + * @Author [email protected] + * @Date2020/6/3 16:50 + * @Version V1.0 + **/ +public class KafkaBolt extends BaseFunction { + + + private static final long serialVersionUID = -2107081139682355171L; + private static KafkaLogNtc kafkaLogNtc; + private static ConnectionRecordLog connectionRecordLog = new ConnectionRecordLog(); + private static JSONObject jsonObject; + + @Override + public void execute(TridentTuple tuple, TridentCollector collector) { + + if (kafkaLogNtc == null) { + kafkaLogNtc = KafkaLogNtc.getInstance(); + } + + //TODO 解析成json对象,方便以后的遍历 + jsonObject = JSONObject.parseObject(tuple.getStringByField("map")); + + for (String key : jsonObject.keySet()) { + + //TODO 为Key赋值 + + JSONObject keys = JSONObject.parseObject(key); + + connectionRecordLog.setCommon_recv_time(System.currentTimeMillis()); + connectionRecordLog.setCommon_policy_id(Integer.parseInt(keys.getString("common_policy_id"))); + connectionRecordLog.setCommon_action(Integer.parseInt(keys.getString("common_action"))); + connectionRecordLog.setCommon_sub_action(keys.getString("common_sub_action")); + connectionRecordLog.setCommon_client_ip(keys.getString("common_client_ip")); + connectionRecordLog.setCommon_client_location(keys.getString("common_client_location")); + connectionRecordLog.setCommon_sled_ip(keys.getString("common_sled_ip")); + connectionRecordLog.setCommon_device_id(keys.getString("common_device_id")); + connectionRecordLog.setCommon_subscriber_id(keys.getString("common_subscriber_id")); + connectionRecordLog.setCommon_server_ip(keys.getString("common_server_ip")); + connectionRecordLog.setCommon_server_location(keys.getString("common_server_location")); + connectionRecordLog.setCommon_server_port(Integer.parseInt(keys.getString("common_server_port"))); + connectionRecordLog.setCommon_l4_protocol(keys.getString("common_l4_protocol")); + connectionRecordLog.setHttp_domain(keys.getString("http_domain")); + connectionRecordLog.setSsl_sni(keys.getString("ssl_sni")); + + + + //TODO 为Value赋值 + + ValueBean valueBean = JSONObject.parseObject(jsonObject.get(key).toString(), ValueBean.class); + connectionRecordLog.setCommon_sessions(valueBean.getCommon_sessions()); + connectionRecordLog.setCommon_c2s_pkt_num(valueBean.getCommon_c2s_pkt_num()); + connectionRecordLog.setCommon_s2c_pkt_num(valueBean.getCommon_s2c_pkt_num()); + connectionRecordLog.setCommon_c2s_byte_num(valueBean.getCommon_c2s_byte_num()); + connectionRecordLog.setCommon_s2c_byte_num(valueBean.getCommon_s2c_byte_num()); + + kafkaLogNtc.sendMessage(JSONObject.toJSONString(connectionRecordLog)); + + + } + + + } +} diff --git a/src/java/cn/ac/iie/common/FlowWriteConfig.java b/src/main/java/cn/ac/iie/trident/aggregate/utils/FlowWriteConfig.java index f049547..0ab90a5 100644 --- a/src/java/cn/ac/iie/common/FlowWriteConfig.java +++ b/src/main/java/cn/ac/iie/trident/aggregate/utils/FlowWriteConfig.java @@ -1,8 +1,6 @@ -package cn.ac.iie.common; +package cn.ac.iie.trident.aggregate.utils; -import cn.ac.iie.utils.system.FlowWriteConfigurations; - /** * @author Administrator */ @@ -18,9 +16,6 @@ public class FlowWriteConfig { public static final Integer DATACENTER_BOLT_PARALLELISM = FlowWriteConfigurations.getIntProperty(0, "datacenter.bolt.parallelism"); public static final Integer TOPOLOGY_WORKERS = FlowWriteConfigurations.getIntProperty(0, "topology.workers"); public static final Integer KAFKA_BOLT_PARALLELISM = FlowWriteConfigurations.getIntProperty(0, "kafka.bolt.parallelism"); - public static final Integer TOPOLOGY_TICK_TUPLE_FREQ_SECS = FlowWriteConfigurations.getIntProperty(0, "topology.tick.tuple.freq.secs"); - public static final Integer HBASE_TICK_TUPLE_FREQ_SECS = FlowWriteConfigurations.getIntProperty(0, "hbase.tick.tuple.freq.secs"); - public static final Integer TOPOLOGY_CONFIG_MAX_SPOUT_PENDING = FlowWriteConfigurations.getIntProperty(0, "topology.config.max.spout.pending"); public static final Integer TOPOLOGY_NUM_ACKS = FlowWriteConfigurations.getIntProperty(0, "topology.num.acks"); public static final Integer TOPOLOGY_SPOUT_SLEEP_TIME = FlowWriteConfigurations.getIntProperty(0, "topology.spout.sleep.time"); public static final Integer BATCH_INSERT_NUM = FlowWriteConfigurations.getIntProperty(0, "batch.insert.num"); diff --git a/src/java/cn/ac/iie/utils/system/FlowWriteConfigurations.java b/src/main/java/cn/ac/iie/trident/aggregate/utils/FlowWriteConfigurations.java index 273a5f8..bf24f34 100644 --- a/src/java/cn/ac/iie/utils/system/FlowWriteConfigurations.java +++ b/src/main/java/cn/ac/iie/trident/aggregate/utils/FlowWriteConfigurations.java @@ -1,4 +1,4 @@ -package cn.ac.iie.utils.system; +package cn.ac.iie.trident.aggregate.utils; import java.util.Properties; diff --git a/src/java/cn/ac/iie/utils/kafka/KafkaLogNtc.java b/src/main/java/cn/ac/iie/trident/aggregate/utils/KafkaLogNtc.java index 09097c2..22918be 100644 --- a/src/java/cn/ac/iie/utils/kafka/KafkaLogNtc.java +++ b/src/main/java/cn/ac/iie/trident/aggregate/utils/KafkaLogNtc.java @@ -1,10 +1,9 @@ -package cn.ac.iie.utils.kafka; +package cn.ac.iie.trident.aggregate.utils; + -import cn.ac.iie.common.FlowWriteConfig; import org.apache.kafka.clients.producer.*; import org.apache.log4j.Logger; -import java.util.List; import java.util.Properties; /** @@ -39,10 +38,9 @@ public class KafkaLogNtc { } - public void sendMessage(List<String> list) { + public void sendMessage(String message) { final int[] errorSum = {0}; - for (String value : list) { - kafkaProducer.send(new ProducerRecord<>(FlowWriteConfig.RESULTS_OUTPUT_TOPIC, value), new Callback() { + kafkaProducer.send(new ProducerRecord<>(FlowWriteConfig.RESULTS_OUTPUT_TOPIC, message), new Callback() { @Override public void onCompletion(RecordMetadata metadata, Exception exception) { if (exception != null) { @@ -51,10 +49,7 @@ public class KafkaLogNtc { } } }); - if (errorSum[0] > FlowWriteConfig.MAX_FAILURE_NUM) { - list.clear(); - } - } + kafkaProducer.flush(); logger.debug("Log sent to National Center successfully!!!!!"); } diff --git a/src/java/log4j.properties b/src/main/java/cn/ac/iie/trident/log4j.properties index 17c0e9a..8d4ada4 100644 --- a/src/java/log4j.properties +++ b/src/main/java/cn/ac/iie/trident/log4j.properties @@ -1,5 +1,5 @@ #Log4j -log4j.rootLogger=info,console,file +log4j.rootLogger=warn,console,file # 控制台日志设置 log4j.appender.console=org.apache.log4j.ConsoleAppender log4j.appender.console.Threshold=info diff --git a/src/test/java/com/wp/AppTest.java b/src/test/java/com/wp/AppTest.java new file mode 100644 index 0000000..3202adb --- /dev/null +++ b/src/test/java/com/wp/AppTest.java @@ -0,0 +1,85 @@ +package com.wp; + +import cn.ac.iie.trident.aggregate.bean.ValueBean; +import com.alibaba.fastjson.JSON; +import com.alibaba.fastjson.JSONObject; +import cn.ac.iie.trident.aggregate.utils.FlowWriteConfig; +import junit.framework.Test; +import junit.framework.TestCase; +import junit.framework.TestSuite; + +/** + * Unit test for simple App. + */ +public class AppTest + extends TestCase +{ + /** + * Create the test case + * + * @param testName name of the test case + */ + public AppTest( String testName ) + { + super( testName ); + } + + /** + * @return the suite of tests being tested + */ + public static Test suite() + { + return new TestSuite( AppTest.class ); + } + + /** + * Rigourous Test :-) + */ + public void testApp() + { + assertTrue( true ); + } + + + private static ValueBean valueBean; + + @org.junit.Test + public void test(){ + + System.out.println(valueBean == null); + + } + + static class Demo{ + private String a; + private String b; + private String c; + + public String getA() { + return a; + } + + public void setA(String a) { + this.a = a; + } + + public String getB() { + return b; + } + + public void setB(String b) { + this.b = b; + } + + public String getC() { + return c; + } + + public void setC(String c) { + this.c = c; + } + } +} + + + |
