diff options
| author | wangkuan <[email protected]> | 2019-07-23 13:47:27 +0800 |
|---|---|---|
| committer | wangkuan <[email protected]> | 2019-07-23 13:47:27 +0800 |
| commit | 81c4ff7dfaf18045cd8ed29de97e3161bc3b0071 (patch) | |
| tree | 4fc622114c03fd601b33c48be62b81cc53979b55 | |
| parent | 76771bebb02ef24175e52d74ea4ab5410a649ea1 (diff) | |
代码格式化
16 files changed, 20 insertions, 1891 deletions
diff --git a/src/main/java/com/mesa/reportservice/bean/HbaseData.java b/src/main/java/com/mesa/reportservice/bean/HbaseData.java deleted file mode 100644 index 636c6a4..0000000 --- a/src/main/java/com/mesa/reportservice/bean/HbaseData.java +++ /dev/null @@ -1,175 +0,0 @@ -package com.mesa.reportservice.bean; - -/** - * @author LBQ - * @version 1.0.0 - * @ClassName HbasePO - * @Description 表结构 - * @Date 2018年6月2日 上午10:34:06 - */ -public class HbaseData { - - private String query_id; - private String query_duration_ms; - private long read_rows; - private String memory_usage; - - private int result_id; - private String result_name; - private String task_id; - private String start_time; - private String end_time; - private String query_sql; - private int status; - private int excute_time; - private long excute_row; - private int excute_process; - private String excute_detail; - private int is_send_notice; - private String op_time; - - - public int getResult_id() { - return result_id; - } - - public void setResult_id(int result_id) { - this.result_id = result_id; - } - - public String getResult_name() { - return result_name; - } - - public void setResult_name(String result_name) { - this.result_name = result_name; - } - - public int getStatus() { - return status; - } - - public void setStatus(int status) { - this.status = status; - } - - public String getTask_id() { - return task_id; - } - - public void setTask_id(String task_id) { - this.task_id = task_id; - } - - public String getStart_time() { - return start_time; - } - - public void setStart_time(String start_time) { - this.start_time = start_time; - } - - public String getEnd_time() { - return end_time; - } - - public void setEnd_time(String end_time) { - this.end_time = end_time; - } - - public String getQuery_sql() { - return query_sql; - } - - public void setQuery_sql(String query_sql) { - this.query_sql = query_sql; - } - - - public int getExcute_time() { - return excute_time; - } - - public void setExcute_time(int excute_time) { - this.excute_time = excute_time; - } - - public long getExcute_row() { - return excute_row; - } - - public void setExcute_row(long excute_row) { - this.excute_row = excute_row; - } - - public int getExcute_process() { - return excute_process; - } - - public void setExcute_process(int excute_process) { - this.excute_process = excute_process; - } - - public String getExcute_detail() { - return excute_detail; - } - - public void setExcute_detail(String excute_detail) { - this.excute_detail = excute_detail; - } - - public int getIs_send_notice() { - return is_send_notice; - } - - public void setIs_send_notice(int is_send_notice) { - this.is_send_notice = is_send_notice; - } - - public String getOp_time() { - return op_time; - } - - public void setOp_time(String op_time) { - this.op_time = op_time; - } - - public String getQuery_id() { - return query_id; - } - - public void setQuery_id(String query_id) { - this.query_id = query_id; - } - - public String getQuery_duration_ms() { - return query_duration_ms; - } - - public void setQuery_duration_ms(String query_duration_ms) { - this.query_duration_ms = query_duration_ms; - } - - public long getRead_rows() { - return read_rows; - } - - public void setRead_rows(long read_rows) { - this.read_rows = read_rows; - } - - public String getMemory_usage() { - return memory_usage; - } - - public void setMemory_usage(String memory_usage) { - this.memory_usage = memory_usage; - } - - @Override - public String toString() { - return "HbaseData [query_id=" + query_id + ", query_duration_ms=" + query_duration_ms + ", read_rows=" - + read_rows + ", memory_usage=" + memory_usage + "]"; - } - -} diff --git a/src/main/java/com/mesa/reportservice/bean/JobEntity.java b/src/main/java/com/mesa/reportservice/bean/JobEntity.java index 48af9fb..a566d83 100644 --- a/src/main/java/com/mesa/reportservice/bean/JobEntity.java +++ b/src/main/java/com/mesa/reportservice/bean/JobEntity.java @@ -3,7 +3,7 @@ package com.mesa.reportservice.bean; /** * Created by wk1 on 2019/5/20. */ -public class JobEntity implements Cloneable{ +public class JobEntity implements Cloneable { private int result_id; private String result_name; diff --git a/src/main/java/com/mesa/reportservice/bean/TaskEntity.java b/src/main/java/com/mesa/reportservice/bean/TaskEntity.java deleted file mode 100644 index bc17ec6..0000000 --- a/src/main/java/com/mesa/reportservice/bean/TaskEntity.java +++ /dev/null @@ -1,27 +0,0 @@ -package com.mesa.reportservice.bean; - -/** - * Created by wk1 on 2019/5/17. - */ -public class TaskEntity { - - private static Boolean state = false; - - private static String sql = ""; - - public static Boolean getState() { - return state; - } - - public static void setState(Boolean state) { - TaskEntity.state = state; - } - - public static String getSql() { - return sql; - } - - public static void setSql(String sql) { - TaskEntity.sql = sql; - } -} diff --git a/src/main/java/com/mesa/reportservice/dao/DruidPool.java b/src/main/java/com/mesa/reportservice/dao/DruidPool.java index 5135e11..cd3a4d6 100644 --- a/src/main/java/com/mesa/reportservice/dao/DruidPool.java +++ b/src/main/java/com/mesa/reportservice/dao/DruidPool.java @@ -5,13 +5,9 @@ package com.mesa.reportservice.dao; */ import com.alibaba.druid.pool.DruidDataSource; -import com.alibaba.druid.pool.DruidDataSourceFactory; import com.alibaba.druid.support.http.StatViewServlet; import com.alibaba.druid.support.http.WebStatFilter; -import com.google.common.collect.Maps; import com.mesa.reportservice.util.Logs; -import org.junit.Test; -import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean; import org.springframework.boot.context.properties.ConfigurationProperties; import org.springframework.boot.web.servlet.FilterRegistrationBean; import org.springframework.boot.web.servlet.ServletRegistrationBean; @@ -19,10 +15,7 @@ import org.springframework.context.annotation.Bean; import org.springframework.stereotype.Component; import javax.sql.DataSource; -import java.io.InputStream; -import java.sql.*; -import java.util.Arrays; -import java.util.Properties; +import java.sql.SQLException; @Component @@ -154,59 +147,5 @@ public class DruidPool { } } -/* - @Test - public void demo02() { - Connection conn = null; - PreparedStatement ps = null; - ResultSet rs = null; - - try { - - //使用配置文件来配置文件 - Properties pr = new Properties(); - // pr.load(new FileInputStream("config/druid.properties")); - - InputStream is = DruidPool.class.getClassLoader().getResourceAsStream("druid.properties"); - //InputStream is = new FileInputStream(System.getProperty("user.dir")+File.separator+"config"+File.separator+"db.properties"); - - pr.load(is); - DataSource dataSource = DruidDataSourceFactory.createDataSource(pr); - - - //1,创建Druid连接池对象 - //DruidDataSource dataSource = new DruidDataSource(); - //dataSource.setDriverClassName("com.mysql.jdbc.Driver"); - //dataSource.setUrl("jdbc:mysql://localhost:3306/mytest-day08"); - //dataSource.setUsername("root"); - //dataSource.setPassword("123"); - - - //工具类中获得连接 - //conn = MyUtils.getConnertion01(); - - //用Druid来连接 - conn = dataSource.getConnection(); - - - //2,执行数据库语句 - String sql = "SELECT * FROM accoumt"; - - //3,用prepareStatement获取sql语句 - ps = conn.prepareStatement(sql); - - //4,执行sql语句,查询用executeQuery,增删改用executeUpdate - rs = ps.executeQuery(); - while (rs.next()) { - System.out.println(rs.getInt("id") + " " + rs.getString("name") + " " + rs.getDouble("money")); - } - - } catch (Exception e) { - // TODO: handle exception - e.printStackTrace(); - } finally { - - } - }*/ diff --git a/src/main/java/com/mesa/reportservice/dao/HttpClientPool.java b/src/main/java/com/mesa/reportservice/dao/HttpClientPool.java index d82511f..d3e1e48 100644 --- a/src/main/java/com/mesa/reportservice/dao/HttpClientPool.java +++ b/src/main/java/com/mesa/reportservice/dao/HttpClientPool.java @@ -134,158 +134,7 @@ public class HttpClientPool { * @Description Hbase连接池 * @Date 2018年5月31日 下午10:22:06 */ - //@Component - //@ConfigurationProperties(prefix = "hbase") - public static class HbasePool { - /* private static Logger logger = Logger.getLogger(HbasePool.class); - - private static Configuration conf = null; - - private static String zookeeper_quorum; - - private static String zookeeper_property_clientPort; - - private static String zookeeper_znode_parent; - - *//* private static String client_pause;*//* - - private static String client_retries_number; - - private static String rpc_timeout; - - *//*private static String client_operation_timeout; - - private static String client_scanner_timeout_period;*//* - - - public HbasePool() { - } - - *//** - * @Field @connectNumMax : 连接池最大数 - *//* - private static Integer connectNumMax; - - *//** - * @Field @connectNumMin : 连接池最小数 - *//* - private static Integer connectNumMin; - - *//** - * @Field @counter : 计数器 - *//* - private static Integer counter = 0; - - private static LinkedBlockingQueue<Connection> queue = new LinkedBlockingQueue<Connection>(); - - - - public static synchronized Configuration getConfiguration() { - if (conf == null) { - conf = HBaseConfiguration.create(); - conf.set("hbase.zookeeper.quorum", zookeeper_quorum); - conf.set("hbase.zookeeper.property.clientPort", zookeeper_property_clientPort); - conf.set("hbase.zookeeper.znode.parent", zookeeper_znode_parent); - // conf.set("hase.client.pause", client_pause); - conf.set("hbase.client.retries.number", client_retries_number); - conf.set("hbase.rpc.timeout", rpc_timeout); - // conf.set("hbase.client.operation.timeout", client_operation_timeout); - // conf.set("hbase.client.scanner.timeout.period", client_scanner_timeout_period); - } - return conf; - } - - - - - *//** - * @Description 归还连接 - * @param con - *//* - public static void returnConnect(Connection con) { - if (con == null) { - return; - } - if (counter > connectNumMin) { - try { - con.close(); - } catch (IOException e) { - logger.error("归还连接失败!", e); - } finally { - con=null; - counter--; - } - } else { - queue.add(con); - } - } - - *//** - * @Description 获取连接 - * @return - * @throws Exception - *//* - public static Connection getConnection() throws Exception { - - System.out.println("huoqu hbase lianjie" ); - if (!queue.isEmpty()) { - return queue.poll(); - } else { - if (counter >= connectNumMax) { - throw new Exception("Hbase连接池已满,请释放连接资源!目前连接数已达:[" + counter + "]"); - } else { - Connection con; - try { - con = ConnectionFactory.createConnection(getConfiguration()); - counter++; - } catch (IOException e) { - throw new Exception("Hbase连接创建失败,请检查连接配置!", e); - } - return con; - } - } - } - - public void setConnectNumMax(Integer connectNumMax) { - HbasePool.connectNumMax = connectNumMax; - } - - public void setConnectNumMin(Integer connectNumMin) { - HbasePool.connectNumMin = connectNumMin; - } - - *//** - * @Description 存活连接数 - * @return - *//* - public static Integer conNum() { - return counter; - } - - - public void setZookeeper_quorum(String zookeeper_quorum) { - HbasePool.zookeeper_quorum = zookeeper_quorum; - } - - public void setZookeeper_property_clientPort(String zookeeper_property_clientPort) { - HbasePool.zookeeper_property_clientPort = zookeeper_property_clientPort; - } - - public void setZookeeper_znode_parent(String zookeeper_znode_parent) { - HbasePool.zookeeper_znode_parent = zookeeper_znode_parent; - } - - - public void setClient_retries_number(String client_retries_number) { - HbasePool.client_retries_number = client_retries_number; - } - - public void setRpc_timeout(String rpc_timeout) { - HbasePool.rpc_timeout = rpc_timeout; - } - */ - } } diff --git a/src/main/java/com/mesa/reportservice/scheduledtask/ExcuteTask.java b/src/main/java/com/mesa/reportservice/scheduledtask/ExcuteTask.java index a5e826c..b7d720c 100644 --- a/src/main/java/com/mesa/reportservice/scheduledtask/ExcuteTask.java +++ b/src/main/java/com/mesa/reportservice/scheduledtask/ExcuteTask.java @@ -38,12 +38,11 @@ public class ExcuteTask { do { try { hr = hp.doPost(joburl); - if(hr.getCode()!=200 && hr.getBody().toLowerCase().contains("timeout")){ + if (hr.getCode() != 200 && hr.getBody().toLowerCase().contains("timeout")) { k--; String killurl = ClickhouseConfig.getKillUrl(je.getQuery_id()); hp.doPost(killurl); - } - else { + } else { break; } } catch (SocketException e) { diff --git a/src/main/java/com/mesa/reportservice/scheduledtask/JobTask.java b/src/main/java/com/mesa/reportservice/scheduledtask/JobTask.java deleted file mode 100644 index 6379b50..0000000 --- a/src/main/java/com/mesa/reportservice/scheduledtask/JobTask.java +++ /dev/null @@ -1,295 +0,0 @@ -package com.mesa.reportservice.scheduledtask; - -/** - * Created by wk1 on 2019/5/31. - */ - -//@Component - -public class JobTask { -/* - protected ExecutorService pool = ConfigUtil.pool; - private HttpAPIService hp; - private DataBaseBusiness hb; - - public JobTask(HttpAPIService hp, DataBaseBusiness hb) { - - this.hb = hb; - this.hp = hp; - Logs.debug("new Jobtask=========="); - } - - public HttpAPIService getHp() { - return hp; - } - - public void setHp(HttpAPIService hp) { - this.hp = hp; - } - - public DataBaseBusiness getHb() { - return hb; - } - - public void setHb(DataBaseBusiness hb) { - this.hb = hb; - } - - public void dotask(JobEntity jl) { - int id = jl.getResult_id(); - String sql = jl.getQuery_sql().trim(); - String hbaseid = DigestUtils.md5Hex(id + sql); - sql = sql.replace("$exe_time", "toDateTime('" + jl.getIssued_time().trim() + "')"); - String query_id = UUID.randomUUID().toString().replaceAll("-", ""); - String httpQuery = ClickhouseConfig.getJobUrl(sql, query_id); - hb.updateStatue(id, 1); - - int a = 3; - do { - CountDownLatch countDownLatch = new CountDownLatch(1); - pool.execute(new Runnable() { - @Override - public void run() { - try { - int i; - - for (i = 0; i < ConfigUtil.loop_number; i++) { - Thread.sleep(ConfigUtil.query_log_sleep); - String queryProccess = ClickhouseConfig.getProcessUrl(query_id); - HttpResult hr = hp.doPost(queryProccess); - String rs = hr.getBody().trim(); - if (!rs.isEmpty() && !rs.equals("")) { - // int status = hb.getJobStatueFromId(id); - JobEntity en = getProcessMessage(rs, id); - en.setExcute_detail(""); - if (en.getExcute_time() < (ConfigUtil.ck_timeout)) { - - if (en.getExcute_time() > ConfigUtil.query_log_sleep) { - hb.updateProcesses(en); - } else { - break; - } - } else { - i = ConfigUtil.loop_number; - break; - } - } else { - break; - } - } - if (i >= ConfigUtil.loop_number) { - String killurl = ClickhouseConfig.getKillUrl(query_id); - hp.doPost(killurl); - } - } catch (Exception e) { - Logs.error("获取进度失败" + e.toString()); - } finally { - Logs.debug("thread finish--------------------------" + Thread.currentThread()); - countDownLatch.countDown(); - } - } - }); - - try { - HttpResult hr = new HttpResult(); - hr = hp.doPost(httpQuery); - countDownLatch.await(); - JobEntity entity = getQueryLogMessage(id, query_id); - - if (hr.getCode() == 200) { - - entity.setQuery_id(query_id); - HttpResult h1 = saveToHbase(hbaseid, hr.getBody(), entity); - if (h1.getCode() == 200) { - //Thread.sleep(5000); - // Logs.debug(entity.getJob_id() + entity.getStatue()); - - entity.setResult_id(id); - entity.setExcute_detail("success"); - entity.setExcute_time(0); - entity.setExcute_process(100); - entity.setStatus(2); - int number = 0; - int z = 3; - do { - number = hb.updateProcesses(entity); - z--; - } - while (number != 1 && z >= 0); - break; - } else { - updateErrorMessage(id, 5, "写入hbase失败"); - Logs.error("写入hbase失败"); - break; - } - } else { - - if (hr.getBody().contains("Code: 394")) { - throw new IOException("queryid=" + query_id + " hbaseid=" + hbaseid + " 查询超时"); - } else if (hr.getBody().contains("Code: 241")) { - throw new Exception("查询超出内存限制" + hr.getBody()); - } else if (entity.getStatus() == 3) { - updateErrorMessage(id, 3, "查询语句错误" + hr.getBody()); - break; - } else { - Logs.error("查询clickhouse失败" + hr.getBody()); - throw new Exception("查询clickhouse失败" + hr.getBody()); - } - } - - - } catch (InterruptedException w) { - Logs.error(w.toString()); - break; - } catch (IOException r) { - a--; - String kill = ClickhouseConfig.getKillUrl(query_id); - try { - hp.doPost(kill); - } catch (Exception er) { - Logs.error(er.toString()); - } - if (a <= 0) { - updateErrorMessage(id, 4, "查询超时"); - break; - } - } catch (NullPointerException e) { - e.printStackTrace(); - break; - } catch (Exception e) { - Logs.error("任务执行中报错" + e.toString()); - updateErrorMessage(id, 4, e.toString()); - break; - } - } - while (a > 0); - synchronized (this) { - ConfigUtil.job_thread++; - Logs.debug("job_thread++++++++++++++" + ConfigUtil.job_thread); - - } - } - - - public void updateErrorMessage(int id, int status, String error) { - - - JobEntity js = new JobEntity(); - js.setResult_id(id); - js.setStatus(status); - js.setExcute_detail("任务执行中报错" + error); - hb.updateProcesses(js); - - } - - - public JobEntity getProcessMessage(String body, int id) { - JobEntity job = new JobEntity(); - Map map = JSON.parseObject(body); - float elapsed = Float.parseFloat(map.get("elapsed").toString()); - long total_rows_approx = Long.parseLong(map.get("total_rows_approx").toString()); - long read_rows = Long.parseLong(map.get("read_rows").toString()); - int ftime = (int) (elapsed * total_rows_approx / read_rows); - ftime = Math.round((ftime - elapsed) * 10 * 1000) / 10; - Logs.info("total_rows_approx" + total_rows_approx + "read_rows" + read_rows + "elapsed" + elapsed + "ftime" + ftime); - // NumberFormat numberFormat = NumberFormat.getInstance(); - //numberFormat.setMaximumFractionDigits(0); - - double persent = (read_rows * 1.00 / total_rows_approx); - int result = (int) (persent * 100); - job.setExcute_row(total_rows_approx); - job.setExcute_process(result); - job.setStatus(1); - job.setExcute_time(ftime); - job.setResult_id(id); - System.out.println("%+++++++++++++" + result + "%"); - System.out.println("needtime--------" + ftime); - return job; - - } - - public HttpResult saveToHbase(String hbaseid, String body, JobEntity entity) throws Exception { - - int k = 3; - HttpResult h1 = new HttpResult(); - - do { - try { - String hbasejson = HbaseUtil.getHbaseJson(hbaseid, body, entity); - String hbaseurl = HbaseUtil.getHbasePostUrl(); - k--; - h1 = hp.doPost(hbaseurl, hbasejson); - break; - } catch (Exception e) { - Logs.error("写入hbase报错重试次数" + (3 - k)); - if (k < 0) { - throw new Exception("写入hbase报错", e); - } - } - } - while (k >= 0); - - // String h2 = hp.doGet("http://192.168.10.224:4444/tsg:report_result/" + hbaseid + "/response:result"); - // Map<String, Object> aaa = h2.queryData("tsg:report_result",query_id); - // System.out.print(h2); - return h1; - } - - - public JobEntity getQueryLogMessage(int id, String query_id) { - - - JobEntity job = new JobEntity(); - String finishurl = ClickhouseConfig.getFinishUrl(query_id); - HttpResult num = new HttpResult(); - int j = 3; - do { - try { - Thread.sleep(3000); - num = hp.doPost(finishurl); - if (num.getCode() == 200 && !num.getBody().isEmpty() && !num.getBody().equals("")) { - - job.setResult_id(id); - Map mapresult = JSON.parseObject(num.getBody()); - int type = Integer.parseInt(mapresult.get("type").toString()); - String query_duration_ms = mapresult.get("query_duration_ms").toString(); - long read_rows = Long.parseLong(mapresult.get("read_rows").toString()); - String memory_usage = mapresult.get("memory_usage").toString(); - Logs.debug("type" + type); - switch (type) { - case 2: - job.setStatus(2); - // job.setExcute_detail("success"); - break; - case 3: - job.setStatus(3); - //job.setExcute_detail("语法错误"); - break; - case 4: - job.setStatus(4); - //job.setExcute_detail("性能问题"); - break; - default: - break; - } - job.setQuery_duration_ms(query_duration_ms); - job.setMemory_usage(memory_usage); - job.setExcute_time(0); - job.setExcute_process(100); - job.setExcute_row(read_rows); - } else { - job.setStatus(5); - job.setExcute_detail("没有查到query信息"); - - } - } catch (Exception e) { - job.setStatus(5); - Logs.error("获取querylogmessage失败"); - } - j--; - } - while (job.getStatus() == 5 && j >= 0); - return job; - }*/ - -} diff --git a/src/main/java/com/mesa/reportservice/scheduledtask/ScheduledTask.java b/src/main/java/com/mesa/reportservice/scheduledtask/ScheduledTask.java index a0445a7..0472733 100644 --- a/src/main/java/com/mesa/reportservice/scheduledtask/ScheduledTask.java +++ b/src/main/java/com/mesa/reportservice/scheduledtask/ScheduledTask.java @@ -34,48 +34,10 @@ public class ScheduledTask { @Autowired private DataBaseBusiness hb; - /* @Scheduled(cron = "${scan.mysql.scheduled.plan}") - public void run() { - - System.out.print(Thread.currentThread()); - Logs.debug("jobthread=000000000000000" + ConfigUtil.job_thread); - try { - if (ConfigUtil.job_thread > 0) { - List<JobEntity> joblist = hb.getJobLimitRows(ConfigUtil.job_thread); - Logs.debug("joblist size=" + joblist.size()); - - for (JobEntity jl : joblist) { - // Thread.sleep(2000); - ConfigUtil.job_thread--; - Logs.debug("jobthread--=" + ConfigUtil.job_thread + "---jobid=" + jl.getResult_id()); - pool.execute(new Runnable() { - @Override - public void run() { - Logs.debug("threadname" + Thread.currentThread()); - JobTask jt = new JobTask(hp, hb); - try { - jt.dotask(jl); - } catch (Exception e) { - Logs.error(e.toString()); - } - } - }); - - } - } - } catch (Exception e) { - Logs.error(e.toString()); - } - } -*/ - - @Scheduled(cron = "${scan.mysql.scheduled.plan}") public void scanmysql() { try { - - List<JobEntity> joblist = hb.getJobLimitRows(ConfigUtil.job_thread); Logs.debug("joblist size=" + joblist.size()); @@ -91,9 +53,7 @@ public class ScheduledTask { jl.setExcute_detail(""); JobEntity jt = (JobEntity) jl.clone(); ConfigUtil.mapresult.put(queryid, jt); - hb.updateStatue(jl.getResult_id(), 1); - pool.execute(new Runnable() { @Override public void run() { @@ -104,8 +64,6 @@ public class ScheduledTask { } }); ConfigUtil.job_thread--; - - } } catch (Exception e) { @@ -118,8 +76,6 @@ public class ScheduledTask { public void scanmysqlresult() { try { - - List<JobEntity> joblist = hb.getJobForExcute(); Logs.debug("joblist size=" + joblist.size()); @@ -130,23 +86,15 @@ public class ScheduledTask { String killurl = ClickhouseConfig.getKillUrl(queryid); hp.doPost(killurl); updateErrorMessage(jk.getResult_id(), 0, "执行结果不存在重新执行"); - break; } - - JobEntity je = (JobEntity) ConfigUtil.mapresult.get(queryid).clone(); - /* if (je == null) { - break; - }*/ - if (je.getStatus() == 2 ) { + if (je.getStatus() == 2) { try { je = getQueryLogMessage(je); HttpResult h1 = saveToHbase(je); if (h1.getCode() == 200) { - //Thread.sleep(5000); - // Logs.debug(entity.getJob_id() + entity.getStatue()); je.setExcute_detail("success"); je.setExcute_time(0); @@ -202,7 +150,7 @@ public class ScheduledTask { } else { JobEntity jo = getProcessMessage(je); - if(jo.getExcute_row()!=0 || jo.getExcute_time()!=0) { + if (jo.getExcute_row() != 0 || jo.getExcute_time() != 0) { hb.updateProcesses(jo); } } @@ -212,8 +160,6 @@ public class ScheduledTask { } catch (Exception e) { Logs.error(e.toString()); - // ConfigUtil.job_thread++; - } } @@ -245,13 +191,10 @@ public class ScheduledTask { long read_rows = Long.parseLong(map.get("read_rows").toString()); int ftime = (int) (elapsed * total_rows_approx / read_rows); ftime = Math.round((ftime - elapsed) * 10 * 1000) / 10; - if(ftime<0){ - ftime=0; + if (ftime < 0) { + ftime = 0; } Logs.info("total_rows_approx" + total_rows_approx + "read_rows" + read_rows + "elapsed" + elapsed + "ftime" + ftime); - // NumberFormat numberFormat = NumberFormat.getInstance(); - //numberFormat.setMaximumFractionDigits(0); - double persent = (read_rows * 1.00 / total_rows_approx); int result = (int) (persent * 100); job.setExcute_row(total_rows_approx); @@ -265,9 +208,7 @@ public class ScheduledTask { } catch (Exception e) { e.printStackTrace(); } - return job; - } public HttpResult saveToHbase(JobEntity entity) { @@ -289,20 +230,12 @@ public class ScheduledTask { } } while (k >= 0); - - //String h2 = hp.doGet("http://192.168.10.224:4444/tsg:report_result/" + queryid + "/response:result"); - // Map<String, Object> aaa = h2.queryData("tsg:report_result",query_id); - // System.out.print(h2); return h1; } public JobEntity getQueryLogMessage(JobEntity job) { - - // JobEntity job = (JobEntity) jb.clone(); - /* job.setResult_id(resultid); - job.setQuery_id(queryid);*/ String finishurl = ClickhouseConfig.getFinishUrl(job.getQuery_id()); HttpResult num = new HttpResult(); int j = 0; @@ -325,13 +258,13 @@ public class ScheduledTask { break; case 3: job.setStatus(type); - job.setExcute_detail("语法错误"+mapresult.get("exception").toString()); + job.setExcute_detail("语法错误" + mapresult.get("exception").toString()); job.setQuery_sql(mapresult.get("query").toString()); job.setException(mapresult.get("exception").toString()); break; case 4: job.setStatus(type); - job.setExcute_detail("性能问题"+mapresult.get("exception").toString()); + job.setExcute_detail("性能问题" + mapresult.get("exception").toString()); job.setQuery_sql(mapresult.get("query").toString()); job.setException(mapresult.get("exception").toString()); break; diff --git a/src/main/java/com/mesa/reportservice/service/DataBaseBusiness.java b/src/main/java/com/mesa/reportservice/service/DataBaseBusiness.java index 3f8d729..befe0f2 100644 --- a/src/main/java/com/mesa/reportservice/service/DataBaseBusiness.java +++ b/src/main/java/com/mesa/reportservice/service/DataBaseBusiness.java @@ -42,9 +42,8 @@ public class DataBaseBusiness { public List<JobEntity> getJobForExcute() { - String sql = " select * from report_result where status =1 order by end_time "; - int resultid=0; + int resultid = 0; List<JobEntity> sqllist = new ArrayList<JobEntity>(); try { //connection = manager.getConnection("idb"); @@ -65,7 +64,7 @@ public class DataBaseBusiness { } catch (Exception e) { Logs.error(sql + "获取进度列表发生异常!"); - updateStatue(resultid,4); + updateStatue(resultid, 4); e.printStackTrace(); throw new RuntimeException("获取进度列表发生异常!", e); } finally { @@ -86,11 +85,10 @@ public class DataBaseBusiness { } - public List<JobEntity> getJobLimitRows(int row) { String current_time = DateUtil.getDate(); - int resultid=0; + int resultid = 0; String sql = " select * from report_result where status =0 and end_time<'" + current_time + "' order by end_time limit " + row; List<JobEntity> sqllist = new ArrayList<JobEntity>(); @@ -114,7 +112,7 @@ public class DataBaseBusiness { } catch (Exception e) { Logs.error(sql + "获取任务列表发生异常!"); - updateStatue(resultid,4); + updateStatue(resultid, 4); e.printStackTrace(); throw new RuntimeException("获取任务列表发生异常!", e); @@ -136,8 +134,6 @@ public class DataBaseBusiness { } - - public int getJobStatueFromId(int id) { int statue = 0; @@ -245,5 +241,4 @@ public class DataBaseBusiness { } - } diff --git a/src/main/java/com/mesa/reportservice/service/HBaseService.java b/src/main/java/com/mesa/reportservice/service/HBaseService.java deleted file mode 100644 index a5ee1dd..0000000 --- a/src/main/java/com/mesa/reportservice/service/HBaseService.java +++ /dev/null @@ -1,631 +0,0 @@ -package com.mesa.reportservice.service; - -/** - * HBase数据库基本操作 - */ - -//@Service -public class HBaseService { -/* - - - - private Logger log = LoggerFactory.getLogger(HBaseService.class); - - private Admin admin = null; - private Connection connection = null; - - public HBaseService() { - try { - connection = ConnectionFactory.createConnection(HttpClientPool.HbasePool.getConfiguration()); - admin = connection.getAdmin(); - } catch (IOException e) { - log.error("获取HBase连接失败"); - } - } - - - - - *//** - * @Description 删除表 - * @param tableName - * @throws Exception - *//* - public static void dropTable(String tableName) throws Exception { - - Connection con = null; - Admin admin = null; - try { - // ============获取连接管理============= - con = HttpClientPool.HbasePool.getConnection(); - admin = con.getAdmin(); - // ============查询表是否存在============= - if (!admin.tableExists(TableName.valueOf(tableName))) { - throw new Exception("需要删除的表[" + tableName + "]-不存在!"); - } - - // ============删除表============= - if (!admin.isTableDisabled(TableName.valueOf(tableName))) { - admin.disableTable(TableName.valueOf(tableName)); - } - admin.deleteTable(TableName.valueOf(tableName)); - } finally { - if (admin != null) { - try { - admin.close(); - } catch (IOException e) { - //log.error("Hbase - Admin资源释放失败!", e); - } - } - if (con != null) { - HttpClientPool.HbasePool.returnConnect(con); - } - } - - // logger.info("=====删除表,表名:" + tableName); - } - - *//** - * @Description 插入数据 - 单条模式 - * @param tableName String - * @param rowKey List<HbaseData> hbaseDataList - * @throws Exception - *//* - public void insertData(String tableName, String rowKey, List<HbaseData> hbaseDataList) throws Exception { - Connection con = null; - Table table = null; - try { - // ============获取连接管理================== - con = HttpClientPool.HbasePool.getConnection(); - table = con.getTable(TableName.valueOf(tableName)); - - // ============进行数据解析,封装PUT============ - Put put = new Put(Bytes.toBytes(rowKey)); - for (HbaseData hbaseData : hbaseDataList) { - String columnFamily = hbaseData.getColumnFamily(); - for (Map.Entry<String, String> entry : hbaseData.getColumnValueMap().entrySet()) { - put.addColumn(Bytes.toBytes(columnFamily), Bytes.toBytes(entry.getKey()), - Bytes.toBytes(entry.getValue())); - } - } - table.put(put); - } finally { - if (table != null) { - table.close(); - } - if (con != null) { - HttpClientPool.HbasePool.returnConnect(con); - } - } - //logger.info("=====插入数据,表名:[" + tableName + "],rowKey:[" + rowKey + "],List<HbaseData>:" + hbaseDataList); - } - - - - - *//** - * @Description 插入数据 - 批量插入模式 - * @param tableName String - * @param hbaseDTOList String rowKey List<HbaseData> hbaseDataList - * @throws Exception - *//* - public void insertBatchData(String tableName, List<HbaseDataDTO> hbaseDTOList) throws Exception { - Connection con = null; - Table table = null; - try { - // ============获取连接管理================== - con = HttpClientPool.HbasePool.getConnection(); - table = con.getTable(TableName.valueOf(tableName)); - - // ============进行数据解析,封装PUT============ - List<Put> putList = new ArrayList<>(); - for (int index = 0; index < hbaseDTOList.size(); index++) { - HbaseDataDTO hbaseDTO = hbaseDTOList.get(index); - Put put = new Put(Bytes.toBytes(hbaseDTO.getRowKey())); - for (HbaseData hbaseData : hbaseDTO.getHbaseDataList()) { - String columnFamily = hbaseData.getColumnFamily(); - for (Map.Entry<String, String> entry : hbaseData.getColumnValueMap().entrySet()) { - put.addColumn(Bytes.toBytes(columnFamily), Bytes.toBytes(entry.getKey()), - Bytes.toBytes(entry.getValue())); - } - } - putList.add(put); - // ===============设置成1000次,提交一次============== - if (index % 1000 == 0) { - table.put(putList); - putList.clear(); - } - } - table.put(putList); - // logger.info("=====插入数据,表名:[" + tableName + "],批量数据插入成功!"); - } finally { - if (table != null) { - table.close(); - } - if (con != null) { - HttpClientPool.HbasePool.returnConnect(con); - } - } - } - - public static void deleteDataByRowKey(String tableName, String rowKey) throws Exception { - Connection con = null; - Table table = null; - try { - // ============获取连接管理================== - con = HttpClientPool.HbasePool.getConnection(); - table = con.getTable(TableName.valueOf(tableName)); - - // 按rowkey删除 - Delete delete = new Delete(Bytes.toBytes(rowKey)); - table.delete(delete); - } finally { - if (table != null) { - table.close(); - } - if (con != null) { - HttpClientPool.HbasePool.returnConnect(con); - } - } - - // logger.info("=====删除数据,表名:[" + tableName + "],rowKey:[" + rowKey + "],删除成功!"); - } - - public static void deleteDataByFamily(String tableName, String rowKey, String family) throws Exception { - Connection con = null; - Table table = null; - try { - // ============获取连接管理================== - con = HttpClientPool.HbasePool.getConnection(); - table = con.getTable(TableName.valueOf(tableName)); - - // 按family删除 - Delete delete = new Delete(Bytes.toBytes(rowKey)); - delete.addFamily(Bytes.toBytes(family)); - table.delete(delete); - } finally { - if (table != null) { - table.close(); - } - if (con != null) { - HttpClientPool.HbasePool.returnConnect(con); - } - } - - // logger.info("=====删除数据,表名:[" + tableName + "],rowKey:[" + rowKey + "],family:[" + family + "],删除成功!"); - } - - public static void deleteDataByColumn(String tableName, String rowKey, String family, String column) - throws Exception { - - Connection con = null; - Table table = null; - try { - // ============获取连接管理================== - con = HttpClientPool.HbasePool.getConnection(); - table = con.getTable(TableName.valueOf(tableName)); - - // 按column删除 - Delete delete = new Delete(Bytes.toBytes(rowKey)); - delete.addColumns(Bytes.toBytes(family), Bytes.toBytes(column)); - table.delete(delete); - } finally { - if (table != null) { - table.close(); - } - if (con != null) { - HttpClientPool.HbasePool.returnConnect(con); - } - } - // logger.info("=====删除数据,表名:[" + tableName + "],rowKey:[" + rowKey + "],family:[" + family + "],column:[" + column - // + "],删除成功!"); - } - - *//** - * @Description 查询数据(最新一条) - * @param tableName String 表名 - * @param rowKey String 行键 - * @return - * @throws Exception - *//* - public Map<String, Object> queryData(String tableName, String rowKey) throws Exception { - Connection con = null; - Table table = null; - try { - // ============获取连接管理================== - con = HttpClientPool.HbasePool.getConnection(); - table = con.getTable(TableName.valueOf(tableName)); - - // ============进行GET操作================== - Get get = new Get(Bytes.toBytes(rowKey)); - Result result = table.get(get); - - Map<String, Object> map = analysisQueryResult(result); - - // ============封装返回结果=============== - Map<String, Object> resultMap = new HashMap<>(); - resultMap.put("result", map.values()); - resultMap.put("totalCount", map.values().size()); - return resultMap; - } finally { - if (table != null) { - table.close(); - } - if (con != null) { - HttpClientPool.HbasePool.returnConnect(con); - } - } - } - - *//** - * @Description 根据条件查询数据 - * @param tableName String 表名 - * @param rowKey String 行键 - * @param version int 版本数量(查多少条数据) - * @param paramMap key-String-family(列族) value-String[]-columns(列名) - * @return - * @throws Exception - *//* - public static Map<String, Object> queryDataByParams(String tableName, String rowKey, int version, - Map<String, String[]> paramMap) throws Exception { - Connection con = null; - Table table = null; - try { - // ============获取连接管理================== - con = HttpClientPool.HbasePool.getConnection(); - table = con.getTable(TableName.valueOf(tableName)); - - // ============进行GET操作================== - Get get = new Get(Bytes.toBytes(rowKey)); - // get.getMaxResultsPerColumnFamily(version); - - // ============组装条件================== - assembleCondition(paramMap, get); - Result result = table.get(get); - Map<String, Object> map = analysisQueryResult(result); - - // ============封装返回结果=============== - Map<String, Object> resultMap = new HashMap<>(); - resultMap.put("result", map.values()); - resultMap.put("totalCount", map.values().size()); - return resultMap; - } finally { - if (table != null) { - table.close(); - } - if (con != null) { - HttpClientPool.HbasePool.returnConnect(con); - } - } - } - - *//** - * @Description 根据条件查询数据(根据时间进行限定) - * @param tableName String 表名 - * @param rowKey String 行键 - * @param version int 版本数量(查多少条数据) - * @param startTime long 开始时间戳 如果仅传开始时间戳,结束时间戳传0,则查开始时间戳对应的版本 - * @param endTime long 结束时间戳 - * @param paramMap key-String-family(列族) value-String[]-columns(列名) - * @return - * @throws Exception - *//* - public static Map<String, Object> queryDataByParamsAndTime(String tableName, String rowKey, int version, - long startTime, long endTime, Map<String, String[]> paramMap) throws Exception { - Connection con = null; - Table table = null; - try { - // ============获取连接管理================== - con = HttpClientPool.HbasePool.getConnection(); - table = con.getTable(TableName.valueOf(tableName)); - - // ============进行GET操作================== - Get get = new Get(Bytes.toBytes(rowKey)); - get.getMaxResultsPerColumnFamily(); - if (endTime != 0) { - get.setTimeRange(startTime, endTime); - } else if (startTime != 0) { - get.setTimeStamp(startTime); - } - // ============组装条件================== - assembleCondition(paramMap, get); - Result result = table.get(get); - Map<String, Object> map = analysisQueryResult(result); - - // ============封装返回结果=============== - Map<String, Object> resultMap = new HashMap<>(); - resultMap.put("result", map.values()); - resultMap.put("totalCount", map.values().size()); - return resultMap; - } finally { - if (table != null) { - table.close(); - } - if (con != null) { - HttpClientPool.HbasePool.returnConnect(con); - } - } - } - - *//** - * @Description 装载条件 - * @param paramMap - * @param get - *//* - private static void assembleCondition(Map<String, String[]> paramMap, Get get) { - if (paramMap != null && !paramMap.isEmpty()) { - for (Map.Entry<String, String[]> entry : paramMap.entrySet()) { - if (entry.getValue() == null) { - get.addFamily(Bytes.toBytes(entry.getKey())); - } else { - for (String column : entry.getValue()) { - get.addColumn(Bytes.toBytes(entry.getKey()), Bytes.toBytes(column)); - } - } - } - } - } - - *//** - * @Description 解析Result - * @param result - * @return - *//* - @SuppressWarnings("unchecked") - private static Map<String, Object> analysisQueryResult(Result result) { - Map<String, Object> map = new HashMap<>(); - for (Cell cell : result.listCells()) { - Long timeStamp = cell.getTimestamp(); - String rowKey = Bytes.toString(cell.getRowArray(), cell.getRowOffset(), cell.getRowLength()); - String familyRs = Bytes.toString(cell.getFamilyArray(), cell.getFamilyOffset(), cell.getFamilyLength()); - String keyRs = - Bytes.toString(cell.getQualifierArray(), cell.getQualifierOffset(), cell.getQualifierLength()); - String valueRs = Bytes.toString(cell.getValueArray(), cell.getValueOffset(), cell.getValueLength()); - - // =========参数封装========= - Map<String, Object> map2 = (Map<String, Object>) map.get(timeStamp.toString()); - if (map2 == null) { - map2 = new HashMap<>(); - } - Map<String, Object> map3 = (Map<String, Object>) map2.get(familyRs); - if (map3 == null) { - map3 = new HashMap<>(); - } - map3.put(keyRs, valueRs); - map2.put(familyRs, map3); - map2.put("rowKey", rowKey); - map2.put("timeStamp", timeStamp.toString()); - map.put(timeStamp.toString(), map2); - } - return map; - } - - - - - @SuppressWarnings("unchecked") - private static Map<String, Object> analysisScanResult(Result result) { - Map<String, Object> map = new HashMap<>(); - for (Cell cell : result.listCells()) { - Long timeStamp = cell.getTimestamp(); - String rowKey = Bytes.toString(cell.getRowArray(), cell.getRowOffset(), cell.getRowLength()); - String familyRs = Bytes.toString(cell.getFamilyArray(), cell.getFamilyOffset(), cell.getFamilyLength()); - String keyRs = - Bytes.toString(cell.getQualifierArray(), cell.getQualifierOffset(), cell.getQualifierLength()); - String valueRs = Bytes.toString(cell.getValueArray(), cell.getValueOffset(), cell.getValueLength()); - - // =========参数封装========= - Map<String, Object> map2 = (Map<String, Object>) map.get(familyRs); - if (map2 == null) { - map2 = new HashMap<>(); - } - map2.put(keyRs, valueRs); - map.put(familyRs, map2); - map.put("rowKey", rowKey); - map.put("timeStamp", timeStamp.toString()); - } - return map; - } - - - - - - - *//** - * 查询库中所有表的表名 - * shell command: list - *//* - public List<String> getAllTableNames() { - List<String> result = new ArrayList<>(); - try { - TableName[] tableNames = admin.listTableNames(); - for (TableName tableName : tableNames) { - result.add(tableName.getNameAsString()); - } - } catch (IOException e) { - log.error("获取所有表的表名失败", e); - } finally { - close(admin, null, null); - } - return result; - } - - *//** - * 遍历查询指定表中的所有数据 - * shell command: scan 'user' - *//* - public Map<String, Map<String, String>> getResultScanner(String tableName) { - Scan scan = new Scan(); - return this.queryData(tableName, scan); - } - - *//** - * 通过表名以及过滤条件查询数据 - *//* - private Map<String, Map<String, String>> queryData(String tableName, - Scan scan) { - // <rowKey,对应的行数据> - Map<String, Map<String, String>> result = new HashMap<>(); - - ResultScanner rs = null; - // 获取表 - Table table = null; - try { - table = getTable(tableName); - rs = table.getScanner(scan); - for (Result r : rs) { - // 每一行数据 - Map<String, String> columnMap = new HashMap<>(); - String rowKey = null; - // 行键,列族和列限定符一起确定一个单元(Cell) - for (Cell cell : r.listCells()) { - if (rowKey == null) { - rowKey = Bytes.toString(cell.getRowArray(), - cell.getRowOffset(), cell.getRowLength()); - } - columnMap.put( - // 列限定符 - Bytes.toString(cell.getQualifierArray(), - cell.getQualifierOffset(), - cell.getQualifierLength()), - // 列族 - Bytes.toString(cell.getValueArray(), - cell.getValueOffset(), - cell.getValueLength())); - } - - if (rowKey != null) { - result.put(rowKey, columnMap); - } - } - } catch (IOException e) { - log.error(MessageFormat.format("遍历查询指定表中的所有数据失败,tableName:{0}", - tableName), e); - } finally { - close(null, rs, table); - } - - return result; - } - - *//** - * 根据tableName和rowKey精确查询行数据 - *//* - public Map<String, String> getRowData(String tableName, String rowKey) { - // 返回的键值对 - Map<String, String> result = new HashMap<>(); - - Get get = new Get(Bytes.toBytes(rowKey)); - // 获取表 - Table table = null; - try { - table = getTable(tableName); - Result hTableResult = table.get(get); - if (hTableResult != null && !hTableResult.isEmpty()) { - for (Cell cell : hTableResult.listCells()) { - result.put( - Bytes.toString(cell.getQualifierArray(), - cell.getQualifierOffset(), - cell.getQualifierLength()), - Bytes.toString(cell.getValueArray(), - cell.getValueOffset(), - cell.getValueLength())); - } - } - } catch (IOException e) { - log.error(MessageFormat.format( - "查询一行的数据失败,tableName:{0},rowKey:{1}", tableName, rowKey), e); - } finally { - close(null, null, table); - } - - return result; - } - - *//** - * 为表添加 or 更新数据 - *//* - public void putData(String tableName, String rowKey, String familyName, - String[] columns, String[] values) { - // 获取表 - Table table = null; - try { - table = getTable(tableName); - - putData(table, rowKey, tableName, familyName, columns, values); - } catch (Exception e) { - log.error(MessageFormat.format( - "为表添加 or 更新数据失败,tableName:{0},rowKey:{1},familyName:{2}", - tableName, rowKey, familyName), e); - } finally { - close(null, null, table); - } - } - - private void putData(Table table, String rowKey, String tableName, - String familyName, String[] columns, String[] values) { - try { - // 设置rowkey - Put put = new Put(Bytes.toBytes(rowKey)); - - if (columns != null && values != null - && columns.length == values.length) { - for (int i = 0; i < columns.length; i++) { - if (columns[i] != null && values[i] != null) { - put.addColumn(Bytes.toBytes(familyName), - Bytes.toBytes(columns[i]), - Bytes.toBytes(values[i])); - } else { - throw new NullPointerException(MessageFormat.format( - "列名和列数据都不能为空,column:{0},value:{1}", columns[i], - values[i])); - } - } - } - - table.put(put); - log.debug("putData add or update data Success,rowKey:" + rowKey); - table.close(); - } catch (Exception e) { - log.error(MessageFormat.format( - "为表添加 or 更新数据失败,tableName:{0},rowKey:{1},familyName:{2}", - tableName, rowKey, familyName), e); - } - } - - *//** - * 根据表名 获取table - * Used to communicate with a single HBase table. - * Table can be used to get, put, delete or scan data from a table. - *//* - private Table getTable(String tableName) throws IOException { - return connection.getTable(TableName.valueOf(tableName)); - } - - *//** - * 关闭流 - *//* - private void close(Admin admin, ResultScanner rs, Table table) { - if (admin != null) { - try { - admin.close(); - } catch (IOException e) { - log.error("关闭Admin失败", e); - } - } - - if (rs != null) { - rs.close(); - } - - if (table != null) { - try { - table.close(); - } catch (IOException e) { - log.error("关闭Table失败", e); - } - } - }*/ -} diff --git a/src/main/java/com/mesa/reportservice/service/HttpAPIService.java b/src/main/java/com/mesa/reportservice/service/HttpAPIService.java index 3c46b69..f9dd236 100644 --- a/src/main/java/com/mesa/reportservice/service/HttpAPIService.java +++ b/src/main/java/com/mesa/reportservice/service/HttpAPIService.java @@ -38,7 +38,7 @@ public class HttpAPIService { public String doGet(String url) throws Exception { // 声明 http get 请求 HttpGet httpGet = new HttpGet(url); - httpGet.setHeader("Accept","application/json"); + httpGet.setHeader("Accept", "application/json"); // 装载配置信息 httpGet.setConfig(config); @@ -91,7 +91,7 @@ public class HttpAPIService { HttpPost httpPost = new HttpPost(url); // 加入配置信息 httpPost.setConfig(config); - // httpPost.setHeader("Accept","application/json"); + // httpPost.setHeader("Accept","application/json"); // 判断map是否为空,不为空则进行遍历,封装from表单对象 if (!json.equals("") && !json.isEmpty()) { diff --git a/src/main/java/com/mesa/reportservice/util/ConfigUtil.java b/src/main/java/com/mesa/reportservice/util/ConfigUtil.java index 4ab3822..7fe2daf 100644 --- a/src/main/java/com/mesa/reportservice/util/ConfigUtil.java +++ b/src/main/java/com/mesa/reportservice/util/ConfigUtil.java @@ -6,8 +6,6 @@ import org.springframework.stereotype.Component; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; /** * Created by wk1 on 2019/5/28. @@ -18,8 +16,8 @@ import java.util.concurrent.Executors; public class ConfigUtil { - // public static ExecutorService pool = Executors.newFixedThreadPool(30); - // public static int query_log_sleep; + // public static ExecutorService pool = Executors.newFixedThreadPool(30); + // public static int query_log_sleep; //public static int loop_number; public static int job_thread; public static int ck_timeout; @@ -31,7 +29,6 @@ public class ConfigUtil { } - public void setJob_thread(int job_thread) { ConfigUtil.job_thread = job_thread; } diff --git a/src/main/java/com/mesa/reportservice/util/HbaseOpt.java b/src/main/java/com/mesa/reportservice/util/HbaseOpt.java deleted file mode 100644 index 21ecea9..0000000 --- a/src/main/java/com/mesa/reportservice/util/HbaseOpt.java +++ /dev/null @@ -1,377 +0,0 @@ -package com.mesa.reportservice.util; - -/** - * @author LBQ - * @version 1.0.0 - * @ClassName HbaseOpt - * @Description Hbase2.0.0 操作类 - * @Date 2018年6月4日 下午9:42:32 - */ -public class HbaseOpt { - - /* private static Logger logger = Logger.getLogger(HbaseOpt.class); - - *//** - * @Description 创建表 - * @param tableName 表名 - * @param version 版本数量 - * @param columnFamilys 列族 - * @throws Exception - *//* - - *//** - * @Description 删除表 - * @param tableName - * @throws Exception - *//* - public static void dropTable(String tableName) throws Exception { - - Connection con = null; - Admin admin = null; - try { - // ============获取连接管理============= - con = HbasePool.getConnection(); - admin = con.getAdmin(); - // ============查询表是否存在============= - if (!admin.tableExists(TableName.valueOf(tableName))) { - throw new Exception("需要删除的表[" + tableName + "]-不存在!"); - } - - // ============删除表============= - if (!admin.isTableDisabled(TableName.valueOf(tableName))) { - admin.disableTable(TableName.valueOf(tableName)); - } - admin.deleteTable(TableName.valueOf(tableName)); - } finally { - if (admin != null) { - try { - admin.close(); - } catch (IOException e) { - logger.error("Hbase - Admin资源释放失败!", e); - } - } - if (con != null) { - HbasePool.returnConnect(con); - } - } - - logger.info("=====删除表,表名:" + tableName); - } - - *//** - * @Description 插入数据 - 单条模式 - * @param tableName String - * @throws Exception - *//* - public static void insertData(String tableName, String rowKey, List<HbaseData> hbaseDataList) throws Exception { - Connection con = null; - Table table = null; - try { - // ============获取连接管理================== - con = HbasePool.getConnection(); - table = con.getTable(TableName.valueOf(tableName)); - - // ============进行数据解析,封装PUT============ - Put put = new Put(Bytes.toBytes(rowKey)); - for (HbaseData hbaseData : hbaseDataList) { - String columnFamily = hbaseData.getColumnFamily(); - for (Entry<String, String> entry : hbaseData.getColumnValueMap().entrySet()) { - put.addColumn(Bytes.toBytes(columnFamily), Bytes.toBytes(entry.getKey()), - Bytes.toBytes(entry.getValue())); - } - } - table.put(put); - } finally { - if (table != null) { - table.close(); - } - if (con != null) { - HbasePool.returnConnect(con); - } - } - logger.info("=====插入数据,表名:[" + tableName + "],rowKey:[" + rowKey + "],List<HbaseData>:" + hbaseDataList); - } - - *//** - * @Description 插入数据 - 批量插入模式 - * @param tableName String - * @param hbaseDTOList String rowKey List<HbaseData> hbaseDataList - * @throws Exception - *//* - public static void insertBatchData(String tableName, List<HbaseDataDTO> hbaseDTOList) throws Exception { - Connection con = null; - Table table = null; - try { - // ============获取连接管理================== - con = HbasePool.getConnection(); - table = con.getTable(TableName.valueOf(tableName)); - - // ============进行数据解析,封装PUT============ - List<Put> putList = new ArrayList<>(); - for (int index = 0; index < hbaseDTOList.size(); index++) { - HbaseDataDTO hbaseDTO = hbaseDTOList.get(index); - Put put = new Put(Bytes.toBytes(hbaseDTO.getRowKey())); - for (HbaseData hbaseData : hbaseDTO.getHbaseDataList()) { - String columnFamily = hbaseData.getColumnFamily(); - for (Entry<String, String> entry : hbaseData.getColumnValueMap().entrySet()) { - put.addColumn(Bytes.toBytes(columnFamily), Bytes.toBytes(entry.getKey()), - Bytes.toBytes(entry.getValue())); - } - } - putList.add(put); - // ===============设置成1000次,提交一次============== - if (index % 1000 == 0) { - table.put(putList); - putList.clear(); - } - } - table.put(putList); - logger.info("=====插入数据,表名:[" + tableName + "],批量数据插入成功!"); - } finally { - if (table != null) { - table.close(); - } - if (con != null) { - HbasePool.returnConnect(con); - } - } - } - - public static void deleteDataByRowKey(String tableName, String rowKey) throws Exception { - Connection con = null; - Table table = null; - try { - // ============获取连接管理================== - con = HbasePool.getConnection(); - table = con.getTable(TableName.valueOf(tableName)); - - // 按rowkey删除 - Delete delete = new Delete(Bytes.toBytes(rowKey)); - table.delete(delete); - } finally { - if (table != null) { - table.close(); - } - if (con != null) { - HbasePool.returnConnect(con); - } - } - - logger.info("=====删除数据,表名:[" + tableName + "],rowKey:[" + rowKey + "],删除成功!"); - } - - public static void deleteDataByFamily(String tableName, String rowKey, String family) throws Exception { - Connection con = null; - Table table = null; - try { - // ============获取连接管理================== - con = HbasePool.getConnection(); - table = con.getTable(TableName.valueOf(tableName)); - - // 按family删除 - Delete delete = new Delete(Bytes.toBytes(rowKey)); - delete.addFamily(Bytes.toBytes(family)); - table.delete(delete); - } finally { - if (table != null) { - table.close(); - } - if (con != null) { - HbasePool.returnConnect(con); - } - } - - logger.info("=====删除数据,表名:[" + tableName + "],rowKey:[" + rowKey + "],family:[" + family + "],删除成功!"); - } - - public static void deleteDataByColumn(String tableName, String rowKey, String family, String column) - throws Exception { - - Connection con = null; - Table table = null; - try { - // ============获取连接管理================== - con = HbasePool.getConnection(); - table = con.getTable(TableName.valueOf(tableName)); - - // 按column删除 - Delete delete = new Delete(Bytes.toBytes(rowKey)); - delete.addColumns(Bytes.toBytes(family), Bytes.toBytes(column)); - table.delete(delete); - } finally { - if (table != null) { - table.close(); - } - if (con != null) { - HbasePool.returnConnect(con); - } - } - logger.info("=====删除数据,表名:[" + tableName + "],rowKey:[" + rowKey + "],family:[" + family + "],column:[" + column - + "],删除成功!"); - } - - *//** - * @Description 查询数据(最新一条) - * @param tableName String 表名 - * @param rowKey String 行键 - * @return - * @throws Exception - *//* - public static Map<String, Object> queryData(String tableName, String rowKey) throws Exception { - Connection con = null; - Table table = null; - try { - // ============获取连接管理================== - con = HbasePool.getConnection(); - table = con.getTable(TableName.valueOf(tableName)); - - // ============进行GET操作================== - Get get = new Get(Bytes.toBytes(rowKey)); - Result result = table.get(get); - - Map<String, Object> map = analysisQueryResult(result); - - // ============封装返回结果=============== - Map<String, Object> resultMap = new HashMap<>(); - resultMap.put("result", map.values()); - resultMap.put("totalCount", map.values().size()); - return resultMap; - } finally { - if (table != null) { - table.close(); - } - if (con != null) { - HbasePool.returnConnect(con); - } - } - } - - - *//** - * @Description 根据条件查询数据(根据时间进行限定) - * @param tableName String 表名 - * @param rowKey String 行键 - * @param version int 版本数量(查多少条数据) - * @param startTime long 开始时间戳 如果仅传开始时间戳,结束时间戳传0,则查开始时间戳对应的版本 - * @param endTime long 结束时间戳 - * @param paramMap key-String-family(列族) value-String[]-columns(列名) - * @return - * @throws Exception - *//* - public static Map<String, Object> queryDataByParamsAndTime(String tableName, String rowKey, int version, - long startTime, long endTime, Map<String, String[]> paramMap) throws Exception { - Connection con = null; - Table table = null; - try { - // ============获取连接管理================== - con = HbasePool.getConnection(); - table = con.getTable(TableName.valueOf(tableName)); - - // ============进行GET操作================== - Get get = new Get(Bytes.toBytes(rowKey)); - // get.readVersions(version); - if (endTime != 0) { - get.setTimeRange(startTime, endTime); - } else if (startTime != 0) { - get.setTimeStamp(startTime); - } - // ============组装条件================== - assembleCondition(paramMap, get); - Result result = table.get(get); - Map<String, Object> map = analysisQueryResult(result); - - // ============封装返回结果=============== - Map<String, Object> resultMap = new HashMap<>(); - resultMap.put("result", map.values()); - resultMap.put("totalCount", map.values().size()); - return resultMap; - } finally { - if (table != null) { - table.close(); - } - if (con != null) { - HbasePool.returnConnect(con); - } - } - } - - *//** - * @Description 装载条件 - * @param paramMap - * @param get - *//* - private static void assembleCondition(Map<String, String[]> paramMap, Get get) { - if (paramMap != null && !paramMap.isEmpty()) { - for (Entry<String, String[]> entry : paramMap.entrySet()) { - if (entry.getValue() == null) { - get.addFamily(Bytes.toBytes(entry.getKey())); - } else { - for (String column : entry.getValue()) { - get.addColumn(Bytes.toBytes(entry.getKey()), Bytes.toBytes(column)); - } - } - } - } - } - - *//** - * @Description 解析Result - * @param result - * @return - *//* - @SuppressWarnings("unchecked") - private static Map<String, Object> analysisQueryResult(Result result) { - Map<String, Object> map = new HashMap<>(); - for (Cell cell : result.listCells()) { - Long timeStamp = cell.getTimestamp(); - String rowKey = Bytes.toString(cell.getRowArray(), cell.getRowOffset(), cell.getRowLength()); - String familyRs = Bytes.toString(cell.getFamilyArray(), cell.getFamilyOffset(), cell.getFamilyLength()); - String keyRs = - Bytes.toString(cell.getQualifierArray(), cell.getQualifierOffset(), cell.getQualifierLength()); - String valueRs = Bytes.toString(cell.getValueArray(), cell.getValueOffset(), cell.getValueLength()); - - // =========参数封装========= - Map<String, Object> map2 = (Map<String, Object>) map.get(timeStamp.toString()); - if (map2 == null) { - map2 = new HashMap<>(); - } - Map<String, Object> map3 = (Map<String, Object>) map2.get(familyRs); - if (map3 == null) { - map3 = new HashMap<>(); - } - map3.put(keyRs, valueRs); - map2.put(familyRs, map3); - map2.put("rowKey", rowKey); - map2.put("timeStamp", timeStamp.toString()); - map.put(timeStamp.toString(), map2); - } - return map; - } - - - - - - private static Map<String, Object> analysisScanResult(Result result) { - Map<String, Object> map = new HashMap<>(); - for (Cell cell : result.listCells()) { - Long timeStamp = cell.getTimestamp(); - String rowKey = Bytes.toString(cell.getRowArray(), cell.getRowOffset(), cell.getRowLength()); - String familyRs = Bytes.toString(cell.getFamilyArray(), cell.getFamilyOffset(), cell.getFamilyLength()); - String keyRs = - Bytes.toString(cell.getQualifierArray(), cell.getQualifierOffset(), cell.getQualifierLength()); - String valueRs = Bytes.toString(cell.getValueArray(), cell.getValueOffset(), cell.getValueLength()); - - // =========参数封装========= - Map<String, Object> map2 = (Map<String, Object>) map.get(familyRs); - if (map2 == null) { - map2 = new HashMap<>(); - } - map2.put(keyRs, valueRs); - map.put(familyRs, map2); - map.put("rowKey", rowKey); - map.put("timeStamp", timeStamp.toString()); - } - return map; - }*/ -} diff --git a/src/main/java/com/mesa/reportservice/util/HbaseUtil.java b/src/main/java/com/mesa/reportservice/util/HbaseUtil.java index 71259fd..073befd 100644 --- a/src/main/java/com/mesa/reportservice/util/HbaseUtil.java +++ b/src/main/java/com/mesa/reportservice/util/HbaseUtil.java @@ -74,11 +74,11 @@ public class HbaseUtil { } - public void setColume_exception(String colume_exception) { + public void setColume_exception(String colume_exception) { HbaseUtil.colume_exception = colume_exception; } - public void setColume_sql(String colume_sql) { + public void setColume_sql(String colume_sql) { HbaseUtil.colume_sql = colume_sql; } } diff --git a/src/main/java/com/mesa/reportservice/util/ScheduleConfig.java b/src/main/java/com/mesa/reportservice/util/ScheduleConfig.java deleted file mode 100644 index 1e17f90..0000000 --- a/src/main/java/com/mesa/reportservice/util/ScheduleConfig.java +++ /dev/null @@ -1,21 +0,0 @@ -package com.mesa.reportservice.util; - -import org.springframework.context.annotation.Configuration; -import org.springframework.scheduling.annotation.SchedulingConfigurer; -import org.springframework.scheduling.config.ScheduledTaskRegistrar; - -import java.util.concurrent.Executors; - -/** - * Created by wk1 on 2019/6/21. - */ - -//@Configuration -//所有的定时任务都放在一个线程池中,定时任务启动时使用不同都线程。 -public class ScheduleConfig { -/* @Override - public void configureTasks(ScheduledTaskRegistrar taskRegistrar) { - //设定一个长度10的定时任务线程池 - taskRegistrar.setScheduler(Executors.newScheduledThreadPool(10)); - }*/ -} diff --git a/src/main/resources/application.properties b/src/main/resources/application.properties deleted file mode 100644 index f43d9cc..0000000 --- a/src/main/resources/application.properties +++ /dev/null @@ -1,57 +0,0 @@ -server.port=9090 -scan.mysql.scheduled.plan=0/30 * * * * ? - -#ÿ��ʮ���ȡһ�β�ѯ״̬��ѭ��350�Σ�3500��֮��ʱ -globle.query_log_sleep=10000 -globle.loop_number=350 -#ͬʱ��ִ�����߳��� -globle.job_thread=3 -#���û�г���7200��ͼ������û�������������Σ�����kill -globle.ck_timeout=7200000 -hbase.url=192.168.10.202:4444 -hbase.table=tsg:report_result -hbase.columefamily=response:result -hbase.colume_job_id=detail:result_id -hbase.colume_query_id=detail:query_id -hbase.colume_query_duration_ms=detail:query_duration_ms -hbase.colume_read_rows=detail:read_rows -hbase.colume_memory_usage=detail:memory_usage -ck.task_ip=192.168.10.182:8123 -ck.task_database=tsg_galaxy -#ck.task_database=k18_galaxy_service -ck.task_user=ck -ck.task_user_password=111111 -ck.log_ip=192.168.10.182:8123 -ck.log_user=default -ck.log_user_password=111111 -#��������� -http.maxTotal=300 -#������ -http.defaultMaxPerRoute=50 -#�������ӵ��ʱ�� -http.connectTimeout=10000 -#�����ӳ��л�ȡ�����ӵ��ʱ�� -http.connectionRequestTimeout=10000 -#���ݴ�����ʱ�� -http.socketTimeout=3600000 -#�ύ����ǰ���������Ƿ���� -http.staleConnectionCheckEnabled=true -db.url=jdbc\:mariadb\://192.168.10.120\:3306/tsg-bifang -#db.url=jdbc\:mariadb\://192.168.11.210\:3306/tsg -#���� -#drivers=ru.yandex.clickhouse.ClickHouseDriver -db.drivers=org.mariadb.jdbc.Driver -#�û��� -db.user=root -#���� -db.password=111111 -#��ʼ����С -db.initialsize=20 -#��С������ -db.minidle=1 -#��������� -db.maxactive=300 -db.filters=stat,wall,log4j,config - - - |
