author    zhanghongqing <[email protected]> 2024-07-18 16:26:23 +0800
committer zhanghongqing <[email protected]> 2024-07-18 16:26:23 +0800
commit    56d9743db4eae3d9b8e834bcd7e449dd39694496 (patch)
tree      5d43e2710ef83bff2c7016bb27aab561e1e4a9d2
parent    7d365802bed2929195955ca64a24c776583bd257 (diff)
[New][Log deletion] Optimize deletion to allow removing multiple days at once TSG-21472
 galaxy-job-executor/src/main/java/com/mesalab/executor/service/StorageQuotaService.java | 92
 galaxy-job-executor/src/test/java/com/mesalab/executor/test/StorageQuotaTest.java       | 27
 2 files changed, 90 insertions(+), 29 deletions(-)
diff --git a/galaxy-job-executor/src/main/java/com/mesalab/executor/service/StorageQuotaService.java b/galaxy-job-executor/src/main/java/com/mesalab/executor/service/StorageQuotaService.java
index 0af3238..24cbded 100644
--- a/galaxy-job-executor/src/main/java/com/mesalab/executor/service/StorageQuotaService.java
+++ b/galaxy-job-executor/src/main/java/com/mesalab/executor/service/StorageQuotaService.java
@@ -33,10 +33,7 @@ import javax.annotation.PostConstruct;
import javax.annotation.Resource;
import java.sql.Connection;
import java.sql.SQLException;
-import java.util.Collections;
-import java.util.Comparator;
-import java.util.List;
-import java.util.Map;
+import java.util.*;
import java.util.function.Consumer;
import java.util.function.Function;
@@ -58,11 +55,14 @@ public class StorageQuotaService {
private final String clickhouseMaxSizeSql = StrUtil.format("SELECT SUM(`total_space`) FROM {};", storgeConfig.getSystemDisks());
private final String clickhouseUsageSql = StrUtil.format("SELECT 1-(sum(unreserved_space)/sum(total_space)) AS usage FROM {};", storgeConfig.getSystemDisks());
private final String clickhouseUsedSizeSql = StrUtil.format("SELECT SUM(`bytes_on_disk`) FROM {} WHERE database = '{}' and active = 1;", storgeConfig.getSystemPartsCluster(), storgeConfig.getTrafficDatasource());
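+ // Per-day (partition) on-disk bytes for the traffic database, excluding configured tables and partitions dated after today.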
+ private final String clickhouseDailyUsedSizeSql = StrUtil.format("SELECT DISTINCT(`partition`) as date, SUM(`bytes_on_disk`) as bytes from {} WHERE database = '{}' {} and date <= formatDateTime(now(),'%Y%m%d') and active = 1 group by date FORMAT JSON;",
+ storgeConfig.getSystemPartsCluster(), storgeConfig.getTrafficDatasource(), storgeConfig.getNotInSql("table", storgeConfig.getTafficExclusion()));
private final String druidMaxSizeSql = "{\"query\":\"SELECT SUM(max_size) AS max_size FROM sys.servers WHERE server_type = 'historical'\",\"context\":{\"skipEmptyBuckets\":\"false\"},\"resultFormat\":\"csv\"}";
private final String druidUsedSizeSql = "{\"query\":\"SELECT SUM(size) AS used_size FROM sys.segments WHERE datasource NOT LIKE '%hot%' and is_published = 1 \",\"context\":{\"skipEmptyBuckets\":\"false\"},\"resultFormat\":\"csv\"}";
private final String mysqlDailyUsedSizeSql = "SELECT UNIX_TIMESTAMP(FROM_UNIXTIME(generated_time,'%Y%m%d')) AS date, sum(bytes) AS bytes FROM {}.{} WHERE generated_time <= CURRENT_TIMESTAMP AND generated_time > UNIX_TIMESTAMP(DATE_SUB(NOW(), INTERVAL 1 YEAR)) AND log_type='{}' AND data_center='{}' GROUP BY date, log_type ORDER BY date DESC";
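+ // Per-day published segment bytes from Druid sys.segments, grouped by segment start date, excluding configured datasources.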
+ private final String druidDailyUsedSizeSql = "{\"query\":\"SELECT \\\"start\\\" as \\\"date\\\", sum(\\\"size\\\") as bytes from sys.segments where is_published = 1 and \\\"start\\\" < '3000' " + storgeConfig.getNotInSql("datasource", storgeConfig.getAnalyticExclusion()) + " group by \\\"start\\\" ORDER BY \\\"start\\\" \",\"context\":{\"skipEmptyBuckets\":\"false\"},\"resultFormat\":\"object\"}";
@Value("${zookeeper.server}")
@@ -95,6 +95,7 @@ public class StorageQuotaService {
for (String logType : logTypes) {
JobResult<SysStorageEvent> jobResult = new JobResult<>();
try {
+ JobUtil.infoLog("-------------- Get {} storage info --------------", logType);
switch (logType) {
case Constant.TRAFFIC_LOGS:
jobResult = getStorageQuota(jdbcParam, containBytes, storgeConfig.getTrafficDataCenter(), logType, Constant.ZK_TRAFFIC_LOGS);
@@ -125,6 +126,7 @@ public class StorageQuotaService {
int failCount = 0;
try {
+ JobUtil.infoLog("-------------- Delete {} old logs --------------", logType);
switch (logType) {
case Constant.TRAFFIC_LOGS:
failCount += deleteOldLogs(storgeConfig.getTrafficDataCenter(), maxUsage, minIntervalMinutes, jdbcParam, logType,
@@ -178,11 +180,12 @@ public class StorageQuotaService {
StorageSizeInfo storageSizeInfo = getStorageInfo.apply(dataCenterHost);
if (maxUsage <= storageSizeInfo.getUsage()) {
- List<DailyStorage> historyStorageDaily = getHistoryStorageDaily(jdbcParam, logType, dataCenterName);
+ List<DailyStorage> historyStorageDaily = getHistoryStorageDaily(jdbcParam, logType, dataCenterName, dataCenterHost);
+
Long deleteDate = getDeleteDate(historyStorageDaily, storageSizeInfo.getTotalSize(), maxUsage, zkPath, dataCenterName);
long maxDays = DateUtil.betweenDay(DateUtil.date(deleteDate * 1000), DateUtil.date(), true);
- JobUtil.infoLog("delete old log date : {}, maxDays: {}", deleteDate, maxDays);
+ JobUtil.infoLog("delete old log date : {}, maxDays: {}", DateUtil.date(deleteDate * 1000), maxDays);
ReturnT<String> deleteResult = deleteFunction.apply(JSONUtil.toJsonStr(Maps.of(Constant.MAX_DAYS, maxDays)));
if (deleteResult.getCode() != 200) {
@@ -208,7 +211,7 @@ public class StorageQuotaService {
long now = DateUtil.currentSeconds();
Long lastStorage = getLastStorage(zkClearTimePath);
if (lastStorage != null && now - lastStorage < minIntervalMinutes * 60) {
- JobUtil.infoLog("The deletion task is being executed");
+ JobUtil.infoLog("The deletion task is running");
return false;
}
return true;
@@ -246,7 +249,7 @@ public class StorageQuotaService {
}
}
jobResult.setData(newResultList);
- JobUtil.infoLog("set storage data ... ");
+ JobUtil.infoLog("Set storage data ... ");
}
private JobResult<SysStorageEvent> getStorageQuota(JDBCParam jdbcParam, boolean containIncrease, Map<String, String> dataCenterMap, String logType, String zkNode) {
@@ -303,6 +306,24 @@ public class StorageQuotaService {
return storageSizeInfo;
}
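+ // Fetch per-day storage usage by log type: ClickHouse for traffic logs, Druid for metrics, MySQL for files.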
+ private List<DailyStorage> getHistoryStorageDaily(JDBCParam jdbcParam, String logType, String dataCenterName, String dataCenterHost) throws SQLException {
+ List<DailyStorage> dailyStorageList = new ArrayList<>();
+ switch (logType) {
+ case Constant.TRAFFIC_LOGS:
+ dailyStorageList = getClickhouseHistoryDaily(dataCenterHost);
+ break;
+ case Constant.METRICS:
+ dailyStorageList = getDruidHistoryDaily(dataCenterHost);
+ break;
+ case Constant.FILES:
+ dailyStorageList = getMysqlHistoryStorageDaily(jdbcParam, logType, dataCenterName);
+ break;
+ default:
+ break;
+ }
+ return dailyStorageList;
+ }
+
private Long getIncreaseSize(JDBCParam jdbcParam, Long usedSize, String datacenter, String logType) throws Exception {
final String lastUsedSizeSql = "SELECT used_size FROM " + jdbcParam.getTable() + " WHERE log_type = '" + logType + "' and data_center = '" + datacenter
+ "' and generated_time >= UNIX_TIMESTAMP() - " + MAX_QUERY_INTERVAL + " ORDER BY generated_time DESC LIMIT 1;";
@@ -313,6 +334,7 @@ public class StorageQuotaService {
if (lastUsedSize == null || lastUsedSize.longValue() < 0 || usedSize - lastUsedSize.longValue() <= 0) {
return 0L;
}
+ JobUtil.infoLog("Get {} log last used size: {} bytes, increase: {} bytes", logType, lastUsedSize, usedSize - lastUsedSize.longValue());
return usedSize - lastUsedSize.longValue();
} catch (Exception e) {
throw e;
@@ -419,6 +441,42 @@ public class StorageQuotaService {
return storageSizeInfo;
}
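+ // Query ClickHouse over HTTP for the daily usage and map the JSON rows (date as yyyyMMdd, bytes) to DailyStorage.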
+ private List<DailyStorage> getClickhouseHistoryDaily(String dataCenterHost) {
+ Map<String, Object> ckParamMap = storgeConfig.getCkSource();
+ ckParamMap.put("query", clickhouseDailyUsedSizeSql);
+ String result = HttpClientUtils.httpGet(HttpClientUtils.getUrlWithParams(UrlUtil.getUrl(dataCenterHost), ckParamMap));
+ if ("-1".equals(result)) {
+ throw new BusinessException("Get clickhouse http fail -1");
+ }
+ Map resultMap = JSONUtil.toBean(result, Map.class);
+ List<Map> dataList = (List) resultMap.get("data");
+ List<DailyStorage> dailyStorageList = Lists.newArrayList();
+ for (Map map : dataList) {
+ DailyStorage dailyStorage = new DailyStorage();
+ dailyStorage.setDate(DateUtils.convertStringToTimestamp(String.valueOf(map.get("date")), "yyyyMMdd"));
+ dailyStorage.setBytes(TypeUtils.castToLong(map.get("bytes")));
+ dailyStorageList.add(dailyStorage);
+ }
+
+ return dailyStorageList;
+ }
+
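+ // POST the daily-usage SQL to the Druid endpoint and map the returned rows (ISO-8601 start date, bytes) to DailyStorage.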
+ private List<DailyStorage> getDruidHistoryDaily(String dataCenterHost) {
+ String result = HttpClientUtils.httpPost(UrlUtil.getUrl(dataCenterHost, Constant.DRUID_URL), druidDailyUsedSizeSql);
+ if ("-1".equals(result)) {
+ throw new BusinessException("Get druid http fail -1");
+ }
+ List<Map> dataList = JSONUtil.toList(result, Map.class);
+ List<DailyStorage> dailyStorageList = Lists.newArrayList();
+ for (Map map : dataList) {
+ DailyStorage dailyStorage = new DailyStorage();
+ dailyStorage.setDate(DateUtils.convertStringToTimestamp(String.valueOf(map.get("date")), "yyyy-MM-dd'T'HH:mm:ss.SSSZ"));
+ dailyStorage.setBytes(TypeUtils.castToLong(map.get("bytes")));
+ dailyStorageList.add(dailyStorage);
+ }
+ return dailyStorageList;
+ }
+
/**
* @param node
* @return the custom reference time last_storage,
@@ -453,23 +511,29 @@ public class StorageQuotaService {
for (DailyStorage dailyStorage : dailyStorageList) {
sumBytes += dailyStorage.getBytes();
- if ((sumBytes / (double) totalSize) >= maxUsage) {
+ double currUsage = getUsage(totalSize, sumBytes);
+ JobUtil.infoLog("The date {} sum(bytes)/totalSize is {}", dailyStorage.getDate(), currUsage);
+ if (currUsage >= maxUsage) {
JobUtil.infoLog("The last date to reach or exceed is {}", dailyStorage.getDate());
+ // record the date at which cumulative usage reaches maxUsage
+ deleteDate = dailyStorage.getDate() + Constant.ONE_DAY;
break;
}
- // record the date at which maxUsage is reached
- deleteDate = dailyStorage.getDate();
+
}
if (deleteDate == null) {
// use default values when the storage quota info is unavailable
Long dcLastStorageTime = getLastStorage(StrUtil.join("/", zkPath, dataCenterName));
if (dcLastStorageTime != null) {
- return dcLastStorageTime + Constant.ONE_DAY;
+ deleteDate = dcLastStorageTime + Constant.ONE_DAY;
} else {
Long lastStorage = getLastStorage(zkPath);
- return lastStorage == null ? Constant.ONE_MONTH : lastStorage + Constant.ONE_DAY;
+ deleteDate = lastStorage == null ? Constant.ONE_MONTH : lastStorage + Constant.ONE_DAY;
}
+
}
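+ // Clamp the delete date to the start of the current day (in seconds) so it never points to the future.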
+ long currDate = DateUtil.parseDate(DateUtil.today()).getTime() / 1000;
+ deleteDate = deleteDate > currDate ? currDate : deleteDate;
JobUtil.infoLog("The delete date is {}", deleteDate);
return deleteDate;
}
@@ -482,7 +546,7 @@ public class StorageQuotaService {
* @throws SQLException
* @Description Query the data for the historical daily storage quota, broken down by data center and log type
*/
- private List<DailyStorage> getHistoryStorageDaily(JDBCParam jdbcParam, String logType, String dataCenterName) throws SQLException {
+ private List<DailyStorage> getMysqlHistoryStorageDaily(JDBCParam jdbcParam, String logType, String dataCenterName) throws SQLException {
try (Connection conn = DBUtils.getDBConn(jdbcParam)) {
List<DailyStorage> dailyStorageList = SqlExecutor.query(conn, StrUtil.format(mysqlDailyUsedSizeSql, jdbcParam.getDatabase(),
jdbcParam.getTable(), logType, dataCenterName), new BeanListHandler<>(DailyStorage.class));
diff --git a/galaxy-job-executor/src/test/java/com/mesalab/executor/test/StorageQuotaTest.java b/galaxy-job-executor/src/test/java/com/mesalab/executor/test/StorageQuotaTest.java
index c109a04..465e883 100644
--- a/galaxy-job-executor/src/test/java/com/mesalab/executor/test/StorageQuotaTest.java
+++ b/galaxy-job-executor/src/test/java/com/mesalab/executor/test/StorageQuotaTest.java
@@ -1,9 +1,6 @@
package com.mesalab.executor.test;
import cn.hutool.core.date.DateUtil;
-import cn.hutool.core.util.StrUtil;
-import cn.hutool.db.handler.BeanListHandler;
-import cn.hutool.db.sql.SqlExecutor;
import cn.hutool.json.JSONUtil;
import com.geedgenetworks.utils.DateUtils;
import com.geedgenetworks.utils.JsonMapper;
@@ -27,7 +24,6 @@ import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;
import javax.annotation.Resource;
-import java.sql.Connection;
import java.util.*;
import java.util.stream.Collectors;
@@ -158,21 +154,22 @@ public class StorageQuotaTest {
public void testQueryMysql() {
String mysqlSql = "SELECT UNIX_TIMESTAMP(FROM_UNIXTIME(generated_time,'%Y%m%d')) AS date, sum(bytes) AS used_size FROM sys_storage_event WHERE log_type='Files' AND data_center='xxg' GROUP BY date, log_type ORDER BY date ASC";
- try (Connection conn = DBUtils.getDBConn(jdbcParam)) {
- List<DailyStorage> dailyStorageList = SqlExecutor.query(conn, mysqlSql, new BeanListHandler<>(DailyStorage.class));
- log.info(dailyStorageList.toString());
- } catch (Exception e) {
- log.error(e.getMessage());
- }
+// try (Connection conn = DBUtils.getDBConn(jdbcParam)) {
+// List<DailyStorage> dailyStorageList = SqlExecutor.query(conn, mysqlSql, new BeanListHandler<>(DailyStorage.class));
+// log.info(dailyStorageList.toString());
+// } catch (Exception e) {
+// log.error(e.getMessage());
+// }
+ System.err.println(DateUtil.parseDate(DateUtil.today()).getTime());
}
- private final String clickhouseDailyUsedSizeSql = StrUtil.format("SELECT DISTINCT(`partition`) as date, SUM(`bytes_on_disk`) as bytes from system.parts_cluster WHERE database = '{}' {} and active = 1 group by date;",
- storgeConfig.getSystemPartsCluster(), storgeConfig.getTrafficDatasource(), storgeConfig.getNotInSql("table", storgeConfig.getTafficExclusion()));
- private final String druidDailyUsedSizeSql = "{\"query\":\"SELECT \\\"start\\\" as \\\"date\\\", sum(\\\"size\\\") as bytes from sys.segments where is_published = 1 and \\\"start\\\" < '3000' " + storgeConfig.getNotInSql("datasource", storgeConfig.getAnalyticExclusion()) + " group by \\\"start\\\" ORDER BY \\\"start\\\" \",\"context\":{\"skipEmptyBuckets\":\"false\"},\"resultFormat\":\"object\"}";
+// public final String clickhouseDailyUsedSizeSql = StrUtil.format("SELECT DISTINCT(`partition`) as date, SUM(`bytes_on_disk`) as bytes from {} WHERE database = '{}' {} and date <= formatDateTime(now(),'%Y%m%d') and active = 1 group by date;",
+// storgeConfig.getSystemPartsCluster(), storgeConfig.getTrafficDatasource(), storgeConfig.getNotInSql("table", storgeConfig.getTafficExclusion()));
+// public final String druidDailyUsedSizeSql = "{\"query\":\"SELECT \\\"start\\\" as \\\"date\\\", sum(\\\"size\\\") as bytes from sys.segments where is_published = 1 and \\\"start\\\" < '3000' " + storgeConfig.getNotInSql("datasource", storgeConfig.getAnalyticExclusion()) + " group by \\\"start\\\" ORDER BY \\\"start\\\" \",\"context\":{\"skipEmptyBuckets\":\"false\"},\"resultFormat\":\"object\"}";
private List<DailyStorage> getClickhouseDaily(String dataCenterHost) {
Map<String, Object> ckParamMap = storgeConfig.getCkSource();
- ckParamMap.put("query", clickhouseDailyUsedSizeSql);
+ ckParamMap.put("query", "clickhouseDailyUsedSizeSql");
String result = HttpClientUtils.httpGet(HttpClientUtils.getUrlWithParams(UrlUtil.getUrl(dataCenterHost), ckParamMap));
if ("-1".equals(result)) {
throw new BusinessException("Get clickhouse http fail -1");
@@ -191,7 +188,7 @@ public class StorageQuotaTest {
}
private List<DailyStorage> getDruidDaily(String dataCenterHost) {
- String result = HttpClientUtils.httpPost(UrlUtil.getUrl(dataCenterHost, Constant.DRUID_URL), druidDailyUsedSizeSql);
+ String result = HttpClientUtils.httpPost(UrlUtil.getUrl(dataCenterHost, Constant.DRUID_URL), "druidDailyUsedSizeSql");
if ("-1".equals(result)) {
throw new BusinessException("Get druid http fail -1");
}