diff options
| author | 尹姜谊 <[email protected]> | 2022-09-21 09:48:50 +0800 |
|---|---|---|
| committer | 尹姜谊 <[email protected]> | 2022-09-21 09:48:50 +0800 |
| commit | df43bb9e54b7d86f74b93f6c9cf55afeb1dfd84c (patch) | |
| tree | 933e6fcff9b0cc20603017be77da780f6eff5fa7 | |
| parent | ddf4fa8d6eab5005b31b6df15af09546ebe8f6d2 (diff) | |
修改rowkey,增加vsysid信息24.02
| -rw-r--r-- | src/main/java/cn/mesalab/config/ApplicationConfig.java | 1 | ||||
| -rw-r--r-- | src/main/java/cn/mesalab/dao/DruidData.java | 28 | ||||
| -rw-r--r-- | src/main/java/cn/mesalab/service/BaselineSingleThread.java | 58 | ||||
| -rw-r--r-- | src/main/java/cn/mesalab/utils/HbaseUtils.java | 2 | ||||
| -rw-r--r-- | src/main/resources/application.properties | 9 |
5 files changed, 54 insertions, 44 deletions
diff --git a/src/main/java/cn/mesalab/config/ApplicationConfig.java b/src/main/java/cn/mesalab/config/ApplicationConfig.java index 3a78215..f492223 100644 --- a/src/main/java/cn/mesalab/config/ApplicationConfig.java +++ b/src/main/java/cn/mesalab/config/ApplicationConfig.java @@ -32,6 +32,7 @@ public class ApplicationConfig { public static final String DRUID_ATTACKTYPE_ICMP_FLOOD = ConfigUtils.getStringProperty("druid.attacktype.icmpflood"); public static final String DRUID_ATTACKTYPE_DNS_AMPL = ConfigUtils.getStringProperty("druid.attacktype.dnsamplification"); public static final String DRUID_SERVERIP_COLUMN_NAME = ConfigUtils.getStringProperty("druid.columnname.serverip"); + public static final String DRUID_VSYSID_COLUMN_NAME = ConfigUtils.getStringProperty("druid.columnname.vsysid"); public static final String DRUID_ATTACKTYPE_COLUMN_NAME = ConfigUtils.getStringProperty("druid.columnname.attacktype"); public static final String DRUID_RECVTIME_COLUMN_NAME = ConfigUtils.getStringProperty("druid.columnname.recvtime"); public static final String DRUID_COLUMNNAME_PARTITION_NUM = ConfigUtils.getStringProperty("druid.columnname.partition.num"); diff --git a/src/main/java/cn/mesalab/dao/DruidData.java b/src/main/java/cn/mesalab/dao/DruidData.java index d0f486f..dc82c57 100644 --- a/src/main/java/cn/mesalab/dao/DruidData.java +++ b/src/main/java/cn/mesalab/dao/DruidData.java @@ -48,21 +48,23 @@ public class DruidData { } public static Map<String, List<Map<String, Object>>> selectAll(List<Map<String, Object>> result) { - Map<String, List<Map<String, Object>>> allIpDataList = new HashMap<>(); - ArrayList<String> ipList = new ArrayList<>(); + Map<String, List<Map<String, Object>>> allKeyDataList = new HashMap<>(); + ArrayList<String> keyList = new ArrayList<>(); for (Map<String, Object> rowData : result) { String ip = (String) rowData.get(ApplicationConfig.DRUID_SERVERIP_COLUMN_NAME); - if (!ipList.contains(ip)) { - ipList.add(ip); - List<Map<String, Object>> ipData = new ArrayList<>(); - allIpDataList.put(ip, ipData); + String vsysId = Long.toString((Long) rowData.get(ApplicationConfig.DRUID_VSYSID_COLUMN_NAME)); + String key = ip + "-" + vsysId; + if (!keyList.contains(key)) { + keyList.add(key); + List<Map<String, Object>> keyData = new ArrayList<>(); + allKeyDataList.put(key, keyData); } rowData.remove(ApplicationConfig.DRUID_SERVERIP_COLUMN_NAME); - allIpDataList.get(ip).add(rowData); + allKeyDataList.get(key).add(rowData); } - return allIpDataList; + return allKeyDataList; } @@ -158,9 +160,10 @@ public class DruidData { + ") AND " + ApplicationConfig.DRUID_RECVTIME_COLUMN_NAME + " < MILLIS_TO_TIMESTAMP(" + endTime + ")"; - return "SELECT "+ ApplicationConfig.DRUID_SERVERIP_COLUMN_NAME - + ", "+ ApplicationConfig.DRUID_ATTACKTYPE_COLUMN_NAME - + ", AVG("+ ApplicationConfig.BASELINE_METRIC_TYPE + ") as " + ApplicationConfig.BASELINE_METRIC_TYPE + String sql = "SELECT " + ApplicationConfig.DRUID_SERVERIP_COLUMN_NAME + + ", " + ApplicationConfig.DRUID_VSYSID_COLUMN_NAME + + ", " + ApplicationConfig.DRUID_ATTACKTYPE_COLUMN_NAME + + ", AVG(" + ApplicationConfig.BASELINE_METRIC_TYPE + ") as " + ApplicationConfig.BASELINE_METRIC_TYPE + ", " + ApplicationConfig.DRUID_RECVTIME_COLUMN_NAME + " FROM " + ApplicationConfig.DRUID_TABLE + " WHERE " + ApplicationConfig.DRUID_ATTACKTYPE_COLUMN_NAME + " IN " + attackList @@ -168,8 +171,11 @@ public class DruidData { + " AND " + timeFilter + " AND " + partitionFilter + " GROUP BY (" + ApplicationConfig.DRUID_SERVERIP_COLUMN_NAME + + ", " + ApplicationConfig.DRUID_VSYSID_COLUMN_NAME + ", " + ApplicationConfig.DRUID_ATTACKTYPE_COLUMN_NAME + ", " + ApplicationConfig.DRUID_RECVTIME_COLUMN_NAME + ")"; + System.out.println(sql); + return sql; } /** diff --git a/src/main/java/cn/mesalab/service/BaselineSingleThread.java b/src/main/java/cn/mesalab/service/BaselineSingleThread.java index b3fa542..aecf947 100644 --- a/src/main/java/cn/mesalab/service/BaselineSingleThread.java +++ b/src/main/java/cn/mesalab/service/BaselineSingleThread.java @@ -7,7 +7,6 @@ import cn.mesalab.utils.DruidUtils; import cn.mesalab.utils.HbaseUtils; import cn.mesalab.utils.RetryUtils; import cn.mesalab.utils.SeriesUtils; -import com.google.common.collect.Lists; import io.vavr.Tuple3; import org.apache.calcite.avatica.AvaticaConnection; import org.apache.calcite.avatica.AvaticaStatement; @@ -83,35 +82,38 @@ public class BaselineSingleThread extends Thread { batchDruidData = new HashMap<>(); } - LOG.info("完成数据处理:获取Server IP:" + batchDruidData.size() + + LOG.info("完成数据处理:获取Server IP + vsys_id:" + batchDruidData.size() + " 运行时间:" + (System.currentTimeMillis() - start)); // 基线生成 List<Put> putList = new ArrayList<>(); for(String attackType: attackTypeList){ - for(String ip: batchDruidData.keySet()){ - // 筛选指定ip指定攻击类型的数据 - List<Map<String, Object>> ipDruidData = batchDruidData.get(ip).stream() + for(String key: batchDruidData.keySet()){ + // 筛选指定key(ip+vsys_id)指定攻击类型的数据 + List<Map<String, Object>> keyDruidData = batchDruidData.get(key).stream() .filter(i -> i.get(ApplicationConfig.DRUID_ATTACKTYPE_COLUMN_NAME).equals(attackType)).collect(Collectors.toList()); // baseline生成 - Tuple3<int[], Integer, Integer> tuple = generateSingleIpBaseline(ip, ipDruidData); + Tuple3<int[], Integer, Integer> tuple = generateSingleBaseline(key, keyDruidData); if(tuple!=null){ - int[] ipBaseline = tuple._1; + int[] baseline = tuple._1; int generateType = tuple._2; int zeroReplaceValue = tuple._3; - if ((BASELINE_SAVE_LEVEL >= generateType) && (ipBaseline!= null ) && (ip.length()>0)){ - hbaseUtils.cachedInPut(putList, ip, ipBaseline, attackType, ApplicationConfig.BASELINE_METRIC_TYPE); - hbaseUtils.cachedInPut(putList, ip, generateType, attackType, + + List<String> keys = Arrays.asList(key.split("-")); + keys.remove(""); + if ((BASELINE_SAVE_LEVEL >= generateType) && (baseline!= null ) && (keys.size() == 2)){ + hbaseUtils.cachedInPut(putList, key, baseline, attackType, ApplicationConfig.BASELINE_METRIC_TYPE); + hbaseUtils.cachedInPut(putList, key, generateType, attackType, ApplicationConfig.BASELINE_METRIC_TYPE + "_" + ApplicationConfig.HBASE_BASELINE_GENERATION_TYPE_SUFFIX); - hbaseUtils.cachedInPut(putList, ip, zeroReplaceValue, attackType, + hbaseUtils.cachedInPut(putList, key, zeroReplaceValue, attackType, ApplicationConfig.BASELINE_METRIC_TYPE + "_" + ApplicationConfig.HBASE_BASELINE_ZERO_REPLACE_VALUE_SUFFIX); } } } } try { - LOG.info("MONITOR-IP频率分段统计:" + frequencyBinCounter); + LOG.info("MONITOR-IP-vsysID频率分段统计:" + frequencyBinCounter); LOG.info("MONITOR-生成类别统计:" + generateTypeCounter); LOG.info("MONITOR-无baseline生成的个数:" + discardBaselineCounter + " 其中包括IP共:" + discardIpList.size()); hbaseTable.put(putList); @@ -151,36 +153,36 @@ public class BaselineSingleThread extends Thread { /** * 单ip baseline生成逻辑 - * @param ip - * @param ipDruidData + * @param key + * @param keyDruidData * @return baseline序列,长度为 60/HISTORICAL_GRAD*24; * baselineGenerationType: - * 1: 高频IP - * 2: 低频有周期IP - * 3:其他类型IP, 采用百分位阈值基线 + * 1: 高频key + * 2: 低频有周期key + * 3:其他类型key, 采用百分位阈值基线 */ - private Tuple3<int[], Integer, Integer> generateSingleIpBaseline(String ip, List<Map<String, Object>> ipDruidData){ - // 无数据(ip-攻击类型)不计算 - if (ipDruidData.size()==0){ - updateDiscardCounter(ip); + private Tuple3<int[], Integer, Integer> generateSingleBaseline(String key, List<Map<String, Object>> keyDruidData){ + // 无数据不计算 + if (keyDruidData.size()==0){ + updateDiscardCounter(key); return null; } - List<Integer> originSeries = ipDruidData.stream().map(i -> + List<Integer> originSeries = keyDruidData.stream().map(i -> Integer.valueOf(i.get(ApplicationConfig.BASELINE_METRIC_TYPE).toString())).collect(Collectors.toList()); List<Integer> originNonZeroSeries = originSeries.stream().filter(i->i>0).collect(Collectors.toList()); // 全零(ip-攻击类型)不计算 if(originNonZeroSeries.size()==0){ - updateDiscardCounter(ip); + updateDiscardCounter(key); return null; } - int ipPercentile = SeriesUtils.percentile(originSeries, ApplicationConfig.BASELINE_RATIONAL_PERCENTILE); + int percentile = SeriesUtils.percentile(originSeries, ApplicationConfig.BASELINE_RATIONAL_PERCENTILE); int baselineGenerationType; int[] baselineArr = new int[baselinePointNum]; // 时间序列缺失值补0 - List<Map<String, Object>> completSeries = SeriesUtils.complementSeries(ipDruidData); + List<Map<String, Object>> completSeries = SeriesUtils.complementSeries(keyDruidData); List<Integer>series = completSeries.stream().map( i -> Integer.valueOf(i.get(ApplicationConfig.BASELINE_METRIC_TYPE).toString())).collect(Collectors.toList()); @@ -190,7 +192,7 @@ public class BaselineSingleThread extends Thread { // 异常值剔除 double exceptionPercentile = SeriesUtils.percentile(series, ApplicationConfig.BASELINE_EXECEPTION_PERCENTILE); double exceptionFillPercentile = SeriesUtils.percentile(series, ApplicationConfig.BASELINE_EXCECPTION_FILL_PERCENTILE); - LOG.debug(ip + ": series-" + series); + LOG.debug(key + ": series-" + series); for(int i=0; i<series.size(); i++){ if(series.get(i) > exceptionPercentile){ series.set(i, (int) exceptionFillPercentile); @@ -203,7 +205,7 @@ public class BaselineSingleThread extends Thread { double p50 = SeriesUtils.percentile(series, 0.50); // 无周期性 - float ipFrequency = ipDruidData.size() / (float) completSeries.size(); + float ipFrequency = keyDruidData.size() / (float) completSeries.size(); updateLogFrequencyCounter(ipFrequency); // 频率判断 if(ipFrequency >ApplicationConfig.BASELINE_HISTORICAL_FREQUENCY_THREAD ){ @@ -220,7 +222,7 @@ public class BaselineSingleThread extends Thread { // 计算默认值-非零数据的百分位数 int defaultValue = SeriesUtils.percentile(originNonZeroSeries, ApplicationConfig.BASELINE_RATIONAL_PERCENTILE); if(defaultValue == 0){ - LOG.error(ip + "-" + "baseline default value is 0 !"); + LOG.error(key + "-" + "baseline default value is 0 !"); } return new Tuple3<>(baselineArr, baselineGenerationType, defaultValue); diff --git a/src/main/java/cn/mesalab/utils/HbaseUtils.java b/src/main/java/cn/mesalab/utils/HbaseUtils.java index 03bfd5e..6d21c1d 100644 --- a/src/main/java/cn/mesalab/utils/HbaseUtils.java +++ b/src/main/java/cn/mesalab/utils/HbaseUtils.java @@ -1 +1 @@ -package cn.mesalab.utils;
import cn.mesalab.config.ApplicationConfig;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.ArrayWritable;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
/**
* @author yjy
* @version 1.0
* @date 2021/7/23 4:56 下午
*/
public class HbaseUtils {
private static final Logger LOG = LoggerFactory.getLogger(HbaseUtils.class);
private static HbaseUtils hbaseUtils;
private Table hbaseTable;
static {
hbaseUtils = HbaseUtils.getInstance();
}
public static HbaseUtils getInstance(){
if (hbaseUtils == null) {
hbaseUtils = new HbaseUtils();
}
return hbaseUtils;
}
public Table getHbaseTable(){
if(hbaseTable == null){
try{
Configuration config = HBaseConfiguration.create();
config.set(HConstants.ZOOKEEPER_QUORUM, ApplicationConfig.HBASE_ZOOKEEPER_QUORUM);
config.set(HConstants.ZOOKEEPER_CLIENT_PORT, ApplicationConfig.HBASE_ZOOKEEPER_CLIENT_PORT);
TableName tableName = TableName.valueOf(ApplicationConfig.HBASE_TABLE);
Connection conn = ConnectionFactory.createConnection(config);
hbaseTable = conn.getTable(tableName);
} catch (IOException e){
LOG.error("HBase 创建HBase table失败!");
e.printStackTrace();
}
}
return hbaseTable;
}
public List<Put> cachedInPut(List<Put> putList, String ip, int[] baseline, String attackType, String metricType){
Put rowPut = new Put(Bytes.toBytes(ip));
rowPut.addColumn(
Bytes.toBytes(attackType),
Bytes.toBytes(metricType),
WritableUtils.toByteArray(toWritable(baseline)));
putList.add(rowPut);
return putList;
}
public List<Put> cachedInPut(List<Put> putList, String ip, int value, String columnFamily, String columnName){
Put rowPut = new Put(Bytes.toBytes(ip));
rowPut.addColumn(
Bytes.toBytes(columnFamily),
Bytes.toBytes(columnName),
Bytes.toBytes(value));
putList.add(rowPut);
return putList;
}
private Writable toWritable(int[] arr) {
Writable[] content = new Writable[arr.length];
for (int i = 0; i < content.length; i++) {
content[i] = new IntWritable(arr[i]);
}
return new ArrayWritable(IntWritable.class, content);
}
public ArrayList<Integer> fromWritable(ArrayWritable writable) {
Writable[] writables = ((ArrayWritable) writable).get();
ArrayList<Integer> list = new ArrayList<Integer>(writables.length);
for (Writable wrt : writables) {
list.add(((IntWritable)wrt).get());
}
return list;
}
public void close(){
try {
hbaseTable.close();
} catch (IOException e) {
e.printStackTrace();
}
}
}
\ No newline at end of file +package cn.mesalab.utils;
import cn.mesalab.config.ApplicationConfig;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.ArrayWritable;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
/**
* @author yjy
* @version 1.0
* @date 2021/7/23 4:56 下午
*/
public class HbaseUtils {
private static final Logger LOG = LoggerFactory.getLogger(HbaseUtils.class);
private static HbaseUtils hbaseUtils;
private Table hbaseTable;
static {
hbaseUtils = HbaseUtils.getInstance();
}
public static HbaseUtils getInstance(){
if (hbaseUtils == null) {
hbaseUtils = new HbaseUtils();
}
return hbaseUtils;
}
public Table getHbaseTable(){
if(hbaseTable == null){
try{
Configuration config = HBaseConfiguration.create();
config.set(HConstants.ZOOKEEPER_QUORUM, ApplicationConfig.HBASE_ZOOKEEPER_QUORUM);
config.set(HConstants.ZOOKEEPER_CLIENT_PORT, ApplicationConfig.HBASE_ZOOKEEPER_CLIENT_PORT);
TableName tableName = TableName.valueOf(ApplicationConfig.HBASE_TABLE);
Connection conn = ConnectionFactory.createConnection(config);
hbaseTable = conn.getTable(tableName);
} catch (IOException e){
LOG.error("HBase 创建HBase table失败!");
e.printStackTrace();
}
}
return hbaseTable;
}
public List<Put> cachedInPut(List<Put> putList, String key, int[] baseline, String attackType, String metricType){
Put rowPut = new Put(Bytes.toBytes(key));
rowPut.addColumn(
Bytes.toBytes(attackType),
Bytes.toBytes(metricType),
WritableUtils.toByteArray(toWritable(baseline)));
putList.add(rowPut);
return putList;
}
public List<Put> cachedInPut(List<Put> putList, String key, int value, String columnFamily, String columnName){
Put rowPut = new Put(Bytes.toBytes(key));
rowPut.addColumn(
Bytes.toBytes(columnFamily),
Bytes.toBytes(columnName),
Bytes.toBytes(value));
putList.add(rowPut);
return putList;
}
private Writable toWritable(int[] arr) {
Writable[] content = new Writable[arr.length];
for (int i = 0; i < content.length; i++) {
content[i] = new IntWritable(arr[i]);
}
return new ArrayWritable(IntWritable.class, content);
}
public ArrayList<Integer> fromWritable(ArrayWritable writable) {
Writable[] writables = ((ArrayWritable) writable).get();
ArrayList<Integer> list = new ArrayList<Integer>(writables.length);
for (Writable wrt : writables) {
list.add(((IntWritable)wrt).get());
}
return list;
}
public void close(){
try {
hbaseTable.close();
} catch (IOException e) {
e.printStackTrace();
}
}
}
\ No newline at end of file diff --git a/src/main/resources/application.properties b/src/main/resources/application.properties index 2104cfa..e730145 100644 --- a/src/main/resources/application.properties +++ b/src/main/resources/application.properties @@ -19,9 +19,9 @@ hbase.zookeeper.client.port=2181 #读取druid时间范围方式, # 0:读取默认范围天数read.historical.days; # 1:指定时间范围 -read.druid.time.limit.type=0 -read.druid.min.time=1627747200000 -read.druid.max.time=1630425600000 +read.druid.time.limit.type=1 +read.druid.min.time=1663430400000 +read.druid.max.time=1663603200000 #Druid字段映射 druid.attacktype.tcpsynflood=TCP SYN Flood @@ -29,6 +29,7 @@ druid.attacktype.udpflood=UDP Flood druid.attacktype.icmpflood=ICMP Flood druid.attacktype.dnsamplification=DNS Flood druid.columnname.serverip=destination_ip +druid.columnname.vsysid=vsys_id druid.columnname.attacktype=attack_type druid.columnname.recvtime=__time druid.columnname.partition.num=partition_num @@ -76,7 +77,7 @@ monitor.frequency.bin.num=100 ########################################## ################ 并发参数 ################# ########################################## -all.partition.num=100 +all.partition.num=10 core.pool.size=10 max.pool.size=10 #druid分区字段partition_num的最大值为9999 |
