summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorgujinkai <[email protected]>2023-11-30 11:04:09 +0800
committergujinkai <[email protected]>2023-11-30 11:04:09 +0800
commitd01f4ee99b14c11cdb5525f0e0944cf3f49df90c (patch)
tree5fde55cf3354a29bac2bbf60efd6e73466d78e0d
parentd996e4086edcd04f01f7b7f63d370a76449e6ba3 (diff)
feat: adapt to the log changes following the TSG restructuring and adjust the pre-aggregation of subscriber
-rw-r--r--module-CN-pre-metrics/src/main/java/com/zdjizhi/pre/CnPreMetric.java6
-rw-r--r--module-CN-pre-metrics/src/main/java/com/zdjizhi/pre/common/MetricKeyConfig.java4
-rw-r--r--module-CN-pre-metrics/src/main/java/com/zdjizhi/pre/dns/process/FirstAggregation.java2
-rw-r--r--module-CN-pre-metrics/src/main/java/com/zdjizhi/pre/function/MetricSubscriberAppProcessFunc.java4
-rw-r--r--module-CN-pre-metrics/src/main/java/com/zdjizhi/pre/function/MetricSubscriberProcessFunc.java16
-rw-r--r--module-CN-pre-metrics/src/main/java/com/zdjizhi/pre/operator/FirstAggregation.java31
-rw-r--r--module-CN-pre-metrics/src/main/java/com/zdjizhi/pre/relation/CnRelationMetric.java2
-rw-r--r--platform-base/src/main/java/com/zdjizhi/base/common/CnRecordLog.java51
-rw-r--r--platform-etl/src/main/java/com/zdjizhi/etl/utils/CompletedUtils.java24
9 files changed, 85 insertions, 55 deletions
diff --git a/module-CN-pre-metrics/src/main/java/com/zdjizhi/pre/CnPreMetric.java b/module-CN-pre-metrics/src/main/java/com/zdjizhi/pre/CnPreMetric.java
index 4f9d199..76d84f2 100644
--- a/module-CN-pre-metrics/src/main/java/com/zdjizhi/pre/CnPreMetric.java
+++ b/module-CN-pre-metrics/src/main/java/com/zdjizhi/pre/CnPreMetric.java
@@ -143,12 +143,6 @@ public class CnPreMetric implements Schedule {
.reduce(new SecondAggregationReduce(), new MetricLinkProcessFunc());
linkMetric.addSink(KafkaUtils.getKafkaSink(Configs.get(CommonConfig.METRIC_LINK_TOPIC))).setParallelism(outputParallelism);
- SingleOutputStreamOperator<String> subscriberMetric = process.filter((log) -> log.getMetricKey() == MetricKeyConfig.SUBSCRIBER)
- .keyBy(CnMetricLog::getSubscriber_id)
- .window(TumblingEventTimeWindows.of(Time.minutes(windowsTime)))
- .reduce(new SecondAggregationReduce(), new MetricSubscriberProcessFunc());
- subscriberMetric.addSink(KafkaUtils.getKafkaSink(Configs.get(CommonConfig.METRIC_SUBSCRIBER_TOPIC))).setParallelism(outputParallelism);
-
SingleOutputStreamOperator<String> subscriberAppMetric = process.filter((log) -> log.getMetricKey() == MetricKeyConfig.SUBSCRIBER_APP)
.keyBy(new KeySelector<CnMetricLog, Tuple2<String, String>>() {
@Override
diff --git a/module-CN-pre-metrics/src/main/java/com/zdjizhi/pre/common/MetricKeyConfig.java b/module-CN-pre-metrics/src/main/java/com/zdjizhi/pre/common/MetricKeyConfig.java
index 08e8add..e2af60b 100644
--- a/module-CN-pre-metrics/src/main/java/com/zdjizhi/pre/common/MetricKeyConfig.java
+++ b/module-CN-pre-metrics/src/main/java/com/zdjizhi/pre/common/MetricKeyConfig.java
@@ -45,9 +45,7 @@ public class MetricKeyConfig {
public static final int DNS_RRCNAME = 21;
- public static final int SUBSCRIBER = 22;
-
- public static final int SUBSCRIBER_APP = 23;
+ public static final int SUBSCRIBER_APP = 22;
}
diff --git a/module-CN-pre-metrics/src/main/java/com/zdjizhi/pre/dns/process/FirstAggregation.java b/module-CN-pre-metrics/src/main/java/com/zdjizhi/pre/dns/process/FirstAggregation.java
index f3f4245..357f231 100644
--- a/module-CN-pre-metrics/src/main/java/com/zdjizhi/pre/dns/process/FirstAggregation.java
+++ b/module-CN-pre-metrics/src/main/java/com/zdjizhi/pre/dns/process/FirstAggregation.java
@@ -189,7 +189,7 @@ public class FirstAggregation extends AbstractFirstAggregation<DnsMetricLog> {
private void initMetric(CnRecordLog cnRecordLog, DnsMetricLog dnsMetricLog) {
dnsMetricLog.setDns_count(1L);
- dnsMetricLog.setCommon_start_time(cnRecordLog.getCommon_start_time());
+ dnsMetricLog.setCommon_start_time(cnRecordLog.getCommon_start_time() / 1000);
dnsMetricLog.setCommon_recv_time(cnRecordLog.getCommon_recv_time());
dnsMetricLog.setCommon_sessions(cnRecordLog.getCommon_sessions());
dnsMetricLog.setDns_response_latency_ms(cnRecordLog.getDns_response_latency_ms());
diff --git a/module-CN-pre-metrics/src/main/java/com/zdjizhi/pre/function/MetricSubscriberAppProcessFunc.java b/module-CN-pre-metrics/src/main/java/com/zdjizhi/pre/function/MetricSubscriberAppProcessFunc.java
index 15ce6fd..d69e027 100644
--- a/module-CN-pre-metrics/src/main/java/com/zdjizhi/pre/function/MetricSubscriberAppProcessFunc.java
+++ b/module-CN-pre-metrics/src/main/java/com/zdjizhi/pre/function/MetricSubscriberAppProcessFunc.java
@@ -11,5 +11,9 @@ public class MetricSubscriberAppProcessFunc extends AbstractMetricProcessFunc<Tu
public void setCommonFields(Tuple2<String, String> key, CnMetricLog cnMetricLog, MetricResultLog metricResultLog) {
metricResultLog.setSubscriber_id(key.f0);
metricResultLog.setCommon_app_label(key.f1);
+ metricResultLog.setImei(cnMetricLog.getImei());
+ metricResultLog.setImsi(cnMetricLog.getImsi());
+ metricResultLog.setPhone_number(cnMetricLog.getPhone_number());
+ metricResultLog.setApn(cnMetricLog.getApn());
}
}
diff --git a/module-CN-pre-metrics/src/main/java/com/zdjizhi/pre/function/MetricSubscriberProcessFunc.java b/module-CN-pre-metrics/src/main/java/com/zdjizhi/pre/function/MetricSubscriberProcessFunc.java
deleted file mode 100644
index 1eaf366..0000000
--- a/module-CN-pre-metrics/src/main/java/com/zdjizhi/pre/function/MetricSubscriberProcessFunc.java
+++ /dev/null
@@ -1,16 +0,0 @@
-package com.zdjizhi.pre.function;
-
-import com.zdjizhi.pre.common.CnMetricLog;
-import com.zdjizhi.pre.common.MetricResultLog;
-
-public class MetricSubscriberProcessFunc extends AbstractMetricProcessFunc<String> {
-
- @Override
- public void setCommonFields(String key, CnMetricLog cnMetricLog, MetricResultLog metricResultLog) {
- metricResultLog.setSubscriber_id(key);
- metricResultLog.setImei(cnMetricLog.getImei());
- metricResultLog.setImsi(cnMetricLog.getImsi());
- metricResultLog.setPhone_number(cnMetricLog.getPhone_number());
- metricResultLog.setApn(cnMetricLog.getApn());
- }
-}
diff --git a/module-CN-pre-metrics/src/main/java/com/zdjizhi/pre/operator/FirstAggregation.java b/module-CN-pre-metrics/src/main/java/com/zdjizhi/pre/operator/FirstAggregation.java
index e6a7f32..a86d66a 100644
--- a/module-CN-pre-metrics/src/main/java/com/zdjizhi/pre/operator/FirstAggregation.java
+++ b/module-CN-pre-metrics/src/main/java/com/zdjizhi/pre/operator/FirstAggregation.java
@@ -28,7 +28,6 @@ public class FirstAggregation extends AbstractFirstAggregation<CnMetricLog> {
private final String sslSniMetricKey = "sslSniMetric";
private final String protocolMetricKey = "protocolMetric";
private final String linkMetricKey = "linkMetric";
- private final String subscriberKey = "subscriberMetric";
private final String subscriberAppKey = "subscriberAppMetric";
@Override
@@ -48,7 +47,6 @@ public class FirstAggregation extends AbstractFirstAggregation<CnMetricLog> {
accumulator.put(sslSniMetricKey, new HashMap<>());
accumulator.put(protocolMetricKey, new HashMap<>());
accumulator.put(linkMetricKey, new HashMap<>());
- accumulator.put(subscriberKey, new HashMap<>());
accumulator.put(subscriberAppKey, new HashMap<>());
return accumulator;
@@ -263,24 +261,8 @@ public class FirstAggregation extends AbstractFirstAggregation<CnMetricLog> {
}
String subscriberId = value.getSubscriber_id();
- if (StringUtils.isNotBlank(subscriberId)) {
- Map<String, CnMetricLog> subscriberMetricMap = accumulator.get(subscriberKey);
- CnMetricLog cnMetricLog = subscriberMetricMap.get(subscriberId);
- if (cnMetricLog != null) {
- updateMetric(value, cnMetricLog);
- } else {
- cnMetricLog = new CnMetricLog(MetricKeyConfig.SUBSCRIBER);
- initMetric(value, cnMetricLog);
- cnMetricLog.setSubscriber_id(subscriberId);
- cnMetricLog.setImei(value.getImei());
- cnMetricLog.setImsi(value.getImsi());
- cnMetricLog.setPhone_number(value.getPhone_number());
- cnMetricLog.setApn(value.getApn());
- }
- }
-
String appLabel = value.getCommon_app_label();
- if (StringUtils.isNotBlank(subscriberId) && StringUtils.isNotBlank(appLabel)) {
+ if (StringUtils.isNotBlank(subscriberId)) {
String key = subscriberId + "-" + appLabel;
Map<String, CnMetricLog> subscriberAppMetricMap = accumulator.get(subscriberAppKey);
CnMetricLog cnMetricLog = subscriberAppMetricMap.get(key);
@@ -290,6 +272,10 @@ public class FirstAggregation extends AbstractFirstAggregation<CnMetricLog> {
cnMetricLog = new CnMetricLog(MetricKeyConfig.SUBSCRIBER_APP);
cnMetricLog.setSubscriber_id(subscriberId);
cnMetricLog.setCommon_app_label(appLabel);
+ cnMetricLog.setImei(value.getImei());
+ cnMetricLog.setImsi(value.getImsi());
+ cnMetricLog.setPhone_number(value.getPhone_number());
+ cnMetricLog.setApn(value.getApn());
}
}
} catch (Exception e) {
@@ -298,7 +284,7 @@ public class FirstAggregation extends AbstractFirstAggregation<CnMetricLog> {
}
private void initMetric(CnRecordLog cnRecordLog, CnMetricLog cnMetricLog) {
- cnMetricLog.setCommon_start_time(cnRecordLog.getCommon_start_time());
+ cnMetricLog.setCommon_start_time(cnRecordLog.getCommon_start_time() / 1000);
cnMetricLog.setCommon_recv_time(cnRecordLog.getCommon_recv_time());
cnMetricLog.setCommon_c2s_pkt_num(cnRecordLog.getCommon_c2s_pkt_num());
cnMetricLog.setCommon_s2c_pkt_num(cnRecordLog.getCommon_s2c_pkt_num());
@@ -307,7 +293,7 @@ public class FirstAggregation extends AbstractFirstAggregation<CnMetricLog> {
cnMetricLog.setCommon_sessions(cnRecordLog.getCommon_sessions());
String commonL4Protocol = cnRecordLog.getCommon_l4_protocol();
- if ("IPv4_TCP".equals(commonL4Protocol) || "IPv6_TCP".equals(commonL4Protocol)){
+ if ("TCP".equals(commonL4Protocol)) {
cnMetricLog.setC2s_tcp_pkt_num(cnRecordLog.getCommon_c2s_pkt_num());
cnMetricLog.setS2c_tcp_pkt_num(cnRecordLog.getCommon_s2c_pkt_num());
cnMetricLog.setC2s_tcp_byte_num(cnRecordLog.getCommon_c2s_byte_num());
@@ -322,7 +308,6 @@ public class FirstAggregation extends AbstractFirstAggregation<CnMetricLog> {
cnMetricLog.setCommon_s2c_pkt_retrans(cnRecordLog.getCommon_s2c_pkt_retrans());
cnMetricLog.setTcp_count(1L);
cnMetricLog.setCommon_establish_latency_ms(cnRecordLog.getCommon_establish_latency_ms());
-
}
String commonL7Protocol = cnRecordLog.getCommon_l7_protocol();
long httpResponseLatencyMs = cnRecordLog.getHttp_response_latency_ms();
@@ -387,7 +372,7 @@ public class FirstAggregation extends AbstractFirstAggregation<CnMetricLog> {
cnMetricLog.setCommon_sessions(cnRecordLog.getCommon_sessions() + cnMetricLog.getCommon_sessions());
String commonL4Protocol = cnRecordLog.getCommon_l4_protocol();
- if ("IPv4_TCP".equals(commonL4Protocol) || "IPv6_TCP".equals(commonL4Protocol)){
+ if ("TCP".equals(commonL4Protocol)) {
cnMetricLog.setC2s_tcp_pkt_num(cnRecordLog.getCommon_c2s_pkt_num() + cnMetricLog.getC2s_tcp_pkt_num());
cnMetricLog.setS2c_tcp_pkt_num(cnRecordLog.getCommon_s2c_pkt_num() + cnMetricLog.getS2c_tcp_pkt_num());
cnMetricLog.setC2s_tcp_byte_num(cnRecordLog.getCommon_c2s_byte_num() + cnMetricLog.getC2s_tcp_byte_num());
diff --git a/module-CN-pre-metrics/src/main/java/com/zdjizhi/pre/relation/CnRelationMetric.java b/module-CN-pre-metrics/src/main/java/com/zdjizhi/pre/relation/CnRelationMetric.java
index ca14e0a..205f368 100644
--- a/module-CN-pre-metrics/src/main/java/com/zdjizhi/pre/relation/CnRelationMetric.java
+++ b/module-CN-pre-metrics/src/main/java/com/zdjizhi/pre/relation/CnRelationMetric.java
@@ -23,7 +23,7 @@ public class CnRelationMetric implements Schedule {
@Override
public void schedule() throws Exception {
SingleOutputStreamOperator<CnRecordLog> source = CnRecordEtl.singleOutputStreamOperator;
- SingleOutputStreamOperator<RelationMetricLog> process = source.filter(log -> "IPv4_TCP".equals(log.getCommon_l4_protocol()) || log.getCommon_server_port() == 53 || log.getCommon_server_port() == 443)
+ SingleOutputStreamOperator<RelationMetricLog> process = source.filter(log -> "TCP".equals(log.getCommon_l4_protocol()) || log.getCommon_server_port() == 53 || log.getCommon_server_port() == 443)
.process(new FirstAggregation()).name("relationProcess");
SingleOutputStreamOperator<String> relationMetric = process.filter(log -> log.getMetricKey() == 1)
diff --git a/platform-base/src/main/java/com/zdjizhi/base/common/CnRecordLog.java b/platform-base/src/main/java/com/zdjizhi/base/common/CnRecordLog.java
index 82f9a3a..2777593 100644
--- a/platform-base/src/main/java/com/zdjizhi/base/common/CnRecordLog.java
+++ b/platform-base/src/main/java/com/zdjizhi/base/common/CnRecordLog.java
@@ -1,5 +1,7 @@
package com.zdjizhi.base.common;
+import com.alibaba.fastjson2.annotation.JSONField;
+
import java.io.Serializable;
import java.util.ArrayList;
import java.util.Collections;
@@ -7,22 +9,39 @@ import java.util.List;
public class CnRecordLog implements Serializable {
//Common Attributes
+ @JSONField(name = "recv_time")
private long common_recv_time;
+ @JSONField(name = "log_id")
private long common_log_id;
+ @JSONField(name = "flags")
private long common_flags;
+ @JSONField(name = "start_timestamp_ms")
private long common_start_time;
+ @JSONField(name = "end_timestamp_ms")
private long common_end_time;
+ @JSONField(name = "duration_ms")
private long common_con_duration_ms;
+ @JSONField(name = "decoded_as")
private String common_schema_type;
+ @JSONField(name = "client_ip")
private String common_client_ip;
+ @JSONField(name = "server_ip")
private String common_server_ip;
+ @JSONField(name = "client_port")
private int common_client_port;
+ @JSONField(name = "server_port")
private int common_server_port;
+ @JSONField(name = "app")
private String common_app_label;
+ @JSONField(name = "app_path")
private String common_app_full_path;
+ private String protocol_path;
private String common_l4_protocol;
+ @JSONField(name = "l7_protocol")
private String common_l7_protocol;
+ @JSONField(name = "out_link_id")
private Long common_out_link_id;
+ @JSONField(name = "in_link_id")
private Long common_in_link_id;
//Source
@@ -90,6 +109,11 @@ public class CnRecordLog implements Serializable {
private String server_asn;
private List<String> server_ip_tags = new ArrayList<>();
+ private Double subscriber_longitude;
+ private Double subscriber_latitude;
+ private String mail_account;
+ private String wx_account;
+
private String app_category;
private String app_subcategory;
private String app_company;
@@ -97,22 +121,39 @@ public class CnRecordLog implements Serializable {
private List<String> app_tags = new ArrayList<>();
//Metrics
+ @JSONField(name = "sent_pkts")
private long common_c2s_pkt_num;
+ @JSONField(name = "sent_bytes")
private long common_c2s_byte_num;
+ @JSONField(name = "received_pkts")
private long common_s2c_pkt_num;
+ @JSONField(name = "received_bytes")
private long common_s2c_byte_num;
- private long common_sessions;
+ @JSONField(name = "sessions")
+ private long common_sessions = 1;
+ @JSONField(name = "tcp_c2s_lost_bytes")
private long common_c2s_tcp_lostlen;
+ @JSONField(name = "tcp_s2c_lost_bytes")
private long common_s2c_tcp_lostlen;
+ @JSONField(name = "tcp_c2s_o3_pkts")
private long common_c2s_tcp_unorder_num;
+ @JSONField(name = "tcp_s2c_o3_pkts")
private long common_s2c_tcp_unorder_num;
+ @JSONField(name = "tcp_c2s_rtx_bytes")
private long common_c2s_byte_retrans;
+ @JSONField(name = "tcp_s2c_rtx_bytes")
private long common_s2c_byte_retrans;
+ @JSONField(name = "tcp_c2s_rtx_pkts")
private long common_c2s_pkt_retrans;
+ @JSONField(name = "tcp_s2c_rtx_pkts")
private long common_s2c_pkt_retrans;
+ @JSONField(name = "tcp_rtt_ms")
private long common_establish_latency_ms;
+ @JSONField(name = "http_response_latency_ms")
private long http_response_latency_ms;
+ @JSONField(name = "ssl_handshake_latency_ms")
private long ssl_con_latency_ms;
+ @JSONField(name = "dns_response_latency_ms")
private long dns_response_latency_ms;
//逻辑过程需要增加的字段,不属于业务字段
@@ -364,6 +405,14 @@ public class CnRecordLog implements Serializable {
this.common_app_label = common_app_label;
}
+ public String getProtocol_path() {
+ return protocol_path;
+ }
+
+ public void setProtocol_path(String protocol_path) {
+ this.protocol_path = protocol_path;
+ }
+
public String getCommon_l4_protocol() {
return common_l4_protocol;
}
diff --git a/platform-etl/src/main/java/com/zdjizhi/etl/utils/CompletedUtils.java b/platform-etl/src/main/java/com/zdjizhi/etl/utils/CompletedUtils.java
index 88335ff..b253589 100644
--- a/platform-etl/src/main/java/com/zdjizhi/etl/utils/CompletedUtils.java
+++ b/platform-etl/src/main/java/com/zdjizhi/etl/utils/CompletedUtils.java
@@ -34,8 +34,24 @@ public class CompletedUtils {
}
public static void setCompletedMessage(CnRecordLog recordLog) {
+ setl4Protocol(recordLog);
+ setl7ProtocolAndApp(recordLog);
+ }
+
+ private static void setl4Protocol(CnRecordLog cnRecordLog) {
+ String protocolPath = cnRecordLog.getProtocol_path();
+ if (protocolPath != null) {
+ String[] protocols = cnRecordLog.getProtocol_path().split("\\.");
+ if (protocols.length > 0) {
+ String l4Protocol = protocols[protocols.length - 1];
+ cnRecordLog.setCommon_l4_protocol(l4Protocol);
+ }
+ }
+ }
+
+ private static void setl7ProtocolAndApp(CnRecordLog cnRecordLog) {
try {
- String appFullPath = recordLog.getCommon_app_full_path();
+ String appFullPath = cnRecordLog.getCommon_app_full_path();
String l7Protocol = "UNCATEGORIZED";
String app = "";
if (appFullPath != null) {
@@ -59,10 +75,10 @@ public class CompletedUtils {
i++;
}
}
- recordLog.setCommon_l7_protocol(l7Protocol);
- recordLog.setCommon_app_label(app);
+ cnRecordLog.setCommon_l7_protocol(l7Protocol);
+ cnRecordLog.setCommon_app_label(app);
} catch (Exception e) {
- LOG.error("ETL解析错误\nrecordLog:{} \n", recordLog, e);
+ LOG.error("ETL解析错误\nrecordLog:{} \n", cnRecordLog, e);
}
}
}