summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorLAPTOP-CUUVN8AS\wk <[email protected]>2021-04-14 14:50:34 +0800
committerLAPTOP-CUUVN8AS\wk <[email protected]>2021-04-14 14:50:34 +0800
commit691dd176a88d251b96b77c577331c220461de966 (patch)
tree87dce3b72d9a079652d2feb0d9356d3d05c1dde5
parent4d906338aee93da3121558c15ef8990c538bdcbd (diff)
eal4修复
-rw-r--r--docker/Dockerfile19
-rw-r--r--pom.xml15
-rw-r--r--src/main/java/com/mesa/reportservice/bean/DBTypeEnum.java17
-rw-r--r--src/main/java/com/mesa/reportservice/bean/JobEntity.java1
-rw-r--r--src/main/java/com/mesa/reportservice/configuration/ClickhouseConfig.java6
-rw-r--r--src/main/java/com/mesa/reportservice/configuration/GlobelConfig.java19
-rw-r--r--src/main/java/com/mesa/reportservice/configuration/HbaseConfig.java5
-rw-r--r--src/main/java/com/mesa/reportservice/configuration/HbaseFactory.java21
-rw-r--r--src/main/java/com/mesa/reportservice/configuration/HbaseProperties.java8
-rw-r--r--src/main/java/com/mesa/reportservice/configuration/HttpClientPool.java12
-rw-r--r--src/main/java/com/mesa/reportservice/configuration/ZkConfig.java10
-rw-r--r--src/main/java/com/mesa/reportservice/configuration/ZkProperties.java7
-rw-r--r--src/main/java/com/mesa/reportservice/controller/MonitorController.java14
-rw-r--r--src/main/java/com/mesa/reportservice/controller/ScheduledResultController.java135
-rw-r--r--src/main/java/com/mesa/reportservice/service/ClickhouseService.java10
-rw-r--r--src/main/java/com/mesa/reportservice/service/ExcuteProcessService.java6
-rw-r--r--src/main/java/com/mesa/reportservice/service/FormatService.java11
-rw-r--r--src/main/java/com/mesa/reportservice/service/MysqlService.java5
-rw-r--r--src/main/java/com/mesa/reportservice/service/ZkService.java2
-rw-r--r--src/main/java/com/mesa/reportservice/service/impl/ClickhouseServiceImpl.java153
-rw-r--r--src/main/java/com/mesa/reportservice/service/impl/ExcuteProcessServiceImpl.java71
-rw-r--r--src/main/java/com/mesa/reportservice/service/impl/ExcuteserviceImpl.java8
-rw-r--r--src/main/java/com/mesa/reportservice/service/impl/FormatServiceImpl.java53
-rw-r--r--src/main/java/com/mesa/reportservice/service/impl/HbaseServiceImpl.java44
-rw-r--r--src/main/java/com/mesa/reportservice/service/impl/MonitorServiceImpl.java10
-rw-r--r--src/main/java/com/mesa/reportservice/service/impl/MysqlServiceImpl.java29
-rw-r--r--src/main/java/com/mesa/reportservice/service/impl/ZkServiceImpl.java17
-rw-r--r--src/main/java/com/mesa/reportservice/util/DateUtil.java60
-rw-r--r--src/main/java/com/mesa/reportservice/util/StringUtil.java35
29 files changed, 326 insertions, 477 deletions
diff --git a/docker/Dockerfile b/docker/Dockerfile
index 00eda8c..c090aa9 100644
--- a/docker/Dockerfile
+++ b/docker/Dockerfile
@@ -1,16 +1,27 @@
ARG JDK_IMAGE
+ARG GO_IMAGE
+#FROM 192.168.40.153:9080/common/golang:1.15.6 as builder
+FROM ${GO_IMAGE} as builder
+
+WORKDIR /build
+COPY xjar.go /build/
+
+RUN CGO_ENABLED=0 GOOS=linux go build -a -installsuffix cgo -o xjar .
+
+#FROM 192.168.40.153:9080/common/jdk:1.8.0_73-jre
FROM ${JDK_IMAGE}
MAINTAINER Galaxy
VOLUME /tmp
WORKDIR /home/tsg/galaxy/galaxy-report-service
+COPY --from=builder /build .
+
COPY config config
-COPY xjar.go xjar.go
-RUN go build xjar.go
ARG JAR_FILE
COPY ${JAR_FILE} galaxy-report-service.xjar
+
#dockercompose set JAVA_OPTS
-ENV JAVA_OPTS=" -Xms2048m -Xmx2048m "
+ENV JAVA_OPTS=" -Xms1024m -Xmx2048m "
ENV LANG=en_US.UTF-8 LANGUAGE=en_US:en LC_ALL=en_US.UTF-8
#ENV TZ=Asia/Almaty
#RUN ln -snf /usr/share/zoneinfo/$TZ /etc/localtime && echo $TZ > /etc/timezone
-ENTRYPOINT [ "sh", "-c", "./xjar java $JAVA_OPTS -Djava.security.egd=file:/dev/./urandom -jar galaxy-report-service.xjar" ]
+ENTRYPOINT [ "sh", "-c", "./xjar java $JAVA_OPTS -Djava.security.egd=file:/dev/./urandom -jar galaxy-report-service.xjar" ] \ No newline at end of file
diff --git a/pom.xml b/pom.xml
index c16b9c9..3316d48 100644
--- a/pom.xml
+++ b/pom.xml
@@ -5,12 +5,12 @@
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
- <version>2.0.4.RELEASE</version>
+ <version>2.2.6.RELEASE</version>
<relativePath/> <!-- lookup parent from repository -->
</parent>
<groupId>com.mesa</groupId>
<artifactId>galaxy-report-service</artifactId>
- <version>21.03.12</version>
+ <version>21.04.07</version>
<name>galaxy-report-service</name>
<packaging>jar</packaging>
<description>Demo project for Spring Boot</description>
@@ -224,6 +224,12 @@
<artifactId>hutool-all</artifactId>
<version>5.5.2</version>
</dependency>
+
+ <dependency>
+ <groupId>com.alibaba.boot</groupId>
+ <artifactId>nacos-config-spring-boot-starter</artifactId>
+ <version>0.2.7</version>
+ </dependency>
</dependencies>
@@ -286,13 +292,14 @@
<forceTags>true</forceTags>
<imageTags>
- <imageTag>21.03.12</imageTag>
+ <imageTag>21.04.07</imageTag>
</imageTags>
<!--远程docker构建,供dockerfile使用-->
<dockerHost>http://192.168.40.153:2375</dockerHost>
<dockerDirectory>docker</dockerDirectory>
<buildArgs>
- <JDK_IMAGE>192.168.40.153:9080/common/jdk:1.8.0_73-go</JDK_IMAGE>
+ <JDK_IMAGE>192.168.40.153:9080/common/jdk:1.8.0_73-jre</JDK_IMAGE>
+ <GO_IMAGE>192.168.40.153:9080/common/golang:1.15.6</GO_IMAGE>
<JAR_FILE>${project.build.finalName}.xjar</JAR_FILE>
</buildArgs>
diff --git a/src/main/java/com/mesa/reportservice/bean/DBTypeEnum.java b/src/main/java/com/mesa/reportservice/bean/DBTypeEnum.java
deleted file mode 100644
index 87e0fc9..0000000
--- a/src/main/java/com/mesa/reportservice/bean/DBTypeEnum.java
+++ /dev/null
@@ -1,17 +0,0 @@
-package com.mesa.reportservice.bean;
-
-
-public enum DBTypeEnum {
- CLICKHOUSE("clickhouse"),
- DRUID("druid"),
- HBASE("hbase"),;
- private String value;
-
- DBTypeEnum(String value) {
- this.value = value;
- }
-
- public String getValue() {
- return value;
- }
-}
diff --git a/src/main/java/com/mesa/reportservice/bean/JobEntity.java b/src/main/java/com/mesa/reportservice/bean/JobEntity.java
index de10222..3e7edcf 100644
--- a/src/main/java/com/mesa/reportservice/bean/JobEntity.java
+++ b/src/main/java/com/mesa/reportservice/bean/JobEntity.java
@@ -256,7 +256,6 @@ public class JobEntity implements Cloneable {
try {
o = (JobEntity) super.clone();
} catch (CloneNotSupportedException e) {
- System.out.println(e.toString());
}
return o;
}
diff --git a/src/main/java/com/mesa/reportservice/configuration/ClickhouseConfig.java b/src/main/java/com/mesa/reportservice/configuration/ClickhouseConfig.java
index e1bbe6d..3526c4e 100644
--- a/src/main/java/com/mesa/reportservice/configuration/ClickhouseConfig.java
+++ b/src/main/java/com/mesa/reportservice/configuration/ClickhouseConfig.java
@@ -1,5 +1,7 @@
package com.mesa.reportservice.configuration;
+import com.alibaba.nacos.api.config.ConfigType;
+import com.alibaba.nacos.api.config.annotation.NacosConfigurationProperties;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.stereotype.Component;
@@ -10,7 +12,9 @@ import java.net.URLEncoder;
* Created by wk1 on 2019/5/17.
*/
@Component
-@ConfigurationProperties(prefix = "ck")
+//@ConfigurationProperties(prefix = "ck")
+@NacosConfigurationProperties(prefix = "ck", dataId = "${nacos.config.data-id}", groupId = "${nacos.config.group}", type = ConfigType.YAML, autoRefreshed = true)
+
public class ClickhouseConfig {
private static String gateway_ip;
diff --git a/src/main/java/com/mesa/reportservice/configuration/GlobelConfig.java b/src/main/java/com/mesa/reportservice/configuration/GlobelConfig.java
index f860788..7334b45 100644
--- a/src/main/java/com/mesa/reportservice/configuration/GlobelConfig.java
+++ b/src/main/java/com/mesa/reportservice/configuration/GlobelConfig.java
@@ -1,5 +1,7 @@
package com.mesa.reportservice.configuration;
+import com.alibaba.nacos.api.config.ConfigType;
+import com.alibaba.nacos.api.config.annotation.NacosConfigurationProperties;
import com.mesa.reportservice.bean.JobEntity;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.stereotype.Component;
@@ -13,13 +15,22 @@ import java.util.concurrent.ConcurrentHashMap;
*/
@Component
-@ConfigurationProperties(prefix = "globle")
+@NacosConfigurationProperties(prefix = "globle", dataId = "${nacos.config.data-id}", groupId = "${nacos.config.group}", type = ConfigType.YAML, autoRefreshed = true)
+
public class GlobelConfig {
- public static int job_thread;
- public static Map<String, JobEntity> mapresult = new ConcurrentHashMap<>();
+ private static int job_thread;
+ private static Map<String, JobEntity> mapresult = new ConcurrentHashMap<>();
public void setJob_thread(int job_thread) {
- GlobelConfig.job_thread = job_thread;
+ this.job_thread = job_thread;
}
public final static String zkuuid = UUID.randomUUID().toString().replaceAll("-","");
+
+ public int getJob_thread() {
+ return job_thread;
+ }
+
+ public Map<String, JobEntity> getMapresult() {
+ return mapresult;
+ }
}
diff --git a/src/main/java/com/mesa/reportservice/configuration/HbaseConfig.java b/src/main/java/com/mesa/reportservice/configuration/HbaseConfig.java
index fcf6df2..2cdb8e4 100644
--- a/src/main/java/com/mesa/reportservice/configuration/HbaseConfig.java
+++ b/src/main/java/com/mesa/reportservice/configuration/HbaseConfig.java
@@ -1,5 +1,7 @@
package com.mesa.reportservice.configuration;
+import com.alibaba.nacos.api.config.ConfigType;
+import com.alibaba.nacos.api.config.annotation.NacosConfigurationProperties;
import com.mesa.reportservice.bean.JobEntity;
import com.mesa.reportservice.util.StringUtil;
import org.springframework.boot.context.properties.ConfigurationProperties;
@@ -11,7 +13,8 @@ import java.net.URL;
* Created by wk1 on 2019/5/27.
*/
@Component
-@ConfigurationProperties(prefix = "hbase")
+@NacosConfigurationProperties(prefix = "hbase", dataId = "${nacos.config.data-id}", groupId = "${nacos.config.group}", type = ConfigType.YAML, autoRefreshed = true)
+
public class HbaseConfig {
private static String columefamily;
diff --git a/src/main/java/com/mesa/reportservice/configuration/HbaseFactory.java b/src/main/java/com/mesa/reportservice/configuration/HbaseFactory.java
index 3a63bdc..96f6457 100644
--- a/src/main/java/com/mesa/reportservice/configuration/HbaseFactory.java
+++ b/src/main/java/com/mesa/reportservice/configuration/HbaseFactory.java
@@ -17,16 +17,13 @@ import java.util.concurrent.Executors;
* Created by Administrator on 2020/3/10.
*/
@Configuration
-@EnableConfigurationProperties(HbaseProperties.class)
public class HbaseFactory {
- private final HbaseProperties hbproperties;
+
+ @Autowired
+ private HbaseProperties hbaseProperties;
Log logger = Log.get();
- @Autowired
- public HbaseFactory(HbaseProperties hbproperties) {
- this.hbproperties = hbproperties;
- }
@Bean(name = "hbaseConfiguration")
public org.apache.hadoop.conf.Configuration getConfiguration() {
@@ -34,14 +31,14 @@ public class HbaseFactory {
org.apache.hadoop.conf.Configuration conf = null;
if (conf == null) {
conf = HBaseConfiguration.create();
- conf.set("hbase.zookeeper.quorum", hbproperties.getZookeeper_quorum());
- conf.set("hbase.zookeeper.property.clientPort", hbproperties.getZookeeper_property_clientPort());
- conf.set("zookeeper.znode.parent", hbproperties.getZookeeper_znode_parent());
- conf.set("hbase.client.retries.number", hbproperties.getClient_retries_number());
- conf.set("hbase.rpc.timeout", hbproperties.getRpc_timeout());
+ conf.set("hbase.zookeeper.quorum", hbaseProperties.getZookeeper_quorum());
+ conf.set("hbase.zookeeper.property.clientPort", hbaseProperties.getZookeeper_property_clientPort());
+ conf.set("zookeeper.znode.parent", hbaseProperties.getZookeeper_znode_parent());
+ conf.set("hbase.client.retries.number", hbaseProperties.getClient_retries_number());
+ conf.set("hbase.rpc.timeout", hbaseProperties.getRpc_timeout());
conf.set("hbase.client.keyvalue.maxsize", "1024000000");
conf.set("zookeeper.recovery.retry", "3");
- conf.set("hbase.client.ipc.pool.size", hbproperties.getConnect_pool().toString());
+ conf.set("hbase.client.ipc.pool.size", hbaseProperties.getConnect_pool().toString());
}
diff --git a/src/main/java/com/mesa/reportservice/configuration/HbaseProperties.java b/src/main/java/com/mesa/reportservice/configuration/HbaseProperties.java
index fa4d4a1..e814a6e 100644
--- a/src/main/java/com/mesa/reportservice/configuration/HbaseProperties.java
+++ b/src/main/java/com/mesa/reportservice/configuration/HbaseProperties.java
@@ -1,12 +1,18 @@
package com.mesa.reportservice.configuration;
+import com.alibaba.nacos.api.config.ConfigType;
+import com.alibaba.nacos.api.config.annotation.NacosConfigurationProperties;
import org.springframework.boot.context.properties.ConfigurationProperties;
+import org.springframework.stereotype.Component;
/**
* Created by Administrator on 2020/3/10.
*/
-@ConfigurationProperties(prefix = "hbase")
+//@ConfigurationProperties(prefix = "hbase")
+@Component
+@NacosConfigurationProperties(prefix = "hbase", dataId = "${nacos.config.data-id}", groupId = "${nacos.config.group}", type = ConfigType.YAML, autoRefreshed = true)
+
public class HbaseProperties {
private String zookeeper_quorum;
diff --git a/src/main/java/com/mesa/reportservice/configuration/HttpClientPool.java b/src/main/java/com/mesa/reportservice/configuration/HttpClientPool.java
index 9188db7..e6ac85a 100644
--- a/src/main/java/com/mesa/reportservice/configuration/HttpClientPool.java
+++ b/src/main/java/com/mesa/reportservice/configuration/HttpClientPool.java
@@ -1,5 +1,7 @@
package com.mesa.reportservice.configuration;
+import com.alibaba.nacos.api.config.ConfigType;
+import com.alibaba.nacos.api.config.annotation.NacosConfigurationProperties;
import org.apache.http.client.config.RequestConfig;
import org.apache.http.impl.client.CloseableHttpClient;
import org.apache.http.impl.client.HttpClientBuilder;
@@ -13,7 +15,9 @@ import org.springframework.stereotype.Component;
* Created by wk1 on 2019/5/15.
*/
@Component
-@ConfigurationProperties(prefix = "http")
+//@ConfigurationProperties(prefix = "http")
+@NacosConfigurationProperties(prefix = "http", dataId = "${nacos.config.data-id}", groupId = "${nacos.config.group}", type = ConfigType.YAML, autoRefreshed = true)
+
public class HttpClientPool {
private Integer maxTotal;
@@ -24,7 +28,7 @@ public class HttpClientPool {
private Integer connectionRequestTimeout;
- public static Integer socketTimeout;
+ private Integer socketTimeout;
private boolean staleConnectionCheckEnabled;
@@ -59,6 +63,10 @@ public class HttpClientPool {
this.socketTimeoutShort = socketTimeoutShort;
}
+ public Integer getSocketTimeout() {
+ return socketTimeout;
+ }
+
/**
* 首先实例化一个连接池管理器,设置最大连接数、并发连接数
*
diff --git a/src/main/java/com/mesa/reportservice/configuration/ZkConfig.java b/src/main/java/com/mesa/reportservice/configuration/ZkConfig.java
index ed09cfa..c4cebf6 100644
--- a/src/main/java/com/mesa/reportservice/configuration/ZkConfig.java
+++ b/src/main/java/com/mesa/reportservice/configuration/ZkConfig.java
@@ -14,17 +14,13 @@ import org.springframework.context.annotation.Configuration;
* Created by wk1 on 2020/1/6.
*/
@Configuration
-@EnableConfigurationProperties(ZkProperties.class)
-
public class ZkConfig {
- private final ZkProperties zkproperties;
+ @Autowired
+ private ZkProperties zkproperties;
Log logger = Log.get();
- @Autowired
- public ZkConfig(ZkProperties zkproperties) {
- this.zkproperties = zkproperties;
- }
+
@Bean(name = "curatorConnect")
public CuratorFramework CuratorConnect() {
diff --git a/src/main/java/com/mesa/reportservice/configuration/ZkProperties.java b/src/main/java/com/mesa/reportservice/configuration/ZkProperties.java
index 6a7a35f..f8ed4e0 100644
--- a/src/main/java/com/mesa/reportservice/configuration/ZkProperties.java
+++ b/src/main/java/com/mesa/reportservice/configuration/ZkProperties.java
@@ -1,11 +1,16 @@
package com.mesa.reportservice.configuration;
+import com.alibaba.nacos.api.config.ConfigType;
+import com.alibaba.nacos.api.config.annotation.NacosConfigurationProperties;
import org.springframework.boot.context.properties.ConfigurationProperties;
+import org.springframework.stereotype.Component;
/**
* Created by wk1 on 2020/1/6.
*/
-@ConfigurationProperties(prefix = "zookeeper")
+//@ConfigurationProperties(prefix = "zookeeper")
+@Component
+@NacosConfigurationProperties(prefix = "zookeeper", dataId = "${nacos.config.data-id}", groupId = "${nacos.config.group}", type = ConfigType.YAML, autoRefreshed = true)
public class ZkProperties {
private int open;
diff --git a/src/main/java/com/mesa/reportservice/controller/MonitorController.java b/src/main/java/com/mesa/reportservice/controller/MonitorController.java
index 73ecc0f..0f8e61d 100644
--- a/src/main/java/com/mesa/reportservice/controller/MonitorController.java
+++ b/src/main/java/com/mesa/reportservice/controller/MonitorController.java
@@ -8,7 +8,9 @@ import com.mesa.reportservice.service.MysqlService;
import com.mesa.reportservice.service.ZkService;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
-import org.springframework.web.bind.annotation.*;
+import org.springframework.web.bind.annotation.GetMapping;
+import org.springframework.web.bind.annotation.RestController;
+import com.mesa.reportservice.configuration.ZkProperties;
import java.util.Map;
@@ -24,6 +26,11 @@ public class MonitorController {
private MysqlService mysqlService;
@Autowired
private ZkService zs;
+ @Autowired
+ private GlobelConfig gc;
+ @Autowired
+ private ZkProperties zkProperties;
+
Log logger = Log.get();
@@ -39,14 +46,14 @@ public class MonitorController {
me.setExcuteingJobNum(numMap.get("excuteingNum"));
me.setTodaySuccessJobNum(numMap.get("todaySuccessNum"));
me.setTodayErrorJobNum(numMap.get("todayErrorNum"));
- me.setJoblist(GlobelConfig.mapresult);
+ me.setJoblist(gc.getMapresult());
if(zs.isMaster()){
me.setStatus("active");
}
else{
me.setStatus("standby");
}
- GlobelConfig.mapresult.size();
+ gc.getMapresult().size();
Object obj = JSONArray.toJSON(me);
json = obj.toString();
} catch (Exception e) {
@@ -57,5 +64,4 @@ public class MonitorController {
return json;
}
-
}
diff --git a/src/main/java/com/mesa/reportservice/controller/ScheduledResultController.java b/src/main/java/com/mesa/reportservice/controller/ScheduledResultController.java
index 041a0bd..1085bfe 100644
--- a/src/main/java/com/mesa/reportservice/controller/ScheduledResultController.java
+++ b/src/main/java/com/mesa/reportservice/controller/ScheduledResultController.java
@@ -5,7 +5,6 @@ import com.mesa.reportservice.bean.JobEntity;
import com.mesa.reportservice.configuration.GlobelConfig;
import com.mesa.reportservice.configuration.HttpClientPool;
import com.mesa.reportservice.service.*;
-import org.apache.commons.codec.digest.DigestUtils;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.scheduling.annotation.EnableScheduling;
import org.springframework.scheduling.annotation.Scheduled;
@@ -37,6 +36,11 @@ public class ScheduledResultController {
private ZkService zs;
@Autowired
private ClickhouseService cs;
+ @Autowired
+ private HttpClientPool hc;
+ @Autowired
+ private GlobelConfig gc;
+
@Scheduled(cron = "${scan.result.scheduled.plan}")
public void getExcuteResult() {
@@ -51,19 +55,19 @@ public class ScheduledResultController {
sql = sql.replace("$start_time", "toDateTime('" + jobEntity.getStartTime().trim() + "')");
sql = sql.replace("$end_time", "toDateTime('" + jobEntity.getEndTime().trim() + "')");
- String queryid = cs.getQueryId(jobEntity.getResultId().toString(),sql);
+ String queryid = cs.getQueryId(jobEntity.getResultId().toString(), sql);
jobEntity.setQuery_id(queryid);
if (jobEntity.getIsValid() == 0) {
eps.killQuery(jobEntity);
- GlobelConfig.mapresult.get(jobEntity.getQuery_id()).setIsValid(0);
- } else if (!GlobelConfig.mapresult.containsKey(jobEntity.getQuery_id())) {
+ gc.getMapresult().get(jobEntity.getQuery_id()).setIsValid(0);
+ } else if (!gc.getMapresult().containsKey(jobEntity.getQuery_id())) {
eps.reSet(jobEntity);
}
- if (GlobelConfig.mapresult.containsKey(jobEntity.getQuery_id())) {
+ if (gc.getMapresult().containsKey(jobEntity.getQuery_id())) {
if (jobEntity.getIsValid() == 0) {
eps.killQuery(jobEntity);
- GlobelConfig.mapresult.get(jobEntity.getQuery_id()).setIsValid(0);
+ gc.getMapresult().get(jobEntity.getQuery_id()).setIsValid(0);
}
} else {
eps.reSet(jobEntity);
@@ -71,90 +75,83 @@ public class ScheduledResultController {
}
//遍历内存中的任务对状态1的更新进度,其他更新数据库的状态
- for (Map.Entry<String, JobEntity> entry : GlobelConfig.mapresult.entrySet()) {
+ for (Map.Entry<String, JobEntity> entry : gc.getMapresult().entrySet()) {
logger.info("key = " + entry.getKey() + ", value = " + entry.getValue().getStatus());
long currentTime = System.currentTimeMillis();
- long excutetime =currentTime-entry.getValue().getBeginTime();
- logger.info("excute time="+excutetime+"ttl_time="+HttpClientPool.socketTimeout);
- if (entry.getValue().getStatus()==1 && excutetime> HttpClientPool.socketTimeout+1){
+ long excutetime = currentTime - entry.getValue().getBeginTime();
+ logger.info("excute time=" + excutetime + "ttl_time=" + hc.getSocketTimeout());
+ if (entry.getValue().getStatus() == 1 && excutetime > hc.getSocketTimeout()) {
entry.getValue().setStatus(2);
entry.getValue().setExcute_status(500001);
eps.killQuery(entry.getValue());
eps.updateResultMessage(entry.getValue());
- }
- else{
+ } else {
if (entry.getValue().getStatus() == 1) {
eps.updateProcessMessage(entry.getValue());
- }
- else {
+ } else {
eps.updateResultMessage(entry.getValue());
}
}
}
- int rows = GlobelConfig.job_thread - GlobelConfig.mapresult.size();
- if (rows > 0) {
- List<JobEntity> jobs = ms.getJobTask(rows);
- for (JobEntity job : jobs) {
- logger.info("开始执行任务");
- long begintime = System.currentTimeMillis();
- job.setBeginTime(begintime);
- String sql = job.getQuerySql().trim();
- sql = sql.replace("$exe_time", "toDateTime('" + job.getIssuedTime().trim() + "')");
- sql = sql.replace("$start_time", "toDateTime('" + job.getStartTime().trim() + "')");
- sql = sql.replace("$end_time", "toDateTime('" + job.getEndTime().trim() + "')");
- job.setQuerySql(sql);
- job.setStatus(1);
- job.setExcute_status(1);
- job.setExcuteDetail("EXECUTING");
- job.setExcuteRow(0L);
- job.setExcuteTime(0);
- job.setExcuteProcess(0);
- job.setResultRows(0);
- String queryid="";
- try {
- queryid = cs.getQueryId(job.getResultId().toString(), sql);
- }
- catch (Exception e){
- job.setExcute_status(0);
- job.setStatus(7);
- job.setExcuteDetail("Unknow Error");
- }
- job.setQuery_id(queryid);
-
- if(job.getStatus()==1){
- if(ms.updateProcesses(job)!=0){
- GlobelConfig.mapresult.put(queryid, job);
- pool.execute(new Runnable() {
- @Override
- public void run() {
- es.excuteCkTask(job);
- }
- });
- }else{
- logger.error("更新数据库状态失败");
+ int rows = gc.getJob_thread() - gc.getMapresult().size();
+ if (rows > 0) {
+ List<JobEntity> jobs = ms.getJobTask(rows);
+ for (JobEntity job : jobs) {
+ logger.info("开始执行任务");
+ long begintime = System.currentTimeMillis();
+ job.setBeginTime(begintime);
+ String sql = job.getQuerySql().trim();
+ sql = sql.replace("$exe_time", "toDateTime('" + job.getIssuedTime().trim() + "')");
+ sql = sql.replace("$start_time", "toDateTime('" + job.getStartTime().trim() + "')");
+ sql = sql.replace("$end_time", "toDateTime('" + job.getEndTime().trim() + "')");
+ job.setQuerySql(sql);
+ job.setStatus(1);
+ job.setExcute_status(1);
+ job.setExcuteDetail("EXECUTING");
+ job.setExcuteRow(0L);
+ job.setExcuteTime(0);
+ job.setExcuteProcess(0);
+ job.setResultRows(0);
+ String queryid = "";
+ queryid = cs.getQueryId(job.getResultId().toString(), sql);
+ job.setQuery_id(queryid);
+ if (queryid.equals("") ) {
+ job.setExcute_status(0);
+ job.setStatus(7);
+ job.setExcuteDetail("Unknow Error");
+ }
+ if (job.getStatus() == 1) {
+ if (ms.updateProcesses(job) != 0) {
+ gc.getMapresult().put(queryid, job);
+ pool.execute(new Runnable() {
+ @Override
+ public void run() {
+ es.excuteCkTask(job);
+ }
+ });
+ } else {
+ logger.error("更新数据库状态失败");
- }
}
- else{
- if(ms.updateProcesses(job)!=0){
- logger.error("任务不能执行");
- }else{
- logger.error("更新数据库状态失败");
+ } else {
+ if (ms.updateProcesses(job) != 0) {
+ logger.error("任务不能执行");
+ } else {
+ logger.error("更新数据库状态失败");
- }
}
}
- } else {
- logger.info("无待执行任务");
}
- }
- else {
- if (GlobelConfig.mapresult.size() > 0) {
- for (Map.Entry<String, JobEntity> entry : GlobelConfig.mapresult.entrySet()) {
+ } else {
+ logger.info("无待执行任务");
+ }
+ } else {
+ if (gc.getMapresult().size() > 0) {
+ for (Map.Entry<String, JobEntity> entry : gc.getMapresult().entrySet()) {
logger.info("key = " + entry.getKey() + ", value = " + entry.getValue().getStatus());
eps.killQuery(entry.getValue());
}
- GlobelConfig.mapresult.clear();
+ gc.getMapresult().clear();
}
}
} catch (Exception e) {
diff --git a/src/main/java/com/mesa/reportservice/service/ClickhouseService.java b/src/main/java/com/mesa/reportservice/service/ClickhouseService.java
index a21a9c2..f178d6e 100644
--- a/src/main/java/com/mesa/reportservice/service/ClickhouseService.java
+++ b/src/main/java/com/mesa/reportservice/service/ClickhouseService.java
@@ -2,23 +2,23 @@ package com.mesa.reportservice.service;
import com.mesa.reportservice.bean.HttpResult;
-import java.util.Map;
+import java.io.IOException;
/**
* Created by wk1 on 2020/1/2.
*/
public interface ClickhouseService {
- String getQueryId(String resultId,String query) throws Exception;
+ String getQueryId(String resultId,String query) ;
- HttpResult queryForExcute(String resultId,String query) throws Exception;
+ HttpResult queryForExcute(String resultId,String query) throws IOException;
- HttpResult queryForProcess(String queryId) throws Exception;
+ HttpResult queryForProcess(String queryId) throws IOException;
- HttpResult queryForCancel(String queryId) throws Exception;
+ HttpResult queryForCancel(String queryId) throws IOException;
diff --git a/src/main/java/com/mesa/reportservice/service/ExcuteProcessService.java b/src/main/java/com/mesa/reportservice/service/ExcuteProcessService.java
index 94e31db..4660e9a 100644
--- a/src/main/java/com/mesa/reportservice/service/ExcuteProcessService.java
+++ b/src/main/java/com/mesa/reportservice/service/ExcuteProcessService.java
@@ -2,7 +2,7 @@ package com.mesa.reportservice.service;
import com.mesa.reportservice.bean.JobEntity;
-import java.util.List;
+import java.io.IOException;
/**
* Created by wk1 on 2020/1/8.
@@ -11,7 +11,7 @@ public interface ExcuteProcessService {
void updateResultMessage(JobEntity job);
void reSet(JobEntity jobEntity);
- void updateProcessMessage(JobEntity job);
- void killQuery(JobEntity jobEntity);
+ void updateProcessMessage(JobEntity job) throws IOException;
+ void killQuery(JobEntity jobEntity) throws IOException;
}
diff --git a/src/main/java/com/mesa/reportservice/service/FormatService.java b/src/main/java/com/mesa/reportservice/service/FormatService.java
deleted file mode 100644
index c9618b5..0000000
--- a/src/main/java/com/mesa/reportservice/service/FormatService.java
+++ /dev/null
@@ -1,11 +0,0 @@
-package com.mesa.reportservice.service;
-
-/**
- * Created by wk1 on 2020/4/15.
- */
-public interface FormatService {
-
-
- String doFormat(String url) throws Exception;
-
-}
diff --git a/src/main/java/com/mesa/reportservice/service/MysqlService.java b/src/main/java/com/mesa/reportservice/service/MysqlService.java
index f6d9dca..036bad9 100644
--- a/src/main/java/com/mesa/reportservice/service/MysqlService.java
+++ b/src/main/java/com/mesa/reportservice/service/MysqlService.java
@@ -10,15 +10,14 @@ import java.util.Map;
public interface MysqlService {
- List<JobEntity> getJobForExcute() throws Exception;
+ List<JobEntity> getJobForExcute() ;
- List<JobEntity> getJobTask(int Rows) throws Exception;
+ List<JobEntity> getJobTask(int Rows) ;
int updateProcesses(JobEntity job);
- int updateStatue(JobEntity job);
Map<String,Long> getJobCount() throws Exception;
diff --git a/src/main/java/com/mesa/reportservice/service/ZkService.java b/src/main/java/com/mesa/reportservice/service/ZkService.java
index deff69a..d42f636 100644
--- a/src/main/java/com/mesa/reportservice/service/ZkService.java
+++ b/src/main/java/com/mesa/reportservice/service/ZkService.java
@@ -12,5 +12,5 @@ import java.net.InetAddress;
public interface ZkService {
- boolean isMaster();
+ boolean isMaster() throws Exception;
} \ No newline at end of file
diff --git a/src/main/java/com/mesa/reportservice/service/impl/ClickhouseServiceImpl.java b/src/main/java/com/mesa/reportservice/service/impl/ClickhouseServiceImpl.java
index 344e75f..0760b41 100644
--- a/src/main/java/com/mesa/reportservice/service/impl/ClickhouseServiceImpl.java
+++ b/src/main/java/com/mesa/reportservice/service/impl/ClickhouseServiceImpl.java
@@ -1,9 +1,9 @@
package com.mesa.reportservice.service.impl;
+import cn.hutool.core.io.IoUtil;
import cn.hutool.core.util.URLUtil;
import cn.hutool.log.Log;
import com.alibaba.fastjson.JSON;
-import com.alibaba.fastjson.JSONObject;
import com.mesa.reportservice.bean.HttpResult;
import com.mesa.reportservice.configuration.ClickhouseConfig;
import com.mesa.reportservice.service.ClickhouseService;
@@ -11,16 +11,13 @@ import org.apache.http.client.config.RequestConfig;
import org.apache.http.client.methods.CloseableHttpResponse;
import org.apache.http.client.methods.HttpDelete;
import org.apache.http.client.methods.HttpGet;
-import org.apache.http.client.methods.HttpPost;
-import org.apache.http.client.utils.URIBuilder;
import org.apache.http.impl.client.CloseableHttpClient;
-import org.apache.http.params.CoreConnectionPNames;
import org.apache.http.util.EntityUtils;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.stereotype.Service;
-import javax.annotation.Resource;
+import java.io.IOException;
import java.io.UnsupportedEncodingException;
import java.net.URLEncoder;
import java.util.List;
@@ -49,31 +46,37 @@ public class ClickhouseServiceImpl implements ClickhouseService {
@Override
- public String getQueryId(String resultId, String query) throws Exception {
-
-
- String url = URLUtil.normalize(clickhouseConfig.getGateway_ip().trim() + "/sys/engine/queryIds?resultId=" + resultId + "&query=");
- String sql = URLEncoder.encode(query, "utf8").replaceAll("\\+", "%20");
- url = url+sql;
- HttpGet httpGet = new HttpGet(url);
- // 加入配置信息
- httpGet.setConfig(RequestshortConfig);
- try(CloseableHttpResponse response = this.httpClient.execute(httpGet) ){
- if(response.getStatusLine().getStatusCode()!=200){
- throw new Exception();
- }
- else {
+ public String getQueryId(String resultId, String query) {
+
+ CloseableHttpResponse response = null;
+ String query_id = "";
+ try {
+ String url = URLUtil.normalize(clickhouseConfig.getGateway_ip().trim() + "/sys/engine/queryIds?resultId=" + resultId + "&query=");
+ String sql = null;
+ sql = URLEncoder.encode(query, "utf8").replaceAll("\\+", "%20");
+ url = url + sql;
+ HttpGet httpGet = new HttpGet(url);
+ // 加入配置信息
+ httpGet.setConfig(RequestshortConfig);
+
+
+ response = this.httpClient.execute(httpGet);
+ if (response.getStatusLine().getStatusCode() != 200) {
+ throw new IOException();
+ } else {
Map data = JSON.parseObject(EntityUtils.toString(response.getEntity(), "UTF-8"));
List listdata = (List) data.get("data");
Map map = JSON.parseObject(JSON.toJSONString(listdata.get(0)));
- String query_id = map.get("queryId").toString();
- return query_id;
+ query_id = map.get("queryId").toString();
}
- }catch (Exception e){
- logger.error("获取queryid失败"+e.toString());
- throw new Exception();
+ } catch (IOException e) {
+ logger.error(e.toString());
+
+ } finally {
+ IoUtil.close(response);
}
+ return query_id;
}
/**
@@ -83,80 +86,86 @@ public class ClickhouseServiceImpl implements ClickhouseService {
* @throws Exception
*/
@Override
- public HttpResult queryForExcute(String resultId, String query) throws Exception,OutOfMemoryError{
+ public HttpResult queryForExcute(String resultId, String query) throws UnsupportedEncodingException {
// 声明httpPost请求
-
- String url = URLUtil.normalize(clickhouseConfig.getGateway_ip().trim() + "/?option=long-term&resultId=" + resultId + "&query=");
- query = URLEncoder.encode(query , "utf8").replaceAll("\\+", "%20");
- String jobsql = url + query;
- HttpGet httpGet = new HttpGet(jobsql);
- // 加入配置信息
- httpGet.setConfig(RequestLongConfig);
-
- try(CloseableHttpResponse response = this.httpClient.execute(httpGet) ){
- HttpResult rs = new HttpResult();
+ CloseableHttpResponse response = null;
+ // 发起请求
+ HttpResult rs = null;
+ try {
+ String url = URLUtil.normalize(clickhouseConfig.getGateway_ip().trim() + "/?option=long-term&resultId=" + resultId + "&query=");
+ query = URLEncoder.encode(query, "utf8").replaceAll("\\+", "%20");
+ String jobsql = url + query;
+ HttpGet httpGet = new HttpGet(jobsql);
+ // 加入配置信息
+ httpGet.setConfig(RequestLongConfig);
+
+
+ response = this.httpClient.execute(httpGet);
+ rs = new HttpResult();
rs.setCode(response.getStatusLine().getStatusCode());
rs.setBody(EntityUtils.toString(response.getEntity(), "UTF-8"));
- return rs;
- }catch (Exception e){
+ } catch (IOException e) {
logger.error(e.toString());
- throw new Exception();
- }
+ } finally {
+ IoUtil.close(response);
+ }
+ return rs;
}
-
-
@Override
- public HttpResult queryForProcess(String queryId) throws Exception {
+ public HttpResult queryForProcess(String queryId) {
// 声明httpPost请求
- String url = URLUtil.normalize( clickhouseConfig.getGateway_ip().trim() + "/sys/engine/processes/"+queryId);
+ CloseableHttpResponse response = null;
+ // 发起请求
+ HttpResult rs = null;
+ try {
+ String url = URLUtil.normalize(clickhouseConfig.getGateway_ip().trim() + "/sys/engine/processes/" + queryId);
- HttpGet httpGet = new HttpGet(url);
- // 加入配置信息
- httpGet.setConfig(RequestshortConfig);
+ HttpGet httpGet = new HttpGet(url);
+ // 加入配置信息
+ httpGet.setConfig(RequestshortConfig);
- // 发起请求
- try(CloseableHttpResponse response = this.httpClient.execute(httpGet) ){
- HttpResult rs = new HttpResult();
+ response = this.httpClient.execute(httpGet);
+ rs = new HttpResult();
rs.setCode(response.getStatusLine().getStatusCode());
rs.setBody(EntityUtils.toString(response.getEntity(), "UTF-8"));
-
- //Thread.sleep(120);
- return rs;
-
- }catch (Exception e){
- logger.error(e.toString());
- throw new Exception();
- }
+ } catch (IOException e) {
+ logger.error(e.toString());
+ } finally {
+ IoUtil.close(response);
+ }
+ return rs;
}
@Override
- public HttpResult queryForCancel(String queryId) throws Exception {
+ public HttpResult queryForCancel(String queryId) {
- String url = URLUtil.normalize(clickhouseConfig.getGateway_ip().trim() + "/sys/engine/tasks/"+queryId);
- // 声明httpPost请求
- HttpDelete HttpDelete = new HttpDelete(url);
- // 加入配置信息
- HttpDelete.setConfig(RequestshortConfig);
+ CloseableHttpResponse response = null;
+ HttpResult rs = null;
+ try {
+ String url = URLUtil.normalize(clickhouseConfig.getGateway_ip().trim() + "/sys/engine/tasks/" + queryId);
+ // 声明httpPost请求
+ HttpDelete HttpDelete = new HttpDelete(url);
+ // 加入配置信息
+ HttpDelete.setConfig(RequestshortConfig);
- // 发起请求
- try(CloseableHttpResponse response = this.httpClient.execute(HttpDelete)) {
+ // 发起请求
- HttpResult rs = new HttpResult();
+ response = this.httpClient.execute(HttpDelete);
+ rs = new HttpResult();
rs.setCode(response.getStatusLine().getStatusCode());
rs.setBody(EntityUtils.toString(response.getEntity(), "UTF-8"));
- return rs;
+ } catch (IOException e) {
+ logger.error(e.toString());
+ } finally {
+ IoUtil.close(response);
}
- catch (Exception e){
- logger.error(e.toString());
- throw new Exception();
- }
-
+ return rs;
}
public CloseableHttpClient getHttpClient() {
diff --git a/src/main/java/com/mesa/reportservice/service/impl/ExcuteProcessServiceImpl.java b/src/main/java/com/mesa/reportservice/service/impl/ExcuteProcessServiceImpl.java
index 604dd20..05e9a67 100644
--- a/src/main/java/com/mesa/reportservice/service/impl/ExcuteProcessServiceImpl.java
+++ b/src/main/java/com/mesa/reportservice/service/impl/ExcuteProcessServiceImpl.java
@@ -4,12 +4,12 @@ import cn.hutool.log.Log;
import com.alibaba.fastjson.JSON;
import com.mesa.reportservice.bean.HttpResult;
import com.mesa.reportservice.bean.JobEntity;
-import com.mesa.reportservice.configuration.ClickhouseConfig;
import com.mesa.reportservice.configuration.GlobelConfig;
import com.mesa.reportservice.service.*;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
+import java.io.IOException;
import java.util.List;
import java.util.Map;
@@ -30,7 +30,8 @@ public class ExcuteProcessServiceImpl implements ExcuteProcessService {
private HbaseService hs;
@Autowired
private MonitorService mons;
-
+ @Autowired
+ private GlobelConfig gc;
@Override
public void updateResultMessage(JobEntity je) {
@@ -104,7 +105,7 @@ public class ExcuteProcessServiceImpl implements ExcuteProcessService {
} finally {
saveToMonitor(je);
- GlobelConfig.mapresult.remove(je.getQuery_id());
+ gc.getMapresult().remove(je.getQuery_id());
}
@@ -120,46 +121,45 @@ public class ExcuteProcessServiceImpl implements ExcuteProcessService {
}
@Override
- public void killQuery(JobEntity jobEntity) {
- try {
+ public void killQuery(JobEntity jobEntity) throws IOException {
+
cs.queryForCancel(jobEntity.getQuery_id());
- } catch (Exception e) {
- logger.error(e.toString());
- }
}
@Override
- public void updateProcessMessage(JobEntity job) {
- HttpResult hr = null;
- try {
- hr = cs.queryForProcess(job.getQuery_id());
- String rs = hr.getBody().trim();
- Map data = JSON.parseObject(rs);
-
- if (!rs.isEmpty() && !rs.equals("")) {
- List listdata = (List) data.get("data");
- if (null != listdata && listdata.size() > 0) {
- Map map = JSON.parseObject(JSON.toJSONString(listdata.get(0)));
- long read_rows = Long.parseLong(map.get("rows_read").toString());
- float elapsed = Float.parseFloat(map.get("elapsed").toString());
- double persent = Double.parseDouble(map.get("percent").toString());
- int result = (int) (persent * 100);
- job.setExcuteTime((int) elapsed);
- job.setExcuteRow(read_rows);
- job.setExcuteProcess(result);
- if (job.getExcuteRow() != 0 || job.getExcuteTime() != 0) {
- ms.updateProcesses(job);
+ public void updateProcessMessage(JobEntity job) throws IOException {
+
+ HttpResult hr = cs.queryForProcess(job.getQuery_id());
+ if (hr!=null) {
+ String rs = hr.getBody().trim();
+ Map data = JSON.parseObject(rs);
+
+ if (!rs.isEmpty() && !rs.equals("")) {
+ List listdata = (List) data.get("data");
+ if (null != listdata && listdata.size() > 0) {
+ Map map = JSON.parseObject(JSON.toJSONString(listdata.get(0)));
+ long read_rows = Long.parseLong(map.get("rows_read").toString());
+ float elapsed = Float.parseFloat(map.get("elapsed").toString());
+ double persent = Double.parseDouble(map.get("percent").toString());
+ int result = (int) (persent * 100);
+ job.setExcuteTime((int) elapsed);
+ job.setExcuteRow(read_rows);
+ job.setExcuteProcess(result);
+ if (job.getExcuteRow() != 0 || job.getExcuteTime() != 0) {
+ ms.updateProcesses(job);
+ }
+ } else {
+ logger.info("responsedata is null");
}
} else {
- logger.info("responsedata is null");
+ logger.error("responsebody is null");
}
- } else {
+ }
+ else{
logger.error("responsebody is null");
}
- } catch (Exception e) {
- logger.error(e.toString());
- }
+
}
/**
* 结果存入hbase
@@ -185,16 +185,11 @@ public class ExcuteProcessServiceImpl implements ExcuteProcessService {
* promethus记录结果
*/
public void saveToMonitor(JobEntity entity) {
-
- try {
if (entity.getStatus() == 2) {
mons.addSuccess();
} else {
mons.addFail();
}
- } catch (Exception e) {
- logger.error("监控结果失败" + e.toString());
- }
}
}
diff --git a/src/main/java/com/mesa/reportservice/service/impl/ExcuteserviceImpl.java b/src/main/java/com/mesa/reportservice/service/impl/ExcuteserviceImpl.java
index aa50055..8359b15 100644
--- a/src/main/java/com/mesa/reportservice/service/impl/ExcuteserviceImpl.java
+++ b/src/main/java/com/mesa/reportservice/service/impl/ExcuteserviceImpl.java
@@ -4,14 +4,11 @@ import cn.hutool.log.Log;
import com.alibaba.fastjson.JSON;
import com.mesa.reportservice.bean.HttpResult;
import com.mesa.reportservice.bean.JobEntity;
-import com.mesa.reportservice.configuration.ClickhouseConfig;
import com.mesa.reportservice.configuration.GlobelConfig;
import com.mesa.reportservice.service.ClickhouseService;
import com.mesa.reportservice.service.ExcuteService;
-import com.mesa.reportservice.service.FormatService;
import com.mesa.reportservice.service.MysqlService;
import io.netty.channel.ConnectTimeoutException;
-import org.apache.commons.codec.digest.DigestUtils;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
@@ -31,16 +28,15 @@ public class ExcuteserviceImpl implements ExcuteService {
private ClickhouseService cs;
@Autowired
private MysqlService ms;
-
@Autowired
- private FormatService fs;
+ private GlobelConfig gc;
@Override
public void excuteCkTask(JobEntity job) {
- logger.info("execute queryid=" + job.getQuery_id() + " sql=" + job.getQuerySql() + "mapresult size=" + GlobelConfig.mapresult.size());
+ logger.info("execute queryid=" + job.getQuery_id() + " sql=" + job.getQuerySql() + "mapresult size=" + gc.getMapresult().size());
HttpResult hr = new HttpResult();
int k = 3;
do {
diff --git a/src/main/java/com/mesa/reportservice/service/impl/FormatServiceImpl.java b/src/main/java/com/mesa/reportservice/service/impl/FormatServiceImpl.java
deleted file mode 100644
index db98ba8..0000000
--- a/src/main/java/com/mesa/reportservice/service/impl/FormatServiceImpl.java
+++ /dev/null
@@ -1,53 +0,0 @@
-package com.mesa.reportservice.service.impl;
-
-import com.alibaba.fastjson.JSON;
-import com.alibaba.fastjson.JSONObject;
-import com.mesa.reportservice.service.FormatService;
-import org.apache.http.client.config.RequestConfig;
-import org.apache.http.client.methods.CloseableHttpResponse;
-import org.apache.http.client.methods.HttpGet;
-import org.apache.http.impl.client.CloseableHttpClient;
-import org.apache.http.params.HttpParams;
-import org.apache.http.util.EntityUtils;
-import org.springframework.beans.factory.annotation.Autowired;
-import org.springframework.beans.factory.annotation.Qualifier;
-import org.springframework.stereotype.Service;
-
-import java.util.Map;
-
-/**
- * Created by wk1 on 2020/4/15.
- */
-@Service
-public class FormatServiceImpl implements FormatService {
-
- @Autowired
- private CloseableHttpClient httpClient;
-
- @Autowired
- @Qualifier("RequestShortConfig")
- private RequestConfig config;
-
- @Override
- public String doFormat(String url) throws Exception {
-
- HttpGet httpGet = new HttpGet(url);
- httpGet.setHeader("Accept", "application/json");
- // 装载配置信息
- httpGet.setConfig(config);
- CloseableHttpResponse response = this.httpClient.execute(httpGet);
- if (response.getStatusLine().getStatusCode() == 200) {
- // 返回响应体的内容
-
- JSONObject jsonObject = JSON.parseObject(EntityUtils.toString(response.getEntity(), "UTF-8"));
- String responseMessage = jsonObject.getString("message");
- return responseMessage.trim();
- }
- else{
- throw new Exception("Formate sql error url = "+url);
- }
-
- }
-
-
-}
diff --git a/src/main/java/com/mesa/reportservice/service/impl/HbaseServiceImpl.java b/src/main/java/com/mesa/reportservice/service/impl/HbaseServiceImpl.java
index 708522f..8f3ae5a 100644
--- a/src/main/java/com/mesa/reportservice/service/impl/HbaseServiceImpl.java
+++ b/src/main/java/com/mesa/reportservice/service/impl/HbaseServiceImpl.java
@@ -1,16 +1,20 @@
package com.mesa.reportservice.service.impl;
+import cn.hutool.core.io.IoUtil;
import cn.hutool.log.Log;
-import com.mesa.reportservice.bean.HttpResult;
import com.mesa.reportservice.bean.JobEntity;
import com.mesa.reportservice.configuration.HbaseProperties;
import com.mesa.reportservice.service.HbaseService;
import org.apache.hadoop.hbase.TableName;
-import org.apache.hadoop.hbase.client.*;
+import org.apache.hadoop.hbase.client.Connection;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.util.Bytes;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
+import java.io.IOException;
+
/**
* Created by wk1 on 2019/5/15.
@@ -27,26 +31,23 @@ public class HbaseServiceImpl implements HbaseService {
Log logger = Log.get();
- private String getRowKey(String query_id){
+ private String getRowKey(String query_id) {
- String rowKey = query_id;
- try {
- String[] queryIdArray = query_id.split(":");
- rowKey=queryIdArray[1];
- }
- catch (Exception e){
- logger.error(e.toString());
- }
+ String rowKey = "";
+ String[] queryIdArray = query_id.split(":");
+ rowKey = queryIdArray[1];
return rowKey;
}
@Override
- public Boolean put(JobEntity jobEntity) throws Exception {
+ public Boolean put(JobEntity jobEntity) {
Boolean status = false;
- try(Table table =hbaseConnection.getTable(TableName.valueOf(hbproperties.getTable()))) {
+ Table table = null;
+ try {
+ table = hbaseConnection.getTable(TableName.valueOf(hbproperties.getTable()));
Put put = new Put(Bytes.toBytes(getRowKey(jobEntity.getQuery_id())));
put.addColumn(Bytes.toBytes("response"), Bytes.toBytes("result"), Bytes.toBytes(jobEntity.getResult()));
put.addColumn(Bytes.toBytes("detail"), Bytes.toBytes("result_id"), Bytes.toBytes(jobEntity.getResultId()));
@@ -54,23 +55,24 @@ public class HbaseServiceImpl implements HbaseService {
put.addColumn(Bytes.toBytes("detail"), Bytes.toBytes("excute_sql"), Bytes.toBytes(jobEntity.getQuerySql()));
//put.addColumn(Bytes.toBytes("detail"), Bytes.toBytes("format_sql"), Bytes.toBytes(jobEntity.getFormatSql()));
- if(jobEntity.getExcuteRow()!=null){
+ if (jobEntity.getExcuteRow() != null) {
put.addColumn(Bytes.toBytes("detail"), Bytes.toBytes("read_rows"), Bytes.toBytes(jobEntity.getExcuteRow()));
}
- if(jobEntity.getQuery_duration_ms()!=null){
- put.addColumn(Bytes.toBytes("detail"), Bytes.toBytes("query_duration_ms"), Bytes.toBytes(jobEntity.getQuery_duration_ms()));
+ if (jobEntity.getQuery_duration_ms() != null) {
+ put.addColumn(Bytes.toBytes("detail"), Bytes.toBytes("query_duration_ms"), Bytes.toBytes(jobEntity.getQuery_duration_ms()));
}
- if(jobEntity.getMemory_usage()!=null){
+ if (jobEntity.getMemory_usage() != null) {
put.addColumn(Bytes.toBytes("detail"), Bytes.toBytes("memory_usage"), Bytes.toBytes(jobEntity.getMemory_usage()));
}
table.put(put);
- status =true;
- }catch(Exception e) {
+ status = true;
+ } catch (IOException e) {
logger.error(e.toString());
+ } finally {
+ IoUtil.close(table);
}
return status;
-}
-
+ }
}
diff --git a/src/main/java/com/mesa/reportservice/service/impl/MonitorServiceImpl.java b/src/main/java/com/mesa/reportservice/service/impl/MonitorServiceImpl.java
index a36433e..008d207 100644
--- a/src/main/java/com/mesa/reportservice/service/impl/MonitorServiceImpl.java
+++ b/src/main/java/com/mesa/reportservice/service/impl/MonitorServiceImpl.java
@@ -43,21 +43,11 @@ public class MonitorServiceImpl implements MonitorService {
}
@Override
public void addSuccess() {
-
- try{
counter_report_success.increment();
- } catch (Exception e) {
- logger.error(e.toString());
- }
}
@Override
public void addFail() {
-
- try{
counter_report_fail.increment();
- } catch (Exception e) {
- logger.error(e.toString());
- }
}
diff --git a/src/main/java/com/mesa/reportservice/service/impl/MysqlServiceImpl.java b/src/main/java/com/mesa/reportservice/service/impl/MysqlServiceImpl.java
index fdc5cc7..bce645f 100644
--- a/src/main/java/com/mesa/reportservice/service/impl/MysqlServiceImpl.java
+++ b/src/main/java/com/mesa/reportservice/service/impl/MysqlServiceImpl.java
@@ -26,14 +26,14 @@ public class MysqlServiceImpl implements MysqlService {
public ReportResultMapper rrm;
@Override
- public List<JobEntity> getJobForExcute() throws Exception{
+ public List<JobEntity> getJobForExcute() {
return rrm.getJobForExcute();
}
@Override
- public List<JobEntity> getJobTask(int rows) throws Exception{
+ public List<JobEntity> getJobTask(int rows) {
String current_time = DateUtil.getDate();
@@ -48,38 +48,17 @@ public class MysqlServiceImpl implements MysqlService {
@Override
public int updateProcesses(JobEntity job){
- try {
Date currentTime = new Date();
SimpleDateFormat formatter = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
String sDate = formatter.format(currentTime);
- java.sql.Timestamp nowTime = java.sql.Timestamp.valueOf(sDate);
job.setOpTime(sDate);
return rrm.updateProcesses(job);
- }
- catch(Exception e){
- logger.error(e.toString());
- return 0;
- }
}
- @Override
- public int updateStatue(JobEntity job) {
- try {
- Date currentTime = new Date();
- SimpleDateFormat formatter = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
- String sDate = formatter.format(currentTime);
- java.sql.Timestamp nowTime = java.sql.Timestamp.valueOf(sDate);
- job.setOpTime(sDate);
- return rrm.updateStatue(job);
- }
- catch(Exception e){
- logger.error(e.toString());
- return 0;
- }
- }
+
@Override
- public Map<String,Long> getJobCount() throws Exception {
+ public Map<String,Long> getJobCount() {
String current_time = DateUtil.getDate();
String today_time = DateUtil.getCurrentDate();
HashMap map = new HashMap();
diff --git a/src/main/java/com/mesa/reportservice/service/impl/ZkServiceImpl.java b/src/main/java/com/mesa/reportservice/service/impl/ZkServiceImpl.java
index 1136087..638c8da 100644
--- a/src/main/java/com/mesa/reportservice/service/impl/ZkServiceImpl.java
+++ b/src/main/java/com/mesa/reportservice/service/impl/ZkServiceImpl.java
@@ -28,12 +28,12 @@ public class ZkServiceImpl implements ZkService {
@Override
public boolean isMaster() {
+
try {
if (zp.getOpen() == 0) {
boolean isZkCuratorStarted = curatorConnect.isStarted();
String nodePath = "/masterip";
- System.out.println("当前客户端的状态:" + (isZkCuratorStarted ? "连接中..." : "已关闭..."));
- ExistsBuilder eb = curatorConnect.checkExists();
+ logger.debug("当前客户端的状态:" + (isZkCuratorStarted ? "连接中..." : "已关闭..."));
Stat statExist = curatorConnect.checkExists().forPath(nodePath);
if (statExist == null) {
byte[] data = GlobelConfig.zkuuid.getBytes(); // 节点数据
@@ -41,15 +41,17 @@ public class ZkServiceImpl implements ZkService {
String result = curatorConnect.create().creatingParentsIfNeeded() // 创建父节点,也就是会递归创建
.withMode(CreateMode.EPHEMERAL) // 节点类型
.forPath(nodePath, data);
+ logger.debug(result);
+
return true;
} else {
- System.out.println(nodePath + " 节点存在");
+ logger.debug(nodePath + " 节点存在");
Stat stat = new Stat();
byte[] nodeData = curatorConnect.getData().storingStatIn(stat).forPath(nodePath);
- String masterid = new String(nodeData).trim();
- System.out.println("uuid="+ GlobelConfig.zkuuid+" 节点 " + nodePath + " 的数据为:" + new String(nodeData));
+ String masterid = new String(nodeData, "UTF-8").trim();
+ logger.debug("uuid=" + GlobelConfig.zkuuid + " 节点 " + nodePath + " 的数据为:" + masterid);
if (masterid.equals(GlobelConfig.zkuuid)) {
@@ -63,10 +65,13 @@ public class ZkServiceImpl implements ZkService {
} else {
return true;
}
+
} catch (Exception e) {
logger.error(e.toString());
- return true;
+ return false;
}
}
}
+
+
diff --git a/src/main/java/com/mesa/reportservice/util/DateUtil.java b/src/main/java/com/mesa/reportservice/util/DateUtil.java
index 05ee225..6307470 100644
--- a/src/main/java/com/mesa/reportservice/util/DateUtil.java
+++ b/src/main/java/com/mesa/reportservice/util/DateUtil.java
@@ -452,38 +452,8 @@ public final class DateUtil {
}
- /**
- * <p>将日期,转换成10进制日期</p>
- * <p>
- * pattern需跟date格式相同,才可进行转换.转换后的长整型.
- * </p>
- * <pre>
- * DateUtil.convertStringToTimestamp(TimeConstants.YYYY_MM_DD, "2010-10-11") = 1286726400
- * </pre>
- *
- * @param date 时间值
- * @param pattern 描述日期与格式
- * @return 返回10进制时间值字符串
- * @throws NullPointerException
- * @see #convertTimestampToString(long, String)
- */
- public static long convertStringToTimestamp(String date,
- String pattern) {
- if (StringUtil.isEmpty(date) || StringUtil.isEmpty(pattern)) {
- throw new NullPointerException("参数为NULL");
- }
- long time = 0l;
- simpleDateFormat = new SimpleDateFormat(pattern);
- try {
- time = simpleDateFormat.parse(date).getTime() / 1000;
- } catch (ParseException e) {
- e.printStackTrace();
- }
- return time;
-
- }
/**
@@ -538,37 +508,7 @@ public final class DateUtil {
}
- /**
- * <p>通过字符串日期类型,转换为Date类型日期.</p>
- * <p>
- * pattern格式,需跟提供的日期字符串格式相统一,否则解析异常返回NULL值。
- * </P>
- * <pre>
- * DateUtil.convertStringToDate(TimeConstants.YYYY_MM_DD, "2010-09-11") = date类型日期
- * DateUtil.convertStringToDate(TimeConstants.YYYY_MM_DD_HH_MM, "2010-09-11") = NULL.
- * </pre>
- *
- * @param date 字符串类型日期.
- * @param pattern 描述日期与时间格式.
- * @return 返回解析后的{@link java.util.Date }类型日期.
- * @throws NullPointerException
- */
- public static Date convertStringToDate(String date,
- String pattern) {
- if (StringUtil.isEmpty(date) || StringUtil.isEmpty(pattern)) {
- throw new NullPointerException();
- }
- simpleDateFormat = new SimpleDateFormat(pattern);
- Date parseDate = null;
- try {
- parseDate = simpleDateFormat.parse(date);
- } catch (ParseException e) {
- e.printStackTrace();
- }
-
- return parseDate;
- }
}
diff --git a/src/main/java/com/mesa/reportservice/util/StringUtil.java b/src/main/java/com/mesa/reportservice/util/StringUtil.java
index 8f8a1f2..63bc219 100644
--- a/src/main/java/com/mesa/reportservice/util/StringUtil.java
+++ b/src/main/java/com/mesa/reportservice/util/StringUtil.java
@@ -319,43 +319,8 @@ public final class StringUtil extends StringUtils {
return isEmpty(object) ? defaultValue : object;
}
- /**
- * <p>对字符串进行MD5加密.</p>
- * <p>
- * 一般作为密码的处理方式,首先通过MD5进行加密,然后将字符串进行Base64编码获得所需字符.
- * </p>
- * <pre>
- * String str = "ceshi";
- * StringUtil.md5(str) = "zBfDDNERxyFfyPUfh5Dg4Q=="
- * </pre>
- *
- * @param msg 要加密的字符串
- * @return 返回加密后的24位字符, 如果解析出现异常将返回<code>null</code>.
- */
-
-
- public static String getBase64(String msg) {
- try {
- BASE64Encoder encoder = new BASE64Encoder();
- return encoder.encode(msg.getBytes());
- } catch (Exception e) {
- e.printStackTrace();
- return EMPTY;
- }
- }
- /**
- * <p>截取处理字符串,当字符串超过指定的截取长度时,用“......”补充</p>
- * <pre>
- * String str = "中华人民共和国";
- * StringUtil.getMoreString(str, 6) = "中华人民共和......"
- * </pre>
- *
- * @param text 字符串数据
- * @param length 截取的长度值
- * @return 返回处理后字符
- */
public static String getMoreString(String text, int length) {
StringBuilder textBuilder = new StringBuilder();