diff options
| author | zhanghongqing <[email protected]> | 2020-12-11 10:07:39 +0800 |
|---|---|---|
| committer | zhanghongqing <[email protected]> | 2020-12-11 10:07:39 +0800 |
| commit | d7054c2d8cfce5a1803bf5d696d80ac8df48d180 (patch) | |
| tree | 3b43c28a3ecab48ba530730a17398d96bc00bd7b | |
| parent | f1a873beb01d89619f0e9b0d13174e0a2d01db86 (diff) | |
修改文件存储配额查询任务 v11-rc3
5 files changed, 225 insertions, 217 deletions
diff --git a/galaxy-job-executor/src/main/java/com/mesalab/executor/core/config/StorgeConfig.java b/galaxy-job-executor/src/main/java/com/mesalab/executor/core/config/StorgeConfig.java index cb92b92..5e05774 100644 --- a/galaxy-job-executor/src/main/java/com/mesalab/executor/core/config/StorgeConfig.java +++ b/galaxy-job-executor/src/main/java/com/mesalab/executor/core/config/StorgeConfig.java @@ -1,9 +1,15 @@ package com.mesalab.executor.core.config; +import com.google.common.base.Splitter; +import com.zdjizhi.utils.StringUtil; +import lombok.Data; import org.springframework.beans.factory.annotation.Value; import org.springframework.context.annotation.Configuration; -import lombok.Data; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.stream.Collectors; @Data @Configuration @@ -45,5 +51,43 @@ public class StorgeConfig { @Value("${storge.files.token}") private String filesToken; //登录验证 + public String getFilesHosServer() { + return getServer(); + } + + private String getServer() { + return getServer(filesHosServer); + } + + public String getTrafficServer() { + return getServer(trafficServer); + } + public String getAnalyticServer() { + return getServer(analyticServer); + } + + public Map<String,String> getFilesDataCenter() { + return getDCMap(filesHosServer); + } + + public Map<String,String> getTrafficDataCenter() { + return getDCMap(trafficServer); + } + public Map<String,String> getAnalyticDataCenter() { + return getDCMap(analyticServer); + } + private String getServer(String dcStr) { + return Splitter.on(",").trimResults().splitToList(dcStr).stream().map(x -> x.substring(x.indexOf("|") + 1)).filter(x -> StringUtil.isNotEmpty(x)).collect(Collectors.joining(",")); + } + private Map<String, String> getDCMap(String dcStr) { + Map<String, String> map = new HashMap(); + Splitter.on(",").trimResults().splitToList(dcStr).stream().forEach(x -> { + List<String> dc = Splitter.on("|").trimResults().splitToList(x); + if (dc.size() == 2 && StringUtil.isNotBlank(dc.get(1))) { + map.put(dc.get(0), dc.get(1)); + } + }); + return map; + } } diff --git a/galaxy-job-executor/src/main/java/com/mesalab/executor/jobhandler/LogStorageQuotaJob.java b/galaxy-job-executor/src/main/java/com/mesalab/executor/jobhandler/LogStorageQuotaJob.java index 9c7e9e8..6390597 100644 --- a/galaxy-job-executor/src/main/java/com/mesalab/executor/jobhandler/LogStorageQuotaJob.java +++ b/galaxy-job-executor/src/main/java/com/mesalab/executor/jobhandler/LogStorageQuotaJob.java @@ -569,14 +569,7 @@ public class LogStorageQuotaJob { logger.error("params parser error , params is {}", params); return IJobHandler.FAIL; } - //调用查询方法单机,集群 - Map resultMap = null; - - if ("false".equals(paramsMap.get("ckDayGrowth"))) { - resultMap = storageQuotaService.getDiskJson(); - } else { - resultMap = storageQuotaService.getDayJson(); - } + Map resultMap = storageQuotaService.getDiskJson("true".equals(paramsMap.get("ckDayGrowth"))); //发送到kafka new KafkaUtils().sendMessage(String.valueOf(paramsMap.get("topic")), (List<Object>) resultMap.get("data")); diff --git a/galaxy-job-executor/src/main/java/com/mesalab/executor/service/StorageQuotaService.java b/galaxy-job-executor/src/main/java/com/mesalab/executor/service/StorageQuotaService.java index 2733867..806b9c1 100644 --- a/galaxy-job-executor/src/main/java/com/mesalab/executor/service/StorageQuotaService.java +++ b/galaxy-job-executor/src/main/java/com/mesalab/executor/service/StorageQuotaService.java @@ -1,8 +1,5 @@ package com.mesalab.executor.service; -import com.google.common.base.Joiner; -import com.google.common.base.Splitter; -import com.google.common.collect.ImmutableMap; import com.google.common.collect.Maps; import com.mesalab.executor.core.config.StorgeConfig; import com.mesalab.executor.core.utils.*; @@ -11,7 +8,6 @@ import com.mesalab.executor.pojo.HosSpace; import com.xxl.job.core.log.XxlJobLogger; import com.zdjizhi.utils.DateUtils; import com.zdjizhi.utils.JsonMapper; -import lombok.Data; import org.apache.http.Header; import org.apache.http.message.BasicHeader; import org.slf4j.Logger; @@ -22,7 +18,10 @@ import org.springframework.http.HttpHeaders; import org.springframework.stereotype.Service; import org.springframework.util.ObjectUtils; -import java.util.*; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; /** * 存储配额获取指标类 组合 原始日志结果对象,统计日志结果对象,文件系统结果对象 @@ -40,17 +39,18 @@ public class StorageQuotaService { /** * Druid rest地址 */ - private static String queryDruidUrl; + private static String druidServer; + private static Map<String,String> druidDataCenter; /** * ClickHouse rest地址 */ - private static String queryClickHouseUrl; + private static Map<String,String> clickHouseServer; private static String database; /** * hos地址 */ - private static String filesHosServer; + private static Map<String,String> filesHosServer; private static String filesToken; private static String trafficUserName; @@ -60,6 +60,8 @@ public class StorageQuotaService { private static String systemDisks; private static final String ZOOKEEPER_STORAGE_PATH = "/storage/worker/"; private static final String FILE_STORAGE_PATH = "/admin/diskspace"; + private static final String DRUID_PATH = "/druid/v2/sql"; + private static final int STORAGE_LIMITATION = 60; //查询历史数据时允许的最大间隔60 MINUTE @Value("${zookeeper.server}") private String zookeeperServer; @@ -69,64 +71,59 @@ public class StorageQuotaService { database = config.getTrafficDatasource(); trafficUserName = config.getTrafficUsername(); trafficPassword = config.getTrafficPassword(); - queryClickHouseUrl = Joiner.on("").join(Constant.HTTP, config.getTrafficServer()); + clickHouseServer = config.getTrafficDataCenter(); systemPartsCluster = config.getSystemPartsCluster(); systemDisks = config.getSystemDisks(); - queryDruidUrl = Joiner.on("").join(Constant.HTTP, config.getAnalyticServer(), "/druid/v2/sql"); - filesHosServer = config.getFilesHosServer(); + druidServer = config.getAnalyticServer(); + druidDataCenter = config.getAnalyticDataCenter(); + + filesHosServer = config.getFilesDataCenter(); filesToken = config.getFilesToken(); } /** * 用于获取 ClickHouse 当前存储大小 若获取 failquerydruid内最新的值替补当前值 */ - private StorageQuota getClickHouseCurr() { + private Long getClickHouseCurr(String point) { Map<String, Object> deleteParamMap = getDeleteSource(); - StorageQuota storageQuota = new StorageQuota(); String currSql = "SELECT SUM(`bytes_on_disk`) FROM " + systemPartsCluster + " WHERE database = '" + database + "';"; deleteParamMap.put("query", currSql); - String currResult = HttpClientUtils.httpGet(HttpClientUtils.getUrlWithParams(queryClickHouseUrl, deleteParamMap)); + String currResult = HttpClientUtils.httpGet(HttpClientUtils.getUrlWithParams(Constant.HTTP.concat(point), deleteParamMap)); Long result = Long.valueOf(currResult.trim()); - - storageQuota.setData(ImmutableMap.of("used_size", result)); - logger.info("query clickhouse used_size success, {}", storageQuota); - return storageQuota; + logger.info("query clickhouse success, used_size={}", result); + return result; } /** * 用于获取ClickHouse最大存储大小 若获取 failquerydruid内最新的值替补当前值 */ - private StorageQuota getClickHouseMax() { + private Long getClickHouseMax(String point) { Map<String, Object> deleteParamMap = getDeleteSource(); - StorageQuota storageQuota = new StorageQuota(); - String maxSql = "SELECT SUM(`total_space`) FROM " + systemDisks + ";"; + String maxSql = "SELECT SUM(`total_space`) FROM ".concat(systemDisks).concat(";"); deleteParamMap.put("query", maxSql); String maxResult = HttpClientUtils - .httpGet(HttpClientUtils.getUrlWithParams(queryClickHouseUrl, deleteParamMap)); + .httpGet(HttpClientUtils.getUrlWithParams(Constant.HTTP.concat(point), deleteParamMap)); long result = Long.parseLong(maxResult.trim()); - storageQuota.setData(ImmutableMap.of("max_size", result)); - logger.info("query clickhouse max_size success ,{}", storageQuota); - return storageQuota; + logger.info("query clickhouse max_size success ,{}", result); + return result; } /** * 用于获取ClickHouse 差值 若获取 fail直接写入0 */ - private StorageQuota getClickHouseDiff() { + private Long getClickHouseDiff(String point) { String date = DateUtils.getDateOfYesterday("yyyyMMdd"); Map<String, Object> deleteParamMap = getDeleteSource(); - StorageQuota storageQuota = new StorageQuota(); - String diffSql = "SELECT SUM(bytes_on_disk) FROM " + systemPartsCluster + " WHERE database = '" + database - + "' AND partition = '" + date + "';"; + String diffSql = "SELECT SUM(bytes_on_disk) FROM ".concat(systemPartsCluster).concat(" WHERE database = '" ).concat(database) + .concat("' AND partition = '").concat(date).concat("';"); deleteParamMap.put("query", diffSql); String diffResult = HttpClientUtils - .httpGet(HttpClientUtils.getUrlWithParams(queryClickHouseUrl, deleteParamMap)); + .httpGet(HttpClientUtils.getUrlWithParams(Constant.HTTP.concat(point), deleteParamMap)); long result = Long.parseLong(diffResult.trim()); - storageQuota.setData(ImmutableMap.of("aggregate_size", result)); - logger.info("query clickhouse traffic_aggregate success,{}", storageQuota); - return storageQuota; + logger.info("query clickhouse success aggregate_size={}", result); + return result; } //========================Druid============================== @@ -134,50 +131,37 @@ public class StorageQuotaService { /** * 获取Druid当前存储大小 若获取 fail直接补0,Druid本身无法提供服务无需再query上次值 */ - private StorageQuota getDruidCurr() { - String currSql = "{\"query\":\"SELECT SUM(curr_size) AS curr_size FROM sys.servers WHERE server_type = 'historical'\",\"context\":{\"skipEmptyBuckets\":\"false\"},\"resultFormat\":\"object\"}"; - StorageQuota storageQuota = new StorageQuota(); - String currResult = HttpClientUtils.httpPost(queryDruidUrl, currSql); + private Long getDruidUsed(String point) { + String currSql = "{\"query\":\"SELECT SUM(size) AS used_size FROM sys.segments WHERE datasource NOT LIKE '%hot%' and is_available=1\",\"context\":{\"skipEmptyBuckets\":\"false\"},\"resultFormat\":\"object\"}"; + String currResult = HttpClientUtils.httpPost(Constant.HTTP.concat(point).concat(DRUID_PATH), currSql); List<Map<String, Object>> list = (List) JsonMapper.fromJsonString(currResult, List.class); - Long currSize = Long.valueOf(String.valueOf(list.get(0).get("curr_size"))); - - storageQuota.setData(ImmutableMap.of("used_size", currSize)); - logger.info("query druid used_size success,{}", storageQuota); - return storageQuota; - } + if(list!=null&&list.size()==0){ + return 0L; + } + Long currSize = Long.valueOf(String.valueOf(list.get(0).get("used_size"))); - /** - * 获取Druid过去30秒增量 若获取 fail直接补0,Druid本身无法提供服务无需再query上次值 - */ - private StorageQuota getDruidDiff(Long currSize) { - StorageQuota storageQuota = new StorageQuota(); - Long before = getCurrBefore(Constant.REPORT_AND_METRICS); - long diff = getDiffNum(currSize, before); - storageQuota.setData(ImmutableMap.of("aggregate_size", diff)); - logger.info("query druid aggregate_size success,{}", storageQuota); - return storageQuota; + logger.info("query druid used_size success,{}", currSize); + return currSize; } /** * 获取Druid最大存储大小 若获取 fail直接补0,Druid本身无法提供服务无需再query上次值 */ - private StorageQuota getDruidMax() { + private Long getDruidMax(String point) { String maxSql = "{\"query\":\"SELECT SUM(max_size) AS max_size FROM sys.servers WHERE server_type = 'historical'\",\"context\":{\"skipEmptyBuckets\":\"false\"},\"resultFormat\":\"object\"}"; - StorageQuota storageQuota = new StorageQuota(); - String maxResult = HttpClientUtils.httpPost(queryDruidUrl, maxSql); + String maxResult = HttpClientUtils.httpPost(Constant.HTTP.concat(point).concat(DRUID_PATH), maxSql); List<Map<String, Long>> list = (List) JsonMapper.fromJsonString(maxResult, List.class); Long maxSize = list.get(0).get("max_size"); - storageQuota.setData(ImmutableMap.of("max_size", maxSize)); - logger.info("query druid max_size success,{}", storageQuota); - return storageQuota; + + logger.info("query druid max_size success,{}", maxSize); + return maxSize; } /** * 获取hos前存储大小 若获取 failquerydruid内最新的值替补当前值 */ - private StorageQuota getHBaseStorage(String queryHosUrl) { - StorageQuota storageQuota = new StorageQuota(); + private Map getHBaseStorage(String key,String point) { Header[] headers = {new BasicHeader(Constant.TOKEN, filesToken), new BasicHeader(HttpHeaders.CONTENT_TYPE, Constant.TEXT_XML)}; - String result = HttpClientUtils.httpGet(queryHosUrl.concat(FILE_STORAGE_PATH), headers); + String result = HttpClientUtils.httpGet(Constant.HTTP.concat(point).concat(FILE_STORAGE_PATH), headers); if("-1".equals(result)){ throw new RuntimeException("hos server error"); } @@ -192,11 +176,10 @@ public class StorageQuotaService { Map<String, Object> data = new HashMap<>(); data.put("max_size", hosCapacity); data.put("used_size", hosUsed); - data.put("aggregate_size", getDiffNum(hosUsed, getCurrBefore(Constant.FILES))); + data.put("aggregate_size", getDiffNum(hosUsed, getCurrBefore(Constant.FILES,key))); - storageQuota.setData(data); - logger.info("query file storage success,{}", storageQuota); - return storageQuota; + logger.info("query file storage success,{}", data); + return data; } //=======================工具类方法=============================== @@ -204,35 +187,33 @@ public class StorageQuotaService { * 用于通过druid获取上次对应类型的Curr值 * * @param logType 统计类型 - * @return 上次的值 + * @return 上次的值,如果不是当天,也不是在00:00:00则返回0 + * //如果历史记录时间比现在少于1个小时,写0,如果最近一小时为0则查今天的 */ - private Long getCurrBefore(String logType) { + private Long getCurrBefore(String logType, String key) { String currSql = "{\"query\":\"SELECT used_size FROM sys_storage_log WHERE log_type = '" + logType - + "' ORDER BY __time DESC LIMIT 1\",\"context\":{\"skipEmptyBuckets\":\"false\"},\"resultFormat\":\"object\"}"; - String currResult = HttpClientUtils.httpPost(queryDruidUrl, currSql); + + "' and data_center = '"+ key +"' and __time >= CURRENT_TIMESTAMP - INTERVAL '"+ STORAGE_LIMITATION +"' MINUTE ORDER BY __time DESC LIMIT 1\",\"context\":{\"skipEmptyBuckets\":\"false\"},\"resultFormat\":\"object\"}"; + String currResult = HttpClientUtils.httpPost(Constant.HTTP.concat(druidServer).concat(DRUID_PATH), currSql); List<Map> list = (List) JsonMapper.fromJsonString(currResult, List.class); - Long currSize = Long.valueOf(String.valueOf(list.get(0).get("used_size"))); - logger.info("query {} history used_size success,{}",logType, currSize); - return currSize; + if(list!=null&&list.size()==0){ + String currDaySql = "{\"query\":\"SELECT used_size FROM sys_storage_log WHERE log_type = '" + logType + + "' and data_center = '"+ key +"' and __time >= CURRENT_DATE ORDER BY __time DESC LIMIT 1\",\"context\":{\"skipEmptyBuckets\":\"false\"},\"resultFormat\":\"object\"}"; + String currDayResult = HttpClientUtils.httpPost(Constant.HTTP.concat(druidServer).concat(DRUID_PATH), currDaySql); + list = (List) JsonMapper.fromJsonString(currDayResult, List.class); + } + if(list!=null&&list.size()==0){ + return 0L; + } + Object used = list.get(0).get("used_size"); + if(ObjectUtils.isEmpty(used)){ + return 0L; + } + Long historyUsed = Long.valueOf(String.valueOf(used)); + logger.info("query {} history used_size success,{}",logType, historyUsed); + return historyUsed; } /** - * 用于通过druid获取上次对应类型的Max值 - * - * @param logType 统计类型 - * @return 上次的值 - */ -/* private Long getMaxBefore(String logType) { - String maxSql = "{\"query\":\"SELECT max_size FROM sys_storage_log WHERE log_type = '" + logType - + "' ORDER BY __time DESC LIMIT 1\",\"context\":{\"skipEmptyBuckets\":\"false\"},\"resultFormat\":\"object\"}"; - String maxResult = HttpClientUtils.httpPost(queryDruidUrl, maxSql); - List<Map> list = (List<Map>) JsonMapper.fromJsonString(maxResult, List.class); - Long currSize = Long.valueOf(String.valueOf(list.get(0).get("max_size"))); - logger.info("query {} max_size history success,{}", logType,currSize); - return currSize; - }*/ - - /** * @param node * @return 自定义的标准时间 last_storage, * 存在zookeeper 节点/storage/worker/+node @@ -287,74 +268,85 @@ public class StorageQuotaService { * * @return 结果json */ - public Map<String, Map> getDiskJson() { + public Map<String, Map> getDiskJson(boolean ckDay) { Map all = new HashMap<>(); List<Map> data = new ArrayList<>(); Long now = System.currentTimeMillis() / 1000; Map<String, Integer> status = new HashMap<>(); - status.put("trafficFailCount", getCKStorageJson(data, now)); - status.put("reportFailCount", getDruidStorageJson(data, now)); - status.put("fileFailCount", getFileStorageJson(data, now)); - + if(ckDay){ + //当clickhouse任务设置在第二天时,这个时间点为前一天的统计数据 + long time = 12*3600L; + long timestamp = DateUtils.convertStringToTimestamp(DateUtils.getDateOfYesterday("yyyyMMdd"), "yyyyMMdd")+time; + status.put("trafficFailCount", getCKStorageJson(data, timestamp,true)); + } else{ + status.put("trafficFailCount", getCKStorageJson(data, now,false)); + status.put("reportFailCount", getDruidStorageJson(data, now)); + status.put("fileFailCount", getFileStorageJson(data, now)); + } all.put("data", data); all.put("status", status); - return all; } - - private int getCKStorageJson(List<Map> data, Long time) { - int ckErrorCount = 0; + private int getCKStorageJson(List<Map> data, Long time, boolean day) { + int errorCount = 0; try { - StorageQuota clickHouseCurr = getClickHouseCurr(); - StorageQuota clickHouseMax = getClickHouseMax(); - - if (clickHouseCurr.getStatus() != null || clickHouseMax.getStatus() != null) { - XxlJobLogger.log("clickhouse storage error clickHouseCurr ={} , clickHouseMax={}", clickHouseCurr.getStatus(), clickHouseMax.getStatus()); - ckErrorCount++; + for (Map.Entry<String, String> dc : clickHouseServer.entrySet()) { + try { + Map<String,Object> traffic = new HashMap<>(); + traffic.put("log_type", Constant.TRAFFIC_LOGS); + traffic.put("last_storage", getLastStorage(Constant.ZK_TRAFFIC_LOGS)); + traffic.put("data_center",dc.getKey()); + traffic.put("time", time); + traffic.put("used_size",getClickHouseCurr(dc.getValue())); + traffic.put("max_size",getClickHouseMax(dc.getValue())); + if(day){ + traffic.put("aggregate_size",getClickHouseDiff(dc.getValue())); + } + data.add(traffic); + } catch (Exception e) { + logger.error("clickhouse storage error {}", JobUtil.getErrorMsg(e)); + XxlJobLogger.log("clickhouse storage error {}", JobUtil.getErrorMsg(e)); + errorCount++; + } } - - Map<String,Object> traffic = new HashMap<>(); - traffic.put("log_type", Constant.TRAFFIC_LOGS); - traffic.put("time", time); - traffic.put("last_storage", getLastStorage(Constant.ZK_TRAFFIC_LOGS)); - traffic.putAll(clickHouseCurr.getData()); - traffic.putAll(clickHouseMax.getData()); - data.add(traffic); } catch (Exception e) { logger.error("clickhouse storage error {}", JobUtil.getErrorMsg(e)); XxlJobLogger.log("clickhouse storage error {}", JobUtil.getErrorMsg(e)); - ckErrorCount++; + errorCount++; } - return ckErrorCount; + return errorCount; } private int getDruidStorageJson(List<Map> data, Long time) { - int druidErrorCount = 0; + int errorCount = 0; try { - StorageQuota druidCurr = getDruidCurr(); - StorageQuota druidDiff = getDruidDiff(Long.valueOf(String.valueOf(druidCurr.getData().get("used_size")))); - StorageQuota druidMax = getDruidMax(); - druidErrorCount = 0; - if (druidCurr.getStatus() != null || druidDiff.getStatus() != null || druidMax.getStatus() != null) { - XxlJobLogger.log("druid storage error druidCurr ={} , druidDiff={}, druidMax={}", druidCurr.getStatus(), druidDiff.getStatus(), druidMax.getStatus()); - druidErrorCount++; + for (Map.Entry<String, String> dc : druidDataCenter.entrySet()) { + try { + Map metrics = new HashMap<>(); + metrics.put("log_type", Constant.REPORT_AND_METRICS); + metrics.put("time", time); + metrics.put("last_storage", getLastStorage(Constant.ZK_REPORT_AND_METRICS)); + metrics.put("data_center", dc.getKey()); + + metrics.put("max_size",getDruidMax(dc.getValue())); + Long druidUsed = getDruidUsed(dc.getValue()); + metrics.put("used_size",druidUsed); + metrics.put("aggregate_size", getDiffNum(druidUsed, getCurrBefore(Constant.REPORT_AND_METRICS,dc.getKey()))); + data.add(metrics); + } catch (Exception e) { + logger.error("druid storage endpoint={}, error {}",dc.getValue(), JobUtil.getErrorMsg(e)); + XxlJobLogger.log("druid storage endpoint={},error {}",dc.getValue(), JobUtil.getErrorMsg(e)); + errorCount++; + } } - - Map metrics = new HashMap<>(); - metrics.put("log_type", Constant.REPORT_AND_METRICS); - metrics.put("time", time); - metrics.put("last_storage", getLastStorage(Constant.ZK_REPORT_AND_METRICS)); - metrics.putAll(druidCurr.getData()); - metrics.putAll(druidDiff.getData()); - metrics.putAll(druidMax.getData()); - data.add(metrics); } catch (Exception e) { logger.error("druid storage error {}", JobUtil.getErrorMsg(e)); XxlJobLogger.log("druid storage error {}", JobUtil.getErrorMsg(e)); - druidErrorCount++; + errorCount++; } - return druidErrorCount; + + return errorCount; } /* * 集群hbase存储配额 @@ -362,36 +354,21 @@ public class StorageQuotaService { private int getFileStorageJson(List<Map> data, Long time) { int errorCount = 0; try { - List<String> queryHdfsUrlList = Splitter.on(",").trimResults().omitEmptyStrings().splitToList(filesHosServer); - long max = 0; - long used = 0; - long aggregate = 0; - for (String queryHdfsUrl : queryHdfsUrlList) { + for (Map.Entry<String, String> entry : filesHosServer.entrySet()) { try { - queryHdfsUrl = Constant.HTTP.concat(queryHdfsUrl); - StorageQuota hosStorage = getHBaseStorage(queryHdfsUrl); - long maxSize = Long.parseLong(String.valueOf(hosStorage.getData().get("max_size"))); - long usedSize = Long.parseLong(String.valueOf(hosStorage.getData().get("used_size"))); - long aggregateSize = Long.parseLong(String.valueOf(hosStorage.getData().get("aggregate_size"))); - max += maxSize; - used += usedSize; - aggregate += aggregateSize; + Map files = new HashMap<>(); + files.put("log_type", Constant.FILES); + files.put("time", time); + files.put("last_storage", getLastStorage(Constant.FILES)); + files.put("data_center",entry.getKey()); + files.putAll(getHBaseStorage(entry.getKey(),entry.getValue())); + data.add(files); } catch (Exception e) { - logger.error("file storage endpoint={},error={}", queryHdfsUrl, JobUtil.getErrorMsg(e)); - XxlJobLogger.log("file storage endpoint={},error={}", queryHdfsUrl, JobUtil.getErrorMsg(e)); + logger.error("file storage endpoint={},error={}", entry.getValue(), JobUtil.getErrorMsg(e)); + XxlJobLogger.log("file storage endpoint={},error={}", entry.getValue(), JobUtil.getErrorMsg(e)); errorCount++; } } - if(errorCount!=queryHdfsUrlList.size()){ - Map files = new HashMap<>(); - files.put("log_type", Constant.FILES); - files.put("time", time); - files.put("last_storage", getLastStorage(Constant.FILES)); - files.put("max_size", max); - files.put("used_size", used); - files.put("aggregate_size", aggregate); - data.add(files); - } } catch (Exception e) { logger.error("file storage error {}", JobUtil.getErrorMsg(e)); XxlJobLogger.log("file storage error {}", JobUtil.getErrorMsg(e)); @@ -400,31 +377,6 @@ public class StorageQuotaService { return errorCount; } - /** - * 用于组合ClickHouse一天执行一次的JSON串 特殊处理 - * - * @return 结果json - */ - public Map getDayJson() { - Map all = new HashMap<>(); - List<Map> data = new ArrayList<>(); - //当clickhouse任务设置在第二天时,这个时间点为前一天的统计数据 - Long date = System.currentTimeMillis()/1000 - 3600*12; - int ckFailCount = getCKStorageJson(data, date); - - StorageQuota clickHouseDiff = getClickHouseDiff(); - if (clickHouseDiff.getStatus() != null) { - XxlJobLogger.log("query clickhouse storage error, clickHouseDiff ={} ", clickHouseDiff.getStatus()); - ckFailCount++; - } - data.get(0).putAll(clickHouseDiff.getData()); - - Map<String, Integer> status = new HashMap<>(); - status.put("trafficFailCount", ckFailCount); - all.put("status", status); - all.put("data", data); - return all; - } /** * 获取差值计算,若为负数则填写0 @@ -442,12 +394,4 @@ public class StorageQuotaService { } } - @Data - class StorageQuota { - - private Map<String, Object> data; - - private Map<String, String> status; - } - } diff --git a/galaxy-job-executor/src/main/resources/application-executor.yml b/galaxy-job-executor/src/main/resources/application-executor.yml index 8db4bd9..a486c9a 100644 --- a/galaxy-job-executor/src/main/resources/application-executor.yml +++ b/galaxy-job-executor/src/main/resources/application-executor.yml @@ -8,14 +8,14 @@ spring: storge: ## 存储配额文件服务器 files: - hos-server: 192.168.40.223:9098,192.168.1.2:9098,192.168.44.12:9098 #hos对象存储,删除清库任务 + hos-server: Nur-sultan|192.168.40.223:9098,Aktau|,Aktubinsk|,Almaty|,Atyrau|,Karaganda|,Kokshetau|,Kostanay|,Kyzylorda|,Pavlodar|,Semey|,Shymkent|,Taldykurgan|,Taraz,Uralsk|,Ust-Kamenogorsk|,Zhezkazgan| token: c21f969b5f03d33d43e04f8f136e7682 ## 存储配额查询druid analytic: - server: 192.168.40.203:8082 + server: Nur-sultan|192.168.40.203:8082 ## 存储配额查询 clickhouse traffic: - server: 192.168.40.203:8123 + server: Nur-sultan|192.168.40.203:8123 datasource: tsg_galaxy_v3 username: default password: ceiec2019 @@ -29,4 +29,4 @@ storge: tables: system.tables clusters: system.clusters zookeeper: - server: 192.168.40.203:2181 + server: 192.168.40.203:2181
\ No newline at end of file diff --git a/galaxy-job-executor/src/test/java/com/mesalab/executor/test/StorageQuotaTest.java b/galaxy-job-executor/src/test/java/com/mesalab/executor/test/StorageQuotaTest.java index 5b12263..6a04d2d 100644 --- a/galaxy-job-executor/src/test/java/com/mesalab/executor/test/StorageQuotaTest.java +++ b/galaxy-job-executor/src/test/java/com/mesalab/executor/test/StorageQuotaTest.java @@ -1,8 +1,12 @@ package com.mesalab.executor.test; +import com.google.common.base.Splitter; +import com.google.common.collect.ImmutableMap; import com.mesalab.executor.GalaxyExecutorApplication; import com.mesalab.executor.jobhandler.LogStorageQuotaJob; import com.mesalab.executor.service.StorageQuotaService; +import com.zdjizhi.utils.DateUtils; +import com.zdjizhi.utils.StringUtil; import org.junit.Test; import org.junit.runner.RunWith; import org.springframework.beans.factory.annotation.Autowired; @@ -10,25 +14,26 @@ import org.springframework.boot.test.context.SpringBootTest; import org.springframework.test.context.junit4.SpringRunner; import org.springframework.util.Assert; +import java.util.List; +import java.util.Map; +import java.util.Objects; +import java.util.stream.Collectors; + @RunWith(SpringRunner.class) @SpringBootTest(classes = { GalaxyExecutorApplication.class }) public class StorageQuotaTest { -// @Autowired StorageQuotaService storageQuotaService; -// -// @Test public void storageQuotaServiceTest() { -// -// Map dayJson = storageQuotaService.getDayJson(); Assert.notEmpty(dayJson, -// "clickhouse 一天的为空"); Map diskJson = storageQuotaService.getDiskJson(); -// Assert.notEmpty(diskJson, "存储配额获取失败"); -// -// } + @Autowired StorageQuotaService storageQuotaService; + + @Test public void storageQuotaServiceTest() { + + Map diskJson = storageQuotaService.getDiskJson(false); + Assert.notEmpty(diskJson, "存储配额获取失败"); + + } @Autowired LogStorageQuotaJob logStorageQuotaJob; - - @Autowired - StorageQuotaService storageQuotaService; /** * 文件清库 慎 @@ -54,5 +59,27 @@ public class StorageQuotaTest { Long lastStorage = storageQuotaService.getLastStorage("Files"); Assert.notNull(lastStorage, "获取标准时间失败"); } - + @Test + public void testS() { + String filesHosServer ="Nur-sultan|192.168.40.223:9098,Aktau|192.168.1.2:9098,Aktubinsk|,Almaty|,Atyrau|,Karaganda|,Kokshetau|,Kostanay|,Kyzylorda|,Pavlodar|,Semey|,Shymkent|,Taldykurgan|,Taraz|,Uralsk|,Ust-Kamenogorsk|,Zhezkazgan|"; + +// Splitter.on(",").splitToList(a).stream().map(x -> x.substring(x.indexOf("|"))); + String b ="Nur-sultan|192.168.40.223:9098"; +// String collect = Splitter.on(",").splitToList(a).stream().map(x -> x.substring(x.indexOf("|") + 1)).filter(x-> StringUtil.isNotEmpty(x)).collect(Collectors.joining(",")); + List collect = Splitter.on(",").splitToList(filesHosServer).stream().map(x->{ + List<String> dc = Splitter.on("|").splitToList(x); + if (dc.size()==2&&StringUtil.isNotBlank(dc.get(1))){ + return ImmutableMap.of(dc.get(0),dc.get(1)); + } + return null; + }).filter(Objects::nonNull).collect(Collectors.toList()); + System.err.println(collect); + } + + @Test + public void test(){ + String date = DateUtils.getDateOfYesterday("yyyyMMdd"); + long yyyyMMdd = DateUtils.convertStringToTimestamp(date, "yyyyMMdd"); + System.out.println(yyyyMMdd); + } } |
