author    zhanghongqing <[email protected]>  2020-12-11 10:07:39 +0800
committer zhanghongqing <[email protected]>  2020-12-11 10:07:39 +0800
commit    d7054c2d8cfce5a1803bf5d696d80ac8df48d180 (patch)
tree      3b43c28a3ecab48ba530730a17398d96bc00bd7b
parent    f1a873beb01d89619f0e9b0d13174e0a2d01db86 (diff)
Modify the file storage quota query task v11-rc3
-rw-r--r--  galaxy-job-executor/src/main/java/com/mesalab/executor/core/config/StorgeConfig.java       |  46
-rw-r--r--  galaxy-job-executor/src/main/java/com/mesalab/executor/jobhandler/LogStorageQuotaJob.java  |   9
-rw-r--r--  galaxy-job-executor/src/main/java/com/mesalab/executor/service/StorageQuotaService.java    | 326
-rw-r--r--  galaxy-job-executor/src/main/resources/application-executor.yml                            |   8
-rw-r--r--  galaxy-job-executor/src/test/java/com/mesalab/executor/test/StorageQuotaTest.java          |  53
5 files changed, 225 insertions, 217 deletions
diff --git a/galaxy-job-executor/src/main/java/com/mesalab/executor/core/config/StorgeConfig.java b/galaxy-job-executor/src/main/java/com/mesalab/executor/core/config/StorgeConfig.java
index cb92b92..5e05774 100644
--- a/galaxy-job-executor/src/main/java/com/mesalab/executor/core/config/StorgeConfig.java
+++ b/galaxy-job-executor/src/main/java/com/mesalab/executor/core/config/StorgeConfig.java
@@ -1,9 +1,15 @@
package com.mesalab.executor.core.config;
+import com.google.common.base.Splitter;
+import com.zdjizhi.utils.StringUtil;
+import lombok.Data;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Configuration;
-import lombok.Data;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
@Data
@Configuration
@@ -45,5 +51,43 @@ public class StorgeConfig {
@Value("${storge.files.token}")
private String filesToken; //login verification token
+ public String getFilesHosServer() {
+ return getServer();
+ }
+
+ private String getServer() {
+ return getServer(filesHosServer);
+ }
+
+ public String getTrafficServer() {
+ return getServer(trafficServer);
+ }
+ public String getAnalyticServer() {
+ return getServer(analyticServer);
+ }
+
+ public Map<String,String> getFilesDataCenter() {
+ return getDCMap(filesHosServer);
+ }
+
+ public Map<String,String> getTrafficDataCenter() {
+ return getDCMap(trafficServer);
+ }
+ public Map<String,String> getAnalyticDataCenter() {
+ return getDCMap(analyticServer);
+ }
+ private String getServer(String dcStr) {
+ return Splitter.on(",").trimResults().splitToList(dcStr).stream().map(x -> x.substring(x.indexOf("|") + 1)).filter(StringUtil::isNotEmpty).collect(Collectors.joining(","));
+ }
+ private Map<String, String> getDCMap(String dcStr) {
+ Map<String, String> map = new HashMap<>();
+ Splitter.on(",").trimResults().splitToList(dcStr).forEach(x -> {
+ List<String> dc = Splitter.on("|").trimResults().splitToList(x);
+ if (dc.size() == 2 && StringUtil.isNotBlank(dc.get(1))) {
+ map.put(dc.get(0), dc.get(1));
+ }
+ });
+ return map;
+ }
}
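The new accessors parse comma-separated "DataCenter|host:port" entries: getServer() keeps only the host parts and drops blanks, while getDCMap() keeps only data centers whose host is configured. A minimal standalone sketch of that parsing, with the internal StringUtil helper swapped for plain isEmpty checks and the sample value invented:

    import com.google.common.base.Splitter;
    import java.util.HashMap;
    import java.util.List;
    import java.util.Map;
    import java.util.stream.Collectors;

    public class DcConfigDemo {
        public static void main(String[] args) {
            // Invented sample in the new "DataCenter|host:port" format; Aktau has no host
            String conf = "Nur-sultan|192.168.40.203:8123,Aktau|,Almaty|192.168.1.2:8123";

            // getServer(): strip the "DataCenter|" prefix and drop blank hosts
            String servers = Splitter.on(",").trimResults().splitToList(conf).stream()
                    .map(x -> x.substring(x.indexOf('|') + 1))
                    .filter(x -> !x.isEmpty())
                    .collect(Collectors.joining(","));
            System.out.println(servers); // 192.168.40.203:8123,192.168.1.2:8123

            // getDCMap(): keep only data centers whose host is non-blank
            Map<String, String> dcMap = new HashMap<>();
            for (String entry : Splitter.on(",").trimResults().splitToList(conf)) {
                List<String> dc = Splitter.on("|").trimResults().splitToList(entry);
                if (dc.size() == 2 && !dc.get(1).isEmpty()) {
                    dcMap.put(dc.get(0), dc.get(1));
                }
            }
            System.out.println(dcMap); // Nur-sultan and Almaty only; Aktau is dropped
        }
    }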
diff --git a/galaxy-job-executor/src/main/java/com/mesalab/executor/jobhandler/LogStorageQuotaJob.java b/galaxy-job-executor/src/main/java/com/mesalab/executor/jobhandler/LogStorageQuotaJob.java
index 9c7e9e8..6390597 100644
--- a/galaxy-job-executor/src/main/java/com/mesalab/executor/jobhandler/LogStorageQuotaJob.java
+++ b/galaxy-job-executor/src/main/java/com/mesalab/executor/jobhandler/LogStorageQuotaJob.java
@@ -569,14 +569,7 @@ public class LogStorageQuotaJob {
logger.error("params parser error , params is {}", params);
return IJobHandler.FAIL;
}
- //invoke the query method: standalone or cluster
- Map resultMap = null;
-
- if ("false".equals(paramsMap.get("ckDayGrowth"))) {
- resultMap = storageQuotaService.getDiskJson();
- } else {
- resultMap = storageQuotaService.getDayJson();
- }
+ Map resultMap = storageQuotaService.getDiskJson("true".equals(paramsMap.get("ckDayGrowth")));
//发送到kafka
new KafkaUtils().sendMessage(String.valueOf(paramsMap.get("topic")), (List<Object>) resultMap.get("data"));
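With the two branches collapsed into one call, the ckDayGrowth flag in the job params selects getDiskJson(true) for the once-a-day ClickHouse delta run and getDiskJson(false) for the regular run over all stores. A hypothetical XXL-Job parameter string; only the "ckDayGrowth" and "topic" keys are taken from the handler above, the topic value is invented:

    {"topic":"storage_quota","ckDayGrowth":"false"}   // getDiskJson(false): ClickHouse + Druid + HOS, one entry per data center
    {"topic":"storage_quota","ckDayGrowth":"true"}    // getDiskJson(true): ClickHouse only, including the daily aggregate_size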
diff --git a/galaxy-job-executor/src/main/java/com/mesalab/executor/service/StorageQuotaService.java b/galaxy-job-executor/src/main/java/com/mesalab/executor/service/StorageQuotaService.java
index 2733867..806b9c1 100644
--- a/galaxy-job-executor/src/main/java/com/mesalab/executor/service/StorageQuotaService.java
+++ b/galaxy-job-executor/src/main/java/com/mesalab/executor/service/StorageQuotaService.java
@@ -1,8 +1,5 @@
package com.mesalab.executor.service;
-import com.google.common.base.Joiner;
-import com.google.common.base.Splitter;
-import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Maps;
import com.mesalab.executor.core.config.StorgeConfig;
import com.mesalab.executor.core.utils.*;
@@ -11,7 +8,6 @@ import com.mesalab.executor.pojo.HosSpace;
import com.xxl.job.core.log.XxlJobLogger;
import com.zdjizhi.utils.DateUtils;
import com.zdjizhi.utils.JsonMapper;
-import lombok.Data;
import org.apache.http.Header;
import org.apache.http.message.BasicHeader;
import org.slf4j.Logger;
@@ -22,7 +18,10 @@ import org.springframework.http.HttpHeaders;
import org.springframework.stereotype.Service;
import org.springframework.util.ObjectUtils;
-import java.util.*;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
/**
* Storage quota metrics class: combines the raw-log, statistics-log, and file-system result objects
@@ -40,17 +39,18 @@ public class StorageQuotaService {
/**
* Druid rest地址
*/
- private static String queryDruidUrl;
+ private static String druidServer;
+ private static Map<String,String> druidDataCenter;
/**
* ClickHouse rest地址
*/
- private static String queryClickHouseUrl;
+ private static Map<String,String> clickHouseServer;
private static String database;
/**
* hos地址
*/
- private static String filesHosServer;
+ private static Map<String,String> filesHosServer;
private static String filesToken;
private static String trafficUserName;
@@ -60,6 +60,8 @@ public class StorageQuotaService {
private static String systemDisks;
private static final String ZOOKEEPER_STORAGE_PATH = "/storage/worker/";
private static final String FILE_STORAGE_PATH = "/admin/diskspace";
+ private static final String DRUID_PATH = "/druid/v2/sql";
+ private static final int STORAGE_LIMITATION = 60; //maximum allowed lookback when querying history: 60 minutes
@Value("${zookeeper.server}")
private String zookeeperServer;
@@ -69,64 +71,59 @@ public class StorageQuotaService {
database = config.getTrafficDatasource();
trafficUserName = config.getTrafficUsername();
trafficPassword = config.getTrafficPassword();
- queryClickHouseUrl = Joiner.on("").join(Constant.HTTP, config.getTrafficServer());
+ clickHouseServer = config.getTrafficDataCenter();
systemPartsCluster = config.getSystemPartsCluster();
systemDisks = config.getSystemDisks();
- queryDruidUrl = Joiner.on("").join(Constant.HTTP, config.getAnalyticServer(), "/druid/v2/sql");
- filesHosServer = config.getFilesHosServer();
+ druidServer = config.getAnalyticServer();
+ druidDataCenter = config.getAnalyticDataCenter();
+
+ filesHosServer = config.getFilesDataCenter();
filesToken = config.getFilesToken();
}
/**
* Fetches the current ClickHouse storage size; if the query fails, the latest value stored in Druid is used as a fallback
*/
- private StorageQuota getClickHouseCurr() {
+ private Long getClickHouseCurr(String point) {
Map<String, Object> deleteParamMap = getDeleteSource();
- StorageQuota storageQuota = new StorageQuota();
String currSql = "SELECT SUM(`bytes_on_disk`) FROM " + systemPartsCluster + " WHERE database = '" + database + "';";
deleteParamMap.put("query", currSql);
- String currResult = HttpClientUtils.httpGet(HttpClientUtils.getUrlWithParams(queryClickHouseUrl, deleteParamMap));
+ String currResult = HttpClientUtils.httpGet(HttpClientUtils.getUrlWithParams(Constant.HTTP.concat(point), deleteParamMap));
Long result = Long.valueOf(currResult.trim());
-
- storageQuota.setData(ImmutableMap.of("used_size", result));
- logger.info("query clickhouse used_size success, {}", storageQuota);
- return storageQuota;
+ logger.info("query clickhouse success, used_size={}", result);
+ return result;
}
/**
* Fetches the maximum ClickHouse storage size; if the query fails, the latest value stored in Druid is used as a fallback
*/
- private StorageQuota getClickHouseMax() {
+ private Long getClickHouseMax(String point) {
Map<String, Object> deleteParamMap = getDeleteSource();
- StorageQuota storageQuota = new StorageQuota();
- String maxSql = "SELECT SUM(`total_space`) FROM " + systemDisks + ";";
+ String maxSql = "SELECT SUM(`total_space`) FROM ".concat(systemDisks).concat(";");
deleteParamMap.put("query", maxSql);
String maxResult = HttpClientUtils
- .httpGet(HttpClientUtils.getUrlWithParams(queryClickHouseUrl, deleteParamMap));
+ .httpGet(HttpClientUtils.getUrlWithParams(Constant.HTTP.concat(point), deleteParamMap));
long result = Long.parseLong(maxResult.trim());
- storageQuota.setData(ImmutableMap.of("max_size", result));
- logger.info("query clickhouse max_size success ,{}", storageQuota);
- return storageQuota;
+ logger.info("query clickhouse max_size success ,{}", result);
+ return result;
}
/**
* Fetches the ClickHouse daily delta; if the query fails, 0 is written directly
*/
- private StorageQuota getClickHouseDiff() {
+ private Long getClickHouseDiff(String point) {
String date = DateUtils.getDateOfYesterday("yyyyMMdd");
Map<String, Object> deleteParamMap = getDeleteSource();
- StorageQuota storageQuota = new StorageQuota();
- String diffSql = "SELECT SUM(bytes_on_disk) FROM " + systemPartsCluster + " WHERE database = '" + database
- + "' AND partition = '" + date + "';";
+ String diffSql = "SELECT SUM(bytes_on_disk) FROM ".concat(systemPartsCluster).concat(" WHERE database = '" ).concat(database)
+ .concat("' AND partition = '").concat(date).concat("';");
deleteParamMap.put("query", diffSql);
String diffResult = HttpClientUtils
- .httpGet(HttpClientUtils.getUrlWithParams(queryClickHouseUrl, deleteParamMap));
+ .httpGet(HttpClientUtils.getUrlWithParams(Constant.HTTP.concat(point), deleteParamMap));
long result = Long.parseLong(diffResult.trim());
- storageQuota.setData(ImmutableMap.of("aggregate_size", result));
- logger.info("query clickhouse traffic_aggregate success,{}", storageQuota);
- return storageQuota;
+ logger.info("query clickhouse success aggregate_size={}", result);
+ return result;
}
//========================Druid==============================
@@ -134,50 +131,37 @@ public class StorageQuotaService {
/**
* Fetches the current Druid storage size; if the query fails, 0 is filled in. When Druid itself is unavailable there is no point querying it for the previous value.
*/
- private StorageQuota getDruidCurr() {
- String currSql = "{\"query\":\"SELECT SUM(curr_size) AS curr_size FROM sys.servers WHERE server_type = 'historical'\",\"context\":{\"skipEmptyBuckets\":\"false\"},\"resultFormat\":\"object\"}";
- StorageQuota storageQuota = new StorageQuota();
- String currResult = HttpClientUtils.httpPost(queryDruidUrl, currSql);
+ private Long getDruidUsed(String point) {
+ String currSql = "{\"query\":\"SELECT SUM(size) AS used_size FROM sys.segments WHERE datasource NOT LIKE '%hot%' and is_available=1\",\"context\":{\"skipEmptyBuckets\":\"false\"},\"resultFormat\":\"object\"}";
+ String currResult = HttpClientUtils.httpPost(Constant.HTTP.concat(point).concat(DRUID_PATH), currSql);
List<Map<String, Object>> list = (List) JsonMapper.fromJsonString(currResult, List.class);
- Long currSize = Long.valueOf(String.valueOf(list.get(0).get("curr_size")));
-
- storageQuota.setData(ImmutableMap.of("used_size", currSize));
- logger.info("query druid used_size success,{}", storageQuota);
- return storageQuota;
- }
+ if (list == null || list.isEmpty()) {
+ return 0L;
+ }
+ Long currSize = Long.valueOf(String.valueOf(list.get(0).get("used_size")));
- /**
- * Fetches the Druid increment over the past 30 seconds; if the query fails, 0 is filled in. When Druid itself is unavailable there is no point querying it for the previous value.
- */
- private StorageQuota getDruidDiff(Long currSize) {
- StorageQuota storageQuota = new StorageQuota();
- Long before = getCurrBefore(Constant.REPORT_AND_METRICS);
- long diff = getDiffNum(currSize, before);
- storageQuota.setData(ImmutableMap.of("aggregate_size", diff));
- logger.info("query druid aggregate_size success,{}", storageQuota);
- return storageQuota;
+ logger.info("query druid used_size success,{}", currSize);
+ return currSize;
}
/**
* Fetches the maximum Druid storage size; if the query fails, 0 is filled in. When Druid itself is unavailable there is no point querying it for the previous value.
*/
- private StorageQuota getDruidMax() {
+ private Long getDruidMax(String point) {
String maxSql = "{\"query\":\"SELECT SUM(max_size) AS max_size FROM sys.servers WHERE server_type = 'historical'\",\"context\":{\"skipEmptyBuckets\":\"false\"},\"resultFormat\":\"object\"}";
- StorageQuota storageQuota = new StorageQuota();
- String maxResult = HttpClientUtils.httpPost(queryDruidUrl, maxSql);
+ String maxResult = HttpClientUtils.httpPost(Constant.HTTP.concat(point).concat(DRUID_PATH), maxSql);
List<Map<String, Long>> list = (List) JsonMapper.fromJsonString(maxResult, List.class);
Long maxSize = list.get(0).get("max_size");
- storageQuota.setData(ImmutableMap.of("max_size", maxSize));
- logger.info("query druid max_size success,{}", storageQuota);
- return storageQuota;
+
+ logger.info("query druid max_size success,{}", maxSize);
+ return maxSize;
}
/**
* Fetches the current HOS storage size; if the query fails, the latest value stored in Druid is used as a fallback
*/
- private StorageQuota getHBaseStorage(String queryHosUrl) {
- StorageQuota storageQuota = new StorageQuota();
+ private Map getHBaseStorage(String key,String point) {
Header[] headers = {new BasicHeader(Constant.TOKEN, filesToken), new BasicHeader(HttpHeaders.CONTENT_TYPE, Constant.TEXT_XML)};
- String result = HttpClientUtils.httpGet(queryHosUrl.concat(FILE_STORAGE_PATH), headers);
+ String result = HttpClientUtils.httpGet(Constant.HTTP.concat(point).concat(FILE_STORAGE_PATH), headers);
if("-1".equals(result)){
throw new RuntimeException("hos server error");
}
@@ -192,11 +176,10 @@ public class StorageQuotaService {
Map<String, Object> data = new HashMap<>();
data.put("max_size", hosCapacity);
data.put("used_size", hosUsed);
- data.put("aggregate_size", getDiffNum(hosUsed, getCurrBefore(Constant.FILES)));
+ data.put("aggregate_size", getDiffNum(hosUsed, getCurrBefore(Constant.FILES,key)));
- storageQuota.setData(data);
- logger.info("query file storage success,{}", storageQuota);
- return storageQuota;
+ logger.info("query file storage success,{}", data);
+ return data;
}
//=======================Utility methods===============================
@@ -204,35 +187,33 @@ public class StorageQuotaService {
* Fetches the previous Curr value of the given type via Druid
*
* @param logType statistics type
- * @return the previous value
+ * @return the previous value; 0 if no qualifying record exists
+ * If no record exists within the last hour, fall back to querying today's records; if there is still none, return 0.
*/
- private Long getCurrBefore(String logType) {
+ private Long getCurrBefore(String logType, String key) {
String currSql = "{\"query\":\"SELECT used_size FROM sys_storage_log WHERE log_type = '" + logType
- + "' ORDER BY __time DESC LIMIT 1\",\"context\":{\"skipEmptyBuckets\":\"false\"},\"resultFormat\":\"object\"}";
- String currResult = HttpClientUtils.httpPost(queryDruidUrl, currSql);
+ + "' and data_center = '"+ key +"' and __time >= CURRENT_TIMESTAMP - INTERVAL '"+ STORAGE_LIMITATION +"' MINUTE ORDER BY __time DESC LIMIT 1\",\"context\":{\"skipEmptyBuckets\":\"false\"},\"resultFormat\":\"object\"}";
+ String currResult = HttpClientUtils.httpPost(Constant.HTTP.concat(druidServer).concat(DRUID_PATH), currSql);
List<Map> list = (List) JsonMapper.fromJsonString(currResult, List.class);
- Long currSize = Long.valueOf(String.valueOf(list.get(0).get("used_size")));
- logger.info("query {} history used_size success,{}",logType, currSize);
- return currSize;
+ if (list == null || list.isEmpty()) {
+ String currDaySql = "{\"query\":\"SELECT used_size FROM sys_storage_log WHERE log_type = '" + logType
+ + "' and data_center = '"+ key +"' and __time >= CURRENT_DATE ORDER BY __time DESC LIMIT 1\",\"context\":{\"skipEmptyBuckets\":\"false\"},\"resultFormat\":\"object\"}";
+ String currDayResult = HttpClientUtils.httpPost(Constant.HTTP.concat(druidServer).concat(DRUID_PATH), currDaySql);
+ list = (List) JsonMapper.fromJsonString(currDayResult, List.class);
+ }
+ if (list == null || list.isEmpty()) {
+ return 0L;
+ }
+ Object used = list.get(0).get("used_size");
+ if(ObjectUtils.isEmpty(used)){
+ return 0L;
+ }
+ Long historyUsed = Long.valueOf(String.valueOf(used));
+ logger.info("query {} history used_size success,{}",logType, historyUsed);
+ return historyUsed;
}
/**
- * Fetches the previous Max value of the given type via Druid
- *
- * @param logType statistics type
- * @return the previous value
- */
-/* private Long getMaxBefore(String logType) {
- String maxSql = "{\"query\":\"SELECT max_size FROM sys_storage_log WHERE log_type = '" + logType
- + "' ORDER BY __time DESC LIMIT 1\",\"context\":{\"skipEmptyBuckets\":\"false\"},\"resultFormat\":\"object\"}";
- String maxResult = HttpClientUtils.httpPost(queryDruidUrl, maxSql);
- List<Map> list = (List<Map>) JsonMapper.fromJsonString(maxResult, List.class);
- Long currSize = Long.valueOf(String.valueOf(list.get(0).get("max_size")));
- logger.info("query {} max_size history success,{}", logType,currSize);
- return currSize;
- }*/
-
- /**
* @param node
* @return the custom reference time last_storage,
* stored in the ZooKeeper node /storage/worker/ + node
@@ -287,74 +268,85 @@ public class StorageQuotaService {
*
* @return result JSON
*/
- public Map<String, Map> getDiskJson() {
+ public Map<String, Map> getDiskJson(boolean ckDay) {
Map all = new HashMap<>();
List<Map> data = new ArrayList<>();
Long now = System.currentTimeMillis() / 1000;
Map<String, Integer> status = new HashMap<>();
- status.put("trafficFailCount", getCKStorageJson(data, now));
- status.put("reportFailCount", getDruidStorageJson(data, now));
- status.put("fileFailCount", getFileStorageJson(data, now));
-
+ if(ckDay){
+ //When the ClickHouse job is scheduled on the following day, this timestamp refers to the previous day's statistics
+ long time = 12*3600L;
+ long timestamp = DateUtils.convertStringToTimestamp(DateUtils.getDateOfYesterday("yyyyMMdd"), "yyyyMMdd")+time;
+ status.put("trafficFailCount", getCKStorageJson(data, timestamp,true));
+ } else{
+ status.put("trafficFailCount", getCKStorageJson(data, now,false));
+ status.put("reportFailCount", getDruidStorageJson(data, now));
+ status.put("fileFailCount", getFileStorageJson(data, now));
+ }
all.put("data", data);
all.put("status", status);
-
return all;
}
-
- private int getCKStorageJson(List<Map> data, Long time) {
- int ckErrorCount = 0;
+ private int getCKStorageJson(List<Map> data, Long time, boolean day) {
+ int errorCount = 0;
try {
- StorageQuota clickHouseCurr = getClickHouseCurr();
- StorageQuota clickHouseMax = getClickHouseMax();
-
- if (clickHouseCurr.getStatus() != null || clickHouseMax.getStatus() != null) {
- XxlJobLogger.log("clickhouse storage error clickHouseCurr ={} , clickHouseMax={}", clickHouseCurr.getStatus(), clickHouseMax.getStatus());
- ckErrorCount++;
+ for (Map.Entry<String, String> dc : clickHouseServer.entrySet()) {
+ try {
+ Map<String,Object> traffic = new HashMap<>();
+ traffic.put("log_type", Constant.TRAFFIC_LOGS);
+ traffic.put("last_storage", getLastStorage(Constant.ZK_TRAFFIC_LOGS));
+ traffic.put("data_center",dc.getKey());
+ traffic.put("time", time);
+ traffic.put("used_size",getClickHouseCurr(dc.getValue()));
+ traffic.put("max_size",getClickHouseMax(dc.getValue()));
+ if(day){
+ traffic.put("aggregate_size",getClickHouseDiff(dc.getValue()));
+ }
+ data.add(traffic);
+ } catch (Exception e) {
+ logger.error("clickhouse storage error {}", JobUtil.getErrorMsg(e));
+ XxlJobLogger.log("clickhouse storage error {}", JobUtil.getErrorMsg(e));
+ errorCount++;
+ }
}
-
- Map<String,Object> traffic = new HashMap<>();
- traffic.put("log_type", Constant.TRAFFIC_LOGS);
- traffic.put("time", time);
- traffic.put("last_storage", getLastStorage(Constant.ZK_TRAFFIC_LOGS));
- traffic.putAll(clickHouseCurr.getData());
- traffic.putAll(clickHouseMax.getData());
- data.add(traffic);
} catch (Exception e) {
logger.error("clickhouse storage error {}", JobUtil.getErrorMsg(e));
XxlJobLogger.log("clickhouse storage error {}", JobUtil.getErrorMsg(e));
- ckErrorCount++;
+ errorCount++;
}
- return ckErrorCount;
+ return errorCount;
}
private int getDruidStorageJson(List<Map> data, Long time) {
- int druidErrorCount = 0;
+ int errorCount = 0;
try {
- StorageQuota druidCurr = getDruidCurr();
- StorageQuota druidDiff = getDruidDiff(Long.valueOf(String.valueOf(druidCurr.getData().get("used_size"))));
- StorageQuota druidMax = getDruidMax();
- druidErrorCount = 0;
- if (druidCurr.getStatus() != null || druidDiff.getStatus() != null || druidMax.getStatus() != null) {
- XxlJobLogger.log("druid storage error druidCurr ={} , druidDiff={}, druidMax={}", druidCurr.getStatus(), druidDiff.getStatus(), druidMax.getStatus());
- druidErrorCount++;
+ for (Map.Entry<String, String> dc : druidDataCenter.entrySet()) {
+ try {
+ Map metrics = new HashMap<>();
+ metrics.put("log_type", Constant.REPORT_AND_METRICS);
+ metrics.put("time", time);
+ metrics.put("last_storage", getLastStorage(Constant.ZK_REPORT_AND_METRICS));
+ metrics.put("data_center", dc.getKey());
+
+ metrics.put("max_size",getDruidMax(dc.getValue()));
+ Long druidUsed = getDruidUsed(dc.getValue());
+ metrics.put("used_size",druidUsed);
+ metrics.put("aggregate_size", getDiffNum(druidUsed, getCurrBefore(Constant.REPORT_AND_METRICS,dc.getKey())));
+ data.add(metrics);
+ } catch (Exception e) {
+ logger.error("druid storage endpoint={}, error {}",dc.getValue(), JobUtil.getErrorMsg(e));
+ XxlJobLogger.log("druid storage endpoint={},error {}",dc.getValue(), JobUtil.getErrorMsg(e));
+ errorCount++;
+ }
}
-
- Map metrics = new HashMap<>();
- metrics.put("log_type", Constant.REPORT_AND_METRICS);
- metrics.put("time", time);
- metrics.put("last_storage", getLastStorage(Constant.ZK_REPORT_AND_METRICS));
- metrics.putAll(druidCurr.getData());
- metrics.putAll(druidDiff.getData());
- metrics.putAll(druidMax.getData());
- data.add(metrics);
} catch (Exception e) {
logger.error("druid storage error {}", JobUtil.getErrorMsg(e));
XxlJobLogger.log("druid storage error {}", JobUtil.getErrorMsg(e));
- druidErrorCount++;
+ errorCount++;
}
- return druidErrorCount;
+
+ return errorCount;
}
/*
* Cluster HBase storage quota
@@ -362,36 +354,21 @@ public class StorageQuotaService {
private int getFileStorageJson(List<Map> data, Long time) {
int errorCount = 0;
try {
- List<String> queryHdfsUrlList = Splitter.on(",").trimResults().omitEmptyStrings().splitToList(filesHosServer);
- long max = 0;
- long used = 0;
- long aggregate = 0;
- for (String queryHdfsUrl : queryHdfsUrlList) {
+ for (Map.Entry<String, String> entry : filesHosServer.entrySet()) {
try {
- queryHdfsUrl = Constant.HTTP.concat(queryHdfsUrl);
- StorageQuota hosStorage = getHBaseStorage(queryHdfsUrl);
- long maxSize = Long.parseLong(String.valueOf(hosStorage.getData().get("max_size")));
- long usedSize = Long.parseLong(String.valueOf(hosStorage.getData().get("used_size")));
- long aggregateSize = Long.parseLong(String.valueOf(hosStorage.getData().get("aggregate_size")));
- max += maxSize;
- used += usedSize;
- aggregate += aggregateSize;
+ Map files = new HashMap<>();
+ files.put("log_type", Constant.FILES);
+ files.put("time", time);
+ files.put("last_storage", getLastStorage(Constant.FILES));
+ files.put("data_center",entry.getKey());
+ files.putAll(getHBaseStorage(entry.getKey(),entry.getValue()));
+ data.add(files);
} catch (Exception e) {
- logger.error("file storage endpoint={},error={}", queryHdfsUrl, JobUtil.getErrorMsg(e));
- XxlJobLogger.log("file storage endpoint={},error={}", queryHdfsUrl, JobUtil.getErrorMsg(e));
+ logger.error("file storage endpoint={},error={}", entry.getValue(), JobUtil.getErrorMsg(e));
+ XxlJobLogger.log("file storage endpoint={},error={}", entry.getValue(), JobUtil.getErrorMsg(e));
errorCount++;
}
}
- if(errorCount!=queryHdfsUrlList.size()){
- Map files = new HashMap<>();
- files.put("log_type", Constant.FILES);
- files.put("time", time);
- files.put("last_storage", getLastStorage(Constant.FILES));
- files.put("max_size", max);
- files.put("used_size", used);
- files.put("aggregate_size", aggregate);
- data.add(files);
- }
} catch (Exception e) {
logger.error("file storage error {}", JobUtil.getErrorMsg(e));
XxlJobLogger.log("file storage error {}", JobUtil.getErrorMsg(e));
@@ -400,31 +377,6 @@ public class StorageQuotaService {
return errorCount;
}
- /**
- * Builds the JSON payload for the once-a-day ClickHouse run; special handling
- *
- * @return result JSON
- */
- public Map getDayJson() {
- Map all = new HashMap<>();
- List<Map> data = new ArrayList<>();
- //When the ClickHouse job is scheduled on the following day, this timestamp refers to the previous day's statistics
- Long date = System.currentTimeMillis()/1000 - 3600*12;
- int ckFailCount = getCKStorageJson(data, date);
-
- StorageQuota clickHouseDiff = getClickHouseDiff();
- if (clickHouseDiff.getStatus() != null) {
- XxlJobLogger.log("query clickhouse storage error, clickHouseDiff ={} ", clickHouseDiff.getStatus());
- ckFailCount++;
- }
- data.get(0).putAll(clickHouseDiff.getData());
-
- Map<String, Integer> status = new HashMap<>();
- status.put("trafficFailCount", ckFailCount);
- all.put("status", status);
- all.put("data", data);
- return all;
- }
/**
* Computes the difference; if the result is negative, 0 is written instead
@@ -442,12 +394,4 @@ public class StorageQuotaService {
}
}
- @Data
- class StorageQuota {
-
- private Map<String, Object> data;
-
- private Map<String, String> status;
- }
-
}
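For reference, getDiskJson now emits one data entry per data center instead of one aggregated entry per store type. A sketch of the payload shape as assembled above; the field keys come from the code, while the log_type strings (placeholders for Constant.TRAFFIC_LOGS, Constant.REPORT_AND_METRICS, Constant.FILES) and all numbers are invented:

    {
      "data": [
        {"log_type": "<TRAFFIC_LOGS>", "data_center": "Nur-sultan", "time": 1607652459,
         "last_storage": 1606982400, "used_size": 1073741824, "max_size": 2147483648},
        {"log_type": "<REPORT_AND_METRICS>", "data_center": "Nur-sultan", "time": 1607652459,
         "last_storage": 1606982400, "used_size": 536870912, "max_size": 1073741824, "aggregate_size": 1048576},
        {"log_type": "<FILES>", "data_center": "Nur-sultan", "time": 1607652459,
         "last_storage": 1606982400, "used_size": 268435456, "max_size": 536870912, "aggregate_size": 524288}
      ],
      "status": {"trafficFailCount": 0, "reportFailCount": 0, "fileFailCount": 0}
    }

Note that the ClickHouse entry carries aggregate_size only on the daily run (ckDay == true); Druid and HOS compute it on every run as the difference from the last used_size recorded in sys_storage_log for the same data_center, looking back at most STORAGE_LIMITATION minutes, then falling back to today's records, then to 0.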
diff --git a/galaxy-job-executor/src/main/resources/application-executor.yml b/galaxy-job-executor/src/main/resources/application-executor.yml
index 8db4bd9..a486c9a 100644
--- a/galaxy-job-executor/src/main/resources/application-executor.yml
+++ b/galaxy-job-executor/src/main/resources/application-executor.yml
@@ -8,14 +8,14 @@ spring:
storge:
## storage-quota file servers
files:
- hos-server: 192.168.40.223:9098,192.168.1.2:9098,192.168.44.12:9098 #HOS object storage; delete/purge jobs
+ hos-server: Nur-sultan|192.168.40.223:9098,Aktau|,Aktubinsk|,Almaty|,Atyrau|,Karaganda|,Kokshetau|,Kostanay|,Kyzylorda|,Pavlodar|,Semey|,Shymkent|,Taldykurgan|,Taraz|,Uralsk|,Ust-Kamenogorsk|,Zhezkazgan|
token: c21f969b5f03d33d43e04f8f136e7682
## storage-quota query: Druid
analytic:
- server: 192.168.40.203:8082
+ server: Nur-sultan|192.168.40.203:8082
## storage-quota query: ClickHouse
traffic:
- server: 192.168.40.203:8123
+ server: Nur-sultan|192.168.40.203:8123
datasource: tsg_galaxy_v3
username: default
password: ceiec2019
@@ -29,4 +29,4 @@ storge:
tables: system.tables
clusters: system.clusters
zookeeper:
- server: 192.168.40.203:2181
+ server: 192.168.40.203:2181
\ No newline at end of file
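Entries with an empty host such as Aktau| are parsed but discarded by StorgeConfig.getDCMap(), so with the values above only Nur-sultan is queried. A second data center becomes active once a host follows the pipe; for example (address borrowed from the test fixture below, remaining empty "DC|" entries unchanged):

    hos-server: Nur-sultan|192.168.40.223:9098,Aktau|192.168.1.2:9098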
diff --git a/galaxy-job-executor/src/test/java/com/mesalab/executor/test/StorageQuotaTest.java b/galaxy-job-executor/src/test/java/com/mesalab/executor/test/StorageQuotaTest.java
index 5b12263..6a04d2d 100644
--- a/galaxy-job-executor/src/test/java/com/mesalab/executor/test/StorageQuotaTest.java
+++ b/galaxy-job-executor/src/test/java/com/mesalab/executor/test/StorageQuotaTest.java
@@ -1,8 +1,12 @@
package com.mesalab.executor.test;
+import com.google.common.base.Splitter;
+import com.google.common.collect.ImmutableMap;
import com.mesalab.executor.GalaxyExecutorApplication;
import com.mesalab.executor.jobhandler.LogStorageQuotaJob;
import com.mesalab.executor.service.StorageQuotaService;
+import com.zdjizhi.utils.DateUtils;
+import com.zdjizhi.utils.StringUtil;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
@@ -10,25 +14,26 @@ import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;
import org.springframework.util.Assert;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.stream.Collectors;
+
@RunWith(SpringRunner.class)
@SpringBootTest(classes = { GalaxyExecutorApplication.class })
public class StorageQuotaTest {
-// @Autowired StorageQuotaService storageQuotaService;
-//
-// @Test public void storageQuotaServiceTest() {
-//
-// Map dayJson = storageQuotaService.getDayJson(); Assert.notEmpty(dayJson,
-// "clickhouse daily result is empty"); Map diskJson = storageQuotaService.getDiskJson();
-// Assert.notEmpty(diskJson, "failed to fetch storage quota");
-//
-// }
+ @Autowired StorageQuotaService storageQuotaService;
+
+ @Test public void storageQuotaServiceTest() {
+
+ Map diskJson = storageQuotaService.getDiskJson(false);
+ Assert.notEmpty(diskJson, "failed to fetch storage quota");
+
+ }
@Autowired
LogStorageQuotaJob logStorageQuotaJob;
-
- @Autowired
- StorageQuotaService storageQuotaService;
/**
* File purge. Use with caution.
@@ -54,5 +59,27 @@ public class StorageQuotaTest {
Long lastStorage = storageQuotaService.getLastStorage("Files");
Assert.notNull(lastStorage, "failed to fetch reference time");
}
-
+ @Test
+ public void testS() {
+ String filesHosServer ="Nur-sultan|192.168.40.223:9098,Aktau|192.168.1.2:9098,Aktubinsk|,Almaty|,Atyrau|,Karaganda|,Kokshetau|,Kostanay|,Kyzylorda|,Pavlodar|,Semey|,Shymkent|,Taldykurgan|,Taraz|,Uralsk|,Ust-Kamenogorsk|,Zhezkazgan|";
+
+// Splitter.on(",").splitToList(a).stream().map(x -> x.substring(x.indexOf("|")));
+ String b ="Nur-sultan|192.168.40.223:9098";
+// String collect = Splitter.on(",").splitToList(a).stream().map(x -> x.substring(x.indexOf("|") + 1)).filter(x-> StringUtil.isNotEmpty(x)).collect(Collectors.joining(","));
+ List<Map<String, String>> collect = Splitter.on(",").splitToList(filesHosServer).stream().map(x -> {
+ List<String> dc = Splitter.on("|").splitToList(x);
+ if (dc.size() == 2 && StringUtil.isNotBlank(dc.get(1))) {
+ return ImmutableMap.of(dc.get(0), dc.get(1));
+ }
+ return null;
+ }).filter(Objects::nonNull).collect(Collectors.toList());
+ System.err.println(collect);
+ }
+
+ @Test
+ public void test(){
+ String date = DateUtils.getDateOfYesterday("yyyyMMdd");
+ long yyyyMMdd = DateUtils.convertStringToTimestamp(date, "yyyyMMdd");
+ System.out.println(yyyyMMdd);
+ }
}
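The daily run's timestamp in getDiskJson(true) is yesterday 00:00 plus 12*3600 seconds, i.e. yesterday noon. A java.time equivalent of the conversion exercised in test() above, assuming DateUtils.convertStringToTimestamp returns epoch seconds as the surrounding arithmetic implies:

    // Yesterday at 12:00 local time, in epoch seconds (assumes local-time semantics,
    // matching DateUtils.convertStringToTimestamp("yyyyMMdd") + 12*3600 in getDiskJson(true))
    long timestamp = java.time.LocalDate.now()
            .minusDays(1)
            .atTime(12, 0)
            .atZone(java.time.ZoneId.systemDefault())
            .toEpochSecond();
    System.out.println(timestamp);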