summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorgujinkai <[email protected]>2023-09-26 18:31:15 +0800
committergujinkai <[email protected]>2023-09-26 18:31:15 +0800
commitf0802b296b7bfd6b1a69d4d48fe48790b3ed9851 (patch)
treebf2007b67b082c6c7e2a54c82a96963dc81f3086
parent37eab588300386493b8b9eef259491bb3240d933 (diff)
修改s2c_pkt_retrans字段初始化逻辑错误的问题realease-23.09-0926-rc1
-rw-r--r--module-CN-pre-metrics/src/main/java/com/zdjizhi/pre/operator/FirstAggregation.java2
-rw-r--r--module-CN-pre-metrics/src/test/java/com.zdjizhi.pre/DNSMetricTest.java27
-rw-r--r--module-CN-pre-metrics/src/test/java/com.zdjizhi.pre/PreMetricTest.java93
-rw-r--r--module-CN-pre-metrics/src/test/resources/cn-session-pre.json1
4 files changed, 109 insertions, 14 deletions
diff --git a/module-CN-pre-metrics/src/main/java/com/zdjizhi/pre/operator/FirstAggregation.java b/module-CN-pre-metrics/src/main/java/com/zdjizhi/pre/operator/FirstAggregation.java
index f824d52..466133d 100644
--- a/module-CN-pre-metrics/src/main/java/com/zdjizhi/pre/operator/FirstAggregation.java
+++ b/module-CN-pre-metrics/src/main/java/com/zdjizhi/pre/operator/FirstAggregation.java
@@ -269,7 +269,7 @@ public class FirstAggregation extends AbstractFirstAggregation<CnMetricLog> {
cnMetricLog.setCommon_c2s_byte_retrans(cnRecordLog.getCommon_c2s_byte_retrans());
cnMetricLog.setCommon_s2c_byte_retrans(cnRecordLog.getCommon_s2c_byte_retrans());
cnMetricLog.setCommon_c2s_pkt_retrans(cnRecordLog.getCommon_c2s_pkt_retrans());
- cnMetricLog.setCommon_s2c_pkt_retrans(cnMetricLog.getCommon_s2c_pkt_retrans());
+ cnMetricLog.setCommon_s2c_pkt_retrans(cnRecordLog.getCommon_s2c_pkt_retrans());
cnMetricLog.setCommon_establish_latency_ms(cnRecordLog.getCommon_establish_latency_ms());
cnMetricLog.setHttp_response_latency_ms(cnRecordLog.getHttp_response_latency_ms());
cnMetricLog.setSsl_con_latency_ms(cnRecordLog.getSsl_con_latency_ms());
diff --git a/module-CN-pre-metrics/src/test/java/com.zdjizhi.pre/DNSMetricTest.java b/module-CN-pre-metrics/src/test/java/com.zdjizhi.pre/DNSMetricTest.java
index d10884d..f1f2f60 100644
--- a/module-CN-pre-metrics/src/test/java/com.zdjizhi.pre/DNSMetricTest.java
+++ b/module-CN-pre-metrics/src/test/java/com.zdjizhi.pre/DNSMetricTest.java
@@ -5,13 +5,14 @@ import com.alibaba.fastjson2.JSON;
import com.alibaba.fastjson2.JSONArray;
import com.zdjizhi.base.common.CnRecordLog;
import com.zdjizhi.base.utils.FlinkEnvironmentUtils;
+import com.zdjizhi.pre.common.CnMetricLog;
+import com.zdjizhi.pre.common.CommonConfig;
import com.zdjizhi.pre.common.MetricKeyConfig;
-import com.zdjizhi.pre.dns.common.CommonConfig;
-import com.zdjizhi.pre.dns.common.DnsMetricLog;
-import com.zdjizhi.pre.dns.common.DnsMetricResultLog;
-import com.zdjizhi.pre.dns.function.MetricDnsServerIpFunc;
-import com.zdjizhi.pre.dns.process.FirstAggregation;
-import com.zdjizhi.pre.dns.process.SecondAggregationReduce;
+
+
+import com.zdjizhi.pre.function.MetricIpProcessWindowFunc;
+import com.zdjizhi.pre.operator.FirstAggregation;
+import com.zdjizhi.pre.operator.SecondAggregationReduce;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
@@ -69,20 +70,20 @@ public class DNSMetricTest {
}
});
- SingleOutputStreamOperator<DnsMetricLog> firstProcess = source.assignTimestampsAndWatermarks(WatermarkStrategy
+ SingleOutputStreamOperator<CnMetricLog> firstProcess = source.assignTimestampsAndWatermarks(WatermarkStrategy
.<CnRecordLog>forBoundedOutOfOrderness(Duration.ofSeconds(60))
.withTimestampAssigner((event, timestamp) -> event.getCommon_recv_time() * 1000)).process(new FirstAggregation()).name("dnsProcess");
- SingleOutputStreamOperator<String> reduce = firstProcess.filter((log) -> log.getMetricKey() == MetricKeyConfig.DNS_SERVER_IP)
- .keyBy(DnsMetricLog::getCommon_server_ip)
- .window(TumblingEventTimeWindows.of(Time.minutes(CommonConfig.DNS_PRE_METRICS_WINDOW_TIME)))
- .reduce(new SecondAggregationReduce(), new MetricDnsServerIpFunc());
+ SingleOutputStreamOperator<String> reduce = firstProcess.filter((log) -> log.getMetricKey() == MetricKeyConfig.SERVER_IP)
+ .keyBy(CnMetricLog::getCommon_server_ip)
+ .window(TumblingEventTimeWindows.of(Time.minutes(CommonConfig.PRE_METRICS_WINDOW_TIME)))
+ .reduce(new SecondAggregationReduce(), new MetricIpProcessWindowFunc("server"));
reduce.addSink(new RichSinkFunction<String>() {
@Override
public void invoke(String value, Context context) throws Exception {
super.invoke(value, context);
System.out.println(value);
- DnsMetricResultLog dnsMetricResultLog = JSON.parseObject(value, DnsMetricResultLog.class);
- assertEquals(dnsMetricResultLog.getQuery_num().intValue(), 72);
+ //DnsMetricResultLog dnsMetricResultLog = JSON.parseObject(value, DnsMetricResultLog.class);
+ //assertEquals(dnsMetricResultLog.getQuery_num().intValue(), 72);
}
});
diff --git a/module-CN-pre-metrics/src/test/java/com.zdjizhi.pre/PreMetricTest.java b/module-CN-pre-metrics/src/test/java/com.zdjizhi.pre/PreMetricTest.java
new file mode 100644
index 0000000..9bab6ec
--- /dev/null
+++ b/module-CN-pre-metrics/src/test/java/com.zdjizhi.pre/PreMetricTest.java
@@ -0,0 +1,93 @@
+package com.zdjizhi.pre;
+
+import cn.hutool.core.io.file.FileReader;
+import com.alibaba.fastjson2.JSON;
+import com.alibaba.fastjson2.JSONArray;
+import com.zdjizhi.base.common.CnRecordLog;
+import com.zdjizhi.base.utils.FlinkEnvironmentUtils;
+import com.zdjizhi.pre.common.CnMetricLog;
+import com.zdjizhi.pre.common.CommonConfig;
+import com.zdjizhi.pre.common.MetricKeyConfig;
+
+
+import com.zdjizhi.pre.common.MetricResultLog;
+import com.zdjizhi.pre.function.MetricIpProcessWindowFunc;
+import com.zdjizhi.pre.operator.FirstAggregation;
+import com.zdjizhi.pre.operator.SecondAggregationReduce;
+import org.apache.flink.api.common.eventtime.WatermarkStrategy;
+import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
+import org.apache.flink.streaming.api.datastream.DataStreamSource;
+import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
+import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
+import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
+import org.apache.flink.streaming.api.windowing.time.Time;
+import org.apache.flink.test.util.MiniClusterWithClientResource;
+import org.junit.ClassRule;
+import org.junit.Test;
+
+import java.io.File;
+import java.time.Duration;
+
+import static org.junit.Assert.assertEquals;
+
+public class PreMetricTest {
+
+ @ClassRule
+ public static MiniClusterWithClientResource flinkCluster =
+ new MiniClusterWithClientResource(
+ new MiniClusterResourceConfiguration.Builder()
+ .setNumberSlotsPerTaskManager(2)
+ .setNumberTaskManagers(1)
+ .build()
+ );
+
+
+ @Test
+ public void jobTest() throws Exception {
+ StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+
+ env.setParallelism(1);
+
+ env.getConfig().setAutoWatermarkInterval(1L);
+
+ DataStreamSource<CnRecordLog> source = env.addSource(new RichSourceFunction<CnRecordLog>() {
+ @Override
+ public void run(SourceContext<CnRecordLog> ctx) throws Exception {
+ String path = Thread.currentThread().getContextClassLoader().getResource("cn-session-pre.json").getPath();
+ FileReader fileReader = FileReader.create(new File(path));
+ String s = fileReader.getReader().readLine();
+ JSONArray objects = JSON.parseArray(s);
+ for (Object object : objects) {
+ CnRecordLog cnRecordLog = JSON.parseObject(object.toString(), CnRecordLog.class);
+ Thread.sleep(100L);
+ ctx.collect(cnRecordLog);
+ }
+ }
+
+ @Override
+ public void cancel() {
+
+ }
+ });
+ SingleOutputStreamOperator<CnMetricLog> firstProcess = source.assignTimestampsAndWatermarks(WatermarkStrategy
+ .<CnRecordLog>forBoundedOutOfOrderness(Duration.ofSeconds(60))
+ .withTimestampAssigner((event, timestamp) -> event.getCommon_recv_time() * 1000)).process(new FirstAggregation()).name("dnsProcess");
+ SingleOutputStreamOperator<String> reduce = firstProcess.filter((log) -> log.getMetricKey() == MetricKeyConfig.SERVER_IP)
+ .keyBy(CnMetricLog::getCommon_server_ip)
+ .window(TumblingEventTimeWindows.of(Time.minutes(CommonConfig.PRE_METRICS_WINDOW_TIME)))
+ .reduce(new SecondAggregationReduce(), new MetricIpProcessWindowFunc("server"));
+ reduce.addSink(new RichSinkFunction<String>() {
+ @Override
+ public void invoke(String value, Context context) throws Exception {
+ super.invoke(value, context);
+ System.out.println(value);
+ MetricResultLog log = JSON.parseObject(value, MetricResultLog.class);
+ assertEquals("", log.getPkt_retrans_ratio().toString(), "0.0909");
+ }
+ });
+
+ env.execute();
+ }
+}
diff --git a/module-CN-pre-metrics/src/test/resources/cn-session-pre.json b/module-CN-pre-metrics/src/test/resources/cn-session-pre.json
new file mode 100644
index 0000000..7dfe788
--- /dev/null
+++ b/module-CN-pre-metrics/src/test/resources/cn-session-pre.json
@@ -0,0 +1 @@
+[{"common_recv_time":1695632102,"common_log_id":1505799256823875584,"common_flags":994762888,"common_start_time":1695621600,"common_end_time":1695621604,"common_con_duration_ms":60821,"common_schema_type":"SSL","common_client_ip":"192.168.88.99","common_server_ip":"3.3.44.66","common_client_port":61166,"common_server_port":443,"common_app_label":"grammarly","common_app_full_path":"sslhttps.http2.grammarly","common_l4_protocol":"IPv4_TCP","common_l7_protocol":"UNCATEGORIZED","common_out_link_id":null,"common_in_link_id":null,"http_host":"","http_url":"","http_cookie":"","http_referer":"","http_user_agent":"","http_request_line":"","http_response_line":"","http_status_code":0,"ssl_sni":"auth.grammarly.com","ssl_version":"","ssl_san":"auth.grammarly.com","ssl_ja3_hash":"596b32d31bb9826fe2bd0a5eefb5ee29","ssl_ja3s_hash":"bfc90d56141386ee83b56cda231cccfc","ssl_cert_issuer":"CN=Amazon RSA 2048 M02;O=Amazon;C=US;;;;","ssl_cert_subject":"CN=www.test.com;;;;;;","dns_qr":0,"dns_opcode":0,"dns_aa":0,"dns_rcode":0,"dns_qname":"","dns_qtype":0,"dns_qclass":0,"dns_sub":0,"dns_rr":"","ssh_version":"","ssh_auth_success":"","ssh_client_version":"","ssh_server_version":"","ssh_cipher_alg":"","ssh_mac_alg":"","ssh_compression_alg":"","ssh_kex_alg":"","ssh_host_key_alg":"","ssh_host_key":"","ssh_hassh":"","stratum_cryptocurrency":"","stratum_mining_pools":"","stratum_mining_program":"","out_link_direction":"","in_link_direction":"","domain":"auth.grammarly.com","domain_sld":"grammarly.com","domain_category_name":"Entertainment and Arts","domain_category_group":"Productivity","domain_reputation_level":"","domain_icp_company_name":"","domain_whois_org":"","domain_tags":"[]","client_zone":"internal","client_country_region":"","client_super_admin_area":"","client_admin_area":"","client_longitude":0.0,"client_latitude":0.0,"client_isp":"","client_asn":"","client_ip_tags":"[]","server_zone":"external","server_country_region":"United States","server_super_admin_area":"","server_admin_area":"","server_longitude":-101.407912,"server_latitude":39.765054,"server_isp":"","server_asn":"","server_ip_tags":"[]","app_category":"general-internet","app_subcategory":"internet-utility","app_company":"","app_company_category":"","app_tags":"[]","common_c2s_pkt_num":17,"common_c2s_byte_num":104857600,"common_s2c_pkt_num":16,"common_s2c_byte_num":104857600,"common_sessions":1,"common_c2s_tcp_lostlen":0,"common_s2c_tcp_lostlen":0,"common_c2s_tcp_unorder_num":0,"common_s2c_tcp_unorder_num":0,"common_c2s_byte_retrans":64,"common_s2c_byte_retrans":0,"common_c2s_pkt_retrans":2,"common_s2c_pkt_retrans":1,"common_establish_latency_ms":360,"http_response_latency_ms":0,"ssl_con_latency_ms":360,"dns_response_latency_ms":0},{"common_recv_time":1695732101,"common_log_id":1505799256823875584,"common_flags":994762888,"common_start_time":1695621600,"common_end_time":1695621604,"common_con_duration_ms":60821,"common_schema_type":"SSL","common_client_ip":"192.168.88.99","common_server_ip":"3.3.44.66","common_client_port":61166,"common_server_port":443,"common_app_label":"grammarly","common_app_full_path":"sslhttps.http2.grammarly","common_l4_protocol":"IPv4_TCP","common_l7_protocol":"UNCATEGORIZED","common_out_link_id":null,"common_in_link_id":null,"http_host":"","http_url":"","http_cookie":"","http_referer":"","http_user_agent":"","http_request_line":"","http_response_line":"","http_status_code":0,"ssl_sni":"auth.grammarly.com","ssl_version":"","ssl_san":"auth.grammarly.com","ssl_ja3_hash":"596b32d31bb9826fe2bd0a5eefb5ee29","ssl_ja3s_hash":"bfc90d56141386ee83b56cda231cccfc","ssl_cert_issuer":"CN=Amazon RSA 2048 M02;O=Amazon;C=US;;;;","ssl_cert_subject":"CN=www.test.com;;;;;;","dns_qr":0,"dns_opcode":0,"dns_aa":0,"dns_rcode":0,"dns_qname":"","dns_qtype":0,"dns_qclass":0,"dns_sub":0,"dns_rr":"","ssh_version":"","ssh_auth_success":"","ssh_client_version":"","ssh_server_version":"","ssh_cipher_alg":"","ssh_mac_alg":"","ssh_compression_alg":"","ssh_kex_alg":"","ssh_host_key_alg":"","ssh_host_key":"","ssh_hassh":"","stratum_cryptocurrency":"","stratum_mining_pools":"","stratum_mining_program":"","out_link_direction":"","in_link_direction":"","domain":"auth.grammarly.com","domain_sld":"grammarly.com","domain_category_name":"Entertainment and Arts","domain_category_group":"Productivity","domain_reputation_level":"","domain_icp_company_name":"","domain_whois_org":"","domain_tags":"[]","client_zone":"internal","client_country_region":"","client_super_admin_area":"","client_admin_area":"","client_longitude":0.0,"client_latitude":0.0,"client_isp":"","client_asn":"","client_ip_tags":"[]","server_zone":"external","server_country_region":"United States","server_super_admin_area":"","server_admin_area":"","server_longitude":-101.407912,"server_latitude":39.765054,"server_isp":"","server_asn":"","server_ip_tags":"[]","app_category":"general-internet","app_subcategory":"internet-utility","app_company":"","app_company_category":"","app_tags":"[]","common_c2s_pkt_num":17,"common_c2s_byte_num":104857600,"common_s2c_pkt_num":16,"common_s2c_byte_num":104857600,"common_sessions":1,"common_c2s_tcp_lostlen":0,"common_s2c_tcp_lostlen":0,"common_c2s_tcp_unorder_num":0,"common_s2c_tcp_unorder_num":0,"common_c2s_byte_retrans":64,"common_s2c_byte_retrans":0,"common_c2s_pkt_retrans":2,"common_s2c_pkt_retrans":1,"common_establish_latency_ms":360,"http_response_latency_ms":0,"ssl_con_latency_ms":360,"dns_response_latency_ms":0},{"common_recv_time":1695732102,"common_log_id":1505799256823875584,"common_flags":994762888,"common_start_time":1695621600,"common_end_time":1695621604,"common_con_duration_ms":60821,"common_schema_type":"SSL","common_client_ip":"192.168.88.99","common_server_ip":"3.3.44.66","common_client_port":61166,"common_server_port":443,"common_app_label":"grammarly","common_app_full_path":"sslhttps.http2.grammarly","common_l4_protocol":"IPv4_TCP","common_l7_protocol":"UNCATEGORIZED","common_out_link_id":null,"common_in_link_id":null,"http_host":"","http_url":"","http_cookie":"","http_referer":"","http_user_agent":"","http_request_line":"","http_response_line":"","http_status_code":0,"ssl_sni":"auth.grammarly.com","ssl_version":"","ssl_san":"auth.grammarly.com","ssl_ja3_hash":"596b32d31bb9826fe2bd0a5eefb5ee29","ssl_ja3s_hash":"bfc90d56141386ee83b56cda231cccfc","ssl_cert_issuer":"CN=Amazon RSA 2048 M02;O=Amazon;C=US;;;;","ssl_cert_subject":"CN=www.test.com;;;;;;","dns_qr":0,"dns_opcode":0,"dns_aa":0,"dns_rcode":0,"dns_qname":"","dns_qtype":0,"dns_qclass":0,"dns_sub":0,"dns_rr":"","ssh_version":"","ssh_auth_success":"","ssh_client_version":"","ssh_server_version":"","ssh_cipher_alg":"","ssh_mac_alg":"","ssh_compression_alg":"","ssh_kex_alg":"","ssh_host_key_alg":"","ssh_host_key":"","ssh_hassh":"","stratum_cryptocurrency":"","stratum_mining_pools":"","stratum_mining_program":"","out_link_direction":"","in_link_direction":"","domain":"auth.grammarly.com","domain_sld":"grammarly.com","domain_category_name":"Entertainment and Arts","domain_category_group":"Productivity","domain_reputation_level":"","domain_icp_company_name":"","domain_whois_org":"","domain_tags":"[]","client_zone":"internal","client_country_region":"","client_super_admin_area":"","client_admin_area":"","client_longitude":0.0,"client_latitude":0.0,"client_isp":"","client_asn":"","client_ip_tags":"[]","server_zone":"external","server_country_region":"United States","server_super_admin_area":"","server_admin_area":"","server_longitude":-101.407912,"server_latitude":39.765054,"server_isp":"","server_asn":"","server_ip_tags":"[]","app_category":"general-internet","app_subcategory":"internet-utility","app_company":"","app_company_category":"","app_tags":"[]","common_c2s_pkt_num":17,"common_c2s_byte_num":104857600,"common_s2c_pkt_num":16,"common_s2c_byte_num":104857600,"common_sessions":1,"common_c2s_tcp_lostlen":0,"common_s2c_tcp_lostlen":0,"common_c2s_tcp_unorder_num":0,"common_s2c_tcp_unorder_num":0,"common_c2s_byte_retrans":64,"common_s2c_byte_retrans":0,"common_c2s_pkt_retrans":2,"common_s2c_pkt_retrans":1,"common_establish_latency_ms":360,"http_response_latency_ms":0,"ssl_con_latency_ms":360,"dns_response_latency_ms":0}] \ No newline at end of file