summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
author王玮 <[email protected]>2024-07-15 01:50:22 +0000
committer王玮 <[email protected]>2024-07-15 01:50:22 +0000
commit11c95e4722b61de0e460c8f011a1b712b1a5e288 (patch)
tree6abb3b58f766f7b1271fe023a7e238edef1815dd
parent495fcdaac7df37e97ada37286a222ca90fd4f32f (diff)
parentef9790659e2f483b550bacf0ef0b1836a19a6d00 (diff)
Merge branch 'br-3.0.2' into 'develop'3.0.2br-3.0.2
Br 3.0.2 See merge request galaxy/platform/galaxy-report-service!3
-rw-r--r--config/nacos/config/fixed-127.0.0.1_8848-dev_nacos/data/config-data-tenant/dev/Galaxy/saved-query-scheduler.yml4
-rw-r--r--config/nacos/config/fixed-127.0.0.1_8848-test_nacos/data/config-data-tenant/test/Galaxy/saved-query-scheduler.yml4
-rw-r--r--docs/release/release-2.0.0.md3
-rw-r--r--docs/release/release-3.0.0.md5
-rw-r--r--docs/release/release-3.0.1.md6
-rw-r--r--docs/release/release-3.0.2.md4
-rw-r--r--pom.xml34
-rw-r--r--src/main/java/com/mesa/reportservice/bean/MonitorEntity.java29
-rw-r--r--src/main/java/com/mesa/reportservice/controller/MonitorController.java20
-rw-r--r--src/main/java/com/mesa/reportservice/exception/SQSCode.java3
-rw-r--r--src/main/java/com/mesa/reportservice/exception/SQSException.java8
-rw-r--r--src/main/java/com/mesa/reportservice/exception/SQSExceptionHandler.java38
-rw-r--r--src/main/java/com/mesa/reportservice/mapper/JobMapper.java6
-rw-r--r--src/main/java/com/mesa/reportservice/service/JobService.java5
-rw-r--r--src/main/java/com/mesa/reportservice/service/MonitorService.java3
-rw-r--r--src/main/java/com/mesa/reportservice/service/ZookeeperService.java3
-rw-r--r--src/main/java/com/mesa/reportservice/service/impl/ExecuteProcessServiceImpl.java21
-rw-r--r--src/main/java/com/mesa/reportservice/service/impl/ExecuteServiceImpl.java24
-rw-r--r--src/main/java/com/mesa/reportservice/service/impl/JobServiceImpl.java63
-rw-r--r--src/main/java/com/mesa/reportservice/service/impl/MonitorServiceImpl.java32
-rw-r--r--src/main/java/com/mesa/reportservice/service/impl/ScheduledResultServiceImpl.java36
-rw-r--r--src/main/java/com/mesa/reportservice/service/impl/ZookeeperServiceImpl.java25
-rw-r--r--src/main/java/com/mesa/reportservice/util/Constant.java4
-rw-r--r--src/main/java/com/mesa/reportservice/util/R.java93
-rw-r--r--src/main/resources/mappers/JobMapper.xml27
-rw-r--r--src/main/resources/sql/monitorInit.sql1
-rw-r--r--src/test/java/com/mesa/reportservice/bean/JobEntityTest.java23
-rw-r--r--src/test/java/com/mesa/reportservice/service/impl/ExecuteProcessServiceImplTest.java (renamed from src/test/java/com/mesa/reportservice/ExecuteProcessTest.java)74
-rw-r--r--src/test/java/com/mesa/reportservice/service/impl/ExecuteServiceImplTest.java (renamed from src/test/java/com/mesa/reportservice/ExecuteTest.java)4
-rw-r--r--src/test/java/com/mesa/reportservice/service/impl/HBaseServiceImplTest.java (renamed from src/test/java/com/mesa/reportservice/HBaseTest.java)4
-rw-r--r--src/test/java/com/mesa/reportservice/service/impl/JobServiceImplTest.java (renamed from src/test/java/com/mesa/reportservice/JobTest.java)32
-rw-r--r--src/test/java/com/mesa/reportservice/service/impl/MonitorServiceImplTest.java (renamed from src/test/java/com/mesa/reportservice/MonitorTest.java)6
-rw-r--r--src/test/java/com/mesa/reportservice/service/impl/QueryGatewayServiceImplTest.java (renamed from src/test/java/com/mesa/reportservice/QueryGatewayTest.java)43
-rw-r--r--src/test/java/com/mesa/reportservice/service/impl/ZookeeperServiceImplTest.java (renamed from src/test/java/com/mesa/reportservice/zookeeperTest.java)4
-rw-r--r--src/test/resources/parameters/executeProcessTest.json24
-rw-r--r--src/test/resources/parameters/jobTest.json (renamed from src/test/resources/parameters/mysqlTest.json)4
-rw-r--r--src/test/resources/parameters/queryGatewayTest.json4
37 files changed, 527 insertions, 196 deletions
diff --git a/config/nacos/config/fixed-127.0.0.1_8848-dev_nacos/data/config-data-tenant/dev/Galaxy/saved-query-scheduler.yml b/config/nacos/config/fixed-127.0.0.1_8848-dev_nacos/data/config-data-tenant/dev/Galaxy/saved-query-scheduler.yml
index 5c910d8..91d5309 100644
--- a/config/nacos/config/fixed-127.0.0.1_8848-dev_nacos/data/config-data-tenant/dev/Galaxy/saved-query-scheduler.yml
+++ b/config/nacos/config/fixed-127.0.0.1_8848-dev_nacos/data/config-data-tenant/dev/Galaxy/saved-query-scheduler.yml
@@ -62,7 +62,9 @@ spring:
username: root
#mariadb的密码
password: galaxy2019
-
+#初始化数据库表和数据
+ initialization-mode: always
+ schema: classpath:sql/monitorInit.sql
#以下配置不需要更改通常
name: druidDataSource
type: com.alibaba.druid.pool.DruidDataSource
diff --git a/config/nacos/config/fixed-127.0.0.1_8848-test_nacos/data/config-data-tenant/test/Galaxy/saved-query-scheduler.yml b/config/nacos/config/fixed-127.0.0.1_8848-test_nacos/data/config-data-tenant/test/Galaxy/saved-query-scheduler.yml
index 97f8407..cdeb3b0 100644
--- a/config/nacos/config/fixed-127.0.0.1_8848-test_nacos/data/config-data-tenant/test/Galaxy/saved-query-scheduler.yml
+++ b/config/nacos/config/fixed-127.0.0.1_8848-test_nacos/data/config-data-tenant/test/Galaxy/saved-query-scheduler.yml
@@ -62,7 +62,9 @@ spring:
username: root
#mariadb的密码
password: galaxy2019
-
+#初始化数据库表和数据
+ initialization-mode: always
+ schema: classpath:sql/monitorInit.sql
#以下配置不需要更改通常
name: druidDataSource
type: com.alibaba.druid.pool.DruidDataSource
diff --git a/docs/release/release-2.0.0.md b/docs/release/release-2.0.0.md
new file mode 100644
index 0000000..c820873
--- /dev/null
+++ b/docs/release/release-2.0.0.md
@@ -0,0 +1,3 @@
+Release 2.0.0 (TSG-24.02)
+* 修改Dockerfile,更新默认路径为/opt/saved-query-scheduler(GAL-502)
+* 版本号按年命名修改为“主版本号.子版本号.修正版本号” \ No newline at end of file
diff --git a/docs/release/release-3.0.0.md b/docs/release/release-3.0.0.md
new file mode 100644
index 0000000..21f6c41
--- /dev/null
+++ b/docs/release/release-3.0.0.md
@@ -0,0 +1,5 @@
+Release 3.0.0 (TSG-24.03)
+* 报告服务核心功能单元测试编写(GAL-466)
+* 统一变量及函数命名(GAL-467)
+* 报告服务函数、常量的抽取及封装(GAL-468)
+* 集成sonarQube服务,Job执行异常信息封装(GAL-480) \ No newline at end of file
diff --git a/docs/release/release-3.0.1.md b/docs/release/release-3.0.1.md
new file mode 100644
index 0000000..fef03c9
--- /dev/null
+++ b/docs/release/release-3.0.1.md
@@ -0,0 +1,6 @@
+Release 3.0.1 (TSG-24.03)
+* 修复Saved Query Scheduler代码检查异常(GAL-513)
+* Saved Query Scheduler job状态信息获取接口开发(GAL-518)
+* 优化saved-query-scheduler job信息检测API(GAL-522)
+* 优化异常信息描述
+* 修复done_progress字段在job执行期间不更新问题 \ No newline at end of file
diff --git a/docs/release/release-3.0.2.md b/docs/release/release-3.0.2.md
new file mode 100644
index 0000000..48e1510
--- /dev/null
+++ b/docs/release/release-3.0.2.md
@@ -0,0 +1,4 @@
+Release 3.0.2 (TSG-24.07)
+* 优化异常信息描述
+* 修复done_progress字段在job执行期间不更新问题
+* Saved Query Scheduler健康检查接口增加慢查询指标(GAL-608) \ No newline at end of file
diff --git a/pom.xml b/pom.xml
index 3ee88c0..236b8a3 100644
--- a/pom.xml
+++ b/pom.xml
@@ -45,6 +45,11 @@
<dependencies>
<dependency>
+ <groupId>org.jacoco</groupId>
+ <artifactId>jacoco-maven-plugin</artifactId>
+ <version>0.8.11</version>
+ </dependency>
+ <dependency>
<groupId>com.github.ulisesbocchio</groupId>
<artifactId>jasypt-spring-boot-starter</artifactId>
<version>2.1.1</version>
@@ -291,7 +296,13 @@
</execution>
</executions>
</plugin>
-
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-surefire-plugin</artifactId>
+ <configuration>
+ <skipTests>true</skipTests>
+ </configuration>
+ </plugin>
<plugin>
<groupId>com.mesalab.xjar-maven-plugin</groupId>
<artifactId>mesalab-xjar-maven-plugin</artifactId>
@@ -358,7 +369,26 @@
</configuration>
</plugin>
-
+ <plugin>
+ <groupId>org.jacoco</groupId>
+ <artifactId>jacoco-maven-plugin</artifactId>
+ <version>0.8.11</version>
+ <configuration >
+ <destFile>${project.build.directory}/jacoco.exec</destFile>
+ <dataFile>${project.build.directory}/jacoco.exec</dataFile>
+ <includes>
+ <include>**/service/impl/**</include>
+ </includes>
+ </configuration>
+ <executions>
+ <execution>
+ <goals>
+ <goal>prepare-agent</goal>
+ <goal>report</goal>
+ </goals>
+ </execution>
+ </executions>
+ </plugin>
</plugins>
</build>
diff --git a/src/main/java/com/mesa/reportservice/bean/MonitorEntity.java b/src/main/java/com/mesa/reportservice/bean/MonitorEntity.java
deleted file mode 100644
index cfa761e..0000000
--- a/src/main/java/com/mesa/reportservice/bean/MonitorEntity.java
+++ /dev/null
@@ -1,29 +0,0 @@
-package com.mesa.reportservice.bean;
-
-import lombok.Data;
-
-import java.util.Map;
-
-/**
- *
- * @author lijinyang
- * @date 2024/1/24
- */
-@Data
-public class MonitorEntity {
-
- private Long queueJobNum;
-
- private Long executingJobNum;
-
- private Long todaySuccessJobNum;
-
- private Long todayErrorJobNum;
-
- private Map<String,JobEntity> jobList ;
-
- private String status;
-
- public MonitorEntity() {
- }
-}
diff --git a/src/main/java/com/mesa/reportservice/controller/MonitorController.java b/src/main/java/com/mesa/reportservice/controller/MonitorController.java
index 8d5f8c7..c99b8a5 100644
--- a/src/main/java/com/mesa/reportservice/controller/MonitorController.java
+++ b/src/main/java/com/mesa/reportservice/controller/MonitorController.java
@@ -1,11 +1,16 @@
package com.mesa.reportservice.controller;
+import com.mesa.reportservice.service.JobService;
import com.mesa.reportservice.service.MonitorService;
+import com.mesa.reportservice.util.R;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import org.springframework.web.bind.annotation.GetMapping;
+import org.springframework.web.bind.annotation.PathVariable;
+import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
+import java.util.Map;
/**
*
@@ -14,13 +19,20 @@ import org.springframework.web.bind.annotation.RestController;
*/
@RestController
@Component
+@RequestMapping("/v1/monitor")
public class MonitorController {
@Autowired
- private MonitorService monitorService;
+ private JobService jobService;
- @GetMapping(value = "/monitor")
- public String getJobCount() {
- return monitorService.getJobCount();
+ @GetMapping
+ public R status() {
+ return jobService.getJobStatus();
+ }
+
+ @GetMapping(value = "/job/{jobId}")
+ public R jobDetail(@PathVariable String jobId) {
+ Map<String, Object> getJobDetail = jobService.getJobDetail(jobId);
+ return R.ok(getJobDetail);
}
}
diff --git a/src/main/java/com/mesa/reportservice/exception/SQSCode.java b/src/main/java/com/mesa/reportservice/exception/SQSCode.java
index d20f28b..1961f7a 100644
--- a/src/main/java/com/mesa/reportservice/exception/SQSCode.java
+++ b/src/main/java/com/mesa/reportservice/exception/SQSCode.java
@@ -28,9 +28,10 @@ public enum SQSCode {
KILL_QUERY_ERROR(500008,"Kill Query Error"),
TEMPLATE_UNKNOWN_ERROR(500009,"Unknown Error ! ErrorCode = {0} queryId = {1} jobId = {2} executeSql = {3}"),
UPDATE_STATUS_ERROR(500010,"failed to update database status"),
- TASK_CANNOT_EXECUTED(500010,"task cannot be executed"),
+ TASK_CANNOT_EXECUTED(500011,"task cannot be executed"),
UNKNOWN_ERROR(500999,"Unknown Error"),
+ JOB_IS_NOT_EXIST(510000, "job is not exist"),
//成功
SUCCESS(200, "success"),
//失败
diff --git a/src/main/java/com/mesa/reportservice/exception/SQSException.java b/src/main/java/com/mesa/reportservice/exception/SQSException.java
index d7db7f2..9fcdb3e 100644
--- a/src/main/java/com/mesa/reportservice/exception/SQSException.java
+++ b/src/main/java/com/mesa/reportservice/exception/SQSException.java
@@ -1,11 +1,3 @@
-/**
-
- *
-
- *
- *
- */
-
package com.mesa.reportservice.exception;
diff --git a/src/main/java/com/mesa/reportservice/exception/SQSExceptionHandler.java b/src/main/java/com/mesa/reportservice/exception/SQSExceptionHandler.java
new file mode 100644
index 0000000..8658bcf
--- /dev/null
+++ b/src/main/java/com/mesa/reportservice/exception/SQSExceptionHandler.java
@@ -0,0 +1,38 @@
+package com.mesa.reportservice.exception;
+
+import cn.hutool.log.Log;
+import com.mesa.reportservice.util.R;
+import org.springframework.http.HttpStatus;
+import org.springframework.web.bind.annotation.ExceptionHandler;
+import org.springframework.web.bind.annotation.ResponseStatus;
+import org.springframework.web.bind.annotation.RestControllerAdvice;
+
+/**
+ * 异常处理器
+ *
+ * @author 86158
+
+ */
+@RestControllerAdvice
+public class SQSExceptionHandler {
+ private Log logger = Log.get();
+
+ /**
+ * 处理自定义异常
+ */
+ @ExceptionHandler(SQSException.class)
+ @ResponseStatus(value= HttpStatus.BAD_REQUEST)
+ public R handleSQSException(SQSException e){
+ R r = new R();
+ r.put("code", e.getCode());
+ r.put("msg", e.getMsg());
+ return r;
+ }
+
+ @ExceptionHandler(Exception.class)
+ @ResponseStatus(value=HttpStatus.INTERNAL_SERVER_ERROR)
+ public R handleException(Exception e){
+ logger.error(e);
+ return R.error().put("msg", e.getMessage());
+ }
+}
diff --git a/src/main/java/com/mesa/reportservice/mapper/JobMapper.java b/src/main/java/com/mesa/reportservice/mapper/JobMapper.java
index 4546b12..684f1da 100644
--- a/src/main/java/com/mesa/reportservice/mapper/JobMapper.java
+++ b/src/main/java/com/mesa/reportservice/mapper/JobMapper.java
@@ -4,6 +4,7 @@ import com.baomidou.mybatisplus.core.mapper.BaseMapper;
import com.mesa.reportservice.bean.JobEntity;
import org.apache.ibatis.annotations.Mapper;
+import java.util.ArrayList;
import java.util.List;
import java.util.Map;
@@ -24,6 +25,9 @@ public interface JobMapper extends BaseMapper<JobEntity> {
int updateStatue(JobEntity job);
- Map<String,Long> getJobCount(Map<String, Object> map);
+ Map<String,Object> getJobCount(long lastUpdateTime);
+ Map<String,Object> getJobDetail(String jobId);
+
+ List<String> getSlowQuerySql(long lastUpdateTime);
} \ No newline at end of file
diff --git a/src/main/java/com/mesa/reportservice/service/JobService.java b/src/main/java/com/mesa/reportservice/service/JobService.java
index 706c75a..2ee50f5 100644
--- a/src/main/java/com/mesa/reportservice/service/JobService.java
+++ b/src/main/java/com/mesa/reportservice/service/JobService.java
@@ -3,6 +3,7 @@ package com.mesa.reportservice.service;
import com.baomidou.mybatisplus.extension.service.IService;
import com.mesa.reportservice.bean.JobEntity;
+import com.mesa.reportservice.util.R;
import java.util.List;
import java.util.Map;
@@ -21,5 +22,7 @@ public interface JobService extends IService<JobEntity> {
int updateProcesses(JobEntity job);
- Map<String,Long> getJobCount();
+ R getJobStatus();
+
+ Map<String,Object> getJobDetail(String jobId);
}
diff --git a/src/main/java/com/mesa/reportservice/service/MonitorService.java b/src/main/java/com/mesa/reportservice/service/MonitorService.java
index 5bccaea..19eeaae 100644
--- a/src/main/java/com/mesa/reportservice/service/MonitorService.java
+++ b/src/main/java/com/mesa/reportservice/service/MonitorService.java
@@ -1,5 +1,6 @@
package com.mesa.reportservice.service;
+
/**
*
* @author lijinyang
@@ -10,6 +11,4 @@ public interface MonitorService {
void addSuccess();
void addFail();
-
- String getJobCount();
} \ No newline at end of file
diff --git a/src/main/java/com/mesa/reportservice/service/ZookeeperService.java b/src/main/java/com/mesa/reportservice/service/ZookeeperService.java
index 69f77d2..4b86792 100644
--- a/src/main/java/com/mesa/reportservice/service/ZookeeperService.java
+++ b/src/main/java/com/mesa/reportservice/service/ZookeeperService.java
@@ -4,6 +4,7 @@ package com.mesa.reportservice.service;
import org.springframework.stereotype.Service;
import java.net.InetAddress;
+import java.util.Map;
/**
*
@@ -14,4 +15,6 @@ import java.net.InetAddress;
public interface ZookeeperService {
boolean isMaster();
+
+ Map<String,Object> status() throws Exception;
} \ No newline at end of file
diff --git a/src/main/java/com/mesa/reportservice/service/impl/ExecuteProcessServiceImpl.java b/src/main/java/com/mesa/reportservice/service/impl/ExecuteProcessServiceImpl.java
index 4e514d1..fb5d452 100644
--- a/src/main/java/com/mesa/reportservice/service/impl/ExecuteProcessServiceImpl.java
+++ b/src/main/java/com/mesa/reportservice/service/impl/ExecuteProcessServiceImpl.java
@@ -1,5 +1,6 @@
package com.mesa.reportservice.service.impl;
+import cn.hutool.core.util.StrUtil;
import cn.hutool.log.Log;
import cn.hutool.log.LogFactory;
import com.alibaba.fastjson.JSON;
@@ -50,6 +51,7 @@ public class ExecuteProcessServiceImpl implements ExecuteProcessService {
jobEntity.setEndTime(System.currentTimeMillis()/1000);
jobEntity.setDoneProgress(1.00f);
jobEntity.setIsFailed(1);
+ String resultMessage = jobEntity.getResultMessage();
try {
if (jobEntity.getIsValid() == 0) {
jobEntity.setResultMessage(Constant.CANCEL);
@@ -61,21 +63,21 @@ public class ExecuteProcessServiceImpl implements ExecuteProcessService {
jobEntity.setIsFailed(0);
logger.info(MessageFormat.format(Constant.LOG_SUCCESS_SAVE_TO_HBASE,jobEntity.getQueryId(),jobEntity.getJobId(),jobEntity.getQuerySql()));
} else {
- jobEntity.setResultMessage(Constant.WRITE_DATA_ERROR);
+ jobEntity.setResultMessage(StrUtil.concat(false,Constant.WRITE_DATA_ERROR,StrUtil.COLON,resultMessage));
monitorService.addFail();
msg = MessageFormat.format(SQSCode.TEMPLATE_SAVE_HBASE_ERROR.getMsg(),jobEntity.getExecuteStatus(),jobEntity.getQueryId(),jobEntity.getJobId(),jobEntity.getQuerySql());
logger.error(msg);
}
} else if (jobEntity.getExecuteStatus() >= Constant.JOB_BAD_REQUEST_THRESHOLD && jobEntity.getExecuteStatus() < Constant.JOB_INTERNAL_SERVER_ERROR_THRESHOLD) {
- jobEntity.setResultMessage(Constant.PARAM_SYNTAX_ERROR);
+ jobEntity.setResultMessage(StrUtil.concat(false,Constant.PARAM_SYNTAX_ERROR,StrUtil.COLON,resultMessage));
msg = MessageFormat.format(SQSCode.PARAM_SYNTAX_ERROR.getMsg(),jobEntity.getExecuteStatus(),jobEntity.getQueryId(),jobEntity.getJobId(),jobEntity.getQuerySql());
logger.error(msg);
} else if (jobEntity.getExecuteStatus() == Constant.JOB_SQL_EXECUTION_ERROR_CODE) {
- jobEntity.setResultMessage(Constant.SQL_EXECUTION_ERROR);
+ jobEntity.setResultMessage(StrUtil.concat(false,Constant.SQL_EXECUTION_ERROR,StrUtil.COLON,resultMessage));
msg = MessageFormat.format(SQSCode.TEMPLATE_SQL_EXECUTION_ERROR.getMsg(),jobEntity.getExecuteStatus(),jobEntity.getQueryId(),jobEntity.getJobId(),jobEntity.getQuerySql());
logger.error(msg);
} else {
- jobEntity.setResultMessage(Constant.UNKNOWN_ERROR);
+ jobEntity.setResultMessage(StrUtil.concat(false,Constant.UNKNOWN_ERROR,StrUtil.COLON,resultMessage));
msg = MessageFormat.format(SQSCode.UNKNOWN_ERROR.getMsg(),jobEntity.getQueryId(),jobEntity.getJobId(),jobEntity.getQuerySql());
logger.error(msg);
}
@@ -88,9 +90,9 @@ public class ExecuteProcessServiceImpl implements ExecuteProcessService {
}
while (number != 1 && z >= 0);
} catch (Exception e) {
- jobEntity.setResultMessage(Constant.DATABASE_ERROR);
+ jobEntity.setResultMessage(StrUtil.concat(false,Constant.DATABASE_ERROR,StrUtil.COLON,e.getMessage()));
jobService.updateProcesses(jobEntity);
- msg = MessageFormat.format(SQSCode.TEMPLATE_SAVE_DATABASE_ERROR.getMsg(),jobEntity.getQueryId(),jobEntity.getJobId(),e.toString());
+ msg = MessageFormat.format(SQSCode.TEMPLATE_SAVE_DATABASE_ERROR.getMsg(),jobEntity.getQueryId(),jobEntity.getJobId(),e.getMessage());
logger.error(msg);
} finally {
saveToMonitor(jobEntity);
@@ -122,12 +124,11 @@ public class ExecuteProcessServiceImpl implements ExecuteProcessService {
Map map = JSON.parseObject(JSON.toJSONString(listData.get(0)));
long readRows = Long.parseLong(map.get("rows_read").toString());
float elapsed = Float.parseFloat(map.get("elapsed").toString());
- double percent = Double.parseDouble(map.get("percent").toString());
- int process = (int) (percent * 100);
+ float percent = Float.parseFloat(map.get("percent").toString());
jobEntity.setElapsed((int) elapsed);
jobEntity.setRowsRead(readRows);
- if(jobEntity.getDoneProgress()<process){
- jobEntity.setDoneProgress(process);
+ if(jobEntity.getDoneProgress()<percent){
+ jobEntity.setDoneProgress(percent);
}
if (jobEntity.getRowsRead() != 0 || jobEntity.getElapsed() != 0) {
jobService.updateProcesses(jobEntity);
diff --git a/src/main/java/com/mesa/reportservice/service/impl/ExecuteServiceImpl.java b/src/main/java/com/mesa/reportservice/service/impl/ExecuteServiceImpl.java
index 594d42b..e6ada8c 100644
--- a/src/main/java/com/mesa/reportservice/service/impl/ExecuteServiceImpl.java
+++ b/src/main/java/com/mesa/reportservice/service/impl/ExecuteServiceImpl.java
@@ -48,6 +48,8 @@ public class ExecuteServiceImpl implements ExecuteService {
if (httpResult != null) {
Map mapresult = JSON.parseObject(httpResult.getBody());
int queryStatus = Integer.parseInt(mapresult.get("status").toString());
+ job.setExecuteStatus(Integer.parseInt(mapresult.get("code").toString()));
+ job.setResultMessage(mapresult.get("message").toString());
logger.info("httpCode=" + httpResult.getCode() +" status="+queryStatus);
if (httpResult.getCode() == HttpStatus.OK.value() && queryStatus == HttpStatus.OK.value()) {
k = 0;
@@ -59,11 +61,9 @@ public class ExecuteServiceImpl implements ExecuteService {
job.setResultBytes(Long.parseLong(rows.get("result_bytes").toString()));
job.setResult(httpResult.getBody());
job.setElapsed((int) Float.parseFloat(rows.get("elapsed").toString()));
- job.setExecuteStatus(Integer.parseInt(mapresult.get("code").toString()));
logger.info("success resultId = " + job.getJobId() + " queryId=" + job.getQueryId() + " sql=" + job.getQuerySql());
} else {
k = 0;
- job.setExecuteStatus(Integer.parseInt(mapresult.get("code").toString()));
logger.error("SQL Execution Error ");
}
} else {
@@ -71,34 +71,34 @@ public class ExecuteServiceImpl implements ExecuteService {
}
} catch (SocketTimeoutException e) {
k--;
- job.setResultMessage(e.toString());
+ job.setResultMessage(e.getMessage());
if (k == 0) {
job.setExecuteStatus(Constant.JOB_EXECUTION_TIMEOUT);
- job.setResultMessage(Constant.QUERY_TIME_OUT);
+ job.setResultMessage(Constant.SOCKET_HANG_UP);
logger.info("timeout resultId = " + job.getJobId() + " queryId=" + job.getQueryId() + " sql=" + job.getQuerySql());
} else {
- logger.info("Socket warn " + e.toString() + "retry time " + (3 - k));
+ logger.info("Socket warn " + e.getMessage() + "retry time " + (3 - k));
}
} catch (ConnectTimeoutException e) {
job.setExecuteStatus(Constant.JOB_ERROR);
- job.setResultMessage(e.toString());
- logger.error(SQSCode.UNKNOWN_ERROR.getMsg() + e.toString());
+ job.setResultMessage(e.getMessage());
+ logger.error(SQSCode.UNKNOWN_ERROR.getMsg() + e.getMessage());
k = 0;
} catch (OutOfMemoryError e) {
job.setExecuteStatus(Constant.JOB_ERROR);
- job.setResultMessage(Constant.RESULT_TOO_LARGE);
- logger.error(SQSCode.OUT_OF_MEMORY_ERROR.getMsg() + e.toString());
+ job.setResultMessage(e.getMessage());
+ logger.error(SQSCode.OUT_OF_MEMORY_ERROR.getMsg() + e.getMessage());
k = 0;
} catch (Exception e) {
job.setExecuteStatus(Constant.JOB_ERROR);
- job.setResultMessage(e.toString());
- logger.error(SQSCode.UNKNOWN_ERROR.getMsg() + e.toString());
+ job.setResultMessage(e.getMessage());
+ logger.error(SQSCode.UNKNOWN_ERROR.getMsg() + e.getMessage());
k = 0;
}
try {
queryGatewayService.deleteJob(job.getQueryId());
} catch (Exception e) {
- logger.error(SQSCode.KILL_QUERY_ERROR.getMsg() + e.toString());
+ logger.error(SQSCode.KILL_QUERY_ERROR.getMsg() + e.getMessage());
throw new SQSException(SQSCode.KILL_QUERY_ERROR);
}
}
diff --git a/src/main/java/com/mesa/reportservice/service/impl/JobServiceImpl.java b/src/main/java/com/mesa/reportservice/service/impl/JobServiceImpl.java
index 100b93f..affa306 100644
--- a/src/main/java/com/mesa/reportservice/service/impl/JobServiceImpl.java
+++ b/src/main/java/com/mesa/reportservice/service/impl/JobServiceImpl.java
@@ -1,16 +1,28 @@
package com.mesa.reportservice.service.impl;
+import cn.hutool.Hutool;
+import cn.hutool.core.util.ObjectUtil;
+import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl;
import com.mesa.reportservice.bean.JobEntity;
+import com.mesa.reportservice.configuration.SchedulerProperties;
+import com.mesa.reportservice.exception.SQSCode;
+import com.mesa.reportservice.exception.SQSException;
import com.mesa.reportservice.mapper.JobMapper;
import com.mesa.reportservice.service.JobService;
+import com.mesa.reportservice.service.QueryGatewayService;
+import com.mesa.reportservice.service.ZookeeperService;
+import com.mesa.reportservice.util.R;
+import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import java.time.LocalDate;
import java.time.ZoneOffset;
+import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import java.util.stream.Collectors;
/**
*
@@ -20,6 +32,12 @@ import java.util.Map;
@Service
public class JobServiceImpl extends ServiceImpl<JobMapper, JobEntity> implements JobService {
+ @Autowired
+ private SchedulerProperties schedulerProperties;
+
+ @Autowired
+ private ZookeeperService zookeeperService;
+
@Override
public List<JobEntity> getRunningJob() {
return this.baseMapper.getRunningJob();
@@ -40,12 +58,45 @@ public class JobServiceImpl extends ServiceImpl<JobMapper, JobEntity> implements
}
@Override
- public Map<String,Long> getJobCount(){
- LocalDate currentDate = LocalDate.now();
- long lastUpdateTime = currentDate.atStartOfDay(ZoneOffset.UTC).toInstant().toEpochMilli();
- Map map = new HashMap<String,Object>();
- map.put("lastUpdateTime", lastUpdateTime);
- return this.baseMapper.getJobCount(map);
+ public R getJobStatus() {
+ Map<String,Object> numMap = getJobCount();
+ Map<String, Object> status = null;
+ try {
+ status = zookeeperService.status();
+ } catch (Exception e) {
+ e.printStackTrace();
+ }
+ R r = new R();
+ r.put("data",numMap);
+ r.putAll(status);
+ return r;
+ }
+
+ private Map<String,Object> getJobCount(){
+ long lastUpdateTime = System.currentTimeMillis()/1000 - convertDaysToSeconds(1);
+ Map<String, Object> jobCount = this.baseMapper.getJobCount(lastUpdateTime);
+ LambdaQueryWrapper<JobEntity> failed = new LambdaQueryWrapper<>();
+ failed.select(JobEntity::getJobId).eq(JobEntity::getIsFailed,1).ge(JobEntity::getLastUpdateTime,lastUpdateTime);
+ List<String> failedJobIdList = this.baseMapper.selectObjs(failed).stream().map(o -> (String) o).collect(Collectors.toList());
+ List<String> slowJobIdList = this.baseMapper.getSlowQuerySql(lastUpdateTime);
+ Map<String, Object> jobs = new HashMap<>();
+ jobs.put("running",schedulerProperties.getMapResult());
+ jobs.put("failed",failedJobIdList);
+ jobs.put("slow_query",slowJobIdList);
+ jobCount.put("jobs",jobs);
+ return jobCount;
}
+ @Override
+ public Map<String,Object> getJobDetail(String jobId){
+ Map<String,Object> jobState = this.baseMapper.getJobDetail(jobId);
+ if (ObjectUtil.isEmpty(jobState)) {
+ throw new SQSException(SQSCode.JOB_IS_NOT_EXIST);
+ }
+ return jobState;
+ }
+
+ private long convertDaysToSeconds(Integer days) {
+ return 3600L*24*days;
+ }
} \ No newline at end of file
diff --git a/src/main/java/com/mesa/reportservice/service/impl/MonitorServiceImpl.java b/src/main/java/com/mesa/reportservice/service/impl/MonitorServiceImpl.java
index 0e57d84..3120fca 100644
--- a/src/main/java/com/mesa/reportservice/service/impl/MonitorServiceImpl.java
+++ b/src/main/java/com/mesa/reportservice/service/impl/MonitorServiceImpl.java
@@ -1,11 +1,10 @@
package com.mesa.reportservice.service.impl;
-import com.alibaba.fastjson.JSONArray;
-import com.mesa.reportservice.bean.MonitorEntity;
import com.mesa.reportservice.configuration.SchedulerProperties;
import com.mesa.reportservice.service.JobService;
import com.mesa.reportservice.service.MonitorService;
import com.mesa.reportservice.service.ZookeeperService;
+import com.mesa.reportservice.util.R;
import io.micrometer.core.instrument.*;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
@@ -22,15 +21,6 @@ import java.util.Map;
public class MonitorServiceImpl implements MonitorService {
@Autowired
- private JobService mysqlService;
-
- @Autowired
- private ZookeeperService zookeeperService;
-
- @Autowired
- private SchedulerProperties schedulerProperties;
-
- @Autowired
private MeterRegistry meterRegistry;
private Counter counterReportSuccess;
@@ -52,24 +42,4 @@ public class MonitorServiceImpl implements MonitorService {
public void addFail() {
counterReportFail.increment();
}
-
- @Override
- public String getJobCount() {
- String json="";
- MonitorEntity monitorEntity = new MonitorEntity();
- Map<String,Long> numMap = mysqlService.getJobCount();
- monitorEntity.setQueueJobNum(numMap.get("queueNum"));
- monitorEntity.setExecutingJobNum(numMap.get("executingNum"));
- monitorEntity.setTodaySuccessJobNum(numMap.get("todaySuccessNum"));
- monitorEntity.setTodayErrorJobNum(numMap.get("todayErrorNum"));
- monitorEntity.setJobList(schedulerProperties.getMapResult());
- if(zookeeperService.isMaster()){
- monitorEntity.setStatus("active");
- }else{
- monitorEntity.setStatus("standby");
- }
- Object obj = JSONArray.toJSON(monitorEntity);
- json = obj.toString();
- return json;
- }
}
diff --git a/src/main/java/com/mesa/reportservice/service/impl/ScheduledResultServiceImpl.java b/src/main/java/com/mesa/reportservice/service/impl/ScheduledResultServiceImpl.java
index c43981b..3051690 100644
--- a/src/main/java/com/mesa/reportservice/service/impl/ScheduledResultServiceImpl.java
+++ b/src/main/java/com/mesa/reportservice/service/impl/ScheduledResultServiceImpl.java
@@ -1,5 +1,6 @@
package com.mesa.reportservice.service.impl;
+import cn.hutool.core.util.StrUtil;
import cn.hutool.log.Log;
import cn.hutool.log.LogFactory;
import com.mesa.reportservice.bean.JobEntity;
@@ -71,8 +72,8 @@ public class ScheduledResultServiceImpl implements ScheduledResultService {
}
}
} catch (Exception e) {
- logger.error(e.toString());
- throw new SQSException(e.toString());
+ logger.error(e.getMessage());
+ throw new SQSException(e.getMessage());
}
}
@@ -83,18 +84,11 @@ public class ScheduledResultServiceImpl implements ScheduledResultService {
String sql = jobEntity.getQuerySql().trim();
String queryId = queryGatewayService.getQueryId(sql);
jobEntity.setQueryId(queryId);
- if (jobEntity.getIsValid() == 0) {
+ if (schedulerProperties.getMapResult().containsKey(jobEntity.getQueryId()) && jobEntity.getIsValid() == 0) {
executeProcessService.killQuery(jobEntity);
schedulerProperties.getMapResult().get(jobEntity.getQueryId()).setIsValid(0);
- } else if (!schedulerProperties.getMapResult().containsKey(jobEntity.getQueryId())) {
- executeProcessService.reSet(jobEntity);
}
- if (schedulerProperties.getMapResult().containsKey(jobEntity.getQueryId())) {
- if (jobEntity.getIsValid() == 0) {
- executeProcessService.killQuery(jobEntity);
- schedulerProperties.getMapResult().get(jobEntity.getQueryId()).setIsValid(0);
- }
- } else {
+ if(!schedulerProperties.getMapResult().containsKey(jobEntity.getQueryId())) {
executeProcessService.reSet(jobEntity);
}
}
@@ -104,10 +98,10 @@ public class ScheduledResultServiceImpl implements ScheduledResultService {
//遍历内存中的任务对状态RUNNING的更新进度,其他更新数据库的状态
for (Map.Entry<String, JobEntity> entry : schedulerProperties.getMapResult().entrySet()) {
logger.info("key = " + entry.getKey() + ", value = " + entry.getValue().getState());
- long currentTime = System.currentTimeMillis();
+ long currentTime = convertToSeconds(System.currentTimeMillis());
long executeTime = currentTime - entry.getValue().getStartTime();
- logger.info("execute time=" + executeTime + "ttlTime=" + httpClientPool.getSocketTimeout());
- if (JobStates.RUNNING.getValue().equals(entry.getValue().getState()) && executeTime > httpClientPool.getSocketTimeout()) {
+ logger.info("execute time=" + executeTime + "ttlTime=" + convertToSeconds(httpClientPool.getSocketTimeout()));
+ if (JobStates.RUNNING.getValue().equals(entry.getValue().getState()) && executeTime > convertToSeconds(httpClientPool.getSocketTimeout())) {
entry.getValue().setState(JobStates.DONE.getValue());
entry.getValue().setExecuteStatus(Constant.JOB_EXECUTION_TIMEOUT);
executeProcessService.killQuery(entry.getValue());
@@ -128,7 +122,7 @@ public class ScheduledResultServiceImpl implements ScheduledResultService {
List<JobEntity> jobs = jobService.getJobTask(rows);
for (JobEntity job : jobs) {
logger.info(Constant.LOG_START_EXECUTING_TASKS);
- long startTime = System.currentTimeMillis()/1000;
+ long startTime = convertToSeconds(System.currentTimeMillis());
job.setStartTime(startTime);
String sql = job.getQuerySql().trim();
job.setQuerySql(sql);
@@ -145,10 +139,10 @@ public class ScheduledResultServiceImpl implements ScheduledResultService {
if (queryId.equals("") ) {
job.setExecuteStatus(0);
job.setState(JobStates.DONE.getValue());
- job.setEndTime(System.currentTimeMillis()/1000);
+ job.setEndTime(convertToSeconds(System.currentTimeMillis()));
job.setDoneProgress(1.00f);
job.setIsFailed(1);
- job.setResultMessage(Constant.UNKNOWN_ERROR);
+ job.setResultMessage(StrUtil.concat(false,Constant.UNKNOWN_ERROR,": queryId is null"));
}
if ((JobStates.RUNNING.getValue()).equals(job.getState())) {
if (jobService.updateProcesses(job) != 0) {
@@ -161,15 +155,12 @@ public class ScheduledResultServiceImpl implements ScheduledResultService {
});
} else {
logger.error(SQSCode.UPDATE_STATUS_ERROR.getMsg());
- throw new SQSException(SQSCode.UPDATE_STATUS_ERROR);
}
} else {
if (jobService.updateProcesses(job) != 0) {
logger.error(SQSCode.TASK_CANNOT_EXECUTED.getMsg());
- throw new SQSException(SQSCode.TASK_CANNOT_EXECUTED);
} else {
logger.error(SQSCode.UPDATE_STATUS_ERROR.getMsg());
- throw new SQSException(SQSCode.UPDATE_STATUS_ERROR);
}
}
}
@@ -177,4 +168,9 @@ public class ScheduledResultServiceImpl implements ScheduledResultService {
logger.info(Constant.LOG_NO_PENDING_TASKS);
}
}
+
+ private long convertToSeconds(long millis) {
+ return millis/1000;
+ }
+
}
diff --git a/src/main/java/com/mesa/reportservice/service/impl/ZookeeperServiceImpl.java b/src/main/java/com/mesa/reportservice/service/impl/ZookeeperServiceImpl.java
index dd2c267..123235d 100644
--- a/src/main/java/com/mesa/reportservice/service/impl/ZookeeperServiceImpl.java
+++ b/src/main/java/com/mesa/reportservice/service/impl/ZookeeperServiceImpl.java
@@ -6,12 +6,17 @@ import com.mesa.reportservice.configuration.SchedulerProperties;
import com.mesa.reportservice.configuration.ZookeeperProperties;
import com.mesa.reportservice.service.ZookeeperService;
import org.apache.curator.framework.CuratorFramework;
+import org.apache.curator.framework.api.GetChildrenBuilder;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.data.Stat;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
/**
*
@@ -68,6 +73,26 @@ public class ZookeeperServiceImpl implements ZookeeperService {
return false;
}
}
+
+ @Override
+ public Map<String,Object> status() throws Exception {
+ Map<String, Object> status = new HashMap<>();
+ status.put("status","UP");
+ if (zookeeperProperties.getOpen() == 0) {
+ List<String> children = curatorConnect.getChildren().forPath("/");
+ boolean isDown = true;
+ for (String child : children) {
+ Stat stat = curatorConnect.checkExists().forPath("/" + child);
+ if (stat != null) {
+ isDown = false;
+ }
+ }
+ if(isDown){
+ status.put("status","DOWN");
+ }
+ }
+ return status;
+ }
}
diff --git a/src/main/java/com/mesa/reportservice/util/Constant.java b/src/main/java/com/mesa/reportservice/util/Constant.java
index 91b4fd6..16662ae 100644
--- a/src/main/java/com/mesa/reportservice/util/Constant.java
+++ b/src/main/java/com/mesa/reportservice/util/Constant.java
@@ -19,7 +19,7 @@ public class Constant {
public static final String UNKNOWN_ERROR = "Unknown Error";
public static final String DATABASE_ERROR = "Database Error";
public static final String RE_EXECUTION = "Re Execution";
- public static final String QUERY_TIME_OUT = "SQL Execution Error execute query time out";
+ public static final String SOCKET_HANG_UP = "SQL Execution Error socket hang up";
public static final String RESULT_TOO_LARGE = "Result Too Large";
/**
@@ -44,7 +44,7 @@ public class Constant {
* URL
*/
public static final String URL_GET_QUERY_ID = "{0}/v1/sql/query/query_id?query=";
- public static final String URL_QUERY_FOR_EXECUTE = "{0}/sql?originalSQL=";
+ public static final String URL_QUERY_FOR_EXECUTE = "{0}/sql?option=long_term&originalSQL=";
public static final String URL_QUERY_FOR_PROCESS = "{0}/v1/sql/query/{1}/progress";
public static final String URL_QUERY_FOR_CANCEL = "{0}/v1/sql/query/{1}";
}
diff --git a/src/main/java/com/mesa/reportservice/util/R.java b/src/main/java/com/mesa/reportservice/util/R.java
new file mode 100644
index 0000000..0bafdb2
--- /dev/null
+++ b/src/main/java/com/mesa/reportservice/util/R.java
@@ -0,0 +1,93 @@
+package com.mesa.reportservice.util;
+
+import com.mesa.reportservice.exception.SQSCode;
+import com.mesa.reportservice.exception.SQSException;
+
+import java.util.Date;
+import java.util.HashMap;
+import java.util.LinkedHashMap;
+import java.util.Map;
+
+
+/**
+ * 返回数据
+ *
+ * 错误码、错误内容统一在枚举类SQSCode中定义, 错误码格式见SQSCode注释,错误码内容必须用英文,作为国际化的code
+ * 自定义的错误类型必须加注释
+ */
+public class R extends HashMap<String, Object> {
+ private static final long serialVersionUID = 1L;
+
+ public R() {
+ put("code", SQSCode.SUCCESS.getCode());
+ put("msg", SQSCode.SUCCESS.getMsg());
+ put("time", new Date());
+ }
+
+ public static R error() {
+ return error(SQSCode.ERROR.getCode(), SQSCode.ERROR.getMsg());
+ }
+
+ public static R error(SQSCode SQSCode) {
+ R r = new R();
+ r.put("code", SQSCode.getCode());
+ r.put("msg", SQSCode.getMsg());
+ r.put("time", new Date());
+ return r;
+ }
+
+ public static R error(Integer code, String msg) {
+ R r = new R();
+ r.put("code", code);
+ r.put("msg", msg);
+ r.put("time", new Date());
+ return r;
+ }
+
+ public static R ok(String msg) {
+ R r = new R();
+ r.put("msg", msg);
+ r.put("time", new Date());
+ return r;
+ }
+
+ public static R ok() {
+ return new R();
+ }
+
+ public static R ok(Object data) {
+ R r = new R();
+ r.put("data", data);
+ r.put("time", new Date());
+ return r;
+ }
+
+ @Override
+ public R put(String key, Object value) {
+ super.put(key, value);
+ return this;
+ }
+
+ @SuppressWarnings("unchecked")
+ public R putData(String key,Object value) {
+ Object data = super.getOrDefault("data", new LinkedHashMap<String, Object>());
+ if( !(data instanceof Map)) {
+ throw new SQSException("data put error");
+ }
+ ((Map<String, Object>)data).put(key, value);
+ super.put("data", data);
+ return this;
+ }
+
+ @SuppressWarnings("unchecked")
+ public R putDataAll(Map<String,Object> dataAll) {
+ Object data = super.getOrDefault("data", new LinkedHashMap<String, Object>());
+ if( !(data instanceof Map)) {
+ throw new SQSException("data putAll error");
+ }
+ ((Map<String, Object>)data).putAll(dataAll);;
+ super.put("data", data);
+ return this;
+ }
+}
+
diff --git a/src/main/resources/mappers/JobMapper.xml b/src/main/resources/mappers/JobMapper.xml
index 9017d60..f36b683 100644
--- a/src/main/resources/mappers/JobMapper.xml
+++ b/src/main/resources/mappers/JobMapper.xml
@@ -38,6 +38,12 @@
where state = 'PENDING' and is_valid = 1 order by generated_time limit ${rows}
</select>
+ <select id="getSlowQuerySql" resultType="java.lang.String" parameterType="long">
+ select job_id
+ from saved_query_job
+ where state = 'DONE' and is_failed = 0 and elapsed > 300000 and last_update_time >= #{lastUpdateTime}
+ </select>
+
<update id="updateProcesses" parameterType="com.mesa.reportservice.bean.JobEntity" >
update saved_query_job
<set >
@@ -81,10 +87,6 @@
where job_id = #{jobId,jdbcType=VARCHAR}
</update>
-
-
-
-
<update id="updateStatue" parameterType="com.mesa.reportservice.bean.JobEntity" >
update saved_query_job
<set >
@@ -107,11 +109,18 @@
where job_id = #{jobId,jdbcType=VARCHAR}
</update>
+ <select id="getJobCount" resultType="map" parameterType="long">
+ SELECT (SELECT COUNT(1) FROM saved_query_job where state = 'PENDING' and is_valid = 1) as pending_count,
+ (SELECT COUNT(1) FROM saved_query_job where state = 'RUNNING' and is_valid = 1) as running_count,
+ (SELECT COUNT(1) FROM saved_query_job where state = 'DONE' and is_failed = 0 and last_update_time > #{lastUpdateTime}) as done_count,
+ (SELECT COUNT(1) FROM saved_query_job where is_failed = 1 and last_update_time > #{lastUpdateTime} ) as failed_count,
+ (SELECT COUNT(1) FROM saved_query_job where is_valid = 0 and last_update_time > #{lastUpdateTime} ) as canceled_count,
+ (SELECT COUNT(1) FROM saved_query_job where state = 'DONE' and is_failed = 0 and elapsed > 300000 and last_update_time >= #{lastUpdateTime}) as slow_query_count
+ </select>
- <select id="getJobCount" resultType="map" parameterType="hashmap">
- SELECT (SELECT COUNT(1) FROM saved_query_job where state = 'PENDING' and is_valid = 1) as queueNum,
- (SELECT COUNT(1) FROM saved_query_job where state = 'RUNNING' and is_valid = 1) as executingNum,
- (SELECT COUNT(1) FROM saved_query_job where state = 'DONE' and is_failed = 0 and last_update_time > #{lastUpdateTime}) as todaySuccessNum,
- (SELECT COUNT(1) FROM saved_query_job where is_failed = 1 and last_update_time > #{lastUpdateTime} ) as todayErrorNum
+ <select id="getJobDetail" resultType="map" parameterType="String">
+ SELECT
+ <include refid="Base_Column_List" />
+ FROM saved_query_job WHERE job_id = #{jobId}
</select>
</mapper> \ No newline at end of file
diff --git a/src/main/resources/sql/monitorInit.sql b/src/main/resources/sql/monitorInit.sql
new file mode 100644
index 0000000..978e1fd
--- /dev/null
+++ b/src/main/resources/sql/monitorInit.sql
@@ -0,0 +1 @@
+REPLACE INTO `saved_query_job` (`job_id`, `query_sql`, `state`, `done_progress`, `is_failed`, `result_message`, `elapsed`, `rows_read`, `bytes_read`, `result_bytes`, `result_rows`, `is_valid`, `start_time`, `end_time`, `last_update_time`, `generated_time`) VALUES ('test127897d819062beafc8db85b9ece', 'SELECT 1 FROM session_record LIMIT 1', 'DONE', 1.00, 0, 'OK', 1, 1, 9, 9, 1, 1, 1710836940, 1710836955, 1710836956, 1706607272); \ No newline at end of file
diff --git a/src/test/java/com/mesa/reportservice/bean/JobEntityTest.java b/src/test/java/com/mesa/reportservice/bean/JobEntityTest.java
new file mode 100644
index 0000000..2ed989f
--- /dev/null
+++ b/src/test/java/com/mesa/reportservice/bean/JobEntityTest.java
@@ -0,0 +1,23 @@
+package com.mesa.reportservice.bean;
+
+import com.mesa.ReportServiceApplicationTests;
+import org.junit.Assert;
+import org.junit.Test;
+import org.springframework.boot.autoconfigure.EnableAutoConfiguration;
+
+import static org.junit.Assert.*;
+
+/**
+ * @Auther: lijinyang
+ * @Date: 2024/03/12/17:24
+ */
+@EnableAutoConfiguration
+public class JobEntityTest extends ReportServiceApplicationTests {
+
+ @Test
+ public void testClone() {
+ JobEntity job = this.jsonToInParameter("parameters/executeTest.json", "executeJob", JobEntity.class);
+ Object clone = job.clone();
+ Assert.assertTrue(clone instanceof JobEntity);
+ }
+} \ No newline at end of file
diff --git a/src/test/java/com/mesa/reportservice/ExecuteProcessTest.java b/src/test/java/com/mesa/reportservice/service/impl/ExecuteProcessServiceImplTest.java
index 9e5b56e..68f9f53 100644
--- a/src/test/java/com/mesa/reportservice/ExecuteProcessTest.java
+++ b/src/test/java/com/mesa/reportservice/service/impl/ExecuteProcessServiceImplTest.java
@@ -1,4 +1,4 @@
-package com.mesa.reportservice;
+package com.mesa.reportservice.service.impl;
import com.mesa.ReportServiceApplicationTests;
import com.mesa.reportservice.bean.JobEntity;
@@ -18,69 +18,71 @@ import org.springframework.boot.autoconfigure.EnableAutoConfiguration;
* @Date: 2024/01/08/16:33
*/
@EnableAutoConfiguration
-public class ExecuteProcessTest extends ReportServiceApplicationTests {
+public class ExecuteProcessServiceImplTest extends ReportServiceApplicationTests {
@Autowired
private ExecuteProcessService executeProcessService;
@Test
- public void testReSet() {
- JobEntity job = this.jsonToInParameter("parameters/executeProcessTest.json", "reSetJob", JobEntity.class);
- executeProcessService.reSet(job);
- Assert.assertEquals(JobStates.PENDING.getValue(),job.getState());
- }
+ public void updateResultMessage() {
+ JobEntity job = this.jsonToInParameter("parameters/executeProcessTest.json", "okJob", JobEntity.class);
+ executeProcessService.updateResultMessage(job);
+ Assert.assertEquals(Constant.OK,job.getResultMessage());
- @Test
- public void testUpdateResultMessageCancel() {
- JobEntity job = this.jsonToInParameter("parameters/executeProcessTest.json", "cancelJob", JobEntity.class);
+ job = this.jsonToInParameter("parameters/executeProcessTest.json", "cancelJob", JobEntity.class);
executeProcessService.updateResultMessage(job);
Assert.assertEquals(Constant.CANCEL,job.getResultMessage());
- }
- @Test
- public void testUpdateResultMessageOk() {
- JobEntity job = this.jsonToInParameter("parameters/executeProcessTest.json", "okJob", JobEntity.class);
- executeProcessService.updateResultMessage(job);
- Assert.assertEquals(Constant.OK,job.getResultMessage());
- }
+ job = this.jsonToInParameter("parameters/executeProcessTest.json", "dataErrorJob", JobEntity.class);
+ try {
+ executeProcessService.updateResultMessage(job);
+ } catch (SQSException e) {
+ Assert.assertEquals(Constant.DATABASE_ERROR,job.getResultMessage());
+ }
- @Test
- public void testUpdateResultMessageDataError() {
- JobEntity job = this.jsonToInParameter("parameters/executeProcessTest.json", "dataErrorJob", JobEntity.class);
+ job = this.jsonToInParameter("parameters/executeProcessTest.json", "paramErrorJob", JobEntity.class);
try {
executeProcessService.updateResultMessage(job);
} catch (SQSException e) {
- Assert.assertNotEquals((Object) SQSCode.SUCCESS.getCode(),e.getCode());
+ Assert.assertEquals(Constant.PARAM_SYNTAX_ERROR,job.getResultMessage());
}
- }
- @Test
- public void testUpdateResultMessageParamError() {
- JobEntity job = this.jsonToInParameter("parameters/executeProcessTest.json", "paramErrorJob", JobEntity.class);
+ job = this.jsonToInParameter("parameters/executeProcessTest.json", "SQLErrorJob", JobEntity.class);
try {
executeProcessService.updateResultMessage(job);
} catch (SQSException e) {
- Assert.assertNotEquals((Object) SQSCode.SUCCESS.getCode(),e.getCode());
+ Assert.assertEquals(Constant.JOB_ERROR,job.getResultMessage());
}
- }
- @Test
- public void testUpdateResultMessageSQLError() {
- JobEntity job = this.jsonToInParameter("parameters/executeProcessTest.json", "SQLErrorJob", JobEntity.class);
+ job = this.jsonToInParameter("parameters/executeProcessTest.json", "unknownErrorJob", JobEntity.class);
try {
executeProcessService.updateResultMessage(job);
} catch (SQSException e) {
- Assert.assertNotEquals((Object) SQSCode.SUCCESS.getCode(),e.getCode());
+ Assert.assertEquals(Constant.UNKNOWN_ERROR,job.getResultMessage());
}
}
@Test
- public void testUpdateResultMessageUnknownError() {
- JobEntity job = this.jsonToInParameter("parameters/executeProcessTest.json", "unknownErrorJob", JobEntity.class);
- try {
- executeProcessService.updateResultMessage(job);
+ public void reSet() {
+ JobEntity job = this.jsonToInParameter("parameters/executeProcessTest.json", "reSetJob", JobEntity.class);
+ executeProcessService.reSet(job);
+ Assert.assertEquals(JobStates.PENDING.getValue(),job.getState());
+ }
+
+ @Test
+ public void killQuery() {
+ JobEntity job = this.jsonToInParameter("parameters/executeProcessTest.json", "cancelJob", JobEntity.class);
+ executeProcessService.killQuery(job);
+ Assert.assertTrue(job.getIsValid()==0);
+ }
+
+ @Test
+ public void updateProcessMessage() {
+ JobEntity job = this.jsonToInParameter("parameters/executeProcessTest.json", "updateProcessJob", JobEntity.class);
+ try{
+ executeProcessService.updateProcessMessage(job);
} catch (SQSException e) {
- Assert.assertNotEquals((Object) SQSCode.SUCCESS.getCode(),e.getCode());
+ Assert.assertEquals(SQSCode.RESPONSE_DATA_ISNULL.getMsg(),e.getMsg());
}
}
}
diff --git a/src/test/java/com/mesa/reportservice/ExecuteTest.java b/src/test/java/com/mesa/reportservice/service/impl/ExecuteServiceImplTest.java
index e30293a..3bd7483 100644
--- a/src/test/java/com/mesa/reportservice/ExecuteTest.java
+++ b/src/test/java/com/mesa/reportservice/service/impl/ExecuteServiceImplTest.java
@@ -1,4 +1,4 @@
-package com.mesa.reportservice;
+package com.mesa.reportservice.service.impl;
import com.mesa.ReportServiceApplicationTests;
import com.mesa.reportservice.bean.JobEntity;
@@ -15,7 +15,7 @@ import org.springframework.boot.autoconfigure.EnableAutoConfiguration;
* @Date: 2024/01/10/15:57
*/
@EnableAutoConfiguration
-public class ExecuteTest extends ReportServiceApplicationTests {
+public class ExecuteServiceImplTest extends ReportServiceApplicationTests {
@Autowired
private ExecuteService executeService;
diff --git a/src/test/java/com/mesa/reportservice/HBaseTest.java b/src/test/java/com/mesa/reportservice/service/impl/HBaseServiceImplTest.java
index 2392c10..84d8372 100644
--- a/src/test/java/com/mesa/reportservice/HBaseTest.java
+++ b/src/test/java/com/mesa/reportservice/service/impl/HBaseServiceImplTest.java
@@ -1,4 +1,4 @@
-package com.mesa.reportservice;
+package com.mesa.reportservice.service.impl;
import com.mesa.ReportServiceApplicationTests;
import com.mesa.reportservice.bean.JobEntity;
@@ -14,7 +14,7 @@ import org.springframework.boot.autoconfigure.EnableAutoConfiguration;
* @Date: 2024/01/10/14:48
*/
@EnableAutoConfiguration
-public class HBaseTest extends ReportServiceApplicationTests {
+public class HBaseServiceImplTest extends ReportServiceApplicationTests {
@Autowired
private HBaseService hBaseService;
diff --git a/src/test/java/com/mesa/reportservice/JobTest.java b/src/test/java/com/mesa/reportservice/service/impl/JobServiceImplTest.java
index e69aaf6..9bbfeb9 100644
--- a/src/test/java/com/mesa/reportservice/JobTest.java
+++ b/src/test/java/com/mesa/reportservice/service/impl/JobServiceImplTest.java
@@ -1,9 +1,13 @@
-package com.mesa.reportservice;
+package com.mesa.reportservice.service.impl;
+import cn.hutool.core.util.ObjectUtil;
import com.mesa.ReportServiceApplicationTests;
import com.mesa.reportservice.bean.JobEntity;
import com.mesa.reportservice.enums.JobStates;
+import com.mesa.reportservice.exception.SQSCode;
+import com.mesa.reportservice.exception.SQSException;
import com.mesa.reportservice.service.JobService;
+import com.mesa.reportservice.util.R;
import org.junit.Assert;
import org.junit.Test;
import org.springframework.beans.factory.annotation.Autowired;
@@ -17,7 +21,7 @@ import java.util.Map;
* @Date: 2024/01/08/17:22
*/
@EnableAutoConfiguration
-public class JobTest extends ReportServiceApplicationTests {
+public class JobServiceImplTest extends ReportServiceApplicationTests {
private static final int rows = 10;
@@ -41,22 +45,36 @@ public class JobTest extends ReportServiceApplicationTests {
}
@Test
- public void testGetJobCount() {
- Map<String, Long> jobCount = jobService.getJobCount();
- Assert.assertNotNull(jobCount);
+ public void testGetJobStatus() {
+ R jobStatus = jobService.getJobStatus();
+ Assert.assertNotNull(jobStatus);
}
@Test
public void testUpdateProcessesSuccess() {
- JobEntity job = this.jsonToInParameter("parameters/mysqlTest.json", "updateProcessesSuccessJob", JobEntity.class);
+ JobEntity job = this.jsonToInParameter("parameters/jobTest.json", "updateProcessesSuccessJob", JobEntity.class);
int i = jobService.updateProcesses(job);
Assert.assertEquals(updateProcessesSuccess,i);
}
@Test
public void testUpdateProcessesError() {
- JobEntity job = this.jsonToInParameter("parameters/mysqlTest.json", "updateProcessesErrorJob", JobEntity.class);
+ JobEntity job = this.jsonToInParameter("parameters/jobTest.json", "updateProcessesErrorJob", JobEntity.class);
int i = jobService.updateProcesses(job);
Assert.assertNotEquals(updateProcessesSuccess,i);
}
+
+ @Test
+ public void testGetJobState() {
+ String successId = this.jsonToInParameter("parameters/jobTest.json", "getJobDetailSuccessId", String.class);
+ Map<String, Object> success = jobService.getJobDetail(successId);
+ Assert.assertTrue(ObjectUtil.isNotEmpty(success));
+
+ String errorId = this.jsonToInParameter("parameters/jobTest.json", "getJobDetailErrorId", String.class);
+ try {
+ jobService.getJobDetail(errorId);
+ } catch (SQSException e) {
+ Assert.assertTrue(e.getCode()==SQSCode.JOB_IS_NOT_EXIST.getCode());
+ }
+ }
}
diff --git a/src/test/java/com/mesa/reportservice/MonitorTest.java b/src/test/java/com/mesa/reportservice/service/impl/MonitorServiceImplTest.java
index f8cf701..5b01fba 100644
--- a/src/test/java/com/mesa/reportservice/MonitorTest.java
+++ b/src/test/java/com/mesa/reportservice/service/impl/MonitorServiceImplTest.java
@@ -1,4 +1,4 @@
-package com.mesa.reportservice;
+package com.mesa.reportservice.service.impl;
import com.mesa.ReportServiceApplicationTests;
import org.junit.Test;
@@ -16,14 +16,14 @@ import static org.springframework.test.web.servlet.result.MockMvcResultMatchers.
*/
@EnableAutoConfiguration
@AutoConfigureMockMvc
-public class MonitorTest extends ReportServiceApplicationTests {
+public class MonitorServiceImplTest extends ReportServiceApplicationTests {
@Autowired
private MockMvc mockMvc;
@Test
public void testGetReportStatus() throws Exception{
- mockMvc.perform(get("/monitor"))
+ mockMvc.perform(get("/v1/monitor"))
.andExpect(status().isOk());
}
}
diff --git a/src/test/java/com/mesa/reportservice/QueryGatewayTest.java b/src/test/java/com/mesa/reportservice/service/impl/QueryGatewayServiceImplTest.java
index 193b925..aad4e29 100644
--- a/src/test/java/com/mesa/reportservice/QueryGatewayTest.java
+++ b/src/test/java/com/mesa/reportservice/service/impl/QueryGatewayServiceImplTest.java
@@ -1,7 +1,10 @@
-package com.mesa.reportservice;
+package com.mesa.reportservice.service.impl;
+import cn.hutool.log.Log;
+import cn.hutool.log.LogFactory;
import com.mesa.ReportServiceApplicationTests;
import com.mesa.reportservice.bean.HttpResult;
+import com.mesa.reportservice.exception.SQSException;
import com.mesa.reportservice.service.QueryGatewayService;
import org.junit.Assert;
import org.junit.Test;
@@ -13,15 +16,19 @@ import java.io.IOException;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
+import static com.mesa.reportservice.exception.SQSCode.ERROR;
+
/**
* @Auther: lijinyang
* @Date: 2024/01/04/17:05
*/
@EnableAutoConfiguration
-public class QueryGatewayTest extends ReportServiceApplicationTests {
+public class QueryGatewayServiceImplTest extends ReportServiceApplicationTests {
private static final String QUERY_ID_PATTERN = "^[a-f0-9]{32}:[a-f0-9]{32}$";
+ private static final Log logger = LogFactory.get();
+
@Autowired
private QueryGatewayService queryGatewayService;
@@ -93,4 +100,36 @@ public class QueryGatewayTest extends ReportServiceApplicationTests {
httpResult = queryGatewayService.deleteJob(queryId);
Assert.assertTrue(httpResult.getCode() == HttpStatus.NOT_FOUND.value());
}
+
+ @Test
+ public void highLatencyQuerySuccess() throws IOException {
+ String sql = this.jsonToInParameter("parameters/queryGatewayTest.json", "highLatencyQuerySuccess", String.class);
+ HttpResult httpResult = queryGatewayService.queryJob(sql);
+ Assert.assertTrue(httpResult.getCode() == HttpStatus.OK.value());
+ }
+
+ @Test
+ public void highLatencyQueryCancel() throws IOException {
+ try {
+ String sql = this.jsonToInParameter("parameters/queryGatewayTest.json", "highLatencyQueryCancel", String.class);
+ String queryId = queryGatewayService.getQueryId(sql);
+ Thread thread = new Thread(() -> {
+ try {
+ HttpResult result = queryGatewayService.queryJob(sql);
+ } catch (IOException e) {
+ e.printStackTrace();
+ }
+ });
+ thread.start();
+ thread.sleep(5000);
+ HttpResult process = queryGatewayService.queryJobProcess(queryId);
+ logger.info(process.getBody());
+ Assert.assertTrue(process.getBody().contains("elapsed"));
+ HttpResult httpResult = queryGatewayService.deleteJob(queryId);
+ Assert.assertTrue(httpResult.getCode() == HttpStatus.OK.value());
+ thread.join();
+ } catch (InterruptedException e) {
+ e.printStackTrace();
+ }
+ }
}
diff --git a/src/test/java/com/mesa/reportservice/zookeeperTest.java b/src/test/java/com/mesa/reportservice/service/impl/ZookeeperServiceImplTest.java
index 5370afb..32914ad 100644
--- a/src/test/java/com/mesa/reportservice/zookeeperTest.java
+++ b/src/test/java/com/mesa/reportservice/service/impl/ZookeeperServiceImplTest.java
@@ -1,4 +1,4 @@
-package com.mesa.reportservice;
+package com.mesa.reportservice.service.impl;
import com.mesa.ReportServiceApplicationTests;
import com.mesa.reportservice.service.ZookeeperService;
@@ -12,7 +12,7 @@ import org.springframework.boot.autoconfigure.EnableAutoConfiguration;
* @Date: 2024/01/10/14:20
*/
@EnableAutoConfiguration
-public class zookeeperTest extends ReportServiceApplicationTests {
+public class ZookeeperServiceImplTest extends ReportServiceApplicationTests {
@Autowired
private ZookeeperService zookeeperService;
diff --git a/src/test/resources/parameters/executeProcessTest.json b/src/test/resources/parameters/executeProcessTest.json
index 8a3d4a3..ed8b2d3 100644
--- a/src/test/resources/parameters/executeProcessTest.json
+++ b/src/test/resources/parameters/executeProcessTest.json
@@ -166,5 +166,29 @@
"result": "",
"memoryUsage": "",
"queryDurationMs": ""
+ },
+ "updateProcessJob": {
+ "jobId": "014f9d6e1f17227ce8d837003bc3bedb",
+ "querySql": "SELECT AVG(sent_bytes) AS \"Bytes Sent\", AVG(received_bytes) AS \"Bytes Received\", FROM_UNIXTIME(TIME_FLOOR_WITH_FILL(start_timestamp_ms, 'PT5M', 'zero')) AS \"Start Time\" FROM security_event WHERE recv_time >= UNIX_TIMESTAMP('2023-12-18T13:18:01Z') AND recv_time < UNIX_TIMESTAMP('2023-12-19T13:18:01Z') AND security_event.vsys_id IN (1) GROUP BY \"Start Time\" ORDER BY \"Start Time\" ASC",
+ "state": "RUNNING",
+ "resultName": "",
+ "doneProgress": 0,
+ "isFailed": 0,
+ "resultMessage": "",
+ "elapsed": 1,
+ "rowsRead": 100,
+ "bytesRead": 100,
+ "resultRows": 1,
+ "resultBytes": 100,
+ "isValid": 1,
+ "startTime": 1702991925,
+ "endTime": 1703399430,
+ "lastUpdateTime": 1702991940,
+ "generatedTime": 1702991921,
+ "queryId": "",
+ "executeStatus": 200,
+ "result": "",
+ "memoryUsage": "",
+ "queryDurationMs": ""
}
}
diff --git a/src/test/resources/parameters/mysqlTest.json b/src/test/resources/parameters/jobTest.json
index 7ed1831..04f32b9 100644
--- a/src/test/resources/parameters/mysqlTest.json
+++ b/src/test/resources/parameters/jobTest.json
@@ -46,5 +46,7 @@
"result": "",
"memoryUsage": "",
"queryDurationMs": ""
- }
+ },
+ "getJobDetailSuccessId": "00838b9d94552afecc6fa29cde033714",
+ "getJobDetailErrorId": "00838b9d94552afecc6fa29cde033715"
} \ No newline at end of file
diff --git a/src/test/resources/parameters/queryGatewayTest.json b/src/test/resources/parameters/queryGatewayTest.json
index fc36ed0..167efc4 100644
--- a/src/test/resources/parameters/queryGatewayTest.json
+++ b/src/test/resources/parameters/queryGatewayTest.json
@@ -2,5 +2,7 @@
"querySqlSuccess": "select 1 from session_record limit 1",
"querySqlError": "select 1 from session_record_cn limit 1",
"queryForProcessIdError": "",
- "queryForCancelIdError": "1:1"
+ "queryForCancelIdError": "1:1",
+ "highLatencyQuerySuccess": "SELECT sleepEachRow(3) FROM tables_cluster LIMIT 150",
+ "highLatencyQueryCancel": "SELECT sleepEachRow(3) FROM tables_cluster LIMIT 150"
}