diff options
| author | LAPTOP-CUUVN8AS\wk <[email protected]> | 2020-12-04 10:12:04 +0800 |
|---|---|---|
| committer | LAPTOP-CUUVN8AS\wk <[email protected]> | 2020-12-04 10:12:04 +0800 |
| commit | 748e36cffb446568b0089b455f2eceda3c883a52 (patch) | |
| tree | e07b604d83b4e7aeca7f580cbc2484e156b37dfb | |
| parent | 11425f1b9eea2bf47120c31fa1accdcb68335a42 (diff) | |
更改zk中uuid生成策略
16 files changed, 64 insertions, 307 deletions
diff --git a/.gitignore b/.gitignore deleted file mode 100644 index f17ae77..0000000 --- a/.gitignore +++ /dev/null @@ -1,34 +0,0 @@ -HELP.md -/target/ -!.mvn/wrapper/maven-wrapper.jar - -### STS ### -.apt_generated -.classpath -.factorypath -.project -.settings -.springBeans -.sts4-cache - -### IntelliJ IDEA ### -.idea -*.iws -*.iml -*.ipr - -### NetBeans ### -/nbproject/private/ -/nbbuild/ -/dist/ -/nbdist/ -/.nb-gradle/ -/build/ - -### VS Code ### -.vscode/ -logs/galaxy-report-service.log -logs/galaxy-report-service.log.2019-07-30 -logs/galaxy-report-service.log.2019-07-31 -logs/galaxy-report-service.log.2019-08-28 -logs/galaxy-report-service.log.2019-08-21 @@ -10,7 +10,7 @@ </parent> <groupId>com.mesa</groupId> <artifactId>galaxy-report-service</artifactId> - <version>20.10.20</version> + <version>20.12.03</version> <name>galaxy-report-service</name> <description>Demo project for Spring Boot</description> @@ -219,7 +219,7 @@ <forceTags>true</forceTags> <imageTags> - <imageTag>20.10.20</imageTag> + <imageTag>20.12.03</imageTag> </imageTags> <!--远程docker构建,供dockerfile使用--> <dockerHost>http://192.168.40.153:2375</dockerHost> diff --git a/src/main/java/com/mesa/reportservice/bean/MonitorEntity.java b/src/main/java/com/mesa/reportservice/bean/MonitorEntity.java index 54c7d84..2824ecf 100644 --- a/src/main/java/com/mesa/reportservice/bean/MonitorEntity.java +++ b/src/main/java/com/mesa/reportservice/bean/MonitorEntity.java @@ -14,6 +14,7 @@ public class MonitorEntity { private Map<String,JobEntity> joblist ; + private String status; public MonitorEntity() { @@ -59,4 +60,13 @@ public class MonitorEntity { public void setJoblist(Map<String, JobEntity> joblist) { this.joblist = joblist; } + + + public String getStatus() { + return status; + } + + public void setStatus(String status) { + this.status = status; + } } diff --git a/src/main/java/com/mesa/reportservice/configuration/GlobelConfig.java b/src/main/java/com/mesa/reportservice/configuration/GlobelConfig.java index 643d198..f860788 100644 --- a/src/main/java/com/mesa/reportservice/configuration/GlobelConfig.java +++ b/src/main/java/com/mesa/reportservice/configuration/GlobelConfig.java @@ -5,6 +5,7 @@ import org.springframework.boot.context.properties.ConfigurationProperties; import org.springframework.stereotype.Component; import java.util.Map; +import java.util.UUID; import java.util.concurrent.ConcurrentHashMap; /** @@ -20,4 +21,5 @@ public class GlobelConfig { public void setJob_thread(int job_thread) { GlobelConfig.job_thread = job_thread; } + public final static String zkuuid = UUID.randomUUID().toString().replaceAll("-",""); } diff --git a/src/main/java/com/mesa/reportservice/controller/MonitorController.java b/src/main/java/com/mesa/reportservice/controller/MonitorController.java index 094e867..91ccabd 100644 --- a/src/main/java/com/mesa/reportservice/controller/MonitorController.java +++ b/src/main/java/com/mesa/reportservice/controller/MonitorController.java @@ -4,6 +4,7 @@ import com.alibaba.fastjson.JSONArray; import com.mesa.reportservice.bean.MonitorEntity; import com.mesa.reportservice.configuration.GlobelConfig; import com.mesa.reportservice.service.MysqlService; +import com.mesa.reportservice.service.ZkService; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component; import org.springframework.web.bind.annotation.*; @@ -21,6 +22,8 @@ public class MonitorController { private final org.apache.log4j.Logger logger = org.apache.log4j.Logger.getLogger(this.getClass()); @Autowired private MysqlService mysqlService; + @Autowired + private ZkService zs; @GetMapping(value = "/monitor") @@ -36,6 +39,12 @@ public class MonitorController { me.setTodaySuccessJobNum(numMap.get("todaySuccessNum")); me.setTodayErrorJobNum(numMap.get("todayErrorNum")); me.setJoblist(GlobelConfig.mapresult); + if(zs.isMaster()){ + me.setStatus("active"); + } + else{ + me.setStatus("standby"); + } GlobelConfig.mapresult.size(); Object obj = JSONArray.toJSON(me); json = obj.toString(); diff --git a/src/main/java/com/mesa/reportservice/controller/ScheduledResultController.java b/src/main/java/com/mesa/reportservice/controller/ScheduledResultController.java index c193e15..b5ca8b9 100644 --- a/src/main/java/com/mesa/reportservice/controller/ScheduledResultController.java +++ b/src/main/java/com/mesa/reportservice/controller/ScheduledResultController.java @@ -48,8 +48,8 @@ public class ScheduledResultController { for (JobEntity jobEntity : joblist) { String sql = jobEntity.getQuerySql().trim(); sql = sql.replace("$exe_time", "toDateTime('" + jobEntity.getIssuedTime().trim() + "')"); - //sql = sql.replace("$start_time", "toDateTime('" + jobEntity.getStartTime().trim() + "')"); - //sql = sql.replace("$end_time", "toDateTime('" + jobEntity.getEndTime().trim() + "')"); + sql = sql.replace("$start_time", "toDateTime('" + jobEntity.getStartTime().trim() + "')"); + sql = sql.replace("$end_time", "toDateTime('" + jobEntity.getEndTime().trim() + "')"); String queryid = DigestUtils.md5Hex(jobEntity.getResultId() + sql); jobEntity.setQuery_id(queryid); @@ -57,7 +57,15 @@ public class ScheduledResultController { eps.killQuery(jobEntity); GlobelConfig.mapresult.get(jobEntity.getQuery_id()).setIsValid(0); } else if (!GlobelConfig.mapresult.containsKey(jobEntity.getQuery_id())) { - eps.killQuery(jobEntity); + eps.reSet(jobEntity); + } + if (GlobelConfig.mapresult.containsKey(jobEntity.getQuery_id())) { + if (jobEntity.getIsValid() == 0) { + eps.killQuery(jobEntity); + GlobelConfig.mapresult.get(jobEntity.getQuery_id()).setIsValid(0); + } + } else { + eps.reSet(jobEntity); } } @@ -100,6 +108,15 @@ public class ScheduledResultController { logger.info("无待执行任务"); } } + else { + if (GlobelConfig.mapresult.size() > 0) { + for (Map.Entry<String, JobEntity> entry : GlobelConfig.mapresult.entrySet()) { + logger.info("key = " + entry.getKey() + ", value = " + entry.getValue().getStatus()); + eps.killQuery(entry.getValue()); + } + GlobelConfig.mapresult.clear(); + } + } } catch (Exception e) { logger.error(e.toString()); } diff --git a/src/main/java/com/mesa/reportservice/service/ExcuteProcessService.java b/src/main/java/com/mesa/reportservice/service/ExcuteProcessService.java index d35f6d4..94e31db 100644 --- a/src/main/java/com/mesa/reportservice/service/ExcuteProcessService.java +++ b/src/main/java/com/mesa/reportservice/service/ExcuteProcessService.java @@ -10,6 +10,8 @@ import java.util.List; public interface ExcuteProcessService { void updateResultMessage(JobEntity job); - void killQuery(JobEntity jobEntity); + void reSet(JobEntity jobEntity); void updateProcessMessage(JobEntity job); + void killQuery(JobEntity jobEntity); + } diff --git a/src/main/java/com/mesa/reportservice/service/impl/ClickhouseServiceImpl.java b/src/main/java/com/mesa/reportservice/service/impl/ClickhouseServiceImpl.java index 54cab5c..0f30f34 100644 --- a/src/main/java/com/mesa/reportservice/service/impl/ClickhouseServiceImpl.java +++ b/src/main/java/com/mesa/reportservice/service/impl/ClickhouseServiceImpl.java @@ -20,9 +20,6 @@ import org.springframework.stereotype.Service; import javax.annotation.Resource; import java.util.Map; -import static com.mesa.reportservice.util.GZIPUtils.compress; -import static com.mesa.reportservice.util.GZIPUtils.uncompress; - /** * Created by wk1 on 2019/5/15. */ diff --git a/src/main/java/com/mesa/reportservice/service/impl/ExcuteProcessServiceImpl.java b/src/main/java/com/mesa/reportservice/service/impl/ExcuteProcessServiceImpl.java index c79190a..4ddcd93 100644 --- a/src/main/java/com/mesa/reportservice/service/impl/ExcuteProcessServiceImpl.java +++ b/src/main/java/com/mesa/reportservice/service/impl/ExcuteProcessServiceImpl.java @@ -110,7 +110,7 @@ public class ExcuteProcessServiceImpl implements ExcuteProcessService { } @Override - public void killQuery(JobEntity jobEntity) { + public void reSet(JobEntity jobEntity) { String killurl = ClickhouseConfig.getKillUrl(jobEntity.getQuery_id()); try { cs.QuerySystemForDelete(killurl); @@ -124,7 +124,16 @@ public class ExcuteProcessServiceImpl implements ExcuteProcessService { } } + @Override + public void killQuery(JobEntity jobEntity) { + String killurl = ClickhouseConfig.getKillUrl(jobEntity.getQuery_id()); + try { + cs.QuerySystemForDelete(killurl); + } catch (Exception e) { + logger.error(e.toString()); + } + } /** * 获取进度条信息 */ diff --git a/src/main/java/com/mesa/reportservice/service/impl/ExcuteserviceImpl.java b/src/main/java/com/mesa/reportservice/service/impl/ExcuteserviceImpl.java index 7109c93..7a7d4f9 100644 --- a/src/main/java/com/mesa/reportservice/service/impl/ExcuteserviceImpl.java +++ b/src/main/java/com/mesa/reportservice/service/impl/ExcuteserviceImpl.java @@ -41,8 +41,8 @@ public class ExcuteserviceImpl implements ExcuteService { String sql = job.getQuerySql().trim(); sql = sql.replace("$exe_time", "toDateTime('" + job.getIssuedTime().trim() + "')"); - //sql = sql.replace("$start_time", "toDateTime('" + job.getStartTime().trim() + "')"); - //sql = sql.replace("$end_time", "toDateTime('" + job.getEndTime().trim() + "')"); + sql = sql.replace("$start_time", "toDateTime('" + job.getStartTime().trim() + "')"); + sql = sql.replace("$end_time", "toDateTime('" + job.getEndTime().trim() + "')"); job.setQuerySql(sql); String queryid = DigestUtils.md5Hex(job.getResultId() + sql); job.setQuery_id(queryid); diff --git a/src/main/java/com/mesa/reportservice/service/impl/HbaseServiceImpl.java b/src/main/java/com/mesa/reportservice/service/impl/HbaseServiceImpl.java index 4509736..88369d6 100644 --- a/src/main/java/com/mesa/reportservice/service/impl/HbaseServiceImpl.java +++ b/src/main/java/com/mesa/reportservice/service/impl/HbaseServiceImpl.java @@ -5,23 +5,12 @@ import com.mesa.reportservice.bean.JobEntity; import com.mesa.reportservice.configuration.HbaseProperties; import com.mesa.reportservice.service.HbaseService; import com.mesa.reportservice.util.Logs; -import com.mesa.reportservice.util.ziputil; import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.client.*; import org.apache.hadoop.hbase.util.Bytes; -import org.apache.http.client.config.RequestConfig; -import org.apache.http.client.methods.CloseableHttpResponse; -import org.apache.http.client.methods.HttpGet; -import org.apache.http.client.methods.HttpPost; -import org.apache.http.client.utils.URIBuilder; -import org.apache.http.entity.StringEntity; -import org.apache.http.impl.client.CloseableHttpClient; -import org.apache.http.util.EntityUtils; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Service; -import java.io.IOException; -import java.util.Map; /** * Created by wk1 on 2019/5/15. diff --git a/src/main/java/com/mesa/reportservice/service/impl/ZkServiceImpl.java b/src/main/java/com/mesa/reportservice/service/impl/ZkServiceImpl.java index 23e970f..dc40104 100644 --- a/src/main/java/com/mesa/reportservice/service/impl/ZkServiceImpl.java +++ b/src/main/java/com/mesa/reportservice/service/impl/ZkServiceImpl.java @@ -1,5 +1,6 @@ package com.mesa.reportservice.service.impl; +import com.mesa.reportservice.configuration.GlobelConfig; import com.mesa.reportservice.configuration.ZkProperties; import com.mesa.reportservice.service.ZkService; import com.mesa.reportservice.util.Logs; @@ -31,18 +32,14 @@ public class ZkServiceImpl implements ZkService { boolean isZkCuratorStarted = curatorConnect.isStarted(); String nodePath = "/masterip"; System.out.println("当前客户端的状态:" + (isZkCuratorStarted ? "连接中..." : "已关闭...")); - ExistsBuilder sss = curatorConnect.checkExists(); + ExistsBuilder eb = curatorConnect.checkExists(); Stat statExist = curatorConnect.checkExists().forPath(nodePath); - InetAddress address = InetAddress.getLocalHost(); - String localip = address.getHostAddress().trim(); if (statExist == null) { - System.out.println(address.getHostAddress()); - byte[] data = address.getHostAddress().getBytes(); // 节点数据 + byte[] data = GlobelConfig.zkuuid.getBytes(); // 节点数据 String result = curatorConnect.create().creatingParentsIfNeeded() // 创建父节点,也就是会递归创建 .withMode(CreateMode.EPHEMERAL) // 节点类型 .forPath(nodePath, data); - System.out.println(result + "节点,创建成功..."); return true; } else { @@ -50,10 +47,10 @@ public class ZkServiceImpl implements ZkService { Stat stat = new Stat(); byte[] nodeData = curatorConnect.getData().storingStatIn(stat).forPath(nodePath); - String masterip = new String(nodeData).trim(); - System.out.println("节点 " + nodePath + " 的数据为:" + new String(nodeData)); + String masterid = new String(nodeData).trim(); + System.out.println("uuid="+ GlobelConfig.zkuuid+" 节点 " + nodePath + " 的数据为:" + new String(nodeData)); - if (masterip.equals(localip)) { + if (masterid.equals(GlobelConfig.zkuuid)) { return true; diff --git a/src/main/java/com/mesa/reportservice/util/GZIPUtils.java b/src/main/java/com/mesa/reportservice/util/GZIPUtils.java deleted file mode 100644 index 5185351..0000000 --- a/src/main/java/com/mesa/reportservice/util/GZIPUtils.java +++ /dev/null @@ -1,85 +0,0 @@ -package com.mesa.reportservice.util; - -/** - * Created by wk1 on 2020/5/14. - */ -import java.io.ByteArrayInputStream; -import java.io.ByteArrayOutputStream; -import java.io.IOException; -import java.util.zip.GZIPInputStream; -import java.util.zip.GZIPOutputStream; - -public class GZIPUtils { - public static final String GZIP_ENCODE_UTF_8 = "UTF-8"; - public static final String GZIP_ENCODE_ISO_8859_1 = "ISO-8859-1"; - - - public static byte[] compress(byte[] str) { - if (str == null || str.length == 0) { - return null; - } - ByteArrayOutputStream out = new ByteArrayOutputStream(); - GZIPOutputStream gzip; - try { - gzip = new GZIPOutputStream(out); - gzip.write(str); - gzip.close(); - } catch ( Exception e) { - e.printStackTrace(); - } - return out.toByteArray(); - } - - - - public static byte[] uncompress(byte[] bytes) { - if (bytes == null || bytes.length == 0) { - return null; - } - ByteArrayOutputStream out = new ByteArrayOutputStream(); - ByteArrayInputStream in = new ByteArrayInputStream(bytes); - try { - GZIPInputStream ungzip = new GZIPInputStream(in); - byte[] buffer = new byte[256]; - int n; - while ((n = ungzip.read(buffer)) >= 0) { - out.write(buffer, 0, n); - } - } catch (Exception e) { - e.printStackTrace(); - } - return out.toByteArray(); - } - - public static String uncompressToString(byte[] bytes, String encoding) { - if (bytes == null || bytes.length == 0) { - return null; - } - ByteArrayOutputStream out = new ByteArrayOutputStream(); - ByteArrayInputStream in = new ByteArrayInputStream(bytes); - try { - GZIPInputStream ungzip = new GZIPInputStream(in); - byte[] buffer = new byte[256]; - int n; - while ((n = ungzip.read(buffer)) >= 0) { - out.write(buffer, 0, n); - } - return out.toString(encoding); - } catch (Exception e) { - e.printStackTrace(); - } - return null; - } - - public static String uncompressToString(byte[] bytes) { - return uncompressToString(bytes, GZIP_ENCODE_UTF_8); - } - - public static void main(String[] args) throws IOException { - String s = "aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa"; - System.out.println("字符串长度:"+s.length()); - //System.out.println("压缩后::"+compress(s).length); - // System.out.println("解压后:"+uncompress(compress(s)).length); - // System.out.println("解压字符串后::"+uncompressToString(compress(s)).length()); - } -} diff --git a/src/main/java/com/mesa/reportservice/util/IdleConnectionEvictor.java b/src/main/java/com/mesa/reportservice/util/IdleConnectionEvictor.java deleted file mode 100644 index cca9f05..0000000 --- a/src/main/java/com/mesa/reportservice/util/IdleConnectionEvictor.java +++ /dev/null @@ -1,50 +0,0 @@ -package com.mesa.reportservice.util; - -import org.apache.http.conn.HttpClientConnectionManager; -import org.springframework.beans.factory.annotation.Autowired; -import org.springframework.stereotype.Component; - -/** - * Created by wk1 on 2019/5/15. - */ - - -@Component - -public class IdleConnectionEvictor extends Thread { - - @Autowired - private HttpClientConnectionManager connMgr; - - private volatile boolean shutdown; - - public IdleConnectionEvictor() { - super(); - super.start(); - } - - @Override - public void run() { - try { - while (!shutdown) { - synchronized (this) { - wait(50000); -// 关闭失效的连接 - connMgr.closeExpiredConnections(); - } - } - } catch (InterruptedException ex) { -// 结束 - } - } - - //关闭清理无效连接的线程 - public void shutdown() { - shutdown = true; - synchronized (this) { - notifyAll(); - } - } -} - - diff --git a/src/main/java/com/mesa/reportservice/util/ziputil.java b/src/main/java/com/mesa/reportservice/util/ziputil.java deleted file mode 100644 index f530f45..0000000 --- a/src/main/java/com/mesa/reportservice/util/ziputil.java +++ /dev/null @@ -1,96 +0,0 @@ -package com.mesa.reportservice.util; - - -import net.jpountz.lz4.*; -import org.apache.commons.compress.compressors.gzip.GzipCompressorInputStream; -import org.apache.commons.compress.compressors.gzip.GzipCompressorOutputStream; -import org.apache.commons.compress.compressors.xz.XZCompressorInputStream; -import org.apache.commons.compress.compressors.xz.XZCompressorOutputStream; -import org.apache.commons.io.output.ByteArrayOutputStream; -import org.xerial.snappy.Snappy; - -import java.io.ByteArrayInputStream; -import java.io.IOException; - -/** - * Created by wk1 on 2020/5/15. - */ -public class ziputil { - - - public static byte[] commonGzipCompress(byte[] data) throws IOException { - ByteArrayOutputStream baos = new ByteArrayOutputStream(512); - GzipCompressorOutputStream gcos = new GzipCompressorOutputStream(baos); - gcos.write(data); - gcos.close(); - return baos.toByteArray(); - } - - public static byte[] commonGzipDecompress(byte[] data) throws IOException { - ByteArrayOutputStream baos = new ByteArrayOutputStream(512); - GzipCompressorInputStream gcis = new GzipCompressorInputStream(new ByteArrayInputStream(data)); - int count; - byte[] buffer = new byte[8192]; - while ((count = gcis.read(buffer)) != -1) { - baos.write(buffer, 0, count); - } - gcis.close(); - return baos.toByteArray(); - } - - - - public static byte[] snappyCompress(byte[] data) throws IOException { - return Snappy.compress(data); - } - - public static byte[] snappyDecompress(byte[] data) throws IOException { - return Snappy.uncompress(data); - } - - - public static byte[] commonXzCompress(byte[] data) throws IOException { - ByteArrayOutputStream baos = new ByteArrayOutputStream(8192); - XZCompressorOutputStream pcos = new XZCompressorOutputStream(baos); - pcos.write(data); - pcos.close(); - return baos.toByteArray(); - } - - public static byte[] commonXzDecompress(byte[] data) throws IOException { - ByteArrayOutputStream baos = new ByteArrayOutputStream(8192); - XZCompressorInputStream xzis = new XZCompressorInputStream(new ByteArrayInputStream(data)); - int count; - byte[] buffer = new byte[8192]; - while ((count = xzis.read(buffer)) != -1) { - baos.write(buffer, 0, count); - } - xzis.close(); - return baos.toByteArray(); - } - - - public static byte[] lz4Compress(byte[] data) throws IOException { - LZ4Factory factory = LZ4Factory.fastestInstance(); - ByteArrayOutputStream byteOutput = new ByteArrayOutputStream(); - LZ4Compressor compressor = factory.fastCompressor(); - LZ4BlockOutputStream compressedOutput = new LZ4BlockOutputStream(byteOutput, 8192, compressor); - compressedOutput.write(data); - compressedOutput.close(); - return byteOutput.toByteArray(); - } - - public static byte[] lz4Decompress(byte[] data) throws IOException { - LZ4Factory factory = LZ4Factory.fastestInstance(); - ByteArrayOutputStream baos = new ByteArrayOutputStream(8192); - LZ4FastDecompressor decompresser = factory.fastDecompressor(); - LZ4BlockInputStream lzis = new LZ4BlockInputStream(new ByteArrayInputStream(data), decompresser); - int count; - byte[] buffer = new byte[8192]; - while ((count = lzis.read(buffer)) != -1) { - baos.write(buffer, 0, count); - } - lzis.close(); - return baos.toByteArray(); - } -} diff --git a/src/test/java/com/mesa/reportservice/ReportserviceApplicationTests.java b/src/test/java/com/mesa/reportservice/ReportserviceApplicationTests.java index 61e9172..4ae38f5 100644 --- a/src/test/java/com/mesa/reportservice/ReportserviceApplicationTests.java +++ b/src/test/java/com/mesa/reportservice/ReportserviceApplicationTests.java @@ -1,24 +1,14 @@ package com.mesa.reportservice; -import com.mesa.reportservice.bean.HttpResult; -import com.mesa.reportservice.util.ziputil; import org.apache.http.client.config.RequestConfig; -import org.apache.http.client.methods.CloseableHttpResponse; -import org.apache.http.client.methods.HttpPost; import org.apache.http.impl.client.CloseableHttpClient; -import org.apache.http.util.EntityUtils; import org.junit.Test; import org.junit.runner.RunWith; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.boot.test.context.SpringBootTest; import org.springframework.test.context.junit4.SpringRunner; - -import java.io.File; import java.io.IOException; -import java.util.Date; -import java.util.Map; -import java.util.TreeMap; @RunWith(SpringRunner.class) @SpringBootTest |
