summaryrefslogtreecommitdiff
path: root/src/main/java
diff options
context:
space:
mode:
authoryinjiangyi <[email protected]>2021-08-03 16:56:37 +0800
committeryinjiangyi <[email protected]>2021-08-03 16:56:37 +0800
commit03849d5f3f45e04b0060a83df01826a00c929d3b (patch)
tree4b825dc642cb6eb9a060e54bf8d69288fbee4904 /src/main/java
parenta39609557c4e0e11a145634f7a6ac345b4145684 (diff)
update .gitignore
Diffstat (limited to 'src/main/java')
-rw-r--r--src/main/java/cn/mesalab/config/ApplicationConfig.java53
-rw-r--r--src/main/java/cn/mesalab/dao/DruidData.java187
-rw-r--r--src/main/java/cn/mesalab/dao/Impl/ResultSetToListServiceImp.java44
-rw-r--r--src/main/java/cn/mesalab/dao/ResultSetToListService.java24
-rw-r--r--src/main/java/cn/mesalab/main/BaselineApplication.java15
-rw-r--r--src/main/java/cn/mesalab/service/BaselineGeneration.java206
-rw-r--r--src/main/java/cn/mesalab/service/algorithm/KalmanFilter.java90
-rw-r--r--src/main/java/cn/mesalab/utils/ConfigUtils.java45
-rw-r--r--src/main/java/cn/mesalab/utils/DruidUtils.java55
-rw-r--r--src/main/java/cn/mesalab/utils/HbaseUtils.java1
-rw-r--r--src/main/java/cn/mesalab/utils/HttpClientUtils.java6
-rw-r--r--src/main/java/cn/mesalab/utils/SeriesUtils.java212
12 files changed, 0 insertions, 938 deletions
diff --git a/src/main/java/cn/mesalab/config/ApplicationConfig.java b/src/main/java/cn/mesalab/config/ApplicationConfig.java
deleted file mode 100644
index 9e0e447..0000000
--- a/src/main/java/cn/mesalab/config/ApplicationConfig.java
+++ /dev/null
@@ -1,53 +0,0 @@
-package cn.mesalab.config;
-
-import cn.mesalab.utils.ConfigUtils;
-
-/**
- * @author yjy
- * @version 1.0
- * @date 2021/7/24 10:23 上午
- */
-public class ApplicationConfig {
-
- public static final String DRUID_URL= ConfigUtils.getStringProperty("druid.url");
- public static final String DRUID_DRIVER = ConfigUtils.getStringProperty("druid.driver");
- public static final String DRUID_TABLE = ConfigUtils.getStringProperty("druid.table");
-
-
- public static final Integer DRUID_TIME_LIMIT_TYPE = ConfigUtils.getIntProperty("read.druid.time.limit.type");
- public static final Long READ_DRUID_MAX_TIME = ConfigUtils.getLongProperty("read.druid.max.time");
- public static final Long READ_DRUID_MIN_TIME = ConfigUtils.getLongProperty("read.druid.min.time");
-
- public static final Integer READ_HISTORICAL_DAYS = ConfigUtils.getIntProperty("read.historical.days");
- public static final Integer HISTORICAL_GRAD = ConfigUtils.getIntProperty("historical.grad");
- public static final String TIME_FORMAT = ConfigUtils.getStringProperty("time.format");
- public static final String BASELINE_METRIC_TYPE = ConfigUtils.getStringProperty("baseline.metric.type");
-
- public static final String DRUID_ATTACKTYPE_TCP_SYN_FLOOD = ConfigUtils.getStringProperty("druid.attacktype.tcpsynflood");
- public static final String DRUID_ATTACKTYPE_UDP_FLOOD = ConfigUtils.getStringProperty("druid.attacktype.udpflood");
- public static final String DRUID_ATTACKTYPE_ICMP_FLOOD = ConfigUtils.getStringProperty("druid.attacktype.icmpflood");
- public static final String DRUID_ATTACKTYPE_DNS_AMPL = ConfigUtils.getStringProperty("druid.attacktype.dnsamplification");
- public static final String DRUID_SERVERIP_COLUMN_NAME = ConfigUtils.getStringProperty("druid.serverip.columnname");
- public static final String DRUID_ATTACKTYPE_COLUMN_NAME = ConfigUtils.getStringProperty("druid.attacktype.columnname");
- public static final String DRUID_RECVTIME_COLUMN_NAME = ConfigUtils.getStringProperty("druid.recvtime.columnname");
-
- public static final float BASELINE_PERIOD_CORR_THRE = ConfigUtils.getFloatProperty("baseline.period.correlative.threshold");
- public static final float BASELINE_HISTORICAL_RATIO = ConfigUtils.getFloatProperty("baseline.historical.ratio.threshold");
- public static final float BASELINE_SPARSE_FILL_PERCENTILE = ConfigUtils.getFloatProperty("baseline.historical.sparse.fill.percentile");
- public static final String BASELINE_FUNCTION = ConfigUtils.getStringProperty("baseline.function");
- public static final Integer BASELINE_RANGE_DAYS = ConfigUtils.getIntProperty("baseline.range.days");
- public static final float BASELINE_RATIONAL_PERCENTILE = ConfigUtils.getFloatProperty("baseline.rational.percentile");
-
-
- public static final String HBASE_TABLE = ConfigUtils.getStringProperty("hbase.table");
- public static final String HBASE_ZOOKEEPER_QUORUM= ConfigUtils.getStringProperty("hbase.zookeeper.quorum");
- public static final String HBASE_ZOOKEEPER_CLIENT_PORT= ConfigUtils.getStringProperty("hbase.zookeeper.client.port");
-
-
- public static final Double BASELINE_KALMAN_Q = ConfigUtils.getDoubleProperty("baseline.kalman.q");
- public static final Double BASELINE_KALMAN_R = ConfigUtils.getDoubleProperty("baseline.kalman.r");
-
- public static final Integer LOG_WRITE_COUNT = ConfigUtils.getIntProperty("log.write.count");
- public static final Integer GENERATE_BATCH_SIZE = ConfigUtils.getIntProperty("generate.batch.size");
-}
-
diff --git a/src/main/java/cn/mesalab/dao/DruidData.java b/src/main/java/cn/mesalab/dao/DruidData.java
deleted file mode 100644
index 2de41a7..0000000
--- a/src/main/java/cn/mesalab/dao/DruidData.java
+++ /dev/null
@@ -1,187 +0,0 @@
-package cn.mesalab.dao;
-
-import cn.mesalab.config.ApplicationConfig;
-import cn.mesalab.dao.Impl.ResultSetToListServiceImp;
-import cn.mesalab.utils.DruidUtils;
-import io.vavr.Tuple;
-import io.vavr.Tuple2;
-import org.apache.calcite.avatica.AvaticaConnection;
-import org.apache.calcite.avatica.AvaticaStatement;
-import org.apache.commons.lang.StringUtils;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.sql.ResultSet;
-import java.sql.SQLException;
-import java.util.ArrayList;
-import java.util.Calendar;
-import java.util.List;
-import java.util.Map;
-import java.util.stream.Collectors;
-
-
-/**
- * @author yjy
- * @version 1.0
- * Druid 数据库操作
- * @date 2021/7/23 4:56 下午
- */
-public class DruidData {
-
- private static final Logger LOG = LoggerFactory.getLogger(DruidData.class);
- private static DruidData druidData;
- private AvaticaConnection connection;
- private AvaticaStatement statement;
- private String timeFilter = ApplicationConfig.DRUID_RECVTIME_COLUMN_NAME
- + " >= MILLIS_TO_TIMESTAMP(" + getTimeLimit()._2
- + ") AND " + ApplicationConfig.DRUID_RECVTIME_COLUMN_NAME
- + " < MILLIS_TO_TIMESTAMP(" + getTimeLimit()._1 + ")";
-
-
- {
- connectionInit();
- }
-
- /**
- * 连接初始化
- */
- private void connectionInit(){
- try {
- connection = DruidUtils.getConn();
- statement = connection.createStatement();
- statement.setQueryTimeout(0);
-
- } catch (SQLException exception) {
- exception.printStackTrace();
- }
- }
-
- /**
- * 获取实例
- * @return DruidData实例
- */
- public static DruidData getInstance() {
- druidData = new DruidData();
- return druidData;
- }
-
- /**
- * 获取distinct server ip
- * @return ArrayList<String> ip列表
- */
- public ArrayList<String> getServerIpList() {
- Long startQueryIpLIstTime = System.currentTimeMillis();
- ArrayList<String> serverIps = new ArrayList<String>();
- String sql = "SELECT distinct " + ApplicationConfig.DRUID_SERVERIP_COLUMN_NAME
- + " FROM " + ApplicationConfig.DRUID_TABLE
- + " WHERE " + timeFilter
- + " LIMIT 10";// FOR TEST
- try{
- ResultSet resultSet = DruidUtils.executeQuery(statement,sql);
- while(resultSet.next()){
- String ip = resultSet.getString(ApplicationConfig.DRUID_SERVERIP_COLUMN_NAME);
- serverIps.add(ip);
- }
- } catch (Exception e){
- e.printStackTrace();
- }
- Long endQueryIpListTime = System.currentTimeMillis();
- LOG.info("性能测试:ip list查询耗时——"+(endQueryIpListTime-startQueryIpLIstTime));
-
- return serverIps;
- }
-
- /**
- * 从Druid读取目标IP相关数据
- * @param ipList ip列表
- * @return 数据库读取结果
- */
- public List<Map<String, Object>> readFromDruid(List<String> ipList){
- List<Map<String, Object>> rsList = null;
- ipList = ipList.stream().map( ip -> "\'"+ip+"\'").collect(Collectors.toList());
- String ipString = "(" + StringUtils.join(ipList, ",").toString() + ")";
- String sql = "SELECT "+ ApplicationConfig.DRUID_SERVERIP_COLUMN_NAME
- + ", "+ ApplicationConfig.DRUID_ATTACKTYPE_COLUMN_NAME
- + ", "+ ApplicationConfig.BASELINE_METRIC_TYPE
- + ", " + ApplicationConfig.DRUID_RECVTIME_COLUMN_NAME
- + " FROM " + ApplicationConfig.DRUID_TABLE
- + " WHERE " + ApplicationConfig.DRUID_SERVERIP_COLUMN_NAME
- + " IN " + ipString
- + " AND " + timeFilter;
- try{
- ResultSet resultSet = DruidUtils.executeQuery(statement, sql);
- ResultSetToListService service = new ResultSetToListServiceImp();
- rsList = service.selectAll(resultSet);
- } catch (Exception e){
- e.printStackTrace();
- }
- return rsList;
- }
-
- /**
- * 从数据库读取结果中筛选指定ip的指定攻击类型的数据
- * @param allData 数据库读取结果
- * @param ip 指定ip
- * @param attackType 指定攻击类型
- * @return 筛选结果
- */
- public List<Map<String, Object>> getTimeSeriesData(List<Map<String, Object>> allData, String ip, String attackType){
- List<Map<String, Object>> rsList = new ArrayList<>();
- try{
- rsList = allData.stream().
- filter(i->((i.get(ApplicationConfig.DRUID_SERVERIP_COLUMN_NAME).equals(ip))
- )&&(i.get(ApplicationConfig.DRUID_ATTACKTYPE_COLUMN_NAME).equals(attackType)))
- .collect(Collectors.toList());
- } catch (NullPointerException e){
- }
- return rsList;
- }
-
- /**
- * 计算查询时间范围,可指定时间范围(测试)或使用默认配置
- * @return 时间范围起始点和终止点
- */
- public Tuple2<Long, Long> getTimeLimit(){
- long maxTime = 0L;
- long minTime = 0L;
- switch(ApplicationConfig.DRUID_TIME_LIMIT_TYPE){
- case 0:
- maxTime = getCurrentDay();
- minTime = getCurrentDay(-ApplicationConfig.READ_HISTORICAL_DAYS);
- break;
- case 1:
- maxTime = ApplicationConfig.READ_DRUID_MAX_TIME;
- minTime = ApplicationConfig.READ_DRUID_MIN_TIME;
- break;
- default:
- LOG.warn("没有设置Druid数据读取方式");
- }
- return Tuple.of(maxTime, minTime);
- }
-
- private long getCurrentDay(int bias) {
- Calendar calendar = Calendar.getInstance();
- calendar.set(Calendar.DAY_OF_YEAR, calendar.get(Calendar.DAY_OF_YEAR) + bias);
- calendar.set(Calendar.HOUR_OF_DAY, 0);
- calendar.set(Calendar.HOUR_OF_DAY, 0);
- calendar.set(Calendar.MINUTE, 0);
- calendar.set(Calendar.SECOND, 0);
- calendar.set(Calendar.MILLISECOND, 0);
- return calendar.getTimeInMillis();
- }
-
- private long getCurrentDay(){
- return getCurrentDay(0);
- }
-
- /**
- * 关闭当前DruidData
- */
- public void closeConn(){
- try {
- DruidUtils.closeConnection();
- } catch (SQLException exception) {
- exception.printStackTrace();
- }
- }
-}
diff --git a/src/main/java/cn/mesalab/dao/Impl/ResultSetToListServiceImp.java b/src/main/java/cn/mesalab/dao/Impl/ResultSetToListServiceImp.java
deleted file mode 100644
index 7867353..0000000
--- a/src/main/java/cn/mesalab/dao/Impl/ResultSetToListServiceImp.java
+++ /dev/null
@@ -1,44 +0,0 @@
-package cn.mesalab.dao.Impl;
-
-import cn.mesalab.dao.ResultSetToListService;
-
-import java.sql.ResultSet;
-import java.sql.ResultSetMetaData;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-
-/**
- * @author yjy
- * @version 1.0
- * @date 2021/7/24 4:29 下午
- */
-public class ResultSetToListServiceImp implements ResultSetToListService {
-
- /**
- * SELECT 查询记录以List结构返回,每一个元素是一条记录
- * 每条记录保存在Map<String, Object>里面,String类型指字段名字,Object对应字段值
- *
- * @param rs
- * @return List<Map<String, Object>>
- */
- @Override
- public List<Map<String, Object>> selectAll(ResultSet rs) {
- List<Map<String, Object>> list = new ArrayList<Map<String, Object>>();
- try {
- ResultSetMetaData rmd = rs.getMetaData();
- int columnCount = rmd.getColumnCount();
- while (rs.next()) {
- Map<String, Object> rowData = new HashMap<String, Object>();
- for (int i = 1; i <= columnCount; ++i) {
- rowData.put(rmd.getColumnName(i), rs.getObject(i));
- }
- list.add(rowData);
- }
- } catch (Exception ex) {
- ex.printStackTrace();
- }
- return list;
- }
-} \ No newline at end of file
diff --git a/src/main/java/cn/mesalab/dao/ResultSetToListService.java b/src/main/java/cn/mesalab/dao/ResultSetToListService.java
deleted file mode 100644
index 103e330..0000000
--- a/src/main/java/cn/mesalab/dao/ResultSetToListService.java
+++ /dev/null
@@ -1,24 +0,0 @@
-package cn.mesalab.dao;
-
-import java.sql.ResultSet;
-import java.util.List;
-import java.util.Map;
-
-
-/**
- * @author yjy
- * @version 1.0
- * @date 2021/7/24 4:27 下午
- */
-public interface ResultSetToListService {
- /**
- * SELECT * FROM websites
- * 查询所有记录,以List返回
- * list对象的每一个元素都是一条记录
- * 每条记录保存在Map<String, Object>里面,String类型指字段名字,Object对应字段值
- *
- * @param rs
- * @return List<Map < String, Object>>
- */
- public List<Map<String, Object>> selectAll(ResultSet rs);
-} \ No newline at end of file
diff --git a/src/main/java/cn/mesalab/main/BaselineApplication.java b/src/main/java/cn/mesalab/main/BaselineApplication.java
deleted file mode 100644
index 8bd6f13..0000000
--- a/src/main/java/cn/mesalab/main/BaselineApplication.java
+++ /dev/null
@@ -1,15 +0,0 @@
-package cn.mesalab.main;
-
-import cn.mesalab.service.BaselineGeneration;
-import sun.rmi.runtime.Log;
-
-/**
- * @author yjy
- * @version 1.0
- * @date 2021/7/23 5:34 下午
- */
-public class BaselineApplication {
- public static void main(String[] args) {
- BaselineGeneration.perform();
- }
-}
diff --git a/src/main/java/cn/mesalab/service/BaselineGeneration.java b/src/main/java/cn/mesalab/service/BaselineGeneration.java
deleted file mode 100644
index e72a0e6..0000000
--- a/src/main/java/cn/mesalab/service/BaselineGeneration.java
+++ /dev/null
@@ -1,206 +0,0 @@
-package cn.mesalab.service;
-
-import cn.mesalab.config.ApplicationConfig;
-import cn.mesalab.dao.DruidData;
-import cn.mesalab.service.algorithm.KalmanFilter;
-import cn.mesalab.utils.HbaseUtils;
-import cn.mesalab.utils.SeriesUtils;
-import com.google.common.collect.Lists;
-import com.google.common.util.concurrent.ThreadFactoryBuilder;
-import org.apache.commons.math3.stat.StatUtils;
-import org.apache.hadoop.hbase.client.Put;
-import org.apache.hadoop.hbase.client.Table;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.IOException;
-import java.util.*;
-import java.util.concurrent.*;
-import java.util.stream.Collectors;
-
-/**
- * @author yjy
- * @version 1.0
- * baseline生成及写入
- * @date 2021/7/23 5:38 下午
- */
-public class BaselineGeneration {
- private static final Logger LOG = LoggerFactory.getLogger(BaselineGeneration.class);
-
- private static DruidData druidData;
- private static HbaseUtils hbaseUtils;
- private static Table hbaseTable;
- private static List<Map<String, Object>> batchDruidData = new ArrayList<>();
-
- private static List<String> attackTypeList = Arrays.asList(
- ApplicationConfig.DRUID_ATTACKTYPE_TCP_SYN_FLOOD,
- ApplicationConfig.DRUID_ATTACKTYPE_ICMP_FLOOD,
- ApplicationConfig.DRUID_ATTACKTYPE_UDP_FLOOD,
- ApplicationConfig.DRUID_ATTACKTYPE_DNS_AMPL
- );
- private static final Integer BASELINE_POINT_NUM =
- ApplicationConfig.BASELINE_RANGE_DAYS * 24 * (60/ApplicationConfig.HISTORICAL_GRAD);
-
- /**
- * 程序执行
- */
- public static void perform() {
- long start = System.currentTimeMillis();
-
- druidData = DruidData.getInstance();
- hbaseUtils = HbaseUtils.getInstance();
- hbaseTable = hbaseUtils.getHbaseTable();
- LOG.info("Druid 成功建立连接");
-
- try{
- // baseline生成并写入
- generateBaselinesThread();
-
- long last = System.currentTimeMillis();
- LOG.warn("运行时间:" + (last - start));
-
- druidData.closeConn();
- hbaseTable.close();
- LOG.info("Druid 关闭连接");
-
- } catch (Exception e){
- e.printStackTrace();
- }
- System.exit(0);
- }
-
- /**
- * 多线程baseline生成入口
- * @throws InterruptedException
- */
- private static void generateBaselinesThread() throws InterruptedException {
- int threadNum = Runtime.getRuntime().availableProcessors();
-
- ThreadFactory namedThreadFactory = new ThreadFactoryBuilder()
- .setNameFormat("baseline-demo-%d").build();
-
- // 创建线程池
- ThreadPoolExecutor executor = new ThreadPoolExecutor(
- threadNum,
- threadNum,
- 0L,
- TimeUnit.MILLISECONDS,
- new LinkedBlockingQueue<>(1024),
- namedThreadFactory,
- new ThreadPoolExecutor.AbortPolicy());
-
- // IP列表获取
- ArrayList<String> destinationIps = druidData.getServerIpList();
-
- LOG.info("共查询到服务端ip " +destinationIps.size() + " 个");
- LOG.info("Baseline batch 大小: " + ApplicationConfig.GENERATE_BATCH_SIZE);
-
- // 分批进行IP baseline生成和处理
- List<List<String>> batchIpLists = Lists.partition(destinationIps, ApplicationConfig.GENERATE_BATCH_SIZE);
- for (List<String> batchIps: batchIpLists){
- if(batchIps.size()>0){
- executor.execute(() -> generateBaselines(batchIps));
- }
- }
-
- executor.shutdown();
- executor.awaitTermination(10L, TimeUnit.HOURS);
- }
-
- /**
- * 批量生成IP baseline
- * @param ipList ip列表
- */
- public static void generateBaselines(List<String> ipList){
- druidData = DruidData.getInstance();
- batchDruidData = druidData.readFromDruid(ipList);
-
- List<Put> putList = new ArrayList<>();
- for(String attackType: attackTypeList){
- for(String ip: ipList){
- int[] ipBaseline = generateSingleIpBaseline(ip, attackType);
- if (ipBaseline!= null){
- putList = hbaseUtils.cachedInPut(putList, ip, ipBaseline, attackType, ApplicationConfig.BASELINE_METRIC_TYPE);
- }
- }
- }
-
- try {
- hbaseTable.put(putList);
- LOG.info("Baseline 线程 " + Thread.currentThread().getId() + " 成功写入Baseline条数共计 " + putList.size());
- } catch (IOException e) {
- e.printStackTrace();
- }
-
- druidData.closeConn();
- }
-
- /**
- * 单ip baseline生成逻辑
- * @param ip ip
- * @param attackType 攻击类型
- * @return baseline序列,长度为 60/HISTORICAL_GRAD*24
- */
- private static int[] generateSingleIpBaseline(String ip, String attackType){
- // 查询
- List<Map<String, Object>> originSeries = druidData.getTimeSeriesData(batchDruidData, ip, attackType);
-
- if (originSeries.size()==0){
- return null;
- }
-
- // 时间序列缺失值补0
- List<Map<String, Object>> completSeries = SeriesUtils.complementSeries(originSeries);
-
- int[] baselineArr = new int[BASELINE_POINT_NUM];
- List<Integer>series = completSeries.stream().map(
- i -> Integer.valueOf(i.get(ApplicationConfig.BASELINE_METRIC_TYPE).toString())).collect(Collectors.toList());
-
- // 判断ip出现频率
- if(originSeries.size()/(float)completSeries.size()>ApplicationConfig.BASELINE_HISTORICAL_RATIO){
- // 高频率
- double percentile = StatUtils.percentile(series.stream().mapToDouble(Double::valueOf).toArray(),
- ApplicationConfig.BASELINE_SPARSE_FILL_PERCENTILE);
- Arrays.fill(baselineArr, (int)percentile);
- baselineArr = baselineFunction(series);
-
- } else {
- // 判断周期性
- if (SeriesUtils.isPeriod(series)){
- baselineArr = baselineFunction(series);
- } else {
- int ipPercentile = SeriesUtils.percentile(
- originSeries.stream().map(i ->
- Integer.valueOf(i.get(ApplicationConfig.BASELINE_METRIC_TYPE).toString())).collect(Collectors.toList()),
- ApplicationConfig.BASELINE_RATIONAL_PERCENTILE);
- Arrays.fill(baselineArr, ipPercentile);
- }
- }
-
- return baselineArr;
- }
-
- /**
- * baseline 生成算法
- * @param timeSeries 输入序列
- * @return 输出序列
- */
- private static int[] baselineFunction(List<Integer> timeSeries){
- int[] result;
- switch (ApplicationConfig.BASELINE_FUNCTION){
- case "KalmanFilter":
- KalmanFilter kalmanFilter = new KalmanFilter();
- kalmanFilter.forcast(timeSeries, BASELINE_POINT_NUM);
- result = kalmanFilter.getForecastSeries().stream().mapToInt(Integer::valueOf).toArray();
- break;
- default:
- result = timeSeries.subList(0, BASELINE_POINT_NUM).stream().mapToInt(Integer::valueOf).toArray();
- }
- return result;
- }
-
- public static void main(String[] args) {
- perform();
- }
-
-}
diff --git a/src/main/java/cn/mesalab/service/algorithm/KalmanFilter.java b/src/main/java/cn/mesalab/service/algorithm/KalmanFilter.java
deleted file mode 100644
index 11a40c3..0000000
--- a/src/main/java/cn/mesalab/service/algorithm/KalmanFilter.java
+++ /dev/null
@@ -1,90 +0,0 @@
-package cn.mesalab.service.algorithm;
-
-import cn.mesalab.config.ApplicationConfig;
-
-import java.util.ArrayList;
-import java.util.List;
-
-/**
- * @author yjy
- * @version 1.0
- * kalman滤波器
- * @date 2021/7/25 1:42 下午
- */
-
-public class KalmanFilter {
- private Integer predict;
- private Integer current;
- private Integer estimate;
- private double pdelt;
- private double mdelt;
- private double Gauss;
- private double kalmanGain;
- private final static double Q = ApplicationConfig.BASELINE_KALMAN_Q;
- private final static double R = ApplicationConfig.BASELINE_KALMAN_R;
-
- public KalmanFilter() {
- initial();
- }
-
- public void initial(){
- // TODO 调整
- pdelt = 1;
- mdelt = 1;
- }
-
- private ArrayList<Integer> smoothSeries;
- private ArrayList<Integer> forecastSeries;
-
- public Integer calSingleKalPoint(Integer oldValue, Integer value){
- //第一个估计值
- predict = oldValue;
- current = value;
- //高斯噪声方差
- Gauss = Math.sqrt(pdelt * pdelt + mdelt * mdelt) + Q;
- //估计方差
- kalmanGain = Math.sqrt((Gauss * Gauss)/(Gauss * Gauss + pdelt * pdelt)) + R;
- //估计值
- estimate = (int) (kalmanGain * (current - predict) + predict);
- //新的估计方差
- mdelt = Math.sqrt((1-kalmanGain) * Gauss * Gauss);
-
- return estimate;
- }
-
-
- public void forcast(List<Integer> historicalSeries, Integer length){
- int oldvalue = (historicalSeries.stream().mapToInt(Integer::intValue).sum())/historicalSeries.size();
- smoothSeries = new ArrayList<Integer>();
- for(int i = 0; i < historicalSeries.size(); i++){
- int value = historicalSeries.get(i);
- oldvalue = calSingleKalPoint(oldvalue,value);
- smoothSeries.add(oldvalue);
- }
-
- forecastSeries = new ArrayList<>();
- Integer partitonNum = historicalSeries.size()/length;
- for(int i = 0; i<length; i++){
- long sum = 0;
- for (int period=0; period<partitonNum; period++){
- sum += smoothSeries.get(length*period+i);
- }
- forecastSeries.add((int)sum/partitonNum);
- }
- }
-
- public ArrayList<Integer> getSmoothSeries() {
- return smoothSeries;
- }
-
- public ArrayList<Integer> getAllRangeSeries() {
- ArrayList<Integer> results = new ArrayList<>();
- results.addAll(smoothSeries);
- results.addAll(forecastSeries);
- return results;
- }
-
- public ArrayList<Integer> getForecastSeries() {
- return forecastSeries;
- }
-}
diff --git a/src/main/java/cn/mesalab/utils/ConfigUtils.java b/src/main/java/cn/mesalab/utils/ConfigUtils.java
deleted file mode 100644
index 718648b..0000000
--- a/src/main/java/cn/mesalab/utils/ConfigUtils.java
+++ /dev/null
@@ -1,45 +0,0 @@
-package cn.mesalab.utils;
-
-
-import org.apache.log4j.Logger;
-
-import java.util.Properties;
-
-public class ConfigUtils {
- private static final Logger LOG = Logger.getLogger(ConfigUtils.class);
- private static Properties propCommon = new Properties();
-
- public static String getStringProperty(String key) {
- return propCommon.getProperty(key);
- }
- public static Float getFloatProperty(String key) {
- return Float.parseFloat(propCommon.getProperty(key));
- }
-
-
- public static Integer getIntProperty(String key) {
- return Integer.parseInt(propCommon.getProperty(key));
- }
-
- public static Long getLongProperty(String key) {
- return Long.parseLong(propCommon.getProperty(key));
- }
-
- public static Double getDoubleProperty(String key) {
- return Double.parseDouble(propCommon.getProperty(key));
- }
-
- public static Boolean getBooleanProperty(String key) {
- return "true".equals(propCommon.getProperty(key).toLowerCase().trim());
- }
-
- static {
- try {
- propCommon.load(ConfigUtils.class.getClassLoader().getResourceAsStream("application.properties"));
-
- } catch (Exception e) {
- propCommon = null;
- LOG.error("配置加载失败");
- }
- }
-}
diff --git a/src/main/java/cn/mesalab/utils/DruidUtils.java b/src/main/java/cn/mesalab/utils/DruidUtils.java
deleted file mode 100644
index 8224d37..0000000
--- a/src/main/java/cn/mesalab/utils/DruidUtils.java
+++ /dev/null
@@ -1,55 +0,0 @@
-package cn.mesalab.utils;
-
-import cn.mesalab.config.ApplicationConfig;
-import org.apache.calcite.avatica.AvaticaConnection;
-import org.apache.calcite.avatica.AvaticaStatement;
-import org.apache.hadoop.hbase.client.Table;
-
-import java.sql.DriverManager;
-import java.sql.ResultSet;
-import java.sql.SQLException;
-import java.util.Properties;
-
-/**
- * @author yjy
- * @version 1.0
- * @date 2021/7/23 4:50 下午
- */
-public class DruidUtils {
- private static ThreadLocal<AvaticaConnection> threadLocal = new ThreadLocal<AvaticaConnection>();
-
- private static final String DRUID_URL = ApplicationConfig.DRUID_URL;
- private static AvaticaStatement statement = null;
-
- /**
- * 打开连接
- * @throws SQLException
- */
- public static AvaticaConnection getConn() throws SQLException {
- Properties properties = new Properties();
- properties.setProperty("connectTimeout", String.valueOf(10*60*60));
- AvaticaConnection connection = (AvaticaConnection) DriverManager.getConnection(DRUID_URL, properties);
- threadLocal.set(connection);
- return connection;
- }
-
- /**
- * 关闭连接
- */
- public static void closeConnection() throws SQLException{
- AvaticaConnection conn = threadLocal.get();
- if(conn != null){
- conn.close();
- threadLocal.remove();
- }
- }
-
- /**
- * 根据sql查询结果
- */
- public static ResultSet executeQuery (AvaticaStatement statement, String sql) throws SQLException{
- ResultSet resultSet = statement.executeQuery(sql);
- return resultSet;
- }
-
-}
diff --git a/src/main/java/cn/mesalab/utils/HbaseUtils.java b/src/main/java/cn/mesalab/utils/HbaseUtils.java
deleted file mode 100644
index 86123bb..0000000
--- a/src/main/java/cn/mesalab/utils/HbaseUtils.java
+++ /dev/null
@@ -1 +0,0 @@
-package cn.mesalab.utils; import cn.mesalab.config.ApplicationConfig; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.HBaseConfiguration; import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.client.Connection; import org.apache.hadoop.hbase.client.ConnectionFactory; import org.apache.hadoop.hbase.client.Put; import org.apache.hadoop.hbase.client.Table; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.io.ArrayWritable; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.Writable; import org.apache.hadoop.io.WritableUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.io.IOException; import java.util.ArrayList; import java.util.List; /** * @author yjy * @version 1.0 * @date 2021/7/23 4:56 下午 */ public class HbaseUtils { private static final Logger LOG = LoggerFactory.getLogger(HbaseUtils.class); private static HbaseUtils hbaseUtils; static { hbaseUtils = HbaseUtils.getInstance(); } public static HbaseUtils getInstance(){ if (hbaseUtils == null) { hbaseUtils = new HbaseUtils(); } return hbaseUtils; } public Table getHbaseTable(){ Table hbaseTable = null; try{ Configuration config = HBaseConfiguration.create(); config.set(HConstants.ZOOKEEPER_QUORUM, ApplicationConfig.HBASE_ZOOKEEPER_QUORUM); config.set(HConstants.ZOOKEEPER_CLIENT_PORT, ApplicationConfig.HBASE_ZOOKEEPER_CLIENT_PORT); TableName tableName = TableName.valueOf(ApplicationConfig.HBASE_TABLE); Connection conn = ConnectionFactory.createConnection(config); hbaseTable = conn.getTable(tableName); } catch (IOException e){ LOG.error("HBase 创建HBase table失败!"); e.printStackTrace(); } return hbaseTable; } public List<Put> cachedInPut(List<Put> putList, String ip, int[] baseline, String attackType, String metricType){ Put rowPut = new Put(Bytes.toBytes(ip)); // FOR TEST // start if(attackType.equals(ApplicationConfig.DRUID_ATTACKTYPE_TCP_SYN_FLOOD)){ attackType = "TCP SYN Flood"; } else if(attackType.equals(ApplicationConfig.DRUID_ATTACKTYPE_UDP_FLOOD)){ attackType = "UDP Flood"; } else if(attackType.equals(ApplicationConfig.DRUID_ATTACKTYPE_ICMP_FLOOD)){ attackType = "ICMP Flood"; } else { attackType = "DNS Amplification"; } // end rowPut.addColumn( Bytes.toBytes(attackType), Bytes.toBytes(metricType), WritableUtils.toByteArray(toWritable(baseline))); putList.add(rowPut); return putList; } private static Writable toWritable(int[] arr) { Writable[] content = new Writable[arr.length]; for (int i = 0; i < content.length; i++) { content[i] = new IntWritable(arr[i]); } return new ArrayWritable(IntWritable.class, content); } public static ArrayList<Integer> fromWritable(ArrayWritable writable) { Writable[] writables = ((ArrayWritable) writable).get(); ArrayList<Integer> list = new ArrayList<Integer>(writables.length); for (Writable wrt : writables) { list.add(((IntWritable)wrt).get()); } return list; } } \ No newline at end of file
diff --git a/src/main/java/cn/mesalab/utils/HttpClientUtils.java b/src/main/java/cn/mesalab/utils/HttpClientUtils.java
deleted file mode 100644
index 2f7f35a..0000000
--- a/src/main/java/cn/mesalab/utils/HttpClientUtils.java
+++ /dev/null
@@ -1,6 +0,0 @@
-package cn.mesalab.utils;/**
- * @author yjy
- * @date 2021/8/3 3:57 下午
- * @version 1.0
- */public class HttpClientService {
-}
diff --git a/src/main/java/cn/mesalab/utils/SeriesUtils.java b/src/main/java/cn/mesalab/utils/SeriesUtils.java
deleted file mode 100644
index 17f84b3..0000000
--- a/src/main/java/cn/mesalab/utils/SeriesUtils.java
+++ /dev/null
@@ -1,212 +0,0 @@
-package cn.mesalab.utils;
-
-import cn.mesalab.config.ApplicationConfig;
-import cn.mesalab.dao.DruidData;
-import cn.mesalab.service.BaselineGeneration;
-import com.google.common.collect.Lists;
-import org.jfree.util.Log;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.BufferedReader;
-import java.io.FileReader;
-import java.lang.reflect.Array;
-import java.time.Duration;
-import java.time.Instant;
-import java.time.LocalDateTime;
-import java.time.format.DateTimeFormatter;
-import java.util.*;
-import java.util.stream.Stream;
-
-
-/**
- * @author joy
- */
-public class SeriesUtils {
- private static final Logger LOG = LoggerFactory.getLogger(SeriesUtils.class);
-
- private static DruidData druidData = new DruidData();
-
- public static List<Map<String, Object>> readCsvToList(String filePath) {
- List<Map<String, Object>> list = new ArrayList<Map<String, Object>>();
-
- String line;
- try (BufferedReader br = new BufferedReader(new FileReader(filePath))) {
- br.readLine();
- while ((line = br.readLine()) != null) {
- List<String> column = Arrays.asList(line.split(","));
- // 保存记录中的每个<字段名-字段值>
- Map<String, Object> rowData = new HashMap<String, Object>();
- rowData.put("__time", column.get(0));
- rowData.put(ApplicationConfig.BASELINE_METRIC_TYPE, Integer.valueOf(column.get(1)));
-
- list.add(rowData);
- }
- } catch (Exception e) {
- e.printStackTrace();
- }
- return list;
- }
-
-
- /**
- * 时序数据补齐
- */
- public static List<Map<String, Object>> complementSeries(List<Map<String, Object>> originSeries){
- LocalDateTime startTime = LocalDateTime.ofInstant(Instant.ofEpochMilli(druidData.getTimeLimit()._2), TimeZone
- .getDefault().toZoneId());
- LocalDateTime endTime = LocalDateTime.ofInstant(Instant.ofEpochMilli(druidData.getTimeLimit()._1), TimeZone
- .getDefault().toZoneId());
- List<String> dateList = completionDate(startTime, endTime);
-
- // 补全后的结果
- List<Map<String, Object>> result = new ArrayList<>();
- boolean dbDateExist = false;
- for (String date : dateList) {
- //table为数据库查询出来的对象列表,结构为List<Map<String, Object>>
- for (Map<String, Object> row : originSeries) {
- if (row.get(ApplicationConfig.DRUID_RECVTIME_COLUMN_NAME).toString().substring(0,19).equals(date)) {
- //集合已包含该日期
- dbDateExist = true;
- result.add(row);
- break;
- }
- }
- //添加补全的数据到最后结果列表
- if (!dbDateExist) {
- Map<String, Object> temp = new HashMap<>(2);
- temp.put(ApplicationConfig.DRUID_RECVTIME_COLUMN_NAME, date);
- temp.put(ApplicationConfig.BASELINE_METRIC_TYPE, 0);
- result.add(temp);
- }
- dbDateExist = false;
- }
-
- return result;
- }
-
- private static List<String> completionDate(LocalDateTime startTime, LocalDateTime endTime) {
- //日期格式化
- DateTimeFormatter formatter = DateTimeFormatter.ofPattern(ApplicationConfig.TIME_FORMAT);
- List<String> timeList = new ArrayList<>();
- //遍历给定的日期期间的每一天
- for (int i = 0; !Duration.between(startTime.plusMinutes(i+1), endTime).isNegative(); i+= ApplicationConfig.HISTORICAL_GRAD) {
- //添加日期
- timeList.add(startTime.plusMinutes(i).format(formatter));
- }
- return timeList;
- }
-
- /**
- * 判断是否存在以天为单位的周期特征
- * @param historicalSeries
- * @return
- */
- public static Boolean isPeriod(List<Integer> historicalSeries){
- Boolean result = true;
- List<List<Integer>> partitions = Lists.partition(historicalSeries, 24*60/ApplicationConfig.HISTORICAL_GRAD);
- List<Integer> aggregatedPart = Arrays.asList();
- try{
- aggregatedPart = columnAverage(partitions.subList(0, ApplicationConfig.READ_HISTORICAL_DAYS-1));
- } catch (IndexOutOfBoundsException e){
- Log.error("历史");
- }
-
- // Pearson corrcoef
- double pearsonCorrelationScore = getPearsonCorrelationScore(aggregatedPart.stream().mapToInt(Integer::valueOf).toArray(),
- partitions.get(partitions.size() - 1).stream().mapToInt(Integer::valueOf).toArray());
-
- if (pearsonCorrelationScore < ApplicationConfig.BASELINE_PERIOD_CORR_THRE){
- result=false;
- }
- return result;
- }
-
- public static double getPearsonCorrelationScore(int[] xData, int[] yData) {
- if (xData.length != yData.length) {
- Log.error("Pearson CorrelationScore 数组长度不相等!");
- }
- int xMeans;
- int yMeans;
- double numerator = 0;
- double denominator = 0;
-
- double result = 0;
- // 拿到两个数据的平均值
- xMeans = (int) getMeans(xData);
- yMeans = (int) getMeans(yData);
- // 计算皮尔逊系数的分子
- numerator = generateNumerator(xData, xMeans, yData, yMeans);
- // 计算皮尔逊系数的分母
- denominator = generateDenomiator(xData, xMeans, yData, yMeans);
- // 计算皮尔逊系数
- if(denominator>0) {
- result = numerator / denominator;
- }
- return result;
- }
-
- private static int generateNumerator(int[] xData, int xMeans, int[] yData, int yMeans) {
- int numerator = 0;
- for (int i = 0; i < xData.length; i++) {
- numerator += (xData[i] - xMeans) * (yData[i] - yMeans);
- }
- return numerator;
- }
-
- private static double generateDenomiator(int[] xData, int xMeans, int[] yData, int yMeans) {
- double xSum = 0.0;
- for (int i = 0; i < xData.length; i++) {
- xSum += (xData[i] - xMeans) * (xData[i] - xMeans);
- }
- double ySum = 0.0;
- for (int i = 0; i < yData.length; i++) {
- ySum += (yData[i] - yMeans) * (yData[i] - yMeans);
- }
- return Math.sqrt(xSum) * Math.sqrt(ySum);
- }
-
- private static double getMeans(int[] datas) {
- double sum = 0.0;
- for (int i = 0; i < datas.length; i++) {
- sum += datas[i];
- }
- return sum / datas.length;
- }
-
- public static List<Integer> columnAverage(List<List<Integer>> list){
- ArrayList<Integer> averages = new ArrayList<>();
- for(int i=0; i<list.get(0).size(); i++){
- int columnSum = 0;
- for(int j = 0; j< list.size(); j++){
- columnSum += list.get(j).get(i);
- }
- averages.add(columnSum / list.size());
- }
- return averages;
- }
-
- public static int percentile(List<Integer> latencies, double percentile) {
- Collections.sort(latencies);
- int index = (int) Math.ceil(percentile * latencies.size());
- return latencies.get(index-1);
- }
-
- public static void main(String[] args) {
- List<Integer> test = Arrays.asList(
- 1,2,3,4,5,
- 1,2,3,4,5,
- 1,2,3,4,5,
- 1,2,3,4,5,
- 1,2,3,4,5,
- 1,2,3,4,5,
- 1,2,3,4,5);
- System.out.println(columnAverage(Lists.partition(test, 5)));
-
-
-
- }
-
-
-}
-