summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--.gitlab-ci.yml9
-rw-r--r--config/application-test.yml19
-rw-r--r--config/nacos/config/fixed-127.0.0.1_8848-dev_nacos/data/config-data-tenant/dev/Galaxy/saved-query-scheduler.yml14
-rw-r--r--config/nacos/config/fixed-127.0.0.1_8848-test_nacos/data/config-data-tenant/test/Galaxy/saved-query-scheduler.yml133
-rw-r--r--docs/release/release-374.md4
-rw-r--r--pom.xml26
-rw-r--r--src/main/java/com/mesa/reportservice/ReportServiceApplication.java (renamed from src/main/java/com/mesa/reportservice/ReportserviceApplication.java)8
-rw-r--r--src/main/java/com/mesa/reportservice/bean/HttpResult.java30
-rw-r--r--src/main/java/com/mesa/reportservice/bean/JobEntity.java11
-rw-r--r--src/main/java/com/mesa/reportservice/bean/MonitorEntity.java65
-rw-r--r--src/main/java/com/mesa/reportservice/configuration/ClickhouseConfig.java29
-rw-r--r--src/main/java/com/mesa/reportservice/configuration/ExecutorConfig.java38
-rw-r--r--src/main/java/com/mesa/reportservice/configuration/GlobelConfig.java36
-rw-r--r--src/main/java/com/mesa/reportservice/configuration/HBaseFactory.java (renamed from src/main/java/com/mesa/reportservice/configuration/HbaseFactory.java)31
-rw-r--r--src/main/java/com/mesa/reportservice/configuration/HBaseProperties.java36
-rw-r--r--src/main/java/com/mesa/reportservice/configuration/HbaseConfig.java72
-rw-r--r--src/main/java/com/mesa/reportservice/configuration/HbaseProperties.java111
-rw-r--r--src/main/java/com/mesa/reportservice/configuration/HttpClientPool.java47
-rw-r--r--src/main/java/com/mesa/reportservice/configuration/QueryGatewayProperties.java25
-rw-r--r--src/main/java/com/mesa/reportservice/configuration/SchedulerProperties.java40
-rw-r--r--src/main/java/com/mesa/reportservice/configuration/ThreadPoolConfig.java35
-rw-r--r--src/main/java/com/mesa/reportservice/configuration/ZkConfig.java48
-rw-r--r--src/main/java/com/mesa/reportservice/configuration/ZkProperties.java86
-rw-r--r--src/main/java/com/mesa/reportservice/configuration/ZookeeperConfig.java46
-rw-r--r--src/main/java/com/mesa/reportservice/configuration/ZookeeperProperties.java32
-rw-r--r--src/main/java/com/mesa/reportservice/controller/MonitorController.java55
-rw-r--r--src/main/java/com/mesa/reportservice/controller/ScheduledResultController.java155
-rw-r--r--src/main/java/com/mesa/reportservice/enums/JobStates.java31
-rw-r--r--src/main/java/com/mesa/reportservice/exception/SQSCode.java47
-rw-r--r--src/main/java/com/mesa/reportservice/exception/SQSException.java69
-rw-r--r--src/main/java/com/mesa/reportservice/mapper/JobMapper.java (renamed from src/main/java/com/mesa/reportservice/mapper/ReportResultMapper.java)10
-rw-r--r--src/main/java/com/mesa/reportservice/service/ClickhouseService.java25
-rw-r--r--src/main/java/com/mesa/reportservice/service/ExcuteService.java12
-rw-r--r--src/main/java/com/mesa/reportservice/service/ExecuteProcessService.java (renamed from src/main/java/com/mesa/reportservice/service/ExcuteProcessService.java)13
-rw-r--r--src/main/java/com/mesa/reportservice/service/ExecuteService.java13
-rw-r--r--src/main/java/com/mesa/reportservice/service/HBaseService.java (renamed from src/main/java/com/mesa/reportservice/service/HbaseService.java)10
-rw-r--r--src/main/java/com/mesa/reportservice/service/JobService.java25
-rw-r--r--src/main/java/com/mesa/reportservice/service/MonitorService.java11
-rw-r--r--src/main/java/com/mesa/reportservice/service/MysqlService.java24
-rw-r--r--src/main/java/com/mesa/reportservice/service/QueryGatewayService.java21
-rw-r--r--src/main/java/com/mesa/reportservice/service/ScheduledResultService.java11
-rw-r--r--src/main/java/com/mesa/reportservice/service/ZookeeperService.java (renamed from src/main/java/com/mesa/reportservice/service/ZkService.java)9
-rw-r--r--src/main/java/com/mesa/reportservice/service/impl/ExcuteProcessServiceImpl.java182
-rw-r--r--src/main/java/com/mesa/reportservice/service/impl/ExcuteserviceImpl.java114
-rw-r--r--src/main/java/com/mesa/reportservice/service/impl/ExecuteProcessServiceImpl.java176
-rw-r--r--src/main/java/com/mesa/reportservice/service/impl/ExecuteServiceImpl.java109
-rw-r--r--src/main/java/com/mesa/reportservice/service/impl/HBaseServiceImpl.java (renamed from src/main/java/com/mesa/reportservice/service/impl/HbaseServiceImpl.java)35
-rw-r--r--src/main/java/com/mesa/reportservice/service/impl/JobServiceImpl.java51
-rw-r--r--src/main/java/com/mesa/reportservice/service/impl/MonitorServiceImpl.java74
-rw-r--r--src/main/java/com/mesa/reportservice/service/impl/MysqlServiceImpl.java69
-rw-r--r--src/main/java/com/mesa/reportservice/service/impl/QueryGatewayServiceImpl.java (renamed from src/main/java/com/mesa/reportservice/service/impl/ClickhouseServiceImpl.java)146
-rw-r--r--src/main/java/com/mesa/reportservice/service/impl/ScheduledResultServiceImpl.java180
-rw-r--r--src/main/java/com/mesa/reportservice/service/impl/ZookeeperServiceImpl.java (renamed from src/main/java/com/mesa/reportservice/service/impl/ZkServiceImpl.java)50
-rw-r--r--src/main/java/com/mesa/reportservice/util/Constant.java50
-rw-r--r--src/main/resources/mappers/JobMapper.xml (renamed from src/main/resources/mappers/ReportResultMapper.xml)6
-rw-r--r--src/test/java/com/mesa/ReportServiceApplicationTests.java81
-rw-r--r--src/test/java/com/mesa/reportservice/ExecuteProcessTest.java86
-rw-r--r--src/test/java/com/mesa/reportservice/ExecuteTest.java29
-rw-r--r--src/test/java/com/mesa/reportservice/HBaseTest.java28
-rw-r--r--src/test/java/com/mesa/reportservice/JobTest.java62
-rw-r--r--src/test/java/com/mesa/reportservice/MonitorTest.java29
-rw-r--r--src/test/java/com/mesa/reportservice/QueryGatewayTest.java96
-rw-r--r--src/test/java/com/mesa/reportservice/ReportserviceApplicationTests.java37
-rw-r--r--src/test/java/com/mesa/reportservice/zookeeperTest.java25
-rw-r--r--src/test/resources/db/SavedQueryJob.sql7
-rw-r--r--src/test/resources/parameters/executeProcessTest.json170
-rw-r--r--src/test/resources/parameters/executeTest.json26
-rw-r--r--src/test/resources/parameters/hBaseTest.json26
-rw-r--r--src/test/resources/parameters/mysqlTest.json50
-rw-r--r--src/test/resources/parameters/queryGatewayTest.json6
70 files changed, 2147 insertions, 1425 deletions
diff --git a/.gitlab-ci.yml b/.gitlab-ci.yml
index 79c3c9b..96f075d 100644
--- a/.gitlab-ci.yml
+++ b/.gitlab-ci.yml
@@ -1,3 +1,8 @@
+variables:
+ MAVEN_CLI_OPTS: --batch-mode --errors --show-version
+ SONAR_HOST_URL: http://192.168.40.153:9900
+ SONAR_PROJECT_KEY: saved-query-scheduler
+ SONAR_LOGIN_TOKEN: 921363353ba26a612fc4ec77679647780f9a46e7
stages:
- build
@@ -31,6 +36,10 @@ deploy:
script:
- echo 'exec mvn package & docker build'
- echo 'tag:' $CI_COMMIT_TAG
+ - |-
+ mvn $MAVEN_CLI_OPTS clean verify sonar:sonar -Dsonar.projectKey=$SONAR_PROJECT_KEY \
+ -Dsonar.host.url=$SONAR_HOST_URL -Dsonar.login=$SONAR_LOGIN_TOKEN \
+ -Dsonar.qualitygate.wait=true -Dsonar.qualitygate.timeout=300
- mvn clean package -Dmaven.test.skip=true docker:build -DdockerImageTags=$CI_COMMIT_TAG
when: on_success
only:
diff --git a/config/application-test.yml b/config/application-test.yml
new file mode 100644
index 0000000..4631924
--- /dev/null
+++ b/config/application-test.yml
@@ -0,0 +1,19 @@
+nacos:
+ config:
+ type: yaml #配置集的配置格式
+ server-addr: 127.0.0.1:8848 #配置中心地址
+ namespace: test #命名空间
+ data-id: saved-query-scheduler.yml # 数据集ID
+ auto-refresh: true #开启自动刷新
+ group: Galaxy #配置对应的分组
+ username: nacos #Nacos认证用户
+ password: nacos #Nacos认证密码
+ bootstrap:
+ enable: true #开启配置预加载功能
+ log:
+ enable: true #开启Nacos支持日志级别的加载时机
+spring:
+ profiles:
+ active: test
+logging:
+ config: ./config/log4j2-dev.xml \ No newline at end of file
diff --git a/config/nacos/config/fixed-127.0.0.1_8848-dev_nacos/data/config-data-tenant/dev/Galaxy/saved-query-scheduler.yml b/config/nacos/config/fixed-127.0.0.1_8848-dev_nacos/data/config-data-tenant/dev/Galaxy/saved-query-scheduler.yml
index 7a7c9fb..5c910d8 100644
--- a/config/nacos/config/fixed-127.0.0.1_8848-dev_nacos/data/config-data-tenant/dev/Galaxy/saved-query-scheduler.yml
+++ b/config/nacos/config/fixed-127.0.0.1_8848-dev_nacos/data/config-data-tenant/dev/Galaxy/saved-query-scheduler.yml
@@ -6,13 +6,13 @@ scan:
result:
scheduled:
plan: 0/15 * * * * ?
-#同时间执行是线程数
-globle:
+#同时间执行线程数
+scheduler:
job_thread: 2
#Hbasehttp的端口
#Hbase的表名等配置通畅不需要更改
hbase:
- table: tsg:report_result
+ table: tsg_galaxy:saved_query_result
zookeeper_quorum: 192.168.44.12
zookeeper_property_clientPort: 2181
zookeeper_znode_parent: /hbase
@@ -22,12 +22,12 @@ hbase:
#存入Hbase的cell级别生存时间 根据部署环境填写:1.TSG 不设置(永久有效); 2.CN 7(默认7天) 单位:Day
cell_ttl_d:
#查询网关ip
-ck:
- gateway_ip: localhost:9999
+query-gateway:
+ gateway_ip: 192.168.44.12:9999
#zk集群的ip
zookeeper:
- connectString: 192.168.44.55
+ connectString: 192.168.44.12
#是否启用zookeeper 0启用(集群) 1禁用(单机)
open: 1
retryCount: 5
@@ -109,7 +109,7 @@ spring:
login-password: admin
#Spring监控,对内部各接口调用的监控,需要导入aop相关包
aop-patterns: com.mesa.reportservice.controller.*,com.mesa.reportservice.service.*,com.mesa.reportservice.mapper.*
-mybatis:
+mybatis-plus:
typeAliasesPackage: com.mesa.reportservice.bean
mapperLocations: classpath*:/mappers/*.xml
management:
diff --git a/config/nacos/config/fixed-127.0.0.1_8848-test_nacos/data/config-data-tenant/test/Galaxy/saved-query-scheduler.yml b/config/nacos/config/fixed-127.0.0.1_8848-test_nacos/data/config-data-tenant/test/Galaxy/saved-query-scheduler.yml
new file mode 100644
index 0000000..97f8407
--- /dev/null
+++ b/config/nacos/config/fixed-127.0.0.1_8848-test_nacos/data/config-data-tenant/test/Galaxy/saved-query-scheduler.yml
@@ -0,0 +1,133 @@
+#http的端口
+server:
+ port: 9093
+#更新进度条的时间10s
+scan:
+ result:
+ scheduled:
+ plan: 0/15 * * * * ?
+#同时间执行线程数
+scheduler:
+ job_thread: 2
+#Hbasehttp的端口
+#Hbase的表名等配置通畅不需要更改
+hbase:
+ table: tsg_galaxy:saved_query_result
+ zookeeper_quorum: 192.168.44.12
+ zookeeper_property_clientPort: 2181
+ zookeeper_znode_parent: /hbase
+ client_retries_number: 3
+ rpc_timeout: 100000
+ connect_pool: 10
+#存入Hbase的cell级别生存时间 根据部署环境填写:1.TSG 不设置(永久有效); 2.CN 7(默认7天) 单位:Day
+ cell_ttl_d:
+#查询网关ip
+query-gateway:
+ gateway_ip: 192.168.44.12:9999
+
+#zk集群的ip
+zookeeper:
+ connectString: 192.168.44.12
+#是否启用zookeeper 0启用(集群) 1禁用(单机)
+ open: 1
+ retryCount: 5
+ elapsedTimeMs: 10000
+ sessionTimeoutMs: 50000
+ connectionTimeoutMs: 50000
+ nameSpace: reportservice
+
+#最大连接数
+http:
+ maxTotal: 300
+#并发数
+ defaultMaxPerRoute: 100
+#创建连接的最长时间
+ connectTimeout: 10000
+#从连接池中获取到连接的最长时间
+ connectionRequestTimeout: 10000
+#数据传输的最长时间
+ socketTimeout: 21605000
+#提交请求前测试连接是否可用
+ staleConnectionCheckEnabled: true
+ socketTimeoutShort: 30000
+
+#mariadb的url
+spring:
+ application:
+ name: saved-query-scheduler
+
+ datasource:
+ url: jdbc:mariadb://192.168.44.12:3306/olap_test?serverTimezone=GMT&useUnicode=true&characterEncoding=UTF-8&autoReconnect=true&failOverReadOnly=false
+#mariadb的用户名
+ username: root
+#mariadb的密码
+ password: galaxy2019
+
+#以下配置不需要更改通常
+ name: druidDataSource
+ type: com.alibaba.druid.pool.DruidDataSource
+ driver-class-name: org.mariadb.jdbc.Driver
+
+#配置监控统计拦截的filters,去掉后监控界面SQL无法进行统计,’wall’用于防火墙
+ druid:
+ filters:
+ stat:
+ enabled: true
+ slf4j:
+ enabled: true
+ wall:
+ enabled: true
+ config:
+ comment-allow: true
+ #最大连接数
+ max-active: 30
+ #最小连接数
+ min-idle: 1
+ #初始化连接数
+ initial-size: 2
+ #获取连接最大超时时间
+ max-wait: 600000
+ #间隔多久才进行一次检测,检测需要关闭的空闲连接,单位是毫秒
+ time-between-eviction-runs-millis: 60000
+ # 一个连接在池中最小生存的时间,单位是毫秒
+ min-evictable-idle-time-millis: 300000
+ #验证连接是否可用,在数据库中执行一条sql
+ validation-query: select 1
+ #建议配置为true,不影响性能,并且保证安全性。申请连接的时候检测,如果空闲时间大于timeBetweenEvictionRunsMillis,
+ # 执行validationQuery检测连接是否有效
+ test-while-idle: true
+ #申请连接时执行validationQuery检测连接是否有效,做了这个配置会降低性能
+ test-on-borrow: true
+ test-on-return: false
+ connection-properties: druid.stat.mergeSql=true;druid.stat.slowSqlMillis=500
+ #是否开启WebStatFilter
+ web-stat-filter:
+ enabled: true
+ #设置不统计哪些URL(用于排除一些不必要的url)
+ exclusions: "*.js,*.gif,*.jpg,*..ng,*.css,*.ico,/druid/*"
+ #是否开启Druid监控信息显示页面
+ stat-view-servlet:
+ enabled: true
+ #甚至浏览器访问路径
+ url-pattern: /druid/*
+ #禁止手动重置监控数据
+ reset-enable: false
+ #durid-ui页面账户密码
+ login-username: admin
+ login-password: admin
+ #Spring监控,对内部各接口调用的监控,需要导入aop相关包
+ aop-patterns: com.mesa.reportservice.controller.*,com.mesa.reportservice.service.*,com.mesa.reportservice.mapper.*
+mybatis-plus:
+ typeAliasesPackage: com.mesa.reportservice.bean
+ mapperLocations: classpath*:/mappers/*.xml
+management:
+ endpoints:
+ web:
+ exposure:
+ include: "*"
+ metrics:
+ tags:
+ application: saved-query-scheduler
+
+logging:
+ config: ./config/log4j2-dev.xml
diff --git a/docs/release/release-374.md b/docs/release/release-374.md
index 00e3421..32e7a3d 100644
--- a/docs/release/release-374.md
+++ b/docs/release/release-374.md
@@ -1,4 +1,6 @@
Release 374 (TSG-23.12)
* 报告服务适配新版表结构(GAL-447)
* Hbase存储数据表更换为tsg_galaxy:saved_query_result
-* 优化报告任务运行时信息 \ No newline at end of file
+* 优化报告任务运行时信息
+* 修复monitor接口执行失败时status异常问题
+* 修复Report报告执行失败(TSG-18703) \ No newline at end of file
diff --git a/pom.xml b/pom.xml
index 234db70..3ee88c0 100644
--- a/pom.xml
+++ b/pom.xml
@@ -23,6 +23,7 @@
<!--docker-registry的端口-->
<docker.registry.port>9080</docker.registry.port>
<docker.image.prefix>tsg/galaxy</docker.image.prefix>
+ <project.password>Geedge2020!</project.password>
</properties>
<!-- 设置 jitpack.io 仓库 -->
<repositories>
@@ -43,7 +44,6 @@
<dependencies>
-
<dependency>
<groupId>com.github.ulisesbocchio</groupId>
<artifactId>jasypt-spring-boot-starter</artifactId>
@@ -66,9 +66,15 @@
<version>1.4</version>
</dependency>-->
<dependency>
- <groupId>org.mybatis.spring.boot</groupId>
- <artifactId>mybatis-spring-boot-starter</artifactId>
- <version>1.3.2</version>
+ <groupId>com.baomidou</groupId>
+ <artifactId>mybatis-plus-boot-starter</artifactId>
+ <version>3.0.7.1</version>
+ <exclusions>
+ <exclusion>
+ <groupId>com.baomidou</groupId>
+ <artifactId>mybatis-plus-generator</artifactId>
+ </exclusion>
+ </exclusions>
</dependency>
<!-- <dependency>
<groupId>org.apache.hbase</groupId>
@@ -258,6 +264,16 @@
<build>
<plugins>
<plugin>
+ <groupId>org.sonarsource.scanner.maven</groupId>
+ <artifactId>sonar-maven-plugin</artifactId>
+ <version>3.9.0.2155</version>
+ </plugin>
+ <plugin>
+ <groupId>io.github.r0bb3n</groupId>
+ <artifactId>sonar-quality-gate-maven-plugin</artifactId>
+ <version>1.3.0</version>
+ </plugin>
+ <plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
<version>2.3.12.RELEASE</version>
@@ -288,7 +304,7 @@
<phase>package</phase>
<configuration>
<!-- 配置加密密码 ,推荐使用命令指定 -->
- <password>Geedge2020!</password>
+ <password>${project.password}</password>
<!-- 配置不加密项 -->
<excludes>
<exclude>
diff --git a/src/main/java/com/mesa/reportservice/ReportserviceApplication.java b/src/main/java/com/mesa/reportservice/ReportServiceApplication.java
index 46595d8..d6e0e28 100644
--- a/src/main/java/com/mesa/reportservice/ReportserviceApplication.java
+++ b/src/main/java/com/mesa/reportservice/ReportServiceApplication.java
@@ -15,11 +15,13 @@ import java.lang.management.RuntimeMXBean;
import java.util.List;
import java.util.TimeZone;
+/**
+ * @author wk
+ */
@ServletComponentScan
-
@SpringBootApplication
@EnableScheduling
-public class ReportserviceApplication {
+public class ReportServiceApplication {
public static final String TIMEZONE = "UTC";
public static void main(String[] args) {
@@ -34,7 +36,7 @@ public class ReportserviceApplication {
// joda.time.DateTime
System.setProperty("user.timezone", TIMEZONE);
}
- SpringApplication.run(ReportserviceApplication.class, args);
+ SpringApplication.run(ReportServiceApplication.class, args);
}
@Bean
diff --git a/src/main/java/com/mesa/reportservice/bean/HttpResult.java b/src/main/java/com/mesa/reportservice/bean/HttpResult.java
index 632edaa..4aa3df7 100644
--- a/src/main/java/com/mesa/reportservice/bean/HttpResult.java
+++ b/src/main/java/com/mesa/reportservice/bean/HttpResult.java
@@ -1,35 +1,19 @@
package com.mesa.reportservice.bean;
+import lombok.Data;
+
/**
- * Created by wk1 on 2019/5/15.
+ *
+ * @author lijinyang
+ * @date 2024/1/24
*/
+@Data
public class HttpResult {
private int code;
- private String body;
- // private byte[] bodybyte;
-
+ private String body;
public HttpResult() {
-
- }
-
-
-
- public int getCode() {
- return code;
- }
-
- public void setCode(int code) {
- this.code = code;
- }
-
- public String getBody() {
- return body;
- }
-
- public void setBody(String body) {
- this.body = body;
}
}
diff --git a/src/main/java/com/mesa/reportservice/bean/JobEntity.java b/src/main/java/com/mesa/reportservice/bean/JobEntity.java
index b3baa8b..2a0394c 100644
--- a/src/main/java/com/mesa/reportservice/bean/JobEntity.java
+++ b/src/main/java/com/mesa/reportservice/bean/JobEntity.java
@@ -1,11 +1,17 @@
package com.mesa.reportservice.bean;
+import cn.hutool.log.LogFactory;
+import com.baomidou.mybatisplus.annotation.TableName;
import lombok.Data;
+import org.apache.log4j.spi.LoggerFactory;
/**
- * Created by wk1 on 2019/5/20.
+ *
+ * @author lijinyang
+ * @date 2024/1/24
*/
@Data
+@TableName("saved_query_job")
public class JobEntity implements Cloneable {
private String jobId;
@@ -44,7 +50,7 @@ public class JobEntity implements Cloneable {
private String queryId;
- private Integer excuteStatus;
+ private Integer executeStatus;
private String result;
@@ -58,6 +64,7 @@ public class JobEntity implements Cloneable {
try {
o = (JobEntity) super.clone();
} catch (CloneNotSupportedException e) {
+ LogFactory.get().error(e);
}
return o;
}
diff --git a/src/main/java/com/mesa/reportservice/bean/MonitorEntity.java b/src/main/java/com/mesa/reportservice/bean/MonitorEntity.java
index 2824ecf..cfa761e 100644
--- a/src/main/java/com/mesa/reportservice/bean/MonitorEntity.java
+++ b/src/main/java/com/mesa/reportservice/bean/MonitorEntity.java
@@ -1,72 +1,29 @@
package com.mesa.reportservice.bean;
+import lombok.Data;
+
import java.util.Map;
/**
- * Created by wk1 on 2019/5/15.
+ *
+ * @author lijinyang
+ * @date 2024/1/24
*/
+@Data
public class MonitorEntity {
private Long queueJobNum;
- private Long excuteingJobNum;
+
+ private Long executingJobNum;
+
private Long todaySuccessJobNum;
+
private Long todayErrorJobNum;
- private Map<String,JobEntity> joblist ;
+ private Map<String,JobEntity> jobList ;
private String status;
-
public MonitorEntity() {
-
- }
-
- public Long getQueueJobNum() {
- return queueJobNum;
- }
-
- public void setQueueJobNum(Long queueJobNum) {
- this.queueJobNum = queueJobNum;
- }
-
- public Long getExcuteingJobNum() {
- return excuteingJobNum;
- }
-
- public void setExcuteingJobNum(Long excuteingJobNum) {
- this.excuteingJobNum = excuteingJobNum;
- }
-
- public Long getTodaySuccessJobNum() {
- return todaySuccessJobNum;
- }
-
- public void setTodaySuccessJobNum(Long todaySuccessJobNum) {
- this.todaySuccessJobNum = todaySuccessJobNum;
- }
-
- public Long getTodayErrorJobNum() {
- return todayErrorJobNum;
- }
-
- public void setTodayErrorJobNum(Long todayErrorJobNum) {
- this.todayErrorJobNum = todayErrorJobNum;
- }
-
- public Map<String, JobEntity> getJoblist() {
- return joblist;
- }
-
- public void setJoblist(Map<String, JobEntity> joblist) {
- this.joblist = joblist;
- }
-
-
- public String getStatus() {
- return status;
- }
-
- public void setStatus(String status) {
- this.status = status;
}
}
diff --git a/src/main/java/com/mesa/reportservice/configuration/ClickhouseConfig.java b/src/main/java/com/mesa/reportservice/configuration/ClickhouseConfig.java
deleted file mode 100644
index 3526c4e..0000000
--- a/src/main/java/com/mesa/reportservice/configuration/ClickhouseConfig.java
+++ /dev/null
@@ -1,29 +0,0 @@
-package com.mesa.reportservice.configuration;
-
-import com.alibaba.nacos.api.config.ConfigType;
-import com.alibaba.nacos.api.config.annotation.NacosConfigurationProperties;
-import org.springframework.boot.context.properties.ConfigurationProperties;
-import org.springframework.stereotype.Component;
-
-import java.io.UnsupportedEncodingException;
-import java.net.URLEncoder;
-
-/**
- * Created by wk1 on 2019/5/17.
- */
-@Component
-//@ConfigurationProperties(prefix = "ck")
-@NacosConfigurationProperties(prefix = "ck", dataId = "${nacos.config.data-id}", groupId = "${nacos.config.group}", type = ConfigType.YAML, autoRefreshed = true)
-
-public class ClickhouseConfig {
-
- private static String gateway_ip;
-
- public void setGateway_ip(String gateway_ip) {
- ClickhouseConfig.gateway_ip = gateway_ip;
- }
-
- public String getGateway_ip() {
- return gateway_ip;
- }
-}
diff --git a/src/main/java/com/mesa/reportservice/configuration/ExecutorConfig.java b/src/main/java/com/mesa/reportservice/configuration/ExecutorConfig.java
deleted file mode 100644
index d17e1a0..0000000
--- a/src/main/java/com/mesa/reportservice/configuration/ExecutorConfig.java
+++ /dev/null
@@ -1,38 +0,0 @@
-package com.mesa.reportservice.configuration;
-
-import org.springframework.context.annotation.Bean;
-import org.springframework.context.annotation.Configuration;
-import org.springframework.scheduling.annotation.EnableAsync;
-import org.springframework.scheduling.annotation.SchedulingConfigurer;
-import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
-import org.springframework.scheduling.config.ScheduledTaskRegistrar;
-import org.springframework.stereotype.Component;
-
-import java.util.concurrent.Executor;
-import java.util.concurrent.Executors;
-import java.util.concurrent.ThreadPoolExecutor;
-
-/**
- * Created by wk on 2020/8/28.
- */
-@Configuration
-@EnableAsync
-public class ExecutorConfig {
-
- @Bean
- public Executor taskExecutor() {
- ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
- executor.setCorePoolSize(2);
- executor.setMaxPoolSize(20);
- executor.initialize();
- return executor;
- }
- @Bean
- public Executor resultExecutor() {
- ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
- executor.setCorePoolSize(2);
- executor.setMaxPoolSize(20);
- executor.initialize();
- return executor;
- }
-}
diff --git a/src/main/java/com/mesa/reportservice/configuration/GlobelConfig.java b/src/main/java/com/mesa/reportservice/configuration/GlobelConfig.java
deleted file mode 100644
index 7334b45..0000000
--- a/src/main/java/com/mesa/reportservice/configuration/GlobelConfig.java
+++ /dev/null
@@ -1,36 +0,0 @@
-package com.mesa.reportservice.configuration;
-
-import com.alibaba.nacos.api.config.ConfigType;
-import com.alibaba.nacos.api.config.annotation.NacosConfigurationProperties;
-import com.mesa.reportservice.bean.JobEntity;
-import org.springframework.boot.context.properties.ConfigurationProperties;
-import org.springframework.stereotype.Component;
-
-import java.util.Map;
-import java.util.UUID;
-import java.util.concurrent.ConcurrentHashMap;
-
-/**
- * Created by wk1 on 2019/5/28.
- */
-
-@Component
-@NacosConfigurationProperties(prefix = "globle", dataId = "${nacos.config.data-id}", groupId = "${nacos.config.group}", type = ConfigType.YAML, autoRefreshed = true)
-
-public class GlobelConfig {
-
- private static int job_thread;
- private static Map<String, JobEntity> mapresult = new ConcurrentHashMap<>();
- public void setJob_thread(int job_thread) {
- this.job_thread = job_thread;
- }
- public final static String zkuuid = UUID.randomUUID().toString().replaceAll("-","");
-
- public int getJob_thread() {
- return job_thread;
- }
-
- public Map<String, JobEntity> getMapresult() {
- return mapresult;
- }
-}
diff --git a/src/main/java/com/mesa/reportservice/configuration/HbaseFactory.java b/src/main/java/com/mesa/reportservice/configuration/HBaseFactory.java
index 96f6457..6134101 100644
--- a/src/main/java/com/mesa/reportservice/configuration/HbaseFactory.java
+++ b/src/main/java/com/mesa/reportservice/configuration/HBaseFactory.java
@@ -5,44 +5,38 @@ import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.*;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
-import org.springframework.boot.context.properties.EnableConfigurationProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import java.io.IOException;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
/**
- * Created by Administrator on 2020/3/10.
+ *
+ * @author lijinyang
+ * @date 2024/1/24
*/
@Configuration
-public class HbaseFactory {
+public class HBaseFactory {
@Autowired
- private HbaseProperties hbaseProperties;
+ private HBaseProperties hBaseProperties;
Log logger = Log.get();
-
@Bean(name = "hbaseConfiguration")
public org.apache.hadoop.conf.Configuration getConfiguration() {
-
org.apache.hadoop.conf.Configuration conf = null;
if (conf == null) {
conf = HBaseConfiguration.create();
- conf.set("hbase.zookeeper.quorum", hbaseProperties.getZookeeper_quorum());
- conf.set("hbase.zookeeper.property.clientPort", hbaseProperties.getZookeeper_property_clientPort());
- conf.set("zookeeper.znode.parent", hbaseProperties.getZookeeper_znode_parent());
- conf.set("hbase.client.retries.number", hbaseProperties.getClient_retries_number());
- conf.set("hbase.rpc.timeout", hbaseProperties.getRpc_timeout());
+ conf.set("hbase.zookeeper.quorum", hBaseProperties.getZookeeperQuorum());
+ conf.set("hbase.zookeeper.property.clientPort", hBaseProperties.getZookeeperPropertyClientPort());
+ conf.set("zookeeper.znode.parent", hBaseProperties.getZookeeperZnodeParent());
+ conf.set("hbase.client.retries.number", hBaseProperties.getClientRetriesNumber());
+ conf.set("hbase.rpc.timeout", hBaseProperties.getRpcTimeout());
conf.set("hbase.client.keyvalue.maxsize", "1024000000");
conf.set("zookeeper.recovery.retry", "3");
- conf.set("hbase.client.ipc.pool.size", hbaseProperties.getConnect_pool().toString());
-
-
+ conf.set("hbase.client.ipc.pool.size", hBaseProperties.getConnectPool().toString());
}
-
return conf;
}
@@ -51,12 +45,9 @@ public class HbaseFactory {
Connection con = null;
try {
con = ConnectionFactory.createConnection(Conf);
-
} catch (IOException e) {
logger.error(e.toString());
}
return con;
-
-
}
}
diff --git a/src/main/java/com/mesa/reportservice/configuration/HBaseProperties.java b/src/main/java/com/mesa/reportservice/configuration/HBaseProperties.java
new file mode 100644
index 0000000..47211f2
--- /dev/null
+++ b/src/main/java/com/mesa/reportservice/configuration/HBaseProperties.java
@@ -0,0 +1,36 @@
+package com.mesa.reportservice.configuration;
+
+import com.alibaba.nacos.api.config.ConfigType;
+import com.alibaba.nacos.api.config.annotation.NacosConfigurationProperties;
+import lombok.Data;
+import org.springframework.boot.context.properties.ConfigurationProperties;
+import org.springframework.stereotype.Component;
+
+/**
+ *
+ * @author lijinyang
+ * @date 2024/1/24
+ */
+@Data
+@Component
+@NacosConfigurationProperties(prefix = "hbase", dataId = "${nacos.config.data-id}", groupId = "${nacos.config.group}", type = ConfigType.YAML, autoRefreshed = true)
+public class HBaseProperties {
+
+ private String zookeeperQuorum;
+
+ private String zookeeperPropertyClientPort;
+
+ private String zookeeperZnodeParent;
+
+ private String clientPause;
+
+ private String clientRetriesNumber;
+
+ private String rpcTimeout;
+
+ private Integer connectPool;
+
+ private String table;
+
+ private Integer cellTtlD;
+}
diff --git a/src/main/java/com/mesa/reportservice/configuration/HbaseConfig.java b/src/main/java/com/mesa/reportservice/configuration/HbaseConfig.java
deleted file mode 100644
index 2cdb8e4..0000000
--- a/src/main/java/com/mesa/reportservice/configuration/HbaseConfig.java
+++ /dev/null
@@ -1,72 +0,0 @@
-package com.mesa.reportservice.configuration;
-
-import com.alibaba.nacos.api.config.ConfigType;
-import com.alibaba.nacos.api.config.annotation.NacosConfigurationProperties;
-import com.mesa.reportservice.bean.JobEntity;
-import com.mesa.reportservice.util.StringUtil;
-import org.springframework.boot.context.properties.ConfigurationProperties;
-import org.springframework.stereotype.Component;
-
-import java.net.URL;
-
-/**
- * Created by wk1 on 2019/5/27.
- */
-@Component
-@NacosConfigurationProperties(prefix = "hbase", dataId = "${nacos.config.data-id}", groupId = "${nacos.config.group}", type = ConfigType.YAML, autoRefreshed = true)
-
-public class HbaseConfig {
-
- private static String columefamily;
- private static String url;
- private static String table;
- private static String colume_job_id;
- private static String colume_query_id;
- private static String colume_query_duration_ms;
- private static String colume_read_rows;
- private static String colume_memory_usage;
- private static String colume_exception;
- private static String colume_sql;
-
-
- public void setColumefamily(String columefamily) {
- HbaseConfig.columefamily = columefamily;
- }
-
- public void setUrl(String url) {
- HbaseConfig.url = url;
- }
-
- public void setTable(String table) {
- HbaseConfig.table = table;
- }
-
- public void setColume_query_id(String colume_query_id) {
- HbaseConfig.colume_query_id = colume_query_id;
- }
-
- public void setColume_query_duration_ms(String colume_query_duration_ms) {
- HbaseConfig.colume_query_duration_ms = colume_query_duration_ms;
- }
-
- public void setColume_read_rows(String colume_read_rows) {
- HbaseConfig.colume_read_rows = colume_read_rows;
- }
-
- public void setColume_memory_usage(String colume_memory_usage) {
- HbaseConfig.colume_memory_usage = colume_memory_usage;
- }
-
- public void setColume_job_id(String colume_job_id) {
- HbaseConfig.colume_job_id = colume_job_id;
- }
-
-
- public void setColume_exception(String colume_exception) {
- HbaseConfig.colume_exception = colume_exception;
- }
-
- public void setColume_sql(String colume_sql) {
- HbaseConfig.colume_sql = colume_sql;
- }
-}
diff --git a/src/main/java/com/mesa/reportservice/configuration/HbaseProperties.java b/src/main/java/com/mesa/reportservice/configuration/HbaseProperties.java
deleted file mode 100644
index 46fefc8..0000000
--- a/src/main/java/com/mesa/reportservice/configuration/HbaseProperties.java
+++ /dev/null
@@ -1,111 +0,0 @@
-package com.mesa.reportservice.configuration;
-
-import com.alibaba.nacos.api.config.ConfigType;
-import com.alibaba.nacos.api.config.annotation.NacosConfigurationProperties;
-import org.springframework.boot.context.properties.ConfigurationProperties;
-import org.springframework.stereotype.Component;
-
-/**
- * Created by Administrator on 2020/3/10.
- */
-
-//@ConfigurationProperties(prefix = "hbase")
-@Component
-@NacosConfigurationProperties(prefix = "hbase", dataId = "${nacos.config.data-id}", groupId = "${nacos.config.group}", type = ConfigType.YAML, autoRefreshed = true)
-
-public class HbaseProperties {
-
- private String zookeeper_quorum;
-
- private String zookeeper_property_clientPort;
-
- private String zookeeper_znode_parent;
-
- private String client_pause;
-
- private String client_retries_number;
-
- private String rpc_timeout;
-
- private Integer connect_pool;
-
- private String table;
-
- private Integer cell_ttl_d;
-
-
-
- public String getZookeeper_quorum() {
- return zookeeper_quorum;
- }
-
- public void setZookeeper_quorum(String zookeeper_quorum) {
- this.zookeeper_quorum = zookeeper_quorum;
- }
-
- public String getZookeeper_property_clientPort() {
- return zookeeper_property_clientPort;
- }
-
- public void setZookeeper_property_clientPort(String zookeeper_property_clientPort) {
- this.zookeeper_property_clientPort = zookeeper_property_clientPort;
- }
-
- public String getZookeeper_znode_parent() {
- return zookeeper_znode_parent;
- }
-
- public void setZookeeper_znode_parent(String zookeeper_znode_parent) {
- this.zookeeper_znode_parent = zookeeper_znode_parent;
- }
-
- public String getClient_pause() {
- return client_pause;
- }
-
- public void setClient_pause(String client_pause) {
- this.client_pause = client_pause;
- }
-
- public String getClient_retries_number() {
- return client_retries_number;
- }
-
- public void setClient_retries_number(String client_retries_number) {
- this.client_retries_number = client_retries_number;
- }
-
- public String getRpc_timeout() {
- return rpc_timeout;
- }
-
- public void setRpc_timeout(String rpc_timeout) {
- this.rpc_timeout = rpc_timeout;
- }
-
-
- public Integer getConnect_pool() {
- return connect_pool;
- }
-
- public void setConnect_pool(Integer connect_pool) {
- this.connect_pool = connect_pool;
- }
-
-
- public String getTable() {
- return table;
- }
-
- public void setTable(String table) {
- this.table = table;
- }
-
- public void setCell_ttl_d(Integer cell_ttl_d) {
- this.cell_ttl_d = cell_ttl_d;
- }
-
- public Integer getCell_ttl_d() {
- return cell_ttl_d;
- }
-}
diff --git a/src/main/java/com/mesa/reportservice/configuration/HttpClientPool.java b/src/main/java/com/mesa/reportservice/configuration/HttpClientPool.java
index e6ac85a..47e6e62 100644
--- a/src/main/java/com/mesa/reportservice/configuration/HttpClientPool.java
+++ b/src/main/java/com/mesa/reportservice/configuration/HttpClientPool.java
@@ -2,6 +2,7 @@ package com.mesa.reportservice.configuration;
import com.alibaba.nacos.api.config.ConfigType;
import com.alibaba.nacos.api.config.annotation.NacosConfigurationProperties;
+import lombok.Data;
import org.apache.http.client.config.RequestConfig;
import org.apache.http.impl.client.CloseableHttpClient;
import org.apache.http.impl.client.HttpClientBuilder;
@@ -12,12 +13,13 @@ import org.springframework.context.annotation.Bean;
import org.springframework.stereotype.Component;
/**
- * Created by wk1 on 2019/5/15.
+ *
+ * @author lijinyang
+ * @date 2024/1/24
*/
+@Data
@Component
-//@ConfigurationProperties(prefix = "http")
@NacosConfigurationProperties(prefix = "http", dataId = "${nacos.config.data-id}", groupId = "${nacos.config.group}", type = ConfigType.YAML, autoRefreshed = true)
-
public class HttpClientPool {
private Integer maxTotal;
@@ -34,39 +36,6 @@ public class HttpClientPool {
private Integer socketTimeoutShort;
-
- public void setMaxTotal(Integer maxTotal) {
- this.maxTotal = maxTotal;
- }
-
- public void setDefaultMaxPerRoute(Integer defaultMaxPerRoute) {
- this.defaultMaxPerRoute = defaultMaxPerRoute;
- }
-
- public void setConnectTimeout(Integer connectTimeout) {
- this.connectTimeout = connectTimeout;
- }
-
- public void setConnectionRequestTimeout(Integer connectionRequestTimeout) {
- this.connectionRequestTimeout = connectionRequestTimeout;
- }
-
- public void setSocketTimeout(Integer socketTimeout) {
- this.socketTimeout = socketTimeout;
- }
-
- public void setStaleConnectionCheckEnabled(boolean staleConnectionCheckEnabled) {
- this.staleConnectionCheckEnabled = staleConnectionCheckEnabled;
- }
-
- public void setSocketTimeoutShort(Integer socketTimeoutShort) {
- this.socketTimeoutShort = socketTimeoutShort;
- }
-
- public Integer getSocketTimeout() {
- return socketTimeout;
- }
-
/**
* 首先实例化一个连接池管理器,设置最大连接数、并发连接数
*
@@ -94,9 +63,7 @@ public class HttpClientPool {
//HttpClientBuilder中的构造方法被protected修饰,所以这里不能直接使用new来实例化一个HttpClientBuilder,可以使用HttpClientBuilder提供的静态方法create()来获取HttpClientBuilder对象
HttpClientBuilder httpClientBuilder = HttpClientBuilder.create();
-
httpClientBuilder.setConnectionManager(httpClientConnectionManager);
-
return httpClientBuilder;
}
@@ -128,8 +95,6 @@ public class HttpClientPool {
.setStaleConnectionCheckEnabled(staleConnectionCheckEnabled);
}
-
-
@Bean(name = "builderShort")
public RequestConfig.Builder getBuilderShort() {
RequestConfig.Builder builder = RequestConfig.custom();
@@ -138,6 +103,7 @@ public class HttpClientPool {
.setSocketTimeout(socketTimeoutShort)
.setStaleConnectionCheckEnabled(staleConnectionCheckEnabled);
}
+
/**
* 使用builder构建一个RequestConfig对象
*
@@ -160,6 +126,5 @@ public class HttpClientPool {
public RequestConfig getRequestShortConfig(@Qualifier("builderShort") RequestConfig.Builder builder) {
return builder.build();
}
-
}
diff --git a/src/main/java/com/mesa/reportservice/configuration/QueryGatewayProperties.java b/src/main/java/com/mesa/reportservice/configuration/QueryGatewayProperties.java
new file mode 100644
index 0000000..83c3ae5
--- /dev/null
+++ b/src/main/java/com/mesa/reportservice/configuration/QueryGatewayProperties.java
@@ -0,0 +1,25 @@
+package com.mesa.reportservice.configuration;
+
+import com.alibaba.nacos.api.config.ConfigType;
+import com.alibaba.nacos.api.config.annotation.NacosConfigurationProperties;
+import org.springframework.stereotype.Component;
+
+/**
+ *
+ * @author lijinyang
+ * @date 2024/1/24
+ */
+@Component
+@NacosConfigurationProperties(prefix = "query-gateway", dataId = "${nacos.config.data-id}", groupId = "${nacos.config.group}", type = ConfigType.YAML, autoRefreshed = true)
+public class QueryGatewayProperties {
+
+ private static String gatewayIp;
+
+ public void setGatewayIp(String gatewayIp) {
+ QueryGatewayProperties.gatewayIp = gatewayIp;
+ }
+
+ public String getGatewayIp() {
+ return gatewayIp;
+ }
+}
diff --git a/src/main/java/com/mesa/reportservice/configuration/SchedulerProperties.java b/src/main/java/com/mesa/reportservice/configuration/SchedulerProperties.java
new file mode 100644
index 0000000..cb154d0
--- /dev/null
+++ b/src/main/java/com/mesa/reportservice/configuration/SchedulerProperties.java
@@ -0,0 +1,40 @@
+package com.mesa.reportservice.configuration;
+
+import com.alibaba.nacos.api.config.ConfigType;
+import com.alibaba.nacos.api.config.annotation.NacosConfigurationProperties;
+import com.mesa.reportservice.bean.JobEntity;
+import lombok.Data;
+import org.springframework.stereotype.Component;
+
+import java.util.Map;
+import java.util.UUID;
+import java.util.concurrent.ConcurrentHashMap;
+
+/**
+ *
+ * @author lijinyang
+ * @date 2024/1/24
+ */
+@Data
+@Component
+@NacosConfigurationProperties(prefix = "scheduler", dataId = "${nacos.config.data-id}", groupId = "${nacos.config.group}", type = ConfigType.YAML, autoRefreshed = true)
+public class SchedulerProperties {
+
+ private static int jobThread;
+
+ private static Map<String, JobEntity> mapResult = new ConcurrentHashMap<>();
+
+ public void setJobThread(int jobThread) {
+ SchedulerProperties.jobThread = jobThread;
+ }
+
+ public final static String zkUUID = UUID.randomUUID().toString().replaceAll("-","");
+
+ public int getJobThread() {
+ return jobThread;
+ }
+
+ public Map<String, JobEntity> getMapResult() {
+ return mapResult;
+ }
+}
diff --git a/src/main/java/com/mesa/reportservice/configuration/ThreadPoolConfig.java b/src/main/java/com/mesa/reportservice/configuration/ThreadPoolConfig.java
new file mode 100644
index 0000000..91009b0
--- /dev/null
+++ b/src/main/java/com/mesa/reportservice/configuration/ThreadPoolConfig.java
@@ -0,0 +1,35 @@
+package com.mesa.reportservice.configuration;
+
+import org.springframework.context.annotation.Bean;
+import org.springframework.context.annotation.Configuration;
+import org.springframework.scheduling.annotation.EnableAsync;
+import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
+
+import java.util.concurrent.*;
+
+/**
+ * @author lijinyang
+ * @Date 2024/02/28/11:06
+ */
+@Configuration
+@EnableAsync
+public class ThreadPoolConfig {
+
+ @Bean(name = "threadPoolTaskExecutor")
+ public ThreadPoolTaskExecutor getThreadPoolTaskExecutor() {
+ ThreadPoolTaskExecutor taskExecutor = new ThreadPoolTaskExecutor();
+ // 核心线程数
+ taskExecutor.setCorePoolSize(2);
+ // 最大线程数
+ taskExecutor.setMaxPoolSize(30);
+ // 阻塞队列长度
+ taskExecutor.setQueueCapacity(30);
+ // 空闲线程最大存活时间
+ taskExecutor.setKeepAliveSeconds(5);
+ // 拒绝策略
+ taskExecutor.setRejectedExecutionHandler(new ThreadPoolExecutor.AbortPolicy());
+ taskExecutor.initialize();
+ return taskExecutor;
+ }
+
+}
diff --git a/src/main/java/com/mesa/reportservice/configuration/ZkConfig.java b/src/main/java/com/mesa/reportservice/configuration/ZkConfig.java
deleted file mode 100644
index c4cebf6..0000000
--- a/src/main/java/com/mesa/reportservice/configuration/ZkConfig.java
+++ /dev/null
@@ -1,48 +0,0 @@
-package com.mesa.reportservice.configuration;
-
-import cn.hutool.log.Log;
-import org.apache.curator.RetryPolicy;
-import org.apache.curator.framework.CuratorFramework;
-import org.apache.curator.framework.CuratorFrameworkFactory;
-import org.apache.curator.retry.ExponentialBackoffRetry;
-import org.springframework.beans.factory.annotation.Autowired;
-import org.springframework.boot.context.properties.EnableConfigurationProperties;
-import org.springframework.context.annotation.Bean;
-import org.springframework.context.annotation.Configuration;
-
-/**
- * Created by wk1 on 2020/1/6.
- */
-@Configuration
-public class ZkConfig {
-
- @Autowired
- private ZkProperties zkproperties;
- Log logger = Log.get();
-
-
-
- @Bean(name = "curatorConnect")
- public CuratorFramework CuratorConnect() {
-
- CuratorFramework client = null;
-
-
- RetryPolicy retryPolicy = new ExponentialBackoffRetry(zkproperties.getElapsedTimeMs(), zkproperties.getRetryCount());
-
- // 实例化Curator客户端
- client = CuratorFrameworkFactory.builder() // 使用工厂类来建造客户端的实例对象
- .connectString(zkproperties.getConnectString()) // 放入zookeeper服务器ip
- .sessionTimeoutMs(zkproperties.getSessionTimeoutMs()).connectionTimeoutMs(zkproperties.getConnectionTimeoutMs()).retryPolicy(retryPolicy) // 设定会话时间以及重连策略
- .namespace(zkproperties.getNameSpace()).build(); // 设置命名空间以及开始建立连接
- if (zkproperties.getOpen() == 0) {
- client.start();
- } else {
- logger.info("repoertservice start local");
- }
-
- return client;
- }
-
-
-}
diff --git a/src/main/java/com/mesa/reportservice/configuration/ZkProperties.java b/src/main/java/com/mesa/reportservice/configuration/ZkProperties.java
deleted file mode 100644
index f8ed4e0..0000000
--- a/src/main/java/com/mesa/reportservice/configuration/ZkProperties.java
+++ /dev/null
@@ -1,86 +0,0 @@
-package com.mesa.reportservice.configuration;
-
-import com.alibaba.nacos.api.config.ConfigType;
-import com.alibaba.nacos.api.config.annotation.NacosConfigurationProperties;
-import org.springframework.boot.context.properties.ConfigurationProperties;
-import org.springframework.stereotype.Component;
-
-/**
- * Created by wk1 on 2020/1/6.
- */
-//@ConfigurationProperties(prefix = "zookeeper")
-@Component
-@NacosConfigurationProperties(prefix = "zookeeper", dataId = "${nacos.config.data-id}", groupId = "${nacos.config.group}", type = ConfigType.YAML, autoRefreshed = true)
-public class ZkProperties {
-
- private int open;
-
- private int retryCount;
-
- private int elapsedTimeMs;
-
- private String connectString;
-
- private int sessionTimeoutMs;
-
- private int connectionTimeoutMs;
-
- private String nameSpace;
-
-
- public int getOpen() {
- return open;
- }
-
- public void setOpen(int open) {
- this.open = open;
- }
-
- public int getRetryCount() {
- return retryCount;
- }
-
- public void setRetryCount(int retryCount) {
- this.retryCount = retryCount;
- }
-
- public int getElapsedTimeMs() {
- return elapsedTimeMs;
- }
-
- public void setElapsedTimeMs(int elapsedTimeMs) {
- this.elapsedTimeMs = elapsedTimeMs;
- }
-
- public String getConnectString() {
- return connectString;
- }
-
- public void setConnectString(String connectString) {
- this.connectString = connectString;
- }
-
- public int getSessionTimeoutMs() {
- return sessionTimeoutMs;
- }
-
- public void setSessionTimeoutMs(int sessionTimeoutMs) {
- this.sessionTimeoutMs = sessionTimeoutMs;
- }
-
- public int getConnectionTimeoutMs() {
- return connectionTimeoutMs;
- }
-
- public void setConnectionTimeoutMs(int connectionTimeoutMs) {
- this.connectionTimeoutMs = connectionTimeoutMs;
- }
-
- public String getNameSpace() {
- return nameSpace;
- }
-
- public void setNameSpace(String nameSpace) {
- this.nameSpace = nameSpace;
- }
-}
diff --git a/src/main/java/com/mesa/reportservice/configuration/ZookeeperConfig.java b/src/main/java/com/mesa/reportservice/configuration/ZookeeperConfig.java
new file mode 100644
index 0000000..10b1ac3
--- /dev/null
+++ b/src/main/java/com/mesa/reportservice/configuration/ZookeeperConfig.java
@@ -0,0 +1,46 @@
+package com.mesa.reportservice.configuration;
+
+import cn.hutool.log.Log;
+import cn.hutool.log.LogFactory;
+import org.apache.curator.RetryPolicy;
+import org.apache.curator.framework.CuratorFramework;
+import org.apache.curator.framework.CuratorFrameworkFactory;
+import org.apache.curator.retry.ExponentialBackoffRetry;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.context.annotation.Bean;
+import org.springframework.context.annotation.Configuration;
+
+/**
+ *
+ * @author lijinyang
+ * @date 2024/1/24
+ */
+@Configuration
+public class ZookeeperConfig {
+
+ @Autowired
+ private ZookeeperProperties zookeeperProperties;
+
+ private static final Log logger = LogFactory.get();
+
+ @Bean(name = "curatorConnect")
+ public CuratorFramework curatorConnect() {
+ CuratorFramework client = null;
+ RetryPolicy retryPolicy = new ExponentialBackoffRetry(zookeeperProperties.getElapsedTimeMs(), zookeeperProperties.getRetryCount());
+ // 实例化Curator客户端
+ // 使用工厂类来建造客户端的实例对象
+ client = CuratorFrameworkFactory.builder()
+ // 放入zookeeper服务器ip
+ .connectString(zookeeperProperties.getConnectString())
+ // 设定会话时间以及重连策略
+ .sessionTimeoutMs(zookeeperProperties.getSessionTimeoutMs()).connectionTimeoutMs(zookeeperProperties.getConnectionTimeoutMs()).retryPolicy(retryPolicy)
+ // 设置命名空间以及开始建立连接
+ .namespace(zookeeperProperties.getNameSpace()).build();
+ if (zookeeperProperties.getOpen() == 0) {
+ client.start();
+ } else {
+ logger.info("reportService start local");
+ }
+ return client;
+ }
+}
diff --git a/src/main/java/com/mesa/reportservice/configuration/ZookeeperProperties.java b/src/main/java/com/mesa/reportservice/configuration/ZookeeperProperties.java
new file mode 100644
index 0000000..d0a9c80
--- /dev/null
+++ b/src/main/java/com/mesa/reportservice/configuration/ZookeeperProperties.java
@@ -0,0 +1,32 @@
+package com.mesa.reportservice.configuration;
+
+import com.alibaba.nacos.api.config.ConfigType;
+import com.alibaba.nacos.api.config.annotation.NacosConfigurationProperties;
+import lombok.Data;
+import org.springframework.boot.context.properties.ConfigurationProperties;
+import org.springframework.stereotype.Component;
+
+/**
+ *
+ * @author lijinyang
+ * @date 2024/1/24
+ */
+@Data
+@Component
+@NacosConfigurationProperties(prefix = "zookeeper", dataId = "${nacos.config.data-id}", groupId = "${nacos.config.group}", type = ConfigType.YAML, autoRefreshed = true)
+public class ZookeeperProperties {
+
+ private int open;
+
+ private int retryCount;
+
+ private int elapsedTimeMs;
+
+ private String connectString;
+
+ private int sessionTimeoutMs;
+
+ private int connectionTimeoutMs;
+
+ private String nameSpace;
+}
diff --git a/src/main/java/com/mesa/reportservice/controller/MonitorController.java b/src/main/java/com/mesa/reportservice/controller/MonitorController.java
index 0f8e61d..8d5f8c7 100644
--- a/src/main/java/com/mesa/reportservice/controller/MonitorController.java
+++ b/src/main/java/com/mesa/reportservice/controller/MonitorController.java
@@ -1,67 +1,26 @@
package com.mesa.reportservice.controller;
-import cn.hutool.log.Log;
-import com.alibaba.fastjson.JSONArray;
-import com.mesa.reportservice.bean.MonitorEntity;
-import com.mesa.reportservice.configuration.GlobelConfig;
-import com.mesa.reportservice.service.MysqlService;
-import com.mesa.reportservice.service.ZkService;
+import com.mesa.reportservice.service.MonitorService;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;
-import com.mesa.reportservice.configuration.ZkProperties;
-
-import java.util.Map;
/**
- * Created by wk on 2020/4/16.
+ *
+ * @author lijinyang
+ * @date 2024/1/24
*/
@RestController
@Component
public class MonitorController {
@Autowired
- private MysqlService mysqlService;
- @Autowired
- private ZkService zs;
- @Autowired
- private GlobelConfig gc;
- @Autowired
- private ZkProperties zkProperties;
-
- Log logger = Log.get();
-
+ private MonitorService monitorService;
@GetMapping(value = "/monitor")
- public String getReportStatus() {
-
-
- String json="";
- try {
- MonitorEntity me = new MonitorEntity();
- Map<String,Long> numMap = mysqlService.getJobCount();
- me.setQueueJobNum(numMap.get("queueNum"));
- me.setExcuteingJobNum(numMap.get("excuteingNum"));
- me.setTodaySuccessJobNum(numMap.get("todaySuccessNum"));
- me.setTodayErrorJobNum(numMap.get("todayErrorNum"));
- me.setJoblist(gc.getMapresult());
- if(zs.isMaster()){
- me.setStatus("active");
- }
- else{
- me.setStatus("standby");
- }
- gc.getMapresult().size();
- Object obj = JSONArray.toJSON(me);
- json = obj.toString();
- } catch (Exception e) {
- logger.error(e.toString());
- json=e.toString();
- }
-
- return json;
+ public String getJobCount() {
+ return monitorService.getJobCount();
}
-
}
diff --git a/src/main/java/com/mesa/reportservice/controller/ScheduledResultController.java b/src/main/java/com/mesa/reportservice/controller/ScheduledResultController.java
index ea8345f..f927f4d 100644
--- a/src/main/java/com/mesa/reportservice/controller/ScheduledResultController.java
+++ b/src/main/java/com/mesa/reportservice/controller/ScheduledResultController.java
@@ -1,166 +1,25 @@
package com.mesa.reportservice.controller;
-import cn.hutool.log.Log;
-import com.mesa.reportservice.bean.JobEntity;
-import com.mesa.reportservice.configuration.GlobelConfig;
-import com.mesa.reportservice.configuration.HttpClientPool;
-import com.mesa.reportservice.service.*;
+import com.mesa.reportservice.service.ScheduledResultService;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.scheduling.annotation.EnableScheduling;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-
/**
- * @author wk1
- * @date 2020/1/8
+ *
+ * @author lijinyang
+ * @date 2024/1/24
*/
@Component
@EnableScheduling
public class ScheduledResultController {
- Log logger = Log.get();
-
- protected static ExecutorService pool = Executors.newFixedThreadPool(30);
- @Autowired
- private MysqlService ms;
- @Autowired
- private ExcuteService es;
- @Autowired
- private ExcuteProcessService eps;
- @Autowired
- private ZkService zs;
- @Autowired
- private ClickhouseService cs;
- @Autowired
- private HttpClientPool hc;
@Autowired
- private GlobelConfig gc;
-
+ private ScheduledResultService scheduledResultService;
@Scheduled(cron = "${scan.result.scheduled.plan}")
- public void getExcuteResult() {
- try {
- if (zs.isMaster()) {
- logger.info("start viewing results");
- //先查询数据库是否有异常状态任务,killquery
- List<JobEntity> joblist = ms.getJobForExcute();
- for (JobEntity jobEntity : joblist) {
- String sql = jobEntity.getQuerySql().trim();
- /* sql = sql.replace("$exe_time", "toDateTime('" + jobEntity.getIssuedTime().trim() + "')");
- sql = sql.replace("$start_time", "toDateTime('" + jobEntity.getStartTime().trim() + "')");
- sql = sql.replace("$end_time", "toDateTime('" + jobEntity.getEndTime().trim() + "')");*/
-
- String queryid = cs.getQueryId(sql);
- jobEntity.setQueryId(queryid);
-
- if (jobEntity.getIsValid() == 0) {
- eps.killQuery(jobEntity);
- gc.getMapresult().get(jobEntity.getQueryId()).setIsValid(0);
- } else if (!gc.getMapresult().containsKey(jobEntity.getQueryId())) {
- eps.reSet(jobEntity);
- }
- if (gc.getMapresult().containsKey(jobEntity.getQueryId())) {
- if (jobEntity.getIsValid() == 0) {
- eps.killQuery(jobEntity);
- gc.getMapresult().get(jobEntity.getQueryId()).setIsValid(0);
- }
- } else {
- eps.reSet(jobEntity);
- }
-
- }
- //遍历内存中的任务对状态RUNNING的更新进度,其他更新数据库的状态
- for (Map.Entry<String, JobEntity> entry : gc.getMapresult().entrySet()) {
- logger.info("key = " + entry.getKey() + ", value = " + entry.getValue().getState());
- long currentTime = System.currentTimeMillis();
- long excutetime = currentTime - entry.getValue().getStartTime();
- logger.info("excute time=" + excutetime + "ttl_time=" + hc.getSocketTimeout());
- if (("RUNNING").equals(entry.getValue().getState()) && excutetime > hc.getSocketTimeout()) {
- entry.getValue().setState("DONE");
- entry.getValue().setExcuteStatus(500001);
- eps.killQuery(entry.getValue());
- eps.updateResultMessage(entry.getValue());
- } else {
- if (("RUNNING").equals(entry.getValue().getState())) {
- eps.updateProcessMessage(entry.getValue());
- } else {
- eps.updateResultMessage(entry.getValue());
- }
- }
- }
- int rows = gc.getJob_thread() - gc.getMapresult().size();
- if (rows > 0) {
- List<JobEntity> jobs = ms.getJobTask(rows);
- for (JobEntity job : jobs) {
- logger.info("start executing task");
- long begintime = System.currentTimeMillis()/1000;
- job.setStartTime(begintime);
- String sql = job.getQuerySql().trim();
- /* sql = sql.replace("$exe_time", "toDateTime('" + job.getIssuedTime().trim() + "')");
- sql = sql.replace("$start_time", "toDateTime('" + job.getStartTime().trim() + "')");
- sql = sql.replace("$end_time", "toDateTime('" + job.getEndTime().trim() + "')");*/
- job.setQuerySql(sql);
- job.setState("RUNNING");
- job.setExcuteStatus(1);
- job.setResultMessage("EXECUTING");
- job.setRowsRead(0L);
- job.setElapsed(0);
- job.setDoneProgress(0);
- job.setResultRows(0L);
- String queryid = "";
- queryid = cs.getQueryId(sql);
- job.setQueryId(queryid);
- if (queryid.equals("") ) {
- job.setExcuteStatus(0);
- job.setState("DONE");
- job.setEndTime(System.currentTimeMillis()/1000);
- //status = 7
- job.setDoneProgress(1.00f);
- job.setIsFailed(1);
- job.setResultMessage("Unknow Error");
- }
- if (("RUNNING").equals(job.getState())) {
- if (ms.updateProcesses(job) != 0) {
- gc.getMapresult().put(queryid, job);
- pool.execute(new Runnable() {
- @Override
- public void run() {
- es.excuteCkTask(job);
- }
- });
- } else {
- logger.error("failed to update database status");
-
- }
- } else {
- if (ms.updateProcesses(job) != 0) {
- logger.error("task cannot be executed");
- } else {
- logger.error("failed to update database status");
-
- }
- }
- }
- } else {
- logger.info("no pending tasks");
- }
- } else {
- if (gc.getMapresult().size() > 0) {
- for (Map.Entry<String, JobEntity> entry : gc.getMapresult().entrySet()) {
- logger.info("key = " + entry.getKey() + ", value = " + entry.getValue().getState());
- eps.killQuery(entry.getValue());
- }
- gc.getMapresult().clear();
- }
- }
- } catch (Exception e) {
- logger.error(e.toString());
- }
+ public void execute() {
+ scheduledResultService.execute();
}
-
}
diff --git a/src/main/java/com/mesa/reportservice/enums/JobStates.java b/src/main/java/com/mesa/reportservice/enums/JobStates.java
new file mode 100644
index 0000000..e723bbc
--- /dev/null
+++ b/src/main/java/com/mesa/reportservice/enums/JobStates.java
@@ -0,0 +1,31 @@
+package com.mesa.reportservice.enums;
+
+/**
+ *
+ * @author lijinyang
+ * @date 2024/1/24
+ */
+public enum JobStates {
+ /**
+ * PENDING
+ */
+ PENDING("PENDING"),
+ /**
+ * RUNNING
+ */
+ RUNNING("RUNNING"),
+ /**
+ * DONE
+ */
+ DONE("DONE");
+
+ private final String value;
+
+ public String getValue() {
+ return value;
+ }
+
+ JobStates(String value) {
+ this.value = value;
+ }
+}
diff --git a/src/main/java/com/mesa/reportservice/exception/SQSCode.java b/src/main/java/com/mesa/reportservice/exception/SQSCode.java
new file mode 100644
index 0000000..d20f28b
--- /dev/null
+++ b/src/main/java/com/mesa/reportservice/exception/SQSCode.java
@@ -0,0 +1,47 @@
+package com.mesa.reportservice.exception;
+
+
+import lombok.Getter;
+
+/**
+ * @author 86158
+ */
+@Getter
+public enum SQSCode {
+ /**
+ * 1-3位:异常类型(HTTP协议状态码)
+ * 4-6位:自然排序
+ */
+
+ /**
+ * Message
+ */
+ PARAM_SYNTAX_ERROR(400001,"Param Syntax Error ! ErrorCode = {0} queryId = {1} jobId = {2} executeSql = {3}"),
+
+ TEMPLATE_SQL_EXECUTION_ERROR(500001,"SQL Execution Error ! ErrorCode = {0} queryId = {1} jobId = {2} executeSql = {3}"),
+ TEMPLATE_SAVE_HBASE_ERROR(500002,"save HBase error ! ErrorCode = {0} queryId = {1} jobId = {2} executeSql = {3}"),
+ TEMPLATE_SAVE_DATABASE_ERROR(500003,"save database error queryId = {0} jobId = {1} {2}"),
+ RESPONSE_DATA_ISNULL(500004,"response data is null"),
+ RESPONSE_BODY_ISNULL(500005,"response body is null"),
+ WRITE_HBASE_ERROR(500006,"write HBase error retry count"),
+ OUT_OF_MEMORY_ERROR(500007,"OutOfMemory Error"),
+ KILL_QUERY_ERROR(500008,"Kill Query Error"),
+ TEMPLATE_UNKNOWN_ERROR(500009,"Unknown Error ! ErrorCode = {0} queryId = {1} jobId = {2} executeSql = {3}"),
+ UPDATE_STATUS_ERROR(500010,"failed to update database status"),
+ TASK_CANNOT_EXECUTED(500010,"task cannot be executed"),
+ UNKNOWN_ERROR(500999,"Unknown Error"),
+
+ //成功
+ SUCCESS(200, "success"),
+ //失败
+ ERROR(999, "error");
+
+ SQSCode(Integer code, String msg) {
+ this.code = code;
+ this.msg = msg;
+ }
+
+ private Integer code;
+
+ private String msg;
+}
diff --git a/src/main/java/com/mesa/reportservice/exception/SQSException.java b/src/main/java/com/mesa/reportservice/exception/SQSException.java
new file mode 100644
index 0000000..d7db7f2
--- /dev/null
+++ b/src/main/java/com/mesa/reportservice/exception/SQSException.java
@@ -0,0 +1,69 @@
+/**
+
+ *
+
+ *
+ *
+ */
+
+package com.mesa.reportservice.exception;
+
+
+import lombok.Data;
+
+/**
+ * 自定义异常
+ *
+ * @author lijinyang
+ */
+@Data
+public class SQSException extends RuntimeException {
+ private static final long serialVersionUID = 1L;
+
+ private String msg;
+ private int code = SQSCode.ERROR.getCode();
+
+ public SQSException(SQSCode sqsCode) {
+ this.code = sqsCode.getCode();
+ this.msg = sqsCode.getMsg();
+ }
+
+ public SQSException(String msg) {
+ super(msg);
+ this.msg = msg;
+ }
+
+ public SQSException(String msg, Throwable e) {
+ super(msg, e);
+ this.msg = msg;
+ }
+
+ public SQSException(String msg, int code) {
+ super(msg);
+ this.msg = msg;
+ this.code = code;
+ }
+
+ public SQSException(String msg, int code, Throwable e) {
+ super(msg, e);
+ this.msg = msg;
+ this.code = code;
+ }
+
+ public String getMsg() {
+ return msg;
+ }
+
+ public void setMsg(String msg) {
+ this.msg = msg;
+ }
+
+ public int getCode() {
+ return code;
+ }
+
+ public void setCode(int code) {
+ this.code = code;
+ }
+
+}
diff --git a/src/main/java/com/mesa/reportservice/mapper/ReportResultMapper.java b/src/main/java/com/mesa/reportservice/mapper/JobMapper.java
index d0b845c..4546b12 100644
--- a/src/main/java/com/mesa/reportservice/mapper/ReportResultMapper.java
+++ b/src/main/java/com/mesa/reportservice/mapper/JobMapper.java
@@ -1,16 +1,22 @@
package com.mesa.reportservice.mapper;
+import com.baomidou.mybatisplus.core.mapper.BaseMapper;
import com.mesa.reportservice.bean.JobEntity;
import org.apache.ibatis.annotations.Mapper;
import java.util.List;
import java.util.Map;
+/**
+ *
+ * @author lijinyang
+ * @date 2024/1/24
+ */
@Mapper
-public interface ReportResultMapper {
+public interface JobMapper extends BaseMapper<JobEntity> {
- List<JobEntity> getJobForExcute();
+ List<JobEntity> getRunningJob();
List<JobEntity> getJobTask(Map<String, Object> map);
diff --git a/src/main/java/com/mesa/reportservice/service/ClickhouseService.java b/src/main/java/com/mesa/reportservice/service/ClickhouseService.java
deleted file mode 100644
index 9c9a978..0000000
--- a/src/main/java/com/mesa/reportservice/service/ClickhouseService.java
+++ /dev/null
@@ -1,25 +0,0 @@
-package com.mesa.reportservice.service;
-
-import com.mesa.reportservice.bean.HttpResult;
-
-import java.io.IOException;
-
-/**
- * Created by wk1 on 2020/1/2.
- */
-public interface ClickhouseService {
-
- String getQueryId(String query) ;
-
-
- HttpResult queryForExcute(String query) throws IOException;
-
-
- HttpResult queryForProcess(String queryId) throws IOException;
-
-
- HttpResult queryForCancel(String queryId) throws IOException;
-
-
-
-}
diff --git a/src/main/java/com/mesa/reportservice/service/ExcuteService.java b/src/main/java/com/mesa/reportservice/service/ExcuteService.java
deleted file mode 100644
index 7b6ccae..0000000
--- a/src/main/java/com/mesa/reportservice/service/ExcuteService.java
+++ /dev/null
@@ -1,12 +0,0 @@
-package com.mesa.reportservice.service;
-
-import com.mesa.reportservice.bean.JobEntity;
-
-/**
- * Created by wk1 on 2020/1/8.
- */
-public interface ExcuteService {
-
- void excuteCkTask(JobEntity jl);
-
-}
diff --git a/src/main/java/com/mesa/reportservice/service/ExcuteProcessService.java b/src/main/java/com/mesa/reportservice/service/ExecuteProcessService.java
index 4660e9a..6355d35 100644
--- a/src/main/java/com/mesa/reportservice/service/ExcuteProcessService.java
+++ b/src/main/java/com/mesa/reportservice/service/ExecuteProcessService.java
@@ -5,13 +5,18 @@ import com.mesa.reportservice.bean.JobEntity;
import java.io.IOException;
/**
- * Created by wk1 on 2020/1/8.
+ *
+ * @author lijinyang
+ * @date 2024/1/24
*/
-public interface ExcuteProcessService {
+public interface ExecuteProcessService {
void updateResultMessage(JobEntity job);
+
void reSet(JobEntity jobEntity);
- void updateProcessMessage(JobEntity job) throws IOException;
- void killQuery(JobEntity jobEntity) throws IOException;
+
+ void updateProcessMessage(JobEntity job);
+
+ void killQuery(JobEntity jobEntity);
}
diff --git a/src/main/java/com/mesa/reportservice/service/ExecuteService.java b/src/main/java/com/mesa/reportservice/service/ExecuteService.java
new file mode 100644
index 0000000..59a518b
--- /dev/null
+++ b/src/main/java/com/mesa/reportservice/service/ExecuteService.java
@@ -0,0 +1,13 @@
+package com.mesa.reportservice.service;
+
+import com.mesa.reportservice.bean.JobEntity;
+
+/**
+ *
+ * @author lijinyang
+ * @date 2024/1/24
+ */
+public interface ExecuteService {
+
+ void queryQgw(JobEntity jobEntity);
+}
diff --git a/src/main/java/com/mesa/reportservice/service/HbaseService.java b/src/main/java/com/mesa/reportservice/service/HBaseService.java
index 46db5ef..c9e5728 100644
--- a/src/main/java/com/mesa/reportservice/service/HbaseService.java
+++ b/src/main/java/com/mesa/reportservice/service/HBaseService.java
@@ -7,11 +7,11 @@ import org.apache.hadoop.yarn.webapp.hamlet.HamletSpec;
import java.util.Map;
/**
- * Created by wk1 on 2020/1/2.
+ *
+ * @author lijinyang
+ * @date 2024/1/24
*/
-public interface HbaseService {
-
-
- Boolean put(JobEntity jobEntity) throws Exception;
+public interface HBaseService {
+ Boolean put(JobEntity jobEntity);
}
diff --git a/src/main/java/com/mesa/reportservice/service/JobService.java b/src/main/java/com/mesa/reportservice/service/JobService.java
new file mode 100644
index 0000000..706c75a
--- /dev/null
+++ b/src/main/java/com/mesa/reportservice/service/JobService.java
@@ -0,0 +1,25 @@
+package com.mesa.reportservice.service;
+
+
+import com.baomidou.mybatisplus.extension.service.IService;
+import com.mesa.reportservice.bean.JobEntity;
+
+import java.util.List;
+import java.util.Map;
+
+
+/**
+ *
+ * @author lijinyang
+ * @date 2024/1/24
+ */
+public interface JobService extends IService<JobEntity> {
+
+ List<JobEntity> getRunningJob() ;
+
+ List<JobEntity> getJobTask(int Rows) ;
+
+ int updateProcesses(JobEntity job);
+
+ Map<String,Long> getJobCount();
+}
diff --git a/src/main/java/com/mesa/reportservice/service/MonitorService.java b/src/main/java/com/mesa/reportservice/service/MonitorService.java
index 88dc242..5bccaea 100644
--- a/src/main/java/com/mesa/reportservice/service/MonitorService.java
+++ b/src/main/java/com/mesa/reportservice/service/MonitorService.java
@@ -1,14 +1,15 @@
package com.mesa.reportservice.service;
-
-import org.springframework.stereotype.Service;
-
/**
- * Created by wk1 on 2020/1/6.
+ *
+ * @author lijinyang
+ * @date 2024/1/24
*/
public interface MonitorService {
-
void addSuccess();
+
void addFail();
+
+ String getJobCount();
} \ No newline at end of file
diff --git a/src/main/java/com/mesa/reportservice/service/MysqlService.java b/src/main/java/com/mesa/reportservice/service/MysqlService.java
deleted file mode 100644
index 036bad9..0000000
--- a/src/main/java/com/mesa/reportservice/service/MysqlService.java
+++ /dev/null
@@ -1,24 +0,0 @@
-package com.mesa.reportservice.service;
-
-
-import com.mesa.reportservice.bean.JobEntity;
-
-import java.util.List;
-import java.util.Map;
-
-
-public interface MysqlService {
-
-
- List<JobEntity> getJobForExcute() ;
-
-
- List<JobEntity> getJobTask(int Rows) ;
-
-
- int updateProcesses(JobEntity job);
-
-
- Map<String,Long> getJobCount() throws Exception;
-
- }
diff --git a/src/main/java/com/mesa/reportservice/service/QueryGatewayService.java b/src/main/java/com/mesa/reportservice/service/QueryGatewayService.java
new file mode 100644
index 0000000..7f7be78
--- /dev/null
+++ b/src/main/java/com/mesa/reportservice/service/QueryGatewayService.java
@@ -0,0 +1,21 @@
+package com.mesa.reportservice.service;
+
+import com.mesa.reportservice.bean.HttpResult;
+
+import java.io.IOException;
+
+/**
+ *
+ * @author lijinyang
+ * @date 2024/1/24
+ */
+public interface QueryGatewayService {
+
+ String getQueryId(String query) ;
+
+ HttpResult queryJob(String query) throws IOException;
+
+ HttpResult queryJobProcess(String queryId);
+
+ HttpResult deleteJob(String queryId);
+}
diff --git a/src/main/java/com/mesa/reportservice/service/ScheduledResultService.java b/src/main/java/com/mesa/reportservice/service/ScheduledResultService.java
new file mode 100644
index 0000000..476579a
--- /dev/null
+++ b/src/main/java/com/mesa/reportservice/service/ScheduledResultService.java
@@ -0,0 +1,11 @@
+package com.mesa.reportservice.service;
+
+/**
+ *
+ * @author lijinyang
+ * @date 2024/1/24
+ */
+public interface ScheduledResultService {
+
+ void execute();
+}
diff --git a/src/main/java/com/mesa/reportservice/service/ZkService.java b/src/main/java/com/mesa/reportservice/service/ZookeeperService.java
index d42f636..69f77d2 100644
--- a/src/main/java/com/mesa/reportservice/service/ZkService.java
+++ b/src/main/java/com/mesa/reportservice/service/ZookeeperService.java
@@ -6,11 +6,12 @@ import org.springframework.stereotype.Service;
import java.net.InetAddress;
/**
- * Created by wk1 on 2020/1/6.
+ *
+ * @author lijinyang
+ * @date 2024/1/24
*/
@Service
-public interface ZkService {
+public interface ZookeeperService {
-
- boolean isMaster() throws Exception;
+ boolean isMaster();
} \ No newline at end of file
diff --git a/src/main/java/com/mesa/reportservice/service/impl/ExcuteProcessServiceImpl.java b/src/main/java/com/mesa/reportservice/service/impl/ExcuteProcessServiceImpl.java
deleted file mode 100644
index 2df61f9..0000000
--- a/src/main/java/com/mesa/reportservice/service/impl/ExcuteProcessServiceImpl.java
+++ /dev/null
@@ -1,182 +0,0 @@
-package com.mesa.reportservice.service.impl;
-
-import cn.hutool.log.Log;
-import com.alibaba.fastjson.JSON;
-import com.mesa.reportservice.bean.HttpResult;
-import com.mesa.reportservice.bean.JobEntity;
-import com.mesa.reportservice.configuration.GlobelConfig;
-import com.mesa.reportservice.service.*;
-import org.springframework.beans.factory.annotation.Autowired;
-import org.springframework.stereotype.Service;
-
-import java.io.IOException;
-import java.util.List;
-import java.util.Map;
-
-/**
- * Created by wk1 on 2020/1/8.
- */
-@Service
-
-public class ExcuteProcessServiceImpl implements ExcuteProcessService {
- Log logger = Log.get();
-
-
- @Autowired
- private ClickhouseService cs;
- @Autowired
- private MysqlService ms;
- @Autowired
- private HbaseService hs;
- @Autowired
- private MonitorService mons;
- @Autowired
- private GlobelConfig gc;
- @Override
- public void updateResultMessage(JobEntity je) {
- je.setState("DONE");
- je.setEndTime(System.currentTimeMillis()/1000);
- je.setDoneProgress(1.00f);
- je.setIsFailed(1);
- try {
- if (je.getIsValid() == 0) {
- //status = 9
- je.setResultMessage("CANCEL");
- } else {
- if (je.getExcuteStatus() >= 20000000 && je.getExcuteStatus() < 30000000) {
-
- Boolean isok = saveToHbase(je);
- if (isok) {
- //status = 2
- je.setResultMessage("OK");
- je.setIsFailed(0);
- logger.info("success save to hbase query_id=" + je.getQueryId() + " job_id =" + je.getJobId() + " excute_sql=" + je.getQuerySql());
- } else {
- //status = 5
- je.setResultMessage("Write Data Error");
- mons.addFail();
- logger.error("save hbase error "+je.getExcuteStatus()+" query_id=" + je.getQueryId() + " job_id =" + je.getJobId() + " excute_sql=" + je.getQuerySql());
- }
- } else if (je.getExcuteStatus() >= 40000000 && je.getExcuteStatus() < 50000000) {
- //status = 3
- je.setResultMessage("Param Syntax Error");
- logger.error("Param Syntax Error "+je.getExcuteStatus()+" query_id=" + je.getQueryId() + " job_id =" + je.getJobId() + " excute_sql=" + je.getQuerySql());
-
- } else if (je.getExcuteStatus() == 50001300) {
- //status = 4
- je.setResultMessage("SQL Execution Error");
- logger.error("SQL Execution Error ! ErrorCode "+je.getExcuteStatus()+" query_id=" + je.getQueryId() + " job_id =" + je.getJobId() + " excute_sql=" + je.getQuerySql());
-
- } else {
- //status = 7
- je.setResultMessage("Unknow Error");
- logger.error("Unknow Error ! ErrorCode "+je.getExcuteStatus()+" query_id=" + je.getQueryId() + " job_id =" + je.getJobId() + " excute_sql=" + je.getQuerySql());
- }
-
- }
-
- int number = 0;
- int z = 3;
- do {
- number = ms.updateProcesses(je);
- z--;
- }
- while (number != 1 && z >= 0);
- } catch (Exception e) {
- //status = 10
- je.setResultMessage("Database Error");
- ms.updateProcesses(je);
- logger.error("save database error job_id =" + je.getJobId()+" query_id=" + je.getQueryId() + e.toString());
-
- } finally {
- saveToMonitor(je);
- gc.getMapresult().remove(je.getQueryId());
- }
-
-
- }
-
- @Override
- public void reSet(JobEntity jobEntity) {
- //status = 0
- jobEntity.setState("PENDING");
- jobEntity.setResultMessage("Re Execution");
- ms.updateProcesses(jobEntity);
-
- }
-
- @Override
- public void killQuery(JobEntity jobEntity) throws IOException {
-
- cs.queryForCancel(jobEntity.getQueryId());
-
- }
-
- @Override
- public void updateProcessMessage(JobEntity job) throws IOException {
-
- HttpResult hr = cs.queryForProcess(job.getQueryId());
- if (hr!=null) {
- String rs = hr.getBody().trim();
- Map data = JSON.parseObject(rs);
-
- if (!rs.isEmpty() && !rs.equals("")) {
- List listdata = (List) data.get("data");
- if (null != listdata && listdata.size() > 0) {
- Map map = JSON.parseObject(JSON.toJSONString(listdata.get(0)));
- long read_rows = Long.parseLong(map.get("rows_read").toString());
- float elapsed = Float.parseFloat(map.get("elapsed").toString());
- double persent = Double.parseDouble(map.get("percent").toString());
- int process = (int) (persent * 100);
- job.setElapsed((int) elapsed);
- job.setRowsRead(read_rows);
- if(job.getDoneProgress()<process){
- job.setDoneProgress(process);
- }
- if (job.getRowsRead() != 0 || job.getElapsed() != 0) {
- ms.updateProcesses(job);
- }
- } else {
- logger.info("responsedata is null");
- }
- } else {
- logger.error("responsebody is null");
- }
- }
- else{
- logger.error("responsebody is null");
- }
-
- }
- /**
- * 结果存入hbase
- */
- public Boolean saveToHbase(JobEntity entity) {
- int k = 3;
- Boolean isok = false;
- do {
- try {
- k--;
- isok = hs.put(entity);
- break;
- } catch (Exception e) {
- logger.error("write HBase error retry count" + (3 - k) + e.toString());
- k--;
- }
- }
- while (k >= 0);
- return isok;
- }
-
- /**
- * promethus记录结果
- */
- public void saveToMonitor(JobEntity entity) {
- if (("DONE").equals(entity.getState()) && entity.getIsFailed() == 0 && entity.getIsValid() == 1) {
-
- mons.addSuccess();
- } else {
- mons.addFail();
- }
- }
-}
diff --git a/src/main/java/com/mesa/reportservice/service/impl/ExcuteserviceImpl.java b/src/main/java/com/mesa/reportservice/service/impl/ExcuteserviceImpl.java
deleted file mode 100644
index 8619b87..0000000
--- a/src/main/java/com/mesa/reportservice/service/impl/ExcuteserviceImpl.java
+++ /dev/null
@@ -1,114 +0,0 @@
-package com.mesa.reportservice.service.impl;
-
-import cn.hutool.log.Log;
-import com.alibaba.fastjson.JSON;
-import com.mesa.reportservice.bean.HttpResult;
-import com.mesa.reportservice.bean.JobEntity;
-import com.mesa.reportservice.configuration.GlobelConfig;
-import com.mesa.reportservice.service.ClickhouseService;
-import com.mesa.reportservice.service.ExcuteService;
-import com.mesa.reportservice.service.MysqlService;
-import io.netty.channel.ConnectTimeoutException;
-import org.springframework.beans.factory.annotation.Autowired;
-import org.springframework.stereotype.Service;
-
-import java.net.SocketTimeoutException;
-import java.util.Map;
-
-
-/**
- * Created by wk1 on 2020/1/8.
- */
-@Service
-public class ExcuteserviceImpl implements ExcuteService {
- Log logger = Log.get();
-
-
- @Autowired
- private ClickhouseService cs;
- @Autowired
- private MysqlService ms;
- @Autowired
- private GlobelConfig gc;
-
- @Override
- public void excuteCkTask(JobEntity job) {
-
-
- logger.info("execute queryid=" + job.getQueryId() + " sql=" + job.getQuerySql() + "mapresult size=" + gc.getMapresult().size());
- HttpResult hr = new HttpResult();
- int k = 3;
- do {
- try {
- hr = cs.queryForExcute(job.getQuerySql());
- if (hr != null) {
- Map mapresult = JSON.parseObject(hr.getBody());
- int query_status = Integer.parseInt(mapresult.get("status").toString());
- logger.info("httpcode=" + hr.getCode() +" status="+query_status);
- if (hr.getCode() == 200 && query_status == 200) {
-
- k = 0;
- mapresult = JSON.parseObject(hr.getBody());
-
- Map rows = (Map) mapresult.get("statistics");
- job.setResultRows(Long.parseLong(rows.get("result_rows").toString()));
- job.setRowsRead(Long.parseLong(rows.get("rows_read").toString()));
- job.setBytesRead(Long.parseLong(rows.get("bytes_read").toString()));
- job.setResultBytes(Long.parseLong(rows.get("result_bytes").toString()));
- job.setResult(hr.getBody());
- job.setElapsed((int) Float.parseFloat(rows.get("elapsed").toString()));
- job.setExcuteStatus(Integer.parseInt(mapresult.get("code").toString()));
- logger.info("success resultid = " + job.getJobId() + " queryid=" + job.getQueryId() + " sql=" + job.getQuerySql());
-
- } else {
- k = 0;
- job.setExcuteStatus(Integer.parseInt(mapresult.get("code").toString()));
- logger.error("excute sql Error ");
- }
- } else {
- throw new SocketTimeoutException();
- }
- } catch (SocketTimeoutException e) {
- k--;
- job.setResultMessage(e.toString());
- if (k == 0) {
- job.setExcuteStatus(500001);
- job.setResultMessage("SQL Execution Error excute query time out");
- logger.info("timeout resultid = " + job.getJobId() + " queryid=" + job.getQueryId() + " sql=" + job.getQuerySql());
- } else {
- logger.info("Socket warn " + e.toString() + "retry time " + (3 - k));
- }
-
-
- } catch (ConnectTimeoutException e) {
-
- job.setExcuteStatus(555999);
- job.setResultMessage(e.toString());
- logger.error("Unknow Error" + e.toString());
- k = 0;
-
- } catch (OutOfMemoryError e) {
-
- job.setExcuteStatus(555999);
- job.setResultMessage("result too large");
- logger.error("outofmemery Error" + e.toString());
- k = 0;
- } catch (Exception e) {
- job.setExcuteStatus(555999);
- job.setResultMessage(e.toString());
- logger.error("Unknow Error" + e.toString());
- k = 0;
- }
- try {
- cs.queryForCancel(job.getQueryId());
- } catch (Exception e) {
- logger.error("Kill Query Error" + e.toString());
- }
-
- }
- while (k > 0);
- job.setState("DONE");
- job.setEndTime(System.currentTimeMillis()/1000);
- }
-
-}
diff --git a/src/main/java/com/mesa/reportservice/service/impl/ExecuteProcessServiceImpl.java b/src/main/java/com/mesa/reportservice/service/impl/ExecuteProcessServiceImpl.java
new file mode 100644
index 0000000..4e514d1
--- /dev/null
+++ b/src/main/java/com/mesa/reportservice/service/impl/ExecuteProcessServiceImpl.java
@@ -0,0 +1,176 @@
+package com.mesa.reportservice.service.impl;
+
+import cn.hutool.log.Log;
+import cn.hutool.log.LogFactory;
+import com.alibaba.fastjson.JSON;
+import com.mesa.reportservice.bean.HttpResult;
+import com.mesa.reportservice.bean.JobEntity;
+import com.mesa.reportservice.configuration.SchedulerProperties;
+import com.mesa.reportservice.enums.JobStates;
+import com.mesa.reportservice.exception.SQSCode;
+import com.mesa.reportservice.exception.SQSException;
+import com.mesa.reportservice.service.*;
+import com.mesa.reportservice.util.Constant;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.stereotype.Service;
+
+import java.text.MessageFormat;
+import java.util.List;
+import java.util.Map;
+
+/**
+ *
+ * @author lijinyang
+ * @date 2024/1/24
+ */
+@Service
+public class ExecuteProcessServiceImpl implements ExecuteProcessService {
+
+ private static final Log logger = LogFactory.get();
+
+ @Autowired
+ private QueryGatewayService queryGatewayService;
+
+ @Autowired
+ private JobService jobService;
+
+ @Autowired
+ private HBaseService hBaseService;
+
+ @Autowired
+ private MonitorService monitorService;
+
+ @Autowired
+ private SchedulerProperties schedulerProperties;
+
+ @Override
+ public void updateResultMessage(JobEntity jobEntity) {
+ String msg = "";
+ jobEntity.setState(JobStates.DONE.getValue());
+ jobEntity.setEndTime(System.currentTimeMillis()/1000);
+ jobEntity.setDoneProgress(1.00f);
+ jobEntity.setIsFailed(1);
+ try {
+ if (jobEntity.getIsValid() == 0) {
+ jobEntity.setResultMessage(Constant.CANCEL);
+ } else {
+ if (jobEntity.getExecuteStatus() == Constant.JOB_SUCCESS) {
+ Boolean isOk = saveToHBase(jobEntity);
+ if (isOk) {
+ jobEntity.setResultMessage(Constant.OK);
+ jobEntity.setIsFailed(0);
+ logger.info(MessageFormat.format(Constant.LOG_SUCCESS_SAVE_TO_HBASE,jobEntity.getQueryId(),jobEntity.getJobId(),jobEntity.getQuerySql()));
+ } else {
+ jobEntity.setResultMessage(Constant.WRITE_DATA_ERROR);
+ monitorService.addFail();
+ msg = MessageFormat.format(SQSCode.TEMPLATE_SAVE_HBASE_ERROR.getMsg(),jobEntity.getExecuteStatus(),jobEntity.getQueryId(),jobEntity.getJobId(),jobEntity.getQuerySql());
+ logger.error(msg);
+ }
+ } else if (jobEntity.getExecuteStatus() >= Constant.JOB_BAD_REQUEST_THRESHOLD && jobEntity.getExecuteStatus() < Constant.JOB_INTERNAL_SERVER_ERROR_THRESHOLD) {
+ jobEntity.setResultMessage(Constant.PARAM_SYNTAX_ERROR);
+ msg = MessageFormat.format(SQSCode.PARAM_SYNTAX_ERROR.getMsg(),jobEntity.getExecuteStatus(),jobEntity.getQueryId(),jobEntity.getJobId(),jobEntity.getQuerySql());
+ logger.error(msg);
+ } else if (jobEntity.getExecuteStatus() == Constant.JOB_SQL_EXECUTION_ERROR_CODE) {
+ jobEntity.setResultMessage(Constant.SQL_EXECUTION_ERROR);
+ msg = MessageFormat.format(SQSCode.TEMPLATE_SQL_EXECUTION_ERROR.getMsg(),jobEntity.getExecuteStatus(),jobEntity.getQueryId(),jobEntity.getJobId(),jobEntity.getQuerySql());
+ logger.error(msg);
+ } else {
+ jobEntity.setResultMessage(Constant.UNKNOWN_ERROR);
+ msg = MessageFormat.format(SQSCode.UNKNOWN_ERROR.getMsg(),jobEntity.getQueryId(),jobEntity.getJobId(),jobEntity.getQuerySql());
+ logger.error(msg);
+ }
+ }
+ int number = 0;
+ int z = 3;
+ do {
+ number = jobService.updateProcesses(jobEntity);
+ z--;
+ }
+ while (number != 1 && z >= 0);
+ } catch (Exception e) {
+ jobEntity.setResultMessage(Constant.DATABASE_ERROR);
+ jobService.updateProcesses(jobEntity);
+ msg = MessageFormat.format(SQSCode.TEMPLATE_SAVE_DATABASE_ERROR.getMsg(),jobEntity.getQueryId(),jobEntity.getJobId(),e.toString());
+ logger.error(msg);
+ } finally {
+ saveToMonitor(jobEntity);
+ schedulerProperties.getMapResult().remove(jobEntity.getQueryId());
+ }
+ }
+
+ @Override
+ public void reSet(JobEntity jobEntity){
+ jobEntity.setState(JobStates.PENDING.getValue());
+ jobEntity.setResultMessage(Constant.RE_EXECUTION);
+ jobService.updateProcesses(jobEntity);
+ }
+
+ @Override
+ public void killQuery(JobEntity jobEntity){
+ queryGatewayService.deleteJob(jobEntity.getQueryId());
+ }
+
+ @Override
+ public void updateProcessMessage(JobEntity jobEntity){
+ HttpResult httpResult = queryGatewayService.queryJobProcess(jobEntity.getQueryId());
+ if (httpResult!=null) {
+ String result = httpResult.getBody().trim();
+ Map data = JSON.parseObject(result);
+ if (!result.isEmpty() && !result.equals("")) {
+ List listData = (List) data.get("data");
+ if (null != listData && listData.size() > 0) {
+ Map map = JSON.parseObject(JSON.toJSONString(listData.get(0)));
+ long readRows = Long.parseLong(map.get("rows_read").toString());
+ float elapsed = Float.parseFloat(map.get("elapsed").toString());
+ double percent = Double.parseDouble(map.get("percent").toString());
+ int process = (int) (percent * 100);
+ jobEntity.setElapsed((int) elapsed);
+ jobEntity.setRowsRead(readRows);
+ if(jobEntity.getDoneProgress()<process){
+ jobEntity.setDoneProgress(process);
+ }
+ if (jobEntity.getRowsRead() != 0 || jobEntity.getElapsed() != 0) {
+ jobService.updateProcesses(jobEntity);
+ }
+ } else {
+ logger.error(SQSCode.RESPONSE_DATA_ISNULL.getMsg());
+ throw new SQSException(SQSCode.RESPONSE_DATA_ISNULL);
+ }
+ } else {
+ logger.error(SQSCode.RESPONSE_BODY_ISNULL.getMsg());
+ throw new SQSException(SQSCode.RESPONSE_BODY_ISNULL);
+ }
+ } else {
+ logger.error(SQSCode.RESPONSE_BODY_ISNULL.getMsg());
+ throw new SQSException(SQSCode.RESPONSE_BODY_ISNULL);
+ }
+ }
+ /**
+ * 结果存入hbase
+ */
+ public Boolean saveToHBase(JobEntity jobEntity) {
+ int k = 3;
+ Boolean isok = false;
+ do {
+ try {
+ k--;
+ isok = hBaseService.put(jobEntity);
+ break;
+ } catch (Exception e) {
+ logger.error(MessageFormat.format(SQSCode.WRITE_HBASE_ERROR.getMsg(),(3 - k),e.toString()));
+ k--;
+ }
+ }
+ while (k >= 0);
+ return isok;
+ }
+
+
+ public void saveToMonitor(JobEntity jobEntity) {
+ if ((JobStates.DONE.getValue()).equals(jobEntity.getState()) && jobEntity.getIsFailed() == 0 && jobEntity.getIsValid() == 1) {
+ monitorService.addSuccess();
+ } else {
+ monitorService.addFail();
+ }
+ }
+}
diff --git a/src/main/java/com/mesa/reportservice/service/impl/ExecuteServiceImpl.java b/src/main/java/com/mesa/reportservice/service/impl/ExecuteServiceImpl.java
new file mode 100644
index 0000000..594d42b
--- /dev/null
+++ b/src/main/java/com/mesa/reportservice/service/impl/ExecuteServiceImpl.java
@@ -0,0 +1,109 @@
+package com.mesa.reportservice.service.impl;
+
+import cn.hutool.log.Log;
+import cn.hutool.log.LogFactory;
+import com.alibaba.fastjson.JSON;
+import com.mesa.reportservice.bean.HttpResult;
+import com.mesa.reportservice.bean.JobEntity;
+import com.mesa.reportservice.configuration.SchedulerProperties;
+import com.mesa.reportservice.enums.JobStates;
+import com.mesa.reportservice.exception.SQSCode;
+import com.mesa.reportservice.exception.SQSException;
+import com.mesa.reportservice.service.QueryGatewayService;
+import com.mesa.reportservice.service.ExecuteService;
+import com.mesa.reportservice.util.Constant;
+import io.netty.channel.ConnectTimeoutException;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.http.HttpStatus;
+import org.springframework.stereotype.Service;
+
+import java.net.SocketTimeoutException;
+import java.util.Map;
+
+
+/**
+ *
+ * @author lijinyang
+ * @date 2024/1/24
+ */
+@Service
+public class ExecuteServiceImpl implements ExecuteService {
+
+ private static final Log logger = LogFactory.get();
+
+ @Autowired
+ private QueryGatewayService queryGatewayService;
+
+ @Autowired
+ private SchedulerProperties schedulerProperties;
+
+ @Override
+ public void queryQgw(JobEntity job) {
+ logger.info("execute queryId=" + job.getQueryId() + " sql=" + job.getQuerySql() + "mapResult size=" + schedulerProperties.getMapResult().size());
+ HttpResult httpResult = null;
+ int k = 3;
+ do {
+ try {
+ httpResult = queryGatewayService.queryJob(job.getQuerySql());
+ if (httpResult != null) {
+ Map mapresult = JSON.parseObject(httpResult.getBody());
+ int queryStatus = Integer.parseInt(mapresult.get("status").toString());
+ logger.info("httpCode=" + httpResult.getCode() +" status="+queryStatus);
+ if (httpResult.getCode() == HttpStatus.OK.value() && queryStatus == HttpStatus.OK.value()) {
+ k = 0;
+ mapresult = JSON.parseObject(httpResult.getBody());
+ Map rows = (Map) mapresult.get("statistics");
+ job.setResultRows(Long.parseLong(rows.get("result_rows").toString()));
+ job.setRowsRead(Long.parseLong(rows.get("rows_read").toString()));
+ job.setBytesRead(Long.parseLong(rows.get("bytes_read").toString()));
+ job.setResultBytes(Long.parseLong(rows.get("result_bytes").toString()));
+ job.setResult(httpResult.getBody());
+ job.setElapsed((int) Float.parseFloat(rows.get("elapsed").toString()));
+ job.setExecuteStatus(Integer.parseInt(mapresult.get("code").toString()));
+ logger.info("success resultId = " + job.getJobId() + " queryId=" + job.getQueryId() + " sql=" + job.getQuerySql());
+ } else {
+ k = 0;
+ job.setExecuteStatus(Integer.parseInt(mapresult.get("code").toString()));
+ logger.error("SQL Execution Error ");
+ }
+ } else {
+ throw new SocketTimeoutException();
+ }
+ } catch (SocketTimeoutException e) {
+ k--;
+ job.setResultMessage(e.toString());
+ if (k == 0) {
+ job.setExecuteStatus(Constant.JOB_EXECUTION_TIMEOUT);
+ job.setResultMessage(Constant.QUERY_TIME_OUT);
+ logger.info("timeout resultId = " + job.getJobId() + " queryId=" + job.getQueryId() + " sql=" + job.getQuerySql());
+ } else {
+ logger.info("Socket warn " + e.toString() + "retry time " + (3 - k));
+ }
+ } catch (ConnectTimeoutException e) {
+ job.setExecuteStatus(Constant.JOB_ERROR);
+ job.setResultMessage(e.toString());
+ logger.error(SQSCode.UNKNOWN_ERROR.getMsg() + e.toString());
+ k = 0;
+ } catch (OutOfMemoryError e) {
+ job.setExecuteStatus(Constant.JOB_ERROR);
+ job.setResultMessage(Constant.RESULT_TOO_LARGE);
+ logger.error(SQSCode.OUT_OF_MEMORY_ERROR.getMsg() + e.toString());
+ k = 0;
+ } catch (Exception e) {
+ job.setExecuteStatus(Constant.JOB_ERROR);
+ job.setResultMessage(e.toString());
+ logger.error(SQSCode.UNKNOWN_ERROR.getMsg() + e.toString());
+ k = 0;
+ }
+ try {
+ queryGatewayService.deleteJob(job.getQueryId());
+ } catch (Exception e) {
+ logger.error(SQSCode.KILL_QUERY_ERROR.getMsg() + e.toString());
+ throw new SQSException(SQSCode.KILL_QUERY_ERROR);
+ }
+ }
+ while (k > 0);
+ job.setState(JobStates.DONE.getValue());
+ job.setEndTime(System.currentTimeMillis()/1000);
+ }
+}
diff --git a/src/main/java/com/mesa/reportservice/service/impl/HbaseServiceImpl.java b/src/main/java/com/mesa/reportservice/service/impl/HBaseServiceImpl.java
index 90cafb3..2ec411d 100644
--- a/src/main/java/com/mesa/reportservice/service/impl/HbaseServiceImpl.java
+++ b/src/main/java/com/mesa/reportservice/service/impl/HBaseServiceImpl.java
@@ -2,9 +2,11 @@ package com.mesa.reportservice.service.impl;
import cn.hutool.core.io.IoUtil;
import cn.hutool.log.Log;
+import cn.hutool.log.LogFactory;
import com.mesa.reportservice.bean.JobEntity;
-import com.mesa.reportservice.configuration.HbaseProperties;
-import com.mesa.reportservice.service.HbaseService;
+import com.mesa.reportservice.configuration.HBaseProperties;
+import com.mesa.reportservice.exception.SQSException;
+import com.mesa.reportservice.service.HBaseService;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.Put;
@@ -17,33 +19,30 @@ import java.io.IOException;
/**
- * Created by wk1 on 2019/5/15.
+ *
+ * @author lijinyang
+ * @date 2024/1/24
*/
-
@Service
-public class HbaseServiceImpl implements HbaseService {
+public class HBaseServiceImpl implements HBaseService {
@Autowired
- private Connection hbaseConnection;
+ private Connection hBaseConnection;
@Autowired
- private HbaseProperties hbproperties;
- Log logger = Log.get();
+ private HBaseProperties hBaseProperties;
+
+ private static final Log logger = LogFactory.get();
@Override
public Boolean put(JobEntity jobEntity) {
-
-
Boolean status = false;
Table table = null;
try {
- table = hbaseConnection.getTable(TableName.valueOf(hbproperties.getTable()));
+ table = hBaseConnection.getTable(TableName.valueOf(hBaseProperties.getTable()));
Put put = new Put(Bytes.toBytes(jobEntity.getJobId()));
put.addColumn(Bytes.toBytes("response"), Bytes.toBytes("result"), Bytes.toBytes(jobEntity.getResult()));
-
- put.addColumn(Bytes.toBytes("detail"), Bytes.toBytes("excute_sql"), Bytes.toBytes(jobEntity.getQuerySql()));
- //put.addColumn(Bytes.toBytes("detail"), Bytes.toBytes("format_sql"), Bytes.toBytes(jobEntity.getFormatSql()));
-
+ put.addColumn(Bytes.toBytes("detail"), Bytes.toBytes("execute_sql"), Bytes.toBytes(jobEntity.getQuerySql()));
if (jobEntity.getRowsRead() != 0) {
put.addColumn(Bytes.toBytes("detail"), Bytes.toBytes("read_rows"), Bytes.toBytes(jobEntity.getRowsRead()));
}
@@ -53,8 +52,8 @@ public class HbaseServiceImpl implements HbaseService {
if (jobEntity.getMemoryUsage() != null) {
put.addColumn(Bytes.toBytes("detail"), Bytes.toBytes("memory_usage"), Bytes.toBytes(jobEntity.getMemoryUsage()));
}
- if (hbproperties.getCell_ttl_d() != null) {
- put.setTTL(hbproperties.getCell_ttl_d() * 86400 * 1000);
+ if (hBaseProperties.getCellTtlD() != null) {
+ put.setTTL(convertDaysToMilliseconds(hBaseProperties.getCellTtlD()));
}
table.put(put);
status = true;
@@ -66,5 +65,5 @@ public class HbaseServiceImpl implements HbaseService {
return status;
}
-
+ private long convertDaysToMilliseconds(Integer days) { return 1000L*3600*24*days; }
}
diff --git a/src/main/java/com/mesa/reportservice/service/impl/JobServiceImpl.java b/src/main/java/com/mesa/reportservice/service/impl/JobServiceImpl.java
new file mode 100644
index 0000000..100b93f
--- /dev/null
+++ b/src/main/java/com/mesa/reportservice/service/impl/JobServiceImpl.java
@@ -0,0 +1,51 @@
+package com.mesa.reportservice.service.impl;
+
+import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl;
+import com.mesa.reportservice.bean.JobEntity;
+import com.mesa.reportservice.mapper.JobMapper;
+import com.mesa.reportservice.service.JobService;
+import org.springframework.stereotype.Service;
+
+import java.time.LocalDate;
+import java.time.ZoneOffset;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+/**
+ *
+ * @author lijinyang
+ * @date 2024/1/24
+ */
+@Service
+public class JobServiceImpl extends ServiceImpl<JobMapper, JobEntity> implements JobService {
+
+ @Override
+ public List<JobEntity> getRunningJob() {
+ return this.baseMapper.getRunningJob();
+ }
+
+ @Override
+ public List<JobEntity> getJobTask(int rows) {
+ Map map = new HashMap<String,Object>();
+ map.put("rows", rows);
+ return this.baseMapper.getJobTask(map);
+ }
+
+ @Override
+ public int updateProcesses(JobEntity job){
+ long currentTime = System.currentTimeMillis()/1000;
+ job.setLastUpdateTime(currentTime);
+ return this.baseMapper.updateProcesses(job);
+ }
+
+ @Override
+ public Map<String,Long> getJobCount(){
+ LocalDate currentDate = LocalDate.now();
+ long lastUpdateTime = currentDate.atStartOfDay(ZoneOffset.UTC).toInstant().toEpochMilli();
+ Map map = new HashMap<String,Object>();
+ map.put("lastUpdateTime", lastUpdateTime);
+ return this.baseMapper.getJobCount(map);
+ }
+
+} \ No newline at end of file
diff --git a/src/main/java/com/mesa/reportservice/service/impl/MonitorServiceImpl.java b/src/main/java/com/mesa/reportservice/service/impl/MonitorServiceImpl.java
index 008d207..0e57d84 100644
--- a/src/main/java/com/mesa/reportservice/service/impl/MonitorServiceImpl.java
+++ b/src/main/java/com/mesa/reportservice/service/impl/MonitorServiceImpl.java
@@ -1,61 +1,75 @@
package com.mesa.reportservice.service.impl;
-import cn.hutool.log.Log;
-import com.mesa.reportservice.configuration.GlobelConfig;
-import com.mesa.reportservice.configuration.ZkProperties;
+import com.alibaba.fastjson.JSONArray;
+import com.mesa.reportservice.bean.MonitorEntity;
+import com.mesa.reportservice.configuration.SchedulerProperties;
+import com.mesa.reportservice.service.JobService;
import com.mesa.reportservice.service.MonitorService;
-import com.mesa.reportservice.service.ZkService;
+import com.mesa.reportservice.service.ZookeeperService;
import io.micrometer.core.instrument.*;
-import org.apache.commons.collections.map.HashedMap;
-import org.apache.curator.framework.CuratorFramework;
-import org.apache.curator.framework.api.ExistsBuilder;
-import org.apache.zookeeper.CreateMode;
-import org.apache.zookeeper.data.Stat;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
-import org.springframework.util.ObjectUtils;
import javax.annotation.PostConstruct;
-import java.net.InetAddress;
-import java.util.List;
import java.util.Map;
/**
- * Created by wk1 on 2020/1/6.
+ *
+ * @author lijinyang
+ * @date 2024/1/24
*/
@Service
public class MonitorServiceImpl implements MonitorService {
- Log logger = Log.get();
+ @Autowired
+ private JobService mysqlService;
@Autowired
- MeterRegistry registry;
+ private ZookeeperService zookeeperService;
- private Counter counter_report_success;
- private Counter counter_report_fail;
+ @Autowired
+ private SchedulerProperties schedulerProperties;
- @PostConstruct
- public void init(){
+ @Autowired
+ private MeterRegistry meterRegistry;
+
+ private Counter counterReportSuccess;
- counter_report_success = registry.counter("report_success_count", "reportJob", "report_success");
+ private Counter counterReportFail;
- counter_report_fail = registry.counter("report_fail_count", "reportJob", "report_error");
+ @PostConstruct
+ public void init(){
+ counterReportSuccess = meterRegistry.counter("report_success_count", "reportJob", "report_success");
+ counterReportFail = meterRegistry.counter("report_fail_count", "reportJob", "report_error");
}
+
@Override
public void addSuccess() {
- counter_report_success.increment();
+ counterReportSuccess.increment();
}
+
@Override
public void addFail() {
- counter_report_fail.increment();
+ counterReportFail.increment();
}
-
- public MeterRegistry getRegistry() {
- return registry;
- }
-
- public void setRegistry(MeterRegistry registry) {
- this.registry = registry;
+ @Override
+ public String getJobCount() {
+ String json="";
+ MonitorEntity monitorEntity = new MonitorEntity();
+ Map<String,Long> numMap = mysqlService.getJobCount();
+ monitorEntity.setQueueJobNum(numMap.get("queueNum"));
+ monitorEntity.setExecutingJobNum(numMap.get("executingNum"));
+ monitorEntity.setTodaySuccessJobNum(numMap.get("todaySuccessNum"));
+ monitorEntity.setTodayErrorJobNum(numMap.get("todayErrorNum"));
+ monitorEntity.setJobList(schedulerProperties.getMapResult());
+ if(zookeeperService.isMaster()){
+ monitorEntity.setStatus("active");
+ }else{
+ monitorEntity.setStatus("standby");
+ }
+ Object obj = JSONArray.toJSON(monitorEntity);
+ json = obj.toString();
+ return json;
}
}
diff --git a/src/main/java/com/mesa/reportservice/service/impl/MysqlServiceImpl.java b/src/main/java/com/mesa/reportservice/service/impl/MysqlServiceImpl.java
deleted file mode 100644
index bfc95ac..0000000
--- a/src/main/java/com/mesa/reportservice/service/impl/MysqlServiceImpl.java
+++ /dev/null
@@ -1,69 +0,0 @@
-package com.mesa.reportservice.service.impl;
-
-import cn.hutool.log.Log;
-import com.mesa.reportservice.bean.JobEntity;
-import com.mesa.reportservice.mapper.ReportResultMapper;
-import com.mesa.reportservice.service.MysqlService;
-import com.mesa.reportservice.util.DateUtil;
-import org.springframework.beans.factory.annotation.Autowired;
-import org.springframework.stereotype.Service;
-
-import java.sql.Timestamp;
-import java.text.SimpleDateFormat;
-import java.time.LocalDate;
-import java.time.ZoneOffset;
-import java.util.Date;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-
-/**
- * Created by wk1 on 2020/1/2.
- */
-@Service
-public class MysqlServiceImpl implements MysqlService {
- Log logger = Log.get();
-
- @Autowired
- public ReportResultMapper rrm;
-
- @Override
- public List<JobEntity> getJobForExcute() {
-
- return rrm.getJobForExcute();
-
- }
-
- @Override
- public List<JobEntity> getJobTask(int rows) {
-
-
- String current_time = DateUtil.getDate();
- HashMap map = new HashMap();
- map.put("issuedtime", current_time);
- map.put("rows", rows);
-
- return rrm.getJobTask(map);
-
- }
-
- @Override
- public int updateProcesses(JobEntity job){
-
- long currentTime = System.currentTimeMillis()/1000;
- job.setLastUpdateTime(currentTime);
- return rrm.updateProcesses(job);
- }
-
-
-
- @Override
- public Map<String,Long> getJobCount() {
- LocalDate currentDate = LocalDate.now();
- long current_date = currentDate.atStartOfDay(ZoneOffset.UTC).toInstant().toEpochMilli();
- HashMap map = new HashMap();
- map.put("lastUpdateTime", current_date);
- return rrm.getJobCount(map);
- }
-
-} \ No newline at end of file
diff --git a/src/main/java/com/mesa/reportservice/service/impl/ClickhouseServiceImpl.java b/src/main/java/com/mesa/reportservice/service/impl/QueryGatewayServiceImpl.java
index be22dcb..2831c2e 100644
--- a/src/main/java/com/mesa/reportservice/service/impl/ClickhouseServiceImpl.java
+++ b/src/main/java/com/mesa/reportservice/service/impl/QueryGatewayServiceImpl.java
@@ -3,10 +3,12 @@ package com.mesa.reportservice.service.impl;
import cn.hutool.core.io.IoUtil;
import cn.hutool.core.util.URLUtil;
import cn.hutool.log.Log;
+import cn.hutool.log.LogFactory;
import com.alibaba.fastjson.JSON;
import com.mesa.reportservice.bean.HttpResult;
-import com.mesa.reportservice.configuration.ClickhouseConfig;
-import com.mesa.reportservice.service.ClickhouseService;
+import com.mesa.reportservice.configuration.QueryGatewayProperties;
+import com.mesa.reportservice.service.QueryGatewayService;
+import com.mesa.reportservice.util.Constant;
import org.apache.http.client.config.RequestConfig;
import org.apache.http.client.methods.CloseableHttpResponse;
import org.apache.http.client.methods.HttpDelete;
@@ -16,187 +18,147 @@ import org.apache.http.impl.client.CloseableHttpClient;
import org.apache.http.util.EntityUtils;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
+import org.springframework.http.HttpStatus;
import org.springframework.stereotype.Service;
import java.io.IOException;
import java.io.UnsupportedEncodingException;
import java.net.URLEncoder;
+import java.nio.charset.StandardCharsets;
+import java.text.MessageFormat;
import java.util.List;
import java.util.Map;
/**
- * Created by wk1 on 2019/5/15.
+ *
+ * @author lijinyang
+ * @date 2024/1/24
*/
-
@Service
-public class ClickhouseServiceImpl implements ClickhouseService {
+public class QueryGatewayServiceImpl implements QueryGatewayService {
@Autowired
private CloseableHttpClient httpClient;
+
@Autowired
- private ClickhouseConfig clickhouseConfig;
+ private QueryGatewayProperties queryGatewayProperties;
@Autowired
@Qualifier("RequestShortConfig")
- private RequestConfig RequestshortConfig;
+ private RequestConfig RequestShortConfig;
@Autowired
@Qualifier("RequestLongConfig")
private RequestConfig RequestLongConfig;
- Log logger = Log.get();
+
+ private static final Log logger = LogFactory.get();
@Override
public String getQueryId(String query) {
-
CloseableHttpResponse response = null;
- String query_id = "";
+ String queryId = "";
try {
- String url = URLUtil.normalize(clickhouseConfig.getGateway_ip().trim() + "/v1/sql/query/query_id?query=");
+ String url = URLUtil.normalize(MessageFormat.format(Constant.URL_GET_QUERY_ID,queryGatewayProperties.getGatewayIp().trim()));
String sql = null;
sql = URLEncoder.encode(query, "utf8").replaceAll("\\+", "%20");
url = url + sql;
HttpPost httpPost = new HttpPost(url);
// 加入配置信息
- httpPost.setConfig(RequestshortConfig);
+ httpPost.setConfig(RequestShortConfig);
response = this.httpClient.execute(httpPost);
- if (response.getStatusLine().getStatusCode() != 200) {
+ if (response.getStatusLine().getStatusCode() != HttpStatus.OK.value()) {
throw new IOException();
} else {
- Map data = JSON.parseObject(EntityUtils.toString(response.getEntity(), "UTF-8"));
- List listdata = (List) data.get("data");
- Map map = JSON.parseObject(JSON.toJSONString(listdata.get(0)));
- query_id = map.get("queryId").toString();
-
+ Map data = JSON.parseObject(EntityUtils.toString(response.getEntity(), StandardCharsets.UTF_8));
+ List listData = (List) data.get("data");
+ Map map = JSON.parseObject(JSON.toJSONString(listData.get(0)));
+ queryId = map.get("queryId").toString();
}
} catch (IOException e) {
logger.error(e.toString());
-
} finally {
IoUtil.close(response);
}
- return query_id;
+ return queryId;
}
/**
- * 带参数的post请求
+ * 带参数的get请求
*
* @return
* @throws Exception
*/
@Override
- public HttpResult queryForExcute(String query) throws UnsupportedEncodingException {
- // 声明httpPost请求
+ public HttpResult queryJob(String query) throws UnsupportedEncodingException {
+ // 声明httpGet请求
CloseableHttpResponse response = null;
// 发起请求
- HttpResult rs = null;
+ HttpResult httpResult = null;
try {
- String url = URLUtil.normalize(clickhouseConfig.getGateway_ip().trim() + "/sql?rawSQL=");
+ String url = URLUtil.normalize(MessageFormat.format(Constant.URL_QUERY_FOR_EXECUTE,queryGatewayProperties.getGatewayIp().trim()));
query = URLEncoder.encode(query, "utf8").replaceAll("\\+", "%20");
- String jobsql = url + query;
- HttpGet httpGet = new HttpGet(jobsql);
+ String jobSql = url + query;
+ HttpGet httpGet = new HttpGet(jobSql);
// 加入配置信息
httpGet.setConfig(RequestLongConfig);
-
-
response = this.httpClient.execute(httpGet);
- rs = new HttpResult();
- rs.setCode(response.getStatusLine().getStatusCode());
- rs.setBody(EntityUtils.toString(response.getEntity(), "UTF-8"));
-
+ httpResult = new HttpResult();
+ httpResult.setCode(response.getStatusLine().getStatusCode());
+ httpResult.setBody(EntityUtils.toString(response.getEntity(), StandardCharsets.UTF_8));
} catch (IOException e) {
logger.error(e.toString());
-
} finally {
IoUtil.close(response);
}
- return rs;
+ return httpResult;
}
@Override
- public HttpResult queryForProcess(String queryId) {
- // 声明httpPost请求
+ public HttpResult queryJobProcess(String queryId) {
+ // 声明httpGet请求
CloseableHttpResponse response = null;
// 发起请求
- HttpResult rs = null;
+ HttpResult httpResult = null;
try {
- String url = URLUtil.normalize(clickhouseConfig.getGateway_ip().trim() + "/v1/sql/query/" + queryId + "/progress");
-
+ String url = URLUtil.normalize(MessageFormat.format(Constant.URL_QUERY_FOR_PROCESS,queryGatewayProperties.getGatewayIp().trim(),queryId));
HttpGet httpGet = new HttpGet(url);
// 加入配置信息
- httpGet.setConfig(RequestshortConfig);
-
-
+ httpGet.setConfig(RequestShortConfig);
response = this.httpClient.execute(httpGet);
- rs = new HttpResult();
- rs.setCode(response.getStatusLine().getStatusCode());
- rs.setBody(EntityUtils.toString(response.getEntity(), "UTF-8"));
+ httpResult = new HttpResult();
+ httpResult.setCode(response.getStatusLine().getStatusCode());
+ httpResult.setBody(EntityUtils.toString(response.getEntity(), StandardCharsets.UTF_8));
} catch (IOException e) {
logger.error(e.toString());
} finally {
IoUtil.close(response);
}
- return rs;
+ return httpResult;
}
@Override
- public HttpResult queryForCancel(String queryId) {
-
+ public HttpResult deleteJob(String queryId) {
CloseableHttpResponse response = null;
- HttpResult rs = null;
+ HttpResult httpResult = null;
try {
- String url = URLUtil.normalize(clickhouseConfig.getGateway_ip().trim() + "/v1/sql/query/" + queryId);
- // 声明httpPost请求
+ String url = URLUtil.normalize(MessageFormat.format(Constant.URL_QUERY_FOR_CANCEL,queryGatewayProperties.getGatewayIp().trim(),queryId));
+ // 声明httpDelete请求
HttpDelete HttpDelete = new HttpDelete(url);
// 加入配置信息
- HttpDelete.setConfig(RequestshortConfig);
-
+ HttpDelete.setConfig(RequestShortConfig);
// 发起请求
-
response = this.httpClient.execute(HttpDelete);
- rs = new HttpResult();
- rs.setCode(response.getStatusLine().getStatusCode());
- rs.setBody(EntityUtils.toString(response.getEntity(), "UTF-8"));
+ httpResult = new HttpResult();
+ httpResult.setCode(response.getStatusLine().getStatusCode());
+ httpResult.setBody(EntityUtils.toString(response.getEntity(), StandardCharsets.UTF_8));
} catch (IOException e) {
logger.error(e.toString());
} finally {
IoUtil.close(response);
}
- return rs;
- }
-
- public CloseableHttpClient getHttpClient() {
- return httpClient;
+ return httpResult;
}
-
- public void setHttpClient(CloseableHttpClient httpClient) {
- this.httpClient = httpClient;
- }
-
- public RequestConfig getRequestshortConfig() {
- return RequestshortConfig;
- }
-
- public void setRequestshortConfig(RequestConfig requestshortConfig) {
- RequestshortConfig = requestshortConfig;
- }
-
- public RequestConfig getRequestLongConfig() {
- return RequestLongConfig;
- }
-
- public void setRequestLongConfig(RequestConfig requestLongConfig) {
- RequestLongConfig = requestLongConfig;
- }
-
- /**
- * 不带参数post请求
- *
- * @param url
- * @return
- * @throws Exception
- */
-
}
diff --git a/src/main/java/com/mesa/reportservice/service/impl/ScheduledResultServiceImpl.java b/src/main/java/com/mesa/reportservice/service/impl/ScheduledResultServiceImpl.java
new file mode 100644
index 0000000..c43981b
--- /dev/null
+++ b/src/main/java/com/mesa/reportservice/service/impl/ScheduledResultServiceImpl.java
@@ -0,0 +1,180 @@
+package com.mesa.reportservice.service.impl;
+
+import cn.hutool.log.Log;
+import cn.hutool.log.LogFactory;
+import com.mesa.reportservice.bean.JobEntity;
+import com.mesa.reportservice.configuration.HttpClientPool;
+import com.mesa.reportservice.configuration.SchedulerProperties;
+import com.mesa.reportservice.configuration.ThreadPoolConfig;
+import com.mesa.reportservice.enums.JobStates;
+import com.mesa.reportservice.exception.SQSCode;
+import com.mesa.reportservice.exception.SQSException;
+import com.mesa.reportservice.service.*;
+import com.mesa.reportservice.util.Constant;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.stereotype.Service;
+
+import java.util.List;
+import java.util.Map;
+
+/**
+ *
+ * @author lijinyang
+ * @date 2024/1/24
+ */
+@Service
+public class ScheduledResultServiceImpl implements ScheduledResultService {
+
+ private static final Log logger = LogFactory.get();
+
+ @Autowired
+ private ThreadPoolConfig threadPoolConfig;
+
+ @Autowired
+ private JobService jobService;
+
+ @Autowired
+ private ExecuteService executeService;
+
+ @Autowired
+ private ExecuteProcessService executeProcessService;
+
+ @Autowired
+ private ZookeeperService zookeeperService;
+
+ @Autowired
+ private QueryGatewayService queryGatewayService;
+
+ @Autowired
+ private HttpClientPool httpClientPool;
+
+ @Autowired
+ private SchedulerProperties schedulerProperties;
+
+ @Override
+ public void execute() {
+ try {
+ if (zookeeperService.isMaster()) {
+ logger.info(Constant.LOG_START_VIEWING_RESULTS);
+ updateAbnormalJob();
+
+ updateRunningJob();
+
+ updatePendingJob();
+ } else {
+ if (schedulerProperties.getMapResult().size() > 0) {
+ for (Map.Entry<String, JobEntity> entry : schedulerProperties.getMapResult().entrySet()) {
+ logger.info("key = " + entry.getKey() + ", value = " + entry.getValue().getState());
+ executeProcessService.killQuery(entry.getValue());
+ }
+ schedulerProperties.getMapResult().clear();
+ }
+ }
+ } catch (Exception e) {
+ logger.error(e.toString());
+ throw new SQSException(e.toString());
+ }
+ }
+
+ private void updateAbnormalJob(){
+ //先查询数据库是否有异常状态任务,killquery
+ List<JobEntity> jobList = jobService.getRunningJob();
+ for (JobEntity jobEntity : jobList) {
+ String sql = jobEntity.getQuerySql().trim();
+ String queryId = queryGatewayService.getQueryId(sql);
+ jobEntity.setQueryId(queryId);
+ if (jobEntity.getIsValid() == 0) {
+ executeProcessService.killQuery(jobEntity);
+ schedulerProperties.getMapResult().get(jobEntity.getQueryId()).setIsValid(0);
+ } else if (!schedulerProperties.getMapResult().containsKey(jobEntity.getQueryId())) {
+ executeProcessService.reSet(jobEntity);
+ }
+ if (schedulerProperties.getMapResult().containsKey(jobEntity.getQueryId())) {
+ if (jobEntity.getIsValid() == 0) {
+ executeProcessService.killQuery(jobEntity);
+ schedulerProperties.getMapResult().get(jobEntity.getQueryId()).setIsValid(0);
+ }
+ } else {
+ executeProcessService.reSet(jobEntity);
+ }
+ }
+ }
+
+ private void updateRunningJob(){
+ //遍历内存中的任务对状态RUNNING的更新进度,其他更新数据库的状态
+ for (Map.Entry<String, JobEntity> entry : schedulerProperties.getMapResult().entrySet()) {
+ logger.info("key = " + entry.getKey() + ", value = " + entry.getValue().getState());
+ long currentTime = System.currentTimeMillis();
+ long executeTime = currentTime - entry.getValue().getStartTime();
+ logger.info("execute time=" + executeTime + "ttlTime=" + httpClientPool.getSocketTimeout());
+ if (JobStates.RUNNING.getValue().equals(entry.getValue().getState()) && executeTime > httpClientPool.getSocketTimeout()) {
+ entry.getValue().setState(JobStates.DONE.getValue());
+ entry.getValue().setExecuteStatus(Constant.JOB_EXECUTION_TIMEOUT);
+ executeProcessService.killQuery(entry.getValue());
+ executeProcessService.updateResultMessage(entry.getValue());
+ } else {
+ if (JobStates.RUNNING.getValue().equals(entry.getValue().getState())) {
+ executeProcessService.updateProcessMessage(entry.getValue());
+ } else {
+ executeProcessService.updateResultMessage(entry.getValue());
+ }
+ }
+ }
+ }
+
+ private void updatePendingJob() {
+ int rows = schedulerProperties.getJobThread() - schedulerProperties.getMapResult().size();
+ if (rows > 0) {
+ List<JobEntity> jobs = jobService.getJobTask(rows);
+ for (JobEntity job : jobs) {
+ logger.info(Constant.LOG_START_EXECUTING_TASKS);
+ long startTime = System.currentTimeMillis()/1000;
+ job.setStartTime(startTime);
+ String sql = job.getQuerySql().trim();
+ job.setQuerySql(sql);
+ job.setState(JobStates.RUNNING.getValue());
+ job.setExecuteStatus(1);
+ job.setResultMessage(Constant.EXECUTING);
+ job.setRowsRead(0L);
+ job.setElapsed(0);
+ job.setDoneProgress(0);
+ job.setResultRows(0L);
+ String queryId = "";
+ queryId = queryGatewayService.getQueryId(sql);
+ job.setQueryId(queryId);
+ if (queryId.equals("") ) {
+ job.setExecuteStatus(0);
+ job.setState(JobStates.DONE.getValue());
+ job.setEndTime(System.currentTimeMillis()/1000);
+ job.setDoneProgress(1.00f);
+ job.setIsFailed(1);
+ job.setResultMessage(Constant.UNKNOWN_ERROR);
+ }
+ if ((JobStates.RUNNING.getValue()).equals(job.getState())) {
+ if (jobService.updateProcesses(job) != 0) {
+ schedulerProperties.getMapResult().put(queryId, job);
+ threadPoolConfig.getThreadPoolTaskExecutor().execute(new Runnable() {
+ @Override
+ public void run() {
+ executeService.queryQgw(job);
+ }
+ });
+ } else {
+ logger.error(SQSCode.UPDATE_STATUS_ERROR.getMsg());
+ throw new SQSException(SQSCode.UPDATE_STATUS_ERROR);
+ }
+ } else {
+ if (jobService.updateProcesses(job) != 0) {
+ logger.error(SQSCode.TASK_CANNOT_EXECUTED.getMsg());
+ throw new SQSException(SQSCode.TASK_CANNOT_EXECUTED);
+ } else {
+ logger.error(SQSCode.UPDATE_STATUS_ERROR.getMsg());
+ throw new SQSException(SQSCode.UPDATE_STATUS_ERROR);
+ }
+ }
+ }
+ } else {
+ logger.info(Constant.LOG_NO_PENDING_TASKS);
+ }
+ }
+}
diff --git a/src/main/java/com/mesa/reportservice/service/impl/ZkServiceImpl.java b/src/main/java/com/mesa/reportservice/service/impl/ZookeeperServiceImpl.java
index fe30208..dd2c267 100644
--- a/src/main/java/com/mesa/reportservice/service/impl/ZkServiceImpl.java
+++ b/src/main/java/com/mesa/reportservice/service/impl/ZookeeperServiceImpl.java
@@ -1,62 +1,60 @@
package com.mesa.reportservice.service.impl;
import cn.hutool.log.Log;
-import com.mesa.reportservice.configuration.GlobelConfig;
-import com.mesa.reportservice.configuration.ZkProperties;
-import com.mesa.reportservice.service.ZkService;
+import cn.hutool.log.LogFactory;
+import com.mesa.reportservice.configuration.SchedulerProperties;
+import com.mesa.reportservice.configuration.ZookeeperProperties;
+import com.mesa.reportservice.service.ZookeeperService;
import org.apache.curator.framework.CuratorFramework;
-import org.apache.curator.framework.api.ExistsBuilder;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.data.Stat;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
-import java.net.InetAddress;
+import java.nio.charset.StandardCharsets;
/**
- * Created by wk1 on 2020/1/6.
+ *
+ * @author lijinyang
+ * @date 2024/1/24
*/
@Service
-public class ZkServiceImpl implements ZkService {
+public class ZookeeperServiceImpl implements ZookeeperService {
+
@Autowired
private CuratorFramework curatorConnect;
+
@Autowired
- private ZkProperties zp;
- Log logger = Log.get();
+ private ZookeeperProperties zookeeperProperties;
+ private static final Log logger = LogFactory.get();
@Override
public boolean isMaster() {
-
-
try {
- if (zp.getOpen() == 0) {
+ if (zookeeperProperties.getOpen() == 0) {
boolean isZkCuratorStarted = curatorConnect.isStarted();
String nodePath = "/masterip";
logger.debug("the current state of the client: " + (isZkCuratorStarted ? "connecting..." : "closed..."));
Stat statExist = curatorConnect.checkExists().forPath(nodePath);
if (statExist == null) {
- byte[] data = GlobelConfig.zkuuid.getBytes(); // 节点数据
-
- String result = curatorConnect.create().creatingParentsIfNeeded() // 创建父节点,也就是会递归创建
- .withMode(CreateMode.EPHEMERAL) // 节点类型
+ // 节点数据
+ byte[] data = SchedulerProperties.zkUUID.getBytes(StandardCharsets.UTF_8);
+ // 创建父节点,也就是会递归创建
+ String result = curatorConnect.create().creatingParentsIfNeeded()
+ // 节点类型
+ .withMode(CreateMode.EPHEMERAL)
.forPath(nodePath, data);
logger.debug(result);
-
return true;
-
} else {
logger.debug(nodePath + " node exists");
Stat stat = new Stat();
-
byte[] nodeData = curatorConnect.getData().storingStatIn(stat).forPath(nodePath);
- String masterid = new String(nodeData, "UTF-8").trim();
- logger.debug("uuid=" + GlobelConfig.zkuuid + " node " + nodePath + " data is:" + masterid);
-
- if (masterid.equals(GlobelConfig.zkuuid)) {
-
+ String masterId = new String(nodeData, StandardCharsets.UTF_8).trim();
+ logger.debug("uuid=" + SchedulerProperties.zkUUID + " node " + nodePath + " data is:" + masterId);
+ if (masterId.equals(SchedulerProperties.zkUUID)) {
return true;
-
} else {
logger.debug("not the main node");
return false;
@@ -65,13 +63,11 @@ public class ZkServiceImpl implements ZkService {
} else {
return true;
}
-
} catch (Exception e) {
logger.error(e.toString());
return false;
}
}
-
}
diff --git a/src/main/java/com/mesa/reportservice/util/Constant.java b/src/main/java/com/mesa/reportservice/util/Constant.java
new file mode 100644
index 0000000..91b4fd6
--- /dev/null
+++ b/src/main/java/com/mesa/reportservice/util/Constant.java
@@ -0,0 +1,50 @@
+package com.mesa.reportservice.util;
+
+/**
+ *
+ * @author lijinyang
+ * @date: 2024/01/12/16:33
+ */
+public class Constant {
+
+ /**
+ * jobMessage
+ */
+ public static final String OK = "OK";
+ public static final String CANCEL = "CANCEL";
+ public static final String EXECUTING = "EXECUTING";
+ public static final String WRITE_DATA_ERROR = "Write Data Error";
+ public static final String PARAM_SYNTAX_ERROR = "Param Syntax Error";
+ public static final String SQL_EXECUTION_ERROR = "SQL Execution Error";
+ public static final String UNKNOWN_ERROR = "Unknown Error";
+ public static final String DATABASE_ERROR = "Database Error";
+ public static final String RE_EXECUTION = "Re Execution";
+ public static final String QUERY_TIME_OUT = "SQL Execution Error execute query time out";
+ public static final String RESULT_TOO_LARGE = "Result Too Large";
+
+ /**
+ * jobStatus
+ */
+ public static final int JOB_SUCCESS = 20000666;
+ public static final int JOB_ERROR = 555999;
+ public static final int JOB_EXECUTION_TIMEOUT = 500001;
+ public static final int JOB_SQL_EXECUTION_ERROR_CODE = 50001300;
+ public static final int JOB_BAD_REQUEST_THRESHOLD = 40000000;
+ public static final int JOB_INTERNAL_SERVER_ERROR_THRESHOLD = 50000000;
+
+ /**
+ * loggerMessage
+ */
+ public static final String LOG_SUCCESS_SAVE_TO_HBASE = "success save to HBase ! queryId = {0} jobId = {1} executeSql = {2}";
+ public static final String LOG_NO_PENDING_TASKS = "no pending tasks";
+ public static final String LOG_START_EXECUTING_TASKS = "start executing task";
+ public static final String LOG_START_VIEWING_RESULTS = "start viewing results";
+
+ /**
+ * URL
+ */
+ public static final String URL_GET_QUERY_ID = "{0}/v1/sql/query/query_id?query=";
+ public static final String URL_QUERY_FOR_EXECUTE = "{0}/sql?originalSQL=";
+ public static final String URL_QUERY_FOR_PROCESS = "{0}/v1/sql/query/{1}/progress";
+ public static final String URL_QUERY_FOR_CANCEL = "{0}/v1/sql/query/{1}";
+}
diff --git a/src/main/resources/mappers/ReportResultMapper.xml b/src/main/resources/mappers/JobMapper.xml
index b4d8f57..9017d60 100644
--- a/src/main/resources/mappers/ReportResultMapper.xml
+++ b/src/main/resources/mappers/JobMapper.xml
@@ -1,6 +1,6 @@
<?xml version="1.0" encoding="UTF-8" ?>
<!DOCTYPE mapper PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN" "http://mybatis.org/dtd/mybatis-3-mapper.dtd" >
-<mapper namespace="com.mesa.reportservice.mapper.ReportResultMapper" >
+<mapper namespace="com.mesa.reportservice.mapper.JobMapper" >
<resultMap type="com.mesa.reportservice.bean.JobEntity" id="BaseResultMap">
<id property="jobId" column="job_id"/>
<result property="querySql" column="query_sql"/>
@@ -24,7 +24,7 @@
job_id,query_sql,state,done_progress,is_failed,result_message,elapsed,rows_read,
bytes_read,result_rows,result_bytes,is_valid,start_time,end_time,last_update_time,generated_time
</sql>
- <select id="getJobForExcute" resultMap="BaseResultMap" parameterType="java.lang.Integer" >
+ <select id="getRunningJob" resultMap="BaseResultMap" parameterType="java.lang.Integer" >
select
<include refid="Base_Column_List" />
from saved_query_job
@@ -110,7 +110,7 @@
<select id="getJobCount" resultType="map" parameterType="hashmap">
SELECT (SELECT COUNT(1) FROM saved_query_job where state = 'PENDING' and is_valid = 1) as queueNum,
- (SELECT COUNT(1) FROM saved_query_job where state = 'RUNNING' and is_valid = 1) as excuteingNum,
+ (SELECT COUNT(1) FROM saved_query_job where state = 'RUNNING' and is_valid = 1) as executingNum,
(SELECT COUNT(1) FROM saved_query_job where state = 'DONE' and is_failed = 0 and last_update_time > #{lastUpdateTime}) as todaySuccessNum,
(SELECT COUNT(1) FROM saved_query_job where is_failed = 1 and last_update_time > #{lastUpdateTime} ) as todayErrorNum
</select>
diff --git a/src/test/java/com/mesa/ReportServiceApplicationTests.java b/src/test/java/com/mesa/ReportServiceApplicationTests.java
new file mode 100644
index 0000000..1421554
--- /dev/null
+++ b/src/test/java/com/mesa/ReportServiceApplicationTests.java
@@ -0,0 +1,81 @@
+package com.mesa;
+
+
+import cn.hutool.log.Log;
+import cn.hutool.log.LogFactory;
+import com.alibaba.fastjson.JSON;
+import com.mesa.reportservice.ReportServiceApplication;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.boot.autoconfigure.EnableAutoConfiguration;
+import org.springframework.boot.test.context.SpringBootTest;
+import org.springframework.boot.web.server.LocalServerPort;
+import org.springframework.core.io.ClassPathResource;
+import org.springframework.jdbc.datasource.init.ScriptUtils;
+import org.springframework.test.context.ActiveProfiles;
+import org.springframework.test.context.junit4.SpringRunner;
+
+import javax.sql.DataSource;
+import java.io.IOException;
+import java.io.InputStream;
+import java.sql.Connection;
+import java.sql.SQLException;
+import java.util.Map;
+import java.util.Objects;
+
+@RunWith(SpringRunner.class)
+@SpringBootTest(classes = {ReportServiceApplication.class}, webEnvironment = SpringBootTest.WebEnvironment.RANDOM_PORT)
+@EnableAutoConfiguration
+@ActiveProfiles("test")
+public class ReportServiceApplicationTests {
+
+ private static final Log logger = LogFactory.get();
+
+ @Autowired
+ private DataSource dataSource;
+
+ @LocalServerPort
+ protected int testPort;
+
+ static {
+ System.setProperty("jasypt.encryptor.password", "galaxy");
+ System.setProperty("JM.SNAPSHOT.PATH", "config");
+ }
+
+ public ReportServiceApplicationTests() {
+ }
+
+ @Test
+ public void contextLoads() {
+ logger.info("SAVED QUERY SCHEDULER CONTEXT LOAD");
+ }
+
+ @Test
+ public void runSqlScript() throws SQLException {
+ try (Connection connection = dataSource.getConnection()) {
+ ScriptUtils.executeSqlScript(connection, new ClassPathResource("db/SavedQueryJob.sql"));
+ }
+ }
+
+ // 解析resource: parameters/otherPath/指定JSON文件, 返回指定类型对象
+ public <T> T jsonToInParameter(String path, String parameterName,Class<T> type) {
+ InputStream resourceAsStream = ReportServiceApplicationTests.class.getClassLoader().getResourceAsStream(path);
+ if (Objects.isNull(resourceAsStream)) {
+ logger.info("Unable to obtain the specified file under the specified path. Please check if the file exists or if the file name capitalization is consistent", path);
+ throw new IllegalArgumentException();
+ }
+ Map<String, Object> json = null;
+ try {
+ json = JSON.parseObject(resourceAsStream, Map.class);
+ } catch (IOException e) {
+ logger.error("Failed to read files from the test resource directory, file: {}", path);
+ throw new RuntimeException(e);
+ }
+ Object data = json.get(parameterName);
+ String objectJsonStr = JSON.toJSONString(data, true);
+ logger.info("parameter file: {}, parameter name: {}", path, parameterName);
+ return JSON.parseObject(objectJsonStr, type);
+ }
+
+}
diff --git a/src/test/java/com/mesa/reportservice/ExecuteProcessTest.java b/src/test/java/com/mesa/reportservice/ExecuteProcessTest.java
new file mode 100644
index 0000000..9e5b56e
--- /dev/null
+++ b/src/test/java/com/mesa/reportservice/ExecuteProcessTest.java
@@ -0,0 +1,86 @@
+package com.mesa.reportservice;
+
+import com.mesa.ReportServiceApplicationTests;
+import com.mesa.reportservice.bean.JobEntity;
+import com.mesa.reportservice.enums.JobStates;
+import com.mesa.reportservice.exception.SQSCode;
+import com.mesa.reportservice.exception.SQSException;
+import com.mesa.reportservice.service.ExecuteProcessService;
+import com.mesa.reportservice.util.Constant;
+import org.junit.Assert;
+import org.junit.Test;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.boot.autoconfigure.EnableAutoConfiguration;
+
+
+/**
+ * @Auther: lijinyang
+ * @Date: 2024/01/08/16:33
+ */
+@EnableAutoConfiguration
+public class ExecuteProcessTest extends ReportServiceApplicationTests {
+
+ @Autowired
+ private ExecuteProcessService executeProcessService;
+
+ @Test
+ public void testReSet() {
+ JobEntity job = this.jsonToInParameter("parameters/executeProcessTest.json", "reSetJob", JobEntity.class);
+ executeProcessService.reSet(job);
+ Assert.assertEquals(JobStates.PENDING.getValue(),job.getState());
+ }
+
+ @Test
+ public void testUpdateResultMessageCancel() {
+ JobEntity job = this.jsonToInParameter("parameters/executeProcessTest.json", "cancelJob", JobEntity.class);
+ executeProcessService.updateResultMessage(job);
+ Assert.assertEquals(Constant.CANCEL,job.getResultMessage());
+ }
+
+ @Test
+ public void testUpdateResultMessageOk() {
+ JobEntity job = this.jsonToInParameter("parameters/executeProcessTest.json", "okJob", JobEntity.class);
+ executeProcessService.updateResultMessage(job);
+ Assert.assertEquals(Constant.OK,job.getResultMessage());
+ }
+
+ @Test
+ public void testUpdateResultMessageDataError() {
+ JobEntity job = this.jsonToInParameter("parameters/executeProcessTest.json", "dataErrorJob", JobEntity.class);
+ try {
+ executeProcessService.updateResultMessage(job);
+ } catch (SQSException e) {
+ Assert.assertNotEquals((Object) SQSCode.SUCCESS.getCode(),e.getCode());
+ }
+ }
+
+ @Test
+ public void testUpdateResultMessageParamError() {
+ JobEntity job = this.jsonToInParameter("parameters/executeProcessTest.json", "paramErrorJob", JobEntity.class);
+ try {
+ executeProcessService.updateResultMessage(job);
+ } catch (SQSException e) {
+ Assert.assertNotEquals((Object) SQSCode.SUCCESS.getCode(),e.getCode());
+ }
+ }
+
+ @Test
+ public void testUpdateResultMessageSQLError() {
+ JobEntity job = this.jsonToInParameter("parameters/executeProcessTest.json", "SQLErrorJob", JobEntity.class);
+ try {
+ executeProcessService.updateResultMessage(job);
+ } catch (SQSException e) {
+ Assert.assertNotEquals((Object) SQSCode.SUCCESS.getCode(),e.getCode());
+ }
+ }
+
+ @Test
+ public void testUpdateResultMessageUnknownError() {
+ JobEntity job = this.jsonToInParameter("parameters/executeProcessTest.json", "unknownErrorJob", JobEntity.class);
+ try {
+ executeProcessService.updateResultMessage(job);
+ } catch (SQSException e) {
+ Assert.assertNotEquals((Object) SQSCode.SUCCESS.getCode(),e.getCode());
+ }
+ }
+}
diff --git a/src/test/java/com/mesa/reportservice/ExecuteTest.java b/src/test/java/com/mesa/reportservice/ExecuteTest.java
new file mode 100644
index 0000000..e30293a
--- /dev/null
+++ b/src/test/java/com/mesa/reportservice/ExecuteTest.java
@@ -0,0 +1,29 @@
+package com.mesa.reportservice;
+
+import com.mesa.ReportServiceApplicationTests;
+import com.mesa.reportservice.bean.JobEntity;
+import com.mesa.reportservice.enums.JobStates;
+import com.mesa.reportservice.service.ExecuteService;
+import org.junit.Assert;
+import org.junit.Test;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.boot.autoconfigure.EnableAutoConfiguration;
+
+
+/**
+ * @Auther: lijinyang
+ * @Date: 2024/01/10/15:57
+ */
+@EnableAutoConfiguration
+public class ExecuteTest extends ReportServiceApplicationTests {
+
+ @Autowired
+ private ExecuteService executeService;
+
+ @Test
+ public void testExecuteTask() {
+ JobEntity job = this.jsonToInParameter("parameters/executeTest.json", "executeJob", JobEntity.class);
+ executeService.queryQgw(job);
+ Assert.assertEquals(JobStates.DONE.getValue(),job.getState());
+ }
+}
diff --git a/src/test/java/com/mesa/reportservice/HBaseTest.java b/src/test/java/com/mesa/reportservice/HBaseTest.java
new file mode 100644
index 0000000..2392c10
--- /dev/null
+++ b/src/test/java/com/mesa/reportservice/HBaseTest.java
@@ -0,0 +1,28 @@
+package com.mesa.reportservice;
+
+import com.mesa.ReportServiceApplicationTests;
+import com.mesa.reportservice.bean.JobEntity;
+import com.mesa.reportservice.service.HBaseService;
+import org.junit.Assert;
+import org.junit.Test;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.boot.autoconfigure.EnableAutoConfiguration;
+
+
+/**
+ * @Auther: lijinyang
+ * @Date: 2024/01/10/14:48
+ */
+@EnableAutoConfiguration
+public class HBaseTest extends ReportServiceApplicationTests {
+
+ @Autowired
+ private HBaseService hBaseService;
+
+ @Test
+ public void testPut() throws Exception {
+ JobEntity job = this.jsonToInParameter("parameters/hBaseTest.json", "putJob", JobEntity.class);
+ Boolean put = hBaseService.put(job);
+ Assert.assertTrue(put);
+ }
+}
diff --git a/src/test/java/com/mesa/reportservice/JobTest.java b/src/test/java/com/mesa/reportservice/JobTest.java
new file mode 100644
index 0000000..e69aaf6
--- /dev/null
+++ b/src/test/java/com/mesa/reportservice/JobTest.java
@@ -0,0 +1,62 @@
+package com.mesa.reportservice;
+
+import com.mesa.ReportServiceApplicationTests;
+import com.mesa.reportservice.bean.JobEntity;
+import com.mesa.reportservice.enums.JobStates;
+import com.mesa.reportservice.service.JobService;
+import org.junit.Assert;
+import org.junit.Test;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.boot.autoconfigure.EnableAutoConfiguration;
+
+import java.util.List;
+import java.util.Map;
+
+/**
+ * @Auther: lijinyang
+ * @Date: 2024/01/08/17:22
+ */
+@EnableAutoConfiguration
+public class JobTest extends ReportServiceApplicationTests {
+
+ private static final int rows = 10;
+
+ private static final int updateProcessesSuccess = 1;
+
+ @Autowired
+ private JobService jobService;
+
+ @Test
+ public void testGetRunningJob() {
+ List<JobEntity> jobForExecute = jobService.getRunningJob();
+ Assert.assertNotNull(jobForExecute);
+ Assert.assertEquals(JobStates.RUNNING.getValue(),jobForExecute.get(0).getState());
+ }
+
+ @Test
+ public void testGetJobTask() {
+ List<JobEntity> jobForExecute = jobService.getJobTask(rows);
+ Assert.assertNotNull(jobForExecute);
+ Assert.assertEquals(JobStates.PENDING.getValue(),jobForExecute.get(0).getState());
+ }
+
+ @Test
+ public void testGetJobCount() {
+ Map<String, Long> jobCount = jobService.getJobCount();
+ Assert.assertNotNull(jobCount);
+ }
+
+ @Test
+ public void testUpdateProcessesSuccess() {
+ JobEntity job = this.jsonToInParameter("parameters/mysqlTest.json", "updateProcessesSuccessJob", JobEntity.class);
+ int i = jobService.updateProcesses(job);
+ Assert.assertEquals(updateProcessesSuccess,i);
+ }
+
+ @Test
+ public void testUpdateProcessesError() {
+ JobEntity job = this.jsonToInParameter("parameters/mysqlTest.json", "updateProcessesErrorJob", JobEntity.class);
+ int i = jobService.updateProcesses(job);
+ Assert.assertNotEquals(updateProcessesSuccess,i);
+ }
+}
diff --git a/src/test/java/com/mesa/reportservice/MonitorTest.java b/src/test/java/com/mesa/reportservice/MonitorTest.java
new file mode 100644
index 0000000..f8cf701
--- /dev/null
+++ b/src/test/java/com/mesa/reportservice/MonitorTest.java
@@ -0,0 +1,29 @@
+package com.mesa.reportservice;
+
+import com.mesa.ReportServiceApplicationTests;
+import org.junit.Test;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.boot.autoconfigure.EnableAutoConfiguration;
+import org.springframework.boot.test.autoconfigure.web.servlet.AutoConfigureMockMvc;
+import org.springframework.test.web.servlet.MockMvc;
+
+import static org.springframework.test.web.servlet.request.MockMvcRequestBuilders.get;
+import static org.springframework.test.web.servlet.result.MockMvcResultMatchers.status;
+
+/**
+ * @Auther: lijinyang
+ * @Date: 2024/01/04/16:08
+ */
+@EnableAutoConfiguration
+@AutoConfigureMockMvc
+public class MonitorTest extends ReportServiceApplicationTests {
+
+ @Autowired
+ private MockMvc mockMvc;
+
+ @Test
+ public void testGetReportStatus() throws Exception{
+ mockMvc.perform(get("/monitor"))
+ .andExpect(status().isOk());
+ }
+}
diff --git a/src/test/java/com/mesa/reportservice/QueryGatewayTest.java b/src/test/java/com/mesa/reportservice/QueryGatewayTest.java
new file mode 100644
index 0000000..193b925
--- /dev/null
+++ b/src/test/java/com/mesa/reportservice/QueryGatewayTest.java
@@ -0,0 +1,96 @@
+package com.mesa.reportservice;
+
+import com.mesa.ReportServiceApplicationTests;
+import com.mesa.reportservice.bean.HttpResult;
+import com.mesa.reportservice.service.QueryGatewayService;
+import org.junit.Assert;
+import org.junit.Test;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.boot.autoconfigure.EnableAutoConfiguration;
+import org.springframework.http.HttpStatus;
+
+import java.io.IOException;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+/**
+ * @Auther: lijinyang
+ * @Date: 2024/01/04/17:05
+ */
+@EnableAutoConfiguration
+public class QueryGatewayTest extends ReportServiceApplicationTests {
+
+ private static final String QUERY_ID_PATTERN = "^[a-f0-9]{32}:[a-f0-9]{32}$";
+
+ @Autowired
+ private QueryGatewayService queryGatewayService;
+
+ @Test
+ public void testGetQueryIdSuccess() {
+ String sql = this.jsonToInParameter("parameters/queryGatewayTest.json", "querySqlSuccess", String.class);
+ String queryId = queryGatewayService.getQueryId(sql);
+ Pattern compile = Pattern.compile(QUERY_ID_PATTERN);
+ Matcher matcher = compile.matcher(queryId);
+ Assert.assertTrue(matcher.find());
+ }
+
+ @Test
+ public void testGetQueryIdError() {
+ String sql = this.jsonToInParameter("parameters/queryGatewayTest.json", "querySqlError", String.class);
+ String queryId = queryGatewayService.getQueryId(sql);
+ Pattern compile = Pattern.compile(QUERY_ID_PATTERN);
+ Matcher matcher = compile.matcher(queryId);
+ Assert.assertTrue(!matcher.find());
+ }
+
+ @Test
+ public void testQueryJobSuccess() throws IOException {
+ String sql = this.jsonToInParameter("parameters/queryGatewayTest.json", "querySqlSuccess", String.class);
+ HttpResult httpResult = null;
+ httpResult = queryGatewayService.queryJob(sql);
+ Assert.assertTrue(httpResult.getCode() == HttpStatus.OK.value());
+ }
+
+ @Test
+ public void testQueryJobError() throws IOException {
+ String sql = this.jsonToInParameter("parameters/queryGatewayTest.json", "querySqlError", String.class);
+ HttpResult httpResult = null;
+ httpResult = queryGatewayService.queryJob(sql);
+ Assert.assertTrue(httpResult.getCode() == HttpStatus.BAD_REQUEST.value());
+ }
+
+ @Test
+ public void testQueryJobProcessSuccess() throws IOException {
+ String sql = this.jsonToInParameter("parameters/queryGatewayTest.json", "querySqlSuccess", String.class);
+ String queryId = queryGatewayService.getQueryId(sql);
+ HttpResult httpResult = null;
+ httpResult = queryGatewayService.queryJobProcess(queryId);
+ Assert.assertTrue(httpResult.getCode() == HttpStatus.OK.value());
+ Assert.assertNotNull(httpResult.getBody());
+ }
+
+ @Test
+ public void testQueryJobProcessError() throws IOException {
+ String queryId = this.jsonToInParameter("parameters/queryGatewayTest.json", "queryForProcessIdError", String.class);
+ HttpResult httpResult = null;
+ httpResult = queryGatewayService.queryJobProcess(queryId);
+ Assert.assertTrue(httpResult.getCode() == HttpStatus.BAD_REQUEST.value());
+ }
+
+ @Test
+ public void testDeleteJobSuccess() throws IOException {
+ String sql = this.jsonToInParameter("parameters/queryGatewayTest.json", "querySqlSuccess", String.class);
+ String queryId = queryGatewayService.getQueryId(sql);
+ HttpResult httpResult = null;
+ httpResult = queryGatewayService.deleteJob(queryId);
+ Assert.assertTrue(httpResult.getCode() == HttpStatus.OK.value());
+ }
+
+ @Test
+ public void testDeleteJobError() throws IOException {
+ String queryId = this.jsonToInParameter("parameters/queryGatewayTest.json", "queryForCancelIdError", String.class);
+ HttpResult httpResult = null;
+ httpResult = queryGatewayService.deleteJob(queryId);
+ Assert.assertTrue(httpResult.getCode() == HttpStatus.NOT_FOUND.value());
+ }
+}
diff --git a/src/test/java/com/mesa/reportservice/ReportserviceApplicationTests.java b/src/test/java/com/mesa/reportservice/ReportserviceApplicationTests.java
deleted file mode 100644
index 4ae38f5..0000000
--- a/src/test/java/com/mesa/reportservice/ReportserviceApplicationTests.java
+++ /dev/null
@@ -1,37 +0,0 @@
-package com.mesa.reportservice;
-
-
-import org.apache.http.client.config.RequestConfig;
-import org.apache.http.impl.client.CloseableHttpClient;
-import org.junit.Test;
-import org.junit.runner.RunWith;
-import org.springframework.beans.factory.annotation.Autowired;
-import org.springframework.boot.test.context.SpringBootTest;
-import org.springframework.test.context.junit4.SpringRunner;
-import java.io.IOException;
-
-@RunWith(SpringRunner.class)
-@SpringBootTest
-public class ReportserviceApplicationTests {
-
-
-
-
- @Autowired
- private CloseableHttpClient httpClient;
-
- @Autowired
- private RequestConfig config;
-
-
-
-
- @Test
- public void contextLoads() throws IOException {
-
-
-
- }
-
-
-}
diff --git a/src/test/java/com/mesa/reportservice/zookeeperTest.java b/src/test/java/com/mesa/reportservice/zookeeperTest.java
new file mode 100644
index 0000000..5370afb
--- /dev/null
+++ b/src/test/java/com/mesa/reportservice/zookeeperTest.java
@@ -0,0 +1,25 @@
+package com.mesa.reportservice;
+
+import com.mesa.ReportServiceApplicationTests;
+import com.mesa.reportservice.service.ZookeeperService;
+import org.junit.Assert;
+import org.junit.Test;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.boot.autoconfigure.EnableAutoConfiguration;
+
+/**
+ * @Auther: lijinyang
+ * @Date: 2024/01/10/14:20
+ */
+@EnableAutoConfiguration
+public class zookeeperTest extends ReportServiceApplicationTests {
+
+ @Autowired
+ private ZookeeperService zookeeperService;
+
+ @Test
+ public void testIsMaster() throws Exception {
+ boolean isMaster = zookeeperService.isMaster();
+ Assert.assertTrue(isMaster);
+ }
+}
diff --git a/src/test/resources/db/SavedQueryJob.sql b/src/test/resources/db/SavedQueryJob.sql
new file mode 100644
index 0000000..1940c31
--- /dev/null
+++ b/src/test/resources/db/SavedQueryJob.sql
@@ -0,0 +1,7 @@
+truncate saved_query_job;
+
+INSERT INTO `saved_query_job` (`job_id`, `query_sql`, `state`, `done_progress`, `is_failed`, `result_message`, `elapsed`, `rows_read`, `bytes_read`, `result_bytes`, `result_rows`, `is_valid`, `start_time`, `end_time`, `last_update_time`, `generated_time`) VALUES ('00838b9d94552afecc6fa29cde033714', 'SELECT COUNT_DISTINCT(server_ip) AS \"UniqServerIP\", client_ip AS \"Client IP\" FROM security_event WHERE recv_time >= UNIX_TIMESTAMP(\'2023-12-23T14:30:00+08:00\') AND recv_time < UNIX_TIMESTAMP(\'2023-12-24T14:30:00+08:00\') AND security_event.vsys_id IN (1) GROUP BY \"Client IP\" ORDER BY \"UniqServerIP\" DESC LIMIT 20', 'RUNNING', 0, 0, '', 0, 0, 0, 0, 0, 1, 1703399415, null, 1703399430, 1703399409);
+INSERT INTO `saved_query_job` (`job_id`, `query_sql`, `state`, `done_progress`, `is_failed`, `result_message`, `elapsed`, `rows_read`, `bytes_read`, `result_bytes`, `result_rows`, `is_valid`, `start_time`, `end_time`, `last_update_time`, `generated_time`) VALUES ('014f9d6e1f17227ce8d837003bc3bedb', 'SELECT AVG(sent_bytes) AS \"Bytes Sent\", AVG(received_bytes) AS \"Bytes Received\", FROM_UNIXTIME(TIME_FLOOR_WITH_FILL(start_timestamp_ms, \'PT5M\', \'zero\')) AS \"Start Time\" FROM security_event WHERE recv_time >= UNIX_TIMESTAMP(\'2023-12-18T13:18:01Z\') AND recv_time < UNIX_TIMESTAMP(\'2023-12-19T13:18:01Z\') AND security_event.vsys_id IN (1) GROUP BY \"Start Time\" ORDER BY \"Start Time\" ASC', 'RUNNING', 0, 0, '', 0, 0, 0, 0, 0, 1, 1702991925, null, 1702991940, 1702991921);
+INSERT INTO `saved_query_job` (`job_id`, `query_sql`, `state`, `done_progress`, `is_failed`, `result_message`, `elapsed`, `rows_read`, `bytes_read`, `result_bytes`, `result_rows`, `is_valid`, `start_time`, `end_time`, `last_update_time`, `generated_time`) VALUES ('0450dd9248d172aed6aac55e62f7d128', 'SELECT count(apn) AS \"APN\" FROM security_event WHERE recv_time >= UNIX_TIMESTAMP(\'2023-12-27T09:05:44Z\') AND recv_time < UNIX_TIMESTAMP(\'2024-01-03T09:05:44Z\') AND security_event.vsys_id IN (1) ORDER BY \"APN\" DESC', 'PENDING', 0, 0, '', 0, 0, 0, 0, 0, 1, null, null, 1704272820, 1704272794);
+INSERT INTO `saved_query_job` (`job_id`, `query_sql`, `state`, `done_progress`, `is_failed`, `result_message`, `elapsed`, `rows_read`, `bytes_read`, `result_bytes`, `result_rows`, `is_valid`, `start_time`, `end_time`, `last_update_time`, `generated_time`) VALUES ('04758d6dd22b008c4c66714b7caafd63', 'SELECT COUNT(sled_ip) AS \"Sled IP\", COUNT(client_asn) AS \"Client ASN\" FROM security_event WHERE recv_time >= UNIX_TIMESTAMP(\'2023-12-20T09:31:16Z\') AND recv_time < UNIX_TIMESTAMP(\'2023-12-20T10:31:16Z\') AND security_event.vsys_id IN (6) ORDER BY \"Sled IP\" DESC', 'DONE', 1.00, 0, 'OK', 100, 100, 100, 100, 1, 1, 1703068470, 1703068485, 1703068485, 1703068455);
+INSERT INTO `saved_query_job` (`job_id`, `query_sql`, `state`, `done_progress`, `is_failed`, `result_message`, `elapsed`, `rows_read`, `bytes_read`, `result_bytes`, `result_rows`, `is_valid`, `start_time`, `end_time`, `last_update_time`, `generated_time`) VALUES ('155c91cef347d3afa015076df3e9395b', 'SELECT COUNT(sent_bytes) AS \"Bytes Sent\" FROM security_event WHERE recv_time >= UNIX_TIMESTAMP(\'2023-12-10T16:00:00Z\') AND recv_time < UNIX_TIMESTAMP(\'2023-12-17T16:00:00Z\') AND security_event.vsys_id IN (1) ORDER BY \"Bytes Sent\" DESC', 'PENDING', 0, 0, '', 0, 0, 0, 0, 0, 1, 1703376015, 1703376030, 1703376030, 1703376008); \ No newline at end of file
diff --git a/src/test/resources/parameters/executeProcessTest.json b/src/test/resources/parameters/executeProcessTest.json
new file mode 100644
index 0000000..8a3d4a3
--- /dev/null
+++ b/src/test/resources/parameters/executeProcessTest.json
@@ -0,0 +1,170 @@
+{
+ "reSetJob": {
+ "jobId": "04758d6dd22b008c4c66714b7caafd63",
+ "querySql": "",
+ "state": "DONE",
+ "resultName": "",
+ "doneProgress": 1.00,
+ "isFailed": 0,
+ "resultMessage": "OK",
+ "elapsed": 1,
+ "rowsRead": 100,
+ "bytesRead": 100,
+ "resultRows": 1,
+ "resultBytes": 100,
+ "isValid": "",
+ "startTime": 1703399415,
+ "endTime": 1703399430,
+ "lastUpdateTime": "",
+ "generatedTime": "",
+ "queryId": "",
+ "executeStatus": "",
+ "result": "",
+ "memoryUsage": "",
+ "queryDurationMs": ""
+ },
+ "cancelJob": {
+ "jobId": "04758d6dd22b008c4c66714b7caafd63",
+ "querySql": "",
+ "state": "DONE",
+ "resultName": "",
+ "doneProgress": 1.00,
+ "isFailed": 0,
+ "resultMessage": "OK",
+ "elapsed": 1,
+ "rowsRead": 100,
+ "bytesRead": 100,
+ "resultRows": 1,
+ "resultBytes": 100,
+ "isValid": 0,
+ "startTime": 1703399415,
+ "endTime": 1703399430,
+ "lastUpdateTime": "",
+ "generatedTime": "",
+ "queryId": "",
+ "executeStatus": "",
+ "result": "",
+ "memoryUsage": "",
+ "queryDurationMs": ""
+ },
+ "okJob": {
+ "jobId": "0550bf2eb3ed6ff0351554c085367286",
+ "querySql": "SELECT AVG(sent_bytes) / 300 AS \"Bytes Sent\", AVG(received_bytes) / 300 AS \"Bytes Received\", FROM_UNIXTIME(TIME_FLOOR_WITH_FILL(recv_time, 'PT5M', 'zero')) AS \"Receive Time\" FROM security_event WHERE recv_time >= UNIX_TIMESTAMP('2023-12-22T10:00:00+08:00') AND recv_time < UNIX_TIMESTAMP('2023-12-23T10:00:00+08:00') AND security_event.vsys_id IN (1) GROUP BY \"Receive Time\" ORDER BY \"Receive Time\" ASC",
+ "state": "RUNNING",
+ "resultName": "",
+ "doneProgress": 0,
+ "isFailed": 0,
+ "resultMessage": "",
+ "elapsed": 1,
+ "rowsRead": 100,
+ "bytesRead": 100,
+ "resultRows": 1,
+ "resultBytes": 100,
+ "isValid": 1,
+ "startTime": 1703399415,
+ "endTime": 1703399430,
+ "lastUpdateTime": "",
+ "generatedTime": "",
+ "queryId": "",
+ "executeStatus": 20000666,
+ "result": "",
+ "memoryUsage": "",
+ "queryDurationMs": ""
+ },
+ "dataErrorJob": {
+ "jobId": "",
+ "querySql": "SELECT AVG(sent_bytes) / 300 AS \"Bytes Sent\", AVG(received_bytes) / 300 AS \"Bytes Received\", FROM_UNIXTIME(TIME_FLOOR_WITH_FILL(recv_time, 'PT5M', 'zero')) AS \"Receive Time\" FROM security_event WHERE recv_time >= UNIX_TIMESTAMP('2023-12-22T10:00:00+08:00') AND recv_time < UNIX_TIMESTAMP('2023-12-23T10:00:00+08:00') AND security_event.vsys_id IN (1) GROUP BY \"Receive Time\" ORDER BY \"Receive Time\" ASC",
+ "state": "RUNNING",
+ "resultName": "",
+ "doneProgress": 0,
+ "isFailed": 0,
+ "resultMessage": "",
+ "elapsed": 1,
+ "rowsRead": 100,
+ "bytesRead": 100,
+ "resultRows": 1,
+ "resultBytes": 100,
+ "isValid": 1,
+ "startTime": 1703399415,
+ "endTime": 1703399430,
+ "lastUpdateTime": "",
+ "generatedTime": "",
+ "queryId": "",
+ "executeStatus": 20000666,
+ "result": "",
+ "memoryUsage": "",
+ "queryDurationMs": ""
+ },
+ "paramErrorJob": {
+ "jobId": "0550bf2eb3ed6ff0351554c085367286",
+ "querySql": "SELECT AVG(sent_bytes) / 300 AS \"Bytes Sent\", AVG(received_bytes) / 300 AS \"Bytes Received\", FROM_UNIXTIME(TIME_FLOOR_WITH_FILL(recv_time, 'PT5M', 'zero')) AS \"Receive Time\" FROM security_event WHERE recv_time >= UNIX_TIMESTAMP('2023-12-22T10:00:00+08:00') AND recv_time < UNIX_TIMESTAMP('2023-12-23T10:00:00+08:00') AND security_event.vsys_id IN (1) GROUP BY \"Receive Time\" ORDER BY \"Receive Time\" ASC",
+ "state": "RUNNING",
+ "resultName": "",
+ "doneProgress": 0,
+ "isFailed": 0,
+ "resultMessage": "",
+ "elapsed": 1,
+ "rowsRead": 100,
+ "bytesRead": 100,
+ "resultRows": 1,
+ "resultBytes": 100,
+ "isValid": 1,
+ "startTime": 1703399415,
+ "endTime": 1703399430,
+ "lastUpdateTime": "",
+ "generatedTime": "",
+ "queryId": "",
+ "executeStatus": 40000100,
+ "result": "",
+ "memoryUsage": "",
+ "queryDurationMs": ""
+ },
+ "SQLErrorJob": {
+ "jobId": "0550bf2eb3ed6ff0351554c085367286",
+ "querySql": "SELECT AVG(sent_bytes) / 300 AS \"Bytes Sent\", AVG(received_bytes) / 300 AS \"Bytes Received\", FROM_UNIXTIME(TIME_FLOOR_WITH_FILL(recv_time, 'PT5M', 'zero')) AS \"Receive Time\" FROM security_event WHERE recv_time >= UNIX_TIMESTAMP('2023-12-22T10:00:00+08:00') AND recv_time < UNIX_TIMESTAMP('2023-12-23T10:00:00+08:00') AND security_event.vsys_id IN (1) GROUP BY \"Receive Time\" ORDER BY \"Receive Time\" ASC",
+ "state": "RUNNING",
+ "resultName": "",
+ "doneProgress": 0,
+ "isFailed": 0,
+ "resultMessage": "",
+ "elapsed": 1,
+ "rowsRead": 100,
+ "bytesRead": 100,
+ "resultRows": 1,
+ "resultBytes": 100,
+ "isValid": 1,
+ "startTime": 1703399415,
+ "endTime": 1703399430,
+ "lastUpdateTime": "",
+ "generatedTime": "",
+ "queryId": "",
+ "executeStatus": 50001300,
+ "result": "",
+ "memoryUsage": "",
+ "queryDurationMs": ""
+ },
+ "unknownErrorJob": {
+ "jobId": "0550bf2eb3ed6ff0351554c085367286",
+ "querySql": "SELECT AVG(sent_bytes) / 300 AS \"Bytes Sent\", AVG(received_bytes) / 300 AS \"Bytes Received\", FROM_UNIXTIME(TIME_FLOOR_WITH_FILL(recv_time, 'PT5M', 'zero')) AS \"Receive Time\" FROM security_event WHERE recv_time >= UNIX_TIMESTAMP('2023-12-22T10:00:00+08:00') AND recv_time < UNIX_TIMESTAMP('2023-12-23T10:00:00+08:00') AND security_event.vsys_id IN (1) GROUP BY \"Receive Time\" ORDER BY \"Receive Time\" ASC",
+ "state": "RUNNING",
+ "resultName": "",
+ "doneProgress": 0,
+ "isFailed": 0,
+ "resultMessage": "",
+ "elapsed": 1,
+ "rowsRead": 100,
+ "bytesRead": 100,
+ "resultRows": 1,
+ "resultBytes": 100,
+ "isValid": 1,
+ "startTime": 1703399415,
+ "endTime": 1703399430,
+ "lastUpdateTime": "",
+ "generatedTime": "",
+ "queryId": "",
+ "executeStatus": 50001500,
+ "result": "",
+ "memoryUsage": "",
+ "queryDurationMs": ""
+ }
+}
diff --git a/src/test/resources/parameters/executeTest.json b/src/test/resources/parameters/executeTest.json
new file mode 100644
index 0000000..8f59844
--- /dev/null
+++ b/src/test/resources/parameters/executeTest.json
@@ -0,0 +1,26 @@
+{
+ "executeJob": {
+ "jobId": "0450dd9248d172aed6aac55e62f7d128",
+ "querySql": "SELECT count(apn) AS \"APN\" FROM security_event WHERE recv_time >= UNIX_TIMESTAMP('2023-12-27T09:05:44Z') AND recv_time < UNIX_TIMESTAMP('2024-01-03T09:05:44Z') AND security_event.vsys_id IN (1) ORDER BY \"APN\" DESC",
+ "state": "RUNNING",
+ "resultName": "",
+ "doneProgress": 0,
+ "isFailed": 0,
+ "resultMessage": "",
+ "elapsed": null,
+ "rowsRead": null,
+ "bytesRead": null,
+ "resultRows": null,
+ "resultBytes": null,
+ "isValid": "",
+ "startTime": null,
+ "endTime": null,
+ "lastUpdateTime": "",
+ "generatedTime": "",
+ "queryId": "",
+ "executeStatus": "",
+ "result": "",
+ "memoryUsage": "",
+ "queryDurationMs": ""
+ }
+} \ No newline at end of file
diff --git a/src/test/resources/parameters/hBaseTest.json b/src/test/resources/parameters/hBaseTest.json
new file mode 100644
index 0000000..2162c5a
--- /dev/null
+++ b/src/test/resources/parameters/hBaseTest.json
@@ -0,0 +1,26 @@
+{
+ "putJob": {
+ "jobId": "0450dd9248d172aed6aac55e62f7d128",
+ "querySql": "SELECT count(apn) AS \"APN\" FROM security_event WHERE recv_time >= UNIX_TIMESTAMP('2023-12-27T09:05:44Z') AND recv_time < UNIX_TIMESTAMP('2024-01-03T09:05:44Z') AND security_event.vsys_id IN (1) ORDER BY \"APN\" DESC",
+ "state": "DONE",
+ "resultName": "",
+ "doneProgress": 1.00,
+ "isFailed": 0,
+ "resultMessage": "OK",
+ "elapsed": 1,
+ "rowsRead": 100,
+ "bytesRead": 100,
+ "resultRows": 1,
+ "resultBytes": 100,
+ "isValid": "",
+ "startTime": 1703399415,
+ "endTime": 1703399430,
+ "lastUpdateTime": "",
+ "generatedTime": "",
+ "queryId": "",
+ "executeStatus": "",
+ "result": "",
+ "memoryUsage": "",
+ "queryDurationMs": ""
+ }
+} \ No newline at end of file
diff --git a/src/test/resources/parameters/mysqlTest.json b/src/test/resources/parameters/mysqlTest.json
new file mode 100644
index 0000000..7ed1831
--- /dev/null
+++ b/src/test/resources/parameters/mysqlTest.json
@@ -0,0 +1,50 @@
+{
+ "updateProcessesSuccessJob": {
+ "jobId": "00838b9d94552afecc6fa29cde033714",
+ "querySql": "",
+ "state": "DONE",
+ "resultName": "",
+ "doneProgress": 1.00,
+ "isFailed": 0,
+ "resultMessage": "OK",
+ "elapsed": 1,
+ "rowsRead": 100,
+ "bytesRead": 100,
+ "resultRows": 1,
+ "resultBytes": 100,
+ "isValid": "",
+ "startTime": 1703399415,
+ "endTime": 1703399430,
+ "lastUpdateTime": "",
+ "generatedTime": "",
+ "queryId": "",
+ "executeStatus": "",
+ "result": "",
+ "memoryUsage": "",
+ "queryDurationMs": ""
+ },
+ "updateProcessesErrorJob": {
+ "jobId": "e70afb932207d556b20da4da5ec7ff38",
+ "querySql": "",
+ "state": "DONE",
+ "resultName": "",
+ "doneProgress": 1.00,
+ "isFailed": 0,
+ "resultMessage": "OK",
+ "elapsed": 1,
+ "rowsRead": 100,
+ "bytesRead": 100,
+ "resultRows": 1,
+ "resultBytes": 100,
+ "isValid": "",
+ "startTime": 1703399415,
+ "endTime": 1703399430,
+ "lastUpdateTime": "",
+ "generatedTime": "",
+ "queryId": "",
+ "executeStatus": "",
+ "result": "",
+ "memoryUsage": "",
+ "queryDurationMs": ""
+ }
+} \ No newline at end of file
diff --git a/src/test/resources/parameters/queryGatewayTest.json b/src/test/resources/parameters/queryGatewayTest.json
new file mode 100644
index 0000000..fc36ed0
--- /dev/null
+++ b/src/test/resources/parameters/queryGatewayTest.json
@@ -0,0 +1,6 @@
+{
+ "querySqlSuccess": "select 1 from session_record limit 1",
+ "querySqlError": "select 1 from session_record_cn limit 1",
+ "queryForProcessIdError": "",
+ "queryForCancelIdError": "1:1"
+}