diff options
| author | LAPTOP-CUUVN8AS\wk <[email protected]> | 2022-05-26 16:20:56 +0800 |
|---|---|---|
| committer | LAPTOP-CUUVN8AS\wk <[email protected]> | 2022-05-26 16:20:56 +0800 |
| commit | 36748438ad5c637c8351b95239752a0f28843bae (patch) | |
| tree | 1f1bc680000eecdf2ebbba87f195b5568645099d | |
| parent | eacb33ffb79a753837e6131c7f4faccd3b26b523 (diff) | |
增加日志输出
| -rw-r--r-- | galaxy-job-executor/src/main/java/com/mesalab/executor/jobhandler/LogStorageTtlJob.java | 346 |
1 files changed, 190 insertions, 156 deletions
diff --git a/galaxy-job-executor/src/main/java/com/mesalab/executor/jobhandler/LogStorageTtlJob.java b/galaxy-job-executor/src/main/java/com/mesalab/executor/jobhandler/LogStorageTtlJob.java index 9c7a71c..47669dd 100644 --- a/galaxy-job-executor/src/main/java/com/mesalab/executor/jobhandler/LogStorageTtlJob.java +++ b/galaxy-job-executor/src/main/java/com/mesalab/executor/jobhandler/LogStorageTtlJob.java @@ -21,7 +21,6 @@ import com.xxl.job.core.biz.model.ReturnT; import com.xxl.job.core.handler.annotation.XxlJob; import com.xxl.job.core.log.XxlJobLogger; import com.zdjizhi.utils.JsonMapper; -import com.zdjizhi.utils.StringUtil; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component; @@ -34,7 +33,7 @@ public class LogStorageTtlJob { private Log logger = Log.get(); - private static final String trafficPort = "8124"; + private static final String trafficPort = "8123"; @Autowired StorageQuotaService storageQuotaService; @@ -55,11 +54,11 @@ public class LogStorageTtlJob { @XxlJob("changeCkTtlJobHandler") public ReturnT<String> changeCkTtl(String params) { try { - - UrlBuilder ckurlBuilder = UrlBuilder.ofHttp(deletionConfig.getTrafficServer(), StandardCharsets.UTF_8); + // deletionConfig.setTrafficServer("192.168.40.203:8124"); + UrlBuilder ckurlBuilder = UrlBuilder.ofHttp(deletionConfig.getTrafficServer(), StandardCharsets.UTF_8); if (ifDdlQueueEmpty(ckurlBuilder.toString())) { - + // if (true) { List<String> addressForCKs = getAddressForCK(ckurlBuilder.toString()); List<String> tablesForCKs = getTablesForCK(ckurlBuilder.toString(), deletionConfig.getNotInSql("name", null)); Map<String, Map<String, String>> getChangeTtlTables = getChangeTtlTable(tablesForCKs, addressForCKs); @@ -69,7 +68,8 @@ public class LogStorageTtlJob { Map<String, Map<String, ColumnParam>> getChangeTtlColumns = getChangeTtlColumns(table, addressForCKs); if (getChangeTtlColumns == null) { - logger.info("ChangeTtlColumns is empty"); + logger.info("ChangeTtlColumns "+table+" is error"); + XxlJobLogger.log("ChangeTtlColumns "+table+" is error"); continue; } modifyColumnTtl(ckurlBuilder, getChangeTtlColumns, table); @@ -92,89 +92,96 @@ public class LogStorageTtlJob { private Boolean modifyTableTtl(UrlBuilder ckurlBuilder, Map<String, Map<String, String>> changeTtlTables) { try { - Map<String, String> modifyTtlTables = changeTtlTables.get("modifyTtlTables"); - Map<String, String> dropTtlTables = changeTtlTables.get("dropTtlTables"); + Map<String, String> modifyTtlTables = changeTtlTables.get("modifyTtlTables"); + Map<String, String> dropTtlTables = changeTtlTables.get("dropTtlTables"); + + if (modifyTtlTables != null && modifyTtlTables.size() > 0) { - if (modifyTtlTables!=null && modifyTtlTables.size() > 0 ) { + for (Map.Entry<String, String> entry : modifyTtlTables.entrySet()) { - for (Map.Entry<String, String> entry : modifyTtlTables.entrySet()) { + try { + StringBuilder sqery = new StringBuilder("alter table " + deletionConfig.getTrafficDatasource() + "." + entry.getKey() + " on cluster ck_cluster MODIFY TTL " + entry.getValue()); + alterTablesForCK(ckurlBuilder.toString(), sqery.toString()); + XxlJobLogger.log("table ddl sql : " + sqery.toString()); + } catch (RuntimeException be) { - System.out.println("Key = " + entry.getKey() + ", Value = " + entry.getValue()); - StringBuilder sqery = new StringBuilder("alter table " + deletionConfig.getTrafficDatasource() + "." + entry.getKey() + " on cluster ck_cluster MODIFY TTL " + entry.getValue()); - alterTablesForCK(ckurlBuilder.toString(), sqery.toString()); - XxlJobLogger.log("table ddl sql : " + sqery.toString()); + XxlJobLogger.log("modify table " + entry.getKey() + "ddl sql error: " + be.toString()); + } + } } - } - if (dropTtlTables!=null && dropTtlTables.size() > 0) { + if (dropTtlTables != null && dropTtlTables.size() > 0) { + for (Map.Entry<String, String> entry : dropTtlTables.entrySet()) { - System.out.println("Key = " + entry.getKey() + ", Value = " + entry.getValue()); - StringBuilder sqery = new StringBuilder("alter table " + deletionConfig.getTrafficDatasource() + "." + entry.getKey() + " on cluster ck_cluster REMOVE TTL"); - alterTablesForCK(ckurlBuilder.toString(), sqery.toString()); - XxlJobLogger.log("table ddl sql : " + sqery.toString()); + try { + StringBuilder sqery = new StringBuilder("alter table " + deletionConfig.getTrafficDatasource() + "." + entry.getKey() + " on cluster ck_cluster REMOVE TTL"); + alterTablesForCK(ckurlBuilder.toString(), sqery.toString()); + XxlJobLogger.log("table ddl sql : " + sqery.toString()); + } catch (RuntimeException be) { + + XxlJobLogger.log("drop table " + entry.getKey() + "ddl sql error: " + be.toString()); + } } } - } - catch (RuntimeException be) { + } catch (RuntimeException be) { - logger.error("modifyTableTtl function error " + be.getMessage()); - XxlJobLogger.log(be.getMessage()); - } - return true; + logger.error("modifyTableTtl function error " + be.getMessage()); + XxlJobLogger.log(be.getMessage()); } + return true; + } private Boolean modifyColumnTtl(UrlBuilder ckurlBuilder, Map<String, Map<String, ColumnParam>> getChangeTtlColumns, String table) { try { - Map<String, ColumnParam> modifyTtlColumns = getChangeTtlColumns.get("modifyTtlColumns"); - Map<String, ColumnParam> dropTtlColumns = getChangeTtlColumns.get("dropTtlColumns"); + Map<String, ColumnParam> modifyTtlColumns = getChangeTtlColumns.get("modifyTtlColumns"); + Map<String, ColumnParam> dropTtlColumns = getChangeTtlColumns.get("dropTtlColumns"); - if (modifyTtlColumns.size() > 0 || dropTtlColumns.size() > 0) { - StringBuilder sqery = new StringBuilder("alter table " + deletionConfig.getTrafficDatasource() + "." + table + " on cluster ck_cluster "); + if (modifyTtlColumns.size() > 0 || dropTtlColumns.size() > 0) { + StringBuilder sqery = new StringBuilder("alter table " + deletionConfig.getTrafficDatasource() + "." + table + " on cluster ck_cluster "); - for (Map.Entry<String, ColumnParam> entry : modifyTtlColumns.entrySet()) { - sqery.append("MODIFY COLUMN if EXISTS "); - sqery.append(entry.getValue().getName()); - sqery.append(" "); - sqery.append(entry.getValue().getType()); - sqery.append(" TTL "); - sqery.append(entry.getValue().getTtl()); - sqery.append(","); - } + for (Map.Entry<String, ColumnParam> entry : modifyTtlColumns.entrySet()) { + sqery.append("MODIFY COLUMN if EXISTS "); + sqery.append(entry.getValue().getName()); + sqery.append(" "); + sqery.append(entry.getValue().getType()); + sqery.append(" TTL "); + sqery.append(entry.getValue().getTtl()); + sqery.append(","); + } - for (Map.Entry<String, ColumnParam> entry : dropTtlColumns.entrySet()) { - sqery.append("MODIFY COLUMN if EXISTS "); - sqery.append(entry.getValue().getName()); - sqery.append(" "); - sqery.append("REMOVE TTL"); - sqery.append(","); + for (Map.Entry<String, ColumnParam> entry : dropTtlColumns.entrySet()) { + sqery.append("MODIFY COLUMN if EXISTS "); + sqery.append(entry.getValue().getName()); + sqery.append(" "); + sqery.append("REMOVE TTL"); + sqery.append(","); - } - sqery.deleteCharAt(sqery.length() - 1); - String tquery = sqery.toString(); - logger.info("ddl sql : " + tquery); - XxlJobLogger.log("ddl sql : " + tquery); - alterTablesForCK(ckurlBuilder.toString(), tquery); + } + sqery.deleteCharAt(sqery.length() - 1); + String tquery = sqery.toString(); + logger.info("ddl sql : " + tquery); + XxlJobLogger.log("ddl sql : " + tquery); + alterTablesForCK(ckurlBuilder.toString(), tquery); - } else { - logger.info(table + "column ttl no change"); - XxlJobLogger.log(table + "column ttl no change"); - } - } - catch (RuntimeException be) { + } else { + logger.info(table + "column ttl no change"); + XxlJobLogger.log(table + " column ttl no change"); + } + } catch (RuntimeException be) { logger.error("modifyColumnTtl function error " + be.getMessage()); XxlJobLogger.log(be.getMessage()); } - return true; + return true; } private Map<String, Object> getDeleteSource() { @@ -183,99 +190,120 @@ public class LogStorageTtlJob { deleteParamMap.put("database", deletionConfig.getTrafficDatasource()); deleteParamMap.put("password", deletionConfig.getTrafficUserKey()); deleteParamMap.put("user", deletionConfig.getTrafficUsername()); - // deleteParamMap.put("password", "galaxy2019"); + // deleteParamMap.put("password", "galaxy2019"); return deleteParamMap; } private Map<String, Map<String, String>> getChangeTtlTable(List<String> tablesForCKs, List<String> addressForCKs) { + Map<String, Map<String, String>> ChangeTtltables = new HashMap<>(); try { - Map<String, Map<String, String>> ChangeTtlColumns = new HashMap<>(); - Map<String, String> modifyTtlColumns = new HashMap<>(); - Map<String, String> dropTtlColumns = new HashMap<>(); - Map<String, Object> deleteParamMap = getDeleteSource(); + Map<String, String> modifyTtltables = new HashMap<>(); + Map<String, String> dropTtltables = new HashMap<>(); Configuration conf = Configuration.defaultConfiguration(); Configuration conf2 = conf.addOptions(Option.DEFAULT_PATH_LEAF_TO_NULL); + // Map<String, Integer> tableobj = new HashMap<>(); for (String tablename : tablesForCKs) { + try { + String viewName = tablename.replace("_local", ""); + String content = configService.getConfig(viewName + ".json", "Galaxy", 3000); - String viewName = tablename.replace("_local", ""); - String content = configService.getConfig(viewName + ".json", "Galaxy", 3000); + if (content != null && !"".equals(content)) { - if (content != null && !"".equals(content)) { + String getTableDdlsql = "SELECT name ,extract(engine_full, '.*toIntervalSecond\\((\\d+)\\)') as tablettl from `system`.tables where name ='" + tablename + "' and database = '" + deletionConfig.getTrafficDatasource() + "' and engine ='MergeTree' format JSON"; - String getTableDdlsql = "SELECT name ,extract(engine_full, '.*toIntervalSecond\\((\\d+)\\)') as tablettl from `system`.tables where tablettl!='' and database = '" + deletionConfig.getTrafficDatasource() + "' and engine ='MergeTree' and engine_full like '%TTL%' format JSON"; + String databaseschema = "$..data[*]"; + String ttltime = null; + String partitionKey = null; + Map<String, Object> schemaobj = (Map) JsonMapper.fromJsonString(content, Map.class); - String databaseschema = "$..data[*]"; - // String tableschema = "$.[data]"; + if (schemaobj.containsKey("doc")) { + Map<String, Object> schemaHasDoc = (Map<String, Object>) schemaobj.get("doc"); + if (schemaHasDoc.containsKey("partition_key")) { - // List<Map<String, Object>> schemaTableMessageList = JsonPath.using(conf2).parse(content).read(tableschema, List.class); + if (schemaHasDoc.containsKey("ttl")) { - for (String ip : addressForCKs) { - String tableDdl = gettableDdl(ip,getTableDdlsql); + ttltime = schemaHasDoc.get("ttl").toString().trim(); + partitionKey = schemaHasDoc.get("partition_key").toString().trim(); + } else { - List<Map<String, Object>> databaseMessageList = JsonPath.using(conf2).parse(tableDdl).read(databaseschema, List.class); - Map<String, Integer> tableobj = new HashMap<>(); - for (Map<String, Object> obj : databaseMessageList) { - String tName = (String) obj.get("name"); - Integer tTtl = Integer.valueOf((String) obj.get("tablettl")); - tableobj.put(tName, tTtl); + + } + } else { + + logger.error("table " + tablename + " schema not containsKey partition_key error "); + XxlJobLogger.log("table " + tablename + " schema not containsKey partition_key error "); + } + } else { + logger.error("table " + tablename + " schema not containsKey doc error "); + XxlJobLogger.log("table " + tablename + " schema not containsKey doc error "); } - Map<String, Object> schemaobj = (Map) JSON.parse(content); - if (schemaobj.containsKey("doc")) { - Map<String, Object> schemaColumnHasDoc = (Map<String, Object>) schemaobj.get("doc"); - if (schemaColumnHasDoc.containsKey("ttl") && schemaColumnHasDoc.containsKey("partition_key")) { - Integer ttltime = (Integer) schemaColumnHasDoc.get("ttl"); - String partitionKey = (String) schemaColumnHasDoc.get("partition_key"); + for (String ip : addressForCKs) { + try { + String tableDdl = gettableDdl(ip, getTableDdlsql); - if (tableobj.containsKey(tablename)) { - if (!ttltime.equals(tableobj.get(tablename))) { - String ttlvalue = "toDateTime(" + partitionKey + ") + toIntervalSecond(" + ttltime + ")"; - modifyTtlColumns.put(tablename, ttlvalue); - } + List<Map<String, Object>> databaseMessageList = JsonPath.using(conf2).parse(tableDdl).read(databaseschema, List.class); - } else { - String ttlvalue = "toDateTime(" + partitionKey + ") + toIntervalSecond(" + ttltime + ")"; - modifyTtlColumns.put(tablename, ttlvalue); + if (databaseMessageList.size() > 0) { + String tTtl = databaseMessageList.get(0).get("tablettl").toString().trim(); - } + if (ttltime != null) { + if (!ttltime.equals(tTtl)) { - } else { + String ttlvalue = "toDateTime(" + partitionKey + ") + toIntervalSecond(" + ttltime + ")"; + modifyTtltables.put(tablename, ttlvalue); + XxlJobLogger.log("TableTTl is different ! ip:" + ip + " tablename: " + tablename + " TTL :" + tTtl + " schematableTTL :" + ttltime + " modify table TTL!"); + } + } else { + + if (!"".equals(tTtl)) { + dropTtltables.put(tablename, ""); + XxlJobLogger.log("TableTTl is different ! ip:" + ip + " tablename: " + tablename + " TTL :" + tTtl + " schematableTTL :" + ttltime + " drop table TTL!"); + + } - if (tableobj.containsKey(tablename)) { - dropTtlColumns.put(tablename, ""); + + } } - } - } else { - } + } catch (RuntimeException be) { + logger.error("table " + tablename + " foreach local ip " + ip + " error " + be.getMessage()); + XxlJobLogger.log(be.getMessage()); + + } + } + } else { + XxlJobLogger.log(tablename + " table is not exist in schema !"); } - } else { - XxlJobLogger.log(tablename + " table is not exist in schema !"); - } + } catch (RuntimeException be) { + logger.error(" getChangeTtltables foreach table " + tablename + " error " + be.getMessage()); + XxlJobLogger.log(" getChangeTtltables foreach table " + tablename + " error " + be.getMessage()); + } } - ChangeTtlColumns.put("modifyTtlTables", modifyTtlColumns); - ChangeTtlColumns.put("dropTtlTables", dropTtlColumns); - return ChangeTtlColumns; + + ChangeTtltables.put("modifyTtlTables", modifyTtltables); + ChangeTtltables.put("dropTtlTables", dropTtltables); + return ChangeTtltables; } catch (RuntimeException | NacosException be) { - logger.error(" getChangeTtlColumns function error " + be.getMessage()); - XxlJobLogger.log(be.getMessage()); + logger.error(" getChangeTtltables function error " + be.getMessage()); + XxlJobLogger.log(" getChangeTtltables function error " + be.getMessage()); } - return null; + return ChangeTtltables; } private String gettableDdl(String ip, String getTableDdlsql) { @@ -300,33 +328,53 @@ public class LogStorageTtlJob { private Map<String, Map<String, ColumnParam>> getChangeTtlColumns(String tablename, List<String> addressForCKs) { try { - - + Map<String, Map<String, ColumnParam>> ChangeTtlColumns = new HashMap<>(); + Map<String, ColumnParam> modifyTtlColumns = new HashMap<>(); + Map<String, ColumnParam> dropTtlColumns = new HashMap<>(); String viewName = tablename.replace("_local", ""); String content = configService.getConfig(viewName + ".json", "Galaxy", 3000); + String partitionKey = ""; + Configuration conf = Configuration.defaultConfiguration(); + Configuration conf2 = conf.addOptions(Option.DEFAULT_PATH_LEAF_TO_NULL); + String getdatabaseDDL = "desc " + deletionConfig.getTrafficDatasource() + "." + tablename + " format JSON"; + String schema = "$..fields[*]"; + String databaseschema = "$['data']"; + Map<String, Object> deleteParamMap = getDeleteSource(); + List<Map<String, Object>> schemaColumnMessageList = null; if (content != null && !"".equals(content)) { - Map<String, Map<String, ColumnParam>> ChangeTtlColumns = new HashMap<>(); - Configuration conf = Configuration.defaultConfiguration(); + Map<String, Object> schemaobj = (Map) JSON.parse(content); - // List<Object> revenueList = context.read("$[*]['box office']"); - Configuration conf2 = conf.addOptions(Option.DEFAULT_PATH_LEAF_TO_NULL); - String getdatabaseDDL = "desc " + deletionConfig.getTrafficDatasource() + "." + tablename + " format JSON"; - Map<String, Object> deleteParamMap = getDeleteSource(); + if (schemaobj.containsKey("doc")) { + Map<String, Object> schemaHasDoc = (Map<String, Object>) schemaobj.get("doc"); + partitionKey = (String) schemaHasDoc.get("partition_key"); + if (schemaHasDoc.containsKey("partition_key")) { + schemaColumnMessageList = JsonPath.using(conf2).parse(content).read(schema, List.class); - String schema = "$..fields[*]"; - String databaseschema = "$['data']"; + } else { + logger.error("table " + tablename + " schema not containsKey partition_key error "); + XxlJobLogger.log("table " + tablename + " schema not containsKey partition_key error "); + return null; + } + }else { - Map<String, ColumnParam> modifyTtlColumns = new HashMap<>(); - Map<String, ColumnParam> dropTtlColumns = new HashMap<>(); + logger.error("table " + tablename + " schema not containsKey doc error "); + XxlJobLogger.log("table " + tablename + " schema not containsKey doc error "); + return null; + } + } else { + logger.error("table " + tablename + " schema not exist "); + XxlJobLogger.log(tablename + " table is not exist in schema !"); + return null; + } - for (String ip : addressForCKs) { + for (String ip : addressForCKs) { + try { String databaseDdl = HttpClientUtils.httpPost(HttpClientUtils.getUrlWithParams("http://" + ip, deleteParamMap), getdatabaseDDL); - List<Map<String, Object>> schemaColumnMessageList = JsonPath.using(conf2).parse(content).read(schema, List.class); List<Map<String, Object>> databaseColumnMessageList = JsonPath.using(conf2).parse(databaseDdl).read(databaseschema, List.class); Map<String, ColumnParam> tableobj = new HashMap<>(); for (Map<String, Object> obj : databaseColumnMessageList) { @@ -349,14 +397,18 @@ public class LogStorageTtlJob { if (schemaColumnHasDoc.containsKey("ttl")) { Integer ttltime = (Integer) schemaColumnHasDoc.get("ttl"); - String ttlvalue = "toDateTime(common_recv_time) + toIntervalSecond(" + ttltime + ")"; - if (!ttlvalue.equals(tableobj.get(column).getTtl())) { + String ttlvalue = "toDateTime(" + partitionKey + ") + toIntervalSecond(" + ttltime + ")"; + if (!ttlvalue.equals(tableobj.get(column).getTtl().trim())) { tableobj.get(column).setTtl(ttlvalue); modifyTtlColumns.put(column, tableobj.get(column)); + // XxlJobLogger.log("ChangeTtlColumns error! ip:"+ip+" tablename :"+tablename+" different ttl columns: "+be.getMessage()); + } + + } else { - if(tableobj.containsKey(column)) { + if (tableobj.containsKey(column)) { if (tableobj.get(column).getTtl() != null && !("").equals(tableobj.get(column).getTtl())) { tableobj.get(column).setTtl(""); dropTtlColumns.put(column, tableobj.get(column)); @@ -365,7 +417,7 @@ public class LogStorageTtlJob { } } } else { - if(tableobj.containsKey(column)) { + if (tableobj.containsKey(column)) { if (tableobj.get(column).getTtl() != null && !("").equals(tableobj.get(column).getTtl())) { tableobj.get(column).setTtl(""); @@ -376,46 +428,30 @@ public class LogStorageTtlJob { } } + } catch (RuntimeException be) { + + logger.error("getChangeTtlColumns error! ip:" + ip + " tablename :" + tablename + " message: " + be.getMessage()); + XxlJobLogger.log("getChangeTtlColumns error! ip:" + ip + " tablename :" + tablename + " message: " + be.getMessage()); + } - ChangeTtlColumns.put("modifyTtlColumns", modifyTtlColumns); - ChangeTtlColumns.put("dropTtlColumns", dropTtlColumns); - return ChangeTtlColumns; - } else { - XxlJobLogger.log(tablename + " table is not exist in schema !"); - return null; } + + + ChangeTtlColumns.put("modifyTtlColumns", modifyTtlColumns); + ChangeTtlColumns.put("dropTtlColumns", dropTtlColumns); + return ChangeTtlColumns; + } catch (RuntimeException | NacosException be) { - be.printStackTrace(); logger.error("getChangeTtlColumns function error " + be.getMessage()); - XxlJobLogger.log(be.getMessage()); + XxlJobLogger.log("getChangeTtlColumns function error " + be.getMessage()); } return null; } - /** - * 必填参数进行验证,其它自行配置 - * - * @param params - */ - private Map<String, Object> validParams(String params) { - logger.info("params{}", params); - - if (StringUtil.isBlank(params)) { - XxlJobLogger.log("params is Empty !"); - return null; - } - Map<String, Object> paramMap = (Map) JsonMapper.fromJsonString(params, Map.class); - if (paramMap == null) { - XxlJobLogger.log("params error !"); - return null; - } - return paramMap; - } private Boolean alterTablesForCK(String url, String sql) { - // sql = sql.replaceAll("\\+", "%2B"); return excutesqlToCK(url, sql); } @@ -474,7 +510,6 @@ public class LogStorageTtlJob { private List<String> getSystemDataForCK(String url, String sql) { List<String> dataList = Lists.newArrayList(); Map<String, Object> deleteParamMap = getDeleteSource(); - //deleteParamMap.put("query", sql); String httpGetResult = HttpClientUtils.httpPost(HttpClientUtils.getUrlWithParams(url, deleteParamMap), sql); if ("-1".equals(httpGetResult)) { @@ -499,5 +534,4 @@ public class LogStorageTtlJob { } - } |
