diff options
| author | zhanghongqing <[email protected]> | 2024-06-19 11:14:52 +0800 |
|---|---|---|
| committer | zhanghongqing <[email protected]> | 2024-06-19 11:14:52 +0800 |
| commit | 3e69996133147d1d7119f893a17c657564a8b4ec (patch) | |
| tree | bc27dbe70401d7c667c9137d12850d36d1d66f5f | |
| parent | 901aca4a1785db14dbb54beaee8fbd02e54176dc (diff) | |
[优化][存储配额查询] 优化存储配额生成数据流 TSG-21555
10 files changed, 599 insertions, 63 deletions
diff --git a/galaxy-job-executor/pom.xml b/galaxy-job-executor/pom.xml index 86b1e39..325572c 100644 --- a/galaxy-job-executor/pom.xml +++ b/galaxy-job-executor/pom.xml @@ -209,7 +209,7 @@ <JAR_FILE>${project.build.finalName}.xjar</JAR_FILE> </buildArgs> <imageTags> - <imageTag>2.2.0.3</imageTag> + <imageTag>2.2.0.4</imageTag> </imageTags> <resources> <resource> diff --git a/galaxy-job-executor/src/main/java/com/mesalab/executor/core/utils/Constant.java b/galaxy-job-executor/src/main/java/com/mesalab/executor/core/utils/Constant.java index 661087f..30c59e5 100644 --- a/galaxy-job-executor/src/main/java/com/mesalab/executor/core/utils/Constant.java +++ b/galaxy-job-executor/src/main/java/com/mesalab/executor/core/utils/Constant.java @@ -6,11 +6,12 @@ public class Constant { public static final String FILES = "Files"; public static final String REPORT_AND_METRICS = "Report and Metrics"; public static final String TRAFFIC_LOGS = "Traffic Logs"; + public static final String METRICS = "Metrics"; // zookeeper /path+node public static final String ZK_TRAFFIC_LOGS = "Traffic-Logs"; public static final String ZK_REPORT_AND_METRICS = "Report-and-Metrics"; - + public static final String ZOOKEEPER_STORAGE_PATH = "/storage/worker/"; public static final String TOKEN = "Token"; public static final String TEXT_XML = "text/xml"; diff --git a/galaxy-job-executor/src/main/java/com/mesalab/executor/core/utils/DBUtils.java b/galaxy-job-executor/src/main/java/com/mesalab/executor/core/utils/DBUtils.java index 5e57dcd..c134ba2 100644 --- a/galaxy-job-executor/src/main/java/com/mesalab/executor/core/utils/DBUtils.java +++ b/galaxy-job-executor/src/main/java/com/mesalab/executor/core/utils/DBUtils.java @@ -1,6 +1,7 @@ package com.mesalab.executor.core.utils; import cn.hutool.core.io.IoUtil; +import cn.hutool.core.util.ObjectUtil; import cn.hutool.core.util.StrUtil; import cn.hutool.log.Log; import com.mesalab.executor.pojo.JDBCParam; @@ -8,9 +9,8 @@ import com.zaxxer.hikari.HikariConfig; import com.zaxxer.hikari.HikariDataSource; import org.springframework.util.ObjectUtils; -import java.sql.Connection; -import java.sql.DriverManager; -import java.sql.SQLException; +import java.lang.reflect.Field; +import java.sql.*; import java.util.List; import java.util.Map; @@ -19,6 +19,7 @@ public class DBUtils { private static Log logger = Log.get(); private static HikariDataSource hikariDataSource; private static String dbParams = "?useUnicode=true&serverTimezone=UTC&zeroDateTimeBehavior=convertToNull&characterEncoding=UTF-8&autoReconnect=true&failOverReadOnly=false&tinyInt1isBit=false&transformedBitIsBoolean=false"; + private final static int batchSize = 10000; public static Connection getDBConn(JDBCParam jdbcParam) throws SQLException { String url = "jdbc:mysql://" + jdbcParam.getIp() + ":3306/" + jdbcParam.getDatabase() + dbParams; @@ -72,7 +73,7 @@ public class DBUtils { * @param transformResult * @param tableName */ - public String getInsertSql(List<Map> transformResult, String tableName) { + public static String getInsertSql(List<Map> transformResult, String tableName) { String sql = ""; String sqlStr1 = "INSERT IGNORE INTO " + tableName + " ("; String sqlStr2 = ") VALUES ("; @@ -109,7 +110,7 @@ public class DBUtils { /** * update table set a=? where conditions; */ - public String getUpdateSql(List<Map> transformResult, String tableName, String[]... conditions) { + public static String getUpdateSql(List<Map> transformResult, String tableName, String[]... conditions) { String sqlKV = ""; for (Object key : transformResult.get(0).keySet()) { if (!"id".equals(key)) { @@ -131,6 +132,7 @@ public class DBUtils { /** * 构建 INSERT INTO ... ON DUPLICATE KEY UPDATE + * * @param transformResult * @param tableName * @return @@ -139,7 +141,7 @@ public class DBUtils { StringBuilder columns = new StringBuilder(); StringBuilder values = new StringBuilder(); StringBuilder updateStatements = new StringBuilder(); - Map<String,Object> data = transformResult.get(0); + Map<String, Object> data = transformResult.get(0); for (Map.Entry<String, Object> entry : data.entrySet()) { String column = entry.getKey(); if (!"id".equalsIgnoreCase(column)) { @@ -157,4 +159,148 @@ public class DBUtils { } + + public void saveByMap(List<Map> dataList, JDBCParam jdbcParam) throws Exception { + if (ObjectUtil.isEmpty(dataList)) { + logger.info("save data is empty "); + return; + } + Connection conn = null; + PreparedStatement pst = null; + try { + String tableName = jdbcParam.getTable(); + conn = DBUtils.getDBConn(jdbcParam); + long start = System.currentTimeMillis(); + conn.setAutoCommit(false); + pst = conn.prepareStatement(getInsertSql(dataList, tableName)); + int count = 0; + int index = 1; + for (int i = 1; i <= dataList.size(); i++) { + for (Object val : dataList.get(i - 1).values()) { + index = setPstVal(pst, index, val); + } + index = 1; + pst.addBatch(); + //批量提交 + if (i % batchSize == 0) { + int[] ints = pst.executeBatch(); + count = count + ints.length; + conn.commit(); + pst.clearBatch(); + } + } + int[] ints = pst.executeBatch(); + count = count + ints.length; + conn.commit(); + JobUtil.infoLog("save mysql table {}, count {}, take {},", tableName, count, (System.currentTimeMillis() - start) + "ms"); + } catch (Exception e) { + if (conn != null) { + try { + conn.rollback(); + } catch (SQLException e1) { + logger.error(e1); + } + } + logger.error(e); + throw e; + } finally { + IoUtil.close(pst); + IoUtil.close(conn); + } + } + + private int setPstVal(PreparedStatement pst, int index, Object val) throws SQLException { + if (val instanceof String) { + if (val == null) { + pst.setNull(index++, Types.VARCHAR); + } else { + pst.setString((index++), StrUtil.toString(val)); + } + } else if (val instanceof Integer) { + if (val == null) { + pst.setNull(index++, Types.INTEGER); + } else { + pst.setInt((index++), Integer.valueOf(StrUtil.toString(val))); + } + } else if (val instanceof Long) { + if (val == null) { + pst.setNull(index++, Types.BIGINT); + } else { + pst.setLong((index++), Long.valueOf(StrUtil.toString(val))); + } + } else if (val instanceof Boolean) { + if (val == null) { + pst.setNull(index++, Types.BOOLEAN); + } else { + pst.setBoolean((index++), Boolean.valueOf(StrUtil.toString(val))); + } + } else { + pst.setObject((index++), val); + } + return index; + } + + public static <T> void save(List<T> dataList, JDBCParam jdbcParam) throws Exception { + if (dataList == null || dataList.isEmpty()) { + logger.info("Data list is empty, nothing to save."); + return; + } + try (Connection conn = DBUtils.getDBConn(jdbcParam)) { + conn.setAutoCommit(false); + String sql = prepareInsertSql(dataList.get(0), jdbcParam.getTable()); + try (PreparedStatement pst = conn.prepareStatement(sql)) { + int count = executeBatchInsert(dataList, pst); + conn.commit(); + logger.info("Saved {} records to table {} successfully.", count, jdbcParam.getTable()); + } catch (SQLException e) { + conn.rollback(); + logger.error("Error executing batch insert, transaction rolled back.", e); + throw e; + } + } catch (Exception e) { + logger.error("Failed to save data.", e); + throw e; + } + } + + private static <T> int executeBatchInsert(List<T> dataList, PreparedStatement pst) throws IllegalAccessException, SQLException { + int count = 0; + for (int i = 0; i < dataList.size(); i++) { + T item = dataList.get(i); + setPreparedStatementParameters(pst, item); + pst.addBatch(); + if ((i + 1) % batchSize == 0 || i == dataList.size() - 1) { + count += executeAndClearBatch(pst); + } + } + return count; + } + + private static <T> void setPreparedStatementParameters(PreparedStatement pst, T item) throws IllegalAccessException, SQLException { + Field[] fields = item.getClass().getDeclaredFields(); + for (int j = 0; j < fields.length; j++) { + fields[j].setAccessible(true); + pst.setObject(j + 1, fields[j].get(item)); + } + } + + private static int executeAndClearBatch(PreparedStatement pst) throws SQLException { + int[] results = pst.executeBatch(); + pst.clearBatch(); + return results.length; + } + + private static <T> String prepareInsertSql(T item, String tableName) { + StringBuilder columns = new StringBuilder(); + StringBuilder placeholders = new StringBuilder(); + Field[] fields = item.getClass().getDeclaredFields(); + for (Field field : fields) { + columns.append( StrUtil.toUnderlineCase(field.getName())).append(","); + placeholders.append("?,"); + } + columns.setLength(columns.length() - 1); + placeholders.setLength(placeholders.length() - 1); + return String.format("INSERT INTO %s (%s) VALUES (%s)", tableName, columns, placeholders); + } + } diff --git a/galaxy-job-executor/src/main/java/com/mesalab/executor/jobhandler/LogStorageTtlJob.java b/galaxy-job-executor/src/main/java/com/mesalab/executor/jobhandler/LogStorageTtlJob.java index 4db39a3..0abc2b9 100644 --- a/galaxy-job-executor/src/main/java/com/mesalab/executor/jobhandler/LogStorageTtlJob.java +++ b/galaxy-job-executor/src/main/java/com/mesalab/executor/jobhandler/LogStorageTtlJob.java @@ -193,7 +193,6 @@ public class LogStorageTtlJob { deleteParamMap.put("database", deletionConfig.getTrafficDatasource()); deleteParamMap.put("password", deletionConfig.getTrafficUserKey()); deleteParamMap.put("user", deletionConfig.getTrafficUsername()); - // deleteParamMap.put("password", "galaxy2019"); return deleteParamMap; } @@ -207,7 +206,6 @@ public class LogStorageTtlJob { Map<String, String> dropTtltables = new HashMap<>(); Configuration conf = Configuration.defaultConfiguration(); Configuration conf2 = conf.addOptions(Option.DEFAULT_PATH_LEAF_TO_NULL); - // Map<String, Integer> tableobj = new HashMap<>(); for (String tablename : tablesForCKs) { try { diff --git a/galaxy-job-executor/src/main/java/com/mesalab/executor/jobhandler/StorageQuotaInfoJob.java b/galaxy-job-executor/src/main/java/com/mesalab/executor/jobhandler/StorageQuotaInfoJob.java new file mode 100644 index 0000000..bf8078a --- /dev/null +++ b/galaxy-job-executor/src/main/java/com/mesalab/executor/jobhandler/StorageQuotaInfoJob.java @@ -0,0 +1,50 @@ +package com.mesalab.executor.jobhandler; + +import cn.hutool.core.bean.BeanUtil; +import cn.hutool.core.util.StrUtil; +import com.mesalab.executor.core.utils.JobUtil; +import com.mesalab.executor.pojo.JDBCParam; +import com.mesalab.executor.service.StorageQuotaInfoService; +import com.xxl.job.core.biz.model.ReturnT; +import com.xxl.job.core.handler.annotation.XxlJob; +import org.springframework.stereotype.Component; + +import javax.annotation.Resource; +import java.util.List; +import java.util.Map; + +/** + * @Description: + * @Author: zhq + * @CreateDate: 2024/6/13 + * @Version: 1.0 + */ + +@Component +public class StorageQuotaInfoJob { + + @Resource + private StorageQuotaInfoService storageQuotaInfoService; + + + @XxlJob("getDatabaseStorageQuotaJobHandler") + public ReturnT<String> getDatabaseStorageQuotaJobHandler(String params) { + + int failCount = 0; + try { + List<Map> paramsMaps = ValidParamUtil.parserParamsSink(params); + for (Map paramsMap : paramsMaps) { + Map<String, Object> source = BeanUtil.beanToMap(paramsMap.get("source")); + String[] items = StrUtil.split(source.get("items").toString(), ","); + JDBCParam jdbcParam = BeanUtil.toBean(paramsMap.get("sink"), JDBCParam.class); + + failCount += storageQuotaInfoService.getAndSaveStorageQuotaInfo(items, jdbcParam); + + } + } catch (Exception e) { + failCount++; + JobUtil.errorLog("getDatabaseStorageQuota job error:", e.getMessage()); + } + return failCount > 0 ? ReturnT.FAIL : ReturnT.SUCCESS; + } +} diff --git a/galaxy-job-executor/src/main/java/com/mesalab/executor/jobhandler/ValidParamUtil.java b/galaxy-job-executor/src/main/java/com/mesalab/executor/jobhandler/ValidParamUtil.java index 937895a..ce41541 100644 --- a/galaxy-job-executor/src/main/java/com/mesalab/executor/jobhandler/ValidParamUtil.java +++ b/galaxy-job-executor/src/main/java/com/mesalab/executor/jobhandler/ValidParamUtil.java @@ -36,5 +36,22 @@ public class ValidParamUtil { } return paramList; } - + public static List<Map> parserParamsSink(String params) { + if (StrUtil.isBlank(params)) { + XxlJobLogger.log("params is empty!"); + return null; + } + List<Map> paramList = JSONUtil.toList(params, Map.class); + for (Map<String, Map<String, String>> param : paramList) { + if (ObjectUtil.isNull(param)) { + XxlJobLogger.log("params is not format json ! "); + return null; + } + if (ObjectUtil.isNull(param.get("sink"))) { + XxlJobLogger.log("params sink is null ! "); + return null; + } + } + return paramList; + } } diff --git a/galaxy-job-executor/src/main/java/com/mesalab/executor/pojo/SysStorageEvent.java b/galaxy-job-executor/src/main/java/com/mesalab/executor/pojo/SysStorageEvent.java new file mode 100644 index 0000000..36bf082 --- /dev/null +++ b/galaxy-job-executor/src/main/java/com/mesalab/executor/pojo/SysStorageEvent.java @@ -0,0 +1,29 @@ +package com.mesalab.executor.pojo; + +import lombok.AllArgsConstructor; +import lombok.Builder; +import lombok.Data; +import lombok.NoArgsConstructor; + +/** + * @Description: + * @Author: zhq + * @CreateDate: 2024/6/13 + * @Version: 1.0 + */ +@Data +@Builder +@NoArgsConstructor +@AllArgsConstructor +public class SysStorageEvent { + + private Long id; + private String logType; + private String dataCenter; + private Long totalAllocatedSize; + private Long usedSize; + private Long bytes; + private Long sinceTime; + private Long generatedTime; + +} diff --git a/galaxy-job-executor/src/main/java/com/mesalab/executor/service/DataSinkService.java b/galaxy-job-executor/src/main/java/com/mesalab/executor/service/DataSinkService.java index 505369f..c6e0ed9 100644 --- a/galaxy-job-executor/src/main/java/com/mesalab/executor/service/DataSinkService.java +++ b/galaxy-job-executor/src/main/java/com/mesalab/executor/service/DataSinkService.java @@ -110,22 +110,20 @@ public class DataSinkService { return; } Connection conn = null; - DBUtils dbUtils = null; PreparedStatement pst = null; try { JDBCParam jdbcParam = new JDBCParam(sinkParams); String tableName = jdbcParam.getTable(); - dbUtils = new DBUtils(); //创建数据库连接库对象 - conn = dbUtils.getDBConn(jdbcParam); + conn = DBUtils.getDBConn(jdbcParam); String sql = ""; if ("update".equals(sinkParams.get("option"))) { - sql = dbUtils.getUpdateSql(transformResult, tableName, parseConditions(sinkParams)); + sql = DBUtils.getUpdateSql(transformResult, tableName, parseConditions(sinkParams)); } else if ("insertOrUpdate".equals(sinkParams.get("option"))) { - sql = dbUtils.getInsertOrUpdateSql(transformResult, tableName); + sql = DBUtils.getInsertOrUpdateSql(transformResult, tableName); } else { - sql = dbUtils.getInsertSql(transformResult, tableName); + sql = DBUtils.getInsertSql(transformResult, tableName); } long start = System.currentTimeMillis(); conn.setAutoCommit(false); diff --git a/galaxy-job-executor/src/main/java/com/mesalab/executor/service/StorageQuotaInfoService.java b/galaxy-job-executor/src/main/java/com/mesalab/executor/service/StorageQuotaInfoService.java new file mode 100644 index 0000000..1628922 --- /dev/null +++ b/galaxy-job-executor/src/main/java/com/mesalab/executor/service/StorageQuotaInfoService.java @@ -0,0 +1,269 @@ +package com.mesalab.executor.service; + +import cn.hutool.core.date.DateUtil; +import cn.hutool.core.util.StrUtil; +import cn.hutool.db.handler.NumberHandler; +import cn.hutool.db.sql.SqlExecutor; +import cn.hutool.log.Log; +import com.google.common.collect.Lists; +import com.mesalab.executor.core.config.StorgeConfig; +import com.mesalab.executor.core.utils.*; +import com.mesalab.executor.exception.BusinessException; +import com.mesalab.executor.pojo.HosSpace; +import com.mesalab.executor.pojo.JDBCParam; +import com.mesalab.executor.pojo.SysStorageEvent; +import org.apache.http.Header; +import org.apache.http.message.BasicHeader; +import org.springframework.beans.factory.annotation.Value; +import org.springframework.http.HttpHeaders; +import org.springframework.stereotype.Service; +import org.springframework.util.ObjectUtils; + +import java.sql.Connection; +import java.util.List; +import java.util.Map; + +/** + * @Description: + * @Author: zhq + * @CreateDate: 2024/6/13 + * @Version: 1.0 + */ + +@Service +public class StorageQuotaInfoService { + + private StorgeConfig storgeConfig = (StorgeConfig) SpringContextUtil.getBean("storgeConfig"); + private Log logger = Log.get(); + + //查询历史数据时允许的最大间隔60 MINUTE + private static final int STORAGE_LIMITATION = 120 * 60; + + @Value("${zookeeper.server}") + private String zookeeperServer; + + + public int getAndSaveStorageQuotaInfo(String[] logTypes, JDBCParam jdbcParam) { + + int failCount = 0; + for (String logType : logTypes) { + try { + switch (logType) { + case Constant.TRAFFIC_LOGS: + failCount += getAndSaveClickhouseStorageInfo(jdbcParam); + break; + case Constant.METRICS: + failCount += getAndSaveDruidStorageInfo(jdbcParam); + break; + case Constant.FILES: + failCount += getAndSaveHosStorageInfo(jdbcParam); + break; + default: + break; + } + + } catch (Exception e) { + failCount++; + JobUtil.errorLog(e.getMessage()); + } + } + return failCount; + } + + private int getAndSaveClickhouseStorageInfo(JDBCParam jdbcParam) throws Exception { + + final String maxSizeSql = StrUtil.format("SELECT SUM(`total_space`) FROM {};", storgeConfig.getSystemDisks()); + final String usedSizeSql = StrUtil.format("SELECT SUM(`bytes_on_disk`) FROM {} WHERE database = '{}' ;", storgeConfig.getSystemPartsCluster(), storgeConfig.getTrafficDatasource()); + + long generatedTime = DateUtil.currentSeconds(); + Long sinceTime = getLastStorage(Constant.ZK_TRAFFIC_LOGS); + List<SysStorageEvent> sysStorageEvents = Lists.newArrayList(); + int failCount = 0; + for (Map.Entry<String, String> datacenterMap : storgeConfig.getTrafficDataCenter().entrySet()) { + try { + String datacenterHost = datacenterMap.getValue(); + String datacenterName = datacenterMap.getKey(); + Map<String, Object> ckParamMap = storgeConfig.getCkSource(); + // 1. 总计 + ckParamMap.put("query", maxSizeSql); + String totalResult = HttpClientUtils.httpGet(HttpClientUtils.getUrlWithParams(UrlUtil.getUrl(datacenterHost), ckParamMap)); + //2. 已使用 + ckParamMap.put("query", usedSizeSql); + String usedResult = HttpClientUtils.httpGet(HttpClientUtils.getUrlWithParams(UrlUtil.getUrl(datacenterHost), ckParamMap)); + + if ("-1".equals(totalResult) || "-1".equals(usedResult)) { + throw new BusinessException("Get clickhouse http fail -1"); + } + + Long totalSize = Long.valueOf(totalResult.trim()); + Long usedSize = Long.valueOf(usedResult.trim()); + + //3. 增量 + Long increaseSize = getIncreaseSize(jdbcParam, usedSize, datacenterMap.getKey(), Constant.TRAFFIC_LOGS); + SysStorageEvent storageEvent = SysStorageEvent.builder() + .logType(Constant.TRAFFIC_LOGS) + .dataCenter(datacenterName) + .generatedTime(generatedTime) + .totalAllocatedSize(totalSize) + .usedSize(usedSize) + .bytes(increaseSize) + .sinceTime(sinceTime) + .build(); + + sysStorageEvents.add(storageEvent); + JobUtil.infoLog("Get clickhouse storage info : datacenter {}, max_size {}, used_size {}, bytes {}.", + datacenterName, totalSize, usedSize, increaseSize); + } catch (Exception e) { + failCount++; + JobUtil.errorLog("Get clickhouse storage error : datacenter {}, message {}", datacenterMap.getKey(), e.getMessage()); + } + } + DBUtils.save(sysStorageEvents, jdbcParam); + return failCount; + } + + private int getAndSaveDruidStorageInfo(JDBCParam jdbcParam) throws Exception { + + final String maxSizeSql = "{\"query\":\"SELECT SUM(max_size) AS max_size FROM sys.servers WHERE server_type = 'historical'\",\"context\":{\"skipEmptyBuckets\":\"false\"},\"resultFormat\":\"csv\"}"; + final String usedSizeSql = "{\"query\":\"SELECT SUM(size) AS used_size FROM sys.segments WHERE datasource NOT LIKE '%hot%' and is_published = 1 and is_overshadowed = 0\",\"context\":{\"skipEmptyBuckets\":\"false\"},\"resultFormat\":\"csv\"}"; + final String druidPath = "/druid/v2/sql"; + + long generatedTime = DateUtil.currentSeconds(); + Long sinceTime = getLastStorage(Constant.ZK_TRAFFIC_LOGS); + List<SysStorageEvent> sysStorageEvents = Lists.newArrayList(); + int failCount = 0; + for (Map.Entry<String, String> datacenterMap : storgeConfig.getAnalyticDataCenter().entrySet()) { + try { + String datacenterHost = datacenterMap.getValue(); + String datacenterName = datacenterMap.getKey(); + // 1. 总计 + String totalResult = HttpClientUtils.httpPost(UrlUtil.getUrl(datacenterHost).concat(druidPath), maxSizeSql); + //2. 已使用 + String usedResult = HttpClientUtils.httpPost(UrlUtil.getUrl(datacenterHost).concat(druidPath), usedSizeSql); + + if ("-1".equals(totalResult) || "-1".equals(usedResult)) { + throw new BusinessException("Get druid http fail -1"); + } + + Long totalSize = Long.valueOf(totalResult.trim()); + Long usedSize = Long.valueOf(usedResult.trim()); + + //3. 增量 + Long increaseSize = getIncreaseSize(jdbcParam, usedSize, datacenterMap.getKey(), Constant.METRICS); + SysStorageEvent storageEvent = SysStorageEvent.builder() + .logType(Constant.METRICS) + .dataCenter(datacenterName) + .generatedTime(generatedTime) + .totalAllocatedSize(totalSize) + .usedSize(usedSize) + .bytes(increaseSize) + .sinceTime(sinceTime) + .build(); + + sysStorageEvents.add(storageEvent); + JobUtil.infoLog("Get druid storage info : datacenter {}, max_size {}, used_size {}, bytes {}.", + datacenterName, totalSize, usedSize, increaseSize); + } catch (Exception e) { + failCount++; + JobUtil.errorLog("Get druid storage error : datacenter {}, message {}", datacenterMap.getKey(), e.getMessage()); + } + } + DBUtils.save(sysStorageEvents, jdbcParam); + return failCount; + } + + private int getAndSaveHosStorageInfo(JDBCParam jdbcParam) throws Exception { + + final String fileStoragePath = "/admin/diskspace"; + + long generatedTime = DateUtil.currentSeconds(); + Long sinceTime = getLastStorage(Constant.FILES); + List<SysStorageEvent> sysStorageEvents = Lists.newArrayList(); + int failCount = 0; + for (Map.Entry<String, String> datacenterMap : storgeConfig.getFilesDataCenter().entrySet()) { + try { + String datacenterHost = datacenterMap.getValue(); + String datacenterName = datacenterMap.getKey(); + + Header[] headers = {new BasicHeader(Constant.TOKEN, storgeConfig.getFilesToken()), new BasicHeader(HttpHeaders.CONTENT_TYPE, Constant.TEXT_XML)}; + String result = HttpClientUtils.httpGet(UrlUtil.getUrl(datacenterHost).concat(fileStoragePath), headers); + + if ("-1".equals(result) || "-1".equals(result)) { + throw new BusinessException("Get hos http request fail -1 , server: " + datacenterName); + } + HosSpace hosSpace = XmlUtil.converXmlToBean(HosSpace.class, result); + Long totalSize = hosSpace.getHosCapacity(); + Long usedSize = hosSpace.getHosUsed(); + if (totalSize == -1 || usedSize == -1) { + throw new BusinessException("hos server error server: " + datacenterName); + } + + //3. 增量 + Long increaseSize = getIncreaseSize(jdbcParam, usedSize, datacenterMap.getKey(), Constant.FILES); + SysStorageEvent storageEvent = SysStorageEvent.builder() + .logType(Constant.FILES) + .dataCenter(datacenterName) + .generatedTime(generatedTime) + .totalAllocatedSize(totalSize) + .usedSize(usedSize) + .bytes(increaseSize) + .sinceTime(sinceTime) + .build(); + + sysStorageEvents.add(storageEvent); + JobUtil.infoLog("Get hos storage info : datacenter {}, max_size {}, used_size {}, bytes {}.", + datacenterName, totalSize, usedSize, increaseSize); + } catch (Exception e) { + failCount++; + JobUtil.errorLog("Get hos storage error : datacenter {}, message {}", datacenterMap.getKey(), e.getMessage()); + } + } + DBUtils.save(sysStorageEvents, jdbcParam); + return failCount; + } + + private Long getIncreaseSize(JDBCParam jdbcParam, Long usedSize, String datacenter, String logType) throws Exception { + final String lastUsedSizeSql = "SELECT used_size FROM " + jdbcParam.getTable() + " WHERE log_type = '" + logType + "' and data_center = '" + datacenter + + "' and generated_time >= UNIX_TIMESTAMP() - " + STORAGE_LIMITATION + " ORDER BY generated_time DESC LIMIT 1;"; + JobUtil.infoLog(lastUsedSizeSql); + + try (Connection conn = DBUtils.getDBConn(jdbcParam)) { + Number lastUsedSize = SqlExecutor.query(conn, lastUsedSizeSql, new NumberHandler()); + if (lastUsedSize == null || lastUsedSize.longValue() <= 0 || usedSize - lastUsedSize.longValue() <= 0) { + return 0L; + } + + return usedSize - lastUsedSize.longValue(); + } catch (Exception e) { + throw e; + } + } + + /** + * @param node + * @return 自定义的标准时间 last_storage, + * 存在zookeeper 节点/storage/worker/+node + */ + private Long getLastStorage(String node) { + try { + ZookeeperUtils zk = new ZookeeperUtils(); + String nodeData = zk.getNodeData(Constant.ZOOKEEPER_STORAGE_PATH + node, zookeeperServer); + //不存在创建一个 + if (ObjectUtils.isEmpty(nodeData)) { + Long lastTime = System.currentTimeMillis() / 1000; + zk.modifyNode(Constant.ZOOKEEPER_STORAGE_PATH + node, String.valueOf(lastTime), zookeeperServer); + return lastTime; + } + Long lastStorage = Long.valueOf(nodeData); + logger.info("query standard time last_storage success,{}", lastStorage); + + return Long.valueOf(nodeData); + } catch (Exception e) { + logger.error("query standard time last_storage fail,{}", JobUtil.getErrorMsg(e)); + throw new BusinessException("query standard time last_storage fail ," + JobUtil.getErrorMsg(e)); + } + + } + + +} diff --git a/galaxy-job-executor/src/test/java/com/mesalab/executor/test/StorageQuotaTest.java b/galaxy-job-executor/src/test/java/com/mesalab/executor/test/StorageQuotaTest.java index e4387eb..454682a 100644 --- a/galaxy-job-executor/src/test/java/com/mesalab/executor/test/StorageQuotaTest.java +++ b/galaxy-job-executor/src/test/java/com/mesalab/executor/test/StorageQuotaTest.java @@ -6,8 +6,11 @@ import com.geedgenetworks.utils.StringUtil; import com.google.common.base.Splitter; import com.google.common.collect.ImmutableMap; import com.mesalab.executor.GalaxyExecutorApplication; +import com.mesalab.executor.core.utils.Constant; import com.mesalab.executor.core.utils.KafkaUtils; import com.mesalab.executor.jobhandler.LogStorageQuotaJob; +import com.mesalab.executor.pojo.JDBCParam; +import com.mesalab.executor.service.StorageQuotaInfoService; import com.mesalab.executor.service.StorageQuotaService; import lombok.extern.slf4j.Slf4j; import org.junit.Test; @@ -17,78 +20,103 @@ import org.springframework.boot.test.context.SpringBootTest; import org.springframework.test.context.junit4.SpringRunner; import org.springframework.util.Assert; +import javax.annotation.Resource; +import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Objects; import java.util.stream.Collectors; @RunWith(SpringRunner.class) -@SpringBootTest(classes = { GalaxyExecutorApplication.class }) +@SpringBootTest(classes = {GalaxyExecutorApplication.class}) @Slf4j public class StorageQuotaTest { - @Autowired StorageQuotaService storageQuotaService; + @Autowired + StorageQuotaService storageQuotaService; - @Test public void storageQuotaServiceTest() { + @Test + public void storageQuotaServiceTest() { - Map diskJson = storageQuotaService.getDiskJson(false); - Assert.notEmpty(diskJson, "存储配额获取失败"); + Map diskJson = storageQuotaService.getDiskJson(false); + Assert.notEmpty(diskJson, "存储配额获取失败"); - } + } - @Autowired + @Autowired LogStorageQuotaJob logStorageQuotaJob; - /** - * 文件清库 慎 - */ - @Test - public void delteAllFilesTest() { + /** + * 文件清库 慎 + */ + @Test + public void delteAllFilesTest() { // ReturnT<String> deleteAllFiles = logStorageQuotaJob.deleteAllFiles("{\"maxdays\":365}"); // Assert.isTrue(200==(deleteAllFiles.getCode()),"success"); - } + } - /** - * 设置文件存储策略 - */ + /** + * 设置文件存储策略 + */ // @Test // public void deleteFilesTest() { // ReturnT<String> deleteFiles = logStorageQuotaJob.deleteFiles("{\"maxdays\":365}"); // Assert.isTrue(200 == (deleteFiles.getCode()), "success"); // } + @Test + public void zookeeperTest() { + Long lastStorage = storageQuotaService.getLastStorage("Files"); + Assert.notNull(lastStorage, "获取标准时间失败"); + } - @Test - public void zookeeperTest() { - Long lastStorage = storageQuotaService.getLastStorage("Files"); - Assert.notNull(lastStorage, "获取标准时间失败"); - } - @Test - public void testS() { - String filesHosServer ="Nur-sultan|192.168.40.223:9098,Aktau|192.168.1.2:9098,Aktubinsk|,Almaty|,Atyrau|,Karaganda|,Kokshetau|,Kostanay|,Kyzylorda|,Pavlodar|,Semey|,Shymkent|,Taldykurgan|,Taraz|,Uralsk|,Ust-Kamenogorsk|,Zhezkazgan|"; + @Test + public void testS() { + String filesHosServer = "Nur-sultan|192.168.40.223:9098,Aktau|192.168.1.2:9098,Aktubinsk|,Almaty|,Atyrau|,Karaganda|,Kokshetau|,Kostanay|,Kyzylorda|,Pavlodar|,Semey|,Shymkent|,Taldykurgan|,Taraz|,Uralsk|,Ust-Kamenogorsk|,Zhezkazgan|"; // Splitter.on(",").splitToList(a).stream().map(x -> x.substring(x.indexOf("|"))); - String b ="Nur-sultan|192.168.40.223:9098"; + String b = "Nur-sultan|192.168.40.223:9098"; // String collect = Splitter.on(",").splitToList(a).stream().map(x -> x.substring(x.indexOf("|") + 1)).filter(x-> StringUtil.isNotEmpty(x)).collect(Collectors.joining(",")); - List collect = Splitter.on(",").splitToList(filesHosServer).stream().map(x->{ - List<String> dc = Splitter.on("|").splitToList(x); - if (dc.size()==2&& StringUtil.isNotBlank(dc.get(1))){ - return ImmutableMap.of(dc.get(0),dc.get(1)); - } - return null; - }).filter(Objects::nonNull).collect(Collectors.toList()); - log.info(JsonMapper.toJsonString(collect)); - } - - @Test - public void test(){ - String date = DateUtils.getDateOfYesterday("yyyyMMdd"); - long yyyyMMdd = DateUtils.convertStringToTimestamp(date, "yyyyMMdd"); - log.info(String.valueOf(yyyyMMdd)); - } - @Test - public void testStorage(){ - String msgc ="{\"log_type\":\"Files\",\"aggregate_size\":0,\"data_center\":\"Nur-sultan\",\"time\":1630569900,\"last_storage\":1622299518,\"used_size\":1217383982783,\"max_size\":25169958203392}"; - new KafkaUtils().sendMessage("SYS-STORAGE-LOG", msgc); - } + List collect = Splitter.on(",").splitToList(filesHosServer).stream().map(x -> { + List<String> dc = Splitter.on("|").splitToList(x); + if (dc.size() == 2 && StringUtil.isNotBlank(dc.get(1))) { + return ImmutableMap.of(dc.get(0), dc.get(1)); + } + return null; + }).filter(Objects::nonNull).collect(Collectors.toList()); + log.info(JsonMapper.toJsonString(collect)); + } + + @Test + public void test() { + String date = DateUtils.getDateOfYesterday("yyyyMMdd"); + long yyyyMMdd = DateUtils.convertStringToTimestamp(date, "yyyyMMdd"); + log.info(String.valueOf(yyyyMMdd)); + } + + @Test + public void testStorage() { + String msgc = "{\"log_type\":\"Files\",\"aggregate_size\":0,\"data_center\":\"Nur-sultan\",\"time\":1630569900,\"last_storage\":1622299518,\"used_size\":1217383982783,\"max_size\":25169958203392}"; + new KafkaUtils().sendMessage("SYS-STORAGE-LOG", msgc); + } + + + @Resource + StorageQuotaInfoService storageQuotaInfoService; + + @Test + public void testStorage2() { + Map<String, String> sourceParams = new HashMap<>(); + sourceParams.put("ip", "192.168.44.12"); + sourceParams.put("type", "mariadb"); + sourceParams.put("database", "tsg_olap"); + sourceParams.put("table", "sys_storage_event"); + sourceParams.put("username", "root"); + sourceParams.put("pin", "galaxy2019"); + JDBCParam jdbcParam = new JDBCParam(sourceParams); + storageQuotaInfoService.getAndSaveStorageQuotaInfo(new String[]{Constant.TRAFFIC_LOGS}, jdbcParam); + storageQuotaInfoService.getAndSaveStorageQuotaInfo(new String[]{Constant.METRICS}, jdbcParam); + storageQuotaInfoService.getAndSaveStorageQuotaInfo(new String[]{Constant.FILES}, jdbcParam); + + } } |
