summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorwanglihui <[email protected]>2023-02-22 14:05:02 +0800
committerwanglihui <[email protected]>2023-02-22 14:05:02 +0800
commit1eae3d75d12e46062a91fa1ae1ee33c9fa317d55 (patch)
tree2c866a9fb1430f7449bd3d4b82fb8c4b96fd1277
parentfcf95a0f28e04052bd8960b72995ff147b9c27b2 (diff)
parent332be4a33b0c07ef2f3cf6bf348a13ea60a4554a (diff)
Merge branch '22.12.rc1' into 22.1222.12
# Conflicts: # platform-base/src/main/resources/common.properties # platform-etl/src/main/java/com/zdjizhi/etl/utils/FcUtils.java
-rw-r--r--module-CN-pre-metrics/src/main/java/com/zdjizhi/pre/CnPreMetric.java220
-rw-r--r--module-CN-pre-metrics/src/main/resources/pre-metrics.properties39
-rw-r--r--module-DNS-pre-metrics/src/main/java/com/zdjizhi/dns/pre/DnsPreMetric.java14
-rw-r--r--module-DNS-pre-metrics/src/main/resources/dns_pre_metrics.properties22
-rw-r--r--platform-etl/src/main/java/com/zdjizhi/etl/common/DomainCategoryReputation.java21
-rw-r--r--platform-etl/src/main/java/com/zdjizhi/etl/utils/DnsServerTest.java183
-rw-r--r--platform-etl/src/main/resources/etl.properties2
-rw-r--r--platform-schedule/src/main/resources/business.properties4
8 files changed, 368 insertions, 137 deletions
diff --git a/module-CN-pre-metrics/src/main/java/com/zdjizhi/pre/CnPreMetric.java b/module-CN-pre-metrics/src/main/java/com/zdjizhi/pre/CnPreMetric.java
index 55383af..0a9d504 100644
--- a/module-CN-pre-metrics/src/main/java/com/zdjizhi/pre/CnPreMetric.java
+++ b/module-CN-pre-metrics/src/main/java/com/zdjizhi/pre/CnPreMetric.java
@@ -13,161 +13,166 @@ import com.zdjizhi.utils.StringUtil;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple3;
-import org.apache.flink.api.java.tuple.Tuple6;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import java.math.RoundingMode;
import java.text.DecimalFormat;
+import java.util.Random;
public class CnPreMetric implements Schedule {
@Override
public void schedule() throws Exception {
+ Random random = new Random();
SingleOutputStreamOperator<CnRecordLog> source = CnRecordEtl.singleOutputStreamOperator;
- SingleOutputStreamOperator<CnMetricLog> process = source.keyBy(CnRecordLog::getCommon_server_ip)
- .window(TumblingEventTimeWindows.of(Time.minutes(CommonConfig.PRE_METRICS_WINDOW_TIME)))
+ SingleOutputStreamOperator<CnMetricLog> process = source.keyBy(new KeySelector<CnRecordLog, String>() {
+ @Override
+ public String getKey(CnRecordLog value) throws Exception {
+ int nextInt = random.nextInt(com.zdjizhi.base.common.CommonConfig.STREAM_EXECUTION_ENVIRONMENT_PARALLELISM);
+ return Integer.toString(nextInt);
+ }
+ }).window(TumblingEventTimeWindows.of(Time.minutes(CommonConfig.PRE_METRICS_WINDOW_TIME)))
.process(new FirstAggregationProcess());
//metric_ip
- SingleOutputStreamOperator<String> clientIpMetric = process.filter((log) -> StringUtil.isNotEmpty(log.getCommon_client_ip()))
- .keyBy(CnMetricLog::getCommon_client_ip)
- .window(TumblingEventTimeWindows.of(Time.minutes(CommonConfig.PRE_METRICS_WINDOW_TIME)))
- .reduce(new SecondAggregationReduce(), new MetricIpProcessWindowFunc("client"));
+ SingleOutputStreamOperator<String> clientIpMetric = process.filter((log) -> StringUtil.isNotEmpty(log.getCommon_client_ip()))
+ .keyBy(CnMetricLog::getCommon_client_ip)
+ .window(TumblingEventTimeWindows.of(Time.minutes(CommonConfig.PRE_METRICS_WINDOW_TIME)))
+ .reduce(new SecondAggregationReduce(), new MetricIpProcessWindowFunc("client"));
clientIpMetric.addSink(KafkaUtils.getKafkaSink(CommonConfig.METRIC_IP_TOPIC)).setParallelism(CommonConfig.METRIC_OUTPUT_PARALLELISM);
- SingleOutputStreamOperator<String> serverIpMetric = process.filter((log) -> StringUtil.isNotEmpty(log.getCommon_server_ip()))
- .keyBy(CnMetricLog::getCommon_server_ip)
- .window(TumblingEventTimeWindows.of(Time.minutes(CommonConfig.PRE_METRICS_WINDOW_TIME)))
- .reduce(new SecondAggregationReduce(), new MetricIpProcessWindowFunc("server"));
+ SingleOutputStreamOperator<String> serverIpMetric = process.filter((log) -> StringUtil.isNotEmpty(log.getCommon_server_ip()))
+ .keyBy(CnMetricLog::getCommon_server_ip)
+ .window(TumblingEventTimeWindows.of(Time.minutes(CommonConfig.PRE_METRICS_WINDOW_TIME)))
+ .reduce(new SecondAggregationReduce(), new MetricIpProcessWindowFunc("server"));
serverIpMetric.addSink(KafkaUtils.getKafkaSink(CommonConfig.METRIC_IP_TOPIC)).setParallelism(CommonConfig.METRIC_OUTPUT_PARALLELISM);
//metric_region
- SingleOutputStreamOperator<String> clientRegionMetric = process
- .filter((log) -> StringUtil.isNotEmpty(log.getClient_country()) || StringUtil.isNotEmpty(log.getClient_province()) || StringUtil.isNotEmpty(log.getClient_region()))
- .keyBy(new KeySelector<CnMetricLog, Tuple3<String,String,String>>() {
- @Override
- public Tuple3<String, String, String> getKey(CnMetricLog value) throws Exception {
- return Tuple3.of(value.getClient_country(),value.getClient_province(),value.getClient_region());
- }
- })
- .window(TumblingEventTimeWindows.of(Time.minutes(CommonConfig.PRE_METRICS_WINDOW_TIME)))
- .reduce(new SecondAggregationReduce(), new MetricRegionProcessWindowFunc("client"));
+ SingleOutputStreamOperator<String> clientRegionMetric = process
+ .filter((log) -> StringUtil.isNotEmpty(log.getClient_country()) || StringUtil.isNotEmpty(log.getClient_province()) || StringUtil.isNotEmpty(log.getClient_region()))
+ .keyBy(new KeySelector<CnMetricLog, Tuple3<String, String, String>>() {
+ @Override
+ public Tuple3<String, String, String> getKey(CnMetricLog value) throws Exception {
+ return Tuple3.of(value.getClient_country(), value.getClient_province(), value.getClient_region());
+ }
+ })
+ .window(TumblingEventTimeWindows.of(Time.minutes(CommonConfig.PRE_METRICS_WINDOW_TIME)))
+ .reduce(new SecondAggregationReduce(), new MetricRegionProcessWindowFunc("client"));
clientRegionMetric.addSink(KafkaUtils.getKafkaSink(CommonConfig.METRIC_REGION_TOPIC)).setParallelism(CommonConfig.METRIC_OUTPUT_PARALLELISM);
- SingleOutputStreamOperator<String> serverRegionMetric = process
- .filter((log) -> StringUtil.isNotEmpty(log.getServer_country()) || StringUtil.isNotEmpty(log.getServer_province()) || StringUtil.isNotEmpty(log.getServer_region()))
- .keyBy(new KeySelector<CnMetricLog, Tuple3<String,String,String>>() {
- @Override
- public Tuple3<String, String, String> getKey(CnMetricLog value) throws Exception {
- return Tuple3.of(value.getServer_country(),value.getServer_province(),value.getServer_region());
- }
- })
- .window(TumblingEventTimeWindows.of(Time.minutes(CommonConfig.PRE_METRICS_WINDOW_TIME)))
- .reduce(new SecondAggregationReduce(), new MetricRegionProcessWindowFunc("server"));
+ SingleOutputStreamOperator<String> serverRegionMetric = process
+ .filter((log) -> StringUtil.isNotEmpty(log.getServer_country()) || StringUtil.isNotEmpty(log.getServer_province()) || StringUtil.isNotEmpty(log.getServer_region()))
+ .keyBy(new KeySelector<CnMetricLog, Tuple3<String, String, String>>() {
+ @Override
+ public Tuple3<String, String, String> getKey(CnMetricLog value) throws Exception {
+ return Tuple3.of(value.getServer_country(), value.getServer_province(), value.getServer_region());
+ }
+ })
+ .window(TumblingEventTimeWindows.of(Time.minutes(CommonConfig.PRE_METRICS_WINDOW_TIME)))
+ .reduce(new SecondAggregationReduce(), new MetricRegionProcessWindowFunc("server"));
serverRegionMetric.addSink(KafkaUtils.getKafkaSink(CommonConfig.METRIC_REGION_TOPIC)).setParallelism(CommonConfig.METRIC_OUTPUT_PARALLELISM);
// //metric_asn
- SingleOutputStreamOperator<String> clientAsnMetric = process
- .filter((log) -> StringUtil.isNotEmpty(log.getClient_asn()) && StringUtil.isNotEmpty(log.getClient_isp()))
- .keyBy(new KeySelector<CnMetricLog, Tuple2<String,String>>() {
- @Override
- public Tuple2<String, String> getKey(CnMetricLog value) throws Exception {
- return Tuple2.of(value.getClient_asn(),value.getClient_isp());
- }
- })
- .window(TumblingEventTimeWindows.of(Time.minutes(CommonConfig.PRE_METRICS_WINDOW_TIME)))
- .reduce(new SecondAggregationReduce(), new MetricAsnProcessFunc("client"));
+ SingleOutputStreamOperator<String> clientAsnMetric = process
+ .filter((log) -> StringUtil.isNotEmpty(log.getClient_asn()) && StringUtil.isNotEmpty(log.getClient_isp()))
+ .keyBy(new KeySelector<CnMetricLog, Tuple2<String, String>>() {
+ @Override
+ public Tuple2<String, String> getKey(CnMetricLog value) throws Exception {
+ return Tuple2.of(value.getClient_asn(), value.getClient_isp());
+ }
+ })
+ .window(TumblingEventTimeWindows.of(Time.minutes(CommonConfig.PRE_METRICS_WINDOW_TIME)))
+ .reduce(new SecondAggregationReduce(), new MetricAsnProcessFunc("client"));
clientAsnMetric.addSink(KafkaUtils.getKafkaSink(CommonConfig.METRIC_ASN_TOPIC)).setParallelism(CommonConfig.METRIC_OUTPUT_PARALLELISM);
- SingleOutputStreamOperator<String> serverAsnMetric = process
- .filter((log) -> StringUtil.isNotEmpty(log.getServer_asn()) && StringUtil.isNotEmpty(log.getServer_isp()))
- .keyBy(new KeySelector<CnMetricLog, Tuple2<String,String>>() {
- @Override
- public Tuple2<String, String> getKey(CnMetricLog value) throws Exception {
- return Tuple2.of(value.getServer_asn(),value.getServer_isp());
- }
- })
- .window(TumblingEventTimeWindows.of(Time.minutes(CommonConfig.PRE_METRICS_WINDOW_TIME)))
- .reduce(new SecondAggregationReduce(), new MetricAsnProcessFunc("server"));
+ SingleOutputStreamOperator<String> serverAsnMetric = process
+ .filter((log) -> StringUtil.isNotEmpty(log.getServer_asn()) && StringUtil.isNotEmpty(log.getServer_isp()))
+ .keyBy(new KeySelector<CnMetricLog, Tuple2<String, String>>() {
+ @Override
+ public Tuple2<String, String> getKey(CnMetricLog value) throws Exception {
+ return Tuple2.of(value.getServer_asn(), value.getServer_isp());
+ }
+ })
+ .window(TumblingEventTimeWindows.of(Time.minutes(CommonConfig.PRE_METRICS_WINDOW_TIME)))
+ .reduce(new SecondAggregationReduce(), new MetricAsnProcessFunc("server"));
serverAsnMetric.addSink(KafkaUtils.getKafkaSink(CommonConfig.METRIC_ASN_TOPIC)).setParallelism(CommonConfig.METRIC_OUTPUT_PARALLELISM);
//metric_idc_renter
- SingleOutputStreamOperator<String> clientIdcMetric = process
- .filter((log) -> StringUtil.isNotEmpty(log.getClient_idc_renter()))
- .keyBy(CnMetricLog::getClient_idc_renter)
- .window(TumblingEventTimeWindows.of(Time.minutes(CommonConfig.PRE_METRICS_WINDOW_TIME)))
- .reduce(new SecondAggregationReduce(), new MetricIdcProcessFunc("client"));
+ SingleOutputStreamOperator<String> clientIdcMetric = process
+ .filter((log) -> StringUtil.isNotEmpty(log.getClient_idc_renter()))
+ .keyBy(CnMetricLog::getClient_idc_renter)
+ .window(TumblingEventTimeWindows.of(Time.minutes(CommonConfig.PRE_METRICS_WINDOW_TIME)))
+ .reduce(new SecondAggregationReduce(), new MetricIdcProcessFunc("client"));
clientIdcMetric.addSink(KafkaUtils.getKafkaSink(CommonConfig.METRIC_IDC_RENTER_TOPIC)).setParallelism(CommonConfig.METRIC_OUTPUT_PARALLELISM);
- SingleOutputStreamOperator<String> serverIdcMetric = process
- .filter((log) -> StringUtil.isNotEmpty(log.getServer_idc_renter()))
- .keyBy(CnMetricLog::getServer_idc_renter)
- .window(TumblingEventTimeWindows.of(Time.minutes(CommonConfig.PRE_METRICS_WINDOW_TIME)))
- .reduce(new SecondAggregationReduce(), new MetricIdcProcessFunc("server"));
+ SingleOutputStreamOperator<String> serverIdcMetric = process
+ .filter((log) -> StringUtil.isNotEmpty(log.getServer_idc_renter()))
+ .keyBy(CnMetricLog::getServer_idc_renter)
+ .window(TumblingEventTimeWindows.of(Time.minutes(CommonConfig.PRE_METRICS_WINDOW_TIME)))
+ .reduce(new SecondAggregationReduce(), new MetricIdcProcessFunc("server"));
serverIdcMetric.addSink(KafkaUtils.getKafkaSink(CommonConfig.METRIC_IDC_RENTER_TOPIC)).setParallelism(CommonConfig.METRIC_OUTPUT_PARALLELISM);
//metric_application
- SingleOutputStreamOperator<String> appMetric = process
- .filter((log) -> StringUtil.isNotEmpty(log.getCommon_app_label()))
- .keyBy(CnMetricLog::getCommon_app_label)
- .window(TumblingEventTimeWindows.of(Time.minutes(CommonConfig.PRE_METRICS_WINDOW_TIME)))
- .reduce(new SecondAggregationReduce(), new MetricAppProcessFunc());
+ SingleOutputStreamOperator<String> appMetric = process
+ .filter((log) -> StringUtil.isNotEmpty(log.getCommon_app_label()))
+ .keyBy(CnMetricLog::getCommon_app_label)
+ .window(TumblingEventTimeWindows.of(Time.minutes(CommonConfig.PRE_METRICS_WINDOW_TIME)))
+ .reduce(new SecondAggregationReduce(), new MetricAppProcessFunc());
appMetric.addSink(KafkaUtils.getKafkaSink(CommonConfig.METRIC_APPLICATION_TOPIC)).setParallelism(CommonConfig.METRIC_OUTPUT_PARALLELISM);
//metric_domain
- SingleOutputStreamOperator<String> domainMetric = process
- .filter((log) -> StringUtil.isNotEmpty(log.getDomain()))
- .keyBy(CnMetricLog::getDomain)
- .window(TumblingEventTimeWindows.of(Time.minutes(CommonConfig.PRE_METRICS_WINDOW_TIME)))
- .reduce(new SecondAggregationReduce(), new MetricDomainProcessFunc());
+ SingleOutputStreamOperator<String> domainMetric = process
+ .filter((log) -> StringUtil.isNotEmpty(log.getDomain()))
+ .keyBy(CnMetricLog::getDomain)
+ .window(TumblingEventTimeWindows.of(Time.minutes(CommonConfig.PRE_METRICS_WINDOW_TIME)))
+ .reduce(new SecondAggregationReduce(), new MetricDomainProcessFunc());
domainMetric.addSink(KafkaUtils.getKafkaSink(CommonConfig.METRIC_DOMAIN_TOPIC)).setParallelism(CommonConfig.METRIC_OUTPUT_PARALLELISM);
//metric_http_host
- SingleOutputStreamOperator<String> httpHostMetric = process
- .filter((log) -> StringUtil.isNotEmpty(log.getHttp_host()))
- .keyBy(CnMetricLog::getHttp_host)
- .window(TumblingEventTimeWindows.of(Time.minutes(CommonConfig.PRE_METRICS_WINDOW_TIME)))
- .reduce(new SecondAggregationReduce(), new MetricHostProcessFunc());
+ SingleOutputStreamOperator<String> httpHostMetric = process
+ .filter((log) -> StringUtil.isNotEmpty(log.getHttp_host()))
+ .keyBy(CnMetricLog::getHttp_host)
+ .window(TumblingEventTimeWindows.of(Time.minutes(CommonConfig.PRE_METRICS_WINDOW_TIME)))
+ .reduce(new SecondAggregationReduce(), new MetricHostProcessFunc());
httpHostMetric.addSink(KafkaUtils.getKafkaSink(CommonConfig.METRIC_HTTP_HOST_TOPIC)).setParallelism(CommonConfig.METRIC_OUTPUT_PARALLELISM);
//metric_ssl_sni
- SingleOutputStreamOperator<String> sslSniMetric = process
- .filter((log) -> StringUtil.isNotEmpty(log.getSsl_sni()))
- .keyBy(CnMetricLog::getSsl_sni)
- .window(TumblingEventTimeWindows.of(Time.minutes(CommonConfig.PRE_METRICS_WINDOW_TIME)))
- .reduce(new SecondAggregationReduce(), new MetricSslProcessFunc());
+ SingleOutputStreamOperator<String> sslSniMetric = process
+ .filter((log) -> StringUtil.isNotEmpty(log.getSsl_sni()))
+ .keyBy(CnMetricLog::getSsl_sni)
+ .window(TumblingEventTimeWindows.of(Time.minutes(CommonConfig.PRE_METRICS_WINDOW_TIME)))
+ .reduce(new SecondAggregationReduce(), new MetricSslProcessFunc());
sslSniMetric.addSink(KafkaUtils.getKafkaSink(CommonConfig.METRIC_SSL_SNI_TOPIC)).setParallelism(CommonConfig.METRIC_OUTPUT_PARALLELISM);
//metric_protocol
- SingleOutputStreamOperator<String> protocolMetric = process
- .filter((log) -> StringUtil.isNotEmpty(log.getCommon_l7_protocol()) && StringUtil.isNotEmpty(log.getCommon_server_port()))
- .keyBy(new KeySelector<CnMetricLog, Tuple2<String,Integer>>() {
- @Override
- public Tuple2<String, Integer> getKey(CnMetricLog value) throws Exception {
- return Tuple2.of(value.getCommon_l7_protocol(),value.getCommon_server_port());
- }
- })
- .window(TumblingEventTimeWindows.of(Time.minutes(CommonConfig.PRE_METRICS_WINDOW_TIME)))
- .reduce(new SecondAggregationReduce(), new MetricProtocolProcessFunc());
+ SingleOutputStreamOperator<String> protocolMetric = process
+ .filter((log) -> StringUtil.isNotEmpty(log.getCommon_l7_protocol()) && StringUtil.isNotEmpty(log.getCommon_server_port()))
+ .keyBy(new KeySelector<CnMetricLog, Tuple2<String, Integer>>() {
+ @Override
+ public Tuple2<String, Integer> getKey(CnMetricLog value) throws Exception {
+ return Tuple2.of(value.getCommon_l7_protocol(), value.getCommon_server_port());
+ }
+ })
+ .window(TumblingEventTimeWindows.of(Time.minutes(CommonConfig.PRE_METRICS_WINDOW_TIME)))
+ .reduce(new SecondAggregationReduce(), new MetricProtocolProcessFunc());
protocolMetric.addSink(KafkaUtils.getKafkaSink(CommonConfig.METRIC_PROTOCOL_TOPIC)).setParallelism(CommonConfig.METRIC_OUTPUT_PARALLELISM);
//metric_link
SingleOutputStreamOperator<String> linkMetric = process
- .keyBy(new KeySelector<CnMetricLog, Tuple2<Long, Long>>() {
- @Override
- public Tuple2<Long, Long> getKey(CnMetricLog value) throws Exception {
- return Tuple2.of(value.getCommon_ingress_link_id(),value.getCommon_egress_link_id());
- }
- })
- .window(TumblingEventTimeWindows.of(Time.minutes(CommonConfig.PRE_METRICS_WINDOW_TIME)))
- .reduce(new SecondAggregationReduce(), new MetricLinkProcessFunc());
+ .keyBy(new KeySelector<CnMetricLog, Tuple2<Long, Long>>() {
+ @Override
+ public Tuple2<Long, Long> getKey(CnMetricLog value) throws Exception {
+ return Tuple2.of(value.getCommon_ingress_link_id(), value.getCommon_egress_link_id());
+ }
+ })
+ .window(TumblingEventTimeWindows.of(Time.minutes(CommonConfig.PRE_METRICS_WINDOW_TIME)))
+ .reduce(new SecondAggregationReduce(), new MetricLinkProcessFunc());
// linkMetric.print();
- linkMetric.addSink(KafkaUtils.getKafkaSink(CommonConfig.METRIC_LINK_TOPIC)).setParallelism(CommonConfig.METRIC_OUTPUT_PARALLELISM);
-
-}
+ linkMetric.addSink(KafkaUtils.getKafkaSink(CommonConfig.METRIC_LINK_TOPIC)).setParallelism(CommonConfig.METRIC_OUTPUT_PARALLELISM);
+ }
public static void main(String[] args) throws Exception {
@@ -176,9 +181,14 @@ public class CnPreMetric implements Schedule {
float number = (float) 0L / 1L;
String format = df.format(number);
System.out.println(format);
- System.out.println("Float.isInfinite:"+Float.isInfinite(number));
- System.out.println("Float.isNaN:"+Float.isNaN(number));
- System.out.println("Float.isFinite:"+Float.isFinite(number));
+ System.out.println("Float.isInfinite:" + Float.isInfinite(number));
+ System.out.println("Float.isNaN:" + Float.isNaN(number));
+ System.out.println("Float.isFinite:" + Float.isFinite(number));
+
+
+ Random random = new Random();
+ int nextInt = random.nextInt(1);
+ System.out.println(Integer.toString(nextInt));
// System.out.println(Float.parseFloat(format));
diff --git a/module-CN-pre-metrics/src/main/resources/pre-metrics.properties b/module-CN-pre-metrics/src/main/resources/pre-metrics.properties
index 88d572c..094cf07 100644
--- a/module-CN-pre-metrics/src/main/resources/pre-metrics.properties
+++ b/module-CN-pre-metrics/src/main/resources/pre-metrics.properties
@@ -5,27 +5,28 @@ pre.metrics.window.time=1
pre.metrics.round.scale=4
#metric topic名
-metric.ip.topic=METRIC-IP
-metric.region.topic=METRIC-REGION
-metric.asn.topic=METRIC-ASN
-metric.idc.renter.topic=METRIC-IDC-RENTER
-metric.application.topic=METRIC-APPLICATION
-metric.domain.topic=METRIC-DOMAIN
-metric.http.host.topic=METRIC-HTTP-HOST
-metric.ssl.sni.topic=METRIC-SSL-SNI
-metric.protocol.topic=METRIC-PROTOCOL
-metric.link.topic=METRIC-LINK
+#metric.ip.topic=METRIC-IP
+#metric.region.topic=METRIC-REGION
+#metric.asn.topic=METRIC-ASN
+#metric.idc.renter.topic=METRIC-IDC-RENTER
+#metric.application.topic=METRIC-APPLICATION
+#metric.domain.topic=METRIC-DOMAIN
+#metric.http.host.topic=METRIC-HTTP-HOST
+#metric.ssl.sni.topic=METRIC-SSL-SNI
+#metric.protocol.topic=METRIC-PROTOCOL
+#metric.link.topic=METRIC-LINK
-#metric.ip.topic=METRIC-IP-TEST
-#metric.region.topic=METRIC-REGION-TEST
-#metric.asn.topic=METRIC-ASN-TEST
-#metric.idc.renter.topic=METRIC-IDC-RENTER-TEST
-#metric.application.topic=METRIC-APPLICATION-TEST
-#metric.domain.topic=METRIC-DOMAIN-TEST
-#metric.http.host.topic=METRIC-HTTP-HOST-TEST
-#metric.ssl.sni.topic=METRIC-SSL-SNI-TEST
-#metric.protocol.topic=METRIC-PROTOCOL-TEST
+metric.ip.topic=METRIC-IP-TEST
+metric.region.topic=METRIC-REGION-TEST
+metric.asn.topic=METRIC-ASN-TEST
+metric.idc.renter.topic=METRIC-IDC-RENTER-TEST
+metric.application.topic=METRIC-APPLICATION-TEST
+metric.domain.topic=METRIC-DOMAIN-TEST
+metric.http.host.topic=METRIC-HTTP-HOST-TEST
+metric.ssl.sni.topic=METRIC-SSL-SNI-TEST
+metric.protocol.topic=METRIC-PROTOCOL-TEST
+metric.link.topic=METRIC-LINK-TEST
#metric输出并行度设置
diff --git a/module-DNS-pre-metrics/src/main/java/com/zdjizhi/dns/pre/DnsPreMetric.java b/module-DNS-pre-metrics/src/main/java/com/zdjizhi/dns/pre/DnsPreMetric.java
index f71bf9a..6cda972 100644
--- a/module-DNS-pre-metrics/src/main/java/com/zdjizhi/dns/pre/DnsPreMetric.java
+++ b/module-DNS-pre-metrics/src/main/java/com/zdjizhi/dns/pre/DnsPreMetric.java
@@ -10,18 +10,28 @@ import com.zdjizhi.dns.pre.process.FirstAggregationProcess;
import com.zdjizhi.dns.pre.process.SecondAggregationReduce;
import com.zdjizhi.etl.CnRecordEtl;
import com.zdjizhi.utils.StringUtil;
+import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
+import java.util.Random;
+
public class DnsPreMetric implements Schedule {
@Override
public void schedule() throws Exception {
+ Random random = new Random();
SingleOutputStreamOperator<CnRecordLog> source = CnRecordEtl.singleOutputStreamOperator;
- SingleOutputStreamOperator<DnsMetricLog> firstProcess = source.filter((log)->"DNS".equals(log.getCommon_schema_type()))
- .keyBy(CnRecordLog::getCommon_server_ip)
+ SingleOutputStreamOperator<DnsMetricLog> firstProcess = source.filter((log) -> "DNS".equals(log.getCommon_schema_type()))
+ .keyBy(new KeySelector<CnRecordLog, String>() {
+ @Override
+ public String getKey(CnRecordLog value) throws Exception {
+ int nextInt = random.nextInt(com.zdjizhi.base.common.CommonConfig.STREAM_EXECUTION_ENVIRONMENT_PARALLELISM);
+ return Integer.toString(nextInt);
+ }
+ })
.window(TumblingEventTimeWindows.of(Time.minutes(CommonConfig.DNS_PRE_METRICS_WINDOW_TIME)))
.process(new FirstAggregationProcess());
diff --git a/module-DNS-pre-metrics/src/main/resources/dns_pre_metrics.properties b/module-DNS-pre-metrics/src/main/resources/dns_pre_metrics.properties
index 3fa239c..058c28d 100644
--- a/module-DNS-pre-metrics/src/main/resources/dns_pre_metrics.properties
+++ b/module-DNS-pre-metrics/src/main/resources/dns_pre_metrics.properties
@@ -5,15 +5,21 @@ dns.pre.metrics.window.time=1
dns.pre.metrics.round.scale=4
#dns metric topic
-dns.metric.server.ip.topic=METRIC-DNS-SERVER-IP
-dns.metric.qname.topic=METRIC-DNS-QNAME
-dns.metric.qtype.topic=METRIC-DNS-QTYPE
-dns.metric.rcode.topic=METRIC-DNS-RCODE
-dns.metric.rra.topic=METRIC-DNS-RR-A
-dns.metric.rraaaa.topic=METRIC-DNS-RR-AAAA
-dns.metric.rrcname.topic=METRIC-DNS-RR-CNAME
-
+#dns.metric.server.ip.topic=METRIC-DNS-SERVER-IP
+#dns.metric.qname.topic=METRIC-DNS-QNAME
+#dns.metric.qtype.topic=METRIC-DNS-QTYPE
+#dns.metric.rcode.topic=METRIC-DNS-RCODE
+#dns.metric.rra.topic=METRIC-DNS-RR-A
+#dns.metric.rraaaa.topic=METRIC-DNS-RR-AAAA
+#dns.metric.rrcname.topic=METRIC-DNS-RR-CNAME
+dns.metric.server.ip.topic=METRIC-DNS-SERVER-IP-TEST
+dns.metric.qname.topic=METRIC-DNS-QNAME-TEST
+dns.metric.qtype.topic=METRIC-DNS-QTYPE-TEST
+dns.metric.rcode.topic=METRIC-DNS-RCODE-TEST
+dns.metric.rra.topic=METRIC-DNS-RR-A-TEST
+dns.metric.rraaaa.topic=METRIC-DNS-RR-AAAA-TEST
+dns.metric.rrcname.topic=METRIC-DNS-RR-CNAME-TEST
#metric输出并行度设置
dns.metric.output.parallelism=1 \ No newline at end of file
diff --git a/platform-etl/src/main/java/com/zdjizhi/etl/common/DomainCategoryReputation.java b/platform-etl/src/main/java/com/zdjizhi/etl/common/DomainCategoryReputation.java
index 10bee28..bc3615e 100644
--- a/platform-etl/src/main/java/com/zdjizhi/etl/common/DomainCategoryReputation.java
+++ b/platform-etl/src/main/java/com/zdjizhi/etl/common/DomainCategoryReputation.java
@@ -7,6 +7,16 @@ public class DomainCategoryReputation {
private String reputationLevel;
private long reputationScore;
+ private String match_pattern;
+
+ public String getMatch_pattern() {
+ return match_pattern;
+ }
+
+ public void setMatch_pattern(String match_pattern) {
+ this.match_pattern = match_pattern;
+ }
+
public String getFqdn() {
return fqdn;
}
@@ -46,4 +56,15 @@ public class DomainCategoryReputation {
public void setReputationScore(long reputationScore) {
this.reputationScore = reputationScore;
}
+
+ @Override
+ public String toString() {
+ return "DomainCategoryReputation{" +
+ "fqdn='" + fqdn + '\'' +
+ ", categoryName='" + categoryName + '\'' +
+ ", categoryGroup='" + categoryGroup + '\'' +
+ ", reputationLevel='" + reputationLevel + '\'' +
+ ", reputationScore=" + reputationScore +
+ '}';
+ }
}
diff --git a/platform-etl/src/main/java/com/zdjizhi/etl/utils/DnsServerTest.java b/platform-etl/src/main/java/com/zdjizhi/etl/utils/DnsServerTest.java
new file mode 100644
index 0000000..3d81217
--- /dev/null
+++ b/platform-etl/src/main/java/com/zdjizhi/etl/utils/DnsServerTest.java
@@ -0,0 +1,183 @@
+package com.zdjizhi.etl.utils;
+
+import com.opencsv.CSVParser;
+import com.opencsv.CSVReader;
+import com.zdjizhi.base.common.CommonConfig;
+import com.zdjizhi.base.utils.ESSinkUtils;
+import org.apache.http.HttpHost;
+import org.elasticsearch.action.bulk.BulkRequest;
+import org.elasticsearch.action.update.UpdateRequest;
+import org.elasticsearch.client.*;
+import org.elasticsearch.common.xcontent.XContentBuilder;
+import org.elasticsearch.common.xcontent.XContentFactory;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.*;
+import java.nio.charset.StandardCharsets;
+import java.util.List;
+
+public class DnsServerTest {
+ private static Logger LOG = LoggerFactory.getLogger(DnsServerTest.class);
+
+ public static void main(String[] args) throws Exception {
+ switch (CommonConfig.CN_RECORD_PARALLELISM) {
+ case 1:
+ setDnsByOprdns();
+ break;
+ case 2:
+ setDnsByDoh();
+ break;
+ case 3:
+ setdomain();
+ break;
+ default:
+ throw new RuntimeException();
+ }
+
+ }
+
+ public static void setDnsByOprdns() throws Exception {
+ DataInputStream in = new DataInputStream(new FileInputStream(new File("D:\\data\\Public DNS Server List.txt")));
+// DataInputStream in = new DataInputStream(new FileInputStream(new File(CommonConfig.WEBSKT_PATH)));
+ CSVReader csvReader = new CSVReader(new InputStreamReader(in, StandardCharsets.UTF_8), CSVParser.DEFAULT_SEPARATOR,
+ CSVParser.DEFAULT_QUOTE_CHARACTER, CSVParser.DEFAULT_ESCAPE_CHARACTER, 1);
+
+// List<HttpHost> esAddresses = ESSinkUtils.getEsAddresses(CommonConfig.ES_HOST);
+ List<HttpHost> esAddresses = ESSinkUtils.getEsAddresses("192.168.40.73:9200");
+ RestClientBuilder lowLevelRestClient = RestClient.builder(esAddresses.toArray(new HttpHost[0]));
+
+ RestHighLevelClient client = new RestHighLevelClient(lowLevelRestClient);
+
+ BulkRequest bulkRequest = new BulkRequest();
+ int i = 0;
+
+ for (String[] strs : csvReader) {
+ i += 1;
+ String key = strs[0];
+ XContentBuilder builder = XContentFactory.jsonBuilder()
+ .startObject()
+ .field("id", key)
+ .field("entity_type", "ip")
+ .field("found_time", System.currentTimeMillis() / 1000)
+ .field("update_time", System.currentTimeMillis() / 1000)
+ .field("ip_addr", key)
+ .field("dns_server_role", "OPRDNS")
+ .field("doh_support", false)
+ .endObject();
+ UpdateRequest source = new UpdateRequest("entity_info", key)
+ .doc(builder)
+ .upsert(builder)
+ .id(key);
+ bulkRequest.add(source);
+
+ if (i >= com.zdjizhi.etl.common.CommonConfig.IP_INTERNAL_OR_EXTERNAL_PATTERN) {
+ client.bulk(bulkRequest, RequestOptions.DEFAULT);
+ LOG.info("写入{}", i);
+ i = 0;
+ }
+ }
+ csvReader.close();
+
+ client.bulk(bulkRequest, RequestOptions.DEFAULT);
+ LOG.info("写入完毕");
+ client.close();
+
+ }
+
+ public static void setDnsByDoh() throws Exception {
+ DataInputStream in = new DataInputStream(new FileInputStream(new File("D:\\data\\Public DoH Server List.txt")));
+// DataInputStream in = new DataInputStream(new FileInputStream(new File(CommonConfig.WEBSKT_PATH)));
+ CSVReader csvReader = new CSVReader(new InputStreamReader(in, StandardCharsets.UTF_8), CSVParser.DEFAULT_SEPARATOR,
+ CSVParser.DEFAULT_QUOTE_CHARACTER, CSVParser.DEFAULT_ESCAPE_CHARACTER, 1);
+
+// List<HttpHost> esAddresses = ESSinkUtils.getEsAddresses(CommonConfig.ES_HOST);
+ List<HttpHost> esAddresses = ESSinkUtils.getEsAddresses("192.168.40.73:9200");
+ RestClientBuilder lowLevelRestClient = RestClient.builder(esAddresses.toArray(new HttpHost[0]));
+
+ RestHighLevelClient client = new RestHighLevelClient(lowLevelRestClient);
+
+ BulkRequest bulkRequest = new BulkRequest();
+ int i = 0;
+
+ for (String[] strs : csvReader) {
+ i += 1;
+ String key = strs[0];
+ XContentBuilder builder = XContentFactory.jsonBuilder()
+ .startObject()
+ .field("id", key)
+ .field("entity_type", "ip")
+ .field("found_time", System.currentTimeMillis() / 1000)
+ .field("update_time", System.currentTimeMillis() / 1000)
+ .field("ip_addr", key)
+ .field("dns_server_role", "OPRDNS")
+ .field("doh_support", true)
+ .endObject();
+ UpdateRequest source = new UpdateRequest("entity_info", key)
+ .doc(builder)
+ .upsert(builder)
+ .id(key);
+ bulkRequest.add(source);
+
+ if (i >= com.zdjizhi.etl.common.CommonConfig.IP_INTERNAL_OR_EXTERNAL_PATTERN) {
+ client.bulk(bulkRequest, RequestOptions.DEFAULT);
+ LOG.info("写入{}", i);
+ i = 0;
+ }
+ }
+ csvReader.close();
+
+ client.bulk(bulkRequest, RequestOptions.DEFAULT);
+ LOG.info("写入完毕");
+ client.close();
+
+ }
+
+ public static void setdomain() throws Exception {
+// DataInputStream in = new DataInputStream(new FileInputStream(new File("D:\\data\\tmp\\domains.csv.ag")));
+ DataInputStream in = new DataInputStream(new FileInputStream(new File(CommonConfig.WEBSKT_PATH)));
+ CSVReader csvReader = new CSVReader(new InputStreamReader(in, StandardCharsets.UTF_8), CSVParser.DEFAULT_SEPARATOR,
+ CSVParser.DEFAULT_QUOTE_CHARACTER, CSVParser.DEFAULT_ESCAPE_CHARACTER, 1);
+
+// List<HttpHost> esAddresses = ESSinkUtils.getEsAddresses(CommonConfig.ES_HOST);
+ List<HttpHost> esAddresses = ESSinkUtils.getEsAddresses("192.168.40.73:9200");
+ RestClientBuilder lowLevelRestClient = RestClient.builder(esAddresses.toArray(new HttpHost[0]));
+
+ RestHighLevelClient client = new RestHighLevelClient(lowLevelRestClient);
+
+ BulkRequest bulkRequest = new BulkRequest();
+ int i = 0;
+
+ for (String[] strs : csvReader) {
+ i += 1;
+ String key = strs[0];
+ XContentBuilder builder = XContentFactory.jsonBuilder()
+ .startObject()
+ .field("id", key)
+ .field("entity_type", "domain")
+ .field("found_time", System.currentTimeMillis() / 1000)
+ .field("update_time", System.currentTimeMillis() / 1000)
+ .field("domain_name", key)
+ .endObject();
+ UpdateRequest source = new UpdateRequest("entity_info", key)
+ .doc(builder)
+ .upsert(builder)
+ .id(key);
+ bulkRequest.add(source);
+
+ if (i >= com.zdjizhi.etl.common.CommonConfig.IP_INTERNAL_OR_EXTERNAL_PATTERN) {
+ client.bulk(bulkRequest, RequestOptions.DEFAULT);
+ LOG.info("写入{}", i);
+ i = 0;
+ }
+ }
+ csvReader.close();
+
+ client.bulk(bulkRequest, RequestOptions.DEFAULT);
+ LOG.info("写入完毕");
+ client.close();
+
+ }
+
+
+}
diff --git a/platform-etl/src/main/resources/etl.properties b/platform-etl/src/main/resources/etl.properties
index dfc05c1..065adfc 100644
--- a/platform-etl/src/main/resources/etl.properties
+++ b/platform-etl/src/main/resources/etl.properties
@@ -1,5 +1,5 @@
#判断IP域内外逻辑模式 1:基于网段配置文件判断 2:基于地理位置判断 3:基于common_direction字段判断
-ip.internal.or.external.pattern=1
+ip.internal.or.external.pattern=10000
#当ip.internal.or.external.pattern=2时,以下两个配置生效
#选择基于地理位置判断的字段,可选:country/province/region
diff --git a/platform-schedule/src/main/resources/business.properties b/platform-schedule/src/main/resources/business.properties
index 9c714e1..2bc150a 100644
--- a/platform-schedule/src/main/resources/business.properties
+++ b/platform-schedule/src/main/resources/business.properties
@@ -1,7 +1,7 @@
#cn.record.etl.class=com.zdjizhi.etl.CnRecordEtl
#cn.pre.metric.class=com.zdjizhi.pre.CnPreMetric
-#cn.dns.pre.metric.class=com.zdjizhi.dns.pre.DnsPreMetric
-cn.security.event.class=com.zdjizhi.security.CnSecurityEvent
+cn.dns.pre.metric.class=com.zdjizhi.dns.pre.DnsPreMetric
+#cn.security.event.class=com.zdjizhi.security.CnSecurityEvent
#cn.dns.error.detection=com.zdjizhi.detection.dns.DnsErrorDetection
#cn.dns.response.detection=com.zdjizhi.detection.dns.DnsResponseTimeDetection
#cn.http.error.detection=com.zdjizhi.detection.dns.HttpErrorDetection \ No newline at end of file