summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorqidaijie <[email protected]>2019-12-26 17:06:39 +0800
committerqidaijie <[email protected]>2019-12-26 17:06:39 +0800
commit4b522a1806632b359eee5169d4b7858cae3825b3 (patch)
tree2638a45e5458c4d31dce02e195cdc84d8f2c4103
parent7c6446501011ad68ddc054ff88da4daadb2c854b (diff)
增加url session 统计tsg_galaxy_top_v3.0
-rw-r--r--properties/kafka_topic.properties14
-rw-r--r--src/main/java/cn/ac/iie/bolt/dimension/OnlySessionBolt.java129
-rw-r--r--src/main/java/cn/ac/iie/bolt/resultoutput/UrlOutPutBolt.java100
-rw-r--r--src/main/java/cn/ac/iie/bolt/segmentation/UrlBolt.java50
-rw-r--r--src/main/java/cn/ac/iie/model/Url.java36
-rw-r--r--src/main/java/cn/ac/iie/spout/UrlProxyKafkaSpout.java79
-rw-r--r--src/main/java/cn/ac/iie/spout/UrlSecurityKafkaSpout.java79
-rw-r--r--src/main/java/cn/ac/iie/topology/LogTopCountTopology.java22
-rw-r--r--src/main/java/cn/ac/iie/utils/onlysessioncount/OnlySlidingWindowCounter.java58
-rw-r--r--src/main/java/cn/ac/iie/utils/onlysessioncount/OnlySlotBasedCounter.java104
10 files changed, 663 insertions, 8 deletions
diff --git a/properties/kafka_topic.properties b/properties/kafka_topic.properties
index 9d91116..9f6ec16 100644
--- a/properties/kafka_topic.properties
+++ b/properties/kafka_topic.properties
@@ -1,21 +1,21 @@
#管理kafka地址
-bootstrap.servers=192.168.40.123:9092
+bootstrap.servers=192.168.40.224:9092
#kafka broker下的topic名称
kafka.topic=CONNECTION-RECORD-LOG
#kafka消费group id
-group.id=internal-top-program
+group.id=url-top-program
#从kafka哪里开始读:earliest/latest
auto.offset.reset=latest
#auto.offset.reset=earliest
#输出Kafka地址
-results.output.servers=192.168.40.123:9092
+results.output.servers=192.168.40.224:9092
#输出Topic
-results.output.topics=TOP-EXTERNAL-HOST-LOG
+results.output.topics=TOP-URLS-LOG
#topology pending
topology.config.max.spout.pending=150000
@@ -27,11 +27,11 @@ topology.num.acks=1
max.failure.num=20
#定位库地址
-ip.library=/home/ceiec/topology/dat/
+ip.library=/dat/
#1:Internal 内部主机 2:External 外部主机
-#3:Website 域名 4:User 活跃用户
-pattern.num=2
+#3:Website 域名 4:User 活跃用户 5:Url
+pattern.num=5
# bytes,packets, sessions
dimension.type=bytes
diff --git a/src/main/java/cn/ac/iie/bolt/dimension/OnlySessionBolt.java b/src/main/java/cn/ac/iie/bolt/dimension/OnlySessionBolt.java
new file mode 100644
index 0000000..e9a1a1d
--- /dev/null
+++ b/src/main/java/cn/ac/iie/bolt/dimension/OnlySessionBolt.java
@@ -0,0 +1,129 @@
+package cn.ac.iie.bolt.dimension;
+
+
+import cn.ac.iie.utils.NthLastModifiedTimeTracker;
+import cn.ac.iie.utils.onlysessioncount.OnlySlidingWindowCounter;
+import cn.ac.iie.utils.system.TupleUtils;
+import org.apache.log4j.Logger;
+import org.apache.storm.Config;
+import org.apache.storm.task.OutputCollector;
+import org.apache.storm.task.TopologyContext;
+import org.apache.storm.topology.OutputFieldsDeclarer;
+import org.apache.storm.topology.base.BaseRichBolt;
+import org.apache.storm.tuple.Fields;
+import org.apache.storm.tuple.Tuple;
+import org.apache.storm.tuple.Values;
+
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * @author qidaijie
+ */
+public class OnlySessionBolt extends BaseRichBolt {
+ private static final long serialVersionUID = 3110758302345522419L;
+ private static final Logger logger = Logger.getLogger(OnlySessionBolt.class);
+ private static final int NUM_WINDOW_CHUNKS = 5;
+ /**
+ * 默认统计5分钟内的排名数据
+ */
+ private static final int DEFAULT_SILDING_WINDOW_IN_SECONDS = NUM_WINDOW_CHUNKS * 60;
+ /**
+ * 默认自动提交频率为1分钟
+ */
+ private static final int DEFAULT_EMIT_FREQUENCY_IN_SECONDS = DEFAULT_SILDING_WINDOW_IN_SECONDS / NUM_WINDOW_CHUNKS;
+
+ private static final String WINDOW_LENGTH_WARNING_TEMPLATE =
+ "Actual window length is %d seconds when it should be %d seconds"
+ + " (you can safely ignore this warning during the startup phase)";
+
+ private final OnlySlidingWindowCounter<Object> counter;
+
+ private OutputCollector outputCollector;
+ private final int windowLengthInSeconds;
+ private final int emitFrequencyInSeconds;
+ private NthLastModifiedTimeTracker lastModifiedTracker;
+
+ public OnlySessionBolt() {
+ this(DEFAULT_SILDING_WINDOW_IN_SECONDS, DEFAULT_EMIT_FREQUENCY_IN_SECONDS);
+ }
+
+ /**
+ * @param windowLengthInSeconds
+ * @param emitFrequencyInSeconds 提交频率,用于缓存,提交下一个bolt或存入数据库;
+ */
+ public OnlySessionBolt(int windowLengthInSeconds, int emitFrequencyInSeconds) {
+ this.windowLengthInSeconds = windowLengthInSeconds;
+ this.emitFrequencyInSeconds = emitFrequencyInSeconds;
+ counter = new OnlySlidingWindowCounter<Object>(deriveNumWindowChunksFrom(this.windowLengthInSeconds, this.emitFrequencyInSeconds));
+ }
+
+ private int deriveNumWindowChunksFrom(int windowLengthInSeconds, int windowUpdateFrequencyInSeconds) {
+ return windowLengthInSeconds / windowUpdateFrequencyInSeconds;
+ }
+
+ @Override
+ public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
+ outputCollector = collector;
+ lastModifiedTracker = new NthLastModifiedTimeTracker(deriveNumWindowChunksFrom(this.windowLengthInSeconds, this.emitFrequencyInSeconds));
+ }
+
+
+ @Override
+ public void execute(Tuple tuple) {
+ if (TupleUtils.isTick(tuple)) {
+ // now you can trigger e.g. a periodic activity
+ logger.debug("Received tick tuple, triggering emit of current window counts");
+ emitCurrentWindowCounts();
+ } else {
+ // do something with the normal tuple
+ countObjAndAck(tuple);
+ }
+ }
+
+ private void emitCurrentWindowCounts() {
+ Map<Object, Long> objFiledMap = counter.getCountsThenAdvanceWindow();
+ int actualWindowLengthInSeconds = lastModifiedTracker.secondsSinceOldestModification();
+ lastModifiedTracker.markAsModified();
+ if (actualWindowLengthInSeconds != windowLengthInSeconds) {
+ logger.warn(String.format(WINDOW_LENGTH_WARNING_TEMPLATE, actualWindowLengthInSeconds, windowLengthInSeconds));
+ }
+ emit(objFiledMap, actualWindowLengthInSeconds);
+
+ }
+
+ private void emit(Map<Object, Long> objCountMap, int actualWindowLengthInSeconds) {
+
+ for (Map.Entry<Object, Long> entry : objCountMap.entrySet()) {
+ Object obj = entry.getKey();
+ Long count = entry.getValue();
+ outputCollector.emit(new Values(obj, count, actualWindowLengthInSeconds));
+ }
+ }
+
+ private void countObjAndAck(Tuple tuple) {
+ try {
+ Object obj = tuple.getValue(0);
+ long count = 1;
+ counter.incrementCount(obj, count);
+ outputCollector.ack(tuple);
+ } catch (Exception e) {
+ logger.error(tuple.toString() + e.toString());
+ e.printStackTrace();
+ }
+ }
+
+ @Override
+ public void declareOutputFields(OutputFieldsDeclarer declarer) {
+ declarer.declare(new Fields("obj", "count", "actualWindowLengthInSeconds"));
+ }
+
+
+ @Override
+ public Map<String, Object> getComponentConfiguration() {
+ Map<String, Object> conf = new HashMap<String, Object>();
+ conf.put(Config.TOPOLOGY_TICK_TUPLE_FREQ_SECS, emitFrequencyInSeconds);
+ return conf;
+ }
+
+}
diff --git a/src/main/java/cn/ac/iie/bolt/resultoutput/UrlOutPutBolt.java b/src/main/java/cn/ac/iie/bolt/resultoutput/UrlOutPutBolt.java
new file mode 100644
index 0000000..66ac2e7
--- /dev/null
+++ b/src/main/java/cn/ac/iie/bolt/resultoutput/UrlOutPutBolt.java
@@ -0,0 +1,100 @@
+package cn.ac.iie.bolt.resultoutput;
+
+import cn.ac.iie.model.Url;
+import cn.ac.iie.utils.Rankable;
+import cn.ac.iie.utils.Rankings;
+import cn.ac.iie.utils.kafka.ResultToKafka;
+import cn.ac.iie.utils.system.TupleUtils;
+import com.alibaba.fastjson.JSONObject;
+import com.zdjizhi.utils.StringUtil;
+import org.apache.log4j.Logger;
+import org.apache.storm.task.OutputCollector;
+import org.apache.storm.task.TopologyContext;
+import org.apache.storm.topology.OutputFieldsDeclarer;
+import org.apache.storm.topology.base.BaseRichBolt;
+import org.apache.storm.tuple.Tuple;
+
+import java.util.Date;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+
+import static cn.ac.iie.utils.TopUtils.getTimesTamp;
+
+
+/**
+ * 数据计算结果后存储
+ *
+ * @author Administrator
+ */
+public class UrlOutPutBolt extends BaseRichBolt {
+
+ private static final Logger logger = Logger.getLogger(UrlOutPutBolt.class);
+ private static final long serialVersionUID = 4180912852801273297L;
+ private OutputCollector collector;
+ private ResultToKafka resultToKafka;
+
+ @Override
+ public void declareOutputFields(OutputFieldsDeclarer declarer) {
+
+ }
+
+ @Override
+ public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
+ this.collector = collector;
+ resultToKafka = ResultToKafka.getInstance();
+ }
+
+ @Override
+ public void execute(Tuple tuple) {
+ try {
+ if (!TupleUtils.isTick(tuple)) {
+ Rankings rankings = (Rankings) tuple.getValue(0);
+ if (StringUtil.isEmpty(rankings.getRankings())) {
+ logger.warn("rankings is null, don't insert...");
+ } else {
+ resultToKafka.sendMessage(parserRankingToObject(rankings));
+ }
+ collector.ack(tuple);
+ }
+
+ } catch (Exception e) {
+ logger.error("batch insert error:" + e);
+ collector.fail(tuple);
+
+ }
+
+ }
+
+ /**
+ * 将排名对象解析为实际业务类型热门网站
+ *
+ * @param rankings 结果集
+ * @return json集合
+ */
+ private LinkedList<String> parserRankingToObject(Rankings rankings) {
+ LinkedList<String> activeIpList = new LinkedList<>();
+ long timesTamp = getTimesTamp();
+ Url url = new Url();
+ for (Rankable rankable : rankings.getRankings()) {
+ List<Object> obj = rankable.getFields();
+ url.setUrl(rankable.getObject().toString());
+ url.setSession_num(rankable.getCount());
+ url.setStat_time(timesTamp);
+ activeIpList.add(JSONObject.toJSONString(url));
+ }
+ return activeIpList;
+ }
+
+ /**
+ * 获取当前时间的时间戳(秒)
+ *
+ * @return 时间戳 秒
+ */
+ private static long getTimesTamp() {
+ Date date = new Date();
+ String timestamp = String.valueOf(date.getTime() / 1000);
+ return Integer.valueOf(timestamp);
+ }
+
+}
diff --git a/src/main/java/cn/ac/iie/bolt/segmentation/UrlBolt.java b/src/main/java/cn/ac/iie/bolt/segmentation/UrlBolt.java
new file mode 100644
index 0000000..22eda11
--- /dev/null
+++ b/src/main/java/cn/ac/iie/bolt/segmentation/UrlBolt.java
@@ -0,0 +1,50 @@
+package cn.ac.iie.bolt.segmentation;
+
+import cn.ac.iie.common.TopCountConfig;
+import com.alibaba.fastjson.JSON;
+import com.alibaba.fastjson.JSONObject;
+import com.zdjizhi.utils.StringUtil;
+import org.apache.log4j.Logger;
+import org.apache.storm.topology.BasicOutputCollector;
+import org.apache.storm.topology.OutputFieldsDeclarer;
+import org.apache.storm.topology.base.BaseBasicBolt;
+import org.apache.storm.tuple.Fields;
+import org.apache.storm.tuple.Tuple;
+import org.apache.storm.tuple.Values;
+
+/**
+ * 切分原始日志
+ *
+ * @author qidaijie
+ */
+public class UrlBolt extends BaseBasicBolt {
+
+ private static final Logger logger = Logger.getLogger(UrlBolt.class);
+ private static final long serialVersionUID = -5494628124427550753L;
+
+
+ @Override
+ public void execute(Tuple input, BasicOutputCollector collector) {
+ try {
+ String message = input.getString(0);
+ if (StringUtil.isNotBlank(message)) {
+ JSONObject jsonObject = JSON.parseObject(message);
+ if (jsonObject.containsKey("http_url")) {
+ String key = jsonObject.getString("http_url");
+ System.out.println(key);
+ collector.emit(new Values(key));
+ }
+ }
+ } catch (Exception e) {
+ logger.error(TopCountConfig.KAFKA_TOPIC + "原始日志拆分异常:", e);
+ }
+ }
+
+ @Override
+ public void declareOutputFields(OutputFieldsDeclarer declarer) {
+ declarer.declare(new Fields("key"));
+
+ }
+
+
+}
diff --git a/src/main/java/cn/ac/iie/model/Url.java b/src/main/java/cn/ac/iie/model/Url.java
new file mode 100644
index 0000000..1a8c2b5
--- /dev/null
+++ b/src/main/java/cn/ac/iie/model/Url.java
@@ -0,0 +1,36 @@
+package cn.ac.iie.model;
+
+/**
+ * url 实体类
+ *
+ * @author qidaijie
+ */
+public class Url {
+ private String url;
+ private long session_num;
+ private long stat_time;
+
+ public String getUrl() {
+ return url;
+ }
+
+ public void setUrl(String url) {
+ this.url = url;
+ }
+
+ public long getSession_num() {
+ return session_num;
+ }
+
+ public void setSession_num(long session_num) {
+ this.session_num = session_num;
+ }
+
+ public long getStat_time() {
+ return stat_time;
+ }
+
+ public void setStat_time(long stat_time) {
+ this.stat_time = stat_time;
+ }
+}
diff --git a/src/main/java/cn/ac/iie/spout/UrlProxyKafkaSpout.java b/src/main/java/cn/ac/iie/spout/UrlProxyKafkaSpout.java
new file mode 100644
index 0000000..44405df
--- /dev/null
+++ b/src/main/java/cn/ac/iie/spout/UrlProxyKafkaSpout.java
@@ -0,0 +1,79 @@
+package cn.ac.iie.spout;
+
+import cn.ac.iie.common.TopCountConfig;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.consumer.ConsumerRecords;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.log4j.Logger;
+import org.apache.storm.spout.SpoutOutputCollector;
+import org.apache.storm.task.TopologyContext;
+import org.apache.storm.topology.OutputFieldsDeclarer;
+import org.apache.storm.topology.base.BaseRichSpout;
+import org.apache.storm.tuple.Fields;
+import org.apache.storm.tuple.Values;
+
+import java.util.Arrays;
+import java.util.Map;
+import java.util.Properties;
+
+/**
+ * @author qidaijie
+ */
+public class UrlProxyKafkaSpout extends BaseRichSpout {
+ private static final long serialVersionUID = -3363788553406229592L;
+ private KafkaConsumer<String, String> consumer;
+ private SpoutOutputCollector collector = null;
+ private TopologyContext context = null;
+ private final static Logger logger = Logger.getLogger(UrlProxyKafkaSpout.class);
+
+
+ private static Properties createConsumerConfig() {
+ Properties props = new Properties();
+ props.put("bootstrap.servers", TopCountConfig.BOOTSTRAP_SERVERS);
+ props.put("group.id", TopCountConfig.GROUP_ID+"pr");
+ props.put("session.timeout.ms", "60000");
+ props.put("max.poll.records", 3000);
+ props.put("max.partition.fetch.bytes", 31457280);
+ props.put("auto.offset.reset", TopCountConfig.AUTO_OFFSET_RESET);
+ props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
+ props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
+ return props;
+ }
+
+ @Override
+ public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
+ // TODO Auto-generated method stub
+ this.collector = collector;
+ this.context = context;
+ Properties prop = createConsumerConfig();
+ this.consumer = new KafkaConsumer<>(prop);
+ this.consumer.subscribe(Arrays.asList("PROXY-EVENT-LOG"));
+ }
+
+ @Override
+ public void close() {
+ consumer.close();
+ }
+
+ @Override
+ public void nextTuple() {
+ try {
+ // TODO Auto-generated method stub
+ ConsumerRecords<String, String> records = consumer.poll(10000L);
+ Thread.sleep(1);
+ for (ConsumerRecord<String, String> record : records) {
+ this.collector.emit(new Values(record.value()));
+ }
+ } catch (Exception e) {
+ logger.error("实时统计程序SPOUT出现异常----> " + e);
+ e.printStackTrace();
+ }
+
+ }
+
+ @Override
+ public void declareOutputFields(OutputFieldsDeclarer declarer) {
+ // TODO Auto-generated method stub
+ declarer.declare(new Fields("source"));
+ }
+}
diff --git a/src/main/java/cn/ac/iie/spout/UrlSecurityKafkaSpout.java b/src/main/java/cn/ac/iie/spout/UrlSecurityKafkaSpout.java
new file mode 100644
index 0000000..e424434
--- /dev/null
+++ b/src/main/java/cn/ac/iie/spout/UrlSecurityKafkaSpout.java
@@ -0,0 +1,79 @@
+package cn.ac.iie.spout;
+
+import cn.ac.iie.common.TopCountConfig;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.consumer.ConsumerRecords;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.log4j.Logger;
+import org.apache.storm.spout.SpoutOutputCollector;
+import org.apache.storm.task.TopologyContext;
+import org.apache.storm.topology.OutputFieldsDeclarer;
+import org.apache.storm.topology.base.BaseRichSpout;
+import org.apache.storm.tuple.Fields;
+import org.apache.storm.tuple.Values;
+
+import java.util.Arrays;
+import java.util.Map;
+import java.util.Properties;
+
+/**
+ * @author qidaijie
+ */
+public class UrlSecurityKafkaSpout extends BaseRichSpout {
+ private static final long serialVersionUID = -3363788553406229592L;
+ private KafkaConsumer<String, String> consumer;
+ private SpoutOutputCollector collector = null;
+ private TopologyContext context = null;
+ private final static Logger logger = Logger.getLogger(UrlSecurityKafkaSpout.class);
+
+
+ private static Properties createConsumerConfig() {
+ Properties props = new Properties();
+ props.put("bootstrap.servers", TopCountConfig.BOOTSTRAP_SERVERS);
+ props.put("group.id", TopCountConfig.GROUP_ID+"se");
+ props.put("session.timeout.ms", "60000");
+ props.put("max.poll.records", 3000);
+ props.put("max.partition.fetch.bytes", 31457280);
+ props.put("auto.offset.reset", TopCountConfig.AUTO_OFFSET_RESET);
+ props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
+ props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
+ return props;
+ }
+
+ @Override
+ public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
+ // TODO Auto-generated method stub
+ this.collector = collector;
+ this.context = context;
+ Properties prop = createConsumerConfig();
+ this.consumer = new KafkaConsumer<>(prop);
+ this.consumer.subscribe(Arrays.asList("SECURITY-EVENT-LOG"));
+ }
+
+ @Override
+ public void close() {
+ consumer.close();
+ }
+
+ @Override
+ public void nextTuple() {
+ try {
+ // TODO Auto-generated method stub
+ ConsumerRecords<String, String> records = consumer.poll(10000L);
+ Thread.sleep(1);
+ for (ConsumerRecord<String, String> record : records) {
+ this.collector.emit(new Values(record.value()));
+ }
+ } catch (Exception e) {
+ logger.error("实时统计程序SPOUT出现异常----> " + e);
+ e.printStackTrace();
+ }
+
+ }
+
+ @Override
+ public void declareOutputFields(OutputFieldsDeclarer declarer) {
+ // TODO Auto-generated method stub
+ declarer.declare(new Fields("source"));
+ }
+}
diff --git a/src/main/java/cn/ac/iie/topology/LogTopCountTopology.java b/src/main/java/cn/ac/iie/topology/LogTopCountTopology.java
index dc3360e..767fb6d 100644
--- a/src/main/java/cn/ac/iie/topology/LogTopCountTopology.java
+++ b/src/main/java/cn/ac/iie/topology/LogTopCountTopology.java
@@ -2,12 +2,15 @@ package cn.ac.iie.topology;
import cn.ac.iie.bolt.*;
import cn.ac.iie.bolt.dimension.BytesCountBolt;
+import cn.ac.iie.bolt.dimension.OnlySessionBolt;
import cn.ac.iie.bolt.dimension.PackCountBolt;
import cn.ac.iie.bolt.dimension.SessionCountBolt;
import cn.ac.iie.bolt.resultoutput.*;
import cn.ac.iie.bolt.segmentation.*;
import cn.ac.iie.common.TopCountConfig;
import cn.ac.iie.spout.CustomizedKafkaSpout;
+import cn.ac.iie.spout.UrlProxyKafkaSpout;
+import cn.ac.iie.spout.UrlSecurityKafkaSpout;
import org.apache.log4j.Logger;
import org.apache.storm.Config;
import org.apache.storm.generated.AlreadyAliveException;
@@ -58,9 +61,9 @@ public class LogTopCountTopology {
private void buildTopology() {
builder = new TopologyBuilder();
- builder.setSpout("CustomizedKafkaSpout", new CustomizedKafkaSpout(), TopCountConfig.SPOUT_PARALLELISM);
switch (TopCountConfig.PATTERN) {
case 1:
+ builder.setSpout("CustomizedKafkaSpout", new CustomizedKafkaSpout(), TopCountConfig.SPOUT_PARALLELISM);
builder.setBolt("InternalNormalizer", new InternalBolt(), (TopCountConfig.WORDNORMALIZER)).shuffleGrouping("CustomizedKafkaSpout");
if (TopCountConfig.SESSIONS_TYPE.equals(TopCountConfig.DIMENSION_TYPE)) {
builder.setBolt("InternalCount", new SessionCountBolt(TopCountConfig.TOPWINDOWLENGTH, TopCountConfig.TOPEMITFREQUENCYSECS),
@@ -79,6 +82,7 @@ public class LogTopCountTopology {
builder.setBolt("InternalOutPutBolt", new InternalOutPutBolt()).globalGrouping("finalRanker");
break;
case 2:
+ builder.setSpout("CustomizedKafkaSpout", new CustomizedKafkaSpout(), TopCountConfig.SPOUT_PARALLELISM);
builder.setBolt("ExternalNormalizer", new ExternalBolt(), (TopCountConfig.WORDNORMALIZER)).shuffleGrouping("CustomizedKafkaSpout");
if (TopCountConfig.SESSIONS_TYPE.equals(TopCountConfig.DIMENSION_TYPE)) {
builder.setBolt("ExternalCount", new SessionCountBolt(TopCountConfig.TOPWINDOWLENGTH, TopCountConfig.TOPEMITFREQUENCYSECS),
@@ -97,6 +101,7 @@ public class LogTopCountTopology {
builder.setBolt("ExternalOutPutBolt", new ExternalOutPutBolt()).globalGrouping("finalRanker");
break;
case 3:
+ builder.setSpout("CustomizedKafkaSpout", new CustomizedKafkaSpout(), TopCountConfig.SPOUT_PARALLELISM);
builder.setBolt("WebsiteNormalizer", new WebSiteBolt(), (TopCountConfig.WORDNORMALIZER)).shuffleGrouping("CustomizedKafkaSpout");
if (TopCountConfig.SESSIONS_TYPE.equals(TopCountConfig.DIMENSION_TYPE)) {
builder.setBolt("WebsiteCount", new SessionCountBolt(TopCountConfig.TOPWINDOWLENGTH, TopCountConfig.TOPEMITFREQUENCYSECS),
@@ -115,6 +120,7 @@ public class LogTopCountTopology {
builder.setBolt("WebsiteOutPutBolt", new WebsiteOutPutBolt()).globalGrouping("finalRanker");
break;
case 4:
+ builder.setSpout("CustomizedKafkaSpout", new CustomizedKafkaSpout(), TopCountConfig.SPOUT_PARALLELISM);
builder.setBolt("UserNormalizer", new UserBolt(), (TopCountConfig.WORDNORMALIZER)).shuffleGrouping("CustomizedKafkaSpout");
if (TopCountConfig.SESSIONS_TYPE.equals(TopCountConfig.DIMENSION_TYPE)) {
builder.setBolt("UserCount", new SessionCountBolt(TopCountConfig.TOPWINDOWLENGTH, TopCountConfig.TOPEMITFREQUENCYSECS),
@@ -132,6 +138,20 @@ public class LogTopCountTopology {
builder.setBolt("finalRanker", new TotalRankingsBolt(TopCountConfig.TOPTOTALN, TopCountConfig.TAOTALEMITFREQUENCYSECS)).globalGrouping("IntermediateRanker");
builder.setBolt("UserOutPutBolt", new UserOutPutBolt()).globalGrouping("finalRanker");
break;
+ case 5:
+ builder.setSpout("UrlProxyKafkaSpout", new UrlProxyKafkaSpout(), TopCountConfig.SPOUT_PARALLELISM);
+ builder.setSpout("UrlSecurityKafkaSpout", new UrlSecurityKafkaSpout(), TopCountConfig.SPOUT_PARALLELISM);
+
+ builder.setBolt("UrlNormalizer", new UrlBolt(), (TopCountConfig.WORDNORMALIZER)).shuffleGrouping("UrlProxyKafkaSpout").shuffleGrouping("UrlSecurityKafkaSpout");
+
+ builder.setBolt("OnlySessionBolt", new OnlySessionBolt(TopCountConfig.TOPWINDOWLENGTH, TopCountConfig.TOPEMITFREQUENCYSECS),
+ TopCountConfig.ROLLINGCOUNT).fieldsGrouping("UrlNormalizer", new Fields("key"));
+ builder.setBolt("IntermediateRanker", new IntermediateRankingsBolt(TopCountConfig.TOPINTERMEDIATEN,
+ TopCountConfig.INTERMEDIATEEMITFREQUENCYSECS),
+ TopCountConfig.INTERMEDIATERANKINGS).fieldsGrouping("OnlySessionBolt", new Fields("obj"));
+ builder.setBolt("finalRanker", new TotalRankingsBolt(TopCountConfig.TOPTOTALN, TopCountConfig.TAOTALEMITFREQUENCYSECS)).globalGrouping("IntermediateRanker");
+ builder.setBolt("UrlOutPutBolt", new UrlOutPutBolt()).globalGrouping("finalRanker");
+ break;
default:
}
diff --git a/src/main/java/cn/ac/iie/utils/onlysessioncount/OnlySlidingWindowCounter.java b/src/main/java/cn/ac/iie/utils/onlysessioncount/OnlySlidingWindowCounter.java
new file mode 100644
index 0000000..2660954
--- /dev/null
+++ b/src/main/java/cn/ac/iie/utils/onlysessioncount/OnlySlidingWindowCounter.java
@@ -0,0 +1,58 @@
+package cn.ac.iie.utils.onlysessioncount;
+
+import java.io.Serializable;
+import java.util.Map;
+
+/**
+ * @author qidaijie
+ */
+public final class OnlySlidingWindowCounter<T> implements Serializable {
+
+ private static final long serialVersionUID = 5919639415485376341L;
+ private OnlySlotBasedCounter<T> objCounter;
+ private int headSlot;
+ private int tailSlot;
+ private int windowLengthInSlots;
+
+ public OnlySlidingWindowCounter(int windowLengthInSlots) {
+ if (windowLengthInSlots < 2) {
+ throw new IllegalArgumentException("Window length in slots must be at least two (you requested "
+ + windowLengthInSlots + ")");
+ }
+ this.windowLengthInSlots = windowLengthInSlots;
+ this.objCounter = new OnlySlotBasedCounter<T>(this.windowLengthInSlots);
+
+ this.headSlot = 0;
+ this.tailSlot = slotAfter(headSlot);
+ }
+
+ public void incrementCount(T obj, long count) {
+ objCounter.incrementCount(obj,count,headSlot);
+ }
+
+ /**
+ * Return the current (total) counts of all tracked objects, then advance the window.
+ * <p>
+ * Whenever this method is called, we consider the counts of the current sliding window to be available to and
+ * successfully processed "upstream" (i.e. by the caller). Knowing this we will start counting any subsequent
+ * objects within the next "chunk" of the sliding window.
+ *
+ * @return
+ */
+ public Map<T, Long> getCountsThenAdvanceWindow() {
+ Map<T,Long> objCountMap = objCounter.getCounts();
+ objCounter.wipeZeros();
+ objCounter.wipeSlot(tailSlot);
+ advanceHead();
+ return objCountMap;
+ }
+
+ private void advanceHead() {
+ headSlot = tailSlot;
+ tailSlot = slotAfter(tailSlot);
+ }
+
+ private int slotAfter(int slot) {
+ return (slot + 1) % windowLengthInSlots;
+ }
+}
diff --git a/src/main/java/cn/ac/iie/utils/onlysessioncount/OnlySlotBasedCounter.java b/src/main/java/cn/ac/iie/utils/onlysessioncount/OnlySlotBasedCounter.java
new file mode 100644
index 0000000..0ed778a
--- /dev/null
+++ b/src/main/java/cn/ac/iie/utils/onlysessioncount/OnlySlotBasedCounter.java
@@ -0,0 +1,104 @@
+package cn.ac.iie.utils.onlysessioncount;
+
+
+import java.io.Serializable;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+
+/**
+ * @author qidaijie
+ */
+public class OnlySlotBasedCounter<T> implements Serializable {
+
+ private static final long serialVersionUID = -7266348235766531079L;
+ private final Map<T, long[]> objToCounts = new HashMap<>();
+
+ private final int numSlots;
+
+ public OnlySlotBasedCounter(int numSlots) {
+ if (numSlots <= 0) {
+ throw new IllegalArgumentException(" Number of slots must be greater than zero (you requested " + numSlots + ")");
+ }
+ this.numSlots = numSlots;
+ }
+
+ /**
+ * @param obj
+ * @param slot
+ */
+ public void incrementCount(T obj, long count, int slot) {
+
+ long[] counts = objToCounts.get(obj);
+ if (counts == null) {
+ counts = new long[this.numSlots];
+ objToCounts.put(obj, counts);
+ }
+ counts[slot] += count;
+ }
+
+ public Map<T, Long> getCounts() {
+ Map<T, Long> result = new HashMap<T, Long>();
+ for (Map.Entry<T, long[]> entry : objToCounts.entrySet()) {
+ Long numbers = computerTotalCount(entry.getKey(), objToCounts);
+ result.put(entry.getKey(), numbers);
+// result.put(entry.getKey(), numbers);
+ }
+ return result;
+ }
+
+
+ private long computerTotalCount(T obj, Map<T, long[]> objTo) {
+ long[] counts = objTo.get(obj);
+ long total = 0;
+ for (long count : counts) {
+ total += count;
+ }
+ return total;
+ }
+
+
+ /**
+ * Reset the slot count of any tracked objects to zero for given slot
+ *
+ * @param slot
+ */
+ public void wipeSlot(int slot) {
+ for (T obj : objToCounts.keySet()) {
+ resetSlotCountToZero(obj, slot);
+ }
+
+ }
+
+ private void resetSlotCountToZero(T obj, int slot) {
+ long[] counts = objToCounts.get(obj);
+
+ counts[slot] = 0;
+ }
+
+
+ private boolean shouldBeRemovedfromCounter(T obj) {
+ return computerTotalCount(obj, objToCounts) == 0;
+ }
+
+ /**
+ * Remove any object from the counter whose total count is zero (to free up memory).
+ */
+ public void wipeZeros() {
+ Set<T> objToBeRemoved = new HashSet<T>();
+
+ for (T obj : objToCounts.keySet()) {
+ if (shouldBeRemovedfromCounter(obj)) {
+ objToBeRemoved.add(obj);
+ }
+ }
+
+ for (T obj : objToBeRemoved) {
+ objToCounts.remove(obj);
+ }
+
+
+ }
+
+}