summaryrefslogtreecommitdiff
path: root/src/main/java
diff options
context:
space:
mode:
authoryinjiangyi <[email protected]>2021-08-03 14:34:18 +0800
committeryinjiangyi <[email protected]>2021-08-03 14:34:18 +0800
commit4bcda7bb29098738b1637fba151dd771d0dfff7f (patch)
tree078f82981b99091d300c98febce7abea6d3904ff /src/main/java
parent2c041bee58686db69bec5ca4331ebecd360f79bd (diff)
to get help
Diffstat (limited to 'src/main/java')
-rw-r--r--src/main/java/cn/mesalab/dao/DruidData.java97
-rw-r--r--src/main/java/cn/mesalab/main/BaselineApplication.java1
-rw-r--r--src/main/java/cn/mesalab/service/BaselineGeneration.java62
-rw-r--r--src/main/java/cn/mesalab/service/algorithm/KalmanFilter.java (renamed from src/main/java/cn/mesalab/service/BaselineService/KalmanFilter.java)8
-rw-r--r--src/main/java/cn/mesalab/utils/DruidUtils.java6
5 files changed, 107 insertions, 67 deletions
diff --git a/src/main/java/cn/mesalab/dao/DruidData.java b/src/main/java/cn/mesalab/dao/DruidData.java
index 9b391f7..2de41a7 100644
--- a/src/main/java/cn/mesalab/dao/DruidData.java
+++ b/src/main/java/cn/mesalab/dao/DruidData.java
@@ -6,6 +6,7 @@ import cn.mesalab.utils.DruidUtils;
import io.vavr.Tuple;
import io.vavr.Tuple2;
import org.apache.calcite.avatica.AvaticaConnection;
+import org.apache.calcite.avatica.AvaticaStatement;
import org.apache.commons.lang.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -22,69 +23,79 @@ import java.util.stream.Collectors;
/**
* @author yjy
* @version 1.0
+ * Druid 数据库操作
* @date 2021/7/23 4:56 下午
*/
public class DruidData {
private static final Logger LOG = LoggerFactory.getLogger(DruidData.class);
private static DruidData druidData;
-
private AvaticaConnection connection;
+ private AvaticaStatement statement;
+ private String timeFilter = ApplicationConfig.DRUID_RECVTIME_COLUMN_NAME
+ + " >= MILLIS_TO_TIMESTAMP(" + getTimeLimit()._2
+ + ") AND " + ApplicationConfig.DRUID_RECVTIME_COLUMN_NAME
+ + " < MILLIS_TO_TIMESTAMP(" + getTimeLimit()._1 + ")";
+
{
+ connectionInit();
+ }
+
+ /**
+ * 连接初始化
+ */
+ private void connectionInit(){
try {
connection = DruidUtils.getConn();
+ statement = connection.createStatement();
+ statement.setQueryTimeout(0);
+
} catch (SQLException exception) {
exception.printStackTrace();
}
}
-
- private String timeFilter = ApplicationConfig.DRUID_RECVTIME_COLUMN_NAME
- + " >= MILLIS_TO_TIMESTAMP(" + getTimeLimit()._2
- + ") AND " + ApplicationConfig.DRUID_RECVTIME_COLUMN_NAME
- + " < MILLIS_TO_TIMESTAMP(" + getTimeLimit()._1 + ")";
-
-
-
+ /**
+ * 获取实例
+ * @return DruidData实例
+ */
public static DruidData getInstance() {
druidData = new DruidData();
return druidData;
}
+ /**
+ * 获取distinct server ip
+ * @return ArrayList<String> ip列表
+ */
public ArrayList<String> getServerIpList() {
- Long startQueryIPLIstTime = System.currentTimeMillis();
- ArrayList<String> serverIPs = new ArrayList<String>();
+ Long startQueryIpLIstTime = System.currentTimeMillis();
+ ArrayList<String> serverIps = new ArrayList<String>();
String sql = "SELECT distinct " + ApplicationConfig.DRUID_SERVERIP_COLUMN_NAME
+ " FROM " + ApplicationConfig.DRUID_TABLE
- + " WHERE " + timeFilter;// FOR TEST
+ + " WHERE " + timeFilter
+ + " LIMIT 10";// FOR TEST
try{
- ResultSet resultSet = DruidUtils.executeQuery(connection,sql);
+ ResultSet resultSet = DruidUtils.executeQuery(statement,sql);
while(resultSet.next()){
String ip = resultSet.getString(ApplicationConfig.DRUID_SERVERIP_COLUMN_NAME);
- serverIPs.add(ip);
+ serverIps.add(ip);
}
} catch (Exception e){
e.printStackTrace();
}
- Long endQueryIPListTime = System.currentTimeMillis();
- LOG.info("性能测试:ip list查询耗时——"+(endQueryIPListTime-startQueryIPLIstTime));
+ Long endQueryIpListTime = System.currentTimeMillis();
+ LOG.info("性能测试:ip list查询耗时——"+(endQueryIpListTime-startQueryIpLIstTime));
- return serverIPs;
- }
-
- public List<Map<String, Object>> getTimeSeriesData(List<Map<String, Object>> allData, String ip, String attackType){
- List<Map<String, Object>> rsList = new ArrayList<>();
- try{
- rsList = allData.stream().
- filter(i->((i.get(ApplicationConfig.DRUID_SERVERIP_COLUMN_NAME).equals(ip))
- )&&(i.get(ApplicationConfig.DRUID_ATTACKTYPE_COLUMN_NAME).equals(attackType)))
- .collect(Collectors.toList());
- } catch (NullPointerException e){
- }
- return rsList;
+ return serverIps;
}
+ /**
+ * 从Druid读取目标IP相关数据
+ * @param ipList ip列表
+ * @return 数据库读取结果
+ */
public List<Map<String, Object>> readFromDruid(List<String> ipList){
List<Map<String, Object>> rsList = null;
ipList = ipList.stream().map( ip -> "\'"+ip+"\'").collect(Collectors.toList());
@@ -98,7 +109,7 @@ public class DruidData {
+ " IN " + ipString
+ " AND " + timeFilter;
try{
- ResultSet resultSet = DruidUtils.executeQuery(connection,sql);
+ ResultSet resultSet = DruidUtils.executeQuery(statement, sql);
ResultSetToListService service = new ResultSetToListServiceImp();
rsList = service.selectAll(resultSet);
} catch (Exception e){
@@ -107,6 +118,29 @@ public class DruidData {
return rsList;
}
+ /**
+ * 从数据库读取结果中筛选指定ip的指定攻击类型的数据
+ * @param allData 数据库读取结果
+ * @param ip 指定ip
+ * @param attackType 指定攻击类型
+ * @return 筛选结果
+ */
+ public List<Map<String, Object>> getTimeSeriesData(List<Map<String, Object>> allData, String ip, String attackType){
+ List<Map<String, Object>> rsList = new ArrayList<>();
+ try{
+ rsList = allData.stream().
+ filter(i->((i.get(ApplicationConfig.DRUID_SERVERIP_COLUMN_NAME).equals(ip))
+ )&&(i.get(ApplicationConfig.DRUID_ATTACKTYPE_COLUMN_NAME).equals(attackType)))
+ .collect(Collectors.toList());
+ } catch (NullPointerException e){
+ }
+ return rsList;
+ }
+
+ /**
+ * 计算查询时间范围,可指定时间范围(测试)或使用默认配置
+ * @return 时间范围起始点和终止点
+ */
public Tuple2<Long, Long> getTimeLimit(){
long maxTime = 0L;
long minTime = 0L;
@@ -140,6 +174,9 @@ public class DruidData {
return getCurrentDay(0);
}
+ /**
+ * 关闭当前DruidData
+ */
public void closeConn(){
try {
DruidUtils.closeConnection();
diff --git a/src/main/java/cn/mesalab/main/BaselineApplication.java b/src/main/java/cn/mesalab/main/BaselineApplication.java
index 3f443b7..8bd6f13 100644
--- a/src/main/java/cn/mesalab/main/BaselineApplication.java
+++ b/src/main/java/cn/mesalab/main/BaselineApplication.java
@@ -1,6 +1,7 @@
package cn.mesalab.main;
import cn.mesalab.service.BaselineGeneration;
+import sun.rmi.runtime.Log;
/**
* @author yjy
diff --git a/src/main/java/cn/mesalab/service/BaselineGeneration.java b/src/main/java/cn/mesalab/service/BaselineGeneration.java
index f588164..bd232ce 100644
--- a/src/main/java/cn/mesalab/service/BaselineGeneration.java
+++ b/src/main/java/cn/mesalab/service/BaselineGeneration.java
@@ -2,7 +2,7 @@ package cn.mesalab.service;
import cn.mesalab.config.ApplicationConfig;
import cn.mesalab.dao.DruidData;
-import cn.mesalab.service.BaselineService.KalmanFilter;
+import cn.mesalab.service.algorithm.KalmanFilter;
import cn.mesalab.utils.HbaseUtils;
import cn.mesalab.utils.SeriesUtils;
import com.google.common.collect.Lists;
@@ -21,6 +21,7 @@ import java.util.stream.Collectors;
/**
* @author yjy
* @version 1.0
+ * baseline生成及写入
* @date 2021/7/23 5:38 下午
*/
public class BaselineGeneration {
@@ -37,18 +38,22 @@ public class BaselineGeneration {
ApplicationConfig.DRUID_ATTACKTYPE_UDP_FLOOD,
ApplicationConfig.DRUID_ATTACKTYPE_DNS_AMPL
);
- private static final Integer BASELINE_POINT_NUM = ApplicationConfig.BASELINE_RANGE_DAYS * 24 * (60/ApplicationConfig.HISTORICAL_GRAD);
+ private static final Integer BASELINE_POINT_NUM =
+ ApplicationConfig.BASELINE_RANGE_DAYS * 24 * (60/ApplicationConfig.HISTORICAL_GRAD);
+ /**
+ * 程序执行
+ */
public static void perform() {
long start = System.currentTimeMillis();
druidData = DruidData.getInstance();
hbaseUtils = HbaseUtils.getInstance();
hbaseTable = hbaseUtils.getHbaseTable();
-
LOG.info("Druid 成功建立连接");
try{
+ // baseline生成并写入
generateBaselinesThread();
long last = System.currentTimeMillis();
@@ -64,10 +69,12 @@ public class BaselineGeneration {
System.exit(0);
}
+ /**
+ * 多线程baseline生成入口
+ * @throws InterruptedException
+ */
private static void generateBaselinesThread() throws InterruptedException {
int threadNum = Runtime.getRuntime().availableProcessors();
-// int threadNum = 10;
-
ThreadFactory namedThreadFactory = new ThreadFactoryBuilder()
.setNameFormat("baseline-demo-%d").build();
@@ -82,15 +89,13 @@ public class BaselineGeneration {
namedThreadFactory,
new ThreadPoolExecutor.AbortPolicy());
- // baseline 生成及写入
- // 耗时测试
- Long startQueryIPList = System.currentTimeMillis();
+ // IP列表获取
ArrayList<String> destinationIps = druidData.getServerIpList();
- Long endQueryIPList = System.currentTimeMillis();
LOG.info("共查询到服务端ip " +destinationIps.size() + " 个");
LOG.info("Baseline batch 大小: " + ApplicationConfig.GENERATE_BATCH_SIZE);
+ // 分批进行IP baseline生成和处理
List<List<String>> batchIpLists = Lists.partition(destinationIps, ApplicationConfig.GENERATE_BATCH_SIZE);
for (List<String> batchIps: batchIpLists){
if(batchIps.size()>0){
@@ -102,27 +107,24 @@ public class BaselineGeneration {
executor.awaitTermination(10L, TimeUnit.HOURS);
}
+ /**
+ * 批量生成IP baseline
+ * @param ipList ip列表
+ */
public static void generateBaselines(List<String> ipList){
- Long startGenerationBaselines= System.currentTimeMillis();
- Long startReadDruidData = System.currentTimeMillis();
-
+ druidData = DruidData.getInstance();
batchDruidData = druidData.readFromDruid(ipList);
- Long endReadDruidData = System.currentTimeMillis();
- //LOG.info("读取Druid数据耗时:"+(endReadDruidData-startReadDruidData));
List<Put> putList = new ArrayList<>();
for(String attackType: attackTypeList){
for(String ip: ipList){
int[] ipBaseline = generateSingleIpBaseline(ip, attackType);
- if (!(ipBaseline ==null)){
+ if (ipBaseline!= null){
putList = hbaseUtils.cachedInPut(putList, ip, ipBaseline, attackType, ApplicationConfig.BASELINE_METRIC_TYPE);
}
}
}
- Long endGenerationBaselines= System.currentTimeMillis();
- //LOG.info("BaselineGeneration耗时:"+(endGenerationBaselines-endReadDruidData));
-
try {
hbaseTable.put(putList);
LOG.info("Baseline 线程 " + Thread.currentThread().getId() + " 成功写入Baseline条数共计 " + putList.size());
@@ -130,25 +132,27 @@ public class BaselineGeneration {
e.printStackTrace();
}
- Long endWriteTime = System.currentTimeMillis();
- //LOG.info("BaselineWriteIn耗时:"+(endWriteTime-endGenerationBaselines));
+ druidData.closeConn();
}
+ /**
+ * 单ip baseline生成逻辑
+ * @param ip ip
+ * @param attackType 攻击类型
+ * @return baseline序列,长度为 60/HISTORICAL_GRAD*24
+ */
private static int[] generateSingleIpBaseline(String ip, String attackType){
// 查询
- Long startQuerySingleIPTime = System.currentTimeMillis();
List<Map<String, Object>> originSeries = druidData.getTimeSeriesData(batchDruidData, ip, attackType);
if (originSeries.size()==0){
return null;
}
- Long endQuerySingleIPTime = System.currentTimeMillis();
-
// 时间序列缺失值补0
List<Map<String, Object>> completSeries = SeriesUtils.complementSeries(originSeries);
- int[] baselineArr = new int[completSeries.size()];
+ int[] baselineArr = new int[BASELINE_POINT_NUM];
List<Integer>series = completSeries.stream().map(
i -> Integer.valueOf(i.get(ApplicationConfig.BASELINE_METRIC_TYPE).toString())).collect(Collectors.toList());
@@ -173,14 +177,14 @@ public class BaselineGeneration {
}
}
- Long endGenerateSingleIPTime = System.currentTimeMillis();
- //LOG.info("性能测试:单个baseline生成耗时——"+(endGenerateSingleIPTime-endQuerySingleIPTime));
- //System.out.println(ip);
- //System.out.println(attackType + Arrays.toString(baselineArr));
-
return baselineArr;
}
+ /**
+ * baseline 生成算法
+ * @param timeSeries 输入序列
+ * @return 输出序列
+ */
private static int[] baselineFunction(List<Integer> timeSeries){
int[] result;
switch (ApplicationConfig.BASELINE_FUNCTION){
diff --git a/src/main/java/cn/mesalab/service/BaselineService/KalmanFilter.java b/src/main/java/cn/mesalab/service/algorithm/KalmanFilter.java
index 68fc182..11a40c3 100644
--- a/src/main/java/cn/mesalab/service/BaselineService/KalmanFilter.java
+++ b/src/main/java/cn/mesalab/service/algorithm/KalmanFilter.java
@@ -1,4 +1,4 @@
-package cn.mesalab.service.BaselineService;
+package cn.mesalab.service.algorithm;
import cn.mesalab.config.ApplicationConfig;
@@ -8,12 +8,11 @@ import java.util.List;
/**
* @author yjy
* @version 1.0
+ * kalman滤波器
* @date 2021/7/25 1:42 下午
*/
public class KalmanFilter {
-
- /**Kalman Filter*/
private Integer predict;
private Integer current;
private Integer estimate;
@@ -29,6 +28,7 @@ public class KalmanFilter {
}
public void initial(){
+ // TODO 调整
pdelt = 1;
mdelt = 1;
}
@@ -54,9 +54,7 @@ public class KalmanFilter {
public void forcast(List<Integer> historicalSeries, Integer length){
- // 初始值计算
int oldvalue = (historicalSeries.stream().mapToInt(Integer::intValue).sum())/historicalSeries.size();
- // 滤波
smoothSeries = new ArrayList<Integer>();
for(int i = 0; i < historicalSeries.size(); i++){
int value = historicalSeries.get(i);
diff --git a/src/main/java/cn/mesalab/utils/DruidUtils.java b/src/main/java/cn/mesalab/utils/DruidUtils.java
index 18f6393..8224d37 100644
--- a/src/main/java/cn/mesalab/utils/DruidUtils.java
+++ b/src/main/java/cn/mesalab/utils/DruidUtils.java
@@ -19,6 +19,7 @@ public class DruidUtils {
private static ThreadLocal<AvaticaConnection> threadLocal = new ThreadLocal<AvaticaConnection>();
private static final String DRUID_URL = ApplicationConfig.DRUID_URL;
+ private static AvaticaStatement statement = null;
/**
* 打开连接
@@ -46,9 +47,8 @@ public class DruidUtils {
/**
* 根据sql查询结果
*/
- public static ResultSet executeQuery (AvaticaConnection connection, String sql) throws SQLException{
- AvaticaStatement statement = connection.createStatement();
- ResultSet resultSet = statement.executeQuery(sql);
+ public static ResultSet executeQuery (AvaticaStatement statement, String sql) throws SQLException{
+ ResultSet resultSet = statement.executeQuery(sql);
return resultSet;
}