summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authoryinjiangyi <[email protected]>2021-08-04 18:51:42 +0800
committeryinjiangyi <[email protected]>2021-08-04 18:51:42 +0800
commit1fa03fdb8f0bf5cca2932ffc5e7e71b3d69eea4c (patch)
tree2b509d3fec6c8e24c1ee986153d17a92bce82eb1 /src
parentd60d5f5e43fda0cb0c0352f155f882b113ab6b85 (diff)
修复并行数据读取不生效
Diffstat (limited to 'src')
-rw-r--r--src/main/java/cn/mesalab/config/ApplicationConfig.java14
-rw-r--r--src/main/java/cn/mesalab/dao/ReadHistoricalDruidData.java10
-rw-r--r--src/main/java/cn/mesalab/service/BaselineGeneration.java44
-rw-r--r--src/main/java/cn/mesalab/service/BaselineSingleThread.java2
-rw-r--r--src/main/resources/application.properties12
5 files changed, 43 insertions, 39 deletions
diff --git a/src/main/java/cn/mesalab/config/ApplicationConfig.java b/src/main/java/cn/mesalab/config/ApplicationConfig.java
index 97b8322..dab7c8b 100644
--- a/src/main/java/cn/mesalab/config/ApplicationConfig.java
+++ b/src/main/java/cn/mesalab/config/ApplicationConfig.java
@@ -31,12 +31,12 @@ public class ApplicationConfig {
public static final String DRUID_ATTACKTYPE_COLUMN_NAME = ConfigUtils.getStringProperty("druid.attacktype.columnname");
public static final String DRUID_RECVTIME_COLUMN_NAME = ConfigUtils.getStringProperty("druid.recvtime.columnname");
- public static final float BASELINE_PERIOD_CORR_THRE = ConfigUtils.getFloatProperty("baseline.period.correlative.threshold");
- public static final float BASELINE_HISTORICAL_RATIO = ConfigUtils.getFloatProperty("baseline.historical.ratio.threshold");
- public static final float BASELINE_SPARSE_FILL_PERCENTILE = ConfigUtils.getFloatProperty("baseline.historical.sparse.fill.percentile");
+ public static final Float BASELINE_PERIOD_CORR_THRE = ConfigUtils.getFloatProperty("baseline.period.correlative.threshold");
+ public static final Float BASELINE_HISTORICAL_RATIO = ConfigUtils.getFloatProperty("baseline.historical.ratio.threshold");
+ public static final Float BASELINE_SPARSE_FILL_PERCENTILE = ConfigUtils.getFloatProperty("baseline.historical.sparse.fill.percentile");
public static final String BASELINE_FUNCTION = ConfigUtils.getStringProperty("baseline.function");
public static final Integer BASELINE_RANGE_DAYS = ConfigUtils.getIntProperty("baseline.range.days");
- public static final float BASELINE_RATIONAL_PERCENTILE = ConfigUtils.getFloatProperty("baseline.rational.percentile");
+ public static final Float BASELINE_RATIONAL_PERCENTILE = ConfigUtils.getFloatProperty("baseline.rational.percentile");
public static final String HBASE_TABLE = ConfigUtils.getStringProperty("hbase.table");
@@ -47,15 +47,13 @@ public class ApplicationConfig {
public static final Double BASELINE_KALMAN_P = ConfigUtils.getDoubleProperty("baseline.kalman.p");
public static final Double BASELINE_KALMAN_R = ConfigUtils.getDoubleProperty("baseline.kalman.r");
- public static final Integer LOG_WRITE_COUNT = ConfigUtils.getIntProperty("log.write.count");
- public static final Integer GENERATE_BATCH_SIZE = ConfigUtils.getIntProperty("generate.batch.size");
-
+ public static final Integer BASELINE_GENERATE_BATCH_SIZE = ConfigUtils.getIntProperty("baseline.generate.batch.size");
+ public static final Long DRUID_READ_BATCH_TIME_GRAD_HOUR = ConfigUtils.getLongProperty("druid.read.batch.time.grad.hour");
public static final Integer THREAD_MAX_NUM = ConfigUtils.getIntProperty("thread.max.num");
// http config
-
public static final Integer HTTP_REQUEST_TIMEOUT = ConfigUtils.getIntProperty("http.request.timeout");
public static final Integer HTTP_RESPONSE_TIMEOUT = ConfigUtils.getIntProperty("http.response.timeout");
public static final Integer HTTP_CONNECTION_TIMEOUT = ConfigUtils.getIntProperty("http.connection.timeout");
diff --git a/src/main/java/cn/mesalab/dao/ReadHistoricalDruidData.java b/src/main/java/cn/mesalab/dao/ReadHistoricalDruidData.java
index 5b15ece..e483953 100644
--- a/src/main/java/cn/mesalab/dao/ReadHistoricalDruidData.java
+++ b/src/main/java/cn/mesalab/dao/ReadHistoricalDruidData.java
@@ -1,5 +1,7 @@
package cn.mesalab.dao;
+import cn.mesalab.utils.DruidUtils;
+import org.apache.calcite.avatica.AvaticaConnection;
import org.apache.calcite.avatica.AvaticaStatement;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -7,14 +9,13 @@ import org.slf4j.LoggerFactory;
import java.util.ArrayList;
import java.util.Map;
import java.util.concurrent.Callable;
-import java.util.concurrent.CountDownLatch;
/**
* @author yjy
* @version 1.0
* @date 2021/8/3 8:10 下午
*/
-public class ReadHistoricalDruidData implements Callable {
+public class ReadHistoricalDruidData implements Callable<ArrayList<Map<String, Object>>> {
private static final Logger LOG = LoggerFactory.getLogger(ReadHistoricalDruidData.class);
private String sql;
@@ -33,8 +34,9 @@ public class ReadHistoricalDruidData implements Callable {
ArrayList<Map<String, Object>> resultData = new ArrayList<>();
try {
long start = System.currentTimeMillis();
-
- resultData.addAll(DruidData.readFromDruid(sql, statement));
+ AvaticaConnection connection = DruidUtils.getConn();
+ AvaticaStatement stat = connection.createStatement();
+ resultData.addAll(DruidData.readFromDruid(sql, stat));
long end = System.currentTimeMillis();
LOG.info(sql + "\n读取" + resultData.size() + "条数据,运行时间:" + (end - start));
} catch (Exception e) {
diff --git a/src/main/java/cn/mesalab/service/BaselineGeneration.java b/src/main/java/cn/mesalab/service/BaselineGeneration.java
index 5a700e8..dc5c3bc 100644
--- a/src/main/java/cn/mesalab/service/BaselineGeneration.java
+++ b/src/main/java/cn/mesalab/service/BaselineGeneration.java
@@ -27,7 +27,7 @@ import java.util.concurrent.*;
public class BaselineGeneration {
private static final Logger LOG = LoggerFactory.getLogger(BaselineGeneration.class);
- private static AvaticaConnection druidConn = DruidUtils.getConn();
+ private static final AvaticaConnection druidConn = DruidUtils.getConn();
private static AvaticaStatement druidStatement;
static {
@@ -38,9 +38,9 @@ public class BaselineGeneration {
}
}
- private static Table hbaseTable = HbaseUtils.getInstance().getHbaseTable();
+ private static final Table hbaseTable = HbaseUtils.getInstance().getHbaseTable();
- private static List<String> attackTypeList = Arrays.asList(
+ private static final List<String> ATTACK_TYPE_LIST = Arrays.asList(
ApplicationConfig.DRUID_ATTACKTYPE_TCP_SYN_FLOOD,
ApplicationConfig.DRUID_ATTACKTYPE_ICMP_FLOOD,
ApplicationConfig.DRUID_ATTACKTYPE_UDP_FLOOD,
@@ -49,13 +49,13 @@ public class BaselineGeneration {
private static final Integer BASELINE_POINT_NUM =
ApplicationConfig.BASELINE_RANGE_DAYS * 24 * (60/ApplicationConfig.HISTORICAL_GRAD);
- private static Tuple2<Long, Long> startEndTimes = DruidData.getTimeLimit();
- private static String timeFilter = ApplicationConfig.DRUID_RECVTIME_COLUMN_NAME
- + " >= MILLIS_TO_TIMESTAMP(" + startEndTimes._2
+ private static final Tuple2<Long, Long> START_END_TIMES = DruidData.getTimeLimit();
+ private static final String TIME_FILTER = ApplicationConfig.DRUID_RECVTIME_COLUMN_NAME
+ + " >= MILLIS_TO_TIMESTAMP(" + START_END_TIMES._2
+ ") AND " + ApplicationConfig.DRUID_RECVTIME_COLUMN_NAME
- + " < MILLIS_TO_TIMESTAMP(" + startEndTimes._1 + ")";
+ + " < MILLIS_TO_TIMESTAMP(" + START_END_TIMES._1 + ")";
- private static ArrayList<Map<String, Object>> allFromDruid = new ArrayList<>();
+ private static final ArrayList<Map<String, Object>> allFromDruid = new ArrayList<>();
/**
* 程序执行
@@ -90,27 +90,35 @@ public class BaselineGeneration {
// 数据读取
LOG.info("Druid 开始读取数据");
long start = System.currentTimeMillis();
- // allFromDruid = DruidData.readAllFromDruid(druidConn, druidStatement, timeFilter);
ThreadFactory loadDataThreadFactory = new ThreadFactoryBuilder()
.setNameFormat("baseline-load-data-%d").build();
ThreadPoolExecutor loadDataExecutor = new ThreadPoolExecutor(
threadNum, threadNum, 0L,
TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>(1024), loadDataThreadFactory,
new ThreadPoolExecutor.AbortPolicy());
- long timeGrad = (startEndTimes._1 - startEndTimes._2)/threadNum;
- for (int i = 0; i < threadNum; i++) {
- String sql = DruidData.getDruidQuerySql(startEndTimes._1, i, timeGrad);
+ long timeGrad = 3600000 * ApplicationConfig.DRUID_READ_BATCH_TIME_GRAD_HOUR;
+ ArrayList<Future<ArrayList<Map<String, Object>>>> resultList = new ArrayList<>();
+ for (int i = 0; i < (START_END_TIMES._1-START_END_TIMES._2)/timeGrad; i++) {
+ String sql = DruidData.getDruidQuerySql(START_END_TIMES._1, i, timeGrad);
ReadHistoricalDruidData readHistoricalDruidData = new ReadHistoricalDruidData(
sql,
druidStatement
);
- Future<List<Map<String, Object>>> future = loadDataExecutor.submit(readHistoricalDruidData);
+ Future<ArrayList<Map<String, Object>>> future = loadDataExecutor.submit(readHistoricalDruidData);
+ resultList.add(future);
+ }
+ for(Future<ArrayList<Map<String, Object>>> future: resultList){
try {
- allFromDruid.addAll(future.get());
+ if(future.get()!=null){
+ allFromDruid.addAll(future.get());
+ }else{
+ LOG.error("future.get()未获取到结果");
+ }
} catch (ExecutionException e) {
e.printStackTrace();
}
}
+
long last = System.currentTimeMillis();
LOG.info("Druid 加载数据共耗时:"+(last-start));
loadDataExecutor.shutdown();
@@ -128,10 +136,10 @@ public class BaselineGeneration {
new ThreadPoolExecutor.AbortPolicy());
LOG.info("共查询到服务端ip " +destinationIps.size() + " 个");
- LOG.info("Baseline batch 大小: " + ApplicationConfig.GENERATE_BATCH_SIZE);
+ LOG.info("Baseline batch 大小: " + ApplicationConfig.BASELINE_GENERATE_BATCH_SIZE);
// 分批进行IP baseline生成和处理
- List<List<String>> batchIpLists = Lists.partition(destinationIps, ApplicationConfig.GENERATE_BATCH_SIZE);
+ List<List<String>> batchIpLists = Lists.partition(destinationIps, ApplicationConfig.BASELINE_GENERATE_BATCH_SIZE);
for (List<String> batchIps: batchIpLists){
if(batchIps.size()>0){
@@ -140,9 +148,9 @@ public class BaselineGeneration {
druidConn,
druidStatement,
hbaseTable,
- attackTypeList,
+ ATTACK_TYPE_LIST,
BASELINE_POINT_NUM,
- timeFilter,
+ TIME_FILTER,
allFromDruid
);
generationExecutor.execute(baselineSingleThread);
diff --git a/src/main/java/cn/mesalab/service/BaselineSingleThread.java b/src/main/java/cn/mesalab/service/BaselineSingleThread.java
index 19bdcd5..559eee1 100644
--- a/src/main/java/cn/mesalab/service/BaselineSingleThread.java
+++ b/src/main/java/cn/mesalab/service/BaselineSingleThread.java
@@ -74,7 +74,7 @@ public class BaselineSingleThread extends Thread {
}
try {
hbaseTable.put(putList);
- LOG.info("Baseline 线程 " + Thread.currentThread().getId() + " 成功写入Baseline条数共计 " + putList.size());
+ LOG.info(" 成功写入Baseline条数共计 " + putList.size());
} catch (IOException e) {
e.printStackTrace();
}
diff --git a/src/main/resources/application.properties b/src/main/resources/application.properties
index b1494c5..8bf913c 100644
--- a/src/main/resources/application.properties
+++ b/src/main/resources/application.properties
@@ -22,15 +22,12 @@ hbase.zookeeper.client.port=2181
#读取druid时间范围方式,0:读取默认范围read.druid.time.range天数;1:指定时间范围
read.druid.time.limit.type=1
-#07-01
+#07-05
read.druid.min.time=1625414400000
#06-01
#read.druid.min.time=1622476800000
read.druid.max.time=1625673600000
-thread.max.num=5
-
-
#读取过去N天数据,最小值为3天(需要判断周期性)
read.historical.days=3
#历史数据汇聚粒度为10分钟
@@ -52,11 +49,10 @@ baseline.rational.percentile=0.95
baseline.kalman.p=0.000001
baseline.kalman.r=4
-
-# 每更新1000个记录打印log
-log.write.count=10000
# FOR TEST
-generate.batch.size=100
+baseline.generate.batch.size=1000
+druid.read.batch.time.grad.hour=4
+thread.max.num=20
# http client配置