diff options
51 files changed, 4341 insertions, 0 deletions
<!-- Maven build for the log-stream-topn Storm topology. (Hunk marker from the flattened diff: @@ -0,0 +1,159 @@) -->
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>cn.ac.iie</groupId>
    <artifactId>log-stream-topn</artifactId>
    <version>0.0.1-SNAPSHOT</version>
    <packaging>jar</packaging>

    <name>log-stream-topn</name>
    <url>http://maven.apache.org</url>

    <repositories>
        <repository>
            <id>nexus</id>
            <name>Team Nexus Repository</name>
            <url>http://192.168.10.125:8099/content/groups/public</url>
        </repository>
    </repositories>

    <build>
        <plugins>

            <!-- Shades the project into a single jar at package time and sets the manifest main class. -->
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-shade-plugin</artifactId>
                <version>2.4.2</version>
                <executions>
                    <execution>
                        <phase>package</phase>
                        <goals>
                            <goal>shade</goal>
                        </goals>
                        <configuration>
                            <transformers>
                                <transformer
                                        implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
                                    <mainClass>cn.ac.iie.topology.LogTopCountTopology</mainClass>
                                </transformer>
                                <!-- Append (rather than overwrite) the Spring descriptor files found in several jars. -->
                                <transformer
                                        implementation="org.apache.maven.plugins.shade.resource.AppendingTransformer">
                                    <resource>META-INF/spring.handlers</resource>
                                </transformer>
                                <transformer
                                        implementation="org.apache.maven.plugins.shade.resource.AppendingTransformer">
                                    <resource>META-INF/spring.schemas</resource>
                                </transformer>
                            </transformers>
                        </configuration>
                    </execution>
                </executions>
            </plugin>


            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <version>2.3.2</version>
                <configuration>
                    <source>1.8</source>
                    <target>1.8</target>
                </configuration>
            </plugin>
        </plugins>
        <resources>
            <!-- Runtime configuration lives in the top-level "properties" directory. -->
            <resource>
                <directory>properties</directory>
                <includes>
                    <include>**/*.properties</include>
                </includes>
                <filtering>false</filtering>
            </resource>
            <resource>
                <directory>src/main/java</directory>
                <includes>
                    <include>log4j.properties</include>
                </includes>
                <filtering>false</filtering>
            </resource>
        </resources>
    </build>

    <properties>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <kafka.version>1.0.0</kafka.version>
        <storm.version>1.0.2</storm.version>
    </properties>

    <dependencies>
        <!-- https://mvnrepository.com/artifact/org.apache.kafka/kafka -->
        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka_2.11</artifactId>
            <!-- Uses the declared property so the Kafka version is defined in exactly one place. -->
            <version>${kafka.version}</version>
            <exclusions>
                <exclusion>
                    <artifactId>slf4j-log4j12</artifactId>
                    <groupId>org.slf4j</groupId>
                </exclusion>
            </exclusions>
        </dependency>

        <!-- "provided": not bundled into the shaded jar. -->
        <dependency>
            <groupId>org.apache.storm</groupId>
            <artifactId>storm-core</artifactId>
            <version>${storm.version}</version>
            <scope>provided</scope>
            <exclusions>
                <exclusion>
                    <artifactId>log4j-over-slf4j</artifactId>
                    <groupId>org.slf4j</groupId>
                </exclusion>
            </exclusions>
        </dependency>

        <dependency>
            <groupId>commons-collections</groupId>
            <artifactId>commons-collections</artifactId>
            <version>3.2.1</version>
        </dependency>

        <dependency>
            <groupId>org.apache.storm</groupId>
            <artifactId>storm-kafka</artifactId>
            <version>${storm.version}</version>
        </dependency>

        <dependency>
            <groupId>junit</groupId>
            <artifactId>junit</artifactId>
            <version>3.8.1</version>
            <scope>test</scope>
        </dependency>

        <!-- NOTE(review): fastjson 1.2.47 is covered by published autoType deserialization advisories;
             consider upgrading (1.2.83 or later) if any code path parses untrusted JSON. Version left unchanged here. -->
        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>fastjson</artifactId>
            <version>1.2.47</version>
        </dependency>


        <dependency>
            <groupId>com.zdjizhi</groupId>
            <artifactId>galaxy</artifactId>
            <version>1.0.1</version>
            <exclusions>
                <exclusion>
                    <artifactId>slf4j-log4j12</artifactId>
                    <groupId>org.slf4j</groupId>
                </exclusion>
                <exclusion>
                    <artifactId>log4j-over-slf4j</artifactId>
                    <groupId>org.slf4j</groupId>
                </exclusion>
            </exclusions>
        </dependency>

    </dependencies>
</project>
<!-- next file in this dump: properties/kafka_topic.properties -->
# ---- file: properties/kafka_topic.properties (new file mode 100644, index 0000000..6850851, hunk @@ -0,0 +1,45 @@) ----
# Name of the topic on the Kafka broker to consume from
kafka.topic=SESSION-RECORD-COMPLETED-LOG

# Kafka consumer group id
group.id=top-test

# Output topic for the results
results.output.topics=DruidTest

# Which ranking to compute:
# 1: Internal (internal hosts)   2: External (external hosts)
# 3: Website (domains)   4: Urls (HTTP/HTTPS)   5: User (active users)
pattern.num=3

# storm topology workers
topology.workers=1

# storm topology rolling count window length in seconds
topology.top.window.length.secs=300

# storm topology rolling count emit frequency in seconds
topology.top.emit.frequency.secs=60

# storm topology intermediate rank emit frequency in seconds
topology.intermediate.emit.frequency.secs=70

# storm topology total rank emit frequency in seconds
# NOTE(review): the key is spelled "taotal" (sic). It is left unchanged because it is looked up by name;
# confirm against the code that reads it before renaming.
topology.taotal.emit.frequency.secs=300

# storm topology intermediate top N
topology.top.intermediate.n = 10

# storm topology total top N
topology.top.total.n = 3

# storm topology spout parallelism_hint; usually 1:1 with the number of Kafka partitions
topology.spout.parallelism=3

# storm bolt InternalBolt parallelism_hint
topology.bolt.check.parallelism=3

# storm bolt InternalCountBolt parallelism_hint
topology.bolt.count.parallelism=3

# storm bolt IntermediateRankingsBolt parallelism_hint
topology.bolt.interRanker.parallelism=3
# ---- file: properties/realtime_routine.properties (new file mode 100644, index 0000000..b82ca49, hunk @@ -0,0 +1,21 @@) ----
# Kafka bootstrap server addresses (input)
bootstrap.servers=192.168.40.119:9092,192.168.40.122:9092,192.168.40.123:9092
#bootstrap.servers=10.4.35.7:9092,10.4.35.8:9092,10.4.35.9:9092

# Where to start reading from Kafka: earliest/latest
auto.offset.reset=latest
#auto.offset.reset=earliest

# Kafka server addresses for the output
results.output.servers=192.168.40.119:9092,192.168.40.122:9092,192.168.40.123:9092
#results.output.servers=10.4.35.7:9092,10.4.35.8:9092,10.4.35.9:9092

topology.config.max.spout.pending=150000

topology.num.acks=1

# Maximum number of failed sends to Kafka that is tolerated
max.failure.num=20

# Path of the location (IP lookup) library
ip.library=/home/ceiec/topology/dat/
// \ No newline at end of file   (marker belonging to the preceding properties file)
// ---- file: src/main/java/cn/ac/iie/bolt/AbstractRankerBolt.java (new file mode 100644, index 0000000..757370a, hunk @@ -0,0 +1,96 @@) ----
package cn.ac.iie.bolt;


import cn.ac.iie.utils.Rankings;
import com.zdjizhi.utils.StringUtil;
import org.apache.log4j.Logger;
import org.apache.storm.Config;
import org.apache.storm.topology.BasicOutputCollector;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseBasicBolt;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Tuple;
import org.apache.storm.tuple.Values;
import org.apache.storm.utils.TupleUtils;

import java.util.HashMap;
import java.util.Map;


/**
 * Base bolt that keeps a {@link Rankings} instance and publishes it on tick tuples.
 *
 * <p>Regular tuples are handed to {@link #updateRankingsWithTuple(Tuple)}. On a tick tuple a copy of
 * the rankings is emitted on the single output field {@code "rankings"} and the ranked items are
 * then cleared; nothing is emitted while the rankings are empty.
 *
 * @author qidaijie
 */
public abstract class AbstractRankerBolt extends BaseBasicBolt {

    private static final long serialVersionUID = 5810280112411348355L;
    private static final int DEFAULT_COUNT = 10;
    private static final int DEFAULT_EMIT_FREQUENCY_IN_SECONDS = 2;

    /** Tick-tuple frequency requested from Storm for this component. */
    private final int emitFrequencyInSeconds;
    private Rankings rankings;

    /** Creates a ranker with the default top-N size and the default emit frequency. */
    public AbstractRankerBolt() {
        this(DEFAULT_COUNT, DEFAULT_EMIT_FREQUENCY_IN_SECONDS);
    }

    /**
     * Creates a ranker with the default emit frequency.
     *
     * @param topN size passed to the {@link Rankings} constructor; must be at least 1
     */
    public AbstractRankerBolt(int topN) {
        this(topN, DEFAULT_EMIT_FREQUENCY_IN_SECONDS);
    }

    /**
     * @param topN                   size passed to the {@link Rankings} constructor; must be at least 1
     * @param emitFrequencyInSeconds tick-tuple frequency in seconds; must be at least 1
     * @throws IllegalArgumentException if either argument is smaller than 1
     */
    public AbstractRankerBolt(int topN, int emitFrequencyInSeconds) {
        if (topN < 1) {
            throw new IllegalArgumentException("topN must be >=1 (you requested " + topN + ")");
        }
        if (emitFrequencyInSeconds < 1) {
            throw new IllegalArgumentException("The time frequency must be >=1 (you requested " + emitFrequencyInSeconds + " seconds)");
        }
        this.rankings = new Rankings(topN);
        this.emitFrequencyInSeconds = emitFrequencyInSeconds;
    }

    /** @return the rankings instance maintained by this bolt */
    public Rankings getRankings() {
        return rankings;
    }

    /**
     * Routes data tuples to the subclass and publishes the rankings on tick tuples.
     */
    @Override
    public void execute(Tuple input, BasicOutputCollector collector) {
        if (!TupleUtils.isTick(input)) {
            updateRankingsWithTuple(input);
            return;
        }
        // Tick tuple: nothing to publish until at least one item has been ranked.
        if (StringUtil.isEmpty(rankings.getRankings())) {
            return;
        }
        emitRankings(collector);
    }

    /** Folds one data tuple into the rankings; implemented by each concrete ranker. */
    abstract void updateRankingsWithTuple(Tuple tuple);

    /** Emits a copy of the current rankings, then clears the ranked items. */
    private void emitRankings(BasicOutputCollector collector) {
        Values snapshot = new Values(rankings.copy());
        collector.emit(snapshot);
        rankings.clearRankedItem();
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("rankings"));
    }

    /** @return the logger of the concrete subclass */
    abstract Logger getLogger();

    /** Asks Storm to deliver a tick tuple to this component every {@code emitFrequencyInSeconds} seconds. */
    @Override
    public Map<String, Object> getComponentConfiguration() {
        Map<String, Object> tickConfig = new HashMap<String, Object>();
        tickConfig.put(Config.TOPOLOGY_TICK_TUPLE_FREQ_SECS, emitFrequencyInSeconds);
        return tickConfig;
    }

}
// ---- file: src/main/java/cn/ac/iie/bolt/IntermediateRankingsBolt.java (new file mode 100644, index 0000000..abbc391, hunk @@ -0,0 +1,31 @@) ----
package cn.ac.iie.bolt;

import cn.ac.iie.utils.Rankable;
import cn.ac.iie.utils.RankableObjectWithFields;
import org.apache.log4j.Logger;
import org.apache.storm.tuple.Tuple;

/**
 * Ranker that converts each incoming tuple into a {@link Rankable} and merges it into the rankings.
 *
 * @author qidaijie
 */
public class IntermediateRankingsBolt extends AbstractRankerBolt {

    private static final long serialVersionUID = -1954290502931145364L;
    private static final Logger logger = Logger.getLogger(IntermediateRankingsBolt.class);

    /**
     * @param topN                   forwarded to {@link AbstractRankerBolt}
     * @param emitFrequencyInSeconds forwarded to {@link AbstractRankerBolt}
     */
    public IntermediateRankingsBolt(int topN, int emitFrequencyInSeconds) {
        super(topN, emitFrequencyInSeconds);
    }

    /** Builds a rankable from the tuple via {@code RankableObjectWithFields.from} and updates the rankings with it. */
    @Override
    void updateRankingsWithTuple(Tuple tuple) {
        Rankable fromTuple = RankableObjectWithFields.from(tuple);
        super.getRankings().updateWith(fromTuple);
    }

    @Override
    Logger getLogger() {
        return logger;
    }
}
// diff --git
// ---- file: src/main/java/cn/ac/iie/bolt/TotalRankingsBolt.java (new file mode 100644, index 0000000..5f7e734, hunk @@ -0,0 +1,43 @@) ----
package cn.ac.iie.bolt;

import cn.ac.iie.utils.Rankings;
import org.apache.log4j.Logger;
import org.apache.storm.tuple.Tuple;

/**
 * Ranker that merges {@link Rankings} objects received from upstream into one total ranking.
 *
 * @author qidaijie
 */
public class TotalRankingsBolt extends AbstractRankerBolt {

    private static final Logger logger = Logger.getLogger(TotalRankingsBolt.class);
    private static final long serialVersionUID = 6256903686188823001L;

    public TotalRankingsBolt() {
        super();
    }

    public TotalRankingsBolt(int topN) {
        super(topN);
    }


    public TotalRankingsBolt(int topN, int emitFrequencyInSeconds) {
        super(topN, emitFrequencyInSeconds);
    }

    /**
     * Merges the {@link Rankings} carried in field 0 of the tuple into the total rankings and
     * prunes zero-count entries.
     */
    @Override
    void updateRankingsWithTuple(Tuple tuple) {
        Rankings rankingsToBeMerged = (Rankings) tuple.getValue(0);
        Rankings total = super.getRankings();
        total.updateWith(rankingsToBeMerged);
        total.pruneZeroCounts();

        // Guarded so the rankings string is not built for every tuple when INFO logging is disabled.
        if (logger.isInfoEnabled()) {
            logger.info("**********汇总中间统计结果开始****************");
            logger.info(total.toString());
            logger.info("**********汇总中间统计结果结束****************");
        }
    }

    @Override
    Logger getLogger() {
        return logger;
    }
}
// ---- file: src/main/java/cn/ac/iie/bolt/resultoutput/ExternalOutPutBolt.java (new file mode 100644, index 0000000..e22aba9, hunk @@ -0,0 +1,128 @@) ----
package cn.ac.iie.bolt.resultoutput;

import cn.ac.iie.model.ExternalHost;
import cn.ac.iie.utils.*;
import cn.ac.iie.utils.kafka.ResultToKafka;
import cn.ac.iie.utils.system.TupleUtils;
import cn.ac.iie.utils.unique.UniqueCollection;
import com.alibaba.fastjson.JSONObject;
import com.zdjizhi.utils.StringUtil;
import org.apache.log4j.Logger;
import org.apache.storm.task.OutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseRichBolt;
import org.apache.storm.tuple.Tuple;

import java.util.LinkedList;
import java.util.List;
import java.util.Map;


/**
 * Terminal bolt that serializes external-host rankings to JSON and passes them to {@link ResultToKafka}.
 *
 * @author Administrator
 */
public class ExternalOutPutBolt extends BaseRichBolt {

    private static final long serialVersionUID = -7466372895556294634L;
    private static final Logger logger = Logger.getLogger(ExternalOutPutBolt.class);
    private OutputCollector collector;
    private ResultToKafka resultToKafka;

    /** Declares nothing: this bolt emits no tuples. */
    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {

    }

    @Override
    public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
        this.collector = collector;
        resultToKafka = ResultToKafka.getInstance();
    }

    /**
     * Sends the {@link Rankings} in field 0 of a data tuple to Kafka and acks the tuple.
     * Tick tuples are ignored; any exception fails the tuple.
     */
    @Override
    public void execute(Tuple tuple) {
        try {
            if (TupleUtils.isTick(tuple)) {
                return;
            }
            Rankings rankings = (Rankings) tuple.getValue(0);
            if (StringUtil.isEmpty(rankings.getRankings())) {
                logger.warn("rankings is null, don't insert...");
            } else {
                resultToKafka.sendMessage(parserRankingToObject(rankings));
            }
            collector.ack(tuple);
        } catch (Exception e) {
            // Pass the throwable itself so the stack trace is logged, not just its toString().
            logger.error("batch insert error:", e);
            collector.fail(tuple);
        }
    }

    /**
     * Converts each ranked item into an {@link ExternalHost} JSON string.
     *
     * <p>The ranked object becomes the destination, the rankable's count becomes the session number,
     * and fields 0-3 are written to the c2s/s2c packet and byte counters in that order. An item that
     * cannot be converted is logged and skipped.
     *
     * @param rankings rankings to convert
     * @return one JSON string per successfully converted item, in iteration order
     */
    private LinkedList<String> parserRankingToObject(Rankings rankings) {
        LinkedList<String> activeIpList = new LinkedList<>();
        long timesTamp = UniqueCollection.getTimesTamp();
        // One bean is reused; every property is overwritten before each serialization.
        ExternalHost externalHost = new ExternalHost();
        for (Rankable rankable : rankings.getRankings()) {
            try {
                List<Object> obj = rankable.getFields();
                String key = rankable.getObject().toString();
                externalHost.setDestination(key);
                externalHost.setProtocol("");
                externalHost.setSession_num(rankable.getCount());
                externalHost.setC2s_pkt_num((Long) obj.get(0));
                externalHost.setS2c_pkt_num((Long) obj.get(1));
                externalHost.setC2s_byte_num((Long) obj.get(2));
                externalHost.setS2c_byte_num((Long) obj.get(3));
                externalHost.setStat_time(timesTamp);
                activeIpList.add(JSONObject.toJSONString(externalHost));
            } catch (Exception e) {
                logger.error("外部主机统计解析数据异常:" + rankable.toString(), e);
            }
        }
        return activeIpList;
    }
}
// ---- file: src/main/java/cn/ac/iie/bolt/resultoutput/InternalOutPutBolt.java (new file mode 100644, index 0000000..63685a2, hunk @@ -0,0 +1,97 @@) ----
package cn.ac.iie.bolt.resultoutput;

import cn.ac.iie.model.InternalHost;
import cn.ac.iie.utils.*;
import cn.ac.iie.utils.kafka.ResultToKafka;
import cn.ac.iie.utils.system.TupleUtils;
import cn.ac.iie.utils.unique.UniqueCollection;
import com.alibaba.fastjson.JSONObject;
import com.zdjizhi.utils.StringUtil;
import org.apache.log4j.Logger;
import org.apache.storm.task.OutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseRichBolt;
import org.apache.storm.tuple.Tuple;

import java.util.LinkedList;
import java.util.List;
import java.util.Map;


/**
 * Terminal bolt that serializes internal-host rankings to JSON and passes them to {@link ResultToKafka}.
 *
 * @author Administrator
 */
public class InternalOutPutBolt extends BaseRichBolt {

    private static final long serialVersionUID = -7466372895556294634L;
    private static final Logger logger = Logger.getLogger(InternalOutPutBolt.class);
    private OutputCollector collector;
    private ResultToKafka resultToKafka;

    /** Declares nothing: this bolt emits no tuples. */
    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {

    }

