5 files changed, 2 insertions, 587 deletions
diff --git a/galaxy-job-executor/pom.xml b/galaxy-job-executor/pom.xml
index 0ade40f..8ec09cb 100644
--- a/galaxy-job-executor/pom.xml
+++ b/galaxy-job-executor/pom.xml
@@ -111,11 +111,6 @@
             <artifactId>async-http-client</artifactId>
             <version>1.9.40</version>
         </dependency>
-        <dependency>
-            <groupId>org.apache.hbase</groupId>
-            <artifactId>hbase-client</artifactId>
-            <version>2.2.3</version>
-        </dependency>
     </dependencies>
 
     <build>
@@ -184,7 +179,7 @@
                         <JAR_FILE>${project.build.finalName}.xjar</JAR_FILE>
                     </buildArgs>
                     <imageTags>
-                        <imageTag>v1.3.220823</imageTag>
+                        <imageTag>v1.3.221101-rc1</imageTag>
                     </imageTags>
                     <resources>
                         <resource>
diff --git a/galaxy-job-executor/src/main/java/com/mesalab/executor/core/config/HbaseConfig.java b/galaxy-job-executor/src/main/java/com/mesalab/executor/core/config/HbaseConfig.java
deleted file mode 100644
index 8a65a9c..0000000
--- a/galaxy-job-executor/src/main/java/com/mesalab/executor/core/config/HbaseConfig.java
+++ /dev/null
@@ -1,231 +0,0 @@
-package com.mesalab.executor.core.config;
-
-import com.alibaba.nacos.api.config.ConfigType;
-import com.alibaba.nacos.api.config.annotation.NacosConfigurationProperties;
-import org.springframework.context.annotation.Configuration;
-
-import java.util.List;
-
-@Configuration
-@NacosConfigurationProperties(prefix = "hbase", dataId = "${nacos.config.data-id}", groupId = "${nacos.config.group}", type = ConfigType.PROPERTIES, autoRefreshed = true)
-public class HbaseConfig {
-    private String zookeeper_quorum;
-
-    private String zookeeper_property_clientPort;
-
-    private String zookeeper_znode_parent;
-
-    private String client_pause;
-
-    private Integer client_retries_number;
-
-    private Integer rpc_timeout;
-
-    private Integer connect_pool;
-
-    private Integer delete_file_thread;
-
-    private Integer scanner_timeout;
-
-    private Integer scanner_max_result_size;
-
-    private Integer scanner_caching;
-
-    private String time_index_table_prefix;
-
-    private String filename_index_table_prefix;
-
-    private String partfile_index_table_prefix;
-
-    private String system_bucket_meta;
-
-    private String client_keyvalue_maxsize;
-
-    private List<String> filename_head;
-
-    private List<String> part_head;
-
-    private Integer get_batch;
-
-    private Integer delete_batch;
-
-    private Integer scan_batch;
-
-    private Long executeTime;
-
-    public Integer getDelete_file_thread() {
-        return delete_file_thread;
-    }
-
-    public void setDelete_file_thread(Integer delete_file_thread) {
-        this.delete_file_thread = delete_file_thread;
-    }
-
-    public Long getExecuteTime() {
-        return executeTime;
-    }
-
-    public void setExecuteTime(Long executeTime) {
-        this.executeTime = executeTime;
-    }
-
-    public Integer getScan_batch() {
-        return scan_batch;
-    }
-
-    public void setScan_batch(Integer scan_batch) {
-        this.scan_batch = scan_batch;
-    }
-
-    public Integer getDelete_batch() {
-        return delete_batch;
-    }
-
-    public void setDelete_batch(Integer delete_batch) {
-        this.delete_batch = delete_batch;
-    }
-
-    public String getZookeeper_quorum() {
-        return zookeeper_quorum;
-    }
-
-    public void setZookeeper_quorum(String zookeeper_quorum) {
-        this.zookeeper_quorum = zookeeper_quorum;
-    }
-
-    public String getZookeeper_property_clientPort() {
-        return zookeeper_property_clientPort;
-    }
-
-    public void setZookeeper_property_clientPort(String zookeeper_property_clientPort) {
-        this.zookeeper_property_clientPort = zookeeper_property_clientPort;
-    }
-
-    public String getZookeeper_znode_parent() {
-        return zookeeper_znode_parent;
-    }
-
-    public void setZookeeper_znode_parent(String zookeeper_znode_parent) {
-        this.zookeeper_znode_parent = zookeeper_znode_parent;
-    }
-
-    public String getClient_pause() {
-        return client_pause;
-    }
-
-    public void setClient_pause(String client_pause) {
-        this.client_pause = client_pause;
-    }
-
-    public Integer getClient_retries_number() {
-        return client_retries_number;
-    }
-
-    public void setClient_retries_number(Integer client_retries_number) {
-        this.client_retries_number = client_retries_number;
-    }
-
-    public Integer getRpc_timeout() {
-        return rpc_timeout;
-    }
-
-    public void setRpc_timeout(Integer rpc_timeout) {
-        this.rpc_timeout = rpc_timeout;
-    }
-
-    public Integer getConnect_pool() {
-        return connect_pool;
-    }
-
-    public void setConnect_pool(Integer connect_pool) {
-        this.connect_pool = connect_pool;
-    }
-
-    public Integer getScanner_timeout() {
-        return scanner_timeout;
-    }
-
-    public void setScanner_timeout(Integer scanner_timeout) {
-        this.scanner_timeout = scanner_timeout;
-    }
-
-    public Integer getScanner_max_result_size() {
-        return scanner_max_result_size;
-    }
-
-    public void setScanner_max_result_size(Integer scanner_max_result_size) {
-        this.scanner_max_result_size = scanner_max_result_size;
-    }
-
-    public Integer getScanner_caching() {
-        return scanner_caching;
-    }
-
-    public void setScanner_caching(Integer scanner_caching) {
-        this.scanner_caching = scanner_caching;
-    }
-
-    public String getTime_index_table_prefix() {
-        return time_index_table_prefix;
-    }
-
-    public void setTime_index_table_prefix(String time_index_table_prefix) {
-        this.time_index_table_prefix = time_index_table_prefix;
-    }
-
-    public String getFilename_index_table_prefix() {
-        return filename_index_table_prefix;
-    }
-
-    public void setFilename_index_table_prefix(String filename_index_table_prefix) {
-        this.filename_index_table_prefix = filename_index_table_prefix;
-    }
-
-    public String getPartfile_index_table_prefix() {
-        return partfile_index_table_prefix;
-    }
-
-    public void setPartfile_index_table_prefix(String partfile_index_table_prefix) {
-        this.partfile_index_table_prefix = partfile_index_table_prefix;
-    }
-
-    public String getSystem_bucket_meta() {
-        return system_bucket_meta;
-    }
-
-    public void setSystem_bucket_meta(String system_bucket_meta) {
-        this.system_bucket_meta = system_bucket_meta;
-    }
-
-    public String getClient_keyvalue_maxsize() {
-        return client_keyvalue_maxsize;
-    }
-
-    public void setClient_keyvalue_maxsize(String client_keyvalue_maxsize) {
-        this.client_keyvalue_maxsize = client_keyvalue_maxsize;
-    }
-
-    public List<String> getFilename_head() {
-        return filename_head;
-    }
-
-    public void setFilename_head(List<String> filename_head) {
-        this.filename_head = filename_head;
-    }
-
-    public List<String> getPart_head() {
-        return part_head;
-    }
-
-    public void setPart_head(List<String> part_head) {
-        this.part_head = part_head;
-    }
-
-    public Integer getGet_batch() {
-        return get_batch;
-    }
-
-    public void setGet_batch(Integer get_batch) {
-        this.get_batch = get_batch;
-    }
-}
diff --git a/galaxy-job-executor/src/main/java/com/mesalab/executor/core/utils/HbaseUtil.java b/galaxy-job-executor/src/main/java/com/mesalab/executor/core/utils/HbaseUtil.java
deleted file mode 100644
index b410daf..0000000
--- a/galaxy-job-executor/src/main/java/com/mesalab/executor/core/utils/HbaseUtil.java
+++ /dev/null
@@ -1,60 +0,0 @@
-package com.mesalab.executor.core.utils;
-
-import java.io.IOException;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-
-import cn.hutool.core.date.DateUtil;
-import cn.hutool.crypto.digest.DigestUtil;
-import cn.hutool.log.Log;
-import com.mesalab.executor.core.config.HbaseConfig;
-import org.apache.hadoop.hbase.HBaseConfiguration;
-import org.apache.hadoop.hbase.client.Connection;
-import org.apache.hadoop.hbase.client.ConnectionFactory;
-import org.springframework.beans.factory.annotation.Autowired;
-import org.springframework.beans.factory.annotation.Qualifier;
-import org.springframework.context.annotation.Bean;
-import org.springframework.context.annotation.Configuration;
-
-@Configuration
-public class HbaseUtil {
-
-    private Log logger = Log.get();
-
-    @Autowired
-    private HbaseConfig hbaseConfig;
-
-    @Bean(name = "hbaseConfiguration")
-    public org.apache.hadoop.conf.Configuration getConfiguration() {
-        org.apache.hadoop.conf.Configuration conf = HBaseConfiguration.create();
-        conf.set("hbase.zookeeper.quorum", hbaseConfig.getZookeeper_quorum());
-        conf.set("hbase.zookeeper.property.clientPort", hbaseConfig.getZookeeper_property_clientPort());
-        conf.set("zookeeper.znode.parent", hbaseConfig.getZookeeper_znode_parent());
-        conf.setInt("hbase.client.retries.number", hbaseConfig.getClient_retries_number());
-        conf.setInt("hbase.rpc.timeout", hbaseConfig.getRpc_timeout());
-        conf.set("hbase.client.keyvalue.maxsize", hbaseConfig.getClient_keyvalue_maxsize());
-        conf.setInt("hbase.client.ipc.pool.size", hbaseConfig.getConnect_pool());
-        conf.setInt("hbase.client.scanner.timeout.period", hbaseConfig.getScanner_timeout());
-        conf.setInt("hbase.client.scanner.max.result.size", hbaseConfig.getScanner_max_result_size());
-//        conf.setInt("hbase.client.scanner.caching", hbaseConfig.getScanner_caching());
-        return conf;
-    }
-
-    @Bean(name = "hbaseConnection")
-    public Connection getConnection(@Qualifier("hbaseConfiguration") org.apache.hadoop.conf.Configuration conf) {
-        Connection conn = null;
-        try {
-            conn = ConnectionFactory.createConnection(conf);
-        } catch (IOException e) {
-            logger.error("create hbase connection error.", e);
-        }
-        return conn;
-    }
-
-    @Bean(name = "threadPool")
-    public ExecutorService getThreadPool() {
-        return Executors.newFixedThreadPool(hbaseConfig.getDelete_file_thread());
-    }
-
-
-}
diff --git a/galaxy-job-executor/src/main/java/com/mesalab/executor/jobhandler/HosTtlJob.java b/galaxy-job-executor/src/main/java/com/mesalab/executor/jobhandler/HosTtlJob.java
deleted file mode 100644
index 43804f5..0000000
--- a/galaxy-job-executor/src/main/java/com/mesalab/executor/jobhandler/HosTtlJob.java
+++ /dev/null
@@ -1,289 +0,0 @@
-package com.mesalab.executor.jobhandler;
-
-import cn.hutool.core.date.DateUtil;
-import cn.hutool.core.io.IoUtil;
-import cn.hutool.crypto.digest.DigestUtil;
-import cn.hutool.log.Log;
-import com.mesalab.executor.core.config.HbaseConfig;
-import com.xxl.job.core.biz.model.ReturnT;
-import com.xxl.job.core.handler.IJobHandler;
-import com.xxl.job.core.handler.annotation.XxlJob;
-import com.xxl.job.core.log.XxlJobLogger;
-import com.zdjizhi.utils.JsonMapper;
-import com.zdjizhi.utils.StringUtil;
-import org.apache.commons.lang3.ObjectUtils;
-import org.apache.hadoop.hbase.Cell;
-import org.apache.hadoop.hbase.CellUtil;
-import org.apache.hadoop.hbase.TableName;
-import org.apache.hadoop.hbase.client.*;
-import org.apache.hadoop.hbase.util.Bytes;
-import org.springframework.beans.factory.annotation.Autowired;
-import org.springframework.stereotype.Component;
-
-import java.util.*;
-import java.util.concurrent.ExecutorService;
-
-@Component
-public class HosTtlJob {
-    private Log logger = Log.get();
-
-    @Autowired
-    private Connection hbaseConnection;
-    @Autowired
-    private ExecutorService threadPool;
-    @Autowired
-    private HbaseConfig hbaseConfig;
-
-    private static final String max_days = "maxDays";
-    private int deleteCount;
-    private int deleteErrorCount;
-
-    /**
-     * Delete expired data
-     *
-     * @param params{"maxdays":30}
-     */
-    @XxlJob("deleteFilesJobHandler")
-    public ReturnT<String> deleteExpiredFile(String params) {
-        try {
-            Map<String, Object> paramsMap = validParams(params);
-            if (ObjectUtils.isEmpty(paramsMap)) {
-                logger.error("params parser error , params is {}", params);
-                return IJobHandler.FAIL;
-            }
-            Date date = DateUtil.date();
-            XxlJobLogger.log("date: " + date);
-            ArrayList<String> bucketList = getTableList();
-            XxlJobLogger.log("bucket list: " + bucketList);
-            deleteCount = 0;
-            deleteErrorCount = 0;
-            if (bucketList.size() > 0) {
-                for (String bucket : bucketList) {
-                    deleteBucketFile(bucket, date, Integer.parseInt(String.valueOf(paramsMap.get(max_days))));
-                }
-                XxlJobLogger.log("delete file count: " + deleteCount + "; delete error count : " + deleteErrorCount);
-            }
-        } catch (RuntimeException be) {
-            logger.error(be.getMessage());
-            XxlJobLogger.log(be.getMessage());
-            return ReturnT.FAIL;
-        }
-        return ReturnT.SUCCESS;
-    }
-
-    private ArrayList<String> getTableList() {
-        ArrayList<String> tableList = new ArrayList<>();
-        Table table = null;
-        try {
-            table = hbaseConnection.getTable(TableName.valueOf(hbaseConfig.getSystem_bucket_meta()));
-            Scan scan = new Scan();
-            ResultScanner scanner = table.getScanner(scan);
-            for (Result rs : scanner) {
-                tableList.add(Bytes.toString(rs.getRow()));
-            }
-        } catch (Exception be) {
-            logger.error("get bucket list error. " + be.getMessage());
-            XxlJobLogger.log("get bucket list error. " + be.getMessage());
-        } finally {
-            IoUtil.close(table);
-        }
-        return tableList;
-    }
-
-    private void deleteBucketFile(String tableName, Date date, int maxDays) {
-        Table table = null;
-        Table index_time_table = null;
-        try {
-            table = hbaseConnection.getTable(TableName.valueOf(tableName));
-            index_time_table = hbaseConnection.getTable(TableName.valueOf(hbaseConfig.getTime_index_table_prefix() + tableName));
-            for (String prefix : hbaseConfig.getFilename_head()) {
-                String startRow = prefix;
-                while (true) {
-                    ResultScanner scanner = null;
-                    Set<Get> fileGets = new HashSet<>();
-                    try {
-                        Scan scan = new Scan();
-                        scan.setRowPrefixFilter((prefix + "|").getBytes());
-                        scan.withStartRow(startRow.getBytes(), false);
-                        scan.withStopRow((prefix + "|" + dateToTimestamp(DateUtil.beginOfDay(DateUtil.offsetDay(date, -maxDays)).toString())).getBytes());
-                        scan.setCaching(hbaseConfig.getScanner_caching());
-                        scan.setLimit(hbaseConfig.getScan_batch());
-                        scanner = index_time_table.getScanner(scan);
-                        for (Result rs : scanner) {
-                            if (!rs.isEmpty()) {
-                                startRow = Bytes.toString(rs.getRow());
-                                Get get = new Get(Bytes.toString(rs.getRow()).split("\\|")[2].getBytes());
-                                get.addColumn(Bytes.toBytes("meta"), Bytes.toBytes("isparent"));
-                                get.addColumn(Bytes.toBytes("meta"), Bytes.toBytes("filenamekey"));
-                                get.addColumn(Bytes.toBytes("meta"), Bytes.toBytes("timekey"));
-                                get.addColumn(Bytes.toBytes("meta"), Bytes.toBytes("partname"));
-                                get.addColumn(Bytes.toBytes("meta"), Bytes.toBytes("filesize"));
-                                get.addColumn(Bytes.toBytes("meta"), Bytes.toBytes("filename"));
-                                fileGets.add(get);
-                            }
-                        }
-                    } catch (Exception be) {
-                        logger.error("scan index_time_" + tableName + " data error, start row is " + startRow + ". " + be.getMessage());
-                        XxlJobLogger.log("scan index_time_table error, start row is " + startRow + ". " + be.getMessage());
-                    } finally {
-                        IoUtil.close(scanner);
-                    }
-
-                    if (fileGets.size() <= 0) {
-                        break;
-                    } else {
-                        Result[] results = table.get(new ArrayList<>(fileGets));
-                        for (Result result : results) {
-                            threadPool.submit(new Runnable() {
-                                @Override
-                                public void run() {
-                                    deleteData(tableName, date, maxDays, result);
-                                }
-                            });
-                            deleteCount++;
-                        }
-                    }
-                }
-            }
-        } catch (Exception be) {
-            logger.error("delete table " + tableName + " data error. " + be.getMessage());
-            XxlJobLogger.log("delete table " + tableName + " data error. " + be.getMessage());
-        } finally {
-            IoUtil.close(table);
-            IoUtil.close(index_time_table);
-        }
-    }
-
-    private void deleteData(String tableName, Date date, int maxDays, Result result) {
-        String fileName = "";
-        Table table = null;
-        Table index_time_table = null;
-        Table index_filename_table = null;
-        try {
-            table = hbaseConnection.getTable(TableName.valueOf(tableName));
-            index_time_table = hbaseConnection.getTable(TableName.valueOf(hbaseConfig.getTime_index_table_prefix() + tableName));
-            index_filename_table = hbaseConnection.getTable(TableName.valueOf(hbaseConfig.getFilename_index_table_prefix() + tableName));
-            if (!result.isEmpty() && result.containsColumn("meta".getBytes(), "isparent".getBytes())) {
-                int isparent = Bytes.toInt(CellUtil.cloneValue(result.getColumnLatestCell("meta".getBytes(), "isparent".getBytes())));
-                String filenamekey = Bytes.toString(CellUtil.cloneValue(result.getColumnLatestCell(Bytes.toBytes("meta"), Bytes.toBytes("filenamekey"))));
-                String timekey = Bytes.toString(CellUtil.cloneValue(result.getColumnLatestCell(Bytes.toBytes("meta"), Bytes.toBytes("timekey"))));
-                fileName = Bytes.toString(CellUtil.cloneValue(result.getColumnLatestCell(Bytes.toBytes("meta"), Bytes.toBytes("filename"))));
-                if (isparent == 0 || isparent == 2) {
-                    Cell filesize = result.getColumnLatestCell(Bytes.toBytes("meta"), Bytes.toBytes("filesize"));
-                    if (filesize.getTimestamp() < dateToTimestamp(DateUtil.beginOfDay(DateUtil.offsetDay(date, -maxDays)).toString())) {
-                        index_filename_table.delete(new Delete(filenamekey.getBytes()));
-                        table.delete(new Delete(result.getRow()));
-                        index_time_table.delete(new Delete(timekey.getBytes()));
-                    }else {
-                        Increment increment = new Increment(timekey.getBytes());
-                        increment.addColumn("meta".getBytes(), "scancount".getBytes(), 1);
-                        index_time_table.increment(increment);
-                    }
-                } else if (isparent == 1) {
-                    Cell partNameCell = result.getColumnLatestCell(Bytes.toBytes("meta"), Bytes.toBytes("partname"));
-                    if (partNameCell.getTimestamp() < dateToTimestamp(DateUtil.beginOfDay(DateUtil.offsetDay(date, -maxDays)).toString())) {
-                        String[] filenamekeys = filenamekey.split(",");
-                        for (String fk : filenamekeys) {
-                            if (!"".equals(fk)) {
-                                index_filename_table.delete(new Delete(fk.getBytes()));
-                            }
-                        }
-                        String[] partNames = Bytes.toString(CellUtil.cloneValue(partNameCell)).split(",");
-                        ArrayList<Delete> partDeletes = new ArrayList<>();
-                        for (String part : partNames) {
-                            partDeletes.add(new Delete(getRowKey(fileName + "|" + part).getBytes()));
-                        }
-                        partDeletes.add(new Delete(result.getRow()));
-                        table.delete(partDeletes);
-                        String[] timekeys = timekey.split(",");
-                        for (String tk : timekeys) {
-                            if (!"".equals(tk)) {
-                                index_time_table.delete(new Delete(tk.getBytes()));
-                            }
-                        }
-                    }else {
-                        String[] timekeys = timekey.split(",");
-                        for (String tk : timekeys) {
-                            Increment increment = new Increment(tk.getBytes());
-                            increment.addColumn("meta".getBytes(), "scancount".getBytes(), 1);
-                            index_time_table.increment(increment);
-                        }
-                    }
-                } else if (isparent == 3) {
-                    int update = 0;
-                    ArrayList<Delete> partDeletes = new ArrayList<>();
-                    ArrayList<Get> partGets = new ArrayList<>();
-                    for (String partHead : hbaseConfig.getPart_head()) {
-                        Get getPart = new Get((partHead + "|" + Bytes.toString(result.getRow())).getBytes());
-                        getPart.addFamily("meta".getBytes());
-                        getPart.setTimeRange(dateToTimestamp(DateUtil.beginOfDay(DateUtil.offsetDay(date, -maxDays)).toString()), dateToTimestamp(DateUtil.beginOfDay(date).toString()));
-                        getPart.setMaxResultsPerColumnFamily(1);
-                        partGets.add(getPart);
-                        partDeletes.add(new Delete((partHead + "|" + Bytes.toString(result.getRow())).getBytes()));
-                    }
-                    for (Result part : table.get(partGets)) {
-                        if (!part.isEmpty()) {
-                            update += 1;
-                            break;
-                        }
-                    }
-                    if (update == 0) {
-                        String[] filenamekeys = filenamekey.split(",");
-                        for (String fk : filenamekeys) {
-                            if (!"".equals(fk)) {
-                                index_filename_table.delete(new Delete(fk.getBytes()));
-                            }
-                        }
-                        partDeletes.add(new Delete(result.getRow()));
-                        table.delete(partDeletes);
-                        String[] timekeys = timekey.split(",");
-                        for (String tk : timekeys) {
-                            if (!"".equals(tk)) {
-                                index_time_table.delete(new Delete(tk.getBytes()));
-                            }
-                        }
-                    }else {
-                        String[] timekeys = timekey.split(",");
-                        for (String tk : timekeys) {
-                            Increment increment = new Increment(tk.getBytes());
-                            increment.addColumn("meta".getBytes(), "scancount".getBytes(), 1);
-                            index_time_table.increment(increment);
-                        }
-                    }
-                }
-            }
-        } catch (Exception be) {
-            logger.error("delete " + tableName + " " + fileName + " error. " + be.getMessage());
-            XxlJobLogger.log("delete " + tableName + " " + fileName + " error. " + be.getMessage());
-            deleteErrorCount++;
-        } finally {
-            IoUtil.close(table);
-            IoUtil.close(index_filename_table);
-            IoUtil.close(index_time_table);
-        }
-    }
-
-    private Map<String, Object> validParams(String params) {
-        logger.info("params{}", params);
-        if (StringUtil.isBlank(params)) {
-            XxlJobLogger.log("params is Empty !");
-            return null;
-        }
-        Map<String, Object> paramMap = (Map) JsonMapper.fromJsonString(params, Map.class);
-        if (paramMap == null) {
-            XxlJobLogger.log("params error !");
-            return null;
-        }
-        return paramMap;
-    }
-
-    private long dateToTimestamp(String date) {
-        return DateUtil.parse(date).getTime();
-    }
-
-    private String getRowKey(String filename) {
-        String md5str = DigestUtil.md5Hex(filename);
-        md5str = md5str.substring(8, 24);
-        return md5str;
-    }
-}
diff --git a/galaxy-job-executor/src/main/java/com/mesalab/executor/jobhandler/LogStorageQuotaJob.java b/galaxy-job-executor/src/main/java/com/mesalab/executor/jobhandler/LogStorageQuotaJob.java
index c224b9c..39c9077 100644
--- a/galaxy-job-executor/src/main/java/com/mesalab/executor/jobhandler/LogStorageQuotaJob.java
+++ b/galaxy-job-executor/src/main/java/com/mesalab/executor/jobhandler/LogStorageQuotaJob.java
@@ -302,7 +302,7 @@ public class LogStorageQuotaJob {
      *
      * @param params{"maxdays":30}
      */
-    @XxlJob("deleteFilesOldJobHandler")
+    @XxlJob("deleteFilesJobHandler")
     public ReturnT<String> deleteFiles(String params) {
         try {
             Map<String, Object> paramsMap = validParams(params);
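Reviewer note on the last hunk: XXL-Job dispatches triggers by the string inside the @XxlJob annotation, not by class or method name, so scheduler entries that already point at "deleteFilesJobHandler" now invoke LogStorageQuotaJob#deleteFiles in place of the deleted HosTtlJob#deleteExpiredFile, with no scheduler-side change. A minimal sketch of that binding follows; the class name is hypothetical and the body is illustrative only — the real quota/deletion logic stays in LogStorageQuotaJob.

import com.xxl.job.core.biz.model.ReturnT;
import com.xxl.job.core.handler.annotation.XxlJob;
import org.springframework.stereotype.Component;

// Hypothetical stand-in for LogStorageQuotaJob, shown only to illustrate name-based dispatch.
@Component
public class DeleteFilesHandlerSketch {

    // Same trigger name the scheduler already uses; the annotation value is the routing key.
    @XxlJob("deleteFilesJobHandler")
    public ReturnT<String> deleteFiles(String params) {
        // params arrives as the JSON string configured on the job, e.g. {"maxdays":30}.
        if (params == null || params.trim().isEmpty()) {
            return ReturnT.FAIL;
        }
        // ... expired-file deletion would run here ...
        return ReturnT.SUCCESS;
    }
}

Reusing the old handler name rather than introducing a new one means existing xxl-job-admin job definitions keep firing unchanged; only the implementation behind the name was swapped.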
