summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorzhanghongqing <[email protected]>2024-06-19 11:14:52 +0800
committerzhanghongqing <[email protected]>2024-06-19 11:14:52 +0800
commit3e69996133147d1d7119f893a17c657564a8b4ec (patch)
treebc27dbe70401d7c667c9137d12850d36d1d66f5f
parent901aca4a1785db14dbb54beaee8fbd02e54176dc (diff)
[优化][存储配额查询] 优化存储配额生成数据流 TSG-21555
-rw-r--r--galaxy-job-executor/pom.xml2
-rw-r--r--galaxy-job-executor/src/main/java/com/mesalab/executor/core/utils/Constant.java3
-rw-r--r--galaxy-job-executor/src/main/java/com/mesalab/executor/core/utils/DBUtils.java158
-rw-r--r--galaxy-job-executor/src/main/java/com/mesalab/executor/jobhandler/LogStorageTtlJob.java2
-rw-r--r--galaxy-job-executor/src/main/java/com/mesalab/executor/jobhandler/StorageQuotaInfoJob.java50
-rw-r--r--galaxy-job-executor/src/main/java/com/mesalab/executor/jobhandler/ValidParamUtil.java19
-rw-r--r--galaxy-job-executor/src/main/java/com/mesalab/executor/pojo/SysStorageEvent.java29
-rw-r--r--galaxy-job-executor/src/main/java/com/mesalab/executor/service/DataSinkService.java10
-rw-r--r--galaxy-job-executor/src/main/java/com/mesalab/executor/service/StorageQuotaInfoService.java269
-rw-r--r--galaxy-job-executor/src/test/java/com/mesalab/executor/test/StorageQuotaTest.java120
10 files changed, 599 insertions, 63 deletions
diff --git a/galaxy-job-executor/pom.xml b/galaxy-job-executor/pom.xml
index 86b1e39..325572c 100644
--- a/galaxy-job-executor/pom.xml
+++ b/galaxy-job-executor/pom.xml
@@ -209,7 +209,7 @@
<JAR_FILE>${project.build.finalName}.xjar</JAR_FILE>
</buildArgs>
<imageTags>
- <imageTag>2.2.0.3</imageTag>
+ <imageTag>2.2.0.4</imageTag>
</imageTags>
<resources>
<resource>
diff --git a/galaxy-job-executor/src/main/java/com/mesalab/executor/core/utils/Constant.java b/galaxy-job-executor/src/main/java/com/mesalab/executor/core/utils/Constant.java
index 661087f..30c59e5 100644
--- a/galaxy-job-executor/src/main/java/com/mesalab/executor/core/utils/Constant.java
+++ b/galaxy-job-executor/src/main/java/com/mesalab/executor/core/utils/Constant.java
@@ -6,11 +6,12 @@ public class Constant {
public static final String FILES = "Files";
public static final String REPORT_AND_METRICS = "Report and Metrics";
public static final String TRAFFIC_LOGS = "Traffic Logs";
+ public static final String METRICS = "Metrics";
// zookeeper /path+node
public static final String ZK_TRAFFIC_LOGS = "Traffic-Logs";
public static final String ZK_REPORT_AND_METRICS = "Report-and-Metrics";
-
+ public static final String ZOOKEEPER_STORAGE_PATH = "/storage/worker/";
public static final String TOKEN = "Token";
public static final String TEXT_XML = "text/xml";
diff --git a/galaxy-job-executor/src/main/java/com/mesalab/executor/core/utils/DBUtils.java b/galaxy-job-executor/src/main/java/com/mesalab/executor/core/utils/DBUtils.java
index 5e57dcd..c134ba2 100644
--- a/galaxy-job-executor/src/main/java/com/mesalab/executor/core/utils/DBUtils.java
+++ b/galaxy-job-executor/src/main/java/com/mesalab/executor/core/utils/DBUtils.java
@@ -1,6 +1,7 @@
package com.mesalab.executor.core.utils;
import cn.hutool.core.io.IoUtil;
+import cn.hutool.core.util.ObjectUtil;
import cn.hutool.core.util.StrUtil;
import cn.hutool.log.Log;
import com.mesalab.executor.pojo.JDBCParam;
@@ -8,9 +9,8 @@ import com.zaxxer.hikari.HikariConfig;
import com.zaxxer.hikari.HikariDataSource;
import org.springframework.util.ObjectUtils;
-import java.sql.Connection;
-import java.sql.DriverManager;
-import java.sql.SQLException;
+import java.lang.reflect.Field;
+import java.sql.*;
import java.util.List;
import java.util.Map;
@@ -19,6 +19,7 @@ public class DBUtils {
private static Log logger = Log.get();
private static HikariDataSource hikariDataSource;
private static String dbParams = "?useUnicode=true&serverTimezone=UTC&zeroDateTimeBehavior=convertToNull&characterEncoding=UTF-8&autoReconnect=true&failOverReadOnly=false&tinyInt1isBit=false&transformedBitIsBoolean=false";
+ private final static int batchSize = 10000;
public static Connection getDBConn(JDBCParam jdbcParam) throws SQLException {
String url = "jdbc:mysql://" + jdbcParam.getIp() + ":3306/" + jdbcParam.getDatabase() + dbParams;
@@ -72,7 +73,7 @@ public class DBUtils {
* @param transformResult
* @param tableName
*/
- public String getInsertSql(List<Map> transformResult, String tableName) {
+ public static String getInsertSql(List<Map> transformResult, String tableName) {
String sql = "";
String sqlStr1 = "INSERT IGNORE INTO " + tableName + " (";
String sqlStr2 = ") VALUES (";
@@ -109,7 +110,7 @@ public class DBUtils {
/**
* update table set a=? where conditions;
*/
- public String getUpdateSql(List<Map> transformResult, String tableName, String[]... conditions) {
+ public static String getUpdateSql(List<Map> transformResult, String tableName, String[]... conditions) {
String sqlKV = "";
for (Object key : transformResult.get(0).keySet()) {
if (!"id".equals(key)) {
@@ -131,6 +132,7 @@ public class DBUtils {
/**
* 构建 INSERT INTO ... ON DUPLICATE KEY UPDATE
+ *
* @param transformResult
* @param tableName
* @return
@@ -139,7 +141,7 @@ public class DBUtils {
StringBuilder columns = new StringBuilder();
StringBuilder values = new StringBuilder();
StringBuilder updateStatements = new StringBuilder();
- Map<String,Object> data = transformResult.get(0);
+ Map<String, Object> data = transformResult.get(0);
for (Map.Entry<String, Object> entry : data.entrySet()) {
String column = entry.getKey();
if (!"id".equalsIgnoreCase(column)) {
@@ -157,4 +159,148 @@ public class DBUtils {
}
+
+ public void saveByMap(List<Map> dataList, JDBCParam jdbcParam) throws Exception {
+ if (ObjectUtil.isEmpty(dataList)) {
+ logger.info("save data is empty ");
+ return;
+ }
+ Connection conn = null;
+ PreparedStatement pst = null;
+ try {
+ String tableName = jdbcParam.getTable();
+ conn = DBUtils.getDBConn(jdbcParam);
+ long start = System.currentTimeMillis();
+ conn.setAutoCommit(false);
+ pst = conn.prepareStatement(getInsertSql(dataList, tableName));
+ int count = 0;
+ int index = 1;
+ for (int i = 1; i <= dataList.size(); i++) {
+ for (Object val : dataList.get(i - 1).values()) {
+ index = setPstVal(pst, index, val);
+ }
+ index = 1;
+ pst.addBatch();
+ //批量提交
+ if (i % batchSize == 0) {
+ int[] ints = pst.executeBatch();
+ count = count + ints.length;
+ conn.commit();
+ pst.clearBatch();
+ }
+ }
+ int[] ints = pst.executeBatch();
+ count = count + ints.length;
+ conn.commit();
+ JobUtil.infoLog("save mysql table {}, count {}, take {},", tableName, count, (System.currentTimeMillis() - start) + "ms");
+ } catch (Exception e) {
+ if (conn != null) {
+ try {
+ conn.rollback();
+ } catch (SQLException e1) {
+ logger.error(e1);
+ }
+ }
+ logger.error(e);
+ throw e;
+ } finally {
+ IoUtil.close(pst);
+ IoUtil.close(conn);
+ }
+ }
+
+ private int setPstVal(PreparedStatement pst, int index, Object val) throws SQLException {
+ if (val instanceof String) {
+ if (val == null) {
+ pst.setNull(index++, Types.VARCHAR);
+ } else {
+ pst.setString((index++), StrUtil.toString(val));
+ }
+ } else if (val instanceof Integer) {
+ if (val == null) {
+ pst.setNull(index++, Types.INTEGER);
+ } else {
+ pst.setInt((index++), Integer.valueOf(StrUtil.toString(val)));
+ }
+ } else if (val instanceof Long) {
+ if (val == null) {
+ pst.setNull(index++, Types.BIGINT);
+ } else {
+ pst.setLong((index++), Long.valueOf(StrUtil.toString(val)));
+ }
+ } else if (val instanceof Boolean) {
+ if (val == null) {
+ pst.setNull(index++, Types.BOOLEAN);
+ } else {
+ pst.setBoolean((index++), Boolean.valueOf(StrUtil.toString(val)));
+ }
+ } else {
+ pst.setObject((index++), val);
+ }
+ return index;
+ }
+
+ public static <T> void save(List<T> dataList, JDBCParam jdbcParam) throws Exception {
+ if (dataList == null || dataList.isEmpty()) {
+ logger.info("Data list is empty, nothing to save.");
+ return;
+ }
+ try (Connection conn = DBUtils.getDBConn(jdbcParam)) {
+ conn.setAutoCommit(false);
+ String sql = prepareInsertSql(dataList.get(0), jdbcParam.getTable());
+ try (PreparedStatement pst = conn.prepareStatement(sql)) {
+ int count = executeBatchInsert(dataList, pst);
+ conn.commit();
+ logger.info("Saved {} records to table {} successfully.", count, jdbcParam.getTable());
+ } catch (SQLException e) {
+ conn.rollback();
+ logger.error("Error executing batch insert, transaction rolled back.", e);
+ throw e;
+ }
+ } catch (Exception e) {
+ logger.error("Failed to save data.", e);
+ throw e;
+ }
+ }
+
+ private static <T> int executeBatchInsert(List<T> dataList, PreparedStatement pst) throws IllegalAccessException, SQLException {
+ int count = 0;
+ for (int i = 0; i < dataList.size(); i++) {
+ T item = dataList.get(i);
+ setPreparedStatementParameters(pst, item);
+ pst.addBatch();
+ if ((i + 1) % batchSize == 0 || i == dataList.size() - 1) {
+ count += executeAndClearBatch(pst);
+ }
+ }
+ return count;
+ }
+
+ private static <T> void setPreparedStatementParameters(PreparedStatement pst, T item) throws IllegalAccessException, SQLException {
+ Field[] fields = item.getClass().getDeclaredFields();
+ for (int j = 0; j < fields.length; j++) {
+ fields[j].setAccessible(true);
+ pst.setObject(j + 1, fields[j].get(item));
+ }
+ }
+
+ private static int executeAndClearBatch(PreparedStatement pst) throws SQLException {
+ int[] results = pst.executeBatch();
+ pst.clearBatch();
+ return results.length;
+ }
+
+ private static <T> String prepareInsertSql(T item, String tableName) {
+ StringBuilder columns = new StringBuilder();
+ StringBuilder placeholders = new StringBuilder();
+ Field[] fields = item.getClass().getDeclaredFields();
+ for (Field field : fields) {
+            columns.append(StrUtil.toUnderlineCase(field.getName())).append(",");
+ placeholders.append("?,");
+ }
+ columns.setLength(columns.length() - 1);
+ placeholders.setLength(placeholders.length() - 1);
+ return String.format("INSERT INTO %s (%s) VALUES (%s)", tableName, columns, placeholders);
+ }
+
}
diff --git a/galaxy-job-executor/src/main/java/com/mesalab/executor/jobhandler/LogStorageTtlJob.java b/galaxy-job-executor/src/main/java/com/mesalab/executor/jobhandler/LogStorageTtlJob.java
index 4db39a3..0abc2b9 100644
--- a/galaxy-job-executor/src/main/java/com/mesalab/executor/jobhandler/LogStorageTtlJob.java
+++ b/galaxy-job-executor/src/main/java/com/mesalab/executor/jobhandler/LogStorageTtlJob.java
@@ -193,7 +193,6 @@ public class LogStorageTtlJob {
deleteParamMap.put("database", deletionConfig.getTrafficDatasource());
deleteParamMap.put("password", deletionConfig.getTrafficUserKey());
deleteParamMap.put("user", deletionConfig.getTrafficUsername());
- // deleteParamMap.put("password", "galaxy2019");
return deleteParamMap;
}
@@ -207,7 +206,6 @@ public class LogStorageTtlJob {
Map<String, String> dropTtltables = new HashMap<>();
Configuration conf = Configuration.defaultConfiguration();
Configuration conf2 = conf.addOptions(Option.DEFAULT_PATH_LEAF_TO_NULL);
- // Map<String, Integer> tableobj = new HashMap<>();
for (String tablename : tablesForCKs) {
try {
diff --git a/galaxy-job-executor/src/main/java/com/mesalab/executor/jobhandler/StorageQuotaInfoJob.java b/galaxy-job-executor/src/main/java/com/mesalab/executor/jobhandler/StorageQuotaInfoJob.java
new file mode 100644
index 0000000..bf8078a
--- /dev/null
+++ b/galaxy-job-executor/src/main/java/com/mesalab/executor/jobhandler/StorageQuotaInfoJob.java
@@ -0,0 +1,50 @@
+package com.mesalab.executor.jobhandler;
+
+import cn.hutool.core.bean.BeanUtil;
+import cn.hutool.core.util.StrUtil;
+import com.mesalab.executor.core.utils.JobUtil;
+import com.mesalab.executor.pojo.JDBCParam;
+import com.mesalab.executor.service.StorageQuotaInfoService;
+import com.xxl.job.core.biz.model.ReturnT;
+import com.xxl.job.core.handler.annotation.XxlJob;
+import org.springframework.stereotype.Component;
+
+import javax.annotation.Resource;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * @Description:
+ * @Author: zhq
+ * @CreateDate: 2024/6/13
+ * @Version: 1.0
+ */
+
+@Component
+public class StorageQuotaInfoJob {
+
+ @Resource
+ private StorageQuotaInfoService storageQuotaInfoService;
+
+
+ @XxlJob("getDatabaseStorageQuotaJobHandler")
+ public ReturnT<String> getDatabaseStorageQuotaJobHandler(String params) {
+
+ int failCount = 0;
+ try {
+ List<Map> paramsMaps = ValidParamUtil.parserParamsSink(params);
+ for (Map paramsMap : paramsMaps) {
+ Map<String, Object> source = BeanUtil.beanToMap(paramsMap.get("source"));
+ String[] items = StrUtil.split(source.get("items").toString(), ",");
+ JDBCParam jdbcParam = BeanUtil.toBean(paramsMap.get("sink"), JDBCParam.class);
+
+ failCount += storageQuotaInfoService.getAndSaveStorageQuotaInfo(items, jdbcParam);
+
+ }
+ } catch (Exception e) {
+ failCount++;
+ JobUtil.errorLog("getDatabaseStorageQuota job error:", e.getMessage());
+ }
+ return failCount > 0 ? ReturnT.FAIL : ReturnT.SUCCESS;
+ }
+}
diff --git a/galaxy-job-executor/src/main/java/com/mesalab/executor/jobhandler/ValidParamUtil.java b/galaxy-job-executor/src/main/java/com/mesalab/executor/jobhandler/ValidParamUtil.java
index 937895a..ce41541 100644
--- a/galaxy-job-executor/src/main/java/com/mesalab/executor/jobhandler/ValidParamUtil.java
+++ b/galaxy-job-executor/src/main/java/com/mesalab/executor/jobhandler/ValidParamUtil.java
@@ -36,5 +36,22 @@ public class ValidParamUtil {
}
return paramList;
}
-
+ public static List<Map> parserParamsSink(String params) {
+ if (StrUtil.isBlank(params)) {
+ XxlJobLogger.log("params is empty!");
+ return null;
+ }
+ List<Map> paramList = JSONUtil.toList(params, Map.class);
+ for (Map<String, Map<String, String>> param : paramList) {
+ if (ObjectUtil.isNull(param)) {
+ XxlJobLogger.log("params is not format json ! ");
+ return null;
+ }
+ if (ObjectUtil.isNull(param.get("sink"))) {
+ XxlJobLogger.log("params sink is null ! ");
+ return null;
+ }
+ }
+ return paramList;
+ }
}
diff --git a/galaxy-job-executor/src/main/java/com/mesalab/executor/pojo/SysStorageEvent.java b/galaxy-job-executor/src/main/java/com/mesalab/executor/pojo/SysStorageEvent.java
new file mode 100644
index 0000000..36bf082
--- /dev/null
+++ b/galaxy-job-executor/src/main/java/com/mesalab/executor/pojo/SysStorageEvent.java
@@ -0,0 +1,29 @@
+package com.mesalab.executor.pojo;
+
+import lombok.AllArgsConstructor;
+import lombok.Builder;
+import lombok.Data;
+import lombok.NoArgsConstructor;
+
+/**
+ * @Description:
+ * @Author: zhq
+ * @CreateDate: 2024/6/13
+ * @Version: 1.0
+ */
+@Data
+@Builder
+@NoArgsConstructor
+@AllArgsConstructor
+public class SysStorageEvent {
+
+ private Long id;
+ private String logType;
+ private String dataCenter;
+ private Long totalAllocatedSize;
+ private Long usedSize;
+ private Long bytes;
+ private Long sinceTime;
+ private Long generatedTime;
+
+}
diff --git a/galaxy-job-executor/src/main/java/com/mesalab/executor/service/DataSinkService.java b/galaxy-job-executor/src/main/java/com/mesalab/executor/service/DataSinkService.java
index 505369f..c6e0ed9 100644
--- a/galaxy-job-executor/src/main/java/com/mesalab/executor/service/DataSinkService.java
+++ b/galaxy-job-executor/src/main/java/com/mesalab/executor/service/DataSinkService.java
@@ -110,22 +110,20 @@ public class DataSinkService {
return;
}
Connection conn = null;
- DBUtils dbUtils = null;
PreparedStatement pst = null;
try {
JDBCParam jdbcParam = new JDBCParam(sinkParams);
String tableName = jdbcParam.getTable();
- dbUtils = new DBUtils();
//创建数据库连接库对象
- conn = dbUtils.getDBConn(jdbcParam);
+ conn = DBUtils.getDBConn(jdbcParam);
String sql = "";
if ("update".equals(sinkParams.get("option"))) {
- sql = dbUtils.getUpdateSql(transformResult, tableName, parseConditions(sinkParams));
+ sql = DBUtils.getUpdateSql(transformResult, tableName, parseConditions(sinkParams));
} else if ("insertOrUpdate".equals(sinkParams.get("option"))) {
- sql = dbUtils.getInsertOrUpdateSql(transformResult, tableName);
+ sql = DBUtils.getInsertOrUpdateSql(transformResult, tableName);
} else {
- sql = dbUtils.getInsertSql(transformResult, tableName);
+ sql = DBUtils.getInsertSql(transformResult, tableName);
}
long start = System.currentTimeMillis();
conn.setAutoCommit(false);
diff --git a/galaxy-job-executor/src/main/java/com/mesalab/executor/service/StorageQuotaInfoService.java b/galaxy-job-executor/src/main/java/com/mesalab/executor/service/StorageQuotaInfoService.java
new file mode 100644
index 0000000..1628922
--- /dev/null
+++ b/galaxy-job-executor/src/main/java/com/mesalab/executor/service/StorageQuotaInfoService.java
@@ -0,0 +1,269 @@
+package com.mesalab.executor.service;
+
+import cn.hutool.core.date.DateUtil;
+import cn.hutool.core.util.StrUtil;
+import cn.hutool.db.handler.NumberHandler;
+import cn.hutool.db.sql.SqlExecutor;
+import cn.hutool.log.Log;
+import com.google.common.collect.Lists;
+import com.mesalab.executor.core.config.StorgeConfig;
+import com.mesalab.executor.core.utils.*;
+import com.mesalab.executor.exception.BusinessException;
+import com.mesalab.executor.pojo.HosSpace;
+import com.mesalab.executor.pojo.JDBCParam;
+import com.mesalab.executor.pojo.SysStorageEvent;
+import org.apache.http.Header;
+import org.apache.http.message.BasicHeader;
+import org.springframework.beans.factory.annotation.Value;
+import org.springframework.http.HttpHeaders;
+import org.springframework.stereotype.Service;
+import org.springframework.util.ObjectUtils;
+
+import java.sql.Connection;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * @Description:
+ * @Author: zhq
+ * @CreateDate: 2024/6/13
+ * @Version: 1.0
+ */
+
+@Service
+public class StorageQuotaInfoService {
+
+ private StorgeConfig storgeConfig = (StorgeConfig) SpringContextUtil.getBean("storgeConfig");
+ private Log logger = Log.get();
+
+    //查询历史数据时允许的最大间隔 120 分钟（常量单位为秒：120 * 60）
+ private static final int STORAGE_LIMITATION = 120 * 60;
+
+ @Value("${zookeeper.server}")
+ private String zookeeperServer;
+
+
+ public int getAndSaveStorageQuotaInfo(String[] logTypes, JDBCParam jdbcParam) {
+
+ int failCount = 0;
+ for (String logType : logTypes) {
+ try {
+ switch (logType) {
+ case Constant.TRAFFIC_LOGS:
+ failCount += getAndSaveClickhouseStorageInfo(jdbcParam);
+ break;
+ case Constant.METRICS:
+ failCount += getAndSaveDruidStorageInfo(jdbcParam);
+ break;
+ case Constant.FILES:
+ failCount += getAndSaveHosStorageInfo(jdbcParam);
+ break;
+ default:
+ break;
+ }
+
+ } catch (Exception e) {
+ failCount++;
+ JobUtil.errorLog(e.getMessage());
+ }
+ }
+ return failCount;
+ }
+
+ private int getAndSaveClickhouseStorageInfo(JDBCParam jdbcParam) throws Exception {
+
+ final String maxSizeSql = StrUtil.format("SELECT SUM(`total_space`) FROM {};", storgeConfig.getSystemDisks());
+ final String usedSizeSql = StrUtil.format("SELECT SUM(`bytes_on_disk`) FROM {} WHERE database = '{}' ;", storgeConfig.getSystemPartsCluster(), storgeConfig.getTrafficDatasource());
+
+ long generatedTime = DateUtil.currentSeconds();
+ Long sinceTime = getLastStorage(Constant.ZK_TRAFFIC_LOGS);
+ List<SysStorageEvent> sysStorageEvents = Lists.newArrayList();
+ int failCount = 0;
+ for (Map.Entry<String, String> datacenterMap : storgeConfig.getTrafficDataCenter().entrySet()) {
+ try {
+ String datacenterHost = datacenterMap.getValue();
+ String datacenterName = datacenterMap.getKey();
+ Map<String, Object> ckParamMap = storgeConfig.getCkSource();
+ // 1. 总计
+ ckParamMap.put("query", maxSizeSql);
+ String totalResult = HttpClientUtils.httpGet(HttpClientUtils.getUrlWithParams(UrlUtil.getUrl(datacenterHost), ckParamMap));
+ //2. 已使用
+ ckParamMap.put("query", usedSizeSql);
+ String usedResult = HttpClientUtils.httpGet(HttpClientUtils.getUrlWithParams(UrlUtil.getUrl(datacenterHost), ckParamMap));
+
+ if ("-1".equals(totalResult) || "-1".equals(usedResult)) {
+ throw new BusinessException("Get clickhouse http fail -1");
+ }
+
+ Long totalSize = Long.valueOf(totalResult.trim());
+ Long usedSize = Long.valueOf(usedResult.trim());
+
+ //3. 增量
+ Long increaseSize = getIncreaseSize(jdbcParam, usedSize, datacenterMap.getKey(), Constant.TRAFFIC_LOGS);
+ SysStorageEvent storageEvent = SysStorageEvent.builder()
+ .logType(Constant.TRAFFIC_LOGS)
+ .dataCenter(datacenterName)
+ .generatedTime(generatedTime)
+ .totalAllocatedSize(totalSize)
+ .usedSize(usedSize)
+ .bytes(increaseSize)
+ .sinceTime(sinceTime)
+ .build();
+
+ sysStorageEvents.add(storageEvent);
+ JobUtil.infoLog("Get clickhouse storage info : datacenter {}, max_size {}, used_size {}, bytes {}.",
+ datacenterName, totalSize, usedSize, increaseSize);
+ } catch (Exception e) {
+ failCount++;
+ JobUtil.errorLog("Get clickhouse storage error : datacenter {}, message {}", datacenterMap.getKey(), e.getMessage());
+ }
+ }
+ DBUtils.save(sysStorageEvents, jdbcParam);
+ return failCount;
+ }
+
+ private int getAndSaveDruidStorageInfo(JDBCParam jdbcParam) throws Exception {
+
+ final String maxSizeSql = "{\"query\":\"SELECT SUM(max_size) AS max_size FROM sys.servers WHERE server_type = 'historical'\",\"context\":{\"skipEmptyBuckets\":\"false\"},\"resultFormat\":\"csv\"}";
+ final String usedSizeSql = "{\"query\":\"SELECT SUM(size) AS used_size FROM sys.segments WHERE datasource NOT LIKE '%hot%' and is_published = 1 and is_overshadowed = 0\",\"context\":{\"skipEmptyBuckets\":\"false\"},\"resultFormat\":\"csv\"}";
+ final String druidPath = "/druid/v2/sql";
+
+ long generatedTime = DateUtil.currentSeconds();
+        Long sinceTime = getLastStorage(Constant.ZK_REPORT_AND_METRICS);
+ List<SysStorageEvent> sysStorageEvents = Lists.newArrayList();
+ int failCount = 0;
+ for (Map.Entry<String, String> datacenterMap : storgeConfig.getAnalyticDataCenter().entrySet()) {
+ try {
+ String datacenterHost = datacenterMap.getValue();
+ String datacenterName = datacenterMap.getKey();
+ // 1. 总计
+ String totalResult = HttpClientUtils.httpPost(UrlUtil.getUrl(datacenterHost).concat(druidPath), maxSizeSql);
+ //2. 已使用
+ String usedResult = HttpClientUtils.httpPost(UrlUtil.getUrl(datacenterHost).concat(druidPath), usedSizeSql);
+
+ if ("-1".equals(totalResult) || "-1".equals(usedResult)) {
+ throw new BusinessException("Get druid http fail -1");
+ }
+
+ Long totalSize = Long.valueOf(totalResult.trim());
+ Long usedSize = Long.valueOf(usedResult.trim());
+
+ //3. 增量
+ Long increaseSize = getIncreaseSize(jdbcParam, usedSize, datacenterMap.getKey(), Constant.METRICS);
+ SysStorageEvent storageEvent = SysStorageEvent.builder()
+ .logType(Constant.METRICS)
+ .dataCenter(datacenterName)
+ .generatedTime(generatedTime)
+ .totalAllocatedSize(totalSize)
+ .usedSize(usedSize)
+ .bytes(increaseSize)
+ .sinceTime(sinceTime)
+ .build();
+
+ sysStorageEvents.add(storageEvent);
+ JobUtil.infoLog("Get druid storage info : datacenter {}, max_size {}, used_size {}, bytes {}.",
+ datacenterName, totalSize, usedSize, increaseSize);
+ } catch (Exception e) {
+ failCount++;
+ JobUtil.errorLog("Get druid storage error : datacenter {}, message {}", datacenterMap.getKey(), e.getMessage());
+ }
+ }
+ DBUtils.save(sysStorageEvents, jdbcParam);
+ return failCount;
+ }
+
+ private int getAndSaveHosStorageInfo(JDBCParam jdbcParam) throws Exception {
+
+ final String fileStoragePath = "/admin/diskspace";
+
+ long generatedTime = DateUtil.currentSeconds();
+ Long sinceTime = getLastStorage(Constant.FILES);
+ List<SysStorageEvent> sysStorageEvents = Lists.newArrayList();
+ int failCount = 0;
+ for (Map.Entry<String, String> datacenterMap : storgeConfig.getFilesDataCenter().entrySet()) {
+ try {
+ String datacenterHost = datacenterMap.getValue();
+ String datacenterName = datacenterMap.getKey();
+
+ Header[] headers = {new BasicHeader(Constant.TOKEN, storgeConfig.getFilesToken()), new BasicHeader(HttpHeaders.CONTENT_TYPE, Constant.TEXT_XML)};
+ String result = HttpClientUtils.httpGet(UrlUtil.getUrl(datacenterHost).concat(fileStoragePath), headers);
+
+                if ("-1".equals(result)) {
+ throw new BusinessException("Get hos http request fail -1 , server: " + datacenterName);
+ }
+ HosSpace hosSpace = XmlUtil.converXmlToBean(HosSpace.class, result);
+ Long totalSize = hosSpace.getHosCapacity();
+ Long usedSize = hosSpace.getHosUsed();
+ if (totalSize == -1 || usedSize == -1) {
+ throw new BusinessException("hos server error server: " + datacenterName);
+ }
+
+ //3. 增量
+ Long increaseSize = getIncreaseSize(jdbcParam, usedSize, datacenterMap.getKey(), Constant.FILES);
+ SysStorageEvent storageEvent = SysStorageEvent.builder()
+ .logType(Constant.FILES)
+ .dataCenter(datacenterName)
+ .generatedTime(generatedTime)
+ .totalAllocatedSize(totalSize)
+ .usedSize(usedSize)
+ .bytes(increaseSize)
+ .sinceTime(sinceTime)
+ .build();
+
+ sysStorageEvents.add(storageEvent);
+ JobUtil.infoLog("Get hos storage info : datacenter {}, max_size {}, used_size {}, bytes {}.",
+ datacenterName, totalSize, usedSize, increaseSize);
+ } catch (Exception e) {
+ failCount++;
+ JobUtil.errorLog("Get hos storage error : datacenter {}, message {}", datacenterMap.getKey(), e.getMessage());
+ }
+ }
+ DBUtils.save(sysStorageEvents, jdbcParam);
+ return failCount;
+ }
+
+ private Long getIncreaseSize(JDBCParam jdbcParam, Long usedSize, String datacenter, String logType) throws Exception {
+ final String lastUsedSizeSql = "SELECT used_size FROM " + jdbcParam.getTable() + " WHERE log_type = '" + logType + "' and data_center = '" + datacenter
+ + "' and generated_time >= UNIX_TIMESTAMP() - " + STORAGE_LIMITATION + " ORDER BY generated_time DESC LIMIT 1;";
+ JobUtil.infoLog(lastUsedSizeSql);
+
+ try (Connection conn = DBUtils.getDBConn(jdbcParam)) {
+ Number lastUsedSize = SqlExecutor.query(conn, lastUsedSizeSql, new NumberHandler());
+ if (lastUsedSize == null || lastUsedSize.longValue() <= 0 || usedSize - lastUsedSize.longValue() <= 0) {
+ return 0L;
+ }
+
+ return usedSize - lastUsedSize.longValue();
+ } catch (Exception e) {
+ throw e;
+ }
+ }
+
+ /**
+ * @param node
+ * @return 自定义的标准时间 last_storage,
+ * 存在zookeeper 节点/storage/worker/+node
+ */
+ private Long getLastStorage(String node) {
+ try {
+ ZookeeperUtils zk = new ZookeeperUtils();
+ String nodeData = zk.getNodeData(Constant.ZOOKEEPER_STORAGE_PATH + node, zookeeperServer);
+ //不存在创建一个
+ if (ObjectUtils.isEmpty(nodeData)) {
+ Long lastTime = System.currentTimeMillis() / 1000;
+ zk.modifyNode(Constant.ZOOKEEPER_STORAGE_PATH + node, String.valueOf(lastTime), zookeeperServer);
+ return lastTime;
+ }
+ Long lastStorage = Long.valueOf(nodeData);
+ logger.info("query standard time last_storage success,{}", lastStorage);
+
+            return lastStorage;
+ } catch (Exception e) {
+ logger.error("query standard time last_storage fail,{}", JobUtil.getErrorMsg(e));
+ throw new BusinessException("query standard time last_storage fail ," + JobUtil.getErrorMsg(e));
+ }
+
+ }
+
+
+}
diff --git a/galaxy-job-executor/src/test/java/com/mesalab/executor/test/StorageQuotaTest.java b/galaxy-job-executor/src/test/java/com/mesalab/executor/test/StorageQuotaTest.java
index e4387eb..454682a 100644
--- a/galaxy-job-executor/src/test/java/com/mesalab/executor/test/StorageQuotaTest.java
+++ b/galaxy-job-executor/src/test/java/com/mesalab/executor/test/StorageQuotaTest.java
@@ -6,8 +6,11 @@ import com.geedgenetworks.utils.StringUtil;
import com.google.common.base.Splitter;
import com.google.common.collect.ImmutableMap;
import com.mesalab.executor.GalaxyExecutorApplication;
+import com.mesalab.executor.core.utils.Constant;
import com.mesalab.executor.core.utils.KafkaUtils;
import com.mesalab.executor.jobhandler.LogStorageQuotaJob;
+import com.mesalab.executor.pojo.JDBCParam;
+import com.mesalab.executor.service.StorageQuotaInfoService;
import com.mesalab.executor.service.StorageQuotaService;
import lombok.extern.slf4j.Slf4j;
import org.junit.Test;
@@ -17,78 +20,103 @@ import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;
import org.springframework.util.Assert;
+import javax.annotation.Resource;
+import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.stream.Collectors;
@RunWith(SpringRunner.class)
-@SpringBootTest(classes = { GalaxyExecutorApplication.class })
+@SpringBootTest(classes = {GalaxyExecutorApplication.class})
@Slf4j
public class StorageQuotaTest {
- @Autowired StorageQuotaService storageQuotaService;
+ @Autowired
+ StorageQuotaService storageQuotaService;
- @Test public void storageQuotaServiceTest() {
+ @Test
+ public void storageQuotaServiceTest() {
- Map diskJson = storageQuotaService.getDiskJson(false);
- Assert.notEmpty(diskJson, "存储配额获取失败");
+ Map diskJson = storageQuotaService.getDiskJson(false);
+ Assert.notEmpty(diskJson, "存储配额获取失败");
- }
+ }
- @Autowired
+ @Autowired
LogStorageQuotaJob logStorageQuotaJob;
- /**
- * 文件清库 慎
- */
- @Test
- public void delteAllFilesTest() {
+ /**
+ * 文件清库 慎
+ */
+ @Test
+ public void delteAllFilesTest() {
// ReturnT<String> deleteAllFiles = logStorageQuotaJob.deleteAllFiles("{\"maxdays\":365}");
// Assert.isTrue(200==(deleteAllFiles.getCode()),"success");
- }
+ }
- /**
- * 设置文件存储策略
- */
+ /**
+ * 设置文件存储策略
+ */
// @Test
// public void deleteFilesTest() {
// ReturnT<String> deleteFiles = logStorageQuotaJob.deleteFiles("{\"maxdays\":365}");
// Assert.isTrue(200 == (deleteFiles.getCode()), "success");
// }
+ @Test
+ public void zookeeperTest() {
+ Long lastStorage = storageQuotaService.getLastStorage("Files");
+ Assert.notNull(lastStorage, "获取标准时间失败");
+ }
- @Test
- public void zookeeperTest() {
- Long lastStorage = storageQuotaService.getLastStorage("Files");
- Assert.notNull(lastStorage, "获取标准时间失败");
- }
- @Test
- public void testS() {
- String filesHosServer ="Nur-sultan|192.168.40.223:9098,Aktau|192.168.1.2:9098,Aktubinsk|,Almaty|,Atyrau|,Karaganda|,Kokshetau|,Kostanay|,Kyzylorda|,Pavlodar|,Semey|,Shymkent|,Taldykurgan|,Taraz|,Uralsk|,Ust-Kamenogorsk|,Zhezkazgan|";
+ @Test
+ public void testS() {
+ String filesHosServer = "Nur-sultan|192.168.40.223:9098,Aktau|192.168.1.2:9098,Aktubinsk|,Almaty|,Atyrau|,Karaganda|,Kokshetau|,Kostanay|,Kyzylorda|,Pavlodar|,Semey|,Shymkent|,Taldykurgan|,Taraz|,Uralsk|,Ust-Kamenogorsk|,Zhezkazgan|";
// Splitter.on(",").splitToList(a).stream().map(x -> x.substring(x.indexOf("|")));
- String b ="Nur-sultan|192.168.40.223:9098";
+ String b = "Nur-sultan|192.168.40.223:9098";
// String collect = Splitter.on(",").splitToList(a).stream().map(x -> x.substring(x.indexOf("|") + 1)).filter(x-> StringUtil.isNotEmpty(x)).collect(Collectors.joining(","));
- List collect = Splitter.on(",").splitToList(filesHosServer).stream().map(x->{
- List<String> dc = Splitter.on("|").splitToList(x);
- if (dc.size()==2&& StringUtil.isNotBlank(dc.get(1))){
- return ImmutableMap.of(dc.get(0),dc.get(1));
- }
- return null;
- }).filter(Objects::nonNull).collect(Collectors.toList());
- log.info(JsonMapper.toJsonString(collect));
- }
-
- @Test
- public void test(){
- String date = DateUtils.getDateOfYesterday("yyyyMMdd");
- long yyyyMMdd = DateUtils.convertStringToTimestamp(date, "yyyyMMdd");
- log.info(String.valueOf(yyyyMMdd));
- }
- @Test
- public void testStorage(){
- String msgc ="{\"log_type\":\"Files\",\"aggregate_size\":0,\"data_center\":\"Nur-sultan\",\"time\":1630569900,\"last_storage\":1622299518,\"used_size\":1217383982783,\"max_size\":25169958203392}";
- new KafkaUtils().sendMessage("SYS-STORAGE-LOG", msgc);
- }
+ List collect = Splitter.on(",").splitToList(filesHosServer).stream().map(x -> {
+ List<String> dc = Splitter.on("|").splitToList(x);
+ if (dc.size() == 2 && StringUtil.isNotBlank(dc.get(1))) {
+ return ImmutableMap.of(dc.get(0), dc.get(1));
+ }
+ return null;
+ }).filter(Objects::nonNull).collect(Collectors.toList());
+ log.info(JsonMapper.toJsonString(collect));
+ }
+
+ @Test
+ public void test() {
+ String date = DateUtils.getDateOfYesterday("yyyyMMdd");
+ long yyyyMMdd = DateUtils.convertStringToTimestamp(date, "yyyyMMdd");
+ log.info(String.valueOf(yyyyMMdd));
+ }
+
+ @Test
+ public void testStorage() {
+ String msgc = "{\"log_type\":\"Files\",\"aggregate_size\":0,\"data_center\":\"Nur-sultan\",\"time\":1630569900,\"last_storage\":1622299518,\"used_size\":1217383982783,\"max_size\":25169958203392}";
+ new KafkaUtils().sendMessage("SYS-STORAGE-LOG", msgc);
+ }
+
+
+ @Resource
+ StorageQuotaInfoService storageQuotaInfoService;
+
+ @Test
+ public void testStorage2() {
+ Map<String, String> sourceParams = new HashMap<>();
+ sourceParams.put("ip", "192.168.44.12");
+ sourceParams.put("type", "mariadb");
+ sourceParams.put("database", "tsg_olap");
+ sourceParams.put("table", "sys_storage_event");
+ sourceParams.put("username", "root");
+ sourceParams.put("pin", "galaxy2019");
+ JDBCParam jdbcParam = new JDBCParam(sourceParams);
+ storageQuotaInfoService.getAndSaveStorageQuotaInfo(new String[]{Constant.TRAFFIC_LOGS}, jdbcParam);
+ storageQuotaInfoService.getAndSaveStorageQuotaInfo(new String[]{Constant.METRICS}, jdbcParam);
+ storageQuotaInfoService.getAndSaveStorageQuotaInfo(new String[]{Constant.FILES}, jdbcParam);
+
+ }
}