summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--src/main/java/cn/ac/iie/bolt/ConnCompletionBolt.java2
-rw-r--r--src/main/java/cn/ac/iie/bolt/radius/ConnCompletionBolt.java65
-rw-r--r--src/main/java/cn/ac/iie/topology/LogFlowWriteTopology.java10
-rw-r--r--src/main/java/cn/ac/iie/utils/hbase/HbaseUtils.java3
4 files changed, 75 insertions, 5 deletions
diff --git a/src/main/java/cn/ac/iie/bolt/ConnCompletionBolt.java b/src/main/java/cn/ac/iie/bolt/ConnCompletionBolt.java
index 186956e..a396e0c 100644
--- a/src/main/java/cn/ac/iie/bolt/ConnCompletionBolt.java
+++ b/src/main/java/cn/ac/iie/bolt/ConnCompletionBolt.java
@@ -36,7 +36,7 @@ public class ConnCompletionBolt extends BaseBasicBolt {
public void execute(Tuple tuple, BasicOutputCollector basicOutputCollector) {
try {
if (TupleUtils.isTick(tuple)) {
- //HbaseUtils.change();
+ HbaseUtils.change();
} else {
String message = tuple.getString(0);
if (StringUtil.isNotBlank(message)) {
diff --git a/src/main/java/cn/ac/iie/bolt/radius/ConnCompletionBolt.java b/src/main/java/cn/ac/iie/bolt/radius/ConnCompletionBolt.java
new file mode 100644
index 0000000..48cb1df
--- /dev/null
+++ b/src/main/java/cn/ac/iie/bolt/radius/ConnCompletionBolt.java
@@ -0,0 +1,65 @@
+package cn.ac.iie.bolt.radius;
+
+import cn.ac.iie.common.FlowWriteConfig;
+import cn.ac.iie.utils.system.TupleUtils;
+import com.zdjizhi.utils.StringUtil;
+import org.apache.log4j.Logger;
+import org.apache.storm.task.TopologyContext;
+import org.apache.storm.topology.BasicOutputCollector;
+import org.apache.storm.topology.OutputFieldsDeclarer;
+import org.apache.storm.topology.base.BaseBasicBolt;
+import org.apache.storm.tuple.Fields;
+import org.apache.storm.tuple.Tuple;
+import org.apache.storm.tuple.Values;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import static cn.ac.iie.utils.general.TransFormUtils.getJsonMessage;
+
+/**
+ * 通联关系日志补全
+ *
+ * @author qidaijie
+ */
+@SuppressWarnings("all")
+public class ConnCompletionBolt extends BaseBasicBolt {
+ private static final long serialVersionUID = -1059151670138465894L;
+ private final static Logger logger = Logger.getLogger(ConnCompletionBolt.class);
+
+ @Override
+ public void prepare(Map stormConf, TopologyContext context) {
+
+ }
+
+ @Override
+ public void execute(Tuple tuple, BasicOutputCollector basicOutputCollector) {
+ try {
+ if (TupleUtils.isTick(tuple)) {
+ //HbaseUtils.change();
+ } else {
+ String message = tuple.getString(0);
+ if (StringUtil.isNotBlank(message)) {
+ basicOutputCollector.emit(new Values(getJsonMessage(message)));
+ }
+ }
+
+ } catch (Exception e) {
+ logger.error("接收解析过程出现异常", e);
+ }
+ }
+
+ @Override
+ public Map<String, Object> getComponentConfiguration() {
+ Map<String, Object> conf = new HashMap<String, Object>(16);
+ conf.put(org.apache.storm.Config.TOPOLOGY_TICK_TUPLE_FREQ_SECS,
+ FlowWriteConfig.TOPOLOGY_TICK_TUPLE_FREQ_SECS);
+ return conf;
+ }
+
+ @Override
+ public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
+ outputFieldsDeclarer.declare(new Fields("connLog"));
+ }
+
+}
diff --git a/src/main/java/cn/ac/iie/topology/LogFlowWriteTopology.java b/src/main/java/cn/ac/iie/topology/LogFlowWriteTopology.java
index fdc04aa..966d4d6 100644
--- a/src/main/java/cn/ac/iie/topology/LogFlowWriteTopology.java
+++ b/src/main/java/cn/ac/iie/topology/LogFlowWriteTopology.java
@@ -1,8 +1,8 @@
package cn.ac.iie.topology;
-import cn.ac.iie.bolt.ConnCompletionBolt;
import cn.ac.iie.bolt.NtcLogSendBolt;
+import cn.ac.iie.bolt.radius.ConnCompletionBolt;
import cn.ac.iie.common.FlowWriteConfig;
import cn.ac.iie.spout.CustomizedKafkaSpout;
import org.apache.log4j.Logger;
@@ -57,7 +57,12 @@ public class LogFlowWriteTopology {
private void buildTopology() {
builder = new TopologyBuilder();
builder.setSpout("LogFlowWriteSpout", new CustomizedKafkaSpout(), FlowWriteConfig.SPOUT_PARALLELISM);
- builder.setBolt("ConnCompletionBolt", new ConnCompletionBolt(), FlowWriteConfig.DATACENTER_BOLT_PARALLELISM).localOrShuffleGrouping("LogFlowWriteSpout");
+ if(FlowWriteConfig.KAFKA_TOPIC.equals("RADIUS-LOG")){
+ builder.setBolt("ConnCompletionBolt", new ConnCompletionBolt(), FlowWriteConfig.DATACENTER_BOLT_PARALLELISM).localOrShuffleGrouping("LogFlowWriteSpout");
+ }else{
+ builder.setBolt("ConnCompletionBolt", new cn.ac.iie.bolt.ConnCompletionBolt(), FlowWriteConfig.DATACENTER_BOLT_PARALLELISM).localOrShuffleGrouping("LogFlowWriteSpout");
+
+ }
builder.setBolt("NtcLogSendBolt", new NtcLogSendBolt(), FlowWriteConfig.KAFKA_BOLT_PARALLELISM).localOrShuffleGrouping("ConnCompletionBolt");
// builder.setBolt("SummaryBolt", new SummaryBolt(), 1).localOrShuffleGrouping("NtcLogSendBolt");
}
@@ -83,5 +88,6 @@ public class LogFlowWriteTopology {
logger.info("执行远程部署模式...");
csst.runRemotely();
}
+
}
}
diff --git a/src/main/java/cn/ac/iie/utils/hbase/HbaseUtils.java b/src/main/java/cn/ac/iie/utils/hbase/HbaseUtils.java
index 0f15028..ee7a354 100644
--- a/src/main/java/cn/ac/iie/utils/hbase/HbaseUtils.java
+++ b/src/main/java/cn/ac/iie/utils/hbase/HbaseUtils.java
@@ -25,7 +25,7 @@ public class HbaseUtils {
private static Connection connection;
private static Long time;
- /* static {
+ static {
// 管理Hbase的配置信息
Configuration configuration = HBaseConfiguration.create();
// 设置zookeeper节点
@@ -39,7 +39,6 @@ public class HbaseUtils {
e.printStackTrace();
}
}
-*/
public static void change() {
Long nowTime = System.currentTimeMillis();
timestampsFilter(time, nowTime);