    @Override
    public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
        this.collector = collector;
        resultToKafka = ResultToKafka.getInstance();
    }

    /**
     * Sends the {@link Rankings} in field 0 of a data tuple to Kafka and acks the tuple.
     * Tick tuples are ignored; any exception fails the tuple.
     */
    @Override
    public void execute(Tuple tuple) {
        try {
            if (TupleUtils.isTick(tuple)) {
                return;
            }
            Rankings rankings = (Rankings) tuple.getValue(0);
            if (StringUtil.isEmpty(rankings.getRankings())) {
                logger.warn("rankings is null, don't insert...");
            } else {
                resultToKafka.sendMessage(parserRankingToObject(rankings));
            }
            collector.ack(tuple);
        } catch (Exception e) {
            // Pass the throwable itself so the stack trace is logged, not just its toString().
            logger.error("batch insert error:", e);
            collector.fail(tuple);
        }
    }

    /**
     * Converts each ranked item into an {@link InternalHost} JSON string.
     *
     * <p>The ranked object becomes the source, the rankable's count becomes the session number, and
     * fields 0-3 are written to the c2s/s2c packet and byte counters in that order. An item that
     * cannot be converted is logged and skipped.
     *
     * @param rankings rankings to convert
     * @return one JSON string per successfully converted item, in iteration order
     */
    private LinkedList<String> parserRankingToObject(Rankings rankings) {
        LinkedList<String> activeIpList = new LinkedList<>();
        long timesTamp = UniqueCollection.getTimesTamp();
        // One bean is reused; every property is overwritten before each serialization.
        InternalHost internalHost = new InternalHost();
        for (Rankable rankable : rankings.getRankings()) {
            try {
                List<Object> obj = rankable.getFields();
                internalHost.setSource(rankable.getObject().toString());
                internalHost.setSession_num(rankable.getCount());
                internalHost.setC2s_pkt_num((Long) obj.get(0));
                internalHost.setS2c_pkt_num((Long) obj.get(1));
                internalHost.setC2s_byte_num((Long) obj.get(2));
                internalHost.setS2c_byte_num((Long) obj.get(3));
                internalHost.setStat_time(timesTamp);
                activeIpList.add(JSONObject.toJSONString(internalHost));
            } catch (Exception e) {
                logger.error("内部主机统计解析数据异常:" + rankable.toString(), e);
            }

        }
        return activeIpList;
    }

}
// ---- file: src/main/java/cn/ac/iie/bolt/resultoutput/UrlsOutPutBolt.java (new file mode 100644, index 0000000..1086d5a, hunk @@ -0,0 +1,91 @@) ----
package cn.ac.iie.bolt.resultoutput;

import cn.ac.iie.model.Urls;
import cn.ac.iie.utils.*;
import cn.ac.iie.utils.kafka.ResultToKafka;
import cn.ac.iie.utils.system.TupleUtils;
import cn.ac.iie.utils.unique.UniqueCollection;
import com.alibaba.fastjson.JSONObject;
import com.zdjizhi.utils.StringUtil;
import org.apache.log4j.Logger;
import org.apache.storm.task.OutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseRichBolt;
import org.apache.storm.tuple.Tuple;

import java.util.LinkedList;
import java.util.List;
import java.util.Map;


/**
 * Terminal bolt that serializes HTTP/HTTPS URL rankings to JSON and passes them to {@link ResultToKafka}.
 *
 * @author Administrator
 */
public class UrlsOutPutBolt extends BaseRichBolt {

    private static final long serialVersionUID = -7466372895556294634L;
    private static final Logger logger = Logger.getLogger(UrlsOutPutBolt.class);
    private OutputCollector collector;
    private ResultToKafka resultToKafka;

    /** Declares nothing: this bolt emits no tuples. */
    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {

    }

    @Override
    public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
        this.collector = collector;
        resultToKafka = ResultToKafka.getInstance();
    }

    /**
     * Sends the {@link Rankings} in field 0 of a data tuple to Kafka and acks the tuple.
     * Tick tuples are ignored, as in the other output bolts; any exception fails the tuple.
     */
    @Override
    public void execute(Tuple tuple) {
        try {
            // A tick tuple does not carry a Rankings value; skip it instead of failing on the cast below.
            if (TupleUtils.isTick(tuple)) {
                return;
            }
            Rankings rankings = (Rankings) tuple.getValue(0);
            if (StringUtil.isNotEmpty(rankings.getRankings())) {
                resultToKafka.sendMessage(parserRankingToObject(rankings));
            }
            collector.ack(tuple);
        } catch (Exception e) {
            logger.error("batch insert error:", e);
            collector.fail(tuple);
        }

    }

    /**
     * Converts each ranked item into a {@link Urls} JSON string.
     *
     * <p>The ranked object becomes the URL, the rankable's count becomes the session number, fields
     * 0-3 are written to the c2s/s2c packet and byte counters, and fields 4 and 5 to the client and
     * server unique counts. An item that cannot be converted is logged and skipped.
     *
     * @param rankings rankings to convert
     * @return one JSON string per successfully converted item, in iteration order
     */
    private LinkedList<String> parserRankingToObject(Rankings rankings) {
        LinkedList<String> activeIpList = new LinkedList<>();
        long timesTamp = UniqueCollection.getTimesTamp();
        // One bean is reused; every property is overwritten before each serialization.
        Urls urls = new Urls();
        for (Rankable rankable : rankings.getRankings()) {
            try {
                List<Object> obj = rankable.getFields();
                String url = rankable.getObject().toString();
                urls.setUrl(url);
                urls.setClient_unq_num((Long) obj.get(4));
                urls.setServer_unq_num((Long) obj.get(5));
                urls.setSession_num(rankable.getCount());
                urls.setC2s_pkt_num((Long) obj.get(0));
                urls.setS2c_pkt_num((Long) obj.get(1));
                urls.setC2s_byte_num((Long) obj.get(2));
                urls.setS2c_byte_num((Long) obj.get(3));
                urls.setStat_time(timesTamp);
                activeIpList.add(JSONObject.toJSONString(urls));
            } catch (Exception e) {
                logger.error("http/https urls统计数据异常:" + rankable.toString(), e);
            }

        }
        return activeIpList;
    }

}
// ---- file: src/main/java/cn/ac/iie/bolt/resultoutput/UserOutPutBolt.java (new file mode 100644, index 0000000..c11d13a, hunk @@ -0,0 +1,97 @@) ----
package cn.ac.iie.bolt.resultoutput;

import cn.ac.iie.model.User;
import cn.ac.iie.utils.*;
import cn.ac.iie.utils.kafka.ResultToKafka;
import cn.ac.iie.utils.system.TupleUtils;
import cn.ac.iie.utils.unique.UniqueCollection;
import com.alibaba.fastjson.JSONObject;
import com.zdjizhi.utils.StringUtil;
import org.apache.log4j.Logger;
import org.apache.storm.task.OutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
+import org.apache.storm.topology.base.BaseRichBolt; +import org.apache.storm.tuple.Tuple; + +import java.util.LinkedList; +import java.util.List; +import java.util.Map; + + +/** + * 数据计算结果后存储 + * + * @author Administrator + */ +public class UserOutPutBolt extends BaseRichBolt { + + private static final long serialVersionUID = -7466372895556294634L; + private static final Logger logger = Logger.getLogger(UserOutPutBolt.class); + private OutputCollector collector; + private ResultToKafka resultToKafka; + + @Override + public void declareOutputFields(OutputFieldsDeclarer declarer) { + + } + + @Override + public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) { + this.collector = collector; + resultToKafka = ResultToKafka.getInstance(); + } + + @Override + public void execute(Tuple tuple) { + try { + if (!TupleUtils.isTick(tuple)) { + Rankings rankings = (Rankings) tuple.getValue(0); + if (StringUtil.isEmpty(rankings.getRankings())) { + logger.warn("rankings is null, don't insert..."); + + } else { + resultToKafka.sendMessage(parserRankingToObject(rankings)); + } + + collector.ack(tuple); + } + + } catch (Exception e) { + logger.error("batch insert error:" + e); + collector.fail(tuple); + + } + + } + + /** + * 将排名对象解析为实际业务类型热门网站 + * + * @param rankings + * @return + */ + private LinkedList<String> parserRankingToObject(Rankings rankings) { + LinkedList<String> activeIpList = new LinkedList<>(); + long timesTamp = UniqueCollection.getTimesTamp(); + User user = new User(); + for (Rankable rankable : rankings.getRankings()) { + try { + List<Object> obj = rankable.getFields(); + user.setSubscribe_id(rankable.getObject().toString()); + user.setSession_num((Long) obj.get(4)); + user.setC2s_pkt_num((Long) obj.get(0)); + user.setS2c_pkt_num((Long) obj.get(1)); + user.setC2s_byte_num((Long) obj.get(2)); + user.setS2c_byte_num((Long) obj.get(3)); + user.setStat_time(timesTamp); + activeIpList.add(JSONObject.toJSONString(user)); + } catch 
(Exception e) { + logger.error("活跃用户统计解析数据异常:" + rankable.toString(), e); + } + + } + return activeIpList; + } + +} diff --git a/src/main/java/cn/ac/iie/bolt/resultoutput/WebsiteOutPutBolt.java b/src/main/java/cn/ac/iie/bolt/resultoutput/WebsiteOutPutBolt.java new file mode 100644 index 0000000..c3cefd2 --- /dev/null +++ b/src/main/java/cn/ac/iie/bolt/resultoutput/WebsiteOutPutBolt.java @@ -0,0 +1,100 @@ +package cn.ac.iie.bolt.resultoutput; + +import cn.ac.iie.model.WebSite; +import cn.ac.iie.utils.*; +import cn.ac.iie.utils.kafka.ResultToKafka; +import cn.ac.iie.utils.system.TupleUtils; +import cn.ac.iie.utils.unique.UniqueCollection; +import com.alibaba.fastjson.JSONObject; +import com.zdjizhi.utils.StringUtil; +import org.apache.log4j.Logger; +import org.apache.storm.task.OutputCollector; +import org.apache.storm.task.TopologyContext; +import org.apache.storm.topology.OutputFieldsDeclarer; +import org.apache.storm.topology.base.BaseRichBolt; +import org.apache.storm.tuple.Tuple; + +import java.util.LinkedList; +import java.util.List; +import java.util.Map; + + +/** + * 数据计算结果后存储 + * + * @author Administrator + */ +public class WebsiteOutPutBolt extends BaseRichBolt { + + private static final long serialVersionUID = -7466372895556294634L; + private static final Logger logger = Logger.getLogger(WebsiteOutPutBolt.class); + private OutputCollector collector; + private ResultToKafka resultToKafka; + + @Override + public void declareOutputFields(OutputFieldsDeclarer declarer) { + + } + + @Override + public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) { + this.collector = collector; + resultToKafka = ResultToKafka.getInstance(); + } + + @Override + public void execute(Tuple tuple) { + try { + if (!TupleUtils.isTick(tuple)) { + Rankings rankings = (Rankings) tuple.getValue(0); + if (StringUtil.isEmpty(rankings.getRankings())) { + logger.warn("rankings is null, don't insert..."); + + } else { + 
resultToKafka.sendMessage(parserRankingToObject(rankings)); + } + + collector.ack(tuple); + } + + } catch (Exception e) { + logger.error("batch insert error:" + e); + collector.fail(tuple); + + } + + } + + /** + * 将排名对象解析为实际业务类型热门网站 + * + * @param rankings + * @return + */ + private LinkedList<String> parserRankingToObject(Rankings rankings) { + LinkedList<String> activeIpList = new LinkedList<>(); + long timesTamp = UniqueCollection.getTimesTamp(); + WebSite webSite = new WebSite(); + for (Rankable rankable : rankings.getRankings()) { + try { + List<Object> obj = rankable.getFields(); + String domain = rankable.getObject().toString(); + webSite.setDomain(domain); + webSite.setClient_unq_num((Long) obj.get(4)); + webSite.setServer_unq_num((Long) obj.get(5)); + webSite.setSession_num((Long) obj.get(6)); + webSite.setC2s_pkt_num((Long) obj.get(0)); + webSite.setS2c_pkt_num((Long) obj.get(1)); + webSite.setC2s_byte_num((Long) obj.get(2)); + webSite.setS2c_byte_num((Long) obj.get(3)); + webSite.setStat_time(timesTamp); + activeIpList.add(JSONObject.toJSONString(webSite)); + } catch (Exception e) { + logger.error("网站域名统计解析数据异常:" + rankable.toString(), e); + } + + } + return activeIpList; + } + +} diff --git a/src/main/java/cn/ac/iie/bolt/scatteredcalculate/ExternalCountBolt.java b/src/main/java/cn/ac/iie/bolt/scatteredcalculate/ExternalCountBolt.java new file mode 100644 index 0000000..d7eefb3 --- /dev/null +++ b/src/main/java/cn/ac/iie/bolt/scatteredcalculate/ExternalCountBolt.java @@ -0,0 +1,157 @@ +package cn.ac.iie.bolt.scatteredcalculate; + +import cn.ac.iie.utils.NthLastModifiedTimeTracker; +import cn.ac.iie.utils.SlidingWindowCounter; +import cn.ac.iie.utils.system.TupleUtils; +import com.zdjizhi.utils.StringUtil; +import org.apache.log4j.Logger; +import org.apache.storm.Config; +import org.apache.storm.task.OutputCollector; +import org.apache.storm.task.TopologyContext; +import org.apache.storm.topology.OutputFieldsDeclarer; +import 
org.apache.storm.topology.base.BaseRichBolt; +import org.apache.storm.tuple.Fields; +import org.apache.storm.tuple.Tuple; +import org.apache.storm.tuple.Values; + +import java.util.HashMap; +import java.util.Map; + +/** + * @author qidaijie + */ +public class ExternalCountBolt extends BaseRichBolt { + + private static final long serialVersionUID = 7748967375246167982L; + + private static final Logger logger = Logger.getLogger(ExternalCountBolt.class); + private static final int NUM_WINDOW_CHUNKS = 5; + + /** + * 默认统计5分钟内的排名数据 + */ + private static final int DEFAULT_SILDING_WINDOW_IN_SECONDS = NUM_WINDOW_CHUNKS * 60; + /** + * 默认自动提交频率为1分钟 + */ + private static final int DEFAULT_EMIT_FREQUENCY_IN_SECONDS = DEFAULT_SILDING_WINDOW_IN_SECONDS / NUM_WINDOW_CHUNKS; + + private static final String WINDOW_LENGTH_WARNING_TEMPLATE = + "Actual window length is %d seconds when it should be %d seconds" + + " (you can safely ignore this warning during the startup phase)"; + + private final SlidingWindowCounter<Object> counter; + + private OutputCollector outputCollector; + private final int windowLengthInSeconds; + private final int emitFrequencyInSeconds; + private NthLastModifiedTimeTracker lastModifiedTracker; + + + public ExternalCountBolt() { + this(DEFAULT_SILDING_WINDOW_IN_SECONDS, DEFAULT_EMIT_FREQUENCY_IN_SECONDS); + } + + /** + * @param windowLengthInSeconds + * @param emitFrequencyInSeconds 提交频率,用于缓存,提交下一个bolt或存入数据库; + */ + public ExternalCountBolt(int windowLengthInSeconds, int emitFrequencyInSeconds) { + this.windowLengthInSeconds = windowLengthInSeconds; + this.emitFrequencyInSeconds = emitFrequencyInSeconds; + counter = new SlidingWindowCounter<Object>(deriveNumWindowChunksFrom(this.windowLengthInSeconds, this.emitFrequencyInSeconds)); + } + + private int deriveNumWindowChunksFrom(int windowLengthInSeconds, int windowUpdateFrequencyInSeconds) { + return windowLengthInSeconds / windowUpdateFrequencyInSeconds; + } + + @Override + public void prepare(Map stormConf, 
TopologyContext context, OutputCollector collector) { + outputCollector = collector; + lastModifiedTracker = new NthLastModifiedTimeTracker(deriveNumWindowChunksFrom(this.windowLengthInSeconds, this.emitFrequencyInSeconds)); + } + + + @Override + public void execute(Tuple tuple) { + if (TupleUtils.isTick(tuple)) { + // now you can trigger e.g. a periodic activity + logger.debug("Received tick tuple, triggering emit of current window counts"); + emitCurrentWindowCounts(); + } else { + // do something with the normal tuple + countObjAndAck(tuple); + } + } + + private void emitCurrentWindowCounts() { + Map<Object, Long[]> objFiledMap = counter.getCountsThenAdvanceWindow(); + int actualWindowLengthInSeconds = lastModifiedTracker.secondsSinceOldestModification(); + lastModifiedTracker.markAsModified(); + if (actualWindowLengthInSeconds != windowLengthInSeconds) { + logger.warn(String.format(WINDOW_LENGTH_WARNING_TEMPLATE, actualWindowLengthInSeconds, windowLengthInSeconds)); + } + emit(objFiledMap, actualWindowLengthInSeconds); + + } + + private void emit(Map<Object, Long[]> objCountMap, int actualWindowLengthInSeconds) { + + + for (Map.Entry<Object, Long[]> entry : objCountMap.entrySet()) { + Object obj = entry.getKey(); + Long[] arr = entry.getValue(); + long count = arr[0]; + long cpn = arr[1]; + long spn = arr[2]; + long cbn = arr[3]; + long sbn = arr[4]; + outputCollector.emit(new Values(obj, count, cpn, spn, cbn, sbn, actualWindowLengthInSeconds)); + } + } + + private void countObjAndAck(Tuple tuple) { + + try { + Object obj = tuple.getValue(0); + long count = 1; + long cpn = 0; + long spn = 0; + long cbn = 0; + long sbn = 0; + if (StringUtil.isNotBlank(tuple.getString(1))) { + cpn = Long.parseLong(tuple.getString(1)); + } + if (StringUtil.isNotBlank(tuple.getString(2))) { + spn = Long.parseLong(tuple.getString(2)); + } + if (StringUtil.isNotBlank(tuple.getString(3))) { + cbn = Long.parseLong(tuple.getString(3)); + } + if 
(StringUtil.isNotBlank(tuple.getString(4))) { + sbn = Long.parseLong(tuple.getString(4)); + } + + counter.incrementCount(obj, count, cpn, spn, cbn, sbn); + outputCollector.ack(tuple); + } catch (Exception e) { + logger.error(tuple.toString() + e.toString()); + e.printStackTrace(); + } + } + + @Override + public void declareOutputFields(OutputFieldsDeclarer declarer) { + declarer.declare(new Fields("obj", "count", "cpn", "spn", "cbn", "sbn", "actualWindowLengthInSeconds")); + } + + + @Override + public Map<String, Object> getComponentConfiguration() { + Map<String, Object> conf = new HashMap<String, Object>(); + conf.put(Config.TOPOLOGY_TICK_TUPLE_FREQ_SECS, emitFrequencyInSeconds); + return conf; + } + +} diff --git a/src/main/java/cn/ac/iie/bolt/scatteredcalculate/InternalCountBolt.java b/src/main/java/cn/ac/iie/bolt/scatteredcalculate/InternalCountBolt.java new file mode 100644 index 0000000..f688710 --- /dev/null +++ b/src/main/java/cn/ac/iie/bolt/scatteredcalculate/InternalCountBolt.java @@ -0,0 +1,155 @@ +package cn.ac.iie.bolt.scatteredcalculate; + +import cn.ac.iie.utils.NthLastModifiedTimeTracker; +import cn.ac.iie.utils.SlidingWindowCounter; +import cn.ac.iie.utils.system.TupleUtils; +import com.zdjizhi.utils.StringUtil; +import org.apache.log4j.Logger; +import org.apache.storm.Config; +import org.apache.storm.task.OutputCollector; +import org.apache.storm.task.TopologyContext; +import org.apache.storm.topology.OutputFieldsDeclarer; +import org.apache.storm.topology.base.BaseRichBolt; +import org.apache.storm.tuple.Fields; +import org.apache.storm.tuple.Tuple; +import org.apache.storm.tuple.Values; + +import java.util.HashMap; +import java.util.Map; + +/** + * @author qidaijie + */ +public class InternalCountBolt extends BaseRichBolt { + + private static final long serialVersionUID = 7748967375246167982L; + + private static final Logger logger = Logger.getLogger(InternalCountBolt.class); + private static final int NUM_WINDOW_CHUNKS = 5; + /** + * 
默认统计5分钟内的排名数据 + */ + private static final int DEFAULT_SILDING_WINDOW_IN_SECONDS = NUM_WINDOW_CHUNKS * 60; + /** + * 默认自动提交频率为1分钟 + */ + private static final int DEFAULT_EMIT_FREQUENCY_IN_SECONDS = DEFAULT_SILDING_WINDOW_IN_SECONDS / NUM_WINDOW_CHUNKS; + + private static final String WINDOW_LENGTH_WARNING_TEMPLATE = + "Actual window length is %d seconds when it should be %d seconds" + + " (you can safely ignore this warning during the startup phase)"; + + private final SlidingWindowCounter<Object> counter; + + private OutputCollector outputCollector; + private final int windowLengthInSeconds; + private final int emitFrequencyInSeconds; + private NthLastModifiedTimeTracker lastModifiedTracker; + + public InternalCountBolt() { + this(DEFAULT_SILDING_WINDOW_IN_SECONDS, DEFAULT_EMIT_FREQUENCY_IN_SECONDS); + } + + /** + * @param windowLengthInSeconds + * @param emitFrequencyInSeconds 提交频率,用于缓存,提交下一个bolt或存入数据库; + */ + public InternalCountBolt(int windowLengthInSeconds, int emitFrequencyInSeconds) { + this.windowLengthInSeconds = windowLengthInSeconds; + this.emitFrequencyInSeconds = emitFrequencyInSeconds; + counter = new SlidingWindowCounter<Object>(deriveNumWindowChunksFrom(this.windowLengthInSeconds, this.emitFrequencyInSeconds)); + } + + private int deriveNumWindowChunksFrom(int windowLengthInSeconds, int windowUpdateFrequencyInSeconds) { + return windowLengthInSeconds / windowUpdateFrequencyInSeconds; + } + + @Override + public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) { + outputCollector = collector; + lastModifiedTracker = new NthLastModifiedTimeTracker(deriveNumWindowChunksFrom(this.windowLengthInSeconds, this.emitFrequencyInSeconds)); + } + + + @Override + public void execute(Tuple tuple) { + if (TupleUtils.isTick(tuple)) { + // now you can trigger e.g. 
a periodic activity + logger.debug("Received tick tuple, triggering emit of current window counts"); + emitCurrentWindowCounts(); + } else { + // do something with the normal tuple + countObjAndAck(tuple); + } + } + + private void emitCurrentWindowCounts() { + Map<Object, Long[]> objfieldMap = counter.getCountsThenAdvanceWindow(); + int actualWindowLengthInSeconds = lastModifiedTracker.secondsSinceOldestModification(); + lastModifiedTracker.markAsModified(); + if (actualWindowLengthInSeconds != windowLengthInSeconds) { + logger.warn(String.format(WINDOW_LENGTH_WARNING_TEMPLATE, actualWindowLengthInSeconds, windowLengthInSeconds)); + } + emit(objfieldMap, actualWindowLengthInSeconds); + + } + + private void emit(Map<Object, Long[]> objCountMap, int actualWindowLengthInSeconds) { + + for (Map.Entry<Object, Long[]> entry : objCountMap.entrySet()) { + Object obj = entry.getKey(); + Long[] arr = entry.getValue(); + Long count = arr[0]; + Long cpn = arr[1]; + Long spn = arr[2]; + Long cbn = arr[3]; + Long sbn = arr[4]; + outputCollector.emit(new Values(obj, count, cpn, spn, cbn, sbn, actualWindowLengthInSeconds)); + } + } + + private void countObjAndAck(Tuple tuple) { + + try { + Object obj = tuple.getValue(0); + long count = 1; + long cpn = 0; + long spn = 0; + long cbn = 0; + long sbn = 0; + if (StringUtil.isNotBlank(tuple.getString(1))) { + cpn = Long.parseLong(tuple.getString(1)); + } + if (StringUtil.isNotBlank(tuple.getString(2))) { + spn = Long.parseLong(tuple.getString(2)); + } + if (StringUtil.isNotBlank(tuple.getString(3))) { + cbn = Long.parseLong(tuple.getString(3)); + } + if (StringUtil.isNotBlank(tuple.getString(4))) { + sbn = Long.parseLong(tuple.getString(4)); + } + + + counter.incrementCount(obj, count, cpn, spn, cbn, sbn); + outputCollector.ack(tuple); + } catch (Exception e) { + logger.error(tuple.toString() + e.toString()); + e.printStackTrace(); + } + } + + @Override + public void declareOutputFields(OutputFieldsDeclarer declarer) { + 
declarer.declare(new Fields("obj", "count", "cpn", "spn", "cbn", "sbn", "actualWindowLengthInSeconds")); + } + + + @Override + public Map<String, Object> getComponentConfiguration() { + Map<String, Object> conf = new HashMap<String, Object>(); + conf.put(Config.TOPOLOGY_TICK_TUPLE_FREQ_SECS, emitFrequencyInSeconds); + return conf; + } + +} diff --git a/src/main/java/cn/ac/iie/bolt/scatteredcalculate/UrlsCountBolt.java b/src/main/java/cn/ac/iie/bolt/scatteredcalculate/UrlsCountBolt.java new file mode 100644 index 0000000..d9738c2 --- /dev/null +++ b/src/main/java/cn/ac/iie/bolt/scatteredcalculate/UrlsCountBolt.java @@ -0,0 +1,173 @@ +package cn.ac.iie.bolt.scatteredcalculate; + +import cn.ac.iie.common.TopNCountConfig; +import cn.ac.iie.utils.NthLastModifiedTimeTracker; +import cn.ac.iie.utils.unique.urldimension.UrlSlidingWindowCounter; +import cn.ac.iie.utils.system.TupleUtils; +import cn.ac.iie.utils.unique.UniqueCollection; +import com.zdjizhi.utils.StringUtil; +import org.apache.log4j.Logger; +import org.apache.storm.Config; +import org.apache.storm.task.OutputCollector; +import org.apache.storm.task.TopologyContext; +import org.apache.storm.topology.OutputFieldsDeclarer; +import org.apache.storm.topology.base.BaseRichBolt; +import org.apache.storm.tuple.Fields; +import org.apache.storm.tuple.Tuple; +import org.apache.storm.tuple.Values; + +import java.util.HashMap; +import java.util.Map; +
/**
 * Bolt that accumulates per-key counters in a {@link UrlSlidingWindowCounter} and emits the
 * current window totals every time a tick tuple arrives.
 *
 * <p>Input tuples are read positionally: index 0 is the key, indices 1-4 are the numeric
 * {@code cpn}, {@code spn}, {@code cbn} and {@code sbn} strings, and indices 5 and 6 are
 * handed to {@link UniqueCollection} together with the key.
 * NOTE(review): this matches the field order declared by {@code UrlsBolt}
 * ("key", "cpn", "spn", "cbn", "sbn", "cli", "ser") - confirm against the topology wiring.
 *
 * @author qidaijie
 */
