summaryrefslogtreecommitdiff
path: root/src/main
diff options
context:
space:
mode:
authorlijinyang <[email protected]>2023-11-28 09:52:21 +0800
committerlijinyang <[email protected]>2023-11-28 09:52:21 +0800
commite8293348a19e723185e3f093d01a70f649eae13d (patch)
tree3ecf6c00be70223c737e66c15177d5bc52d502e8 /src/main
parent9250c02a4be3a252628393f205f9f82765af8ba7 (diff)
fix:GAL-447 报告服务适配新版表结构23.11.28
Diffstat (limited to 'src/main')
-rw-r--r--src/main/java/com/mesa/reportservice/bean/JobEntity.java246
-rw-r--r--src/main/java/com/mesa/reportservice/controller/ScheduledResultController.java55
-rw-r--r--src/main/java/com/mesa/reportservice/service/ClickhouseService.java4
-rw-r--r--src/main/java/com/mesa/reportservice/service/impl/ClickhouseServiceImpl.java8
-rw-r--r--src/main/java/com/mesa/reportservice/service/impl/ExcuteProcessServiceImpl.java83
-rw-r--r--src/main/java/com/mesa/reportservice/service/impl/ExcuteserviceImpl.java40
-rw-r--r--src/main/java/com/mesa/reportservice/service/impl/HbaseServiceImpl.java25
-rw-r--r--src/main/java/com/mesa/reportservice/service/impl/MysqlServiceImpl.java5
-rw-r--r--src/main/resources/mappers/ReportResultMapper.xml88
9 files changed, 174 insertions, 380 deletions
diff --git a/src/main/java/com/mesa/reportservice/bean/JobEntity.java b/src/main/java/com/mesa/reportservice/bean/JobEntity.java
index 7da68a5..b3baa8b 100644
--- a/src/main/java/com/mesa/reportservice/bean/JobEntity.java
+++ b/src/main/java/com/mesa/reportservice/bean/JobEntity.java
@@ -1,254 +1,56 @@
package com.mesa.reportservice.bean;
+import lombok.Data;
+
/**
* Created by wk1 on 2019/5/20.
*/
+@Data
public class JobEntity implements Cloneable {
- private Integer resultId;
-
- private String resultName;
-
- private Integer jobId;
-
-/* private String startTime;
-
- private String endTime;*/
+ private String jobId;
private String querySql;
- private Integer status;
-
- private Integer excuteTime;
-
- private Long excuteRow;
-
- private Integer excuteProcess;
-
- private String excuteDetail;
-
- private Integer isSendNotice;
-
- private String opTime;
-
- private String chartType;
-
- private String issuedTime;
-
- private String query_id;
-
- private String query_duration_ms;
-
- private int resultRows;
-
-
- private String memory_usage;
-
- private String result;
-
- private int excute_status;
-
- private String formatSql;
-
-
- private int isValid;
-
- private long beginTime;
-
- public Integer getResultId() {
- return resultId;
- }
-
- public void setResultId(Integer resultId) {
- this.resultId = resultId;
- }
-
- public String getResultName() {
- return resultName;
- }
-
- public void setResultName(String resultName) {
- this.resultName = resultName;
- }
+ private String state;
- public Integer getJobId() {
- return jobId;
- }
-
- public void setJobId(Integer jobId) {
- this.jobId = jobId;
- }
-
-/* public String getStartTime() {
- return startTime;
- }
-
- public void setStartTime(String startTime) {
- this.startTime = startTime;
- }
-
- public String getEndTime() {
- return endTime;
- }
-
- public void setEndTime(String endTime) {
- this.endTime = endTime;
- }*/
-
- public String getQuerySql() {
- return querySql;
- }
-
- public void setQuerySql(String querySql) {
- this.querySql = querySql;
- }
-
- public Integer getStatus() {
- return status;
- }
-
- public void setStatus(Integer status) {
- this.status = status;
- }
-
- public Integer getExcuteTime() {
- return excuteTime;
- }
-
- public void setExcuteTime(Integer excuteTime) {
- this.excuteTime = excuteTime;
- }
-
- public Long getExcuteRow() {
- return excuteRow;
- }
-
- public void setExcuteRow(Long excuteRow) {
- this.excuteRow = excuteRow;
- }
-
- public Integer getExcuteProcess() {
- return excuteProcess;
- }
-
- public void setExcuteProcess(Integer excuteProcess) {
- this.excuteProcess = excuteProcess;
- }
-
- public String getExcuteDetail() {
- return excuteDetail;
- }
-
- public void setExcuteDetail(String excuteDetail) {
- this.excuteDetail = excuteDetail;
- }
-
- public Integer getIsSendNotice() {
- return isSendNotice;
- }
-
- public void setIsSendNotice(Integer isSendNotice) {
- this.isSendNotice = isSendNotice;
- }
-
- public String getOpTime() {
- return opTime;
- }
-
- public void setOpTime(String opTime) {
- this.opTime = opTime;
- }
-
- public String getChartType() {
- return chartType;
- }
-
- public void setChartType(String chartType) {
- this.chartType = chartType;
- }
-
- public String getIssuedTime() {
- return issuedTime;
- }
-
- public void setIssuedTime(String issuedTime) {
- this.issuedTime = issuedTime;
- }
-
- public String getQuery_id() {
- return query_id;
- }
-
- public void setQuery_id(String query_id) {
- this.query_id = query_id;
- }
-
- public String getQuery_duration_ms() {
- return query_duration_ms;
- }
+ private String resultName;
- public void setQuery_duration_ms(String query_duration_ms) {
- this.query_duration_ms = query_duration_ms;
- }
+ private float doneProgress;
+ private Integer isFailed;
- public String getMemory_usage() {
- return memory_usage;
- }
+ private String resultMessage;
- public void setMemory_usage(String memory_usage) {
- this.memory_usage = memory_usage;
- }
+ private Integer elapsed;
- public String getResult() {
- return result;
- }
+ private long rowsRead;
- public void setResult(String result) {
- this.result = result;
- }
+ private long bytesRead;
- public int getExcute_status() {
- return excute_status;
- }
+ private long resultRows;
- public void setExcute_status(int excute_status) {
- this.excute_status = excute_status;
- }
+ private long resultBytes;
+ private Integer isValid;
- public int getResultRows() {
- return resultRows;
- }
+ private long startTime;
- public void setResultRows(int resultRows) {
- this.resultRows = resultRows;
- }
+ private long endTime;
- public String getFormatSql() {
- return formatSql;
- }
+ private long lastUpdateTime;
- public void setFormatSql(String formatSql) {
- this.formatSql = formatSql;
- }
+ private long generatedTime;
+ private String queryId;
- public int getIsValid() {
- return isValid;
- }
+ private Integer excuteStatus;
- public void setIsValid(int isValid) {
- this.isValid = isValid;
- }
+ private String result;
- public long getBeginTime() {
- return beginTime;
- }
+ private String memoryUsage;
- public void setBeginTime(long beginTime) {
- this.beginTime = beginTime;
- }
+ private String queryDurationMs;
@Override
public Object clone() {
diff --git a/src/main/java/com/mesa/reportservice/controller/ScheduledResultController.java b/src/main/java/com/mesa/reportservice/controller/ScheduledResultController.java
index 21aeeb5..79f9c47 100644
--- a/src/main/java/com/mesa/reportservice/controller/ScheduledResultController.java
+++ b/src/main/java/com/mesa/reportservice/controller/ScheduledResultController.java
@@ -55,19 +55,19 @@ public class ScheduledResultController {
sql = sql.replace("$start_time", "toDateTime('" + jobEntity.getStartTime().trim() + "')");
sql = sql.replace("$end_time", "toDateTime('" + jobEntity.getEndTime().trim() + "')");*/
- String queryid = cs.getQueryId(jobEntity.getResultId().toString(), sql);
- jobEntity.setQuery_id(queryid);
+ String queryid = cs.getQueryId(sql);
+ jobEntity.setQueryId(queryid);
if (jobEntity.getIsValid() == 0) {
eps.killQuery(jobEntity);
- gc.getMapresult().get(jobEntity.getQuery_id()).setIsValid(0);
- } else if (!gc.getMapresult().containsKey(jobEntity.getQuery_id())) {
+ gc.getMapresult().get(jobEntity.getQueryId()).setIsValid(0);
+ } else if (!gc.getMapresult().containsKey(jobEntity.getQueryId())) {
eps.reSet(jobEntity);
}
- if (gc.getMapresult().containsKey(jobEntity.getQuery_id())) {
+ if (gc.getMapresult().containsKey(jobEntity.getQueryId())) {
if (jobEntity.getIsValid() == 0) {
eps.killQuery(jobEntity);
- gc.getMapresult().get(jobEntity.getQuery_id()).setIsValid(0);
+ gc.getMapresult().get(jobEntity.getQueryId()).setIsValid(0);
}
} else {
eps.reSet(jobEntity);
@@ -76,17 +76,17 @@ public class ScheduledResultController {
}
//遍历内存中的任务对状态1的更新进度,其他更新数据库的状态
for (Map.Entry<String, JobEntity> entry : gc.getMapresult().entrySet()) {
- logger.info("key = " + entry.getKey() + ", value = " + entry.getValue().getStatus());
+ logger.info("key = " + entry.getKey() + ", value = " + entry.getValue().getState());
long currentTime = System.currentTimeMillis();
- long excutetime = currentTime - entry.getValue().getBeginTime();
+ long excutetime = currentTime - entry.getValue().getStartTime();
logger.info("excute time=" + excutetime + "ttl_time=" + hc.getSocketTimeout());
- if (entry.getValue().getStatus() == 1 && excutetime > hc.getSocketTimeout()) {
- entry.getValue().setStatus(2);
- entry.getValue().setExcute_status(500001);
+ if (("RUNNING").equals(entry.getValue().getState()) && excutetime > hc.getSocketTimeout()) {
+ entry.getValue().setState("DONE");
+ entry.getValue().setExcuteStatus(500001);
eps.killQuery(entry.getValue());
eps.updateResultMessage(entry.getValue());
} else {
- if (entry.getValue().getStatus() == 1) {
+ if (("RUNNING").equals(entry.getValue().getState())) {
eps.updateProcessMessage(entry.getValue());
} else {
eps.updateResultMessage(entry.getValue());
@@ -99,28 +99,29 @@ public class ScheduledResultController {
for (JobEntity job : jobs) {
logger.info("start executing task");
long begintime = System.currentTimeMillis();
- job.setBeginTime(begintime);
+ job.setStartTime(begintime);
String sql = job.getQuerySql().trim();
/* sql = sql.replace("$exe_time", "toDateTime('" + job.getIssuedTime().trim() + "')");
sql = sql.replace("$start_time", "toDateTime('" + job.getStartTime().trim() + "')");
sql = sql.replace("$end_time", "toDateTime('" + job.getEndTime().trim() + "')");*/
job.setQuerySql(sql);
- job.setStatus(1);
- job.setExcute_status(1);
- job.setExcuteDetail("EXECUTING");
- job.setExcuteRow(0L);
- job.setExcuteTime(0);
- job.setExcuteProcess(0);
- job.setResultRows(0);
+ job.setState("RUNNING");
+ job.setExcuteStatus(1);
+ job.setResultMessage("EXECUTING");
+ job.setRowsRead(0L);
+ job.setElapsed(0);
+ job.setDoneProgress(0);
+ job.setResultRows(0L);
String queryid = "";
- queryid = cs.getQueryId(job.getResultId().toString(), sql);
- job.setQuery_id(queryid);
+ queryid = cs.getQueryId(sql);
+ job.setQueryId(queryid);
if (queryid.equals("") ) {
- job.setExcute_status(0);
- job.setStatus(7);
- job.setExcuteDetail("Unknow Error");
+ job.setExcuteStatus(0);
+ job.setState("DONE");
+ //status = 7
+ job.setResultMessage("Unknow Error");
}
- if (job.getStatus() == 1) {
+ if (("RUNNING").equals(job.getState())) {
if (ms.updateProcesses(job) != 0) {
gc.getMapresult().put(queryid, job);
pool.execute(new Runnable() {
@@ -148,7 +149,7 @@ public class ScheduledResultController {
} else {
if (gc.getMapresult().size() > 0) {
for (Map.Entry<String, JobEntity> entry : gc.getMapresult().entrySet()) {
- logger.info("key = " + entry.getKey() + ", value = " + entry.getValue().getStatus());
+ logger.info("key = " + entry.getKey() + ", value = " + entry.getValue().getState());
eps.killQuery(entry.getValue());
}
gc.getMapresult().clear();
diff --git a/src/main/java/com/mesa/reportservice/service/ClickhouseService.java b/src/main/java/com/mesa/reportservice/service/ClickhouseService.java
index f178d6e..9c9a978 100644
--- a/src/main/java/com/mesa/reportservice/service/ClickhouseService.java
+++ b/src/main/java/com/mesa/reportservice/service/ClickhouseService.java
@@ -9,10 +9,10 @@ import java.io.IOException;
*/
public interface ClickhouseService {
- String getQueryId(String resultId,String query) ;
+ String getQueryId(String query) ;
- HttpResult queryForExcute(String resultId,String query) throws IOException;
+ HttpResult queryForExcute(String query) throws IOException;
HttpResult queryForProcess(String queryId) throws IOException;
diff --git a/src/main/java/com/mesa/reportservice/service/impl/ClickhouseServiceImpl.java b/src/main/java/com/mesa/reportservice/service/impl/ClickhouseServiceImpl.java
index b4ac737..e56c6f3 100644
--- a/src/main/java/com/mesa/reportservice/service/impl/ClickhouseServiceImpl.java
+++ b/src/main/java/com/mesa/reportservice/service/impl/ClickhouseServiceImpl.java
@@ -47,12 +47,12 @@ public class ClickhouseServiceImpl implements ClickhouseService {
@Override
- public String getQueryId(String resultId, String query) {
+ public String getQueryId(String query) {
CloseableHttpResponse response = null;
String query_id = "";
try {
- String url = URLUtil.normalize(clickhouseConfig.getGateway_ip().trim() + "/v1/sql/query/query_id?result_id=" + resultId + "&query=");
+ String url = URLUtil.normalize(clickhouseConfig.getGateway_ip().trim() + "/v1/sql/query/query_id?query=");
String sql = null;
sql = URLEncoder.encode(query, "utf8").replaceAll("\\+", "%20");
url = url + sql;
@@ -85,13 +85,13 @@ public class ClickhouseServiceImpl implements ClickhouseService {
* @throws Exception
*/
@Override
- public HttpResult queryForExcute(String resultId, String query) throws UnsupportedEncodingException {
+ public HttpResult queryForExcute(String query) throws UnsupportedEncodingException {
// 声明httpPost请求
CloseableHttpResponse response = null;
// 发起请求
HttpResult rs = null;
try {
- String url = URLUtil.normalize(clickhouseConfig.getGateway_ip().trim() + "/sql?option=long-term&result_id=" + resultId + "&query=");
+ String url = URLUtil.normalize(clickhouseConfig.getGateway_ip().trim() + "/sql?query=");
query = URLEncoder.encode(query, "utf8").replaceAll("\\+", "%20");
String jobsql = url + query;
HttpGet httpGet = new HttpGet(jobsql);
diff --git a/src/main/java/com/mesa/reportservice/service/impl/ExcuteProcessServiceImpl.java b/src/main/java/com/mesa/reportservice/service/impl/ExcuteProcessServiceImpl.java
index a25653a..10e4bbb 100644
--- a/src/main/java/com/mesa/reportservice/service/impl/ExcuteProcessServiceImpl.java
+++ b/src/main/java/com/mesa/reportservice/service/impl/ExcuteProcessServiceImpl.java
@@ -38,42 +38,41 @@ public class ExcuteProcessServiceImpl implements ExcuteProcessService {
try {
if (je.getIsValid() == 0) {
- je.setStatus(9);
- je.setExcuteDetail("CANCEL");
+ //status = 9
+ je.setState("DONE");
+ je.setResultMessage("CANCEL");
} else {
-
-
- if (je.getExcute_status() >= 20000000 && je.getExcute_status() < 30000000) {
+ je.setState("DONE");
+ je.setIsFailed(1);
+ if (je.getExcuteStatus() >= 20000000 && je.getExcuteStatus() < 30000000) {
Boolean isok = saveToHbase(je);
if (isok) {
- je.setExcuteDetail("SUCCESS");
- je.setExcuteProcess(100);
- je.setStatus(2);
- logger.info("success save to hbase query_id=" + je.getQuery_id() + " resultid =" + je.getResultId() + " excutesql=" + je.getQuerySql());
+ //status = 2
+ je.setResultMessage("OK");
+ je.setDoneProgress(1.00f);
+ je.setIsFailed(0);
+ logger.info("success save to hbase query_id=" + je.getQueryId() + " job_id =" + je.getJobId() + " excute_sql=" + je.getQuerySql());
} else {
- je.setStatus(5);
- je.setExcuteDetail("Write Data Error");
+ //status = 5
+ je.setResultMessage("Write Data Error");
mons.addFail();
- logger.error("save hbase error "+je.getExcute_status()+" query_id=" + je.getQuery_id() + " resultid =" + je.getResultId() + " excutesql=" + je.getQuerySql());
+ logger.error("save hbase error "+je.getExcuteStatus()+" query_id=" + je.getQueryId() + " job_id =" + je.getJobId() + " excute_sql=" + je.getQuerySql());
}
- } else if (je.getExcute_status() >= 40000000 && je.getExcute_status() < 50000000) {
-
- je.setStatus(3);
- je.setExcuteDetail("Param Syntax Error");
- logger.error("Param Syntax Error "+je.getExcute_status()+" query_id=" + je.getQuery_id() + " resultid =" + je.getResultId() + " excutesql=" + je.getQuerySql());
+ } else if (je.getExcuteStatus() >= 40000000 && je.getExcuteStatus() < 50000000) {
+ //status = 3
+ je.setResultMessage("Param Syntax Error");
+ logger.error("Param Syntax Error "+je.getExcuteStatus()+" query_id=" + je.getQueryId() + " job_id =" + je.getJobId() + " excute_sql=" + je.getQuerySql());
- } else if (je.getExcute_status() == 50001300) {
-
- je.setStatus(4);
- je.setExcuteDetail("SQL Execution Error");
- logger.error("SQL Execution Error ! ErrorCode "+je.getExcute_status()+" query_id=" + je.getQuery_id() + " resultid =" + je.getResultId() + " excutesql=" + je.getQuerySql());
+ } else if (je.getExcuteStatus() == 50001300) {
+ //status = 4
+ je.setResultMessage("SQL Execution Error");
+ logger.error("SQL Execution Error ! ErrorCode "+je.getExcuteStatus()+" query_id=" + je.getQueryId() + " job_id =" + je.getJobId() + " excute_sql=" + je.getQuerySql());
} else {
-
- je.setStatus(7);
- je.setExcuteDetail("Unknow Error");
- logger.error("Unknow Error ! ErrorCode "+je.getExcute_status()+" query_id=" + je.getQuery_id() + " resultid =" + je.getResultId() + " excutesql=" + je.getQuerySql());
+ //status = 7
+ je.setResultMessage("Unknow Error");
+ logger.error("Unknow Error ! ErrorCode "+je.getExcuteStatus()+" query_id=" + je.getQueryId() + " job_id =" + je.getJobId() + " excute_sql=" + je.getQuerySql());
}
}
@@ -86,14 +85,16 @@ public class ExcuteProcessServiceImpl implements ExcuteProcessService {
}
while (number != 1 && z >= 0);
} catch (Exception e) {
- je.setStatus(10);
- je.setExcuteDetail("Database Error");
+ //status = 10
+ je.setState("DONE");
+ je.setIsFailed(1);
+ je.setResultMessage("Database Error");
ms.updateProcesses(je);
- logger.error("save database error resultid =" + je.getResultId()+" queryid=" + je.getResultId() + e.toString());
+ logger.error("save database error job_id =" + je.getJobId()+" query_id=" + je.getQueryId() + e.toString());
} finally {
saveToMonitor(je);
- gc.getMapresult().remove(je.getQuery_id());
+ gc.getMapresult().remove(je.getQueryId());
}
@@ -101,9 +102,9 @@ public class ExcuteProcessServiceImpl implements ExcuteProcessService {
@Override
public void reSet(JobEntity jobEntity) {
-
- jobEntity.setStatus(0);
- jobEntity.setExcuteDetail("Re Execution");
+ //status = 0
+ jobEntity.setState("PENDING");
+ jobEntity.setResultMessage("Re Execution");
ms.updateProcesses(jobEntity);
}
@@ -111,14 +112,14 @@ public class ExcuteProcessServiceImpl implements ExcuteProcessService {
@Override
public void killQuery(JobEntity jobEntity) throws IOException {
- cs.queryForCancel(jobEntity.getQuery_id());
+ cs.queryForCancel(jobEntity.getQueryId());
}
@Override
public void updateProcessMessage(JobEntity job) throws IOException {
- HttpResult hr = cs.queryForProcess(job.getQuery_id());
+ HttpResult hr = cs.queryForProcess(job.getQueryId());
if (hr!=null) {
String rs = hr.getBody().trim();
Map data = JSON.parseObject(rs);
@@ -131,12 +132,12 @@ public class ExcuteProcessServiceImpl implements ExcuteProcessService {
float elapsed = Float.parseFloat(map.get("elapsed").toString());
double persent = Double.parseDouble(map.get("percent").toString());
int process = (int) (persent * 100);
- job.setExcuteTime((int) elapsed);
- job.setExcuteRow(read_rows);
- if(job.getExcuteProcess()<process){
- job.setExcuteProcess(process);
+ job.setElapsed((int) elapsed);
+ job.setRowsRead(read_rows);
+ if(job.getDoneProgress()<process){
+ job.setDoneProgress(process);
}
- if (job.getExcuteRow() != 0 || job.getExcuteTime() != 0) {
+ if (job.getRowsRead() != 0 || job.getElapsed() != 0) {
ms.updateProcesses(job);
}
} else {
@@ -175,7 +176,7 @@ public class ExcuteProcessServiceImpl implements ExcuteProcessService {
* promethus记录结果
*/
public void saveToMonitor(JobEntity entity) {
- if (entity.getStatus() == 2) {
+ if (("DONE").equals(entity.getState()) && entity.getIsFailed() == 0 && entity.getIsValid() == 1) {
mons.addSuccess();
} else {
diff --git a/src/main/java/com/mesa/reportservice/service/impl/ExcuteserviceImpl.java b/src/main/java/com/mesa/reportservice/service/impl/ExcuteserviceImpl.java
index ddc2ee9..11e5a76 100644
--- a/src/main/java/com/mesa/reportservice/service/impl/ExcuteserviceImpl.java
+++ b/src/main/java/com/mesa/reportservice/service/impl/ExcuteserviceImpl.java
@@ -35,12 +35,12 @@ public class ExcuteserviceImpl implements ExcuteService {
public void excuteCkTask(JobEntity job) {
- logger.info("execute queryid=" + job.getQuery_id() + " sql=" + job.getQuerySql() + "mapresult size=" + gc.getMapresult().size());
+ logger.info("execute queryid=" + job.getQueryId() + " sql=" + job.getQuerySql() + "mapresult size=" + gc.getMapresult().size());
HttpResult hr = new HttpResult();
int k = 3;
do {
try {
- hr = cs.queryForExcute(job.getResultId().toString(), job.getQuerySql());
+ hr = cs.queryForExcute(job.getQuerySql());
if (hr != null) {
Map mapresult = JSON.parseObject(hr.getBody());
int query_status = Integer.parseInt(mapresult.get("status").toString());
@@ -51,16 +51,16 @@ public class ExcuteserviceImpl implements ExcuteService {
mapresult = JSON.parseObject(hr.getBody());
Map rows = (Map) mapresult.get("statistics");
- job.setResultRows(Integer.parseInt(rows.get("result_rows").toString()));
- job.setExcuteRow(Long.parseLong(rows.get("rows_read").toString()));
+ job.setResultRows(Long.parseLong(rows.get("result_rows").toString()));
+ job.setRowsRead(Long.parseLong(rows.get("rows_read").toString()));
job.setResult(hr.getBody());
- job.setExcuteTime((int) Float.parseFloat(rows.get("elapsed").toString()));
- job.setExcute_status(Integer.parseInt(mapresult.get("code").toString()));
- logger.info("success resultid = " + job.getResultId() + " queryid=" + job.getQuery_id() + " sql=" + job.getQuerySql());
+ job.setElapsed((int) Float.parseFloat(rows.get("elapsed").toString()));
+ job.setExcuteStatus(Integer.parseInt(mapresult.get("code").toString()));
+ logger.info("success resultid = " + job.getJobId() + " queryid=" + job.getQueryId() + " sql=" + job.getQuerySql());
} else {
k = 0;
- job.setExcute_status(Integer.parseInt(mapresult.get("code").toString()));
+ job.setExcuteStatus(Integer.parseInt(mapresult.get("code").toString()));
logger.error("excute sql Error ");
}
} else {
@@ -68,11 +68,11 @@ public class ExcuteserviceImpl implements ExcuteService {
}
} catch (SocketTimeoutException e) {
k--;
- job.setExcuteDetail(e.toString());
+ job.setResultMessage(e.toString());
if (k == 0) {
- job.setExcute_status(500001);
- job.setExcuteDetail("SQL Execution Error excute query time out");
- logger.info("timeout resultid = " + job.getResultId() + " queryid=" + job.getQuery_id() + " sql=" + job.getQuerySql());
+ job.setExcuteStatus(500001);
+ job.setResultMessage("SQL Execution Error excute query time out");
+ logger.info("timeout resultid = " + job.getJobId() + " queryid=" + job.getQueryId() + " sql=" + job.getQuerySql());
} else {
logger.info("Socket warn " + e.toString() + "retry time " + (3 - k));
}
@@ -80,32 +80,32 @@ public class ExcuteserviceImpl implements ExcuteService {
} catch (ConnectTimeoutException e) {
- job.setExcute_status(555999);
- job.setExcuteDetail(e.toString());
+ job.setExcuteStatus(555999);
+ job.setResultMessage(e.toString());
logger.error("Unknow Error" + e.toString());
k = 0;
} catch (OutOfMemoryError e) {
- job.setExcute_status(555999);
- job.setExcuteDetail("result too large");
+ job.setExcuteStatus(555999);
+ job.setResultMessage("result too large");
logger.error("outofmemery Error" + e.toString());
k = 0;
} catch (Exception e) {
- job.setExcute_status(555999);
- job.setExcuteDetail(e.toString());
+ job.setExcuteStatus(555999);
+ job.setResultMessage(e.toString());
logger.error("Unknow Error" + e.toString());
k = 0;
}
try {
- cs.queryForCancel(job.getQuery_id());
+ cs.queryForCancel(job.getQueryId());
} catch (Exception e) {
logger.error("Kill Query Error" + e.toString());
}
}
while (k > 0);
- job.setStatus(2);
+ job.setState("DONE");
}
}
diff --git a/src/main/java/com/mesa/reportservice/service/impl/HbaseServiceImpl.java b/src/main/java/com/mesa/reportservice/service/impl/HbaseServiceImpl.java
index 6932853..90cafb3 100644
--- a/src/main/java/com/mesa/reportservice/service/impl/HbaseServiceImpl.java
+++ b/src/main/java/com/mesa/reportservice/service/impl/HbaseServiceImpl.java
@@ -30,16 +30,6 @@ public class HbaseServiceImpl implements HbaseService {
private HbaseProperties hbproperties;
Log logger = Log.get();
-
- private String getRowKey(String query_id) {
-
- String rowKey = "";
- String[] queryIdArray = query_id.split(":");
- rowKey = queryIdArray[1];
- return rowKey;
-
- }
-
@Override
public Boolean put(JobEntity jobEntity) {
@@ -48,21 +38,20 @@ public class HbaseServiceImpl implements HbaseService {
Table table = null;
try {
table = hbaseConnection.getTable(TableName.valueOf(hbproperties.getTable()));
- Put put = new Put(Bytes.toBytes(getRowKey(jobEntity.getQuery_id())));
+ Put put = new Put(Bytes.toBytes(jobEntity.getJobId()));
put.addColumn(Bytes.toBytes("response"), Bytes.toBytes("result"), Bytes.toBytes(jobEntity.getResult()));
- put.addColumn(Bytes.toBytes("detail"), Bytes.toBytes("result_id"), Bytes.toBytes(jobEntity.getResultId()));
put.addColumn(Bytes.toBytes("detail"), Bytes.toBytes("excute_sql"), Bytes.toBytes(jobEntity.getQuerySql()));
//put.addColumn(Bytes.toBytes("detail"), Bytes.toBytes("format_sql"), Bytes.toBytes(jobEntity.getFormatSql()));
- if (jobEntity.getExcuteRow() != null) {
- put.addColumn(Bytes.toBytes("detail"), Bytes.toBytes("read_rows"), Bytes.toBytes(jobEntity.getExcuteRow()));
+ if (jobEntity.getRowsRead() != 0) {
+ put.addColumn(Bytes.toBytes("detail"), Bytes.toBytes("read_rows"), Bytes.toBytes(jobEntity.getRowsRead()));
}
- if (jobEntity.getQuery_duration_ms() != null) {
- put.addColumn(Bytes.toBytes("detail"), Bytes.toBytes("query_duration_ms"), Bytes.toBytes(jobEntity.getQuery_duration_ms()));
+ if (jobEntity.getQueryDurationMs() != null) {
+ put.addColumn(Bytes.toBytes("detail"), Bytes.toBytes("query_duration_ms"), Bytes.toBytes(jobEntity.getQueryDurationMs()));
}
- if (jobEntity.getMemory_usage() != null) {
- put.addColumn(Bytes.toBytes("detail"), Bytes.toBytes("memory_usage"), Bytes.toBytes(jobEntity.getMemory_usage()));
+ if (jobEntity.getMemoryUsage() != null) {
+ put.addColumn(Bytes.toBytes("detail"), Bytes.toBytes("memory_usage"), Bytes.toBytes(jobEntity.getMemoryUsage()));
}
if (hbproperties.getCell_ttl_d() != null) {
put.setTTL(hbproperties.getCell_ttl_d() * 86400 * 1000);
diff --git a/src/main/java/com/mesa/reportservice/service/impl/MysqlServiceImpl.java b/src/main/java/com/mesa/reportservice/service/impl/MysqlServiceImpl.java
index f16ac68..60f7557 100644
--- a/src/main/java/com/mesa/reportservice/service/impl/MysqlServiceImpl.java
+++ b/src/main/java/com/mesa/reportservice/service/impl/MysqlServiceImpl.java
@@ -8,6 +8,7 @@ import com.mesa.reportservice.util.DateUtil;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
+import java.sql.Timestamp;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.HashMap;
@@ -48,9 +49,7 @@ public class MysqlServiceImpl implements MysqlService {
public int updateProcesses(JobEntity job){
Date currentTime = new Date();
- SimpleDateFormat formatter = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
- String sDate = formatter.format(currentTime);
- job.setOpTime(sDate);
+ job.setLastUpdateTime(currentTime.getTime());
return rrm.updateProcesses(job);
}
diff --git a/src/main/resources/mappers/ReportResultMapper.xml b/src/main/resources/mappers/ReportResultMapper.xml
index a1f610f..2bc828b 100644
--- a/src/main/resources/mappers/ReportResultMapper.xml
+++ b/src/main/resources/mappers/ReportResultMapper.xml
@@ -2,66 +2,68 @@
<!DOCTYPE mapper PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN" "http://mybatis.org/dtd/mybatis-3-mapper.dtd" >
<mapper namespace="com.mesa.reportservice.mapper.ReportResultMapper" >
<resultMap type="com.mesa.reportservice.bean.JobEntity" id="BaseResultMap">
- <id property="resultId" column="result_id"/>
- <result property="resultName" column="result_name"/>
-<!-- <result property="startTime" column="start_time"/>
- <result property="endTime" column="end_time"/>-->
+ <id property="jobId" column="job_id"/>
<result property="querySql" column="query_sql"/>
- <result property="status" column="status"/>
- <result property="excuteTime" column="excute_time"/>
- <result property="excuteRow" column="excute_row"/>
- <result property="excuteProcess" column="excute_process"/>
- <result property="excuteDetail" column="excute_detail"/>
- <result property="opTime" column="op_time"/>
- <result property="issuedTime" column="issued_time"/>
+ <result property="state" column="state"/>
+ <result property="doneProgress" column="done_progress"/>
+ <result property="isFailed" column="is_failed"/>
+ <result property="resultMessage" column="result_message"/>
+ <result property="elapsed" column="elapsed"/>
+ <result property="rowsRead" column="rows_read"/>
+ <result property="bytesRead" column="bytes_read"/>
<result property="resultRows" column="result_rows"/>
+ <result property="resultBytes" column="result_bytes"/>
<result property="isValid" column="is_valid"/>
+ <result property="startTime" column="start_time"/>
+ <result property="endTime" column="end_time"/>
+ <result property="lastUpdateTime" column="last_update_time"/>
+ <result property="generatedTime" column="generated_time"/>
</resultMap>
<sql id="Base_Column_List" >
- result_id, result_name, query_sql, status, excute_time,
- excute_row, excute_process, excute_detail, is_send_notice,DATE_FORMAT(op_time,'%Y-%m-%d %H:%i:%s') as op_time,DATE_FORMAT(issued_time,'%Y-%m-%d %H:%i:%s') as issued_time,result_rows,is_valid
+ job_id,query_sql,state,done_progress,is_failed,result_message,elapsed,rows_read,
+ bytes_read,result_rows,result_bytes,is_valid,start_time,end_time,last_update_time,generated_time
</sql>
<select id="getJobForExcute" resultMap="BaseResultMap" parameterType="java.lang.Integer" >
select
<include refid="Base_Column_List" />
- from report_result
- where status = 1 order by issued_time limit 10
+ from saved_query_job
+ where state = 'RUNNING' order by generated_time limit 10
</select>
<select id="getJobTask" resultMap="BaseResultMap" parameterType="hashmap">
select
<include refid="Base_Column_List" />
- from report_result
- where status = 0 and is_valid = 1 and issued_time &lt; #{issuedtime} order by issued_time limit ${rows}
+ from saved_query_job
+ where state = 'PENDING' and is_valid = 1 order by generated_time limit ${rows}
</select>
<update id="updateProcesses" parameterType="com.mesa.reportservice.bean.JobEntity" >
- update report_result
+ update saved_query_job
<set >
- <if test="status != null" >
- status = #{status,jdbcType=INTEGER},
+ <if test="state != null" >
+ state = #{state,jdbcType=VARCHAR},
</if>
- <if test="excuteTime != null" >
- excute_time = #{excuteTime,jdbcType=INTEGER},
+ <if test="elapsed != null" >
+ elapsed = #{elapsed,jdbcType=INTEGER},
</if>
- <if test="excuteRow != null" >
- excute_row = #{excuteRow,jdbcType=BIGINT},
+ <if test="rowsRead != null" >
+ rows_read = #{rowsRead,jdbcType=BIGINT},
</if>
- <if test="excuteProcess != null" >
- excute_process = #{excuteProcess,jdbcType=INTEGER},
+ <if test="doneProgress != null" >
+ done_progress = #{doneProgress,jdbcType=FLOAT},
</if>
- <if test="excuteDetail != null" >
- excute_detail = #{excuteDetail,jdbcType=VARCHAR},
+ <if test="resultMessage != null" >
+ result_message = #{resultMessage,jdbcType=VARCHAR},
</if>
- <if test="opTime != null" >
- op_time = #{opTime,jdbcType=TIMESTAMP},
+ <if test="lastUpdateTime != null" >
+ last_update_time = #{lastUpdateTime,jdbcType=BIGINT},
</if>
<if test="resultRows != null" >
result_rows = #{resultRows,jdbcType=INTEGER},
</if>
</set>
- where result_id = #{resultId,jdbcType=INTEGER}
+ where job_id = #{jobId,jdbcType=VARCHAR}
</update>
@@ -69,29 +71,29 @@
<update id="updateStatue" parameterType="com.mesa.reportservice.bean.JobEntity" >
- update report_result
+ update saved_query_job
<set >
- <if test="status != null" >
- status = #{status,jdbcType=INTEGER},
+ <if test="state != null" >
+ state = #{state,jdbcType=VARCHAR},
</if>
- <if test="excuteDetail != null" >
- excute_detail = #{excuteDetail,jdbcType=VARCHAR},
+ <if test="resultMessage != null" >
+ result_message = #{resultMessage,jdbcType=VARCHAR},
</if>
- <if test="opTime != null" >
- op_time = #{opTime,jdbcType=TIMESTAMP},
+ <if test="lastUpdateTime != null" >
+ last_update_time = #{lastUpdateTime,jdbcType=BIGINT},
</if>
<if test="resultRows != null" >
result_rows = #{resultRows,jdbcType=INTEGER},
</if>
</set>
- where result_id = #{resultId,jdbcType=INTEGER}
+ where job_id = #{jobId,jdbcType=VARCHAR}
</update>
<select id="getJobCount" resultType="map" parameterType="hashmap">
- SELECT (SELECT COUNT(1) FROM report_result where status = 0 and is_valid = 1 and issued_time &lt; #{issuedtime}) as queueNum,
- (SELECT COUNT(1) FROM report_result where status = 1 and is_valid = 1 ) as excuteingNum,
- (SELECT COUNT(1) FROM report_result where status = 2 and op_time > #{optime} ) as todaySuccessNum,
- (SELECT COUNT(1) FROM report_result where status > 2 and op_time > #{optime} ) as todayErrorNum
+ SELECT (SELECT COUNT(1) FROM saved_query_job where state = 'PENDING' and is_valid = 1) as queueNum,
+ (SELECT COUNT(1) FROM saved_query_job where state = 'RUNNING' and is_valid = 1) as excuteingNum,
+ (SELECT COUNT(1) FROM saved_query_job where state = 'DONE' and is_failed = 0 and last_update_time > #{lastUpdateTime}) as todaySuccessNum,
+ (SELECT COUNT(1) FROM saved_query_job where is_failed = 1 and last_update_time > #{lastUpdateTime} ) as todayErrorNum
</select>
</mapper> \ No newline at end of file