summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorwangkuan <[email protected]>2019-07-23 13:47:27 +0800
committerwangkuan <[email protected]>2019-07-23 13:47:27 +0800
commit81c4ff7dfaf18045cd8ed29de97e3161bc3b0071 (patch)
tree4fc622114c03fd601b33c48be62b81cc53979b55
parent76771bebb02ef24175e52d74ea4ab5410a649ea1 (diff)
代码格式化
-rw-r--r--src/main/java/com/mesa/reportservice/bean/HbaseData.java175
-rw-r--r--src/main/java/com/mesa/reportservice/bean/JobEntity.java2
-rw-r--r--src/main/java/com/mesa/reportservice/bean/TaskEntity.java27
-rw-r--r--src/main/java/com/mesa/reportservice/dao/DruidPool.java63
-rw-r--r--src/main/java/com/mesa/reportservice/dao/HttpClientPool.java151
-rw-r--r--src/main/java/com/mesa/reportservice/scheduledtask/ExcuteTask.java5
-rw-r--r--src/main/java/com/mesa/reportservice/scheduledtask/JobTask.java295
-rw-r--r--src/main/java/com/mesa/reportservice/scheduledtask/ScheduledTask.java79
-rw-r--r--src/main/java/com/mesa/reportservice/service/DataBaseBusiness.java13
-rw-r--r--src/main/java/com/mesa/reportservice/service/HBaseService.java631
-rw-r--r--src/main/java/com/mesa/reportservice/service/HttpAPIService.java4
-rw-r--r--src/main/java/com/mesa/reportservice/util/ConfigUtil.java7
-rw-r--r--src/main/java/com/mesa/reportservice/util/HbaseOpt.java377
-rw-r--r--src/main/java/com/mesa/reportservice/util/HbaseUtil.java4
-rw-r--r--src/main/java/com/mesa/reportservice/util/ScheduleConfig.java21
-rw-r--r--src/main/resources/application.properties57
16 files changed, 20 insertions, 1891 deletions
diff --git a/src/main/java/com/mesa/reportservice/bean/HbaseData.java b/src/main/java/com/mesa/reportservice/bean/HbaseData.java
deleted file mode 100644
index 636c6a4..0000000
--- a/src/main/java/com/mesa/reportservice/bean/HbaseData.java
+++ /dev/null
@@ -1,175 +0,0 @@
-package com.mesa.reportservice.bean;
-
-/**
- * @author LBQ
- * @version 1.0.0
- * @ClassName HbasePO
- * @Description 表结构
- * @Date 2018年6月2日 上午10:34:06
- */
-public class HbaseData {
-
- private String query_id;
- private String query_duration_ms;
- private long read_rows;
- private String memory_usage;
-
- private int result_id;
- private String result_name;
- private String task_id;
- private String start_time;
- private String end_time;
- private String query_sql;
- private int status;
- private int excute_time;
- private long excute_row;
- private int excute_process;
- private String excute_detail;
- private int is_send_notice;
- private String op_time;
-
-
- public int getResult_id() {
- return result_id;
- }
-
- public void setResult_id(int result_id) {
- this.result_id = result_id;
- }
-
- public String getResult_name() {
- return result_name;
- }
-
- public void setResult_name(String result_name) {
- this.result_name = result_name;
- }
-
- public int getStatus() {
- return status;
- }
-
- public void setStatus(int status) {
- this.status = status;
- }
-
- public String getTask_id() {
- return task_id;
- }
-
- public void setTask_id(String task_id) {
- this.task_id = task_id;
- }
-
- public String getStart_time() {
- return start_time;
- }
-
- public void setStart_time(String start_time) {
- this.start_time = start_time;
- }
-
- public String getEnd_time() {
- return end_time;
- }
-
- public void setEnd_time(String end_time) {
- this.end_time = end_time;
- }
-
- public String getQuery_sql() {
- return query_sql;
- }
-
- public void setQuery_sql(String query_sql) {
- this.query_sql = query_sql;
- }
-
-
- public int getExcute_time() {
- return excute_time;
- }
-
- public void setExcute_time(int excute_time) {
- this.excute_time = excute_time;
- }
-
- public long getExcute_row() {
- return excute_row;
- }
-
- public void setExcute_row(long excute_row) {
- this.excute_row = excute_row;
- }
-
- public int getExcute_process() {
- return excute_process;
- }
-
- public void setExcute_process(int excute_process) {
- this.excute_process = excute_process;
- }
-
- public String getExcute_detail() {
- return excute_detail;
- }
-
- public void setExcute_detail(String excute_detail) {
- this.excute_detail = excute_detail;
- }
-
- public int getIs_send_notice() {
- return is_send_notice;
- }
-
- public void setIs_send_notice(int is_send_notice) {
- this.is_send_notice = is_send_notice;
- }
-
- public String getOp_time() {
- return op_time;
- }
-
- public void setOp_time(String op_time) {
- this.op_time = op_time;
- }
-
- public String getQuery_id() {
- return query_id;
- }
-
- public void setQuery_id(String query_id) {
- this.query_id = query_id;
- }
-
- public String getQuery_duration_ms() {
- return query_duration_ms;
- }
-
- public void setQuery_duration_ms(String query_duration_ms) {
- this.query_duration_ms = query_duration_ms;
- }
-
- public long getRead_rows() {
- return read_rows;
- }
-
- public void setRead_rows(long read_rows) {
- this.read_rows = read_rows;
- }
-
- public String getMemory_usage() {
- return memory_usage;
- }
-
- public void setMemory_usage(String memory_usage) {
- this.memory_usage = memory_usage;
- }
-
- @Override
- public String toString() {
- return "HbaseData [query_id=" + query_id + ", query_duration_ms=" + query_duration_ms + ", read_rows="
- + read_rows + ", memory_usage=" + memory_usage + "]";
- }
-
-}
diff --git a/src/main/java/com/mesa/reportservice/bean/JobEntity.java b/src/main/java/com/mesa/reportservice/bean/JobEntity.java
index 48af9fb..a566d83 100644
--- a/src/main/java/com/mesa/reportservice/bean/JobEntity.java
+++ b/src/main/java/com/mesa/reportservice/bean/JobEntity.java
@@ -3,7 +3,7 @@ package com.mesa.reportservice.bean;
/**
* Created by wk1 on 2019/5/20.
*/
-public class JobEntity implements Cloneable{
+public class JobEntity implements Cloneable {
private int result_id;
private String result_name;
diff --git a/src/main/java/com/mesa/reportservice/bean/TaskEntity.java b/src/main/java/com/mesa/reportservice/bean/TaskEntity.java
deleted file mode 100644
index bc17ec6..0000000
--- a/src/main/java/com/mesa/reportservice/bean/TaskEntity.java
+++ /dev/null
@@ -1,27 +0,0 @@
-package com.mesa.reportservice.bean;
-
-/**
- * Created by wk1 on 2019/5/17.
- */
-public class TaskEntity {
-
- private static Boolean state = false;
-
- private static String sql = "";
-
- public static Boolean getState() {
- return state;
- }
-
- public static void setState(Boolean state) {
- TaskEntity.state = state;
- }
-
- public static String getSql() {
- return sql;
- }
-
- public static void setSql(String sql) {
- TaskEntity.sql = sql;
- }
-}
diff --git a/src/main/java/com/mesa/reportservice/dao/DruidPool.java b/src/main/java/com/mesa/reportservice/dao/DruidPool.java
index 5135e11..cd3a4d6 100644
--- a/src/main/java/com/mesa/reportservice/dao/DruidPool.java
+++ b/src/main/java/com/mesa/reportservice/dao/DruidPool.java
@@ -5,13 +5,9 @@ package com.mesa.reportservice.dao;
*/
import com.alibaba.druid.pool.DruidDataSource;
-import com.alibaba.druid.pool.DruidDataSourceFactory;
import com.alibaba.druid.support.http.StatViewServlet;
import com.alibaba.druid.support.http.WebStatFilter;
-import com.google.common.collect.Maps;
import com.mesa.reportservice.util.Logs;
-import org.junit.Test;
-import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.boot.web.servlet.FilterRegistrationBean;
import org.springframework.boot.web.servlet.ServletRegistrationBean;
@@ -19,10 +15,7 @@ import org.springframework.context.annotation.Bean;
import org.springframework.stereotype.Component;
import javax.sql.DataSource;
-import java.io.InputStream;
-import java.sql.*;
-import java.util.Arrays;
-import java.util.Properties;
+import java.sql.SQLException;
@Component
@@ -154,59 +147,5 @@ public class DruidPool {
}
}
-/*
- @Test
- public void demo02() {
- Connection conn = null;
- PreparedStatement ps = null;
- ResultSet rs = null;
-
- try {
-
- //使用配置文件来配置文件
- Properties pr = new Properties();
- // pr.load(new FileInputStream("config/druid.properties"));
-
- InputStream is = DruidPool.class.getClassLoader().getResourceAsStream("druid.properties");
- //InputStream is = new FileInputStream(System.getProperty("user.dir")+File.separator+"config"+File.separator+"db.properties");
-
- pr.load(is);
- DataSource dataSource = DruidDataSourceFactory.createDataSource(pr);
-
-
- //1,创建Druid连接池对象
- //DruidDataSource dataSource = new DruidDataSource();
- //dataSource.setDriverClassName("com.mysql.jdbc.Driver");
- //dataSource.setUrl("jdbc:mysql://localhost:3306/mytest-day08");
- //dataSource.setUsername("root");
- //dataSource.setPassword("123");
-
-
- //工具类中获得连接
- //conn = MyUtils.getConnertion01();
-
- //用Druid来连接
- conn = dataSource.getConnection();
-
-
- //2,执行数据库语句
- String sql = "SELECT * FROM accoumt";
-
- //3,用prepareStatement获取sql语句
- ps = conn.prepareStatement(sql);
-
- //4,执行sql语句,查询用executeQuery,增删改用executeUpdate
- rs = ps.executeQuery();
- while (rs.next()) {
- System.out.println(rs.getInt("id") + " " + rs.getString("name") + " " + rs.getDouble("money"));
- }
-
- } catch (Exception e) {
- // TODO: handle exception
- e.printStackTrace();
- } finally {
-
- }
- }*/
diff --git a/src/main/java/com/mesa/reportservice/dao/HttpClientPool.java b/src/main/java/com/mesa/reportservice/dao/HttpClientPool.java
index d82511f..d3e1e48 100644
--- a/src/main/java/com/mesa/reportservice/dao/HttpClientPool.java
+++ b/src/main/java/com/mesa/reportservice/dao/HttpClientPool.java
@@ -134,158 +134,7 @@ public class HttpClientPool {
* @Description Hbase连接池
* @Date 2018年5月31日 下午10:22:06
*/
- //@Component
- //@ConfigurationProperties(prefix = "hbase")
- public static class HbasePool {
- /* private static Logger logger = Logger.getLogger(HbasePool.class);
-
- private static Configuration conf = null;
-
- private static String zookeeper_quorum;
-
- private static String zookeeper_property_clientPort;
-
- private static String zookeeper_znode_parent;
-
- *//* private static String client_pause;*//*
-
- private static String client_retries_number;
-
- private static String rpc_timeout;
-
- *//*private static String client_operation_timeout;
-
- private static String client_scanner_timeout_period;*//*
-
-
- public HbasePool() {
- }
-
- *//**
- * @Field @connectNumMax : 连接池最大数
- *//*
- private static Integer connectNumMax;
-
- *//**
- * @Field @connectNumMin : 连接池最小数
- *//*
- private static Integer connectNumMin;
-
- *//**
- * @Field @counter : 计数器
- *//*
- private static Integer counter = 0;
-
- private static LinkedBlockingQueue<Connection> queue = new LinkedBlockingQueue<Connection>();
-
-
-
- public static synchronized Configuration getConfiguration() {
- if (conf == null) {
- conf = HBaseConfiguration.create();
- conf.set("hbase.zookeeper.quorum", zookeeper_quorum);
- conf.set("hbase.zookeeper.property.clientPort", zookeeper_property_clientPort);
- conf.set("hbase.zookeeper.znode.parent", zookeeper_znode_parent);
- // conf.set("hase.client.pause", client_pause);
- conf.set("hbase.client.retries.number", client_retries_number);
- conf.set("hbase.rpc.timeout", rpc_timeout);
- // conf.set("hbase.client.operation.timeout", client_operation_timeout);
- // conf.set("hbase.client.scanner.timeout.period", client_scanner_timeout_period);
- }
- return conf;
- }
-
-
-
-
- *//**
- * @Description 归还连接
- * @param con
- *//*
- public static void returnConnect(Connection con) {
- if (con == null) {
- return;
- }
- if (counter > connectNumMin) {
- try {
- con.close();
- } catch (IOException e) {
- logger.error("归还连接失败!", e);
- } finally {
- con=null;
- counter--;
- }
- } else {
- queue.add(con);
- }
- }
-
- *//**
- * @Description 获取连接
- * @return
- * @throws Exception
- *//*
- public static Connection getConnection() throws Exception {
-
- System.out.println("huoqu hbase lianjie" );
- if (!queue.isEmpty()) {
- return queue.poll();
- } else {
- if (counter >= connectNumMax) {
- throw new Exception("Hbase连接池已满,请释放连接资源!目前连接数已达:[" + counter + "]");
- } else {
- Connection con;
- try {
- con = ConnectionFactory.createConnection(getConfiguration());
- counter++;
- } catch (IOException e) {
- throw new Exception("Hbase连接创建失败,请检查连接配置!", e);
- }
- return con;
- }
- }
- }
-
- public void setConnectNumMax(Integer connectNumMax) {
- HbasePool.connectNumMax = connectNumMax;
- }
-
- public void setConnectNumMin(Integer connectNumMin) {
- HbasePool.connectNumMin = connectNumMin;
- }
-
- *//**
- * @Description 存活连接数
- * @return
- *//*
- public static Integer conNum() {
- return counter;
- }
-
-
- public void setZookeeper_quorum(String zookeeper_quorum) {
- HbasePool.zookeeper_quorum = zookeeper_quorum;
- }
-
- public void setZookeeper_property_clientPort(String zookeeper_property_clientPort) {
- HbasePool.zookeeper_property_clientPort = zookeeper_property_clientPort;
- }
-
- public void setZookeeper_znode_parent(String zookeeper_znode_parent) {
- HbasePool.zookeeper_znode_parent = zookeeper_znode_parent;
- }
-
-
- public void setClient_retries_number(String client_retries_number) {
- HbasePool.client_retries_number = client_retries_number;
- }
-
- public void setRpc_timeout(String rpc_timeout) {
- HbasePool.rpc_timeout = rpc_timeout;
- }
- */
- }
}
diff --git a/src/main/java/com/mesa/reportservice/scheduledtask/ExcuteTask.java b/src/main/java/com/mesa/reportservice/scheduledtask/ExcuteTask.java
index a5e826c..b7d720c 100644
--- a/src/main/java/com/mesa/reportservice/scheduledtask/ExcuteTask.java
+++ b/src/main/java/com/mesa/reportservice/scheduledtask/ExcuteTask.java
@@ -38,12 +38,11 @@ public class ExcuteTask {
do {
try {
hr = hp.doPost(joburl);
- if(hr.getCode()!=200 && hr.getBody().toLowerCase().contains("timeout")){
+ if (hr.getCode() != 200 && hr.getBody().toLowerCase().contains("timeout")) {
k--;
String killurl = ClickhouseConfig.getKillUrl(je.getQuery_id());
hp.doPost(killurl);
- }
- else {
+ } else {
break;
}
} catch (SocketException e) {
diff --git a/src/main/java/com/mesa/reportservice/scheduledtask/JobTask.java b/src/main/java/com/mesa/reportservice/scheduledtask/JobTask.java
deleted file mode 100644
index 6379b50..0000000
--- a/src/main/java/com/mesa/reportservice/scheduledtask/JobTask.java
+++ /dev/null
@@ -1,295 +0,0 @@
-package com.mesa.reportservice.scheduledtask;
-
-/**
- * Created by wk1 on 2019/5/31.
- */
-
-//@Component
-
-public class JobTask {
-/*
- protected ExecutorService pool = ConfigUtil.pool;
- private HttpAPIService hp;
- private DataBaseBusiness hb;
-
- public JobTask(HttpAPIService hp, DataBaseBusiness hb) {
-
- this.hb = hb;
- this.hp = hp;
- Logs.debug("new Jobtask==========");
- }
-
- public HttpAPIService getHp() {
- return hp;
- }
-
- public void setHp(HttpAPIService hp) {
- this.hp = hp;
- }
-
- public DataBaseBusiness getHb() {
- return hb;
- }
-
- public void setHb(DataBaseBusiness hb) {
- this.hb = hb;
- }
-
- public void dotask(JobEntity jl) {
- int id = jl.getResult_id();
- String sql = jl.getQuery_sql().trim();
- String hbaseid = DigestUtils.md5Hex(id + sql);
- sql = sql.replace("$exe_time", "toDateTime('" + jl.getIssued_time().trim() + "')");
- String query_id = UUID.randomUUID().toString().replaceAll("-", "");
- String httpQuery = ClickhouseConfig.getJobUrl(sql, query_id);
- hb.updateStatue(id, 1);
-
- int a = 3;
- do {
- CountDownLatch countDownLatch = new CountDownLatch(1);
- pool.execute(new Runnable() {
- @Override
- public void run() {
- try {
- int i;
-
- for (i = 0; i < ConfigUtil.loop_number; i++) {
- Thread.sleep(ConfigUtil.query_log_sleep);
- String queryProccess = ClickhouseConfig.getProcessUrl(query_id);
- HttpResult hr = hp.doPost(queryProccess);
- String rs = hr.getBody().trim();
- if (!rs.isEmpty() && !rs.equals("")) {
- // int status = hb.getJobStatueFromId(id);
- JobEntity en = getProcessMessage(rs, id);
- en.setExcute_detail("");
- if (en.getExcute_time() < (ConfigUtil.ck_timeout)) {
-
- if (en.getExcute_time() > ConfigUtil.query_log_sleep) {
- hb.updateProcesses(en);
- } else {
- break;
- }
- } else {
- i = ConfigUtil.loop_number;
- break;
- }
- } else {
- break;
- }
- }
- if (i >= ConfigUtil.loop_number) {
- String killurl = ClickhouseConfig.getKillUrl(query_id);
- hp.doPost(killurl);
- }
- } catch (Exception e) {
- Logs.error("获取进度失败" + e.toString());
- } finally {
- Logs.debug("thread finish--------------------------" + Thread.currentThread());
- countDownLatch.countDown();
- }
- }
- });
-
- try {
- HttpResult hr = new HttpResult();
- hr = hp.doPost(httpQuery);
- countDownLatch.await();
- JobEntity entity = getQueryLogMessage(id, query_id);
-
- if (hr.getCode() == 200) {
-
- entity.setQuery_id(query_id);
- HttpResult h1 = saveToHbase(hbaseid, hr.getBody(), entity);
- if (h1.getCode() == 200) {
- //Thread.sleep(5000);
- // Logs.debug(entity.getJob_id() + entity.getStatue());
-
- entity.setResult_id(id);
- entity.setExcute_detail("success");
- entity.setExcute_time(0);
- entity.setExcute_process(100);
- entity.setStatus(2);
- int number = 0;
- int z = 3;
- do {
- number = hb.updateProcesses(entity);
- z--;
- }
- while (number != 1 && z >= 0);
- break;
- } else {
- updateErrorMessage(id, 5, "写入hbase失败");
- Logs.error("写入hbase失败");
- break;
- }
- } else {
-
- if (hr.getBody().contains("Code: 394")) {
- throw new IOException("queryid=" + query_id + " hbaseid=" + hbaseid + " 查询超时");
- } else if (hr.getBody().contains("Code: 241")) {
- throw new Exception("查询超出内存限制" + hr.getBody());
- } else if (entity.getStatus() == 3) {
- updateErrorMessage(id, 3, "查询语句错误" + hr.getBody());
- break;
- } else {
- Logs.error("查询clickhouse失败" + hr.getBody());
- throw new Exception("查询clickhouse失败" + hr.getBody());
- }
- }
-
-
- } catch (InterruptedException w) {
- Logs.error(w.toString());
- break;
- } catch (IOException r) {
- a--;
- String kill = ClickhouseConfig.getKillUrl(query_id);
- try {
- hp.doPost(kill);
- } catch (Exception er) {
- Logs.error(er.toString());
- }
- if (a <= 0) {
- updateErrorMessage(id, 4, "查询超时");
- break;
- }
- } catch (NullPointerException e) {
- e.printStackTrace();
- break;
- } catch (Exception e) {
- Logs.error("任务执行中报错" + e.toString());
- updateErrorMessage(id, 4, e.toString());
- break;
- }
- }
- while (a > 0);
- synchronized (this) {
- ConfigUtil.job_thread++;
- Logs.debug("job_thread++++++++++++++" + ConfigUtil.job_thread);
-
- }
- }
-
-
- public void updateErrorMessage(int id, int status, String error) {
-
-
- JobEntity js = new JobEntity();
- js.setResult_id(id);
- js.setStatus(status);
- js.setExcute_detail("任务执行中报错" + error);
- hb.updateProcesses(js);
-
- }
-
-
- public JobEntity getProcessMessage(String body, int id) {
- JobEntity job = new JobEntity();
- Map map = JSON.parseObject(body);
- float elapsed = Float.parseFloat(map.get("elapsed").toString());
- long total_rows_approx = Long.parseLong(map.get("total_rows_approx").toString());
- long read_rows = Long.parseLong(map.get("read_rows").toString());
- int ftime = (int) (elapsed * total_rows_approx / read_rows);
- ftime = Math.round((ftime - elapsed) * 10 * 1000) / 10;
- Logs.info("total_rows_approx" + total_rows_approx + "read_rows" + read_rows + "elapsed" + elapsed + "ftime" + ftime);
- // NumberFormat numberFormat = NumberFormat.getInstance();
- //numberFormat.setMaximumFractionDigits(0);
-
- double persent = (read_rows * 1.00 / total_rows_approx);
- int result = (int) (persent * 100);
- job.setExcute_row(total_rows_approx);
- job.setExcute_process(result);
- job.setStatus(1);
- job.setExcute_time(ftime);
- job.setResult_id(id);
- System.out.println("%+++++++++++++" + result + "%");
- System.out.println("needtime--------" + ftime);
- return job;
-
- }
-
- public HttpResult saveToHbase(String hbaseid, String body, JobEntity entity) throws Exception {
-
- int k = 3;
- HttpResult h1 = new HttpResult();
-
- do {
- try {
- String hbasejson = HbaseUtil.getHbaseJson(hbaseid, body, entity);
- String hbaseurl = HbaseUtil.getHbasePostUrl();
- k--;
- h1 = hp.doPost(hbaseurl, hbasejson);
- break;
- } catch (Exception e) {
- Logs.error("写入hbase报错重试次数" + (3 - k));
- if (k < 0) {
- throw new Exception("写入hbase报错", e);
- }
- }
- }
- while (k >= 0);
-
- // String h2 = hp.doGet("http://192.168.10.224:4444/tsg:report_result/" + hbaseid + "/response:result");
- // Map<String, Object> aaa = h2.queryData("tsg:report_result",query_id);
- // System.out.print(h2);
- return h1;
- }
-
-
- public JobEntity getQueryLogMessage(int id, String query_id) {
-
-
- JobEntity job = new JobEntity();
- String finishurl = ClickhouseConfig.getFinishUrl(query_id);
- HttpResult num = new HttpResult();
- int j = 3;
- do {
- try {
- Thread.sleep(3000);
- num = hp.doPost(finishurl);
- if (num.getCode() == 200 && !num.getBody().isEmpty() && !num.getBody().equals("")) {
-
- job.setResult_id(id);
- Map mapresult = JSON.parseObject(num.getBody());
- int type = Integer.parseInt(mapresult.get("type").toString());
- String query_duration_ms = mapresult.get("query_duration_ms").toString();
- long read_rows = Long.parseLong(mapresult.get("read_rows").toString());
- String memory_usage = mapresult.get("memory_usage").toString();
- Logs.debug("type" + type);
- switch (type) {
- case 2:
- job.setStatus(2);
- // job.setExcute_detail("success");
- break;
- case 3:
- job.setStatus(3);
- //job.setExcute_detail("语法错误");
- break;
- case 4:
- job.setStatus(4);
- //job.setExcute_detail("性能问题");
- break;
- default:
- break;
- }
- job.setQuery_duration_ms(query_duration_ms);
- job.setMemory_usage(memory_usage);
- job.setExcute_time(0);
- job.setExcute_process(100);
- job.setExcute_row(read_rows);
- } else {
- job.setStatus(5);
- job.setExcute_detail("没有查到query信息");
-
- }
- } catch (Exception e) {
- job.setStatus(5);
- Logs.error("获取querylogmessage失败");
- }
- j--;
- }
- while (job.getStatus() == 5 && j >= 0);
- return job;
- }*/
-
-}
diff --git a/src/main/java/com/mesa/reportservice/scheduledtask/ScheduledTask.java b/src/main/java/com/mesa/reportservice/scheduledtask/ScheduledTask.java
index a0445a7..0472733 100644
--- a/src/main/java/com/mesa/reportservice/scheduledtask/ScheduledTask.java
+++ b/src/main/java/com/mesa/reportservice/scheduledtask/ScheduledTask.java
@@ -34,48 +34,10 @@ public class ScheduledTask {
@Autowired
private DataBaseBusiness hb;
- /* @Scheduled(cron = "${scan.mysql.scheduled.plan}")
- public void run() {
-
- System.out.print(Thread.currentThread());
- Logs.debug("jobthread=000000000000000" + ConfigUtil.job_thread);
- try {
- if (ConfigUtil.job_thread > 0) {
- List<JobEntity> joblist = hb.getJobLimitRows(ConfigUtil.job_thread);
- Logs.debug("joblist size=" + joblist.size());
-
- for (JobEntity jl : joblist) {
- // Thread.sleep(2000);
- ConfigUtil.job_thread--;
- Logs.debug("jobthread--=" + ConfigUtil.job_thread + "---jobid=" + jl.getResult_id());
- pool.execute(new Runnable() {
- @Override
- public void run() {
- Logs.debug("threadname" + Thread.currentThread());
- JobTask jt = new JobTask(hp, hb);
- try {
- jt.dotask(jl);
- } catch (Exception e) {
- Logs.error(e.toString());
- }
- }
- });
-
- }
- }
- } catch (Exception e) {
- Logs.error(e.toString());
- }
- }
-*/
-
-
@Scheduled(cron = "${scan.mysql.scheduled.plan}")
public void scanmysql() {
try {
-
-
List<JobEntity> joblist = hb.getJobLimitRows(ConfigUtil.job_thread);
Logs.debug("joblist size=" + joblist.size());
@@ -91,9 +53,7 @@ public class ScheduledTask {
jl.setExcute_detail("");
JobEntity jt = (JobEntity) jl.clone();
ConfigUtil.mapresult.put(queryid, jt);
-
hb.updateStatue(jl.getResult_id(), 1);
-
pool.execute(new Runnable() {
@Override
public void run() {
@@ -104,8 +64,6 @@ public class ScheduledTask {
}
});
ConfigUtil.job_thread--;
-
-
}
} catch (Exception e) {
@@ -118,8 +76,6 @@ public class ScheduledTask {
public void scanmysqlresult() {
try {
-
-
List<JobEntity> joblist = hb.getJobForExcute();
Logs.debug("joblist size=" + joblist.size());
@@ -130,23 +86,15 @@ public class ScheduledTask {
String killurl = ClickhouseConfig.getKillUrl(queryid);
hp.doPost(killurl);
updateErrorMessage(jk.getResult_id(), 0, "执行结果不存在重新执行");
-
break;
}
-
-
JobEntity je = (JobEntity) ConfigUtil.mapresult.get(queryid).clone();
- /* if (je == null) {
- break;
- }*/
- if (je.getStatus() == 2 ) {
+ if (je.getStatus() == 2) {
try {
je = getQueryLogMessage(je);
HttpResult h1 = saveToHbase(je);
if (h1.getCode() == 200) {
- //Thread.sleep(5000);
- // Logs.debug(entity.getJob_id() + entity.getStatue());
je.setExcute_detail("success");
je.setExcute_time(0);
@@ -202,7 +150,7 @@ public class ScheduledTask {
} else {
JobEntity jo = getProcessMessage(je);
- if(jo.getExcute_row()!=0 || jo.getExcute_time()!=0) {
+ if (jo.getExcute_row() != 0 || jo.getExcute_time() != 0) {
hb.updateProcesses(jo);
}
}
@@ -212,8 +160,6 @@ public class ScheduledTask {
} catch (Exception e) {
Logs.error(e.toString());
- // ConfigUtil.job_thread++;
-
}
}
@@ -245,13 +191,10 @@ public class ScheduledTask {
long read_rows = Long.parseLong(map.get("read_rows").toString());
int ftime = (int) (elapsed * total_rows_approx / read_rows);
ftime = Math.round((ftime - elapsed) * 10 * 1000) / 10;
- if(ftime<0){
- ftime=0;
+ if (ftime < 0) {
+ ftime = 0;
}
Logs.info("total_rows_approx" + total_rows_approx + "read_rows" + read_rows + "elapsed" + elapsed + "ftime" + ftime);
- // NumberFormat numberFormat = NumberFormat.getInstance();
- //numberFormat.setMaximumFractionDigits(0);
-
double persent = (read_rows * 1.00 / total_rows_approx);
int result = (int) (persent * 100);
job.setExcute_row(total_rows_approx);
@@ -265,9 +208,7 @@ public class ScheduledTask {
} catch (Exception e) {
e.printStackTrace();
}
-
return job;
-
}
public HttpResult saveToHbase(JobEntity entity) {
@@ -289,20 +230,12 @@ public class ScheduledTask {
}
}
while (k >= 0);
-
- //String h2 = hp.doGet("http://192.168.10.224:4444/tsg:report_result/" + queryid + "/response:result");
- // Map<String, Object> aaa = h2.queryData("tsg:report_result",query_id);
- // System.out.print(h2);
return h1;
}
public JobEntity getQueryLogMessage(JobEntity job) {
-
- // JobEntity job = (JobEntity) jb.clone();
- /* job.setResult_id(resultid);
- job.setQuery_id(queryid);*/
String finishurl = ClickhouseConfig.getFinishUrl(job.getQuery_id());
HttpResult num = new HttpResult();
int j = 0;
@@ -325,13 +258,13 @@ public class ScheduledTask {
break;
case 3:
job.setStatus(type);
- job.setExcute_detail("语法错误"+mapresult.get("exception").toString());
+ job.setExcute_detail("语法错误" + mapresult.get("exception").toString());
job.setQuery_sql(mapresult.get("query").toString());
job.setException(mapresult.get("exception").toString());
break;
case 4:
job.setStatus(type);
- job.setExcute_detail("性能问题"+mapresult.get("exception").toString());
+ job.setExcute_detail("性能问题" + mapresult.get("exception").toString());
job.setQuery_sql(mapresult.get("query").toString());
job.setException(mapresult.get("exception").toString());
break;
diff --git a/src/main/java/com/mesa/reportservice/service/DataBaseBusiness.java b/src/main/java/com/mesa/reportservice/service/DataBaseBusiness.java
index 3f8d729..befe0f2 100644
--- a/src/main/java/com/mesa/reportservice/service/DataBaseBusiness.java
+++ b/src/main/java/com/mesa/reportservice/service/DataBaseBusiness.java
@@ -42,9 +42,8 @@ public class DataBaseBusiness {
public List<JobEntity> getJobForExcute() {
-
String sql = " select * from report_result where status =1 order by end_time ";
- int resultid=0;
+ int resultid = 0;
List<JobEntity> sqllist = new ArrayList<JobEntity>();
try {
//connection = manager.getConnection("idb");
@@ -65,7 +64,7 @@ public class DataBaseBusiness {
} catch (Exception e) {
Logs.error(sql + "获取进度列表发生异常!");
- updateStatue(resultid,4);
+ updateStatue(resultid, 4);
e.printStackTrace();
throw new RuntimeException("获取进度列表发生异常!", e);
} finally {
@@ -86,11 +85,10 @@ public class DataBaseBusiness {
}
-
public List<JobEntity> getJobLimitRows(int row) {
String current_time = DateUtil.getDate();
- int resultid=0;
+ int resultid = 0;
String sql = " select * from report_result where status =0 and end_time<'" + current_time + "' order by end_time limit " + row;
List<JobEntity> sqllist = new ArrayList<JobEntity>();
@@ -114,7 +112,7 @@ public class DataBaseBusiness {
} catch (Exception e) {
Logs.error(sql + "获取任务列表发生异常!");
- updateStatue(resultid,4);
+ updateStatue(resultid, 4);
e.printStackTrace();
throw new RuntimeException("获取任务列表发生异常!", e);
@@ -136,8 +134,6 @@ public class DataBaseBusiness {
}
-
-
public int getJobStatueFromId(int id) {
int statue = 0;
@@ -245,5 +241,4 @@ public class DataBaseBusiness {
}
-
}
diff --git a/src/main/java/com/mesa/reportservice/service/HBaseService.java b/src/main/java/com/mesa/reportservice/service/HBaseService.java
deleted file mode 100644
index a5ee1dd..0000000
--- a/src/main/java/com/mesa/reportservice/service/HBaseService.java
+++ /dev/null
@@ -1,631 +0,0 @@
-package com.mesa.reportservice.service;
-
-/**
- * HBase数据库基本操作
- */
-
-//@Service
-public class HBaseService {
-/*
-
-
-
- private Logger log = LoggerFactory.getLogger(HBaseService.class);
-
- private Admin admin = null;
- private Connection connection = null;
-
- public HBaseService() {
- try {
- connection = ConnectionFactory.createConnection(HttpClientPool.HbasePool.getConfiguration());
- admin = connection.getAdmin();
- } catch (IOException e) {
- log.error("获取HBase连接失败");
- }
- }
-
-
-
-
- *//**
- * @Description 删除表
- * @param tableName
- * @throws Exception
- *//*
- public static void dropTable(String tableName) throws Exception {
-
- Connection con = null;
- Admin admin = null;
- try {
- // ============获取连接管理=============
- con = HttpClientPool.HbasePool.getConnection();
- admin = con.getAdmin();
- // ============查询表是否存在=============
- if (!admin.tableExists(TableName.valueOf(tableName))) {
- throw new Exception("需要删除的表[" + tableName + "]-不存在!");
- }
-
- // ============删除表=============
- if (!admin.isTableDisabled(TableName.valueOf(tableName))) {
- admin.disableTable(TableName.valueOf(tableName));
- }
- admin.deleteTable(TableName.valueOf(tableName));
- } finally {
- if (admin != null) {
- try {
- admin.close();
- } catch (IOException e) {
- //log.error("Hbase - Admin资源释放失败!", e);
- }
- }
- if (con != null) {
- HttpClientPool.HbasePool.returnConnect(con);
- }
- }
-
- // logger.info("=====删除表,表名:" + tableName);
- }
-
- *//**
- * @Description 插入数据 - 单条模式
- * @param tableName String
- * @param rowKey List<HbaseData> hbaseDataList
- * @throws Exception
- *//*
- public void insertData(String tableName, String rowKey, List<HbaseData> hbaseDataList) throws Exception {
- Connection con = null;
- Table table = null;
- try {
- // ============获取连接管理==================
- con = HttpClientPool.HbasePool.getConnection();
- table = con.getTable(TableName.valueOf(tableName));
-
- // ============进行数据解析,封装PUT============
- Put put = new Put(Bytes.toBytes(rowKey));
- for (HbaseData hbaseData : hbaseDataList) {
- String columnFamily = hbaseData.getColumnFamily();
- for (Map.Entry<String, String> entry : hbaseData.getColumnValueMap().entrySet()) {
- put.addColumn(Bytes.toBytes(columnFamily), Bytes.toBytes(entry.getKey()),
- Bytes.toBytes(entry.getValue()));
- }
- }
- table.put(put);
- } finally {
- if (table != null) {
- table.close();
- }
- if (con != null) {
- HttpClientPool.HbasePool.returnConnect(con);
- }
- }
- //logger.info("=====插入数据,表名:[" + tableName + "],rowKey:[" + rowKey + "],List<HbaseData>:" + hbaseDataList);
- }
-
-
-
-
- *//**
- * @Description 插入数据 - 批量插入模式
- * @param tableName String
- * @param hbaseDTOList String rowKey List<HbaseData> hbaseDataList
- * @throws Exception
- *//*
- public void insertBatchData(String tableName, List<HbaseDataDTO> hbaseDTOList) throws Exception {
- Connection con = null;
- Table table = null;
- try {
- // ============获取连接管理==================
- con = HttpClientPool.HbasePool.getConnection();
- table = con.getTable(TableName.valueOf(tableName));
-
- // ============进行数据解析,封装PUT============
- List<Put> putList = new ArrayList<>();
- for (int index = 0; index < hbaseDTOList.size(); index++) {
- HbaseDataDTO hbaseDTO = hbaseDTOList.get(index);
- Put put = new Put(Bytes.toBytes(hbaseDTO.getRowKey()));
- for (HbaseData hbaseData : hbaseDTO.getHbaseDataList()) {
- String columnFamily = hbaseData.getColumnFamily();
- for (Map.Entry<String, String> entry : hbaseData.getColumnValueMap().entrySet()) {
- put.addColumn(Bytes.toBytes(columnFamily), Bytes.toBytes(entry.getKey()),
- Bytes.toBytes(entry.getValue()));
- }
- }
- putList.add(put);
- // ===============设置成1000次,提交一次==============
- if (index % 1000 == 0) {
- table.put(putList);
- putList.clear();
- }
- }
- table.put(putList);
- // logger.info("=====插入数据,表名:[" + tableName + "],批量数据插入成功!");
- } finally {
- if (table != null) {
- table.close();
- }
- if (con != null) {
- HttpClientPool.HbasePool.returnConnect(con);
- }
- }
- }
-
- public static void deleteDataByRowKey(String tableName, String rowKey) throws Exception {
- Connection con = null;
- Table table = null;
- try {
- // ============获取连接管理==================
- con = HttpClientPool.HbasePool.getConnection();
- table = con.getTable(TableName.valueOf(tableName));
-
- // 按rowkey删除
- Delete delete = new Delete(Bytes.toBytes(rowKey));
- table.delete(delete);
- } finally {
- if (table != null) {
- table.close();
- }
- if (con != null) {
- HttpClientPool.HbasePool.returnConnect(con);
- }
- }
-
- // logger.info("=====删除数据,表名:[" + tableName + "],rowKey:[" + rowKey + "],删除成功!");
- }
-
- public static void deleteDataByFamily(String tableName, String rowKey, String family) throws Exception {
- Connection con = null;
- Table table = null;
- try {
- // ============获取连接管理==================
- con = HttpClientPool.HbasePool.getConnection();
- table = con.getTable(TableName.valueOf(tableName));
-
- // 按family删除
- Delete delete = new Delete(Bytes.toBytes(rowKey));
- delete.addFamily(Bytes.toBytes(family));
- table.delete(delete);
- } finally {
- if (table != null) {
- table.close();
- }
- if (con != null) {
- HttpClientPool.HbasePool.returnConnect(con);
- }
- }
-
- // logger.info("=====删除数据,表名:[" + tableName + "],rowKey:[" + rowKey + "],family:[" + family + "],删除成功!");
- }
-
- public static void deleteDataByColumn(String tableName, String rowKey, String family, String column)
- throws Exception {
-
- Connection con = null;
- Table table = null;
- try {
- // ============获取连接管理==================
- con = HttpClientPool.HbasePool.getConnection();
- table = con.getTable(TableName.valueOf(tableName));
-
- // 按column删除
- Delete delete = new Delete(Bytes.toBytes(rowKey));
- delete.addColumns(Bytes.toBytes(family), Bytes.toBytes(column));
- table.delete(delete);
- } finally {
- if (table != null) {
- table.close();
- }
- if (con != null) {
- HttpClientPool.HbasePool.returnConnect(con);
- }
- }
- // logger.info("=====删除数据,表名:[" + tableName + "],rowKey:[" + rowKey + "],family:[" + family + "],column:[" + column
- // + "],删除成功!");
- }
-
- *//**
- * @Description 查询数据(最新一条)
- * @param tableName String 表名
- * @param rowKey String 行键
- * @return
- * @throws Exception
- *//*
- public Map<String, Object> queryData(String tableName, String rowKey) throws Exception {
- Connection con = null;
- Table table = null;
- try {
- // ============获取连接管理==================
- con = HttpClientPool.HbasePool.getConnection();
- table = con.getTable(TableName.valueOf(tableName));
-
- // ============进行GET操作==================
- Get get = new Get(Bytes.toBytes(rowKey));
- Result result = table.get(get);
-
- Map<String, Object> map = analysisQueryResult(result);
-
- // ============封装返回结果===============
- Map<String, Object> resultMap = new HashMap<>();
- resultMap.put("result", map.values());
- resultMap.put("totalCount", map.values().size());
- return resultMap;
- } finally {
- if (table != null) {
- table.close();
- }
- if (con != null) {
- HttpClientPool.HbasePool.returnConnect(con);
- }
- }
- }
-
- *//**
- * @Description 根据条件查询数据
- * @param tableName String 表名
- * @param rowKey String 行键
- * @param version int 版本数量(查多少条数据)
- * @param paramMap key-String-family(列族) value-String[]-columns(列名)
- * @return
- * @throws Exception
- *//*
- public static Map<String, Object> queryDataByParams(String tableName, String rowKey, int version,
- Map<String, String[]> paramMap) throws Exception {
- Connection con = null;
- Table table = null;
- try {
- // ============获取连接管理==================
- con = HttpClientPool.HbasePool.getConnection();
- table = con.getTable(TableName.valueOf(tableName));
-
- // ============进行GET操作==================
- Get get = new Get(Bytes.toBytes(rowKey));
- // get.getMaxResultsPerColumnFamily(version);
-
- // ============组装条件==================
- assembleCondition(paramMap, get);
- Result result = table.get(get);
- Map<String, Object> map = analysisQueryResult(result);
-
- // ============封装返回结果===============
- Map<String, Object> resultMap = new HashMap<>();
- resultMap.put("result", map.values());
- resultMap.put("totalCount", map.values().size());
- return resultMap;
- } finally {
- if (table != null) {
- table.close();
- }
- if (con != null) {
- HttpClientPool.HbasePool.returnConnect(con);
- }
- }
- }
-
- *//**
- * @Description 根据条件查询数据(根据时间进行限定)
- * @param tableName String 表名
- * @param rowKey String 行键
- * @param version int 版本数量(查多少条数据)
- * @param startTime long 开始时间戳 如果仅传开始时间戳,结束时间戳传0,则查开始时间戳对应的版本
- * @param endTime long 结束时间戳
- * @param paramMap key-String-family(列族) value-String[]-columns(列名)
- * @return
- * @throws Exception
- *//*
- public static Map<String, Object> queryDataByParamsAndTime(String tableName, String rowKey, int version,
- long startTime, long endTime, Map<String, String[]> paramMap) throws Exception {
- Connection con = null;
- Table table = null;
- try {
- // ============获取连接管理==================
- con = HttpClientPool.HbasePool.getConnection();
- table = con.getTable(TableName.valueOf(tableName));
-
- // ============进行GET操作==================
- Get get = new Get(Bytes.toBytes(rowKey));
- get.getMaxResultsPerColumnFamily();
- if (endTime != 0) {
- get.setTimeRange(startTime, endTime);
- } else if (startTime != 0) {
- get.setTimeStamp(startTime);
- }
- // ============组装条件==================
- assembleCondition(paramMap, get);
- Result result = table.get(get);
- Map<String, Object> map = analysisQueryResult(result);
-
- // ============封装返回结果===============
- Map<String, Object> resultMap = new HashMap<>();
- resultMap.put("result", map.values());
- resultMap.put("totalCount", map.values().size());
- return resultMap;
- } finally {
- if (table != null) {
- table.close();
- }
- if (con != null) {
- HttpClientPool.HbasePool.returnConnect(con);
- }
- }
- }
-
- *//**
- * @Description 装载条件
- * @param paramMap
- * @param get
- *//*
- private static void assembleCondition(Map<String, String[]> paramMap, Get get) {
- if (paramMap != null && !paramMap.isEmpty()) {
- for (Map.Entry<String, String[]> entry : paramMap.entrySet()) {
- if (entry.getValue() == null) {
- get.addFamily(Bytes.toBytes(entry.getKey()));
- } else {
- for (String column : entry.getValue()) {
- get.addColumn(Bytes.toBytes(entry.getKey()), Bytes.toBytes(column));
- }
- }
- }
- }
- }
-
- *//**
- * @Description 解析Result
- * @param result
- * @return
- *//*
- @SuppressWarnings("unchecked")
- private static Map<String, Object> analysisQueryResult(Result result) {
- Map<String, Object> map = new HashMap<>();
- for (Cell cell : result.listCells()) {
- Long timeStamp = cell.getTimestamp();
- String rowKey = Bytes.toString(cell.getRowArray(), cell.getRowOffset(), cell.getRowLength());
- String familyRs = Bytes.toString(cell.getFamilyArray(), cell.getFamilyOffset(), cell.getFamilyLength());
- String keyRs =
- Bytes.toString(cell.getQualifierArray(), cell.getQualifierOffset(), cell.getQualifierLength());
- String valueRs = Bytes.toString(cell.getValueArray(), cell.getValueOffset(), cell.getValueLength());
-
- // =========参数封装=========
- Map<String, Object> map2 = (Map<String, Object>) map.get(timeStamp.toString());
- if (map2 == null) {
- map2 = new HashMap<>();
- }
- Map<String, Object> map3 = (Map<String, Object>) map2.get(familyRs);
- if (map3 == null) {
- map3 = new HashMap<>();
- }
- map3.put(keyRs, valueRs);
- map2.put(familyRs, map3);
- map2.put("rowKey", rowKey);
- map2.put("timeStamp", timeStamp.toString());
- map.put(timeStamp.toString(), map2);
- }
- return map;
- }
-
-
-
-
- @SuppressWarnings("unchecked")
- private static Map<String, Object> analysisScanResult(Result result) {
- Map<String, Object> map = new HashMap<>();
- for (Cell cell : result.listCells()) {
- Long timeStamp = cell.getTimestamp();
- String rowKey = Bytes.toString(cell.getRowArray(), cell.getRowOffset(), cell.getRowLength());
- String familyRs = Bytes.toString(cell.getFamilyArray(), cell.getFamilyOffset(), cell.getFamilyLength());
- String keyRs =
- Bytes.toString(cell.getQualifierArray(), cell.getQualifierOffset(), cell.getQualifierLength());
- String valueRs = Bytes.toString(cell.getValueArray(), cell.getValueOffset(), cell.getValueLength());
-
- // =========参数封装=========
- Map<String, Object> map2 = (Map<String, Object>) map.get(familyRs);
- if (map2 == null) {
- map2 = new HashMap<>();
- }
- map2.put(keyRs, valueRs);
- map.put(familyRs, map2);
- map.put("rowKey", rowKey);
- map.put("timeStamp", timeStamp.toString());
- }
- return map;
- }
-
-
-
-
-
-
- *//**
- * 查询库中所有表的表名
- * shell command: list
- *//*
- public List<String> getAllTableNames() {
- List<String> result = new ArrayList<>();
- try {
- TableName[] tableNames = admin.listTableNames();
- for (TableName tableName : tableNames) {
- result.add(tableName.getNameAsString());
- }
- } catch (IOException e) {
- log.error("获取所有表的表名失败", e);
- } finally {
- close(admin, null, null);
- }
- return result;
- }
-
- *//**
- * 遍历查询指定表中的所有数据
- * shell command: scan 'user'
- *//*
- public Map<String, Map<String, String>> getResultScanner(String tableName) {
- Scan scan = new Scan();
- return this.queryData(tableName, scan);
- }
-
- *//**
- * 通过表名以及过滤条件查询数据
- *//*
- private Map<String, Map<String, String>> queryData(String tableName,
- Scan scan) {
- // <rowKey,对应的行数据>
- Map<String, Map<String, String>> result = new HashMap<>();
-
- ResultScanner rs = null;
- // 获取表
- Table table = null;
- try {
- table = getTable(tableName);
- rs = table.getScanner(scan);
- for (Result r : rs) {
- // 每一行数据
- Map<String, String> columnMap = new HashMap<>();
- String rowKey = null;
- // 行键,列族和列限定符一起确定一个单元(Cell)
- for (Cell cell : r.listCells()) {
- if (rowKey == null) {
- rowKey = Bytes.toString(cell.getRowArray(),
- cell.getRowOffset(), cell.getRowLength());
- }
- columnMap.put(
- // 列限定符
- Bytes.toString(cell.getQualifierArray(),
- cell.getQualifierOffset(),
- cell.getQualifierLength()),
- // 列族
- Bytes.toString(cell.getValueArray(),
- cell.getValueOffset(),
- cell.getValueLength()));
- }
-
- if (rowKey != null) {
- result.put(rowKey, columnMap);
- }
- }
- } catch (IOException e) {
- log.error(MessageFormat.format("遍历查询指定表中的所有数据失败,tableName:{0}",
- tableName), e);
- } finally {
- close(null, rs, table);
- }
-
- return result;
- }
-
- *//**
- * 根据tableName和rowKey精确查询行数据
- *//*
- public Map<String, String> getRowData(String tableName, String rowKey) {
- // 返回的键值对
- Map<String, String> result = new HashMap<>();
-
- Get get = new Get(Bytes.toBytes(rowKey));
- // 获取表
- Table table = null;
- try {
- table = getTable(tableName);
- Result hTableResult = table.get(get);
- if (hTableResult != null && !hTableResult.isEmpty()) {
- for (Cell cell : hTableResult.listCells()) {
- result.put(
- Bytes.toString(cell.getQualifierArray(),
- cell.getQualifierOffset(),
- cell.getQualifierLength()),
- Bytes.toString(cell.getValueArray(),
- cell.getValueOffset(),
- cell.getValueLength()));
- }
- }
- } catch (IOException e) {
- log.error(MessageFormat.format(
- "查询一行的数据失败,tableName:{0},rowKey:{1}", tableName, rowKey), e);
- } finally {
- close(null, null, table);
- }
-
- return result;
- }
-
- *//**
- * 为表添加 or 更新数据
- *//*
- public void putData(String tableName, String rowKey, String familyName,
- String[] columns, String[] values) {
- // 获取表
- Table table = null;
- try {
- table = getTable(tableName);
-
- putData(table, rowKey, tableName, familyName, columns, values);
- } catch (Exception e) {
- log.error(MessageFormat.format(
- "为表添加 or 更新数据失败,tableName:{0},rowKey:{1},familyName:{2}",
- tableName, rowKey, familyName), e);
- } finally {
- close(null, null, table);
- }
- }
-
- private void putData(Table table, String rowKey, String tableName,
- String familyName, String[] columns, String[] values) {
- try {
- // 设置rowkey
- Put put = new Put(Bytes.toBytes(rowKey));
-
- if (columns != null && values != null
- && columns.length == values.length) {
- for (int i = 0; i < columns.length; i++) {
- if (columns[i] != null && values[i] != null) {
- put.addColumn(Bytes.toBytes(familyName),
- Bytes.toBytes(columns[i]),
- Bytes.toBytes(values[i]));
- } else {
- throw new NullPointerException(MessageFormat.format(
- "列名和列数据都不能为空,column:{0},value:{1}", columns[i],
- values[i]));
- }
- }
- }
-
- table.put(put);
- log.debug("putData add or update data Success,rowKey:" + rowKey);
- table.close();
- } catch (Exception e) {
- log.error(MessageFormat.format(
- "为表添加 or 更新数据失败,tableName:{0},rowKey:{1},familyName:{2}",
- tableName, rowKey, familyName), e);
- }
- }
-
- *//**
- * 根据表名 获取table
- * Used to communicate with a single HBase table.
- * Table can be used to get, put, delete or scan data from a table.
- *//*
- private Table getTable(String tableName) throws IOException {
- return connection.getTable(TableName.valueOf(tableName));
- }
-
- *//**
- * 关闭流
- *//*
- private void close(Admin admin, ResultScanner rs, Table table) {
- if (admin != null) {
- try {
- admin.close();
- } catch (IOException e) {
- log.error("关闭Admin失败", e);
- }
- }
-
- if (rs != null) {
- rs.close();
- }
-
- if (table != null) {
- try {
- table.close();
- } catch (IOException e) {
- log.error("关闭Table失败", e);
- }
- }
- }*/
-}
diff --git a/src/main/java/com/mesa/reportservice/service/HttpAPIService.java b/src/main/java/com/mesa/reportservice/service/HttpAPIService.java
index 3c46b69..f9dd236 100644
--- a/src/main/java/com/mesa/reportservice/service/HttpAPIService.java
+++ b/src/main/java/com/mesa/reportservice/service/HttpAPIService.java
@@ -38,7 +38,7 @@ public class HttpAPIService {
public String doGet(String url) throws Exception {
// 声明 http get 请求
HttpGet httpGet = new HttpGet(url);
- httpGet.setHeader("Accept","application/json");
+ httpGet.setHeader("Accept", "application/json");
// 装载配置信息
httpGet.setConfig(config);
@@ -91,7 +91,7 @@ public class HttpAPIService {
HttpPost httpPost = new HttpPost(url);
// 加入配置信息
httpPost.setConfig(config);
- // httpPost.setHeader("Accept","application/json");
+ // httpPost.setHeader("Accept","application/json");
// 判断map是否为空,不为空则进行遍历,封装from表单对象
if (!json.equals("") && !json.isEmpty()) {
diff --git a/src/main/java/com/mesa/reportservice/util/ConfigUtil.java b/src/main/java/com/mesa/reportservice/util/ConfigUtil.java
index 4ab3822..7fe2daf 100644
--- a/src/main/java/com/mesa/reportservice/util/ConfigUtil.java
+++ b/src/main/java/com/mesa/reportservice/util/ConfigUtil.java
@@ -6,8 +6,6 @@ import org.springframework.stereotype.Component;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
/**
* Created by wk1 on 2019/5/28.
@@ -18,8 +16,8 @@ import java.util.concurrent.Executors;
public class ConfigUtil {
- // public static ExecutorService pool = Executors.newFixedThreadPool(30);
- // public static int query_log_sleep;
+ // public static ExecutorService pool = Executors.newFixedThreadPool(30);
+ // public static int query_log_sleep;
//public static int loop_number;
public static int job_thread;
public static int ck_timeout;
@@ -31,7 +29,6 @@ public class ConfigUtil {
}
-
public void setJob_thread(int job_thread) {
ConfigUtil.job_thread = job_thread;
}
diff --git a/src/main/java/com/mesa/reportservice/util/HbaseOpt.java b/src/main/java/com/mesa/reportservice/util/HbaseOpt.java
deleted file mode 100644
index 21ecea9..0000000
--- a/src/main/java/com/mesa/reportservice/util/HbaseOpt.java
+++ /dev/null
@@ -1,377 +0,0 @@
-package com.mesa.reportservice.util;
-
-/**
- * @author LBQ
- * @version 1.0.0
- * @ClassName HbaseOpt
- * @Description Hbase2.0.0 操作类
- * @Date 2018年6月4日 下午9:42:32
- */
-public class HbaseOpt {
-
- /* private static Logger logger = Logger.getLogger(HbaseOpt.class);
-
- *//**
- * @Description 创建表
- * @param tableName 表名
- * @param version 版本数量
- * @param columnFamilys 列族
- * @throws Exception
- *//*
-
- *//**
- * @Description 删除表
- * @param tableName
- * @throws Exception
- *//*
- public static void dropTable(String tableName) throws Exception {
-
- Connection con = null;
- Admin admin = null;
- try {
- // ============获取连接管理=============
- con = HbasePool.getConnection();
- admin = con.getAdmin();
- // ============查询表是否存在=============
- if (!admin.tableExists(TableName.valueOf(tableName))) {
- throw new Exception("需要删除的表[" + tableName + "]-不存在!");
- }
-
- // ============删除表=============
- if (!admin.isTableDisabled(TableName.valueOf(tableName))) {
- admin.disableTable(TableName.valueOf(tableName));
- }
- admin.deleteTable(TableName.valueOf(tableName));
- } finally {
- if (admin != null) {
- try {
- admin.close();
- } catch (IOException e) {
- logger.error("Hbase - Admin资源释放失败!", e);
- }
- }
- if (con != null) {
- HbasePool.returnConnect(con);
- }
- }
-
- logger.info("=====删除表,表名:" + tableName);
- }
-
- *//**
- * @Description 插入数据 - 单条模式
- * @param tableName String
- * @throws Exception
- *//*
- public static void insertData(String tableName, String rowKey, List<HbaseData> hbaseDataList) throws Exception {
- Connection con = null;
- Table table = null;
- try {
- // ============获取连接管理==================
- con = HbasePool.getConnection();
- table = con.getTable(TableName.valueOf(tableName));
-
- // ============进行数据解析,封装PUT============
- Put put = new Put(Bytes.toBytes(rowKey));
- for (HbaseData hbaseData : hbaseDataList) {
- String columnFamily = hbaseData.getColumnFamily();
- for (Entry<String, String> entry : hbaseData.getColumnValueMap().entrySet()) {
- put.addColumn(Bytes.toBytes(columnFamily), Bytes.toBytes(entry.getKey()),
- Bytes.toBytes(entry.getValue()));
- }
- }
- table.put(put);
- } finally {
- if (table != null) {
- table.close();
- }
- if (con != null) {
- HbasePool.returnConnect(con);
- }
- }
- logger.info("=====插入数据,表名:[" + tableName + "],rowKey:[" + rowKey + "],List<HbaseData>:" + hbaseDataList);
- }
-
- *//**
- * @Description 插入数据 - 批量插入模式
- * @param tableName String
- * @param hbaseDTOList String rowKey List<HbaseData> hbaseDataList
- * @throws Exception
- *//*
- public static void insertBatchData(String tableName, List<HbaseDataDTO> hbaseDTOList) throws Exception {
- Connection con = null;
- Table table = null;
- try {
- // ============获取连接管理==================
- con = HbasePool.getConnection();
- table = con.getTable(TableName.valueOf(tableName));
-
- // ============进行数据解析,封装PUT============
- List<Put> putList = new ArrayList<>();
- for (int index = 0; index < hbaseDTOList.size(); index++) {
- HbaseDataDTO hbaseDTO = hbaseDTOList.get(index);
- Put put = new Put(Bytes.toBytes(hbaseDTO.getRowKey()));
- for (HbaseData hbaseData : hbaseDTO.getHbaseDataList()) {
- String columnFamily = hbaseData.getColumnFamily();
- for (Entry<String, String> entry : hbaseData.getColumnValueMap().entrySet()) {
- put.addColumn(Bytes.toBytes(columnFamily), Bytes.toBytes(entry.getKey()),
- Bytes.toBytes(entry.getValue()));
- }
- }
- putList.add(put);
- // ===============设置成1000次,提交一次==============
- if (index % 1000 == 0) {
- table.put(putList);
- putList.clear();
- }
- }
- table.put(putList);
- logger.info("=====插入数据,表名:[" + tableName + "],批量数据插入成功!");
- } finally {
- if (table != null) {
- table.close();
- }
- if (con != null) {
- HbasePool.returnConnect(con);
- }
- }
- }
-
- public static void deleteDataByRowKey(String tableName, String rowKey) throws Exception {
- Connection con = null;
- Table table = null;
- try {
- // ============获取连接管理==================
- con = HbasePool.getConnection();
- table = con.getTable(TableName.valueOf(tableName));
-
- // 按rowkey删除
- Delete delete = new Delete(Bytes.toBytes(rowKey));
- table.delete(delete);
- } finally {
- if (table != null) {
- table.close();
- }
- if (con != null) {
- HbasePool.returnConnect(con);
- }
- }
-
- logger.info("=====删除数据,表名:[" + tableName + "],rowKey:[" + rowKey + "],删除成功!");
- }
-
- public static void deleteDataByFamily(String tableName, String rowKey, String family) throws Exception {
- Connection con = null;
- Table table = null;
- try {
- // ============获取连接管理==================
- con = HbasePool.getConnection();
- table = con.getTable(TableName.valueOf(tableName));
-
- // 按family删除
- Delete delete = new Delete(Bytes.toBytes(rowKey));
- delete.addFamily(Bytes.toBytes(family));
- table.delete(delete);
- } finally {
- if (table != null) {
- table.close();
- }
- if (con != null) {
- HbasePool.returnConnect(con);
- }
- }
-
- logger.info("=====删除数据,表名:[" + tableName + "],rowKey:[" + rowKey + "],family:[" + family + "],删除成功!");
- }
-
- public static void deleteDataByColumn(String tableName, String rowKey, String family, String column)
- throws Exception {
-
- Connection con = null;
- Table table = null;
- try {
- // ============获取连接管理==================
- con = HbasePool.getConnection();
- table = con.getTable(TableName.valueOf(tableName));
-
- // 按column删除
- Delete delete = new Delete(Bytes.toBytes(rowKey));
- delete.addColumns(Bytes.toBytes(family), Bytes.toBytes(column));
- table.delete(delete);
- } finally {
- if (table != null) {
- table.close();
- }
- if (con != null) {
- HbasePool.returnConnect(con);
- }
- }
- logger.info("=====删除数据,表名:[" + tableName + "],rowKey:[" + rowKey + "],family:[" + family + "],column:[" + column
- + "],删除成功!");
- }
-
- *//**
- * @Description 查询数据(最新一条)
- * @param tableName String 表名
- * @param rowKey String 行键
- * @return
- * @throws Exception
- *//*
- public static Map<String, Object> queryData(String tableName, String rowKey) throws Exception {
- Connection con = null;
- Table table = null;
- try {
- // ============获取连接管理==================
- con = HbasePool.getConnection();
- table = con.getTable(TableName.valueOf(tableName));
-
- // ============进行GET操作==================
- Get get = new Get(Bytes.toBytes(rowKey));
- Result result = table.get(get);
-
- Map<String, Object> map = analysisQueryResult(result);
-
- // ============封装返回结果===============
- Map<String, Object> resultMap = new HashMap<>();
- resultMap.put("result", map.values());
- resultMap.put("totalCount", map.values().size());
- return resultMap;
- } finally {
- if (table != null) {
- table.close();
- }
- if (con != null) {
- HbasePool.returnConnect(con);
- }
- }
- }
-
-
- *//**
- * @Description 根据条件查询数据(根据时间进行限定)
- * @param tableName String 表名
- * @param rowKey String 行键
- * @param version int 版本数量(查多少条数据)
- * @param startTime long 开始时间戳 如果仅传开始时间戳,结束时间戳传0,则查开始时间戳对应的版本
- * @param endTime long 结束时间戳
- * @param paramMap key-String-family(列族) value-String[]-columns(列名)
- * @return
- * @throws Exception
- *//*
- public static Map<String, Object> queryDataByParamsAndTime(String tableName, String rowKey, int version,
- long startTime, long endTime, Map<String, String[]> paramMap) throws Exception {
- Connection con = null;
- Table table = null;
- try {
- // ============获取连接管理==================
- con = HbasePool.getConnection();
- table = con.getTable(TableName.valueOf(tableName));
-
- // ============进行GET操作==================
- Get get = new Get(Bytes.toBytes(rowKey));
- // get.readVersions(version);
- if (endTime != 0) {
- get.setTimeRange(startTime, endTime);
- } else if (startTime != 0) {
- get.setTimeStamp(startTime);
- }
- // ============组装条件==================
- assembleCondition(paramMap, get);
- Result result = table.get(get);
- Map<String, Object> map = analysisQueryResult(result);
-
- // ============封装返回结果===============
- Map<String, Object> resultMap = new HashMap<>();
- resultMap.put("result", map.values());
- resultMap.put("totalCount", map.values().size());
- return resultMap;
- } finally {
- if (table != null) {
- table.close();
- }
- if (con != null) {
- HbasePool.returnConnect(con);
- }
- }
- }
-
- *//**
- * @Description 装载条件
- * @param paramMap
- * @param get
- *//*
- private static void assembleCondition(Map<String, String[]> paramMap, Get get) {
- if (paramMap != null && !paramMap.isEmpty()) {
- for (Entry<String, String[]> entry : paramMap.entrySet()) {
- if (entry.getValue() == null) {
- get.addFamily(Bytes.toBytes(entry.getKey()));
- } else {
- for (String column : entry.getValue()) {
- get.addColumn(Bytes.toBytes(entry.getKey()), Bytes.toBytes(column));
- }
- }
- }
- }
- }
-
- *//**
- * @Description 解析Result
- * @param result
- * @return
- *//*
- @SuppressWarnings("unchecked")
- private static Map<String, Object> analysisQueryResult(Result result) {
- Map<String, Object> map = new HashMap<>();
- for (Cell cell : result.listCells()) {
- Long timeStamp = cell.getTimestamp();
- String rowKey = Bytes.toString(cell.getRowArray(), cell.getRowOffset(), cell.getRowLength());
- String familyRs = Bytes.toString(cell.getFamilyArray(), cell.getFamilyOffset(), cell.getFamilyLength());
- String keyRs =
- Bytes.toString(cell.getQualifierArray(), cell.getQualifierOffset(), cell.getQualifierLength());
- String valueRs = Bytes.toString(cell.getValueArray(), cell.getValueOffset(), cell.getValueLength());
-
- // =========参数封装=========
- Map<String, Object> map2 = (Map<String, Object>) map.get(timeStamp.toString());
- if (map2 == null) {
- map2 = new HashMap<>();
- }
- Map<String, Object> map3 = (Map<String, Object>) map2.get(familyRs);
- if (map3 == null) {
- map3 = new HashMap<>();
- }
- map3.put(keyRs, valueRs);
- map2.put(familyRs, map3);
- map2.put("rowKey", rowKey);
- map2.put("timeStamp", timeStamp.toString());
- map.put(timeStamp.toString(), map2);
- }
- return map;
- }
-
-
-
-
-
- private static Map<String, Object> analysisScanResult(Result result) {
- Map<String, Object> map = new HashMap<>();
- for (Cell cell : result.listCells()) {
- Long timeStamp = cell.getTimestamp();
- String rowKey = Bytes.toString(cell.getRowArray(), cell.getRowOffset(), cell.getRowLength());
- String familyRs = Bytes.toString(cell.getFamilyArray(), cell.getFamilyOffset(), cell.getFamilyLength());
- String keyRs =
- Bytes.toString(cell.getQualifierArray(), cell.getQualifierOffset(), cell.getQualifierLength());
- String valueRs = Bytes.toString(cell.getValueArray(), cell.getValueOffset(), cell.getValueLength());
-
- // =========参数封装=========
- Map<String, Object> map2 = (Map<String, Object>) map.get(familyRs);
- if (map2 == null) {
- map2 = new HashMap<>();
- }
- map2.put(keyRs, valueRs);
- map.put(familyRs, map2);
- map.put("rowKey", rowKey);
- map.put("timeStamp", timeStamp.toString());
- }
- return map;
- }*/
-}
diff --git a/src/main/java/com/mesa/reportservice/util/HbaseUtil.java b/src/main/java/com/mesa/reportservice/util/HbaseUtil.java
index 71259fd..073befd 100644
--- a/src/main/java/com/mesa/reportservice/util/HbaseUtil.java
+++ b/src/main/java/com/mesa/reportservice/util/HbaseUtil.java
@@ -74,11 +74,11 @@ public class HbaseUtil {
}
- public void setColume_exception(String colume_exception) {
+ public void setColume_exception(String colume_exception) {
HbaseUtil.colume_exception = colume_exception;
}
- public void setColume_sql(String colume_sql) {
+ public void setColume_sql(String colume_sql) {
HbaseUtil.colume_sql = colume_sql;
}
}
diff --git a/src/main/java/com/mesa/reportservice/util/ScheduleConfig.java b/src/main/java/com/mesa/reportservice/util/ScheduleConfig.java
deleted file mode 100644
index 1e17f90..0000000
--- a/src/main/java/com/mesa/reportservice/util/ScheduleConfig.java
+++ /dev/null
@@ -1,21 +0,0 @@
-package com.mesa.reportservice.util;
-
-import org.springframework.context.annotation.Configuration;
-import org.springframework.scheduling.annotation.SchedulingConfigurer;
-import org.springframework.scheduling.config.ScheduledTaskRegistrar;
-
-import java.util.concurrent.Executors;
-
-/**
- * Created by wk1 on 2019/6/21.
- */
-
-//@Configuration
-//所有的定时任务都放在一个线程池中,定时任务启动时使用不同都线程。
-public class ScheduleConfig {
-/* @Override
- public void configureTasks(ScheduledTaskRegistrar taskRegistrar) {
- //设定一个长度10的定时任务线程池
- taskRegistrar.setScheduler(Executors.newScheduledThreadPool(10));
- }*/
-}
diff --git a/src/main/resources/application.properties b/src/main/resources/application.properties
deleted file mode 100644
index f43d9cc..0000000
--- a/src/main/resources/application.properties
+++ /dev/null
@@ -1,57 +0,0 @@
-server.port=9090
-scan.mysql.scheduled.plan=0/30 * * * * ?
-
-#ÿ��ʮ���ȡһ�β�ѯ״̬��ѭ��350�Σ�3500��֮��ʱ
-globle.query_log_sleep=10000
-globle.loop_number=350
-#ͬʱ��ִ�����߳���
-globle.job_thread=3
-#���û�г���7200��ͼ������û�������������Σ�����kill
-globle.ck_timeout=7200000
-hbase.url=192.168.10.202:4444
-hbase.table=tsg:report_result
-hbase.columefamily=response:result
-hbase.colume_job_id=detail:result_id
-hbase.colume_query_id=detail:query_id
-hbase.colume_query_duration_ms=detail:query_duration_ms
-hbase.colume_read_rows=detail:read_rows
-hbase.colume_memory_usage=detail:memory_usage
-ck.task_ip=192.168.10.182:8123
-ck.task_database=tsg_galaxy
-#ck.task_database=k18_galaxy_service
-ck.task_user=ck
-ck.task_user_password=111111
-ck.log_ip=192.168.10.182:8123
-ck.log_user=default
-ck.log_user_password=111111
-#���������
-http.maxTotal=300
-#������
-http.defaultMaxPerRoute=50
-#�������ӵ��ʱ��
-http.connectTimeout=10000
-#�����ӳ��л�ȡ�����ӵ��ʱ��
-http.connectionRequestTimeout=10000
-#���ݴ�����ʱ��
-http.socketTimeout=3600000
-#�ύ����ǰ���������Ƿ����
-http.staleConnectionCheckEnabled=true
-db.url=jdbc\:mariadb\://192.168.10.120\:3306/tsg-bifang
-#db.url=jdbc\:mariadb\://192.168.11.210\:3306/tsg
-#����
-#drivers=ru.yandex.clickhouse.ClickHouseDriver
-db.drivers=org.mariadb.jdbc.Driver
-#�û���
-db.user=root
-#����
-db.password=111111
-#��ʼ����С
-db.initialsize=20
-#��������
-db.minidle=1
-#���������
-db.maxactive=300
-db.filters=stat,wall,log4j,config
-
-
-