author    houjinchuan <[email protected]>    2023-02-08 16:54:55 +0800
committer houjinchuan <[email protected]>    2023-02-08 16:54:55 +0800
commit    0e42adc8e1d1109e2fbe2399aca206813b3c47ab (patch)
tree      c0b6acd10f3bffa768cbe782ffd937ea298a5cf5
parent    c6645f6be893788a416562a9e2357d6dfb351113 (diff)
Fix hos TTL bug (develop-fix-2210)
-rw-r--r--  galaxy-job-executor/pom.xml                                                                    7
-rw-r--r--  galaxy-job-executor/src/main/java/com/mesalab/executor/core/config/HbaseConfig.java          231
-rw-r--r--  galaxy-job-executor/src/main/java/com/mesalab/executor/core/utils/HbaseUtil.java              60
-rw-r--r--  galaxy-job-executor/src/main/java/com/mesalab/executor/jobhandler/HosTtlJob.java             289
-rw-r--r--  galaxy-job-executor/src/main/java/com/mesalab/executor/jobhandler/LogStorageQuotaJob.java      2
5 files changed, 2 insertions(+), 587 deletions(-)
diff --git a/galaxy-job-executor/pom.xml b/galaxy-job-executor/pom.xml
index 0ade40f..8ec09cb 100644
--- a/galaxy-job-executor/pom.xml
+++ b/galaxy-job-executor/pom.xml
@@ -111,11 +111,6 @@
<artifactId>async-http-client</artifactId>
<version>1.9.40</version>
</dependency>
- <dependency>
- <groupId>org.apache.hbase</groupId>
- <artifactId>hbase-client</artifactId>
- <version>2.2.3</version>
- </dependency>
</dependencies>

<build>
@@ -184,7 +179,7 @@
<JAR_FILE>${project.build.finalName}.xjar</JAR_FILE>
</buildArgs>
<imageTags>
- <imageTag>v1.3.220823</imageTag>
+ <imageTag>v1.3.221101-rc1</imageTag>
</imageTags>
<resources>
<resource>
diff --git a/galaxy-job-executor/src/main/java/com/mesalab/executor/core/config/HbaseConfig.java b/galaxy-job-executor/src/main/java/com/mesalab/executor/core/config/HbaseConfig.java
deleted file mode 100644
index 8a65a9c..0000000
--- a/galaxy-job-executor/src/main/java/com/mesalab/executor/core/config/HbaseConfig.java
+++ /dev/null
@@ -1,231 +0,0 @@
-package com.mesalab.executor.core.config;
-
-import com.alibaba.nacos.api.config.ConfigType;
-import com.alibaba.nacos.api.config.annotation.NacosConfigurationProperties;
-import org.springframework.context.annotation.Configuration;
-
-import java.util.List;
-
-@Configuration
-@NacosConfigurationProperties(prefix = "hbase", dataId = "${nacos.config.data-id}", groupId = "${nacos.config.group}", type = ConfigType.PROPERTIES, autoRefreshed = true)
-public class HbaseConfig {
- private String zookeeper_quorum;
-
- private String zookeeper_property_clientPort;
-
- private String zookeeper_znode_parent;
-
- private String client_pause;
-
- private Integer client_retries_number;
-
- private Integer rpc_timeout;
-
- private Integer connect_pool;
-
- private Integer delete_file_thread;
-
- private Integer scanner_timeout;
-
- private Integer scanner_max_result_size;
-
- private Integer scanner_caching;
-
- private String time_index_table_prefix;
-
- private String filename_index_table_prefix;
-
- private String partfile_index_table_prefix;
-
- private String system_bucket_meta;
-
- private String client_keyvalue_maxsize;
-
- private List<String> filename_head;
-
- private List<String> part_head;
-
- private Integer get_batch;
-
- private Integer delete_batch;
-
- private Integer scan_batch;
-
- private Long executeTime;
-
- public Integer getDelete_file_thread() {
- return delete_file_thread;
- }
-
- public void setDelete_file_thread(Integer delete_file_thread) {
- this.delete_file_thread = delete_file_thread;
- }
-
- public Long getExecuteTime() {
- return executeTime;
- }
-
- public void setExecuteTime(Long executeTime) {
- this.executeTime = executeTime;
- }
-
- public Integer getScan_batch() {
- return scan_batch;
- }
-
- public void setScan_batch(Integer scan_batch) {
- this.scan_batch = scan_batch;
- }
-
- public Integer getDelete_batch() {
- return delete_batch;
- }
-
- public void setDelete_batch(Integer delete_batch) {
- this.delete_batch = delete_batch;
- }
-
- public String getZookeeper_quorum() {
- return zookeeper_quorum;
- }
-
- public void setZookeeper_quorum(String zookeeper_quorum) {
- this.zookeeper_quorum = zookeeper_quorum;
- }
-
- public String getZookeeper_property_clientPort() {
- return zookeeper_property_clientPort;
- }
-
- public void setZookeeper_property_clientPort(String zookeeper_property_clientPort) {
- this.zookeeper_property_clientPort = zookeeper_property_clientPort;
- }
-
- public String getZookeeper_znode_parent() {
- return zookeeper_znode_parent;
- }
-
- public void setZookeeper_znode_parent(String zookeeper_znode_parent) {
- this.zookeeper_znode_parent = zookeeper_znode_parent;
- }
-
- public String getClient_pause() {
- return client_pause;
- }
-
- public void setClient_pause(String client_pause) {
- this.client_pause = client_pause;
- }
-
- public Integer getClient_retries_number() {
- return client_retries_number;
- }
-
- public void setClient_retries_number(Integer client_retries_number) {
- this.client_retries_number = client_retries_number;
- }
-
- public Integer getRpc_timeout() {
- return rpc_timeout;
- }
-
- public void setRpc_timeout(Integer rpc_timeout) {
- this.rpc_timeout = rpc_timeout;
- }
-
- public Integer getConnect_pool() {
- return connect_pool;
- }
-
- public void setConnect_pool(Integer connect_pool) {
- this.connect_pool = connect_pool;
- }
-
- public Integer getScanner_timeout() {
- return scanner_timeout;
- }
-
- public void setScanner_timeout(Integer scanner_timeout) {
- this.scanner_timeout = scanner_timeout;
- }
-
- public Integer getScanner_max_result_size() {
- return scanner_max_result_size;
- }
-
- public void setScanner_max_result_size(Integer scanner_max_result_size) {
- this.scanner_max_result_size = scanner_max_result_size;
- }
-
- public Integer getScanner_caching() {
- return scanner_caching;
- }
-
- public void setScanner_caching(Integer scanner_caching) {
- this.scanner_caching = scanner_caching;
- }
-
- public String getTime_index_table_prefix() {
- return time_index_table_prefix;
- }
-
- public void setTime_index_table_prefix(String time_index_table_prefix) {
- this.time_index_table_prefix = time_index_table_prefix;
- }
-
- public String getFilename_index_table_prefix() {
- return filename_index_table_prefix;
- }
-
- public void setFilename_index_table_prefix(String filename_index_table_prefix) {
- this.filename_index_table_prefix = filename_index_table_prefix;
- }
-
- public String getPartfile_index_table_prefix() {
- return partfile_index_table_prefix;
- }
-
- public void setPartfile_index_table_prefix(String partfile_index_table_prefix) {
- this.partfile_index_table_prefix = partfile_index_table_prefix;
- }
-
- public String getSystem_bucket_meta() {
- return system_bucket_meta;
- }
-
- public void setSystem_bucket_meta(String system_bucket_meta) {
- this.system_bucket_meta = system_bucket_meta;
- }
-
- public String getClient_keyvalue_maxsize() {
- return client_keyvalue_maxsize;
- }
-
- public void setClient_keyvalue_maxsize(String client_keyvalue_maxsize) {
- this.client_keyvalue_maxsize = client_keyvalue_maxsize;
- }
-
- public List<String> getFilename_head() {
- return filename_head;
- }
-
- public void setFilename_head(List<String> filename_head) {
- this.filename_head = filename_head;
- }
-
- public List<String> getPart_head() {
- return part_head;
- }
-
- public void setPart_head(List<String> part_head) {
- this.part_head = part_head;
- }
-
- public Integer getGet_batch() {
- return get_batch;
- }
-
- public void setGet_batch(Integer get_batch) {
- this.get_batch = get_batch;
- }
-}
diff --git a/galaxy-job-executor/src/main/java/com/mesalab/executor/core/utils/HbaseUtil.java b/galaxy-job-executor/src/main/java/com/mesalab/executor/core/utils/HbaseUtil.java
deleted file mode 100644
index b410daf..0000000
--- a/galaxy-job-executor/src/main/java/com/mesalab/executor/core/utils/HbaseUtil.java
+++ /dev/null
@@ -1,60 +0,0 @@
-package com.mesalab.executor.core.utils;
-
-import java.io.IOException;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-
-import cn.hutool.core.date.DateUtil;
-import cn.hutool.crypto.digest.DigestUtil;
-import cn.hutool.log.Log;
-import com.mesalab.executor.core.config.HbaseConfig;
-import org.apache.hadoop.hbase.HBaseConfiguration;
-import org.apache.hadoop.hbase.client.Connection;
-import org.apache.hadoop.hbase.client.ConnectionFactory;
-import org.springframework.beans.factory.annotation.Autowired;
-import org.springframework.beans.factory.annotation.Qualifier;
-import org.springframework.context.annotation.Bean;
-import org.springframework.context.annotation.Configuration;
-
-@Configuration
-public class HbaseUtil {
-
- private Log logger = Log.get();
-
- @Autowired
- private HbaseConfig hbaseConfig;
-
- @Bean(name = "hbaseConfiguration")
- public org.apache.hadoop.conf.Configuration getConfiguration() {
- org.apache.hadoop.conf.Configuration conf = HBaseConfiguration.create();
- conf.set("hbase.zookeeper.quorum", hbaseConfig.getZookeeper_quorum());
- conf.set("hbase.zookeeper.property.clientPort", hbaseConfig.getZookeeper_property_clientPort());
- conf.set("zookeeper.znode.parent", hbaseConfig.getZookeeper_znode_parent());
- conf.setInt("hbase.client.retries.number", hbaseConfig.getClient_retries_number());
- conf.setInt("hbase.rpc.timeout", hbaseConfig.getRpc_timeout());
- conf.set("hbase.client.keyvalue.maxsize", hbaseConfig.getClient_keyvalue_maxsize());
- conf.setInt("hbase.client.ipc.pool.size", hbaseConfig.getConnect_pool());
- conf.setInt("hbase.client.scanner.timeout.period", hbaseConfig.getScanner_timeout());
- conf.setInt("hbase.client.scanner.max.result.size", hbaseConfig.getScanner_max_result_size());
-// conf.setInt("hbase.client.scanner.caching", hbaseConfig.getScanner_caching());
- return conf;
- }
-
- @Bean(name = "hbaseConnection")
- public Connection getConnection(@Qualifier("hbaseConfiguration") org.apache.hadoop.conf.Configuration conf) {
- Connection conn = null;
- try {
- conn = ConnectionFactory.createConnection(conf);
- } catch (IOException e) {
- logger.error("create hbase connection error.", e);
- }
- return conn;
- }
-
- @Bean(name = "threadPool")
- public ExecutorService getThreadPool() {
- return Executors.newFixedThreadPool(hbaseConfig.getDelete_file_thread());
- }
-
-
-}
diff --git a/galaxy-job-executor/src/main/java/com/mesalab/executor/jobhandler/HosTtlJob.java b/galaxy-job-executor/src/main/java/com/mesalab/executor/jobhandler/HosTtlJob.java
deleted file mode 100644
index 43804f5..0000000
--- a/galaxy-job-executor/src/main/java/com/mesalab/executor/jobhandler/HosTtlJob.java
+++ /dev/null
@@ -1,289 +0,0 @@
-package com.mesalab.executor.jobhandler;
-
-import cn.hutool.core.date.DateUtil;
-import cn.hutool.core.io.IoUtil;
-import cn.hutool.crypto.digest.DigestUtil;
-import cn.hutool.log.Log;
-import com.mesalab.executor.core.config.HbaseConfig;
-import com.xxl.job.core.biz.model.ReturnT;
-import com.xxl.job.core.handler.IJobHandler;
-import com.xxl.job.core.handler.annotation.XxlJob;
-import com.xxl.job.core.log.XxlJobLogger;
-import com.zdjizhi.utils.JsonMapper;
-import com.zdjizhi.utils.StringUtil;
-import org.apache.commons.lang3.ObjectUtils;
-import org.apache.hadoop.hbase.Cell;
-import org.apache.hadoop.hbase.CellUtil;
-import org.apache.hadoop.hbase.TableName;
-import org.apache.hadoop.hbase.client.*;
-import org.apache.hadoop.hbase.util.Bytes;
-import org.springframework.beans.factory.annotation.Autowired;
-import org.springframework.stereotype.Component;
-
-import java.util.*;
-import java.util.concurrent.ExecutorService;
-
-@Component
-public class HosTtlJob {
- private Log logger = Log.get();
-
- @Autowired
- private Connection hbaseConnection;
- @Autowired
- private ExecutorService threadPool;
- @Autowired
- private HbaseConfig hbaseConfig;
-
- private static final String max_days = "maxDays";
- private int deleteCount;
- private int deleteErrorCount;
-
- /**
- * Delete expired data.
- *
- * @param params JSON string such as {"maxDays":30}
- */
- @XxlJob("deleteFilesJobHandler")
- public ReturnT<String> deleteExpiredFile(String params) {
- try {
- Map<String, Object> paramsMap = validParams(params);
- if (ObjectUtils.isEmpty(paramsMap)) {
- logger.error("params parser error , params is {}", params);
- return IJobHandler.FAIL;
- }
- Date date = DateUtil.date();
- XxlJobLogger.log("date: " + date);
- ArrayList<String> bucketList = getTableList();
- XxlJobLogger.log("bucket list: " + bucketList);
- deleteCount = 0;
- deleteErrorCount = 0;
- if (bucketList.size() > 0) {
- for (String bucket : bucketList) {
- deleteBucketFile(bucket, date, Integer.parseInt(String.valueOf(paramsMap.get(max_days))));
- }
- XxlJobLogger.log("delete file count: " + deleteCount + "; delete error count : " + deleteErrorCount);
- }
- } catch (RuntimeException be) {
- logger.error(be.getMessage());
- XxlJobLogger.log(be.getMessage());
- return ReturnT.FAIL;
- }
- return ReturnT.SUCCESS;
- }
-
- private ArrayList<String> getTableList() {
- ArrayList<String> tableList = new ArrayList<>();
- Table table = null;
- try {
- table = hbaseConnection.getTable(TableName.valueOf(hbaseConfig.getSystem_bucket_meta()));
- Scan scan = new Scan();
- ResultScanner scanner = table.getScanner(scan);
- for (Result rs : scanner) {
- tableList.add(Bytes.toString(rs.getRow()));
- }
- } catch (Exception be) {
- logger.error("get bucket list error. " + be.getMessage());
- XxlJobLogger.log("get bucket list error. " + be.getMessage());
- } finally {
- IoUtil.close(table);
- }
- return tableList;
- }
-
- private void deleteBucketFile(String tableName, Date date, int maxDays) {
- Table table = null;
- Table index_time_table = null;
- try {
- table = hbaseConnection.getTable(TableName.valueOf(tableName));
- index_time_table = hbaseConnection.getTable(TableName.valueOf(hbaseConfig.getTime_index_table_prefix() + tableName));
- for (String prefix : hbaseConfig.getFilename_head()) {
- String startRow = prefix;
- while (true) {
- ResultScanner scanner = null;
- Set<Get> fileGets = new HashSet<>();
- try {
- Scan scan = new Scan();
- scan.setRowPrefixFilter((prefix + "|").getBytes());
- scan.withStartRow(startRow.getBytes(), false);
- scan.withStopRow((prefix + "|" + dateToTimestamp(DateUtil.beginOfDay(DateUtil.offsetDay(date, -maxDays)).toString())).getBytes());
- scan.setCaching(hbaseConfig.getScanner_caching());
- scan.setLimit(hbaseConfig.getScan_batch());
- scanner = index_time_table.getScanner(scan);
- for (Result rs : scanner) {
- if (!rs.isEmpty()) {
- startRow = Bytes.toString(rs.getRow());
- Get get = new Get(Bytes.toString(rs.getRow()).split("\\|")[2].getBytes());
- get.addColumn(Bytes.toBytes("meta"), Bytes.toBytes("isparent"));
- get.addColumn(Bytes.toBytes("meta"), Bytes.toBytes("filenamekey"));
- get.addColumn(Bytes.toBytes("meta"), Bytes.toBytes("timekey"));
- get.addColumn(Bytes.toBytes("meta"), Bytes.toBytes("partname"));
- get.addColumn(Bytes.toBytes("meta"), Bytes.toBytes("filesize"));
- get.addColumn(Bytes.toBytes("meta"), Bytes.toBytes("filename"));
- fileGets.add(get);
- }
- }
- } catch (Exception be) {
- logger.error("scan index_time_" + tableName + " data error, start row is " + startRow + ". " + be.getMessage());
- XxlJobLogger.log("scan index_time_table error, start row is " + startRow + ". " + be.getMessage());
- } finally {
- IoUtil.close(scanner);
- }
-
- if (fileGets.size() <= 0) {
- break;
- } else {
- Result[] results = table.get(new ArrayList<>(fileGets));
- for (Result result : results) {
- threadPool.submit(new Runnable() {
- @Override
- public void run() {
- deleteData(tableName, date, maxDays, result);
- }
- });
- deleteCount++;
- }
- }
- }
- }
- } catch (Exception be) {
- logger.error("delete table " + tableName + " data error. " + be.getMessage());
- XxlJobLogger.log("delete table " + tableName + " data error. " + be.getMessage());
- } finally {
- IoUtil.close(table);
- IoUtil.close(index_time_table);
- }
- }
-
- private void deleteData(String tableName, Date date, int maxDays, Result result) {
- String fileName = "";
- Table table = null;
- Table index_time_table = null;
- Table index_filename_table = null;
- try {
- table = hbaseConnection.getTable(TableName.valueOf(tableName));
- index_time_table = hbaseConnection.getTable(TableName.valueOf(hbaseConfig.getTime_index_table_prefix() + tableName));
- index_filename_table = hbaseConnection.getTable(TableName.valueOf(hbaseConfig.getFilename_index_table_prefix() + tableName));
- if (!result.isEmpty() && result.containsColumn("meta".getBytes(), "isparent".getBytes())) {
- int isparent = Bytes.toInt(CellUtil.cloneValue(result.getColumnLatestCell("meta".getBytes(), "isparent".getBytes())));
- String filenamekey = Bytes.toString(CellUtil.cloneValue(result.getColumnLatestCell(Bytes.toBytes("meta"), Bytes.toBytes("filenamekey"))));
- String timekey = Bytes.toString(CellUtil.cloneValue(result.getColumnLatestCell(Bytes.toBytes("meta"), Bytes.toBytes("timekey"))));
- fileName = Bytes.toString(CellUtil.cloneValue(result.getColumnLatestCell(Bytes.toBytes("meta"), Bytes.toBytes("filename"))));
- if (isparent == 0 || isparent == 2) {
- Cell filesize = result.getColumnLatestCell(Bytes.toBytes("meta"), Bytes.toBytes("filesize"));
- if (filesize.getTimestamp() < dateToTimestamp(DateUtil.beginOfDay(DateUtil.offsetDay(date, -maxDays)).toString())) {
- index_filename_table.delete(new Delete(filenamekey.getBytes()));
- table.delete(new Delete(result.getRow()));
- index_time_table.delete(new Delete(timekey.getBytes()));
- }else {
- Increment increment = new Increment(timekey.getBytes());
- increment.addColumn("meta".getBytes(), "scancount".getBytes(), 1);
- index_time_table.increment(increment);
- }
- } else if (isparent == 1) {
- Cell partNameCell = result.getColumnLatestCell(Bytes.toBytes("meta"), Bytes.toBytes("partname"));
- if (partNameCell.getTimestamp() < dateToTimestamp(DateUtil.beginOfDay(DateUtil.offsetDay(date, -maxDays)).toString())) {
- String[] filenamekeys = filenamekey.split(",");
- for (String fk : filenamekeys) {
- if (!"".equals(fk)) {
- index_filename_table.delete(new Delete(fk.getBytes()));
- }
- }
- String[] partNames = Bytes.toString(CellUtil.cloneValue(partNameCell)).split(",");
- ArrayList<Delete> partDeletes = new ArrayList<>();
- for (String part : partNames) {
- partDeletes.add(new Delete(getRowKey(fileName + "|" + part).getBytes()));
- }
- partDeletes.add(new Delete(result.getRow()));
- table.delete(partDeletes);
- String[] timekeys = timekey.split(",");
- for (String tk : timekeys) {
- if (!"".equals(tk)) {
- index_time_table.delete(new Delete(tk.getBytes()));
- }
- }
- }else {
- String[] timekeys = timekey.split(",");
- for (String tk : timekeys) {
- Increment increment = new Increment(tk.getBytes());
- increment.addColumn("meta".getBytes(), "scancount".getBytes(), 1);
- index_time_table.increment(increment);
- }
- }
- } else if (isparent == 3) {
- int update = 0;
- ArrayList<Delete> partDeletes = new ArrayList<>();
- ArrayList<Get> partGets = new ArrayList<>();
- for (String partHead : hbaseConfig.getPart_head()) {
- Get getPart = new Get((partHead + "|" + Bytes.toString(result.getRow())).getBytes());
- getPart.addFamily("meta".getBytes());
- getPart.setTimeRange(dateToTimestamp(DateUtil.beginOfDay(DateUtil.offsetDay(date, -maxDays)).toString()), dateToTimestamp(DateUtil.beginOfDay(date).toString()));
- getPart.setMaxResultsPerColumnFamily(1);
- partGets.add(getPart);
- partDeletes.add(new Delete((partHead + "|" + Bytes.toString(result.getRow())).getBytes()));
- }
- for (Result part : table.get(partGets)) {
- if (!part.isEmpty()) {
- update += 1;
- break;
- }
- }
- if (update == 0) {
- String[] filenamekeys = filenamekey.split(",");
- for (String fk : filenamekeys) {
- if (!"".equals(fk)) {
- index_filename_table.delete(new Delete(fk.getBytes()));
- }
- }
- partDeletes.add(new Delete(result.getRow()));
- table.delete(partDeletes);
- String[] timekeys = timekey.split(",");
- for (String tk : timekeys) {
- if (!"".equals(tk)) {
- index_time_table.delete(new Delete(tk.getBytes()));
- }
- }
- }else {
- String[] timekeys = timekey.split(",");
- for (String tk : timekeys) {
- Increment increment = new Increment(tk.getBytes());
- increment.addColumn("meta".getBytes(), "scancount".getBytes(), 1);
- index_time_table.increment(increment);
- }
- }
- }
- }
- } catch (Exception be) {
- logger.error("delete " + tableName + " " + fileName + " error. " + be.getMessage());
- XxlJobLogger.log("delete " + tableName + " " + fileName + " error. " + be.getMessage());
- deleteErrorCount++;
- } finally {
- IoUtil.close(table);
- IoUtil.close(index_filename_table);
- IoUtil.close(index_time_table);
- }
- }
-
- private Map<String, Object> validParams(String params) {
- logger.info("params{}", params);
- if (StringUtil.isBlank(params)) {
- XxlJobLogger.log("params is Empty !");
- return null;
- }
- Map<String, Object> paramMap = (Map) JsonMapper.fromJsonString(params, Map.class);
- if (paramMap == null) {
- XxlJobLogger.log("params error !");
- return null;
- }
- return paramMap;
- }
-
- private long dateToTimestamp(String date) {
- return DateUtil.parse(date).getTime();
- }
-
- private String getRowKey(String filename) {
- String md5str = DigestUtil.md5Hex(filename);
- md5str = md5str.substring(8, 24);
- return md5str;
- }
-}
diff --git a/galaxy-job-executor/src/main/java/com/mesalab/executor/jobhandler/LogStorageQuotaJob.java b/galaxy-job-executor/src/main/java/com/mesalab/executor/jobhandler/LogStorageQuotaJob.java
index c224b9c..39c9077 100644
--- a/galaxy-job-executor/src/main/java/com/mesalab/executor/jobhandler/LogStorageQuotaJob.java
+++ b/galaxy-job-executor/src/main/java/com/mesalab/executor/jobhandler/LogStorageQuotaJob.java
@@ -302,7 +302,7 @@ public class LogStorageQuotaJob {
*
* @param params {"maxdays":30}
*/
- @XxlJob("deleteFilesOldJobHandler")
+ @XxlJob("deleteFilesJobHandler")
public ReturnT<String> deleteFiles(String params) {
try {
Map<String, Object> paramsMap = validParams(params);
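
Note on the final hunk: the deleted HosTtlJob registered its method under @XxlJob("deleteFilesJobHandler"), and the rename above moves that handler name onto LogStorageQuotaJob.deleteFiles, so schedules configured against the old name keep firing after the dead code is removed. Below is a minimal sketch of the method-level handler pattern involved, assuming the xxl-job-core 2.x annotation API already used throughout this diff; the class name and method body are illustrative, not the repository's code.

import com.xxl.job.core.biz.model.ReturnT;
import com.xxl.job.core.handler.annotation.XxlJob;
import org.springframework.stereotype.Component;

@Component
public class QuotaJobSketch {

    // The string in @XxlJob is the name the scheduler dispatches on, and it
    // must be unique within an executor: registering two methods under the
    // same name fails at startup with a naming conflict. Deleting HosTtlJob
    // is what frees "deleteFilesJobHandler" for reuse here.
    @XxlJob("deleteFilesJobHandler")
    public ReturnT<String> deleteFiles(String params) {
        // params arrives as a raw JSON string, e.g. {"maxdays":30}, and is
        // parsed by the job itself (see validParams in the hunk above).
        if (params == null || params.trim().isEmpty()) {
            return ReturnT.FAIL;
        }
        // ... delete files older than the configured TTL/quota ...
        return ReturnT.SUCCESS;
    }
}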