summary refs log tree commit diff
diff options
context:
space:
mode:
author qidaijie <[email protected]> 2021-05-11 17:12:44 +0800
committer qidaijie <[email protected]> 2021-05-11 17:12:44 +0800
commit 0c20098e33ed1117a2143b39a7716ca47157ea67 (patch)
tree de35b7987cce65eb6b8d0d1f8bea61c8cec05663
parent 928581d8600d03d6af11ea19a7bfc1bb0aa9d4ec (diff)
适配21.05功能 (HEAD, master)
1:取消app_id对准功能,直接使用app_label进行拼接。 2:取消协议层反转,正序进行计算。
-rw-r--r-- pom.xml | 44
-rw-r--r-- properties/service_flow_config.properties | 13
-rw-r--r-- src/main/java/com/zdjizhi/storm/bolt/ParseKvBolt.java | 50
-rw-r--r-- src/main/java/com/zdjizhi/storm/bolt/ResultSendBolt.java | 21
-rw-r--r-- src/main/java/com/zdjizhi/storm/utils/combine/AggregateUtils.java | 74
-rw-r--r-- src/main/java/com/zdjizhi/storm/utils/common/StreamAggregateConfig.java | 5
6 files changed, 82 insertions, 125 deletions
diff --git a/pom.xml b/pom.xml
index 5088c6c..a8cd9da 100644
--- a/pom.xml
+++ b/pom.xml
@@ -2,16 +2,14 @@
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
- <groupId>cn.ac.iie</groupId>
+ <groupId>com.zdjizhi</groupId>
<artifactId>storm-olap-aggregation</artifactId>
- <version>v3.21.03.16-eal4</version>
+ <version>v3.21.04.28-history-live</version>
<packaging>jar</packaging>
-
<name>storm-olap-aggregation</name>
<url>http://maven.apache.org</url>
-
<repositories>
<repository>
@@ -20,19 +18,6 @@
<url>http://192.168.40.125:8099/content/groups/public</url>
</repository>
- <repository>
- <id>maven-ali</id>
- <url>http://maven.aliyun.com/nexus/content/groups/public/</url>
- <releases>
- <enabled>true</enabled>
- </releases>
- <snapshots>
- <enabled>true</enabled>
- <updatePolicy>always</updatePolicy>
- <checksumPolicy>fail</checksumPolicy>
- </snapshots>
- </repository>
-
</repositories>
@@ -43,9 +28,12 @@
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-shade-plugin</artifactId>
<version>2.4.2</version>
+ <configuration>
+ <createDependencyReducedPom>false</createDependencyReducedPom>
+ </configuration>
<executions>
<execution>
- <phase>package</phase>
+ <phase>install</phase>
<goals>
<goal>shade</goal>
</goals>
@@ -78,7 +66,7 @@
<goals>
<goal>strip-jar</goal>
</goals>
- <phase>package</phase>
+ <phase>install</phase>
</execution>
</executions>
</plugin>
@@ -191,24 +179,6 @@
</exclusions>
</dependency>
- <!-- https://mvnrepository.com/artifact/org.apache.zookeeper/zookeeper -->
- <dependency>
- <groupId>org.apache.zookeeper</groupId>
- <artifactId>zookeeper</artifactId>
- <version>3.4.9</version>
- <exclusions>
- <exclusion>
- <artifactId>slf4j-log4j12</artifactId>
- <groupId>org.slf4j</groupId>
- </exclusion>
- <exclusion>
- <artifactId>log4j-over-slf4j</artifactId>
- <groupId>org.slf4j</groupId>
- </exclusion>
- </exclusions>
- </dependency>
-
-
<dependency>
<groupId>org.junit.jupiter</groupId>
<artifactId>junit-jupiter-api</artifactId>
diff --git a/properties/service_flow_config.properties b/properties/service_flow_config.properties
index 43e8ee0..151c329 100644
--- a/properties/service_flow_config.properties
+++ b/properties/service_flow_config.properties
@@ -51,17 +51,11 @@ gathering.bolt.parallelism=1
#写入kafka的并行度10
kafka.bolt.parallelism=1
-#kafka批量条数
-batch.insert.num=2000
-
#网关的schema位置
-schema.http=http://192.168.44.12:9999/metadata/schema/v1/fields/liveChart
+schema.http=http://192.168.44.12:9999/metadata/schema/v1/fields/liveChart_history
#网关的schema位置
-app.id.http=http://192.168.40.12:9999/open-api/appDicList
-
-#数据中心(UID)
-data.center.id.num=15
+app.id.http=http://192.168.44.67:9999/open-api/appDicList
#ack设置 1启动ack 0不启动ack
topology.num.acks=0
@@ -69,6 +63,3 @@ topology.num.acks=0
#spout接收睡眠时间
topology.spout.sleep.time=1
-#允许发送kafka最大失败数
-max.failure.num=20
-
diff --git a/src/main/java/com/zdjizhi/storm/bolt/ParseKvBolt.java b/src/main/java/com/zdjizhi/storm/bolt/ParseKvBolt.java
index 14011c1..484ff6f 100644
--- a/src/main/java/com/zdjizhi/storm/bolt/ParseKvBolt.java
+++ b/src/main/java/com/zdjizhi/storm/bolt/ParseKvBolt.java
@@ -1,6 +1,5 @@
package com.zdjizhi.storm.bolt;
-
import com.zdjizhi.storm.utils.common.StreamAggregateConfig;
import com.zdjizhi.storm.utils.exception.AggregationException;
import com.zdjizhi.storm.utils.http.HttpClientUtil;
@@ -18,12 +17,10 @@ import org.apache.storm.tuple.Tuple;
import org.apache.storm.tuple.Values;
import org.apache.storm.utils.TupleUtils;
-import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;
import static com.zdjizhi.storm.utils.combine.AggregateUtils.transDimensions;
-import static com.zdjizhi.storm.utils.combine.AggregateUtils.updateAppRelation;
/**
* @ClassNameMyWindowBolt
@@ -36,7 +33,7 @@ public class ParseKvBolt extends BaseBasicBolt {
private static final long serialVersionUID = -999382396035310355L;
private JSONArray transforms;
private JSONArray dimensions;
- private static HashMap<Long, String> appMap = new HashMap<>(32);
+// private static HashMap<Integer, String> appMap = new HashMap<>(32);
/**
@@ -55,7 +52,7 @@ public class ParseKvBolt extends BaseBasicBolt {
//TODO 获取dimensions
dimensions = JSONObject.parseArray(JSONObject.parseObject(data).getString("dimensions"));
- updateAppRelation(appMap);
+// updateAppRelation(appMap);
}
@@ -63,7 +60,7 @@ public class ParseKvBolt extends BaseBasicBolt {
public void execute(Tuple tuple, BasicOutputCollector collector) {
try {
if (TupleUtils.isTick(tuple)) {
- updateAppRelation(appMap);
+// updateAppRelation(appMap);
} else {
String source = tuple.getStringByField("source");
if (StringUtil.isNotBlank(source)) {
@@ -84,25 +81,25 @@ public class ParseKvBolt extends BaseBasicBolt {
String parameters = transformObj.getString("parameters");
switch (function) {
- case "alignment":
+ case "dismantling":
if (StringUtil.isNotBlank(parameters)) {
if (message.containsKey(fieldName)) {
- alignmentUtils(parameters, message, name, fieldName);
+ dismantlingUtils(parameters, message, name, fieldName);
}
}
break;
case "combination":
if (StringUtil.isNotBlank(parameters)) {
- String l7Proto = message.getString("common_l7_protocol");
- if (StringUtil.isNotBlank(l7Proto)) {
- String res = l7Proto + "/" + message.getString(fieldName);
- message.put(fieldName, res);
- dimensionsObj.put(name, res);
+ if (message.containsKey(fieldName)) {
+ combinationUtils(parameters, message, name, fieldName, dimensionsObj);
}
- combinationUtils(parameters, message, name, fieldName, dimensionsObj);
- collector.emit(new Values(dimensionsObj.getString(name), dimensionsObj.toString(), message.toString()));
}
break;
+ case "hierarchy":
+// if (StringUtil.isNotBlank(parameters)) {
+ collector.emit(new Values(dimensionsObj.getString(name), dimensionsObj.toString(), message.toString()));
+// }
+ break;
default:
break;
}
@@ -131,26 +128,24 @@ public class ParseKvBolt extends BaseBasicBolt {
/**
* alignment ID替换操作
+ * 根据缓存中的AppId对应信息,获取当前AppId对应的具体名称。
*
* @param parameters 参数集
* @param message 原始日志
* @param name 结果日志列名
* @param fieldName 原始日志列名
*/
- private static void alignmentUtils(String parameters, JSONObject message, String name, String fieldName) {
+ private static void dismantlingUtils(String parameters, JSONObject message, String name, String fieldName) {
String[] alignmentPars = parameters.split(StreamAggregateConfig.FORMAT_SPLITTER);
- String data = message.getString(fieldName);
- logger.warn("alignmentPars=" + Arrays.toString(alignmentPars) + "data=" + data);
- int subscript = Integer.parseInt(alignmentPars[0]);
- String[] fieldSplit = data.split(alignmentPars[1]);
- Long appId = Long.valueOf(fieldSplit[subscript]);
- int length = fieldSplit[subscript].length();
- StringBuilder sb = new StringBuilder(data);
- message.put(name, sb.replace(0, length, appMap.get(appId)));
+ String commonAppId = message.getString(fieldName);
+ int digits = Integer.parseInt(alignmentPars[0]);
+ String appName = commonAppId.split(StreamAggregateConfig.PROTOCOL_SPLITTER)[digits];
+ message.put(name, appName);
}
/**
* combination 拼接操作
+ * 获取方法函数中 parameters 字段,结构 "parameters": "abc,/" ;abc为要拼接字段 /为拼接的分隔符
*
* @param parameters 参数集
* @param message 原始日志
@@ -160,9 +155,10 @@ public class ParseKvBolt extends BaseBasicBolt {
*/
private static void combinationUtils(String parameters, JSONObject message, String name, String fieldName, JSONObject dimensionsObj) {
String[] combinationPars = parameters.split(StreamAggregateConfig.FORMAT_SPLITTER);
- String parameter0Value = message.getString(combinationPars[0]);
- if (StringUtil.isNotBlank(parameter0Value)) {
- String combinationValue = parameter0Value + combinationPars[1] + message.getString(fieldName);
+ String combinationField = message.getString(combinationPars[0]);
+ String splitter = combinationPars[1];
+ if (StringUtil.isNotBlank(combinationField)) {
+ String combinationValue = message.getString(fieldName) + splitter + combinationField;
message.put(fieldName, combinationValue);
dimensionsObj.put(name, combinationValue);
}
diff --git a/src/main/java/com/zdjizhi/storm/bolt/ResultSendBolt.java b/src/main/java/com/zdjizhi/storm/bolt/ResultSendBolt.java
index 2925188..54b7bac 100644
--- a/src/main/java/com/zdjizhi/storm/bolt/ResultSendBolt.java
+++ b/src/main/java/com/zdjizhi/storm/bolt/ResultSendBolt.java
@@ -1,7 +1,9 @@
package com.zdjizhi.storm.bolt;
+import com.zdjizhi.storm.utils.combine.AggregateUtils;
import com.zdjizhi.storm.utils.exception.AggregationException;
+import com.zdjizhi.storm.utils.http.HttpClientUtil;
import com.zdjizhi.storm.utils.kafka.LogSendKafka;
import com.zdjizhi.storm.utils.common.StreamAggregateConfig;
import cn.hutool.log.Log;
@@ -26,10 +28,15 @@ public class ResultSendBolt extends BaseBasicBolt {
private static final long serialVersionUID = 3237813470939823159L;
private static final Log logger = LogFactory.get();
private LogSendKafka logSendKafka;
+ private static String regex;
+ private static String name;
@Override
public void prepare(Map stormConf, TopologyContext context) {
+ String schema = HttpClientUtil.requestByGetMethod(StreamAggregateConfig.SCHEMA_HTTP);
logSendKafka = LogSendKafka.getInstance();
+ regex = AggregateUtils.getHierarchy(schema, "parameters");
+ name = AggregateUtils.getHierarchy(schema, "name");
}
@Override
@@ -39,21 +46,21 @@ public class ResultSendBolt extends BaseBasicBolt {
if (StringUtil.isNotBlank(message)) {
JSONObject jsonObject = JSONObject.parseObject(message);
StringBuffer stringBuffer = new StringBuffer();
- String[] protocolIds = jsonObject.getString("protocol_id").split("/");
- for (int i = (protocolIds.length - 1); i >= 0; i--) {
+ String[] protocolIds = jsonObject.getString(name).split(StreamAggregateConfig.PROTOCOL_SPLITTER);
+ for (String proto : protocolIds) {
if (StringUtil.isBlank(stringBuffer.toString())) {
- stringBuffer.append(protocolIds[i]);
- jsonObject.put("protocol_id", stringBuffer.toString());
+ stringBuffer.append(proto);
+ jsonObject.put(name, stringBuffer.toString());
logSendKafka.sendMessage(jsonObject.toString());
} else {
- stringBuffer.append("/").append(protocolIds[i]);
- jsonObject.put("protocol_id", stringBuffer.toString());
+ stringBuffer.append(regex).append(proto);
+ jsonObject.put(name, stringBuffer.toString());
logSendKafka.sendMessage(jsonObject.toString());
}
}
}
} catch (AggregationException e) {
- logger.error(StreamAggregateConfig.RESULTS_OUTPUT_TOPIC + "日志发送Kafka过程出现异常,异常信息:" + e);
+ logger.error("日志发送Kafka过程出现异常,异常信息:" + e);
}
}
diff --git a/src/main/java/com/zdjizhi/storm/utils/combine/AggregateUtils.java b/src/main/java/com/zdjizhi/storm/utils/combine/AggregateUtils.java
index 22cd06b..4137eca 100644
--- a/src/main/java/com/zdjizhi/storm/utils/combine/AggregateUtils.java
+++ b/src/main/java/com/zdjizhi/storm/utils/combine/AggregateUtils.java
@@ -36,16 +36,16 @@ public class AggregateUtils {
return Long.parseLong(value1) + Long.parseLong(value2);
}
-// /**
-// * 计算Count
-// *
-// * @param count 当前count值
-// * @return count+1
-// */
-// public static Long count(Long count) {
-//
-// return count++;
-// }
+ /**
+ * 计算Count
+ *
+ * @param count 当前count值
+ * @return count+1
+ */
+ public static Long count(Long count) {
+
+ return count++;
+ }
/**
@@ -72,34 +72,6 @@ public class AggregateUtils {
}
-// /**
-// * 递归发送tuple
-// *
-// * @param headIndex ssss
-// * @param splitArr
-// * @param initStr
-// * @param collector
-// * @param message
-// * @param dimesionsObj
-// * @param name
-// */
-// public static void reSend(int headIndex, String[] splitArr, String initStr, BasicOutputCollector collector, JSONObject message, JSONObject dimesionsObj, String name) {
-// //递归拼接字符串
-// if (splitArr.length != headIndex - 1) {
-// //递归的核心代码
-// if ("".equals(initStr)) {
-// initStr = splitArr[splitArr.length - headIndex];
-// } else {
-// initStr = initStr + "/" + splitArr[splitArr.length - headIndex];
-// }
-// dimesionsObj.put(name, initStr);
-//
-// collector.emit(new Values(splitArr[splitArr.length - headIndex], dimesionsObj.toString(), message.toString()));
-// reSend(headIndex + 1, splitArr, initStr, collector, message, dimesionsObj, name);
-// }
-// }
-
-
/**
* 获取action模块的Map集合
*
@@ -123,6 +95,27 @@ public class AggregateUtils {
return map;
}
+ /**
+ * 获取分隔符
+ *
+ * @param schema 动态获取的schema
+ * @return (HTTP,metrics数组)
+ */
+ public static String getHierarchy(String schema, String key) {
+ String result = "";
+ JSONObject jsonObject = JSONObject.parseObject(schema);
+ String data = jsonObject.getString("data");
+ JSONArray actions = JSONObject.parseArray(JSONObject.parseObject(JSONObject.parseObject(data).getString("doc")).getString("transforms"));
+ for (Object transform : actions) {
+ JSONObject transformObj = JSONObject.parseObject(transform.toString());
+ String function = transformObj.getString("function");
+ if ("hierarchy".equals(function)) {
+ result = transformObj.getString(key);
+ }
+ }
+ return result;
+ }
+
/**
* 获取时间列的集合
@@ -145,7 +138,7 @@ public class AggregateUtils {
*
* @param hashMap 当前缓存对应关系map
*/
- public static void updateAppRelation(HashMap<Long, String> hashMap) {
+ public static void updateAppRelation(HashMap<Integer, String> hashMap) {
try {
Long begin = System.currentTimeMillis();
String schema = HttpClientUtil.requestByGetMethod(StreamAggregateConfig.APP_ID_HTTP);
@@ -154,7 +147,7 @@ public class AggregateUtils {
JSONArray objects = JSONArray.parseArray(data);
for (Object object : objects) {
JSONArray jsonArray = JSONArray.parseArray(object.toString());
- Long key = jsonArray.getLong(0);
+ int key = jsonArray.getInteger(0);
String value = jsonArray.getString(1);
if (hashMap.containsKey(key)) {
if (!value.equals(hashMap.get(key))) {
@@ -188,4 +181,5 @@ public class AggregateUtils {
}
return dimensionsObj;
}
+
}
diff --git a/src/main/java/com/zdjizhi/storm/utils/common/StreamAggregateConfig.java b/src/main/java/com/zdjizhi/storm/utils/common/StreamAggregateConfig.java
index 32e059b..d157ed0 100644
--- a/src/main/java/com/zdjizhi/storm/utils/common/StreamAggregateConfig.java
+++ b/src/main/java/com/zdjizhi/storm/utils/common/StreamAggregateConfig.java
@@ -8,6 +8,8 @@ public class StreamAggregateConfig {
public static final String FORMAT_SPLITTER = ",";
public static final String MODEL = "remote";
+ public static final String PROTOCOL_SPLITTER = "\\.";
+
/**
* System
*/
@@ -19,9 +21,6 @@ public class StreamAggregateConfig {
public static final Integer KAFKA_BOLT_PARALLELISM = StreamAggregateConfigurations.getIntProperty(0, "kafka.bolt.parallelism");
public static final Integer TOPOLOGY_NUM_ACKS = StreamAggregateConfigurations.getIntProperty(0, "topology.num.acks");
public static final Integer TOPOLOGY_SPOUT_SLEEP_TIME = StreamAggregateConfigurations.getIntProperty(0, "topology.spout.sleep.time");
- public static final Integer BATCH_INSERT_NUM = StreamAggregateConfigurations.getIntProperty(0, "batch.insert.num");
- public static final Integer DATA_CENTER_ID_NUM = StreamAggregateConfigurations.getIntProperty(0, "data.center.id.num");
- public static final Integer MAX_FAILURE_NUM = StreamAggregateConfigurations.getIntProperty(0, "max.failure.num");
public static final Integer FIRST_AGG_TIME = StreamAggregateConfigurations.getIntProperty(0, "first.agg.time");