diff options
| author | wanglihui <[email protected]> | 2021-09-09 09:46:14 +0800 |
|---|---|---|
| committer | wanglihui <[email protected]> | 2021-09-09 09:46:14 +0800 |
| commit | f046d4b0196d964c7f08af1b5eea8ddf055612f1 (patch) | |
| tree | 6858f1cdaeed0df590abf7978cbb5884ed3bf618 | |
| parent | 4b0ff85e503e34fb617d8f050079832fb5573883 (diff) | |
新增获取bifang配置作为判定条件
新增敏感阈值过滤报警信息
| -rw-r--r-- | pom.xml | 6 | ||||
| -rw-r--r-- | src/main/java/com/zdjizhi/bolt/DosDetectionBolt.java | 146 | ||||
| -rw-r--r-- | src/main/java/com/zdjizhi/bolt/ParseStaticThreshold.java | 176 | ||||
| -rw-r--r-- | src/main/java/com/zdjizhi/common/CommonConfig.java | 3 | ||||
| -rw-r--r-- | src/main/java/com/zdjizhi/common/DosDetectionThreshold.java | 93 | ||||
| -rw-r--r-- | src/main/java/com/zdjizhi/topology/LogFlowWriteTopology.java | 2 | ||||
| -rw-r--r-- | src/main/java/com/zdjizhi/utils/HttpClientUtils.java | 271 | ||||
| -rw-r--r-- | src/main/resources/common.properties | 6 |
8 files changed, 663 insertions, 40 deletions
@@ -250,6 +250,12 @@ <version>0.10.3</version> </dependency> + <dependency> + <groupId>com.github.seancfoley</groupId> + <artifactId>ipaddress</artifactId> + <version>5.3.3</version> + </dependency> + </dependencies> diff --git a/src/main/java/com/zdjizhi/bolt/DosDetectionBolt.java b/src/main/java/com/zdjizhi/bolt/DosDetectionBolt.java index 58ef066..273b561 100644 --- a/src/main/java/com/zdjizhi/bolt/DosDetectionBolt.java +++ b/src/main/java/com/zdjizhi/bolt/DosDetectionBolt.java @@ -2,10 +2,14 @@ package com.zdjizhi.bolt; import cn.hutool.log.Log; import cn.hutool.log.LogFactory; +import com.google.common.collect.TreeRangeMap; import com.zdjizhi.common.CommonConfig; +import com.zdjizhi.common.DosDetectionThreshold; import com.zdjizhi.common.DosEventLog; import com.zdjizhi.common.DosSketchLog; import com.zdjizhi.utils.*; +import inet.ipaddr.IPAddress; +import inet.ipaddr.IPAddressString; import io.vavr.Tuple2; import org.apache.commons.lang.StringUtils; import org.apache.commons.lang3.concurrent.BasicThreadFactory; @@ -33,6 +37,7 @@ public class DosDetectionBolt extends BaseBasicBolt { private KafkaUtils kafkaLogSend; private final static NumberFormat PERCENT_INSTANCE = NumberFormat.getPercentInstance(); private static Map<String, Map<String, Tuple2<ArrayList<Integer>, Integer>>> baselineMap = new HashMap<>(); + private TreeRangeMap<IPAddress, Map<String, DosDetectionThreshold>> thresholdRangeMap; private List<String> list = new LinkedList<>(); @Override @@ -40,10 +45,11 @@ public class DosDetectionBolt extends BaseBasicBolt { ScheduledExecutorService executorService = new ScheduledThreadPoolExecutor(2, new BasicThreadFactory.Builder().namingPattern("Dos-Detection-%d").daemon(true).build()); try { - executorService.scheduleAtFixedRate(() -> { - //do something - baselineMap = HbaseUtils.readFromHbase(); - }, 0, CommonConfig.BASELINE_THRESHOLD_SCHEDULE_DAYS, TimeUnit.DAYS); + executorService.scheduleAtFixedRate(() -> thresholdRangeMap = ParseStaticThreshold.createStaticThreshold(), + 0, CommonConfig.STATIC_THRESHOLD_SCHEDULE_MINUTES, TimeUnit.MINUTES); + + executorService.scheduleAtFixedRate(() -> baselineMap = HbaseUtils.readFromHbase(), + 0, CommonConfig.BASELINE_THRESHOLD_SCHEDULE_DAYS, TimeUnit.DAYS); } catch (Exception e) { logger.error("定时器任务执行失败", e); } @@ -84,43 +90,99 @@ public class DosDetectionBolt extends BaseBasicBolt { } - private DosEventLog dosDetection(DosSketchLog sketchLog) { + private DosEventLog dosDetection(DosSketchLog value) { + DosEventLog finalResult = null; try { - String destinationIp = sketchLog.getDestination_ip(); - String attackType = sketchLog.getAttack_type(); + String destinationIp = value.getDestination_ip(); + String attackType = value.getAttack_type(); + long sketchSessions = value.getSketch_sessions(); + IPAddress destinationIpAddress = new IPAddressString(destinationIp).getAddress(); + Map<String, DosDetectionThreshold> thresholdMap = thresholdRangeMap.get(destinationIpAddress); logger.debug("当前判断IP:{}, 类型: {}", destinationIp, attackType); - if (baselineMap.containsKey(destinationIp)) { - Tuple2<ArrayList<Integer>, Integer> floodTypeTup = baselineMap.get(destinationIp).get(attackType); - Integer base = getBaseValue(floodTypeTup, sketchLog); - long diff = sketchLog.getSketch_sessions() - base; - if (diff > 0 && base != 0) { - double percent = getDiffPercent(diff, base); - Severity severity = judgeSeverity(percent); - if (severity != Severity.NORMAL) { - DosEventLog result = getResult(sketchLog, severity, percent); - logger.info("检测到当前server IP {} 存在 {} 异常,超出基线{} {}倍,日志详情\n {}", destinationIp, attackType, base, percent, result); - return result; - } else { - logger.debug("当前server IP:{} 未出现 {} 异常,日志详情 {}", destinationIp, attackType, sketchLog); - } + if (sketchSessions > CommonConfig.SENSITIVITY_THRESHOLD){ + if (baselineMap != null && baselineMap.containsKey(destinationIp) && thresholdMap == null) { + finalResult = getDosEventLogByBaseline(value, destinationIp, attackType)._2; + } else if (baselineMap != null && !baselineMap.containsKey(destinationIp) && thresholdMap != null) { + finalResult = getDosEventLogByStaticThreshold(value, thresholdMap)._2; + } else if (baselineMap != null && baselineMap.containsKey(destinationIp) && thresholdMap != null) { + Tuple2<Severity, DosEventLog> eventLogByBaseline = getDosEventLogByBaseline(value, destinationIp, attackType); + Tuple2<Severity, DosEventLog> eventLogByStaticThreshold = getDosEventLogByStaticThreshold(value, thresholdMap); + finalResult = mergeFinalResult(eventLogByBaseline, eventLogByStaticThreshold); + } else { + logger.debug("未获取到当前server IP:{} 类型 {} 静态阈值 和 baseline", destinationIp, attackType); } - } else { - logger.debug("未获取到当前server IP:{} 类型 {} baseline数据", destinationIp, attackType); } } catch (Exception e) { - logger.error("判定失败\n {} \n{}", sketchLog, e); + logger.error("判定失败\n {} \n{}", value, e); + } + return finalResult; + } + + private DosEventLog mergeFinalResult(Tuple2<Severity, DosEventLog> eventLogByBaseline, Tuple2<Severity, DosEventLog> eventLogByStaticThreshold) { + if (eventLogByBaseline._1.score > eventLogByStaticThreshold._1.score) { + mergeCondition(eventLogByBaseline._2, eventLogByStaticThreshold._2); + logger.info("merge eventLogByBaseline {} \neventLogByStaticThreshold {}",eventLogByBaseline,eventLogByStaticThreshold); + return eventLogByBaseline._2; + } else { + mergeCondition(eventLogByStaticThreshold._2, eventLogByBaseline._2); + logger.info("merge eventLogByStaticThreshold {} \neventLogByBaseline {}",eventLogByStaticThreshold,eventLogByBaseline); + return eventLogByStaticThreshold._2; + } + } + + private void mergeCondition(DosEventLog log1, DosEventLog log2) { + if (log1 != null && log2 != null) { + String conditions1 = log1.getConditions(); + String conditions2 = log2.getConditions(); + log1.setConditions(conditions1 + " and " + conditions2); + } + } + + private Tuple2<Severity, DosEventLog> getDosEventLogByBaseline(DosSketchLog value, String destinationIp, String attackType) { + Tuple2<ArrayList<Integer>, Integer> floodTypeTup = baselineMap.get(destinationIp).get(attackType); + Integer base = getBaseValue(floodTypeTup, value); + long diff = value.getSketch_sessions() - base; + return getDosEventLog(value, base, diff, "baseline"); + } + + private Tuple2<Severity, DosEventLog> getDosEventLogByStaticThreshold(DosSketchLog value, Map<String, DosDetectionThreshold> thresholdMap) { + Tuple2<Severity, DosEventLog> result = io.vavr.Tuple.of(Severity.NORMAL, null); + String attackType = value.getAttack_type(); + if (thresholdMap.containsKey(attackType)) { + DosDetectionThreshold threshold = thresholdMap.get(attackType); + long base = threshold.getSessionsPerSec(); + long diff = value.getSketch_sessions() - base; + result = getDosEventLog(value, base, diff, "static"); } - return null; + return result; } - private DosEventLog getResult(DosSketchLog value, Severity severity, double percent) { + private Tuple2<Severity, DosEventLog> getDosEventLog(DosSketchLog value, long base, long diff, String tag) { + DosEventLog result = null; + String destinationIp = value.getDestination_ip(); + String attackType = value.getAttack_type(); + Severity severity = Severity.NORMAL; + if (diff > 0 && base != 0) { + double percent = getDiffPercent(diff, base); + severity = judgeSeverity(percent); + if (severity != Severity.NORMAL) { + result = getResult(value, severity, percent, tag); + logger.info("检测到当前server IP {} 存在 {} 异常,超出基线{} {}倍,日志详情\n {}", destinationIp,attackType,base,percent,result); + } else { + logger.debug("当前server IP:{} 未出现 {} 异常,日志详情 {}", destinationIp, attackType, value.toString()); + } + } + return io.vavr.Tuple.of(severity, result); + } + + private DosEventLog getResult(DosSketchLog value, Severity severity, double percent, String tag) { DosEventLog dosEventLog = new DosEventLog(); dosEventLog.setLog_id(SnowflakeId.generateId()); dosEventLog.setStart_time(value.getSketch_start_time()); dosEventLog.setEnd_time(value.getSketch_start_time() + CommonConfig.STORM_WINDOW_MAX_TIME); dosEventLog.setAttack_type(value.getAttack_type()); - dosEventLog.setSeverity(severity.toString()); - dosEventLog.setConditions(getConditions(PERCENT_INSTANCE.format(percent))); + dosEventLog.setSeverity(severity.severity); + dosEventLog.setConditions(getConditions(PERCENT_INSTANCE.format(percent), value.getSketch_sessions(), tag)); dosEventLog.setDestination_ip(value.getDestination_ip()); dosEventLog.setDestination_country(IpUtils.ipLookup.countryLookup(value.getDestination_ip())); String ipList = value.getSource_ip(); @@ -153,8 +215,15 @@ public class DosDetectionBolt extends BaseBasicBolt { return base; } - private String getConditions(String percent) { - return "sessions > " + percent + " of baseline"; + private String getConditions(String percent, long sessions, String tag) { + switch (tag) { + case "baseline": + return "sessions > " + percent + " of baseline"; + case "static": + return "sessions > " + sessions + " sessions/s"; + default: + return null; + } } private String getSourceCountryList(String sourceIpList) { @@ -196,27 +265,28 @@ public class DosDetectionBolt extends BaseBasicBolt { /** * 判断严重程度枚举类型 */ - CRITICAL("Critical"), - SEVERE("Severe"), - MAJOR("Major"), - WARNING("Warning"), - MINOR("Minor"), - NORMAL("Normal"); + CRITICAL("Critical", 5), + SEVERE("Severe", 4), + MAJOR("Major", 3), + WARNING("Warning", 2), + MINOR("Minor", 1), + NORMAL("Normal", 0); private final String severity; + private final int score; @Override public String toString() { return this.severity; } - Severity(String severity) { + Severity(String severity, int score) { this.severity = severity; + this.score = score; } } - @Override public void declareOutputFields(OutputFieldsDeclarer declarer) { diff --git a/src/main/java/com/zdjizhi/bolt/ParseStaticThreshold.java b/src/main/java/com/zdjizhi/bolt/ParseStaticThreshold.java new file mode 100644 index 0000000..afdb66f --- /dev/null +++ b/src/main/java/com/zdjizhi/bolt/ParseStaticThreshold.java @@ -0,0 +1,176 @@ +package com.zdjizhi.bolt; + +import com.fasterxml.jackson.databind.JavaType; +import com.google.common.collect.Range; +import com.google.common.collect.TreeRangeMap; +import com.zdjizhi.common.CommonConfig; +import com.zdjizhi.common.DosDetectionThreshold; +import com.zdjizhi.utils.HttpClientUtils; +import com.zdjizhi.utils.JsonMapper; +import inet.ipaddr.IPAddress; +import inet.ipaddr.IPAddressString; +import org.apache.http.client.utils.URIBuilder; +import org.apache.http.message.BasicHeader; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.net.URISyntaxException; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.Map; + +/** + * @author wlh + */ +public class ParseStaticThreshold { + private static Logger logger = LoggerFactory.getLogger(ParseStaticThreshold.class); + private static String encryptpwd; + + private static JsonMapper jsonMapperInstance = JsonMapper.getInstance(); + private static JavaType hashmapJsonType = jsonMapperInstance.createCollectionType(HashMap.class, String.class, Object.class); + private static JavaType thresholdType = jsonMapperInstance.createCollectionType(ArrayList.class, DosDetectionThreshold.class); + + static { + //加载加密登录密码 + encryptpwd = getEncryptpwd(); + } + + /** + * 获取加密密码 + */ + private static String getEncryptpwd() { + String psw = HttpClientUtils.ERROR_MESSAGE; + try { + URIBuilder uriBuilder = new URIBuilder(CommonConfig.BIFANG_SERVER_URI); + HashMap<String, String> parms = new HashMap<>(); + parms.put("password", CommonConfig.BIFANG_SERVER_PASSWORD); + HttpClientUtils.setUrlWithParams(uriBuilder, CommonConfig.BIFANG_SERVER_ENCRYPTPWD_PATH, parms); + String resposeJsonStr = HttpClientUtils.httpGet(uriBuilder.build()); + if (!HttpClientUtils.ERROR_MESSAGE.equals(resposeJsonStr)) { + HashMap<String, Object> resposeMap = jsonMapperInstance.fromJson(resposeJsonStr, hashmapJsonType); + boolean success = (boolean) resposeMap.get("success"); + String msg = resposeMap.get("msg").toString(); + if (success) { + HashMap<String, Object> data = jsonMapperInstance.fromJson(jsonMapperInstance.toJson(resposeMap.get("data")), hashmapJsonType); + psw = data.get("encryptpwd").toString(); + } else { + logger.error(msg); + } + } + } catch (URISyntaxException e) { + logger.error("构造URI异常", e); + } catch (Exception e) { + logger.error("获取encryptpwd失败", e); + } + return psw; + } + + /** + * 登录bifang服务,获取token + * @return token + */ + private static String loginBifangServer() { + String token = HttpClientUtils.ERROR_MESSAGE; + try { + if (!HttpClientUtils.ERROR_MESSAGE.equals(encryptpwd)) { + URIBuilder uriBuilder = new URIBuilder(CommonConfig.BIFANG_SERVER_URI); + HashMap<String, String> parms = new HashMap<>(); + parms.put("username", CommonConfig.BIFANG_SERVER_USER); + parms.put("password", encryptpwd); + HttpClientUtils.setUrlWithParams(uriBuilder, CommonConfig.BIFANG_SERVER_LOGIN_PATH, parms); + String resposeJsonStr = HttpClientUtils.httpPost(uriBuilder.build(), null); + if (!HttpClientUtils.ERROR_MESSAGE.equals(resposeJsonStr)) { + HashMap<String, Object> resposeMap = jsonMapperInstance.fromJson(resposeJsonStr, hashmapJsonType); + boolean success = (boolean) resposeMap.get("success"); + String msg = resposeMap.get("msg").toString(); + if (success) { + HashMap<String, Object> data = jsonMapperInstance.fromJson(jsonMapperInstance.toJson(resposeMap.get("data")), hashmapJsonType); + token = data.get("token").toString(); + } else { + logger.error(msg); + } + } + } + } catch (Exception e) { + logger.error("登录失败,未获取到token ", e); + } + return token; + } + + /** + * 获取静态阈值配置列表 + * @return thresholds + */ + private static ArrayList<DosDetectionThreshold> getDosDetectionThreshold() { + ArrayList<DosDetectionThreshold> thresholds = null; + try { + URIBuilder uriBuilder = new URIBuilder(CommonConfig.BIFANG_SERVER_URI); + HttpClientUtils.setUrlWithParams(uriBuilder, CommonConfig.BIFANG_SERVER_POLICY_THRESHOLD_PATH, null); + String token = loginBifangServer(); + if (!HttpClientUtils.ERROR_MESSAGE.equals(token)) { + BasicHeader authorization = new BasicHeader("Authorization", token); + String resposeJsonStr = HttpClientUtils.httpGet(uriBuilder.build(), authorization); + if (!HttpClientUtils.ERROR_MESSAGE.equals(resposeJsonStr)) { + HashMap<String, Object> resposeMap = jsonMapperInstance.fromJson(resposeJsonStr, hashmapJsonType); + boolean success = (boolean) resposeMap.get("success"); + String msg = resposeMap.get("msg").toString(); + if (success) { + HashMap<String, Object> data = jsonMapperInstance.fromJson(jsonMapperInstance.toJson(resposeMap.get("data")), hashmapJsonType); + thresholds = jsonMapperInstance.fromJson(jsonMapperInstance.toJson(data.get("list")), thresholdType); + logger.info("获取到静态阈值配置{}条", thresholds.size()); + } else { + logger.error(msg); + } + } + } + } catch (Exception e) { + logger.error("获取静态阈值配置失败,请检查bifang服务或登录配置信息 ", e); + } + return thresholds; + } + + /** + * 基于静态阈值构建threshold RangeMap,k:IP段或具体IP,v:配置信息 + * @return threshold RangeMap + */ + static TreeRangeMap<IPAddress, Map<String, DosDetectionThreshold>> createStaticThreshold() { + TreeRangeMap<IPAddress, Map<String, DosDetectionThreshold>> thresholdRangeMap = TreeRangeMap.create(); + try { + ArrayList<DosDetectionThreshold> dosDetectionThreshold = getDosDetectionThreshold(); + if (dosDetectionThreshold != null && !dosDetectionThreshold.isEmpty()) { + for (DosDetectionThreshold threshold : dosDetectionThreshold) { + ArrayList<String> serverIpList = threshold.getServerIpList(); + for (String sip : serverIpList) { + IPAddressString ipAddressString = new IPAddressString(sip); + if (ipAddressString.isIPAddress()) { + IPAddress address = ipAddressString.getAddress(); + Map<String, DosDetectionThreshold> floodTypeThresholdMap = thresholdRangeMap.get(address); + if (floodTypeThresholdMap == null) { + floodTypeThresholdMap = new HashMap<>(); + } + floodTypeThresholdMap.put(threshold.getAttackType(), threshold); + thresholdRangeMap.put(Range.closed(address.getLower(), address.getUpper()), floodTypeThresholdMap); + } + } + } + } + } catch (Exception e) { + logger.error("构建threshold RangeMap失败", e); + } + return thresholdRangeMap; + } + + public static void main(String[] args) { + TreeRangeMap<IPAddress, Map<String, DosDetectionThreshold>> staticThreshold = createStaticThreshold(); + Map<Range<IPAddress>, Map<String, DosDetectionThreshold>> rangeMapMap = staticThreshold.asMapOfRanges(); + for (Range<IPAddress> range : rangeMapMap.keySet()) { + Map<String, DosDetectionThreshold> thresholdMap = rangeMapMap.get(range); + for (String type : thresholdMap.keySet()) { + DosDetectionThreshold threshold = thresholdMap.get(type); + System.out.println(range + "---" + type + "---" + threshold); + } + } + } + + +} diff --git a/src/main/java/com/zdjizhi/common/CommonConfig.java b/src/main/java/com/zdjizhi/common/CommonConfig.java index e126224..28e7d09 100644 --- a/src/main/java/com/zdjizhi/common/CommonConfig.java +++ b/src/main/java/com/zdjizhi/common/CommonConfig.java @@ -35,6 +35,8 @@ public class CommonConfig { public static final String IP_MMDB_PATH = CommonConfigurations.getStringProperty("ip.mmdb.path"); + public static final int SENSITIVITY_THRESHOLD = CommonConfigurations.getIntProperty("sensitivity.threshold"); + public static final double BASELINE_SESSIONS_MINOR_THRESHOLD = CommonConfigurations.getDoubleProperty("baseline.sessions.minor.threshold"); public static final double BASELINE_SESSIONS_WARNING_THRESHOLD = CommonConfigurations.getDoubleProperty("baseline.sessions.warning.threshold"); public static final double BASELINE_SESSIONS_MAJOR_THRESHOLD = CommonConfigurations.getDoubleProperty("baseline.sessions.major.threshold"); @@ -42,6 +44,7 @@ public class CommonConfig { public static final double BASELINE_SESSIONS_CRITICAL_THRESHOLD = CommonConfigurations.getDoubleProperty("baseline.sessions.critical.threshold"); public static final int BASELINE_THRESHOLD_SCHEDULE_DAYS = CommonConfigurations.getIntProperty("baseline.threshold.schedule.days"); + public static final int STATIC_THRESHOLD_SCHEDULE_MINUTES = CommonConfigurations.getIntProperty("static.threshold.schedule.minutes"); public static final int DESTINATION_IP_PARTITION_NUM = CommonConfigurations.getIntProperty("destination.ip.partition.num"); diff --git a/src/main/java/com/zdjizhi/common/DosDetectionThreshold.java b/src/main/java/com/zdjizhi/common/DosDetectionThreshold.java new file mode 100644 index 0000000..c67d3a4 --- /dev/null +++ b/src/main/java/com/zdjizhi/common/DosDetectionThreshold.java @@ -0,0 +1,93 @@ +package com.zdjizhi.common; + +import java.io.Serializable; +import java.util.ArrayList; + +public class DosDetectionThreshold implements Serializable { + private String profileId; + private String attackType; + private ArrayList<String> serverIpList; + private String serverIpAddr; + private long packetsPerSec; + private long bitsPerSec; + private long sessionsPerSec; + private int isValid; + + @Override + public String toString() { + return "DosDetectionThreshold{" + + "profileId='" + profileId + '\'' + + ", attackType='" + attackType + '\'' + + ", serverIpList=" + serverIpList + + ", serverIpAddr='" + serverIpAddr + '\'' + + ", packetsPerSec=" + packetsPerSec + + ", bitsPerSec=" + bitsPerSec + + ", sessionsPerSec=" + sessionsPerSec + + ", isValid=" + isValid + + '}'; + } + + public String getProfileId() { + return profileId; + } + + public void setProfileId(String profileId) { + this.profileId = profileId; + } + + public String getAttackType() { + return attackType; + } + + public void setAttackType(String attackType) { + this.attackType = attackType; + } + + public ArrayList<String> getServerIpList() { + return serverIpList; + } + + public void setServerIpList(ArrayList<String> serverIpList) { + this.serverIpList = serverIpList; + } + + public String getServerIpAddr() { + return serverIpAddr; + } + + public void setServerIpAddr(String serverIpAddr) { + this.serverIpAddr = serverIpAddr; + } + + public long getPacketsPerSec() { + return packetsPerSec; + } + + public void setPacketsPerSec(long packetsPerSec) { + this.packetsPerSec = packetsPerSec; + } + + public long getBitsPerSec() { + return bitsPerSec; + } + + public void setBitsPerSec(long bitsPerSec) { + this.bitsPerSec = bitsPerSec; + } + + public long getSessionsPerSec() { + return sessionsPerSec; + } + + public void setSessionsPerSec(long sessionsPerSec) { + this.sessionsPerSec = sessionsPerSec; + } + + public int getIsValid() { + return isValid; + } + + public void setIsValid(int isValid) { + this.isValid = isValid; + } +} diff --git a/src/main/java/com/zdjizhi/topology/LogFlowWriteTopology.java b/src/main/java/com/zdjizhi/topology/LogFlowWriteTopology.java index d9059d6..4880e7d 100644 --- a/src/main/java/com/zdjizhi/topology/LogFlowWriteTopology.java +++ b/src/main/java/com/zdjizhi/topology/LogFlowWriteTopology.java @@ -43,7 +43,6 @@ public class LogFlowWriteTopology { private void runLocally() throws InterruptedException { topologyConfig.setMaxTaskParallelism(1); -// topologyConfig.put(Config.TOPOLOGY_TICK_TUPLE_FREQ_SECS, 3); StormRunner.runTopologyLocally(builder, topologyName, topologyConfig, 600); } @@ -52,7 +51,6 @@ public class LogFlowWriteTopology { topologyConfig.put(Config.TOPOLOGY_TRANSFER_BUFFER_SIZE, CommonConfig.TRANSFER_BUFFER_SIZE); topologyConfig.put(Config.TOPOLOGY_EXECUTOR_RECEIVE_BUFFER_SIZE, CommonConfig.EXECUTOR_RECEIVE_BUFFER_SIZE); topologyConfig.put(Config.TOPOLOGY_EXECUTOR_SEND_BUFFER_SIZE, CommonConfig.EXECUTOR_SEND_BUFFER_SIZE); -// topologyConfig.put(Config.TOPOLOGY_TICK_TUPLE_FREQ_SECS, 3); StormRunner.runTopologyRemotely(builder, topologyName, topologyConfig); } diff --git a/src/main/java/com/zdjizhi/utils/HttpClientUtils.java b/src/main/java/com/zdjizhi/utils/HttpClientUtils.java new file mode 100644 index 0000000..9f65018 --- /dev/null +++ b/src/main/java/com/zdjizhi/utils/HttpClientUtils.java @@ -0,0 +1,271 @@ +package com.zdjizhi.utils; + +import com.zdjizhi.common.CommonConfig; +import org.apache.http.*; +import org.apache.http.client.ClientProtocolException; +import org.apache.http.client.HttpRequestRetryHandler; +import org.apache.http.client.config.RequestConfig; +import org.apache.http.client.methods.CloseableHttpResponse; +import org.apache.http.client.methods.HttpGet; +import org.apache.http.client.methods.HttpPost; +import org.apache.http.client.protocol.HttpClientContext; +import org.apache.http.client.utils.URIBuilder; +import org.apache.http.conn.ConnectTimeoutException; +import org.apache.http.conn.ConnectionKeepAliveStrategy; +import org.apache.http.conn.HttpHostConnectException; +import org.apache.http.entity.ByteArrayEntity; +import org.apache.http.impl.client.CloseableHttpClient; +import org.apache.http.impl.client.HttpClients; +import org.apache.http.impl.conn.PoolingHttpClientConnectionManager; +import org.apache.http.message.BasicHeaderElementIterator; +import org.apache.http.protocol.HTTP; +import org.apache.http.util.EntityUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.net.ssl.SSLException; +import javax.net.ssl.SSLHandshakeException; +import java.io.IOException; +import java.io.InterruptedIOException; +import java.net.URI; +import java.net.UnknownHostException; +import java.nio.charset.StandardCharsets; +import java.util.Map; + +/** + * http client工具类 + * @author wlh + */ +public class HttpClientUtils { + /** 全局连接池对象 */ + private static final PoolingHttpClientConnectionManager CONN_MANAGER = new PoolingHttpClientConnectionManager(); + + private static Logger logger = LoggerFactory.getLogger(HttpClientUtils.class); + public static final String ERROR_MESSAGE = "-1"; + + /* + * 静态代码块配置连接池信息 + */ + static { + + // 设置最大连接数 + CONN_MANAGER.setMaxTotal(CommonConfig.HTTP_POOL_MAX_CONNECTION); + // 设置每个连接的路由数 + CONN_MANAGER.setDefaultMaxPerRoute(CommonConfig.HTTP_POOL_MAX_PER_ROUTE); + + } + + /** + * 获取Http客户端连接对象 + * @return Http客户端连接对象 + */ + private static CloseableHttpClient getHttpClient() { + // 创建Http请求配置参数 + RequestConfig requestConfig = RequestConfig.custom() + // 获取连接超时时间 + .setConnectionRequestTimeout(CommonConfig.HTTP_POOL_REQUEST_TIMEOUT) + // 请求超时时间 + .setConnectTimeout(CommonConfig.HTTP_POOL_CONNECT_TIMEOUT) + // 响应超时时间 + .setSocketTimeout(CommonConfig.HTTP_POOL_RESPONSE_TIMEOUT) + .build(); + + /* + * 测出超时重试机制为了防止超时不生效而设置 + * 如果直接放回false,不重试 + * 这里会根据情况进行判断是否重试 + */ + HttpRequestRetryHandler retry = (exception, executionCount, context) -> { + if (executionCount >= 3) {// 如果已经重试了3次,就放弃 + return false; + } + if (exception instanceof NoHttpResponseException) {// 如果服务器丢掉了连接,那么就重试 + return true; + } + if (exception instanceof SSLHandshakeException) {// 不要重试SSL握手异常 + return false; + } + if (exception instanceof UnknownHostException) {// 目标服务器不可达 + return false; + } + if (exception instanceof ConnectTimeoutException) {// 连接被拒绝 + return false; + } + if (exception instanceof HttpHostConnectException) {// 连接被拒绝 + return false; + } + if (exception instanceof SSLException) {// ssl握手异常 + return false; + } + if (exception instanceof InterruptedIOException) {// 超时 + return true; + } + HttpClientContext clientContext = HttpClientContext.adapt(context); + HttpRequest request = clientContext.getRequest(); + // 如果请求是幂等的,就再次尝试 + return !(request instanceof HttpEntityEnclosingRequest); + }; + + + ConnectionKeepAliveStrategy myStrategy = (response, context) -> { + HeaderElementIterator it = new BasicHeaderElementIterator + (response.headerIterator(HTTP.CONN_KEEP_ALIVE)); + while (it.hasNext()) { + HeaderElement he = it.nextElement(); + String param = he.getName(); + String value = he.getValue(); + if (value != null && "timeout".equalsIgnoreCase(param)) { + return Long.parseLong(value) * 1000; + } + } + return 60 * 1000;//如果没有约定,则默认定义时长为60s + }; + + // 创建httpClient + return HttpClients.custom() + // 把请求相关的超时信息设置到连接客户端 + .setDefaultRequestConfig(requestConfig) + // 把请求重试设置到连接客户端 + .setRetryHandler(retry) + .setKeepAliveStrategy(myStrategy) + // 配置连接池管理对象 + .setConnectionManager(CONN_MANAGER) + .build(); + } + + + /** + * GET请求 + * + * @param uri 请求地 + * @return message + */ + public static String httpGet(URI uri, Header... headers) { + String msg = ERROR_MESSAGE; + + // 获取客户端连接对象 + CloseableHttpClient httpClient = getHttpClient(); + CloseableHttpResponse response = null; + + try { + logger.info("http get uri {}",uri); + // 创建GET请求对象 + HttpGet httpGet = new HttpGet(uri); + + if (StringUtil.isNotEmpty(headers)) { + for (Header h : headers) { + httpGet.addHeader(h); + logger.info("request header : {}",h); + } + } + // 执行请求 + response = httpClient.execute(httpGet); + int statusCode = response.getStatusLine().getStatusCode(); + // 获取响应实体 + HttpEntity entity = response.getEntity(); + // 获取响应信息 + msg = EntityUtils.toString(entity, "UTF-8"); + + if (statusCode != HttpStatus.SC_OK) { + logger.error("Http get content is :{}" , msg); + } + + } catch (ClientProtocolException e) { + logger.error("协议错误: {}", e.getMessage()); + } catch (ParseException e) { + logger.error("解析错误: {}", e.getMessage()); + } catch (IOException e) { + logger.error("IO错误: {}",e.getMessage()); + } finally { + if (null != response) { + try { + EntityUtils.consume(response.getEntity()); + response.close(); + } catch (IOException e) { + logger.error("释放链接错误: {}", e.getMessage()); + + } + } + } + + return msg; + } + /** + * POST 请求 + * @param uri uri参数 + * @param requestBody 请求体 + * @return post请求返回结果 + */ + public static String httpPost(URI uri, String requestBody, Header... headers) { + String msg = ERROR_MESSAGE; + // 获取客户端连接对象 + CloseableHttpClient httpClient = getHttpClient(); + + // 创建POST请求对象 + CloseableHttpResponse response = null; + try { + + logger.info("http post uri:{}, http post body:{}", uri, requestBody); + + HttpPost httpPost = new HttpPost(uri); + httpPost.setHeader("Content-Type", "application/x-www-form-urlencoded"); + if (StringUtil.isNotEmpty(headers)) { + for (Header h : headers) { + httpPost.addHeader(h); + logger.info("request header : {}",h); + } + } + + if(StringUtil.isNotBlank(requestBody)) { + byte[] bytes = requestBody.getBytes(StandardCharsets.UTF_8); + httpPost.setEntity(new ByteArrayEntity(bytes)); + } + + response = httpClient.execute(httpPost); + int statusCode = response.getStatusLine().getStatusCode(); + // 获取响应实体 + HttpEntity entity = response.getEntity(); + // 获取响应信息 + msg = EntityUtils.toString(entity, "UTF-8"); + + if (statusCode != HttpStatus.SC_OK) { + logger.error("Http post content is :{}" , msg); + } + } catch (ClientProtocolException e) { + logger.error("协议错误: {}", e.getMessage()); + } catch (ParseException e) { + logger.error("解析错误: {}", e.getMessage()); + } catch (IOException e) { + logger.error("IO错误: {}", e.getMessage()); + } finally { + if (null != response) { + try { + EntityUtils.consumeQuietly(response.getEntity()); + response.close(); + } catch (IOException e) { + logger.error("释放链接错误: {}", e.getMessage()); + + } + } + } + return msg; + } + + /** + * 拼装url + * url ,参数map + */ + public static void setUrlWithParams(URIBuilder uriBuilder,String path, Map<String, String> params) { + try { + uriBuilder.setPath(path); + if (params != null && !params.isEmpty()){ + for (Map.Entry<String, String> kv : params.entrySet()) { + uriBuilder.setParameter(kv.getKey(),kv.getValue()); + } + } + } catch (Exception e) { + logger.error("拼接url出错,uri : {}, path : {},参数: {}",uriBuilder.toString(),path,params); + } + } + +} diff --git a/src/main/resources/common.properties b/src/main/resources/common.properties index e84df34..a16eb62 100644 --- a/src/main/resources/common.properties +++ b/src/main/resources/common.properties @@ -59,6 +59,9 @@ data.center.id.num=15 ip.mmdb.path=D:\\data\\dat\\ #ip.mmdb.path=/home/wlh/dos-detection/dat/ +#敏感阈值,速率小于此值不报警 +sensitivity.threshold=100 + #基于baseline判定dos攻击的上下限 baseline.sessions.minor.threshold=0.1 baseline.sessions.warning.threshold=0.5 @@ -69,6 +72,9 @@ baseline.sessions.critical.threshold=8 #获取baseline周期,默认7天 baseline.threshold.schedule.days=7 +#获取静态阈值周期,默认十分钟 +static.threshold.schedule.minutes=10 + #基于目的IP的分区数,默认为10000,一般不变 destination.ip.partition.num=10000 |
