diff options
| author | wanglihui <[email protected]> | 2022-09-23 18:37:33 +0800 |
|---|---|---|
| committer | wanglihui <[email protected]> | 2022-09-23 18:37:33 +0800 |
| commit | 859cd379e552411605e40ddb713f17329d328407 (patch) | |
| tree | a31e1cd08c3f1b66cc14ab8cc8b411b7136b0caf | |
| parent | 47ddef9bca4a4b5ddc55912720166bbbf93fb206 (diff) | |
DoS 检测支持vsys idtsg-22.10
| -rw-r--r-- | src/main/java/com/zdjizhi/common/CommonConfig.java | 11 | ||||
| -rw-r--r-- | src/main/java/com/zdjizhi/common/DosDetectionThreshold.java | 73 | ||||
| -rw-r--r-- | src/main/java/com/zdjizhi/common/DosEventLog.java | 44 | ||||
| -rw-r--r-- | src/main/java/com/zdjizhi/common/DosMetricsLog.java | 33 | ||||
| -rw-r--r-- | src/main/java/com/zdjizhi/common/DosSketchLog.java | 36 | ||||
| -rw-r--r-- | src/main/java/com/zdjizhi/common/DosVsysId.java | 24 | ||||
| -rw-r--r-- | src/main/java/com/zdjizhi/etl/DosDetection.java | 63 | ||||
| -rw-r--r-- | src/main/java/com/zdjizhi/etl/EtlProcessFunction.java | 9 | ||||
| -rw-r--r-- | src/main/java/com/zdjizhi/etl/ParseSketchLog.java | 2 | ||||
| -rw-r--r-- | src/main/java/com/zdjizhi/etl/ParseStaticThreshold.java | 113 | ||||
| -rw-r--r-- | src/main/java/com/zdjizhi/etl/TrafficServerIpMetrics.java | 1 | ||||
| -rw-r--r-- | src/main/java/com/zdjizhi/sink/OutputStreamSink.java | 13 | ||||
| -rw-r--r-- | src/main/resources/common.properties | 10 |
13 files changed, 206 insertions, 226 deletions
diff --git a/src/main/java/com/zdjizhi/common/CommonConfig.java b/src/main/java/com/zdjizhi/common/CommonConfig.java index 61c310b..62c079b 100644 --- a/src/main/java/com/zdjizhi/common/CommonConfig.java +++ b/src/main/java/com/zdjizhi/common/CommonConfig.java @@ -1,7 +1,6 @@ package com.zdjizhi.common; import com.zdjizhi.utils.CommonConfigurations; -import com.zdjizhi.utils.NacosUtils; import org.jasypt.encryption.pbe.StandardPBEStringEncryptor; /** @@ -49,17 +48,7 @@ public class CommonConfig { public static final String IP_MMDB_PATH = CommonConfigurations.getStringProperty("ip.mmdb.path"); -// public static final int STATIC_SENSITIVITY_THRESHOLD = NacosUtils.getIntProperty("static.sensitivity.threshold"); -// public static final double BASELINE_SENSITIVITY_THRESHOLD = NacosUtils.getDoubleProperty("baseline.sensitivity.threshold"); -// -// public static final double BASELINE_SESSIONS_MINOR_THRESHOLD = NacosUtils.getDoubleProperty("baseline.sessions.minor.threshold"); -// public static final double BASELINE_SESSIONS_WARNING_THRESHOLD = NacosUtils.getDoubleProperty("baseline.sessions.warning.threshold"); -// public static final double BASELINE_SESSIONS_MAJOR_THRESHOLD = NacosUtils.getDoubleProperty("baseline.sessions.major.threshold"); -// public static final double BASELINE_SESSIONS_SEVERE_THRESHOLD = NacosUtils.getDoubleProperty("baseline.sessions.severe.threshold"); -// public static final double BASELINE_SESSIONS_CRITICAL_THRESHOLD = NacosUtils.getDoubleProperty("baseline.sessions.critical.threshold"); - public static final String BIFANG_SERVER_URI = CommonConfigurations.getStringProperty("bifang.server.uri"); - public static final String BIFANG_SERVER_TOKEN = CommonConfigurations.getStringProperty("bifang.server.token"); public static final String BIFANG_SERVER_ENCRYPTPWD_PATH = CommonConfigurations.getStringProperty("bifang.server.encryptpwd.path"); public static final String BIFANG_SERVER_LOGIN_PATH = CommonConfigurations.getStringProperty("bifang.server.login.path"); public static final String BIFANG_SERVER_POLICY_THRESHOLD_PATH = CommonConfigurations.getStringProperty("bifang.server.policy.threshold.path"); diff --git a/src/main/java/com/zdjizhi/common/DosDetectionThreshold.java b/src/main/java/com/zdjizhi/common/DosDetectionThreshold.java index 48820d8..bfcda1d 100644 --- a/src/main/java/com/zdjizhi/common/DosDetectionThreshold.java +++ b/src/main/java/com/zdjizhi/common/DosDetectionThreshold.java @@ -2,6 +2,7 @@ package com.zdjizhi.common; import java.io.Serializable; import java.util.ArrayList; +import java.util.Arrays; import java.util.Objects; /** @@ -16,44 +17,8 @@ public class DosDetectionThreshold implements Serializable { private long bitsPerSec; private long sessionsPerSec; private int isValid; - - @Override - public String toString() { - return "DosDetectionThreshold{" + - "profileId='" + profileId + '\'' + - ", attackType='" + attackType + '\'' + - ", serverIpList=" + serverIpList + - ", serverIpAddr='" + serverIpAddr + '\'' + - ", packetsPerSec=" + packetsPerSec + - ", bitsPerSec=" + bitsPerSec + - ", sessionsPerSec=" + sessionsPerSec + - ", isValid=" + isValid + - '}'; - } - - @Override - public boolean equals(Object o) { - if (this == o) { - return true; - } - if (o == null || getClass() != o.getClass()) { - return false; - } - DosDetectionThreshold threshold = (DosDetectionThreshold) o; - return packetsPerSec == threshold.packetsPerSec && - bitsPerSec == threshold.bitsPerSec && - sessionsPerSec == threshold.sessionsPerSec && - isValid == threshold.isValid && - Objects.equals(profileId, threshold.profileId) && - Objects.equals(attackType, threshold.attackType) && - Objects.equals(serverIpList, threshold.serverIpList) && - Objects.equals(serverIpAddr, threshold.serverIpAddr); - } - - @Override - public int hashCode() { - return Objects.hash(profileId, attackType, serverIpList, serverIpAddr, packetsPerSec, bitsPerSec, sessionsPerSec, isValid); - } + private int vsysId; + private Integer[] superiorIds; public String getProfileId() { return profileId; @@ -118,4 +83,36 @@ public class DosDetectionThreshold implements Serializable { public void setIsValid(int isValid) { this.isValid = isValid; } + + public int getVsysId() { + return vsysId; + } + + public void setVsysId(int vsysId) { + this.vsysId = vsysId; + } + + public Integer[] getSuperiorIds() { + return superiorIds; + } + + public void setSuperiorIds(Integer[] superiorIds) { + this.superiorIds = superiorIds; + } + + @Override + public String toString() { + return "DosDetectionThreshold{" + + "profileId='" + profileId + '\'' + + ", attackType='" + attackType + '\'' + + ", serverIpList=" + serverIpList + + ", serverIpAddr='" + serverIpAddr + '\'' + + ", packetsPerSec=" + packetsPerSec + + ", bitsPerSec=" + bitsPerSec + + ", sessionsPerSec=" + sessionsPerSec + + ", isValid=" + isValid + + ", vsysId=" + vsysId + + ", superiorIds=" + Arrays.toString(superiorIds) + + '}'; + } } diff --git a/src/main/java/com/zdjizhi/common/DosEventLog.java b/src/main/java/com/zdjizhi/common/DosEventLog.java index c5ad395..c901076 100644 --- a/src/main/java/com/zdjizhi/common/DosEventLog.java +++ b/src/main/java/com/zdjizhi/common/DosEventLog.java @@ -1,12 +1,11 @@ package com.zdjizhi.common; import java.io.Serializable; -import java.util.Objects; -public class DosEventLog implements Serializable { +public class DosEventLog implements Serializable,Cloneable { private long log_id; - private int common_vsys_id; + private int vsys_id; private long start_time; private long end_time; private String attack_type; @@ -28,12 +27,12 @@ public class DosEventLog implements Serializable { this.log_id = log_id; } - public int getCommon_vsys_id() { - return common_vsys_id; + public int getVsys_id() { + return vsys_id; } - public void setCommon_vsys_id(int common_vsys_id) { - this.common_vsys_id = common_vsys_id; + public void setVsys_id(int vsys_id) { + this.vsys_id = vsys_id; } public long getStart_time() { @@ -136,7 +135,7 @@ public class DosEventLog implements Serializable { public String toString() { return "DosEventLog{" + "log_id=" + log_id + - ", common_vsys_id=" + common_vsys_id + + ", vsys_id=" + vsys_id + ", start_time=" + start_time + ", end_time=" + end_time + ", attack_type='" + attack_type + '\'' + @@ -153,32 +152,7 @@ public class DosEventLog implements Serializable { } @Override - public boolean equals(Object o) { - if (this == o) { - return true; - } - if (!(o instanceof DosEventLog)) { - return false; - } - DosEventLog that = (DosEventLog) o; - return getLog_id() == that.getLog_id() && - getCommon_vsys_id() == that.getCommon_vsys_id() && - getStart_time() == that.getStart_time() && - getEnd_time() == that.getEnd_time() && - getSession_rate() == that.getSession_rate() && - getPacket_rate() == that.getPacket_rate() && - getBit_rate() == that.getBit_rate() && - Objects.equals(getAttack_type(), that.getAttack_type()) && - Objects.equals(getSeverity(), that.getSeverity()) && - Objects.equals(getConditions(), that.getConditions()) && - Objects.equals(getDestination_ip(), that.getDestination_ip()) && - Objects.equals(getDestination_country(), that.getDestination_country()) && - Objects.equals(getSource_ip_list(), that.getSource_ip_list()) && - Objects.equals(getSource_country_list(), that.getSource_country_list()); - } - - @Override - public int hashCode() { - return Objects.hash(getLog_id(), getCommon_vsys_id(), getStart_time(), getEnd_time(), getAttack_type(), getSeverity(), getConditions(), getDestination_ip(), getDestination_country(), getSource_ip_list(), getSource_country_list(), getSession_rate(), getPacket_rate(), getBit_rate()); + public Object clone() throws CloneNotSupportedException { + return super.clone(); } } diff --git a/src/main/java/com/zdjizhi/common/DosMetricsLog.java b/src/main/java/com/zdjizhi/common/DosMetricsLog.java index 495b9c4..72d4a2f 100644 --- a/src/main/java/com/zdjizhi/common/DosMetricsLog.java +++ b/src/main/java/com/zdjizhi/common/DosMetricsLog.java @@ -12,6 +12,7 @@ public class DosMetricsLog implements Serializable { private long packet_rate; private long bit_rate; private int partition_num; + private int vsys_id; public int getPartition_num() { return partition_num; @@ -69,6 +70,14 @@ public class DosMetricsLog implements Serializable { this.bit_rate = bit_rate; } + public int getVsys_id() { + return vsys_id; + } + + public void setVsys_id(int vsys_id) { + this.vsys_id = vsys_id; + } + @Override public String toString() { return "DosMetricsLog{" + @@ -79,29 +88,7 @@ public class DosMetricsLog implements Serializable { ", packet_rate=" + packet_rate + ", bit_rate=" + bit_rate + ", partition_num=" + partition_num + + ", vsys_id=" + vsys_id + '}'; } - - @Override - public boolean equals(Object o) { - if (this == o) { - return true; - } - if (!(o instanceof DosMetricsLog)) { - return false; - } - DosMetricsLog that = (DosMetricsLog) o; - return getSketch_start_time() == that.getSketch_start_time() && - getSession_rate() == that.getSession_rate() && - getPacket_rate() == that.getPacket_rate() && - getBit_rate() == that.getBit_rate() && - getPartition_num() == that.getPartition_num() && - Objects.equals(getAttack_type(), that.getAttack_type()) && - Objects.equals(getDestination_ip(), that.getDestination_ip()); - } - - @Override - public int hashCode() { - return Objects.hash(getSketch_start_time(), getAttack_type(), getDestination_ip(), getSession_rate(), getPacket_rate(), getBit_rate(), getPartition_num()); - } } diff --git a/src/main/java/com/zdjizhi/common/DosSketchLog.java b/src/main/java/com/zdjizhi/common/DosSketchLog.java index 2980dad..d495eef 100644 --- a/src/main/java/com/zdjizhi/common/DosSketchLog.java +++ b/src/main/java/com/zdjizhi/common/DosSketchLog.java @@ -15,6 +15,7 @@ public class DosSketchLog implements Serializable { private long sketch_sessions; private long sketch_packets; private long sketch_bytes; + private int vsys_id; @Override public String toString() { @@ -29,35 +30,10 @@ public class DosSketchLog implements Serializable { ", sketch_sessions=" + sketch_sessions + ", sketch_packets=" + sketch_packets + ", sketch_bytes=" + sketch_bytes + + ", vsys_id=" + vsys_id + '}'; } - @Override - public boolean equals(Object o) { - if (this == o) { - return true; - } - if (!(o instanceof DosSketchLog)) { - return false; - } - DosSketchLog sketchLog = (DosSketchLog) o; - return getSketch_start_time() == sketchLog.getSketch_start_time() && - getSketch_duration() == sketchLog.getSketch_duration() && - getSketch_sessions() == sketchLog.getSketch_sessions() && - getSketch_packets() == sketchLog.getSketch_packets() && - getSketch_bytes() == sketchLog.getSketch_bytes() && - Objects.equals(getCommon_sled_ip(), sketchLog.getCommon_sled_ip()) && - Objects.equals(getCommon_data_center(), sketchLog.getCommon_data_center()) && - Objects.equals(getAttack_type(), sketchLog.getAttack_type()) && - Objects.equals(getSource_ip(), sketchLog.getSource_ip()) && - Objects.equals(getDestination_ip(), sketchLog.getDestination_ip()); - } - - @Override - public int hashCode() { - return Objects.hash(getCommon_sled_ip(), getCommon_data_center(), getSketch_start_time(), getSketch_duration(), getAttack_type(), getSource_ip(), getDestination_ip(), getSketch_sessions(), getSketch_packets(), getSketch_bytes()); - } - public String getCommon_sled_ip() { return common_sled_ip; } @@ -137,4 +113,12 @@ public class DosSketchLog implements Serializable { public void setSketch_bytes(long sketch_bytes) { this.sketch_bytes = sketch_bytes; } + + public int getVsys_id() { + return vsys_id; + } + + public void setVsys_id(int vsys_id) { + this.vsys_id = vsys_id; + } } diff --git a/src/main/java/com/zdjizhi/common/DosVsysId.java b/src/main/java/com/zdjizhi/common/DosVsysId.java index 27c0eaf..0369f69 100644 --- a/src/main/java/com/zdjizhi/common/DosVsysId.java +++ b/src/main/java/com/zdjizhi/common/DosVsysId.java @@ -1,22 +1,32 @@ package com.zdjizhi.common; -import java.util.Objects; +import java.util.Arrays; public class DosVsysId { - private int vsysId; + private Integer id; + private Integer[] superiorIds; - public int getVsysId() { - return vsysId; + public Integer getId() { + return id; } - public void setVsysId(int vsysId) { - this.vsysId = vsysId; + public void setId(Integer id) { + this.id = id; + } + + public Integer[] getSuperiorIds() { + return superiorIds; + } + + public void setSuperiorIds(Integer[] superiorIds) { + this.superiorIds = superiorIds; } @Override public String toString() { return "DosVsysId{" + - "vsysId=" + vsysId + + "id=" + id + + ", superiorIds=" + Arrays.toString(superiorIds) + '}'; } } diff --git a/src/main/java/com/zdjizhi/etl/DosDetection.java b/src/main/java/com/zdjizhi/etl/DosDetection.java index 8221b81..1f409a0 100644 --- a/src/main/java/com/zdjizhi/etl/DosDetection.java +++ b/src/main/java/com/zdjizhi/etl/DosDetection.java @@ -7,9 +7,10 @@ import inet.ipaddr.IPAddressString; import org.apache.commons.lang.StringUtils; import org.apache.commons.lang.text.StrBuilder; import org.apache.commons.lang3.concurrent.BasicThreadFactory; -import org.apache.flink.api.common.functions.RichMapFunction; import org.apache.flink.configuration.Configuration; import org.apache.flink.shaded.guava18.com.google.common.collect.TreeRangeMap; +import org.apache.flink.streaming.api.functions.ProcessFunction; +import org.apache.flink.util.Collector; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -23,12 +24,12 @@ import java.util.concurrent.TimeUnit; /** * @author wlh */ -public class DosDetection extends RichMapFunction<DosSketchLog, DosEventLog> { +public class DosDetection extends ProcessFunction<DosSketchLog, DosEventLog> { private static final Logger logger = LoggerFactory.getLogger(DosDetection.class); private static Map<String, Map<String, DosBaselineThreshold>> baselineMap = new HashMap<>(); private final static NumberFormat PERCENT_INSTANCE = NumberFormat.getPercentInstance(); - private HashMap<String, TreeRangeMap<IPAddress, DosDetectionThreshold>> thresholdRangeMap; + private HashMap<Integer,HashMap<String, TreeRangeMap<IPAddress, DosDetectionThreshold>>> thresholdRangeMap; private final static int BASELINE_SIZE = 144; private final static int STATIC_CONDITION_TYPE = 1; @@ -58,28 +59,40 @@ public class DosDetection extends RichMapFunction<DosSketchLog, DosEventLog> { } @Override - public DosEventLog map(DosSketchLog value) { - DosEventLog finalResult = null; + public void processElement(DosSketchLog value, Context ctx, Collector<DosEventLog> out) { + ArrayList<DosEventLog> finalResults = new ArrayList<>(); try { String destinationIp = value.getDestination_ip(); + int vsysId = value.getVsys_id(); + String key = destinationIp + "-" + vsysId; String attackType = value.getAttack_type(); IPAddress destinationIpAddress = new IPAddressString(destinationIp).getAddress(); - DosDetectionThreshold threshold = thresholdRangeMap.getOrDefault(attackType, TreeRangeMap.create()).get(destinationIpAddress); - logger.debug("当前判断IP:{}, 类型: {}", destinationIp, attackType); - if (threshold == null && baselineMap.containsKey(destinationIp)) { - finalResult = getDosEventLogByBaseline(value); - } else if (threshold == null && !baselineMap.containsKey(destinationIp)) { - finalResult = getDosEventLogBySensitivityThreshold(value); + + DosDetectionThreshold threshold = null; + if (thresholdRangeMap.containsKey(vsysId)){ + threshold = thresholdRangeMap.get(vsysId).getOrDefault(attackType, TreeRangeMap.create()).get(destinationIpAddress); + } + + logger.debug("当前判断IP:{}, 类型: {}", key, attackType); + if (threshold == null && baselineMap.containsKey(key)) { + DosEventLog finalResult = getDosEventLogByBaseline(value,key); + finalResults.add(finalResult); + } else if (threshold == null && !baselineMap.containsKey(key)) { + DosEventLog finalResult = getDosEventLogBySensitivityThreshold(value); + finalResults.add(finalResult); } else if (threshold != null) { - finalResult = getDosEventLogByStaticThreshold(value, threshold); + finalResults = getDosEventLogByStaticThreshold(value, threshold); } else { - logger.debug("未获取到当前server IP:{} 类型 {} 静态阈值 和 baseline", destinationIp, attackType); + logger.debug("未获取到当前server IP:{} 类型 {} 静态阈值 和 baseline", key, attackType); } } catch (Exception e) { logger.error("判定失败\n {} \n{}", value, e); } - return finalResult; + + for (DosEventLog dosEventLog:finalResults){ + out.collect(dosEventLog); + } } private DosEventLog getDosEventLogBySensitivityThreshold(DosSketchLog value) { @@ -93,13 +106,12 @@ public class DosDetection extends RichMapFunction<DosSketchLog, DosEventLog> { return result; } - private DosEventLog getDosEventLogByBaseline(DosSketchLog value) { + private DosEventLog getDosEventLogByBaseline(DosSketchLog value,String key) { DosEventLog result = null; - String destinationIp = value.getDestination_ip(); String attackType = value.getAttack_type(); long sketchSessions = value.getSketch_sessions(); if (sketchSessions > NacosUtils.getIntProperty("static.sensitivity.threshold")) { - DosBaselineThreshold dosBaselineThreshold = baselineMap.get(destinationIp).get(attackType); + DosBaselineThreshold dosBaselineThreshold = baselineMap.get(key).get(attackType); Integer base = getBaseValue(dosBaselineThreshold, value); long diff = sketchSessions - base; result = getDosEventLog(value, base, diff, BASELINE_CONDITION_TYPE, SESSIONS_TAG); @@ -107,7 +119,7 @@ public class DosDetection extends RichMapFunction<DosSketchLog, DosEventLog> { return result; } - private DosEventLog getDosEventLogByStaticThreshold(DosSketchLog value, DosDetectionThreshold threshold) { + private ArrayList<DosEventLog> getDosEventLogByStaticThreshold(DosSketchLog value, DosDetectionThreshold threshold) throws CloneNotSupportedException { long base = threshold.getSessionsPerSec(); long diff = value.getSketch_sessions() - base; DosEventLog result = getDosEventLog(value, base, diff, STATIC_CONDITION_TYPE, SESSIONS_TAG); @@ -121,7 +133,18 @@ public class DosDetection extends RichMapFunction<DosSketchLog, DosEventLog> { result = getDosEventLog(value, base, diff, STATIC_CONDITION_TYPE, BITS_TAG); } } - return result; + ArrayList<DosEventLog> dosEventLogs = new ArrayList<>(); + dosEventLogs.add(result); + Integer[] superiorIds = threshold.getSuperiorIds(); + if (superiorIds != null && superiorIds.length > 0){ + for (Integer integer:superiorIds){ + DosEventLog clone = (DosEventLog) result.clone(); + clone.setVsys_id(integer); + clone.setLog_id(SnowflakeId.generateId()); + dosEventLogs.add(clone); + } + } + return dosEventLogs; } private DosEventLog getDosEventLog(DosSketchLog value, long base, long diff, int type, String tag) { @@ -148,7 +171,7 @@ public class DosDetection extends RichMapFunction<DosSketchLog, DosEventLog> { private DosEventLog getResult(DosSketchLog value, long base, Severity severity, double percent, int type, String tag) { DosEventLog dosEventLog = new DosEventLog(); dosEventLog.setLog_id(SnowflakeId.generateId()); - dosEventLog.setCommon_vsys_id(1); + dosEventLog.setVsys_id(value.getVsys_id()); dosEventLog.setStart_time(value.getSketch_start_time()); dosEventLog.setEnd_time(value.getSketch_start_time() + value.getSketch_duration()); dosEventLog.setAttack_type(value.getAttack_type()); diff --git a/src/main/java/com/zdjizhi/etl/EtlProcessFunction.java b/src/main/java/com/zdjizhi/etl/EtlProcessFunction.java index bed7fdb..7857b6f 100644 --- a/src/main/java/com/zdjizhi/etl/EtlProcessFunction.java +++ b/src/main/java/com/zdjizhi/etl/EtlProcessFunction.java @@ -3,7 +3,7 @@ package com.zdjizhi.etl; import com.zdjizhi.common.CommonConfig; import com.zdjizhi.common.DosSketchLog; import org.apache.commons.lang.StringUtils; -import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.api.java.tuple.Tuple3; import org.apache.flink.api.java.tuple.Tuple6; import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction; import org.apache.flink.streaming.api.windowing.windows.TimeWindow; @@ -18,14 +18,14 @@ import static com.zdjizhi.sink.OutputStreamSink.outputTag; /** * @author 94976 */ -public class EtlProcessFunction extends ProcessWindowFunction<DosSketchLog, DosSketchLog, Tuple2<String,String>, TimeWindow> { +public class EtlProcessFunction extends ProcessWindowFunction<DosSketchLog, DosSketchLog, Tuple3<String,String,Integer>, TimeWindow> { private static final Logger logger = LoggerFactory.getLogger(EtlProcessFunction.class); private static final String EMPTY_SOURCE_IP_IPV4 = "0.0.0.0"; private static final String EMPTY_SOURCE_IP_IPV6 = "::"; @Override - public void process(Tuple2<String, String> keys, + public void process(Tuple3<String,String,Integer> keys, Context context, Iterable<DosSketchLog> elements, Collector<DosSketchLog> out) { DosSketchLog middleResult = getMiddleResult(keys, elements); @@ -40,7 +40,7 @@ public class EtlProcessFunction extends ProcessWindowFunction<DosSketchLog, DosS } } - private DosSketchLog getMiddleResult(Tuple2<String, String> keys,Iterable<DosSketchLog> elements){ + private DosSketchLog getMiddleResult(Tuple3<String,String,Integer> keys,Iterable<DosSketchLog> elements){ DosSketchLog midResuleLog = new DosSketchLog(); Tuple6<Long, Long, Long,String,Long,Long> values = sketchAggregate(elements); @@ -48,6 +48,7 @@ public class EtlProcessFunction extends ProcessWindowFunction<DosSketchLog, DosS if (values != null){ midResuleLog.setAttack_type(keys.f0); midResuleLog.setDestination_ip(keys.f1); + midResuleLog.setVsys_id(keys.f2); midResuleLog.setSketch_start_time(values.f4); midResuleLog.setSketch_duration(values.f5); midResuleLog.setSource_ip(values.f3); diff --git a/src/main/java/com/zdjizhi/etl/ParseSketchLog.java b/src/main/java/com/zdjizhi/etl/ParseSketchLog.java index d30d1f0..d97ff43 100644 --- a/src/main/java/com/zdjizhi/etl/ParseSketchLog.java +++ b/src/main/java/com/zdjizhi/etl/ParseSketchLog.java @@ -52,12 +52,14 @@ public class ParseSketchLog { long sketchStartTime = Long.parseLong(sketchSource.get("sketch_start_time").toString()); long sketchDuration = Long.parseLong(sketchSource.get("sketch_duration").toString()); String attackType = sketchSource.get("attack_type").toString(); + int vsysId = Integer.parseInt(sketchSource.getOrDefault("common_vsys_id", 1).toString()); ArrayList<HashMap<String, Object>> reportIpList = jsonMapperInstance.fromJson(jsonMapperInstance.toJson(sketchSource.get("report_ip_list")), listType); for (HashMap<String, Object> obj : reportIpList) { DosSketchLog dosSketchLog = new DosSketchLog(); dosSketchLog.setSketch_start_time(sketchStartTime); dosSketchLog.setSketch_duration(sketchDuration); dosSketchLog.setAttack_type(attackType); + dosSketchLog.setVsys_id(vsysId); String sourceIp = obj.get("source_ip").toString(); String destinationIp = obj.get("destination_ip").toString(); long sketchSessions = Long.parseLong(obj.get("sketch_sessions").toString()); diff --git a/src/main/java/com/zdjizhi/etl/ParseStaticThreshold.java b/src/main/java/com/zdjizhi/etl/ParseStaticThreshold.java index 72b0647..f9c1a39 100644 --- a/src/main/java/com/zdjizhi/etl/ParseStaticThreshold.java +++ b/src/main/java/com/zdjizhi/etl/ParseStaticThreshold.java @@ -113,6 +113,7 @@ public class ParseStaticThreshold { HashMap<String, Object> parms = new HashMap<>(); parms.put("pageSize", -1); parms.put("orderBy", "vsysId desc"); + parms.put("type", 1); HttpClientUtils.setUrlWithParams(uriBuilder, CommonConfig.BIFANG_SERVER_POLICY_VSYSID_PATH, parms); String token = NacosUtils.getStringProperty("bifang.server.token"); if (!HttpClientUtils.ERROR_MESSAGE.equals(token)) { @@ -128,7 +129,7 @@ public class ParseStaticThreshold { Object list = data.get("list"); if (list != null) { vsysIdList = jsonMapperInstance.fromJson(jsonMapperInstance.toJson(list), vsysIDType); - logger.info("获取到vsysId{}条", vsysIdList.size()); + logger.info("获取到vsysId {}条", vsysIdList.size()); } else { logger.warn("vsysIdList为空"); } @@ -149,49 +150,54 @@ public class ParseStaticThreshold { * @return thresholds */ private static ArrayList<DosDetectionThreshold> getDosDetectionThreshold() { - ArrayList<DosDetectionThreshold> thresholds = null; -// ArrayList<DosVsysId> vsysId = getVsysId(); + ArrayList<DosDetectionThreshold> vsysThresholds = new ArrayList<>(); + ArrayList<DosVsysId> vsysIds = getVsysId(); try { -// if (vsysId != null){ -// for (DosVsysId dosVsysId : vsysId) { - URIBuilder uriBuilder = new URIBuilder(CommonConfig.BIFANG_SERVER_URI); - HashMap<String, Object> parms = new HashMap<>(); - parms.put("pageSize", -1); - parms.put("orderBy", "profileId asc"); - parms.put("isValid", 1); -// parms.put("vsysId", dosVsysId.getVsysId()); - parms.put("vsysId", 1); - HttpClientUtils.setUrlWithParams(uriBuilder, CommonConfig.BIFANG_SERVER_POLICY_THRESHOLD_PATH, parms); - String token = NacosUtils.getStringProperty("bifang.server.token"); - if (!HttpClientUtils.ERROR_MESSAGE.equals(token)) { - BasicHeader authorization = new BasicHeader("Authorization", token); - BasicHeader authorization1 = new BasicHeader("Content-Type", "application/x-www-form-urlencoded"); - String resposeJsonStr = HttpClientUtils.httpGet(uriBuilder.build(), authorization, authorization1); - if (!HttpClientUtils.ERROR_MESSAGE.equals(resposeJsonStr)) { - HashMap<String, Object> resposeMap = jsonMapperInstance.fromJson(resposeJsonStr, hashmapJsonType); - boolean success = (boolean) resposeMap.get("success"); - String msg = resposeMap.get("msg").toString(); - if (success) { - HashMap<String, Object> data = jsonMapperInstance.fromJson(jsonMapperInstance.toJson(resposeMap.get("data")), hashmapJsonType); - Object list = data.get("list"); - if (list != null) { - thresholds = jsonMapperInstance.fromJson(jsonMapperInstance.toJson(list), thresholdType); - logger.info("获取到静态阈值配置{}条", thresholds.size()); - } else { - logger.warn("静态阈值配置为空"); + if (vsysIds != null) { + for (DosVsysId dosVsysId : vsysIds) { + Integer vsysId = dosVsysId.getId(); + Integer[] superiorIds = dosVsysId.getSuperiorIds(); + URIBuilder uriBuilder = new URIBuilder(CommonConfig.BIFANG_SERVER_URI); + HashMap<String, Object> parms = new HashMap<>(); + parms.put("pageSize", -1); + parms.put("orderBy", "profileId asc"); + parms.put("isValid", 1); + parms.put("vsysId", vsysId); + HttpClientUtils.setUrlWithParams(uriBuilder, CommonConfig.BIFANG_SERVER_POLICY_THRESHOLD_PATH, parms); + String token = NacosUtils.getStringProperty("bifang.server.token"); + if (!HttpClientUtils.ERROR_MESSAGE.equals(token)) { + BasicHeader authorization = new BasicHeader("Authorization", token); + BasicHeader authorization1 = new BasicHeader("Content-Type", "application/x-www-form-urlencoded"); + String resposeJsonStr = HttpClientUtils.httpGet(uriBuilder.build(), authorization, authorization1); + if (!HttpClientUtils.ERROR_MESSAGE.equals(resposeJsonStr)) { + HashMap<String, Object> resposeMap = jsonMapperInstance.fromJson(resposeJsonStr, hashmapJsonType); + boolean success = (boolean) resposeMap.get("success"); + String msg = resposeMap.get("msg").toString(); + if (success) { + HashMap<String, Object> data = jsonMapperInstance.fromJson(jsonMapperInstance.toJson(resposeMap.get("data")), hashmapJsonType); + Object list = data.get("list"); + if (list != null) { + ArrayList<DosDetectionThreshold> thresholds = jsonMapperInstance.fromJson(jsonMapperInstance.toJson(list), thresholdType); + for (DosDetectionThreshold dosDetectionThreshold:thresholds){ + dosDetectionThreshold.setSuperiorIds(superiorIds); + vsysThresholds.add(dosDetectionThreshold); + } + logger.info("获取到vsys id是{}静态阈值配置{}条",vsysId, thresholds.size()); + } else { + logger.warn("静态阈值配置为空"); + } + } else { + logger.error(msg); + } } - } else { - logger.error(msg); } } } -// } -// } } catch (Exception e) { logger.error("获取静态阈值配置失败,请检查bifang服务或登录配置信息 ", e); } - return thresholds; + return vsysThresholds; } /** @@ -199,14 +205,17 @@ public class ParseStaticThreshold { * * @return threshold RangeMap */ - static HashMap<String, TreeRangeMap<IPAddress, DosDetectionThreshold>> createStaticThreshold() { - HashMap<String, TreeRangeMap<IPAddress, DosDetectionThreshold>> thresholdRangeMap = new HashMap<>(4); + static HashMap<Integer,HashMap<String, TreeRangeMap<IPAddress, DosDetectionThreshold>>> createStaticThreshold() { + HashMap<Integer,HashMap<String, TreeRangeMap<IPAddress, DosDetectionThreshold>>> thresholdRangeMap = new HashMap<>(4); try { ArrayList<DosDetectionThreshold> dosDetectionThreshold = getDosDetectionThreshold(); if (dosDetectionThreshold != null && !dosDetectionThreshold.isEmpty()) { for (DosDetectionThreshold threshold : dosDetectionThreshold) { String attackType = threshold.getAttackType(); - TreeRangeMap<IPAddress, DosDetectionThreshold> treeRangeMap = thresholdRangeMap.getOrDefault(attackType, TreeRangeMap.create()); + int vsysId = threshold.getVsysId(); + HashMap<String, TreeRangeMap<IPAddress, DosDetectionThreshold>> rangeMap = thresholdRangeMap.getOrDefault(vsysId, new HashMap<>()); + + TreeRangeMap<IPAddress, DosDetectionThreshold> treeRangeMap = rangeMap.getOrDefault(attackType, TreeRangeMap.create()); ArrayList<String> serverIpList = threshold.getServerIpList(); for (String sip : serverIpList) { IPAddressString ipAddressString = new IPAddressString(sip); @@ -239,7 +248,8 @@ public class ParseStaticThreshold { } } } - thresholdRangeMap.put(attackType, treeRangeMap); + rangeMap.put(attackType, treeRangeMap); + thresholdRangeMap.put(vsysId,rangeMap); } } } catch (Exception e) { @@ -249,22 +259,27 @@ public class ParseStaticThreshold { } public static void main(String[] args) { + /* ArrayList<DosDetectionThreshold> dosDetectionThreshold = getDosDetectionThreshold(); - dosDetectionThreshold.forEach(System.out::println); - - +// dosDetectionThreshold.forEach(System.out::println); + getVsysId().forEach(System.out::println); System.out.println("------------------------"); - HashMap<String, TreeRangeMap<IPAddress, DosDetectionThreshold>> staticThreshold = createStaticThreshold(); + */ + HashMap<Integer,HashMap<String, TreeRangeMap<IPAddress, DosDetectionThreshold>>> staticThreshold = createStaticThreshold(); System.out.println("------------------------"); - for (String type : staticThreshold.keySet()) { - Map<Range<IPAddress>, DosDetectionThreshold> asMapOfRanges = staticThreshold.get(type).asMapOfRanges(); - for (Range<IPAddress> range : asMapOfRanges.keySet()) { - DosDetectionThreshold threshold = asMapOfRanges.get(range); - System.out.println(type + "---" + range + "---" + threshold); + for (Integer integer : staticThreshold.keySet()){ + HashMap<String, TreeRangeMap<IPAddress, DosDetectionThreshold>> stringTreeRangeMapHashMap = staticThreshold.get(integer); + for (String type : stringTreeRangeMapHashMap.keySet()) { + Map<Range<IPAddress>, DosDetectionThreshold> asMapOfRanges = stringTreeRangeMapHashMap.get(type).asMapOfRanges(); + for (Range<IPAddress> range : asMapOfRanges.keySet()) { + DosDetectionThreshold threshold = asMapOfRanges.get(range); + System.out.println(integer+"---"+type + "---" + range + "---" + threshold); + } + System.out.println("------------------------"); } - System.out.println("------------------------"); + } // String s = loginBifangServer(); // System.out.println(s); diff --git a/src/main/java/com/zdjizhi/etl/TrafficServerIpMetrics.java b/src/main/java/com/zdjizhi/etl/TrafficServerIpMetrics.java index 2fe8524..d3e8280 100644 --- a/src/main/java/com/zdjizhi/etl/TrafficServerIpMetrics.java +++ b/src/main/java/com/zdjizhi/etl/TrafficServerIpMetrics.java @@ -19,6 +19,7 @@ class TrafficServerIpMetrics { dosMetricsLog.setSession_rate(midResuleLog.getSketch_sessions()); dosMetricsLog.setPacket_rate(midResuleLog.getSketch_packets()); dosMetricsLog.setBit_rate(midResuleLog.getSketch_bytes()); + dosMetricsLog.setVsys_id(midResuleLog.getVsys_id()); dosMetricsLog.setPartition_num(getPartitionNumByIp(midResuleLog.getDestination_ip())); logger.debug("metric 结果已加载:{}",dosMetricsLog.toString()); return dosMetricsLog; diff --git a/src/main/java/com/zdjizhi/sink/OutputStreamSink.java b/src/main/java/com/zdjizhi/sink/OutputStreamSink.java index 6bac8c8..8b3adc9 100644 --- a/src/main/java/com/zdjizhi/sink/OutputStreamSink.java +++ b/src/main/java/com/zdjizhi/sink/OutputStreamSink.java @@ -9,7 +9,7 @@ import com.zdjizhi.etl.EtlProcessFunction; import com.zdjizhi.etl.ParseSketchLog; import com.zdjizhi.utils.FlinkEnvironmentUtils; import org.apache.flink.api.java.functions.KeySelector; -import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.api.java.tuple.Tuple3; import org.apache.flink.streaming.api.datastream.*; import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows; import org.apache.flink.streaming.api.windowing.time.Time; @@ -37,7 +37,7 @@ public class OutputStreamSink { } private static SingleOutputStreamOperator<DosEventLog> getEventSinkStream(SingleOutputStreamOperator<DosSketchLog> middleStream){ - return middleStream.map(new DosDetection()).setParallelism(CommonConfig.FLINK_DETECTION_MAP_PARALLELISM); + return middleStream.process(new DosDetection()).setParallelism(CommonConfig.FLINK_DETECTION_MAP_PARALLELISM); } private static SingleOutputStreamOperator<DosSketchLog> getMiddleStream(){ @@ -48,12 +48,13 @@ public class OutputStreamSink { .setParallelism(CommonConfig.FLINK_FIRST_AGG_PARALLELISM); } - private static class KeysSelector implements KeySelector<DosSketchLog, Tuple2<String, String>>{ + private static class KeysSelector implements KeySelector<DosSketchLog, Tuple3<String, String, Integer>>{ @Override - public Tuple2<String, String> getKey(DosSketchLog dosSketchLog){ - return Tuple2.of( + public Tuple3<String, String, Integer> getKey(DosSketchLog dosSketchLog){ + return Tuple3.of( dosSketchLog.getAttack_type(), - dosSketchLog.getDestination_ip()); + dosSketchLog.getDestination_ip(), + dosSketchLog.getVsys_id()); } } diff --git a/src/main/resources/common.properties b/src/main/resources/common.properties index 7defe8a..212b90c 100644 --- a/src/main/resources/common.properties +++ b/src/main/resources/common.properties @@ -51,7 +51,7 @@ hbase.baseline.table.name=dos:ddos_traffic_baselines hbase.baseline.total.num=1000000 #baseline ttl,单位:天 -hbase.baseline.ttl=30 +hbase.baseline.ttl=1 #设置聚合并行度,2个key flink.first.agg.parallelism=1 @@ -79,11 +79,8 @@ ip.mmdb.path=D:\\data\\dat\\ #ip.mmdb.path=/home/bigdata/wlh/topology/dos-detection/dat/ #bifang服务访问地址 -bifang.server.uri=http://192.168.44.72:80 -#bifang.server.uri=http://192.168.44.3:80 - -#访问bifang只读权限token,bifang内置,无需修改 -bifang.server.token=ed04b942-7df4-4e3d-b9a9-a881ca98a867 +#bifang.server.uri=http://192.168.44.72:80 +bifang.server.uri=http://192.168.44.3:80 #加密密码路径信息 bifang.server.encryptpwd.path=/v1/user/encryptpwd @@ -122,7 +119,6 @@ baseline.threshold.schedule.days=1 #kafka用户认证配置参数 sasl.jaas.config.user=admin #sasl.jaas.config.password=galaxy2019 -#sasl.jaas.config.password=ENC(6MleDyA3Z73HSaXiKsDJ2k7Ys8YWLhEJ) sasl.jaas.config.password=6MleDyA3Z73HSaXiKsDJ2k7Ys8YWLhEJ #是否开启kafka用户认证配置,1:是;0:否 |
