diff options
| author | wanglihui <[email protected]> | 2023-02-22 14:05:02 +0800 |
|---|---|---|
| committer | wanglihui <[email protected]> | 2023-02-22 14:05:02 +0800 |
| commit | 1eae3d75d12e46062a91fa1ae1ee33c9fa317d55 (patch) | |
| tree | 2c866a9fb1430f7449bd3d4b82fb8c4b96fd1277 | |
| parent | fcf95a0f28e04052bd8960b72995ff147b9c27b2 (diff) | |
| parent | 332be4a33b0c07ef2f3cf6bf348a13ea60a4554a (diff) | |
Merge branch '22.12.rc1' into 22.1222.12
# Conflicts:
# platform-base/src/main/resources/common.properties
# platform-etl/src/main/java/com/zdjizhi/etl/utils/FcUtils.java
8 files changed, 368 insertions, 137 deletions
diff --git a/module-CN-pre-metrics/src/main/java/com/zdjizhi/pre/CnPreMetric.java b/module-CN-pre-metrics/src/main/java/com/zdjizhi/pre/CnPreMetric.java index 55383af..0a9d504 100644 --- a/module-CN-pre-metrics/src/main/java/com/zdjizhi/pre/CnPreMetric.java +++ b/module-CN-pre-metrics/src/main/java/com/zdjizhi/pre/CnPreMetric.java @@ -13,161 +13,166 @@ import com.zdjizhi.utils.StringUtil; import org.apache.flink.api.java.functions.KeySelector; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.api.java.tuple.Tuple3; -import org.apache.flink.api.java.tuple.Tuple6; import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator; import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows; import org.apache.flink.streaming.api.windowing.time.Time; import java.math.RoundingMode; import java.text.DecimalFormat; +import java.util.Random; public class CnPreMetric implements Schedule { @Override public void schedule() throws Exception { + Random random = new Random(); SingleOutputStreamOperator<CnRecordLog> source = CnRecordEtl.singleOutputStreamOperator; - SingleOutputStreamOperator<CnMetricLog> process = source.keyBy(CnRecordLog::getCommon_server_ip) - .window(TumblingEventTimeWindows.of(Time.minutes(CommonConfig.PRE_METRICS_WINDOW_TIME))) + SingleOutputStreamOperator<CnMetricLog> process = source.keyBy(new KeySelector<CnRecordLog, String>() { + @Override + public String getKey(CnRecordLog value) throws Exception { + int nextInt = random.nextInt(com.zdjizhi.base.common.CommonConfig.STREAM_EXECUTION_ENVIRONMENT_PARALLELISM); + return Integer.toString(nextInt); + } + }).window(TumblingEventTimeWindows.of(Time.minutes(CommonConfig.PRE_METRICS_WINDOW_TIME))) .process(new FirstAggregationProcess()); //metric_ip - SingleOutputStreamOperator<String> clientIpMetric = process.filter((log) -> StringUtil.isNotEmpty(log.getCommon_client_ip())) - .keyBy(CnMetricLog::getCommon_client_ip) - .window(TumblingEventTimeWindows.of(Time.minutes(CommonConfig.PRE_METRICS_WINDOW_TIME))) - .reduce(new SecondAggregationReduce(), new MetricIpProcessWindowFunc("client")); + SingleOutputStreamOperator<String> clientIpMetric = process.filter((log) -> StringUtil.isNotEmpty(log.getCommon_client_ip())) + .keyBy(CnMetricLog::getCommon_client_ip) + .window(TumblingEventTimeWindows.of(Time.minutes(CommonConfig.PRE_METRICS_WINDOW_TIME))) + .reduce(new SecondAggregationReduce(), new MetricIpProcessWindowFunc("client")); clientIpMetric.addSink(KafkaUtils.getKafkaSink(CommonConfig.METRIC_IP_TOPIC)).setParallelism(CommonConfig.METRIC_OUTPUT_PARALLELISM); - SingleOutputStreamOperator<String> serverIpMetric = process.filter((log) -> StringUtil.isNotEmpty(log.getCommon_server_ip())) - .keyBy(CnMetricLog::getCommon_server_ip) - .window(TumblingEventTimeWindows.of(Time.minutes(CommonConfig.PRE_METRICS_WINDOW_TIME))) - .reduce(new SecondAggregationReduce(), new MetricIpProcessWindowFunc("server")); + SingleOutputStreamOperator<String> serverIpMetric = process.filter((log) -> StringUtil.isNotEmpty(log.getCommon_server_ip())) + .keyBy(CnMetricLog::getCommon_server_ip) + .window(TumblingEventTimeWindows.of(Time.minutes(CommonConfig.PRE_METRICS_WINDOW_TIME))) + .reduce(new SecondAggregationReduce(), new MetricIpProcessWindowFunc("server")); serverIpMetric.addSink(KafkaUtils.getKafkaSink(CommonConfig.METRIC_IP_TOPIC)).setParallelism(CommonConfig.METRIC_OUTPUT_PARALLELISM); //metric_region - SingleOutputStreamOperator<String> clientRegionMetric = process - .filter((log) -> StringUtil.isNotEmpty(log.getClient_country()) || StringUtil.isNotEmpty(log.getClient_province()) || StringUtil.isNotEmpty(log.getClient_region())) - .keyBy(new KeySelector<CnMetricLog, Tuple3<String,String,String>>() { - @Override - public Tuple3<String, String, String> getKey(CnMetricLog value) throws Exception { - return Tuple3.of(value.getClient_country(),value.getClient_province(),value.getClient_region()); - } - }) - .window(TumblingEventTimeWindows.of(Time.minutes(CommonConfig.PRE_METRICS_WINDOW_TIME))) - .reduce(new SecondAggregationReduce(), new MetricRegionProcessWindowFunc("client")); + SingleOutputStreamOperator<String> clientRegionMetric = process + .filter((log) -> StringUtil.isNotEmpty(log.getClient_country()) || StringUtil.isNotEmpty(log.getClient_province()) || StringUtil.isNotEmpty(log.getClient_region())) + .keyBy(new KeySelector<CnMetricLog, Tuple3<String, String, String>>() { + @Override + public Tuple3<String, String, String> getKey(CnMetricLog value) throws Exception { + return Tuple3.of(value.getClient_country(), value.getClient_province(), value.getClient_region()); + } + }) + .window(TumblingEventTimeWindows.of(Time.minutes(CommonConfig.PRE_METRICS_WINDOW_TIME))) + .reduce(new SecondAggregationReduce(), new MetricRegionProcessWindowFunc("client")); clientRegionMetric.addSink(KafkaUtils.getKafkaSink(CommonConfig.METRIC_REGION_TOPIC)).setParallelism(CommonConfig.METRIC_OUTPUT_PARALLELISM); - SingleOutputStreamOperator<String> serverRegionMetric = process - .filter((log) -> StringUtil.isNotEmpty(log.getServer_country()) || StringUtil.isNotEmpty(log.getServer_province()) || StringUtil.isNotEmpty(log.getServer_region())) - .keyBy(new KeySelector<CnMetricLog, Tuple3<String,String,String>>() { - @Override - public Tuple3<String, String, String> getKey(CnMetricLog value) throws Exception { - return Tuple3.of(value.getServer_country(),value.getServer_province(),value.getServer_region()); - } - }) - .window(TumblingEventTimeWindows.of(Time.minutes(CommonConfig.PRE_METRICS_WINDOW_TIME))) - .reduce(new SecondAggregationReduce(), new MetricRegionProcessWindowFunc("server")); + SingleOutputStreamOperator<String> serverRegionMetric = process + .filter((log) -> StringUtil.isNotEmpty(log.getServer_country()) || StringUtil.isNotEmpty(log.getServer_province()) || StringUtil.isNotEmpty(log.getServer_region())) + .keyBy(new KeySelector<CnMetricLog, Tuple3<String, String, String>>() { + @Override + public Tuple3<String, String, String> getKey(CnMetricLog value) throws Exception { + return Tuple3.of(value.getServer_country(), value.getServer_province(), value.getServer_region()); + } + }) + .window(TumblingEventTimeWindows.of(Time.minutes(CommonConfig.PRE_METRICS_WINDOW_TIME))) + .reduce(new SecondAggregationReduce(), new MetricRegionProcessWindowFunc("server")); serverRegionMetric.addSink(KafkaUtils.getKafkaSink(CommonConfig.METRIC_REGION_TOPIC)).setParallelism(CommonConfig.METRIC_OUTPUT_PARALLELISM); // //metric_asn - SingleOutputStreamOperator<String> clientAsnMetric = process - .filter((log) -> StringUtil.isNotEmpty(log.getClient_asn()) && StringUtil.isNotEmpty(log.getClient_isp())) - .keyBy(new KeySelector<CnMetricLog, Tuple2<String,String>>() { - @Override - public Tuple2<String, String> getKey(CnMetricLog value) throws Exception { - return Tuple2.of(value.getClient_asn(),value.getClient_isp()); - } - }) - .window(TumblingEventTimeWindows.of(Time.minutes(CommonConfig.PRE_METRICS_WINDOW_TIME))) - .reduce(new SecondAggregationReduce(), new MetricAsnProcessFunc("client")); + SingleOutputStreamOperator<String> clientAsnMetric = process + .filter((log) -> StringUtil.isNotEmpty(log.getClient_asn()) && StringUtil.isNotEmpty(log.getClient_isp())) + .keyBy(new KeySelector<CnMetricLog, Tuple2<String, String>>() { + @Override + public Tuple2<String, String> getKey(CnMetricLog value) throws Exception { + return Tuple2.of(value.getClient_asn(), value.getClient_isp()); + } + }) + .window(TumblingEventTimeWindows.of(Time.minutes(CommonConfig.PRE_METRICS_WINDOW_TIME))) + .reduce(new SecondAggregationReduce(), new MetricAsnProcessFunc("client")); clientAsnMetric.addSink(KafkaUtils.getKafkaSink(CommonConfig.METRIC_ASN_TOPIC)).setParallelism(CommonConfig.METRIC_OUTPUT_PARALLELISM); - SingleOutputStreamOperator<String> serverAsnMetric = process - .filter((log) -> StringUtil.isNotEmpty(log.getServer_asn()) && StringUtil.isNotEmpty(log.getServer_isp())) - .keyBy(new KeySelector<CnMetricLog, Tuple2<String,String>>() { - @Override - public Tuple2<String, String> getKey(CnMetricLog value) throws Exception { - return Tuple2.of(value.getServer_asn(),value.getServer_isp()); - } - }) - .window(TumblingEventTimeWindows.of(Time.minutes(CommonConfig.PRE_METRICS_WINDOW_TIME))) - .reduce(new SecondAggregationReduce(), new MetricAsnProcessFunc("server")); + SingleOutputStreamOperator<String> serverAsnMetric = process + .filter((log) -> StringUtil.isNotEmpty(log.getServer_asn()) && StringUtil.isNotEmpty(log.getServer_isp())) + .keyBy(new KeySelector<CnMetricLog, Tuple2<String, String>>() { + @Override + public Tuple2<String, String> getKey(CnMetricLog value) throws Exception { + return Tuple2.of(value.getServer_asn(), value.getServer_isp()); + } + }) + .window(TumblingEventTimeWindows.of(Time.minutes(CommonConfig.PRE_METRICS_WINDOW_TIME))) + .reduce(new SecondAggregationReduce(), new MetricAsnProcessFunc("server")); serverAsnMetric.addSink(KafkaUtils.getKafkaSink(CommonConfig.METRIC_ASN_TOPIC)).setParallelism(CommonConfig.METRIC_OUTPUT_PARALLELISM); //metric_idc_renter - SingleOutputStreamOperator<String> clientIdcMetric = process - .filter((log) -> StringUtil.isNotEmpty(log.getClient_idc_renter())) - .keyBy(CnMetricLog::getClient_idc_renter) - .window(TumblingEventTimeWindows.of(Time.minutes(CommonConfig.PRE_METRICS_WINDOW_TIME))) - .reduce(new SecondAggregationReduce(), new MetricIdcProcessFunc("client")); + SingleOutputStreamOperator<String> clientIdcMetric = process + .filter((log) -> StringUtil.isNotEmpty(log.getClient_idc_renter())) + .keyBy(CnMetricLog::getClient_idc_renter) + .window(TumblingEventTimeWindows.of(Time.minutes(CommonConfig.PRE_METRICS_WINDOW_TIME))) + .reduce(new SecondAggregationReduce(), new MetricIdcProcessFunc("client")); clientIdcMetric.addSink(KafkaUtils.getKafkaSink(CommonConfig.METRIC_IDC_RENTER_TOPIC)).setParallelism(CommonConfig.METRIC_OUTPUT_PARALLELISM); - SingleOutputStreamOperator<String> serverIdcMetric = process - .filter((log) -> StringUtil.isNotEmpty(log.getServer_idc_renter())) - .keyBy(CnMetricLog::getServer_idc_renter) - .window(TumblingEventTimeWindows.of(Time.minutes(CommonConfig.PRE_METRICS_WINDOW_TIME))) - .reduce(new SecondAggregationReduce(), new MetricIdcProcessFunc("server")); + SingleOutputStreamOperator<String> serverIdcMetric = process + .filter((log) -> StringUtil.isNotEmpty(log.getServer_idc_renter())) + .keyBy(CnMetricLog::getServer_idc_renter) + .window(TumblingEventTimeWindows.of(Time.minutes(CommonConfig.PRE_METRICS_WINDOW_TIME))) + .reduce(new SecondAggregationReduce(), new MetricIdcProcessFunc("server")); serverIdcMetric.addSink(KafkaUtils.getKafkaSink(CommonConfig.METRIC_IDC_RENTER_TOPIC)).setParallelism(CommonConfig.METRIC_OUTPUT_PARALLELISM); //metric_application - SingleOutputStreamOperator<String> appMetric = process - .filter((log) -> StringUtil.isNotEmpty(log.getCommon_app_label())) - .keyBy(CnMetricLog::getCommon_app_label) - .window(TumblingEventTimeWindows.of(Time.minutes(CommonConfig.PRE_METRICS_WINDOW_TIME))) - .reduce(new SecondAggregationReduce(), new MetricAppProcessFunc()); + SingleOutputStreamOperator<String> appMetric = process + .filter((log) -> StringUtil.isNotEmpty(log.getCommon_app_label())) + .keyBy(CnMetricLog::getCommon_app_label) + .window(TumblingEventTimeWindows.of(Time.minutes(CommonConfig.PRE_METRICS_WINDOW_TIME))) + .reduce(new SecondAggregationReduce(), new MetricAppProcessFunc()); appMetric.addSink(KafkaUtils.getKafkaSink(CommonConfig.METRIC_APPLICATION_TOPIC)).setParallelism(CommonConfig.METRIC_OUTPUT_PARALLELISM); //metric_domain - SingleOutputStreamOperator<String> domainMetric = process - .filter((log) -> StringUtil.isNotEmpty(log.getDomain())) - .keyBy(CnMetricLog::getDomain) - .window(TumblingEventTimeWindows.of(Time.minutes(CommonConfig.PRE_METRICS_WINDOW_TIME))) - .reduce(new SecondAggregationReduce(), new MetricDomainProcessFunc()); + SingleOutputStreamOperator<String> domainMetric = process + .filter((log) -> StringUtil.isNotEmpty(log.getDomain())) + .keyBy(CnMetricLog::getDomain) + .window(TumblingEventTimeWindows.of(Time.minutes(CommonConfig.PRE_METRICS_WINDOW_TIME))) + .reduce(new SecondAggregationReduce(), new MetricDomainProcessFunc()); domainMetric.addSink(KafkaUtils.getKafkaSink(CommonConfig.METRIC_DOMAIN_TOPIC)).setParallelism(CommonConfig.METRIC_OUTPUT_PARALLELISM); //metric_http_host - SingleOutputStreamOperator<String> httpHostMetric = process - .filter((log) -> StringUtil.isNotEmpty(log.getHttp_host())) - .keyBy(CnMetricLog::getHttp_host) - .window(TumblingEventTimeWindows.of(Time.minutes(CommonConfig.PRE_METRICS_WINDOW_TIME))) - .reduce(new SecondAggregationReduce(), new MetricHostProcessFunc()); + SingleOutputStreamOperator<String> httpHostMetric = process + .filter((log) -> StringUtil.isNotEmpty(log.getHttp_host())) + .keyBy(CnMetricLog::getHttp_host) + .window(TumblingEventTimeWindows.of(Time.minutes(CommonConfig.PRE_METRICS_WINDOW_TIME))) + .reduce(new SecondAggregationReduce(), new MetricHostProcessFunc()); httpHostMetric.addSink(KafkaUtils.getKafkaSink(CommonConfig.METRIC_HTTP_HOST_TOPIC)).setParallelism(CommonConfig.METRIC_OUTPUT_PARALLELISM); //metric_ssl_sni - SingleOutputStreamOperator<String> sslSniMetric = process - .filter((log) -> StringUtil.isNotEmpty(log.getSsl_sni())) - .keyBy(CnMetricLog::getSsl_sni) - .window(TumblingEventTimeWindows.of(Time.minutes(CommonConfig.PRE_METRICS_WINDOW_TIME))) - .reduce(new SecondAggregationReduce(), new MetricSslProcessFunc()); + SingleOutputStreamOperator<String> sslSniMetric = process + .filter((log) -> StringUtil.isNotEmpty(log.getSsl_sni())) + .keyBy(CnMetricLog::getSsl_sni) + .window(TumblingEventTimeWindows.of(Time.minutes(CommonConfig.PRE_METRICS_WINDOW_TIME))) + .reduce(new SecondAggregationReduce(), new MetricSslProcessFunc()); sslSniMetric.addSink(KafkaUtils.getKafkaSink(CommonConfig.METRIC_SSL_SNI_TOPIC)).setParallelism(CommonConfig.METRIC_OUTPUT_PARALLELISM); //metric_protocol - SingleOutputStreamOperator<String> protocolMetric = process - .filter((log) -> StringUtil.isNotEmpty(log.getCommon_l7_protocol()) && StringUtil.isNotEmpty(log.getCommon_server_port())) - .keyBy(new KeySelector<CnMetricLog, Tuple2<String,Integer>>() { - @Override - public Tuple2<String, Integer> getKey(CnMetricLog value) throws Exception { - return Tuple2.of(value.getCommon_l7_protocol(),value.getCommon_server_port()); - } - }) - .window(TumblingEventTimeWindows.of(Time.minutes(CommonConfig.PRE_METRICS_WINDOW_TIME))) - .reduce(new SecondAggregationReduce(), new MetricProtocolProcessFunc()); + SingleOutputStreamOperator<String> protocolMetric = process + .filter((log) -> StringUtil.isNotEmpty(log.getCommon_l7_protocol()) && StringUtil.isNotEmpty(log.getCommon_server_port())) + .keyBy(new KeySelector<CnMetricLog, Tuple2<String, Integer>>() { + @Override + public Tuple2<String, Integer> getKey(CnMetricLog value) throws Exception { + return Tuple2.of(value.getCommon_l7_protocol(), value.getCommon_server_port()); + } + }) + .window(TumblingEventTimeWindows.of(Time.minutes(CommonConfig.PRE_METRICS_WINDOW_TIME))) + .reduce(new SecondAggregationReduce(), new MetricProtocolProcessFunc()); protocolMetric.addSink(KafkaUtils.getKafkaSink(CommonConfig.METRIC_PROTOCOL_TOPIC)).setParallelism(CommonConfig.METRIC_OUTPUT_PARALLELISM); //metric_link SingleOutputStreamOperator<String> linkMetric = process - .keyBy(new KeySelector<CnMetricLog, Tuple2<Long, Long>>() { - @Override - public Tuple2<Long, Long> getKey(CnMetricLog value) throws Exception { - return Tuple2.of(value.getCommon_ingress_link_id(),value.getCommon_egress_link_id()); - } - }) - .window(TumblingEventTimeWindows.of(Time.minutes(CommonConfig.PRE_METRICS_WINDOW_TIME))) - .reduce(new SecondAggregationReduce(), new MetricLinkProcessFunc()); + .keyBy(new KeySelector<CnMetricLog, Tuple2<Long, Long>>() { + @Override + public Tuple2<Long, Long> getKey(CnMetricLog value) throws Exception { + return Tuple2.of(value.getCommon_ingress_link_id(), value.getCommon_egress_link_id()); + } + }) + .window(TumblingEventTimeWindows.of(Time.minutes(CommonConfig.PRE_METRICS_WINDOW_TIME))) + .reduce(new SecondAggregationReduce(), new MetricLinkProcessFunc()); // linkMetric.print(); - linkMetric.addSink(KafkaUtils.getKafkaSink(CommonConfig.METRIC_LINK_TOPIC)).setParallelism(CommonConfig.METRIC_OUTPUT_PARALLELISM); - -} + linkMetric.addSink(KafkaUtils.getKafkaSink(CommonConfig.METRIC_LINK_TOPIC)).setParallelism(CommonConfig.METRIC_OUTPUT_PARALLELISM); + } public static void main(String[] args) throws Exception { @@ -176,9 +181,14 @@ public class CnPreMetric implements Schedule { float number = (float) 0L / 1L; String format = df.format(number); System.out.println(format); - System.out.println("Float.isInfinite:"+Float.isInfinite(number)); - System.out.println("Float.isNaN:"+Float.isNaN(number)); - System.out.println("Float.isFinite:"+Float.isFinite(number)); + System.out.println("Float.isInfinite:" + Float.isInfinite(number)); + System.out.println("Float.isNaN:" + Float.isNaN(number)); + System.out.println("Float.isFinite:" + Float.isFinite(number)); + + + Random random = new Random(); + int nextInt = random.nextInt(1); + System.out.println(Integer.toString(nextInt)); // System.out.println(Float.parseFloat(format)); diff --git a/module-CN-pre-metrics/src/main/resources/pre-metrics.properties b/module-CN-pre-metrics/src/main/resources/pre-metrics.properties index 88d572c..094cf07 100644 --- a/module-CN-pre-metrics/src/main/resources/pre-metrics.properties +++ b/module-CN-pre-metrics/src/main/resources/pre-metrics.properties @@ -5,27 +5,28 @@ pre.metrics.window.time=1 pre.metrics.round.scale=4 #metric topic名 -metric.ip.topic=METRIC-IP -metric.region.topic=METRIC-REGION -metric.asn.topic=METRIC-ASN -metric.idc.renter.topic=METRIC-IDC-RENTER -metric.application.topic=METRIC-APPLICATION -metric.domain.topic=METRIC-DOMAIN -metric.http.host.topic=METRIC-HTTP-HOST -metric.ssl.sni.topic=METRIC-SSL-SNI -metric.protocol.topic=METRIC-PROTOCOL -metric.link.topic=METRIC-LINK +#metric.ip.topic=METRIC-IP +#metric.region.topic=METRIC-REGION +#metric.asn.topic=METRIC-ASN +#metric.idc.renter.topic=METRIC-IDC-RENTER +#metric.application.topic=METRIC-APPLICATION +#metric.domain.topic=METRIC-DOMAIN +#metric.http.host.topic=METRIC-HTTP-HOST +#metric.ssl.sni.topic=METRIC-SSL-SNI +#metric.protocol.topic=METRIC-PROTOCOL +#metric.link.topic=METRIC-LINK -#metric.ip.topic=METRIC-IP-TEST -#metric.region.topic=METRIC-REGION-TEST -#metric.asn.topic=METRIC-ASN-TEST -#metric.idc.renter.topic=METRIC-IDC-RENTER-TEST -#metric.application.topic=METRIC-APPLICATION-TEST -#metric.domain.topic=METRIC-DOMAIN-TEST -#metric.http.host.topic=METRIC-HTTP-HOST-TEST -#metric.ssl.sni.topic=METRIC-SSL-SNI-TEST -#metric.protocol.topic=METRIC-PROTOCOL-TEST +metric.ip.topic=METRIC-IP-TEST +metric.region.topic=METRIC-REGION-TEST +metric.asn.topic=METRIC-ASN-TEST +metric.idc.renter.topic=METRIC-IDC-RENTER-TEST +metric.application.topic=METRIC-APPLICATION-TEST +metric.domain.topic=METRIC-DOMAIN-TEST +metric.http.host.topic=METRIC-HTTP-HOST-TEST +metric.ssl.sni.topic=METRIC-SSL-SNI-TEST +metric.protocol.topic=METRIC-PROTOCOL-TEST +metric.link.topic=METRIC-LINK-TEST #metric输出并行度设置 diff --git a/module-DNS-pre-metrics/src/main/java/com/zdjizhi/dns/pre/DnsPreMetric.java b/module-DNS-pre-metrics/src/main/java/com/zdjizhi/dns/pre/DnsPreMetric.java index f71bf9a..6cda972 100644 --- a/module-DNS-pre-metrics/src/main/java/com/zdjizhi/dns/pre/DnsPreMetric.java +++ b/module-DNS-pre-metrics/src/main/java/com/zdjizhi/dns/pre/DnsPreMetric.java @@ -10,18 +10,28 @@ import com.zdjizhi.dns.pre.process.FirstAggregationProcess; import com.zdjizhi.dns.pre.process.SecondAggregationReduce; import com.zdjizhi.etl.CnRecordEtl; import com.zdjizhi.utils.StringUtil; +import org.apache.flink.api.java.functions.KeySelector; import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator; import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows; import org.apache.flink.streaming.api.windowing.time.Time; +import java.util.Random; + public class DnsPreMetric implements Schedule { @Override public void schedule() throws Exception { + Random random = new Random(); SingleOutputStreamOperator<CnRecordLog> source = CnRecordEtl.singleOutputStreamOperator; - SingleOutputStreamOperator<DnsMetricLog> firstProcess = source.filter((log)->"DNS".equals(log.getCommon_schema_type())) - .keyBy(CnRecordLog::getCommon_server_ip) + SingleOutputStreamOperator<DnsMetricLog> firstProcess = source.filter((log) -> "DNS".equals(log.getCommon_schema_type())) + .keyBy(new KeySelector<CnRecordLog, String>() { + @Override + public String getKey(CnRecordLog value) throws Exception { + int nextInt = random.nextInt(com.zdjizhi.base.common.CommonConfig.STREAM_EXECUTION_ENVIRONMENT_PARALLELISM); + return Integer.toString(nextInt); + } + }) .window(TumblingEventTimeWindows.of(Time.minutes(CommonConfig.DNS_PRE_METRICS_WINDOW_TIME))) .process(new FirstAggregationProcess()); diff --git a/module-DNS-pre-metrics/src/main/resources/dns_pre_metrics.properties b/module-DNS-pre-metrics/src/main/resources/dns_pre_metrics.properties index 3fa239c..058c28d 100644 --- a/module-DNS-pre-metrics/src/main/resources/dns_pre_metrics.properties +++ b/module-DNS-pre-metrics/src/main/resources/dns_pre_metrics.properties @@ -5,15 +5,21 @@ dns.pre.metrics.window.time=1 dns.pre.metrics.round.scale=4 #dns metric topic -dns.metric.server.ip.topic=METRIC-DNS-SERVER-IP -dns.metric.qname.topic=METRIC-DNS-QNAME -dns.metric.qtype.topic=METRIC-DNS-QTYPE -dns.metric.rcode.topic=METRIC-DNS-RCODE -dns.metric.rra.topic=METRIC-DNS-RR-A -dns.metric.rraaaa.topic=METRIC-DNS-RR-AAAA -dns.metric.rrcname.topic=METRIC-DNS-RR-CNAME - +#dns.metric.server.ip.topic=METRIC-DNS-SERVER-IP +#dns.metric.qname.topic=METRIC-DNS-QNAME +#dns.metric.qtype.topic=METRIC-DNS-QTYPE +#dns.metric.rcode.topic=METRIC-DNS-RCODE +#dns.metric.rra.topic=METRIC-DNS-RR-A +#dns.metric.rraaaa.topic=METRIC-DNS-RR-AAAA +#dns.metric.rrcname.topic=METRIC-DNS-RR-CNAME +dns.metric.server.ip.topic=METRIC-DNS-SERVER-IP-TEST +dns.metric.qname.topic=METRIC-DNS-QNAME-TEST +dns.metric.qtype.topic=METRIC-DNS-QTYPE-TEST +dns.metric.rcode.topic=METRIC-DNS-RCODE-TEST +dns.metric.rra.topic=METRIC-DNS-RR-A-TEST +dns.metric.rraaaa.topic=METRIC-DNS-RR-AAAA-TEST +dns.metric.rrcname.topic=METRIC-DNS-RR-CNAME-TEST #metric输出并行度设置 dns.metric.output.parallelism=1
\ No newline at end of file diff --git a/platform-etl/src/main/java/com/zdjizhi/etl/common/DomainCategoryReputation.java b/platform-etl/src/main/java/com/zdjizhi/etl/common/DomainCategoryReputation.java index 10bee28..bc3615e 100644 --- a/platform-etl/src/main/java/com/zdjizhi/etl/common/DomainCategoryReputation.java +++ b/platform-etl/src/main/java/com/zdjizhi/etl/common/DomainCategoryReputation.java @@ -7,6 +7,16 @@ public class DomainCategoryReputation { private String reputationLevel; private long reputationScore; + private String match_pattern; + + public String getMatch_pattern() { + return match_pattern; + } + + public void setMatch_pattern(String match_pattern) { + this.match_pattern = match_pattern; + } + public String getFqdn() { return fqdn; } @@ -46,4 +56,15 @@ public class DomainCategoryReputation { public void setReputationScore(long reputationScore) { this.reputationScore = reputationScore; } + + @Override + public String toString() { + return "DomainCategoryReputation{" + + "fqdn='" + fqdn + '\'' + + ", categoryName='" + categoryName + '\'' + + ", categoryGroup='" + categoryGroup + '\'' + + ", reputationLevel='" + reputationLevel + '\'' + + ", reputationScore=" + reputationScore + + '}'; + } } diff --git a/platform-etl/src/main/java/com/zdjizhi/etl/utils/DnsServerTest.java b/platform-etl/src/main/java/com/zdjizhi/etl/utils/DnsServerTest.java new file mode 100644 index 0000000..3d81217 --- /dev/null +++ b/platform-etl/src/main/java/com/zdjizhi/etl/utils/DnsServerTest.java @@ -0,0 +1,183 @@ +package com.zdjizhi.etl.utils; + +import com.opencsv.CSVParser; +import com.opencsv.CSVReader; +import com.zdjizhi.base.common.CommonConfig; +import com.zdjizhi.base.utils.ESSinkUtils; +import org.apache.http.HttpHost; +import org.elasticsearch.action.bulk.BulkRequest; +import org.elasticsearch.action.update.UpdateRequest; +import org.elasticsearch.client.*; +import org.elasticsearch.common.xcontent.XContentBuilder; +import org.elasticsearch.common.xcontent.XContentFactory; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.*; +import java.nio.charset.StandardCharsets; +import java.util.List; + +public class DnsServerTest { + private static Logger LOG = LoggerFactory.getLogger(DnsServerTest.class); + + public static void main(String[] args) throws Exception { + switch (CommonConfig.CN_RECORD_PARALLELISM) { + case 1: + setDnsByOprdns(); + break; + case 2: + setDnsByDoh(); + break; + case 3: + setdomain(); + break; + default: + throw new RuntimeException(); + } + + } + + public static void setDnsByOprdns() throws Exception { + DataInputStream in = new DataInputStream(new FileInputStream(new File("D:\\data\\Public DNS Server List.txt"))); +// DataInputStream in = new DataInputStream(new FileInputStream(new File(CommonConfig.WEBSKT_PATH))); + CSVReader csvReader = new CSVReader(new InputStreamReader(in, StandardCharsets.UTF_8), CSVParser.DEFAULT_SEPARATOR, + CSVParser.DEFAULT_QUOTE_CHARACTER, CSVParser.DEFAULT_ESCAPE_CHARACTER, 1); + +// List<HttpHost> esAddresses = ESSinkUtils.getEsAddresses(CommonConfig.ES_HOST); + List<HttpHost> esAddresses = ESSinkUtils.getEsAddresses("192.168.40.73:9200"); + RestClientBuilder lowLevelRestClient = RestClient.builder(esAddresses.toArray(new HttpHost[0])); + + RestHighLevelClient client = new RestHighLevelClient(lowLevelRestClient); + + BulkRequest bulkRequest = new BulkRequest(); + int i = 0; + + for (String[] strs : csvReader) { + i += 1; + String key = strs[0]; + XContentBuilder builder = XContentFactory.jsonBuilder() + .startObject() + .field("id", key) + .field("entity_type", "ip") + .field("found_time", System.currentTimeMillis() / 1000) + .field("update_time", System.currentTimeMillis() / 1000) + .field("ip_addr", key) + .field("dns_server_role", "OPRDNS") + .field("doh_support", false) + .endObject(); + UpdateRequest source = new UpdateRequest("entity_info", key) + .doc(builder) + .upsert(builder) + .id(key); + bulkRequest.add(source); + + if (i >= com.zdjizhi.etl.common.CommonConfig.IP_INTERNAL_OR_EXTERNAL_PATTERN) { + client.bulk(bulkRequest, RequestOptions.DEFAULT); + LOG.info("写入{}", i); + i = 0; + } + } + csvReader.close(); + + client.bulk(bulkRequest, RequestOptions.DEFAULT); + LOG.info("写入完毕"); + client.close(); + + } + + public static void setDnsByDoh() throws Exception { + DataInputStream in = new DataInputStream(new FileInputStream(new File("D:\\data\\Public DoH Server List.txt"))); +// DataInputStream in = new DataInputStream(new FileInputStream(new File(CommonConfig.WEBSKT_PATH))); + CSVReader csvReader = new CSVReader(new InputStreamReader(in, StandardCharsets.UTF_8), CSVParser.DEFAULT_SEPARATOR, + CSVParser.DEFAULT_QUOTE_CHARACTER, CSVParser.DEFAULT_ESCAPE_CHARACTER, 1); + +// List<HttpHost> esAddresses = ESSinkUtils.getEsAddresses(CommonConfig.ES_HOST); + List<HttpHost> esAddresses = ESSinkUtils.getEsAddresses("192.168.40.73:9200"); + RestClientBuilder lowLevelRestClient = RestClient.builder(esAddresses.toArray(new HttpHost[0])); + + RestHighLevelClient client = new RestHighLevelClient(lowLevelRestClient); + + BulkRequest bulkRequest = new BulkRequest(); + int i = 0; + + for (String[] strs : csvReader) { + i += 1; + String key = strs[0]; + XContentBuilder builder = XContentFactory.jsonBuilder() + .startObject() + .field("id", key) + .field("entity_type", "ip") + .field("found_time", System.currentTimeMillis() / 1000) + .field("update_time", System.currentTimeMillis() / 1000) + .field("ip_addr", key) + .field("dns_server_role", "OPRDNS") + .field("doh_support", true) + .endObject(); + UpdateRequest source = new UpdateRequest("entity_info", key) + .doc(builder) + .upsert(builder) + .id(key); + bulkRequest.add(source); + + if (i >= com.zdjizhi.etl.common.CommonConfig.IP_INTERNAL_OR_EXTERNAL_PATTERN) { + client.bulk(bulkRequest, RequestOptions.DEFAULT); + LOG.info("写入{}", i); + i = 0; + } + } + csvReader.close(); + + client.bulk(bulkRequest, RequestOptions.DEFAULT); + LOG.info("写入完毕"); + client.close(); + + } + + public static void setdomain() throws Exception { +// DataInputStream in = new DataInputStream(new FileInputStream(new File("D:\\data\\tmp\\domains.csv.ag"))); + DataInputStream in = new DataInputStream(new FileInputStream(new File(CommonConfig.WEBSKT_PATH))); + CSVReader csvReader = new CSVReader(new InputStreamReader(in, StandardCharsets.UTF_8), CSVParser.DEFAULT_SEPARATOR, + CSVParser.DEFAULT_QUOTE_CHARACTER, CSVParser.DEFAULT_ESCAPE_CHARACTER, 1); + +// List<HttpHost> esAddresses = ESSinkUtils.getEsAddresses(CommonConfig.ES_HOST); + List<HttpHost> esAddresses = ESSinkUtils.getEsAddresses("192.168.40.73:9200"); + RestClientBuilder lowLevelRestClient = RestClient.builder(esAddresses.toArray(new HttpHost[0])); + + RestHighLevelClient client = new RestHighLevelClient(lowLevelRestClient); + + BulkRequest bulkRequest = new BulkRequest(); + int i = 0; + + for (String[] strs : csvReader) { + i += 1; + String key = strs[0]; + XContentBuilder builder = XContentFactory.jsonBuilder() + .startObject() + .field("id", key) + .field("entity_type", "domain") + .field("found_time", System.currentTimeMillis() / 1000) + .field("update_time", System.currentTimeMillis() / 1000) + .field("domain_name", key) + .endObject(); + UpdateRequest source = new UpdateRequest("entity_info", key) + .doc(builder) + .upsert(builder) + .id(key); + bulkRequest.add(source); + + if (i >= com.zdjizhi.etl.common.CommonConfig.IP_INTERNAL_OR_EXTERNAL_PATTERN) { + client.bulk(bulkRequest, RequestOptions.DEFAULT); + LOG.info("写入{}", i); + i = 0; + } + } + csvReader.close(); + + client.bulk(bulkRequest, RequestOptions.DEFAULT); + LOG.info("写入完毕"); + client.close(); + + } + + +} diff --git a/platform-etl/src/main/resources/etl.properties b/platform-etl/src/main/resources/etl.properties index dfc05c1..065adfc 100644 --- a/platform-etl/src/main/resources/etl.properties +++ b/platform-etl/src/main/resources/etl.properties @@ -1,5 +1,5 @@ #判断IP域内外逻辑模式 1:基于网段配置文件判断 2:基于地理位置判断 3:基于common_direction字段判断 -ip.internal.or.external.pattern=1 +ip.internal.or.external.pattern=10000 #当ip.internal.or.external.pattern=2时,以下两个配置生效 #选择基于地理位置判断的字段,可选:country/province/region diff --git a/platform-schedule/src/main/resources/business.properties b/platform-schedule/src/main/resources/business.properties index 9c714e1..2bc150a 100644 --- a/platform-schedule/src/main/resources/business.properties +++ b/platform-schedule/src/main/resources/business.properties @@ -1,7 +1,7 @@ #cn.record.etl.class=com.zdjizhi.etl.CnRecordEtl #cn.pre.metric.class=com.zdjizhi.pre.CnPreMetric -#cn.dns.pre.metric.class=com.zdjizhi.dns.pre.DnsPreMetric -cn.security.event.class=com.zdjizhi.security.CnSecurityEvent +cn.dns.pre.metric.class=com.zdjizhi.dns.pre.DnsPreMetric +#cn.security.event.class=com.zdjizhi.security.CnSecurityEvent #cn.dns.error.detection=com.zdjizhi.detection.dns.DnsErrorDetection #cn.dns.response.detection=com.zdjizhi.detection.dns.DnsResponseTimeDetection #cn.http.error.detection=com.zdjizhi.detection.dns.HttpErrorDetection
\ No newline at end of file |
