summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorzhanghongqing <[email protected]>2022-06-02 17:04:37 +0800
committerzhanghongqing <[email protected]>2022-06-02 17:04:37 +0800
commitd2389545db0e6d9b1157c2c618318022b8037368 (patch)
treed0d77bc43305dd3ae4642376dd3444f94b9b6e40
parent386fe585fd676ebb6bf701a4c058fc8d6b889dbd (diff)
ck设置日志保留时间改为on cluster
-rw-r--r--galaxy-job-executor/src/main/java/com/mesalab/executor/jobhandler/LogStorageQuotaJob.java83
1 files changed, 68 insertions, 15 deletions
diff --git a/galaxy-job-executor/src/main/java/com/mesalab/executor/jobhandler/LogStorageQuotaJob.java b/galaxy-job-executor/src/main/java/com/mesalab/executor/jobhandler/LogStorageQuotaJob.java
index b867898..1a5aa16 100644
--- a/galaxy-job-executor/src/main/java/com/mesalab/executor/jobhandler/LogStorageQuotaJob.java
+++ b/galaxy-job-executor/src/main/java/com/mesalab/executor/jobhandler/LogStorageQuotaJob.java
@@ -41,6 +41,7 @@ public class LogStorageQuotaJob {
private static final String analyticUrl = "/druid/v2/sql";
private static final String trafficPort = "8123";
+ private static final String ck_cluster = "ck_cluster";
@Autowired
StorageQuotaService storageQuotaService;
@@ -56,7 +57,7 @@ public class LogStorageQuotaJob {
*
* @param params {"maxdays":30}
*/
- @XxlJob("deleteTrafficDataJobHandler")
+ @XxlJob("deleteTrafficDataJobHandler2")
public ReturnT<String> deleteTrafficData(String params) {
try {
Map<String, Object> paramsMap = validParams(params);
@@ -119,6 +120,58 @@ public class LogStorageQuotaJob {
return ReturnT.SUCCESS;
}
+ @XxlJob("deleteTrafficDataJobHandler")
+ public ReturnT<String> deleteTrafficDataByCluster(String params) {
+ try {
+ Map<String, Object> paramsMap = validParams(params);
+ if (ObjectUtils.isEmpty(paramsMap)) {
+ logger.error("params parser error , params is {}", params);
+ return IJobHandler.FAIL;
+ }
+ Map<String, Object> deleteParamMap = getDeleteSource();
+
+ String prefixDeleteSql = Joiner.on("").join("ALTER TABLE ", deletionConfig.getTrafficDatasource(), ".");
+ String suffixDeleteSql = Joiner.on("").join(" ON CLUSTER ", ck_cluster, " DROP PARTITION ");
+ Integer maxdays = Integer.valueOf(String.valueOf(paramsMap.get("maxdays")));
+ String deleteMaxDate = DateUtils.getSomeDate(TimeConstants.YYYYMMDD, -maxdays);
+
+ String ckUrl = UrlBuilder.ofHttp(deletionConfig.getTrafficServer(), StandardCharsets.UTF_8).toString();
+ List<String> tablesForCKs = getTablesForCK(ckUrl, deletionConfig.getNotInSql("name", deletionConfig.getTafficExclusion()));
+ int failCount = 0;
+ // 设置一个获取sql的方法,把表List
+ for (String table : tablesForCKs) {
+
+ try {
+ List<String> partitionList = getSystemDataForCK(ckUrl, Joiner.on("").join("SELECT DISTINCT(`partition`) from " + deletionConfig.getSystemParts() + " WHERE table = '", table, "' AND `partition` < '", deleteMaxDate, "' FORMAT JSON;"));
+
+ partitionList.forEach(partition -> {
+ String deleteSql = Joiner.on("").join(prefixDeleteSql, table, suffixDeleteSql, "'",partition,"'");
+ deleteParamMap.put("query", deleteSql);
+ XxlJobLogger.log("reset storage days clickhouse sql:{}", deleteSql);
+
+ String deleteMessage = HttpClientUtils.httpPost(HttpClientUtils.getUrlWithParams(ckUrl, deleteParamMap), null);
+ XxlJobLogger.log("reset storage days table {} success, message:{}", table, deleteMessage);
+ logger.info("reset storage days table {} success, message:{}", table, deleteMessage);
+ });
+ } catch (BusinessException e) {
+ failCount++;
+ logger.error("clickhouse ttl error table = {}, fail message = {}", table, JobUtil.getErrorMsg(e));
+ XxlJobLogger.log("clickhouse ttl error table = {}, fail message = {}", table, JobUtil.getErrorMsg(e));
+ }
+ }
+ modifyLastStorage(Constant.ZK_TRAFFIC_LOGS, maxdays);
+ if (failCount > 0) {
+ throw new BusinessException("reset storage clickhouse ttl error, failCount " + failCount);
+ }
+ } catch (BusinessException be) {
+ logger.error(be.getMessage());
+ XxlJobLogger.log(be.getMessage());
+ return ReturnT.FAIL;
+ }
+
+ return ReturnT.SUCCESS;
+ }
+
/**
* Report and Metrics 日志存储策略
* druid 数据库
@@ -135,7 +188,7 @@ public class LogStorageQuotaJob {
return IJobHandler.FAIL;
}
- String queryTablesSql = "{\"query\":\"SELECT DISTINCT(datasource) FROM sys.tasks WHERE datasource NOT LIKE '%hot%' "+deletionConfig.getNotInSql("datasource",deletionConfig.getAnalyticExclusion())+" \",\"context\":{\"skipEmptyBuckets\":\"false\"},\"resultFormat\":\"object\"}";
+ String queryTablesSql = "{\"query\":\"SELECT DISTINCT(datasource) FROM sys.tasks WHERE datasource NOT LIKE '%hot%' " + deletionConfig.getNotInSql("datasource", deletionConfig.getAnalyticExclusion()) + " \",\"context\":{\"skipEmptyBuckets\":\"false\"},\"resultFormat\":\"object\"}";
modifyLastStorage(Constant.ZK_REPORT_AND_METRICS, Integer.valueOf(String.valueOf(paramsMap.get("maxdays"))));
deleteDruidTable(queryTablesSql, Integer.valueOf(String.valueOf(paramsMap.get("maxdays"))), false);
@@ -164,7 +217,7 @@ public class LogStorageQuotaJob {
logger.error("params parser error , params is {}", params);
return IJobHandler.FAIL;
}
- String queryTablesSql = "{\"query\":\"SELECT DISTINCT(datasource) FROM sys.tasks WHERE datasource LIKE '%hot%' "+deletionConfig.getNotInSql("datasource",deletionConfig.getAnalyticExclusion())+" \",\"context\":{\"skipEmptyBuckets\":\"false\"},\"resultFormat\":\"object\"}";
+ String queryTablesSql = "{\"query\":\"SELECT DISTINCT(datasource) FROM sys.tasks WHERE datasource LIKE '%hot%' " + deletionConfig.getNotInSql("datasource", deletionConfig.getAnalyticExclusion()) + " \",\"context\":{\"skipEmptyBuckets\":\"false\"},\"resultFormat\":\"object\"}";
deleteDruidTable(queryTablesSql, Integer.valueOf(String.valueOf(paramsMap.get("maxdays"))), true);
} catch (BusinessException be) {
logger.error(be.getMessage());
@@ -216,13 +269,13 @@ public class LogStorageQuotaJob {
XxlJobLogger.log("reset storage druid ttl endpoint = {}, table = {}, fail message = {}", deletionConfig.getAnalyticServer(), table, JobUtil.getErrorMsg(e));
}
}
- if(ObjectUtils.isNotEmpty(deletionConfig.getAnalyticExclusion())){
+ if (ObjectUtils.isNotEmpty(deletionConfig.getAnalyticExclusion())) {
List<String> tableExc = Splitter.on(",").trimResults().splitToList(deletionConfig.getAnalyticExclusion());
for (String table : tableExc) {
try {
- logger.info("load forever druid table {}",table);
+ logger.info("load forever druid table {}", table);
HttpClientUtils.httpPost(changeRulesUrl + table, "[{\"type\":\"loadForever\",\"tieredReplicants\":{\"_default_tier\":1}}]");
- XxlJobLogger.log("load forever druid table {}",table);
+ XxlJobLogger.log("load forever druid table {}", table);
} catch (BusinessException e) {
failCount++;
logger.error("load forever storage druid ttl endpoint = {}, table = {}, fail message = {}", deletionConfig.getAnalyticServer(), table, JobUtil.getErrorMsg(e));
@@ -250,7 +303,7 @@ public class LogStorageQuotaJob {
return IJobHandler.FAIL;
}
List<String> endpointList = Splitter.on(",").trimResults().omitEmptyStrings().splitToList(deletionConfig.getFilesHosServer());
- List<String> whiteList =StringUtil.isEmpty(deletionConfig.getFilesExclusion())?null:Splitter.on(",").trimResults().omitEmptyStrings().splitToList(deletionConfig.getFilesExclusion());
+ List<String> whiteList = StringUtil.isEmpty(deletionConfig.getFilesExclusion()) ? null : Splitter.on(",").trimResults().omitEmptyStrings().splitToList(deletionConfig.getFilesExclusion());
Header header1 = new BasicHeader(Constant.TOKEN, deletionConfig.getFilesToken());
Header header2 = new BasicHeader(HttpHeaders.CONTENT_TYPE, Constant.TEXT_XML);
Header[] headers = {header1, header2};
@@ -259,7 +312,7 @@ public class LogStorageQuotaJob {
try {
String filesServer = UrlUtil.getUrl(endpoint);
String httpGetRes = HttpClientUtils.httpGet(Joiner.on("").join(filesServer, "/hos/"), header1);
- if(httpGetRes!=null&&!"-1".equals(httpGetRes)){
+ if (httpGetRes != null && !"-1".equals(httpGetRes)) {
ListAllMyBucketsResult list = XmlUtil.converXmlToBean(ListAllMyBucketsResult.class, httpGetRes);
logger.info("reset storage files ttl endpoint = {}, bucketList = {} ", endpoint, list);
if (ObjectUtils.allNotNull(list, list.getBuckets(), list.getBuckets().getBucket())) {
@@ -271,7 +324,7 @@ public class LogStorageQuotaJob {
for (ListAllMyBucketsResult.Buckets.Bucket bucket : bucketList) {
try {
- if(ObjectUtils.isNotEmpty(whiteList)&&whiteList.contains(bucket.getName())){
+ if (ObjectUtils.isNotEmpty(whiteList) && whiteList.contains(bucket.getName())) {
LifecycleConfiguration whiteLc = new LifecycleConfiguration();
whiteLc.getRule().setExpiration(new LifecycleConfiguration.Expiration("24855"));//hos保存最大年限
HttpClientUtils.httpPut(Joiner.on("").join(filesServer, "/hos/", bucket.getName(), "?lifecycle"), XmlUtil.converToXml(whiteLc), headers);
@@ -333,7 +386,7 @@ public class LogStorageQuotaJob {
for (String endpoint : addressForCKs) {
try {
String url = UrlUtil.getUrl(endpoint);
- List<String> tablesForCKs = getTablesForCK(url,"");
+ List<String> tablesForCKs = getTablesForCK(url, "");
for (String table : tablesForCKs) {
try {
deleteParamMap.put("query", deleteSql.concat(table));
@@ -470,7 +523,7 @@ public class LogStorageQuotaJob {
String filesServer = UrlUtil.getUrl(endpoint);
String httpGetRes = HttpClientUtils.httpGet(Joiner.on("").join(filesServer, "/hos/"), header);
- if(httpGetRes!=null&&!"-1".equals(httpGetRes)){
+ if (httpGetRes != null && !"-1".equals(httpGetRes)) {
ListAllMyBucketsResult list = XmlUtil.converXmlToBean(ListAllMyBucketsResult.class, httpGetRes);
logger.info("delete storage files ttl endpoint = {}, bucket list = {} ", endpoint, list);
XxlJobLogger.log("endpoint = {}, bucket list = {} ", endpoint, list);
@@ -482,7 +535,7 @@ public class LogStorageQuotaJob {
for (ListAllMyBucketsResult.Buckets.Bucket bucket : bucketList) {
try {
//排除默认桶
- if(!"default".equals(bucket.getName())){
+ if (!"default".equals(bucket.getName())) {
HttpClientUtils.httpDelete(Joiner.on("").join(filesServer, "/hos/", bucket.getName(), "?truncate"), header);
}
} catch (BusinessException e) {
@@ -495,7 +548,7 @@ public class LogStorageQuotaJob {
} else {
failCount++;
logger.error("delete file error failCount = {}, endpoint = {} , fail message = I/O exception", failCount, endpoint);
- XxlJobLogger.log("endpoint = {} , fail message = I/O exception",endpoint);
+ XxlJobLogger.log("endpoint = {} , fail message = I/O exception", endpoint);
}
} catch (BusinessException e) {
failCount++;
@@ -619,8 +672,8 @@ public class LogStorageQuotaJob {
*
* @return List<String>
*/
- private List<String> getTablesForCK(String url,String condition) {
- return getSystemDataForCK(url, Joiner.on("").join("SELECT DISTINCT(name) FROM ", deletionConfig.getSystemTables(), " WHERE database = '", deletionConfig.getTrafficDatasource(), "' AND engine in ('MergeTree','ReplicatedMergeTree')", condition==null?"":condition, " FORMAT JSON;"));
+ private List<String> getTablesForCK(String url, String condition) {
+ return getSystemDataForCK(url, Joiner.on("").join("SELECT DISTINCT(name) FROM ", deletionConfig.getSystemTables(), " WHERE database = '", deletionConfig.getTrafficDatasource(), "' AND engine in ('MergeTree','ReplicatedMergeTree')", condition == null ? "" : condition, " FORMAT JSON;"));
}
//公共查询clickhouse