From 56d9743db4eae3d9b8e834bcd7e449dd39694496 Mon Sep 17 00:00:00 2001 From: zhanghongqing Date: Thu, 18 Jul 2024 16:26:23 +0800 Subject: [新增][日志删除] 优化可一次删除多天 TSG-21472 MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../executor/service/StorageQuotaService.java | 92 ++++++++++++++++++---- .../mesalab/executor/test/StorageQuotaTest.java | 27 +++---- 2 files changed, 90 insertions(+), 29 deletions(-) diff --git a/galaxy-job-executor/src/main/java/com/mesalab/executor/service/StorageQuotaService.java b/galaxy-job-executor/src/main/java/com/mesalab/executor/service/StorageQuotaService.java index 0af3238..24cbded 100644 --- a/galaxy-job-executor/src/main/java/com/mesalab/executor/service/StorageQuotaService.java +++ b/galaxy-job-executor/src/main/java/com/mesalab/executor/service/StorageQuotaService.java @@ -33,10 +33,7 @@ import javax.annotation.PostConstruct; import javax.annotation.Resource; import java.sql.Connection; import java.sql.SQLException; -import java.util.Collections; -import java.util.Comparator; -import java.util.List; -import java.util.Map; +import java.util.*; import java.util.function.Consumer; import java.util.function.Function; @@ -58,11 +55,14 @@ public class StorageQuotaService { private final String clickhouseMaxSizeSql = StrUtil.format("SELECT SUM(`total_space`) FROM {};", storgeConfig.getSystemDisks()); private final String clickhouseUsageSql = StrUtil.format("SELECT 1-(sum(unreserved_space)/sum(total_space)) AS usage FROM {};", storgeConfig.getSystemDisks()); private final String clickhouseUsedSizeSql = StrUtil.format("SELECT SUM(`bytes_on_disk`) FROM {} WHERE database = '{}' and active = 1;", storgeConfig.getSystemPartsCluster(), storgeConfig.getTrafficDatasource()); + private final String clickhouseDailyUsedSizeSql = StrUtil.format("SELECT DISTINCT(`partition`) as date, SUM(`bytes_on_disk`) as bytes from {} WHERE database = '{}' {} and date <= formatDateTime(now(),'%Y%m%d') and active = 1 group by date FORMAT JSON;", + storgeConfig.getSystemPartsCluster(), storgeConfig.getTrafficDatasource(), storgeConfig.getNotInSql("table", storgeConfig.getTafficExclusion())); private final String druidMaxSizeSql = "{\"query\":\"SELECT SUM(max_size) AS max_size FROM sys.servers WHERE server_type = 'historical'\",\"context\":{\"skipEmptyBuckets\":\"false\"},\"resultFormat\":\"csv\"}"; private final String druidUsedSizeSql = "{\"query\":\"SELECT SUM(size) AS used_size FROM sys.segments WHERE datasource NOT LIKE '%hot%' and is_published = 1 \",\"context\":{\"skipEmptyBuckets\":\"false\"},\"resultFormat\":\"csv\"}"; private final String mysqlDailyUsedSizeSql = "SELECT UNIX_TIMESTAMP(FROM_UNIXTIME(generated_time,'%Y%m%d')) AS date, sum(bytes) AS bytes FROM {}.{} WHERE generated_time <= CURRENT_TIMESTAMP AND generated_time > UNIX_TIMESTAMP(DATE_SUB(NOW(), INTERVAL 1 YEAR)) AND log_type='{}' AND data_center='{}' GROUP BY date, log_type ORDER BY date DESC"; + private final String druidDailyUsedSizeSql = "{\"query\":\"SELECT \\\"start\\\" as \\\"date\\\", sum(\\\"size\\\") as bytes from sys.segments where is_published = 1 and \\\"start\\\" < '3000' " + storgeConfig.getNotInSql("datasource", storgeConfig.getAnalyticExclusion()) + " group by \\\"start\\\" ORDER BY \\\"start\\\" \",\"context\":{\"skipEmptyBuckets\":\"false\"},\"resultFormat\":\"object\"}"; @Value("${zookeeper.server}") @@ -95,6 +95,7 @@ public class StorageQuotaService { for (String logType : logTypes) { JobResult jobResult = new JobResult<>(); try { + JobUtil.infoLog("--------------Get {} storage info.-----------", logType); switch (logType) { case Constant.TRAFFIC_LOGS: jobResult = getStorageQuota(jdbcParam, containBytes, storgeConfig.getTrafficDataCenter(), logType, Constant.ZK_TRAFFIC_LOGS); @@ -125,6 +126,7 @@ public class StorageQuotaService { int failCount = 0; try { + JobUtil.infoLog("--------------Delete {} old log.--------------------", logType); switch (logType) { case Constant.TRAFFIC_LOGS: failCount += deleteOldLogs(storgeConfig.getTrafficDataCenter(), maxUsage, minIntervalMinutes, jdbcParam, logType, @@ -178,11 +180,12 @@ public class StorageQuotaService { StorageSizeInfo storageSizeInfo = getStorageInfo.apply(dataCenterHost); if (maxUsage <= storageSizeInfo.getUsage()) { - List historyStorageDaily = getHistoryStorageDaily(jdbcParam, logType, dataCenterName); + List historyStorageDaily = getHistoryStorageDaily(jdbcParam, logType, dataCenterName, dataCenterHost); + Long deleteDate = getDeleteDate(historyStorageDaily, storageSizeInfo.getTotalSize(), maxUsage, zkPath, dataCenterName); long maxDays = DateUtil.betweenDay(DateUtil.date(deleteDate * 1000), DateUtil.date(), true); - JobUtil.infoLog("delete old log date : {}, maxDays: {}", deleteDate, maxDays); + JobUtil.infoLog("delete old log date : {}, maxDays: {}", DateUtil.date(deleteDate * 1000), maxDays); ReturnT deleteResult = deleteFunction.apply(JSONUtil.toJsonStr(Maps.of(Constant.MAX_DAYS, maxDays))); if (deleteResult.getCode() != 200) { @@ -208,7 +211,7 @@ public class StorageQuotaService { long now = DateUtil.currentSeconds(); Long lastStorage = getLastStorage(zkClearTimePath); if (lastStorage != null && now - lastStorage < minIntervalMinutes * 60) { - JobUtil.infoLog("The deletion task is being executed"); + JobUtil.infoLog("The deletion task is running"); return false; } return true; @@ -246,7 +249,7 @@ public class StorageQuotaService { } } jobResult.setData(newResultList); - JobUtil.infoLog("set storage data ... "); + JobUtil.infoLog("Set storage data ... "); } private JobResult getStorageQuota(JDBCParam jdbcParam, boolean containIncrease, Map dataCenterMap, String logType, String zkNode) { @@ -303,6 +306,24 @@ public class StorageQuotaService { return storageSizeInfo; } + private List getHistoryStorageDaily(JDBCParam jdbcParam, String logType, String dataCenterName, String dataCenterHost) throws SQLException { + List dailyStorageList = new ArrayList<>(); + switch (logType) { + case Constant.TRAFFIC_LOGS: + dailyStorageList = getClickhouseHistoryDaily(dataCenterHost); + break; + case Constant.METRICS: + dailyStorageList = getDruidHistoryDaily(dataCenterHost); + break; + case Constant.FILES: + dailyStorageList = getMysqlHistoryStorageDaily(jdbcParam, logType, dataCenterName); + break; + default: + break; + } + return dailyStorageList; + } + private Long getIncreaseSize(JDBCParam jdbcParam, Long usedSize, String datacenter, String logType) throws Exception { final String lastUsedSizeSql = "SELECT used_size FROM " + jdbcParam.getTable() + " WHERE log_type = '" + logType + "' and data_center = '" + datacenter + "' and generated_time >= UNIX_TIMESTAMP() - " + MAX_QUERY_INTERVAL + " ORDER BY generated_time DESC LIMIT 1;"; @@ -313,6 +334,7 @@ public class StorageQuotaService { if (lastUsedSize == null || lastUsedSize.longValue() < 0 || usedSize - lastUsedSize.longValue() <= 0) { return 0L; } + JobUtil.infoLog("Get {} log last used size is {} bytes {}", logType, lastUsedSize, usedSize - lastUsedSize.longValue()); return usedSize - lastUsedSize.longValue(); } catch (Exception e) { throw e; @@ -419,6 +441,42 @@ public class StorageQuotaService { return storageSizeInfo; } + private List getClickhouseHistoryDaily(String dataCenterHost) { + Map ckParamMap = storgeConfig.getCkSource(); + ckParamMap.put("query", clickhouseDailyUsedSizeSql); + String result = HttpClientUtils.httpGet(HttpClientUtils.getUrlWithParams(UrlUtil.getUrl(dataCenterHost), ckParamMap)); + if ("-1".equals(result)) { + throw new BusinessException("Get clickhouse http fail -1"); + } + Map resultMap = JSONUtil.toBean(result, Map.class); + List dataList = (List) resultMap.get("data"); + List dailyStorageList = Lists.newArrayList(); + for (Map map : dataList) { + DailyStorage dailyStorage = new DailyStorage(); + dailyStorage.setDate(DateUtils.convertStringToTimestamp(String.valueOf(map.get("date")), "yyyyMMdd")); + dailyStorage.setBytes(TypeUtils.castToLong(map.get("bytes"))); + dailyStorageList.add(dailyStorage); + } + + return dailyStorageList; + } + + private List getDruidHistoryDaily(String dataCenterHost) { + String result = HttpClientUtils.httpPost(UrlUtil.getUrl(dataCenterHost, Constant.DRUID_URL), druidDailyUsedSizeSql); + if ("-1".equals(result)) { + throw new BusinessException("Get druid http fail -1"); + } + List dataList = JSONUtil.toList(result, Map.class); + List dailyStorageList = Lists.newArrayList(); + for (Map map : dataList) { + DailyStorage dailyStorage = new DailyStorage(); + dailyStorage.setDate(DateUtils.convertStringToTimestamp(String.valueOf(map.get("date")), "yyyy-MM-dd'T'HH:mm:ss.SSSZ")); + dailyStorage.setBytes(TypeUtils.castToLong(map.get("bytes"))); + dailyStorageList.add(dailyStorage); + } + return dailyStorageList; + } + /** * @param node * @return 自定义的标准时间 last_storage, @@ -453,23 +511,29 @@ public class StorageQuotaService { for (DailyStorage dailyStorage : dailyStorageList) { sumBytes += dailyStorage.getBytes(); - if ((sumBytes / (double) totalSize) >= maxUsage) { + double currUsage = getUsage(totalSize, sumBytes); + JobUtil.infoLog("The date {} sum(bytes)/totalSize is {}", dailyStorage.getDate(), currUsage); + if (currUsage >= maxUsage) { JobUtil.infoLog("The last date to reach or exceed is {}", dailyStorage.getDate()); + //记录到达maxUsage的日期 + deleteDate = dailyStorage.getDate() + Constant.ONE_DAY; break; } - //记录到达maxUsage的日期 - deleteDate = dailyStorage.getDate(); + } if (deleteDate == null) { //获取不到存储配额信息时,使用默认值 Long dcLastStorageTime = getLastStorage(StrUtil.join("/", zkPath, dataCenterName)); if (dcLastStorageTime != null) { - return dcLastStorageTime + Constant.ONE_DAY; + deleteDate = dcLastStorageTime + Constant.ONE_DAY; } else { Long lastStorage = getLastStorage(zkPath); - return lastStorage == null ? Constant.ONE_MONTH : lastStorage + Constant.ONE_DAY; + deleteDate = lastStorage == null ? Constant.ONE_MONTH : lastStorage + Constant.ONE_DAY; } + } + long currDate = DateUtil.parseDate(DateUtil.today()).getTime() / 1000; + deleteDate = deleteDate > currDate ? currDate : deleteDate; JobUtil.infoLog("The delete date is {}", deleteDate); return deleteDate; } @@ -482,7 +546,7 @@ public class StorageQuotaService { * @throws SQLException * @Description 查询数据,获取历史每日存储配额,区分分数据中心日志类型 */ - private List getHistoryStorageDaily(JDBCParam jdbcParam, String logType, String dataCenterName) throws SQLException { + private List getMysqlHistoryStorageDaily(JDBCParam jdbcParam, String logType, String dataCenterName) throws SQLException { try (Connection conn = DBUtils.getDBConn(jdbcParam)) { List dailyStorageList = SqlExecutor.query(conn, StrUtil.format(mysqlDailyUsedSizeSql, jdbcParam.getDatabase(), jdbcParam.getTable(), logType, dataCenterName), new BeanListHandler<>(DailyStorage.class)); diff --git a/galaxy-job-executor/src/test/java/com/mesalab/executor/test/StorageQuotaTest.java b/galaxy-job-executor/src/test/java/com/mesalab/executor/test/StorageQuotaTest.java index c109a04..465e883 100644 --- a/galaxy-job-executor/src/test/java/com/mesalab/executor/test/StorageQuotaTest.java +++ b/galaxy-job-executor/src/test/java/com/mesalab/executor/test/StorageQuotaTest.java @@ -1,9 +1,6 @@ package com.mesalab.executor.test; import cn.hutool.core.date.DateUtil; -import cn.hutool.core.util.StrUtil; -import cn.hutool.db.handler.BeanListHandler; -import cn.hutool.db.sql.SqlExecutor; import cn.hutool.json.JSONUtil; import com.geedgenetworks.utils.DateUtils; import com.geedgenetworks.utils.JsonMapper; @@ -27,7 +24,6 @@ import org.springframework.boot.test.context.SpringBootTest; import org.springframework.test.context.junit4.SpringRunner; import javax.annotation.Resource; -import java.sql.Connection; import java.util.*; import java.util.stream.Collectors; @@ -158,21 +154,22 @@ public class StorageQuotaTest { public void testQueryMysql() { String mysqlSql = "SELECT UNIX_TIMESTAMP(FROM_UNIXTIME(generated_time,'%Y%m%d')) AS date, sum(bytes) AS used_size FROM sys_storage_event WHERE log_type='Files' AND data_center='xxg' GROUP BY date, log_type ORDER BY date ASC"; - try (Connection conn = DBUtils.getDBConn(jdbcParam)) { - List dailyStorageList = SqlExecutor.query(conn, mysqlSql, new BeanListHandler<>(DailyStorage.class)); - log.info(dailyStorageList.toString()); - } catch (Exception e) { - log.error(e.getMessage()); - } +// try (Connection conn = DBUtils.getDBConn(jdbcParam)) { +// List dailyStorageList = SqlExecutor.query(conn, mysqlSql, new BeanListHandler<>(DailyStorage.class)); +// log.info(dailyStorageList.toString()); +// } catch (Exception e) { +// log.error(e.getMessage()); +// } + System.err.println(DateUtil.parseDate(DateUtil.today()).getTime()); } - private final String clickhouseDailyUsedSizeSql = StrUtil.format("SELECT DISTINCT(`partition`) as date, SUM(`bytes_on_disk`) as bytes from system.parts_cluster WHERE database = '{}' {} and active = 1 group by date;", - storgeConfig.getSystemPartsCluster(), storgeConfig.getTrafficDatasource(), storgeConfig.getNotInSql("table", storgeConfig.getTafficExclusion())); - private final String druidDailyUsedSizeSql = "{\"query\":\"SELECT \\\"start\\\" as \\\"date\\\", sum(\\\"size\\\") as bytes from sys.segments where is_published = 1 and \\\"start\\\" < '3000' " + storgeConfig.getNotInSql("datasource", storgeConfig.getAnalyticExclusion()) + " group by \\\"start\\\" ORDER BY \\\"start\\\" \",\"context\":{\"skipEmptyBuckets\":\"false\"},\"resultFormat\":\"object\"}"; +// public final String clickhouseDailyUsedSizeSql = StrUtil.format("SELECT DISTINCT(`partition`) as date, SUM(`bytes_on_disk`) as bytes from {} WHERE database = '{}' {} and date <= formatDateTime(now(),'%Y%m%d') and active = 1 group by date;", +// storgeConfig.getSystemPartsCluster(), storgeConfig.getTrafficDatasource(), storgeConfig.getNotInSql("table", storgeConfig.getTafficExclusion())); +// public final String druidDailyUsedSizeSql = "{\"query\":\"SELECT \\\"start\\\" as \\\"date\\\", sum(\\\"size\\\") as bytes from sys.segments where is_published = 1 and \\\"start\\\" < '3000' " + storgeConfig.getNotInSql("datasource", storgeConfig.getAnalyticExclusion()) + " group by \\\"start\\\" ORDER BY \\\"start\\\" \",\"context\":{\"skipEmptyBuckets\":\"false\"},\"resultFormat\":\"object\"}"; private List getClickhouseDaily(String dataCenterHost) { Map ckParamMap = storgeConfig.getCkSource(); - ckParamMap.put("query", clickhouseDailyUsedSizeSql); + ckParamMap.put("query", "clickhouseDailyUsedSizeSql"); String result = HttpClientUtils.httpGet(HttpClientUtils.getUrlWithParams(UrlUtil.getUrl(dataCenterHost), ckParamMap)); if ("-1".equals(result)) { throw new BusinessException("Get clickhouse http fail -1"); @@ -191,7 +188,7 @@ public class StorageQuotaTest { } private List getDruidDaily(String dataCenterHost) { - String result = HttpClientUtils.httpPost(UrlUtil.getUrl(dataCenterHost, Constant.DRUID_URL), druidDailyUsedSizeSql); + String result = HttpClientUtils.httpPost(UrlUtil.getUrl(dataCenterHost, Constant.DRUID_URL), "druidDailyUsedSizeSql"); if ("-1".equals(result)) { throw new BusinessException("Get druid http fail -1"); } -- cgit v1.2.3