diff options
37 files changed, 527 insertions, 196 deletions
diff --git a/config/nacos/config/fixed-127.0.0.1_8848-dev_nacos/data/config-data-tenant/dev/Galaxy/saved-query-scheduler.yml b/config/nacos/config/fixed-127.0.0.1_8848-dev_nacos/data/config-data-tenant/dev/Galaxy/saved-query-scheduler.yml index 5c910d8..91d5309 100644 --- a/config/nacos/config/fixed-127.0.0.1_8848-dev_nacos/data/config-data-tenant/dev/Galaxy/saved-query-scheduler.yml +++ b/config/nacos/config/fixed-127.0.0.1_8848-dev_nacos/data/config-data-tenant/dev/Galaxy/saved-query-scheduler.yml @@ -62,7 +62,9 @@ spring: username: root #mariadb的密码 password: galaxy2019 - +#初始化数据库表和数据 + initialization-mode: always + schema: classpath:sql/monitorInit.sql #以下配置不需要更改通常 name: druidDataSource type: com.alibaba.druid.pool.DruidDataSource diff --git a/config/nacos/config/fixed-127.0.0.1_8848-test_nacos/data/config-data-tenant/test/Galaxy/saved-query-scheduler.yml b/config/nacos/config/fixed-127.0.0.1_8848-test_nacos/data/config-data-tenant/test/Galaxy/saved-query-scheduler.yml index 97f8407..cdeb3b0 100644 --- a/config/nacos/config/fixed-127.0.0.1_8848-test_nacos/data/config-data-tenant/test/Galaxy/saved-query-scheduler.yml +++ b/config/nacos/config/fixed-127.0.0.1_8848-test_nacos/data/config-data-tenant/test/Galaxy/saved-query-scheduler.yml @@ -62,7 +62,9 @@ spring: username: root #mariadb的密码 password: galaxy2019 - +#初始化数据库表和数据 + initialization-mode: always + schema: classpath:sql/monitorInit.sql #以下配置不需要更改通常 name: druidDataSource type: com.alibaba.druid.pool.DruidDataSource diff --git a/docs/release/release-2.0.0.md b/docs/release/release-2.0.0.md new file mode 100644 index 0000000..c820873 --- /dev/null +++ b/docs/release/release-2.0.0.md @@ -0,0 +1,3 @@ +Release 2.0.0 (TSG-24.02) +* 修改Dockerfile,更新默认路径为/opt/saved-query-scheduler(GAL-502) +* 版本号按年命名修改为“主版本号.子版本号.修正版本号”
\ No newline at end of file diff --git a/docs/release/release-3.0.0.md b/docs/release/release-3.0.0.md new file mode 100644 index 0000000..21f6c41 --- /dev/null +++ b/docs/release/release-3.0.0.md @@ -0,0 +1,5 @@ +Release 3.0.0 (TSG-24.03) +* 报告服务核心功能单元测试编写(GAL-466) +* 统一变量及函数命名(GAL-467) +* 报告服务函数、常量的抽取及封装(GAL-468) +* 集成sonarQube服务,Job执行异常信息封装(GAL-480)
\ No newline at end of file diff --git a/docs/release/release-3.0.1.md b/docs/release/release-3.0.1.md new file mode 100644 index 0000000..fef03c9 --- /dev/null +++ b/docs/release/release-3.0.1.md @@ -0,0 +1,6 @@ +Release 3.0.1 (TSG-24.03) +* 修复Saved Query Scheduler代码检查异常(GAL-513) +* Saved Query Scheduler job状态信息获取接口开发(GAL-518) +* 优化saved-query-scheduler job信息检测API(GAL-522) +* 优化异常信息描述 +* 修复done_progress字段在job执行期间不更新问题
\ No newline at end of file diff --git a/docs/release/release-3.0.2.md b/docs/release/release-3.0.2.md new file mode 100644 index 0000000..48e1510 --- /dev/null +++ b/docs/release/release-3.0.2.md @@ -0,0 +1,4 @@ +Release 3.0.2 (TSG-24.07) +* 优化异常信息描述 +* 修复done_progress字段在job执行期间不更新问题 +* Saved Query Scheduler健康检查接口增加慢查询指标(GAL-608)
\ No newline at end of file @@ -45,6 +45,11 @@ <dependencies> <dependency> + <groupId>org.jacoco</groupId> + <artifactId>jacoco-maven-plugin</artifactId> + <version>0.8.11</version> + </dependency> + <dependency> <groupId>com.github.ulisesbocchio</groupId> <artifactId>jasypt-spring-boot-starter</artifactId> <version>2.1.1</version> @@ -291,7 +296,13 @@ </execution> </executions> </plugin> - + <plugin> + <groupId>org.apache.maven.plugins</groupId> + <artifactId>maven-surefire-plugin</artifactId> + <configuration> + <skipTests>true</skipTests> + </configuration> + </plugin> <plugin> <groupId>com.mesalab.xjar-maven-plugin</groupId> <artifactId>mesalab-xjar-maven-plugin</artifactId> @@ -358,7 +369,26 @@ </configuration> </plugin> - + <plugin> + <groupId>org.jacoco</groupId> + <artifactId>jacoco-maven-plugin</artifactId> + <version>0.8.11</version> + <configuration > + <destFile>${project.build.directory}/jacoco.exec</destFile> + <dataFile>${project.build.directory}/jacoco.exec</dataFile> + <includes> + <include>**/service/impl/**</include> + </includes> + </configuration> + <executions> + <execution> + <goals> + <goal>prepare-agent</goal> + <goal>report</goal> + </goals> + </execution> + </executions> + </plugin> </plugins> </build> diff --git a/src/main/java/com/mesa/reportservice/bean/MonitorEntity.java b/src/main/java/com/mesa/reportservice/bean/MonitorEntity.java deleted file mode 100644 index cfa761e..0000000 --- a/src/main/java/com/mesa/reportservice/bean/MonitorEntity.java +++ /dev/null @@ -1,29 +0,0 @@ -package com.mesa.reportservice.bean; - -import lombok.Data; - -import java.util.Map; - -/** - * - * @author lijinyang - * @date 2024/1/24 - */ -@Data -public class MonitorEntity { - - private Long queueJobNum; - - private Long executingJobNum; - - private Long todaySuccessJobNum; - - private Long todayErrorJobNum; - - private Map<String,JobEntity> jobList ; - - private String status; - - public MonitorEntity() { - } -} diff --git a/src/main/java/com/mesa/reportservice/controller/MonitorController.java b/src/main/java/com/mesa/reportservice/controller/MonitorController.java index 8d5f8c7..c99b8a5 100644 --- a/src/main/java/com/mesa/reportservice/controller/MonitorController.java +++ b/src/main/java/com/mesa/reportservice/controller/MonitorController.java @@ -1,11 +1,16 @@ package com.mesa.reportservice.controller; +import com.mesa.reportservice.service.JobService; import com.mesa.reportservice.service.MonitorService; +import com.mesa.reportservice.util.R; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component; import org.springframework.web.bind.annotation.GetMapping; +import org.springframework.web.bind.annotation.PathVariable; +import org.springframework.web.bind.annotation.RequestMapping; import org.springframework.web.bind.annotation.RestController; +import java.util.Map; /** * @@ -14,13 +19,20 @@ import org.springframework.web.bind.annotation.RestController; */ @RestController @Component +@RequestMapping("/v1/monitor") public class MonitorController { @Autowired - private MonitorService monitorService; + private JobService jobService; - @GetMapping(value = "/monitor") - public String getJobCount() { - return monitorService.getJobCount(); + @GetMapping + public R status() { + return jobService.getJobStatus(); + } + + @GetMapping(value = "/job/{jobId}") + public R jobDetail(@PathVariable String jobId) { + Map<String, Object> getJobDetail = jobService.getJobDetail(jobId); + return R.ok(getJobDetail); } } diff --git a/src/main/java/com/mesa/reportservice/exception/SQSCode.java b/src/main/java/com/mesa/reportservice/exception/SQSCode.java index d20f28b..1961f7a 100644 --- a/src/main/java/com/mesa/reportservice/exception/SQSCode.java +++ b/src/main/java/com/mesa/reportservice/exception/SQSCode.java @@ -28,9 +28,10 @@ public enum SQSCode { KILL_QUERY_ERROR(500008,"Kill Query Error"), TEMPLATE_UNKNOWN_ERROR(500009,"Unknown Error ! ErrorCode = {0} queryId = {1} jobId = {2} executeSql = {3}"), UPDATE_STATUS_ERROR(500010,"failed to update database status"), - TASK_CANNOT_EXECUTED(500010,"task cannot be executed"), + TASK_CANNOT_EXECUTED(500011,"task cannot be executed"), UNKNOWN_ERROR(500999,"Unknown Error"), + JOB_IS_NOT_EXIST(510000, "job is not exist"), //成功 SUCCESS(200, "success"), //失败 diff --git a/src/main/java/com/mesa/reportservice/exception/SQSException.java b/src/main/java/com/mesa/reportservice/exception/SQSException.java index d7db7f2..9fcdb3e 100644 --- a/src/main/java/com/mesa/reportservice/exception/SQSException.java +++ b/src/main/java/com/mesa/reportservice/exception/SQSException.java @@ -1,11 +1,3 @@ -/** - - * - - * - * - */ - package com.mesa.reportservice.exception; diff --git a/src/main/java/com/mesa/reportservice/exception/SQSExceptionHandler.java b/src/main/java/com/mesa/reportservice/exception/SQSExceptionHandler.java new file mode 100644 index 0000000..8658bcf --- /dev/null +++ b/src/main/java/com/mesa/reportservice/exception/SQSExceptionHandler.java @@ -0,0 +1,38 @@ +package com.mesa.reportservice.exception; + +import cn.hutool.log.Log; +import com.mesa.reportservice.util.R; +import org.springframework.http.HttpStatus; +import org.springframework.web.bind.annotation.ExceptionHandler; +import org.springframework.web.bind.annotation.ResponseStatus; +import org.springframework.web.bind.annotation.RestControllerAdvice; + +/** + * 异常处理器 + * + * @author 86158 + + */ +@RestControllerAdvice +public class SQSExceptionHandler { + private Log logger = Log.get(); + + /** + * 处理自定义异常 + */ + @ExceptionHandler(SQSException.class) + @ResponseStatus(value= HttpStatus.BAD_REQUEST) + public R handleSQSException(SQSException e){ + R r = new R(); + r.put("code", e.getCode()); + r.put("msg", e.getMsg()); + return r; + } + + @ExceptionHandler(Exception.class) + @ResponseStatus(value=HttpStatus.INTERNAL_SERVER_ERROR) + public R handleException(Exception e){ + logger.error(e); + return R.error().put("msg", e.getMessage()); + } +} diff --git a/src/main/java/com/mesa/reportservice/mapper/JobMapper.java b/src/main/java/com/mesa/reportservice/mapper/JobMapper.java index 4546b12..684f1da 100644 --- a/src/main/java/com/mesa/reportservice/mapper/JobMapper.java +++ b/src/main/java/com/mesa/reportservice/mapper/JobMapper.java @@ -4,6 +4,7 @@ import com.baomidou.mybatisplus.core.mapper.BaseMapper; import com.mesa.reportservice.bean.JobEntity; import org.apache.ibatis.annotations.Mapper; +import java.util.ArrayList; import java.util.List; import java.util.Map; @@ -24,6 +25,9 @@ public interface JobMapper extends BaseMapper<JobEntity> { int updateStatue(JobEntity job); - Map<String,Long> getJobCount(Map<String, Object> map); + Map<String,Object> getJobCount(long lastUpdateTime); + Map<String,Object> getJobDetail(String jobId); + + List<String> getSlowQuerySql(long lastUpdateTime); }
\ No newline at end of file diff --git a/src/main/java/com/mesa/reportservice/service/JobService.java b/src/main/java/com/mesa/reportservice/service/JobService.java index 706c75a..2ee50f5 100644 --- a/src/main/java/com/mesa/reportservice/service/JobService.java +++ b/src/main/java/com/mesa/reportservice/service/JobService.java @@ -3,6 +3,7 @@ package com.mesa.reportservice.service; import com.baomidou.mybatisplus.extension.service.IService; import com.mesa.reportservice.bean.JobEntity; +import com.mesa.reportservice.util.R; import java.util.List; import java.util.Map; @@ -21,5 +22,7 @@ public interface JobService extends IService<JobEntity> { int updateProcesses(JobEntity job); - Map<String,Long> getJobCount(); + R getJobStatus(); + + Map<String,Object> getJobDetail(String jobId); } diff --git a/src/main/java/com/mesa/reportservice/service/MonitorService.java b/src/main/java/com/mesa/reportservice/service/MonitorService.java index 5bccaea..19eeaae 100644 --- a/src/main/java/com/mesa/reportservice/service/MonitorService.java +++ b/src/main/java/com/mesa/reportservice/service/MonitorService.java @@ -1,5 +1,6 @@ package com.mesa.reportservice.service; + /** * * @author lijinyang @@ -10,6 +11,4 @@ public interface MonitorService { void addSuccess(); void addFail(); - - String getJobCount(); }
\ No newline at end of file diff --git a/src/main/java/com/mesa/reportservice/service/ZookeeperService.java b/src/main/java/com/mesa/reportservice/service/ZookeeperService.java index 69f77d2..4b86792 100644 --- a/src/main/java/com/mesa/reportservice/service/ZookeeperService.java +++ b/src/main/java/com/mesa/reportservice/service/ZookeeperService.java @@ -4,6 +4,7 @@ package com.mesa.reportservice.service; import org.springframework.stereotype.Service; import java.net.InetAddress; +import java.util.Map; /** * @@ -14,4 +15,6 @@ import java.net.InetAddress; public interface ZookeeperService { boolean isMaster(); + + Map<String,Object> status() throws Exception; }
\ No newline at end of file diff --git a/src/main/java/com/mesa/reportservice/service/impl/ExecuteProcessServiceImpl.java b/src/main/java/com/mesa/reportservice/service/impl/ExecuteProcessServiceImpl.java index 4e514d1..fb5d452 100644 --- a/src/main/java/com/mesa/reportservice/service/impl/ExecuteProcessServiceImpl.java +++ b/src/main/java/com/mesa/reportservice/service/impl/ExecuteProcessServiceImpl.java @@ -1,5 +1,6 @@ package com.mesa.reportservice.service.impl; +import cn.hutool.core.util.StrUtil; import cn.hutool.log.Log; import cn.hutool.log.LogFactory; import com.alibaba.fastjson.JSON; @@ -50,6 +51,7 @@ public class ExecuteProcessServiceImpl implements ExecuteProcessService { jobEntity.setEndTime(System.currentTimeMillis()/1000); jobEntity.setDoneProgress(1.00f); jobEntity.setIsFailed(1); + String resultMessage = jobEntity.getResultMessage(); try { if (jobEntity.getIsValid() == 0) { jobEntity.setResultMessage(Constant.CANCEL); @@ -61,21 +63,21 @@ public class ExecuteProcessServiceImpl implements ExecuteProcessService { jobEntity.setIsFailed(0); logger.info(MessageFormat.format(Constant.LOG_SUCCESS_SAVE_TO_HBASE,jobEntity.getQueryId(),jobEntity.getJobId(),jobEntity.getQuerySql())); } else { - jobEntity.setResultMessage(Constant.WRITE_DATA_ERROR); + jobEntity.setResultMessage(StrUtil.concat(false,Constant.WRITE_DATA_ERROR,StrUtil.COLON,resultMessage)); monitorService.addFail(); msg = MessageFormat.format(SQSCode.TEMPLATE_SAVE_HBASE_ERROR.getMsg(),jobEntity.getExecuteStatus(),jobEntity.getQueryId(),jobEntity.getJobId(),jobEntity.getQuerySql()); logger.error(msg); } } else if (jobEntity.getExecuteStatus() >= Constant.JOB_BAD_REQUEST_THRESHOLD && jobEntity.getExecuteStatus() < Constant.JOB_INTERNAL_SERVER_ERROR_THRESHOLD) { - jobEntity.setResultMessage(Constant.PARAM_SYNTAX_ERROR); + jobEntity.setResultMessage(StrUtil.concat(false,Constant.PARAM_SYNTAX_ERROR,StrUtil.COLON,resultMessage)); msg = MessageFormat.format(SQSCode.PARAM_SYNTAX_ERROR.getMsg(),jobEntity.getExecuteStatus(),jobEntity.getQueryId(),jobEntity.getJobId(),jobEntity.getQuerySql()); logger.error(msg); } else if (jobEntity.getExecuteStatus() == Constant.JOB_SQL_EXECUTION_ERROR_CODE) { - jobEntity.setResultMessage(Constant.SQL_EXECUTION_ERROR); + jobEntity.setResultMessage(StrUtil.concat(false,Constant.SQL_EXECUTION_ERROR,StrUtil.COLON,resultMessage)); msg = MessageFormat.format(SQSCode.TEMPLATE_SQL_EXECUTION_ERROR.getMsg(),jobEntity.getExecuteStatus(),jobEntity.getQueryId(),jobEntity.getJobId(),jobEntity.getQuerySql()); logger.error(msg); } else { - jobEntity.setResultMessage(Constant.UNKNOWN_ERROR); + jobEntity.setResultMessage(StrUtil.concat(false,Constant.UNKNOWN_ERROR,StrUtil.COLON,resultMessage)); msg = MessageFormat.format(SQSCode.UNKNOWN_ERROR.getMsg(),jobEntity.getQueryId(),jobEntity.getJobId(),jobEntity.getQuerySql()); logger.error(msg); } @@ -88,9 +90,9 @@ public class ExecuteProcessServiceImpl implements ExecuteProcessService { } while (number != 1 && z >= 0); } catch (Exception e) { - jobEntity.setResultMessage(Constant.DATABASE_ERROR); + jobEntity.setResultMessage(StrUtil.concat(false,Constant.DATABASE_ERROR,StrUtil.COLON,e.getMessage())); jobService.updateProcesses(jobEntity); - msg = MessageFormat.format(SQSCode.TEMPLATE_SAVE_DATABASE_ERROR.getMsg(),jobEntity.getQueryId(),jobEntity.getJobId(),e.toString()); + msg = MessageFormat.format(SQSCode.TEMPLATE_SAVE_DATABASE_ERROR.getMsg(),jobEntity.getQueryId(),jobEntity.getJobId(),e.getMessage()); logger.error(msg); } finally { saveToMonitor(jobEntity); @@ -122,12 +124,11 @@ public class ExecuteProcessServiceImpl implements ExecuteProcessService { Map map = JSON.parseObject(JSON.toJSONString(listData.get(0))); long readRows = Long.parseLong(map.get("rows_read").toString()); float elapsed = Float.parseFloat(map.get("elapsed").toString()); - double percent = Double.parseDouble(map.get("percent").toString()); - int process = (int) (percent * 100); + float percent = Float.parseFloat(map.get("percent").toString()); jobEntity.setElapsed((int) elapsed); jobEntity.setRowsRead(readRows); - if(jobEntity.getDoneProgress()<process){ - jobEntity.setDoneProgress(process); + if(jobEntity.getDoneProgress()<percent){ + jobEntity.setDoneProgress(percent); } if (jobEntity.getRowsRead() != 0 || jobEntity.getElapsed() != 0) { jobService.updateProcesses(jobEntity); diff --git a/src/main/java/com/mesa/reportservice/service/impl/ExecuteServiceImpl.java b/src/main/java/com/mesa/reportservice/service/impl/ExecuteServiceImpl.java index 594d42b..e6ada8c 100644 --- a/src/main/java/com/mesa/reportservice/service/impl/ExecuteServiceImpl.java +++ b/src/main/java/com/mesa/reportservice/service/impl/ExecuteServiceImpl.java @@ -48,6 +48,8 @@ public class ExecuteServiceImpl implements ExecuteService { if (httpResult != null) { Map mapresult = JSON.parseObject(httpResult.getBody()); int queryStatus = Integer.parseInt(mapresult.get("status").toString()); + job.setExecuteStatus(Integer.parseInt(mapresult.get("code").toString())); + job.setResultMessage(mapresult.get("message").toString()); logger.info("httpCode=" + httpResult.getCode() +" status="+queryStatus); if (httpResult.getCode() == HttpStatus.OK.value() && queryStatus == HttpStatus.OK.value()) { k = 0; @@ -59,11 +61,9 @@ public class ExecuteServiceImpl implements ExecuteService { job.setResultBytes(Long.parseLong(rows.get("result_bytes").toString())); job.setResult(httpResult.getBody()); job.setElapsed((int) Float.parseFloat(rows.get("elapsed").toString())); - job.setExecuteStatus(Integer.parseInt(mapresult.get("code").toString())); logger.info("success resultId = " + job.getJobId() + " queryId=" + job.getQueryId() + " sql=" + job.getQuerySql()); } else { k = 0; - job.setExecuteStatus(Integer.parseInt(mapresult.get("code").toString())); logger.error("SQL Execution Error "); } } else { @@ -71,34 +71,34 @@ public class ExecuteServiceImpl implements ExecuteService { } } catch (SocketTimeoutException e) { k--; - job.setResultMessage(e.toString()); + job.setResultMessage(e.getMessage()); if (k == 0) { job.setExecuteStatus(Constant.JOB_EXECUTION_TIMEOUT); - job.setResultMessage(Constant.QUERY_TIME_OUT); + job.setResultMessage(Constant.SOCKET_HANG_UP); logger.info("timeout resultId = " + job.getJobId() + " queryId=" + job.getQueryId() + " sql=" + job.getQuerySql()); } else { - logger.info("Socket warn " + e.toString() + "retry time " + (3 - k)); + logger.info("Socket warn " + e.getMessage() + "retry time " + (3 - k)); } } catch (ConnectTimeoutException e) { job.setExecuteStatus(Constant.JOB_ERROR); - job.setResultMessage(e.toString()); - logger.error(SQSCode.UNKNOWN_ERROR.getMsg() + e.toString()); + job.setResultMessage(e.getMessage()); + logger.error(SQSCode.UNKNOWN_ERROR.getMsg() + e.getMessage()); k = 0; } catch (OutOfMemoryError e) { job.setExecuteStatus(Constant.JOB_ERROR); - job.setResultMessage(Constant.RESULT_TOO_LARGE); - logger.error(SQSCode.OUT_OF_MEMORY_ERROR.getMsg() + e.toString()); + job.setResultMessage(e.getMessage()); + logger.error(SQSCode.OUT_OF_MEMORY_ERROR.getMsg() + e.getMessage()); k = 0; } catch (Exception e) { job.setExecuteStatus(Constant.JOB_ERROR); - job.setResultMessage(e.toString()); - logger.error(SQSCode.UNKNOWN_ERROR.getMsg() + e.toString()); + job.setResultMessage(e.getMessage()); + logger.error(SQSCode.UNKNOWN_ERROR.getMsg() + e.getMessage()); k = 0; } try { queryGatewayService.deleteJob(job.getQueryId()); } catch (Exception e) { - logger.error(SQSCode.KILL_QUERY_ERROR.getMsg() + e.toString()); + logger.error(SQSCode.KILL_QUERY_ERROR.getMsg() + e.getMessage()); throw new SQSException(SQSCode.KILL_QUERY_ERROR); } } diff --git a/src/main/java/com/mesa/reportservice/service/impl/JobServiceImpl.java b/src/main/java/com/mesa/reportservice/service/impl/JobServiceImpl.java index 100b93f..affa306 100644 --- a/src/main/java/com/mesa/reportservice/service/impl/JobServiceImpl.java +++ b/src/main/java/com/mesa/reportservice/service/impl/JobServiceImpl.java @@ -1,16 +1,28 @@ package com.mesa.reportservice.service.impl; +import cn.hutool.Hutool; +import cn.hutool.core.util.ObjectUtil; +import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper; import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl; import com.mesa.reportservice.bean.JobEntity; +import com.mesa.reportservice.configuration.SchedulerProperties; +import com.mesa.reportservice.exception.SQSCode; +import com.mesa.reportservice.exception.SQSException; import com.mesa.reportservice.mapper.JobMapper; import com.mesa.reportservice.service.JobService; +import com.mesa.reportservice.service.QueryGatewayService; +import com.mesa.reportservice.service.ZookeeperService; +import com.mesa.reportservice.util.R; +import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Service; import java.time.LocalDate; import java.time.ZoneOffset; +import java.util.ArrayList; import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.stream.Collectors; /** * @@ -20,6 +32,12 @@ import java.util.Map; @Service public class JobServiceImpl extends ServiceImpl<JobMapper, JobEntity> implements JobService { + @Autowired + private SchedulerProperties schedulerProperties; + + @Autowired + private ZookeeperService zookeeperService; + @Override public List<JobEntity> getRunningJob() { return this.baseMapper.getRunningJob(); @@ -40,12 +58,45 @@ public class JobServiceImpl extends ServiceImpl<JobMapper, JobEntity> implements } @Override - public Map<String,Long> getJobCount(){ - LocalDate currentDate = LocalDate.now(); - long lastUpdateTime = currentDate.atStartOfDay(ZoneOffset.UTC).toInstant().toEpochMilli(); - Map map = new HashMap<String,Object>(); - map.put("lastUpdateTime", lastUpdateTime); - return this.baseMapper.getJobCount(map); + public R getJobStatus() { + Map<String,Object> numMap = getJobCount(); + Map<String, Object> status = null; + try { + status = zookeeperService.status(); + } catch (Exception e) { + e.printStackTrace(); + } + R r = new R(); + r.put("data",numMap); + r.putAll(status); + return r; + } + + private Map<String,Object> getJobCount(){ + long lastUpdateTime = System.currentTimeMillis()/1000 - convertDaysToSeconds(1); + Map<String, Object> jobCount = this.baseMapper.getJobCount(lastUpdateTime); + LambdaQueryWrapper<JobEntity> failed = new LambdaQueryWrapper<>(); + failed.select(JobEntity::getJobId).eq(JobEntity::getIsFailed,1).ge(JobEntity::getLastUpdateTime,lastUpdateTime); + List<String> failedJobIdList = this.baseMapper.selectObjs(failed).stream().map(o -> (String) o).collect(Collectors.toList()); + List<String> slowJobIdList = this.baseMapper.getSlowQuerySql(lastUpdateTime); + Map<String, Object> jobs = new HashMap<>(); + jobs.put("running",schedulerProperties.getMapResult()); + jobs.put("failed",failedJobIdList); + jobs.put("slow_query",slowJobIdList); + jobCount.put("jobs",jobs); + return jobCount; } + @Override + public Map<String,Object> getJobDetail(String jobId){ + Map<String,Object> jobState = this.baseMapper.getJobDetail(jobId); + if (ObjectUtil.isEmpty(jobState)) { + throw new SQSException(SQSCode.JOB_IS_NOT_EXIST); + } + return jobState; + } + + private long convertDaysToSeconds(Integer days) { + return 3600L*24*days; + } }
\ No newline at end of file diff --git a/src/main/java/com/mesa/reportservice/service/impl/MonitorServiceImpl.java b/src/main/java/com/mesa/reportservice/service/impl/MonitorServiceImpl.java index 0e57d84..3120fca 100644 --- a/src/main/java/com/mesa/reportservice/service/impl/MonitorServiceImpl.java +++ b/src/main/java/com/mesa/reportservice/service/impl/MonitorServiceImpl.java @@ -1,11 +1,10 @@ package com.mesa.reportservice.service.impl; -import com.alibaba.fastjson.JSONArray; -import com.mesa.reportservice.bean.MonitorEntity; import com.mesa.reportservice.configuration.SchedulerProperties; import com.mesa.reportservice.service.JobService; import com.mesa.reportservice.service.MonitorService; import com.mesa.reportservice.service.ZookeeperService; +import com.mesa.reportservice.util.R; import io.micrometer.core.instrument.*; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Service; @@ -22,15 +21,6 @@ import java.util.Map; public class MonitorServiceImpl implements MonitorService { @Autowired - private JobService mysqlService; - - @Autowired - private ZookeeperService zookeeperService; - - @Autowired - private SchedulerProperties schedulerProperties; - - @Autowired private MeterRegistry meterRegistry; private Counter counterReportSuccess; @@ -52,24 +42,4 @@ public class MonitorServiceImpl implements MonitorService { public void addFail() { counterReportFail.increment(); } - - @Override - public String getJobCount() { - String json=""; - MonitorEntity monitorEntity = new MonitorEntity(); - Map<String,Long> numMap = mysqlService.getJobCount(); - monitorEntity.setQueueJobNum(numMap.get("queueNum")); - monitorEntity.setExecutingJobNum(numMap.get("executingNum")); - monitorEntity.setTodaySuccessJobNum(numMap.get("todaySuccessNum")); - monitorEntity.setTodayErrorJobNum(numMap.get("todayErrorNum")); - monitorEntity.setJobList(schedulerProperties.getMapResult()); - if(zookeeperService.isMaster()){ - monitorEntity.setStatus("active"); - }else{ - monitorEntity.setStatus("standby"); - } - Object obj = JSONArray.toJSON(monitorEntity); - json = obj.toString(); - return json; - } } diff --git a/src/main/java/com/mesa/reportservice/service/impl/ScheduledResultServiceImpl.java b/src/main/java/com/mesa/reportservice/service/impl/ScheduledResultServiceImpl.java index c43981b..3051690 100644 --- a/src/main/java/com/mesa/reportservice/service/impl/ScheduledResultServiceImpl.java +++ b/src/main/java/com/mesa/reportservice/service/impl/ScheduledResultServiceImpl.java @@ -1,5 +1,6 @@ package com.mesa.reportservice.service.impl; +import cn.hutool.core.util.StrUtil; import cn.hutool.log.Log; import cn.hutool.log.LogFactory; import com.mesa.reportservice.bean.JobEntity; @@ -71,8 +72,8 @@ public class ScheduledResultServiceImpl implements ScheduledResultService { } } } catch (Exception e) { - logger.error(e.toString()); - throw new SQSException(e.toString()); + logger.error(e.getMessage()); + throw new SQSException(e.getMessage()); } } @@ -83,18 +84,11 @@ public class ScheduledResultServiceImpl implements ScheduledResultService { String sql = jobEntity.getQuerySql().trim(); String queryId = queryGatewayService.getQueryId(sql); jobEntity.setQueryId(queryId); - if (jobEntity.getIsValid() == 0) { + if (schedulerProperties.getMapResult().containsKey(jobEntity.getQueryId()) && jobEntity.getIsValid() == 0) { executeProcessService.killQuery(jobEntity); schedulerProperties.getMapResult().get(jobEntity.getQueryId()).setIsValid(0); - } else if (!schedulerProperties.getMapResult().containsKey(jobEntity.getQueryId())) { - executeProcessService.reSet(jobEntity); } - if (schedulerProperties.getMapResult().containsKey(jobEntity.getQueryId())) { - if (jobEntity.getIsValid() == 0) { - executeProcessService.killQuery(jobEntity); - schedulerProperties.getMapResult().get(jobEntity.getQueryId()).setIsValid(0); - } - } else { + if(!schedulerProperties.getMapResult().containsKey(jobEntity.getQueryId())) { executeProcessService.reSet(jobEntity); } } @@ -104,10 +98,10 @@ public class ScheduledResultServiceImpl implements ScheduledResultService { //遍历内存中的任务对状态RUNNING的更新进度,其他更新数据库的状态 for (Map.Entry<String, JobEntity> entry : schedulerProperties.getMapResult().entrySet()) { logger.info("key = " + entry.getKey() + ", value = " + entry.getValue().getState()); - long currentTime = System.currentTimeMillis(); + long currentTime = convertToSeconds(System.currentTimeMillis()); long executeTime = currentTime - entry.getValue().getStartTime(); - logger.info("execute time=" + executeTime + "ttlTime=" + httpClientPool.getSocketTimeout()); - if (JobStates.RUNNING.getValue().equals(entry.getValue().getState()) && executeTime > httpClientPool.getSocketTimeout()) { + logger.info("execute time=" + executeTime + "ttlTime=" + convertToSeconds(httpClientPool.getSocketTimeout())); + if (JobStates.RUNNING.getValue().equals(entry.getValue().getState()) && executeTime > convertToSeconds(httpClientPool.getSocketTimeout())) { entry.getValue().setState(JobStates.DONE.getValue()); entry.getValue().setExecuteStatus(Constant.JOB_EXECUTION_TIMEOUT); executeProcessService.killQuery(entry.getValue()); @@ -128,7 +122,7 @@ public class ScheduledResultServiceImpl implements ScheduledResultService { List<JobEntity> jobs = jobService.getJobTask(rows); for (JobEntity job : jobs) { logger.info(Constant.LOG_START_EXECUTING_TASKS); - long startTime = System.currentTimeMillis()/1000; + long startTime = convertToSeconds(System.currentTimeMillis()); job.setStartTime(startTime); String sql = job.getQuerySql().trim(); job.setQuerySql(sql); @@ -145,10 +139,10 @@ public class ScheduledResultServiceImpl implements ScheduledResultService { if (queryId.equals("") ) { job.setExecuteStatus(0); job.setState(JobStates.DONE.getValue()); - job.setEndTime(System.currentTimeMillis()/1000); + job.setEndTime(convertToSeconds(System.currentTimeMillis())); job.setDoneProgress(1.00f); job.setIsFailed(1); - job.setResultMessage(Constant.UNKNOWN_ERROR); + job.setResultMessage(StrUtil.concat(false,Constant.UNKNOWN_ERROR,": queryId is null")); } if ((JobStates.RUNNING.getValue()).equals(job.getState())) { if (jobService.updateProcesses(job) != 0) { @@ -161,15 +155,12 @@ public class ScheduledResultServiceImpl implements ScheduledResultService { }); } else { logger.error(SQSCode.UPDATE_STATUS_ERROR.getMsg()); - throw new SQSException(SQSCode.UPDATE_STATUS_ERROR); } } else { if (jobService.updateProcesses(job) != 0) { logger.error(SQSCode.TASK_CANNOT_EXECUTED.getMsg()); - throw new SQSException(SQSCode.TASK_CANNOT_EXECUTED); } else { logger.error(SQSCode.UPDATE_STATUS_ERROR.getMsg()); - throw new SQSException(SQSCode.UPDATE_STATUS_ERROR); } } } @@ -177,4 +168,9 @@ public class ScheduledResultServiceImpl implements ScheduledResultService { logger.info(Constant.LOG_NO_PENDING_TASKS); } } + + private long convertToSeconds(long millis) { + return millis/1000; + } + } diff --git a/src/main/java/com/mesa/reportservice/service/impl/ZookeeperServiceImpl.java b/src/main/java/com/mesa/reportservice/service/impl/ZookeeperServiceImpl.java index dd2c267..123235d 100644 --- a/src/main/java/com/mesa/reportservice/service/impl/ZookeeperServiceImpl.java +++ b/src/main/java/com/mesa/reportservice/service/impl/ZookeeperServiceImpl.java @@ -6,12 +6,17 @@ import com.mesa.reportservice.configuration.SchedulerProperties; import com.mesa.reportservice.configuration.ZookeeperProperties; import com.mesa.reportservice.service.ZookeeperService; import org.apache.curator.framework.CuratorFramework; +import org.apache.curator.framework.api.GetChildrenBuilder; import org.apache.zookeeper.CreateMode; import org.apache.zookeeper.data.Stat; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Service; import java.nio.charset.StandardCharsets; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; /** * @@ -68,6 +73,26 @@ public class ZookeeperServiceImpl implements ZookeeperService { return false; } } + + @Override + public Map<String,Object> status() throws Exception { + Map<String, Object> status = new HashMap<>(); + status.put("status","UP"); + if (zookeeperProperties.getOpen() == 0) { + List<String> children = curatorConnect.getChildren().forPath("/"); + boolean isDown = true; + for (String child : children) { + Stat stat = curatorConnect.checkExists().forPath("/" + child); + if (stat != null) { + isDown = false; + } + } + if(isDown){ + status.put("status","DOWN"); + } + } + return status; + } } diff --git a/src/main/java/com/mesa/reportservice/util/Constant.java b/src/main/java/com/mesa/reportservice/util/Constant.java index 91b4fd6..16662ae 100644 --- a/src/main/java/com/mesa/reportservice/util/Constant.java +++ b/src/main/java/com/mesa/reportservice/util/Constant.java @@ -19,7 +19,7 @@ public class Constant { public static final String UNKNOWN_ERROR = "Unknown Error"; public static final String DATABASE_ERROR = "Database Error"; public static final String RE_EXECUTION = "Re Execution"; - public static final String QUERY_TIME_OUT = "SQL Execution Error execute query time out"; + public static final String SOCKET_HANG_UP = "SQL Execution Error socket hang up"; public static final String RESULT_TOO_LARGE = "Result Too Large"; /** @@ -44,7 +44,7 @@ public class Constant { * URL */ public static final String URL_GET_QUERY_ID = "{0}/v1/sql/query/query_id?query="; - public static final String URL_QUERY_FOR_EXECUTE = "{0}/sql?originalSQL="; + public static final String URL_QUERY_FOR_EXECUTE = "{0}/sql?option=long_term&originalSQL="; public static final String URL_QUERY_FOR_PROCESS = "{0}/v1/sql/query/{1}/progress"; public static final String URL_QUERY_FOR_CANCEL = "{0}/v1/sql/query/{1}"; } diff --git a/src/main/java/com/mesa/reportservice/util/R.java b/src/main/java/com/mesa/reportservice/util/R.java new file mode 100644 index 0000000..0bafdb2 --- /dev/null +++ b/src/main/java/com/mesa/reportservice/util/R.java @@ -0,0 +1,93 @@ +package com.mesa.reportservice.util; + +import com.mesa.reportservice.exception.SQSCode; +import com.mesa.reportservice.exception.SQSException; + +import java.util.Date; +import java.util.HashMap; +import java.util.LinkedHashMap; +import java.util.Map; + + +/** + * 返回数据 + * + * 错误码、错误内容统一在枚举类SQSCode中定义, 错误码格式见SQSCode注释,错误码内容必须用英文,作为国际化的code + * 自定义的错误类型必须加注释 + */ +public class R extends HashMap<String, Object> { + private static final long serialVersionUID = 1L; + + public R() { + put("code", SQSCode.SUCCESS.getCode()); + put("msg", SQSCode.SUCCESS.getMsg()); + put("time", new Date()); + } + + public static R error() { + return error(SQSCode.ERROR.getCode(), SQSCode.ERROR.getMsg()); + } + + public static R error(SQSCode SQSCode) { + R r = new R(); + r.put("code", SQSCode.getCode()); + r.put("msg", SQSCode.getMsg()); + r.put("time", new Date()); + return r; + } + + public static R error(Integer code, String msg) { + R r = new R(); + r.put("code", code); + r.put("msg", msg); + r.put("time", new Date()); + return r; + } + + public static R ok(String msg) { + R r = new R(); + r.put("msg", msg); + r.put("time", new Date()); + return r; + } + + public static R ok() { + return new R(); + } + + public static R ok(Object data) { + R r = new R(); + r.put("data", data); + r.put("time", new Date()); + return r; + } + + @Override + public R put(String key, Object value) { + super.put(key, value); + return this; + } + + @SuppressWarnings("unchecked") + public R putData(String key,Object value) { + Object data = super.getOrDefault("data", new LinkedHashMap<String, Object>()); + if( !(data instanceof Map)) { + throw new SQSException("data put error"); + } + ((Map<String, Object>)data).put(key, value); + super.put("data", data); + return this; + } + + @SuppressWarnings("unchecked") + public R putDataAll(Map<String,Object> dataAll) { + Object data = super.getOrDefault("data", new LinkedHashMap<String, Object>()); + if( !(data instanceof Map)) { + throw new SQSException("data putAll error"); + } + ((Map<String, Object>)data).putAll(dataAll);; + super.put("data", data); + return this; + } +} + diff --git a/src/main/resources/mappers/JobMapper.xml b/src/main/resources/mappers/JobMapper.xml index 9017d60..f36b683 100644 --- a/src/main/resources/mappers/JobMapper.xml +++ b/src/main/resources/mappers/JobMapper.xml @@ -38,6 +38,12 @@ where state = 'PENDING' and is_valid = 1 order by generated_time limit ${rows} </select> + <select id="getSlowQuerySql" resultType="java.lang.String" parameterType="long"> + select job_id + from saved_query_job + where state = 'DONE' and is_failed = 0 and elapsed > 300000 and last_update_time >= #{lastUpdateTime} + </select> + <update id="updateProcesses" parameterType="com.mesa.reportservice.bean.JobEntity" > update saved_query_job <set > @@ -81,10 +87,6 @@ where job_id = #{jobId,jdbcType=VARCHAR} </update> - - - - <update id="updateStatue" parameterType="com.mesa.reportservice.bean.JobEntity" > update saved_query_job <set > @@ -107,11 +109,18 @@ where job_id = #{jobId,jdbcType=VARCHAR} </update> + <select id="getJobCount" resultType="map" parameterType="long"> + SELECT (SELECT COUNT(1) FROM saved_query_job where state = 'PENDING' and is_valid = 1) as pending_count, + (SELECT COUNT(1) FROM saved_query_job where state = 'RUNNING' and is_valid = 1) as running_count, + (SELECT COUNT(1) FROM saved_query_job where state = 'DONE' and is_failed = 0 and last_update_time > #{lastUpdateTime}) as done_count, + (SELECT COUNT(1) FROM saved_query_job where is_failed = 1 and last_update_time > #{lastUpdateTime} ) as failed_count, + (SELECT COUNT(1) FROM saved_query_job where is_valid = 0 and last_update_time > #{lastUpdateTime} ) as canceled_count, + (SELECT COUNT(1) FROM saved_query_job where state = 'DONE' and is_failed = 0 and elapsed > 300000 and last_update_time >= #{lastUpdateTime}) as slow_query_count + </select> - <select id="getJobCount" resultType="map" parameterType="hashmap"> - SELECT (SELECT COUNT(1) FROM saved_query_job where state = 'PENDING' and is_valid = 1) as queueNum, - (SELECT COUNT(1) FROM saved_query_job where state = 'RUNNING' and is_valid = 1) as executingNum, - (SELECT COUNT(1) FROM saved_query_job where state = 'DONE' and is_failed = 0 and last_update_time > #{lastUpdateTime}) as todaySuccessNum, - (SELECT COUNT(1) FROM saved_query_job where is_failed = 1 and last_update_time > #{lastUpdateTime} ) as todayErrorNum + <select id="getJobDetail" resultType="map" parameterType="String"> + SELECT + <include refid="Base_Column_List" /> + FROM saved_query_job WHERE job_id = #{jobId} </select> </mapper>
\ No newline at end of file diff --git a/src/main/resources/sql/monitorInit.sql b/src/main/resources/sql/monitorInit.sql new file mode 100644 index 0000000..978e1fd --- /dev/null +++ b/src/main/resources/sql/monitorInit.sql @@ -0,0 +1 @@ +REPLACE INTO `saved_query_job` (`job_id`, `query_sql`, `state`, `done_progress`, `is_failed`, `result_message`, `elapsed`, `rows_read`, `bytes_read`, `result_bytes`, `result_rows`, `is_valid`, `start_time`, `end_time`, `last_update_time`, `generated_time`) VALUES ('test127897d819062beafc8db85b9ece', 'SELECT 1 FROM session_record LIMIT 1', 'DONE', 1.00, 0, 'OK', 1, 1, 9, 9, 1, 1, 1710836940, 1710836955, 1710836956, 1706607272);
\ No newline at end of file diff --git a/src/test/java/com/mesa/reportservice/bean/JobEntityTest.java b/src/test/java/com/mesa/reportservice/bean/JobEntityTest.java new file mode 100644 index 0000000..2ed989f --- /dev/null +++ b/src/test/java/com/mesa/reportservice/bean/JobEntityTest.java @@ -0,0 +1,23 @@ +package com.mesa.reportservice.bean; + +import com.mesa.ReportServiceApplicationTests; +import org.junit.Assert; +import org.junit.Test; +import org.springframework.boot.autoconfigure.EnableAutoConfiguration; + +import static org.junit.Assert.*; + +/** + * @Auther: lijinyang + * @Date: 2024/03/12/17:24 + */ +@EnableAutoConfiguration +public class JobEntityTest extends ReportServiceApplicationTests { + + @Test + public void testClone() { + JobEntity job = this.jsonToInParameter("parameters/executeTest.json", "executeJob", JobEntity.class); + Object clone = job.clone(); + Assert.assertTrue(clone instanceof JobEntity); + } +}
\ No newline at end of file diff --git a/src/test/java/com/mesa/reportservice/ExecuteProcessTest.java b/src/test/java/com/mesa/reportservice/service/impl/ExecuteProcessServiceImplTest.java index 9e5b56e..68f9f53 100644 --- a/src/test/java/com/mesa/reportservice/ExecuteProcessTest.java +++ b/src/test/java/com/mesa/reportservice/service/impl/ExecuteProcessServiceImplTest.java @@ -1,4 +1,4 @@ -package com.mesa.reportservice; +package com.mesa.reportservice.service.impl; import com.mesa.ReportServiceApplicationTests; import com.mesa.reportservice.bean.JobEntity; @@ -18,69 +18,71 @@ import org.springframework.boot.autoconfigure.EnableAutoConfiguration; * @Date: 2024/01/08/16:33 */ @EnableAutoConfiguration -public class ExecuteProcessTest extends ReportServiceApplicationTests { +public class ExecuteProcessServiceImplTest extends ReportServiceApplicationTests { @Autowired private ExecuteProcessService executeProcessService; @Test - public void testReSet() { - JobEntity job = this.jsonToInParameter("parameters/executeProcessTest.json", "reSetJob", JobEntity.class); - executeProcessService.reSet(job); - Assert.assertEquals(JobStates.PENDING.getValue(),job.getState()); - } + public void updateResultMessage() { + JobEntity job = this.jsonToInParameter("parameters/executeProcessTest.json", "okJob", JobEntity.class); + executeProcessService.updateResultMessage(job); + Assert.assertEquals(Constant.OK,job.getResultMessage()); - @Test - public void testUpdateResultMessageCancel() { - JobEntity job = this.jsonToInParameter("parameters/executeProcessTest.json", "cancelJob", JobEntity.class); + job = this.jsonToInParameter("parameters/executeProcessTest.json", "cancelJob", JobEntity.class); executeProcessService.updateResultMessage(job); Assert.assertEquals(Constant.CANCEL,job.getResultMessage()); - } - @Test - public void testUpdateResultMessageOk() { - JobEntity job = this.jsonToInParameter("parameters/executeProcessTest.json", "okJob", JobEntity.class); - executeProcessService.updateResultMessage(job); - Assert.assertEquals(Constant.OK,job.getResultMessage()); - } + job = this.jsonToInParameter("parameters/executeProcessTest.json", "dataErrorJob", JobEntity.class); + try { + executeProcessService.updateResultMessage(job); + } catch (SQSException e) { + Assert.assertEquals(Constant.DATABASE_ERROR,job.getResultMessage()); + } - @Test - public void testUpdateResultMessageDataError() { - JobEntity job = this.jsonToInParameter("parameters/executeProcessTest.json", "dataErrorJob", JobEntity.class); + job = this.jsonToInParameter("parameters/executeProcessTest.json", "paramErrorJob", JobEntity.class); try { executeProcessService.updateResultMessage(job); } catch (SQSException e) { - Assert.assertNotEquals((Object) SQSCode.SUCCESS.getCode(),e.getCode()); + Assert.assertEquals(Constant.PARAM_SYNTAX_ERROR,job.getResultMessage()); } - } - @Test - public void testUpdateResultMessageParamError() { - JobEntity job = this.jsonToInParameter("parameters/executeProcessTest.json", "paramErrorJob", JobEntity.class); + job = this.jsonToInParameter("parameters/executeProcessTest.json", "SQLErrorJob", JobEntity.class); try { executeProcessService.updateResultMessage(job); } catch (SQSException e) { - Assert.assertNotEquals((Object) SQSCode.SUCCESS.getCode(),e.getCode()); + Assert.assertEquals(Constant.JOB_ERROR,job.getResultMessage()); } - } - @Test - public void testUpdateResultMessageSQLError() { - JobEntity job = this.jsonToInParameter("parameters/executeProcessTest.json", "SQLErrorJob", JobEntity.class); + job = this.jsonToInParameter("parameters/executeProcessTest.json", "unknownErrorJob", JobEntity.class); try { executeProcessService.updateResultMessage(job); } catch (SQSException e) { - Assert.assertNotEquals((Object) SQSCode.SUCCESS.getCode(),e.getCode()); + Assert.assertEquals(Constant.UNKNOWN_ERROR,job.getResultMessage()); } } @Test - public void testUpdateResultMessageUnknownError() { - JobEntity job = this.jsonToInParameter("parameters/executeProcessTest.json", "unknownErrorJob", JobEntity.class); - try { - executeProcessService.updateResultMessage(job); + public void reSet() { + JobEntity job = this.jsonToInParameter("parameters/executeProcessTest.json", "reSetJob", JobEntity.class); + executeProcessService.reSet(job); + Assert.assertEquals(JobStates.PENDING.getValue(),job.getState()); + } + + @Test + public void killQuery() { + JobEntity job = this.jsonToInParameter("parameters/executeProcessTest.json", "cancelJob", JobEntity.class); + executeProcessService.killQuery(job); + Assert.assertTrue(job.getIsValid()==0); + } + + @Test + public void updateProcessMessage() { + JobEntity job = this.jsonToInParameter("parameters/executeProcessTest.json", "updateProcessJob", JobEntity.class); + try{ + executeProcessService.updateProcessMessage(job); } catch (SQSException e) { - Assert.assertNotEquals((Object) SQSCode.SUCCESS.getCode(),e.getCode()); + Assert.assertEquals(SQSCode.RESPONSE_DATA_ISNULL.getMsg(),e.getMsg()); } } } diff --git a/src/test/java/com/mesa/reportservice/ExecuteTest.java b/src/test/java/com/mesa/reportservice/service/impl/ExecuteServiceImplTest.java index e30293a..3bd7483 100644 --- a/src/test/java/com/mesa/reportservice/ExecuteTest.java +++ b/src/test/java/com/mesa/reportservice/service/impl/ExecuteServiceImplTest.java @@ -1,4 +1,4 @@ -package com.mesa.reportservice; +package com.mesa.reportservice.service.impl; import com.mesa.ReportServiceApplicationTests; import com.mesa.reportservice.bean.JobEntity; @@ -15,7 +15,7 @@ import org.springframework.boot.autoconfigure.EnableAutoConfiguration; * @Date: 2024/01/10/15:57 */ @EnableAutoConfiguration -public class ExecuteTest extends ReportServiceApplicationTests { +public class ExecuteServiceImplTest extends ReportServiceApplicationTests { @Autowired private ExecuteService executeService; diff --git a/src/test/java/com/mesa/reportservice/HBaseTest.java b/src/test/java/com/mesa/reportservice/service/impl/HBaseServiceImplTest.java index 2392c10..84d8372 100644 --- a/src/test/java/com/mesa/reportservice/HBaseTest.java +++ b/src/test/java/com/mesa/reportservice/service/impl/HBaseServiceImplTest.java @@ -1,4 +1,4 @@ -package com.mesa.reportservice; +package com.mesa.reportservice.service.impl; import com.mesa.ReportServiceApplicationTests; import com.mesa.reportservice.bean.JobEntity; @@ -14,7 +14,7 @@ import org.springframework.boot.autoconfigure.EnableAutoConfiguration; * @Date: 2024/01/10/14:48 */ @EnableAutoConfiguration -public class HBaseTest extends ReportServiceApplicationTests { +public class HBaseServiceImplTest extends ReportServiceApplicationTests { @Autowired private HBaseService hBaseService; diff --git a/src/test/java/com/mesa/reportservice/JobTest.java b/src/test/java/com/mesa/reportservice/service/impl/JobServiceImplTest.java index e69aaf6..9bbfeb9 100644 --- a/src/test/java/com/mesa/reportservice/JobTest.java +++ b/src/test/java/com/mesa/reportservice/service/impl/JobServiceImplTest.java @@ -1,9 +1,13 @@ -package com.mesa.reportservice; +package com.mesa.reportservice.service.impl; +import cn.hutool.core.util.ObjectUtil; import com.mesa.ReportServiceApplicationTests; import com.mesa.reportservice.bean.JobEntity; import com.mesa.reportservice.enums.JobStates; +import com.mesa.reportservice.exception.SQSCode; +import com.mesa.reportservice.exception.SQSException; import com.mesa.reportservice.service.JobService; +import com.mesa.reportservice.util.R; import org.junit.Assert; import org.junit.Test; import org.springframework.beans.factory.annotation.Autowired; @@ -17,7 +21,7 @@ import java.util.Map; * @Date: 2024/01/08/17:22 */ @EnableAutoConfiguration -public class JobTest extends ReportServiceApplicationTests { +public class JobServiceImplTest extends ReportServiceApplicationTests { private static final int rows = 10; @@ -41,22 +45,36 @@ public class JobTest extends ReportServiceApplicationTests { } @Test - public void testGetJobCount() { - Map<String, Long> jobCount = jobService.getJobCount(); - Assert.assertNotNull(jobCount); + public void testGetJobStatus() { + R jobStatus = jobService.getJobStatus(); + Assert.assertNotNull(jobStatus); } @Test public void testUpdateProcessesSuccess() { - JobEntity job = this.jsonToInParameter("parameters/mysqlTest.json", "updateProcessesSuccessJob", JobEntity.class); + JobEntity job = this.jsonToInParameter("parameters/jobTest.json", "updateProcessesSuccessJob", JobEntity.class); int i = jobService.updateProcesses(job); Assert.assertEquals(updateProcessesSuccess,i); } @Test public void testUpdateProcessesError() { - JobEntity job = this.jsonToInParameter("parameters/mysqlTest.json", "updateProcessesErrorJob", JobEntity.class); + JobEntity job = this.jsonToInParameter("parameters/jobTest.json", "updateProcessesErrorJob", JobEntity.class); int i = jobService.updateProcesses(job); Assert.assertNotEquals(updateProcessesSuccess,i); } + + @Test + public void testGetJobState() { + String successId = this.jsonToInParameter("parameters/jobTest.json", "getJobDetailSuccessId", String.class); + Map<String, Object> success = jobService.getJobDetail(successId); + Assert.assertTrue(ObjectUtil.isNotEmpty(success)); + + String errorId = this.jsonToInParameter("parameters/jobTest.json", "getJobDetailErrorId", String.class); + try { + jobService.getJobDetail(errorId); + } catch (SQSException e) { + Assert.assertTrue(e.getCode()==SQSCode.JOB_IS_NOT_EXIST.getCode()); + } + } } diff --git a/src/test/java/com/mesa/reportservice/MonitorTest.java b/src/test/java/com/mesa/reportservice/service/impl/MonitorServiceImplTest.java index f8cf701..5b01fba 100644 --- a/src/test/java/com/mesa/reportservice/MonitorTest.java +++ b/src/test/java/com/mesa/reportservice/service/impl/MonitorServiceImplTest.java @@ -1,4 +1,4 @@ -package com.mesa.reportservice; +package com.mesa.reportservice.service.impl; import com.mesa.ReportServiceApplicationTests; import org.junit.Test; @@ -16,14 +16,14 @@ import static org.springframework.test.web.servlet.result.MockMvcResultMatchers. */ @EnableAutoConfiguration @AutoConfigureMockMvc -public class MonitorTest extends ReportServiceApplicationTests { +public class MonitorServiceImplTest extends ReportServiceApplicationTests { @Autowired private MockMvc mockMvc; @Test public void testGetReportStatus() throws Exception{ - mockMvc.perform(get("/monitor")) + mockMvc.perform(get("/v1/monitor")) .andExpect(status().isOk()); } } diff --git a/src/test/java/com/mesa/reportservice/QueryGatewayTest.java b/src/test/java/com/mesa/reportservice/service/impl/QueryGatewayServiceImplTest.java index 193b925..aad4e29 100644 --- a/src/test/java/com/mesa/reportservice/QueryGatewayTest.java +++ b/src/test/java/com/mesa/reportservice/service/impl/QueryGatewayServiceImplTest.java @@ -1,7 +1,10 @@ -package com.mesa.reportservice; +package com.mesa.reportservice.service.impl; +import cn.hutool.log.Log; +import cn.hutool.log.LogFactory; import com.mesa.ReportServiceApplicationTests; import com.mesa.reportservice.bean.HttpResult; +import com.mesa.reportservice.exception.SQSException; import com.mesa.reportservice.service.QueryGatewayService; import org.junit.Assert; import org.junit.Test; @@ -13,15 +16,19 @@ import java.io.IOException; import java.util.regex.Matcher; import java.util.regex.Pattern; +import static com.mesa.reportservice.exception.SQSCode.ERROR; + /** * @Auther: lijinyang * @Date: 2024/01/04/17:05 */ @EnableAutoConfiguration -public class QueryGatewayTest extends ReportServiceApplicationTests { +public class QueryGatewayServiceImplTest extends ReportServiceApplicationTests { private static final String QUERY_ID_PATTERN = "^[a-f0-9]{32}:[a-f0-9]{32}$"; + private static final Log logger = LogFactory.get(); + @Autowired private QueryGatewayService queryGatewayService; @@ -93,4 +100,36 @@ public class QueryGatewayTest extends ReportServiceApplicationTests { httpResult = queryGatewayService.deleteJob(queryId); Assert.assertTrue(httpResult.getCode() == HttpStatus.NOT_FOUND.value()); } + + @Test + public void highLatencyQuerySuccess() throws IOException { + String sql = this.jsonToInParameter("parameters/queryGatewayTest.json", "highLatencyQuerySuccess", String.class); + HttpResult httpResult = queryGatewayService.queryJob(sql); + Assert.assertTrue(httpResult.getCode() == HttpStatus.OK.value()); + } + + @Test + public void highLatencyQueryCancel() throws IOException { + try { + String sql = this.jsonToInParameter("parameters/queryGatewayTest.json", "highLatencyQueryCancel", String.class); + String queryId = queryGatewayService.getQueryId(sql); + Thread thread = new Thread(() -> { + try { + HttpResult result = queryGatewayService.queryJob(sql); + } catch (IOException e) { + e.printStackTrace(); + } + }); + thread.start(); + thread.sleep(5000); + HttpResult process = queryGatewayService.queryJobProcess(queryId); + logger.info(process.getBody()); + Assert.assertTrue(process.getBody().contains("elapsed")); + HttpResult httpResult = queryGatewayService.deleteJob(queryId); + Assert.assertTrue(httpResult.getCode() == HttpStatus.OK.value()); + thread.join(); + } catch (InterruptedException e) { + e.printStackTrace(); + } + } } diff --git a/src/test/java/com/mesa/reportservice/zookeeperTest.java b/src/test/java/com/mesa/reportservice/service/impl/ZookeeperServiceImplTest.java index 5370afb..32914ad 100644 --- a/src/test/java/com/mesa/reportservice/zookeeperTest.java +++ b/src/test/java/com/mesa/reportservice/service/impl/ZookeeperServiceImplTest.java @@ -1,4 +1,4 @@ -package com.mesa.reportservice; +package com.mesa.reportservice.service.impl; import com.mesa.ReportServiceApplicationTests; import com.mesa.reportservice.service.ZookeeperService; @@ -12,7 +12,7 @@ import org.springframework.boot.autoconfigure.EnableAutoConfiguration; * @Date: 2024/01/10/14:20 */ @EnableAutoConfiguration -public class zookeeperTest extends ReportServiceApplicationTests { +public class ZookeeperServiceImplTest extends ReportServiceApplicationTests { @Autowired private ZookeeperService zookeeperService; diff --git a/src/test/resources/parameters/executeProcessTest.json b/src/test/resources/parameters/executeProcessTest.json index 8a3d4a3..ed8b2d3 100644 --- a/src/test/resources/parameters/executeProcessTest.json +++ b/src/test/resources/parameters/executeProcessTest.json @@ -166,5 +166,29 @@ "result": "", "memoryUsage": "", "queryDurationMs": "" + }, + "updateProcessJob": { + "jobId": "014f9d6e1f17227ce8d837003bc3bedb", + "querySql": "SELECT AVG(sent_bytes) AS \"Bytes Sent\", AVG(received_bytes) AS \"Bytes Received\", FROM_UNIXTIME(TIME_FLOOR_WITH_FILL(start_timestamp_ms, 'PT5M', 'zero')) AS \"Start Time\" FROM security_event WHERE recv_time >= UNIX_TIMESTAMP('2023-12-18T13:18:01Z') AND recv_time < UNIX_TIMESTAMP('2023-12-19T13:18:01Z') AND security_event.vsys_id IN (1) GROUP BY \"Start Time\" ORDER BY \"Start Time\" ASC", + "state": "RUNNING", + "resultName": "", + "doneProgress": 0, + "isFailed": 0, + "resultMessage": "", + "elapsed": 1, + "rowsRead": 100, + "bytesRead": 100, + "resultRows": 1, + "resultBytes": 100, + "isValid": 1, + "startTime": 1702991925, + "endTime": 1703399430, + "lastUpdateTime": 1702991940, + "generatedTime": 1702991921, + "queryId": "", + "executeStatus": 200, + "result": "", + "memoryUsage": "", + "queryDurationMs": "" } } diff --git a/src/test/resources/parameters/mysqlTest.json b/src/test/resources/parameters/jobTest.json index 7ed1831..04f32b9 100644 --- a/src/test/resources/parameters/mysqlTest.json +++ b/src/test/resources/parameters/jobTest.json @@ -46,5 +46,7 @@ "result": "", "memoryUsage": "", "queryDurationMs": "" - } + }, + "getJobDetailSuccessId": "00838b9d94552afecc6fa29cde033714", + "getJobDetailErrorId": "00838b9d94552afecc6fa29cde033715" }
\ No newline at end of file diff --git a/src/test/resources/parameters/queryGatewayTest.json b/src/test/resources/parameters/queryGatewayTest.json index fc36ed0..167efc4 100644 --- a/src/test/resources/parameters/queryGatewayTest.json +++ b/src/test/resources/parameters/queryGatewayTest.json @@ -2,5 +2,7 @@ "querySqlSuccess": "select 1 from session_record limit 1", "querySqlError": "select 1 from session_record_cn limit 1", "queryForProcessIdError": "", - "queryForCancelIdError": "1:1" + "queryForCancelIdError": "1:1", + "highLatencyQuerySuccess": "SELECT sleepEachRow(3) FROM tables_cluster LIMIT 150", + "highLatencyQueryCancel": "SELECT sleepEachRow(3) FROM tables_cluster LIMIT 150" } |
