summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorzhanghongqing <[email protected]>2024-06-25 12:01:20 +0800
committerzhanghongqing <[email protected]>2024-06-25 12:01:20 +0800
commitdee95eb5ed7cf292bd334c662971863afe07ebe1 (patch)
tree5f6e89a8e641a3db3803647a47b018328835a282
parent3e69996133147d1d7119f893a17c657564a8b4ec (diff)
[优化][存储配额查询] 适配增量统计执行周期1天一次 TSG-21555
-rw-r--r--galaxy-job-executor/pom.xml2
-rw-r--r--galaxy-job-executor/src/main/java/com/mesalab/executor/jobhandler/StorageQuotaJob.java (renamed from galaxy-job-executor/src/main/java/com/mesalab/executor/jobhandler/StorageQuotaInfoJob.java)6
-rw-r--r--galaxy-job-executor/src/main/java/com/mesalab/executor/pojo/JobResult.java21
-rw-r--r--galaxy-job-executor/src/main/java/com/mesalab/executor/service/StorageQuotaInfoService.java167
-rw-r--r--galaxy-job-executor/src/main/resources/application.properties2
-rw-r--r--galaxy-job-executor/src/test/java/com/mesalab/executor/test/StorageQuotaTest.java6
6 files changed, 136 insertions, 68 deletions
diff --git a/galaxy-job-executor/pom.xml b/galaxy-job-executor/pom.xml
index 325572c..b5009eb 100644
--- a/galaxy-job-executor/pom.xml
+++ b/galaxy-job-executor/pom.xml
@@ -209,7 +209,7 @@
<JAR_FILE>${project.build.finalName}.xjar</JAR_FILE>
</buildArgs>
<imageTags>
- <imageTag>2.2.0.4</imageTag>
+ <imageTag>2.2.0.5</imageTag>
</imageTags>
<resources>
<resource>
diff --git a/galaxy-job-executor/src/main/java/com/mesalab/executor/jobhandler/StorageQuotaInfoJob.java b/galaxy-job-executor/src/main/java/com/mesalab/executor/jobhandler/StorageQuotaJob.java
index bf8078a..3d16deb 100644
--- a/galaxy-job-executor/src/main/java/com/mesalab/executor/jobhandler/StorageQuotaInfoJob.java
+++ b/galaxy-job-executor/src/main/java/com/mesalab/executor/jobhandler/StorageQuotaJob.java
@@ -3,6 +3,7 @@ package com.mesalab.executor.jobhandler;
import cn.hutool.core.bean.BeanUtil;
import cn.hutool.core.util.StrUtil;
import com.mesalab.executor.core.utils.JobUtil;
+import com.mesalab.executor.core.utils.TypeUtils;
import com.mesalab.executor.pojo.JDBCParam;
import com.mesalab.executor.service.StorageQuotaInfoService;
import com.xxl.job.core.biz.model.ReturnT;
@@ -21,7 +22,7 @@ import java.util.Map;
*/
@Component
-public class StorageQuotaInfoJob {
+public class StorageQuotaJob {
@Resource
private StorageQuotaInfoService storageQuotaInfoService;
@@ -36,9 +37,10 @@ public class StorageQuotaInfoJob {
for (Map paramsMap : paramsMaps) {
Map<String, Object> source = BeanUtil.beanToMap(paramsMap.get("source"));
String[] items = StrUtil.split(source.get("items").toString(), ",");
+ Boolean containBytes = TypeUtils.castToBoolean(source.get("containBytes"));
JDBCParam jdbcParam = BeanUtil.toBean(paramsMap.get("sink"), JDBCParam.class);
- failCount += storageQuotaInfoService.getAndSaveStorageQuotaInfo(items, jdbcParam);
+ failCount += storageQuotaInfoService.getAndSaveStorageQuotaInfo(items, jdbcParam, containBytes);
}
} catch (Exception e) {
diff --git a/galaxy-job-executor/src/main/java/com/mesalab/executor/pojo/JobResult.java b/galaxy-job-executor/src/main/java/com/mesalab/executor/pojo/JobResult.java
new file mode 100644
index 0000000..d1c301d
--- /dev/null
+++ b/galaxy-job-executor/src/main/java/com/mesalab/executor/pojo/JobResult.java
@@ -0,0 +1,21 @@
+package com.mesalab.executor.pojo;
+
+import lombok.Data;
+import lombok.NoArgsConstructor;
+
+import java.util.List;
+
+/**
+ * @author galaxy
+ */
+@Data
+@NoArgsConstructor
+public class JobResult<T> {
+
+ private Integer code = 200;
+ private String message;
+ private Integer total;
+ private Integer failCount;
+ private List<T> data;
+
+}
diff --git a/galaxy-job-executor/src/main/java/com/mesalab/executor/service/StorageQuotaInfoService.java b/galaxy-job-executor/src/main/java/com/mesalab/executor/service/StorageQuotaInfoService.java
index 1628922..cf1b0d4 100644
--- a/galaxy-job-executor/src/main/java/com/mesalab/executor/service/StorageQuotaInfoService.java
+++ b/galaxy-job-executor/src/main/java/com/mesalab/executor/service/StorageQuotaInfoService.java
@@ -5,12 +5,14 @@ import cn.hutool.core.util.StrUtil;
import cn.hutool.db.handler.NumberHandler;
import cn.hutool.db.sql.SqlExecutor;
import cn.hutool.log.Log;
+import com.geedgenetworks.utils.DateUtils;
import com.google.common.collect.Lists;
import com.mesalab.executor.core.config.StorgeConfig;
import com.mesalab.executor.core.utils.*;
import com.mesalab.executor.exception.BusinessException;
import com.mesalab.executor.pojo.HosSpace;
import com.mesalab.executor.pojo.JDBCParam;
+import com.mesalab.executor.pojo.JobResult;
import com.mesalab.executor.pojo.SysStorageEvent;
import org.apache.http.Header;
import org.apache.http.message.BasicHeader;
@@ -43,25 +45,30 @@ public class StorageQuotaInfoService {
private String zookeeperServer;
- public int getAndSaveStorageQuotaInfo(String[] logTypes, JDBCParam jdbcParam) {
+ public int getAndSaveStorageQuotaInfo(String[] logTypes, JDBCParam jdbcParam, boolean containBytes) {
int failCount = 0;
for (String logType : logTypes) {
+ JobResult<SysStorageEvent> jobResult = new JobResult<>();
try {
switch (logType) {
case Constant.TRAFFIC_LOGS:
- failCount += getAndSaveClickhouseStorageInfo(jdbcParam);
+ jobResult = getClickhouseStorageInfo(jdbcParam, containBytes);
break;
case Constant.METRICS:
- failCount += getAndSaveDruidStorageInfo(jdbcParam);
+ jobResult = getDruidStorageInfo(jdbcParam, containBytes);
break;
case Constant.FILES:
- failCount += getAndSaveHosStorageInfo(jdbcParam);
+ jobResult = getHosStorageInfo(jdbcParam, containBytes);
break;
default:
break;
}
-
+ failCount += jobResult.getFailCount();
+ if (containBytes) {
+ setStorageAvg(jobResult);
+ }
+ DBUtils.save(jobResult.getData(), jdbcParam);
} catch (Exception e) {
failCount++;
JobUtil.errorLog(e.getMessage());
@@ -70,15 +77,43 @@ public class StorageQuotaInfoService {
return failCount;
}
- private int getAndSaveClickhouseStorageInfo(JDBCParam jdbcParam) throws Exception {
+ /**
+ * 将一天的数据平均分为每5分钟的数据
+ *
+ * @param jobResult
+ */
+ private void setStorageAvg(JobResult<SysStorageEvent> jobResult) {
+
+ long interval = 60;
+ long num = 24 * 60 / interval;
+
+ List<SysStorageEvent> newResultList = Lists.newArrayList();
+ long timestamp = DateUtils.convertStringToTimestamp(DateUtils.getDateOfYesterday("yyyyMMdd"), "yyyyMMdd");
+ for (SysStorageEvent sysStorageEvent : jobResult.getData()) {
+ Long dayBytes = sysStorageEvent.getBytes();
+ long minBytes = dayBytes / num;
+ for (long i = 0; i < num; i++) {
+ SysStorageEvent newStorageEvent = sysStorageEvent;
+ newStorageEvent.setBytes(minBytes);
+
+ timestamp = timestamp + interval * 60;
+ newStorageEvent.setGeneratedTime(timestamp);
+ newResultList.add(newStorageEvent);
+ }
+ }
+ jobResult.setData(newResultList);
+ }
+
+ private JobResult<SysStorageEvent> getClickhouseStorageInfo(JDBCParam jdbcParam, boolean containIncrease) throws Exception {
final String maxSizeSql = StrUtil.format("SELECT SUM(`total_space`) FROM {};", storgeConfig.getSystemDisks());
final String usedSizeSql = StrUtil.format("SELECT SUM(`bytes_on_disk`) FROM {} WHERE database = '{}' ;", storgeConfig.getSystemPartsCluster(), storgeConfig.getTrafficDatasource());
- long generatedTime = DateUtil.currentSeconds();
- Long sinceTime = getLastStorage(Constant.ZK_TRAFFIC_LOGS);
List<SysStorageEvent> sysStorageEvents = Lists.newArrayList();
+ JobResult<SysStorageEvent> jobResult = new JobResult();
int failCount = 0;
+ long generatedTime = DateUtil.currentSeconds();
+ Long sinceTime = getLastStorage(Constant.ZK_TRAFFIC_LOGS);
for (Map.Entry<String, String> datacenterMap : storgeConfig.getTrafficDataCenter().entrySet()) {
try {
String datacenterHost = datacenterMap.getValue();
@@ -86,142 +121,122 @@ public class StorageQuotaInfoService {
Map<String, Object> ckParamMap = storgeConfig.getCkSource();
// 1. 总计
ckParamMap.put("query", maxSizeSql);
- String totalResult = HttpClientUtils.httpGet(HttpClientUtils.getUrlWithParams(UrlUtil.getUrl(datacenterHost), ckParamMap));
+ Long totalSize = queryClickhouse(datacenterHost, ckParamMap);
//2. 已使用
ckParamMap.put("query", usedSizeSql);
- String usedResult = HttpClientUtils.httpGet(HttpClientUtils.getUrlWithParams(UrlUtil.getUrl(datacenterHost), ckParamMap));
-
- if ("-1".equals(totalResult) || "-1".equals(usedResult)) {
- throw new BusinessException("Get clickhouse http fail -1");
- }
-
- Long totalSize = Long.valueOf(totalResult.trim());
- Long usedSize = Long.valueOf(usedResult.trim());
+ Long usedSize = queryClickhouse(datacenterHost, ckParamMap);
//3. 增量
- Long increaseSize = getIncreaseSize(jdbcParam, usedSize, datacenterMap.getKey(), Constant.TRAFFIC_LOGS);
+ Long bytes = containIncrease ? 0L : getIncreaseSize(jdbcParam, usedSize, datacenterMap.getKey(), Constant.TRAFFIC_LOGS);
SysStorageEvent storageEvent = SysStorageEvent.builder()
.logType(Constant.TRAFFIC_LOGS)
.dataCenter(datacenterName)
.generatedTime(generatedTime)
.totalAllocatedSize(totalSize)
.usedSize(usedSize)
- .bytes(increaseSize)
+ .bytes(bytes)
.sinceTime(sinceTime)
.build();
sysStorageEvents.add(storageEvent);
- JobUtil.infoLog("Get clickhouse storage info : datacenter {}, max_size {}, used_size {}, bytes {}.",
- datacenterName, totalSize, usedSize, increaseSize);
+ JobUtil.infoLog("Get clickhouse storage info {}:", storageEvent.toString());
} catch (Exception e) {
failCount++;
JobUtil.errorLog("Get clickhouse storage error : datacenter {}, message {}", datacenterMap.getKey(), e.getMessage());
}
}
- DBUtils.save(sysStorageEvents, jdbcParam);
- return failCount;
+ jobResult.setFailCount(failCount);
+ jobResult.setData(sysStorageEvents);
+ return jobResult;
}
- private int getAndSaveDruidStorageInfo(JDBCParam jdbcParam) throws Exception {
+ private JobResult<SysStorageEvent> getDruidStorageInfo(JDBCParam jdbcParam, boolean containIncrease) throws Exception {
final String maxSizeSql = "{\"query\":\"SELECT SUM(max_size) AS max_size FROM sys.servers WHERE server_type = 'historical'\",\"context\":{\"skipEmptyBuckets\":\"false\"},\"resultFormat\":\"csv\"}";
final String usedSizeSql = "{\"query\":\"SELECT SUM(size) AS used_size FROM sys.segments WHERE datasource NOT LIKE '%hot%' and is_published = 1 and is_overshadowed = 0\",\"context\":{\"skipEmptyBuckets\":\"false\"},\"resultFormat\":\"csv\"}";
- final String druidPath = "/druid/v2/sql";
- long generatedTime = DateUtil.currentSeconds();
- Long sinceTime = getLastStorage(Constant.ZK_TRAFFIC_LOGS);
List<SysStorageEvent> sysStorageEvents = Lists.newArrayList();
+ JobResult<SysStorageEvent> jobResult = new JobResult();
int failCount = 0;
+ long generatedTime = DateUtil.currentSeconds();
+ Long sinceTime = getLastStorage(Constant.ZK_REPORT_AND_METRICS);
for (Map.Entry<String, String> datacenterMap : storgeConfig.getAnalyticDataCenter().entrySet()) {
try {
String datacenterHost = datacenterMap.getValue();
String datacenterName = datacenterMap.getKey();
// 1. 总计
- String totalResult = HttpClientUtils.httpPost(UrlUtil.getUrl(datacenterHost).concat(druidPath), maxSizeSql);
+ Long totalSize = queryDruid(datacenterHost, maxSizeSql);
//2. 已使用
- String usedResult = HttpClientUtils.httpPost(UrlUtil.getUrl(datacenterHost).concat(druidPath), usedSizeSql);
-
- if ("-1".equals(totalResult) || "-1".equals(usedResult)) {
- throw new BusinessException("Get druid http fail -1");
- }
-
- Long totalSize = Long.valueOf(totalResult.trim());
- Long usedSize = Long.valueOf(usedResult.trim());
+ Long usedSize = queryDruid(datacenterHost, usedSizeSql);
//3. 增量
- Long increaseSize = getIncreaseSize(jdbcParam, usedSize, datacenterMap.getKey(), Constant.METRICS);
+ Long bytes = containIncrease ? 0L : getIncreaseSize(jdbcParam, usedSize, datacenterMap.getKey(), Constant.METRICS);
SysStorageEvent storageEvent = SysStorageEvent.builder()
.logType(Constant.METRICS)
.dataCenter(datacenterName)
.generatedTime(generatedTime)
.totalAllocatedSize(totalSize)
.usedSize(usedSize)
- .bytes(increaseSize)
+ .bytes(bytes)
.sinceTime(sinceTime)
.build();
sysStorageEvents.add(storageEvent);
- JobUtil.infoLog("Get druid storage info : datacenter {}, max_size {}, used_size {}, bytes {}.",
- datacenterName, totalSize, usedSize, increaseSize);
+ JobUtil.infoLog("Get druid storage info {}:", storageEvent.toString());
} catch (Exception e) {
failCount++;
JobUtil.errorLog("Get druid storage error : datacenter {}, message {}", datacenterMap.getKey(), e.getMessage());
}
}
- DBUtils.save(sysStorageEvents, jdbcParam);
- return failCount;
+ jobResult.setFailCount(failCount);
+ jobResult.setData(sysStorageEvents);
+ return jobResult;
}
- private int getAndSaveHosStorageInfo(JDBCParam jdbcParam) throws Exception {
-
- final String fileStoragePath = "/admin/diskspace";
+ private JobResult<SysStorageEvent> getHosStorageInfo(JDBCParam jdbcParam, boolean containIncrease) throws Exception {
- long generatedTime = DateUtil.currentSeconds();
- Long sinceTime = getLastStorage(Constant.FILES);
List<SysStorageEvent> sysStorageEvents = Lists.newArrayList();
+ JobResult<SysStorageEvent> jobResult = new JobResult();
int failCount = 0;
+ Long generatedTime = DateUtil.currentSeconds();
+ Long sinceTime = getLastStorage(Constant.FILES);
for (Map.Entry<String, String> datacenterMap : storgeConfig.getFilesDataCenter().entrySet()) {
try {
String datacenterHost = datacenterMap.getValue();
String datacenterName = datacenterMap.getKey();
- Header[] headers = {new BasicHeader(Constant.TOKEN, storgeConfig.getFilesToken()), new BasicHeader(HttpHeaders.CONTENT_TYPE, Constant.TEXT_XML)};
- String result = HttpClientUtils.httpGet(UrlUtil.getUrl(datacenterHost).concat(fileStoragePath), headers);
-
- if ("-1".equals(result) || "-1".equals(result)) {
- throw new BusinessException("Get hos http request fail -1 , server: " + datacenterName);
- }
- HosSpace hosSpace = XmlUtil.converXmlToBean(HosSpace.class, result);
+ HosSpace hosSpace = getHosSpace(datacenterHost, datacenterName);
Long totalSize = hosSpace.getHosCapacity();
Long usedSize = hosSpace.getHosUsed();
if (totalSize == -1 || usedSize == -1) {
- throw new BusinessException("hos server error server: " + datacenterName);
+ throw new BusinessException("hos server error : " + datacenterName);
}
//3. 增量
- Long increaseSize = getIncreaseSize(jdbcParam, usedSize, datacenterMap.getKey(), Constant.FILES);
+ Long bytes = containIncrease ? 0L : getIncreaseSize(jdbcParam, usedSize, datacenterMap.getKey(), Constant.FILES);
SysStorageEvent storageEvent = SysStorageEvent.builder()
.logType(Constant.FILES)
.dataCenter(datacenterName)
.generatedTime(generatedTime)
.totalAllocatedSize(totalSize)
.usedSize(usedSize)
- .bytes(increaseSize)
+ .bytes(bytes)
.sinceTime(sinceTime)
.build();
sysStorageEvents.add(storageEvent);
- JobUtil.infoLog("Get hos storage info : datacenter {}, max_size {}, used_size {}, bytes {}.",
- datacenterName, totalSize, usedSize, increaseSize);
+ JobUtil.infoLog("Get hos storage info {}:", storageEvent.toString());
} catch (Exception e) {
failCount++;
JobUtil.errorLog("Get hos storage error : datacenter {}, message {}", datacenterMap.getKey(), e.getMessage());
}
}
- DBUtils.save(sysStorageEvents, jdbcParam);
- return failCount;
+ jobResult.setFailCount(failCount);
+ jobResult.setData(sysStorageEvents);
+ return jobResult;
}
+
private Long getIncreaseSize(JDBCParam jdbcParam, Long usedSize, String datacenter, String logType) throws Exception {
final String lastUsedSizeSql = "SELECT used_size FROM " + jdbcParam.getTable() + " WHERE log_type = '" + logType + "' and data_center = '" + datacenter
+ "' and generated_time >= UNIX_TIMESTAMP() - " + STORAGE_LIMITATION + " ORDER BY generated_time DESC LIMIT 1;";
@@ -239,6 +254,36 @@ public class StorageQuotaInfoService {
}
}
+ private Long queryClickhouse(String datacenterHost, Map<String, Object> ckParamMap) {
+ String result = HttpClientUtils.httpGet(HttpClientUtils.getUrlWithParams(UrlUtil.getUrl(datacenterHost), ckParamMap));
+ if ("-1".equals(result)) {
+ throw new BusinessException("Get clickhouse http fail -1");
+ }
+ return Long.valueOf(result.trim());
+ }
+
+ private Long queryDruid(String datacenterHost, String usedSizeSql) {
+ final String druidPath = "/druid/v2/sql";
+ String result = HttpClientUtils.httpPost(UrlUtil.getUrl(datacenterHost).concat(druidPath), usedSizeSql);
+
+ if ("-1".equals(result)) {
+ throw new BusinessException("Get druid http fail -1");
+ }
+ return Long.valueOf(result.trim());
+ }
+
+ private HosSpace getHosSpace(String datacenterHost, String datacenterName) {
+ final String fileStoragePath = "/admin/diskspace";
+ Header[] headers = {new BasicHeader(Constant.TOKEN, storgeConfig.getFilesToken()), new BasicHeader(HttpHeaders.CONTENT_TYPE, Constant.TEXT_XML)};
+ String result = HttpClientUtils.httpGet(UrlUtil.getUrl(datacenterHost).concat(fileStoragePath), headers);
+
+ if ("-1".equals(result) || "-1".equals(result)) {
+ throw new BusinessException("Get hos http request fail -1 , server: " + datacenterName);
+ }
+ HosSpace hosSpace = XmlUtil.converXmlToBean(HosSpace.class, result);
+ return hosSpace;
+ }
+
/**
* @param node
* @return 自定义的标准时间 last_storage,
diff --git a/galaxy-job-executor/src/main/resources/application.properties b/galaxy-job-executor/src/main/resources/application.properties
index 3285f8e..5b1ac7c 100644
--- a/galaxy-job-executor/src/main/resources/application.properties
+++ b/galaxy-job-executor/src/main/resources/application.properties
@@ -1,7 +1,7 @@
#配置集的配置格式
nacos.config.type=properties
#配置中心地址
-nacos.config.server-addr=192.168.44.55:8848
+nacos.config.server-addr=192.168.45.102:8848
#命名空间
nacos.config.namespace=prod
#数据集ID
diff --git a/galaxy-job-executor/src/test/java/com/mesalab/executor/test/StorageQuotaTest.java b/galaxy-job-executor/src/test/java/com/mesalab/executor/test/StorageQuotaTest.java
index 454682a..6f1321b 100644
--- a/galaxy-job-executor/src/test/java/com/mesalab/executor/test/StorageQuotaTest.java
+++ b/galaxy-job-executor/src/test/java/com/mesalab/executor/test/StorageQuotaTest.java
@@ -114,9 +114,9 @@ public class StorageQuotaTest {
sourceParams.put("username", "root");
sourceParams.put("pin", "galaxy2019");
JDBCParam jdbcParam = new JDBCParam(sourceParams);
- storageQuotaInfoService.getAndSaveStorageQuotaInfo(new String[]{Constant.TRAFFIC_LOGS}, jdbcParam);
- storageQuotaInfoService.getAndSaveStorageQuotaInfo(new String[]{Constant.METRICS}, jdbcParam);
- storageQuotaInfoService.getAndSaveStorageQuotaInfo(new String[]{Constant.FILES}, jdbcParam);
+ storageQuotaInfoService.getAndSaveStorageQuotaInfo(new String[]{Constant.TRAFFIC_LOGS}, jdbcParam, true);
+ storageQuotaInfoService.getAndSaveStorageQuotaInfo(new String[]{Constant.METRICS}, jdbcParam, true);
+ storageQuotaInfoService.getAndSaveStorageQuotaInfo(new String[]{Constant.FILES}, jdbcParam, true);
}
}