public class UrlsCountBolt extends BaseRichBolt {

    private static final long serialVersionUID = 7748967375246167982L;

    private static final Logger logger = Logger.getLogger(UrlsCountBolt.class);
    private static final int NUM_WINDOW_CHUNKS = 5;
    /**
     * Default sliding window: rank the data of the last 5 minutes.
     */
    private static final int DEFAULT_SILDING_WINDOW_IN_SECONDS = NUM_WINDOW_CHUNKS * 60;
    /**
     * Default emit frequency: once per minute.
     */
    private static final int DEFAULT_EMIT_FREQUENCY_IN_SECONDS = DEFAULT_SILDING_WINDOW_IN_SECONDS / NUM_WINDOW_CHUNKS;

    private static final String WINDOW_LENGTH_WARNING_TEMPLATE =
            "Actual window length is %d seconds when it should be %d seconds"
                    + " (you can safely ignore this warning during the startup phase)";

    private final UrlSlidingWindowCounter<Object> counter;

    private OutputCollector outputCollector;
    private final int windowLengthInSeconds;
    private final int emitFrequencyInSeconds;
    private NthLastModifiedTimeTracker lastModifiedTracker;
    /**
     * Seconds accumulated (in steps of {@code TopNCountConfig.TOPEMITFREQUENCYSECS}) since the
     * unique container was last cleared.
     */
    private int max = 0;

    /**
     * Creates a bolt with the default 5-minute window and 1-minute emit frequency.
     */
    public UrlsCountBolt() {
        this(DEFAULT_SILDING_WINDOW_IN_SECONDS, DEFAULT_EMIT_FREQUENCY_IN_SECONDS);
    }

    /**
     * @param windowLengthInSeconds  length of the sliding window in seconds
     * @param emitFrequencyInSeconds emit frequency in seconds; results are buffered and handed to
     *                               the next bolt (or stored) at this interval
     */
    public UrlsCountBolt(int windowLengthInSeconds, int emitFrequencyInSeconds) {
        this.windowLengthInSeconds = windowLengthInSeconds;
        this.emitFrequencyInSeconds = emitFrequencyInSeconds;
        counter = new UrlSlidingWindowCounter<Object>(deriveNumWindowChunksFrom(this.windowLengthInSeconds, this.emitFrequencyInSeconds));
    }

    /**
     * Number of window slots: window length divided (integer division) by the update frequency.
     */
    private int deriveNumWindowChunksFrom(int windowLengthInSeconds, int windowUpdateFrequencyInSeconds) {
        return windowLengthInSeconds / windowUpdateFrequencyInSeconds;
    }

