summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorqidaijie <[email protected]>2019-08-05 18:10:16 +0800
committerqidaijie <[email protected]>2019-08-05 18:10:16 +0800
commit3d7ff38cea3cc9c31b822b1f5d1bf4b6d1f5c654 (patch)
treeb2194ee04485abe6455cba14b9ddae4f2ee3dacf
parente537a307744cbd1e89ac00a3824a2529b5fe0cd2 (diff)
TOP计算初始版本
-rw-r--r--pom.xml159
-rw-r--r--properties/kafka_topic.properties45
-rw-r--r--properties/realtime_routine.properties21
-rw-r--r--src/main/java/cn/ac/iie/bolt/AbstractRankerBolt.java96
-rw-r--r--src/main/java/cn/ac/iie/bolt/IntermediateRankingsBolt.java31
-rw-r--r--src/main/java/cn/ac/iie/bolt/TotalRankingsBolt.java43
-rw-r--r--src/main/java/cn/ac/iie/bolt/resultoutput/ExternalOutPutBolt.java128
-rw-r--r--src/main/java/cn/ac/iie/bolt/resultoutput/InternalOutPutBolt.java97
-rw-r--r--src/main/java/cn/ac/iie/bolt/resultoutput/UrlsOutPutBolt.java91
-rw-r--r--src/main/java/cn/ac/iie/bolt/resultoutput/UserOutPutBolt.java97
-rw-r--r--src/main/java/cn/ac/iie/bolt/resultoutput/WebsiteOutPutBolt.java100
-rw-r--r--src/main/java/cn/ac/iie/bolt/scatteredcalculate/ExternalCountBolt.java157
-rw-r--r--src/main/java/cn/ac/iie/bolt/scatteredcalculate/InternalCountBolt.java155
-rw-r--r--src/main/java/cn/ac/iie/bolt/scatteredcalculate/UrlsCountBolt.java173
-rw-r--r--src/main/java/cn/ac/iie/bolt/scatteredcalculate/UserCountBolt.java160
-rw-r--r--src/main/java/cn/ac/iie/bolt/scatteredcalculate/WebsiteCountBolt.java175
-rw-r--r--src/main/java/cn/ac/iie/bolt/segmentation/ExternalBolt.java49
-rw-r--r--src/main/java/cn/ac/iie/bolt/segmentation/InternalBolt.java50
-rw-r--r--src/main/java/cn/ac/iie/bolt/segmentation/UrlsBolt.java50
-rw-r--r--src/main/java/cn/ac/iie/bolt/segmentation/UserBolt.java49
-rw-r--r--src/main/java/cn/ac/iie/bolt/segmentation/WebSiteBolt.java50
-rw-r--r--src/main/java/cn/ac/iie/common/TopNCountConfig.java45
-rw-r--r--src/main/java/cn/ac/iie/model/ExternalHost.java82
-rw-r--r--src/main/java/cn/ac/iie/model/InternalHost.java73
-rw-r--r--src/main/java/cn/ac/iie/model/Urls.java88
-rw-r--r--src/main/java/cn/ac/iie/model/User.java70
-rw-r--r--src/main/java/cn/ac/iie/model/WebSite.java91
-rw-r--r--src/main/java/cn/ac/iie/spout/CustomizedKafkaSpout.java79
-rw-r--r--src/main/java/cn/ac/iie/topology/LogTopCountTopology.java137
-rw-r--r--src/main/java/cn/ac/iie/topology/StormRunner.java35
-rw-r--r--src/main/java/cn/ac/iie/utils/NthLastModifiedTimeTracker.java45
-rw-r--r--src/main/java/cn/ac/iie/utils/Rankable.java11
-rw-r--r--src/main/java/cn/ac/iie/utils/RankableObjectWithFields.java119
-rw-r--r--src/main/java/cn/ac/iie/utils/Rankings.java124
-rw-r--r--src/main/java/cn/ac/iie/utils/SlidingWindowCounter.java58
-rw-r--r--src/main/java/cn/ac/iie/utils/SlotBasedCounter.java164
-rw-r--r--src/main/java/cn/ac/iie/utils/TopUtils.java22
-rw-r--r--src/main/java/cn/ac/iie/utils/kafka/ResultToKafka.java79
-rw-r--r--src/main/java/cn/ac/iie/utils/system/TopNCountConfigurations.java65
-rw-r--r--src/main/java/cn/ac/iie/utils/system/TupleUtils.java13
-rw-r--r--src/main/java/cn/ac/iie/utils/unique/UniqueCollection.java103
-rw-r--r--src/main/java/cn/ac/iie/utils/unique/bytecount/BytesSlidingWindowCounter.java60
-rw-r--r--src/main/java/cn/ac/iie/utils/unique/bytecount/BytesSlotBasedCounter.java178
-rw-r--r--src/main/java/cn/ac/iie/utils/unique/urldimension/UrlSlidingWindowCounter.java58
-rw-r--r--src/main/java/cn/ac/iie/utils/unique/urldimension/UrlSlotBasedCounter.java199
-rw-r--r--src/main/java/cn/ac/iie/utils/unique/webdimension/WebSlidingWindowCounter.java60
-rw-r--r--src/main/java/cn/ac/iie/utils/unique/webdimension/WebSlotBasedCounter.java209
-rw-r--r--src/main/java/log4j.properties23
-rw-r--r--src/test/java/cn/ac/iie/test/ContainerTest.java16
-rw-r--r--src/test/java/cn/ac/iie/test/IpTest.java17
-rw-r--r--src/test/java/cn/ac/iie/test/TimeTest.java42
51 files changed, 4341 insertions, 0 deletions
diff --git a/pom.xml b/pom.xml
new file mode 100644
index 0000000..54e3b37
--- /dev/null
+++ b/pom.xml
@@ -0,0 +1,159 @@
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+ <modelVersion>4.0.0</modelVersion>
+
+ <groupId>cn.ac.iie</groupId>
+ <artifactId>log-stream-topn</artifactId>
+ <version>0.0.1-SNAPSHOT</version>
+ <packaging>jar</packaging>
+
+ <name>log-stream-topn</name>
+ <url>http://maven.apache.org</url>
+
+ <repositories>
+ <repository>
+ <id>nexus</id>
+ <name>Team Nexus Repository</name>
+ <url>http://192.168.10.125:8099/content/groups/public</url>
+ </repository>
+ </repositories>
+
+ <build>
+ <plugins>
+
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-shade-plugin</artifactId>
+ <version>2.4.2</version>
+ <executions>
+ <execution>
+ <phase>package</phase>
+ <goals>
+ <goal>shade</goal>
+ </goals>
+ <configuration>
+ <transformers>
+ <transformer
+ implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
+ <mainClass>cn.ac.iie.topology.LogTopCountTopology</mainClass>
+ </transformer>
+ <transformer
+ implementation="org.apache.maven.plugins.shade.resource.AppendingTransformer">
+ <resource>META-INF/spring.handlers</resource>
+ </transformer>
+ <transformer
+ implementation="org.apache.maven.plugins.shade.resource.AppendingTransformer">
+ <resource>META-INF/spring.schemas</resource>
+ </transformer>
+ </transformers>
+ </configuration>
+ </execution>
+ </executions>
+ </plugin>
+
+
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-compiler-plugin</artifactId>
+ <version>2.3.2</version>
+ <configuration>
+ <source>1.8</source>
+ <target>1.8</target>
+ </configuration>
+ </plugin>
+ </plugins>
+ <resources>
+ <resource>
+ <directory>properties</directory>
+ <includes>
+ <include>**/*.properties</include>
+ </includes>
+ <filtering>false</filtering>
+ </resource>
+ <resource>
+ <directory>src/main/java</directory>
+ <includes>
+ <include>log4j.properties</include>
+ </includes>
+ <filtering>false</filtering>
+ </resource>
+ </resources>
+ </build>
+
+ <properties>
+ <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
+ <kafka.version>1.0.0</kafka.version>
+ <storm.version>1.0.2</storm.version>
+ </properties>
+
+ <dependencies>
+ <!-- https://mvnrepository.com/artifact/org.apache.kafka/kafka -->
+ <dependency>
+ <groupId>org.apache.kafka</groupId>
+ <artifactId>kafka_2.11</artifactId>
+ <version>1.0.0</version>
+ <exclusions>
+ <exclusion>
+ <artifactId>slf4j-log4j12</artifactId>
+ <groupId>org.slf4j</groupId>
+ </exclusion>
+ </exclusions>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.storm</groupId>
+ <artifactId>storm-core</artifactId>
+ <version>${storm.version}</version>
+ <scope>provided</scope>
+ <exclusions>
+ <exclusion>
+ <artifactId>log4j-over-slf4j</artifactId>
+ <groupId>org.slf4j</groupId>
+ </exclusion>
+ </exclusions>
+ </dependency>
+
+ <dependency>
+ <groupId>commons-collections</groupId>
+ <artifactId>commons-collections</artifactId>
+ <version>3.2.1</version>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.storm</groupId>
+ <artifactId>storm-kafka</artifactId>
+ <version>${storm.version}</version>
+ </dependency>
+
+ <dependency>
+ <groupId>junit</groupId>
+ <artifactId>junit</artifactId>
+ <version>3.8.1</version>
+ <scope>test</scope>
+ </dependency>
+
+ <dependency>
+ <groupId>com.alibaba</groupId>
+ <artifactId>fastjson</artifactId>
+ <version>1.2.47</version>
+ </dependency>
+
+
+ <dependency>
+ <groupId>com.zdjizhi</groupId>
+ <artifactId>galaxy</artifactId>
+ <version>1.0.1</version>
+ <exclusions>
+ <exclusion>
+ <artifactId>slf4j-log4j12</artifactId>
+ <groupId>org.slf4j</groupId>
+ </exclusion>
+ <exclusion>
+ <artifactId>log4j-over-slf4j</artifactId>
+ <groupId>org.slf4j</groupId>
+ </exclusion>
+ </exclusions>
+ </dependency>
+
+ </dependencies>
+</project>
diff --git a/properties/kafka_topic.properties b/properties/kafka_topic.properties
new file mode 100644
index 0000000..6850851
--- /dev/null
+++ b/properties/kafka_topic.properties
@@ -0,0 +1,45 @@
+#kafka broker下的topic名称
+kafka.topic=SESSION-RECORD-COMPLETED-LOG
+
+#kafka消费group id
+group.id=top-test
+
+#输出Topic
+results.output.topics=DruidTest
+
+#1:Internal 内部主机 2:External 外部主机
+#3:Website 域名 4:Urls HTTP/HTTPS 5:User 活跃用户
+pattern.num=3
+
+#storm topology workers
+topology.workers=1
+
+#storm topology rolling count window length in seconds
+topology.top.window.length.secs=300
+
+#storm topology rolling count emit Frequency in seconds
+topology.top.emit.frequency.secs=60
+
+#storm topology intermediate rank emit Frequency in seconds
+topology.intermediate.emit.frequency.secs=70
+
+#storm topology total rank emit Frequency in seconds
+topology.taotal.emit.frequency.secs=300
+
+#storm topology intermediate top N
+topology.top.intermediate.n = 10
+
+#storm topology total top N
+topology.top.total.n = 3
+
+#storm topology spout parallelism_hint一般与kafka 分区数量1:1关系
+topology.spout.parallelism=3
+
+#storm bolt InternalBolt parallelism_hint
+topology.bolt.check.parallelism=3
+
+#storm bolt InternalCountBolt parallelism_hint
+topology.bolt.count.parallelism=3
+
+#storm bolt IntermediateRankingsBolt parallelism_hint
+topology.bolt.interRanker.parallelism=3
diff --git a/properties/realtime_routine.properties b/properties/realtime_routine.properties
new file mode 100644
index 0000000..b82ca49
--- /dev/null
+++ b/properties/realtime_routine.properties
@@ -0,0 +1,21 @@
+#管理kafka地址
+bootstrap.servers=192.168.40.119:9092,192.168.40.122:9092,192.168.40.123:9092
+#bootstrap.servers=10.4.35.7:9092,10.4.35.8:9092,10.4.35.9:9092
+
+#从kafka哪里开始读:earliest/latest
+auto.offset.reset=latest
+#auto.offset.reset=earliest
+
+#输出Kafka地址
+results.output.servers=192.168.40.119:9092,192.168.40.122:9092,192.168.40.123:9092
+#results.output.servers=10.4.35.7:9092,10.4.35.8:9092,10.4.35.9:9092
+
+topology.config.max.spout.pending=150000
+
+topology.num.acks=1
+
+#允许发送kafka最大失败数
+max.failure.num=20
+
+#定位库地址
+ip.library=/home/ceiec/topology/dat/ \ No newline at end of file
diff --git a/src/main/java/cn/ac/iie/bolt/AbstractRankerBolt.java b/src/main/java/cn/ac/iie/bolt/AbstractRankerBolt.java
new file mode 100644
index 0000000..757370a
--- /dev/null
+++ b/src/main/java/cn/ac/iie/bolt/AbstractRankerBolt.java
@@ -0,0 +1,96 @@
+package cn.ac.iie.bolt;
+
+
+import cn.ac.iie.utils.Rankings;
+import com.zdjizhi.utils.StringUtil;
+import org.apache.log4j.Logger;
+import org.apache.storm.Config;
+import org.apache.storm.topology.BasicOutputCollector;
+import org.apache.storm.topology.OutputFieldsDeclarer;
+import org.apache.storm.topology.base.BaseBasicBolt;
+import org.apache.storm.tuple.Fields;
+import org.apache.storm.tuple.Tuple;
+import org.apache.storm.tuple.Values;
+import org.apache.storm.utils.TupleUtils;
+
+import java.util.HashMap;
+import java.util.Map;
+
+
+/**
+ * @author qidaijie
+ */
+public abstract class AbstractRankerBolt extends BaseBasicBolt {
+
+
+ private static final int DEFAULT_EMIT_FREQUENCY_IN_SECONDS = 2;
+ private static final int DEFAULT_COUNT = 10;
+ private static final long serialVersionUID = 5810280112411348355L;
+
+
+ private final int emitFrequencyInSeconds;
+ private Rankings rankings;
+
+ public AbstractRankerBolt() {
+ this(DEFAULT_COUNT, DEFAULT_EMIT_FREQUENCY_IN_SECONDS);
+ }
+
+ public AbstractRankerBolt(int topN) {
+ this(topN, DEFAULT_EMIT_FREQUENCY_IN_SECONDS);
+ }
+
+ public AbstractRankerBolt(int topN, int emitFrequencyInSeconds) {
+ if (topN < 1) {
+ throw new IllegalArgumentException("topN must be >=1 (you requested " + topN + ")");
+ }
+ if (emitFrequencyInSeconds < 1) {
+ throw new IllegalArgumentException("The time frequency must be >=1 (you requested " + emitFrequencyInSeconds + " seconds)");
+ }
+ this.emitFrequencyInSeconds = emitFrequencyInSeconds;
+ rankings = new Rankings(topN);
+ }
+
+ public Rankings getRankings() {
+ return rankings;
+ }
+
+ @Override
+ public void execute(Tuple input, BasicOutputCollector collector) {
+ if (TupleUtils.isTick(input)) {
+ if (StringUtil.isEmpty(rankings.getRankings())) {
+// getLogger().warn("rankings is null, please waiting...");
+ return;
+ }
+// getLogger().info("Received tick tuple, triggering emit of current rankings");
+ emitRankings(collector);
+ } else {
+ updateRankingsWithTuple(input);
+ }
+
+ }
+
+ abstract void updateRankingsWithTuple(Tuple tuple);
+
+ private void emitRankings(BasicOutputCollector collector) {
+ collector.emit(new Values(rankings.copy()));
+ rankings.clearRankedItem();
+ }
+
+
+ @Override
+ public void declareOutputFields(OutputFieldsDeclarer declarer) {
+ declarer.declare(new Fields("rankings"));
+
+ }
+
+ abstract Logger getLogger();
+
+ @Override
+ public Map<String, Object> getComponentConfiguration() {
+ Map<String, Object> conf = new HashMap<String, Object>();
+ conf.put(Config.TOPOLOGY_TICK_TUPLE_FREQ_SECS, emitFrequencyInSeconds);
+ return conf;
+ }
+
+
+}
diff --git a/src/main/java/cn/ac/iie/bolt/IntermediateRankingsBolt.java b/src/main/java/cn/ac/iie/bolt/IntermediateRankingsBolt.java
new file mode 100644
index 0000000..abbc391
--- /dev/null
+++ b/src/main/java/cn/ac/iie/bolt/IntermediateRankingsBolt.java
@@ -0,0 +1,31 @@
+package cn.ac.iie.bolt;
+
+import cn.ac.iie.utils.Rankable;
+import cn.ac.iie.utils.RankableObjectWithFields;
+import org.apache.log4j.Logger;
+import org.apache.storm.tuple.Tuple;
+
+/**
+ * @author qidaijie
+ */
+public class IntermediateRankingsBolt extends AbstractRankerBolt {
+
+
+ private static final Logger logger = Logger.getLogger(IntermediateRankingsBolt.class);
+ private static final long serialVersionUID = -1954290502931145364L;
+
+ public IntermediateRankingsBolt(int topN, int emitFrequencyInSeconds) {
+ super(topN, emitFrequencyInSeconds);
+ }
+
+ @Override
+ void updateRankingsWithTuple(Tuple tuple) {
+ Rankable rankable = RankableObjectWithFields.from(tuple);
+ super.getRankings().updateWith(rankable);
+ }
+
+ @Override
+ Logger getLogger() {
+ return logger;
+ }
+}
diff --git a/src/main/java/cn/ac/iie/bolt/TotalRankingsBolt.java b/src/main/java/cn/ac/iie/bolt/TotalRankingsBolt.java
new file mode 100644
index 0000000..5f7e734
--- /dev/null
+++ b/src/main/java/cn/ac/iie/bolt/TotalRankingsBolt.java
@@ -0,0 +1,43 @@
+package cn.ac.iie.bolt;
+
+import cn.ac.iie.utils.Rankings;
+import org.apache.log4j.Logger;
+import org.apache.storm.tuple.Tuple;
+
+/**
+ * @author qidaijie
+ */
+public class TotalRankingsBolt extends AbstractRankerBolt {
+
+ private static final Logger logger = Logger.getLogger(TotalRankingsBolt.class);
+ private static final long serialVersionUID = 6256903686188823001L;
+
+ public TotalRankingsBolt() {
+ super();
+ }
+
+ public TotalRankingsBolt(int topN) {
+ super(topN);
+ }
+
+
+ public TotalRankingsBolt(int topN, int emitFrequencyInSeconds) {
+ super(topN, emitFrequencyInSeconds);
+ }
+
+ @Override
+ void updateRankingsWithTuple(Tuple tuple) {
+ Rankings rankingsToBeMerged = (Rankings) tuple.getValue(0);
+ super.getRankings().updateWith(rankingsToBeMerged);
+ super.getRankings().pruneZeroCounts();
+
+ logger.info("**********汇总中间统计结果开始****************");
+ logger.info(super.getRankings().toString());
+ logger.info("**********汇总中间统计结果结束****************");
+ }
+
+ @Override
+ Logger getLogger() {
+ return logger;
+ }
+}
diff --git a/src/main/java/cn/ac/iie/bolt/resultoutput/ExternalOutPutBolt.java b/src/main/java/cn/ac/iie/bolt/resultoutput/ExternalOutPutBolt.java
new file mode 100644
index 0000000..e22aba9
--- /dev/null
+++ b/src/main/java/cn/ac/iie/bolt/resultoutput/ExternalOutPutBolt.java
@@ -0,0 +1,128 @@
+package cn.ac.iie.bolt.resultoutput;
+
+import cn.ac.iie.model.ExternalHost;
+import cn.ac.iie.utils.*;
+import cn.ac.iie.utils.kafka.ResultToKafka;
+import cn.ac.iie.utils.system.TupleUtils;
+import cn.ac.iie.utils.unique.UniqueCollection;
+import com.alibaba.fastjson.JSONObject;
+import com.zdjizhi.utils.StringUtil;
+import org.apache.log4j.Logger;
+import org.apache.storm.task.OutputCollector;
+import org.apache.storm.task.TopologyContext;
+import org.apache.storm.topology.OutputFieldsDeclarer;
+import org.apache.storm.topology.base.BaseRichBolt;
+import org.apache.storm.tuple.Tuple;
+
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+
+
+/**
+ * 数据计算结果后存储
+ *
+ * @author Administrator
+ */
+public class ExternalOutPutBolt extends BaseRichBolt {
+
+ private static final long serialVersionUID = -7466372895556294634L;
+ private static final Logger logger = Logger.getLogger(ExternalOutPutBolt.class);
+ private OutputCollector collector;
+ private ResultToKafka resultToKafka;
+
+ @Override
+ public void declareOutputFields(OutputFieldsDeclarer declarer) {
+
+ }
+
+ @Override
+ public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
+ this.collector = collector;
+ resultToKafka = ResultToKafka.getInstance();
+ }
+
+ @Override
+ public void execute(Tuple tuple) {
+ try {
+ if (!TupleUtils.isTick(tuple)) {
+ Rankings rankings = (Rankings) tuple.getValue(0);
+ if (StringUtil.isEmpty(rankings.getRankings())) {
+ logger.warn("rankings is null, don't insert...");
+ } else {
+ resultToKafka.sendMessage(parserRankingToObject(rankings));
+// test(rankings);
+ }
+
+ collector.ack(tuple);
+ }
+
+ } catch (Exception e) {
+ logger.error("batch insert error:" + e);
+ collector.fail(tuple);
+
+ }
+
+ }
+
+ /**
+ * 将排名对象解析为实际业务类型热门网站
+ *
+ * @param rankings
+ * @return
+ */
+ private LinkedList<String> parserRankingToObject(Rankings rankings) {
+ LinkedList<String> activeIpList = new LinkedList<>();
+ long timesTamp = UniqueCollection.getTimesTamp();
+ ExternalHost externalHost = new ExternalHost();
+ for (Rankable rankable : rankings.getRankings()) {
+ try {
+ List<Object> obj = rankable.getFields();
+ String key = rankable.getObject().toString();
+ externalHost.setDestination(key);
+ externalHost.setProtocol("");
+ externalHost.setSession_num(rankable.getCount());
+ externalHost.setC2s_pkt_num((Long) obj.get(0));
+ externalHost.setS2c_pkt_num((Long) obj.get(1));
+ externalHost.setC2s_byte_num((Long) obj.get(2));
+ externalHost.setS2c_byte_num((Long) obj.get(3));
+ externalHost.setStat_time(timesTamp);
+ activeIpList.add(JSONObject.toJSONString(externalHost));
+ } catch (Exception e) {
+ logger.error("外部主机统计解析数据异常:" + rankable.toString(), e);
+ }
+ }
+ return activeIpList;
+ }
+
+
+ /**
+ * 将排名对象解析为实际业务类型热门网站
+ *
+ * @param rankings
+ * @return
+ */
+ private static void test(Rankings rankings) {
+ long timesTamp = UniqueCollection.getTimesTamp();
+ ExternalHost externalHost = new ExternalHost();
+ int i = 1;
+ for (Rankable rankable : rankings.getRankings()) {
+ try {
+ List<Object> obj = rankable.getFields();
+ String key = rankable.getObject().toString();
+ externalHost.setDestination(key);
+ externalHost.setProtocol("");
+ externalHost.setSession_num(rankable.getCount());
+ externalHost.setC2s_pkt_num((Long) obj.get(0));
+ externalHost.setS2c_pkt_num((Long) obj.get(1));
+ externalHost.setC2s_byte_num((Long) obj.get(2));
+ externalHost.setS2c_byte_num((Long) obj.get(3));
+ externalHost.setStat_time(timesTamp);
+// logger.error(JSONObject.toJSONString(externalHost) + "\t\t名次:" + i);
+ i++;
+ } catch (Exception e) {
+ logger.error("外部主机统计解析数据异常:" + rankable.toString(), e);
+ }
+ }
+ }
+}
diff --git a/src/main/java/cn/ac/iie/bolt/resultoutput/InternalOutPutBolt.java b/src/main/java/cn/ac/iie/bolt/resultoutput/InternalOutPutBolt.java
new file mode 100644
index 0000000..63685a2
--- /dev/null
+++ b/src/main/java/cn/ac/iie/bolt/resultoutput/InternalOutPutBolt.java
@@ -0,0 +1,97 @@
+package cn.ac.iie.bolt.resultoutput;
+
+import cn.ac.iie.model.InternalHost;
+import cn.ac.iie.utils.*;
+import cn.ac.iie.utils.kafka.ResultToKafka;
+import cn.ac.iie.utils.system.TupleUtils;
+import cn.ac.iie.utils.unique.UniqueCollection;
+import com.alibaba.fastjson.JSONObject;
+import com.zdjizhi.utils.StringUtil;
+import org.apache.log4j.Logger;
+import org.apache.storm.task.OutputCollector;
+import org.apache.storm.task.TopologyContext;
+import org.apache.storm.topology.OutputFieldsDeclarer;
+import org.apache.storm.topology.base.BaseRichBolt;
+import org.apache.storm.tuple.Tuple;
+
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+
+
+/**
+ * 数据计算结果后存储
+ *
+ * @author Administrator
+ */
+public class InternalOutPutBolt extends BaseRichBolt {
+
+ private static final long serialVersionUID = -7466372895556294634L;
+ private static final Logger logger = Logger.getLogger(InternalOutPutBolt.class);
+ private OutputCollector collector;
+ private ResultToKafka resultToKafka;
+
+ @Override
+ public void declareOutputFields(OutputFieldsDeclarer declarer) {
+
+ }
+
+ @Override
+ public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
+ this.collector = collector;
+ resultToKafka = ResultToKafka.getInstance();
+ }
+
+ @Override
+ public void execute(Tuple tuple) {
+ try {
+ if (!TupleUtils.isTick(tuple)) {
+ Rankings rankings = (Rankings) tuple.getValue(0);
+ if (StringUtil.isEmpty(rankings.getRankings())) {
+ logger.warn("rankings is null, don't insert...");
+
+ } else {
+ resultToKafka.sendMessage(parserRankingToObject(rankings));
+ }
+
+ collector.ack(tuple);
+ }
+
+ } catch (Exception e) {
+ logger.error("batch insert error:" + e);
+ collector.fail(tuple);
+
+ }
+
+ }
+
+ /**
+ * 将排名对象解析为实际业务类型热门网站
+ *
+ * @param rankings
+ * @return
+ */
+ private LinkedList<String> parserRankingToObject(Rankings rankings) {
+ LinkedList<String> activeIpList = new LinkedList<>();
+ long timesTamp = UniqueCollection.getTimesTamp();
+ InternalHost internalHost = new InternalHost();
+ for (Rankable rankable : rankings.getRankings()) {
+ try {
+ List<Object> obj = rankable.getFields();
+ internalHost.setSource(rankable.getObject().toString());
+ internalHost.setSession_num(rankable.getCount());
+ internalHost.setC2s_pkt_num((Long) obj.get(0));
+ internalHost.setS2c_pkt_num((Long) obj.get(1));
+ internalHost.setC2s_byte_num((Long) obj.get(2));
+ internalHost.setS2c_byte_num((Long) obj.get(3));
+ internalHost.setStat_time(timesTamp);
+ activeIpList.add(JSONObject.toJSONString(internalHost));
+ } catch (Exception e) {
+ logger.error("内部主机统计解析数据异常:" + rankable.toString(), e);
+ }
+
+ }
+ return activeIpList;
+ }
+
+}
diff --git a/src/main/java/cn/ac/iie/bolt/resultoutput/UrlsOutPutBolt.java b/src/main/java/cn/ac/iie/bolt/resultoutput/UrlsOutPutBolt.java
new file mode 100644
index 0000000..1086d5a
--- /dev/null
+++ b/src/main/java/cn/ac/iie/bolt/resultoutput/UrlsOutPutBolt.java
@@ -0,0 +1,91 @@
+package cn.ac.iie.bolt.resultoutput;
+
+import cn.ac.iie.model.Urls;
+import cn.ac.iie.utils.*;
+import cn.ac.iie.utils.kafka.ResultToKafka;
+import cn.ac.iie.utils.unique.UniqueCollection;
+import com.alibaba.fastjson.JSONObject;
+import com.zdjizhi.utils.StringUtil;
+import org.apache.log4j.Logger;
+import org.apache.storm.task.OutputCollector;
+import org.apache.storm.task.TopologyContext;
+import org.apache.storm.topology.OutputFieldsDeclarer;
+import org.apache.storm.topology.base.BaseRichBolt;
+import org.apache.storm.tuple.Tuple;
+
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+
+
+/**
+ * 数据计算结果后存储
+ *
+ * @author Administrator
+ */
+public class UrlsOutPutBolt extends BaseRichBolt {
+
+ private static final long serialVersionUID = -7466372895556294634L;
+ private static final Logger logger = Logger.getLogger(UrlsOutPutBolt.class);
+ private OutputCollector collector;
+ private ResultToKafka resultToKafka;
+
+ @Override
+ public void declareOutputFields(OutputFieldsDeclarer declarer) {
+
+ }
+
+ @Override
+ public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
+ this.collector = collector;
+ resultToKafka = ResultToKafka.getInstance();
+ }
+
+ @Override
+ public void execute(Tuple tuple) {
+ try {
+ Rankings rankings = (Rankings) tuple.getValue(0);
+ if (StringUtil.isNotEmpty(rankings.getRankings())) {
+ resultToKafka.sendMessage(parserRankingToObject(rankings));
+ }
+ collector.ack(tuple);
+ } catch (Exception e) {
+ logger.error("batch insert error:", e);
+ collector.fail(tuple);
+ }
+
+ }
+
+ /**
+ * 将排名对象解析为实际业务类型热门网站
+ *
+ * @param rankings
+ * @return
+ */
+ private LinkedList<String> parserRankingToObject(Rankings rankings) {
+ LinkedList<String> activeIpList = new LinkedList<>();
+ long timesTamp = UniqueCollection.getTimesTamp();
+ Urls urls = new Urls();
+ for (Rankable rankable : rankings.getRankings()) {
+ try {
+ List<Object> obj = rankable.getFields();
+ String url = rankable.getObject().toString();
+ urls.setUrl(url);
+ urls.setClient_unq_num((Long) obj.get(4));
+ urls.setServer_unq_num((Long) obj.get(5));
+ urls.setSession_num(rankable.getCount());
+ urls.setC2s_pkt_num((Long) obj.get(0));
+ urls.setS2c_pkt_num((Long) obj.get(1));
+ urls.setC2s_byte_num((Long) obj.get(2));
+ urls.setS2c_byte_num((Long) obj.get(3));
+ urls.setStat_time(timesTamp);
+ activeIpList.add(JSONObject.toJSONString(urls));
+ } catch (Exception e) {
+ logger.error("http/https urls统计数据异常:" + rankable.toString(), e);
+ }
+
+ }
+ return activeIpList;
+ }
+
+}
diff --git a/src/main/java/cn/ac/iie/bolt/resultoutput/UserOutPutBolt.java b/src/main/java/cn/ac/iie/bolt/resultoutput/UserOutPutBolt.java
new file mode 100644
index 0000000..c11d13a
--- /dev/null
+++ b/src/main/java/cn/ac/iie/bolt/resultoutput/UserOutPutBolt.java
@@ -0,0 +1,97 @@
+package cn.ac.iie.bolt.resultoutput;
+
+import cn.ac.iie.model.User;
+import cn.ac.iie.utils.*;
+import cn.ac.iie.utils.kafka.ResultToKafka;
+import cn.ac.iie.utils.system.TupleUtils;
+import cn.ac.iie.utils.unique.UniqueCollection;
+import com.alibaba.fastjson.JSONObject;
+import com.zdjizhi.utils.StringUtil;
+import org.apache.log4j.Logger;
+import org.apache.storm.task.OutputCollector;
+import org.apache.storm.task.TopologyContext;
+import org.apache.storm.topology.OutputFieldsDeclarer;
+import org.apache.storm.topology.base.BaseRichBolt;
+import org.apache.storm.tuple.Tuple;
+
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+
+
+/**
+ * 数据计算结果后存储
+ *
+ * @author Administrator
+ */
+public class UserOutPutBolt extends BaseRichBolt {
+
+ private static final long serialVersionUID = -7466372895556294634L;
+ private static final Logger logger = Logger.getLogger(UserOutPutBolt.class);
+ private OutputCollector collector;
+ private ResultToKafka resultToKafka;
+
+ @Override
+ public void declareOutputFields(OutputFieldsDeclarer declarer) {
+
+ }
+
+ @Override
+ public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
+ this.collector = collector;
+ resultToKafka = ResultToKafka.getInstance();
+ }
+
+ @Override
+ public void execute(Tuple tuple) {
+ try {
+ if (!TupleUtils.isTick(tuple)) {
+ Rankings rankings = (Rankings) tuple.getValue(0);
+ if (StringUtil.isEmpty(rankings.getRankings())) {
+ logger.warn("rankings is null, don't insert...");
+
+ } else {
+ resultToKafka.sendMessage(parserRankingToObject(rankings));
+ }
+
+ collector.ack(tuple);
+ }
+
+ } catch (Exception e) {
+ logger.error("batch insert error:" + e);
+ collector.fail(tuple);
+
+ }
+
+ }
+
+ /**
+ * 将排名对象解析为实际业务类型热门网站
+ *
+ * @param rankings
+ * @return
+ */
+ private LinkedList<String> parserRankingToObject(Rankings rankings) {
+ LinkedList<String> activeIpList = new LinkedList<>();
+ long timesTamp = UniqueCollection.getTimesTamp();
+ User user = new User();
+ for (Rankable rankable : rankings.getRankings()) {
+ try {
+ List<Object> obj = rankable.getFields();
+ user.setSubscribe_id(rankable.getObject().toString());
+ user.setSession_num((Long) obj.get(4));
+ user.setC2s_pkt_num((Long) obj.get(0));
+ user.setS2c_pkt_num((Long) obj.get(1));
+ user.setC2s_byte_num((Long) obj.get(2));
+ user.setS2c_byte_num((Long) obj.get(3));
+ user.setStat_time(timesTamp);
+ activeIpList.add(JSONObject.toJSONString(user));
+ } catch (Exception e) {
+ logger.error("活跃用户统计解析数据异常:" + rankable.toString(), e);
+ }
+
+ }
+ return activeIpList;
+ }
+
+}
diff --git a/src/main/java/cn/ac/iie/bolt/resultoutput/WebsiteOutPutBolt.java b/src/main/java/cn/ac/iie/bolt/resultoutput/WebsiteOutPutBolt.java
new file mode 100644
index 0000000..c3cefd2
--- /dev/null
+++ b/src/main/java/cn/ac/iie/bolt/resultoutput/WebsiteOutPutBolt.java
@@ -0,0 +1,100 @@
+package cn.ac.iie.bolt.resultoutput;
+
+import cn.ac.iie.model.WebSite;
+import cn.ac.iie.utils.*;
+import cn.ac.iie.utils.kafka.ResultToKafka;
+import cn.ac.iie.utils.system.TupleUtils;
+import cn.ac.iie.utils.unique.UniqueCollection;
+import com.alibaba.fastjson.JSONObject;
+import com.zdjizhi.utils.StringUtil;
+import org.apache.log4j.Logger;
+import org.apache.storm.task.OutputCollector;
+import org.apache.storm.task.TopologyContext;
+import org.apache.storm.topology.OutputFieldsDeclarer;
+import org.apache.storm.topology.base.BaseRichBolt;
+import org.apache.storm.tuple.Tuple;
+
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+
+
+/**
+ * 数据计算结果后存储
+ *
+ * @author Administrator
+ */
+public class WebsiteOutPutBolt extends BaseRichBolt {
+
+ private static final long serialVersionUID = -7466372895556294634L;
+ private static final Logger logger = Logger.getLogger(WebsiteOutPutBolt.class);
+ private OutputCollector collector;
+ private ResultToKafka resultToKafka;
+
+ @Override
+ public void declareOutputFields(OutputFieldsDeclarer declarer) {
+
+ }
+
+ @Override
+ public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
+ this.collector = collector;
+ resultToKafka = ResultToKafka.getInstance();
+ }
+
+ @Override
+ public void execute(Tuple tuple) {
+ try {
+ if (!TupleUtils.isTick(tuple)) {
+ Rankings rankings = (Rankings) tuple.getValue(0);
+ if (StringUtil.isEmpty(rankings.getRankings())) {
+ logger.warn("rankings is null, don't insert...");
+
+ } else {
+ resultToKafka.sendMessage(parserRankingToObject(rankings));
+ }
+
+ collector.ack(tuple);
+ }
+
+ } catch (Exception e) {
+ logger.error("batch insert error:" + e);
+ collector.fail(tuple);
+
+ }
+
+ }
+
+ /**
+ * 将排名对象解析为实际业务类型热门网站
+ *
+ * @param rankings
+ * @return
+ */
+ private LinkedList<String> parserRankingToObject(Rankings rankings) {
+ LinkedList<String> activeIpList = new LinkedList<>();
+ long timesTamp = UniqueCollection.getTimesTamp();
+ WebSite webSite = new WebSite();
+ for (Rankable rankable : rankings.getRankings()) {
+ try {
+ List<Object> obj = rankable.getFields();
+ String domain = rankable.getObject().toString();
+ webSite.setDomain(domain);
+ webSite.setClient_unq_num((Long) obj.get(4));
+ webSite.setServer_unq_num((Long) obj.get(5));
+ webSite.setSession_num((Long) obj.get(6));
+ webSite.setC2s_pkt_num((Long) obj.get(0));
+ webSite.setS2c_pkt_num((Long) obj.get(1));
+ webSite.setC2s_byte_num((Long) obj.get(2));
+ webSite.setS2c_byte_num((Long) obj.get(3));
+ webSite.setStat_time(timesTamp);
+ activeIpList.add(JSONObject.toJSONString(webSite));
+ } catch (Exception e) {
+ logger.error("网站域名统计解析数据异常:" + rankable.toString(), e);
+ }
+
+ }
+ return activeIpList;
+ }
+
+}
diff --git a/src/main/java/cn/ac/iie/bolt/scatteredcalculate/ExternalCountBolt.java b/src/main/java/cn/ac/iie/bolt/scatteredcalculate/ExternalCountBolt.java
new file mode 100644
index 0000000..d7eefb3
--- /dev/null
+++ b/src/main/java/cn/ac/iie/bolt/scatteredcalculate/ExternalCountBolt.java
@@ -0,0 +1,157 @@
+package cn.ac.iie.bolt.scatteredcalculate;
+
+import cn.ac.iie.utils.NthLastModifiedTimeTracker;
+import cn.ac.iie.utils.SlidingWindowCounter;
+import cn.ac.iie.utils.system.TupleUtils;
+import com.zdjizhi.utils.StringUtil;
+import org.apache.log4j.Logger;
+import org.apache.storm.Config;
+import org.apache.storm.task.OutputCollector;
+import org.apache.storm.task.TopologyContext;
+import org.apache.storm.topology.OutputFieldsDeclarer;
+import org.apache.storm.topology.base.BaseRichBolt;
+import org.apache.storm.tuple.Fields;
+import org.apache.storm.tuple.Tuple;
+import org.apache.storm.tuple.Values;
+
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * @author qidaijie
+ */
+public class ExternalCountBolt extends BaseRichBolt {
+
+ private static final long serialVersionUID = 7748967375246167982L;
+
+ private static final Logger logger = Logger.getLogger(ExternalCountBolt.class);
+ private static final int NUM_WINDOW_CHUNKS = 5;
+
+ /**
+ * 默认统计5分钟内的排名数据
+ */
+ private static final int DEFAULT_SILDING_WINDOW_IN_SECONDS = NUM_WINDOW_CHUNKS * 60;
+ /**
+ * 默认自动提交频率为1分钟
+ */
+ private static final int DEFAULT_EMIT_FREQUENCY_IN_SECONDS = DEFAULT_SILDING_WINDOW_IN_SECONDS / NUM_WINDOW_CHUNKS;
+
+ private static final String WINDOW_LENGTH_WARNING_TEMPLATE =
+ "Actual window length is %d seconds when it should be %d seconds"
+ + " (you can safely ignore this warning during the startup phase)";
+
+ private final SlidingWindowCounter<Object> counter;
+
+ private OutputCollector outputCollector;
+ private final int windowLengthInSeconds;
+ private final int emitFrequencyInSeconds;
+ private NthLastModifiedTimeTracker lastModifiedTracker;
+
+
+ public ExternalCountBolt() {
+ this(DEFAULT_SILDING_WINDOW_IN_SECONDS, DEFAULT_EMIT_FREQUENCY_IN_SECONDS);
+ }
+
+ /**
+ * @param windowLengthInSeconds
+ * @param emitFrequencyInSeconds 提交频率,用于缓存,提交下一个bolt或存入数据库;
+ */
+ public ExternalCountBolt(int windowLengthInSeconds, int emitFrequencyInSeconds) {
+ this.windowLengthInSeconds = windowLengthInSeconds;
+ this.emitFrequencyInSeconds = emitFrequencyInSeconds;
+ counter = new SlidingWindowCounter<Object>(deriveNumWindowChunksFrom(this.windowLengthInSeconds, this.emitFrequencyInSeconds));
+ }
+
+ private int deriveNumWindowChunksFrom(int windowLengthInSeconds, int windowUpdateFrequencyInSeconds) {
+ return windowLengthInSeconds / windowUpdateFrequencyInSeconds;
+ }
+
+ @Override
+ public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
+ outputCollector = collector;
+ lastModifiedTracker = new NthLastModifiedTimeTracker(deriveNumWindowChunksFrom(this.windowLengthInSeconds, this.emitFrequencyInSeconds));
+ }
+
+
+ @Override
+ public void execute(Tuple tuple) {
+ if (TupleUtils.isTick(tuple)) {
+ // now you can trigger e.g. a periodic activity
+ logger.debug("Received tick tuple, triggering emit of current window counts");
+ emitCurrentWindowCounts();
+ } else {
+ // do something with the normal tuple
+ countObjAndAck(tuple);
+ }
+ }
+
+ private void emitCurrentWindowCounts() {
+ Map<Object, Long[]> objFiledMap = counter.getCountsThenAdvanceWindow();
+ int actualWindowLengthInSeconds = lastModifiedTracker.secondsSinceOldestModification();
+ lastModifiedTracker.markAsModified();
+ if (actualWindowLengthInSeconds != windowLengthInSeconds) {
+ logger.warn(String.format(WINDOW_LENGTH_WARNING_TEMPLATE, actualWindowLengthInSeconds, windowLengthInSeconds));
+ }
+ emit(objFiledMap, actualWindowLengthInSeconds);
+
+ }
+
+ private void emit(Map<Object, Long[]> objCountMap, int actualWindowLengthInSeconds) {
+
+
+ for (Map.Entry<Object, Long[]> entry : objCountMap.entrySet()) {
+ Object obj = entry.getKey();
+ Long[] arr = entry.getValue();
+ long count = arr[0];
+ long cpn = arr[1];
+ long spn = arr[2];
+ long cbn = arr[3];
+ long sbn = arr[4];
+ outputCollector.emit(new Values(obj, count, cpn, spn, cbn, sbn, actualWindowLengthInSeconds));
+ }
+ }
+
+ private void countObjAndAck(Tuple tuple) {
+
+ try {
+ Object obj = tuple.getValue(0);
+ long count = 1;
+ long cpn = 0;
+ long spn = 0;
+ long cbn = 0;
+ long sbn = 0;
+ if (StringUtil.isNotBlank(tuple.getString(1))) {
+ cpn = Long.parseLong(tuple.getString(1));
+ }
+ if (StringUtil.isNotBlank(tuple.getString(2))) {
+ spn = Long.parseLong(tuple.getString(2));
+ }
+ if (StringUtil.isNotBlank(tuple.getString(3))) {
+ cbn = Long.parseLong(tuple.getString(3));
+ }
+ if (StringUtil.isNotBlank(tuple.getString(4))) {
+ sbn = Long.parseLong(tuple.getString(4));
+ }
+
+ counter.incrementCount(obj, count, cpn, spn, cbn, sbn);
+ outputCollector.ack(tuple);
+ } catch (Exception e) {
+ logger.error(tuple.toString() + e.toString());
+ e.printStackTrace();
+ }
+ }
+
+ @Override
+ public void declareOutputFields(OutputFieldsDeclarer declarer) {
+ declarer.declare(new Fields("obj", "count", "cpn", "spn", "cbn", "sbn", "actualWindowLengthInSeconds"));
+ }
+
+
+ @Override
+ public Map<String, Object> getComponentConfiguration() {
+ Map<String, Object> conf = new HashMap<String, Object>();
+ conf.put(Config.TOPOLOGY_TICK_TUPLE_FREQ_SECS, emitFrequencyInSeconds);
+ return conf;
+ }
+
+}
diff --git a/src/main/java/cn/ac/iie/bolt/scatteredcalculate/InternalCountBolt.java b/src/main/java/cn/ac/iie/bolt/scatteredcalculate/InternalCountBolt.java
new file mode 100644
index 0000000..f688710
--- /dev/null
+++ b/src/main/java/cn/ac/iie/bolt/scatteredcalculate/InternalCountBolt.java
@@ -0,0 +1,155 @@
+package cn.ac.iie.bolt.scatteredcalculate;
+
+import cn.ac.iie.utils.NthLastModifiedTimeTracker;
+import cn.ac.iie.utils.SlidingWindowCounter;
+import cn.ac.iie.utils.system.TupleUtils;
+import com.zdjizhi.utils.StringUtil;
+import org.apache.log4j.Logger;
+import org.apache.storm.Config;
+import org.apache.storm.task.OutputCollector;
+import org.apache.storm.task.TopologyContext;
+import org.apache.storm.topology.OutputFieldsDeclarer;
+import org.apache.storm.topology.base.BaseRichBolt;
+import org.apache.storm.tuple.Fields;
+import org.apache.storm.tuple.Tuple;
+import org.apache.storm.tuple.Values;
+
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * @author qidaijie
+ */
+public class InternalCountBolt extends BaseRichBolt {
+
+ private static final long serialVersionUID = 7748967375246167982L;
+
+ private static final Logger logger = Logger.getLogger(InternalCountBolt.class);
+ private static final int NUM_WINDOW_CHUNKS = 5;
+ /**
+ * 默认统计5分钟内的排名数据
+ */
+ private static final int DEFAULT_SILDING_WINDOW_IN_SECONDS = NUM_WINDOW_CHUNKS * 60;
+ /**
+ * 默认自动提交频率为1分钟
+ */
+ private static final int DEFAULT_EMIT_FREQUENCY_IN_SECONDS = DEFAULT_SILDING_WINDOW_IN_SECONDS / NUM_WINDOW_CHUNKS;
+
+ private static final String WINDOW_LENGTH_WARNING_TEMPLATE =
+ "Actual window length is %d seconds when it should be %d seconds"
+ + " (you can safely ignore this warning during the startup phase)";
+
+ private final SlidingWindowCounter<Object> counter;
+
+ private OutputCollector outputCollector;
+ private final int windowLengthInSeconds;
+ private final int emitFrequencyInSeconds;
+ private NthLastModifiedTimeTracker lastModifiedTracker;
+
+ public InternalCountBolt() {
+ this(DEFAULT_SILDING_WINDOW_IN_SECONDS, DEFAULT_EMIT_FREQUENCY_IN_SECONDS);
+ }
+
+ /**
+ * @param windowLengthInSeconds
+ * @param emitFrequencyInSeconds 提交频率,用于缓存,提交下一个bolt或存入数据库;
+ */
+ public InternalCountBolt(int windowLengthInSeconds, int emitFrequencyInSeconds) {
+ this.windowLengthInSeconds = windowLengthInSeconds;
+ this.emitFrequencyInSeconds = emitFrequencyInSeconds;
+ counter = new SlidingWindowCounter<Object>(deriveNumWindowChunksFrom(this.windowLengthInSeconds, this.emitFrequencyInSeconds));
+ }
+
+ private int deriveNumWindowChunksFrom(int windowLengthInSeconds, int windowUpdateFrequencyInSeconds) {
+ return windowLengthInSeconds / windowUpdateFrequencyInSeconds;
+ }
+
+ @Override
+ public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
+ outputCollector = collector;
+ lastModifiedTracker = new NthLastModifiedTimeTracker(deriveNumWindowChunksFrom(this.windowLengthInSeconds, this.emitFrequencyInSeconds));
+ }
+
+
+ @Override
+ public void execute(Tuple tuple) {
+ if (TupleUtils.isTick(tuple)) {
+ // now you can trigger e.g. a periodic activity
+ logger.debug("Received tick tuple, triggering emit of current window counts");
+ emitCurrentWindowCounts();
+ } else {
+ // do something with the normal tuple
+ countObjAndAck(tuple);
+ }
+ }
+
+ private void emitCurrentWindowCounts() {
+ Map<Object, Long[]> objfieldMap = counter.getCountsThenAdvanceWindow();
+ int actualWindowLengthInSeconds = lastModifiedTracker.secondsSinceOldestModification();
+ lastModifiedTracker.markAsModified();
+ if (actualWindowLengthInSeconds != windowLengthInSeconds) {
+ logger.warn(String.format(WINDOW_LENGTH_WARNING_TEMPLATE, actualWindowLengthInSeconds, windowLengthInSeconds));
+ }
+ emit(objfieldMap, actualWindowLengthInSeconds);
+
+ }
+
+ private void emit(Map<Object, Long[]> objCountMap, int actualWindowLengthInSeconds) {
+
+ for (Map.Entry<Object, Long[]> entry : objCountMap.entrySet()) {
+ Object obj = entry.getKey();
+ Long[] arr = entry.getValue();
+ Long count = arr[0];
+ Long cpn = arr[1];
+ Long spn = arr[2];
+ Long cbn = arr[3];
+ Long sbn = arr[4];
+ outputCollector.emit(new Values(obj, count, cpn, spn, cbn, sbn, actualWindowLengthInSeconds));
+ }
+ }
+
+ private void countObjAndAck(Tuple tuple) {
+
+ try {
+ Object obj = tuple.getValue(0);
+ long count = 1;
+ long cpn = 0;
+ long spn = 0;
+ long cbn = 0;
+ long sbn = 0;
+ if (StringUtil.isNotBlank(tuple.getString(1))) {
+ cpn = Long.parseLong(tuple.getString(1));
+ }
+ if (StringUtil.isNotBlank(tuple.getString(2))) {
+ spn = Long.parseLong(tuple.getString(2));
+ }
+ if (StringUtil.isNotBlank(tuple.getString(3))) {
+ cbn = Long.parseLong(tuple.getString(3));
+ }
+ if (StringUtil.isNotBlank(tuple.getString(4))) {
+ sbn = Long.parseLong(tuple.getString(4));
+ }
+
+
+ counter.incrementCount(obj, count, cpn, spn, cbn, sbn);
+ outputCollector.ack(tuple);
+ } catch (Exception e) {
+ logger.error(tuple.toString() + e.toString());
+ e.printStackTrace();
+ }
+ }
+
+ @Override
+ public void declareOutputFields(OutputFieldsDeclarer declarer) {
+ declarer.declare(new Fields("obj", "count", "cpn", "spn", "cbn", "sbn", "actualWindowLengthInSeconds"));
+ }
+
+
+ @Override
+ public Map<String, Object> getComponentConfiguration() {
+ Map<String, Object> conf = new HashMap<String, Object>();
+ conf.put(Config.TOPOLOGY_TICK_TUPLE_FREQ_SECS, emitFrequencyInSeconds);
+ return conf;
+ }
+
+}
diff --git a/src/main/java/cn/ac/iie/bolt/scatteredcalculate/UrlsCountBolt.java b/src/main/java/cn/ac/iie/bolt/scatteredcalculate/UrlsCountBolt.java
new file mode 100644
index 0000000..d9738c2
--- /dev/null
+++ b/src/main/java/cn/ac/iie/bolt/scatteredcalculate/UrlsCountBolt.java
@@ -0,0 +1,173 @@
+package cn.ac.iie.bolt.scatteredcalculate;
+
+import cn.ac.iie.common.TopNCountConfig;
+import cn.ac.iie.utils.NthLastModifiedTimeTracker;
+import cn.ac.iie.utils.unique.urldimension.UrlSlidingWindowCounter;
+import cn.ac.iie.utils.system.TupleUtils;
+import cn.ac.iie.utils.unique.UniqueCollection;
+import com.zdjizhi.utils.StringUtil;
+import org.apache.log4j.Logger;
+import org.apache.storm.Config;
+import org.apache.storm.task.OutputCollector;
+import org.apache.storm.task.TopologyContext;
+import org.apache.storm.topology.OutputFieldsDeclarer;
+import org.apache.storm.topology.base.BaseRichBolt;
+import org.apache.storm.tuple.Fields;
+import org.apache.storm.tuple.Tuple;
+import org.apache.storm.tuple.Values;
+
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * @author qidaijie
+ */
+public class UrlsCountBolt extends BaseRichBolt {
+
+ private static final long serialVersionUID = 7748967375246167982L;
+
+ private static final Logger logger = Logger.getLogger(UrlsCountBolt.class);
+ private static final int NUM_WINDOW_CHUNKS = 5;
+ private int max = 0;
+ /**
+ * 默认统计5分钟内的排名数据
+ */
+ private static final int DEFAULT_SILDING_WINDOW_IN_SECONDS = NUM_WINDOW_CHUNKS * 60;
+ /**
+ * 默认自动提交频率为1分钟
+ */
+ private static final int DEFAULT_EMIT_FREQUENCY_IN_SECONDS = DEFAULT_SILDING_WINDOW_IN_SECONDS / NUM_WINDOW_CHUNKS;
+
+ private static final String WINDOW_LENGTH_WARNING_TEMPLATE =
+ "Actual window length is %d seconds when it should be %d seconds"
+ + " (you can safely ignore this warning during the startup phase)";
+
+ private final UrlSlidingWindowCounter<Object> counter;
+
+ private OutputCollector outputCollector;
+ private final int windowLengthInSeconds;
+ private final int emitFrequencyInSeconds;
+ private NthLastModifiedTimeTracker lastModifiedTracker;
+
+ public UrlsCountBolt() {
+ this(DEFAULT_SILDING_WINDOW_IN_SECONDS, DEFAULT_EMIT_FREQUENCY_IN_SECONDS);
+ }
+
+ /**
+ * @param windowLengthInSeconds
+ * @param emitFrequencyInSeconds 提交频率,用于缓存,提交下一个bolt或存入数据库;
+ */
+ public UrlsCountBolt(int windowLengthInSeconds, int emitFrequencyInSeconds) {
+ this.windowLengthInSeconds = windowLengthInSeconds;
+ this.emitFrequencyInSeconds = emitFrequencyInSeconds;
+ counter = new UrlSlidingWindowCounter<Object>(deriveNumWindowChunksFrom(this.windowLengthInSeconds, this.emitFrequencyInSeconds));
+ }
+
+ private int deriveNumWindowChunksFrom(int windowLengthInSeconds, int windowUpdateFrequencyInSeconds) {
+ return windowLengthInSeconds / windowUpdateFrequencyInSeconds;
+ }
+
+ @Override
+ public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
+ outputCollector = collector;
+ lastModifiedTracker = new NthLastModifiedTimeTracker(deriveNumWindowChunksFrom(this.windowLengthInSeconds, this.emitFrequencyInSeconds));
+ }
+
+
+ @Override
+ public void execute(Tuple tuple) {
+ if (TupleUtils.isTick(tuple)) {
+ // now you can trigger e.g. a periodic activity
+ logger.debug("Received tick tuple, triggering emit of current window counts");
+ emitCurrentWindowCounts();
+ if (max == TopNCountConfig.TOPWINDOWLENGTH) {
+ UniqueCollection.clearWebContainer();
+ max = 0;
+ } else {
+ max += TopNCountConfig.TOPEMITFREQUENCYSECS;
+ }
+ } else {
+ // do something with the normal tuple
+ countObjAndAck(tuple);
+ }
+ }
+
+ private void emitCurrentWindowCounts() {
+ Map<Object, Long[]> objFiledMap = counter.getCountsThenAdvanceWindow();
+ int actualWindowLengthInSeconds = lastModifiedTracker.secondsSinceOldestModification();
+ lastModifiedTracker.markAsModified();
+ if (actualWindowLengthInSeconds != windowLengthInSeconds) {
+ logger.warn(String.format(WINDOW_LENGTH_WARNING_TEMPLATE, actualWindowLengthInSeconds, windowLengthInSeconds));
+ }
+ emit(objFiledMap, actualWindowLengthInSeconds);
+
+ }
+
+ private void emit(Map<Object, Long[]> objCountMap, int actualWindowLengthInSeconds) {
+
+ for (Map.Entry<Object, Long[]> entry : objCountMap.entrySet()) {
+ Object obj = entry.getKey();
+ Long[] arr = entry.getValue();
+ Long count = arr[0];
+ Long cpn = arr[1];
+ Long spn = arr[2];
+ Long cbn = arr[3];
+ Long sbn = arr[4];
+ Long cli = arr[5];
+ Long ser = arr[6];
+
+ outputCollector.emit(new Values(obj, count, cpn, spn, cbn, sbn, cli, ser, actualWindowLengthInSeconds));
+ }
+ }
+
+ private void countObjAndAck(Tuple tuple) {
+
+ try {
+ Object obj = tuple.getValue(0);
+ long count = 1;
+ long cpn = 0;
+ long spn = 0;
+ long cbn = 0;
+ long sbn = 0;
+
+ if (StringUtil.isNotBlank(tuple.getString(1))) {
+ cpn = Long.parseLong(tuple.getString(1));
+ }
+ if (StringUtil.isNotBlank(tuple.getString(2))) {
+ spn = Long.parseLong(tuple.getString(2));
+ }
+ if (StringUtil.isNotBlank(tuple.getString(3))) {
+ cbn = Long.parseLong(tuple.getString(3));
+ }
+ if (StringUtil.isNotBlank(tuple.getString(4))) {
+ sbn = Long.parseLong(tuple.getString(4));
+ }
+
+ UniqueCollection.setUrlCliUnique(tuple.getString(0), tuple.getString(5));
+ UniqueCollection.setUrlSerUnique(tuple.getString(0), tuple.getString(6));
+
+ long cli = UniqueCollection.getUrlCliUnique(String.valueOf(obj));
+ long ser = UniqueCollection.getUrlSerUnique(String.valueOf(obj));
+
+ counter.incrementCount(obj, count, cpn, spn, cbn, sbn,cli,ser);
+ outputCollector.ack(tuple);
+ } catch (Exception e) {
+ logger.error(tuple.toString() + e.toString());
+ e.printStackTrace();
+ }
+ }
+
+ @Override
+ public void declareOutputFields(OutputFieldsDeclarer declarer) {
+ declarer.declare(new Fields("obj", "count", "cpn", "spn", "cbn", "sbn", "cli", "ser", "actualWindowLengthInSeconds"));
+ }
+
+
+ @Override
+ public Map<String, Object> getComponentConfiguration() {
+ Map<String, Object> conf = new HashMap<String, Object>();
+ conf.put(Config.TOPOLOGY_TICK_TUPLE_FREQ_SECS, emitFrequencyInSeconds);
+ return conf;
+ }
+
+}
diff --git a/src/main/java/cn/ac/iie/bolt/scatteredcalculate/UserCountBolt.java b/src/main/java/cn/ac/iie/bolt/scatteredcalculate/UserCountBolt.java
new file mode 100644
index 0000000..ab6924f
--- /dev/null
+++ b/src/main/java/cn/ac/iie/bolt/scatteredcalculate/UserCountBolt.java
@@ -0,0 +1,160 @@
+package cn.ac.iie.bolt.scatteredcalculate;
+
+import cn.ac.iie.utils.NthLastModifiedTimeTracker;
+import cn.ac.iie.utils.SlidingWindowCounter;
+import cn.ac.iie.utils.system.TupleUtils;
+import cn.ac.iie.utils.unique.bytecount.BytesSlidingWindowCounter;
+import cn.ac.iie.utils.unique.bytecount.BytesSlotBasedCounter;
+import com.zdjizhi.utils.StringUtil;
+import org.apache.log4j.Logger;
+import org.apache.storm.Config;
+import org.apache.storm.task.OutputCollector;
+import org.apache.storm.task.TopologyContext;
+import org.apache.storm.topology.OutputFieldsDeclarer;
+import org.apache.storm.topology.base.BaseRichBolt;
+import org.apache.storm.tuple.Fields;
+import org.apache.storm.tuple.Tuple;
+import org.apache.storm.tuple.Values;
+
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * @author qidaijie
+ */
+public class UserCountBolt extends BaseRichBolt {
+
+ private static final long serialVersionUID = 7748967375246167982L;
+
+ private static final Logger logger = Logger.getLogger(UserCountBolt.class);
+ private static final int NUM_WINDOW_CHUNKS = 5;
+ /**
+ * 默认统计5分钟内的排名数据
+ */
+ private static final int DEFAULT_SILDING_WINDOW_IN_SECONDS = NUM_WINDOW_CHUNKS * 60;
+ /**
+ * 默认自动提交频率为1分钟
+ */
+ private static final int DEFAULT_EMIT_FREQUENCY_IN_SECONDS = DEFAULT_SILDING_WINDOW_IN_SECONDS / NUM_WINDOW_CHUNKS;
+
+ private static final String WINDOW_LENGTH_WARNING_TEMPLATE =
+ "Actual window length is %d seconds when it should be %d seconds"
+ + " (you can safely ignore this warning during the startup phase)";
+
+ private final BytesSlidingWindowCounter<Object> counter;
+
+ private OutputCollector outputCollector;
+ private final int windowLengthInSeconds;
+ private final int emitFrequencyInSeconds;
+ private NthLastModifiedTimeTracker lastModifiedTracker;
+
+ public UserCountBolt() {
+ this(DEFAULT_SILDING_WINDOW_IN_SECONDS, DEFAULT_EMIT_FREQUENCY_IN_SECONDS);
+ }
+
+ /**
+ * @param windowLengthInSeconds
+ * @param emitFrequencyInSeconds 提交频率,用于缓存,提交下一个bolt或存入数据库;
+ */
+ public UserCountBolt(int windowLengthInSeconds, int emitFrequencyInSeconds) {
+ this.windowLengthInSeconds = windowLengthInSeconds;
+ this.emitFrequencyInSeconds = emitFrequencyInSeconds;
+ counter = new BytesSlidingWindowCounter<Object>(deriveNumWindowChunksFrom(this.windowLengthInSeconds, this.emitFrequencyInSeconds));
+ }
+
+ private int deriveNumWindowChunksFrom(int windowLengthInSeconds, int windowUpdateFrequencyInSeconds) {
+ return windowLengthInSeconds / windowUpdateFrequencyInSeconds;
+ }
+
+ @Override
+ public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
+ outputCollector = collector;
+ lastModifiedTracker = new NthLastModifiedTimeTracker(deriveNumWindowChunksFrom(this.windowLengthInSeconds, this.emitFrequencyInSeconds));
+ }
+
+
+ @Override
+ public void execute(Tuple tuple) {
+ if (TupleUtils.isTick(tuple)) {
+ // now you can trigger e.g. a periodic activity
+ logger.debug("Received tick tuple, triggering emit of current window counts");
+ emitCurrentWindowCounts();
+ } else {
+ // do something with the normal tuple
+ countObjAndAck(tuple);
+ }
+ }
+
+ private void emitCurrentWindowCounts() {
+ Map<Object, Long[]> objfieldMap = counter.getCountsThenAdvanceWindow();
+ int actualWindowLengthInSeconds = lastModifiedTracker.secondsSinceOldestModification();
+ lastModifiedTracker.markAsModified();
+ if (actualWindowLengthInSeconds != windowLengthInSeconds) {
+ logger.warn(String.format(WINDOW_LENGTH_WARNING_TEMPLATE, actualWindowLengthInSeconds, windowLengthInSeconds));
+ }
+ emit(objfieldMap, actualWindowLengthInSeconds);
+
+ }
+
+ private void emit(Map<Object, Long[]> objCountMap, int actualWindowLengthInSeconds) {
+
+
+ for (Map.Entry<Object, Long[]> entry : objCountMap.entrySet()) {
+ Object obj = entry.getKey();
+ Long[] arr = entry.getValue();
+ Long count = arr[0];
+ Long cpn = arr[1];
+ Long spn = arr[2];
+ Long cbn = arr[3];
+ Long sbn = arr[4];
+ Long session = arr[5];
+ outputCollector.emit(new Values(obj, count, cpn, spn, cbn, sbn, session, actualWindowLengthInSeconds));
+ }
+ }
+
+ private void countObjAndAck(Tuple tuple) {
+
+ try {
+ Object obj = tuple.getValue(0);
+ long session = 1;
+ long cpn = 0;
+ long spn = 0;
+ long cbn = 0;
+ long sbn = 0;
+ if (StringUtil.isNotBlank(tuple.getString(1))) {
+ cpn = Long.parseLong(tuple.getString(1));
+ }
+ if (StringUtil.isNotBlank(tuple.getString(2))) {
+ spn = Long.parseLong(tuple.getString(2));
+ }
+ if (StringUtil.isNotBlank(tuple.getString(3))) {
+ cbn = Long.parseLong(tuple.getString(3));
+ }
+ if (StringUtil.isNotBlank(tuple.getString(4))) {
+ sbn = Long.parseLong(tuple.getString(4));
+ }
+
+ long count = cbn + sbn;
+
+ counter.incrementCount(obj, count, cpn, spn, cbn, sbn, session);
+ outputCollector.ack(tuple);
+ } catch (Exception e) {
+ logger.error(tuple.toString() + e.toString());
+ e.printStackTrace();
+ }
+ }
+
+ @Override
+ public void declareOutputFields(OutputFieldsDeclarer declarer) {
+ declarer.declare(new Fields("obj", "count", "cpn", "spn", "cbn", "sbn", "session", "actualWindowLengthInSeconds"));
+ }
+
+
+ @Override
+ public Map<String, Object> getComponentConfiguration() {
+ Map<String, Object> conf = new HashMap<String, Object>();
+ conf.put(Config.TOPOLOGY_TICK_TUPLE_FREQ_SECS, emitFrequencyInSeconds);
+ return conf;
+ }
+
+}
diff --git a/src/main/java/cn/ac/iie/bolt/scatteredcalculate/WebsiteCountBolt.java b/src/main/java/cn/ac/iie/bolt/scatteredcalculate/WebsiteCountBolt.java
new file mode 100644
index 0000000..f4f810a
--- /dev/null
+++ b/src/main/java/cn/ac/iie/bolt/scatteredcalculate/WebsiteCountBolt.java
@@ -0,0 +1,175 @@
+package cn.ac.iie.bolt.scatteredcalculate;
+
+import cn.ac.iie.common.TopNCountConfig;
+import cn.ac.iie.utils.NthLastModifiedTimeTracker;
+import cn.ac.iie.utils.unique.urldimension.UrlSlidingWindowCounter;
+import cn.ac.iie.utils.system.TupleUtils;
+import cn.ac.iie.utils.unique.UniqueCollection;
+import cn.ac.iie.utils.unique.webdimension.WebSlidingWindowCounter;
+import org.apache.log4j.Logger;
+import org.apache.storm.Config;
+import org.apache.storm.task.OutputCollector;
+import org.apache.storm.task.TopologyContext;
+import org.apache.storm.topology.OutputFieldsDeclarer;
+import org.apache.storm.topology.base.BaseRichBolt;
+import org.apache.storm.tuple.Fields;
+import org.apache.storm.tuple.Tuple;
+import org.apache.storm.tuple.Values;
+
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * @author qidaijie
+ */
+public class WebsiteCountBolt extends BaseRichBolt {
+
+ private static final long serialVersionUID = 7748967375246167982L;
+
+ private static final Logger logger = Logger.getLogger(WebsiteCountBolt.class);
+ private static final int NUM_WINDOW_CHUNKS = 5;
+ /**
+ * 默认统计5分钟内的排名数据
+ */
+ private static final int DEFAULT_SILDING_WINDOW_IN_SECONDS = NUM_WINDOW_CHUNKS * 60;
+ /**
+ * 默认自动提交频率为1分钟
+ */
+ private static final int DEFAULT_EMIT_FREQUENCY_IN_SECONDS = DEFAULT_SILDING_WINDOW_IN_SECONDS / NUM_WINDOW_CHUNKS;
+
+ private static final String WINDOW_LENGTH_WARNING_TEMPLATE =
+ "Actual window length is %d seconds when it should be %d seconds"
+ + " (you can safely ignore this warning during the startup phase)";
+
+ private final WebSlidingWindowCounter<Object> counter;
+
+ private OutputCollector outputCollector;
+ private final int windowLengthInSeconds;
+ private final int emitFrequencyInSeconds;
+ private NthLastModifiedTimeTracker lastModifiedTracker;
+ private int max;
+
+ public WebsiteCountBolt() {
+ this(DEFAULT_SILDING_WINDOW_IN_SECONDS, DEFAULT_EMIT_FREQUENCY_IN_SECONDS);
+ }
+
+ /**
+ * @param windowLengthInSeconds
+ * @param emitFrequencyInSeconds 提交频率,用于缓存,提交下一个bolt或存入数据库;
+ */
+ public WebsiteCountBolt(int windowLengthInSeconds, int emitFrequencyInSeconds) {
+ this.windowLengthInSeconds = windowLengthInSeconds;
+ this.emitFrequencyInSeconds = emitFrequencyInSeconds;
+ counter = new WebSlidingWindowCounter<Object>(deriveNumWindowChunksFrom(this.windowLengthInSeconds, this.emitFrequencyInSeconds));
+ }
+
+ private int deriveNumWindowChunksFrom(int windowLengthInSeconds, int windowUpdateFrequencyInSeconds) {
+ return windowLengthInSeconds / windowUpdateFrequencyInSeconds;
+ }
+
+ @Override
+ public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
+ outputCollector = collector;
+ lastModifiedTracker = new NthLastModifiedTimeTracker(deriveNumWindowChunksFrom(this.windowLengthInSeconds, this.emitFrequencyInSeconds));
+ }
+
+
+ @Override
+ public void execute(Tuple tuple) {
+ if (TupleUtils.isTick(tuple)) {
+ // now you can trigger e.g. a periodic activity
+ logger.debug("Received tick tuple, triggering emit of current window counts");
+ emitCurrentWindowCounts();
+ if (max == TopNCountConfig.TOPWINDOWLENGTH) {
+ UniqueCollection.clearWebContainer();
+ max = 0;
+ } else {
+ max += TopNCountConfig.TOPEMITFREQUENCYSECS;
+ }
+ } else {
+ // do something with the normal tuple
+ countObjAndAck(tuple);
+ }
+ }
+
+ private void emitCurrentWindowCounts() {
+ Map<Object, Long[]> objFiledMap = counter.getCountsThenAdvanceWindow();
+ int actualWindowLengthInSeconds = lastModifiedTracker.secondsSinceOldestModification();
+ lastModifiedTracker.markAsModified();
+ if (actualWindowLengthInSeconds != windowLengthInSeconds) {
+ logger.warn(String.format(WINDOW_LENGTH_WARNING_TEMPLATE, actualWindowLengthInSeconds, windowLengthInSeconds));
+ }
+ emit(objFiledMap, actualWindowLengthInSeconds);
+
+ }
+
+ private void emit(Map<Object, Long[]> objCountMap, int actualWindowLengthInSeconds) {
+
+
+ for (Map.Entry<Object, Long[]> entry : objCountMap.entrySet()) {
+ Object obj = entry.getKey();
+ Long[] arr = entry.getValue();
+ Long count = arr[0];
+ Long cpn = arr[1];
+ Long spn = arr[2];
+ Long cbn = arr[3];
+ Long sbn = arr[4];
+ Long cli = arr[5];
+ Long ser = arr[6];
+ Long session = arr[7];
+
+ outputCollector.emit(new Values(obj, count, cpn, spn, cbn, sbn, cli, ser, session, actualWindowLengthInSeconds));
+ }
+ }
+
+ private void countObjAndAck(Tuple tuple) {
+
+ try {
+ Object obj = tuple.getValue(0);
+ long session = 1;
+ long cpn = 0;
+ long spn = 0;
+ long cbn = 0;
+ long sbn = 0;
+ if (tuple.getValue(1) != null && tuple.getValue(1) != "") {
+ cpn = Long.parseLong(tuple.getValue(1).toString());
+ }
+ if (tuple.getValue(2) != null && tuple.getValue(2) != "") {
+ spn = Long.parseLong(tuple.getValue(2).toString());
+ }
+ if (tuple.getValue(3) != null && tuple.getValue(3) != "") {
+ cbn = Long.parseLong(tuple.getValue(3).toString());
+ }
+ if (tuple.getValue(4) != null && tuple.getValue(4) != "") {
+ sbn = Long.parseLong(tuple.getValue(4).toString());
+ }
+ long count = cbn + sbn;
+
+ UniqueCollection.setUrlCliUnique(tuple.getString(0), tuple.getString(5));
+ UniqueCollection.setUrlSerUnique(tuple.getString(0), tuple.getString(6));
+
+ long cli = UniqueCollection.getUrlCliUnique(String.valueOf(obj));
+ long ser = UniqueCollection.getUrlSerUnique(String.valueOf(obj));
+
+ counter.incrementCount(obj, count, cpn, spn, cbn, sbn, cli, ser, session);
+ outputCollector.ack(tuple);
+ } catch (Exception e) {
+ logger.error(tuple.toString() + e.toString());
+ e.printStackTrace();
+ }
+ }
+
+ @Override
+ public void declareOutputFields(OutputFieldsDeclarer declarer) {
+ declarer.declare(new Fields("obj", "count", "cpn", "spn", "cbn", "sbn", "cli", "ser", "session", "actualWindowLengthInSeconds"));
+ }
+
+
+ @Override
+ public Map<String, Object> getComponentConfiguration() {
+ Map<String, Object> conf = new HashMap<String, Object>();
+ conf.put(Config.TOPOLOGY_TICK_TUPLE_FREQ_SECS, emitFrequencyInSeconds);
+ return conf;
+ }
+
+}
diff --git a/src/main/java/cn/ac/iie/bolt/segmentation/ExternalBolt.java b/src/main/java/cn/ac/iie/bolt/segmentation/ExternalBolt.java
new file mode 100644
index 0000000..d35db8a
--- /dev/null
+++ b/src/main/java/cn/ac/iie/bolt/segmentation/ExternalBolt.java
@@ -0,0 +1,49 @@
+package cn.ac.iie.bolt.segmentation;
+
+import cn.ac.iie.utils.unique.UniqueCollection;
+import com.alibaba.fastjson.JSON;
+import com.alibaba.fastjson.JSONObject;
+import com.zdjizhi.utils.StringUtil;
+import org.apache.log4j.Logger;
+import org.apache.storm.topology.BasicOutputCollector;
+import org.apache.storm.topology.OutputFieldsDeclarer;
+import org.apache.storm.topology.base.BaseBasicBolt;
+import org.apache.storm.tuple.Fields;
+import org.apache.storm.tuple.Tuple;
+import org.apache.storm.tuple.Values;
+
+/**
+ * 切分原始日志获取原始日志中 外部主机
+ *
+ * @author qidaijie
+ */
+public class ExternalBolt extends BaseBasicBolt {
+
+ private static final Logger logger = Logger.getLogger(ExternalBolt.class);
+ private static final long serialVersionUID = -1599076545635998972L;
+
+ @Override
+ public void execute(Tuple input, BasicOutputCollector collector) {
+ try {
+ String message = input.getString(0);
+ if (StringUtil.isNotBlank(message)) {
+ JSONObject jsonObject = JSON.parseObject(message);
+ String key = jsonObject.getString("server_ip");
+ if (StringUtil.isNotBlank(key)) {
+ collector.emit(new Values(key, jsonObject.getString("c2s_pkt_num"), jsonObject.getString("s2c_pkt_num"),
+ jsonObject.getString("c2s_byte_num"), jsonObject.getString("s2c_byte_num")));
+ }
+ }
+ } catch (Exception e) {
+ logger.error("拆分原始日志出现异常:", e);
+ }
+ }
+
+ @Override
+ public void declareOutputFields(OutputFieldsDeclarer declarer) {
+ declarer.declare(new Fields("key", "cpn", "spn", "cbn", "sbn"));
+
+ }
+
+
+}
diff --git a/src/main/java/cn/ac/iie/bolt/segmentation/InternalBolt.java b/src/main/java/cn/ac/iie/bolt/segmentation/InternalBolt.java
new file mode 100644
index 0000000..09f5f34
--- /dev/null
+++ b/src/main/java/cn/ac/iie/bolt/segmentation/InternalBolt.java
@@ -0,0 +1,50 @@
+package cn.ac.iie.bolt.segmentation;
+
+import cn.ac.iie.utils.TopUtils;
+import com.alibaba.fastjson.JSON;
+import com.alibaba.fastjson.JSONObject;
+import com.zdjizhi.utils.StringUtil;
+import org.apache.log4j.Logger;
+import org.apache.storm.topology.BasicOutputCollector;
+import org.apache.storm.topology.OutputFieldsDeclarer;
+import org.apache.storm.topology.base.BaseBasicBolt;
+import org.apache.storm.tuple.Fields;
+import org.apache.storm.tuple.Tuple;
+import org.apache.storm.tuple.Values;
+
+/**
+ * 切分原始日志 获取内部主机
+ *
+ * @author qidaijie
+ */
+public class InternalBolt extends BaseBasicBolt {
+
+ private static final Logger logger = Logger.getLogger(InternalBolt.class);
+ private static final long serialVersionUID = -1599076545635998972L;
+
+ @Override
+ public void execute(Tuple input, BasicOutputCollector collector) {
+ try {
+ String message = input.getString(0);
+ if (StringUtil.isNotBlank(message)) {
+ JSONObject jsonObject = JSON.parseObject(message);
+ String key = jsonObject.getString("client_ip");
+ if (StringUtil.isNotBlank(key) && TopUtils.isKazakhstan(key)) {
+ collector.emit(new Values(key, jsonObject.getString("c2s_pkt_num"), jsonObject.getString("s2c_pkt_num"),
+ jsonObject.getString("c2s_byte_num"), jsonObject.getString("s2c_byte_num")));
+ }
+
+ }
+ } catch (Exception e) {
+ logger.error("拆分原始日志出现异常:", e);
+ }
+ }
+
+ @Override
+ public void declareOutputFields(OutputFieldsDeclarer declarer) {
+ declarer.declare(new Fields("key", "cpn", "spn", "cbn", "sbn"));
+
+ }
+
+
+}
diff --git a/src/main/java/cn/ac/iie/bolt/segmentation/UrlsBolt.java b/src/main/java/cn/ac/iie/bolt/segmentation/UrlsBolt.java
new file mode 100644
index 0000000..b6ae0c4
--- /dev/null
+++ b/src/main/java/cn/ac/iie/bolt/segmentation/UrlsBolt.java
@@ -0,0 +1,50 @@
+package cn.ac.iie.bolt.segmentation;
+
+import com.alibaba.fastjson.JSON;
+import com.alibaba.fastjson.JSONObject;
+import com.zdjizhi.utils.StringUtil;
+import org.apache.log4j.Logger;
+import org.apache.storm.topology.BasicOutputCollector;
+import org.apache.storm.topology.OutputFieldsDeclarer;
+import org.apache.storm.topology.base.BaseBasicBolt;
+import org.apache.storm.tuple.Fields;
+import org.apache.storm.tuple.Tuple;
+import org.apache.storm.tuple.Values;
+
+
+/**
+ * 切分原始日志
+ *
+ * @author qidaijie
+ */
+public class UrlsBolt extends BaseBasicBolt {
+
+ private static final Logger logger = Logger.getLogger(UrlsBolt.class);
+ private static final long serialVersionUID = -1599076545635998972L;
+
+ @Override
+ public void execute(Tuple input, BasicOutputCollector collector) {
+ try {
+ String message = input.getString(0);
+ if (StringUtil.isNotBlank(message)) {
+ JSONObject jsonObject = JSON.parseObject(message);
+ String key = jsonObject.getString("url");
+ if (StringUtil.isNotBlank(key)) {
+ collector.emit(new Values(key, jsonObject.getString("c2s_pkt_num"), jsonObject.getString("s2c_pkt_num"),
+ jsonObject.getString("c2s_byte_num"), jsonObject.getString("s2c_byte_num"),
+ jsonObject.getString("client_ip"), jsonObject.getString("server_ip")));
+ }
+ }
+ } catch (Exception e) {
+ logger.error("拆分原始日志出现异常:", e);
+ }
+ }
+
+ @Override
+ public void declareOutputFields(OutputFieldsDeclarer declarer) {
+ declarer.declare(new Fields("key", "cpn", "spn", "cbn", "sbn","cli","ser"));
+
+ }
+
+
+}
diff --git a/src/main/java/cn/ac/iie/bolt/segmentation/UserBolt.java b/src/main/java/cn/ac/iie/bolt/segmentation/UserBolt.java
new file mode 100644
index 0000000..d707cdc
--- /dev/null
+++ b/src/main/java/cn/ac/iie/bolt/segmentation/UserBolt.java
@@ -0,0 +1,49 @@
+package cn.ac.iie.bolt.segmentation;
+
+import com.alibaba.fastjson.JSON;
+import com.alibaba.fastjson.JSONObject;
+import com.zdjizhi.utils.StringUtil;
+import org.apache.log4j.Logger;
+import org.apache.storm.topology.BasicOutputCollector;
+import org.apache.storm.topology.OutputFieldsDeclarer;
+import org.apache.storm.topology.base.BaseBasicBolt;
+import org.apache.storm.tuple.Fields;
+import org.apache.storm.tuple.Tuple;
+import org.apache.storm.tuple.Values;
+
+/**
+ * 切分原始日志
+ *
+ * @author qidaijie
+ */
+public class UserBolt extends BaseBasicBolt {
+
+ private static final Logger logger = Logger.getLogger(UserBolt.class);
+ private static final long serialVersionUID = -1599076545635998972L;
+
+ @Override
+ public void execute(Tuple input, BasicOutputCollector collector) {
+ try {
+ String message = input.getString(0);
+ if (StringUtil.isNotBlank(message)) {
+ JSONObject jsonObject = JSON.parseObject(message);
+ String key = jsonObject.getString("subscribe_id");
+// String key = jsonObject.getString("client_ip");
+ if (StringUtil.isNotBlank(key)) {
+ collector.emit(new Values(key, jsonObject.getString("c2s_pkt_num"), jsonObject.getString("s2c_pkt_num"),
+ jsonObject.getString("c2s_byte_num"), jsonObject.getString("s2c_byte_num")));
+ }
+ }
+ } catch (Exception e) {
+ logger.error("拆分原始日志出现异常:", e);
+ }
+ }
+
+ @Override
+ public void declareOutputFields(OutputFieldsDeclarer declarer) {
+ declarer.declare(new Fields("key", "cpn", "spn", "cbn", "sbn"));
+
+ }
+
+
+}
diff --git a/src/main/java/cn/ac/iie/bolt/segmentation/WebSiteBolt.java b/src/main/java/cn/ac/iie/bolt/segmentation/WebSiteBolt.java
new file mode 100644
index 0000000..d94eba3
--- /dev/null
+++ b/src/main/java/cn/ac/iie/bolt/segmentation/WebSiteBolt.java
@@ -0,0 +1,50 @@
+package cn.ac.iie.bolt.segmentation;
+
+import com.alibaba.fastjson.JSON;
+import com.alibaba.fastjson.JSONObject;
+import com.zdjizhi.utils.StringUtil;
+import org.apache.log4j.Logger;
+import org.apache.storm.topology.BasicOutputCollector;
+import org.apache.storm.topology.OutputFieldsDeclarer;
+import org.apache.storm.topology.base.BaseBasicBolt;
+import org.apache.storm.tuple.Fields;
+import org.apache.storm.tuple.Tuple;
+import org.apache.storm.tuple.Values;
+
+
+/**
+ * 切分原始日志
+ *
+ * @author qidaijie
+ */
+public class WebSiteBolt extends BaseBasicBolt {
+
+ private static final Logger logger = Logger.getLogger(WebSiteBolt.class);
+ private static final long serialVersionUID = -1599076545635998972L;
+
+ @Override
+ public void execute(Tuple input, BasicOutputCollector collector) {
+ try {
+ String message = input.getString(0);
+ if (StringUtil.isNotBlank(message)) {
+ JSONObject jsonObject = JSON.parseObject(message);
+ String key = jsonObject.getString("domain");
+ if (StringUtil.isNotBlank(key)) {
+ collector.emit(new Values(key, jsonObject.getString("c2s_pkt_num"), jsonObject.getString("s2c_pkt_num"),
+ jsonObject.getString("c2s_byte_num"), jsonObject.getString("s2c_byte_num"),
+ jsonObject.getString("client_ip"), jsonObject.getString("server_ip")));
+ }
+ }
+ } catch (Exception e) {
+ logger.error("拆分原始日志出现异常:", e);
+ }
+ }
+
+ @Override
+ public void declareOutputFields(OutputFieldsDeclarer declarer) {
+ declarer.declare(new Fields("key", "cpn", "spn", "cbn", "sbn", "cli", "ser"));
+
+ }
+
+
+}
diff --git a/src/main/java/cn/ac/iie/common/TopNCountConfig.java b/src/main/java/cn/ac/iie/common/TopNCountConfig.java
new file mode 100644
index 0000000..63c2fdb
--- /dev/null
+++ b/src/main/java/cn/ac/iie/common/TopNCountConfig.java
@@ -0,0 +1,45 @@
+package cn.ac.iie.common;
+
+import cn.ac.iie.utils.system.TopNCountConfigurations;
+
+import java.io.Serializable;
+
+public class TopNCountConfig implements Serializable {
+
+ private static final long serialVersionUID = -8649024767966235184L;
+
+ public static final Integer SPOUT_PARALLELISM = TopNCountConfigurations.getIntProperty(1, "topology.spout.parallelism");
+ public static final Integer WORDNORMALIZER = TopNCountConfigurations.getIntProperty(1, "topology.bolt.check.parallelism");
+ public static final Integer ROLLINGCOUNT = TopNCountConfigurations.getIntProperty(1, "topology.bolt.count.parallelism");
+ public static final Integer INTERMEDIATERANKINGS = TopNCountConfigurations.getIntProperty(1, "topology.bolt.interRanker.parallelism");
+
+
+ public static final int TOPEMITFREQUENCYSECS = TopNCountConfigurations.getIntProperty(1, "topology.top.emit.frequency.secs");
+ public static final int INTERMEDIATEEMITFREQUENCYSECS = TopNCountConfigurations.getIntProperty(1, "topology.intermediate.emit.frequency.secs");
+ public static final int TAOTALEMITFREQUENCYSECS = TopNCountConfigurations.getIntProperty(1, "topology.taotal.emit.frequency.secs");
+ public static final int TOPINTERMEDIATEN = TopNCountConfigurations.getIntProperty(1, "topology.top.intermediate.n");
+ public static final int TOPTOTALN = TopNCountConfigurations.getIntProperty(1, "topology.top.total.n");
+ public static final int TOPWINDOWLENGTH = TopNCountConfigurations.getIntProperty(1, "topology.top.window.length.secs");
+
+ public static final int PATTERN = TopNCountConfigurations.getIntProperty(1, "pattern.num");
+ public static final String RESULTSOUTPUTTOPIC = TopNCountConfigurations.getStringProperty(1, "results.output.topics");
+
+
+ public static final Integer TOPOLOGY_CONFIG_MAX_SPOUT_PENDING = TopNCountConfigurations.getIntProperty(0, "topology.config.max.spout.pending");
+ public static final Integer TOPOLOGY_NUM_ACKS = TopNCountConfigurations.getIntProperty(0, "topology.num.acks");
+ public static final Integer MAX_FAILURE_NUM = TopNCountConfigurations.getIntProperty(0, "max.failure.num");
+ public static final String IP_LIBRARY = TopNCountConfigurations.getStringProperty(0, "ip.library");
+
+ /**
+ * Kafka
+ */
+ public static final String GROUP_ID = TopNCountConfigurations.getStringProperty(1, "group.id");
+ public static final String KAFKA_TOPIC = TopNCountConfigurations.getStringProperty(1, "kafka.topic");
+
+ public static final Integer TOPOLOGY_WORKERS = TopNCountConfigurations.getIntProperty(1, "topology.workers");
+ public static final String BOOTSTRAP_SERVERS = TopNCountConfigurations.getStringProperty(0, "bootstrap.servers");
+ public static final String RESULTS_OUTPUT_SERVERS = TopNCountConfigurations.getStringProperty(0, "results.output.servers");
+ public static final String AUTO_OFFSET_RESET = TopNCountConfigurations.getStringProperty(0, "auto.offset.reset");
+
+
+} \ No newline at end of file
diff --git a/src/main/java/cn/ac/iie/model/ExternalHost.java b/src/main/java/cn/ac/iie/model/ExternalHost.java
new file mode 100644
index 0000000..8a3e9f8
--- /dev/null
+++ b/src/main/java/cn/ac/iie/model/ExternalHost.java
@@ -0,0 +1,82 @@
+package cn.ac.iie.model;
+
+/**
+ * @author qidaijie
+ */
+public class ExternalHost {
+ private String destination;
+ private String protocol;
+ private long session_num;
+ private long c2s_pkt_num;
+ private long s2c_pkt_num;
+ private long c2s_byte_num;
+ private long s2c_byte_num;
+ private long stat_time;
+
+ public ExternalHost() {
+ }
+
+ public String getDestination() {
+ return destination;
+ }
+
+ public void setDestination(String destination) {
+ this.destination = destination;
+ }
+
+ public String getProtocol() {
+ return protocol;
+ }
+
+ public void setProtocol(String protocol) {
+ this.protocol = protocol;
+ }
+
+ public long getSession_num() {
+ return session_num;
+ }
+
+ public void setSession_num(long session_num) {
+ this.session_num = session_num;
+ }
+
+ public long getC2s_pkt_num() {
+ return c2s_pkt_num;
+ }
+
+ public void setC2s_pkt_num(long c2s_pkt_num) {
+ this.c2s_pkt_num = c2s_pkt_num;
+ }
+
+ public long getS2c_pkt_num() {
+ return s2c_pkt_num;
+ }
+
+ public void setS2c_pkt_num(long s2c_pkt_num) {
+ this.s2c_pkt_num = s2c_pkt_num;
+ }
+
+ public long getC2s_byte_num() {
+ return c2s_byte_num;
+ }
+
+ public void setC2s_byte_num(long c2s_byte_num) {
+ this.c2s_byte_num = c2s_byte_num;
+ }
+
+ public long getS2c_byte_num() {
+ return s2c_byte_num;
+ }
+
+ public void setS2c_byte_num(long s2c_byte_num) {
+ this.s2c_byte_num = s2c_byte_num;
+ }
+
+ public long getStat_time() {
+ return stat_time;
+ }
+
+ public void setStat_time(long stat_time) {
+ this.stat_time = stat_time;
+ }
+}
diff --git a/src/main/java/cn/ac/iie/model/InternalHost.java b/src/main/java/cn/ac/iie/model/InternalHost.java
new file mode 100644
index 0000000..b1de1ed
--- /dev/null
+++ b/src/main/java/cn/ac/iie/model/InternalHost.java
@@ -0,0 +1,73 @@
+package cn.ac.iie.model;
+
+/**
+ * @author qidaijie
+ */
+public class InternalHost {
+ private String source;
+ private long session_num;
+ private long c2s_pkt_num;
+ private long s2c_pkt_num;
+ private long c2s_byte_num;
+ private long s2c_byte_num;
+ private long stat_time;
+
+ public InternalHost() {
+ }
+
+ public String getSource() {
+ return source;
+ }
+
+ public void setSource(String source) {
+ this.source = source;
+ }
+
+ public long getSession_num() {
+ return session_num;
+ }
+
+ public void setSession_num(long session_num) {
+ this.session_num = session_num;
+ }
+
+ public long getC2s_pkt_num() {
+ return c2s_pkt_num;
+ }
+
+ public void setC2s_pkt_num(long c2s_pkt_num) {
+ this.c2s_pkt_num = c2s_pkt_num;
+ }
+
+ public long getS2c_pkt_num() {
+ return s2c_pkt_num;
+ }
+
+ public void setS2c_pkt_num(long s2c_pkt_num) {
+ this.s2c_pkt_num = s2c_pkt_num;
+ }
+
+ public long getC2s_byte_num() {
+ return c2s_byte_num;
+ }
+
+ public void setC2s_byte_num(long c2s_byte_num) {
+ this.c2s_byte_num = c2s_byte_num;
+ }
+
+ public long getS2c_byte_num() {
+ return s2c_byte_num;
+ }
+
+ public void setS2c_byte_num(long s2c_byte_num) {
+ this.s2c_byte_num = s2c_byte_num;
+ }
+
+ public long getStat_time() {
+ return stat_time;
+ }
+
+ public void setStat_time(long stat_time) {
+ this.stat_time = stat_time;
+ }
+}
diff --git a/src/main/java/cn/ac/iie/model/Urls.java b/src/main/java/cn/ac/iie/model/Urls.java
new file mode 100644
index 0000000..5cfb5a5
--- /dev/null
+++ b/src/main/java/cn/ac/iie/model/Urls.java
@@ -0,0 +1,88 @@
+package cn.ac.iie.model;
+
+public class Urls {
+ private String url;
+ private long client_unq_num;
+ private long server_unq_num;
+ private long session_num;
+ private long c2s_pkt_num;
+ private long s2c_pkt_num;
+ private long c2s_byte_num;
+ private long s2c_byte_num;
+ private long stat_time;
+
+ public Urls() {
+ }
+
+ public String getUrl() {
+ return url;
+ }
+
+ public void setUrl(String url) {
+ this.url = url;
+ }
+
+ public long getClient_unq_num() {
+ return client_unq_num;
+ }
+
+ public void setClient_unq_num(long client_unq_num) {
+ this.client_unq_num = client_unq_num;
+ }
+
+ public long getServer_unq_num() {
+ return server_unq_num;
+ }
+
+ public void setServer_unq_num(long server_unq_num) {
+ this.server_unq_num = server_unq_num;
+ }
+
+ public long getSession_num() {
+ return session_num;
+ }
+
+ public void setSession_num(long session_num) {
+ this.session_num = session_num;
+ }
+
+ public long getC2s_pkt_num() {
+ return c2s_pkt_num;
+ }
+
+ public void setC2s_pkt_num(long c2s_pkt_num) {
+ this.c2s_pkt_num = c2s_pkt_num;
+ }
+
+ public long getS2c_pkt_num() {
+ return s2c_pkt_num;
+ }
+
+ public void setS2c_pkt_num(long s2c_pkt_num) {
+ this.s2c_pkt_num = s2c_pkt_num;
+ }
+
+ public long getC2s_byte_num() {
+ return c2s_byte_num;
+ }
+
+ public void setC2s_byte_num(long c2s_byte_num) {
+ this.c2s_byte_num = c2s_byte_num;
+ }
+
+ public long getS2c_byte_num() {
+ return s2c_byte_num;
+ }
+
+ public void setS2c_byte_num(long s2c_byte_num) {
+ this.s2c_byte_num = s2c_byte_num;
+ }
+
+ public long getStat_time() {
+ return stat_time;
+ }
+
+ public void setStat_time(long stat_time) {
+ this.stat_time = stat_time;
+ }
+}
diff --git a/src/main/java/cn/ac/iie/model/User.java b/src/main/java/cn/ac/iie/model/User.java
new file mode 100644
index 0000000..1ab9c6a
--- /dev/null
+++ b/src/main/java/cn/ac/iie/model/User.java
@@ -0,0 +1,70 @@
+package cn.ac.iie.model;
+
+public class User {
+ private String subscribe_id;
+ private long session_num;
+ private long c2s_pkt_num;
+ private long s2c_pkt_num;
+ private long c2s_byte_num;
+ private long s2c_byte_num;
+ private long stat_time;
+
+ public User() {
+ }
+
+ public String getSubscribe_id() {
+ return subscribe_id;
+ }
+
+ public void setSubscribe_id(String subscribe_id) {
+ this.subscribe_id = subscribe_id;
+ }
+
+ public long getSession_num() {
+ return session_num;
+ }
+
+ public void setSession_num(long session_num) {
+ this.session_num = session_num;
+ }
+
+ public long getC2s_pkt_num() {
+ return c2s_pkt_num;
+ }
+
+ public void setC2s_pkt_num(long c2s_pkt_num) {
+ this.c2s_pkt_num = c2s_pkt_num;
+ }
+
+ public long getS2c_pkt_num() {
+ return s2c_pkt_num;
+ }
+
+ public void setS2c_pkt_num(long s2c_pkt_num) {
+ this.s2c_pkt_num = s2c_pkt_num;
+ }
+
+ public long getC2s_byte_num() {
+ return c2s_byte_num;
+ }
+
+ public void setC2s_byte_num(long c2s_byte_num) {
+ this.c2s_byte_num = c2s_byte_num;
+ }
+
+ public long getS2c_byte_num() {
+ return s2c_byte_num;
+ }
+
+ public void setS2c_byte_num(long s2c_byte_num) {
+ this.s2c_byte_num = s2c_byte_num;
+ }
+
+ public long getStat_time() {
+ return stat_time;
+ }
+
+ public void setStat_time(long stat_time) {
+ this.stat_time = stat_time;
+ }
+}
diff --git a/src/main/java/cn/ac/iie/model/WebSite.java b/src/main/java/cn/ac/iie/model/WebSite.java
new file mode 100644
index 0000000..4f42d03
--- /dev/null
+++ b/src/main/java/cn/ac/iie/model/WebSite.java
@@ -0,0 +1,91 @@
+package cn.ac.iie.model;
+
+/**
+ * @author qidaijie
+ */
+public class WebSite {
+ private String domain;
+ private long client_unq_num;
+ private long server_unq_num;
+ private long session_num;
+ private long c2s_pkt_num;
+ private long s2c_pkt_num;
+ private long c2s_byte_num;
+ private long s2c_byte_num;
+ private long stat_time;
+
+ public WebSite() {
+ }
+
+ public String getDomain() {
+ return domain;
+ }
+
+ public void setDomain(String domain) {
+ this.domain = domain;
+ }
+
+ public long getSession_num() {
+ return session_num;
+ }
+
+ public void setSession_num(long session_num) {
+ this.session_num = session_num;
+ }
+
+ public long getC2s_pkt_num() {
+ return c2s_pkt_num;
+ }
+
+ public void setC2s_pkt_num(long c2s_pkt_num) {
+ this.c2s_pkt_num = c2s_pkt_num;
+ }
+
+ public long getS2c_pkt_num() {
+ return s2c_pkt_num;
+ }
+
+ public void setS2c_pkt_num(long s2c_pkt_num) {
+ this.s2c_pkt_num = s2c_pkt_num;
+ }
+
+ public long getC2s_byte_num() {
+ return c2s_byte_num;
+ }
+
+ public void setC2s_byte_num(long c2s_byte_num) {
+ this.c2s_byte_num = c2s_byte_num;
+ }
+
+ public long getS2c_byte_num() {
+ return s2c_byte_num;
+ }
+
+ public void setS2c_byte_num(long s2c_byte_num) {
+ this.s2c_byte_num = s2c_byte_num;
+ }
+
+ public long getClient_unq_num() {
+ return client_unq_num;
+ }
+
+ public void setClient_unq_num(long client_unq_num) {
+ this.client_unq_num = client_unq_num;
+ }
+
+ public long getServer_unq_num() {
+ return server_unq_num;
+ }
+
+ public void setServer_unq_num(long server_unq_num) {
+ this.server_unq_num = server_unq_num;
+ }
+
+ public long getStat_time() {
+ return stat_time;
+ }
+
+ public void setStat_time(long stat_time) {
+ this.stat_time = stat_time;
+ }
+}
diff --git a/src/main/java/cn/ac/iie/spout/CustomizedKafkaSpout.java b/src/main/java/cn/ac/iie/spout/CustomizedKafkaSpout.java
new file mode 100644
index 0000000..8904556
--- /dev/null
+++ b/src/main/java/cn/ac/iie/spout/CustomizedKafkaSpout.java
@@ -0,0 +1,79 @@
+package cn.ac.iie.spout;
+
+import cn.ac.iie.common.TopNCountConfig;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.consumer.ConsumerRecords;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.log4j.Logger;
+import org.apache.storm.spout.SpoutOutputCollector;
+import org.apache.storm.task.TopologyContext;
+import org.apache.storm.topology.OutputFieldsDeclarer;
+import org.apache.storm.topology.base.BaseRichSpout;
+import org.apache.storm.tuple.Fields;
+import org.apache.storm.tuple.Values;
+
+import java.util.Arrays;
+import java.util.Map;
+import java.util.Properties;
+
+/**
+ * @author qidaijie
+ */
+public class CustomizedKafkaSpout extends BaseRichSpout {
+ private static final long serialVersionUID = -3363788553406229592L;
+ private KafkaConsumer<String, String> consumer;
+ private SpoutOutputCollector collector = null;
+ private TopologyContext context = null;
+ private final static Logger logger = Logger.getLogger(CustomizedKafkaSpout.class);
+
+
+ private static Properties createConsumerConfig() {
+ Properties props = new Properties();
+ props.put("bootstrap.servers", TopNCountConfig.BOOTSTRAP_SERVERS);
+ props.put("group.id", TopNCountConfig.GROUP_ID);
+ props.put("session.timeout.ms", "60000");
+ props.put("max.poll.records", 3000);
+ props.put("max.partition.fetch.bytes", 31457280);
+ props.put("auto.offset.reset", TopNCountConfig.AUTO_OFFSET_RESET);
+ props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
+ props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
+ return props;
+ }
+
+ @Override
+ public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
+ // TODO Auto-generated method stub
+ this.collector = collector;
+ this.context = context;
+ Properties prop = createConsumerConfig();
+ this.consumer = new KafkaConsumer<>(prop);
+ this.consumer.subscribe(Arrays.asList(TopNCountConfig.KAFKA_TOPIC));
+ }
+
+ @Override
+ public void close() {
+ consumer.close();
+ }
+
+ @Override
+ public void nextTuple() {
+ try {
+ // TODO Auto-generated method stub
+ ConsumerRecords<String, String> records = consumer.poll(10000L);
+ Thread.sleep(1);
+ for (ConsumerRecord<String, String> record : records) {
+ this.collector.emit(new Values(record.value()));
+ }
+ } catch (Exception e) {
+ logger.error("实时统计程序SPOUT出现异常----> " + e);
+ e.printStackTrace();
+ }
+
+ }
+
+ @Override
+ public void declareOutputFields(OutputFieldsDeclarer declarer) {
+ // TODO Auto-generated method stub
+ declarer.declare(new Fields("source"));
+ }
+}
diff --git a/src/main/java/cn/ac/iie/topology/LogTopCountTopology.java b/src/main/java/cn/ac/iie/topology/LogTopCountTopology.java
new file mode 100644
index 0000000..64e6c30
--- /dev/null
+++ b/src/main/java/cn/ac/iie/topology/LogTopCountTopology.java
@@ -0,0 +1,137 @@
+package cn.ac.iie.topology;
+
+import cn.ac.iie.bolt.*;
+import cn.ac.iie.bolt.resultoutput.*;
+import cn.ac.iie.bolt.scatteredcalculate.*;
+import cn.ac.iie.bolt.segmentation.*;
+import cn.ac.iie.common.TopNCountConfig;
+import cn.ac.iie.spout.CustomizedKafkaSpout;
+import org.apache.log4j.Logger;
+import org.apache.storm.Config;
+import org.apache.storm.generated.AlreadyAliveException;
+import org.apache.storm.generated.AuthorizationException;
+import org.apache.storm.generated.InvalidTopologyException;
+import org.apache.storm.topology.TopologyBuilder;
+import org.apache.storm.tuple.Fields;
+
+/**
+ * @author Administrator
+ */
+
+public class LogTopCountTopology {
+ private static Logger logger = Logger.getLogger(LogTopCountTopology.class);
+ private final String topologyName;
+ private final Config topologyConfig;
+ private TopologyBuilder builder;
+
+ private LogTopCountTopology() {
+ this(LogTopCountTopology.class.getSimpleName());
+ }
+
+ private LogTopCountTopology(String topologyName) {
+ this.topologyName = topologyName;
+ topologyConfig = createTopologConfig();
+ }
+
+ private Config createTopologConfig() {
+ Config conf = new Config();
+ conf.setDebug(false);
+ conf.setMessageTimeoutSecs(120);
+ conf.setMaxSpoutPending(TopNCountConfig.TOPOLOGY_CONFIG_MAX_SPOUT_PENDING);
+ conf.setNumAckers(TopNCountConfig.TOPOLOGY_NUM_ACKS);
+ return conf;
+ }
+
+ private void runLocally() throws InterruptedException {
+ topologyConfig.setMaxTaskParallelism(1);
+ StormRunner.runTopologyLocally(builder, topologyName, topologyConfig, 600);
+ }
+
+ private void runRemotely() throws AlreadyAliveException, InvalidTopologyException, AuthorizationException {
+ topologyConfig.setNumWorkers(TopNCountConfig.TOPOLOGY_WORKERS);
+ //设置过高会导致很多问题,如心跳线程饿死、吞吐量大幅下跌
+ topologyConfig.put(Config.TOPOLOGY_EXECUTOR_RECEIVE_BUFFER_SIZE, 8);
+ StormRunner.runTopologyRemotely(builder, topologyName, topologyConfig);
+ }
+
+ private void buildTopology() {
+ builder = new TopologyBuilder();
+ builder.setSpout("RealtimeCountSpout", new CustomizedKafkaSpout(), TopNCountConfig.SPOUT_PARALLELISM);
+ switch (TopNCountConfig.PATTERN) {
+ case 1:
+ builder.setBolt("InternalNormalizer", new InternalBolt(), (TopNCountConfig.WORDNORMALIZER)).shuffleGrouping("RealtimeCountSpout");
+ builder.setBolt("InternalCount", new InternalCountBolt(TopNCountConfig.TOPWINDOWLENGTH, TopNCountConfig.TOPEMITFREQUENCYSECS),
+ TopNCountConfig.ROLLINGCOUNT).fieldsGrouping("InternalNormalizer", new Fields("key"));
+ builder.setBolt("IntermediateRanker", new IntermediateRankingsBolt(TopNCountConfig.TOPINTERMEDIATEN,
+ TopNCountConfig.INTERMEDIATEEMITFREQUENCYSECS),
+ TopNCountConfig.INTERMEDIATERANKINGS).fieldsGrouping("InternalCount", new Fields("obj"));
+ builder.setBolt("finalRanker", new TotalRankingsBolt(TopNCountConfig.TOPTOTALN, TopNCountConfig.TAOTALEMITFREQUENCYSECS)).globalGrouping("IntermediateRanker");
+ builder.setBolt("InternalOutPutBolt", new InternalOutPutBolt()).globalGrouping("finalRanker");
+ break;
+ case 2:
+ builder.setBolt("ExternalNormalizer", new ExternalBolt(), (TopNCountConfig.WORDNORMALIZER)).shuffleGrouping("RealtimeCountSpout");
+ builder.setBolt("ExternalCount", new ExternalCountBolt(TopNCountConfig.TOPWINDOWLENGTH, TopNCountConfig.TOPEMITFREQUENCYSECS),
+ TopNCountConfig.ROLLINGCOUNT).fieldsGrouping("ExternalNormalizer", new Fields("key"));
+ builder.setBolt("IntermediateRanker", new IntermediateRankingsBolt(TopNCountConfig.TOPINTERMEDIATEN,
+ TopNCountConfig.INTERMEDIATEEMITFREQUENCYSECS),
+ TopNCountConfig.INTERMEDIATERANKINGS).fieldsGrouping("ExternalCount", new Fields("obj"));
+ builder.setBolt("finalRanker", new TotalRankingsBolt(TopNCountConfig.TOPTOTALN, TopNCountConfig.TAOTALEMITFREQUENCYSECS)).globalGrouping("IntermediateRanker");
+ builder.setBolt("ExternalOutPutBolt", new ExternalOutPutBolt()).globalGrouping("finalRanker");
+ break;
+ case 3:
+ builder.setBolt("WebsiteNormalizer", new WebSiteBolt(), (TopNCountConfig.WORDNORMALIZER)).shuffleGrouping("RealtimeCountSpout");
+ builder.setBolt("WebsiteCount", new WebsiteCountBolt(TopNCountConfig.TOPWINDOWLENGTH, TopNCountConfig.TOPEMITFREQUENCYSECS),
+ TopNCountConfig.ROLLINGCOUNT).fieldsGrouping("WebsiteNormalizer", new Fields("key"));
+ builder.setBolt("IntermediateRanker", new IntermediateRankingsBolt(TopNCountConfig.TOPINTERMEDIATEN,
+ TopNCountConfig.INTERMEDIATEEMITFREQUENCYSECS),
+ TopNCountConfig.INTERMEDIATERANKINGS).fieldsGrouping("WebsiteCount", new Fields("obj"));
+ builder.setBolt("finalRanker", new TotalRankingsBolt(TopNCountConfig.TOPTOTALN, TopNCountConfig.TAOTALEMITFREQUENCYSECS)).globalGrouping("IntermediateRanker");
+ builder.setBolt("WebsiteOutPutBolt", new WebsiteOutPutBolt()).globalGrouping("finalRanker");
+ break;
+ case 4:
+ builder.setBolt("UrlsNormalizer", new UrlsBolt(), (TopNCountConfig.WORDNORMALIZER)).shuffleGrouping("RealtimeCountSpout");
+ builder.setBolt("UrlsCount", new UrlsCountBolt(TopNCountConfig.TOPWINDOWLENGTH, TopNCountConfig.TOPEMITFREQUENCYSECS),
+ TopNCountConfig.ROLLINGCOUNT).fieldsGrouping("UrlsNormalizer", new Fields("key"));
+ builder.setBolt("IntermediateRanker", new IntermediateRankingsBolt(TopNCountConfig.TOPINTERMEDIATEN,
+ TopNCountConfig.INTERMEDIATEEMITFREQUENCYSECS),
+ TopNCountConfig.INTERMEDIATERANKINGS).fieldsGrouping("UrlsCount", new Fields("obj"));
+ builder.setBolt("finalRanker", new TotalRankingsBolt(TopNCountConfig.TOPTOTALN, TopNCountConfig.TAOTALEMITFREQUENCYSECS)).globalGrouping("IntermediateRanker");
+ builder.setBolt("UrlsOutPutBolt", new UrlsOutPutBolt()).globalGrouping("finalRanker");
+ break;
+ case 5:
+ builder.setBolt("UserNormalizer", new UserBolt(), (TopNCountConfig.WORDNORMALIZER)).shuffleGrouping("RealtimeCountSpout");
+ builder.setBolt("UserCount", new UserCountBolt(TopNCountConfig.TOPWINDOWLENGTH, TopNCountConfig.TOPEMITFREQUENCYSECS),
+ TopNCountConfig.ROLLINGCOUNT).fieldsGrouping("UserNormalizer", new Fields("key"));
+ builder.setBolt("IntermediateRanker", new IntermediateRankingsBolt(TopNCountConfig.TOPINTERMEDIATEN,
+ TopNCountConfig.INTERMEDIATEEMITFREQUENCYSECS),
+ TopNCountConfig.INTERMEDIATERANKINGS).fieldsGrouping("UserCount", new Fields("obj"));
+ builder.setBolt("finalRanker", new TotalRankingsBolt(TopNCountConfig.TOPTOTALN, TopNCountConfig.TAOTALEMITFREQUENCYSECS)).globalGrouping("IntermediateRanker");
+ builder.setBolt("UserOutPutBolt", new UserOutPutBolt()).globalGrouping("finalRanker");
+ break;
+ default:
+ }
+
+
+ }
+
+ public static void main(String[] args) throws Exception {
+ LogTopCountTopology logTopCountTopology;
+ boolean runLocally = true;
+ String command = "remote";
+ if (args.length >= 2 && command.equalsIgnoreCase(args[1])) {
+ runLocally = false;
+ logTopCountTopology = new LogTopCountTopology(args[0]);
+ } else {
+ logTopCountTopology = new LogTopCountTopology();
+ }
+ logTopCountTopology.buildTopology();
+
+ if (runLocally) {
+ logger.info("执行本地模式...");
+ logTopCountTopology.runLocally();
+ } else {
+ logger.info("执行远程部署模式...");
+ logTopCountTopology.runRemotely();
+ }
+ }
+}
diff --git a/src/main/java/cn/ac/iie/topology/StormRunner.java b/src/main/java/cn/ac/iie/topology/StormRunner.java
new file mode 100644
index 0000000..65ddd98
--- /dev/null
+++ b/src/main/java/cn/ac/iie/topology/StormRunner.java
@@ -0,0 +1,35 @@
+package cn.ac.iie.topology;
+
+
+import org.apache.storm.Config;
+import org.apache.storm.LocalCluster;
+import org.apache.storm.StormSubmitter;
+import org.apache.storm.generated.AlreadyAliveException;
+import org.apache.storm.generated.AuthorizationException;
+import org.apache.storm.generated.InvalidTopologyException;
+import org.apache.storm.topology.TopologyBuilder;
+
+/**
+ * @author qidaijie
+ */
+final class StormRunner {
+ private static final int MILLS_IN_SEC = 1000;
+
+ private StormRunner() {
+ }
+
+ static void runTopologyLocally(TopologyBuilder builder, String topologyName, Config conf, int runtimeInSeconds) throws InterruptedException {
+
+ LocalCluster localCluster = new LocalCluster();
+ localCluster.submitTopology(topologyName, conf, builder.createTopology());
+ Thread.sleep((long) runtimeInSeconds * MILLS_IN_SEC);
+ localCluster.shutdown();
+ }
+
+ static void runTopologyRemotely(TopologyBuilder builder, String topologyName, Config conf) throws AlreadyAliveException, InvalidTopologyException, AuthorizationException {
+
+ StormSubmitter.submitTopologyWithProgressBar(topologyName, conf, builder.createTopology());
+ }
+
+
+}
diff --git a/src/main/java/cn/ac/iie/utils/NthLastModifiedTimeTracker.java b/src/main/java/cn/ac/iie/utils/NthLastModifiedTimeTracker.java
new file mode 100644
index 0000000..e7c8f16
--- /dev/null
+++ b/src/main/java/cn/ac/iie/utils/NthLastModifiedTimeTracker.java
@@ -0,0 +1,45 @@
+package cn.ac.iie.utils;
+
+import org.apache.commons.collections.buffer.CircularFifoBuffer;
+import org.apache.storm.utils.Time;
+
+/**
+ * @author qidaijie
+ */
+public class NthLastModifiedTimeTracker {
+ private static final int MILLIS_IN_SEC = 1000;
+ private final CircularFifoBuffer lastModifiedTimesMillis;
+
+ public NthLastModifiedTimeTracker(int numTimesToTrack) {
+ if (numTimesToTrack < 1) {
+ throw new IllegalArgumentException("numTimesToTrack must be greater than zero (you requested " + numTimesToTrack + ")");
+ }
+ lastModifiedTimesMillis = new CircularFifoBuffer(numTimesToTrack);
+ initLastModifiedTimesMillis();
+ }
+
+ private void initLastModifiedTimesMillis() {
+ long nowCached = now();
+ for (int i = 0; i < lastModifiedTimesMillis.maxSize(); i++) {
+ lastModifiedTimesMillis.add(nowCached);
+ }
+ }
+
+ private long now() {
+ return Time.currentTimeMillis();
+ }
+
+ public int secondsSinceOldestModification() {
+ long modifiedTimeMillis = (Long) lastModifiedTimesMillis.get();
+ return (int) ((now() - modifiedTimeMillis) / MILLIS_IN_SEC);
+ }
+
+ public void markAsModified() {
+ updateLastModifiedTime();
+ }
+
+ private void updateLastModifiedTime() {
+ lastModifiedTimesMillis.add(now());
+ }
+
+}
diff --git a/src/main/java/cn/ac/iie/utils/Rankable.java b/src/main/java/cn/ac/iie/utils/Rankable.java
new file mode 100644
index 0000000..dc3ce97
--- /dev/null
+++ b/src/main/java/cn/ac/iie/utils/Rankable.java
@@ -0,0 +1,11 @@
+package cn.ac.iie.utils;
+
+import java.util.List;
+
+public interface Rankable extends Comparable<Rankable> {
+ Object getObject();
+ long getCount();
+ List<Object> getFields();
+
+ Rankable copy();
+}
diff --git a/src/main/java/cn/ac/iie/utils/RankableObjectWithFields.java b/src/main/java/cn/ac/iie/utils/RankableObjectWithFields.java
new file mode 100644
index 0000000..6d3d0fa
--- /dev/null
+++ b/src/main/java/cn/ac/iie/utils/RankableObjectWithFields.java
@@ -0,0 +1,119 @@
+package cn.ac.iie.utils;
+
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.Lists;
+import org.apache.storm.tuple.Tuple;
+
+import java.io.Serializable;
+import java.util.List;
+
+public class RankableObjectWithFields implements Rankable, Serializable {
+ private static final long serialVersionUID = -5291345336126830843L;
+ private final Object obj;
+ private final long count;
+ private final ImmutableList<Object> fields;
+ private static final String toStringSeparator = "|";
+
+ public RankableObjectWithFields(Object obj, long count, Object... otherFields) {
+ if (obj == null) {
+ throw new IllegalArgumentException("The object must not be null");
+ }
+ if (count < 0) {
+ throw new IllegalArgumentException("The count must be >=0");
+ }
+ this.obj = obj;
+ this.count = count;
+ fields = ImmutableList.copyOf(otherFields);
+ }
+
+ public static RankableObjectWithFields from(Tuple tuple) {
+ List<Object> otherFields = Lists.newArrayList(tuple.getValues());
+ Object obj = otherFields.remove(0);
+ Long count = (Long) otherFields.remove(0);
+ return new RankableObjectWithFields(obj, count, otherFields.toArray());
+ }
+
+
+ @Override
+ public int compareTo(Rankable other) {
+ long delta = this.getCount() - other.getCount();
+ if (delta > 0) {
+ return 1;
+ } else if (delta < 0) {
+ return -1;
+ } else {
+ return 0;
+ }
+ }
+
+
+ @Override
+ public Object getObject() {
+ return obj;
+ }
+
+ @Override
+ public long getCount() {
+ return count;
+ }
+
+ @Override
+ public List<Object> getFields() {
+ return fields;
+ }
+
+ @Override
+ public Rankable copy() {
+ List<Object> shallowCopyOfFields = ImmutableList.copyOf(getFields());
+ return new RankableObjectWithFields(getObject(), getCount(), shallowCopyOfFields);
+ }
+
+
+ @Override
+ public int hashCode() {
+ final int prime = 31;
+ int result = 1;
+ result = prime * result + (int) (count ^ (count >>> 32));
+ result = prime * result + ((obj == null) ? 0 : obj.hashCode());
+ return result;
+ }
+
+ @Override
+ public boolean equals(Object obj) {
+ if (this == obj) {
+ return true;
+ }
+ if (obj == null) {
+ return false;
+ }
+ if (getClass() != obj.getClass()) {
+ return false;
+ }
+ RankableObjectWithFields other = (RankableObjectWithFields) obj;
+ if (this.obj == null) {
+ if (other.obj != null) {
+ return false;
+ }
+ } else if (!this.obj.equals(other.obj)) {
+ return false;
+ }
+ return true;
+ }
+
+
+ @Override
+ public String toString() {
+ StringBuffer buf = new StringBuffer();
+ buf.append("[");
+ buf.append(obj);
+ buf.append(toStringSeparator);
+ buf.append(count);
+ for (Object field : fields) {
+ buf.append(toStringSeparator);
+ buf.append(field);
+ }
+ buf.append("]");
+ return buf.toString();
+ }
+}
+
diff --git a/src/main/java/cn/ac/iie/utils/Rankings.java b/src/main/java/cn/ac/iie/utils/Rankings.java
new file mode 100644
index 0000000..5213891
--- /dev/null
+++ b/src/main/java/cn/ac/iie/utils/Rankings.java
@@ -0,0 +1,124 @@
+package cn.ac.iie.utils;
+
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.Lists;
+
+import java.io.Serializable;
+import java.util.Collections;
+import java.util.List;
+
+
+public class Rankings implements Serializable {
+
+ /**
+ *
+ */
+ private static final long serialVersionUID = -169358781406063489L;
+ private static final int DEFAULT_COUNT = 10;
+
+ private final int maxSize;
+ private final List<Rankable> rankedItems = Lists.newArrayList();
+
+ public Rankings() {
+ this(DEFAULT_COUNT);
+ }
+
+ public Rankings(int topN) {
+ if (topN < 1) {
+ throw new IllegalArgumentException("topN must be >= 1");
+ }
+ maxSize = topN;
+ }
+
+ public Rankings(Rankings other) {
+ this(other.maxSize());
+ updateWith(other);
+ }
+
+ public int maxSize() {
+ return maxSize;
+ }
+
+ public int size() {
+ return rankedItems.size();
+ }
+
+ public List<Rankable> getRankings() {
+ List<Rankable> copyRankList = Lists.newLinkedList();
+ copyRankList.addAll(rankedItems);
+ return ImmutableList.copyOf(copyRankList);
+ }
+
+ public void updateWith(Rankings other) {
+ for (Rankable r : other.getRankings()) {
+ updateWith(r);
+ }
+ }
+
+ public void updateWith(Rankable r) {
+ synchronized (rankedItems) {
+ addOrReplace(r);
+ rerank();
+ shrinkRankingsIfNeeded();
+ }
+ }
+
+ public void clearRankedItem() {
+ rankedItems.clear();
+ }
+
+ private void addOrReplace(Rankable r) {
+
+ Integer rank = findRankOf(r);
+ if (rank != null) {
+ rankedItems.set(rank, r);
+ } else {
+ rankedItems.add(r);
+ }
+ }
+
+
+ private Integer findRankOf(Rankable r) {
+ Object tag = r.getObject();
+ for (int rank = 0; rank < rankedItems.size(); rank++) {
+ Object cur = rankedItems.get(rank).getObject();
+ if (cur.equals(tag)) {
+ return rank;
+ }
+ }
+ return null;
+ }
+
+ private void rerank() {
+ Collections.sort(rankedItems);
+ Collections.reverse(rankedItems);
+ }
+
+ private void shrinkRankingsIfNeeded() {
+ if (rankedItems.size() > maxSize) {
+ rankedItems.remove(maxSize);
+ }
+ }
+
+ public void pruneZeroCounts() {
+ int i = 0;
+ while (i < rankedItems.size()) {
+ if (rankedItems.get(i).getCount() == 0) {
+ rankedItems.remove(i);
+ } else {
+ i++;
+ }
+ }
+ }
+
+ @Override
+ public String toString() {
+ return rankedItems.toString();
+ }
+
+ public Rankings copy() {
+ return new Rankings(this);
+ }
+
+
+}
diff --git a/src/main/java/cn/ac/iie/utils/SlidingWindowCounter.java b/src/main/java/cn/ac/iie/utils/SlidingWindowCounter.java
new file mode 100644
index 0000000..95e99b0
--- /dev/null
+++ b/src/main/java/cn/ac/iie/utils/SlidingWindowCounter.java
@@ -0,0 +1,58 @@
+package cn.ac.iie.utils;
+
+import java.io.Serializable;
+import java.util.Map;
+
+/**
+ * @author qidaijie
+ */
+public final class SlidingWindowCounter<T> implements Serializable {
+
+ private static final long serialVersionUID = 7540606699600042717L;
+ private SlotBasedCounter<T> objCounter;
+ private int headSlot;
+ private int tailSlot;
+ private int windowLengthInSlots;
+
+ public SlidingWindowCounter(int windowLengthInSlots) {
+ if (windowLengthInSlots < 2) {
+ throw new IllegalArgumentException("Window length in slots must be at least two (you requested "
+ + windowLengthInSlots + ")");
+ }
+ this.windowLengthInSlots = windowLengthInSlots;
+ this.objCounter = new SlotBasedCounter<T>(this.windowLengthInSlots);
+
+ this.headSlot = 0;
+ this.tailSlot = slotAfter(headSlot);
+ }
+
+ public void incrementCount(T obj, long count,long cpn ,long spn,long cbn,long sbn) {
+ objCounter.incrementCount(obj,count,cpn,spn,cbn,sbn,headSlot);
+ }
+
+ /**
+ * Return the current (total) counts of all tracked objects, then advance the window.
+ * <p>
+ * Whenever this method is called, we consider the counts of the current sliding window to be available to and
+ * successfully processed "upstream" (i.e. by the caller). Knowing this we will start counting any subsequent
+ * objects within the next "chunk" of the sliding window.
+ *
+ * @return
+ */
+ public Map<T, Long[]> getCountsThenAdvanceWindow() {
+ Map<T,Long[]> objCountMap = objCounter.getCounts();
+ objCounter.wipeZeros();
+ objCounter.wipeSlot(tailSlot);
+ advanceHead();
+ return objCountMap;
+ }
+
+ private void advanceHead() {
+ headSlot = tailSlot;
+ tailSlot = slotAfter(tailSlot);
+ }
+
+ private int slotAfter(int slot) {
+ return (slot + 1) % windowLengthInSlots;
+ }
+}
diff --git a/src/main/java/cn/ac/iie/utils/SlotBasedCounter.java b/src/main/java/cn/ac/iie/utils/SlotBasedCounter.java
new file mode 100644
index 0000000..172a3a1
--- /dev/null
+++ b/src/main/java/cn/ac/iie/utils/SlotBasedCounter.java
@@ -0,0 +1,164 @@
+package cn.ac.iie.utils;
+
+import com.google.common.collect.Range;
+import com.google.common.collect.RangeMap;
+import com.google.common.collect.TreeRangeMap;
+
+import java.io.Serializable;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+
+public class SlotBasedCounter<T> implements Serializable {
+ private static final long serialVersionUID = 4935449009558219987L;
+
+ private final Map<T,long[]> objToCounts = new HashMap<>();
+ private final Map<T,long[]> objToCpn = new HashMap<>();
+ private final Map<T,long[]> objToSpn = new HashMap<>();
+ private final Map<T,long[]> objToCbn = new HashMap<>();
+ private final Map<T,long[]> objToSbn = new HashMap<>();
+
+ private final int numSlots;
+
+ public SlotBasedCounter(int numSlots) {
+ if(numSlots <= 0){
+ throw new IllegalArgumentException(" Number of slots must be greater than zero (you requested "+numSlots+")");
+ }
+ this.numSlots = numSlots;
+ }
+
+ /**
+ *
+ * @param obj
+ * @param slot
+ */
+ public void incrementCount(T obj, long count,long cpn ,long spn,long cbn,long sbn, int slot) {
+ long[] counts = objToCounts.get(obj);
+ if (counts == null) {
+ counts = new long[this.numSlots];
+ objToCounts.put(obj, counts);
+ }
+ counts[slot] += count;
+
+ long[] cpns = objToCpn.get(obj);
+ if (cpns == null) {
+ cpns = new long[this.numSlots];
+ objToCpn.put(obj, cpns);
+ }
+ cpns[slot] += cpn;
+
+ long[] spns = objToSpn.get(obj);
+ if (spns == null) {
+ spns = new long[this.numSlots];
+ objToSpn.put(obj, spns);
+ }
+ spns[slot] += spn;
+
+ long[] cbns = objToCbn.get(obj);
+ if (cbns == null) {
+ cbns = new long[this.numSlots];
+ objToCbn.put(obj, cbns);
+ }
+ cbns[slot] += cbn;
+
+ long[] sbns = objToSbn.get(obj);
+ if (sbns == null) {
+ sbns = new long[this.numSlots];
+ objToSbn.put(obj, sbns);
+ }
+ sbns[slot] += sbn;
+ }
+
+ public long getCount(T obj, int slot) {
+ long[] counts = objToCounts.get(obj);
+ if (counts == null) {
+ return 0;
+ } else {
+ return counts[slot];
+ }
+
+ }
+
+ public Map<T,Long[]> getCounts() {
+ Map<T,Long[]> result = new HashMap<T, Long[]>();
+ for(Map.Entry<T, long[]> entry : objToCounts.entrySet()){
+ Long[] numbers = new Long[5];
+ numbers[0]=computerTotalCount(entry.getKey(),objToCounts);
+ numbers[1]=computerTotalCount(entry.getKey(),objToCpn);
+ numbers[2]=computerTotalCount(entry.getKey(),objToSpn);
+ numbers[3]=computerTotalCount(entry.getKey(),objToCbn);
+ numbers[4]=computerTotalCount(entry.getKey(),objToSbn);
+ result.put(entry.getKey(),numbers);
+ }
+ return result;
+ }
+
+
+ private long computerTotalCount(T obj, Map<T, long[]> objTo) {
+ long[] counts = objTo.get(obj);
+ long total = 0;
+ for (long count : counts) {
+ total += count;
+ }
+ return total;
+ }
+
+
+
+ /**
+ * Reset the slot count of any tracked objects to zero for given slot
+ * @param slot
+ */
+ public void wipeSlot(int slot) {
+ for (T obj : objToCounts.keySet()) {
+ resetSlotCountToZero(obj,slot);
+ }
+
+ }
+
+ private void resetSlotCountToZero(T obj, int slot) {
+ long[] counts = objToCounts.get(obj);
+ long[] cpns = objToCpn.get(obj);
+ long[] spns = objToSpn.get(obj);
+ long[] cbls = objToCbn.get(obj);
+ long[] sbls = objToSbn.get(obj);
+
+ counts[slot] = 0;
+ cpns[slot] = 0;
+ spns[slot] = 0;
+ cbls[slot] = 0;
+ sbls[slot] = 0;
+
+ }
+
+
+ private boolean shouldBeRemovedfromCounter(T obj) {
+ return computerTotalCount(obj,objToCounts) == 0;
+ }
+
+ /**
+ * Remove any object from the counter whose total count is zero (to free up memory).
+ */
+ public void wipeZeros() {
+ Set<T> objToBeRemoved = new HashSet<T>();
+
+ for (T obj :objToCounts.keySet()) {
+ if (shouldBeRemovedfromCounter(obj)) {
+ objToBeRemoved.add(obj);
+ }
+ }
+
+ for (T obj : objToBeRemoved) {
+ objToCounts.remove(obj);
+ objToCpn.remove(obj);
+ objToSpn.remove(obj);
+ objToCbn.remove(obj);
+ objToSbn.remove(obj);
+
+ }
+
+
+ }
+
+}
diff --git a/src/main/java/cn/ac/iie/utils/TopUtils.java b/src/main/java/cn/ac/iie/utils/TopUtils.java
new file mode 100644
index 0000000..c145fd1
--- /dev/null
+++ b/src/main/java/cn/ac/iie/utils/TopUtils.java
@@ -0,0 +1,22 @@
+package cn.ac.iie.utils;
+
+import cn.ac.iie.common.TopNCountConfig;
+import com.zdjizhi.utils.IpLookup;
+
+/**
+ * @author qidaijie
+ */
+public class TopUtils {
+ private static IpLookup ipLookup = new IpLookup.Builder(false)
+ .loadDataFileV4(TopNCountConfig.IP_LIBRARY + "Kazakhstan.mmdb")
+ .loadDataFileV6(TopNCountConfig.IP_LIBRARY + "Kazakhstan.mmdb")
+ .build();
+
+ public static boolean isKazakhstan(String ip) {
+ String[] ipAddress = ipLookup.cityLookupDetail(ip).split("\t");
+ String country = ipAddress[2];
+ return "Kazakhstan".equals(country) || "N/A".equals(country);
+// return "China".equals(country) || "N/A".equals(country);
+ }
+
+}
diff --git a/src/main/java/cn/ac/iie/utils/kafka/ResultToKafka.java b/src/main/java/cn/ac/iie/utils/kafka/ResultToKafka.java
new file mode 100644
index 0000000..1edd86f
--- /dev/null
+++ b/src/main/java/cn/ac/iie/utils/kafka/ResultToKafka.java
@@ -0,0 +1,79 @@
+package cn.ac.iie.utils.kafka;
+
+import cn.ac.iie.common.TopNCountConfig;
+import org.apache.kafka.clients.producer.*;
+import org.apache.log4j.Logger;
+
+import java.util.LinkedList;
+import java.util.Properties;
+
+/**
+ * NTC系统配置产生日志写入数据中心类
+ *
+ * @author Administrator
+ * @create 2018-08-13 15:11
+ */
+
+public class ResultToKafka {
+ private static Logger logger = Logger.getLogger(ResultToKafka.class);
+
+ /**
+ * kafka生产者,用于向kafka中发送消息
+ */
+ private static Producer<String, String> kafkaProducer;
+
+ /**
+ * kafka生产者适配器(单例),用来代理kafka生产者发送消息
+ */
+ private static ResultToKafka resultToKafka;
+
+ private ResultToKafka() {
+ initKafkaProducer();
+ }
+
+ public static ResultToKafka getInstance() {
+ if (resultToKafka == null) {
+ resultToKafka = new ResultToKafka();
+ }
+ return resultToKafka;
+ }
+
+
+ public void sendMessage(LinkedList<String> list) {
+ final int[] errorSum = {0};
+ for (String value : list) {
+ kafkaProducer.send(new ProducerRecord<>(TopNCountConfig.RESULTSOUTPUTTOPIC, value), new Callback() {
+ @Override
+ public void onCompletion(RecordMetadata metadata, Exception exception) {
+ if (exception != null) {
+ logger.error("发送kafka出现异常", exception);
+ errorSum[0]++;
+ }
+ }
+ });
+ if (errorSum[0] >= TopNCountConfig.MAX_FAILURE_NUM) {
+ logger.error("超出最大容忍错误数抛弃数据条数:" + list.size());
+ list.clear();
+ }
+ }
+ kafkaProducer.flush();
+ logger.warn("Log sent to National Center successfully!!!!!");
+ }
+
+ /**
+ * 根据kafka生产者配置信息初始化kafka消息生产者,只初始化一次
+ */
+ private void initKafkaProducer() {
+ Properties properties = new Properties();
+ properties.put("bootstrap.servers", TopNCountConfig.RESULTS_OUTPUT_SERVERS);
+ properties.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
+ properties.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
+ properties.put("acks", "1");
+ properties.put("linger.ms", "2");
+ properties.put("request.timeout.ms", 20000);
+ properties.put("batch.size", 262144);
+ properties.put("buffer.memory", 33554432);
+ kafkaProducer = new KafkaProducer<>(properties);
+ }
+
+}
diff --git a/src/main/java/cn/ac/iie/utils/system/TopNCountConfigurations.java b/src/main/java/cn/ac/iie/utils/system/TopNCountConfigurations.java
new file mode 100644
index 0000000..f39fa06
--- /dev/null
+++ b/src/main/java/cn/ac/iie/utils/system/TopNCountConfigurations.java
@@ -0,0 +1,65 @@
+package cn.ac.iie.utils.system;
+
+import java.util.Properties;
+
+
+public final class TopNCountConfigurations {
+
+ private static Properties propCommon = new Properties();
+ private static Properties propService = new Properties();
+
+ public static String getStringProperty(Integer type, String key) {
+ if (type == 0) {
+ return propCommon.getProperty(key);
+ } else if (type == 1) {
+ return propService.getProperty(key);
+ } else {
+ return null;
+ }
+ }
+
+
+ public static Integer getIntProperty(Integer type, String key) {
+ if (type == 0) {
+ return Integer.parseInt(propCommon.getProperty(key));
+ } else if (type == 1) {
+ return Integer.parseInt(propService.getProperty(key));
+ } else {
+ return null;
+ }
+ }
+
+ public static Long getLongProperty(Integer type, String key) {
+ if (type == 0) {
+ return Long.parseLong(propCommon.getProperty(key));
+ } else if (type == 1) {
+ return Long.parseLong(propService.getProperty(key));
+ } else {
+ return null;
+ }
+ }
+
+ public static Boolean getBooleanProperty(Integer type, String key) {
+ if (type == 0) {
+ return "true".equals(propCommon.getProperty(key).toLowerCase().trim());
+ } else if (type == 1) {
+ return "true".equals(propService.getProperty(key).toLowerCase().trim());
+ } else {
+ return null;
+ }
+ }
+
+ static {
+ try {
+ propCommon.load(TopNCountConfigurations.class.getClassLoader().getResourceAsStream("realtime_routine.properties"));
+ propService.load(TopNCountConfigurations.class.getClassLoader().getResourceAsStream("kafka_topic.properties"));
+ /*prop.load(new FileInputStream(System.getProperty("user.dir")
+ + File.separator + "config"+File.separator + "config.properties"));*/
+ System.out.println("kafka_topic.properties加载成功");
+ } catch (Exception e) {
+ propCommon = null;
+ propService = null;
+ System.err.println("Configuration loading failed!!");
+ }
+ }
+}
diff --git a/src/main/java/cn/ac/iie/utils/system/TupleUtils.java b/src/main/java/cn/ac/iie/utils/system/TupleUtils.java
new file mode 100644
index 0000000..22e12e5
--- /dev/null
+++ b/src/main/java/cn/ac/iie/utils/system/TupleUtils.java
@@ -0,0 +1,13 @@
+package cn.ac.iie.utils.system;
+
+import org.apache.storm.Constants;
+import org.apache.storm.tuple.Tuple;
+
+public final class TupleUtils {
+ //判断是否系统自动发送的Tuple
+ public static boolean isTick(Tuple tuple) {
+ return tuple != null
+ && Constants.SYSTEM_COMPONENT_ID.equals(tuple.getSourceComponent())
+ && Constants.SYSTEM_TICK_STREAM_ID.equals(tuple.getSourceStreamId());
+ }
+}
diff --git a/src/main/java/cn/ac/iie/utils/unique/UniqueCollection.java b/src/main/java/cn/ac/iie/utils/unique/UniqueCollection.java
new file mode 100644
index 0000000..5c438d9
--- /dev/null
+++ b/src/main/java/cn/ac/iie/utils/unique/UniqueCollection.java
@@ -0,0 +1,103 @@
+package cn.ac.iie.utils.unique;
+
+import org.apache.log4j.Logger;
+
+import java.util.*;
+
+/**
+ * @author qidaijie
+ */
+public class UniqueCollection {
+ private static final Logger logger = Logger.getLogger(UniqueCollection.class);
+
+ private static Map<String, Set<String>> urlCliUnique = new HashMap<>(16);
+
+ private static Map<String, Set<String>> urlSerUnique = new HashMap<>(16);
+
+
+ /**
+ * 客户端独立IP
+ *
+ * @param key url
+ * @param value client_ip
+ */
+ public static void setUrlCliUnique(String key, String value) {
+ if (urlCliUnique.containsKey(key)) {
+ Set<String> set = new HashSet<>(urlCliUnique.get(key));
+ set.add(value);
+ urlCliUnique.put(key, set);
+ } else {
+ urlCliUnique.put(key, Collections.singleton(value));
+ }
+ }
+
+ /**
+ * 服务端独立IP
+ *
+ * @param key url
+ * @param value server_ip
+ */
+ public static void setUrlSerUnique(String key, String value) {
+ if (urlSerUnique.containsKey(key)) {
+ Set<String> set = new HashSet<>(urlSerUnique.get(key));
+ set.add(value);
+ urlSerUnique.put(key, set);
+ } else {
+ urlSerUnique.put(key, Collections.singleton(value));
+ }
+ }
+
+
+ /**
+ * 根据key获取Cli ip 数
+ *
+ * @param key
+ * @return
+ */
+ public static long getUrlCliUnique(String key) {
+ int size = 0;
+ try {
+ size = urlCliUnique.get(key).size();
+ } catch (Exception e) {
+ logger.error("容器内没有key,获取value错误");
+ }
+ return size;
+ }
+
+ /**
+ * 根据key获取Ser ip 数
+ *
+ * @param key
+ * @return
+ */
+ public static long getUrlSerUnique(String key) {
+ int size = 0;
+ try {
+ size = urlSerUnique.get(key).size();
+ } catch (Exception e) {
+ logger.error("容器内没有key,获取value错误");
+ }
+ return size;
+ }
+
+
+ /**
+ * 清理容器
+ */
+ public static void clearWebContainer() {
+ urlCliUnique.clear();
+ urlSerUnique.clear();
+ }
+
+
+ /**
+ * 获取当前时间的时间戳(秒)
+ *
+ * @return 时间戳 秒
+ */
+ public static long getTimesTamp() {
+ Date date = new Date();
+ String timestamp = String.valueOf(date.getTime() / 1000);
+ return Integer.valueOf(timestamp);
+ }
+}
diff --git a/src/main/java/cn/ac/iie/utils/unique/bytecount/BytesSlidingWindowCounter.java b/src/main/java/cn/ac/iie/utils/unique/bytecount/BytesSlidingWindowCounter.java
new file mode 100644
index 0000000..781ae8f
--- /dev/null
+++ b/src/main/java/cn/ac/iie/utils/unique/bytecount/BytesSlidingWindowCounter.java
@@ -0,0 +1,60 @@
+package cn.ac.iie.utils.unique.bytecount;
+
+import cn.ac.iie.utils.SlotBasedCounter;
+
+import java.io.Serializable;
+import java.util.Map;
+
+/**
+ * @author qidaijie
+ */
+public final class BytesSlidingWindowCounter<T> implements Serializable {
+
+ private static final long serialVersionUID = 7540606699600042717L;
+ private BytesSlotBasedCounter<T> objCounter;
+ private int headSlot;
+ private int tailSlot;
+ private int windowLengthInSlots;
+
+ public BytesSlidingWindowCounter(int windowLengthInSlots) {
+ if (windowLengthInSlots < 2) {
+ throw new IllegalArgumentException("Window length in slots must be at least two (you requested "
+ + windowLengthInSlots + ")");
+ }
+ this.windowLengthInSlots = windowLengthInSlots;
+ this.objCounter = new BytesSlotBasedCounter<T>(this.windowLengthInSlots);
+
+ this.headSlot = 0;
+ this.tailSlot = slotAfter(headSlot);
+ }
+
+ public void incrementCount(T obj, long count, long cpn, long spn, long cbn, long sbn, long session) {
+ objCounter.incrementCount(obj, count, cpn, spn, cbn, sbn, session, headSlot);
+ }
+
+ /**
+ * Return the current (total) counts of all tracked objects, then advance the window.
+ * <p>
+ * Whenever this method is called, we consider the counts of the current sliding window to be available to and
+ * successfully processed "upstream" (i.e. by the caller). Knowing this we will start counting any subsequent
+ * objects within the next "chunk" of the sliding window.
+ *
+ * @return
+ */
+ public Map<T, Long[]> getCountsThenAdvanceWindow() {
+ Map<T, Long[]> objCountMap = objCounter.getCounts();
+ objCounter.wipeZeros();
+ objCounter.wipeSlot(tailSlot);
+ advanceHead();
+ return objCountMap;
+ }
+
+ private void advanceHead() {
+ headSlot = tailSlot;
+ tailSlot = slotAfter(tailSlot);
+ }
+
+ private int slotAfter(int slot) {
+ return (slot + 1) % windowLengthInSlots;
+ }
+}
diff --git a/src/main/java/cn/ac/iie/utils/unique/bytecount/BytesSlotBasedCounter.java b/src/main/java/cn/ac/iie/utils/unique/bytecount/BytesSlotBasedCounter.java
new file mode 100644
index 0000000..6a15549
--- /dev/null
+++ b/src/main/java/cn/ac/iie/utils/unique/bytecount/BytesSlotBasedCounter.java
@@ -0,0 +1,178 @@
+package cn.ac.iie.utils.unique.bytecount;
+
+import com.google.common.collect.Range;
+import com.google.common.collect.RangeMap;
+import com.google.common.collect.TreeRangeMap;
+
+import java.io.Serializable;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+
+/**
+ * @author ZDJZ
+ */
+public class BytesSlotBasedCounter<T> implements Serializable {
+ private static final long serialVersionUID = 4935449009558219987L;
+
+ private final Map<T, long[]> objToCounts = new HashMap<T, long[]>();
+ private final Map<T, long[]> objToCpn = new HashMap<T, long[]>();
+ private final Map<T, long[]> objToSpn = new HashMap<T, long[]>();
+ private final Map<T, long[]> objToCbn = new HashMap<T, long[]>();
+ private final Map<T, long[]> objToSbn = new HashMap<T, long[]>();
+ private final Map<T, long[]> objToSession = new HashMap<T, long[]>();
+
+ private final int numSlots;
+
+ public BytesSlotBasedCounter(int numSlots) {
+ if (numSlots <= 0) {
+ throw new IllegalArgumentException(" Number of slots must be greater than zero (you requested " + numSlots + ")");
+ }
+ this.numSlots = numSlots;
+ }
+
+ /**
+ * @param obj
+ * @param slot
+ */
+ public void incrementCount(T obj, long count, long cpn, long spn, long cbn, long sbn, long session, int slot) {
+ long[] counts = objToCounts.get(obj);
+ if (counts == null) {
+ counts = new long[this.numSlots];
+ objToCounts.put(obj, counts);
+ }
+ counts[slot] += count;
+
+ long[] cpns = objToCpn.get(obj);
+ if (cpns == null) {
+ cpns = new long[this.numSlots];
+ objToCpn.put(obj, cpns);
+ }
+ cpns[slot] += cpn;
+
+ long[] spns = objToSpn.get(obj);
+ if (spns == null) {
+ spns = new long[this.numSlots];
+ objToSpn.put(obj, spns);
+ }
+ spns[slot] += spn;
+
+ long[] cbns = objToCbn.get(obj);
+ if (cbns == null) {
+ cbns = new long[this.numSlots];
+ objToCbn.put(obj, cbns);
+ }
+ cbns[slot] += cbn;
+
+ long[] sbns = objToSbn.get(obj);
+ if (sbns == null) {
+ sbns = new long[this.numSlots];
+ objToSbn.put(obj, sbns);
+ }
+ sbns[slot] += sbn;
+
+ long[] sessions = objToSession.get(obj);
+ if (sessions == null) {
+ sessions = new long[this.numSlots];
+ objToSession.put(obj, sessions);
+ }
+ sessions[slot] += session;
+ }
+
+ public long getCount(T obj, int slot) {
+ long[] counts = objToCounts.get(obj);
+ if (counts == null) {
+ return 0;
+ } else {
+ return counts[slot];
+ }
+
+ }
+
+ public Map<T, Long[]> getCounts() {
+ Map<T, Long[]> result = new HashMap<T, Long[]>();
+ for (Map.Entry<T, long[]> entry : objToCounts.entrySet()) {
+ Long[] numbers = new Long[6];
+ numbers[0] = computerTotalCount(entry.getKey(), objToCounts);
+ numbers[1] = computerTotalCount(entry.getKey(), objToCpn);
+ numbers[2] = computerTotalCount(entry.getKey(), objToSpn);
+ numbers[3] = computerTotalCount(entry.getKey(), objToCbn);
+ numbers[4] = computerTotalCount(entry.getKey(), objToSbn);
+ numbers[5] = computerTotalCount(entry.getKey(), objToSession);
+ result.put(entry.getKey(), numbers);
+ }
+ return result;
+ }
+
+
+ private long computerTotalCount(T obj, Map<T, long[]> objTo) {
+ long[] counts = objTo.get(obj);
+ long total = 0;
+ for (long count : counts) {
+ total += count;
+ }
+ return total;
+ }
+
+
+ /**
+ * Reset the slot count of any tracked objects to zero for given slot
+ *
+ * @param slot
+ */
+ public void wipeSlot(int slot) {
+ for (T obj : objToCounts.keySet()) {
+ resetSlotCountToZero(obj, slot);
+ }
+
+ }
+
+ private void resetSlotCountToZero(T obj, int slot) {
+ long[] counts = objToCounts.get(obj);
+ long[] cpns = objToCpn.get(obj);
+ long[] spns = objToSpn.get(obj);
+ long[] cbls = objToCbn.get(obj);
+ long[] sbls = objToSbn.get(obj);
+ long[] sessions = objToSession.get(obj);
+
+ counts[slot] = 0;
+ cpns[slot] = 0;
+ spns[slot] = 0;
+ cbls[slot] = 0;
+ sbls[slot] = 0;
+ sessions[slot] = 0;
+
+ }
+
+
+ private boolean shouldBeRemovedfromCounter(T obj) {
+ return computerTotalCount(obj, objToCounts) == 0;
+ }
+
+ /**
+ * Remove any object from the counter whose total count is zero (to free up memory).
+ */
+ public void wipeZeros() {
+ Set<T> objToBeRemoved = new HashSet<T>();
+
+ for (T obj : objToCounts.keySet()) {
+ if (shouldBeRemovedfromCounter(obj)) {
+ objToBeRemoved.add(obj);
+ }
+ }
+
+ for (T obj : objToBeRemoved) {
+ objToCounts.remove(obj);
+ objToCpn.remove(obj);
+ objToSpn.remove(obj);
+ objToCbn.remove(obj);
+ objToSbn.remove(obj);
+ objToSession.remove(obj);
+
+ }
+
+
+ }
+
+}
diff --git a/src/main/java/cn/ac/iie/utils/unique/urldimension/UrlSlidingWindowCounter.java b/src/main/java/cn/ac/iie/utils/unique/urldimension/UrlSlidingWindowCounter.java
new file mode 100644
index 0000000..24e72d8
--- /dev/null
+++ b/src/main/java/cn/ac/iie/utils/unique/urldimension/UrlSlidingWindowCounter.java
@@ -0,0 +1,58 @@
+package cn.ac.iie.utils.unique.urldimension;
+
+import java.io.Serializable;
+import java.util.Map;
+
+/**
+ * @author qidaijie
+ */
+public final class UrlSlidingWindowCounter<T> implements Serializable {
+
+ private static final long serialVersionUID = 7540606699600042717L;
+ private UrlSlotBasedCounter<T> objCounter;
+ private int headSlot;
+ private int tailSlot;
+ private int windowLengthInSlots;
+
+ public UrlSlidingWindowCounter(int windowLengthInSlots) {
+ if (windowLengthInSlots < 2) {
+ throw new IllegalArgumentException("Window length in slots must be at least two (you requested "
+ + windowLengthInSlots + ")");
+ }
+ this.windowLengthInSlots = windowLengthInSlots;
+ this.objCounter = new UrlSlotBasedCounter<T>(this.windowLengthInSlots);
+
+ this.headSlot = 0;
+ this.tailSlot = slotAfter(headSlot);
+ }
+
+ public void incrementCount(T obj, long count,long cpn ,long spn,long cbn,long sbn,long cli,long ser) {
+ objCounter.incrementCount(obj,count,cpn,spn,cbn,sbn,cli,ser,headSlot);
+ }
+
+ /**
+ * Return the current (total) counts of all tracked objects, then advance the window.
+ * <p>
+ * Whenever this method is called, we consider the counts of the current sliding window to be available to and
+ * successfully processed "upstream" (i.e. by the caller). Knowing this we will start counting any subsequent
+ * objects within the next "chunk" of the sliding window.
+ *
+ * @return
+ */
+ public Map<T, Long[]> getCountsThenAdvanceWindow() {
+ Map<T,Long[]> objCountMap = objCounter.getCounts();
+ objCounter.wipeZeros();
+ objCounter.wipeSlot(tailSlot);
+ advanceHead();
+ return objCountMap;
+ }
+
+ private void advanceHead() {
+ headSlot = tailSlot;
+ tailSlot = slotAfter(tailSlot);
+ }
+
+ private int slotAfter(int slot) {
+ return (slot + 1) % windowLengthInSlots;
+ }
+}
diff --git a/src/main/java/cn/ac/iie/utils/unique/urldimension/UrlSlotBasedCounter.java b/src/main/java/cn/ac/iie/utils/unique/urldimension/UrlSlotBasedCounter.java
new file mode 100644
index 0000000..50b5ddb
--- /dev/null
+++ b/src/main/java/cn/ac/iie/utils/unique/urldimension/UrlSlotBasedCounter.java
@@ -0,0 +1,199 @@
+package cn.ac.iie.utils.unique.urldimension;
+
+import com.google.common.collect.Range;
+import com.google.common.collect.RangeMap;
+import com.google.common.collect.TreeRangeMap;
+
+import java.io.Serializable;
+import java.util.*;
+
+/**
+ * @author ZDJZ
+ */
+public class UrlSlotBasedCounter<T> implements Serializable {
+ private static final long serialVersionUID = 4935449009558219987L;
+
+ private final Map<T, long[]> objToCounts = new HashMap<>();
+ private final Map<T, long[]> objToCpn = new HashMap<>();
+ private final Map<T, long[]> objToSpn = new HashMap<>();
+ private final Map<T, long[]> objToCbn = new HashMap<>();
+ private final Map<T, long[]> objToSbn = new HashMap<>();
+ private final Map<T, long[]> objToCli = new HashMap<>();
+ private final Map<T, long[]> objToSer = new HashMap<>();
+
+ private final int numSlots;
+
+ public UrlSlotBasedCounter(int numSlots) {
+ if (numSlots <= 0) {
+ throw new IllegalArgumentException(" Number of slots must be greater than zero (you requested " + numSlots + ")");
+ }
+ this.numSlots = numSlots;
+ }
+
+ /**
+ * @param obj
+ * @param slot
+ */
+ public void incrementCount(T obj, long count, long cpn, long spn, long cbn, long sbn, long cli, long ser, int slot) {
+ long[] counts = objToCounts.get(obj);
+ if (counts == null) {
+ counts = new long[this.numSlots];
+ objToCounts.put(obj, counts);
+ }
+ counts[slot] += count;
+
+ long[] cpns = objToCpn.get(obj);
+ if (cpns == null) {
+ cpns = new long[this.numSlots];
+ objToCpn.put(obj, cpns);
+ }
+ cpns[slot] += cpn;
+
+ long[] spns = objToSpn.get(obj);
+ if (spns == null) {
+ spns = new long[this.numSlots];
+ objToSpn.put(obj, spns);
+ }
+ spns[slot] += spn;
+
+ long[] cbns = objToCbn.get(obj);
+ if (cbns == null) {
+ cbns = new long[this.numSlots];
+ objToCbn.put(obj, cbns);
+ }
+ cbns[slot] += cbn;
+
+ long[] sbns = objToSbn.get(obj);
+ if (sbns == null) {
+ sbns = new long[this.numSlots];
+ objToSbn.put(obj, sbns);
+ }
+ sbns[slot] += sbn;
+
+ long[] clis = objToCli.get(obj);
+ if (clis == null) {
+ clis = new long[this.numSlots];
+ objToCli.put(obj, clis);
+ }
+ clis[slot] = cli;
+
+
+ long[] sers = objToSer.get(obj);
+ if (sers == null) {
+ sers = new long[this.numSlots];
+ objToSer.put(obj, sers);
+ }
+ sers[slot] = ser;
+ }
+
+
+ public long getCount(T obj, int slot) {
+ long[] counts = objToCounts.get(obj);
+ if (counts == null) {
+ return 0;
+ } else {
+ return counts[slot];
+ }
+
+ }
+
+ public Map<T, Long[]> getCounts() {
+ Map<T, Long[]> result = new HashMap<T, Long[]>();
+ for (Map.Entry<T, long[]> entry : objToCounts.entrySet()) {
+ Long[] numbers = new Long[7];
+ numbers[0] = computerTotalCount(entry.getKey(), objToCounts);
+ numbers[1] = computerTotalCount(entry.getKey(), objToCpn);
+ numbers[2] = computerTotalCount(entry.getKey(), objToSpn);
+ numbers[3] = computerTotalCount(entry.getKey(), objToCbn);
+ numbers[4] = computerTotalCount(entry.getKey(), objToSbn);
+ numbers[5] = uniqueTotalCount(entry.getKey(), objToCli);
+ numbers[6] = uniqueTotalCount(entry.getKey(), objToSer);
+ result.put(entry.getKey(), numbers);
+ }
+ return result;
+ }
+
+ private long computerTotalCount(T obj, Map<T, long[]> objTo) {
+ long[] counts = objTo.get(obj);
+ long total = 0;
+ for (long count : counts) {
+ total += count;
+ }
+ System.out.println(Arrays.toString(counts));
+ return total;
+ }
+
+ private long uniqueTotalCount(T obj, Map<T, long[]> objTo) {
+ long[] counts = objTo.get(obj);
+ long total = 0;
+ for (long count : counts) {
+ if (count != 0 && count > total) {
+ total = count;
+ }
+ }
+ return total;
+ }
+
+ /**
+ * Reset the slot count of any tracked objects to zero for given slot
+ *
+ * @param slot
+ */
+ public void wipeSlot(int slot) {
+ for (T obj : objToCounts.keySet()) {
+ resetSlotCountToZero(obj, slot);
+ }
+
+ }
+
+ private void resetSlotCountToZero(T obj, int slot) {
+ long[] counts = objToCounts.get(obj);
+ long[] cpns = objToCpn.get(obj);
+ long[] spns = objToSpn.get(obj);
+ long[] cbls = objToCbn.get(obj);
+ long[] sbls = objToSbn.get(obj);
+ long[] clis = objToCli.get(obj);
+ long[] sers = objToSer.get(obj);
+
+ counts[slot] = 0;
+ cpns[slot] = 0;
+ spns[slot] = 0;
+ cbls[slot] = 0;
+ sbls[slot] = 0;
+ clis[slot] = 0;
+ sers[slot] = 0;
+
+ }
+
+
+ private boolean shouldBeRemovedfromCounter(T obj) {
+ return computerTotalCount(obj, objToCounts) == 0;
+ }
+
+ /**
+ * Remove any object from the counter whose total count is zero (to free up memory).
+ */
+ public void wipeZeros() {
+ Set<T> objToBeRemoved = new HashSet<T>();
+
+ for (T obj : objToCounts.keySet()) {
+ if (shouldBeRemovedfromCounter(obj)) {
+ objToBeRemoved.add(obj);
+ }
+ }
+
+ for (T obj : objToBeRemoved) {
+ objToCounts.remove(obj);
+ objToCpn.remove(obj);
+ objToSpn.remove(obj);
+ objToCbn.remove(obj);
+ objToSbn.remove(obj);
+ objToCli.remove(obj);
+ objToSer.remove(obj);
+
+ }
+
+
+ }
+
+}
diff --git a/src/main/java/cn/ac/iie/utils/unique/webdimension/WebSlidingWindowCounter.java b/src/main/java/cn/ac/iie/utils/unique/webdimension/WebSlidingWindowCounter.java
new file mode 100644
index 0000000..c6067a2
--- /dev/null
+++ b/src/main/java/cn/ac/iie/utils/unique/webdimension/WebSlidingWindowCounter.java
@@ -0,0 +1,60 @@
+package cn.ac.iie.utils.unique.webdimension;
+
+import cn.ac.iie.utils.unique.urldimension.UrlSlotBasedCounter;
+
+import java.io.Serializable;
+import java.util.Map;
+
+/**
+ * @author qidaijie
+ */
+public final class WebSlidingWindowCounter<T> implements Serializable {
+
+ private static final long serialVersionUID = 7540606699600042717L;
+ private WebSlotBasedCounter<T> objCounter;
+ private int headSlot;
+ private int tailSlot;
+ private int windowLengthInSlots;
+
+ public WebSlidingWindowCounter(int windowLengthInSlots) {
+ if (windowLengthInSlots < 2) {
+ throw new IllegalArgumentException("Window length in slots must be at least two (you requested "
+ + windowLengthInSlots + ")");
+ }
+ this.windowLengthInSlots = windowLengthInSlots;
+ this.objCounter = new WebSlotBasedCounter<T>(this.windowLengthInSlots);
+
+ this.headSlot = 0;
+ this.tailSlot = slotAfter(headSlot);
+ }
+
+ public void incrementCount(T obj, long count,long cpn ,long spn,long cbn,long sbn,long cli,long ser,long session) {
+ objCounter.incrementCount(obj,count,cpn,spn,cbn,sbn,cli,ser,session,headSlot);
+ }
+
+ /**
+ * Return the current (total) counts of all tracked objects, then advance the window.
+ * <p>
+ * Whenever this method is called, we consider the counts of the current sliding window to be available to and
+ * successfully processed "upstream" (i.e. by the caller). Knowing this we will start counting any subsequent
+ * objects within the next "chunk" of the sliding window.
+ *
+ * @return
+ */
+ public Map<T, Long[]> getCountsThenAdvanceWindow() {
+ Map<T,Long[]> objCountMap = objCounter.getCounts();
+ objCounter.wipeZeros();
+ objCounter.wipeSlot(tailSlot);
+ advanceHead();
+ return objCountMap;
+ }
+
+ private void advanceHead() {
+ headSlot = tailSlot;
+ tailSlot = slotAfter(tailSlot);
+ }
+
+ private int slotAfter(int slot) {
+ return (slot + 1) % windowLengthInSlots;
+ }
+}
diff --git a/src/main/java/cn/ac/iie/utils/unique/webdimension/WebSlotBasedCounter.java b/src/main/java/cn/ac/iie/utils/unique/webdimension/WebSlotBasedCounter.java
new file mode 100644
index 0000000..cced0b9
--- /dev/null
+++ b/src/main/java/cn/ac/iie/utils/unique/webdimension/WebSlotBasedCounter.java
@@ -0,0 +1,209 @@
+package cn.ac.iie.utils.unique.webdimension;
+
+import com.google.common.collect.Range;
+import com.google.common.collect.RangeMap;
+import com.google.common.collect.TreeRangeMap;
+
+import java.io.Serializable;
+import java.util.*;
+
+/**
+ * @author ZDJZ
+ */
+public class WebSlotBasedCounter<T> implements Serializable {
+ private static final long serialVersionUID = 4935449009558219987L;
+
+ private final Map<T, long[]> objToCounts = new HashMap<>();
+ private final Map<T, long[]> objToCpn = new HashMap<>();
+ private final Map<T, long[]> objToSpn = new HashMap<>();
+ private final Map<T, long[]> objToCbn = new HashMap<>();
+ private final Map<T, long[]> objToSbn = new HashMap<>();
+ private final Map<T, long[]> objToCli = new HashMap<>();
+ private final Map<T, long[]> objToSer = new HashMap<>();
+ private final Map<T, long[]> objToSession = new HashMap<>();
+
+ private final int numSlots;
+
+ public WebSlotBasedCounter(int numSlots) {
+ if (numSlots <= 0) {
+ throw new IllegalArgumentException(" Number of slots must be greater than zero (you requested " + numSlots + ")");
+ }
+ this.numSlots = numSlots;
+ }
+
+ /**
+ * @param obj
+ * @param slot
+ */
+ public void incrementCount(T obj, long count, long cpn, long spn, long cbn, long sbn, long cli, long ser, long session, int slot) {
+ long[] counts = objToCounts.get(obj);
+ if (counts == null) {
+ counts = new long[this.numSlots];
+ objToCounts.put(obj, counts);
+ }
+ counts[slot] += count;
+
+ long[] cpns = objToCpn.get(obj);
+ if (cpns == null) {
+ cpns = new long[this.numSlots];
+ objToCpn.put(obj, cpns);
+ }
+ cpns[slot] += cpn;
+
+ long[] spns = objToSpn.get(obj);
+ if (spns == null) {
+ spns = new long[this.numSlots];
+ objToSpn.put(obj, spns);
+ }
+ spns[slot] += spn;
+
+ long[] cbns = objToCbn.get(obj);
+ if (cbns == null) {
+ cbns = new long[this.numSlots];
+ objToCbn.put(obj, cbns);
+ }
+ cbns[slot] += cbn;
+
+ long[] sbns = objToSbn.get(obj);
+ if (sbns == null) {
+ sbns = new long[this.numSlots];
+ objToSbn.put(obj, sbns);
+ }
+ sbns[slot] += sbn;
+
+ long[] clis = objToCli.get(obj);
+ if (clis == null) {
+ clis = new long[this.numSlots];
+ objToCli.put(obj, clis);
+ }
+ clis[slot] = cli;
+
+
+ long[] sers = objToSer.get(obj);
+ if (sers == null) {
+ sers = new long[this.numSlots];
+ objToSer.put(obj, sers);
+ }
+ sers[slot] = ser;
+
+ long[] sessions = objToSession.get(obj);
+ if (sessions == null) {
+ sessions = new long[this.numSlots];
+ objToSession.put(obj, sessions);
+ }
+ sessions[slot] += session;
+ }
+
+ public long getCount(T obj, int slot) {
+ long[] counts = objToCounts.get(obj);
+ if (counts == null) {
+ return 0;
+ } else {
+ return counts[slot];
+ }
+
+ }
+
+ public Map<T, Long[]> getCounts() {
+ Map<T, Long[]> result = new HashMap<T, Long[]>();
+ for (Map.Entry<T, long[]> entry : objToCounts.entrySet()) {
+ Long[] numbers = new Long[8];
+ numbers[0] = computerTotalCount(entry.getKey(), objToCounts);
+ numbers[1] = computerTotalCount(entry.getKey(), objToCpn);
+ numbers[2] = computerTotalCount(entry.getKey(), objToSpn);
+ numbers[3] = computerTotalCount(entry.getKey(), objToCbn);
+ numbers[4] = computerTotalCount(entry.getKey(), objToSbn);
+ numbers[5] = uniqueTotalCount(entry.getKey(), objToCli);
+ numbers[6] = uniqueTotalCount(entry.getKey(), objToSer);
+ numbers[7] = computerTotalCount(entry.getKey(), objToSession);
+ result.put(entry.getKey(), numbers);
+ }
+ return result;
+ }
+
+ private long computerTotalCount(T obj, Map<T, long[]> objTo) {
+ long[] counts = objTo.get(obj);
+ long total = 0;
+ for (long count : counts) {
+ total += count;
+ }
+ return total;
+ }
+
+ private long uniqueTotalCount(T obj, Map<T, long[]> objTo) {
+ long[] counts = objTo.get(obj);
+ long total = 0;
+ for (long count : counts) {
+ if (count != 0 && count > total) {
+ total = count;
+ }
+ }
+ return total;
+ }
+
+ /**
+ * Reset the slot count of any tracked objects to zero for given slot
+ *
+ * @param slot
+ */
+ public void wipeSlot(int slot) {
+ for (T obj : objToCounts.keySet()) {
+ resetSlotCountToZero(obj, slot);
+ }
+
+ }
+
+ private void resetSlotCountToZero(T obj, int slot) {
+ long[] counts = objToCounts.get(obj);
+ long[] cpns = objToCpn.get(obj);
+ long[] spns = objToSpn.get(obj);
+ long[] cbls = objToCbn.get(obj);
+ long[] sbls = objToSbn.get(obj);
+ long[] clis = objToCli.get(obj);
+ long[] sers = objToSer.get(obj);
+ long[] sessions = objToSession.get(obj);
+
+ counts[slot] = 0;
+ cpns[slot] = 0;
+ spns[slot] = 0;
+ cbls[slot] = 0;
+ sbls[slot] = 0;
+ clis[slot] = 0;
+ sers[slot] = 0;
+ sessions[slot] = 0;
+
+ }
+
+
+ private boolean shouldBeRemovedfromCounter(T obj) {
+ return computerTotalCount(obj, objToCounts) == 0;
+ }
+
+ /**
+ * Remove any object from the counter whose total count is zero (to free up memory).
+ */
+ public void wipeZeros() {
+ Set<T> objToBeRemoved = new HashSet<T>();
+
+ for (T obj : objToCounts.keySet()) {
+ if (shouldBeRemovedfromCounter(obj)) {
+ objToBeRemoved.add(obj);
+ }
+ }
+
+ for (T obj : objToBeRemoved) {
+ objToCounts.remove(obj);
+ objToCpn.remove(obj);
+ objToSpn.remove(obj);
+ objToCbn.remove(obj);
+ objToSbn.remove(obj);
+ objToCli.remove(obj);
+ objToSer.remove(obj);
+ objToSession.remove(obj);
+
+ }
+
+
+ }
+
+}
diff --git a/src/main/java/log4j.properties b/src/main/java/log4j.properties
new file mode 100644
index 0000000..c0f5ea4
--- /dev/null
+++ b/src/main/java/log4j.properties
@@ -0,0 +1,23 @@
+#Log4j
+log4j.rootLogger=info,console,file
+# 控制台日志设置
+log4j.appender.console=org.apache.log4j.ConsoleAppender
+log4j.appender.console.Threshold=info
+log4j.appender.console.layout=org.apache.log4j.PatternLayout
+log4j.appender.console.layout.ConversionPattern=[%d{yyyy-MM-dd HH\:mm\:ss}] [%-5p] [Thread\:%t] %l %x - <%m>%n
+
+# 文件日志设置
+log4j.appender.file=org.apache.log4j.DailyRollingFileAppender
+log4j.appender.file.Threshold=info
+log4j.appender.file.encoding=UTF-8
+log4j.appender.file.Append=true
+#路径请用相对路径,做好相关测试输出到应用目下
+log4j.appender.file.file=${nis.root}/log/galaxy-name.log
+log4j.appender.file.DatePattern='.'yyyy-MM-dd
+log4j.appender.file.layout=org.apache.log4j.PatternLayout
+#log4j.appender.file.layout.ConversionPattern=%d{HH:mm:ss} %X{ip} [%t] %5p %c{1} %m%n
+log4j.appender.file.layout.ConversionPattern=[%d{yyyy-MM-dd HH\:mm\:ss}] [%-5p] %X{ip} [Thread\:%t] %l %x - %m%n
+#MyBatis 配置,com.nis.web.dao是mybatis接口所在包
+log4j.logger.com.nis.web.dao=debug
+#bonecp数据源配置
+log4j.category.com.jolbox=debug,console \ No newline at end of file
diff --git a/src/test/java/cn/ac/iie/test/ContainerTest.java b/src/test/java/cn/ac/iie/test/ContainerTest.java
new file mode 100644
index 0000000..156321e
--- /dev/null
+++ b/src/test/java/cn/ac/iie/test/ContainerTest.java
@@ -0,0 +1,16 @@
+package cn.ac.iie.test;
+
+import com.alibaba.fastjson.JSON;
+import com.alibaba.fastjson.JSONObject;
+import com.zdjizhi.utils.StringUtil;
+
+import java.util.HashSet;
+import java.util.Set;
+
+public class ContainerTest {
+ public static void main(String[] args) {
+ System.out.println(Long.MAX_VALUE);
+ System.out.println(Long.MIN_VALUE);
+
+ }
+}
diff --git a/src/test/java/cn/ac/iie/test/IpTest.java b/src/test/java/cn/ac/iie/test/IpTest.java
new file mode 100644
index 0000000..c60679d
--- /dev/null
+++ b/src/test/java/cn/ac/iie/test/IpTest.java
@@ -0,0 +1,17 @@
+package cn.ac.iie.test;
+
+import cn.ac.iie.common.TopNCountConfig;
+import com.zdjizhi.utils.IpLookup;
+
+public class IpTest {
+
+ public static void main(String[] args) {
+ IpLookup ipLookup = new IpLookup.Builder(false)
+ .loadDataFileV4("Kazakhstan.mmdb")
+ .loadDataFileV6("Kazakhstan.mmdb")
+ .build();
+
+ System.out.println(ipLookup.cityLookupDetail("2001:470:19:790::7f"));
+// System.out.println(ipLookup.asnLookup("2001:470:19:790::7f"));
+ }
+}
diff --git a/src/test/java/cn/ac/iie/test/TimeTest.java b/src/test/java/cn/ac/iie/test/TimeTest.java
new file mode 100644
index 0000000..d7e634e
--- /dev/null
+++ b/src/test/java/cn/ac/iie/test/TimeTest.java
@@ -0,0 +1,42 @@
+package cn.ac.iie.test;
+
+import scala.Int;
+
+import java.text.ParseException;
+import java.text.SimpleDateFormat;
+import java.util.Date;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Set;
+
+public class TimeTest {
+ public static void main(String[] args) {
+ long timestamp = System.currentTimeMillis() / 1000;
+
+ Map<Integer, Long> startMap = new HashMap<>();
+ Map<Integer, Long> endMap = new HashMap<>();
+ for (int j = 1; j <= 10; j++) {
+ endMap.put(j, 0L);
+ startMap.put(j, 0L);
+ }
+ System.out.println(endMap);
+ long nowTime = timestamp - dateToStamp("2019-05-20 09:33:00") ;
+
+ System.out.println(nowTime);
+ double l = nowTime / 30;
+ System.out.println(l);
+ }
+
+ private static long dateToStamp(String time) {
+ long res = 0;
+ SimpleDateFormat simpleDateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
+ try {
+ Date date = simpleDateFormat.parse(time);
+ long ts = date.getTime() / 1000;
+ res = Long.parseLong(String.valueOf(ts));
+ } catch (ParseException e) {
+ e.printStackTrace();
+ }
+ return res;
+ }
+}