summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorwanglihui <[email protected]>2021-09-09 09:46:14 +0800
committerwanglihui <[email protected]>2021-09-09 09:46:14 +0800
commitf046d4b0196d964c7f08af1b5eea8ddf055612f1 (patch)
tree6858f1cdaeed0df590abf7978cbb5884ed3bf618
parent4b0ff85e503e34fb617d8f050079832fb5573883 (diff)
新增获取bifang配置作为判定条件
新增敏感阈值过滤报警信息
-rw-r--r--pom.xml6
-rw-r--r--src/main/java/com/zdjizhi/bolt/DosDetectionBolt.java146
-rw-r--r--src/main/java/com/zdjizhi/bolt/ParseStaticThreshold.java176
-rw-r--r--src/main/java/com/zdjizhi/common/CommonConfig.java3
-rw-r--r--src/main/java/com/zdjizhi/common/DosDetectionThreshold.java93
-rw-r--r--src/main/java/com/zdjizhi/topology/LogFlowWriteTopology.java2
-rw-r--r--src/main/java/com/zdjizhi/utils/HttpClientUtils.java271
-rw-r--r--src/main/resources/common.properties6
8 files changed, 663 insertions, 40 deletions
diff --git a/pom.xml b/pom.xml
index eaea5f2..abf4677 100644
--- a/pom.xml
+++ b/pom.xml
@@ -250,6 +250,12 @@
<version>0.10.3</version>
</dependency>
+ <dependency>
+ <groupId>com.github.seancfoley</groupId>
+ <artifactId>ipaddress</artifactId>
+ <version>5.3.3</version>
+ </dependency>
+
</dependencies>
diff --git a/src/main/java/com/zdjizhi/bolt/DosDetectionBolt.java b/src/main/java/com/zdjizhi/bolt/DosDetectionBolt.java
index 58ef066..273b561 100644
--- a/src/main/java/com/zdjizhi/bolt/DosDetectionBolt.java
+++ b/src/main/java/com/zdjizhi/bolt/DosDetectionBolt.java
@@ -2,10 +2,14 @@ package com.zdjizhi.bolt;
import cn.hutool.log.Log;
import cn.hutool.log.LogFactory;
+import com.google.common.collect.TreeRangeMap;
import com.zdjizhi.common.CommonConfig;
+import com.zdjizhi.common.DosDetectionThreshold;
import com.zdjizhi.common.DosEventLog;
import com.zdjizhi.common.DosSketchLog;
import com.zdjizhi.utils.*;
+import inet.ipaddr.IPAddress;
+import inet.ipaddr.IPAddressString;
import io.vavr.Tuple2;
import org.apache.commons.lang.StringUtils;
import org.apache.commons.lang3.concurrent.BasicThreadFactory;
@@ -33,6 +37,7 @@ public class DosDetectionBolt extends BaseBasicBolt {
private KafkaUtils kafkaLogSend;
private final static NumberFormat PERCENT_INSTANCE = NumberFormat.getPercentInstance();
private static Map<String, Map<String, Tuple2<ArrayList<Integer>, Integer>>> baselineMap = new HashMap<>();
+ private TreeRangeMap<IPAddress, Map<String, DosDetectionThreshold>> thresholdRangeMap;
private List<String> list = new LinkedList<>();
@Override
@@ -40,10 +45,11 @@ public class DosDetectionBolt extends BaseBasicBolt {
ScheduledExecutorService executorService = new ScheduledThreadPoolExecutor(2,
new BasicThreadFactory.Builder().namingPattern("Dos-Detection-%d").daemon(true).build());
try {
- executorService.scheduleAtFixedRate(() -> {
- //do something
- baselineMap = HbaseUtils.readFromHbase();
- }, 0, CommonConfig.BASELINE_THRESHOLD_SCHEDULE_DAYS, TimeUnit.DAYS);
+ executorService.scheduleAtFixedRate(() -> thresholdRangeMap = ParseStaticThreshold.createStaticThreshold(),
+ 0, CommonConfig.STATIC_THRESHOLD_SCHEDULE_MINUTES, TimeUnit.MINUTES);
+
+ executorService.scheduleAtFixedRate(() -> baselineMap = HbaseUtils.readFromHbase(),
+ 0, CommonConfig.BASELINE_THRESHOLD_SCHEDULE_DAYS, TimeUnit.DAYS);
} catch (Exception e) {
logger.error("定时器任务执行失败", e);
}
@@ -84,43 +90,99 @@ public class DosDetectionBolt extends BaseBasicBolt {
}
- private DosEventLog dosDetection(DosSketchLog sketchLog) {
+ private DosEventLog dosDetection(DosSketchLog value) {
+ DosEventLog finalResult = null;
try {
- String destinationIp = sketchLog.getDestination_ip();
- String attackType = sketchLog.getAttack_type();
+ String destinationIp = value.getDestination_ip();
+ String attackType = value.getAttack_type();
+ long sketchSessions = value.getSketch_sessions();
+ IPAddress destinationIpAddress = new IPAddressString(destinationIp).getAddress();
+ Map<String, DosDetectionThreshold> thresholdMap = thresholdRangeMap.get(destinationIpAddress);
logger.debug("当前判断IP:{}, 类型: {}", destinationIp, attackType);
- if (baselineMap.containsKey(destinationIp)) {
- Tuple2<ArrayList<Integer>, Integer> floodTypeTup = baselineMap.get(destinationIp).get(attackType);
- Integer base = getBaseValue(floodTypeTup, sketchLog);
- long diff = sketchLog.getSketch_sessions() - base;
- if (diff > 0 && base != 0) {
- double percent = getDiffPercent(diff, base);
- Severity severity = judgeSeverity(percent);
- if (severity != Severity.NORMAL) {
- DosEventLog result = getResult(sketchLog, severity, percent);
- logger.info("检测到当前server IP {} 存在 {} 异常,超出基线{} {}倍,日志详情\n {}", destinationIp, attackType, base, percent, result);
- return result;
- } else {
- logger.debug("当前server IP:{} 未出现 {} 异常,日志详情 {}", destinationIp, attackType, sketchLog);
- }
+ if (sketchSessions > CommonConfig.SENSITIVITY_THRESHOLD){
+ if (baselineMap != null && baselineMap.containsKey(destinationIp) && thresholdMap == null) {
+ finalResult = getDosEventLogByBaseline(value, destinationIp, attackType)._2;
+ } else if (baselineMap != null && !baselineMap.containsKey(destinationIp) && thresholdMap != null) {
+ finalResult = getDosEventLogByStaticThreshold(value, thresholdMap)._2;
+ } else if (baselineMap != null && baselineMap.containsKey(destinationIp) && thresholdMap != null) {
+ Tuple2<Severity, DosEventLog> eventLogByBaseline = getDosEventLogByBaseline(value, destinationIp, attackType);
+ Tuple2<Severity, DosEventLog> eventLogByStaticThreshold = getDosEventLogByStaticThreshold(value, thresholdMap);
+ finalResult = mergeFinalResult(eventLogByBaseline, eventLogByStaticThreshold);
+ } else {
+ logger.debug("未获取到当前server IP:{} 类型 {} 静态阈值 和 baseline", destinationIp, attackType);
}
- } else {
- logger.debug("未获取到当前server IP:{} 类型 {} baseline数据", destinationIp, attackType);
}
} catch (Exception e) {
- logger.error("判定失败\n {} \n{}", sketchLog, e);
+ logger.error("判定失败\n {} \n{}", value, e);
+ }
+ return finalResult;
+ }
+
+ private DosEventLog mergeFinalResult(Tuple2<Severity, DosEventLog> eventLogByBaseline, Tuple2<Severity, DosEventLog> eventLogByStaticThreshold) {
+ if (eventLogByBaseline._1.score > eventLogByStaticThreshold._1.score) {
+ mergeCondition(eventLogByBaseline._2, eventLogByStaticThreshold._2);
+ logger.info("merge eventLogByBaseline {} \neventLogByStaticThreshold {}",eventLogByBaseline,eventLogByStaticThreshold);
+ return eventLogByBaseline._2;
+ } else {
+ mergeCondition(eventLogByStaticThreshold._2, eventLogByBaseline._2);
+ logger.info("merge eventLogByStaticThreshold {} \neventLogByBaseline {}",eventLogByStaticThreshold,eventLogByBaseline);
+ return eventLogByStaticThreshold._2;
+ }
+ }
+
+ private void mergeCondition(DosEventLog log1, DosEventLog log2) {
+ if (log1 != null && log2 != null) {
+ String conditions1 = log1.getConditions();
+ String conditions2 = log2.getConditions();
+ log1.setConditions(conditions1 + " and " + conditions2);
+ }
+ }
+
+ private Tuple2<Severity, DosEventLog> getDosEventLogByBaseline(DosSketchLog value, String destinationIp, String attackType) {
+ Tuple2<ArrayList<Integer>, Integer> floodTypeTup = baselineMap.get(destinationIp).get(attackType);
+ Integer base = getBaseValue(floodTypeTup, value);
+ long diff = value.getSketch_sessions() - base;
+ return getDosEventLog(value, base, diff, "baseline");
+ }
+
+ private Tuple2<Severity, DosEventLog> getDosEventLogByStaticThreshold(DosSketchLog value, Map<String, DosDetectionThreshold> thresholdMap) {
+ Tuple2<Severity, DosEventLog> result = io.vavr.Tuple.of(Severity.NORMAL, null);
+ String attackType = value.getAttack_type();
+ if (thresholdMap.containsKey(attackType)) {
+ DosDetectionThreshold threshold = thresholdMap.get(attackType);
+ long base = threshold.getSessionsPerSec();
+ long diff = value.getSketch_sessions() - base;
+ result = getDosEventLog(value, base, diff, "static");
}
- return null;
+ return result;
}
- private DosEventLog getResult(DosSketchLog value, Severity severity, double percent) {
+ private Tuple2<Severity, DosEventLog> getDosEventLog(DosSketchLog value, long base, long diff, String tag) {
+ DosEventLog result = null;
+ String destinationIp = value.getDestination_ip();
+ String attackType = value.getAttack_type();
+ Severity severity = Severity.NORMAL;
+ if (diff > 0 && base != 0) {
+ double percent = getDiffPercent(diff, base);
+ severity = judgeSeverity(percent);
+ if (severity != Severity.NORMAL) {
+ result = getResult(value, severity, percent, tag);
+ logger.info("检测到当前server IP {} 存在 {} 异常,超出基线{} {}倍,日志详情\n {}", destinationIp,attackType,base,percent,result);
+ } else {
+ logger.debug("当前server IP:{} 未出现 {} 异常,日志详情 {}", destinationIp, attackType, value.toString());
+ }
+ }
+ return io.vavr.Tuple.of(severity, result);
+ }
+
+ private DosEventLog getResult(DosSketchLog value, Severity severity, double percent, String tag) {
DosEventLog dosEventLog = new DosEventLog();
dosEventLog.setLog_id(SnowflakeId.generateId());
dosEventLog.setStart_time(value.getSketch_start_time());
dosEventLog.setEnd_time(value.getSketch_start_time() + CommonConfig.STORM_WINDOW_MAX_TIME);
dosEventLog.setAttack_type(value.getAttack_type());
- dosEventLog.setSeverity(severity.toString());
- dosEventLog.setConditions(getConditions(PERCENT_INSTANCE.format(percent)));
+ dosEventLog.setSeverity(severity.severity);
+ dosEventLog.setConditions(getConditions(PERCENT_INSTANCE.format(percent), value.getSketch_sessions(), tag));
dosEventLog.setDestination_ip(value.getDestination_ip());
dosEventLog.setDestination_country(IpUtils.ipLookup.countryLookup(value.getDestination_ip()));
String ipList = value.getSource_ip();
@@ -153,8 +215,15 @@ public class DosDetectionBolt extends BaseBasicBolt {
return base;
}
- private String getConditions(String percent) {
- return "sessions > " + percent + " of baseline";
+ private String getConditions(String percent, long sessions, String tag) {
+ switch (tag) {
+ case "baseline":
+ return "sessions > " + percent + " of baseline";
+ case "static":
+ return "sessions > " + sessions + " sessions/s";
+ default:
+ return null;
+ }
}
private String getSourceCountryList(String sourceIpList) {
@@ -196,27 +265,28 @@ public class DosDetectionBolt extends BaseBasicBolt {
/**
* 判断严重程度枚举类型
*/
- CRITICAL("Critical"),
- SEVERE("Severe"),
- MAJOR("Major"),
- WARNING("Warning"),
- MINOR("Minor"),
- NORMAL("Normal");
+ CRITICAL("Critical", 5),
+ SEVERE("Severe", 4),
+ MAJOR("Major", 3),
+ WARNING("Warning", 2),
+ MINOR("Minor", 1),
+ NORMAL("Normal", 0);
private final String severity;
+ private final int score;
@Override
public String toString() {
return this.severity;
}
- Severity(String severity) {
+ Severity(String severity, int score) {
this.severity = severity;
+ this.score = score;
}
}
-
@Override
public void declareOutputFields(OutputFieldsDeclarer declarer) {
diff --git a/src/main/java/com/zdjizhi/bolt/ParseStaticThreshold.java b/src/main/java/com/zdjizhi/bolt/ParseStaticThreshold.java
new file mode 100644
index 0000000..afdb66f
--- /dev/null
+++ b/src/main/java/com/zdjizhi/bolt/ParseStaticThreshold.java
@@ -0,0 +1,176 @@
+package com.zdjizhi.bolt;
+
+import com.fasterxml.jackson.databind.JavaType;
+import com.google.common.collect.Range;
+import com.google.common.collect.TreeRangeMap;
+import com.zdjizhi.common.CommonConfig;
+import com.zdjizhi.common.DosDetectionThreshold;
+import com.zdjizhi.utils.HttpClientUtils;
+import com.zdjizhi.utils.JsonMapper;
+import inet.ipaddr.IPAddress;
+import inet.ipaddr.IPAddressString;
+import org.apache.http.client.utils.URIBuilder;
+import org.apache.http.message.BasicHeader;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.net.URISyntaxException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * @author wlh
+ */
+public class ParseStaticThreshold {
+ private static Logger logger = LoggerFactory.getLogger(ParseStaticThreshold.class);
+ private static String encryptpwd;
+
+ private static JsonMapper jsonMapperInstance = JsonMapper.getInstance();
+ private static JavaType hashmapJsonType = jsonMapperInstance.createCollectionType(HashMap.class, String.class, Object.class);
+ private static JavaType thresholdType = jsonMapperInstance.createCollectionType(ArrayList.class, DosDetectionThreshold.class);
+
+ static {
+ //加载加密登录密码
+ encryptpwd = getEncryptpwd();
+ }
+
+ /**
+ * 获取加密密码
+ */
+ private static String getEncryptpwd() {
+ String psw = HttpClientUtils.ERROR_MESSAGE;
+ try {
+ URIBuilder uriBuilder = new URIBuilder(CommonConfig.BIFANG_SERVER_URI);
+ HashMap<String, String> parms = new HashMap<>();
+ parms.put("password", CommonConfig.BIFANG_SERVER_PASSWORD);
+ HttpClientUtils.setUrlWithParams(uriBuilder, CommonConfig.BIFANG_SERVER_ENCRYPTPWD_PATH, parms);
+ String resposeJsonStr = HttpClientUtils.httpGet(uriBuilder.build());
+ if (!HttpClientUtils.ERROR_MESSAGE.equals(resposeJsonStr)) {
+ HashMap<String, Object> resposeMap = jsonMapperInstance.fromJson(resposeJsonStr, hashmapJsonType);
+ boolean success = (boolean) resposeMap.get("success");
+ String msg = resposeMap.get("msg").toString();
+ if (success) {
+ HashMap<String, Object> data = jsonMapperInstance.fromJson(jsonMapperInstance.toJson(resposeMap.get("data")), hashmapJsonType);
+ psw = data.get("encryptpwd").toString();
+ } else {
+ logger.error(msg);
+ }
+ }
+ } catch (URISyntaxException e) {
+ logger.error("构造URI异常", e);
+ } catch (Exception e) {
+ logger.error("获取encryptpwd失败", e);
+ }
+ return psw;
+ }
+
+ /**
+ * 登录bifang服务,获取token
+ * @return token
+ */
+ private static String loginBifangServer() {
+ String token = HttpClientUtils.ERROR_MESSAGE;
+ try {
+ if (!HttpClientUtils.ERROR_MESSAGE.equals(encryptpwd)) {
+ URIBuilder uriBuilder = new URIBuilder(CommonConfig.BIFANG_SERVER_URI);
+ HashMap<String, String> parms = new HashMap<>();
+ parms.put("username", CommonConfig.BIFANG_SERVER_USER);
+ parms.put("password", encryptpwd);
+ HttpClientUtils.setUrlWithParams(uriBuilder, CommonConfig.BIFANG_SERVER_LOGIN_PATH, parms);
+ String resposeJsonStr = HttpClientUtils.httpPost(uriBuilder.build(), null);
+ if (!HttpClientUtils.ERROR_MESSAGE.equals(resposeJsonStr)) {
+ HashMap<String, Object> resposeMap = jsonMapperInstance.fromJson(resposeJsonStr, hashmapJsonType);
+ boolean success = (boolean) resposeMap.get("success");
+ String msg = resposeMap.get("msg").toString();
+ if (success) {
+ HashMap<String, Object> data = jsonMapperInstance.fromJson(jsonMapperInstance.toJson(resposeMap.get("data")), hashmapJsonType);
+ token = data.get("token").toString();
+ } else {
+ logger.error(msg);
+ }
+ }
+ }
+ } catch (Exception e) {
+ logger.error("登录失败,未获取到token ", e);
+ }
+ return token;
+ }
+
+ /**
+ * 获取静态阈值配置列表
+ * @return thresholds
+ */
+ private static ArrayList<DosDetectionThreshold> getDosDetectionThreshold() {
+ ArrayList<DosDetectionThreshold> thresholds = null;
+ try {
+ URIBuilder uriBuilder = new URIBuilder(CommonConfig.BIFANG_SERVER_URI);
+ HttpClientUtils.setUrlWithParams(uriBuilder, CommonConfig.BIFANG_SERVER_POLICY_THRESHOLD_PATH, null);
+ String token = loginBifangServer();
+ if (!HttpClientUtils.ERROR_MESSAGE.equals(token)) {
+ BasicHeader authorization = new BasicHeader("Authorization", token);
+ String resposeJsonStr = HttpClientUtils.httpGet(uriBuilder.build(), authorization);
+ if (!HttpClientUtils.ERROR_MESSAGE.equals(resposeJsonStr)) {
+ HashMap<String, Object> resposeMap = jsonMapperInstance.fromJson(resposeJsonStr, hashmapJsonType);
+ boolean success = (boolean) resposeMap.get("success");
+ String msg = resposeMap.get("msg").toString();
+ if (success) {
+ HashMap<String, Object> data = jsonMapperInstance.fromJson(jsonMapperInstance.toJson(resposeMap.get("data")), hashmapJsonType);
+ thresholds = jsonMapperInstance.fromJson(jsonMapperInstance.toJson(data.get("list")), thresholdType);
+ logger.info("获取到静态阈值配置{}条", thresholds.size());
+ } else {
+ logger.error(msg);
+ }
+ }
+ }
+ } catch (Exception e) {
+ logger.error("获取静态阈值配置失败,请检查bifang服务或登录配置信息 ", e);
+ }
+ return thresholds;
+ }
+
+ /**
+ * 基于静态阈值构建threshold RangeMap,k:IP段或具体IP,v:配置信息
+ * @return threshold RangeMap
+ */
+ static TreeRangeMap<IPAddress, Map<String, DosDetectionThreshold>> createStaticThreshold() {
+ TreeRangeMap<IPAddress, Map<String, DosDetectionThreshold>> thresholdRangeMap = TreeRangeMap.create();
+ try {
+ ArrayList<DosDetectionThreshold> dosDetectionThreshold = getDosDetectionThreshold();
+ if (dosDetectionThreshold != null && !dosDetectionThreshold.isEmpty()) {
+ for (DosDetectionThreshold threshold : dosDetectionThreshold) {
+ ArrayList<String> serverIpList = threshold.getServerIpList();
+ for (String sip : serverIpList) {
+ IPAddressString ipAddressString = new IPAddressString(sip);
+ if (ipAddressString.isIPAddress()) {
+ IPAddress address = ipAddressString.getAddress();
+ Map<String, DosDetectionThreshold> floodTypeThresholdMap = thresholdRangeMap.get(address);
+ if (floodTypeThresholdMap == null) {
+ floodTypeThresholdMap = new HashMap<>();
+ }
+ floodTypeThresholdMap.put(threshold.getAttackType(), threshold);
+ thresholdRangeMap.put(Range.closed(address.getLower(), address.getUpper()), floodTypeThresholdMap);
+ }
+ }
+ }
+ }
+ } catch (Exception e) {
+ logger.error("构建threshold RangeMap失败", e);
+ }
+ return thresholdRangeMap;
+ }
+
+ public static void main(String[] args) {
+ TreeRangeMap<IPAddress, Map<String, DosDetectionThreshold>> staticThreshold = createStaticThreshold();
+ Map<Range<IPAddress>, Map<String, DosDetectionThreshold>> rangeMapMap = staticThreshold.asMapOfRanges();
+ for (Range<IPAddress> range : rangeMapMap.keySet()) {
+ Map<String, DosDetectionThreshold> thresholdMap = rangeMapMap.get(range);
+ for (String type : thresholdMap.keySet()) {
+ DosDetectionThreshold threshold = thresholdMap.get(type);
+ System.out.println(range + "---" + type + "---" + threshold);
+ }
+ }
+ }
+
+
+}
diff --git a/src/main/java/com/zdjizhi/common/CommonConfig.java b/src/main/java/com/zdjizhi/common/CommonConfig.java
index e126224..28e7d09 100644
--- a/src/main/java/com/zdjizhi/common/CommonConfig.java
+++ b/src/main/java/com/zdjizhi/common/CommonConfig.java
@@ -35,6 +35,8 @@ public class CommonConfig {
public static final String IP_MMDB_PATH = CommonConfigurations.getStringProperty("ip.mmdb.path");
+ public static final int SENSITIVITY_THRESHOLD = CommonConfigurations.getIntProperty("sensitivity.threshold");
+
public static final double BASELINE_SESSIONS_MINOR_THRESHOLD = CommonConfigurations.getDoubleProperty("baseline.sessions.minor.threshold");
public static final double BASELINE_SESSIONS_WARNING_THRESHOLD = CommonConfigurations.getDoubleProperty("baseline.sessions.warning.threshold");
public static final double BASELINE_SESSIONS_MAJOR_THRESHOLD = CommonConfigurations.getDoubleProperty("baseline.sessions.major.threshold");
@@ -42,6 +44,7 @@ public class CommonConfig {
public static final double BASELINE_SESSIONS_CRITICAL_THRESHOLD = CommonConfigurations.getDoubleProperty("baseline.sessions.critical.threshold");
public static final int BASELINE_THRESHOLD_SCHEDULE_DAYS = CommonConfigurations.getIntProperty("baseline.threshold.schedule.days");
+ public static final int STATIC_THRESHOLD_SCHEDULE_MINUTES = CommonConfigurations.getIntProperty("static.threshold.schedule.minutes");
public static final int DESTINATION_IP_PARTITION_NUM = CommonConfigurations.getIntProperty("destination.ip.partition.num");
diff --git a/src/main/java/com/zdjizhi/common/DosDetectionThreshold.java b/src/main/java/com/zdjizhi/common/DosDetectionThreshold.java
new file mode 100644
index 0000000..c67d3a4
--- /dev/null
+++ b/src/main/java/com/zdjizhi/common/DosDetectionThreshold.java
@@ -0,0 +1,93 @@
+package com.zdjizhi.common;
+
+import java.io.Serializable;
+import java.util.ArrayList;
+
+public class DosDetectionThreshold implements Serializable {
+ private String profileId;
+ private String attackType;
+ private ArrayList<String> serverIpList;
+ private String serverIpAddr;
+ private long packetsPerSec;
+ private long bitsPerSec;
+ private long sessionsPerSec;
+ private int isValid;
+
+ @Override
+ public String toString() {
+ return "DosDetectionThreshold{" +
+ "profileId='" + profileId + '\'' +
+ ", attackType='" + attackType + '\'' +
+ ", serverIpList=" + serverIpList +
+ ", serverIpAddr='" + serverIpAddr + '\'' +
+ ", packetsPerSec=" + packetsPerSec +
+ ", bitsPerSec=" + bitsPerSec +
+ ", sessionsPerSec=" + sessionsPerSec +
+ ", isValid=" + isValid +
+ '}';
+ }
+
+ public String getProfileId() {
+ return profileId;
+ }
+
+ public void setProfileId(String profileId) {
+ this.profileId = profileId;
+ }
+
+ public String getAttackType() {
+ return attackType;
+ }
+
+ public void setAttackType(String attackType) {
+ this.attackType = attackType;
+ }
+
+ public ArrayList<String> getServerIpList() {
+ return serverIpList;
+ }
+
+ public void setServerIpList(ArrayList<String> serverIpList) {
+ this.serverIpList = serverIpList;
+ }
+
+ public String getServerIpAddr() {
+ return serverIpAddr;
+ }
+
+ public void setServerIpAddr(String serverIpAddr) {
+ this.serverIpAddr = serverIpAddr;
+ }
+
+ public long getPacketsPerSec() {
+ return packetsPerSec;
+ }
+
+ public void setPacketsPerSec(long packetsPerSec) {
+ this.packetsPerSec = packetsPerSec;
+ }
+
+ public long getBitsPerSec() {
+ return bitsPerSec;
+ }
+
+ public void setBitsPerSec(long bitsPerSec) {
+ this.bitsPerSec = bitsPerSec;
+ }
+
+ public long getSessionsPerSec() {
+ return sessionsPerSec;
+ }
+
+ public void setSessionsPerSec(long sessionsPerSec) {
+ this.sessionsPerSec = sessionsPerSec;
+ }
+
+ public int getIsValid() {
+ return isValid;
+ }
+
+ public void setIsValid(int isValid) {
+ this.isValid = isValid;
+ }
+}
diff --git a/src/main/java/com/zdjizhi/topology/LogFlowWriteTopology.java b/src/main/java/com/zdjizhi/topology/LogFlowWriteTopology.java
index d9059d6..4880e7d 100644
--- a/src/main/java/com/zdjizhi/topology/LogFlowWriteTopology.java
+++ b/src/main/java/com/zdjizhi/topology/LogFlowWriteTopology.java
@@ -43,7 +43,6 @@ public class LogFlowWriteTopology {
private void runLocally() throws InterruptedException {
topologyConfig.setMaxTaskParallelism(1);
-// topologyConfig.put(Config.TOPOLOGY_TICK_TUPLE_FREQ_SECS, 3);
StormRunner.runTopologyLocally(builder, topologyName, topologyConfig, 600);
}
@@ -52,7 +51,6 @@ public class LogFlowWriteTopology {
topologyConfig.put(Config.TOPOLOGY_TRANSFER_BUFFER_SIZE, CommonConfig.TRANSFER_BUFFER_SIZE);
topologyConfig.put(Config.TOPOLOGY_EXECUTOR_RECEIVE_BUFFER_SIZE, CommonConfig.EXECUTOR_RECEIVE_BUFFER_SIZE);
topologyConfig.put(Config.TOPOLOGY_EXECUTOR_SEND_BUFFER_SIZE, CommonConfig.EXECUTOR_SEND_BUFFER_SIZE);
-// topologyConfig.put(Config.TOPOLOGY_TICK_TUPLE_FREQ_SECS, 3);
StormRunner.runTopologyRemotely(builder, topologyName, topologyConfig);
}
diff --git a/src/main/java/com/zdjizhi/utils/HttpClientUtils.java b/src/main/java/com/zdjizhi/utils/HttpClientUtils.java
new file mode 100644
index 0000000..9f65018
--- /dev/null
+++ b/src/main/java/com/zdjizhi/utils/HttpClientUtils.java
@@ -0,0 +1,271 @@
+package com.zdjizhi.utils;
+
+import com.zdjizhi.common.CommonConfig;
+import org.apache.http.*;
+import org.apache.http.client.ClientProtocolException;
+import org.apache.http.client.HttpRequestRetryHandler;
+import org.apache.http.client.config.RequestConfig;
+import org.apache.http.client.methods.CloseableHttpResponse;
+import org.apache.http.client.methods.HttpGet;
+import org.apache.http.client.methods.HttpPost;
+import org.apache.http.client.protocol.HttpClientContext;
+import org.apache.http.client.utils.URIBuilder;
+import org.apache.http.conn.ConnectTimeoutException;
+import org.apache.http.conn.ConnectionKeepAliveStrategy;
+import org.apache.http.conn.HttpHostConnectException;
+import org.apache.http.entity.ByteArrayEntity;
+import org.apache.http.impl.client.CloseableHttpClient;
+import org.apache.http.impl.client.HttpClients;
+import org.apache.http.impl.conn.PoolingHttpClientConnectionManager;
+import org.apache.http.message.BasicHeaderElementIterator;
+import org.apache.http.protocol.HTTP;
+import org.apache.http.util.EntityUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.net.ssl.SSLException;
+import javax.net.ssl.SSLHandshakeException;
+import java.io.IOException;
+import java.io.InterruptedIOException;
+import java.net.URI;
+import java.net.UnknownHostException;
+import java.nio.charset.StandardCharsets;
+import java.util.Map;
+
+/**
+ * http client工具类
+ * @author wlh
+ */
+public class HttpClientUtils {
+ /** 全局连接池对象 */
+ private static final PoolingHttpClientConnectionManager CONN_MANAGER = new PoolingHttpClientConnectionManager();
+
+ private static Logger logger = LoggerFactory.getLogger(HttpClientUtils.class);
+ public static final String ERROR_MESSAGE = "-1";
+
+ /*
+ * 静态代码块配置连接池信息
+ */
+ static {
+
+ // 设置最大连接数
+ CONN_MANAGER.setMaxTotal(CommonConfig.HTTP_POOL_MAX_CONNECTION);
+ // 设置每个连接的路由数
+ CONN_MANAGER.setDefaultMaxPerRoute(CommonConfig.HTTP_POOL_MAX_PER_ROUTE);
+
+ }
+
+ /**
+ * 获取Http客户端连接对象
+ * @return Http客户端连接对象
+ */
+ private static CloseableHttpClient getHttpClient() {
+ // 创建Http请求配置参数
+ RequestConfig requestConfig = RequestConfig.custom()
+ // 获取连接超时时间
+ .setConnectionRequestTimeout(CommonConfig.HTTP_POOL_REQUEST_TIMEOUT)
+ // 请求超时时间
+ .setConnectTimeout(CommonConfig.HTTP_POOL_CONNECT_TIMEOUT)
+ // 响应超时时间
+ .setSocketTimeout(CommonConfig.HTTP_POOL_RESPONSE_TIMEOUT)
+ .build();
+
+ /*
+ * 测出超时重试机制为了防止超时不生效而设置
+ * 如果直接放回false,不重试
+ * 这里会根据情况进行判断是否重试
+ */
+ HttpRequestRetryHandler retry = (exception, executionCount, context) -> {
+ if (executionCount >= 3) {// 如果已经重试了3次,就放弃
+ return false;
+ }
+ if (exception instanceof NoHttpResponseException) {// 如果服务器丢掉了连接,那么就重试
+ return true;
+ }
+ if (exception instanceof SSLHandshakeException) {// 不要重试SSL握手异常
+ return false;
+ }
+ if (exception instanceof UnknownHostException) {// 目标服务器不可达
+ return false;
+ }
+ if (exception instanceof ConnectTimeoutException) {// 连接被拒绝
+ return false;
+ }
+ if (exception instanceof HttpHostConnectException) {// 连接被拒绝
+ return false;
+ }
+ if (exception instanceof SSLException) {// ssl握手异常
+ return false;
+ }
+ if (exception instanceof InterruptedIOException) {// 超时
+ return true;
+ }
+ HttpClientContext clientContext = HttpClientContext.adapt(context);
+ HttpRequest request = clientContext.getRequest();
+ // 如果请求是幂等的,就再次尝试
+ return !(request instanceof HttpEntityEnclosingRequest);
+ };
+
+
+ ConnectionKeepAliveStrategy myStrategy = (response, context) -> {
+ HeaderElementIterator it = new BasicHeaderElementIterator
+ (response.headerIterator(HTTP.CONN_KEEP_ALIVE));
+ while (it.hasNext()) {
+ HeaderElement he = it.nextElement();
+ String param = he.getName();
+ String value = he.getValue();
+ if (value != null && "timeout".equalsIgnoreCase(param)) {
+ return Long.parseLong(value) * 1000;
+ }
+ }
+ return 60 * 1000;//如果没有约定,则默认定义时长为60s
+ };
+
+ // 创建httpClient
+ return HttpClients.custom()
+ // 把请求相关的超时信息设置到连接客户端
+ .setDefaultRequestConfig(requestConfig)
+ // 把请求重试设置到连接客户端
+ .setRetryHandler(retry)
+ .setKeepAliveStrategy(myStrategy)
+ // 配置连接池管理对象
+ .setConnectionManager(CONN_MANAGER)
+ .build();
+ }
+
+
+ /**
+ * GET请求
+ *
+ * @param uri 请求地
+ * @return message
+ */
+ public static String httpGet(URI uri, Header... headers) {
+ String msg = ERROR_MESSAGE;
+
+ // 获取客户端连接对象
+ CloseableHttpClient httpClient = getHttpClient();
+ CloseableHttpResponse response = null;
+
+ try {
+ logger.info("http get uri {}",uri);
+ // 创建GET请求对象
+ HttpGet httpGet = new HttpGet(uri);
+
+ if (StringUtil.isNotEmpty(headers)) {
+ for (Header h : headers) {
+ httpGet.addHeader(h);
+ logger.info("request header : {}",h);
+ }
+ }
+ // 执行请求
+ response = httpClient.execute(httpGet);
+ int statusCode = response.getStatusLine().getStatusCode();
+ // 获取响应实体
+ HttpEntity entity = response.getEntity();
+ // 获取响应信息
+ msg = EntityUtils.toString(entity, "UTF-8");
+
+ if (statusCode != HttpStatus.SC_OK) {
+ logger.error("Http get content is :{}" , msg);
+ }
+
+ } catch (ClientProtocolException e) {
+ logger.error("协议错误: {}", e.getMessage());
+ } catch (ParseException e) {
+ logger.error("解析错误: {}", e.getMessage());
+ } catch (IOException e) {
+ logger.error("IO错误: {}",e.getMessage());
+ } finally {
+ if (null != response) {
+ try {
+ EntityUtils.consume(response.getEntity());
+ response.close();
+ } catch (IOException e) {
+ logger.error("释放链接错误: {}", e.getMessage());
+
+ }
+ }
+ }
+
+ return msg;
+ }
+ /**
+ * POST 请求
+ * @param uri uri参数
+ * @param requestBody 请求体
+ * @return post请求返回结果
+ */
+ public static String httpPost(URI uri, String requestBody, Header... headers) {
+ String msg = ERROR_MESSAGE;
+ // 获取客户端连接对象
+ CloseableHttpClient httpClient = getHttpClient();
+
+ // 创建POST请求对象
+ CloseableHttpResponse response = null;
+ try {
+
+ logger.info("http post uri:{}, http post body:{}", uri, requestBody);
+
+ HttpPost httpPost = new HttpPost(uri);
+ httpPost.setHeader("Content-Type", "application/x-www-form-urlencoded");
+ if (StringUtil.isNotEmpty(headers)) {
+ for (Header h : headers) {
+ httpPost.addHeader(h);
+ logger.info("request header : {}",h);
+ }
+ }
+
+ if(StringUtil.isNotBlank(requestBody)) {
+ byte[] bytes = requestBody.getBytes(StandardCharsets.UTF_8);
+ httpPost.setEntity(new ByteArrayEntity(bytes));
+ }
+
+ response = httpClient.execute(httpPost);
+ int statusCode = response.getStatusLine().getStatusCode();
+ // 获取响应实体
+ HttpEntity entity = response.getEntity();
+ // 获取响应信息
+ msg = EntityUtils.toString(entity, "UTF-8");
+
+ if (statusCode != HttpStatus.SC_OK) {
+ logger.error("Http post content is :{}" , msg);
+ }
+ } catch (ClientProtocolException e) {
+ logger.error("协议错误: {}", e.getMessage());
+ } catch (ParseException e) {
+ logger.error("解析错误: {}", e.getMessage());
+ } catch (IOException e) {
+ logger.error("IO错误: {}", e.getMessage());
+ } finally {
+ if (null != response) {
+ try {
+ EntityUtils.consumeQuietly(response.getEntity());
+ response.close();
+ } catch (IOException e) {
+ logger.error("释放链接错误: {}", e.getMessage());
+
+ }
+ }
+ }
+ return msg;
+ }
+
+ /**
+ * 拼装url
+ * url ,参数map
+ */
+ public static void setUrlWithParams(URIBuilder uriBuilder,String path, Map<String, String> params) {
+ try {
+ uriBuilder.setPath(path);
+ if (params != null && !params.isEmpty()){
+ for (Map.Entry<String, String> kv : params.entrySet()) {
+ uriBuilder.setParameter(kv.getKey(),kv.getValue());
+ }
+ }
+ } catch (Exception e) {
+ logger.error("拼接url出错,uri : {}, path : {},参数: {}",uriBuilder.toString(),path,params);
+ }
+ }
+
+}
diff --git a/src/main/resources/common.properties b/src/main/resources/common.properties
index e84df34..a16eb62 100644
--- a/src/main/resources/common.properties
+++ b/src/main/resources/common.properties
@@ -59,6 +59,9 @@ data.center.id.num=15
ip.mmdb.path=D:\\data\\dat\\
#ip.mmdb.path=/home/wlh/dos-detection/dat/
+#敏感阈值,速率小于此值不报警
+sensitivity.threshold=100
+
#基于baseline判定dos攻击的上下限
baseline.sessions.minor.threshold=0.1
baseline.sessions.warning.threshold=0.5
@@ -69,6 +72,9 @@ baseline.sessions.critical.threshold=8
#获取baseline周期,默认7天
baseline.threshold.schedule.days=7
+#获取静态阈值周期,默认十分钟
+static.threshold.schedule.minutes=10
+
#基于目的IP的分区数,默认为10000,一般不变
destination.ip.partition.num=10000