diff options
| author | yinjiangyi <[email protected]> | 2021-08-03 16:56:37 +0800 |
|---|---|---|
| committer | yinjiangyi <[email protected]> | 2021-08-03 16:56:37 +0800 |
| commit | 03849d5f3f45e04b0060a83df01826a00c929d3b (patch) | |
| tree | 4b825dc642cb6eb9a060e54bf8d69288fbee4904 /src/main/java | |
| parent | a39609557c4e0e11a145634f7a6ac345b4145684 (diff) | |
update .gitignore
Diffstat (limited to 'src/main/java')
| -rw-r--r-- | src/main/java/cn/mesalab/config/ApplicationConfig.java | 53 | ||||
| -rw-r--r-- | src/main/java/cn/mesalab/dao/DruidData.java | 187 | ||||
| -rw-r--r-- | src/main/java/cn/mesalab/dao/Impl/ResultSetToListServiceImp.java | 44 | ||||
| -rw-r--r-- | src/main/java/cn/mesalab/dao/ResultSetToListService.java | 24 | ||||
| -rw-r--r-- | src/main/java/cn/mesalab/main/BaselineApplication.java | 15 | ||||
| -rw-r--r-- | src/main/java/cn/mesalab/service/BaselineGeneration.java | 206 | ||||
| -rw-r--r-- | src/main/java/cn/mesalab/service/algorithm/KalmanFilter.java | 90 | ||||
| -rw-r--r-- | src/main/java/cn/mesalab/utils/ConfigUtils.java | 45 | ||||
| -rw-r--r-- | src/main/java/cn/mesalab/utils/DruidUtils.java | 55 | ||||
| -rw-r--r-- | src/main/java/cn/mesalab/utils/HbaseUtils.java | 1 | ||||
| -rw-r--r-- | src/main/java/cn/mesalab/utils/HttpClientUtils.java | 6 | ||||
| -rw-r--r-- | src/main/java/cn/mesalab/utils/SeriesUtils.java | 212 |
12 files changed, 0 insertions, 938 deletions
diff --git a/src/main/java/cn/mesalab/config/ApplicationConfig.java b/src/main/java/cn/mesalab/config/ApplicationConfig.java deleted file mode 100644 index 9e0e447..0000000 --- a/src/main/java/cn/mesalab/config/ApplicationConfig.java +++ /dev/null @@ -1,53 +0,0 @@ -package cn.mesalab.config; - -import cn.mesalab.utils.ConfigUtils; - -/** - * @author yjy - * @version 1.0 - * @date 2021/7/24 10:23 上午 - */ -public class ApplicationConfig { - - public static final String DRUID_URL= ConfigUtils.getStringProperty("druid.url"); - public static final String DRUID_DRIVER = ConfigUtils.getStringProperty("druid.driver"); - public static final String DRUID_TABLE = ConfigUtils.getStringProperty("druid.table"); - - - public static final Integer DRUID_TIME_LIMIT_TYPE = ConfigUtils.getIntProperty("read.druid.time.limit.type"); - public static final Long READ_DRUID_MAX_TIME = ConfigUtils.getLongProperty("read.druid.max.time"); - public static final Long READ_DRUID_MIN_TIME = ConfigUtils.getLongProperty("read.druid.min.time"); - - public static final Integer READ_HISTORICAL_DAYS = ConfigUtils.getIntProperty("read.historical.days"); - public static final Integer HISTORICAL_GRAD = ConfigUtils.getIntProperty("historical.grad"); - public static final String TIME_FORMAT = ConfigUtils.getStringProperty("time.format"); - public static final String BASELINE_METRIC_TYPE = ConfigUtils.getStringProperty("baseline.metric.type"); - - public static final String DRUID_ATTACKTYPE_TCP_SYN_FLOOD = ConfigUtils.getStringProperty("druid.attacktype.tcpsynflood"); - public static final String DRUID_ATTACKTYPE_UDP_FLOOD = ConfigUtils.getStringProperty("druid.attacktype.udpflood"); - public static final String DRUID_ATTACKTYPE_ICMP_FLOOD = ConfigUtils.getStringProperty("druid.attacktype.icmpflood"); - public static final String DRUID_ATTACKTYPE_DNS_AMPL = ConfigUtils.getStringProperty("druid.attacktype.dnsamplification"); - public static final String DRUID_SERVERIP_COLUMN_NAME = ConfigUtils.getStringProperty("druid.serverip.columnname"); - public static final String DRUID_ATTACKTYPE_COLUMN_NAME = ConfigUtils.getStringProperty("druid.attacktype.columnname"); - public static final String DRUID_RECVTIME_COLUMN_NAME = ConfigUtils.getStringProperty("druid.recvtime.columnname"); - - public static final float BASELINE_PERIOD_CORR_THRE = ConfigUtils.getFloatProperty("baseline.period.correlative.threshold"); - public static final float BASELINE_HISTORICAL_RATIO = ConfigUtils.getFloatProperty("baseline.historical.ratio.threshold"); - public static final float BASELINE_SPARSE_FILL_PERCENTILE = ConfigUtils.getFloatProperty("baseline.historical.sparse.fill.percentile"); - public static final String BASELINE_FUNCTION = ConfigUtils.getStringProperty("baseline.function"); - public static final Integer BASELINE_RANGE_DAYS = ConfigUtils.getIntProperty("baseline.range.days"); - public static final float BASELINE_RATIONAL_PERCENTILE = ConfigUtils.getFloatProperty("baseline.rational.percentile"); - - - public static final String HBASE_TABLE = ConfigUtils.getStringProperty("hbase.table"); - public static final String HBASE_ZOOKEEPER_QUORUM= ConfigUtils.getStringProperty("hbase.zookeeper.quorum"); - public static final String HBASE_ZOOKEEPER_CLIENT_PORT= ConfigUtils.getStringProperty("hbase.zookeeper.client.port"); - - - public static final Double BASELINE_KALMAN_Q = ConfigUtils.getDoubleProperty("baseline.kalman.q"); - public static final Double BASELINE_KALMAN_R = ConfigUtils.getDoubleProperty("baseline.kalman.r"); - - public static final Integer LOG_WRITE_COUNT = ConfigUtils.getIntProperty("log.write.count"); - public static final Integer GENERATE_BATCH_SIZE = ConfigUtils.getIntProperty("generate.batch.size"); -} - diff --git a/src/main/java/cn/mesalab/dao/DruidData.java b/src/main/java/cn/mesalab/dao/DruidData.java deleted file mode 100644 index 2de41a7..0000000 --- a/src/main/java/cn/mesalab/dao/DruidData.java +++ /dev/null @@ -1,187 +0,0 @@ -package cn.mesalab.dao; - -import cn.mesalab.config.ApplicationConfig; -import cn.mesalab.dao.Impl.ResultSetToListServiceImp; -import cn.mesalab.utils.DruidUtils; -import io.vavr.Tuple; -import io.vavr.Tuple2; -import org.apache.calcite.avatica.AvaticaConnection; -import org.apache.calcite.avatica.AvaticaStatement; -import org.apache.commons.lang.StringUtils; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.sql.ResultSet; -import java.sql.SQLException; -import java.util.ArrayList; -import java.util.Calendar; -import java.util.List; -import java.util.Map; -import java.util.stream.Collectors; - - -/** - * @author yjy - * @version 1.0 - * Druid 数据库操作 - * @date 2021/7/23 4:56 下午 - */ -public class DruidData { - - private static final Logger LOG = LoggerFactory.getLogger(DruidData.class); - private static DruidData druidData; - private AvaticaConnection connection; - private AvaticaStatement statement; - private String timeFilter = ApplicationConfig.DRUID_RECVTIME_COLUMN_NAME - + " >= MILLIS_TO_TIMESTAMP(" + getTimeLimit()._2 - + ") AND " + ApplicationConfig.DRUID_RECVTIME_COLUMN_NAME - + " < MILLIS_TO_TIMESTAMP(" + getTimeLimit()._1 + ")"; - - - { - connectionInit(); - } - - /** - * 连接初始化 - */ - private void connectionInit(){ - try { - connection = DruidUtils.getConn(); - statement = connection.createStatement(); - statement.setQueryTimeout(0); - - } catch (SQLException exception) { - exception.printStackTrace(); - } - } - - /** - * 获取实例 - * @return DruidData实例 - */ - public static DruidData getInstance() { - druidData = new DruidData(); - return druidData; - } - - /** - * 获取distinct server ip - * @return ArrayList<String> ip列表 - */ - public ArrayList<String> getServerIpList() { - Long startQueryIpLIstTime = System.currentTimeMillis(); - ArrayList<String> serverIps = new ArrayList<String>(); - String sql = "SELECT distinct " + ApplicationConfig.DRUID_SERVERIP_COLUMN_NAME - + " FROM " + ApplicationConfig.DRUID_TABLE - + " WHERE " + timeFilter - + " LIMIT 10";// FOR TEST - try{ - ResultSet resultSet = DruidUtils.executeQuery(statement,sql); - while(resultSet.next()){ - String ip = resultSet.getString(ApplicationConfig.DRUID_SERVERIP_COLUMN_NAME); - serverIps.add(ip); - } - } catch (Exception e){ - e.printStackTrace(); - } - Long endQueryIpListTime = System.currentTimeMillis(); - LOG.info("性能测试:ip list查询耗时——"+(endQueryIpListTime-startQueryIpLIstTime)); - - return serverIps; - } - - /** - * 从Druid读取目标IP相关数据 - * @param ipList ip列表 - * @return 数据库读取结果 - */ - public List<Map<String, Object>> readFromDruid(List<String> ipList){ - List<Map<String, Object>> rsList = null; - ipList = ipList.stream().map( ip -> "\'"+ip+"\'").collect(Collectors.toList()); - String ipString = "(" + StringUtils.join(ipList, ",").toString() + ")"; - String sql = "SELECT "+ ApplicationConfig.DRUID_SERVERIP_COLUMN_NAME - + ", "+ ApplicationConfig.DRUID_ATTACKTYPE_COLUMN_NAME - + ", "+ ApplicationConfig.BASELINE_METRIC_TYPE - + ", " + ApplicationConfig.DRUID_RECVTIME_COLUMN_NAME - + " FROM " + ApplicationConfig.DRUID_TABLE - + " WHERE " + ApplicationConfig.DRUID_SERVERIP_COLUMN_NAME - + " IN " + ipString - + " AND " + timeFilter; - try{ - ResultSet resultSet = DruidUtils.executeQuery(statement, sql); - ResultSetToListService service = new ResultSetToListServiceImp(); - rsList = service.selectAll(resultSet); - } catch (Exception e){ - e.printStackTrace(); - } - return rsList; - } - - /** - * 从数据库读取结果中筛选指定ip的指定攻击类型的数据 - * @param allData 数据库读取结果 - * @param ip 指定ip - * @param attackType 指定攻击类型 - * @return 筛选结果 - */ - public List<Map<String, Object>> getTimeSeriesData(List<Map<String, Object>> allData, String ip, String attackType){ - List<Map<String, Object>> rsList = new ArrayList<>(); - try{ - rsList = allData.stream(). - filter(i->((i.get(ApplicationConfig.DRUID_SERVERIP_COLUMN_NAME).equals(ip)) - )&&(i.get(ApplicationConfig.DRUID_ATTACKTYPE_COLUMN_NAME).equals(attackType))) - .collect(Collectors.toList()); - } catch (NullPointerException e){ - } - return rsList; - } - - /** - * 计算查询时间范围,可指定时间范围(测试)或使用默认配置 - * @return 时间范围起始点和终止点 - */ - public Tuple2<Long, Long> getTimeLimit(){ - long maxTime = 0L; - long minTime = 0L; - switch(ApplicationConfig.DRUID_TIME_LIMIT_TYPE){ - case 0: - maxTime = getCurrentDay(); - minTime = getCurrentDay(-ApplicationConfig.READ_HISTORICAL_DAYS); - break; - case 1: - maxTime = ApplicationConfig.READ_DRUID_MAX_TIME; - minTime = ApplicationConfig.READ_DRUID_MIN_TIME; - break; - default: - LOG.warn("没有设置Druid数据读取方式"); - } - return Tuple.of(maxTime, minTime); - } - - private long getCurrentDay(int bias) { - Calendar calendar = Calendar.getInstance(); - calendar.set(Calendar.DAY_OF_YEAR, calendar.get(Calendar.DAY_OF_YEAR) + bias); - calendar.set(Calendar.HOUR_OF_DAY, 0); - calendar.set(Calendar.HOUR_OF_DAY, 0); - calendar.set(Calendar.MINUTE, 0); - calendar.set(Calendar.SECOND, 0); - calendar.set(Calendar.MILLISECOND, 0); - return calendar.getTimeInMillis(); - } - - private long getCurrentDay(){ - return getCurrentDay(0); - } - - /** - * 关闭当前DruidData - */ - public void closeConn(){ - try { - DruidUtils.closeConnection(); - } catch (SQLException exception) { - exception.printStackTrace(); - } - } -} diff --git a/src/main/java/cn/mesalab/dao/Impl/ResultSetToListServiceImp.java b/src/main/java/cn/mesalab/dao/Impl/ResultSetToListServiceImp.java deleted file mode 100644 index 7867353..0000000 --- a/src/main/java/cn/mesalab/dao/Impl/ResultSetToListServiceImp.java +++ /dev/null @@ -1,44 +0,0 @@ -package cn.mesalab.dao.Impl; - -import cn.mesalab.dao.ResultSetToListService; - -import java.sql.ResultSet; -import java.sql.ResultSetMetaData; -import java.util.ArrayList; -import java.util.HashMap; -import java.util.List; -import java.util.Map; - -/** - * @author yjy - * @version 1.0 - * @date 2021/7/24 4:29 下午 - */ -public class ResultSetToListServiceImp implements ResultSetToListService { - - /** - * SELECT 查询记录以List结构返回,每一个元素是一条记录 - * 每条记录保存在Map<String, Object>里面,String类型指字段名字,Object对应字段值 - * - * @param rs - * @return List<Map<String, Object>> - */ - @Override - public List<Map<String, Object>> selectAll(ResultSet rs) { - List<Map<String, Object>> list = new ArrayList<Map<String, Object>>(); - try { - ResultSetMetaData rmd = rs.getMetaData(); - int columnCount = rmd.getColumnCount(); - while (rs.next()) { - Map<String, Object> rowData = new HashMap<String, Object>(); - for (int i = 1; i <= columnCount; ++i) { - rowData.put(rmd.getColumnName(i), rs.getObject(i)); - } - list.add(rowData); - } - } catch (Exception ex) { - ex.printStackTrace(); - } - return list; - } -}
\ No newline at end of file diff --git a/src/main/java/cn/mesalab/dao/ResultSetToListService.java b/src/main/java/cn/mesalab/dao/ResultSetToListService.java deleted file mode 100644 index 103e330..0000000 --- a/src/main/java/cn/mesalab/dao/ResultSetToListService.java +++ /dev/null @@ -1,24 +0,0 @@ -package cn.mesalab.dao; - -import java.sql.ResultSet; -import java.util.List; -import java.util.Map; - - -/** - * @author yjy - * @version 1.0 - * @date 2021/7/24 4:27 下午 - */ -public interface ResultSetToListService { - /** - * SELECT * FROM websites - * 查询所有记录,以List返回 - * list对象的每一个元素都是一条记录 - * 每条记录保存在Map<String, Object>里面,String类型指字段名字,Object对应字段值 - * - * @param rs - * @return List<Map < String, Object>> - */ - public List<Map<String, Object>> selectAll(ResultSet rs); -}
\ No newline at end of file diff --git a/src/main/java/cn/mesalab/main/BaselineApplication.java b/src/main/java/cn/mesalab/main/BaselineApplication.java deleted file mode 100644 index 8bd6f13..0000000 --- a/src/main/java/cn/mesalab/main/BaselineApplication.java +++ /dev/null @@ -1,15 +0,0 @@ -package cn.mesalab.main; - -import cn.mesalab.service.BaselineGeneration; -import sun.rmi.runtime.Log; - -/** - * @author yjy - * @version 1.0 - * @date 2021/7/23 5:34 下午 - */ -public class BaselineApplication { - public static void main(String[] args) { - BaselineGeneration.perform(); - } -} diff --git a/src/main/java/cn/mesalab/service/BaselineGeneration.java b/src/main/java/cn/mesalab/service/BaselineGeneration.java deleted file mode 100644 index e72a0e6..0000000 --- a/src/main/java/cn/mesalab/service/BaselineGeneration.java +++ /dev/null @@ -1,206 +0,0 @@ -package cn.mesalab.service; - -import cn.mesalab.config.ApplicationConfig; -import cn.mesalab.dao.DruidData; -import cn.mesalab.service.algorithm.KalmanFilter; -import cn.mesalab.utils.HbaseUtils; -import cn.mesalab.utils.SeriesUtils; -import com.google.common.collect.Lists; -import com.google.common.util.concurrent.ThreadFactoryBuilder; -import org.apache.commons.math3.stat.StatUtils; -import org.apache.hadoop.hbase.client.Put; -import org.apache.hadoop.hbase.client.Table; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.io.IOException; -import java.util.*; -import java.util.concurrent.*; -import java.util.stream.Collectors; - -/** - * @author yjy - * @version 1.0 - * baseline生成及写入 - * @date 2021/7/23 5:38 下午 - */ -public class BaselineGeneration { - private static final Logger LOG = LoggerFactory.getLogger(BaselineGeneration.class); - - private static DruidData druidData; - private static HbaseUtils hbaseUtils; - private static Table hbaseTable; - private static List<Map<String, Object>> batchDruidData = new ArrayList<>(); - - private static List<String> attackTypeList = Arrays.asList( - ApplicationConfig.DRUID_ATTACKTYPE_TCP_SYN_FLOOD, - ApplicationConfig.DRUID_ATTACKTYPE_ICMP_FLOOD, - ApplicationConfig.DRUID_ATTACKTYPE_UDP_FLOOD, - ApplicationConfig.DRUID_ATTACKTYPE_DNS_AMPL - ); - private static final Integer BASELINE_POINT_NUM = - ApplicationConfig.BASELINE_RANGE_DAYS * 24 * (60/ApplicationConfig.HISTORICAL_GRAD); - - /** - * 程序执行 - */ - public static void perform() { - long start = System.currentTimeMillis(); - - druidData = DruidData.getInstance(); - hbaseUtils = HbaseUtils.getInstance(); - hbaseTable = hbaseUtils.getHbaseTable(); - LOG.info("Druid 成功建立连接"); - - try{ - // baseline生成并写入 - generateBaselinesThread(); - - long last = System.currentTimeMillis(); - LOG.warn("运行时间:" + (last - start)); - - druidData.closeConn(); - hbaseTable.close(); - LOG.info("Druid 关闭连接"); - - } catch (Exception e){ - e.printStackTrace(); - } - System.exit(0); - } - - /** - * 多线程baseline生成入口 - * @throws InterruptedException - */ - private static void generateBaselinesThread() throws InterruptedException { - int threadNum = Runtime.getRuntime().availableProcessors(); - - ThreadFactory namedThreadFactory = new ThreadFactoryBuilder() - .setNameFormat("baseline-demo-%d").build(); - - // 创建线程池 - ThreadPoolExecutor executor = new ThreadPoolExecutor( - threadNum, - threadNum, - 0L, - TimeUnit.MILLISECONDS, - new LinkedBlockingQueue<>(1024), - namedThreadFactory, - new ThreadPoolExecutor.AbortPolicy()); - - // IP列表获取 - ArrayList<String> destinationIps = druidData.getServerIpList(); - - LOG.info("共查询到服务端ip " +destinationIps.size() + " 个"); - LOG.info("Baseline batch 大小: " + ApplicationConfig.GENERATE_BATCH_SIZE); - - // 分批进行IP baseline生成和处理 - List<List<String>> batchIpLists = Lists.partition(destinationIps, ApplicationConfig.GENERATE_BATCH_SIZE); - for (List<String> batchIps: batchIpLists){ - if(batchIps.size()>0){ - executor.execute(() -> generateBaselines(batchIps)); - } - } - - executor.shutdown(); - executor.awaitTermination(10L, TimeUnit.HOURS); - } - - /** - * 批量生成IP baseline - * @param ipList ip列表 - */ - public static void generateBaselines(List<String> ipList){ - druidData = DruidData.getInstance(); - batchDruidData = druidData.readFromDruid(ipList); - - List<Put> putList = new ArrayList<>(); - for(String attackType: attackTypeList){ - for(String ip: ipList){ - int[] ipBaseline = generateSingleIpBaseline(ip, attackType); - if (ipBaseline!= null){ - putList = hbaseUtils.cachedInPut(putList, ip, ipBaseline, attackType, ApplicationConfig.BASELINE_METRIC_TYPE); - } - } - } - - try { - hbaseTable.put(putList); - LOG.info("Baseline 线程 " + Thread.currentThread().getId() + " 成功写入Baseline条数共计 " + putList.size()); - } catch (IOException e) { - e.printStackTrace(); - } - - druidData.closeConn(); - } - - /** - * 单ip baseline生成逻辑 - * @param ip ip - * @param attackType 攻击类型 - * @return baseline序列,长度为 60/HISTORICAL_GRAD*24 - */ - private static int[] generateSingleIpBaseline(String ip, String attackType){ - // 查询 - List<Map<String, Object>> originSeries = druidData.getTimeSeriesData(batchDruidData, ip, attackType); - - if (originSeries.size()==0){ - return null; - } - - // 时间序列缺失值补0 - List<Map<String, Object>> completSeries = SeriesUtils.complementSeries(originSeries); - - int[] baselineArr = new int[BASELINE_POINT_NUM]; - List<Integer>series = completSeries.stream().map( - i -> Integer.valueOf(i.get(ApplicationConfig.BASELINE_METRIC_TYPE).toString())).collect(Collectors.toList()); - - // 判断ip出现频率 - if(originSeries.size()/(float)completSeries.size()>ApplicationConfig.BASELINE_HISTORICAL_RATIO){ - // 高频率 - double percentile = StatUtils.percentile(series.stream().mapToDouble(Double::valueOf).toArray(), - ApplicationConfig.BASELINE_SPARSE_FILL_PERCENTILE); - Arrays.fill(baselineArr, (int)percentile); - baselineArr = baselineFunction(series); - - } else { - // 判断周期性 - if (SeriesUtils.isPeriod(series)){ - baselineArr = baselineFunction(series); - } else { - int ipPercentile = SeriesUtils.percentile( - originSeries.stream().map(i -> - Integer.valueOf(i.get(ApplicationConfig.BASELINE_METRIC_TYPE).toString())).collect(Collectors.toList()), - ApplicationConfig.BASELINE_RATIONAL_PERCENTILE); - Arrays.fill(baselineArr, ipPercentile); - } - } - - return baselineArr; - } - - /** - * baseline 生成算法 - * @param timeSeries 输入序列 - * @return 输出序列 - */ - private static int[] baselineFunction(List<Integer> timeSeries){ - int[] result; - switch (ApplicationConfig.BASELINE_FUNCTION){ - case "KalmanFilter": - KalmanFilter kalmanFilter = new KalmanFilter(); - kalmanFilter.forcast(timeSeries, BASELINE_POINT_NUM); - result = kalmanFilter.getForecastSeries().stream().mapToInt(Integer::valueOf).toArray(); - break; - default: - result = timeSeries.subList(0, BASELINE_POINT_NUM).stream().mapToInt(Integer::valueOf).toArray(); - } - return result; - } - - public static void main(String[] args) { - perform(); - } - -} diff --git a/src/main/java/cn/mesalab/service/algorithm/KalmanFilter.java b/src/main/java/cn/mesalab/service/algorithm/KalmanFilter.java deleted file mode 100644 index 11a40c3..0000000 --- a/src/main/java/cn/mesalab/service/algorithm/KalmanFilter.java +++ /dev/null @@ -1,90 +0,0 @@ -package cn.mesalab.service.algorithm; - -import cn.mesalab.config.ApplicationConfig; - -import java.util.ArrayList; -import java.util.List; - -/** - * @author yjy - * @version 1.0 - * kalman滤波器 - * @date 2021/7/25 1:42 下午 - */ - -public class KalmanFilter { - private Integer predict; - private Integer current; - private Integer estimate; - private double pdelt; - private double mdelt; - private double Gauss; - private double kalmanGain; - private final static double Q = ApplicationConfig.BASELINE_KALMAN_Q; - private final static double R = ApplicationConfig.BASELINE_KALMAN_R; - - public KalmanFilter() { - initial(); - } - - public void initial(){ - // TODO 调整 - pdelt = 1; - mdelt = 1; - } - - private ArrayList<Integer> smoothSeries; - private ArrayList<Integer> forecastSeries; - - public Integer calSingleKalPoint(Integer oldValue, Integer value){ - //第一个估计值 - predict = oldValue; - current = value; - //高斯噪声方差 - Gauss = Math.sqrt(pdelt * pdelt + mdelt * mdelt) + Q; - //估计方差 - kalmanGain = Math.sqrt((Gauss * Gauss)/(Gauss * Gauss + pdelt * pdelt)) + R; - //估计值 - estimate = (int) (kalmanGain * (current - predict) + predict); - //新的估计方差 - mdelt = Math.sqrt((1-kalmanGain) * Gauss * Gauss); - - return estimate; - } - - - public void forcast(List<Integer> historicalSeries, Integer length){ - int oldvalue = (historicalSeries.stream().mapToInt(Integer::intValue).sum())/historicalSeries.size(); - smoothSeries = new ArrayList<Integer>(); - for(int i = 0; i < historicalSeries.size(); i++){ - int value = historicalSeries.get(i); - oldvalue = calSingleKalPoint(oldvalue,value); - smoothSeries.add(oldvalue); - } - - forecastSeries = new ArrayList<>(); - Integer partitonNum = historicalSeries.size()/length; - for(int i = 0; i<length; i++){ - long sum = 0; - for (int period=0; period<partitonNum; period++){ - sum += smoothSeries.get(length*period+i); - } - forecastSeries.add((int)sum/partitonNum); - } - } - - public ArrayList<Integer> getSmoothSeries() { - return smoothSeries; - } - - public ArrayList<Integer> getAllRangeSeries() { - ArrayList<Integer> results = new ArrayList<>(); - results.addAll(smoothSeries); - results.addAll(forecastSeries); - return results; - } - - public ArrayList<Integer> getForecastSeries() { - return forecastSeries; - } -} diff --git a/src/main/java/cn/mesalab/utils/ConfigUtils.java b/src/main/java/cn/mesalab/utils/ConfigUtils.java deleted file mode 100644 index 718648b..0000000 --- a/src/main/java/cn/mesalab/utils/ConfigUtils.java +++ /dev/null @@ -1,45 +0,0 @@ -package cn.mesalab.utils; - - -import org.apache.log4j.Logger; - -import java.util.Properties; - -public class ConfigUtils { - private static final Logger LOG = Logger.getLogger(ConfigUtils.class); - private static Properties propCommon = new Properties(); - - public static String getStringProperty(String key) { - return propCommon.getProperty(key); - } - public static Float getFloatProperty(String key) { - return Float.parseFloat(propCommon.getProperty(key)); - } - - - public static Integer getIntProperty(String key) { - return Integer.parseInt(propCommon.getProperty(key)); - } - - public static Long getLongProperty(String key) { - return Long.parseLong(propCommon.getProperty(key)); - } - - public static Double getDoubleProperty(String key) { - return Double.parseDouble(propCommon.getProperty(key)); - } - - public static Boolean getBooleanProperty(String key) { - return "true".equals(propCommon.getProperty(key).toLowerCase().trim()); - } - - static { - try { - propCommon.load(ConfigUtils.class.getClassLoader().getResourceAsStream("application.properties")); - - } catch (Exception e) { - propCommon = null; - LOG.error("配置加载失败"); - } - } -} diff --git a/src/main/java/cn/mesalab/utils/DruidUtils.java b/src/main/java/cn/mesalab/utils/DruidUtils.java deleted file mode 100644 index 8224d37..0000000 --- a/src/main/java/cn/mesalab/utils/DruidUtils.java +++ /dev/null @@ -1,55 +0,0 @@ -package cn.mesalab.utils; - -import cn.mesalab.config.ApplicationConfig; -import org.apache.calcite.avatica.AvaticaConnection; -import org.apache.calcite.avatica.AvaticaStatement; -import org.apache.hadoop.hbase.client.Table; - -import java.sql.DriverManager; -import java.sql.ResultSet; -import java.sql.SQLException; -import java.util.Properties; - -/** - * @author yjy - * @version 1.0 - * @date 2021/7/23 4:50 下午 - */ -public class DruidUtils { - private static ThreadLocal<AvaticaConnection> threadLocal = new ThreadLocal<AvaticaConnection>(); - - private static final String DRUID_URL = ApplicationConfig.DRUID_URL; - private static AvaticaStatement statement = null; - - /** - * 打开连接 - * @throws SQLException - */ - public static AvaticaConnection getConn() throws SQLException { - Properties properties = new Properties(); - properties.setProperty("connectTimeout", String.valueOf(10*60*60)); - AvaticaConnection connection = (AvaticaConnection) DriverManager.getConnection(DRUID_URL, properties); - threadLocal.set(connection); - return connection; - } - - /** - * 关闭连接 - */ - public static void closeConnection() throws SQLException{ - AvaticaConnection conn = threadLocal.get(); - if(conn != null){ - conn.close(); - threadLocal.remove(); - } - } - - /** - * 根据sql查询结果 - */ - public static ResultSet executeQuery (AvaticaStatement statement, String sql) throws SQLException{ - ResultSet resultSet = statement.executeQuery(sql); - return resultSet; - } - -} diff --git a/src/main/java/cn/mesalab/utils/HbaseUtils.java b/src/main/java/cn/mesalab/utils/HbaseUtils.java deleted file mode 100644 index 86123bb..0000000 --- a/src/main/java/cn/mesalab/utils/HbaseUtils.java +++ /dev/null @@ -1 +0,0 @@ -package cn.mesalab.utils;
import cn.mesalab.config.ApplicationConfig;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.ArrayWritable;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
/**
* @author yjy
* @version 1.0
* @date 2021/7/23 4:56 下午
*/
public class HbaseUtils {
private static final Logger LOG = LoggerFactory.getLogger(HbaseUtils.class);
private static HbaseUtils hbaseUtils;
static {
hbaseUtils = HbaseUtils.getInstance();
}
public static HbaseUtils getInstance(){
if (hbaseUtils == null) {
hbaseUtils = new HbaseUtils();
}
return hbaseUtils;
}
public Table getHbaseTable(){
Table hbaseTable = null;
try{
Configuration config = HBaseConfiguration.create();
config.set(HConstants.ZOOKEEPER_QUORUM, ApplicationConfig.HBASE_ZOOKEEPER_QUORUM);
config.set(HConstants.ZOOKEEPER_CLIENT_PORT, ApplicationConfig.HBASE_ZOOKEEPER_CLIENT_PORT);
TableName tableName = TableName.valueOf(ApplicationConfig.HBASE_TABLE);
Connection conn = ConnectionFactory.createConnection(config);
hbaseTable = conn.getTable(tableName);
} catch (IOException e){
LOG.error("HBase 创建HBase table失败!");
e.printStackTrace();
}
return hbaseTable;
}
public List<Put> cachedInPut(List<Put> putList, String ip, int[] baseline, String attackType, String metricType){
Put rowPut = new Put(Bytes.toBytes(ip));
// FOR TEST
// start
if(attackType.equals(ApplicationConfig.DRUID_ATTACKTYPE_TCP_SYN_FLOOD)){
attackType = "TCP SYN Flood";
} else if(attackType.equals(ApplicationConfig.DRUID_ATTACKTYPE_UDP_FLOOD)){
attackType = "UDP Flood";
} else if(attackType.equals(ApplicationConfig.DRUID_ATTACKTYPE_ICMP_FLOOD)){
attackType = "ICMP Flood";
} else {
attackType = "DNS Amplification";
}
// end
rowPut.addColumn(
Bytes.toBytes(attackType),
Bytes.toBytes(metricType),
WritableUtils.toByteArray(toWritable(baseline)));
putList.add(rowPut);
return putList;
}
private static Writable toWritable(int[] arr) {
Writable[] content = new Writable[arr.length];
for (int i = 0; i < content.length; i++) {
content[i] = new IntWritable(arr[i]);
}
return new ArrayWritable(IntWritable.class, content);
}
public static ArrayList<Integer> fromWritable(ArrayWritable writable) {
Writable[] writables = ((ArrayWritable) writable).get();
ArrayList<Integer> list = new ArrayList<Integer>(writables.length);
for (Writable wrt : writables) {
list.add(((IntWritable)wrt).get());
}
return list;
}
}
\ No newline at end of file diff --git a/src/main/java/cn/mesalab/utils/HttpClientUtils.java b/src/main/java/cn/mesalab/utils/HttpClientUtils.java deleted file mode 100644 index 2f7f35a..0000000 --- a/src/main/java/cn/mesalab/utils/HttpClientUtils.java +++ /dev/null @@ -1,6 +0,0 @@ -package cn.mesalab.utils;/** - * @author yjy - * @date 2021/8/3 3:57 下午 - * @version 1.0 - */public class HttpClientService { -} diff --git a/src/main/java/cn/mesalab/utils/SeriesUtils.java b/src/main/java/cn/mesalab/utils/SeriesUtils.java deleted file mode 100644 index 17f84b3..0000000 --- a/src/main/java/cn/mesalab/utils/SeriesUtils.java +++ /dev/null @@ -1,212 +0,0 @@ -package cn.mesalab.utils; - -import cn.mesalab.config.ApplicationConfig; -import cn.mesalab.dao.DruidData; -import cn.mesalab.service.BaselineGeneration; -import com.google.common.collect.Lists; -import org.jfree.util.Log; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.io.BufferedReader; -import java.io.FileReader; -import java.lang.reflect.Array; -import java.time.Duration; -import java.time.Instant; -import java.time.LocalDateTime; -import java.time.format.DateTimeFormatter; -import java.util.*; -import java.util.stream.Stream; - - -/** - * @author joy - */ -public class SeriesUtils { - private static final Logger LOG = LoggerFactory.getLogger(SeriesUtils.class); - - private static DruidData druidData = new DruidData(); - - public static List<Map<String, Object>> readCsvToList(String filePath) { - List<Map<String, Object>> list = new ArrayList<Map<String, Object>>(); - - String line; - try (BufferedReader br = new BufferedReader(new FileReader(filePath))) { - br.readLine(); - while ((line = br.readLine()) != null) { - List<String> column = Arrays.asList(line.split(",")); - // 保存记录中的每个<字段名-字段值> - Map<String, Object> rowData = new HashMap<String, Object>(); - rowData.put("__time", column.get(0)); - rowData.put(ApplicationConfig.BASELINE_METRIC_TYPE, Integer.valueOf(column.get(1))); - - list.add(rowData); - } - } catch (Exception e) { - e.printStackTrace(); - } - return list; - } - - - /** - * 时序数据补齐 - */ - public static List<Map<String, Object>> complementSeries(List<Map<String, Object>> originSeries){ - LocalDateTime startTime = LocalDateTime.ofInstant(Instant.ofEpochMilli(druidData.getTimeLimit()._2), TimeZone - .getDefault().toZoneId()); - LocalDateTime endTime = LocalDateTime.ofInstant(Instant.ofEpochMilli(druidData.getTimeLimit()._1), TimeZone - .getDefault().toZoneId()); - List<String> dateList = completionDate(startTime, endTime); - - // 补全后的结果 - List<Map<String, Object>> result = new ArrayList<>(); - boolean dbDateExist = false; - for (String date : dateList) { - //table为数据库查询出来的对象列表,结构为List<Map<String, Object>> - for (Map<String, Object> row : originSeries) { - if (row.get(ApplicationConfig.DRUID_RECVTIME_COLUMN_NAME).toString().substring(0,19).equals(date)) { - //集合已包含该日期 - dbDateExist = true; - result.add(row); - break; - } - } - //添加补全的数据到最后结果列表 - if (!dbDateExist) { - Map<String, Object> temp = new HashMap<>(2); - temp.put(ApplicationConfig.DRUID_RECVTIME_COLUMN_NAME, date); - temp.put(ApplicationConfig.BASELINE_METRIC_TYPE, 0); - result.add(temp); - } - dbDateExist = false; - } - - return result; - } - - private static List<String> completionDate(LocalDateTime startTime, LocalDateTime endTime) { - //日期格式化 - DateTimeFormatter formatter = DateTimeFormatter.ofPattern(ApplicationConfig.TIME_FORMAT); - List<String> timeList = new ArrayList<>(); - //遍历给定的日期期间的每一天 - for (int i = 0; !Duration.between(startTime.plusMinutes(i+1), endTime).isNegative(); i+= ApplicationConfig.HISTORICAL_GRAD) { - //添加日期 - timeList.add(startTime.plusMinutes(i).format(formatter)); - } - return timeList; - } - - /** - * 判断是否存在以天为单位的周期特征 - * @param historicalSeries - * @return - */ - public static Boolean isPeriod(List<Integer> historicalSeries){ - Boolean result = true; - List<List<Integer>> partitions = Lists.partition(historicalSeries, 24*60/ApplicationConfig.HISTORICAL_GRAD); - List<Integer> aggregatedPart = Arrays.asList(); - try{ - aggregatedPart = columnAverage(partitions.subList(0, ApplicationConfig.READ_HISTORICAL_DAYS-1)); - } catch (IndexOutOfBoundsException e){ - Log.error("历史"); - } - - // Pearson corrcoef - double pearsonCorrelationScore = getPearsonCorrelationScore(aggregatedPart.stream().mapToInt(Integer::valueOf).toArray(), - partitions.get(partitions.size() - 1).stream().mapToInt(Integer::valueOf).toArray()); - - if (pearsonCorrelationScore < ApplicationConfig.BASELINE_PERIOD_CORR_THRE){ - result=false; - } - return result; - } - - public static double getPearsonCorrelationScore(int[] xData, int[] yData) { - if (xData.length != yData.length) { - Log.error("Pearson CorrelationScore 数组长度不相等!"); - } - int xMeans; - int yMeans; - double numerator = 0; - double denominator = 0; - - double result = 0; - // 拿到两个数据的平均值 - xMeans = (int) getMeans(xData); - yMeans = (int) getMeans(yData); - // 计算皮尔逊系数的分子 - numerator = generateNumerator(xData, xMeans, yData, yMeans); - // 计算皮尔逊系数的分母 - denominator = generateDenomiator(xData, xMeans, yData, yMeans); - // 计算皮尔逊系数 - if(denominator>0) { - result = numerator / denominator; - } - return result; - } - - private static int generateNumerator(int[] xData, int xMeans, int[] yData, int yMeans) { - int numerator = 0; - for (int i = 0; i < xData.length; i++) { - numerator += (xData[i] - xMeans) * (yData[i] - yMeans); - } - return numerator; - } - - private static double generateDenomiator(int[] xData, int xMeans, int[] yData, int yMeans) { - double xSum = 0.0; - for (int i = 0; i < xData.length; i++) { - xSum += (xData[i] - xMeans) * (xData[i] - xMeans); - } - double ySum = 0.0; - for (int i = 0; i < yData.length; i++) { - ySum += (yData[i] - yMeans) * (yData[i] - yMeans); - } - return Math.sqrt(xSum) * Math.sqrt(ySum); - } - - private static double getMeans(int[] datas) { - double sum = 0.0; - for (int i = 0; i < datas.length; i++) { - sum += datas[i]; - } - return sum / datas.length; - } - - public static List<Integer> columnAverage(List<List<Integer>> list){ - ArrayList<Integer> averages = new ArrayList<>(); - for(int i=0; i<list.get(0).size(); i++){ - int columnSum = 0; - for(int j = 0; j< list.size(); j++){ - columnSum += list.get(j).get(i); - } - averages.add(columnSum / list.size()); - } - return averages; - } - - public static int percentile(List<Integer> latencies, double percentile) { - Collections.sort(latencies); - int index = (int) Math.ceil(percentile * latencies.size()); - return latencies.get(index-1); - } - - public static void main(String[] args) { - List<Integer> test = Arrays.asList( - 1,2,3,4,5, - 1,2,3,4,5, - 1,2,3,4,5, - 1,2,3,4,5, - 1,2,3,4,5, - 1,2,3,4,5, - 1,2,3,4,5); - System.out.println(columnAverage(Lists.partition(test, 5))); - - - - } - - -} - |