    @Override
    public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
        outputCollector = collector;
        lastModifiedTracker = new NthLastModifiedTimeTracker(deriveNumWindowChunksFrom(this.windowLengthInSeconds, this.emitFrequencyInSeconds));
    }

    /**
     * Tick tuples trigger an emit of the current window; every other tuple is counted.
     */
    @Override
    public void execute(Tuple tuple) {
        if (!TupleUtils.isTick(tuple)) {
            countObjAndAck(tuple);
            return;
        }
        logger.debug("Received tick tuple, triggering emit of current window counts");
        emitCurrentWindowCounts();
        advanceUniqueContainerTimer();
    }

    /**
     * Clears the shared unique container once the accumulated time reaches the configured
     * window length, otherwise advances the accumulated time by one emit interval.
     */
    private void advanceUniqueContainerTimer() {
        // ">=" instead of "==": when the window length is not an exact multiple of the emit
        // frequency the running total steps over the target, and "==" would never clear.
        if (max >= TopNCountConfig.TOPWINDOWLENGTH) {
            // NOTE(review): this URL bolt clears the *web* container, exactly as
            // WebsiteCountBolt does - confirm that a shared container is intended.
            UniqueCollection.clearWebContainer();
            max = 0;
        } else {
            max += TopNCountConfig.TOPEMITFREQUENCYSECS;
        }
    }

    /**
     * Reads the window totals, advances the window and emits one tuple per key.
     */
    private void emitCurrentWindowCounts() {
        Map<Object, Long[]> windowTotals = counter.getCountsThenAdvanceWindow();
        int actualWindowLengthInSeconds = lastModifiedTracker.secondsSinceOldestModification();
        lastModifiedTracker.markAsModified();
        if (actualWindowLengthInSeconds != windowLengthInSeconds) {
            logger.warn(String.format(WINDOW_LENGTH_WARNING_TEMPLATE, actualWindowLengthInSeconds, windowLengthInSeconds));
        }
        emit(windowTotals, actualWindowLengthInSeconds);
    }

    /**
     * Emits (obj, count, cpn, spn, cbn, sbn, cli, ser, actualWindowLengthInSeconds) for every
     * entry; the seven numeric values are taken from array positions 0-6.
     */
    private void emit(Map<Object, Long[]> objCountMap, int actualWindowLengthInSeconds) {
        for (Map.Entry<Object, Long[]> entry : objCountMap.entrySet()) {
            Long[] arr = entry.getValue();
            outputCollector.emit(new Values(entry.getKey(), arr[0], arr[1], arr[2], arr[3], arr[4],
                    arr[5], arr[6], actualWindowLengthInSeconds));
        }
    }

    /**
     * Adds one occurrence of the tuple's key to the current window and acks the tuple.
     * A tuple that cannot be processed is logged and failed explicitly, so it is not left
     * pending until the topology message timeout.
     */
    private void countObjAndAck(Tuple tuple) {
        try {
            Object obj = tuple.getValue(0);
            long count = 1;
            long cpn = parseCount(tuple, 1);
            long spn = parseCount(tuple, 2);
            long cbn = parseCount(tuple, 3);
            long sbn = parseCount(tuple, 4);

            UniqueCollection.setUrlCliUnique(tuple.getString(0), tuple.getString(5));
            UniqueCollection.setUrlSerUnique(tuple.getString(0), tuple.getString(6));

            long cli = UniqueCollection.getUrlCliUnique(String.valueOf(obj));
            long ser = UniqueCollection.getUrlSerUnique(String.valueOf(obj));

            counter.incrementCount(obj, count, cpn, spn, cbn, sbn, cli, ser);
            outputCollector.ack(tuple);
        } catch (Exception e) {
            // Same message text as before, now with the stack trace attached through log4j
            // instead of printStackTrace().
            logger.error(tuple.toString() + e.toString(), e);
            outputCollector.fail(tuple);
        }
    }

    /**
     * Parses the string field at {@code index} as a long; a blank or missing field counts as 0.
     *
     * @throws NumberFormatException if the field is non-blank but not a valid long
     */
    private static long parseCount(Tuple tuple, int index) {
        String text = tuple.getString(index);
        return StringUtil.isNotBlank(text) ? Long.parseLong(text) : 0L;
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("obj", "count", "cpn", "spn", "cbn", "sbn", "cli", "ser", "actualWindowLengthInSeconds"));
    }

    /**
     * Asks Storm to deliver a tick tuple to this bolt every {@code emitFrequencyInSeconds} seconds.
     */
    @Override
    public Map<String, Object> getComponentConfiguration() {
        Map<String, Object> conf = new HashMap<String, Object>();
        conf.put(Config.TOPOLOGY_TICK_TUPLE_FREQ_SECS, emitFrequencyInSeconds);
        return conf;
    }

}
diff --git a/src/main/java/cn/ac/iie/bolt/scatteredcalculate/UserCountBolt.java b/src/main/java/cn/ac/iie/bolt/scatteredcalculate/UserCountBolt.java new file mode 100644 index 0000000..ab6924f --- /dev/null +++ b/src/main/java/cn/ac/iie/bolt/scatteredcalculate/UserCountBolt.java @@ -0,0 +1,160 @@ +package cn.ac.iie.bolt.scatteredcalculate; + +import cn.ac.iie.utils.NthLastModifiedTimeTracker; +import cn.ac.iie.utils.SlidingWindowCounter; +import cn.ac.iie.utils.system.TupleUtils; +import cn.ac.iie.utils.unique.bytecount.BytesSlidingWindowCounter; +import cn.ac.iie.utils.unique.bytecount.BytesSlotBasedCounter; +import com.zdjizhi.utils.StringUtil; +import org.apache.log4j.Logger; +import org.apache.storm.Config; +import org.apache.storm.task.OutputCollector; +import org.apache.storm.task.TopologyContext; +import org.apache.storm.topology.OutputFieldsDeclarer; +import org.apache.storm.topology.base.BaseRichBolt; +import org.apache.storm.tuple.Fields; +import org.apache.storm.tuple.Tuple; +import org.apache.storm.tuple.Values; + +import java.util.HashMap; +import java.util.Map; + +/** + * @author qidaijie + */ +public class
UserCountBolt extends BaseRichBolt { + + private static final long serialVersionUID = 7748967375246167982L; + + private static final Logger logger = Logger.getLogger(UserCountBolt.class); + private static final int NUM_WINDOW_CHUNKS = 5; + /** + * 默认统计5分钟内的排名数据 + */ + private static final int DEFAULT_SILDING_WINDOW_IN_SECONDS = NUM_WINDOW_CHUNKS * 60; + /** + * 默认自动提交频率为1分钟 + */ + private static final int DEFAULT_EMIT_FREQUENCY_IN_SECONDS = DEFAULT_SILDING_WINDOW_IN_SECONDS / NUM_WINDOW_CHUNKS; + + private static final String WINDOW_LENGTH_WARNING_TEMPLATE = + "Actual window length is %d seconds when it should be %d seconds" + + " (you can safely ignore this warning during the startup phase)"; + + private final BytesSlidingWindowCounter<Object> counter; + + private OutputCollector outputCollector; + private final int windowLengthInSeconds; + private final int emitFrequencyInSeconds; + private NthLastModifiedTimeTracker lastModifiedTracker; + + public UserCountBolt() { + this(DEFAULT_SILDING_WINDOW_IN_SECONDS, DEFAULT_EMIT_FREQUENCY_IN_SECONDS); + } + + /** + * @param windowLengthInSeconds + * @param emitFrequencyInSeconds 提交频率,用于缓存,提交下一个bolt或存入数据库; + */ + public UserCountBolt(int windowLengthInSeconds, int emitFrequencyInSeconds) { + this.windowLengthInSeconds = windowLengthInSeconds; + this.emitFrequencyInSeconds = emitFrequencyInSeconds; + counter = new BytesSlidingWindowCounter<Object>(deriveNumWindowChunksFrom(this.windowLengthInSeconds, this.emitFrequencyInSeconds)); + } + + private int deriveNumWindowChunksFrom(int windowLengthInSeconds, int windowUpdateFrequencyInSeconds) { + return windowLengthInSeconds / windowUpdateFrequencyInSeconds; + } + + @Override + public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) { + outputCollector = collector; + lastModifiedTracker = new NthLastModifiedTimeTracker(deriveNumWindowChunksFrom(this.windowLengthInSeconds, this.emitFrequencyInSeconds)); + } + + + @Override + public void 
execute(Tuple tuple) { + if (TupleUtils.isTick(tuple)) { + // now you can trigger e.g. a periodic activity + logger.debug("Received tick tuple, triggering emit of current window counts"); + emitCurrentWindowCounts(); + } else { + // do something with the normal tuple + countObjAndAck(tuple); + } + } + + private void emitCurrentWindowCounts() { + Map<Object, Long[]> objfieldMap = counter.getCountsThenAdvanceWindow(); + int actualWindowLengthInSeconds = lastModifiedTracker.secondsSinceOldestModification(); + lastModifiedTracker.markAsModified(); + if (actualWindowLengthInSeconds != windowLengthInSeconds) { + logger.warn(String.format(WINDOW_LENGTH_WARNING_TEMPLATE, actualWindowLengthInSeconds, windowLengthInSeconds)); + } + emit(objfieldMap, actualWindowLengthInSeconds); + + } + + private void emit(Map<Object, Long[]> objCountMap, int actualWindowLengthInSeconds) { + + + for (Map.Entry<Object, Long[]> entry : objCountMap.entrySet()) { + Object obj = entry.getKey(); + Long[] arr = entry.getValue(); + Long count = arr[0]; + Long cpn = arr[1]; + Long spn = arr[2]; + Long cbn = arr[3]; + Long sbn = arr[4]; + Long session = arr[5]; + outputCollector.emit(new Values(obj, count, cpn, spn, cbn, sbn, session, actualWindowLengthInSeconds)); + } + } + + private void countObjAndAck(Tuple tuple) { + + try { + Object obj = tuple.getValue(0); + long session = 1; + long cpn = 0; + long spn = 0; + long cbn = 0; + long sbn = 0; + if (StringUtil.isNotBlank(tuple.getString(1))) { + cpn = Long.parseLong(tuple.getString(1)); + } + if (StringUtil.isNotBlank(tuple.getString(2))) { + spn = Long.parseLong(tuple.getString(2)); + } + if (StringUtil.isNotBlank(tuple.getString(3))) { + cbn = Long.parseLong(tuple.getString(3)); + } + if (StringUtil.isNotBlank(tuple.getString(4))) { + sbn = Long.parseLong(tuple.getString(4)); + } + + long count = cbn + sbn; + + counter.incrementCount(obj, count, cpn, spn, cbn, sbn, session); + outputCollector.ack(tuple); + } catch (Exception e) { + 
logger.error(tuple.toString() + e.toString()); + e.printStackTrace(); + } + } + + @Override + public void declareOutputFields(OutputFieldsDeclarer declarer) { + declarer.declare(new Fields("obj", "count", "cpn", "spn", "cbn", "sbn", "session", "actualWindowLengthInSeconds")); + } + + + @Override + public Map<String, Object> getComponentConfiguration() { + Map<String, Object> conf = new HashMap<String, Object>(); + conf.put(Config.TOPOLOGY_TICK_TUPLE_FREQ_SECS, emitFrequencyInSeconds); + return conf; + } + +} diff --git a/src/main/java/cn/ac/iie/bolt/scatteredcalculate/WebsiteCountBolt.java b/src/main/java/cn/ac/iie/bolt/scatteredcalculate/WebsiteCountBolt.java new file mode 100644 index 0000000..f4f810a --- /dev/null +++ b/src/main/java/cn/ac/iie/bolt/scatteredcalculate/WebsiteCountBolt.java @@ -0,0 +1,175 @@ +package cn.ac.iie.bolt.scatteredcalculate; + +import cn.ac.iie.common.TopNCountConfig; +import cn.ac.iie.utils.NthLastModifiedTimeTracker; +import cn.ac.iie.utils.unique.urldimension.UrlSlidingWindowCounter; +import cn.ac.iie.utils.system.TupleUtils; +import cn.ac.iie.utils.unique.UniqueCollection; +import cn.ac.iie.utils.unique.webdimension.WebSlidingWindowCounter; +import org.apache.log4j.Logger; +import org.apache.storm.Config; +import org.apache.storm.task.OutputCollector; +import org.apache.storm.task.TopologyContext; +import org.apache.storm.topology.OutputFieldsDeclarer; +import org.apache.storm.topology.base.BaseRichBolt; +import org.apache.storm.tuple.Fields; +import org.apache.storm.tuple.Tuple; +import org.apache.storm.tuple.Values; + +import java.util.HashMap; +import java.util.Map; + +/** + * @author qidaijie + */ +public class WebsiteCountBolt extends BaseRichBolt { + + private static final long serialVersionUID = 7748967375246167982L; + + private static final Logger logger = Logger.getLogger(WebsiteCountBolt.class); + private static final int NUM_WINDOW_CHUNKS = 5; + /** + * 默认统计5分钟内的排名数据 + */ + private static final int 
DEFAULT_SILDING_WINDOW_IN_SECONDS = NUM_WINDOW_CHUNKS * 60; + /** + * 默认自动提交频率为1分钟 + */ + private static final int DEFAULT_EMIT_FREQUENCY_IN_SECONDS = DEFAULT_SILDING_WINDOW_IN_SECONDS / NUM_WINDOW_CHUNKS; + + private static final String WINDOW_LENGTH_WARNING_TEMPLATE = + "Actual window length is %d seconds when it should be %d seconds" + + " (you can safely ignore this warning during the startup phase)"; + + private final WebSlidingWindowCounter<Object> counter; + + private OutputCollector outputCollector; + private final int windowLengthInSeconds; + private final int emitFrequencyInSeconds; + private NthLastModifiedTimeTracker lastModifiedTracker; + private int max; + + public WebsiteCountBolt() { + this(DEFAULT_SILDING_WINDOW_IN_SECONDS, DEFAULT_EMIT_FREQUENCY_IN_SECONDS); + } + + /** + * @param windowLengthInSeconds + * @param emitFrequencyInSeconds 提交频率,用于缓存,提交下一个bolt或存入数据库; + */ + public WebsiteCountBolt(int windowLengthInSeconds, int emitFrequencyInSeconds) { + this.windowLengthInSeconds = windowLengthInSeconds; + this.emitFrequencyInSeconds = emitFrequencyInSeconds; + counter = new WebSlidingWindowCounter<Object>(deriveNumWindowChunksFrom(this.windowLengthInSeconds, this.emitFrequencyInSeconds)); + } + + private int deriveNumWindowChunksFrom(int windowLengthInSeconds, int windowUpdateFrequencyInSeconds) { + return windowLengthInSeconds / windowUpdateFrequencyInSeconds; + } + + @Override + public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) { + outputCollector = collector; + lastModifiedTracker = new NthLastModifiedTimeTracker(deriveNumWindowChunksFrom(this.windowLengthInSeconds, this.emitFrequencyInSeconds)); + } + + + @Override + public void execute(Tuple tuple) { + if (TupleUtils.isTick(tuple)) { + // now you can trigger e.g. 
a periodic activity + logger.debug("Received tick tuple, triggering emit of current window counts"); + emitCurrentWindowCounts(); + if (max == TopNCountConfig.TOPWINDOWLENGTH) { + UniqueCollection.clearWebContainer(); + max = 0; + } else { + max += TopNCountConfig.TOPEMITFREQUENCYSECS; + } + } else { + // do something with the normal tuple + countObjAndAck(tuple); + } + } + + private void emitCurrentWindowCounts() { + Map<Object, Long[]> objFiledMap = counter.getCountsThenAdvanceWindow(); + int actualWindowLengthInSeconds = lastModifiedTracker.secondsSinceOldestModification(); + lastModifiedTracker.markAsModified(); + if (actualWindowLengthInSeconds != windowLengthInSeconds) { + logger.warn(String.format(WINDOW_LENGTH_WARNING_TEMPLATE, actualWindowLengthInSeconds, windowLengthInSeconds)); + } + emit(objFiledMap, actualWindowLengthInSeconds); + + } + + private void emit(Map<Object, Long[]> objCountMap, int actualWindowLengthInSeconds) { + + + for (Map.Entry<Object, Long[]> entry : objCountMap.entrySet()) { + Object obj = entry.getKey(); + Long[] arr = entry.getValue(); + Long count = arr[0]; + Long cpn = arr[1]; + Long spn = arr[2]; + Long cbn = arr[3]; + Long sbn = arr[4]; + Long cli = arr[5]; + Long ser = arr[6]; + Long session = arr[7]; + + outputCollector.emit(new Values(obj, count, cpn, spn, cbn, sbn, cli, ser, session, actualWindowLengthInSeconds)); + } + } + + private void countObjAndAck(Tuple tuple) { + + try { + Object obj = tuple.getValue(0); + long session = 1; + long cpn = 0; + long spn = 0; + long cbn = 0; + long sbn = 0; + if (tuple.getValue(1) != null && tuple.getValue(1) != "") { + cpn = Long.parseLong(tuple.getValue(1).toString()); + } + if (tuple.getValue(2) != null && tuple.getValue(2) != "") { + spn = Long.parseLong(tuple.getValue(2).toString()); + } + if (tuple.getValue(3) != null && tuple.getValue(3) != "") { + cbn = Long.parseLong(tuple.getValue(3).toString()); + } + if (tuple.getValue(4) != null && tuple.getValue(4) != "") { + sbn = 
Long.parseLong(tuple.getValue(4).toString()); + } + long count = cbn + sbn; + + UniqueCollection.setUrlCliUnique(tuple.getString(0), tuple.getString(5)); + UniqueCollection.setUrlSerUnique(tuple.getString(0), tuple.getString(6)); + + long cli = UniqueCollection.getUrlCliUnique(String.valueOf(obj)); + long ser = UniqueCollection.getUrlSerUnique(String.valueOf(obj)); + + counter.incrementCount(obj, count, cpn, spn, cbn, sbn, cli, ser, session); + outputCollector.ack(tuple); + } catch (Exception e) { + logger.error(tuple.toString() + e.toString()); + e.printStackTrace(); + } + } + + @Override + public void declareOutputFields(OutputFieldsDeclarer declarer) { + declarer.declare(new Fields("obj", "count", "cpn", "spn", "cbn", "sbn", "cli", "ser", "session", "actualWindowLengthInSeconds")); + } + + + @Override + public Map<String, Object> getComponentConfiguration() { + Map<String, Object> conf = new HashMap<String, Object>(); + conf.put(Config.TOPOLOGY_TICK_TUPLE_FREQ_SECS, emitFrequencyInSeconds); + return conf; + } + +} diff --git a/src/main/java/cn/ac/iie/bolt/segmentation/ExternalBolt.java b/src/main/java/cn/ac/iie/bolt/segmentation/ExternalBolt.java new file mode 100644 index 0000000..d35db8a --- /dev/null +++ b/src/main/java/cn/ac/iie/bolt/segmentation/ExternalBolt.java @@ -0,0 +1,49 @@ +package cn.ac.iie.bolt.segmentation; + +import cn.ac.iie.utils.unique.UniqueCollection; +import com.alibaba.fastjson.JSON; +import com.alibaba.fastjson.JSONObject; +import com.zdjizhi.utils.StringUtil; +import org.apache.log4j.Logger; +import org.apache.storm.topology.BasicOutputCollector; +import org.apache.storm.topology.OutputFieldsDeclarer; +import org.apache.storm.topology.base.BaseBasicBolt; +import org.apache.storm.tuple.Fields; +import org.apache.storm.tuple.Tuple; +import org.apache.storm.tuple.Values; + +/** + * 切分原始日志获取原始日志中 外部主机 + * + * @author qidaijie + */ +public class ExternalBolt extends BaseBasicBolt { + + private static final Logger logger = 
Logger.getLogger(ExternalBolt.class); + private static final long serialVersionUID = -1599076545635998972L; + + @Override + public void execute(Tuple input, BasicOutputCollector collector) { + try { + String message = input.getString(0); + if (StringUtil.isNotBlank(message)) { + JSONObject jsonObject = JSON.parseObject(message); + String key = jsonObject.getString("server_ip"); + if (StringUtil.isNotBlank(key)) { + collector.emit(new Values(key, jsonObject.getString("c2s_pkt_num"), jsonObject.getString("s2c_pkt_num"), + jsonObject.getString("c2s_byte_num"), jsonObject.getString("s2c_byte_num"))); + } + } + } catch (Exception e) { + logger.error("拆分原始日志出现异常:", e); + } + } + + @Override + public void declareOutputFields(OutputFieldsDeclarer declarer) { + declarer.declare(new Fields("key", "cpn", "spn", "cbn", "sbn")); + + } + + +} diff --git a/src/main/java/cn/ac/iie/bolt/segmentation/InternalBolt.java b/src/main/java/cn/ac/iie/bolt/segmentation/InternalBolt.java new file mode 100644 index 0000000..09f5f34 --- /dev/null +++ b/src/main/java/cn/ac/iie/bolt/segmentation/InternalBolt.java @@ -0,0 +1,50 @@ +package cn.ac.iie.bolt.segmentation; + +import cn.ac.iie.utils.TopUtils; +import com.alibaba.fastjson.JSON; +import com.alibaba.fastjson.JSONObject; +import com.zdjizhi.utils.StringUtil; +import org.apache.log4j.Logger; +import org.apache.storm.topology.BasicOutputCollector; +import org.apache.storm.topology.OutputFieldsDeclarer; +import org.apache.storm.topology.base.BaseBasicBolt; +import org.apache.storm.tuple.Fields; +import org.apache.storm.tuple.Tuple; +import org.apache.storm.tuple.Values; + +/** + * 切分原始日志 获取内部主机 + * + * @author qidaijie + */ +public class InternalBolt extends BaseBasicBolt { + + private static final Logger logger = Logger.getLogger(InternalBolt.class); + private static final long serialVersionUID = -1599076545635998972L; + + @Override + public void execute(Tuple input, BasicOutputCollector collector) { + try { + String message = 
input.getString(0); + if (StringUtil.isNotBlank(message)) { + JSONObject jsonObject = JSON.parseObject(message); + String key = jsonObject.getString("client_ip"); + if (StringUtil.isNotBlank(key) && TopUtils.isKazakhstan(key)) { + collector.emit(new Values(key, jsonObject.getString("c2s_pkt_num"), jsonObject.getString("s2c_pkt_num"), + jsonObject.getString("c2s_byte_num"), jsonObject.getString("s2c_byte_num"))); + } + + } + } catch (Exception e) { + logger.error("拆分原始日志出现异常:", e); + } + } + + @Override + public void declareOutputFields(OutputFieldsDeclarer declarer) { + declarer.declare(new Fields("key", "cpn", "spn", "cbn", "sbn")); + + } + + +} diff --git a/src/main/java/cn/ac/iie/bolt/segmentation/UrlsBolt.java b/src/main/java/cn/ac/iie/bolt/segmentation/UrlsBolt.java new file mode 100644 index 0000000..b6ae0c4 --- /dev/null +++ b/src/main/java/cn/ac/iie/bolt/segmentation/UrlsBolt.java @@ -0,0 +1,50 @@ +package cn.ac.iie.bolt.segmentation; + +import com.alibaba.fastjson.JSON; +import com.alibaba.fastjson.JSONObject; +import com.zdjizhi.utils.StringUtil; +import org.apache.log4j.Logger; +import org.apache.storm.topology.BasicOutputCollector; +import org.apache.storm.topology.OutputFieldsDeclarer; +import org.apache.storm.topology.base.BaseBasicBolt; +import org.apache.storm.tuple.Fields; +import org.apache.storm.tuple.Tuple; +import org.apache.storm.tuple.Values; + + +/** + * 切分原始日志 + * + * @author qidaijie + */ +public class UrlsBolt extends BaseBasicBolt { + + private static final Logger logger = Logger.getLogger(UrlsBolt.class); + private static final long serialVersionUID = -1599076545635998972L; + + @Override + public void execute(Tuple input, BasicOutputCollector collector) { + try { + String message = input.getString(0); + if (StringUtil.isNotBlank(message)) { + JSONObject jsonObject = JSON.parseObject(message); + String key = jsonObject.getString("url"); + if (StringUtil.isNotBlank(key)) { + collector.emit(new Values(key, 
jsonObject.getString("c2s_pkt_num"), jsonObject.getString("s2c_pkt_num"), + jsonObject.getString("c2s_byte_num"), jsonObject.getString("s2c_byte_num"), + jsonObject.getString("client_ip"), jsonObject.getString("server_ip"))); + } + } + } catch (Exception e) { + logger.error("拆分原始日志出现异常:", e); + } + } + + @Override + public void declareOutputFields(OutputFieldsDeclarer declarer) { + declarer.declare(new Fields("key", "cpn", "spn", "cbn", "sbn","cli","ser")); + + } + + +} diff --git a/src/main/java/cn/ac/iie/bolt/segmentation/UserBolt.java b/src/main/java/cn/ac/iie/bolt/segmentation/UserBolt.java new file mode 100644 index 0000000..d707cdc --- /dev/null +++ b/src/main/java/cn/ac/iie/bolt/segmentation/UserBolt.java @@ -0,0 +1,49 @@ +package cn.ac.iie.bolt.segmentation; + +import com.alibaba.fastjson.JSON; +import com.alibaba.fastjson.JSONObject; +import com.zdjizhi.utils.StringUtil; +import org.apache.log4j.Logger; +import org.apache.storm.topology.BasicOutputCollector; +import org.apache.storm.topology.OutputFieldsDeclarer; +import org.apache.storm.topology.base.BaseBasicBolt; +import org.apache.storm.tuple.Fields; +import org.apache.storm.tuple.Tuple; +import org.apache.storm.tuple.Values; + +/** + * 切分原始日志 + * + * @author qidaijie + */ +public class UserBolt extends BaseBasicBolt { + + private static final Logger logger = Logger.getLogger(UserBolt.class); + private static final long serialVersionUID = -1599076545635998972L; + + @Override + public void execute(Tuple input, BasicOutputCollector collector) { + try { + String message = input.getString(0); + if (StringUtil.isNotBlank(message)) { + JSONObject jsonObject = JSON.parseObject(message); + String key = jsonObject.getString("subscribe_id"); +// String key = jsonObject.getString("client_ip"); + if (StringUtil.isNotBlank(key)) { + collector.emit(new Values(key, jsonObject.getString("c2s_pkt_num"), jsonObject.getString("s2c_pkt_num"), + jsonObject.getString("c2s_byte_num"), jsonObject.getString("s2c_byte_num"))); 
+ } + } + } catch (Exception e) { + logger.error("拆分原始日志出现异常:", e); + } + } + + @Override + public void declareOutputFields(OutputFieldsDeclarer declarer) { + declarer.declare(new Fields("key", "cpn", "spn", "cbn", "sbn")); + + } + + +} diff --git a/src/main/java/cn/ac/iie/bolt/segmentation/WebSiteBolt.java b/src/main/java/cn/ac/iie/bolt/segmentation/WebSiteBolt.java new file mode 100644 index 0000000..d94eba3 --- /dev/null +++ b/src/main/java/cn/ac/iie/bolt/segmentation/WebSiteBolt.java @@ -0,0 +1,50 @@ +package cn.ac.iie.bolt.segmentation; + +import com.alibaba.fastjson.JSON; +import com.alibaba.fastjson.JSONObject; +import com.zdjizhi.utils.StringUtil; +import org.apache.log4j.Logger; +import org.apache.storm.topology.BasicOutputCollector; +import org.apache.storm.topology.OutputFieldsDeclarer; +import org.apache.storm.topology.base.BaseBasicBolt; +import org.apache.storm.tuple.Fields; +import org.apache.storm.tuple.Tuple; +import org.apache.storm.tuple.Values; + + +/** + * 切分原始日志 + * + * @author qidaijie + */ +public class WebSiteBolt extends BaseBasicBolt { + + private static final Logger logger = Logger.getLogger(WebSiteBolt.class); + private static final long serialVersionUID = -1599076545635998972L; + + @Override + public void execute(Tuple input, BasicOutputCollector collector) { + try { + String message = input.getString(0); + if (StringUtil.isNotBlank(message)) { + JSONObject jsonObject = JSON.parseObject(message); + String key = jsonObject.getString("domain"); + if (StringUtil.isNotBlank(key)) { + collector.emit(new Values(key, jsonObject.getString("c2s_pkt_num"), jsonObject.getString("s2c_pkt_num"), + jsonObject.getString("c2s_byte_num"), jsonObject.getString("s2c_byte_num"), + jsonObject.getString("client_ip"), jsonObject.getString("server_ip"))); + } + } + } catch (Exception e) { + logger.error("拆分原始日志出现异常:", e); + } + } + + @Override + public void declareOutputFields(OutputFieldsDeclarer declarer) { + declarer.declare(new Fields("key", "cpn", "spn", 
"cbn", "sbn", "cli", "ser")); + + } + + +} diff --git a/src/main/java/cn/ac/iie/common/TopNCountConfig.java b/src/main/java/cn/ac/iie/common/TopNCountConfig.java new file mode 100644 index 0000000..63c2fdb --- /dev/null +++ b/src/main/java/cn/ac/iie/common/TopNCountConfig.java @@ -0,0 +1,45 @@ +package cn.ac.iie.common; + +import cn.ac.iie.utils.system.TopNCountConfigurations; + +import java.io.Serializable; +
/**
 * Topology-wide settings, each read once at class-initialisation time through
 * {@link TopNCountConfigurations}.
 *
 * <p>NOTE(review): the first argument of {@code getIntProperty} / {@code getStringProperty}
 * (0 or 1) presumably selects which properties source the key is read from - confirm in
 * {@code TopNCountConfigurations}.
 */
public class TopNCountConfig implements Serializable {

    private static final long serialVersionUID = -8649024767966235184L;

    // ---- component parallelism ----
    public static final Integer SPOUT_PARALLELISM =
            TopNCountConfigurations.getIntProperty(1, "topology.spout.parallelism");
    public static final Integer WORDNORMALIZER =
            TopNCountConfigurations.getIntProperty(1, "topology.bolt.check.parallelism");
    public static final Integer ROLLINGCOUNT =
            TopNCountConfigurations.getIntProperty(1, "topology.bolt.count.parallelism");
    public static final Integer INTERMEDIATERANKINGS =
            TopNCountConfigurations.getIntProperty(1, "topology.bolt.interRanker.parallelism");

