diff options
| author | 李玺康 <[email protected]> | 2020-11-12 09:45:25 +0800 |
|---|---|---|
| committer | 李玺康 <[email protected]> | 2020-11-12 09:45:25 +0800 |
| commit | 5f094ab7edaf040b7a99211503e29c5e6dfd13e6 (patch) | |
| tree | 8d335f6fe582a6dca2ec98674063f2eb59c8a2cf | |
| parent | 5260718fbaf5b72d06902be1fe378520c0875d4f (diff) | |
live chart 功能正常版本
| -rw-r--r-- | pom.xml | 2 | ||||
| -rw-r--r-- | properties/service_flow_config.properties | 6 | ||||
| -rw-r--r-- | src/main/java/cn/ac/iie/storm/bolt/AggregateBolt.java | 3 | ||||
| -rw-r--r-- | src/main/java/cn/ac/iie/storm/bolt/ParseKvBolt.java | 2 | ||||
| -rw-r--r-- | src/main/java/cn/ac/iie/storm/utils/combine/AggregateUtils.java | 4 |
5 files changed, 6 insertions, 11 deletions
@@ -125,7 +125,7 @@ <groupId>org.apache.storm</groupId> <artifactId>storm-core</artifactId> <version>${storm.version}</version> - <!--<scope>provided</scope>--> + <scope>provided</scope> <exclusions> <exclusion> <artifactId>slf4j-log4j12</artifactId> diff --git a/properties/service_flow_config.properties b/properties/service_flow_config.properties index 8394de4..85ef010 100644 --- a/properties/service_flow_config.properties +++ b/properties/service_flow_config.properties @@ -1,5 +1,5 @@ #管理kafka地址 -bootstrap.servers=192.168.44.12:9092 +bootstrap.servers=192.168.40.203:9092 #latest/earliest auto.offset.reset=latest @@ -11,10 +11,10 @@ kafka.compression.type=none kafka.topic=CONNECTION-RECORD-COMPLETED-LOG #读取topic,存储该spout id的消费offset信息,可通过该拓扑命名;具体存储offset的位置,确定下次读取不重复的数据; -group.id=test-20201026 +group.id=2020-11-11-2 #输出topic -results.bootstrap.servers=192.168.44.12:9092 +results.bootstrap.servers=192.168.40.203:9092 #输出topic results.output.topic=TRAFFIC-PROTOCOL-STAT-LOG diff --git a/src/main/java/cn/ac/iie/storm/bolt/AggregateBolt.java b/src/main/java/cn/ac/iie/storm/bolt/AggregateBolt.java index a17fbd5..6a81fad 100644 --- a/src/main/java/cn/ac/iie/storm/bolt/AggregateBolt.java +++ b/src/main/java/cn/ac/iie/storm/bolt/AggregateBolt.java @@ -67,16 +67,13 @@ public class AggregateBolt extends BaseBasicBolt { } else { String label = input.getStringByField("label"); - System.out.println("recv label=================="+label); //action中某个协议的所有function,如果没有就默认 String[] metrics = actionMap.getOrDefault(label, actionMap.get("Default")); String dimensions = input.getStringByField("dimensions"); - System.out.println("recv dimensions=============="+dimensions); String message = input.getStringByField("message"); - System.out.println("recv message==================="+message); //一条数据 JSONObject event = JSONObject.parseObject(message); //数据中的key值 (protocol,device_id,isp) diff --git a/src/main/java/cn/ac/iie/storm/bolt/ParseKvBolt.java b/src/main/java/cn/ac/iie/storm/bolt/ParseKvBolt.java index 2bb8062..9c59c99 100644 --- a/src/main/java/cn/ac/iie/storm/bolt/ParseKvBolt.java +++ b/src/main/java/cn/ac/iie/storm/bolt/ParseKvBolt.java @@ -138,7 +138,7 @@ public class ParseKvBolt extends BaseBasicBolt { private static void alignmentUtils(String parameters, JSONObject message, String name, String fieldName) { String[] alignmentPars = parameters.split(StreamAggregateConfig.FORMAT_SPLITTER); String data = message.getString(fieldName); - System.out.println("alignmentPars=" + Arrays.toString(alignmentPars) + "data=" + data); + int subscript = Integer.parseInt(alignmentPars[0]); String[] fieldSplit = data.split(alignmentPars[1]); Long appID = Long.valueOf(fieldSplit[subscript]); diff --git a/src/main/java/cn/ac/iie/storm/utils/combine/AggregateUtils.java b/src/main/java/cn/ac/iie/storm/utils/combine/AggregateUtils.java index d172f87..c270040 100644 --- a/src/main/java/cn/ac/iie/storm/utils/combine/AggregateUtils.java +++ b/src/main/java/cn/ac/iie/storm/utils/combine/AggregateUtils.java @@ -107,10 +107,8 @@ public class AggregateUtils { initStr = initStr + "/" + splitArr[splitArr.length - headIndex]; } dimesionsObj.put(name, initStr); - System.out.println("发送之前================================"); + collector.emit(new Values(splitArr[splitArr.length - headIndex], dimesionsObj.toString(), message.toString())); - System.out.println("send label ==============="+splitArr[splitArr.length - headIndex]); - System.out.println("send dimesion============="+dimesionsObj.toString()); reSend(headIndex + 1, splitArr, initStr, collector, message, dimesionsObj, name); |
