summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--pom.xml2
-rw-r--r--src/main/java/com/zdjizhi/bolt/DosDetectionBolt.java200
-rw-r--r--src/main/java/com/zdjizhi/bolt/ParseSketchLogBolt.java2
-rw-r--r--src/main/java/com/zdjizhi/bolt/ParseStaticThreshold.java61
-rw-r--r--src/main/java/com/zdjizhi/common/DosBaselineThreshold.java43
-rw-r--r--src/main/java/com/zdjizhi/utils/HbaseUtils.java30
-rw-r--r--src/main/resources/common.properties7
7 files changed, 207 insertions, 138 deletions
diff --git a/pom.xml b/pom.xml
index 1439385..abf4677 100644
--- a/pom.xml
+++ b/pom.xml
@@ -103,7 +103,7 @@
<groupId>org.apache.storm</groupId>
<artifactId>storm-core</artifactId>
<version>${storm.version}</version>
- <scope>provided</scope>
+ <!--<scope>provided</scope>-->
<exclusions>
<exclusion>
<artifactId>slf4j-log4j12</artifactId>
diff --git a/src/main/java/com/zdjizhi/bolt/DosDetectionBolt.java b/src/main/java/com/zdjizhi/bolt/DosDetectionBolt.java
index 3e9bb39..c1d1d4c 100644
--- a/src/main/java/com/zdjizhi/bolt/DosDetectionBolt.java
+++ b/src/main/java/com/zdjizhi/bolt/DosDetectionBolt.java
@@ -3,15 +3,12 @@ package com.zdjizhi.bolt;
import cn.hutool.log.Log;
import cn.hutool.log.LogFactory;
import com.google.common.collect.TreeRangeMap;
-import com.zdjizhi.common.CommonConfig;
-import com.zdjizhi.common.DosDetectionThreshold;
-import com.zdjizhi.common.DosEventLog;
-import com.zdjizhi.common.DosSketchLog;
+import com.zdjizhi.common.*;
import com.zdjizhi.utils.*;
import inet.ipaddr.IPAddress;
import inet.ipaddr.IPAddressString;
-import io.vavr.Tuple2;
import org.apache.commons.lang.StringUtils;
+import org.apache.commons.lang.text.StrBuilder;
import org.apache.commons.lang3.concurrent.BasicThreadFactory;
import org.apache.storm.Config;
import org.apache.storm.task.TopologyContext;
@@ -33,13 +30,18 @@ import java.util.concurrent.TimeUnit;
public class DosDetectionBolt extends BaseBasicBolt {
private static final Log logger = LogFactory.get();
- private final static int BASELINE_SIZE = 144;
+
private KafkaUtils kafkaLogSend;
private final static NumberFormat PERCENT_INSTANCE = NumberFormat.getPercentInstance();
- private static Map<String, Map<String, Tuple2<ArrayList<Integer>, Integer>>> baselineMap = new HashMap<>();
- private TreeRangeMap<IPAddress, Map<String, DosDetectionThreshold>> thresholdRangeMap;
+ private static Map<String, Map<String, DosBaselineThreshold>> baselineMap = new HashMap<>();
+ private HashMap<String, TreeRangeMap<IPAddress, DosDetectionThreshold>> thresholdRangeMap;
private List<String> list = new LinkedList<>();
+ private final static int BASELINE_SIZE = 144;
+ private final static int STATIC_CONDITION_TYPE = 1;
+ private final static int BASELINE_CONDITION_TYPE = 2;
+ private final static int SENSITIVITY_CONDITION_TYPE = 3;
+
@Override
public void prepare(Map stormConf, TopologyContext context) {
ScheduledExecutorService executorService = new ScheduledThreadPoolExecutor(2,
@@ -79,7 +81,7 @@ public class DosDetectionBolt extends BaseBasicBolt {
String message = JsonMapper.toJsonString(dosEventLog);
if (dosEventLog != null && StringUtil.isNotBlank(message)) {
list.add(message);
- logger.info("dos detection log:{} \n{}",message,dosEventLog);
+ logger.info("dos detection log:{} \n{}", message, dosEventLog);
}
if (list.size() >= CommonConfig.KAFKA_OUTPUT_EVENT_BATCH_NUM) {
kafkaLogSend.sendMessage(list, CommonConfig.KAFKA_OUTPUT_EVENT_TOPIC_NAME);
@@ -96,15 +98,15 @@ public class DosDetectionBolt extends BaseBasicBolt {
String destinationIp = value.getDestination_ip();
String attackType = value.getAttack_type();
IPAddress destinationIpAddress = new IPAddressString(destinationIp).getAddress();
- Map<String, DosDetectionThreshold> thresholdMap = thresholdRangeMap.get(destinationIpAddress);
+ DosDetectionThreshold threshold = thresholdRangeMap.getOrDefault(attackType, TreeRangeMap.create()).get(destinationIpAddress);
logger.debug("当前判断IP:{}, 类型: {}", destinationIp, attackType);
- if (thresholdMap == null && baselineMap.containsKey(destinationIp)) {
+ if (threshold == null && baselineMap.containsKey(destinationIp)) {
finalResult = getDosEventLogByBaseline(value, destinationIp, attackType);
- }else if (thresholdMap == null && !baselineMap.containsKey(destinationIp)){
+ } else if (threshold == null && !baselineMap.containsKey(destinationIp)) {
finalResult = getDosEventLogBySensitivityThreshold(value);
- } else if (thresholdMap != null){
- finalResult = getDosEventLogByStaticThreshold(value, thresholdMap);
- }else {
+ } else if (threshold != null) {
+ finalResult = getDosEventLogByStaticThreshold(value, threshold);
+ } else {
logger.debug("未获取到当前server IP:{} 类型 {} 静态阈值 和 baseline", destinationIp, attackType);
}
} catch (Exception e) {
@@ -113,11 +115,12 @@ public class DosDetectionBolt extends BaseBasicBolt {
return finalResult;
}
- private DosEventLog getDosEventLogBySensitivityThreshold(DosSketchLog value){
+ private DosEventLog getDosEventLogBySensitivityThreshold(DosSketchLog value) {
DosEventLog result = null;
long sketchSessions = value.getSketch_sessions();
- if (sketchSessions > CommonConfig.STATIC_SENSITIVITY_THRESHOLD){
- result = getDosEventLog(value, CommonConfig.STATIC_SENSITIVITY_THRESHOLD, sketchSessions - CommonConfig.STATIC_SENSITIVITY_THRESHOLD, "sensitivity");
+ if (sketchSessions > CommonConfig.STATIC_SENSITIVITY_THRESHOLD) {
+ result = getDosEventLog(value, CommonConfig.STATIC_SENSITIVITY_THRESHOLD,
+ sketchSessions - CommonConfig.STATIC_SENSITIVITY_THRESHOLD, 3, "sessions");
result.setSeverity(Severity.MAJOR.severity);
}
return result;
@@ -126,55 +129,60 @@ public class DosDetectionBolt extends BaseBasicBolt {
private DosEventLog getDosEventLogByBaseline(DosSketchLog value, String destinationIp, String attackType) {
DosEventLog result = null;
long sketchSessions = value.getSketch_sessions();
- if (sketchSessions > CommonConfig.STATIC_SENSITIVITY_THRESHOLD){
- Tuple2<ArrayList<Integer>, Integer> floodTypeTup = baselineMap.get(destinationIp).get(attackType);
- Integer base = getBaseValue(floodTypeTup, value);
- result = getDosEventLog(value, base, sketchSessions - base, "baseline");
+ if (sketchSessions > CommonConfig.STATIC_SENSITIVITY_THRESHOLD) {
+ DosBaselineThreshold dosBaselineThreshold = baselineMap.get(destinationIp).get(attackType);
+ Integer base = getBaseValue(dosBaselineThreshold, value);
+ result = getDosEventLog(value, base, sketchSessions - base, 2, "sessions");
}
return result;
}
- private DosEventLog getDosEventLogByStaticThreshold(DosSketchLog value, Map<String, DosDetectionThreshold> thresholdMap) {
- DosEventLog result = null;
- String attackType = value.getAttack_type();
- if (thresholdMap.containsKey(attackType)) {
- DosDetectionThreshold threshold = thresholdMap.get(attackType);
- long base = threshold.getSessionsPerSec();
- long diff = value.getSketch_sessions() - base;
- result = getDosEventLog(value, base, diff, "static");
+ private DosEventLog getDosEventLogByStaticThreshold(DosSketchLog value, DosDetectionThreshold threshold) {
+ long base = threshold.getSessionsPerSec();
+ long diff = value.getSketch_sessions() - base;
+ DosEventLog result = getDosEventLog(value, base, diff, 1, "sessions");
+ if (result == null) {
+ base = threshold.getPacketsPerSec();
+ diff = value.getSketch_packets() - base;
+ result = getDosEventLog(value, base, diff, 1, "packets");
+ if (result == null) {
+ base = threshold.getBitsPerSec();
+ diff = value.getSketch_bytes() - base;
+ result = getDosEventLog(value, base, diff, 1, "bits");
+ }
}
return result;
}
- private DosEventLog getDosEventLog(DosSketchLog value, long base, long diff, String tag) {
+ private DosEventLog getDosEventLog(DosSketchLog value, long base, long diff, int type, String tag) {
DosEventLog result = null;
String destinationIp = value.getDestination_ip();
String attackType = value.getAttack_type();
if (diff > 0 && base != 0) {
double percent = getDiffPercent(diff, base);
Severity severity = judgeSeverity(percent);
- if (severity != Severity.NORMAL && value.getSource_ip() != null) {
- if ("baseline".equals(tag) && percent < CommonConfig.BASELINE_SENSITIVITY_THRESHOLD){
- logger.debug("当前server IP:{},类型:{},基线值{}百分比{}未超过基线敏感阈值,日志详情\n{}",destinationIp,attackType,base,percent,value);
- }else {
- result = getResult(value,base, severity, percent, tag);
- logger.info("检测到当前server IP {} 存在 {} 异常,超出基线{} {}倍,日志详情\n {}", destinationIp,attackType,base,percent,result);
+ if (severity != Severity.NORMAL) {
+ if (type == BASELINE_CONDITION_TYPE && percent < CommonConfig.BASELINE_SENSITIVITY_THRESHOLD) {
+ logger.debug("当前server IP:{},类型:{},基线值{}百分比{}未超过基线敏感阈值,日志详情\n{}", destinationIp, attackType, base, percent, value);
+ } else {
+ result = getResult(value, base, severity, percent, type, tag);
+ logger.info("检测到当前server IP {} 存在 {} 异常,超出基线{} {}倍,基于{}:{}检测,日志详情\n {}", destinationIp, attackType, base, percent, type, tag, result);
}
} else {
- logger.debug("当前server IP:{} 未出现 {} 异常,日志详情 {}", destinationIp, attackType, value.toString());
+ logger.debug("当前server IP:{} 未出现 {} 异常,日志详情 {}", destinationIp, attackType, value);
}
}
return result;
}
- private DosEventLog getResult(DosSketchLog value, long base, Severity severity, double percent, String tag) {
+ private DosEventLog getResult(DosSketchLog value, long base, Severity severity, double percent, int type, String tag) {
DosEventLog dosEventLog = new DosEventLog();
dosEventLog.setLog_id(SnowflakeId.generateId());
dosEventLog.setStart_time(value.getSketch_start_time());
- dosEventLog.setEnd_time(value.getSketch_start_time() + CommonConfig.STORM_WINDOW_MAX_TIME);
+ dosEventLog.setEnd_time(value.getSketch_start_time() + value.getSketch_duration());
dosEventLog.setAttack_type(value.getAttack_type());
dosEventLog.setSeverity(severity.severity);
- dosEventLog.setConditions(getConditions(PERCENT_INSTANCE.format(percent),base, value.getSketch_sessions(), tag));
+ dosEventLog.setConditions(getConditions(PERCENT_INSTANCE.format(percent), base, value.getSketch_sessions(), type, tag));
dosEventLog.setDestination_ip(value.getDestination_ip());
dosEventLog.setDestination_country(IpUtils.ipLookup.countryLookup(value.getDestination_ip()));
String ipList = value.getSource_ip();
@@ -186,12 +194,12 @@ public class DosDetectionBolt extends BaseBasicBolt {
return dosEventLog;
}
- private Integer getBaseValue(Tuple2<ArrayList<Integer>, Integer> floodTypeTup, DosSketchLog value) {
+ private Integer getBaseValue(DosBaselineThreshold dosBaselineThreshold, DosSketchLog value) {
Integer base = 0;
try {
- if (floodTypeTup != null) {
- ArrayList<Integer> baselines = floodTypeTup._1;
- Integer defaultVaule = floodTypeTup._2;
+ if (dosBaselineThreshold != null) {
+ ArrayList<Integer> baselines = dosBaselineThreshold.getSession_rate();
+ Integer defaultVaule = dosBaselineThreshold.getSession_rate_default_value();
if (baselines != null && baselines.size() == BASELINE_SIZE) {
int timeIndex = getCurrentTimeIndex(value.getSketch_start_time());
base = baselines.get(timeIndex);
@@ -207,36 +215,64 @@ public class DosDetectionBolt extends BaseBasicBolt {
return base;
}
- private String getConditions(String percent, long base, long sessions, String tag) {
- switch (tag) {
- case "baseline":
- return "sessions > " + percent + " of baseline";
- case "static":
- return "sessions > " + base + " sessions/s";
- case "sensitivity":
- return sessions+" sessions/s Unusually high Sessions";
+ private String getConditions(String percent, long base, long sessions, int type, String tag) {
+ switch (type) {
+ case STATIC_CONDITION_TYPE:
+ return new StrBuilder()
+ .append(tag).append(" > ")
+ .append(base).append(" ")
+ .append(tag).append("/s")
+ .toString();
+ case BASELINE_CONDITION_TYPE:
+ return new StrBuilder()
+ .append(tag).append(" > ")
+ .append(percent).append(" of baseline")
+ .toString();
+ case SENSITIVITY_CONDITION_TYPE:
+ return new StrBuilder()
+ .append(sessions).append(" ")
+ .append(tag).append("/s Unusually high ")
+ .append(StringUtils.capitalize(tag))
+ .toString();
default:
- return null;
+ throw new IllegalArgumentException("Illegal Argument type:" + type + ", known types = [1,2,3]");
}
}
private String getSourceCountryList(String sourceIpList) {
- String[] ipArr = sourceIpList.split(",");
- HashSet<String> countrySet = new HashSet<>();
- for (String ip : ipArr) {
- countrySet.add(IpUtils.ipLookup.countryLookup(ip));
+ if (StringUtil.isNotBlank(sourceIpList)) {
+ String countryList;
+ try {
+ String[] ipArr = sourceIpList.split(",");
+ HashSet<String> countrySet = new HashSet<>();
+ for (String ip : ipArr) {
+ countrySet.add(IpUtils.ipLookup.countryLookup(ip));
+ }
+ countryList = StringUtils.join(countrySet, ",");
+ return countryList;
+ } catch (Exception e) {
+ logger.error("{} source IP lists 获取国家失败", sourceIpList, e);
+ return StringUtil.EMPTY;
+ }
+ } else {
+ throw new IllegalArgumentException("Illegal Argument sourceIpList = null");
}
- return StringUtils.join(countrySet, ",");
}
private int getCurrentTimeIndex(long sketchStartTime) {
- long currentDayTime = DateUtils.getTimeFloor(new Date(sketchStartTime * 1000L), "P1D").getTime()/1000;
- long indexLong = (sketchStartTime - currentDayTime) / 600;
- return Integer.parseInt(Long.toString(indexLong));
+ int index = 0;
+ try {
+ long currentDayTime = DateUtils.getTimeFloor(new Date(sketchStartTime * 1000L), "P1D").getTime() / 1000;
+ long indexLong = (sketchStartTime - currentDayTime) / (86400 / BASELINE_SIZE);
+ index = Integer.parseInt(Long.toString(indexLong));
+ } catch (Exception e) {
+ logger.error("获取time index失败", e);
+ }
+ return index;
}
private Double getDiffPercent(long diff, long base) {
- return BigDecimal.valueOf((float)diff/base).setScale(4, BigDecimal.ROUND_HALF_UP).doubleValue();
+ return BigDecimal.valueOf((float) diff / base).setScale(4, BigDecimal.ROUND_HALF_UP).doubleValue();
}
private Severity judgeSeverity(double diffPercent) {
@@ -259,49 +295,25 @@ public class DosDetectionBolt extends BaseBasicBolt {
/**
* 判断严重程度枚举类型
*/
- CRITICAL("Critical", 5),
- SEVERE("Severe", 4),
- MAJOR("Major", 3),
- WARNING("Warning", 2),
- MINOR("Minor", 1),
- NORMAL("Normal", 0);
+ CRITICAL("Critical"),
+ SEVERE("Severe"),
+ MAJOR("Major"),
+ WARNING("Warning"),
+ MINOR("Minor"),
+ NORMAL("Normal");
private final String severity;
- private final int score;
@Override
public String toString() {
return this.severity;
}
- Severity(String severity, int score) {
+ Severity(String severity) {
this.severity = severity;
- this.score = score;
}
- }
-
- @Deprecated
- private DosEventLog mergeFinalResult(Tuple2<Severity, DosEventLog> eventLogByBaseline, Tuple2<Severity, DosEventLog> eventLogByStaticThreshold) {
- if (eventLogByBaseline._1.score > eventLogByStaticThreshold._1.score) {
- logger.info("merge eventLogByBaseline {} \neventLogByStaticThreshold {}",eventLogByBaseline,eventLogByStaticThreshold);
- return mergeCondition(eventLogByBaseline._2, eventLogByStaticThreshold._2);
- } else {
- logger.info("merge eventLogByStaticThreshold {} \neventLogByBaseline {}",eventLogByStaticThreshold,eventLogByBaseline);
- return mergeCondition(eventLogByStaticThreshold._2, eventLogByBaseline._2);
- }
- }
- @Deprecated
- private DosEventLog mergeCondition(DosEventLog log1, DosEventLog log2) {
- if (log1 != null && log2 != null) {
- String conditions1 = log1.getConditions();
- String conditions2 = log2.getConditions();
- log1.setConditions(conditions1 + " and " + conditions2);
- }else if (log1 == null && log2 != null){
- log1 = log2;
- }
- return log1;
}
@Override
diff --git a/src/main/java/com/zdjizhi/bolt/ParseSketchLogBolt.java b/src/main/java/com/zdjizhi/bolt/ParseSketchLogBolt.java
index 64b30df..7a71b20 100644
--- a/src/main/java/com/zdjizhi/bolt/ParseSketchLogBolt.java
+++ b/src/main/java/com/zdjizhi/bolt/ParseSketchLogBolt.java
@@ -52,7 +52,6 @@ public class ParseSketchLogBolt extends BaseBasicBolt {
try {
HashMap<String, Object> sketchSource = jsonMapperInstance.fromJson(message, hashmapJsonType);
String commonSledIp = sketchSource.get("common_sled_ip").toString();
- String commonDataCenter = sketchSource.get("common_data_center").toString();
long sketchStartTime = Long.parseLong(sketchSource.get("sketch_start_time").toString());
long sketchDuration = Long.parseLong(sketchSource.get("sketch_duration").toString());
String attackType = sketchSource.get("attack_type").toString();
@@ -60,7 +59,6 @@ public class ParseSketchLogBolt extends BaseBasicBolt {
for (HashMap<String, Object> obj : reportIpList) {
DosSketchLog dosSketchLog = new DosSketchLog();
dosSketchLog.setCommon_sled_ip(commonSledIp);
- dosSketchLog.setCommon_data_center(commonDataCenter);
dosSketchLog.setSketch_start_time(sketchStartTime);
dosSketchLog.setSketch_duration(sketchDuration);
dosSketchLog.setAttack_type(attackType);
diff --git a/src/main/java/com/zdjizhi/bolt/ParseStaticThreshold.java b/src/main/java/com/zdjizhi/bolt/ParseStaticThreshold.java
index 5257834..fd9ffb4 100644
--- a/src/main/java/com/zdjizhi/bolt/ParseStaticThreshold.java
+++ b/src/main/java/com/zdjizhi/bolt/ParseStaticThreshold.java
@@ -136,34 +136,47 @@ public class ParseStaticThreshold {
* 基于静态阈值构建threshold RangeMap,k:IP段或具体IP,v:配置信息
* @return threshold RangeMap
*/
- static TreeRangeMap<IPAddress, Map<String, DosDetectionThreshold>> createStaticThreshold() {
- TreeRangeMap<IPAddress, Map<String, DosDetectionThreshold>> thresholdRangeMap = TreeRangeMap.create();
+ static HashMap<String, TreeRangeMap<IPAddress, DosDetectionThreshold>> createStaticThreshold() {
+ HashMap<String, TreeRangeMap<IPAddress, DosDetectionThreshold>> thresholdRangeMap = new HashMap<>(4);
try {
ArrayList<DosDetectionThreshold> dosDetectionThreshold = getDosDetectionThreshold();
if (dosDetectionThreshold != null && !dosDetectionThreshold.isEmpty()) {
for (DosDetectionThreshold threshold : dosDetectionThreshold) {
+ String attackType = threshold.getAttackType();
+ TreeRangeMap<IPAddress, DosDetectionThreshold> treeRangeMap = thresholdRangeMap.getOrDefault(attackType, TreeRangeMap.create());
ArrayList<String> serverIpList = threshold.getServerIpList();
for (String sip : serverIpList) {
IPAddressString ipAddressString = new IPAddressString(sip);
if (ipAddressString.isIPAddress()) {
IPAddress address = ipAddressString.getAddress();
- Map<String, DosDetectionThreshold> floodTypeThresholdMap = thresholdRangeMap.get(address);
- if (floodTypeThresholdMap == null) {
- floodTypeThresholdMap = new HashMap<>();
- }
- floodTypeThresholdMap.put(threshold.getAttackType(), threshold);
- if (address.isPrefixed()){
- if (address.isMultiple()){
- thresholdRangeMap.put(Range.closed(address.getLower(), address.getUpper()), floodTypeThresholdMap);
- }else {
- thresholdRangeMap.put(Range.closed(address.adjustPrefixLength(address.getBitCount()),
- address.toMaxHost().withoutPrefixLength()), floodTypeThresholdMap);
+ if (address.isPrefixed()) {
+ IPAddress lower = address.getLower();
+ IPAddress upper = address.getUpper();
+ if (!address.isMultiple()) {
+ lower = address.adjustPrefixLength(address.getBitCount());
+ upper = address.toMaxHost().withoutPrefixLength();
+ }
+ Map.Entry<Range<IPAddress>, DosDetectionThreshold> lowerEntry = treeRangeMap.getEntry(lower);
+ Map.Entry<Range<IPAddress>, DosDetectionThreshold> upperEntry = treeRangeMap.getEntry(upper);
+ if (lowerEntry != null && upperEntry == null) {
+ Range<IPAddress> lowerEntryKey = lowerEntry.getKey();
+ DosDetectionThreshold lowerEntryValue = lowerEntry.getValue();
+ treeRangeMap.put(Range.closedOpen(lowerEntryKey.lowerEndpoint(), lower), lowerEntryValue);
+ treeRangeMap.put(Range.closed(lower, upper), threshold);
+ } else if (lowerEntry == null && upperEntry != null) {
+ Range<IPAddress> upperEntryKey = upperEntry.getKey();
+ DosDetectionThreshold upperEntryValue = upperEntry.getValue();
+ treeRangeMap.put(Range.openClosed(upper, upperEntryKey.upperEndpoint()), upperEntryValue);
+ treeRangeMap.put(Range.closed(lower, upper), threshold);
+ } else {
+ treeRangeMap.put(Range.closed(lower, upper), threshold);
}
- }else {
- thresholdRangeMap.put(Range.closed(address, address), floodTypeThresholdMap);
+ } else {
+ treeRangeMap.put(Range.closed(address, address), threshold);
}
}
}
+ thresholdRangeMap.put(attackType,treeRangeMap);
}
}
} catch (Exception e) {
@@ -179,18 +192,16 @@ public class ParseStaticThreshold {
System.out.println("------------------------");
- TreeRangeMap<IPAddress, Map<String, DosDetectionThreshold>> staticThreshold = createStaticThreshold();
-
- /*
- Map<Range<IPAddress>, Map<String, DosDetectionThreshold>> rangeMapMap = staticThreshold.asMapOfRanges();
- for (Range<IPAddress> range : rangeMapMap.keySet()) {
- Map<String, DosDetectionThreshold> thresholdMap = rangeMapMap.get(range);
- for (String type : thresholdMap.keySet()) {
- DosDetectionThreshold threshold = thresholdMap.get(type);
- System.out.println(range + "---" + type + "---" + threshold);
+ HashMap<String, TreeRangeMap<IPAddress, DosDetectionThreshold>> staticThreshold = createStaticThreshold();
+
+ for (String type : staticThreshold.keySet()) {
+ Map<Range<IPAddress>, DosDetectionThreshold> asMapOfRanges = staticThreshold.get(type).asMapOfRanges();
+ for (Range<IPAddress> range : asMapOfRanges.keySet()) {
+ DosDetectionThreshold threshold = asMapOfRanges.get(range);
+ System.out.println(type + "---" + range + "---" + threshold);
}
+ System.out.println("------------------------");
}
- */
}
diff --git a/src/main/java/com/zdjizhi/common/DosBaselineThreshold.java b/src/main/java/com/zdjizhi/common/DosBaselineThreshold.java
new file mode 100644
index 0000000..e8bc228
--- /dev/null
+++ b/src/main/java/com/zdjizhi/common/DosBaselineThreshold.java
@@ -0,0 +1,43 @@
+package com.zdjizhi.common;
+
+import java.io.Serializable;
+import java.util.ArrayList;
+
+public class DosBaselineThreshold implements Serializable {
+ private ArrayList<Integer> session_rate;
+ private Integer session_rate_baseline_type;
+ private Integer session_rate_default_value;
+
+ public ArrayList<Integer> getSession_rate() {
+ return session_rate;
+ }
+
+ public void setSession_rate(ArrayList<Integer> session_rate) {
+ this.session_rate = session_rate;
+ }
+
+ public Integer getSession_rate_baseline_type() {
+ return session_rate_baseline_type;
+ }
+
+ public void setSession_rate_baseline_type(Integer session_rate_baseline_type) {
+ this.session_rate_baseline_type = session_rate_baseline_type;
+ }
+
+ public Integer getSession_rate_default_value() {
+ return session_rate_default_value;
+ }
+
+ public void setSession_rate_default_value(Integer session_rate_default_value) {
+ this.session_rate_default_value = session_rate_default_value;
+ }
+
+ @Override
+ public String toString() {
+ return "DosBaselineThreshold{" +
+ "session_rate=" + session_rate +
+ ", session_rate_baseline_type=" + session_rate_baseline_type +
+ ", session_rate_default_value=" + session_rate_default_value +
+ '}';
+ }
+}
diff --git a/src/main/java/com/zdjizhi/utils/HbaseUtils.java b/src/main/java/com/zdjizhi/utils/HbaseUtils.java
index cfe91a7..387c90c 100644
--- a/src/main/java/com/zdjizhi/utils/HbaseUtils.java
+++ b/src/main/java/com/zdjizhi/utils/HbaseUtils.java
@@ -1,8 +1,7 @@
package com.zdjizhi.utils;
import com.zdjizhi.common.CommonConfig;
-import io.vavr.Tuple;
-import io.vavr.Tuple2;
+import com.zdjizhi.common.DosBaselineThreshold;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.TableName;
@@ -56,33 +55,38 @@ public class HbaseUtils {
}
public static void main(String[] args) {
- Map<String, Map<String, Tuple2<ArrayList<Integer>, Integer>>> baselineMap = readFromHbase();
+ Map<String, Map<String, DosBaselineThreshold>> baselineMap = readFromHbase();
Set<String> keySet = baselineMap.keySet();
for (String key : keySet) {
- Map<String, Tuple2<ArrayList<Integer>, Integer>> stringTuple2Map = baselineMap.get(key);
+ Map<String, DosBaselineThreshold> stringTuple2Map = baselineMap.get(key);
Set<String> strings = stringTuple2Map.keySet();
for (String s:strings){
- Tuple2<ArrayList<Integer>, Integer> arrayListIntegerTuple2 = stringTuple2Map.get(s);
- System.out.println(key+"---"+s+"---"+arrayListIntegerTuple2._1+"---"+arrayListIntegerTuple2._2);
+ DosBaselineThreshold dosBaselineThreshold = stringTuple2Map.get(s);
+ System.out.println(key+"---"+s+"---"+dosBaselineThreshold);
}
}
System.out.println(baselineMap.size());
}
- public static Map<String, Map<String, Tuple2<ArrayList<Integer>, Integer>>> readFromHbase() {
- Map<String, Map<String, Tuple2<ArrayList<Integer>, Integer>>> baselineMap = new HashMap<>();
+ public static Map<String, Map<String, DosBaselineThreshold>> readFromHbase() {
+ Map<String, Map<String, DosBaselineThreshold>> baselineMap = new HashMap<>();
try {
prepareHbaseEnv();
logger.info("开始读取baseline数据");
ResultScanner rs = table.getScanner(scan);
for (Result result : rs) {
- Map<String, Tuple2<ArrayList<Integer>, Integer>> floodTypeMap = new HashMap<>();
+ Map<String, DosBaselineThreshold> floodTypeMap = new HashMap<>();
String rowkey = Bytes.toString(result.getRow());
for (String type:floodTypeList){
- ArrayList<Integer> sessionRate = getArraylist(result, type, "session_rate");
+ DosBaselineThreshold baselineThreshold = new DosBaselineThreshold();
+ ArrayList<Integer> sessionRate = HbaseUtils.getArraylist(result, type, "session_rate");
if (sessionRate != null && !sessionRate.isEmpty()){
- Integer defaultValue = getDefaultValue(result, type, "session_rate_default_value");
- floodTypeMap.put(type, Tuple.of(sessionRate, defaultValue));
+ Integer defaultValue = getIntegerValue(result, type, "session_rate_default_value");
+ Integer rateBaselineType = getIntegerValue(result, type, "session_rate_baseline_type");
+ baselineThreshold.setSession_rate(sessionRate);
+ baselineThreshold.setSession_rate_default_value(defaultValue);
+ baselineThreshold.setSession_rate_baseline_type(rateBaselineType);
+ floodTypeMap.put(type,baselineThreshold);
}
}
baselineMap.put(rowkey, floodTypeMap);
@@ -94,7 +98,7 @@ public class HbaseUtils {
return baselineMap;
}
- private static Integer getDefaultValue(Result result, String family, String qualifier) {
+ private static Integer getIntegerValue(Result result, String family, String qualifier) {
byte[] value = result.getValue(Bytes.toBytes(family), Bytes.toBytes(qualifier));
if (value != null){
return Bytes.toInt(value);
diff --git a/src/main/resources/common.properties b/src/main/resources/common.properties
index ead59c5..b3f603f 100644
--- a/src/main/resources/common.properties
+++ b/src/main/resources/common.properties
@@ -13,7 +13,7 @@ kafka.input.bootstrap.servers=192.168.44.11:9094
kafka.input.scan.startup.mode=latest
#读取kafka group id
-kafka.input.group.id=2108301748
+kafka.input.group.id=2110221053
#发送kafka metrics并行度大小
kafka.output.metric.parallelism=1
@@ -30,7 +30,7 @@ kafka.output.event.parallelism=1
kafka.output.event.topic.name=storm-dos-test
#kafka输出地址
-kafka.output.bootstrap.servers=192.168.44.12:9092
+kafka.output.bootstrap.servers=192.168.44.12:9094
#发送kafka批大小
kafka.output.metric.batch.num=1000
@@ -129,7 +129,8 @@ topology.num.acks=0
producer.ack=1
#bifang服务访问地址
-bifang.server.uri=http://192.168.44.3:80
+#bifang.server.uri=http://192.168.44.3:80
+bifang.server.uri=http://192.168.44.72:80
#访问bifang只读权限token,bifang内置,无需修改
bifang.server.token=ed04b942-7df4-4e3d-b9a9-a881ca98a867