    // ---- emit frequencies, top-N sizes and window length ----
    public static final int TOPEMITFREQUENCYSECS =
            TopNCountConfigurations.getIntProperty(1, "topology.top.emit.frequency.secs");
    public static final int INTERMEDIATEEMITFREQUENCYSECS =
            TopNCountConfigurations.getIntProperty(1, "topology.intermediate.emit.frequency.secs");
    public static final int TAOTALEMITFREQUENCYSECS =
            TopNCountConfigurations.getIntProperty(1, "topology.taotal.emit.frequency.secs");
    public static final int TOPINTERMEDIATEN =
            TopNCountConfigurations.getIntProperty(1, "topology.top.intermediate.n");
    public static final int TOPTOTALN =
            TopNCountConfigurations.getIntProperty(1, "topology.top.total.n");
    public static final int TOPWINDOWLENGTH =
            TopNCountConfigurations.getIntProperty(1, "topology.top.window.length.secs");

    // ---- pattern selection and result topic ----
    public static final int PATTERN =
            TopNCountConfigurations.getIntProperty(1, "pattern.num");
    public static final String RESULTSOUTPUTTOPIC =
            TopNCountConfigurations.getStringProperty(1, "results.output.topics");

    // ---- settings read with selector 0 ----
    public static final Integer TOPOLOGY_CONFIG_MAX_SPOUT_PENDING =
            TopNCountConfigurations.getIntProperty(0, "topology.config.max.spout.pending");
    public static final Integer TOPOLOGY_NUM_ACKS =
            TopNCountConfigurations.getIntProperty(0, "topology.num.acks");
    public static final Integer MAX_FAILURE_NUM =
            TopNCountConfigurations.getIntProperty(0, "max.failure.num");
    public static final String IP_LIBRARY =
            TopNCountConfigurations.getStringProperty(0, "ip.library");

    /**
     * Kafka
     */
    public static final String GROUP_ID =
            TopNCountConfigurations.getStringProperty(1, "group.id");
    public static final String KAFKA_TOPIC =
            TopNCountConfigurations.getStringProperty(1, "kafka.topic");

    public static final Integer TOPOLOGY_WORKERS =
            TopNCountConfigurations.getIntProperty(1, "topology.workers");
    public static final String BOOTSTRAP_SERVERS =
            TopNCountConfigurations.getStringProperty(0, "bootstrap.servers");
    public static final String RESULTS_OUTPUT_SERVERS =
            TopNCountConfigurations.getStringProperty(0, "results.output.servers");
    public static final String AUTO_OFFSET_RESET =
            TopNCountConfigurations.getStringProperty(0, "auto.offset.reset");

}
/**
 * Mutable bean holding one statistics row for an external (destination) host.
 * <p>
 * Accessor names intentionally keep the snake_case property names
 * (for example {@code getC2s_pkt_num}); NOTE(review): presumably these names define the
 * serialized field names - confirm before renaming anything.
 *
 * @author qidaijie
 */
public class ExternalHost {

    // Identity of the row.
    private String destination;
    private String protocol;

    // Counters.
    private long session_num;
    private long c2s_pkt_num;
    private long s2c_pkt_num;
    private long c2s_byte_num;
    private long s2c_byte_num;

    // Timestamp of the statistic (unit not visible from this class).
    private long stat_time;

    /** Creates an empty row: strings are {@code null}, counters are {@code 0}. */
    public ExternalHost() {
    }

    // ---------------------------------------------------------------- getters

    public String getDestination() {
        return destination;
    }

    public String getProtocol() {
        return protocol;
    }

    public long getSession_num() {
        return session_num;
    }

    public long getC2s_pkt_num() {
        return c2s_pkt_num;
    }

    public long getS2c_pkt_num() {
        return s2c_pkt_num;
    }

    public long getC2s_byte_num() {
        return c2s_byte_num;
    }

    public long getS2c_byte_num() {
        return s2c_byte_num;
    }

    public long getStat_time() {
        return stat_time;
    }

    // ---------------------------------------------------------------- setters

    public void setDestination(String destination) {
        this.destination = destination;
    }

    public void setProtocol(String protocol) {
        this.protocol = protocol;
    }

    public void setSession_num(long session_num) {
        this.session_num = session_num;
    }

    public void setC2s_pkt_num(long c2s_pkt_num) {
        this.c2s_pkt_num = c2s_pkt_num;
    }

    public void setS2c_pkt_num(long s2c_pkt_num) {
        this.s2c_pkt_num = s2c_pkt_num;
    }

    public void setC2s_byte_num(long c2s_byte_num) {
        this.c2s_byte_num = c2s_byte_num;
    }

    public void setS2c_byte_num(long s2c_byte_num) {
        this.s2c_byte_num = s2c_byte_num;
    }

