diff options
| author | qidaijie <[email protected]> | 2021-05-11 17:12:44 +0800 |
|---|---|---|
| committer | qidaijie <[email protected]> | 2021-05-11 17:12:44 +0800 |
| commit | 0c20098e33ed1117a2143b39a7716ca47157ea67 (patch) | |
| tree | de35b7987cce65eb6b8d0d1f8bea61c8cec05663 | |
| parent | 928581d8600d03d6af11ea19a7bfc1bb0aa9d4ec (diff) | |
1:取消app_id对准功能,直接使用app_label进行拼接。
2:取消协议层反转,正序进行计算。
| -rw-r--r-- | pom.xml | 44 | ||||
| -rw-r--r-- | properties/service_flow_config.properties | 13 | ||||
| -rw-r--r-- | src/main/java/com/zdjizhi/storm/bolt/ParseKvBolt.java | 50 | ||||
| -rw-r--r-- | src/main/java/com/zdjizhi/storm/bolt/ResultSendBolt.java | 21 | ||||
| -rw-r--r-- | src/main/java/com/zdjizhi/storm/utils/combine/AggregateUtils.java | 74 | ||||
| -rw-r--r-- | src/main/java/com/zdjizhi/storm/utils/common/StreamAggregateConfig.java | 5 |
6 files changed, 82 insertions, 125 deletions
@@ -2,16 +2,14 @@ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> <modelVersion>4.0.0</modelVersion> - <groupId>cn.ac.iie</groupId> + <groupId>com.zdjizhi</groupId> <artifactId>storm-olap-aggregation</artifactId> - <version>v3.21.03.16-eal4</version> + <version>v3.21.04.28-history-live</version> <packaging>jar</packaging> - <name>storm-olap-aggregation</name> <url>http://maven.apache.org</url> - <repositories> <repository> @@ -20,19 +18,6 @@ <url>http://192.168.40.125:8099/content/groups/public</url> </repository> - <repository> - <id>maven-ali</id> - <url>http://maven.aliyun.com/nexus/content/groups/public/</url> - <releases> - <enabled>true</enabled> - </releases> - <snapshots> - <enabled>true</enabled> - <updatePolicy>always</updatePolicy> - <checksumPolicy>fail</checksumPolicy> - </snapshots> - </repository> - </repositories> @@ -43,9 +28,12 @@ <groupId>org.apache.maven.plugins</groupId> <artifactId>maven-shade-plugin</artifactId> <version>2.4.2</version> + <configuration> + <createDependencyReducedPom>false</createDependencyReducedPom> + </configuration> <executions> <execution> - <phase>package</phase> + <phase>install</phase> <goals> <goal>shade</goal> </goals> @@ -78,7 +66,7 @@ <goals> <goal>strip-jar</goal> </goals> - <phase>package</phase> + <phase>install</phase> </execution> </executions> </plugin> @@ -191,24 +179,6 @@ </exclusions> </dependency> - <!-- https://mvnrepository.com/artifact/org.apache.zookeeper/zookeeper --> - <dependency> - <groupId>org.apache.zookeeper</groupId> - <artifactId>zookeeper</artifactId> - <version>3.4.9</version> - <exclusions> - <exclusion> - <artifactId>slf4j-log4j12</artifactId> - <groupId>org.slf4j</groupId> - </exclusion> - <exclusion> - <artifactId>log4j-over-slf4j</artifactId> - <groupId>org.slf4j</groupId> - </exclusion> - </exclusions> - </dependency> - - <dependency> <groupId>org.junit.jupiter</groupId> <artifactId>junit-jupiter-api</artifactId> diff --git a/properties/service_flow_config.properties b/properties/service_flow_config.properties index 43e8ee0..151c329 100644 --- a/properties/service_flow_config.properties +++ b/properties/service_flow_config.properties @@ -51,17 +51,11 @@ gathering.bolt.parallelism=1 #写入kafka的并行度10 kafka.bolt.parallelism=1 -#kafka批量条数 -batch.insert.num=2000 - #网关的schema位置 -schema.http=http://192.168.44.12:9999/metadata/schema/v1/fields/liveChart +schema.http=http://192.168.44.12:9999/metadata/schema/v1/fields/liveChart_history #网关的schema位置 -app.id.http=http://192.168.40.12:9999/open-api/appDicList - -#数据中心(UID) -data.center.id.num=15 +app.id.http=http://192.168.44.67:9999/open-api/appDicList #ack设置 1启动ack 0不启动ack topology.num.acks=0 @@ -69,6 +63,3 @@ topology.num.acks=0 #spout接收睡眠时间 topology.spout.sleep.time=1 -#允许发送kafka最大失败数 -max.failure.num=20 - diff --git a/src/main/java/com/zdjizhi/storm/bolt/ParseKvBolt.java b/src/main/java/com/zdjizhi/storm/bolt/ParseKvBolt.java index 14011c1..484ff6f 100644 --- a/src/main/java/com/zdjizhi/storm/bolt/ParseKvBolt.java +++ b/src/main/java/com/zdjizhi/storm/bolt/ParseKvBolt.java @@ -1,6 +1,5 @@ package com.zdjizhi.storm.bolt; - import com.zdjizhi.storm.utils.common.StreamAggregateConfig; import com.zdjizhi.storm.utils.exception.AggregationException; import com.zdjizhi.storm.utils.http.HttpClientUtil; @@ -18,12 +17,10 @@ import org.apache.storm.tuple.Tuple; import org.apache.storm.tuple.Values; import org.apache.storm.utils.TupleUtils; -import java.util.Arrays; import java.util.HashMap; import java.util.Map; import static com.zdjizhi.storm.utils.combine.AggregateUtils.transDimensions; -import static com.zdjizhi.storm.utils.combine.AggregateUtils.updateAppRelation; /** * @ClassNameMyWindowBolt @@ -36,7 +33,7 @@ public class ParseKvBolt extends BaseBasicBolt { private static final long serialVersionUID = -999382396035310355L; private JSONArray transforms; private JSONArray dimensions; - private static HashMap<Long, String> appMap = new HashMap<>(32); +// private static HashMap<Integer, String> appMap = new HashMap<>(32); /** @@ -55,7 +52,7 @@ public class ParseKvBolt extends BaseBasicBolt { //TODO 获取dimensions dimensions = JSONObject.parseArray(JSONObject.parseObject(data).getString("dimensions")); - updateAppRelation(appMap); +// updateAppRelation(appMap); } @@ -63,7 +60,7 @@ public class ParseKvBolt extends BaseBasicBolt { public void execute(Tuple tuple, BasicOutputCollector collector) { try { if (TupleUtils.isTick(tuple)) { - updateAppRelation(appMap); +// updateAppRelation(appMap); } else { String source = tuple.getStringByField("source"); if (StringUtil.isNotBlank(source)) { @@ -84,25 +81,25 @@ public class ParseKvBolt extends BaseBasicBolt { String parameters = transformObj.getString("parameters"); switch (function) { - case "alignment": + case "dismantling": if (StringUtil.isNotBlank(parameters)) { if (message.containsKey(fieldName)) { - alignmentUtils(parameters, message, name, fieldName); + dismantlingUtils(parameters, message, name, fieldName); } } break; case "combination": if (StringUtil.isNotBlank(parameters)) { - String l7Proto = message.getString("common_l7_protocol"); - if (StringUtil.isNotBlank(l7Proto)) { - String res = l7Proto + "/" + message.getString(fieldName); - message.put(fieldName, res); - dimensionsObj.put(name, res); + if (message.containsKey(fieldName)) { + combinationUtils(parameters, message, name, fieldName, dimensionsObj); } - combinationUtils(parameters, message, name, fieldName, dimensionsObj); - collector.emit(new Values(dimensionsObj.getString(name), dimensionsObj.toString(), message.toString())); } break; + case "hierarchy": +// if (StringUtil.isNotBlank(parameters)) { + collector.emit(new Values(dimensionsObj.getString(name), dimensionsObj.toString(), message.toString())); +// } + break; default: break; } @@ -131,26 +128,24 @@ public class ParseKvBolt extends BaseBasicBolt { /** * alignment ID替换操作 + * 根据缓存中的AppId对应信息,获取当前AppId对应的具体名称。 * * @param parameters 参数集 * @param message 原始日志 * @param name 结果日志列名 * @param fieldName 原始日志列名 */ - private static void alignmentUtils(String parameters, JSONObject message, String name, String fieldName) { + private static void dismantlingUtils(String parameters, JSONObject message, String name, String fieldName) { String[] alignmentPars = parameters.split(StreamAggregateConfig.FORMAT_SPLITTER); - String data = message.getString(fieldName); - logger.warn("alignmentPars=" + Arrays.toString(alignmentPars) + "data=" + data); - int subscript = Integer.parseInt(alignmentPars[0]); - String[] fieldSplit = data.split(alignmentPars[1]); - Long appId = Long.valueOf(fieldSplit[subscript]); - int length = fieldSplit[subscript].length(); - StringBuilder sb = new StringBuilder(data); - message.put(name, sb.replace(0, length, appMap.get(appId))); + String commonAppId = message.getString(fieldName); + int digits = Integer.parseInt(alignmentPars[0]); + String appName = commonAppId.split(StreamAggregateConfig.PROTOCOL_SPLITTER)[digits]; + message.put(name, appName); } /** * combination 拼接操作 + * 获取方法函数中 parameters 字段,结构 "parameters": "abc,/" ;abc为要拼接字段 /为拼接的分隔符 * * @param parameters 参数集 * @param message 原始日志 @@ -160,9 +155,10 @@ public class ParseKvBolt extends BaseBasicBolt { */ private static void combinationUtils(String parameters, JSONObject message, String name, String fieldName, JSONObject dimensionsObj) { String[] combinationPars = parameters.split(StreamAggregateConfig.FORMAT_SPLITTER); - String parameter0Value = message.getString(combinationPars[0]); - if (StringUtil.isNotBlank(parameter0Value)) { - String combinationValue = parameter0Value + combinationPars[1] + message.getString(fieldName); + String combinationField = message.getString(combinationPars[0]); + String splitter = combinationPars[1]; + if (StringUtil.isNotBlank(combinationField)) { + String combinationValue = message.getString(fieldName) + splitter + combinationField; message.put(fieldName, combinationValue); dimensionsObj.put(name, combinationValue); } diff --git a/src/main/java/com/zdjizhi/storm/bolt/ResultSendBolt.java b/src/main/java/com/zdjizhi/storm/bolt/ResultSendBolt.java index 2925188..54b7bac 100644 --- a/src/main/java/com/zdjizhi/storm/bolt/ResultSendBolt.java +++ b/src/main/java/com/zdjizhi/storm/bolt/ResultSendBolt.java @@ -1,7 +1,9 @@ package com.zdjizhi.storm.bolt; +import com.zdjizhi.storm.utils.combine.AggregateUtils; import com.zdjizhi.storm.utils.exception.AggregationException; +import com.zdjizhi.storm.utils.http.HttpClientUtil; import com.zdjizhi.storm.utils.kafka.LogSendKafka; import com.zdjizhi.storm.utils.common.StreamAggregateConfig; import cn.hutool.log.Log; @@ -26,10 +28,15 @@ public class ResultSendBolt extends BaseBasicBolt { private static final long serialVersionUID = 3237813470939823159L; private static final Log logger = LogFactory.get(); private LogSendKafka logSendKafka; + private static String regex; + private static String name; @Override public void prepare(Map stormConf, TopologyContext context) { + String schema = HttpClientUtil.requestByGetMethod(StreamAggregateConfig.SCHEMA_HTTP); logSendKafka = LogSendKafka.getInstance(); + regex = AggregateUtils.getHierarchy(schema, "parameters"); + name = AggregateUtils.getHierarchy(schema, "name"); } @Override @@ -39,21 +46,21 @@ public class ResultSendBolt extends BaseBasicBolt { if (StringUtil.isNotBlank(message)) { JSONObject jsonObject = JSONObject.parseObject(message); StringBuffer stringBuffer = new StringBuffer(); - String[] protocolIds = jsonObject.getString("protocol_id").split("/"); - for (int i = (protocolIds.length - 1); i >= 0; i--) { + String[] protocolIds = jsonObject.getString(name).split(StreamAggregateConfig.PROTOCOL_SPLITTER); + for (String proto : protocolIds) { if (StringUtil.isBlank(stringBuffer.toString())) { - stringBuffer.append(protocolIds[i]); - jsonObject.put("protocol_id", stringBuffer.toString()); + stringBuffer.append(proto); + jsonObject.put(name, stringBuffer.toString()); logSendKafka.sendMessage(jsonObject.toString()); } else { - stringBuffer.append("/").append(protocolIds[i]); - jsonObject.put("protocol_id", stringBuffer.toString()); + stringBuffer.append(regex).append(proto); + jsonObject.put(name, stringBuffer.toString()); logSendKafka.sendMessage(jsonObject.toString()); } } } } catch (AggregationException e) { - logger.error(StreamAggregateConfig.RESULTS_OUTPUT_TOPIC + "日志发送Kafka过程出现异常,异常信息:" + e); + logger.error("日志发送Kafka过程出现异常,异常信息:" + e); } } diff --git a/src/main/java/com/zdjizhi/storm/utils/combine/AggregateUtils.java b/src/main/java/com/zdjizhi/storm/utils/combine/AggregateUtils.java index 22cd06b..4137eca 100644 --- a/src/main/java/com/zdjizhi/storm/utils/combine/AggregateUtils.java +++ b/src/main/java/com/zdjizhi/storm/utils/combine/AggregateUtils.java @@ -36,16 +36,16 @@ public class AggregateUtils { return Long.parseLong(value1) + Long.parseLong(value2); } -// /** -// * 计算Count -// * -// * @param count 当前count值 -// * @return count+1 -// */ -// public static Long count(Long count) { -// -// return count++; -// } + /** + * 计算Count + * + * @param count 当前count值 + * @return count+1 + */ + public static Long count(Long count) { + + return count++; + } /** @@ -72,34 +72,6 @@ public class AggregateUtils { } -// /** -// * 递归发送tuple -// * -// * @param headIndex ssss -// * @param splitArr -// * @param initStr -// * @param collector -// * @param message -// * @param dimesionsObj -// * @param name -// */ -// public static void reSend(int headIndex, String[] splitArr, String initStr, BasicOutputCollector collector, JSONObject message, JSONObject dimesionsObj, String name) { -// //递归拼接字符串 -// if (splitArr.length != headIndex - 1) { -// //递归的核心代码 -// if ("".equals(initStr)) { -// initStr = splitArr[splitArr.length - headIndex]; -// } else { -// initStr = initStr + "/" + splitArr[splitArr.length - headIndex]; -// } -// dimesionsObj.put(name, initStr); -// -// collector.emit(new Values(splitArr[splitArr.length - headIndex], dimesionsObj.toString(), message.toString())); -// reSend(headIndex + 1, splitArr, initStr, collector, message, dimesionsObj, name); -// } -// } - - /** * 获取action模块的Map集合 * @@ -123,6 +95,27 @@ public class AggregateUtils { return map; } + /** + * 获取分隔符 + * + * @param schema 动态获取的schema + * @return (HTTP,metrics数组) + */ + public static String getHierarchy(String schema, String key) { + String result = ""; + JSONObject jsonObject = JSONObject.parseObject(schema); + String data = jsonObject.getString("data"); + JSONArray actions = JSONObject.parseArray(JSONObject.parseObject(JSONObject.parseObject(data).getString("doc")).getString("transforms")); + for (Object transform : actions) { + JSONObject transformObj = JSONObject.parseObject(transform.toString()); + String function = transformObj.getString("function"); + if ("hierarchy".equals(function)) { + result = transformObj.getString(key); + } + } + return result; + } + /** * 获取时间列的集合 @@ -145,7 +138,7 @@ public class AggregateUtils { * * @param hashMap 当前缓存对应关系map */ - public static void updateAppRelation(HashMap<Long, String> hashMap) { + public static void updateAppRelation(HashMap<Integer, String> hashMap) { try { Long begin = System.currentTimeMillis(); String schema = HttpClientUtil.requestByGetMethod(StreamAggregateConfig.APP_ID_HTTP); @@ -154,7 +147,7 @@ public class AggregateUtils { JSONArray objects = JSONArray.parseArray(data); for (Object object : objects) { JSONArray jsonArray = JSONArray.parseArray(object.toString()); - Long key = jsonArray.getLong(0); + int key = jsonArray.getInteger(0); String value = jsonArray.getString(1); if (hashMap.containsKey(key)) { if (!value.equals(hashMap.get(key))) { @@ -188,4 +181,5 @@ public class AggregateUtils { } return dimensionsObj; } + } diff --git a/src/main/java/com/zdjizhi/storm/utils/common/StreamAggregateConfig.java b/src/main/java/com/zdjizhi/storm/utils/common/StreamAggregateConfig.java index 32e059b..d157ed0 100644 --- a/src/main/java/com/zdjizhi/storm/utils/common/StreamAggregateConfig.java +++ b/src/main/java/com/zdjizhi/storm/utils/common/StreamAggregateConfig.java @@ -8,6 +8,8 @@ public class StreamAggregateConfig { public static final String FORMAT_SPLITTER = ","; public static final String MODEL = "remote"; + public static final String PROTOCOL_SPLITTER = "\\."; + /** * System */ @@ -19,9 +21,6 @@ public class StreamAggregateConfig { public static final Integer KAFKA_BOLT_PARALLELISM = StreamAggregateConfigurations.getIntProperty(0, "kafka.bolt.parallelism"); public static final Integer TOPOLOGY_NUM_ACKS = StreamAggregateConfigurations.getIntProperty(0, "topology.num.acks"); public static final Integer TOPOLOGY_SPOUT_SLEEP_TIME = StreamAggregateConfigurations.getIntProperty(0, "topology.spout.sleep.time"); - public static final Integer BATCH_INSERT_NUM = StreamAggregateConfigurations.getIntProperty(0, "batch.insert.num"); - public static final Integer DATA_CENTER_ID_NUM = StreamAggregateConfigurations.getIntProperty(0, "data.center.id.num"); - public static final Integer MAX_FAILURE_NUM = StreamAggregateConfigurations.getIntProperty(0, "max.failure.num"); public static final Integer FIRST_AGG_TIME = StreamAggregateConfigurations.getIntProperty(0, "first.agg.time"); |
