author    yinjiangyi <[email protected]>    2021-08-17 14:44:20 +0800
committer yinjiangyi <[email protected]>    2021-08-17 14:44:20 +0800
commit    b5645b72edb81f572331361f17b5ae85c63c4352 (patch)
tree      5cad3340fc8033c621c4c4365e58683944bc258b /src/main/java
parent    e9d045b04e122310b77df8cd2522bca3147249d7 (diff)
Add field *_zero_replace_value
Diffstat (limited to 'src/main/java')
-rw-r--r--  src/main/java/cn/mesalab/config/ApplicationConfig.java       3
-rw-r--r--  src/main/java/cn/mesalab/service/BaselineSingleThread.java  36
-rw-r--r--  src/main/java/cn/mesalab/utils/HbaseUtils.java               2
3 files changed, 24 insertions, 17 deletions
diff --git a/src/main/java/cn/mesalab/config/ApplicationConfig.java b/src/main/java/cn/mesalab/config/ApplicationConfig.java
index 2423656..d540700 100644
--- a/src/main/java/cn/mesalab/config/ApplicationConfig.java
+++ b/src/main/java/cn/mesalab/config/ApplicationConfig.java
@@ -24,6 +24,7 @@ public class ApplicationConfig {
public static final String BASELINE_METRIC_TYPE = ConfigUtils.getStringProperty("baseline.metric.type");
public static final String HBASE_BASELINE_GENERATION_TYPE_SUFFIX = ConfigUtils.getStringProperty("hbase.baseline.generation.type.suffix");
+ public static final String HBASE_BASELINE_ZERO_REPLACE_VALUE_SUFFIX = ConfigUtils.getStringProperty("hbase.baseline.zero.replace.value.suffix");
public static final String DRUID_ATTACKTYPE_TCP_SYN_FLOOD = ConfigUtils.getStringProperty("druid.attacktype.tcpsynflood");
public static final String DRUID_ATTACKTYPE_UDP_FLOOD = ConfigUtils.getStringProperty("druid.attacktype.udpflood");
@@ -40,7 +41,7 @@ public class ApplicationConfig {
public static final Float BASELINE_HISTORICAL_FREQUENCY_THREAD = ConfigUtils.getFloatProperty("baseline.historical.frequency.thread");
// Percentile used to flag outliers
public static final Float BASELINE_EXECEPTION_PERCENTILE = ConfigUtils.getFloatProperty("baseline.exception.percentile");
- // Outlier replacement percentile
+ // Outlier percentile
public static final Float BASELINE_EXCECPTION_FILL_PERCENTILE = ConfigUtils.getFloatProperty("baseline.exception.fill.percentile");
public static final String BASELINE_FUNCTION = ConfigUtils.getStringProperty("baseline.function");
public static final Integer BASELINE_RANGE_DAYS = ConfigUtils.getIntProperty("baseline.range.days");
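Note on the change above: the new HBASE_BASELINE_ZERO_REPLACE_VALUE_SUFFIX constant is read through ConfigUtils.getStringProperty, but neither ConfigUtils nor the properties file is part of this diff. The sketch below is only an assumption of how such a lookup typically works; the resource name application.properties and the example value are hypothetical, and only the key hbase.baseline.zero.replace.value.suffix comes from the commit.

import java.io.IOException;
import java.io.InputStream;
import java.util.Properties;

// Hedged sketch only: ConfigUtils itself is not shown in this commit.
public final class ConfigSketch {
    private static final Properties PROPS = new Properties();

    static {
        // assumed resource name; the real ConfigUtils may load a different file
        try (InputStream in = ConfigSketch.class.getClassLoader()
                .getResourceAsStream("application.properties")) {
            if (in != null) {
                PROPS.load(in);
            }
        } catch (IOException e) {
            throw new ExceptionInInitializerError(e);
        }
    }

    public static String getStringProperty(String key) {
        return PROPS.getProperty(key);
    }

    public static void main(String[] args) {
        // e.g. hbase.baseline.zero.replace.value.suffix=zero_replace_value (example value only)
        System.out.println(getStringProperty("hbase.baseline.zero.replace.value.suffix"));
    }
}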
diff --git a/src/main/java/cn/mesalab/service/BaselineSingleThread.java b/src/main/java/cn/mesalab/service/BaselineSingleThread.java
index af4e2ec..26a0983 100644
--- a/src/main/java/cn/mesalab/service/BaselineSingleThread.java
+++ b/src/main/java/cn/mesalab/service/BaselineSingleThread.java
@@ -7,12 +7,10 @@ import cn.mesalab.utils.DruidUtils;
import cn.mesalab.utils.HbaseUtils;
import cn.mesalab.utils.RetryUtils;
import cn.mesalab.utils.SeriesUtils;
-import io.vavr.Tuple;
import io.vavr.Tuple2;
-import org.apache.calcite.avatica.AvaticaClientRuntimeException;
+import io.vavr.Tuple3;
import org.apache.calcite.avatica.AvaticaConnection;
import org.apache.calcite.avatica.AvaticaStatement;
-import org.apache.commons.math3.stat.StatUtils;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Table;
import org.slf4j.Logger;
@@ -88,14 +86,19 @@ public class BaselineSingleThread extends Thread {
List<Map<String, Object>> ipDruidData = batchDruidData.get(ip).stream()
.filter(i -> i.get(ApplicationConfig.DRUID_ATTACKTYPE_COLUMN_NAME).equals(attackType)).collect(Collectors.toList());
// Baseline generation
- Tuple2<int[], Integer> tuple = generateSingleIpBaseline(ip, ipDruidData);
+ Tuple3<int[], Integer, Integer> tuple = generateSingleIpBaseline(ip, ipDruidData);
if(tuple!=null){
int[] ipBaseline = tuple._1;
int generateType = tuple._2;
+ int zeroReplaceValue = tuple._3;
if ((ipBaseline!= null ) && (ip.length()>0)){
hbaseUtils.cachedInPut(putList, ip, ipBaseline, attackType, ApplicationConfig.BASELINE_METRIC_TYPE);
hbaseUtils.cachedInPut(putList, ip, generateType, attackType,
ApplicationConfig.BASELINE_METRIC_TYPE + "_" + ApplicationConfig.HBASE_BASELINE_GENERATION_TYPE_SUFFIX);
+ hbaseUtils.cachedInPut(putList, ip, generateType, attackType,
+ ApplicationConfig.BASELINE_METRIC_TYPE + "_" + ApplicationConfig.HBASE_BASELINE_GENERATION_TYPE_SUFFIX);
+ hbaseUtils.cachedInPut(putList, ip, zeroReplaceValue, attackType,
+ ApplicationConfig.BASELINE_METRIC_TYPE + "_" + ApplicationConfig.HBASE_BASELINE_ZERO_REPLACE_VALUE_SUFFIX);
}
}
}
@@ -136,11 +139,17 @@ public class BaselineSingleThread extends Thread {
* 2: low-frequency periodic IP
* 3: other IP types, use a percentile-threshold baseline
*/
- private Tuple2<int[], Integer> generateSingleIpBaseline(String ip, List<Map<String, Object>> ipDruidData){
+ private Tuple3<int[], Integer, Integer> generateSingleIpBaseline(String ip, List<Map<String, Object>> ipDruidData){
if (ipDruidData.size()==0){
return null;
}
+ List<Integer> originSeries = ipDruidData.stream().map(i ->
+ Integer.valueOf(i.get(ApplicationConfig.BASELINE_METRIC_TYPE).toString())).collect(Collectors.toList());
+ if(Collections.max(originSeries)==0){
+ return null;
+ }
+
int baselineGenerationType = 0;
int[] baselineArr = new int[baselinePointNum];
@@ -148,9 +157,11 @@ public class BaselineSingleThread extends Thread {
List<Map<String, Object>> completSeries = SeriesUtils.complementSeries(ipDruidData);
List<Integer>series = completSeries.stream().map(
i -> Integer.valueOf(i.get(ApplicationConfig.BASELINE_METRIC_TYPE).toString())).collect(Collectors.toList());
+ int ipPercentile = SeriesUtils.percentile(originSeries, ApplicationConfig.BASELINE_RATIONAL_PERCENTILE);
// Check the IP's occurrence frequency
- if(ipDruidData.size()/(float)completSeries.size()>ApplicationConfig.BASELINE_HISTORICAL_FREQUENCY_THREAD){
+ float ipFrequency = ipDruidData.size() / (float) completSeries.size();
+ if(ipFrequency >ApplicationConfig.BASELINE_HISTORICAL_FREQUENCY_THREAD){
// Remove outliers
baselineGenerationType = 1;
double exceptionPercentile = SeriesUtils.percentile(series, ApplicationConfig.BASELINE_EXECEPTION_PERCENTILE);
@@ -162,27 +173,22 @@ public class BaselineSingleThread extends Thread {
}
// KF
baselineArr = baselineFunction(series);
- System.out.println("type-01:" + ip + " " + Arrays.toString(baselineArr));
+ // System.out.println("type-01:" + ipPercentile + " " + ip + " " + Arrays.toString(baselineArr));
} else {
// Check for periodicity
if (SeriesUtils.isPeriod(series)){
baselineGenerationType = 2;
// KF
baselineArr = baselineFunction(series);
- System.out.println("type-02:" + ip + " " + Arrays.toString(baselineArr));
+ // System.out.println("type-02:" + ipPercentile + " " + ip + " " + Arrays.toString(baselineArr));
} else {
baselineGenerationType = 3;
- // Percentile
- int ipPercentile = SeriesUtils.percentile(
- ipDruidData.stream().map(i ->
- Integer.valueOf(i.get(ApplicationConfig.BASELINE_METRIC_TYPE).toString())).collect(Collectors.toList()),
- ApplicationConfig.BASELINE_RATIONAL_PERCENTILE);
Arrays.fill(baselineArr, ipPercentile);
-// System.out.println("type-03:" + ip + " " + Arrays.toString(baselineArr));
+ // System.out.println("type-03:" + ipPercentile + " " + ip + " " + Arrays.toString(baselineArr));
}
}
- return new Tuple2<>(baselineArr, baselineGenerationType);
+ return new Tuple3<>(baselineArr, baselineGenerationType, ipPercentile);
}
/**
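Note on the change above: generateSingleIpBaseline now returns a Tuple3, and its third element (ipPercentile, computed once over the raw originSeries) is what BaselineSingleThread writes to the new *_zero_replace_value column; for type-3 IPs the same value also fills the baseline array. SeriesUtils.percentile is not part of this diff, so the following is only a sketch under the assumption that it is a nearest-rank percentile over an integer list with the percentile given as a fraction; the 0.95f value is illustrative, not from the commit.

import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Hedged sketch: one plausible nearest-rank implementation of what the
// SeriesUtils.percentile(series, p) calls above assume.
final class PercentileSketch {
    static int percentile(List<Integer> series, float p) {
        List<Integer> sorted = new ArrayList<>(series);
        Collections.sort(sorted);
        // nearest-rank: ceil(p * n), clamped to [1, n]
        int rank = (int) Math.ceil(p * sorted.size());
        rank = Math.max(1, Math.min(rank, sorted.size()));
        return sorted.get(rank - 1);
    }

    public static void main(String[] args) {
        List<Integer> originSeries = List.of(0, 3, 5, 2, 40, 4, 6);
        // BASELINE_RATIONAL_PERCENTILE = 0.95f is an assumed example value
        System.out.println(percentile(originSeries, 0.95f)); // -> 40
    }
}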
diff --git a/src/main/java/cn/mesalab/utils/HbaseUtils.java b/src/main/java/cn/mesalab/utils/HbaseUtils.java
index 28dfcf3..03bfd5e 100644
--- a/src/main/java/cn/mesalab/utils/HbaseUtils.java
+++ b/src/main/java/cn/mesalab/utils/HbaseUtils.java
@@ -1 +1 @@
-package cn.mesalab.utils; import cn.mesalab.config.ApplicationConfig; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.HBaseConfiguration; import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.client.Connection; import org.apache.hadoop.hbase.client.ConnectionFactory; import org.apache.hadoop.hbase.client.Put; import org.apache.hadoop.hbase.client.Table; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.io.ArrayWritable; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.Writable; import org.apache.hadoop.io.WritableUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.io.IOException; import java.util.ArrayList; import java.util.List; /** * @author yjy * @version 1.0 * @date 2021/7/23 4:56 下午 */ public class HbaseUtils { private static final Logger LOG = LoggerFactory.getLogger(HbaseUtils.class); private static HbaseUtils hbaseUtils; private Table hbaseTable; static { hbaseUtils = HbaseUtils.getInstance(); } public static HbaseUtils getInstance(){ if (hbaseUtils == null) { hbaseUtils = new HbaseUtils(); } return hbaseUtils; } public Table getHbaseTable(){ if(hbaseTable == null){ try{ Configuration config = HBaseConfiguration.create(); config.set(HConstants.ZOOKEEPER_QUORUM, ApplicationConfig.HBASE_ZOOKEEPER_QUORUM); config.set(HConstants.ZOOKEEPER_CLIENT_PORT, ApplicationConfig.HBASE_ZOOKEEPER_CLIENT_PORT); TableName tableName = TableName.valueOf(ApplicationConfig.HBASE_TABLE); Connection conn = ConnectionFactory.createConnection(config); hbaseTable = conn.getTable(tableName); } catch (IOException e){ LOG.error("HBase 创建HBase table失败!"); e.printStackTrace(); } } return hbaseTable; } public List<Put> cachedInPut(List<Put> putList, String ip, int[] baseline, String attackType, String metricType){ Put rowPut = new Put(Bytes.toBytes(ip)); rowPut.addColumn( Bytes.toBytes(attackType), Bytes.toBytes(metricType), WritableUtils.toByteArray(toWritable(baseline))); putList.add(rowPut); return putList; } public List<Put> cachedInPut(List<Put> putList, String ip, int generateType, String attackType, String columnName){ Put rowPut = new Put(Bytes.toBytes(ip)); rowPut.addColumn( Bytes.toBytes(attackType), Bytes.toBytes(columnName), Bytes.toBytes(generateType)); putList.add(rowPut); return putList; } private Writable toWritable(int[] arr) { Writable[] content = new Writable[arr.length]; for (int i = 0; i < content.length; i++) { content[i] = new IntWritable(arr[i]); } return new ArrayWritable(IntWritable.class, content); } public ArrayList<Integer> fromWritable(ArrayWritable writable) { Writable[] writables = ((ArrayWritable) writable).get(); ArrayList<Integer> list = new ArrayList<Integer>(writables.length); for (Writable wrt : writables) { list.add(((IntWritable)wrt).get()); } return list; } public void close(){ try { hbaseTable.close(); } catch (IOException e) { e.printStackTrace(); } } } \ No newline at end of file
+package cn.mesalab.utils; import cn.mesalab.config.ApplicationConfig; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.HBaseConfiguration; import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.client.Connection; import org.apache.hadoop.hbase.client.ConnectionFactory; import org.apache.hadoop.hbase.client.Put; import org.apache.hadoop.hbase.client.Table; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.io.ArrayWritable; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.Writable; import org.apache.hadoop.io.WritableUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.io.IOException; import java.util.ArrayList; import java.util.List; /** * @author yjy * @version 1.0 * @date 2021/7/23 4:56 下午 */ public class HbaseUtils { private static final Logger LOG = LoggerFactory.getLogger(HbaseUtils.class); private static HbaseUtils hbaseUtils; private Table hbaseTable; static { hbaseUtils = HbaseUtils.getInstance(); } public static HbaseUtils getInstance(){ if (hbaseUtils == null) { hbaseUtils = new HbaseUtils(); } return hbaseUtils; } public Table getHbaseTable(){ if(hbaseTable == null){ try{ Configuration config = HBaseConfiguration.create(); config.set(HConstants.ZOOKEEPER_QUORUM, ApplicationConfig.HBASE_ZOOKEEPER_QUORUM); config.set(HConstants.ZOOKEEPER_CLIENT_PORT, ApplicationConfig.HBASE_ZOOKEEPER_CLIENT_PORT); TableName tableName = TableName.valueOf(ApplicationConfig.HBASE_TABLE); Connection conn = ConnectionFactory.createConnection(config); hbaseTable = conn.getTable(tableName); } catch (IOException e){ LOG.error("HBase 创建HBase table失败!"); e.printStackTrace(); } } return hbaseTable; } public List<Put> cachedInPut(List<Put> putList, String ip, int[] baseline, String attackType, String metricType){ Put rowPut = new Put(Bytes.toBytes(ip)); rowPut.addColumn( Bytes.toBytes(attackType), Bytes.toBytes(metricType), WritableUtils.toByteArray(toWritable(baseline))); putList.add(rowPut); return putList; } public List<Put> cachedInPut(List<Put> putList, String ip, int value, String columnFamily, String columnName){ Put rowPut = new Put(Bytes.toBytes(ip)); rowPut.addColumn( Bytes.toBytes(columnFamily), Bytes.toBytes(columnName), Bytes.toBytes(value)); putList.add(rowPut); return putList; } private Writable toWritable(int[] arr) { Writable[] content = new Writable[arr.length]; for (int i = 0; i < content.length; i++) { content[i] = new IntWritable(arr[i]); } return new ArrayWritable(IntWritable.class, content); } public ArrayList<Integer> fromWritable(ArrayWritable writable) { Writable[] writables = ((ArrayWritable) writable).get(); ArrayList<Integer> list = new ArrayList<Integer>(writables.length); for (Writable wrt : writables) { list.add(((IntWritable)wrt).get()); } return list; } public void close(){ try { hbaseTable.close(); } catch (IOException e) { e.printStackTrace(); } } } \ No newline at end of file
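Note on the HbaseUtils change: the second cachedInPut overload is now a generic (value, columnFamily, columnName) writer, storing the int with Bytes.toBytes(int), while the baseline array is still serialized via WritableUtils.toByteArray. The reader below is not part of the commit; it is a hedged sketch of how a consumer could decode both kinds of columns, with parameter values mirroring the column names built in BaselineSingleThread.

import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.ArrayWritable;
import org.apache.hadoop.io.IntWritable;
import java.io.ByteArrayInputStream;
import java.io.DataInputStream;
import java.io.IOException;
import java.util.List;

// Hedged reader sketch (not in the commit); helper name and arguments are illustrative.
final class BaselineReaderSketch {
    static void readRow(Table table, String ip, String attackType, String metricType,
                        String zeroReplaceSuffix) throws IOException {
        Result r = table.get(new Get(Bytes.toBytes(ip)));

        // int columns were written with Bytes.toBytes(int), so decode with Bytes.toInt
        byte[] zero = r.getValue(Bytes.toBytes(attackType),
                Bytes.toBytes(metricType + "_" + zeroReplaceSuffix));
        int zeroReplaceValue = Bytes.toInt(zero);

        // the baseline array was written as WritableUtils.toByteArray(ArrayWritable)
        byte[] raw = r.getValue(Bytes.toBytes(attackType), Bytes.toBytes(metricType));
        ArrayWritable writable = new ArrayWritable(IntWritable.class);
        writable.readFields(new DataInputStream(new ByteArrayInputStream(raw)));
        List<Integer> baseline = HbaseUtils.getInstance().fromWritable(writable);

        System.out.println(ip + " zeroReplace=" + zeroReplaceValue + " baseline=" + baseline);
    }
}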