    public void setStat_time(long stat_time) {
        this.stat_time = stat_time;
    }
}
b/src/main/java/cn/ac/iie/model/InternalHost.java @@ -0,0 +1,73 @@ +package cn.ac.iie.model; + +/** + * @author qidaijie + */ +public class InternalHost { + private String source; + private long session_num; + private long c2s_pkt_num; + private long s2c_pkt_num; + private long c2s_byte_num; + private long s2c_byte_num; + private long stat_time; + + public InternalHost() { + } + + public String getSource() { + return source; + } + + public void setSource(String source) { + this.source = source; + } + + public long getSession_num() { + return session_num; + } + + public void setSession_num(long session_num) { + this.session_num = session_num; + } + + public long getC2s_pkt_num() { + return c2s_pkt_num; + } + + public void setC2s_pkt_num(long c2s_pkt_num) { + this.c2s_pkt_num = c2s_pkt_num; + } + + public long getS2c_pkt_num() { + return s2c_pkt_num; + } + + public void setS2c_pkt_num(long s2c_pkt_num) { + this.s2c_pkt_num = s2c_pkt_num; + } + + public long getC2s_byte_num() { + return c2s_byte_num; + } + + public void setC2s_byte_num(long c2s_byte_num) { + this.c2s_byte_num = c2s_byte_num; + } + + public long getS2c_byte_num() { + return s2c_byte_num; + } + + public void setS2c_byte_num(long s2c_byte_num) { + this.s2c_byte_num = s2c_byte_num; + } + + public long getStat_time() { + return stat_time; + } + + public void setStat_time(long stat_time) { + this.stat_time = stat_time; + } +} diff --git a/src/main/java/cn/ac/iie/model/Urls.java b/src/main/java/cn/ac/iie/model/Urls.java new file mode 100644 index 0000000..5cfb5a5 --- /dev/null +++ b/src/main/java/cn/ac/iie/model/Urls.java @@ -0,0 +1,88 @@ +package cn.ac.iie.model; + +public class Urls { + private String url; + private long client_unq_num; + private long server_unq_num; + private long session_num; + private long c2s_pkt_num; + private long s2c_pkt_num; + private long c2s_byte_num; + private long s2c_byte_num; + private long stat_time; + + public Urls() { + } + + public String getUrl() { + return url; + } + + 
public void setUrl(String url) { + this.url = url; + } + + public long getClient_unq_num() { + return client_unq_num; + } + + public void setClient_unq_num(long client_unq_num) { + this.client_unq_num = client_unq_num; + } + + public long getServer_unq_num() { + return server_unq_num; + } + + public void setServer_unq_num(long server_unq_num) { + this.server_unq_num = server_unq_num; + } + + public long getSession_num() { + return session_num; + } + + public void setSession_num(long session_num) { + this.session_num = session_num; + } + + public long getC2s_pkt_num() { + return c2s_pkt_num; + } + + public void setC2s_pkt_num(long c2s_pkt_num) { + this.c2s_pkt_num = c2s_pkt_num; + } + + public long getS2c_pkt_num() { + return s2c_pkt_num; + } + + public void setS2c_pkt_num(long s2c_pkt_num) { + this.s2c_pkt_num = s2c_pkt_num; + } + + public long getC2s_byte_num() { + return c2s_byte_num; + } + + public void setC2s_byte_num(long c2s_byte_num) { + this.c2s_byte_num = c2s_byte_num; + } + + public long getS2c_byte_num() { + return s2c_byte_num; + } + + public void setS2c_byte_num(long s2c_byte_num) { + this.s2c_byte_num = s2c_byte_num; + } + + public long getStat_time() { + return stat_time; + } + + public void setStat_time(long stat_time) { + this.stat_time = stat_time; + } +} diff --git a/src/main/java/cn/ac/iie/model/User.java b/src/main/java/cn/ac/iie/model/User.java new file mode 100644 index 0000000..1ab9c6a --- /dev/null +++ b/src/main/java/cn/ac/iie/model/User.java @@ -0,0 +1,70 @@ +package cn.ac.iie.model; + +public class User { + private String subscribe_id; + private long session_num; + private long c2s_pkt_num; + private long s2c_pkt_num; + private long c2s_byte_num; + private long s2c_byte_num; + private long stat_time; + + public User() { + } + + public String getSubscribe_id() { + return subscribe_id; + } + + public void setSubscribe_id(String subscribe_id) { + this.subscribe_id = subscribe_id; + } + + public long getSession_num() { + return session_num; 
+ } + + public void setSession_num(long session_num) { + this.session_num = session_num; + } + + public long getC2s_pkt_num() { + return c2s_pkt_num; + } + + public void setC2s_pkt_num(long c2s_pkt_num) { + this.c2s_pkt_num = c2s_pkt_num; + } + + public long getS2c_pkt_num() { + return s2c_pkt_num; + } + + public void setS2c_pkt_num(long s2c_pkt_num) { + this.s2c_pkt_num = s2c_pkt_num; + } + + public long getC2s_byte_num() { + return c2s_byte_num; + } + + public void setC2s_byte_num(long c2s_byte_num) { + this.c2s_byte_num = c2s_byte_num; + } + + public long getS2c_byte_num() { + return s2c_byte_num; + } + + public void setS2c_byte_num(long s2c_byte_num) { + this.s2c_byte_num = s2c_byte_num; + } + + public long getStat_time() { + return stat_time; + } + + public void setStat_time(long stat_time) { + this.stat_time = stat_time; + } +} diff --git a/src/main/java/cn/ac/iie/model/WebSite.java b/src/main/java/cn/ac/iie/model/WebSite.java new file mode 100644 index 0000000..4f42d03 --- /dev/null +++ b/src/main/java/cn/ac/iie/model/WebSite.java @@ -0,0 +1,91 @@ +package cn.ac.iie.model; + +/** + * @author qidaijie + */ +public class WebSite { + private String domain; + private long client_unq_num; + private long server_unq_num; + private long session_num; + private long c2s_pkt_num; + private long s2c_pkt_num; + private long c2s_byte_num; + private long s2c_byte_num; + private long stat_time; + + public WebSite() { + } + + public String getDomain() { + return domain; + } + + public void setDomain(String domain) { + this.domain = domain; + } + + public long getSession_num() { + return session_num; + } + + public void setSession_num(long session_num) { + this.session_num = session_num; + } + + public long getC2s_pkt_num() { + return c2s_pkt_num; + } + + public void setC2s_pkt_num(long c2s_pkt_num) { + this.c2s_pkt_num = c2s_pkt_num; + } + + public long getS2c_pkt_num() { + return s2c_pkt_num; + } + + public void setS2c_pkt_num(long s2c_pkt_num) { + this.s2c_pkt_num = 
s2c_pkt_num; + } + + public long getC2s_byte_num() { + return c2s_byte_num; + } + + public void setC2s_byte_num(long c2s_byte_num) { + this.c2s_byte_num = c2s_byte_num; + } + + public long getS2c_byte_num() { + return s2c_byte_num; + } + + public void setS2c_byte_num(long s2c_byte_num) { + this.s2c_byte_num = s2c_byte_num; + } + + public long getClient_unq_num() { + return client_unq_num; + } + + public void setClient_unq_num(long client_unq_num) { + this.client_unq_num = client_unq_num; + } + + public long getServer_unq_num() { + return server_unq_num; + } + + public void setServer_unq_num(long server_unq_num) { + this.server_unq_num = server_unq_num; + } + + public long getStat_time() { + return stat_time; + } + + public void setStat_time(long stat_time) { + this.stat_time = stat_time; + } +} diff --git a/src/main/java/cn/ac/iie/spout/CustomizedKafkaSpout.java b/src/main/java/cn/ac/iie/spout/CustomizedKafkaSpout.java new file mode 100644 index 0000000..8904556 --- /dev/null +++ b/src/main/java/cn/ac/iie/spout/CustomizedKafkaSpout.java @@ -0,0 +1,79 @@ +package cn.ac.iie.spout; + +import cn.ac.iie.common.TopNCountConfig; +import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.apache.kafka.clients.consumer.ConsumerRecords; +import org.apache.kafka.clients.consumer.KafkaConsumer; +import org.apache.log4j.Logger; +import org.apache.storm.spout.SpoutOutputCollector; +import org.apache.storm.task.TopologyContext; +import org.apache.storm.topology.OutputFieldsDeclarer; +import org.apache.storm.topology.base.BaseRichSpout; +import org.apache.storm.tuple.Fields; +import org.apache.storm.tuple.Values; + +import java.util.Arrays; +import java.util.Map; +import java.util.Properties; + +/** + * @author qidaijie + */ +public class CustomizedKafkaSpout extends BaseRichSpout { + private static final long serialVersionUID = -3363788553406229592L; + private KafkaConsumer<String, String> consumer; + private SpoutOutputCollector collector = null; + private 
TopologyContext context = null; + private final static Logger logger = Logger.getLogger(CustomizedKafkaSpout.class); + + + private static Properties createConsumerConfig() { + Properties props = new Properties(); + props.put("bootstrap.servers", TopNCountConfig.BOOTSTRAP_SERVERS); + props.put("group.id", TopNCountConfig.GROUP_ID); + props.put("session.timeout.ms", "60000"); + props.put("max.poll.records", 3000); + props.put("max.partition.fetch.bytes", 31457280); + props.put("auto.offset.reset", TopNCountConfig.AUTO_OFFSET_RESET); + props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); + props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); + return props; + } + + @Override + public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) { + // TODO Auto-generated method stub + this.collector = collector; + this.context = context; + Properties prop = createConsumerConfig(); + this.consumer = new KafkaConsumer<>(prop); + this.consumer.subscribe(Arrays.asList(TopNCountConfig.KAFKA_TOPIC)); + } + + @Override + public void close() { + consumer.close(); + } + + @Override + public void nextTuple() { + try { + // TODO Auto-generated method stub + ConsumerRecords<String, String> records = consumer.poll(10000L); + Thread.sleep(1); + for (ConsumerRecord<String, String> record : records) { + this.collector.emit(new Values(record.value())); + } + } catch (Exception e) { + logger.error("实时统计程序SPOUT出现异常----> " + e); + e.printStackTrace(); + } + + } + + @Override + public void declareOutputFields(OutputFieldsDeclarer declarer) { + // TODO Auto-generated method stub + declarer.declare(new Fields("source")); + } +} diff --git a/src/main/java/cn/ac/iie/topology/LogTopCountTopology.java b/src/main/java/cn/ac/iie/topology/LogTopCountTopology.java new file mode 100644 index 0000000..64e6c30 --- /dev/null +++ b/src/main/java/cn/ac/iie/topology/LogTopCountTopology.java @@ -0,0 +1,137 @@ 
+package cn.ac.iie.topology; + +import cn.ac.iie.bolt.*; +import cn.ac.iie.bolt.resultoutput.*; +import cn.ac.iie.bolt.scatteredcalculate.*; +import cn.ac.iie.bolt.segmentation.*; +import cn.ac.iie.common.TopNCountConfig; +import cn.ac.iie.spout.CustomizedKafkaSpout; +import org.apache.log4j.Logger; +import org.apache.storm.Config; +import org.apache.storm.generated.AlreadyAliveException; +import org.apache.storm.generated.AuthorizationException; +import org.apache.storm.generated.InvalidTopologyException; +import org.apache.storm.topology.TopologyBuilder; +import org.apache.storm.tuple.Fields; + +/** + * @author Administrator + */ + +public class LogTopCountTopology { + private static Logger logger = Logger.getLogger(LogTopCountTopology.class); + private final String topologyName; + private final Config topologyConfig; + private TopologyBuilder builder; + + private LogTopCountTopology() { + this(LogTopCountTopology.class.getSimpleName()); + } + + private LogTopCountTopology(String topologyName) { + this.topologyName = topologyName; + topologyConfig = createTopologConfig(); + } + + private Config createTopologConfig() { + Config conf = new Config(); + conf.setDebug(false); + conf.setMessageTimeoutSecs(120); + conf.setMaxSpoutPending(TopNCountConfig.TOPOLOGY_CONFIG_MAX_SPOUT_PENDING); + conf.setNumAckers(TopNCountConfig.TOPOLOGY_NUM_ACKS); + return conf; + } + + private void runLocally() throws InterruptedException { + topologyConfig.setMaxTaskParallelism(1); + StormRunner.runTopologyLocally(builder, topologyName, topologyConfig, 600); + } + + private void runRemotely() throws AlreadyAliveException, InvalidTopologyException, AuthorizationException { + topologyConfig.setNumWorkers(TopNCountConfig.TOPOLOGY_WORKERS); + //设置过高会导致很多问题,如心跳线程饿死、吞吐量大幅下跌 + topologyConfig.put(Config.TOPOLOGY_EXECUTOR_RECEIVE_BUFFER_SIZE, 8); + StormRunner.runTopologyRemotely(builder, topologyName, topologyConfig); + } + + private void buildTopology() { + builder = new TopologyBuilder(); + 
builder.setSpout("RealtimeCountSpout", new CustomizedKafkaSpout(), TopNCountConfig.SPOUT_PARALLELISM); + switch (TopNCountConfig.PATTERN) { + case 1: + builder.setBolt("InternalNormalizer", new InternalBolt(), (TopNCountConfig.WORDNORMALIZER)).shuffleGrouping("RealtimeCountSpout"); + builder.setBolt("InternalCount", new InternalCountBolt(TopNCountConfig.TOPWINDOWLENGTH, TopNCountConfig.TOPEMITFREQUENCYSECS), + TopNCountConfig.ROLLINGCOUNT).fieldsGrouping("InternalNormalizer", new Fields("key")); + builder.setBolt("IntermediateRanker", new IntermediateRankingsBolt(TopNCountConfig.TOPINTERMEDIATEN, + TopNCountConfig.INTERMEDIATEEMITFREQUENCYSECS), + TopNCountConfig.INTERMEDIATERANKINGS).fieldsGrouping("InternalCount", new Fields("obj")); + builder.setBolt("finalRanker", new TotalRankingsBolt(TopNCountConfig.TOPTOTALN, TopNCountConfig.TAOTALEMITFREQUENCYSECS)).globalGrouping("IntermediateRanker"); + builder.setBolt("InternalOutPutBolt", new InternalOutPutBolt()).globalGrouping("finalRanker"); + break; + case 2: + builder.setBolt("ExternalNormalizer", new ExternalBolt(), (TopNCountConfig.WORDNORMALIZER)).shuffleGrouping("RealtimeCountSpout"); + builder.setBolt("ExternalCount", new ExternalCountBolt(TopNCountConfig.TOPWINDOWLENGTH, TopNCountConfig.TOPEMITFREQUENCYSECS), + TopNCountConfig.ROLLINGCOUNT).fieldsGrouping("ExternalNormalizer", new Fields("key")); + builder.setBolt("IntermediateRanker", new IntermediateRankingsBolt(TopNCountConfig.TOPINTERMEDIATEN, + TopNCountConfig.INTERMEDIATEEMITFREQUENCYSECS), + TopNCountConfig.INTERMEDIATERANKINGS).fieldsGrouping("ExternalCount", new Fields("obj")); + builder.setBolt("finalRanker", new TotalRankingsBolt(TopNCountConfig.TOPTOTALN, TopNCountConfig.TAOTALEMITFREQUENCYSECS)).globalGrouping("IntermediateRanker"); + builder.setBolt("ExternalOutPutBolt", new ExternalOutPutBolt()).globalGrouping("finalRanker"); + break; + case 3: + builder.setBolt("WebsiteNormalizer", new WebSiteBolt(), 
(TopNCountConfig.WORDNORMALIZER)).shuffleGrouping("RealtimeCountSpout"); + builder.setBolt("WebsiteCount", new WebsiteCountBolt(TopNCountConfig.TOPWINDOWLENGTH, TopNCountConfig.TOPEMITFREQUENCYSECS), + TopNCountConfig.ROLLINGCOUNT).fieldsGrouping("WebsiteNormalizer", new Fields("key")); + builder.setBolt("IntermediateRanker", new IntermediateRankingsBolt(TopNCountConfig.TOPINTERMEDIATEN, + TopNCountConfig.INTERMEDIATEEMITFREQUENCYSECS), + TopNCountConfig.INTERMEDIATERANKINGS).fieldsGrouping("WebsiteCount", new Fields("obj")); + builder.setBolt("finalRanker", new TotalRankingsBolt(TopNCountConfig.TOPTOTALN, TopNCountConfig.TAOTALEMITFREQUENCYSECS)).globalGrouping("IntermediateRanker"); + builder.setBolt("WebsiteOutPutBolt", new WebsiteOutPutBolt()).globalGrouping("finalRanker"); + break; + case 4: + builder.setBolt("UrlsNormalizer", new UrlsBolt(), (TopNCountConfig.WORDNORMALIZER)).shuffleGrouping("RealtimeCountSpout"); + builder.setBolt("UrlsCount", new UrlsCountBolt(TopNCountConfig.TOPWINDOWLENGTH, TopNCountConfig.TOPEMITFREQUENCYSECS), + TopNCountConfig.ROLLINGCOUNT).fieldsGrouping("UrlsNormalizer", new Fields("key")); + builder.setBolt("IntermediateRanker", new IntermediateRankingsBolt(TopNCountConfig.TOPINTERMEDIATEN, + TopNCountConfig.INTERMEDIATEEMITFREQUENCYSECS), + TopNCountConfig.INTERMEDIATERANKINGS).fieldsGrouping("UrlsCount", new Fields("obj")); + builder.setBolt("finalRanker", new TotalRankingsBolt(TopNCountConfig.TOPTOTALN, TopNCountConfig.TAOTALEMITFREQUENCYSECS)).globalGrouping("IntermediateRanker"); + builder.setBolt("UrlsOutPutBolt", new UrlsOutPutBolt()).globalGrouping("finalRanker"); + break; + case 5: + builder.setBolt("UserNormalizer", new UserBolt(), (TopNCountConfig.WORDNORMALIZER)).shuffleGrouping("RealtimeCountSpout"); + builder.setBolt("UserCount", new UserCountBolt(TopNCountConfig.TOPWINDOWLENGTH, TopNCountConfig.TOPEMITFREQUENCYSECS), + TopNCountConfig.ROLLINGCOUNT).fieldsGrouping("UserNormalizer", new Fields("key")); + 
builder.setBolt("IntermediateRanker", new IntermediateRankingsBolt(TopNCountConfig.TOPINTERMEDIATEN, + TopNCountConfig.INTERMEDIATEEMITFREQUENCYSECS), + TopNCountConfig.INTERMEDIATERANKINGS).fieldsGrouping("UserCount", new Fields("obj")); + builder.setBolt("finalRanker", new TotalRankingsBolt(TopNCountConfig.TOPTOTALN, TopNCountConfig.TAOTALEMITFREQUENCYSECS)).globalGrouping("IntermediateRanker"); + builder.setBolt("UserOutPutBolt", new UserOutPutBolt()).globalGrouping("finalRanker"); + break; + default: + } + + + } + + public static void main(String[] args) throws Exception { + LogTopCountTopology logTopCountTopology; + boolean runLocally = true; + String command = "remote"; + if (args.length >= 2 && command.equalsIgnoreCase(args[1])) { + runLocally = false; + logTopCountTopology = new LogTopCountTopology(args[0]); + } else { + logTopCountTopology = new LogTopCountTopology(); + } + logTopCountTopology.buildTopology(); + + if (runLocally) { + logger.info("执行本地模式..."); + logTopCountTopology.runLocally(); + } else { + logger.info("执行远程部署模式..."); + logTopCountTopology.runRemotely(); + } + } +} diff --git a/src/main/java/cn/ac/iie/topology/StormRunner.java b/src/main/java/cn/ac/iie/topology/StormRunner.java new file mode 100644 index 0000000..65ddd98 --- /dev/null +++ b/src/main/java/cn/ac/iie/topology/StormRunner.java @@ -0,0 +1,35 @@ +package cn.ac.iie.topology; + + +import org.apache.storm.Config; +import org.apache.storm.LocalCluster; +import org.apache.storm.StormSubmitter; +import org.apache.storm.generated.AlreadyAliveException; +import org.apache.storm.generated.AuthorizationException; +import org.apache.storm.generated.InvalidTopologyException; +import org.apache.storm.topology.TopologyBuilder; + +/** + * @author qidaijie + */ +final class StormRunner { + private static final int MILLS_IN_SEC = 1000; + + private StormRunner() { + } + + static void runTopologyLocally(TopologyBuilder builder, String topologyName, Config conf, int runtimeInSeconds) throws 
InterruptedException { + + LocalCluster localCluster = new LocalCluster(); + localCluster.submitTopology(topologyName, conf, builder.createTopology()); + Thread.sleep((long) runtimeInSeconds * MILLS_IN_SEC); + localCluster.shutdown(); + } + + static void runTopologyRemotely(TopologyBuilder builder, String topologyName, Config conf) throws AlreadyAliveException, InvalidTopologyException, AuthorizationException { + + StormSubmitter.submitTopologyWithProgressBar(topologyName, conf, builder.createTopology()); + } + + +} diff --git a/src/main/java/cn/ac/iie/utils/NthLastModifiedTimeTracker.java b/src/main/java/cn/ac/iie/utils/NthLastModifiedTimeTracker.java new file mode 100644 index 0000000..e7c8f16 --- /dev/null +++ b/src/main/java/cn/ac/iie/utils/NthLastModifiedTimeTracker.java @@ -0,0 +1,45 @@ +package cn.ac.iie.utils; + +import org.apache.commons.collections.buffer.CircularFifoBuffer; +import org.apache.storm.utils.Time; + +/** + * @author qidaijie + */ +public class NthLastModifiedTimeTracker { + private static final int MILLIS_IN_SEC = 1000; + private final CircularFifoBuffer lastModifiedTimesMillis; + + public NthLastModifiedTimeTracker(int numTimesToTrack) { + if (numTimesToTrack < 1) { + throw new IllegalArgumentException("numTimesToTrack must be greater than zero (you requested " + numTimesToTrack + ")"); + } + lastModifiedTimesMillis = new CircularFifoBuffer(numTimesToTrack); + initLastModifiedTimesMillis(); + } + + private void initLastModifiedTimesMillis() { + long nowCached = now(); + for (int i = 0; i < lastModifiedTimesMillis.maxSize(); i++) { + lastModifiedTimesMillis.add(nowCached); + } + } + + private long now() { + return Time.currentTimeMillis(); + } + + public int secondsSinceOldestModification() { + long modifiedTimeMillis = (Long) lastModifiedTimesMillis.get(); + return (int) ((now() - modifiedTimeMillis) / MILLIS_IN_SEC); + } + + public void markAsModified() { + updateLastModifiedTime(); + } + + private void updateLastModifiedTime() { + 
lastModifiedTimesMillis.add(now()); + } + +} diff --git a/src/main/java/cn/ac/iie/utils/Rankable.java b/src/main/java/cn/ac/iie/utils/Rankable.java new file mode 100644 index 0000000..dc3ce97 --- /dev/null +++ b/src/main/java/cn/ac/iie/utils/Rankable.java @@ -0,0 +1,11 @@ +package cn.ac.iie.utils; + +import java.util.List; + +public interface Rankable extends Comparable<Rankable> { + Object getObject(); + long getCount(); + List<Object> getFields(); + + Rankable copy(); +} diff --git a/src/main/java/cn/ac/iie/utils/RankableObjectWithFields.java b/src/main/java/cn/ac/iie/utils/RankableObjectWithFields.java new file mode 100644 index 0000000..6d3d0fa --- /dev/null +++ b/src/main/java/cn/ac/iie/utils/RankableObjectWithFields.java @@ -0,0 +1,119 @@ +package cn.ac.iie.utils; + +import com.google.common.collect.ImmutableList; +import com.google.common.collect.Lists; +import org.apache.storm.tuple.Tuple; + +import java.io.Serializable; +import java.util.List; + +public class RankableObjectWithFields implements Rankable, Serializable { + private static final long serialVersionUID = -5291345336126830843L; + private final Object obj; + private final long count; + private final ImmutableList<Object> fields; + private static final String toStringSeparator = "|"; + + public RankableObjectWithFields(Object obj, long count, Object... 
otherFields) { + if (obj == null) { + throw new IllegalArgumentException("The object must not be null"); + } + if (count < 0) { + throw new IllegalArgumentException("The count must be >=0"); + } + this.obj = obj; + this.count = count; + fields = ImmutableList.copyOf(otherFields); + } + + public static RankableObjectWithFields from(Tuple tuple) { + List<Object> otherFields = Lists.newArrayList(tuple.getValues()); + Object obj = otherFields.remove(0); + Long count = (Long) otherFields.remove(0); + return new RankableObjectWithFields(obj, count, otherFields.toArray()); + } + + + @Override + public int compareTo(Rankable other) { + long delta = this.getCount() - other.getCount(); + if (delta > 0) { + return 1; + } else if (delta < 0) { + return -1; + } else { + return 0; + } + } + + + @Override + public Object getObject() { + return obj; + } + + @Override + public long getCount() { + return count; + } + + @Override + public List<Object> getFields() { + return fields; + } + + @Override + public Rankable copy() { + List<Object> shallowCopyOfFields = ImmutableList.copyOf(getFields()); + return new RankableObjectWithFields(getObject(), getCount(), shallowCopyOfFields); + } + + + @Override + public int hashCode() { + final int prime = 31; + int result = 1; + result = prime * result + (int) (count ^ (count >>> 32)); + result = prime * result + ((obj == null) ? 
0 : obj.hashCode()); + return result; + } + + @Override + public boolean equals(Object obj) { + if (this == obj) { + return true; + } + if (obj == null) { + return false; + } + if (getClass() != obj.getClass()) { + return false; + } + RankableObjectWithFields other = (RankableObjectWithFields) obj; + if (this.obj == null) { + if (other.obj != null) { + return false; + } + } else if (!this.obj.equals(other.obj)) { + return false; + } + return true; + } + + + @Override + public String toString() { + StringBuffer buf = new StringBuffer(); + buf.append("["); + buf.append(obj); + buf.append(toStringSeparator); + buf.append(count); + for (Object field : fields) { + buf.append(toStringSeparator); + buf.append(field); + } + buf.append("]"); + return buf.toString(); + } +} + diff --git a/src/main/java/cn/ac/iie/utils/Rankings.java b/src/main/java/cn/ac/iie/utils/Rankings.java new file mode 100644 index 0000000..5213891 --- /dev/null +++ b/src/main/java/cn/ac/iie/utils/Rankings.java @@ -0,0 +1,124 @@ +package cn.ac.iie.utils; + +import com.google.common.collect.ImmutableList; +import com.google.common.collect.Lists; + +import java.io.Serializable; +import java.util.Collections; +import java.util.List; + + +public class Rankings implements Serializable { + + /** + * + */ + private static final long serialVersionUID = -169358781406063489L; + private static final int DEFAULT_COUNT = 10; + + private final int maxSize; + private final List<Rankable> rankedItems = Lists.newArrayList(); + + public Rankings() { + this(DEFAULT_COUNT); + } + + public Rankings(int topN) { + if (topN < 1) { + throw new IllegalArgumentException("topN must be >= 1"); + } + maxSize = topN; + } + + public Rankings(Rankings other) { + this(other.maxSize()); + updateWith(other); + } + + public int maxSize() { + return maxSize; + } + + public int size() { + return rankedItems.size(); + } + + public List<Rankable> getRankings() { + List<Rankable> copyRankList = Lists.newLinkedList(); + 
copyRankList.addAll(rankedItems); + return ImmutableList.copyOf(copyRankList); + } + + public void updateWith(Rankings other) { + for (Rankable r : other.getRankings()) { + updateWith(r); + } + } + + public void updateWith(Rankable r) { + synchronized (rankedItems) { + addOrReplace(r); + rerank(); + shrinkRankingsIfNeeded(); + } + } + + public void clearRankedItem() { + rankedItems.clear(); + } + + private void addOrReplace(Rankable r) { + + Integer rank = findRankOf(r); + if (rank != null) { + rankedItems.set(rank, r); + } else { + rankedItems.add(r); + } + } + + + private Integer findRankOf(Rankable r) { + Object tag = r.getObject(); + for (int rank = 0; rank < rankedItems.size(); rank++) { + Object cur = rankedItems.get(rank).getObject(); + if (cur.equals(tag)) { + return rank; + } + } + return null; + } + + private void rerank() { + Collections.sort(rankedItems); + Collections.reverse(rankedItems); + } + + private void shrinkRankingsIfNeeded() { + if (rankedItems.size() > maxSize) { + rankedItems.remove(maxSize); + } + } + + public void pruneZeroCounts() { + int i = 0; + while (i < rankedItems.size()) { + if (rankedItems.get(i).getCount() == 0) { + rankedItems.remove(i); + } else { + i++; + } + } + } + + @Override + public String toString() { + return rankedItems.toString(); + } + + public Rankings copy() { + return new Rankings(this); + } + + +} diff --git a/src/main/java/cn/ac/iie/utils/SlidingWindowCounter.java b/src/main/java/cn/ac/iie/utils/SlidingWindowCounter.java new file mode 100644 index 0000000..95e99b0 --- /dev/null +++ b/src/main/java/cn/ac/iie/utils/SlidingWindowCounter.java @@ -0,0 +1,58 @@ +package cn.ac.iie.utils; + +import java.io.Serializable; +import java.util.Map; + +/** + * @author qidaijie + */ +public final class SlidingWindowCounter<T> implements Serializable { + + private static final long serialVersionUID = 7540606699600042717L; + private SlotBasedCounter<T> objCounter; + private int headSlot; + private int tailSlot; + private int 
/**
 * Per-object counters split into a fixed number of slots.
 * <p>
 * Five values are tracked per object and slot: the main count plus {@code cpn},
 * {@code spn}, {@code cbn} and {@code sbn} (NOTE(review): presumably packet and byte
 * counters per direction - confirm against the emitting bolts). All five maps always
 * hold the same key set.
 * <p>
 * Not thread-safe.
 *
 * @param <T> type of the counted objects; used as a hash-map key
 */
public class SlotBasedCounter<T> implements Serializable {
    private static final long serialVersionUID = 4935449009558219987L;

    private final Map<T, long[]> objToCounts = new HashMap<>();
    private final Map<T, long[]> objToCpn = new HashMap<>();
    private final Map<T, long[]> objToSpn = new HashMap<>();
    private final Map<T, long[]> objToCbn = new HashMap<>();
    private final Map<T, long[]> objToSbn = new HashMap<>();

    private final int numSlots;

    /**
     * @param numSlots number of slots kept per object, must be greater than zero
     * @throws IllegalArgumentException if {@code numSlots <= 0}
     */
    public SlotBasedCounter(int numSlots) {
        if (numSlots <= 0) {
            throw new IllegalArgumentException(" Number of slots must be greater than zero (you requested " + numSlots + ")");
        }
        this.numSlots = numSlots;
    }

    /**
     * Adds the given amounts to the counters of {@code obj} in {@code slot}.
     *
     * @param obj   object to count
     * @param count amount added to the main count
     * @param cpn   amount added to the cpn counter
     * @param spn   amount added to the spn counter
     * @param cbn   amount added to the cbn counter
     * @param sbn   amount added to the sbn counter
     * @param slot  slot index in {@code [0, numSlots)}
     * @throws ArrayIndexOutOfBoundsException if {@code slot} is out of range; the counter
     *                                        is left untouched in that case
     */
    public void incrementCount(T obj, long count, long cpn, long spn, long cbn, long sbn, int slot) {
        // Validate before touching any map: failing half-way would register obj in some
        // maps but not the others and make later getCounts()/wipeSlot() calls throw.
        checkSlot(slot);
        add(objToCounts, obj, slot, count);
        add(objToCpn, obj, slot, cpn);
        add(objToSpn, obj, slot, spn);
        add(objToCbn, obj, slot, cbn);
        add(objToSbn, obj, slot, sbn);
    }

    /** Adds {@code amount} to {@code obj}'s slot in one map, creating the slot array on first use. */
    private void add(Map<T, long[]> target, T obj, int slot, long amount) {
        long[] slots = target.get(obj);
        if (slots == null) {
            slots = new long[this.numSlots];
            target.put(obj, slots);
        }
        slots[slot] += amount;
    }

    /** Rejects slot indexes outside {@code [0, numSlots)}. */
    private void checkSlot(int slot) {
        if (slot < 0 || slot >= numSlots) {
            throw new ArrayIndexOutOfBoundsException(
                    "slot " + slot + " is outside [0, " + numSlots + ")");
        }
    }

    /**
     * @param obj  object to look up
     * @param slot slot index
     * @return the main count of {@code obj} in {@code slot}, or 0 if the object is not tracked
     */
    public long getCount(T obj, int slot) {
        long[] counts = objToCounts.get(obj);
        if (counts == null) {
            return 0;
        } else {
            return counts[slot];
        }
    }

    /**
     * Sums every counter over all slots.
     *
     * @return a new map from each tracked object to a five-element array
     * {@code [count, cpn, spn, cbn, sbn]} of totals
     */
    public Map<T, Long[]> getCounts() {
        Map<T, Long[]> result = new HashMap<T, Long[]>();
        for (T obj : objToCounts.keySet()) {
            Long[] numbers = new Long[5];
            numbers[0] = computerTotalCount(obj, objToCounts);
            numbers[1] = computerTotalCount(obj, objToCpn);
            numbers[2] = computerTotalCount(obj, objToSpn);
            numbers[3] = computerTotalCount(obj, objToCbn);
            numbers[4] = computerTotalCount(obj, objToSbn);
            result.put(obj, numbers);
        }
        return result;
    }

    /** Sum of all slots of {@code obj} in the given map; 0 if the object is not present. */
    private long computerTotalCount(T obj, Map<T, long[]> objTo) {
        long[] counts = objTo.get(obj);
        if (counts == null) {
            return 0;
        }
        long total = 0;
        for (long count : counts) {
            total += count;
        }
        return total;
    }

    /**
     * Reset the slot count of any tracked objects to zero for given slot
     *
     * @param slot slot index to clear
     */
    public void wipeSlot(int slot) {
        for (T obj : objToCounts.keySet()) {
            resetSlotCountToZero(obj, slot);
        }
    }

    /** Zeroes one slot of a tracked object in all five maps. */
    private void resetSlotCountToZero(T obj, int slot) {
        objToCounts.get(obj)[slot] = 0;
        objToCpn.get(obj)[slot] = 0;
        objToSpn.get(obj)[slot] = 0;
        objToCbn.get(obj)[slot] = 0;
        objToSbn.get(obj)[slot] = 0;
    }

    /** An object is dropped once its main count sums to zero over all slots. */
    private boolean shouldBeRemovedfromCounter(T obj) {
        return computerTotalCount(obj, objToCounts) == 0;
    }

    /**
     * Remove any object from the counter whose total count is zero (to free up memory).
     */
    public void wipeZeros() {
        // Collect first: removing while iterating the key set would fail.
        Set<T> objToBeRemoved = new HashSet<T>();
        for (T obj : objToCounts.keySet()) {
            if (shouldBeRemovedfromCounter(obj)) {
                objToBeRemoved.add(obj);
            }
        }

        for (T obj : objToBeRemoved) {
            objToCounts.remove(obj);
            objToCpn.remove(obj);
            objToSpn.remove(obj);
            objToCbn.remove(obj);
            objToSbn.remove(obj);
        }
    }
}
Producer<String, String> kafkaProducer; + + /** + * kafka生产者适配器(单例),用来代理kafka生产者发送消息 + */ + private static ResultToKafka resultToKafka; + + private ResultToKafka() { + initKafkaProducer(); + } + + public static ResultToKafka getInstance() { + if (resultToKafka == null) { + resultToKafka = new ResultToKafka(); + } + return resultToKafka; + } + + + public void sendMessage(LinkedList<String> list) { + final int[] errorSum = {0}; + for (String value : list) { + kafkaProducer.send(new ProducerRecord<>(TopNCountConfig.RESULTSOUTPUTTOPIC, value), new Callback() { + @Override + public void onCompletion(RecordMetadata metadata, Exception exception) { + if (exception != null) { + logger.error("发送kafka出现异常", exception); + errorSum[0]++; + } + } + }); + if (errorSum[0] >= TopNCountConfig.MAX_FAILURE_NUM) { + logger.error("超出最大容忍错误数抛弃数据条数:" + list.size()); + list.clear(); + } + } + kafkaProducer.flush(); + logger.warn("Log sent to National Center successfully!!!!!"); + } + + /** + * 根据kafka生产者配置信息初始化kafka消息生产者,只初始化一次 + */ + private void initKafkaProducer() { + Properties properties = new Properties(); + properties.put("bootstrap.servers", TopNCountConfig.RESULTS_OUTPUT_SERVERS); + properties.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer"); + properties.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer"); + properties.put("acks", "1"); + properties.put("linger.ms", "2"); + properties.put("request.timeout.ms", 20000); + properties.put("batch.size", 262144); + properties.put("buffer.memory", 33554432); + kafkaProducer = new KafkaProducer<>(properties); + } + +} diff --git a/src/main/java/cn/ac/iie/utils/system/TopNCountConfigurations.java b/src/main/java/cn/ac/iie/utils/system/TopNCountConfigurations.java new file mode 100644 index 0000000..f39fa06 --- /dev/null +++ b/src/main/java/cn/ac/iie/utils/system/TopNCountConfigurations.java @@ -0,0 +1,65 @@ +package cn.ac.iie.utils.system; + +import java.util.Properties; + + 
/**
 * Read-only access to the two classpath property files used by the topology.
 *
 * <p>Selector {@code 0} reads {@code realtime_routine.properties}; selector {@code 1} reads
 * {@code kafka_topic.properties}. Any other selector, including {@code null}, makes every getter
 * return {@code null}. Both files are loaded exactly once, when this class is initialised.
 *
 * <p>If either file cannot be loaded, the failure is reported on {@code System.err} and every
 * getter called with selector 0 or 1 throws {@link IllegalStateException} carrying the original
 * cause, instead of the bare {@code NullPointerException} the previous implementation produced.
 */
