summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorwanglihui <[email protected]>2021-08-24 16:35:31 +0800
committerwanglihui <[email protected]>2021-08-24 16:35:31 +0800
commitb4f919647a8e4c7dfbcfeb458005d8953495042c (patch)
tree16a7e8f65135e93fe37694d30bcd0cd3e922e5b7
parent55af33b508c95efa78fa720b7ba3697ca478a01b (diff)
新增根据静态阈值判定dos攻击逻辑
新增定时器,定时获取静态阈值与baseline
-rw-r--r--src/main/java/com/zdjizhi/common/CommonConfig.java3
-rw-r--r--src/main/java/com/zdjizhi/etl/DosDetection.java114
-rw-r--r--src/main/java/com/zdjizhi/etl/ParseSketchLog.java9
-rw-r--r--src/main/java/com/zdjizhi/etl/ParseStaticThreshold.java116
-rw-r--r--src/main/java/com/zdjizhi/utils/HbaseUtils.java7
-rw-r--r--src/main/java/com/zdjizhi/utils/HttpClientUtils.java1
-rw-r--r--src/main/resources/common.properties10
7 files changed, 173 insertions, 87 deletions
diff --git a/src/main/java/com/zdjizhi/common/CommonConfig.java b/src/main/java/com/zdjizhi/common/CommonConfig.java
index b0b1f97..2da067b 100644
--- a/src/main/java/com/zdjizhi/common/CommonConfig.java
+++ b/src/main/java/com/zdjizhi/common/CommonConfig.java
@@ -58,4 +58,7 @@ public class CommonConfig {
public static final int HTTP_POOL_CONNECT_TIMEOUT = CommonConfigurations.getIntProperty("http.pool.connect.timeout");
public static final int HTTP_POOL_RESPONSE_TIMEOUT = CommonConfigurations.getIntProperty("http.pool.response.timeout");
+ public static final int STATIC_THRESHOLD_SCHEDULE_MINUTES = CommonConfigurations.getIntProperty("static.threshold.schedule.minutes");
+ public static final int BASELINE_THRESHOLD_SCHEDULE_DAYS = CommonConfigurations.getIntProperty("baseline.threshold.schedule.days");
+
}
diff --git a/src/main/java/com/zdjizhi/etl/DosDetection.java b/src/main/java/com/zdjizhi/etl/DosDetection.java
index f572107..7175ae7 100644
--- a/src/main/java/com/zdjizhi/etl/DosDetection.java
+++ b/src/main/java/com/zdjizhi/etl/DosDetection.java
@@ -10,6 +10,7 @@ import com.zdjizhi.utils.SnowflakeId;
import inet.ipaddr.IPAddress;
import inet.ipaddr.IPAddressString;
import org.apache.commons.lang.StringUtils;
+import org.apache.commons.lang3.concurrent.BasicThreadFactory;
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
@@ -20,6 +21,9 @@ import org.slf4j.LoggerFactory;
import java.text.NumberFormat;
import java.text.ParseException;
import java.util.*;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
/**
* @author wlh
@@ -27,15 +31,28 @@ import java.util.*;
public class DosDetection extends RichMapFunction<DosSketchLog, DosEventLog> {
private static final Logger logger = LoggerFactory.getLogger(DosDetection.class);
- private static Map<String, Map<String, Tuple2<ArrayList<Integer>, Integer>>> baselineMap;
+ private static Map<String, Map<String, Tuple2<ArrayList<Integer>, Integer>>> baselineMap = new HashMap<>();
private final static int BASELINE_SIZE = 144;
private final static NumberFormat PERCENT_INSTANCE = NumberFormat.getPercentInstance();
private TreeRangeMap<IPAddress, Map<String, DosDetectionThreshold>> thresholdRangeMap;
@Override
public void open(Configuration parameters) {
- baselineMap = HbaseUtils.baselineMap;
- thresholdRangeMap = ParseStaticThreshold.createStaticThreshold();
+ ScheduledExecutorService executorService = new ScheduledThreadPoolExecutor(2,
+ new BasicThreadFactory.Builder().namingPattern("Dos-Detection-%d").daemon(true).build());
+ try {
+ executorService.scheduleAtFixedRate(() -> {
+ //do something
+ thresholdRangeMap = ParseStaticThreshold.createStaticThreshold();
+ }, 0, CommonConfig.STATIC_THRESHOLD_SCHEDULE_MINUTES, TimeUnit.MINUTES);
+
+ executorService.scheduleAtFixedRate(() -> {
+ //do something
+ baselineMap = HbaseUtils.readFromHbase();
+ }, 0, CommonConfig.BASELINE_THRESHOLD_SCHEDULE_DAYS, TimeUnit.DAYS);
+ } catch (Exception e) {
+ logger.error("定时器任务执行失败", e);
+ }
PERCENT_INSTANCE.setMinimumFractionDigits(2);
}
@@ -48,72 +65,90 @@ public class DosDetection extends RichMapFunction<DosSketchLog, DosEventLog> {
IPAddress destinationIpAddress = new IPAddressString(destinationIp).getAddress();
Map<String, DosDetectionThreshold> thresholdMap = thresholdRangeMap.get(destinationIpAddress);
logger.debug("当前判断IP:{}, 类型: {}", destinationIp, attackType);
- if (baselineMap.containsKey(destinationIp) && thresholdMap == null) {
- finalResult = getDosEventLogByBaseline(value, destinationIp, attackType);
- } else if (!baselineMap.containsKey(destinationIp) && thresholdMap != null) {
- finalResult = getDosEventLogByStaticThreshold(value,thresholdMap);
- }else if (baselineMap.containsKey(destinationIp) && thresholdMap != null){
- DosEventLog eventLogByBaseline = getDosEventLogByBaseline(value, destinationIp, attackType);
- DosEventLog eventLogByStaticThreshold = getDosEventLogByStaticThreshold(value, thresholdMap);
- finalResult = mergeFinalResult(eventLogByBaseline,eventLogByStaticThreshold);
- }else {
+ if (baselineMap != null && baselineMap.containsKey(destinationIp) && thresholdMap == null) {
+ finalResult = getDosEventLogByBaseline(value, destinationIp, attackType).f1;
+ } else if (baselineMap != null && !baselineMap.containsKey(destinationIp) && thresholdMap != null) {
+ finalResult = getDosEventLogByStaticThreshold(value, thresholdMap).f1;
+ } else if (baselineMap != null && baselineMap.containsKey(destinationIp) && thresholdMap != null) {
+ Tuple2<Severity, DosEventLog> eventLogByBaseline = getDosEventLogByBaseline(value, destinationIp, attackType);
+ Tuple2<Severity, DosEventLog> eventLogByStaticThreshold = getDosEventLogByStaticThreshold(value, thresholdMap);
+ finalResult = mergeFinalResult(eventLogByBaseline, eventLogByStaticThreshold);
+ } else {
logger.debug("未获取到当前server IP:{} 类型 {} 静态阈值 和 baseline", destinationIp, attackType);
}
+
} catch (Exception e) {
logger.error("判定失败\n {} \n{}", value, e);
}
return finalResult;
}
- private DosEventLog mergeFinalResult(DosEventLog eventLogByBaseline,DosEventLog eventLogByStaticThreshold){
- return eventLogByStaticThreshold;
+ private DosEventLog mergeFinalResult(Tuple2<Severity, DosEventLog> eventLogByBaseline, Tuple2<Severity, DosEventLog> eventLogByStaticThreshold) {
+ if (eventLogByBaseline.f0.score > eventLogByStaticThreshold.f0.score) {
+ mergeCondition(eventLogByBaseline.f1, eventLogByStaticThreshold.f1);
+ logger.info("merge eventLogByBaseline {} \neventLogByStaticThreshold {}",eventLogByBaseline,eventLogByStaticThreshold);
+ return eventLogByBaseline.f1;
+ } else {
+ mergeCondition(eventLogByStaticThreshold.f1, eventLogByBaseline.f1);
+ logger.info("merge eventLogByStaticThreshold {} \neventLogByBaseline {}",eventLogByStaticThreshold,eventLogByBaseline);
+ return eventLogByStaticThreshold.f1;
+ }
+ }
+
+ private void mergeCondition(DosEventLog log1, DosEventLog log2) {
+ if (log1 != null && log2 != null) {
+ String conditions1 = log1.getConditions();
+ String conditions2 = log2.getConditions();
+ log1.setConditions(conditions1 + " and " + conditions2);
+ }
}
- private DosEventLog getDosEventLogByBaseline(DosSketchLog value, String destinationIp, String attackType) throws ParseException {
+ private Tuple2<Severity, DosEventLog> getDosEventLogByBaseline(DosSketchLog value, String destinationIp, String attackType) throws ParseException {
Tuple2<ArrayList<Integer>, Integer> floodTypeTup = baselineMap.get(destinationIp).get(attackType);
Integer base = getBaseValue(floodTypeTup, value);
long diff = value.getSketch_sessions() - base;
- return getDosEventLog(value, base, diff);
+ return getDosEventLog(value, base, diff, "baseline");
}
- private DosEventLog getDosEventLogByStaticThreshold(DosSketchLog value, Map<String, DosDetectionThreshold> thresholdMap) throws ParseException {
- DosEventLog result = null;
+ private Tuple2<Severity, DosEventLog> getDosEventLogByStaticThreshold(DosSketchLog value, Map<String, DosDetectionThreshold> thresholdMap) throws ParseException {
+ Tuple2<Severity, DosEventLog> result = Tuple2.of(Severity.NORMAL, null);
String attackType = value.getAttack_type();
if (thresholdMap.containsKey(attackType)) {
DosDetectionThreshold threshold = thresholdMap.get(attackType);
long base = threshold.getSessionsPerSec();
long diff = value.getSketch_sessions() - base;
- result = getDosEventLog(value, base, diff);
+ result = getDosEventLog(value, base, diff, "static");
}
return result;
}
- private DosEventLog getDosEventLog(DosSketchLog value, long base, long diff) throws ParseException {
+ private Tuple2<Severity, DosEventLog> getDosEventLog(DosSketchLog value, long base, long diff, String tag) throws ParseException {
DosEventLog result = null;
String destinationIp = value.getDestination_ip();
String attackType = value.getAttack_type();
+ Severity severity = Severity.NORMAL;
if (diff > 0 && base != 0) {
String percent = getDiffPercent(diff, base);
double diffPercentDouble = getDiffPercentDouble(percent);
- Severity severity = judgeSeverity(diffPercentDouble);
+ severity = judgeSeverity(diffPercentDouble);
if (severity != Severity.NORMAL) {
- result = getResult(value, severity, percent);
+ result = getResult(value, severity, percent, tag);
logger.info("检测到当前server IP {} 存在 {} 异常,日志详情\n {}", destinationIp, attackType, result.toString());
} else {
logger.debug("当前server IP:{} 未出现 {} 异常,日志详情 {}", destinationIp, attackType, value.toString());
}
}
- return result;
+ return Tuple2.of(severity, result);
}
- private DosEventLog getResult(DosSketchLog value, Severity severity, String percent) {
+ private DosEventLog getResult(DosSketchLog value, Severity severity, String percent, String tag) {
DosEventLog dosEventLog = new DosEventLog();
dosEventLog.setLog_id(SnowflakeId.generateId());
dosEventLog.setStart_time(value.getSketch_start_time());
dosEventLog.setEnd_time(value.getSketch_start_time() + CommonConfig.FLINK_WINDOW_MAX_TIME);
dosEventLog.setAttack_type(value.getAttack_type());
- dosEventLog.setSeverity(severity.toString());
- dosEventLog.setConditions(getConditions(percent));
+ dosEventLog.setSeverity(severity.severity);
+ dosEventLog.setConditions(getConditions(percent, value.getSketch_sessions(), tag));
dosEventLog.setDestination_ip(value.getDestination_ip());
dosEventLog.setDestination_country(IpUtils.ipLookup.countryLookup(value.getDestination_ip()));
String ipList = value.getSource_ip();
@@ -146,8 +181,15 @@ public class DosDetection extends RichMapFunction<DosSketchLog, DosEventLog> {
return base;
}
- private String getConditions(String percent) {
- return "sessions > " + percent + " of baseline";
+ private String getConditions(String percent, long sessions, String tag) {
+ switch (tag) {
+ case "baseline":
+ return "sessions > " + percent + " of baseline";
+ case "static":
+ return "sessions > " + sessions + " sessions/s";
+ default:
+ return null;
+ }
}
private String getSourceCountryList(String sourceIpList) {
@@ -195,22 +237,24 @@ public class DosDetection extends RichMapFunction<DosSketchLog, DosEventLog> {
/**
* 判断严重程度枚举类型
*/
- CRITICAL("Critical"),
- SEVERE("Severe"),
- MAJOR("Major"),
- WARNING("Warning"),
- MINOR("Minor"),
- NORMAL("Normal");
+ CRITICAL("Critical", 5),
+ SEVERE("Severe", 4),
+ MAJOR("Major", 3),
+ WARNING("Warning", 2),
+ MINOR("Minor", 1),
+ NORMAL("Normal", 0);
private final String severity;
+ private final int score;
@Override
public String toString() {
return this.severity;
}
- Severity(String severity) {
+ Severity(String severity, int score) {
this.severity = severity;
+ this.score = score;
}
}
diff --git a/src/main/java/com/zdjizhi/etl/ParseSketchLog.java b/src/main/java/com/zdjizhi/etl/ParseSketchLog.java
index 8560aa2..3059f78 100644
--- a/src/main/java/com/zdjizhi/etl/ParseSketchLog.java
+++ b/src/main/java/com/zdjizhi/etl/ParseSketchLog.java
@@ -1,5 +1,6 @@
package com.zdjizhi.etl;
+import com.fasterxml.jackson.databind.JavaType;
import com.zdjizhi.common.CommonConfig;
import com.zdjizhi.common.DosSketchLog;
import com.zdjizhi.source.DosSketchSource;
@@ -23,6 +24,10 @@ import java.util.HashMap;
public class ParseSketchLog {
private static Logger logger = LoggerFactory.getLogger(ParseSketchLog.class);
+ private static JsonMapper jsonMapperInstance = JsonMapper.getInstance();
+ private static JavaType hashmapJsonType = jsonMapperInstance.createCollectionType(HashMap.class, String.class, Object.class);
+ private static JavaType listType = jsonMapperInstance.createCollectionType(ArrayList.class, HashMap.class);
+
public static SingleOutputStreamOperator<DosSketchLog> getSketchSource(){
return flatSketchSource().assignTimestampsAndWatermarks(createWatermarkStrategy());
@@ -43,11 +48,11 @@ public class ParseSketchLog {
public void flatMap(String s, Collector<DosSketchLog> collector) {
try {
if (StringUtil.isNotBlank(s)){
- HashMap<String, Object> sketchSource = (HashMap<String, Object>) JsonMapper.fromJsonString(s, Object.class);
+ HashMap<String, Object> sketchSource = jsonMapperInstance.fromJson(s, hashmapJsonType);
long sketchStartTime = Long.parseLong(sketchSource.get("sketch_start_time").toString());
long sketchDuration = Long.parseLong(sketchSource.get("sketch_duration").toString());
String attackType = sketchSource.get("attack_type").toString();
- ArrayList<HashMap<String, Object>> reportIpList = (ArrayList<HashMap<String, Object>>) sketchSource.get("report_ip_list");
+ ArrayList<HashMap<String, Object>> reportIpList = jsonMapperInstance.fromJson(jsonMapperInstance.toJson(sketchSource.get("report_ip_list")), listType);
for (HashMap<String, Object> obj : reportIpList) {
DosSketchLog dosSketchLog = new DosSketchLog();
dosSketchLog.setSketch_start_time(sketchStartTime);
diff --git a/src/main/java/com/zdjizhi/etl/ParseStaticThreshold.java b/src/main/java/com/zdjizhi/etl/ParseStaticThreshold.java
index 1a246f7..3c2d1b4 100644
--- a/src/main/java/com/zdjizhi/etl/ParseStaticThreshold.java
+++ b/src/main/java/com/zdjizhi/etl/ParseStaticThreshold.java
@@ -18,7 +18,6 @@ import java.net.URISyntaxException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Map;
-import java.util.Set;
/**
* @author wlh
@@ -39,116 +38,143 @@ public class ParseStaticThreshold {
/**
* 获取加密密码
*/
- private static String getEncryptpwd(){
+ private static String getEncryptpwd() {
String psw = HttpClientUtils.ERROR_MESSAGE;
try {
URIBuilder uriBuilder = new URIBuilder(CommonConfig.BIFANG_SERVER_URI);
HashMap<String, String> parms = new HashMap<>();
- parms.put("password",CommonConfig.BIFANG_SERVER_PASSWORD);
- HttpClientUtils.setUrlWithParams(uriBuilder,CommonConfig.BIFANG_SERVER_ENCRYPTPWD_PATH,parms);
+ parms.put("password", CommonConfig.BIFANG_SERVER_PASSWORD);
+ HttpClientUtils.setUrlWithParams(uriBuilder, CommonConfig.BIFANG_SERVER_ENCRYPTPWD_PATH, parms);
String resposeJsonStr = HttpClientUtils.httpGet(uriBuilder.build());
- if (!HttpClientUtils.ERROR_MESSAGE.equals(resposeJsonStr)){
+ if (!HttpClientUtils.ERROR_MESSAGE.equals(resposeJsonStr)) {
HashMap<String, Object> resposeMap = jsonMapperInstance.fromJson(resposeJsonStr, hashmapJsonType);
- boolean success = (boolean)resposeMap.get("success");
- if (success){
+ boolean success = (boolean) resposeMap.get("success");
+ String msg = resposeMap.get("msg").toString();
+ if (success) {
HashMap<String, Object> data = jsonMapperInstance.fromJson(jsonMapperInstance.toJson(resposeMap.get("data")), hashmapJsonType);
psw = data.get("encryptpwd").toString();
+ } else {
+ logger.error(msg);
}
}
- }catch (URISyntaxException e){
- logger.error("构造URI异常",e);
- }catch (Exception e){
- logger.error("获取encryptpwd失败",e);
+ } catch (URISyntaxException e) {
+ logger.error("构造URI异常", e);
+ } catch (Exception e) {
+ logger.error("获取encryptpwd失败", e);
}
return psw;
}
/**
* 登录bifang服务,获取token
+ *
* @return token
*/
- private static String loginBifangServer(){
+ private static String loginBifangServer() {
String token = HttpClientUtils.ERROR_MESSAGE;
try {
- if (!HttpClientUtils.ERROR_MESSAGE.equals(encryptpwd)){
+ if (!HttpClientUtils.ERROR_MESSAGE.equals(encryptpwd)) {
URIBuilder uriBuilder = new URIBuilder(CommonConfig.BIFANG_SERVER_URI);
HashMap<String, String> parms = new HashMap<>();
- parms.put("username",CommonConfig.BIFANG_SERVER_USER);
- parms.put("password",encryptpwd);
- HttpClientUtils.setUrlWithParams(uriBuilder,CommonConfig.BIFANG_SERVER_LOGIN_PATH,parms);
+ parms.put("username", CommonConfig.BIFANG_SERVER_USER);
+ parms.put("password", encryptpwd);
+ HttpClientUtils.setUrlWithParams(uriBuilder, CommonConfig.BIFANG_SERVER_LOGIN_PATH, parms);
String resposeJsonStr = HttpClientUtils.httpPost(uriBuilder.build(), null);
- if (!HttpClientUtils.ERROR_MESSAGE.equals(resposeJsonStr)){
+ if (!HttpClientUtils.ERROR_MESSAGE.equals(resposeJsonStr)) {
HashMap<String, Object> resposeMap = jsonMapperInstance.fromJson(resposeJsonStr, hashmapJsonType);
- boolean success = (boolean)resposeMap.get("success");
- if (success){
+ boolean success = (boolean) resposeMap.get("success");
+ String msg = resposeMap.get("msg").toString();
+ if (success) {
HashMap<String, Object> data = jsonMapperInstance.fromJson(jsonMapperInstance.toJson(resposeMap.get("data")), hashmapJsonType);
token = data.get("token").toString();
+ } else {
+ logger.error(msg);
}
}
}
- }catch (Exception e){
- logger.error("登录失败,未获取到token ",e);
+ } catch (Exception e) {
+ logger.error("登录失败,未获取到token ", e);
}
return token;
}
/**
* 获取静态阈值配置列表
+ *
* @return thresholds
*/
- private static ArrayList<DosDetectionThreshold> getDosDetectionThreshold(){
+ private static ArrayList<DosDetectionThreshold> getDosDetectionThreshold() {
ArrayList<DosDetectionThreshold> thresholds = null;
try {
URIBuilder uriBuilder = new URIBuilder(CommonConfig.BIFANG_SERVER_URI);
- HttpClientUtils.setUrlWithParams(uriBuilder,CommonConfig.BIFANG_SERVER_POLICY_THRESHOLD_PATH,null);
+ HttpClientUtils.setUrlWithParams(uriBuilder, CommonConfig.BIFANG_SERVER_POLICY_THRESHOLD_PATH, null);
String token = loginBifangServer();
- if (!HttpClientUtils.ERROR_MESSAGE.equals(token)){
+ if (!HttpClientUtils.ERROR_MESSAGE.equals(token)) {
BasicHeader authorization = new BasicHeader("Authorization", token);
String resposeJsonStr = HttpClientUtils.httpGet(uriBuilder.build(), authorization);
- if (!HttpClientUtils.ERROR_MESSAGE.equals(resposeJsonStr)){
+ if (!HttpClientUtils.ERROR_MESSAGE.equals(resposeJsonStr)) {
HashMap<String, Object> resposeMap = jsonMapperInstance.fromJson(resposeJsonStr, hashmapJsonType);
- boolean success = (boolean)resposeMap.get("success");
- if (success){
+ boolean success = (boolean) resposeMap.get("success");
+ String msg = resposeMap.get("msg").toString();
+ if (success) {
HashMap<String, Object> data = jsonMapperInstance.fromJson(jsonMapperInstance.toJson(resposeMap.get("data")), hashmapJsonType);
thresholds = jsonMapperInstance.fromJson(jsonMapperInstance.toJson(data.get("list")), thresholdType);
- logger.info("获取到静态阈值配置{}条",thresholds.size());
+ logger.info("获取到静态阈值配置{}条", thresholds.size());
+ } else {
+ logger.error(msg);
}
}
}
- }catch (Exception e){
- logger.error("获取静态阈值配置失败,请检查bifang服务或登录配置信息 ",e);
+ } catch (Exception e) {
+ logger.error("获取静态阈值配置失败,请检查bifang服务或登录配置信息 ", e);
}
return thresholds;
}
/**
* 基于静态阈值构建threshold RangeMap,k:IP段或具体IP,v:配置信息
+ *
* @return threshold RangeMap
*/
- public static TreeRangeMap<IPAddress, Map<String,DosDetectionThreshold>> createStaticThreshold(){
- TreeRangeMap<IPAddress, Map<String,DosDetectionThreshold>> thresholdRangeMap = null;
+ static TreeRangeMap<IPAddress, Map<String, DosDetectionThreshold>> createStaticThreshold() {
+ TreeRangeMap<IPAddress, Map<String, DosDetectionThreshold>> thresholdRangeMap = TreeRangeMap.create();
try {
ArrayList<DosDetectionThreshold> dosDetectionThreshold = getDosDetectionThreshold();
- if (dosDetectionThreshold != null && !dosDetectionThreshold.isEmpty()){
- thresholdRangeMap = TreeRangeMap.create();
- for (DosDetectionThreshold threshold:dosDetectionThreshold){
+ if (dosDetectionThreshold != null && !dosDetectionThreshold.isEmpty()) {
+ for (DosDetectionThreshold threshold : dosDetectionThreshold) {
+ String attackType = threshold.getAttackType();
+ switch (attackType) {
+ case "tcp_syn_flood":
+ threshold.setAttackType("TCP SYN Flood");
+ break;
+ case "udp_flood":
+ threshold.setAttackType("UDP Flood");
+ break;
+ case "icmp_flood":
+ threshold.setAttackType("ICMP Flood");
+ break;
+ case "dns_amplification":
+ threshold.setAttackType("DNS Amplification");
+ break;
+ default:
+ }
ArrayList<String> serverIpList = threshold.getServerIpList();
- for (String sip:serverIpList){
+ for (String sip : serverIpList) {
IPAddressString ipAddressString = new IPAddressString(sip);
- if (ipAddressString.isIPAddress()){
+ if (ipAddressString.isIPAddress()) {
IPAddress address = ipAddressString.getAddress();
Map<String, DosDetectionThreshold> floodTypeThresholdMap = thresholdRangeMap.get(address);
- if (floodTypeThresholdMap == null){
+ if (floodTypeThresholdMap == null) {
floodTypeThresholdMap = new HashMap<>();
}
- floodTypeThresholdMap.put(threshold.getAttackType(),threshold);
- thresholdRangeMap.put(Range.closed(address.getLower(),address.getUpper()),floodTypeThresholdMap);
+ floodTypeThresholdMap.put(threshold.getAttackType(), threshold);
+ thresholdRangeMap.put(Range.closed(address.getLower(), address.getUpper()), floodTypeThresholdMap);
}
}
}
}
- }catch (Exception e){
- logger.error("构建threshold RangeMap失败",e);
+ } catch (Exception e) {
+ logger.error("构建threshold RangeMap失败", e);
}
return thresholdRangeMap;
}
@@ -156,11 +182,11 @@ public class ParseStaticThreshold {
public static void main(String[] args) {
TreeRangeMap<IPAddress, Map<String, DosDetectionThreshold>> staticThreshold = createStaticThreshold();
Map<Range<IPAddress>, Map<String, DosDetectionThreshold>> rangeMapMap = staticThreshold.asMapOfRanges();
- for (Range<IPAddress> range:rangeMapMap.keySet()){
+ for (Range<IPAddress> range : rangeMapMap.keySet()) {
Map<String, DosDetectionThreshold> thresholdMap = rangeMapMap.get(range);
- for (String type:thresholdMap.keySet()){
+ for (String type : thresholdMap.keySet()) {
DosDetectionThreshold threshold = thresholdMap.get(type);
- System.out.println(range+"---"+type+"---"+threshold);
+ System.out.println(range + "---" + type + "---" + threshold);
}
}
}
diff --git a/src/main/java/com/zdjizhi/utils/HbaseUtils.java b/src/main/java/com/zdjizhi/utils/HbaseUtils.java
index bf9ced5..c147e61 100644
--- a/src/main/java/com/zdjizhi/utils/HbaseUtils.java
+++ b/src/main/java/com/zdjizhi/utils/HbaseUtils.java
@@ -25,7 +25,6 @@ public class HbaseUtils {
private static final Logger logger = LoggerFactory.getLogger(HbaseUtils.class);
private static Table table = null;
private static Scan scan = null;
- public static Map<String, Map<String, Tuple2<ArrayList<Integer>, Integer>>> baselineMap = new HashMap<>();
private static ArrayList<String> floodTypeList = new ArrayList<>();
static {
@@ -33,7 +32,6 @@ public class HbaseUtils {
floodTypeList.add("UDP Flood");
floodTypeList.add("ICMP Flood");
floodTypeList.add("DNS Amplification");
- readFromHbase();
}
private static void prepareHbaseEnv() throws IOException {
@@ -54,6 +52,7 @@ public class HbaseUtils {
}
public static void main(String[] args) {
+ Map<String, Map<String, Tuple2<ArrayList<Integer>, Integer>>> baselineMap = readFromHbase();
Set<String> keySet = baselineMap.keySet();
for (String key : keySet) {
Map<String, Tuple2<ArrayList<Integer>, Integer>> stringTuple2Map = baselineMap.get(key);
@@ -66,7 +65,8 @@ public class HbaseUtils {
System.out.println(baselineMap.size());
}
- private static void readFromHbase() {
+ public static Map<String, Map<String, Tuple2<ArrayList<Integer>, Integer>>> readFromHbase() {
+ Map<String, Map<String, Tuple2<ArrayList<Integer>, Integer>>> baselineMap = new HashMap<>();
try {
prepareHbaseEnv();
logger.info("开始读取baseline数据");
@@ -87,6 +87,7 @@ public class HbaseUtils {
} catch (Exception e) {
logger.error("读取hbase数据失败", e);
}
+ return baselineMap;
}
private static Integer getDefaultValue(Result result, String family, String qualifier) {
diff --git a/src/main/java/com/zdjizhi/utils/HttpClientUtils.java b/src/main/java/com/zdjizhi/utils/HttpClientUtils.java
index d358300..6a9af77 100644
--- a/src/main/java/com/zdjizhi/utils/HttpClientUtils.java
+++ b/src/main/java/com/zdjizhi/utils/HttpClientUtils.java
@@ -32,6 +32,7 @@ import java.util.Map;
/**
* http client工具类
+ * @author wlh
*/
public class HttpClientUtils {
/** 全局连接池对象 */
diff --git a/src/main/resources/common.properties b/src/main/resources/common.properties
index cde96d8..82cea3e 100644
--- a/src/main/resources/common.properties
+++ b/src/main/resources/common.properties
@@ -15,7 +15,7 @@ kafka.input.topic.name=DOS-SKETCH-LOG
kafka.input.bootstrap.servers=192.168.44.11:9092,192.168.44.14:9092,192.168.44.15:9092
#读取kafka group id
-kafka.input.group.id=2108161121
+kafka.input.group.id=2108231709
#kafka.input.group.id=dos-detection-job-210813-1
#发送kafka metrics并行度大小
@@ -112,4 +112,10 @@ http.pool.request.timeout=60000
http.pool.connect.timeout=60000
#服务端响应超时时间设置(单位:毫秒)
-http.pool.response.timeout=60000 \ No newline at end of file
+http.pool.response.timeout=60000
+
+#获取静态阈值周期,默认十分钟
+static.threshold.schedule.minutes=10
+
+#获取baseline周期,默认7天
+baseline.threshold.schedule.days=7 \ No newline at end of file