diff options
14 files changed, 565 insertions, 210 deletions
@@ -9,7 +9,7 @@ <relativePath/> <!-- lookup parent from repository --> </parent> <groupId>com.mesa</groupId> - <artifactId>reportservice</artifactId> + <artifactId>galaxy-report-service</artifactId> <version>0.0.1</version> <name>galaxy-report-service</name> <description>Demo project for Spring Boot</description> diff --git a/src/main/java/com/mesa/reportservice/bean/JobEntity.java b/src/main/java/com/mesa/reportservice/bean/JobEntity.java index d51bf1c..48af9fb 100644 --- a/src/main/java/com/mesa/reportservice/bean/JobEntity.java +++ b/src/main/java/com/mesa/reportservice/bean/JobEntity.java @@ -3,7 +3,7 @@ package com.mesa.reportservice.bean; /** * Created by wk1 on 2019/5/20. */ -public class JobEntity { +public class JobEntity implements Cloneable{ private int result_id; private String result_name; @@ -22,8 +22,36 @@ public class JobEntity { private String query_duration_ms; private long read_rows; private String memory_usage; + private String result; + private int excute_status; + private String issued_time; + private String exception; + public String getException() { + return exception; + } + + public void setException(String exception) { + this.exception = exception; + } + + public String getIssued_time() { + return issued_time; + } + + public void setIssued_time(String issued_time) { + this.issued_time = issued_time; + } + + public int getExcute_status() { + return excute_status; + } + + public void setExcute_status(int excute_status) { + this.excute_status = excute_status; + } + public int getResult_id() { return result_id; } @@ -160,4 +188,24 @@ public class JobEntity { public void setMemory_usage(String memory_usage) { this.memory_usage = memory_usage; } + + public String getResult() { + return result; + } + + public void setResult(String result) { + this.result = result; + } + + + @Override + public Object clone() { + JobEntity o = null; + try { + o = (JobEntity) super.clone(); + } catch (CloneNotSupportedException e) { + System.out.println(e.toString()); + } + return o; + } } diff --git a/src/main/java/com/mesa/reportservice/dao/DruidPool.java b/src/main/java/com/mesa/reportservice/dao/DruidPool.java index 1cb5d52..5135e11 100644 --- a/src/main/java/com/mesa/reportservice/dao/DruidPool.java +++ b/src/main/java/com/mesa/reportservice/dao/DruidPool.java @@ -5,9 +5,13 @@ package com.mesa.reportservice.dao; */ import com.alibaba.druid.pool.DruidDataSource; +import com.alibaba.druid.pool.DruidDataSourceFactory; import com.alibaba.druid.support.http.StatViewServlet; import com.alibaba.druid.support.http.WebStatFilter; +import com.google.common.collect.Maps; import com.mesa.reportservice.util.Logs; +import org.junit.Test; +import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean; import org.springframework.boot.context.properties.ConfigurationProperties; import org.springframework.boot.web.servlet.FilterRegistrationBean; import org.springframework.boot.web.servlet.ServletRegistrationBean; @@ -15,7 +19,10 @@ import org.springframework.context.annotation.Bean; import org.springframework.stereotype.Component; import javax.sql.DataSource; -import java.sql.SQLException; +import java.io.InputStream; +import java.sql.*; +import java.util.Arrays; +import java.util.Properties; @Component @@ -94,12 +101,12 @@ public class DruidPool { dataSourcedb.setMaxActive(maxactive); //连接泄漏监测 dataSourcedb.setRemoveAbandoned(true); - dataSourcedb.setRemoveAbandonedTimeout(30); + dataSourcedb.setRemoveAbandonedTimeout(1800); dataSourcedb.setDefaultAutoCommit(false); //配置获取连接等待超时的时间 - dataSourcedb.setMaxWait(300000); + dataSourcedb.setMaxWait(30000); //配置间隔多久才进行一次检测,检测需要关闭的空闲连接,单位是毫秒 - dataSourcedb.setTimeBetweenEvictionRunsMillis(60000); + dataSourcedb.setTimeBetweenEvictionRunsMillis(30000); //防止过期 dataSourcedb.setValidationQuery("SELECT 'x'"); dataSourcedb.setTestWhileIdle(true); diff --git a/src/main/java/com/mesa/reportservice/scheduledtask/ExcuteTask.java b/src/main/java/com/mesa/reportservice/scheduledtask/ExcuteTask.java new file mode 100644 index 0000000..a5e826c --- /dev/null +++ b/src/main/java/com/mesa/reportservice/scheduledtask/ExcuteTask.java @@ -0,0 +1,82 @@ +package com.mesa.reportservice.scheduledtask; + +import com.mesa.reportservice.bean.HttpResult; +import com.mesa.reportservice.bean.JobEntity; +import com.mesa.reportservice.service.HttpAPIService; +import com.mesa.reportservice.util.ClickhouseConfig; +import com.mesa.reportservice.util.ConfigUtil; +import com.mesa.reportservice.util.Logs; + +import java.net.SocketException; + +import static com.mesa.reportservice.util.ClickhouseConfig.getJobUrl; + +/** + * Created by wk1 on 2019/5/31. + */ + +//@Component + +public class ExcuteTask { + + + private HttpAPIService hp; + + public ExcuteTask(HttpAPIService hp) { + + this.hp = hp; + } + + + public void dotask(JobEntity je) { + + String joburl = getJobUrl(je.getQuery_sql(), je.getQuery_id()); + HttpResult hr = new HttpResult(); + + + int k = 3; + do { + try { + hr = hp.doPost(joburl); + if(hr.getCode()!=200 && hr.getBody().toLowerCase().contains("timeout")){ + k--; + String killurl = ClickhouseConfig.getKillUrl(je.getQuery_id()); + hp.doPost(killurl); + } + else { + break; + } + } catch (SocketException e) { + k--; + je.setExcute_detail("查询失败" + e.toString()); + je.setStatus(4); + Logs.error("查询失败" + e.toString()); + } catch (Exception e) { + je.setStatus(4); + je.setExcute_detail("查询失败" + e.toString()); + Logs.error("查询失败" + e.toString()); + break; + } + } + while (k > 0); + + je.setResult(hr.getBody()); + if (hr.getCode() == 200) { + je.setResult(hr.getBody()); + je.setExcute_detail("success"); + je.setExcute_time(0); + je.setExcute_process(100); + je.setStatus(2); + } else { + je.setResult(""); + je.setStatus(4); + je.setExcute_detail("查询clickhouse失败" + hr.getBody()); + Logs.error("查询clickhouse失败" + hr.getBody()); + + } + JobEntity ji = (JobEntity) je.clone(); + ConfigUtil.mapresult.put(ji.getQuery_id(), ji); + } + + +} diff --git a/src/main/java/com/mesa/reportservice/scheduledtask/JobTask.java b/src/main/java/com/mesa/reportservice/scheduledtask/JobTask.java index 1062af8..6379b50 100644 --- a/src/main/java/com/mesa/reportservice/scheduledtask/JobTask.java +++ b/src/main/java/com/mesa/reportservice/scheduledtask/JobTask.java @@ -1,31 +1,13 @@ package com.mesa.reportservice.scheduledtask; -import com.alibaba.fastjson.JSON; -import com.mesa.reportservice.bean.HttpResult; -import com.mesa.reportservice.bean.JobEntity; -import com.mesa.reportservice.service.DataBaseBusiness; -import com.mesa.reportservice.service.HttpAPIService; -import com.mesa.reportservice.util.ClickhouseConfig; -import com.mesa.reportservice.util.ConfigUtil; -import com.mesa.reportservice.util.HbaseUtil; -import com.mesa.reportservice.util.Logs; -import org.apache.commons.codec.digest.DigestUtils; -import org.springframework.stereotype.Component; - -import java.io.IOException; -import java.util.Map; -import java.util.UUID; -import java.util.concurrent.CountDownLatch; -import java.util.concurrent.ExecutorService; - /** * Created by wk1 on 2019/5/31. */ -@Component +//@Component public class JobTask { - +/* protected ExecutorService pool = ConfigUtil.pool; private HttpAPIService hp; private DataBaseBusiness hb; @@ -57,7 +39,7 @@ public class JobTask { int id = jl.getResult_id(); String sql = jl.getQuery_sql().trim(); String hbaseid = DigestUtils.md5Hex(id + sql); - sql = sql.replace("$exe_time", "toDateTime('" + jl.getEnd_time().trim() + "')"); + sql = sql.replace("$exe_time", "toDateTime('" + jl.getIssued_time().trim() + "')"); String query_id = UUID.randomUUID().toString().replaceAll("-", ""); String httpQuery = ClickhouseConfig.getJobUrl(sql, query_id); hb.updateStatue(id, 1); @@ -154,9 +136,7 @@ public class JobTask { throw new Exception("查询clickhouse失败" + hr.getBody()); } } - /* String h2=hp.doGet("http://192.168.10.202:4444/tsg:report_result/"+query_id+"/response:result"); - // Map<String, Object> aaa = hs.queryData("tsg:report_result",query_id); - System.out.print(h2);*/ + } catch (InterruptedException w) { Logs.error(w.toString()); @@ -248,6 +228,10 @@ public class JobTask { } } while (k >= 0); + + // String h2 = hp.doGet("http://192.168.10.224:4444/tsg:report_result/" + hbaseid + "/response:result"); + // Map<String, Object> aaa = h2.queryData("tsg:report_result",query_id); + // System.out.print(h2); return h1; } @@ -257,7 +241,7 @@ public class JobTask { JobEntity job = new JobEntity(); String finishurl = ClickhouseConfig.getFinishUrl(query_id); - HttpResult num = null; + HttpResult num = new HttpResult(); int j = 3; do { try { @@ -306,6 +290,6 @@ public class JobTask { } while (job.getStatus() == 5 && j >= 0); return job; - } + }*/ } diff --git a/src/main/java/com/mesa/reportservice/scheduledtask/ScheduledTask.java b/src/main/java/com/mesa/reportservice/scheduledtask/ScheduledTask.java index 1ba6d43..a0445a7 100644 --- a/src/main/java/com/mesa/reportservice/scheduledtask/ScheduledTask.java +++ b/src/main/java/com/mesa/reportservice/scheduledtask/ScheduledTask.java @@ -1,16 +1,22 @@ package com.mesa.reportservice.scheduledtask; +import com.alibaba.fastjson.JSON; +import com.mesa.reportservice.bean.HttpResult; import com.mesa.reportservice.bean.JobEntity; import com.mesa.reportservice.service.DataBaseBusiness; import com.mesa.reportservice.service.HttpAPIService; +import com.mesa.reportservice.util.ClickhouseConfig; import com.mesa.reportservice.util.ConfigUtil; +import com.mesa.reportservice.util.HbaseUtil; import com.mesa.reportservice.util.Logs; +import org.apache.commons.codec.digest.DigestUtils; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.scheduling.annotation.EnableScheduling; import org.springframework.scheduling.annotation.Scheduled; import org.springframework.stereotype.Component; import java.util.List; +import java.util.Map; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; @@ -28,7 +34,7 @@ public class ScheduledTask { @Autowired private DataBaseBusiness hb; - @Scheduled(cron = "${scan.mysql.scheduled.plan}") + /* @Scheduled(cron = "${scan.mysql.scheduled.plan}") public void run() { System.out.print(Thread.currentThread()); @@ -39,7 +45,7 @@ public class ScheduledTask { Logs.debug("joblist size=" + joblist.size()); for (JobEntity jl : joblist) { - Thread.sleep(2000); + // Thread.sleep(2000); ConfigUtil.job_thread--; Logs.debug("jobthread--=" + ConfigUtil.job_thread + "---jobid=" + jl.getResult_id()); pool.execute(new Runnable() { @@ -61,218 +67,295 @@ public class ScheduledTask { Logs.error(e.toString()); } } +*/ - // @Scheduled(cron = "${scan.mysql.scheduled.plan}") - /*public void run() { + @Scheduled(cron = "${scan.mysql.scheduled.plan}") + public void scanmysql() { - System.out.print(Thread.currentThread()); - List<JobEntity> joblist = hb.getJobLimitRows(1); - for (JobEntity jl : joblist) { - CountDownLatch countDownLatch = new CountDownLatch(1); + try { - int id = jl.getJob_id(); - String sqlold = jl.getQuery_sql().trim(); - String query_id = UUID.randomUUID().toString().replaceAll("-", ""); - String sql = sqlold.replaceAll(";", ""); - String hbaseid = DigestUtils.md5Hex(sqlold); - try { + List<JobEntity> joblist = hb.getJobLimitRows(ConfigUtil.job_thread); + Logs.debug("joblist size=" + joblist.size()); + + for (JobEntity jl : joblist) { + String sql = jl.getQuery_sql().trim(); + String queryid = DigestUtils.md5Hex(jl.getResult_id() + sql); + sql = sql.replace("$exe_time", "toDateTime('" + jl.getEnd_time().trim() + "')"); + jl.setQuery_id(queryid); + jl.setQuery_sql(sql); + jl.setStatus(1); + jl.setResult(""); + jl.setException(""); + jl.setExcute_detail(""); + JobEntity jt = (JobEntity) jl.clone(); + ConfigUtil.mapresult.put(queryid, jt); + + hb.updateStatue(jl.getResult_id(), 1); - String httpQuery = ClickhouseConfig.getJobUrl(sql, query_id); - System.out.print(httpQuery); pool.execute(new Runnable() { @Override public void run() { - try { - int i; - for (i = 0; i < ConfigUtil.loop_number; i++) { - Thread.sleep(ConfigUtil.query_log_sleep); - String queryProccess = ClickhouseConfig.getProcessUrl(query_id); - //System.out.println("queryProccess"+queryProccess); - - HttpResult hr = hp.doPost(queryProccess); - String rs = hr.getBody().trim(); - if (!rs.isEmpty() && !rs.equals("")) { - // int status = hb.getJobStatueFromId(id); - JobEntity en = getProcessMessage(rs, id); - en.setExcute_detail(""); - if (en.getExcute_time() < ConfigUtil.ck_timeout) { - - if (en.getExcute_time() > ConfigUtil.query_log_sleep) { - - hb.updateProcesses(en); - - } else { - break; - } - } else { - - i = 1001; - break; - } - } else { - break; - } - } - if (i >= 1000) { - String killurl = ClickhouseConfig.getKillUrl(query_id); - hp.doPost(killurl); - } - } catch (Exception e) { - logger.error("获取进度失败" + e.toString()); - updateErrorMessage(id, e.toString()); - } - countDownLatch.countDown(); + JobEntity je = (JobEntity) jl.clone(); + ExcuteTask ex = new ExcuteTask(hp); + ex.dotask(je); + } }); + ConfigUtil.job_thread--; + + + } + + } catch (Exception e) { + Logs.error(e.toString()); + } + } - HttpResult hr = hp.doPost(httpQuery); - countDownLatch.await(); + @Scheduled(cron = "${scan.job.scheduled.plan}") + public void scanmysqlresult() { - if (hr.getCode() == 200) { + try { - String hbasejson = HbaseUtil.getHbaseJson(hbaseid, hr.getBody()); - String hbaseurl = HbaseUtil.getHbasePostUrl(); - HttpResult h1 = hp.doPost(hbaseurl, hbasejson); + List<JobEntity> joblist = hb.getJobForExcute(); + Logs.debug("joblist size=" + joblist.size()); + + for (JobEntity jk : joblist) { + String sql = jk.getQuery_sql().trim(); + String queryid = DigestUtils.md5Hex(jk.getResult_id() + sql); + if (!ConfigUtil.mapresult.containsKey(queryid)) { + String killurl = ClickhouseConfig.getKillUrl(queryid); + hp.doPost(killurl); + updateErrorMessage(jk.getResult_id(), 0, "执行结果不存在重新执行"); + + break; + } + + + JobEntity je = (JobEntity) ConfigUtil.mapresult.get(queryid).clone(); + /* if (je == null) { + break; + }*/ + if (je.getStatus() == 2 ) { + try { + je = getQueryLogMessage(je); + HttpResult h1 = saveToHbase(je); if (h1.getCode() == 200) { - JobEntity entity = new JobEntity(); - int j = 3; + //Thread.sleep(5000); + // Logs.debug(entity.getJob_id() + entity.getStatue()); + + je.setExcute_detail("success"); + je.setExcute_time(0); + je.setExcute_process(100); + je.setStatus(2); + int number = 0; + int z = 3; do { - Thread.sleep(3000); - entity = getQueryLogMessage(id, query_id); - j--; + number = hb.updateProcesses(je); + z--; } - while (entity.getStatue() != 2 && j > 0); - System.out.print(entity.getJob_id() + entity.getStatue()); + while (number != 1 && z >= 0); - hb.updateProcesses(entity); } else { - updateErrorMessage(id, "写入hbase失败"); + updateErrorMessage(je.getResult_id(), 5, "写入hbase失败"); + Logs.error("写入hbase失败"); } + } catch (Exception e) { + Logs.error(e.toString()); + + } finally { + ConfigUtil.mapresult.remove(queryid); + ConfigUtil.job_thread++; + } - } else { - if (hr.getBody().contains("cancelled")) { + } else if (je.getStatus() > 2) { + + try { + je = getQueryLogMessage(je); + HttpResult h1 = saveToHbase(je); + if (h1.getCode() == 200) { - updateErrorMessage(id, "查询超时任务已经取消"); + int number = 0; + int z = 3; + do { + number = hb.updateProcesses(je); + z--; + } + while (number != 1 && z >= 0); + + } else { + updateErrorMessage(je.getResult_id(), 5, "错误日志写入hbase失败"); + Logs.error("错误日志写入hbase失败"); + } + } catch (Exception e) { + Logs.error(e.toString()); + + } finally { + ConfigUtil.mapresult.remove(queryid); + ConfigUtil.job_thread++; } - else{ - updateErrorMessage(id, "查询clickhouse失败" + hr.getBody()); + + } else { + JobEntity jo = getProcessMessage(je); + if(jo.getExcute_row()!=0 || jo.getExcute_time()!=0) { + hb.updateProcesses(jo); } } - String h2 = hp.doGet("http://192.168.10.202:4444/tsg:report_result/" + hbaseid + "/response:result"); - // Map<String, Object> aaa = hs.queryData("tsg:report_result",query_id); - System.out.print("===============" + h2); - } catch (Exception e) { - - logger.error("任务执行中报错" + e.toString()); - updateErrorMessage(id, e.toString()); - break; - } - } - }*/ - + } + } catch (Exception e) { + Logs.error(e.toString()); + // ConfigUtil.job_thread++; + } + } -/* private synchronized void updateErrorMessage(int id, String error) { + public void updateErrorMessage(int id, int status, String error) { JobEntity js = new JobEntity(); - js.setJob_id(id); - js.setStatue(3); + js.setResult_id(id); + js.setStatus(status); js.setExcute_detail("任务执行中报错" + error); hb.updateProcesses(js); } - private synchronized JobEntity getProcessMessage(String body, int id) { - JobEntity job = new JobEntity(); - Map map = JSON.parseObject(body); - float elapsed = Float.parseFloat(map.get("elapsed").toString()); - long total_rows_approx = Long.parseLong(map.get("total_rows_approx").toString()); - long read_rows = Long.parseLong(map.get("read_rows").toString()); - int ftime = (int) (elapsed * total_rows_approx / read_rows); - ftime = Math.round((ftime - elapsed) * 10 * 1000) / 10; - logger.info("total_rows_approx" + total_rows_approx + "read_rows" + read_rows + "elapsed" + elapsed + "ftime" + ftime); - - double persent = (read_rows * 1.00 / total_rows_approx); - int result = (int) (persent * 100); - job.setExcute_row(total_rows_approx); - job.setExcute_process(result); - job.setStatue(1); - job.setExcute_time(ftime); - job.setJob_id(id); - System.out.println("%+++++++++++++" + result + "%"); - System.out.println("needtime--------" + ftime); + public JobEntity getProcessMessage(JobEntity jb) { + JobEntity job = (JobEntity) jb.clone(); + String queryProccess = ClickhouseConfig.getProcessUrl(job.getQuery_id()); + HttpResult hr = null; + try { + hr = hp.doPost(queryProccess); + String rs = hr.getBody().trim(); + if (!rs.isEmpty() && !rs.equals("")) { + //JobEntity job = new JobEntity(); + Map map = JSON.parseObject(rs); + float elapsed = Float.parseFloat(map.get("elapsed").toString()); + long total_rows_approx = Long.parseLong(map.get("total_rows_approx").toString()); + long read_rows = Long.parseLong(map.get("read_rows").toString()); + int ftime = (int) (elapsed * total_rows_approx / read_rows); + ftime = Math.round((ftime - elapsed) * 10 * 1000) / 10; + if(ftime<0){ + ftime=0; + } + Logs.info("total_rows_approx" + total_rows_approx + "read_rows" + read_rows + "elapsed" + elapsed + "ftime" + ftime); + // NumberFormat numberFormat = NumberFormat.getInstance(); + //numberFormat.setMaximumFractionDigits(0); + + double persent = (read_rows * 1.00 / total_rows_approx); + int result = (int) (persent * 100); + job.setExcute_row(total_rows_approx); + job.setExcute_process(result); + // job.setStatus(1); + job.setExcute_time(ftime); + job.setExcute_detail(""); + System.out.println("%+++++++++++++" + result + "%"); + System.out.println("needtime--------" + ftime); + } + } catch (Exception e) { + e.printStackTrace(); + } + return job; } + public HttpResult saveToHbase(JobEntity entity) { - private synchronized JobEntity getQueryLogMessage(int id, String query_id) { + int k = 3; + HttpResult h1 = new HttpResult(); - JobEntity job = new JobEntity(); + do { + try { + String hbasejson = HbaseUtil.getHbaseJson(entity.getQuery_id(), entity.getResult(), entity); + String hbaseurl = HbaseUtil.getHbasePostUrl(); + k--; + h1 = hp.doPost(hbaseurl, hbasejson); + break; + } catch (Exception e) { + Logs.error("写入hbase报错重试次数" + (3 - k)); + k--; + e.printStackTrace(); + } + } + while (k >= 0); - String finishurl = ClickhouseConfig.getFinishUrl(query_id); - HttpResult num = null; - try { - num = hp.doPost(finishurl); - - if (!num.getBody().isEmpty() && !num.getBody().equals("")) { - - - job.setJob_id(id); - Map mapresult = JSON.parseObject(num.getBody()); - int type = Integer.parseInt(mapresult.get("type").toString()); - int query_duration_ms = Integer.parseInt(mapresult.get("query_duration_ms").toString()); - long read_rows = Long.parseLong(mapresult.get("read_rows").toString()); - long memory_usage = Long.parseLong(mapresult.get("memory_usage").toString()); - String query = mapresult.get("query").toString(); - String event_time = mapresult.get("event_time").toString(); - logger.debug("type" + type); - - - switch (type) { - case 2: - job.setStatue(2); - job.setExcute_detail("success"); - break; - case 3: - job.setStatue(3); - job.setExcute_detail("语法错误"); - break; - case 4: - job.setStatue(4); - job.setExcute_detail("性能问题"); - break; - - default: - break; - } - job.setExcute_time(0); - job.setExcute_process(100); - job.setExcute_row(read_rows); - } else { + //String h2 = hp.doGet("http://192.168.10.224:4444/tsg:report_result/" + queryid + "/response:result"); + // Map<String, Object> aaa = h2.queryData("tsg:report_result",query_id); + // System.out.print(h2); + return h1; + } - job.setStatue(3); - job.setExcute_detail("没有执行"); - } + public JobEntity getQueryLogMessage(JobEntity job) { - } catch (Exception e) { - e.printStackTrace(); + + // JobEntity job = (JobEntity) jb.clone(); + /* job.setResult_id(resultid); + job.setQuery_id(queryid);*/ + String finishurl = ClickhouseConfig.getFinishUrl(job.getQuery_id()); + HttpResult num = new HttpResult(); + int j = 0; + do { + try { + Thread.sleep(3000); + num = hp.doPost(finishurl); + if (num.getCode() == 200 && !num.getBody().isEmpty() && !num.getBody().equals("")) { + + Map mapresult = JSON.parseObject(num.getBody()); + int type = Integer.parseInt(mapresult.get("type").toString()); + Logs.debug("type" + type); + switch (type) { + case 2: + job.setStatus(type); + job.setExcute_detail("success"); + job.setQuery_duration_ms(mapresult.get("query_duration_ms").toString()); + job.setMemory_usage(mapresult.get("memory_usage").toString()); + job.setExcute_row(Long.parseLong(mapresult.get("read_rows").toString())); + break; + case 3: + job.setStatus(type); + job.setExcute_detail("语法错误"+mapresult.get("exception").toString()); + job.setQuery_sql(mapresult.get("query").toString()); + job.setException(mapresult.get("exception").toString()); + break; + case 4: + job.setStatus(type); + job.setExcute_detail("性能问题"+mapresult.get("exception").toString()); + job.setQuery_sql(mapresult.get("query").toString()); + job.setException(mapresult.get("exception").toString()); + break; + default: + break; + } + + + } else { + job.setStatus(6); + job.setExcute_detail("没有查到query信息"); + job.setException("没有查到query信息"); + + } + } catch (Exception e) { + job.setStatus(6); + job.setException("获取querylogmessage失败"); + Logs.error("获取querylogmessage失败"); + } + j--; } + while (job.getStatus() == 6 && j > 0); return job; - }*/ + } } diff --git a/src/main/java/com/mesa/reportservice/service/DataBaseBusiness.java b/src/main/java/com/mesa/reportservice/service/DataBaseBusiness.java index fbc3582..3f8d729 100644 --- a/src/main/java/com/mesa/reportservice/service/DataBaseBusiness.java +++ b/src/main/java/com/mesa/reportservice/service/DataBaseBusiness.java @@ -38,10 +38,59 @@ public class DataBaseBusiness { } + + public List<JobEntity> getJobForExcute() { + + + + String sql = " select * from report_result where status =1 order by end_time "; + int resultid=0; + List<JobEntity> sqllist = new ArrayList<JobEntity>(); + try { + //connection = manager.getConnection("idb"); + connection = dataSource.getConnection(); + connection.setAutoCommit(false); + pstmt = connection.prepareStatement(sql); + rset = pstmt.executeQuery(); + while (rset.next()) { + JobEntity je = new JobEntity(); + resultid = rset.getInt("result_id"); + je.setResult_id(rset.getInt("result_id")); + je.setEnd_time(rset.getString("end_time").trim().substring(0, 19)); + je.setIssued_time(rset.getString("issued_time").trim().substring(0, 19)); + je.setQuery_sql(rset.getString("query_sql").trim()); + sqllist.add(je); + } + Logs.debug("-----------success----------" + sql); + + } catch (Exception e) { + Logs.error(sql + "获取进度列表发生异常!"); + updateStatue(resultid,4); + e.printStackTrace(); + throw new RuntimeException("获取进度列表发生异常!", e); + } finally { + clear(pstmt, null, rset); + // manager.freeConnection("idb", connection); + if (connection != null) { + try { + connection.close(); + } catch (SQLException e) { + Logs.error(e.toString()); + e.printStackTrace(); + } + + } + } + return sqllist; + + } + + + public List<JobEntity> getJobLimitRows(int row) { String current_time = DateUtil.getDate(); - + int resultid=0; String sql = " select * from report_result where status =0 and end_time<'" + current_time + "' order by end_time limit " + row; List<JobEntity> sqllist = new ArrayList<JobEntity>(); @@ -53,8 +102,11 @@ public class DataBaseBusiness { rset = pstmt.executeQuery(); while (rset.next()) { JobEntity je = new JobEntity(); + resultid = rset.getInt("result_id"); + je.setExcute_detail(""); je.setResult_id(rset.getInt("result_id")); je.setEnd_time(rset.getString("end_time").trim().substring(0, 19)); + je.setIssued_time(rset.getString("issued_time").trim().substring(0, 19)); je.setQuery_sql(rset.getString("query_sql").trim()); sqllist.add(je); } @@ -62,8 +114,11 @@ public class DataBaseBusiness { } catch (Exception e) { Logs.error(sql + "获取任务列表发生异常!"); + updateStatue(resultid,4); e.printStackTrace(); throw new RuntimeException("获取任务列表发生异常!", e); + + } finally { clear(pstmt, null, rset); // manager.freeConnection("idb", connection); @@ -80,6 +135,9 @@ public class DataBaseBusiness { return sqllist; } + + + public int getJobStatueFromId(int id) { int statue = 0; @@ -168,9 +226,9 @@ public class DataBaseBusiness { Logs.debug("-----------success----------" + sql); } catch (Exception e) { - Logs.error(sql + "存储结果日志发生异常!"); + Logs.error(sql + "更新状态发生异常!"); e.printStackTrace(); - throw new RuntimeException("存储结果日志发生异常!", e); + throw new RuntimeException("更新状态发生异常!", e); } finally { clear(pstmt, null, null); // manager.freeConnection("idb", connection); @@ -187,4 +245,5 @@ public class DataBaseBusiness { } + } diff --git a/src/main/java/com/mesa/reportservice/service/HttpAPIService.java b/src/main/java/com/mesa/reportservice/service/HttpAPIService.java index 65cfba1..3c46b69 100644 --- a/src/main/java/com/mesa/reportservice/service/HttpAPIService.java +++ b/src/main/java/com/mesa/reportservice/service/HttpAPIService.java @@ -38,7 +38,7 @@ public class HttpAPIService { public String doGet(String url) throws Exception { // 声明 http get 请求 HttpGet httpGet = new HttpGet(url); - + httpGet.setHeader("Accept","application/json"); // 装载配置信息 httpGet.setConfig(config); @@ -91,7 +91,7 @@ public class HttpAPIService { HttpPost httpPost = new HttpPost(url); // 加入配置信息 httpPost.setConfig(config); - + // httpPost.setHeader("Accept","application/json"); // 判断map是否为空,不为空则进行遍历,封装from表单对象 if (!json.equals("") && !json.isEmpty()) { diff --git a/src/main/java/com/mesa/reportservice/util/ClickhouseConfig.java b/src/main/java/com/mesa/reportservice/util/ClickhouseConfig.java index d2e7516..d46c0a3 100644 --- a/src/main/java/com/mesa/reportservice/util/ClickhouseConfig.java +++ b/src/main/java/com/mesa/reportservice/util/ClickhouseConfig.java @@ -63,7 +63,7 @@ public class ClickhouseConfig { // String logsql = "select elapsed,total_rows_approx,read_rows from `system`.processes where query_id='" + query_id + "' FORMAT JSONEachRow"; try { - String sql = URLEncoder.encode("select type,read_rows,query_duration_ms,query,memory_usage,event_time from `system`.query_log where type>1 and query_id='" + query_id + "' order by event_time desc limit 1 FORMAT JSONEachRow", "utf8"); + String sql = URLEncoder.encode("select type,read_rows,query_duration_ms,query,exception,memory_usage,event_time from `system`.query_log where type>1 and query_id='" + query_id + "' order by event_time desc limit 1 FORMAT JSONEachRow", "utf8"); finishsql = url + sql; } catch (UnsupportedEncodingException e) { Logs.error(e.toString()); diff --git a/src/main/java/com/mesa/reportservice/util/ConfigUtil.java b/src/main/java/com/mesa/reportservice/util/ConfigUtil.java index 7a2ff39..4ab3822 100644 --- a/src/main/java/com/mesa/reportservice/util/ConfigUtil.java +++ b/src/main/java/com/mesa/reportservice/util/ConfigUtil.java @@ -1,8 +1,11 @@ package com.mesa.reportservice.util; +import com.mesa.reportservice.bean.JobEntity; import org.springframework.boot.context.properties.ConfigurationProperties; import org.springframework.stereotype.Component; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; @@ -15,24 +18,18 @@ import java.util.concurrent.Executors; public class ConfigUtil { - public static ExecutorService pool = Executors.newFixedThreadPool(30); - public static int query_log_sleep; - public static int loop_number; + // public static ExecutorService pool = Executors.newFixedThreadPool(30); + // public static int query_log_sleep; + //public static int loop_number; public static int job_thread; public static int ck_timeout; + public static Map<String, JobEntity> mapresult = new ConcurrentHashMap<>(); public void setCk_timeout(int ck_timeout) { ConfigUtil.ck_timeout = ck_timeout; } - public void setQuery_log_sleep(int query_log_sleep) { - ConfigUtil.query_log_sleep = query_log_sleep; - } - - public void setLoop_number(int loop_number) { - ConfigUtil.loop_number = loop_number; - } public void setJob_thread(int job_thread) { diff --git a/src/main/java/com/mesa/reportservice/util/HbaseUtil.java b/src/main/java/com/mesa/reportservice/util/HbaseUtil.java index 532aafa..71259fd 100644 --- a/src/main/java/com/mesa/reportservice/util/HbaseUtil.java +++ b/src/main/java/com/mesa/reportservice/util/HbaseUtil.java @@ -19,6 +19,8 @@ public class HbaseUtil { private static String colume_query_duration_ms; private static String colume_read_rows; private static String colume_memory_usage; + private static String colume_exception; + private static String colume_sql; public static String getHbaseJson(String hbaseid, String responsebody, JobEntity job) { @@ -26,7 +28,7 @@ public class HbaseUtil { if (job.getStatus() == 2) { hbasejson = "{\"Row\":[{\"key\":\"" + StringUtil.getBase64(hbaseid) + "\", \"Cell\": [{\"column\":\"" + StringUtil.getBase64(columefamily.trim()) + "\",\"$\":\"" + StringUtil.getBase64(responsebody.trim()) + "\"},{\"column\":\"" + StringUtil.getBase64(colume_job_id.trim()) + "\", \"$\":\"" + StringUtil.getBase64(String.valueOf(job.getResult_id())) + "\"},{\"column\":\"" + StringUtil.getBase64(colume_query_id.trim()) + "\", \"$\":\"" + StringUtil.getBase64(job.getQuery_id().trim()) + "\"},{\"column\":\"" + StringUtil.getBase64(colume_query_duration_ms.trim()) + "\", \"$\":\"" + StringUtil.getBase64(job.getQuery_duration_ms().trim()) + "\"},{\"column\":\"" + StringUtil.getBase64(colume_read_rows.trim()) + "\", \"$\":\"" + StringUtil.getBase64(String.valueOf(job.getRead_rows())) + "\"},{\"column\":\"" + StringUtil.getBase64(colume_memory_usage.trim()) + "\", \"$\":\"" + StringUtil.getBase64(job.getMemory_usage().trim()) + "\"}]}]}"; } else { - hbasejson = "{\"Row\":[{\"key\":\"" + StringUtil.getBase64(hbaseid) + "\", \"Cell\": [{\"column\":\"" + StringUtil.getBase64(columefamily.trim()) + "\",\"$\":\"" + StringUtil.getBase64(responsebody.trim()) + "\"}]}]}"; + hbasejson = "{\"Row\":[{\"key\":\"" + StringUtil.getBase64(hbaseid) + "\", \"Cell\": [{\"column\":\"" + StringUtil.getBase64(columefamily.trim()) + "\",\"$\":\"" + StringUtil.getBase64(responsebody.trim()) + "\"},{\"column\":\"" + StringUtil.getBase64(colume_sql.trim()) + "\", \"$\":\"" + StringUtil.getBase64(String.valueOf(job.getQuery_sql())) + "\"},{\"column\":\"" + StringUtil.getBase64(colume_exception.trim()) + "\", \"$\":\"" + StringUtil.getBase64(job.getExcute_detail().trim()) + "\"}]}]}"; } return hbasejson; @@ -71,4 +73,12 @@ public class HbaseUtil { HbaseUtil.colume_job_id = colume_job_id; } + + public void setColume_exception(String colume_exception) { + HbaseUtil.colume_exception = colume_exception; + } + + public void setColume_sql(String colume_sql) { + HbaseUtil.colume_sql = colume_sql; + } } diff --git a/src/main/java/com/mesa/reportservice/util/ScheduleConfig.java b/src/main/java/com/mesa/reportservice/util/ScheduleConfig.java new file mode 100644 index 0000000..1e17f90 --- /dev/null +++ b/src/main/java/com/mesa/reportservice/util/ScheduleConfig.java @@ -0,0 +1,21 @@ +package com.mesa.reportservice.util; + +import org.springframework.context.annotation.Configuration; +import org.springframework.scheduling.annotation.SchedulingConfigurer; +import org.springframework.scheduling.config.ScheduledTaskRegistrar; + +import java.util.concurrent.Executors; + +/** + * Created by wk1 on 2019/6/21. + */ + +//@Configuration +//所有的定时任务都放在一个线程池中,定时任务启动时使用不同都线程。 +public class ScheduleConfig { +/* @Override + public void configureTasks(ScheduledTaskRegistrar taskRegistrar) { + //设定一个长度10的定时任务线程池 + taskRegistrar.setScheduler(Executors.newScheduledThreadPool(10)); + }*/ +} diff --git a/src/main/resources/config/application.properties b/src/main/resources/config/application.properties new file mode 100644 index 0000000..191a01e --- /dev/null +++ b/src/main/resources/config/application.properties @@ -0,0 +1,64 @@ +server.port=9093 +scan.mysql.scheduled.plan=0/10 * * * * ? +scan.job.scheduled.plan=0/15 * * * * ? + +#ÿ��ʮ���ȡһ�β�ѯ״̬��ѭ��350�Σ�3500��֮��ʱ +#globle.query_log_sleep=10000 +#globle.loop_number=350 +#ͬʱ��ִ�����߳���nxwm + +globle.job_thread=2 +#���û�г���7200��ͼ������û�������������Σ�����kill +#globle.ck_timeout=7200000 +hbase.url=192.168.40.120:4444 +hbase.table=tsg:report_result +hbase.columefamily=response:result +hbase.colume_job_id=detail:result_id +hbase.colume_query_id=detail:query_id +hbase.colume_query_duration_ms=detail:query_duration_ms +hbase.colume_read_rows=detail:read_rows +hbase.colume_memory_usage=detail:memory_usage +hbase.colume_sql=detail:sql +hbase.colume_exception=detail:exception +ck.task_ip=192.168.40.182:8123 +ck.task_database=tsg_galaxy +#ck.task_database=k18_galaxy_service +ck.task_user=ck +ck.task_user_password=111111 +ck.log_ip=192.168.40.182:8123 +ck.log_user=default +ck.log_user_password=111111 +#��������� +http.maxTotal=300 +#������ +http.defaultMaxPerRoute=100 +#�������ӵ��ʱ�� +http.connectTimeout=10000 +#�����ӳ��л�ȡ�����ӵ��ʱ�� +http.connectionRequestTimeout=10000 +#���ݴ�����ʱ�� +http.socketTimeout=3600000 +#�ύ����ǰ���������Ƿ���� +http.staleConnectionCheckEnabled=true +db.url=jdbc\:mariadb\://192.168.40.120\:3306/tsg-bifang +#db.url=jdbc\:mariadb\://192.168.11.210\:3306/tsg +#db.url=jdbc\:mariadb\://47.254.24.224\:3306/test +#db.url=jdbc\:mariadb\://10.4.35.1\:3306/tsg-bifang + +#���� +#drivers=ru.yandex.clickhouse.ClickHouseDriver +db.drivers=org.mariadb.jdbc.Driver +#�û��� +db.user=root +#���� +db.password=bifang!@# +#��ʼ����С +db.initialsize=20 +#��С������ +db.minidle=1 +#��������� +db.maxactive=300 +db.filters=stat,wall,log4j,config + + + diff --git a/src/main/resources/log4j.properties b/src/main/resources/log4j.properties index 438b281..8dcd9c9 100644 --- a/src/main/resources/log4j.properties +++ b/src/main/resources/log4j.properties @@ -2,15 +2,15 @@ log4j.logger.org.apache.http=OFF log4j.logger.org.apache.http.wire=OFF #Log4j -log4j.rootLogger=debug,console,file +log4j.rootLogger=info,console,file # ����̨��־���� log4j.appender.console=org.apache.log4j.ConsoleAppender -log4j.appender.console.Threshold=debug +log4j.appender.console.Threshold=info log4j.appender.console.layout=org.apache.log4j.PatternLayout log4j.appender.console.layout.ConversionPattern=[%d{yyyy-MM-dd HH\:mm\:ss}] [%-5p] [Thread\:%t] %l %x - <%m>%n # �ļ���־���� log4j.appender.file=org.apache.log4j.DailyRollingFileAppender -log4j.appender.file.Threshold=debug +log4j.appender.file.Threshold=info log4j.appender.file.encoding=UTF-8 log4j.appender.file.Append=true #·���������·����������ز��������Ӧ��Ŀ�� |