public final class TopNCountConfigurations {

    private static final String COMMON_RESOURCE = "realtime_routine.properties";
    private static final String SERVICE_RESOURCE = "kafka_topic.properties";

    private static final Properties propCommon = new Properties();
    private static final Properties propService = new Properties();

    /** Cause of a failed load, or {@code null} when both files were read successfully. */
    private static final Exception loadFailure;

    static {
        Exception failure = null;
        try {
            loadResource(COMMON_RESOURCE, propCommon);
            loadResource(SERVICE_RESOURCE, propService);
            System.out.println("kafka_topic.properties加载成功");
        } catch (java.io.IOException | IllegalArgumentException e) {
            // Never expose a half-loaded configuration: drop whatever was read so far.
            failure = e;
            propCommon.clear();
            propService.clear();
            System.err.println("Configuration loading failed!!");
            System.err.println("Cause: " + e);
        }
        loadFailure = failure;
    }

    /**
     * Returns the raw value of {@code key}.
     *
     * @param type 0 for the common file, 1 for the service file
     * @param key  property name
     * @return the value, or {@code null} if the key is absent or {@code type} is not 0 or 1
     * @throws IllegalStateException if the configuration files could not be loaded
     */
    public static String getStringProperty(Integer type, String key) {
        Properties source = propertiesFor(type);
        if (source == null) {
            return null;
        }
        return source.getProperty(key);
    }

    /**
     * Returns the value of {@code key} parsed as an {@code int}.
     *
     * @param type 0 for the common file, 1 for the service file
     * @param key  property name
     * @return the parsed value, or {@code null} if {@code type} is not 0 or 1
     * @throws NumberFormatException if the key is absent or its value is not an integer
     * @throws IllegalStateException if the configuration files could not be loaded
     */
    public static Integer getIntProperty(Integer type, String key) {
        Properties source = propertiesFor(type);
        if (source == null) {
            return null;
        }
        return Integer.parseInt(source.getProperty(key));
    }

    /**
     * Returns the value of {@code key} parsed as a {@code long}.
     *
     * @param type 0 for the common file, 1 for the service file
     * @param key  property name
     * @return the parsed value, or {@code null} if {@code type} is not 0 or 1
     * @throws NumberFormatException if the key is absent or its value is not a long
     * @throws IllegalStateException if the configuration files could not be loaded
     */
    public static Long getLongProperty(Integer type, String key) {
        Properties source = propertiesFor(type);
        if (source == null) {
            return null;
        }
        return Long.parseLong(source.getProperty(key));
    }

    /**
     * Returns whether the value of {@code key} is the word "true" (case-insensitive, surrounding
     * whitespace ignored).
     *
     * @param type 0 for the common file, 1 for the service file
     * @param key  property name
     * @return {@code true}/{@code false}; {@code false} when the key is absent; {@code null} if
     *         {@code type} is not 0 or 1
     * @throws IllegalStateException if the configuration files could not be loaded
     */
    public static Boolean getBooleanProperty(Integer type, String key) {
        Properties source = propertiesFor(type);
        if (source == null) {
            return null;
        }
        String value = source.getProperty(key);
        // A missing key used to raise a NullPointerException here; treat it as "not enabled".
        return value != null && "true".equalsIgnoreCase(value.trim());
    }

    /** Maps a selector to its property set; returns null for an unknown or null selector. */
    private static Properties propertiesFor(Integer type) {
        if (type == null) {
            return null;
        }
        Properties selected;
        switch (type) {
            case 0:
                selected = propCommon;
                break;
            case 1:
                selected = propService;
                break;
            default:
                return null;
        }
        if (loadFailure != null) {
            throw new IllegalStateException(
                    "Configuration files were not loaded; cannot read properties", loadFailure);
        }
        return selected;
    }

