diff options
| author | zhanghongqing <[email protected]> | 2024-06-25 12:01:20 +0800 |
|---|---|---|
| committer | zhanghongqing <[email protected]> | 2024-06-25 12:01:20 +0800 |
| commit | dee95eb5ed7cf292bd334c662971863afe07ebe1 (patch) | |
| tree | 5f6e89a8e641a3db3803647a47b018328835a282 | |
| parent | 3e69996133147d1d7119f893a17c657564a8b4ec (diff) | |
[优化][存储配额查询] 适配增量统计执行周期1天一次 TSG-21555
| -rw-r--r-- | galaxy-job-executor/pom.xml | 2 | ||||
| -rw-r--r-- | galaxy-job-executor/src/main/java/com/mesalab/executor/jobhandler/StorageQuotaJob.java (renamed from galaxy-job-executor/src/main/java/com/mesalab/executor/jobhandler/StorageQuotaInfoJob.java) | 6 | ||||
| -rw-r--r-- | galaxy-job-executor/src/main/java/com/mesalab/executor/pojo/JobResult.java | 21 | ||||
| -rw-r--r-- | galaxy-job-executor/src/main/java/com/mesalab/executor/service/StorageQuotaInfoService.java | 167 | ||||
| -rw-r--r-- | galaxy-job-executor/src/main/resources/application.properties | 2 | ||||
| -rw-r--r-- | galaxy-job-executor/src/test/java/com/mesalab/executor/test/StorageQuotaTest.java | 6 |
6 files changed, 136 insertions, 68 deletions
diff --git a/galaxy-job-executor/pom.xml b/galaxy-job-executor/pom.xml index 325572c..b5009eb 100644 --- a/galaxy-job-executor/pom.xml +++ b/galaxy-job-executor/pom.xml @@ -209,7 +209,7 @@ <JAR_FILE>${project.build.finalName}.xjar</JAR_FILE> </buildArgs> <imageTags> - <imageTag>2.2.0.4</imageTag> + <imageTag>2.2.0.5</imageTag> </imageTags> <resources> <resource> diff --git a/galaxy-job-executor/src/main/java/com/mesalab/executor/jobhandler/StorageQuotaInfoJob.java b/galaxy-job-executor/src/main/java/com/mesalab/executor/jobhandler/StorageQuotaJob.java index bf8078a..3d16deb 100644 --- a/galaxy-job-executor/src/main/java/com/mesalab/executor/jobhandler/StorageQuotaInfoJob.java +++ b/galaxy-job-executor/src/main/java/com/mesalab/executor/jobhandler/StorageQuotaJob.java @@ -3,6 +3,7 @@ package com.mesalab.executor.jobhandler; import cn.hutool.core.bean.BeanUtil; import cn.hutool.core.util.StrUtil; import com.mesalab.executor.core.utils.JobUtil; +import com.mesalab.executor.core.utils.TypeUtils; import com.mesalab.executor.pojo.JDBCParam; import com.mesalab.executor.service.StorageQuotaInfoService; import com.xxl.job.core.biz.model.ReturnT; @@ -21,7 +22,7 @@ import java.util.Map; */ @Component -public class StorageQuotaInfoJob { +public class StorageQuotaJob { @Resource private StorageQuotaInfoService storageQuotaInfoService; @@ -36,9 +37,10 @@ public class StorageQuotaInfoJob { for (Map paramsMap : paramsMaps) { Map<String, Object> source = BeanUtil.beanToMap(paramsMap.get("source")); String[] items = StrUtil.split(source.get("items").toString(), ","); + Boolean containBytes = TypeUtils.castToBoolean(source.get("containBytes")); JDBCParam jdbcParam = BeanUtil.toBean(paramsMap.get("sink"), JDBCParam.class); - failCount += storageQuotaInfoService.getAndSaveStorageQuotaInfo(items, jdbcParam); + failCount += storageQuotaInfoService.getAndSaveStorageQuotaInfo(items, jdbcParam, containBytes); } } catch (Exception e) { diff --git a/galaxy-job-executor/src/main/java/com/mesalab/executor/pojo/JobResult.java b/galaxy-job-executor/src/main/java/com/mesalab/executor/pojo/JobResult.java new file mode 100644 index 0000000..d1c301d --- /dev/null +++ b/galaxy-job-executor/src/main/java/com/mesalab/executor/pojo/JobResult.java @@ -0,0 +1,21 @@ +package com.mesalab.executor.pojo; + +import lombok.Data; +import lombok.NoArgsConstructor; + +import java.util.List; + +/** + * @author galaxy + */ +@Data +@NoArgsConstructor +public class JobResult<T> { + + private Integer code = 200; + private String message; + private Integer total; + private Integer failCount; + private List<T> data; + +} diff --git a/galaxy-job-executor/src/main/java/com/mesalab/executor/service/StorageQuotaInfoService.java b/galaxy-job-executor/src/main/java/com/mesalab/executor/service/StorageQuotaInfoService.java index 1628922..cf1b0d4 100644 --- a/galaxy-job-executor/src/main/java/com/mesalab/executor/service/StorageQuotaInfoService.java +++ b/galaxy-job-executor/src/main/java/com/mesalab/executor/service/StorageQuotaInfoService.java @@ -5,12 +5,14 @@ import cn.hutool.core.util.StrUtil; import cn.hutool.db.handler.NumberHandler; import cn.hutool.db.sql.SqlExecutor; import cn.hutool.log.Log; +import com.geedgenetworks.utils.DateUtils; import com.google.common.collect.Lists; import com.mesalab.executor.core.config.StorgeConfig; import com.mesalab.executor.core.utils.*; import com.mesalab.executor.exception.BusinessException; import com.mesalab.executor.pojo.HosSpace; import com.mesalab.executor.pojo.JDBCParam; +import com.mesalab.executor.pojo.JobResult; import com.mesalab.executor.pojo.SysStorageEvent; import org.apache.http.Header; import org.apache.http.message.BasicHeader; @@ -43,25 +45,30 @@ public class StorageQuotaInfoService { private String zookeeperServer; - public int getAndSaveStorageQuotaInfo(String[] logTypes, JDBCParam jdbcParam) { + public int getAndSaveStorageQuotaInfo(String[] logTypes, JDBCParam jdbcParam, boolean containBytes) { int failCount = 0; for (String logType : logTypes) { + JobResult<SysStorageEvent> jobResult = new JobResult<>(); try { switch (logType) { case Constant.TRAFFIC_LOGS: - failCount += getAndSaveClickhouseStorageInfo(jdbcParam); + jobResult = getClickhouseStorageInfo(jdbcParam, containBytes); break; case Constant.METRICS: - failCount += getAndSaveDruidStorageInfo(jdbcParam); + jobResult = getDruidStorageInfo(jdbcParam, containBytes); break; case Constant.FILES: - failCount += getAndSaveHosStorageInfo(jdbcParam); + jobResult = getHosStorageInfo(jdbcParam, containBytes); break; default: break; } - + failCount += jobResult.getFailCount(); + if (containBytes) { + setStorageAvg(jobResult); + } + DBUtils.save(jobResult.getData(), jdbcParam); } catch (Exception e) { failCount++; JobUtil.errorLog(e.getMessage()); @@ -70,15 +77,43 @@ public class StorageQuotaInfoService { return failCount; } - private int getAndSaveClickhouseStorageInfo(JDBCParam jdbcParam) throws Exception { + /** + * 将一天的数据平均分为每5分钟的数据 + * + * @param jobResult + */ + private void setStorageAvg(JobResult<SysStorageEvent> jobResult) { + + long interval = 60; + long num = 24 * 60 / interval; + + List<SysStorageEvent> newResultList = Lists.newArrayList(); + long timestamp = DateUtils.convertStringToTimestamp(DateUtils.getDateOfYesterday("yyyyMMdd"), "yyyyMMdd"); + for (SysStorageEvent sysStorageEvent : jobResult.getData()) { + Long dayBytes = sysStorageEvent.getBytes(); + long minBytes = dayBytes / num; + for (long i = 0; i < num; i++) { + SysStorageEvent newStorageEvent = sysStorageEvent; + newStorageEvent.setBytes(minBytes); + + timestamp = timestamp + interval * 60; + newStorageEvent.setGeneratedTime(timestamp); + newResultList.add(newStorageEvent); + } + } + jobResult.setData(newResultList); + } + + private JobResult<SysStorageEvent> getClickhouseStorageInfo(JDBCParam jdbcParam, boolean containIncrease) throws Exception { final String maxSizeSql = StrUtil.format("SELECT SUM(`total_space`) FROM {};", storgeConfig.getSystemDisks()); final String usedSizeSql = StrUtil.format("SELECT SUM(`bytes_on_disk`) FROM {} WHERE database = '{}' ;", storgeConfig.getSystemPartsCluster(), storgeConfig.getTrafficDatasource()); - long generatedTime = DateUtil.currentSeconds(); - Long sinceTime = getLastStorage(Constant.ZK_TRAFFIC_LOGS); List<SysStorageEvent> sysStorageEvents = Lists.newArrayList(); + JobResult<SysStorageEvent> jobResult = new JobResult(); int failCount = 0; + long generatedTime = DateUtil.currentSeconds(); + Long sinceTime = getLastStorage(Constant.ZK_TRAFFIC_LOGS); for (Map.Entry<String, String> datacenterMap : storgeConfig.getTrafficDataCenter().entrySet()) { try { String datacenterHost = datacenterMap.getValue(); @@ -86,142 +121,122 @@ public class StorageQuotaInfoService { Map<String, Object> ckParamMap = storgeConfig.getCkSource(); // 1. 总计 ckParamMap.put("query", maxSizeSql); - String totalResult = HttpClientUtils.httpGet(HttpClientUtils.getUrlWithParams(UrlUtil.getUrl(datacenterHost), ckParamMap)); + Long totalSize = queryClickhouse(datacenterHost, ckParamMap); //2. 已使用 ckParamMap.put("query", usedSizeSql); - String usedResult = HttpClientUtils.httpGet(HttpClientUtils.getUrlWithParams(UrlUtil.getUrl(datacenterHost), ckParamMap)); - - if ("-1".equals(totalResult) || "-1".equals(usedResult)) { - throw new BusinessException("Get clickhouse http fail -1"); - } - - Long totalSize = Long.valueOf(totalResult.trim()); - Long usedSize = Long.valueOf(usedResult.trim()); + Long usedSize = queryClickhouse(datacenterHost, ckParamMap); //3. 增量 - Long increaseSize = getIncreaseSize(jdbcParam, usedSize, datacenterMap.getKey(), Constant.TRAFFIC_LOGS); + Long bytes = containIncrease ? 0L : getIncreaseSize(jdbcParam, usedSize, datacenterMap.getKey(), Constant.TRAFFIC_LOGS); SysStorageEvent storageEvent = SysStorageEvent.builder() .logType(Constant.TRAFFIC_LOGS) .dataCenter(datacenterName) .generatedTime(generatedTime) .totalAllocatedSize(totalSize) .usedSize(usedSize) - .bytes(increaseSize) + .bytes(bytes) .sinceTime(sinceTime) .build(); sysStorageEvents.add(storageEvent); - JobUtil.infoLog("Get clickhouse storage info : datacenter {}, max_size {}, used_size {}, bytes {}.", - datacenterName, totalSize, usedSize, increaseSize); + JobUtil.infoLog("Get clickhouse storage info {}:", storageEvent.toString()); } catch (Exception e) { failCount++; JobUtil.errorLog("Get clickhouse storage error : datacenter {}, message {}", datacenterMap.getKey(), e.getMessage()); } } - DBUtils.save(sysStorageEvents, jdbcParam); - return failCount; + jobResult.setFailCount(failCount); + jobResult.setData(sysStorageEvents); + return jobResult; } - private int getAndSaveDruidStorageInfo(JDBCParam jdbcParam) throws Exception { + private JobResult<SysStorageEvent> getDruidStorageInfo(JDBCParam jdbcParam, boolean containIncrease) throws Exception { final String maxSizeSql = "{\"query\":\"SELECT SUM(max_size) AS max_size FROM sys.servers WHERE server_type = 'historical'\",\"context\":{\"skipEmptyBuckets\":\"false\"},\"resultFormat\":\"csv\"}"; final String usedSizeSql = "{\"query\":\"SELECT SUM(size) AS used_size FROM sys.segments WHERE datasource NOT LIKE '%hot%' and is_published = 1 and is_overshadowed = 0\",\"context\":{\"skipEmptyBuckets\":\"false\"},\"resultFormat\":\"csv\"}"; - final String druidPath = "/druid/v2/sql"; - long generatedTime = DateUtil.currentSeconds(); - Long sinceTime = getLastStorage(Constant.ZK_TRAFFIC_LOGS); List<SysStorageEvent> sysStorageEvents = Lists.newArrayList(); + JobResult<SysStorageEvent> jobResult = new JobResult(); int failCount = 0; + long generatedTime = DateUtil.currentSeconds(); + Long sinceTime = getLastStorage(Constant.ZK_REPORT_AND_METRICS); for (Map.Entry<String, String> datacenterMap : storgeConfig.getAnalyticDataCenter().entrySet()) { try { String datacenterHost = datacenterMap.getValue(); String datacenterName = datacenterMap.getKey(); // 1. 总计 - String totalResult = HttpClientUtils.httpPost(UrlUtil.getUrl(datacenterHost).concat(druidPath), maxSizeSql); + Long totalSize = queryDruid(datacenterHost, maxSizeSql); //2. 已使用 - String usedResult = HttpClientUtils.httpPost(UrlUtil.getUrl(datacenterHost).concat(druidPath), usedSizeSql); - - if ("-1".equals(totalResult) || "-1".equals(usedResult)) { - throw new BusinessException("Get druid http fail -1"); - } - - Long totalSize = Long.valueOf(totalResult.trim()); - Long usedSize = Long.valueOf(usedResult.trim()); + Long usedSize = queryDruid(datacenterHost, usedSizeSql); //3. 增量 - Long increaseSize = getIncreaseSize(jdbcParam, usedSize, datacenterMap.getKey(), Constant.METRICS); + Long bytes = containIncrease ? 0L : getIncreaseSize(jdbcParam, usedSize, datacenterMap.getKey(), Constant.METRICS); SysStorageEvent storageEvent = SysStorageEvent.builder() .logType(Constant.METRICS) .dataCenter(datacenterName) .generatedTime(generatedTime) .totalAllocatedSize(totalSize) .usedSize(usedSize) - .bytes(increaseSize) + .bytes(bytes) .sinceTime(sinceTime) .build(); sysStorageEvents.add(storageEvent); - JobUtil.infoLog("Get druid storage info : datacenter {}, max_size {}, used_size {}, bytes {}.", - datacenterName, totalSize, usedSize, increaseSize); + JobUtil.infoLog("Get druid storage info {}:", storageEvent.toString()); } catch (Exception e) { failCount++; JobUtil.errorLog("Get druid storage error : datacenter {}, message {}", datacenterMap.getKey(), e.getMessage()); } } - DBUtils.save(sysStorageEvents, jdbcParam); - return failCount; + jobResult.setFailCount(failCount); + jobResult.setData(sysStorageEvents); + return jobResult; } - private int getAndSaveHosStorageInfo(JDBCParam jdbcParam) throws Exception { - - final String fileStoragePath = "/admin/diskspace"; + private JobResult<SysStorageEvent> getHosStorageInfo(JDBCParam jdbcParam, boolean containIncrease) throws Exception { - long generatedTime = DateUtil.currentSeconds(); - Long sinceTime = getLastStorage(Constant.FILES); List<SysStorageEvent> sysStorageEvents = Lists.newArrayList(); + JobResult<SysStorageEvent> jobResult = new JobResult(); int failCount = 0; + Long generatedTime = DateUtil.currentSeconds(); + Long sinceTime = getLastStorage(Constant.FILES); for (Map.Entry<String, String> datacenterMap : storgeConfig.getFilesDataCenter().entrySet()) { try { String datacenterHost = datacenterMap.getValue(); String datacenterName = datacenterMap.getKey(); - Header[] headers = {new BasicHeader(Constant.TOKEN, storgeConfig.getFilesToken()), new BasicHeader(HttpHeaders.CONTENT_TYPE, Constant.TEXT_XML)}; - String result = HttpClientUtils.httpGet(UrlUtil.getUrl(datacenterHost).concat(fileStoragePath), headers); - - if ("-1".equals(result) || "-1".equals(result)) { - throw new BusinessException("Get hos http request fail -1 , server: " + datacenterName); - } - HosSpace hosSpace = XmlUtil.converXmlToBean(HosSpace.class, result); + HosSpace hosSpace = getHosSpace(datacenterHost, datacenterName); Long totalSize = hosSpace.getHosCapacity(); Long usedSize = hosSpace.getHosUsed(); if (totalSize == -1 || usedSize == -1) { - throw new BusinessException("hos server error server: " + datacenterName); + throw new BusinessException("hos server error : " + datacenterName); } //3. 增量 - Long increaseSize = getIncreaseSize(jdbcParam, usedSize, datacenterMap.getKey(), Constant.FILES); + Long bytes = containIncrease ? 0L : getIncreaseSize(jdbcParam, usedSize, datacenterMap.getKey(), Constant.FILES); SysStorageEvent storageEvent = SysStorageEvent.builder() .logType(Constant.FILES) .dataCenter(datacenterName) .generatedTime(generatedTime) .totalAllocatedSize(totalSize) .usedSize(usedSize) - .bytes(increaseSize) + .bytes(bytes) .sinceTime(sinceTime) .build(); sysStorageEvents.add(storageEvent); - JobUtil.infoLog("Get hos storage info : datacenter {}, max_size {}, used_size {}, bytes {}.", - datacenterName, totalSize, usedSize, increaseSize); + JobUtil.infoLog("Get hos storage info {}:", storageEvent.toString()); } catch (Exception e) { failCount++; JobUtil.errorLog("Get hos storage error : datacenter {}, message {}", datacenterMap.getKey(), e.getMessage()); } } - DBUtils.save(sysStorageEvents, jdbcParam); - return failCount; + jobResult.setFailCount(failCount); + jobResult.setData(sysStorageEvents); + return jobResult; } + private Long getIncreaseSize(JDBCParam jdbcParam, Long usedSize, String datacenter, String logType) throws Exception { final String lastUsedSizeSql = "SELECT used_size FROM " + jdbcParam.getTable() + " WHERE log_type = '" + logType + "' and data_center = '" + datacenter + "' and generated_time >= UNIX_TIMESTAMP() - " + STORAGE_LIMITATION + " ORDER BY generated_time DESC LIMIT 1;"; @@ -239,6 +254,36 @@ public class StorageQuotaInfoService { } } + private Long queryClickhouse(String datacenterHost, Map<String, Object> ckParamMap) { + String result = HttpClientUtils.httpGet(HttpClientUtils.getUrlWithParams(UrlUtil.getUrl(datacenterHost), ckParamMap)); + if ("-1".equals(result)) { + throw new BusinessException("Get clickhouse http fail -1"); + } + return Long.valueOf(result.trim()); + } + + private Long queryDruid(String datacenterHost, String usedSizeSql) { + final String druidPath = "/druid/v2/sql"; + String result = HttpClientUtils.httpPost(UrlUtil.getUrl(datacenterHost).concat(druidPath), usedSizeSql); + + if ("-1".equals(result)) { + throw new BusinessException("Get druid http fail -1"); + } + return Long.valueOf(result.trim()); + } + + private HosSpace getHosSpace(String datacenterHost, String datacenterName) { + final String fileStoragePath = "/admin/diskspace"; + Header[] headers = {new BasicHeader(Constant.TOKEN, storgeConfig.getFilesToken()), new BasicHeader(HttpHeaders.CONTENT_TYPE, Constant.TEXT_XML)}; + String result = HttpClientUtils.httpGet(UrlUtil.getUrl(datacenterHost).concat(fileStoragePath), headers); + + if ("-1".equals(result) || "-1".equals(result)) { + throw new BusinessException("Get hos http request fail -1 , server: " + datacenterName); + } + HosSpace hosSpace = XmlUtil.converXmlToBean(HosSpace.class, result); + return hosSpace; + } + /** * @param node * @return 自定义的标准时间 last_storage, diff --git a/galaxy-job-executor/src/main/resources/application.properties b/galaxy-job-executor/src/main/resources/application.properties index 3285f8e..5b1ac7c 100644 --- a/galaxy-job-executor/src/main/resources/application.properties +++ b/galaxy-job-executor/src/main/resources/application.properties @@ -1,7 +1,7 @@ #配置集的配置格式 nacos.config.type=properties #配置中心地址 -nacos.config.server-addr=192.168.44.55:8848 +nacos.config.server-addr=192.168.45.102:8848 #命名空间 nacos.config.namespace=prod #数据集ID diff --git a/galaxy-job-executor/src/test/java/com/mesalab/executor/test/StorageQuotaTest.java b/galaxy-job-executor/src/test/java/com/mesalab/executor/test/StorageQuotaTest.java index 454682a..6f1321b 100644 --- a/galaxy-job-executor/src/test/java/com/mesalab/executor/test/StorageQuotaTest.java +++ b/galaxy-job-executor/src/test/java/com/mesalab/executor/test/StorageQuotaTest.java @@ -114,9 +114,9 @@ public class StorageQuotaTest { sourceParams.put("username", "root"); sourceParams.put("pin", "galaxy2019"); JDBCParam jdbcParam = new JDBCParam(sourceParams); - storageQuotaInfoService.getAndSaveStorageQuotaInfo(new String[]{Constant.TRAFFIC_LOGS}, jdbcParam); - storageQuotaInfoService.getAndSaveStorageQuotaInfo(new String[]{Constant.METRICS}, jdbcParam); - storageQuotaInfoService.getAndSaveStorageQuotaInfo(new String[]{Constant.FILES}, jdbcParam); + storageQuotaInfoService.getAndSaveStorageQuotaInfo(new String[]{Constant.TRAFFIC_LOGS}, jdbcParam, true); + storageQuotaInfoService.getAndSaveStorageQuotaInfo(new String[]{Constant.METRICS}, jdbcParam, true); + storageQuotaInfoService.getAndSaveStorageQuotaInfo(new String[]{Constant.FILES}, jdbcParam, true); } } |
