summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authoryinjiangyi <[email protected]>2021-08-03 16:57:18 +0800
committeryinjiangyi <[email protected]>2021-08-03 16:57:18 +0800
commit1d90784aef75c7cdae7c2fa047c2e9a5385797c2 (patch)
tree5d086efa464f85e44b5822e1eb71340f65a0d952 /src
parent03849d5f3f45e04b0060a83df01826a00c929d3b (diff)
fix
Diffstat (limited to 'src')
-rw-r--r--src/META-INF/MANIFEST.MF3
-rw-r--r--src/main/java/cn/mesalab/config/ApplicationConfig.java64
-rw-r--r--src/main/java/cn/mesalab/dao/DruidData.java187
-rw-r--r--src/main/java/cn/mesalab/dao/Impl/ResultSetToListServiceImp.java44
-rw-r--r--src/main/java/cn/mesalab/dao/ResultSetToListService.java24
-rw-r--r--src/main/java/cn/mesalab/main/BaselineApplication.java15
-rw-r--r--src/main/java/cn/mesalab/service/BaselineGeneration.java206
-rw-r--r--src/main/java/cn/mesalab/service/algorithm/KalmanFilter.java90
-rw-r--r--src/main/java/cn/mesalab/utils/ConfigUtils.java45
-rw-r--r--src/main/java/cn/mesalab/utils/DruidUtils.java55
-rw-r--r--src/main/java/cn/mesalab/utils/HbaseUtils.java1
-rw-r--r--src/main/java/cn/mesalab/utils/HttpClientUtils.java485
-rw-r--r--src/main/java/cn/mesalab/utils/SeriesUtils.java212
-rw-r--r--src/main/resources/application.properties68
-rw-r--r--src/main/resources/log4j.properties19
-rw-r--r--src/test/java/cn/mesalab/service/HBaseTest.java92
-rw-r--r--src/test/java/cn/mesalab/utils/HttpClientUtilsTest.java14
17 files changed, 1624 insertions, 0 deletions
diff --git a/src/META-INF/MANIFEST.MF b/src/META-INF/MANIFEST.MF
new file mode 100644
index 0000000..4993e7c
--- /dev/null
+++ b/src/META-INF/MANIFEST.MF
@@ -0,0 +1,3 @@
+Manifest-Version: 1.0
+Main-Class: cn.mesalab.main.BaselineApplication
+
diff --git a/src/main/java/cn/mesalab/config/ApplicationConfig.java b/src/main/java/cn/mesalab/config/ApplicationConfig.java
new file mode 100644
index 0000000..68b8c50
--- /dev/null
+++ b/src/main/java/cn/mesalab/config/ApplicationConfig.java
@@ -0,0 +1,64 @@
+package cn.mesalab.config;
+
+import cn.mesalab.utils.ConfigUtils;
+
+/**
+ * @author yjy
+ * @version 1.0
+ * @date 2021/7/24 10:23 上午
+ */
+public class ApplicationConfig {
+
+ public static final String DRUID_URL= ConfigUtils.getStringProperty("druid.url");
+ public static final String DRUID_DRIVER = ConfigUtils.getStringProperty("druid.driver");
+ public static final String DRUID_TABLE = ConfigUtils.getStringProperty("druid.table");
+
+
+ public static final Integer DRUID_TIME_LIMIT_TYPE = ConfigUtils.getIntProperty("read.druid.time.limit.type");
+ public static final Long READ_DRUID_MAX_TIME = ConfigUtils.getLongProperty("read.druid.max.time");
+ public static final Long READ_DRUID_MIN_TIME = ConfigUtils.getLongProperty("read.druid.min.time");
+
+ public static final Integer READ_HISTORICAL_DAYS = ConfigUtils.getIntProperty("read.historical.days");
+ public static final Integer HISTORICAL_GRAD = ConfigUtils.getIntProperty("historical.grad");
+ public static final String TIME_FORMAT = ConfigUtils.getStringProperty("time.format");
+ public static final String BASELINE_METRIC_TYPE = ConfigUtils.getStringProperty("baseline.metric.type");
+
+ public static final String DRUID_ATTACKTYPE_TCP_SYN_FLOOD = ConfigUtils.getStringProperty("druid.attacktype.tcpsynflood");
+ public static final String DRUID_ATTACKTYPE_UDP_FLOOD = ConfigUtils.getStringProperty("druid.attacktype.udpflood");
+ public static final String DRUID_ATTACKTYPE_ICMP_FLOOD = ConfigUtils.getStringProperty("druid.attacktype.icmpflood");
+ public static final String DRUID_ATTACKTYPE_DNS_AMPL = ConfigUtils.getStringProperty("druid.attacktype.dnsamplification");
+ public static final String DRUID_SERVERIP_COLUMN_NAME = ConfigUtils.getStringProperty("druid.serverip.columnname");
+ public static final String DRUID_ATTACKTYPE_COLUMN_NAME = ConfigUtils.getStringProperty("druid.attacktype.columnname");
+ public static final String DRUID_RECVTIME_COLUMN_NAME = ConfigUtils.getStringProperty("druid.recvtime.columnname");
+
+ public static final float BASELINE_PERIOD_CORR_THRE = ConfigUtils.getFloatProperty("baseline.period.correlative.threshold");
+ public static final float BASELINE_HISTORICAL_RATIO = ConfigUtils.getFloatProperty("baseline.historical.ratio.threshold");
+ public static final float BASELINE_SPARSE_FILL_PERCENTILE = ConfigUtils.getFloatProperty("baseline.historical.sparse.fill.percentile");
+ public static final String BASELINE_FUNCTION = ConfigUtils.getStringProperty("baseline.function");
+ public static final Integer BASELINE_RANGE_DAYS = ConfigUtils.getIntProperty("baseline.range.days");
+ public static final float BASELINE_RATIONAL_PERCENTILE = ConfigUtils.getFloatProperty("baseline.rational.percentile");
+
+
+ public static final String HBASE_TABLE = ConfigUtils.getStringProperty("hbase.table");
+ public static final String HBASE_ZOOKEEPER_QUORUM= ConfigUtils.getStringProperty("hbase.zookeeper.quorum");
+ public static final String HBASE_ZOOKEEPER_CLIENT_PORT= ConfigUtils.getStringProperty("hbase.zookeeper.client.port");
+
+
+ public static final Double BASELINE_KALMAN_Q = ConfigUtils.getDoubleProperty("baseline.kalman.q");
+ public static final Double BASELINE_KALMAN_R = ConfigUtils.getDoubleProperty("baseline.kalman.r");
+
+ public static final Integer LOG_WRITE_COUNT = ConfigUtils.getIntProperty("log.write.count");
+ public static final Integer GENERATE_BATCH_SIZE = ConfigUtils.getIntProperty("generate.batch.size");
+
+
+
+ // http config
+
+ public static final Integer HTTP_REQUEST_TIMEOUT = ConfigUtils.getIntProperty("http.request.timeout");
+ public static final Integer HTTP_RESPONSE_TIMEOUT = ConfigUtils.getIntProperty("http.response.timeout");
+ public static final Integer HTTP_CONNECTION_TIMEOUT = ConfigUtils.getIntProperty("http.connection.timeout");
+ public static final Integer HTTP_MAX_CONNECTION_NUM = ConfigUtils.getIntProperty("http.max.connection.num");
+ public static final Integer HTTP_MAX_PER_ROUTE = ConfigUtils.getIntProperty("http.max.per.route");
+
+}
+
diff --git a/src/main/java/cn/mesalab/dao/DruidData.java b/src/main/java/cn/mesalab/dao/DruidData.java
new file mode 100644
index 0000000..ec28278
--- /dev/null
+++ b/src/main/java/cn/mesalab/dao/DruidData.java
@@ -0,0 +1,187 @@
+package cn.mesalab.dao;
+
+import cn.mesalab.config.ApplicationConfig;
+import cn.mesalab.dao.Impl.ResultSetToListServiceImp;
+import cn.mesalab.utils.DruidUtils;
+import io.vavr.Tuple;
+import io.vavr.Tuple2;
+import org.apache.calcite.avatica.AvaticaConnection;
+import org.apache.calcite.avatica.AvaticaStatement;
+import org.apache.commons.lang.StringUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.util.ArrayList;
+import java.util.Calendar;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+
+
+/**
+ * @author yjy
+ * @version 1.0
+ * Druid 数据库操作
+ * @date 2021/7/23 4:56 下午
+ */
+public class DruidData {
+
+ private static final Logger LOG = LoggerFactory.getLogger(DruidData.class);
+ private static DruidData druidData;
+ private AvaticaConnection connection;
+ private AvaticaStatement statement;
+ private String timeFilter = ApplicationConfig.DRUID_RECVTIME_COLUMN_NAME
+ + " >= MILLIS_TO_TIMESTAMP(" + getTimeLimit()._2
+ + ") AND " + ApplicationConfig.DRUID_RECVTIME_COLUMN_NAME
+ + " < MILLIS_TO_TIMESTAMP(" + getTimeLimit()._1 + ")";
+
+
+ {
+ connectionInit();
+ }
+
+ /**
+ * 连接初始化
+ */
+ private void connectionInit(){
+ try {
+ connection = DruidUtils.getConn();
+ statement = connection.createStatement();
+ statement.setQueryTimeout(0);
+
+ } catch (SQLException exception) {
+ exception.printStackTrace();
+ }
+ }
+
+ /**
+ * 获取实例
+ * @return DruidData实例
+ */
+ public static DruidData getInstance() {
+ druidData = new DruidData();
+ return druidData;
+ }
+
+ /**
+ * 获取distinct server ip
+ * @return ArrayList<String> ip列表
+ */
+ public ArrayList<String> getServerIpList() {
+ Long startQueryIpLIstTime = System.currentTimeMillis();
+ ArrayList<String> serverIps = new ArrayList<String>();
+ String sql = "SELECT distinct " + ApplicationConfig.DRUID_SERVERIP_COLUMN_NAME
+ + " FROM " + ApplicationConfig.DRUID_TABLE
+ + " WHERE " + timeFilter
+ + " LIMIT 1000";// FOR TEST
+ try{
+ ResultSet resultSet = DruidUtils.executeQuery(statement,sql);
+ while(resultSet.next()){
+ String ip = resultSet.getString(ApplicationConfig.DRUID_SERVERIP_COLUMN_NAME);
+ serverIps.add(ip);
+ }
+ } catch (Exception e){
+ e.printStackTrace();
+ }
+ Long endQueryIpListTime = System.currentTimeMillis();
+ LOG.info("性能测试:ip list查询耗时——"+(endQueryIpListTime-startQueryIpLIstTime));
+
+ return serverIps;
+ }
+
+ /**
+ * 从Druid读取目标IP相关数据
+ * @param ipList ip列表
+ * @return 数据库读取结果
+ */
+ public List<Map<String, Object>> readFromDruid(List<String> ipList){
+ List<Map<String, Object>> rsList = null;
+ ipList = ipList.stream().map( ip -> "\'"+ip+"\'").collect(Collectors.toList());
+ String ipString = "(" + StringUtils.join(ipList, ",").toString() + ")";
+ String sql = "SELECT "+ ApplicationConfig.DRUID_SERVERIP_COLUMN_NAME
+ + ", "+ ApplicationConfig.DRUID_ATTACKTYPE_COLUMN_NAME
+ + ", "+ ApplicationConfig.BASELINE_METRIC_TYPE
+ + ", " + ApplicationConfig.DRUID_RECVTIME_COLUMN_NAME
+ + " FROM " + ApplicationConfig.DRUID_TABLE
+ + " WHERE " + ApplicationConfig.DRUID_SERVERIP_COLUMN_NAME
+ + " IN " + ipString
+ + " AND " + timeFilter;
+ try{
+ ResultSet resultSet = DruidUtils.executeQuery(statement, sql);
+ ResultSetToListService service = new ResultSetToListServiceImp();
+ rsList = service.selectAll(resultSet);
+ } catch (Exception e){
+ e.printStackTrace();
+ }
+ return rsList;
+ }
+
+ /**
+ * 从数据库读取结果中筛选指定ip的指定攻击类型的数据
+ * @param allData 数据库读取结果
+ * @param ip 指定ip
+ * @param attackType 指定攻击类型
+ * @return 筛选结果
+ */
+ public List<Map<String, Object>> getTimeSeriesData(List<Map<String, Object>> allData, String ip, String attackType){
+ List<Map<String, Object>> rsList = new ArrayList<>();
+ try{
+ rsList = allData.stream().
+ filter(i->((i.get(ApplicationConfig.DRUID_SERVERIP_COLUMN_NAME).equals(ip))
+ )&&(i.get(ApplicationConfig.DRUID_ATTACKTYPE_COLUMN_NAME).equals(attackType)))
+ .collect(Collectors.toList());
+ } catch (NullPointerException e){
+ }
+ return rsList;
+ }
+
+ /**
+ * 计算查询时间范围,可指定时间范围(测试)或使用默认配置
+ * @return 时间范围起始点和终止点
+ */
+ public Tuple2<Long, Long> getTimeLimit(){
+ long maxTime = 0L;
+ long minTime = 0L;
+ switch(ApplicationConfig.DRUID_TIME_LIMIT_TYPE){
+ case 0:
+ maxTime = getCurrentDay();
+ minTime = getCurrentDay(-ApplicationConfig.READ_HISTORICAL_DAYS);
+ break;
+ case 1:
+ maxTime = ApplicationConfig.READ_DRUID_MAX_TIME;
+ minTime = ApplicationConfig.READ_DRUID_MIN_TIME;
+ break;
+ default:
+ LOG.warn("没有设置Druid数据读取方式");
+ }
+ return Tuple.of(maxTime, minTime);
+ }
+
+ private long getCurrentDay(int bias) {
+ Calendar calendar = Calendar.getInstance();
+ calendar.set(Calendar.DAY_OF_YEAR, calendar.get(Calendar.DAY_OF_YEAR) + bias);
+ calendar.set(Calendar.HOUR_OF_DAY, 0);
+ calendar.set(Calendar.HOUR_OF_DAY, 0);
+ calendar.set(Calendar.MINUTE, 0);
+ calendar.set(Calendar.SECOND, 0);
+ calendar.set(Calendar.MILLISECOND, 0);
+ return calendar.getTimeInMillis();
+ }
+
+ private long getCurrentDay(){
+ return getCurrentDay(0);
+ }
+
+ /**
+ * 关闭当前DruidData
+ */
+ public void closeConn(){
+ try {
+ DruidUtils.closeConnection();
+ } catch (SQLException exception) {
+ exception.printStackTrace();
+ }
+ }
+}
diff --git a/src/main/java/cn/mesalab/dao/Impl/ResultSetToListServiceImp.java b/src/main/java/cn/mesalab/dao/Impl/ResultSetToListServiceImp.java
new file mode 100644
index 0000000..7867353
--- /dev/null
+++ b/src/main/java/cn/mesalab/dao/Impl/ResultSetToListServiceImp.java
@@ -0,0 +1,44 @@
+package cn.mesalab.dao.Impl;
+
+import cn.mesalab.dao.ResultSetToListService;
+
+import java.sql.ResultSet;
+import java.sql.ResultSetMetaData;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * @author yjy
+ * @version 1.0
+ * @date 2021/7/24 4:29 下午
+ */
+public class ResultSetToListServiceImp implements ResultSetToListService {
+
+ /**
+ * SELECT 查询记录以List结构返回,每一个元素是一条记录
+ * 每条记录保存在Map<String, Object>里面,String类型指字段名字,Object对应字段值
+ *
+ * @param rs
+ * @return List<Map<String, Object>>
+ */
+ @Override
+ public List<Map<String, Object>> selectAll(ResultSet rs) {
+ List<Map<String, Object>> list = new ArrayList<Map<String, Object>>();
+ try {
+ ResultSetMetaData rmd = rs.getMetaData();
+ int columnCount = rmd.getColumnCount();
+ while (rs.next()) {
+ Map<String, Object> rowData = new HashMap<String, Object>();
+ for (int i = 1; i <= columnCount; ++i) {
+ rowData.put(rmd.getColumnName(i), rs.getObject(i));
+ }
+ list.add(rowData);
+ }
+ } catch (Exception ex) {
+ ex.printStackTrace();
+ }
+ return list;
+ }
+} \ No newline at end of file
diff --git a/src/main/java/cn/mesalab/dao/ResultSetToListService.java b/src/main/java/cn/mesalab/dao/ResultSetToListService.java
new file mode 100644
index 0000000..103e330
--- /dev/null
+++ b/src/main/java/cn/mesalab/dao/ResultSetToListService.java
@@ -0,0 +1,24 @@
+package cn.mesalab.dao;
+
+import java.sql.ResultSet;
+import java.util.List;
+import java.util.Map;
+
+
+/**
+ * @author yjy
+ * @version 1.0
+ * @date 2021/7/24 4:27 下午
+ */
+public interface ResultSetToListService {
+ /**
+ * SELECT * FROM websites
+ * 查询所有记录,以List返回
+ * list对象的每一个元素都是一条记录
+ * 每条记录保存在Map<String, Object>里面,String类型指字段名字,Object对应字段值
+ *
+ * @param rs
+ * @return List<Map < String, Object>>
+ */
+ public List<Map<String, Object>> selectAll(ResultSet rs);
+} \ No newline at end of file
diff --git a/src/main/java/cn/mesalab/main/BaselineApplication.java b/src/main/java/cn/mesalab/main/BaselineApplication.java
new file mode 100644
index 0000000..8bd6f13
--- /dev/null
+++ b/src/main/java/cn/mesalab/main/BaselineApplication.java
@@ -0,0 +1,15 @@
+package cn.mesalab.main;
+
+import cn.mesalab.service.BaselineGeneration;
+import sun.rmi.runtime.Log;
+
+/**
+ * @author yjy
+ * @version 1.0
+ * @date 2021/7/23 5:34 下午
+ */
+public class BaselineApplication {
+ public static void main(String[] args) {
+ BaselineGeneration.perform();
+ }
+}
diff --git a/src/main/java/cn/mesalab/service/BaselineGeneration.java b/src/main/java/cn/mesalab/service/BaselineGeneration.java
new file mode 100644
index 0000000..e72a0e6
--- /dev/null
+++ b/src/main/java/cn/mesalab/service/BaselineGeneration.java
@@ -0,0 +1,206 @@
+package cn.mesalab.service;
+
+import cn.mesalab.config.ApplicationConfig;
+import cn.mesalab.dao.DruidData;
+import cn.mesalab.service.algorithm.KalmanFilter;
+import cn.mesalab.utils.HbaseUtils;
+import cn.mesalab.utils.SeriesUtils;
+import com.google.common.collect.Lists;
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+import org.apache.commons.math3.stat.StatUtils;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.client.Table;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.*;
+import java.util.concurrent.*;
+import java.util.stream.Collectors;
+
+/**
+ * @author yjy
+ * @version 1.0
+ * baseline生成及写入
+ * @date 2021/7/23 5:38 下午
+ */
+public class BaselineGeneration {
+ private static final Logger LOG = LoggerFactory.getLogger(BaselineGeneration.class);
+
+ private static DruidData druidData;
+ private static HbaseUtils hbaseUtils;
+ private static Table hbaseTable;
+ private static List<Map<String, Object>> batchDruidData = new ArrayList<>();
+
+ private static List<String> attackTypeList = Arrays.asList(
+ ApplicationConfig.DRUID_ATTACKTYPE_TCP_SYN_FLOOD,
+ ApplicationConfig.DRUID_ATTACKTYPE_ICMP_FLOOD,
+ ApplicationConfig.DRUID_ATTACKTYPE_UDP_FLOOD,
+ ApplicationConfig.DRUID_ATTACKTYPE_DNS_AMPL
+ );
+ private static final Integer BASELINE_POINT_NUM =
+ ApplicationConfig.BASELINE_RANGE_DAYS * 24 * (60/ApplicationConfig.HISTORICAL_GRAD);
+
+ /**
+ * 程序执行
+ */
+ public static void perform() {
+ long start = System.currentTimeMillis();
+
+ druidData = DruidData.getInstance();
+ hbaseUtils = HbaseUtils.getInstance();
+ hbaseTable = hbaseUtils.getHbaseTable();
+ LOG.info("Druid 成功建立连接");
+
+ try{
+ // baseline生成并写入
+ generateBaselinesThread();
+
+ long last = System.currentTimeMillis();
+ LOG.warn("运行时间:" + (last - start));
+
+ druidData.closeConn();
+ hbaseTable.close();
+ LOG.info("Druid 关闭连接");
+
+ } catch (Exception e){
+ e.printStackTrace();
+ }
+ System.exit(0);
+ }
+
+ /**
+ * 多线程baseline生成入口
+ * @throws InterruptedException
+ */
+ private static void generateBaselinesThread() throws InterruptedException {
+ int threadNum = Runtime.getRuntime().availableProcessors();
+
+ ThreadFactory namedThreadFactory = new ThreadFactoryBuilder()
+ .setNameFormat("baseline-demo-%d").build();
+
+ // 创建线程池
+ ThreadPoolExecutor executor = new ThreadPoolExecutor(
+ threadNum,
+ threadNum,
+ 0L,
+ TimeUnit.MILLISECONDS,
+ new LinkedBlockingQueue<>(1024),
+ namedThreadFactory,
+ new ThreadPoolExecutor.AbortPolicy());
+
+ // IP列表获取
+ ArrayList<String> destinationIps = druidData.getServerIpList();
+
+ LOG.info("共查询到服务端ip " +destinationIps.size() + " 个");
+ LOG.info("Baseline batch 大小: " + ApplicationConfig.GENERATE_BATCH_SIZE);
+
+ // 分批进行IP baseline生成和处理
+ List<List<String>> batchIpLists = Lists.partition(destinationIps, ApplicationConfig.GENERATE_BATCH_SIZE);
+ for (List<String> batchIps: batchIpLists){
+ if(batchIps.size()>0){
+ executor.execute(() -> generateBaselines(batchIps));
+ }
+ }
+
+ executor.shutdown();
+ executor.awaitTermination(10L, TimeUnit.HOURS);
+ }
+
+ /**
+ * 批量生成IP baseline
+ * @param ipList ip列表
+ */
+ public static void generateBaselines(List<String> ipList){
+ druidData = DruidData.getInstance();
+ batchDruidData = druidData.readFromDruid(ipList);
+
+ List<Put> putList = new ArrayList<>();
+ for(String attackType: attackTypeList){
+ for(String ip: ipList){
+ int[] ipBaseline = generateSingleIpBaseline(ip, attackType);
+ if (ipBaseline!= null){
+ putList = hbaseUtils.cachedInPut(putList, ip, ipBaseline, attackType, ApplicationConfig.BASELINE_METRIC_TYPE);
+ }
+ }
+ }
+
+ try {
+ hbaseTable.put(putList);
+ LOG.info("Baseline 线程 " + Thread.currentThread().getId() + " 成功写入Baseline条数共计 " + putList.size());
+ } catch (IOException e) {
+ e.printStackTrace();
+ }
+
+ druidData.closeConn();
+ }
+
+ /**
+ * 单ip baseline生成逻辑
+ * @param ip ip
+ * @param attackType 攻击类型
+ * @return baseline序列,长度为 60/HISTORICAL_GRAD*24
+ */
+ private static int[] generateSingleIpBaseline(String ip, String attackType){
+ // 查询
+ List<Map<String, Object>> originSeries = druidData.getTimeSeriesData(batchDruidData, ip, attackType);
+
+ if (originSeries.size()==0){
+ return null;
+ }
+
+ // 时间序列缺失值补0
+ List<Map<String, Object>> completSeries = SeriesUtils.complementSeries(originSeries);
+
+ int[] baselineArr = new int[BASELINE_POINT_NUM];
+ List<Integer>series = completSeries.stream().map(
+ i -> Integer.valueOf(i.get(ApplicationConfig.BASELINE_METRIC_TYPE).toString())).collect(Collectors.toList());
+
+ // 判断ip出现频率
+ if(originSeries.size()/(float)completSeries.size()>ApplicationConfig.BASELINE_HISTORICAL_RATIO){
+ // 高频率
+ double percentile = StatUtils.percentile(series.stream().mapToDouble(Double::valueOf).toArray(),
+ ApplicationConfig.BASELINE_SPARSE_FILL_PERCENTILE);
+ Arrays.fill(baselineArr, (int)percentile);
+ baselineArr = baselineFunction(series);
+
+ } else {
+ // 判断周期性
+ if (SeriesUtils.isPeriod(series)){
+ baselineArr = baselineFunction(series);
+ } else {
+ int ipPercentile = SeriesUtils.percentile(
+ originSeries.stream().map(i ->
+ Integer.valueOf(i.get(ApplicationConfig.BASELINE_METRIC_TYPE).toString())).collect(Collectors.toList()),
+ ApplicationConfig.BASELINE_RATIONAL_PERCENTILE);
+ Arrays.fill(baselineArr, ipPercentile);
+ }
+ }
+
+ return baselineArr;
+ }
+
+ /**
+ * baseline 生成算法
+ * @param timeSeries 输入序列
+ * @return 输出序列
+ */
+ private static int[] baselineFunction(List<Integer> timeSeries){
+ int[] result;
+ switch (ApplicationConfig.BASELINE_FUNCTION){
+ case "KalmanFilter":
+ KalmanFilter kalmanFilter = new KalmanFilter();
+ kalmanFilter.forcast(timeSeries, BASELINE_POINT_NUM);
+ result = kalmanFilter.getForecastSeries().stream().mapToInt(Integer::valueOf).toArray();
+ break;
+ default:
+ result = timeSeries.subList(0, BASELINE_POINT_NUM).stream().mapToInt(Integer::valueOf).toArray();
+ }
+ return result;
+ }
+
+ public static void main(String[] args) {
+ perform();
+ }
+
+}
diff --git a/src/main/java/cn/mesalab/service/algorithm/KalmanFilter.java b/src/main/java/cn/mesalab/service/algorithm/KalmanFilter.java
new file mode 100644
index 0000000..11a40c3
--- /dev/null
+++ b/src/main/java/cn/mesalab/service/algorithm/KalmanFilter.java
@@ -0,0 +1,90 @@
+package cn.mesalab.service.algorithm;
+
+import cn.mesalab.config.ApplicationConfig;
+
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * @author yjy
+ * @version 1.0
+ * kalman滤波器
+ * @date 2021/7/25 1:42 下午
+ */
+
+public class KalmanFilter {
+ private Integer predict;
+ private Integer current;
+ private Integer estimate;
+ private double pdelt;
+ private double mdelt;
+ private double Gauss;
+ private double kalmanGain;
+ private final static double Q = ApplicationConfig.BASELINE_KALMAN_Q;
+ private final static double R = ApplicationConfig.BASELINE_KALMAN_R;
+
+ public KalmanFilter() {
+ initial();
+ }
+
+ public void initial(){
+ // TODO 调整
+ pdelt = 1;
+ mdelt = 1;
+ }
+
+ private ArrayList<Integer> smoothSeries;
+ private ArrayList<Integer> forecastSeries;
+
+ public Integer calSingleKalPoint(Integer oldValue, Integer value){
+ //第一个估计值
+ predict = oldValue;
+ current = value;
+ //高斯噪声方差
+ Gauss = Math.sqrt(pdelt * pdelt + mdelt * mdelt) + Q;
+ //估计方差
+ kalmanGain = Math.sqrt((Gauss * Gauss)/(Gauss * Gauss + pdelt * pdelt)) + R;
+ //估计值
+ estimate = (int) (kalmanGain * (current - predict) + predict);
+ //新的估计方差
+ mdelt = Math.sqrt((1-kalmanGain) * Gauss * Gauss);
+
+ return estimate;
+ }
+
+
+ public void forcast(List<Integer> historicalSeries, Integer length){
+ int oldvalue = (historicalSeries.stream().mapToInt(Integer::intValue).sum())/historicalSeries.size();
+ smoothSeries = new ArrayList<Integer>();
+ for(int i = 0; i < historicalSeries.size(); i++){
+ int value = historicalSeries.get(i);
+ oldvalue = calSingleKalPoint(oldvalue,value);
+ smoothSeries.add(oldvalue);
+ }
+
+ forecastSeries = new ArrayList<>();
+ Integer partitonNum = historicalSeries.size()/length;
+ for(int i = 0; i<length; i++){
+ long sum = 0;
+ for (int period=0; period<partitonNum; period++){
+ sum += smoothSeries.get(length*period+i);
+ }
+ forecastSeries.add((int)sum/partitonNum);
+ }
+ }
+
+ public ArrayList<Integer> getSmoothSeries() {
+ return smoothSeries;
+ }
+
+ public ArrayList<Integer> getAllRangeSeries() {
+ ArrayList<Integer> results = new ArrayList<>();
+ results.addAll(smoothSeries);
+ results.addAll(forecastSeries);
+ return results;
+ }
+
+ public ArrayList<Integer> getForecastSeries() {
+ return forecastSeries;
+ }
+}
diff --git a/src/main/java/cn/mesalab/utils/ConfigUtils.java b/src/main/java/cn/mesalab/utils/ConfigUtils.java
new file mode 100644
index 0000000..718648b
--- /dev/null
+++ b/src/main/java/cn/mesalab/utils/ConfigUtils.java
@@ -0,0 +1,45 @@
+package cn.mesalab.utils;
+
+
+import org.apache.log4j.Logger;
+
+import java.util.Properties;
+
+public class ConfigUtils {
+ private static final Logger LOG = Logger.getLogger(ConfigUtils.class);
+ private static Properties propCommon = new Properties();
+
+ public static String getStringProperty(String key) {
+ return propCommon.getProperty(key);
+ }
+ public static Float getFloatProperty(String key) {
+ return Float.parseFloat(propCommon.getProperty(key));
+ }
+
+
+ public static Integer getIntProperty(String key) {
+ return Integer.parseInt(propCommon.getProperty(key));
+ }
+
+ public static Long getLongProperty(String key) {
+ return Long.parseLong(propCommon.getProperty(key));
+ }
+
+ public static Double getDoubleProperty(String key) {
+ return Double.parseDouble(propCommon.getProperty(key));
+ }
+
+ public static Boolean getBooleanProperty(String key) {
+ return "true".equals(propCommon.getProperty(key).toLowerCase().trim());
+ }
+
+ static {
+ try {
+ propCommon.load(ConfigUtils.class.getClassLoader().getResourceAsStream("application.properties"));
+
+ } catch (Exception e) {
+ propCommon = null;
+ LOG.error("配置加载失败");
+ }
+ }
+}
diff --git a/src/main/java/cn/mesalab/utils/DruidUtils.java b/src/main/java/cn/mesalab/utils/DruidUtils.java
new file mode 100644
index 0000000..8224d37
--- /dev/null
+++ b/src/main/java/cn/mesalab/utils/DruidUtils.java
@@ -0,0 +1,55 @@
+package cn.mesalab.utils;
+
+import cn.mesalab.config.ApplicationConfig;
+import org.apache.calcite.avatica.AvaticaConnection;
+import org.apache.calcite.avatica.AvaticaStatement;
+import org.apache.hadoop.hbase.client.Table;
+
+import java.sql.DriverManager;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.util.Properties;
+
+/**
+ * @author yjy
+ * @version 1.0
+ * @date 2021/7/23 4:50 下午
+ */
+public class DruidUtils {
+ private static ThreadLocal<AvaticaConnection> threadLocal = new ThreadLocal<AvaticaConnection>();
+
+ private static final String DRUID_URL = ApplicationConfig.DRUID_URL;
+ private static AvaticaStatement statement = null;
+
+ /**
+ * 打开连接
+ * @throws SQLException
+ */
+ public static AvaticaConnection getConn() throws SQLException {
+ Properties properties = new Properties();
+ properties.setProperty("connectTimeout", String.valueOf(10*60*60));
+ AvaticaConnection connection = (AvaticaConnection) DriverManager.getConnection(DRUID_URL, properties);
+ threadLocal.set(connection);
+ return connection;
+ }
+
+ /**
+ * 关闭连接
+ */
+ public static void closeConnection() throws SQLException{
+ AvaticaConnection conn = threadLocal.get();
+ if(conn != null){
+ conn.close();
+ threadLocal.remove();
+ }
+ }
+
+ /**
+ * 根据sql查询结果
+ */
+ public static ResultSet executeQuery (AvaticaStatement statement, String sql) throws SQLException{
+ ResultSet resultSet = statement.executeQuery(sql);
+ return resultSet;
+ }
+
+}
diff --git a/src/main/java/cn/mesalab/utils/HbaseUtils.java b/src/main/java/cn/mesalab/utils/HbaseUtils.java
new file mode 100644
index 0000000..86123bb
--- /dev/null
+++ b/src/main/java/cn/mesalab/utils/HbaseUtils.java
@@ -0,0 +1 @@
+package cn.mesalab.utils; import cn.mesalab.config.ApplicationConfig; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.HBaseConfiguration; import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.client.Connection; import org.apache.hadoop.hbase.client.ConnectionFactory; import org.apache.hadoop.hbase.client.Put; import org.apache.hadoop.hbase.client.Table; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.io.ArrayWritable; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.Writable; import org.apache.hadoop.io.WritableUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.io.IOException; import java.util.ArrayList; import java.util.List; /** * @author yjy * @version 1.0 * @date 2021/7/23 4:56 下午 */ public class HbaseUtils { private static final Logger LOG = LoggerFactory.getLogger(HbaseUtils.class); private static HbaseUtils hbaseUtils; static { hbaseUtils = HbaseUtils.getInstance(); } public static HbaseUtils getInstance(){ if (hbaseUtils == null) { hbaseUtils = new HbaseUtils(); } return hbaseUtils; } public Table getHbaseTable(){ Table hbaseTable = null; try{ Configuration config = HBaseConfiguration.create(); config.set(HConstants.ZOOKEEPER_QUORUM, ApplicationConfig.HBASE_ZOOKEEPER_QUORUM); config.set(HConstants.ZOOKEEPER_CLIENT_PORT, ApplicationConfig.HBASE_ZOOKEEPER_CLIENT_PORT); TableName tableName = TableName.valueOf(ApplicationConfig.HBASE_TABLE); Connection conn = ConnectionFactory.createConnection(config); hbaseTable = conn.getTable(tableName); } catch (IOException e){ LOG.error("HBase 创建HBase table失败!"); e.printStackTrace(); } return hbaseTable; } public List<Put> cachedInPut(List<Put> putList, String ip, int[] baseline, String attackType, String metricType){ Put rowPut = new Put(Bytes.toBytes(ip)); // FOR TEST // start if(attackType.equals(ApplicationConfig.DRUID_ATTACKTYPE_TCP_SYN_FLOOD)){ attackType = "TCP SYN Flood"; } else if(attackType.equals(ApplicationConfig.DRUID_ATTACKTYPE_UDP_FLOOD)){ attackType = "UDP Flood"; } else if(attackType.equals(ApplicationConfig.DRUID_ATTACKTYPE_ICMP_FLOOD)){ attackType = "ICMP Flood"; } else { attackType = "DNS Amplification"; } // end rowPut.addColumn( Bytes.toBytes(attackType), Bytes.toBytes(metricType), WritableUtils.toByteArray(toWritable(baseline))); putList.add(rowPut); return putList; } private static Writable toWritable(int[] arr) { Writable[] content = new Writable[arr.length]; for (int i = 0; i < content.length; i++) { content[i] = new IntWritable(arr[i]); } return new ArrayWritable(IntWritable.class, content); } public static ArrayList<Integer> fromWritable(ArrayWritable writable) { Writable[] writables = ((ArrayWritable) writable).get(); ArrayList<Integer> list = new ArrayList<Integer>(writables.length); for (Writable wrt : writables) { list.add(((IntWritable)wrt).get()); } return list; } } \ No newline at end of file
diff --git a/src/main/java/cn/mesalab/utils/HttpClientUtils.java b/src/main/java/cn/mesalab/utils/HttpClientUtils.java
new file mode 100644
index 0000000..b5b7382
--- /dev/null
+++ b/src/main/java/cn/mesalab/utils/HttpClientUtils.java
@@ -0,0 +1,485 @@
+package cn.mesalab.utils;
+
+import cn.mesalab.config.ApplicationConfig;
+import com.google.common.collect.Maps;
+import org.apache.http.*;
+import com.zdjizhi.utils.StringUtil;
+import org.apache.http.client.ClientProtocolException;
+import org.apache.http.client.HttpRequestRetryHandler;
+import org.apache.http.client.config.RequestConfig;
+import org.apache.http.client.methods.CloseableHttpResponse;
+import org.apache.http.client.methods.HttpDelete;
+import org.apache.http.client.methods.HttpGet;
+import org.apache.http.client.methods.HttpPost;
+import org.apache.http.client.protocol.HttpClientContext;
+import org.apache.http.conn.ConnectTimeoutException;
+import org.apache.http.conn.ConnectionKeepAliveStrategy;
+import org.apache.http.entity.ByteArrayEntity;
+import org.apache.http.impl.client.CloseableHttpClient;
+import org.apache.http.impl.client.HttpClients;
+import org.apache.http.impl.conn.PoolingHttpClientConnectionManager;
+import org.apache.http.message.BasicHeaderElementIterator;
+import org.apache.http.protocol.HTTP;
+import org.apache.http.util.EntityUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.PostConstruct;
+import javax.net.ssl.SSLException;
+import javax.net.ssl.SSLHandshakeException;
+import java.io.IOException;
+import java.io.InterruptedIOException;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.net.URL;
+import java.net.UnknownHostException;
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * @author yjy
+ * @version 1.0
+ * @date 2021/8/3 3:57 下午
+ */
+
+public class HttpClientUtils {
+ private static final Logger LOG = LoggerFactory.getLogger(HttpClientUtils.class);
+
+ //全局连接池对象
+ private PoolingHttpClientConnectionManager connectionManager;
+
+ /**
+ * 初始化连接池信息
+ */
+ @PostConstruct
+ public void initConnectionManager() {
+ if (connectionManager == null) {
+ connectionManager = new PoolingHttpClientConnectionManager();
+ // 整个连接池最大连接数
+ connectionManager.setMaxTotal(ApplicationConfig.HTTP_MAX_CONNECTION_NUM);
+ // 每路由最大连接数,默认值是2
+ connectionManager.setDefaultMaxPerRoute(ApplicationConfig.HTTP_MAX_PER_ROUTE);
+ }
+ LOG.info("Initializing PoolingHttpClientConnectionManager Complete");
+ }
+
+ /**
+ * 获取Http客户端连接对象
+ *
+ * @param socketTimeOut 响应超时时间
+ * @return Http客户端连接对象
+ */
+ public CloseableHttpClient getHttpClient(int socketTimeOut) {
+ // 创建Http请求配置参数
+ RequestConfig requestConfig = RequestConfig.custom()
+ // 获取连接超时时间
+ .setConnectionRequestTimeout(ApplicationConfig.HTTP_CONNECTION_TIMEOUT)
+ // 请求超时时间
+ .setConnectTimeout(ApplicationConfig.HTTP_REQUEST_TIMEOUT)
+ // 响应超时时间
+ .setSocketTimeout(socketTimeOut)
+ .build();
+
+ /**
+ * 测出超时重试机制为了防止超时不生效而设置
+ * 如果直接放回false,不重试
+ * 这里会根据情况进行判断是否重试
+ */
+ HttpRequestRetryHandler retry = (exception, executionCount, context) -> {
+ if (executionCount >= 3) {// 如果已经重试了3次,就放弃
+ return false;
+ }
+ if (exception instanceof NoHttpResponseException) {// 如果服务器丢掉了连接,那么就重试
+ return true;
+ }
+ if (exception instanceof SSLHandshakeException) {// 不要重试SSL握手异常
+ return false;
+ }
+ if (exception instanceof InterruptedIOException) {// 超时
+ return true;
+ }
+ if (exception instanceof UnknownHostException) {// 目标服务器不可达
+ return false;
+ }
+ if (exception instanceof ConnectTimeoutException) {// 连接被拒绝
+ return false;
+ }
+ if (exception instanceof SSLException) {// ssl握手异常
+ return false;
+ }
+ HttpClientContext clientContext = HttpClientContext.adapt(context);
+ HttpRequest request = clientContext.getRequest();
+ // 如果请求是幂等的,就再次尝试
+ if (!(request instanceof HttpEntityEnclosingRequest)) {
+ return true;
+ }
+ return false;
+ };
+
+
+ ConnectionKeepAliveStrategy myStrategy = (response, context) -> {
+ HeaderElementIterator it = new BasicHeaderElementIterator
+ (response.headerIterator(HTTP.CONN_KEEP_ALIVE));
+ while (it.hasNext()) {
+ HeaderElement he = it.nextElement();
+ String param = he.getName();
+ String value = he.getValue();
+ if (value != null && param.equalsIgnoreCase("timeout")) {
+ return Long.parseLong(value) * 1000;
+ }
+ }
+ return 60 * 1000;//如果没有约定,则默认定义时长为60s
+ };
+
+ // 创建httpClient
+ return HttpClients.custom()
+ // 把请求相关的超时信息设置到连接客户端
+ .setDefaultRequestConfig(requestConfig)
+ // 把请求重试设置到连接客户端
+ .setRetryHandler(retry)
+ .setKeepAliveStrategy(myStrategy)
+ // 配置连接池管理对象
+ .setConnectionManager(connectionManager)
+ .build();
+ }
+
+ /**
+ * Desc: 发起http delete请求,返回status code与response body
+ * @param url
+ * @param socketTimeout
+ * @return {@link Map< String, String>}
+ * @created by wWei
+ * @date 2021/1/8 3:29 下午
+ */
+ public Map<String, String> httpDelete(String url, int socketTimeout) {
+ Map<String, String> resultMap = Maps.newHashMap();
+ // 创建GET请求对象
+ CloseableHttpResponse response = null;
+ try {
+ HttpDelete httpDelete = new HttpDelete(url);
+ // 执行请求
+ response = getHttpClient(socketTimeout).execute(httpDelete);
+ // 获取响应实体
+ HttpEntity entity = response.getEntity();
+ // 获取响应信息
+ resultMap.put("status", String.valueOf(response.getStatusLine().getStatusCode()));
+ resultMap.put("result", EntityUtils.toString(entity, "UTF-8"));
+ } catch (ClientProtocolException e) {
+ LOG.error("协议错误: {}", e.getMessage());
+ resultMap.put("status", String.valueOf(HttpStatus.SC_SERVICE_UNAVAILABLE));
+ resultMap.put("message", e.getMessage());
+ } catch (ParseException e) {
+ LOG.error("解析错误: {}", e.getMessage());
+ resultMap.put("status", String.valueOf(HttpStatus.SC_SERVICE_UNAVAILABLE));
+ resultMap.put("message", e.getMessage());
+ } catch (IOException e) {
+ LOG.error("IO错误: {}", e.getMessage());
+ resultMap.put("status", String.valueOf(HttpStatus.SC_BAD_GATEWAY));
+ resultMap.put("message", e.getMessage());
+ } catch (Exception e) {
+ LOG.error("其它错误: {}", e.getMessage());
+ resultMap.put("status", String.valueOf(HttpStatus.SC_INTERNAL_SERVER_ERROR));
+ resultMap.put("message", e.getMessage());
+ } finally {
+ if (null != response) {
+ try {
+ EntityUtils.consumeQuietly(response.getEntity());
+ response.close();
+ } catch (IOException e) {
+ LOG.error("释放链接错误: {}", e.getMessage());
+ }
+ }
+ }
+ return resultMap;
+ }
+
+ /**
+ * 返回status code与response body
+ * @param url:请求地址
+ * @param socketTimeout: 响应超时时间
+ *
+ **/
+ public Map<String, String> httpGet(String url, int socketTimeout) {
+ Map<String, String> resultMap = Maps.newHashMap();
+ // 创建GET请求对象
+ CloseableHttpResponse response = null;
+ try {
+ HttpGet httpGet = new HttpGet(url);
+ // 执行请求
+ response = getHttpClient(socketTimeout).execute(httpGet);
+ // 获取响应实体
+ HttpEntity entity = response.getEntity();
+ // 获取响应信息
+ resultMap.put("status", String.valueOf(response.getStatusLine().getStatusCode()));
+ resultMap.put("result", EntityUtils.toString(entity, "UTF-8"));
+ } catch (ClientProtocolException e) {
+ LOG.error("ClientProtocolException:{}", e.getMessage());
+ resultMap.put("status", String.valueOf(HttpStatus.SC_SERVICE_UNAVAILABLE));
+ resultMap.put("message", e.getMessage());
+ } catch (ParseException e) {
+ LOG.error("ParseException:{}", e.getMessage());
+ resultMap.put("status", String.valueOf(HttpStatus.SC_SERVICE_UNAVAILABLE));
+ resultMap.put("message", e.getMessage());
+ } catch (IOException e) {
+ LOG.error("IOException:{}", e.getMessage());
+ resultMap.put("status", String.valueOf(HttpStatus.SC_BAD_GATEWAY));
+ resultMap.put("message", e.getMessage());
+ } catch (Exception e) {
+ LOG.error("Exception:{}", e.getMessage());
+ resultMap.put("status", String.valueOf(HttpStatus.SC_INTERNAL_SERVER_ERROR));
+ resultMap.put("message", e.getMessage());
+ } finally {
+ if (null != response) {
+ try {
+ EntityUtils.consumeQuietly(response.getEntity());
+ response.close();
+ } catch (IOException e) {
+ LOG.error("CloseConnectionException:{}", e.getMessage());
+ }
+ }
+ }
+ return resultMap;
+ }
+
+ /**
+ * 返回status code与response body
+ * @param url:请求地址
+ * @param headers: Headers
+ * @param socketTimeOut: 响应超时时间
+ * @return: java.util.Map<java.lang.String, java.lang.String>
+ **/
+ public Map<String, String> httpGet(String url, Map<String, String> headers, int socketTimeOut) {
+ Map<String, String> resultMap = Maps.newHashMap();
+ // 创建GET请求对象
+ CloseableHttpResponse response = null;
+ try {
+ HttpGet httpGet = new HttpGet(url);
+ for (String key : headers.keySet()) {
+ httpGet.setHeader(key, headers.get(key));
+ }
+ // 执行请求
+ response = getHttpClient(socketTimeOut).execute(httpGet);
+ // 获取响应实体
+ HttpEntity entity = response.getEntity();
+ // 获取响应信息
+ resultMap.put("status", String.valueOf(response.getStatusLine().getStatusCode()));
+ resultMap.put("result", EntityUtils.toString(entity, "UTF-8"));
+ } catch (ClientProtocolException e) {
+ LOG.error("ClientProtocolException:{}", e.getMessage());
+ resultMap.put("status", String.valueOf(HttpStatus.SC_SERVICE_UNAVAILABLE));
+ resultMap.put("message", e.getMessage());
+ } catch (ParseException e) {
+ LOG.error("ParseException:{}", e.getMessage());
+ resultMap.put("status", String.valueOf(HttpStatus.SC_SERVICE_UNAVAILABLE));
+ resultMap.put("message", e.getMessage());
+ } catch (IOException e) {
+ LOG.error("IOException:{}", e.getMessage());
+ resultMap.put("status", String.valueOf(HttpStatus.SC_BAD_GATEWAY));
+ resultMap.put("message", e.getMessage());
+ } catch (Exception e) {
+ LOG.error("Exception:{}", e.getMessage());
+ resultMap.put("status", String.valueOf(HttpStatus.SC_INTERNAL_SERVER_ERROR));
+ resultMap.put("message", e.getMessage());
+ } finally {
+ if (null != response) {
+ try {
+ EntityUtils.consumeQuietly(response.getEntity());
+ response.close();
+ } catch (IOException e) {
+ LOG.error("CloseConnectionException:{}", e.getMessage());
+ }
+ }
+ }
+ return resultMap;
+ }
+
+ /**
+ * 返回status code与response body
+ * @param url:请求地址
+ * @param jsonString:请求参数
+ * @param socketTimeOut:响应超时时间
+ **/
+ public Map<String, String> httpPost(String url, String jsonString, int socketTimeOut) {
+ Map<String, String> resultMap = Maps.newHashMap();
+ // 创建GET请求对象
+ CloseableHttpResponse response = null;
+ try {
+ HttpPost httpPost = new HttpPost(url);
+ httpPost.setHeader("Content-Type", "application/json");
+ httpPost.setEntity(new ByteArrayEntity(jsonString.getBytes("utf-8")));
+ response = getHttpClient(socketTimeOut).execute(httpPost);
+ // 获取响应实体
+ HttpEntity entity = response.getEntity();
+ // 获取响应信息
+ resultMap.put("status", String.valueOf(response.getStatusLine().getStatusCode()));
+ resultMap.put("result", EntityUtils.toString(entity, "UTF-8"));
+ } catch (ClientProtocolException e) {
+ LOG.error("协议错误: {}", e.getMessage());
+ resultMap.put("status", String.valueOf(HttpStatus.SC_SERVICE_UNAVAILABLE));
+ resultMap.put("message", e.getMessage());
+ } catch (ParseException e) {
+ LOG.error("解析错误: {}", e.getMessage());
+ resultMap.put("status", String.valueOf(HttpStatus.SC_SERVICE_UNAVAILABLE));
+ resultMap.put("message", e.getMessage());
+ } catch (IOException e) {
+ LOG.error("IO错误: {}", e.getMessage());
+ resultMap.put("status", String.valueOf(HttpStatus.SC_BAD_GATEWAY));
+ resultMap.put("message", e.getMessage());
+ } catch (Exception e) {
+ LOG.error("其它错误: {}", e.getMessage());
+ resultMap.put("status", String.valueOf(HttpStatus.SC_INTERNAL_SERVER_ERROR));
+ resultMap.put("message", e.getMessage());
+ } finally {
+ if (null != response) {
+ try {
+ EntityUtils.consumeQuietly(response.getEntity());
+ response.close();
+ } catch (IOException e) {
+ LOG.error("释放链接错误: {}", e.getMessage());
+ }
+ }
+ }
+ return resultMap;
+ }
+
+ /**
+ * 返回status code与response body
+ * @param url:请求地址
+ * @param headers: Headers
+ * @param socketTimeOut: 响应超时时间
+ **/
+ public Map<String, String> getHttpPostResponseHeads(String url, Map<String, String> headers, int socketTimeOut) {
+ CloseableHttpResponse response = null;
+ HashMap<String, String> map = Maps.newHashMap();
+ try {
+ HttpPost httpPost = new HttpPost(url);
+ for (Object k : headers.keySet()) {
+ httpPost.setHeader(k.toString(), headers.get(k).toString());
+ }
+ response = getHttpClient(socketTimeOut).execute(httpPost);
+ Header[] Headers = response.getAllHeaders();
+ for (Header h : Headers) {
+ map.put(h.getName().toUpperCase(), h.getValue());
+ }
+ } catch (ClientProtocolException e) {
+ LOG.error("协议错误: {}", e.getMessage());
+ } catch (ParseException e) {
+ LOG.error("解析错误: {}", e.getMessage());
+ } catch (IOException e) {
+ LOG.error("IO错误: {}", e.getMessage());
+ } finally {
+ if (null != response) {
+ try {
+ EntityUtils.consumeQuietly(response.getEntity());
+ response.close();
+ } catch (IOException e) {
+ LOG.error("释放链接错误: {}", e.getMessage());
+ }
+ }
+ }
+ return map;
+ }
+
+ /**
+ * @param url:请求地址
+ **/
+ public String httpGet(String url) {
+ String msg = "-1";
+ // 获取客户端连接对象
+ CloseableHttpClient httpClient = getHttpClient(ApplicationConfig.HTTP_RESPONSE_TIMEOUT);
+ CloseableHttpResponse response = null;
+ try {
+ URL ul = new URL(url);
+ URI uri = new URI(ul.getProtocol(), null, ul.getHost(), ul.getPort(), ul.getPath(), ul.getQuery(), null);
+ LOG.info("http get uri {}", uri);
+ // 创建GET请求对象
+ HttpGet httpGet = new HttpGet(uri);
+ // 执行请求
+ response = httpClient.execute(httpGet);
+ int statusCode = response.getStatusLine().getStatusCode();
+ // 获取响应实体
+ HttpEntity entity = response.getEntity();
+ // 获取响应信息
+ msg = EntityUtils.toString(entity, "UTF-8");
+ if (statusCode != HttpStatus.SC_OK) {
+ LOG.error("Http get content is :" + msg);
+ System.exit(1);
+ }
+ } catch (URISyntaxException e) {
+ LOG.error("URI 转换错误: {}", e.getMessage());
+ } catch (ClientProtocolException e) {
+ LOG.error("协议错误: {}", e.getMessage());
+ } catch (ParseException e) {
+ LOG.error("解析错误: {}", e.getMessage());
+ } catch (IOException e) {
+ LOG.error("IO错误: {}", e.getMessage());
+ } finally {
+ if (null != response) {
+ try {
+ EntityUtils.consume(response.getEntity());
+ response.close();
+ } catch (IOException e) {
+ LOG.error("释放链接错误: {}", e.getMessage());
+ }
+ }
+ }
+ return msg;
+ }
+
+ /**
+ * @param url: 请求地址
+ * @param requestBody: 请求参数
+ * @param headers: Header
+ **/
+ public String httpPost(String url, String requestBody, Header... headers) {
+ String msg = "-1";
+ // 获取客户端连接对象
+ CloseableHttpClient httpClient = getHttpClient(ApplicationConfig.HTTP_RESPONSE_TIMEOUT);
+ // 创建POST请求对象
+ CloseableHttpResponse response = null;
+ try {
+
+ URL ul = new URL(url);
+ URI uri = new URI(ul.getProtocol(), null, ul.getHost(), ul.getPort(), ul.getPath(), ul.getQuery(), null);
+ LOG.debug("http post uri:{}, http post body:{}", uri, requestBody);
+ HttpPost httpPost = new HttpPost(uri);
+ httpPost.setHeader("Content-Type", "application/json");
+ if (StringUtil.isNotEmpty(headers)) {
+ for (Header h : headers) {
+ httpPost.addHeader(h);
+ }
+ }
+ if (StringUtil.isNotBlank(requestBody)) {
+ httpPost.setEntity(new ByteArrayEntity(requestBody.getBytes("utf-8")));
+ }
+ response = httpClient.execute(httpPost);
+ int statusCode = response.getStatusLine().getStatusCode();
+ // 获取响应实体
+ HttpEntity entity = response.getEntity();
+ // 获取响应信息
+ msg = EntityUtils.toString(entity, "UTF-8");
+ if (statusCode != HttpStatus.SC_OK && statusCode != HttpStatus.SC_CREATED) {
+ LOG.error(msg);
+ System.exit(1);
+ }
+ } catch (URISyntaxException e) {
+ LOG.error("URI 转换错误: {}", e.getMessage());
+ } catch (ClientProtocolException e) {
+ LOG.error("协议错误: {}", e.getMessage());
+ } catch (ParseException e) {
+ LOG.error("解析错误: {}", e.getMessage());
+ } catch (IOException e) {
+ LOG.error("IO错误: {}", e.getMessage());
+ } finally {
+ if (null != response) {
+ try {
+ EntityUtils.consumeQuietly(response.getEntity());
+ response.close();
+ } catch (IOException e) {
+ LOG.error("释放链接错误: {}", e.getMessage());
+ }
+ }
+ }
+ return msg;
+ }
+}
diff --git a/src/main/java/cn/mesalab/utils/SeriesUtils.java b/src/main/java/cn/mesalab/utils/SeriesUtils.java
new file mode 100644
index 0000000..17f84b3
--- /dev/null
+++ b/src/main/java/cn/mesalab/utils/SeriesUtils.java
@@ -0,0 +1,212 @@
+package cn.mesalab.utils;
+
+import cn.mesalab.config.ApplicationConfig;
+import cn.mesalab.dao.DruidData;
+import cn.mesalab.service.BaselineGeneration;
+import com.google.common.collect.Lists;
+import org.jfree.util.Log;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.BufferedReader;
+import java.io.FileReader;
+import java.lang.reflect.Array;
+import java.time.Duration;
+import java.time.Instant;
+import java.time.LocalDateTime;
+import java.time.format.DateTimeFormatter;
+import java.util.*;
+import java.util.stream.Stream;
+
+
+/**
+ * @author joy
+ */
+public class SeriesUtils {
+ private static final Logger LOG = LoggerFactory.getLogger(SeriesUtils.class);
+
+ private static DruidData druidData = new DruidData();
+
+ public static List<Map<String, Object>> readCsvToList(String filePath) {
+ List<Map<String, Object>> list = new ArrayList<Map<String, Object>>();
+
+ String line;
+ try (BufferedReader br = new BufferedReader(new FileReader(filePath))) {
+ br.readLine();
+ while ((line = br.readLine()) != null) {
+ List<String> column = Arrays.asList(line.split(","));
+ // 保存记录中的每个<字段名-字段值>
+ Map<String, Object> rowData = new HashMap<String, Object>();
+ rowData.put("__time", column.get(0));
+ rowData.put(ApplicationConfig.BASELINE_METRIC_TYPE, Integer.valueOf(column.get(1)));
+
+ list.add(rowData);
+ }
+ } catch (Exception e) {
+ e.printStackTrace();
+ }
+ return list;
+ }
+
+
+ /**
+ * 时序数据补齐
+ */
+ public static List<Map<String, Object>> complementSeries(List<Map<String, Object>> originSeries){
+ LocalDateTime startTime = LocalDateTime.ofInstant(Instant.ofEpochMilli(druidData.getTimeLimit()._2), TimeZone
+ .getDefault().toZoneId());
+ LocalDateTime endTime = LocalDateTime.ofInstant(Instant.ofEpochMilli(druidData.getTimeLimit()._1), TimeZone
+ .getDefault().toZoneId());
+ List<String> dateList = completionDate(startTime, endTime);
+
+ // 补全后的结果
+ List<Map<String, Object>> result = new ArrayList<>();
+ boolean dbDateExist = false;
+ for (String date : dateList) {
+ //table为数据库查询出来的对象列表,结构为List<Map<String, Object>>
+ for (Map<String, Object> row : originSeries) {
+ if (row.get(ApplicationConfig.DRUID_RECVTIME_COLUMN_NAME).toString().substring(0,19).equals(date)) {
+ //集合已包含该日期
+ dbDateExist = true;
+ result.add(row);
+ break;
+ }
+ }
+ //添加补全的数据到最后结果列表
+ if (!dbDateExist) {
+ Map<String, Object> temp = new HashMap<>(2);
+ temp.put(ApplicationConfig.DRUID_RECVTIME_COLUMN_NAME, date);
+ temp.put(ApplicationConfig.BASELINE_METRIC_TYPE, 0);
+ result.add(temp);
+ }
+ dbDateExist = false;
+ }
+
+ return result;
+ }
+
+ private static List<String> completionDate(LocalDateTime startTime, LocalDateTime endTime) {
+ //日期格式化
+ DateTimeFormatter formatter = DateTimeFormatter.ofPattern(ApplicationConfig.TIME_FORMAT);
+ List<String> timeList = new ArrayList<>();
+ //遍历给定的日期期间的每一天
+ for (int i = 0; !Duration.between(startTime.plusMinutes(i+1), endTime).isNegative(); i+= ApplicationConfig.HISTORICAL_GRAD) {
+ //添加日期
+ timeList.add(startTime.plusMinutes(i).format(formatter));
+ }
+ return timeList;
+ }
+
+ /**
+ * 判断是否存在以天为单位的周期特征
+ * @param historicalSeries
+ * @return
+ */
+ public static Boolean isPeriod(List<Integer> historicalSeries){
+ Boolean result = true;
+ List<List<Integer>> partitions = Lists.partition(historicalSeries, 24*60/ApplicationConfig.HISTORICAL_GRAD);
+ List<Integer> aggregatedPart = Arrays.asList();
+ try{
+ aggregatedPart = columnAverage(partitions.subList(0, ApplicationConfig.READ_HISTORICAL_DAYS-1));
+ } catch (IndexOutOfBoundsException e){
+ Log.error("历史");
+ }
+
+ // Pearson corrcoef
+ double pearsonCorrelationScore = getPearsonCorrelationScore(aggregatedPart.stream().mapToInt(Integer::valueOf).toArray(),
+ partitions.get(partitions.size() - 1).stream().mapToInt(Integer::valueOf).toArray());
+
+ if (pearsonCorrelationScore < ApplicationConfig.BASELINE_PERIOD_CORR_THRE){
+ result=false;
+ }
+ return result;
+ }
+
+ public static double getPearsonCorrelationScore(int[] xData, int[] yData) {
+ if (xData.length != yData.length) {
+ Log.error("Pearson CorrelationScore 数组长度不相等!");
+ }
+ int xMeans;
+ int yMeans;
+ double numerator = 0;
+ double denominator = 0;
+
+ double result = 0;
+ // 拿到两个数据的平均值
+ xMeans = (int) getMeans(xData);
+ yMeans = (int) getMeans(yData);
+ // 计算皮尔逊系数的分子
+ numerator = generateNumerator(xData, xMeans, yData, yMeans);
+ // 计算皮尔逊系数的分母
+ denominator = generateDenomiator(xData, xMeans, yData, yMeans);
+ // 计算皮尔逊系数
+ if(denominator>0) {
+ result = numerator / denominator;
+ }
+ return result;
+ }
+
+ private static int generateNumerator(int[] xData, int xMeans, int[] yData, int yMeans) {
+ int numerator = 0;
+ for (int i = 0; i < xData.length; i++) {
+ numerator += (xData[i] - xMeans) * (yData[i] - yMeans);
+ }
+ return numerator;
+ }
+
+ private static double generateDenomiator(int[] xData, int xMeans, int[] yData, int yMeans) {
+ double xSum = 0.0;
+ for (int i = 0; i < xData.length; i++) {
+ xSum += (xData[i] - xMeans) * (xData[i] - xMeans);
+ }
+ double ySum = 0.0;
+ for (int i = 0; i < yData.length; i++) {
+ ySum += (yData[i] - yMeans) * (yData[i] - yMeans);
+ }
+ return Math.sqrt(xSum) * Math.sqrt(ySum);
+ }
+
+ private static double getMeans(int[] datas) {
+ double sum = 0.0;
+ for (int i = 0; i < datas.length; i++) {
+ sum += datas[i];
+ }
+ return sum / datas.length;
+ }
+
+ public static List<Integer> columnAverage(List<List<Integer>> list){
+ ArrayList<Integer> averages = new ArrayList<>();
+ for(int i=0; i<list.get(0).size(); i++){
+ int columnSum = 0;
+ for(int j = 0; j< list.size(); j++){
+ columnSum += list.get(j).get(i);
+ }
+ averages.add(columnSum / list.size());
+ }
+ return averages;
+ }
+
+ public static int percentile(List<Integer> latencies, double percentile) {
+ Collections.sort(latencies);
+ int index = (int) Math.ceil(percentile * latencies.size());
+ return latencies.get(index-1);
+ }
+
+ public static void main(String[] args) {
+ List<Integer> test = Arrays.asList(
+ 1,2,3,4,5,
+ 1,2,3,4,5,
+ 1,2,3,4,5,
+ 1,2,3,4,5,
+ 1,2,3,4,5,
+ 1,2,3,4,5,
+ 1,2,3,4,5);
+ System.out.println(columnAverage(Lists.partition(test, 5)));
+
+
+
+ }
+
+
+}
+
diff --git a/src/main/resources/application.properties b/src/main/resources/application.properties
new file mode 100644
index 0000000..8a705ec
--- /dev/null
+++ b/src/main/resources/application.properties
@@ -0,0 +1,68 @@
+
+#Druid配置
+druid.url=jdbc:avatica:remote:url=http://192.168.44.12:8082/druid/v2/sql/avatica/
+druid.driver=org.apache.calcite.avatica.remote.Driver
+druid.table=top_server_ip_test_log
+
+#字段映射
+druid.attacktype.tcpsynflood=sessions
+druid.attacktype.udpflood=bytes
+druid.attacktype.icmpflood=packets
+druid.attacktype.dnsamplification=packets
+druid.serverip.columnname=destination
+druid.attacktype.columnname=order_by
+druid.recvtime.columnname=__time
+#baseline生成metric
+baseline.metric.type=session_num
+
+#HBase配置
+hbase.table=ddos_traffic_baselines
+hbase.zookeeper.quorum=192.168.44.12
+hbase.zookeeper.client.port=2181
+
+#读取druid时间范围方式,0:读取默认范围read.druid.time.range天数;1:指定时间范围
+read.druid.time.limit.type=1
+#07-01
+read.druid.min.time=1625068800000
+#06-01
+#read.druid.min.time=1622476800000
+read.druid.max.time=1625673600000
+
+
+#读取过去N天数据,最小值为3天(需要判断周期性)
+read.historical.days=7
+#历史数据汇聚粒度为10分钟
+historical.grad=10
+#baseline生成方法
+baseline.function=KalmanFilter
+#baseline时间1天
+baseline.range.days=1
+# 数据库Time格式
+time.format=yyyy-MM-dd HH:mm:ss
+
+
+#算法参数
+baseline.period.correlative.threshold=0.5
+baseline.historical.ratio.threshold=0.1
+baseline.historical.sparse.fill.percentile=0.95
+baseline.rational.percentile=0.95
+#Kalman Filter
+baseline.kalman.q=0.000001
+baseline.kalman.r=0.002
+
+
+# 每更新1000个记录打印log
+log.write.count=10000
+# FOR TEST
+generate.batch.size=10
+
+
+# http client配置
+http.request.timeout=10000
+http.response.timeout=60000
+http.connection.timeout=500
+http.max.connection.num=500
+http.max.per.route=200
+
+
+
diff --git a/src/main/resources/log4j.properties b/src/main/resources/log4j.properties
new file mode 100644
index 0000000..ac2c528
--- /dev/null
+++ b/src/main/resources/log4j.properties
@@ -0,0 +1,19 @@
+######################### logger ##############################
+log4j.logger.org.apache.http=OFF
+log4j.logger.org.apache.http.wire=OFF
+
+#Log4j
+log4j.rootLogger=debug,console,file
+log4j.appender.console=org.apache.log4j.ConsoleAppender
+log4j.appender.console.Threshold=info
+log4j.appender.console.layout=org.apache.log4j.PatternLayout
+log4j.appender.console.layout.ConversionPattern=[%d{yyyy-MM-dd HH\:mm\:ss}] [%-5p] [Thread\:%t] %l %x - <%m>%n
+
+log4j.appender.file=org.apache.log4j.DailyRollingFileAppender
+log4j.appender.file.Threshold=info
+log4j.appender.file.encoding=UTF-8
+log4j.appender.file.Append=true
+log4j.appender.file.file=./logs/ddos_baselines.log
+log4j.appender.file.DatePattern='.'yyyy-MM-dd
+log4j.appender.file.layout=org.apache.log4j.PatternLayout
+log4j.appender.file.layout.ConversionPattern=[%d{yyyy-MM-dd HH\:mm\:ss}] [%-5p] %X{ip} [Thread\:%t] %l %x - %m%n
diff --git a/src/test/java/cn/mesalab/service/HBaseTest.java b/src/test/java/cn/mesalab/service/HBaseTest.java
new file mode 100644
index 0000000..9598865
--- /dev/null
+++ b/src/test/java/cn/mesalab/service/HBaseTest.java
@@ -0,0 +1,92 @@
+package cn.mesalab.service;
+
+/**
+ * @author yjy
+ * @version 1.0
+ * @date 2021/8/3 11:21 上午
+ */
+
+import cn.mesalab.config.ApplicationConfig;
+import cn.mesalab.dao.DruidData;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.*;
+import org.apache.hadoop.hbase.client.*;
+import org.apache.hadoop.hbase.filter.SingleColumnValueFilter;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.io.ArrayWritable;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableUtils;
+
+import java.io.ByteArrayInputStream;
+import java.io.DataInputStream;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+
+public class HBaseTest {
+ public static void main(String[] args) throws IOException {
+ org.apache.hadoop.conf.Configuration config = HBaseConfiguration.create();
+
+ config.set(HConstants.ZOOKEEPER_QUORUM, ApplicationConfig.HBASE_ZOOKEEPER_QUORUM);
+ config.set(HConstants.ZOOKEEPER_CLIENT_PORT, ApplicationConfig.HBASE_ZOOKEEPER_CLIENT_PORT);
+
+ TableName tableName = TableName.valueOf(ApplicationConfig.HBASE_TABLE);
+ Connection conn = ConnectionFactory.createConnection(config);
+ Table table = conn.getTable(tableName);
+
+
+ DruidData druidData = DruidData.getInstance();
+ ArrayList<String> destinationIps = druidData.getServerIpList();
+
+ for (String ip : destinationIps){
+ Get abcGet = new Get(Bytes.toBytes(ip));
+ Result r = table.get(abcGet);
+ ArrayWritable w = new ArrayWritable(IntWritable.class);
+ List<String> attackTypeList = Arrays.asList(
+ "TCP SYN Flood",
+ "ICMP Flood",
+ "UDP Flood",
+ "DNS Amplification"
+ );
+ for (String attackType : attackTypeList){
+ byte[] session_nums = r.getValue(Bytes.toBytes(attackType), Bytes.toBytes("session_num"));
+ if (session_nums==null){
+ continue;
+ }
+ w.readFields(new DataInputStream(new ByteArrayInputStream(session_nums)));
+ ArrayList<Integer> arr2 = fromWritable(w);
+ System.out.println(ip + "-" + attackType + ": " + arr2.toString());
+ }
+
+ }
+
+// Get abcGet = new Get(Bytes.toBytes("1.0.0.1"));
+// Result r = table.get(abcGet);
+// ArrayWritable w = new ArrayWritable(IntWritable.class);
+// w.readFields(new DataInputStream(new ByteArrayInputStream(r.getValue(Bytes.toBytes("TCP SYN Flood"), Bytes.toBytes("session_num")))));
+// ArrayList<Integer> arr2 = fromWritable(w);
+// System.out.println(arr2.toString());
+
+
+ }
+
+ public static Writable toWritable(int[] arr) {
+ Writable[] content = new Writable[arr.length];
+ for (int i = 0; i < content.length; i++) {
+ content[i] = new IntWritable(arr[i]);
+ }
+ return new ArrayWritable(IntWritable.class, content);
+ }
+
+ public static ArrayList<Integer> fromWritable(ArrayWritable writable) {
+ Writable[] writables = ((ArrayWritable) writable).get();
+ ArrayList<Integer> list = new ArrayList<Integer>(writables.length);
+ for (Writable wrt : writables) {
+ list.add(((IntWritable)wrt).get());
+ }
+ return list;
+ }
+
+}
diff --git a/src/test/java/cn/mesalab/utils/HttpClientUtilsTest.java b/src/test/java/cn/mesalab/utils/HttpClientUtilsTest.java
new file mode 100644
index 0000000..a2d713e
--- /dev/null
+++ b/src/test/java/cn/mesalab/utils/HttpClientUtilsTest.java
@@ -0,0 +1,14 @@
+package cn.mesalab.utils;
+
+import com.zdjizhi.utils.JsonMapper;
+
+/**
+ * @author yjy
+ * @version 1.0
+ * @date 2021/8/3 4:43 下午
+ */
+public class HttpClientUtilsTest {
+
+
+
+} \ No newline at end of file