author     yinjiangyi <[email protected]>    2021-08-09 15:47:42 +0800
committer  yinjiangyi <[email protected]>    2021-08-09 15:47:42 +0800
commit     357fc2eabcecb2015c0ccb2bdf8f6a1caa0db95a (patch)
tree       747e2178a63360c53a8f89c920779280c35e5fec /src/main/java
parent     ef1f0fd8264be80b181da04b625c73725ea038f2 (diff)
Partition work across multiple threads by partition_num
Diffstat (limited to 'src/main/java')
-rw-r--r--  src/main/java/cn/mesalab/config/ApplicationConfig.java      |  15
-rw-r--r--  src/main/java/cn/mesalab/dao/DruidData.java                 |  22
-rw-r--r--  src/main/java/cn/mesalab/main/BaselineApplication.java      |   2
-rw-r--r--  src/main/java/cn/mesalab/service/BaselineGeneration.java    | 103
-rw-r--r--  src/main/java/cn/mesalab/service/BaselineSingleThread.java  |  53
-rw-r--r--  src/main/java/cn/mesalab/utils/HbaseUtils.java              |   2
6 files changed, 91 insertions, 106 deletions
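
The core of the change: instead of one up-front Druid load split by time range, each worker thread now queries its own contiguous slice of partition_num values. A minimal standalone sketch of that arithmetic, with illustrative values (PARTITION_NUM_MAX = 100 and THREAD_POOL_NUM = 8 are assumptions; the real values come from configuration):

    // Sketch of the partition-range arithmetic this commit introduces.
    public class PartitionRangeSketch {
        public static void main(String[] args) {
            int partitionNumMax = 100; // assumed value of druid.partition.num.max
            int threadPoolNum = 8;     // assumed value of thread.pool.num
            // same formula as batchPartitionRange in BaselineGeneration
            int batchPartitionRange = (int) Math.ceil(partitionNumMax / (double) threadPoolNum);
            for (int thread = 0; thread < threadPoolNum; thread++) {
                int start = thread * batchPartitionRange;      // inclusive lower bound
                int end = (thread + 1) * batchPartitionRange;  // exclusive upper bound
                System.out.printf("thread %d reads partition_num in [%d, %d)%n", thread, start, end);
            }
            // ceil(100 / 8.0) = 13, so the last thread filters [91, 104);
            // partition numbers past 99 match no rows, which is harmless.
        }
    }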
diff --git a/src/main/java/cn/mesalab/config/ApplicationConfig.java b/src/main/java/cn/mesalab/config/ApplicationConfig.java
index 890053b..223fc86 100644
--- a/src/main/java/cn/mesalab/config/ApplicationConfig.java
+++ b/src/main/java/cn/mesalab/config/ApplicationConfig.java
@@ -27,9 +27,10 @@ public class ApplicationConfig {
public static final String DRUID_ATTACKTYPE_UDP_FLOOD = ConfigUtils.getStringProperty("druid.attacktype.udpflood");
public static final String DRUID_ATTACKTYPE_ICMP_FLOOD = ConfigUtils.getStringProperty("druid.attacktype.icmpflood");
public static final String DRUID_ATTACKTYPE_DNS_AMPL = ConfigUtils.getStringProperty("druid.attacktype.dnsamplification");
- public static final String DRUID_SERVERIP_COLUMN_NAME = ConfigUtils.getStringProperty("druid.serverip.columnname");
- public static final String DRUID_ATTACKTYPE_COLUMN_NAME = ConfigUtils.getStringProperty("druid.attacktype.columnname");
- public static final String DRUID_RECVTIME_COLUMN_NAME = ConfigUtils.getStringProperty("druid.recvtime.columnname");
+ public static final String DRUID_SERVERIP_COLUMN_NAME = ConfigUtils.getStringProperty("druid.columnname.serverip");
+ public static final String DRUID_ATTACKTYPE_COLUMN_NAME = ConfigUtils.getStringProperty("druid.columnname.attacktype");
+ public static final String DRUID_RECVTIME_COLUMN_NAME = ConfigUtils.getStringProperty("druid.columnname.recvtime");
+ public static final String DRUID_COLUMNNAME_PARTITION_NUM = ConfigUtils.getStringProperty("druid.columnname.partition.num");
// Periodicity correlation coefficient threshold
public static final Float BASELINE_PERIOD_CORR_THRE = ConfigUtils.getFloatProperty("baseline.period.correlative.threshold");
@@ -54,16 +55,10 @@ public class ApplicationConfig {
public static final Double BASELINE_KALMAN_P = ConfigUtils.getDoubleProperty("baseline.kalman.p");
public static final Double BASELINE_KALMAN_M = ConfigUtils.getDoubleProperty("baseline.kalman.m");
- public static final Integer BASELINE_GENERATE_BATCH_SIZE = ConfigUtils.getIntProperty("baseline.generate.batch.size");
public static final Long DRUID_READ_BATCH_TIME_GRAD_HOUR = ConfigUtils.getLongProperty("druid.read.batch.time.grad.hour");
public static final Integer THREAD_POOL_NUM = ConfigUtils.getIntProperty("thread.pool.num");
+ public static final Integer PARTITION_NUM_MAX = ConfigUtils.getIntProperty("druid.partition.num.max");
- // http config
- public static final Integer HTTP_REQUEST_TIMEOUT = ConfigUtils.getIntProperty("http.request.timeout");
- public static final Integer HTTP_RESPONSE_TIMEOUT = ConfigUtils.getIntProperty("http.response.timeout");
- public static final Integer HTTP_CONNECTION_TIMEOUT = ConfigUtils.getIntProperty("http.connection.timeout");
- public static final Integer HTTP_MAX_CONNECTION_NUM = ConfigUtils.getIntProperty("http.max.connection.num");
- public static final Integer HTTP_MAX_PER_ROUTE = ConfigUtils.getIntProperty("http.max.per.route");
}
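
The rename groups the Druid column-name keys under a single druid.columnname.* prefix and adds the partitioning knobs. A hypothetical .properties fragment matching the keys referenced above; the values are illustrative only, not taken from the source:

    # Druid column names (example values)
    druid.columnname.serverip=server_ip
    druid.columnname.attacktype=attack_type
    druid.columnname.recvtime=recv_time
    druid.columnname.partition.num=partition_num
    # partitioned multithreaded read (example values)
    druid.partition.num.max=100
    thread.pool.num=8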
diff --git a/src/main/java/cn/mesalab/dao/DruidData.java b/src/main/java/cn/mesalab/dao/DruidData.java
index c2f1cba..6e0d396 100644
--- a/src/main/java/cn/mesalab/dao/DruidData.java
+++ b/src/main/java/cn/mesalab/dao/DruidData.java
@@ -109,7 +109,7 @@ public class DruidData {
}
- public static String getDruidQuerySql(List<String> attackTypeList, Long originBeginTime, int currentPart, long timeGrad){
+ public static String getBatchDruidQuerySql(List<String> attackTypeList, Long originBeginTime, int currentPart, long timeGrad){
long startTime = originBeginTime + currentPart * timeGrad;
long endTime = originBeginTime + (currentPart+1) * timeGrad;
attackTypeList = attackTypeList.stream().map(attack -> "'"+attack+"'").collect(Collectors.toList());
@@ -129,6 +129,26 @@ public class DruidData {
+ " AND " + timeFilter;
}
+ public static String getBatchDruidQuerySql(List<String> attackTypeList, int currentPart, int partitionNumGrad){
+ int startPartitionNum = currentPart * partitionNumGrad;
+ int endPartitionNum = (currentPart + 1) * partitionNumGrad;
+ attackTypeList = attackTypeList.stream().map(attack -> "'"+attack+"'").collect(Collectors.toList());
+ String attackList = "(" + StringUtils.join(attackTypeList, ",") + ")";
+ String partitionFilter = ApplicationConfig.DRUID_COLUMNNAME_PARTITION_NUM
+ + " >= " + startPartitionNum
+ + " AND " + ApplicationConfig.DRUID_COLUMNNAME_PARTITION_NUM
+ + " < " + endPartitionNum;
+
+ return "SELECT "+ ApplicationConfig.DRUID_SERVERIP_COLUMN_NAME
+ + ", "+ ApplicationConfig.DRUID_ATTACKTYPE_COLUMN_NAME
+ + ", "+ ApplicationConfig.BASELINE_METRIC_TYPE
+ + ", " + ApplicationConfig.DRUID_RECVTIME_COLUMN_NAME
+ + " FROM " + ApplicationConfig.DRUID_TABLE
+ + " WHERE " + ApplicationConfig.DRUID_ATTACKTYPE_COLUMN_NAME
+ + " IN " + attackList
+ + " AND " + partitionFilter;
+ }
+
/**
 * Description: split a Map
 * @param map source data
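
For reference, here is roughly what the new getBatchDruidQuerySql overload renders. The table and column names below (serverIp, attackType, flowCount, recvTime, ddos_table) are hypothetical placeholders for the configured values; the call shown is getBatchDruidQuerySql(attackTypes, 2, 13), so the range is [2*13, 3*13):

    SELECT serverIp, attackType, flowCount, recvTime
    FROM ddos_table
    WHERE attackType IN ('tcp_syn_flood')
      AND partition_num >= 26 AND partition_num < 39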
diff --git a/src/main/java/cn/mesalab/main/BaselineApplication.java b/src/main/java/cn/mesalab/main/BaselineApplication.java
index d9c1694..3f443b7 100644
--- a/src/main/java/cn/mesalab/main/BaselineApplication.java
+++ b/src/main/java/cn/mesalab/main/BaselineApplication.java
@@ -9,6 +9,6 @@ import cn.mesalab.service.BaselineGeneration;
*/
public class BaselineApplication {
public static void main(String[] args) {
- new BaselineGeneration().perform();
+ BaselineGeneration.perform();
}
}
diff --git a/src/main/java/cn/mesalab/service/BaselineGeneration.java b/src/main/java/cn/mesalab/service/BaselineGeneration.java
index 0ba8d55..74e4ca9 100644
--- a/src/main/java/cn/mesalab/service/BaselineGeneration.java
+++ b/src/main/java/cn/mesalab/service/BaselineGeneration.java
@@ -23,7 +23,6 @@ import java.util.concurrent.*;
public class BaselineGeneration {
private static final Logger LOG = LoggerFactory.getLogger(BaselineGeneration.class);
- private static final Table hbaseTable = HbaseUtils.getInstance().getHbaseTable();
private static final List<String> ATTACK_TYPE_LIST = Arrays.asList(
ApplicationConfig.DRUID_ATTACKTYPE_TCP_SYN_FLOOD
// ApplicationConfig.DRUID_ATTACKTYPE_ICMP_FLOOD,
@@ -34,18 +33,18 @@ public class BaselineGeneration {
ApplicationConfig.BASELINE_RANGE_DAYS * 24 * (60/ApplicationConfig.HISTORICAL_GRAD);
private static final Tuple2<Long, Long> START_END_TIMES = DruidData.getTimeLimit();
- private static final Map<String, List<Map<String, Object>>> allFromDruid = new HashMap<>();
+ private static final int threadPoolNum = ApplicationConfig.THREAD_POOL_NUM;
+ // 每个线程读取数据所覆盖的partition_num个数
+ private static final int batchPartitionRange = (int) Math.ceil(ApplicationConfig.PARTITION_NUM_MAX /(double)threadPoolNum);
/**
 * Run the application
*/
- public void perform() {
+ public static void perform() {
long start = System.currentTimeMillis();
try{
- loadFromDruid();
- baselineGenration();
- hbaseTable.close();
+ baselineGeneration();
} catch (Exception e){
e.printStackTrace();
} finally {
@@ -55,92 +54,32 @@ public class BaselineGeneration {
System.exit(0);
}
- /**
- * Read data from Druid
- * @throws InterruptedException
- */
- private void loadFromDruid() throws InterruptedException {
- LOG.info("开始读取数据");
- long start = System.currentTimeMillis();
-
- long timeGrad = 3600000 * ApplicationConfig.DRUID_READ_BATCH_TIME_GRAD_HOUR;
- int loadDataThreadPoolNum = (int) ((START_END_TIMES._2-START_END_TIMES._1)/timeGrad);
- ArrayList<Future<Map<String, List<Map<String, Object>>>>> resultList = new ArrayList<>();
- CountDownLatch loadDataCountDownLatch = new CountDownLatch(loadDataThreadPoolNum);
-
- ThreadFactory loadDataThreadFactory = new ThreadFactoryBuilder()
- .setNameFormat("baseline-load-data-%d").build();
- ThreadPoolExecutor loadDataExecutor = new ThreadPoolExecutor(
- loadDataThreadPoolNum, loadDataThreadPoolNum, 0L,
- TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>(1024), loadDataThreadFactory,
- new ThreadPoolExecutor.AbortPolicy());
-
- // Partition by IP count
- for (int i = 0; i < loadDataThreadPoolNum; i++) {
- String sql = DruidData.getDruidQuerySql(ATTACK_TYPE_LIST, START_END_TIMES._1, i, timeGrad);
- ReadHistoricalDruidData readHistoricalDruidData = new ReadHistoricalDruidData(
- sql,
- loadDataCountDownLatch
- );
- Future<Map<String, List<Map<String, Object>>>> future = loadDataExecutor.submit(readHistoricalDruidData);
- resultList.add(future);
- }
- loadDataExecutor.shutdown();
- loadDataCountDownLatch.await();
-
- // Merge the returned results
- for(Future<Map<String, List<Map<String, Object>>>> future: resultList){
- try {
- Map<String, List<Map<String, Object>>> queryBatchIpData = future.get();
- if(queryBatchIpData !=null){
- queryBatchIpData.forEach((ip, data)->
- allFromDruid.merge(ip, data, ListUtils::union));
- }else{
- LOG.error("future.get()未获取到结果");
- }
- } catch (ExecutionException e) {
- e.printStackTrace();
- }
- }
- LOG.info("本次共查询到服务端ip个数:" +allFromDruid.size());
- LOG.info("查询范围: " + START_END_TIMES._1 + " - " + START_END_TIMES._2);
-
- long last = System.currentTimeMillis();
- LOG.info("Druid 加载数据共耗时:"+(last-start));
- }
-
- /**
- * Baseline generation and write
- * @throws InterruptedException
- */
- private static void baselineGenration() throws InterruptedException {
- List<Map<String, List<Map<String, Object>>>> batchDruidDataLists = DruidData.splitMap(allFromDruid, ApplicationConfig.BASELINE_GENERATE_BATCH_SIZE);
- int generationThreadPoolNum = batchDruidDataLists.size();
- CountDownLatch generateCountDownLatch = new CountDownLatch(generationThreadPoolNum);
+ private static void baselineGeneration() throws InterruptedException {
+ CountDownLatch generateCountDownLatch = new CountDownLatch(threadPoolNum);
ThreadFactory generationThreadFactory = new ThreadFactoryBuilder()
.setNameFormat("baseline-generate-%d").build();
ThreadPoolExecutor generationExecutor = new ThreadPoolExecutor(
- generationThreadPoolNum, generationThreadPoolNum, 0L,
+ threadPoolNum, threadPoolNum, 0L,
TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>(1024), generationThreadFactory,
new ThreadPoolExecutor.AbortPolicy());
- LOG.info("Baseline batch 大小: " + ApplicationConfig.BASELINE_GENERATE_BATCH_SIZE);
-
- for (Map<String, List<Map<String, Object>>>batchDruidData: batchDruidDataLists){
- if(batchDruidData.size()>0){
- BaselineSingleThread baselineSingleThread = new BaselineSingleThread(
- hbaseTable,
- ATTACK_TYPE_LIST,
- BASELINE_POINT_NUM,
- batchDruidData,
- generateCountDownLatch
- );
- generationExecutor.execute(baselineSingleThread);
- }
+ for(int threadCount = 0; threadCount<threadPoolNum; threadCount++){
+ BaselineSingleThread baselineSingleThread = new BaselineSingleThread(
+ ATTACK_TYPE_LIST,
+ BASELINE_POINT_NUM,
+ batchPartitionRange,
+ threadCount,
+ generateCountDownLatch
+ );
+ generationExecutor.execute(baselineSingleThread);
}
generationExecutor.shutdown();
generateCountDownLatch.await();
}
+ public static void main(String[] args) {
+ perform();
+ }
+
}
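
baselineGeneration() above follows the standard fixed-pool-plus-CountDownLatch shape: submit one task per batch, shut the pool down (already-submitted tasks keep running), then await the latch. A minimal standalone sketch of that coordination; the pool size and the task body are placeholders:

    import java.util.concurrent.CountDownLatch;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;

    public class LatchPatternSketch {
        public static void main(String[] args) throws InterruptedException {
            int threadPoolNum = 4; // illustrative
            CountDownLatch latch = new CountDownLatch(threadPoolNum);
            ExecutorService pool = Executors.newFixedThreadPool(threadPoolNum);
            for (int batch = 0; batch < threadPoolNum; batch++) {
                final int current = batch;
                pool.execute(() -> {
                    try {
                        // a real worker would read its partition_num slice here
                        System.out.println("batch " + current + " done");
                    } finally {
                        latch.countDown(); // count down even if the work throws
                    }
                });
            }
            pool.shutdown(); // stop accepting tasks; running ones continue
            latch.await();   // block until every worker has counted down
        }
    }

Counting down in finally, as BaselineSingleThread.run() does, is what keeps await() from hanging forever when a worker fails.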
diff --git a/src/main/java/cn/mesalab/service/BaselineSingleThread.java b/src/main/java/cn/mesalab/service/BaselineSingleThread.java
index eae0c2f..6cdda3b 100644
--- a/src/main/java/cn/mesalab/service/BaselineSingleThread.java
+++ b/src/main/java/cn/mesalab/service/BaselineSingleThread.java
@@ -1,9 +1,14 @@
package cn.mesalab.service;
import cn.mesalab.config.ApplicationConfig;
+import cn.mesalab.dao.DruidData;
import cn.mesalab.service.algorithm.KalmanFilter;
+import cn.mesalab.utils.DruidUtils;
import cn.mesalab.utils.HbaseUtils;
import cn.mesalab.utils.SeriesUtils;
+import org.apache.calcite.avatica.AvaticaClientRuntimeException;
+import org.apache.calcite.avatica.AvaticaConnection;
+import org.apache.calcite.avatica.AvaticaStatement;
import org.apache.commons.math3.stat.StatUtils;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Table;
@@ -11,10 +16,7 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.List;
-import java.util.Map;
+import java.util.*;
import java.util.concurrent.CountDownLatch;
import java.util.stream.Collectors;
@@ -26,28 +28,41 @@ import java.util.stream.Collectors;
public class BaselineSingleThread extends Thread {
private static final Logger LOG = LoggerFactory.getLogger(BaselineSingleThread.class);
+ private final HbaseUtils hbaseUtils;
private final Table hbaseTable;
private final List<String> attackTypeList;
private final Integer baselinePointNum;
- private final Map<String,List<Map<String, Object>>> batchDruidData;
+ private final int batchPartitionRange;
+ private final int currentBatch;
private final CountDownLatch countDownLatch;
public BaselineSingleThread(
- Table hbaseTable,
List<String> attackTypeList,
- Integer baselinePointNum,
- Map<String,List<Map<String, Object>>> batchDruidData,
+ int baselinePointNum,
+ int batchPartitionRange,
+ int currentBatch,
CountDownLatch countDownLatch
){
- this.hbaseTable = hbaseTable;
+ hbaseUtils = HbaseUtils.getInstance();
+
+ this.hbaseTable = hbaseUtils.getHbaseTable();
this.attackTypeList = attackTypeList;
this.baselinePointNum = baselinePointNum;
- this.batchDruidData = batchDruidData;
+ this.batchPartitionRange = batchPartitionRange;
+ this.currentBatch = currentBatch;
this.countDownLatch = countDownLatch;
}
@Override
public void run(){
+ long start = System.currentTimeMillis();
+ // Read data
+ LOG.info("Starting data read");
+ Map<String, List<Map<String, Object>>> batchDruidData = getBatchDruidData();
+ LOG.info("完成数据读取:获取Server IP:" + batchDruidData.size() +
+ " 运行时间:" + (System.currentTimeMillis()- start));
+
+ // Generate baselines
List<Put> putList = new ArrayList<>();
for(String attackType: attackTypeList){
for(String ip: batchDruidData.keySet()){
@@ -57,7 +72,7 @@ public class BaselineSingleThread extends Thread {
// generate baseline
int[] ipBaseline = generateSingleIpBaseline(ip, ipDruidData);
if (ipBaseline!= null){
- HbaseUtils.cachedInPut(putList, ip, ipBaseline, attackType, ApplicationConfig.BASELINE_METRIC_TYPE);
+ hbaseUtils.cachedInPut(putList, ip, ipBaseline, attackType, ApplicationConfig.BASELINE_METRIC_TYPE);
}
}
}
@@ -66,11 +81,27 @@ public class BaselineSingleThread extends Thread {
} catch (IOException e) {
e.printStackTrace();
} finally {
+ hbaseUtils.close();
countDownLatch.countDown();
LOG.info("成功写入Baseline条数共计 " + putList.size() + " 剩余线程数量:" + countDownLatch.getCount());
}
}
+ private Map<String, List<Map<String, Object>>> getBatchDruidData() {
+ Map<String, List<Map<String, Object>>> readFromDruid = new HashMap<>();
+ try {
+ AvaticaConnection connection = DruidUtils.getConn();
+ AvaticaStatement stat = connection.createStatement();
+ String sql = DruidData.getBatchDruidQuerySql(attackTypeList, currentBatch, batchPartitionRange);
+ readFromDruid = DruidData.readFromDruid(sql, stat);
+ stat.close(); // close the statement before its parent connection
+ connection.close();
+ } catch (Exception e){
+ e.printStackTrace();
+ }
+ return readFromDruid;
+ }
+
/**
 * Baseline generation logic for a single IP
 * @return the baseline series, length 60/HISTORICAL_GRAD*24
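
A try-with-resources variant of getBatchDruidData() would close the statement before the connection even when readFromDruid throws partway through. A sketch of a drop-in replacement, assuming DruidUtils.getConn() and DruidData.readFromDruid keep the signatures used above:

    private Map<String, List<Map<String, Object>>> getBatchDruidData() {
        String sql = DruidData.getBatchDruidQuerySql(attackTypeList, currentBatch, batchPartitionRange);
        // resources close in reverse declaration order: stat, then connection
        try (AvaticaConnection connection = DruidUtils.getConn();
             AvaticaStatement stat = connection.createStatement()) {
            return DruidData.readFromDruid(sql, stat);
        } catch (Exception e) {
            e.printStackTrace();
            return new HashMap<>();
        }
    }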
diff --git a/src/main/java/cn/mesalab/utils/HbaseUtils.java b/src/main/java/cn/mesalab/utils/HbaseUtils.java
index a79bfda..16aab37 100644
--- a/src/main/java/cn/mesalab/utils/HbaseUtils.java
+++ b/src/main/java/cn/mesalab/utils/HbaseUtils.java
@@ -1 +1 @@
package cn.mesalab.utils;

import cn.mesalab.config.ApplicationConfig;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.ArrayWritable;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

/**
 * @author yjy
 * @version 1.0
 * @date 2021/7/23 4:56 PM
 */
public class HbaseUtils {
    private static final Logger LOG = LoggerFactory.getLogger(HbaseUtils.class);
    private static HbaseUtils hbaseUtils;
+    private Table hbaseTable;

    static {
        hbaseUtils = HbaseUtils.getInstance();
    }

    public static HbaseUtils getInstance(){
        if (hbaseUtils == null) {
            hbaseUtils = new HbaseUtils();
        }
        return hbaseUtils;
    }

    public Table getHbaseTable(){
-        Table hbaseTable = null;
+        if(hbaseTable == null){
        try{
            Configuration config = HBaseConfiguration.create();
            config.set(HConstants.ZOOKEEPER_QUORUM, ApplicationConfig.HBASE_ZOOKEEPER_QUORUM);
            config.set(HConstants.ZOOKEEPER_CLIENT_PORT, ApplicationConfig.HBASE_ZOOKEEPER_CLIENT_PORT);
            TableName tableName = TableName.valueOf(ApplicationConfig.HBASE_TABLE);
            Connection conn = ConnectionFactory.createConnection(config);
            hbaseTable = conn.getTable(tableName);
        } catch (IOException e){
            LOG.error("HBase: failed to create HBase table!");
            e.printStackTrace();
        }
+        }
        return hbaseTable;
    }

-    public static List<Put> cachedInPut(List<Put> putList, String ip, int[] baseline, String attackType, String metricType){
+    public List<Put> cachedInPut(List<Put> putList, String ip, int[] baseline, String attackType, String metricType){
        Put rowPut = new Put(Bytes.toBytes(ip));
        // FOR TEST
        // start
        if(attackType.equals(ApplicationConfig.DRUID_ATTACKTYPE_TCP_SYN_FLOOD)){
            attackType = "TCP SYN Flood";
        } else if(attackType.equals(ApplicationConfig.DRUID_ATTACKTYPE_UDP_FLOOD)){
            attackType = "UDP Flood";
        } else if(attackType.equals(ApplicationConfig.DRUID_ATTACKTYPE_ICMP_FLOOD)){
            attackType = "ICMP Flood";
        } else {
            attackType = "DNS Amplification";
        }
        // end
        rowPut.addColumn(
            Bytes.toBytes(attackType),
            Bytes.toBytes(metricType),
            WritableUtils.toByteArray(toWritable(baseline)));
        putList.add(rowPut);
        return putList;
    }

-    private static Writable toWritable(int[] arr) {
+    private Writable toWritable(int[] arr) {
        Writable[] content = new Writable[arr.length];
        for (int i = 0; i < content.length; i++) {
            content[i] = new IntWritable(arr[i]);
        }
        return new ArrayWritable(IntWritable.class, content);
    }

-    public static ArrayList<Integer> fromWritable(ArrayWritable writable) {
+    public ArrayList<Integer> fromWritable(ArrayWritable writable) {
        Writable[] writables = ((ArrayWritable) writable).get();
        ArrayList<Integer> list = new ArrayList<Integer>(writables.length);
        for (Writable wrt : writables) {
            list.add(((IntWritable)wrt).get());
        }
        return list;
    }
+
+    public void close(){
+        try {
+            hbaseTable.close();
+        } catch (IOException e) {
+            e.printStackTrace();
+        }
+    }
}
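
One caveat about this singleton: HbaseUtils.getInstance() hands every worker the same instance, and getHbaseTable() caches a single Table, so the close() call in one worker's finally block closes the table under workers that are still writing (and HBase Table instances are not thread-safe in any case). The usual shape is one shared Connection with a short-lived Table per thread; a minimal sketch, where the table name "baseline" is a placeholder:

    // Sketch: one heavy, thread-safe Connection per JVM; one light,
    // non-thread-safe Table per worker thread.
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hbase.HBaseConfiguration;
    import org.apache.hadoop.hbase.TableName;
    import org.apache.hadoop.hbase.client.Connection;
    import org.apache.hadoop.hbase.client.ConnectionFactory;
    import org.apache.hadoop.hbase.client.Table;

    public class PerThreadTableSketch {
        private static Connection conn; // shared, thread-safe

        public static void main(String[] args) throws Exception {
            Configuration config = HBaseConfiguration.create();
            conn = ConnectionFactory.createConnection(config);
            // each worker thread gets and closes its own Table handle
            Runnable worker = () -> {
                try (Table table = conn.getTable(TableName.valueOf("baseline"))) {
                    // table.put(...) would go here
                } catch (Exception e) {
                    e.printStackTrace();
                }
            };
            Thread t = new Thread(worker);
            t.start();
            t.join();
            conn.close(); // close the shared connection once, at the end
        }
    }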