diff options
| author | gujinkai <[email protected]> | 2023-11-30 11:04:09 +0800 |
|---|---|---|
| committer | gujinkai <[email protected]> | 2023-11-30 11:04:09 +0800 |
| commit | d01f4ee99b14c11cdb5525f0e0944cf3f49df90c (patch) | |
| tree | 5fde55cf3354a29bac2bbf60efd6e73466d78e0d | |
| parent | d996e4086edcd04f01f7b7f63d370a76449e6ba3 (diff) | |
feat: adapt to the log changes following the TSG restructuring and adjust the pre-aggregation of subscriber
9 files changed, 85 insertions, 55 deletions
diff --git a/module-CN-pre-metrics/src/main/java/com/zdjizhi/pre/CnPreMetric.java b/module-CN-pre-metrics/src/main/java/com/zdjizhi/pre/CnPreMetric.java index 4f9d199..76d84f2 100644 --- a/module-CN-pre-metrics/src/main/java/com/zdjizhi/pre/CnPreMetric.java +++ b/module-CN-pre-metrics/src/main/java/com/zdjizhi/pre/CnPreMetric.java @@ -143,12 +143,6 @@ public class CnPreMetric implements Schedule { .reduce(new SecondAggregationReduce(), new MetricLinkProcessFunc()); linkMetric.addSink(KafkaUtils.getKafkaSink(Configs.get(CommonConfig.METRIC_LINK_TOPIC))).setParallelism(outputParallelism); - SingleOutputStreamOperator<String> subscriberMetric = process.filter((log) -> log.getMetricKey() == MetricKeyConfig.SUBSCRIBER) - .keyBy(CnMetricLog::getSubscriber_id) - .window(TumblingEventTimeWindows.of(Time.minutes(windowsTime))) - .reduce(new SecondAggregationReduce(), new MetricSubscriberProcessFunc()); - subscriberMetric.addSink(KafkaUtils.getKafkaSink(Configs.get(CommonConfig.METRIC_SUBSCRIBER_TOPIC))).setParallelism(outputParallelism); - SingleOutputStreamOperator<String> subscriberAppMetric = process.filter((log) -> log.getMetricKey() == MetricKeyConfig.SUBSCRIBER_APP) .keyBy(new KeySelector<CnMetricLog, Tuple2<String, String>>() { @Override diff --git a/module-CN-pre-metrics/src/main/java/com/zdjizhi/pre/common/MetricKeyConfig.java b/module-CN-pre-metrics/src/main/java/com/zdjizhi/pre/common/MetricKeyConfig.java index 08e8add..e2af60b 100644 --- a/module-CN-pre-metrics/src/main/java/com/zdjizhi/pre/common/MetricKeyConfig.java +++ b/module-CN-pre-metrics/src/main/java/com/zdjizhi/pre/common/MetricKeyConfig.java @@ -45,9 +45,7 @@ public class MetricKeyConfig { public static final int DNS_RRCNAME = 21; - public static final int SUBSCRIBER = 22; - - public static final int SUBSCRIBER_APP = 23; + public static final int SUBSCRIBER_APP = 22; } diff --git a/module-CN-pre-metrics/src/main/java/com/zdjizhi/pre/dns/process/FirstAggregation.java b/module-CN-pre-metrics/src/main/java/com/zdjizhi/pre/dns/process/FirstAggregation.java index f3f4245..357f231 100644 --- a/module-CN-pre-metrics/src/main/java/com/zdjizhi/pre/dns/process/FirstAggregation.java +++ b/module-CN-pre-metrics/src/main/java/com/zdjizhi/pre/dns/process/FirstAggregation.java @@ -189,7 +189,7 @@ public class FirstAggregation extends AbstractFirstAggregation<DnsMetricLog> { private void initMetric(CnRecordLog cnRecordLog, DnsMetricLog dnsMetricLog) { dnsMetricLog.setDns_count(1L); - dnsMetricLog.setCommon_start_time(cnRecordLog.getCommon_start_time()); + dnsMetricLog.setCommon_start_time(cnRecordLog.getCommon_start_time() / 1000); dnsMetricLog.setCommon_recv_time(cnRecordLog.getCommon_recv_time()); dnsMetricLog.setCommon_sessions(cnRecordLog.getCommon_sessions()); dnsMetricLog.setDns_response_latency_ms(cnRecordLog.getDns_response_latency_ms()); diff --git a/module-CN-pre-metrics/src/main/java/com/zdjizhi/pre/function/MetricSubscriberAppProcessFunc.java b/module-CN-pre-metrics/src/main/java/com/zdjizhi/pre/function/MetricSubscriberAppProcessFunc.java index 15ce6fd..d69e027 100644 --- a/module-CN-pre-metrics/src/main/java/com/zdjizhi/pre/function/MetricSubscriberAppProcessFunc.java +++ b/module-CN-pre-metrics/src/main/java/com/zdjizhi/pre/function/MetricSubscriberAppProcessFunc.java @@ -11,5 +11,9 @@ public class MetricSubscriberAppProcessFunc extends AbstractMetricProcessFunc<Tu public void setCommonFields(Tuple2<String, String> key, CnMetricLog cnMetricLog, MetricResultLog metricResultLog) { metricResultLog.setSubscriber_id(key.f0); metricResultLog.setCommon_app_label(key.f1); + metricResultLog.setImei(cnMetricLog.getImei()); + metricResultLog.setImsi(cnMetricLog.getImsi()); + metricResultLog.setPhone_number(cnMetricLog.getPhone_number()); + metricResultLog.setApn(cnMetricLog.getApn()); } } diff --git a/module-CN-pre-metrics/src/main/java/com/zdjizhi/pre/function/MetricSubscriberProcessFunc.java b/module-CN-pre-metrics/src/main/java/com/zdjizhi/pre/function/MetricSubscriberProcessFunc.java deleted file mode 100644 index 1eaf366..0000000 --- a/module-CN-pre-metrics/src/main/java/com/zdjizhi/pre/function/MetricSubscriberProcessFunc.java +++ /dev/null @@ -1,16 +0,0 @@ -package com.zdjizhi.pre.function; - -import com.zdjizhi.pre.common.CnMetricLog; -import com.zdjizhi.pre.common.MetricResultLog; - -public class MetricSubscriberProcessFunc extends AbstractMetricProcessFunc<String> { - - @Override - public void setCommonFields(String key, CnMetricLog cnMetricLog, MetricResultLog metricResultLog) { - metricResultLog.setSubscriber_id(key); - metricResultLog.setImei(cnMetricLog.getImei()); - metricResultLog.setImsi(cnMetricLog.getImsi()); - metricResultLog.setPhone_number(cnMetricLog.getPhone_number()); - metricResultLog.setApn(cnMetricLog.getApn()); - } -} diff --git a/module-CN-pre-metrics/src/main/java/com/zdjizhi/pre/operator/FirstAggregation.java b/module-CN-pre-metrics/src/main/java/com/zdjizhi/pre/operator/FirstAggregation.java index e6a7f32..a86d66a 100644 --- a/module-CN-pre-metrics/src/main/java/com/zdjizhi/pre/operator/FirstAggregation.java +++ b/module-CN-pre-metrics/src/main/java/com/zdjizhi/pre/operator/FirstAggregation.java @@ -28,7 +28,6 @@ public class FirstAggregation extends AbstractFirstAggregation<CnMetricLog> { private final String sslSniMetricKey = "sslSniMetric"; private final String protocolMetricKey = "protocolMetric"; private final String linkMetricKey = "linkMetric"; - private final String subscriberKey = "subscriberMetric"; private final String subscriberAppKey = "subscriberAppMetric"; @Override @@ -48,7 +47,6 @@ public class FirstAggregation extends AbstractFirstAggregation<CnMetricLog> { accumulator.put(sslSniMetricKey, new HashMap<>()); accumulator.put(protocolMetricKey, new HashMap<>()); accumulator.put(linkMetricKey, new HashMap<>()); - accumulator.put(subscriberKey, new HashMap<>()); accumulator.put(subscriberAppKey, new HashMap<>()); return accumulator; @@ -263,24 +261,8 @@ public class FirstAggregation extends AbstractFirstAggregation<CnMetricLog> { } String subscriberId = value.getSubscriber_id(); - if (StringUtils.isNotBlank(subscriberId)) { - Map<String, CnMetricLog> subscriberMetricMap = accumulator.get(subscriberKey); - CnMetricLog cnMetricLog = subscriberMetricMap.get(subscriberId); - if (cnMetricLog != null) { - updateMetric(value, cnMetricLog); - } else { - cnMetricLog = new CnMetricLog(MetricKeyConfig.SUBSCRIBER); - initMetric(value, cnMetricLog); - cnMetricLog.setSubscriber_id(subscriberId); - cnMetricLog.setImei(value.getImei()); - cnMetricLog.setImsi(value.getImsi()); - cnMetricLog.setPhone_number(value.getPhone_number()); - cnMetricLog.setApn(value.getApn()); - } - } - String appLabel = value.getCommon_app_label(); - if (StringUtils.isNotBlank(subscriberId) && StringUtils.isNotBlank(appLabel)) { + if (StringUtils.isNotBlank(subscriberId)) { String key = subscriberId + "-" + appLabel; Map<String, CnMetricLog> subscriberAppMetricMap = accumulator.get(subscriberAppKey); CnMetricLog cnMetricLog = subscriberAppMetricMap.get(key); @@ -290,6 +272,10 @@ public class FirstAggregation extends AbstractFirstAggregation<CnMetricLog> { cnMetricLog = new CnMetricLog(MetricKeyConfig.SUBSCRIBER_APP); cnMetricLog.setSubscriber_id(subscriberId); cnMetricLog.setCommon_app_label(appLabel); + cnMetricLog.setImei(value.getImei()); + cnMetricLog.setImsi(value.getImsi()); + cnMetricLog.setPhone_number(value.getPhone_number()); + cnMetricLog.setApn(value.getApn()); } } } catch (Exception e) { @@ -298,7 +284,7 @@ public class FirstAggregation extends AbstractFirstAggregation<CnMetricLog> { } private void initMetric(CnRecordLog cnRecordLog, CnMetricLog cnMetricLog) { - cnMetricLog.setCommon_start_time(cnRecordLog.getCommon_start_time()); + cnMetricLog.setCommon_start_time(cnRecordLog.getCommon_start_time() / 1000); cnMetricLog.setCommon_recv_time(cnRecordLog.getCommon_recv_time()); cnMetricLog.setCommon_c2s_pkt_num(cnRecordLog.getCommon_c2s_pkt_num()); cnMetricLog.setCommon_s2c_pkt_num(cnRecordLog.getCommon_s2c_pkt_num()); @@ -307,7 +293,7 @@ public class FirstAggregation extends AbstractFirstAggregation<CnMetricLog> { cnMetricLog.setCommon_sessions(cnRecordLog.getCommon_sessions()); String commonL4Protocol = cnRecordLog.getCommon_l4_protocol(); - if ("IPv4_TCP".equals(commonL4Protocol) || "IPv6_TCP".equals(commonL4Protocol)){ + if ("TCP".equals(commonL4Protocol)) { cnMetricLog.setC2s_tcp_pkt_num(cnRecordLog.getCommon_c2s_pkt_num()); cnMetricLog.setS2c_tcp_pkt_num(cnRecordLog.getCommon_s2c_pkt_num()); cnMetricLog.setC2s_tcp_byte_num(cnRecordLog.getCommon_c2s_byte_num()); @@ -322,7 +308,6 @@ public class FirstAggregation extends AbstractFirstAggregation<CnMetricLog> { cnMetricLog.setCommon_s2c_pkt_retrans(cnRecordLog.getCommon_s2c_pkt_retrans()); cnMetricLog.setTcp_count(1L); cnMetricLog.setCommon_establish_latency_ms(cnRecordLog.getCommon_establish_latency_ms()); - } String commonL7Protocol = cnRecordLog.getCommon_l7_protocol(); long httpResponseLatencyMs = cnRecordLog.getHttp_response_latency_ms(); @@ -387,7 +372,7 @@ public class FirstAggregation extends AbstractFirstAggregation<CnMetricLog> { cnMetricLog.setCommon_sessions(cnRecordLog.getCommon_sessions() + cnMetricLog.getCommon_sessions()); String commonL4Protocol = cnRecordLog.getCommon_l4_protocol(); - if ("IPv4_TCP".equals(commonL4Protocol) || "IPv6_TCP".equals(commonL4Protocol)){ + if ("TCP".equals(commonL4Protocol)) { cnMetricLog.setC2s_tcp_pkt_num(cnRecordLog.getCommon_c2s_pkt_num() + cnMetricLog.getC2s_tcp_pkt_num()); cnMetricLog.setS2c_tcp_pkt_num(cnRecordLog.getCommon_s2c_pkt_num() + cnMetricLog.getS2c_tcp_pkt_num()); cnMetricLog.setC2s_tcp_byte_num(cnRecordLog.getCommon_c2s_byte_num() + cnMetricLog.getC2s_tcp_byte_num()); diff --git a/module-CN-pre-metrics/src/main/java/com/zdjizhi/pre/relation/CnRelationMetric.java b/module-CN-pre-metrics/src/main/java/com/zdjizhi/pre/relation/CnRelationMetric.java index ca14e0a..205f368 100644 --- a/module-CN-pre-metrics/src/main/java/com/zdjizhi/pre/relation/CnRelationMetric.java +++ b/module-CN-pre-metrics/src/main/java/com/zdjizhi/pre/relation/CnRelationMetric.java @@ -23,7 +23,7 @@ public class CnRelationMetric implements Schedule { @Override public void schedule() throws Exception { SingleOutputStreamOperator<CnRecordLog> source = CnRecordEtl.singleOutputStreamOperator; - SingleOutputStreamOperator<RelationMetricLog> process = source.filter(log -> "IPv4_TCP".equals(log.getCommon_l4_protocol()) || log.getCommon_server_port() == 53 || log.getCommon_server_port() == 443) + SingleOutputStreamOperator<RelationMetricLog> process = source.filter(log -> "TCP".equals(log.getCommon_l4_protocol()) || log.getCommon_server_port() == 53 || log.getCommon_server_port() == 443) .process(new FirstAggregation()).name("relationProcess"); SingleOutputStreamOperator<String> relationMetric = process.filter(log -> log.getMetricKey() == 1) diff --git a/platform-base/src/main/java/com/zdjizhi/base/common/CnRecordLog.java b/platform-base/src/main/java/com/zdjizhi/base/common/CnRecordLog.java index 82f9a3a..2777593 100644 --- a/platform-base/src/main/java/com/zdjizhi/base/common/CnRecordLog.java +++ b/platform-base/src/main/java/com/zdjizhi/base/common/CnRecordLog.java @@ -1,5 +1,7 @@ package com.zdjizhi.base.common; +import com.alibaba.fastjson2.annotation.JSONField; + import java.io.Serializable; import java.util.ArrayList; import java.util.Collections; @@ -7,22 +9,39 @@ import java.util.List; public class CnRecordLog implements Serializable { //Common Attributes + @JSONField(name = "recv_time") private long common_recv_time; + @JSONField(name = "log_id") private long common_log_id; + @JSONField(name = "flags") private long common_flags; + @JSONField(name = "start_timestamp_ms") private long common_start_time; + @JSONField(name = "end_timestamp_ms") private long common_end_time; + @JSONField(name = "duration_ms") private long common_con_duration_ms; + @JSONField(name = "decoded_as") private String common_schema_type; + @JSONField(name = "client_ip") private String common_client_ip; + @JSONField(name = "server_ip") private String common_server_ip; + @JSONField(name = "client_port") private int common_client_port; + @JSONField(name = "server_port") private int common_server_port; + @JSONField(name = "app") private String common_app_label; + @JSONField(name = "app_path") private String common_app_full_path; + private String protocol_path; private String common_l4_protocol; + @JSONField(name = "l7_protocol") private String common_l7_protocol; + @JSONField(name = "out_link_id") private Long common_out_link_id; + @JSONField(name = "in_link_id") private Long common_in_link_id; //Source @@ -90,6 +109,11 @@ public class CnRecordLog implements Serializable { private String server_asn; private List<String> server_ip_tags = new ArrayList<>(); + private Double subscriber_longitude; + private Double subscriber_latitude; + private String mail_account; + private String wx_account; + private String app_category; private String app_subcategory; private String app_company; @@ -97,22 +121,39 @@ public class CnRecordLog implements Serializable { private List<String> app_tags = new ArrayList<>(); //Metrics + @JSONField(name = "sent_pkts") private long common_c2s_pkt_num; + @JSONField(name = "sent_bytes") private long common_c2s_byte_num; + @JSONField(name = "received_pkts") private long common_s2c_pkt_num; + @JSONField(name = "received_bytes") private long common_s2c_byte_num; - private long common_sessions; + @JSONField(name = "sessions") + private long common_sessions = 1; + @JSONField(name = "tcp_c2s_lost_bytes") private long common_c2s_tcp_lostlen; + @JSONField(name = "tcp_s2c_lost_bytes") private long common_s2c_tcp_lostlen; + @JSONField(name = "tcp_c2s_o3_pkts") private long common_c2s_tcp_unorder_num; + @JSONField(name = "tcp_s2c_o3_pkts") private long common_s2c_tcp_unorder_num; + @JSONField(name = "tcp_c2s_rtx_bytes") private long common_c2s_byte_retrans; + @JSONField(name = "tcp_s2c_rtx_bytes") private long common_s2c_byte_retrans; + @JSONField(name = "tcp_c2s_rtx_pkts") private long common_c2s_pkt_retrans; + @JSONField(name = "tcp_s2c_rtx_pkts") private long common_s2c_pkt_retrans; + @JSONField(name = "tcp_rtt_ms") private long common_establish_latency_ms; + @JSONField(name = "http_response_latency_ms") private long http_response_latency_ms; + @JSONField(name = "ssl_handshake_latency_ms") private long ssl_con_latency_ms; + @JSONField(name = "dns_response_latency_ms") private long dns_response_latency_ms; //逻辑过程需要增加的字段,不属于业务字段 @@ -364,6 +405,14 @@ public class CnRecordLog implements Serializable { this.common_app_label = common_app_label; } + public String getProtocol_path() { + return protocol_path; + } + + public void setProtocol_path(String protocol_path) { + this.protocol_path = protocol_path; + } + public String getCommon_l4_protocol() { return common_l4_protocol; } diff --git a/platform-etl/src/main/java/com/zdjizhi/etl/utils/CompletedUtils.java b/platform-etl/src/main/java/com/zdjizhi/etl/utils/CompletedUtils.java index 88335ff..b253589 100644 --- a/platform-etl/src/main/java/com/zdjizhi/etl/utils/CompletedUtils.java +++ b/platform-etl/src/main/java/com/zdjizhi/etl/utils/CompletedUtils.java @@ -34,8 +34,24 @@ public class CompletedUtils { } public static void setCompletedMessage(CnRecordLog recordLog) { + setl4Protocol(recordLog); + setl7ProtocolAndApp(recordLog); + } + + private static void setl4Protocol(CnRecordLog cnRecordLog) { + String protocolPath = cnRecordLog.getProtocol_path(); + if (protocolPath != null) { + String[] protocols = cnRecordLog.getProtocol_path().split("\\."); + if (protocols.length > 0) { + String l4Protocol = protocols[protocols.length - 1]; + cnRecordLog.setCommon_l4_protocol(l4Protocol); + } + } + } + + private static void setl7ProtocolAndApp(CnRecordLog cnRecordLog) { try { - String appFullPath = recordLog.getCommon_app_full_path(); + String appFullPath = cnRecordLog.getCommon_app_full_path(); String l7Protocol = "UNCATEGORIZED"; String app = ""; if (appFullPath != null) { @@ -59,10 +75,10 @@ public class CompletedUtils { i++; } } - recordLog.setCommon_l7_protocol(l7Protocol); - recordLog.setCommon_app_label(app); + cnRecordLog.setCommon_l7_protocol(l7Protocol); + cnRecordLog.setCommon_app_label(app); } catch (Exception e) { - LOG.error("ETL解析错误\nrecordLog:{} \n", recordLog, e); + LOG.error("ETL解析错误\nrecordLog:{} \n", cnRecordLog, e); } } } |
