| author | qidaijie <[email protected]> | 2019-12-26 17:06:39 +0800 |
|---|---|---|
| committer | qidaijie <[email protected]> | 2019-12-26 17:06:39 +0800 |
| commit | 4b522a1806632b359eee5169d4b7858cae3825b3 | |
| tree | 2638a45e5458c4d31dce02e195cdc84d8f2c4103 | |
| parent | 7c6446501011ad68ddc054ff88da4daadb2c854b | |
Add URL session statistics (tsg_galaxy_top_v3.0)
10 files changed, 663 insertions, 8 deletions
diff --git a/properties/kafka_topic.properties b/properties/kafka_topic.properties
index 9d91116..9f6ec16 100644
--- a/properties/kafka_topic.properties
+++ b/properties/kafka_topic.properties
@@ -1,21 +1,21 @@
 #Kafka bootstrap address
-bootstrap.servers=192.168.40.123:9092
+bootstrap.servers=192.168.40.224:9092
 
 #Topic name on the Kafka broker
 kafka.topic=CONNECTION-RECORD-LOG
 
 #Kafka consumer group id
-group.id=internal-top-program
+group.id=url-top-program
 
 #Where to start reading from Kafka: earliest/latest
 auto.offset.reset=latest
 #auto.offset.reset=earliest
 
 #Output Kafka address
-results.output.servers=192.168.40.123:9092
+results.output.servers=192.168.40.224:9092
 
 #Output topic
-results.output.topics=TOP-EXTERNAL-HOST-LOG
+results.output.topics=TOP-URLS-LOG
 
 #topology pending
 topology.config.max.spout.pending=150000
@@ -27,11 +27,11 @@ topology.num.acks=1
 max.failure.num=20
 
 #Path of the IP geolocation library
-ip.library=/home/ceiec/topology/dat/
+ip.library=/dat/
 
 #1:Internal host 2:External host
-#3:Website (domain) 4:User (active user)
-pattern.num=2
+#3:Website (domain) 4:User (active user) 5:Url
+pattern.num=5
 
 # bytes, packets, sessions
 dimension.type=bytes
diff --git a/src/main/java/cn/ac/iie/bolt/dimension/OnlySessionBolt.java b/src/main/java/cn/ac/iie/bolt/dimension/OnlySessionBolt.java
new file mode 100644
index 0000000..e9a1a1d
--- /dev/null
+++ b/src/main/java/cn/ac/iie/bolt/dimension/OnlySessionBolt.java
@@ -0,0 +1,129 @@
+package cn.ac.iie.bolt.dimension;
+
+import cn.ac.iie.utils.NthLastModifiedTimeTracker;
+import cn.ac.iie.utils.onlysessioncount.OnlySlidingWindowCounter;
+import cn.ac.iie.utils.system.TupleUtils;
+import org.apache.log4j.Logger;
+import org.apache.storm.Config;
+import org.apache.storm.task.OutputCollector;
+import org.apache.storm.task.TopologyContext;
+import org.apache.storm.topology.OutputFieldsDeclarer;
+import org.apache.storm.topology.base.BaseRichBolt;
+import org.apache.storm.tuple.Fields;
+import org.apache.storm.tuple.Tuple;
+import org.apache.storm.tuple.Values;
+
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * Sliding-window session counter: counts occurrences per key and emits the
+ * counts of the current window on every tick tuple.
+ *
+ * @author qidaijie
+ */
+public class OnlySessionBolt extends BaseRichBolt {
+    private static final long serialVersionUID = 3110758302345522419L;
+    private static final Logger logger = Logger.getLogger(OnlySessionBolt.class);
+    private static final int NUM_WINDOW_CHUNKS = 5;
+    /**
+     * By default, rank over the last 5 minutes of data.
+     */
+    private static final int DEFAULT_SLIDING_WINDOW_IN_SECONDS = NUM_WINDOW_CHUNKS * 60;
+    /**
+     * By default, emit the counts once per minute.
+     */
+    private static final int DEFAULT_EMIT_FREQUENCY_IN_SECONDS = DEFAULT_SLIDING_WINDOW_IN_SECONDS / NUM_WINDOW_CHUNKS;
+
+    private static final String WINDOW_LENGTH_WARNING_TEMPLATE =
+            "Actual window length is %d seconds when it should be %d seconds"
+                    + " (you can safely ignore this warning during the startup phase)";
+
+    private final OnlySlidingWindowCounter<Object> counter;
+
+    private OutputCollector outputCollector;
+    private final int windowLengthInSeconds;
+    private final int emitFrequencyInSeconds;
+    private NthLastModifiedTimeTracker lastModifiedTracker;
+
+    public OnlySessionBolt() {
+        this(DEFAULT_SLIDING_WINDOW_IN_SECONDS, DEFAULT_EMIT_FREQUENCY_IN_SECONDS);
+    }
+
+    /**
+     * @param windowLengthInSeconds  length of the sliding window
+     * @param emitFrequencyInSeconds emit frequency; buffered counts are flushed to the next bolt (or the database) at this interval
+     */
+    public OnlySessionBolt(int windowLengthInSeconds, int emitFrequencyInSeconds) {
+        this.windowLengthInSeconds = windowLengthInSeconds;
+        this.emitFrequencyInSeconds = emitFrequencyInSeconds;
+        counter = new OnlySlidingWindowCounter<Object>(deriveNumWindowChunksFrom(this.windowLengthInSeconds, this.emitFrequencyInSeconds));
+    }
+
+    private int deriveNumWindowChunksFrom(int windowLengthInSeconds, int windowUpdateFrequencyInSeconds) {
+        return windowLengthInSeconds / windowUpdateFrequencyInSeconds;
+    }
+
+    @Override
+    public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
+        outputCollector = collector;
+        lastModifiedTracker = new NthLastModifiedTimeTracker(deriveNumWindowChunksFrom(this.windowLengthInSeconds, this.emitFrequencyInSeconds));
+    }
+
+    @Override
+    public void execute(Tuple tuple) {
+        if (TupleUtils.isTick(tuple)) {
+            // A tick tuple triggers the periodic emit of the current window counts.
+            logger.debug("Received tick tuple, triggering emit of current window counts");
+            emitCurrentWindowCounts();
+        } else {
+            // A normal tuple is simply counted and acked.
+            countObjAndAck(tuple);
+        }
+    }
+
+    private void emitCurrentWindowCounts() {
+        Map<Object, Long> objCountMap = counter.getCountsThenAdvanceWindow();
+        int actualWindowLengthInSeconds = lastModifiedTracker.secondsSinceOldestModification();
+        lastModifiedTracker.markAsModified();
+        if (actualWindowLengthInSeconds != windowLengthInSeconds) {
+            logger.warn(String.format(WINDOW_LENGTH_WARNING_TEMPLATE, actualWindowLengthInSeconds, windowLengthInSeconds));
+        }
+        emit(objCountMap, actualWindowLengthInSeconds);
+    }
+
+    private void emit(Map<Object, Long> objCountMap, int actualWindowLengthInSeconds) {
+        for (Map.Entry<Object, Long> entry : objCountMap.entrySet()) {
+            Object obj = entry.getKey();
+            Long count = entry.getValue();
+            outputCollector.emit(new Values(obj, count, actualWindowLengthInSeconds));
+        }
+    }
+
+    private void countObjAndAck(Tuple tuple) {
+        try {
+            Object obj = tuple.getValue(0);
+            long count = 1;
+            counter.incrementCount(obj, count);
+            outputCollector.ack(tuple);
+        } catch (Exception e) {
+            logger.error(tuple.toString(), e);
+        }
+    }
+
+    @Override
+    public void declareOutputFields(OutputFieldsDeclarer declarer) {
+        declarer.declare(new Fields("obj", "count", "actualWindowLengthInSeconds"));
+    }
+
+    @Override
+    public Map<String, Object> getComponentConfiguration() {
+        Map<String, Object> conf = new HashMap<String, Object>();
+        conf.put(Config.TOPOLOGY_TICK_TUPLE_FREQ_SECS, emitFrequencyInSeconds);
+        return conf;
+    }
+}
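The tick-driven emit above depends on the project's `TupleUtils.isTick`, which is not part of this commit. A minimal sketch of the check it presumably performs, using Storm's system-stream constants (an assumption, not the project's actual code):

```java
import org.apache.storm.Constants;
import org.apache.storm.tuple.Tuple;

// Assumption: cn.ac.iie.utils.system.TupleUtils is not in this diff; this is
// the conventional Storm idiom for detecting tick tuples.
public final class TupleUtilsSketch {
    private TupleUtilsSketch() {
    }

    public static boolean isTick(Tuple tuple) {
        // Tick tuples arrive from the system component on the system tick stream,
        // at the frequency set via Config.TOPOLOGY_TICK_TUPLE_FREQ_SECS.
        return tuple != null
                && Constants.SYSTEM_COMPONENT_ID.equals(tuple.getSourceComponent())
                && Constants.SYSTEM_TICK_STREAM_ID.equals(tuple.getSourceStreamId());
    }
}
```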
diff --git a/src/main/java/cn/ac/iie/bolt/resultoutput/UrlOutPutBolt.java b/src/main/java/cn/ac/iie/bolt/resultoutput/UrlOutPutBolt.java
new file mode 100644
index 0000000..66ac2e7
--- /dev/null
+++ b/src/main/java/cn/ac/iie/bolt/resultoutput/UrlOutPutBolt.java
@@ -0,0 +1,100 @@
+package cn.ac.iie.bolt.resultoutput;
+
+import cn.ac.iie.model.Url;
+import cn.ac.iie.utils.Rankable;
+import cn.ac.iie.utils.Rankings;
+import cn.ac.iie.utils.kafka.ResultToKafka;
+import cn.ac.iie.utils.system.TupleUtils;
+import com.alibaba.fastjson.JSONObject;
+import com.zdjizhi.utils.StringUtil;
+import org.apache.log4j.Logger;
+import org.apache.storm.task.OutputCollector;
+import org.apache.storm.task.TopologyContext;
+import org.apache.storm.topology.OutputFieldsDeclarer;
+import org.apache.storm.topology.base.BaseRichBolt;
+import org.apache.storm.tuple.Tuple;
+
+import java.util.LinkedList;
+import java.util.Map;
+
+/**
+ * Stores the computed ranking results (writes them to Kafka).
+ *
+ * @author Administrator
+ */
+public class UrlOutPutBolt extends BaseRichBolt {
+
+    private static final Logger logger = Logger.getLogger(UrlOutPutBolt.class);
+    private static final long serialVersionUID = 4180912852801273297L;
+    private OutputCollector collector;
+    private ResultToKafka resultToKafka;
+
+    @Override
+    public void declareOutputFields(OutputFieldsDeclarer declarer) {
+        // Terminal bolt: no output fields.
+    }
+
+    @Override
+    public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
+        this.collector = collector;
+        resultToKafka = ResultToKafka.getInstance();
+    }
+
+    @Override
+    public void execute(Tuple tuple) {
+        try {
+            if (!TupleUtils.isTick(tuple)) {
+                Rankings rankings = (Rankings) tuple.getValue(0);
+                if (StringUtil.isEmpty(rankings.getRankings())) {
+                    logger.warn("rankings is null, don't insert...");
+                } else {
+                    resultToKafka.sendMessage(parserRankingToObject(rankings));
+                }
+                collector.ack(tuple);
+            }
+        } catch (Exception e) {
+            logger.error("batch insert error:" + e);
+            collector.fail(tuple);
+        }
+    }
+
+    /**
+     * Convert the rankings into the actual business objects (top URLs).
+     *
+     * @param rankings result set
+     * @return list of JSON strings
+     */
+    private LinkedList<String> parserRankingToObject(Rankings rankings) {
+        LinkedList<String> urlList = new LinkedList<>();
+        long timestamp = getTimesTamp();
+        for (Rankable rankable : rankings.getRankings()) {
+            Url url = new Url();
+            url.setUrl(rankable.getObject().toString());
+            url.setSession_num(rankable.getCount());
+            url.setStat_time(timestamp);
+            urlList.add(JSONObject.toJSONString(url));
+        }
+        return urlList;
+    }
+
+    /**
+     * Timestamp of the current time, in seconds.
+     *
+     * @return timestamp in seconds
+     */
+    private static long getTimesTamp() {
+        return System.currentTimeMillis() / 1000;
+    }
+}
diff --git a/src/main/java/cn/ac/iie/bolt/segmentation/UrlBolt.java b/src/main/java/cn/ac/iie/bolt/segmentation/UrlBolt.java
new file mode 100644
index 0000000..22eda11
--- /dev/null
+++ b/src/main/java/cn/ac/iie/bolt/segmentation/UrlBolt.java
@@ -0,0 +1,50 @@
+package cn.ac.iie.bolt.segmentation;
+
+import cn.ac.iie.common.TopCountConfig;
+import com.alibaba.fastjson.JSON;
+import com.alibaba.fastjson.JSONObject;
+import com.zdjizhi.utils.StringUtil;
+import org.apache.log4j.Logger;
+import org.apache.storm.topology.BasicOutputCollector;
+import org.apache.storm.topology.OutputFieldsDeclarer;
+import org.apache.storm.topology.base.BaseBasicBolt;
+import org.apache.storm.tuple.Fields;
+import org.apache.storm.tuple.Tuple;
+import org.apache.storm.tuple.Values;
+
+/**
+ * Splits the raw log: extracts the http_url field from each JSON record.
+ *
+ * @author qidaijie
+ */
+public class UrlBolt extends BaseBasicBolt {
+
+    private static final Logger logger = Logger.getLogger(UrlBolt.class);
+    private static final long serialVersionUID = -5494628124427550753L;
+
+    @Override
+    public void execute(Tuple input, BasicOutputCollector collector) {
+        try {
+            String message = input.getString(0);
+            if (StringUtil.isNotBlank(message)) {
+                JSONObject jsonObject = JSON.parseObject(message);
+                if (jsonObject.containsKey("http_url")) {
+                    String key = jsonObject.getString("http_url");
+                    collector.emit(new Values(key));
+                }
+            }
+        } catch (Exception e) {
+            logger.error(TopCountConfig.KAFKA_TOPIC + " failed to split raw log:", e);
+        }
+    }
+
+    @Override
+    public void declareOutputFields(OutputFieldsDeclarer declarer) {
+        declarer.declare(new Fields("key"));
+    }
+}
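For reference, each string handed to `ResultToKafka.sendMessage` above is a fastjson-serialized `Url` (the model added below), so a record on the `TOP-URLS-LOG` topic should look roughly like this (values illustrative; fastjson orders fields alphabetically by default, so the exact order may vary):

```json
{"session_num":4182,"stat_time":1577351199,"url":"http://example.com/index.html"}
```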
diff --git a/src/main/java/cn/ac/iie/model/Url.java b/src/main/java/cn/ac/iie/model/Url.java
new file mode 100644
index 0000000..1a8c2b5
--- /dev/null
+++ b/src/main/java/cn/ac/iie/model/Url.java
@@ -0,0 +1,36 @@
+package cn.ac.iie.model;
+
+/**
+ * URL entity class.
+ *
+ * @author qidaijie
+ */
+public class Url {
+    private String url;
+    private long session_num;
+    private long stat_time;
+
+    public String getUrl() {
+        return url;
+    }
+
+    public void setUrl(String url) {
+        this.url = url;
+    }
+
+    public long getSession_num() {
+        return session_num;
+    }
+
+    public void setSession_num(long session_num) {
+        this.session_num = session_num;
+    }
+
+    public long getStat_time() {
+        return stat_time;
+    }
+
+    public void setStat_time(long stat_time) {
+        this.stat_time = stat_time;
+    }
+}
diff --git a/src/main/java/cn/ac/iie/spout/UrlProxyKafkaSpout.java b/src/main/java/cn/ac/iie/spout/UrlProxyKafkaSpout.java
new file mode 100644
index 0000000..44405df
--- /dev/null
+++ b/src/main/java/cn/ac/iie/spout/UrlProxyKafkaSpout.java
@@ -0,0 +1,79 @@
+package cn.ac.iie.spout;
+
+import cn.ac.iie.common.TopCountConfig;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.consumer.ConsumerRecords;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.log4j.Logger;
+import org.apache.storm.spout.SpoutOutputCollector;
+import org.apache.storm.task.TopologyContext;
+import org.apache.storm.topology.OutputFieldsDeclarer;
+import org.apache.storm.topology.base.BaseRichSpout;
+import org.apache.storm.tuple.Fields;
+import org.apache.storm.tuple.Values;
+
+import java.util.Arrays;
+import java.util.Map;
+import java.util.Properties;
+
+/**
+ * Reads raw records from the PROXY-EVENT-LOG topic.
+ *
+ * @author qidaijie
+ */
+public class UrlProxyKafkaSpout extends BaseRichSpout {
+    private static final long serialVersionUID = -3363788553406229592L;
+    private final static Logger logger = Logger.getLogger(UrlProxyKafkaSpout.class);
+    private KafkaConsumer<String, String> consumer;
+    private SpoutOutputCollector collector = null;
+    private TopologyContext context = null;
+
+    private static Properties createConsumerConfig() {
+        Properties props = new Properties();
+        props.put("bootstrap.servers", TopCountConfig.BOOTSTRAP_SERVERS);
+        // Distinct group id suffix so this spout does not share offsets with the security spout.
+        props.put("group.id", TopCountConfig.GROUP_ID + "pr");
+        props.put("session.timeout.ms", "60000");
+        props.put("max.poll.records", 3000);
+        props.put("max.partition.fetch.bytes", 31457280);
+        props.put("auto.offset.reset", TopCountConfig.AUTO_OFFSET_RESET);
+        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
+        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
+        return props;
+    }
+
+    @Override
+    public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
+        this.collector = collector;
+        this.context = context;
+        Properties prop = createConsumerConfig();
+        this.consumer = new KafkaConsumer<>(prop);
+        this.consumer.subscribe(Arrays.asList("PROXY-EVENT-LOG"));
+    }
+
+    @Override
+    public void close() {
+        consumer.close();
+    }
+
+    @Override
+    public void nextTuple() {
+        try {
+            ConsumerRecords<String, String> records = consumer.poll(10000L);
+            Thread.sleep(1);
+            for (ConsumerRecord<String, String> record : records) {
+                this.collector.emit(new Values(record.value()));
+            }
+        } catch (Exception e) {
+            logger.error("Exception in real-time statistics spout----> ", e);
+        }
+    }
+
+    @Override
+    public void declareOutputFields(OutputFieldsDeclarer declarer) {
+        declarer.declare(new Fields("source"));
+    }
+}
diff --git a/src/main/java/cn/ac/iie/spout/UrlSecurityKafkaSpout.java b/src/main/java/cn/ac/iie/spout/UrlSecurityKafkaSpout.java
new file mode 100644
index 0000000..e424434
--- /dev/null
+++ b/src/main/java/cn/ac/iie/spout/UrlSecurityKafkaSpout.java
@@ -0,0 +1,79 @@
+package cn.ac.iie.spout;
+
+import cn.ac.iie.common.TopCountConfig;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.consumer.ConsumerRecords;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.log4j.Logger;
+import org.apache.storm.spout.SpoutOutputCollector;
+import org.apache.storm.task.TopologyContext;
+import org.apache.storm.topology.OutputFieldsDeclarer;
+import org.apache.storm.topology.base.BaseRichSpout;
+import org.apache.storm.tuple.Fields;
+import org.apache.storm.tuple.Values;
+
+import java.util.Arrays;
+import java.util.Map;
+import java.util.Properties;
+
+/**
+ * Reads raw records from the SECURITY-EVENT-LOG topic.
+ *
+ * @author qidaijie
+ */
+public class UrlSecurityKafkaSpout extends BaseRichSpout {
+    private static final long serialVersionUID = -3363788553406229592L;
+    private final static Logger logger = Logger.getLogger(UrlSecurityKafkaSpout.class);
+    private KafkaConsumer<String, String> consumer;
+    private SpoutOutputCollector collector = null;
+    private TopologyContext context = null;
+
+    private static Properties createConsumerConfig() {
+        Properties props = new Properties();
+        props.put("bootstrap.servers", TopCountConfig.BOOTSTRAP_SERVERS);
+        // Distinct group id suffix so this spout does not share offsets with the proxy spout.
+        props.put("group.id", TopCountConfig.GROUP_ID + "se");
+        props.put("session.timeout.ms", "60000");
+        props.put("max.poll.records", 3000);
+        props.put("max.partition.fetch.bytes", 31457280);
+        props.put("auto.offset.reset", TopCountConfig.AUTO_OFFSET_RESET);
+        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
+        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
+        return props;
+    }
+
+    @Override
+    public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
+        this.collector = collector;
+        this.context = context;
+        Properties prop = createConsumerConfig();
+        this.consumer = new KafkaConsumer<>(prop);
+        this.consumer.subscribe(Arrays.asList("SECURITY-EVENT-LOG"));
+    }
+
+    @Override
+    public void close() {
+        consumer.close();
+    }
+
+    @Override
+    public void nextTuple() {
+        try {
+            ConsumerRecords<String, String> records = consumer.poll(10000L);
+            Thread.sleep(1);
+            for (ConsumerRecord<String, String> record : records) {
+                this.collector.emit(new Values(record.value()));
+            }
+        } catch (Exception e) {
+            logger.error("Exception in real-time statistics spout----> ", e);
+        }
+    }
+
+    @Override
+    public void declareOutputFields(OutputFieldsDeclarer declarer) {
+        declarer.declare(new Fields("source"));
+    }
+}
diff --git a/src/main/java/cn/ac/iie/topology/LogTopCountTopology.java b/src/main/java/cn/ac/iie/topology/LogTopCountTopology.java
index dc3360e..767fb6d 100644
--- a/src/main/java/cn/ac/iie/topology/LogTopCountTopology.java
+++ b/src/main/java/cn/ac/iie/topology/LogTopCountTopology.java
@@ -2,12 +2,15 @@ package cn.ac.iie.topology;
 
 import cn.ac.iie.bolt.*;
 import cn.ac.iie.bolt.dimension.BytesCountBolt;
+import cn.ac.iie.bolt.dimension.OnlySessionBolt;
 import cn.ac.iie.bolt.dimension.PackCountBolt;
 import cn.ac.iie.bolt.dimension.SessionCountBolt;
 import cn.ac.iie.bolt.resultoutput.*;
 import cn.ac.iie.bolt.segmentation.*;
 import cn.ac.iie.common.TopCountConfig;
 import cn.ac.iie.spout.CustomizedKafkaSpout;
+import cn.ac.iie.spout.UrlProxyKafkaSpout;
+import cn.ac.iie.spout.UrlSecurityKafkaSpout;
 import org.apache.log4j.Logger;
 import org.apache.storm.Config;
 import org.apache.storm.generated.AlreadyAliveException;
@@ -58,9 +61,9 @@ public class LogTopCountTopology {
     private void buildTopology() {
         builder = new TopologyBuilder();
 
-        builder.setSpout("CustomizedKafkaSpout", new CustomizedKafkaSpout(), TopCountConfig.SPOUT_PARALLELISM);
         switch (TopCountConfig.PATTERN) {
             case 1:
+                builder.setSpout("CustomizedKafkaSpout", new CustomizedKafkaSpout(), TopCountConfig.SPOUT_PARALLELISM);
                 builder.setBolt("InternalNormalizer", new InternalBolt(), (TopCountConfig.WORDNORMALIZER)).shuffleGrouping("CustomizedKafkaSpout");
                 if (TopCountConfig.SESSIONS_TYPE.equals(TopCountConfig.DIMENSION_TYPE)) {
                     builder.setBolt("InternalCount", new SessionCountBolt(TopCountConfig.TOPWINDOWLENGTH, TopCountConfig.TOPEMITFREQUENCYSECS),
@@ -79,6 +82,7 @@ public class LogTopCountTopology {
                 builder.setBolt("InternalOutPutBolt", new InternalOutPutBolt()).globalGrouping("finalRanker");
                 break;
             case 2:
+                builder.setSpout("CustomizedKafkaSpout", new CustomizedKafkaSpout(), TopCountConfig.SPOUT_PARALLELISM);
                 builder.setBolt("ExternalNormalizer", new ExternalBolt(), (TopCountConfig.WORDNORMALIZER)).shuffleGrouping("CustomizedKafkaSpout");
                 if (TopCountConfig.SESSIONS_TYPE.equals(TopCountConfig.DIMENSION_TYPE)) {
                     builder.setBolt("ExternalCount", new SessionCountBolt(TopCountConfig.TOPWINDOWLENGTH, TopCountConfig.TOPEMITFREQUENCYSECS),
@@ -97,6 +101,7 @@ public class LogTopCountTopology {
                 builder.setBolt("ExternalOutPutBolt", new ExternalOutPutBolt()).globalGrouping("finalRanker");
                 break;
             case 3:
+                builder.setSpout("CustomizedKafkaSpout", new CustomizedKafkaSpout(), TopCountConfig.SPOUT_PARALLELISM);
                 builder.setBolt("WebsiteNormalizer", new WebSiteBolt(), (TopCountConfig.WORDNORMALIZER)).shuffleGrouping("CustomizedKafkaSpout");
                 if (TopCountConfig.SESSIONS_TYPE.equals(TopCountConfig.DIMENSION_TYPE)) {
                     builder.setBolt("WebsiteCount", new SessionCountBolt(TopCountConfig.TOPWINDOWLENGTH, TopCountConfig.TOPEMITFREQUENCYSECS),
@@ -115,6 +120,7 @@ public class LogTopCountTopology {
                 builder.setBolt("WebsiteOutPutBolt", new WebsiteOutPutBolt()).globalGrouping("finalRanker");
                 break;
             case 4:
+                builder.setSpout("CustomizedKafkaSpout", new CustomizedKafkaSpout(), TopCountConfig.SPOUT_PARALLELISM);
                 builder.setBolt("UserNormalizer", new UserBolt(), (TopCountConfig.WORDNORMALIZER)).shuffleGrouping("CustomizedKafkaSpout");
                 if (TopCountConfig.SESSIONS_TYPE.equals(TopCountConfig.DIMENSION_TYPE)) {
                     builder.setBolt("UserCount", new SessionCountBolt(TopCountConfig.TOPWINDOWLENGTH, TopCountConfig.TOPEMITFREQUENCYSECS),
@@ -132,6 +138,20 @@ public class LogTopCountTopology {
                 builder.setBolt("finalRanker", new TotalRankingsBolt(TopCountConfig.TOPTOTALN, TopCountConfig.TAOTALEMITFREQUENCYSECS)).globalGrouping("IntermediateRanker");
                 builder.setBolt("UserOutPutBolt", new UserOutPutBolt()).globalGrouping("finalRanker");
                 break;
+            case 5:
+                builder.setSpout("UrlProxyKafkaSpout", new UrlProxyKafkaSpout(), TopCountConfig.SPOUT_PARALLELISM);
+                builder.setSpout("UrlSecurityKafkaSpout", new UrlSecurityKafkaSpout(), TopCountConfig.SPOUT_PARALLELISM);
+
+                builder.setBolt("UrlNormalizer", new UrlBolt(), (TopCountConfig.WORDNORMALIZER)).shuffleGrouping("UrlProxyKafkaSpout").shuffleGrouping("UrlSecurityKafkaSpout");
+
+                builder.setBolt("OnlySessionBolt", new OnlySessionBolt(TopCountConfig.TOPWINDOWLENGTH, TopCountConfig.TOPEMITFREQUENCYSECS),
+                        TopCountConfig.ROLLINGCOUNT).fieldsGrouping("UrlNormalizer", new Fields("key"));
+                builder.setBolt("IntermediateRanker", new IntermediateRankingsBolt(TopCountConfig.TOPINTERMEDIATEN,
+                        TopCountConfig.INTERMEDIATEEMITFREQUENCYSECS),
+                        TopCountConfig.INTERMEDIATERANKINGS).fieldsGrouping("OnlySessionBolt", new Fields("obj"));
+                builder.setBolt("finalRanker", new TotalRankingsBolt(TopCountConfig.TOPTOTALN, TopCountConfig.TAOTALEMITFREQUENCYSECS)).globalGrouping("IntermediateRanker");
+                builder.setBolt("UrlOutPutBolt", new UrlOutPutBolt()).globalGrouping("finalRanker");
+                break;
             default:
         }
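For orientation, the case-5 branch above wires the following dataflow (read straight off the groupings in the diff):

```
UrlProxyKafkaSpout ──shuffle──┐
                              ├──> UrlNormalizer (UrlBolt)
UrlSecurityKafkaSpout ─shuffle┘
UrlNormalizer ──fieldsGrouping("key")──> OnlySessionBolt
OnlySessionBolt ──fieldsGrouping("obj")──> IntermediateRanker
IntermediateRanker ──globalGrouping──> finalRanker
finalRanker ──globalGrouping──> UrlOutPutBolt ──> Kafka topic TOP-URLS-LOG
```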
diff --git a/src/main/java/cn/ac/iie/utils/onlysessioncount/OnlySlidingWindowCounter.java b/src/main/java/cn/ac/iie/utils/onlysessioncount/OnlySlidingWindowCounter.java
new file mode 100644
index 0000000..2660954
--- /dev/null
+++ b/src/main/java/cn/ac/iie/utils/onlysessioncount/OnlySlidingWindowCounter.java
@@ -0,0 +1,58 @@
+package cn.ac.iie.utils.onlysessioncount;
+
+import java.io.Serializable;
+import java.util.Map;
+
+/**
+ * @author qidaijie
+ */
+public final class OnlySlidingWindowCounter<T> implements Serializable {
+
+    private static final long serialVersionUID = 5919639415485376341L;
+    private OnlySlotBasedCounter<T> objCounter;
+    private int headSlot;
+    private int tailSlot;
+    private int windowLengthInSlots;
+
+    public OnlySlidingWindowCounter(int windowLengthInSlots) {
+        if (windowLengthInSlots < 2) {
+            throw new IllegalArgumentException("Window length in slots must be at least two (you requested "
+                    + windowLengthInSlots + ")");
+        }
+        this.windowLengthInSlots = windowLengthInSlots;
+        this.objCounter = new OnlySlotBasedCounter<T>(this.windowLengthInSlots);
+
+        this.headSlot = 0;
+        this.tailSlot = slotAfter(headSlot);
+    }
+
+    public void incrementCount(T obj, long count) {
+        objCounter.incrementCount(obj, count, headSlot);
+    }
+
+    /**
+     * Return the current (total) counts of all tracked objects, then advance the window.
+     * <p>
+     * Whenever this method is called, we consider the counts of the current sliding window to be available to and
+     * successfully processed "upstream" (i.e. by the caller). Knowing this we will start counting any subsequent
+     * objects within the next "chunk" of the sliding window.
+     *
+     * @return the total count per tracked object for the current window
+     */
+    public Map<T, Long> getCountsThenAdvanceWindow() {
+        Map<T, Long> objCountMap = objCounter.getCounts();
+        objCounter.wipeZeros();
+        objCounter.wipeSlot(tailSlot);
+        advanceHead();
+        return objCountMap;
+    }
+
+    private void advanceHead() {
+        headSlot = tailSlot;
+        tailSlot = slotAfter(tailSlot);
+    }
+
+    private int slotAfter(int slot) {
+        return (slot + 1) % windowLengthInSlots;
+    }
+}
diff --git a/src/main/java/cn/ac/iie/utils/onlysessioncount/OnlySlotBasedCounter.java b/src/main/java/cn/ac/iie/utils/onlysessioncount/OnlySlotBasedCounter.java
new file mode 100644
index 0000000..0ed778a
--- /dev/null
+++ b/src/main/java/cn/ac/iie/utils/onlysessioncount/OnlySlotBasedCounter.java
@@ -0,0 +1,104 @@
+package cn.ac.iie.utils.onlysessioncount;
+
+import java.io.Serializable;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+
+/**
+ * Keeps one count per object per slot; the sum over all slots is the
+ * object's total within the sliding window.
+ *
+ * @author qidaijie
+ */
+public class OnlySlotBasedCounter<T> implements Serializable {
+
+    private static final long serialVersionUID = -7266348235766531079L;
+    private final Map<T, long[]> objToCounts = new HashMap<>();
+
+    private final int numSlots;
+
+    public OnlySlotBasedCounter(int numSlots) {
+        if (numSlots <= 0) {
+            throw new IllegalArgumentException("Number of slots must be greater than zero (you requested " + numSlots + ")");
+        }
+        this.numSlots = numSlots;
+    }
+
+    /**
+     * @param obj   the object being counted
+     * @param count the amount to add
+     * @param slot  the slot to add it to
+     */
+    public void incrementCount(T obj, long count, int slot) {
+        long[] counts = objToCounts.get(obj);
+        if (counts == null) {
+            counts = new long[this.numSlots];
+            objToCounts.put(obj, counts);
+        }
+        counts[slot] += count;
+    }
+
+    public Map<T, Long> getCounts() {
+        Map<T, Long> result = new HashMap<T, Long>();
+        for (Map.Entry<T, long[]> entry : objToCounts.entrySet()) {
+            result.put(entry.getKey(), computeTotalCount(entry.getKey(), objToCounts));
+        }
+        return result;
+    }
+
+    private long computeTotalCount(T obj, Map<T, long[]> objTo) {
+        long[] counts = objTo.get(obj);
+        long total = 0;
+        for (long count : counts) {
+            total += count;
+        }
+        return total;
+    }
+
+    /**
+     * Reset the count of every tracked object to zero for the given slot.
+     *
+     * @param slot the slot to wipe
+     */
+    public void wipeSlot(int slot) {
+        for (T obj : objToCounts.keySet()) {
+            resetSlotCountToZero(obj, slot);
+        }
+    }
+
+    private void resetSlotCountToZero(T obj, int slot) {
+        long[] counts = objToCounts.get(obj);
+        counts[slot] = 0;
+    }
+
+    private boolean shouldBeRemovedFromCounter(T obj) {
+        return computeTotalCount(obj, objToCounts) == 0;
+    }
+
+    /**
+     * Remove any object from the counter whose total count is zero (to free up memory).
+     */
+    public void wipeZeros() {
+        Set<T> objToBeRemoved = new HashSet<T>();
+        for (T obj : objToCounts.keySet()) {
+            if (shouldBeRemovedFromCounter(obj)) {
+                objToBeRemoved.add(obj);
+            }
+        }
+        for (T obj : objToBeRemoved) {
+            objToCounts.remove(obj);
+        }
+    }
+}
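A quick sanity check of the window semantics added above (a standalone sketch, not part of the commit): with five slots, a count recorded in one slot contributes to five successive window reads, because `getCountsThenAdvanceWindow()` wipes only the tail slot after reporting.

```java
import cn.ac.iie.utils.onlysessioncount.OnlySlidingWindowCounter;

import java.util.Map;

public class SlidingWindowDemo {
    public static void main(String[] args) {
        // Five slots = five emit intervals per window (e.g. 5 x 60s).
        OnlySlidingWindowCounter<String> counter = new OnlySlidingWindowCounter<>(5);

        counter.incrementCount("http://example.com/a", 1);
        counter.incrementCount("http://example.com/a", 1);
        // Read 1: {http://example.com/a=2}; the tail slot is wiped, the head advances.
        Map<String, Long> window = counter.getCountsThenAdvanceWindow();
        System.out.println(window);

        counter.incrementCount("http://example.com/b", 1);
        // Read 2: {http://example.com/a=2, http://example.com/b=1} -- a's earlier
        // counts keep appearing until their slot becomes the tail and is wiped.
        System.out.println(counter.getCountsThenAdvanceWindow());
    }
}
```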
