author     zhanghongqing <[email protected]>  2024-07-01 18:11:41 +0800
committer  zhanghongqing <[email protected]>  2024-07-01 18:11:41 +0800
commit     43fe8d22b16737fbceab6875ec3d30210e4908c7 (patch)
tree       43b53f72d17d0d19a4312242dddbb7960468b447
parent     dee95eb5ed7cf292bd334c662971863afe07ebe1 (diff)
[Add][Log deletion] Add the delete_old_log task for deleting old logs TSG-21553
-rw-r--r--  galaxy-job-executor/src/main/java/com/mesalab/executor/core/utils/Constant.java  2
-rw-r--r--  galaxy-job-executor/src/main/java/com/mesalab/executor/jobhandler/LogStorageQuotaJob.java  326
-rw-r--r--  galaxy-job-executor/src/main/java/com/mesalab/executor/jobhandler/LogStorageTtlJob.java  3
-rw-r--r--  galaxy-job-executor/src/main/java/com/mesalab/executor/jobhandler/StorageQuotaJob.java  32
-rw-r--r--  galaxy-job-executor/src/main/java/com/mesalab/executor/jobhandler/ValidParamUtil.java  15
-rw-r--r--  galaxy-job-executor/src/main/java/com/mesalab/executor/service/StorageQuotaInfoService.java  314
-rw-r--r--  galaxy-job-executor/src/main/java/com/mesalab/executor/service/StorageQuotaService.java  705
-rw-r--r--  galaxy-job-executor/src/test/java/com/mesalab/executor/test/StorageQuotaTest.java  34
8 files changed, 574 insertions, 857 deletions
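The headline change is a new XXL-JOB handler, deleteOldLogJobHandler (the delete_old_log task), which checks storage usage per data center and triggers deletion of old log data when a usage threshold is exceeded. A minimal sketch of its JSON job parameter, with the keys taken from the diff below and the values purely illustrative:

    {"logType":"Traffic Logs","maxUsage":85,"minIntervalMinutes":60}

Here logType selects the backend (the Constant.TRAFFIC_LOGS, Constant.METRICS or Constant.FILES value), maxUsage is the disk-usage percentage above which old data is dropped, and minIntervalMinutes is the minimum time between two deletion runs for the same data center.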
diff --git a/galaxy-job-executor/src/main/java/com/mesalab/executor/core/utils/Constant.java b/galaxy-job-executor/src/main/java/com/mesalab/executor/core/utils/Constant.java
index 30c59e5..b866c2a 100644
--- a/galaxy-job-executor/src/main/java/com/mesalab/executor/core/utils/Constant.java
+++ b/galaxy-job-executor/src/main/java/com/mesalab/executor/core/utils/Constant.java
@@ -7,11 +7,13 @@ public class Constant {
public static final String REPORT_AND_METRICS = "Report and Metrics";
public static final String TRAFFIC_LOGS = "Traffic Logs";
public static final String METRICS = "Metrics";
+ public static final String MAX_DAYS = "maxDays";
// zookeeper /path+node
public static final String ZK_TRAFFIC_LOGS = "Traffic-Logs";
public static final String ZK_REPORT_AND_METRICS = "Report-and-Metrics";
public static final String ZOOKEEPER_STORAGE_PATH = "/storage/worker/";
+ public static final String ZOOKEEPER_STORAGE_CLEAR_TIME = "clearTime";
public static final String TOKEN = "Token";
public static final String TEXT_XML = "text/xml";
diff --git a/galaxy-job-executor/src/main/java/com/mesalab/executor/jobhandler/LogStorageQuotaJob.java b/galaxy-job-executor/src/main/java/com/mesalab/executor/jobhandler/LogStorageQuotaJob.java
index 7aa5670..e7f85a6 100644
--- a/galaxy-job-executor/src/main/java/com/mesalab/executor/jobhandler/LogStorageQuotaJob.java
+++ b/galaxy-job-executor/src/main/java/com/mesalab/executor/jobhandler/LogStorageQuotaJob.java
@@ -2,6 +2,7 @@ package com.mesalab.executor.jobhandler;
import cn.hutool.core.net.url.UrlBuilder;
import cn.hutool.core.thread.ThreadUtil;
+import cn.hutool.core.util.StrUtil;
import cn.hutool.log.Log;
import com.geedgenetworks.utils.DateUtils;
import com.geedgenetworks.utils.JsonMapper;
@@ -15,20 +16,19 @@ import com.mesalab.executor.core.utils.*;
import com.mesalab.executor.exception.BusinessException;
import com.mesalab.executor.pojo.LifecycleConfiguration;
import com.mesalab.executor.pojo.ListAllMyBucketsResult;
-import com.mesalab.executor.service.StorageQuotaService;
import com.xxl.job.core.biz.model.ReturnT;
import com.xxl.job.core.handler.IJobHandler;
import com.xxl.job.core.handler.annotation.XxlJob;
import com.xxl.job.core.log.XxlJobLogger;
import org.apache.commons.lang3.ObjectUtils;
+import org.apache.commons.lang3.StringUtils;
import org.apache.http.Header;
import org.apache.http.message.BasicHeader;
-import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.beans.factory.annotation.Value;
import org.springframework.http.HttpHeaders;
import org.springframework.stereotype.Component;
import java.nio.charset.StandardCharsets;
-import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Objects;
@@ -39,90 +39,31 @@ public class LogStorageQuotaJob {
private Log logger = Log.get();
private static final String analyticUrl = "/druid/v2/sql";
- private static final String trafficPort = "8123";
private static final String ck_cluster = "ck_cluster";
private static final String max_days = "maxDays";
- @Autowired
- StorageQuotaService storageQuotaService;
+ @Value("${zookeeper.server}")
+ private String zookeeperServer;
- private StorgeConfig deletionConfig = (StorgeConfig)SpringContextUtil.getBean("storgeConfig");
+ private StorgeConfig deletionConfig = (StorgeConfig) SpringContextUtil.getBean("storgeConfig");
//*********************************************Set storage policy**********************************************************
-
+ private final Header[] hosHeaders = {new BasicHeader(Constant.TOKEN, deletionConfig.getFilesToken()),
+ new BasicHeader(HttpHeaders.CONTENT_TYPE, Constant.TEXT_XML)};
/**
* Set the traffic-log storage (TTL) policy
* ClickHouse database
*
- * @param params {"maxdays":30}
+ * @param params {"maxDays":30}
*/
- @XxlJob("deleteTrafficDataJobHandler2")
- public ReturnT<String> deleteTrafficData(String params) {
+ @XxlJob("deleteTrafficDataJobHandler")
+ public ReturnT<String> deleteTrafficDataByCluster(String params) {
try {
Map<String, Object> paramsMap = validParams(params);
if (ObjectUtils.isEmpty(paramsMap)) {
logger.error("params parser error , params is {}", params);
return IJobHandler.FAIL;
}
- Map<String, Object> deleteParamMap = deletionConfig.getCkSource();
-
- // Delete data by server address, table, and partition
- String prefixDeleteSql = Joiner.on("").join("ALTER TABLE ", deletionConfig.getTrafficDatasource(), ".");
- String deleteMaxDate = DateUtils.getSomeDate(TimeConstants.YYYYMMDD, -Integer.valueOf(String.valueOf(paramsMap.get(max_days))));
- UrlBuilder urlBuilder = UrlBuilder.ofHttp(deletionConfig.getTrafficServer(), StandardCharsets.UTF_8);
- List<String> addressForCKs = getAddressForCK(urlBuilder.toString());
- int failCount = 0;
- // For each server endpoint
- for (String endpoint : addressForCKs) {
- try {
- // For each table
- String url = UrlUtil.getUrl(endpoint);
- List<String> tablesForCKs = getTablesForCK(url,deletionConfig.getNotInSql("name",deletionConfig.getTafficExclusion()));
- for (String table : tablesForCKs) {
- try {
- // For each partition
- List<String> partitions = getSystemDataForCK(url, Joiner.on("").join("SELECT DISTINCT(`partition`) from " + deletionConfig.getSystemParts() + " WHERE table = '", table, "' AND `partition` < '", deleteMaxDate, "' FORMAT JSON;"));
- for (String partition : partitions) {
- try {
- deleteParamMap.put("query", Joiner.on("").join(prefixDeleteSql, table, " DROP PARTITION ", partition));
- HttpClientUtils.httpPost(HttpClientUtils.getUrlWithParams(url, deleteParamMap), "");
- } catch (BusinessException e) {
- failCount++;
- logger.error("clickhouse ttl error endpoint = {}, table = {}, partition = {}, fail message = {}", endpoint, table, partition, JobUtil.getErrorMsg(e));
- XxlJobLogger.log("clickhouse ttl error partitionFailCount={}, endpoint = {}, table = {}, partition = {}, fail message = {}", failCount, endpoint, table, partition, JobUtil.getErrorMsg(e));
- }
- }
- logger.info("clickhouse ttl clickhouse endpoint = {}, table = {}, partition < {}, size = {}, fail count = {}", endpoint, table, deleteMaxDate, partitions.size(), failCount);
- } catch (BusinessException e) {
- failCount++;
- logger.error("clickhouse ttl error endpoint = {},table = {}, fail message = {}", endpoint, table, JobUtil.getErrorMsg(e));
- XxlJobLogger.log("clickhouse ttl error endpoint = {},table = {}, fail message = {}", endpoint, table, JobUtil.getErrorMsg(e));
- }
- }
- XxlJobLogger.log("clickhouse ttl endpoint = {}, failCount = {}, tables = {}", endpoint, failCount, tablesForCKs);
- } catch (BusinessException e) {
- failCount++;
- logger.error("clickhouse ttl error endpoint = {}, fail message = {}", endpoint, JobUtil.getErrorMsg(e));
- XxlJobLogger.log("clickhouse ttl error endpoint = {}, fail message = {}", endpoint, JobUtil.getErrorMsg(e));
- }
- }
- modifyLastStorage(Constant.ZK_TRAFFIC_LOGS, Integer.valueOf(String.valueOf(paramsMap.get(max_days))));
- if (failCount > 0) {
- throw new BusinessException("reset storage clickhouse ttl error ,failCount " + failCount);
- }
-
- } catch (BusinessException be) {
- logger.error(be.getMessage());
- XxlJobLogger.log(be.getMessage());
- return ReturnT.FAIL;
- }
- return ReturnT.SUCCESS;
- }
-
- @XxlJob("deleteTrafficDataJobHandler")
- public ReturnT<String> deleteTrafficDataByCluster(String params) {
- try {
- Map<String, Object> paramsMap = validParams(params);
if (ObjectUtils.isEmpty(paramsMap)) {
logger.error("params parser error , params is {}", params);
return IJobHandler.FAIL;
@@ -149,7 +90,7 @@ public class LogStorageQuotaJob {
XxlJobLogger.log("reset storage days table {}, drop partition size {} ", table, partitionList.size());
partitionList.forEach(partition -> {
- String deleteSql = Joiner.on("").join(prefixDeleteSql, table, suffixDeleteSql, "'",partition,"'");
+ String deleteSql = Joiner.on("").join(prefixDeleteSql, table, suffixDeleteSql, "'", partition, "'");
deleteParamMap.put("query", deleteSql);
logger.info("reset storage days clickhouse sql:{}", deleteSql);
@@ -167,8 +108,8 @@ public class LogStorageQuotaJob {
if (failCount > 0) {
throw new BusinessException("reset storage clickhouse ttl error, failCount " + failCount);
}
- logger.info("reset clickhouse {} days success",maxdays);
- XxlJobLogger.log("reset clickhouse {} days success",maxdays);
+ logger.info("reset clickhouse {} days success", maxdays);
+ XxlJobLogger.log("reset clickhouse {} days success", maxdays);
} catch (BusinessException be) {
logger.error(be.getMessage());
XxlJobLogger.log(be.getMessage());
@@ -308,122 +249,75 @@ public class LogStorageQuotaJob {
logger.error("params parser error , params is {}", params);
return IJobHandler.FAIL;
}
- List<String> endpointList = Splitter.on(",").trimResults().omitEmptyStrings().splitToList(deletionConfig.getFilesHosServer());
- List<String> whiteList = StringUtil.isEmpty(deletionConfig.getFilesExclusion()) ? null : Splitter.on(",").trimResults().omitEmptyStrings().splitToList(deletionConfig.getFilesExclusion());
- Header header1 = new BasicHeader(Constant.TOKEN, deletionConfig.getFilesToken());
- Header header2 = new BasicHeader(HttpHeaders.CONTENT_TYPE, Constant.TEXT_XML);
- Header[] headers = {header1, header2};
+ String maxDays = String.valueOf(paramsMap.get(max_days));
+ List<String> hosHostList = parseList(deletionConfig.getFilesHosServer());
int failCount = 0;
- for (String endpoint : endpointList) {
- try {
- String filesServer = UrlUtil.getUrl(endpoint);
- String httpGetRes = HttpClientUtils.httpGet(Joiner.on("").join(filesServer, "/hos/"), header1);
- if (httpGetRes != null && !"-1".equals(httpGetRes)) {
- ListAllMyBucketsResult list = XmlUtil.converXmlToBean(ListAllMyBucketsResult.class, httpGetRes);
- logger.info("reset storage files ttl endpoint = {}, bucketList = {} ", endpoint, list);
- if (ObjectUtils.allNotNull(list, list.getBuckets(), list.getBuckets().getBucket())) {
- ListAllMyBucketsResult.Buckets buckets = list.getBuckets();
- List<ListAllMyBucketsResult.Buckets.Bucket> bucketList = buckets.getBucket();
-
- LifecycleConfiguration lc = new LifecycleConfiguration();
- lc.getRule().setExpiration(new LifecycleConfiguration.Expiration(String.valueOf(paramsMap.get(max_days))));
-
- for (ListAllMyBucketsResult.Buckets.Bucket bucket : bucketList) {
- try {
- if (ObjectUtils.isNotEmpty(whiteList) && whiteList.contains(bucket.getName())) {
- LifecycleConfiguration whiteLc = new LifecycleConfiguration();
- whiteLc.getRule().setExpiration(new LifecycleConfiguration.Expiration("24855")); // maximum HOS retention period
- HttpClientUtils.httpPut(Joiner.on("").join(filesServer, "/hos/", bucket.getName(), "?lifecycle"), XmlUtil.converToXml(whiteLc), headers);
- continue;
- }
- HttpClientUtils.httpPut(Joiner.on("").join(filesServer, "/hos/", bucket.getName(), "?lifecycle"), XmlUtil.converToXml(lc), headers);
- } catch (BusinessException e) {
- failCount++;
- logger.error("reset storage files ttl failCount = {}, endpoint = {}, bucket = {}, fail message = {}", failCount, endpoint, bucket, JobUtil.getErrorMsg(e));
- XxlJobLogger.log("failCount = {}, endpoint = {}, bucket = {}, fail message = {}", failCount, endpoint, bucket, JobUtil.getErrorMsg(e));
- }
- }
- }
- } else {
- failCount++;
- logger.error("reset storage files ttl failCount = {}, endpoint = {} , fail message = I/O exception", failCount, endpoint);
- XxlJobLogger.log("endpoint = {} , fail message = I/O exception", endpoint);
- }
- } catch (BusinessException e) {
- logger.error("reset storage files ttl endpoint = {}, fail message = {}", endpoint, JobUtil.getErrorMsg(e));
- XxlJobLogger.log("endpoint = {}, fail message = {}", endpoint, JobUtil.getErrorMsg(e));
- failCount++;
- }
+ for (String hosHost : hosHostList) {
+ failCount += deleteHosData(hosHost, maxDays);
}
- modifyLastStorage(Constant.FILES, Integer.valueOf(String.valueOf(paramsMap.get(max_days))));
+
if (failCount > 0) {
- throw new BusinessException(" number of total endpoints " + endpointList.size() + ", number of failed count " + failCount);
+ JobUtil.errorLog(" number of total endpoints " + hosHostList.size() + ", number of failed count " + failCount);
+ return ReturnT.FAIL;
}
-
} catch (BusinessException be) {
- logger.error(be.getMessage());
- XxlJobLogger.log(be.getMessage());
+ JobUtil.errorLog(be.getMessage());
return ReturnT.FAIL;
}
return ReturnT.SUCCESS;
}
- //*********************************************Purge database**********************************************************
-
- /**
- * Delete all traffic data from the ClickHouse database
- *
- * @param params {"maxdays":365}
- */
- @XxlJob("deleteAllTrafficDataJobHandler2")
- public ReturnT<String> deleteAllTrafficData2(String params) {
-
+ public int deleteHosData(String endpoint, String maxDays) {
+ int failCount = 0;
try {
- Map<String, Object> paramsMap = validParams(params);
- if (ObjectUtils.isEmpty(paramsMap)) {
- logger.error("params parser error , params is {}", params);
- return IJobHandler.FAIL;
- }
- List<String> addressForCKs = getAddressForCK(UrlUtil.getUrl(deletionConfig.getTrafficServer()));
- Map<String, Object> deleteParamMap = deletionConfig.getCkSource();
- // Purge command parameters
- String deleteSql = Joiner.on("").join("TRUNCATE TABLE IF EXISTS ", deletionConfig.getTrafficDatasource(), ".");
- int failCount = 0;
- for (String endpoint : addressForCKs) {
+ String hosServer = UrlUtil.getUrl(endpoint);
+ // Fetch the bucket list
+ List<String> bucketList = getBucketList(endpoint, hosServer);
+ LifecycleConfiguration lifecycleConfig = new LifecycleConfiguration();
+
+ lifecycleConfig.getRule().setExpiration(new LifecycleConfiguration.Expiration(maxDays));
+ JobUtil.infoLog("reset storage files ttl endpoint = {}, bucketList = {} ", endpoint, bucketList);
+ for (String bucketName : bucketList) {
try {
- String url = UrlUtil.getUrl(endpoint);
- List<String> tablesForCKs = getTablesForCK(url, deletionConfig.getNotInSql("name", deletionConfig.getTafficExclusion()));
- for (String table : tablesForCKs) {
- try {
- deleteParamMap.put("query", deleteSql.concat(table));
- logger.info("delete clickhouse endpoint = {}, sql:{}", endpoint, deleteSql.concat(table));
- HttpClientUtils.httpPost(HttpClientUtils.getUrlWithParams(url, deleteParamMap), "");
- } catch (BusinessException e) {
- logger.error("delete storage clickhouse error endpoint = {}, table = {}, fail message = {}", endpoint, table, JobUtil.getErrorMsg(e));
- XxlJobLogger.log("delete storage clickhouse error endpoint = {}, table = {}, fail message = {}", endpoint, table, JobUtil.getErrorMsg(e));
- failCount++;
- }
- }
- XxlJobLogger.log("delete storage clickhouse error endpoint = {}, fail count = {}, tables = {}", endpoint, failCount, tablesForCKs);
+ HttpClientUtils.httpPut(StringUtils.join(hosServer, "/hos/", bucketName, "?lifecycle"), XmlUtil.converToXml(lifecycleConfig), hosHeaders);
} catch (BusinessException e) {
- logger.error("delete storage clickhouse error endpoint = {}, fail message = {}", endpoint, JobUtil.getErrorMsg(e));
- XxlJobLogger.log("delete storage clickhouse error endpoint = {}, fail message = {}", endpoint, JobUtil.getErrorMsg(e));
failCount++;
+ JobUtil.errorLog("reset storage files ttl failCount = {}, endpoint = {}, bucket = {}, fail message = {}", failCount, endpoint, bucketName, JobUtil.getErrorMsg(e));
}
}
- modifyLastStorage(Constant.ZK_TRAFFIC_LOGS, 0);
- if (failCount > 0) {
- throw new BusinessException("delete storage clickhouse error ,failCount " + failCount);
- }
+ modifyLastStorage(Constant.FILES, Integer.valueOf(maxDays));
+ } catch (BusinessException e) {
+ JobUtil.errorLog("reset storage files ttl endpoint = {}, fail message = {}", endpoint, JobUtil.getErrorMsg(e));
+ failCount++;
+ }
+ return failCount;
+ }
- } catch (BusinessException be) {
- logger.error(be.getMessage());
- XxlJobLogger.log(be.getMessage());
- return ReturnT.FAIL;
+ private List<String> getBucketList(String endpoint, String hosServer) {
+ List<String> whiteList = parseList(deletionConfig.getFilesExclusion());
+ String httpGetRes = HttpClientUtils.httpGet(hosServer + "/hos/", hosHeaders);
+ if (httpGetRes == null || "-1".equals(httpGetRes)) {
+ throw new BusinessException(StrUtil.format("reset storage files ttl endpoint = {} , fail message = I/O exception", endpoint));
}
- return ReturnT.SUCCESS;
+ List<String> bucketList = Lists.newArrayList();
+ ListAllMyBucketsResult bucketsResult = XmlUtil.converXmlToBean(ListAllMyBucketsResult.class, httpGetRes);
+
+ if (ObjectUtils.allNotNull(bucketsResult, bucketsResult.getBuckets(), bucketsResult.getBuckets().getBucket())) {
+ for (ListAllMyBucketsResult.Buckets.Bucket bucket : bucketsResult.getBuckets().getBucket()) {
+ if (ObjectUtils.isNotEmpty(whiteList) && whiteList.contains(bucket.getName())) {
+ continue;
+ }
+ bucketList.add(bucket.getName());
+ }
+ }
+ return bucketList;
+ }
+
+ private List<String> parseList(String csv) {
+ return StringUtil.isEmpty(csv) ? null : Splitter.on(",").trimResults().omitEmptyStrings().splitToList(csv);
}
+ //*********************************************Purge database**********************************************************

/**
* Delete all traffic data from the ClickHouse database
for (String table : tablesForCKs) {
try {
- String deleteSqlCluster = String.join(" ",deleteSql, table, " ON CLUSTER ", ck_cluster);
+ String deleteSqlCluster = String.join(" ", deleteSql, table, " ON CLUSTER ", ck_cluster);
deleteParamMap.put("query", deleteSqlCluster);
logger.info("delete clickhouse table = {}, sql:{}", table, deleteSqlCluster);
HttpClientUtils.httpPost(HttpClientUtils.getUrlWithParams(url, deleteParamMap), "");
- XxlJobLogger.log("delete clickhouse table = {}" ,table);
+ XxlJobLogger.log("delete clickhouse table = {}", table);
} catch (BusinessException e) {
logger.error("delete storage clickhouse error endpoint = {}, table = {}, fail message = {}", url, table, JobUtil.getErrorMsg(e));
@@ -468,7 +362,7 @@ public class LogStorageQuotaJob {
XxlJobLogger.log("delete storage clickhouse endpoint = {}, fail count = {}, tables = {}", url, failCount, tablesForCKs);
} catch (BusinessException e) {
- logger.error("delete storage clickhouse error , fail message = {}", JobUtil.getErrorMsg(e));
+ logger.error("delete storage clickhouse error , fail message = {}", JobUtil.getErrorMsg(e));
XxlJobLogger.log("delete storage clickhouse error , fail message = {}", JobUtil.getErrorMsg(e));
failCount++;
}
@@ -643,41 +537,6 @@ public class LogStorageQuotaJob {
/**
- * Query the database directly to obtain the storage data
- *
- * @param params {"topic":"SYS-STORAGE-LOG","ckDayGrowth":"false"}
- * ckDayGrowth: whether the job runs at a once-per-day granularity
- */
- @XxlJob("getStorageQuotaJobHandler")
- public ReturnT<String> getStorageQuota(String params) {
-
- try {
- Map<String, Object> paramsMap = validParams(params);
- if (ObjectUtils.isEmpty(paramsMap)) {
- logger.error("params parser error , params is {}", params);
- return IJobHandler.FAIL;
- }
- Map resultMap = storageQuotaService.getDiskJson("true".equals(paramsMap.get("ckDayGrowth")));
-
- // Send to Kafka
- new KafkaUtils().sendMessage(String.valueOf(paramsMap.get("topic")), (List<Object>) resultMap.get("data"));
- XxlJobLogger.log("kafka topic is {}", paramsMap.get("topic"));
- // A query problem is reported as status > 0
- int status = ((Map<String, Integer>) resultMap.get("status")).values().stream().mapToInt(Integer::intValue).sum();
- if (status > 0) {
- return new ReturnT<String>(IJobHandler.FAIL.getCode(), " query error " + resultMap.get("status"));
- }
-
- } catch (BusinessException be) {
- logger.error(be.getMessage());
- XxlJobLogger.log(be.getMessage());
- return ReturnT.FAIL;
- }
-
- return ReturnT.SUCCESS;
- }
-
- /**
* Database connection parameters
*
* @return map
@@ -720,17 +579,6 @@ public class LogStorageQuotaJob {
return tableList;
}
- /***
- * Query all the IP addresses
- * @Date 2020\9\18 0018
- * @return java.util.List<java.lang.String>
- **/
- private List<String> getAddressForCK(String url) {
- List<String> endpointList = new ArrayList<>();
- endpointList.addAll(getSystemDataForCK(url, Joiner.on("").join("SELECT DISTINCT concat(host_address,':','" + trafficPort + "') as endpoint FROM ", deletionConfig.getSystemClusters(), " FORMAT JSON;")));
- return endpointList;
- }
-
/**
* Query the ClickHouse tables that need to be deleted
*
@@ -772,15 +620,16 @@ public class LogStorageQuotaJob {
* Update the reference (standard) time
*
* @param logType
- * @param maxdays
+ * @param maxDays
*/
- private void modifyLastStorage(String logType, Integer maxdays) {
- // Set the reference time: if now - reference time > maxdays, update reference time = now - maxdays
- Long lastStorage = storageQuotaService.getLastStorage(logType);
+ private void modifyLastStorage(String logType, Integer maxDays) {
+ // Set the reference time: if now - reference time > maxDays, update reference time = now - maxDays
+ Long lastStorage = getLastStorage(logType);
Long now = System.currentTimeMillis() / 1000;
- Long max = maxdays * 24 * 60 * 60L; // in seconds
+ Long max = maxDays * 24 * 60 * 60L; // in seconds
if (now - lastStorage > max) {
- storageQuotaService.modifyLastStorage(logType, String.valueOf(now - max));
+ ZookeeperUtils zk = new ZookeeperUtils();
+ zk.modifyNode(Constant.ZOOKEEPER_STORAGE_PATH + logType, String.valueOf(now - max), zookeeperServer);
}
}
@@ -793,4 +642,33 @@ public class LogStorageQuotaJob {
return ObjectUtils.isEmpty(startTimeList) ? DateUtils.getCurrentDate() : String.valueOf(startTimeList.get(0).get("version"));
}
+ /**
+ * @param node
+ * @return the custom reference time last_storage,
+ * stored in the ZooKeeper node /storage/worker/ + node
+ */
+ public Long getLastStorage(String node) {
+ try {
+ ZookeeperUtils zk = new ZookeeperUtils();
+ //query
+ String nodeData = zk.getNodeData(Constant.ZOOKEEPER_STORAGE_PATH + node, zookeeperServer);
+ // If it does not exist, create one
+ if (org.springframework.util.ObjectUtils.isEmpty(nodeData)) {
+ Long lastTime = System.currentTimeMillis() / 1000;
+ zk.modifyNode(Constant.ZOOKEEPER_STORAGE_PATH + node, String.valueOf(lastTime), zookeeperServer);
+ return lastTime;
+ }
+ Long lastStorage = Long.valueOf(nodeData);
+ logger.info("query standard time last_storage success,{}", lastStorage);
+
+ return Long.valueOf(nodeData);
+ } catch (Exception e) {
+ logger.error("query standard time last_storage fail,{}", JobUtil.getErrorMsg(e));
+ throw new BusinessException("query standard time last_storage fail ," + JobUtil.getErrorMsg(e));
+ }
+
+ }
+ class DeleteParams {
+
+ }
}
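For orientation, the ZooKeeper bookkeeping used by these jobs follows the constants added in Constant.java above. A sketch of the node layout (the data-center segment comes from the StorgeConfig keys; the Constant.FILES value itself is not shown in this diff):

    /storage/worker/Traffic-Logs                          last_storage reference time for ClickHouse traffic logs
    /storage/worker/Report-and-Metrics                    last_storage reference time for Druid metrics
    /storage/worker/<Constant.FILES>                      last_storage reference time for HOS files
    /storage/worker/<logType>/<dataCenter>                per-data-center last_storage used by delete_old_log
    /storage/worker/<logType>/<dataCenter>/clearTime      per-data-center clear-time marker used by delete_old_log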
diff --git a/galaxy-job-executor/src/main/java/com/mesalab/executor/jobhandler/LogStorageTtlJob.java b/galaxy-job-executor/src/main/java/com/mesalab/executor/jobhandler/LogStorageTtlJob.java
index 0abc2b9..b46c9d7 100644
--- a/galaxy-job-executor/src/main/java/com/mesalab/executor/jobhandler/LogStorageTtlJob.java
+++ b/galaxy-job-executor/src/main/java/com/mesalab/executor/jobhandler/LogStorageTtlJob.java
@@ -17,7 +17,6 @@ import com.mesalab.executor.core.utils.HttpClientUtils;
import com.mesalab.executor.core.utils.SpringContextUtil;
import com.mesalab.executor.exception.BusinessException;
import com.mesalab.executor.pojo.ColumnParam;
-import com.mesalab.executor.service.StorageQuotaService;
import com.xxl.job.core.biz.model.ReturnT;
import com.xxl.job.core.handler.annotation.XxlJob;
import com.xxl.job.core.log.XxlJobLogger;
@@ -37,8 +36,6 @@ public class LogStorageTtlJob {
private static final int materializeTtlAfterModify = 0;
@Autowired
- StorageQuotaService storageQuotaService;
- @Autowired
ConfigService configService;
private static StorgeConfig deletionConfig = (StorgeConfig) SpringContextUtil.getBean("storgeConfig");
diff --git a/galaxy-job-executor/src/main/java/com/mesalab/executor/jobhandler/StorageQuotaJob.java b/galaxy-job-executor/src/main/java/com/mesalab/executor/jobhandler/StorageQuotaJob.java
index 3d16deb..5e6d44f 100644
--- a/galaxy-job-executor/src/main/java/com/mesalab/executor/jobhandler/StorageQuotaJob.java
+++ b/galaxy-job-executor/src/main/java/com/mesalab/executor/jobhandler/StorageQuotaJob.java
@@ -5,9 +5,11 @@ import cn.hutool.core.util.StrUtil;
import com.mesalab.executor.core.utils.JobUtil;
import com.mesalab.executor.core.utils.TypeUtils;
import com.mesalab.executor.pojo.JDBCParam;
-import com.mesalab.executor.service.StorageQuotaInfoService;
+import com.mesalab.executor.service.StorageQuotaService;
import com.xxl.job.core.biz.model.ReturnT;
+import com.xxl.job.core.handler.IJobHandler;
import com.xxl.job.core.handler.annotation.XxlJob;
+import org.apache.commons.lang3.ObjectUtils;
import org.springframework.stereotype.Component;
import javax.annotation.Resource;
@@ -25,7 +27,7 @@ import java.util.Map;
public class StorageQuotaJob {
@Resource
- private StorageQuotaInfoService storageQuotaInfoService;
+ private StorageQuotaService storageQuotaService;
@XxlJob("getDatabaseStorageQuotaJobHandler")
@@ -40,7 +42,7 @@ public class StorageQuotaJob {
Boolean containBytes = TypeUtils.castToBoolean(source.get("containBytes"));
JDBCParam jdbcParam = BeanUtil.toBean(paramsMap.get("sink"), JDBCParam.class);
- failCount += storageQuotaInfoService.getAndSaveStorageQuotaInfo(items, jdbcParam, containBytes);
+ failCount += storageQuotaService.getAndSaveStorageQuotaInfo(items, jdbcParam, containBytes);
}
} catch (Exception e) {
@@ -49,4 +51,28 @@ public class StorageQuotaJob {
}
return failCount > 0 ? ReturnT.FAIL : ReturnT.SUCCESS;
}
+
+ @XxlJob("deleteOldLogJobHandler")
+ public ReturnT<String> deleteOldLogJobHandler(String params) {
+
+ int failCount = 0;
+ try {
+ Map<String, Object> paramsMap = ValidParamUtil.parseParams(params);
+ if (ObjectUtils.isEmpty(paramsMap)) {
+ JobUtil.errorLog("params parse error , params is {}", params);
+ return IJobHandler.FAIL;
+ }
+ String logType = TypeUtils.castToString(paramsMap.get("logType"));
+ int maxUsage = TypeUtils.castToInt(paramsMap.get("maxUsage"));
+ int minIntervalMinutes = TypeUtils.castToInt(paramsMap.get("minIntervalMinutes"));
+
+ failCount += storageQuotaService.deleteOldLog(logType, maxUsage, minIntervalMinutes);
+
+ } catch (Exception e) {
+ failCount++;
+ JobUtil.errorLog("deleteOldLog job error: {}", e.getMessage());
+ }
+ return failCount > 0 ? ReturnT.FAIL : ReturnT.SUCCESS;
+ }
+
}
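As a worked example of how the new parameters are interpreted (distilled from StorageQuotaService.deleteOldLog and its helpers further below; names follow the diff, the numbers in the comments are illustrative assumptions):

    // Illustrative only: how maxUsage and minIntervalMinutes gate a deletion run
    static boolean shouldDelete(long usedSize, long totalSize, int maxUsage,
                                long nowSeconds, long lastClearTimeSeconds, int minIntervalMinutes) {
        double usage = usedSize / (double) totalSize;                              // e.g. 0.87
        boolean overQuota = maxUsage <= usage * 100d;                              // usage percentage >= maxUsage (e.g. 85)
        boolean throttled = nowSeconds - lastClearTimeSeconds <= minIntervalMinutes * 60L;  // clearTime written too recently
        return overQuota && !throttled;
    }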
diff --git a/galaxy-job-executor/src/main/java/com/mesalab/executor/jobhandler/ValidParamUtil.java b/galaxy-job-executor/src/main/java/com/mesalab/executor/jobhandler/ValidParamUtil.java
index ce41541..611b23e 100644
--- a/galaxy-job-executor/src/main/java/com/mesalab/executor/jobhandler/ValidParamUtil.java
+++ b/galaxy-job-executor/src/main/java/com/mesalab/executor/jobhandler/ValidParamUtil.java
@@ -3,6 +3,7 @@ package com.mesalab.executor.jobhandler;
import cn.hutool.core.util.ObjectUtil;
import cn.hutool.core.util.StrUtil;
import cn.hutool.json.JSONUtil;
+import com.geedgenetworks.utils.StringUtil;
import com.xxl.job.core.log.XxlJobLogger;
import java.util.List;
@@ -54,4 +55,18 @@ public class ValidParamUtil {
}
return paramList;
}
+
+ public static Map<String, Object> parseParams(String params) {
+
+ if (StringUtil.isBlank(params)) {
+ XxlJobLogger.log("params is Empty !");
+ return null;
+ }
+ Map<String, Object> paramMap = JSONUtil.toBean(params, Map.class);
+ if (paramMap == null) {
+ XxlJobLogger.log("params error !");
+ return null;
+ }
+ return paramMap;
+ }
}
diff --git a/galaxy-job-executor/src/main/java/com/mesalab/executor/service/StorageQuotaInfoService.java b/galaxy-job-executor/src/main/java/com/mesalab/executor/service/StorageQuotaInfoService.java
deleted file mode 100644
index cf1b0d4..0000000
--- a/galaxy-job-executor/src/main/java/com/mesalab/executor/service/StorageQuotaInfoService.java
+++ /dev/null
@@ -1,314 +0,0 @@
-package com.mesalab.executor.service;
-
-import cn.hutool.core.date.DateUtil;
-import cn.hutool.core.util.StrUtil;
-import cn.hutool.db.handler.NumberHandler;
-import cn.hutool.db.sql.SqlExecutor;
-import cn.hutool.log.Log;
-import com.geedgenetworks.utils.DateUtils;
-import com.google.common.collect.Lists;
-import com.mesalab.executor.core.config.StorgeConfig;
-import com.mesalab.executor.core.utils.*;
-import com.mesalab.executor.exception.BusinessException;
-import com.mesalab.executor.pojo.HosSpace;
-import com.mesalab.executor.pojo.JDBCParam;
-import com.mesalab.executor.pojo.JobResult;
-import com.mesalab.executor.pojo.SysStorageEvent;
-import org.apache.http.Header;
-import org.apache.http.message.BasicHeader;
-import org.springframework.beans.factory.annotation.Value;
-import org.springframework.http.HttpHeaders;
-import org.springframework.stereotype.Service;
-import org.springframework.util.ObjectUtils;
-
-import java.sql.Connection;
-import java.util.List;
-import java.util.Map;
-
-/**
- * @Description:
- * @Author: zhq
- * @CreateDate: 2024/6/13
- * @Version: 1.0
- */
-
-@Service
-public class StorageQuotaInfoService {
-
- private StorgeConfig storgeConfig = (StorgeConfig) SpringContextUtil.getBean("storgeConfig");
- private Log logger = Log.get();
-
- // Maximum allowed interval when querying historical data, 60 MINUTE
- private static final int STORAGE_LIMITATION = 120 * 60;
-
- @Value("${zookeeper.server}")
- private String zookeeperServer;
-
-
- public int getAndSaveStorageQuotaInfo(String[] logTypes, JDBCParam jdbcParam, boolean containBytes) {
-
- int failCount = 0;
- for (String logType : logTypes) {
- JobResult<SysStorageEvent> jobResult = new JobResult<>();
- try {
- switch (logType) {
- case Constant.TRAFFIC_LOGS:
- jobResult = getClickhouseStorageInfo(jdbcParam, containBytes);
- break;
- case Constant.METRICS:
- jobResult = getDruidStorageInfo(jdbcParam, containBytes);
- break;
- case Constant.FILES:
- jobResult = getHosStorageInfo(jdbcParam, containBytes);
- break;
- default:
- break;
- }
- failCount += jobResult.getFailCount();
- if (containBytes) {
- setStorageAvg(jobResult);
- }
- DBUtils.save(jobResult.getData(), jdbcParam);
- } catch (Exception e) {
- failCount++;
- JobUtil.errorLog(e.getMessage());
- }
- }
- return failCount;
- }
-
- /**
- * Split one day's data evenly into 5-minute buckets
- *
- * @param jobResult
- */
- private void setStorageAvg(JobResult<SysStorageEvent> jobResult) {
-
- long interval = 60;
- long num = 24 * 60 / interval;
-
- List<SysStorageEvent> newResultList = Lists.newArrayList();
- long timestamp = DateUtils.convertStringToTimestamp(DateUtils.getDateOfYesterday("yyyyMMdd"), "yyyyMMdd");
- for (SysStorageEvent sysStorageEvent : jobResult.getData()) {
- Long dayBytes = sysStorageEvent.getBytes();
- long minBytes = dayBytes / num;
- for (long i = 0; i < num; i++) {
- SysStorageEvent newStorageEvent = sysStorageEvent;
- newStorageEvent.setBytes(minBytes);
-
- timestamp = timestamp + interval * 60;
- newStorageEvent.setGeneratedTime(timestamp);
- newResultList.add(newStorageEvent);
- }
- }
- jobResult.setData(newResultList);
- }
-
- private JobResult<SysStorageEvent> getClickhouseStorageInfo(JDBCParam jdbcParam, boolean containIncrease) throws Exception {
-
- final String maxSizeSql = StrUtil.format("SELECT SUM(`total_space`) FROM {};", storgeConfig.getSystemDisks());
- final String usedSizeSql = StrUtil.format("SELECT SUM(`bytes_on_disk`) FROM {} WHERE database = '{}' ;", storgeConfig.getSystemPartsCluster(), storgeConfig.getTrafficDatasource());
-
- List<SysStorageEvent> sysStorageEvents = Lists.newArrayList();
- JobResult<SysStorageEvent> jobResult = new JobResult();
- int failCount = 0;
- long generatedTime = DateUtil.currentSeconds();
- Long sinceTime = getLastStorage(Constant.ZK_TRAFFIC_LOGS);
- for (Map.Entry<String, String> datacenterMap : storgeConfig.getTrafficDataCenter().entrySet()) {
- try {
- String datacenterHost = datacenterMap.getValue();
- String datacenterName = datacenterMap.getKey();
- Map<String, Object> ckParamMap = storgeConfig.getCkSource();
- // 1. Total
- ckParamMap.put("query", maxSizeSql);
- Long totalSize = queryClickhouse(datacenterHost, ckParamMap);
- // 2. Used
- ckParamMap.put("query", usedSizeSql);
- Long usedSize = queryClickhouse(datacenterHost, ckParamMap);
-
- // 3. Increment
- Long bytes = containIncrease ? 0L : getIncreaseSize(jdbcParam, usedSize, datacenterMap.getKey(), Constant.TRAFFIC_LOGS);
- SysStorageEvent storageEvent = SysStorageEvent.builder()
- .logType(Constant.TRAFFIC_LOGS)
- .dataCenter(datacenterName)
- .generatedTime(generatedTime)
- .totalAllocatedSize(totalSize)
- .usedSize(usedSize)
- .bytes(bytes)
- .sinceTime(sinceTime)
- .build();
-
- sysStorageEvents.add(storageEvent);
- JobUtil.infoLog("Get clickhouse storage info {}:", storageEvent.toString());
- } catch (Exception e) {
- failCount++;
- JobUtil.errorLog("Get clickhouse storage error : datacenter {}, message {}", datacenterMap.getKey(), e.getMessage());
- }
- }
- jobResult.setFailCount(failCount);
- jobResult.setData(sysStorageEvents);
- return jobResult;
- }
-
- private JobResult<SysStorageEvent> getDruidStorageInfo(JDBCParam jdbcParam, boolean containIncrease) throws Exception {
-
- final String maxSizeSql = "{\"query\":\"SELECT SUM(max_size) AS max_size FROM sys.servers WHERE server_type = 'historical'\",\"context\":{\"skipEmptyBuckets\":\"false\"},\"resultFormat\":\"csv\"}";
- final String usedSizeSql = "{\"query\":\"SELECT SUM(size) AS used_size FROM sys.segments WHERE datasource NOT LIKE '%hot%' and is_published = 1 and is_overshadowed = 0\",\"context\":{\"skipEmptyBuckets\":\"false\"},\"resultFormat\":\"csv\"}";
-
- List<SysStorageEvent> sysStorageEvents = Lists.newArrayList();
- JobResult<SysStorageEvent> jobResult = new JobResult();
- int failCount = 0;
- long generatedTime = DateUtil.currentSeconds();
- Long sinceTime = getLastStorage(Constant.ZK_REPORT_AND_METRICS);
- for (Map.Entry<String, String> datacenterMap : storgeConfig.getAnalyticDataCenter().entrySet()) {
- try {
- String datacenterHost = datacenterMap.getValue();
- String datacenterName = datacenterMap.getKey();
- // 1. Total
- Long totalSize = queryDruid(datacenterHost, maxSizeSql);
- // 2. Used
- Long usedSize = queryDruid(datacenterHost, usedSizeSql);
-
- // 3. Increment
- Long bytes = containIncrease ? 0L : getIncreaseSize(jdbcParam, usedSize, datacenterMap.getKey(), Constant.METRICS);
- SysStorageEvent storageEvent = SysStorageEvent.builder()
- .logType(Constant.METRICS)
- .dataCenter(datacenterName)
- .generatedTime(generatedTime)
- .totalAllocatedSize(totalSize)
- .usedSize(usedSize)
- .bytes(bytes)
- .sinceTime(sinceTime)
- .build();
-
- sysStorageEvents.add(storageEvent);
- JobUtil.infoLog("Get druid storage info {}:", storageEvent.toString());
- } catch (Exception e) {
- failCount++;
- JobUtil.errorLog("Get druid storage error : datacenter {}, message {}", datacenterMap.getKey(), e.getMessage());
- }
- }
- jobResult.setFailCount(failCount);
- jobResult.setData(sysStorageEvents);
- return jobResult;
- }
-
- private JobResult<SysStorageEvent> getHosStorageInfo(JDBCParam jdbcParam, boolean containIncrease) throws Exception {
-
- List<SysStorageEvent> sysStorageEvents = Lists.newArrayList();
- JobResult<SysStorageEvent> jobResult = new JobResult();
- int failCount = 0;
- Long generatedTime = DateUtil.currentSeconds();
- Long sinceTime = getLastStorage(Constant.FILES);
- for (Map.Entry<String, String> datacenterMap : storgeConfig.getFilesDataCenter().entrySet()) {
- try {
- String datacenterHost = datacenterMap.getValue();
- String datacenterName = datacenterMap.getKey();
-
- HosSpace hosSpace = getHosSpace(datacenterHost, datacenterName);
- Long totalSize = hosSpace.getHosCapacity();
- Long usedSize = hosSpace.getHosUsed();
- if (totalSize == -1 || usedSize == -1) {
- throw new BusinessException("hos server error : " + datacenterName);
- }
-
- // 3. Increment
- Long bytes = containIncrease ? 0L : getIncreaseSize(jdbcParam, usedSize, datacenterMap.getKey(), Constant.FILES);
- SysStorageEvent storageEvent = SysStorageEvent.builder()
- .logType(Constant.FILES)
- .dataCenter(datacenterName)
- .generatedTime(generatedTime)
- .totalAllocatedSize(totalSize)
- .usedSize(usedSize)
- .bytes(bytes)
- .sinceTime(sinceTime)
- .build();
-
- sysStorageEvents.add(storageEvent);
- JobUtil.infoLog("Get hos storage info {}:", storageEvent.toString());
- } catch (Exception e) {
- failCount++;
- JobUtil.errorLog("Get hos storage error : datacenter {}, message {}", datacenterMap.getKey(), e.getMessage());
- }
- }
- jobResult.setFailCount(failCount);
- jobResult.setData(sysStorageEvents);
- return jobResult;
- }
-
-
- private Long getIncreaseSize(JDBCParam jdbcParam, Long usedSize, String datacenter, String logType) throws Exception {
- final String lastUsedSizeSql = "SELECT used_size FROM " + jdbcParam.getTable() + " WHERE log_type = '" + logType + "' and data_center = '" + datacenter
- + "' and generated_time >= UNIX_TIMESTAMP() - " + STORAGE_LIMITATION + " ORDER BY generated_time DESC LIMIT 1;";
- JobUtil.infoLog(lastUsedSizeSql);
-
- try (Connection conn = DBUtils.getDBConn(jdbcParam)) {
- Number lastUsedSize = SqlExecutor.query(conn, lastUsedSizeSql, new NumberHandler());
- if (lastUsedSize == null || lastUsedSize.longValue() <= 0 || usedSize - lastUsedSize.longValue() <= 0) {
- return 0L;
- }
-
- return usedSize - lastUsedSize.longValue();
- } catch (Exception e) {
- throw e;
- }
- }
-
- private Long queryClickhouse(String datacenterHost, Map<String, Object> ckParamMap) {
- String result = HttpClientUtils.httpGet(HttpClientUtils.getUrlWithParams(UrlUtil.getUrl(datacenterHost), ckParamMap));
- if ("-1".equals(result)) {
- throw new BusinessException("Get clickhouse http fail -1");
- }
- return Long.valueOf(result.trim());
- }
-
- private Long queryDruid(String datacenterHost, String usedSizeSql) {
- final String druidPath = "/druid/v2/sql";
- String result = HttpClientUtils.httpPost(UrlUtil.getUrl(datacenterHost).concat(druidPath), usedSizeSql);
-
- if ("-1".equals(result)) {
- throw new BusinessException("Get druid http fail -1");
- }
- return Long.valueOf(result.trim());
- }
-
- private HosSpace getHosSpace(String datacenterHost, String datacenterName) {
- final String fileStoragePath = "/admin/diskspace";
- Header[] headers = {new BasicHeader(Constant.TOKEN, storgeConfig.getFilesToken()), new BasicHeader(HttpHeaders.CONTENT_TYPE, Constant.TEXT_XML)};
- String result = HttpClientUtils.httpGet(UrlUtil.getUrl(datacenterHost).concat(fileStoragePath), headers);
-
- if ("-1".equals(result) || "-1".equals(result)) {
- throw new BusinessException("Get hos http request fail -1 , server: " + datacenterName);
- }
- HosSpace hosSpace = XmlUtil.converXmlToBean(HosSpace.class, result);
- return hosSpace;
- }
-
- /**
- * @param node
- * @return the custom reference time last_storage,
- * stored in the ZooKeeper node /storage/worker/ + node
- */
- private Long getLastStorage(String node) {
- try {
- ZookeeperUtils zk = new ZookeeperUtils();
- String nodeData = zk.getNodeData(Constant.ZOOKEEPER_STORAGE_PATH + node, zookeeperServer);
- // If it does not exist, create one
- if (ObjectUtils.isEmpty(nodeData)) {
- Long lastTime = System.currentTimeMillis() / 1000;
- zk.modifyNode(Constant.ZOOKEEPER_STORAGE_PATH + node, String.valueOf(lastTime), zookeeperServer);
- return lastTime;
- }
- Long lastStorage = Long.valueOf(nodeData);
- logger.info("query standard time last_storage success,{}", lastStorage);
-
- return Long.valueOf(nodeData);
- } catch (Exception e) {
- logger.error("query standard time last_storage fail,{}", JobUtil.getErrorMsg(e));
- throw new BusinessException("query standard time last_storage fail ," + JobUtil.getErrorMsg(e));
- }
-
- }
-
-
-}
diff --git a/galaxy-job-executor/src/main/java/com/mesalab/executor/service/StorageQuotaService.java b/galaxy-job-executor/src/main/java/com/mesalab/executor/service/StorageQuotaService.java
index 26b9ab3..d8bbf03 100644
--- a/galaxy-job-executor/src/main/java/com/mesalab/executor/service/StorageQuotaService.java
+++ b/galaxy-job-executor/src/main/java/com/mesalab/executor/service/StorageQuotaService.java
@@ -1,13 +1,22 @@
package com.mesalab.executor.service;
+import cn.hutool.core.date.DateUtil;
+import cn.hutool.core.util.StrUtil;
+import cn.hutool.db.handler.NumberHandler;
+import cn.hutool.db.sql.SqlExecutor;
import cn.hutool.log.Log;
import com.geedgenetworks.utils.DateUtils;
-import com.geedgenetworks.utils.JsonMapper;
+import com.google.common.collect.Lists;
import com.mesalab.executor.core.config.StorgeConfig;
import com.mesalab.executor.core.utils.*;
import com.mesalab.executor.exception.BusinessException;
+import com.mesalab.executor.jobhandler.LogStorageQuotaJob;
import com.mesalab.executor.pojo.HosSpace;
-import com.xxl.job.core.log.XxlJobLogger;
+import com.mesalab.executor.pojo.JDBCParam;
+import com.mesalab.executor.pojo.JobResult;
+import com.mesalab.executor.pojo.SysStorageEvent;
+import com.xxl.job.core.biz.model.ReturnT;
+import org.apache.groovy.util.Maps;
import org.apache.http.Header;
import org.apache.http.message.BasicHeader;
import org.springframework.beans.factory.annotation.Value;
@@ -15,352 +24,482 @@ import org.springframework.http.HttpHeaders;
import org.springframework.stereotype.Service;
import org.springframework.util.ObjectUtils;
-import java.util.ArrayList;
-import java.util.HashMap;
+import javax.annotation.Resource;
+import javax.management.timer.Timer;
+import java.sql.Connection;
import java.util.List;
import java.util.Map;
/**
- * Storage-quota metrics class: combines the raw-log, metrics-log and file-system result objects
- *
- * @author qidaijie
+ * @Description:
+ * @Author: zhq
+ * @CreateDate: 2024/6/13
+ * @Version: 1.0
*/
+
@Service
public class StorageQuotaService {
+ private StorgeConfig storgeConfig = (StorgeConfig) SpringContextUtil.getBean("storgeConfig");
private Log logger = Log.get();
- private static final String ZOOKEEPER_STORAGE_PATH = "/storage/worker/";
- private static final String FILE_STORAGE_PATH = "/admin/diskspace";
- private static final String DRUID_PATH = "/druid/v2/sql";
- private static final int STORAGE_LIMITATION = 60; // maximum allowed interval when querying historical data, 60 MINUTE
+ // Maximum allowed interval when querying historical data, in seconds
+ private static final int MAX_QUERY_INTERVAL = 2 * 60 * 60;
+ final String clickhouseMaxSizeSql = StrUtil.format("SELECT SUM(`total_space`) FROM {};", storgeConfig.getSystemDisks());
+ final String clickhouseUsedSizeSql = StrUtil.format("SELECT SUM(`bytes_on_disk`) FROM {} WHERE database = '{}' ;", storgeConfig.getSystemPartsCluster(), storgeConfig.getTrafficDatasource());
@Value("${zookeeper.server}")
private String zookeeperServer;
- private static StorgeConfig storgeConfig = (StorgeConfig)SpringContextUtil.getBean("storgeConfig");
-
- /**
- * Gets the current ClickHouse storage size; if the fetch fails, the latest value queried from Druid is used instead
- */
- private Long getClickHouseCurr(String point) {
- Map<String, Object> deleteParamMap = storgeConfig.getCkSource();
- String currSql = "SELECT SUM(`bytes_on_disk`) FROM " + storgeConfig.getSystemPartsCluster() + " WHERE database = '" + storgeConfig.getTrafficDatasource() + "';";
- deleteParamMap.put("query", currSql);
- String currResult = HttpClientUtils.httpGet(HttpClientUtils.getUrlWithParams(UrlUtil.getUrl(point), deleteParamMap));
- Long result = Long.valueOf(currResult.trim());
- logger.info("query clickhouse success, used_size={}", result);
- return result;
+ @Resource
+ private LogStorageQuotaJob logStorageQuotaJob;
+
+ public int getAndSaveStorageQuotaInfo(String[] logTypes, JDBCParam jdbcParam, boolean containBytes) {
+
+ int failCount = 0;
+ for (String logType : logTypes) {
+ JobResult<SysStorageEvent> jobResult = new JobResult<>();
+ try {
+ switch (logType) {
+ case Constant.TRAFFIC_LOGS:
+ jobResult = getClickhouseStorageInfo(jdbcParam, containBytes);
+ break;
+ case Constant.METRICS:
+ jobResult = getDruidStorageInfo(jdbcParam, containBytes);
+ break;
+ case Constant.FILES:
+ jobResult = getHosStorageInfo(jdbcParam, containBytes);
+ break;
+ default:
+ break;
+ }
+ failCount += jobResult.getFailCount();
+ if (containBytes) {
+ setStorageAvg(jobResult);
+ }
+ DBUtils.save(jobResult.getData(), jdbcParam);
+ } catch (Exception e) {
+ failCount++;
+ JobUtil.errorLog(e.getMessage());
+ }
+ }
+ return failCount;
}
- /**
- * Gets the maximum ClickHouse storage size; if the fetch fails, the latest value queried from Druid is used instead
- */
- private Long getClickHouseMax(String point) {
- Map<String, Object> deleteParamMap = storgeConfig.getCkSource();
- String maxSql = "SELECT SUM(`total_space`) FROM ".concat(storgeConfig.getSystemDisks()).concat(";");
- deleteParamMap.put("query", maxSql);
- String maxResult = HttpClientUtils
- .httpGet(HttpClientUtils.getUrlWithParams(UrlUtil.getUrl(point), deleteParamMap));
- long result = Long.parseLong(maxResult.trim());
- logger.info("query clickhouse max_size success ,{}", result);
- return result;
- }
+ public int deleteOldLog(String logType, int maxUsage, int minIntervalMinutes) {
- /**
- * Gets the ClickHouse delta; if the fetch fails, 0 is written directly
- */
- private Long getClickHouseDiff(String point) {
- String date = DateUtils.getDateOfYesterday("yyyyMMdd");
- Map<String, Object> deleteParamMap = storgeConfig.getCkSource();
- String diffSql = "SELECT SUM(bytes_on_disk) FROM ".concat(storgeConfig.getSystemPartsCluster()).concat(" WHERE database = '" ).concat(storgeConfig.getTrafficDatasource())
- .concat("' AND partition = '").concat(date).concat("';");
- deleteParamMap.put("query", diffSql);
- String diffResult = HttpClientUtils
- .httpGet(HttpClientUtils.getUrlWithParams(UrlUtil.getUrl(point), deleteParamMap));
- long result = Long.parseLong(diffResult.trim());
- logger.info("query clickhouse success aggregate_size={}", result);
- return result;
+ int failCount = 0;
+ try {
+ switch (logType) {
+ case Constant.TRAFFIC_LOGS:
+ failCount+= deleteClickhouseOldLog(maxUsage, minIntervalMinutes);
+ break;
+ case Constant.METRICS:
+ failCount+= deleteDruidOldLog(maxUsage, minIntervalMinutes);
+ break;
+ case Constant.FILES:
+ failCount+= deleteHosOldLog(maxUsage, minIntervalMinutes);
+ break;
+ default:
+ break;
+ }
+ } catch (Exception e) {
+ failCount++;
+ JobUtil.errorLog(e.getMessage());
+ }
+ return failCount;
}
-//========================Druid==============================
+ public int deleteClickhouseOldLog(int maxUsage, int minIntervalMinutes) {
+ int failCount = 0;
+ for (Map.Entry<String, String> datacenterMap : storgeConfig.getTrafficDataCenter().entrySet()) {
+ try {
+ String dataCenterHost = datacenterMap.getValue();
+ String dataCenterName = datacenterMap.getKey();
- /**
- * Gets the current Druid storage size; if the fetch fails, 0 is used, since when Druid itself is unavailable there is no point querying the previous value
- */
- private Long getDruidUsed(String point) {
- String currSql = "{\"query\":\"SELECT SUM(size) AS used_size FROM sys.segments WHERE datasource NOT LIKE '%hot%' and is_published = 1 and is_overshadowed = 0\",\"context\":{\"skipEmptyBuckets\":\"false\"},\"resultFormat\":\"object\"}";
- String currResult = HttpClientUtils.httpPost(UrlUtil.getUrl(point).concat(DRUID_PATH), currSql);
- List<Map<String, Object>> list = (List) JsonMapper.fromJsonString(currResult, List.class);
- if(list!=null&&list.size()==0){
- return 0L;
- }
- Long currSize = 0L;
- if(list!=null&&list.get(0)!=null){
- Map<String, Object> map = list.get(0);
- if(map!=null&&map.get("used_size")!=null){
- currSize = Long.valueOf(String.valueOf(map.get("used_size")));
+ String zkClearTimePath = StrUtil.join("/", Constant.ZK_TRAFFIC_LOGS, dataCenterName, Constant.ZOOKEEPER_STORAGE_CLEAR_TIME);
+ if (!checkStorageInterval(zkClearTimePath, minIntervalMinutes)) {
+ continue;
+ }
+ double clickhouseUsage = getClickhouseUsage(dataCenterHost, dataCenterName);
+ if (maxUsage <= clickhouseUsage * 100d) {
+ // 1. Get the number of days to delete: per data center, maxDays = now - data_center_last_storage - 1
+ // 2. If empty, use maxDays = now - last_storage - 1
+ // 3. Call the delete handler with maxDays and log the result
+ // 4. Update the data-center timestamp
+ String zkDcLastStoragePath = StrUtil.join("/", Constant.ZK_TRAFFIC_LOGS, dataCenterName);
+ Long lastStorageTime = getLastStorage(zkDcLastStoragePath);
+ if (lastStorageTime == null) {
+ lastStorageTime = getLastStorage(Constant.ZK_TRAFFIC_LOGS);
+ }
+ // Delete one day
+ long storageTime = lastStorageTime - Timer.ONE_DAY;
+ Long maxDays = DateUtil.currentSeconds() - storageTime;
+
+ ReturnT<String> deleteResult = logStorageQuotaJob.deleteTrafficDataByCluster(Maps.of(Constant.MAX_DAYS, maxDays).toString());
+ if (deleteResult.getCode() != 200) {
+ failCount++;
+ JobUtil.errorLog("{} clickhouse log delete fail {}", dataCenterName, deleteResult.getMsg());
+ continue;
+ }
+ setLastStorage(zkDcLastStoragePath, storageTime);
+ JobUtil.infoLog("{} clickhouse log delete success", dataCenterName);
+ }
+ } catch (Exception e) {
+ failCount++;
+ JobUtil.errorLog("clickhouse delete log error {}", e.getMessage());
}
}
- logger.info("query druid used_size success,{}", currSize);
- return currSize;
+
+ return failCount;
}
- /**
- * Gets the maximum Druid storage size; if the fetch fails, 0 is used, since when Druid itself is unavailable there is no point querying the previous value
- */
- private Long getDruidMax(String point) {
- String maxSql = "{\"query\":\"SELECT SUM(max_size) AS max_size FROM sys.servers WHERE server_type = 'historical'\",\"context\":{\"skipEmptyBuckets\":\"false\"},\"resultFormat\":\"object\"}";
- String maxResult = HttpClientUtils.httpPost(UrlUtil.getUrl(point).concat(DRUID_PATH), maxSql);
- List<Map<String, Object>> list = (List) JsonMapper.fromJsonString(maxResult, List.class);
- Long maxSize = 0L;
- if(list!=null&&list.get(0)!=null){
- Map<String, Object> map = list.get(0);
- if(map!=null&&map.get("max_size")!=null){
- maxSize = Long.valueOf(String.valueOf(map.get("max_size")));
+ public int deleteDruidOldLog(int maxUsage, int minIntervalMinutes) {
+ int failCount = 0;
+ for (Map.Entry<String, String> datacenterMap : storgeConfig.getAnalyticDataCenter().entrySet()) {
+ try {
+ String dataCenterHost = datacenterMap.getValue();
+ String dataCenterName = datacenterMap.getKey();
+
+ String zkClearTimePath = StrUtil.join("/", Constant.ZK_REPORT_AND_METRICS, dataCenterName, Constant.ZOOKEEPER_STORAGE_CLEAR_TIME);
+ if (!checkStorageInterval(zkClearTimePath, minIntervalMinutes)) {
+ continue;
+ }
+ double clickhouseUsage = getClickhouseUsage(dataCenterHost, dataCenterName);
+ if (maxUsage <= clickhouseUsage * 100d) {
+ // 1. Get the number of days to delete: per data center, maxDays = now - data_center_last_storage - 1
+ // 2. If empty, use maxDays = now - last_storage - 1
+ // 3. Call the delete handler with maxDays and log the result
+ // 4. Update the data-center timestamp
+ String zkDcLastStoragePath = StrUtil.join("/", Constant.ZK_REPORT_AND_METRICS, dataCenterName);
+ Long lastStorageTime = getLastStorage(zkDcLastStoragePath);
+ if (lastStorageTime == null) {
+ lastStorageTime = getLastStorage(Constant.ZK_REPORT_AND_METRICS);
+ }
+ // Delete one day
+ long storageTime = lastStorageTime - Timer.ONE_DAY;
+ Long maxDays = DateUtil.currentSeconds() - storageTime;
+
+ ReturnT<String> deleteResult = logStorageQuotaJob.deleteReportAndMetricsData(Maps.of(Constant.MAX_DAYS, maxDays).toString());
+ if (deleteResult.getCode() != 200) {
+ failCount++;
+ JobUtil.errorLog("{} druid log delete fail {}", dataCenterName, deleteResult.getMsg());
+ continue;
+ }
+ setLastStorage(zkDcLastStoragePath, storageTime);
+ JobUtil.infoLog("{} druid log delete success", dataCenterName);
+ }
+ } catch (Exception e) {
+ failCount++;
+ JobUtil.errorLog("druid delete log error {}", e.getMessage());
}
}
- logger.info("query druid max_size success,{}", maxSize);
- return maxSize;
+
+ return failCount;
}
- /**
- * Gets the current HOS storage size; if the fetch fails, the latest value queried from Druid is used instead
- */
- private Map getHBaseStorage(String key,String point) {
- Header[] headers = {new BasicHeader(Constant.TOKEN, storgeConfig.getFilesToken()), new BasicHeader(HttpHeaders.CONTENT_TYPE, Constant.TEXT_XML)};
- String result = HttpClientUtils.httpGet(UrlUtil.getUrl(point).concat(FILE_STORAGE_PATH), headers);
- if("-1".equals(result)){
- throw new BusinessException("hos server error");
- }
- HosSpace hosSpace = XmlUtil.converXmlToBean(HosSpace.class, result);
- Long hosCapacity = hosSpace.getHosCapacity();
- Long hosUsed = hosSpace.getHosUsed();
- if (hosCapacity==-1||hosUsed==-1){
- throw new BusinessException("hos server error");
+ public int deleteHosOldLog(int maxUsage, int minIntervalMinutes) {
+ int failCount = 0;
+ for (Map.Entry<String, String> datacenterMap : storgeConfig.getFilesDataCenter().entrySet()) {
+ try {
+ String dataCenterHost = datacenterMap.getValue();
+ String dataCenterName = datacenterMap.getKey();
+
+ String zkClearTimePath = StrUtil.join("/", Constant.FILES, dataCenterName, Constant.ZOOKEEPER_STORAGE_CLEAR_TIME);
+
+ if (!checkStorageInterval(zkClearTimePath, minIntervalMinutes)) {
+ continue;
+ }
+ double clickhouseUsage = getClickhouseUsage(dataCenterHost, dataCenterName);
+ if (maxUsage <= clickhouseUsage * 100d) {
+ // 1. Get the number of days to delete: per data center, maxDays = now - data_center_last_storage - 1
+ // 2. If empty, use maxDays = now - last_storage - 1
+ // 3. Call the delete handler with maxDays and log the result
+ // 4. Update the data-center timestamp
+ String zkDcLastStoragePath = StrUtil.join("/", Constant.FILES, dataCenterName);
+ Long lastStorageTime = getLastStorage(zkDcLastStoragePath);
+ if (lastStorageTime == null) {
+ lastStorageTime = getLastStorage(Constant.FILES);
+ }
+ // Delete one day
+ long storageTime = lastStorageTime - Timer.ONE_DAY;
+ Long maxDays = DateUtil.currentSeconds() - storageTime;
+
+ ReturnT<String> deleteResult = logStorageQuotaJob.deleteFiles(Maps.of(Constant.MAX_DAYS, maxDays).toString());
+ if (deleteResult.getCode() != 200) {
+ failCount++;
+ JobUtil.errorLog("{} hos log delete fail {}", dataCenterName, deleteResult.getMsg());
+ continue;
+ }
+ setLastStorage(zkDcLastStoragePath, storageTime);
+ JobUtil.infoLog("{} hos log delete success", dataCenterName);
+ }
+ } catch (Exception e) {
+ failCount++;
+ JobUtil.errorLog("hos delete log error {}", e.getMessage());
+ }
}
- Map<String, Object> data = new HashMap<>();
- data.put("max_size", hosCapacity);
- data.put("used_size", hosUsed);
- data.put("aggregate_size", getDiffNum(hosUsed, getCurrBefore(Constant.FILES,key)));
+ return failCount;
+ }
- logger.info("query file storage success,{}", data);
- return data;
+ private boolean checkStorageInterval(String zkClearTimePath, int minIntervalMinutes) {
+ long now = DateUtil.currentSeconds();
+ Long lastStorage = getLastStorage(zkClearTimePath);
+ if (lastStorage != null && now - lastStorage <= minIntervalMinutes * 60) {
+ JobUtil.infoLog("The deletion task is being executed");
+ return false;
+ }
+ return true;
+ }
+
+ private double getClickhouseUsage(String dataCenterHost, String dataCenterName) {
+ Map<String, Object> ckParamMap = storgeConfig.getCkSource();
+ // 1. Total
+ ckParamMap.put("query", clickhouseMaxSizeSql);
+ Long totalSize = queryClickhouse(dataCenterHost, ckParamMap);
+ // 2. Used
+ ckParamMap.put("query", clickhouseUsedSizeSql);
+ Long usedSize = queryClickhouse(dataCenterHost, ckParamMap);
+ double usage = usedSize / (double) totalSize;
+ usage = Double.valueOf(String.format("%.4f", usage));
+ JobUtil.infoLog("clickhouse total size {}, used size {}, usage {}",totalSize, usedSize, usage);
+ return usage;
}
-//=======================Utility methods===============================
/**
- * Gets the previous Curr value for the given type via Druid
+ * Split one day's data evenly into 5-minute buckets
*
- * @param logType the statistics type
- * @return the previous value; returns 0 if it is not from the current day and not at 00:00:00
- * // if the history record is less than one hour older than now, write 0; if the latest hour is 0, query today's value
+ * @param jobResult
*/
- private Long getCurrBefore(String logType, String key) {
- String currSql = "{\"query\":\"SELECT used_size FROM sys_storage_log WHERE log_type = '" + logType
- + "' and data_center = '"+ key +"' and __time >= CURRENT_TIMESTAMP - INTERVAL '"+ STORAGE_LIMITATION +"' MINUTE ORDER BY __time DESC LIMIT 1\",\"context\":{\"skipEmptyBuckets\":\"false\"},\"resultFormat\":\"object\"}";
- String currResult = HttpClientUtils.httpPost(UrlUtil.getUrl(storgeConfig.getAnalyticServer()).concat(DRUID_PATH), currSql);
- List<Map> list = (List) JsonMapper.fromJsonString(currResult, List.class);
- if(list!=null&&list.size()==0){
- return 0L;
+ private void setStorageAvg(JobResult<SysStorageEvent> jobResult) {
+
+ long interval = 60;
+ long num = 24 * 60 / interval;
+
+ List<SysStorageEvent> newResultList = Lists.newArrayList();
+ long timestamp = DateUtils.convertStringToTimestamp(DateUtils.getDateOfYesterday("yyyyMMdd"), "yyyyMMdd");
+ for (SysStorageEvent sysStorageEvent : jobResult.getData()) {
+ Long dayBytes = sysStorageEvent.getBytes();
+ long minBytes = dayBytes / num;
+ for (long i = 0; i < num; i++) {
+ SysStorageEvent newStorageEvent = sysStorageEvent;
+ newStorageEvent.setBytes(minBytes);
+
+ timestamp = timestamp + interval * 60;
+ newStorageEvent.setGeneratedTime(timestamp);
+ newResultList.add(newStorageEvent);
+ }
}
- Long historyUsed = 0L;
- if(list!=null&&list.get(0)!=null){
- Map<String, Object> map = list.get(0);
- if(map!=null&&map.get("used_size")!=null){
- historyUsed = Long.valueOf(String.valueOf(map.get("used_size")));
+ jobResult.setData(newResultList);
+ JobUtil.infoLog("set storage data ... ");
+ }
+ private JobResult<SysStorageEvent> getClickhouseStorageInfo(JDBCParam jdbcParam, boolean containIncrease) throws Exception {
+
+ List<SysStorageEvent> sysStorageEvents = Lists.newArrayList();
+ JobResult<SysStorageEvent> jobResult = new JobResult<>();
+ int failCount = 0;
+ long generatedTime = DateUtil.currentSeconds();
+ Long sinceTime = getLastStorage(Constant.ZK_TRAFFIC_LOGS);
+ for (Map.Entry<String, String> datacenterMap : storgeConfig.getTrafficDataCenter().entrySet()) {
+ try {
+ String datacenterHost = datacenterMap.getValue();
+ String datacenterName = datacenterMap.getKey();
+ Map<String, Object> ckParamMap = storgeConfig.getCkSource();
+ // 1. total capacity
+ ckParamMap.put("query", clickhouseMaxSizeSql);
+ Long totalSize = queryClickhouse(datacenterHost, ckParamMap);
+ // 2. used size
+ ckParamMap.put("query", clickhouseUsedSizeSql);
+ Long usedSize = queryClickhouse(datacenterHost, ckParamMap);
+
+ // 3. increment
+ Long bytes = containIncrease ? 0L : getIncreaseSize(jdbcParam, usedSize, datacenterMap.getKey(), Constant.TRAFFIC_LOGS);
+ SysStorageEvent storageEvent = SysStorageEvent.builder()
+ .logType(Constant.TRAFFIC_LOGS)
+ .dataCenter(datacenterName)
+ .generatedTime(generatedTime)
+ .totalAllocatedSize(totalSize)
+ .usedSize(usedSize)
+ .bytes(bytes)
+ .sinceTime(sinceTime)
+ .build();
+
+ sysStorageEvents.add(storageEvent);
+ JobUtil.infoLog("Get clickhouse storage info {}:", storageEvent.toString());
+ } catch (Exception e) {
+ failCount++;
+ JobUtil.errorLog("Get clickhouse storage error : datacenter {}, message {}", datacenterMap.getKey(), e.getMessage());
}
}
- logger.info("query {} history used_size success,{}",logType, historyUsed);
- return historyUsed;
+ jobResult.setFailCount(failCount);
+ jobResult.setData(sysStorageEvents);
+ return jobResult;
}
- /**
- * @param node
- * @return 自定义的标准时间 last_storage,
- * 存在zookeeper 节点/storage/worker/+node
- */
- public Long getLastStorage(String node) {
- try {
- ZookeeperUtils zk = new ZookeeperUtils();
- //query
- String nodeData = zk.getNodeData(ZOOKEEPER_STORAGE_PATH + node, zookeeperServer);
- //不存在创建一个
- if (ObjectUtils.isEmpty(nodeData)) {
- Long lastTime = System.currentTimeMillis() / 1000;
- zk.modifyNode(ZOOKEEPER_STORAGE_PATH + node, String.valueOf(lastTime), zookeeperServer);
- return lastTime;
+ private JobResult<SysStorageEvent> getDruidStorageInfo(JDBCParam jdbcParam, boolean containIncrease) throws Exception {
+
+ final String maxSizeSql = "{\"query\":\"SELECT SUM(max_size) AS max_size FROM sys.servers WHERE server_type = 'historical'\",\"context\":{\"skipEmptyBuckets\":\"false\"},\"resultFormat\":\"csv\"}";
+ final String usedSizeSql = "{\"query\":\"SELECT SUM(size) AS used_size FROM sys.segments WHERE datasource NOT LIKE '%hot%' and is_published = 1 and is_overshadowed = 0\",\"context\":{\"skipEmptyBuckets\":\"false\"},\"resultFormat\":\"csv\"}";
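+ // sys.servers yields the total capacity of historical nodes; sys.segments yields the size of published, non-overshadowed segments of non-hot datasources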
+
+ List<SysStorageEvent> sysStorageEvents = Lists.newArrayList();
+ JobResult<SysStorageEvent> jobResult = new JobResult<>();
+ int failCount = 0;
+ long generatedTime = DateUtil.currentSeconds();
+ Long sinceTime = getLastStorage(Constant.ZK_REPORT_AND_METRICS);
+ for (Map.Entry<String, String> datacenterMap : storgeConfig.getAnalyticDataCenter().entrySet()) {
+ try {
+ String datacenterHost = datacenterMap.getValue();
+ String datacenterName = datacenterMap.getKey();
+ // 1. total capacity
+ Long totalSize = queryDruid(datacenterHost, maxSizeSql);
+ // 2. used size
+ Long usedSize = queryDruid(datacenterHost, usedSizeSql);
+
+ // 3. increment
+ Long bytes = containIncrease ? 0L : getIncreaseSize(jdbcParam, usedSize, datacenterMap.getKey(), Constant.METRICS);
+
+ SysStorageEvent storageEvent = SysStorageEvent.builder()
+ .logType(Constant.METRICS)
+ .dataCenter(datacenterName)
+ .generatedTime(generatedTime)
+ .totalAllocatedSize(totalSize)
+ .usedSize(usedSize)
+ .bytes(bytes)
+ .sinceTime(sinceTime)
+ .build();
+
+ sysStorageEvents.add(storageEvent);
+ JobUtil.infoLog("Get druid storage info {}:", storageEvent.toString());
+ } catch (Exception e) {
+ failCount++;
+ JobUtil.errorLog("Get druid storage error : datacenter {}, message {}", datacenterMap.getKey(), e.getMessage());
}
- Long lastStorage = Long.valueOf(nodeData);
- logger.info("query standard time last_storage success,{}", lastStorage);
-
- return Long.valueOf(nodeData);
- } catch (Exception e) {
- logger.error("query standard time last_storage fail,{}", JobUtil.getErrorMsg(e));
- throw new BusinessException("query standard time last_storage fail ," + JobUtil.getErrorMsg(e));
}
-
+ jobResult.setFailCount(failCount);
+ jobResult.setData(sysStorageEvents);
+ return jobResult;
}
- /**
- * 修改zookeeper节点信息
- * <p>
- * path 节点路径/storage/worker/ +node
- */
- public void modifyLastStorage(String node, String data) {
- ZookeeperUtils zk = new ZookeeperUtils();
- //是否存在 不存在创建
- zk.modifyNode(ZOOKEEPER_STORAGE_PATH + node, data, zookeeperServer);
+ private JobResult<SysStorageEvent> getHosStorageInfo(JDBCParam jdbcParam, boolean containIncrease) throws Exception {
+
+ List<SysStorageEvent> sysStorageEvents = Lists.newArrayList();
+ JobResult<SysStorageEvent> jobResult = new JobResult<>();
+ int failCount = 0;
+ Long generatedTime = DateUtil.currentSeconds();
+ Long sinceTime = getLastStorage(Constant.FILES);
+ for (Map.Entry<String, String> datacenterMap : storgeConfig.getFilesDataCenter().entrySet()) {
+ try {
+ String datacenterHost = datacenterMap.getValue();
+ String datacenterName = datacenterMap.getKey();
+
+ HosSpace hosSpace = getHosSpace(datacenterHost, datacenterName);
+ Long totalSize = hosSpace.getHosCapacity();
+ Long usedSize = hosSpace.getHosUsed();
+ if (totalSize == -1 || usedSize == -1) {
+ throw new BusinessException("hos server error : " + datacenterName);
+ }
+
+ // 3. increment
+ Long bytes = containIncrease ? 0L : getIncreaseSize(jdbcParam, usedSize, datacenterMap.getKey(), Constant.FILES);
+ SysStorageEvent storageEvent = SysStorageEvent.builder()
+ .logType(Constant.FILES)
+ .dataCenter(datacenterName)
+ .generatedTime(generatedTime)
+ .totalAllocatedSize(totalSize)
+ .usedSize(usedSize)
+ .bytes(bytes)
+ .sinceTime(sinceTime)
+ .build();
+
+ sysStorageEvents.add(storageEvent);
+ JobUtil.infoLog("Get hos storage info {}:", storageEvent.toString());
+ } catch (Exception e) {
+ failCount++;
+ JobUtil.errorLog("Get hos storage error : datacenter {}, message {}", datacenterMap.getKey(), e.getMessage());
+ }
+ }
+ jobResult.setFailCount(failCount);
+ jobResult.setData(sysStorageEvents);
+ return jobResult;
}
- /**
- * 用于组合正常维度的JSON串
- *
- * @return 结果json
- */
- public Map<String, Map> getDiskJson(boolean ckDay) {
-
- Map all = new HashMap<>();
- List<Map> data = new ArrayList<>();
- Long now = System.currentTimeMillis() / 1000;
- Map<String, Integer> status = new HashMap<>();
- if(ckDay){
- //当clickhouse任务设置在第二天时,这个时间点为前一天的统计数据
- long timestamp = DateUtils.convertStringToTimestamp(DateUtils.getDateOfYesterday("yyyyMMdd"), "yyyyMMdd");
- status.put("trafficFailCount", getCKStorageJson(data, timestamp,true));
- } else{
- status.put("trafficFailCount", getCKStorageJson(data, now,false));
- status.put("reportFailCount", getDruidStorageJson(data, now));
- status.put("fileFailCount", getFileStorageJson(data, now));
+ private Long getIncreaseSize(JDBCParam jdbcParam, Long usedSize, String datacenter, String logType) throws Exception {
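+ // increment = current usedSize minus the latest used_size recorded within MAX_QUERY_INTERVAL seconds; 0 if there is no record or the delta is non-positive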
+ final String lastUsedSizeSql = "SELECT used_size FROM " + jdbcParam.getTable() + " WHERE log_type = '" + logType + "' and data_center = '" + datacenter
+ + "' and generated_time >= UNIX_TIMESTAMP() - " + MAX_QUERY_INTERVAL + " ORDER BY generated_time DESC LIMIT 1;";
+ JobUtil.infoLog(lastUsedSizeSql);
+
+ try (Connection conn = DBUtils.getDBConn(jdbcParam)) {
+ Number lastUsedSize = SqlExecutor.query(conn, lastUsedSizeSql, new NumberHandler());
+ if (lastUsedSize == null || lastUsedSize.longValue() <= 0 || usedSize - lastUsedSize.longValue() <= 0) {
+ return 0L;
+ }
+
+ return usedSize - lastUsedSize.longValue();
}
- all.put("data", data);
- all.put("status", status);
- return all;
}
- private int getCKStorageJson(List<Map> data, Long time, boolean day) {
- int errorCount = 0;
- try {
- for (Map.Entry<String, String> dc : storgeConfig.getTrafficDataCenter().entrySet()) {
- try {
- Map<String,Object> traffic = new HashMap<>();
- traffic.put("log_type", Constant.TRAFFIC_LOGS);
- traffic.put("last_storage", getLastStorage(Constant.ZK_TRAFFIC_LOGS));
- traffic.put("data_center",dc.getKey());
- traffic.put("time", time);
- traffic.put("used_size",getClickHouseCurr(dc.getValue()));
- traffic.put("max_size",getClickHouseMax(dc.getValue()));
- if(day){
- //昨日平均一天的5分钟粒度平均值
- long interval = 5;
- long num = 24*60/interval;
- Long ckDiffAvg = getClickHouseDiff(dc.getValue())/num;
- for (int i = 0; i < num; i++) {
- Map<String,Object> trafficAvg = new HashMap<>();
- trafficAvg.putAll(traffic);
- trafficAvg.put("time", time);
- trafficAvg.put("aggregate_size",ckDiffAvg);
- data.add(trafficAvg);
- time = time + interval*60;//转换为秒
- }
- return errorCount;
- }
- data.add(traffic);
- } catch (BusinessException e) {
- logger.error("clickhouse storage error {}", JobUtil.getErrorMsg(e));
- XxlJobLogger.log("clickhouse storage error {}", JobUtil.getErrorMsg(e));
- errorCount++;
- }
- }
- } catch (BusinessException e) {
- logger.error("clickhouse storage error {}", JobUtil.getErrorMsg(e));
- XxlJobLogger.log("clickhouse storage error {}", JobUtil.getErrorMsg(e));
- errorCount++;
+
+ public Long queryClickhouse(String datacenterHost, Map<String, Object> ckParamMap) {
+ String result = HttpClientUtils.httpGet(HttpClientUtils.getUrlWithParams(UrlUtil.getUrl(datacenterHost), ckParamMap));
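+ // a literal "-1" response is treated as a failed request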
+ if ("-1".equals(result)) {
+ throw new BusinessException("Get clickhouse http fail -1");
}
- return errorCount;
+ return Long.valueOf(result.trim());
}
- private int getDruidStorageJson(List<Map> data, Long time) {
- int errorCount = 0;
- try {
- for (Map.Entry<String, String> dc : storgeConfig.getAnalyticDataCenter().entrySet()) {
- try {
- Map metrics = new HashMap<>();
- metrics.put("log_type", Constant.REPORT_AND_METRICS);
- metrics.put("time", time);
- metrics.put("last_storage", getLastStorage(Constant.ZK_REPORT_AND_METRICS));
- metrics.put("data_center", dc.getKey());
-
- metrics.put("max_size",getDruidMax(dc.getValue()));
- Long druidUsed = getDruidUsed(dc.getValue());
- metrics.put("used_size",druidUsed);
- metrics.put("aggregate_size", getDiffNum(druidUsed, getCurrBefore(Constant.REPORT_AND_METRICS,dc.getKey())));
- data.add(metrics);
- } catch (BusinessException e) {
- logger.error("druid storage endpoint={}, error {}",dc.getValue(), JobUtil.getErrorMsg(e));
- XxlJobLogger.log("druid storage endpoint={},error {}",dc.getValue(), JobUtil.getErrorMsg(e));
- errorCount++;
- }
- }
- } catch (BusinessException e) {
- logger.error("druid storage error {}", JobUtil.getErrorMsg(e));
- XxlJobLogger.log("druid storage error {}", JobUtil.getErrorMsg(e));
- errorCount++;
+ public Long queryDruid(String datacenterHost, String sql) {
+ final String druidPath = "/druid/v2/sql";
+ String result = HttpClientUtils.httpPost(UrlUtil.getUrl(datacenterHost).concat(druidPath), sql);
+
+ if ("-1".equals(result)) {
+ throw new BusinessException("Get druid http fail -1");
}
+ return Long.valueOf(result.trim());
+ }
+
+ public HosSpace getHosSpace(String datacenterHost, String datacenterName) {
+ final String fileStoragePath = "/admin/diskspace";
+ Header[] headers = {new BasicHeader(Constant.TOKEN, storgeConfig.getFilesToken()),
+ new BasicHeader(HttpHeaders.CONTENT_TYPE, Constant.TEXT_XML)};
+ String result = HttpClientUtils.httpGet(UrlUtil.getUrl(datacenterHost).concat(fileStoragePath), headers);
- return errorCount;
+ if ("-1".equals(result)) {
+ throw new BusinessException("Get hos http request fail -1 , server: " + datacenterName);
+ }
+ HosSpace hosSpace = XmlUtil.converXmlToBean(HosSpace.class, result);
+ return hosSpace;
}
- /*
- * 集群hbase存储配额
- **/
- private int getFileStorageJson(List<Map> data, Long time) {
- int errorCount = 0;
+
+ /**
+ * @param node zookeeper child node name
+ * @return the custom standard time last_storage,
+ * stored in the zookeeper node /storage/worker/+node
+ */
+ private Long getLastStorage(String node) {
try {
- for (Map.Entry<String, String> entry : storgeConfig.getFilesDataCenter().entrySet()) {
- try {
- Map files = new HashMap<>();
- files.put("log_type", Constant.FILES);
- files.put("time", time);
- files.put("last_storage", getLastStorage(Constant.FILES));
- files.put("data_center",entry.getKey());
- files.putAll(getHBaseStorage(entry.getKey(),entry.getValue()));
- data.add(files);
- } catch (BusinessException e) {
- logger.error("file storage endpoint={},error={}", entry.getValue(), JobUtil.getErrorMsg(e));
- XxlJobLogger.log("file storage endpoint={},error={}", entry.getValue(), JobUtil.getErrorMsg(e));
- errorCount++;
- }
+ ZookeeperUtils zk = new ZookeeperUtils();
+ String nodeData = zk.getNodeData(Constant.ZOOKEEPER_STORAGE_PATH + node, zookeeperServer);
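+ // node absent: no last_storage has been recorded yet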
+ if (ObjectUtils.isEmpty(nodeData)) {
+ return null;
}
- } catch (BusinessException e) {
- logger.error("file storage error {}", JobUtil.getErrorMsg(e));
- XxlJobLogger.log("file storage error {}", JobUtil.getErrorMsg(e));
- errorCount++;
+ Long lastStorage = Long.valueOf(nodeData);
+ logger.info("query standard time last_storage success, {}", lastStorage);
+ return lastStorage;
+ } catch (Exception e) {
+ throw new BusinessException("query standard time last_storage fail ," + JobUtil.getErrorMsg(e));
}
- return errorCount;
}
- /**
- * 获取差值计算,若为负数则填写0
- *
- * @param now 这一次的值
- * @param before 上一次的值
- * @return 差值
- */
- private Long getDiffNum(Long now, Long before) {
- if(before<=0){
- return 0L;
- }
- long diff = now - before;
- if (diff >= 0) {
- return diff;
- } else {
- return 0L;
- }
+ private void setLastStorage(String path, Long lastStorage) {
+ ZookeeperUtils zk = new ZookeeperUtils();
+ zk.modifyNode(path, String.valueOf(lastStorage), zookeeperServer);
}
}
diff --git a/galaxy-job-executor/src/test/java/com/mesalab/executor/test/StorageQuotaTest.java b/galaxy-job-executor/src/test/java/com/mesalab/executor/test/StorageQuotaTest.java
index 6f1321b..bcecbbb 100644
--- a/galaxy-job-executor/src/test/java/com/mesalab/executor/test/StorageQuotaTest.java
+++ b/galaxy-job-executor/src/test/java/com/mesalab/executor/test/StorageQuotaTest.java
@@ -10,7 +10,6 @@ import com.mesalab.executor.core.utils.Constant;
import com.mesalab.executor.core.utils.KafkaUtils;
import com.mesalab.executor.jobhandler.LogStorageQuotaJob;
import com.mesalab.executor.pojo.JDBCParam;
-import com.mesalab.executor.service.StorageQuotaInfoService;
import com.mesalab.executor.service.StorageQuotaService;
import lombok.extern.slf4j.Slf4j;
import org.junit.Test;
@@ -18,7 +17,6 @@ import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;
-import org.springframework.util.Assert;
import javax.annotation.Resource;
import java.util.HashMap;
@@ -33,17 +31,6 @@ import java.util.stream.Collectors;
public class StorageQuotaTest {
@Autowired
- StorageQuotaService storageQuotaService;
-
- @Test
- public void storageQuotaServiceTest() {
-
- Map diskJson = storageQuotaService.getDiskJson(false);
- Assert.notEmpty(diskJson, "存储配额获取失败");
-
- }
-
- @Autowired
LogStorageQuotaJob logStorageQuotaJob;
/**
@@ -56,19 +43,6 @@ public class StorageQuotaTest {
// Assert.isTrue(200==(deleteAllFiles.getCode()),"success");
}
- /**
- * 设置文件存储策略
- */
-// @Test
-// public void deleteFilesTest() {
-// ReturnT<String> deleteFiles = logStorageQuotaJob.deleteFiles("{\"maxdays\":365}");
-// Assert.isTrue(200 == (deleteFiles.getCode()), "success");
-// }
- @Test
- public void zookeeperTest() {
- Long lastStorage = storageQuotaService.getLastStorage("Files");
- Assert.notNull(lastStorage, "获取标准时间失败");
- }
@Test
public void testS() {
@@ -102,7 +76,7 @@ public class StorageQuotaTest {
@Resource
- StorageQuotaInfoService storageQuotaInfoService;
+ StorageQuotaService storageQuotaService;
@Test
public void testStorage2() {
@@ -114,9 +88,9 @@ public class StorageQuotaTest {
sourceParams.put("username", "root");
sourceParams.put("pin", "galaxy2019");
JDBCParam jdbcParam = new JDBCParam(sourceParams);
- storageQuotaInfoService.getAndSaveStorageQuotaInfo(new String[]{Constant.TRAFFIC_LOGS}, jdbcParam, true);
- storageQuotaInfoService.getAndSaveStorageQuotaInfo(new String[]{Constant.METRICS}, jdbcParam, true);
- storageQuotaInfoService.getAndSaveStorageQuotaInfo(new String[]{Constant.FILES}, jdbcParam, true);
+ storageQuotaService.getAndSaveStorageQuotaInfo(new String[]{Constant.TRAFFIC_LOGS}, jdbcParam, true);
+ storageQuotaService.getAndSaveStorageQuotaInfo(new String[]{Constant.METRICS}, jdbcParam, true);
+ storageQuotaService.getAndSaveStorageQuotaInfo(new String[]{Constant.FILES}, jdbcParam, true);
}
}