summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorwangkuan <[email protected]>2020-05-28 15:24:54 +0800
committerwangkuan <[email protected]>2020-05-28 15:24:54 +0800
commit0b2953adcdcc326fc2055c4fc863e4e180c19741 (patch)
tree40254a0dd212e58ba9a8216b6b5b9076dd7e188e
parent05346f8ec73fe2efb89f88382dc3cc72b9ded9c0 (diff)
解决大文件不能存储的问题
-rw-r--r--config/application.properties26
-rw-r--r--docker/Dockerfile2
-rw-r--r--pom.xml25
-rw-r--r--src/main/java/com/mesa/reportservice/bean/HttpResult.java9
-rw-r--r--src/main/java/com/mesa/reportservice/bean/JobEntity.java32
-rw-r--r--src/main/java/com/mesa/reportservice/configuration/ClickhouseConfig.java2
-rw-r--r--src/main/java/com/mesa/reportservice/configuration/HbaseFactory.java106
-rw-r--r--src/main/java/com/mesa/reportservice/configuration/HbaseProperties.java95
-rw-r--r--src/main/java/com/mesa/reportservice/controller/ScheduledResultController.java11
-rw-r--r--src/main/java/com/mesa/reportservice/service/ExcuteProcessService.java2
-rw-r--r--src/main/java/com/mesa/reportservice/service/HbaseService.java11
-rw-r--r--src/main/java/com/mesa/reportservice/service/impl/ClickhouseServiceImpl.java70
-rw-r--r--src/main/java/com/mesa/reportservice/service/impl/ExcuteProcessServiceImpl.java130
-rw-r--r--src/main/java/com/mesa/reportservice/service/impl/ExcuteserviceImpl.java87
-rw-r--r--src/main/java/com/mesa/reportservice/service/impl/HbaseServiceImpl.java134
-rw-r--r--src/main/java/com/mesa/reportservice/util/GZIPUtils.java85
-rw-r--r--src/main/java/com/mesa/reportservice/util/ziputil.java96
-rw-r--r--src/main/resources/config/application.properties45
-rw-r--r--src/test/java/com/mesa/reportservice/ReportserviceApplicationTests.java47
19 files changed, 680 insertions, 335 deletions
diff --git a/config/application.properties b/config/application.properties
index 0e4d13b..b21e05e 100644
--- a/config/application.properties
+++ b/config/application.properties
@@ -4,19 +4,19 @@ scan.result.scheduled.plan=0/10 * * * * ?
#��ʱɨ��mysql���ݿ�report_result��ʱ��15s
scan.task.scheduled.plan=0/15 * * * * ?
#ͬʱ��ִ�����߳���
-globle.job_thread=1
+globle.job_thread=2
#Hbasehttp�Ķ˿�
-hbase.url=192.168.40.224:8084
#Hbase�ı���������ͨ������Ҫ����
hbase.table=tsg:report_result
-hbase.columefamily=response:result
-hbase.colume_job_id=detail:result_id
-hbase.colume_query_id=detail:query_id
-hbase.colume_query_duration_ms=detail:query_duration_ms
-hbase.colume_read_rows=detail:read_rows
-hbase.colume_memory_usage=detail:memory_usage
-hbase.colume_sql=detail:sql
-hbase.colume_exception=detail:exception
+
+hbase.zookeeper_quorum=192.168.40.224
+hbase.zookeeper_property_clientPort=2181
+hbase.zookeeper_znode_parent=/hbase
+hbase.client_retries_number=3
+
+hbase.rpc_timeout=100000
+hbase.connect_pool=10
+
ck.gateway_ip=192.168.40.224:9999
@@ -53,12 +53,12 @@ zookeeper.open=1
zookeeper.retryCount=5
zookeeper.elapsedTimeMs=1000
#zk��Ⱥ��ip
-zookeeper.connectString=192.168.40.193:2181
+zookeeper.connectString=192.168.40.224:2181
zookeeper.sessionTimeoutMs=5000
zookeeper.connectionTimeoutMs=5000
zookeeper.nameSpace=reportservice
#mariadb��url
-spring.datasource.url=jdbc:mariadb://192.168.40.204:3306/reporttest?serverTimezone=GMT&useUnicode=true&characterEncoding=UTF-8&autoReconnect=true&failOverReadOnly=false
+spring.datasource.url=jdbc:mariadb://192.168.40.204:3306/treport?serverTimezone=GMT&useUnicode=true&characterEncoding=UTF-8&autoReconnect=true&failOverReadOnly=false
#spring.datasource.url=jdbc:mysql://192.168.11.208:3306/ntc-api?serverTimezone=GMT&useUnicode=true&characterEncoding=UTF-8&autoReconnect=true&failOverReadOnly=false
#mariadb���û���
spring.datasource.username=root
@@ -71,7 +71,7 @@ spring.datasource.driver-class-name=org.mariadb.jdbc.Driver
#���ü��ͳ�����ص�filters��ȥ�����ؽ���SQL�޷�����ͳ�ƣ���wall�����ڷ���ǽ
spring.datasource.druid.filters=stat,wall,slf4j
-#���������
+#���������S
spring.datasource.druid.max-active=30
#��������
spring.datasource.druid.min-idle=1
diff --git a/docker/Dockerfile b/docker/Dockerfile
index dd44d1f..7502902 100644
--- a/docker/Dockerfile
+++ b/docker/Dockerfile
@@ -7,7 +7,7 @@ COPY config config
ARG JAR_FILE
COPY ${JAR_FILE} galaxy-report-service.jar
#dockercompose set JAVA_OPTS
-ENV JAVA_OPTS=" -Xms1024m -Xmx2048m "
+ENV JAVA_OPTS=" -Xms20480m -Xmx20480m "
ENV LANG=en_US.UTF-8 LANGUAGE=en_US:en LC_ALL=en_US.UTF-8
#ENV TZ=Asia/Almaty
#RUN ln -snf /usr/share/zoneinfo/$TZ /etc/localtime && echo $TZ > /etc/timezone
diff --git a/pom.xml b/pom.xml
index 3550b32..61e5737 100644
--- a/pom.xml
+++ b/pom.xml
@@ -10,7 +10,7 @@
</parent>
<groupId>com.mesa</groupId>
<artifactId>galaxy-report-service</artifactId>
- <version>3.1</version>
+ <version>3.2</version>
<name>galaxy-report-service</name>
<description>Demo project for Spring Boot</description>
@@ -141,6 +141,27 @@
<artifactId>zookeeper</artifactId>
<version>3.4.9</version>
</dependency>
+ <dependency>
+ <groupId>org.apache.hbase</groupId>
+ <artifactId>hbase-client</artifactId>
+ <version>2.2.3</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-client</artifactId>
+ <version>2.7.1</version>
+ <exclusions>
+ <exclusion>
+ <artifactId>slf4j-log4j12</artifactId>
+ <groupId>org.slf4j</groupId>
+ </exclusion>
+ <exclusion>
+ <artifactId>log4j-over-slf4j</artifactId>
+ <groupId>org.slf4j</groupId>
+ </exclusion>
+ </exclusions>
+ </dependency>
+
</dependencies>
@@ -165,7 +186,7 @@
<forceTags>true</forceTags>
<imageTags>
- <imageTag>3.1</imageTag>
+ <imageTag>3.2</imageTag>
</imageTags>
<!--远程docker构建,供dockerfile使用-->
<dockerHost>http://192.168.40.153:2375</dockerHost>
diff --git a/src/main/java/com/mesa/reportservice/bean/HttpResult.java b/src/main/java/com/mesa/reportservice/bean/HttpResult.java
index a19e5c8..85a6042 100644
--- a/src/main/java/com/mesa/reportservice/bean/HttpResult.java
+++ b/src/main/java/com/mesa/reportservice/bean/HttpResult.java
@@ -7,6 +7,7 @@ public class HttpResult {
private int code;
private String body;
+ private byte[] bodybyte;
public HttpResult(int code, String body) {
@@ -18,6 +19,14 @@ public class HttpResult {
}
+ public byte[] getBodybyte() {
+ return bodybyte;
+ }
+
+ public void setBodybyte(byte[] bodybyte) {
+ this.bodybyte = bodybyte;
+ }
+
public int getCode() {
return code;
}
diff --git a/src/main/java/com/mesa/reportservice/bean/JobEntity.java b/src/main/java/com/mesa/reportservice/bean/JobEntity.java
index 8e16147..98dc4b5 100644
--- a/src/main/java/com/mesa/reportservice/bean/JobEntity.java
+++ b/src/main/java/com/mesa/reportservice/bean/JobEntity.java
@@ -39,15 +39,17 @@ public class JobEntity implements Cloneable {
private String query_duration_ms;
- private Integer resultRows;
+ private int resultRows;
private String memory_usage;
- private String result;
+ private byte[] result;
private int excute_status;
+ private String formatSql;
+
public Integer getResultId() {
return resultId;
@@ -185,13 +187,6 @@ public class JobEntity implements Cloneable {
this.query_duration_ms = query_duration_ms;
}
- public Integer getResultRows() {
- return resultRows;
- }
-
- public void setResultRows(Integer resultRows) {
- this.resultRows = resultRows;
- }
public String getMemory_usage() {
return memory_usage;
@@ -201,11 +196,11 @@ public class JobEntity implements Cloneable {
this.memory_usage = memory_usage;
}
- public String getResult() {
+ public byte[] getResult() {
return result;
}
- public void setResult(String result) {
+ public void setResult(byte[] result) {
this.result = result;
}
@@ -218,6 +213,21 @@ public class JobEntity implements Cloneable {
}
+ public int getResultRows() {
+ return resultRows;
+ }
+
+ public void setResultRows(int resultRows) {
+ this.resultRows = resultRows;
+ }
+
+ public String getFormatSql() {
+ return formatSql;
+ }
+
+ public void setFormatSql(String formatSql) {
+ this.formatSql = formatSql;
+ }
@Override
public Object clone() {
diff --git a/src/main/java/com/mesa/reportservice/configuration/ClickhouseConfig.java b/src/main/java/com/mesa/reportservice/configuration/ClickhouseConfig.java
index 250adfe..c6ba31a 100644
--- a/src/main/java/com/mesa/reportservice/configuration/ClickhouseConfig.java
+++ b/src/main/java/com/mesa/reportservice/configuration/ClickhouseConfig.java
@@ -65,7 +65,7 @@ public class ClickhouseConfig {
// String logsql = "select elapsed,total_rows_approx,read_rows from `system`.processes where query_id='" + query_id + "' FORMAT JSONEachRow";
try {
- String sql = URLEncoder.encode("select CAST(type, 'Int8') as type,read_rows,query_duration_ms,query,exception,memory_usage,event_time from `system`.query_log where type>1 and query_id='" + query_id + "' order by event_time desc limit 1 FORMAT JSONEachRow", "utf8");
+ String sql = URLEncoder.encode("select CAST(type, 'Int8') as type,read_rows,query_duration_ms,query,exception,memory_usage,event_time,result_rows,result_bytes from `system`.query_log where type>1 and query_id='" + query_id + "' order by event_time desc limit 1 FORMAT JSONEachRow", "utf8");
finishsql = url + sql;
} catch (UnsupportedEncodingException e) {
Logs.error(e.toString());
diff --git a/src/main/java/com/mesa/reportservice/configuration/HbaseFactory.java b/src/main/java/com/mesa/reportservice/configuration/HbaseFactory.java
new file mode 100644
index 0000000..1fac959
--- /dev/null
+++ b/src/main/java/com/mesa/reportservice/configuration/HbaseFactory.java
@@ -0,0 +1,106 @@
+package com.mesa.reportservice.configuration;
+
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.client.*;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.beans.factory.annotation.Qualifier;
+import org.springframework.boot.context.properties.EnableConfigurationProperties;
+import org.springframework.context.annotation.Bean;
+import org.springframework.context.annotation.Configuration;
+
+import java.io.IOException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+
+/**
+ * Created by Administrator on 2020/3/10.
+ */
+@Configuration
+@EnableConfigurationProperties(HbaseProperties.class)
+public class HbaseFactory {
+ private final HbaseProperties hbproperties;
+
+ @Autowired
+ public HbaseFactory(HbaseProperties hbproperties) {
+ this.hbproperties = hbproperties;
+ }
+
+ @Bean(name = "hbaseConfiguration")
+ public org.apache.hadoop.conf.Configuration getConfiguration() {
+
+ org.apache.hadoop.conf.Configuration conf = null;
+ if (conf == null) {
+ conf = HBaseConfiguration.create();
+ conf.set("hbase.zookeeper.quorum", hbproperties.getZookeeper_quorum());
+ conf.set("hbase.zookeeper.property.clientPort", hbproperties.getZookeeper_property_clientPort());
+ conf.set("hbase.zookeeper.znode.parent", hbproperties.getZookeeper_znode_parent());
+ // conf.set("hase.client.pause", client_pause);
+ conf.set("hbase.client.retries.number", hbproperties.getClient_retries_number());
+ conf.set("hbase.rpc.timeout", hbproperties.getRpc_timeout());
+ // conf.set("hbase.client.operation.timeout", client_operation_timeout);
+ // conf.set("hbase.client.scanner.timeout.period", client_scanner_timeout_period);
+ conf.set("hbase.client.keyvalue.maxsize", "1024000000");
+ //conf.set("hbase.client.write.buffer", "20971520");
+
+
+ }
+
+ return conf;
+ }
+
+ @Bean(name = "hbaseConnection")
+ public Connection getConnection(@Qualifier("hbaseConfiguration") org.apache.hadoop.conf.Configuration Conf) {
+ ExecutorService executor = null;
+ Connection con = null;
+ try {
+ executor = Executors.newFixedThreadPool(hbproperties.getConnect_pool());
+ con = ConnectionFactory.createConnection(Conf,executor);
+
+ } catch (IOException e) {
+ e.printStackTrace();
+ }
+ return con;
+
+
+ }
+ /*@Bean(name = "hbaseTable")
+ public Table getTable(@Qualifier("hbaseConnection") Connection con) {
+
+ Table table=null;
+ try {
+
+ ExecutorService pool = Executors.newFixedThreadPool(10);
+ BufferedMutatorParams bmp = new BufferedMutatorParams();
+ ss.writeBufferSize();
+ ss.setWriteBufferPeriodicFlushTimerTickMs()
+ BufferedMutatorParams aaa = con.getBufferedMutator()
+ aaa.writeBufferSize(1000000);
+
+ BufferedMutator aaa=null;
+ aaa
+ table = con.getTable(TableName.valueOf("tsg:t1"),pool);
+ } catch (IOException e) {
+ e.printStackTrace();
+ }
+ return table;
+
+
+ }*/
+
+ /* @Bean(name = "hbaseExceptionListener")
+ public BufferedMutator.ExceptionListener getLishtener(@Qualifier("hbaseConnection") Connection con) {
+
+ BufferedMutator.ExceptionListener listener = new BufferedMutator.ExceptionListener() {
+
+ public void onException(RetriesExhaustedWithDetailsException e, BufferedMutator mutator) {
+ for (int i = 0; i < e.getNumExceptions(); i++) {
+ System.out.print("Failed to sent put " + e.getRow(i) + ".");
+ }
+ }
+ };
+ return listener;
+
+
+ }*/
+
+}
diff --git a/src/main/java/com/mesa/reportservice/configuration/HbaseProperties.java b/src/main/java/com/mesa/reportservice/configuration/HbaseProperties.java
new file mode 100644
index 0000000..fa4d4a1
--- /dev/null
+++ b/src/main/java/com/mesa/reportservice/configuration/HbaseProperties.java
@@ -0,0 +1,95 @@
+package com.mesa.reportservice.configuration;
+
+import org.springframework.boot.context.properties.ConfigurationProperties;
+
+/**
+ * Created by Administrator on 2020/3/10.
+ */
+
+@ConfigurationProperties(prefix = "hbase")
+public class HbaseProperties {
+
+ private String zookeeper_quorum;
+
+ private String zookeeper_property_clientPort;
+
+ private String zookeeper_znode_parent;
+
+ private String client_pause;
+
+ private String client_retries_number;
+
+ private String rpc_timeout;
+
+ private Integer connect_pool;
+
+ private String table;
+
+
+
+ public String getZookeeper_quorum() {
+ return zookeeper_quorum;
+ }
+
+ public void setZookeeper_quorum(String zookeeper_quorum) {
+ this.zookeeper_quorum = zookeeper_quorum;
+ }
+
+ public String getZookeeper_property_clientPort() {
+ return zookeeper_property_clientPort;
+ }
+
+ public void setZookeeper_property_clientPort(String zookeeper_property_clientPort) {
+ this.zookeeper_property_clientPort = zookeeper_property_clientPort;
+ }
+
+ public String getZookeeper_znode_parent() {
+ return zookeeper_znode_parent;
+ }
+
+ public void setZookeeper_znode_parent(String zookeeper_znode_parent) {
+ this.zookeeper_znode_parent = zookeeper_znode_parent;
+ }
+
+ public String getClient_pause() {
+ return client_pause;
+ }
+
+ public void setClient_pause(String client_pause) {
+ this.client_pause = client_pause;
+ }
+
+ public String getClient_retries_number() {
+ return client_retries_number;
+ }
+
+ public void setClient_retries_number(String client_retries_number) {
+ this.client_retries_number = client_retries_number;
+ }
+
+ public String getRpc_timeout() {
+ return rpc_timeout;
+ }
+
+ public void setRpc_timeout(String rpc_timeout) {
+ this.rpc_timeout = rpc_timeout;
+ }
+
+
+ public Integer getConnect_pool() {
+ return connect_pool;
+ }
+
+ public void setConnect_pool(Integer connect_pool) {
+ this.connect_pool = connect_pool;
+ }
+
+
+ public String getTable() {
+ return table;
+ }
+
+ public void setTable(String table) {
+ this.table = table;
+ }
+}
diff --git a/src/main/java/com/mesa/reportservice/controller/ScheduledResultController.java b/src/main/java/com/mesa/reportservice/controller/ScheduledResultController.java
index 8f3c692..2cdf3ce 100644
--- a/src/main/java/com/mesa/reportservice/controller/ScheduledResultController.java
+++ b/src/main/java/com/mesa/reportservice/controller/ScheduledResultController.java
@@ -1,6 +1,7 @@
package com.mesa.reportservice.controller;
import com.mesa.reportservice.bean.JobEntity;
+import com.mesa.reportservice.configuration.GlobelConfig;
import com.mesa.reportservice.service.ExcuteProcessService;
import com.mesa.reportservice.service.MysqlService;
import com.mesa.reportservice.service.ZkService;
@@ -10,6 +11,9 @@ import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;
import java.util.List;
+import java.util.Map;
+
+import static com.mesa.reportservice.configuration.GlobelConfig.*;
/**
* @author wk1
@@ -21,6 +25,7 @@ public class ScheduledResultController {
@Autowired
private MysqlService ms;
+
@Autowired
private ExcuteProcessService eps;
@Autowired
@@ -29,9 +34,11 @@ public class ScheduledResultController {
@Scheduled(cron = "${scan.result.scheduled.plan}")
public void getExcuteResult() {
if (zs.isMaster()) {
- List<JobEntity> joblist = ms.getJobForExcute();
- eps.getTaskProcess(joblist);
+ List<JobEntity> joblist = ms.getJobForExcute();
+ for(JobEntity jobEntity:joblist) {
+ eps.getTaskProcess(jobEntity);
+ }
}
}
diff --git a/src/main/java/com/mesa/reportservice/service/ExcuteProcessService.java b/src/main/java/com/mesa/reportservice/service/ExcuteProcessService.java
index bb0f3b5..54f64f6 100644
--- a/src/main/java/com/mesa/reportservice/service/ExcuteProcessService.java
+++ b/src/main/java/com/mesa/reportservice/service/ExcuteProcessService.java
@@ -9,5 +9,5 @@ import java.util.List;
*/
public interface ExcuteProcessService {
- void getTaskProcess(List<JobEntity> joblist);
+ void getTaskProcess(JobEntity job);
}
diff --git a/src/main/java/com/mesa/reportservice/service/HbaseService.java b/src/main/java/com/mesa/reportservice/service/HbaseService.java
index e144256..46db5ef 100644
--- a/src/main/java/com/mesa/reportservice/service/HbaseService.java
+++ b/src/main/java/com/mesa/reportservice/service/HbaseService.java
@@ -1,6 +1,8 @@
package com.mesa.reportservice.service;
import com.mesa.reportservice.bean.HttpResult;
+import com.mesa.reportservice.bean.JobEntity;
+import org.apache.hadoop.yarn.webapp.hamlet.HamletSpec;
import java.util.Map;
@@ -9,14 +11,7 @@ import java.util.Map;
*/
public interface HbaseService {
- String doGet(String url) throws Exception;
-
- String doGet(String url, Map<String, Object> map) throws Exception;
-
-
- HttpResult doPost(String url, String json) throws Exception;
-
- HttpResult doPost(String url) throws Exception;
+ Boolean put(JobEntity jobEntity) throws Exception;
}
diff --git a/src/main/java/com/mesa/reportservice/service/impl/ClickhouseServiceImpl.java b/src/main/java/com/mesa/reportservice/service/impl/ClickhouseServiceImpl.java
index 667e459..64afe53 100644
--- a/src/main/java/com/mesa/reportservice/service/impl/ClickhouseServiceImpl.java
+++ b/src/main/java/com/mesa/reportservice/service/impl/ClickhouseServiceImpl.java
@@ -15,6 +15,9 @@ import org.springframework.stereotype.Service;
import java.util.Map;
+import static com.mesa.reportservice.util.GZIPUtils.compress;
+import static com.mesa.reportservice.util.GZIPUtils.uncompress;
+
/**
* Created by wk1 on 2019/5/15.
*/
@@ -89,79 +92,34 @@ public class ClickhouseServiceImpl implements ClickhouseService {
* @throws Exception
*/
@Override
- public HttpResult sendQueryForPost(String url) throws Exception {
+ public HttpResult sendQueryForPost(String url) throws Exception,OutOfMemoryError{
// 声明httpPost请求
HttpPost httpPost = new HttpPost(url);
+
// 加入配置信息
httpPost.setConfig(config);
- // httpPost.setHeader("Accept","application/json");
- // 判断map是否为空,不为空则进行遍历,封装from表单对象
-
- /* if (!json.equals("") && !json.isEmpty()) {
-
-
- StringEntity s = new StringEntity(json.toString());
- s.setContentEncoding("UTF-8");
- s.setContentType("application/json");//发送json数据需要设置contentType
- httpPost.setEntity(s);
- }*/
-
-
- /*if (map != null) {
- List<NameValuePair> list = new ArrayList<NameValuePair>();
- for (Map.Entry<String, Object> entry : map.entrySet()) {
- list.add(new BasicNameValuePair(entry.getKey(), entry.getValue().toString()));
- }
- // 构造from表单对象
- UrlEncodedFormEntity urlEncodedFormEntity = new UrlEncodedFormEntity(list, "UTF-8");
- // 把表单放到post里
- httpPost.setEntity(urlEncodedFormEntity);
- }*/
-
- // 发起请求
CloseableHttpResponse response = this.httpClient.execute(httpPost);
- HttpResult rs = new HttpResult(response.getStatusLine().getStatusCode(), EntityUtils.toString(
- response.getEntity(), "UTF-8"));
-
+ HttpResult rs = new HttpResult();
+ rs.setCode(response.getStatusLine().getStatusCode());
+ rs.setBodybyte(EntityUtils.toByteArray(response.getEntity()));
- //Thread.sleep(120);
+ /*byte[] bodybyte =snappyCompress(EntityUtils.toByteArray(response.getEntity()));
+ rs.setCode(response.getStatusLine().getStatusCode());
+ rs.setBodybyte(bodybyte);*/
return rs;
}
+
+
+
@Override
public HttpResult QuerySystemForPost(String url) throws Exception {
// 声明httpPost请求
HttpPost httpPost = new HttpPost(url);
// 加入配置信息
httpPost.setConfig(config);
- // httpPost.setHeader("Accept","application/json");
- // 判断map是否为空,不为空则进行遍历,封装from表单对象
-// httpPost.getParams().setParameter(CoreConnectionPNames.SO_TIMEOUT, 1000);
-
-
- /* if (!json.equals("") && !json.isEmpty()) {
-
-
- StringEntity s = new StringEntity(json.toString());
- s.setContentEncoding("UTF-8");
- s.setContentType("application/json");//发送json数据需要设置contentType
- httpPost.setEntity(s);
- }*/
-
-
- /*if (map != null) {
- List<NameValuePair> list = new ArrayList<NameValuePair>();
- for (Map.Entry<String, Object> entry : map.entrySet()) {
- list.add(new BasicNameValuePair(entry.getKey(), entry.getValue().toString()));
- }
- // 构造from表单对象
- UrlEncodedFormEntity urlEncodedFormEntity = new UrlEncodedFormEntity(list, "UTF-8");
-
- // 把表单放到post里
- httpPost.setEntity(urlEncodedFormEntity);
- }*/
// 发起请求
CloseableHttpResponse response = this.httpClient.execute(httpPost);
diff --git a/src/main/java/com/mesa/reportservice/service/impl/ExcuteProcessServiceImpl.java b/src/main/java/com/mesa/reportservice/service/impl/ExcuteProcessServiceImpl.java
index 61eb861..359d02a 100644
--- a/src/main/java/com/mesa/reportservice/service/impl/ExcuteProcessServiceImpl.java
+++ b/src/main/java/com/mesa/reportservice/service/impl/ExcuteProcessServiceImpl.java
@@ -5,17 +5,16 @@ import com.mesa.reportservice.bean.HttpResult;
import com.mesa.reportservice.bean.JobEntity;
import com.mesa.reportservice.configuration.ClickhouseConfig;
import com.mesa.reportservice.configuration.GlobelConfig;
-import com.mesa.reportservice.configuration.HbaseConfig;
import com.mesa.reportservice.service.ClickhouseService;
import com.mesa.reportservice.service.ExcuteProcessService;
import com.mesa.reportservice.service.HbaseService;
import com.mesa.reportservice.service.MysqlService;
-import com.mesa.reportservice.util.*;
+import com.mesa.reportservice.util.Logs;
+import com.mesa.reportservice.util.StringUtil;
import org.apache.commons.codec.digest.DigestUtils;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
-import java.util.List;
import java.util.Map;
/**
@@ -34,44 +33,46 @@ public class ExcuteProcessServiceImpl implements ExcuteProcessService {
private HbaseService hs;
@Override
- public void getTaskProcess(List<JobEntity> joblist) {
+ public void getTaskProcess(JobEntity jobEntity) {
try {
- for (JobEntity jk : joblist) {
- String sql = jk.getQuerySql().trim();
- String queryid = DigestUtils.md5Hex(jk.getResultId() + sql);
- //程序异常停止重启后kill正在进行的查询,更新数据库状态重新执行
- if (!GlobelConfig.mapresult.containsKey(queryid)) {
- String killurl = ClickhouseConfig.getKillUrl(queryid);
- try {
- Logs.info("startkill++++++"+queryid);
+ String sql = jobEntity.getQuerySql().trim();
+ String queryid = DigestUtils.md5Hex(jobEntity.getResultId() + sql);
+ jobEntity.setQuery_id(queryid);
+ //程序异常停止重启后kill正在进行的查询,更新数据库状态重新执行
+ if (!GlobelConfig.mapresult.containsKey(queryid)) {
+ String killurl = ClickhouseConfig.getKillUrl(queryid);
+ try {
+ Logs.info("startkill++++++" + queryid);
- cs.QuerySystemForPost(killurl);
- Logs.info("endkill======="+queryid);
- } catch (Exception e) {
- Logs.error(e.toString());
- }
- updateErrorMessage(jk.getResultId(), 0, "Re Execution");
+ cs.QuerySystemForPost(killurl);
+ Logs.info("endkill=======" + queryid);
+
+ } catch (Exception e) {
+ Logs.error(e.toString());
}
- else {
- JobEntity je = (JobEntity) GlobelConfig.mapresult.get(queryid).clone();
- Logs.info("startget++++++" + queryid);
-
- switch (je.getStatus()) {
- case 1:
- je = getProcessMessage(je);
-
- if (je.getExcuteRow() != 0 || je.getExcuteTime() != 0) {
- ms.updateProcesses(je);
- }
- break;
- case 2:
- try {
- je = getQueryLogMessage(je);
- HttpResult hr = saveToHbase(je);
+ updateErrorMessage(jobEntity.getResultId(), 0, "Re Execution");
+
+ } else {
- if (hr.getCode() == 200) {
+ JobEntity je =GlobelConfig.mapresult.get(queryid);
+ Logs.info("startget++++++" + je.getQuery_id());
+ if (je.getStatus() == 1) {
+ je = getProcessMessage(je);
+ if (je.getExcuteRow() != 0 || je.getExcuteTime() != 0) {
+ ms.updateProcesses(je);
+ }
+ } else {
+
+ try {
+ switch (je.getExcute_status()) {
+
+ case 2:
+
+ je = getQueryLogMessage(je);
+ Boolean isok = saveToHbase(je);
+ if (isok) {
je.setExcuteDetail("SUCCESS");
je.setExcuteTime(0);
je.setExcuteProcess(100);
@@ -86,28 +87,34 @@ public class ExcuteProcessServiceImpl implements ExcuteProcessService {
Logs.info("success save to hbase queryid=" + je.getQuery_id() + " sql=" + je.getQuerySql());
} else {
updateErrorMessage(je.getResultId(), 5, "Write Data Error");
- Logs.error("save hbase error" + hr.getBody());
+ Logs.error("save hbase error");
}
- } catch (Exception e) {
- updateErrorMessage(je.getResultId(), 6, "Result Write To Database Error");
- Logs.error("save hbase error queryid=" + je.getResultId() + e.toString());
- } finally {
- GlobelConfig.mapresult.remove(queryid);
- }
- break;
- default:
- try {
+ break;
+
+ case 6:
+ updateErrorMessage(je.getResultId(), 6, "unknow error");
+ Logs.error("unknow error resultid ="+je.getResultId()+" excutesql=" + je.getQuerySql() +" formatsql="+je.getFormatSql());
+ break;
+ default:
je = getQueryLogMessage(je);
updateErrorMessage(je.getResultId(), je.getStatus(), je.getExcuteDetail());
- } catch (Exception e) {
- Logs.error(e.toString());
- } finally {
- GlobelConfig.mapresult.remove(queryid);
- }
+ Logs.error("excute error resultid ="+je.getResultId()+" excutesql=" + je.getQuerySql() +" formatsql="+je.getFormatSql());
+
+
+ }
+ } catch (Exception e) {
+ // updateErrorMessage(je.getResultId(), 6, "Result Write To Database Error");
+ Logs.error("save database error queryid=" + je.getResultId() + e.toString());
+
+ } finally {
+ GlobelConfig.mapresult.remove(je.getQuery_id());
}
+
}
+
+
}
} catch (Exception e) {
Logs.error(e.toString());
@@ -141,7 +148,7 @@ public class ExcuteProcessServiceImpl implements ExcuteProcessService {
String queryProccess = ClickhouseConfig.getProcessUrl(job.getQuery_id());
HttpResult hr = null;
try {
- hr = cs.sendQueryForPost(queryProccess);
+ hr = cs.QuerySystemForPost(queryProccess);
String rs = hr.getBody().trim();
if (!rs.isEmpty() && !rs.equals("")) {
//JobEntity job = new JobEntity();
@@ -171,17 +178,14 @@ public class ExcuteProcessServiceImpl implements ExcuteProcessService {
/**
* 结果存入hbase
*/
- public HttpResult saveToHbase(JobEntity entity) {
+ public Boolean saveToHbase(JobEntity entity) {
int k = 3;
- HttpResult hr = new HttpResult();
-
+ Boolean isok = false;
do {
try {
- String hbasejson = HbaseConfig.getHbaseJson(entity.getQuery_id(), entity.getResult(), entity);
- String hbaseurl = HbaseConfig.getHbasePostUrl();
k--;
- hr = hs.doPost(hbaseurl, hbasejson);
+ isok = hs.put(entity);
break;
} catch (Exception e) {
Logs.error("写入hbase报错重试次数" + (3 - k) + e.toString());
@@ -189,7 +193,7 @@ public class ExcuteProcessServiceImpl implements ExcuteProcessService {
}
}
while (k >= 0);
- return hr;
+ return isok;
}
/**
@@ -205,7 +209,7 @@ public class ExcuteProcessServiceImpl implements ExcuteProcessService {
try {
//ck默认最慢同步时间7500ms,循环三次
Thread.sleep(3000);
- num = cs.sendQueryForPost(finishurl);
+ num = cs.QuerySystemForPost(finishurl);
if (num.getCode() == 200 && !num.getBody().isEmpty() && !num.getBody().equals("")) {
Map mapresult = JSON.parseObject(num.getBody());
@@ -218,10 +222,12 @@ public class ExcuteProcessServiceImpl implements ExcuteProcessService {
job.setQuery_duration_ms(mapresult.get("query_duration_ms").toString());
job.setMemory_usage(mapresult.get("memory_usage").toString());
job.setExcuteRow(Long.parseLong(mapresult.get("read_rows").toString()));
+ job.setResultRows(Integer.valueOf(mapresult.get("result_rows").toString()));
break;
case 3:
job.setStatus(type);
job.setExcuteDetail("SQL Syntax Error:" + mapresult.get("exception").toString());
+
// job.setQuerySql(mapresult.get("query").toString());
break;
case 4:
@@ -234,14 +240,12 @@ public class ExcuteProcessServiceImpl implements ExcuteProcessService {
}
} else {
job.setStatus(6);
- if(job.getExcuteDetail().equals("")){
- job.setExcuteDetail("Other ERROR ");
- }
+ job.setExcuteDetail("Other ERROR ");
}
} catch (Exception e) {
job.setStatus(6);
- job.setExcuteDetail("Other ERROR: "+e.toString());
+ job.setExcuteDetail("Other ERROR: " + e.toString());
Logs.error("获取querylogmessage失败");
}
j--;
diff --git a/src/main/java/com/mesa/reportservice/service/impl/ExcuteserviceImpl.java b/src/main/java/com/mesa/reportservice/service/impl/ExcuteserviceImpl.java
index f637f1f..7124c91 100644
--- a/src/main/java/com/mesa/reportservice/service/impl/ExcuteserviceImpl.java
+++ b/src/main/java/com/mesa/reportservice/service/impl/ExcuteserviceImpl.java
@@ -44,9 +44,9 @@ public class ExcuteserviceImpl implements ExcuteService {
String sql = job.getQuerySql().trim();
String queryid = DigestUtils.md5Hex(job.getResultId() + sql);
sql = sql.replace("$exe_time", "toDateTime('" + job.getIssuedTime().toString().trim() + "')");
- //sql = sql.replace("$start_time", "toDateTime('" + job.getStartTime().toString().trim() + "')");
- //sql = sql.replace("$end_time", "toDateTime('" + job.getEndTime().toString().trim() + "')");
-
+ // sql = sql.replace("$start_time", "toDateTime('" + job.getStartTime().toString().trim() + "')");
+ // sql = sql.replace("$end_time", "toDateTime('" + job.getEndTime().toString().trim() + "')");
+ job.setFormatSql(sql);
try {
String formateUrl = getFormatUrl(sql);
sql = fs.doFormat(formateUrl);
@@ -57,96 +57,79 @@ public class ExcuteserviceImpl implements ExcuteService {
if (sql.equals("ok") || sql.equals("")) {
job.setStatus(3);
+ job.setExcuteTime(0);
job.setExcuteDetail("SQL Syntax Error parsing SQL Error");
ms.updateStatue(job);
+
} else {
job.setQuery_id(queryid);
job.setQuerySql(sql);
- job.setQuery_id(queryid);
- job.setQuerySql(sql);
job.setStatus(1);
- job.setResult("");
+ job.setExcute_status(1);
job.setExcuteDetail("EXECUTING");
- JobEntity jt = (JobEntity) job.clone();
- GlobelConfig.mapresult.put(queryid, jt);
+ //JobEntity jt = (JobEntity) job.clone();
+ GlobelConfig.mapresult.put(queryid, job);
ms.updateStatue(job);
Logs.info("execute queryid=" + queryid + " sql=" + sql + "mapresult size=" + GlobelConfig.mapresult.size());
String joburl = getJobUrl(job.getQuerySql(), job.getQuery_id());
String killurl = ClickhouseConfig.getKillUrl(job.getQuery_id());
HttpResult hr = new HttpResult();
- Map<String, Object> resultmap = new HashMap<String, Object>();
int k = 3;
do {
try {
hr = cs.sendQueryForPost(joburl);
Logs.info("httpcode" + hr.getCode());
- if (hr.getCode() != 200 && hr.getBody().toLowerCase().contains("timeout")) {
- k--;
- if(k==0) {
- job.setStatus(4);
- Logs.error("Socket timeout Error retry time 3");
- }
- else{
- Logs.info("Socket timeout warn retry time " + (3 - k));
- }
+ if (hr.getCode() != 200 ) {
+
+ k=0;
+ job.setExcute_status(4);
+ Logs.error("excute sql Error ");
+
} else {
- resultmap = JSON.parseObject(hr.getBody());
+
k=0;
+ job.setResult(hr.getBodybyte());
+ job.setExcute_status(2);
+ Logs.info("success queryid=" + queryid + " sql=" + sql);
+
}
} catch (SocketTimeoutException e) {
k--;
job.setExcuteDetail(e.toString());
if(k==0) {
- job.setStatus(4);
- Logs.error("Socket Error " + e.toString() + "retry time 3");
+ job.setExcute_status(4);
+ job.setExcuteDetail("SQL Execution Error excute query time out");
+ Logs.info("timeout queryid=" + queryid + " sql=" + sql);
}
else{
Logs.info("Socket warn " + e.toString() + "retry time " + (3 - k));
}
+
+
} catch (Exception e) {
- // job.setStatus(6);
+ job.setExcute_status(6);
job.setExcuteDetail(e.toString());
Logs.error("Unknow Error" + e.toString());
k=0;
}
+ catch (OutOfMemoryError e){
+
+ job.setExcute_status(6);
+ job.setExcuteDetail("result too large");
+ Logs.error("outofmemery Error" + e.toString());
+ k=0;
+ }
try {
- cs.sendQueryForPost(killurl);
+ cs.QuerySystemForPost(killurl);
} catch (Exception e) {
Logs.error("Kill Query Error" + e.toString());
}
}
-
-
while (k > 0);
-
- job.setResult(hr.getBody());
- if (hr.getCode() == 200 && resultmap.containsKey("rows")) {
-
- int resultrows= Integer.parseInt(resultmap.get("rows").toString());
- job.setResultRows(resultrows);
- job.setResult(hr.getBody());
- job.setExcuteDetail("Query SUCCESS");
- job.setExcuteTime(0);
- job.setExcuteProcess(100);
- job.setStatus(2);
- Logs.info("success queryid=" + queryid + " sql=" + sql);
- }
- else if(job.getStatus()==4){
- job.setExcuteDetail("SQL Execution Error excute query time out");
- Logs.info("timeout queryid=" + queryid + " sql=" + sql);
- }
-
- else {
- job.setResult("");
- job.setStatus(6);
- //job.setExcuteDetail(hr.getBody());
- //Logs.error("Unknow Error Query " + hr.getBody());
- Logs.info("error queryid=" + queryid + " sql=" + sql + "error"+job.getExcuteDetail());
-
- }
- GlobelConfig.mapresult.put(job.getQuery_id(), job);
+ job.setStatus(2);
+ // GlobelConfig.mapresult.put(job.getQuery_id(), job);
}
diff --git a/src/main/java/com/mesa/reportservice/service/impl/HbaseServiceImpl.java b/src/main/java/com/mesa/reportservice/service/impl/HbaseServiceImpl.java
index fe7a0a7..11efadb 100644
--- a/src/main/java/com/mesa/reportservice/service/impl/HbaseServiceImpl.java
+++ b/src/main/java/com/mesa/reportservice/service/impl/HbaseServiceImpl.java
@@ -1,7 +1,14 @@
package com.mesa.reportservice.service.impl;
import com.mesa.reportservice.bean.HttpResult;
+import com.mesa.reportservice.bean.JobEntity;
+import com.mesa.reportservice.configuration.HbaseProperties;
import com.mesa.reportservice.service.HbaseService;
+import com.mesa.reportservice.util.Logs;
+import com.mesa.reportservice.util.ziputil;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.*;
+import org.apache.hadoop.hbase.util.Bytes;
import org.apache.http.client.config.RequestConfig;
import org.apache.http.client.methods.CloseableHttpResponse;
import org.apache.http.client.methods.HttpGet;
@@ -13,6 +20,7 @@ import org.apache.http.util.EntityUtils;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
+import java.io.IOException;
import java.util.Map;
/**
@@ -23,119 +31,39 @@ import java.util.Map;
public class HbaseServiceImpl implements HbaseService {
@Autowired
- private CloseableHttpClient httpClient;
+ private Connection hbaseConnection;
@Autowired
- private RequestConfig config;
+ private HbaseProperties hbproperties;
-
- /**
- * 不带参数的get请求,如果状态码为200,则返回body,如果不为200,则返回null
- *
- * @param url
- * @return
- * @throws Exception
- */
@Override
- public String doGet(String url) throws Exception {
- // 声明 http get 请求
- HttpGet httpGet = new HttpGet(url);
- httpGet.setHeader("Accept", "application/json");
- // 装载配置信息
- httpGet.setConfig(config);
-
- // 发起请求
- CloseableHttpResponse response = this.httpClient.execute(httpGet);
-
- // 判断状态码是否为200
- if (response.getStatusLine().getStatusCode() == 200) {
- // 返回响应体的内容
-
- return EntityUtils.toString(response.getEntity(), "UTF-8");
- }
+ public Boolean put(JobEntity jobEntity) throws Exception {
- return null;
- }
+ Boolean status = true;
+ try(Table table =hbaseConnection.getTable(TableName.valueOf(hbproperties.getTable()))) {
+ Put put = new Put(Bytes.toBytes(jobEntity.getQuery_id()));
+ put.addColumn(Bytes.toBytes("response"), Bytes.toBytes("result"), jobEntity.getResult());
+ put.addColumn(Bytes.toBytes("detail"), Bytes.toBytes("result_id"), Bytes.toBytes(jobEntity.getResultId()));
- /**
- * 带参数的get请求,如果状态码为200,则返回body,如果不为200,则返回null
- *
- * @param url
- * @return
- * @throws Exception
- */
- @Override
- public String doGet(String url, Map<String, Object> map) throws Exception {
- URIBuilder uriBuilder = new URIBuilder(url);
+ put.addColumn(Bytes.toBytes("detail"), Bytes.toBytes("excute_sql"), Bytes.toBytes(jobEntity.getQuerySql()));
+ put.addColumn(Bytes.toBytes("detail"), Bytes.toBytes("format_sql"), Bytes.toBytes(jobEntity.getFormatSql()));
- if (map != null) {
- // 遍历map,拼接请求参数
- for (Map.Entry<String, Object> entry : map.entrySet()) {
- uriBuilder.setParameter(entry.getKey(), entry.getValue().toString());
+ if(jobEntity.getExcuteRow()!=null){
+ put.addColumn(Bytes.toBytes("detail"), Bytes.toBytes("read_rows"), Bytes.toBytes(jobEntity.getExcuteRow()));
}
- }
-
- // 调用不带参数的get请求
- return this.doGet(uriBuilder.build().toString());
-
- }
-
- /**
- * 带参数的post请求
- *
- * @param url
- * @param json
- * @return
- * @throws Exception
- */
- @Override
- public HttpResult doPost(String url, String json) throws Exception {
- // 声明httpPost请求
- HttpPost httpPost = new HttpPost(url);
- // 加入配置信息
- httpPost.setConfig(config);
- // httpPost.setHeader("Accept","application/json");
- // 判断map是否为空,不为空则进行遍历,封装from表单对象
- if (!json.equals("") && !json.isEmpty()) {
-
-
- StringEntity s = new StringEntity(json.toString());
- s.setContentEncoding("UTF-8");
- s.setContentType("application/json");//发送json数据需要设置contentType
- httpPost.setEntity(s);
- }
- /*if (map != null) {
- List<NameValuePair> list = new ArrayList<NameValuePair>();
- for (Map.Entry<String, Object> entry : map.entrySet()) {
- list.add(new BasicNameValuePair(entry.getKey(), entry.getValue().toString()));
+ if(jobEntity.getQuery_duration_ms()!=null){
+ put.addColumn(Bytes.toBytes("detail"), Bytes.toBytes("query_duration_ms"), Bytes.toBytes(jobEntity.getQuery_duration_ms()));
}
- // 构造from表单对象
- UrlEncodedFormEntity urlEncodedFormEntity = new UrlEncodedFormEntity(list, "UTF-8");
-
- // 把表单放到post里
- httpPost.setEntity(urlEncodedFormEntity);
- }*/
-
- // 发起请求
- CloseableHttpResponse response = this.httpClient.execute(httpPost);
-
- HttpResult rs = new HttpResult(response.getStatusLine().getStatusCode(), EntityUtils.toString(
- response.getEntity(), "UTF-8"));
-
-
- //Thread.sleep(120);
- return rs;
+ if(jobEntity.getMemory_usage()!=null){
+ put.addColumn(Bytes.toBytes("detail"), Bytes.toBytes("memory_usage"), Bytes.toBytes(jobEntity.getMemory_usage()));
+ }
+ table.put(put);
+ }catch(Exception e) {
+ status =false;
+ Logs.error(e.toString());
+ }
+ return status;
}
- /**
- * 不带参数post请求
- *
- * @param url
- * @return
- * @throws Exception
- */
- public HttpResult doPost(String url) throws Exception {
- return this.doPost(url, "");
- }
}
diff --git a/src/main/java/com/mesa/reportservice/util/GZIPUtils.java b/src/main/java/com/mesa/reportservice/util/GZIPUtils.java
new file mode 100644
index 0000000..5185351
--- /dev/null
+++ b/src/main/java/com/mesa/reportservice/util/GZIPUtils.java
@@ -0,0 +1,85 @@
+package com.mesa.reportservice.util;
+
+/**
+ * Created by wk1 on 2020/5/14.
+ */
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.util.zip.GZIPInputStream;
+import java.util.zip.GZIPOutputStream;
+
+public class GZIPUtils {
+ public static final String GZIP_ENCODE_UTF_8 = "UTF-8";
+ public static final String GZIP_ENCODE_ISO_8859_1 = "ISO-8859-1";
+
+
+ public static byte[] compress(byte[] str) {
+ if (str == null || str.length == 0) {
+ return null;
+ }
+ ByteArrayOutputStream out = new ByteArrayOutputStream();
+ GZIPOutputStream gzip;
+ try {
+ gzip = new GZIPOutputStream(out);
+ gzip.write(str);
+ gzip.close();
+ } catch ( Exception e) {
+ e.printStackTrace();
+ }
+ return out.toByteArray();
+ }
+
+
+
+ public static byte[] uncompress(byte[] bytes) {
+ if (bytes == null || bytes.length == 0) {
+ return null;
+ }
+ ByteArrayOutputStream out = new ByteArrayOutputStream();
+ ByteArrayInputStream in = new ByteArrayInputStream(bytes);
+ try {
+ GZIPInputStream ungzip = new GZIPInputStream(in);
+ byte[] buffer = new byte[256];
+ int n;
+ while ((n = ungzip.read(buffer)) >= 0) {
+ out.write(buffer, 0, n);
+ }
+ } catch (Exception e) {
+ e.printStackTrace();
+ }
+ return out.toByteArray();
+ }
+
+ public static String uncompressToString(byte[] bytes, String encoding) {
+ if (bytes == null || bytes.length == 0) {
+ return null;
+ }
+ ByteArrayOutputStream out = new ByteArrayOutputStream();
+ ByteArrayInputStream in = new ByteArrayInputStream(bytes);
+ try {
+ GZIPInputStream ungzip = new GZIPInputStream(in);
+ byte[] buffer = new byte[256];
+ int n;
+ while ((n = ungzip.read(buffer)) >= 0) {
+ out.write(buffer, 0, n);
+ }
+ return out.toString(encoding);
+ } catch (Exception e) {
+ e.printStackTrace();
+ }
+ return null;
+ }
+
+ public static String uncompressToString(byte[] bytes) {
+ return uncompressToString(bytes, GZIP_ENCODE_UTF_8);
+ }
+
+ public static void main(String[] args) throws IOException {
+ String s = "aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa";
+ System.out.println("字符串长度:"+s.length());
+ //System.out.println("压缩后::"+compress(s).length);
+ // System.out.println("解压后:"+uncompress(compress(s)).length);
+ // System.out.println("解压字符串后::"+uncompressToString(compress(s)).length());
+ }
+}
diff --git a/src/main/java/com/mesa/reportservice/util/ziputil.java b/src/main/java/com/mesa/reportservice/util/ziputil.java
new file mode 100644
index 0000000..f530f45
--- /dev/null
+++ b/src/main/java/com/mesa/reportservice/util/ziputil.java
@@ -0,0 +1,96 @@
+package com.mesa.reportservice.util;
+
+
+import net.jpountz.lz4.*;
+import org.apache.commons.compress.compressors.gzip.GzipCompressorInputStream;
+import org.apache.commons.compress.compressors.gzip.GzipCompressorOutputStream;
+import org.apache.commons.compress.compressors.xz.XZCompressorInputStream;
+import org.apache.commons.compress.compressors.xz.XZCompressorOutputStream;
+import org.apache.commons.io.output.ByteArrayOutputStream;
+import org.xerial.snappy.Snappy;
+
+import java.io.ByteArrayInputStream;
+import java.io.IOException;
+
+/**
+ * Created by wk1 on 2020/5/15.
+ */
+public class ziputil {
+
+
+ public static byte[] commonGzipCompress(byte[] data) throws IOException {
+ ByteArrayOutputStream baos = new ByteArrayOutputStream(512);
+ GzipCompressorOutputStream gcos = new GzipCompressorOutputStream(baos);
+ gcos.write(data);
+ gcos.close();
+ return baos.toByteArray();
+ }
+
+ public static byte[] commonGzipDecompress(byte[] data) throws IOException {
+ ByteArrayOutputStream baos = new ByteArrayOutputStream(512);
+ GzipCompressorInputStream gcis = new GzipCompressorInputStream(new ByteArrayInputStream(data));
+ int count;
+ byte[] buffer = new byte[8192];
+ while ((count = gcis.read(buffer)) != -1) {
+ baos.write(buffer, 0, count);
+ }
+ gcis.close();
+ return baos.toByteArray();
+ }
+
+
+
+ public static byte[] snappyCompress(byte[] data) throws IOException {
+ return Snappy.compress(data);
+ }
+
+ public static byte[] snappyDecompress(byte[] data) throws IOException {
+ return Snappy.uncompress(data);
+ }
+
+
+ public static byte[] commonXzCompress(byte[] data) throws IOException {
+ ByteArrayOutputStream baos = new ByteArrayOutputStream(8192);
+ XZCompressorOutputStream pcos = new XZCompressorOutputStream(baos);
+ pcos.write(data);
+ pcos.close();
+ return baos.toByteArray();
+ }
+
+ public static byte[] commonXzDecompress(byte[] data) throws IOException {
+ ByteArrayOutputStream baos = new ByteArrayOutputStream(8192);
+ XZCompressorInputStream xzis = new XZCompressorInputStream(new ByteArrayInputStream(data));
+ int count;
+ byte[] buffer = new byte[8192];
+ while ((count = xzis.read(buffer)) != -1) {
+ baos.write(buffer, 0, count);
+ }
+ xzis.close();
+ return baos.toByteArray();
+ }
+
+
+ public static byte[] lz4Compress(byte[] data) throws IOException {
+ LZ4Factory factory = LZ4Factory.fastestInstance();
+ ByteArrayOutputStream byteOutput = new ByteArrayOutputStream();
+ LZ4Compressor compressor = factory.fastCompressor();
+ LZ4BlockOutputStream compressedOutput = new LZ4BlockOutputStream(byteOutput, 8192, compressor);
+ compressedOutput.write(data);
+ compressedOutput.close();
+ return byteOutput.toByteArray();
+ }
+
+ public static byte[] lz4Decompress(byte[] data) throws IOException {
+ LZ4Factory factory = LZ4Factory.fastestInstance();
+ ByteArrayOutputStream baos = new ByteArrayOutputStream(8192);
+ LZ4FastDecompressor decompresser = factory.fastDecompressor();
+ LZ4BlockInputStream lzis = new LZ4BlockInputStream(new ByteArrayInputStream(data), decompresser);
+ int count;
+ byte[] buffer = new byte[8192];
+ while ((count = lzis.read(buffer)) != -1) {
+ baos.write(buffer, 0, count);
+ }
+ lzis.close();
+ return baos.toByteArray();
+ }
+}
diff --git a/src/main/resources/config/application.properties b/src/main/resources/config/application.properties
index 2ffca42..954d3ef 100644
--- a/src/main/resources/config/application.properties
+++ b/src/main/resources/config/application.properties
@@ -6,43 +6,44 @@ scan.task.scheduled.plan=0/15 * * * * ?
#ͬʱ��ִ�����߳���
globle.job_thread=2
#Hbasehttp�Ķ˿�
-hbase.url=192.168.40.224:8084
#Hbase�ı���������ͨ������Ҫ����
hbase.table=tsg:report_result
-hbase.columefamily=response:result
-hbase.colume_job_id=detail:result_id
-hbase.colume_query_id=detail:query_id
-hbase.colume_query_duration_ms=detail:query_duration_ms
-hbase.colume_read_rows=detail:read_rows
-hbase.colume_memory_usage=detail:memory_usage
-hbase.colume_sql=detail:sql
-hbase.colume_exception=detail:exception
-#��sql���������ص�ip
-ck.gateway_ip=192.168.41.63:9999
+
+hbase.zookeeper_quorum=192.168.40.224
+hbase.zookeeper_property_clientPort=2181
+hbase.zookeeper_znode_parent=/hbase
+hbase.client_retries_number=3
+
+hbase.rpc_timeout=100000
+hbase.connect_pool=10
+
+
+ck.gateway_ip=192.168.40.224:9999
+
#clickhouse�ṩreport�����ip
-ck.task_ip=192.168.40.194:8123
+ck.task_ip=192.168.40.224:8123
#clickhouse�����ݿ���
-ck.task_database=tsg_galaxy
+ck.task_database=tsg_galaxy_v3
#clickhouse����������û�
-ck.task_user=default
+ck.task_user=tsg_report
#clickhouse�������������
-ck.task_user_password=111111
+ck.task_user_password=ceiec2019
#clickhouse��־����ipͨ��=ck.task_ip
-ck.log_ip=192.168.40.194:8123
+ck.log_ip=192.168.40.224:8123
#clickhouse��־������û�
ck.log_user=default
#clickhouse��־�û�����
-ck.log_user_password=111111
+ck.log_user_password=ceiec2019
#���������
http.maxTotal=300
#������
http.defaultMaxPerRoute=100
#�������ӵ��ʱ��
http.connectTimeout=10000
-#�����ӳ��л�ȡ�����ӵ��ʱ��
+#�����ӳ��л�ȡ�����ӵ��ʱ��:
http.connectionRequestTimeout=10000
#���ݴ�����ʱ��
-http.socketTimeout=21500000
+http.socketTimeout=21600000
#�ύ����ǰ���������Ƿ����
http.staleConnectionCheckEnabled=true
@@ -52,17 +53,17 @@ zookeeper.open=1
zookeeper.retryCount=5
zookeeper.elapsedTimeMs=1000
#zk��Ⱥ��ip
-zookeeper.connectString=192.168.40.193:2181
+zookeeper.connectString=192.168.40.224:2181
zookeeper.sessionTimeoutMs=5000
zookeeper.connectionTimeoutMs=5000
zookeeper.nameSpace=reportservice
#mariadb��url
-spring.datasource.url=jdbc:mariadb://192.168.40.204:3306/reporttest?serverTimezone=GMT&useUnicode=true&characterEncoding=UTF-8&autoReconnect=true&failOverReadOnly=false
+spring.datasource.url=jdbc:mariadb://192.168.40.131:3306/tsg-bifang?serverTimezone=GMT&useUnicode=true&characterEncoding=UTF-8&autoReconnect=true&failOverReadOnly=false
#spring.datasource.url=jdbc:mysql://192.168.11.208:3306/ntc-api?serverTimezone=GMT&useUnicode=true&characterEncoding=UTF-8&autoReconnect=true&failOverReadOnly=false
#mariadb���û���
spring.datasource.username=root
#mariadb������
-spring.datasource.password=bfzdjizhi
+spring.datasource.password=111111
spring.datasource.name=druidDataSource
spring.datasource.type=com.alibaba.druid.pool.DruidDataSource
diff --git a/src/test/java/com/mesa/reportservice/ReportserviceApplicationTests.java b/src/test/java/com/mesa/reportservice/ReportserviceApplicationTests.java
new file mode 100644
index 0000000..61e9172
--- /dev/null
+++ b/src/test/java/com/mesa/reportservice/ReportserviceApplicationTests.java
@@ -0,0 +1,47 @@
+package com.mesa.reportservice;
+
+
+import com.mesa.reportservice.bean.HttpResult;
+import com.mesa.reportservice.util.ziputil;
+import org.apache.http.client.config.RequestConfig;
+import org.apache.http.client.methods.CloseableHttpResponse;
+import org.apache.http.client.methods.HttpPost;
+import org.apache.http.impl.client.CloseableHttpClient;
+import org.apache.http.util.EntityUtils;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.boot.test.context.SpringBootTest;
+import org.springframework.test.context.junit4.SpringRunner;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.Date;
+import java.util.Map;
+import java.util.TreeMap;
+
+@RunWith(SpringRunner.class)
+@SpringBootTest
+public class ReportserviceApplicationTests {
+
+
+
+
+ @Autowired
+ private CloseableHttpClient httpClient;
+
+ @Autowired
+ private RequestConfig config;
+
+
+
+
+ @Test
+ public void contextLoads() throws IOException {
+
+
+
+ }
+
+
+}