diff options
| author | wanglihui <[email protected]> | 2021-09-26 18:41:36 +0800 |
|---|---|---|
| committer | wanglihui <[email protected]> | 2021-09-26 18:41:36 +0800 |
| commit | c44250bf733e7471c8e234a691b4774cf111b139 (patch) | |
| tree | 1ae353afac0c7ec6f83fd0b8660b07f16b8d5b46 | |
| parent | 77bc6a844ee319bc6d8078c64b738a2e18831872 (diff) | |
新增读取DoS Detection Profiles IP冲突检测机制
修复DoS event日志end_time大于当前时间bug
| -rw-r--r-- | src/main/java/com/zdjizhi/etl/DosDetection.java | 2 | ||||
| -rw-r--r-- | src/main/java/com/zdjizhi/etl/EtlProcessFunction.java | 11 | ||||
| -rw-r--r-- | src/main/java/com/zdjizhi/etl/ParseStaticThreshold.java | 15 | ||||
| -rw-r--r-- | src/main/resources/common.properties | 2 |
4 files changed, 20 insertions, 10 deletions
diff --git a/src/main/java/com/zdjizhi/etl/DosDetection.java b/src/main/java/com/zdjizhi/etl/DosDetection.java index 1f471fb..5cbeccf 100644 --- a/src/main/java/com/zdjizhi/etl/DosDetection.java +++ b/src/main/java/com/zdjizhi/etl/DosDetection.java @@ -138,7 +138,7 @@ public class DosDetection extends RichMapFunction<DosSketchLog, DosEventLog> { DosEventLog dosEventLog = new DosEventLog(); dosEventLog.setLog_id(SnowflakeId.generateId()); dosEventLog.setStart_time(value.getSketch_start_time()); - dosEventLog.setEnd_time(value.getSketch_start_time() + CommonConfig.FLINK_WINDOW_MAX_TIME); + dosEventLog.setEnd_time(value.getSketch_start_time() + value.getSketch_duration()); dosEventLog.setAttack_type(value.getAttack_type()); dosEventLog.setSeverity(severity.severity); dosEventLog.setConditions(getConditions(PERCENT_INSTANCE.format(percent),base, value.getSketch_sessions(), tag)); diff --git a/src/main/java/com/zdjizhi/etl/EtlProcessFunction.java b/src/main/java/com/zdjizhi/etl/EtlProcessFunction.java index df13909..ecbd8b6 100644 --- a/src/main/java/com/zdjizhi/etl/EtlProcessFunction.java +++ b/src/main/java/com/zdjizhi/etl/EtlProcessFunction.java @@ -60,11 +60,11 @@ public class EtlProcessFunction extends ProcessWindowFunction<DosSketchLog, DosS } private Tuple6<Long, Long, Long,String,Long,Long> sketchAggregate(Iterable<DosSketchLog> elements){ - int cnt = 1; long sessions = 0; long packets = 0 ; long bytes = 0; - long startTime = 0; + long startTime = System.currentTimeMillis()/1000; + long endTime = System.currentTimeMillis()/1000; long duration = 0; HashSet<String> sourceIpSet = new HashSet<>(); try { @@ -74,17 +74,16 @@ public class EtlProcessFunction extends ProcessWindowFunction<DosSketchLog, DosS sessions += newSketchLog.getSketch_sessions(); packets += newSketchLog.getSketch_packets(); bytes += newSketchLog.getSketch_bytes(); - startTime = newSketchLog.getSketch_start_time(); - duration = newSketchLog.getSketch_duration(); + startTime = newSketchLog.getSketch_start_time() > startTime ? startTime : newSketchLog.getSketch_start_time(); + endTime = newSketchLog.getSketch_start_time() > endTime ? newSketchLog.getSketch_start_time() : endTime; + duration = endTime - startTime == 0 ? 5 : endTime - startTime; }else { if (sourceIpSet.size() < CommonConfig.SOURCE_IP_LIST_LIMIT){ sourceIpSet.add(sourceIp); } } - cnt += 1; } String sourceIpList = StringUtils.join(sourceIpSet, ","); -// return Tuple6.of(sessions/cnt/duration,packets/cnt/duration,bytes/cnt/duration,sourceIpList,startTime,duration); return Tuple6.of(sessions/CommonConfig.FLINK_WINDOW_MAX_TIME,packets/CommonConfig.FLINK_WINDOW_MAX_TIME, bytes*8/CommonConfig.FLINK_WINDOW_MAX_TIME,sourceIpList,startTime,duration); }catch (Exception e){ diff --git a/src/main/java/com/zdjizhi/etl/ParseStaticThreshold.java b/src/main/java/com/zdjizhi/etl/ParseStaticThreshold.java index 6ac9fa3..1af9bdb 100644 --- a/src/main/java/com/zdjizhi/etl/ParseStaticThreshold.java +++ b/src/main/java/com/zdjizhi/etl/ParseStaticThreshold.java @@ -16,7 +16,6 @@ import org.slf4j.LoggerFactory; import java.net.URISyntaxException; import java.util.ArrayList; -import java.util.Collections; import java.util.HashMap; import java.util.Map; @@ -180,7 +179,19 @@ public class ParseStaticThreshold { thresholdRangeMap.put(Range.closed(lower, upper), floodTypeThresholdMap); } }else { - thresholdRangeMap.put(Range.closed(address, address), floodTypeThresholdMap); + Map.Entry<Range<IPAddress>, Map<String, DosDetectionThreshold>> entry = thresholdRangeMap.getEntry(address); + if (entry != null){ + Range<IPAddress> entryKey = entry.getKey(); + Map<String, DosDetectionThreshold> entryValue = entry.getValue(); + entryValue.put(threshold.getAttackType(), threshold); + if (entryKey.lowerEndpoint() == entryKey.upperEndpoint()){ + thresholdRangeMap.put(Range.closed(address, address), entryValue); + }else { + thresholdRangeMap.put(Range.closed(address, address), floodTypeThresholdMap); + } + }else { + thresholdRangeMap.put(Range.closed(address, address), floodTypeThresholdMap); + } } } } diff --git a/src/main/resources/common.properties b/src/main/resources/common.properties index ce634ea..2c11628 100644 --- a/src/main/resources/common.properties +++ b/src/main/resources/common.properties @@ -60,7 +60,7 @@ flink.detection.map.parallelism=1 flink.watermark.max.orderness=10 #计算窗口大小,默认600s -flink.window.max.time=10 +flink.window.max.time=600 #dos event结果中distinct source IP限制 source.ip.list.limit=10000 |
