summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorzhanghongqing <[email protected]>2024-08-09 18:32:45 +0800
committerzhanghongqing <[email protected]>2024-08-09 18:32:45 +0800
commit4c69aa1ca21723e140036184da74f03e5cabfe14 (patch)
tree96b5746a81618dd25019eb96e02a9f5c8891aaf0
parent9a5188c22960ef25c632dd5900d9f285a11806c4 (diff)
[修改][日志删除] 修改删除clickhouse分区方式为挨个请求clickhouse数据节点删除分区
-rw-r--r--galaxy-job-executor/src/main/java/com/mesalab/executor/jobhandler/LogStorageQuotaJob.java76
1 files changed, 74 insertions, 2 deletions
diff --git a/galaxy-job-executor/src/main/java/com/mesalab/executor/jobhandler/LogStorageQuotaJob.java b/galaxy-job-executor/src/main/java/com/mesalab/executor/jobhandler/LogStorageQuotaJob.java
index 7895f59..e93f129 100644
--- a/galaxy-job-executor/src/main/java/com/mesalab/executor/jobhandler/LogStorageQuotaJob.java
+++ b/galaxy-job-executor/src/main/java/com/mesalab/executor/jobhandler/LogStorageQuotaJob.java
@@ -29,6 +29,7 @@ import org.springframework.http.HttpHeaders;
import org.springframework.stereotype.Component;
import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Objects;
@@ -41,6 +42,7 @@ public class LogStorageQuotaJob {
private static final String analyticUrl = "/druid/v2/sql";
private static final String ck_cluster = "ck_cluster";
private static final String max_days = "maxDays";
+ private static final String trafficPort = "8123";
@Value("${zookeeper.server}")
private String zookeeperServer;
@@ -52,11 +54,72 @@ public class LogStorageQuotaJob {
/**
* 设置流量日志存储策略
+ * clickhouse
+ */
+ @XxlJob("deleteTrafficDataJobHandler")
+ public ReturnT<String> deleteTrafficData(String params) {
+ try {
+ Map<String, Object> paramsMap = validParams(params);
+ if (ObjectUtils.isEmpty(paramsMap)) {
+ logger.error("params parser error , params is {}", params);
+ return IJobHandler.FAIL;
+ }
+ Map<String, Object> deleteParamMap = deletionConfig.getCkSource();
+
+ //根据服务器地址,表,分区去删除数据
+ String prefixDeleteSql = Joiner.on("").join("ALTER TABLE ", deletionConfig.getTrafficDatasource(), ".");
+ String deleteMaxDate = DateUtils.getSomeDate(TimeConstants.YYYYMMDD, -Integer.valueOf(String.valueOf(paramsMap.get(max_days))));
+ String ckUrl = UrlBuilder.ofHttp(deletionConfig.getTrafficServer(), StandardCharsets.UTF_8).toString();
+ List<String> addressForCKs = getAddressForCK(ckUrl);
+ List<String> tablesForCKs = getTablesForCK(ckUrl,deletionConfig.getNotInSql("name",deletionConfig.getTafficExclusion()));
+ int failCount = 0;
+ // 指定服务器
+ for (String endpoint : addressForCKs) {
+ try {
+ JobUtil.infoLog("-----------------reset storage clickhouse ttl endpoint = {} ------------------", endpoint);
+ String endpointUrl = UrlUtil.getUrl(endpoint);
+ for (String table : tablesForCKs) {
+ try {
+ //指定分区
+ List<String> partitions = getSystemDataForCK(endpointUrl, Joiner.on("").join("SELECT DISTINCT(`partition`) from " + deletionConfig.getSystemParts() + " WHERE table = '", table, "' AND `partition` < '", deleteMaxDate, "' FORMAT JSON;"));
+ for (String partition : partitions) {
+ try {
+ deleteParamMap.put("query", Joiner.on("").join(prefixDeleteSql, table, " DROP PARTITION ", partition));
+ HttpClientUtils.httpPost(HttpClientUtils.getUrlWithParams(endpointUrl, deleteParamMap), "");
+ } catch (BusinessException e) {
+ failCount++;
+ JobUtil.errorLog("clickhouse ttl error endpoint = {}, table = {}, partition = {}, fail message = {}", endpoint, table, partition, JobUtil.getErrorMsg(e));
+ }
+ }
+ JobUtil.infoLog("reset clickhouse ttl table = {}, partition < {}, size = {}, fail count = {}", table, deleteMaxDate, partitions, failCount);
+ } catch (BusinessException e) {
+ failCount++;
+ JobUtil.errorLog("clickhouse ttl error endpoint = {},table = {}, fail message = {}", endpoint, table, JobUtil.getErrorMsg(e));
+ }
+ }
+ } catch (BusinessException e) {
+ failCount++;
+ JobUtil.errorLog("clickhouse ttl error endpoint = {}, fail message = {}", endpoint, JobUtil.getErrorMsg(e));
+ }
+ }
+ modifyLastStorage(Constant.ZK_TRAFFIC_LOGS, Integer.valueOf(String.valueOf(paramsMap.get(max_days))));
+ if (failCount > 0) {
+ throw new BusinessException("reset storage clickhouse ttl error ,failCount " + failCount);
+ }
+
+ } catch (BusinessException be) {
+ JobUtil.errorLog(be.getMessage());
+ return ReturnT.FAIL;
+ }
+ return ReturnT.SUCCESS;
+ }
+ /**
+ * 设置流量日志存储策略
* clickhouse 数据库
*
* @param params {"maxDays":30}
*/
- @XxlJob("deleteTrafficDataJobHandler")
+ @XxlJob("deleteTrafficDataJobHandler1")
public ReturnT<String> deleteTrafficDataByCluster(String params) {
try {
Map<String, Object> paramsMap = validParams(params);
@@ -650,7 +713,16 @@ public class LogStorageQuotaJob {
List<Map> startTimeList = (List) JsonMapper.fromJsonString(httpPost, List.class);
return ObjectUtils.isEmpty(startTimeList) ? DateUtils.getCurrentDate() : String.valueOf(startTimeList.get(0).get("version"));
}
-
+ /**
+ * 查询集群所有数据节点的IP地址(host:port)
+ * @Date 2020-09-18
+ * @return java.util.List<java.lang.String>
+ **/
+ private List<String> getAddressForCK(String url) {
+ List<String> endpointList = new ArrayList<>();
+ endpointList.addAll(getSystemDataForCK(url, Joiner.on("").join("SELECT DISTINCT concat(host_address,':','" + trafficPort + "') as endpoint FROM ", deletionConfig.getSystemClusters(), " FORMAT JSON;")));
+ return endpointList;
+ }
/**
* @param node
* @return 自定义的标准时间 last_storage,