| author | zhanghongqing <[email protected]> | 2022-06-02 17:04:37 +0800 |
|---|---|---|
| committer | zhanghongqing <[email protected]> | 2022-06-02 17:04:37 +0800 |
| commit | d2389545db0e6d9b1157c2c618318022b8037368 | |
| tree | d0d77bc43305dd3ae4642376dd3444f94b9b6e40 | |
| parent | 386fe585fd676ebb6bf701a4c058fc8d6b889dbd | |
Change the ClickHouse (ck) log retention time setting to run ON CLUSTER
| -rw-r--r-- | galaxy-job-executor/src/main/java/com/mesalab/executor/jobhandler/LogStorageQuotaJob.java | 83 |
1 file changed, 68 insertions(+), 15 deletions(-)
```diff
diff --git a/galaxy-job-executor/src/main/java/com/mesalab/executor/jobhandler/LogStorageQuotaJob.java b/galaxy-job-executor/src/main/java/com/mesalab/executor/jobhandler/LogStorageQuotaJob.java
index b867898..1a5aa16 100644
--- a/galaxy-job-executor/src/main/java/com/mesalab/executor/jobhandler/LogStorageQuotaJob.java
+++ b/galaxy-job-executor/src/main/java/com/mesalab/executor/jobhandler/LogStorageQuotaJob.java
@@ -41,6 +41,7 @@ public class LogStorageQuotaJob {
 
     private static final String analyticUrl = "/druid/v2/sql";
     private static final String trafficPort = "8123";
+    private static final String ck_cluster = "ck_cluster";
 
     @Autowired
     StorageQuotaService storageQuotaService;
@@ -56,7 +57,7 @@ public class LogStorageQuotaJob {
      *
      * @param params {"maxdays":30}
      */
-    @XxlJob("deleteTrafficDataJobHandler")
+    @XxlJob("deleteTrafficDataJobHandler2")
    public ReturnT<String> deleteTrafficData(String params) {
         try {
             Map<String, Object> paramsMap = validParams(params);
@@ -119,6 +120,58 @@ public class LogStorageQuotaJob {
         return ReturnT.SUCCESS;
     }
 
+    @XxlJob("deleteTrafficDataJobHandler")
+    public ReturnT<String> deleteTrafficDataByCluster(String params) {
+        try {
+            Map<String, Object> paramsMap = validParams(params);
+            if (ObjectUtils.isEmpty(paramsMap)) {
+                logger.error("params parser error , params is {}", params);
+                return IJobHandler.FAIL;
+            }
+            Map<String, Object> deleteParamMap = getDeleteSource();
+
+            String prefixDeleteSql = Joiner.on("").join("ALTER TABLE ", deletionConfig.getTrafficDatasource(), ".");
+            String suffixDeleteSql = Joiner.on("").join(" ON CLUSTER ", ck_cluster, " DROP PARTITION ");
+            Integer maxdays = Integer.valueOf(String.valueOf(paramsMap.get("maxdays")));
+            String deleteMaxDate = DateUtils.getSomeDate(TimeConstants.YYYYMMDD, -maxdays);
+
+            String ckUrl = UrlBuilder.ofHttp(deletionConfig.getTrafficServer(), StandardCharsets.UTF_8).toString();
+            List<String> tablesForCKs = getTablesForCK(ckUrl, deletionConfig.getNotInSql("name", deletionConfig.getTafficExclusion()));
+            int failCount = 0;
+            // build and run the drop-partition sql for each table in the list
+            for (String table : tablesForCKs) {
+
+                try {
+                    List<String> partitionList = getSystemDataForCK(ckUrl, Joiner.on("").join("SELECT DISTINCT(`partition`) from " + deletionConfig.getSystemParts() + " WHERE table = '", table, "' AND `partition` < '", deleteMaxDate, "' FORMAT JSON;"));
+
+                    partitionList.forEach(partition -> {
+                        String deleteSql = Joiner.on("").join(prefixDeleteSql, table, suffixDeleteSql, "'", partition, "'");
+                        deleteParamMap.put("query", deleteSql);
+                        XxlJobLogger.log("reset storage days clickhouse sql:{}", deleteSql);
+
+                        String deleteMessage = HttpClientUtils.httpPost(HttpClientUtils.getUrlWithParams(ckUrl, deleteParamMap), null);
+                        XxlJobLogger.log("reset storage days table {} success, message:{}", table, deleteMessage);
+                        logger.info("reset storage days table {} success, message:{}", table, deleteMessage);
+                    });
+                } catch (BusinessException e) {
+                    failCount++;
+                    logger.error("clickhouse ttl error table = {}, fail message = {}", table, JobUtil.getErrorMsg(e));
+                    XxlJobLogger.log("clickhouse ttl error table = {}, fail message = {}", table, JobUtil.getErrorMsg(e));
+                }
+            }
+            modifyLastStorage(Constant.ZK_TRAFFIC_LOGS, maxdays);
+            if (failCount > 0) {
+                throw new BusinessException("reset storage clickhouse ttl error, failCount " + failCount);
+            }
+        } catch (BusinessException be) {
+            logger.error(be.getMessage());
+            XxlJobLogger.log(be.getMessage());
+            return ReturnT.FAIL;
+        }
+
+        return ReturnT.SUCCESS;
+    }
+
     /**
      * Report and Metrics log storage policy
      * druid database
@@ -135,7 +188,7 @@ public class LogStorageQuotaJob {
                 return IJobHandler.FAIL;
             }
 
-            String queryTablesSql = "{\"query\":\"SELECT DISTINCT(datasource) FROM sys.tasks WHERE datasource NOT LIKE '%hot%' "+deletionConfig.getNotInSql("datasource",deletionConfig.getAnalyticExclusion())+" \",\"context\":{\"skipEmptyBuckets\":\"false\"},\"resultFormat\":\"object\"}";
+            String queryTablesSql = "{\"query\":\"SELECT DISTINCT(datasource) FROM sys.tasks WHERE datasource NOT LIKE '%hot%' " + deletionConfig.getNotInSql("datasource", deletionConfig.getAnalyticExclusion()) + " \",\"context\":{\"skipEmptyBuckets\":\"false\"},\"resultFormat\":\"object\"}";
 
             modifyLastStorage(Constant.ZK_REPORT_AND_METRICS, Integer.valueOf(String.valueOf(paramsMap.get("maxdays"))));
             deleteDruidTable(queryTablesSql, Integer.valueOf(String.valueOf(paramsMap.get("maxdays"))), false);
@@ -164,7 +217,7 @@ public class LogStorageQuotaJob {
                 logger.error("params parser error , params is {}", params);
                 return IJobHandler.FAIL;
             }
-            String queryTablesSql = "{\"query\":\"SELECT DISTINCT(datasource) FROM sys.tasks WHERE datasource LIKE '%hot%' "+deletionConfig.getNotInSql("datasource",deletionConfig.getAnalyticExclusion())+" \",\"context\":{\"skipEmptyBuckets\":\"false\"},\"resultFormat\":\"object\"}";
+            String queryTablesSql = "{\"query\":\"SELECT DISTINCT(datasource) FROM sys.tasks WHERE datasource LIKE '%hot%' " + deletionConfig.getNotInSql("datasource", deletionConfig.getAnalyticExclusion()) + " \",\"context\":{\"skipEmptyBuckets\":\"false\"},\"resultFormat\":\"object\"}";
             deleteDruidTable(queryTablesSql, Integer.valueOf(String.valueOf(paramsMap.get("maxdays"))), true);
         } catch (BusinessException be) {
             logger.error(be.getMessage());
@@ -216,7 +269,7 @@ public class LogStorageQuotaJob {
                 XxlJobLogger.log("reset storage druid ttl endpoint = {}, table = {}, fail message = {}", deletionConfig.getAnalyticServer(), table, JobUtil.getErrorMsg(e));
             }
         }
-        if(ObjectUtils.isNotEmpty(deletionConfig.getAnalyticExclusion())){
+        if (ObjectUtils.isNotEmpty(deletionConfig.getAnalyticExclusion())) {
             List<String> tableExc = Splitter.on(",").trimResults().splitToList(deletionConfig.getAnalyticExclusion());
             for (String table : tableExc) {
                 try {
-                    logger.info("load forever druid table {}",table);
+                    logger.info("load forever druid table {}", table);
                     HttpClientUtils.httpPost(changeRulesUrl + table, "[{\"type\":\"loadForever\",\"tieredReplicants\":{\"_default_tier\":1}}]");
-                    XxlJobLogger.log("load forever druid table {}",table);
+                    XxlJobLogger.log("load forever druid table {}", table);
                 } catch (BusinessException e) {
                     failCount++;
                     logger.error("load forever storage druid ttl endpoint = {}, table = {}, fail message = {}", deletionConfig.getAnalyticServer(), table, JobUtil.getErrorMsg(e));
@@ -250,7 +303,7 @@ public class LogStorageQuotaJob {
                 return IJobHandler.FAIL;
             }
             List<String> endpointList = Splitter.on(",").trimResults().omitEmptyStrings().splitToList(deletionConfig.getFilesHosServer());
-            List<String> whiteList =StringUtil.isEmpty(deletionConfig.getFilesExclusion())?null:Splitter.on(",").trimResults().omitEmptyStrings().splitToList(deletionConfig.getFilesExclusion());
+            List<String> whiteList = StringUtil.isEmpty(deletionConfig.getFilesExclusion()) ? null : Splitter.on(",").trimResults().omitEmptyStrings().splitToList(deletionConfig.getFilesExclusion());
             Header header1 = new BasicHeader(Constant.TOKEN, deletionConfig.getFilesToken());
             Header header2 = new BasicHeader(HttpHeaders.CONTENT_TYPE, Constant.TEXT_XML);
             Header[] headers = {header1, header2};
@@ -259,7 +312,7 @@ public class LogStorageQuotaJob {
             try {
                 String filesServer = UrlUtil.getUrl(endpoint);
                 String httpGetRes = HttpClientUtils.httpGet(Joiner.on("").join(filesServer, "/hos/"), header1);
-                if(httpGetRes!=null&&!"-1".equals(httpGetRes)){
+                if (httpGetRes != null && !"-1".equals(httpGetRes)) {
                     ListAllMyBucketsResult list = XmlUtil.converXmlToBean(ListAllMyBucketsResult.class, httpGetRes);
                     logger.info("reset storage files ttl endpoint = {}, bucketList = {} ", endpoint, list);
                     if (ObjectUtils.allNotNull(list, list.getBuckets(), list.getBuckets().getBucket())) {
@@ -271,7 +324,7 @@ public class LogStorageQuotaJob {
 
                     for (ListAllMyBucketsResult.Buckets.Bucket bucket : bucketList) {
                         try {
-                            if(ObjectUtils.isNotEmpty(whiteList)&&whiteList.contains(bucket.getName())){
+                            if (ObjectUtils.isNotEmpty(whiteList) && whiteList.contains(bucket.getName())) {
                                 LifecycleConfiguration whiteLc = new LifecycleConfiguration();
                                 whiteLc.getRule().setExpiration(new LifecycleConfiguration.Expiration("24855")); // maximum hos retention period
                                 HttpClientUtils.httpPut(Joiner.on("").join(filesServer, "/hos/", bucket.getName(), "?lifecycle"), XmlUtil.converToXml(whiteLc), headers);
@@ -333,7 +386,7 @@ public class LogStorageQuotaJob {
         for (String endpoint : addressForCKs) {
             try {
                 String url = UrlUtil.getUrl(endpoint);
-                List<String> tablesForCKs = getTablesForCK(url,"");
+                List<String> tablesForCKs = getTablesForCK(url, "");
                 for (String table : tablesForCKs) {
                     try {
                         deleteParamMap.put("query", deleteSql.concat(table));
@@ -470,7 +523,7 @@ public class LogStorageQuotaJob {
                 String filesServer = UrlUtil.getUrl(endpoint);
                 String httpGetRes = HttpClientUtils.httpGet(Joiner.on("").join(filesServer, "/hos/"), header);
 
-                if(httpGetRes!=null&&!"-1".equals(httpGetRes)){
+                if (httpGetRes != null && !"-1".equals(httpGetRes)) {
                     ListAllMyBucketsResult list = XmlUtil.converXmlToBean(ListAllMyBucketsResult.class, httpGetRes);
                     logger.info("delete storage files ttl endpoint = {}, bucket list = {} ", endpoint, list);
                     XxlJobLogger.log("endpoint = {}, bucket list = {} ", endpoint, list);
@@ -482,7 +535,7 @@ public class LogStorageQuotaJob {
                     for (ListAllMyBucketsResult.Buckets.Bucket bucket : bucketList) {
                         try {
                             // skip the default bucket
-                            if(!"default".equals(bucket.getName())){
+                            if (!"default".equals(bucket.getName())) {
                                 HttpClientUtils.httpDelete(Joiner.on("").join(filesServer, "/hos/", bucket.getName(), "?truncate"), header);
                             }
                         } catch (BusinessException e) {
@@ -495,7 +548,7 @@ public class LogStorageQuotaJob {
                 } else {
                     failCount++;
                     logger.error("delete file error failCount = {}, endpoint = {} , fail message = I/O exception", failCount, endpoint);
-                    XxlJobLogger.log("endpoint = {} , fail message = I/O exception",endpoint);
+                    XxlJobLogger.log("endpoint = {} , fail message = I/O exception", endpoint);
                 }
             } catch (BusinessException e) {
                 failCount++;
@@ -619,8 +672,8 @@ public class LogStorageQuotaJob {
      *
      * @return List<String>
      */
-    private List<String> getTablesForCK(String url,String condition) {
-        return getSystemDataForCK(url, Joiner.on("").join("SELECT DISTINCT(name) FROM ", deletionConfig.getSystemTables(), " WHERE database = '", deletionConfig.getTrafficDatasource(), "' AND engine in ('MergeTree','ReplicatedMergeTree')", condition==null?"":condition, " FORMAT JSON;"));
+    private List<String> getTablesForCK(String url, String condition) {
+        return getSystemDataForCK(url, Joiner.on("").join("SELECT DISTINCT(name) FROM ", deletionConfig.getSystemTables(), " WHERE database = '", deletionConfig.getTrafficDatasource(), "' AND engine in ('MergeTree','ReplicatedMergeTree')", condition == null ? "" : condition, " FORMAT JSON;"));
     }
 
     // shared clickhouse query helper
```
