summaryrefslogtreecommitdiff
path: root/src/main/java/cn/ac/iie/topology/LogRealtimeCountTopology.java
diff options
context:
space:
mode:
authorcaohui <[email protected]>2020-04-29 14:32:05 +0800
committercaohui <[email protected]>2020-04-29 14:32:05 +0800
commitd15d7536f385ec4a1250ed15ed52fd6c05eb7431 (patch)
tree737ec8462ef62ac70caeee1533cbee4e76ceef98 /src/main/java/cn/ac/iie/topology/LogRealtimeCountTopology.java
VoIP Knowledge Base sip-voip-completion Initial commit 202004291431HEADmaster
Diffstat (limited to 'src/main/java/cn/ac/iie/topology/LogRealtimeCountTopology.java')
-rw-r--r--src/main/java/cn/ac/iie/topology/LogRealtimeCountTopology.java94
1 files changed, 94 insertions, 0 deletions
diff --git a/src/main/java/cn/ac/iie/topology/LogRealtimeCountTopology.java b/src/main/java/cn/ac/iie/topology/LogRealtimeCountTopology.java
new file mode 100644
index 0000000..73c7888
--- /dev/null
+++ b/src/main/java/cn/ac/iie/topology/LogRealtimeCountTopology.java
@@ -0,0 +1,94 @@
+package cn.ac.iie.topology;
+
+import cn.ac.iie.bolt.*;
+import cn.ac.iie.common.RealtimeCountConfig;
+import cn.ac.iie.spout.sip.SIP_ORIGIN_ALL_KafkaSpout;
+import org.apache.log4j.Logger;
+import org.apache.storm.Config;
+import org.apache.storm.generated.AlreadyAliveException;
+import org.apache.storm.generated.AuthorizationException;
+import org.apache.storm.generated.InvalidTopologyException;
+import org.apache.storm.topology.TopologyBuilder;
+import org.apache.storm.tuple.Fields;
+
+public class LogRealtimeCountTopology {
+ private static Logger logger = Logger.getLogger(LogRealtimeCountTopology.class);
+ private final String topologyName;
+ private final Config topologyConfig;
+ private TopologyBuilder builder;
+
+ public LogRealtimeCountTopology() {
+ this(LogRealtimeCountTopology.class.getSimpleName());
+ }
+
+ public LogRealtimeCountTopology(String topologyName) {
+ this.topologyName = topologyName;
+ topologyConfig = createTopologConfig();
+ }
+
+ private Config createTopologConfig() {
+ Config conf = new Config();
+ conf.setDebug(false);
+ conf.setMessageTimeoutSecs(120);
+ conf.setMaxSpoutPending(RealtimeCountConfig.TOPOLOGY_CONFIG_MAX_SPOUT_PENDING);
+ if (RealtimeCountConfig.TOPOLOGY_NUM_ACKS == 0) {
+ conf.setNumAckers(0);
+ }
+ return conf;
+ }
+
+ public void runLocally() throws InterruptedException {
+ topologyConfig.setMaxTaskParallelism(1);
+ StormRunner.runTopologyLocally(builder, topologyName, topologyConfig, 600);
+ }
+
+ public void runRemotely() throws AlreadyAliveException, InvalidTopologyException, AuthorizationException {
+ topologyConfig.setNumWorkers(RealtimeCountConfig.TOPOLOGY_WORKERS);
+ topologyConfig.put(Config.TOPOLOGY_TRANSFER_BUFFER_SIZE, 32);
+ topologyConfig.put(Config.TOPOLOGY_EXECUTOR_RECEIVE_BUFFER_SIZE, 16384);//老版配置为8
+ topologyConfig.put(Config.TOPOLOGY_EXECUTOR_SEND_BUFFER_SIZE, 16384);
+
+ StormRunner.runTopologyRemotely(builder, topologyName, topologyConfig);
+ }
+
+ public void buildTopology() {
+ builder = new TopologyBuilder();
+ builder.setSpout("SIP_ORIGIN_ALL_KafkaSpout", new SIP_ORIGIN_ALL_KafkaSpout(), RealtimeCountConfig.SPOUT_PARALLELISM);
+
+ builder.setBolt("FromSpoutBufferBoltDC", new FromSpoutBufferBoltDC(), RealtimeCountConfig.BUFFER_BOLT_PARALLELISM).shuffleGrouping("SIP_ORIGIN_ALL_KafkaSpout");
+
+ builder.setBolt("GetSipOriBoltDC", new GetSipOriBoltDC(), RealtimeCountConfig.FORMAT_BOLT_PARALLELISM).shuffleGrouping("FromSpoutBufferBoltDC");
+
+ if (RealtimeCountConfig.GROUP_STRATEGY == 0) {
+ builder.setBolt("SipInsertBoltDC", new SipInsertBoltDC(RealtimeCountConfig.TOPOLOGY_TICK_TUPLE_FREQ_SECS), RealtimeCountConfig.DATABASE_BOLT_PARALLELISM).fieldsGrouping("GetSipOriBoltDC", new Fields("jsonLog"));
+ } else {
+ builder.setBolt("SipInsertBoltDC", new SipInsertBoltDC(RealtimeCountConfig.TOPOLOGY_TICK_TUPLE_FREQ_SECS), RealtimeCountConfig.DATABASE_BOLT_PARALLELISM).shuffleGrouping("GetSipOriBoltDC");
+ }
+
+ builder.setBolt("SipRealTimeCountBoltDC", new SipRealTimeCountBoltDC(RealtimeCountConfig.TOPOLOGY_TICK_TUPLE_COUNT_FREQ_SECS), RealtimeCountConfig.COUNT_BOLT_PARALLELISM).shuffleGrouping("SipInsertBoltDC");
+
+ builder.setBolt("SipRealTimeMergeBoltDC", new SipRealTimeMergeBoltDC(RealtimeCountConfig.TOPOLOGY_TICK_TUPLE_MERGE_FREQ_SECS), RealtimeCountConfig.MERGE_BOLT_PARALLELISM).fieldsGrouping("SipRealTimeCountBoltDC", new Fields("countType"));
+ }
+
+ public static void main(String[] args) throws Exception {
+ LogRealtimeCountTopology csst = null;
+ boolean runLocally = true;
+ if (args.length >= 2 && args[1].equalsIgnoreCase("remote")) {
+ runLocally = false;
+ csst = new LogRealtimeCountTopology(args[0]);
+ } else {
+ csst = new LogRealtimeCountTopology();
+ }
+
+ csst.buildTopology();
+ RealtimeCountConfig.configShow();
+
+ if (runLocally) {
+ logger.info("执行本地模式...");
+ csst.runLocally();
+ } else {
+ logger.info("执行远程部署模式...");
+ csst.runRemotely();
+ }
+ }
+}