diff options
| -rw-r--r-- | pom.xml | 2 | ||||
| -rw-r--r-- | src/main/java/com/zdjizhi/bolt/DosDetectionBolt.java | 200 | ||||
| -rw-r--r-- | src/main/java/com/zdjizhi/bolt/ParseSketchLogBolt.java | 2 | ||||
| -rw-r--r-- | src/main/java/com/zdjizhi/bolt/ParseStaticThreshold.java | 61 | ||||
| -rw-r--r-- | src/main/java/com/zdjizhi/common/DosBaselineThreshold.java | 43 | ||||
| -rw-r--r-- | src/main/java/com/zdjizhi/utils/HbaseUtils.java | 30 | ||||
| -rw-r--r-- | src/main/resources/common.properties | 7 |
7 files changed, 207 insertions, 138 deletions
@@ -103,7 +103,7 @@ <groupId>org.apache.storm</groupId> <artifactId>storm-core</artifactId> <version>${storm.version}</version> - <scope>provided</scope> + <!--<scope>provided</scope>--> <exclusions> <exclusion> <artifactId>slf4j-log4j12</artifactId> diff --git a/src/main/java/com/zdjizhi/bolt/DosDetectionBolt.java b/src/main/java/com/zdjizhi/bolt/DosDetectionBolt.java index 3e9bb39..c1d1d4c 100644 --- a/src/main/java/com/zdjizhi/bolt/DosDetectionBolt.java +++ b/src/main/java/com/zdjizhi/bolt/DosDetectionBolt.java @@ -3,15 +3,12 @@ package com.zdjizhi.bolt; import cn.hutool.log.Log; import cn.hutool.log.LogFactory; import com.google.common.collect.TreeRangeMap; -import com.zdjizhi.common.CommonConfig; -import com.zdjizhi.common.DosDetectionThreshold; -import com.zdjizhi.common.DosEventLog; -import com.zdjizhi.common.DosSketchLog; +import com.zdjizhi.common.*; import com.zdjizhi.utils.*; import inet.ipaddr.IPAddress; import inet.ipaddr.IPAddressString; -import io.vavr.Tuple2; import org.apache.commons.lang.StringUtils; +import org.apache.commons.lang.text.StrBuilder; import org.apache.commons.lang3.concurrent.BasicThreadFactory; import org.apache.storm.Config; import org.apache.storm.task.TopologyContext; @@ -33,13 +30,18 @@ import java.util.concurrent.TimeUnit; public class DosDetectionBolt extends BaseBasicBolt { private static final Log logger = LogFactory.get(); - private final static int BASELINE_SIZE = 144; + private KafkaUtils kafkaLogSend; private final static NumberFormat PERCENT_INSTANCE = NumberFormat.getPercentInstance(); - private static Map<String, Map<String, Tuple2<ArrayList<Integer>, Integer>>> baselineMap = new HashMap<>(); - private TreeRangeMap<IPAddress, Map<String, DosDetectionThreshold>> thresholdRangeMap; + private static Map<String, Map<String, DosBaselineThreshold>> baselineMap = new HashMap<>(); + private HashMap<String, TreeRangeMap<IPAddress, DosDetectionThreshold>> thresholdRangeMap; private List<String> list = new LinkedList<>(); + private final static int BASELINE_SIZE = 144; + private final static int STATIC_CONDITION_TYPE = 1; + private final static int BASELINE_CONDITION_TYPE = 2; + private final static int SENSITIVITY_CONDITION_TYPE = 3; + @Override public void prepare(Map stormConf, TopologyContext context) { ScheduledExecutorService executorService = new ScheduledThreadPoolExecutor(2, @@ -79,7 +81,7 @@ public class DosDetectionBolt extends BaseBasicBolt { String message = JsonMapper.toJsonString(dosEventLog); if (dosEventLog != null && StringUtil.isNotBlank(message)) { list.add(message); - logger.info("dos detection log:{} \n{}",message,dosEventLog); + logger.info("dos detection log:{} \n{}", message, dosEventLog); } if (list.size() >= CommonConfig.KAFKA_OUTPUT_EVENT_BATCH_NUM) { kafkaLogSend.sendMessage(list, CommonConfig.KAFKA_OUTPUT_EVENT_TOPIC_NAME); @@ -96,15 +98,15 @@ public class DosDetectionBolt extends BaseBasicBolt { String destinationIp = value.getDestination_ip(); String attackType = value.getAttack_type(); IPAddress destinationIpAddress = new IPAddressString(destinationIp).getAddress(); - Map<String, DosDetectionThreshold> thresholdMap = thresholdRangeMap.get(destinationIpAddress); + DosDetectionThreshold threshold = thresholdRangeMap.getOrDefault(attackType, TreeRangeMap.create()).get(destinationIpAddress); logger.debug("当前判断IP:{}, 类型: {}", destinationIp, attackType); - if (thresholdMap == null && baselineMap.containsKey(destinationIp)) { + if (threshold == null && baselineMap.containsKey(destinationIp)) { finalResult = getDosEventLogByBaseline(value, destinationIp, attackType); - }else if (thresholdMap == null && !baselineMap.containsKey(destinationIp)){ + } else if (threshold == null && !baselineMap.containsKey(destinationIp)) { finalResult = getDosEventLogBySensitivityThreshold(value); - } else if (thresholdMap != null){ - finalResult = getDosEventLogByStaticThreshold(value, thresholdMap); - }else { + } else if (threshold != null) { + finalResult = getDosEventLogByStaticThreshold(value, threshold); + } else { logger.debug("未获取到当前server IP:{} 类型 {} 静态阈值 和 baseline", destinationIp, attackType); } } catch (Exception e) { @@ -113,11 +115,12 @@ public class DosDetectionBolt extends BaseBasicBolt { return finalResult; } - private DosEventLog getDosEventLogBySensitivityThreshold(DosSketchLog value){ + private DosEventLog getDosEventLogBySensitivityThreshold(DosSketchLog value) { DosEventLog result = null; long sketchSessions = value.getSketch_sessions(); - if (sketchSessions > CommonConfig.STATIC_SENSITIVITY_THRESHOLD){ - result = getDosEventLog(value, CommonConfig.STATIC_SENSITIVITY_THRESHOLD, sketchSessions - CommonConfig.STATIC_SENSITIVITY_THRESHOLD, "sensitivity"); + if (sketchSessions > CommonConfig.STATIC_SENSITIVITY_THRESHOLD) { + result = getDosEventLog(value, CommonConfig.STATIC_SENSITIVITY_THRESHOLD, + sketchSessions - CommonConfig.STATIC_SENSITIVITY_THRESHOLD, 3, "sessions"); result.setSeverity(Severity.MAJOR.severity); } return result; @@ -126,55 +129,60 @@ public class DosDetectionBolt extends BaseBasicBolt { private DosEventLog getDosEventLogByBaseline(DosSketchLog value, String destinationIp, String attackType) { DosEventLog result = null; long sketchSessions = value.getSketch_sessions(); - if (sketchSessions > CommonConfig.STATIC_SENSITIVITY_THRESHOLD){ - Tuple2<ArrayList<Integer>, Integer> floodTypeTup = baselineMap.get(destinationIp).get(attackType); - Integer base = getBaseValue(floodTypeTup, value); - result = getDosEventLog(value, base, sketchSessions - base, "baseline"); + if (sketchSessions > CommonConfig.STATIC_SENSITIVITY_THRESHOLD) { + DosBaselineThreshold dosBaselineThreshold = baselineMap.get(destinationIp).get(attackType); + Integer base = getBaseValue(dosBaselineThreshold, value); + result = getDosEventLog(value, base, sketchSessions - base, 2, "sessions"); } return result; } - private DosEventLog getDosEventLogByStaticThreshold(DosSketchLog value, Map<String, DosDetectionThreshold> thresholdMap) { - DosEventLog result = null; - String attackType = value.getAttack_type(); - if (thresholdMap.containsKey(attackType)) { - DosDetectionThreshold threshold = thresholdMap.get(attackType); - long base = threshold.getSessionsPerSec(); - long diff = value.getSketch_sessions() - base; - result = getDosEventLog(value, base, diff, "static"); + private DosEventLog getDosEventLogByStaticThreshold(DosSketchLog value, DosDetectionThreshold threshold) { + long base = threshold.getSessionsPerSec(); + long diff = value.getSketch_sessions() - base; + DosEventLog result = getDosEventLog(value, base, diff, 1, "sessions"); + if (result == null) { + base = threshold.getPacketsPerSec(); + diff = value.getSketch_packets() - base; + result = getDosEventLog(value, base, diff, 1, "packets"); + if (result == null) { + base = threshold.getBitsPerSec(); + diff = value.getSketch_bytes() - base; + result = getDosEventLog(value, base, diff, 1, "bits"); + } } return result; } - private DosEventLog getDosEventLog(DosSketchLog value, long base, long diff, String tag) { + private DosEventLog getDosEventLog(DosSketchLog value, long base, long diff, int type, String tag) { DosEventLog result = null; String destinationIp = value.getDestination_ip(); String attackType = value.getAttack_type(); if (diff > 0 && base != 0) { double percent = getDiffPercent(diff, base); Severity severity = judgeSeverity(percent); - if (severity != Severity.NORMAL && value.getSource_ip() != null) { - if ("baseline".equals(tag) && percent < CommonConfig.BASELINE_SENSITIVITY_THRESHOLD){ - logger.debug("当前server IP:{},类型:{},基线值{}百分比{}未超过基线敏感阈值,日志详情\n{}",destinationIp,attackType,base,percent,value); - }else { - result = getResult(value,base, severity, percent, tag); - logger.info("检测到当前server IP {} 存在 {} 异常,超出基线{} {}倍,日志详情\n {}", destinationIp,attackType,base,percent,result); + if (severity != Severity.NORMAL) { + if (type == BASELINE_CONDITION_TYPE && percent < CommonConfig.BASELINE_SENSITIVITY_THRESHOLD) { + logger.debug("当前server IP:{},类型:{},基线值{}百分比{}未超过基线敏感阈值,日志详情\n{}", destinationIp, attackType, base, percent, value); + } else { + result = getResult(value, base, severity, percent, type, tag); + logger.info("检测到当前server IP {} 存在 {} 异常,超出基线{} {}倍,基于{}:{}检测,日志详情\n {}", destinationIp, attackType, base, percent, type, tag, result); } } else { - logger.debug("当前server IP:{} 未出现 {} 异常,日志详情 {}", destinationIp, attackType, value.toString()); + logger.debug("当前server IP:{} 未出现 {} 异常,日志详情 {}", destinationIp, attackType, value); } } return result; } - private DosEventLog getResult(DosSketchLog value, long base, Severity severity, double percent, String tag) { + private DosEventLog getResult(DosSketchLog value, long base, Severity severity, double percent, int type, String tag) { DosEventLog dosEventLog = new DosEventLog(); dosEventLog.setLog_id(SnowflakeId.generateId()); dosEventLog.setStart_time(value.getSketch_start_time()); - dosEventLog.setEnd_time(value.getSketch_start_time() + CommonConfig.STORM_WINDOW_MAX_TIME); + dosEventLog.setEnd_time(value.getSketch_start_time() + value.getSketch_duration()); dosEventLog.setAttack_type(value.getAttack_type()); dosEventLog.setSeverity(severity.severity); - dosEventLog.setConditions(getConditions(PERCENT_INSTANCE.format(percent),base, value.getSketch_sessions(), tag)); + dosEventLog.setConditions(getConditions(PERCENT_INSTANCE.format(percent), base, value.getSketch_sessions(), type, tag)); dosEventLog.setDestination_ip(value.getDestination_ip()); dosEventLog.setDestination_country(IpUtils.ipLookup.countryLookup(value.getDestination_ip())); String ipList = value.getSource_ip(); @@ -186,12 +194,12 @@ public class DosDetectionBolt extends BaseBasicBolt { return dosEventLog; } - private Integer getBaseValue(Tuple2<ArrayList<Integer>, Integer> floodTypeTup, DosSketchLog value) { + private Integer getBaseValue(DosBaselineThreshold dosBaselineThreshold, DosSketchLog value) { Integer base = 0; try { - if (floodTypeTup != null) { - ArrayList<Integer> baselines = floodTypeTup._1; - Integer defaultVaule = floodTypeTup._2; + if (dosBaselineThreshold != null) { + ArrayList<Integer> baselines = dosBaselineThreshold.getSession_rate(); + Integer defaultVaule = dosBaselineThreshold.getSession_rate_default_value(); if (baselines != null && baselines.size() == BASELINE_SIZE) { int timeIndex = getCurrentTimeIndex(value.getSketch_start_time()); base = baselines.get(timeIndex); @@ -207,36 +215,64 @@ public class DosDetectionBolt extends BaseBasicBolt { return base; } - private String getConditions(String percent, long base, long sessions, String tag) { - switch (tag) { - case "baseline": - return "sessions > " + percent + " of baseline"; - case "static": - return "sessions > " + base + " sessions/s"; - case "sensitivity": - return sessions+" sessions/s Unusually high Sessions"; + private String getConditions(String percent, long base, long sessions, int type, String tag) { + switch (type) { + case STATIC_CONDITION_TYPE: + return new StrBuilder() + .append(tag).append(" > ") + .append(base).append(" ") + .append(tag).append("/s") + .toString(); + case BASELINE_CONDITION_TYPE: + return new StrBuilder() + .append(tag).append(" > ") + .append(percent).append(" of baseline") + .toString(); + case SENSITIVITY_CONDITION_TYPE: + return new StrBuilder() + .append(sessions).append(" ") + .append(tag).append("/s Unusually high ") + .append(StringUtils.capitalize(tag)) + .toString(); default: - return null; + throw new IllegalArgumentException("Illegal Argument type:" + type + ", known types = [1,2,3]"); } } private String getSourceCountryList(String sourceIpList) { - String[] ipArr = sourceIpList.split(","); - HashSet<String> countrySet = new HashSet<>(); - for (String ip : ipArr) { - countrySet.add(IpUtils.ipLookup.countryLookup(ip)); + if (StringUtil.isNotBlank(sourceIpList)) { + String countryList; + try { + String[] ipArr = sourceIpList.split(","); + HashSet<String> countrySet = new HashSet<>(); + for (String ip : ipArr) { + countrySet.add(IpUtils.ipLookup.countryLookup(ip)); + } + countryList = StringUtils.join(countrySet, ","); + return countryList; + } catch (Exception e) { + logger.error("{} source IP lists 获取国家失败", sourceIpList, e); + return StringUtil.EMPTY; + } + } else { + throw new IllegalArgumentException("Illegal Argument sourceIpList = null"); } - return StringUtils.join(countrySet, ","); } private int getCurrentTimeIndex(long sketchStartTime) { - long currentDayTime = DateUtils.getTimeFloor(new Date(sketchStartTime * 1000L), "P1D").getTime()/1000; - long indexLong = (sketchStartTime - currentDayTime) / 600; - return Integer.parseInt(Long.toString(indexLong)); + int index = 0; + try { + long currentDayTime = DateUtils.getTimeFloor(new Date(sketchStartTime * 1000L), "P1D").getTime() / 1000; + long indexLong = (sketchStartTime - currentDayTime) / (86400 / BASELINE_SIZE); + index = Integer.parseInt(Long.toString(indexLong)); + } catch (Exception e) { + logger.error("获取time index失败", e); + } + return index; } private Double getDiffPercent(long diff, long base) { - return BigDecimal.valueOf((float)diff/base).setScale(4, BigDecimal.ROUND_HALF_UP).doubleValue(); + return BigDecimal.valueOf((float) diff / base).setScale(4, BigDecimal.ROUND_HALF_UP).doubleValue(); } private Severity judgeSeverity(double diffPercent) { @@ -259,49 +295,25 @@ public class DosDetectionBolt extends BaseBasicBolt { /** * 判断严重程度枚举类型 */ - CRITICAL("Critical", 5), - SEVERE("Severe", 4), - MAJOR("Major", 3), - WARNING("Warning", 2), - MINOR("Minor", 1), - NORMAL("Normal", 0); + CRITICAL("Critical"), + SEVERE("Severe"), + MAJOR("Major"), + WARNING("Warning"), + MINOR("Minor"), + NORMAL("Normal"); private final String severity; - private final int score; @Override public String toString() { return this.severity; } - Severity(String severity, int score) { + Severity(String severity) { this.severity = severity; - this.score = score; } - } - - @Deprecated - private DosEventLog mergeFinalResult(Tuple2<Severity, DosEventLog> eventLogByBaseline, Tuple2<Severity, DosEventLog> eventLogByStaticThreshold) { - if (eventLogByBaseline._1.score > eventLogByStaticThreshold._1.score) { - logger.info("merge eventLogByBaseline {} \neventLogByStaticThreshold {}",eventLogByBaseline,eventLogByStaticThreshold); - return mergeCondition(eventLogByBaseline._2, eventLogByStaticThreshold._2); - } else { - logger.info("merge eventLogByStaticThreshold {} \neventLogByBaseline {}",eventLogByStaticThreshold,eventLogByBaseline); - return mergeCondition(eventLogByStaticThreshold._2, eventLogByBaseline._2); - } - } - @Deprecated - private DosEventLog mergeCondition(DosEventLog log1, DosEventLog log2) { - if (log1 != null && log2 != null) { - String conditions1 = log1.getConditions(); - String conditions2 = log2.getConditions(); - log1.setConditions(conditions1 + " and " + conditions2); - }else if (log1 == null && log2 != null){ - log1 = log2; - } - return log1; } @Override diff --git a/src/main/java/com/zdjizhi/bolt/ParseSketchLogBolt.java b/src/main/java/com/zdjizhi/bolt/ParseSketchLogBolt.java index 64b30df..7a71b20 100644 --- a/src/main/java/com/zdjizhi/bolt/ParseSketchLogBolt.java +++ b/src/main/java/com/zdjizhi/bolt/ParseSketchLogBolt.java @@ -52,7 +52,6 @@ public class ParseSketchLogBolt extends BaseBasicBolt { try { HashMap<String, Object> sketchSource = jsonMapperInstance.fromJson(message, hashmapJsonType); String commonSledIp = sketchSource.get("common_sled_ip").toString(); - String commonDataCenter = sketchSource.get("common_data_center").toString(); long sketchStartTime = Long.parseLong(sketchSource.get("sketch_start_time").toString()); long sketchDuration = Long.parseLong(sketchSource.get("sketch_duration").toString()); String attackType = sketchSource.get("attack_type").toString(); @@ -60,7 +59,6 @@ public class ParseSketchLogBolt extends BaseBasicBolt { for (HashMap<String, Object> obj : reportIpList) { DosSketchLog dosSketchLog = new DosSketchLog(); dosSketchLog.setCommon_sled_ip(commonSledIp); - dosSketchLog.setCommon_data_center(commonDataCenter); dosSketchLog.setSketch_start_time(sketchStartTime); dosSketchLog.setSketch_duration(sketchDuration); dosSketchLog.setAttack_type(attackType); diff --git a/src/main/java/com/zdjizhi/bolt/ParseStaticThreshold.java b/src/main/java/com/zdjizhi/bolt/ParseStaticThreshold.java index 5257834..fd9ffb4 100644 --- a/src/main/java/com/zdjizhi/bolt/ParseStaticThreshold.java +++ b/src/main/java/com/zdjizhi/bolt/ParseStaticThreshold.java @@ -136,34 +136,47 @@ public class ParseStaticThreshold { * 基于静态阈值构建threshold RangeMap,k:IP段或具体IP,v:配置信息 * @return threshold RangeMap */ - static TreeRangeMap<IPAddress, Map<String, DosDetectionThreshold>> createStaticThreshold() { - TreeRangeMap<IPAddress, Map<String, DosDetectionThreshold>> thresholdRangeMap = TreeRangeMap.create(); + static HashMap<String, TreeRangeMap<IPAddress, DosDetectionThreshold>> createStaticThreshold() { + HashMap<String, TreeRangeMap<IPAddress, DosDetectionThreshold>> thresholdRangeMap = new HashMap<>(4); try { ArrayList<DosDetectionThreshold> dosDetectionThreshold = getDosDetectionThreshold(); if (dosDetectionThreshold != null && !dosDetectionThreshold.isEmpty()) { for (DosDetectionThreshold threshold : dosDetectionThreshold) { + String attackType = threshold.getAttackType(); + TreeRangeMap<IPAddress, DosDetectionThreshold> treeRangeMap = thresholdRangeMap.getOrDefault(attackType, TreeRangeMap.create()); ArrayList<String> serverIpList = threshold.getServerIpList(); for (String sip : serverIpList) { IPAddressString ipAddressString = new IPAddressString(sip); if (ipAddressString.isIPAddress()) { IPAddress address = ipAddressString.getAddress(); - Map<String, DosDetectionThreshold> floodTypeThresholdMap = thresholdRangeMap.get(address); - if (floodTypeThresholdMap == null) { - floodTypeThresholdMap = new HashMap<>(); - } - floodTypeThresholdMap.put(threshold.getAttackType(), threshold); - if (address.isPrefixed()){ - if (address.isMultiple()){ - thresholdRangeMap.put(Range.closed(address.getLower(), address.getUpper()), floodTypeThresholdMap); - }else { - thresholdRangeMap.put(Range.closed(address.adjustPrefixLength(address.getBitCount()), - address.toMaxHost().withoutPrefixLength()), floodTypeThresholdMap); + if (address.isPrefixed()) { + IPAddress lower = address.getLower(); + IPAddress upper = address.getUpper(); + if (!address.isMultiple()) { + lower = address.adjustPrefixLength(address.getBitCount()); + upper = address.toMaxHost().withoutPrefixLength(); + } + Map.Entry<Range<IPAddress>, DosDetectionThreshold> lowerEntry = treeRangeMap.getEntry(lower); + Map.Entry<Range<IPAddress>, DosDetectionThreshold> upperEntry = treeRangeMap.getEntry(upper); + if (lowerEntry != null && upperEntry == null) { + Range<IPAddress> lowerEntryKey = lowerEntry.getKey(); + DosDetectionThreshold lowerEntryValue = lowerEntry.getValue(); + treeRangeMap.put(Range.closedOpen(lowerEntryKey.lowerEndpoint(), lower), lowerEntryValue); + treeRangeMap.put(Range.closed(lower, upper), threshold); + } else if (lowerEntry == null && upperEntry != null) { + Range<IPAddress> upperEntryKey = upperEntry.getKey(); + DosDetectionThreshold upperEntryValue = upperEntry.getValue(); + treeRangeMap.put(Range.openClosed(upper, upperEntryKey.upperEndpoint()), upperEntryValue); + treeRangeMap.put(Range.closed(lower, upper), threshold); + } else { + treeRangeMap.put(Range.closed(lower, upper), threshold); } - }else { - thresholdRangeMap.put(Range.closed(address, address), floodTypeThresholdMap); + } else { + treeRangeMap.put(Range.closed(address, address), threshold); } } } + thresholdRangeMap.put(attackType,treeRangeMap); } } } catch (Exception e) { @@ -179,18 +192,16 @@ public class ParseStaticThreshold { System.out.println("------------------------"); - TreeRangeMap<IPAddress, Map<String, DosDetectionThreshold>> staticThreshold = createStaticThreshold(); - - /* - Map<Range<IPAddress>, Map<String, DosDetectionThreshold>> rangeMapMap = staticThreshold.asMapOfRanges(); - for (Range<IPAddress> range : rangeMapMap.keySet()) { - Map<String, DosDetectionThreshold> thresholdMap = rangeMapMap.get(range); - for (String type : thresholdMap.keySet()) { - DosDetectionThreshold threshold = thresholdMap.get(type); - System.out.println(range + "---" + type + "---" + threshold); + HashMap<String, TreeRangeMap<IPAddress, DosDetectionThreshold>> staticThreshold = createStaticThreshold(); + + for (String type : staticThreshold.keySet()) { + Map<Range<IPAddress>, DosDetectionThreshold> asMapOfRanges = staticThreshold.get(type).asMapOfRanges(); + for (Range<IPAddress> range : asMapOfRanges.keySet()) { + DosDetectionThreshold threshold = asMapOfRanges.get(range); + System.out.println(type + "---" + range + "---" + threshold); } + System.out.println("------------------------"); } - */ } diff --git a/src/main/java/com/zdjizhi/common/DosBaselineThreshold.java b/src/main/java/com/zdjizhi/common/DosBaselineThreshold.java new file mode 100644 index 0000000..e8bc228 --- /dev/null +++ b/src/main/java/com/zdjizhi/common/DosBaselineThreshold.java @@ -0,0 +1,43 @@ +package com.zdjizhi.common; + +import java.io.Serializable; +import java.util.ArrayList; + +public class DosBaselineThreshold implements Serializable { + private ArrayList<Integer> session_rate; + private Integer session_rate_baseline_type; + private Integer session_rate_default_value; + + public ArrayList<Integer> getSession_rate() { + return session_rate; + } + + public void setSession_rate(ArrayList<Integer> session_rate) { + this.session_rate = session_rate; + } + + public Integer getSession_rate_baseline_type() { + return session_rate_baseline_type; + } + + public void setSession_rate_baseline_type(Integer session_rate_baseline_type) { + this.session_rate_baseline_type = session_rate_baseline_type; + } + + public Integer getSession_rate_default_value() { + return session_rate_default_value; + } + + public void setSession_rate_default_value(Integer session_rate_default_value) { + this.session_rate_default_value = session_rate_default_value; + } + + @Override + public String toString() { + return "DosBaselineThreshold{" + + "session_rate=" + session_rate + + ", session_rate_baseline_type=" + session_rate_baseline_type + + ", session_rate_default_value=" + session_rate_default_value + + '}'; + } +} diff --git a/src/main/java/com/zdjizhi/utils/HbaseUtils.java b/src/main/java/com/zdjizhi/utils/HbaseUtils.java index cfe91a7..387c90c 100644 --- a/src/main/java/com/zdjizhi/utils/HbaseUtils.java +++ b/src/main/java/com/zdjizhi/utils/HbaseUtils.java @@ -1,8 +1,7 @@ package com.zdjizhi.utils; import com.zdjizhi.common.CommonConfig; -import io.vavr.Tuple; -import io.vavr.Tuple2; +import com.zdjizhi.common.DosBaselineThreshold; import org.apache.hadoop.hbase.HBaseConfiguration; import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.TableName; @@ -56,33 +55,38 @@ public class HbaseUtils { } public static void main(String[] args) { - Map<String, Map<String, Tuple2<ArrayList<Integer>, Integer>>> baselineMap = readFromHbase(); + Map<String, Map<String, DosBaselineThreshold>> baselineMap = readFromHbase(); Set<String> keySet = baselineMap.keySet(); for (String key : keySet) { - Map<String, Tuple2<ArrayList<Integer>, Integer>> stringTuple2Map = baselineMap.get(key); + Map<String, DosBaselineThreshold> stringTuple2Map = baselineMap.get(key); Set<String> strings = stringTuple2Map.keySet(); for (String s:strings){ - Tuple2<ArrayList<Integer>, Integer> arrayListIntegerTuple2 = stringTuple2Map.get(s); - System.out.println(key+"---"+s+"---"+arrayListIntegerTuple2._1+"---"+arrayListIntegerTuple2._2); + DosBaselineThreshold dosBaselineThreshold = stringTuple2Map.get(s); + System.out.println(key+"---"+s+"---"+dosBaselineThreshold); } } System.out.println(baselineMap.size()); } - public static Map<String, Map<String, Tuple2<ArrayList<Integer>, Integer>>> readFromHbase() { - Map<String, Map<String, Tuple2<ArrayList<Integer>, Integer>>> baselineMap = new HashMap<>(); + public static Map<String, Map<String, DosBaselineThreshold>> readFromHbase() { + Map<String, Map<String, DosBaselineThreshold>> baselineMap = new HashMap<>(); try { prepareHbaseEnv(); logger.info("开始读取baseline数据"); ResultScanner rs = table.getScanner(scan); for (Result result : rs) { - Map<String, Tuple2<ArrayList<Integer>, Integer>> floodTypeMap = new HashMap<>(); + Map<String, DosBaselineThreshold> floodTypeMap = new HashMap<>(); String rowkey = Bytes.toString(result.getRow()); for (String type:floodTypeList){ - ArrayList<Integer> sessionRate = getArraylist(result, type, "session_rate"); + DosBaselineThreshold baselineThreshold = new DosBaselineThreshold(); + ArrayList<Integer> sessionRate = HbaseUtils.getArraylist(result, type, "session_rate"); if (sessionRate != null && !sessionRate.isEmpty()){ - Integer defaultValue = getDefaultValue(result, type, "session_rate_default_value"); - floodTypeMap.put(type, Tuple.of(sessionRate, defaultValue)); + Integer defaultValue = getIntegerValue(result, type, "session_rate_default_value"); + Integer rateBaselineType = getIntegerValue(result, type, "session_rate_baseline_type"); + baselineThreshold.setSession_rate(sessionRate); + baselineThreshold.setSession_rate_default_value(defaultValue); + baselineThreshold.setSession_rate_baseline_type(rateBaselineType); + floodTypeMap.put(type,baselineThreshold); } } baselineMap.put(rowkey, floodTypeMap); @@ -94,7 +98,7 @@ public class HbaseUtils { return baselineMap; } - private static Integer getDefaultValue(Result result, String family, String qualifier) { + private static Integer getIntegerValue(Result result, String family, String qualifier) { byte[] value = result.getValue(Bytes.toBytes(family), Bytes.toBytes(qualifier)); if (value != null){ return Bytes.toInt(value); diff --git a/src/main/resources/common.properties b/src/main/resources/common.properties index ead59c5..b3f603f 100644 --- a/src/main/resources/common.properties +++ b/src/main/resources/common.properties @@ -13,7 +13,7 @@ kafka.input.bootstrap.servers=192.168.44.11:9094 kafka.input.scan.startup.mode=latest #读取kafka group id -kafka.input.group.id=2108301748 +kafka.input.group.id=2110221053 #发送kafka metrics并行度大小 kafka.output.metric.parallelism=1 @@ -30,7 +30,7 @@ kafka.output.event.parallelism=1 kafka.output.event.topic.name=storm-dos-test #kafka输出地址 -kafka.output.bootstrap.servers=192.168.44.12:9092 +kafka.output.bootstrap.servers=192.168.44.12:9094 #发送kafka批大小 kafka.output.metric.batch.num=1000 @@ -129,7 +129,8 @@ topology.num.acks=0 producer.ack=1 #bifang服务访问地址 -bifang.server.uri=http://192.168.44.3:80 +#bifang.server.uri=http://192.168.44.3:80 +bifang.server.uri=http://192.168.44.72:80 #访问bifang只读权限token,bifang内置,无需修改 bifang.server.token=ed04b942-7df4-4e3d-b9a9-a881ca98a867 |
