diff options
| author | 李金洋 <[email protected]> | 2024-02-28 09:35:31 +0000 |
|---|---|---|
| committer | 王玮 <[email protected]> | 2024-02-28 09:35:31 +0000 |
| commit | baea863319b4c4f2027d4309db67d14a44c0a390 (patch) | |
| tree | 1f7e634843bd3ea2ac5960bf45c27bd6f834680f | |
| parent | ae114156919c66e6763ea72e1ca65e66dbfedf6b (diff) | |
Reconsitution
70 files changed, 2147 insertions, 1425 deletions
diff --git a/.gitlab-ci.yml b/.gitlab-ci.yml index 79c3c9b..96f075d 100644 --- a/.gitlab-ci.yml +++ b/.gitlab-ci.yml @@ -1,3 +1,8 @@ +variables: + MAVEN_CLI_OPTS: --batch-mode --errors --show-version + SONAR_HOST_URL: http://192.168.40.153:9900 + SONAR_PROJECT_KEY: saved-query-scheduler + SONAR_LOGIN_TOKEN: 921363353ba26a612fc4ec77679647780f9a46e7 stages: - build @@ -31,6 +36,10 @@ deploy: script: - echo 'exec mvn package & docker build' - echo 'tag:' $CI_COMMIT_TAG + - |- + mvn $MAVEN_CLI_OPTS clean verify sonar:sonar -Dsonar.projectKey=$SONAR_PROJECT_KEY \ + -Dsonar.host.url=$SONAR_HOST_URL -Dsonar.login=$SONAR_LOGIN_TOKEN \ + -Dsonar.qualitygate.wait=true -Dsonar.qualitygate.timeout=300 - mvn clean package -Dmaven.test.skip=true docker:build -DdockerImageTags=$CI_COMMIT_TAG when: on_success only: diff --git a/config/application-test.yml b/config/application-test.yml new file mode 100644 index 0000000..4631924 --- /dev/null +++ b/config/application-test.yml @@ -0,0 +1,19 @@ +nacos: + config: + type: yaml #配置集的配置格式 + server-addr: 127.0.0.1:8848 #配置中心地址 + namespace: test #命名空间 + data-id: saved-query-scheduler.yml # 数据集ID + auto-refresh: true #开启自动刷新 + group: Galaxy #配置对应的分组 + username: nacos #Nacos认证用户 + password: nacos #Nacos认证密码 + bootstrap: + enable: true #开启配置预加载功能 + log: + enable: true #开启Nacos支持日志级别的加载时机 +spring: + profiles: + active: test +logging: + config: ./config/log4j2-dev.xml
\ No newline at end of file diff --git a/config/nacos/config/fixed-127.0.0.1_8848-dev_nacos/data/config-data-tenant/dev/Galaxy/saved-query-scheduler.yml b/config/nacos/config/fixed-127.0.0.1_8848-dev_nacos/data/config-data-tenant/dev/Galaxy/saved-query-scheduler.yml index 7a7c9fb..5c910d8 100644 --- a/config/nacos/config/fixed-127.0.0.1_8848-dev_nacos/data/config-data-tenant/dev/Galaxy/saved-query-scheduler.yml +++ b/config/nacos/config/fixed-127.0.0.1_8848-dev_nacos/data/config-data-tenant/dev/Galaxy/saved-query-scheduler.yml @@ -6,13 +6,13 @@ scan: result: scheduled: plan: 0/15 * * * * ? -#同时间执行是线程数 -globle: +#同时间执行线程数 +scheduler: job_thread: 2 #Hbasehttp的端口 #Hbase的表名等配置通畅不需要更改 hbase: - table: tsg:report_result + table: tsg_galaxy:saved_query_result zookeeper_quorum: 192.168.44.12 zookeeper_property_clientPort: 2181 zookeeper_znode_parent: /hbase @@ -22,12 +22,12 @@ hbase: #存入Hbase的cell级别生存时间 根据部署环境填写:1.TSG 不设置(永久有效); 2.CN 7(默认7天) 单位:Day cell_ttl_d: #查询网关ip -ck: - gateway_ip: localhost:9999 +query-gateway: + gateway_ip: 192.168.44.12:9999 #zk集群的ip zookeeper: - connectString: 192.168.44.55 + connectString: 192.168.44.12 #是否启用zookeeper 0启用(集群) 1禁用(单机) open: 1 retryCount: 5 @@ -109,7 +109,7 @@ spring: login-password: admin #Spring监控,对内部各接口调用的监控,需要导入aop相关包 aop-patterns: com.mesa.reportservice.controller.*,com.mesa.reportservice.service.*,com.mesa.reportservice.mapper.* -mybatis: +mybatis-plus: typeAliasesPackage: com.mesa.reportservice.bean mapperLocations: classpath*:/mappers/*.xml management: diff --git a/config/nacos/config/fixed-127.0.0.1_8848-test_nacos/data/config-data-tenant/test/Galaxy/saved-query-scheduler.yml b/config/nacos/config/fixed-127.0.0.1_8848-test_nacos/data/config-data-tenant/test/Galaxy/saved-query-scheduler.yml new file mode 100644 index 0000000..97f8407 --- /dev/null +++ b/config/nacos/config/fixed-127.0.0.1_8848-test_nacos/data/config-data-tenant/test/Galaxy/saved-query-scheduler.yml @@ -0,0 +1,133 @@ +#http的端口 +server: + port: 9093 +#更新进度条的时间10s +scan: + result: + scheduled: + plan: 0/15 * * * * ? +#同时间执行线程数 +scheduler: + job_thread: 2 +#Hbasehttp的端口 +#Hbase的表名等配置通畅不需要更改 +hbase: + table: tsg_galaxy:saved_query_result + zookeeper_quorum: 192.168.44.12 + zookeeper_property_clientPort: 2181 + zookeeper_znode_parent: /hbase + client_retries_number: 3 + rpc_timeout: 100000 + connect_pool: 10 +#存入Hbase的cell级别生存时间 根据部署环境填写:1.TSG 不设置(永久有效); 2.CN 7(默认7天) 单位:Day + cell_ttl_d: +#查询网关ip +query-gateway: + gateway_ip: 192.168.44.12:9999 + +#zk集群的ip +zookeeper: + connectString: 192.168.44.12 +#是否启用zookeeper 0启用(集群) 1禁用(单机) + open: 1 + retryCount: 5 + elapsedTimeMs: 10000 + sessionTimeoutMs: 50000 + connectionTimeoutMs: 50000 + nameSpace: reportservice + +#最大连接数 +http: + maxTotal: 300 +#并发数 + defaultMaxPerRoute: 100 +#创建连接的最长时间 + connectTimeout: 10000 +#从连接池中获取到连接的最长时间 + connectionRequestTimeout: 10000 +#数据传输的最长时间 + socketTimeout: 21605000 +#提交请求前测试连接是否可用 + staleConnectionCheckEnabled: true + socketTimeoutShort: 30000 + +#mariadb的url +spring: + application: + name: saved-query-scheduler + + datasource: + url: jdbc:mariadb://192.168.44.12:3306/olap_test?serverTimezone=GMT&useUnicode=true&characterEncoding=UTF-8&autoReconnect=true&failOverReadOnly=false +#mariadb的用户名 + username: root +#mariadb的密码 + password: galaxy2019 + +#以下配置不需要更改通常 + name: druidDataSource + type: com.alibaba.druid.pool.DruidDataSource + driver-class-name: org.mariadb.jdbc.Driver + +#配置监控统计拦截的filters,去掉后监控界面SQL无法进行统计,’wall’用于防火墙 + druid: + filters: + stat: + enabled: true + slf4j: + enabled: true + wall: + enabled: true + config: + comment-allow: true + #最大连接数 + max-active: 30 + #最小连接数 + min-idle: 1 + #初始化连接数 + initial-size: 2 + #获取连接最大超时时间 + max-wait: 600000 + #间隔多久才进行一次检测,检测需要关闭的空闲连接,单位是毫秒 + time-between-eviction-runs-millis: 60000 + # 一个连接在池中最小生存的时间,单位是毫秒 + min-evictable-idle-time-millis: 300000 + #验证连接是否可用,在数据库中执行一条sql + validation-query: select 1 + #建议配置为true,不影响性能,并且保证安全性。申请连接的时候检测,如果空闲时间大于timeBetweenEvictionRunsMillis, + # 执行validationQuery检测连接是否有效 + test-while-idle: true + #申请连接时执行validationQuery检测连接是否有效,做了这个配置会降低性能 + test-on-borrow: true + test-on-return: false + connection-properties: druid.stat.mergeSql=true;druid.stat.slowSqlMillis=500 + #是否开启WebStatFilter + web-stat-filter: + enabled: true + #设置不统计哪些URL(用于排除一些不必要的url) + exclusions: "*.js,*.gif,*.jpg,*..ng,*.css,*.ico,/druid/*" + #是否开启Druid监控信息显示页面 + stat-view-servlet: + enabled: true + #甚至浏览器访问路径 + url-pattern: /druid/* + #禁止手动重置监控数据 + reset-enable: false + #durid-ui页面账户密码 + login-username: admin + login-password: admin + #Spring监控,对内部各接口调用的监控,需要导入aop相关包 + aop-patterns: com.mesa.reportservice.controller.*,com.mesa.reportservice.service.*,com.mesa.reportservice.mapper.* +mybatis-plus: + typeAliasesPackage: com.mesa.reportservice.bean + mapperLocations: classpath*:/mappers/*.xml +management: + endpoints: + web: + exposure: + include: "*" + metrics: + tags: + application: saved-query-scheduler + +logging: + config: ./config/log4j2-dev.xml diff --git a/docs/release/release-374.md b/docs/release/release-374.md index 00e3421..32e7a3d 100644 --- a/docs/release/release-374.md +++ b/docs/release/release-374.md @@ -1,4 +1,6 @@ Release 374 (TSG-23.12) * 报告服务适配新版表结构(GAL-447) * Hbase存储数据表更换为tsg_galaxy:saved_query_result -* 优化报告任务运行时信息
\ No newline at end of file +* 优化报告任务运行时信息 +* 修复monitor接口执行失败时status异常问题 +* 修复Report报告执行失败(TSG-18703)
\ No newline at end of file @@ -23,6 +23,7 @@ <!--docker-registry的端口--> <docker.registry.port>9080</docker.registry.port> <docker.image.prefix>tsg/galaxy</docker.image.prefix> + <project.password>Geedge2020!</project.password> </properties> <!-- 设置 jitpack.io 仓库 --> <repositories> @@ -43,7 +44,6 @@ <dependencies> - <dependency> <groupId>com.github.ulisesbocchio</groupId> <artifactId>jasypt-spring-boot-starter</artifactId> @@ -66,9 +66,15 @@ <version>1.4</version> </dependency>--> <dependency> - <groupId>org.mybatis.spring.boot</groupId> - <artifactId>mybatis-spring-boot-starter</artifactId> - <version>1.3.2</version> + <groupId>com.baomidou</groupId> + <artifactId>mybatis-plus-boot-starter</artifactId> + <version>3.0.7.1</version> + <exclusions> + <exclusion> + <groupId>com.baomidou</groupId> + <artifactId>mybatis-plus-generator</artifactId> + </exclusion> + </exclusions> </dependency> <!-- <dependency> <groupId>org.apache.hbase</groupId> @@ -258,6 +264,16 @@ <build> <plugins> <plugin> + <groupId>org.sonarsource.scanner.maven</groupId> + <artifactId>sonar-maven-plugin</artifactId> + <version>3.9.0.2155</version> + </plugin> + <plugin> + <groupId>io.github.r0bb3n</groupId> + <artifactId>sonar-quality-gate-maven-plugin</artifactId> + <version>1.3.0</version> + </plugin> + <plugin> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-maven-plugin</artifactId> <version>2.3.12.RELEASE</version> @@ -288,7 +304,7 @@ <phase>package</phase> <configuration> <!-- 配置加密密码 ,推荐使用命令指定 --> - <password>Geedge2020!</password> + <password>${project.password}</password> <!-- 配置不加密项 --> <excludes> <exclude> diff --git a/src/main/java/com/mesa/reportservice/ReportserviceApplication.java b/src/main/java/com/mesa/reportservice/ReportServiceApplication.java index 46595d8..d6e0e28 100644 --- a/src/main/java/com/mesa/reportservice/ReportserviceApplication.java +++ b/src/main/java/com/mesa/reportservice/ReportServiceApplication.java @@ -15,11 +15,13 @@ import java.lang.management.RuntimeMXBean; import java.util.List; import java.util.TimeZone; +/** + * @author wk + */ @ServletComponentScan - @SpringBootApplication @EnableScheduling -public class ReportserviceApplication { +public class ReportServiceApplication { public static final String TIMEZONE = "UTC"; public static void main(String[] args) { @@ -34,7 +36,7 @@ public class ReportserviceApplication { // joda.time.DateTime System.setProperty("user.timezone", TIMEZONE); } - SpringApplication.run(ReportserviceApplication.class, args); + SpringApplication.run(ReportServiceApplication.class, args); } @Bean diff --git a/src/main/java/com/mesa/reportservice/bean/HttpResult.java b/src/main/java/com/mesa/reportservice/bean/HttpResult.java index 632edaa..4aa3df7 100644 --- a/src/main/java/com/mesa/reportservice/bean/HttpResult.java +++ b/src/main/java/com/mesa/reportservice/bean/HttpResult.java @@ -1,35 +1,19 @@ package com.mesa.reportservice.bean; +import lombok.Data; + /** - * Created by wk1 on 2019/5/15. + * + * @author lijinyang + * @date 2024/1/24 */ +@Data public class HttpResult { private int code; - private String body; - // private byte[] bodybyte; - + private String body; public HttpResult() { - - } - - - - public int getCode() { - return code; - } - - public void setCode(int code) { - this.code = code; - } - - public String getBody() { - return body; - } - - public void setBody(String body) { - this.body = body; } } diff --git a/src/main/java/com/mesa/reportservice/bean/JobEntity.java b/src/main/java/com/mesa/reportservice/bean/JobEntity.java index b3baa8b..2a0394c 100644 --- a/src/main/java/com/mesa/reportservice/bean/JobEntity.java +++ b/src/main/java/com/mesa/reportservice/bean/JobEntity.java @@ -1,11 +1,17 @@ package com.mesa.reportservice.bean; +import cn.hutool.log.LogFactory; +import com.baomidou.mybatisplus.annotation.TableName; import lombok.Data; +import org.apache.log4j.spi.LoggerFactory; /** - * Created by wk1 on 2019/5/20. + * + * @author lijinyang + * @date 2024/1/24 */ @Data +@TableName("saved_query_job") public class JobEntity implements Cloneable { private String jobId; @@ -44,7 +50,7 @@ public class JobEntity implements Cloneable { private String queryId; - private Integer excuteStatus; + private Integer executeStatus; private String result; @@ -58,6 +64,7 @@ public class JobEntity implements Cloneable { try { o = (JobEntity) super.clone(); } catch (CloneNotSupportedException e) { + LogFactory.get().error(e); } return o; } diff --git a/src/main/java/com/mesa/reportservice/bean/MonitorEntity.java b/src/main/java/com/mesa/reportservice/bean/MonitorEntity.java index 2824ecf..cfa761e 100644 --- a/src/main/java/com/mesa/reportservice/bean/MonitorEntity.java +++ b/src/main/java/com/mesa/reportservice/bean/MonitorEntity.java @@ -1,72 +1,29 @@ package com.mesa.reportservice.bean; +import lombok.Data; + import java.util.Map; /** - * Created by wk1 on 2019/5/15. + * + * @author lijinyang + * @date 2024/1/24 */ +@Data public class MonitorEntity { private Long queueJobNum; - private Long excuteingJobNum; + + private Long executingJobNum; + private Long todaySuccessJobNum; + private Long todayErrorJobNum; - private Map<String,JobEntity> joblist ; + private Map<String,JobEntity> jobList ; private String status; - public MonitorEntity() { - - } - - public Long getQueueJobNum() { - return queueJobNum; - } - - public void setQueueJobNum(Long queueJobNum) { - this.queueJobNum = queueJobNum; - } - - public Long getExcuteingJobNum() { - return excuteingJobNum; - } - - public void setExcuteingJobNum(Long excuteingJobNum) { - this.excuteingJobNum = excuteingJobNum; - } - - public Long getTodaySuccessJobNum() { - return todaySuccessJobNum; - } - - public void setTodaySuccessJobNum(Long todaySuccessJobNum) { - this.todaySuccessJobNum = todaySuccessJobNum; - } - - public Long getTodayErrorJobNum() { - return todayErrorJobNum; - } - - public void setTodayErrorJobNum(Long todayErrorJobNum) { - this.todayErrorJobNum = todayErrorJobNum; - } - - public Map<String, JobEntity> getJoblist() { - return joblist; - } - - public void setJoblist(Map<String, JobEntity> joblist) { - this.joblist = joblist; - } - - - public String getStatus() { - return status; - } - - public void setStatus(String status) { - this.status = status; } } diff --git a/src/main/java/com/mesa/reportservice/configuration/ClickhouseConfig.java b/src/main/java/com/mesa/reportservice/configuration/ClickhouseConfig.java deleted file mode 100644 index 3526c4e..0000000 --- a/src/main/java/com/mesa/reportservice/configuration/ClickhouseConfig.java +++ /dev/null @@ -1,29 +0,0 @@ -package com.mesa.reportservice.configuration; - -import com.alibaba.nacos.api.config.ConfigType; -import com.alibaba.nacos.api.config.annotation.NacosConfigurationProperties; -import org.springframework.boot.context.properties.ConfigurationProperties; -import org.springframework.stereotype.Component; - -import java.io.UnsupportedEncodingException; -import java.net.URLEncoder; - -/** - * Created by wk1 on 2019/5/17. - */ -@Component -//@ConfigurationProperties(prefix = "ck") -@NacosConfigurationProperties(prefix = "ck", dataId = "${nacos.config.data-id}", groupId = "${nacos.config.group}", type = ConfigType.YAML, autoRefreshed = true) - -public class ClickhouseConfig { - - private static String gateway_ip; - - public void setGateway_ip(String gateway_ip) { - ClickhouseConfig.gateway_ip = gateway_ip; - } - - public String getGateway_ip() { - return gateway_ip; - } -} diff --git a/src/main/java/com/mesa/reportservice/configuration/ExecutorConfig.java b/src/main/java/com/mesa/reportservice/configuration/ExecutorConfig.java deleted file mode 100644 index d17e1a0..0000000 --- a/src/main/java/com/mesa/reportservice/configuration/ExecutorConfig.java +++ /dev/null @@ -1,38 +0,0 @@ -package com.mesa.reportservice.configuration; - -import org.springframework.context.annotation.Bean; -import org.springframework.context.annotation.Configuration; -import org.springframework.scheduling.annotation.EnableAsync; -import org.springframework.scheduling.annotation.SchedulingConfigurer; -import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor; -import org.springframework.scheduling.config.ScheduledTaskRegistrar; -import org.springframework.stereotype.Component; - -import java.util.concurrent.Executor; -import java.util.concurrent.Executors; -import java.util.concurrent.ThreadPoolExecutor; - -/** - * Created by wk on 2020/8/28. - */ -@Configuration -@EnableAsync -public class ExecutorConfig { - - @Bean - public Executor taskExecutor() { - ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor(); - executor.setCorePoolSize(2); - executor.setMaxPoolSize(20); - executor.initialize(); - return executor; - } - @Bean - public Executor resultExecutor() { - ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor(); - executor.setCorePoolSize(2); - executor.setMaxPoolSize(20); - executor.initialize(); - return executor; - } -} diff --git a/src/main/java/com/mesa/reportservice/configuration/GlobelConfig.java b/src/main/java/com/mesa/reportservice/configuration/GlobelConfig.java deleted file mode 100644 index 7334b45..0000000 --- a/src/main/java/com/mesa/reportservice/configuration/GlobelConfig.java +++ /dev/null @@ -1,36 +0,0 @@ -package com.mesa.reportservice.configuration; - -import com.alibaba.nacos.api.config.ConfigType; -import com.alibaba.nacos.api.config.annotation.NacosConfigurationProperties; -import com.mesa.reportservice.bean.JobEntity; -import org.springframework.boot.context.properties.ConfigurationProperties; -import org.springframework.stereotype.Component; - -import java.util.Map; -import java.util.UUID; -import java.util.concurrent.ConcurrentHashMap; - -/** - * Created by wk1 on 2019/5/28. - */ - -@Component -@NacosConfigurationProperties(prefix = "globle", dataId = "${nacos.config.data-id}", groupId = "${nacos.config.group}", type = ConfigType.YAML, autoRefreshed = true) - -public class GlobelConfig { - - private static int job_thread; - private static Map<String, JobEntity> mapresult = new ConcurrentHashMap<>(); - public void setJob_thread(int job_thread) { - this.job_thread = job_thread; - } - public final static String zkuuid = UUID.randomUUID().toString().replaceAll("-",""); - - public int getJob_thread() { - return job_thread; - } - - public Map<String, JobEntity> getMapresult() { - return mapresult; - } -} diff --git a/src/main/java/com/mesa/reportservice/configuration/HbaseFactory.java b/src/main/java/com/mesa/reportservice/configuration/HBaseFactory.java index 96f6457..6134101 100644 --- a/src/main/java/com/mesa/reportservice/configuration/HbaseFactory.java +++ b/src/main/java/com/mesa/reportservice/configuration/HBaseFactory.java @@ -5,44 +5,38 @@ import org.apache.hadoop.hbase.HBaseConfiguration; import org.apache.hadoop.hbase.client.*; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Qualifier; -import org.springframework.boot.context.properties.EnableConfigurationProperties; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import java.io.IOException; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; /** - * Created by Administrator on 2020/3/10. + * + * @author lijinyang + * @date 2024/1/24 */ @Configuration -public class HbaseFactory { +public class HBaseFactory { @Autowired - private HbaseProperties hbaseProperties; + private HBaseProperties hBaseProperties; Log logger = Log.get(); - @Bean(name = "hbaseConfiguration") public org.apache.hadoop.conf.Configuration getConfiguration() { - org.apache.hadoop.conf.Configuration conf = null; if (conf == null) { conf = HBaseConfiguration.create(); - conf.set("hbase.zookeeper.quorum", hbaseProperties.getZookeeper_quorum()); - conf.set("hbase.zookeeper.property.clientPort", hbaseProperties.getZookeeper_property_clientPort()); - conf.set("zookeeper.znode.parent", hbaseProperties.getZookeeper_znode_parent()); - conf.set("hbase.client.retries.number", hbaseProperties.getClient_retries_number()); - conf.set("hbase.rpc.timeout", hbaseProperties.getRpc_timeout()); + conf.set("hbase.zookeeper.quorum", hBaseProperties.getZookeeperQuorum()); + conf.set("hbase.zookeeper.property.clientPort", hBaseProperties.getZookeeperPropertyClientPort()); + conf.set("zookeeper.znode.parent", hBaseProperties.getZookeeperZnodeParent()); + conf.set("hbase.client.retries.number", hBaseProperties.getClientRetriesNumber()); + conf.set("hbase.rpc.timeout", hBaseProperties.getRpcTimeout()); conf.set("hbase.client.keyvalue.maxsize", "1024000000"); conf.set("zookeeper.recovery.retry", "3"); - conf.set("hbase.client.ipc.pool.size", hbaseProperties.getConnect_pool().toString()); - - + conf.set("hbase.client.ipc.pool.size", hBaseProperties.getConnectPool().toString()); } - return conf; } @@ -51,12 +45,9 @@ public class HbaseFactory { Connection con = null; try { con = ConnectionFactory.createConnection(Conf); - } catch (IOException e) { logger.error(e.toString()); } return con; - - } } diff --git a/src/main/java/com/mesa/reportservice/configuration/HBaseProperties.java b/src/main/java/com/mesa/reportservice/configuration/HBaseProperties.java new file mode 100644 index 0000000..47211f2 --- /dev/null +++ b/src/main/java/com/mesa/reportservice/configuration/HBaseProperties.java @@ -0,0 +1,36 @@ +package com.mesa.reportservice.configuration; + +import com.alibaba.nacos.api.config.ConfigType; +import com.alibaba.nacos.api.config.annotation.NacosConfigurationProperties; +import lombok.Data; +import org.springframework.boot.context.properties.ConfigurationProperties; +import org.springframework.stereotype.Component; + +/** + * + * @author lijinyang + * @date 2024/1/24 + */ +@Data +@Component +@NacosConfigurationProperties(prefix = "hbase", dataId = "${nacos.config.data-id}", groupId = "${nacos.config.group}", type = ConfigType.YAML, autoRefreshed = true) +public class HBaseProperties { + + private String zookeeperQuorum; + + private String zookeeperPropertyClientPort; + + private String zookeeperZnodeParent; + + private String clientPause; + + private String clientRetriesNumber; + + private String rpcTimeout; + + private Integer connectPool; + + private String table; + + private Integer cellTtlD; +} diff --git a/src/main/java/com/mesa/reportservice/configuration/HbaseConfig.java b/src/main/java/com/mesa/reportservice/configuration/HbaseConfig.java deleted file mode 100644 index 2cdb8e4..0000000 --- a/src/main/java/com/mesa/reportservice/configuration/HbaseConfig.java +++ /dev/null @@ -1,72 +0,0 @@ -package com.mesa.reportservice.configuration; - -import com.alibaba.nacos.api.config.ConfigType; -import com.alibaba.nacos.api.config.annotation.NacosConfigurationProperties; -import com.mesa.reportservice.bean.JobEntity; -import com.mesa.reportservice.util.StringUtil; -import org.springframework.boot.context.properties.ConfigurationProperties; -import org.springframework.stereotype.Component; - -import java.net.URL; - -/** - * Created by wk1 on 2019/5/27. - */ -@Component -@NacosConfigurationProperties(prefix = "hbase", dataId = "${nacos.config.data-id}", groupId = "${nacos.config.group}", type = ConfigType.YAML, autoRefreshed = true) - -public class HbaseConfig { - - private static String columefamily; - private static String url; - private static String table; - private static String colume_job_id; - private static String colume_query_id; - private static String colume_query_duration_ms; - private static String colume_read_rows; - private static String colume_memory_usage; - private static String colume_exception; - private static String colume_sql; - - - public void setColumefamily(String columefamily) { - HbaseConfig.columefamily = columefamily; - } - - public void setUrl(String url) { - HbaseConfig.url = url; - } - - public void setTable(String table) { - HbaseConfig.table = table; - } - - public void setColume_query_id(String colume_query_id) { - HbaseConfig.colume_query_id = colume_query_id; - } - - public void setColume_query_duration_ms(String colume_query_duration_ms) { - HbaseConfig.colume_query_duration_ms = colume_query_duration_ms; - } - - public void setColume_read_rows(String colume_read_rows) { - HbaseConfig.colume_read_rows = colume_read_rows; - } - - public void setColume_memory_usage(String colume_memory_usage) { - HbaseConfig.colume_memory_usage = colume_memory_usage; - } - - public void setColume_job_id(String colume_job_id) { - HbaseConfig.colume_job_id = colume_job_id; - } - - - public void setColume_exception(String colume_exception) { - HbaseConfig.colume_exception = colume_exception; - } - - public void setColume_sql(String colume_sql) { - HbaseConfig.colume_sql = colume_sql; - } -} diff --git a/src/main/java/com/mesa/reportservice/configuration/HbaseProperties.java b/src/main/java/com/mesa/reportservice/configuration/HbaseProperties.java deleted file mode 100644 index 46fefc8..0000000 --- a/src/main/java/com/mesa/reportservice/configuration/HbaseProperties.java +++ /dev/null @@ -1,111 +0,0 @@ -package com.mesa.reportservice.configuration; - -import com.alibaba.nacos.api.config.ConfigType; -import com.alibaba.nacos.api.config.annotation.NacosConfigurationProperties; -import org.springframework.boot.context.properties.ConfigurationProperties; -import org.springframework.stereotype.Component; - -/** - * Created by Administrator on 2020/3/10. - */ - -//@ConfigurationProperties(prefix = "hbase") -@Component -@NacosConfigurationProperties(prefix = "hbase", dataId = "${nacos.config.data-id}", groupId = "${nacos.config.group}", type = ConfigType.YAML, autoRefreshed = true) - -public class HbaseProperties { - - private String zookeeper_quorum; - - private String zookeeper_property_clientPort; - - private String zookeeper_znode_parent; - - private String client_pause; - - private String client_retries_number; - - private String rpc_timeout; - - private Integer connect_pool; - - private String table; - - private Integer cell_ttl_d; - - - - public String getZookeeper_quorum() { - return zookeeper_quorum; - } - - public void setZookeeper_quorum(String zookeeper_quorum) { - this.zookeeper_quorum = zookeeper_quorum; - } - - public String getZookeeper_property_clientPort() { - return zookeeper_property_clientPort; - } - - public void setZookeeper_property_clientPort(String zookeeper_property_clientPort) { - this.zookeeper_property_clientPort = zookeeper_property_clientPort; - } - - public String getZookeeper_znode_parent() { - return zookeeper_znode_parent; - } - - public void setZookeeper_znode_parent(String zookeeper_znode_parent) { - this.zookeeper_znode_parent = zookeeper_znode_parent; - } - - public String getClient_pause() { - return client_pause; - } - - public void setClient_pause(String client_pause) { - this.client_pause = client_pause; - } - - public String getClient_retries_number() { - return client_retries_number; - } - - public void setClient_retries_number(String client_retries_number) { - this.client_retries_number = client_retries_number; - } - - public String getRpc_timeout() { - return rpc_timeout; - } - - public void setRpc_timeout(String rpc_timeout) { - this.rpc_timeout = rpc_timeout; - } - - - public Integer getConnect_pool() { - return connect_pool; - } - - public void setConnect_pool(Integer connect_pool) { - this.connect_pool = connect_pool; - } - - - public String getTable() { - return table; - } - - public void setTable(String table) { - this.table = table; - } - - public void setCell_ttl_d(Integer cell_ttl_d) { - this.cell_ttl_d = cell_ttl_d; - } - - public Integer getCell_ttl_d() { - return cell_ttl_d; - } -} diff --git a/src/main/java/com/mesa/reportservice/configuration/HttpClientPool.java b/src/main/java/com/mesa/reportservice/configuration/HttpClientPool.java index e6ac85a..47e6e62 100644 --- a/src/main/java/com/mesa/reportservice/configuration/HttpClientPool.java +++ b/src/main/java/com/mesa/reportservice/configuration/HttpClientPool.java @@ -2,6 +2,7 @@ package com.mesa.reportservice.configuration; import com.alibaba.nacos.api.config.ConfigType; import com.alibaba.nacos.api.config.annotation.NacosConfigurationProperties; +import lombok.Data; import org.apache.http.client.config.RequestConfig; import org.apache.http.impl.client.CloseableHttpClient; import org.apache.http.impl.client.HttpClientBuilder; @@ -12,12 +13,13 @@ import org.springframework.context.annotation.Bean; import org.springframework.stereotype.Component; /** - * Created by wk1 on 2019/5/15. + * + * @author lijinyang + * @date 2024/1/24 */ +@Data @Component -//@ConfigurationProperties(prefix = "http") @NacosConfigurationProperties(prefix = "http", dataId = "${nacos.config.data-id}", groupId = "${nacos.config.group}", type = ConfigType.YAML, autoRefreshed = true) - public class HttpClientPool { private Integer maxTotal; @@ -34,39 +36,6 @@ public class HttpClientPool { private Integer socketTimeoutShort; - - public void setMaxTotal(Integer maxTotal) { - this.maxTotal = maxTotal; - } - - public void setDefaultMaxPerRoute(Integer defaultMaxPerRoute) { - this.defaultMaxPerRoute = defaultMaxPerRoute; - } - - public void setConnectTimeout(Integer connectTimeout) { - this.connectTimeout = connectTimeout; - } - - public void setConnectionRequestTimeout(Integer connectionRequestTimeout) { - this.connectionRequestTimeout = connectionRequestTimeout; - } - - public void setSocketTimeout(Integer socketTimeout) { - this.socketTimeout = socketTimeout; - } - - public void setStaleConnectionCheckEnabled(boolean staleConnectionCheckEnabled) { - this.staleConnectionCheckEnabled = staleConnectionCheckEnabled; - } - - public void setSocketTimeoutShort(Integer socketTimeoutShort) { - this.socketTimeoutShort = socketTimeoutShort; - } - - public Integer getSocketTimeout() { - return socketTimeout; - } - /** * 首先实例化一个连接池管理器,设置最大连接数、并发连接数 * @@ -94,9 +63,7 @@ public class HttpClientPool { //HttpClientBuilder中的构造方法被protected修饰,所以这里不能直接使用new来实例化一个HttpClientBuilder,可以使用HttpClientBuilder提供的静态方法create()来获取HttpClientBuilder对象 HttpClientBuilder httpClientBuilder = HttpClientBuilder.create(); - httpClientBuilder.setConnectionManager(httpClientConnectionManager); - return httpClientBuilder; } @@ -128,8 +95,6 @@ public class HttpClientPool { .setStaleConnectionCheckEnabled(staleConnectionCheckEnabled); } - - @Bean(name = "builderShort") public RequestConfig.Builder getBuilderShort() { RequestConfig.Builder builder = RequestConfig.custom(); @@ -138,6 +103,7 @@ public class HttpClientPool { .setSocketTimeout(socketTimeoutShort) .setStaleConnectionCheckEnabled(staleConnectionCheckEnabled); } + /** * 使用builder构建一个RequestConfig对象 * @@ -160,6 +126,5 @@ public class HttpClientPool { public RequestConfig getRequestShortConfig(@Qualifier("builderShort") RequestConfig.Builder builder) { return builder.build(); } - } diff --git a/src/main/java/com/mesa/reportservice/configuration/QueryGatewayProperties.java b/src/main/java/com/mesa/reportservice/configuration/QueryGatewayProperties.java new file mode 100644 index 0000000..83c3ae5 --- /dev/null +++ b/src/main/java/com/mesa/reportservice/configuration/QueryGatewayProperties.java @@ -0,0 +1,25 @@ +package com.mesa.reportservice.configuration; + +import com.alibaba.nacos.api.config.ConfigType; +import com.alibaba.nacos.api.config.annotation.NacosConfigurationProperties; +import org.springframework.stereotype.Component; + +/** + * + * @author lijinyang + * @date 2024/1/24 + */ +@Component +@NacosConfigurationProperties(prefix = "query-gateway", dataId = "${nacos.config.data-id}", groupId = "${nacos.config.group}", type = ConfigType.YAML, autoRefreshed = true) +public class QueryGatewayProperties { + + private static String gatewayIp; + + public void setGatewayIp(String gatewayIp) { + QueryGatewayProperties.gatewayIp = gatewayIp; + } + + public String getGatewayIp() { + return gatewayIp; + } +} diff --git a/src/main/java/com/mesa/reportservice/configuration/SchedulerProperties.java b/src/main/java/com/mesa/reportservice/configuration/SchedulerProperties.java new file mode 100644 index 0000000..cb154d0 --- /dev/null +++ b/src/main/java/com/mesa/reportservice/configuration/SchedulerProperties.java @@ -0,0 +1,40 @@ +package com.mesa.reportservice.configuration; + +import com.alibaba.nacos.api.config.ConfigType; +import com.alibaba.nacos.api.config.annotation.NacosConfigurationProperties; +import com.mesa.reportservice.bean.JobEntity; +import lombok.Data; +import org.springframework.stereotype.Component; + +import java.util.Map; +import java.util.UUID; +import java.util.concurrent.ConcurrentHashMap; + +/** + * + * @author lijinyang + * @date 2024/1/24 + */ +@Data +@Component +@NacosConfigurationProperties(prefix = "scheduler", dataId = "${nacos.config.data-id}", groupId = "${nacos.config.group}", type = ConfigType.YAML, autoRefreshed = true) +public class SchedulerProperties { + + private static int jobThread; + + private static Map<String, JobEntity> mapResult = new ConcurrentHashMap<>(); + + public void setJobThread(int jobThread) { + SchedulerProperties.jobThread = jobThread; + } + + public final static String zkUUID = UUID.randomUUID().toString().replaceAll("-",""); + + public int getJobThread() { + return jobThread; + } + + public Map<String, JobEntity> getMapResult() { + return mapResult; + } +} diff --git a/src/main/java/com/mesa/reportservice/configuration/ThreadPoolConfig.java b/src/main/java/com/mesa/reportservice/configuration/ThreadPoolConfig.java new file mode 100644 index 0000000..91009b0 --- /dev/null +++ b/src/main/java/com/mesa/reportservice/configuration/ThreadPoolConfig.java @@ -0,0 +1,35 @@ +package com.mesa.reportservice.configuration; + +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; +import org.springframework.scheduling.annotation.EnableAsync; +import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor; + +import java.util.concurrent.*; + +/** + * @author lijinyang + * @Date 2024/02/28/11:06 + */ +@Configuration +@EnableAsync +public class ThreadPoolConfig { + + @Bean(name = "threadPoolTaskExecutor") + public ThreadPoolTaskExecutor getThreadPoolTaskExecutor() { + ThreadPoolTaskExecutor taskExecutor = new ThreadPoolTaskExecutor(); + // 核心线程数 + taskExecutor.setCorePoolSize(2); + // 最大线程数 + taskExecutor.setMaxPoolSize(30); + // 阻塞队列长度 + taskExecutor.setQueueCapacity(30); + // 空闲线程最大存活时间 + taskExecutor.setKeepAliveSeconds(5); + // 拒绝策略 + taskExecutor.setRejectedExecutionHandler(new ThreadPoolExecutor.AbortPolicy()); + taskExecutor.initialize(); + return taskExecutor; + } + +} diff --git a/src/main/java/com/mesa/reportservice/configuration/ZkConfig.java b/src/main/java/com/mesa/reportservice/configuration/ZkConfig.java deleted file mode 100644 index c4cebf6..0000000 --- a/src/main/java/com/mesa/reportservice/configuration/ZkConfig.java +++ /dev/null @@ -1,48 +0,0 @@ -package com.mesa.reportservice.configuration; - -import cn.hutool.log.Log; -import org.apache.curator.RetryPolicy; -import org.apache.curator.framework.CuratorFramework; -import org.apache.curator.framework.CuratorFrameworkFactory; -import org.apache.curator.retry.ExponentialBackoffRetry; -import org.springframework.beans.factory.annotation.Autowired; -import org.springframework.boot.context.properties.EnableConfigurationProperties; -import org.springframework.context.annotation.Bean; -import org.springframework.context.annotation.Configuration; - -/** - * Created by wk1 on 2020/1/6. - */ -@Configuration -public class ZkConfig { - - @Autowired - private ZkProperties zkproperties; - Log logger = Log.get(); - - - - @Bean(name = "curatorConnect") - public CuratorFramework CuratorConnect() { - - CuratorFramework client = null; - - - RetryPolicy retryPolicy = new ExponentialBackoffRetry(zkproperties.getElapsedTimeMs(), zkproperties.getRetryCount()); - - // 实例化Curator客户端 - client = CuratorFrameworkFactory.builder() // 使用工厂类来建造客户端的实例对象 - .connectString(zkproperties.getConnectString()) // 放入zookeeper服务器ip - .sessionTimeoutMs(zkproperties.getSessionTimeoutMs()).connectionTimeoutMs(zkproperties.getConnectionTimeoutMs()).retryPolicy(retryPolicy) // 设定会话时间以及重连策略 - .namespace(zkproperties.getNameSpace()).build(); // 设置命名空间以及开始建立连接 - if (zkproperties.getOpen() == 0) { - client.start(); - } else { - logger.info("repoertservice start local"); - } - - return client; - } - - -} diff --git a/src/main/java/com/mesa/reportservice/configuration/ZkProperties.java b/src/main/java/com/mesa/reportservice/configuration/ZkProperties.java deleted file mode 100644 index f8ed4e0..0000000 --- a/src/main/java/com/mesa/reportservice/configuration/ZkProperties.java +++ /dev/null @@ -1,86 +0,0 @@ -package com.mesa.reportservice.configuration; - -import com.alibaba.nacos.api.config.ConfigType; -import com.alibaba.nacos.api.config.annotation.NacosConfigurationProperties; -import org.springframework.boot.context.properties.ConfigurationProperties; -import org.springframework.stereotype.Component; - -/** - * Created by wk1 on 2020/1/6. - */ -//@ConfigurationProperties(prefix = "zookeeper") -@Component -@NacosConfigurationProperties(prefix = "zookeeper", dataId = "${nacos.config.data-id}", groupId = "${nacos.config.group}", type = ConfigType.YAML, autoRefreshed = true) -public class ZkProperties { - - private int open; - - private int retryCount; - - private int elapsedTimeMs; - - private String connectString; - - private int sessionTimeoutMs; - - private int connectionTimeoutMs; - - private String nameSpace; - - - public int getOpen() { - return open; - } - - public void setOpen(int open) { - this.open = open; - } - - public int getRetryCount() { - return retryCount; - } - - public void setRetryCount(int retryCount) { - this.retryCount = retryCount; - } - - public int getElapsedTimeMs() { - return elapsedTimeMs; - } - - public void setElapsedTimeMs(int elapsedTimeMs) { - this.elapsedTimeMs = elapsedTimeMs; - } - - public String getConnectString() { - return connectString; - } - - public void setConnectString(String connectString) { - this.connectString = connectString; - } - - public int getSessionTimeoutMs() { - return sessionTimeoutMs; - } - - public void setSessionTimeoutMs(int sessionTimeoutMs) { - this.sessionTimeoutMs = sessionTimeoutMs; - } - - public int getConnectionTimeoutMs() { - return connectionTimeoutMs; - } - - public void setConnectionTimeoutMs(int connectionTimeoutMs) { - this.connectionTimeoutMs = connectionTimeoutMs; - } - - public String getNameSpace() { - return nameSpace; - } - - public void setNameSpace(String nameSpace) { - this.nameSpace = nameSpace; - } -} diff --git a/src/main/java/com/mesa/reportservice/configuration/ZookeeperConfig.java b/src/main/java/com/mesa/reportservice/configuration/ZookeeperConfig.java new file mode 100644 index 0000000..10b1ac3 --- /dev/null +++ b/src/main/java/com/mesa/reportservice/configuration/ZookeeperConfig.java @@ -0,0 +1,46 @@ +package com.mesa.reportservice.configuration; + +import cn.hutool.log.Log; +import cn.hutool.log.LogFactory; +import org.apache.curator.RetryPolicy; +import org.apache.curator.framework.CuratorFramework; +import org.apache.curator.framework.CuratorFrameworkFactory; +import org.apache.curator.retry.ExponentialBackoffRetry; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; + +/** + * + * @author lijinyang + * @date 2024/1/24 + */ +@Configuration +public class ZookeeperConfig { + + @Autowired + private ZookeeperProperties zookeeperProperties; + + private static final Log logger = LogFactory.get(); + + @Bean(name = "curatorConnect") + public CuratorFramework curatorConnect() { + CuratorFramework client = null; + RetryPolicy retryPolicy = new ExponentialBackoffRetry(zookeeperProperties.getElapsedTimeMs(), zookeeperProperties.getRetryCount()); + // 实例化Curator客户端 + // 使用工厂类来建造客户端的实例对象 + client = CuratorFrameworkFactory.builder() + // 放入zookeeper服务器ip + .connectString(zookeeperProperties.getConnectString()) + // 设定会话时间以及重连策略 + .sessionTimeoutMs(zookeeperProperties.getSessionTimeoutMs()).connectionTimeoutMs(zookeeperProperties.getConnectionTimeoutMs()).retryPolicy(retryPolicy) + // 设置命名空间以及开始建立连接 + .namespace(zookeeperProperties.getNameSpace()).build(); + if (zookeeperProperties.getOpen() == 0) { + client.start(); + } else { + logger.info("reportService start local"); + } + return client; + } +} diff --git a/src/main/java/com/mesa/reportservice/configuration/ZookeeperProperties.java b/src/main/java/com/mesa/reportservice/configuration/ZookeeperProperties.java new file mode 100644 index 0000000..d0a9c80 --- /dev/null +++ b/src/main/java/com/mesa/reportservice/configuration/ZookeeperProperties.java @@ -0,0 +1,32 @@ +package com.mesa.reportservice.configuration; + +import com.alibaba.nacos.api.config.ConfigType; +import com.alibaba.nacos.api.config.annotation.NacosConfigurationProperties; +import lombok.Data; +import org.springframework.boot.context.properties.ConfigurationProperties; +import org.springframework.stereotype.Component; + +/** + * + * @author lijinyang + * @date 2024/1/24 + */ +@Data +@Component +@NacosConfigurationProperties(prefix = "zookeeper", dataId = "${nacos.config.data-id}", groupId = "${nacos.config.group}", type = ConfigType.YAML, autoRefreshed = true) +public class ZookeeperProperties { + + private int open; + + private int retryCount; + + private int elapsedTimeMs; + + private String connectString; + + private int sessionTimeoutMs; + + private int connectionTimeoutMs; + + private String nameSpace; +} diff --git a/src/main/java/com/mesa/reportservice/controller/MonitorController.java b/src/main/java/com/mesa/reportservice/controller/MonitorController.java index 0f8e61d..8d5f8c7 100644 --- a/src/main/java/com/mesa/reportservice/controller/MonitorController.java +++ b/src/main/java/com/mesa/reportservice/controller/MonitorController.java @@ -1,67 +1,26 @@ package com.mesa.reportservice.controller; -import cn.hutool.log.Log; -import com.alibaba.fastjson.JSONArray; -import com.mesa.reportservice.bean.MonitorEntity; -import com.mesa.reportservice.configuration.GlobelConfig; -import com.mesa.reportservice.service.MysqlService; -import com.mesa.reportservice.service.ZkService; +import com.mesa.reportservice.service.MonitorService; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component; import org.springframework.web.bind.annotation.GetMapping; import org.springframework.web.bind.annotation.RestController; -import com.mesa.reportservice.configuration.ZkProperties; - -import java.util.Map; /** - * Created by wk on 2020/4/16. + * + * @author lijinyang + * @date 2024/1/24 */ @RestController @Component public class MonitorController { @Autowired - private MysqlService mysqlService; - @Autowired - private ZkService zs; - @Autowired - private GlobelConfig gc; - @Autowired - private ZkProperties zkProperties; - - Log logger = Log.get(); - + private MonitorService monitorService; @GetMapping(value = "/monitor") - public String getReportStatus() { - - - String json=""; - try { - MonitorEntity me = new MonitorEntity(); - Map<String,Long> numMap = mysqlService.getJobCount(); - me.setQueueJobNum(numMap.get("queueNum")); - me.setExcuteingJobNum(numMap.get("excuteingNum")); - me.setTodaySuccessJobNum(numMap.get("todaySuccessNum")); - me.setTodayErrorJobNum(numMap.get("todayErrorNum")); - me.setJoblist(gc.getMapresult()); - if(zs.isMaster()){ - me.setStatus("active"); - } - else{ - me.setStatus("standby"); - } - gc.getMapresult().size(); - Object obj = JSONArray.toJSON(me); - json = obj.toString(); - } catch (Exception e) { - logger.error(e.toString()); - json=e.toString(); - } - - return json; + public String getJobCount() { + return monitorService.getJobCount(); } - } diff --git a/src/main/java/com/mesa/reportservice/controller/ScheduledResultController.java b/src/main/java/com/mesa/reportservice/controller/ScheduledResultController.java index ea8345f..f927f4d 100644 --- a/src/main/java/com/mesa/reportservice/controller/ScheduledResultController.java +++ b/src/main/java/com/mesa/reportservice/controller/ScheduledResultController.java @@ -1,166 +1,25 @@ package com.mesa.reportservice.controller; -import cn.hutool.log.Log; -import com.mesa.reportservice.bean.JobEntity; -import com.mesa.reportservice.configuration.GlobelConfig; -import com.mesa.reportservice.configuration.HttpClientPool; -import com.mesa.reportservice.service.*; +import com.mesa.reportservice.service.ScheduledResultService; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.scheduling.annotation.EnableScheduling; import org.springframework.scheduling.annotation.Scheduled; import org.springframework.stereotype.Component; -import java.util.List; -import java.util.Map; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; - /** - * @author wk1 - * @date 2020/1/8 + * + * @author lijinyang + * @date 2024/1/24 */ @Component @EnableScheduling public class ScheduledResultController { - Log logger = Log.get(); - - protected static ExecutorService pool = Executors.newFixedThreadPool(30); - @Autowired - private MysqlService ms; - @Autowired - private ExcuteService es; - @Autowired - private ExcuteProcessService eps; - @Autowired - private ZkService zs; - @Autowired - private ClickhouseService cs; - @Autowired - private HttpClientPool hc; @Autowired - private GlobelConfig gc; - + private ScheduledResultService scheduledResultService; @Scheduled(cron = "${scan.result.scheduled.plan}") - public void getExcuteResult() { - try { - if (zs.isMaster()) { - logger.info("start viewing results"); - //先查询数据库是否有异常状态任务,killquery - List<JobEntity> joblist = ms.getJobForExcute(); - for (JobEntity jobEntity : joblist) { - String sql = jobEntity.getQuerySql().trim(); - /* sql = sql.replace("$exe_time", "toDateTime('" + jobEntity.getIssuedTime().trim() + "')"); - sql = sql.replace("$start_time", "toDateTime('" + jobEntity.getStartTime().trim() + "')"); - sql = sql.replace("$end_time", "toDateTime('" + jobEntity.getEndTime().trim() + "')");*/ - - String queryid = cs.getQueryId(sql); - jobEntity.setQueryId(queryid); - - if (jobEntity.getIsValid() == 0) { - eps.killQuery(jobEntity); - gc.getMapresult().get(jobEntity.getQueryId()).setIsValid(0); - } else if (!gc.getMapresult().containsKey(jobEntity.getQueryId())) { - eps.reSet(jobEntity); - } - if (gc.getMapresult().containsKey(jobEntity.getQueryId())) { - if (jobEntity.getIsValid() == 0) { - eps.killQuery(jobEntity); - gc.getMapresult().get(jobEntity.getQueryId()).setIsValid(0); - } - } else { - eps.reSet(jobEntity); - } - - } - //遍历内存中的任务对状态RUNNING的更新进度,其他更新数据库的状态 - for (Map.Entry<String, JobEntity> entry : gc.getMapresult().entrySet()) { - logger.info("key = " + entry.getKey() + ", value = " + entry.getValue().getState()); - long currentTime = System.currentTimeMillis(); - long excutetime = currentTime - entry.getValue().getStartTime(); - logger.info("excute time=" + excutetime + "ttl_time=" + hc.getSocketTimeout()); - if (("RUNNING").equals(entry.getValue().getState()) && excutetime > hc.getSocketTimeout()) { - entry.getValue().setState("DONE"); - entry.getValue().setExcuteStatus(500001); - eps.killQuery(entry.getValue()); - eps.updateResultMessage(entry.getValue()); - } else { - if (("RUNNING").equals(entry.getValue().getState())) { - eps.updateProcessMessage(entry.getValue()); - } else { - eps.updateResultMessage(entry.getValue()); - } - } - } - int rows = gc.getJob_thread() - gc.getMapresult().size(); - if (rows > 0) { - List<JobEntity> jobs = ms.getJobTask(rows); - for (JobEntity job : jobs) { - logger.info("start executing task"); - long begintime = System.currentTimeMillis()/1000; - job.setStartTime(begintime); - String sql = job.getQuerySql().trim(); - /* sql = sql.replace("$exe_time", "toDateTime('" + job.getIssuedTime().trim() + "')"); - sql = sql.replace("$start_time", "toDateTime('" + job.getStartTime().trim() + "')"); - sql = sql.replace("$end_time", "toDateTime('" + job.getEndTime().trim() + "')");*/ - job.setQuerySql(sql); - job.setState("RUNNING"); - job.setExcuteStatus(1); - job.setResultMessage("EXECUTING"); - job.setRowsRead(0L); - job.setElapsed(0); - job.setDoneProgress(0); - job.setResultRows(0L); - String queryid = ""; - queryid = cs.getQueryId(sql); - job.setQueryId(queryid); - if (queryid.equals("") ) { - job.setExcuteStatus(0); - job.setState("DONE"); - job.setEndTime(System.currentTimeMillis()/1000); - //status = 7 - job.setDoneProgress(1.00f); - job.setIsFailed(1); - job.setResultMessage("Unknow Error"); - } - if (("RUNNING").equals(job.getState())) { - if (ms.updateProcesses(job) != 0) { - gc.getMapresult().put(queryid, job); - pool.execute(new Runnable() { - @Override - public void run() { - es.excuteCkTask(job); - } - }); - } else { - logger.error("failed to update database status"); - - } - } else { - if (ms.updateProcesses(job) != 0) { - logger.error("task cannot be executed"); - } else { - logger.error("failed to update database status"); - - } - } - } - } else { - logger.info("no pending tasks"); - } - } else { - if (gc.getMapresult().size() > 0) { - for (Map.Entry<String, JobEntity> entry : gc.getMapresult().entrySet()) { - logger.info("key = " + entry.getKey() + ", value = " + entry.getValue().getState()); - eps.killQuery(entry.getValue()); - } - gc.getMapresult().clear(); - } - } - } catch (Exception e) { - logger.error(e.toString()); - } + public void execute() { + scheduledResultService.execute(); } - } diff --git a/src/main/java/com/mesa/reportservice/enums/JobStates.java b/src/main/java/com/mesa/reportservice/enums/JobStates.java new file mode 100644 index 0000000..e723bbc --- /dev/null +++ b/src/main/java/com/mesa/reportservice/enums/JobStates.java @@ -0,0 +1,31 @@ +package com.mesa.reportservice.enums; + +/** + * + * @author lijinyang + * @date 2024/1/24 + */ +public enum JobStates { + /** + * PENDING + */ + PENDING("PENDING"), + /** + * RUNNING + */ + RUNNING("RUNNING"), + /** + * DONE + */ + DONE("DONE"); + + private final String value; + + public String getValue() { + return value; + } + + JobStates(String value) { + this.value = value; + } +} diff --git a/src/main/java/com/mesa/reportservice/exception/SQSCode.java b/src/main/java/com/mesa/reportservice/exception/SQSCode.java new file mode 100644 index 0000000..d20f28b --- /dev/null +++ b/src/main/java/com/mesa/reportservice/exception/SQSCode.java @@ -0,0 +1,47 @@ +package com.mesa.reportservice.exception; + + +import lombok.Getter; + +/** + * @author 86158 + */ +@Getter +public enum SQSCode { + /** + * 1-3位:异常类型(HTTP协议状态码) + * 4-6位:自然排序 + */ + + /** + * Message + */ + PARAM_SYNTAX_ERROR(400001,"Param Syntax Error ! ErrorCode = {0} queryId = {1} jobId = {2} executeSql = {3}"), + + TEMPLATE_SQL_EXECUTION_ERROR(500001,"SQL Execution Error ! ErrorCode = {0} queryId = {1} jobId = {2} executeSql = {3}"), + TEMPLATE_SAVE_HBASE_ERROR(500002,"save HBase error ! ErrorCode = {0} queryId = {1} jobId = {2} executeSql = {3}"), + TEMPLATE_SAVE_DATABASE_ERROR(500003,"save database error queryId = {0} jobId = {1} {2}"), + RESPONSE_DATA_ISNULL(500004,"response data is null"), + RESPONSE_BODY_ISNULL(500005,"response body is null"), + WRITE_HBASE_ERROR(500006,"write HBase error retry count"), + OUT_OF_MEMORY_ERROR(500007,"OutOfMemory Error"), + KILL_QUERY_ERROR(500008,"Kill Query Error"), + TEMPLATE_UNKNOWN_ERROR(500009,"Unknown Error ! ErrorCode = {0} queryId = {1} jobId = {2} executeSql = {3}"), + UPDATE_STATUS_ERROR(500010,"failed to update database status"), + TASK_CANNOT_EXECUTED(500010,"task cannot be executed"), + UNKNOWN_ERROR(500999,"Unknown Error"), + + //成功 + SUCCESS(200, "success"), + //失败 + ERROR(999, "error"); + + SQSCode(Integer code, String msg) { + this.code = code; + this.msg = msg; + } + + private Integer code; + + private String msg; +} diff --git a/src/main/java/com/mesa/reportservice/exception/SQSException.java b/src/main/java/com/mesa/reportservice/exception/SQSException.java new file mode 100644 index 0000000..d7db7f2 --- /dev/null +++ b/src/main/java/com/mesa/reportservice/exception/SQSException.java @@ -0,0 +1,69 @@ +/** + + * + + * + * + */ + +package com.mesa.reportservice.exception; + + +import lombok.Data; + +/** + * 自定义异常 + * + * @author lijinyang + */ +@Data +public class SQSException extends RuntimeException { + private static final long serialVersionUID = 1L; + + private String msg; + private int code = SQSCode.ERROR.getCode(); + + public SQSException(SQSCode sqsCode) { + this.code = sqsCode.getCode(); + this.msg = sqsCode.getMsg(); + } + + public SQSException(String msg) { + super(msg); + this.msg = msg; + } + + public SQSException(String msg, Throwable e) { + super(msg, e); + this.msg = msg; + } + + public SQSException(String msg, int code) { + super(msg); + this.msg = msg; + this.code = code; + } + + public SQSException(String msg, int code, Throwable e) { + super(msg, e); + this.msg = msg; + this.code = code; + } + + public String getMsg() { + return msg; + } + + public void setMsg(String msg) { + this.msg = msg; + } + + public int getCode() { + return code; + } + + public void setCode(int code) { + this.code = code; + } + +} diff --git a/src/main/java/com/mesa/reportservice/mapper/ReportResultMapper.java b/src/main/java/com/mesa/reportservice/mapper/JobMapper.java index d0b845c..4546b12 100644 --- a/src/main/java/com/mesa/reportservice/mapper/ReportResultMapper.java +++ b/src/main/java/com/mesa/reportservice/mapper/JobMapper.java @@ -1,16 +1,22 @@ package com.mesa.reportservice.mapper; +import com.baomidou.mybatisplus.core.mapper.BaseMapper; import com.mesa.reportservice.bean.JobEntity; import org.apache.ibatis.annotations.Mapper; import java.util.List; import java.util.Map; +/** + * + * @author lijinyang + * @date 2024/1/24 + */ @Mapper -public interface ReportResultMapper { +public interface JobMapper extends BaseMapper<JobEntity> { - List<JobEntity> getJobForExcute(); + List<JobEntity> getRunningJob(); List<JobEntity> getJobTask(Map<String, Object> map); diff --git a/src/main/java/com/mesa/reportservice/service/ClickhouseService.java b/src/main/java/com/mesa/reportservice/service/ClickhouseService.java deleted file mode 100644 index 9c9a978..0000000 --- a/src/main/java/com/mesa/reportservice/service/ClickhouseService.java +++ /dev/null @@ -1,25 +0,0 @@ -package com.mesa.reportservice.service; - -import com.mesa.reportservice.bean.HttpResult; - -import java.io.IOException; - -/** - * Created by wk1 on 2020/1/2. - */ -public interface ClickhouseService { - - String getQueryId(String query) ; - - - HttpResult queryForExcute(String query) throws IOException; - - - HttpResult queryForProcess(String queryId) throws IOException; - - - HttpResult queryForCancel(String queryId) throws IOException; - - - -} diff --git a/src/main/java/com/mesa/reportservice/service/ExcuteService.java b/src/main/java/com/mesa/reportservice/service/ExcuteService.java deleted file mode 100644 index 7b6ccae..0000000 --- a/src/main/java/com/mesa/reportservice/service/ExcuteService.java +++ /dev/null @@ -1,12 +0,0 @@ -package com.mesa.reportservice.service; - -import com.mesa.reportservice.bean.JobEntity; - -/** - * Created by wk1 on 2020/1/8. - */ -public interface ExcuteService { - - void excuteCkTask(JobEntity jl); - -} diff --git a/src/main/java/com/mesa/reportservice/service/ExcuteProcessService.java b/src/main/java/com/mesa/reportservice/service/ExecuteProcessService.java index 4660e9a..6355d35 100644 --- a/src/main/java/com/mesa/reportservice/service/ExcuteProcessService.java +++ b/src/main/java/com/mesa/reportservice/service/ExecuteProcessService.java @@ -5,13 +5,18 @@ import com.mesa.reportservice.bean.JobEntity; import java.io.IOException; /** - * Created by wk1 on 2020/1/8. + * + * @author lijinyang + * @date 2024/1/24 */ -public interface ExcuteProcessService { +public interface ExecuteProcessService { void updateResultMessage(JobEntity job); + void reSet(JobEntity jobEntity); - void updateProcessMessage(JobEntity job) throws IOException; - void killQuery(JobEntity jobEntity) throws IOException; + + void updateProcessMessage(JobEntity job); + + void killQuery(JobEntity jobEntity); } diff --git a/src/main/java/com/mesa/reportservice/service/ExecuteService.java b/src/main/java/com/mesa/reportservice/service/ExecuteService.java new file mode 100644 index 0000000..59a518b --- /dev/null +++ b/src/main/java/com/mesa/reportservice/service/ExecuteService.java @@ -0,0 +1,13 @@ +package com.mesa.reportservice.service; + +import com.mesa.reportservice.bean.JobEntity; + +/** + * + * @author lijinyang + * @date 2024/1/24 + */ +public interface ExecuteService { + + void queryQgw(JobEntity jobEntity); +} diff --git a/src/main/java/com/mesa/reportservice/service/HbaseService.java b/src/main/java/com/mesa/reportservice/service/HBaseService.java index 46db5ef..c9e5728 100644 --- a/src/main/java/com/mesa/reportservice/service/HbaseService.java +++ b/src/main/java/com/mesa/reportservice/service/HBaseService.java @@ -7,11 +7,11 @@ import org.apache.hadoop.yarn.webapp.hamlet.HamletSpec; import java.util.Map; /** - * Created by wk1 on 2020/1/2. + * + * @author lijinyang + * @date 2024/1/24 */ -public interface HbaseService { - - - Boolean put(JobEntity jobEntity) throws Exception; +public interface HBaseService { + Boolean put(JobEntity jobEntity); } diff --git a/src/main/java/com/mesa/reportservice/service/JobService.java b/src/main/java/com/mesa/reportservice/service/JobService.java new file mode 100644 index 0000000..706c75a --- /dev/null +++ b/src/main/java/com/mesa/reportservice/service/JobService.java @@ -0,0 +1,25 @@ +package com.mesa.reportservice.service; + + +import com.baomidou.mybatisplus.extension.service.IService; +import com.mesa.reportservice.bean.JobEntity; + +import java.util.List; +import java.util.Map; + + +/** + * + * @author lijinyang + * @date 2024/1/24 + */ +public interface JobService extends IService<JobEntity> { + + List<JobEntity> getRunningJob() ; + + List<JobEntity> getJobTask(int Rows) ; + + int updateProcesses(JobEntity job); + + Map<String,Long> getJobCount(); +} diff --git a/src/main/java/com/mesa/reportservice/service/MonitorService.java b/src/main/java/com/mesa/reportservice/service/MonitorService.java index 88dc242..5bccaea 100644 --- a/src/main/java/com/mesa/reportservice/service/MonitorService.java +++ b/src/main/java/com/mesa/reportservice/service/MonitorService.java @@ -1,14 +1,15 @@ package com.mesa.reportservice.service; - -import org.springframework.stereotype.Service; - /** - * Created by wk1 on 2020/1/6. + * + * @author lijinyang + * @date 2024/1/24 */ public interface MonitorService { - void addSuccess(); + void addFail(); + + String getJobCount(); }
\ No newline at end of file diff --git a/src/main/java/com/mesa/reportservice/service/MysqlService.java b/src/main/java/com/mesa/reportservice/service/MysqlService.java deleted file mode 100644 index 036bad9..0000000 --- a/src/main/java/com/mesa/reportservice/service/MysqlService.java +++ /dev/null @@ -1,24 +0,0 @@ -package com.mesa.reportservice.service; - - -import com.mesa.reportservice.bean.JobEntity; - -import java.util.List; -import java.util.Map; - - -public interface MysqlService { - - - List<JobEntity> getJobForExcute() ; - - - List<JobEntity> getJobTask(int Rows) ; - - - int updateProcesses(JobEntity job); - - - Map<String,Long> getJobCount() throws Exception; - - } diff --git a/src/main/java/com/mesa/reportservice/service/QueryGatewayService.java b/src/main/java/com/mesa/reportservice/service/QueryGatewayService.java new file mode 100644 index 0000000..7f7be78 --- /dev/null +++ b/src/main/java/com/mesa/reportservice/service/QueryGatewayService.java @@ -0,0 +1,21 @@ +package com.mesa.reportservice.service; + +import com.mesa.reportservice.bean.HttpResult; + +import java.io.IOException; + +/** + * + * @author lijinyang + * @date 2024/1/24 + */ +public interface QueryGatewayService { + + String getQueryId(String query) ; + + HttpResult queryJob(String query) throws IOException; + + HttpResult queryJobProcess(String queryId); + + HttpResult deleteJob(String queryId); +} diff --git a/src/main/java/com/mesa/reportservice/service/ScheduledResultService.java b/src/main/java/com/mesa/reportservice/service/ScheduledResultService.java new file mode 100644 index 0000000..476579a --- /dev/null +++ b/src/main/java/com/mesa/reportservice/service/ScheduledResultService.java @@ -0,0 +1,11 @@ +package com.mesa.reportservice.service; + +/** + * + * @author lijinyang + * @date 2024/1/24 + */ +public interface ScheduledResultService { + + void execute(); +} diff --git a/src/main/java/com/mesa/reportservice/service/ZkService.java b/src/main/java/com/mesa/reportservice/service/ZookeeperService.java index d42f636..69f77d2 100644 --- a/src/main/java/com/mesa/reportservice/service/ZkService.java +++ b/src/main/java/com/mesa/reportservice/service/ZookeeperService.java @@ -6,11 +6,12 @@ import org.springframework.stereotype.Service; import java.net.InetAddress; /** - * Created by wk1 on 2020/1/6. + * + * @author lijinyang + * @date 2024/1/24 */ @Service -public interface ZkService { +public interface ZookeeperService { - - boolean isMaster() throws Exception; + boolean isMaster(); }
\ No newline at end of file diff --git a/src/main/java/com/mesa/reportservice/service/impl/ExcuteProcessServiceImpl.java b/src/main/java/com/mesa/reportservice/service/impl/ExcuteProcessServiceImpl.java deleted file mode 100644 index 2df61f9..0000000 --- a/src/main/java/com/mesa/reportservice/service/impl/ExcuteProcessServiceImpl.java +++ /dev/null @@ -1,182 +0,0 @@ -package com.mesa.reportservice.service.impl; - -import cn.hutool.log.Log; -import com.alibaba.fastjson.JSON; -import com.mesa.reportservice.bean.HttpResult; -import com.mesa.reportservice.bean.JobEntity; -import com.mesa.reportservice.configuration.GlobelConfig; -import com.mesa.reportservice.service.*; -import org.springframework.beans.factory.annotation.Autowired; -import org.springframework.stereotype.Service; - -import java.io.IOException; -import java.util.List; -import java.util.Map; - -/** - * Created by wk1 on 2020/1/8. - */ -@Service - -public class ExcuteProcessServiceImpl implements ExcuteProcessService { - Log logger = Log.get(); - - - @Autowired - private ClickhouseService cs; - @Autowired - private MysqlService ms; - @Autowired - private HbaseService hs; - @Autowired - private MonitorService mons; - @Autowired - private GlobelConfig gc; - @Override - public void updateResultMessage(JobEntity je) { - je.setState("DONE"); - je.setEndTime(System.currentTimeMillis()/1000); - je.setDoneProgress(1.00f); - je.setIsFailed(1); - try { - if (je.getIsValid() == 0) { - //status = 9 - je.setResultMessage("CANCEL"); - } else { - if (je.getExcuteStatus() >= 20000000 && je.getExcuteStatus() < 30000000) { - - Boolean isok = saveToHbase(je); - if (isok) { - //status = 2 - je.setResultMessage("OK"); - je.setIsFailed(0); - logger.info("success save to hbase query_id=" + je.getQueryId() + " job_id =" + je.getJobId() + " excute_sql=" + je.getQuerySql()); - } else { - //status = 5 - je.setResultMessage("Write Data Error"); - mons.addFail(); - logger.error("save hbase error "+je.getExcuteStatus()+" query_id=" + je.getQueryId() + " job_id =" + je.getJobId() + " excute_sql=" + je.getQuerySql()); - } - } else if (je.getExcuteStatus() >= 40000000 && je.getExcuteStatus() < 50000000) { - //status = 3 - je.setResultMessage("Param Syntax Error"); - logger.error("Param Syntax Error "+je.getExcuteStatus()+" query_id=" + je.getQueryId() + " job_id =" + je.getJobId() + " excute_sql=" + je.getQuerySql()); - - } else if (je.getExcuteStatus() == 50001300) { - //status = 4 - je.setResultMessage("SQL Execution Error"); - logger.error("SQL Execution Error ! ErrorCode "+je.getExcuteStatus()+" query_id=" + je.getQueryId() + " job_id =" + je.getJobId() + " excute_sql=" + je.getQuerySql()); - - } else { - //status = 7 - je.setResultMessage("Unknow Error"); - logger.error("Unknow Error ! ErrorCode "+je.getExcuteStatus()+" query_id=" + je.getQueryId() + " job_id =" + je.getJobId() + " excute_sql=" + je.getQuerySql()); - } - - } - - int number = 0; - int z = 3; - do { - number = ms.updateProcesses(je); - z--; - } - while (number != 1 && z >= 0); - } catch (Exception e) { - //status = 10 - je.setResultMessage("Database Error"); - ms.updateProcesses(je); - logger.error("save database error job_id =" + je.getJobId()+" query_id=" + je.getQueryId() + e.toString()); - - } finally { - saveToMonitor(je); - gc.getMapresult().remove(je.getQueryId()); - } - - - } - - @Override - public void reSet(JobEntity jobEntity) { - //status = 0 - jobEntity.setState("PENDING"); - jobEntity.setResultMessage("Re Execution"); - ms.updateProcesses(jobEntity); - - } - - @Override - public void killQuery(JobEntity jobEntity) throws IOException { - - cs.queryForCancel(jobEntity.getQueryId()); - - } - - @Override - public void updateProcessMessage(JobEntity job) throws IOException { - - HttpResult hr = cs.queryForProcess(job.getQueryId()); - if (hr!=null) { - String rs = hr.getBody().trim(); - Map data = JSON.parseObject(rs); - - if (!rs.isEmpty() && !rs.equals("")) { - List listdata = (List) data.get("data"); - if (null != listdata && listdata.size() > 0) { - Map map = JSON.parseObject(JSON.toJSONString(listdata.get(0))); - long read_rows = Long.parseLong(map.get("rows_read").toString()); - float elapsed = Float.parseFloat(map.get("elapsed").toString()); - double persent = Double.parseDouble(map.get("percent").toString()); - int process = (int) (persent * 100); - job.setElapsed((int) elapsed); - job.setRowsRead(read_rows); - if(job.getDoneProgress()<process){ - job.setDoneProgress(process); - } - if (job.getRowsRead() != 0 || job.getElapsed() != 0) { - ms.updateProcesses(job); - } - } else { - logger.info("responsedata is null"); - } - } else { - logger.error("responsebody is null"); - } - } - else{ - logger.error("responsebody is null"); - } - - } - /** - * 结果存入hbase - */ - public Boolean saveToHbase(JobEntity entity) { - int k = 3; - Boolean isok = false; - do { - try { - k--; - isok = hs.put(entity); - break; - } catch (Exception e) { - logger.error("write HBase error retry count" + (3 - k) + e.toString()); - k--; - } - } - while (k >= 0); - return isok; - } - - /** - * promethus记录结果 - */ - public void saveToMonitor(JobEntity entity) { - if (("DONE").equals(entity.getState()) && entity.getIsFailed() == 0 && entity.getIsValid() == 1) { - - mons.addSuccess(); - } else { - mons.addFail(); - } - } -} diff --git a/src/main/java/com/mesa/reportservice/service/impl/ExcuteserviceImpl.java b/src/main/java/com/mesa/reportservice/service/impl/ExcuteserviceImpl.java deleted file mode 100644 index 8619b87..0000000 --- a/src/main/java/com/mesa/reportservice/service/impl/ExcuteserviceImpl.java +++ /dev/null @@ -1,114 +0,0 @@ -package com.mesa.reportservice.service.impl; - -import cn.hutool.log.Log; -import com.alibaba.fastjson.JSON; -import com.mesa.reportservice.bean.HttpResult; -import com.mesa.reportservice.bean.JobEntity; -import com.mesa.reportservice.configuration.GlobelConfig; -import com.mesa.reportservice.service.ClickhouseService; -import com.mesa.reportservice.service.ExcuteService; -import com.mesa.reportservice.service.MysqlService; -import io.netty.channel.ConnectTimeoutException; -import org.springframework.beans.factory.annotation.Autowired; -import org.springframework.stereotype.Service; - -import java.net.SocketTimeoutException; -import java.util.Map; - - -/** - * Created by wk1 on 2020/1/8. - */ -@Service -public class ExcuteserviceImpl implements ExcuteService { - Log logger = Log.get(); - - - @Autowired - private ClickhouseService cs; - @Autowired - private MysqlService ms; - @Autowired - private GlobelConfig gc; - - @Override - public void excuteCkTask(JobEntity job) { - - - logger.info("execute queryid=" + job.getQueryId() + " sql=" + job.getQuerySql() + "mapresult size=" + gc.getMapresult().size()); - HttpResult hr = new HttpResult(); - int k = 3; - do { - try { - hr = cs.queryForExcute(job.getQuerySql()); - if (hr != null) { - Map mapresult = JSON.parseObject(hr.getBody()); - int query_status = Integer.parseInt(mapresult.get("status").toString()); - logger.info("httpcode=" + hr.getCode() +" status="+query_status); - if (hr.getCode() == 200 && query_status == 200) { - - k = 0; - mapresult = JSON.parseObject(hr.getBody()); - - Map rows = (Map) mapresult.get("statistics"); - job.setResultRows(Long.parseLong(rows.get("result_rows").toString())); - job.setRowsRead(Long.parseLong(rows.get("rows_read").toString())); - job.setBytesRead(Long.parseLong(rows.get("bytes_read").toString())); - job.setResultBytes(Long.parseLong(rows.get("result_bytes").toString())); - job.setResult(hr.getBody()); - job.setElapsed((int) Float.parseFloat(rows.get("elapsed").toString())); - job.setExcuteStatus(Integer.parseInt(mapresult.get("code").toString())); - logger.info("success resultid = " + job.getJobId() + " queryid=" + job.getQueryId() + " sql=" + job.getQuerySql()); - - } else { - k = 0; - job.setExcuteStatus(Integer.parseInt(mapresult.get("code").toString())); - logger.error("excute sql Error "); - } - } else { - throw new SocketTimeoutException(); - } - } catch (SocketTimeoutException e) { - k--; - job.setResultMessage(e.toString()); - if (k == 0) { - job.setExcuteStatus(500001); - job.setResultMessage("SQL Execution Error excute query time out"); - logger.info("timeout resultid = " + job.getJobId() + " queryid=" + job.getQueryId() + " sql=" + job.getQuerySql()); - } else { - logger.info("Socket warn " + e.toString() + "retry time " + (3 - k)); - } - - - } catch (ConnectTimeoutException e) { - - job.setExcuteStatus(555999); - job.setResultMessage(e.toString()); - logger.error("Unknow Error" + e.toString()); - k = 0; - - } catch (OutOfMemoryError e) { - - job.setExcuteStatus(555999); - job.setResultMessage("result too large"); - logger.error("outofmemery Error" + e.toString()); - k = 0; - } catch (Exception e) { - job.setExcuteStatus(555999); - job.setResultMessage(e.toString()); - logger.error("Unknow Error" + e.toString()); - k = 0; - } - try { - cs.queryForCancel(job.getQueryId()); - } catch (Exception e) { - logger.error("Kill Query Error" + e.toString()); - } - - } - while (k > 0); - job.setState("DONE"); - job.setEndTime(System.currentTimeMillis()/1000); - } - -} diff --git a/src/main/java/com/mesa/reportservice/service/impl/ExecuteProcessServiceImpl.java b/src/main/java/com/mesa/reportservice/service/impl/ExecuteProcessServiceImpl.java new file mode 100644 index 0000000..4e514d1 --- /dev/null +++ b/src/main/java/com/mesa/reportservice/service/impl/ExecuteProcessServiceImpl.java @@ -0,0 +1,176 @@ +package com.mesa.reportservice.service.impl; + +import cn.hutool.log.Log; +import cn.hutool.log.LogFactory; +import com.alibaba.fastjson.JSON; +import com.mesa.reportservice.bean.HttpResult; +import com.mesa.reportservice.bean.JobEntity; +import com.mesa.reportservice.configuration.SchedulerProperties; +import com.mesa.reportservice.enums.JobStates; +import com.mesa.reportservice.exception.SQSCode; +import com.mesa.reportservice.exception.SQSException; +import com.mesa.reportservice.service.*; +import com.mesa.reportservice.util.Constant; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.stereotype.Service; + +import java.text.MessageFormat; +import java.util.List; +import java.util.Map; + +/** + * + * @author lijinyang + * @date 2024/1/24 + */ +@Service +public class ExecuteProcessServiceImpl implements ExecuteProcessService { + + private static final Log logger = LogFactory.get(); + + @Autowired + private QueryGatewayService queryGatewayService; + + @Autowired + private JobService jobService; + + @Autowired + private HBaseService hBaseService; + + @Autowired + private MonitorService monitorService; + + @Autowired + private SchedulerProperties schedulerProperties; + + @Override + public void updateResultMessage(JobEntity jobEntity) { + String msg = ""; + jobEntity.setState(JobStates.DONE.getValue()); + jobEntity.setEndTime(System.currentTimeMillis()/1000); + jobEntity.setDoneProgress(1.00f); + jobEntity.setIsFailed(1); + try { + if (jobEntity.getIsValid() == 0) { + jobEntity.setResultMessage(Constant.CANCEL); + } else { + if (jobEntity.getExecuteStatus() == Constant.JOB_SUCCESS) { + Boolean isOk = saveToHBase(jobEntity); + if (isOk) { + jobEntity.setResultMessage(Constant.OK); + jobEntity.setIsFailed(0); + logger.info(MessageFormat.format(Constant.LOG_SUCCESS_SAVE_TO_HBASE,jobEntity.getQueryId(),jobEntity.getJobId(),jobEntity.getQuerySql())); + } else { + jobEntity.setResultMessage(Constant.WRITE_DATA_ERROR); + monitorService.addFail(); + msg = MessageFormat.format(SQSCode.TEMPLATE_SAVE_HBASE_ERROR.getMsg(),jobEntity.getExecuteStatus(),jobEntity.getQueryId(),jobEntity.getJobId(),jobEntity.getQuerySql()); + logger.error(msg); + } + } else if (jobEntity.getExecuteStatus() >= Constant.JOB_BAD_REQUEST_THRESHOLD && jobEntity.getExecuteStatus() < Constant.JOB_INTERNAL_SERVER_ERROR_THRESHOLD) { + jobEntity.setResultMessage(Constant.PARAM_SYNTAX_ERROR); + msg = MessageFormat.format(SQSCode.PARAM_SYNTAX_ERROR.getMsg(),jobEntity.getExecuteStatus(),jobEntity.getQueryId(),jobEntity.getJobId(),jobEntity.getQuerySql()); + logger.error(msg); + } else if (jobEntity.getExecuteStatus() == Constant.JOB_SQL_EXECUTION_ERROR_CODE) { + jobEntity.setResultMessage(Constant.SQL_EXECUTION_ERROR); + msg = MessageFormat.format(SQSCode.TEMPLATE_SQL_EXECUTION_ERROR.getMsg(),jobEntity.getExecuteStatus(),jobEntity.getQueryId(),jobEntity.getJobId(),jobEntity.getQuerySql()); + logger.error(msg); + } else { + jobEntity.setResultMessage(Constant.UNKNOWN_ERROR); + msg = MessageFormat.format(SQSCode.UNKNOWN_ERROR.getMsg(),jobEntity.getQueryId(),jobEntity.getJobId(),jobEntity.getQuerySql()); + logger.error(msg); + } + } + int number = 0; + int z = 3; + do { + number = jobService.updateProcesses(jobEntity); + z--; + } + while (number != 1 && z >= 0); + } catch (Exception e) { + jobEntity.setResultMessage(Constant.DATABASE_ERROR); + jobService.updateProcesses(jobEntity); + msg = MessageFormat.format(SQSCode.TEMPLATE_SAVE_DATABASE_ERROR.getMsg(),jobEntity.getQueryId(),jobEntity.getJobId(),e.toString()); + logger.error(msg); + } finally { + saveToMonitor(jobEntity); + schedulerProperties.getMapResult().remove(jobEntity.getQueryId()); + } + } + + @Override + public void reSet(JobEntity jobEntity){ + jobEntity.setState(JobStates.PENDING.getValue()); + jobEntity.setResultMessage(Constant.RE_EXECUTION); + jobService.updateProcesses(jobEntity); + } + + @Override + public void killQuery(JobEntity jobEntity){ + queryGatewayService.deleteJob(jobEntity.getQueryId()); + } + + @Override + public void updateProcessMessage(JobEntity jobEntity){ + HttpResult httpResult = queryGatewayService.queryJobProcess(jobEntity.getQueryId()); + if (httpResult!=null) { + String result = httpResult.getBody().trim(); + Map data = JSON.parseObject(result); + if (!result.isEmpty() && !result.equals("")) { + List listData = (List) data.get("data"); + if (null != listData && listData.size() > 0) { + Map map = JSON.parseObject(JSON.toJSONString(listData.get(0))); + long readRows = Long.parseLong(map.get("rows_read").toString()); + float elapsed = Float.parseFloat(map.get("elapsed").toString()); + double percent = Double.parseDouble(map.get("percent").toString()); + int process = (int) (percent * 100); + jobEntity.setElapsed((int) elapsed); + jobEntity.setRowsRead(readRows); + if(jobEntity.getDoneProgress()<process){ + jobEntity.setDoneProgress(process); + } + if (jobEntity.getRowsRead() != 0 || jobEntity.getElapsed() != 0) { + jobService.updateProcesses(jobEntity); + } + } else { + logger.error(SQSCode.RESPONSE_DATA_ISNULL.getMsg()); + throw new SQSException(SQSCode.RESPONSE_DATA_ISNULL); + } + } else { + logger.error(SQSCode.RESPONSE_BODY_ISNULL.getMsg()); + throw new SQSException(SQSCode.RESPONSE_BODY_ISNULL); + } + } else { + logger.error(SQSCode.RESPONSE_BODY_ISNULL.getMsg()); + throw new SQSException(SQSCode.RESPONSE_BODY_ISNULL); + } + } + /** + * 结果存入hbase + */ + public Boolean saveToHBase(JobEntity jobEntity) { + int k = 3; + Boolean isok = false; + do { + try { + k--; + isok = hBaseService.put(jobEntity); + break; + } catch (Exception e) { + logger.error(MessageFormat.format(SQSCode.WRITE_HBASE_ERROR.getMsg(),(3 - k),e.toString())); + k--; + } + } + while (k >= 0); + return isok; + } + + + public void saveToMonitor(JobEntity jobEntity) { + if ((JobStates.DONE.getValue()).equals(jobEntity.getState()) && jobEntity.getIsFailed() == 0 && jobEntity.getIsValid() == 1) { + monitorService.addSuccess(); + } else { + monitorService.addFail(); + } + } +} diff --git a/src/main/java/com/mesa/reportservice/service/impl/ExecuteServiceImpl.java b/src/main/java/com/mesa/reportservice/service/impl/ExecuteServiceImpl.java new file mode 100644 index 0000000..594d42b --- /dev/null +++ b/src/main/java/com/mesa/reportservice/service/impl/ExecuteServiceImpl.java @@ -0,0 +1,109 @@ +package com.mesa.reportservice.service.impl; + +import cn.hutool.log.Log; +import cn.hutool.log.LogFactory; +import com.alibaba.fastjson.JSON; +import com.mesa.reportservice.bean.HttpResult; +import com.mesa.reportservice.bean.JobEntity; +import com.mesa.reportservice.configuration.SchedulerProperties; +import com.mesa.reportservice.enums.JobStates; +import com.mesa.reportservice.exception.SQSCode; +import com.mesa.reportservice.exception.SQSException; +import com.mesa.reportservice.service.QueryGatewayService; +import com.mesa.reportservice.service.ExecuteService; +import com.mesa.reportservice.util.Constant; +import io.netty.channel.ConnectTimeoutException; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.http.HttpStatus; +import org.springframework.stereotype.Service; + +import java.net.SocketTimeoutException; +import java.util.Map; + + +/** + * + * @author lijinyang + * @date 2024/1/24 + */ +@Service +public class ExecuteServiceImpl implements ExecuteService { + + private static final Log logger = LogFactory.get(); + + @Autowired + private QueryGatewayService queryGatewayService; + + @Autowired + private SchedulerProperties schedulerProperties; + + @Override + public void queryQgw(JobEntity job) { + logger.info("execute queryId=" + job.getQueryId() + " sql=" + job.getQuerySql() + "mapResult size=" + schedulerProperties.getMapResult().size()); + HttpResult httpResult = null; + int k = 3; + do { + try { + httpResult = queryGatewayService.queryJob(job.getQuerySql()); + if (httpResult != null) { + Map mapresult = JSON.parseObject(httpResult.getBody()); + int queryStatus = Integer.parseInt(mapresult.get("status").toString()); + logger.info("httpCode=" + httpResult.getCode() +" status="+queryStatus); + if (httpResult.getCode() == HttpStatus.OK.value() && queryStatus == HttpStatus.OK.value()) { + k = 0; + mapresult = JSON.parseObject(httpResult.getBody()); + Map rows = (Map) mapresult.get("statistics"); + job.setResultRows(Long.parseLong(rows.get("result_rows").toString())); + job.setRowsRead(Long.parseLong(rows.get("rows_read").toString())); + job.setBytesRead(Long.parseLong(rows.get("bytes_read").toString())); + job.setResultBytes(Long.parseLong(rows.get("result_bytes").toString())); + job.setResult(httpResult.getBody()); + job.setElapsed((int) Float.parseFloat(rows.get("elapsed").toString())); + job.setExecuteStatus(Integer.parseInt(mapresult.get("code").toString())); + logger.info("success resultId = " + job.getJobId() + " queryId=" + job.getQueryId() + " sql=" + job.getQuerySql()); + } else { + k = 0; + job.setExecuteStatus(Integer.parseInt(mapresult.get("code").toString())); + logger.error("SQL Execution Error "); + } + } else { + throw new SocketTimeoutException(); + } + } catch (SocketTimeoutException e) { + k--; + job.setResultMessage(e.toString()); + if (k == 0) { + job.setExecuteStatus(Constant.JOB_EXECUTION_TIMEOUT); + job.setResultMessage(Constant.QUERY_TIME_OUT); + logger.info("timeout resultId = " + job.getJobId() + " queryId=" + job.getQueryId() + " sql=" + job.getQuerySql()); + } else { + logger.info("Socket warn " + e.toString() + "retry time " + (3 - k)); + } + } catch (ConnectTimeoutException e) { + job.setExecuteStatus(Constant.JOB_ERROR); + job.setResultMessage(e.toString()); + logger.error(SQSCode.UNKNOWN_ERROR.getMsg() + e.toString()); + k = 0; + } catch (OutOfMemoryError e) { + job.setExecuteStatus(Constant.JOB_ERROR); + job.setResultMessage(Constant.RESULT_TOO_LARGE); + logger.error(SQSCode.OUT_OF_MEMORY_ERROR.getMsg() + e.toString()); + k = 0; + } catch (Exception e) { + job.setExecuteStatus(Constant.JOB_ERROR); + job.setResultMessage(e.toString()); + logger.error(SQSCode.UNKNOWN_ERROR.getMsg() + e.toString()); + k = 0; + } + try { + queryGatewayService.deleteJob(job.getQueryId()); + } catch (Exception e) { + logger.error(SQSCode.KILL_QUERY_ERROR.getMsg() + e.toString()); + throw new SQSException(SQSCode.KILL_QUERY_ERROR); + } + } + while (k > 0); + job.setState(JobStates.DONE.getValue()); + job.setEndTime(System.currentTimeMillis()/1000); + } +} diff --git a/src/main/java/com/mesa/reportservice/service/impl/HbaseServiceImpl.java b/src/main/java/com/mesa/reportservice/service/impl/HBaseServiceImpl.java index 90cafb3..2ec411d 100644 --- a/src/main/java/com/mesa/reportservice/service/impl/HbaseServiceImpl.java +++ b/src/main/java/com/mesa/reportservice/service/impl/HBaseServiceImpl.java @@ -2,9 +2,11 @@ package com.mesa.reportservice.service.impl; import cn.hutool.core.io.IoUtil; import cn.hutool.log.Log; +import cn.hutool.log.LogFactory; import com.mesa.reportservice.bean.JobEntity; -import com.mesa.reportservice.configuration.HbaseProperties; -import com.mesa.reportservice.service.HbaseService; +import com.mesa.reportservice.configuration.HBaseProperties; +import com.mesa.reportservice.exception.SQSException; +import com.mesa.reportservice.service.HBaseService; import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.client.Connection; import org.apache.hadoop.hbase.client.Put; @@ -17,33 +19,30 @@ import java.io.IOException; /** - * Created by wk1 on 2019/5/15. + * + * @author lijinyang + * @date 2024/1/24 */ - @Service -public class HbaseServiceImpl implements HbaseService { +public class HBaseServiceImpl implements HBaseService { @Autowired - private Connection hbaseConnection; + private Connection hBaseConnection; @Autowired - private HbaseProperties hbproperties; - Log logger = Log.get(); + private HBaseProperties hBaseProperties; + + private static final Log logger = LogFactory.get(); @Override public Boolean put(JobEntity jobEntity) { - - Boolean status = false; Table table = null; try { - table = hbaseConnection.getTable(TableName.valueOf(hbproperties.getTable())); + table = hBaseConnection.getTable(TableName.valueOf(hBaseProperties.getTable())); Put put = new Put(Bytes.toBytes(jobEntity.getJobId())); put.addColumn(Bytes.toBytes("response"), Bytes.toBytes("result"), Bytes.toBytes(jobEntity.getResult())); - - put.addColumn(Bytes.toBytes("detail"), Bytes.toBytes("excute_sql"), Bytes.toBytes(jobEntity.getQuerySql())); - //put.addColumn(Bytes.toBytes("detail"), Bytes.toBytes("format_sql"), Bytes.toBytes(jobEntity.getFormatSql())); - + put.addColumn(Bytes.toBytes("detail"), Bytes.toBytes("execute_sql"), Bytes.toBytes(jobEntity.getQuerySql())); if (jobEntity.getRowsRead() != 0) { put.addColumn(Bytes.toBytes("detail"), Bytes.toBytes("read_rows"), Bytes.toBytes(jobEntity.getRowsRead())); } @@ -53,8 +52,8 @@ public class HbaseServiceImpl implements HbaseService { if (jobEntity.getMemoryUsage() != null) { put.addColumn(Bytes.toBytes("detail"), Bytes.toBytes("memory_usage"), Bytes.toBytes(jobEntity.getMemoryUsage())); } - if (hbproperties.getCell_ttl_d() != null) { - put.setTTL(hbproperties.getCell_ttl_d() * 86400 * 1000); + if (hBaseProperties.getCellTtlD() != null) { + put.setTTL(convertDaysToMilliseconds(hBaseProperties.getCellTtlD())); } table.put(put); status = true; @@ -66,5 +65,5 @@ public class HbaseServiceImpl implements HbaseService { return status; } - + private long convertDaysToMilliseconds(Integer days) { return 1000L*3600*24*days; } } diff --git a/src/main/java/com/mesa/reportservice/service/impl/JobServiceImpl.java b/src/main/java/com/mesa/reportservice/service/impl/JobServiceImpl.java new file mode 100644 index 0000000..100b93f --- /dev/null +++ b/src/main/java/com/mesa/reportservice/service/impl/JobServiceImpl.java @@ -0,0 +1,51 @@ +package com.mesa.reportservice.service.impl; + +import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl; +import com.mesa.reportservice.bean.JobEntity; +import com.mesa.reportservice.mapper.JobMapper; +import com.mesa.reportservice.service.JobService; +import org.springframework.stereotype.Service; + +import java.time.LocalDate; +import java.time.ZoneOffset; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +/** + * + * @author lijinyang + * @date 2024/1/24 + */ +@Service +public class JobServiceImpl extends ServiceImpl<JobMapper, JobEntity> implements JobService { + + @Override + public List<JobEntity> getRunningJob() { + return this.baseMapper.getRunningJob(); + } + + @Override + public List<JobEntity> getJobTask(int rows) { + Map map = new HashMap<String,Object>(); + map.put("rows", rows); + return this.baseMapper.getJobTask(map); + } + + @Override + public int updateProcesses(JobEntity job){ + long currentTime = System.currentTimeMillis()/1000; + job.setLastUpdateTime(currentTime); + return this.baseMapper.updateProcesses(job); + } + + @Override + public Map<String,Long> getJobCount(){ + LocalDate currentDate = LocalDate.now(); + long lastUpdateTime = currentDate.atStartOfDay(ZoneOffset.UTC).toInstant().toEpochMilli(); + Map map = new HashMap<String,Object>(); + map.put("lastUpdateTime", lastUpdateTime); + return this.baseMapper.getJobCount(map); + } + +}
\ No newline at end of file diff --git a/src/main/java/com/mesa/reportservice/service/impl/MonitorServiceImpl.java b/src/main/java/com/mesa/reportservice/service/impl/MonitorServiceImpl.java index 008d207..0e57d84 100644 --- a/src/main/java/com/mesa/reportservice/service/impl/MonitorServiceImpl.java +++ b/src/main/java/com/mesa/reportservice/service/impl/MonitorServiceImpl.java @@ -1,61 +1,75 @@ package com.mesa.reportservice.service.impl; -import cn.hutool.log.Log; -import com.mesa.reportservice.configuration.GlobelConfig; -import com.mesa.reportservice.configuration.ZkProperties; +import com.alibaba.fastjson.JSONArray; +import com.mesa.reportservice.bean.MonitorEntity; +import com.mesa.reportservice.configuration.SchedulerProperties; +import com.mesa.reportservice.service.JobService; import com.mesa.reportservice.service.MonitorService; -import com.mesa.reportservice.service.ZkService; +import com.mesa.reportservice.service.ZookeeperService; import io.micrometer.core.instrument.*; -import org.apache.commons.collections.map.HashedMap; -import org.apache.curator.framework.CuratorFramework; -import org.apache.curator.framework.api.ExistsBuilder; -import org.apache.zookeeper.CreateMode; -import org.apache.zookeeper.data.Stat; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Service; -import org.springframework.util.ObjectUtils; import javax.annotation.PostConstruct; -import java.net.InetAddress; -import java.util.List; import java.util.Map; /** - * Created by wk1 on 2020/1/6. + * + * @author lijinyang + * @date 2024/1/24 */ @Service public class MonitorServiceImpl implements MonitorService { - Log logger = Log.get(); + @Autowired + private JobService mysqlService; @Autowired - MeterRegistry registry; + private ZookeeperService zookeeperService; - private Counter counter_report_success; - private Counter counter_report_fail; + @Autowired + private SchedulerProperties schedulerProperties; - @PostConstruct - public void init(){ + @Autowired + private MeterRegistry meterRegistry; + + private Counter counterReportSuccess; - counter_report_success = registry.counter("report_success_count", "reportJob", "report_success"); + private Counter counterReportFail; - counter_report_fail = registry.counter("report_fail_count", "reportJob", "report_error"); + @PostConstruct + public void init(){ + counterReportSuccess = meterRegistry.counter("report_success_count", "reportJob", "report_success"); + counterReportFail = meterRegistry.counter("report_fail_count", "reportJob", "report_error"); } + @Override public void addSuccess() { - counter_report_success.increment(); + counterReportSuccess.increment(); } + @Override public void addFail() { - counter_report_fail.increment(); + counterReportFail.increment(); } - - public MeterRegistry getRegistry() { - return registry; - } - - public void setRegistry(MeterRegistry registry) { - this.registry = registry; + @Override + public String getJobCount() { + String json=""; + MonitorEntity monitorEntity = new MonitorEntity(); + Map<String,Long> numMap = mysqlService.getJobCount(); + monitorEntity.setQueueJobNum(numMap.get("queueNum")); + monitorEntity.setExecutingJobNum(numMap.get("executingNum")); + monitorEntity.setTodaySuccessJobNum(numMap.get("todaySuccessNum")); + monitorEntity.setTodayErrorJobNum(numMap.get("todayErrorNum")); + monitorEntity.setJobList(schedulerProperties.getMapResult()); + if(zookeeperService.isMaster()){ + monitorEntity.setStatus("active"); + }else{ + monitorEntity.setStatus("standby"); + } + Object obj = JSONArray.toJSON(monitorEntity); + json = obj.toString(); + return json; } } diff --git a/src/main/java/com/mesa/reportservice/service/impl/MysqlServiceImpl.java b/src/main/java/com/mesa/reportservice/service/impl/MysqlServiceImpl.java deleted file mode 100644 index bfc95ac..0000000 --- a/src/main/java/com/mesa/reportservice/service/impl/MysqlServiceImpl.java +++ /dev/null @@ -1,69 +0,0 @@ -package com.mesa.reportservice.service.impl; - -import cn.hutool.log.Log; -import com.mesa.reportservice.bean.JobEntity; -import com.mesa.reportservice.mapper.ReportResultMapper; -import com.mesa.reportservice.service.MysqlService; -import com.mesa.reportservice.util.DateUtil; -import org.springframework.beans.factory.annotation.Autowired; -import org.springframework.stereotype.Service; - -import java.sql.Timestamp; -import java.text.SimpleDateFormat; -import java.time.LocalDate; -import java.time.ZoneOffset; -import java.util.Date; -import java.util.HashMap; -import java.util.List; -import java.util.Map; - -/** - * Created by wk1 on 2020/1/2. - */ -@Service -public class MysqlServiceImpl implements MysqlService { - Log logger = Log.get(); - - @Autowired - public ReportResultMapper rrm; - - @Override - public List<JobEntity> getJobForExcute() { - - return rrm.getJobForExcute(); - - } - - @Override - public List<JobEntity> getJobTask(int rows) { - - - String current_time = DateUtil.getDate(); - HashMap map = new HashMap(); - map.put("issuedtime", current_time); - map.put("rows", rows); - - return rrm.getJobTask(map); - - } - - @Override - public int updateProcesses(JobEntity job){ - - long currentTime = System.currentTimeMillis()/1000; - job.setLastUpdateTime(currentTime); - return rrm.updateProcesses(job); - } - - - - @Override - public Map<String,Long> getJobCount() { - LocalDate currentDate = LocalDate.now(); - long current_date = currentDate.atStartOfDay(ZoneOffset.UTC).toInstant().toEpochMilli(); - HashMap map = new HashMap(); - map.put("lastUpdateTime", current_date); - return rrm.getJobCount(map); - } - -}
\ No newline at end of file diff --git a/src/main/java/com/mesa/reportservice/service/impl/ClickhouseServiceImpl.java b/src/main/java/com/mesa/reportservice/service/impl/QueryGatewayServiceImpl.java index be22dcb..2831c2e 100644 --- a/src/main/java/com/mesa/reportservice/service/impl/ClickhouseServiceImpl.java +++ b/src/main/java/com/mesa/reportservice/service/impl/QueryGatewayServiceImpl.java @@ -3,10 +3,12 @@ package com.mesa.reportservice.service.impl; import cn.hutool.core.io.IoUtil; import cn.hutool.core.util.URLUtil; import cn.hutool.log.Log; +import cn.hutool.log.LogFactory; import com.alibaba.fastjson.JSON; import com.mesa.reportservice.bean.HttpResult; -import com.mesa.reportservice.configuration.ClickhouseConfig; -import com.mesa.reportservice.service.ClickhouseService; +import com.mesa.reportservice.configuration.QueryGatewayProperties; +import com.mesa.reportservice.service.QueryGatewayService; +import com.mesa.reportservice.util.Constant; import org.apache.http.client.config.RequestConfig; import org.apache.http.client.methods.CloseableHttpResponse; import org.apache.http.client.methods.HttpDelete; @@ -16,187 +18,147 @@ import org.apache.http.impl.client.CloseableHttpClient; import org.apache.http.util.EntityUtils; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Qualifier; +import org.springframework.http.HttpStatus; import org.springframework.stereotype.Service; import java.io.IOException; import java.io.UnsupportedEncodingException; import java.net.URLEncoder; +import java.nio.charset.StandardCharsets; +import java.text.MessageFormat; import java.util.List; import java.util.Map; /** - * Created by wk1 on 2019/5/15. + * + * @author lijinyang + * @date 2024/1/24 */ - @Service -public class ClickhouseServiceImpl implements ClickhouseService { +public class QueryGatewayServiceImpl implements QueryGatewayService { @Autowired private CloseableHttpClient httpClient; + @Autowired - private ClickhouseConfig clickhouseConfig; + private QueryGatewayProperties queryGatewayProperties; @Autowired @Qualifier("RequestShortConfig") - private RequestConfig RequestshortConfig; + private RequestConfig RequestShortConfig; @Autowired @Qualifier("RequestLongConfig") private RequestConfig RequestLongConfig; - Log logger = Log.get(); + + private static final Log logger = LogFactory.get(); @Override public String getQueryId(String query) { - CloseableHttpResponse response = null; - String query_id = ""; + String queryId = ""; try { - String url = URLUtil.normalize(clickhouseConfig.getGateway_ip().trim() + "/v1/sql/query/query_id?query="); + String url = URLUtil.normalize(MessageFormat.format(Constant.URL_GET_QUERY_ID,queryGatewayProperties.getGatewayIp().trim())); String sql = null; sql = URLEncoder.encode(query, "utf8").replaceAll("\\+", "%20"); url = url + sql; HttpPost httpPost = new HttpPost(url); // 加入配置信息 - httpPost.setConfig(RequestshortConfig); + httpPost.setConfig(RequestShortConfig); response = this.httpClient.execute(httpPost); - if (response.getStatusLine().getStatusCode() != 200) { + if (response.getStatusLine().getStatusCode() != HttpStatus.OK.value()) { throw new IOException(); } else { - Map data = JSON.parseObject(EntityUtils.toString(response.getEntity(), "UTF-8")); - List listdata = (List) data.get("data"); - Map map = JSON.parseObject(JSON.toJSONString(listdata.get(0))); - query_id = map.get("queryId").toString(); - + Map data = JSON.parseObject(EntityUtils.toString(response.getEntity(), StandardCharsets.UTF_8)); + List listData = (List) data.get("data"); + Map map = JSON.parseObject(JSON.toJSONString(listData.get(0))); + queryId = map.get("queryId").toString(); } } catch (IOException e) { logger.error(e.toString()); - } finally { IoUtil.close(response); } - return query_id; + return queryId; } /** - * 带参数的post请求 + * 带参数的get请求 * * @return * @throws Exception */ @Override - public HttpResult queryForExcute(String query) throws UnsupportedEncodingException { - // 声明httpPost请求 + public HttpResult queryJob(String query) throws UnsupportedEncodingException { + // 声明httpGet请求 CloseableHttpResponse response = null; // 发起请求 - HttpResult rs = null; + HttpResult httpResult = null; try { - String url = URLUtil.normalize(clickhouseConfig.getGateway_ip().trim() + "/sql?rawSQL="); + String url = URLUtil.normalize(MessageFormat.format(Constant.URL_QUERY_FOR_EXECUTE,queryGatewayProperties.getGatewayIp().trim())); query = URLEncoder.encode(query, "utf8").replaceAll("\\+", "%20"); - String jobsql = url + query; - HttpGet httpGet = new HttpGet(jobsql); + String jobSql = url + query; + HttpGet httpGet = new HttpGet(jobSql); // 加入配置信息 httpGet.setConfig(RequestLongConfig); - - response = this.httpClient.execute(httpGet); - rs = new HttpResult(); - rs.setCode(response.getStatusLine().getStatusCode()); - rs.setBody(EntityUtils.toString(response.getEntity(), "UTF-8")); - + httpResult = new HttpResult(); + httpResult.setCode(response.getStatusLine().getStatusCode()); + httpResult.setBody(EntityUtils.toString(response.getEntity(), StandardCharsets.UTF_8)); } catch (IOException e) { logger.error(e.toString()); - } finally { IoUtil.close(response); } - return rs; + return httpResult; } @Override - public HttpResult queryForProcess(String queryId) { - // 声明httpPost请求 + public HttpResult queryJobProcess(String queryId) { + // 声明httpGet请求 CloseableHttpResponse response = null; // 发起请求 - HttpResult rs = null; + HttpResult httpResult = null; try { - String url = URLUtil.normalize(clickhouseConfig.getGateway_ip().trim() + "/v1/sql/query/" + queryId + "/progress"); - + String url = URLUtil.normalize(MessageFormat.format(Constant.URL_QUERY_FOR_PROCESS,queryGatewayProperties.getGatewayIp().trim(),queryId)); HttpGet httpGet = new HttpGet(url); // 加入配置信息 - httpGet.setConfig(RequestshortConfig); - - + httpGet.setConfig(RequestShortConfig); response = this.httpClient.execute(httpGet); - rs = new HttpResult(); - rs.setCode(response.getStatusLine().getStatusCode()); - rs.setBody(EntityUtils.toString(response.getEntity(), "UTF-8")); + httpResult = new HttpResult(); + httpResult.setCode(response.getStatusLine().getStatusCode()); + httpResult.setBody(EntityUtils.toString(response.getEntity(), StandardCharsets.UTF_8)); } catch (IOException e) { logger.error(e.toString()); } finally { IoUtil.close(response); } - return rs; + return httpResult; } @Override - public HttpResult queryForCancel(String queryId) { - + public HttpResult deleteJob(String queryId) { CloseableHttpResponse response = null; - HttpResult rs = null; + HttpResult httpResult = null; try { - String url = URLUtil.normalize(clickhouseConfig.getGateway_ip().trim() + "/v1/sql/query/" + queryId); - // 声明httpPost请求 + String url = URLUtil.normalize(MessageFormat.format(Constant.URL_QUERY_FOR_CANCEL,queryGatewayProperties.getGatewayIp().trim(),queryId)); + // 声明httpDelete请求 HttpDelete HttpDelete = new HttpDelete(url); // 加入配置信息 - HttpDelete.setConfig(RequestshortConfig); - + HttpDelete.setConfig(RequestShortConfig); // 发起请求 - response = this.httpClient.execute(HttpDelete); - rs = new HttpResult(); - rs.setCode(response.getStatusLine().getStatusCode()); - rs.setBody(EntityUtils.toString(response.getEntity(), "UTF-8")); + httpResult = new HttpResult(); + httpResult.setCode(response.getStatusLine().getStatusCode()); + httpResult.setBody(EntityUtils.toString(response.getEntity(), StandardCharsets.UTF_8)); } catch (IOException e) { logger.error(e.toString()); } finally { IoUtil.close(response); } - return rs; - } - - public CloseableHttpClient getHttpClient() { - return httpClient; + return httpResult; } - - public void setHttpClient(CloseableHttpClient httpClient) { - this.httpClient = httpClient; - } - - public RequestConfig getRequestshortConfig() { - return RequestshortConfig; - } - - public void setRequestshortConfig(RequestConfig requestshortConfig) { - RequestshortConfig = requestshortConfig; - } - - public RequestConfig getRequestLongConfig() { - return RequestLongConfig; - } - - public void setRequestLongConfig(RequestConfig requestLongConfig) { - RequestLongConfig = requestLongConfig; - } - - /** - * 不带参数post请求 - * - * @param url - * @return - * @throws Exception - */ - } diff --git a/src/main/java/com/mesa/reportservice/service/impl/ScheduledResultServiceImpl.java b/src/main/java/com/mesa/reportservice/service/impl/ScheduledResultServiceImpl.java new file mode 100644 index 0000000..c43981b --- /dev/null +++ b/src/main/java/com/mesa/reportservice/service/impl/ScheduledResultServiceImpl.java @@ -0,0 +1,180 @@ +package com.mesa.reportservice.service.impl; + +import cn.hutool.log.Log; +import cn.hutool.log.LogFactory; +import com.mesa.reportservice.bean.JobEntity; +import com.mesa.reportservice.configuration.HttpClientPool; +import com.mesa.reportservice.configuration.SchedulerProperties; +import com.mesa.reportservice.configuration.ThreadPoolConfig; +import com.mesa.reportservice.enums.JobStates; +import com.mesa.reportservice.exception.SQSCode; +import com.mesa.reportservice.exception.SQSException; +import com.mesa.reportservice.service.*; +import com.mesa.reportservice.util.Constant; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.stereotype.Service; + +import java.util.List; +import java.util.Map; + +/** + * + * @author lijinyang + * @date 2024/1/24 + */ +@Service +public class ScheduledResultServiceImpl implements ScheduledResultService { + + private static final Log logger = LogFactory.get(); + + @Autowired + private ThreadPoolConfig threadPoolConfig; + + @Autowired + private JobService jobService; + + @Autowired + private ExecuteService executeService; + + @Autowired + private ExecuteProcessService executeProcessService; + + @Autowired + private ZookeeperService zookeeperService; + + @Autowired + private QueryGatewayService queryGatewayService; + + @Autowired + private HttpClientPool httpClientPool; + + @Autowired + private SchedulerProperties schedulerProperties; + + @Override + public void execute() { + try { + if (zookeeperService.isMaster()) { + logger.info(Constant.LOG_START_VIEWING_RESULTS); + updateAbnormalJob(); + + updateRunningJob(); + + updatePendingJob(); + } else { + if (schedulerProperties.getMapResult().size() > 0) { + for (Map.Entry<String, JobEntity> entry : schedulerProperties.getMapResult().entrySet()) { + logger.info("key = " + entry.getKey() + ", value = " + entry.getValue().getState()); + executeProcessService.killQuery(entry.getValue()); + } + schedulerProperties.getMapResult().clear(); + } + } + } catch (Exception e) { + logger.error(e.toString()); + throw new SQSException(e.toString()); + } + } + + private void updateAbnormalJob(){ + //先查询数据库是否有异常状态任务,killquery + List<JobEntity> jobList = jobService.getRunningJob(); + for (JobEntity jobEntity : jobList) { + String sql = jobEntity.getQuerySql().trim(); + String queryId = queryGatewayService.getQueryId(sql); + jobEntity.setQueryId(queryId); + if (jobEntity.getIsValid() == 0) { + executeProcessService.killQuery(jobEntity); + schedulerProperties.getMapResult().get(jobEntity.getQueryId()).setIsValid(0); + } else if (!schedulerProperties.getMapResult().containsKey(jobEntity.getQueryId())) { + executeProcessService.reSet(jobEntity); + } + if (schedulerProperties.getMapResult().containsKey(jobEntity.getQueryId())) { + if (jobEntity.getIsValid() == 0) { + executeProcessService.killQuery(jobEntity); + schedulerProperties.getMapResult().get(jobEntity.getQueryId()).setIsValid(0); + } + } else { + executeProcessService.reSet(jobEntity); + } + } + } + + private void updateRunningJob(){ + //遍历内存中的任务对状态RUNNING的更新进度,其他更新数据库的状态 + for (Map.Entry<String, JobEntity> entry : schedulerProperties.getMapResult().entrySet()) { + logger.info("key = " + entry.getKey() + ", value = " + entry.getValue().getState()); + long currentTime = System.currentTimeMillis(); + long executeTime = currentTime - entry.getValue().getStartTime(); + logger.info("execute time=" + executeTime + "ttlTime=" + httpClientPool.getSocketTimeout()); + if (JobStates.RUNNING.getValue().equals(entry.getValue().getState()) && executeTime > httpClientPool.getSocketTimeout()) { + entry.getValue().setState(JobStates.DONE.getValue()); + entry.getValue().setExecuteStatus(Constant.JOB_EXECUTION_TIMEOUT); + executeProcessService.killQuery(entry.getValue()); + executeProcessService.updateResultMessage(entry.getValue()); + } else { + if (JobStates.RUNNING.getValue().equals(entry.getValue().getState())) { + executeProcessService.updateProcessMessage(entry.getValue()); + } else { + executeProcessService.updateResultMessage(entry.getValue()); + } + } + } + } + + private void updatePendingJob() { + int rows = schedulerProperties.getJobThread() - schedulerProperties.getMapResult().size(); + if (rows > 0) { + List<JobEntity> jobs = jobService.getJobTask(rows); + for (JobEntity job : jobs) { + logger.info(Constant.LOG_START_EXECUTING_TASKS); + long startTime = System.currentTimeMillis()/1000; + job.setStartTime(startTime); + String sql = job.getQuerySql().trim(); + job.setQuerySql(sql); + job.setState(JobStates.RUNNING.getValue()); + job.setExecuteStatus(1); + job.setResultMessage(Constant.EXECUTING); + job.setRowsRead(0L); + job.setElapsed(0); + job.setDoneProgress(0); + job.setResultRows(0L); + String queryId = ""; + queryId = queryGatewayService.getQueryId(sql); + job.setQueryId(queryId); + if (queryId.equals("") ) { + job.setExecuteStatus(0); + job.setState(JobStates.DONE.getValue()); + job.setEndTime(System.currentTimeMillis()/1000); + job.setDoneProgress(1.00f); + job.setIsFailed(1); + job.setResultMessage(Constant.UNKNOWN_ERROR); + } + if ((JobStates.RUNNING.getValue()).equals(job.getState())) { + if (jobService.updateProcesses(job) != 0) { + schedulerProperties.getMapResult().put(queryId, job); + threadPoolConfig.getThreadPoolTaskExecutor().execute(new Runnable() { + @Override + public void run() { + executeService.queryQgw(job); + } + }); + } else { + logger.error(SQSCode.UPDATE_STATUS_ERROR.getMsg()); + throw new SQSException(SQSCode.UPDATE_STATUS_ERROR); + } + } else { + if (jobService.updateProcesses(job) != 0) { + logger.error(SQSCode.TASK_CANNOT_EXECUTED.getMsg()); + throw new SQSException(SQSCode.TASK_CANNOT_EXECUTED); + } else { + logger.error(SQSCode.UPDATE_STATUS_ERROR.getMsg()); + throw new SQSException(SQSCode.UPDATE_STATUS_ERROR); + } + } + } + } else { + logger.info(Constant.LOG_NO_PENDING_TASKS); + } + } +} diff --git a/src/main/java/com/mesa/reportservice/service/impl/ZkServiceImpl.java b/src/main/java/com/mesa/reportservice/service/impl/ZookeeperServiceImpl.java index fe30208..dd2c267 100644 --- a/src/main/java/com/mesa/reportservice/service/impl/ZkServiceImpl.java +++ b/src/main/java/com/mesa/reportservice/service/impl/ZookeeperServiceImpl.java @@ -1,62 +1,60 @@ package com.mesa.reportservice.service.impl; import cn.hutool.log.Log; -import com.mesa.reportservice.configuration.GlobelConfig; -import com.mesa.reportservice.configuration.ZkProperties; -import com.mesa.reportservice.service.ZkService; +import cn.hutool.log.LogFactory; +import com.mesa.reportservice.configuration.SchedulerProperties; +import com.mesa.reportservice.configuration.ZookeeperProperties; +import com.mesa.reportservice.service.ZookeeperService; import org.apache.curator.framework.CuratorFramework; -import org.apache.curator.framework.api.ExistsBuilder; import org.apache.zookeeper.CreateMode; import org.apache.zookeeper.data.Stat; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Service; -import java.net.InetAddress; +import java.nio.charset.StandardCharsets; /** - * Created by wk1 on 2020/1/6. + * + * @author lijinyang + * @date 2024/1/24 */ @Service -public class ZkServiceImpl implements ZkService { +public class ZookeeperServiceImpl implements ZookeeperService { + @Autowired private CuratorFramework curatorConnect; + @Autowired - private ZkProperties zp; - Log logger = Log.get(); + private ZookeeperProperties zookeeperProperties; + private static final Log logger = LogFactory.get(); @Override public boolean isMaster() { - - try { - if (zp.getOpen() == 0) { + if (zookeeperProperties.getOpen() == 0) { boolean isZkCuratorStarted = curatorConnect.isStarted(); String nodePath = "/masterip"; logger.debug("the current state of the client: " + (isZkCuratorStarted ? "connecting..." : "closed...")); Stat statExist = curatorConnect.checkExists().forPath(nodePath); if (statExist == null) { - byte[] data = GlobelConfig.zkuuid.getBytes(); // 节点数据 - - String result = curatorConnect.create().creatingParentsIfNeeded() // 创建父节点,也就是会递归创建 - .withMode(CreateMode.EPHEMERAL) // 节点类型 + // 节点数据 + byte[] data = SchedulerProperties.zkUUID.getBytes(StandardCharsets.UTF_8); + // 创建父节点,也就是会递归创建 + String result = curatorConnect.create().creatingParentsIfNeeded() + // 节点类型 + .withMode(CreateMode.EPHEMERAL) .forPath(nodePath, data); logger.debug(result); - return true; - } else { logger.debug(nodePath + " node exists"); Stat stat = new Stat(); - byte[] nodeData = curatorConnect.getData().storingStatIn(stat).forPath(nodePath); - String masterid = new String(nodeData, "UTF-8").trim(); - logger.debug("uuid=" + GlobelConfig.zkuuid + " node " + nodePath + " data is:" + masterid); - - if (masterid.equals(GlobelConfig.zkuuid)) { - + String masterId = new String(nodeData, StandardCharsets.UTF_8).trim(); + logger.debug("uuid=" + SchedulerProperties.zkUUID + " node " + nodePath + " data is:" + masterId); + if (masterId.equals(SchedulerProperties.zkUUID)) { return true; - } else { logger.debug("not the main node"); return false; @@ -65,13 +63,11 @@ public class ZkServiceImpl implements ZkService { } else { return true; } - } catch (Exception e) { logger.error(e.toString()); return false; } } - } diff --git a/src/main/java/com/mesa/reportservice/util/Constant.java b/src/main/java/com/mesa/reportservice/util/Constant.java new file mode 100644 index 0000000..91b4fd6 --- /dev/null +++ b/src/main/java/com/mesa/reportservice/util/Constant.java @@ -0,0 +1,50 @@ +package com.mesa.reportservice.util; + +/** + * + * @author lijinyang + * @date: 2024/01/12/16:33 + */ +public class Constant { + + /** + * jobMessage + */ + public static final String OK = "OK"; + public static final String CANCEL = "CANCEL"; + public static final String EXECUTING = "EXECUTING"; + public static final String WRITE_DATA_ERROR = "Write Data Error"; + public static final String PARAM_SYNTAX_ERROR = "Param Syntax Error"; + public static final String SQL_EXECUTION_ERROR = "SQL Execution Error"; + public static final String UNKNOWN_ERROR = "Unknown Error"; + public static final String DATABASE_ERROR = "Database Error"; + public static final String RE_EXECUTION = "Re Execution"; + public static final String QUERY_TIME_OUT = "SQL Execution Error execute query time out"; + public static final String RESULT_TOO_LARGE = "Result Too Large"; + + /** + * jobStatus + */ + public static final int JOB_SUCCESS = 20000666; + public static final int JOB_ERROR = 555999; + public static final int JOB_EXECUTION_TIMEOUT = 500001; + public static final int JOB_SQL_EXECUTION_ERROR_CODE = 50001300; + public static final int JOB_BAD_REQUEST_THRESHOLD = 40000000; + public static final int JOB_INTERNAL_SERVER_ERROR_THRESHOLD = 50000000; + + /** + * loggerMessage + */ + public static final String LOG_SUCCESS_SAVE_TO_HBASE = "success save to HBase ! queryId = {0} jobId = {1} executeSql = {2}"; + public static final String LOG_NO_PENDING_TASKS = "no pending tasks"; + public static final String LOG_START_EXECUTING_TASKS = "start executing task"; + public static final String LOG_START_VIEWING_RESULTS = "start viewing results"; + + /** + * URL + */ + public static final String URL_GET_QUERY_ID = "{0}/v1/sql/query/query_id?query="; + public static final String URL_QUERY_FOR_EXECUTE = "{0}/sql?originalSQL="; + public static final String URL_QUERY_FOR_PROCESS = "{0}/v1/sql/query/{1}/progress"; + public static final String URL_QUERY_FOR_CANCEL = "{0}/v1/sql/query/{1}"; +} diff --git a/src/main/resources/mappers/ReportResultMapper.xml b/src/main/resources/mappers/JobMapper.xml index b4d8f57..9017d60 100644 --- a/src/main/resources/mappers/ReportResultMapper.xml +++ b/src/main/resources/mappers/JobMapper.xml @@ -1,6 +1,6 @@ <?xml version="1.0" encoding="UTF-8" ?> <!DOCTYPE mapper PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN" "http://mybatis.org/dtd/mybatis-3-mapper.dtd" > -<mapper namespace="com.mesa.reportservice.mapper.ReportResultMapper" > +<mapper namespace="com.mesa.reportservice.mapper.JobMapper" > <resultMap type="com.mesa.reportservice.bean.JobEntity" id="BaseResultMap"> <id property="jobId" column="job_id"/> <result property="querySql" column="query_sql"/> @@ -24,7 +24,7 @@ job_id,query_sql,state,done_progress,is_failed,result_message,elapsed,rows_read, bytes_read,result_rows,result_bytes,is_valid,start_time,end_time,last_update_time,generated_time </sql> - <select id="getJobForExcute" resultMap="BaseResultMap" parameterType="java.lang.Integer" > + <select id="getRunningJob" resultMap="BaseResultMap" parameterType="java.lang.Integer" > select <include refid="Base_Column_List" /> from saved_query_job @@ -110,7 +110,7 @@ <select id="getJobCount" resultType="map" parameterType="hashmap"> SELECT (SELECT COUNT(1) FROM saved_query_job where state = 'PENDING' and is_valid = 1) as queueNum, - (SELECT COUNT(1) FROM saved_query_job where state = 'RUNNING' and is_valid = 1) as excuteingNum, + (SELECT COUNT(1) FROM saved_query_job where state = 'RUNNING' and is_valid = 1) as executingNum, (SELECT COUNT(1) FROM saved_query_job where state = 'DONE' and is_failed = 0 and last_update_time > #{lastUpdateTime}) as todaySuccessNum, (SELECT COUNT(1) FROM saved_query_job where is_failed = 1 and last_update_time > #{lastUpdateTime} ) as todayErrorNum </select> diff --git a/src/test/java/com/mesa/ReportServiceApplicationTests.java b/src/test/java/com/mesa/ReportServiceApplicationTests.java new file mode 100644 index 0000000..1421554 --- /dev/null +++ b/src/test/java/com/mesa/ReportServiceApplicationTests.java @@ -0,0 +1,81 @@ +package com.mesa; + + +import cn.hutool.log.Log; +import cn.hutool.log.LogFactory; +import com.alibaba.fastjson.JSON; +import com.mesa.reportservice.ReportServiceApplication; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.boot.autoconfigure.EnableAutoConfiguration; +import org.springframework.boot.test.context.SpringBootTest; +import org.springframework.boot.web.server.LocalServerPort; +import org.springframework.core.io.ClassPathResource; +import org.springframework.jdbc.datasource.init.ScriptUtils; +import org.springframework.test.context.ActiveProfiles; +import org.springframework.test.context.junit4.SpringRunner; + +import javax.sql.DataSource; +import java.io.IOException; +import java.io.InputStream; +import java.sql.Connection; +import java.sql.SQLException; +import java.util.Map; +import java.util.Objects; + +@RunWith(SpringRunner.class) +@SpringBootTest(classes = {ReportServiceApplication.class}, webEnvironment = SpringBootTest.WebEnvironment.RANDOM_PORT) +@EnableAutoConfiguration +@ActiveProfiles("test") +public class ReportServiceApplicationTests { + + private static final Log logger = LogFactory.get(); + + @Autowired + private DataSource dataSource; + + @LocalServerPort + protected int testPort; + + static { + System.setProperty("jasypt.encryptor.password", "galaxy"); + System.setProperty("JM.SNAPSHOT.PATH", "config"); + } + + public ReportServiceApplicationTests() { + } + + @Test + public void contextLoads() { + logger.info("SAVED QUERY SCHEDULER CONTEXT LOAD"); + } + + @Test + public void runSqlScript() throws SQLException { + try (Connection connection = dataSource.getConnection()) { + ScriptUtils.executeSqlScript(connection, new ClassPathResource("db/SavedQueryJob.sql")); + } + } + + // 解析resource: parameters/otherPath/指定JSON文件, 返回指定类型对象 + public <T> T jsonToInParameter(String path, String parameterName,Class<T> type) { + InputStream resourceAsStream = ReportServiceApplicationTests.class.getClassLoader().getResourceAsStream(path); + if (Objects.isNull(resourceAsStream)) { + logger.info("Unable to obtain the specified file under the specified path. Please check if the file exists or if the file name capitalization is consistent", path); + throw new IllegalArgumentException(); + } + Map<String, Object> json = null; + try { + json = JSON.parseObject(resourceAsStream, Map.class); + } catch (IOException e) { + logger.error("Failed to read files from the test resource directory, file: {}", path); + throw new RuntimeException(e); + } + Object data = json.get(parameterName); + String objectJsonStr = JSON.toJSONString(data, true); + logger.info("parameter file: {}, parameter name: {}", path, parameterName); + return JSON.parseObject(objectJsonStr, type); + } + +} diff --git a/src/test/java/com/mesa/reportservice/ExecuteProcessTest.java b/src/test/java/com/mesa/reportservice/ExecuteProcessTest.java new file mode 100644 index 0000000..9e5b56e --- /dev/null +++ b/src/test/java/com/mesa/reportservice/ExecuteProcessTest.java @@ -0,0 +1,86 @@ +package com.mesa.reportservice; + +import com.mesa.ReportServiceApplicationTests; +import com.mesa.reportservice.bean.JobEntity; +import com.mesa.reportservice.enums.JobStates; +import com.mesa.reportservice.exception.SQSCode; +import com.mesa.reportservice.exception.SQSException; +import com.mesa.reportservice.service.ExecuteProcessService; +import com.mesa.reportservice.util.Constant; +import org.junit.Assert; +import org.junit.Test; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.boot.autoconfigure.EnableAutoConfiguration; + + +/** + * @Auther: lijinyang + * @Date: 2024/01/08/16:33 + */ +@EnableAutoConfiguration +public class ExecuteProcessTest extends ReportServiceApplicationTests { + + @Autowired + private ExecuteProcessService executeProcessService; + + @Test + public void testReSet() { + JobEntity job = this.jsonToInParameter("parameters/executeProcessTest.json", "reSetJob", JobEntity.class); + executeProcessService.reSet(job); + Assert.assertEquals(JobStates.PENDING.getValue(),job.getState()); + } + + @Test + public void testUpdateResultMessageCancel() { + JobEntity job = this.jsonToInParameter("parameters/executeProcessTest.json", "cancelJob", JobEntity.class); + executeProcessService.updateResultMessage(job); + Assert.assertEquals(Constant.CANCEL,job.getResultMessage()); + } + + @Test + public void testUpdateResultMessageOk() { + JobEntity job = this.jsonToInParameter("parameters/executeProcessTest.json", "okJob", JobEntity.class); + executeProcessService.updateResultMessage(job); + Assert.assertEquals(Constant.OK,job.getResultMessage()); + } + + @Test + public void testUpdateResultMessageDataError() { + JobEntity job = this.jsonToInParameter("parameters/executeProcessTest.json", "dataErrorJob", JobEntity.class); + try { + executeProcessService.updateResultMessage(job); + } catch (SQSException e) { + Assert.assertNotEquals((Object) SQSCode.SUCCESS.getCode(),e.getCode()); + } + } + + @Test + public void testUpdateResultMessageParamError() { + JobEntity job = this.jsonToInParameter("parameters/executeProcessTest.json", "paramErrorJob", JobEntity.class); + try { + executeProcessService.updateResultMessage(job); + } catch (SQSException e) { + Assert.assertNotEquals((Object) SQSCode.SUCCESS.getCode(),e.getCode()); + } + } + + @Test + public void testUpdateResultMessageSQLError() { + JobEntity job = this.jsonToInParameter("parameters/executeProcessTest.json", "SQLErrorJob", JobEntity.class); + try { + executeProcessService.updateResultMessage(job); + } catch (SQSException e) { + Assert.assertNotEquals((Object) SQSCode.SUCCESS.getCode(),e.getCode()); + } + } + + @Test + public void testUpdateResultMessageUnknownError() { + JobEntity job = this.jsonToInParameter("parameters/executeProcessTest.json", "unknownErrorJob", JobEntity.class); + try { + executeProcessService.updateResultMessage(job); + } catch (SQSException e) { + Assert.assertNotEquals((Object) SQSCode.SUCCESS.getCode(),e.getCode()); + } + } +} diff --git a/src/test/java/com/mesa/reportservice/ExecuteTest.java b/src/test/java/com/mesa/reportservice/ExecuteTest.java new file mode 100644 index 0000000..e30293a --- /dev/null +++ b/src/test/java/com/mesa/reportservice/ExecuteTest.java @@ -0,0 +1,29 @@ +package com.mesa.reportservice; + +import com.mesa.ReportServiceApplicationTests; +import com.mesa.reportservice.bean.JobEntity; +import com.mesa.reportservice.enums.JobStates; +import com.mesa.reportservice.service.ExecuteService; +import org.junit.Assert; +import org.junit.Test; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.boot.autoconfigure.EnableAutoConfiguration; + + +/** + * @Auther: lijinyang + * @Date: 2024/01/10/15:57 + */ +@EnableAutoConfiguration +public class ExecuteTest extends ReportServiceApplicationTests { + + @Autowired + private ExecuteService executeService; + + @Test + public void testExecuteTask() { + JobEntity job = this.jsonToInParameter("parameters/executeTest.json", "executeJob", JobEntity.class); + executeService.queryQgw(job); + Assert.assertEquals(JobStates.DONE.getValue(),job.getState()); + } +} diff --git a/src/test/java/com/mesa/reportservice/HBaseTest.java b/src/test/java/com/mesa/reportservice/HBaseTest.java new file mode 100644 index 0000000..2392c10 --- /dev/null +++ b/src/test/java/com/mesa/reportservice/HBaseTest.java @@ -0,0 +1,28 @@ +package com.mesa.reportservice; + +import com.mesa.ReportServiceApplicationTests; +import com.mesa.reportservice.bean.JobEntity; +import com.mesa.reportservice.service.HBaseService; +import org.junit.Assert; +import org.junit.Test; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.boot.autoconfigure.EnableAutoConfiguration; + + +/** + * @Auther: lijinyang + * @Date: 2024/01/10/14:48 + */ +@EnableAutoConfiguration +public class HBaseTest extends ReportServiceApplicationTests { + + @Autowired + private HBaseService hBaseService; + + @Test + public void testPut() throws Exception { + JobEntity job = this.jsonToInParameter("parameters/hBaseTest.json", "putJob", JobEntity.class); + Boolean put = hBaseService.put(job); + Assert.assertTrue(put); + } +} diff --git a/src/test/java/com/mesa/reportservice/JobTest.java b/src/test/java/com/mesa/reportservice/JobTest.java new file mode 100644 index 0000000..e69aaf6 --- /dev/null +++ b/src/test/java/com/mesa/reportservice/JobTest.java @@ -0,0 +1,62 @@ +package com.mesa.reportservice; + +import com.mesa.ReportServiceApplicationTests; +import com.mesa.reportservice.bean.JobEntity; +import com.mesa.reportservice.enums.JobStates; +import com.mesa.reportservice.service.JobService; +import org.junit.Assert; +import org.junit.Test; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.boot.autoconfigure.EnableAutoConfiguration; + +import java.util.List; +import java.util.Map; + +/** + * @Auther: lijinyang + * @Date: 2024/01/08/17:22 + */ +@EnableAutoConfiguration +public class JobTest extends ReportServiceApplicationTests { + + private static final int rows = 10; + + private static final int updateProcessesSuccess = 1; + + @Autowired + private JobService jobService; + + @Test + public void testGetRunningJob() { + List<JobEntity> jobForExecute = jobService.getRunningJob(); + Assert.assertNotNull(jobForExecute); + Assert.assertEquals(JobStates.RUNNING.getValue(),jobForExecute.get(0).getState()); + } + + @Test + public void testGetJobTask() { + List<JobEntity> jobForExecute = jobService.getJobTask(rows); + Assert.assertNotNull(jobForExecute); + Assert.assertEquals(JobStates.PENDING.getValue(),jobForExecute.get(0).getState()); + } + + @Test + public void testGetJobCount() { + Map<String, Long> jobCount = jobService.getJobCount(); + Assert.assertNotNull(jobCount); + } + + @Test + public void testUpdateProcessesSuccess() { + JobEntity job = this.jsonToInParameter("parameters/mysqlTest.json", "updateProcessesSuccessJob", JobEntity.class); + int i = jobService.updateProcesses(job); + Assert.assertEquals(updateProcessesSuccess,i); + } + + @Test + public void testUpdateProcessesError() { + JobEntity job = this.jsonToInParameter("parameters/mysqlTest.json", "updateProcessesErrorJob", JobEntity.class); + int i = jobService.updateProcesses(job); + Assert.assertNotEquals(updateProcessesSuccess,i); + } +} diff --git a/src/test/java/com/mesa/reportservice/MonitorTest.java b/src/test/java/com/mesa/reportservice/MonitorTest.java new file mode 100644 index 0000000..f8cf701 --- /dev/null +++ b/src/test/java/com/mesa/reportservice/MonitorTest.java @@ -0,0 +1,29 @@ +package com.mesa.reportservice; + +import com.mesa.ReportServiceApplicationTests; +import org.junit.Test; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.boot.autoconfigure.EnableAutoConfiguration; +import org.springframework.boot.test.autoconfigure.web.servlet.AutoConfigureMockMvc; +import org.springframework.test.web.servlet.MockMvc; + +import static org.springframework.test.web.servlet.request.MockMvcRequestBuilders.get; +import static org.springframework.test.web.servlet.result.MockMvcResultMatchers.status; + +/** + * @Auther: lijinyang + * @Date: 2024/01/04/16:08 + */ +@EnableAutoConfiguration +@AutoConfigureMockMvc +public class MonitorTest extends ReportServiceApplicationTests { + + @Autowired + private MockMvc mockMvc; + + @Test + public void testGetReportStatus() throws Exception{ + mockMvc.perform(get("/monitor")) + .andExpect(status().isOk()); + } +} diff --git a/src/test/java/com/mesa/reportservice/QueryGatewayTest.java b/src/test/java/com/mesa/reportservice/QueryGatewayTest.java new file mode 100644 index 0000000..193b925 --- /dev/null +++ b/src/test/java/com/mesa/reportservice/QueryGatewayTest.java @@ -0,0 +1,96 @@ +package com.mesa.reportservice; + +import com.mesa.ReportServiceApplicationTests; +import com.mesa.reportservice.bean.HttpResult; +import com.mesa.reportservice.service.QueryGatewayService; +import org.junit.Assert; +import org.junit.Test; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.boot.autoconfigure.EnableAutoConfiguration; +import org.springframework.http.HttpStatus; + +import java.io.IOException; +import java.util.regex.Matcher; +import java.util.regex.Pattern; + +/** + * @Auther: lijinyang + * @Date: 2024/01/04/17:05 + */ +@EnableAutoConfiguration +public class QueryGatewayTest extends ReportServiceApplicationTests { + + private static final String QUERY_ID_PATTERN = "^[a-f0-9]{32}:[a-f0-9]{32}$"; + + @Autowired + private QueryGatewayService queryGatewayService; + + @Test + public void testGetQueryIdSuccess() { + String sql = this.jsonToInParameter("parameters/queryGatewayTest.json", "querySqlSuccess", String.class); + String queryId = queryGatewayService.getQueryId(sql); + Pattern compile = Pattern.compile(QUERY_ID_PATTERN); + Matcher matcher = compile.matcher(queryId); + Assert.assertTrue(matcher.find()); + } + + @Test + public void testGetQueryIdError() { + String sql = this.jsonToInParameter("parameters/queryGatewayTest.json", "querySqlError", String.class); + String queryId = queryGatewayService.getQueryId(sql); + Pattern compile = Pattern.compile(QUERY_ID_PATTERN); + Matcher matcher = compile.matcher(queryId); + Assert.assertTrue(!matcher.find()); + } + + @Test + public void testQueryJobSuccess() throws IOException { + String sql = this.jsonToInParameter("parameters/queryGatewayTest.json", "querySqlSuccess", String.class); + HttpResult httpResult = null; + httpResult = queryGatewayService.queryJob(sql); + Assert.assertTrue(httpResult.getCode() == HttpStatus.OK.value()); + } + + @Test + public void testQueryJobError() throws IOException { + String sql = this.jsonToInParameter("parameters/queryGatewayTest.json", "querySqlError", String.class); + HttpResult httpResult = null; + httpResult = queryGatewayService.queryJob(sql); + Assert.assertTrue(httpResult.getCode() == HttpStatus.BAD_REQUEST.value()); + } + + @Test + public void testQueryJobProcessSuccess() throws IOException { + String sql = this.jsonToInParameter("parameters/queryGatewayTest.json", "querySqlSuccess", String.class); + String queryId = queryGatewayService.getQueryId(sql); + HttpResult httpResult = null; + httpResult = queryGatewayService.queryJobProcess(queryId); + Assert.assertTrue(httpResult.getCode() == HttpStatus.OK.value()); + Assert.assertNotNull(httpResult.getBody()); + } + + @Test + public void testQueryJobProcessError() throws IOException { + String queryId = this.jsonToInParameter("parameters/queryGatewayTest.json", "queryForProcessIdError", String.class); + HttpResult httpResult = null; + httpResult = queryGatewayService.queryJobProcess(queryId); + Assert.assertTrue(httpResult.getCode() == HttpStatus.BAD_REQUEST.value()); + } + + @Test + public void testDeleteJobSuccess() throws IOException { + String sql = this.jsonToInParameter("parameters/queryGatewayTest.json", "querySqlSuccess", String.class); + String queryId = queryGatewayService.getQueryId(sql); + HttpResult httpResult = null; + httpResult = queryGatewayService.deleteJob(queryId); + Assert.assertTrue(httpResult.getCode() == HttpStatus.OK.value()); + } + + @Test + public void testDeleteJobError() throws IOException { + String queryId = this.jsonToInParameter("parameters/queryGatewayTest.json", "queryForCancelIdError", String.class); + HttpResult httpResult = null; + httpResult = queryGatewayService.deleteJob(queryId); + Assert.assertTrue(httpResult.getCode() == HttpStatus.NOT_FOUND.value()); + } +} diff --git a/src/test/java/com/mesa/reportservice/ReportserviceApplicationTests.java b/src/test/java/com/mesa/reportservice/ReportserviceApplicationTests.java deleted file mode 100644 index 4ae38f5..0000000 --- a/src/test/java/com/mesa/reportservice/ReportserviceApplicationTests.java +++ /dev/null @@ -1,37 +0,0 @@ -package com.mesa.reportservice; - - -import org.apache.http.client.config.RequestConfig; -import org.apache.http.impl.client.CloseableHttpClient; -import org.junit.Test; -import org.junit.runner.RunWith; -import org.springframework.beans.factory.annotation.Autowired; -import org.springframework.boot.test.context.SpringBootTest; -import org.springframework.test.context.junit4.SpringRunner; -import java.io.IOException; - -@RunWith(SpringRunner.class) -@SpringBootTest -public class ReportserviceApplicationTests { - - - - - @Autowired - private CloseableHttpClient httpClient; - - @Autowired - private RequestConfig config; - - - - - @Test - public void contextLoads() throws IOException { - - - - } - - -} diff --git a/src/test/java/com/mesa/reportservice/zookeeperTest.java b/src/test/java/com/mesa/reportservice/zookeeperTest.java new file mode 100644 index 0000000..5370afb --- /dev/null +++ b/src/test/java/com/mesa/reportservice/zookeeperTest.java @@ -0,0 +1,25 @@ +package com.mesa.reportservice; + +import com.mesa.ReportServiceApplicationTests; +import com.mesa.reportservice.service.ZookeeperService; +import org.junit.Assert; +import org.junit.Test; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.boot.autoconfigure.EnableAutoConfiguration; + +/** + * @Auther: lijinyang + * @Date: 2024/01/10/14:20 + */ +@EnableAutoConfiguration +public class zookeeperTest extends ReportServiceApplicationTests { + + @Autowired + private ZookeeperService zookeeperService; + + @Test + public void testIsMaster() throws Exception { + boolean isMaster = zookeeperService.isMaster(); + Assert.assertTrue(isMaster); + } +} diff --git a/src/test/resources/db/SavedQueryJob.sql b/src/test/resources/db/SavedQueryJob.sql new file mode 100644 index 0000000..1940c31 --- /dev/null +++ b/src/test/resources/db/SavedQueryJob.sql @@ -0,0 +1,7 @@ +truncate saved_query_job; + +INSERT INTO `saved_query_job` (`job_id`, `query_sql`, `state`, `done_progress`, `is_failed`, `result_message`, `elapsed`, `rows_read`, `bytes_read`, `result_bytes`, `result_rows`, `is_valid`, `start_time`, `end_time`, `last_update_time`, `generated_time`) VALUES ('00838b9d94552afecc6fa29cde033714', 'SELECT COUNT_DISTINCT(server_ip) AS \"UniqServerIP\", client_ip AS \"Client IP\" FROM security_event WHERE recv_time >= UNIX_TIMESTAMP(\'2023-12-23T14:30:00+08:00\') AND recv_time < UNIX_TIMESTAMP(\'2023-12-24T14:30:00+08:00\') AND security_event.vsys_id IN (1) GROUP BY \"Client IP\" ORDER BY \"UniqServerIP\" DESC LIMIT 20', 'RUNNING', 0, 0, '', 0, 0, 0, 0, 0, 1, 1703399415, null, 1703399430, 1703399409); +INSERT INTO `saved_query_job` (`job_id`, `query_sql`, `state`, `done_progress`, `is_failed`, `result_message`, `elapsed`, `rows_read`, `bytes_read`, `result_bytes`, `result_rows`, `is_valid`, `start_time`, `end_time`, `last_update_time`, `generated_time`) VALUES ('014f9d6e1f17227ce8d837003bc3bedb', 'SELECT AVG(sent_bytes) AS \"Bytes Sent\", AVG(received_bytes) AS \"Bytes Received\", FROM_UNIXTIME(TIME_FLOOR_WITH_FILL(start_timestamp_ms, \'PT5M\', \'zero\')) AS \"Start Time\" FROM security_event WHERE recv_time >= UNIX_TIMESTAMP(\'2023-12-18T13:18:01Z\') AND recv_time < UNIX_TIMESTAMP(\'2023-12-19T13:18:01Z\') AND security_event.vsys_id IN (1) GROUP BY \"Start Time\" ORDER BY \"Start Time\" ASC', 'RUNNING', 0, 0, '', 0, 0, 0, 0, 0, 1, 1702991925, null, 1702991940, 1702991921); +INSERT INTO `saved_query_job` (`job_id`, `query_sql`, `state`, `done_progress`, `is_failed`, `result_message`, `elapsed`, `rows_read`, `bytes_read`, `result_bytes`, `result_rows`, `is_valid`, `start_time`, `end_time`, `last_update_time`, `generated_time`) VALUES ('0450dd9248d172aed6aac55e62f7d128', 'SELECT count(apn) AS \"APN\" FROM security_event WHERE recv_time >= UNIX_TIMESTAMP(\'2023-12-27T09:05:44Z\') AND recv_time < UNIX_TIMESTAMP(\'2024-01-03T09:05:44Z\') AND security_event.vsys_id IN (1) ORDER BY \"APN\" DESC', 'PENDING', 0, 0, '', 0, 0, 0, 0, 0, 1, null, null, 1704272820, 1704272794); +INSERT INTO `saved_query_job` (`job_id`, `query_sql`, `state`, `done_progress`, `is_failed`, `result_message`, `elapsed`, `rows_read`, `bytes_read`, `result_bytes`, `result_rows`, `is_valid`, `start_time`, `end_time`, `last_update_time`, `generated_time`) VALUES ('04758d6dd22b008c4c66714b7caafd63', 'SELECT COUNT(sled_ip) AS \"Sled IP\", COUNT(client_asn) AS \"Client ASN\" FROM security_event WHERE recv_time >= UNIX_TIMESTAMP(\'2023-12-20T09:31:16Z\') AND recv_time < UNIX_TIMESTAMP(\'2023-12-20T10:31:16Z\') AND security_event.vsys_id IN (6) ORDER BY \"Sled IP\" DESC', 'DONE', 1.00, 0, 'OK', 100, 100, 100, 100, 1, 1, 1703068470, 1703068485, 1703068485, 1703068455); +INSERT INTO `saved_query_job` (`job_id`, `query_sql`, `state`, `done_progress`, `is_failed`, `result_message`, `elapsed`, `rows_read`, `bytes_read`, `result_bytes`, `result_rows`, `is_valid`, `start_time`, `end_time`, `last_update_time`, `generated_time`) VALUES ('155c91cef347d3afa015076df3e9395b', 'SELECT COUNT(sent_bytes) AS \"Bytes Sent\" FROM security_event WHERE recv_time >= UNIX_TIMESTAMP(\'2023-12-10T16:00:00Z\') AND recv_time < UNIX_TIMESTAMP(\'2023-12-17T16:00:00Z\') AND security_event.vsys_id IN (1) ORDER BY \"Bytes Sent\" DESC', 'PENDING', 0, 0, '', 0, 0, 0, 0, 0, 1, 1703376015, 1703376030, 1703376030, 1703376008);
\ No newline at end of file diff --git a/src/test/resources/parameters/executeProcessTest.json b/src/test/resources/parameters/executeProcessTest.json new file mode 100644 index 0000000..8a3d4a3 --- /dev/null +++ b/src/test/resources/parameters/executeProcessTest.json @@ -0,0 +1,170 @@ +{ + "reSetJob": { + "jobId": "04758d6dd22b008c4c66714b7caafd63", + "querySql": "", + "state": "DONE", + "resultName": "", + "doneProgress": 1.00, + "isFailed": 0, + "resultMessage": "OK", + "elapsed": 1, + "rowsRead": 100, + "bytesRead": 100, + "resultRows": 1, + "resultBytes": 100, + "isValid": "", + "startTime": 1703399415, + "endTime": 1703399430, + "lastUpdateTime": "", + "generatedTime": "", + "queryId": "", + "executeStatus": "", + "result": "", + "memoryUsage": "", + "queryDurationMs": "" + }, + "cancelJob": { + "jobId": "04758d6dd22b008c4c66714b7caafd63", + "querySql": "", + "state": "DONE", + "resultName": "", + "doneProgress": 1.00, + "isFailed": 0, + "resultMessage": "OK", + "elapsed": 1, + "rowsRead": 100, + "bytesRead": 100, + "resultRows": 1, + "resultBytes": 100, + "isValid": 0, + "startTime": 1703399415, + "endTime": 1703399430, + "lastUpdateTime": "", + "generatedTime": "", + "queryId": "", + "executeStatus": "", + "result": "", + "memoryUsage": "", + "queryDurationMs": "" + }, + "okJob": { + "jobId": "0550bf2eb3ed6ff0351554c085367286", + "querySql": "SELECT AVG(sent_bytes) / 300 AS \"Bytes Sent\", AVG(received_bytes) / 300 AS \"Bytes Received\", FROM_UNIXTIME(TIME_FLOOR_WITH_FILL(recv_time, 'PT5M', 'zero')) AS \"Receive Time\" FROM security_event WHERE recv_time >= UNIX_TIMESTAMP('2023-12-22T10:00:00+08:00') AND recv_time < UNIX_TIMESTAMP('2023-12-23T10:00:00+08:00') AND security_event.vsys_id IN (1) GROUP BY \"Receive Time\" ORDER BY \"Receive Time\" ASC", + "state": "RUNNING", + "resultName": "", + "doneProgress": 0, + "isFailed": 0, + "resultMessage": "", + "elapsed": 1, + "rowsRead": 100, + "bytesRead": 100, + "resultRows": 1, + "resultBytes": 100, + "isValid": 1, + "startTime": 1703399415, + "endTime": 1703399430, + "lastUpdateTime": "", + "generatedTime": "", + "queryId": "", + "executeStatus": 20000666, + "result": "", + "memoryUsage": "", + "queryDurationMs": "" + }, + "dataErrorJob": { + "jobId": "", + "querySql": "SELECT AVG(sent_bytes) / 300 AS \"Bytes Sent\", AVG(received_bytes) / 300 AS \"Bytes Received\", FROM_UNIXTIME(TIME_FLOOR_WITH_FILL(recv_time, 'PT5M', 'zero')) AS \"Receive Time\" FROM security_event WHERE recv_time >= UNIX_TIMESTAMP('2023-12-22T10:00:00+08:00') AND recv_time < UNIX_TIMESTAMP('2023-12-23T10:00:00+08:00') AND security_event.vsys_id IN (1) GROUP BY \"Receive Time\" ORDER BY \"Receive Time\" ASC", + "state": "RUNNING", + "resultName": "", + "doneProgress": 0, + "isFailed": 0, + "resultMessage": "", + "elapsed": 1, + "rowsRead": 100, + "bytesRead": 100, + "resultRows": 1, + "resultBytes": 100, + "isValid": 1, + "startTime": 1703399415, + "endTime": 1703399430, + "lastUpdateTime": "", + "generatedTime": "", + "queryId": "", + "executeStatus": 20000666, + "result": "", + "memoryUsage": "", + "queryDurationMs": "" + }, + "paramErrorJob": { + "jobId": "0550bf2eb3ed6ff0351554c085367286", + "querySql": "SELECT AVG(sent_bytes) / 300 AS \"Bytes Sent\", AVG(received_bytes) / 300 AS \"Bytes Received\", FROM_UNIXTIME(TIME_FLOOR_WITH_FILL(recv_time, 'PT5M', 'zero')) AS \"Receive Time\" FROM security_event WHERE recv_time >= UNIX_TIMESTAMP('2023-12-22T10:00:00+08:00') AND recv_time < UNIX_TIMESTAMP('2023-12-23T10:00:00+08:00') AND security_event.vsys_id IN (1) GROUP BY \"Receive Time\" ORDER BY \"Receive Time\" ASC", + "state": "RUNNING", + "resultName": "", + "doneProgress": 0, + "isFailed": 0, + "resultMessage": "", + "elapsed": 1, + "rowsRead": 100, + "bytesRead": 100, + "resultRows": 1, + "resultBytes": 100, + "isValid": 1, + "startTime": 1703399415, + "endTime": 1703399430, + "lastUpdateTime": "", + "generatedTime": "", + "queryId": "", + "executeStatus": 40000100, + "result": "", + "memoryUsage": "", + "queryDurationMs": "" + }, + "SQLErrorJob": { + "jobId": "0550bf2eb3ed6ff0351554c085367286", + "querySql": "SELECT AVG(sent_bytes) / 300 AS \"Bytes Sent\", AVG(received_bytes) / 300 AS \"Bytes Received\", FROM_UNIXTIME(TIME_FLOOR_WITH_FILL(recv_time, 'PT5M', 'zero')) AS \"Receive Time\" FROM security_event WHERE recv_time >= UNIX_TIMESTAMP('2023-12-22T10:00:00+08:00') AND recv_time < UNIX_TIMESTAMP('2023-12-23T10:00:00+08:00') AND security_event.vsys_id IN (1) GROUP BY \"Receive Time\" ORDER BY \"Receive Time\" ASC", + "state": "RUNNING", + "resultName": "", + "doneProgress": 0, + "isFailed": 0, + "resultMessage": "", + "elapsed": 1, + "rowsRead": 100, + "bytesRead": 100, + "resultRows": 1, + "resultBytes": 100, + "isValid": 1, + "startTime": 1703399415, + "endTime": 1703399430, + "lastUpdateTime": "", + "generatedTime": "", + "queryId": "", + "executeStatus": 50001300, + "result": "", + "memoryUsage": "", + "queryDurationMs": "" + }, + "unknownErrorJob": { + "jobId": "0550bf2eb3ed6ff0351554c085367286", + "querySql": "SELECT AVG(sent_bytes) / 300 AS \"Bytes Sent\", AVG(received_bytes) / 300 AS \"Bytes Received\", FROM_UNIXTIME(TIME_FLOOR_WITH_FILL(recv_time, 'PT5M', 'zero')) AS \"Receive Time\" FROM security_event WHERE recv_time >= UNIX_TIMESTAMP('2023-12-22T10:00:00+08:00') AND recv_time < UNIX_TIMESTAMP('2023-12-23T10:00:00+08:00') AND security_event.vsys_id IN (1) GROUP BY \"Receive Time\" ORDER BY \"Receive Time\" ASC", + "state": "RUNNING", + "resultName": "", + "doneProgress": 0, + "isFailed": 0, + "resultMessage": "", + "elapsed": 1, + "rowsRead": 100, + "bytesRead": 100, + "resultRows": 1, + "resultBytes": 100, + "isValid": 1, + "startTime": 1703399415, + "endTime": 1703399430, + "lastUpdateTime": "", + "generatedTime": "", + "queryId": "", + "executeStatus": 50001500, + "result": "", + "memoryUsage": "", + "queryDurationMs": "" + } +} diff --git a/src/test/resources/parameters/executeTest.json b/src/test/resources/parameters/executeTest.json new file mode 100644 index 0000000..8f59844 --- /dev/null +++ b/src/test/resources/parameters/executeTest.json @@ -0,0 +1,26 @@ +{ + "executeJob": { + "jobId": "0450dd9248d172aed6aac55e62f7d128", + "querySql": "SELECT count(apn) AS \"APN\" FROM security_event WHERE recv_time >= UNIX_TIMESTAMP('2023-12-27T09:05:44Z') AND recv_time < UNIX_TIMESTAMP('2024-01-03T09:05:44Z') AND security_event.vsys_id IN (1) ORDER BY \"APN\" DESC", + "state": "RUNNING", + "resultName": "", + "doneProgress": 0, + "isFailed": 0, + "resultMessage": "", + "elapsed": null, + "rowsRead": null, + "bytesRead": null, + "resultRows": null, + "resultBytes": null, + "isValid": "", + "startTime": null, + "endTime": null, + "lastUpdateTime": "", + "generatedTime": "", + "queryId": "", + "executeStatus": "", + "result": "", + "memoryUsage": "", + "queryDurationMs": "" + } +}
\ No newline at end of file diff --git a/src/test/resources/parameters/hBaseTest.json b/src/test/resources/parameters/hBaseTest.json new file mode 100644 index 0000000..2162c5a --- /dev/null +++ b/src/test/resources/parameters/hBaseTest.json @@ -0,0 +1,26 @@ +{ + "putJob": { + "jobId": "0450dd9248d172aed6aac55e62f7d128", + "querySql": "SELECT count(apn) AS \"APN\" FROM security_event WHERE recv_time >= UNIX_TIMESTAMP('2023-12-27T09:05:44Z') AND recv_time < UNIX_TIMESTAMP('2024-01-03T09:05:44Z') AND security_event.vsys_id IN (1) ORDER BY \"APN\" DESC", + "state": "DONE", + "resultName": "", + "doneProgress": 1.00, + "isFailed": 0, + "resultMessage": "OK", + "elapsed": 1, + "rowsRead": 100, + "bytesRead": 100, + "resultRows": 1, + "resultBytes": 100, + "isValid": "", + "startTime": 1703399415, + "endTime": 1703399430, + "lastUpdateTime": "", + "generatedTime": "", + "queryId": "", + "executeStatus": "", + "result": "", + "memoryUsage": "", + "queryDurationMs": "" + } +}
\ No newline at end of file diff --git a/src/test/resources/parameters/mysqlTest.json b/src/test/resources/parameters/mysqlTest.json new file mode 100644 index 0000000..7ed1831 --- /dev/null +++ b/src/test/resources/parameters/mysqlTest.json @@ -0,0 +1,50 @@ +{ + "updateProcessesSuccessJob": { + "jobId": "00838b9d94552afecc6fa29cde033714", + "querySql": "", + "state": "DONE", + "resultName": "", + "doneProgress": 1.00, + "isFailed": 0, + "resultMessage": "OK", + "elapsed": 1, + "rowsRead": 100, + "bytesRead": 100, + "resultRows": 1, + "resultBytes": 100, + "isValid": "", + "startTime": 1703399415, + "endTime": 1703399430, + "lastUpdateTime": "", + "generatedTime": "", + "queryId": "", + "executeStatus": "", + "result": "", + "memoryUsage": "", + "queryDurationMs": "" + }, + "updateProcessesErrorJob": { + "jobId": "e70afb932207d556b20da4da5ec7ff38", + "querySql": "", + "state": "DONE", + "resultName": "", + "doneProgress": 1.00, + "isFailed": 0, + "resultMessage": "OK", + "elapsed": 1, + "rowsRead": 100, + "bytesRead": 100, + "resultRows": 1, + "resultBytes": 100, + "isValid": "", + "startTime": 1703399415, + "endTime": 1703399430, + "lastUpdateTime": "", + "generatedTime": "", + "queryId": "", + "executeStatus": "", + "result": "", + "memoryUsage": "", + "queryDurationMs": "" + } +}
\ No newline at end of file diff --git a/src/test/resources/parameters/queryGatewayTest.json b/src/test/resources/parameters/queryGatewayTest.json new file mode 100644 index 0000000..fc36ed0 --- /dev/null +++ b/src/test/resources/parameters/queryGatewayTest.json @@ -0,0 +1,6 @@ +{ + "querySqlSuccess": "select 1 from session_record limit 1", + "querySqlError": "select 1 from session_record_cn limit 1", + "queryForProcessIdError": "", + "queryForCancelIdError": "1:1" +} |
