author     wangkuan <[email protected]>    2019-07-23 11:46:57 +0800
committer  wangkuan <[email protected]>    2019-07-23 11:46:57 +0800
commit     76771bebb02ef24175e52d74ea4ab5410a649ea1 (patch)
tree       a84e0e1c085c8c82e28a5f867afdd3ae523e5075
parent     c153fdec1b2275df5b94c60bfef2af54fca087e0 (diff)
Merge local version
-rw-r--r--  pom.xml                                                                  2
-rw-r--r--  src/main/java/com/mesa/reportservice/bean/JobEntity.java                50
-rw-r--r--  src/main/java/com/mesa/reportservice/dao/DruidPool.java                 15
-rw-r--r--  src/main/java/com/mesa/reportservice/scheduledtask/ExcuteTask.java      82
-rw-r--r--  src/main/java/com/mesa/reportservice/scheduledtask/JobTask.java         36
-rw-r--r--  src/main/java/com/mesa/reportservice/scheduledtask/ScheduledTask.java  399
-rw-r--r--  src/main/java/com/mesa/reportservice/service/DataBaseBusiness.java      65
-rw-r--r--  src/main/java/com/mesa/reportservice/service/HttpAPIService.java         4
-rw-r--r--  src/main/java/com/mesa/reportservice/util/ClickhouseConfig.java          2
-rw-r--r--  src/main/java/com/mesa/reportservice/util/ConfigUtil.java               17
-rw-r--r--  src/main/java/com/mesa/reportservice/util/HbaseUtil.java                12
-rw-r--r--  src/main/java/com/mesa/reportservice/util/ScheduleConfig.java           21
-rw-r--r--  src/main/resources/config/application.properties                        64
-rw-r--r--  src/main/resources/log4j.properties                                      6
14 files changed, 565 insertions, 210 deletions
diff --git a/pom.xml b/pom.xml
index dc96041..e812a69 100644
--- a/pom.xml
+++ b/pom.xml
@@ -9,7 +9,7 @@
<relativePath/> <!-- lookup parent from repository -->
</parent>
<groupId>com.mesa</groupId>
- <artifactId>reportservice</artifactId>
+ <artifactId>galaxy-report-service</artifactId>
<version>0.0.1</version>
<name>galaxy-report-service</name>
<description>Demo project for Spring Boot</description>
diff --git a/src/main/java/com/mesa/reportservice/bean/JobEntity.java b/src/main/java/com/mesa/reportservice/bean/JobEntity.java
index d51bf1c..48af9fb 100644
--- a/src/main/java/com/mesa/reportservice/bean/JobEntity.java
+++ b/src/main/java/com/mesa/reportservice/bean/JobEntity.java
@@ -3,7 +3,7 @@ package com.mesa.reportservice.bean;
/**
* Created by wk1 on 2019/5/20.
*/
-public class JobEntity {
+public class JobEntity implements Cloneable{
private int result_id;
private String result_name;
@@ -22,8 +22,36 @@ public class JobEntity {
private String query_duration_ms;
private long read_rows;
private String memory_usage;
+ private String result;
+ private int excute_status;
+ private String issued_time;
+ private String exception;
+ public String getException() {
+ return exception;
+ }
+
+ public void setException(String exception) {
+ this.exception = exception;
+ }
+
+ public String getIssued_time() {
+ return issued_time;
+ }
+
+ public void setIssued_time(String issued_time) {
+ this.issued_time = issued_time;
+ }
+
+ public int getExcute_status() {
+ return excute_status;
+ }
+
+ public void setExcute_status(int excute_status) {
+ this.excute_status = excute_status;
+ }
+
public int getResult_id() {
return result_id;
}
@@ -160,4 +188,24 @@ public class JobEntity {
public void setMemory_usage(String memory_usage) {
this.memory_usage = memory_usage;
}
+
+ public String getResult() {
+ return result;
+ }
+
+ public void setResult(String result) {
+ this.result = result;
+ }
+
+
+ @Override
+ public Object clone() {
+ JobEntity o = null;
+ try {
+ o = (JobEntity) super.clone();
+ } catch (CloneNotSupportedException e) {
+ System.out.println(e.toString());
+ }
+ return o;
+ }
}
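Note: JobEntity now clones itself so the scheduler can publish an immutable snapshot of a job into the shared result map. Because every field is a primitive or an immutable String, the shallow copy from Object.clone() is sufficient. A minimal standalone sketch of the pattern (a hypothetical simplified class, not this repo's code):

    public class Snapshot implements Cloneable {

        private int status;
        private String result;

        public void setStatus(int status) { this.status = status; }
        public void setResult(String result) { this.result = result; }
        public int getStatus() { return status; }
        public String getResult() { return result; }

        // Covariant return type spares callers the (Snapshot) cast.
        @Override
        public Snapshot clone() {
            try {
                return (Snapshot) super.clone();
            } catch (CloneNotSupportedException e) {
                // Unreachable: this class implements Cloneable.
                throw new AssertionError(e);
            }
        }
    }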
diff --git a/src/main/java/com/mesa/reportservice/dao/DruidPool.java b/src/main/java/com/mesa/reportservice/dao/DruidPool.java
index 1cb5d52..5135e11 100644
--- a/src/main/java/com/mesa/reportservice/dao/DruidPool.java
+++ b/src/main/java/com/mesa/reportservice/dao/DruidPool.java
@@ -5,9 +5,13 @@ package com.mesa.reportservice.dao;
*/
import com.alibaba.druid.pool.DruidDataSource;
+import com.alibaba.druid.pool.DruidDataSourceFactory;
import com.alibaba.druid.support.http.StatViewServlet;
import com.alibaba.druid.support.http.WebStatFilter;
+import com.google.common.collect.Maps;
import com.mesa.reportservice.util.Logs;
+import org.junit.Test;
+import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.boot.web.servlet.FilterRegistrationBean;
import org.springframework.boot.web.servlet.ServletRegistrationBean;
@@ -15,7 +19,10 @@ import org.springframework.context.annotation.Bean;
import org.springframework.stereotype.Component;
import javax.sql.DataSource;
-import java.sql.SQLException;
+import java.io.InputStream;
+import java.sql.*;
+import java.util.Arrays;
+import java.util.Properties;
@Component
@@ -94,12 +101,12 @@ public class DruidPool {
dataSourcedb.setMaxActive(maxactive);
//Connection leak detection
dataSourcedb.setRemoveAbandoned(true);
- dataSourcedb.setRemoveAbandonedTimeout(30);
+ dataSourcedb.setRemoveAbandonedTimeout(1800);
dataSourcedb.setDefaultAutoCommit(false);
//Maximum wait time when acquiring a connection
- dataSourcedb.setMaxWait(300000);
+ dataSourcedb.setMaxWait(30000);
//Interval between sweeps that close idle connections, in milliseconds
- dataSourcedb.setTimeBetweenEvictionRunsMillis(60000);
+ dataSourcedb.setTimeBetweenEvictionRunsMillis(30000);
//Keep connections from going stale
dataSourcedb.setValidationQuery("SELECT 'x'");
dataSourcedb.setTestWhileIdle(true);
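Note: this hunk retunes three pool knobs: abandoned connections are now reclaimed after 1800 s instead of 30 s (long-running ClickHouse queries would otherwise be reaped as leaks), the wait for a pooled connection drops to 30 s, and the idle-eviction sweep runs every 30 s. A hedged standalone sketch of the resulting configuration (the URL is a placeholder; only the values shown in the hunk come from this commit):

    import com.alibaba.druid.pool.DruidDataSource;

    public class PoolSketch {
        public static DruidDataSource build() {
            DruidDataSource ds = new DruidDataSource();
            ds.setUrl("jdbc:mariadb://localhost:3306/test"); // placeholder
            ds.setRemoveAbandoned(true);                     // leak detection
            ds.setRemoveAbandonedTimeout(1800);              // seconds before a borrowed connection counts as leaked
            ds.setDefaultAutoCommit(false);
            ds.setMaxWait(30000);                            // ms to wait for a free connection
            ds.setTimeBetweenEvictionRunsMillis(30000);      // ms between idle-eviction sweeps
            ds.setValidationQuery("SELECT 'x'");             // keep-alive probe
            ds.setTestWhileIdle(true);
            return ds;
        }
    }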
diff --git a/src/main/java/com/mesa/reportservice/scheduledtask/ExcuteTask.java b/src/main/java/com/mesa/reportservice/scheduledtask/ExcuteTask.java
new file mode 100644
index 0000000..a5e826c
--- /dev/null
+++ b/src/main/java/com/mesa/reportservice/scheduledtask/ExcuteTask.java
@@ -0,0 +1,82 @@
+package com.mesa.reportservice.scheduledtask;
+
+import com.mesa.reportservice.bean.HttpResult;
+import com.mesa.reportservice.bean.JobEntity;
+import com.mesa.reportservice.service.HttpAPIService;
+import com.mesa.reportservice.util.ClickhouseConfig;
+import com.mesa.reportservice.util.ConfigUtil;
+import com.mesa.reportservice.util.Logs;
+
+import java.net.SocketException;
+
+import static com.mesa.reportservice.util.ClickhouseConfig.getJobUrl;
+
+/**
+ * Created by wk1 on 2019/5/31.
+ */
+
+//@Component
+
+public class ExcuteTask {
+
+
+ private HttpAPIService hp;
+
+ public ExcuteTask(HttpAPIService hp) {
+
+ this.hp = hp;
+ }
+
+
+ public void dotask(JobEntity je) {
+
+ String joburl = getJobUrl(je.getQuery_sql(), je.getQuery_id());
+ HttpResult hr = new HttpResult();
+
+
+ int k = 3;
+ do {
+ try {
+ hr = hp.doPost(joburl);
+ if(hr.getCode()!=200 && hr.getBody().toLowerCase().contains("timeout")){
+ k--;
+ String killurl = ClickhouseConfig.getKillUrl(je.getQuery_id());
+ hp.doPost(killurl);
+ }
+ else {
+ break;
+ }
+ } catch (SocketException e) {
+ k--;
+ je.setExcute_detail("查询失败" + e.toString());
+ je.setStatus(4);
+ Logs.error("查询失败" + e.toString());
+ } catch (Exception e) {
+ je.setStatus(4);
+ je.setExcute_detail("查询失败" + e.toString());
+ Logs.error("查询失败" + e.toString());
+ break;
+ }
+ }
+ while (k > 0);
+
+ je.setResult(hr.getBody());
+ if (hr.getCode() == 200) {
+ je.setResult(hr.getBody());
+ je.setExcute_detail("success");
+ je.setExcute_time(0);
+ je.setExcute_process(100);
+ je.setStatus(2);
+ } else {
+ je.setResult("");
+ je.setStatus(4);
+ je.setExcute_detail("查询clickhouse失败" + hr.getBody());
+ Logs.error("查询clickhouse失败" + hr.getBody());
+
+ }
+ JobEntity ji = (JobEntity) je.clone();
+ ConfigUtil.mapresult.put(ji.getQuery_id(), ji);
+ }
+
+
+}
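Note: dotask() retries a timed-out ClickHouse query up to three times, issuing a kill for the server-side query before each retry so the same query_id can be reused. A generic sketch of that policy in isolation (the Http interface is hypothetical, not this repo's HttpAPIService):

    import java.net.SocketException;

    interface Http {
        int post(String url) throws SocketException;
    }

    class RetrySketch {
        // Returns true on success; kills the server-side query before each retry.
        static boolean runWithRetry(Http http, String jobUrl, String killUrl) {
            int attempts = 3;
            do {
                try {
                    if (http.post(jobUrl) == 200) {
                        return true;
                    }
                    attempts--;
                    http.post(killUrl); // cancel the stuck query so the id can be reused
                } catch (SocketException e) {
                    attempts--;         // transient network error: retry
                }
            } while (attempts > 0);
            return false;
        }
    }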
diff --git a/src/main/java/com/mesa/reportservice/scheduledtask/JobTask.java b/src/main/java/com/mesa/reportservice/scheduledtask/JobTask.java
index 1062af8..6379b50 100644
--- a/src/main/java/com/mesa/reportservice/scheduledtask/JobTask.java
+++ b/src/main/java/com/mesa/reportservice/scheduledtask/JobTask.java
@@ -1,31 +1,13 @@
package com.mesa.reportservice.scheduledtask;
-import com.alibaba.fastjson.JSON;
-import com.mesa.reportservice.bean.HttpResult;
-import com.mesa.reportservice.bean.JobEntity;
-import com.mesa.reportservice.service.DataBaseBusiness;
-import com.mesa.reportservice.service.HttpAPIService;
-import com.mesa.reportservice.util.ClickhouseConfig;
-import com.mesa.reportservice.util.ConfigUtil;
-import com.mesa.reportservice.util.HbaseUtil;
-import com.mesa.reportservice.util.Logs;
-import org.apache.commons.codec.digest.DigestUtils;
-import org.springframework.stereotype.Component;
-
-import java.io.IOException;
-import java.util.Map;
-import java.util.UUID;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.ExecutorService;
-
/**
* Created by wk1 on 2019/5/31.
*/
-@Component
+//@Component
public class JobTask {
-
+/*
protected ExecutorService pool = ConfigUtil.pool;
private HttpAPIService hp;
private DataBaseBusiness hb;
@@ -57,7 +39,7 @@ public class JobTask {
int id = jl.getResult_id();
String sql = jl.getQuery_sql().trim();
String hbaseid = DigestUtils.md5Hex(id + sql);
- sql = sql.replace("$exe_time", "toDateTime('" + jl.getEnd_time().trim() + "')");
+ sql = sql.replace("$exe_time", "toDateTime('" + jl.getIssued_time().trim() + "')");
String query_id = UUID.randomUUID().toString().replaceAll("-", "");
String httpQuery = ClickhouseConfig.getJobUrl(sql, query_id);
hb.updateStatue(id, 1);
@@ -154,9 +136,7 @@ public class JobTask {
throw new Exception("查询clickhouse失败" + hr.getBody());
}
}
- /* String h2=hp.doGet("http://192.168.10.202:4444/tsg:report_result/"+query_id+"/response:result");
- // Map<String, Object> aaa = hs.queryData("tsg:report_result",query_id);
- System.out.print(h2);*/
+
} catch (InterruptedException w) {
Logs.error(w.toString());
@@ -248,6 +228,10 @@ public class JobTask {
}
}
while (k >= 0);
+
+ // String h2 = hp.doGet("http://192.168.10.224:4444/tsg:report_result/" + hbaseid + "/response:result");
+ // Map<String, Object> aaa = h2.queryData("tsg:report_result",query_id);
+ // System.out.print(h2);
return h1;
}
@@ -257,7 +241,7 @@ public class JobTask {
JobEntity job = new JobEntity();
String finishurl = ClickhouseConfig.getFinishUrl(query_id);
- HttpResult num = null;
+ HttpResult num = new HttpResult();
int j = 3;
do {
try {
@@ -306,6 +290,6 @@ public class JobTask {
}
while (job.getStatus() == 5 && j >= 0);
return job;
- }
+ }*/
}
diff --git a/src/main/java/com/mesa/reportservice/scheduledtask/ScheduledTask.java b/src/main/java/com/mesa/reportservice/scheduledtask/ScheduledTask.java
index 1ba6d43..a0445a7 100644
--- a/src/main/java/com/mesa/reportservice/scheduledtask/ScheduledTask.java
+++ b/src/main/java/com/mesa/reportservice/scheduledtask/ScheduledTask.java
@@ -1,16 +1,22 @@
package com.mesa.reportservice.scheduledtask;
+import com.alibaba.fastjson.JSON;
+import com.mesa.reportservice.bean.HttpResult;
import com.mesa.reportservice.bean.JobEntity;
import com.mesa.reportservice.service.DataBaseBusiness;
import com.mesa.reportservice.service.HttpAPIService;
+import com.mesa.reportservice.util.ClickhouseConfig;
import com.mesa.reportservice.util.ConfigUtil;
+import com.mesa.reportservice.util.HbaseUtil;
import com.mesa.reportservice.util.Logs;
+import org.apache.commons.codec.digest.DigestUtils;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.scheduling.annotation.EnableScheduling;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;
import java.util.List;
+import java.util.Map;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
@@ -28,7 +34,7 @@ public class ScheduledTask {
@Autowired
private DataBaseBusiness hb;
- @Scheduled(cron = "${scan.mysql.scheduled.plan}")
+ /* @Scheduled(cron = "${scan.mysql.scheduled.plan}")
public void run() {
System.out.print(Thread.currentThread());
@@ -39,7 +45,7 @@ public class ScheduledTask {
Logs.debug("joblist size=" + joblist.size());
for (JobEntity jl : joblist) {
- Thread.sleep(2000);
+ // Thread.sleep(2000);
ConfigUtil.job_thread--;
Logs.debug("jobthread--=" + ConfigUtil.job_thread + "---jobid=" + jl.getResult_id());
pool.execute(new Runnable() {
@@ -61,218 +67,295 @@ public class ScheduledTask {
Logs.error(e.toString());
}
}
+*/
- // @Scheduled(cron = "${scan.mysql.scheduled.plan}")
- /*public void run() {
+ @Scheduled(cron = "${scan.mysql.scheduled.plan}")
+ public void scanmysql() {
- System.out.print(Thread.currentThread());
- List<JobEntity> joblist = hb.getJobLimitRows(1);
- for (JobEntity jl : joblist) {
- CountDownLatch countDownLatch = new CountDownLatch(1);
+ try {
- int id = jl.getJob_id();
- String sqlold = jl.getQuery_sql().trim();
- String query_id = UUID.randomUUID().toString().replaceAll("-", "");
- String sql = sqlold.replaceAll(";", "");
- String hbaseid = DigestUtils.md5Hex(sqlold);
- try {
+ List<JobEntity> joblist = hb.getJobLimitRows(ConfigUtil.job_thread);
+ Logs.debug("joblist size=" + joblist.size());
+
+ for (JobEntity jl : joblist) {
+ String sql = jl.getQuery_sql().trim();
+ String queryid = DigestUtils.md5Hex(jl.getResult_id() + sql);
+ sql = sql.replace("$exe_time", "toDateTime('" + jl.getEnd_time().trim() + "')");
+ jl.setQuery_id(queryid);
+ jl.setQuery_sql(sql);
+ jl.setStatus(1);
+ jl.setResult("");
+ jl.setException("");
+ jl.setExcute_detail("");
+ JobEntity jt = (JobEntity) jl.clone();
+ ConfigUtil.mapresult.put(queryid, jt);
+
+ hb.updateStatue(jl.getResult_id(), 1);
- String httpQuery = ClickhouseConfig.getJobUrl(sql, query_id);
- System.out.print(httpQuery);
pool.execute(new Runnable() {
@Override
public void run() {
- try {
- int i;
- for (i = 0; i < ConfigUtil.loop_number; i++) {
- Thread.sleep(ConfigUtil.query_log_sleep);
- String queryProccess = ClickhouseConfig.getProcessUrl(query_id);
- //System.out.println("queryProccess"+queryProccess);
-
- HttpResult hr = hp.doPost(queryProccess);
- String rs = hr.getBody().trim();
- if (!rs.isEmpty() && !rs.equals("")) {
- // int status = hb.getJobStatueFromId(id);
- JobEntity en = getProcessMessage(rs, id);
- en.setExcute_detail("");
- if (en.getExcute_time() < ConfigUtil.ck_timeout) {
-
- if (en.getExcute_time() > ConfigUtil.query_log_sleep) {
-
- hb.updateProcesses(en);
-
- } else {
- break;
- }
- } else {
-
- i = 1001;
- break;
- }
- } else {
- break;
- }
- }
- if (i >= 1000) {
- String killurl = ClickhouseConfig.getKillUrl(query_id);
- hp.doPost(killurl);
- }
- } catch (Exception e) {
- logger.error("获取进度失败" + e.toString());
- updateErrorMessage(id, e.toString());
- }
- countDownLatch.countDown();
+ JobEntity je = (JobEntity) jl.clone();
+ ExcuteTask ex = new ExcuteTask(hp);
+ ex.dotask(je);
+
}
});
+ ConfigUtil.job_thread--;
+
+
+ }
+
+ } catch (Exception e) {
+ Logs.error(e.toString());
+ }
+ }
- HttpResult hr = hp.doPost(httpQuery);
- countDownLatch.await();
+ @Scheduled(cron = "${scan.job.scheduled.plan}")
+ public void scanmysqlresult() {
- if (hr.getCode() == 200) {
+ try {
- String hbasejson = HbaseUtil.getHbaseJson(hbaseid, hr.getBody());
- String hbaseurl = HbaseUtil.getHbasePostUrl();
- HttpResult h1 = hp.doPost(hbaseurl, hbasejson);
+ List<JobEntity> joblist = hb.getJobForExcute();
+ Logs.debug("joblist size=" + joblist.size());
+
+ for (JobEntity jk : joblist) {
+ String sql = jk.getQuery_sql().trim();
+ String queryid = DigestUtils.md5Hex(jk.getResult_id() + sql);
+ if (!ConfigUtil.mapresult.containsKey(queryid)) {
+ String killurl = ClickhouseConfig.getKillUrl(queryid);
+ hp.doPost(killurl);
+ updateErrorMessage(jk.getResult_id(), 0, "Execution result missing; re-executing");
+
+ break;
+ }
+
+
+ JobEntity je = (JobEntity) ConfigUtil.mapresult.get(queryid).clone();
+ /* if (je == null) {
+ break;
+ }*/
+ if (je.getStatus() == 2 ) {
+ try {
+ je = getQueryLogMessage(je);
+ HttpResult h1 = saveToHbase(je);
if (h1.getCode() == 200) {
- JobEntity entity = new JobEntity();
- int j = 3;
+ //Thread.sleep(5000);
+ // Logs.debug(entity.getJob_id() + entity.getStatue());
+
+ je.setExcute_detail("success");
+ je.setExcute_time(0);
+ je.setExcute_process(100);
+ je.setStatus(2);
+ int number = 0;
+ int z = 3;
do {
- Thread.sleep(3000);
- entity = getQueryLogMessage(id, query_id);
- j--;
+ number = hb.updateProcesses(je);
+ z--;
}
- while (entity.getStatue() != 2 && j > 0);
- System.out.print(entity.getJob_id() + entity.getStatue());
+ while (number != 1 && z >= 0);
- hb.updateProcesses(entity);
} else {
- updateErrorMessage(id, "写入hbase失败");
+ updateErrorMessage(je.getResult_id(), 5, "写入hbase失败");
+ Logs.error("写入hbase失败");
}
+ } catch (Exception e) {
+ Logs.error(e.toString());
+
+ } finally {
+ ConfigUtil.mapresult.remove(queryid);
+ ConfigUtil.job_thread++;
+ }
- } else {
- if (hr.getBody().contains("cancelled")) {
+ } else if (je.getStatus() > 2) {
+
+ try {
+ je = getQueryLogMessage(je);
+ HttpResult h1 = saveToHbase(je);
+ if (h1.getCode() == 200) {
- updateErrorMessage(id, "查询超时任务已经取消");
+ int number = 0;
+ int z = 3;
+ do {
+ number = hb.updateProcesses(je);
+ z--;
+ }
+ while (number != 1 && z >= 0);
+
+ } else {
+ updateErrorMessage(je.getResult_id(), 5, "错误日志写入hbase失败");
+ Logs.error("错误日志写入hbase失败");
+ }
+ } catch (Exception e) {
+ Logs.error(e.toString());
+
+ } finally {
+ ConfigUtil.mapresult.remove(queryid);
+ ConfigUtil.job_thread++;
}
- else{
- updateErrorMessage(id, "查询clickhouse失败" + hr.getBody());
+
+ } else {
+ JobEntity jo = getProcessMessage(je);
+ if(jo.getExcute_row()!=0 || jo.getExcute_time()!=0) {
+ hb.updateProcesses(jo);
}
}
- String h2 = hp.doGet("http://192.168.10.202:4444/tsg:report_result/" + hbaseid + "/response:result");
- // Map<String, Object> aaa = hs.queryData("tsg:report_result",query_id);
- System.out.print("===============" + h2);
- } catch (Exception e) {
-
- logger.error("任务执行中报错" + e.toString());
- updateErrorMessage(id, e.toString());
- break;
- }
- }
- }*/
-
+ }
+ } catch (Exception e) {
+ Logs.error(e.toString());
+ // ConfigUtil.job_thread++;
+ }
+ }
-/* private synchronized void updateErrorMessage(int id, String error) {
+ public void updateErrorMessage(int id, int status, String error) {
JobEntity js = new JobEntity();
- js.setJob_id(id);
- js.setStatue(3);
+ js.setResult_id(id);
+ js.setStatus(status);
js.setExcute_detail("任务执行中报错" + error);
hb.updateProcesses(js);
}
- private synchronized JobEntity getProcessMessage(String body, int id) {
- JobEntity job = new JobEntity();
- Map map = JSON.parseObject(body);
- float elapsed = Float.parseFloat(map.get("elapsed").toString());
- long total_rows_approx = Long.parseLong(map.get("total_rows_approx").toString());
- long read_rows = Long.parseLong(map.get("read_rows").toString());
- int ftime = (int) (elapsed * total_rows_approx / read_rows);
- ftime = Math.round((ftime - elapsed) * 10 * 1000) / 10;
- logger.info("total_rows_approx" + total_rows_approx + "read_rows" + read_rows + "elapsed" + elapsed + "ftime" + ftime);
-
- double persent = (read_rows * 1.00 / total_rows_approx);
- int result = (int) (persent * 100);
- job.setExcute_row(total_rows_approx);
- job.setExcute_process(result);
- job.setStatue(1);
- job.setExcute_time(ftime);
- job.setJob_id(id);
- System.out.println("%+++++++++++++" + result + "%");
- System.out.println("needtime--------" + ftime);
+ public JobEntity getProcessMessage(JobEntity jb) {
+ JobEntity job = (JobEntity) jb.clone();
+ String queryProccess = ClickhouseConfig.getProcessUrl(job.getQuery_id());
+ HttpResult hr = null;
+ try {
+ hr = hp.doPost(queryProccess);
+ String rs = hr.getBody().trim();
+ if (!rs.isEmpty() && !rs.equals("")) {
+ //JobEntity job = new JobEntity();
+ Map map = JSON.parseObject(rs);
+ float elapsed = Float.parseFloat(map.get("elapsed").toString());
+ long total_rows_approx = Long.parseLong(map.get("total_rows_approx").toString());
+ long read_rows = Long.parseLong(map.get("read_rows").toString());
+ int ftime = (int) (elapsed * total_rows_approx / read_rows);
+ ftime = Math.round((ftime - elapsed) * 10 * 1000) / 10;
+ if(ftime<0){
+ ftime=0;
+ }
+ Logs.info("total_rows_approx" + total_rows_approx + "read_rows" + read_rows + "elapsed" + elapsed + "ftime" + ftime);
+ // NumberFormat numberFormat = NumberFormat.getInstance();
+ //numberFormat.setMaximumFractionDigits(0);
+
+ double persent = (read_rows * 1.00 / total_rows_approx);
+ int result = (int) (persent * 100);
+ job.setExcute_row(total_rows_approx);
+ job.setExcute_process(result);
+ // job.setStatus(1);
+ job.setExcute_time(ftime);
+ job.setExcute_detail("");
+ System.out.println("%+++++++++++++" + result + "%");
+ System.out.println("needtime--------" + ftime);
+ }
+ } catch (Exception e) {
+ e.printStackTrace();
+ }
+
return job;
}
+ public HttpResult saveToHbase(JobEntity entity) {
- private synchronized JobEntity getQueryLogMessage(int id, String query_id) {
+ int k = 3;
+ HttpResult h1 = new HttpResult();
- JobEntity job = new JobEntity();
+ do {
+ try {
+ String hbasejson = HbaseUtil.getHbaseJson(entity.getQuery_id(), entity.getResult(), entity);
+ String hbaseurl = HbaseUtil.getHbasePostUrl();
+ k--;
+ h1 = hp.doPost(hbaseurl, hbasejson);
+ break;
+ } catch (Exception e) {
+ Logs.error("写入hbase报错重试次数" + (3 - k));
+ k--;
+ e.printStackTrace();
+ }
+ }
+ while (k >= 0);
- String finishurl = ClickhouseConfig.getFinishUrl(query_id);
- HttpResult num = null;
- try {
- num = hp.doPost(finishurl);
-
- if (!num.getBody().isEmpty() && !num.getBody().equals("")) {
-
-
- job.setJob_id(id);
- Map mapresult = JSON.parseObject(num.getBody());
- int type = Integer.parseInt(mapresult.get("type").toString());
- int query_duration_ms = Integer.parseInt(mapresult.get("query_duration_ms").toString());
- long read_rows = Long.parseLong(mapresult.get("read_rows").toString());
- long memory_usage = Long.parseLong(mapresult.get("memory_usage").toString());
- String query = mapresult.get("query").toString();
- String event_time = mapresult.get("event_time").toString();
- logger.debug("type" + type);
-
-
- switch (type) {
- case 2:
- job.setStatue(2);
- job.setExcute_detail("success");
- break;
- case 3:
- job.setStatue(3);
- job.setExcute_detail("语法错误");
- break;
- case 4:
- job.setStatue(4);
- job.setExcute_detail("性能问题");
- break;
-
- default:
- break;
- }
- job.setExcute_time(0);
- job.setExcute_process(100);
- job.setExcute_row(read_rows);
- } else {
+ //String h2 = hp.doGet("http://192.168.10.224:4444/tsg:report_result/" + queryid + "/response:result");
+ // Map<String, Object> aaa = h2.queryData("tsg:report_result",query_id);
+ // System.out.print(h2);
+ return h1;
+ }
- job.setStatue(3);
- job.setExcute_detail("没有执行");
- }
+ public JobEntity getQueryLogMessage(JobEntity job) {
- } catch (Exception e) {
- e.printStackTrace();
+
+ // JobEntity job = (JobEntity) jb.clone();
+ /* job.setResult_id(resultid);
+ job.setQuery_id(queryid);*/
+ String finishurl = ClickhouseConfig.getFinishUrl(job.getQuery_id());
+ HttpResult num = new HttpResult();
+ int j = 0;
+ do {
+ try {
+ Thread.sleep(3000);
+ num = hp.doPost(finishurl);
+ if (num.getCode() == 200 && !num.getBody().isEmpty() && !num.getBody().equals("")) {
+
+ Map mapresult = JSON.parseObject(num.getBody());
+ int type = Integer.parseInt(mapresult.get("type").toString());
+ Logs.debug("type" + type);
+ switch (type) {
+ case 2:
+ job.setStatus(type);
+ job.setExcute_detail("success");
+ job.setQuery_duration_ms(mapresult.get("query_duration_ms").toString());
+ job.setMemory_usage(mapresult.get("memory_usage").toString());
+ job.setExcute_row(Long.parseLong(mapresult.get("read_rows").toString()));
+ break;
+ case 3:
+ job.setStatus(type);
+ job.setExcute_detail("语法错误"+mapresult.get("exception").toString());
+ job.setQuery_sql(mapresult.get("query").toString());
+ job.setException(mapresult.get("exception").toString());
+ break;
+ case 4:
+ job.setStatus(type);
+ job.setExcute_detail("性能问题"+mapresult.get("exception").toString());
+ job.setQuery_sql(mapresult.get("query").toString());
+ job.setException(mapresult.get("exception").toString());
+ break;
+ default:
+ break;
+ }
+
+
+ } else {
+ job.setStatus(6);
+ job.setExcute_detail("没有查到query信息");
+ job.setException("没有查到query信息");
+
+ }
+ } catch (Exception e) {
+ job.setStatus(6);
+ job.setException("获取querylogmessage失败");
+ Logs.error("获取querylogmessage失败");
+ }
+ j--;
}
+ while (job.getStatus() == 6 && j > 0);
return job;
- }*/
+ }
}
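Note: the rewrite replaces the old random UUID query ids with a deterministic key, md5(result_id + sql), so the second scheduled pass (scanmysqlresult) can re-derive the key for any claimed row and look its snapshot up in ConfigUtil.mapresult. A minimal sketch of the key derivation (commons-codec, as used above):

    import org.apache.commons.codec.digest.DigestUtils;

    class QueryIdSketch {
        // Deterministic id: the same row and SQL always map to the same key.
        static String queryId(int resultId, String querySql) {
            return DigestUtils.md5Hex(resultId + querySql.trim());
        }

        public static void main(String[] args) {
            System.out.println(queryId(42, "SELECT 1")); // stable across restarts
        }
    }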
diff --git a/src/main/java/com/mesa/reportservice/service/DataBaseBusiness.java b/src/main/java/com/mesa/reportservice/service/DataBaseBusiness.java
index fbc3582..3f8d729 100644
--- a/src/main/java/com/mesa/reportservice/service/DataBaseBusiness.java
+++ b/src/main/java/com/mesa/reportservice/service/DataBaseBusiness.java
@@ -38,10 +38,59 @@ public class DataBaseBusiness {
}
+
+ public List<JobEntity> getJobForExcute() {
+
+
+
+ String sql = " select * from report_result where status =1 order by end_time ";
+ int resultid=0;
+ List<JobEntity> sqllist = new ArrayList<JobEntity>();
+ try {
+ //connection = manager.getConnection("idb");
+ connection = dataSource.getConnection();
+ connection.setAutoCommit(false);
+ pstmt = connection.prepareStatement(sql);
+ rset = pstmt.executeQuery();
+ while (rset.next()) {
+ JobEntity je = new JobEntity();
+ resultid = rset.getInt("result_id");
+ je.setResult_id(rset.getInt("result_id"));
+ je.setEnd_time(rset.getString("end_time").trim().substring(0, 19));
+ je.setIssued_time(rset.getString("issued_time").trim().substring(0, 19));
+ je.setQuery_sql(rset.getString("query_sql").trim());
+ sqllist.add(je);
+ }
+ Logs.debug("-----------success----------" + sql);
+
+ } catch (Exception e) {
+ Logs.error(sql + "获取进度列表发生异常!");
+ updateStatue(resultid,4);
+ e.printStackTrace();
+ throw new RuntimeException("获取进度列表发生异常!", e);
+ } finally {
+ clear(pstmt, null, rset);
+ // manager.freeConnection("idb", connection);
+ if (connection != null) {
+ try {
+ connection.close();
+ } catch (SQLException e) {
+ Logs.error(e.toString());
+ e.printStackTrace();
+ }
+
+ }
+ }
+ return sqllist;
+
+ }
+
+
+
public List<JobEntity> getJobLimitRows(int row) {
String current_time = DateUtil.getDate();
-
+ int resultid=0;
String sql = " select * from report_result where status =0 and end_time<'" + current_time + "' order by end_time limit " + row;
List<JobEntity> sqllist = new ArrayList<JobEntity>();
@@ -53,8 +102,11 @@ public class DataBaseBusiness {
rset = pstmt.executeQuery();
while (rset.next()) {
JobEntity je = new JobEntity();
+ resultid = rset.getInt("result_id");
+ je.setExcute_detail("");
je.setResult_id(rset.getInt("result_id"));
je.setEnd_time(rset.getString("end_time").trim().substring(0, 19));
+ je.setIssued_time(rset.getString("issued_time").trim().substring(0, 19));
je.setQuery_sql(rset.getString("query_sql").trim());
sqllist.add(je);
}
@@ -62,8 +114,11 @@ public class DataBaseBusiness {
} catch (Exception e) {
Logs.error(sql + "获取任务列表发生异常!");
+ updateStatue(resultid,4);
e.printStackTrace();
throw new RuntimeException("获取任务列表发生异常!", e);
+
+
} finally {
clear(pstmt, null, rset);
// manager.freeConnection("idb", connection);
@@ -80,6 +135,9 @@ public class DataBaseBusiness {
return sqllist;
}
+
+
+
public int getJobStatueFromId(int id) {
int statue = 0;
@@ -168,9 +226,9 @@ public class DataBaseBusiness {
Logs.debug("-----------success----------" + sql);
} catch (Exception e) {
- Logs.error(sql + "存储结果日志发生异常!");
+ Logs.error(sql + "更新状态发生异常!");
e.printStackTrace();
- throw new RuntimeException("存储结果日志发生异常!", e);
+ throw new RuntimeException("更新状态发生异常!", e);
} finally {
clear(pstmt, null, null);
// manager.freeConnection("idb", connection);
@@ -187,4 +245,5 @@ public class DataBaseBusiness {
}
+
}
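Note: getJobForExcute() follows the class's manual pstmt/rset bookkeeping with an explicit finally block. For comparison, a hedged sketch of the same kind of query using try-with-resources, which closes the connection, statement, and result set automatically (an alternative style, not this commit's code):

    import javax.sql.DataSource;
    import java.sql.Connection;
    import java.sql.PreparedStatement;
    import java.sql.ResultSet;
    import java.sql.SQLException;
    import java.util.ArrayList;
    import java.util.List;

    class JobQuerySketch {
        static List<Integer> claimedResultIds(DataSource dataSource) throws SQLException {
            String sql = "select result_id from report_result where status = 1 order by end_time";
            List<Integer> ids = new ArrayList<>();
            try (Connection conn = dataSource.getConnection();
                 PreparedStatement ps = conn.prepareStatement(sql);
                 ResultSet rs = ps.executeQuery()) {
                while (rs.next()) {
                    ids.add(rs.getInt("result_id")); // resources close in reverse order
                }
            }
            return ids;
        }
    }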
diff --git a/src/main/java/com/mesa/reportservice/service/HttpAPIService.java b/src/main/java/com/mesa/reportservice/service/HttpAPIService.java
index 65cfba1..3c46b69 100644
--- a/src/main/java/com/mesa/reportservice/service/HttpAPIService.java
+++ b/src/main/java/com/mesa/reportservice/service/HttpAPIService.java
@@ -38,7 +38,7 @@ public class HttpAPIService {
public String doGet(String url) throws Exception {
// Declare the HTTP GET request
HttpGet httpGet = new HttpGet(url);
-
+ httpGet.setHeader("Accept","application/json");
// Load the request configuration
httpGet.setConfig(config);
@@ -91,7 +91,7 @@ public class HttpAPIService {
HttpPost httpPost = new HttpPost(url);
// Apply the request configuration
httpPost.setConfig(config);
-
+ // httpPost.setHeader("Accept","application/json");
// If the json payload is not empty, wrap it into the request body
if (!json.equals("") && !json.isEmpty()) {
diff --git a/src/main/java/com/mesa/reportservice/util/ClickhouseConfig.java b/src/main/java/com/mesa/reportservice/util/ClickhouseConfig.java
index d2e7516..d46c0a3 100644
--- a/src/main/java/com/mesa/reportservice/util/ClickhouseConfig.java
+++ b/src/main/java/com/mesa/reportservice/util/ClickhouseConfig.java
@@ -63,7 +63,7 @@ public class ClickhouseConfig {
// String logsql = "select elapsed,total_rows_approx,read_rows from `system`.processes where query_id='" + query_id + "' FORMAT JSONEachRow";
try {
- String sql = URLEncoder.encode("select type,read_rows,query_duration_ms,query,memory_usage,event_time from `system`.query_log where type>1 and query_id='" + query_id + "' order by event_time desc limit 1 FORMAT JSONEachRow", "utf8");
+ String sql = URLEncoder.encode("select type,read_rows,query_duration_ms,query,exception,memory_usage,event_time from `system`.query_log where type>1 and query_id='" + query_id + "' order by event_time desc limit 1 FORMAT JSONEachRow", "utf8");
finishsql = url + sql;
} catch (UnsupportedEncodingException e) {
Logs.error(e.toString());
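Note: the finish-lookup SQL now also selects the exception column, so failed queries carry the server's error text back into report_result and HBase. A hedged sketch of how such a system.query_log lookup is URL-encoded for ClickHouse's HTTP interface (the endpoint is a placeholder):

    import java.io.UnsupportedEncodingException;
    import java.net.URLEncoder;

    class FinishUrlSketch {
        static String finishUrl(String queryId) throws UnsupportedEncodingException {
            String base = "http://localhost:8123/?query="; // placeholder endpoint
            String sql = "select type,read_rows,query_duration_ms,query,exception,"
                    + "memory_usage,event_time from `system`.query_log"
                    + " where type>1 and query_id='" + queryId + "'"
                    + " order by event_time desc limit 1 FORMAT JSONEachRow";
            return base + URLEncoder.encode(sql, "utf8");
        }
    }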
diff --git a/src/main/java/com/mesa/reportservice/util/ConfigUtil.java b/src/main/java/com/mesa/reportservice/util/ConfigUtil.java
index 7a2ff39..4ab3822 100644
--- a/src/main/java/com/mesa/reportservice/util/ConfigUtil.java
+++ b/src/main/java/com/mesa/reportservice/util/ConfigUtil.java
@@ -1,8 +1,11 @@
package com.mesa.reportservice.util;
+import com.mesa.reportservice.bean.JobEntity;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.stereotype.Component;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
@@ -15,24 +18,18 @@ import java.util.concurrent.Executors;
public class ConfigUtil {
- public static ExecutorService pool = Executors.newFixedThreadPool(30);
- public static int query_log_sleep;
- public static int loop_number;
+ // public static ExecutorService pool = Executors.newFixedThreadPool(30);
+ // public static int query_log_sleep;
+ //public static int loop_number;
public static int job_thread;
public static int ck_timeout;
+ public static Map<String, JobEntity> mapresult = new ConcurrentHashMap<>();
public void setCk_timeout(int ck_timeout) {
ConfigUtil.ck_timeout = ck_timeout;
}
- public void setQuery_log_sleep(int query_log_sleep) {
- ConfigUtil.query_log_sleep = query_log_sleep;
- }
-
- public void setLoop_number(int loop_number) {
- ConfigUtil.loop_number = loop_number;
- }
public void setJob_thread(int job_thread) {
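Note: the polling counters (query_log_sleep, loop_number) give way to a shared ConcurrentHashMap keyed by query id; the scan pass publishes a cloned snapshot, and the result pass consumes it and always removes the entry. A simplified sketch of that handoff contract (value type reduced to String):

    import java.util.Map;
    import java.util.concurrent.ConcurrentHashMap;

    class HandoffSketch {
        static final Map<String, String> results = new ConcurrentHashMap<>();

        // Producer side: publish a snapshot under the deterministic query id.
        static void publish(String queryId, String snapshot) {
            results.put(queryId, snapshot);
        }

        // Consumer side: read, then always release the slot.
        static void consume(String queryId) {
            try {
                String snapshot = results.get(queryId);
                if (snapshot != null) {
                    System.out.println("flushing " + snapshot);
                }
            } finally {
                results.remove(queryId);
            }
        }
    }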
diff --git a/src/main/java/com/mesa/reportservice/util/HbaseUtil.java b/src/main/java/com/mesa/reportservice/util/HbaseUtil.java
index 532aafa..71259fd 100644
--- a/src/main/java/com/mesa/reportservice/util/HbaseUtil.java
+++ b/src/main/java/com/mesa/reportservice/util/HbaseUtil.java
@@ -19,6 +19,8 @@ public class HbaseUtil {
private static String colume_query_duration_ms;
private static String colume_read_rows;
private static String colume_memory_usage;
+ private static String colume_exception;
+ private static String colume_sql;
public static String getHbaseJson(String hbaseid, String responsebody, JobEntity job) {
@@ -26,7 +28,7 @@ public class HbaseUtil {
if (job.getStatus() == 2) {
hbasejson = "{\"Row\":[{\"key\":\"" + StringUtil.getBase64(hbaseid) + "\", \"Cell\": [{\"column\":\"" + StringUtil.getBase64(columefamily.trim()) + "\",\"$\":\"" + StringUtil.getBase64(responsebody.trim()) + "\"},{\"column\":\"" + StringUtil.getBase64(colume_job_id.trim()) + "\", \"$\":\"" + StringUtil.getBase64(String.valueOf(job.getResult_id())) + "\"},{\"column\":\"" + StringUtil.getBase64(colume_query_id.trim()) + "\", \"$\":\"" + StringUtil.getBase64(job.getQuery_id().trim()) + "\"},{\"column\":\"" + StringUtil.getBase64(colume_query_duration_ms.trim()) + "\", \"$\":\"" + StringUtil.getBase64(job.getQuery_duration_ms().trim()) + "\"},{\"column\":\"" + StringUtil.getBase64(colume_read_rows.trim()) + "\", \"$\":\"" + StringUtil.getBase64(String.valueOf(job.getRead_rows())) + "\"},{\"column\":\"" + StringUtil.getBase64(colume_memory_usage.trim()) + "\", \"$\":\"" + StringUtil.getBase64(job.getMemory_usage().trim()) + "\"}]}]}";
} else {
- hbasejson = "{\"Row\":[{\"key\":\"" + StringUtil.getBase64(hbaseid) + "\", \"Cell\": [{\"column\":\"" + StringUtil.getBase64(columefamily.trim()) + "\",\"$\":\"" + StringUtil.getBase64(responsebody.trim()) + "\"}]}]}";
+ hbasejson = "{\"Row\":[{\"key\":\"" + StringUtil.getBase64(hbaseid) + "\", \"Cell\": [{\"column\":\"" + StringUtil.getBase64(columefamily.trim()) + "\",\"$\":\"" + StringUtil.getBase64(responsebody.trim()) + "\"},{\"column\":\"" + StringUtil.getBase64(colume_sql.trim()) + "\", \"$\":\"" + StringUtil.getBase64(String.valueOf(job.getQuery_sql())) + "\"},{\"column\":\"" + StringUtil.getBase64(colume_exception.trim()) + "\", \"$\":\"" + StringUtil.getBase64(job.getExcute_detail().trim()) + "\"}]}]}";
}
return hbasejson;
@@ -71,4 +73,12 @@ public class HbaseUtil {
HbaseUtil.colume_job_id = colume_job_id;
}
+
+ public void setColume_exception(String colume_exception) {
+ HbaseUtil.colume_exception = colume_exception;
+ }
+
+ public void setColume_sql(String colume_sql) {
+ HbaseUtil.colume_sql = colume_sql;
+ }
}
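Note: the failure branch of getHbaseJson now persists the SQL and the error detail alongside the result. The HBase REST gateway expects Row/Cell JSON in which the row key, column name ("family:qualifier"), and value are all base64-encoded, which is what the string concatenation above builds. A minimal sketch of one such cell:

    import java.nio.charset.StandardCharsets;
    import java.util.Base64;

    class HbaseJsonSketch {
        static String b64(String s) {
            return Base64.getEncoder().encodeToString(s.getBytes(StandardCharsets.UTF_8));
        }

        // One row with a single cell, in the REST gateway's JSON shape.
        static String singleCellJson(String rowKey, String column, String value) {
            return "{\"Row\":[{\"key\":\"" + b64(rowKey)
                    + "\",\"Cell\":[{\"column\":\"" + b64(column)
                    + "\",\"$\":\"" + b64(value) + "\"}]}]}";
        }

        public static void main(String[] args) {
            System.out.println(singleCellJson("row1", "detail:exception", "Syntax error"));
        }
    }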
diff --git a/src/main/java/com/mesa/reportservice/util/ScheduleConfig.java b/src/main/java/com/mesa/reportservice/util/ScheduleConfig.java
new file mode 100644
index 0000000..1e17f90
--- /dev/null
+++ b/src/main/java/com/mesa/reportservice/util/ScheduleConfig.java
@@ -0,0 +1,21 @@
+package com.mesa.reportservice.util;
+
+import org.springframework.context.annotation.Configuration;
+import org.springframework.scheduling.annotation.SchedulingConfigurer;
+import org.springframework.scheduling.config.ScheduledTaskRegistrar;
+
+import java.util.concurrent.Executors;
+
+/**
+ * Created by wk1 on 2019/6/21.
+ */
+
+//@Configuration
+//All scheduled tasks share one thread pool; each task runs on its own thread when triggered.
+public class ScheduleConfig {
+/* @Override
+ public void configureTasks(ScheduledTaskRegistrar taskRegistrar) {
+ //Configure a scheduled-task thread pool of size 10
+ taskRegistrar.setScheduler(Executors.newScheduledThreadPool(10));
+ }*/
+}
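Note: this class is disabled scaffolding; enabled, it would move all @Scheduled methods off Spring's default single scheduler thread. A hedged sketch of the completed intent (assumes @EnableScheduling is active elsewhere, as it is in ScheduledTask):

    import org.springframework.context.annotation.Configuration;
    import org.springframework.scheduling.annotation.SchedulingConfigurer;
    import org.springframework.scheduling.config.ScheduledTaskRegistrar;

    import java.util.concurrent.Executors;

    @Configuration
    public class ScheduleConfigSketch implements SchedulingConfigurer {
        @Override
        public void configureTasks(ScheduledTaskRegistrar taskRegistrar) {
            // Ten scheduler threads, so scanmysql and scanmysqlresult never block each other.
            taskRegistrar.setScheduler(Executors.newScheduledThreadPool(10));
        }
    }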
diff --git a/src/main/resources/config/application.properties b/src/main/resources/config/application.properties
new file mode 100644
index 0000000..191a01e
--- /dev/null
+++ b/src/main/resources/config/application.properties
@@ -0,0 +1,64 @@
+server.port=9093
+scan.mysql.scheduled.plan=0/10 * * * * ?
+scan.job.scheduled.plan=0/15 * * * * ?
+
+# Poll query status every ten seconds; loop 350 times (times out after 3500 s)
+#globle.query_log_sleep=10000
+#globle.loop_number=350
+# Number of jobs that may execute concurrently
+
+globle.job_thread=2
+# Kill a query that has not finished within 7200 s
+#globle.ck_timeout=7200000
+hbase.url=192.168.40.120:4444
+hbase.table=tsg:report_result
+hbase.columefamily=response:result
+hbase.colume_job_id=detail:result_id
+hbase.colume_query_id=detail:query_id
+hbase.colume_query_duration_ms=detail:query_duration_ms
+hbase.colume_read_rows=detail:read_rows
+hbase.colume_memory_usage=detail:memory_usage
+hbase.colume_sql=detail:sql
+hbase.colume_exception=detail:exception
+ck.task_ip=192.168.40.182:8123
+ck.task_database=tsg_galaxy
+#ck.task_database=k18_galaxy_service
+ck.task_user=ck
+ck.task_user_password=111111
+ck.log_ip=192.168.40.182:8123
+ck.log_user=default
+ck.log_user_password=111111
+# Maximum total connections
+http.maxTotal=300
+# Maximum connections per route
+http.defaultMaxPerRoute=100
+# Maximum time to establish a connection
+http.connectTimeout=10000
+# Maximum time to obtain a connection from the pool
+http.connectionRequestTimeout=10000
+# Maximum time for data transfer
+http.socketTimeout=3600000
+# Check whether a connection is still valid before submitting a request
+http.staleConnectionCheckEnabled=true
+db.url=jdbc\:mariadb\://192.168.40.120\:3306/tsg-bifang
+#db.url=jdbc\:mariadb\://192.168.11.210\:3306/tsg
+#db.url=jdbc\:mariadb\://47.254.24.224\:3306/test
+#db.url=jdbc\:mariadb\://10.4.35.1\:3306/tsg-bifang
+
+# Driver
+#drivers=ru.yandex.clickhouse.ClickHouseDriver
+db.drivers=org.mariadb.jdbc.Driver
+# Username
+db.user=root
+# Password
+db.password=bifang!@#
+# Initial pool size
+db.initialsize=20
+# Minimum idle connections
+db.minidle=1
+# Maximum active connections
+db.maxactive=300
+db.filters=stat,wall,log4j,config
+
+
+
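Note: the two cron properties use the six-field Spring format (second minute hour day-of-month month day-of-week): 0/10 * * * * ? fires every 10 seconds and 0/15 * * * * ? every 15. A sketch of how ScheduledTask consumes them:

    import org.springframework.scheduling.annotation.Scheduled;
    import org.springframework.stereotype.Component;

    @Component
    class CronSketch {
        @Scheduled(cron = "${scan.mysql.scheduled.plan}") // every 10 s: claim pending jobs
        void scan() { }

        @Scheduled(cron = "${scan.job.scheduled.plan}")   // every 15 s: flush finished jobs
        void collect() { }
    }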
diff --git a/src/main/resources/log4j.properties b/src/main/resources/log4j.properties
index 438b281..8dcd9c9 100644
--- a/src/main/resources/log4j.properties
+++ b/src/main/resources/log4j.properties
@@ -2,15 +2,15 @@
log4j.logger.org.apache.http=OFF
log4j.logger.org.apache.http.wire=OFF
#Log4j
-log4j.rootLogger=debug,console,file
+log4j.rootLogger=info,console,file
# Console log configuration
log4j.appender.console=org.apache.log4j.ConsoleAppender
-log4j.appender.console.Threshold=debug
+log4j.appender.console.Threshold=info
log4j.appender.console.layout=org.apache.log4j.PatternLayout
log4j.appender.console.layout.ConversionPattern=[%d{yyyy-MM-dd HH\:mm\:ss}] [%-5p] [Thread\:%t] %l %x - <%m>%n
# File log configuration
log4j.appender.file=org.apache.log4j.DailyRollingFileAppender
-log4j.appender.file.Threshold=debug
+log4j.appender.file.Threshold=info
log4j.appender.file.encoding=UTF-8
log4j.appender.file.Append=true
# Log file path; a relative path is resolved against the deployed application directory