summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
author李玺康 <[email protected]>2020-11-12 09:45:25 +0800
committer李玺康 <[email protected]>2020-11-12 09:45:25 +0800
commit5f094ab7edaf040b7a99211503e29c5e6dfd13e6 (patch)
tree8d335f6fe582a6dca2ec98674063f2eb59c8a2cf
parent5260718fbaf5b72d06902be1fe378520c0875d4f (diff)
live chart 功能正常版本
-rw-r--r--pom.xml2
-rw-r--r--properties/service_flow_config.properties6
-rw-r--r--src/main/java/cn/ac/iie/storm/bolt/AggregateBolt.java3
-rw-r--r--src/main/java/cn/ac/iie/storm/bolt/ParseKvBolt.java2
-rw-r--r--src/main/java/cn/ac/iie/storm/utils/combine/AggregateUtils.java4
5 files changed, 6 insertions, 11 deletions
diff --git a/pom.xml b/pom.xml
index 4ab0fc3..dbf8f94 100644
--- a/pom.xml
+++ b/pom.xml
@@ -125,7 +125,7 @@
<groupId>org.apache.storm</groupId>
<artifactId>storm-core</artifactId>
<version>${storm.version}</version>
- <!--<scope>provided</scope>-->
+ <scope>provided</scope>
<exclusions>
<exclusion>
<artifactId>slf4j-log4j12</artifactId>
diff --git a/properties/service_flow_config.properties b/properties/service_flow_config.properties
index 8394de4..85ef010 100644
--- a/properties/service_flow_config.properties
+++ b/properties/service_flow_config.properties
@@ -1,5 +1,5 @@
#管理kafka地址
-bootstrap.servers=192.168.44.12:9092
+bootstrap.servers=192.168.40.203:9092
#latest/earliest
auto.offset.reset=latest
@@ -11,10 +11,10 @@ kafka.compression.type=none
kafka.topic=CONNECTION-RECORD-COMPLETED-LOG
#读取topic,存储该spout id的消费offset信息,可通过该拓扑命名;具体存储offset的位置,确定下次读取不重复的数据;
-group.id=test-20201026
+group.id=2020-11-11-2
#输出topic
-results.bootstrap.servers=192.168.44.12:9092
+results.bootstrap.servers=192.168.40.203:9092
#输出topic
results.output.topic=TRAFFIC-PROTOCOL-STAT-LOG
diff --git a/src/main/java/cn/ac/iie/storm/bolt/AggregateBolt.java b/src/main/java/cn/ac/iie/storm/bolt/AggregateBolt.java
index a17fbd5..6a81fad 100644
--- a/src/main/java/cn/ac/iie/storm/bolt/AggregateBolt.java
+++ b/src/main/java/cn/ac/iie/storm/bolt/AggregateBolt.java
@@ -67,16 +67,13 @@ public class AggregateBolt extends BaseBasicBolt {
} else {
String label = input.getStringByField("label");
- System.out.println("recv label=================="+label);
//action中某个协议的所有function,如果没有就默认
String[] metrics = actionMap.getOrDefault(label, actionMap.get("Default"));
String dimensions = input.getStringByField("dimensions");
- System.out.println("recv dimensions=============="+dimensions);
String message = input.getStringByField("message");
- System.out.println("recv message==================="+message);
//一条数据
JSONObject event = JSONObject.parseObject(message);
//数据中的key值 (protocol,device_id,isp)
diff --git a/src/main/java/cn/ac/iie/storm/bolt/ParseKvBolt.java b/src/main/java/cn/ac/iie/storm/bolt/ParseKvBolt.java
index 2bb8062..9c59c99 100644
--- a/src/main/java/cn/ac/iie/storm/bolt/ParseKvBolt.java
+++ b/src/main/java/cn/ac/iie/storm/bolt/ParseKvBolt.java
@@ -138,7 +138,7 @@ public class ParseKvBolt extends BaseBasicBolt {
private static void alignmentUtils(String parameters, JSONObject message, String name, String fieldName) {
String[] alignmentPars = parameters.split(StreamAggregateConfig.FORMAT_SPLITTER);
String data = message.getString(fieldName);
- System.out.println("alignmentPars=" + Arrays.toString(alignmentPars) + "data=" + data);
+
int subscript = Integer.parseInt(alignmentPars[0]);
String[] fieldSplit = data.split(alignmentPars[1]);
Long appID = Long.valueOf(fieldSplit[subscript]);
diff --git a/src/main/java/cn/ac/iie/storm/utils/combine/AggregateUtils.java b/src/main/java/cn/ac/iie/storm/utils/combine/AggregateUtils.java
index d172f87..c270040 100644
--- a/src/main/java/cn/ac/iie/storm/utils/combine/AggregateUtils.java
+++ b/src/main/java/cn/ac/iie/storm/utils/combine/AggregateUtils.java
@@ -107,10 +107,8 @@ public class AggregateUtils {
initStr = initStr + "/" + splitArr[splitArr.length - headIndex];
}
dimesionsObj.put(name, initStr);
- System.out.println("发送之前================================");
+
collector.emit(new Values(splitArr[splitArr.length - headIndex], dimesionsObj.toString(), message.toString()));
- System.out.println("send label ==============="+splitArr[splitArr.length - headIndex]);
- System.out.println("send dimesion============="+dimesionsObj.toString());
reSend(headIndex + 1, splitArr, initStr, collector, message, dimesionsObj, name);