summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--galaxy-job-executor/pom.xml2
-rw-r--r--galaxy-job-executor/src/main/java/com/mesalab/executor/jobhandler/LogStorageQuotaJob.java91
2 files changed, 77 insertions, 16 deletions
diff --git a/galaxy-job-executor/pom.xml b/galaxy-job-executor/pom.xml
index 0d2f1f9..1edb55b 100644
--- a/galaxy-job-executor/pom.xml
+++ b/galaxy-job-executor/pom.xml
@@ -180,7 +180,7 @@
<JAR_FILE>${project.build.finalName}.xjar</JAR_FILE>
</buildArgs>
<imageTags>
- <imageTag>v1.3.220601</imageTag>
+ <imageTag>v1.3.220623-rc1</imageTag>
</imageTags>
<resources>
<resource>
diff --git a/galaxy-job-executor/src/main/java/com/mesalab/executor/jobhandler/LogStorageQuotaJob.java b/galaxy-job-executor/src/main/java/com/mesalab/executor/jobhandler/LogStorageQuotaJob.java
index 341e359..50f5690 100644
--- a/galaxy-job-executor/src/main/java/com/mesalab/executor/jobhandler/LogStorageQuotaJob.java
+++ b/galaxy-job-executor/src/main/java/com/mesalab/executor/jobhandler/LogStorageQuotaJob.java
@@ -66,7 +66,7 @@ public class LogStorageQuotaJob {
logger.error("params parser error , params is {}", params);
return IJobHandler.FAIL;
}
- Map<String, Object> deleteParamMap = getDeleteSource();
+ Map<String, Object> deleteParamMap = deletionConfig.getDeleteSource();
//根据服务器地址,表,分区去删除数据
String prefixDeleteSql = Joiner.on("").join("ALTER TABLE ", deletionConfig.getTrafficDatasource(), ".");
@@ -129,7 +129,7 @@ public class LogStorageQuotaJob {
logger.error("params parser error , params is {}", params);
return IJobHandler.FAIL;
}
- Map<String, Object> deleteParamMap = getDeleteSource();
+ Map<String, Object> deleteParamMap = deletionConfig.getDeleteSource();
String prefixDeleteSql = Joiner.on("").join("ALTER TABLE ", deletionConfig.getTrafficDatasource(), ".");
String suffixDeleteSql = Joiner.on("").join(" ON CLUSTER ", ck_cluster, " DROP PARTITION ");
@@ -138,14 +138,18 @@ public class LogStorageQuotaJob {
String ckUrl = UrlBuilder.ofHttp(deletionConfig.getTrafficServer(), StandardCharsets.UTF_8).toString();
List<String> tablesForCKs = getTablesForCK(ckUrl, deletionConfig.getNotInSql("name", deletionConfig.getTafficExclusion()));
+ logger.info("reset storage clickhouse endpoint = {}, tables = {}", ckUrl, tablesForCKs);
+ XxlJobLogger.log("reset storage clickhouse endpoint = {}, tables = {}", ckUrl, tablesForCKs);
+
int failCount = 0;
// 设置一个获取sql的方法,把表List
for (String table : tablesForCKs) {
try {
- List<String> partitionList = getSystemDataForCK(ckUrl, Joiner.on("").join("SELECT DISTINCT(`partition`) from " + deletionConfig.getSystemParts() + " WHERE table = '", table, "' AND `partition` < '", deleteMaxDate, "' FORMAT JSON;"));
+ List<String> partitionList = getSystemDataForCK(ckUrl, Joiner.on("").join("SELECT DISTINCT(`partition`) from " + deletionConfig.getSystemPartsCluster() + " WHERE table = '", table, "' AND `partition` < '", deleteMaxDate, "' FORMAT JSON;"));
logger.info("reset storage days table {}, drop partition size {} ", table, partitionList.size());
XxlJobLogger.log("reset storage days table {}, drop partition size {} ", table, partitionList.size());
+
partitionList.forEach(partition -> {
String deleteSql = Joiner.on("").join(prefixDeleteSql, table, suffixDeleteSql, "'",partition,"'");
deleteParamMap.put("query", deleteSql);
@@ -165,6 +169,8 @@ public class LogStorageQuotaJob {
if (failCount > 0) {
throw new BusinessException("reset storage clickhouse ttl error, failCount " + failCount);
}
+ logger.info("reset clickhouse {} days success",maxdays);
+ XxlJobLogger.log("reset clickhouse {} days success",maxdays);
} catch (BusinessException be) {
logger.error(be.getMessage());
XxlJobLogger.log(be.getMessage());
@@ -371,8 +377,8 @@ public class LogStorageQuotaJob {
*
* @param params{"maxdays":365}
*/
- @XxlJob("deleteAllTrafficDataJobHandler")
- public ReturnT<String> deleteAllTrafficData(String params) {
+ @XxlJob("deleteAllTrafficDataJobHandler2")
+ public ReturnT<String> deleteAllTrafficData2(String params) {
try {
Map<String, Object> paramsMap = validParams(params);
@@ -381,7 +387,7 @@ public class LogStorageQuotaJob {
return IJobHandler.FAIL;
}
List<String> addressForCKs = getAddressForCK(UrlUtil.getUrl(deletionConfig.getTrafficServer()));
- Map<String, Object> deleteParamMap = getDeleteSource();
+ Map<String, Object> deleteParamMap = deletionConfig.getDeleteSource();
//清库命令参数
String deleteSql = Joiner.on("").join("TRUNCATE TABLE IF EXISTS ", deletionConfig.getTrafficDatasource(), ".");
int failCount = 0;
@@ -422,6 +428,69 @@ public class LogStorageQuotaJob {
}
/**
+ * 清除所有流量数据,click house库
+ *
+ * @param params{"maxdays":365}
+ */
+ @XxlJob("deleteAllTrafficDataJobHandler")
+ public ReturnT<String> deleteAllTrafficData(String params) {
+
+ try {
+ Map<String, Object> paramsMap = validParams(params);
+ if (ObjectUtils.isEmpty(paramsMap)) {
+ logger.error("params parser error , params is {}", params);
+ return IJobHandler.FAIL;
+ }
+ Map<String, Object> deleteParamMap = deletionConfig.getDeleteSource();
+ //清库命令参数
+ String deleteSql = Joiner.on("").join("TRUNCATE TABLE IF EXISTS ", deletionConfig.getTrafficDatasource(), ".");
+ int failCount = 0;
+ try {
+ String url = UrlUtil.getUrl(deletionConfig.getTrafficServer());
+ List<String> tablesForCKs = getTablesForCK(url, "");
+ logger.info("delete storage clickhouse endpoint = {}, tables = {}", url, tablesForCKs);
+ XxlJobLogger.log("delete storage clickhouse endpoint = {}, tables = {}", url, tablesForCKs);
+
+ for (String table : tablesForCKs) {
+ try {
+ String deleteSqlCluster = String.join(" ",deleteSql, table, " ON CLUSTER ", ck_cluster);
+ deleteParamMap.put("query", deleteSqlCluster);
+
+ logger.info("delete clickhouse table = {}, sql:{}", table, deleteSqlCluster);
+ HttpClientUtils.httpPost(HttpClientUtils.getUrlWithParams(url, deleteParamMap), "");
+ XxlJobLogger.log("delete clickhouse table = {}" ,table);
+
+ } catch (BusinessException e) {
+ logger.error("delete storage clickhouse error endpoint = {}, table = {}, fail message = {}", url, table, JobUtil.getErrorMsg(e));
+ XxlJobLogger.log("delete storage clickhouse error endpoint = {}, table = {}, fail message = {}", url, table, JobUtil.getErrorMsg(e));
+ failCount++;
+ }
+ }
+ logger.info("delete storage clickhouse endpoint = {}, fail count = {}, tables = {}", url, failCount, tablesForCKs);
+ XxlJobLogger.log("delete storage clickhouse endpoint = {}, fail count = {}, tables = {}", url, failCount, tablesForCKs);
+
+ } catch (BusinessException e) {
+ logger.error("delete storage clickhouse error , fail message = {}", JobUtil.getErrorMsg(e));
+ XxlJobLogger.log("delete storage clickhouse error , fail message = {}", JobUtil.getErrorMsg(e));
+ failCount++;
+ }
+
+ modifyLastStorage(Constant.ZK_TRAFFIC_LOGS, 0);
+ if (failCount > 0) {
+ throw new BusinessException("delete storage clickhouse error ,failCount " + failCount);
+ }
+ logger.info("delete clickhouse success");
+ XxlJobLogger.log("delete clickhouse success");
+ } catch (BusinessException be) {
+ logger.error(be.getMessage());
+ XxlJobLogger.log(be.getMessage());
+ return ReturnT.FAIL;
+ }
+
+ return ReturnT.SUCCESS;
+ }
+
+ /**
* 清除所有report metric数据 druid库
*
* @param params{"maxdays":365}
@@ -616,14 +685,6 @@ public class LogStorageQuotaJob {
* user:,
* }
*/
- private Map<String, Object> getDeleteSource() {
- //删除sql参数拼接
- Map<String, Object> deleteParamMap = Maps.newHashMap();
- deleteParamMap.put("database", deletionConfig.getTrafficDatasource());
- deleteParamMap.put("password", deletionConfig.getTrafficUserKey());
- deleteParamMap.put("user", deletionConfig.getTrafficUsername());
- return deleteParamMap;
- }
/**
* 必填参数进行验证,其它自行配置
@@ -681,7 +742,7 @@ public class LogStorageQuotaJob {
//公共查询clickhouse
private List<String> getSystemDataForCK(String url, String sql) {
List<String> dataList = Lists.newArrayList();
- Map<String, Object> deleteParamMap = getDeleteSource();
+ Map<String, Object> deleteParamMap = deletionConfig.getDeleteSource();
deleteParamMap.put("query", sql);
String httpGetResult = HttpClientUtils.httpGet(HttpClientUtils.getUrlWithParams(url, deleteParamMap));