summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorwanglihui <[email protected]>2021-09-26 18:41:36 +0800
committerwanglihui <[email protected]>2021-09-26 18:41:36 +0800
commitc44250bf733e7471c8e234a691b4774cf111b139 (patch)
tree1ae353afac0c7ec6f83fd0b8660b07f16b8d5b46
parent77bc6a844ee319bc6d8078c64b738a2e18831872 (diff)
新增读取DoS Detection Profiles IP冲突检测机制
修复DoS event日志end_time大于当前时间bug
-rw-r--r--src/main/java/com/zdjizhi/etl/DosDetection.java2
-rw-r--r--src/main/java/com/zdjizhi/etl/EtlProcessFunction.java11
-rw-r--r--src/main/java/com/zdjizhi/etl/ParseStaticThreshold.java15
-rw-r--r--src/main/resources/common.properties2
4 files changed, 20 insertions, 10 deletions
diff --git a/src/main/java/com/zdjizhi/etl/DosDetection.java b/src/main/java/com/zdjizhi/etl/DosDetection.java
index 1f471fb..5cbeccf 100644
--- a/src/main/java/com/zdjizhi/etl/DosDetection.java
+++ b/src/main/java/com/zdjizhi/etl/DosDetection.java
@@ -138,7 +138,7 @@ public class DosDetection extends RichMapFunction<DosSketchLog, DosEventLog> {
DosEventLog dosEventLog = new DosEventLog();
dosEventLog.setLog_id(SnowflakeId.generateId());
dosEventLog.setStart_time(value.getSketch_start_time());
- dosEventLog.setEnd_time(value.getSketch_start_time() + CommonConfig.FLINK_WINDOW_MAX_TIME);
+ dosEventLog.setEnd_time(value.getSketch_start_time() + value.getSketch_duration());
dosEventLog.setAttack_type(value.getAttack_type());
dosEventLog.setSeverity(severity.severity);
dosEventLog.setConditions(getConditions(PERCENT_INSTANCE.format(percent),base, value.getSketch_sessions(), tag));
diff --git a/src/main/java/com/zdjizhi/etl/EtlProcessFunction.java b/src/main/java/com/zdjizhi/etl/EtlProcessFunction.java
index df13909..ecbd8b6 100644
--- a/src/main/java/com/zdjizhi/etl/EtlProcessFunction.java
+++ b/src/main/java/com/zdjizhi/etl/EtlProcessFunction.java
@@ -60,11 +60,11 @@ public class EtlProcessFunction extends ProcessWindowFunction<DosSketchLog, DosS
}
private Tuple6<Long, Long, Long,String,Long,Long> sketchAggregate(Iterable<DosSketchLog> elements){
- int cnt = 1;
long sessions = 0;
long packets = 0 ;
long bytes = 0;
- long startTime = 0;
+ long startTime = System.currentTimeMillis()/1000;
+ long endTime = System.currentTimeMillis()/1000;
long duration = 0;
HashSet<String> sourceIpSet = new HashSet<>();
try {
@@ -74,17 +74,16 @@ public class EtlProcessFunction extends ProcessWindowFunction<DosSketchLog, DosS
sessions += newSketchLog.getSketch_sessions();
packets += newSketchLog.getSketch_packets();
bytes += newSketchLog.getSketch_bytes();
- startTime = newSketchLog.getSketch_start_time();
- duration = newSketchLog.getSketch_duration();
+ startTime = newSketchLog.getSketch_start_time() > startTime ? startTime : newSketchLog.getSketch_start_time();
+ endTime = newSketchLog.getSketch_start_time() > endTime ? newSketchLog.getSketch_start_time() : endTime;
+ duration = endTime - startTime == 0 ? 5 : endTime - startTime;
}else {
if (sourceIpSet.size() < CommonConfig.SOURCE_IP_LIST_LIMIT){
sourceIpSet.add(sourceIp);
}
}
- cnt += 1;
}
String sourceIpList = StringUtils.join(sourceIpSet, ",");
-// return Tuple6.of(sessions/cnt/duration,packets/cnt/duration,bytes/cnt/duration,sourceIpList,startTime,duration);
return Tuple6.of(sessions/CommonConfig.FLINK_WINDOW_MAX_TIME,packets/CommonConfig.FLINK_WINDOW_MAX_TIME,
bytes*8/CommonConfig.FLINK_WINDOW_MAX_TIME,sourceIpList,startTime,duration);
}catch (Exception e){
diff --git a/src/main/java/com/zdjizhi/etl/ParseStaticThreshold.java b/src/main/java/com/zdjizhi/etl/ParseStaticThreshold.java
index 6ac9fa3..1af9bdb 100644
--- a/src/main/java/com/zdjizhi/etl/ParseStaticThreshold.java
+++ b/src/main/java/com/zdjizhi/etl/ParseStaticThreshold.java
@@ -16,7 +16,6 @@ import org.slf4j.LoggerFactory;
import java.net.URISyntaxException;
import java.util.ArrayList;
-import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
@@ -180,7 +179,19 @@ public class ParseStaticThreshold {
thresholdRangeMap.put(Range.closed(lower, upper), floodTypeThresholdMap);
}
}else {
- thresholdRangeMap.put(Range.closed(address, address), floodTypeThresholdMap);
+ Map.Entry<Range<IPAddress>, Map<String, DosDetectionThreshold>> entry = thresholdRangeMap.getEntry(address);
+ if (entry != null){
+ Range<IPAddress> entryKey = entry.getKey();
+ Map<String, DosDetectionThreshold> entryValue = entry.getValue();
+ entryValue.put(threshold.getAttackType(), threshold);
+ if (entryKey.lowerEndpoint() == entryKey.upperEndpoint()){
+ thresholdRangeMap.put(Range.closed(address, address), entryValue);
+ }else {
+ thresholdRangeMap.put(Range.closed(address, address), floodTypeThresholdMap);
+ }
+ }else {
+ thresholdRangeMap.put(Range.closed(address, address), floodTypeThresholdMap);
+ }
}
}
}
diff --git a/src/main/resources/common.properties b/src/main/resources/common.properties
index ce634ea..2c11628 100644
--- a/src/main/resources/common.properties
+++ b/src/main/resources/common.properties
@@ -60,7 +60,7 @@ flink.detection.map.parallelism=1
flink.watermark.max.orderness=10
#计算窗口大小,默认600s
-flink.window.max.time=10
+flink.window.max.time=600
#dos event结果中distinct source IP限制
source.ip.list.limit=10000