author    尹姜谊 <[email protected]>  2022-09-21 09:48:50 +0800
committer 尹姜谊 <[email protected]>  2022-09-21 09:48:50 +0800
commit    df43bb9e54b7d86f74b93f6c9cf55afeb1dfd84c (patch)
tree      933e6fcff9b0cc20603017be77da780f6eff5fa7
parent    ddf4fa8d6eab5005b31b6df15af09546ebe8f6d2 (diff)
Modify rowkey, add vsysid information 24.02
-rw-r--r--  src/main/java/cn/mesalab/config/ApplicationConfig.java       1
-rw-r--r--  src/main/java/cn/mesalab/dao/DruidData.java                 28
-rw-r--r--  src/main/java/cn/mesalab/service/BaselineSingleThread.java  58
-rw-r--r--  src/main/java/cn/mesalab/utils/HbaseUtils.java               2
-rw-r--r--  src/main/resources/application.properties                    9
5 files changed, 54 insertions, 44 deletions
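
The change in a nutshell: every HBase rowkey moves from a bare server IP to an "ip-vsysId" composite, so baselines are kept per virtual system rather than per IP only. A minimal sketch of the new key scheme (RowKeys is a hypothetical helper written for illustration, not part of the patch):

    import org.apache.hadoop.hbase.util.Bytes;

    public final class RowKeys {
        private RowKeys() {}

        // Compose the rowkey the same way DruidData.selectAll now does: "<ip>-<vsysId>".
        public static byte[] compose(String serverIp, long vsysId) {
            return Bytes.toBytes(serverIp + "-" + vsysId);
        }

        // Split a rowkey back into {ip, vsysId}; dotted IPv4 addresses contain
        // no '-', so splitting on the last '-' is unambiguous here.
        public static String[] parse(String rowkey) {
            int sep = rowkey.lastIndexOf('-');
            return new String[]{rowkey.substring(0, sep), rowkey.substring(sep + 1)};
        }
    }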
diff --git a/src/main/java/cn/mesalab/config/ApplicationConfig.java b/src/main/java/cn/mesalab/config/ApplicationConfig.java
index 3a78215..f492223 100644
--- a/src/main/java/cn/mesalab/config/ApplicationConfig.java
+++ b/src/main/java/cn/mesalab/config/ApplicationConfig.java
@@ -32,6 +32,7 @@ public class ApplicationConfig {
public static final String DRUID_ATTACKTYPE_ICMP_FLOOD = ConfigUtils.getStringProperty("druid.attacktype.icmpflood");
public static final String DRUID_ATTACKTYPE_DNS_AMPL = ConfigUtils.getStringProperty("druid.attacktype.dnsamplification");
public static final String DRUID_SERVERIP_COLUMN_NAME = ConfigUtils.getStringProperty("druid.columnname.serverip");
+ public static final String DRUID_VSYSID_COLUMN_NAME = ConfigUtils.getStringProperty("druid.columnname.vsysid");
public static final String DRUID_ATTACKTYPE_COLUMN_NAME = ConfigUtils.getStringProperty("druid.columnname.attacktype");
public static final String DRUID_RECVTIME_COLUMN_NAME = ConfigUtils.getStringProperty("druid.columnname.recvtime");
public static final String DRUID_COLUMNNAME_PARTITION_NUM = ConfigUtils.getStringProperty("druid.columnname.partition.num");
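
ConfigUtils.getStringProperty is not shown in this patch; for context, a plausible minimal implementation that backs constants like the new DRUID_VSYSID_COLUMN_NAME (an assumption about the project's code, not its actual source):

    import java.io.IOException;
    import java.io.InputStream;
    import java.util.Properties;

    public final class ConfigUtils {
        private static final Properties PROPS = new Properties();

        static {
            // Load application.properties once from the classpath.
            try (InputStream in = ConfigUtils.class.getClassLoader()
                    .getResourceAsStream("application.properties")) {
                PROPS.load(in);
            } catch (IOException e) {
                throw new ExceptionInInitializerError(e);
            }
        }

        public static String getStringProperty(String name) {
            return PROPS.getProperty(name);
        }
    }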
diff --git a/src/main/java/cn/mesalab/dao/DruidData.java b/src/main/java/cn/mesalab/dao/DruidData.java
index d0f486f..dc82c57 100644
--- a/src/main/java/cn/mesalab/dao/DruidData.java
+++ b/src/main/java/cn/mesalab/dao/DruidData.java
@@ -48,21 +48,23 @@ public class DruidData {
}
public static Map<String, List<Map<String, Object>>> selectAll(List<Map<String, Object>> result) {
- Map<String, List<Map<String, Object>>> allIpDataList = new HashMap<>();
- ArrayList<String> ipList = new ArrayList<>();
+ Map<String, List<Map<String, Object>>> allKeyDataList = new HashMap<>();
+ ArrayList<String> keyList = new ArrayList<>();
for (Map<String, Object> rowData : result) {
String ip = (String) rowData.get(ApplicationConfig.DRUID_SERVERIP_COLUMN_NAME);
- if (!ipList.contains(ip)) {
- ipList.add(ip);
- List<Map<String, Object>> ipData = new ArrayList<>();
- allIpDataList.put(ip, ipData);
+ String vsysId = Long.toString((Long) rowData.get(ApplicationConfig.DRUID_VSYSID_COLUMN_NAME));
+ String key = ip + "-" + vsysId;
+ if (!keyList.contains(key)) {
+ keyList.add(key);
+ List<Map<String, Object>> keyData = new ArrayList<>();
+ allKeyDataList.put(key, keyData);
}
rowData.remove(ApplicationConfig.DRUID_SERVERIP_COLUMN_NAME);
- allIpDataList.get(ip).add(rowData);
+ allKeyDataList.get(key).add(rowData);
}
- return allIpDataList;
+ return allKeyDataList;
}
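
A side note on the grouping above: keyList exists only to test membership, which costs a linear scan per row. A behavior-equivalent sketch using Map.computeIfAbsent (same constants and imports as DruidData above; standalone method for illustration) drops the list entirely:

    public static Map<String, List<Map<String, Object>>> selectAll(List<Map<String, Object>> result) {
        Map<String, List<Map<String, Object>>> allKeyDataList = new HashMap<>();
        for (Map<String, Object> rowData : result) {
            String ip = (String) rowData.get(ApplicationConfig.DRUID_SERVERIP_COLUMN_NAME);
            String vsysId = Long.toString((Long) rowData.get(ApplicationConfig.DRUID_VSYSID_COLUMN_NAME));
            rowData.remove(ApplicationConfig.DRUID_SERVERIP_COLUMN_NAME);
            // computeIfAbsent creates the bucket on first sight of a key, O(1) amortized.
            allKeyDataList.computeIfAbsent(ip + "-" + vsysId, k -> new ArrayList<>()).add(rowData);
        }
        return allKeyDataList;
    }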
@@ -158,9 +160,10 @@ public class DruidData {
+ ") AND " + ApplicationConfig.DRUID_RECVTIME_COLUMN_NAME
+ " < MILLIS_TO_TIMESTAMP(" + endTime + ")";
- return "SELECT "+ ApplicationConfig.DRUID_SERVERIP_COLUMN_NAME
- + ", "+ ApplicationConfig.DRUID_ATTACKTYPE_COLUMN_NAME
- + ", AVG("+ ApplicationConfig.BASELINE_METRIC_TYPE + ") as " + ApplicationConfig.BASELINE_METRIC_TYPE
+ String sql = "SELECT " + ApplicationConfig.DRUID_SERVERIP_COLUMN_NAME
+ + ", " + ApplicationConfig.DRUID_VSYSID_COLUMN_NAME
+ + ", " + ApplicationConfig.DRUID_ATTACKTYPE_COLUMN_NAME
+ + ", AVG(" + ApplicationConfig.BASELINE_METRIC_TYPE + ") as " + ApplicationConfig.BASELINE_METRIC_TYPE
+ ", " + ApplicationConfig.DRUID_RECVTIME_COLUMN_NAME
+ " FROM " + ApplicationConfig.DRUID_TABLE
+ " WHERE " + ApplicationConfig.DRUID_ATTACKTYPE_COLUMN_NAME + " IN " + attackList
@@ -168,8 +171,11 @@ public class DruidData {
+ " AND " + timeFilter
+ " AND " + partitionFilter
+ " GROUP BY (" + ApplicationConfig.DRUID_SERVERIP_COLUMN_NAME
+ + ", " + ApplicationConfig.DRUID_VSYSID_COLUMN_NAME
+ ", " + ApplicationConfig.DRUID_ATTACKTYPE_COLUMN_NAME
+ ", " + ApplicationConfig.DRUID_RECVTIME_COLUMN_NAME + ")";
+ System.out.println(sql);
+ return sql;
}
/**
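
For reference, with the column mappings from application.properties the rewritten builder emits a query of roughly this shape (the table, metric, attack list, and filter values are elided as placeholders):

    SELECT destination_ip, vsys_id, attack_type, AVG(<metric>) as <metric>, __time
    FROM <druid.table>
    WHERE attack_type IN <attackList>
      AND <timeFilter>
      AND <partitionFilter>
    GROUP BY (destination_ip, vsys_id, attack_type, __time)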
diff --git a/src/main/java/cn/mesalab/service/BaselineSingleThread.java b/src/main/java/cn/mesalab/service/BaselineSingleThread.java
index b3fa542..aecf947 100644
--- a/src/main/java/cn/mesalab/service/BaselineSingleThread.java
+++ b/src/main/java/cn/mesalab/service/BaselineSingleThread.java
@@ -7,7 +7,6 @@ import cn.mesalab.utils.DruidUtils;
import cn.mesalab.utils.HbaseUtils;
import cn.mesalab.utils.RetryUtils;
import cn.mesalab.utils.SeriesUtils;
-import com.google.common.collect.Lists;
import io.vavr.Tuple3;
import org.apache.calcite.avatica.AvaticaConnection;
import org.apache.calcite.avatica.AvaticaStatement;
@@ -83,35 +82,38 @@ public class BaselineSingleThread extends Thread {
batchDruidData = new HashMap<>();
}
- LOG.info("完成数据处理:获取Server IP:" + batchDruidData.size() +
+ LOG.info("完成数据处理:获取Server IP + vsys_id:" + batchDruidData.size() +
" 运行时间:" + (System.currentTimeMillis() - start));
// 基线生成
List<Put> putList = new ArrayList<>();
for(String attackType: attackTypeList){
- for(String ip: batchDruidData.keySet()){
- // Filter this IP's rows for the given attack type
- List<Map<String, Object>> ipDruidData = batchDruidData.get(ip).stream()
+ for(String key: batchDruidData.keySet()){
+ // Filter this key's (ip + vsys_id) rows for the given attack type
+ List<Map<String, Object>> keyDruidData = batchDruidData.get(key).stream()
.filter(i -> i.get(ApplicationConfig.DRUID_ATTACKTYPE_COLUMN_NAME).equals(attackType)).collect(Collectors.toList());
// Baseline generation
- Tuple3<int[], Integer, Integer> tuple = generateSingleIpBaseline(ip, ipDruidData);
+ Tuple3<int[], Integer, Integer> tuple = generateSingleBaseline(key, keyDruidData);
if(tuple!=null){
- int[] ipBaseline = tuple._1;
+ int[] baseline = tuple._1;
int generateType = tuple._2;
int zeroReplaceValue = tuple._3;
- if ((BASELINE_SAVE_LEVEL >= generateType) && (ipBaseline!= null ) && (ip.length()>0)){
- hbaseUtils.cachedInPut(putList, ip, ipBaseline, attackType, ApplicationConfig.BASELINE_METRIC_TYPE);
- hbaseUtils.cachedInPut(putList, ip, generateType, attackType,
+
+ // Arrays.asList returns a fixed-size view, so remove("") would throw
+ // UnsupportedOperationException exactly when the ip segment is empty; copy it first.
+ List<String> keys = new ArrayList<>(Arrays.asList(key.split("-")));
+ keys.remove("");
+ if ((BASELINE_SAVE_LEVEL >= generateType) && (baseline != null) && (keys.size() == 2)){
+ hbaseUtils.cachedInPut(putList, key, baseline, attackType, ApplicationConfig.BASELINE_METRIC_TYPE);
+ hbaseUtils.cachedInPut(putList, key, generateType, attackType,
ApplicationConfig.BASELINE_METRIC_TYPE + "_" + ApplicationConfig.HBASE_BASELINE_GENERATION_TYPE_SUFFIX);
- hbaseUtils.cachedInPut(putList, ip, zeroReplaceValue, attackType,
+ hbaseUtils.cachedInPut(putList, key, zeroReplaceValue, attackType,
ApplicationConfig.BASELINE_METRIC_TYPE + "_" + ApplicationConfig.HBASE_BASELINE_ZERO_REPLACE_VALUE_SUFFIX);
}
}
}
}
try {
- LOG.info("MONITOR-IP频率分段统计:" + frequencyBinCounter);
+ LOG.info("MONITOR-IP-vsysID频率分段统计:" + frequencyBinCounter);
LOG.info("MONITOR-生成类别统计:" + generateTypeCounter);
LOG.info("MONITOR-无baseline生成的个数:" + discardBaselineCounter + " 其中包括IP共:" + discardIpList.size());
hbaseTable.put(putList);
@@ -151,36 +153,36 @@ public class BaselineSingleThread extends Thread {
/**
* Single-ip baseline generation logic
- * @param ip
- * @param ipDruidData
+ * @param key
+ * @param keyDruidData
* @return baseline series, length 60/HISTORICAL_GRAD*24;
* baselineGenerationType:
- * 1: high-frequency IP
- * 2: low-frequency periodic IP
- * 3: other IPs, baseline from a percentile threshold
+ * 1: high-frequency key
+ * 2: low-frequency periodic key
+ * 3: other keys, baseline from a percentile threshold
*/
- private Tuple3<int[], Integer, Integer> generateSingleIpBaseline(String ip, List<Map<String, Object>> ipDruidData){
- // No data for this (ip, attack type): skip
- if (ipDruidData.size()==0){
- updateDiscardCounter(ip);
+ private Tuple3<int[], Integer, Integer> generateSingleBaseline(String key, List<Map<String, Object>> keyDruidData){
+ // No data: skip
+ if (keyDruidData.size()==0){
+ updateDiscardCounter(key);
return null;
}
- List<Integer> originSeries = ipDruidData.stream().map(i ->
+ List<Integer> originSeries = keyDruidData.stream().map(i ->
Integer.valueOf(i.get(ApplicationConfig.BASELINE_METRIC_TYPE).toString())).collect(Collectors.toList());
List<Integer> originNonZeroSeries = originSeries.stream().filter(i->i>0).collect(Collectors.toList());
// All-zero series for this (ip, attack type): skip
if(originNonZeroSeries.size()==0){
- updateDiscardCounter(ip);
+ updateDiscardCounter(key);
return null;
}
- int ipPercentile = SeriesUtils.percentile(originSeries, ApplicationConfig.BASELINE_RATIONAL_PERCENTILE);
+ int percentile = SeriesUtils.percentile(originSeries, ApplicationConfig.BASELINE_RATIONAL_PERCENTILE);
int baselineGenerationType;
int[] baselineArr = new int[baselinePointNum];
// Fill missing points in the time series with 0
- List<Map<String, Object>> completSeries = SeriesUtils.complementSeries(ipDruidData);
+ List<Map<String, Object>> completSeries = SeriesUtils.complementSeries(keyDruidData);
List<Integer>series = completSeries.stream().map(
i -> Integer.valueOf(i.get(ApplicationConfig.BASELINE_METRIC_TYPE).toString())).collect(Collectors.toList());
@@ -190,7 +192,7 @@ public class BaselineSingleThread extends Thread {
// Outlier removal
double exceptionPercentile = SeriesUtils.percentile(series, ApplicationConfig.BASELINE_EXECEPTION_PERCENTILE);
double exceptionFillPercentile = SeriesUtils.percentile(series, ApplicationConfig.BASELINE_EXCECPTION_FILL_PERCENTILE);
- LOG.debug(ip + ": series-" + series);
+ LOG.debug(key + ": series-" + series);
for(int i=0; i<series.size(); i++){
if(series.get(i) > exceptionPercentile){
series.set(i, (int) exceptionFillPercentile);
@@ -203,7 +205,7 @@ public class BaselineSingleThread extends Thread {
double p50 = SeriesUtils.percentile(series, 0.50);
// No periodicity
- float ipFrequency = ipDruidData.size() / (float) completSeries.size();
+ float ipFrequency = keyDruidData.size() / (float) completSeries.size();
updateLogFrequencyCounter(ipFrequency);
// Frequency check
if(ipFrequency >ApplicationConfig.BASELINE_HISTORICAL_FREQUENCY_THREAD ){
@@ -220,7 +222,7 @@ public class BaselineSingleThread extends Thread {
// Default value: percentile of the non-zero data
int defaultValue = SeriesUtils.percentile(originNonZeroSeries, ApplicationConfig.BASELINE_RATIONAL_PERCENTILE);
if(defaultValue == 0){
- LOG.error(ip + "-" + "baseline default value is 0 !");
+ LOG.error(key + "-" + "baseline default value is 0 !");
}
return new Tuple3<>(baselineArr, baselineGenerationType, defaultValue);
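
The outlier pass above clips every point above the BASELINE_EXECEPTION_PERCENTILE quantile down to the BASELINE_EXCECPTION_FILL_PERCENTILE quantile. A self-contained sketch of that step (nearest-rank percentile assumed; SeriesUtils' exact definition may differ):

    import java.util.ArrayList;
    import java.util.Arrays;
    import java.util.Collections;
    import java.util.List;

    public final class ClipDemo {
        // Nearest-rank percentile for p in (0, 1]; an assumption, SeriesUtils may interpolate.
        static int percentile(List<Integer> series, double p) {
            List<Integer> sorted = new ArrayList<>(series);
            Collections.sort(sorted);
            return sorted.get(Math.max((int) Math.ceil(p * sorted.size()) - 1, 0));
        }

        public static void main(String[] args) {
            List<Integer> series = new ArrayList<>(Arrays.asList(3, 4, 5, 4, 90, 5, 3, 4));
            int cut  = percentile(series, 0.85);   // 5 for this series
            int fill = percentile(series, 0.50);   // 4 for this series
            for (int i = 0; i < series.size(); i++) {
                if (series.get(i) > cut) {
                    series.set(i, fill);           // the 90 outlier becomes 4
                }
            }
            System.out.println(series);            // [3, 4, 5, 4, 4, 5, 3, 4]
        }
    }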
diff --git a/src/main/java/cn/mesalab/utils/HbaseUtils.java b/src/main/java/cn/mesalab/utils/HbaseUtils.java
index 03bfd5e..6d21c1d 100644
--- a/src/main/java/cn/mesalab/utils/HbaseUtils.java
+++ b/src/main/java/cn/mesalab/utils/HbaseUtils.java
@@ -1 +1 @@
-package cn.mesalab.utils; import cn.mesalab.config.ApplicationConfig; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.HBaseConfiguration; import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.client.Connection; import org.apache.hadoop.hbase.client.ConnectionFactory; import org.apache.hadoop.hbase.client.Put; import org.apache.hadoop.hbase.client.Table; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.io.ArrayWritable; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.Writable; import org.apache.hadoop.io.WritableUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.io.IOException; import java.util.ArrayList; import java.util.List; /** * @author yjy * @version 1.0 * @date 2021/7/23 4:56 PM */ public class HbaseUtils { private static final Logger LOG = LoggerFactory.getLogger(HbaseUtils.class); private static HbaseUtils hbaseUtils; private Table hbaseTable; static { hbaseUtils = HbaseUtils.getInstance(); } public static HbaseUtils getInstance(){ if (hbaseUtils == null) { hbaseUtils = new HbaseUtils(); } return hbaseUtils; } public Table getHbaseTable(){ if(hbaseTable == null){ try{ Configuration config = HBaseConfiguration.create(); config.set(HConstants.ZOOKEEPER_QUORUM, ApplicationConfig.HBASE_ZOOKEEPER_QUORUM); config.set(HConstants.ZOOKEEPER_CLIENT_PORT, ApplicationConfig.HBASE_ZOOKEEPER_CLIENT_PORT); TableName tableName = TableName.valueOf(ApplicationConfig.HBASE_TABLE); Connection conn = ConnectionFactory.createConnection(config); hbaseTable = conn.getTable(tableName); } catch (IOException e){ LOG.error("HBase: failed to create HBase table!"); e.printStackTrace(); } } return hbaseTable; } public List<Put> cachedInPut(List<Put> putList, String ip, int[] baseline, String attackType, String metricType){ Put rowPut = new Put(Bytes.toBytes(ip)); rowPut.addColumn( Bytes.toBytes(attackType), Bytes.toBytes(metricType), WritableUtils.toByteArray(toWritable(baseline))); putList.add(rowPut); return putList; } public List<Put> cachedInPut(List<Put> putList, String ip, int value, String columnFamily, String columnName){ Put rowPut = new Put(Bytes.toBytes(ip)); rowPut.addColumn( Bytes.toBytes(columnFamily), Bytes.toBytes(columnName), Bytes.toBytes(value)); putList.add(rowPut); return putList; } private Writable toWritable(int[] arr) { Writable[] content = new Writable[arr.length]; for (int i = 0; i < content.length; i++) { content[i] = new IntWritable(arr[i]); } return new ArrayWritable(IntWritable.class, content); } public ArrayList<Integer> fromWritable(ArrayWritable writable) { Writable[] writables = ((ArrayWritable) writable).get(); ArrayList<Integer> list = new ArrayList<Integer>(writables.length); for (Writable wrt : writables) { list.add(((IntWritable)wrt).get()); } return list; } public void close(){ try { hbaseTable.close(); } catch (IOException e) { e.printStackTrace(); } } } \ No newline at end of file
+package cn.mesalab.utils; import cn.mesalab.config.ApplicationConfig; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.HBaseConfiguration; import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.client.Connection; import org.apache.hadoop.hbase.client.ConnectionFactory; import org.apache.hadoop.hbase.client.Put; import org.apache.hadoop.hbase.client.Table; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.io.ArrayWritable; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.Writable; import org.apache.hadoop.io.WritableUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.io.IOException; import java.util.ArrayList; import java.util.List; /** * @author yjy * @version 1.0 * @date 2021/7/23 4:56 PM */ public class HbaseUtils { private static final Logger LOG = LoggerFactory.getLogger(HbaseUtils.class); private static HbaseUtils hbaseUtils; private Table hbaseTable; static { hbaseUtils = HbaseUtils.getInstance(); } public static HbaseUtils getInstance(){ if (hbaseUtils == null) { hbaseUtils = new HbaseUtils(); } return hbaseUtils; } public Table getHbaseTable(){ if(hbaseTable == null){ try{ Configuration config = HBaseConfiguration.create(); config.set(HConstants.ZOOKEEPER_QUORUM, ApplicationConfig.HBASE_ZOOKEEPER_QUORUM); config.set(HConstants.ZOOKEEPER_CLIENT_PORT, ApplicationConfig.HBASE_ZOOKEEPER_CLIENT_PORT); TableName tableName = TableName.valueOf(ApplicationConfig.HBASE_TABLE); Connection conn = ConnectionFactory.createConnection(config); hbaseTable = conn.getTable(tableName); } catch (IOException e){ LOG.error("HBase: failed to create HBase table!"); e.printStackTrace(); } } return hbaseTable; } public List<Put> cachedInPut(List<Put> putList, String key, int[] baseline, String attackType, String metricType){ Put rowPut = new Put(Bytes.toBytes(key)); rowPut.addColumn( Bytes.toBytes(attackType), Bytes.toBytes(metricType), WritableUtils.toByteArray(toWritable(baseline))); putList.add(rowPut); return putList; } public List<Put> cachedInPut(List<Put> putList, String key, int value, String columnFamily, String columnName){ Put rowPut = new Put(Bytes.toBytes(key)); rowPut.addColumn( Bytes.toBytes(columnFamily), Bytes.toBytes(columnName), Bytes.toBytes(value)); putList.add(rowPut); return putList; } private Writable toWritable(int[] arr) { Writable[] content = new Writable[arr.length]; for (int i = 0; i < content.length; i++) { content[i] = new IntWritable(arr[i]); } return new ArrayWritable(IntWritable.class, content); } public ArrayList<Integer> fromWritable(ArrayWritable writable) { Writable[] writables = ((ArrayWritable) writable).get(); ArrayList<Integer> list = new ArrayList<Integer>(writables.length); for (Writable wrt : writables) { list.add(((IntWritable)wrt).get()); } return list; } public void close(){ try { hbaseTable.close(); } catch (IOException e) { e.printStackTrace(); } } } \ No newline at end of file
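
Reading a baseline back out of HBase reverses cachedInPut and toWritable. A hedged sketch (readBaseline is a hypothetical helper; the column family and qualifier must match what cachedInPut wrote):

    import java.io.ByteArrayInputStream;
    import java.io.DataInputStream;
    import java.io.IOException;
    import java.util.ArrayList;
    import org.apache.hadoop.hbase.client.Get;
    import org.apache.hadoop.hbase.client.Result;
    import org.apache.hadoop.hbase.util.Bytes;
    import org.apache.hadoop.io.ArrayWritable;
    import org.apache.hadoop.io.IntWritable;

    static ArrayList<Integer> readBaseline(String rowkey, String attackType, String metricType) throws IOException {
        HbaseUtils utils = HbaseUtils.getInstance();
        // Rowkey is now "<ip>-<vsysId>", e.g. "203.0.113.7-3".
        Result r = utils.getHbaseTable().get(new Get(Bytes.toBytes(rowkey)));
        byte[] raw = r.getValue(Bytes.toBytes(attackType), Bytes.toBytes(metricType));
        // WritableUtils.toByteArray stored the raw Writable bytes; readFields reverses it.
        ArrayWritable aw = new ArrayWritable(IntWritable.class);
        aw.readFields(new DataInputStream(new ByteArrayInputStream(raw)));
        return utils.fromWritable(aw);
    }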
diff --git a/src/main/resources/application.properties b/src/main/resources/application.properties
index 2104cfa..e730145 100644
--- a/src/main/resources/application.properties
+++ b/src/main/resources/application.properties
@@ -19,9 +19,9 @@ hbase.zookeeper.client.port=2181
#How the druid read time range is chosen:
# 0: read the default span of read.historical.days days;
# 1: use the explicit time range below
-read.druid.time.limit.type=0
-read.druid.min.time=1627747200000
-read.druid.max.time=1630425600000
+read.druid.time.limit.type=1
+read.druid.min.time=1663430400000
+read.druid.max.time=1663603200000
#Druid column mapping
druid.attacktype.tcpsynflood=TCP SYN Flood
@@ -29,6 +29,7 @@ druid.attacktype.udpflood=UDP Flood
druid.attacktype.icmpflood=ICMP Flood
druid.attacktype.dnsamplification=DNS Flood
druid.columnname.serverip=destination_ip
+druid.columnname.vsysid=vsys_id
druid.columnname.attacktype=attack_type
druid.columnname.recvtime=__time
druid.columnname.partition.num=partition_num
@@ -76,7 +77,7 @@ monitor.frequency.bin.num=100
##########################################
############# Concurrency settings ##############
##########################################
-all.partition.num=100
+all.partition.num=10
core.pool.size=10
max.pool.size=10
#The maximum value of the druid partition column partition_num is 9999
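
Since read.druid.time.limit.type is now 1, the two epoch-millisecond bounds are used verbatim; a quick java.time sanity check of the configured window:

    import java.time.Instant;

    public class WindowCheck {
        public static void main(String[] args) {
            System.out.println(Instant.ofEpochMilli(1663430400000L)); // 2022-09-17T16:00:00Z = 2022-09-18 00:00 +0800
            System.out.println(Instant.ofEpochMilli(1663603200000L)); // 2022-09-19T16:00:00Z = 2022-09-20 00:00 +0800
        }
    }

That is a two-day window ending the day before the commit timestamp, consistent with a short backfill/test run.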