diff options
| author | zhanghongqing <[email protected]> | 2024-08-09 18:32:45 +0800 |
|---|---|---|
| committer | zhanghongqing <[email protected]> | 2024-08-09 18:32:45 +0800 |
| commit | 4c69aa1ca21723e140036184da74f03e5cabfe14 (patch) | |
| tree | 96b5746a81618dd25019eb96e02a9f5c8891aaf0 | |
| parent | 9a5188c22960ef25c632dd5900d9f285a11806c4 (diff) | |
[修改][日志删除] 修改删除clickhouse分区方式为挨个请求clickhouse数据节点删除分区
| -rw-r--r-- | galaxy-job-executor/src/main/java/com/mesalab/executor/jobhandler/LogStorageQuotaJob.java | 76 |
1 files changed, 74 insertions, 2 deletions
diff --git a/galaxy-job-executor/src/main/java/com/mesalab/executor/jobhandler/LogStorageQuotaJob.java b/galaxy-job-executor/src/main/java/com/mesalab/executor/jobhandler/LogStorageQuotaJob.java index 7895f59..e93f129 100644 --- a/galaxy-job-executor/src/main/java/com/mesalab/executor/jobhandler/LogStorageQuotaJob.java +++ b/galaxy-job-executor/src/main/java/com/mesalab/executor/jobhandler/LogStorageQuotaJob.java @@ -29,6 +29,7 @@ import org.springframework.http.HttpHeaders; import org.springframework.stereotype.Component; import java.nio.charset.StandardCharsets; +import java.util.ArrayList; import java.util.List; import java.util.Map; import java.util.Objects; @@ -41,6 +42,7 @@ public class LogStorageQuotaJob { private static final String analyticUrl = "/druid/v2/sql"; private static final String ck_cluster = "ck_cluster"; private static final String max_days = "maxDays"; + private static final String trafficPort = "8123"; @Value("${zookeeper.server}") private String zookeeperServer; @@ -52,11 +54,72 @@ public class LogStorageQuotaJob { /** * 设置流量日志存储策略储 + * clickhouse + */ + @XxlJob("deleteTrafficDataJobHandler") + public ReturnT<String> deleteTrafficData(String params) { + try { + Map<String, Object> paramsMap = validParams(params); + if (ObjectUtils.isEmpty(paramsMap)) { + logger.error("params parser error , params is {}", params); + return IJobHandler.FAIL; + } + Map<String, Object> deleteParamMap = deletionConfig.getCkSource(); + + //根据服务器地址,表,分区去删除数据 + String prefixDeleteSql = Joiner.on("").join("ALTER TABLE ", deletionConfig.getTrafficDatasource(), "."); + String deleteMaxDate = DateUtils.getSomeDate(TimeConstants.YYYYMMDD, -Integer.valueOf(String.valueOf(paramsMap.get(max_days)))); + String ckUrl = UrlBuilder.ofHttp(deletionConfig.getTrafficServer(), StandardCharsets.UTF_8).toString(); + List<String> addressForCKs = getAddressForCK(ckUrl); + List<String> tablesForCKs = getTablesForCK(ckUrl,deletionConfig.getNotInSql("name",deletionConfig.getTafficExclusion())); + int failCount = 0; + // 指定服务器 + for (String endpoint : addressForCKs) { + try { + JobUtil.infoLog("-----------------reset storage clickhouse ttl endpoint = {} ------------------", endpoint); + String endpointUrl = UrlUtil.getUrl(endpoint); + for (String table : tablesForCKs) { + try { + //指定分区 + List<String> partitions = getSystemDataForCK(endpointUrl, Joiner.on("").join("SELECT DISTINCT(`partition`) from " + deletionConfig.getSystemParts() + " WHERE table = '", table, "' AND `partition` < '", deleteMaxDate, "' FORMAT JSON;")); + for (String partition : partitions) { + try { + deleteParamMap.put("query", Joiner.on("").join(prefixDeleteSql, table, " DROP PARTITION ", partition)); + HttpClientUtils.httpPost(HttpClientUtils.getUrlWithParams(endpointUrl, deleteParamMap), ""); + } catch (BusinessException e) { + failCount++; + JobUtil.errorLog("clickhouse ttl error endpoint = {}, table = {}, partition = {}, fail message = {}", endpoint, table, partition, JobUtil.getErrorMsg(e)); + } + } + JobUtil.infoLog("reset clickhouse ttl table = {}, partition < {}, size = {}, fail count = {}", table, deleteMaxDate, partitions, failCount); + } catch (BusinessException e) { + failCount++; + JobUtil.errorLog("clickhouse ttl error endpoint = {},table = {}, fail message = {}", endpoint, table, JobUtil.getErrorMsg(e)); + } + } + } catch (BusinessException e) { + failCount++; + JobUtil.errorLog("clickhouse ttl error endpoint = {}, fail message = {}", endpoint, JobUtil.getErrorMsg(e)); + } + } + modifyLastStorage(Constant.ZK_TRAFFIC_LOGS, Integer.valueOf(String.valueOf(paramsMap.get(max_days)))); + if (failCount > 0) { + throw new BusinessException("reset storage clickhouse ttl error ,failCount " + failCount); + } + + } catch (BusinessException be) { + JobUtil.errorLog(be.getMessage()); + return ReturnT.FAIL; + } + return ReturnT.SUCCESS; + } + /** + * 设置流量日志存储策略储 * clickhouse 数据库 * * @param params {"maxDays":30} */ - @XxlJob("deleteTrafficDataJobHandler") + @XxlJob("deleteTrafficDataJobHandler1") public ReturnT<String> deleteTrafficDataByCluster(String params) { try { Map<String, Object> paramsMap = validParams(params); @@ -650,7 +713,16 @@ public class LogStorageQuotaJob { List<Map> startTimeList = (List) JsonMapper.fromJsonString(httpPost, List.class); return ObjectUtils.isEmpty(startTimeList) ? DateUtils.getCurrentDate() : String.valueOf(startTimeList.get(0).get("version")); } - + /*** + * --查所有的IP地址 + * @Date 2020\9\18 0018 + * @return java.util.List<java.lang.String> + **/ + private List<String> getAddressForCK(String url) { + List<String> endpointList = new ArrayList<>(); + endpointList.addAll(getSystemDataForCK(url, Joiner.on("").join("SELECT DISTINCT concat(host_address,':','" + trafficPort + "') as endpoint FROM ", deletionConfig.getSystemClusters(), " FORMAT JSON;"))); + return endpointList; + } /** * @param node * @return 自定义的标准时间 last_storage, |
