diff options
| author | zhanghongqing <[email protected]> | 2024-07-01 18:11:41 +0800 |
|---|---|---|
| committer | zhanghongqing <[email protected]> | 2024-07-01 18:11:41 +0800 |
| commit | 43fe8d22b16737fbceab6875ec3d30210e4908c7 (patch) | |
| tree | 43b53f72d17d0d19a4312242dddbb7960468b447 | |
| parent | dee95eb5ed7cf292bd334c662971863afe07ebe1 (diff) | |
[新增][日志删除] 增加删除日志任务delete_old_log TSG-21553
8 files changed, 574 insertions, 857 deletions
diff --git a/galaxy-job-executor/src/main/java/com/mesalab/executor/core/utils/Constant.java b/galaxy-job-executor/src/main/java/com/mesalab/executor/core/utils/Constant.java index 30c59e5..b866c2a 100644 --- a/galaxy-job-executor/src/main/java/com/mesalab/executor/core/utils/Constant.java +++ b/galaxy-job-executor/src/main/java/com/mesalab/executor/core/utils/Constant.java @@ -7,11 +7,13 @@ public class Constant { public static final String REPORT_AND_METRICS = "Report and Metrics"; public static final String TRAFFIC_LOGS = "Traffic Logs"; public static final String METRICS = "Metrics"; + public static final String MAX_DAYS = "maxDays"; // zookeeper /path+node public static final String ZK_TRAFFIC_LOGS = "Traffic-Logs"; public static final String ZK_REPORT_AND_METRICS = "Report-and-Metrics"; public static final String ZOOKEEPER_STORAGE_PATH = "/storage/worker/"; + public static final String ZOOKEEPER_STORAGE_CLEAR_TIME = "clearTime"; public static final String TOKEN = "Token"; public static final String TEXT_XML = "text/xml"; diff --git a/galaxy-job-executor/src/main/java/com/mesalab/executor/jobhandler/LogStorageQuotaJob.java b/galaxy-job-executor/src/main/java/com/mesalab/executor/jobhandler/LogStorageQuotaJob.java index 7aa5670..e7f85a6 100644 --- a/galaxy-job-executor/src/main/java/com/mesalab/executor/jobhandler/LogStorageQuotaJob.java +++ b/galaxy-job-executor/src/main/java/com/mesalab/executor/jobhandler/LogStorageQuotaJob.java @@ -2,6 +2,7 @@ package com.mesalab.executor.jobhandler; import cn.hutool.core.net.url.UrlBuilder; import cn.hutool.core.thread.ThreadUtil; +import cn.hutool.core.util.StrUtil; import cn.hutool.log.Log; import com.geedgenetworks.utils.DateUtils; import com.geedgenetworks.utils.JsonMapper; @@ -15,20 +16,19 @@ import com.mesalab.executor.core.utils.*; import com.mesalab.executor.exception.BusinessException; import com.mesalab.executor.pojo.LifecycleConfiguration; import com.mesalab.executor.pojo.ListAllMyBucketsResult; -import com.mesalab.executor.service.StorageQuotaService; import com.xxl.job.core.biz.model.ReturnT; import com.xxl.job.core.handler.IJobHandler; import com.xxl.job.core.handler.annotation.XxlJob; import com.xxl.job.core.log.XxlJobLogger; import org.apache.commons.lang3.ObjectUtils; +import org.apache.commons.lang3.StringUtils; import org.apache.http.Header; import org.apache.http.message.BasicHeader; -import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.beans.factory.annotation.Value; import org.springframework.http.HttpHeaders; import org.springframework.stereotype.Component; import java.nio.charset.StandardCharsets; -import java.util.ArrayList; import java.util.List; import java.util.Map; import java.util.Objects; @@ -39,90 +39,31 @@ public class LogStorageQuotaJob { private Log logger = Log.get(); private static final String analyticUrl = "/druid/v2/sql"; - private static final String trafficPort = "8123"; private static final String ck_cluster = "ck_cluster"; private static final String max_days = "maxDays"; - @Autowired - StorageQuotaService storageQuotaService; + @Value("${zookeeper.server}") + private String zookeeperServer; - private StorgeConfig deletionConfig = (StorgeConfig)SpringContextUtil.getBean("storgeConfig"); + private StorgeConfig deletionConfig = (StorgeConfig) SpringContextUtil.getBean("storgeConfig"); //*********************************************设置存储策略********************************************************** - + private final Header[] hosHeaders = {new BasicHeader(Constant.TOKEN, deletionConfig.getFilesToken()), + new BasicHeader(HttpHeaders.CONTENT_TYPE, Constant.TEXT_XML)}; /** * 设置流量日志存储策略储 * clickhouse 数据库 * - * @param params {"maxdays":30} + * @param params {"maxDays":30} */ - @XxlJob("deleteTrafficDataJobHandler2") - public ReturnT<String> deleteTrafficData(String params) { + @XxlJob("deleteTrafficDataJobHandler") + public ReturnT<String> deleteTrafficDataByCluster(String params) { try { Map<String, Object> paramsMap = validParams(params); if (ObjectUtils.isEmpty(paramsMap)) { logger.error("params parser error , params is {}", params); return IJobHandler.FAIL; } - Map<String, Object> deleteParamMap = deletionConfig.getCkSource(); - - //根据服务器地址,表,分区去删除数据 - String prefixDeleteSql = Joiner.on("").join("ALTER TABLE ", deletionConfig.getTrafficDatasource(), "."); - String deleteMaxDate = DateUtils.getSomeDate(TimeConstants.YYYYMMDD, -Integer.valueOf(String.valueOf(paramsMap.get(max_days)))); - UrlBuilder urlBuilder = UrlBuilder.ofHttp(deletionConfig.getTrafficServer(), StandardCharsets.UTF_8); - List<String> addressForCKs = getAddressForCK(urlBuilder.toString()); - int failCount = 0; - // 指定服务器 - for (String endpoint : addressForCKs) { - try { - // 指定表 - String url = UrlUtil.getUrl(endpoint); - List<String> tablesForCKs = getTablesForCK(url,deletionConfig.getNotInSql("name",deletionConfig.getTafficExclusion())); - for (String table : tablesForCKs) { - try { - //指定分区 - List<String> partitions = getSystemDataForCK(url, Joiner.on("").join("SELECT DISTINCT(`partition`) from " + deletionConfig.getSystemParts() + " WHERE table = '", table, "' AND `partition` < '", deleteMaxDate, "' FORMAT JSON;")); - for (String partition : partitions) { - try { - deleteParamMap.put("query", Joiner.on("").join(prefixDeleteSql, table, " DROP PARTITION ", partition)); - HttpClientUtils.httpPost(HttpClientUtils.getUrlWithParams(url, deleteParamMap), ""); - } catch (BusinessException e) { - failCount++; - logger.error("clickhouse ttl error endpoint = {}, table = {}, partition = {}, fail message = {}", endpoint, table, partition, JobUtil.getErrorMsg(e)); - XxlJobLogger.log("clickhouse ttl error partitionFailCount={}, endpoint = {}, table = {}, partition = {}, fail message = {}", failCount, endpoint, table, partition, JobUtil.getErrorMsg(e)); - } - } - logger.info("clickhouse ttl clickhouse endpoint = {}, table = {}, partition < {}, size = {}, fail count = {}", endpoint, table, deleteMaxDate, partitions.size(), failCount); - } catch (BusinessException e) { - failCount++; - logger.error("clickhouse ttl error endpoint = {},table = {}, fail message = {}", endpoint, table, JobUtil.getErrorMsg(e)); - XxlJobLogger.log("clickhouse ttl error endpoint = {},table = {}, fail message = {}", endpoint, table, JobUtil.getErrorMsg(e)); - } - } - XxlJobLogger.log("clickhouse ttl endpoint = {}, failCount = {}, tables = {}", endpoint, failCount, tablesForCKs); - } catch (BusinessException e) { - failCount++; - logger.error("clickhouse ttl error endpoint = {}, fail message = {}", endpoint, JobUtil.getErrorMsg(e)); - XxlJobLogger.log("clickhouse ttl error endpoint = {}, fail message = {}", endpoint, JobUtil.getErrorMsg(e)); - } - } - modifyLastStorage(Constant.ZK_TRAFFIC_LOGS, Integer.valueOf(String.valueOf(paramsMap.get(max_days)))); - if (failCount > 0) { - throw new BusinessException("reset storage clickhouse ttl error ,failCount " + failCount); - } - - } catch (BusinessException be) { - logger.error(be.getMessage()); - XxlJobLogger.log(be.getMessage()); - return ReturnT.FAIL; - } - return ReturnT.SUCCESS; - } - - @XxlJob("deleteTrafficDataJobHandler") - public ReturnT<String> deleteTrafficDataByCluster(String params) { - try { - Map<String, Object> paramsMap = validParams(params); if (ObjectUtils.isEmpty(paramsMap)) { logger.error("params parser error , params is {}", params); return IJobHandler.FAIL; @@ -149,7 +90,7 @@ public class LogStorageQuotaJob { XxlJobLogger.log("reset storage days table {}, drop partition size {} ", table, partitionList.size()); partitionList.forEach(partition -> { - String deleteSql = Joiner.on("").join(prefixDeleteSql, table, suffixDeleteSql, "'",partition,"'"); + String deleteSql = Joiner.on("").join(prefixDeleteSql, table, suffixDeleteSql, "'", partition, "'"); deleteParamMap.put("query", deleteSql); logger.info("reset storage days clickhouse sql:{}", deleteSql); @@ -167,8 +108,8 @@ public class LogStorageQuotaJob { if (failCount > 0) { throw new BusinessException("reset storage clickhouse ttl error, failCount " + failCount); } - logger.info("reset clickhouse {} days success",maxdays); - XxlJobLogger.log("reset clickhouse {} days success",maxdays); + logger.info("reset clickhouse {} days success", maxdays); + XxlJobLogger.log("reset clickhouse {} days success", maxdays); } catch (BusinessException be) { logger.error(be.getMessage()); XxlJobLogger.log(be.getMessage()); @@ -308,122 +249,75 @@ public class LogStorageQuotaJob { logger.error("params parser error , params is {}", params); return IJobHandler.FAIL; } - List<String> endpointList = Splitter.on(",").trimResults().omitEmptyStrings().splitToList(deletionConfig.getFilesHosServer()); - List<String> whiteList = StringUtil.isEmpty(deletionConfig.getFilesExclusion()) ? null : Splitter.on(",").trimResults().omitEmptyStrings().splitToList(deletionConfig.getFilesExclusion()); - Header header1 = new BasicHeader(Constant.TOKEN, deletionConfig.getFilesToken()); - Header header2 = new BasicHeader(HttpHeaders.CONTENT_TYPE, Constant.TEXT_XML); - Header[] headers = {header1, header2}; + String maxDays = String.valueOf(paramsMap.get(max_days)); + List<String> hosHostList = parseList(deletionConfig.getFilesHosServer()); int failCount = 0; - for (String endpoint : endpointList) { - try { - String filesServer = UrlUtil.getUrl(endpoint); - String httpGetRes = HttpClientUtils.httpGet(Joiner.on("").join(filesServer, "/hos/"), header1); - if (httpGetRes != null && !"-1".equals(httpGetRes)) { - ListAllMyBucketsResult list = XmlUtil.converXmlToBean(ListAllMyBucketsResult.class, httpGetRes); - logger.info("reset storage files ttl endpoint = {}, bucketList = {} ", endpoint, list); - if (ObjectUtils.allNotNull(list, list.getBuckets(), list.getBuckets().getBucket())) { - ListAllMyBucketsResult.Buckets buckets = list.getBuckets(); - List<ListAllMyBucketsResult.Buckets.Bucket> bucketList = buckets.getBucket(); - - LifecycleConfiguration lc = new LifecycleConfiguration(); - lc.getRule().setExpiration(new LifecycleConfiguration.Expiration(String.valueOf(paramsMap.get(max_days)))); - - for (ListAllMyBucketsResult.Buckets.Bucket bucket : bucketList) { - try { - if (ObjectUtils.isNotEmpty(whiteList) && whiteList.contains(bucket.getName())) { - LifecycleConfiguration whiteLc = new LifecycleConfiguration(); - whiteLc.getRule().setExpiration(new LifecycleConfiguration.Expiration("24855"));//hos保存最大年限 - HttpClientUtils.httpPut(Joiner.on("").join(filesServer, "/hos/", bucket.getName(), "?lifecycle"), XmlUtil.converToXml(whiteLc), headers); - continue; - } - HttpClientUtils.httpPut(Joiner.on("").join(filesServer, "/hos/", bucket.getName(), "?lifecycle"), XmlUtil.converToXml(lc), headers); - } catch (BusinessException e) { - failCount++; - logger.error("reset storage files ttl failCount = {}, endpoint = {}, bucket = {}, fail message = {}", failCount, endpoint, bucket, JobUtil.getErrorMsg(e)); - XxlJobLogger.log("failCount = {}, endpoint = {}, bucket = {}, fail message = {}", failCount, endpoint, bucket, JobUtil.getErrorMsg(e)); - } - } - } - } else { - failCount++; - logger.error("reset storage files ttl failCount = {}, endpoint = {} , fail message = I/O exception", failCount, endpoint); - XxlJobLogger.log("endpoint = {} , fail message = I/O exception", endpoint); - } - } catch (BusinessException e) { - logger.error("reset storage files ttl endpoint = {}, fail message = {}", endpoint, JobUtil.getErrorMsg(e)); - XxlJobLogger.log("endpoint = {}, fail message = {}", endpoint, JobUtil.getErrorMsg(e)); - failCount++; - } + for (String hosHost : hosHostList) { + failCount += deleteHosData(hosHost, maxDays); } - modifyLastStorage(Constant.FILES, Integer.valueOf(String.valueOf(paramsMap.get(max_days)))); + if (failCount > 0) { - throw new BusinessException(" number of total endpoints " + endpointList.size() + ", number of failed count " + failCount); + JobUtil.errorLog(" number of total endpoints " + hosHostList.size() + ", number of failed count " + failCount); + return ReturnT.FAIL; } - } catch (BusinessException be) { - logger.error(be.getMessage()); - XxlJobLogger.log(be.getMessage()); + JobUtil.errorLog(be.getMessage()); return ReturnT.FAIL; } return ReturnT.SUCCESS; } - //*********************************************清库********************************************************** - - /** - * 清除所有流量数据,click hosue库 - * - * @param params{"maxdays":365} - */ - @XxlJob("deleteAllTrafficDataJobHandler2") - public ReturnT<String> deleteAllTrafficData2(String params) { - + public int deleteHosData(String endpoint, String maxDays) { + int failCount = 0; try { - Map<String, Object> paramsMap = validParams(params); - if (ObjectUtils.isEmpty(paramsMap)) { - logger.error("params parser error , params is {}", params); - return IJobHandler.FAIL; - } - List<String> addressForCKs = getAddressForCK(UrlUtil.getUrl(deletionConfig.getTrafficServer())); - Map<String, Object> deleteParamMap = deletionConfig.getCkSource(); - //清库命令参数 - String deleteSql = Joiner.on("").join("TRUNCATE TABLE IF EXISTS ", deletionConfig.getTrafficDatasource(), "."); - int failCount = 0; - for (String endpoint : addressForCKs) { + String hosServer = UrlUtil.getUrl(endpoint); + // 获取桶列表 + List<String> bucketList = getBucketList(endpoint, hosServer); + LifecycleConfiguration lifecycleConfig = new LifecycleConfiguration(); + + lifecycleConfig.getRule().setExpiration(new LifecycleConfiguration.Expiration(maxDays)); + JobUtil.infoLog("reset storage files ttl endpoint = {}, bucketList = {} ", endpoint, bucketList); + for (String bucketName : bucketList) { try { - String url = UrlUtil.getUrl(endpoint); - List<String> tablesForCKs = getTablesForCK(url, deletionConfig.getNotInSql("name", deletionConfig.getTafficExclusion())); - for (String table : tablesForCKs) { - try { - deleteParamMap.put("query", deleteSql.concat(table)); - logger.info("delete clickhouse endpoint = {}, sql:{}", endpoint, deleteSql.concat(table)); - HttpClientUtils.httpPost(HttpClientUtils.getUrlWithParams(url, deleteParamMap), ""); - } catch (BusinessException e) { - logger.error("delete storage clickhouse error endpoint = {}, table = {}, fail message = {}", endpoint, table, JobUtil.getErrorMsg(e)); - XxlJobLogger.log("delete storage clickhouse error endpoint = {}, table = {}, fail message = {}", endpoint, table, JobUtil.getErrorMsg(e)); - failCount++; - } - } - XxlJobLogger.log("delete storage clickhouse error endpoint = {}, fail count = {}, tables = {}", endpoint, failCount, tablesForCKs); + HttpClientUtils.httpPut(StringUtils.join(hosServer, "/hos/", bucketName, "?lifecycle"), XmlUtil.converToXml(lifecycleConfig), hosHeaders); } catch (BusinessException e) { - logger.error("delete storage clickhouse error endpoint = {}, fail message = {}", endpoint, JobUtil.getErrorMsg(e)); - XxlJobLogger.log("delete storage clickhouse error endpoint = {}, fail message = {}", endpoint, JobUtil.getErrorMsg(e)); failCount++; + JobUtil.errorLog("reset storage files ttl failCount = {}, endpoint = {}, bucket = {}, fail message = {}", failCount, endpoint, bucketName, JobUtil.getErrorMsg(e)); } } - modifyLastStorage(Constant.ZK_TRAFFIC_LOGS, 0); - if (failCount > 0) { - throw new BusinessException("delete storage clickhouse error ,failCount " + failCount); - } + modifyLastStorage(Constant.FILES, Integer.valueOf(maxDays)); + } catch (BusinessException e) { + JobUtil.errorLog("reset storage files ttl endpoint = {}, fail message = {}", endpoint, JobUtil.getErrorMsg(e)); + failCount++; + } + return failCount; + } - } catch (BusinessException be) { - logger.error(be.getMessage()); - XxlJobLogger.log(be.getMessage()); - return ReturnT.FAIL; + private List<String> getBucketList(String endpoint, String hosServer) { + List<String> whiteList = parseList(deletionConfig.getFilesExclusion()); + String httpGetRes = HttpClientUtils.httpGet(hosServer + "/hos/", hosHeaders); + if (httpGetRes == null || "-1".equals(httpGetRes)) { + throw new BusinessException(StrUtil.format("reset storage files ttl endpoint = {} , fail message = I/O exception", endpoint)); } - return ReturnT.SUCCESS; + List<String> bucketList = Lists.newArrayList(); + ListAllMyBucketsResult bucketsResult = XmlUtil.converXmlToBean(ListAllMyBucketsResult.class, httpGetRes); + + if (ObjectUtils.allNotNull(bucketsResult, bucketsResult.getBuckets(), bucketsResult.getBuckets().getBucket())) { + for (ListAllMyBucketsResult.Buckets.Bucket bucket : bucketsResult.getBuckets().getBucket()) { + if (ObjectUtils.isNotEmpty(whiteList) && whiteList.contains(bucket.getName())) { + continue; + } + bucketList.add(bucket.getName()); + } + } + return bucketList; + } + + private List<String> parseList(String csv) { + return StringUtil.isEmpty(csv) ? null : Splitter.on(",").trimResults().omitEmptyStrings().splitToList(csv); } + //*********************************************清库********************************************************** /** * 清除所有流量数据,click hosue库 @@ -451,12 +345,12 @@ public class LogStorageQuotaJob { for (String table : tablesForCKs) { try { - String deleteSqlCluster = String.join(" ",deleteSql, table, " ON CLUSTER ", ck_cluster); + String deleteSqlCluster = String.join(" ", deleteSql, table, " ON CLUSTER ", ck_cluster); deleteParamMap.put("query", deleteSqlCluster); logger.info("delete clickhouse table = {}, sql:{}", table, deleteSqlCluster); HttpClientUtils.httpPost(HttpClientUtils.getUrlWithParams(url, deleteParamMap), ""); - XxlJobLogger.log("delete clickhouse table = {}" ,table); + XxlJobLogger.log("delete clickhouse table = {}", table); } catch (BusinessException e) { logger.error("delete storage clickhouse error endpoint = {}, table = {}, fail message = {}", url, table, JobUtil.getErrorMsg(e)); @@ -468,7 +362,7 @@ public class LogStorageQuotaJob { XxlJobLogger.log("delete storage clickhouse endpoint = {}, fail count = {}, tables = {}", url, failCount, tablesForCKs); } catch (BusinessException e) { - logger.error("delete storage clickhouse error , fail message = {}", JobUtil.getErrorMsg(e)); + logger.error("delete storage clickhouse error , fail message = {}", JobUtil.getErrorMsg(e)); XxlJobLogger.log("delete storage clickhouse error , fail message = {}", JobUtil.getErrorMsg(e)); failCount++; } @@ -643,41 +537,6 @@ public class LogStorageQuotaJob { /** - * 直接查询数据库获取参数 - * - * @param params {"topic":"SYS-STORAGE-LOG","ckDayGrowth":"false"} - * ckDayGrowth:表示是否为一天的执行一次的时间粒度 - */ - @XxlJob("getStorageQuotaJobHandler") - public ReturnT<String> getStorageQuota(String params) { - - try { - Map<String, Object> paramsMap = validParams(params); - if (ObjectUtils.isEmpty(paramsMap)) { - logger.error("params parser error , params is {}", params); - return IJobHandler.FAIL; - } - Map resultMap = storageQuotaService.getDiskJson("true".equals(paramsMap.get("ckDayGrowth"))); - - //发送到kafka - new KafkaUtils().sendMessage(String.valueOf(paramsMap.get("topic")), (List<Object>) resultMap.get("data")); - XxlJobLogger.log("kafka topic is {}", paramsMap.get("topic")); - //查询存在问题会返回status>0 - int status = ((Map<String, Integer>) resultMap.get("status")).values().stream().mapToInt(Integer::intValue).sum(); - if (status > 0) { - return new ReturnT<String>(IJobHandler.FAIL.getCode(), " query error " + resultMap.get("status")); - } - - } catch (BusinessException be) { - logger.error(be.getMessage()); - XxlJobLogger.log(be.getMessage()); - return ReturnT.FAIL; - } - - return ReturnT.SUCCESS; - } - - /** * 数据库连接参数 * * @return map @@ -720,17 +579,6 @@ public class LogStorageQuotaJob { return tableList; } - /*** - * --查所有的IP地址 - * @Date 2020\9\18 0018 - * @return java.util.List<java.lang.String> - **/ - private List<String> getAddressForCK(String url) { - List<String> endpointList = new ArrayList<>(); - endpointList.addAll(getSystemDataForCK(url, Joiner.on("").join("SELECT DISTINCT concat(host_address,':','" + trafficPort + "') as endpoint FROM ", deletionConfig.getSystemClusters(), " FORMAT JSON;"))); - return endpointList; - } - /** * 查询clickhouse 需要删除的表 * @@ -772,15 +620,16 @@ public class LogStorageQuotaJob { * 更新标准时间 * * @param logType - * @param maxdays + * @param maxDays */ - private void modifyLastStorage(String logType, Integer maxdays) { - // 设置标准时间,如果now - 标准时间 > maxdays,则更新 标准时间 = 当前时间-maxday - Long lastStorage = storageQuotaService.getLastStorage(logType); + private void modifyLastStorage(String logType, Integer maxDays) { + // 设置标准时间,如果now - 标准时间 > maxDays,则更新 标准时间 = 当前时间-maxday + Long lastStorage = getLastStorage(logType); Long now = System.currentTimeMillis() / 1000; - Long max = maxdays * 24 * 60 * 60L;// 单位秒 + Long max = maxDays * 24 * 60 * 60L;// 单位秒 if (now - lastStorage > max) { - storageQuotaService.modifyLastStorage(logType, String.valueOf(now - max)); + ZookeeperUtils zk = new ZookeeperUtils(); + zk.modifyNode(Constant.ZOOKEEPER_STORAGE_PATH + logType, String.valueOf(now - max), zookeeperServer); } } @@ -793,4 +642,33 @@ public class LogStorageQuotaJob { return ObjectUtils.isEmpty(startTimeList) ? DateUtils.getCurrentDate() : String.valueOf(startTimeList.get(0).get("version")); } + /** + * @param node + * @return 自定义的标准时间 last_storage, + * 存在zookeeper 节点/storage/worker/+node + */ + public Long getLastStorage(String node) { + try { + ZookeeperUtils zk = new ZookeeperUtils(); + //query + String nodeData = zk.getNodeData(Constant.ZOOKEEPER_STORAGE_PATH + node, zookeeperServer); + //不存在创建一个 + if (org.springframework.util.ObjectUtils.isEmpty(nodeData)) { + Long lastTime = System.currentTimeMillis() / 1000; + zk.modifyNode(Constant.ZOOKEEPER_STORAGE_PATH + node, String.valueOf(lastTime), zookeeperServer); + return lastTime; + } + Long lastStorage = Long.valueOf(nodeData); + logger.info("query standard time last_storage success,{}", lastStorage); + + return Long.valueOf(nodeData); + } catch (Exception e) { + logger.error("query standard time last_storage fail,{}", JobUtil.getErrorMsg(e)); + throw new BusinessException("query standard time last_storage fail ," + JobUtil.getErrorMsg(e)); + } + + } + class DeleteParams { + + } } diff --git a/galaxy-job-executor/src/main/java/com/mesalab/executor/jobhandler/LogStorageTtlJob.java b/galaxy-job-executor/src/main/java/com/mesalab/executor/jobhandler/LogStorageTtlJob.java index 0abc2b9..b46c9d7 100644 --- a/galaxy-job-executor/src/main/java/com/mesalab/executor/jobhandler/LogStorageTtlJob.java +++ b/galaxy-job-executor/src/main/java/com/mesalab/executor/jobhandler/LogStorageTtlJob.java @@ -17,7 +17,6 @@ import com.mesalab.executor.core.utils.HttpClientUtils; import com.mesalab.executor.core.utils.SpringContextUtil; import com.mesalab.executor.exception.BusinessException; import com.mesalab.executor.pojo.ColumnParam; -import com.mesalab.executor.service.StorageQuotaService; import com.xxl.job.core.biz.model.ReturnT; import com.xxl.job.core.handler.annotation.XxlJob; import com.xxl.job.core.log.XxlJobLogger; @@ -37,8 +36,6 @@ public class LogStorageTtlJob { private static final int materializeTtlAfterModify = 0; @Autowired - StorageQuotaService storageQuotaService; - @Autowired ConfigService configService; private static StorgeConfig deletionConfig = (StorgeConfig) SpringContextUtil.getBean("storgeConfig"); diff --git a/galaxy-job-executor/src/main/java/com/mesalab/executor/jobhandler/StorageQuotaJob.java b/galaxy-job-executor/src/main/java/com/mesalab/executor/jobhandler/StorageQuotaJob.java index 3d16deb..5e6d44f 100644 --- a/galaxy-job-executor/src/main/java/com/mesalab/executor/jobhandler/StorageQuotaJob.java +++ b/galaxy-job-executor/src/main/java/com/mesalab/executor/jobhandler/StorageQuotaJob.java @@ -5,9 +5,11 @@ import cn.hutool.core.util.StrUtil; import com.mesalab.executor.core.utils.JobUtil; import com.mesalab.executor.core.utils.TypeUtils; import com.mesalab.executor.pojo.JDBCParam; -import com.mesalab.executor.service.StorageQuotaInfoService; +import com.mesalab.executor.service.StorageQuotaService; import com.xxl.job.core.biz.model.ReturnT; +import com.xxl.job.core.handler.IJobHandler; import com.xxl.job.core.handler.annotation.XxlJob; +import org.apache.commons.lang3.ObjectUtils; import org.springframework.stereotype.Component; import javax.annotation.Resource; @@ -25,7 +27,7 @@ import java.util.Map; public class StorageQuotaJob { @Resource - private StorageQuotaInfoService storageQuotaInfoService; + private StorageQuotaService storageQuotaService; @XxlJob("getDatabaseStorageQuotaJobHandler") @@ -40,7 +42,7 @@ public class StorageQuotaJob { Boolean containBytes = TypeUtils.castToBoolean(source.get("containBytes")); JDBCParam jdbcParam = BeanUtil.toBean(paramsMap.get("sink"), JDBCParam.class); - failCount += storageQuotaInfoService.getAndSaveStorageQuotaInfo(items, jdbcParam, containBytes); + failCount += storageQuotaService.getAndSaveStorageQuotaInfo(items, jdbcParam, containBytes); } } catch (Exception e) { @@ -49,4 +51,28 @@ public class StorageQuotaJob { } return failCount > 0 ? ReturnT.FAIL : ReturnT.SUCCESS; } + + @XxlJob("deleteOldLogJobHandler") + public ReturnT<String> deleteOldLogJobHandler(String params) { + + int failCount = 0; + try { + Map<String, Object> paramsMap = ValidParamUtil.parseParams(params); + if (ObjectUtils.isEmpty(paramsMap)) { + JobUtil.errorLog("params parse error , params is {}", params); + return IJobHandler.FAIL; + } + String logType = TypeUtils.castToString(paramsMap.get("logType")); + int maxUsage = TypeUtils.castToInt(paramsMap.get("maxUsage")); + int minIntervalMinutes = TypeUtils.castToInt(paramsMap.get("minIntervalMinutes")); + + failCount += storageQuotaService.deleteOldLog(logType, maxUsage, minIntervalMinutes); + + } catch (Exception e) { + failCount++; + JobUtil.errorLog("getDatabaseStorageQuota job error:", e.getMessage()); + } + return failCount > 0 ? ReturnT.FAIL : ReturnT.SUCCESS; + } + } diff --git a/galaxy-job-executor/src/main/java/com/mesalab/executor/jobhandler/ValidParamUtil.java b/galaxy-job-executor/src/main/java/com/mesalab/executor/jobhandler/ValidParamUtil.java index ce41541..611b23e 100644 --- a/galaxy-job-executor/src/main/java/com/mesalab/executor/jobhandler/ValidParamUtil.java +++ b/galaxy-job-executor/src/main/java/com/mesalab/executor/jobhandler/ValidParamUtil.java @@ -3,6 +3,7 @@ package com.mesalab.executor.jobhandler; import cn.hutool.core.util.ObjectUtil; import cn.hutool.core.util.StrUtil; import cn.hutool.json.JSONUtil; +import com.geedgenetworks.utils.StringUtil; import com.xxl.job.core.log.XxlJobLogger; import java.util.List; @@ -54,4 +55,18 @@ public class ValidParamUtil { } return paramList; } + + public static Map<String, Object> parseParams(String params) { + + if (StringUtil.isBlank(params)) { + XxlJobLogger.log("params is Empty !"); + return null; + } + Map<String, Object> paramMap = JSONUtil.toBean(params, Map.class); + if (paramMap == null) { + XxlJobLogger.log("params error !"); + return null; + } + return paramMap; + } } diff --git a/galaxy-job-executor/src/main/java/com/mesalab/executor/service/StorageQuotaInfoService.java b/galaxy-job-executor/src/main/java/com/mesalab/executor/service/StorageQuotaInfoService.java deleted file mode 100644 index cf1b0d4..0000000 --- a/galaxy-job-executor/src/main/java/com/mesalab/executor/service/StorageQuotaInfoService.java +++ /dev/null @@ -1,314 +0,0 @@ -package com.mesalab.executor.service; - -import cn.hutool.core.date.DateUtil; -import cn.hutool.core.util.StrUtil; -import cn.hutool.db.handler.NumberHandler; -import cn.hutool.db.sql.SqlExecutor; -import cn.hutool.log.Log; -import com.geedgenetworks.utils.DateUtils; -import com.google.common.collect.Lists; -import com.mesalab.executor.core.config.StorgeConfig; -import com.mesalab.executor.core.utils.*; -import com.mesalab.executor.exception.BusinessException; -import com.mesalab.executor.pojo.HosSpace; -import com.mesalab.executor.pojo.JDBCParam; -import com.mesalab.executor.pojo.JobResult; -import com.mesalab.executor.pojo.SysStorageEvent; -import org.apache.http.Header; -import org.apache.http.message.BasicHeader; -import org.springframework.beans.factory.annotation.Value; -import org.springframework.http.HttpHeaders; -import org.springframework.stereotype.Service; -import org.springframework.util.ObjectUtils; - -import java.sql.Connection; -import java.util.List; -import java.util.Map; - -/** - * @Description: - * @Author: zhq - * @CreateDate: 2024/6/13 - * @Version: 1.0 - */ - -@Service -public class StorageQuotaInfoService { - - private StorgeConfig storgeConfig = (StorgeConfig) SpringContextUtil.getBean("storgeConfig"); - private Log logger = Log.get(); - - //查询历史数据时允许的最大间隔60 MINUTE - private static final int STORAGE_LIMITATION = 120 * 60; - - @Value("${zookeeper.server}") - private String zookeeperServer; - - - public int getAndSaveStorageQuotaInfo(String[] logTypes, JDBCParam jdbcParam, boolean containBytes) { - - int failCount = 0; - for (String logType : logTypes) { - JobResult<SysStorageEvent> jobResult = new JobResult<>(); - try { - switch (logType) { - case Constant.TRAFFIC_LOGS: - jobResult = getClickhouseStorageInfo(jdbcParam, containBytes); - break; - case Constant.METRICS: - jobResult = getDruidStorageInfo(jdbcParam, containBytes); - break; - case Constant.FILES: - jobResult = getHosStorageInfo(jdbcParam, containBytes); - break; - default: - break; - } - failCount += jobResult.getFailCount(); - if (containBytes) { - setStorageAvg(jobResult); - } - DBUtils.save(jobResult.getData(), jdbcParam); - } catch (Exception e) { - failCount++; - JobUtil.errorLog(e.getMessage()); - } - } - return failCount; - } - - /** - * 将一天的数据平均分为每5分钟的数据 - * - * @param jobResult - */ - private void setStorageAvg(JobResult<SysStorageEvent> jobResult) { - - long interval = 60; - long num = 24 * 60 / interval; - - List<SysStorageEvent> newResultList = Lists.newArrayList(); - long timestamp = DateUtils.convertStringToTimestamp(DateUtils.getDateOfYesterday("yyyyMMdd"), "yyyyMMdd"); - for (SysStorageEvent sysStorageEvent : jobResult.getData()) { - Long dayBytes = sysStorageEvent.getBytes(); - long minBytes = dayBytes / num; - for (long i = 0; i < num; i++) { - SysStorageEvent newStorageEvent = sysStorageEvent; - newStorageEvent.setBytes(minBytes); - - timestamp = timestamp + interval * 60; - newStorageEvent.setGeneratedTime(timestamp); - newResultList.add(newStorageEvent); - } - } - jobResult.setData(newResultList); - } - - private JobResult<SysStorageEvent> getClickhouseStorageInfo(JDBCParam jdbcParam, boolean containIncrease) throws Exception { - - final String maxSizeSql = StrUtil.format("SELECT SUM(`total_space`) FROM {};", storgeConfig.getSystemDisks()); - final String usedSizeSql = StrUtil.format("SELECT SUM(`bytes_on_disk`) FROM {} WHERE database = '{}' ;", storgeConfig.getSystemPartsCluster(), storgeConfig.getTrafficDatasource()); - - List<SysStorageEvent> sysStorageEvents = Lists.newArrayList(); - JobResult<SysStorageEvent> jobResult = new JobResult(); - int failCount = 0; - long generatedTime = DateUtil.currentSeconds(); - Long sinceTime = getLastStorage(Constant.ZK_TRAFFIC_LOGS); - for (Map.Entry<String, String> datacenterMap : storgeConfig.getTrafficDataCenter().entrySet()) { - try { - String datacenterHost = datacenterMap.getValue(); - String datacenterName = datacenterMap.getKey(); - Map<String, Object> ckParamMap = storgeConfig.getCkSource(); - // 1. 总计 - ckParamMap.put("query", maxSizeSql); - Long totalSize = queryClickhouse(datacenterHost, ckParamMap); - //2. 已使用 - ckParamMap.put("query", usedSizeSql); - Long usedSize = queryClickhouse(datacenterHost, ckParamMap); - - //3. 增量 - Long bytes = containIncrease ? 0L : getIncreaseSize(jdbcParam, usedSize, datacenterMap.getKey(), Constant.TRAFFIC_LOGS); - SysStorageEvent storageEvent = SysStorageEvent.builder() - .logType(Constant.TRAFFIC_LOGS) - .dataCenter(datacenterName) - .generatedTime(generatedTime) - .totalAllocatedSize(totalSize) - .usedSize(usedSize) - .bytes(bytes) - .sinceTime(sinceTime) - .build(); - - sysStorageEvents.add(storageEvent); - JobUtil.infoLog("Get clickhouse storage info {}:", storageEvent.toString()); - } catch (Exception e) { - failCount++; - JobUtil.errorLog("Get clickhouse storage error : datacenter {}, message {}", datacenterMap.getKey(), e.getMessage()); - } - } - jobResult.setFailCount(failCount); - jobResult.setData(sysStorageEvents); - return jobResult; - } - - private JobResult<SysStorageEvent> getDruidStorageInfo(JDBCParam jdbcParam, boolean containIncrease) throws Exception { - - final String maxSizeSql = "{\"query\":\"SELECT SUM(max_size) AS max_size FROM sys.servers WHERE server_type = 'historical'\",\"context\":{\"skipEmptyBuckets\":\"false\"},\"resultFormat\":\"csv\"}"; - final String usedSizeSql = "{\"query\":\"SELECT SUM(size) AS used_size FROM sys.segments WHERE datasource NOT LIKE '%hot%' and is_published = 1 and is_overshadowed = 0\",\"context\":{\"skipEmptyBuckets\":\"false\"},\"resultFormat\":\"csv\"}"; - - List<SysStorageEvent> sysStorageEvents = Lists.newArrayList(); - JobResult<SysStorageEvent> jobResult = new JobResult(); - int failCount = 0; - long generatedTime = DateUtil.currentSeconds(); - Long sinceTime = getLastStorage(Constant.ZK_REPORT_AND_METRICS); - for (Map.Entry<String, String> datacenterMap : storgeConfig.getAnalyticDataCenter().entrySet()) { - try { - String datacenterHost = datacenterMap.getValue(); - String datacenterName = datacenterMap.getKey(); - // 1. 总计 - Long totalSize = queryDruid(datacenterHost, maxSizeSql); - //2. 已使用 - Long usedSize = queryDruid(datacenterHost, usedSizeSql); - - //3. 增量 - Long bytes = containIncrease ? 0L : getIncreaseSize(jdbcParam, usedSize, datacenterMap.getKey(), Constant.METRICS); - SysStorageEvent storageEvent = SysStorageEvent.builder() - .logType(Constant.METRICS) - .dataCenter(datacenterName) - .generatedTime(generatedTime) - .totalAllocatedSize(totalSize) - .usedSize(usedSize) - .bytes(bytes) - .sinceTime(sinceTime) - .build(); - - sysStorageEvents.add(storageEvent); - JobUtil.infoLog("Get druid storage info {}:", storageEvent.toString()); - } catch (Exception e) { - failCount++; - JobUtil.errorLog("Get druid storage error : datacenter {}, message {}", datacenterMap.getKey(), e.getMessage()); - } - } - jobResult.setFailCount(failCount); - jobResult.setData(sysStorageEvents); - return jobResult; - } - - private JobResult<SysStorageEvent> getHosStorageInfo(JDBCParam jdbcParam, boolean containIncrease) throws Exception { - - List<SysStorageEvent> sysStorageEvents = Lists.newArrayList(); - JobResult<SysStorageEvent> jobResult = new JobResult(); - int failCount = 0; - Long generatedTime = DateUtil.currentSeconds(); - Long sinceTime = getLastStorage(Constant.FILES); - for (Map.Entry<String, String> datacenterMap : storgeConfig.getFilesDataCenter().entrySet()) { - try { - String datacenterHost = datacenterMap.getValue(); - String datacenterName = datacenterMap.getKey(); - - HosSpace hosSpace = getHosSpace(datacenterHost, datacenterName); - Long totalSize = hosSpace.getHosCapacity(); - Long usedSize = hosSpace.getHosUsed(); - if (totalSize == -1 || usedSize == -1) { - throw new BusinessException("hos server error : " + datacenterName); - } - - //3. 增量 - Long bytes = containIncrease ? 0L : getIncreaseSize(jdbcParam, usedSize, datacenterMap.getKey(), Constant.FILES); - SysStorageEvent storageEvent = SysStorageEvent.builder() - .logType(Constant.FILES) - .dataCenter(datacenterName) - .generatedTime(generatedTime) - .totalAllocatedSize(totalSize) - .usedSize(usedSize) - .bytes(bytes) - .sinceTime(sinceTime) - .build(); - - sysStorageEvents.add(storageEvent); - JobUtil.infoLog("Get hos storage info {}:", storageEvent.toString()); - } catch (Exception e) { - failCount++; - JobUtil.errorLog("Get hos storage error : datacenter {}, message {}", datacenterMap.getKey(), e.getMessage()); - } - } - jobResult.setFailCount(failCount); - jobResult.setData(sysStorageEvents); - return jobResult; - } - - - private Long getIncreaseSize(JDBCParam jdbcParam, Long usedSize, String datacenter, String logType) throws Exception { - final String lastUsedSizeSql = "SELECT used_size FROM " + jdbcParam.getTable() + " WHERE log_type = '" + logType + "' and data_center = '" + datacenter - + "' and generated_time >= UNIX_TIMESTAMP() - " + STORAGE_LIMITATION + " ORDER BY generated_time DESC LIMIT 1;"; - JobUtil.infoLog(lastUsedSizeSql); - - try (Connection conn = DBUtils.getDBConn(jdbcParam)) { - Number lastUsedSize = SqlExecutor.query(conn, lastUsedSizeSql, new NumberHandler()); - if (lastUsedSize == null || lastUsedSize.longValue() <= 0 || usedSize - lastUsedSize.longValue() <= 0) { - return 0L; - } - - return usedSize - lastUsedSize.longValue(); - } catch (Exception e) { - throw e; - } - } - - private Long queryClickhouse(String datacenterHost, Map<String, Object> ckParamMap) { - String result = HttpClientUtils.httpGet(HttpClientUtils.getUrlWithParams(UrlUtil.getUrl(datacenterHost), ckParamMap)); - if ("-1".equals(result)) { - throw new BusinessException("Get clickhouse http fail -1"); - } - return Long.valueOf(result.trim()); - } - - private Long queryDruid(String datacenterHost, String usedSizeSql) { - final String druidPath = "/druid/v2/sql"; - String result = HttpClientUtils.httpPost(UrlUtil.getUrl(datacenterHost).concat(druidPath), usedSizeSql); - - if ("-1".equals(result)) { - throw new BusinessException("Get druid http fail -1"); - } - return Long.valueOf(result.trim()); - } - - private HosSpace getHosSpace(String datacenterHost, String datacenterName) { - final String fileStoragePath = "/admin/diskspace"; - Header[] headers = {new BasicHeader(Constant.TOKEN, storgeConfig.getFilesToken()), new BasicHeader(HttpHeaders.CONTENT_TYPE, Constant.TEXT_XML)}; - String result = HttpClientUtils.httpGet(UrlUtil.getUrl(datacenterHost).concat(fileStoragePath), headers); - - if ("-1".equals(result) || "-1".equals(result)) { - throw new BusinessException("Get hos http request fail -1 , server: " + datacenterName); - } - HosSpace hosSpace = XmlUtil.converXmlToBean(HosSpace.class, result); - return hosSpace; - } - - /** - * @param node - * @return 自定义的标准时间 last_storage, - * 存在zookeeper 节点/storage/worker/+node - */ - private Long getLastStorage(String node) { - try { - ZookeeperUtils zk = new ZookeeperUtils(); - String nodeData = zk.getNodeData(Constant.ZOOKEEPER_STORAGE_PATH + node, zookeeperServer); - //不存在创建一个 - if (ObjectUtils.isEmpty(nodeData)) { - Long lastTime = System.currentTimeMillis() / 1000; - zk.modifyNode(Constant.ZOOKEEPER_STORAGE_PATH + node, String.valueOf(lastTime), zookeeperServer); - return lastTime; - } - Long lastStorage = Long.valueOf(nodeData); - logger.info("query standard time last_storage success,{}", lastStorage); - - return Long.valueOf(nodeData); - } catch (Exception e) { - logger.error("query standard time last_storage fail,{}", JobUtil.getErrorMsg(e)); - throw new BusinessException("query standard time last_storage fail ," + JobUtil.getErrorMsg(e)); - } - - } - - -} diff --git a/galaxy-job-executor/src/main/java/com/mesalab/executor/service/StorageQuotaService.java b/galaxy-job-executor/src/main/java/com/mesalab/executor/service/StorageQuotaService.java index 26b9ab3..d8bbf03 100644 --- a/galaxy-job-executor/src/main/java/com/mesalab/executor/service/StorageQuotaService.java +++ b/galaxy-job-executor/src/main/java/com/mesalab/executor/service/StorageQuotaService.java @@ -1,13 +1,22 @@ package com.mesalab.executor.service; +import cn.hutool.core.date.DateUtil; +import cn.hutool.core.util.StrUtil; +import cn.hutool.db.handler.NumberHandler; +import cn.hutool.db.sql.SqlExecutor; import cn.hutool.log.Log; import com.geedgenetworks.utils.DateUtils; -import com.geedgenetworks.utils.JsonMapper; +import com.google.common.collect.Lists; import com.mesalab.executor.core.config.StorgeConfig; import com.mesalab.executor.core.utils.*; import com.mesalab.executor.exception.BusinessException; +import com.mesalab.executor.jobhandler.LogStorageQuotaJob; import com.mesalab.executor.pojo.HosSpace; -import com.xxl.job.core.log.XxlJobLogger; +import com.mesalab.executor.pojo.JDBCParam; +import com.mesalab.executor.pojo.JobResult; +import com.mesalab.executor.pojo.SysStorageEvent; +import com.xxl.job.core.biz.model.ReturnT; +import org.apache.groovy.util.Maps; import org.apache.http.Header; import org.apache.http.message.BasicHeader; import org.springframework.beans.factory.annotation.Value; @@ -15,352 +24,482 @@ import org.springframework.http.HttpHeaders; import org.springframework.stereotype.Service; import org.springframework.util.ObjectUtils; -import java.util.ArrayList; -import java.util.HashMap; +import javax.annotation.Resource; +import javax.management.timer.Timer; +import java.sql.Connection; import java.util.List; import java.util.Map; /** - * 存储配额获取指标类 组合 原始日志结果对象,统计日志结果对象,文件系统结果对象 - * - * @author qidaijie + * @Description: + * @Author: zhq + * @CreateDate: 2024/6/13 + * @Version: 1.0 */ + @Service public class StorageQuotaService { + private StorgeConfig storgeConfig = (StorgeConfig) SpringContextUtil.getBean("storgeConfig"); private Log logger = Log.get(); - private static final String ZOOKEEPER_STORAGE_PATH = "/storage/worker/"; - private static final String FILE_STORAGE_PATH = "/admin/diskspace"; - private static final String DRUID_PATH = "/druid/v2/sql"; - private static final int STORAGE_LIMITATION = 60; //查询历史数据时允许的最大间隔60 MINUTE + //查询历史数据时允许的最大间隔second + private static final int MAX_QUERY_INTERVAL = 2 * 60 * 60; + final String clickhouseMaxSizeSql = StrUtil.format("SELECT SUM(`total_space`) FROM {};", storgeConfig.getSystemDisks()); + final String clickhouseUsedSizeSql = StrUtil.format("SELECT SUM(`bytes_on_disk`) FROM {} WHERE database = '{}' ;", storgeConfig.getSystemPartsCluster(), storgeConfig.getTrafficDatasource()); @Value("${zookeeper.server}") private String zookeeperServer; - private static StorgeConfig storgeConfig = (StorgeConfig)SpringContextUtil.getBean("storgeConfig"); - - /** - * 用于获取 ClickHouse 当前存储大小 若获取 failquerydruid内最新的值替补当前值 - */ - private Long getClickHouseCurr(String point) { - Map<String, Object> deleteParamMap = storgeConfig.getCkSource(); - String currSql = "SELECT SUM(`bytes_on_disk`) FROM " + storgeConfig.getSystemPartsCluster() + " WHERE database = '" + storgeConfig.getTrafficDatasource() + "';"; - deleteParamMap.put("query", currSql); - String currResult = HttpClientUtils.httpGet(HttpClientUtils.getUrlWithParams(UrlUtil.getUrl(point), deleteParamMap)); - Long result = Long.valueOf(currResult.trim()); - logger.info("query clickhouse success, used_size={}", result); - return result; + @Resource + private LogStorageQuotaJob logStorageQuotaJob; + + public int getAndSaveStorageQuotaInfo(String[] logTypes, JDBCParam jdbcParam, boolean containBytes) { + + int failCount = 0; + for (String logType : logTypes) { + JobResult<SysStorageEvent> jobResult = new JobResult<>(); + try { + switch (logType) { + case Constant.TRAFFIC_LOGS: + jobResult = getClickhouseStorageInfo(jdbcParam, containBytes); + break; + case Constant.METRICS: + jobResult = getDruidStorageInfo(jdbcParam, containBytes); + break; + case Constant.FILES: + jobResult = getHosStorageInfo(jdbcParam, containBytes); + break; + default: + break; + } + failCount += jobResult.getFailCount(); + if (containBytes) { + setStorageAvg(jobResult); + } + DBUtils.save(jobResult.getData(), jdbcParam); + } catch (Exception e) { + failCount++; + JobUtil.errorLog(e.getMessage()); + } + } + return failCount; } - /** - * 用于获取ClickHouse最大存储大小 若获取 failquerydruid内最新的值替补当前值 - */ - private Long getClickHouseMax(String point) { - Map<String, Object> deleteParamMap = storgeConfig.getCkSource(); - String maxSql = "SELECT SUM(`total_space`) FROM ".concat(storgeConfig.getSystemDisks()).concat(";"); - deleteParamMap.put("query", maxSql); - String maxResult = HttpClientUtils - .httpGet(HttpClientUtils.getUrlWithParams(UrlUtil.getUrl(point), deleteParamMap)); - long result = Long.parseLong(maxResult.trim()); - logger.info("query clickhouse max_size success ,{}", result); - return result; - } + public int deleteOldLog(String logType, int maxUsage, int minIntervalMinutes) { - /** - * 用于获取ClickHouse 差值 若获取 fail直接写入0 - */ - private Long getClickHouseDiff(String point) { - String date = DateUtils.getDateOfYesterday("yyyyMMdd"); - Map<String, Object> deleteParamMap = storgeConfig.getCkSource(); - String diffSql = "SELECT SUM(bytes_on_disk) FROM ".concat(storgeConfig.getSystemPartsCluster()).concat(" WHERE database = '" ).concat(storgeConfig.getTrafficDatasource()) - .concat("' AND partition = '").concat(date).concat("';"); - deleteParamMap.put("query", diffSql); - String diffResult = HttpClientUtils - .httpGet(HttpClientUtils.getUrlWithParams(UrlUtil.getUrl(point), deleteParamMap)); - long result = Long.parseLong(diffResult.trim()); - logger.info("query clickhouse success aggregate_size={}", result); - return result; + int failCount = 0; + try { + switch (logType) { + case Constant.TRAFFIC_LOGS: + failCount+= deleteClickhouseOldLog(maxUsage, minIntervalMinutes); + break; + case Constant.METRICS: + failCount+= deleteDruidOldLog(maxUsage, minIntervalMinutes); + break; + case Constant.FILES: + failCount+= deleteHosOldLog(maxUsage, minIntervalMinutes); + break; + default: + break; + } + } catch (Exception e) { + failCount++; + JobUtil.errorLog(e.getMessage()); + } + return failCount; } -//========================Druid============================== + public int deleteClickhouseOldLog(int maxUsage, int minIntervalMinutes) { + int failCount = 0; + for (Map.Entry<String, String> datacenterMap : storgeConfig.getTrafficDataCenter().entrySet()) { + try { + String dataCenterHost = datacenterMap.getValue(); + String dataCenterName = datacenterMap.getKey(); - /** - * 获取Druid当前存储大小 若获取 fail直接补0,Druid本身无法提供服务无需再query上次值 - */ - private Long getDruidUsed(String point) { - String currSql = "{\"query\":\"SELECT SUM(size) AS used_size FROM sys.segments WHERE datasource NOT LIKE '%hot%' and is_published = 1 and is_overshadowed = 0\",\"context\":{\"skipEmptyBuckets\":\"false\"},\"resultFormat\":\"object\"}"; - String currResult = HttpClientUtils.httpPost(UrlUtil.getUrl(point).concat(DRUID_PATH), currSql); - List<Map<String, Object>> list = (List) JsonMapper.fromJsonString(currResult, List.class); - if(list!=null&&list.size()==0){ - return 0L; - } - Long currSize = 0L; - if(list!=null&&list.get(0)!=null){ - Map<String, Object> map = list.get(0); - if(map!=null&&map.get("used_size")!=null){ - currSize = Long.valueOf(String.valueOf(map.get("used_size"))); + String zkClearTimePath = StrUtil.join("/", Constant.ZK_TRAFFIC_LOGS, dataCenterName, Constant.ZOOKEEPER_STORAGE_CLEAR_TIME); + if (!checkStorageInterval(zkClearTimePath, minIntervalMinutes)) { + continue; + } + double clickhouseUsage = getClickhouseUsage(dataCenterHost, dataCenterName); + if (maxUsage <= clickhouseUsage * 100d) { + // 1.获取删除天数 /datacenter maxDays = now - data_center_last_storage - 1 + // 2.空则使用 maxDays = now - last_storage - 1 + // 3.调用删除接口 参数maxDays, 记录日志 + // 4.修改 datacenter时间 + String zkDcLastStoragePath = StrUtil.join("/", Constant.ZK_TRAFFIC_LOGS, dataCenterName); + Long lastStorageTime = getLastStorage(zkDcLastStoragePath); + if (lastStorageTime == null) { + lastStorageTime = getLastStorage(Constant.ZK_TRAFFIC_LOGS); + } + // 删除1天 + long storageTime = lastStorageTime - Timer.ONE_DAY; + Long maxDays = DateUtil.currentSeconds() - storageTime; + + ReturnT<String> deleteResult = logStorageQuotaJob.deleteTrafficDataByCluster(Maps.of(Constant.MAX_DAYS, maxDays).toString()); + if (deleteResult.getCode() != 200) { + failCount++; + JobUtil.errorLog("{} clickhouse log delete fail {}", dataCenterName, deleteResult.getMsg()); + continue; + } + setLastStorage(zkDcLastStoragePath, storageTime); + JobUtil.infoLog("{} clickhouse log delete success {}"); + } + } catch (Exception e) { + failCount++; + JobUtil.errorLog("clickhouse delete log error {}", e.getMessage()); } } - logger.info("query druid used_size success,{}", currSize); - return currSize; + + return failCount; } - /** - * 获取Druid最大存储大小 若获取 fail直接补0,Druid本身无法提供服务无需再query上次值 - */ - private Long getDruidMax(String point) { - String maxSql = "{\"query\":\"SELECT SUM(max_size) AS max_size FROM sys.servers WHERE server_type = 'historical'\",\"context\":{\"skipEmptyBuckets\":\"false\"},\"resultFormat\":\"object\"}"; - String maxResult = HttpClientUtils.httpPost(UrlUtil.getUrl(point).concat(DRUID_PATH), maxSql); - List<Map<String, Object>> list = (List) JsonMapper.fromJsonString(maxResult, List.class); - Long maxSize = 0L; - if(list!=null&&list.get(0)!=null){ - Map<String, Object> map = list.get(0); - if(map!=null&&map.get("max_size")!=null){ - maxSize = Long.valueOf(String.valueOf(map.get("max_size"))); + public int deleteDruidOldLog(int maxUsage, int minIntervalMinutes) { + int failCount = 0; + for (Map.Entry<String, String> datacenterMap : storgeConfig.getAnalyticDataCenter().entrySet()) { + try { + String dataCenterHost = datacenterMap.getValue(); + String dataCenterName = datacenterMap.getKey(); + + String zkClearTimePath = StrUtil.join("/", Constant.ZK_REPORT_AND_METRICS, dataCenterName, Constant.ZOOKEEPER_STORAGE_CLEAR_TIME); + if (!checkStorageInterval(zkClearTimePath, minIntervalMinutes)) { + continue; + } + double clickhouseUsage = getClickhouseUsage(dataCenterHost, dataCenterName); + if (maxUsage <= clickhouseUsage * 100d) { + // 1.获取删除天数 /datacenter maxDays = now - data_center_last_storage - 1 + // 2.空则使用 maxDays = now - last_storage - 1 + // 3.调用删除接口 参数maxDays, 记录日志 + // 4.修改 datacenter时间 + String zkDcLastStoragePath = StrUtil.join("/", Constant.ZK_REPORT_AND_METRICS, dataCenterName); + Long lastStorageTime = getLastStorage(zkDcLastStoragePath); + if (lastStorageTime == null) { + lastStorageTime = getLastStorage(Constant.ZK_REPORT_AND_METRICS); + } + // 删除1天 + long storageTime = lastStorageTime - Timer.ONE_DAY; + Long maxDays = DateUtil.currentSeconds() - storageTime; + + ReturnT<String> deleteResult = logStorageQuotaJob.deleteReportAndMetricsData(Maps.of(Constant.MAX_DAYS, maxDays).toString()); + if (deleteResult.getCode() != 200) { + failCount++; + JobUtil.errorLog("{} druid log delete fail {}", dataCenterName, deleteResult.getMsg()); + continue; + } + setLastStorage(zkDcLastStoragePath, storageTime); + JobUtil.infoLog("{} druid log delete success {}"); + } + } catch (Exception e) { + failCount++; + JobUtil.errorLog("druid delete log error {}", e.getMessage()); } } - logger.info("query druid max_size success,{}", maxSize); - return maxSize; + + return failCount; } - /** - * 获取hos前存储大小 若获取 failquerydruid内最新的值替补当前值 - */ - private Map getHBaseStorage(String key,String point) { - Header[] headers = {new BasicHeader(Constant.TOKEN, storgeConfig.getFilesToken()), new BasicHeader(HttpHeaders.CONTENT_TYPE, Constant.TEXT_XML)}; - String result = HttpClientUtils.httpGet(UrlUtil.getUrl(point).concat(FILE_STORAGE_PATH), headers); - if("-1".equals(result)){ - throw new BusinessException("hos server error"); - } - HosSpace hosSpace = XmlUtil.converXmlToBean(HosSpace.class, result); - Long hosCapacity = hosSpace.getHosCapacity(); - Long hosUsed = hosSpace.getHosUsed(); - if (hosCapacity==-1||hosUsed==-1){ - throw new BusinessException("hos server error"); + public int deleteHosOldLog(int maxUsage, int minIntervalMinutes) { + int failCount = 0; + for (Map.Entry<String, String> datacenterMap : storgeConfig.getFilesDataCenter().entrySet()) { + try { + String dataCenterHost = datacenterMap.getValue(); + String dataCenterName = datacenterMap.getKey(); + + String zkClearTimePath = StrUtil.join("/", Constant.FILES, dataCenterName, Constant.ZOOKEEPER_STORAGE_CLEAR_TIME); + + if (!checkStorageInterval(zkClearTimePath, minIntervalMinutes)) { + continue; + } + double clickhouseUsage = getClickhouseUsage(dataCenterHost, dataCenterName); + if (maxUsage <= clickhouseUsage * 100d) { + // 1.获取删除天数 /datacenter maxDays = now - data_center_last_storage - 1 + // 2.空则使用 maxDays = now - last_storage - 1 + // 3.调用删除接口 参数maxDays, 记录日志 + // 4.修改 datacenter时间 + String zkDcLastStoragePath = StrUtil.join("/", Constant.FILES, dataCenterName); + Long lastStorageTime = getLastStorage(zkDcLastStoragePath); + if (lastStorageTime == null) { + lastStorageTime = getLastStorage(Constant.FILES); + } + // 删除1天 + long storageTime = lastStorageTime - Timer.ONE_DAY; + Long maxDays = DateUtil.currentSeconds() - storageTime; + + ReturnT<String> deleteResult = logStorageQuotaJob.deleteFiles(Maps.of(Constant.MAX_DAYS, maxDays).toString()); + if (deleteResult.getCode() != 200) { + failCount++; + JobUtil.errorLog("{} hos log delete fail {}", dataCenterName, deleteResult.getMsg()); + continue; + } + setLastStorage(zkDcLastStoragePath, storageTime); + JobUtil.infoLog("{} hos log delete success {}"); + } + } catch (Exception e) { + failCount++; + JobUtil.errorLog("hos delete log error {}", e.getMessage()); + } } - Map<String, Object> data = new HashMap<>(); - data.put("max_size", hosCapacity); - data.put("used_size", hosUsed); - data.put("aggregate_size", getDiffNum(hosUsed, getCurrBefore(Constant.FILES,key))); + return failCount; + } - logger.info("query file storage success,{}", data); - return data; + private boolean checkStorageInterval(String zkClearTimePath, int minIntervalMinutes) { + long now = DateUtil.currentSeconds(); + Long lastStorage = getLastStorage(zkClearTimePath); + if (lastStorage != null && now - lastStorage <= minIntervalMinutes * 60) { + JobUtil.infoLog("The deletion task is being executed"); + return false; + } + return true; + } + + private double getClickhouseUsage(String dataCenterHost, String dataCenterName) { + Map<String, Object> ckParamMap = storgeConfig.getCkSource(); + // 1. 总计 + ckParamMap.put("query", clickhouseMaxSizeSql); + Long totalSize = queryClickhouse(dataCenterHost, ckParamMap); + //2. 已使用 + ckParamMap.put("query", clickhouseUsedSizeSql); + Long usedSize = queryClickhouse(dataCenterHost, ckParamMap); + double usage = usedSize / (double) totalSize; + usage = Double.valueOf(String.format("%.4f", usage)); + JobUtil.infoLog("clickhouse total size {}, used size {}, usage {}",totalSize, usedSize, usage); + return usage; } -//=======================工具类方法=============================== /** - * 用于通过druid获取上次对应类型的Curr值 + * 将一天的数据平均分为每5分钟的数据 * - * @param logType 统计类型 - * @return 上次的值,如果不是当天,也不是在00:00:00则返回0 - * //如果历史记录时间比现在少于1个小时,写0,如果最近一小时为0则查今天的 + * @param jobResult */ - private Long getCurrBefore(String logType, String key) { - String currSql = "{\"query\":\"SELECT used_size FROM sys_storage_log WHERE log_type = '" + logType - + "' and data_center = '"+ key +"' and __time >= CURRENT_TIMESTAMP - INTERVAL '"+ STORAGE_LIMITATION +"' MINUTE ORDER BY __time DESC LIMIT 1\",\"context\":{\"skipEmptyBuckets\":\"false\"},\"resultFormat\":\"object\"}"; - String currResult = HttpClientUtils.httpPost(UrlUtil.getUrl(storgeConfig.getAnalyticServer()).concat(DRUID_PATH), currSql); - List<Map> list = (List) JsonMapper.fromJsonString(currResult, List.class); - if(list!=null&&list.size()==0){ - return 0L; + private void setStorageAvg(JobResult<SysStorageEvent> jobResult) { + + long interval = 60; + long num = 24 * 60 / interval; + + List<SysStorageEvent> newResultList = Lists.newArrayList(); + long timestamp = DateUtils.convertStringToTimestamp(DateUtils.getDateOfYesterday("yyyyMMdd"), "yyyyMMdd"); + for (SysStorageEvent sysStorageEvent : jobResult.getData()) { + Long dayBytes = sysStorageEvent.getBytes(); + long minBytes = dayBytes / num; + for (long i = 0; i < num; i++) { + SysStorageEvent newStorageEvent = sysStorageEvent; + newStorageEvent.setBytes(minBytes); + + timestamp = timestamp + interval * 60; + newStorageEvent.setGeneratedTime(timestamp); + newResultList.add(newStorageEvent); + } } - Long historyUsed = 0L; - if(list!=null&&list.get(0)!=null){ - Map<String, Object> map = list.get(0); - if(map!=null&&map.get("used_size")!=null){ - historyUsed = Long.valueOf(String.valueOf(map.get("used_size"))); + jobResult.setData(newResultList); + JobUtil.infoLog("set storage data ... "); + } + private JobResult<SysStorageEvent> getClickhouseStorageInfo(JDBCParam jdbcParam, boolean containIncrease) throws Exception { + + List<SysStorageEvent> sysStorageEvents = Lists.newArrayList(); + JobResult<SysStorageEvent> jobResult = new JobResult(); + int failCount = 0; + long generatedTime = DateUtil.currentSeconds(); + Long sinceTime = getLastStorage(Constant.ZK_TRAFFIC_LOGS); + for (Map.Entry<String, String> datacenterMap : storgeConfig.getTrafficDataCenter().entrySet()) { + try { + String datacenterHost = datacenterMap.getValue(); + String datacenterName = datacenterMap.getKey(); + Map<String, Object> ckParamMap = storgeConfig.getCkSource(); + // 1. 总计 + ckParamMap.put("query", clickhouseMaxSizeSql); + Long totalSize = queryClickhouse(datacenterHost, ckParamMap); + //2. 已使用 + ckParamMap.put("query", clickhouseUsedSizeSql); + Long usedSize = queryClickhouse(datacenterHost, ckParamMap); + + //3. 增量 + Long bytes = containIncrease ? 0L : getIncreaseSize(jdbcParam, usedSize, datacenterMap.getKey(), Constant.TRAFFIC_LOGS); + SysStorageEvent storageEvent = SysStorageEvent.builder() + .logType(Constant.TRAFFIC_LOGS) + .dataCenter(datacenterName) + .generatedTime(generatedTime) + .totalAllocatedSize(totalSize) + .usedSize(usedSize) + .bytes(bytes) + .sinceTime(sinceTime) + .build(); + + sysStorageEvents.add(storageEvent); + JobUtil.infoLog("Get clickhouse storage info {}:", storageEvent.toString()); + } catch (Exception e) { + failCount++; + JobUtil.errorLog("Get clickhouse storage error : datacenter {}, message {}", datacenterMap.getKey(), e.getMessage()); } } - logger.info("query {} history used_size success,{}",logType, historyUsed); - return historyUsed; + jobResult.setFailCount(failCount); + jobResult.setData(sysStorageEvents); + return jobResult; } - /** - * @param node - * @return 自定义的标准时间 last_storage, - * 存在zookeeper 节点/storage/worker/+node - */ - public Long getLastStorage(String node) { - try { - ZookeeperUtils zk = new ZookeeperUtils(); - //query - String nodeData = zk.getNodeData(ZOOKEEPER_STORAGE_PATH + node, zookeeperServer); - //不存在创建一个 - if (ObjectUtils.isEmpty(nodeData)) { - Long lastTime = System.currentTimeMillis() / 1000; - zk.modifyNode(ZOOKEEPER_STORAGE_PATH + node, String.valueOf(lastTime), zookeeperServer); - return lastTime; + private JobResult<SysStorageEvent> getDruidStorageInfo(JDBCParam jdbcParam, boolean containIncrease) throws Exception { + + final String maxSizeSql = "{\"query\":\"SELECT SUM(max_size) AS max_size FROM sys.servers WHERE server_type = 'historical'\",\"context\":{\"skipEmptyBuckets\":\"false\"},\"resultFormat\":\"csv\"}"; + final String usedSizeSql = "{\"query\":\"SELECT SUM(size) AS used_size FROM sys.segments WHERE datasource NOT LIKE '%hot%' and is_published = 1 and is_overshadowed = 0\",\"context\":{\"skipEmptyBuckets\":\"false\"},\"resultFormat\":\"csv\"}"; + + List<SysStorageEvent> sysStorageEvents = Lists.newArrayList(); + JobResult<SysStorageEvent> jobResult = new JobResult(); + int failCount = 0; + long generatedTime = DateUtil.currentSeconds(); + Long sinceTime = getLastStorage(Constant.ZK_REPORT_AND_METRICS); + for (Map.Entry<String, String> datacenterMap : storgeConfig.getAnalyticDataCenter().entrySet()) { + try { + String datacenterHost = datacenterMap.getValue(); + String datacenterName = datacenterMap.getKey(); + // 1. 总计 + Long totalSize = queryDruid(datacenterHost, maxSizeSql); + //2. 已使用 + Long usedSize = queryDruid(datacenterHost, usedSizeSql); + + //3. 增量 + Long bytes = containIncrease ? 0L : getIncreaseSize(jdbcParam, usedSize, datacenterMap.getKey(), Constant.METRICS); + + SysStorageEvent storageEvent = SysStorageEvent.builder() + .logType(Constant.METRICS) + .dataCenter(datacenterName) + .generatedTime(generatedTime) + .totalAllocatedSize(totalSize) + .usedSize(usedSize) + .bytes(bytes) + .sinceTime(sinceTime) + .build(); + + sysStorageEvents.add(storageEvent); + JobUtil.infoLog("Get druid storage info {}:", storageEvent.toString()); + } catch (Exception e) { + failCount++; + JobUtil.errorLog("Get druid storage error : datacenter {}, message {}", datacenterMap.getKey(), e.getMessage()); } - Long lastStorage = Long.valueOf(nodeData); - logger.info("query standard time last_storage success,{}", lastStorage); - - return Long.valueOf(nodeData); - } catch (Exception e) { - logger.error("query standard time last_storage fail,{}", JobUtil.getErrorMsg(e)); - throw new BusinessException("query standard time last_storage fail ," + JobUtil.getErrorMsg(e)); } - + jobResult.setFailCount(failCount); + jobResult.setData(sysStorageEvents); + return jobResult; } - /** - * 修改zookeeper节点信息 - * <p> - * path 节点路径/storage/worker/ +node - */ - public void modifyLastStorage(String node, String data) { - ZookeeperUtils zk = new ZookeeperUtils(); - //是否存在 不存在创建 - zk.modifyNode(ZOOKEEPER_STORAGE_PATH + node, data, zookeeperServer); + private JobResult<SysStorageEvent> getHosStorageInfo(JDBCParam jdbcParam, boolean containIncrease) throws Exception { + + List<SysStorageEvent> sysStorageEvents = Lists.newArrayList(); + JobResult<SysStorageEvent> jobResult = new JobResult(); + int failCount = 0; + Long generatedTime = DateUtil.currentSeconds(); + Long sinceTime = getLastStorage(Constant.FILES); + for (Map.Entry<String, String> datacenterMap : storgeConfig.getFilesDataCenter().entrySet()) { + try { + String datacenterHost = datacenterMap.getValue(); + String datacenterName = datacenterMap.getKey(); + + HosSpace hosSpace = getHosSpace(datacenterHost, datacenterName); + Long totalSize = hosSpace.getHosCapacity(); + Long usedSize = hosSpace.getHosUsed(); + if (totalSize == -1 || usedSize == -1) { + throw new BusinessException("hos server error : " + datacenterName); + } + + //3. 增量 + Long bytes = containIncrease ? 0L : getIncreaseSize(jdbcParam, usedSize, datacenterMap.getKey(), Constant.FILES); + SysStorageEvent storageEvent = SysStorageEvent.builder() + .logType(Constant.FILES) + .dataCenter(datacenterName) + .generatedTime(generatedTime) + .totalAllocatedSize(totalSize) + .usedSize(usedSize) + .bytes(bytes) + .sinceTime(sinceTime) + .build(); + + sysStorageEvents.add(storageEvent); + JobUtil.infoLog("Get hos storage info {}:", storageEvent.toString()); + } catch (Exception e) { + failCount++; + JobUtil.errorLog("Get hos storage error : datacenter {}, message {}", datacenterMap.getKey(), e.getMessage()); + } + } + jobResult.setFailCount(failCount); + jobResult.setData(sysStorageEvents); + return jobResult; } - /** - * 用于组合正常维度的JSON串 - * - * @return 结果json - */ - public Map<String, Map> getDiskJson(boolean ckDay) { - - Map all = new HashMap<>(); - List<Map> data = new ArrayList<>(); - Long now = System.currentTimeMillis() / 1000; - Map<String, Integer> status = new HashMap<>(); - if(ckDay){ - //当clickhouse任务设置在第二天时,这个时间点为前一天的统计数据 - long timestamp = DateUtils.convertStringToTimestamp(DateUtils.getDateOfYesterday("yyyyMMdd"), "yyyyMMdd"); - status.put("trafficFailCount", getCKStorageJson(data, timestamp,true)); - } else{ - status.put("trafficFailCount", getCKStorageJson(data, now,false)); - status.put("reportFailCount", getDruidStorageJson(data, now)); - status.put("fileFailCount", getFileStorageJson(data, now)); + private Long getIncreaseSize(JDBCParam jdbcParam, Long usedSize, String datacenter, String logType) throws Exception { + final String lastUsedSizeSql = "SELECT used_size FROM " + jdbcParam.getTable() + " WHERE log_type = '" + logType + "' and data_center = '" + datacenter + + "' and generated_time >= UNIX_TIMESTAMP() - " + MAX_QUERY_INTERVAL + " ORDER BY generated_time DESC LIMIT 1;"; + JobUtil.infoLog(lastUsedSizeSql); + + try (Connection conn = DBUtils.getDBConn(jdbcParam)) { + Number lastUsedSize = SqlExecutor.query(conn, lastUsedSizeSql, new NumberHandler()); + if (lastUsedSize == null || lastUsedSize.longValue() <= 0 || usedSize - lastUsedSize.longValue() <= 0) { + return 0L; + } + + return usedSize - lastUsedSize.longValue(); + } catch (Exception e) { + throw e; } - all.put("data", data); - all.put("status", status); - return all; } - private int getCKStorageJson(List<Map> data, Long time, boolean day) { - int errorCount = 0; - try { - for (Map.Entry<String, String> dc : storgeConfig.getTrafficDataCenter().entrySet()) { - try { - Map<String,Object> traffic = new HashMap<>(); - traffic.put("log_type", Constant.TRAFFIC_LOGS); - traffic.put("last_storage", getLastStorage(Constant.ZK_TRAFFIC_LOGS)); - traffic.put("data_center",dc.getKey()); - traffic.put("time", time); - traffic.put("used_size",getClickHouseCurr(dc.getValue())); - traffic.put("max_size",getClickHouseMax(dc.getValue())); - if(day){ - //昨日平均一天的5分钟粒度平均值 - long interval = 5; - long num = 24*60/interval; - Long ckDiffAvg = getClickHouseDiff(dc.getValue())/num; - for (int i = 0; i < num; i++) { - Map<String,Object> trafficAvg = new HashMap<>(); - trafficAvg.putAll(traffic); - trafficAvg.put("time", time); - trafficAvg.put("aggregate_size",ckDiffAvg); - data.add(trafficAvg); - time = time + interval*60;//转换为秒 - } - return errorCount; - } - data.add(traffic); - } catch (BusinessException e) { - logger.error("clickhouse storage error {}", JobUtil.getErrorMsg(e)); - XxlJobLogger.log("clickhouse storage error {}", JobUtil.getErrorMsg(e)); - errorCount++; - } - } - } catch (BusinessException e) { - logger.error("clickhouse storage error {}", JobUtil.getErrorMsg(e)); - XxlJobLogger.log("clickhouse storage error {}", JobUtil.getErrorMsg(e)); - errorCount++; + + public Long queryClickhouse(String datacenterHost, Map<String, Object> ckParamMap) { + String result = HttpClientUtils.httpGet(HttpClientUtils.getUrlWithParams(UrlUtil.getUrl(datacenterHost), ckParamMap)); + if ("-1".equals(result)) { + throw new BusinessException("Get clickhouse http fail -1"); } - return errorCount; + return Long.valueOf(result.trim()); } - private int getDruidStorageJson(List<Map> data, Long time) { - int errorCount = 0; - try { - for (Map.Entry<String, String> dc : storgeConfig.getAnalyticDataCenter().entrySet()) { - try { - Map metrics = new HashMap<>(); - metrics.put("log_type", Constant.REPORT_AND_METRICS); - metrics.put("time", time); - metrics.put("last_storage", getLastStorage(Constant.ZK_REPORT_AND_METRICS)); - metrics.put("data_center", dc.getKey()); - - metrics.put("max_size",getDruidMax(dc.getValue())); - Long druidUsed = getDruidUsed(dc.getValue()); - metrics.put("used_size",druidUsed); - metrics.put("aggregate_size", getDiffNum(druidUsed, getCurrBefore(Constant.REPORT_AND_METRICS,dc.getKey()))); - data.add(metrics); - } catch (BusinessException e) { - logger.error("druid storage endpoint={}, error {}",dc.getValue(), JobUtil.getErrorMsg(e)); - XxlJobLogger.log("druid storage endpoint={},error {}",dc.getValue(), JobUtil.getErrorMsg(e)); - errorCount++; - } - } - } catch (BusinessException e) { - logger.error("druid storage error {}", JobUtil.getErrorMsg(e)); - XxlJobLogger.log("druid storage error {}", JobUtil.getErrorMsg(e)); - errorCount++; + public Long queryDruid(String datacenterHost, String usedSizeSql) { + final String druidPath = "/druid/v2/sql"; + String result = HttpClientUtils.httpPost(UrlUtil.getUrl(datacenterHost).concat(druidPath), usedSizeSql); + + if ("-1".equals(result)) { + throw new BusinessException("Get druid http fail -1"); } + return Long.valueOf(result.trim()); + } + + public HosSpace getHosSpace(String datacenterHost, String datacenterName) { + final String fileStoragePath = "/admin/diskspace"; + Header[] headers = {new BasicHeader(Constant.TOKEN, storgeConfig.getFilesToken()), + new BasicHeader(HttpHeaders.CONTENT_TYPE, Constant.TEXT_XML)}; + String result = HttpClientUtils.httpGet(UrlUtil.getUrl(datacenterHost).concat(fileStoragePath), headers); - return errorCount; + if ("-1".equals(result)) { + throw new BusinessException("Get hos http request fail -1 , server: " + datacenterName); + } + HosSpace hosSpace = XmlUtil.converXmlToBean(HosSpace.class, result); + return hosSpace; } - /* - * 集群hbase存储配额 - **/ - private int getFileStorageJson(List<Map> data, Long time) { - int errorCount = 0; + + /** + * @param node + * @return 自定义的标准时间 last_storage, + * 存在zookeeper 节点/storage/worker/+node + */ + private Long getLastStorage(String node) { try { - for (Map.Entry<String, String> entry : storgeConfig.getFilesDataCenter().entrySet()) { - try { - Map files = new HashMap<>(); - files.put("log_type", Constant.FILES); - files.put("time", time); - files.put("last_storage", getLastStorage(Constant.FILES)); - files.put("data_center",entry.getKey()); - files.putAll(getHBaseStorage(entry.getKey(),entry.getValue())); - data.add(files); - } catch (BusinessException e) { - logger.error("file storage endpoint={},error={}", entry.getValue(), JobUtil.getErrorMsg(e)); - XxlJobLogger.log("file storage endpoint={},error={}", entry.getValue(), JobUtil.getErrorMsg(e)); - errorCount++; - } + ZookeeperUtils zk = new ZookeeperUtils(); + String nodeData = zk.getNodeData(Constant.ZOOKEEPER_STORAGE_PATH + node, zookeeperServer); + if (ObjectUtils.isEmpty(nodeData)) { + return null; } - } catch (BusinessException e) { - logger.error("file storage error {}", JobUtil.getErrorMsg(e)); - XxlJobLogger.log("file storage error {}", JobUtil.getErrorMsg(e)); - errorCount++; + Long lastStorage = Long.valueOf(nodeData); + logger.info("query standard time last_storage success,{}", lastStorage); + return Long.valueOf(nodeData); + } catch (Exception e) { + throw new BusinessException("query standard time last_storage fail ," + JobUtil.getErrorMsg(e)); } - return errorCount; } - /** - * 获取差值计算,若为负数则填写0 - * - * @param now 这一次的值 - * @param before 上一次的值 - * @return 差值 - */ - private Long getDiffNum(Long now, Long before) { - if(before<=0){ - return 0L; - } - long diff = now - before; - if (diff >= 0) { - return diff; - } else { - return 0L; - } + private void setLastStorage(String path, Long lastStorage) { + ZookeeperUtils zk = new ZookeeperUtils(); + zk.modifyNode(path, String.valueOf(lastStorage), zookeeperServer); } } diff --git a/galaxy-job-executor/src/test/java/com/mesalab/executor/test/StorageQuotaTest.java b/galaxy-job-executor/src/test/java/com/mesalab/executor/test/StorageQuotaTest.java index 6f1321b..bcecbbb 100644 --- a/galaxy-job-executor/src/test/java/com/mesalab/executor/test/StorageQuotaTest.java +++ b/galaxy-job-executor/src/test/java/com/mesalab/executor/test/StorageQuotaTest.java @@ -10,7 +10,6 @@ import com.mesalab.executor.core.utils.Constant; import com.mesalab.executor.core.utils.KafkaUtils; import com.mesalab.executor.jobhandler.LogStorageQuotaJob; import com.mesalab.executor.pojo.JDBCParam; -import com.mesalab.executor.service.StorageQuotaInfoService; import com.mesalab.executor.service.StorageQuotaService; import lombok.extern.slf4j.Slf4j; import org.junit.Test; @@ -18,7 +17,6 @@ import org.junit.runner.RunWith; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.boot.test.context.SpringBootTest; import org.springframework.test.context.junit4.SpringRunner; -import org.springframework.util.Assert; import javax.annotation.Resource; import java.util.HashMap; @@ -33,17 +31,6 @@ import java.util.stream.Collectors; public class StorageQuotaTest { @Autowired - StorageQuotaService storageQuotaService; - - @Test - public void storageQuotaServiceTest() { - - Map diskJson = storageQuotaService.getDiskJson(false); - Assert.notEmpty(diskJson, "存储配额获取失败"); - - } - - @Autowired LogStorageQuotaJob logStorageQuotaJob; /** @@ -56,19 +43,6 @@ public class StorageQuotaTest { // Assert.isTrue(200==(deleteAllFiles.getCode()),"success"); } - /** - * 设置文件存储策略 - */ -// @Test -// public void deleteFilesTest() { -// ReturnT<String> deleteFiles = logStorageQuotaJob.deleteFiles("{\"maxdays\":365}"); -// Assert.isTrue(200 == (deleteFiles.getCode()), "success"); -// } - @Test - public void zookeeperTest() { - Long lastStorage = storageQuotaService.getLastStorage("Files"); - Assert.notNull(lastStorage, "获取标准时间失败"); - } @Test public void testS() { @@ -102,7 +76,7 @@ public class StorageQuotaTest { @Resource - StorageQuotaInfoService storageQuotaInfoService; + StorageQuotaService storageQuotaService; @Test public void testStorage2() { @@ -114,9 +88,9 @@ public class StorageQuotaTest { sourceParams.put("username", "root"); sourceParams.put("pin", "galaxy2019"); JDBCParam jdbcParam = new JDBCParam(sourceParams); - storageQuotaInfoService.getAndSaveStorageQuotaInfo(new String[]{Constant.TRAFFIC_LOGS}, jdbcParam, true); - storageQuotaInfoService.getAndSaveStorageQuotaInfo(new String[]{Constant.METRICS}, jdbcParam, true); - storageQuotaInfoService.getAndSaveStorageQuotaInfo(new String[]{Constant.FILES}, jdbcParam, true); + storageQuotaService.getAndSaveStorageQuotaInfo(new String[]{Constant.TRAFFIC_LOGS}, jdbcParam, true); + storageQuotaService.getAndSaveStorageQuotaInfo(new String[]{Constant.METRICS}, jdbcParam, true); + storageQuotaService.getAndSaveStorageQuotaInfo(new String[]{Constant.FILES}, jdbcParam, true); } } |
