diff options
| author | yinjiangyi <[email protected]> | 2021-08-09 15:47:42 +0800 |
|---|---|---|
| committer | yinjiangyi <[email protected]> | 2021-08-09 15:47:42 +0800 |
| commit | 357fc2eabcecb2015c0ccb2bdf8f6a1caa0db95a (patch) | |
| tree | 747e2178a63360c53a8f89c920779280c35e5fec | |
| parent | ef1f0fd8264be80b181da04b625c73725ea038f2 (diff) | |
根据partition_num进行多线程分区
| -rw-r--r-- | src/main/java/cn/mesalab/config/ApplicationConfig.java | 15 | ||||
| -rw-r--r-- | src/main/java/cn/mesalab/dao/DruidData.java | 22 | ||||
| -rw-r--r-- | src/main/java/cn/mesalab/main/BaselineApplication.java | 2 | ||||
| -rw-r--r-- | src/main/java/cn/mesalab/service/BaselineGeneration.java | 103 | ||||
| -rw-r--r-- | src/main/java/cn/mesalab/service/BaselineSingleThread.java | 53 | ||||
| -rw-r--r-- | src/main/java/cn/mesalab/utils/HbaseUtils.java | 2 | ||||
| -rw-r--r-- | src/main/resources/application.properties | 66 |
7 files changed, 125 insertions, 138 deletions
diff --git a/src/main/java/cn/mesalab/config/ApplicationConfig.java b/src/main/java/cn/mesalab/config/ApplicationConfig.java index 890053b..223fc86 100644 --- a/src/main/java/cn/mesalab/config/ApplicationConfig.java +++ b/src/main/java/cn/mesalab/config/ApplicationConfig.java @@ -27,9 +27,10 @@ public class ApplicationConfig { public static final String DRUID_ATTACKTYPE_UDP_FLOOD = ConfigUtils.getStringProperty("druid.attacktype.udpflood"); public static final String DRUID_ATTACKTYPE_ICMP_FLOOD = ConfigUtils.getStringProperty("druid.attacktype.icmpflood"); public static final String DRUID_ATTACKTYPE_DNS_AMPL = ConfigUtils.getStringProperty("druid.attacktype.dnsamplification"); - public static final String DRUID_SERVERIP_COLUMN_NAME = ConfigUtils.getStringProperty("druid.serverip.columnname"); - public static final String DRUID_ATTACKTYPE_COLUMN_NAME = ConfigUtils.getStringProperty("druid.attacktype.columnname"); - public static final String DRUID_RECVTIME_COLUMN_NAME = ConfigUtils.getStringProperty("druid.recvtime.columnname"); + public static final String DRUID_SERVERIP_COLUMN_NAME = ConfigUtils.getStringProperty("druid.columnname.serverip"); + public static final String DRUID_ATTACKTYPE_COLUMN_NAME = ConfigUtils.getStringProperty("druid.columnname.attacktype"); + public static final String DRUID_RECVTIME_COLUMN_NAME = ConfigUtils.getStringProperty("druid.columnname.recvtime"); + public static final String DRUID_COLUMNNAME_PARTITION_NUM = ConfigUtils.getStringProperty("druid.columnname.partition.num"); // 周期性相关系数阈值 public static final Float BASELINE_PERIOD_CORR_THRE = ConfigUtils.getFloatProperty("baseline.period.correlative.threshold"); @@ -54,16 +55,10 @@ public class ApplicationConfig { public static final Double BASELINE_KALMAN_P = ConfigUtils.getDoubleProperty("baseline.kalman.p"); public static final Double BASELINE_KALMAN_M = ConfigUtils.getDoubleProperty("baseline.kalman.m"); - public static final Integer BASELINE_GENERATE_BATCH_SIZE = ConfigUtils.getIntProperty("baseline.generate.batch.size"); public static final Long DRUID_READ_BATCH_TIME_GRAD_HOUR = ConfigUtils.getLongProperty("druid.read.batch.time.grad.hour"); public static final Integer THREAD_POOL_NUM = ConfigUtils.getIntProperty("thread.pool.num"); + public static final Integer PARTITION_NUM_MAX = ConfigUtils.getIntProperty("druid.partition.num.max"); - // http config - public static final Integer HTTP_REQUEST_TIMEOUT = ConfigUtils.getIntProperty("http.request.timeout"); - public static final Integer HTTP_RESPONSE_TIMEOUT = ConfigUtils.getIntProperty("http.response.timeout"); - public static final Integer HTTP_CONNECTION_TIMEOUT = ConfigUtils.getIntProperty("http.connection.timeout"); - public static final Integer HTTP_MAX_CONNECTION_NUM = ConfigUtils.getIntProperty("http.max.connection.num"); - public static final Integer HTTP_MAX_PER_ROUTE = ConfigUtils.getIntProperty("http.max.per.route"); } diff --git a/src/main/java/cn/mesalab/dao/DruidData.java b/src/main/java/cn/mesalab/dao/DruidData.java index c2f1cba..6e0d396 100644 --- a/src/main/java/cn/mesalab/dao/DruidData.java +++ b/src/main/java/cn/mesalab/dao/DruidData.java @@ -109,7 +109,7 @@ public class DruidData { } - public static String getDruidQuerySql(List<String> attackTypeList, Long originBeginTime, int currentPart, long timeGrad){ + public static String getBatchDruidQuerySql(List<String> attackTypeList, Long originBeginTime, int currentPart, long timeGrad){ long startTime = originBeginTime + currentPart * timeGrad; long endTime = originBeginTime + (currentPart+1) * timeGrad; attackTypeList = attackTypeList.stream().map(attack -> "'"+attack+"'").collect(Collectors.toList()); @@ -129,6 +129,26 @@ public class DruidData { + " AND " + timeFilter; } + public static String getBatchDruidQuerySql(List<String> attackTypeList, int currentPart, int partitionNumGrad){ + int startPartitionNum = currentPart * partitionNumGrad; + int endPartitionNum = (currentPart + 1) * partitionNumGrad; + attackTypeList = attackTypeList.stream().map(attack -> "'"+attack+"'").collect(Collectors.toList()); + String attackList = "(" + StringUtils.join(attackTypeList, ",") + ")"; + String partitionFilter = ApplicationConfig.DRUID_COLUMNNAME_PARTITION_NUM + + " >= " + startPartitionNum + + " AND " + ApplicationConfig.DRUID_COLUMNNAME_PARTITION_NUM + + " < " + endPartitionNum; + + return "SELECT "+ ApplicationConfig.DRUID_SERVERIP_COLUMN_NAME + + ", "+ ApplicationConfig.DRUID_ATTACKTYPE_COLUMN_NAME + + ", "+ ApplicationConfig.BASELINE_METRIC_TYPE + + ", " + ApplicationConfig.DRUID_RECVTIME_COLUMN_NAME + + " FROM " + ApplicationConfig.DRUID_TABLE + + " WHERE " + ApplicationConfig.DRUID_ATTACKTYPE_COLUMN_NAME + + " IN " + attackList + + " AND " + partitionFilter; + } + /** * 描述:分割Map * @param map 原始数据 diff --git a/src/main/java/cn/mesalab/main/BaselineApplication.java b/src/main/java/cn/mesalab/main/BaselineApplication.java index d9c1694..3f443b7 100644 --- a/src/main/java/cn/mesalab/main/BaselineApplication.java +++ b/src/main/java/cn/mesalab/main/BaselineApplication.java @@ -9,6 +9,6 @@ import cn.mesalab.service.BaselineGeneration; */ public class BaselineApplication { public static void main(String[] args) { - new BaselineGeneration().perform(); + BaselineGeneration.perform(); } } diff --git a/src/main/java/cn/mesalab/service/BaselineGeneration.java b/src/main/java/cn/mesalab/service/BaselineGeneration.java index 0ba8d55..74e4ca9 100644 --- a/src/main/java/cn/mesalab/service/BaselineGeneration.java +++ b/src/main/java/cn/mesalab/service/BaselineGeneration.java @@ -23,7 +23,6 @@ import java.util.concurrent.*; public class BaselineGeneration { private static final Logger LOG = LoggerFactory.getLogger(BaselineGeneration.class); - private static final Table hbaseTable = HbaseUtils.getInstance().getHbaseTable(); private static final List<String> ATTACK_TYPE_LIST = Arrays.asList( ApplicationConfig.DRUID_ATTACKTYPE_TCP_SYN_FLOOD // ApplicationConfig.DRUID_ATTACKTYPE_ICMP_FLOOD, @@ -34,18 +33,18 @@ public class BaselineGeneration { ApplicationConfig.BASELINE_RANGE_DAYS * 24 * (60/ApplicationConfig.HISTORICAL_GRAD); private static final Tuple2<Long, Long> START_END_TIMES = DruidData.getTimeLimit(); - private static final Map<String, List<Map<String, Object>>> allFromDruid = new HashMap<>(); + private static final int threadPoolNum = ApplicationConfig.THREAD_POOL_NUM; + // 每个线程读取数据所覆盖的partition_num个数 + private static final int batchPartitionRange = (int) Math.ceil(ApplicationConfig.PARTITION_NUM_MAX /(double)threadPoolNum); /** * 程序执行 */ - public void perform() { + public static void perform() { long start = System.currentTimeMillis(); try{ - loadFromDruid(); - baselineGenration(); - hbaseTable.close(); + baselineGeneration(); } catch (Exception e){ e.printStackTrace(); } finally { @@ -55,92 +54,32 @@ public class BaselineGeneration { System.exit(0); } - /** - * Druid数据读取 - * @throws InterruptedException - */ - private void loadFromDruid() throws InterruptedException { - LOG.info("开始读取数据"); - long start = System.currentTimeMillis(); - - long timeGrad = 3600000 * ApplicationConfig.DRUID_READ_BATCH_TIME_GRAD_HOUR; - int loadDataThreadPoolNum = (int) ((START_END_TIMES._2-START_END_TIMES._1)/timeGrad); - ArrayList<Future<Map<String, List<Map<String, Object>>>>> resultList = new ArrayList<>(); - CountDownLatch loadDataCountDownLatch = new CountDownLatch(loadDataThreadPoolNum); - - ThreadFactory loadDataThreadFactory = new ThreadFactoryBuilder() - .setNameFormat("baseline-load-data-%d").build(); - ThreadPoolExecutor loadDataExecutor = new ThreadPoolExecutor( - loadDataThreadPoolNum, loadDataThreadPoolNum, 0L, - TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>(1024), loadDataThreadFactory, - new ThreadPoolExecutor.AbortPolicy()); - - // 按ip数分区 - for (int i = 0; i < loadDataThreadPoolNum; i++) { - String sql = DruidData.getDruidQuerySql(ATTACK_TYPE_LIST, START_END_TIMES._1, i, timeGrad); - ReadHistoricalDruidData readHistoricalDruidData = new ReadHistoricalDruidData( - sql, - loadDataCountDownLatch - ); - Future<Map<String, List<Map<String, Object>>>> future = loadDataExecutor.submit(readHistoricalDruidData); - resultList.add(future); - } - loadDataExecutor.shutdown(); - loadDataCountDownLatch.await(); - - // 返回结果合并 - for(Future<Map<String, List<Map<String, Object>>>> future: resultList){ - try { - Map<String, List<Map<String, Object>>> queryBatchIpData = future.get(); - if(queryBatchIpData !=null){ - queryBatchIpData.forEach((ip, data)-> - allFromDruid.merge(ip, data, ListUtils::union)); - }else{ - LOG.error("future.get()未获取到结果"); - } - } catch (ExecutionException e) { - e.printStackTrace(); - } - } - LOG.info("本次共查询到服务端ip个数:" +allFromDruid.size()); - LOG.info("查询范围: " + START_END_TIMES._1 + " - " + START_END_TIMES._2); - - long last = System.currentTimeMillis(); - LOG.info("Druid 加载数据共耗时:"+(last-start)); - } - - /** - * Baseline生成及写入 - * @throws InterruptedException - */ - private static void baselineGenration() throws InterruptedException { - List<Map<String, List<Map<String, Object>>>> batchDruidDataLists = DruidData.splitMap(allFromDruid, ApplicationConfig.BASELINE_GENERATE_BATCH_SIZE); - int generationThreadPoolNum = batchDruidDataLists.size(); - CountDownLatch generateCountDownLatch = new CountDownLatch(generationThreadPoolNum); + private static void baselineGeneration() throws InterruptedException { + CountDownLatch generateCountDownLatch = new CountDownLatch(threadPoolNum); ThreadFactory generationThreadFactory = new ThreadFactoryBuilder() .setNameFormat("baseline-generate-%d").build(); ThreadPoolExecutor generationExecutor = new ThreadPoolExecutor( - generationThreadPoolNum, generationThreadPoolNum, 0L, + threadPoolNum, threadPoolNum, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>(1024), generationThreadFactory, new ThreadPoolExecutor.AbortPolicy()); - LOG.info("Baseline batch 大小: " + ApplicationConfig.BASELINE_GENERATE_BATCH_SIZE); - - for (Map<String, List<Map<String, Object>>>batchDruidData: batchDruidDataLists){ - if(batchDruidData.size()>0){ - BaselineSingleThread baselineSingleThread = new BaselineSingleThread( - hbaseTable, - ATTACK_TYPE_LIST, - BASELINE_POINT_NUM, - batchDruidData, - generateCountDownLatch - ); - generationExecutor.execute(baselineSingleThread); - } + for(int threadCount = 0; threadCount<threadPoolNum; threadCount++){ + BaselineSingleThread baselineSingleThread = new BaselineSingleThread( + ATTACK_TYPE_LIST, + BASELINE_POINT_NUM, + batchPartitionRange, + threadCount, + generateCountDownLatch + ); + generationExecutor.execute(baselineSingleThread); } generationExecutor.shutdown(); generateCountDownLatch.await(); } + public static void main(String[] args) { + perform(); + } + } diff --git a/src/main/java/cn/mesalab/service/BaselineSingleThread.java b/src/main/java/cn/mesalab/service/BaselineSingleThread.java index eae0c2f..6cdda3b 100644 --- a/src/main/java/cn/mesalab/service/BaselineSingleThread.java +++ b/src/main/java/cn/mesalab/service/BaselineSingleThread.java @@ -1,9 +1,14 @@ package cn.mesalab.service; import cn.mesalab.config.ApplicationConfig; +import cn.mesalab.dao.DruidData; import cn.mesalab.service.algorithm.KalmanFilter; +import cn.mesalab.utils.DruidUtils; import cn.mesalab.utils.HbaseUtils; import cn.mesalab.utils.SeriesUtils; +import org.apache.calcite.avatica.AvaticaClientRuntimeException; +import org.apache.calcite.avatica.AvaticaConnection; +import org.apache.calcite.avatica.AvaticaStatement; import org.apache.commons.math3.stat.StatUtils; import org.apache.hadoop.hbase.client.Put; import org.apache.hadoop.hbase.client.Table; @@ -11,10 +16,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.io.IOException; -import java.util.ArrayList; -import java.util.Arrays; -import java.util.List; -import java.util.Map; +import java.util.*; import java.util.concurrent.CountDownLatch; import java.util.stream.Collectors; @@ -26,28 +28,41 @@ import java.util.stream.Collectors; public class BaselineSingleThread extends Thread { private static final Logger LOG = LoggerFactory.getLogger(BaselineSingleThread.class); + private final HbaseUtils hbaseUtils; private final Table hbaseTable; private final List<String> attackTypeList; private final Integer baselinePointNum; - private final Map<String,List<Map<String, Object>>> batchDruidData; + private final int batchPartitionRange; + private final int currentBatch; private final CountDownLatch countDownLatch; public BaselineSingleThread( - Table hbaseTable, List<String> attackTypeList, - Integer baselinePointNum, - Map<String,List<Map<String, Object>>> batchDruidData, + int baselinePointNum, + int batchPartitionRange, + int currentBatch, CountDownLatch countDownLatch ){ - this.hbaseTable = hbaseTable; + hbaseUtils = HbaseUtils.getInstance(); + + this.hbaseTable = hbaseUtils.getHbaseTable(); this.attackTypeList = attackTypeList; this.baselinePointNum = baselinePointNum; - this.batchDruidData = batchDruidData; + this.batchPartitionRange = batchPartitionRange; + this.currentBatch = currentBatch; this.countDownLatch = countDownLatch; } @Override public void run(){ + long start = System.currentTimeMillis(); + // 数据读取 + LOG.info("开始数据读取"); + Map<String, List<Map<String, Object>>> batchDruidData = getBatchDruidData(); + LOG.info("完成数据读取:获取Server IP:" + batchDruidData.size() + + " 运行时间:" + (System.currentTimeMillis()- start)); + + // 基线生成 List<Put> putList = new ArrayList<>(); for(String attackType: attackTypeList){ for(String ip: batchDruidData.keySet()){ @@ -57,7 +72,7 @@ public class BaselineSingleThread extends Thread { // baseline生成 int[] ipBaseline = generateSingleIpBaseline(ip, ipDruidData); if (ipBaseline!= null){ - HbaseUtils.cachedInPut(putList, ip, ipBaseline, attackType, ApplicationConfig.BASELINE_METRIC_TYPE); + hbaseUtils.cachedInPut(putList, ip, ipBaseline, attackType, ApplicationConfig.BASELINE_METRIC_TYPE); } } } @@ -66,11 +81,27 @@ public class BaselineSingleThread extends Thread { } catch (IOException e) { e.printStackTrace(); } finally { + hbaseUtils.close(); countDownLatch.countDown(); LOG.info("成功写入Baseline条数共计 " + putList.size() + " 剩余线程数量:" + countDownLatch.getCount()); } } + private Map<String, List<Map<String, Object>>> getBatchDruidData() { + Map<String, List<Map<String, Object>>> readFromDruid = new HashMap<>(); + try { + AvaticaConnection connection = DruidUtils.getConn(); + AvaticaStatement stat = connection.createStatement(); + String sql = DruidData.getBatchDruidQuerySql(attackTypeList, currentBatch, batchPartitionRange); + readFromDruid = DruidData.readFromDruid(sql, stat); + connection.close(); + stat.close(); + } catch (Exception e){ + e.printStackTrace(); + } + return readFromDruid; + } + /** * 单ip baseline生成逻辑 * @return baseline序列,长度为 60/HISTORICAL_GRAD*24 diff --git a/src/main/java/cn/mesalab/utils/HbaseUtils.java b/src/main/java/cn/mesalab/utils/HbaseUtils.java index a79bfda..16aab37 100644 --- a/src/main/java/cn/mesalab/utils/HbaseUtils.java +++ b/src/main/java/cn/mesalab/utils/HbaseUtils.java @@ -1 +1 @@ -package cn.mesalab.utils;
import cn.mesalab.config.ApplicationConfig;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.ArrayWritable;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
/**
* @author yjy
* @version 1.0
* @date 2021/7/23 4:56 下午
*/
public class HbaseUtils {
private static final Logger LOG = LoggerFactory.getLogger(HbaseUtils.class);
private static HbaseUtils hbaseUtils;
static {
hbaseUtils = HbaseUtils.getInstance();
}
public static HbaseUtils getInstance(){
if (hbaseUtils == null) {
hbaseUtils = new HbaseUtils();
}
return hbaseUtils;
}
public Table getHbaseTable(){
Table hbaseTable = null;
try{
Configuration config = HBaseConfiguration.create();
config.set(HConstants.ZOOKEEPER_QUORUM, ApplicationConfig.HBASE_ZOOKEEPER_QUORUM);
config.set(HConstants.ZOOKEEPER_CLIENT_PORT, ApplicationConfig.HBASE_ZOOKEEPER_CLIENT_PORT);
TableName tableName = TableName.valueOf(ApplicationConfig.HBASE_TABLE);
Connection conn = ConnectionFactory.createConnection(config);
hbaseTable = conn.getTable(tableName);
} catch (IOException e){
LOG.error("HBase 创建HBase table失败!");
e.printStackTrace();
}
return hbaseTable;
}
public static List<Put> cachedInPut(List<Put> putList, String ip, int[] baseline, String attackType, String metricType){
Put rowPut = new Put(Bytes.toBytes(ip));
// FOR TEST
// start
if(attackType.equals(ApplicationConfig.DRUID_ATTACKTYPE_TCP_SYN_FLOOD)){
attackType = "TCP SYN Flood";
} else if(attackType.equals(ApplicationConfig.DRUID_ATTACKTYPE_UDP_FLOOD)){
attackType = "UDP Flood";
} else if(attackType.equals(ApplicationConfig.DRUID_ATTACKTYPE_ICMP_FLOOD)){
attackType = "ICMP Flood";
} else {
attackType = "DNS Amplification";
}
// end
rowPut.addColumn(
Bytes.toBytes(attackType),
Bytes.toBytes(metricType),
WritableUtils.toByteArray(toWritable(baseline)));
putList.add(rowPut);
return putList;
}
private static Writable toWritable(int[] arr) {
Writable[] content = new Writable[arr.length];
for (int i = 0; i < content.length; i++) {
content[i] = new IntWritable(arr[i]);
}
return new ArrayWritable(IntWritable.class, content);
}
public static ArrayList<Integer> fromWritable(ArrayWritable writable) {
Writable[] writables = ((ArrayWritable) writable).get();
ArrayList<Integer> list = new ArrayList<Integer>(writables.length);
for (Writable wrt : writables) {
list.add(((IntWritable)wrt).get());
}
return list;
}
}
\ No newline at end of file +package cn.mesalab.utils;
import cn.mesalab.config.ApplicationConfig;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.ArrayWritable;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
/**
* @author yjy
* @version 1.0
* @date 2021/7/23 4:56 下午
*/
public class HbaseUtils {
private static final Logger LOG = LoggerFactory.getLogger(HbaseUtils.class);
private static HbaseUtils hbaseUtils;
private Table hbaseTable;
static {
hbaseUtils = HbaseUtils.getInstance();
}
public static HbaseUtils getInstance(){
if (hbaseUtils == null) {
hbaseUtils = new HbaseUtils();
}
return hbaseUtils;
}
public Table getHbaseTable(){
if(hbaseTable == null){
try{
Configuration config = HBaseConfiguration.create();
config.set(HConstants.ZOOKEEPER_QUORUM, ApplicationConfig.HBASE_ZOOKEEPER_QUORUM);
config.set(HConstants.ZOOKEEPER_CLIENT_PORT, ApplicationConfig.HBASE_ZOOKEEPER_CLIENT_PORT);
TableName tableName = TableName.valueOf(ApplicationConfig.HBASE_TABLE);
Connection conn = ConnectionFactory.createConnection(config);
hbaseTable = conn.getTable(tableName);
} catch (IOException e){
LOG.error("HBase 创建HBase table失败!");
e.printStackTrace();
}
}
return hbaseTable;
}
public List<Put> cachedInPut(List<Put> putList, String ip, int[] baseline, String attackType, String metricType){
Put rowPut = new Put(Bytes.toBytes(ip));
// FOR TEST
// start
if(attackType.equals(ApplicationConfig.DRUID_ATTACKTYPE_TCP_SYN_FLOOD)){
attackType = "TCP SYN Flood";
} else if(attackType.equals(ApplicationConfig.DRUID_ATTACKTYPE_UDP_FLOOD)){
attackType = "UDP Flood";
} else if(attackType.equals(ApplicationConfig.DRUID_ATTACKTYPE_ICMP_FLOOD)){
attackType = "ICMP Flood";
} else {
attackType = "DNS Amplification";
}
// end
rowPut.addColumn(
Bytes.toBytes(attackType),
Bytes.toBytes(metricType),
WritableUtils.toByteArray(toWritable(baseline)));
putList.add(rowPut);
return putList;
}
private Writable toWritable(int[] arr) {
Writable[] content = new Writable[arr.length];
for (int i = 0; i < content.length; i++) {
content[i] = new IntWritable(arr[i]);
}
return new ArrayWritable(IntWritable.class, content);
}
public ArrayList<Integer> fromWritable(ArrayWritable writable) {
Writable[] writables = ((ArrayWritable) writable).get();
ArrayList<Integer> list = new ArrayList<Integer>(writables.length);
for (Writable wrt : writables) {
list.add(((IntWritable)wrt).get());
}
return list;
}
public void close(){
try {
hbaseTable.close();
} catch (IOException e) {
e.printStackTrace();
}
}
}
\ No newline at end of file diff --git a/src/main/resources/application.properties b/src/main/resources/application.properties index 8d937d6..889d5e6 100644 --- a/src/main/resources/application.properties +++ b/src/main/resources/application.properties @@ -1,70 +1,72 @@ - +########################################## +############## 数据库配置 ############### +########################################## #Druid配置 druid.url=jdbc:avatica:remote:url=http://192.168.44.12:8082/druid/v2/sql/avatica/ druid.driver=org.apache.calcite.avatica.remote.Driver druid.table=top_server_ip_test_log -#字段映射 -druid.attacktype.tcpsynflood=sessions -druid.attacktype.udpflood=bytes -druid.attacktype.icmpflood=packets -druid.attacktype.dnsamplification=packets -druid.serverip.columnname=destination -druid.attacktype.columnname=order_by -druid.recvtime.columnname=__time -#baseline生成metric -baseline.metric.type=session_num - #HBase配置 hbase.table=ddos_traffic_baselines hbase.zookeeper.quorum=192.168.44.12 hbase.zookeeper.client.port=2181 -#读取druid时间范围方式,0:读取默认范围read.druid.time.range天数;1:指定时间范围 +########################################## +############## Druid数据读取 ############### +########################################## +#读取druid时间范围方式, +# 0:读取默认范围天数read.historical.days; +# 1:指定时间范围 read.druid.time.limit.type=1 #07-05 read.druid.min.time=1625414400000 -#06-01 -#read.druid.min.time=1622476800000 #07-08 read.druid.max.time=1625673600000 -#读取过去N天数据,最小值为3天(需要判断周期性) +#字段映射 +druid.attacktype.tcpsynflood=sessions +druid.attacktype.udpflood=bytes +druid.attacktype.icmpflood=packets +druid.attacktype.dnsamplification=packets +druid.columnname.serverip=destination +druid.columnname.attacktype=order_by +druid.columnname.recvtime=__time +#FOR TEST +druid.columnname.partition.num=session_num +baseline.metric.type=session_num + +#数据情况 +#读取历史N天数据,最小值为3天(需要判断周期性) read.historical.days=3 #历史数据汇聚粒度为10分钟 historical.grad=10 -#baseline生成方法 -baseline.function=KalmanFilter -#baseline时间1天 -baseline.range.days=1 # 数据库Time格式 time.format=yyyy-MM-dd HH:mm:ss - -#算法参数 +########################################## +############ Baseline生成参数 ############# +########################################## +baseline.range.days=1 +baseline.function=KalmanFilter baseline.period.correlative.threshold=0.5 baseline.historical.frequency.thread=0.1 baseline.exception.percentile=0.99 baseline.exception.fill.percentile=0.99 baseline.rational.percentile=0.95 + #Kalman Filter baseline.kalman.q=0.1 baseline.kalman.r=0.1 baseline.kalman.p=8 baseline.kalman.m=2 -# FOR TEST -baseline.generate.batch.size=100 +########################################## +################ 并发参数 ################# +########################################## druid.read.batch.time.grad.hour=4 thread.pool.num=10 - - -# http client配置 -http.request.timeout=10000 -http.response.timeout=60000 -http.connection.timeout=500 -http.max.connection.num=500 -http.max.per.route=200 +#druid分区字段partition_num的最大值 +druid.partition.num.max=10 |
