diff options
| author | yinjiangyi <[email protected]> | 2021-08-05 11:30:42 +0800 |
|---|---|---|
| committer | yinjiangyi <[email protected]> | 2021-08-05 11:30:42 +0800 |
| commit | 307f2831347f96ba90cbbefddb4fb193c6007711 (patch) | |
| tree | e154eab57a7dbe092f9bbad5f1be948e4b6ae148 /src/main/java | |
| parent | 7eeaee542d4d78c6fd08f3651b57a2d0c896577f (diff) | |
线程阻塞
Diffstat (limited to 'src/main/java')
3 files changed, 31 insertions, 12 deletions
diff --git a/src/main/java/cn/mesalab/dao/ReadHistoricalDruidData.java b/src/main/java/cn/mesalab/dao/ReadHistoricalDruidData.java index d0af9a9..7a1a936 100644 --- a/src/main/java/cn/mesalab/dao/ReadHistoricalDruidData.java +++ b/src/main/java/cn/mesalab/dao/ReadHistoricalDruidData.java @@ -9,6 +9,7 @@ import org.slf4j.LoggerFactory; import java.util.ArrayList; import java.util.Map; import java.util.concurrent.Callable; +import java.util.concurrent.CountDownLatch; /** * @author yjy @@ -19,11 +20,14 @@ public class ReadHistoricalDruidData implements Callable<ArrayList<Map<String, O private static final Logger LOG = LoggerFactory.getLogger(ReadHistoricalDruidData.class); private String sql; + private CountDownLatch countDownLatch; public ReadHistoricalDruidData( - String sql + String sql, + CountDownLatch countDownLatch ){ this.sql = sql; + this.countDownLatch = countDownLatch; } @Override @@ -34,12 +38,18 @@ public class ReadHistoricalDruidData implements Callable<ArrayList<Map<String, O AvaticaConnection connection = DruidUtils.getConn(); AvaticaStatement stat = connection.createStatement(); resultData.addAll(DruidData.readFromDruid(sql, stat)); + + + long end = System.currentTimeMillis(); LOG.info(sql + "\n读取" + resultData.size() + "条数据,运行时间:" + (end - start)); connection.close(); stat.close(); } catch (Exception e) { e.printStackTrace(); + } finally { + countDownLatch.countDown(); + LOG.info("本线程读取完毕,剩余线程数量:" + countDownLatch.getCount()); } return resultData; } diff --git a/src/main/java/cn/mesalab/service/BaselineGeneration.java b/src/main/java/cn/mesalab/service/BaselineGeneration.java index a91fb14..a911f03 100644 --- a/src/main/java/cn/mesalab/service/BaselineGeneration.java +++ b/src/main/java/cn/mesalab/service/BaselineGeneration.java @@ -85,15 +85,21 @@ public class BaselineGeneration { TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>(1024), loadDataThreadFactory, new ThreadPoolExecutor.AbortPolicy()); long timeGrad = 3600000 * ApplicationConfig.DRUID_READ_BATCH_TIME_GRAD_HOUR; + int threadPoolNum = (int) ((START_END_TIMES._1-START_END_TIMES._2)/timeGrad); ArrayList<Future<ArrayList<Map<String, Object>>>> resultList = new ArrayList<>(); - for (int i = 0; i < (START_END_TIMES._1-START_END_TIMES._2)/timeGrad; i++) { + CountDownLatch loadDataCountDownLatch = new CountDownLatch(threadPoolNum); + for (int i = 0; i < threadNum; i++) { String sql = DruidData.getDruidQuerySql(START_END_TIMES._1, i, timeGrad); ReadHistoricalDruidData readHistoricalDruidData = new ReadHistoricalDruidData( - sql + sql, + loadDataCountDownLatch ); Future<ArrayList<Map<String, Object>>> future = loadDataExecutor.submit(readHistoricalDruidData); resultList.add(future); } + loadDataExecutor.shutdown(); + loadDataCountDownLatch.await(); + for(Future<ArrayList<Map<String, Object>>> future: resultList){ try { if(future.get()!=null){ @@ -105,12 +111,8 @@ public class BaselineGeneration { e.printStackTrace(); } } - long last = System.currentTimeMillis(); LOG.info("Druid 加载数据共耗时:"+(last-start)); - loadDataExecutor.shutdown(); - loadDataExecutor.awaitTermination(10L, TimeUnit.HOURS); - // BaseLine生成 // 获取IP列表 @@ -127,7 +129,7 @@ public class BaselineGeneration { // 分批进行IP baseline生成和处理 List<List<String>> batchIpLists = Lists.partition(destinationIps, ApplicationConfig.BASELINE_GENERATE_BATCH_SIZE); - + CountDownLatch generateCountDownLatch = new CountDownLatch(threadPoolNum); for (List<String> batchIps: batchIpLists){ if(batchIps.size()>0){ BaselineSingleThread baselineSingleThread = new BaselineSingleThread( @@ -136,14 +138,14 @@ public class BaselineGeneration { ATTACK_TYPE_LIST, BASELINE_POINT_NUM, TIME_FILTER, - allFromDruid + allFromDruid, + generateCountDownLatch ); generationExecutor.execute(baselineSingleThread); } } - generationExecutor.shutdown(); - generationExecutor.awaitTermination(10L, TimeUnit.HOURS); + generateCountDownLatch.await(); } } diff --git a/src/main/java/cn/mesalab/service/BaselineSingleThread.java b/src/main/java/cn/mesalab/service/BaselineSingleThread.java index 98f16b1..e5ce54c 100644 --- a/src/main/java/cn/mesalab/service/BaselineSingleThread.java +++ b/src/main/java/cn/mesalab/service/BaselineSingleThread.java @@ -18,6 +18,7 @@ import java.util.ArrayList; import java.util.Arrays; import java.util.List; import java.util.Map; +import java.util.concurrent.CountDownLatch; import java.util.stream.Collectors; /** @@ -35,6 +36,7 @@ public class BaselineSingleThread extends Thread { private String timeFilter; private List<Map<String, Object>> batchDruidData; private List<Map<String, Object>> historicalData; + private CountDownLatch countDownLatch; public BaselineSingleThread( List<String> batchIpList, @@ -42,7 +44,8 @@ public class BaselineSingleThread extends Thread { List<String> attackTypeList, Integer BASELINE_POINT_NUM, String timeFilter, - List<Map<String, Object>> historicalData + List<Map<String, Object>> historicalData, + CountDownLatch countDownLatch ){ this.ipList = batchIpList; this.hbaseTable = hbaseTable; @@ -50,6 +53,7 @@ public class BaselineSingleThread extends Thread { this.BASELINE_POINT_NUM = BASELINE_POINT_NUM; this.timeFilter = timeFilter; this.historicalData = historicalData; + this.countDownLatch = countDownLatch; } @Override @@ -70,6 +74,9 @@ public class BaselineSingleThread extends Thread { LOG.info(" 成功写入Baseline条数共计 " + putList.size()); } catch (IOException e) { e.printStackTrace(); + } finally { + countDownLatch.countDown(); + LOG.info("本线程读取完毕,剩余线程数量:" + countDownLatch.getCount()); } } |
