| author | zhanghongqing <[email protected]> | 2024-07-19 15:29:05 +0800 |
|---|---|---|
| committer | zhanghongqing <[email protected]> | 2024-07-19 15:29:05 +0800 |
| commit | 8a1a5cfd9ffc6d649aa2b8992092f7b2a9009b4d (patch) | |
| tree | b88223da66a998f2cef89b5cbd277fe100c481fc | |
| parent | b7ca92005ea384349b1d01751f6eb49b527fe344 (diff) | |
[Modify][Log Deletion] Add a parameter for how long disk usage must stay over the quota TSG-21778
4 files changed, 88 insertions, 36 deletions
diff --git a/galaxy-job-executor/src/main/java/com/mesalab/executor/core/utils/Constant.java b/galaxy-job-executor/src/main/java/com/mesalab/executor/core/utils/Constant.java
index 66a898c..072ea8e 100644
--- a/galaxy-job-executor/src/main/java/com/mesalab/executor/core/utils/Constant.java
+++ b/galaxy-job-executor/src/main/java/com/mesalab/executor/core/utils/Constant.java
@@ -17,6 +17,9 @@ public class Constant {
     public static final String ZK_REPORT_AND_METRICS = "Report-and-Metrics";
     public static final String ZOOKEEPER_STORAGE_PATH = "/storage/worker/";
     public static final String ZOOKEEPER_STORAGE_CLEAR_TIME = "clearTime";
+    // how long usage has been over the limit
+    public static final String ZOOKEEPER_STORAGE_DURATION_TIME = "durationTime";
+    public static final String ZOOKEEPER_STORAGE_OVER_MAX_USAGE = "overMaxUsage";
 
     public static final String TOKEN = "Token";
     public static final String TEXT_XML = "text/xml";
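The two new names are leaf nodes alongside the existing `clearTime` node under the `/storage/worker/` subtree. A minimal sketch of how the service composes the paths (the node names are the real constants; the log-type and data-center values here are hypothetical, by analogy with `ZK_REPORT_AND_METRICS`):

```java
// Sketch of the ZooKeeper node layout used by StorageQuotaService.
// "Traffic-Logs" and "dc1" are illustrative placeholders.
public class ZkPathSketch {
    static final String ZOOKEEPER_STORAGE_PATH = "/storage/worker/";

    public static void main(String[] args) {
        String zkPath = "Traffic-Logs"; // hypothetical log-type subtree
        String dataCenterName = "dc1";  // hypothetical data center

        // clearTime    - when the last deletion ran (gates minIntervalMinutes)
        // durationTime - when usage was first seen over the limit
        // overMaxUsage - 0/1 flag: the over-limit timer is armed
        for (String leaf : new String[]{"clearTime", "durationTime", "overMaxUsage"}) {
            System.out.println(ZOOKEEPER_STORAGE_PATH
                    + String.join("/", zkPath, dataCenterName, leaf));
        }
    }
}
```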
diff --git a/galaxy-job-executor/src/main/java/com/mesalab/executor/jobhandler/LogStorageQuotaJob.java b/galaxy-job-executor/src/main/java/com/mesalab/executor/jobhandler/LogStorageQuotaJob.java
index d354370..e9d1e4c 100644
--- a/galaxy-job-executor/src/main/java/com/mesalab/executor/jobhandler/LogStorageQuotaJob.java
+++ b/galaxy-job-executor/src/main/java/com/mesalab/executor/jobhandler/LogStorageQuotaJob.java
@@ -85,6 +85,7 @@ public class LogStorageQuotaJob {
         for (String table : tablesForCKs) {
             try {
+                JobUtil.infoLog("-----------------reset storage clickhouse endpoint = {}, table = {} ------------------", deletionConfig.getTrafficServer(), table);
                 List<String> partitionList = getSystemDataForCK(ckUrl, Joiner.on("").join("SELECT DISTINCT(`partition`) from " + deletionConfig.getSystemPartsCluster() + " WHERE table = '", table, "' AND `partition` < '", deleteMaxDate, "' FORMAT JSON;"));
                 logger.info("reset storage days table {}, drop partition size {} ", table, partitionList.size());
                 XxlJobLogger.log("reset storage days table {}, drop partition size {} ", table, partitionList.size());
@@ -184,49 +185,55 @@ public class LogStorageQuotaJob {
      **/
     private void deleteDruidTable(String queryTablesSql, Integer maxdays, boolean hotTask) {
         List<String> analyticTables = getAnalyticTables(UrlUtil.getUrl(deletionConfig.getAnalyticServer(), analyticUrl), queryTablesSql);
-        //modify load rules
+        //1. modify load rules
         String changeRulesUrl = UrlUtil.getUrl(deletionConfig.getAnalyticServer(), "/druid/coordinator/v1/rules/");
         String changeRulesParam = Joiner.on("").join("[{\"type\":\"loadByPeriod\",\"period\":\"P", maxdays, "D\",\"tieredReplicants\":{\"_default_tier\":1}},{\"type\":\"dropForever\"}]");
+        //2. mark unused
+        // curl -X 'POST' -H 'Content-Type:application/json' -d '{ "interval" : "2022-01-01/3000-01-03" }' /druid/coordinator/v1/datasources/$table_name/markUnused
-        //1. physically delete
+        //3. physically delete
         String deleteUrl = UrlUtil.getUrl(deletionConfig.getAnalyticServer(), "/druid/indexer/v1/task");
         String prefixDeleteParam = "{\"type\":\"kill\",\"dataSource\":\"";
         //start time / (current time - maxDay)
         String druidDataStartTime = getDruidDataStartTime();
         String deleteMaxDate = DateUtils.getSomeDate(TimeConstants.YYYY_MM_DD, -maxdays);
         String suffixDeleteParam = Joiner.on("").join("\",\"interval\":\"", druidDataStartTime, "/", deleteMaxDate, "\"}");
+        String unUseParam = Joiner.on("").join("{\"interval\":\"", druidDataStartTime, "/", deleteMaxDate, "\"}");
+        JobUtil.infoLog("reset storage days druid unused command = {}", unUseParam);
         int failCount = 0;
         for (String table : analyticTables) {
             try {
+                JobUtil.infoLog("-----------------reset storage druid endpoint = {}, table = {} ------------------", deletionConfig.getAnalyticServer(), table);
                 // hot tables do not need the rule change
                 if (!hotTask) {
-                    XxlJobLogger.log("reset storage days first command table :{}, sql : {}", table, changeRulesParam);
+                    JobUtil.infoLog("reset storage days first rules command = {}", table, changeRulesParam);
                     HttpClientUtils.httpPost(changeRulesUrl + table, changeRulesParam);
                 }
                 if (druidDataStartTime.compareTo(deleteMaxDate) < 0) {
-                    XxlJobLogger.log("reset storage days druid command endpoint = {}, table = {}, sql = {}", deletionConfig.getAnalyticServer(), table, prefixDeleteParam + table + suffixDeleteParam);
-                    logger.info("reset storage days druid second command endpoint = {}, table = {}, sql = {}", deletionConfig.getAnalyticServer(), table, prefixDeleteParam + table + suffixDeleteParam);
+
+                    String unUseMessage = HttpClientUtils.httpPost(UrlUtil.getUrl(deletionConfig.getAnalyticServer(), "/druid/coordinator/v1/datasources/", table, "/markUnused"), unUseParam);
+                    JobUtil.infoLog("reset storage days druid unused result message :{}", unUseMessage);
+
+                    JobUtil.infoLog("reset storage days druid second command = {}", prefixDeleteParam + table + suffixDeleteParam);
                     String deleteMessage = HttpClientUtils.httpPost(deleteUrl, prefixDeleteParam + table + suffixDeleteParam);
-                    XxlJobLogger.log("reset storage days druid result endpoint = {}, table = {}, message :{}", deletionConfig.getAnalyticServer(), table, deleteMessage);
+                    JobUtil.infoLog("reset storage days druid result message :{}", deleteMessage);
                 }
             } catch (BusinessException e) {
                 failCount++;
-                logger.error("reset storage druid ttl endpoint = {}, table = {}, fail message = {}", deletionConfig.getAnalyticServer(), table, JobUtil.getErrorMsg(e));
-                XxlJobLogger.log("reset storage druid ttl endpoint = {}, table = {}, fail message = {}", deletionConfig.getAnalyticServer(), table, JobUtil.getErrorMsg(e));
+                JobUtil.errorLog("reset storage druid ttl table = {}, fail message = {}", table, JobUtil.getErrorMsg(e));
             }
         }
         if (ObjectUtils.isNotEmpty(deletionConfig.getAnalyticExclusion())) {
             List<String> tableExc = Splitter.on(",").trimResults().splitToList(deletionConfig.getAnalyticExclusion());
             for (String table : tableExc) {
                 try {
-                    logger.info("load forever druid table {}", table);
-                    HttpClientUtils.httpPost(changeRulesUrl + table, "[{\"type\":\"loadForever\",\"tieredReplicants\":{\"_default_tier\":1}}]");
-                    XxlJobLogger.log("load forever druid table {}", table);
+                    JobUtil.infoLog("========================= load forever druid table {} =========================", table);
+                    String loadMessage = HttpClientUtils.httpPost(changeRulesUrl + table, "[{\"type\":\"loadForever\",\"tieredReplicants\":{\"_default_tier\":1}}]");
+                    JobUtil.infoLog("load forever druid result message {}", loadMessage);
                 } catch (BusinessException e) {
                     failCount++;
-                    logger.error("load forever storage druid ttl endpoint = {}, table = {}, fail message = {}", deletionConfig.getAnalyticServer(), table, JobUtil.getErrorMsg(e));
-                    XxlJobLogger.log("load forever storage druid ttl endpoint = {}, table = {}, fail message = {}", deletionConfig.getAnalyticServer(), table, JobUtil.getErrorMsg(e));
+                    JobUtil.errorLog("load forever storage druid ttl endpoint = {}, table = {}, fail message = {}", deletionConfig.getAnalyticServer(), table, JobUtil.getErrorMsg(e));
                 }
             }
         }
@@ -253,6 +260,7 @@ public class LogStorageQuotaJob {
         List<String> hosHostList = parseList(deletionConfig.getFilesHosServer());
         int failCount = 0;
         for (String hosHost : hosHostList) {
+            JobUtil.infoLog("-----------------reset storage hos endpoint = {} ------------------", hosHost);
             failCount += deleteHosData(hosHost, maxDays);
         }
@@ -404,11 +412,14 @@ public class LogStorageQuotaJob {
         String changeRulesUrl = Joiner.on("").join(deleteendpoint, "/druid/coordinator/v1/rules/");
         String changeRulesParam = "[{ \"type\":\"dropByInterval\", \"interval\":\"" + deleteStartTime + "/" + deleteEndDate + "\" }]";
-        //2 physically delete all data
+        //2 mark unused
+        String unUseParam = Joiner.on("").join("{\"interval\":\"", deleteStartTime, "/", deleteEndDate, "\"}");
+        JobUtil.infoLog("reset storage days druid unused command = {}", unUseParam);
+        //3 physically delete all data
         String deleteUrl = Joiner.on("").join(deleteendpoint, "/druid/indexer/v1/task");
         String prefixDeleteParam = "{\"type\":\"kill\",\"dataSource\":\"";
         String suffixDeleteParam = Joiner.on("").join("\",\"interval\":\"", deleteStartTime + "/", deleteEndDate, "\"}");
-        //3 restore load rules
+        //4 restore load rules
         String resetRulesParam = Joiner.on("").join("[{\"type\":\"loadByPeriod\",\"period\":\"P", paramsMap.get(max_days), "D\",\"tieredReplicants\":{\"_default_tier\":1}},{\"type\":\"dropForever\"}]");
         //when purging, take care not to touch tables ending in hot
         String queryTablesSql = "{\"query\":\"SELECT DISTINCT(datasource) FROM sys.tasks WHERE datasource NOT LIKE '%hot%' " + deletionConfig.getNotInSql("datasource", deletionConfig.getAnalyticExclusion()) + " \",\"context\":{\"skipEmptyBuckets\":\"false\"},\"resultFormat\":\"object\"}";
@@ -440,6 +451,10 @@ public class LogStorageQuotaJob {
                     break;
                 }
             }
+
+            String unUseMessage = HttpClientUtils.httpPost(UrlUtil.getUrl(deletionConfig.getAnalyticServer(), "/druid/coordinator/v1/datasources/", table, "/markUnused"), unUseParam);
+            JobUtil.infoLog("reset storage days druid unused result message :{}", unUseMessage);
+
             XxlJobLogger.log("delete druid table second command table = {}, sql = {}", table, prefixDeleteParam + table + suffixDeleteParam);
             String deleteMessage = HttpClientUtils.httpPost(deleteUrl, prefixDeleteParam + table + suffixDeleteParam);
             XxlJobLogger.log("delete druid table third command table = {}, sql = {}", table, resetRulesParam);
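The substantive change in this file is the new markUnused step: a Druid kill task only removes segments that are already marked unused, so posting `markUnused` for the interval before submitting the kill task is what makes the deletion take effect immediately instead of waiting for the coordinator to apply the drop rules. A self-contained sketch of that two-step sequence using only the endpoints that appear in the patch (host and table names are placeholders; the job itself posts both requests to `deletionConfig.getAnalyticServer()`):

```java
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;

// Sketch of the mark-unused-then-kill sequence the job now performs per table.
// Error handling and logging are omitted; host/table/interval are placeholders.
public class DruidDeleteSketch {
    static final HttpClient CLIENT = HttpClient.newHttpClient();

    static String post(String url, String json) throws Exception {
        HttpRequest request = HttpRequest.newBuilder()
                .uri(URI.create(url))
                .header("Content-Type", "application/json")
                .POST(HttpRequest.BodyPublishers.ofString(json))
                .build();
        return CLIENT.send(request, HttpResponse.BodyHandlers.ofString()).body();
    }

    public static void main(String[] args) throws Exception {
        String server = "http://druid-router:8888";   // placeholder for getAnalyticServer()
        String table = "some_datasource";             // placeholder table
        String interval = "2022-01-01/2024-06-19";    // druidDataStartTime/deleteMaxDate

        // Step 2 in the patch: mark the interval's segments unused.
        post(server + "/druid/coordinator/v1/datasources/" + table + "/markUnused",
                "{\"interval\":\"" + interval + "\"}");

        // Step 3 in the patch: kill task physically removes the unused segments.
        post(server + "/druid/indexer/v1/task",
                "{\"type\":\"kill\",\"dataSource\":\"" + table
                        + "\",\"interval\":\"" + interval + "\"}");
    }
}
```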
table, "[{\"type\":\"loadForever\",\"tieredReplicants\":{\"_default_tier\":1}}]"); + JobUtil.infoLog("load forever druid result message {}", loadMessage); } catch (BusinessException e) { failCount++; - logger.error("load forever storage druid ttl endpoint = {}, table = {}, fail message = {}", deletionConfig.getAnalyticServer(), table, JobUtil.getErrorMsg(e)); - XxlJobLogger.log("load forever storage druid ttl endpoint = {}, table = {}, fail message = {}", deletionConfig.getAnalyticServer(), table, JobUtil.getErrorMsg(e)); + JobUtil.errorLog("load forever storage druid ttl endpoint = {}, table = {}, fail message = {}", deletionConfig.getAnalyticServer(), table, JobUtil.getErrorMsg(e)); } } } @@ -253,6 +260,7 @@ public class LogStorageQuotaJob { List<String> hosHostList = parseList(deletionConfig.getFilesHosServer()); int failCount = 0; for (String hosHost : hosHostList) { + JobUtil.infoLog("-----------------reset storage hos endpoint = {} ------------------", hosHost); failCount += deleteHosData(hosHost, maxDays); } @@ -404,11 +412,14 @@ public class LogStorageQuotaJob { String changeRulesUrl = Joiner.on("").join(deleteendpoint, "/druid/coordinator/v1/rules/"); String changeRulesParam = "[{ \"type\":\"dropByInterval\", \"interval\":\"" + deleteStartTime + "/" + deleteEndDate + "\" }]"; - //2物理删除所有数据 + //2标记unused + String unUseParam = Joiner.on("").join("{\"interval\":\"", deleteStartTime, "/", deleteEndDate, "\"}"); + JobUtil.infoLog("reset storage days druid unused command = {}", unUseParam); + //3物理删除所有数据 String deleteUrl = Joiner.on("").join(deleteendpoint, "/druid/indexer/v1/task"); String prefixDeleteParam = "{\"type\":\"kill\",\"dataSource\":\""; String suffixDeleteParam = Joiner.on("").join("\",\"interval\":\"", deleteStartTime + "/", deleteEndDate, "\"}"); - //3恢复加载规则 + //4恢复加载规则 String resetRulesParam = Joiner.on("").join("[{\"type\":\"loadByPeriod\",\"period\":\"P", paramsMap.get(max_days), "D\",\"tieredReplicants\":{\"_default_tier\":1}},{\"type\":\"dropForever\"}]"); //清库时注意不操作hot结尾的表 String queryTablesSql = "{\"query\":\"SELECT DISTINCT(datasource) FROM sys.tasks WHERE datasource NOT LIKE '%hot%' " + deletionConfig.getNotInSql("datasource", deletionConfig.getAnalyticExclusion()) + " \",\"context\":{\"skipEmptyBuckets\":\"false\"},\"resultFormat\":\"object\"}"; @@ -440,6 +451,10 @@ public class LogStorageQuotaJob { break; } } + + String unUseMessage = HttpClientUtils.httpPost(UrlUtil.getUrl(deletionConfig.getAnalyticServer(), "/druid/coordinator/v1/datasources/", table,"/markUnused") , unUseParam); + JobUtil.infoLog("reset storage days druid unused result message :{}", unUseMessage); + XxlJobLogger.log("delete druid table second command table = {}, sql = {}", table, prefixDeleteParam + table + suffixDeleteParam); String deleteMessage = HttpClientUtils.httpPost(deleteUrl, prefixDeleteParam + table + suffixDeleteParam); XxlJobLogger.log("delete druid table third command table = {}, sql = {}", table, resetRulesParam); diff --git a/galaxy-job-executor/src/main/java/com/mesalab/executor/jobhandler/StorageQuotaJob.java b/galaxy-job-executor/src/main/java/com/mesalab/executor/jobhandler/StorageQuotaJob.java index cce407b..075d00b 100644 --- a/galaxy-job-executor/src/main/java/com/mesalab/executor/jobhandler/StorageQuotaJob.java +++ b/galaxy-job-executor/src/main/java/com/mesalab/executor/jobhandler/StorageQuotaJob.java @@ -66,9 +66,10 @@ public class StorageQuotaJob { } String logType = TypeUtils.castToString(paramsMap.get("logType")); double maxUsage = 
diff --git a/galaxy-job-executor/src/main/java/com/mesalab/executor/service/StorageQuotaService.java b/galaxy-job-executor/src/main/java/com/mesalab/executor/service/StorageQuotaService.java
index 24cbded..0b99528 100644
--- a/galaxy-job-executor/src/main/java/com/mesalab/executor/service/StorageQuotaService.java
+++ b/galaxy-job-executor/src/main/java/com/mesalab/executor/service/StorageQuotaService.java
@@ -61,6 +61,7 @@ public class StorageQuotaService {
     private final String druidMaxSizeSql = "{\"query\":\"SELECT SUM(max_size) AS max_size FROM sys.servers WHERE server_type = 'historical'\",\"context\":{\"skipEmptyBuckets\":\"false\"},\"resultFormat\":\"csv\"}";
     private final String druidUsedSizeSql = "{\"query\":\"SELECT SUM(size) AS used_size FROM sys.segments WHERE datasource NOT LIKE '%hot%' and is_published = 1 \",\"context\":{\"skipEmptyBuckets\":\"false\"},\"resultFormat\":\"csv\"}";
+    private final String druidUsedActiveSizeSql = "{\"query\":\"SELECT SUM(size) AS used_size FROM sys.segments WHERE datasource NOT LIKE '%hot%' and is_published = 1 and is_active=1\",\"context\":{\"skipEmptyBuckets\":\"false\"},\"resultFormat\":\"csv\"}";
     private final String mysqlDailyUsedSizeSql = "SELECT UNIX_TIMESTAMP(FROM_UNIXTIME(generated_time,'%Y%m%d')) AS date, sum(bytes) AS bytes FROM {}.{} WHERE generated_time <= CURRENT_TIMESTAMP AND generated_time > UNIX_TIMESTAMP(DATE_SUB(NOW(), INTERVAL 1 YEAR)) AND log_type='{}' AND data_center='{}' GROUP BY date, log_type ORDER BY date DESC";
     private final String druidDailyUsedSizeSql = "{\"query\":\"SELECT \\\"start\\\" as \\\"date\\\", sum(\\\"size\\\") as bytes from sys.segments where is_published = 1 and \\\"start\\\" < '3000' " + storgeConfig.getNotInSql("datasource", storgeConfig.getAnalyticExclusion()) + " group by \\\"start\\\" ORDER BY \\\"start\\\" \",\"context\":{\"skipEmptyBuckets\":\"false\"},\"resultFormat\":\"object\"}";
@@ -95,7 +96,7 @@ public class StorageQuotaService {
         for (String logType : logTypes) {
             JobResult<SysStorageEvent> jobResult = new JobResult<>();
             try {
-                JobUtil.infoLog("--------------Get {} storage info.-----------", logType);
+                JobUtil.infoLog("--------------Get {} storage info task.-----------", logType);
                 switch (logType) {
                     case Constant.TRAFFIC_LOGS:
                         jobResult = getStorageQuota(jdbcParam, containBytes, storgeConfig.getTrafficDataCenter(), logType, Constant.ZK_TRAFFIC_LOGS);
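The new `druidUsedActiveSizeSql` differs from `druidUsedSizeSql` only by the extra `is_active=1` predicate. In Druid's `sys.segments`, `is_active` is true for segments that represent the latest state (published and not overshadowed, or realtime), so segments already superseded or on their way out presumably stop counting toward used size. Pretty-printed, the payload it posts to Druid's SQL endpoint is:

```json
{
  "query": "SELECT SUM(size) AS used_size FROM sys.segments WHERE datasource NOT LIKE '%hot%' and is_published = 1 and is_active=1",
  "context": { "skipEmptyBuckets": "false" },
  "resultFormat": "csv"
}
```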
@@ -122,22 +123,22 @@ public class StorageQuotaService {
         return failCount;
     }
 
-    public int deleteOldLog(String logType, Double maxUsage, int minIntervalMinutes, JDBCParam jdbcParam) {
+    public int deleteOldLog(String logType, Double maxUsage, int durationMinutes, int minIntervalMinutes, JDBCParam jdbcParam) {
         int failCount = 0;
         try {
-            JobUtil.infoLog("--------------Delete {} old log.--------------------", logType);
+            JobUtil.infoLog("--------------Delete {} old log task.--------------------", logType);
             switch (logType) {
                 case Constant.TRAFFIC_LOGS:
-                    failCount += deleteOldLogs(storgeConfig.getTrafficDataCenter(), maxUsage, minIntervalMinutes, jdbcParam, logType,
+                    failCount += deleteOldLogs(storgeConfig.getTrafficDataCenter(), maxUsage, durationMinutes, minIntervalMinutes, jdbcParam, logType,
                             this::getClickhouseStorageInfoByUnreserved, logStorageQuotaJob::deleteTrafficDataByCluster, counterTrafficLogsDelete::increment, Constant.ZK_TRAFFIC_LOGS);
                     break;
                 case Constant.METRICS:
-                    failCount += deleteOldLogs(storgeConfig.getAnalyticDataCenter(), maxUsage, minIntervalMinutes, jdbcParam, logType,
+                    failCount += deleteOldLogs(storgeConfig.getAnalyticDataCenter(), maxUsage, durationMinutes, minIntervalMinutes, jdbcParam, logType,
                             this::getDruidStorageInfo, logStorageQuotaJob::deleteReportAndMetricsData, counterMetricsDelete::increment, Constant.ZK_REPORT_AND_METRICS);
                     break;
                 case Constant.FILES:
-                    failCount += deleteOldLogs(storgeConfig.getFilesDataCenter(), maxUsage, minIntervalMinutes, jdbcParam, logType,
+                    failCount += deleteOldLogs(storgeConfig.getFilesDataCenter(), maxUsage, durationMinutes, minIntervalMinutes, jdbcParam, logType,
                             this::getHosStorageInfo, logStorageQuotaJob::deleteFiles, counterFilesDelete::increment, Constant.FILES);
                     break;
                 default:
@@ -158,7 +159,7 @@ public class StorageQuotaService {
      * @param deleteFunction
      * @param zkPath
      */
-    private int deleteOldLogs(Map<String, String> dataCenterMap, Double maxUsage, int minIntervalMinutes, JDBCParam jdbcParam, String logType,
+    private int deleteOldLogs(Map<String, String> dataCenterMap, Double maxUsage, int durationMinutes, int minIntervalMinutes, JDBCParam jdbcParam, String logType,
                               Function<String, StorageSizeInfo> getStorageInfo,
                               Function<String, ReturnT<String>> deleteFunction,
                               Consumer<Double> setMonitor,
@@ -174,16 +175,24 @@ public class StorageQuotaService {
             String dataCenterName = entry.getKey();
             String zkClearTimePath = StrUtil.join("/", zkPath, dataCenterName, Constant.ZOOKEEPER_STORAGE_CLEAR_TIME);
+            // minimum interval since the last deletion
             if (!checkStorageInterval(zkClearTimePath, minIntervalMinutes)) {
                 continue;
             }
+
             StorageSizeInfo storageSizeInfo = getStorageInfo.apply(dataCenterHost);
-            if (maxUsage <= storageSizeInfo.getUsage()) {
+            boolean overMaxUsage = maxUsage <= storageSizeInfo.getUsage();
+            // how long the limit has been exceeded
+            if (!checkOverMaxUsageDuration(overMaxUsage, zkPath, dataCenterName, durationMinutes)) {
+                continue;
+            }
+            if (overMaxUsage) {
                 List<DailyStorage> historyStorageDaily = getHistoryStorageDaily(jdbcParam, logType, dataCenterName, dataCenterHost);
                 Long deleteDate = getDeleteDate(historyStorageDaily, storageSizeInfo.getTotalSize(), maxUsage, zkPath, dataCenterName);
                 long maxDays = DateUtil.betweenDay(DateUtil.date(deleteDate * 1000), DateUtil.date(), true);
+                maxDays = maxDays < 1 ? 1 : maxDays;
                 JobUtil.infoLog("delete old log date : {}, maxDays: {}", DateUtil.date(deleteDate * 1000), maxDays);
 
                 ReturnT<String> deleteResult = deleteFunction.apply(JSONUtil.toJsonStr(Maps.of(Constant.MAX_DAYS, maxDays)));
@@ -193,8 +202,8 @@ public class StorageQuotaService {
                     JobUtil.errorLog("{} log delete fail {}", dataCenterName, deleteResult.getMsg());
                     continue;
                 }
-                setLastStorage(StrUtil.join("/", zkPath, dataCenterName), deleteDate);
-                setLastStorage(zkClearTimePath, DateUtil.currentSeconds());
+                setZkStorageNode(StrUtil.join("/", zkPath, dataCenterName), deleteDate);
+                setZkStorageNode(zkClearTimePath, DateUtil.currentSeconds());
                 setMonitor.accept(1D);
 
                 JobUtil.warnLog("{} log delete success", dataCenterName);
@@ -207,9 +216,34 @@ public class StorageQuotaService {
         return failCount;
     }
 
+    private boolean checkOverMaxUsageDuration(boolean overMaxUsage, String zkPath, String dataCenterName, int durationMinutes) {
+        String zkDurationTimePath = StrUtil.join("/", zkPath, dataCenterName, Constant.ZOOKEEPER_STORAGE_DURATION_TIME);
+        String zkOverMaxUsagePath = StrUtil.join("/", zkPath, dataCenterName, Constant.ZOOKEEPER_STORAGE_OVER_MAX_USAGE);
+
+        // 1. over the limit: read the flag; if it is 0, set it to 1 and record the time,
+        //    then check the elapsed time: timed out -> true, not yet -> false
+        // 2. not over the limit: false
+        if (overMaxUsage) {
+            Long hasOverMaxUsage = getZkStorageNode(zkOverMaxUsagePath);
+            Long lastOverTime = getZkStorageNode(zkDurationTimePath);
+            if (hasOverMaxUsage == null || lastOverTime == null || hasOverMaxUsage == 0) {
+                setZkStorageNode(zkOverMaxUsagePath, 1L);
+                setZkStorageNode(zkDurationTimePath, DateUtil.currentSeconds());
+                JobUtil.warnLog("The log storage usage has first over the max usage limit.");
+                return false;
+            }
+            if (hasOverMaxUsage == 1 && DateUtil.currentSeconds() - lastOverTime >= durationMinutes * 60) {
+                JobUtil.infoLog("The deletion task has been out of limit for {} minutes", durationMinutes);
+                setZkStorageNode(zkOverMaxUsagePath, 0L);
+                setZkStorageNode(zkDurationTimePath, DateUtil.currentSeconds());
+                return true;
+            }
+        }
+        return false;
+    }
+
     private boolean checkStorageInterval(String zkClearTimePath, int minIntervalMinutes) {
         long now = DateUtil.currentSeconds();
-        Long lastStorage = getLastStorage(zkClearTimePath);
+        Long lastStorage = getZkStorageNode(zkClearTimePath);
         if (lastStorage != null && now - lastStorage < minIntervalMinutes * 60) {
             JobUtil.infoLog("The deletion task is running");
             return false;
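`checkOverMaxUsageDuration` is effectively a two-state debounce: the first over-limit sighting arms a timestamp and returns false; a later sighting at least `durationMinutes` after arming returns true once and resets the flag. A self-contained model of the same branches, with a plain map standing in for the two ZooKeeper nodes (hypothetical class, for illustration only):

```java
import java.util.HashMap;
import java.util.Map;

// In-memory model of checkOverMaxUsageDuration: "overMaxUsage" is the armed
// flag, "durationTime" the moment the limit was first seen exceeded.
// The clock is passed in explicitly so the demo is deterministic.
public class OverUsageDurationSketch {
    private final Map<String, Long> zk = new HashMap<>();

    boolean shouldDelete(boolean overMaxUsage, int durationMinutes, long nowSeconds) {
        if (!overMaxUsage) {
            return false; // below the limit: nothing changes, matching the patch
        }
        Long armed = zk.get("overMaxUsage");
        Long since = zk.get("durationTime");
        if (armed == null || since == null || armed == 0) {
            zk.put("overMaxUsage", 1L); // first over-limit sighting: arm and wait
            zk.put("durationTime", nowSeconds);
            return false;
        }
        if (armed == 1 && nowSeconds - since >= durationMinutes * 60L) {
            zk.put("overMaxUsage", 0L); // held long enough: fire once and reset
            zk.put("durationTime", nowSeconds);
            return true;
        }
        return false; // armed, but not over the limit long enough yet
    }

    public static void main(String[] args) {
        OverUsageDurationSketch s = new OverUsageDurationSketch();
        System.out.println(s.shouldDelete(true, 30, 0));    // false - arms the timer
        System.out.println(s.shouldDelete(true, 30, 600));  // false - only 10 minutes so far
        System.out.println(s.shouldDelete(true, 30, 1800)); // true  - 30 minutes elapsed
        System.out.println(s.shouldDelete(true, 30, 1900)); // false - flag was reset, arms again
    }
}
```

One property worth noting: the flag is not cleared when usage falls back below the limit, so after a brief spike the timer stays armed, and the next over-limit sighting more than `durationMinutes` later triggers deletion immediately.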
@@ -257,7 +291,7 @@ public class StorageQuotaService {
         JobResult<SysStorageEvent> jobResult = new JobResult<>();
         int failCount = 0;
         long generatedTime = DateUtil.currentSeconds();
-        Long sinceTime = getLastStorage(zkNode);
+        Long sinceTime = getZkStorageNode(zkNode);
         for (Map.Entry<String, String> entry : dataCenterMap.entrySet()) {
             try {
                 String dataCenterHost = entry.getValue();
@@ -479,10 +513,9 @@ public class StorageQuotaService {
 
     /**
      * @param node
-     * @return the custom reference time last_storage,
-     *         stored at zookeeper node /storage/worker/+node
+     * @return the value stored at zookeeper node /storage/worker/+node
      */
-    private Long getLastStorage(String node) {
+    private Long getZkStorageNode(String node) {
         try {
             ZookeeperUtils zk = new ZookeeperUtils();
             String nodeData = zk.getNodeData(Constant.ZOOKEEPER_STORAGE_PATH + node, zookeeperServer);
@@ -490,16 +523,16 @@ public class StorageQuotaService {
                 return null;
             }
             Long lastStorage = Long.valueOf(nodeData);
-            logger.info("query standard time last_storage success,{}", lastStorage);
+            logger.info("query storage success,{}", lastStorage);
             return Long.valueOf(nodeData);
         } catch (Exception e) {
             throw new BusinessException("query standard time last_storage fail ," + JobUtil.getErrorMsg(e));
         }
     }
 
-    private void setLastStorage(String path, Long lastStorage) {
+    private void setZkStorageNode(String path, Long data) {
         ZookeeperUtils zk = new ZookeeperUtils();
-        zk.modifyNode(Constant.ZOOKEEPER_STORAGE_PATH + path, String.valueOf(lastStorage), zookeeperServer);
+        zk.modifyNode(Constant.ZOOKEEPER_STORAGE_PATH + path, String.valueOf(data), zookeeperServer);
     }
 
     private Long getDeleteDate(List<DailyStorage> dailyStorageList, Long totalSize, Double maxUsage, String zkPath, String dataCenterName) {
@@ -523,11 +556,11 @@ public class StorageQuotaService {
         }
         if (deleteDate == null) {
             //use default values when storage quota info is unavailable
-            Long dcLastStorageTime = getLastStorage(StrUtil.join("/", zkPath, dataCenterName));
+            Long dcLastStorageTime = getZkStorageNode(StrUtil.join("/", zkPath, dataCenterName));
             if (dcLastStorageTime != null) {
                 deleteDate = dcLastStorageTime + Constant.ONE_DAY;
             } else {
-                Long lastStorage = getLastStorage(zkPath);
+                Long lastStorage = getZkStorageNode(zkPath);
                 deleteDate = lastStorage == null ? Constant.ONE_MONTH : lastStorage + Constant.ONE_DAY;
             }
