From 357fc2eabcecb2015c0ccb2bdf8f6a1caa0db95a Mon Sep 17 00:00:00 2001
From: yinjiangyi
Date: Mon, 9 Aug 2021 15:47:42 +0800
Subject: Multi-threaded partitioning based on partition_num
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

---
 .../java/cn/mesalab/config/ApplicationConfig.java |  15 +--
 src/main/java/cn/mesalab/dao/DruidData.java       |  22 ++++-
 .../java/cn/mesalab/main/BaselineApplication.java |   2 +-
 .../cn/mesalab/service/BaselineGeneration.java    | 103 +++++----------------
 .../cn/mesalab/service/BaselineSingleThread.java  |  53 ++++++++---
 src/main/java/cn/mesalab/utils/HbaseUtils.java    |   2 +-
 6 files changed, 91 insertions(+), 106 deletions(-)

diff --git a/src/main/java/cn/mesalab/config/ApplicationConfig.java b/src/main/java/cn/mesalab/config/ApplicationConfig.java
index 890053b..223fc86 100644
--- a/src/main/java/cn/mesalab/config/ApplicationConfig.java
+++ b/src/main/java/cn/mesalab/config/ApplicationConfig.java
@@ -27,9 +27,10 @@ public class ApplicationConfig {
     public static final String DRUID_ATTACKTYPE_UDP_FLOOD = ConfigUtils.getStringProperty("druid.attacktype.udpflood");
     public static final String DRUID_ATTACKTYPE_ICMP_FLOOD = ConfigUtils.getStringProperty("druid.attacktype.icmpflood");
     public static final String DRUID_ATTACKTYPE_DNS_AMPL = ConfigUtils.getStringProperty("druid.attacktype.dnsamplification");
-    public static final String DRUID_SERVERIP_COLUMN_NAME = ConfigUtils.getStringProperty("druid.serverip.columnname");
-    public static final String DRUID_ATTACKTYPE_COLUMN_NAME = ConfigUtils.getStringProperty("druid.attacktype.columnname");
-    public static final String DRUID_RECVTIME_COLUMN_NAME = ConfigUtils.getStringProperty("druid.recvtime.columnname");
+    public static final String DRUID_SERVERIP_COLUMN_NAME = ConfigUtils.getStringProperty("druid.columnname.serverip");
+    public static final String DRUID_ATTACKTYPE_COLUMN_NAME = ConfigUtils.getStringProperty("druid.columnname.attacktype");
+    public static final String DRUID_RECVTIME_COLUMN_NAME = ConfigUtils.getStringProperty("druid.columnname.recvtime");
+    public static final String DRUID_COLUMNNAME_PARTITION_NUM = ConfigUtils.getStringProperty("druid.columnname.partition.num");
 
     // Threshold on the periodicity correlation coefficient
     public static final Float BASELINE_PERIOD_CORR_THRE = ConfigUtils.getFloatProperty("baseline.period.correlative.threshold");
@@ -54,16 +55,10 @@ public class ApplicationConfig {
     public static final Double BASELINE_KALMAN_P = ConfigUtils.getDoubleProperty("baseline.kalman.p");
     public static final Double BASELINE_KALMAN_M = ConfigUtils.getDoubleProperty("baseline.kalman.m");
 
-    public static final Integer BASELINE_GENERATE_BATCH_SIZE = ConfigUtils.getIntProperty("baseline.generate.batch.size");
     public static final Long DRUID_READ_BATCH_TIME_GRAD_HOUR = ConfigUtils.getLongProperty("druid.read.batch.time.grad.hour");
     public static final Integer THREAD_POOL_NUM = ConfigUtils.getIntProperty("thread.pool.num");
+    public static final Integer PARTITION_NUM_MAX = ConfigUtils.getIntProperty("druid.partition.num.max");
 
-    // http config
-    public static final Integer HTTP_REQUEST_TIMEOUT = ConfigUtils.getIntProperty("http.request.timeout");
-    public static final Integer HTTP_RESPONSE_TIMEOUT = ConfigUtils.getIntProperty("http.response.timeout");
-    public static final Integer HTTP_CONNECTION_TIMEOUT = ConfigUtils.getIntProperty("http.connection.timeout");
-    public static final Integer HTTP_MAX_CONNECTION_NUM = ConfigUtils.getIntProperty("http.max.connection.num");
-    public static final Integer HTTP_MAX_PER_ROUTE = ConfigUtils.getIntProperty("http.max.per.route");
 }
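The renamed keys regroup all Druid column names under one druid.columnname.* prefix and add the two partition settings used below. The matching properties entries would look roughly like this; every value here is an illustrative placeholder, since the actual config files sit outside this patch:

    # Hypothetical entries for the renamed and added keys (placeholder values)
    druid.columnname.serverip=server_ip
    druid.columnname.attacktype=attack_type
    druid.columnname.recvtime=recv_time
    druid.columnname.partition.num=partition_num
    # Exclusive upper bound on partition_num values; drives the per-thread split
    druid.partition.num.max=100
    thread.pool.num=8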
diff --git a/src/main/java/cn/mesalab/dao/DruidData.java b/src/main/java/cn/mesalab/dao/DruidData.java
index c2f1cba..6e0d396 100644
--- a/src/main/java/cn/mesalab/dao/DruidData.java
+++ b/src/main/java/cn/mesalab/dao/DruidData.java
@@ -109,7 +109,7 @@ public class DruidData {
 
     }
 
-    public static String getDruidQuerySql(List<String> attackTypeList, Long originBeginTime, int currentPart, long timeGrad){
+    public static String getBatchDruidQuerySql(List<String> attackTypeList, Long originBeginTime, int currentPart, long timeGrad){
         long startTime = originBeginTime + currentPart * timeGrad;
         long endTime = originBeginTime + (currentPart+1) * timeGrad;
         attackTypeList = attackTypeList.stream().map(attack -> "'"+attack+"'").collect(Collectors.toList());
@@ -129,6 +129,26 @@ public class DruidData {
             + " AND " + timeFilter;
     }
 
+    public static String getBatchDruidQuerySql(List<String> attackTypeList, int currentPart, int partitionNumGrad){
+        int startPartitionNum = currentPart * partitionNumGrad;
+        int endPartitionNum = (currentPart + 1) * partitionNumGrad;
+        attackTypeList = attackTypeList.stream().map(attack -> "'"+attack+"'").collect(Collectors.toList());
+        String attackList = "(" + StringUtils.join(attackTypeList, ",") + ")";
+        String partitionFilter = ApplicationConfig.DRUID_COLUMNNAME_PARTITION_NUM
+                + " >= " + startPartitionNum
+                + " AND " + ApplicationConfig.DRUID_COLUMNNAME_PARTITION_NUM
+                + " < " + endPartitionNum;
+
+        return "SELECT "+ ApplicationConfig.DRUID_SERVERIP_COLUMN_NAME
+                + ", "+ ApplicationConfig.DRUID_ATTACKTYPE_COLUMN_NAME
+                + ", "+ ApplicationConfig.BASELINE_METRIC_TYPE
+                + ", " + ApplicationConfig.DRUID_RECVTIME_COLUMN_NAME
+                + " FROM " + ApplicationConfig.DRUID_TABLE
+                + " WHERE " + ApplicationConfig.DRUID_ATTACKTYPE_COLUMN_NAME
+                + " IN " + attackList
+                + " AND " + partitionFilter;
+    }
+
     /**
      * Description: split a Map
      * @param map the original data
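For orientation, the new overload renders a partition-bounded query instead of a time-bounded one. A minimal sketch of a call and the SQL it would build, assuming illustrative column and table names (server_ip, attack_type, flow_count, recv_time, partition_num, ddos_attack) in place of the configured ApplicationConfig values:

    // Sketch only: the names below are stand-ins for the configured values.
    String sql = DruidData.getBatchDruidQuerySql(
            Arrays.asList("tcp_syn_flood"), /* currentPart */ 2, /* partitionNumGrad */ 16);
    // sql would then be, approximately:
    //   SELECT server_ip, attack_type, flow_count, recv_time
    //   FROM ddos_attack
    //   WHERE attack_type IN ('tcp_syn_flood')
    //     AND partition_num >= 32 AND partition_num < 48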
diff --git a/src/main/java/cn/mesalab/main/BaselineApplication.java b/src/main/java/cn/mesalab/main/BaselineApplication.java
index d9c1694..3f443b7 100644
--- a/src/main/java/cn/mesalab/main/BaselineApplication.java
+++ b/src/main/java/cn/mesalab/main/BaselineApplication.java
@@ -9,6 +9,6 @@ import cn.mesalab.service.BaselineGeneration;
  */
 public class BaselineApplication {
     public static void main(String[] args) {
-        new BaselineGeneration().perform();
+        BaselineGeneration.perform();
     }
 }
diff --git a/src/main/java/cn/mesalab/service/BaselineGeneration.java b/src/main/java/cn/mesalab/service/BaselineGeneration.java
index 0ba8d55..74e4ca9 100644
--- a/src/main/java/cn/mesalab/service/BaselineGeneration.java
+++ b/src/main/java/cn/mesalab/service/BaselineGeneration.java
@@ -23,7 +23,6 @@ import java.util.concurrent.*;
 public class BaselineGeneration {
     private static final Logger LOG = LoggerFactory.getLogger(BaselineGeneration.class);
 
-    private static final Table hbaseTable = HbaseUtils.getInstance().getHbaseTable();
     private static final List<String> ATTACK_TYPE_LIST = Arrays.asList(
             ApplicationConfig.DRUID_ATTACKTYPE_TCP_SYN_FLOOD
 //            ApplicationConfig.DRUID_ATTACKTYPE_ICMP_FLOOD,
@@ -34,18 +33,18 @@ public class BaselineGeneration {
             ApplicationConfig.BASELINE_RANGE_DAYS * 24 * (60/ApplicationConfig.HISTORICAL_GRAD);
     private static final Tuple2<Long, Long> START_END_TIMES = DruidData.getTimeLimit();
 
-    private static final Map<String, List<Tuple2<Long, Integer>>> allFromDruid = new HashMap<>();
+    private static final int threadPoolNum = ApplicationConfig.THREAD_POOL_NUM;
+    // Number of partition_num values covered by each thread's read
+    private static final int batchPartitionRange = (int) Math.ceil(ApplicationConfig.PARTITION_NUM_MAX / (double) threadPoolNum);
 
     /**
      * Program execution
      */
-    public void perform() {
+    public static void perform() {
         long start = System.currentTimeMillis();
         try{
-            loadFromDruid();
-            baselineGenration();
-            hbaseTable.close();
+            baselineGeneration();
         } catch (Exception e){
             e.printStackTrace();
         } finally {
@@ -55,92 +54,32 @@ public class BaselineGeneration {
         System.exit(0);
     }
 
-    /**
-     * Read data from Druid
-     * @throws InterruptedException
-     */
-    private void loadFromDruid() throws InterruptedException {
-        LOG.info("Started loading data");
-        long start = System.currentTimeMillis();
-
-        long timeGrad = 3600000 * ApplicationConfig.DRUID_READ_BATCH_TIME_GRAD_HOUR;
-        int loadDataThreadPoolNum = (int) ((START_END_TIMES._2-START_END_TIMES._1)/timeGrad);
-        ArrayList<Future<Map<String, List<Tuple2<Long, Integer>>>>> resultList = new ArrayList<>();
-        CountDownLatch loadDataCountDownLatch = new CountDownLatch(loadDataThreadPoolNum);
-
-        ThreadFactory loadDataThreadFactory = new ThreadFactoryBuilder()
-                .setNameFormat("baseline-load-data-%d").build();
-        ThreadPoolExecutor loadDataExecutor = new ThreadPoolExecutor(
-                loadDataThreadPoolNum, loadDataThreadPoolNum, 0L,
-                TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>(1024), loadDataThreadFactory,
-                new ThreadPoolExecutor.AbortPolicy());
-
-        // Partition by ip count
-        for (int i = 0; i < loadDataThreadPoolNum; i++) {
-            String sql = DruidData.getDruidQuerySql(ATTACK_TYPE_LIST, START_END_TIMES._1, i, timeGrad);
-            ReadHistoricalDruidData readHistoricalDruidData = new ReadHistoricalDruidData(
-                    sql,
-                    loadDataCountDownLatch
-            );
-            Future<Map<String, List<Tuple2<Long, Integer>>>> future = loadDataExecutor.submit(readHistoricalDruidData);
-            resultList.add(future);
-        }
-        loadDataExecutor.shutdown();
-        loadDataCountDownLatch.await();
-
-        // Merge the returned results
-        for(Future<Map<String, List<Tuple2<Long, Integer>>>> future: resultList){
-            try {
-                Map<String, List<Tuple2<Long, Integer>>> queryBatchIpData = future.get();
-                if(queryBatchIpData != null){
-                    queryBatchIpData.forEach((ip, data)->
-                            allFromDruid.merge(ip, data, ListUtils::union));
-                }else{
-                    LOG.error("future.get() returned no result");
-                }
-            } catch (ExecutionException e) {
-                e.printStackTrace();
-            }
-        }
-        LOG.info("Number of server IPs returned by this query: " + allFromDruid.size());
-        LOG.info("Query range: " + START_END_TIMES._1 + " - " + START_END_TIMES._2);
-
-        long last = System.currentTimeMillis();
-        LOG.info("Total time to load data from Druid: " + (last-start));
-    }
-
-    /**
-     * Generate and write baselines
-     * @throws InterruptedException
-     */
-    private static void baselineGenration() throws InterruptedException {
-        List<Map<String, List<Tuple2<Long, Integer>>>> batchDruidDataLists = DruidData.splitMap(allFromDruid, ApplicationConfig.BASELINE_GENERATE_BATCH_SIZE);
-        int generationThreadPoolNum = batchDruidDataLists.size();
-        CountDownLatch generateCountDownLatch = new CountDownLatch(generationThreadPoolNum);
+    private static void baselineGeneration() throws InterruptedException {
+        CountDownLatch generateCountDownLatch = new CountDownLatch(threadPoolNum);
 
         ThreadFactory generationThreadFactory = new ThreadFactoryBuilder()
                 .setNameFormat("baseline-generate-%d").build();
         ThreadPoolExecutor generationExecutor = new ThreadPoolExecutor(
-                generationThreadPoolNum, generationThreadPoolNum, 0L,
+                threadPoolNum, threadPoolNum, 0L,
                 TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>(1024), generationThreadFactory,
                 new ThreadPoolExecutor.AbortPolicy());
 
-        LOG.info("Baseline batch size: " + ApplicationConfig.BASELINE_GENERATE_BATCH_SIZE);
-
-        for (Map<String, List<Tuple2<Long, Integer>>> batchDruidData: batchDruidDataLists){
-            if(batchDruidData.size()>0){
-                BaselineSingleThread baselineSingleThread = new BaselineSingleThread(
-                        hbaseTable,
-                        ATTACK_TYPE_LIST,
-                        BASELINE_POINT_NUM,
-                        batchDruidData,
-                        generateCountDownLatch
-                );
-                generationExecutor.execute(baselineSingleThread);
-            }
-        }
+        for(int threadCount = 0; threadCount < threadPoolNum; threadCount++){
+            BaselineSingleThread baselineSingleThread = new BaselineSingleThread(
+                    ATTACK_TYPE_LIST,
+                    BASELINE_POINT_NUM,
+                    batchPartitionRange,
+                    threadCount,
+                    generateCountDownLatch
+            );
+            generationExecutor.execute(baselineSingleThread);
+        }
         generationExecutor.shutdown();
         generateCountDownLatch.await();
     }
 }
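The Math.ceil in batchPartitionRange is what keeps the per-thread ranges collectively exhaustive when PARTITION_NUM_MAX does not divide evenly by the pool size. A quick self-contained check, with assumed values standing in for the ApplicationConfig constants:

    public class PartitionRangeCheck {
        public static void main(String[] args) {
            // Illustrative values, not taken from the repo's config files.
            int partitionNumMax = 100;   // stands in for ApplicationConfig.PARTITION_NUM_MAX
            int threadPoolNum = 8;       // stands in for ApplicationConfig.THREAD_POOL_NUM
            int batchPartitionRange = (int) Math.ceil(partitionNumMax / (double) threadPoolNum); // 13
            for (int threadCount = 0; threadCount < threadPoolNum; threadCount++) {
                int lo = threadCount * batchPartitionRange;        // inclusive, matches ">="
                int hi = (threadCount + 1) * batchPartitionRange;  // exclusive, matches "<"
                System.out.println("thread " + threadCount + " reads partition_num in [" + lo + ", " + hi + ")");
            }
            // The last thread prints [91, 104): it overshoots partitionNumMax, but the
            // surplus partition_num values match no rows, so coverage stays complete
            // and the per-thread ranges never overlap.
        }
    }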
diff --git a/src/main/java/cn/mesalab/service/BaselineSingleThread.java b/src/main/java/cn/mesalab/service/BaselineSingleThread.java
--- a/src/main/java/cn/mesalab/service/BaselineSingleThread.java
+++ b/src/main/java/cn/mesalab/service/BaselineSingleThread.java
+    private final HbaseUtils hbaseUtils;
     private final Table hbaseTable;
     private final List<String> attackTypeList;
     private final Integer baselinePointNum;
-    private final Map<String, List<Tuple2<Long, Integer>>> batchDruidData;
+    private final int batchPartitionRange;
+    private final int currentBatch;
     private final CountDownLatch countDownLatch;
 
     public BaselineSingleThread(
-            Table hbaseTable,
             List<String> attackTypeList,
-            Integer baselinePointNum,
-            Map<String, List<Tuple2<Long, Integer>>> batchDruidData,
+            int baselinePointNum,
+            int batchPartitionRange,
+            int currentBatch,
             CountDownLatch countDownLatch
     ){
-        this.hbaseTable = hbaseTable;
+        hbaseUtils = HbaseUtils.getInstance();
+
+        this.hbaseTable = hbaseUtils.getHbaseTable();
         this.attackTypeList = attackTypeList;
         this.baselinePointNum = baselinePointNum;
-        this.batchDruidData = batchDruidData;
+        this.batchPartitionRange = batchPartitionRange;
+        this.currentBatch = currentBatch;
         this.countDownLatch = countDownLatch;
     }
 
     @Override
     public void run(){
+        long start = System.currentTimeMillis();
+        // Read data
+        LOG.info("Started reading data");
+        Map<String, List<Tuple2<Long, Integer>>> batchDruidData = getBatchDruidData();
+        LOG.info("Finished reading data; server IPs fetched: " + batchDruidData.size()
+                + ", elapsed time: " + (System.currentTimeMillis()- start));
+
+        // Generate baselines
         List<Put> putList = new ArrayList<>();
         for(String attackType: attackTypeList){
             for(String ip: batchDruidData.keySet()){
@@ -57,7 +72,7 @@ public class BaselineSingleThread extends Thread {
                 // baseline generation
                 int[] ipBaseline = generateSingleIpBaseline(ip, ipDruidData);
                 if (ipBaseline != null){
-                    HbaseUtils.cachedInPut(putList, ip, ipBaseline, attackType, ApplicationConfig.BASELINE_METRIC_TYPE);
+                    hbaseUtils.cachedInPut(putList, ip, ipBaseline, attackType, ApplicationConfig.BASELINE_METRIC_TYPE);
                 }
             }
         }
@@ -66,11 +81,27 @@ public class BaselineSingleThread extends Thread {
         } catch (IOException e) {
             e.printStackTrace();
         } finally {
+            hbaseUtils.close();
             countDownLatch.countDown();
             LOG.info("Baseline rows written: " + putList.size()
                     + ", threads remaining: " + countDownLatch.getCount());
         }
     }
 
+    private Map<String, List<Tuple2<Long, Integer>>> getBatchDruidData() {
+        Map<String, List<Tuple2<Long, Integer>>> readFromDruid = new HashMap<>();
+        try {
+            AvaticaConnection connection = DruidUtils.getConn();
+            AvaticaStatement stat = connection.createStatement();
+            String sql = DruidData.getBatchDruidQuerySql(attackTypeList, currentBatch, batchPartitionRange);
+            readFromDruid = DruidData.readFromDruid(sql, stat);
+            stat.close();
+            connection.close();
+        } catch (Exception e){
+            e.printStackTrace();
+        }
+        return readFromDruid;
+    }
+
     /**
      * Baseline generation logic for a single IP
      * @return the baseline sequence, of length 60/HISTORICAL_GRAD*24
      */
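One detail worth keeping in mind in getBatchDruidData: the statement should be closed before its connection. A try-with-resources variant handles that ordering automatically (resources close in reverse declaration order) and also covers the exception path. This is a sketch against the same project helpers (DruidUtils.getConn, DruidData.readFromDruid) and the reconstructed generic types, not code from the commit:

    private Map<String, List<Tuple2<Long, Integer>>> getBatchDruidData() {
        // Resources declared first are closed last, so stat closes before connection.
        try (AvaticaConnection connection = DruidUtils.getConn();
             AvaticaStatement stat = connection.createStatement()) {
            String sql = DruidData.getBatchDruidQuerySql(attackTypeList, currentBatch, batchPartitionRange);
            return DruidData.readFromDruid(sql, stat);
        } catch (Exception e) {
            e.printStackTrace();
            return new HashMap<>();
        }
    }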
diff --git a/src/main/java/cn/mesalab/utils/HbaseUtils.java b/src/main/java/cn/mesalab/utils/HbaseUtils.java
index a79bfda..16aab37 100644
--- a/src/main/java/cn/mesalab/utils/HbaseUtils.java
+++ b/src/main/java/cn/mesalab/utils/HbaseUtils.java
@@ -1 +1 @@
-package cn.mesalab.utils; import cn.mesalab.config.ApplicationConfig; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.HBaseConfiguration; import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.client.Connection; import org.apache.hadoop.hbase.client.ConnectionFactory; import org.apache.hadoop.hbase.client.Put; import org.apache.hadoop.hbase.client.Table; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.io.ArrayWritable; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.Writable; import org.apache.hadoop.io.WritableUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.io.IOException; import java.util.ArrayList; import java.util.List; /** * @author yjy * @version 1.0 * @date 2021/7/23 4:56 PM */ public class HbaseUtils { private static final Logger LOG = LoggerFactory.getLogger(HbaseUtils.class); private static HbaseUtils hbaseUtils; static { hbaseUtils = HbaseUtils.getInstance(); } public static HbaseUtils getInstance(){ if (hbaseUtils == null) { hbaseUtils = new HbaseUtils(); } return hbaseUtils; } public Table getHbaseTable(){ Table hbaseTable = null; try{ Configuration config = HBaseConfiguration.create(); config.set(HConstants.ZOOKEEPER_QUORUM, ApplicationConfig.HBASE_ZOOKEEPER_QUORUM); config.set(HConstants.ZOOKEEPER_CLIENT_PORT, ApplicationConfig.HBASE_ZOOKEEPER_CLIENT_PORT); TableName tableName = TableName.valueOf(ApplicationConfig.HBASE_TABLE); Connection conn = ConnectionFactory.createConnection(config); hbaseTable = conn.getTable(tableName); } catch (IOException e){ LOG.error("Failed to create the HBase table!"); e.printStackTrace(); } return hbaseTable; } public static List<Put> cachedInPut(List<Put> putList, String ip, int[] baseline, String attackType, String metricType){ Put rowPut = new Put(Bytes.toBytes(ip)); // FOR TEST // start if(attackType.equals(ApplicationConfig.DRUID_ATTACKTYPE_TCP_SYN_FLOOD)){ attackType = "TCP SYN Flood"; } else if(attackType.equals(ApplicationConfig.DRUID_ATTACKTYPE_UDP_FLOOD)){ attackType = "UDP Flood"; } else if(attackType.equals(ApplicationConfig.DRUID_ATTACKTYPE_ICMP_FLOOD)){ attackType = "ICMP Flood"; } else { attackType = "DNS Amplification"; } // end rowPut.addColumn( Bytes.toBytes(attackType), Bytes.toBytes(metricType), WritableUtils.toByteArray(toWritable(baseline))); putList.add(rowPut); return putList; } private static Writable toWritable(int[] arr) { Writable[] content = new Writable[arr.length]; for (int i = 0; i < content.length; i++) { content[i] = new IntWritable(arr[i]); } return new ArrayWritable(IntWritable.class, content); } public static ArrayList<Integer> fromWritable(ArrayWritable writable) { Writable[] writables = ((ArrayWritable) writable).get(); ArrayList<Integer> list = new ArrayList<>(writables.length); for (Writable wrt : writables) { list.add(((IntWritable)wrt).get()); } return list; } }
\ No newline at end of file
+package cn.mesalab.utils; import cn.mesalab.config.ApplicationConfig; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.HBaseConfiguration; import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.client.Connection; import org.apache.hadoop.hbase.client.ConnectionFactory; import org.apache.hadoop.hbase.client.Put; import org.apache.hadoop.hbase.client.Table; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.io.ArrayWritable; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.Writable; import org.apache.hadoop.io.WritableUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.io.IOException; import java.util.ArrayList; import java.util.List; /** * @author yjy * @version 1.0 * @date 2021/7/23 4:56 PM */ public class HbaseUtils { private static final Logger LOG = LoggerFactory.getLogger(HbaseUtils.class); private static HbaseUtils hbaseUtils; private Table hbaseTable; static { hbaseUtils = HbaseUtils.getInstance(); } public static HbaseUtils getInstance(){ if (hbaseUtils == null) { hbaseUtils = new HbaseUtils(); } return hbaseUtils; } public Table getHbaseTable(){ if(hbaseTable == null){ try{ Configuration config = HBaseConfiguration.create(); config.set(HConstants.ZOOKEEPER_QUORUM, ApplicationConfig.HBASE_ZOOKEEPER_QUORUM); config.set(HConstants.ZOOKEEPER_CLIENT_PORT, ApplicationConfig.HBASE_ZOOKEEPER_CLIENT_PORT); TableName tableName = TableName.valueOf(ApplicationConfig.HBASE_TABLE); Connection conn = ConnectionFactory.createConnection(config); hbaseTable = conn.getTable(tableName); } catch (IOException e){ LOG.error("Failed to create the HBase table!"); e.printStackTrace(); } } return hbaseTable; } public List<Put> cachedInPut(List<Put> putList, String ip, int[] baseline, String attackType, String metricType){ Put rowPut = new Put(Bytes.toBytes(ip)); // FOR TEST // start if(attackType.equals(ApplicationConfig.DRUID_ATTACKTYPE_TCP_SYN_FLOOD)){ attackType = "TCP SYN Flood"; } else if(attackType.equals(ApplicationConfig.DRUID_ATTACKTYPE_UDP_FLOOD)){ attackType = "UDP Flood"; } else if(attackType.equals(ApplicationConfig.DRUID_ATTACKTYPE_ICMP_FLOOD)){ attackType = "ICMP Flood"; } else { attackType = "DNS Amplification"; } // end rowPut.addColumn( Bytes.toBytes(attackType), Bytes.toBytes(metricType), WritableUtils.toByteArray(toWritable(baseline))); putList.add(rowPut); return putList; } private Writable toWritable(int[] arr) { Writable[] content = new Writable[arr.length]; for (int i = 0; i < content.length; i++) { content[i] = new IntWritable(arr[i]); } return new ArrayWritable(IntWritable.class, content); } public ArrayList<Integer> fromWritable(ArrayWritable writable) { Writable[] writables = ((ArrayWritable) writable).get(); ArrayList<Integer> list = new ArrayList<>(writables.length); for (Writable wrt : writables) { list.add(((IntWritable)wrt).get()); } return list; } public void close(){ try { hbaseTable.close(); } catch (IOException e) { e.printStackTrace(); } } }
\ No newline at end of file
-- 
cgit v1.2.3
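Two caveats about the patched HbaseUtils now that every BaselineSingleThread calls it concurrently: the lazy null-check in getInstance() is not synchronized, so racing threads can construct two instances, and each worker's finally block closes the one cached Table while sibling threads may still be writing through it. A thread-safe lazy singleton, shown here as a sketch rather than as part of the commit, is the initialization-on-demand holder idiom:

    public class HbaseUtils {
        private HbaseUtils() { }

        private static class Holder {
            // The class loader initializes this exactly once, on first access,
            // with the JVM providing the required synchronization.
            static final HbaseUtils INSTANCE = new HbaseUtils();
        }

        public static HbaseUtils getInstance() {
            return Holder.INSTANCE;
        }
    }

With that in place, closing the shared Table would belong in a single spot, after generateCountDownLatch.await() returns in BaselineGeneration, rather than in each worker's finally block.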