diff options
| author | wanglihui <[email protected]> | 2023-01-03 11:26:49 +0800 |
|---|---|---|
| committer | wanglihui <[email protected]> | 2023-01-03 11:26:49 +0800 |
| commit | cc9a4f6835e5f9f02df07cdad3fcc01c20b4560a (patch) | |
| tree | adbbb494e9c57132823c9301eeb7cf93273eb438 | |
| parent | 40cc688d305ae8d4e0341d0532e6f7ad5183dbb1 (diff) | |
CN-845 实时检测增加静态阈值判定条件22.11
9 files changed, 50 insertions, 113 deletions
diff --git a/module-DNS-performance-detection/src/main/java/com/zdjizhi/detection/dns/common/CommonConfig.java b/module-DNS-performance-detection/src/main/java/com/zdjizhi/detection/dns/common/CommonConfig.java index b24076f..16f81d3 100644 --- a/module-DNS-performance-detection/src/main/java/com/zdjizhi/detection/dns/common/CommonConfig.java +++ b/module-DNS-performance-detection/src/main/java/com/zdjizhi/detection/dns/common/CommonConfig.java @@ -48,6 +48,10 @@ public class CommonConfig { public static final double HTTP_ERROR_DETECTION_HIGH_THRESHOLD = configurations.getDoubleProperty("http.error.detection.high.threshold"); public static final double HTTP_ERROR_DETECTION_CRITICAL_THRESHOLD = configurations.getDoubleProperty("http.error.detection.critical.threshold"); + public static final int DNS_ERROR_DETECTION_THRESHOLD = configurations.getIntProperty("dns.error.detection.threshold"); + public static final int DNS_RESPONSE_DETECTION_THRESHOLD = configurations.getIntProperty("dns.response.detection.threshold"); + public static final int HTTP_ERROR_DETECTION_THRESHOLD = configurations.getIntProperty("http.error.detection.threshold"); + public static final int BULK_FLUSH_MAX_ACTION = configurations.getIntProperty("bulk.flush.max.action"); public static final int ES_RETRY_CONFLICT_NUM = configurations.getIntProperty("es.retry.conflict.num"); public static final String ES_PERFORMANCE_TABLE_NAME = configurations.getStringProperty("es.performance.table.name"); diff --git a/module-DNS-performance-detection/src/main/java/com/zdjizhi/detection/dns/handler/BroadcastHandler.java b/module-DNS-performance-detection/src/main/java/com/zdjizhi/detection/dns/handler/BroadcastHandler.java deleted file mode 100644 index 94b2a89..0000000 --- a/module-DNS-performance-detection/src/main/java/com/zdjizhi/detection/dns/handler/BroadcastHandler.java +++ /dev/null @@ -1,41 +0,0 @@ -package com.zdjizhi.detection.dns.handler; - -import com.zdjizhi.base.common.CnRecordLog; -import com.zdjizhi.detection.dns.common.CommonConfig; -import com.zdjizhi.detection.dns.operator.BroadcastProcess; -import org.apache.flink.api.common.functions.ReduceFunction; -import org.apache.flink.api.common.state.MapStateDescriptor; -import org.apache.flink.api.common.typeinfo.Types; -import org.apache.flink.api.java.tuple.Tuple2; -import org.apache.flink.streaming.api.datastream.BroadcastStream; -import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator; -import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows; -import org.apache.flink.streaming.api.windowing.time.Time; - -public class BroadcastHandler { - - public static BroadcastStream<Tuple2<Long,Long>> getBroadcast(SingleOutputStreamOperator<CnRecordLog> cnRecordSource){ - - SingleOutputStreamOperator<Tuple2<Long, Long>> broadcastNum = cnRecordSource -// .keyBy(log -> DateUtils.getTimeFloor(new Date(log.getCommon_recv_time() * 1000L), "PT10S").getTime() / 1000) - .keyBy(log -> log.getCommon_recv_time() % CommonConfig.PERFORMANCE_DETECTION_PARALLELISM) - .window(TumblingEventTimeWindows.of(Time.seconds(CommonConfig.FLINK_WINDOW_MAX_TIME))) - .process(new BroadcastProcess()) - .setParallelism(CommonConfig.PERFORMANCE_DETECTION_PARALLELISM) - .windowAll(TumblingEventTimeWindows.of(Time.seconds(CommonConfig.FLINK_WINDOW_MAX_TIME))) - .reduce((ReduceFunction<Tuple2<Long, Long>>) (value1, value2) -> Tuple2.of(value1.f0 + value2.f0, value1.f1 + value2.f1)); - - broadcastNum.print(); - - MapStateDescriptor<Void, Tuple2<Integer,Integer>> bcStateDescriptor = - new MapStateDescriptor<>("broadcast", Types.VOID, Types.TUPLE(Types.LONG,Types.LONG)); - - return broadcastNum.broadcast(bcStateDescriptor); - } - - public static void main(String[] args) { - long x = 20L % 2; - System.out.println(x); - } - -} diff --git a/module-DNS-performance-detection/src/main/java/com/zdjizhi/detection/dns/handler/DetectionEventLogHandler.java b/module-DNS-performance-detection/src/main/java/com/zdjizhi/detection/dns/handler/DetectionEventLogHandler.java index aded7a8..525ad50 100644 --- a/module-DNS-performance-detection/src/main/java/com/zdjizhi/detection/dns/handler/DetectionEventLogHandler.java +++ b/module-DNS-performance-detection/src/main/java/com/zdjizhi/detection/dns/handler/DetectionEventLogHandler.java @@ -108,8 +108,8 @@ public class DetectionEventLogHandler { } public static void main(String[] args) { - System.out.println(IPUtils.ipLookup.asnLookup("218.202.147.134")); - System.out.println(IPUtils.ipLookup.asnLookup("192.168.56.27")); +// System.out.println(IPUtils.ipLookup.asnLookup("218.202.147.134")); +// System.out.println(IPUtils.ipLookup.asnLookup("192.168.56.27")); } } diff --git a/module-DNS-performance-detection/src/main/java/com/zdjizhi/detection/dns/operator/BroadcastProcess.java b/module-DNS-performance-detection/src/main/java/com/zdjizhi/detection/dns/operator/BroadcastProcess.java deleted file mode 100644 index 2902ece..0000000 --- a/module-DNS-performance-detection/src/main/java/com/zdjizhi/detection/dns/operator/BroadcastProcess.java +++ /dev/null @@ -1,37 +0,0 @@ -package com.zdjizhi.detection.dns.operator; - -import com.zdjizhi.base.common.CnRecordLog; -import org.apache.flink.api.java.tuple.Tuple2; -import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction; -import org.apache.flink.streaming.api.windowing.windows.TimeWindow; -import org.apache.flink.util.Collector; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -/** - * @author wlh - */ -public class BroadcastProcess extends ProcessWindowFunction<CnRecordLog,Tuple2<Long,Long>,Long,TimeWindow> { - - private static Logger LOG = LoggerFactory.getLogger(BroadcastProcess.class); - - @Override - public void process(Long aLong, Context context, Iterable<CnRecordLog> elements, Collector<Tuple2<Long, Long>> out) { - try { - long dnsCnt = 0; - long httpCnt = 0; - for (CnRecordLog ignored : elements) { - if ("DNS".equals(ignored.getCommon_schema_type())) { - dnsCnt++; - } - if ("HTTP".equals(ignored.getCommon_schema_type())){ - httpCnt++; - } - } - out.collect(Tuple2.of(dnsCnt,httpCnt)); - }catch (Exception e){ - LOG.error("计算 broadcast 失败",e); - } - } - -} diff --git a/module-DNS-performance-detection/src/main/java/com/zdjizhi/detection/dns/operator/DnsErrorDetectionProcess.java b/module-DNS-performance-detection/src/main/java/com/zdjizhi/detection/dns/operator/DnsErrorDetectionProcess.java index 091c46e..4683010 100644 --- a/module-DNS-performance-detection/src/main/java/com/zdjizhi/detection/dns/operator/DnsErrorDetectionProcess.java +++ b/module-DNS-performance-detection/src/main/java/com/zdjizhi/detection/dns/operator/DnsErrorDetectionProcess.java @@ -98,9 +98,9 @@ public class DnsErrorDetectionProcess extends KeyedProcessFunction<String, CnRec PerformanceEventLog eventLog = eventLogIntegerTuple.f0; Double num = eventLogIntegerTuple.f2; Long totalNum = eventLogIntegerTuple.f1; - if (totalNum != null && totalNum > 0) { + if (totalNum != null && totalNum > CommonConfig.DNS_ERROR_DETECTION_THRESHOLD) { double rate = BigDecimal.valueOf(num / totalNum).setScale(2, BigDecimal.ROUND_HALF_UP).doubleValue(); - LOG.debug("dns error detection key:{} num:{},totalNum:{},rate:{}",key,num,totalNum,rate); + LOG.info("dns error detection key:{} num:{},totalNum:{},rate:{}",key,num,totalNum,rate); if (rate >= CommonConfig.DNS_ERROR_DETECTION_INFO_THRESHOLD && rate < CommonConfig.DNS_ERROR_DETECTION_LOW_THRESHOLD) { eventLog.setEvent_severity("info"); } else if (rate >= CommonConfig.DNS_ERROR_DETECTION_LOW_THRESHOLD && rate < CommonConfig.DNS_ERROR_DETECTION_MEDIUM_THRESHOLD) { @@ -118,7 +118,7 @@ public class DnsErrorDetectionProcess extends KeyedProcessFunction<String, CnRec } } } catch (Exception e) { - LOG.error("dns error检测异常,key:{}",e,key); + LOG.error("dns error检测异常,key:{} \n{}",key,e); } return null; } diff --git a/module-DNS-performance-detection/src/main/java/com/zdjizhi/detection/dns/operator/DnsResponseTimeDetectionProcess.java b/module-DNS-performance-detection/src/main/java/com/zdjizhi/detection/dns/operator/DnsResponseTimeDetectionProcess.java index a982009..ff8ee0b 100644 --- a/module-DNS-performance-detection/src/main/java/com/zdjizhi/detection/dns/operator/DnsResponseTimeDetectionProcess.java +++ b/module-DNS-performance-detection/src/main/java/com/zdjizhi/detection/dns/operator/DnsResponseTimeDetectionProcess.java @@ -38,45 +38,48 @@ public class DnsResponseTimeDetectionProcess extends ProcessWindowFunction<CnRec @Override public void process(String key, Context context, Iterable<CnRecordLog> elements, Collector<PerformanceEventLog> out) { try { - double median = 0d; ArrayList<Long> dnsResponseList = new ArrayList<>(); for (CnRecordLog cnRecordLog : elements) { dnsResponseList.add(cnRecordLog.getDns_response_latency_ms()); } - dnsResponseList.sort(Long::compareTo); int size = dnsResponseList.size(); - if (size > 0) { - median = dnsResponseList.get(size / 2).doubleValue(); - } - double dis = 1e-6; - PerformanceEventLog eventLog = null; - if (Math.abs(median - 0d) > dis) { - if (middleState.contains(key)) { - Tuple2<Double, Long> state = middleState.get(key); - Long cnt = state.f1; - Double threshold = state.f0; - if (cnt < CommonConfig.DNS_RESPONSE_DETECTION_TIME_COUNT) { - threshold = (threshold * cnt + median) / (cnt + 1); - middleState.put(key, Tuple2.of(threshold, ++cnt)); - } else { - double rate = BigDecimal.valueOf(median / threshold).setScale(2, BigDecimal.ROUND_HALF_UP).doubleValue(); - if (rate >= CommonConfig.DNS_RESPONSE_DETECTION_INFO_THRESHOLD) { - eventLog = new PerformanceEventLog(); - CnRecordLog value = elements.iterator().next(); - DetectionEventLogHandler.initDetectionEventLog(eventLog, value, "high dns response time", "ip"); - LOG.debug("high dns response time detection key:{} median:{},threshold:{},cnt:{},rate:{}", key, median, threshold, cnt, rate); - dnsResponseDetectionEventLog(eventLog, rate); - } else { + if (size > CommonConfig.DNS_RESPONSE_DETECTION_MEDIUM_THRESHOLD){ + double median = 0d; + dnsResponseList.sort(Long::compareTo); + if (size > 0) { + median = dnsResponseList.get(size / 2).doubleValue(); + } + double dis = 1e-6; + + PerformanceEventLog eventLog = null; + if (Math.abs(median - 0d) > dis) { + if (middleState.contains(key)) { + Tuple2<Double, Long> state = middleState.get(key); + Long cnt = state.f1; + Double threshold = state.f0; + if (cnt < CommonConfig.DNS_RESPONSE_DETECTION_TIME_COUNT) { threshold = (threshold * cnt + median) / (cnt + 1); middleState.put(key, Tuple2.of(threshold, ++cnt)); + } else { + double rate = BigDecimal.valueOf(median / threshold).setScale(2, BigDecimal.ROUND_HALF_UP).doubleValue(); + if (rate >= CommonConfig.DNS_RESPONSE_DETECTION_INFO_THRESHOLD) { + eventLog = new PerformanceEventLog(); + CnRecordLog value = elements.iterator().next(); + DetectionEventLogHandler.initDetectionEventLog(eventLog, value, "high dns response time", "ip"); + LOG.info("high dns response time detection key:{} median:{},threshold:{},cnt:{},rate:{}", key, median, threshold, cnt, rate); + dnsResponseDetectionEventLog(eventLog, rate); + } else { + threshold = (threshold * cnt + median) / (cnt + 1); + middleState.put(key, Tuple2.of(threshold, ++cnt)); + } } + } else { + middleState.put(key, Tuple2.of(median, 1L)); } - } else { - middleState.put(key, Tuple2.of(median, 1L)); } + DetectionEventLogHandler.outputResult(key, eventLog, previousResultDesc, out); } - DetectionEventLogHandler.outputResult(key, eventLog, previousResultDesc, out); } catch (Exception e) { LOG.error("high dns response time 检测异常,key: {} \n{}", key, e); } diff --git a/module-DNS-performance-detection/src/main/java/com/zdjizhi/detection/dns/operator/HttpErrorDetectionProcess.java b/module-DNS-performance-detection/src/main/java/com/zdjizhi/detection/dns/operator/HttpErrorDetectionProcess.java index 23c8e8d..36771bf 100644 --- a/module-DNS-performance-detection/src/main/java/com/zdjizhi/detection/dns/operator/HttpErrorDetectionProcess.java +++ b/module-DNS-performance-detection/src/main/java/com/zdjizhi/detection/dns/operator/HttpErrorDetectionProcess.java @@ -96,9 +96,9 @@ public class HttpErrorDetectionProcess extends KeyedProcessFunction<String, CnRe PerformanceEventLog eventLog = eventLogIntegerTuple.f0; Double num = eventLogIntegerTuple.f2; Long totalNum = eventLogIntegerTuple.f1; - if (totalNum != null && totalNum > 0) { + if (totalNum != null && totalNum > CommonConfig.HTTP_ERROR_DETECTION_THRESHOLD) { double rate = BigDecimal.valueOf(num / totalNum).setScale(2, BigDecimal.ROUND_HALF_UP).doubleValue(); - LOG.debug("http error detection key:{} num:{},totalNum:{},rate:{}",currentKey,num,totalNum,rate); + LOG.info("http error detection key:{} num:{},totalNum:{},rate:{}",currentKey,num,totalNum,rate); if (rate >= CommonConfig.HTTP_ERROR_DETECTION_INFO_THRESHOLD && rate < CommonConfig.HTTP_ERROR_DETECTION_LOW_THRESHOLD) { eventLog.setEvent_severity("info"); } else if (rate >= CommonConfig.HTTP_ERROR_DETECTION_LOW_THRESHOLD && rate < CommonConfig.HTTP_ERROR_DETECTION_MEDIUM_THRESHOLD) { @@ -116,7 +116,7 @@ public class HttpErrorDetectionProcess extends KeyedProcessFunction<String, CnRe } } } catch (Exception e) { - LOG.error("http dns error检测异常,key:{}",e,currentKey); + LOG.error("http dns error检测异常,key:{} \n{}",currentKey,e); } return null; } diff --git a/module-DNS-performance-detection/src/main/resources/DNS-performance-detection.properties b/module-DNS-performance-detection/src/main/resources/DNS-performance-detection.properties index 74986b2..e150065 100644 --- a/module-DNS-performance-detection/src/main/resources/DNS-performance-detection.properties +++ b/module-DNS-performance-detection/src/main/resources/DNS-performance-detection.properties @@ -23,6 +23,10 @@ http.error.detection.medium.threshold=0.3 http.error.detection.high.threshold=0.4 http.error.detection.critical.threshold=0.5 +dns.error.detection.threshold=5 +dns.response.detection.threshold=5 +http.error.detection.threshold=5 + #每次写入ES条数 bulk.flush.max.action=1000 es.retry.conflict.num=3 @@ -153,6 +153,10 @@ <artifactId>httpcore</artifactId> <groupId>org.apache.httpcomponents</groupId> </exclusion> + <exclusion> + <artifactId>jdk.tools</artifactId> + <groupId>jdk.tools</groupId> + </exclusion> </exclusions> </dependency> <dependency> |
