diff options
| author | lijinyang <[email protected]> | 2023-11-28 09:52:21 +0800 |
|---|---|---|
| committer | lijinyang <[email protected]> | 2023-11-28 09:52:21 +0800 |
| commit | e8293348a19e723185e3f093d01a70f649eae13d (patch) | |
| tree | 3ecf6c00be70223c737e66c15177d5bc52d502e8 | |
| parent | 9250c02a4be3a252628393f205f9f82765af8ba7 (diff) | |
fix:GAL-447 报告服务适配新版表结构23.11.28
11 files changed, 183 insertions, 380 deletions
diff --git a/docs/release/release-23-12.md b/docs/release/release-23-12.md new file mode 100644 index 0000000..5b47610 --- /dev/null +++ b/docs/release/release-23-12.md @@ -0,0 +1,2 @@ +Release 23.12 +* 报告服务适配新版表结构(GAL-447)
\ No newline at end of file @@ -245,6 +245,13 @@ <artifactId>nacos-config-spring-boot-starter</artifactId> <version>0.2.7</version> </dependency> + + <dependency> + <groupId>org.projectlombok</groupId> + <artifactId>lombok</artifactId> + <version>1.18.4</version> + <scope>provided</scope> + </dependency> </dependencies> diff --git a/src/main/java/com/mesa/reportservice/bean/JobEntity.java b/src/main/java/com/mesa/reportservice/bean/JobEntity.java index 7da68a5..b3baa8b 100644 --- a/src/main/java/com/mesa/reportservice/bean/JobEntity.java +++ b/src/main/java/com/mesa/reportservice/bean/JobEntity.java @@ -1,254 +1,56 @@ package com.mesa.reportservice.bean; +import lombok.Data; + /** * Created by wk1 on 2019/5/20. */ +@Data public class JobEntity implements Cloneable { - private Integer resultId; - - private String resultName; - - private Integer jobId; - -/* private String startTime; - - private String endTime;*/ + private String jobId; private String querySql; - private Integer status; - - private Integer excuteTime; - - private Long excuteRow; - - private Integer excuteProcess; - - private String excuteDetail; - - private Integer isSendNotice; - - private String opTime; - - private String chartType; - - private String issuedTime; - - private String query_id; - - private String query_duration_ms; - - private int resultRows; - - - private String memory_usage; - - private String result; - - private int excute_status; - - private String formatSql; - - - private int isValid; - - private long beginTime; - - public Integer getResultId() { - return resultId; - } - - public void setResultId(Integer resultId) { - this.resultId = resultId; - } - - public String getResultName() { - return resultName; - } - - public void setResultName(String resultName) { - this.resultName = resultName; - } + private String state; - public Integer getJobId() { - return jobId; - } - - public void setJobId(Integer jobId) { - this.jobId = jobId; - } - -/* public String getStartTime() { - return startTime; - } - - public void setStartTime(String startTime) { - this.startTime = startTime; - } - - public String getEndTime() { - return endTime; - } - - public void setEndTime(String endTime) { - this.endTime = endTime; - }*/ - - public String getQuerySql() { - return querySql; - } - - public void setQuerySql(String querySql) { - this.querySql = querySql; - } - - public Integer getStatus() { - return status; - } - - public void setStatus(Integer status) { - this.status = status; - } - - public Integer getExcuteTime() { - return excuteTime; - } - - public void setExcuteTime(Integer excuteTime) { - this.excuteTime = excuteTime; - } - - public Long getExcuteRow() { - return excuteRow; - } - - public void setExcuteRow(Long excuteRow) { - this.excuteRow = excuteRow; - } - - public Integer getExcuteProcess() { - return excuteProcess; - } - - public void setExcuteProcess(Integer excuteProcess) { - this.excuteProcess = excuteProcess; - } - - public String getExcuteDetail() { - return excuteDetail; - } - - public void setExcuteDetail(String excuteDetail) { - this.excuteDetail = excuteDetail; - } - - public Integer getIsSendNotice() { - return isSendNotice; - } - - public void setIsSendNotice(Integer isSendNotice) { - this.isSendNotice = isSendNotice; - } - - public String getOpTime() { - return opTime; - } - - public void setOpTime(String opTime) { - this.opTime = opTime; - } - - public String getChartType() { - return chartType; - } - - public void setChartType(String chartType) { - this.chartType = chartType; - } - - public String getIssuedTime() { - return issuedTime; - } - - public void setIssuedTime(String issuedTime) { - this.issuedTime = issuedTime; - } - - public String getQuery_id() { - return query_id; - } - - public void setQuery_id(String query_id) { - this.query_id = query_id; - } - - public String getQuery_duration_ms() { - return query_duration_ms; - } + private String resultName; - public void setQuery_duration_ms(String query_duration_ms) { - this.query_duration_ms = query_duration_ms; - } + private float doneProgress; + private Integer isFailed; - public String getMemory_usage() { - return memory_usage; - } + private String resultMessage; - public void setMemory_usage(String memory_usage) { - this.memory_usage = memory_usage; - } + private Integer elapsed; - public String getResult() { - return result; - } + private long rowsRead; - public void setResult(String result) { - this.result = result; - } + private long bytesRead; - public int getExcute_status() { - return excute_status; - } + private long resultRows; - public void setExcute_status(int excute_status) { - this.excute_status = excute_status; - } + private long resultBytes; + private Integer isValid; - public int getResultRows() { - return resultRows; - } + private long startTime; - public void setResultRows(int resultRows) { - this.resultRows = resultRows; - } + private long endTime; - public String getFormatSql() { - return formatSql; - } + private long lastUpdateTime; - public void setFormatSql(String formatSql) { - this.formatSql = formatSql; - } + private long generatedTime; + private String queryId; - public int getIsValid() { - return isValid; - } + private Integer excuteStatus; - public void setIsValid(int isValid) { - this.isValid = isValid; - } + private String result; - public long getBeginTime() { - return beginTime; - } + private String memoryUsage; - public void setBeginTime(long beginTime) { - this.beginTime = beginTime; - } + private String queryDurationMs; @Override public Object clone() { diff --git a/src/main/java/com/mesa/reportservice/controller/ScheduledResultController.java b/src/main/java/com/mesa/reportservice/controller/ScheduledResultController.java index 21aeeb5..79f9c47 100644 --- a/src/main/java/com/mesa/reportservice/controller/ScheduledResultController.java +++ b/src/main/java/com/mesa/reportservice/controller/ScheduledResultController.java @@ -55,19 +55,19 @@ public class ScheduledResultController { sql = sql.replace("$start_time", "toDateTime('" + jobEntity.getStartTime().trim() + "')"); sql = sql.replace("$end_time", "toDateTime('" + jobEntity.getEndTime().trim() + "')");*/ - String queryid = cs.getQueryId(jobEntity.getResultId().toString(), sql); - jobEntity.setQuery_id(queryid); + String queryid = cs.getQueryId(sql); + jobEntity.setQueryId(queryid); if (jobEntity.getIsValid() == 0) { eps.killQuery(jobEntity); - gc.getMapresult().get(jobEntity.getQuery_id()).setIsValid(0); - } else if (!gc.getMapresult().containsKey(jobEntity.getQuery_id())) { + gc.getMapresult().get(jobEntity.getQueryId()).setIsValid(0); + } else if (!gc.getMapresult().containsKey(jobEntity.getQueryId())) { eps.reSet(jobEntity); } - if (gc.getMapresult().containsKey(jobEntity.getQuery_id())) { + if (gc.getMapresult().containsKey(jobEntity.getQueryId())) { if (jobEntity.getIsValid() == 0) { eps.killQuery(jobEntity); - gc.getMapresult().get(jobEntity.getQuery_id()).setIsValid(0); + gc.getMapresult().get(jobEntity.getQueryId()).setIsValid(0); } } else { eps.reSet(jobEntity); @@ -76,17 +76,17 @@ public class ScheduledResultController { } //遍历内存中的任务对状态1的更新进度,其他更新数据库的状态 for (Map.Entry<String, JobEntity> entry : gc.getMapresult().entrySet()) { - logger.info("key = " + entry.getKey() + ", value = " + entry.getValue().getStatus()); + logger.info("key = " + entry.getKey() + ", value = " + entry.getValue().getState()); long currentTime = System.currentTimeMillis(); - long excutetime = currentTime - entry.getValue().getBeginTime(); + long excutetime = currentTime - entry.getValue().getStartTime(); logger.info("excute time=" + excutetime + "ttl_time=" + hc.getSocketTimeout()); - if (entry.getValue().getStatus() == 1 && excutetime > hc.getSocketTimeout()) { - entry.getValue().setStatus(2); - entry.getValue().setExcute_status(500001); + if (("RUNNING").equals(entry.getValue().getState()) && excutetime > hc.getSocketTimeout()) { + entry.getValue().setState("DONE"); + entry.getValue().setExcuteStatus(500001); eps.killQuery(entry.getValue()); eps.updateResultMessage(entry.getValue()); } else { - if (entry.getValue().getStatus() == 1) { + if (("RUNNING").equals(entry.getValue().getState())) { eps.updateProcessMessage(entry.getValue()); } else { eps.updateResultMessage(entry.getValue()); @@ -99,28 +99,29 @@ public class ScheduledResultController { for (JobEntity job : jobs) { logger.info("start executing task"); long begintime = System.currentTimeMillis(); - job.setBeginTime(begintime); + job.setStartTime(begintime); String sql = job.getQuerySql().trim(); /* sql = sql.replace("$exe_time", "toDateTime('" + job.getIssuedTime().trim() + "')"); sql = sql.replace("$start_time", "toDateTime('" + job.getStartTime().trim() + "')"); sql = sql.replace("$end_time", "toDateTime('" + job.getEndTime().trim() + "')");*/ job.setQuerySql(sql); - job.setStatus(1); - job.setExcute_status(1); - job.setExcuteDetail("EXECUTING"); - job.setExcuteRow(0L); - job.setExcuteTime(0); - job.setExcuteProcess(0); - job.setResultRows(0); + job.setState("RUNNING"); + job.setExcuteStatus(1); + job.setResultMessage("EXECUTING"); + job.setRowsRead(0L); + job.setElapsed(0); + job.setDoneProgress(0); + job.setResultRows(0L); String queryid = ""; - queryid = cs.getQueryId(job.getResultId().toString(), sql); - job.setQuery_id(queryid); + queryid = cs.getQueryId(sql); + job.setQueryId(queryid); if (queryid.equals("") ) { - job.setExcute_status(0); - job.setStatus(7); - job.setExcuteDetail("Unknow Error"); + job.setExcuteStatus(0); + job.setState("DONE"); + //status = 7 + job.setResultMessage("Unknow Error"); } - if (job.getStatus() == 1) { + if (("RUNNING").equals(job.getState())) { if (ms.updateProcesses(job) != 0) { gc.getMapresult().put(queryid, job); pool.execute(new Runnable() { @@ -148,7 +149,7 @@ public class ScheduledResultController { } else { if (gc.getMapresult().size() > 0) { for (Map.Entry<String, JobEntity> entry : gc.getMapresult().entrySet()) { - logger.info("key = " + entry.getKey() + ", value = " + entry.getValue().getStatus()); + logger.info("key = " + entry.getKey() + ", value = " + entry.getValue().getState()); eps.killQuery(entry.getValue()); } gc.getMapresult().clear(); diff --git a/src/main/java/com/mesa/reportservice/service/ClickhouseService.java b/src/main/java/com/mesa/reportservice/service/ClickhouseService.java index f178d6e..9c9a978 100644 --- a/src/main/java/com/mesa/reportservice/service/ClickhouseService.java +++ b/src/main/java/com/mesa/reportservice/service/ClickhouseService.java @@ -9,10 +9,10 @@ import java.io.IOException; */ public interface ClickhouseService { - String getQueryId(String resultId,String query) ; + String getQueryId(String query) ; - HttpResult queryForExcute(String resultId,String query) throws IOException; + HttpResult queryForExcute(String query) throws IOException; HttpResult queryForProcess(String queryId) throws IOException; diff --git a/src/main/java/com/mesa/reportservice/service/impl/ClickhouseServiceImpl.java b/src/main/java/com/mesa/reportservice/service/impl/ClickhouseServiceImpl.java index b4ac737..e56c6f3 100644 --- a/src/main/java/com/mesa/reportservice/service/impl/ClickhouseServiceImpl.java +++ b/src/main/java/com/mesa/reportservice/service/impl/ClickhouseServiceImpl.java @@ -47,12 +47,12 @@ public class ClickhouseServiceImpl implements ClickhouseService { @Override - public String getQueryId(String resultId, String query) { + public String getQueryId(String query) { CloseableHttpResponse response = null; String query_id = ""; try { - String url = URLUtil.normalize(clickhouseConfig.getGateway_ip().trim() + "/v1/sql/query/query_id?result_id=" + resultId + "&query="); + String url = URLUtil.normalize(clickhouseConfig.getGateway_ip().trim() + "/v1/sql/query/query_id?query="); String sql = null; sql = URLEncoder.encode(query, "utf8").replaceAll("\\+", "%20"); url = url + sql; @@ -85,13 +85,13 @@ public class ClickhouseServiceImpl implements ClickhouseService { * @throws Exception */ @Override - public HttpResult queryForExcute(String resultId, String query) throws UnsupportedEncodingException { + public HttpResult queryForExcute(String query) throws UnsupportedEncodingException { // 声明httpPost请求 CloseableHttpResponse response = null; // 发起请求 HttpResult rs = null; try { - String url = URLUtil.normalize(clickhouseConfig.getGateway_ip().trim() + "/sql?option=long-term&result_id=" + resultId + "&query="); + String url = URLUtil.normalize(clickhouseConfig.getGateway_ip().trim() + "/sql?query="); query = URLEncoder.encode(query, "utf8").replaceAll("\\+", "%20"); String jobsql = url + query; HttpGet httpGet = new HttpGet(jobsql); diff --git a/src/main/java/com/mesa/reportservice/service/impl/ExcuteProcessServiceImpl.java b/src/main/java/com/mesa/reportservice/service/impl/ExcuteProcessServiceImpl.java index a25653a..10e4bbb 100644 --- a/src/main/java/com/mesa/reportservice/service/impl/ExcuteProcessServiceImpl.java +++ b/src/main/java/com/mesa/reportservice/service/impl/ExcuteProcessServiceImpl.java @@ -38,42 +38,41 @@ public class ExcuteProcessServiceImpl implements ExcuteProcessService { try { if (je.getIsValid() == 0) { - je.setStatus(9); - je.setExcuteDetail("CANCEL"); + //status = 9 + je.setState("DONE"); + je.setResultMessage("CANCEL"); } else { - - - if (je.getExcute_status() >= 20000000 && je.getExcute_status() < 30000000) { + je.setState("DONE"); + je.setIsFailed(1); + if (je.getExcuteStatus() >= 20000000 && je.getExcuteStatus() < 30000000) { Boolean isok = saveToHbase(je); if (isok) { - je.setExcuteDetail("SUCCESS"); - je.setExcuteProcess(100); - je.setStatus(2); - logger.info("success save to hbase query_id=" + je.getQuery_id() + " resultid =" + je.getResultId() + " excutesql=" + je.getQuerySql()); + //status = 2 + je.setResultMessage("OK"); + je.setDoneProgress(1.00f); + je.setIsFailed(0); + logger.info("success save to hbase query_id=" + je.getQueryId() + " job_id =" + je.getJobId() + " excute_sql=" + je.getQuerySql()); } else { - je.setStatus(5); - je.setExcuteDetail("Write Data Error"); + //status = 5 + je.setResultMessage("Write Data Error"); mons.addFail(); - logger.error("save hbase error "+je.getExcute_status()+" query_id=" + je.getQuery_id() + " resultid =" + je.getResultId() + " excutesql=" + je.getQuerySql()); + logger.error("save hbase error "+je.getExcuteStatus()+" query_id=" + je.getQueryId() + " job_id =" + je.getJobId() + " excute_sql=" + je.getQuerySql()); } - } else if (je.getExcute_status() >= 40000000 && je.getExcute_status() < 50000000) { - - je.setStatus(3); - je.setExcuteDetail("Param Syntax Error"); - logger.error("Param Syntax Error "+je.getExcute_status()+" query_id=" + je.getQuery_id() + " resultid =" + je.getResultId() + " excutesql=" + je.getQuerySql()); + } else if (je.getExcuteStatus() >= 40000000 && je.getExcuteStatus() < 50000000) { + //status = 3 + je.setResultMessage("Param Syntax Error"); + logger.error("Param Syntax Error "+je.getExcuteStatus()+" query_id=" + je.getQueryId() + " job_id =" + je.getJobId() + " excute_sql=" + je.getQuerySql()); - } else if (je.getExcute_status() == 50001300) { - - je.setStatus(4); - je.setExcuteDetail("SQL Execution Error"); - logger.error("SQL Execution Error ! ErrorCode "+je.getExcute_status()+" query_id=" + je.getQuery_id() + " resultid =" + je.getResultId() + " excutesql=" + je.getQuerySql()); + } else if (je.getExcuteStatus() == 50001300) { + //status = 4 + je.setResultMessage("SQL Execution Error"); + logger.error("SQL Execution Error ! ErrorCode "+je.getExcuteStatus()+" query_id=" + je.getQueryId() + " job_id =" + je.getJobId() + " excute_sql=" + je.getQuerySql()); } else { - - je.setStatus(7); - je.setExcuteDetail("Unknow Error"); - logger.error("Unknow Error ! ErrorCode "+je.getExcute_status()+" query_id=" + je.getQuery_id() + " resultid =" + je.getResultId() + " excutesql=" + je.getQuerySql()); + //status = 7 + je.setResultMessage("Unknow Error"); + logger.error("Unknow Error ! ErrorCode "+je.getExcuteStatus()+" query_id=" + je.getQueryId() + " job_id =" + je.getJobId() + " excute_sql=" + je.getQuerySql()); } } @@ -86,14 +85,16 @@ public class ExcuteProcessServiceImpl implements ExcuteProcessService { } while (number != 1 && z >= 0); } catch (Exception e) { - je.setStatus(10); - je.setExcuteDetail("Database Error"); + //status = 10 + je.setState("DONE"); + je.setIsFailed(1); + je.setResultMessage("Database Error"); ms.updateProcesses(je); - logger.error("save database error resultid =" + je.getResultId()+" queryid=" + je.getResultId() + e.toString()); + logger.error("save database error job_id =" + je.getJobId()+" query_id=" + je.getQueryId() + e.toString()); } finally { saveToMonitor(je); - gc.getMapresult().remove(je.getQuery_id()); + gc.getMapresult().remove(je.getQueryId()); } @@ -101,9 +102,9 @@ public class ExcuteProcessServiceImpl implements ExcuteProcessService { @Override public void reSet(JobEntity jobEntity) { - - jobEntity.setStatus(0); - jobEntity.setExcuteDetail("Re Execution"); + //status = 0 + jobEntity.setState("PENDING"); + jobEntity.setResultMessage("Re Execution"); ms.updateProcesses(jobEntity); } @@ -111,14 +112,14 @@ public class ExcuteProcessServiceImpl implements ExcuteProcessService { @Override public void killQuery(JobEntity jobEntity) throws IOException { - cs.queryForCancel(jobEntity.getQuery_id()); + cs.queryForCancel(jobEntity.getQueryId()); } @Override public void updateProcessMessage(JobEntity job) throws IOException { - HttpResult hr = cs.queryForProcess(job.getQuery_id()); + HttpResult hr = cs.queryForProcess(job.getQueryId()); if (hr!=null) { String rs = hr.getBody().trim(); Map data = JSON.parseObject(rs); @@ -131,12 +132,12 @@ public class ExcuteProcessServiceImpl implements ExcuteProcessService { float elapsed = Float.parseFloat(map.get("elapsed").toString()); double persent = Double.parseDouble(map.get("percent").toString()); int process = (int) (persent * 100); - job.setExcuteTime((int) elapsed); - job.setExcuteRow(read_rows); - if(job.getExcuteProcess()<process){ - job.setExcuteProcess(process); + job.setElapsed((int) elapsed); + job.setRowsRead(read_rows); + if(job.getDoneProgress()<process){ + job.setDoneProgress(process); } - if (job.getExcuteRow() != 0 || job.getExcuteTime() != 0) { + if (job.getRowsRead() != 0 || job.getElapsed() != 0) { ms.updateProcesses(job); } } else { @@ -175,7 +176,7 @@ public class ExcuteProcessServiceImpl implements ExcuteProcessService { * promethus记录结果 */ public void saveToMonitor(JobEntity entity) { - if (entity.getStatus() == 2) { + if (("DONE").equals(entity.getState()) && entity.getIsFailed() == 0 && entity.getIsValid() == 1) { mons.addSuccess(); } else { diff --git a/src/main/java/com/mesa/reportservice/service/impl/ExcuteserviceImpl.java b/src/main/java/com/mesa/reportservice/service/impl/ExcuteserviceImpl.java index ddc2ee9..11e5a76 100644 --- a/src/main/java/com/mesa/reportservice/service/impl/ExcuteserviceImpl.java +++ b/src/main/java/com/mesa/reportservice/service/impl/ExcuteserviceImpl.java @@ -35,12 +35,12 @@ public class ExcuteserviceImpl implements ExcuteService { public void excuteCkTask(JobEntity job) { - logger.info("execute queryid=" + job.getQuery_id() + " sql=" + job.getQuerySql() + "mapresult size=" + gc.getMapresult().size()); + logger.info("execute queryid=" + job.getQueryId() + " sql=" + job.getQuerySql() + "mapresult size=" + gc.getMapresult().size()); HttpResult hr = new HttpResult(); int k = 3; do { try { - hr = cs.queryForExcute(job.getResultId().toString(), job.getQuerySql()); + hr = cs.queryForExcute(job.getQuerySql()); if (hr != null) { Map mapresult = JSON.parseObject(hr.getBody()); int query_status = Integer.parseInt(mapresult.get("status").toString()); @@ -51,16 +51,16 @@ public class ExcuteserviceImpl implements ExcuteService { mapresult = JSON.parseObject(hr.getBody()); Map rows = (Map) mapresult.get("statistics"); - job.setResultRows(Integer.parseInt(rows.get("result_rows").toString())); - job.setExcuteRow(Long.parseLong(rows.get("rows_read").toString())); + job.setResultRows(Long.parseLong(rows.get("result_rows").toString())); + job.setRowsRead(Long.parseLong(rows.get("rows_read").toString())); job.setResult(hr.getBody()); - job.setExcuteTime((int) Float.parseFloat(rows.get("elapsed").toString())); - job.setExcute_status(Integer.parseInt(mapresult.get("code").toString())); - logger.info("success resultid = " + job.getResultId() + " queryid=" + job.getQuery_id() + " sql=" + job.getQuerySql()); + job.setElapsed((int) Float.parseFloat(rows.get("elapsed").toString())); + job.setExcuteStatus(Integer.parseInt(mapresult.get("code").toString())); + logger.info("success resultid = " + job.getJobId() + " queryid=" + job.getQueryId() + " sql=" + job.getQuerySql()); } else { k = 0; - job.setExcute_status(Integer.parseInt(mapresult.get("code").toString())); + job.setExcuteStatus(Integer.parseInt(mapresult.get("code").toString())); logger.error("excute sql Error "); } } else { @@ -68,11 +68,11 @@ public class ExcuteserviceImpl implements ExcuteService { } } catch (SocketTimeoutException e) { k--; - job.setExcuteDetail(e.toString()); + job.setResultMessage(e.toString()); if (k == 0) { - job.setExcute_status(500001); - job.setExcuteDetail("SQL Execution Error excute query time out"); - logger.info("timeout resultid = " + job.getResultId() + " queryid=" + job.getQuery_id() + " sql=" + job.getQuerySql()); + job.setExcuteStatus(500001); + job.setResultMessage("SQL Execution Error excute query time out"); + logger.info("timeout resultid = " + job.getJobId() + " queryid=" + job.getQueryId() + " sql=" + job.getQuerySql()); } else { logger.info("Socket warn " + e.toString() + "retry time " + (3 - k)); } @@ -80,32 +80,32 @@ public class ExcuteserviceImpl implements ExcuteService { } catch (ConnectTimeoutException e) { - job.setExcute_status(555999); - job.setExcuteDetail(e.toString()); + job.setExcuteStatus(555999); + job.setResultMessage(e.toString()); logger.error("Unknow Error" + e.toString()); k = 0; } catch (OutOfMemoryError e) { - job.setExcute_status(555999); - job.setExcuteDetail("result too large"); + job.setExcuteStatus(555999); + job.setResultMessage("result too large"); logger.error("outofmemery Error" + e.toString()); k = 0; } catch (Exception e) { - job.setExcute_status(555999); - job.setExcuteDetail(e.toString()); + job.setExcuteStatus(555999); + job.setResultMessage(e.toString()); logger.error("Unknow Error" + e.toString()); k = 0; } try { - cs.queryForCancel(job.getQuery_id()); + cs.queryForCancel(job.getQueryId()); } catch (Exception e) { logger.error("Kill Query Error" + e.toString()); } } while (k > 0); - job.setStatus(2); + job.setState("DONE"); } } diff --git a/src/main/java/com/mesa/reportservice/service/impl/HbaseServiceImpl.java b/src/main/java/com/mesa/reportservice/service/impl/HbaseServiceImpl.java index 6932853..90cafb3 100644 --- a/src/main/java/com/mesa/reportservice/service/impl/HbaseServiceImpl.java +++ b/src/main/java/com/mesa/reportservice/service/impl/HbaseServiceImpl.java @@ -30,16 +30,6 @@ public class HbaseServiceImpl implements HbaseService { private HbaseProperties hbproperties; Log logger = Log.get(); - - private String getRowKey(String query_id) { - - String rowKey = ""; - String[] queryIdArray = query_id.split(":"); - rowKey = queryIdArray[1]; - return rowKey; - - } - @Override public Boolean put(JobEntity jobEntity) { @@ -48,21 +38,20 @@ public class HbaseServiceImpl implements HbaseService { Table table = null; try { table = hbaseConnection.getTable(TableName.valueOf(hbproperties.getTable())); - Put put = new Put(Bytes.toBytes(getRowKey(jobEntity.getQuery_id()))); + Put put = new Put(Bytes.toBytes(jobEntity.getJobId())); put.addColumn(Bytes.toBytes("response"), Bytes.toBytes("result"), Bytes.toBytes(jobEntity.getResult())); - put.addColumn(Bytes.toBytes("detail"), Bytes.toBytes("result_id"), Bytes.toBytes(jobEntity.getResultId())); put.addColumn(Bytes.toBytes("detail"), Bytes.toBytes("excute_sql"), Bytes.toBytes(jobEntity.getQuerySql())); //put.addColumn(Bytes.toBytes("detail"), Bytes.toBytes("format_sql"), Bytes.toBytes(jobEntity.getFormatSql())); - if (jobEntity.getExcuteRow() != null) { - put.addColumn(Bytes.toBytes("detail"), Bytes.toBytes("read_rows"), Bytes.toBytes(jobEntity.getExcuteRow())); + if (jobEntity.getRowsRead() != 0) { + put.addColumn(Bytes.toBytes("detail"), Bytes.toBytes("read_rows"), Bytes.toBytes(jobEntity.getRowsRead())); } - if (jobEntity.getQuery_duration_ms() != null) { - put.addColumn(Bytes.toBytes("detail"), Bytes.toBytes("query_duration_ms"), Bytes.toBytes(jobEntity.getQuery_duration_ms())); + if (jobEntity.getQueryDurationMs() != null) { + put.addColumn(Bytes.toBytes("detail"), Bytes.toBytes("query_duration_ms"), Bytes.toBytes(jobEntity.getQueryDurationMs())); } - if (jobEntity.getMemory_usage() != null) { - put.addColumn(Bytes.toBytes("detail"), Bytes.toBytes("memory_usage"), Bytes.toBytes(jobEntity.getMemory_usage())); + if (jobEntity.getMemoryUsage() != null) { + put.addColumn(Bytes.toBytes("detail"), Bytes.toBytes("memory_usage"), Bytes.toBytes(jobEntity.getMemoryUsage())); } if (hbproperties.getCell_ttl_d() != null) { put.setTTL(hbproperties.getCell_ttl_d() * 86400 * 1000); diff --git a/src/main/java/com/mesa/reportservice/service/impl/MysqlServiceImpl.java b/src/main/java/com/mesa/reportservice/service/impl/MysqlServiceImpl.java index f16ac68..60f7557 100644 --- a/src/main/java/com/mesa/reportservice/service/impl/MysqlServiceImpl.java +++ b/src/main/java/com/mesa/reportservice/service/impl/MysqlServiceImpl.java @@ -8,6 +8,7 @@ import com.mesa.reportservice.util.DateUtil; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Service; +import java.sql.Timestamp; import java.text.SimpleDateFormat; import java.util.Date; import java.util.HashMap; @@ -48,9 +49,7 @@ public class MysqlServiceImpl implements MysqlService { public int updateProcesses(JobEntity job){ Date currentTime = new Date(); - SimpleDateFormat formatter = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss"); - String sDate = formatter.format(currentTime); - job.setOpTime(sDate); + job.setLastUpdateTime(currentTime.getTime()); return rrm.updateProcesses(job); } diff --git a/src/main/resources/mappers/ReportResultMapper.xml b/src/main/resources/mappers/ReportResultMapper.xml index a1f610f..2bc828b 100644 --- a/src/main/resources/mappers/ReportResultMapper.xml +++ b/src/main/resources/mappers/ReportResultMapper.xml @@ -2,66 +2,68 @@ <!DOCTYPE mapper PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN" "http://mybatis.org/dtd/mybatis-3-mapper.dtd" > <mapper namespace="com.mesa.reportservice.mapper.ReportResultMapper" > <resultMap type="com.mesa.reportservice.bean.JobEntity" id="BaseResultMap"> - <id property="resultId" column="result_id"/> - <result property="resultName" column="result_name"/> -<!-- <result property="startTime" column="start_time"/> - <result property="endTime" column="end_time"/>--> + <id property="jobId" column="job_id"/> <result property="querySql" column="query_sql"/> - <result property="status" column="status"/> - <result property="excuteTime" column="excute_time"/> - <result property="excuteRow" column="excute_row"/> - <result property="excuteProcess" column="excute_process"/> - <result property="excuteDetail" column="excute_detail"/> - <result property="opTime" column="op_time"/> - <result property="issuedTime" column="issued_time"/> + <result property="state" column="state"/> + <result property="doneProgress" column="done_progress"/> + <result property="isFailed" column="is_failed"/> + <result property="resultMessage" column="result_message"/> + <result property="elapsed" column="elapsed"/> + <result property="rowsRead" column="rows_read"/> + <result property="bytesRead" column="bytes_read"/> <result property="resultRows" column="result_rows"/> + <result property="resultBytes" column="result_bytes"/> <result property="isValid" column="is_valid"/> + <result property="startTime" column="start_time"/> + <result property="endTime" column="end_time"/> + <result property="lastUpdateTime" column="last_update_time"/> + <result property="generatedTime" column="generated_time"/> </resultMap> <sql id="Base_Column_List" > - result_id, result_name, query_sql, status, excute_time, - excute_row, excute_process, excute_detail, is_send_notice,DATE_FORMAT(op_time,'%Y-%m-%d %H:%i:%s') as op_time,DATE_FORMAT(issued_time,'%Y-%m-%d %H:%i:%s') as issued_time,result_rows,is_valid + job_id,query_sql,state,done_progress,is_failed,result_message,elapsed,rows_read, + bytes_read,result_rows,result_bytes,is_valid,start_time,end_time,last_update_time,generated_time </sql> <select id="getJobForExcute" resultMap="BaseResultMap" parameterType="java.lang.Integer" > select <include refid="Base_Column_List" /> - from report_result - where status = 1 order by issued_time limit 10 + from saved_query_job + where state = 'RUNNING' order by generated_time limit 10 </select> <select id="getJobTask" resultMap="BaseResultMap" parameterType="hashmap"> select <include refid="Base_Column_List" /> - from report_result - where status = 0 and is_valid = 1 and issued_time < #{issuedtime} order by issued_time limit ${rows} + from saved_query_job + where state = 'PENDING' and is_valid = 1 order by generated_time limit ${rows} </select> <update id="updateProcesses" parameterType="com.mesa.reportservice.bean.JobEntity" > - update report_result + update saved_query_job <set > - <if test="status != null" > - status = #{status,jdbcType=INTEGER}, + <if test="state != null" > + state = #{state,jdbcType=VARCHAR}, </if> - <if test="excuteTime != null" > - excute_time = #{excuteTime,jdbcType=INTEGER}, + <if test="elapsed != null" > + elapsed = #{elapsed,jdbcType=INTEGER}, </if> - <if test="excuteRow != null" > - excute_row = #{excuteRow,jdbcType=BIGINT}, + <if test="rowsRead != null" > + rows_read = #{rowsRead,jdbcType=BIGINT}, </if> - <if test="excuteProcess != null" > - excute_process = #{excuteProcess,jdbcType=INTEGER}, + <if test="doneProgress != null" > + done_progress = #{doneProgress,jdbcType=FLOAT}, </if> - <if test="excuteDetail != null" > - excute_detail = #{excuteDetail,jdbcType=VARCHAR}, + <if test="resultMessage != null" > + result_message = #{resultMessage,jdbcType=VARCHAR}, </if> - <if test="opTime != null" > - op_time = #{opTime,jdbcType=TIMESTAMP}, + <if test="lastUpdateTime != null" > + last_update_time = #{lastUpdateTime,jdbcType=BIGINT}, </if> <if test="resultRows != null" > result_rows = #{resultRows,jdbcType=INTEGER}, </if> </set> - where result_id = #{resultId,jdbcType=INTEGER} + where job_id = #{jobId,jdbcType=VARCHAR} </update> @@ -69,29 +71,29 @@ <update id="updateStatue" parameterType="com.mesa.reportservice.bean.JobEntity" > - update report_result + update saved_query_job <set > - <if test="status != null" > - status = #{status,jdbcType=INTEGER}, + <if test="state != null" > + state = #{state,jdbcType=VARCHAR}, </if> - <if test="excuteDetail != null" > - excute_detail = #{excuteDetail,jdbcType=VARCHAR}, + <if test="resultMessage != null" > + result_message = #{resultMessage,jdbcType=VARCHAR}, </if> - <if test="opTime != null" > - op_time = #{opTime,jdbcType=TIMESTAMP}, + <if test="lastUpdateTime != null" > + last_update_time = #{lastUpdateTime,jdbcType=BIGINT}, </if> <if test="resultRows != null" > result_rows = #{resultRows,jdbcType=INTEGER}, </if> </set> - where result_id = #{resultId,jdbcType=INTEGER} + where job_id = #{jobId,jdbcType=VARCHAR} </update> <select id="getJobCount" resultType="map" parameterType="hashmap"> - SELECT (SELECT COUNT(1) FROM report_result where status = 0 and is_valid = 1 and issued_time < #{issuedtime}) as queueNum, - (SELECT COUNT(1) FROM report_result where status = 1 and is_valid = 1 ) as excuteingNum, - (SELECT COUNT(1) FROM report_result where status = 2 and op_time > #{optime} ) as todaySuccessNum, - (SELECT COUNT(1) FROM report_result where status > 2 and op_time > #{optime} ) as todayErrorNum + SELECT (SELECT COUNT(1) FROM saved_query_job where state = 'PENDING' and is_valid = 1) as queueNum, + (SELECT COUNT(1) FROM saved_query_job where state = 'RUNNING' and is_valid = 1) as excuteingNum, + (SELECT COUNT(1) FROM saved_query_job where state = 'DONE' and is_failed = 0 and last_update_time > #{lastUpdateTime}) as todaySuccessNum, + (SELECT COUNT(1) FROM saved_query_job where is_failed = 1 and last_update_time > #{lastUpdateTime} ) as todayErrorNum </select> </mapper>
\ No newline at end of file |
