summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorwangkuan <[email protected]>2020-01-07 18:28:39 +0800
committerwangkuan <[email protected]>2020-01-07 18:28:39 +0800
commit8e42b7e24543c3b3992acd727dfa273087ba10fb (patch)
treed6dcce241a254b8a3f902fa8c595f7849edf8e14
parent66d1711700b0e2349a1cbfea6d28829eafa72db5 (diff)
zookeeper主备策略,ck新旧版本兼容性。对mysql标准语法解析
-rw-r--r--config/application.properties73
-rw-r--r--docker/Dockerfile14
-rw-r--r--pom.xml77
-rw-r--r--src/main/java/com/mesa/reportservice/ReportserviceApplication.java9
-rw-r--r--src/main/java/com/mesa/reportservice/bean/DBTypeEnum.java15
-rw-r--r--src/main/java/com/mesa/reportservice/scheduledtask/ExcuteTask.java7
-rw-r--r--src/main/java/com/mesa/reportservice/scheduledtask/ScheduledTask.java249
-rw-r--r--src/main/java/com/mesa/reportservice/service/DataBaseBusiness.java22
-rw-r--r--src/main/java/com/mesa/reportservice/service/ZkService.java75
-rw-r--r--src/main/java/com/mesa/reportservice/util/ClickhouseConfig.java2
-rw-r--r--src/main/java/com/mesa/reportservice/util/SQLDateFunctionUtil.java830
-rw-r--r--src/main/java/com/mesa/reportservice/util/StringUtil.java49
-rw-r--r--src/main/java/com/mesa/reportservice/util/ZkConfig.java53
-rw-r--r--src/main/java/com/mesa/reportservice/util/ZkProperties.java82
-rw-r--r--src/main/resources/config/application.properties35
-rw-r--r--src/main/resources/docker/Dockerfile14
16 files changed, 1469 insertions, 137 deletions
diff --git a/config/application.properties b/config/application.properties
new file mode 100644
index 0000000..c9655a2
--- /dev/null
+++ b/config/application.properties
@@ -0,0 +1,73 @@
+server.port=9093
+scan.mysql.scheduled.plan=0/10 * * * * ?
+scan.job.scheduled.plan=0/15 * * * * ?
+#management.endpoints.web.exposure.include=*
+# Poll query status every ten seconds; loop up to 350 times (i.e. time out after ~3500s)
+#globle.query_log_sleep=10000
+#globle.loop_number=350
+# Number of job threads allowed to execute concurrently
+
+globle.job_thread=1
+# If a query produces no result within 7200s (ck_timeout, in ms) it is killed
+#globle.ck_timeout=7200000
+hbase.url=192.168.40.224:8084
+hbase.table=tsg:report_result
+hbase.columefamily=response:result
+hbase.colume_job_id=detail:result_id
+hbase.colume_query_id=detail:query_id
+hbase.colume_query_duration_ms=detail:query_duration_ms
+hbase.colume_read_rows=detail:read_rows
+hbase.colume_memory_usage=detail:memory_usage
+hbase.colume_sql=detail:sql
+hbase.colume_exception=detail:exception
+ck.task_ip=192.168.40.224:8123
+ck.task_database=tsg_galaxy_v3
+#ck.task_database=k18_galaxy_service
+ck.task_user=default
+ck.task_user_password=ceiec2019
+ck.log_ip=192.168.40.224:8123
+ck.log_user=default
+ck.log_user_password=ceiec2019
+# Maximum total HTTP connections
+http.maxTotal=300
+# Maximum connections per route
+http.defaultMaxPerRoute=100
+# Maximum time to establish a connection (ms)
+http.connectTimeout=10000
+# Maximum time to obtain a connection from the pool (ms)
+http.connectionRequestTimeout=10000
+# Maximum time for data transfer on the socket (ms)
+http.socketTimeout=21600000
+# Check whether the connection is stale before submitting a request
+http.staleConnectionCheckEnabled=true
+#db.url=jdbc\:mariadb\://192.168.40.120\:3306/tsg-bifang
+#db.url=jdbc\:mariadb\://192.168.11.210\:3306/tsg
+#db.url=jdbc\:mariadb\://47.254.24.224\:3306/test
+db.url=jdbc\:mariadb\://192.168.40.131\:3306/tsg-bifang
+#db.url=jdbc\:mariadb\://192.168.40.204\:3306/reporttest1
+
+# JDBC driver
+#drivers=ru.yandex.clickhouse.ClickHouseDriver
+db.drivers=org.mariadb.jdbc.Driver
+# Database user name
+db.user=root
+# Database password
+#db.password=bifang!@#
+db.password=111111
+# Initial connection pool size
+db.initialsize=20
+# Minimum idle connections
+db.minidle=1
+# Maximum active connections
+db.maxactive=300
+db.filters=stat,wall,log4j,config
+
+
+# Whether to use zookeeper: 0 = enabled (cluster mode), 1 = disabled (standalone)
+zookeeper.open=0
+zookeeper.retryCount=5
+zookeeper.elapsedTimeMs=1000
+zookeeper.connectString=localhost:2181
+zookeeper.sessionTimeoutMs=5000
+zookeeper.connectionTimeoutMs=5000
+zookeeper.nameSpace=reportservice \ No newline at end of file
diff --git a/docker/Dockerfile b/docker/Dockerfile
new file mode 100644
index 0000000..dd44d1f
--- /dev/null
+++ b/docker/Dockerfile
@@ -0,0 +1,14 @@
+ARG JDK_IMAGE
+FROM ${JDK_IMAGE}
+MAINTAINER Galaxy
+VOLUME /tmp
+WORKDIR /home/tsg/galaxy/galaxy-report-service
+COPY config config
+ARG JAR_FILE
+COPY ${JAR_FILE} galaxy-report-service.jar
+#dockercompose set JAVA_OPTS
+ENV JAVA_OPTS=" -Xms1024m -Xmx2048m "
+ENV LANG=en_US.UTF-8 LANGUAGE=en_US:en LC_ALL=en_US.UTF-8
+#ENV TZ=Asia/Almaty
+#RUN ln -snf /usr/share/zoneinfo/$TZ /etc/localtime && echo $TZ > /etc/timezone
+ENTRYPOINT [ "sh", "-c", "java $JAVA_OPTS -Djava.security.egd=file:/dev/./urandom -jar galaxy-report-service.jar" ]
diff --git a/pom.xml b/pom.xml
index e812a69..94b9e57 100644
--- a/pom.xml
+++ b/pom.xml
@@ -10,13 +10,19 @@
</parent>
<groupId>com.mesa</groupId>
<artifactId>galaxy-report-service</artifactId>
- <version>0.0.1</version>
+ <version>2.0</version>
<name>galaxy-report-service</name>
<description>Demo project for Spring Boot</description>
<properties>
- <docker.image.prefix>springboot</docker.image.prefix>
<java.version>1.8</java.version>
+ <docker.build>192.168.40.153</docker.build>
+ <docker.build.port>2375</docker.build.port>
+ <!--docker-registry地址-->
+ <docker.registry>192.168.40.153</docker.registry>
+ <!--docker-registry的端口-->
+ <docker.registry.port>9080</docker.registry.port>
+ <docker.image.prefix>tsg/galaxy</docker.image.prefix>
</properties>
@@ -31,7 +37,11 @@
</exclusion>
</exclusions>
</dependency>
-
+ <dependency>
+ <groupId>com.github.jsqlparser</groupId>
+ <artifactId>jsqlparser</artifactId>
+ <version>1.4</version>
+ </dependency>
<dependency>
<groupId>org.apache.hbase</groupId>
<artifactId>hbase-client</artifactId>
@@ -105,7 +115,27 @@
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
+ <dependency>
+ <groupId>org.springframework.boot</groupId>
+ <artifactId>spring-boot-starter-actuator</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.curator</groupId>
+ <artifactId>curator-framework</artifactId>
+ <version>2.12.0</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.curator</groupId>
+ <artifactId>curator-recipes</artifactId>
+ <version>2.12.0</version>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.zookeeper</groupId>
+ <artifactId>zookeeper</artifactId>
+ <version>3.4.9</version>
+ </dependency>
</dependencies>
@@ -116,6 +146,47 @@
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
</plugin>
+ <plugin>
+ <groupId>com.spotify</groupId>
+ <artifactId>docker-maven-plugin</artifactId>
+ <version>1.0.0</version>
+ <configuration>
+ <serverId>153-docker-repo</serverId>
+ <registryUrl>${docker.registry}:${docker.registry.port}</registryUrl>
+ <!--是否向镜像registry(harbor)中推送镜像,如果为false则需要在mvn命令时添加-DpushImage参数-->
+ <pushImage>true</pushImage>
+
+ <imageName>${docker.registry}:${docker.registry.port}/${docker.image.prefix}/${project.artifactId}</imageName>
+ <forceTags>true</forceTags>
+
+ <imageTags>
+ <imageTag>2.0</imageTag>
+ </imageTags>
+ <!--远程docker构建,供dockerfile使用-->
+ <dockerHost>http://192.168.40.153:2375</dockerHost>
+ <dockerDirectory>docker</dockerDirectory>
+ <buildArgs>
+ <JDK_IMAGE>192.168.40.153:9080/common/jdk:1.8.0_73</JDK_IMAGE>
+ <JAR_FILE>${project.build.finalName}.jar</JAR_FILE>
+ </buildArgs>
+
+
+ <!--将构建jar拷贝到/target/docker 目录下与dockerfile一起-->
+ <resources>
+ <resource>
+ <targetPath>/</targetPath>
+ <directory>${project.build.directory}</directory>
+ <include>${project.build.finalName}.jar</include>
+ </resource>
+
+ <resource>
+ <targetPath>/config</targetPath>
+ <directory>config</directory>
+ </resource>
+ </resources>
+
+ </configuration>
+ </plugin>
</plugins>
</build>
diff --git a/src/main/java/com/mesa/reportservice/ReportserviceApplication.java b/src/main/java/com/mesa/reportservice/ReportserviceApplication.java
index e5999d0..541ede7 100644
--- a/src/main/java/com/mesa/reportservice/ReportserviceApplication.java
+++ b/src/main/java/com/mesa/reportservice/ReportserviceApplication.java
@@ -1,8 +1,12 @@
package com.mesa.reportservice;
+import org.apache.curator.framework.CuratorFramework;
+import org.apache.curator.framework.CuratorFrameworkFactory;
+import org.apache.curator.retry.RetryNTimes;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.boot.web.servlet.ServletComponentScan;
+import org.springframework.context.annotation.Bean;
import org.springframework.scheduling.annotation.EnableScheduling;
@ServletComponentScan
@@ -15,4 +19,9 @@ public class ReportserviceApplication {
SpringApplication.run(ReportserviceApplication.class, args);
}
+/* @Bean
+ public CuratorFramework curatorFramework() {
+ return CuratorFrameworkFactory.newClient("192.168.40.194:2181,192.168.40.193:2181", new RetryNTimes(5, 1000));
+ }*/
+
}
diff --git a/src/main/java/com/mesa/reportservice/bean/DBTypeEnum.java b/src/main/java/com/mesa/reportservice/bean/DBTypeEnum.java
new file mode 100644
index 0000000..9f4ead4
--- /dev/null
+++ b/src/main/java/com/mesa/reportservice/bean/DBTypeEnum.java
@@ -0,0 +1,15 @@
+package com.mesa.reportservice.bean;
+
+
+
+public enum DBTypeEnum {
+ CLICKHOUSE("clickhouse"),
+ DRUID("druid"),
+ HBASE("hbase"),;
+ private String value;
+ DBTypeEnum(String value) {this.value = value;}
+
+ public String getValue() {
+ return value;
+ }
+}
diff --git a/src/main/java/com/mesa/reportservice/scheduledtask/ExcuteTask.java b/src/main/java/com/mesa/reportservice/scheduledtask/ExcuteTask.java
index 40a74ec..0c7bf37 100644
--- a/src/main/java/com/mesa/reportservice/scheduledtask/ExcuteTask.java
+++ b/src/main/java/com/mesa/reportservice/scheduledtask/ExcuteTask.java
@@ -39,7 +39,8 @@ public class ExcuteTask {
hp.doPost(killurl);
if (hr.getCode() != 200 && hr.getBody().toLowerCase().contains("timeout")) {
k--;
- } else {
+ }
+ else {
break;
}
} catch (SocketException e) {
@@ -63,13 +64,13 @@ public class ExcuteTask {
je.setExcute_time(0);
je.setExcute_process(100);
je.setStatus(2);
- Logs.info("success queryid=" + je.getQuery_id() + " sql=" + je.getQuery_sql());
+ // Logs.info("success queryid=" + je.getQuery_id() + " sql=" + je.getQuery_sql());
} else {
je.setResult("");
je.setStatus(4);
je.setExcute_detail("查询clickhouse失败" + hr.getBody());
Logs.error("查询clickhouse失败" + hr.getBody());
- Logs.info("error queryid=" + je.getQuery_id() + " sql=" + je.getQuery_sql());
+ // Logs.info("error queryid=" + je.getQuery_id() + " sql=" + je.getQuery_sql());
}
JobEntity ji = (JobEntity) je.clone();
diff --git a/src/main/java/com/mesa/reportservice/scheduledtask/ScheduledTask.java b/src/main/java/com/mesa/reportservice/scheduledtask/ScheduledTask.java
index 9e81b33..61bc680 100644
--- a/src/main/java/com/mesa/reportservice/scheduledtask/ScheduledTask.java
+++ b/src/main/java/com/mesa/reportservice/scheduledtask/ScheduledTask.java
@@ -1,20 +1,30 @@
package com.mesa.reportservice.scheduledtask;
import com.alibaba.fastjson.JSON;
+import com.mesa.reportservice.bean.DBTypeEnum;
import com.mesa.reportservice.bean.HttpResult;
import com.mesa.reportservice.bean.JobEntity;
import com.mesa.reportservice.service.DataBaseBusiness;
import com.mesa.reportservice.service.HttpAPIService;
-import com.mesa.reportservice.util.ClickhouseConfig;
-import com.mesa.reportservice.util.ConfigUtil;
-import com.mesa.reportservice.util.HbaseUtil;
-import com.mesa.reportservice.util.Logs;
+import com.mesa.reportservice.service.ZkService;
+import com.mesa.reportservice.util.*;
+import net.sf.jsqlparser.parser.CCJSqlParserUtil;
+import net.sf.jsqlparser.statement.Statement;
+import net.sf.jsqlparser.statement.select.Select;
import org.apache.commons.codec.digest.DigestUtils;
+import org.apache.curator.framework.CuratorFramework;
+import org.apache.curator.framework.recipes.leader.LeaderLatch;
+import org.apache.zookeeper.CreateMode;
+import org.apache.zookeeper.data.Stat;
import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.scheduling.annotation.EnableScheduling;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;
+import java.io.IOException;
+import java.net.InetAddress;
+import java.util.Date;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutorService;
@@ -33,144 +43,152 @@ public class ScheduledTask {
private HttpAPIService hp;
@Autowired
private DataBaseBusiness hb;
-
+ @Autowired
+ private ZkService zs ;
+ //按照配置的线程数执行sql
@Scheduled(cron = "${scan.mysql.scheduled.plan}")
public void scanmysql() {
try {
- List<JobEntity> joblist = hb.getJobLimitRows(ConfigUtil.job_thread);
- Logs.debug("等待执行任务数" + joblist.size());
-
- for (JobEntity jl : joblist) {
- String sql = jl.getQuery_sql().trim();
- if (sql.equals("")) {
- hb.updateStatue(jl.getResult_id(), 3, "sql语句有错误");
- break;
- }
- String queryid = DigestUtils.md5Hex(jl.getResult_id() + sql);
- sql = sql.replace("$exe_time", "toDateTime('" + jl.getIssued_time().trim() + "')");
- jl.setQuery_id(queryid);
- jl.setQuery_sql(sql);
- jl.setStatus(1);
- jl.setResult("");
- jl.setException("");
- jl.setExcute_detail("");
- JobEntity jt = (JobEntity) jl.clone();
- ConfigUtil.mapresult.put(queryid, jt);
- Logs.info("start queryid=" + queryid + " sql=" + sql);
- hb.updateStatue(jl.getResult_id(), 1, "executing");
-
- pool.execute(new Runnable() {
- @Override
- public void run() {
- JobEntity je = (JobEntity) jl.clone();
- ExcuteTask ex = new ExcuteTask(hp);
- ex.dotask(je);
+ if(zs.isMaster()){
+ // 定时任务的业务逻辑代码
+
+ List<JobEntity> joblist = hb.getJobLimitRows(ConfigUtil.job_thread);
+ Logs.debug("等待执行任务数" + joblist.size());
+ for (JobEntity jl : joblist) {
+ String sql = jl.getQuery_sql().trim();
+ String queryid = DigestUtils.md5Hex(jl.getResult_id() + sql);
+ sql = sql.replace("$exe_time", "toDateTime('" + jl.getIssued_time().trim() + "')");
+ sql = SQLDateFunctionUtil.generateDateFunction(sql, DBTypeEnum.CLICKHOUSE.getValue());
+
+ Statement statement = CCJSqlParserUtil.parse(sql);
+ Select selectStatement = (Select) statement;
+
+ if (sql.equals("")) {
+ hb.updateStatue(jl.getResult_id(), 3, "sql语句有错误");
+ break;
}
- });
- ConfigUtil.job_thread--;
+
+ jl.setQuery_id(queryid);
+ jl.setQuery_sql(sql);
+ jl.setStatus(1);
+ jl.setResult("");
+ jl.setException("");
+ jl.setExcute_detail("");
+ JobEntity jt = (JobEntity) jl.clone();
+ ConfigUtil.mapresult.put(queryid, jt);
+ Logs.info("start queryid=" + queryid + " sql=" + sql);
+ hb.updateStatue(jl.getResult_id(), 1, "executing");
+
+ pool.execute(new Runnable() {
+ @Override
+ public void run() {
+ JobEntity je = (JobEntity) jl.clone();
+ ExcuteTask ex = new ExcuteTask(hp);
+ ex.dotask(je);
+
+ }
+ });
+ ConfigUtil.job_thread--;
+ }
}
} catch (Exception e) {
Logs.error(e.toString());
}
- }
+ }
+
+//扫描数据库中正在执行的查询
@Scheduled(cron = "${scan.job.scheduled.plan}")
public void scanmysqlresult() {
try {
- List<JobEntity> joblist = hb.getJobForExcute();
- Logs.debug("正在执行任务数" + joblist.size());
-
- for (JobEntity jk : joblist) {
- String sql = jk.getQuery_sql().trim();
- String queryid = DigestUtils.md5Hex(jk.getResult_id() + sql);
- if (!ConfigUtil.mapresult.containsKey(queryid)) {
- String killurl = ClickhouseConfig.getKillUrl(queryid);
- hp.doPost(killurl);
- updateErrorMessage(jk.getResult_id(), 0, "执行结果不存在重新执行");
- break;
- }
- JobEntity je = (JobEntity) ConfigUtil.mapresult.get(queryid).clone();
- if (je.getStatus() == 2) {
- try {
- je = getQueryLogMessage(je);
- HttpResult hr = saveToHbase(je);
-
- if (hr.getCode() == 200) {
-
- je.setExcute_detail("success");
- je.setExcute_time(0);
- je.setExcute_process(100);
- je.setStatus(2);
- int number = 0;
- int z = 3;
- do {
- number = hb.updateProcesses(je);
- z--;
+ if(zs.isMaster()){
+
+ List<JobEntity> joblist = hb.getJobForExcute();
+ Logs.debug("正在执行任务数" + joblist.size());
+
+ for (JobEntity jk : joblist) {
+ String sql = jk.getQuery_sql().trim();
+ String queryid = DigestUtils.md5Hex(jk.getResult_id() + sql);
+ //程序异常停止重启后kill正在进行的查询,更新数据库状态重新执行
+ if (!ConfigUtil.mapresult.containsKey(queryid)) {
+ String killurl = ClickhouseConfig.getKillUrl(queryid);
+ hp.doPost(killurl);
+ updateErrorMessage(jk.getResult_id(), 0, "执行结果不存在重新执行");
+ break;
+ }
+ JobEntity je = (JobEntity) ConfigUtil.mapresult.get(queryid).clone();
+ if (je.getStatus() == 2) {
+ try {
+ je = getQueryLogMessage(je);
+ HttpResult hr = saveToHbase(je);
+
+ if (hr.getCode() == 200) {
+
+ je.setExcute_detail("success");
+ je.setExcute_time(0);
+ je.setExcute_process(100);
+ je.setStatus(2);
+ int number = 0;
+ int z = 3;
+ do {
+ number = hb.updateProcesses(je);
+ z--;
+ }
+ while (number != 1 && z >= 0);
+ Logs.info("success save to hbase queryid=" + je.getQuery_id() + " sql=" + je.getQuery_sql());
+ } else {
+ updateErrorMessage(je.getResult_id(), 5, "写入hbase失败" + hr.getBody());
+ Logs.error("save hbase error" + hr.getBody());
}
- while (number != 1 && z >= 0);
+ } catch (Exception e) {
+ updateErrorMessage(je.getResult_id(), 7, "结果写入数据库失败");
+ Logs.error("queryid=" + je.getResult_id() + e.toString());
- } else {
- updateErrorMessage(je.getResult_id(), 5, "写入hbase失败");
- Logs.error("写入hbase失败");
+ } finally {
+ ConfigUtil.mapresult.remove(queryid);
+ ConfigUtil.job_thread++;
}
- } catch (Exception e) {
- updateErrorMessage(je.getResult_id(), 7, "结果写入数据库失败");
- Logs.error(e.toString());
-
- } finally {
- ConfigUtil.mapresult.remove(queryid);
- ConfigUtil.job_thread++;
- }
- } else if (je.getStatus() > 2) {
+ } else if (je.getStatus() > 2) {
- try {
- je = getQueryLogMessage(je);
- HttpResult h1 = saveToHbase(je);
- if (h1.getCode() == 200) {
-
- int number = 0;
- int z = 3;
- do {
- number = hb.updateProcesses(je);
- z--;
- }
- while (number != 1 && z >= 0);
-
- } else {
- updateErrorMessage(je.getResult_id(), 5, "错误日志写入hbase失败");
- Logs.error("错误日志写入hbase失败");
+ try {
+ je = getQueryLogMessage(je);
+ updateErrorMessage(je.getResult_id(), je.getStatus(), je.getExcute_detail());
+ } catch (Exception e) {
+ Logs.error(e.toString());
+ } finally {
+ ConfigUtil.mapresult.remove(queryid);
+ ConfigUtil.job_thread++;
}
- } catch (Exception e) {
- updateErrorMessage(je.getResult_id(), 7, "错误结果写入数据库失败");
- Logs.error(e.toString());
- } finally {
- ConfigUtil.mapresult.remove(queryid);
- ConfigUtil.job_thread++;
+ } else {
+ JobEntity jo = getProcessMessage(je);
+ if (jo.getExcute_row() != 0 || jo.getExcute_time() != 0) {
+ hb.updateProcesses(jo);
+ }
}
- } else {
- JobEntity jo = getProcessMessage(je);
- if (jo.getExcute_row() != 0 || jo.getExcute_time() != 0) {
- hb.updateProcesses(jo);
- }
+
}
}
+
} catch (Exception e) {
Logs.error(e.toString());
}
}
+/**
+ * 记录查询中的错误更新数据库
+ */
public void updateErrorMessage(int id, int status, String error) {
@@ -184,6 +202,14 @@ public class ScheduledTask {
}
+
+
+
+
+
+ /**
+ * 获取进度条信息
+ */
public JobEntity getProcessMessage(JobEntity jb) {
JobEntity job = (JobEntity) jb.clone();
String queryProccess = ClickhouseConfig.getProcessUrl(job.getQuery_id());
@@ -207,11 +233,9 @@ public class ScheduledTask {
int result = (int) (persent * 100);
job.setExcute_row(total_rows_approx);
job.setExcute_process(result);
- // job.setStatus(1);
job.setExcute_time(ftime);
job.setExcute_detail("");
- /* System.out.println("%+++++++++++++" + result + "%");
- System.out.println("needtime--------" + ftime);*/
+
}
} catch (Exception e) {
Logs.error(e.toString());
@@ -219,6 +243,9 @@ public class ScheduledTask {
return job;
}
+ /**
+ * 结果存入hbase
+ */
public HttpResult saveToHbase(JobEntity entity) {
int k = 3;
@@ -232,7 +259,7 @@ public class ScheduledTask {
hr = hp.doPost(hbaseurl, hbasejson);
break;
} catch (Exception e) {
- Logs.error("写入hbase报错重试次数" + (3 - k));
+ Logs.error("写入hbase报错重试次数" + (3 - k)+e.toString());
k--;
}
}
@@ -240,7 +267,9 @@ public class ScheduledTask {
return hr;
}
-
+ /**
+ * 获取查询log中的信息
+ */
public JobEntity getQueryLogMessage(JobEntity job) {
String finishurl = ClickhouseConfig.getFinishUrl(job.getQuery_id());
diff --git a/src/main/java/com/mesa/reportservice/service/DataBaseBusiness.java b/src/main/java/com/mesa/reportservice/service/DataBaseBusiness.java
index 082cada..a2edb02 100644
--- a/src/main/java/com/mesa/reportservice/service/DataBaseBusiness.java
+++ b/src/main/java/com/mesa/reportservice/service/DataBaseBusiness.java
@@ -52,9 +52,10 @@ public class DataBaseBusiness {
connection.setAutoCommit(false);
pstmt = connection.prepareStatement(sql);
rset = pstmt.executeQuery();
+
while (rset.next()) {
JobEntity je = new JobEntity();
- resultid = rset.getInt("result_id");
+ // resultid = rset.getInt("result_id");
je.setResult_id(rset.getInt("result_id"));
je.setEnd_time(rset.getString("end_time").trim().substring(0, 19));
je.setIssued_time(rset.getString("issued_time").trim().substring(0, 19));
@@ -65,10 +66,11 @@ public class DataBaseBusiness {
} catch (Exception e) {
Logs.error(sql + "获取正在执行任务信息失败!");
- updateStatue(resultid, 7,"获取正在执行任务信息失败");
+ // updateStatue(resultid, 7,"获取正在执行任务信息失败");
e.printStackTrace();
throw new RuntimeException("获取正在执行任务信息失败!", e);
} finally {
+
clear(pstmt, null, rset);
// manager.freeConnection("idb", connection);
if (connection != null) {
@@ -101,10 +103,11 @@ public class DataBaseBusiness {
rset = pstmt.executeQuery();
while (rset.next()) {
JobEntity je = new JobEntity();
- resultid = rset.getInt("result_id");
+ // resultid = rset.getInt("result_id");
je.setExcute_detail("");
je.setResult_id(rset.getInt("result_id"));
je.setEnd_time(rset.getString("end_time").trim().substring(0, 19));
+ je.setStart_time(rset.getString("start_time").trim().substring(0, 19));
je.setIssued_time(rset.getString("issued_time").trim().substring(0, 19));
je.setQuery_sql(rset.getString("query_sql").trim());
sqllist.add(je);
@@ -113,13 +116,14 @@ public class DataBaseBusiness {
} catch (Exception e) {
Logs.error(sql + "获取待执行任务信息失败!");
- updateStatue(resultid, 7,"获取待执行任务信息失败");
+ // updateStatue(resultid, 7,"获取待执行任务信息失败");
e.printStackTrace();
throw new RuntimeException("获取待执行任务信息失败!", e);
} finally {
clear(pstmt, null, rset);
+
// manager.freeConnection("idb", connection);
if (connection != null) {
try {
@@ -156,6 +160,7 @@ public class DataBaseBusiness {
e.printStackTrace();
throw new RuntimeException("获取job状态发生异常!", e);
} finally {
+
clear(pstmt, null, rset);
// manager.freeConnection("idb", connection);
if (connection != null) {
@@ -179,14 +184,14 @@ public class DataBaseBusiness {
if(job.getExcute_detail().length()>500) {
job.setExcute_detail(StringUtil.bSubstring(job.getExcute_detail(), 500));
}
- sql = "update report_result set op_time ='" + current_time + "',excute_detail = \"" + job.getExcute_detail() + "\", status = " + job.getStatus() + " ,excute_time =" + job.getExcute_time() + ",excute_row =" + job.getExcute_row() + ",excute_process=" + job.getExcute_process() + " where result_id = " + job.getResult_id();
+ sql = "update report_result set op_time ='" + current_time + "',excute_detail = \"" + job.getExcute_detail() + "\", status = " + job.getStatus() + " ,excute_time =" + job.getExcute_time() + ",excute_row =" + job.getExcute_row() + ",excute_process=" + job.getExcute_process() + " where status = 1 and result_id = " + job.getResult_id();
//connection = manager.getConnection("idb");
connection = dataSource.getConnection();
connection.setAutoCommit(false);
pstmt = connection.prepareStatement(sql);
a = pstmt.executeUpdate();
- connection.commit();
+ // connection.commit();
Logs.debug("-----------success----------" + sql);
} catch (Exception e) {
@@ -195,9 +200,11 @@ public class DataBaseBusiness {
throw new RuntimeException("更新进度条发生异常!", e);
} finally {
clear(pstmt, null, null);
+
// manager.freeConnection("idb", connection);
if (connection != null) {
try {
+ connection.commit();
connection.close();
} catch (SQLException e) {
Logs.error(e.toString());
@@ -226,7 +233,6 @@ public class DataBaseBusiness {
pstmt = connection.prepareStatement(sql);
int a = pstmt.executeUpdate();
- connection.commit();
Logs.debug("-----------success----------" + sql);
} catch (Exception e) {
@@ -234,10 +240,12 @@ public class DataBaseBusiness {
e.printStackTrace();
throw new RuntimeException("更新状态发生异常!", e);
} finally {
+
clear(pstmt, null, null);
// manager.freeConnection("idb", connection);
if (connection != null) {
try {
+ connection.commit();
connection.close();
} catch (SQLException e) {
Logs.error(e.toString());
diff --git a/src/main/java/com/mesa/reportservice/service/ZkService.java b/src/main/java/com/mesa/reportservice/service/ZkService.java
new file mode 100644
index 0000000..272c528
--- /dev/null
+++ b/src/main/java/com/mesa/reportservice/service/ZkService.java
@@ -0,0 +1,75 @@
+package com.mesa.reportservice.service;
+
+import com.mesa.reportservice.util.Logs;
+import com.mesa.reportservice.util.ZkConfig;
+import com.mesa.reportservice.util.ZkProperties;
+import org.apache.curator.framework.CuratorFramework;
+import org.apache.curator.framework.api.ExistsBuilder;
+import org.apache.zookeeper.CreateMode;
+import org.apache.zookeeper.data.Stat;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
+import org.springframework.stereotype.Service;
+
+import java.net.InetAddress;
+
+/**
+ * Created by wk1 on 2020/1/6.
+ */
+@Service
+public class ZkService {
+ @Autowired
+ private CuratorFramework curatorConnect;
+ @Autowired
+ private ZkProperties zp;
+
+
+ public boolean isMaster() {
+
+ try {
+ if(zp.getOpen()==0) {
+ boolean isZkCuratorStarted = curatorConnect.isStarted();
+ String nodePath = "/masterip";
+ System.out.println("当前客户端的状态:" + (isZkCuratorStarted ? "连接中..." : "已关闭..."));
+ ExistsBuilder sss = curatorConnect.checkExists();
+ Stat statExist = curatorConnect.checkExists().forPath(nodePath);
+ InetAddress address = InetAddress.getLocalHost();
+ String localip = address.getHostAddress().trim();
+ if (statExist == null) {
+ System.out.println(address.getHostAddress());
+ byte[] data = address.getHostAddress().getBytes(); // 节点数据
+
+ String result = curatorConnect.create().creatingParentsIfNeeded() // 创建父节点,也就是会递归创建
+ .withMode(CreateMode.EPHEMERAL) // 节点类型
+ .forPath(nodePath, data);
+ System.out.println(result + "节点,创建成功...");
+ return true;
+
+ } else {
+ System.out.println(nodePath + " 节点存在");
+ Stat stat = new Stat();
+
+ byte[] nodeData = curatorConnect.getData().storingStatIn(stat).forPath(nodePath);
+ String masterip = new String(nodeData).trim();
+ System.out.println("节点 " + nodePath + " 的数据为:" + new String(nodeData));
+
+ if (masterip.equals(localip)) {
+
+ return true;
+
+ } else {
+ return false;
+
+ }
+ }
+ }
+ else{
+ return true;
+ }
+ }catch (Exception e){
+ Logs.error(e.toString());
+ return true;
+ }
+ }
+
+}
diff --git a/src/main/java/com/mesa/reportservice/util/ClickhouseConfig.java b/src/main/java/com/mesa/reportservice/util/ClickhouseConfig.java
index d46c0a3..13ae8cf 100644
--- a/src/main/java/com/mesa/reportservice/util/ClickhouseConfig.java
+++ b/src/main/java/com/mesa/reportservice/util/ClickhouseConfig.java
@@ -63,7 +63,7 @@ public class ClickhouseConfig {
// String logsql = "select elapsed,total_rows_approx,read_rows from `system`.processes where query_id='" + query_id + "' FORMAT JSONEachRow";
try {
- String sql = URLEncoder.encode("select type,read_rows,query_duration_ms,query,exception,memory_usage,event_time from `system`.query_log where type>1 and query_id='" + query_id + "' order by event_time desc limit 1 FORMAT JSONEachRow", "utf8");
+ String sql = URLEncoder.encode("select CAST(type, 'Int8') as type,read_rows,query_duration_ms,query,exception,memory_usage,event_time from `system`.query_log where type>1 and query_id='" + query_id + "' order by event_time desc limit 1 FORMAT JSONEachRow", "utf8");
finishsql = url + sql;
} catch (UnsupportedEncodingException e) {
Logs.error(e.toString());
diff --git a/src/main/java/com/mesa/reportservice/util/SQLDateFunctionUtil.java b/src/main/java/com/mesa/reportservice/util/SQLDateFunctionUtil.java
new file mode 100644
index 0000000..0b96a0f
--- /dev/null
+++ b/src/main/java/com/mesa/reportservice/util/SQLDateFunctionUtil.java
@@ -0,0 +1,830 @@
+package com.mesa.reportservice.util;
+
+import com.mesa.reportservice.bean.DBTypeEnum;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+/**
+ * @author GouGe
+ * @date 2019/8/1 10:13
+ */
+public class SQLDateFunctionUtil {
+ public static Pattern pDateFormat = Pattern.compile("\\W(FROM_UNIXTIME|DATE_FORMAT|STR_TO_DATE)\\s*\\(", Pattern.CASE_INSENSITIVE);
+ public static Pattern pGroupByFormat = Pattern.compile("(SELECT\\s+|GROUP\\s*BY\\s+|,\\s*)(FROM_UNIXTIME|DATE_FORMAT|STR_TO_DATE)\\s*\\(", Pattern.CASE_INSENSITIVE);
+ public static Pattern pDateRelative = Pattern.compile("\\W(ADDDATE|DATE_ADD|DATE_SUB|SUBDATE)\\s*\\(", Pattern.CASE_INSENSITIVE);
+ public static Pattern pIntervalNumUnit = Pattern.compile("INTERVAL(.*?)(YEAR|QUARTER|MONTH|WEEK|DAY|HOUR|MINUTE|SECOND)$", Pattern.CASE_INSENSITIVE);
+ public static Pattern pUnixTime = Pattern.compile("\\WUNIX_TIMESTAMP\\s*\\(", Pattern.CASE_INSENSITIVE);
+ public static Pattern pDate = Pattern.compile("\\W(DATE)\\s*\\(", Pattern.CASE_INSENSITIVE);
+ public static Pattern pDateCalculateNum = Pattern.compile("\\(\\s*(DATE)\\s*\\(", Pattern.CASE_INSENSITIVE);
+ public static Pattern pUnitFuncGetNum = Pattern.compile("\\W(YEAR|QUARTER|MONTH|DAY|HOUR|MINUTE|SECOND|DAYOFYEAR|DAYOFMONTH|DAYOFWEEK)\\s*\\(", Pattern.CASE_INSENSITIVE);
+ public static Pattern pMarkDate = Pattern.compile("\\W(MAKEDATE)\\s*\\(", Pattern.CASE_INSENSITIVE);
+ public static Pattern pLastDateFunc = Pattern.compile("\\W(LAST_DAY)\\s*\\(", Pattern.CASE_INSENSITIVE);
+ public static Pattern pCKTimeStamp = Pattern.compile("\\s+TIMESTAMP\\s+([^'])", Pattern.CASE_INSENSITIVE);
+ public static Pattern pNow = Pattern.compile("NOW\\(\\s*\\)", Pattern.CASE_INSENSITIVE);
+ public static Pattern pStrDateTime = Pattern.compile("(<|>|=|between|and)\\s*('\\d{4}-\\d{2}-\\d{2}\\s*\\d{2}:\\d{2}:\\d{2}')", Pattern.CASE_INSENSITIVE);
+
+
+ /**
+ * 解析mysql函数替换为对应数据库类型函数
+ * @param sql 要解析的sql
+ * @param dbType 数据库类型
+ * @return
+ */
+ public static String generateDateFunction(String sql, String dbType) {
+ sql = SQLDateFunctionUtil.parseMarkDate(sql, dbType);
+ sql = SQLDateFunctionUtil.parseDateCalculateNum(sql, dbType);
+ sql = SQLDateFunctionUtil.parseLastDayFunc(sql, dbType);
+ sql = SQLDateFunctionUtil.parseGroupByFormat(sql, dbType);
+ sql = SQLDateFunctionUtil.parseDateFormat(sql, dbType);
+ sql = SQLDateFunctionUtil.parseDateRelative(sql, dbType);
+ sql = SQLDateFunctionUtil.parseUnixTime(sql, dbType);
+ sql = SQLDateFunctionUtil.parseDate(sql, dbType);
+ sql = SQLDateFunctionUtil.parseUnitFuncGetNum(sql, dbType);
+ sql = SQLDateFunctionUtil.specialDisposeEnd(sql, dbType);
+ sql = SQLDateFunctionUtil.replaceMark(sql);
+ return sql;
+ }
+
+
+
+ /**
+ * 匹配范围:FROM_UNIXTIME(、DATE_FORMAT(、STR_TO_DATE(
+ * @param sql 需要解析的sql
+ * @param dbType
+ * @return
+ */
+ public static String parseDateFormat(String sql, String dbType) {
+ int count = 0;
+ while (true) {
+ Matcher matcher = pDateFormat.matcher(sql);
+ if (!matcher.find() || count++ >= 40) {
+ return sql;
+ }
+ int start = matcher.start() + 1;
+ String sqlParse = sql.substring(start);
+ int[] bracketsMatch = StringUtil.getBracketsMatch(sqlParse, "(", false);
+ if (bracketsMatch[0] >= 1) {
+ --bracketsMatch[0];
+ }
+ String str = dateFormatHavaMark(sqlParse, bracketsMatch[0],false, dbType);
+ sql = sql.substring(0, start) + str;
+ }
+ }
+
+ /**
+ * 匹配格式化 FROM_UNIXTIME(、DATE_FORMAT(、STR_TO_DATE(
+ * @param sql
+ * @param dbType
+ * @return
+ */
+ public static String parseGroupByFormat(String sql, String dbType) {
+ int count = 0;
+ while (true) {
+ Matcher matcher = pGroupByFormat.matcher(sql);
+ if (!matcher.find() || count++ >= 30) {
+ return sql;
+ }
+ int start = matcher.start();
+ String sqlParse = sql.substring(start);
+ int[] bracketsMatch = StringUtil.getBracketsMatch(sqlParse, "(", false);
+ if (bracketsMatch[0] >= 1) {
+ --bracketsMatch[0];
+ }
+ String str = dateFormatHavaMark(sqlParse, bracketsMatch[0], true, dbType);
+ sql = sql.substring(0, start) + str;
+ }
+ }
+
+ /**
+ * 匹配 SUBDATE(、ADDDATE(、DATE_SUB(、DATE_ADD(
+ * @param sql
+ * @param dbType
+ * @return
+ */
+ public static String parseDateRelative(String sql, String dbType) {
+ int count = 0;
+ while (true) {
+ Matcher matcher = pDateRelative.matcher(sql);
+ if (!matcher.find() || count++ >= 30) {
+ return sql;
+ }
+ int start = matcher.start() + 1;
+ String sqlParse = sql.substring(start);
+ int[] bracketsMatch = StringUtil.getBracketsMatch(sqlParse, null, false);
+ if (bracketsMatch[0] >= 1) {
+ --bracketsMatch[0];
+ }
+ String str = dateRelativeHaveMark(sqlParse, bracketsMatch[0], dbType);
+ sql = sql.substring(0, start) + str;
+ }
+ }
+
+ /**
+ * 匹配 UNIX_TIMESTAMP(
+ * @param sql
+ * @param dbType
+ * @return
+ */
+ public static String parseUnixTime(String sql, String dbType) {
+ int count = 0;
+ while (true) {
+ Matcher matcher = pUnixTime.matcher(sql);
+ if (!matcher.find() || count++ >= 30) {
+ return sql;
+ }
+ int start = matcher.start() + 1;
+ String sqlParse = sql.substring(start);
+ int[] bracketsMatch = StringUtil.getBracketsMatch(sqlParse, null, false);
+ if (bracketsMatch[0] >= 1) {
+ --bracketsMatch[0];
+ }
+ String str = parseUnixTimeHavaMark(sqlParse, bracketsMatch[0], dbType);
+ sql = sql.substring(0, start) + str;
+ }
+ }
+
+ /**
+ * 匹配 DATE(
+ * @param sql
+ * @param dbType
+ * @return
+ */
+ public static String parseDate(String sql, String dbType) {
+ int count = 0;
+ while (true) {
+ Matcher matcher = pDate.matcher(sql);
+ if (!matcher.find() || count++ >= 30) {
+ return sql;
+ }
+ int start = matcher.start() + 1;
+ String sqlParse = sql.substring(start);
+ int[] bracketsMatch = StringUtil.getBracketsMatch(sqlParse, "(", false);
+ if (bracketsMatch[0] >= 1) {
+ --bracketsMatch[0];
+ }
+ String str = parseDateHavaMark(sqlParse, bracketsMatch[0], dbType);
+ sql = sql.substring(0, start) + str;
+ }
+ }
+
+ /**
+ * 匹配 YEAR(、QUARTER(、MONTH(、DAY(、HOUR(、MINUTE(、SECOND(、DAYOFYEAR(、DAYOFMONTH(、DAYOFWEEK(
+ * @param sql
+ * @param dbType
+ * @return
+ */
+ public static String parseUnitFuncGetNum(String sql, String dbType){
+ int count = 0;
+ while (true){
+ Matcher matcher = pUnitFuncGetNum.matcher(sql);
+ if(!matcher.find() || count++ >= 30){
+ return sql;
+ }
+ int start = matcher.start() + 1;
+ String sqlParse = sql.substring(start);
+ int[] bracketsMatch = StringUtil.getBracketsMatch(sqlParse, "(", false);
+ if (bracketsMatch[0] >= 1) {
+ --bracketsMatch[0];
+ }
+ String str = parseUnitFuncGetNumHaveMark(sqlParse, bracketsMatch[0], dbType);
+ sql = sql.substring(0, start) + str;
+ }
+ }
+
+ /**
+ * 匹配 MAKEDATE(
+ * @param sql
+ * @param dbType
+ * @return
+ */
+ public static String parseMarkDate(String sql, String dbType) {
+ int count = 0;
+ while (true) {
+ Matcher matcher = pMarkDate.matcher(sql);
+ if (!matcher.find() || count++ >= 30) {
+ return sql;
+ }
+ int start = matcher.start() + 1;
+ String sqlParse = sql.substring(start);
+ int[] bracketsMatch = StringUtil.getBracketsMatch(sqlParse, "(", false);
+ if (bracketsMatch[0] >= 1) {
+ --bracketsMatch[0];
+ }
+ String str = parseMarkDateHaveMark(sqlParse, bracketsMatch[0], dbType);
+ sql = sql.substring(0, start) + str;
+ }
+ }
+
+ /**
+ * 匹配 LAST_DAY(
+ * @param sql
+ * @param dbType
+ * @return
+ */
+ public static String parseLastDayFunc(String sql, String dbType){
+ int count = 0;
+ while (true) {
+ Matcher matcher = pLastDateFunc.matcher(sql);
+ if (!matcher.find() || count++ >= 30) {
+ return sql;
+ }
+ int start = matcher.start() + 1;
+ String sqlParse = sql.substring(start);
+ int[] bracketsMatch = StringUtil.getBracketsMatch(sqlParse, "(", false);
+ if (bracketsMatch[0] >= 1) {
+ --bracketsMatch[0];
+ }
+ String str = parseLastDayFuncHaveMark(sqlParse, bracketsMatch[0], dbType);
+ sql = sql.substring(0, start) + str;
+ }
+ }
+
+ /**
+ * 匹配 涉及DATE()-、DATE()+
+ * @param sql
+ * @param dbType
+ * @return
+ */
+ public static String parseDateCalculateNum(String sql, String dbType){
+ int count = 0;
+ String sqlTmp = "";
+ while (true) {
+ Matcher matcher = pDateCalculateNum.matcher(sql);
+ if (!matcher.find() || count++ >= 30) {
+ sql = sql.replace("##","(");
+ return sql;
+ }
+ String sqlParse = sql.substring(matcher.start());
+ if(sqlTmp.equals(sqlParse)){
+ sqlParse = "##" + sqlParse.substring(1);
+ sql = sql.substring(0, matcher.start()) + sqlParse;
+ continue;
+ }
+ sqlTmp = sqlParse;
+ int[] bracketsMatch = StringUtil.getBracketsMatch(sqlParse, "(", false);
+ if (bracketsMatch[0] >= 1) {
+ --bracketsMatch[0];
+ }
+ String sqlParse2 = sql.substring(matcher.start()+1);
+ int[] bracketsMatch2 = StringUtil.getBracketsMatch(sqlParse2, "(", false);
+ if (bracketsMatch2[0] >= 1) {
+ --bracketsMatch2[0];
+ }
+ String str = parseDateCalculateNumHaveMark(sqlParse, bracketsMatch[0],bracketsMatch2[0], dbType);
+ sql = sql.substring(0, matcher.start()) + str;
+ }
+ }
+
+ /**
+ * 替换 涉及DATE()-、DATE()+
+ * @param sqlParse
+ * @param num 括号总个数
+ * @param num2 date函数内括号数
+ * @param dbType
+ * @return
+ */
+ private static String parseDateCalculateNumHaveMark(String sqlParse, int num, int num2, String dbType) {
+ Pattern pDateCalculateNumParse = Pattern.compile("(DATE)\\s*\\((.*?(.*?\\).*?){" + num2 + "})\\)(.*?(.*?\\).*?){" + (num - num2 - 1) + "})\\)", Pattern.CASE_INSENSITIVE);
+ StringBuffer sb = new StringBuffer();
+ Matcher mDateCalculateNumParse = pDateCalculateNumParse.matcher(sqlParse);
+ if (mDateCalculateNumParse.find() && (mDateCalculateNumParse.group(4).trim().startsWith("+") || mDateCalculateNumParse.group(4).trim().startsWith("-"))) {
+ String group0 = mDateCalculateNumParse.group(0);
+ String group2 = mDateCalculateNumParse.group(2);
+ String group4 = mDateCalculateNumParse.group(4);
+ if (StringUtil.getBracketsMatch(group0.substring(0, group0.length() - 1), "(", true)[0] == num) {
+ String[] split = group4.split(",");
+ String replaceValue = "";
+ if(DBTypeEnum.CLICKHOUSE.getValue().equals(dbType)){
+ if (split.length == 1) {
+ replaceValue = "toStartOfDay#[toDateTime#[" + group2 + "]]+[" + group4 + "]*86400 )";
+ } else if (split.length == 2) {
+ replaceValue = "toStartOfDay#[toDateTime#[" + group2 + "]]+[" + split[0] + "]*86400 ," + split[1] + ")";
+ }
+ }else if(DBTypeEnum.DRUID.getValue().equals(dbType)){
+ if (split.length == 1) {
+ replaceValue = "TIME_SHIFT#[FLOOR#[ TIMESTAMP " + group2 + " to day],'P1D'," + group4 + "])";
+ } else if (split.length == 2) {
+ replaceValue = "TIME_SHIFT#[FLOOR#[ TIMESTAMP " + group2 + " to day],'P1D'," + split[0] + "]," + split[1] + ")";
+ }
+ }
+ mDateCalculateNumParse.appendReplacement(sb, replaceValue);
+ }
+ }
+ mDateCalculateNumParse.appendTail(sb);
+ return sb.toString();
+ }
+
+
+ /**
+ * 替换 LAST_DAY(date)
+ * @param sqlParse
+ * @param num 包含括号个数
+ * @param dbType
+ * @return
+ */
+ public static String parseLastDayFuncHaveMark(String sqlParse, int num, String dbType) {
+ Pattern pLastDayParse = Pattern.compile("(LAST_DAY)\\s*\\((.*?(.*?\\).*?){" + num + "})\\)", Pattern.CASE_INSENSITIVE);
+ StringBuffer sb = new StringBuffer();
+ Matcher mLastDayParse = pLastDayParse.matcher(sqlParse);
+ if (mLastDayParse.find()) {
+ String replaceValue = lastDayRealize(mLastDayParse.group(1), mLastDayParse.group(2), dbType);
+ mLastDayParse.appendReplacement(sb, replaceValue);
+ }
+ mLastDayParse.appendTail(sb);
+ return sb.toString();
+ }
+
+ /**
+ * 实现替换:LAST_DAY(date)
+ * @param unit
+ * @param param
+ * @param dbType
+ * @return
+ */
+ private static String lastDayRealize(String unit, String param, String dbType) {
+ String replaceValue = null;
+ if("LAST_DAY".equals(unit.toUpperCase())){
+ if(DBTypeEnum.CLICKHOUSE.getValue().equals(dbType)){
+ replaceValue = "addDays#[addMonths#[toStartOfMonth#[toDateTime#[" + param + "]],1],-1]";
+ }else if(DBTypeEnum.DRUID.getValue().equals(dbType)){
+ replaceValue = "TIME_SHIFT#[FLOOR#[TIME_SHIFT#[ TIMESTAMP " + param + ", 'P1M', 1] to month], 'P1D', -1]";
+ }
+ }
+ return replaceValue;
+ }
+
+ /**
+ * 替换:MAKEDATE(year,dayofyear)
+ * @param sqlParse
+ * @param num 函数参数包含括号的个数
+ * @param dbType
+ * @return
+ */
+ public static String parseMarkDateHaveMark(String sqlParse, int num, String dbType) {
+ Pattern pMarkDateParse = Pattern.compile("(MAKEDATE)\\s*\\((.*?(.*?\\).*?){" + num + "})\\)", Pattern.CASE_INSENSITIVE);
+ StringBuffer sb = new StringBuffer();
+ Matcher mMarkDateParse = pMarkDateParse.matcher(sqlParse);
+ if (mMarkDateParse.find()) {
+ String replaceValue = markDateRealize(mMarkDateParse.group(1), mMarkDateParse.group(2), dbType);
+ mMarkDateParse.appendReplacement(sb, replaceValue);
+ }
+ mMarkDateParse.appendTail(sb);
+ return sb.toString();
+ }
+
+ /**
+ * 实现替换:MAKEDATE(year,dayofyear)
+ * @param func
+ * @param expr
+ * @param dbType
+ * @return
+ */
+ private static String markDateRealize(String func, String expr, String dbType) {
+ String replaceValue = null;
+ List<String> params = diviParam(expr, ",");
+ if ("MAKEDATE".equals(func.toUpperCase()) && params.size() == 2) {
+ Pattern pYearFunNum = Pattern.compile("(YEAR)\\s*\\(", Pattern.CASE_INSENSITIVE);
+ Matcher matcher = pYearFunNum.matcher(expr);
+ if (matcher.find() && "YEAR".equals(matcher.group(1).toUpperCase())) {
+ String temp = expr.substring(matcher.start());
+ int[] bracketsMatch = StringUtil.getBracketsMatch(temp, "(", false);
+ if (bracketsMatch[0] >= 1) {
+ --bracketsMatch[0];
+ }
+ Pattern pYear = Pattern.compile("^(.*?)(YEAR)\\s*\\((.*?(.*?\\).*?){" + bracketsMatch[0] + "})\\)(.*?)$", Pattern.CASE_INSENSITIVE);
+ Matcher mYear = pYear.matcher(params.get(0));
+ if (mYear.find()) {
+ String group1 = mYear.group(1);
+ String group5 = mYear.group(5);
+ if (DBTypeEnum.CLICKHOUSE.getValue().equals(dbType)) {
+ if (group1 != null && !"".equals(group1.trim()) && group5 != null && !"".equals(group5.trim())) {
+ replaceValue = "toDateTime#[addDays#[addYears#[toStartOfYear#[toDateTime#[" + mYear.group(3) + "]]," + group1 + " " + group5 + "]," + params.get(1) + " -1 ]]";
+ } else if (group5 != null && !"".equals(group5.trim())) {
+ replaceValue = "toDateTime#[addDays#[addYears#[toStartOfYear#[toDateTime#[" + mYear.group(3) + "]]," + group5 + "]," + params.get(1) + " -1 ]]";
+ } else if (group1 != null && !"".equals(group1.trim())) {
+ replaceValue = "toDateTime#[addDays#[addYears#[toStartOfYear#[toDateTime#[" + mYear.group(3) + "]]," + group1 + "0 ]," + params.get(1) + " -1 ]]";
+ } else {
+ replaceValue = "toDateTime#[addDays#[toStartOfYear#[toDateTime#[" + mYear.group(3) + "]]," + params.get(1) + " -1 ]]";
+ }
+ } else if (DBTypeEnum.DRUID.getValue().equals(dbType)) {
+ if (group1 != null && !"".equals(group1.trim()) && group5 != null && !"".equals(group5.trim())) {
+ replaceValue = "TIME_SHIFT#[TIME_SHIFT#[FLOOR#[ TIMESTAMP " + mYear.group(3) + " to year],'P1Y'," + group1 + " " + group5 + " ],'P1D'," + params.get(1) + " -1 ]";
+ } else if (group5 != null && !"".equals(group5.trim())) {
+ replaceValue = "TIME_SHIFT#[TIME_SHIFT#[FLOOR#[ TIMESTAMP " + mYear.group(3) + " to year],'P1Y'," + group5 + "],'P1D'," + params.get(1) + " -1 ]";
+ } else if (group1 != null && !"".equals(group1.trim())) {
+ replaceValue = "TIME_SHIFT#[TIME_SHIFT#[FLOOR#[ TIMESTAMP " + mYear.group(3) + " to year],'P1Y'," + group1 + "0 ],'P1D'," + params.get(1) + " -1 ]";
+ } else {
+ replaceValue = "TIME_SHIFT#[FLOOR#[ TIMESTAMP " + mYear.group(3) + " to year],'P1D'," + params.get(1) + "-1 ]";
+ }
+ }
+ }
+ }
+ }
+ return replaceValue;
+ }
+
+ /**
+ * 替换 YEAR(expr)、QUARTER(expr)、MONTH(expr)、DAY(expr)、HOUR(expr)、MINUTE(expr)、SECOND(expr)、DAYOFYEAR(expr)、DAYOFMONTH(expr)、DAYOFWEEK(expr)
+ * @param sqlParse
+ * @param num
+ * @param dbType
+ * @return
+ */
+ private static String parseUnitFuncGetNumHaveMark(String sqlParse, int num, String dbType) {
+ Pattern pUnitFuncParse = Pattern.compile("(YEAR|QUARTER|MONTH|DAY|HOUR|MINUTE|SECOND|DAYOFYEAR|DAYOFMONTH|DAYOFWEEK)\\s*\\((.*?(.*?\\).*?){" + num + "})\\)", Pattern.CASE_INSENSITIVE);
+ StringBuffer sb = new StringBuffer();
+ Matcher mUnitFuncParse = pUnitFuncParse.matcher(sqlParse);
+ if (mUnitFuncParse.find()) {
+ String replaceValue = unitFuncGetNumRealize(mUnitFuncParse.group(1), mUnitFuncParse.group(2), dbType);
+ mUnitFuncParse.appendReplacement(sb, replaceValue);
+ }
+ mUnitFuncParse.appendTail(sb);
+ return sb.toString();
+ }
+
+ /**
+ * 实现替换:YEAR(expr)、QUARTER(expr)、MONTH(expr)、DAY(expr)、HOUR(expr)、MINUTE(expr)、SECOND(expr)、DAYOFYEAR(expr)、DAYOFMONTH(expr)、DAYOFWEEK(expr)
+ * @param unit
+ * @param expr
+ * @param dbType
+ * @return
+ */
+ private static String unitFuncGetNumRealize(String unit,String expr, String dbType) {
+ String replaceValue = null;
+ unit = unit.toUpperCase();
+ if(DBTypeEnum.CLICKHOUSE.getValue().equals(dbType)){
+ switch (unit){
+ case "YEAR": replaceValue = "toYear#[toDateTime#[" + expr + "]]"; break;
+ case "QUARTER": replaceValue = "toQuarter#[toDateTime#[" + expr + "]]"; break;
+ case "MONTH": replaceValue = "toMonth#[toDateTime#[" + expr + "]]"; break;
+ case "DAY": replaceValue = "toDayOfMonth#[toDateTime#[" + expr + "]]"; break;
+ case "HOUR": replaceValue = "toHour#[toDateTime#[" + expr + "]]"; break;
+ case "MINUTE": replaceValue = "toMinute#[toDateTime#[" + expr + "]]"; break;
+ case "SECOND": replaceValue = "toSecond#[toDateTime#[" + expr + "]]"; break;
+ case "DAYOFYEAR": replaceValue = "toDayOfYear#[toDateTime#["+ expr + "]]";break;
+ case "DAYOFMONTH": replaceValue = "toDayOfMonth#[toDateTime#["+ expr + "]]";break;
+ case "DAYOFWEEK": replaceValue = "toDayOfWeek#[addDays#[toDateTime#["+ expr + "],1]]";break;
+ default: replaceValue = unit+"#[" + expr + "]";
+ }
+ }else if(DBTypeEnum.DRUID.getValue().equals(dbType)){
+ switch (unit){
+ case "YEAR": replaceValue = "TIME_EXTRACT#[TIME_SHIFT#[ TIMESTAMP " + expr + ",'PT1M',0], 'YEAR' ]"; break;
+ case "QUARTER": replaceValue = "TIME_EXTRACT#[TIME_SHIFT#[ TIMESTAMP " + expr + ",'PT1M',0], 'QUARTER' ]"; break;
+ case "MONTH": replaceValue = "TIME_EXTRACT#[TIME_SHIFT#[ TIMESTAMP " + expr + ",'PT1M',0], 'MONTH' ]"; break;
+ case "DAY": replaceValue = "TIME_EXTRACT#[TIME_SHIFT#[ TIMESTAMP " + expr + ",'PT1M',0], 'DAY' ]"; break;
+ case "HOUR": replaceValue = "TIME_EXTRACT#[TIME_SHIFT#[ TIMESTAMP " + expr + ",'PT1M',0], 'HOUR' ]"; break;
+ case "MINUTE": replaceValue = "TIME_EXTRACT#[TIME_SHIFT#[ TIMESTAMP " + expr + ",'PT1M',0], 'MINUTE' ]"; break;
+ case "SECOND": replaceValue = "TIME_EXTRACT#[TIME_SHIFT#[ TIMESTAMP " + expr + ",'PT1M',0], 'SECOND' ]"; break;
+ case "DAYOFYEAR": replaceValue = "TIME_EXTRACT#[TIME_SHIFT#[ TIMESTAMP " + expr + ",'PT1M',0], 'DOY' ]"; break;
+ case "DAYOFMONTH": replaceValue = "TIME_EXTRACT#[TIME_SHIFT#[ TIMESTAMP " + expr + ",'PT1M',0], 'DAY' ]"; break;
+ case "DAYOFWEEK": replaceValue = "TIME_EXTRACT#[TIME_SHIFT#[ TIMESTAMP " + expr + ",'P1D',1], 'DOW' ]"; break;
+ default: replaceValue = unit+"#[" + expr + "]";
+ }
+ }
+ return replaceValue;
+ }
+
+ /**
+ * 实现相对时间相关替换:
+ * @param params
+ * @param sign ADD相关为true,SUB相关为false;
+ * @param dbType
+ * @return
+ */
+ private static String dateRelative(List<String> params, Boolean sign, String dbType) {
+ String replaceValue = "";
+ if (params.size() == 2 && params.get(1).toUpperCase().contains("INTERVAL")) {
+ String param1 = params.get(0);
+ String param2 = params.get(1);
+ Matcher matcher = pIntervalNumUnit.matcher(param2);
+ if (matcher.find()) {
+ param2 = matcher.group(1);
+ if(!sign){
+ param2 = "- #[" + param2 + "]";
+ }
+ if(DBTypeEnum.CLICKHOUSE.getValue().equals(dbType)){
+ switch (matcher.group(2).toUpperCase()){
+ case "SECOND": replaceValue = "toDateTime#[" + param1 + "]" + param2; break;
+ case "MINUTE": replaceValue = "toDateTime#[" + param1 + "]" + param2 + "*60"; break;
+ case "HOUR": replaceValue = "toDateTime#[" + param1 + "]" + param2 + "*3600"; break;
+ case "DAY": replaceValue = "toDateTime#[" + param1 + "]" + param2+ "*86400"; break;
+ case "WEEK": replaceValue = "toDateTime#[" + param1 + "]" + param2 + "*604800"; break;
+ case "MONTH": replaceValue = "addMonths#[toDateTime#[" + param1 + "] ," + param2 + "]"; break;
+ case "QUARTER": replaceValue = "addQuarters#[toDateTime#[" + param1 + "]," + param2 + "]"; break;
+ case "YEAR": replaceValue = "addYears#[toDateTime#[" + param1 + "]," + param2 + "]"; break;
+ default:replaceValue = param1 + param2; break;
+ }
+ }else if(DBTypeEnum.DRUID.getValue().equals(dbType)){
+ switch (matcher.group(2).toUpperCase()) {
+ case "SECOND": replaceValue = "TIME_SHIFT#[ TIMESTAMP " + param1 + ",'PT1S',"+ param2 +"]"; break;
+ case "MINUTE": replaceValue = "TIME_SHIFT#[ TIMESTAMP " + param1 + ",'PT1M',"+ param2 +"]"; break;
+ case "HOUR": replaceValue = "TIME_SHIFT#[ TIMESTAMP " + param1 + ",'PT1H',"+ param2 +"]"; break;
+ case "DAY": replaceValue = "TIME_SHIFT#[ TIMESTAMP " + param1 + ",'P1D',"+ param2 +"]"; break;
+ case "WEEK": replaceValue = "TIME_SHIFT#[ TIMESTAMP " + param1 + ",'P1W',"+ param2 +"]"; break;
+ case "MONTH": replaceValue = "TIME_SHIFT#[ TIMESTAMP " + param1 + ",'P1M',"+ param2 +"]"; break;
+ case "QUARTER": replaceValue = "TIME_SHIFT#[ TIMESTAMP " + param1 + ",'P3M',"+ param2 +"]"; break;
+ case "YEAR": replaceValue = "TIME_SHIFT#[ TIMESTAMP " + param1 + ",'P1Y',"+ param2 +"]"; break;
+ default:replaceValue = param1 + param2; break;
+ }
+ }
+ return replaceValue;
+ }
+ } else if (params.size() == 2) {
+ String param1 = params.get(0);
+ String param2 = params.get(1);
+ if(!sign){
+ param2 = "- #[" + param2 + "]";
+ }
+ if(DBTypeEnum.CLICKHOUSE.getValue().equals(dbType)){
+ replaceValue = "addDays#[toDateTime#[" + param1 + "]," + param2 + "]";
+ }else if(DBTypeEnum.DRUID.getValue().equals(dbType)){
+ replaceValue = "TIME_SHIFT#[ TIMESTAMP " + param1 + ",'P1D'," + param2 + "]";
+ }
+ }
+ return replaceValue;
+ }
+
+ /**
+ * 实现替换:FROM_UNIXTIME(unix_timestamp)、FROM_UNIXTIME(date,format)、DATE_FORMAT(date,format)、STR_TO_DATE(date,format)
+ * @param func
+ * @param param
+ * @param dbType
+ * @return
+ */
+ private static String dateFormatRealize(String func, String param, boolean bool, String dbType) {
+ List<String> params = diviParam(param, ",");
+ String param1 = params.get(0);
+ String replaceValue = null;
+ if ("FROM_UNIXTIME".equals(func.toUpperCase()) && params.size() == 1) {
+ if (DBTypeEnum.CLICKHOUSE.getValue().equals(dbType)) {
+ replaceValue = "toDateTime#[" + param + "]";
+ } else if (DBTypeEnum.DRUID.getValue().equals(dbType) && !bool) {
+ replaceValue = "MILLIS_TO_TIMESTAMP#[ 1000 * [" + param + "]]";
+ }else if (DBTypeEnum.DRUID.getValue().equals(dbType) && bool) {
+ replaceValue = "TIME_FORMAT#[MILLIS_TO_TIMESTAMP#[ 1000 * [" + param + "]],'YYYY-MM-dd HH:mm:ss']";
+ }
+ } else if (func.contains("FROM_UNIXTIME") && DBTypeEnum.DRUID.getValue().equals(dbType) && "%Y-%m-%d %H:%i:%s".equals(params.get(1).replaceAll("\\'|\\\"", "").trim())) {
+ if (!bool) {
+ replaceValue = "MILLIS_TO_TIMESTAMP#[ 1000*[" + param1 + "]]";
+ } else if (bool) {
+ replaceValue = "TIME_FORMAT#[MILLIS_TO_TIMESTAMP#[ 1000 * [" + param + "]],'YYYY-MM-dd HH:mm:ss']";
+ }
+ } else {
+ String param2 = params.get(1).replaceAll("\\'|\\\"", "").trim();
+ if (DBTypeEnum.CLICKHOUSE.getValue().equals(dbType) && bool) {
+ switch (param2) {
+ case "%Y-%m-%d %H:%i:%s": replaceValue = "toDateTime#[" + param1 + "]"; break;
+ case "%Y-%m-%d %H:%i:00": replaceValue = "toStartOfMinute#[toDateTime#[" + param1 + "]]"; break;
+ case "%Y-%m-%d %H:00:00": replaceValue = "toStartOfHour#[toDateTime#[" + param1 + "]]"; break;
+ case "%Y-%m-%d 00:00:00": replaceValue = "toStartOfDay#[toDateTime#[" + param1 + "]]"; break;
+ case "%Y-%m-01 00:00:00": replaceValue = "toDateTime#[toStartOfMonth#[toDateTime#[" + param1 + "]]]"; break;
+ case "%Y-01-01 00:00:00": replaceValue = "toDateTime#[toStartOfYear#[toDateTime#[" + param1 + "]]]"; break;
+ case "%Y-%m-%d": replaceValue = "formatDateTime#[toDateTime#["+ param1 +"], '%Y-%m-%d']"; break;
+ case "%Y-%m-01": replaceValue = "formatDateTime#[toDateTime#["+ param1 +"], '%Y-%m-01']"; break;
+ case "%Y-01-01": replaceValue = "formatDateTime#[toDateTime#["+ param1 +"], '%Y-01-01']"; break;
+ default: replaceValue = "toDateTime#[" + param1 + "]";
+ }
+ }else if (DBTypeEnum.CLICKHOUSE.getValue().equals(dbType) && !bool) {
+ switch (param2) {
+ case "%Y-%m-%d %H:%i:%s": replaceValue = "toDateTime#[" + param1 + "]"; break;
+ case "%Y-%m-%d %H:%i:00": replaceValue = "toStartOfMinute#[toDateTime#[" + param1 + "]]"; break;
+ case "%Y-%m-%d %H:00:00": replaceValue = "toStartOfHour#[toDateTime#[" + param1 + "]]"; break;
+ case "%Y-%m-%d 00:00:00": replaceValue = "toStartOfDay#[toDateTime#[" + param1 + "]]"; break;
+ case "%Y-%m-01 00:00:00": replaceValue = "toDateTime#[toStartOfMonth#[toDateTime#[" + param1 + "]]]"; break;
+ case "%Y-01-01 00:00:00": replaceValue = "toDateTime#[toStartOfYear#[toDateTime#[" + param1 + "]]]"; break;
+ case "%Y-%m-%d": replaceValue = "toStartOfDay#[toDateTime#[" + param1 + "]]"; break;
+ case "%Y-%m-01": replaceValue = "toStartOfMonth#[toDateTime#[" + param1 + "]]"; break;
+ case "%Y-01-01": replaceValue = "toStartOfYear#[toDateTime#[" + param1 + "]]"; break;
+ default: replaceValue = "toDateTime#[" + param1 + "]";
+ }
+ } else if (DBTypeEnum.DRUID.getValue().equals(dbType) && bool) {
+ switch (param2) {
+ case "%Y-%m-%d %H:%i:%s": replaceValue = "TIME_FORMAT#[ TIMESTAMP "+ param1 +",'YYYY-MM-dd HH:mm:ss']"; break;
+ case "%Y-%m-%d %H:%i:00": replaceValue = "TIME_FORMAT#[ TIMESTAMP "+ param1 +",'YYYY-MM-dd HH:mm:00']"; break;
+ case "%Y-%m-%d %H:00:00": replaceValue = "TIME_FORMAT#[ TIMESTAMP "+ param1 +",'YYYY-MM-dd HH:00:00']"; break;
+ case "%Y-%m-%d 00:00:00": replaceValue = "TIME_FORMAT#[ TIMESTAMP "+ param1 +",'YYYY-MM-dd 00:00:00']"; break;
+ case "%Y-%m-01 00:00:00": replaceValue = "TIME_FORMAT#[ TIMESTAMP "+ param1 +",'YYYY-MM-01 00:00:00']"; break;
+ case "%Y-01-01 00:00:00": replaceValue = "TIME_FORMAT#[ TIMESTAMP "+ param1 +",'YYYY-01-01 00:00:00']"; break;
+ case "%Y-%m-%d": replaceValue = "TIME_FORMAT#[ TIMESTAMP "+ param1 +",'YYYY-MM-dd']"; break;
+ case "%Y-%m-01": replaceValue = "TIME_FORMAT#[ TIMESTAMP "+ param1 +",'YYYY-MM-01']"; break;
+ case "%Y-01-01": replaceValue = "TIME_FORMAT#[ TIMESTAMP "+ param1 +",'YYYY-01-01']"; break;
+ default: replaceValue = "TIME_FORMAT#[ TIMESTAMP "+ param1 +",'YYYY-MM-dd HH:mm:ss']";
+ }
+ } else if (DBTypeEnum.DRUID.getValue().equals(dbType) && !bool) {
+ switch (param2) {
+ case "%Y-%m-%d %H:%i:%s": replaceValue = " TIMESTAMP " + param1; break;
+ case "%Y-%m-%d %H:%i:00": replaceValue = "FLOOR#[ TIMESTAMP " + param1 + " to minute]"; break;
+ case "%Y-%m-%d %H:00:00": replaceValue = "FLOOR#[ TIMESTAMP " + param1 + " to hour]"; break;
+ case "%Y-%m-%d 00:00:00": replaceValue = "FLOOR#[ TIMESTAMP " + param1 + " to day]"; break;
+ case "%Y-%m-01 00:00:00": replaceValue = "FLOOR#[ TIMESTAMP " + param1 + " to month]"; break;
+ case "%Y-01-01 00:00:00": replaceValue = "FLOOR#[ TIMESTAMP " + param1 + " to year]"; break;
+ case "%Y-%m-%d": replaceValue = "TIME_FLOOR#[ TIMESTAMP " + param1 + ",'P1D']"; break;
+ case "%Y-%m-01": replaceValue = "TIME_FLOOR#[ TIMESTAMP " + param1 + ",'P1M']"; break;
+ case "%Y-01-01": replaceValue = "TIME_FLOOR#[ TIMESTAMP " + param1 + ",'P1Y']"; break;
+ default : replaceValue = " TIMESTAMP " + param1; break;
+ }
+ }
+ }
+ return replaceValue;
+ }
+
+ /**
+ * 解析之后的sql处理
+ * @param sql
+ * @param dbType
+ * @return
+ */
+ public static String specialDisposeEnd(String sql, String dbType) {
+ if (sql == null) {
+ return sql;
+ }
+ StringBuffer sb = new StringBuffer();
+ if (DBTypeEnum.CLICKHOUSE.getValue().equals(dbType)) {
+ Matcher mStrDateTime = pStrDateTime.matcher(sql);
+ while (mStrDateTime.find()) {
+ String innerValue = mStrDateTime.group(1) + " toDateTime#[" + mStrDateTime.group(2) + "]";
+ mStrDateTime.appendReplacement(sb, innerValue);
+ }
+ mStrDateTime.appendTail(sb);
+ sql = sb.toString();
+ } else if (DBTypeEnum.DRUID.getValue().equals(dbType)) {
+
+ Matcher mNowFun = pNow.matcher(sql);
+ while (mNowFun.find()) {
+ String innerValue = "CURRENT_TIMESTAMP";
+ mNowFun.appendReplacement(sb, innerValue);
+ }
+ mNowFun.appendTail(sb);
+ sql = sb.toString();
+
+ Matcher mStrDateTime = pStrDateTime.matcher(sql);
+ sb = new StringBuffer();
+ while (mStrDateTime.find()) {
+ String innerValue = mStrDateTime.group(1) + " TIMESTAMP " + mStrDateTime.group(2);
+ mStrDateTime.appendReplacement(sb, innerValue);
+ }
+ mStrDateTime.appendTail(sb);
+ sql = sb.toString();
+
+
+ Matcher matcher = pCKTimeStamp.matcher(sql);
+ sb = new StringBuffer();
+ while (matcher.find()) {
+ if (matcher.groupCount() == 1) {
+ matcher.appendReplacement(sb, " " + matcher.group(1));
+ }
+ }
+ matcher.appendTail(sb);
+ sql = sb.toString();
+ }
+ return sql;
+ }
+ /**
+ * 替换解析时候加入的[、]、#
+ * @param sql
+ * @return
+ */
+ public static String replaceMark(String sql) {
+ sql = sql.replace("[", "(");
+ sql = sql.replace("]", ")");
+ sql = sql.replace("#", "");
+ return sql;
+ }
+ /**
+ * 替换:FROM_UNIXTIME(unix_timestamp)、FROM_UNIXTIME(date,format)、DATE_FORMAT(date,format)、STR_TO_DATE(date,format)
+ * @param sqlParse
+ * @param num 包含括号的个数
+ * @param bool 应用的分组中为true 条件范围中则为false
+ * @param dbType
+ * @return
+ */
+ public static String dateFormatHavaMark(String sqlParse, int num, boolean bool, String dbType) {
+ Pattern pDateFormatMark = Pattern.compile("(FROM_UNIXTIME|DATE_FORMAT|STR_TO_DATE)\\s*\\((.*?(.*?\\).*?){" + num + "})\\)", Pattern.CASE_INSENSITIVE);
+ Matcher mDateFormatMark = pDateFormatMark.matcher(sqlParse);
+ StringBuffer sb = new StringBuffer();
+ if (mDateFormatMark.find()) {
+ String group2 = mDateFormatMark.group(2);
+ String replaceValue = group2;
+ if (StringUtil.getBracketsMatch(group2, "(", false)[0] >= 0) {
+ replaceValue = dateFormatRealize(mDateFormatMark.group(1),group2, bool , dbType);
+ }
+ mDateFormatMark.appendReplacement(sb, replaceValue);
+ }
+ mDateFormatMark.appendTail(sb);
+ return sb.toString();
+ }
+
+ /**
+ * 替换SUBDATE(expr,days)、ADDDATE(expr,days)、SUBDATE(date,INTERVAL expr unit)、ADDDATE(date,INTERVAL expr unit)、DATE_SUB(date,INTERVAL expr unit)、DATE_ADD(date,INTERVAL expr unit)
+ * @param sqlParse
+ * @param num 包含的括号个数
+ * @param dbType
+ * @return
+ */
+ public static String dateRelativeHaveMark(String sqlParse,int num,String dbType){
+ Pattern pDateRelativeParse = Pattern.compile("(ADDDATE|DATE_ADD|SUBDATE|DATE_SUB)\\s*\\((.*?(.*?\\).*?){" + num + "})\\)", Pattern.CASE_INSENSITIVE);
+ Matcher matcherParse = pDateRelativeParse.matcher(sqlParse);
+ String innerValue = "";
+ StringBuffer sb = new StringBuffer();
+ if (matcherParse.find()) {
+ Boolean sign;
+ if(matcherParse.group(1).toUpperCase().contains("ADD")){
+ sign = true;
+ }else {
+ sign = false;
+ }
+ if (StringUtil.getBracketsMatch(matcherParse.group(2), null,false)[0] >= 0) {
+ List<String> params = diviParam(matcherParse.group(2), ",");
+ innerValue = dateRelative(params, sign, dbType);
+ }
+ matcherParse.appendReplacement(sb, innerValue);
+ }
+ matcherParse.appendTail(sb);
+ return sb.toString();
+ }
+
+ /**
+ * 实现替换:UNIX_TIMESTAMP()、UNIX_TIMESTAMP(expr)
+ * @param sqlParse
+ * @param num 包含的括号个数
+ * @param dbType
+ * @return
+ */
+ public static String parseUnixTimeHavaMark(String sqlParse, int num, String dbType) {
+ Pattern pUnixTimeParse = Pattern.compile("(UNIX_TIMESTAMP)\\s*\\((.*?(.*?\\).*?){" + num + "})\\)", Pattern.CASE_INSENSITIVE);
+ Matcher matcherParse = pUnixTimeParse.matcher(sqlParse);
+ StringBuffer sb = new StringBuffer();
+ String innerValue = null;
+ if (matcherParse.find()) {
+ if (DBTypeEnum.CLICKHOUSE.getValue().equals(dbType)) {
+ if (matcherParse.group(2) == null || "".equals(matcherParse.group(2).trim())) {
+ innerValue = "toUnixTimestamp#[now#[]]";
+ } else {
+ innerValue = "toUnixTimestamp#[ " + matcherParse.group(2) + "]";
+ }
+ } else if (DBTypeEnum.DRUID.getValue().equals(dbType)) {
+ if (matcherParse.group(2) == null || "".equals(matcherParse.group(2).trim())) {
+ innerValue = "0.001 * TIMESTAMP_TO_MILLIS[CURRENT_TIMESTAMP]";
+ } else {
+ innerValue = "0.001 * TIMESTAMP_TO_MILLIS#[ TIMESTAMP " + matcherParse.group(2) + "]";
+ }
+ }
+ matcherParse.appendReplacement(sb, innerValue);
+ }
+ matcherParse.appendTail(sb);
+ return sb.toString();
+ }
+
+ /**
+ * 实现替换:DATE(expr)
+ * @param sqlParse
+ * @param num 包含的括号个数
+ * @param dbType
+ * @return
+ */
+ public static String parseDateHavaMark(String sqlParse, int num, String dbType) {
+ Pattern pDateParse = Pattern.compile("(DATE)\\s*\\((.*?(.*?\\).*?){" + num + "})\\)", Pattern.CASE_INSENSITIVE);
+ Matcher matcherParse = pDateParse.matcher(sqlParse);
+ StringBuffer sb = new StringBuffer();
+ String innerValue = null;
+ if (matcherParse.find()) {
+ if (DBTypeEnum.CLICKHOUSE.getValue().equals(dbType)) {
+ innerValue = "toStartOfDay#[toDateTime#[" + matcherParse.group(2) + "]]";
+ } else if (DBTypeEnum.DRUID.getValue().equals(dbType)) {
+ innerValue = "TIME_FLOOR#[ TIMESTAMP " + matcherParse.group(2) + ",'P1D']";
+ }
+ matcherParse.appendReplacement(sb, innerValue);
+ }
+ matcherParse.appendTail(sb);
+ return sb.toString();
+ }
+
+ /**
+ * 获取函数参数
+ * @param str
+ * @param div 参数分隔符
+ * @return
+ */
+ public static List<String> diviParam(String str, String div) {
+ if (str == null) {
+ return null;
+ }
+ List<String> result = new ArrayList<>();
+ String[] split = str.split(div);
+ String resultTemp = "";
+ for (int i = 0; i < split.length; i++) {
+ resultTemp += split[i];
+ if (StringUtil.getBracketsMatch(resultTemp, "(", true)[0] >= 0
+ && StringUtil.getBracketsMatch(resultTemp, "[", true)[0] >= 0) {
+ result.add(resultTemp);
+ resultTemp = "";
+ continue;
+ }
+ resultTemp += div;
+ }
+ return result;
+ }
+}
diff --git a/src/main/java/com/mesa/reportservice/util/StringUtil.java b/src/main/java/com/mesa/reportservice/util/StringUtil.java
index f894e3c..74fd4f6 100644
--- a/src/main/java/com/mesa/reportservice/util/StringUtil.java
+++ b/src/main/java/com/mesa/reportservice/util/StringUtil.java
@@ -804,4 +804,53 @@ public final class StringUtil extends StringUtils {
return value;
}
+
+
+ /**
+ * 获取第一个括号对应右括号的索引以及包含括号的个数,向后匹配
+ * @param str
+ * @param bracketL 支持:( OR [ OR {
+ * @param bool 是否将传入的字符串全部匹配完成,若不需要则在匹配到第一个括号匹配完成时结束
+ * @return int[0]:匹配括号的个数,int[1]:第一个括号对应右括号的索引;若无匹配括号返回{0,-1};若有其中一个不成对则返回{-1,index}
+ */
+ public static int[] getBracketsMatch(String str, String bracketL, boolean bool) {
+ int[] result = {0, -1};
+ if (str == null) {
+ return result;
+ }
+ String bracketR = ")";
+ if (bracketL == null) {
+ bracketL = "(";
+ } else if (bracketL.equals("[")) {
+ bracketR = "]";
+ } else if (bracketL.equals("{")) {
+ bracketR = "}";
+ }
+ StringBuffer sb = new StringBuffer(str);
+ int countL = 0, countR = 0;
+ for (int a = 0; a < sb.length(); a++) {
+ if (sb.indexOf(bracketR) == a) {
+ ++countR;
+ sb.replace(a, a + 1, "$");
+ } else if (sb.indexOf(bracketL) == a) {
+ ++countL;
+ sb.replace(a, a + 1, "$");
+ }
+ if (countR > countL) {
+ result[0] = -1;
+ result[1] = -1;
+ return result;
+ }
+ result[0] = countL;
+ result[1] = a;
+ if (countL != 0 && (countL == countR) && !bool) {
+ return result;
+ }
+ }
+ if (countR != countL) {
+ result[0] = -1;
+ }
+ return result;
+ }
+
}
diff --git a/src/main/java/com/mesa/reportservice/util/ZkConfig.java b/src/main/java/com/mesa/reportservice/util/ZkConfig.java
new file mode 100644
index 0000000..24b0a8f
--- /dev/null
+++ b/src/main/java/com/mesa/reportservice/util/ZkConfig.java
@@ -0,0 +1,53 @@
+package com.mesa.reportservice.util;
+
+import org.apache.curator.RetryPolicy;
+import org.apache.curator.framework.CuratorFramework;
+import org.apache.curator.framework.CuratorFrameworkFactory;
+import org.apache.curator.framework.imps.CuratorFrameworkImpl;
+import org.apache.curator.retry.ExponentialBackoffRetry;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
+import org.springframework.boot.context.properties.EnableConfigurationProperties;
+import org.springframework.context.annotation.Bean;
+import org.springframework.context.annotation.Configuration;
+
+/**
+ * Created by wk1 on 2020/1/6.
+ */
+@Configuration
+@EnableConfigurationProperties(ZkProperties.class)
+
+public class ZkConfig {
+
+ private final ZkProperties zkproperties;
+
+ @Autowired
+ public ZkConfig(ZkProperties zkproperties) {
+ this.zkproperties = zkproperties;
+ }
+
+ @Bean(name="curatorConnect")
+ public CuratorFramework CuratorConnect() {
+
+ CuratorFramework client = null;
+
+
+ RetryPolicy retryPolicy = new ExponentialBackoffRetry(zkproperties.getElapsedTimeMs(), zkproperties.getRetryCount());
+
+ // 实例化Curator客户端
+ client = CuratorFrameworkFactory.builder() // 使用工厂类来建造客户端的实例对象
+ .connectString(zkproperties.getConnectString()) // 放入zookeeper服务器ip
+ .sessionTimeoutMs(zkproperties.getSessionTimeoutMs()).connectionTimeoutMs(zkproperties.getConnectionTimeoutMs()).retryPolicy(retryPolicy) // 设定会话时间以及重连策略
+ .namespace(zkproperties.getNameSpace()).build(); // 设置命名空间以及开始建立连接
+ if (zkproperties.getOpen()==0) {
+ client.start();
+ }
+ else{
+ Logs.info("repoertservice start local" );
+ }
+
+ return client;
+ }
+
+
+}
diff --git a/src/main/java/com/mesa/reportservice/util/ZkProperties.java b/src/main/java/com/mesa/reportservice/util/ZkProperties.java
new file mode 100644
index 0000000..f6bcccb
--- /dev/null
+++ b/src/main/java/com/mesa/reportservice/util/ZkProperties.java
@@ -0,0 +1,82 @@
+package com.mesa.reportservice.util;
+
+import org.springframework.boot.context.properties.ConfigurationProperties;
+import org.springframework.stereotype.Component;
+
+/**
+ * Created by wk1 on 2020/1/6.
+ */
+@ConfigurationProperties(prefix = "zookeeper")
+public class ZkProperties{
+
+ private int open;
+
+ private int retryCount;
+
+ private int elapsedTimeMs;
+
+ private String connectString;
+
+ private int sessionTimeoutMs;
+
+ private int connectionTimeoutMs;
+
+ private String nameSpace;
+
+
+ public int getOpen() {
+ return open;
+ }
+
+ public void setOpen(int open) {
+ this.open = open;
+ }
+
+ public int getRetryCount() {
+ return retryCount;
+ }
+
+ public int getElapsedTimeMs() {
+ return elapsedTimeMs;
+ }
+
+ public String getConnectString() {
+ return connectString;
+ }
+
+ public int getSessionTimeoutMs() {
+ return sessionTimeoutMs;
+ }
+
+ public int getConnectionTimeoutMs() {
+ return connectionTimeoutMs;
+ }
+
+ public String getNameSpace() {
+ return nameSpace;
+ }
+
+ public void setRetryCount(int retryCount) {
+ this.retryCount = retryCount;
+ }
+
+ public void setElapsedTimeMs(int elapsedTimeMs) {
+ this.elapsedTimeMs = elapsedTimeMs;
+ }
+
+ public void setConnectString(String connectString) {
+ this.connectString = connectString;
+ }
+
+ public void setSessionTimeoutMs(int sessionTimeoutMs) {
+ this.sessionTimeoutMs = sessionTimeoutMs;
+ }
+
+ public void setConnectionTimeoutMs(int connectionTimeoutMs) {
+ this.connectionTimeoutMs = connectionTimeoutMs;
+ }
+
+ public void setNameSpace(String nameSpace) {
+ this.nameSpace = nameSpace;
+ }
+}
diff --git a/src/main/resources/config/application.properties b/src/main/resources/config/application.properties
index 191a01e..86411ff 100644
--- a/src/main/resources/config/application.properties
+++ b/src/main/resources/config/application.properties
@@ -1,7 +1,7 @@
server.port=9093
scan.mysql.scheduled.plan=0/10 * * * * ?
scan.job.scheduled.plan=0/15 * * * * ?
-
+#management.endpoints.web.exposure.include=*
#poll query status every 10s, up to 350 iterations (~3500s total timeout)
#globle.query_log_sleep=10000
#globle.loop_number=350
@@ -10,7 +10,7 @@ scan.job.scheduled.plan=0/15 * * * * ?
globle.job_thread=2
#kill jobs that have not finished within 7200s (ck timeout)
#globle.ck_timeout=7200000
-hbase.url=192.168.40.120:4444
+hbase.url=192.168.40.224:8084
hbase.table=tsg:report_result
hbase.columefamily=response:result
hbase.colume_job_id=detail:result_id
@@ -20,14 +20,14 @@ hbase.colume_read_rows=detail:read_rows
hbase.colume_memory_usage=detail:memory_usage
hbase.colume_sql=detail:sql
hbase.colume_exception=detail:exception
-ck.task_ip=192.168.40.182:8123
-ck.task_database=tsg_galaxy
+ck.task_ip=192.168.40.224:8123
+ck.task_database=tsg_galaxy_v3
#ck.task_database=k18_galaxy_service
-ck.task_user=ck
-ck.task_user_password=111111
-ck.log_ip=192.168.40.182:8123
+ck.task_user=default
+ck.task_user_password=ceiec2019
+ck.log_ip=192.168.40.224:8123
ck.log_user=default
-ck.log_user_password=111111
+ck.log_user_password=ceiec2019
#���������
http.maxTotal=300
#������
@@ -37,13 +37,14 @@ http.connectTimeout=10000
#�����ӳ��л�ȡ�����ӵ��ʱ��
http.connectionRequestTimeout=10000
#���ݴ�����ʱ��
-http.socketTimeout=3600000
+http.socketTimeout=21600000
#�ύ����ǰ���������Ƿ����
http.staleConnectionCheckEnabled=true
-db.url=jdbc\:mariadb\://192.168.40.120\:3306/tsg-bifang
+#db.url=jdbc\:mariadb\://192.168.40.120\:3306/tsg-bifang
#db.url=jdbc\:mariadb\://192.168.11.210\:3306/tsg
#db.url=jdbc\:mariadb\://47.254.24.224\:3306/test
-#db.url=jdbc\:mariadb\://10.4.35.1\:3306/tsg-bifang
+db.url=jdbc\:mariadb\://192.168.40.131\:3306/tsg-bifang
+#db.url=jdbc\:mariadb\://192.168.40.204\:3306/reporttest1
#����
#drivers=ru.yandex.clickhouse.ClickHouseDriver
@@ -51,7 +52,8 @@ db.drivers=org.mariadb.jdbc.Driver
#�û���
db.user=root
#����
-db.password=bifang!@#
+#db.password=bifang!@#
+db.password=111111
#��ʼ����С
db.initialsize=20
#��������
@@ -61,4 +63,11 @@ db.maxactive=300
db.filters=stat,wall,log4j,config
-
+#zookeeper switch: 0 = enabled (cluster mode, client started), 1 = disabled (standalone/local)
+zookeeper.open=1
+zookeeper.retryCount=5
+zookeeper.elapsedTimeMs=1000
+zookeeper.connectString=127.0.0.1:2181
+zookeeper.sessionTimeoutMs=5000
+zookeeper.connectionTimeoutMs=5000
+zookeeper.nameSpace=reportservice \ No newline at end of file
diff --git a/src/main/resources/docker/Dockerfile b/src/main/resources/docker/Dockerfile
new file mode 100644
index 0000000..dd44d1f
--- /dev/null
+++ b/src/main/resources/docker/Dockerfile
@@ -0,0 +1,14 @@
# Base JDK image is injected at build time: docker build --build-arg JDK_IMAGE=...
ARG JDK_IMAGE
FROM ${JDK_IMAGE}
MAINTAINER Galaxy
VOLUME /tmp
WORKDIR /home/tsg/galaxy/galaxy-report-service
# Ship the externalized Spring config directory next to the jar.
COPY config config
# Jar path is injected at build time: --build-arg JAR_FILE=target/....jar
ARG JAR_FILE
COPY ${JAR_FILE} galaxy-report-service.jar
#dockercompose set JAVA_OPTS
ENV JAVA_OPTS=" -Xms1024m -Xmx2048m "
ENV LANG=en_US.UTF-8 LANGUAGE=en_US:en LC_ALL=en_US.UTF-8
#ENV TZ=Asia/Almaty
#RUN ln -snf /usr/share/zoneinfo/$TZ /etc/localtime && echo $TZ > /etc/timezone
# sh -c wrapper lets $JAVA_OPTS expand; egd=/dev/./urandom avoids blocking on entropy at JVM start.
ENTRYPOINT [ "sh", "-c", "java $JAVA_OPTS -Djava.security.egd=file:/dev/./urandom -jar galaxy-report-service.jar" ]