diff options
| author | yinjiangyi <[email protected]> | 2021-08-03 14:34:18 +0800 |
|---|---|---|
| committer | yinjiangyi <[email protected]> | 2021-08-03 14:34:18 +0800 |
| commit | 4bcda7bb29098738b1637fba151dd771d0dfff7f (patch) | |
| tree | 078f82981b99091d300c98febce7abea6d3904ff /src/main/java | |
| parent | 2c041bee58686db69bec5ca4331ebecd360f79bd (diff) | |
to get help
Diffstat (limited to 'src/main/java')
| -rw-r--r-- | src/main/java/cn/mesalab/dao/DruidData.java | 97 | ||||
| -rw-r--r-- | src/main/java/cn/mesalab/main/BaselineApplication.java | 1 | ||||
| -rw-r--r-- | src/main/java/cn/mesalab/service/BaselineGeneration.java | 62 | ||||
| -rw-r--r-- | src/main/java/cn/mesalab/service/algorithm/KalmanFilter.java (renamed from src/main/java/cn/mesalab/service/BaselineService/KalmanFilter.java) | 8 | ||||
| -rw-r--r-- | src/main/java/cn/mesalab/utils/DruidUtils.java | 6 |
5 files changed, 107 insertions, 67 deletions
diff --git a/src/main/java/cn/mesalab/dao/DruidData.java b/src/main/java/cn/mesalab/dao/DruidData.java index 9b391f7..2de41a7 100644 --- a/src/main/java/cn/mesalab/dao/DruidData.java +++ b/src/main/java/cn/mesalab/dao/DruidData.java @@ -6,6 +6,7 @@ import cn.mesalab.utils.DruidUtils; import io.vavr.Tuple; import io.vavr.Tuple2; import org.apache.calcite.avatica.AvaticaConnection; +import org.apache.calcite.avatica.AvaticaStatement; import org.apache.commons.lang.StringUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -22,69 +23,79 @@ import java.util.stream.Collectors; /** * @author yjy * @version 1.0 + * Druid 数据库操作 * @date 2021/7/23 4:56 下午 */ public class DruidData { private static final Logger LOG = LoggerFactory.getLogger(DruidData.class); private static DruidData druidData; - private AvaticaConnection connection; + private AvaticaStatement statement; + private String timeFilter = ApplicationConfig.DRUID_RECVTIME_COLUMN_NAME + + " >= MILLIS_TO_TIMESTAMP(" + getTimeLimit()._2 + + ") AND " + ApplicationConfig.DRUID_RECVTIME_COLUMN_NAME + + " < MILLIS_TO_TIMESTAMP(" + getTimeLimit()._1 + ")"; + { + connectionInit(); + } + + /** + * 连接初始化 + */ + private void connectionInit(){ try { connection = DruidUtils.getConn(); + statement = connection.createStatement(); + statement.setQueryTimeout(0); + } catch (SQLException exception) { exception.printStackTrace(); } } - - private String timeFilter = ApplicationConfig.DRUID_RECVTIME_COLUMN_NAME - + " >= MILLIS_TO_TIMESTAMP(" + getTimeLimit()._2 - + ") AND " + ApplicationConfig.DRUID_RECVTIME_COLUMN_NAME - + " < MILLIS_TO_TIMESTAMP(" + getTimeLimit()._1 + ")"; - - - + /** + * 获取实例 + * @return DruidData实例 + */ public static DruidData getInstance() { druidData = new DruidData(); return druidData; } + /** + * 获取distinct server ip + * @return ArrayList<String> ip列表 + */ public ArrayList<String> getServerIpList() { - Long startQueryIPLIstTime = System.currentTimeMillis(); - ArrayList<String> serverIPs = new ArrayList<String>(); + Long startQueryIpLIstTime = System.currentTimeMillis(); + ArrayList<String> serverIps = new ArrayList<String>(); String sql = "SELECT distinct " + ApplicationConfig.DRUID_SERVERIP_COLUMN_NAME + " FROM " + ApplicationConfig.DRUID_TABLE - + " WHERE " + timeFilter;// FOR TEST + + " WHERE " + timeFilter + + " LIMIT 10";// FOR TEST try{ - ResultSet resultSet = DruidUtils.executeQuery(connection,sql); + ResultSet resultSet = DruidUtils.executeQuery(statement,sql); while(resultSet.next()){ String ip = resultSet.getString(ApplicationConfig.DRUID_SERVERIP_COLUMN_NAME); - serverIPs.add(ip); + serverIps.add(ip); } } catch (Exception e){ e.printStackTrace(); } - Long endQueryIPListTime = System.currentTimeMillis(); - LOG.info("性能测试:ip list查询耗时——"+(endQueryIPListTime-startQueryIPLIstTime)); + Long endQueryIpListTime = System.currentTimeMillis(); + LOG.info("性能测试:ip list查询耗时——"+(endQueryIpListTime-startQueryIpLIstTime)); - return serverIPs; - } - - public List<Map<String, Object>> getTimeSeriesData(List<Map<String, Object>> allData, String ip, String attackType){ - List<Map<String, Object>> rsList = new ArrayList<>(); - try{ - rsList = allData.stream(). - filter(i->((i.get(ApplicationConfig.DRUID_SERVERIP_COLUMN_NAME).equals(ip)) - )&&(i.get(ApplicationConfig.DRUID_ATTACKTYPE_COLUMN_NAME).equals(attackType))) - .collect(Collectors.toList()); - } catch (NullPointerException e){ - } - return rsList; + return serverIps; } + /** + * 从Druid读取目标IP相关数据 + * @param ipList ip列表 + * @return 数据库读取结果 + */ public List<Map<String, Object>> readFromDruid(List<String> ipList){ List<Map<String, Object>> rsList = null; ipList = ipList.stream().map( ip -> "\'"+ip+"\'").collect(Collectors.toList()); @@ -98,7 +109,7 @@ public class DruidData { + " IN " + ipString + " AND " + timeFilter; try{ - ResultSet resultSet = DruidUtils.executeQuery(connection,sql); + ResultSet resultSet = DruidUtils.executeQuery(statement, sql); ResultSetToListService service = new ResultSetToListServiceImp(); rsList = service.selectAll(resultSet); } catch (Exception e){ @@ -107,6 +118,29 @@ public class DruidData { return rsList; } + /** + * 从数据库读取结果中筛选指定ip的指定攻击类型的数据 + * @param allData 数据库读取结果 + * @param ip 指定ip + * @param attackType 指定攻击类型 + * @return 筛选结果 + */ + public List<Map<String, Object>> getTimeSeriesData(List<Map<String, Object>> allData, String ip, String attackType){ + List<Map<String, Object>> rsList = new ArrayList<>(); + try{ + rsList = allData.stream(). + filter(i->((i.get(ApplicationConfig.DRUID_SERVERIP_COLUMN_NAME).equals(ip)) + )&&(i.get(ApplicationConfig.DRUID_ATTACKTYPE_COLUMN_NAME).equals(attackType))) + .collect(Collectors.toList()); + } catch (NullPointerException e){ + } + return rsList; + } + + /** + * 计算查询时间范围,可指定时间范围(测试)或使用默认配置 + * @return 时间范围起始点和终止点 + */ public Tuple2<Long, Long> getTimeLimit(){ long maxTime = 0L; long minTime = 0L; @@ -140,6 +174,9 @@ public class DruidData { return getCurrentDay(0); } + /** + * 关闭当前DruidData + */ public void closeConn(){ try { DruidUtils.closeConnection(); diff --git a/src/main/java/cn/mesalab/main/BaselineApplication.java b/src/main/java/cn/mesalab/main/BaselineApplication.java index 3f443b7..8bd6f13 100644 --- a/src/main/java/cn/mesalab/main/BaselineApplication.java +++ b/src/main/java/cn/mesalab/main/BaselineApplication.java @@ -1,6 +1,7 @@ package cn.mesalab.main; import cn.mesalab.service.BaselineGeneration; +import sun.rmi.runtime.Log; /** * @author yjy diff --git a/src/main/java/cn/mesalab/service/BaselineGeneration.java b/src/main/java/cn/mesalab/service/BaselineGeneration.java index f588164..bd232ce 100644 --- a/src/main/java/cn/mesalab/service/BaselineGeneration.java +++ b/src/main/java/cn/mesalab/service/BaselineGeneration.java @@ -2,7 +2,7 @@ package cn.mesalab.service; import cn.mesalab.config.ApplicationConfig; import cn.mesalab.dao.DruidData; -import cn.mesalab.service.BaselineService.KalmanFilter; +import cn.mesalab.service.algorithm.KalmanFilter; import cn.mesalab.utils.HbaseUtils; import cn.mesalab.utils.SeriesUtils; import com.google.common.collect.Lists; @@ -21,6 +21,7 @@ import java.util.stream.Collectors; /** * @author yjy * @version 1.0 + * baseline生成及写入 * @date 2021/7/23 5:38 下午 */ public class BaselineGeneration { @@ -37,18 +38,22 @@ public class BaselineGeneration { ApplicationConfig.DRUID_ATTACKTYPE_UDP_FLOOD, ApplicationConfig.DRUID_ATTACKTYPE_DNS_AMPL ); - private static final Integer BASELINE_POINT_NUM = ApplicationConfig.BASELINE_RANGE_DAYS * 24 * (60/ApplicationConfig.HISTORICAL_GRAD); + private static final Integer BASELINE_POINT_NUM = + ApplicationConfig.BASELINE_RANGE_DAYS * 24 * (60/ApplicationConfig.HISTORICAL_GRAD); + /** + * 程序执行 + */ public static void perform() { long start = System.currentTimeMillis(); druidData = DruidData.getInstance(); hbaseUtils = HbaseUtils.getInstance(); hbaseTable = hbaseUtils.getHbaseTable(); - LOG.info("Druid 成功建立连接"); try{ + // baseline生成并写入 generateBaselinesThread(); long last = System.currentTimeMillis(); @@ -64,10 +69,12 @@ public class BaselineGeneration { System.exit(0); } + /** + * 多线程baseline生成入口 + * @throws InterruptedException + */ private static void generateBaselinesThread() throws InterruptedException { int threadNum = Runtime.getRuntime().availableProcessors(); -// int threadNum = 10; - ThreadFactory namedThreadFactory = new ThreadFactoryBuilder() .setNameFormat("baseline-demo-%d").build(); @@ -82,15 +89,13 @@ public class BaselineGeneration { namedThreadFactory, new ThreadPoolExecutor.AbortPolicy()); - // baseline 生成及写入 - // 耗时测试 - Long startQueryIPList = System.currentTimeMillis(); + // IP列表获取 ArrayList<String> destinationIps = druidData.getServerIpList(); - Long endQueryIPList = System.currentTimeMillis(); LOG.info("共查询到服务端ip " +destinationIps.size() + " 个"); LOG.info("Baseline batch 大小: " + ApplicationConfig.GENERATE_BATCH_SIZE); + // 分批进行IP baseline生成和处理 List<List<String>> batchIpLists = Lists.partition(destinationIps, ApplicationConfig.GENERATE_BATCH_SIZE); for (List<String> batchIps: batchIpLists){ if(batchIps.size()>0){ @@ -102,27 +107,24 @@ public class BaselineGeneration { executor.awaitTermination(10L, TimeUnit.HOURS); } + /** + * 批量生成IP baseline + * @param ipList ip列表 + */ public static void generateBaselines(List<String> ipList){ - Long startGenerationBaselines= System.currentTimeMillis(); - Long startReadDruidData = System.currentTimeMillis(); - + druidData = DruidData.getInstance(); batchDruidData = druidData.readFromDruid(ipList); - Long endReadDruidData = System.currentTimeMillis(); - //LOG.info("读取Druid数据耗时:"+(endReadDruidData-startReadDruidData)); List<Put> putList = new ArrayList<>(); for(String attackType: attackTypeList){ for(String ip: ipList){ int[] ipBaseline = generateSingleIpBaseline(ip, attackType); - if (!(ipBaseline ==null)){ + if (ipBaseline!= null){ putList = hbaseUtils.cachedInPut(putList, ip, ipBaseline, attackType, ApplicationConfig.BASELINE_METRIC_TYPE); } } } - Long endGenerationBaselines= System.currentTimeMillis(); - //LOG.info("BaselineGeneration耗时:"+(endGenerationBaselines-endReadDruidData)); - try { hbaseTable.put(putList); LOG.info("Baseline 线程 " + Thread.currentThread().getId() + " 成功写入Baseline条数共计 " + putList.size()); @@ -130,25 +132,27 @@ public class BaselineGeneration { e.printStackTrace(); } - Long endWriteTime = System.currentTimeMillis(); - //LOG.info("BaselineWriteIn耗时:"+(endWriteTime-endGenerationBaselines)); + druidData.closeConn(); } + /** + * 单ip baseline生成逻辑 + * @param ip ip + * @param attackType 攻击类型 + * @return baseline序列,长度为 60/HISTORICAL_GRAD*24 + */ private static int[] generateSingleIpBaseline(String ip, String attackType){ // 查询 - Long startQuerySingleIPTime = System.currentTimeMillis(); List<Map<String, Object>> originSeries = druidData.getTimeSeriesData(batchDruidData, ip, attackType); if (originSeries.size()==0){ return null; } - Long endQuerySingleIPTime = System.currentTimeMillis(); - // 时间序列缺失值补0 List<Map<String, Object>> completSeries = SeriesUtils.complementSeries(originSeries); - int[] baselineArr = new int[completSeries.size()]; + int[] baselineArr = new int[BASELINE_POINT_NUM]; List<Integer>series = completSeries.stream().map( i -> Integer.valueOf(i.get(ApplicationConfig.BASELINE_METRIC_TYPE).toString())).collect(Collectors.toList()); @@ -173,14 +177,14 @@ public class BaselineGeneration { } } - Long endGenerateSingleIPTime = System.currentTimeMillis(); - //LOG.info("性能测试:单个baseline生成耗时——"+(endGenerateSingleIPTime-endQuerySingleIPTime)); - //System.out.println(ip); - //System.out.println(attackType + Arrays.toString(baselineArr)); - return baselineArr; } + /** + * baseline 生成算法 + * @param timeSeries 输入序列 + * @return 输出序列 + */ private static int[] baselineFunction(List<Integer> timeSeries){ int[] result; switch (ApplicationConfig.BASELINE_FUNCTION){ diff --git a/src/main/java/cn/mesalab/service/BaselineService/KalmanFilter.java b/src/main/java/cn/mesalab/service/algorithm/KalmanFilter.java index 68fc182..11a40c3 100644 --- a/src/main/java/cn/mesalab/service/BaselineService/KalmanFilter.java +++ b/src/main/java/cn/mesalab/service/algorithm/KalmanFilter.java @@ -1,4 +1,4 @@ -package cn.mesalab.service.BaselineService; +package cn.mesalab.service.algorithm; import cn.mesalab.config.ApplicationConfig; @@ -8,12 +8,11 @@ import java.util.List; /** * @author yjy * @version 1.0 + * kalman滤波器 * @date 2021/7/25 1:42 下午 */ public class KalmanFilter { - - /**Kalman Filter*/ private Integer predict; private Integer current; private Integer estimate; @@ -29,6 +28,7 @@ public class KalmanFilter { } public void initial(){ + // TODO 调整 pdelt = 1; mdelt = 1; } @@ -54,9 +54,7 @@ public class KalmanFilter { public void forcast(List<Integer> historicalSeries, Integer length){ - // 初始值计算 int oldvalue = (historicalSeries.stream().mapToInt(Integer::intValue).sum())/historicalSeries.size(); - // 滤波 smoothSeries = new ArrayList<Integer>(); for(int i = 0; i < historicalSeries.size(); i++){ int value = historicalSeries.get(i); diff --git a/src/main/java/cn/mesalab/utils/DruidUtils.java b/src/main/java/cn/mesalab/utils/DruidUtils.java index 18f6393..8224d37 100644 --- a/src/main/java/cn/mesalab/utils/DruidUtils.java +++ b/src/main/java/cn/mesalab/utils/DruidUtils.java @@ -19,6 +19,7 @@ public class DruidUtils { private static ThreadLocal<AvaticaConnection> threadLocal = new ThreadLocal<AvaticaConnection>(); private static final String DRUID_URL = ApplicationConfig.DRUID_URL; + private static AvaticaStatement statement = null; /** * 打开连接 @@ -46,9 +47,8 @@ public class DruidUtils { /** * 根据sql查询结果 */ - public static ResultSet executeQuery (AvaticaConnection connection, String sql) throws SQLException{ - AvaticaStatement statement = connection.createStatement(); - ResultSet resultSet = statement.executeQuery(sql); + public static ResultSet executeQuery (AvaticaStatement statement, String sql) throws SQLException{ + ResultSet resultSet = statement.executeQuery(sql); return resultSet; } |
