summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorwanglihui <[email protected]>2021-10-20 18:23:12 +0800
committerwanglihui <[email protected]>2021-10-20 18:23:12 +0800
commitbe916531fb412d2596701daed456ce2179eb4776 (patch)
treeb589eaf7d9191ab1f119535ad61f392de2acbe97
parentc692112445428b9e0b26a9cfc6da7352c12973d4 (diff)
修改构建threshold RangeMap逻辑,基于attack type为key,避免IP冲突问题。
-rw-r--r--src/main/java/com/zdjizhi/etl/DosDetection.java42
-rw-r--r--src/main/java/com/zdjizhi/etl/EtlProcessFunction.java5
-rw-r--r--src/main/java/com/zdjizhi/etl/ParseStaticThreshold.java58
-rw-r--r--src/main/resources/common.properties6
4 files changed, 48 insertions, 63 deletions
diff --git a/src/main/java/com/zdjizhi/etl/DosDetection.java b/src/main/java/com/zdjizhi/etl/DosDetection.java
index 2fa3bfa..eace1c3 100644
--- a/src/main/java/com/zdjizhi/etl/DosDetection.java
+++ b/src/main/java/com/zdjizhi/etl/DosDetection.java
@@ -33,7 +33,7 @@ public class DosDetection extends RichMapFunction<DosSketchLog, DosEventLog> {
private static Map<String, Map<String, Tuple2<ArrayList<Integer>, Integer>>> baselineMap = new HashMap<>();
private final static int BASELINE_SIZE = 144;
private final static NumberFormat PERCENT_INSTANCE = NumberFormat.getPercentInstance();
- private TreeRangeMap<IPAddress, Map<String, DosDetectionThreshold>> thresholdRangeMap;
+ private HashMap<String, TreeRangeMap<IPAddress, DosDetectionThreshold>> thresholdRangeMap;
@Override
public void open(Configuration parameters) {
@@ -58,14 +58,14 @@ public class DosDetection extends RichMapFunction<DosSketchLog, DosEventLog> {
String destinationIp = value.getDestination_ip();
String attackType = value.getAttack_type();
IPAddress destinationIpAddress = new IPAddressString(destinationIp).getAddress();
- Map<String, DosDetectionThreshold> thresholdMap = thresholdRangeMap.get(destinationIpAddress);
+ DosDetectionThreshold threshold = thresholdRangeMap.getOrDefault(attackType, TreeRangeMap.create()).get(destinationIpAddress);
logger.debug("当前判断IP:{}, 类型: {}", destinationIp, attackType);
- if ((thresholdMap == null || !thresholdMap.containsKey(attackType)) && baselineMap.containsKey(destinationIp)) {
+ if (threshold == null && baselineMap.containsKey(destinationIp)) {
finalResult = getDosEventLogByBaseline(value);
- } else if ((thresholdMap == null || !thresholdMap.containsKey(attackType)) && !baselineMap.containsKey(destinationIp)) {
+ } else if (threshold == null && !baselineMap.containsKey(destinationIp)) {
finalResult = getDosEventLogBySensitivityThreshold(value);
- } else if (thresholdMap != null && thresholdMap.containsKey(attackType)) {
- finalResult = getDosEventLogByStaticThreshold(value, thresholdMap);
+ } else if (threshold != null) {
+ finalResult = getDosEventLogByStaticThreshold(value, threshold);
} else {
logger.debug("未获取到当前server IP:{} 类型 {} 静态阈值 和 baseline", destinationIp, attackType);
}
@@ -99,24 +99,18 @@ public class DosDetection extends RichMapFunction<DosSketchLog, DosEventLog> {
return result;
}
- private DosEventLog getDosEventLogByStaticThreshold(DosSketchLog value, Map<String, DosDetectionThreshold> thresholdMap) {
- DosEventLog result = null;
- String attackType = value.getAttack_type();
- long base, diff;
- if (thresholdMap.containsKey(attackType)) {
- DosDetectionThreshold threshold = thresholdMap.get(attackType);
- base = threshold.getSessionsPerSec();
- diff = value.getSketch_sessions() - base;
- result = getDosEventLog(value, base, diff, 1, "sessions");
+ private DosEventLog getDosEventLogByStaticThreshold(DosSketchLog value, DosDetectionThreshold threshold) {
+ long base = threshold.getSessionsPerSec();
+ long diff = value.getSketch_sessions() - base;
+ DosEventLog result = getDosEventLog(value, base, diff, 1, "sessions");
+ if (result == null) {
+ base = threshold.getPacketsPerSec();
+ diff = value.getSketch_packets() - base;
+ result = getDosEventLog(value, base, diff, 1, "packets");
if (result == null) {
- base = threshold.getPacketsPerSec();
- diff = value.getSketch_packets() - base;
- result = getDosEventLog(value, base, diff, 1, "packets");
- if (result == null) {
- base = threshold.getBitsPerSec();
- diff = value.getSketch_bytes() - base;
- result = getDosEventLog(value, base, diff, 1, "bits");
- }
+ base = threshold.getBitsPerSec();
+ diff = value.getSketch_bytes() - base;
+ result = getDosEventLog(value, base, diff, 1, "bits");
}
}
return result;
@@ -134,7 +128,7 @@ public class DosDetection extends RichMapFunction<DosSketchLog, DosEventLog> {
logger.debug("当前server IP:{},类型:{},基线值{}百分比{}未超过基线敏感阈值,日志详情\n{}", destinationIp, attackType, base, percent, value);
} else {
result = getResult(value, base, severity, percent, type, tag);
- logger.info("检测到当前server IP {} 存在 {} 异常,超出基线{} {}倍,基于{}检测,日志详情\n {}", destinationIp, attackType, base, percent, type, result);
+ logger.info("检测到当前server IP {} 存在 {} 异常,超出基线{} {}倍,基于{}:{}检测,日志详情\n {}", destinationIp,attackType,base,percent,type,tag,result);
}
} else {
logger.debug("当前server IP:{} 未出现 {} 异常,日志详情 {}", destinationIp, attackType, value);
diff --git a/src/main/java/com/zdjizhi/etl/EtlProcessFunction.java b/src/main/java/com/zdjizhi/etl/EtlProcessFunction.java
index ecbd8b6..bed7fdb 100644
--- a/src/main/java/com/zdjizhi/etl/EtlProcessFunction.java
+++ b/src/main/java/com/zdjizhi/etl/EtlProcessFunction.java
@@ -21,6 +21,9 @@ import static com.zdjizhi.sink.OutputStreamSink.outputTag;
public class EtlProcessFunction extends ProcessWindowFunction<DosSketchLog, DosSketchLog, Tuple2<String,String>, TimeWindow> {
private static final Logger logger = LoggerFactory.getLogger(EtlProcessFunction.class);
+ private static final String EMPTY_SOURCE_IP_IPV4 = "0.0.0.0";
+ private static final String EMPTY_SOURCE_IP_IPV6 = "::";
+
@Override
public void process(Tuple2<String, String> keys,
Context context, Iterable<DosSketchLog> elements,
@@ -70,7 +73,7 @@ public class EtlProcessFunction extends ProcessWindowFunction<DosSketchLog, DosS
try {
for (DosSketchLog newSketchLog : elements){
String sourceIp = newSketchLog.getSource_ip();
- if ("0.0.0.0".equals(sourceIp) || "::".equals(sourceIp)){
+ if (StringUtils.equals(sourceIp,EMPTY_SOURCE_IP_IPV4) || StringUtils.equals(sourceIp,EMPTY_SOURCE_IP_IPV6)){
sessions += newSketchLog.getSketch_sessions();
packets += newSketchLog.getSketch_packets();
bytes += newSketchLog.getSketch_bytes();
diff --git a/src/main/java/com/zdjizhi/etl/ParseStaticThreshold.java b/src/main/java/com/zdjizhi/etl/ParseStaticThreshold.java
index 2a4a96d..49f4fea 100644
--- a/src/main/java/com/zdjizhi/etl/ParseStaticThreshold.java
+++ b/src/main/java/com/zdjizhi/etl/ParseStaticThreshold.java
@@ -141,19 +141,19 @@ public class ParseStaticThreshold {
*
* @return threshold RangeMap
*/
- static TreeRangeMap<IPAddress, Map<String, DosDetectionThreshold>> createStaticThreshold() {
- TreeRangeMap<IPAddress, Map<String, DosDetectionThreshold>> thresholdRangeMap = TreeRangeMap.create();
+ static HashMap<String, TreeRangeMap<IPAddress, DosDetectionThreshold>> createStaticThreshold() {
+ HashMap<String, TreeRangeMap<IPAddress, DosDetectionThreshold>> thresholdRangeMap = new HashMap<>(4);
try {
ArrayList<DosDetectionThreshold> dosDetectionThreshold = getDosDetectionThreshold();
if (dosDetectionThreshold != null && !dosDetectionThreshold.isEmpty()) {
for (DosDetectionThreshold threshold : dosDetectionThreshold) {
+ String attackType = threshold.getAttackType();
+ TreeRangeMap<IPAddress, DosDetectionThreshold> treeRangeMap = thresholdRangeMap.getOrDefault(attackType, TreeRangeMap.create());
ArrayList<String> serverIpList = threshold.getServerIpList();
for (String sip : serverIpList) {
IPAddressString ipAddressString = new IPAddressString(sip);
if (ipAddressString.isIPAddress()) {
IPAddress address = ipAddressString.getAddress();
- Map<String, DosDetectionThreshold> floodTypeThresholdMap = new HashMap<>();
- floodTypeThresholdMap.put(threshold.getAttackType(), threshold);
if (address.isPrefixed()) {
IPAddress lower = address.getLower();
IPAddress upper = address.getUpper();
@@ -161,40 +161,27 @@ public class ParseStaticThreshold {
lower = address.adjustPrefixLength(address.getBitCount());
upper = address.toMaxHost().withoutPrefixLength();
}
- Map.Entry<Range<IPAddress>, Map<String, DosDetectionThreshold>> lowerEntry = thresholdRangeMap.getEntry(lower);
- Map.Entry<Range<IPAddress>, Map<String, DosDetectionThreshold>> upperEntry = thresholdRangeMap.getEntry(upper);
+ Map.Entry<Range<IPAddress>, DosDetectionThreshold> lowerEntry = treeRangeMap.getEntry(lower);
+ Map.Entry<Range<IPAddress>, DosDetectionThreshold> upperEntry = treeRangeMap.getEntry(upper);
if (lowerEntry != null && upperEntry == null) {
Range<IPAddress> lowerEntryKey = lowerEntry.getKey();
- Map<String, DosDetectionThreshold> lowerEntryValue = lowerEntry.getValue();
- lowerEntryValue.put(threshold.getAttackType(), threshold);
- thresholdRangeMap.put(Range.closedOpen(lowerEntryKey.lowerEndpoint(), lower), lowerEntryValue);
- thresholdRangeMap.put(Range.closed(lower, upper), floodTypeThresholdMap);
+ DosDetectionThreshold lowerEntryValue = lowerEntry.getValue();
+ treeRangeMap.put(Range.closedOpen(lowerEntryKey.lowerEndpoint(), lower), lowerEntryValue);
+ treeRangeMap.put(Range.closed(lower, upper), threshold);
} else if (lowerEntry == null && upperEntry != null) {
Range<IPAddress> upperEntryKey = upperEntry.getKey();
- Map<String, DosDetectionThreshold> upperEntryValue = upperEntry.getValue();
- upperEntryValue.put(threshold.getAttackType(), threshold);
- thresholdRangeMap.put(Range.openClosed(upper, upperEntryKey.upperEndpoint()), upperEntryValue);
- thresholdRangeMap.put(Range.closed(lower, upper), floodTypeThresholdMap);
+ DosDetectionThreshold upperEntryValue = upperEntry.getValue();
+ treeRangeMap.put(Range.openClosed(upper, upperEntryKey.upperEndpoint()), upperEntryValue);
+ treeRangeMap.put(Range.closed(lower, upper), threshold);
} else {
- thresholdRangeMap.put(Range.closed(lower, upper), floodTypeThresholdMap);
+ treeRangeMap.put(Range.closed(lower, upper), threshold);
}
} else {
- Map.Entry<Range<IPAddress>, Map<String, DosDetectionThreshold>> entry = thresholdRangeMap.getEntry(address);
- if (entry != null) {
- Range<IPAddress> entryKey = entry.getKey();
- Map<String, DosDetectionThreshold> entryValue = entry.getValue();
- if (entryKey.lowerEndpoint() == entryKey.upperEndpoint()) {
- entryValue.put(threshold.getAttackType(), threshold);
- thresholdRangeMap.put(Range.closed(address, address), entryValue);
- } else {
- thresholdRangeMap.put(Range.closed(address, address), floodTypeThresholdMap);
- }
- } else {
- thresholdRangeMap.put(Range.closed(address, address), floodTypeThresholdMap);
- }
+ treeRangeMap.put(Range.closed(address, address), threshold);
}
}
}
+ thresholdRangeMap.put(attackType,treeRangeMap);
}
}
} catch (Exception e) {
@@ -210,16 +197,17 @@ public class ParseStaticThreshold {
System.out.println("------------------------");
- TreeRangeMap<IPAddress, Map<String, DosDetectionThreshold>> staticThreshold = createStaticThreshold();
+ HashMap<String, TreeRangeMap<IPAddress, DosDetectionThreshold>> staticThreshold = createStaticThreshold();
System.out.println("------------------------");
- Map<Range<IPAddress>, Map<String, DosDetectionThreshold>> rangeMapMap = staticThreshold.asMapOfRanges();
- for (Range<IPAddress> range : rangeMapMap.keySet()) {
- Map<String, DosDetectionThreshold> thresholdMap = rangeMapMap.get(range);
- for (String type : thresholdMap.keySet()) {
- DosDetectionThreshold threshold = thresholdMap.get(type);
- System.out.println(range + "---" + type + "---" + threshold);
+
+ for (String type : staticThreshold.keySet()) {
+ Map<Range<IPAddress>, DosDetectionThreshold> asMapOfRanges = staticThreshold.get(type).asMapOfRanges();
+ for (Range<IPAddress> range : asMapOfRanges.keySet()) {
+ DosDetectionThreshold threshold = asMapOfRanges.get(range);
+ System.out.println(type + "---" + range + "---" + threshold);
}
+ System.out.println("------------------------");
}
diff --git a/src/main/resources/common.properties b/src/main/resources/common.properties
index 97286f7..15cc646 100644
--- a/src/main/resources/common.properties
+++ b/src/main/resources/common.properties
@@ -60,7 +60,7 @@ flink.detection.map.parallelism=1
flink.watermark.max.orderness=10
#计算窗口大小,默认600s
-flink.window.max.time=60
+flink.window.max.time=600
#dos event结果中distinct source IP限制
source.ip.list.limit=10000
@@ -89,8 +89,8 @@ baseline.sessions.severe.threshold=3
baseline.sessions.critical.threshold=8
#bifang服务访问地址
-#bifang.server.uri=http://192.168.44.72:80
-bifang.server.uri=http://192.168.44.3:80
+bifang.server.uri=http://192.168.44.72:80
+#bifang.server.uri=http://192.168.44.3:80
#访问bifang只读权限token,bifang内置,无需修改
bifang.server.token=ed04b942-7df4-4e3d-b9a9-a881ca98a867