diff options
| author | zhanghongqing <[email protected]> | 2020-07-31 12:47:04 +0800 |
|---|---|---|
| committer | zhanghongqing <[email protected]> | 2020-07-31 12:47:04 +0800 |
| commit | 575d4f2d7a48e2b1713ad665c389820e179c7d66 (patch) | |
| tree | 66d7273690e7ba47d7d6dfcfc42bb77423b1c119 | |
| parent | 500895f22756db558f360de812733ab6cec3e04f (diff) | |
| parent | ca542f87d5eec1ffe99e2447c49c7c2869ff1d28 (diff) | |
Merge branch 'develop' into dev20200519+consul
Conflicts:
xxl-job-executor-galaxy/src/main/java/com/xxl/job/executor/core/config/KafkaConfig.java
xxl-job-executor-galaxy/src/main/java/com/xxl/job/executor/core/config/StorgeConfig.java
xxl-job-executor-galaxy/src/main/java/com/xxl/job/executor/service/jobhandler/LogStorageQuotaJob.java
xxl-job-executor-galaxy/src/main/resources/application-executor.yml
xxl-job-executor-galaxy/src/test/java/com/xxl/job/executor/test/ShellTest.java
16 files changed, 1941 insertions, 15 deletions
diff --git a/xxl-job-executor-galaxy/pom.xml b/xxl-job-executor-galaxy/pom.xml
index 87b9a52..2712b22 100644
--- a/xxl-job-executor-galaxy/pom.xml
+++ b/xxl-job-executor-galaxy/pom.xml
@@ -25,7 +25,7 @@
 	<properties>
 		<xxl-job-core.version>2.2.0</xxl-job-core.version>
-		<galaxy.version>1.0.2</galaxy.version>
+		<galaxy.version>1.0.4</galaxy.version>
 		<httpclient.version>4.5.6</httpclient.version>
 		<lombok.version>1.16.20</lombok.version>
 		<docker.build>192.168.40.153</docker.build>
@@ -156,9 +156,9 @@
 					<JDK_IMAGE>192.168.40.153:9080/common/jdk:1.8.0_73</JDK_IMAGE>
 					<JAR_FILE>${project.build.finalName}.jar</JAR_FILE>
 				</buildArgs>
-<!-- 				<imageTags> -->
-<!-- 					<imageTag>1.0</imageTag> -->
-<!-- 				</imageTags> -->
+				<imageTags>
+					<imageTag>v1.0.2.20200628</imageTag>
+				</imageTags>
 				<!-- copy the built jar into /target/docker alongside the Dockerfile -->
 				<resources>
diff --git a/xxl-job-executor-galaxy/src/main/java/com/xxl/job/executor/core/config/KafkaConfig.java b/xxl-job-executor-galaxy/src/main/java/com/xxl/job/executor/core/config/KafkaConfig.java
index e05e55f..f7b2572 100644
--- a/xxl-job-executor-galaxy/src/main/java/com/xxl/job/executor/core/config/KafkaConfig.java
+++ b/xxl-job-executor-galaxy/src/main/java/com/xxl/job/executor/core/config/KafkaConfig.java
@@ -5,7 +5,10 @@
 import java.util.Map;
 
 import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.beans.factory.annotation.Value;
+<<<<<<< HEAD
 import org.springframework.cloud.context.config.annotation.RefreshScope;
+=======
+>>>>>>> refs/heads/develop
 import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafka;
@@ -28,7 +31,7 @@ public class KafkaConfig {
 		configs.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,servers);
configs.put(ProducerConfig.RETRIES_CONFIG, 0);
configs.put(ProducerConfig.BATCH_SIZE_CONFIG, 16384);
- configs.put(ProducerConfig.ACKS_CONFIG, "-1");
+ configs.put(ProducerConfig.ACKS_CONFIG, "1");
configs.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 524288);
//configure the key/value serializers
configs.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
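The acks change above relaxes the producer's durability guarantee: in Apache Kafka, acks=-1 (equivalent to "all") waits for the full in-sync replica set to acknowledge a write, while acks=1 waits only for the partition leader, lowering latency at the risk of losing records if the leader fails before replication. A minimal sketch of the two settings; ProducerConfig.ACKS_CONFIG is the real Kafka constant, the surrounding map is illustrative only:

// Sketch only, not part of the commit: the two acknowledgement modes side by side.
Map<String, Object> configs = new HashMap<>();
configs.put(ProducerConfig.ACKS_CONFIG, "1");    // leader-only ack (value after this commit)
// configs.put(ProducerConfig.ACKS_CONFIG, "-1"); // full in-sync-replica ack (previous value)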
diff --git a/xxl-job-executor-galaxy/src/main/java/com/xxl/job/executor/core/config/StorgeConfig.java b/xxl-job-executor-galaxy/src/main/java/com/xxl/job/executor/core/config/StorgeConfig.java
index 6b45d81..1b50087 100644
--- a/xxl-job-executor-galaxy/src/main/java/com/xxl/job/executor/core/config/StorgeConfig.java
+++ b/xxl-job-executor-galaxy/src/main/java/com/xxl/job/executor/core/config/StorgeConfig.java
@@ -1,7 +1,9 @@
 package com.xxl.job.executor.core.config;
 
 import org.springframework.beans.factory.annotation.Value;
+
 import org.springframework.cloud.context.config.annotation.RefreshScope;
+
 import org.springframework.context.annotation.Configuration;
 
 import lombok.Data;
@@ -26,7 +28,7 @@ public class StorgeConfig {
     @Value("${storge.traffic.clustername}")
     private String trafficClusterName = "ck_cluster";
-
+
     @Value("${storge.analytic.server.select}")
     private String analyticServerSelect; // Druid query node ip and port
@@ -68,4 +70,37 @@ public class StorgeConfig {
         return sb.toString();
     }
+    @Value("${storge.traffic.system.parts}")
+    private String systemParts;
+
+    @Value("${storge.traffic.system.disks}")
+    private String systemDisks;
+
+    @Value("${storge.traffic.system.tables}")
+    private String systemTables;
+
+
+    @Value("${storge.analytic.server.select}")
+    private String analyticServerSelect; // Druid query node ip and port
+
+    @Value("${storge.analytic.server.delete}")
+    private String analyticServerDelete; // Druid delete node ip and port
+
+    @Value("${storge.files.server}")
+    private String filesServer; // file server ip:port
+
+    @Value("${storge.files.hbase-linux-dir}")
+    private String hbaseLinuxDir; // standalone HBase data directory on the Linux filesystem
+
+    @Value("${storge.files.hbase-hdfs-dir}")
+    private String hbaseHdfsDir; // HBase path on HDFS
+
+    @Value("${storge.isclusder}")
+    private Boolean isClusder; // whether the deployment is clustered
+
+    @Value("${storge.files.token}")
+    private String filesToken; // authentication token
+
+
+}
diff --git a/xxl-job-executor-galaxy/src/main/java/com/xxl/job/executor/core/utils/HttpClientUtils.java b/xxl-job-executor-galaxy/src/main/java/com/xxl/job/executor/core/utils/HttpClientUtils.java
index 0200aab..b0127e9 100644
--- a/xxl-job-executor-galaxy/src/main/java/com/xxl/job/executor/core/utils/HttpClientUtils.java
+++ b/xxl-job-executor-galaxy/src/main/java/com/xxl/job/executor/core/utils/HttpClientUtils.java
@@ -14,6 +14,7 @@
 import javax.net.ssl.SSLException;
 import javax.net.ssl.SSLHandshakeException;
 
 import org.apache.commons.io.IOUtils;
+import org.apache.http.Header;
 import org.apache.http.HeaderElement;
 import org.apache.http.HeaderElementIterator;
 import org.apache.http.HttpEntity;
@@ -27,14 +28,17 @@
 import org.apache.http.client.ClientProtocolException;
 import org.apache.http.client.HttpRequestRetryHandler;
 import org.apache.http.client.config.RequestConfig;
 import org.apache.http.client.methods.CloseableHttpResponse;
+import org.apache.http.client.methods.HttpDelete;
 import org.apache.http.client.methods.HttpGet;
 import org.apache.http.client.methods.HttpPost;
+import org.apache.http.client.methods.HttpPut;
 import org.apache.http.client.protocol.HttpClientContext;
 import org.apache.http.conn.ConnectTimeoutException;
 import org.apache.http.conn.ConnectionKeepAliveStrategy;
 import org.apache.http.conn.HttpClientConnectionManager;
 import org.apache.http.entity.ByteArrayEntity;
 import org.apache.http.impl.client.CloseableHttpClient;
+import org.apache.http.impl.client.HttpClientBuilder;
 import org.apache.http.impl.client.HttpClients;
 import org.apache.http.impl.conn.PoolingHttpClientConnectionManager;
 import org.apache.http.message.BasicHeaderElementIterator;
@@ -158,7 +162,7 @@ public class HttpClientUtils {
      * @param url request URL
      * @return
      */
-    public static String httpGet(String url) {
+    public static String httpGet(String url, Header... headers) {
         String msg = "-1";
         // obtain the pooled HTTP client
@@ -173,6 +177,13 @@
             logger.info("http get uri {}",uri);
             // create the GET request
             HttpGet httpGet = new HttpGet(uri);
+
+            if (StringUtil.isNotEmpty(headers)) {
+                for (Header h : headers) {
+                    httpGet.addHeader(h);
+                    logger.info("request header : {}",h);
+                }
+            }
             // execute the request
             response = httpClient.execute(httpGet);
             int statusCode = response.getStatusLine().getStatusCode();
@@ -221,7 +232,7 @@
      * @param requestBody
      * @return
      */
-    public static String httpPost(String url, String requestBody) {
+    public static String httpPost(String url, String requestBody, Header... headers) {
         String msg = "-1";
         // obtain the pooled HTTP client
         CloseableHttpClient httpClient = getHttpClient();
@@ -238,6 +249,13 @@
             HttpPost httpPost = new HttpPost(uri);
             httpPost.setHeader("Content-Type", "application/json");
+            if (StringUtil.isNotEmpty(headers)) {
+                for (Header h : headers) {
+                    httpPost.addHeader(h);
+                    logger.info("request header : {}",h);
+                }
+            }
+
             if(StringUtil.isNotBlank(requestBody)) {
                 httpPost.setEntity(new ByteArrayEntity(requestBody.getBytes("utf-8")));
             }
@@ -282,7 +300,132 @@
             }
         }
         return msg;
     }
+    /**
+     * HTTP PUT request
+     * @param url
+     * @param requestBody
+     * @return
+     */
+    public static String httpPut(String url, String requestBody, Header... headers) {
+        String msg = "-1";
+        // obtain the pooled HTTP client
+        CloseableHttpClient httpClient = getHttpClient();
+
+        // create the PUT request
+        CloseableHttpResponse response = null;
+        try {
+
+            URL ul = new URL(url);
+
+            URI uri = new URI(ul.getProtocol(),null, ul.getHost(), ul.getPort(), ul.getPath(), ul.getQuery(), null);
+
+            logger.info("http post uri:{}, http post body:{}", uri, requestBody);
+
+            HttpPut httpPut = new HttpPut(uri);
+            httpPut.setHeader("Content-Type", "application/json");
+            if (StringUtil.isNotEmpty(headers)) {
+                for (Header h : headers) {
+                    httpPut.addHeader(h);
+                    logger.info("request header : {}",h);
+                }
+            }
+
+            if(StringUtil.isNotBlank(requestBody)) {
+                httpPut.setEntity(new ByteArrayEntity(requestBody.getBytes("utf-8")));
+            }
+
+            response = httpClient.execute(httpPut);
+            int statusCode = response.getStatusLine().getStatusCode();
+            // get the response entity
+            HttpEntity entity = response.getEntity();
+            // read the response body
+            msg = EntityUtils.toString(entity, "UTF-8");
+
+            if (statusCode != HttpStatus.SC_OK) {
+                throw new BusinessException(response.getStatusLine().getStatusCode(),
+                        "Http put content is :" + msg);
+            }
+        } catch (URISyntaxException e) {
+            logger.error("URI 转换错误: {}", e.getMessage());
+            e.printStackTrace();
+
+        } catch (ClientProtocolException e) {
+            logger.error("协议错误: {}", e.getMessage());
+            e.printStackTrace();
+
+        } catch (ParseException e) {
+            logger.error("解析错误: {}", e.getMessage());
+            e.printStackTrace();
+
+        } catch (IOException e) {
+            logger.error("IO错误: {}", e.getMessage());
+            e.printStackTrace();
+
+        } finally {
+            if (null != response) {
+                try {
+                    EntityUtils.consumeQuietly(response.getEntity());
+                    response.close();
+                } catch (IOException e) {
+                    logger.error("释放链接错误: {}", e.getMessage());
+                    e.printStackTrace();
+                }
+            }
+        }
+        return msg;
+    }
+    /**
+     * Send an HTTP DELETE request
+     */
+    public static String httpDelete(String url, Header... headers){
+        String msg = "-1";
+        // obtain the pooled HTTP client
+        CloseableHttpClient httpClient = getHttpClient();
+
+        // create the DELETE request
+        CloseableHttpResponse response = null;
+
+        try {
+            URL ul = new URL(url);
+
+            URI uri = new URI(ul.getProtocol(),null, ul.getHost(), ul.getPort(), ul.getPath(), ul.getQuery(), null);
+            logger.info("http delete uri:{}", uri);
+
+            HttpDelete httpdelete = new HttpDelete(uri);
+            if (StringUtil.isNotEmpty(headers)) {
+                for (Header h : headers) {
+                    httpdelete.addHeader(h);
+                    logger.info("request header : {}",h);
+                }
+            }
+            response = httpClient.execute(httpdelete);
+            int statusCode = response.getStatusLine().getStatusCode();
+            // get the response entity
+            HttpEntity entity = response.getEntity();
+            // read the response body
+            msg = EntityUtils.toString(entity, "UTF-8");
+
+            if (statusCode != HttpStatus.SC_OK) {
+                throw new BusinessException(response.getStatusLine().getStatusCode(),
+                        "Http post content is :" + msg);
+            }
+
+        } catch (Exception e) {
+            e.printStackTrace();
+        }finally{
+            if (null != response) {
+                try {
+                    EntityUtils.consumeQuietly(response.getEntity());
+                    response.close();
+                } catch (IOException e) {
+                    logger.error("释放链接错误: {}", e.getMessage());
+                    e.printStackTrace();
+                }
+            }
+        }
+        return msg;
+    }
diff --git a/xxl-job-executor-galaxy/src/main/java/com/xxl/job/executor/core/utils/XmlUtil.java b/xxl-job-executor-galaxy/src/main/java/com/xxl/job/executor/core/utils/XmlUtil.java
new file mode 100644
index 0000000..9ac46f4
--- /dev/null
+++ b/xxl-job-executor-galaxy/src/main/java/com/xxl/job/executor/core/utils/XmlUtil.java
@@ -0,0 +1,58 @@
+package com.xxl.job.executor.core.utils;
+
+import java.io.StringReader;
+import java.io.StringWriter;
+
+import javax.xml.bind.JAXBContext;
+import javax.xml.bind.JAXBException;
+import javax.xml.bind.Marshaller;
+import javax.xml.bind.Unmarshaller;
+
+public class XmlUtil {
+
+ /**
+ * Convert an XML string into an object of the given class.
+ * @param clazz
+ * @param xmlStr
+ * @return
+ */
+ public static <T> T converXmlToBean(Class<T> clazz, String xmlStr) {
+ T xmlObject = null;
+ try {
+ JAXBContext context = JAXBContext.newInstance(clazz);
+ Unmarshaller unmarshaller = context.createUnmarshaller();
+ StringReader sr = new StringReader(xmlStr);
+ xmlObject = (T) unmarshaller.unmarshal(sr);
+
+ } catch (JAXBException e) {
+ e.printStackTrace();
+ }
+ return xmlObject;
+ }
+
+ /**
+ *
+ * Convert an object into an XML string.
+ *
+ * @param obj
+ */
+ public static String converToXml(Object obj) {
+ StringWriter sw = new StringWriter();
+
+ try {
+ // JDK built-in JAXB conversion classes
+ JAXBContext context = JAXBContext.newInstance(obj.getClass());
+ // object-to-XML marshaller
+ Marshaller marshaller = context.createMarshaller();
+ // pretty-print the XML output
+ marshaller.setProperty(Marshaller.JAXB_FORMATTED_OUTPUT, Boolean.TRUE);
+ // marshal the object into the writer as XML
+ marshaller.marshal(obj, sw);
+
+ } catch (JAXBException e) {
+ e.printStackTrace();
+ }
+ return sw.toString();
+ }
+
+}
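A minimal usage sketch for the new XmlUtil (not part of the commit; the Note class and its field are hypothetical, any JAXB-annotated type works the same way):

import javax.xml.bind.annotation.XmlElement;
import javax.xml.bind.annotation.XmlRootElement;

@XmlRootElement(name = "Note")
class Note {
    @XmlElement(name = "Text")
    public String text;
}

// object -> formatted XML string, then back again
Note note = new Note();
note.text = "hello";
String xml = XmlUtil.converToXml(note);
Note parsed = XmlUtil.converXmlToBean(Note.class, xml);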
diff --git a/xxl-job-executor-galaxy/src/main/java/com/xxl/job/executor/core/utils/ZookeeperUtils.java b/xxl-job-executor-galaxy/src/main/java/com/xxl/job/executor/core/utils/ZookeeperUtils.java
new file mode 100644
index 0000000..526d48f
--- /dev/null
+++ b/xxl-job-executor-galaxy/src/main/java/com/xxl/job/executor/core/utils/ZookeeperUtils.java
@@ -0,0 +1,136 @@
+package com.xxl.job.executor.core.utils;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.concurrent.CountDownLatch;
+
+import org.apache.log4j.Logger;
+import org.apache.zookeeper.CreateMode;
+import org.apache.zookeeper.KeeperException;
+import org.apache.zookeeper.WatchedEvent;
+import org.apache.zookeeper.Watcher;
+import org.apache.zookeeper.ZooDefs;
+import org.apache.zookeeper.ZooKeeper;
+import org.apache.zookeeper.data.ACL;
+import org.apache.zookeeper.data.Stat;
+
+import com.google.common.base.Splitter;
+
+public class ZookeeperUtils implements Watcher {
+ private static Logger logger = Logger.getLogger(ZookeeperUtils.class);
+
+ private ZooKeeper zookeeper;
+
+ private static final int SESSION_TIME_OUT = 10000;//ms
+
+ private CountDownLatch countDownLatch = new CountDownLatch(1);
+
+ @Override
+ public void process(WatchedEvent event) {
+ if (event.getState() == Event.KeeperState.SyncConnected) {
+ countDownLatch.countDown();
+ }
+ }
+
+ /**
+ * Modify a node's data (the node is created first if it does not exist).
+ *
+ * @param path node path
+ */
+ public void modifyNode(String path, String data, String zookeeperIp) {
+ createNode(path, data.getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, zookeeperIp);
+ try {
+ connectZookeeper(zookeeperIp);
+ Stat stat = zookeeper.exists(path, true);
+ if (stat != null) {
+ zookeeper.setData(path, data.getBytes(), stat.getVersion());
+ } else {
+ logger.error("Node does not exist!,Can't modify");
+ }
+ } catch (KeeperException | InterruptedException e) {
+ e.printStackTrace();
+ } finally {
+ closeConn();
+ }
+ }
+
+ /**
+ * Connect to ZooKeeper.
+ *
+ * @param host server address
+ */
+ public void connectZookeeper(String host) {
+ try {
+ zookeeper = new ZooKeeper(host, SESSION_TIME_OUT, this);
+ countDownLatch.await();
+ } catch (IOException | InterruptedException e) {
+ e.printStackTrace();
+ }
+ }
+
+ /**
+ * Close the connection.
+ */
+ public void closeConn() {
+ try {
+ if (zookeeper != null) {
+ zookeeper.close();
+ }
+ } catch (InterruptedException e) {
+ e.printStackTrace();
+ }
+ }
+
+ /**
+ * Read a node's data.
+ *
+ * @param path node path
+ * @return the node content, or null on error
+ */
+ public String getNodeData(String path, String zookeeperIp) {
+ String result = null;
+ Stat stat = new Stat();
+ connectZookeeper(zookeeperIp);
+ try {
+ byte[] resByte = zookeeper.getData(path, true, stat);
+ result = new String(resByte);
+ } catch (KeeperException | InterruptedException e) {
+ logger.error("Get node information exception");
+ e.printStackTrace();
+ } finally {
+ closeConn();
+ }
+ return result;
+ }
+
+ /**
+ * @param path path at which the node is created
+ * @param data byte[] payload stored in the node
+ * @param acls access-control policy
+ */
+ public void createNode(String path, byte[] data, List<ACL> acls, String zookeeperIp) {
+ try {
+ connectZookeeper(zookeeperIp);
+ List<String> list = Splitter.on("/").omitEmptyStrings().splitToList(path);
+ StringBuffer sb= new StringBuffer();
+ for (String dir : list) {
+ sb.append("/").append(dir);
+ Stat exists = zookeeper.exists(sb.toString(), true);
+ if (exists == null) {
+ zookeeper.create(sb.toString(), null, acls, CreateMode.PERSISTENT);
+ }
+ }
+ Stat exists = zookeeper.exists(path, true);
+ if (exists == null) {
+ zookeeper.create(path, data, acls, CreateMode.PERSISTENT);
+ } else {
+ logger.warn("Node already exists ! Don't need to create");
+ }
+ } catch (KeeperException | InterruptedException e) {
+ e.printStackTrace();
+ } finally {
+ closeConn();
+ }
+ }
+
+}
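A short usage sketch for ZookeeperUtils (not part of the commit). The ZooKeeper address is an assumption; the path mirrors the zookeeperStoragePath ("/storage/worker/") and the Traffic-Logs node name used by the job and service classes later in this diff:

ZookeeperUtils zk = new ZookeeperUtils();
String zkAddress = "127.0.0.1:2181"; // assumed address, normally taken from ${zookeeper.server}
// creates any missing parent nodes, then writes the value
zk.modifyNode("/storage/worker/Traffic-Logs", "30", zkAddress);
String value = zk.getNodeData("/storage/worker/Traffic-Logs", zkAddress);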
diff --git a/xxl-job-executor-galaxy/src/main/java/com/xxl/job/executor/service/jobhandler/DataflowJob.java b/xxl-job-executor-galaxy/src/main/java/com/xxl/job/executor/jobhandler/DataflowJob.java
index 4dacb9d..ab19ace 100644
--- a/xxl-job-executor-galaxy/src/main/java/com/xxl/job/executor/service/jobhandler/DataflowJob.java
+++ b/xxl-job-executor-galaxy/src/main/java/com/xxl/job/executor/jobhandler/DataflowJob.java
@@ -1,4 +1,4 @@
-package com.xxl.job.executor.service.jobhandler;
+package com.xxl.job.executor.jobhandler;
 
 import java.util.List;
 import java.util.Map;
diff --git a/xxl-job-executor-galaxy/src/main/java/com/xxl/job/executor/service/jobhandler/DruidNativeQueryJob.java b/xxl-job-executor-galaxy/src/main/java/com/xxl/job/executor/jobhandler/DruidNativeQueryJob.java
index d5f08d1..d1873c9 100644
--- a/xxl-job-executor-galaxy/src/main/java/com/xxl/job/executor/service/jobhandler/DruidNativeQueryJob.java
+++ b/xxl-job-executor-galaxy/src/main/java/com/xxl/job/executor/jobhandler/DruidNativeQueryJob.java
@@ -1,4 +1,4 @@
-package com.xxl.job.executor.service.jobhandler;
+package com.xxl.job.executor.jobhandler;
 
 import com.xxl.job.core.biz.model.ReturnT;
 import com.xxl.job.core.handler.IJobHandler;
diff --git a/xxl-job-executor-galaxy/src/main/java/com/xxl/job/executor/jobhandler/LogStorageQuotaJob.java b/xxl-job-executor-galaxy/src/main/java/com/xxl/job/executor/jobhandler/LogStorageQuotaJob.java
new file mode 100644
index 0000000..20263b5
--- /dev/null
+++ b/xxl-job-executor-galaxy/src/main/java/com/xxl/job/executor/jobhandler/LogStorageQuotaJob.java
@@ -0,0 +1,551 @@
+package com.xxl.job.executor.jobhandler;
+
+import java.util.List;
+import java.util.Map;
+
+import org.apache.commons.lang3.ObjectUtils;
+import org.apache.http.Header;
+import org.apache.http.message.BasicHeader;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.http.HttpHeaders;
+import org.springframework.stereotype.Component;
+
+import com.google.common.base.Joiner;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import com.xxl.job.core.biz.model.ReturnT;
+import com.xxl.job.core.handler.IJobHandler;
+import com.xxl.job.core.handler.annotation.XxlJob;
+import com.xxl.job.core.log.XxlJobLogger;
+import com.xxl.job.executor.Exception.BusinessException;
+import com.xxl.job.executor.core.config.StorgeConfig;
+import com.xxl.job.executor.core.utils.HttpClientUtils;
+import com.xxl.job.executor.core.utils.KafkaUtils;
+import com.xxl.job.executor.core.utils.XmlUtil;
+import com.xxl.job.executor.pojo.LifecycleConfiguration;
+import com.xxl.job.executor.pojo.LifecycleConfiguration.Expiration;
+import com.xxl.job.executor.pojo.ListAllMyBucketsResult;
+import com.xxl.job.executor.pojo.ListAllMyBucketsResult.Buckets;
+import com.xxl.job.executor.pojo.ListAllMyBucketsResult.Buckets.Bucket;
+import com.xxl.job.executor.service.StorageQuotaService;
+import com.zdjizhi.utils.DateUtils;
+import com.zdjizhi.utils.JsonMapper;
+import com.zdjizhi.utils.StringUtil;
+import com.zdjizhi.utils.TimeConstants;
+
+@SuppressWarnings({ "unchecked", "rawtypes" })
+@Component
+public class LogStorageQuotaJob {
+
+ private static Logger logger = LoggerFactory.getLogger(LogStorageQuotaJob.class);
+
+ private static String trafficServer;
+ private static String trafficDataSource;
+ private static String trafficUserName;
+ private static String trafficPassword;
+
+ private static String trafficClusterName; // ClickHouse cluster name
+ private static String queryClickhouseUrl;
+
+ private static String analyticServer;
+ private static String selectAnalyticServer;
+ private static List<String> trafficTableList;
+ private static List<String> analyticTableList;
+ private static String queryDruidUrl;
+
+ private static String systemParts;
+ private static String systemTables;
+
+ private static String filesServer;
+ private static String filesToken;
+
+ // zookeeper /path+node
+ private static final String TRAFFIC_LOGS = "Traffic-Logs";
+ private static final String REPORT_AND_METRICS = "Report-and-Metrics";
+ private static final String FILES = "Files";
+
+ @Autowired
+ StorgeConfig storgeConfig;
+
+ @Autowired
+ StorageQuotaService storageQuotaService;
+
+
+ @Autowired
+ public LogStorageQuotaJob(StorgeConfig deletionConfig) {
+ trafficServer = deletionConfig.getTrafficServer();
+ trafficDataSource = deletionConfig.getTrafficDatasource();
+ trafficUserName = deletionConfig.getTrafficUsername();
+ trafficPassword = deletionConfig.getTrafficPassword();
+ trafficClusterName = deletionConfig.getTrafficClusterName();
+ systemParts = deletionConfig.getSystemParts();
+ systemTables= deletionConfig.getSystemTables();
+
+ analyticServer = deletionConfig.getAnalyticServerDelete(); // used for Druid deletes
+ selectAnalyticServer = deletionConfig.getAnalyticServerSelect(); // used for Druid queries
+
+ queryDruidUrl = Joiner.on("").join("http://", selectAnalyticServer, "/druid/v2/sql");
+ queryClickhouseUrl = Joiner.on("").join("http://", trafficServer);
+
+ filesServer = deletionConfig.getFilesServer();
+ filesToken = deletionConfig.getFilesToken();
+ }
+
+ //********************************************* Set storage retention policies **********************************************************
+
+
+ /**
+ * Apply the traffic-log retention policy
+ * (ClickHouse database).
+ * @param params {"maxdays":30}
+ */
+ @XxlJob("deleteTrafficDataJobHandler")
+ public ReturnT<String> deleteTrafficData(String params) {
+ try {
+ Map<String, Object> paramsMap = validParams(params);
+
+ Map<String, Object> deleteParamMap = getDeleteSource();
+
+ String prefixDeleteSql = Joiner.on("").join("ALTER TABLE ", trafficDataSource,".");
+ String suffixDeleteSql = Joiner.on("").join(" ON CLUSTER ",trafficClusterName," DROP PARTITION ");
+ Integer maxdays = Integer.valueOf(String.valueOf(paramsMap.get("maxdays")));
+ String deleteMaxDate = DateUtils.getSomeDate(TimeConstants.YYYYMMDD, -maxdays);
+ // iterate the table list and build the SQL for each table
+ getTrafficTables().forEach(table->{
+
+ String selectSql = Joiner.on("").join("SELECT `partition` from "+systemParts+" WHERE table = '", table, "' AND `partition` < '",deleteMaxDate,"' FORMAT JSON;");
+ //query all partitions older than the configured cut-off
+ deleteParamMap.put("query", selectSql);
+ String httpGetResult = HttpClientUtils.httpGet(HttpClientUtils.getUrlWithParams(queryClickhouseUrl, deleteParamMap));
+ Map httpGetResultMap = (Map) JsonMapper.fromJsonString(httpGetResult, Map.class);
+ List<Map> partitionList = (List) httpGetResultMap.get("data");
+
+ partitionList.forEach(partition->{
+ String deleteSql = Joiner.on("").join(prefixDeleteSql, table, suffixDeleteSql, partition.get("partition"));
+ deleteParamMap.put("query", deleteSql);
+ XxlJobLogger.log("delete clickhouse sql:{}", deleteSql);
+
+ String deleteMessage = HttpClientUtils.httpPost(HttpClientUtils.getUrlWithParams(queryClickhouseUrl, deleteParamMap), null);
+ XxlJobLogger.log("delete clickhouse message:{}", deleteMessage);
+ XxlJobLogger.log("delete table {} success", table);
+ });
+ });
+ modifyLastStorage(TRAFFIC_LOGS, maxdays);
+ } catch (BusinessException be) {
+ logger.error(be.getErrorMessage());
+ XxlJobLogger.log(be.getErrorMessage());
+ return ReturnT.FAIL;
+ } catch (Exception e) {
+ logger.error(e.getMessage());
+ XxlJobLogger.log(e);
+ return ReturnT.FAIL;
+ }
+
+ return ReturnT.SUCCESS;
+ }
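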
+
+ /**
+ * Apply the Report and Metrics log retention policy
+ * (Druid database).
+ * @param params {"maxdays":365}
+ */
+ @XxlJob("deleteReportAndMetricsDataJobHandler")
+ public ReturnT<String> deleteReportAndMetricsData(String params) {
+
+ try {
+ Map<String, Object> paramsMap = validParams(params);
+
+ //update the load rules
+ String changeRulesUrl = Joiner.on("").join("http://", analyticServer, "/druid/coordinator/v1/rules/");
+ String changeRulesParam = Joiner.on("").join("[{\"type\":\"loadByPeriod\",\"period\":\"P", paramsMap.get("maxdays"), "D\",\"tieredReplicants\":{\"_default_tier\":1}},{\"type\":\"dropForever\"}]");
+ //physical deletion (kill task)
+ String deleteUrl = Joiner.on("").join("http://", analyticServer, "/druid/indexer/v1/task");
+ String prefixDeleteParam = "{\"type\":\"kill\",\"dataSource\":\"";
+ //start time / (current time - maxdays)
+ String druidDataStartTime = getDruidDataStartTime();
+ String deleteMaxDate = DateUtils.getSomeDate(TimeConstants.YYYY_MM_DD, -Integer.valueOf(String.valueOf(paramsMap.get("maxdays"))));
+ String suffixDeleteParam = Joiner.on("").join("\",\"interval\":\"", druidDataStartTime,"/", deleteMaxDate, "\"}");
+ getAnalyticTables().forEach(table->{
+
+ XxlJobLogger.log("delete first command table :{}, sql : {}",table,changeRulesParam);
+ HttpClientUtils.httpPost(changeRulesUrl+table, changeRulesParam);
+ if(druidDataStartTime.compareTo(deleteMaxDate)<0) {
+ XxlJobLogger.log("delete second command table :{}, sql : {}",table,prefixDeleteParam+table+suffixDeleteParam);
+ String deleteMessage = HttpClientUtils.httpPost(deleteUrl, prefixDeleteParam+table+suffixDeleteParam);
+ XxlJobLogger.log("delete result message :{}",deleteMessage);
+ }
+ });
+ modifyLastStorage(REPORT_AND_METRICS, Integer.valueOf(String.valueOf(paramsMap.get("maxdays"))));
+ } catch (BusinessException be) {
+ logger.error(be.getErrorMessage());
+ XxlJobLogger.log(be.getErrorMessage());
+ return ReturnT.FAIL;
+ } catch (Exception e) {
+ logger.error(e.getMessage());
+ XxlJobLogger.log(e);
+ return ReturnT.FAIL;
+ }
+
+ return ReturnT.SUCCESS;
+ }
+ /**
+ * Apply the file storage retention policy.
+ * @param params {"maxdays":30}
+ */
+ @XxlJob("deleteFilesJobHandler")
+ public ReturnT<String> deleteFiles(String params){
+ try {
+ Map<String, Object> paramsMap = validParams(params);
+ Header header1 = new BasicHeader("Token", filesToken);
+ Header header2 = new BasicHeader(HttpHeaders.CONTENT_TYPE, "text/xml");
+ Header[] headers = {header1,header2};
+
+ String httpGetRes = HttpClientUtils.httpGet(Joiner.on("").join(filesServer,"/hos/"), header1);
+ ListAllMyBucketsResult list = XmlUtil.converXmlToBean(ListAllMyBucketsResult.class, httpGetRes);
+ if(ObjectUtils.allNotNull(list,list.getBuckets(),list.getBuckets().getBucket())) {
+ Buckets buckets = list.getBuckets();
+ List<Bucket> bucketList = buckets.getBucket();
+ XxlJobLogger.log("bucket list {} ",bucketList);
+ LifecycleConfiguration lc = new LifecycleConfiguration();
+ lc.getRule().setExpiration(new Expiration(String.valueOf(paramsMap.get("maxdays"))));
+ int failCount = 0;
+ for (Bucket bucket : bucketList) {
+ try {
+ HttpClientUtils.httpPut(Joiner.on("").join(filesServer,"/hos/",bucket.getName(),"?lifecycle"),XmlUtil.converToXml(lc), headers);
+ } catch (BusinessException be) {
+ logger.error("delete files fail ,{}",be.getErrorMessage());
+ XxlJobLogger.log("delete files fail ,{}",be.getErrorMessage());
+ failCount++;
+ }
+ XxlJobLogger.log("delete bucket {} ",bucket);
+ }
+ if(failCount>0) {
+ throw new BusinessException("number of total task "+bucketList.size()+", number of failed tasks "+failCount) ;
+ }
+ }
+ modifyLastStorage(FILES, Integer.valueOf(String.valueOf(paramsMap.get("maxdays"))));
+ } catch (BusinessException be) {
+ logger.error(be.getErrorMessage());
+ XxlJobLogger.log(be.getErrorMessage());
+ return ReturnT.FAIL;
+ } catch (Exception e) {
+ logger.error(e.getMessage());
+ XxlJobLogger.log(e);
+ return ReturnT.FAIL;
+ }
+ return ReturnT.SUCCESS;
+ }
+
+ //********************************************* Purge all data **********************************************************
+
+ /**
+ * Purge all traffic data (ClickHouse database).
+ * @param params {"maxdays":365}
+ */
+ @XxlJob("deleteAllTrafficDataJobHandler")
+ public ReturnT<String> deleteAllTrafficData(String params){
+
+ try {
+ Map<String, Object> deleteParamMap = getDeleteSource();
+ //parameters for the purge statement
+ String prefixDeleteSql = Joiner.on("").join("TRUNCATE TABLE IF EXISTS ", trafficDataSource, ".");
+ String suffixDeleteSql = Joiner.on("").join(" ON CLUSTER ", trafficClusterName);
+ getTrafficTables().forEach(table->{
+ deleteParamMap.put("query", prefixDeleteSql + table + suffixDeleteSql);
+ XxlJobLogger.log("delete clickhouse sql:{}",prefixDeleteSql + table + suffixDeleteSql);
+ HttpClientUtils.httpPost(HttpClientUtils.getUrlWithParams(queryClickhouseUrl, deleteParamMap), null);
+ XxlJobLogger.log("delete table {} success",table);
+ });
+ modifyLastStorage(TRAFFIC_LOGS, 0);
+ } catch (BusinessException be) {
+ logger.error(be.getErrorMessage());
+ XxlJobLogger.log(be.getErrorMessage());
+ return ReturnT.FAIL;
+ } catch (Exception e) {
+ logger.error(e.getMessage());
+ XxlJobLogger.log(e);
+ return ReturnT.FAIL;
+ }
+
+ return ReturnT.SUCCESS;
+ }
+
+ /**
+ * Purge all Report and Metrics data (Druid database).
+ * @param params {"maxdays":30}
+ */
+ @XxlJob("deleteAllReportAndMetricsDataJobHandler")
+ public ReturnT<String> deleteAllReportAndMetricsData(String params) {
+ try {
+ Map<String, Object> paramMap = validParams(params);
+ String deleteEndDate = DateUtils.getSomeDate(TimeConstants.YYYY_MM_DD,1);
+
+ String deleteStartTime = getDruidDataStartTime();
+
+ //1. stop loading segments
+ String changeRulesUrl = Joiner.on("").join("http://", analyticServer, "/druid/coordinator/v1/rules/");
+ String changeRulesParam = "[{ \"type\":\"dropByInterval\", \"interval\":\""+deleteStartTime+"/"+deleteEndDate+"\" }]";
+
+ //2. physically delete all data
+ String deleteUrl = Joiner.on("").join("http://", analyticServer, "/druid/indexer/v1/task");
+ String prefixDeleteParam = "{\"type\":\"kill\",\"dataSource\":\"";
+ String suffixDeleteParam = Joiner.on("").join("\",\"interval\":\"",deleteStartTime+"/",deleteEndDate,"\"}");
+ //3. restore the load rules
+ String resetRulesUrl = Joiner.on("").join("http://", analyticServer, "/druid/coordinator/v1/rules/");
+ String resetRulesParam = Joiner.on("").join("[{\"type\":\"loadByPeriod\",\"period\":\"P", paramMap.get("maxdays"), "D\",\"tieredReplicants\":{\"_default_tier\":1}},{\"type\":\"dropForever\"}]");
+ List<String> analyticTables = getAnalyticTables();
+ int failCount = 0;
+ for( String table: analyticTables){
+ //limit each table to five minutes of processing; start the timer
+ long startTime = System.currentTimeMillis();
+ long endTime = startTime + 1000*60*5;
+ XxlJobLogger.log("delete table first command table :{}, sql : {}",table , changeRulesParam);
+ HttpClientUtils.httpPost(changeRulesUrl+table, changeRulesParam);
+ try {
+ //check whether the table data has been deleted; the count comes back under the fixed key EXPR$0
+ String countTableDataSql = Joiner.on("").join("{\"query\":\"SELECT count(*) from ", table, " where __time >= '", deleteStartTime, "' AND __time <= '", getDruidDataEndTime(), "'\",\"context\":{\"skipEmptyBuckets\":\"false\"},\"resultFormat\":\"object\"}");
+ int count = 1;
+ while(count > 0) {
+ String httpPostResult = HttpClientUtils.httpPost(queryDruidUrl, countTableDataSql);
+ List<Map> list = (List) JsonMapper.fromJsonString(httpPostResult, List.class);
+ count = Integer.valueOf(String.valueOf(list.get(0).get("EXPR$0")));
+ try {
+ if(count>0) {
+ //wait 5 seconds between checks until the delete finishes
+ Thread.sleep(5000L);
+ }
+ if(System.currentTimeMillis()>endTime) {
+ XxlJobLogger.log("delete time out error, table : {} ",table);
+ break;
+ }
+ } catch (InterruptedException e) {
+ e.printStackTrace();
+ }
+ }
+ } catch (BusinessException e) {
+ // if the table does not exist the query fails; log it and continue
+ logger.error(" query error {}",e.getErrorMessage());
+ XxlJobLogger.log(" query error {}",e.getErrorMessage());
+ failCount++;
+ }
+
+ XxlJobLogger.log("delete table second command table :{}, sql : {}",table , prefixDeleteParam+ table +suffixDeleteParam);
+ String deleteMessage = HttpClientUtils.httpPost(deleteUrl, prefixDeleteParam+ table +suffixDeleteParam);
+ XxlJobLogger.log("delete table third command table :{}, sql : {}",table , resetRulesParam);
+ HttpClientUtils.httpPost(resetRulesUrl+table, resetRulesParam);
+
+ XxlJobLogger.log("delete druid message :{}",deleteMessage);
+ }
+ if(failCount>0) {
+ throw new BusinessException("number of total tasks "+analyticTables.size()+" , number of failed tasks "+failCount) ;
+ }
+ modifyLastStorage(REPORT_AND_METRICS, 0);
+ } catch (BusinessException be) {
+ logger.error(be.getErrorMessage());
+ XxlJobLogger.log(be.getErrorMessage());
+ return ReturnT.FAIL;
+ } catch (Exception e) {
+ logger.error(e.getMessage());
+ XxlJobLogger.log(e);
+ return ReturnT.FAIL;
+ }
+
+ return ReturnT.SUCCESS;
+ }
+ /**
+ * Purge all file data (HBase-backed storage).
+ * @param params {"maxdays":365}
+ */
+ @XxlJob("deleteAllFilesJobHandler")
+ public ReturnT<String> deleteAllFiles(String params){
+
+ try {
+ Header header = new BasicHeader("Token", filesToken);
+
+ String httpGetRes = HttpClientUtils.httpGet(Joiner.on("").join(filesServer,"/hos/"), header);
+ ListAllMyBucketsResult list = XmlUtil.converXmlToBean(ListAllMyBucketsResult.class, httpGetRes);
+ if(ObjectUtils.allNotNull(list,list.getBuckets(),list.getBuckets().getBucket())) {
+ Buckets buckets = list.getBuckets();
+ List<Bucket> bucketList = buckets.getBucket();
+ XxlJobLogger.log("bucket list {} ",bucketList);
+ int failCount=0;
+ for (Bucket bucket : bucketList) {
+ try {
+ HttpClientUtils.httpDelete(Joiner.on("").join(filesServer,"/hos/",bucket.getName(),"?truncate"), header);
+ } catch (BusinessException be) {
+ logger.error(be.getErrorMessage());
+ XxlJobLogger.log(be.getErrorMessage());
+
+ failCount++;
+ }
+ XxlJobLogger.log("delete bucket {} ",bucket);
+ }
+ if(failCount>0) {
+ throw new BusinessException("number of total tasks "+bucketList.size()+" , number of failed tasks "+failCount) ;
+ }
+ }
+ modifyLastStorage(FILES, 0);
+ } catch (BusinessException be) {
+ logger.error(be.getErrorMessage());
+ XxlJobLogger.log(be.getErrorMessage());
+ return ReturnT.FAIL;
+ } catch (Exception e) {
+ logger.error(e.getMessage());
+ XxlJobLogger.log(e);
+ return ReturnT.FAIL;
+ }
+ return ReturnT.SUCCESS;
+ }
+
+
+ /**
+ * Query the databases directly and collect the storage metrics.
+ * @param params
+ * {"topic":"SYS-STORAGE-LOG","ckDayGrowth":"false"}
+ * ckDayGrowth: whether this run uses the once-per-day granularity
+ */
+ @SuppressWarnings("null")
+ @XxlJob("getStorageQuotaJobHandler")
+ public ReturnT<String> getStorageQuota (String params) {
+
+ try {
+ Map<String, Object> paramsMap = validParams(params);
+ //call the query method (standalone or cluster)
+ Map resultMap = null;
+
+ if ("false".equals(paramsMap.get("ckDayGrowth"))) {
+ resultMap = storageQuotaService.getDiskJson();
+ } else {
+ resultMap = storageQuotaService.getDayJson();
+ }
+
+ //send to Kafka
+ new KafkaUtils().sendMessage(String.valueOf(paramsMap.get("topic")), (List<Object>) resultMap.get("data"));
+ XxlJobLogger.log("send message to kafka, topic is {}", paramsMap.get("topic"));
+ //if any query failed, the result map contains "fail"
+ if(resultMap.toString().contains("fail")) {
+ return new ReturnT<String>(IJobHandler.FAIL.getCode(), " query error "+resultMap.get("status"));
+ }
+
+ } catch (BusinessException be) {
+ logger.error(be.getErrorMessage());
+ XxlJobLogger.log(be.getErrorMessage());
+ return ReturnT.FAIL;
+ } catch (Exception e) {
+ logger.error(e.getMessage());
+ XxlJobLogger.log(e);
+ return ReturnT.FAIL;
+ }
+
+ return ReturnT.SUCCESS;
+ }
+
+ /**
+ * Database connection parameters.
+ * @return map
+ * {database:,
+ * password:,
+ * user:,
+ * }
+ */
+ private Map<String, Object> getDeleteSource() {
+ //parameters appended to the delete SQL request
+ Map<String, Object> deleteParamMap = Maps.newHashMap();
+ deleteParamMap.put("database", trafficDataSource);
+ deleteParamMap.put("password", trafficPassword);
+ deleteParamMap.put("user", trafficUserName);
+ return deleteParamMap;
+ }
+ /**
+ * Validate the required parameters; the rest are optional.
+ * @param params
+ */
+ private Map<String, Object> validParams(String params) {
+ logger.info("params{}",params);
+
+ if (StringUtil.isBlank(params)) {
+ XxlJobLogger.log("params is Empty !");
+ return null;
+ }
+ Map<String,Object> paramMap = (Map) JsonMapper.fromJsonString(params, Map.class);
+ if(paramMap==null) {
+ XxlJobLogger.log("params error !");
+ return null;
+ }
+ return paramMap;
+ }
+ /**
+ * Query the Druid datasources that follow the log-table naming rule (names ending in "log").
+ * @return List<String>
+ */
+ private List<String> getAnalyticTables() {
+ //query the relevant table names
+ if(analyticTableList==null) {
+ List<String> tableList = Lists.newArrayList();
+ String queryTablesSql = "{\"query\":\"SELECT DISTINCT(datasource) FROM sys.tasks WHERE datasource NOT LIKE '%hot%'\",\"context\":{\"skipEmptyBuckets\":\"false\"},\"resultFormat\":\"object\"}";
+ String httpPostResult = HttpClientUtils.httpPost(queryDruidUrl, queryTablesSql);
+
+ List<Map> list = (List) JsonMapper.fromJsonString(httpPostResult, List.class);
+ list.forEach(map->tableList.addAll(map.values()));
+ analyticTableList = tableList;
+ }
+ return analyticTableList;
+ }
+ /**
+ * Query the ClickHouse tables that need to be purged.
+ * @return List<String>
+ */
+ private List<String> getTrafficTables() {
+ //query the relevant table names
+ List<String> tableList = Lists.newArrayList();
+ if(trafficTableList==null) {
+ Map<String, Object> deleteParamMap = getDeleteSource();
+
+ deleteParamMap.put("query", Joiner.on("").join("SELECT name FROM "+systemTables+" WHERE database = '", trafficDataSource, "' AND engine in ('MergeTree','ReplicatedMergeTree') FORMAT JSON;"));
+ String httpGetResult = HttpClientUtils.httpGet(HttpClientUtils.getUrlWithParams(queryClickhouseUrl, deleteParamMap));
+
+ Map httpGetResultMap = (Map) JsonMapper.fromJsonString(httpGetResult, Map.class);
+ List<Map> list = (List) httpGetResultMap.get("data");
+ list.forEach(map->tableList.addAll(map.values()));
+ trafficTableList = tableList;
+ }
+ return trafficTableList;
+ }
+
+ /**
+ * Query the start time of the data in Druid; returns today when empty, format yyyy-MM-dd.
+ */
+ private String getDruidDataStartTime() {
+ //query the earliest data timestamp in the Druid database
+ String queryAnalyticStartTimeSql = "{\"query\":\"SELECT \\\"start\\\" FROM sys.segments WHERE \\\"start\\\" <= '"+DateUtils.getSomeDate(DateUtils.YYYY_MM_DD, 1)+"' order by \\\"start\\\" limit 1\"}"; // start time used when purging Druid
+ String httpPost = HttpClientUtils.httpPost(queryDruidUrl, queryAnalyticStartTimeSql);
+ //parse the query result
+ List<Map> startTimeList = (List) JsonMapper.fromJsonString(httpPost, List.class);
+ return ObjectUtils.isEmpty(startTimeList)?DateUtils.getCurrentDate():DateUtils.getFormatDate(DateUtils.convertStringToDate(String.valueOf(startTimeList.get(0).get("start")), "yyyy-MM-dd'T'HH:mm:ss.SSSZ"), DateUtils.YYYY_MM_DD);
+ }
+ /**
+ * Update the retention baseline timestamp.
+ * @param logType
+ * @param maxdays
+ */
+ private void modifyLastStorage(String logType, Integer maxdays) {
+ // maintain the baseline: if now - baseline > maxdays (converted to seconds), set baseline = now - maxdays
+ Long lastStorage = storageQuotaService.getLastStorage(logType);
+ Long now = System.currentTimeMillis() / 1000;
+ Long max = maxdays * 24 * 60 * 60L; // maxdays converted to seconds
+ if (now - lastStorage > max) {
+ storageQuotaService.modifyLastStorage(logType, String.valueOf(now - max));
+ }
+ }
+ private String getDruidDataEndTime() {
+ String sql ="SELECT version FROM sys.segments WHERE version LIKE '2%' ORDER BY version DESC LIMIT 1";
+ //query the latest data version (end time) in the Druid database
+ String queryAnalyticStartTimeSql = "{\"query\":\"SELECT version FROM sys.segments WHERE version LIKE '2%' ORDER BY version DESC LIMIT 1\"}"; // latest segment version, used as the purge end time
+ String httpPost = HttpClientUtils.httpPost(queryDruidUrl, queryAnalyticStartTimeSql);
+ //parse the query result
+ List<Map> startTimeList = (List) JsonMapper.fromJsonString(httpPost, List.class);
+ return ObjectUtils.isEmpty(startTimeList)?DateUtils.getCurrentDate(): String.valueOf(startTimeList.get(0).get("version"));
+ }
+}
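For reference, the handlers above each receive a small JSON parameter string from the XXL-Job admin, e.g. {"maxdays":30} for the retention handlers and {"topic":"SYS-STORAGE-LOG","ckDayGrowth":"false"} for getStorageQuotaJobHandler. Below is a sketch of the ClickHouse statements that deleteTrafficData() assembles; the database, table and partition values are illustrative, the real ones come from StorgeConfig and the system.parts lookup:

// Hypothetical values; only the statement shapes are taken from the handler above.
String selectSql = "SELECT `partition` from system.parts WHERE table = 'traffic_log' AND `partition` < '20200601' FORMAT JSON;";
String dropSql   = "ALTER TABLE galaxy.traffic_log ON CLUSTER ck_cluster DROP PARTITION 20200601";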
diff --git a/xxl-job-executor-galaxy/src/main/java/com/xxl/job/executor/service/jobhandler/TroubleshootingJob.java b/xxl-job-executor-galaxy/src/main/java/com/xxl/job/executor/jobhandler/TroubleshootingJob.java
index 670eb22..ea0f89b 100644
--- a/xxl-job-executor-galaxy/src/main/java/com/xxl/job/executor/service/jobhandler/TroubleshootingJob.java
+++ b/xxl-job-executor-galaxy/src/main/java/com/xxl/job/executor/jobhandler/TroubleshootingJob.java
@@ -1,4 +1,4 @@
-package com.xxl.job.executor.service.jobhandler;
+package com.xxl.job.executor.jobhandler;
 
 import com.xxl.job.core.biz.model.ReturnT;
 import com.xxl.job.core.handler.IJobHandler;
diff --git a/xxl-job-executor-galaxy/src/main/java/com/xxl/job/executor/pojo/LifecycleConfiguration.java b/xxl-job-executor-galaxy/src/main/java/com/xxl/job/executor/pojo/LifecycleConfiguration.java
new file mode 100644
index 0000000..ff00fba
--- /dev/null
+++ b/xxl-job-executor-galaxy/src/main/java/com/xxl/job/executor/pojo/LifecycleConfiguration.java
@@ -0,0 +1,67 @@
+package com.xxl.job.executor.pojo;
+
+import javax.xml.bind.annotation.XmlAccessType;
+import javax.xml.bind.annotation.XmlAccessorType;
+import javax.xml.bind.annotation.XmlElement;
+import javax.xml.bind.annotation.XmlRootElement;
+
+import lombok.AllArgsConstructor;
+import lombok.Data;
+import lombok.NoArgsConstructor;
+
+@Data
+@XmlRootElement(name = "LifecycleConfiguration")
+@XmlAccessorType(XmlAccessType.FIELD)
+public class LifecycleConfiguration {
+
+ @XmlElement(name="Rule")
+ private Rule rule = new Rule();
+
+ public static class Rule {
+ @XmlElement(name="ID")
+ private String id = "1";
+ @XmlElement(name="Prefix")
+ private String prefix = "";
+ @XmlElement(name="Status")
+ private String status = "Enabled";
+
+ private Expiration expiration;
+ @XmlElement(name="Expiration")
+ public Expiration getExpiration() {
+ return expiration;
+ }
+ public void setExpiration(Expiration expiration) {
+ this.expiration = expiration;
+ }
+ @Override
+ public String toString() {
+ return "Rule [id=" + id + ", prefix=" + prefix + ", status=" + status + ", expiration=" + expiration + "]";
+ }
+
+ }
+ @NoArgsConstructor
+ @AllArgsConstructor
+ public static class Expiration {
+
+ private String days;
+
+ @XmlElement(name="Days")
+ public String getDays() {
+ return days;
+ }
+ public void setDays(String days) {
+ this.days = days;
+ }
+ @Override
+ public String toString() {
+ return "Expiration [days=" + days + "]";
+ }
+
+ }
+ @Override
+ public String toString() {
+ return "LifecycleConfiguration [rule=" + rule + "]";
+ }
+
+
+}
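For orientation, marshalling a LifecycleConfiguration through XmlUtil.converToXml (as deleteFiles() does) should yield XML roughly like the comment below; element order and formatting are assumptions, the field defaults come from the class above:

// Sketch only: expected shape of the generated lifecycle payload.
// <LifecycleConfiguration>
//   <Rule>
//     <ID>1</ID><Prefix></Prefix><Status>Enabled</Status>
//     <Expiration><Days>30</Days></Expiration>
//   </Rule>
// </LifecycleConfiguration>
LifecycleConfiguration lc = new LifecycleConfiguration();
lc.getRule().setExpiration(new LifecycleConfiguration.Expiration("30"));
String body = XmlUtil.converToXml(lc);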
diff --git a/xxl-job-executor-galaxy/src/main/java/com/xxl/job/executor/pojo/ListAllMyBucketsResult.java b/xxl-job-executor-galaxy/src/main/java/com/xxl/job/executor/pojo/ListAllMyBucketsResult.java
new file mode 100644
index 0000000..a9fbd80
--- /dev/null
+++ b/xxl-job-executor-galaxy/src/main/java/com/xxl/job/executor/pojo/ListAllMyBucketsResult.java
@@ -0,0 +1,111 @@
+package com.xxl.job.executor.pojo;
+
+import java.util.List;
+
+import javax.xml.bind.annotation.XmlElement;
+import javax.xml.bind.annotation.XmlRootElement;
+
+
+@XmlRootElement(name = "ListAllMyBucketsResult")
+public class ListAllMyBucketsResult {
+
+ private Owner owner;
+
+ private Buckets buckets;
+
+
+ public static class Owner {
+
+ private String id;
+
+ private String displayName;
+ @XmlElement(name="ID")
+ public String getId() {
+ return id;
+ }
+
+ public void setId(String id) {
+ this.id = id;
+ }
+ @XmlElement(name="DisplayName")
+ public String getDisplayName() {
+ return displayName;
+ }
+
+ public void setDisplayName(String displayName) {
+ this.displayName = displayName;
+ }
+
+ }
+
+ public static class Buckets {
+
+ private List<Bucket> bucket;
+
+ public static class Bucket {
+
+ private String name;
+
+ private String creationDate;
+ @XmlElement(name="Name")
+ public String getName() {
+ return name;
+ }
+
+ public void setName(String name) {
+ this.name = name;
+ }
+ @XmlElement(name="CreationDate")
+ public String getCreationDate() {
+ return creationDate;
+ }
+
+ public void setCreationDate(String creationDate) {
+ this.creationDate = creationDate;
+ }
+
+ @Override
+ public String toString() {
+ return "Bucket [name=" + name + ", creationDate=" + creationDate + "]";
+ }
+
+ }
+ @XmlElement(name="Bucket")
+ public List<Bucket> getBucket() {
+ return bucket;
+ }
+
+ public void setBucket(List<Bucket> bucket) {
+ this.bucket = bucket;
+ }
+
+ @Override
+ public String toString() {
+ return "Buckets [bucket=" + bucket + "]";
+ }
+
+ }
+
+ @XmlElement(name="Owner")
+ public Owner getOwner() {
+ return owner;
+ }
+
+ public void setOwner(Owner owner) {
+ this.owner = owner;
+ }
+ @XmlElement(name="Buckets")
+ public Buckets getBuckets() {
+ return buckets;
+ }
+
+ public void setBuckets(Buckets buckets) {
+ this.buckets = buckets;
+ }
+
+ @Override
+ public String toString() {
+ return "ListAllMyBucketsResult [owner=" + owner + ", buckets=" + buckets + "]";
+ }
+
+}
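The bucket listing that ListAllMyBucketsResult is unmarshalled from (the GET /hos/ response used by LogStorageQuotaJob) is expected to follow the S3-style layout sketched below; the owner and bucket values are purely illustrative:

// Sketch only: example payload and the parse call used elsewhere in this diff.
// <ListAllMyBucketsResult>
//   <Owner><ID>admin</ID><DisplayName>admin</DisplayName></Owner>
//   <Buckets>
//     <Bucket><Name>pcap</Name><CreationDate>2020-06-28T00:00:00Z</CreationDate></Bucket>
//   </Buckets>
// </ListAllMyBucketsResult>
ListAllMyBucketsResult result = XmlUtil.converXmlToBean(ListAllMyBucketsResult.class, responseBody); // responseBody: raw XML string
List<ListAllMyBucketsResult.Buckets.Bucket> buckets = result.getBuckets().getBucket();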
diff --git a/xxl-job-executor-galaxy/src/main/java/com/xxl/job/executor/service/StorageQuotaService.java b/xxl-job-executor-galaxy/src/main/java/com/xxl/job/executor/service/StorageQuotaService.java
new file mode 100644
index 0000000..76c8bee
--- /dev/null
+++ b/xxl-job-executor-galaxy/src/main/java/com/xxl/job/executor/service/StorageQuotaService.java
@@ -0,0 +1,651 @@
+package com.xxl.job.executor.service;
+
+import java.io.BufferedReader;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.InputStreamReader;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.beans.factory.annotation.Value;
+import org.springframework.stereotype.Service;
+import org.springframework.util.ObjectUtils;
+
+import com.google.common.base.Joiner;
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.Maps;
+import com.xxl.job.executor.Exception.BusinessException;
+import com.xxl.job.executor.core.config.StorgeConfig;
+import com.xxl.job.executor.core.utils.HttpClientUtils;
+import com.xxl.job.executor.core.utils.ZookeeperUtils;
+import com.zdjizhi.utils.DateUtils;
+import com.zdjizhi.utils.JsonMapper;
+import com.zdjizhi.utils.StringUtil;
+
+import lombok.Data;
+
+/**
+ * Storage-quota metrics collector: combines the raw-log, statistics-log and file-system results.
+ *
+ * @author qidaijie
+ */
+@SuppressWarnings("unchecked")
+@Service
+public class StorageQuotaService {
+
+ private static Logger logger = LoggerFactory.getLogger(StorageQuotaService.class);
+ /**
+ * Charset used when reading the output of Linux commands
+ */
+ private static final String CHART_SET_UTF8 = "UTF-8";
+ /**
+ * Standalone HBase data directory on the Linux filesystem
+ */
+ private static String hbaselinuxDir;
+ /**
+ * HBase path on HDFS
+ */
+ private static String hbaseHdfsDir;
+ /**
+ * Druid REST endpoint
+ */
+ private static String queryDruidUrl;
+ /**
+ * ClickHouse REST endpoint
+ */
+ private static String queryClickHouseUrl;
+
+ private static String database;
+ /**
+ * Address of the HDFS cluster hosting HBase
+ */
+ private static String queryHBaseUrl;
+
+ private static String trafficUserName;
+ private static String trafficPassword;
+
+ private static String systemParts;
+ private static String systemDisks;
+ private static boolean isCluster;
+ private String zookeeperStoragePath = "/storage/worker/";
+
+ @Value("${zookeeper.server}")
+ private String zookeeperServer;
+
+ @Autowired
+ public StorageQuotaService(StorgeConfig config) {
+ database = config.getTrafficDatasource();
+ trafficUserName = config.getTrafficUsername();
+ trafficPassword = config.getTrafficPassword();
+ queryClickHouseUrl = Joiner.on("").join("http://", config.getTrafficServer());
+ systemParts = config.getSystemParts();
+ systemDisks = config.getSystemDisks();
+
+ queryDruidUrl = Joiner.on("").join("http://", config.getAnalyticServerSelect(), "/druid/v2/sql");
+
+ queryHBaseUrl = Joiner.on("").join("http://", config.getFilesServer());
+ hbaselinuxDir = config.getHbaseLinuxDir();
+ hbaseHdfsDir = config.getHbaseHdfsDir();
+ isCluster = config.getIsClusder();
+ }
+
+ /**
+ * Current ClickHouse storage usage; on failure, falls back to the latest value recorded in Druid.
+ */
+ private StorageQuota getClickHouseCurr() {
+ Map<String, Object> deleteParamMap = getDeleteSource();
+ StorageQuota storageQuota = new StorageQuota();
+ String currSql = "SELECT SUM(`bytes_on_disk`) FROM " + systemParts + " WHERE database = '" + database + "';";
+ try {
+ deleteParamMap.put("query", currSql);
+ String currResult = HttpClientUtils
+ .httpGet(HttpClientUtils.getUrlWithParams(queryClickHouseUrl, deleteParamMap));
+ Long result = Long.valueOf(currResult.trim());
+
+ storageQuota.setData(ImmutableMap.of("used_size", result));
+ storageQuota.setStatus(ImmutableMap.of("traffic_current", "success"));
+ logger.info("查询 clickhouse used_size 成功, {}", storageQuota);
+ return storageQuota;
+ } catch (Exception e) {
+ Long trafficLogs = getCurrBefore("Traffic Logs");
+ storageQuota.setData(ImmutableMap.of("used_size", trafficLogs));
+ storageQuota.setStatus(ImmutableMap.of("traffic_current", e.getMessage()));
+ logger.error("查询 clickhouse used_size 失败, {}", e.getMessage());
+ return storageQuota;
+ }
+
+ }
+
+ /**
+ * Maximum ClickHouse storage capacity; on failure, falls back to the latest value recorded in Druid.
+ */
+ private StorageQuota getClickHouseMax() {
+ Map<String, Object> deleteParamMap = getDeleteSource();
+ StorageQuota storageQuota = new StorageQuota();
+ String maxSql = "SELECT SUM(`total_space`) FROM " + systemDisks + ";";
+ try {
+ deleteParamMap.put("query", maxSql);
+ String maxResult = HttpClientUtils
+ .httpGet(HttpClientUtils.getUrlWithParams(queryClickHouseUrl, deleteParamMap));
+ long result = Long.parseLong(maxResult.trim());
+ storageQuota.setData(ImmutableMap.of("max_size", result));
+ storageQuota.setStatus(ImmutableMap.of("traffic_max", "success"));
+ logger.info("查询 clickhouse max_size 成功 ,{}", storageQuota);
+ return storageQuota;
+ } catch (Exception e) {
+ Long trafficLogs = getMaxBefore("Traffic Logs");
+ storageQuota.setData(ImmutableMap.of("max_size", trafficLogs));
+ storageQuota.setStatus(ImmutableMap.of("traffic_max", e.getMessage()));
+ logger.error("查询 clickhouse max_size 失败,{}", e.getMessage());
+ return storageQuota;
+ }
+ }
+
+ /**
+ * ClickHouse daily growth (delta); on failure, 0 is written instead.
+ */
+ private StorageQuota getClickHouseDiff() {
+ String date = DateUtils.getDateOfYesterday("yyyyMMdd");
+ Map<String, Object> deleteParamMap = getDeleteSource();
+ StorageQuota storageQuota = new StorageQuota();
+ String diffSql = "SELECT SUM(bytes_on_disk) FROM " + systemParts + " WHERE database = '" + database
+ + "' AND partition = '" + date + "';";
+ try {
+ deleteParamMap.put("query", diffSql);
+ String diffResult = HttpClientUtils
+ .httpGet(HttpClientUtils.getUrlWithParams(queryClickHouseUrl, deleteParamMap));
+ long result = Long.parseLong(diffResult.trim());
+ storageQuota.setData(ImmutableMap.of("aggregate_size", result));
+ storageQuota.setStatus(ImmutableMap.of("traffic_aggregate", "success"));
+ logger.info("查询 clickhouse traffic_aggregate 成功,{}", storageQuota);
+ return storageQuota;
+ } catch (Exception e) {
+ storageQuota.setData(ImmutableMap.of("aggregate_size", 0L));
+ storageQuota.setStatus(ImmutableMap.of("traffic_aggregate", e.getMessage()));
+ logger.error("查询clickhouse traffic_aggregate 失败,{}", e.getMessage());
+ return storageQuota;
+ }
+ }
+
+//========================Druid==============================
+
+ /**
+ * Current Druid storage usage; on failure, 0 is used, since Druid itself is unavailable there is no previous value to query.
+ */
+ private StorageQuota getDruidCurr() {
+ String currSql = "{\"query\":\"SELECT SUM(curr_size) AS curr_size FROM sys.servers WHERE server_type = 'historical'\",\"context\":{\"skipEmptyBuckets\":\"false\"},\"resultFormat\":\"object\"}";
+ StorageQuota storageQuota = new StorageQuota();
+ try {
+ String currResult = HttpClientUtils.httpPost(queryDruidUrl, currSql);
+ List<Map<String, Object >> list = (List) JsonMapper.fromJsonString(currResult, List.class);
+ Long currSize = Long.valueOf(String.valueOf(list.get(0).get("curr_size")));
+
+ storageQuota.setData(ImmutableMap.of("used_size", currSize));
+ storageQuota.setStatus(ImmutableMap.of("analytic_current", "success"));
+ logger.info("查询 druid used_size 成功,{}", storageQuota);
+ return storageQuota;
+ } catch (Exception e) {
+ storageQuota.setData(ImmutableMap.of("used_size", 0L));
+ storageQuota.setStatus(ImmutableMap.of("analytic_current", e.getMessage()));
+ logger.error("查询 druid used_size 失败,{}", e.getMessage());
+ return storageQuota;
+ }
+ }
+
+ /**
+ * Druid increment over the past 30 seconds; on failure, 0 is used, since Druid itself is unavailable there is no previous value to query.
+ */
+ private StorageQuota getDruidDiff(Long currSize) {
+ StorageQuota storageQuota = new StorageQuota();
+ try {
+ Long before = getCurrBefore("Report and Metrics");
+ long diff = getDiffNum(currSize, before);
+ storageQuota.setData(ImmutableMap.of("aggregate_size", diff));
+ storageQuota.setStatus(ImmutableMap.of("analytic_aggregate", "success"));
+ logger.info("查询 druid aggregate_size 成功,{}", storageQuota);
+ return storageQuota;
+ } catch (Exception e) {
+ storageQuota.setData(ImmutableMap.of("aggregate_size", 0L));
+ storageQuota.setStatus(ImmutableMap.of("analytic_aggregate", "success"));
+ logger.error("查询druid aggregate_size 失败,{}", e.getMessage());
+ return storageQuota;
+ }
+ }
+
+ /**
+ * 获取Druid最大存储大小 若获取失败直接补0,Druid本身无法提供服务无需再查询上次值
+ */
+ private StorageQuota getDruidMax() {
+ String maxSql = "{\"query\":\"SELECT SUM(max_size) AS max_size FROM sys.servers WHERE server_type = 'historical'\",\"context\":{\"skipEmptyBuckets\":\"false\"},\"resultFormat\":\"object\"}";
+ StorageQuota storageQuota = new StorageQuota();
+ try {
+ String maxResult = HttpClientUtils.httpPost(queryDruidUrl, maxSql);
+ List<Map<String, Long>> list = (List) JsonMapper.fromJsonString(maxResult, List.class);
+ Long maxSize = list.get(0).get("max_size");
+ storageQuota.setData(ImmutableMap.of("max_size", maxSize));
+ storageQuota.setStatus(ImmutableMap.of("analytic_max", "success"));
+ logger.info("查询 druid max_size 成功,{}", storageQuota);
+ return storageQuota;
+ } catch (Exception e) {
+ storageQuota.setData(ImmutableMap.of("max_size", 0L));
+ storageQuota.setStatus(ImmutableMap.of("analytic_max", e.getMessage()));
+ logger.error("查询 druid max_size 失败,{}", e.getMessage());
+ return storageQuota;
+ }
+ }
+
+//======================================================
+
+ /**
+ * Current storage usage of standalone HBase; on failure, falls back to the latest value recorded in Druid.
+ */
+ private StorageQuota getHBaseAloneCurr() {
+ StorageQuota storageQuota = new StorageQuota();
+ try {
+ Process catCommands = Runtime.getRuntime()
+ .exec(new String[] { "/bin/sh", "-c", "du -s " + hbaselinuxDir + " | awk '{print $1}'" });
+ String message = getProcessMessage(catCommands.getInputStream(), CHART_SET_UTF8);
+ long result = Long.parseLong(message.trim());
+ storageQuota.setData(ImmutableMap.of("used_size", result));
+ storageQuota.setStatus(ImmutableMap.of("files_current", "success"));
+ logger.info("查询 hbase alone used_size 成功,{}", storageQuota);
+ return storageQuota;
+ } catch (IOException e) {
+ Long trafficLogs = getCurrBefore("Files");
+ storageQuota.setData(ImmutableMap.of("used_size", trafficLogs));
+ storageQuota.setStatus(ImmutableMap.of("files_current", e.getMessage()));
+ logger.error("查询 hbase alone used_size 失败,{}", e.getMessage());
+ return storageQuota;
+ }
+ }
+
+ /**
+ * Maximum storage capacity of standalone HBase; on failure, falls back to the latest value recorded in Druid.
+ */
+ private StorageQuota getHBaseAloneMax() {
+ StorageQuota storageQuota = new StorageQuota();
+ try {
+ Process catCommands = Runtime.getRuntime().exec(
+ new String[] { "/bin/sh", "-c", "df " + hbaselinuxDir + " | sed -n '2,2p' | awk '{print $2}'" });
+ String message = getProcessMessage(catCommands.getInputStream(), CHART_SET_UTF8);
+ long result = Long.parseLong(message.trim()) * 1000;
+ storageQuota.setData(ImmutableMap.of("max_size", result));
+ storageQuota.setStatus(ImmutableMap.of("files_max", "success"));
+ logger.info("查询 max_size alone max_size 成功,{}", storageQuota);
+ return storageQuota;
+ } catch (IOException e) {
+ Long trafficLogs = getMaxBefore("Files");
+ storageQuota.setData(ImmutableMap.of("max_size", trafficLogs));
+ storageQuota.setStatus(ImmutableMap.of("files_max", e.getMessage()));
+ logger.error("查询 hbase alone max_size 失败,{}", e.getMessage());
+ return storageQuota;
+ }
+ }
+
+ /**
+ * Storage delta of standalone HBase; on failure, 0 is used instead.
+ */
+ private StorageQuota getHBaseAloneDiff() {
+ StorageQuota storageQuota = new StorageQuota();
+ try {
+ Process catCommands = Runtime.getRuntime()
+ .exec(new String[] { "/bin/sh", "-c", "du -s " + hbaselinuxDir + " | awk '{print $1}'" });
+ String message = getProcessMessage(catCommands.getInputStream(), CHART_SET_UTF8);
+ long result = Long.parseLong(message.trim());
+ Long trafficLogs = getCurrBefore("Files");
+ long diff = getDiffNum(result, trafficLogs);
+ storageQuota.setData(ImmutableMap.of("aggregate_size", diff));
+ storageQuota.setStatus(ImmutableMap.of("files_aggregate", "success"));
+ logger.info("查询 hbase alone aggregate_size 成功,{}", storageQuota);
+ return storageQuota;
+ } catch (IOException e) {
+ storageQuota.setData(ImmutableMap.of("aggregate_size", 0L));
+ storageQuota.setStatus(ImmutableMap.of("files_aggregate", e.getMessage()));
+ logger.error("查询 hbase alone aggregate_size 失败,{}", e.getMessage());
+ return storageQuota;
+ }
+ }
+
+//========================= HBase cluster mode =============================
+
+ /**
+ * Get the current storage size of the HBase cluster; if the lookup fails, fall back to the latest value recorded in druid.
+ */
+ private StorageQuota getHBaseClusterCurr() {
+ StorageQuota storageQuota = new StorageQuota();
+ try {
+ String currDir = queryHBaseUrl + "/webhdfs/v1" + hbaseHdfsDir + "?op=GETCONTENTSUMMARY";
+ String diffResult = HttpClientUtils.httpGet(currDir);
+ Map map = (Map) JsonMapper.fromJsonString(diffResult, Map.class);
+ Map contentSummary = (Map) map.get("ContentSummary");
+ Long length = Long.valueOf(String.valueOf(contentSummary.get("length")));
+
+ storageQuota.setData(ImmutableMap.of("used_size", length));
+ storageQuota.setStatus(ImmutableMap.of("files_current", "success"));
+ logger.info("查询 hbase cluster used_size 成功,{}", storageQuota);
+ return storageQuota;
+ } catch (Exception e) {
+ Long trafficLogs = getCurrBefore("Files");
+ storageQuota.setData(ImmutableMap.of("used_size", trafficLogs));
+ storageQuota.setStatus(ImmutableMap.of("files_current", e.getMessage()));
+ logger.error("查询 hbase cluster used_size 失败,{}", e.getMessage());
+ return storageQuota;
+ }
+ }
+
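getHBaseClusterCurr above uses the standard WebHDFS content-summary call (GET <namenode>/webhdfs/v1/<path>?op=GETCONTENTSUMMARY) and reads ContentSummary.length, the logical size in bytes before HDFS replication (spaceConsumed would include replicas). A hedged sketch of the parsing step; the sample payload values are made up, and Jackson is used here instead of the project's JsonMapper:

import java.util.Map;
import com.fasterxml.jackson.databind.ObjectMapper;

public class ContentSummarySketch {

    public static void main(String[] args) throws Exception {
        // Shape of a typical WebHDFS GETCONTENTSUMMARY response (values are illustrative).
        String json = "{\"ContentSummary\":{\"directoryCount\":12,\"fileCount\":340,"
                + "\"length\":73186813562,\"quota\":-1,\"spaceConsumed\":219560440686,\"spaceQuota\":-1}}";

        ObjectMapper mapper = new ObjectMapper();
        Map<?, ?> root = mapper.readValue(json, Map.class);
        Map<?, ?> contentSummary = (Map<?, ?>) root.get("ContentSummary");
        // "length" is the logical size in bytes, before HDFS replication.
        long usedSize = Long.parseLong(String.valueOf(contentSummary.get("length")));
        System.out.println("used_size = " + usedSize);
    }
}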
+ /**
+ * Get the maximum storage size of the HBase cluster; if the lookup fails, fall back to the latest value recorded in druid.
+ */
+ private StorageQuota getHBaseClusterMax() {
+ StorageQuota storageQuota = new StorageQuota();
+ try {
+ String currDir = queryHBaseUrl + "/jmx?qry=Hadoop:service=NameNode,name=NameNodeInfo";
+ String diffResult = HttpClientUtils.httpGet(currDir);
+ Map map = (Map) JsonMapper.fromJsonString(diffResult, Map.class);
+ List<Map> beans = (List<Map>) map.get("beans");
+ Long total = Long.valueOf(String.valueOf(beans.get(0).get("Total")));
+
+ storageQuota.setData(ImmutableMap.of("max_size", total));
+ storageQuota.setStatus(ImmutableMap.of("files_max", "success"));
+ logger.info("查询 hbase cluster max_size 成功,{}", storageQuota);
+ return storageQuota;
+ } catch (Exception e) {
+ Long trafficLogs = getMaxBefore("Files");
+
+ storageQuota.setData(ImmutableMap.of("max_size", trafficLogs));
+ storageQuota.setStatus(ImmutableMap.of("files_max", e.getMessage()));
+ logger.error("查询 hbase cluster max_size 失败,{}", e.getMessage());
+ return storageQuota;
+ }
+ }
+
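getHBaseClusterMax reads the cluster's configured capacity from the NameNode JMX servlet; the Hadoop:service=NameNode,name=NameNodeInfo bean exposes Total (capacity in bytes) alongside Used and Free. A small sketch of the same extraction, again with an illustrative, truncated response and Jackson assumed on the classpath:

import java.util.List;
import java.util.Map;
import com.fasterxml.jackson.databind.ObjectMapper;

public class NameNodeTotalSketch {

    public static void main(String[] args) throws Exception {
        // Truncated, illustrative /jmx?qry=Hadoop:service=NameNode,name=NameNodeInfo response.
        String json = "{\"beans\":[{\"name\":\"Hadoop:service=NameNode,name=NameNodeInfo\","
                + "\"Total\":316239233024,\"Used\":219560440686,\"Free\":96678792338}]}";

        ObjectMapper mapper = new ObjectMapper();
        Map<?, ?> root = mapper.readValue(json, Map.class);
        List<?> beans = (List<?>) root.get("beans");
        Map<?, ?> info = (Map<?, ?>) beans.get(0);
        long maxSize = Long.parseLong(String.valueOf(info.get("Total")));
        System.out.println("max_size = " + maxSize);
    }
}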
+ /**
+ * Get the storage delta for the HBase cluster; if the lookup fails, default to 0.
+ */
+ private StorageQuota getHBaseClusterDiff() {
+ StorageQuota storageQuota = new StorageQuota();
+ try {
+ String currDir = queryHBaseUrl + "/webhdfs/v1" + hbaseHdfsDir + "?op=GETCONTENTSUMMARY ";
+ String diffResult = HttpClientUtils.httpGet(currDir);
+ Map map = (Map) JsonMapper.fromJsonString(diffResult, Map.class);
+ Map contentSummary = (Map) map.get("ContentSummary");
+ Long length = Long.valueOf(String.valueOf(contentSummary.get("length")));
+
+ Long trafficLogs = getCurrBefore("Files");
+ long diff = getDiffNum(length, trafficLogs);
+ storageQuota.setData(ImmutableMap.of("aggregate_size", diff));
+ storageQuota.setStatus(ImmutableMap.of("files_aggregate", "success"));
+ logger.info("查询 HBase cluster aggregate_size 成功,{}", storageQuota);
+ return storageQuota;
+ } catch (Exception e) {
+ storageQuota.setData(ImmutableMap.of("aggregate_size", 0L));
+ storageQuota.setStatus(ImmutableMap.of("files_aggregate", e.getMessage()));
+ logger.error("查询 hbase cluster aggregate_size 失败,{}", e.getMessage());
+ return storageQuota;
+ }
+ }
+
+//======================= Utility methods ===============================
+
+ /**
+ * Fetch the previous Curr value of the given type from druid.
+ *
+ * @param logType the statistics type
+ * @return the previous value
+ */
+ private Long getCurrBefore(String logType) {
+ try {
+ String currSql = "{\"query\":\"SELECT used_size FROM sys_storage_log WHERE log_type = '" + logType
+ + "' ORDER BY __time DESC LIMIT 1\",\"context\":{\"skipEmptyBuckets\":\"false\"},\"resultFormat\":\"object\"}";
+ String currResult = HttpClientUtils.httpPost(queryDruidUrl, currSql);
+ List<Map> list = (List) JsonMapper.fromJsonString(currResult, List.class);
+ Long currSize = Long.valueOf(String.valueOf(list.get(0).get("used_size")));
+ logger.info("查询 druid 上次 used_size 成功,{}", currSize);
+ return currSize;
+ } catch (Exception e) {
+ logger.error("查询druid上次 used_size 失败,{}", e.getMessage());
+ return 0L;
+ }
+ }
+
+ /**
+ * Fetch the previous Max value of the given type from druid.
+ *
+ * @param logType the statistics type
+ * @return the previous value
+ */
+ private Long getMaxBefore(String logType) {
+ try {
+ String maxSql = "{\"query\":\"SELECT max_size FROM sys_storage_log WHERE log_type = '" + logType
+ + "' ORDER BY __time DESC LIMIT 1\",\"context\":{\"skipEmptyBuckets\":\"false\"},\"resultFormat\":\"object\"}";
+ String maxResult = HttpClientUtils.httpPost(queryDruidUrl, maxSql);
+ List<Map> list = (List<Map>) JsonMapper.fromJsonString(maxResult, List.class);
+ Long currSize = Long.valueOf(String.valueOf(list.get(0).get("max_size")));
+ logger.info("查询 druid max_size 上次值成功,{}", currSize);
+ return currSize;
+ } catch (Exception e) {
+ logger.error("查询 druid max_size 上次值失败,{}", e.getMessage());
+ return 0L;
+ }
+ }
+
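Both helpers above POST a Druid SQL statement wrapped in a JSON envelope (typically to the broker's /druid/v2/sql endpoint, whatever queryDruidUrl is configured to). Laid out readably, the escaped payload looks like the sketch below; with resultFormat set to object the broker answers with a JSON array of row objects, which is why the code reads list.get(0). The "Files" log_type is only an example value:

public class DruidSqlPayloadSketch {

    public static void main(String[] args) {
        String logType = "Files"; // illustrative log_type
        // Same envelope the job builds, just laid out readably.
        String payload = "{"
                + "\"query\":\"SELECT used_size FROM sys_storage_log "
                +     "WHERE log_type = '" + logType + "' ORDER BY __time DESC LIMIT 1\","
                + "\"context\":{\"skipEmptyBuckets\":\"false\"},"
                + "\"resultFormat\":\"object\""
                + "}";
        // Expected answer shape: [{"used_size":73186813562}]
        System.out.println(payload);
    }
}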
+ /**
+ * @param node node name appended to the zookeeper path
+ * @return the custom baseline time last_storage,
+ *         stored at the zookeeper node /storage/worker/ + node
+ */
+ public Long getLastStorage(String node) {
+ try {
+ ZookeeperUtils zk = new ZookeeperUtils();
+ // look up the node value
+ String nodeData = zk.getNodeData(zookeeperStoragePath + node, zookeeperServer);
+ // if it does not exist, create it with the current time
+ if(ObjectUtils.isEmpty(nodeData)) {
+ Long lastTime = System.currentTimeMillis()/1000;
+ zk.modifyNode(zookeeperStoragePath + node, String.valueOf(lastTime), zookeeperServer);
+ return lastTime;
+ }
+ Long lastStorage = Long.valueOf(nodeData);
+ logger.info("Query baseline time last_storage succeeded, {}", lastStorage);
+
+ return lastStorage;
+ } catch (Exception e) {
+ logger.error("查询标准时间 last_storage 失败,{}",e.getMessage());
+ throw new BusinessException("查询标准时间 last_storage 失败");
+ }
+
+ }
+ /**
+ * Update the zookeeper node data.
+ *
+ * @param node node name appended to the path /storage/worker/
+ * @param data the value to store
+ */
+ public void modifyLastStorage(String node,String data) {
+ ZookeeperUtils zk = new ZookeeperUtils();
+ // modifyNode creates the node if it does not already exist
+ zk.modifyNode(zookeeperStoragePath + node, data, zookeeperServer);
+ }
+ /**
+ * Database connection parameters.
+ *
+ * @return map with the user and password entries
+ */
+ private Map<String, Object> getDeleteSource() {
+ // assemble the sql connection parameters
+ Map<String, Object> deleteParamMap = Maps.newHashMap();
+ deleteParamMap.put("password", trafficPassword);
+ deleteParamMap.put("user", trafficUserName);
+ return deleteParamMap;
+ }
+
+ /**
+ * Read the result returned by a shell command.
+ *
+ * @param in the process output stream
+ * @param charset the character encoding
+ */
+ private String getProcessMessage(InputStream in, String charset) {
+ if (in != null) {
+ BufferedReader br = null;
+ StringBuffer buffer = new StringBuffer();
+ try {
+ br = new BufferedReader(
+ new InputStreamReader(in, StringUtil.isBlank(charset) ? CHART_SET_UTF8 : charset));
+ String line = null;
+ while ((line = br.readLine()) != null) {
+ buffer.append(line);
+ }
+ } catch (IOException e) {
+ e.printStackTrace();
+ } finally {
+ if (br != null) {
+ try {
+ br.close();
+ } catch (IOException e) {
+ e.printStackTrace();
+ }
+ }
+ }
+ return buffer.toString();
+ } else {
+ return "";
+ }
+ }
+
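getProcessMessage drains a process's stdout into one string; it concatenates lines without separators, which is harmless here because every command is piped through awk and yields a single value. A standalone try-with-resources sketch of the same idea (not the project's helper):

import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.nio.charset.StandardCharsets;

public final class ProcessOutputSketch {

    // Reads the whole stream as UTF-8 text; returns "" for a null stream.
    static String readAll(InputStream in) throws IOException {
        if (in == null) {
            return "";
        }
        StringBuilder sb = new StringBuilder();
        try (BufferedReader br = new BufferedReader(new InputStreamReader(in, StandardCharsets.UTF_8))) {
            String line;
            while ((line = br.readLine()) != null) {
                sb.append(line);
            }
        }
        return sb.toString();
    }

    public static void main(String[] args) throws IOException {
        Process p = new ProcessBuilder("/bin/sh", "-c", "echo 42").start();
        System.out.println(readAll(p.getInputStream())); // prints 42
    }
}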
+ /**
+ * Assemble the JSON payload for the normal dimensions.
+ *
+ * @return the result json
+ */
+ public Map getDiskJson() {
+ StorageQuota clickHouseCurr = getClickHouseCurr();
+ StorageQuota clickHouseMax = getClickHouseMax();
+ Map status = new HashMap<>();
+ status.putAll(clickHouseCurr.getStatus());
+ status.putAll(clickHouseMax.getStatus());
+ Long now = System.currentTimeMillis()/1000;
+
+ Map traffic = new HashMap<>();
+ traffic.put("log_type", "Traffic Logs");
+ traffic.put("time", now);
+ traffic.put("last_storage",getLastStorage("Traffic-Logs"));
+ traffic.putAll(clickHouseCurr.getData());
+ traffic.putAll(clickHouseMax.getData());
+
+
+ StorageQuota druidCurr = getDruidCurr();
+ StorageQuota druidDiff = getDruidDiff(Long.valueOf(String.valueOf(druidCurr.getData().get("used_size"))));
+ StorageQuota druidMax = getDruidMax();
+ status.putAll(druidCurr.getStatus());
+ status.putAll(druidDiff.getStatus());
+ status.putAll(druidMax.getStatus());
+
+ Map metrics = new HashMap<>();
+ metrics.put("log_type", "Report and Metrics");
+ metrics.put("time", now);
+ metrics.put("last_storage",getLastStorage("Report-and-Metrics"));
+ metrics.putAll(druidCurr.getData());
+ metrics.putAll(druidDiff.getData());
+ metrics.putAll(druidMax.getData());
+
+
+
+ StorageQuota hBaseCurr = new StorageQuota();
+ StorageQuota hBaseMax = new StorageQuota();
+ StorageQuota hBaseDiff = new StorageQuota();
+
+ if (isCluster) {
+ hBaseCurr = getHBaseClusterCurr();
+ hBaseDiff = getHBaseClusterDiff();
+ hBaseMax = getHBaseClusterMax();
+ } else {
+ hBaseCurr = getHBaseAloneCurr();
+ hBaseDiff = getHBaseAloneDiff();
+ hBaseMax = getHBaseAloneMax();
+ }
+ status.putAll(hBaseCurr.getStatus());
+ status.putAll(hBaseMax.getStatus());
+ status.putAll(hBaseDiff.getStatus());
+
+ Map files = new HashMap<>();
+ files.put("log_type", "Files");
+ files.put("time", now);
+ files.put("last_storage",getLastStorage("Files"));
+ files.putAll(hBaseCurr.getData());
+ files.putAll(hBaseMax.getData());
+ files.putAll(hBaseDiff.getData());
+
+
+ Map all = new HashMap<>();
+ List data = new ArrayList<>();
+
+ data.add(traffic);
+ data.add(metrics);
+ data.add(files);
+
+ all.put("status", status);
+ all.put("data", data);
+
+ return all;
+ }
+
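Put together, getDiskJson returns one status map (one entry per probe, either "success" or the exception message) and a data array with one row per log_type. A rough illustration of the shape, using only keys visible in this file; every number is made up:

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class DiskJsonShapeSketch {

    public static void main(String[] args) {
        // status: one flag per probe, "success" or the exception message (illustrative subset).
        Map<String, String> status = new HashMap<>();
        status.put("analytic_max", "success");
        status.put("files_current", "success");
        status.put("files_max", "success");

        // data: one row per log_type; the "Files" row carries the keys set above.
        Map<String, Object> files = new HashMap<>();
        files.put("log_type", "Files");
        files.put("time", 1596168000L);           // epoch seconds
        files.put("last_storage", 1596081600L);   // baseline time from zookeeper
        files.put("used_size", 73186813562L);     // illustrative values
        files.put("max_size", 316239233024L);
        files.put("aggregate_size", 104857600L);

        List<Map<String, Object>> data = new ArrayList<>();
        data.add(files);

        Map<String, Object> all = new HashMap<>();
        all.put("status", status);
        all.put("data", data);
        System.out.println(all);
    }
}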
+ /**
+ * Assemble the JSON payload for the once-a-day ClickHouse run (special handling).
+ *
+ * @return the result json
+ */
+ public Map getDayJson() {
+ StorageQuota clickHouseCurr = getClickHouseCurr();
+ StorageQuota clickHouseMax = getClickHouseMax();
+ StorageQuota clickHouseDiff = getClickHouseDiff();
+ Map status = new HashMap<>();
+ status.putAll(clickHouseCurr.getStatus());
+ status.putAll(clickHouseMax.getStatus());
+ status.putAll(clickHouseDiff.getStatus());
+
+ Map traffic = new HashMap<>();
+ traffic.put("log_type", "Traffic Logs");
+ traffic.put("time", System.currentTimeMillis()/1000);
+ traffic.put("last_storage",getLastStorage("Traffic-Logs"));
+ traffic.putAll(clickHouseCurr.getData());
+ traffic.putAll(clickHouseMax.getData());
+ traffic.putAll(clickHouseDiff.getData());
+
+ Map all = new HashMap<>();
+ List data = new ArrayList<>();
+ data.add(traffic);
+
+ all.put("status", status);
+ all.put("data", data);
+ return all;
+ }
+
+ /**
+ * Compute the delta between two readings; negative results are replaced with 0.
+ *
+ * @param now the current value
+ * @param before the previous value
+ * @return the delta
+ */
+ private Long getDiffNum(Long now, Long before) {
+ long diff = now - before;
+ if (diff >= 0) {
+ return diff;
+ } else {
+ return 0L;
+ }
+ }
+
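getDiffNum clamps negative deltas, which can occur right after a purge shrinks the store, to zero; it behaves like Math.max(0L, now - before). A trivial equivalent for illustration:

public class DiffNumSketch {

    // Equivalent clamp-to-zero delta.
    static long diff(long now, long before) {
        return Math.max(0L, now - before);
    }

    public static void main(String[] args) {
        System.out.println(diff(120L, 100L)); // 20
        System.out.println(diff(80L, 100L));  // 0
    }
}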
+ @Data
+ class StorageQuota {
+
+ private Map<String, Object> data;
+
+ private Map<String, String> status;
+ }
+}
diff --git a/xxl-job-executor-galaxy/src/main/resources/application-executor.yml b/xxl-job-executor-galaxy/src/main/resources/application-executor.yml index d7fab5c..8299ab1 100644 --- a/xxl-job-executor-galaxy/src/main/resources/application-executor.yml +++ b/xxl-job-executor-galaxy/src/main/resources/application-executor.yml @@ -7,26 +7,53 @@ xxl:
## kafka server address; for a cluster, list multiple addresses separated by commas
spring:
kafka:
- bootstrap-servers: 192.168.40.224:9092
+ bootstrap-servers: 192.168.40.207:9092
##### storage quota job configuration
storge:
## storage quota file server
+<<<<<<< HEAD
+=======
+ isclusder: false
+>>>>>>> refs/heads/develop
files:
- server: 192.168.40.131:9090
+ server: http://192.168.40.207:9098
+ hbase-linux-dir: /home/tsg3.0-volumes/hbase/data # location of the standalone data directory on linux
+ hbase-hdfs-dir: /hbase/hbase-2.2.3/mobdir # HBase path on HDFS
+ token: c21f969b5f03d33d43e04f8f136e7682
## storage quota druid query
analytic:
+<<<<<<< HEAD
# druid purge start time
deletestarttime: "2000-01-01"
server:
select: 192.168.40.224:8082
delete: 192.168.40.119:8079
+=======
+ server:
+ select: 192.168.40.207:8082
+ delete: 192.168.40.207:8081
+>>>>>>> refs/heads/develop
## storage quota clickhouse query
traffic:
+<<<<<<< HEAD
server: 192.168.40.224:8123
+=======
+ server: 192.168.40.203:8123
+>>>>>>> refs/heads/develop
datasource: tsg_galaxy_v3
username: default
password: ceiec2019
# clickhouse cluster name
clustername: ck_cluster
+<<<<<<< HEAD
+=======
+ # clickhouse system tables
+ system:
+ parts: "`system`.parts"
+ disks: "`system`.disks"
+ tables: "`system`.tables"
+zookeeper:
+ server: 192.168.40.207:2181
+>>>>>>> refs/heads/develop
diff --git a/xxl-job-executor-galaxy/src/test/java/com/xxl/job/executor/test/ShellTest.java b/xxl-job-executor-galaxy/src/test/java/com/xxl/job/executor/test/ShellTest.java index 16b0b8b..2633917 100644 --- a/xxl-job-executor-galaxy/src/test/java/com/xxl/job/executor/test/ShellTest.java +++ b/xxl-job-executor-galaxy/src/test/java/com/xxl/job/executor/test/ShellTest.java @@ -1,14 +1,18 @@ package com.xxl.job.executor.test;
-import java.io.File;
-import java.io.IOException;
+import java.util.List;
import org.junit.jupiter.api.Test;
import org.junit.runner.RunWith;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;
+import com.google.common.base.Splitter;
import com.xxl.job.executor.XxlJobExecutorApplication;
+<<<<<<< HEAD import com.zdjizhi.file.FileApplication;
+======= +import com.xxl.job.executor.core.utils.HttpClientUtils;
+>>>>>>> refs/heads/develop @RunWith(SpringRunner.class)
@SpringBootTest(classes = { XxlJobExecutorApplication.class })
public class ShellTest {
@@ -17,6 +21,7 @@ public class ShellTest {
@Test
public void shellTest() {
+<<<<<<< HEAD //脚本默认存放目录
// String remoteDirectory = "/home/admin/test/";
@@ -32,5 +37,57 @@ public class ShellTest {
} catch (IOException e) {
e.printStackTrace();
}
+=======
+ String str = "{\r\n" +
+ " \"clientId\": null,\r\n" +
+ " \"query\": {\r\n" +
+ " \"queryType\": \"iplearning\",\r\n" +
+ " \"dataSource\": \"IP_LEARNING_VIEW\",\r\n" +
+ " \"parameters\": {\r\n" +
+ " \"match\": [\r\n" +
+ " {\r\n" +
+ " \"type\": \"regex\",\r\n" +
+ " \"fieldKey\": \"FQDN_NAME\",\r\n" +
+ " \"fieldValues\": [\r\n" +
+ " \"360\"\r\n" +
+ " ]\r\n" +
+ " }\r\n" +
+ " ],\r\n" +
+ " \"range\": [\r\n" +
+ " {\r\n" +
+ " \"type\": \"ge\",\r\n" +
+ " \"fieldKey\": \"PROTOCOL\",\r\n" +
+ " \"fieldValues\": [\r\n" +
+ " \"TLS\",\r\n" +
+ " \"HTTP\"\r\n" +
+ " ]\r\n" +
+ " },\r\n" +
+ " {\r\n" +
+ " \"type\": \"eq\",\r\n" +
+ " \"fieldKey\": \"DEPTH\",\r\n" +
+ " \"fieldValues\": [\r\n" +
+ " \"0..1\"\r\n" +
+ " ]\r\n" +
+ " },\r\n" +
+ " {\r\n" +
+ " \"type\": \"ge\",\r\n" +
+ " \"fieldKey\": \"UNIQ_CIP\",\r\n" +
+ " \"fieldValues\": [\r\n" +
+ " 0\r\n" +
+ " ]\r\n" +
+ " }\r\n" +
+ " ],\r\n" +
+ " \"intervals\": [\r\n" +
+ " \"2020-01-01 00:00:00/2020-08-02 00:00:00\"\r\n" +
+ " ],\r\n" +
+ " \"limit\": \"3\"\r\n" +
+ " }\r\n" +
+ " }\r\n" +
+ "}";
+
+ String httpPost = HttpClientUtils.httpPost("http://192.168.40.224:9999/knowledge/v1/?iplearni1ng", str);
+ System.out.println(httpPost);
+
+>>>>>>> refs/heads/develop }
}
diff --git a/xxl-job-executor-galaxy/src/test/java/com/xxl/job/executor/test/StorageQuotaTest.java b/xxl-job-executor-galaxy/src/test/java/com/xxl/job/executor/test/StorageQuotaTest.java new file mode 100644 index 0000000..7772a6a --- /dev/null +++ b/xxl-job-executor-galaxy/src/test/java/com/xxl/job/executor/test/StorageQuotaTest.java @@ -0,0 +1,87 @@
+package com.xxl.job.executor.test;
+
+import java.io.IOException;
+import java.io.StringReader;
+import java.util.List;
+import java.util.Map;
+
+import javax.xml.parsers.DocumentBuilder;
+import javax.xml.parsers.DocumentBuilderFactory;
+import javax.xml.parsers.ParserConfigurationException;
+
+import org.apache.http.Header;
+import org.apache.http.HttpHeaders;
+import org.apache.http.message.BasicHeader;
+import org.apache.zookeeper.ZooDefs;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.boot.test.context.SpringBootTest;
+import org.springframework.test.context.junit4.SpringRunner;
+import org.springframework.util.Assert;
+import org.w3c.dom.Document;
+import org.xml.sax.InputSource;
+import org.xml.sax.SAXException;
+
+import com.xxl.job.core.biz.model.ReturnT;
+import com.xxl.job.executor.XxlJobExecutorApplication;
+import com.xxl.job.executor.core.utils.HttpClientUtils;
+import com.xxl.job.executor.core.utils.XmlUtil;
+import com.xxl.job.executor.jobhandler.LogStorageQuotaJob;
+import com.xxl.job.executor.pojo.ListAllMyBucketsResult;
+import com.xxl.job.executor.pojo.ListAllMyBucketsResult.Buckets.Bucket;
+import com.xxl.job.executor.service.StorageQuotaService;
+import com.zdjizhi.utils.DateUtils;
+import com.zdjizhi.utils.JsonMapper;
+import com.zdjizhi.utils.TimeConstants;
+import com.zdjizhi.utils.ZooKeeperLock;
+import com.zdjizhi.utils.ZookeeperUtils;
+
+import ch.qos.logback.core.joran.spi.XMLUtil;
+
+@RunWith(SpringRunner.class)
+@SpringBootTest(classes = { XxlJobExecutorApplication.class })
+public class StorageQuotaTest {
+
+// @Autowired StorageQuotaService storageQuotaService;
+//
+// @Test
+// public void storageQuotaServiceTest() {
+//     Map dayJson = storageQuotaService.getDayJson();
+//     Assert.notEmpty(dayJson, "clickhouse daily json is empty");
+//     Map diskJson = storageQuotaService.getDiskJson();
+//     Assert.notEmpty(diskJson, "failed to fetch the storage quota");
+// }
+
+ @Autowired
+ LogStorageQuotaJob logStorageQuotaJob;
+
+ @Autowired
+ StorageQuotaService storageQuotaService;
+
+ /**
+ * Purge all files. Use with caution.
+ */
+ @Test
+ public void delteAllFilesTest() {
+
+// ReturnT<String> deleteAllFiles = logStorageQuotaJob.deleteAllFiles("{\"maxdays\":365}");
+// Assert.isTrue(200==(deleteAllFiles.getCode()),"success");
+ }
+
+ /**
+ * Set the file storage (retention) policy.
+ */
+// @Test
+// public void deleteFilesTest() {
+// ReturnT<String> deleteFiles = logStorageQuotaJob.deleteFiles("{\"maxdays\":365}");
+// Assert.isTrue(200 == (deleteFiles.getCode()), "success");
+// }
+
+ @Test
+ public void zookeeperTest() {
+ Long lastStorage = storageQuotaService.getLastStorage("Files");
+ Assert.notNull(lastStorage, "failed to get the baseline time");
+ }
+
+}
