diff options
| author | 戚岱杰 <[email protected]> | 2024-01-19 11:11:13 +0000 |
|---|---|---|
| committer | 戚岱杰 <[email protected]> | 2024-01-19 11:11:13 +0000 |
| commit | a7841797d87cc219be216eea3d80ce3c76eabb1e (patch) | |
| tree | 62ec8b8a3c8ba775c4fc49142026cfccadca2d1b | |
| parent | 8830625e6d04ff8a7b8de2f858b63f4ce119891d (diff) | |
| parent | cd119058778fcb7ffb410aeb25be3ff2a65a231c (diff) | |
Merge branch 'develop' into 'master'2.1.0
Merge branch 'develop' into 'master'
See merge request galaxy/tsg_olap/app-protocol-stat-traffic-merge!4
| -rw-r--r-- | README.md | 31 | ||||
| -rw-r--r-- | pom.xml | 2 | ||||
| -rw-r--r-- | src/main/java/com/zdjizhi/utils/functions/process/ParsingData.java | 37 | ||||
| -rw-r--r-- | src/main/java/com/zdjizhi/utils/kafka/KafkaProducer.java | 2 | ||||
| -rw-r--r-- | src/test/java/com/zdjizhi/ConventionalTest.java | 22 |
5 files changed, 69 insertions, 25 deletions
@@ -16,21 +16,22 @@ Live Traffic Chart统计程序,基于协议栈拆分多流聚合,存储到� ### 2.基于Tags内容进行分组统计。 -### 3.拆分protocol_stack_id协议树为多个节点 - -#### 例如,ETHERNET.IPv4.TCP.https.kingsoft.wps_office,每个节点ID为: - -##### ETHERNET - -##### ETHERNET.IPv4 - -##### ETHERNET.IPv4.TCP - -##### ETHERNET.IPv4.TCP.https - -##### ETHERNET.IPv4.TCP.https.kingsoft - -##### ETHERNET.IPv4.TCP.https.kingsoft.wps_office +### 3.拆分protocol_stack_id协议树为多个节点,例如:ETHERNET.IPv4.TCP.https.kingsoft.wps_office,每个节点ID为 + +1. ETHERNET +2. ETHERNET.IPv4 +3. ETHERNET.IPv4.TCP +4. ETHERNET.IPv4.TCP.https +5. ETHERNET.IPv4.TCP.https.kingsoft +6. ETHERNET.IPv4.TCP.https.kingsoft.wps_office + +#### 为避免展示重复的协议,拆分应去除Decoded Path(最后一个元素)与 Application(第一个元素)重复的基础协议,例如:{"protocol_label": "ETHERNET.IPv4.TCP.dns","app_full_path": "dns"} + +1. ETHERNET +2. ETHERNET.IPv4 +3. ETHERNET.IPv4.TCP +4. ETHERNET.IPv4.TCP.dns +##### 将protocol_label内最后的一个基础协议去除 ### 4.app_name仅在终端节点输出。 @@ -6,7 +6,7 @@ <groupId>com.zdjizhi</groupId> <artifactId>app-protocol-stat-traffic-merge</artifactId> - <version>2.0.1</version> + <version>2.1.0</version> <name>app-protocol-stat-traffic-merge</name> <url>http://www.example.com</url> diff --git a/src/main/java/com/zdjizhi/utils/functions/process/ParsingData.java b/src/main/java/com/zdjizhi/utils/functions/process/ParsingData.java index 257915c..e88ae78 100644 --- a/src/main/java/com/zdjizhi/utils/functions/process/ParsingData.java +++ b/src/main/java/com/zdjizhi/utils/functions/process/ParsingData.java @@ -28,14 +28,7 @@ public class ParsingData extends ProcessFunction<String, Tuple3<Tags, Fields, Lo Tags tags = JSONObject.parseObject(originalLog.getString("tags"), Tags.class); Long timestamp_ms = originalLog.getLong("timestamp_ms"); - String appFullPath = tags.getApp_name(); - if (StringUtil.isNotBlank(appFullPath)) { - String appName = appFullPath.substring(appFullPath.lastIndexOf(".") + 1); - String protocolLabel = tags.getProtocol_stack_id(); - - tags.setApp_name(appName); - tags.setProtocol_stack_id(protocolLabel.concat(".").concat(appFullPath)); - } + joinProtocol(tags); out.collect(new Tuple3<>(tags, fields, timestamp_ms)); } @@ -44,4 +37,32 @@ public class ParsingData extends ProcessFunction<String, Tuple3<Tags, Fields, Lo logger.error("Parsing application_protocol_stat data is abnormal! The exception message is: {}", e.getMessage()); } } + + /** + * 避免计算重复的协议,去除Decoded Path(最后一个元素) 与 Application(第一个元素)重复的基础协议。 + * + * @param tags + */ + private static void joinProtocol(Tags tags) { + String appFullPath = tags.getApp_name(); + + if (StringUtil.isNotBlank(appFullPath)) { + String appName = appFullPath.substring(appFullPath.lastIndexOf(".") + 1); + tags.setApp_name(appName); + + String protocolLabel = tags.getProtocol_stack_id(); + + String endProtocol = protocolLabel.substring(protocolLabel.lastIndexOf(".") + 1); + + String[] appSplits = appFullPath.split("\\."); + + String firstAppProtocol = appSplits[0]; + + if (endProtocol.equals(firstAppProtocol)) { + tags.setProtocol_stack_id(protocolLabel.substring(0, protocolLabel.lastIndexOf(".")).concat(".").concat(appFullPath)); + } else { + tags.setProtocol_stack_id(protocolLabel.concat(".").concat(appFullPath)); + } + } + } } diff --git a/src/main/java/com/zdjizhi/utils/kafka/KafkaProducer.java b/src/main/java/com/zdjizhi/utils/kafka/KafkaProducer.java index c7cd3f2..c276dc8 100644 --- a/src/main/java/com/zdjizhi/utils/kafka/KafkaProducer.java +++ b/src/main/java/com/zdjizhi/utils/kafka/KafkaProducer.java @@ -15,7 +15,7 @@ import java.util.Properties; public class KafkaProducer { public static FlinkKafkaProducer<String> getKafkaProducer(Properties properties, String topic, boolean logFailuresOnly) { - setDefaultConfig(properties, "ack", 1); + setDefaultConfig(properties, "ack", "1"); setDefaultConfig(properties, "retries", 0); setDefaultConfig(properties, "linger.ms", 10); setDefaultConfig(properties, "request.timeout.ms", 30000); diff --git a/src/test/java/com/zdjizhi/ConventionalTest.java b/src/test/java/com/zdjizhi/ConventionalTest.java index 287b3fb..9e5f185 100644 --- a/src/test/java/com/zdjizhi/ConventionalTest.java +++ b/src/test/java/com/zdjizhi/ConventionalTest.java @@ -72,4 +72,26 @@ public class ConventionalTest { System.out.println(protocol.concat(".").concat(app)); } + + @Test + public void removeDuplicateProtocolTest() { + String str = "[.]"; + String protocol = "ETHERNET.IPv4.TCP.http"; + String[] protocolSplits = protocol.split(str); + String endProtocol = protocolSplits[protocolSplits.length -1]; + System.out.println(endProtocol); + + String app = "http.test"; + String[] appSplits = app.split(str); + String firstAppProtocol = appSplits[0]; + System.out.println(firstAppProtocol); + + + System.out.println(app.substring(app.lastIndexOf(".") + 1)); + System.out.println(app.lastIndexOf(".") + 1); + System.out.println(protocol.substring(0,protocol.lastIndexOf("."))); + + } + + } |
