summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorLAPTOP-CUUVN8AS\wk <[email protected]>2021-02-01 14:37:12 +0800
committerLAPTOP-CUUVN8AS\wk <[email protected]>2021-02-01 14:37:12 +0800
commit1421e9d5a7ea607c6d126b201b77c1c754b02504 (patch)
tree1b26a6f47126365e3b0d2b8fa93072f6e5f3ff19 /src
parent7f6884f6f75ed8ad3eb3336f4f9896066cc41285 (diff)
增加自定义metrics
Diffstat (limited to 'src')
-rw-r--r--src/main/java/com/mesa/reportservice/configuration/ClickhouseConfig.java48
-rw-r--r--src/main/java/com/mesa/reportservice/controller/ScheduledResultController.java14
-rw-r--r--src/main/java/com/mesa/reportservice/service/ClickhouseService.java11
-rw-r--r--src/main/java/com/mesa/reportservice/service/impl/ClickhouseServiceImpl.java96
-rw-r--r--src/main/java/com/mesa/reportservice/service/impl/ExcuteProcessServiceImpl.java31
-rw-r--r--src/main/java/com/mesa/reportservice/service/impl/ExcuteserviceImpl.java7
6 files changed, 63 insertions, 144 deletions
diff --git a/src/main/java/com/mesa/reportservice/configuration/ClickhouseConfig.java b/src/main/java/com/mesa/reportservice/configuration/ClickhouseConfig.java
index 983048c..43491dc 100644
--- a/src/main/java/com/mesa/reportservice/configuration/ClickhouseConfig.java
+++ b/src/main/java/com/mesa/reportservice/configuration/ClickhouseConfig.java
@@ -15,52 +15,12 @@ import java.net.URLEncoder;
public class ClickhouseConfig {
private static String gateway_ip;
- public static String getJobUrl(String sql, Integer reportId) {
- String url = "http://" + gateway_ip.trim() + "/?option=long-term&reportId=" + reportId + "&query=";
- String jobsql = "";
- try {
- sql = URLEncoder.encode(sql , "utf8").replaceAll("\\+", "%20");
- jobsql = url + sql;
-
- } catch (UnsupportedEncodingException e) {
- Logs.error(e.toString());
- }
- return jobsql;
- }
-
- public static String getProcessUrl(String query_id) {
-
- String url = "http://" + gateway_ip.trim() + "/?query=";
- String processsql = "";
- try {
- String sql = URLEncoder.encode("select elapsed,total_rows_approx,read_rows from `system`.processes where query_id='" + query_id + "'", "utf8").replaceAll("\\+", "%20");
- processsql = url + sql;
- } catch (UnsupportedEncodingException e) {
- Logs.error(e.toString());
- }
- return processsql;
- }
-
- public static String getFinishUrl(String query_id) {
-
- String url = "http://" + gateway_ip.trim() + "/?query=";
- String finishsql = "";
- try {
- String sql = URLEncoder.encode("select CAST(type, 'Int8') as type,read_rows,query_duration_ms,query,exception,memory_usage,event_time,result_rows,result_bytes from `system`.query_log where type>1 and query_id='" + query_id + "' order by event_time desc limit 1", "utf8").replaceAll("\\+", "%20");
- finishsql = url + sql;
- } catch (UnsupportedEncodingException e) {
- Logs.error(e.toString());
- }
- return finishsql;
- }
-
- public static String getKillUrl(String query_id) {
-
- String url = "http://" + gateway_ip.trim() + "/sys/engine/tasks/"+query_id;
- return url;
- }
public void setGateway_ip(String gateway_ip) {
ClickhouseConfig.gateway_ip = gateway_ip;
}
+
+ public String getGateway_ip() {
+ return gateway_ip;
+ }
}
diff --git a/src/main/java/com/mesa/reportservice/controller/ScheduledResultController.java b/src/main/java/com/mesa/reportservice/controller/ScheduledResultController.java
index 387f403..7012dac 100644
--- a/src/main/java/com/mesa/reportservice/controller/ScheduledResultController.java
+++ b/src/main/java/com/mesa/reportservice/controller/ScheduledResultController.java
@@ -26,8 +26,6 @@ public class ScheduledResultController {
private final org.apache.log4j.Logger logger = org.apache.log4j.Logger.getLogger(this.getClass());
protected static ExecutorService pool = Executors.newFixedThreadPool(30);
-
-
@Autowired
private MysqlService ms;
@Autowired
@@ -36,7 +34,8 @@ public class ScheduledResultController {
private ExcuteProcessService eps;
@Autowired
private ZkService zs;
-
+ @Autowired
+ private ClickhouseService cs;
@Scheduled(cron = "${scan.result.scheduled.plan}")
public void getExcuteResult() {
@@ -48,7 +47,10 @@ public class ScheduledResultController {
for (JobEntity jobEntity : joblist) {
String sql = jobEntity.getQuerySql().trim();
sql = sql.replace("$exe_time", "toDateTime('" + jobEntity.getIssuedTime().trim() + "')");
- String queryid = DigestUtils.md5Hex(jobEntity.getResultId() + sql);
+ sql = sql.replace("$start_time", "toDateTime('" + jobEntity.getStartTime().trim() + "')");
+ sql = sql.replace("$end_time", "toDateTime('" + jobEntity.getEndTime().trim() + "')");
+
+ String queryid = cs.getQueryId(jobEntity.getResultId().toString(),sql);
jobEntity.setQuery_id(queryid);
if (jobEntity.getIsValid() == 0) {
@@ -97,8 +99,10 @@ public class ScheduledResultController {
job.setBeginTime(begintime);
String sql = job.getQuerySql().trim();
sql = sql.replace("$exe_time", "toDateTime('" + job.getIssuedTime().trim() + "')");
+ sql = sql.replace("$start_time", "toDateTime('" + job.getStartTime().trim() + "')");
+ sql = sql.replace("$end_time", "toDateTime('" + job.getEndTime().trim() + "')");
job.setQuerySql(sql);
- String queryid = DigestUtils.md5Hex(job.getResultId() + sql);
+ String queryid = cs.getQueryId(job.getResultId().toString(),sql);
job.setQuery_id(queryid);
job.setQuerySql(sql);
job.setStatus(1);
diff --git a/src/main/java/com/mesa/reportservice/service/ClickhouseService.java b/src/main/java/com/mesa/reportservice/service/ClickhouseService.java
index cd9d7f7..a21a9c2 100644
--- a/src/main/java/com/mesa/reportservice/service/ClickhouseService.java
+++ b/src/main/java/com/mesa/reportservice/service/ClickhouseService.java
@@ -9,19 +9,16 @@ import java.util.Map;
*/
public interface ClickhouseService {
- String doGet(String url) throws Exception;
+ String getQueryId(String resultId,String query) throws Exception;
- String doGet(String url, Map<String, Object> map) throws Exception;
+ HttpResult queryForExcute(String resultId,String query) throws Exception;
- HttpResult sendQueryForGet(String url) throws Exception;
+ HttpResult queryForProcess(String queryId) throws Exception;
- HttpResult QuerySystemForGet(String url) throws Exception;
-
-
- HttpResult QuerySystemForDelete(String url) throws Exception;
+ HttpResult queryForCancel(String queryId) throws Exception;
diff --git a/src/main/java/com/mesa/reportservice/service/impl/ClickhouseServiceImpl.java b/src/main/java/com/mesa/reportservice/service/impl/ClickhouseServiceImpl.java
index 0f30f34..742e352 100644
--- a/src/main/java/com/mesa/reportservice/service/impl/ClickhouseServiceImpl.java
+++ b/src/main/java/com/mesa/reportservice/service/impl/ClickhouseServiceImpl.java
@@ -3,7 +3,9 @@ package com.mesa.reportservice.service.impl;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import com.mesa.reportservice.bean.HttpResult;
+import com.mesa.reportservice.configuration.ClickhouseConfig;
import com.mesa.reportservice.service.ClickhouseService;
+import com.mesa.reportservice.util.Logs;
import org.apache.http.client.config.RequestConfig;
import org.apache.http.client.methods.CloseableHttpResponse;
import org.apache.http.client.methods.HttpDelete;
@@ -18,6 +20,9 @@ import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.stereotype.Service;
import javax.annotation.Resource;
+import java.io.UnsupportedEncodingException;
+import java.net.URLEncoder;
+import java.util.List;
import java.util.Map;
/**
@@ -29,6 +34,8 @@ public class ClickhouseServiceImpl implements ClickhouseService {
@Autowired
private CloseableHttpClient httpClient;
+ @Autowired
+ private ClickhouseConfig clickhouseConfig;
@Autowired
@Qualifier("RequestShortConfig")
@@ -39,82 +46,49 @@ public class ClickhouseServiceImpl implements ClickhouseService {
private RequestConfig RequestLongConfig;
private final org.apache.log4j.Logger logger = org.apache.log4j.Logger.getLogger(this.getClass());
- /**
- * 不带参数的get请求,如果状态码为200,则返回body,如果不为200,则返回null
- *
- * @param url
- * @return
- * @throws Exception
- */
- @Override
- public String doGet(String url) throws Exception {
- // 声明 http get 请求
- HttpGet httpGet = new HttpGet(url);
- httpGet.setHeader("Accept", "application/json");
- // 装载配置信息
- httpGet.setConfig(RequestLongConfig);
+ @Override
+ public String getQueryId(String resultId, String query) throws Exception {
- try(CloseableHttpResponse response = this.httpClient.execute(httpGet)){
- if (response.getStatusLine().getStatusCode() == 200) {
- // 返回响应体的内容
- return EntityUtils.toString(response.getEntity(), "UTF-8");
- } else {
+ String url = "http://" + clickhouseConfig.getGateway_ip().trim() + "/sys/engine/queryIds?resultId=" + resultId + "&query=";
+ String sql = URLEncoder.encode(query, "utf8").replaceAll("\\+", "%20");
+ url = url+sql;
+ HttpGet httpGet = new HttpGet(url);
+ // 加入配置信息
+ httpGet.setConfig(RequestshortConfig);
+ try(CloseableHttpResponse response = this.httpClient.execute(httpGet) ){
+ if(response.getStatusLine().getStatusCode()!=200){
throw new Exception();
-
}
- }
- catch (Exception e){
+ else {
+ Map data = JSON.parseObject(EntityUtils.toString(response.getEntity(), "UTF-8"));
+ List listdata = (List) data.get("data");
+ Map map = JSON.parseObject(JSON.toJSONString(listdata.get(0)));
+ String query_id = map.get("queryId").toString();
+ return query_id;
+ }
+ }catch (Exception e){
logger.error(e.toString());
throw new Exception();
-
}
- // 发起请求
-
-
- // 判断状态码是否为200
-
-
- }
-
- /**
- * 带参数的get请求,如果状态码为200,则返回body,如果不为200,则返回null
- *
- * @param url
- * @return
- * @throws Exception
- */
- @Override
- public String doGet(String url, Map<String, Object> map) throws Exception {
- URIBuilder uriBuilder = new URIBuilder(url);
-
- if (map != null) {
- // 遍历map,拼接请求参数
- for (Map.Entry<String, Object> entry : map.entrySet()) {
- uriBuilder.setParameter(entry.getKey(), entry.getValue().toString());
- }
- }
-
- // 调用不带参数的get请求
- return this.doGet(uriBuilder.build().toString());
-
}
/**
* 带参数的post请求
*
- * @param url
* @return
* @throws Exception
*/
@Override
- public HttpResult sendQueryForGet(String url) throws Exception,OutOfMemoryError{
+ public HttpResult queryForExcute(String resultId, String query) throws Exception,OutOfMemoryError{
// 声明httpPost请求
- HttpGet httpGet = new HttpGet(url);
-
+ String url = "http://" + clickhouseConfig.getGateway_ip().trim() + "/?option=long-term&resultId=" + resultId + "&query=";
+ query = URLEncoder.encode(query , "utf8").replaceAll("\\+", "%20");
+ String jobsql = url + query;
+ HttpGet httpGet = new HttpGet(jobsql);
// 加入配置信息
httpGet.setConfig(RequestLongConfig);
@@ -135,8 +109,10 @@ public class ClickhouseServiceImpl implements ClickhouseService {
@Override
- public HttpResult QuerySystemForGet(String url) throws Exception {
+ public HttpResult queryForProcess(String queryId) throws Exception {
// 声明httpPost请求
+ String url = "http://" + clickhouseConfig.getGateway_ip().trim() + "/sys/engine/processes/"+queryId;
+
HttpGet httpGet = new HttpGet(url);
// 加入配置信息
httpGet.setConfig(RequestshortConfig);
@@ -159,7 +135,9 @@ public class ClickhouseServiceImpl implements ClickhouseService {
@Override
- public HttpResult QuerySystemForDelete(String url) throws Exception {
+ public HttpResult queryForCancel(String queryId) throws Exception {
+
+ String url = "http://" + clickhouseConfig.getGateway_ip().trim() + "/sys/engine/tasks/"+queryId;
// 声明httpPost请求
HttpDelete HttpDelete = new HttpDelete(url);
// 加入配置信息
@@ -171,8 +149,6 @@ public class ClickhouseServiceImpl implements ClickhouseService {
HttpResult rs = new HttpResult();
rs.setCode(response.getStatusLine().getStatusCode());
rs.setBody(EntityUtils.toString(response.getEntity(), "UTF-8"));
-
- //Thread.sleep(120);
return rs;
}
catch (Exception e){
diff --git a/src/main/java/com/mesa/reportservice/service/impl/ExcuteProcessServiceImpl.java b/src/main/java/com/mesa/reportservice/service/impl/ExcuteProcessServiceImpl.java
index 4ddcd93..cee4af7 100644
--- a/src/main/java/com/mesa/reportservice/service/impl/ExcuteProcessServiceImpl.java
+++ b/src/main/java/com/mesa/reportservice/service/impl/ExcuteProcessServiceImpl.java
@@ -111,38 +111,28 @@ public class ExcuteProcessServiceImpl implements ExcuteProcessService {
@Override
public void reSet(JobEntity jobEntity) {
- String killurl = ClickhouseConfig.getKillUrl(jobEntity.getQuery_id());
- try {
- cs.QuerySystemForDelete(killurl);
- } catch (Exception e) {
- logger.error(e.toString());
- } finally {
jobEntity.setStatus(0);
jobEntity.setExcuteDetail("Re Execution");
ms.updateProcesses(jobEntity);
- }
+
}
@Override
public void killQuery(JobEntity jobEntity) {
- String killurl = ClickhouseConfig.getKillUrl(jobEntity.getQuery_id());
try {
- cs.QuerySystemForDelete(killurl);
+ cs.queryForCancel(jobEntity.getQuery_id());
} catch (Exception e) {
logger.error(e.toString());
}
}
- /**
- * 获取进度条信息
- */
+
@Override
public void updateProcessMessage(JobEntity job) {
- String queryProccess = ClickhouseConfig.getProcessUrl(job.getQuery_id());
HttpResult hr = null;
try {
- hr = cs.QuerySystemForGet(queryProccess);
+ hr = cs.queryForProcess(job.getQuery_id());
String rs = hr.getBody().trim();
Map data = JSON.parseObject(rs);
@@ -150,16 +140,12 @@ public class ExcuteProcessServiceImpl implements ExcuteProcessService {
List listdata = (List) data.get("data");
if (null != listdata && listdata.size() > 0) {
Map map = JSON.parseObject(JSON.toJSONString(listdata.get(0)));
- long total_rows_approx = Long.parseLong(map.get("total_rows_approx").toString());
- long read_rows = Long.parseLong(map.get("read_rows").toString());
- float elapsed = Float.parseFloat(map.get("elapsed").toString()) * 1000;
- double persent = (read_rows * 1.00 / total_rows_approx);
+ long read_rows = Long.parseLong(map.get("rows_read").toString());
+ float elapsed = Float.parseFloat(map.get("elapsed").toString());
+ double persent = Double.parseDouble(map.get("percent").toString());
int result = (int) (persent * 100);
- if (result > 98) {
- result = 98;
- }
job.setExcuteTime((int) elapsed);
- job.setExcuteRow(total_rows_approx);
+ job.setExcuteRow(read_rows);
job.setExcuteProcess(result);
if (job.getExcuteRow() != 0 || job.getExcuteTime() != 0) {
ms.updateProcesses(job);
@@ -174,7 +160,6 @@ public class ExcuteProcessServiceImpl implements ExcuteProcessService {
logger.error(e.toString());
}
}
-
/**
* 结果存入hbase
*/
diff --git a/src/main/java/com/mesa/reportservice/service/impl/ExcuteserviceImpl.java b/src/main/java/com/mesa/reportservice/service/impl/ExcuteserviceImpl.java
index 10769a1..314320f 100644
--- a/src/main/java/com/mesa/reportservice/service/impl/ExcuteserviceImpl.java
+++ b/src/main/java/com/mesa/reportservice/service/impl/ExcuteserviceImpl.java
@@ -17,7 +17,6 @@ import org.springframework.stereotype.Service;
import java.net.SocketTimeoutException;
import java.util.Map;
-import static com.mesa.reportservice.configuration.ClickhouseConfig.getJobUrl;
/**
* Created by wk1 on 2020/1/8.
@@ -41,13 +40,11 @@ public class ExcuteserviceImpl implements ExcuteService {
logger.info("execute queryid=" + job.getQuery_id() + " sql=" + job.getQuerySql() + "mapresult size=" + GlobelConfig.mapresult.size());
- String joburl = getJobUrl(job.getQuerySql(), job.getResultId());
- String killurl = ClickhouseConfig.getKillUrl(job.getQuery_id());
HttpResult hr = new HttpResult();
int k = 3;
do {
try {
- hr = cs.sendQueryForGet(joburl);
+ hr = cs.queryForExcute(job.getResultId().toString(),job.getQuerySql());
logger.info("httpcode" + hr.getCode());
if (hr != null && hr.getCode() != 200) {
@@ -105,7 +102,7 @@ public class ExcuteserviceImpl implements ExcuteService {
if(hr.getCode() != 200) {
try {
- cs.QuerySystemForDelete(killurl);
+ cs.queryForCancel(job.getQuery_id());
} catch (Exception e) {
logger.error("Kill Query Error" + e.toString());
}