diff options
| author | zhanghongqing <[email protected]> | 2022-10-13 16:58:46 +0800 |
|---|---|---|
| committer | zhanghongqing <[email protected]> | 2022-10-13 16:58:46 +0800 |
| commit | 0dc921234088e3e8d2476b31d56094366302cf37 (patch) | |
| tree | bd957377310fc814e1b55446bf125f9a95355f28 | |
| parent | a64b75965c8b65386d6eb0f390cd46582cf1df3a (diff) | |
2207版本调度任务修复ck清理失效问题,查询表改为tables_clusterdevelop-fix-2207
| -rw-r--r-- | galaxy-job-executor/pom.xml | 2 | ||||
| -rw-r--r-- | galaxy-job-executor/src/main/java/com/mesalab/executor/jobhandler/LogStorageQuotaJob.java | 91 |
2 files changed, 77 insertions, 16 deletions
diff --git a/galaxy-job-executor/pom.xml b/galaxy-job-executor/pom.xml index 0d2f1f9..1edb55b 100644 --- a/galaxy-job-executor/pom.xml +++ b/galaxy-job-executor/pom.xml @@ -180,7 +180,7 @@ <JAR_FILE>${project.build.finalName}.xjar</JAR_FILE> </buildArgs> <imageTags> - <imageTag>v1.3.220601</imageTag> + <imageTag>v1.3.220623-rc1</imageTag> </imageTags> <resources> <resource> diff --git a/galaxy-job-executor/src/main/java/com/mesalab/executor/jobhandler/LogStorageQuotaJob.java b/galaxy-job-executor/src/main/java/com/mesalab/executor/jobhandler/LogStorageQuotaJob.java index 341e359..50f5690 100644 --- a/galaxy-job-executor/src/main/java/com/mesalab/executor/jobhandler/LogStorageQuotaJob.java +++ b/galaxy-job-executor/src/main/java/com/mesalab/executor/jobhandler/LogStorageQuotaJob.java @@ -66,7 +66,7 @@ public class LogStorageQuotaJob { logger.error("params parser error , params is {}", params); return IJobHandler.FAIL; } - Map<String, Object> deleteParamMap = getDeleteSource(); + Map<String, Object> deleteParamMap = deletionConfig.getDeleteSource(); //根据服务器地址,表,分区去删除数据 String prefixDeleteSql = Joiner.on("").join("ALTER TABLE ", deletionConfig.getTrafficDatasource(), "."); @@ -129,7 +129,7 @@ public class LogStorageQuotaJob { logger.error("params parser error , params is {}", params); return IJobHandler.FAIL; } - Map<String, Object> deleteParamMap = getDeleteSource(); + Map<String, Object> deleteParamMap = deletionConfig.getDeleteSource(); String prefixDeleteSql = Joiner.on("").join("ALTER TABLE ", deletionConfig.getTrafficDatasource(), "."); String suffixDeleteSql = Joiner.on("").join(" ON CLUSTER ", ck_cluster, " DROP PARTITION "); @@ -138,14 +138,18 @@ public class LogStorageQuotaJob { String ckUrl = UrlBuilder.ofHttp(deletionConfig.getTrafficServer(), StandardCharsets.UTF_8).toString(); List<String> tablesForCKs = getTablesForCK(ckUrl, deletionConfig.getNotInSql("name", deletionConfig.getTafficExclusion())); + logger.info("reset storage clickhouse endpoint = {}, tables = {}", ckUrl, tablesForCKs); + XxlJobLogger.log("reset storage clickhouse endpoint = {}, tables = {}", ckUrl, tablesForCKs); + int failCount = 0; // 设置一个获取sql的方法,把表List for (String table : tablesForCKs) { try { - List<String> partitionList = getSystemDataForCK(ckUrl, Joiner.on("").join("SELECT DISTINCT(`partition`) from " + deletionConfig.getSystemParts() + " WHERE table = '", table, "' AND `partition` < '", deleteMaxDate, "' FORMAT JSON;")); + List<String> partitionList = getSystemDataForCK(ckUrl, Joiner.on("").join("SELECT DISTINCT(`partition`) from " + deletionConfig.getSystemPartsCluster() + " WHERE table = '", table, "' AND `partition` < '", deleteMaxDate, "' FORMAT JSON;")); logger.info("reset storage days table {}, drop partition size {} ", table, partitionList.size()); XxlJobLogger.log("reset storage days table {}, drop partition size {} ", table, partitionList.size()); + partitionList.forEach(partition -> { String deleteSql = Joiner.on("").join(prefixDeleteSql, table, suffixDeleteSql, "'",partition,"'"); deleteParamMap.put("query", deleteSql); @@ -165,6 +169,8 @@ public class LogStorageQuotaJob { if (failCount > 0) { throw new BusinessException("reset storage clickhouse ttl error, failCount " + failCount); } + logger.info("reset clickhouse {} days success",maxdays); + XxlJobLogger.log("reset clickhouse {} days success",maxdays); } catch (BusinessException be) { logger.error(be.getMessage()); XxlJobLogger.log(be.getMessage()); @@ -371,8 +377,8 @@ public class LogStorageQuotaJob { * * @param params{"maxdays":365} */ - @XxlJob("deleteAllTrafficDataJobHandler") - public ReturnT<String> deleteAllTrafficData(String params) { + @XxlJob("deleteAllTrafficDataJobHandler2") + public ReturnT<String> deleteAllTrafficData2(String params) { try { Map<String, Object> paramsMap = validParams(params); @@ -381,7 +387,7 @@ public class LogStorageQuotaJob { return IJobHandler.FAIL; } List<String> addressForCKs = getAddressForCK(UrlUtil.getUrl(deletionConfig.getTrafficServer())); - Map<String, Object> deleteParamMap = getDeleteSource(); + Map<String, Object> deleteParamMap = deletionConfig.getDeleteSource(); //清库命令参数 String deleteSql = Joiner.on("").join("TRUNCATE TABLE IF EXISTS ", deletionConfig.getTrafficDatasource(), "."); int failCount = 0; @@ -422,6 +428,69 @@ public class LogStorageQuotaJob { } /** + * 清除所有流量数据,click hosue库 + * + * @param params{"maxdays":365} + */ + @XxlJob("deleteAllTrafficDataJobHandler") + public ReturnT<String> deleteAllTrafficData(String params) { + + try { + Map<String, Object> paramsMap = validParams(params); + if (ObjectUtils.isEmpty(paramsMap)) { + logger.error("params parser error , params is {}", params); + return IJobHandler.FAIL; + } + Map<String, Object> deleteParamMap = deletionConfig.getDeleteSource(); + //清库命令参数 + String deleteSql = Joiner.on("").join("TRUNCATE TABLE IF EXISTS ", deletionConfig.getTrafficDatasource(), "."); + int failCount = 0; + try { + String url = UrlUtil.getUrl(deletionConfig.getTrafficServer()); + List<String> tablesForCKs = getTablesForCK(url, ""); + logger.info("delete storage clickhouse endpoint = {}, tables = {}", url, tablesForCKs); + XxlJobLogger.log("delete storage clickhouse endpoint = {}, tables = {}", url, tablesForCKs); + + for (String table : tablesForCKs) { + try { + String deleteSqlCluster = String.join(" ",deleteSql, table, " ON CLUSTER ", ck_cluster); + deleteParamMap.put("query", deleteSqlCluster); + + logger.info("delete clickhouse table = {}, sql:{}", table, deleteSqlCluster); + HttpClientUtils.httpPost(HttpClientUtils.getUrlWithParams(url, deleteParamMap), ""); + XxlJobLogger.log("delete clickhouse table = {}" ,table); + + } catch (BusinessException e) { + logger.error("delete storage clickhouse error endpoint = {}, table = {}, fail message = {}", url, table, JobUtil.getErrorMsg(e)); + XxlJobLogger.log("delete storage clickhouse error endpoint = {}, table = {}, fail message = {}", url, table, JobUtil.getErrorMsg(e)); + failCount++; + } + } + logger.info("delete storage clickhouse endpoint = {}, fail count = {}, tables = {}", url, failCount, tablesForCKs); + XxlJobLogger.log("delete storage clickhouse endpoint = {}, fail count = {}, tables = {}", url, failCount, tablesForCKs); + + } catch (BusinessException e) { + logger.error("delete storage clickhouse error , fail message = {}", JobUtil.getErrorMsg(e)); + XxlJobLogger.log("delete storage clickhouse error , fail message = {}", JobUtil.getErrorMsg(e)); + failCount++; + } + + modifyLastStorage(Constant.ZK_TRAFFIC_LOGS, 0); + if (failCount > 0) { + throw new BusinessException("delete storage clickhouse error ,failCount " + failCount); + } + logger.info("delete clickhouse success"); + XxlJobLogger.log("delete clickhouse success"); + } catch (BusinessException be) { + logger.error(be.getMessage()); + XxlJobLogger.log(be.getMessage()); + return ReturnT.FAIL; + } + + return ReturnT.SUCCESS; + } + + /** * 清除所有report metric数据 druid库 * * @param params{"maxdays":365} @@ -616,14 +685,6 @@ public class LogStorageQuotaJob { * user:, * } */ - private Map<String, Object> getDeleteSource() { - //删除sql参数拼接 - Map<String, Object> deleteParamMap = Maps.newHashMap(); - deleteParamMap.put("database", deletionConfig.getTrafficDatasource()); - deleteParamMap.put("password", deletionConfig.getTrafficUserKey()); - deleteParamMap.put("user", deletionConfig.getTrafficUsername()); - return deleteParamMap; - } /** * 必填参数进行验证,其它自行配置 @@ -681,7 +742,7 @@ public class LogStorageQuotaJob { //公共查询clickhouse private List<String> getSystemDataForCK(String url, String sql) { List<String> dataList = Lists.newArrayList(); - Map<String, Object> deleteParamMap = getDeleteSource(); + Map<String, Object> deleteParamMap = deletionConfig.getDeleteSource(); deleteParamMap.put("query", sql); String httpGetResult = HttpClientUtils.httpGet(HttpClientUtils.getUrlWithParams(url, deleteParamMap)); |
