diff options
4 files changed, 75 insertions, 5 deletions
diff --git a/src/main/java/cn/ac/iie/bolt/ConnCompletionBolt.java b/src/main/java/cn/ac/iie/bolt/ConnCompletionBolt.java index 186956e..a396e0c 100644 --- a/src/main/java/cn/ac/iie/bolt/ConnCompletionBolt.java +++ b/src/main/java/cn/ac/iie/bolt/ConnCompletionBolt.java @@ -36,7 +36,7 @@ public class ConnCompletionBolt extends BaseBasicBolt { public void execute(Tuple tuple, BasicOutputCollector basicOutputCollector) { try { if (TupleUtils.isTick(tuple)) { - //HbaseUtils.change(); + HbaseUtils.change(); } else { String message = tuple.getString(0); if (StringUtil.isNotBlank(message)) { diff --git a/src/main/java/cn/ac/iie/bolt/radius/ConnCompletionBolt.java b/src/main/java/cn/ac/iie/bolt/radius/ConnCompletionBolt.java new file mode 100644 index 0000000..48cb1df --- /dev/null +++ b/src/main/java/cn/ac/iie/bolt/radius/ConnCompletionBolt.java @@ -0,0 +1,65 @@ +package cn.ac.iie.bolt.radius; + +import cn.ac.iie.common.FlowWriteConfig; +import cn.ac.iie.utils.system.TupleUtils; +import com.zdjizhi.utils.StringUtil; +import org.apache.log4j.Logger; +import org.apache.storm.task.TopologyContext; +import org.apache.storm.topology.BasicOutputCollector; +import org.apache.storm.topology.OutputFieldsDeclarer; +import org.apache.storm.topology.base.BaseBasicBolt; +import org.apache.storm.tuple.Fields; +import org.apache.storm.tuple.Tuple; +import org.apache.storm.tuple.Values; + +import java.util.HashMap; +import java.util.Map; + +import static cn.ac.iie.utils.general.TransFormUtils.getJsonMessage; + +/** + * 通联关系日志补全 + * + * @author qidaijie + */ +@SuppressWarnings("all") +public class ConnCompletionBolt extends BaseBasicBolt { + private static final long serialVersionUID = -1059151670138465894L; + private final static Logger logger = Logger.getLogger(ConnCompletionBolt.class); + + @Override + public void prepare(Map stormConf, TopologyContext context) { + + } + + @Override + public void execute(Tuple tuple, BasicOutputCollector basicOutputCollector) { + try { + if (TupleUtils.isTick(tuple)) { + //HbaseUtils.change(); + } else { + String message = tuple.getString(0); + if (StringUtil.isNotBlank(message)) { + basicOutputCollector.emit(new Values(getJsonMessage(message))); + } + } + + } catch (Exception e) { + logger.error("接收解析过程出现异常", e); + } + } + + @Override + public Map<String, Object> getComponentConfiguration() { + Map<String, Object> conf = new HashMap<String, Object>(16); + conf.put(org.apache.storm.Config.TOPOLOGY_TICK_TUPLE_FREQ_SECS, + FlowWriteConfig.TOPOLOGY_TICK_TUPLE_FREQ_SECS); + return conf; + } + + @Override + public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) { + outputFieldsDeclarer.declare(new Fields("connLog")); + } + +} diff --git a/src/main/java/cn/ac/iie/topology/LogFlowWriteTopology.java b/src/main/java/cn/ac/iie/topology/LogFlowWriteTopology.java index fdc04aa..966d4d6 100644 --- a/src/main/java/cn/ac/iie/topology/LogFlowWriteTopology.java +++ b/src/main/java/cn/ac/iie/topology/LogFlowWriteTopology.java @@ -1,8 +1,8 @@ package cn.ac.iie.topology; -import cn.ac.iie.bolt.ConnCompletionBolt; import cn.ac.iie.bolt.NtcLogSendBolt; +import cn.ac.iie.bolt.radius.ConnCompletionBolt; import cn.ac.iie.common.FlowWriteConfig; import cn.ac.iie.spout.CustomizedKafkaSpout; import org.apache.log4j.Logger; @@ -57,7 +57,12 @@ public class LogFlowWriteTopology { private void buildTopology() { builder = new TopologyBuilder(); builder.setSpout("LogFlowWriteSpout", new CustomizedKafkaSpout(), FlowWriteConfig.SPOUT_PARALLELISM); - builder.setBolt("ConnCompletionBolt", new ConnCompletionBolt(), FlowWriteConfig.DATACENTER_BOLT_PARALLELISM).localOrShuffleGrouping("LogFlowWriteSpout"); + if(FlowWriteConfig.KAFKA_TOPIC.equals("RADIUS-LOG")){ + builder.setBolt("ConnCompletionBolt", new ConnCompletionBolt(), FlowWriteConfig.DATACENTER_BOLT_PARALLELISM).localOrShuffleGrouping("LogFlowWriteSpout"); + }else{ + builder.setBolt("ConnCompletionBolt", new cn.ac.iie.bolt.ConnCompletionBolt(), FlowWriteConfig.DATACENTER_BOLT_PARALLELISM).localOrShuffleGrouping("LogFlowWriteSpout"); + + } builder.setBolt("NtcLogSendBolt", new NtcLogSendBolt(), FlowWriteConfig.KAFKA_BOLT_PARALLELISM).localOrShuffleGrouping("ConnCompletionBolt"); // builder.setBolt("SummaryBolt", new SummaryBolt(), 1).localOrShuffleGrouping("NtcLogSendBolt"); } @@ -83,5 +88,6 @@ public class LogFlowWriteTopology { logger.info("执行远程部署模式..."); csst.runRemotely(); } + } } diff --git a/src/main/java/cn/ac/iie/utils/hbase/HbaseUtils.java b/src/main/java/cn/ac/iie/utils/hbase/HbaseUtils.java index 0f15028..ee7a354 100644 --- a/src/main/java/cn/ac/iie/utils/hbase/HbaseUtils.java +++ b/src/main/java/cn/ac/iie/utils/hbase/HbaseUtils.java @@ -25,7 +25,7 @@ public class HbaseUtils { private static Connection connection; private static Long time; - /* static { + static { // 管理Hbase的配置信息 Configuration configuration = HBaseConfiguration.create(); // 设置zookeeper节点 @@ -39,7 +39,6 @@ public class HbaseUtils { e.printStackTrace(); } } -*/ public static void change() { Long nowTime = System.currentTimeMillis(); timestampsFilter(time, nowTime); |
