| author | wangkuan <[email protected]> | 2020-05-28 15:24:54 +0800 |
|---|---|---|
| committer | wangkuan <[email protected]> | 2020-05-28 15:24:54 +0800 |
| commit | 0b2953adcdcc326fc2055c4fc863e4e180c19741 (patch) | |
| tree | 40254a0dd212e58ba9a8216b6b5b9076dd7e188e | |
| parent | 05346f8ec73fe2efb89f88382dc3cc72b9ded9c0 (diff) | |
Fix the problem that large result files could not be stored
19 files changed, 680 insertions, 335 deletions
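The change in outline: query results now travel as raw `byte[]` (the new `HttpResult.bodybyte` field) and are written to HBase through the native `hbase-client` `Put` API with `hbase.client.keyvalue.maxsize` raised, instead of being posted as JSON to the old HBase HTTP gateway (`hbase.url`, now removed). A condensed, standalone sketch of that write path — the quorum address and table name are the values from the properties diff below, the row-key string is a placeholder:

```java
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.util.Bytes;

public class LargeResultWriteSketch {
    public static void main(String[] args) throws Exception {
        Configuration conf = HBaseConfiguration.create();
        conf.set("hbase.zookeeper.quorum", "192.168.40.224");   // value from application.properties
        conf.set("hbase.zookeeper.property.clientPort", "2181");
        // the heart of the fix: lift the client-side cap on a single cell's size
        conf.set("hbase.client.keyvalue.maxsize", "1024000000");

        byte[] resultBody = new byte[0];  // stands in for EntityUtils.toByteArray(response.getEntity())
        try (Connection con = ConnectionFactory.createConnection(conf);
             Table table = con.getTable(TableName.valueOf("tsg:report_result"))) {
            Put put = new Put(Bytes.toBytes("some-query-id"));  // row key = md5 query id in the real code
            put.addColumn(Bytes.toBytes("response"), Bytes.toBytes("result"), resultBody);
            table.put(put);  // one RPC, no JSON/String inflation of the payload
        }
    }
}
```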
diff --git a/config/application.properties b/config/application.properties
index 0e4d13b..b21e05e 100644
--- a/config/application.properties
+++ b/config/application.properties
@@ -4,19 +4,19 @@ scan.result.scheduled.plan=0/10 * * * * ?
 # interval for the scheduled scan of the mysql report_result table: 15s
 scan.task.scheduled.plan=0/15 * * * * ?
 # number of job threads that may run concurrently
-globle.job_thread=1
+globle.job_thread=2
 # HBase HTTP port
-hbase.url=192.168.40.224:8084
 # HBase table and column names; normally no change is needed
 hbase.table=tsg:report_result
-hbase.columefamily=response:result
-hbase.colume_job_id=detail:result_id
-hbase.colume_query_id=detail:query_id
-hbase.colume_query_duration_ms=detail:query_duration_ms
-hbase.colume_read_rows=detail:read_rows
-hbase.colume_memory_usage=detail:memory_usage
-hbase.colume_sql=detail:sql
-hbase.colume_exception=detail:exception
+
+hbase.zookeeper_quorum=192.168.40.224
+hbase.zookeeper_property_clientPort=2181
+hbase.zookeeper_znode_parent=/hbase
+hbase.client_retries_number=3
+
+hbase.rpc_timeout=100000
+hbase.connect_pool=10
+
 ck.gateway_ip=192.168.40.224:9999
@@ -53,12 +53,12 @@ zookeeper.open=1
 zookeeper.retryCount=5
 zookeeper.elapsedTimeMs=1000
 # ZooKeeper cluster IP list
-zookeeper.connectString=192.168.40.193:2181
+zookeeper.connectString=192.168.40.224:2181
 zookeeper.sessionTimeoutMs=5000
 zookeeper.connectionTimeoutMs=5000
 zookeeper.nameSpace=reportservice
 # mariadb URL
-spring.datasource.url=jdbc:mariadb://192.168.40.204:3306/reporttest?serverTimezone=GMT&useUnicode=true&characterEncoding=UTF-8&autoReconnect=true&failOverReadOnly=false
+spring.datasource.url=jdbc:mariadb://192.168.40.204:3306/treport?serverTimezone=GMT&useUnicode=true&characterEncoding=UTF-8&autoReconnect=true&failOverReadOnly=false
 #spring.datasource.url=jdbc:mysql://192.168.11.208:3306/ntc-api?serverTimezone=GMT&useUnicode=true&characterEncoding=UTF-8&autoReconnect=true&failOverReadOnly=false
 # mariadb username
 spring.datasource.username=root
@@ -71,7 +71,7 @@ spring.datasource.driver-class-name=org.mariadb.jdbc.Driver
 # monitoring/stat interceptor filters; removing them disables SQL statistics in the console, 'wall' is the firewall filter
 spring.datasource.druid.filters=stat,wall,slf4j
-# maximum active connections
+# maximum active connectionsS
 spring.datasource.druid.max-active=30
 # minimum idle connections
 spring.datasource.druid.min-idle=1
diff --git a/docker/Dockerfile b/docker/Dockerfile
index dd44d1f..7502902 100644
--- a/docker/Dockerfile
+++ b/docker/Dockerfile
@@ -7,7 +7,7 @@ COPY config config
 ARG JAR_FILE
 COPY ${JAR_FILE} galaxy-report-service.jar
 #dockercompose set JAVA_OPTS
-ENV JAVA_OPTS=" -Xms1024m -Xmx2048m "
+ENV JAVA_OPTS=" -Xms20480m -Xmx20480m "
 ENV LANG=en_US.UTF-8 LANGUAGE=en_US:en LC_ALL=en_US.UTF-8
 #ENV TZ=Asia/Almaty
 #RUN ln -snf /usr/share/zoneinfo/$TZ /etc/localtime && echo $TZ > /etc/timezone
diff --git a/pom.xml b/pom.xml
@@ -10,7 +10,7 @@
 </parent>
 <groupId>com.mesa</groupId>
 <artifactId>galaxy-report-service</artifactId>
-<version>3.1</version>
+<version>3.2</version>
 <name>galaxy-report-service</name>
 <description>Demo project for Spring Boot</description>
@@ -141,6 +141,27 @@
 <artifactId>zookeeper</artifactId>
 <version>3.4.9</version>
 </dependency>
+<dependency>
+    <groupId>org.apache.hbase</groupId>
+    <artifactId>hbase-client</artifactId>
+    <version>2.2.3</version>
+</dependency>
+<dependency>
+    <groupId>org.apache.hadoop</groupId>
+    <artifactId>hadoop-client</artifactId>
+    <version>2.7.1</version>
+    <exclusions>
+        <exclusion>
+            <artifactId>slf4j-log4j12</artifactId>
+            <groupId>org.slf4j</groupId>
+        </exclusion>
+        <exclusion>
+            <artifactId>log4j-over-slf4j</artifactId>
+            <groupId>org.slf4j</groupId>
+        </exclusion>
+    </exclusions>
+</dependency>
+
 </dependencies>
@@ -165,7 +186,7 @@
 <forceTags>true</forceTags>
 <imageTags>
-<imageTag>3.1</imageTag>
+<imageTag>3.2</imageTag>
 </imageTags>
 <!-- remote docker build host, used by the dockerfile -->
 <dockerHost>http://192.168.40.153:2375</dockerHost>
diff --git a/src/main/java/com/mesa/reportservice/bean/HttpResult.java b/src/main/java/com/mesa/reportservice/bean/HttpResult.java
index a19e5c8..85a6042 100644
--- a/src/main/java/com/mesa/reportservice/bean/HttpResult.java
+++ b/src/main/java/com/mesa/reportservice/bean/HttpResult.java
@@ -7,6 +7,7 @@ public class HttpResult {

     private int code;
     private String body;
+    private byte[] bodybyte;

     public HttpResult(int code, String body) {
@@ -18,6 +19,14 @@ public class HttpResult {
     }

+    public byte[] getBodybyte() {
+        return bodybyte;
+    }
+
+    public void setBodybyte(byte[] bodybyte) {
+        this.bodybyte = bodybyte;
+    }
+
     public int getCode() {
         return code;
     }
diff --git a/src/main/java/com/mesa/reportservice/bean/JobEntity.java b/src/main/java/com/mesa/reportservice/bean/JobEntity.java
index 8e16147..98dc4b5 100644
--- a/src/main/java/com/mesa/reportservice/bean/JobEntity.java
+++ b/src/main/java/com/mesa/reportservice/bean/JobEntity.java
@@ -39,15 +39,17 @@ public class JobEntity implements Cloneable {

     private String query_duration_ms;

-    private Integer resultRows;
+    private int resultRows;

     private String memory_usage;

-    private String result;
+    private byte[] result;

     private int excute_status;

+    private String formatSql;
+
     public Integer getResultId() {
@@ -185,13 +187,6 @@ public class JobEntity implements Cloneable {
         this.query_duration_ms = query_duration_ms;
     }

-    public Integer getResultRows() {
-        return resultRows;
-    }
-
-    public void setResultRows(Integer resultRows) {
-        this.resultRows = resultRows;
-    }

     public String getMemory_usage() {
         return memory_usage;
@@ -201,11 +196,11 @@ public class JobEntity implements Cloneable {
         this.memory_usage = memory_usage;
     }

-    public String getResult() {
+    public byte[] getResult() {
         return result;
     }

-    public void setResult(String result) {
+    public void setResult(byte[] result) {
         this.result = result;
     }
@@ -218,6 +213,21 @@ public class JobEntity implements Cloneable {
     }

+    public int getResultRows() {
+        return resultRows;
+    }
+
+    public void setResultRows(int resultRows) {
+        this.resultRows = resultRows;
+    }
+
+    public String getFormatSql() {
+        return formatSql;
+    }
+
+    public void setFormatSql(String formatSql) {
+        this.formatSql = formatSql;
+    }

     @Override
     public Object clone() {
diff --git a/src/main/java/com/mesa/reportservice/configuration/ClickhouseConfig.java b/src/main/java/com/mesa/reportservice/configuration/ClickhouseConfig.java
index 250adfe..c6ba31a 100644
--- a/src/main/java/com/mesa/reportservice/configuration/ClickhouseConfig.java
+++ b/src/main/java/com/mesa/reportservice/configuration/ClickhouseConfig.java
@@ -65,7 +65,7 @@ public class ClickhouseConfig {
         // String logsql = "select elapsed,total_rows_approx,read_rows from `system`.processes where query_id='" + query_id + "' FORMAT JSONEachRow";
         try {
-            String sql = URLEncoder.encode("select CAST(type, 'Int8') as type,read_rows,query_duration_ms,query,exception,memory_usage,event_time from `system`.query_log where type>1 and query_id='" + query_id + "' order by event_time desc limit 1 FORMAT JSONEachRow", "utf8");
+            String sql = URLEncoder.encode("select CAST(type, 'Int8') as type,read_rows,query_duration_ms,query,exception,memory_usage,event_time,result_rows,result_bytes from `system`.query_log where type>1 and query_id='" + query_id + "' order by event_time desc limit 1 FORMAT JSONEachRow", "utf8");
             finishsql = url + sql;
         } catch (UnsupportedEncodingException e) {
             Logs.error(e.toString());
diff --git a/src/main/java/com/mesa/reportservice/configuration/HbaseFactory.java b/src/main/java/com/mesa/reportservice/configuration/HbaseFactory.java
new file mode 100644
index 0000000..1fac959
--- /dev/null
+++ b/src/main/java/com/mesa/reportservice/configuration/HbaseFactory.java
@@ -0,0 +1,106 @@
+package com.mesa.reportservice.configuration;
+
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.client.*;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.beans.factory.annotation.Qualifier;
+import org.springframework.boot.context.properties.EnableConfigurationProperties;
+import org.springframework.context.annotation.Bean;
+import org.springframework.context.annotation.Configuration;
+
+import java.io.IOException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+
+/**
+ * Created by Administrator on 2020/3/10.
+ */
+@Configuration
+@EnableConfigurationProperties(HbaseProperties.class)
+public class HbaseFactory {
+
+    private final HbaseProperties hbproperties;
+
+    @Autowired
+    public HbaseFactory(HbaseProperties hbproperties) {
+        this.hbproperties = hbproperties;
+    }
+
+    @Bean(name = "hbaseConfiguration")
+    public org.apache.hadoop.conf.Configuration getConfiguration() {
+
+        org.apache.hadoop.conf.Configuration conf = null;
+        if (conf == null) {
+            conf = HBaseConfiguration.create();
+            conf.set("hbase.zookeeper.quorum", hbproperties.getZookeeper_quorum());
+            conf.set("hbase.zookeeper.property.clientPort", hbproperties.getZookeeper_property_clientPort());
+            conf.set("hbase.zookeeper.znode.parent", hbproperties.getZookeeper_znode_parent());
+            // conf.set("hase.client.pause", client_pause);
+            conf.set("hbase.client.retries.number", hbproperties.getClient_retries_number());
+            conf.set("hbase.rpc.timeout", hbproperties.getRpc_timeout());
+            // conf.set("hbase.client.operation.timeout", client_operation_timeout);
+            // conf.set("hbase.client.scanner.timeout.period", client_scanner_timeout_period);
+            conf.set("hbase.client.keyvalue.maxsize", "1024000000");
+            //conf.set("hbase.client.write.buffer", "20971520");
+        }
+
+        return conf;
+    }
+
+    @Bean(name = "hbaseConnection")
+    public Connection getConnection(@Qualifier("hbaseConfiguration") org.apache.hadoop.conf.Configuration Conf) {
+        ExecutorService executor = null;
+        Connection con = null;
+        try {
+            executor = Executors.newFixedThreadPool(hbproperties.getConnect_pool());
+            con = ConnectionFactory.createConnection(Conf, executor);
+        } catch (IOException e) {
+            e.printStackTrace();
+        }
+        return con;
+    }
+
+    /*@Bean(name = "hbaseTable")
+    public Table getTable(@Qualifier("hbaseConnection") Connection con) {
+        Table table = null;
+        try {
+            ExecutorService pool = Executors.newFixedThreadPool(10);
+            BufferedMutatorParams bmp = new BufferedMutatorParams();
+            ss.writeBufferSize();
+            ss.setWriteBufferPeriodicFlushTimerTickMs()
+            BufferedMutatorParams aaa = con.getBufferedMutator()
+            aaa.writeBufferSize(1000000);
+
+            BufferedMutator aaa = null;
+            aaa
+            table = con.getTable(TableName.valueOf("tsg:t1"), pool);
+        } catch (IOException e) {
+            e.printStackTrace();
+        }
+        return table;
+    }*/
+
+    /* @Bean(name = "hbaseExceptionListener")
+    public BufferedMutator.ExceptionListener getLishtener(@Qualifier("hbaseConnection") Connection con) {
+        BufferedMutator.ExceptionListener listener = new BufferedMutator.ExceptionListener() {
+            public void onException(RetriesExhaustedWithDetailsException e, BufferedMutator mutator) {
+                for (int i = 0; i < e.getNumExceptions(); i++) {
+                    System.out.print("Failed to sent put " + e.getRow(i) + ".");
+                }
+            }
+        };
+        return listener;
+    }*/
+
+}
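A hypothetical wiring check for the factory above (not part of the commit): the pooled connection can be probed with the Admin API before the first put, which fails fast if the ZooKeeper quorum or znode parent is misconfigured:

```java
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.Connection;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.stereotype.Component;

@Component
public class HbaseStartupCheck {

    @Autowired
    @Qualifier("hbaseConnection")
    private Connection hbaseConnection;

    /** Returns true when the target table exists and the cluster answers. */
    public boolean tableReachable(String tableName) {
        try (Admin admin = hbaseConnection.getAdmin()) {
            return admin.tableExists(TableName.valueOf(tableName));
        } catch (Exception e) {
            return false;  // quorum unreachable, retries exhausted, etc.
        }
    }
}
```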
diff --git a/src/main/java/com/mesa/reportservice/configuration/HbaseProperties.java b/src/main/java/com/mesa/reportservice/configuration/HbaseProperties.java
new file mode 100644
index 0000000..fa4d4a1
--- /dev/null
+++ b/src/main/java/com/mesa/reportservice/configuration/HbaseProperties.java
@@ -0,0 +1,95 @@
+package com.mesa.reportservice.configuration;
+
+import org.springframework.boot.context.properties.ConfigurationProperties;
+
+/**
+ * Created by Administrator on 2020/3/10.
+ */
+@ConfigurationProperties(prefix = "hbase")
+public class HbaseProperties {
+
+    private String zookeeper_quorum;
+
+    private String zookeeper_property_clientPort;
+
+    private String zookeeper_znode_parent;
+
+    private String client_pause;
+
+    private String client_retries_number;
+
+    private String rpc_timeout;
+
+    private Integer connect_pool;
+
+    private String table;
+
+    public String getZookeeper_quorum() {
+        return zookeeper_quorum;
+    }
+
+    public void setZookeeper_quorum(String zookeeper_quorum) {
+        this.zookeeper_quorum = zookeeper_quorum;
+    }
+
+    public String getZookeeper_property_clientPort() {
+        return zookeeper_property_clientPort;
+    }
+
+    public void setZookeeper_property_clientPort(String zookeeper_property_clientPort) {
+        this.zookeeper_property_clientPort = zookeeper_property_clientPort;
+    }
+
+    public String getZookeeper_znode_parent() {
+        return zookeeper_znode_parent;
+    }
+
+    public void setZookeeper_znode_parent(String zookeeper_znode_parent) {
+        this.zookeeper_znode_parent = zookeeper_znode_parent;
+    }
+
+    public String getClient_pause() {
+        return client_pause;
+    }
+
+    public void setClient_pause(String client_pause) {
+        this.client_pause = client_pause;
+    }
+
+    public String getClient_retries_number() {
+        return client_retries_number;
+    }
+
+    public void setClient_retries_number(String client_retries_number) {
+        this.client_retries_number = client_retries_number;
+    }
+
+    public String getRpc_timeout() {
+        return rpc_timeout;
+    }
+
+    public void setRpc_timeout(String rpc_timeout) {
+        this.rpc_timeout = rpc_timeout;
+    }
+
+    public Integer getConnect_pool() {
+        return connect_pool;
+    }
+
+    public void setConnect_pool(Integer connect_pool) {
+        this.connect_pool = connect_pool;
+    }
+
+    public String getTable() {
+        return table;
+    }
+
+    public void setTable(String table) {
+        this.table = table;
+    }
+}
diff --git a/src/main/java/com/mesa/reportservice/controller/ScheduledResultController.java b/src/main/java/com/mesa/reportservice/controller/ScheduledResultController.java
index 8f3c692..2cdf3ce 100644
--- a/src/main/java/com/mesa/reportservice/controller/ScheduledResultController.java
+++ b/src/main/java/com/mesa/reportservice/controller/ScheduledResultController.java
@@ -1,6 +1,7 @@
 package com.mesa.reportservice.controller;

 import com.mesa.reportservice.bean.JobEntity;
+import com.mesa.reportservice.configuration.GlobelConfig;
 import com.mesa.reportservice.service.ExcuteProcessService;
 import com.mesa.reportservice.service.MysqlService;
 import com.mesa.reportservice.service.ZkService;
@@ -10,6 +11,9 @@ import org.springframework.scheduling.annotation.Scheduled;
 import org.springframework.stereotype.Component;

 import java.util.List;
+import java.util.Map;
+
+import static com.mesa.reportservice.configuration.GlobelConfig.*;

 /**
  * @author wk1
@@ -21,6 +25,7 @@ public class ScheduledResultController {

     @Autowired
     private MysqlService ms;
+
     @Autowired
     private ExcuteProcessService eps;
     @Autowired
@@ -29,9 +34,11 @@ public class ScheduledResultController {
     @Scheduled(cron = "${scan.result.scheduled.plan}")
     public void getExcuteResult() {
         if (zs.isMaster()) {
-            List<JobEntity> joblist = ms.getJobForExcute();
-            eps.getTaskProcess(joblist);
+            List<JobEntity> joblist = ms.getJobForExcute();
+            for (JobEntity jobEntity : joblist) {
+                eps.getTaskProcess(jobEntity);
+            }
         }
     }
diff --git a/src/main/java/com/mesa/reportservice/service/ExcuteProcessService.java b/src/main/java/com/mesa/reportservice/service/ExcuteProcessService.java
index bb0f3b5..54f64f6 100644
--- a/src/main/java/com/mesa/reportservice/service/ExcuteProcessService.java
+++ b/src/main/java/com/mesa/reportservice/service/ExcuteProcessService.java
@@ -9,5 +9,5 @@ import java.util.List;
  */
 public interface ExcuteProcessService {

-    void getTaskProcess(List<JobEntity> joblist);
+    void getTaskProcess(JobEntity job);
 }
diff --git a/src/main/java/com/mesa/reportservice/service/HbaseService.java b/src/main/java/com/mesa/reportservice/service/HbaseService.java
index e144256..46db5ef 100644
--- a/src/main/java/com/mesa/reportservice/service/HbaseService.java
+++ b/src/main/java/com/mesa/reportservice/service/HbaseService.java
@@ -1,6 +1,8 @@
 package com.mesa.reportservice.service;

 import com.mesa.reportservice.bean.HttpResult;
+import com.mesa.reportservice.bean.JobEntity;
+import org.apache.hadoop.yarn.webapp.hamlet.HamletSpec;

 import java.util.Map;

@@ -9,14 +11,7 @@ import java.util.Map;
  */
 public interface HbaseService {

-    String doGet(String url) throws Exception;
-
-    String doGet(String url, Map<String, Object> map) throws Exception;
-
-
-    HttpResult doPost(String url, String json) throws Exception;
-
-    HttpResult doPost(String url) throws Exception;
+    Boolean put(JobEntity jobEntity) throws Exception;
 }
diff --git a/src/main/java/com/mesa/reportservice/service/impl/ClickhouseServiceImpl.java b/src/main/java/com/mesa/reportservice/service/impl/ClickhouseServiceImpl.java
index 667e459..64afe53 100644
--- a/src/main/java/com/mesa/reportservice/service/impl/ClickhouseServiceImpl.java
+++ b/src/main/java/com/mesa/reportservice/service/impl/ClickhouseServiceImpl.java
@@ -15,6 +15,9 @@ import org.springframework.stereotype.Service;

 import java.util.Map;

+import static com.mesa.reportservice.util.GZIPUtils.compress;
+import static com.mesa.reportservice.util.GZIPUtils.uncompress;
+
 /**
  * Created by wk1 on 2019/5/15.
  */
@@ -89,79 +92,34 @@ public class ClickhouseServiceImpl implements ClickhouseService {
      * @throws Exception
      */
     @Override
-    public HttpResult sendQueryForPost(String url) throws Exception {
+    public HttpResult sendQueryForPost(String url) throws Exception, OutOfMemoryError {
         // create the httpPost request
         HttpPost httpPost = new HttpPost(url);
+
         // attach the request configuration
         httpPost.setConfig(config);
-        // httpPost.setHeader("Accept","application/json");
-        // if the map is not empty, iterate it and build the form entity
-
-        /* if (!json.equals("") && !json.isEmpty()) {
-
-            StringEntity s = new StringEntity(json.toString());
-            s.setContentEncoding("UTF-8");
-            s.setContentType("application/json");// sending json requires setting the contentType
-            httpPost.setEntity(s);
-        }*/
-
-        /*if (map != null) {
-            List<NameValuePair> list = new ArrayList<NameValuePair>();
-            for (Map.Entry<String, Object> entry : map.entrySet()) {
-                list.add(new BasicNameValuePair(entry.getKey(), entry.getValue().toString()));
-            }
-            // build the form entity
-            UrlEncodedFormEntity urlEncodedFormEntity = new UrlEncodedFormEntity(list, "UTF-8");
-            // attach the form to the post
-            httpPost.setEntity(urlEncodedFormEntity);
-        }*/
-
         // send the request
         CloseableHttpResponse response = this.httpClient.execute(httpPost);

-        HttpResult rs = new HttpResult(response.getStatusLine().getStatusCode(), EntityUtils.toString(
-                response.getEntity(), "UTF-8"));
-
+        HttpResult rs = new HttpResult();
+        rs.setCode(response.getStatusLine().getStatusCode());
+        rs.setBodybyte(EntityUtils.toByteArray(response.getEntity()));

-        //Thread.sleep(120);
+        /*byte[] bodybyte = snappyCompress(EntityUtils.toByteArray(response.getEntity()));
+        rs.setCode(response.getStatusLine().getStatusCode());
+        rs.setBodybyte(bodybyte);*/

         return rs;
     }

+
+
     @Override
     public HttpResult QuerySystemForPost(String url) throws Exception {
         // create the httpPost request
         HttpPost httpPost = new HttpPost(url);
         // attach the request configuration
         httpPost.setConfig(config);
-        // httpPost.setHeader("Accept","application/json");
-        // if the map is not empty, iterate it and build the form entity
-        // httpPost.getParams().setParameter(CoreConnectionPNames.SO_TIMEOUT, 1000);
-
-        /* if (!json.equals("") && !json.isEmpty()) {
-
-            StringEntity s = new StringEntity(json.toString());
-            s.setContentEncoding("UTF-8");
-            s.setContentType("application/json");// sending json requires setting the contentType
-            httpPost.setEntity(s);
-        }*/
-
-        /*if (map != null) {
-            List<NameValuePair> list = new ArrayList<NameValuePair>();
-            for (Map.Entry<String, Object> entry : map.entrySet()) {
-                list.add(new BasicNameValuePair(entry.getKey(), entry.getValue().toString()));
-            }
-            // build the form entity
-            UrlEncodedFormEntity urlEncodedFormEntity = new UrlEncodedFormEntity(list, "UTF-8");
-
-            // attach the form to the post
-            httpPost.setEntity(urlEncodedFormEntity);
-        }*/

         // send the request
         CloseableHttpResponse response = this.httpClient.execute(httpPost);
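The commented-out tail of `sendQueryForPost` shows a compression variant that was tried and shelved. A sketch of what that branch would look like using the snappy helpers from `ziputil` added later in this commit — a hypothetical helper, and callers would then have to `snappyDecompress` before serving the body:

```java
import com.mesa.reportservice.bean.HttpResult;
import com.mesa.reportservice.util.ziputil;
import org.apache.http.HttpResponse;
import org.apache.http.util.EntityUtils;

public class CompressedBodySketch {
    /** Builds an HttpResult whose body is snappy-compressed before being cached in memory. */
    public static HttpResult toCompressedResult(HttpResponse response) throws Exception {
        byte[] raw = EntityUtils.toByteArray(response.getEntity()); // same call as the committed code
        HttpResult rs = new HttpResult();
        rs.setCode(response.getStatusLine().getStatusCode());
        rs.setBodybyte(ziputil.snappyCompress(raw)); // reader must ziputil.snappyDecompress(...) later
        return rs;
    }
}
```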
diff --git a/src/main/java/com/mesa/reportservice/service/impl/ExcuteProcessServiceImpl.java b/src/main/java/com/mesa/reportservice/service/impl/ExcuteProcessServiceImpl.java
index 61eb861..359d02a 100644
--- a/src/main/java/com/mesa/reportservice/service/impl/ExcuteProcessServiceImpl.java
+++ b/src/main/java/com/mesa/reportservice/service/impl/ExcuteProcessServiceImpl.java
@@ -5,17 +5,16 @@ import com.mesa.reportservice.bean.HttpResult;
 import com.mesa.reportservice.bean.JobEntity;
 import com.mesa.reportservice.configuration.ClickhouseConfig;
 import com.mesa.reportservice.configuration.GlobelConfig;
-import com.mesa.reportservice.configuration.HbaseConfig;
 import com.mesa.reportservice.service.ClickhouseService;
 import com.mesa.reportservice.service.ExcuteProcessService;
 import com.mesa.reportservice.service.HbaseService;
 import com.mesa.reportservice.service.MysqlService;
-import com.mesa.reportservice.util.*;
+import com.mesa.reportservice.util.Logs;
+import com.mesa.reportservice.util.StringUtil;
 import org.apache.commons.codec.digest.DigestUtils;
 import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.stereotype.Service;

-import java.util.List;
 import java.util.Map;

 /**
@@ -34,44 +33,46 @@ public class ExcuteProcessServiceImpl implements ExcuteProcessService {
     private HbaseService hs;

     @Override
-    public void getTaskProcess(List<JobEntity> joblist) {
+    public void getTaskProcess(JobEntity jobEntity) {
         try {
-            for (JobEntity jk : joblist) {
-                String sql = jk.getQuerySql().trim();
-                String queryid = DigestUtils.md5Hex(jk.getResultId() + sql);
-                // after an abnormal stop and restart, kill the in-flight query and reset the DB status so the job re-executes
-                if (!GlobelConfig.mapresult.containsKey(queryid)) {
-                    String killurl = ClickhouseConfig.getKillUrl(queryid);
-                    try {
-                        Logs.info("startkill++++++" + queryid);
+            String sql = jobEntity.getQuerySql().trim();
+            String queryid = DigestUtils.md5Hex(jobEntity.getResultId() + sql);
+            jobEntity.setQuery_id(queryid);
+            // after an abnormal stop and restart, kill the in-flight query and reset the DB status so the job re-executes
+            if (!GlobelConfig.mapresult.containsKey(queryid)) {
+                String killurl = ClickhouseConfig.getKillUrl(queryid);
+                try {
+                    Logs.info("startkill++++++" + queryid);

-                        cs.QuerySystemForPost(killurl);
-                        Logs.info("endkill=======" + queryid);
-                    } catch (Exception e) {
-                        Logs.error(e.toString());
-                    }
-                    updateErrorMessage(jk.getResultId(), 0, "Re Execution");
+                    cs.QuerySystemForPost(killurl);
+                    Logs.info("endkill=======" + queryid);
+
+                } catch (Exception e) {
+                    Logs.error(e.toString());
                 }
-                else {
-                    JobEntity je = (JobEntity) GlobelConfig.mapresult.get(queryid).clone();
-                    Logs.info("startget++++++" + queryid);
-
-                    switch (je.getStatus()) {
-                        case 1:
-                            je = getProcessMessage(je);
-
-                            if (je.getExcuteRow() != 0 || je.getExcuteTime() != 0) {
-                                ms.updateProcesses(je);
-                            }
-                            break;
-                        case 2:
-                            try {
-                                je = getQueryLogMessage(je);
-                                HttpResult hr = saveToHbase(je);
+                updateErrorMessage(jobEntity.getResultId(), 0, "Re Execution");
+
+            } else {

-                                if (hr.getCode() == 200) {
+                JobEntity je = GlobelConfig.mapresult.get(queryid);
+                Logs.info("startget++++++" + je.getQuery_id());
+                if (je.getStatus() == 1) {
+                    je = getProcessMessage(je);
+                    if (je.getExcuteRow() != 0 || je.getExcuteTime() != 0) {
+                        ms.updateProcesses(je);
+                    }
+                } else {
+
+                    try {
+                        switch (je.getExcute_status()) {
+
+                            case 2:
+
+                                je = getQueryLogMessage(je);
+                                Boolean isok = saveToHbase(je);
+                                if (isok) {
                                     je.setExcuteDetail("SUCCESS");
                                     je.setExcuteTime(0);
                                     je.setExcuteProcess(100);
@@ -86,28 +87,34 @@ public class ExcuteProcessServiceImpl implements ExcuteProcessService {
                                     Logs.info("success save to hbase queryid=" + je.getQuery_id() + " sql=" + je.getQuerySql());
                                 } else {
                                     updateErrorMessage(je.getResultId(), 5, "Write Data Error");
-                                    Logs.error("save hbase error" + hr.getBody());
+                                    Logs.error("save hbase error");
                                 }
-                            } catch (Exception e) {
-                                updateErrorMessage(je.getResultId(), 6, "Result Write To Database Error");
-                                Logs.error("save hbase error queryid=" + je.getResultId() + e.toString());
-                            } finally {
-                                GlobelConfig.mapresult.remove(queryid);
-                            }
-                            break;
-                        default:
-                            try {
+                                break;
+
+                            case 6:
+                                updateErrorMessage(je.getResultId(), 6, "unknow error");
+                                Logs.error("unknow error resultid =" + je.getResultId() + " excutesql=" + je.getQuerySql() + " formatsql=" + je.getFormatSql());
+                                break;
+                            default:
                                 je = getQueryLogMessage(je);
                                 updateErrorMessage(je.getResultId(), je.getStatus(), je.getExcuteDetail());
-                            } catch (Exception e) {
-                                Logs.error(e.toString());
-                            } finally {
-                                GlobelConfig.mapresult.remove(queryid);
-                            }
+                                Logs.error("excute error resultid =" + je.getResultId() + " excutesql=" + je.getQuerySql() + " formatsql=" + je.getFormatSql());
+
+                        }
+                    } catch (Exception e) {
+                        // updateErrorMessage(je.getResultId(), 6, "Result Write To Database Error");
+                        Logs.error("save database error queryid=" + je.getResultId() + e.toString());
+
+                    } finally {
+                        GlobelConfig.mapresult.remove(je.getQuery_id());
                     }
                 }
             }
         } catch (Exception e) {
             Logs.error(e.toString());
@@ -141,7 +148,7 @@ public class ExcuteProcessServiceImpl implements ExcuteProcessService {
         String queryProccess = ClickhouseConfig.getProcessUrl(job.getQuery_id());
         HttpResult hr = null;
         try {
-            hr = cs.sendQueryForPost(queryProccess);
+            hr = cs.QuerySystemForPost(queryProccess);
             String rs = hr.getBody().trim();
             if (!rs.isEmpty() && !rs.equals("")) {
                 //JobEntity job = new JobEntity();
@@ -171,17 +178,14 @@ public class ExcuteProcessServiceImpl implements ExcuteProcessService {
     /**
      * persist the result to hbase
      */
-    public HttpResult saveToHbase(JobEntity entity) {
+    public Boolean saveToHbase(JobEntity entity) {
         int k = 3;
-        HttpResult hr = new HttpResult();
-
+        Boolean isok = false;
         do {
             try {
-                String hbasejson = HbaseConfig.getHbaseJson(entity.getQuery_id(), entity.getResult(), entity);
-                String hbaseurl = HbaseConfig.getHbasePostUrl();
                 k--;
-                hr = hs.doPost(hbaseurl, hbasejson);
+                isok = hs.put(entity);
                 break;
             } catch (Exception e) {
                 Logs.error("hbase write failed, retry attempt " + (3 - k) + e.toString());
             }
         } while (k >= 0);

-        return hr;
+        return isok;
     }

     /**
@@ -205,7 +209,7 @@ public class ExcuteProcessServiceImpl implements ExcuteProcessService {
         try {
             // ClickHouse flushes query_log within 7500 ms at worst by default; poll up to three times
             Thread.sleep(3000);
-            num = cs.sendQueryForPost(finishurl);
+            num = cs.QuerySystemForPost(finishurl);
             if (num.getCode() == 200 && !num.getBody().isEmpty() && !num.getBody().equals("")) {

                 Map mapresult = JSON.parseObject(num.getBody());
@@ -218,10 +222,12 @@ public class ExcuteProcessServiceImpl implements ExcuteProcessService {
                     job.setQuery_duration_ms(mapresult.get("query_duration_ms").toString());
                     job.setMemory_usage(mapresult.get("memory_usage").toString());
                     job.setExcuteRow(Long.parseLong(mapresult.get("read_rows").toString()));
+                    job.setResultRows(Integer.valueOf(mapresult.get("result_rows").toString()));
                     break;
                 case 3:
                     job.setStatus(type);
                     job.setExcuteDetail("SQL Syntax Error:" + mapresult.get("exception").toString());
+                    // job.setQuerySql(mapresult.get("query").toString());
                     break;
                 case 4:
@@ -234,14 +240,12 @@ public class ExcuteProcessServiceImpl implements ExcuteProcessService {
                 }
             } else {
                 job.setStatus(6);
-                if (job.getExcuteDetail().equals("")) {
-                    job.setExcuteDetail("Other ERROR ");
-                }
+                job.setExcuteDetail("Other ERROR ");
             }

         } catch (Exception e) {
             job.setStatus(6);
-            job.setExcuteDetail("Other ERROR: "+e.toString());
+            job.setExcuteDetail("Other ERROR: " + e.toString());
             Logs.error("failed to fetch the query_log message");
         }
         j--;
diff --git a/src/main/java/com/mesa/reportservice/service/impl/ExcuteserviceImpl.java b/src/main/java/com/mesa/reportservice/service/impl/ExcuteserviceImpl.java
index f637f1f..7124c91 100644
--- a/src/main/java/com/mesa/reportservice/service/impl/ExcuteserviceImpl.java
+++ b/src/main/java/com/mesa/reportservice/service/impl/ExcuteserviceImpl.java
@@ -44,9 +44,9 @@ public class ExcuteserviceImpl implements ExcuteService {
         String sql = job.getQuerySql().trim();
         String queryid = DigestUtils.md5Hex(job.getResultId() + sql);
         sql = sql.replace("$exe_time", "toDateTime('" + job.getIssuedTime().toString().trim() + "')");
-        //sql = sql.replace("$start_time", "toDateTime('" + job.getStartTime().toString().trim() + "')");
-        //sql = sql.replace("$end_time", "toDateTime('" + job.getEndTime().toString().trim() + "')");
-
+        // sql = sql.replace("$start_time", "toDateTime('" + job.getStartTime().toString().trim() + "')");
+        // sql = sql.replace("$end_time", "toDateTime('" + job.getEndTime().toString().trim() + "')");
+        job.setFormatSql(sql);
         try {
             String formateUrl = getFormatUrl(sql);
             sql = fs.doFormat(formateUrl);
@@ -57,96 +57,79 @@ public class ExcuteserviceImpl implements ExcuteService {
         if (sql.equals("ok") || sql.equals("")) {
             job.setStatus(3);
+            job.setExcuteTime(0);
             job.setExcuteDetail("SQL Syntax Error parsing SQL Error");
             ms.updateStatue(job);
+
         } else {
             job.setQuery_id(queryid);
             job.setQuerySql(sql);
-            job.setQuery_id(queryid);
-            job.setQuerySql(sql);
             job.setStatus(1);
-            job.setResult("");
+            job.setExcute_status(1);
             job.setExcuteDetail("EXECUTING");
-            JobEntity jt = (JobEntity) job.clone();
-            GlobelConfig.mapresult.put(queryid, jt);
+            //JobEntity jt = (JobEntity) job.clone();
+            GlobelConfig.mapresult.put(queryid, job);
             ms.updateStatue(job);
             Logs.info("execute queryid=" + queryid + " sql=" + sql + "mapresult size=" + GlobelConfig.mapresult.size());
             String joburl = getJobUrl(job.getQuerySql(), job.getQuery_id());
             String killurl = ClickhouseConfig.getKillUrl(job.getQuery_id());
             HttpResult hr = new HttpResult();
-            Map<String, Object> resultmap = new HashMap<String, Object>();
             int k = 3;
             do {
                 try {
                     hr = cs.sendQueryForPost(joburl);
                     Logs.info("httpcode" + hr.getCode());
-                    if (hr.getCode() != 200 && hr.getBody().toLowerCase().contains("timeout")) {
-                        k--;
-                        if (k == 0) {
-                            job.setStatus(4);
-                            Logs.error("Socket timeout Error retry time 3");
-                        }
-                        else {
-                            Logs.info("Socket timeout warn retry time " + (3 - k));
-                        }
+                    if (hr.getCode() != 200) {
+
+                        k = 0;
+                        job.setExcute_status(4);
+                        Logs.error("excute sql Error ");
+
                     } else {
-                        resultmap = JSON.parseObject(hr.getBody());
+                        k = 0;
+                        job.setResult(hr.getBodybyte());
+                        job.setExcute_status(2);
+                        Logs.info("success queryid=" + queryid + " sql=" + sql);
+
                     }
                 } catch (SocketTimeoutException e) {
                     k--;
                     job.setExcuteDetail(e.toString());
                     if (k == 0) {
-                        job.setStatus(4);
-                        Logs.error("Socket Error " + e.toString() + "retry time 3");
+                        job.setExcute_status(4);
+                        job.setExcuteDetail("SQL Execution Error excute query time out");
+                        Logs.info("timeout queryid=" + queryid + " sql=" + sql);
                     }
                     else {
                         Logs.info("Socket warn " + e.toString() + "retry time " + (3 - k));
                     }
+
+
                 } catch (Exception e) {
-                    // job.setStatus(6);
+                    job.setExcute_status(6);
                     job.setExcuteDetail(e.toString());
                     Logs.error("Unknow Error" + e.toString());
                     k = 0;
                 }
+                catch (OutOfMemoryError e) {
+
+                    job.setExcute_status(6);
+                    job.setExcuteDetail("result too large");
+                    Logs.error("outofmemery Error" + e.toString());
+                    k = 0;
+                }
                 try {
-                    cs.sendQueryForPost(killurl);
+                    cs.QuerySystemForPost(killurl);
                 } catch (Exception e) {
                     Logs.error("Kill Query Error" + e.toString());
                 }
             }
-
             while (k > 0);
-
-            job.setResult(hr.getBody());
-            if (hr.getCode() == 200 && resultmap.containsKey("rows")) {
-
-                int resultrows = Integer.parseInt(resultmap.get("rows").toString());
-                job.setResultRows(resultrows);
-                job.setResult(hr.getBody());
-                job.setExcuteDetail("Query SUCCESS");
-                job.setExcuteTime(0);
-                job.setExcuteProcess(100);
-                job.setStatus(2);
-                Logs.info("success queryid=" + queryid + " sql=" + sql);
-            }
-            else if (job.getStatus() == 4) {
-                job.setExcuteDetail("SQL Execution Error excute query time out");
-                Logs.info("timeout queryid=" + queryid + " sql=" + sql);
-            }
-
-            else {
-                job.setResult("");
-                job.setStatus(6);
-                //job.setExcuteDetail(hr.getBody());
-                //Logs.error("Unknow Error Query " + hr.getBody());
-                Logs.info("error queryid=" + queryid + " sql=" + sql + "error" + job.getExcuteDetail());
-
-            }
-            GlobelConfig.mapresult.put(job.getQuery_id(), job);
+            job.setStatus(2);
+            // GlobelConfig.mapresult.put(job.getQuery_id(), job);
         }
diff --git a/src/main/java/com/mesa/reportservice/service/impl/HbaseServiceImpl.java b/src/main/java/com/mesa/reportservice/service/impl/HbaseServiceImpl.java
index fe7a0a7..11efadb 100644
--- a/src/main/java/com/mesa/reportservice/service/impl/HbaseServiceImpl.java
+++ b/src/main/java/com/mesa/reportservice/service/impl/HbaseServiceImpl.java
@@ -1,7 +1,14 @@
 package com.mesa.reportservice.service.impl;

 import com.mesa.reportservice.bean.HttpResult;
+import com.mesa.reportservice.bean.JobEntity;
+import com.mesa.reportservice.configuration.HbaseProperties;
 import com.mesa.reportservice.service.HbaseService;
+import com.mesa.reportservice.util.Logs;
+import com.mesa.reportservice.util.ziputil;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.*;
+import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.http.client.config.RequestConfig;
 import org.apache.http.client.methods.CloseableHttpResponse;
 import org.apache.http.client.methods.HttpGet;
@@ -13,6 +20,7 @@ import org.apache.http.util.EntityUtils;
 import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.stereotype.Service;

+import java.io.IOException;
 import java.util.Map;

 /**
@@ -23,119 +31,39 @@ import java.util.Map;
 public class HbaseServiceImpl implements HbaseService {

     @Autowired
-    private CloseableHttpClient httpClient;
+    private Connection hbaseConnection;

     @Autowired
-    private RequestConfig config;
+    private HbaseProperties hbproperties;

-
-    /**
-     * GET without parameters: returns the body when the status code is 200, otherwise null
-     *
-     * @param url
-     * @return
-     * @throws Exception
-     */
     @Override
-    public String doGet(String url) throws Exception {
-        // create the http get request
-        HttpGet httpGet = new HttpGet(url);
-        httpGet.setHeader("Accept", "application/json");
-        // load the request configuration
-        httpGet.setConfig(config);
-
-        // send the request
-        CloseableHttpResponse response = this.httpClient.execute(httpGet);
-
-        // check whether the status code is 200
-        if (response.getStatusLine().getStatusCode() == 200) {
-            // return the response body
-
-            return EntityUtils.toString(response.getEntity(), "UTF-8");
-        }
+    public Boolean put(JobEntity jobEntity) throws Exception {

-        return null;
-    }
+        Boolean status = true;
+        try (Table table = hbaseConnection.getTable(TableName.valueOf(hbproperties.getTable()))) {
+            Put put = new Put(Bytes.toBytes(jobEntity.getQuery_id()));
+            put.addColumn(Bytes.toBytes("response"), Bytes.toBytes("result"), jobEntity.getResult());
+            put.addColumn(Bytes.toBytes("detail"), Bytes.toBytes("result_id"), Bytes.toBytes(jobEntity.getResultId()));

-    /**
-     * GET with parameters: returns the body when the status code is 200, otherwise null
-     *
-     * @param url
-     * @return
-     * @throws Exception
-     */
-    @Override
-    public String doGet(String url, Map<String, Object> map) throws Exception {
-        URIBuilder uriBuilder = new URIBuilder(url);
+            put.addColumn(Bytes.toBytes("detail"), Bytes.toBytes("excute_sql"), Bytes.toBytes(jobEntity.getQuerySql()));
+            put.addColumn(Bytes.toBytes("detail"), Bytes.toBytes("format_sql"), Bytes.toBytes(jobEntity.getFormatSql()));

-        if (map != null) {
-            // iterate the map and append the request parameters
-            for (Map.Entry<String, Object> entry : map.entrySet()) {
-                uriBuilder.setParameter(entry.getKey(), entry.getValue().toString());
+            if (jobEntity.getExcuteRow() != null) {
+                put.addColumn(Bytes.toBytes("detail"), Bytes.toBytes("read_rows"), Bytes.toBytes(jobEntity.getExcuteRow()));
             }
-        }
-
-        // delegate to the parameterless get request
-        return this.doGet(uriBuilder.build().toString());
-
-    }
-
-    /**
-     * POST with parameters
-     *
-     * @param url
-     * @param json
-     * @return
-     * @throws Exception
-     */
-    @Override
-    public HttpResult doPost(String url, String json) throws Exception {
-        // create the httpPost request
-        HttpPost httpPost = new HttpPost(url);
-        // attach the request configuration
-        httpPost.setConfig(config);
-        // httpPost.setHeader("Accept","application/json");
-        // if json is not empty, wrap it into the request entity
-        if (!json.equals("") && !json.isEmpty()) {
-
-            StringEntity s = new StringEntity(json.toString());
-            s.setContentEncoding("UTF-8");
-            s.setContentType("application/json");// sending json requires setting the contentType
-            httpPost.setEntity(s);
-        }
-        /*if (map != null) {
-            List<NameValuePair> list = new ArrayList<NameValuePair>();
-            for (Map.Entry<String, Object> entry : map.entrySet()) {
-                list.add(new BasicNameValuePair(entry.getKey(), entry.getValue().toString()));
+            if (jobEntity.getQuery_duration_ms() != null) {
+                put.addColumn(Bytes.toBytes("detail"), Bytes.toBytes("query_duration_ms"), Bytes.toBytes(jobEntity.getQuery_duration_ms()));
             }
-            // build the form entity
-            UrlEncodedFormEntity urlEncodedFormEntity = new UrlEncodedFormEntity(list, "UTF-8");
-
-            // attach the form to the post
-            httpPost.setEntity(urlEncodedFormEntity);
-        }*/
-
-        // send the request
-        CloseableHttpResponse response = this.httpClient.execute(httpPost);
-
-        HttpResult rs = new HttpResult(response.getStatusLine().getStatusCode(), EntityUtils.toString(
-                response.getEntity(), "UTF-8"));
-
-
-        //Thread.sleep(120);
-        return rs;
+            if (jobEntity.getMemory_usage() != null) {
+                put.addColumn(Bytes.toBytes("detail"), Bytes.toBytes("memory_usage"), Bytes.toBytes(jobEntity.getMemory_usage()));
+            }
+            table.put(put);
+        } catch (Exception e) {
+            status = false;
+            Logs.error(e.toString());
+        }
+        return status;
     }

-    /**
-     * POST without parameters
-     *
-     * @param url
-     * @return
-     * @throws Exception
-     */
-    public HttpResult doPost(String url) throws Exception {
-        return this.doPost(url, "");
-    }
 }
diff --git a/src/main/java/com/mesa/reportservice/util/GZIPUtils.java b/src/main/java/com/mesa/reportservice/util/GZIPUtils.java
new file mode 100644
index 0000000..5185351
--- /dev/null
+++ b/src/main/java/com/mesa/reportservice/util/GZIPUtils.java
@@ -0,0 +1,85 @@
+package com.mesa.reportservice.util;
+
+/**
+ * Created by wk1 on 2020/5/14.
+ */
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.util.zip.GZIPInputStream;
+import java.util.zip.GZIPOutputStream;
+
+public class GZIPUtils {
+    public static final String GZIP_ENCODE_UTF_8 = "UTF-8";
+    public static final String GZIP_ENCODE_ISO_8859_1 = "ISO-8859-1";
+
+    public static byte[] compress(byte[] str) {
+        if (str == null || str.length == 0) {
+            return null;
+        }
+        ByteArrayOutputStream out = new ByteArrayOutputStream();
+        GZIPOutputStream gzip;
+        try {
+            gzip = new GZIPOutputStream(out);
+            gzip.write(str);
+            gzip.close();
+        } catch (Exception e) {
+            e.printStackTrace();
+        }
+        return out.toByteArray();
+    }
+
+    public static byte[] uncompress(byte[] bytes) {
+        if (bytes == null || bytes.length == 0) {
+            return null;
+        }
+        ByteArrayOutputStream out = new ByteArrayOutputStream();
+        ByteArrayInputStream in = new ByteArrayInputStream(bytes);
+        try {
+            GZIPInputStream ungzip = new GZIPInputStream(in);
+            byte[] buffer = new byte[256];
+            int n;
+            while ((n = ungzip.read(buffer)) >= 0) {
+                out.write(buffer, 0, n);
+            }
+        } catch (Exception e) {
+            e.printStackTrace();
+        }
+        return out.toByteArray();
+    }
+
+    public static String uncompressToString(byte[] bytes, String encoding) {
+        if (bytes == null || bytes.length == 0) {
+            return null;
+        }
+        ByteArrayOutputStream out = new ByteArrayOutputStream();
+        ByteArrayInputStream in = new ByteArrayInputStream(bytes);
+        try {
+            GZIPInputStream ungzip = new GZIPInputStream(in);
+            byte[] buffer = new byte[256];
+            int n;
+            while ((n = ungzip.read(buffer)) >= 0) {
+                out.write(buffer, 0, n);
+            }
+            return out.toString(encoding);
+        } catch (Exception e) {
+            e.printStackTrace();
+        }
+        return null;
+    }
+
+    public static String uncompressToString(byte[] bytes) {
+        return uncompressToString(bytes, GZIP_ENCODE_UTF_8);
+    }
+
+    public static void main(String[] args) throws IOException {
+        String s = "aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa";
+        System.out.println("string length: " + s.length());
+        //System.out.println("compressed length: " + compress(s).length);
+        // System.out.println("uncompressed length: " + uncompress(compress(s)).length);
+        // System.out.println("uncompressed string length: " + uncompressToString(compress(s)).length());
+    }
+}
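The `main()` of GZIPUtils has its assertions commented out (they predate the switch to `byte[]` input). A round-trip that works against the `byte[]` signatures, assuming UTF-8 text:

```java
import com.mesa.reportservice.util.GZIPUtils;

import java.nio.charset.StandardCharsets;

public class GzipRoundTrip {
    public static void main(String[] args) {
        String s = "aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa";
        byte[] packed = GZIPUtils.compress(s.getBytes(StandardCharsets.UTF_8));
        String back = GZIPUtils.uncompressToString(packed); // defaults to UTF-8
        System.out.println("original=" + s.length() + " compressed=" + packed.length
                + " roundtrip-ok=" + s.equals(back));
    }
}
```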
diff --git a/src/main/java/com/mesa/reportservice/util/ziputil.java b/src/main/java/com/mesa/reportservice/util/ziputil.java
new file mode 100644
index 0000000..f530f45
--- /dev/null
+++ b/src/main/java/com/mesa/reportservice/util/ziputil.java
@@ -0,0 +1,96 @@
+package com.mesa.reportservice.util;
+
+import net.jpountz.lz4.*;
+import org.apache.commons.compress.compressors.gzip.GzipCompressorInputStream;
+import org.apache.commons.compress.compressors.gzip.GzipCompressorOutputStream;
+import org.apache.commons.compress.compressors.xz.XZCompressorInputStream;
+import org.apache.commons.compress.compressors.xz.XZCompressorOutputStream;
+import org.apache.commons.io.output.ByteArrayOutputStream;
+import org.xerial.snappy.Snappy;
+
+import java.io.ByteArrayInputStream;
+import java.io.IOException;
+
+/**
+ * Created by wk1 on 2020/5/15.
+ */
+public class ziputil {
+
+    public static byte[] commonGzipCompress(byte[] data) throws IOException {
+        ByteArrayOutputStream baos = new ByteArrayOutputStream(512);
+        GzipCompressorOutputStream gcos = new GzipCompressorOutputStream(baos);
+        gcos.write(data);
+        gcos.close();
+        return baos.toByteArray();
+    }
+
+    public static byte[] commonGzipDecompress(byte[] data) throws IOException {
+        ByteArrayOutputStream baos = new ByteArrayOutputStream(512);
+        GzipCompressorInputStream gcis = new GzipCompressorInputStream(new ByteArrayInputStream(data));
+        int count;
+        byte[] buffer = new byte[8192];
+        while ((count = gcis.read(buffer)) != -1) {
+            baos.write(buffer, 0, count);
+        }
+        gcis.close();
+        return baos.toByteArray();
+    }
+
+    public static byte[] snappyCompress(byte[] data) throws IOException {
+        return Snappy.compress(data);
+    }
+
+    public static byte[] snappyDecompress(byte[] data) throws IOException {
+        return Snappy.uncompress(data);
+    }
+
+    public static byte[] commonXzCompress(byte[] data) throws IOException {
+        ByteArrayOutputStream baos = new ByteArrayOutputStream(8192);
+        XZCompressorOutputStream pcos = new XZCompressorOutputStream(baos);
+        pcos.write(data);
+        pcos.close();
+        return baos.toByteArray();
+    }
+
+    public static byte[] commonXzDecompress(byte[] data) throws IOException {
+        ByteArrayOutputStream baos = new ByteArrayOutputStream(8192);
+        XZCompressorInputStream xzis = new XZCompressorInputStream(new ByteArrayInputStream(data));
+        int count;
+        byte[] buffer = new byte[8192];
+        while ((count = xzis.read(buffer)) != -1) {
+            baos.write(buffer, 0, count);
+        }
+        xzis.close();
+        return baos.toByteArray();
+    }
+
+    public static byte[] lz4Compress(byte[] data) throws IOException {
+        LZ4Factory factory = LZ4Factory.fastestInstance();
+        ByteArrayOutputStream byteOutput = new ByteArrayOutputStream();
+        LZ4Compressor compressor = factory.fastCompressor();
+        LZ4BlockOutputStream compressedOutput = new LZ4BlockOutputStream(byteOutput, 8192, compressor);
+        compressedOutput.write(data);
+        compressedOutput.close();
+        return byteOutput.toByteArray();
+    }
+
+    public static byte[] lz4Decompress(byte[] data) throws IOException {
+        LZ4Factory factory = LZ4Factory.fastestInstance();
+        ByteArrayOutputStream baos = new ByteArrayOutputStream(8192);
+        LZ4FastDecompressor decompresser = factory.fastDecompressor();
+        LZ4BlockInputStream lzis = new LZ4BlockInputStream(new ByteArrayInputStream(data), decompresser);
+        int count;
+        byte[] buffer = new byte[8192];
+        while ((count = lzis.read(buffer)) != -1) {
+            baos.write(buffer, 0, count);
+        }
+        lzis.close();
+        return baos.toByteArray();
+    }
+}
diff --git a/src/main/resources/config/application.properties b/src/main/resources/config/application.properties
index 2ffca42..954d3ef 100644
--- a/src/main/resources/config/application.properties
+++ b/src/main/resources/config/application.properties
@@ -6,43 +6,44 @@ scan.task.scheduled.plan=0/15 * * * * ?
 # number of job threads that may run concurrently
 globle.job_thread=2
 # HBase HTTP port
-hbase.url=192.168.40.224:8084
 # HBase table and column names; normally no change is needed
 hbase.table=tsg:report_result
-hbase.columefamily=response:result
-hbase.colume_job_id=detail:result_id
-hbase.colume_query_id=detail:query_id
-hbase.colume_query_duration_ms=detail:query_duration_ms
-hbase.colume_read_rows=detail:read_rows
-hbase.colume_memory_usage=detail:memory_usage
-hbase.colume_sql=detail:sql
-hbase.colume_exception=detail:exception
-# gateway IP that SQL statements are sent through
-ck.gateway_ip=192.168.41.63:9999
+
+hbase.zookeeper_quorum=192.168.40.224
+hbase.zookeeper_property_clientPort=2181
+hbase.zookeeper_znode_parent=/hbase
+hbase.client_retries_number=3
+
+hbase.rpc_timeout=100000
+hbase.connect_pool=10
+
+
+ck.gateway_ip=192.168.40.224:9999
+
 # IP of the ClickHouse node serving report queries
-ck.task_ip=192.168.40.194:8123
+ck.task_ip=192.168.40.224:8123
 # ClickHouse database name
-ck.task_database=tsg_galaxy
+ck.task_database=tsg_galaxy_v3
 # ClickHouse user for task queries
-ck.task_user=default
+ck.task_user=tsg_report
 # password of the ClickHouse task-query user
-ck.task_user_password=111111
+ck.task_user_password=ceiec2019
 # ClickHouse log-query IP, usually = ck.task_ip
-ck.log_ip=192.168.40.194:8123
+ck.log_ip=192.168.40.224:8123
 # ClickHouse user for log queries
 ck.log_user=default
 # password of the ClickHouse log user
-ck.log_user_password=111111
+ck.log_user_password=ceiec2019
 # maximum total connections
 http.maxTotal=300
 # maximum connections per route
 http.defaultMaxPerRoute=100
 # maximum time to establish a connection
 http.connectTimeout=10000
-# maximum time to obtain a connection from the pool
+# maximum time to obtain a connection from the pool:
 http.connectionRequestTimeout=10000
 # maximum time for data transfer (socket timeout)
-http.socketTimeout=21500000
+http.socketTimeout=21600000
 # test whether the connection is valid before submitting a request
 http.staleConnectionCheckEnabled=true
@@ -52,17 +53,17 @@ zookeeper.open=1
 zookeeper.retryCount=5
 zookeeper.elapsedTimeMs=1000
 # ZooKeeper cluster IP list
-zookeeper.connectString=192.168.40.193:2181
+zookeeper.connectString=192.168.40.224:2181
 zookeeper.sessionTimeoutMs=5000
 zookeeper.connectionTimeoutMs=5000
 zookeeper.nameSpace=reportservice
 # mariadb URL
-spring.datasource.url=jdbc:mariadb://192.168.40.204:3306/reporttest?serverTimezone=GMT&useUnicode=true&characterEncoding=UTF-8&autoReconnect=true&failOverReadOnly=false
+spring.datasource.url=jdbc:mariadb://192.168.40.131:3306/tsg-bifang?serverTimezone=GMT&useUnicode=true&characterEncoding=UTF-8&autoReconnect=true&failOverReadOnly=false
 #spring.datasource.url=jdbc:mysql://192.168.11.208:3306/ntc-api?serverTimezone=GMT&useUnicode=true&characterEncoding=UTF-8&autoReconnect=true&failOverReadOnly=false
 # mariadb username
 spring.datasource.username=root
 # mariadb password
-spring.datasource.password=bfzdjizhi
+spring.datasource.password=111111
 spring.datasource.name=druidDataSource
 spring.datasource.type=com.alibaba.druid.pool.DruidDataSource
diff --git a/src/test/java/com/mesa/reportservice/ReportserviceApplicationTests.java b/src/test/java/com/mesa/reportservice/ReportserviceApplicationTests.java
new file mode 100644
index 0000000..61e9172
--- /dev/null
+++ b/src/test/java/com/mesa/reportservice/ReportserviceApplicationTests.java
@@ -0,0 +1,47 @@
+package com.mesa.reportservice;
+
+import com.mesa.reportservice.bean.HttpResult;
+import com.mesa.reportservice.util.ziputil;
+import org.apache.http.client.config.RequestConfig;
+import org.apache.http.client.methods.CloseableHttpResponse;
+import org.apache.http.client.methods.HttpPost;
+import org.apache.http.impl.client.CloseableHttpClient;
+import org.apache.http.util.EntityUtils;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.boot.test.context.SpringBootTest;
+import org.springframework.test.context.junit4.SpringRunner;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.Date;
+import java.util.Map;
+import java.util.TreeMap;
+
+@RunWith(SpringRunner.class)
+@SpringBootTest
+public class ReportserviceApplicationTests {
+
+    @Autowired
+    private CloseableHttpClient httpClient;
+
+    @Autowired
+    private RequestConfig config;
+
+    @Test
+    public void contextLoads() throws IOException {
+
+    }
+
+}
