diff options
| author | yinjiangyi <[email protected]> | 2021-08-17 14:44:20 +0800 |
|---|---|---|
| committer | yinjiangyi <[email protected]> | 2021-08-17 14:44:20 +0800 |
| commit | b5645b72edb81f572331361f17b5ae85c63c4352 (patch) | |
| tree | 5cad3340fc8033c621c4c4365e58683944bc258b /src/main/java | |
| parent | e9d045b04e122310b77df8cd2522bca3147249d7 (diff) | |
增加字段*_zero_replace_value
Diffstat (limited to 'src/main/java')
| -rw-r--r-- | src/main/java/cn/mesalab/config/ApplicationConfig.java | 3 | ||||
| -rw-r--r-- | src/main/java/cn/mesalab/service/BaselineSingleThread.java | 36 | ||||
| -rw-r--r-- | src/main/java/cn/mesalab/utils/HbaseUtils.java | 2 |
3 files changed, 24 insertions, 17 deletions
diff --git a/src/main/java/cn/mesalab/config/ApplicationConfig.java b/src/main/java/cn/mesalab/config/ApplicationConfig.java index 2423656..d540700 100644 --- a/src/main/java/cn/mesalab/config/ApplicationConfig.java +++ b/src/main/java/cn/mesalab/config/ApplicationConfig.java @@ -24,6 +24,7 @@ public class ApplicationConfig { public static final String BASELINE_METRIC_TYPE = ConfigUtils.getStringProperty("baseline.metric.type"); public static final String HBASE_BASELINE_GENERATION_TYPE_SUFFIX = ConfigUtils.getStringProperty("hbase.baseline.generation.type.suffix"); + public static final String HBASE_BASELINE_ZERO_REPLACE_VALUE_SUFFIX = ConfigUtils.getStringProperty("hbase.baseline.zero.replace.value.suffix"); public static final String DRUID_ATTACKTYPE_TCP_SYN_FLOOD = ConfigUtils.getStringProperty("druid.attacktype.tcpsynflood"); public static final String DRUID_ATTACKTYPE_UDP_FLOOD = ConfigUtils.getStringProperty("druid.attacktype.udpflood"); @@ -40,7 +41,7 @@ public class ApplicationConfig { public static final Float BASELINE_HISTORICAL_FREQUENCY_THREAD = ConfigUtils.getFloatProperty("baseline.historical.frequency.thread"); // 异常值判断分位数 public static final Float BASELINE_EXECEPTION_PERCENTILE = ConfigUtils.getFloatProperty("baseline.exception.percentile"); - // 异常值替换百分位 + // 异常值百分位数 public static final Float BASELINE_EXCECPTION_FILL_PERCENTILE = ConfigUtils.getFloatProperty("baseline.exception.fill.percentile"); public static final String BASELINE_FUNCTION = ConfigUtils.getStringProperty("baseline.function"); public static final Integer BASELINE_RANGE_DAYS = ConfigUtils.getIntProperty("baseline.range.days"); diff --git a/src/main/java/cn/mesalab/service/BaselineSingleThread.java b/src/main/java/cn/mesalab/service/BaselineSingleThread.java index af4e2ec..26a0983 100644 --- a/src/main/java/cn/mesalab/service/BaselineSingleThread.java +++ b/src/main/java/cn/mesalab/service/BaselineSingleThread.java @@ -7,12 +7,10 @@ import cn.mesalab.utils.DruidUtils; import cn.mesalab.utils.HbaseUtils; import cn.mesalab.utils.RetryUtils; import cn.mesalab.utils.SeriesUtils; -import io.vavr.Tuple; import io.vavr.Tuple2; -import org.apache.calcite.avatica.AvaticaClientRuntimeException; +import io.vavr.Tuple3; import org.apache.calcite.avatica.AvaticaConnection; import org.apache.calcite.avatica.AvaticaStatement; -import org.apache.commons.math3.stat.StatUtils; import org.apache.hadoop.hbase.client.Put; import org.apache.hadoop.hbase.client.Table; import org.slf4j.Logger; @@ -88,14 +86,19 @@ public class BaselineSingleThread extends Thread { List<Map<String, Object>> ipDruidData = batchDruidData.get(ip).stream() .filter(i -> i.get(ApplicationConfig.DRUID_ATTACKTYPE_COLUMN_NAME).equals(attackType)).collect(Collectors.toList()); // baseline生成 - Tuple2<int[], Integer> tuple = generateSingleIpBaseline(ip, ipDruidData); + Tuple3<int[], Integer, Integer> tuple = generateSingleIpBaseline(ip, ipDruidData); if(tuple!=null){ int[] ipBaseline = tuple._1; int generateType = tuple._2; + int zeroReplaceValue = tuple._3; if ((ipBaseline!= null ) && (ip.length()>0)){ hbaseUtils.cachedInPut(putList, ip, ipBaseline, attackType, ApplicationConfig.BASELINE_METRIC_TYPE); hbaseUtils.cachedInPut(putList, ip, generateType, attackType, ApplicationConfig.BASELINE_METRIC_TYPE + "_" + ApplicationConfig.HBASE_BASELINE_GENERATION_TYPE_SUFFIX); + hbaseUtils.cachedInPut(putList, ip, generateType, attackType, + ApplicationConfig.BASELINE_METRIC_TYPE + "_" + ApplicationConfig.HBASE_BASELINE_GENERATION_TYPE_SUFFIX); + hbaseUtils.cachedInPut(putList, ip, zeroReplaceValue, attackType, + ApplicationConfig.BASELINE_METRIC_TYPE + "_" + ApplicationConfig.HBASE_BASELINE_ZERO_REPLACE_VALUE_SUFFIX); } } } @@ -136,11 +139,17 @@ public class BaselineSingleThread extends Thread { * 2: 低频有周期IP * 3:其他类型IP, 采用百分位阈值基线 */ - private Tuple2<int[], Integer> generateSingleIpBaseline(String ip, List<Map<String, Object>> ipDruidData){ + private Tuple3<int[], Integer, Integer> generateSingleIpBaseline(String ip, List<Map<String, Object>> ipDruidData){ if (ipDruidData.size()==0){ return null; } + List<Integer> originSeries = ipDruidData.stream().map(i -> + Integer.valueOf(i.get(ApplicationConfig.BASELINE_METRIC_TYPE).toString())).collect(Collectors.toList()); + if(Collections.max(originSeries)==0){ + return null; + } + int baselineGenerationType = 0; int[] baselineArr = new int[baselinePointNum]; @@ -148,9 +157,11 @@ public class BaselineSingleThread extends Thread { List<Map<String, Object>> completSeries = SeriesUtils.complementSeries(ipDruidData); List<Integer>series = completSeries.stream().map( i -> Integer.valueOf(i.get(ApplicationConfig.BASELINE_METRIC_TYPE).toString())).collect(Collectors.toList()); + int ipPercentile = SeriesUtils.percentile(originSeries, ApplicationConfig.BASELINE_RATIONAL_PERCENTILE); // 判断ip出现频率 - if(ipDruidData.size()/(float)completSeries.size()>ApplicationConfig.BASELINE_HISTORICAL_FREQUENCY_THREAD){ + float ipFrequency = ipDruidData.size() / (float) completSeries.size(); + if(ipFrequency >ApplicationConfig.BASELINE_HISTORICAL_FREQUENCY_THREAD){ // 异常值剔除 baselineGenerationType = 1; double exceptionPercentile = SeriesUtils.percentile(series, ApplicationConfig.BASELINE_EXECEPTION_PERCENTILE); @@ -162,27 +173,22 @@ public class BaselineSingleThread extends Thread { } // KF baselineArr = baselineFunction(series); - System.out.println("type-01:" + ip + " " + Arrays.toString(baselineArr)); + // System.out.println("type-01:" + ipPercentile + " " + ip + " " + Arrays.toString(baselineArr)); } else { // 判断周期性 if (SeriesUtils.isPeriod(series)){ baselineGenerationType = 2; // KF baselineArr = baselineFunction(series); - System.out.println("type-02:" + ip + " " + Arrays.toString(baselineArr)); + // System.out.println("type-02:" + ipPercentile + " " + ip + " " + Arrays.toString(baselineArr)); } else { baselineGenerationType = 3; - // 百分位数 - int ipPercentile = SeriesUtils.percentile( - ipDruidData.stream().map(i -> - Integer.valueOf(i.get(ApplicationConfig.BASELINE_METRIC_TYPE).toString())).collect(Collectors.toList()), - ApplicationConfig.BASELINE_RATIONAL_PERCENTILE); Arrays.fill(baselineArr, ipPercentile); -// System.out.println("type-03:" + ip + " " + Arrays.toString(baselineArr)); + // System.out.println("type-03:" + ipPercentile + " " + ip + " " + Arrays.toString(baselineArr)); } } - return new Tuple2<>(baselineArr, baselineGenerationType); + return new Tuple3<>(baselineArr, baselineGenerationType, ipPercentile); } /** diff --git a/src/main/java/cn/mesalab/utils/HbaseUtils.java b/src/main/java/cn/mesalab/utils/HbaseUtils.java index 28dfcf3..03bfd5e 100644 --- a/src/main/java/cn/mesalab/utils/HbaseUtils.java +++ b/src/main/java/cn/mesalab/utils/HbaseUtils.java @@ -1 +1 @@ -package cn.mesalab.utils;
import cn.mesalab.config.ApplicationConfig;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.ArrayWritable;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
/**
* @author yjy
* @version 1.0
* @date 2021/7/23 4:56 下午
*/
public class HbaseUtils {
private static final Logger LOG = LoggerFactory.getLogger(HbaseUtils.class);
private static HbaseUtils hbaseUtils;
private Table hbaseTable;
static {
hbaseUtils = HbaseUtils.getInstance();
}
public static HbaseUtils getInstance(){
if (hbaseUtils == null) {
hbaseUtils = new HbaseUtils();
}
return hbaseUtils;
}
public Table getHbaseTable(){
if(hbaseTable == null){
try{
Configuration config = HBaseConfiguration.create();
config.set(HConstants.ZOOKEEPER_QUORUM, ApplicationConfig.HBASE_ZOOKEEPER_QUORUM);
config.set(HConstants.ZOOKEEPER_CLIENT_PORT, ApplicationConfig.HBASE_ZOOKEEPER_CLIENT_PORT);
TableName tableName = TableName.valueOf(ApplicationConfig.HBASE_TABLE);
Connection conn = ConnectionFactory.createConnection(config);
hbaseTable = conn.getTable(tableName);
} catch (IOException e){
LOG.error("HBase 创建HBase table失败!");
e.printStackTrace();
}
}
return hbaseTable;
}
public List<Put> cachedInPut(List<Put> putList, String ip, int[] baseline, String attackType, String metricType){
Put rowPut = new Put(Bytes.toBytes(ip));
rowPut.addColumn(
Bytes.toBytes(attackType),
Bytes.toBytes(metricType),
WritableUtils.toByteArray(toWritable(baseline)));
putList.add(rowPut);
return putList;
}
public List<Put> cachedInPut(List<Put> putList, String ip, int generateType, String attackType, String columnName){
Put rowPut = new Put(Bytes.toBytes(ip));
rowPut.addColumn(
Bytes.toBytes(attackType),
Bytes.toBytes(columnName),
Bytes.toBytes(generateType));
putList.add(rowPut);
return putList;
}
private Writable toWritable(int[] arr) {
Writable[] content = new Writable[arr.length];
for (int i = 0; i < content.length; i++) {
content[i] = new IntWritable(arr[i]);
}
return new ArrayWritable(IntWritable.class, content);
}
public ArrayList<Integer> fromWritable(ArrayWritable writable) {
Writable[] writables = ((ArrayWritable) writable).get();
ArrayList<Integer> list = new ArrayList<Integer>(writables.length);
for (Writable wrt : writables) {
list.add(((IntWritable)wrt).get());
}
return list;
}
public void close(){
try {
hbaseTable.close();
} catch (IOException e) {
e.printStackTrace();
}
}
}
\ No newline at end of file +package cn.mesalab.utils;
import cn.mesalab.config.ApplicationConfig;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.ArrayWritable;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
/**
* @author yjy
* @version 1.0
* @date 2021/7/23 4:56 下午
*/
public class HbaseUtils {
private static final Logger LOG = LoggerFactory.getLogger(HbaseUtils.class);
private static HbaseUtils hbaseUtils;
private Table hbaseTable;
static {
hbaseUtils = HbaseUtils.getInstance();
}
public static HbaseUtils getInstance(){
if (hbaseUtils == null) {
hbaseUtils = new HbaseUtils();
}
return hbaseUtils;
}
public Table getHbaseTable(){
if(hbaseTable == null){
try{
Configuration config = HBaseConfiguration.create();
config.set(HConstants.ZOOKEEPER_QUORUM, ApplicationConfig.HBASE_ZOOKEEPER_QUORUM);
config.set(HConstants.ZOOKEEPER_CLIENT_PORT, ApplicationConfig.HBASE_ZOOKEEPER_CLIENT_PORT);
TableName tableName = TableName.valueOf(ApplicationConfig.HBASE_TABLE);
Connection conn = ConnectionFactory.createConnection(config);
hbaseTable = conn.getTable(tableName);
} catch (IOException e){
LOG.error("HBase 创建HBase table失败!");
e.printStackTrace();
}
}
return hbaseTable;
}
public List<Put> cachedInPut(List<Put> putList, String ip, int[] baseline, String attackType, String metricType){
Put rowPut = new Put(Bytes.toBytes(ip));
rowPut.addColumn(
Bytes.toBytes(attackType),
Bytes.toBytes(metricType),
WritableUtils.toByteArray(toWritable(baseline)));
putList.add(rowPut);
return putList;
}
public List<Put> cachedInPut(List<Put> putList, String ip, int value, String columnFamily, String columnName){
Put rowPut = new Put(Bytes.toBytes(ip));
rowPut.addColumn(
Bytes.toBytes(columnFamily),
Bytes.toBytes(columnName),
Bytes.toBytes(value));
putList.add(rowPut);
return putList;
}
private Writable toWritable(int[] arr) {
Writable[] content = new Writable[arr.length];
for (int i = 0; i < content.length; i++) {
content[i] = new IntWritable(arr[i]);
}
return new ArrayWritable(IntWritable.class, content);
}
public ArrayList<Integer> fromWritable(ArrayWritable writable) {
Writable[] writables = ((ArrayWritable) writable).get();
ArrayList<Integer> list = new ArrayList<Integer>(writables.length);
for (Writable wrt : writables) {
list.add(((IntWritable)wrt).get());
}
return list;
}
public void close(){
try {
hbaseTable.close();
} catch (IOException e) {
e.printStackTrace();
}
}
}
\ No newline at end of file |
