diff options
| author | LAPTOP-CUUVN8AS\wk <[email protected]> | 2020-09-08 11:34:15 +0800 |
|---|---|---|
| committer | LAPTOP-CUUVN8AS\wk <[email protected]> | 2020-09-08 11:34:15 +0800 |
| commit | 37f0bc3d31ccacce76ff0ac9ed9c77fa99cdf9fb (patch) | |
| tree | 9338178d91b6de75e220f0ff28d86ce0945a4be9 | |
| parent | 631c932206ec585d7058f2c54671492097809558 (diff) | |
改为单线程执行任务解决任务停止bug
14 files changed, 280 insertions, 622 deletions
diff --git a/config/application.properties b/config/application.properties index 33e3853..e147191 100644 --- a/config/application.properties +++ b/config/application.properties @@ -1,79 +1,60 @@ -server.port=9093 +#Hbasehttp�Ķ˿� +server.port=9094 #���½�������ʱ��10s scan.result.scheduled.plan=0/10 * * * * ? -#��ʱɨ��mysql���ݿ�report_result��ʱ��15s -scan.task.scheduled.plan=0/15 * * * * ? #ͬʱ��ִ�����߳��� globle.job_thread=2 #Hbasehttp�Ķ˿� #Hbase�ı���������ͨ������Ҫ���� hbase.table=tsg:report_result - -hbase.zookeeper_quorum=192.168.40.193 +hbase.zookeeper_quorum=192.168.40.224 hbase.zookeeper_property_clientPort=2181 hbase.zookeeper_znode_parent=/hbase hbase.client_retries_number=3 - hbase.rpc_timeout=100000 hbase.connect_pool=10 - - +#��ѯ����ip ck.gateway_ip=192.168.40.223:9999 +#mariadb��url +spring.datasource.url=jdbc:mariadb://192.168.10.12:3306/tttreport?serverTimezone=GMT&useUnicode=true&characterEncoding=UTF-8&autoReconnect=true&failOverReadOnly=false +#spring.datasource.url=jdbc:mysql://192.168.11.208:3306/ntc-api?serverTimezone=GMT&useUnicode=true&characterEncoding=UTF-8&autoReconnect=true&failOverReadOnly=false +#mariadb���û��� +spring.datasource.username=root +#mariadb������ +spring.datasource.password=111111 + +#zk��Ⱥ��ip +zookeeper.connectString=192.168.40.203:2181 +#�Ƿ�����zookeeper 0����(��Ⱥ) 1����(����) +zookeeper.open=1 +zookeeper.retryCount=5 +zookeeper.elapsedTimeMs=1000 +zookeeper.sessionTimeoutMs=5000 +zookeeper.connectionTimeoutMs=5000 +zookeeper.nameSpace=reportservice -#clickhouse�ṩreport�����ip -#ck.task_ip=192.168.40.224:8123 -#clickhouse�����ݿ��� -#ck.task_database=tsg_galaxy_v3 -#clickhouse����������û� -#ck.task_user=tsg_report -#clickhouse������������� -#ck.task_user_password=ceiec2019 -#clickhouse��־����ipͨ��=ck.task_ip -#ck.log_ip=192.168.40.224:8123 -#clickhouse��־������û� -#ck.log_user=tsg_report -#clickhouse��־�û����� -#ck.log_user_password=ceiec2019 #��������� http.maxTotal=300 #������ http.defaultMaxPerRoute=100 #�������ӵ��ʱ�� http.connectTimeout=10000 -#�����ӳ��л�ȡ�����ӵ��ʱ��: +#�����ӳ��л�ȡ�����ӵ��ʱ�� http.connectionRequestTimeout=10000 #���ݴ�����ʱ�� -http.socketTimeout=21600500 +http.socketTimeout=21600000 #�ύ����ǰ���������Ƿ���� http.staleConnectionCheckEnabled=true - http.socketTimeoutShort=30000 - -#�Ƿ�����zookeeper 0����(��Ⱥ) 1����(����) -zookeeper.open=1 -zookeeper.retryCount=5 -zookeeper.elapsedTimeMs=1000 -#zk��Ⱥ��ip -zookeeper.connectString=192.168.40.193:2181 -zookeeper.sessionTimeoutMs=5000 -zookeeper.connectionTimeoutMs=5000 -zookeeper.nameSpace=reportservice -#mariadb��url -spring.datasource.url=jdbc:mariadb://192.168.40.204:3306/trepor?serverTimezone=GMT&useUnicode=true&characterEncoding=UTF-8&autoReconnect=true&failOverReadOnly=false -#spring.datasource.url=jdbc:mysql://192.168.11.208:3306/ntc-api?serverTimezone=GMT&useUnicode=true&characterEncoding=UTF-8&autoReconnect=true&failOverReadOnly=false -#mariadb���û��� -spring.datasource.username=root -#mariadb������ -spring.datasource.password=111111 - +#�������ò���Ҫ����ͨ�� spring.datasource.name=druidDataSource spring.datasource.type=com.alibaba.druid.pool.DruidDataSource spring.datasource.driver-class-name=org.mariadb.jdbc.Driver #���ü��ͳ�����ص�filters��ȥ�����ؽ���SQL������ͳ�ƣ���wall�����ڷ���ǽ spring.datasource.druid.filters=stat,wall,slf4j -#���������S +#��������� spring.datasource.druid.max-active=30 #��С������ spring.datasource.druid.min-idle=1 @@ -91,8 +72,7 @@ spring.datasource.druid.validation-query=select 1 # ִ��validationQuery��������Ƿ���Ч spring.datasource.druid.test-while-idle=true #��������ʱִ��validationQuery��������Ƿ���Ч������������ûή������ -spring.datasource.druid.test-on-borrow=false -#�黹����ʱִ��validationQuery��������Ƿ���Ч������������ûή������ +spring.datasource.druid.test-on-borrow=true spring.datasource.druid.test-on-return=false spring.datasource.druid.connection-properties=druid.stat.mergeSql=true;druid.stat.slowSqlMillis=500 #�Ƿ���WebStatFilter @@ -110,9 +90,6 @@ spring.datasource.druid.stat-view-servlet.login-username=admin spring.datasource.druid.stat-view-servlet.login-password=admin #Spring��أ����ڲ����ӿڵ��õļ��,��Ҫ����aop��ذ� spring.datasource.druid.aop-patterns=com.mesa.reportservice.controller.*,com.mesa.reportservice.service.*,com.mesa.reportservice.mapper.* - - - mybatis.typeAliasesPackage=com.mesa.reportservice.bean mybatis.mapperLocations=classpath*:/mappers/*.xml spring.application.name=ReportserviceApplication @@ -10,7 +10,7 @@ </parent> <groupId>com.mesa</groupId> <artifactId>galaxy-report-service</artifactId> - <version>20.08.31</version> + <version>20.09.07</version> <name>galaxy-report-service</name> <description>Demo project for Spring Boot</description> @@ -211,7 +211,7 @@ <forceTags>true</forceTags> <imageTags> - <imageTag>20.08.31</imageTag> + <imageTag>20.09.07</imageTag> </imageTags> <!--远程docker构建,供dockerfile使用--> <dockerHost>http://192.168.40.153:2375</dockerHost> diff --git a/src/main/java/com/mesa/reportservice/bean/JobEntity.java b/src/main/java/com/mesa/reportservice/bean/JobEntity.java index 63e108d..de10222 100644 --- a/src/main/java/com/mesa/reportservice/bean/JobEntity.java +++ b/src/main/java/com/mesa/reportservice/bean/JobEntity.java @@ -53,6 +53,8 @@ public class JobEntity implements Cloneable { private int isValid; + private long beginTime; + public Integer getResultId() { return resultId; } @@ -240,6 +242,14 @@ public class JobEntity implements Cloneable { this.isValid = isValid; } + public long getBeginTime() { + return beginTime; + } + + public void setBeginTime(long beginTime) { + this.beginTime = beginTime; + } + @Override public Object clone() { JobEntity o = null; diff --git a/src/main/java/com/mesa/reportservice/configuration/ClickhouseConfig.java b/src/main/java/com/mesa/reportservice/configuration/ClickhouseConfig.java index 87f0b0d..983048c 100644 --- a/src/main/java/com/mesa/reportservice/configuration/ClickhouseConfig.java +++ b/src/main/java/com/mesa/reportservice/configuration/ClickhouseConfig.java @@ -14,21 +14,10 @@ import java.net.URLEncoder; @ConfigurationProperties(prefix = "ck") public class ClickhouseConfig { - /* private static String task_ip; - private static String task_database; - private static String task_user; - private static String task_user_password; - private static String log_ip;*/ private static String gateway_ip; - // private static String log_database; - /*private static String log_user; - private static String log_user_password;*/ - public static String getJobUrl(String sql, Integer reportId) { - String url = "http://" + gateway_ip.trim() + "/?option=long-term&reportId=" + reportId + "&query="; String jobsql = ""; - try { sql = URLEncoder.encode(sql , "utf8").replaceAll("\\+", "%20"); @@ -45,12 +34,10 @@ public class ClickhouseConfig { String url = "http://" + gateway_ip.trim() + "/?query="; String processsql = ""; try { - String sql = URLEncoder.encode("select elapsed,total_rows_approx,read_rows from `system`.processes where query_id='" + query_id + "'", "utf8").replaceAll("\\+", "%20"); processsql = url + sql; } catch (UnsupportedEncodingException e) { Logs.error(e.toString()); - } return processsql; } @@ -60,12 +47,10 @@ public class ClickhouseConfig { String url = "http://" + gateway_ip.trim() + "/?query="; String finishsql = ""; try { - String sql = URLEncoder.encode("select CAST(type, 'Int8') as type,read_rows,query_duration_ms,query,exception,memory_usage,event_time,result_rows,result_bytes from `system`.query_log where type>1 and query_id='" + query_id + "' order by event_time desc limit 1", "utf8").replaceAll("\\+", "%20"); finishsql = url + sql; } catch (UnsupportedEncodingException e) { Logs.error(e.toString()); - } return finishsql; } @@ -73,117 +58,9 @@ public class ClickhouseConfig { public static String getKillUrl(String query_id) { String url = "http://" + gateway_ip.trim() + "/sys/engine/tasks/"+query_id; - - //String killsql = ""; - return url; } - - /*public static String getFormatUrl(String sql) { - - String formaturl = "http://" + gateway_ip.trim() + "/?option=syntax-check"+ "&query="; - - try { - - sql = URLEncoder.encode(sql, "utf8"); - sql= sql.replaceAll("\\+", "%20"); - formaturl = formaturl + sql; - } catch (UnsupportedEncodingException e) { - Logs.error(e.toString()); - - } - - return formaturl; - }*/ - public void setGateway_ip(String gateway_ip) { ClickhouseConfig.gateway_ip = gateway_ip; } - - -/* public static String getJobUrl(String sql, String query_id) { - - String url = "http://" + task_ip.trim() + "/?user=" + task_user.trim() + "&password=" + task_user_password.trim() - + "&database=" + task_database.trim() + "&query_id=" + query_id + "&query="; - String jobsql = ""; - - try { - jobsql = url + URLEncoder.encode(sql + " FORMAT JSON", "utf8"); - - - } catch (UnsupportedEncodingException e) { - Logs.error(e.toString()); - } - return jobsql; - } - - public static String getProcessUrl(String query_id) { - - String url = "http://" + log_ip.trim() + "/?user=" + log_user.trim() + "&password=" + log_user_password.trim() - + "&database=system" + "&query="; - String processsql = ""; - // String logsql = "select elapsed,total_rows_approx,read_rows from `system`.processes where query_id='" + query_id + "' FORMAT JSONEachRow"; - try { - - String sql = URLEncoder.encode("select elapsed,total_rows_approx,read_rows from `system`.processes where query_id='" + query_id + "' FORMAT JSONEachRow", "utf8"); - processsql = url + sql; - } catch (UnsupportedEncodingException e) { - Logs.error(e.toString()); - - } - return processsql; - } - - public static String getFinishUrl(String query_id) { - - String url = "http://" + log_ip.trim() + "/?user=" + log_user.trim() + "&password=" + log_user_password.trim() - + "&database=system" + "&query="; - String finishsql = ""; - // String logsql = "select elapsed,total_rows_approx,read_rows from `system`.processes where query_id='" + query_id + "' FORMAT JSONEachRow"; - try { - - String sql = URLEncoder.encode("select CAST(type, 'Int8') as type,read_rows,query_duration_ms,query,exception,memory_usage,event_time,result_rows,result_bytes from `system`.query_log where type>1 and query_id='" + query_id + "' order by event_time desc limit 1 FORMAT JSONEachRow", "utf8"); - finishsql = url + sql; - } catch (UnsupportedEncodingException e) { - Logs.error(e.toString()); - - } - return finishsql; - } - - public static String getKillUrl(String query_id) { - - String url = "http://" + task_ip.trim() + "/?user=" + task_user.trim() + "&password=" + task_user_password.trim() - + "&max_execution_time=60" + "&query="; - - String killsql = ""; - - try { - - String sql = URLEncoder.encode("KILL QUERY WHERE query_id='" + query_id + "'", "utf8"); - killsql = url + sql; - } catch (UnsupportedEncodingException e) { - Logs.error(e.toString()); - - } - - return killsql; - } - - public static String getFormatUrl(String sql) { - - String formaturl = "http://" + gateway_ip.trim() + "/?option=syntax-check"+ "&query="; - - try { - - sql = URLEncoder.encode(sql, "utf8"); - sql= sql.replaceAll("\\+", "%20"); - formaturl = formaturl + sql; - } catch (UnsupportedEncodingException e) { - Logs.error(e.toString()); - - } - - return formaturl; - }*/ } diff --git a/src/main/java/com/mesa/reportservice/configuration/GlobelConfig.java b/src/main/java/com/mesa/reportservice/configuration/GlobelConfig.java index fd83b41..643d198 100644 --- a/src/main/java/com/mesa/reportservice/configuration/GlobelConfig.java +++ b/src/main/java/com/mesa/reportservice/configuration/GlobelConfig.java @@ -15,23 +15,9 @@ import java.util.concurrent.ConcurrentHashMap; @ConfigurationProperties(prefix = "globle") public class GlobelConfig { - - // public static ExecutorService pool = Executors.newFixedThreadPool(30); - // public static int query_log_sleep; - //public static int loop_number; public static int job_thread; - public static int ck_timeout; public static Map<String, JobEntity> mapresult = new ConcurrentHashMap<>(); - - - public void setCk_timeout(int ck_timeout) { - GlobelConfig.ck_timeout = ck_timeout; - } - - public void setJob_thread(int job_thread) { GlobelConfig.job_thread = job_thread; } - - } diff --git a/src/main/java/com/mesa/reportservice/configuration/HbaseFactory.java b/src/main/java/com/mesa/reportservice/configuration/HbaseFactory.java index 1fac959..0861eff 100644 --- a/src/main/java/com/mesa/reportservice/configuration/HbaseFactory.java +++ b/src/main/java/com/mesa/reportservice/configuration/HbaseFactory.java @@ -34,15 +34,9 @@ public class HbaseFactory { conf.set("hbase.zookeeper.quorum", hbproperties.getZookeeper_quorum()); conf.set("hbase.zookeeper.property.clientPort", hbproperties.getZookeeper_property_clientPort()); conf.set("hbase.zookeeper.znode.parent", hbproperties.getZookeeper_znode_parent()); - // conf.set("hase.client.pause", client_pause); conf.set("hbase.client.retries.number", hbproperties.getClient_retries_number()); conf.set("hbase.rpc.timeout", hbproperties.getRpc_timeout()); - // conf.set("hbase.client.operation.timeout", client_operation_timeout); - // conf.set("hbase.client.scanner.timeout.period", client_scanner_timeout_period); conf.set("hbase.client.keyvalue.maxsize", "1024000000"); - //conf.set("hbase.client.write.buffer", "20971520"); - - } return conf; @@ -63,44 +57,4 @@ public class HbaseFactory { } - /*@Bean(name = "hbaseTable") - public Table getTable(@Qualifier("hbaseConnection") Connection con) { - - Table table=null; - try { - - ExecutorService pool = Executors.newFixedThreadPool(10); - BufferedMutatorParams bmp = new BufferedMutatorParams(); - ss.writeBufferSize(); - ss.setWriteBufferPeriodicFlushTimerTickMs() - BufferedMutatorParams aaa = con.getBufferedMutator() - aaa.writeBufferSize(1000000); - - BufferedMutator aaa=null; - aaa - table = con.getTable(TableName.valueOf("tsg:t1"),pool); - } catch (IOException e) { - e.printStackTrace(); - } - return table; - - - }*/ - - /* @Bean(name = "hbaseExceptionListener") - public BufferedMutator.ExceptionListener getLishtener(@Qualifier("hbaseConnection") Connection con) { - - BufferedMutator.ExceptionListener listener = new BufferedMutator.ExceptionListener() { - - public void onException(RetriesExhaustedWithDetailsException e, BufferedMutator mutator) { - for (int i = 0; i < e.getNumExceptions(); i++) { - System.out.print("Failed to sent put " + e.getRow(i) + "."); - } - } - }; - return listener; - - - }*/ - } diff --git a/src/main/java/com/mesa/reportservice/pool/HttpClientPool.java b/src/main/java/com/mesa/reportservice/configuration/HttpClientPool.java index 94f5bf6..28d408b 100644 --- a/src/main/java/com/mesa/reportservice/pool/HttpClientPool.java +++ b/src/main/java/com/mesa/reportservice/configuration/HttpClientPool.java @@ -1,4 +1,4 @@ -package com.mesa.reportservice.pool; +package com.mesa.reportservice.configuration; import com.mesa.reportservice.util.Logs; import org.apache.http.client.config.RequestConfig; @@ -25,7 +25,7 @@ public class HttpClientPool { private Integer connectionRequestTimeout; - private Integer socketTimeout; + public static Integer socketTimeout; private boolean staleConnectionCheckEnabled; diff --git a/src/main/java/com/mesa/reportservice/controller/ScheduledResultController.java b/src/main/java/com/mesa/reportservice/controller/ScheduledResultController.java index 6269cda..a6b53b2 100644 --- a/src/main/java/com/mesa/reportservice/controller/ScheduledResultController.java +++ b/src/main/java/com/mesa/reportservice/controller/ScheduledResultController.java @@ -2,19 +2,21 @@ package com.mesa.reportservice.controller; import com.mesa.reportservice.bean.JobEntity; import com.mesa.reportservice.configuration.GlobelConfig; +import com.mesa.reportservice.configuration.HttpClientPool; import com.mesa.reportservice.service.ExcuteProcessService; +import com.mesa.reportservice.service.ExcuteService; import com.mesa.reportservice.service.MysqlService; import com.mesa.reportservice.service.ZkService; +import org.apache.commons.codec.digest.DigestUtils; import org.springframework.beans.factory.annotation.Autowired; -import org.springframework.scheduling.annotation.Async; import org.springframework.scheduling.annotation.EnableScheduling; import org.springframework.scheduling.annotation.Scheduled; import org.springframework.stereotype.Component; import java.util.List; import java.util.Map; - -import static com.mesa.reportservice.configuration.GlobelConfig.*; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; /** * @author wk1 @@ -26,29 +28,86 @@ public class ScheduledResultController { private final org.apache.log4j.Logger logger = org.apache.log4j.Logger.getLogger(this.getClass()); + protected static ExecutorService pool = Executors.newFixedThreadPool(30); + + @Autowired private MysqlService ms; - + @Autowired + private ExcuteService es; @Autowired private ExcuteProcessService eps; @Autowired private ZkService zs; - - @Async("resultExecutor") @Scheduled(cron = "${scan.result.scheduled.plan}") public void getExcuteResult() { try { if (zs.isMaster()) { logger.info("开始查看结果"); + //先查询数据库是否有异常状态任务,killquery List<JobEntity> joblist = ms.getJobForExcute(); for (JobEntity jobEntity : joblist) { - eps.getTaskProcess(jobEntity); + String sql = jobEntity.getQuerySql().trim(); + sql = sql.replace("$exe_time", "toDateTime('" + jobEntity.getIssuedTime().trim() + "')"); + //sql = sql.replace("$start_time", "toDateTime('" + jobEntity.getStartTime().trim() + "')"); + //sql = sql.replace("$end_time", "toDateTime('" + jobEntity.getEndTime().trim() + "')"); + String queryid = DigestUtils.md5Hex(jobEntity.getResultId() + sql); + jobEntity.setQuery_id(queryid); + + if (jobEntity.getIsValid() == 0) { + eps.killQuery(jobEntity); + GlobelConfig.mapresult.get(jobEntity.getQuery_id()).setIsValid(0); + } else if (!GlobelConfig.mapresult.containsKey(jobEntity.getQuery_id())) { + eps.killQuery(jobEntity); + } + } + //遍历内存中的任务对状态1的更新进度,其他更新数据库的状态 + for (Map.Entry<String, JobEntity> entry : GlobelConfig.mapresult.entrySet()) { + logger.info("key = " + entry.getKey() + ", value = " + entry.getValue().getStatus()); + long currentTime = System.currentTimeMillis(); + long excutetime =currentTime-entry.getValue().getBeginTime(); + logger.info("excute time="+excutetime+"ttl_time="+HttpClientPool.socketTimeout); + if (entry.getValue().getStatus()==1 && excutetime> HttpClientPool.socketTimeout+1){ + entry.getValue().setStatus(2); + entry.getValue().setExcute_status(500001); + eps.killQuery(entry.getValue()); + eps.updateResultMessage(entry.getValue()); + } + else{ + if (entry.getValue().getStatus() == 1) { + eps.updateProcessMessage(entry.getValue()); + } + else { + eps.updateResultMessage(entry.getValue()); + } + } + } + int rows = GlobelConfig.job_thread - GlobelConfig.mapresult.size(); + for (Map.Entry<String, JobEntity> entry : GlobelConfig.mapresult.entrySet()) { + logger.info("----key = " + entry.getKey() + ", value = " + entry.getValue().getStatus()); + System.out.print("----key = " + entry.getKey() + ", value = " + entry.getValue().getStatus()); + } + if (rows > 0) { + List<JobEntity> jobs = ms.getJobTask(rows); + for (JobEntity job : jobs) { + logger.info("开始执行任务"); + long begintime = System.currentTimeMillis(); + job.setBeginTime(begintime); + pool.execute(new Runnable() { + @Override + public void run() { + es.excuteCkTask(job); + } + }); + } + } else { + logger.info("无待执行任务"); + } } - } - catch (Exception e){ + } catch (Exception e) { logger.error(e.toString()); } } diff --git a/src/main/java/com/mesa/reportservice/controller/ScheduledTaskController.java b/src/main/java/com/mesa/reportservice/controller/ScheduledTaskController.java deleted file mode 100644 index 77790dc..0000000 --- a/src/main/java/com/mesa/reportservice/controller/ScheduledTaskController.java +++ /dev/null @@ -1,64 +0,0 @@ -package com.mesa.reportservice.controller; - -import com.mesa.reportservice.bean.JobEntity; -import com.mesa.reportservice.service.ExcuteService; -import com.mesa.reportservice.service.MysqlService; -import com.mesa.reportservice.service.ZkService; -import com.mesa.reportservice.configuration.GlobelConfig; -import com.mesa.reportservice.util.Logs; -import org.springframework.beans.factory.annotation.Autowired; -import org.springframework.scheduling.annotation.Async; -import org.springframework.scheduling.annotation.EnableScheduling; -import org.springframework.scheduling.annotation.Scheduled; -import org.springframework.stereotype.Component; - -import java.util.List; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; - -/** - * Created by wk1 on 2020/1/8. - */ -@Component -@EnableScheduling -public class ScheduledTaskController { - private final org.apache.log4j.Logger logger = org.apache.log4j.Logger.getLogger(this.getClass()); - - @Autowired - private MysqlService ms; - @Autowired - private ExcuteService es; - @Autowired - private ZkService zs; - - @Async("taskExecutor") - @Scheduled(cron = "${scan.task.scheduled.plan}") - public void excuteSqlToCk() { - - try { - if (zs.isMaster()) { - - int rows = GlobelConfig.job_thread - GlobelConfig.mapresult.size(); - logger.info("row=" + rows); - if (rows > 0) { - List<JobEntity> jobs = ms.getJobTask(1); - for (JobEntity job : jobs) { - logger.info("开始执行任务"); - es.excuteCkTask(job); - - } - } else { - logger.debug("无待执行任务"); - } - } else { - logger.debug("zk 不是主节点"); - - } - } - catch (Exception e){ - logger.error(e.toString()); - } - } - - -} diff --git a/src/main/java/com/mesa/reportservice/service/ExcuteProcessService.java b/src/main/java/com/mesa/reportservice/service/ExcuteProcessService.java index 54f64f6..d35f6d4 100644 --- a/src/main/java/com/mesa/reportservice/service/ExcuteProcessService.java +++ b/src/main/java/com/mesa/reportservice/service/ExcuteProcessService.java @@ -9,5 +9,7 @@ import java.util.List; */ public interface ExcuteProcessService { - void getTaskProcess(JobEntity job); + void updateResultMessage(JobEntity job); + void killQuery(JobEntity jobEntity); + void updateProcessMessage(JobEntity job); } diff --git a/src/main/java/com/mesa/reportservice/service/impl/ExcuteProcessServiceImpl.java b/src/main/java/com/mesa/reportservice/service/impl/ExcuteProcessServiceImpl.java index d459dae..b203ed5 100644 --- a/src/main/java/com/mesa/reportservice/service/impl/ExcuteProcessServiceImpl.java +++ b/src/main/java/com/mesa/reportservice/service/impl/ExcuteProcessServiceImpl.java @@ -9,8 +9,6 @@ import com.mesa.reportservice.service.ClickhouseService; import com.mesa.reportservice.service.ExcuteProcessService; import com.mesa.reportservice.service.HbaseService; import com.mesa.reportservice.service.MysqlService; -import com.mesa.reportservice.util.StringUtil; -import org.apache.commons.codec.digest.DigestUtils; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Service; @@ -34,149 +32,105 @@ public class ExcuteProcessServiceImpl implements ExcuteProcessService { private HbaseService hs; @Override - public void getTaskProcess(JobEntity jobEntity) { + public void updateResultMessage(JobEntity je) { - try { - String sql = jobEntity.getQuerySql().trim(); - sql = sql.replace("$exe_time", "toDateTime('" + jobEntity.getIssuedTime().trim() + "')"); - // sql = sql.replace("$start_time", "toDateTime('" + jobEntity.getStartTime().trim() + "')"); - // sql = sql.replace("$end_time", "toDateTime('" + jobEntity.getEndTime().trim() + "')"); - //job.setFormatSql(sql); - String queryid = DigestUtils.md5Hex(jobEntity.getResultId() + sql); - jobEntity.setQuery_id(queryid); - //程序异常停止重启后kill正在进行的查询,更新数据库状态重新执行 - if (!GlobelConfig.mapresult.containsKey(queryid)) { - String killurl = ClickhouseConfig.getKillUrl(queryid); - try { - logger.info("startkill++++++" + queryid); - cs.QuerySystemForDelete(killurl); - logger.info("endkill=======" + queryid); - - } catch (Exception e) { - logger.error(e.toString()); - } finally { - updateErrorMessage(jobEntity.getResultId(), 0, "Re Execution"); - } - } else { - if (jobEntity.getIsValid() == 0) { - String killurl = ClickhouseConfig.getKillUrl(queryid); - try { - logger.info("startkill++++++" + queryid); - - cs.QuerySystemForDelete(killurl); - logger.info("endkill=======" + queryid); - } catch (Exception e) { - logger.error(e.toString()); - } finally { - //updateErrorMessage(jobEntity.getResultId(), 7, "Re Execution"); - GlobelConfig.mapresult.get(queryid).setIsValid(0); - logger.error("user cancel resultid =" + GlobelConfig.mapresult.get(queryid).getResultId() + " excutesql=" + GlobelConfig.mapresult.get(queryid).getQuerySql() ); - } - } - JobEntity je = GlobelConfig.mapresult.get(queryid); - logger.info("startget++++++" + je.getQuery_id()); - - if (je.getStatus() == 1) { - je = getProcessMessage(je); - if (je.getExcuteRow() != 0 || je.getExcuteTime() != 0) { - ms.updateProcesses(je); - } - } else { - try { - if (je.getIsValid() == 0) { - updateErrorMessage(je.getResultId(), 9, "CANCEL"); - // logger.error("excute error resultid =" + je.getResultId() + " excutesql=" + je.getQuerySql() + " formatsql=" + je.getFormatSql()); + logger.info("startget++++++" + je.getQuery_id()); + try { + if (je.getIsValid() == 0) { + je.setStatus(9); + je.setExcuteDetail("CANCEL"); + } else { + switch (je.getExcute_status()) { + + case 200666: + Boolean isok = saveToHbase(je); + if (isok) { + je.setExcuteDetail("SUCCESS"); + je.setExcuteProcess(100); + je.setStatus(2); + logger.info("success save to hbase resultid =" + je.getResultId() + " excutesql=" + je.getQuerySql()); } else { - switch (je.getExcute_status()) { - - case 200666: - // je = getQueryLogMessage(je); - Boolean isok = saveToHbase(je); - if (isok) { - je.setExcuteDetail("SUCCESS"); - // je.setExcuteTime(0); - je.setExcuteProcess(100); - je.setStatus(2); - int number = 0; - int z = 3; - do { - number = ms.updateProcesses(je); - z--; - } - while (number != 1 && z >= 0); - logger.info("success save to hbase resultid =" + je.getResultId() + " excutesql=" + je.getQuerySql() ); - } else { - updateErrorMessage(je.getResultId(), 5, "Write Data Error"); - logger.error("save hbase error resultid =" + je.getResultId() + " excutesql=" + je.getQuerySql() ); - } - break; - - case 400001: - updateErrorMessage(je.getResultId(), 3, "Param Syntax Error"); - logger.error("Param Syntax Error resultid =" + je.getResultId() + " excutesql=" + je.getQuerySql() ); - break; - case 400010: - updateErrorMessage(je.getResultId(), 3, "SQL Syntax Error"); - logger.error("SQL Syntax Error resultid =" + je.getResultId() + " excutesql=" + je.getQuerySql() ); - break; - case 500001: - updateErrorMessage(je.getResultId(), 4, "SQL Execution Error"); - logger.error("SQL Execution Error resultid =" + je.getResultId() + " excutesql=" + je.getQuerySql() ); - break; - case 500010: - updateErrorMessage(je.getResultId(), 6, "Engine Statistics Error"); - logger.error("Engine Statistics Error resultid =" + je.getResultId() + " excutesql=" + je.getQuerySql() ); - break; - case 555999: - updateErrorMessage(je.getResultId(), 7, "unknow error"); - logger.error("unknow error resultid =" + je.getResultId() + " excutesql=" + je.getQuerySql() ); - break; - default: - // je = getQueryLogMessage(je); - updateErrorMessage(je.getResultId(), 8, "system error"); - logger.error("system error resultid =" + je.getResultId() + " excutesql=" + je.getQuerySql() ); - } + je.setStatus(5); + je.setExcuteDetail("Write Data Error"); + logger.error("save hbase error resultid =" + je.getResultId() + " excutesql=" + je.getQuerySql()); } - } catch (Exception e) { - // updateErrorMessage(je.getResultId(), 6, "Result Write To Database Error"); - logger.error("save database error queryid=" + je.getResultId() + e.toString()); - - } finally { - GlobelConfig.mapresult.remove(je.getQuery_id()); - } + break; + + case 400001: + je.setStatus(3); + je.setExcuteDetail("Param Syntax Error"); + logger.error("Param Syntax Error resultid =" + je.getResultId() + " excutesql=" + je.getQuerySql()); + break; + case 400010: + je.setStatus(3); + je.setExcuteDetail("SQL Syntax Error"); + logger.error("SQL Syntax Error resultid =" + je.getResultId() + " excutesql=" + je.getQuerySql()); + break; + case 500001: + je.setStatus(4); + je.setExcuteDetail("SQL Execution Error"); + logger.error("SQL Execution Error resultid =" + je.getResultId() + " excutesql=" + je.getQuerySql()); + break; + case 500010: + je.setStatus(6); + je.setExcuteDetail("Engine Statistics Error"); + logger.error("Engine Statistics Error resultid =" + je.getResultId() + " excutesql=" + je.getQuerySql()); + break; + case 555999: + je.setStatus(7); + je.setExcuteDetail("Unknow Error"); + logger.error("unknow error resultid =" + je.getResultId() + " excutesql=" + je.getQuerySql()); + break; + default: + je.setStatus(8); + je.setExcuteDetail("System Error"); + logger.error("system error resultid =" + je.getResultId() + " excutesql=" + je.getQuerySql()); } + + } + int number = 0; + int z = 3; + do { + number = ms.updateProcesses(je); + z--; } + while (number != 1 && z >= 0); } catch (Exception e) { - logger.error(e.toString()); + // updateErrorMessage(je.getResultId(), 6, "Result Write To Database Error"); + logger.error("save database error queryid=" + je.getResultId() + e.toString()); + } finally { + GlobelConfig.mapresult.remove(je.getQuery_id()); } - } - public void updateErrorMessage(int id, int status, String error) { - - - JobEntity js = new JobEntity(); + } + @Override + public void killQuery(JobEntity jobEntity) { + String killurl = ClickhouseConfig.getKillUrl(jobEntity.getQuery_id()); try { - js.setResultId(id); - js.setStatus(status); - js.setExcuteDetail(StringUtil.bSubstring(error, 1000)); + logger.info("startkill++++++" + jobEntity.getQuery_id()); + cs.QuerySystemForDelete(killurl); + logger.info("endkill=======" + jobEntity.getQuery_id()); + } catch (Exception e) { - logger.error("updateErrorMessage Error " + e.toString()); - js.setExcuteDetail("updateErrorMessage Error"); + logger.error(e.toString()); + } finally { + jobEntity.setStatus(0); + jobEntity.setExcuteDetail("Re Execution"); + ms.updateProcesses(jobEntity); } - ms.updateProcesses(js); - } + /** * 获取进度条信息 */ - public JobEntity getProcessMessage(JobEntity job) { - // JobEntity job = (JobEntity) jb.clone(); + @Override + public void updateProcessMessage(JobEntity job) { String queryProccess = ClickhouseConfig.getProcessUrl(job.getQuery_id()); HttpResult hr = null; try { @@ -185,41 +139,38 @@ public class ExcuteProcessServiceImpl implements ExcuteProcessService { Map data = JSON.parseObject(rs); if (!rs.isEmpty() && !rs.equals("")) { - //JobEntity job = new JobEntity(); List listdata = (List) data.get("data"); - if(listdata.size()>0) { + if (null != listdata && listdata.size() > 0) { Map map = JSON.parseObject(JSON.toJSONString(listdata.get(0))); - //float elapsed = Float.parseFloat(map.get("elapsed").toString()); long total_rows_approx = Long.parseLong(map.get("total_rows_approx").toString()); long read_rows = Long.parseLong(map.get("read_rows").toString()); - - /* int ftime = (int) (elapsed * total_rows_approx / read_rows); - ftime = Math.round((ftime - elapsed) * 10 * 1000) / 10; - if (ftime <= 0) { - ftime = 1; - }*/ - //logger.info("total_rows_approx" + total_rows_approx + "read_rows" + read_rows + "elapsed" + elapsed + "ftime" + ftime); + float elapsed = Float.parseFloat(map.get("elapsed").toString())*1000; double persent = (read_rows * 1.00 / total_rows_approx); int result = (int) (persent * 100); if (result > 98) { result = 98; } + job.setExcuteTime((int) elapsed); job.setExcuteRow(total_rows_approx); job.setExcuteProcess(result); - // job.setExcuteTime(ftime); + if (job.getExcuteRow() != 0 || job.getExcuteTime() != 0) { + ms.updateProcesses(job); + } + } else { + logger.error("responsedata is null"); } + } else { + logger.error("responsebody is null"); } } catch (Exception e) { logger.error(e.toString()); } - return job; } /** * 结果存入hbase */ public Boolean saveToHbase(JobEntity entity) { - int k = 3; Boolean isok = false; do { @@ -236,64 +187,4 @@ public class ExcuteProcessServiceImpl implements ExcuteProcessService { return isok; } - /** - * 获取查询log中的信息 - */ - /* public JobEntity getQueryLogMessage(JobEntity job) { - - if (job.getStatus() != 4) { - String finishurl = ClickhouseConfig.getFinishUrl(job.getQuery_id()); - HttpResult num = new HttpResult(); - int j = 0; - do { - try { - //ck默认最慢同步时间7500ms,循环三次 - // Thread.sleep(3000); - num = cs.QuerySystemForGet(finishurl); - if (num.getCode() == 200 && !num.getBody().isEmpty() && !num.getBody().equals("")) { - - Map mapresult = JSON.parseObject(num.getBody()); - int type = Integer.parseInt(mapresult.get("type").toString()); - logger.debug("type" + type); - switch (type) { - case 2: - job.setStatus(type); - job.setExcuteDetail("SUCCESS"); - job.setQuery_duration_ms(mapresult.get("query_duration_ms").toString()); - job.setMemory_usage(mapresult.get("memory_usage").toString()); - job.setExcuteRow(Long.parseLong(mapresult.get("read_rows").toString())); - job.setResultRows(Integer.valueOf(mapresult.get("result_rows").toString())); - break; - case 3: - job.setStatus(type); - job.setExcuteDetail("SQL Syntax Error:" + mapresult.get("exception").toString()); - - // job.setQuerySql(mapresult.get("query").toString()); - break; - case 4: - job.setStatus(type); - job.setExcuteDetail("SQL Execution Error:" + mapresult.get("exception").toString()); - // job.setQuerySql(mapresult.get("query").toString()); - break; - default: - break; - } - } else { - job.setStatus(6); - job.setExcuteDetail("Other ERROR "); - - } - } catch (Exception e) { - job.setStatus(6); - job.setExcuteDetail("Other ERROR: " + e.toString()); - logger.error("获取querylogmessage失败"); - } - j--; - } - while (job.getStatus() == 6 && j > 0); - return job; - } - - return job; - }*/ } diff --git a/src/main/java/com/mesa/reportservice/service/impl/ExcuteserviceImpl.java b/src/main/java/com/mesa/reportservice/service/impl/ExcuteserviceImpl.java index 3b6a03b..e732e9d 100644 --- a/src/main/java/com/mesa/reportservice/service/impl/ExcuteserviceImpl.java +++ b/src/main/java/com/mesa/reportservice/service/impl/ExcuteserviceImpl.java @@ -1,25 +1,20 @@ package com.mesa.reportservice.service.impl; import com.alibaba.fastjson.JSON; -import com.alibaba.fastjson.JSONObject; import com.mesa.reportservice.bean.HttpResult; import com.mesa.reportservice.bean.JobEntity; import com.mesa.reportservice.configuration.ClickhouseConfig; +import com.mesa.reportservice.configuration.GlobelConfig; import com.mesa.reportservice.service.ClickhouseService; import com.mesa.reportservice.service.ExcuteService; import com.mesa.reportservice.service.FormatService; import com.mesa.reportservice.service.MysqlService; -import com.mesa.reportservice.configuration.GlobelConfig; import io.netty.channel.ConnectTimeoutException; import org.apache.commons.codec.digest.DigestUtils; -import org.apache.http.util.EntityUtils; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Service; -import java.net.ConnectException; -import java.net.SocketException; import java.net.SocketTimeoutException; -import java.util.HashMap; import java.util.Map; import static com.mesa.reportservice.configuration.ClickhouseConfig.getJobUrl; @@ -46,116 +41,94 @@ public class ExcuteserviceImpl implements ExcuteService { String sql = job.getQuerySql().trim(); sql = sql.replace("$exe_time", "toDateTime('" + job.getIssuedTime().trim() + "')"); - // sql = sql.replace("$start_time", "toDateTime('" + job.getStartTime().trim() + "')"); - // sql = sql.replace("$end_time", "toDateTime('" + job.getEndTime().trim() + "')"); - //job.setFormatSql(sql); + //sql = sql.replace("$start_time", "toDateTime('" + job.getStartTime().trim() + "')"); + //sql = sql.replace("$end_time", "toDateTime('" + job.getEndTime().trim() + "')"); job.setQuerySql(sql); String queryid = DigestUtils.md5Hex(job.getResultId() + sql); + job.setQuery_id(queryid); + job.setQuerySql(sql); + job.setStatus(1); + job.setExcute_status(1); + job.setExcuteDetail("EXECUTING"); + job.setExcuteRow(0L); + job.setExcuteTime(0); + job.setExcuteProcess(0); + job.setResultRows(0); + //JobEntity jt = (JobEntity) job.clone(); + GlobelConfig.mapresult.put(queryid, job); + ms.updateProcesses(job); + logger.info("execute queryid=" + queryid + " sql=" + sql + "mapresult size=" + GlobelConfig.mapresult.size()); + String joburl = getJobUrl(job.getQuerySql(), job.getResultId()); + String killurl = ClickhouseConfig.getKillUrl(job.getQuery_id()); + HttpResult hr = new HttpResult(); + int k = 3; + do { + try { + hr = cs.sendQueryForGet(joburl); + logger.info("httpcode" + hr.getCode()); + + if (hr != null && hr.getCode() != 200) { + + k = 0; + Map mapresult = JSON.parseObject(hr.getBody()); + job.setExcute_status(Integer.parseInt(mapresult.get("code").toString())); + logger.error("excute sql Error "); + + } else { + + k = 0; + Map mapresult = JSON.parseObject(hr.getBody()); + + Map rows = (Map) mapresult.get("statistics"); + // Integer resultrows = (Integer) rows.get("result_rows"); + job.setResultRows(Integer.parseInt(rows.get("result_rows").toString())); + job.setExcuteRow(Long.parseLong(rows.get("rows_read").toString())); + job.setResult(hr.getBody()); + job.setExcuteTime((int) Float.parseFloat(rows.get("elapsed").toString())); + job.setExcute_status(Integer.parseInt(mapresult.get("code").toString())); + logger.info("success queryid=" + queryid + " sql=" + sql); - /* job.setFormatSql(sql); - try { - String formateUrl = getFormatUrl(sql); - sql = fs.doFormat(formateUrl); - } catch (Exception e) { - sql=""; - logger.error(e.toString()); - } + } + } catch (SocketTimeoutException e) { + k--; + job.setExcuteDetail(e.toString()); + if (k == 0) { + job.setExcute_status(500001); + job.setExcuteDetail("SQL Execution Error excute query time out"); + logger.info("timeout queryid=" + queryid + " sql=" + sql); + } else { + logger.info("Socket warn " + e.toString() + "retry time " + (3 - k)); + } - if (sql.equals("ok") || sql.equals("")) { - job.setStatus(3); - job.setExcuteTime(0); - job.setExcuteDetail("SQL Syntax Error parsing SQL Error"); - ms.updateStatue(job); - - - } else {*/ - job.setQuery_id(queryid); - job.setQuerySql(sql); - job.setStatus(1); - job.setExcute_status(1); - job.setExcuteDetail("EXECUTING"); - //JobEntity jt = (JobEntity) job.clone(); - GlobelConfig.mapresult.put(queryid, job); - ms.updateStatue(job); - logger.info("execute queryid=" + queryid + " sql=" + sql + "mapresult size=" + GlobelConfig.mapresult.size()); - String joburl = getJobUrl(job.getQuerySql(), job.getResultId()); - String killurl = ClickhouseConfig.getKillUrl(job.getQuery_id()); - HttpResult hr = new HttpResult(); - int k = 3; - do { - try { - hr = cs.sendQueryForGet(joburl); - logger.info("httpcode" + hr.getCode()); - - if (hr!=null &&hr.getCode() != 200 ) { - - k=0; - Map mapresult = JSON.parseObject(hr.getBody()); - job.setExcute_status(Integer.parseInt(mapresult.get("code").toString())); - logger.error("excute sql Error "); - - } else { - - k=0; - Map mapresult = JSON.parseObject(hr.getBody()); - - Map rows = (Map) mapresult.get("statistics"); - // Integer resultrows = (Integer) rows.get("result_rows"); - job.setResultRows(Integer.parseInt(rows.get("result_rows").toString())); - job.setExcuteRow(Long.parseLong(rows.get("rows_read").toString())); - job.setResult(hr.getBody()); - job.setExcuteTime(Math.round( Float.parseFloat(rows.get("elapsed").toString()))); - - job.setExcute_status(Integer.parseInt(mapresult.get("code").toString())); - logger.info("success queryid=" + queryid + " sql=" + sql); - - } - } catch (SocketTimeoutException e) { - k--; - job.setExcuteDetail(e.toString()); - if(k==0) { - job.setExcute_status(500001); - job.setExcuteDetail("SQL Execution Error excute query time out"); - logger.info("timeout queryid=" + queryid + " sql=" + sql); - } - else{ - logger.info("Socket warn " + e.toString() + "retry time " + (3 - k)); - } - - - } catch (ConnectTimeoutException e){ - - job.setExcute_status(555999); - job.setExcuteDetail(e.toString()); - logger.error("Unknow Error" + e.toString()); - k=0; - } - catch (OutOfMemoryError e){ + } catch (ConnectTimeoutException e) { - job.setExcute_status(555999); - job.setExcuteDetail("result too large"); - logger.error("outofmemery Error" + e.toString()); - k=0; - } - catch (Exception e) { - job.setExcute_status(555999); - job.setExcuteDetail(e.toString()); - logger.error("Unknow Error" + e.toString()); - k=0; - } + job.setExcute_status(555999); + job.setExcuteDetail(e.toString()); + logger.error("Unknow Error" + e.toString()); + k = 0; - try { - cs.QuerySystemForDelete(killurl); - } catch (Exception e) { - logger.error("Kill Query Error" + e.toString()); - } - } - while (k > 0); - job.setStatus(2); - // GlobelConfig.mapresult.put(job.getQuery_id(), job); + } catch (OutOfMemoryError e) { + job.setExcute_status(555999); + job.setExcuteDetail("result too large"); + logger.error("outofmemery Error" + e.toString()); + k = 0; + } catch (Exception e) { + job.setExcute_status(555999); + job.setExcuteDetail(e.toString()); + logger.error("Unknow Error" + e.toString()); + k = 0; + } + try { + cs.QuerySystemForDelete(killurl); + } catch (Exception e) { + logger.error("Kill Query Error" + e.toString()); + } + } + while (k > 0); + job.setStatus(2); } diff --git a/src/main/java/com/mesa/reportservice/service/impl/MysqlServiceImpl.java b/src/main/java/com/mesa/reportservice/service/impl/MysqlServiceImpl.java index 3cda945..1b216b2 100644 --- a/src/main/java/com/mesa/reportservice/service/impl/MysqlServiceImpl.java +++ b/src/main/java/com/mesa/reportservice/service/impl/MysqlServiceImpl.java @@ -62,13 +62,6 @@ public class MysqlServiceImpl implements MysqlService { String sDate = formatter.format(currentTime); java.sql.Timestamp nowTime = java.sql.Timestamp.valueOf(sDate); job.setOpTime(sDate); - /* if (job.getExcuteDetail().length() > 500) { - try { - job.setExcuteDetail(StringUtil.bSubstring(job.getExcuteDetail(), 500)); - } catch (Exception e) { - Logs.error(e.toString()); - } - }*/ return rrm.updateStatue(job); } }
\ No newline at end of file diff --git a/src/main/resources/config/application.properties b/src/main/resources/config/application.properties index 67ed66a..1d5bffa 100644 --- a/src/main/resources/config/application.properties +++ b/src/main/resources/config/application.properties @@ -2,9 +2,9 @@ server.port=9093 #���½�������ʱ��10s scan.result.scheduled.plan=0/10 * * * * ? #��ʱɨ��mysql���ݿ�report_result��ʱ��15s -scan.task.scheduled.plan=0/15 * * * * ? #ͬʱ��ִ�����߳��� globle.job_thread=2 +globle.ttl_time=21600600 #Hbasehttp�Ķ˿� #Hbase�ı���������ͨ������Ҫ���� hbase.table=tsg:report_result |
