diff options
| author | LAPTOP-CUUVN8AS\wk <[email protected]> | 2021-02-01 14:37:12 +0800 |
|---|---|---|
| committer | LAPTOP-CUUVN8AS\wk <[email protected]> | 2021-02-01 14:37:12 +0800 |
| commit | 1421e9d5a7ea607c6d126b201b77c1c754b02504 (patch) | |
| tree | 1b26a6f47126365e3b0d2b8fa93072f6e5f3ff19 /src | |
| parent | 7f6884f6f75ed8ad3eb3336f4f9896066cc41285 (diff) | |
增加自定义metrics
Diffstat (limited to 'src')
6 files changed, 63 insertions, 144 deletions
diff --git a/src/main/java/com/mesa/reportservice/configuration/ClickhouseConfig.java b/src/main/java/com/mesa/reportservice/configuration/ClickhouseConfig.java index 983048c..43491dc 100644 --- a/src/main/java/com/mesa/reportservice/configuration/ClickhouseConfig.java +++ b/src/main/java/com/mesa/reportservice/configuration/ClickhouseConfig.java @@ -15,52 +15,12 @@ import java.net.URLEncoder; public class ClickhouseConfig { private static String gateway_ip; - public static String getJobUrl(String sql, Integer reportId) { - String url = "http://" + gateway_ip.trim() + "/?option=long-term&reportId=" + reportId + "&query="; - String jobsql = ""; - try { - sql = URLEncoder.encode(sql , "utf8").replaceAll("\\+", "%20"); - jobsql = url + sql; - - } catch (UnsupportedEncodingException e) { - Logs.error(e.toString()); - } - return jobsql; - } - - public static String getProcessUrl(String query_id) { - - String url = "http://" + gateway_ip.trim() + "/?query="; - String processsql = ""; - try { - String sql = URLEncoder.encode("select elapsed,total_rows_approx,read_rows from `system`.processes where query_id='" + query_id + "'", "utf8").replaceAll("\\+", "%20"); - processsql = url + sql; - } catch (UnsupportedEncodingException e) { - Logs.error(e.toString()); - } - return processsql; - } - - public static String getFinishUrl(String query_id) { - - String url = "http://" + gateway_ip.trim() + "/?query="; - String finishsql = ""; - try { - String sql = URLEncoder.encode("select CAST(type, 'Int8') as type,read_rows,query_duration_ms,query,exception,memory_usage,event_time,result_rows,result_bytes from `system`.query_log where type>1 and query_id='" + query_id + "' order by event_time desc limit 1", "utf8").replaceAll("\\+", "%20"); - finishsql = url + sql; - } catch (UnsupportedEncodingException e) { - Logs.error(e.toString()); - } - return finishsql; - } - - public static String getKillUrl(String query_id) { - - String url = "http://" + gateway_ip.trim() + "/sys/engine/tasks/"+query_id; - return url; - } public void setGateway_ip(String gateway_ip) { ClickhouseConfig.gateway_ip = gateway_ip; } + + public String getGateway_ip() { + return gateway_ip; + } } diff --git a/src/main/java/com/mesa/reportservice/controller/ScheduledResultController.java b/src/main/java/com/mesa/reportservice/controller/ScheduledResultController.java index 387f403..7012dac 100644 --- a/src/main/java/com/mesa/reportservice/controller/ScheduledResultController.java +++ b/src/main/java/com/mesa/reportservice/controller/ScheduledResultController.java @@ -26,8 +26,6 @@ public class ScheduledResultController { private final org.apache.log4j.Logger logger = org.apache.log4j.Logger.getLogger(this.getClass()); protected static ExecutorService pool = Executors.newFixedThreadPool(30); - - @Autowired private MysqlService ms; @Autowired @@ -36,7 +34,8 @@ public class ScheduledResultController { private ExcuteProcessService eps; @Autowired private ZkService zs; - + @Autowired + private ClickhouseService cs; @Scheduled(cron = "${scan.result.scheduled.plan}") public void getExcuteResult() { @@ -48,7 +47,10 @@ public class ScheduledResultController { for (JobEntity jobEntity : joblist) { String sql = jobEntity.getQuerySql().trim(); sql = sql.replace("$exe_time", "toDateTime('" + jobEntity.getIssuedTime().trim() + "')"); - String queryid = DigestUtils.md5Hex(jobEntity.getResultId() + sql); + sql = sql.replace("$start_time", "toDateTime('" + jobEntity.getStartTime().trim() + "')"); + sql = sql.replace("$end_time", "toDateTime('" + jobEntity.getEndTime().trim() + "')"); + + String queryid = cs.getQueryId(jobEntity.getResultId().toString(),sql); jobEntity.setQuery_id(queryid); if (jobEntity.getIsValid() == 0) { @@ -97,8 +99,10 @@ public class ScheduledResultController { job.setBeginTime(begintime); String sql = job.getQuerySql().trim(); sql = sql.replace("$exe_time", "toDateTime('" + job.getIssuedTime().trim() + "')"); + sql = sql.replace("$start_time", "toDateTime('" + job.getStartTime().trim() + "')"); + sql = sql.replace("$end_time", "toDateTime('" + job.getEndTime().trim() + "')"); job.setQuerySql(sql); - String queryid = DigestUtils.md5Hex(job.getResultId() + sql); + String queryid = cs.getQueryId(job.getResultId().toString(),sql); job.setQuery_id(queryid); job.setQuerySql(sql); job.setStatus(1); diff --git a/src/main/java/com/mesa/reportservice/service/ClickhouseService.java b/src/main/java/com/mesa/reportservice/service/ClickhouseService.java index cd9d7f7..a21a9c2 100644 --- a/src/main/java/com/mesa/reportservice/service/ClickhouseService.java +++ b/src/main/java/com/mesa/reportservice/service/ClickhouseService.java @@ -9,19 +9,16 @@ import java.util.Map; */ public interface ClickhouseService { - String doGet(String url) throws Exception; + String getQueryId(String resultId,String query) throws Exception; - String doGet(String url, Map<String, Object> map) throws Exception; + HttpResult queryForExcute(String resultId,String query) throws Exception; - HttpResult sendQueryForGet(String url) throws Exception; + HttpResult queryForProcess(String queryId) throws Exception; - HttpResult QuerySystemForGet(String url) throws Exception; - - - HttpResult QuerySystemForDelete(String url) throws Exception; + HttpResult queryForCancel(String queryId) throws Exception; diff --git a/src/main/java/com/mesa/reportservice/service/impl/ClickhouseServiceImpl.java b/src/main/java/com/mesa/reportservice/service/impl/ClickhouseServiceImpl.java index 0f30f34..742e352 100644 --- a/src/main/java/com/mesa/reportservice/service/impl/ClickhouseServiceImpl.java +++ b/src/main/java/com/mesa/reportservice/service/impl/ClickhouseServiceImpl.java @@ -3,7 +3,9 @@ package com.mesa.reportservice.service.impl; import com.alibaba.fastjson.JSON; import com.alibaba.fastjson.JSONObject; import com.mesa.reportservice.bean.HttpResult; +import com.mesa.reportservice.configuration.ClickhouseConfig; import com.mesa.reportservice.service.ClickhouseService; +import com.mesa.reportservice.util.Logs; import org.apache.http.client.config.RequestConfig; import org.apache.http.client.methods.CloseableHttpResponse; import org.apache.http.client.methods.HttpDelete; @@ -18,6 +20,9 @@ import org.springframework.beans.factory.annotation.Qualifier; import org.springframework.stereotype.Service; import javax.annotation.Resource; +import java.io.UnsupportedEncodingException; +import java.net.URLEncoder; +import java.util.List; import java.util.Map; /** @@ -29,6 +34,8 @@ public class ClickhouseServiceImpl implements ClickhouseService { @Autowired private CloseableHttpClient httpClient; + @Autowired + private ClickhouseConfig clickhouseConfig; @Autowired @Qualifier("RequestShortConfig") @@ -39,82 +46,49 @@ public class ClickhouseServiceImpl implements ClickhouseService { private RequestConfig RequestLongConfig; private final org.apache.log4j.Logger logger = org.apache.log4j.Logger.getLogger(this.getClass()); - /** - * 不带参数的get请求,如果状态码为200,则返回body,如果不为200,则返回null - * - * @param url - * @return - * @throws Exception - */ - @Override - public String doGet(String url) throws Exception { - // 声明 http get 请求 - HttpGet httpGet = new HttpGet(url); - httpGet.setHeader("Accept", "application/json"); - // 装载配置信息 - httpGet.setConfig(RequestLongConfig); + @Override + public String getQueryId(String resultId, String query) throws Exception { - try(CloseableHttpResponse response = this.httpClient.execute(httpGet)){ - if (response.getStatusLine().getStatusCode() == 200) { - // 返回响应体的内容 - return EntityUtils.toString(response.getEntity(), "UTF-8"); - } else { + String url = "http://" + clickhouseConfig.getGateway_ip().trim() + "/sys/engine/queryIds?resultId=" + resultId + "&query="; + String sql = URLEncoder.encode(query, "utf8").replaceAll("\\+", "%20"); + url = url+sql; + HttpGet httpGet = new HttpGet(url); + // 加入配置信息 + httpGet.setConfig(RequestshortConfig); + try(CloseableHttpResponse response = this.httpClient.execute(httpGet) ){ + if(response.getStatusLine().getStatusCode()!=200){ throw new Exception(); - } - } - catch (Exception e){ + else { + Map data = JSON.parseObject(EntityUtils.toString(response.getEntity(), "UTF-8")); + List listdata = (List) data.get("data"); + Map map = JSON.parseObject(JSON.toJSONString(listdata.get(0))); + String query_id = map.get("queryId").toString(); + return query_id; + } + }catch (Exception e){ logger.error(e.toString()); throw new Exception(); - } - // 发起请求 - - - // 判断状态码是否为200 - - - } - - /** - * 带参数的get请求,如果状态码为200,则返回body,如果不为200,则返回null - * - * @param url - * @return - * @throws Exception - */ - @Override - public String doGet(String url, Map<String, Object> map) throws Exception { - URIBuilder uriBuilder = new URIBuilder(url); - - if (map != null) { - // 遍历map,拼接请求参数 - for (Map.Entry<String, Object> entry : map.entrySet()) { - uriBuilder.setParameter(entry.getKey(), entry.getValue().toString()); - } - } - - // 调用不带参数的get请求 - return this.doGet(uriBuilder.build().toString()); - } /** * 带参数的post请求 * - * @param url * @return * @throws Exception */ @Override - public HttpResult sendQueryForGet(String url) throws Exception,OutOfMemoryError{ + public HttpResult queryForExcute(String resultId, String query) throws Exception,OutOfMemoryError{ // 声明httpPost请求 - HttpGet httpGet = new HttpGet(url); - + String url = "http://" + clickhouseConfig.getGateway_ip().trim() + "/?option=long-term&resultId=" + resultId + "&query="; + query = URLEncoder.encode(query , "utf8").replaceAll("\\+", "%20"); + String jobsql = url + query; + HttpGet httpGet = new HttpGet(jobsql); // 加入配置信息 httpGet.setConfig(RequestLongConfig); @@ -135,8 +109,10 @@ public class ClickhouseServiceImpl implements ClickhouseService { @Override - public HttpResult QuerySystemForGet(String url) throws Exception { + public HttpResult queryForProcess(String queryId) throws Exception { // 声明httpPost请求 + String url = "http://" + clickhouseConfig.getGateway_ip().trim() + "/sys/engine/processes/"+queryId; + HttpGet httpGet = new HttpGet(url); // 加入配置信息 httpGet.setConfig(RequestshortConfig); @@ -159,7 +135,9 @@ public class ClickhouseServiceImpl implements ClickhouseService { @Override - public HttpResult QuerySystemForDelete(String url) throws Exception { + public HttpResult queryForCancel(String queryId) throws Exception { + + String url = "http://" + clickhouseConfig.getGateway_ip().trim() + "/sys/engine/tasks/"+queryId; // 声明httpPost请求 HttpDelete HttpDelete = new HttpDelete(url); // 加入配置信息 @@ -171,8 +149,6 @@ public class ClickhouseServiceImpl implements ClickhouseService { HttpResult rs = new HttpResult(); rs.setCode(response.getStatusLine().getStatusCode()); rs.setBody(EntityUtils.toString(response.getEntity(), "UTF-8")); - - //Thread.sleep(120); return rs; } catch (Exception e){ diff --git a/src/main/java/com/mesa/reportservice/service/impl/ExcuteProcessServiceImpl.java b/src/main/java/com/mesa/reportservice/service/impl/ExcuteProcessServiceImpl.java index 4ddcd93..cee4af7 100644 --- a/src/main/java/com/mesa/reportservice/service/impl/ExcuteProcessServiceImpl.java +++ b/src/main/java/com/mesa/reportservice/service/impl/ExcuteProcessServiceImpl.java @@ -111,38 +111,28 @@ public class ExcuteProcessServiceImpl implements ExcuteProcessService { @Override public void reSet(JobEntity jobEntity) { - String killurl = ClickhouseConfig.getKillUrl(jobEntity.getQuery_id()); - try { - cs.QuerySystemForDelete(killurl); - } catch (Exception e) { - logger.error(e.toString()); - } finally { jobEntity.setStatus(0); jobEntity.setExcuteDetail("Re Execution"); ms.updateProcesses(jobEntity); - } + } @Override public void killQuery(JobEntity jobEntity) { - String killurl = ClickhouseConfig.getKillUrl(jobEntity.getQuery_id()); try { - cs.QuerySystemForDelete(killurl); + cs.queryForCancel(jobEntity.getQuery_id()); } catch (Exception e) { logger.error(e.toString()); } } - /** - * 获取进度条信息 - */ + @Override public void updateProcessMessage(JobEntity job) { - String queryProccess = ClickhouseConfig.getProcessUrl(job.getQuery_id()); HttpResult hr = null; try { - hr = cs.QuerySystemForGet(queryProccess); + hr = cs.queryForProcess(job.getQuery_id()); String rs = hr.getBody().trim(); Map data = JSON.parseObject(rs); @@ -150,16 +140,12 @@ public class ExcuteProcessServiceImpl implements ExcuteProcessService { List listdata = (List) data.get("data"); if (null != listdata && listdata.size() > 0) { Map map = JSON.parseObject(JSON.toJSONString(listdata.get(0))); - long total_rows_approx = Long.parseLong(map.get("total_rows_approx").toString()); - long read_rows = Long.parseLong(map.get("read_rows").toString()); - float elapsed = Float.parseFloat(map.get("elapsed").toString()) * 1000; - double persent = (read_rows * 1.00 / total_rows_approx); + long read_rows = Long.parseLong(map.get("rows_read").toString()); + float elapsed = Float.parseFloat(map.get("elapsed").toString()); + double persent = Double.parseDouble(map.get("percent").toString()); int result = (int) (persent * 100); - if (result > 98) { - result = 98; - } job.setExcuteTime((int) elapsed); - job.setExcuteRow(total_rows_approx); + job.setExcuteRow(read_rows); job.setExcuteProcess(result); if (job.getExcuteRow() != 0 || job.getExcuteTime() != 0) { ms.updateProcesses(job); @@ -174,7 +160,6 @@ public class ExcuteProcessServiceImpl implements ExcuteProcessService { logger.error(e.toString()); } } - /** * 结果存入hbase */ diff --git a/src/main/java/com/mesa/reportservice/service/impl/ExcuteserviceImpl.java b/src/main/java/com/mesa/reportservice/service/impl/ExcuteserviceImpl.java index 10769a1..314320f 100644 --- a/src/main/java/com/mesa/reportservice/service/impl/ExcuteserviceImpl.java +++ b/src/main/java/com/mesa/reportservice/service/impl/ExcuteserviceImpl.java @@ -17,7 +17,6 @@ import org.springframework.stereotype.Service; import java.net.SocketTimeoutException; import java.util.Map; -import static com.mesa.reportservice.configuration.ClickhouseConfig.getJobUrl; /** * Created by wk1 on 2020/1/8. @@ -41,13 +40,11 @@ public class ExcuteserviceImpl implements ExcuteService { logger.info("execute queryid=" + job.getQuery_id() + " sql=" + job.getQuerySql() + "mapresult size=" + GlobelConfig.mapresult.size()); - String joburl = getJobUrl(job.getQuerySql(), job.getResultId()); - String killurl = ClickhouseConfig.getKillUrl(job.getQuery_id()); HttpResult hr = new HttpResult(); int k = 3; do { try { - hr = cs.sendQueryForGet(joburl); + hr = cs.queryForExcute(job.getResultId().toString(),job.getQuerySql()); logger.info("httpcode" + hr.getCode()); if (hr != null && hr.getCode() != 200) { @@ -105,7 +102,7 @@ public class ExcuteserviceImpl implements ExcuteService { if(hr.getCode() != 200) { try { - cs.QuerySystemForDelete(killurl); + cs.queryForCancel(job.getQuery_id()); } catch (Exception e) { logger.error("Kill Query Error" + e.toString()); } |