    /** Loads one classpath resource into {@code target}, always closing the stream. */
    private static void loadResource(String name, Properties target) throws java.io.IOException {
        ClassLoader loader = TopNCountConfigurations.class.getClassLoader();
        try (java.io.InputStream in = loader.getResourceAsStream(name)) {
            if (in == null) {
                throw new java.io.FileNotFoundException("Classpath resource not found: " + name);
            }
            target.load(in);
        }
    }
}
// Patch metadata that followed this class on the same physical line, kept verbatim:
// diff --git a/src/main/java/cn/ac/iie/utils/system/TupleUtils.java b/src/main/java/cn/ac/iie/utils/system/TupleUtils.java new file mode 100644 index 0000000..22e12e5 --- /dev/null +++
b/src/main/java/cn/ac/iie/utils/system/TupleUtils.java @@ -0,0 +1,13 @@ +package cn.ac.iie.utils.system; + +import org.apache.storm.Constants; +import org.apache.storm.tuple.Tuple; + +public final class TupleUtils { + //判断是否系统自动发送的Tuple + public static boolean isTick(Tuple tuple) { + return tuple != null + && Constants.SYSTEM_COMPONENT_ID.equals(tuple.getSourceComponent()) + && Constants.SYSTEM_TICK_STREAM_ID.equals(tuple.getSourceStreamId()); + } +} diff --git a/src/main/java/cn/ac/iie/utils/unique/UniqueCollection.java b/src/main/java/cn/ac/iie/utils/unique/UniqueCollection.java new file mode 100644 index 0000000..5c438d9 --- /dev/null +++ b/src/main/java/cn/ac/iie/utils/unique/UniqueCollection.java @@ -0,0 +1,103 @@ +package cn.ac.iie.utils.unique; + +import org.apache.log4j.Logger; + +import java.util.*; + +/** + * @author qidaijie + */ +public class UniqueCollection { + private static final Logger logger = Logger.getLogger(UniqueCollection.class); + + private static Map<String, Set<String>> urlCliUnique = new HashMap<>(16); + + private static Map<String, Set<String>> urlSerUnique = new HashMap<>(16); + + + /** + * 客户端独立IP + * + * @param key url + * @param value client_ip + */ + public static void setUrlCliUnique(String key, String value) { + if (urlCliUnique.containsKey(key)) { + Set<String> set = new HashSet<>(urlCliUnique.get(key)); + set.add(value); + urlCliUnique.put(key, set); + } else { + urlCliUnique.put(key, Collections.singleton(value)); + } + } + + /** + * 服务端独立IP + * + * @param key url + * @param value server_ip + */ + public static void setUrlSerUnique(String key, String value) { + if (urlSerUnique.containsKey(key)) { + Set<String> set = new HashSet<>(urlSerUnique.get(key)); + set.add(value); + urlSerUnique.put(key, set); + } else { + urlSerUnique.put(key, Collections.singleton(value)); + } + } + + + /** + * 根据key获取Cli ip 数 + * + * @param key + * @return + */ + public static long getUrlCliUnique(String key) { + int size = 0; + try { + size = 
urlCliUnique.get(key).size(); + } catch (Exception e) { + logger.error("容器内没有key,获取value错误"); + } + return size; + } + + /** + * 根据key获取Ser ip 数 + * + * @param key + * @return + */ + public static long getUrlSerUnique(String key) { + int size = 0; + try { + size = urlSerUnique.get(key).size(); + } catch (Exception e) { + logger.error("容器内没有key,获取value错误"); + } + return size; + } + + + /** + * 清理容器 + */ + public static void clearWebContainer() { + urlCliUnique.clear(); + urlSerUnique.clear(); + } + + + /** + * 获取当前时间的时间戳(秒) + * + * @return 时间戳 秒 + */ + public static long getTimesTamp() { + Date date = new Date(); + String timestamp = String.valueOf(date.getTime() / 1000); + return Integer.valueOf(timestamp); + } +} diff --git a/src/main/java/cn/ac/iie/utils/unique/bytecount/BytesSlidingWindowCounter.java b/src/main/java/cn/ac/iie/utils/unique/bytecount/BytesSlidingWindowCounter.java new file mode 100644 index 0000000..781ae8f --- /dev/null +++ b/src/main/java/cn/ac/iie/utils/unique/bytecount/BytesSlidingWindowCounter.java @@ -0,0 +1,60 @@ +package cn.ac.iie.utils.unique.bytecount; + +import cn.ac.iie.utils.SlotBasedCounter; + +import java.io.Serializable; +import java.util.Map; + +/** + * @author qidaijie + */ +public final class BytesSlidingWindowCounter<T> implements Serializable { + + private static final long serialVersionUID = 7540606699600042717L; + private BytesSlotBasedCounter<T> objCounter; + private int headSlot; + private int tailSlot; + private int windowLengthInSlots; + + public BytesSlidingWindowCounter(int windowLengthInSlots) { + if (windowLengthInSlots < 2) { + throw new IllegalArgumentException("Window length in slots must be at least two (you requested " + + windowLengthInSlots + ")"); + } + this.windowLengthInSlots = windowLengthInSlots; + this.objCounter = new BytesSlotBasedCounter<T>(this.windowLengthInSlots); + + this.headSlot = 0; + this.tailSlot = slotAfter(headSlot); + } + + public void incrementCount(T obj, long count, long cpn, long spn, 
long cbn, long sbn, long session) { + objCounter.incrementCount(obj, count, cpn, spn, cbn, sbn, session, headSlot); + } + + /** + * Return the current (total) counts of all tracked objects, then advance the window. + * <p> + * Whenever this method is called, we consider the counts of the current sliding window to be available to and + * successfully processed "upstream" (i.e. by the caller). Knowing this we will start counting any subsequent + * objects within the next "chunk" of the sliding window. + * + * @return + */ + public Map<T, Long[]> getCountsThenAdvanceWindow() { + Map<T, Long[]> objCountMap = objCounter.getCounts(); + objCounter.wipeZeros(); + objCounter.wipeSlot(tailSlot); + advanceHead(); + return objCountMap; + } + + private void advanceHead() { + headSlot = tailSlot; + tailSlot = slotAfter(tailSlot); + } + + private int slotAfter(int slot) { + return (slot + 1) % windowLengthInSlots; + } +} diff --git a/src/main/java/cn/ac/iie/utils/unique/bytecount/BytesSlotBasedCounter.java b/src/main/java/cn/ac/iie/utils/unique/bytecount/BytesSlotBasedCounter.java new file mode 100644 index 0000000..6a15549 --- /dev/null +++ b/src/main/java/cn/ac/iie/utils/unique/bytecount/BytesSlotBasedCounter.java @@ -0,0 +1,178 @@ +package cn.ac.iie.utils.unique.bytecount; + +import com.google.common.collect.Range; +import com.google.common.collect.RangeMap; +import com.google.common.collect.TreeRangeMap; + +import java.io.Serializable; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Map; +import java.util.Set; + +/** + * @author ZDJZ + */ +public class BytesSlotBasedCounter<T> implements Serializable { + private static final long serialVersionUID = 4935449009558219987L; + + private final Map<T, long[]> objToCounts = new HashMap<T, long[]>(); + private final Map<T, long[]> objToCpn = new HashMap<T, long[]>(); + private final Map<T, long[]> objToSpn = new HashMap<T, long[]>(); + private final Map<T, long[]> objToCbn = new HashMap<T, long[]>(); + private 
final Map<T, long[]> objToSbn = new HashMap<T, long[]>(); + private final Map<T, long[]> objToSession = new HashMap<T, long[]>(); + + private final int numSlots; + + public BytesSlotBasedCounter(int numSlots) { + if (numSlots <= 0) { + throw new IllegalArgumentException(" Number of slots must be greater than zero (you requested " + numSlots + ")"); + } + this.numSlots = numSlots; + } + + /** + * @param obj + * @param slot + */ + public void incrementCount(T obj, long count, long cpn, long spn, long cbn, long sbn, long session, int slot) { + long[] counts = objToCounts.get(obj); + if (counts == null) { + counts = new long[this.numSlots]; + objToCounts.put(obj, counts); + } + counts[slot] += count; + + long[] cpns = objToCpn.get(obj); + if (cpns == null) { + cpns = new long[this.numSlots]; + objToCpn.put(obj, cpns); + } + cpns[slot] += cpn; + + long[] spns = objToSpn.get(obj); + if (spns == null) { + spns = new long[this.numSlots]; + objToSpn.put(obj, spns); + } + spns[slot] += spn; + + long[] cbns = objToCbn.get(obj); + if (cbns == null) { + cbns = new long[this.numSlots]; + objToCbn.put(obj, cbns); + } + cbns[slot] += cbn; + + long[] sbns = objToSbn.get(obj); + if (sbns == null) { + sbns = new long[this.numSlots]; + objToSbn.put(obj, sbns); + } + sbns[slot] += sbn; + + long[] sessions = objToSession.get(obj); + if (sessions == null) { + sessions = new long[this.numSlots]; + objToSession.put(obj, sessions); + } + sessions[slot] += session; + } + + public long getCount(T obj, int slot) { + long[] counts = objToCounts.get(obj); + if (counts == null) { + return 0; + } else { + return counts[slot]; + } + + } + + public Map<T, Long[]> getCounts() { + Map<T, Long[]> result = new HashMap<T, Long[]>(); + for (Map.Entry<T, long[]> entry : objToCounts.entrySet()) { + Long[] numbers = new Long[6]; + numbers[0] = computerTotalCount(entry.getKey(), objToCounts); + numbers[1] = computerTotalCount(entry.getKey(), objToCpn); + numbers[2] = computerTotalCount(entry.getKey(), 
objToSpn); + numbers[3] = computerTotalCount(entry.getKey(), objToCbn); + numbers[4] = computerTotalCount(entry.getKey(), objToSbn); + numbers[5] = computerTotalCount(entry.getKey(), objToSession); + result.put(entry.getKey(), numbers); + } + return result; + } + + + private long computerTotalCount(T obj, Map<T, long[]> objTo) { + long[] counts = objTo.get(obj); + long total = 0; + for (long count : counts) { + total += count; + } + return total; + } + + + /** + * Reset the slot count of any tracked objects to zero for given slot + * + * @param slot + */ + public void wipeSlot(int slot) { + for (T obj : objToCounts.keySet()) { + resetSlotCountToZero(obj, slot); + } + + } + + private void resetSlotCountToZero(T obj, int slot) { + long[] counts = objToCounts.get(obj); + long[] cpns = objToCpn.get(obj); + long[] spns = objToSpn.get(obj); + long[] cbls = objToCbn.get(obj); + long[] sbls = objToSbn.get(obj); + long[] sessions = objToSession.get(obj); + + counts[slot] = 0; + cpns[slot] = 0; + spns[slot] = 0; + cbls[slot] = 0; + sbls[slot] = 0; + sessions[slot] = 0; + + } + + + private boolean shouldBeRemovedfromCounter(T obj) { + return computerTotalCount(obj, objToCounts) == 0; + } + + /** + * Remove any object from the counter whose total count is zero (to free up memory). 
+ */ + public void wipeZeros() { + Set<T> objToBeRemoved = new HashSet<T>(); + + for (T obj : objToCounts.keySet()) { + if (shouldBeRemovedfromCounter(obj)) { + objToBeRemoved.add(obj); + } + } + + for (T obj : objToBeRemoved) { + objToCounts.remove(obj); + objToCpn.remove(obj); + objToSpn.remove(obj); + objToCbn.remove(obj); + objToSbn.remove(obj); + objToSession.remove(obj); + + } + + + } + +} diff --git a/src/main/java/cn/ac/iie/utils/unique/urldimension/UrlSlidingWindowCounter.java b/src/main/java/cn/ac/iie/utils/unique/urldimension/UrlSlidingWindowCounter.java new file mode 100644 index 0000000..24e72d8 --- /dev/null +++ b/src/main/java/cn/ac/iie/utils/unique/urldimension/UrlSlidingWindowCounter.java @@ -0,0 +1,58 @@ +package cn.ac.iie.utils.unique.urldimension; + +import java.io.Serializable; +import java.util.Map; + +/** + * @author qidaijie + */ +public final class UrlSlidingWindowCounter<T> implements Serializable { + + private static final long serialVersionUID = 7540606699600042717L; + private UrlSlotBasedCounter<T> objCounter; + private int headSlot; + private int tailSlot; + private int windowLengthInSlots; + + public UrlSlidingWindowCounter(int windowLengthInSlots) { + if (windowLengthInSlots < 2) { + throw new IllegalArgumentException("Window length in slots must be at least two (you requested " + + windowLengthInSlots + ")"); + } + this.windowLengthInSlots = windowLengthInSlots; + this.objCounter = new UrlSlotBasedCounter<T>(this.windowLengthInSlots); + + this.headSlot = 0; + this.tailSlot = slotAfter(headSlot); + } + + public void incrementCount(T obj, long count,long cpn ,long spn,long cbn,long sbn,long cli,long ser) { + objCounter.incrementCount(obj,count,cpn,spn,cbn,sbn,cli,ser,headSlot); + } + + /** + * Return the current (total) counts of all tracked objects, then advance the window. 
+ * <p> + * Whenever this method is called, we consider the counts of the current sliding window to be available to and + * successfully processed "upstream" (i.e. by the caller). Knowing this we will start counting any subsequent + * objects within the next "chunk" of the sliding window. + * + * @return + */ + public Map<T, Long[]> getCountsThenAdvanceWindow() { + Map<T,Long[]> objCountMap = objCounter.getCounts(); + objCounter.wipeZeros(); + objCounter.wipeSlot(tailSlot); + advanceHead(); + return objCountMap; + } + + private void advanceHead() { + headSlot = tailSlot; + tailSlot = slotAfter(tailSlot); + } + + private int slotAfter(int slot) { + return (slot + 1) % windowLengthInSlots; + } +} diff --git a/src/main/java/cn/ac/iie/utils/unique/urldimension/UrlSlotBasedCounter.java b/src/main/java/cn/ac/iie/utils/unique/urldimension/UrlSlotBasedCounter.java new file mode 100644 index 0000000..50b5ddb --- /dev/null +++ b/src/main/java/cn/ac/iie/utils/unique/urldimension/UrlSlotBasedCounter.java @@ -0,0 +1,199 @@ +package cn.ac.iie.utils.unique.urldimension; + +import com.google.common.collect.Range; +import com.google.common.collect.RangeMap; +import com.google.common.collect.TreeRangeMap; + +import java.io.Serializable; +import java.util.*; + +/** + * @author ZDJZ + */ +public class UrlSlotBasedCounter<T> implements Serializable { + private static final long serialVersionUID = 4935449009558219987L; + + private final Map<T, long[]> objToCounts = new HashMap<>(); + private final Map<T, long[]> objToCpn = new HashMap<>(); + private final Map<T, long[]> objToSpn = new HashMap<>(); + private final Map<T, long[]> objToCbn = new HashMap<>(); + private final Map<T, long[]> objToSbn = new HashMap<>(); + private final Map<T, long[]> objToCli = new HashMap<>(); + private final Map<T, long[]> objToSer = new HashMap<>(); + + private final int numSlots; + + public UrlSlotBasedCounter(int numSlots) { + if (numSlots <= 0) { + throw new IllegalArgumentException(" Number of slots 
must be greater than zero (you requested " + numSlots + ")"); + } + this.numSlots = numSlots; + } + + /** + * @param obj + * @param slot + */ + public void incrementCount(T obj, long count, long cpn, long spn, long cbn, long sbn, long cli, long ser, int slot) { + long[] counts = objToCounts.get(obj); + if (counts == null) { + counts = new long[this.numSlots]; + objToCounts.put(obj, counts); + } + counts[slot] += count; + + long[] cpns = objToCpn.get(obj); + if (cpns == null) { + cpns = new long[this.numSlots]; + objToCpn.put(obj, cpns); + } + cpns[slot] += cpn; + + long[] spns = objToSpn.get(obj); + if (spns == null) { + spns = new long[this.numSlots]; + objToSpn.put(obj, spns); + } + spns[slot] += spn; + + long[] cbns = objToCbn.get(obj); + if (cbns == null) { + cbns = new long[this.numSlots]; + objToCbn.put(obj, cbns); + } + cbns[slot] += cbn; + + long[] sbns = objToSbn.get(obj); + if (sbns == null) { + sbns = new long[this.numSlots]; + objToSbn.put(obj, sbns); + } + sbns[slot] += sbn; + + long[] clis = objToCli.get(obj); + if (clis == null) { + clis = new long[this.numSlots]; + objToCli.put(obj, clis); + } + clis[slot] = cli; + + + long[] sers = objToSer.get(obj); + if (sers == null) { + sers = new long[this.numSlots]; + objToSer.put(obj, sers); + } + sers[slot] = ser; + } + + + public long getCount(T obj, int slot) { + long[] counts = objToCounts.get(obj); + if (counts == null) { + return 0; + } else { + return counts[slot]; + } + + } + + public Map<T, Long[]> getCounts() { + Map<T, Long[]> result = new HashMap<T, Long[]>(); + for (Map.Entry<T, long[]> entry : objToCounts.entrySet()) { + Long[] numbers = new Long[7]; + numbers[0] = computerTotalCount(entry.getKey(), objToCounts); + numbers[1] = computerTotalCount(entry.getKey(), objToCpn); + numbers[2] = computerTotalCount(entry.getKey(), objToSpn); + numbers[3] = computerTotalCount(entry.getKey(), objToCbn); + numbers[4] = computerTotalCount(entry.getKey(), objToSbn); + numbers[5] = 
uniqueTotalCount(entry.getKey(), objToCli); + numbers[6] = uniqueTotalCount(entry.getKey(), objToSer); + result.put(entry.getKey(), numbers); + } + return result; + } + + private long computerTotalCount(T obj, Map<T, long[]> objTo) { + long[] counts = objTo.get(obj); + long total = 0; + for (long count : counts) { + total += count; + } + System.out.println(Arrays.toString(counts)); + return total; + } + + private long uniqueTotalCount(T obj, Map<T, long[]> objTo) { + long[] counts = objTo.get(obj); + long total = 0; + for (long count : counts) { + if (count != 0 && count > total) { + total = count; + } + } + return total; + } + + /** + * Reset the slot count of any tracked objects to zero for given slot + * + * @param slot + */ + public void wipeSlot(int slot) { + for (T obj : objToCounts.keySet()) { + resetSlotCountToZero(obj, slot); + } + + } + + private void resetSlotCountToZero(T obj, int slot) { + long[] counts = objToCounts.get(obj); + long[] cpns = objToCpn.get(obj); + long[] spns = objToSpn.get(obj); + long[] cbls = objToCbn.get(obj); + long[] sbls = objToSbn.get(obj); + long[] clis = objToCli.get(obj); + long[] sers = objToSer.get(obj); + + counts[slot] = 0; + cpns[slot] = 0; + spns[slot] = 0; + cbls[slot] = 0; + sbls[slot] = 0; + clis[slot] = 0; + sers[slot] = 0; + + } + + + private boolean shouldBeRemovedfromCounter(T obj) { + return computerTotalCount(obj, objToCounts) == 0; + } + + /** + * Remove any object from the counter whose total count is zero (to free up memory). 
+ */ + public void wipeZeros() { + Set<T> objToBeRemoved = new HashSet<T>(); + + for (T obj : objToCounts.keySet()) { + if (shouldBeRemovedfromCounter(obj)) { + objToBeRemoved.add(obj); + } + } + + for (T obj : objToBeRemoved) { + objToCounts.remove(obj); + objToCpn.remove(obj); + objToSpn.remove(obj); + objToCbn.remove(obj); + objToSbn.remove(obj); + objToCli.remove(obj); + objToSer.remove(obj); + + } + + + } + +} diff --git a/src/main/java/cn/ac/iie/utils/unique/webdimension/WebSlidingWindowCounter.java b/src/main/java/cn/ac/iie/utils/unique/webdimension/WebSlidingWindowCounter.java new file mode 100644 index 0000000..c6067a2 --- /dev/null +++ b/src/main/java/cn/ac/iie/utils/unique/webdimension/WebSlidingWindowCounter.java @@ -0,0 +1,60 @@ +package cn.ac.iie.utils.unique.webdimension; + +import cn.ac.iie.utils.unique.urldimension.UrlSlotBasedCounter; + +import java.io.Serializable; +import java.util.Map; + +/** + * @author qidaijie + */ +public final class WebSlidingWindowCounter<T> implements Serializable { + + private static final long serialVersionUID = 7540606699600042717L; + private WebSlotBasedCounter<T> objCounter; + private int headSlot; + private int tailSlot; + private int windowLengthInSlots; + + public WebSlidingWindowCounter(int windowLengthInSlots) { + if (windowLengthInSlots < 2) { + throw new IllegalArgumentException("Window length in slots must be at least two (you requested " + + windowLengthInSlots + ")"); + } + this.windowLengthInSlots = windowLengthInSlots; + this.objCounter = new WebSlotBasedCounter<T>(this.windowLengthInSlots); + + this.headSlot = 0; + this.tailSlot = slotAfter(headSlot); + } + + public void incrementCount(T obj, long count,long cpn ,long spn,long cbn,long sbn,long cli,long ser,long session) { + objCounter.incrementCount(obj,count,cpn,spn,cbn,sbn,cli,ser,session,headSlot); + } + + /** + * Return the current (total) counts of all tracked objects, then advance the window. 
+ * <p> + * Whenever this method is called, we consider the counts of the current sliding window to be available to and + * successfully processed "upstream" (i.e. by the caller). Knowing this we will start counting any subsequent + * objects within the next "chunk" of the sliding window. + * + * @return + */ + public Map<T, Long[]> getCountsThenAdvanceWindow() { + Map<T,Long[]> objCountMap = objCounter.getCounts(); + objCounter.wipeZeros(); + objCounter.wipeSlot(tailSlot); + advanceHead(); + return objCountMap; + } + + private void advanceHead() { + headSlot = tailSlot; + tailSlot = slotAfter(tailSlot); + } + + private int slotAfter(int slot) { + return (slot + 1) % windowLengthInSlots; + } +} diff --git a/src/main/java/cn/ac/iie/utils/unique/webdimension/WebSlotBasedCounter.java b/src/main/java/cn/ac/iie/utils/unique/webdimension/WebSlotBasedCounter.java new file mode 100644 index 0000000..cced0b9 --- /dev/null +++ b/src/main/java/cn/ac/iie/utils/unique/webdimension/WebSlotBasedCounter.java @@ -0,0 +1,209 @@ +package cn.ac.iie.utils.unique.webdimension; + +import com.google.common.collect.Range; +import com.google.common.collect.RangeMap; +import com.google.common.collect.TreeRangeMap; + +import java.io.Serializable; +import java.util.*; + +/** + * @author ZDJZ + */ +public class WebSlotBasedCounter<T> implements Serializable { + private static final long serialVersionUID = 4935449009558219987L; + + private final Map<T, long[]> objToCounts = new HashMap<>(); + private final Map<T, long[]> objToCpn = new HashMap<>(); + private final Map<T, long[]> objToSpn = new HashMap<>(); + private final Map<T, long[]> objToCbn = new HashMap<>(); + private final Map<T, long[]> objToSbn = new HashMap<>(); + private final Map<T, long[]> objToCli = new HashMap<>(); + private final Map<T, long[]> objToSer = new HashMap<>(); + private final Map<T, long[]> objToSession = new HashMap<>(); + + private final int numSlots; + + public WebSlotBasedCounter(int numSlots) { + if (numSlots <= 
0) { + throw new IllegalArgumentException(" Number of slots must be greater than zero (you requested " + numSlots + ")"); + } + this.numSlots = numSlots; + } + + /** + * @param obj + * @param slot + */ + public void incrementCount(T obj, long count, long cpn, long spn, long cbn, long sbn, long cli, long ser, long session, int slot) { + long[] counts = objToCounts.get(obj); + if (counts == null) { + counts = new long[this.numSlots]; + objToCounts.put(obj, counts); + } + counts[slot] += count; + + long[] cpns = objToCpn.get(obj); + if (cpns == null) { + cpns = new long[this.numSlots]; + objToCpn.put(obj, cpns); + } + cpns[slot] += cpn; + + long[] spns = objToSpn.get(obj); + if (spns == null) { + spns = new long[this.numSlots]; + objToSpn.put(obj, spns); + } + spns[slot] += spn; + + long[] cbns = objToCbn.get(obj); + if (cbns == null) { + cbns = new long[this.numSlots]; + objToCbn.put(obj, cbns); + } + cbns[slot] += cbn; + + long[] sbns = objToSbn.get(obj); + if (sbns == null) { + sbns = new long[this.numSlots]; + objToSbn.put(obj, sbns); + } + sbns[slot] += sbn; + + long[] clis = objToCli.get(obj); + if (clis == null) { + clis = new long[this.numSlots]; + objToCli.put(obj, clis); + } + clis[slot] = cli; + + + long[] sers = objToSer.get(obj); + if (sers == null) { + sers = new long[this.numSlots]; + objToSer.put(obj, sers); + } + sers[slot] = ser; + + long[] sessions = objToSession.get(obj); + if (sessions == null) { + sessions = new long[this.numSlots]; + objToSession.put(obj, sessions); + } + sessions[slot] += session; + } + + public long getCount(T obj, int slot) { + long[] counts = objToCounts.get(obj); + if (counts == null) { + return 0; + } else { + return counts[slot]; + } + + } + + public Map<T, Long[]> getCounts() { + Map<T, Long[]> result = new HashMap<T, Long[]>(); + for (Map.Entry<T, long[]> entry : objToCounts.entrySet()) { + Long[] numbers = new Long[8]; + numbers[0] = computerTotalCount(entry.getKey(), objToCounts); + numbers[1] = 
computerTotalCount(entry.getKey(), objToCpn); + numbers[2] = computerTotalCount(entry.getKey(), objToSpn); + numbers[3] = computerTotalCount(entry.getKey(), objToCbn); + numbers[4] = computerTotalCount(entry.getKey(), objToSbn); + numbers[5] = uniqueTotalCount(entry.getKey(), objToCli); + numbers[6] = uniqueTotalCount(entry.getKey(), objToSer); + numbers[7] = computerTotalCount(entry.getKey(), objToSession); + result.put(entry.getKey(), numbers); + } + return result; + } + + private long computerTotalCount(T obj, Map<T, long[]> objTo) { + long[] counts = objTo.get(obj); + long total = 0; + for (long count : counts) { + total += count; + } + return total; + } + + private long uniqueTotalCount(T obj, Map<T, long[]> objTo) { + long[] counts = objTo.get(obj); + long total = 0; + for (long count : counts) { + if (count != 0 && count > total) { + total = count; + } + } + return total; + } + + /** + * Reset the slot count of any tracked objects to zero for given slot + * + * @param slot + */ + public void wipeSlot(int slot) { + for (T obj : objToCounts.keySet()) { + resetSlotCountToZero(obj, slot); + } + + } + + private void resetSlotCountToZero(T obj, int slot) { + long[] counts = objToCounts.get(obj); + long[] cpns = objToCpn.get(obj); + long[] spns = objToSpn.get(obj); + long[] cbls = objToCbn.get(obj); + long[] sbls = objToSbn.get(obj); + long[] clis = objToCli.get(obj); + long[] sers = objToSer.get(obj); + long[] sessions = objToSession.get(obj); + + counts[slot] = 0; + cpns[slot] = 0; + spns[slot] = 0; + cbls[slot] = 0; + sbls[slot] = 0; + clis[slot] = 0; + sers[slot] = 0; + sessions[slot] = 0; + + } + + + private boolean shouldBeRemovedfromCounter(T obj) { + return computerTotalCount(obj, objToCounts) == 0; + } + + /** + * Remove any object from the counter whose total count is zero (to free up memory). 
+ */ + public void wipeZeros() { + Set<T> objToBeRemoved = new HashSet<T>(); + + for (T obj : objToCounts.keySet()) { + if (shouldBeRemovedfromCounter(obj)) { + objToBeRemoved.add(obj); + } + } + + for (T obj : objToBeRemoved) { + objToCounts.remove(obj); + objToCpn.remove(obj); + objToSpn.remove(obj); + objToCbn.remove(obj); + objToSbn.remove(obj); + objToCli.remove(obj); + objToSer.remove(obj); + objToSession.remove(obj); + + } + + + } + +} diff --git a/src/main/java/log4j.properties b/src/main/java/log4j.properties new file mode 100644 index 0000000..c0f5ea4 --- /dev/null +++ b/src/main/java/log4j.properties @@ -0,0 +1,23 @@ +#Log4j +log4j.rootLogger=info,console,file +# 控制台日志设置 +log4j.appender.console=org.apache.log4j.ConsoleAppender +log4j.appender.console.Threshold=info +log4j.appender.console.layout=org.apache.log4j.PatternLayout +log4j.appender.console.layout.ConversionPattern=[%d{yyyy-MM-dd HH\:mm\:ss}] [%-5p] [Thread\:%t] %l %x - <%m>%n + +# 文件日志设置 +log4j.appender.file=org.apache.log4j.DailyRollingFileAppender +log4j.appender.file.Threshold=info +log4j.appender.file.encoding=UTF-8 +log4j.appender.file.Append=true +#路径请用相对路径,做好相关测试输出到应用目下 +log4j.appender.file.file=${nis.root}/log/galaxy-name.log +log4j.appender.file.DatePattern='.'yyyy-MM-dd +log4j.appender.file.layout=org.apache.log4j.PatternLayout +#log4j.appender.file.layout.ConversionPattern=%d{HH:mm:ss} %X{ip} [%t] %5p %c{1} %m%n +log4j.appender.file.layout.ConversionPattern=[%d{yyyy-MM-dd HH\:mm\:ss}] [%-5p] %X{ip} [Thread\:%t] %l %x - %m%n +#MyBatis 配置,com.nis.web.dao是mybatis接口所在包 +log4j.logger.com.nis.web.dao=debug +#bonecp数据源配置 +log4j.category.com.jolbox=debug,console
\ No newline at end of file diff --git a/src/test/java/cn/ac/iie/test/ContainerTest.java b/src/test/java/cn/ac/iie/test/ContainerTest.java new file mode 100644 index 0000000..156321e --- /dev/null +++ b/src/test/java/cn/ac/iie/test/ContainerTest.java @@ -0,0 +1,16 @@ +package cn.ac.iie.test; + +import com.alibaba.fastjson.JSON; +import com.alibaba.fastjson.JSONObject; +import com.zdjizhi.utils.StringUtil; + +import java.util.HashSet; +import java.util.Set; + +/** Scratch program that prints Long.MAX_VALUE and Long.MIN_VALUE. */ public class ContainerTest { + public static void main(String[] args) { + System.out.println(Long.MAX_VALUE); + System.out.println(Long.MIN_VALUE); + + } +} diff --git a/src/test/java/cn/ac/iie/test/IpTest.java b/src/test/java/cn/ac/iie/test/IpTest.java new file mode 100644 index 0000000..c60679d --- /dev/null +++ b/src/test/java/cn/ac/iie/test/IpTest.java @@ -0,0 +1,17 @@ +package cn.ac.iie.test; + +import cn.ac.iie.common.TopNCountConfig; +import com.zdjizhi.utils.IpLookup; + +/** Scratch program that loads Kazakhstan.mmdb as both the V4 and the V6 data file of an IpLookup and prints cityLookupDetail for one IPv6 address. */ public class IpTest { + + public static void main(String[] args) { + IpLookup ipLookup = new IpLookup.Builder(false) + .loadDataFileV4("Kazakhstan.mmdb") + .loadDataFileV6("Kazakhstan.mmdb") + .build(); + + System.out.println(ipLookup.cityLookupDetail("2001:470:19:790::7f")); +// System.out.println(ipLookup.asnLookup("2001:470:19:790::7f")); + } +} diff --git a/src/test/java/cn/ac/iie/test/TimeTest.java b/src/test/java/cn/ac/iie/test/TimeTest.java new file mode 100644 index 0000000..d7e634e --- /dev/null +++ b/src/test/java/cn/ac/iie/test/TimeTest.java @@ -0,0 +1,42 @@ +package cn.ac.iie.test; + +import scala.Int; + +import java.text.ParseException; +import java.text.SimpleDateFormat; +import java.util.Date; +import java.util.HashMap; +import java.util.Map; +import java.util.Set; + +/** Scratch program that prints how many seconds, and how many 30-second intervals, have passed since 2019-05-20 09:33:00. */ public class TimeTest { + /** Prints a map holding keys 1..10 all set to 0, then the elapsed seconds, then the elapsed seconds divided by 30. */ public static void main(String[] args) { + long timestamp = System.currentTimeMillis() / 1000; + + Map<Integer, Long> startMap = new HashMap<>(); + Map<Integer, Long> endMap = new HashMap<>(); + for (int j = 1; j <= 10; j++) { + 
 endMap.put(j, 0L); + startMap.put(j, 0L); + } + System.out.println(endMap); + long nowTime = timestamp - dateToStamp("2019-05-20 09:33:00") ; + + System.out.println(nowTime); + /* Divide by 30.0: nowTime / 30 is long division and would discard the fraction before the value is widened to double. */ double l = nowTime / 30.0; + System.out.println(l); + } + + /** Parses a "yyyy-MM-dd HH:mm:ss" string (read in the JVM default time zone) into epoch seconds; prints the stack trace and returns 0 when the text cannot be parsed. */ private static long dateToStamp(String time) { + long res = 0; + SimpleDateFormat simpleDateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss"); + try { + Date date = simpleDateFormat.parse(time); + long ts = date.getTime() / 1000; + res = ts; + } catch (ParseException e) { + e.printStackTrace(); + } + return res; + } +} |
