summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorwanglihui <[email protected]>2023-01-04 09:43:32 +0800
committerwanglihui <[email protected]>2023-01-04 09:43:32 +0800
commit2942357cd9637f8db7f9e86f5b08bd8a9e4f39c6 (patch)
treed56d91adf231de030814de39e04f639e3a003029
parent64bdcac908f0c9b9956b88e9aea4e29ebbcc6d07 (diff)
实时检测增加静态阈值判定条件22.10
-rw-r--r--module-DNS-performance-detection/src/main/java/com/zdjizhi/detection/dns/common/CommonConfig.java4
-rw-r--r--module-DNS-performance-detection/src/main/java/com/zdjizhi/detection/dns/handler/BroadcastHandler.java41
-rw-r--r--module-DNS-performance-detection/src/main/java/com/zdjizhi/detection/dns/operator/BroadcastProcess.java37
-rw-r--r--module-DNS-performance-detection/src/main/java/com/zdjizhi/detection/dns/operator/DnsErrorDetectionProcess.java6
-rw-r--r--module-DNS-performance-detection/src/main/java/com/zdjizhi/detection/dns/operator/DnsResponseTimeDetectionProcess.java56
-rw-r--r--module-DNS-performance-detection/src/main/java/com/zdjizhi/detection/dns/operator/HttpErrorDetectionProcess.java6
-rw-r--r--module-DNS-performance-detection/src/main/resources/DNS-performance-detection.properties4
7 files changed, 43 insertions, 111 deletions
diff --git a/module-DNS-performance-detection/src/main/java/com/zdjizhi/detection/dns/common/CommonConfig.java b/module-DNS-performance-detection/src/main/java/com/zdjizhi/detection/dns/common/CommonConfig.java
index b24076f..16f81d3 100644
--- a/module-DNS-performance-detection/src/main/java/com/zdjizhi/detection/dns/common/CommonConfig.java
+++ b/module-DNS-performance-detection/src/main/java/com/zdjizhi/detection/dns/common/CommonConfig.java
@@ -48,6 +48,10 @@ public class CommonConfig {
public static final double HTTP_ERROR_DETECTION_HIGH_THRESHOLD = configurations.getDoubleProperty("http.error.detection.high.threshold");
public static final double HTTP_ERROR_DETECTION_CRITICAL_THRESHOLD = configurations.getDoubleProperty("http.error.detection.critical.threshold");
+ public static final int DNS_ERROR_DETECTION_THRESHOLD = configurations.getIntProperty("dns.error.detection.threshold");
+ public static final int DNS_RESPONSE_DETECTION_THRESHOLD = configurations.getIntProperty("dns.response.detection.threshold");
+ public static final int HTTP_ERROR_DETECTION_THRESHOLD = configurations.getIntProperty("http.error.detection.threshold");
+
public static final int BULK_FLUSH_MAX_ACTION = configurations.getIntProperty("bulk.flush.max.action");
public static final int ES_RETRY_CONFLICT_NUM = configurations.getIntProperty("es.retry.conflict.num");
public static final String ES_PERFORMANCE_TABLE_NAME = configurations.getStringProperty("es.performance.table.name");
diff --git a/module-DNS-performance-detection/src/main/java/com/zdjizhi/detection/dns/handler/BroadcastHandler.java b/module-DNS-performance-detection/src/main/java/com/zdjizhi/detection/dns/handler/BroadcastHandler.java
deleted file mode 100644
index 94b2a89..0000000
--- a/module-DNS-performance-detection/src/main/java/com/zdjizhi/detection/dns/handler/BroadcastHandler.java
+++ /dev/null
@@ -1,41 +0,0 @@
-package com.zdjizhi.detection.dns.handler;
-
-import com.zdjizhi.base.common.CnRecordLog;
-import com.zdjizhi.detection.dns.common.CommonConfig;
-import com.zdjizhi.detection.dns.operator.BroadcastProcess;
-import org.apache.flink.api.common.functions.ReduceFunction;
-import org.apache.flink.api.common.state.MapStateDescriptor;
-import org.apache.flink.api.common.typeinfo.Types;
-import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.streaming.api.datastream.BroadcastStream;
-import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
-import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
-import org.apache.flink.streaming.api.windowing.time.Time;
-
-public class BroadcastHandler {
-
- public static BroadcastStream<Tuple2<Long,Long>> getBroadcast(SingleOutputStreamOperator<CnRecordLog> cnRecordSource){
-
- SingleOutputStreamOperator<Tuple2<Long, Long>> broadcastNum = cnRecordSource
-// .keyBy(log -> DateUtils.getTimeFloor(new Date(log.getCommon_recv_time() * 1000L), "PT10S").getTime() / 1000)
- .keyBy(log -> log.getCommon_recv_time() % CommonConfig.PERFORMANCE_DETECTION_PARALLELISM)
- .window(TumblingEventTimeWindows.of(Time.seconds(CommonConfig.FLINK_WINDOW_MAX_TIME)))
- .process(new BroadcastProcess())
- .setParallelism(CommonConfig.PERFORMANCE_DETECTION_PARALLELISM)
- .windowAll(TumblingEventTimeWindows.of(Time.seconds(CommonConfig.FLINK_WINDOW_MAX_TIME)))
- .reduce((ReduceFunction<Tuple2<Long, Long>>) (value1, value2) -> Tuple2.of(value1.f0 + value2.f0, value1.f1 + value2.f1));
-
- broadcastNum.print();
-
- MapStateDescriptor<Void, Tuple2<Integer,Integer>> bcStateDescriptor =
- new MapStateDescriptor<>("broadcast", Types.VOID, Types.TUPLE(Types.LONG,Types.LONG));
-
- return broadcastNum.broadcast(bcStateDescriptor);
- }
-
- public static void main(String[] args) {
- long x = 20L % 2;
- System.out.println(x);
- }
-
-}
diff --git a/module-DNS-performance-detection/src/main/java/com/zdjizhi/detection/dns/operator/BroadcastProcess.java b/module-DNS-performance-detection/src/main/java/com/zdjizhi/detection/dns/operator/BroadcastProcess.java
deleted file mode 100644
index 2902ece..0000000
--- a/module-DNS-performance-detection/src/main/java/com/zdjizhi/detection/dns/operator/BroadcastProcess.java
+++ /dev/null
@@ -1,37 +0,0 @@
-package com.zdjizhi.detection.dns.operator;
-
-import com.zdjizhi.base.common.CnRecordLog;
-import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
-import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
-import org.apache.flink.util.Collector;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * @author wlh
- */
-public class BroadcastProcess extends ProcessWindowFunction<CnRecordLog,Tuple2<Long,Long>,Long,TimeWindow> {
-
- private static Logger LOG = LoggerFactory.getLogger(BroadcastProcess.class);
-
- @Override
- public void process(Long aLong, Context context, Iterable<CnRecordLog> elements, Collector<Tuple2<Long, Long>> out) {
- try {
- long dnsCnt = 0;
- long httpCnt = 0;
- for (CnRecordLog ignored : elements) {
- if ("DNS".equals(ignored.getCommon_schema_type())) {
- dnsCnt++;
- }
- if ("HTTP".equals(ignored.getCommon_schema_type())){
- httpCnt++;
- }
- }
- out.collect(Tuple2.of(dnsCnt,httpCnt));
- }catch (Exception e){
- LOG.error("计算 broadcast 失败",e);
- }
- }
-
-}
diff --git a/module-DNS-performance-detection/src/main/java/com/zdjizhi/detection/dns/operator/DnsErrorDetectionProcess.java b/module-DNS-performance-detection/src/main/java/com/zdjizhi/detection/dns/operator/DnsErrorDetectionProcess.java
index 091c46e..4683010 100644
--- a/module-DNS-performance-detection/src/main/java/com/zdjizhi/detection/dns/operator/DnsErrorDetectionProcess.java
+++ b/module-DNS-performance-detection/src/main/java/com/zdjizhi/detection/dns/operator/DnsErrorDetectionProcess.java
@@ -98,9 +98,9 @@ public class DnsErrorDetectionProcess extends KeyedProcessFunction<String, CnRec
PerformanceEventLog eventLog = eventLogIntegerTuple.f0;
Double num = eventLogIntegerTuple.f2;
Long totalNum = eventLogIntegerTuple.f1;
- if (totalNum != null && totalNum > 0) {
+ if (totalNum != null && totalNum > CommonConfig.DNS_ERROR_DETECTION_THRESHOLD) {
double rate = BigDecimal.valueOf(num / totalNum).setScale(2, BigDecimal.ROUND_HALF_UP).doubleValue();
- LOG.debug("dns error detection key:{} num:{},totalNum:{},rate:{}",key,num,totalNum,rate);
+ LOG.info("dns error detection key:{} num:{},totalNum:{},rate:{}",key,num,totalNum,rate);
if (rate >= CommonConfig.DNS_ERROR_DETECTION_INFO_THRESHOLD && rate < CommonConfig.DNS_ERROR_DETECTION_LOW_THRESHOLD) {
eventLog.setEvent_severity("info");
} else if (rate >= CommonConfig.DNS_ERROR_DETECTION_LOW_THRESHOLD && rate < CommonConfig.DNS_ERROR_DETECTION_MEDIUM_THRESHOLD) {
@@ -118,7 +118,7 @@ public class DnsErrorDetectionProcess extends KeyedProcessFunction<String, CnRec
}
}
} catch (Exception e) {
- LOG.error("dns error检测异常,key:{}",e,key);
+ LOG.error("dns error检测异常,key:{} \n{}",key,e);
}
return null;
}
diff --git a/module-DNS-performance-detection/src/main/java/com/zdjizhi/detection/dns/operator/DnsResponseTimeDetectionProcess.java b/module-DNS-performance-detection/src/main/java/com/zdjizhi/detection/dns/operator/DnsResponseTimeDetectionProcess.java
index a982009..0f0094d 100644
--- a/module-DNS-performance-detection/src/main/java/com/zdjizhi/detection/dns/operator/DnsResponseTimeDetectionProcess.java
+++ b/module-DNS-performance-detection/src/main/java/com/zdjizhi/detection/dns/operator/DnsResponseTimeDetectionProcess.java
@@ -38,45 +38,47 @@ public class DnsResponseTimeDetectionProcess extends ProcessWindowFunction<CnRec
@Override
public void process(String key, Context context, Iterable<CnRecordLog> elements, Collector<PerformanceEventLog> out) {
try {
- double median = 0d;
ArrayList<Long> dnsResponseList = new ArrayList<>();
for (CnRecordLog cnRecordLog : elements) {
dnsResponseList.add(cnRecordLog.getDns_response_latency_ms());
}
- dnsResponseList.sort(Long::compareTo);
int size = dnsResponseList.size();
- if (size > 0) {
- median = dnsResponseList.get(size / 2).doubleValue();
- }
- double dis = 1e-6;
+ if (size > CommonConfig.DNS_RESPONSE_DETECTION_THRESHOLD){
+ double median = 0d;
+ dnsResponseList.sort(Long::compareTo);
+ if (size > 0) {
+ median = dnsResponseList.get(size / 2).doubleValue();
+ }
+ double dis = 1e-6;
- PerformanceEventLog eventLog = null;
- if (Math.abs(median - 0d) > dis) {
- if (middleState.contains(key)) {
- Tuple2<Double, Long> state = middleState.get(key);
- Long cnt = state.f1;
- Double threshold = state.f0;
- if (cnt < CommonConfig.DNS_RESPONSE_DETECTION_TIME_COUNT) {
- threshold = (threshold * cnt + median) / (cnt + 1);
- middleState.put(key, Tuple2.of(threshold, ++cnt));
- } else {
- double rate = BigDecimal.valueOf(median / threshold).setScale(2, BigDecimal.ROUND_HALF_UP).doubleValue();
- if (rate >= CommonConfig.DNS_RESPONSE_DETECTION_INFO_THRESHOLD) {
- eventLog = new PerformanceEventLog();
- CnRecordLog value = elements.iterator().next();
- DetectionEventLogHandler.initDetectionEventLog(eventLog, value, "high dns response time", "ip");
- LOG.debug("high dns response time detection key:{} median:{},threshold:{},cnt:{},rate:{}", key, median, threshold, cnt, rate);
- dnsResponseDetectionEventLog(eventLog, rate);
- } else {
+ PerformanceEventLog eventLog = null;
+ if (Math.abs(median - 0d) > dis) {
+ if (middleState.contains(key)) {
+ Tuple2<Double, Long> state = middleState.get(key);
+ Long cnt = state.f1;
+ Double threshold = state.f0;
+ if (cnt < CommonConfig.DNS_RESPONSE_DETECTION_TIME_COUNT) {
threshold = (threshold * cnt + median) / (cnt + 1);
middleState.put(key, Tuple2.of(threshold, ++cnt));
+ } else {
+ double rate = BigDecimal.valueOf(median / threshold).setScale(2, BigDecimal.ROUND_HALF_UP).doubleValue();
+ if (rate >= CommonConfig.DNS_RESPONSE_DETECTION_INFO_THRESHOLD) {
+ eventLog = new PerformanceEventLog();
+ CnRecordLog value = elements.iterator().next();
+ DetectionEventLogHandler.initDetectionEventLog(eventLog, value, "high dns response time", "ip");
+ LOG.info("high dns response time detection key:{} median:{},threshold:{},cnt:{},rate:{}", key, median, threshold, cnt, rate);
+ dnsResponseDetectionEventLog(eventLog, rate);
+ } else {
+ threshold = (threshold * cnt + median) / (cnt + 1);
+ middleState.put(key, Tuple2.of(threshold, ++cnt));
+ }
}
+ } else {
+ middleState.put(key, Tuple2.of(median, 1L));
}
- } else {
- middleState.put(key, Tuple2.of(median, 1L));
}
+ DetectionEventLogHandler.outputResult(key, eventLog, previousResultDesc, out);
}
- DetectionEventLogHandler.outputResult(key, eventLog, previousResultDesc, out);
} catch (Exception e) {
LOG.error("high dns response time 检测异常,key: {} \n{}", key, e);
}
diff --git a/module-DNS-performance-detection/src/main/java/com/zdjizhi/detection/dns/operator/HttpErrorDetectionProcess.java b/module-DNS-performance-detection/src/main/java/com/zdjizhi/detection/dns/operator/HttpErrorDetectionProcess.java
index 23c8e8d..36771bf 100644
--- a/module-DNS-performance-detection/src/main/java/com/zdjizhi/detection/dns/operator/HttpErrorDetectionProcess.java
+++ b/module-DNS-performance-detection/src/main/java/com/zdjizhi/detection/dns/operator/HttpErrorDetectionProcess.java
@@ -96,9 +96,9 @@ public class HttpErrorDetectionProcess extends KeyedProcessFunction<String, CnRe
PerformanceEventLog eventLog = eventLogIntegerTuple.f0;
Double num = eventLogIntegerTuple.f2;
Long totalNum = eventLogIntegerTuple.f1;
- if (totalNum != null && totalNum > 0) {
+ if (totalNum != null && totalNum > CommonConfig.HTTP_ERROR_DETECTION_THRESHOLD) {
double rate = BigDecimal.valueOf(num / totalNum).setScale(2, BigDecimal.ROUND_HALF_UP).doubleValue();
- LOG.debug("http error detection key:{} num:{},totalNum:{},rate:{}",currentKey,num,totalNum,rate);
+ LOG.info("http error detection key:{} num:{},totalNum:{},rate:{}",currentKey,num,totalNum,rate);
if (rate >= CommonConfig.HTTP_ERROR_DETECTION_INFO_THRESHOLD && rate < CommonConfig.HTTP_ERROR_DETECTION_LOW_THRESHOLD) {
eventLog.setEvent_severity("info");
} else if (rate >= CommonConfig.HTTP_ERROR_DETECTION_LOW_THRESHOLD && rate < CommonConfig.HTTP_ERROR_DETECTION_MEDIUM_THRESHOLD) {
@@ -116,7 +116,7 @@ public class HttpErrorDetectionProcess extends KeyedProcessFunction<String, CnRe
}
}
} catch (Exception e) {
- LOG.error("http dns error检测异常,key:{}",e,currentKey);
+ LOG.error("http dns error检测异常,key:{} \n{}",currentKey,e);
}
return null;
}
diff --git a/module-DNS-performance-detection/src/main/resources/DNS-performance-detection.properties b/module-DNS-performance-detection/src/main/resources/DNS-performance-detection.properties
index 74986b2..e150065 100644
--- a/module-DNS-performance-detection/src/main/resources/DNS-performance-detection.properties
+++ b/module-DNS-performance-detection/src/main/resources/DNS-performance-detection.properties
@@ -23,6 +23,10 @@ http.error.detection.medium.threshold=0.3
http.error.detection.high.threshold=0.4
http.error.detection.critical.threshold=0.5
+dns.error.detection.threshold=5
+dns.response.detection.threshold=5
+http.error.detection.threshold=5
+
#每次写入ES条数
bulk.flush.max.action=1000
es.retry.conflict.num=3