summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorgujinkai <[email protected]>2024-05-09 14:27:01 +0800
committergujinkai <[email protected]>2024-05-09 14:34:04 +0800
commitd1fbab9662807bfc1e5bb1bce2069275ece937c2 (patch)
treeabac3dd5316386d671a54354b87c3577faf70448
parent92cd5791c681de13172d27b98f08c73428bc0344 (diff)
perf: fix the bugs from sonarqube
-rw-r--r--module-CN-pre-metrics/src/main/java/com/zdjizhi/pre/location/operator/FirstAggregation.java12
-rw-r--r--module-CN-pre-metrics/src/main/java/com/zdjizhi/pre/location/utils/SubscriberUtils.java15
-rw-r--r--platform-base/src/main/java/com/zdjizhi/base/rule/RuleManager.java8
-rw-r--r--platform-base/src/main/java/com/zdjizhi/base/rule/SimpleRuleManager.java10
-rw-r--r--platform-base/src/main/java/com/zdjizhi/base/rule/async/Fetcher.java4
-rw-r--r--platform-base/src/main/java/com/zdjizhi/base/rule/async/FixedRateFetcher.java12
-rw-r--r--platform-base/src/main/java/com/zdjizhi/base/utils/DistributedLock.java198
-rw-r--r--platform-base/src/main/java/com/zdjizhi/base/utils/HttpClientUtils.java137
-rw-r--r--platform-base/src/main/java/com/zdjizhi/base/utils/ZookeeperUtils.java135
-rw-r--r--platform-etl/src/main/java/com/zdjizhi/etl/utils/csv/HighCsvReader.java35
10 files changed, 48 insertions, 518 deletions
diff --git a/module-CN-pre-metrics/src/main/java/com/zdjizhi/pre/location/operator/FirstAggregation.java b/module-CN-pre-metrics/src/main/java/com/zdjizhi/pre/location/operator/FirstAggregation.java
index b095404..0d85a77 100644
--- a/module-CN-pre-metrics/src/main/java/com/zdjizhi/pre/location/operator/FirstAggregation.java
+++ b/module-CN-pre-metrics/src/main/java/com/zdjizhi/pre/location/operator/FirstAggregation.java
@@ -18,14 +18,14 @@ import java.util.Map;
*/
public class FirstAggregation extends AbstractFirstAggregation<LocationSubscriber> {
- private final Logger logger = LoggerFactory.getLogger(FirstAggregation.class);
+ private final transient Logger logger = LoggerFactory.getLogger(FirstAggregation.class);
- private final String locationSubscriberKey = "locationSubscriberMetric";
+ private static final String LOCATION_SUBSCRIBER_KEY = "locationSubscriberMetric";
@Override
public Map<String, Map<String, LocationSubscriber>> createAccumulator() {
Map<String, Map<String, LocationSubscriber>> accumulator = new HashMap<>();
- accumulator.put(locationSubscriberKey, new HashMap<>());
+ accumulator.put(LOCATION_SUBSCRIBER_KEY, new HashMap<>());
return accumulator;
}
@@ -34,7 +34,7 @@ public class FirstAggregation extends AbstractFirstAggregation<LocationSubscribe
try {
String subscriberId = value.getSubscriber_id();
if (StringUtils.isNotBlank(subscriberId)) {
- Map<String, LocationSubscriber> locationSubscriberMap = accumulator.get(locationSubscriberKey);
+ Map<String, LocationSubscriber> locationSubscriberMap = accumulator.get(LOCATION_SUBSCRIBER_KEY);
LocationSubscriber locationSubscriber = locationSubscriberMap.get(subscriberId);
if (locationSubscriber != null) {
updateLocationSubscriber(value, locationSubscriber);
@@ -57,8 +57,8 @@ public class FirstAggregation extends AbstractFirstAggregation<LocationSubscribe
locationSubscriber.setApn(value.getApn());
//todo 目前数据源字段不确定,先用随机经纬度代替
SubscriberUtils.Locate randomLocate = SubscriberUtils.getRandomLocate();
- locationSubscriber.setSubscriber_longitude(randomLocate.longitude);
- locationSubscriber.setSubscriber_latitude(randomLocate.latitude);
+ locationSubscriber.setSubscriber_longitude(randomLocate.getLongitude());
+ locationSubscriber.setSubscriber_latitude(randomLocate.getLatitude());
locationSubscriber.setRecv_time(value.getCommon_recv_time());
}
}
diff --git a/module-CN-pre-metrics/src/main/java/com/zdjizhi/pre/location/utils/SubscriberUtils.java b/module-CN-pre-metrics/src/main/java/com/zdjizhi/pre/location/utils/SubscriberUtils.java
index fdd4192..5e3f673 100644
--- a/module-CN-pre-metrics/src/main/java/com/zdjizhi/pre/location/utils/SubscriberUtils.java
+++ b/module-CN-pre-metrics/src/main/java/com/zdjizhi/pre/location/utils/SubscriberUtils.java
@@ -11,8 +11,9 @@ public class SubscriberUtils {
private static Locate[] locates = new Locate[]{Locate.L1, Locate.L2, Locate.L3, Locate.L4, Locate.L5, Locate.L6, Locate.L7, Locate.L8, Locate.L9, Locate.L10, Locate.L11, Locate.L12, Locate.L13, Locate.L14, Locate.L15, Locate.L16, Locate.L17, Locate.L18, Locate.L19, Locate.L20, Locate.L21, Locate.L22, Locate.L23, Locate.L24, Locate.L25};
+ private static final Random random = new Random();
+
public static Locate getRandomLocate() {
- Random random = new Random();
int i = random.nextInt(25);
return locates[i];
}
@@ -45,12 +46,20 @@ public class SubscriberUtils {
L25(116.367037, 39.947116);
- public Double longitude;
- public Double latitude;
+ private Double longitude;
+ private Double latitude;
Locate(Double longitude, Double latitude) {
this.longitude = longitude;
this.latitude = latitude;
}
+
+ public Double getLongitude() {
+ return longitude;
+ }
+
+ public Double getLatitude() {
+ return latitude;
+ }
}
}
diff --git a/platform-base/src/main/java/com/zdjizhi/base/rule/RuleManager.java b/platform-base/src/main/java/com/zdjizhi/base/rule/RuleManager.java
index 9b808f4..22a4bc6 100644
--- a/platform-base/src/main/java/com/zdjizhi/base/rule/RuleManager.java
+++ b/platform-base/src/main/java/com/zdjizhi/base/rule/RuleManager.java
@@ -26,21 +26,19 @@ public interface RuleManager<K, V> {
* @param update The consumer for handling updates to the rule.
* @throws Exception Thrown if an error occurs during the start of asynchronous flushing.
*/
- void startAsyncFlush(long intervalMs, Consumer<RuleClient.Updatable<K, V>> update) throws Exception;
+ void startAsyncFlush(long intervalMs, Consumer<RuleClient.Updatable<K, V>> update);
/**
* Waits for the asynchronous rule flush to finish.
*
* @throws InterruptedException if interrupted while waiting.
*/
- void awaitAsyncFlushFinish() throws InterruptedException;
+ boolean awaitAsyncFlushFinish() throws InterruptedException;
/**
* Stops the asynchronous rule flush.
- *
- * @throws Exception if an error occurs while stopping.
*/
- void stopAsyncFlush() throws Exception;
+ void stopAsyncFlush();
/**
* Flushes the rules synchronously.
diff --git a/platform-base/src/main/java/com/zdjizhi/base/rule/SimpleRuleManager.java b/platform-base/src/main/java/com/zdjizhi/base/rule/SimpleRuleManager.java
index c0163ff..d6e59aa 100644
--- a/platform-base/src/main/java/com/zdjizhi/base/rule/SimpleRuleManager.java
+++ b/platform-base/src/main/java/com/zdjizhi/base/rule/SimpleRuleManager.java
@@ -25,10 +25,10 @@ public class SimpleRuleManager<K, V> implements RuleManager<K, V> {
private static final Logger LOG = LoggerFactory.getLogger(SimpleRuleManager.class);
// Atomic boolean to track if asynchronous flush is running.
- private final static AtomicBoolean ASYNC_FLUSH_RUNNING = new AtomicBoolean(false);
+ private static final AtomicBoolean ASYNC_FLUSH_RUNNING = new AtomicBoolean(false);
// CountDownLatch for asynchronous flush synchronization.
- private final static CountDownLatch ASYNC_FLUSH_LATCH = new CountDownLatch(1);
+ private static final CountDownLatch ASYNC_FLUSH_LATCH = new CountDownLatch(1);
private final RuleClient<K, V> client;
@@ -73,11 +73,11 @@ public class SimpleRuleManager<K, V> implements RuleManager<K, V> {
@SuppressWarnings("ResultOfMethodCallIgnored")
@Override
- public void awaitAsyncFlushFinish() throws InterruptedException {
- ASYNC_FLUSH_LATCH.await(30, TimeUnit.SECONDS);
+ public boolean awaitAsyncFlushFinish() throws InterruptedException {
+ return ASYNC_FLUSH_LATCH.await(30, TimeUnit.SECONDS);
}
- public void stopAsyncFlush() throws Exception {
+ public void stopAsyncFlush() {
if (this.fetcher != null) {
this.fetcher.close();
}
diff --git a/platform-base/src/main/java/com/zdjizhi/base/rule/async/Fetcher.java b/platform-base/src/main/java/com/zdjizhi/base/rule/async/Fetcher.java
index 8fe1b98..2e6a91e 100644
--- a/platform-base/src/main/java/com/zdjizhi/base/rule/async/Fetcher.java
+++ b/platform-base/src/main/java/com/zdjizhi/base/rule/async/Fetcher.java
@@ -28,8 +28,6 @@ public interface Fetcher {
/**
* Closes the fetcher and releases any allocated resources.
* Impl should perform cleanup operations here.
- *
- * @throws Exception If an error occurs during the closing process.
*/
- void close() throws Exception;
+ void close();
}
diff --git a/platform-base/src/main/java/com/zdjizhi/base/rule/async/FixedRateFetcher.java b/platform-base/src/main/java/com/zdjizhi/base/rule/async/FixedRateFetcher.java
index 9902fc4..a651bb5 100644
--- a/platform-base/src/main/java/com/zdjizhi/base/rule/async/FixedRateFetcher.java
+++ b/platform-base/src/main/java/com/zdjizhi/base/rule/async/FixedRateFetcher.java
@@ -50,22 +50,16 @@ public class FixedRateFetcher implements Fetcher, ScheduledSupport {
callback(e);
}
}, 0, rateIntervalMs, TimeUnit.MILLISECONDS);
- Runtime.getRuntime().addShutdownHook(new Thread(() -> {
- try {
- close();
- } catch (Exception e) {
- throw new RuntimeException("Fetcher close error.", e);
- }
- }));
+ Runtime.getRuntime().addShutdownHook(new Thread(this::close));
}
/**
* Closes the FixedRateFetcher, canceling the scheduled future.
*
- * @throws Exception if an error occurs while closing.
+ * @throws IllegalStateException if an error occurs while closing.
*/
@Override
- public void close() throws Exception {
+ public void close() {
if (scheduledFuture == null) {
throw new IllegalStateException("Schedule task be not started.");
}
diff --git a/platform-base/src/main/java/com/zdjizhi/base/utils/DistributedLock.java b/platform-base/src/main/java/com/zdjizhi/base/utils/DistributedLock.java
deleted file mode 100644
index 6b55661..0000000
--- a/platform-base/src/main/java/com/zdjizhi/base/utils/DistributedLock.java
+++ /dev/null
@@ -1,198 +0,0 @@
-package com.zdjizhi.base.utils;
-
-import org.apache.zookeeper.*;
-import org.apache.zookeeper.data.Stat;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.List;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.locks.Condition;
-import java.util.concurrent.locks.Lock;
-
-
-public class DistributedLock implements Lock, Watcher {
- private static final Logger logger = LoggerFactory.getLogger(DistributedLock.class);
-
- private ZooKeeper zk = null;
- /**
- * 根节点
- */
- private final String ROOT_LOCK = "/locks";
- /**
- * 竞争的资源
- */
- private String lockName;
- /**
- * 等待的前一个锁
- */
- private String waitLock;
- /**
- * 当前锁
- */
- private String currentLock;
- /**
- * 计数器
- */
- private CountDownLatch countDownLatch;
-
- private int sessionTimeout = 2000;
-
- private List<Exception> exceptionList = new ArrayList<Exception>();
-
- /**
- * 配置分布式锁
- *
- * @param config 连接的url
- * @param lockName 竞争资源
- */
- public DistributedLock(String config, String lockName) {
- this.lockName = lockName;
- try {
- // 连接zookeeper
- zk = new ZooKeeper(config, sessionTimeout, this);
- Stat stat = zk.exists(ROOT_LOCK, false);
- if (stat == null) {
- // 如果根节点不存在,则创建根节点
- zk.create(ROOT_LOCK, new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
- }
- } catch (IOException | InterruptedException | KeeperException e) {
- logger.error("Node already exists!");
- }
- }
-
- /**
- * 节点监视器
- */
- @Override
- public void process(WatchedEvent event) {
- if (this.countDownLatch != null) {
- this.countDownLatch.countDown();
- }
- }
-
- @Override
- public void lock() {
- if (exceptionList.size() > 0) {
- throw new LockException(exceptionList.get(0));
- }
- try {
- if (this.tryLock()) {
- logger.info(Thread.currentThread().getName() + " " + lockName + "获得了锁");
- } else {
- // 等待锁
- waitForLock(waitLock, sessionTimeout);
- }
- } catch (InterruptedException | KeeperException e) {
- logger.error("获取锁异常" + e);
- }
- }
-
- @Override
- public boolean tryLock() {
- try {
- String splitStr = "_lock_";
- if (lockName.contains(splitStr)) {
- throw new LockException("锁名有误");
- }
- // 创建临时有序节点
- currentLock = zk.create(ROOT_LOCK + "/" + lockName + splitStr, new byte[0],
- ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);
- // 取所有子节点
- List<String> subNodes = zk.getChildren(ROOT_LOCK, false);
- // 取出所有lockName的锁
- List<String> lockObjects = new ArrayList<String>();
- for (String node : subNodes) {
- String tmpNode = node.split(splitStr)[0];
- if (tmpNode.equals(lockName)) {
- lockObjects.add(node);
- }
- }
- Collections.sort(lockObjects);
- // 若当前节点为最小节点,则获取锁成功
- if (currentLock.equals(ROOT_LOCK + "/" + lockObjects.get(0))) {
- return true;
- }
- // 若不是最小节点,则找到自己的前一个节点
- String prevNode = currentLock.substring(currentLock.lastIndexOf("/") + 1);
- waitLock = lockObjects.get(Collections.binarySearch(lockObjects, prevNode) - 1);
- } catch (InterruptedException | KeeperException e) {
- logger.error("获取锁过程异常" + e);
- }
- return false;
- }
-
-
- @Override
- public boolean tryLock(long timeout, TimeUnit unit) {
- try {
- if (this.tryLock()) {
- return true;
- }
- return waitForLock(waitLock, timeout);
- } catch (KeeperException | InterruptedException | RuntimeException e) {
- logger.error("判断是否锁定异常" + e);
- }
- return false;
- }
-
- /**
- * 等待锁
- *
- * @param prev 锁名称
- * @param waitTime 等待时间
- * @return
- * @throws KeeperException
- * @throws InterruptedException
- */
- private boolean waitForLock(String prev, long waitTime) throws KeeperException, InterruptedException {
- Stat stat = zk.exists(ROOT_LOCK + "/" + prev, true);
-
- if (stat != null) {
- this.countDownLatch = new CountDownLatch(1);
- // 计数等待,若等到前一个节点消失,则precess中进行countDown,停止等待,获取锁
- this.countDownLatch.await(waitTime, TimeUnit.MILLISECONDS);
- this.countDownLatch = null;
- }
- return true;
- }
-
- @Override
- public void unlock() {
- try {
- zk.delete(currentLock, -1);
- currentLock = null;
- zk.close();
- } catch (InterruptedException | KeeperException e) {
- logger.error("关闭锁异常" + e);
- }
- }
-
- @Override
- public Condition newCondition() {
- return null;
- }
-
- @Override
- public void lockInterruptibly() throws InterruptedException {
- this.lock();
- }
-
-
- public class LockException extends RuntimeException {
- private static final long serialVersionUID = 1L;
-
- public LockException(String e) {
- super(e);
- }
-
- public LockException(Exception e) {
- super(e);
- }
- }
-
-}
diff --git a/platform-base/src/main/java/com/zdjizhi/base/utils/HttpClientUtils.java b/platform-base/src/main/java/com/zdjizhi/base/utils/HttpClientUtils.java
index 6889a4e..2c8a15f 100644
--- a/platform-base/src/main/java/com/zdjizhi/base/utils/HttpClientUtils.java
+++ b/platform-base/src/main/java/com/zdjizhi/base/utils/HttpClientUtils.java
@@ -5,103 +5,43 @@ import com.zdjizhi.base.common.CommonConfig;
import com.zdjizhi.base.config.Configs;
import org.apache.commons.io.IOUtils;
import org.apache.commons.lang3.StringUtils;
-import org.apache.http.*;
+import org.apache.http.Header;
+import org.apache.http.HttpEntity;
+import org.apache.http.HttpStatus;
+import org.apache.http.ParseException;
import org.apache.http.client.ClientProtocolException;
-import org.apache.http.client.HttpRequestRetryHandler;
import org.apache.http.client.config.RequestConfig;
import org.apache.http.client.methods.CloseableHttpResponse;
import org.apache.http.client.methods.HttpGet;
import org.apache.http.client.methods.HttpPost;
-import org.apache.http.client.protocol.HttpClientContext;
import org.apache.http.client.utils.URIBuilder;
-import org.apache.http.config.Registry;
-import org.apache.http.config.RegistryBuilder;
-import org.apache.http.conn.ConnectTimeoutException;
-import org.apache.http.conn.ConnectionKeepAliveStrategy;
-import org.apache.http.conn.HttpHostConnectException;
-import org.apache.http.conn.socket.ConnectionSocketFactory;
-import org.apache.http.conn.socket.PlainConnectionSocketFactory;
-import org.apache.http.conn.ssl.NoopHostnameVerifier;
-import org.apache.http.conn.ssl.SSLConnectionSocketFactory;
import org.apache.http.entity.ByteArrayEntity;
import org.apache.http.impl.client.CloseableHttpClient;
import org.apache.http.impl.client.HttpClients;
import org.apache.http.impl.conn.PoolingHttpClientConnectionManager;
-import org.apache.http.message.BasicHeaderElementIterator;
-import org.apache.http.protocol.HTTP;
import org.apache.http.util.EntityUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import javax.net.ssl.*;
import java.io.IOException;
-import java.io.InterruptedIOException;
import java.net.URI;
-import java.net.UnknownHostException;
import java.nio.charset.StandardCharsets;
-import java.security.KeyManagementException;
-import java.security.NoSuchAlgorithmException;
-import java.security.cert.X509Certificate;
import java.util.Map;
public class HttpClientUtils {
- /** 全局连接池对象 */
- private static final PoolingHttpClientConnectionManager CONN_MANAGER = new PoolingHttpClientConnectionManager();
private static Logger logger = LoggerFactory.getLogger(HttpClientUtils.class);
public static final String ERROR_MESSAGE = "-1";
- /*
- * 静态代码块配置连接池信息
- */
- static {
-
+ private PoolingHttpClientConnectionManager getSslClientManager() {
+ // 创建ConnectionManager,添加Connection配置信息
+ PoolingHttpClientConnectionManager connManager = new PoolingHttpClientConnectionManager();
// 设置最大连接数
- CONN_MANAGER.setMaxTotal(Configs.get(CommonConfig.HTTP_POOL_MAX_CONNECTION));
+ connManager.setMaxTotal(Configs.get(CommonConfig.HTTP_POOL_MAX_CONNECTION));
// 设置每个连接的路由数
- CONN_MANAGER.setDefaultMaxPerRoute(Configs.get(CommonConfig.HTTP_POOL_MAX_PER_ROUTE));
-
- }
-
-
- /**
- * 在调用SSL之前需要重写验证方法,取消检测SSL
- * 创建ConnectionManager,添加Connection配置信息
- *
- * @return HttpClient 支持https
- */
- private PoolingHttpClientConnectionManager getSslClientManager() {
- try {
- // 在调用SSL之前需要重写验证方法,取消检测SSL
- X509TrustManager trustManager = new X509TrustManager() {
- @Override
- public X509Certificate[] getAcceptedIssuers() {
- return null;
- }
- @Override
- public void checkClientTrusted(X509Certificate[] xcs, String str) {
- }
- @Override
- public void checkServerTrusted(X509Certificate[] xcs, String str) {
- }
- };
- SSLContext ctx = SSLContext.getInstance(SSLConnectionSocketFactory.TLS);
- ctx.init(null, new TrustManager[]{trustManager}, null);
- SSLConnectionSocketFactory socketFactory = new SSLConnectionSocketFactory(ctx, NoopHostnameVerifier.INSTANCE);
- Registry<ConnectionSocketFactory> socketFactoryRegistry = RegistryBuilder.<ConnectionSocketFactory>create()
- .register("http", PlainConnectionSocketFactory.INSTANCE)
- .register("https", socketFactory).build();
- // 创建ConnectionManager,添加Connection配置信息
- PoolingHttpClientConnectionManager connManager = new PoolingHttpClientConnectionManager(socketFactoryRegistry);
- // 设置最大连接数
- connManager.setMaxTotal(Configs.get(CommonConfig.HTTP_POOL_MAX_CONNECTION));
- // 设置每个连接的路由数
- connManager.setDefaultMaxPerRoute(Configs.get(CommonConfig.HTTP_POOL_MAX_PER_ROUTE));
- return connManager;
- } catch (KeyManagementException | NoSuchAlgorithmException e) {
- throw new RuntimeException(e.getMessage());
- }
+ connManager.setDefaultMaxPerRoute(Configs.get(CommonConfig.HTTP_POOL_MAX_PER_ROUTE));
+ return connManager;
}
/**
@@ -119,70 +59,15 @@ public class HttpClientUtils {
.setSocketTimeout(Configs.get(CommonConfig.HTTP_POOL_RESPONSE_TIMEOUT))
.build();
- /*
- * 测出超时重试机制为了防止超时不生效而设置
- * 如果直接放回false,不重试
- * 这里会根据情况进行判断是否重试
- */
- HttpRequestRetryHandler retry = (exception, executionCount, context) -> {
- if (executionCount >= 3) {// 如果已经重试了3次,就放弃
- return false;
- }
- if (exception instanceof NoHttpResponseException) {// 如果服务器丢掉了连接,那么就重试
- return true;
- }
- if (exception instanceof SSLHandshakeException) {// 不要重试SSL握手异常
- return false;
- }
- if (exception instanceof UnknownHostException) {// 目标服务器不可达
- return false;
- }
- if (exception instanceof ConnectTimeoutException) {// 连接被拒绝
- return false;
- }
- if (exception instanceof HttpHostConnectException) {// 连接被拒绝
- return false;
- }
- if (exception instanceof SSLException) {// ssl握手异常
- return false;
- }
- if (exception instanceof InterruptedIOException) {// 超时
- return true;
- }
- HttpClientContext clientContext = HttpClientContext.adapt(context);
- HttpRequest request = clientContext.getRequest();
- // 如果请求是幂等的,就再次尝试
- return !(request instanceof HttpEntityEnclosingRequest);
- };
-
-
- ConnectionKeepAliveStrategy myStrategy = (response, context) -> {
- HeaderElementIterator it = new BasicHeaderElementIterator
- (response.headerIterator(HTTP.CONN_KEEP_ALIVE));
- while (it.hasNext()) {
- HeaderElement he = it.nextElement();
- String param = he.getName();
- String value = he.getValue();
- if (value != null && "timeout".equalsIgnoreCase(param)) {
- return Long.parseLong(value) * 1000;
- }
- }
- return 60 * 1000;//如果没有约定,则默认定义时长为60s
- };
-
// 创建httpClient
return HttpClients.custom()
// 把请求相关的超时信息设置到连接客户端
.setDefaultRequestConfig(requestConfig)
- // 把请求重试设置到连接客户端
- .setRetryHandler(retry)
- .setKeepAliveStrategy(myStrategy)
// 配置连接池管理对象
.setConnectionManager(getSslClientManager())
.build();
}
-
/**
* GET请求
*
@@ -309,7 +194,7 @@ public class HttpClientUtils {
}
}
} catch (Exception e) {
- logger.error("拼接url出错,uri : {}, path : {},参数: {}",uriBuilder.toString(),path,params);
+ logger.error("拼接url出错,uri : {}, path : {},参数: {}", uriBuilder, path, params);
}
}
diff --git a/platform-base/src/main/java/com/zdjizhi/base/utils/ZookeeperUtils.java b/platform-base/src/main/java/com/zdjizhi/base/utils/ZookeeperUtils.java
deleted file mode 100644
index 6075521..0000000
--- a/platform-base/src/main/java/com/zdjizhi/base/utils/ZookeeperUtils.java
+++ /dev/null
@@ -1,135 +0,0 @@
-package com.zdjizhi.base.utils;
-
-import cn.hutool.core.util.StrUtil;
-import org.apache.zookeeper.*;
-import org.apache.zookeeper.data.ACL;
-import org.apache.zookeeper.data.Stat;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.IOException;
-import java.util.List;
-import java.util.concurrent.CountDownLatch;
-
-
-public class ZookeeperUtils implements Watcher {
- private static final Logger logger = LoggerFactory.getLogger(ZookeeperUtils.class);
-
- private ZooKeeper zookeeper;
-
- private static final int SESSION_TIME_OUT = 20000;
-
- private CountDownLatch countDownLatch = new CountDownLatch(1);
-
- @Override
- public void process(WatchedEvent event) {
- if (event.getState() == Event.KeeperState.SyncConnected) {
- countDownLatch.countDown();
- }
- }
-
-
- /**
- * 修改节点信息
- *
- * @param path 节点路径
- */
- int modifyNode(String path, String zookeeperIp) {
- createNode(path, "0".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, zookeeperIp);
- int workerId = 0;
- try {
- connectZookeeper(zookeeperIp);
- Stat stat = zookeeper.exists(path, true);
- workerId = Integer.parseInt(getNodeDate(path));
- if (workerId > 63) {
- workerId = 0;
- zookeeper.setData(path, "1".getBytes(), stat.getVersion());
- } else {
- String result = String.valueOf(workerId + 1);
- if (stat != null) {
- zookeeper.setData(path, result.getBytes(), stat.getVersion());
- } else {
- logger.error("Node does not exist!,Can't modify");
- }
- }
- } catch (KeeperException | InterruptedException e) {
- logger.error("modify error Can't modify," + e);
- } finally {
- closeConn();
- }
- logger.warn("workerID is:" + workerId);
- return workerId;
- }
-
- /**
- * 连接zookeeper
- *
- * @param host 地址
- */
- private void connectZookeeper(String host) {
- try {
- zookeeper = new ZooKeeper(host, SESSION_TIME_OUT, this);
- countDownLatch.await();
- } catch (IOException | InterruptedException e) {
- logger.error("Connection to the Zookeeper Exception! message:" + e);
- }
- }
-
- /**
- * 关闭连接
- */
- private void closeConn() {
- try {
- if (zookeeper != null) {
- zookeeper.close();
- }
- } catch (InterruptedException e) {
- logger.error("Close the Zookeeper connection Exception! message:" + e);
- }
- }
-
- /**
- * 获取节点内容
- *
- * @param path 节点路径
- * @return 内容/异常null
- */
- private String getNodeDate(String path) {
- String result = null;
- Stat stat = new Stat();
- try {
- byte[] resByte = zookeeper.getData(path, true, stat);
-
- result = StrUtil.str(resByte, "UTF-8");
- } catch (KeeperException | InterruptedException e) {
- logger.error("Get node information exception" + e);
- }
- return result;
- }
-
- /**
- * @param path 节点创建的路径
- * @param date 节点所存储的数据的byte[]
- * @param acls 控制权限策略
- */
- private void createNode(String path, byte[] date, List<ACL> acls, String zookeeperIp) {
- try {
- connectZookeeper(zookeeperIp);
- Stat exists = zookeeper.exists(path, true);
- if (exists == null) {
- Stat existsSnowflakeld = zookeeper.exists("/Snowflake", true);
- if (existsSnowflakeld == null) {
- zookeeper.create("/Snowflake", null, acls, CreateMode.PERSISTENT);
- }
- zookeeper.create(path, date, acls, CreateMode.PERSISTENT);
- } else {
- logger.warn("Node already exists ! Don't need to create");
- }
- } catch (KeeperException | InterruptedException e) {
- logger.error(e.toString());
- } finally {
- closeConn();
- }
- }
-
-}
diff --git a/platform-etl/src/main/java/com/zdjizhi/etl/utils/csv/HighCsvReader.java b/platform-etl/src/main/java/com/zdjizhi/etl/utils/csv/HighCsvReader.java
index 7a95f45..69b7a29 100644
--- a/platform-etl/src/main/java/com/zdjizhi/etl/utils/csv/HighCsvReader.java
+++ b/platform-etl/src/main/java/com/zdjizhi/etl/utils/csv/HighCsvReader.java
@@ -2,12 +2,9 @@ package com.zdjizhi.etl.utils.csv;
import com.opencsv.CSVParser;
import com.opencsv.CSVReader;
-import com.zdjizhi.base.utils.FileUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.io.ByteArrayInputStream;
-import java.io.InputStreamReader;
import java.io.Reader;
import java.util.*;
@@ -15,8 +12,6 @@ public class HighCsvReader {
private final Logger logger = LoggerFactory.getLogger(HighCsvReader.class);
- private CSVReader csvReader;
-
private String[] header = null;
private final Map<String, Integer> columnNames = new HashMap<>();
@@ -32,9 +27,8 @@ public class HighCsvReader {
}
private void init(Reader reader, List<String> columns) {
- try {
- this.csvReader = new CSVReader(reader, CSVParser.DEFAULT_SEPARATOR,
- CSVParser.DEFAULT_QUOTE_CHARACTER, CSVParser.DEFAULT_ESCAPE_CHARACTER);
+ try (CSVReader csvReader = new CSVReader(reader, CSVParser.DEFAULT_SEPARATOR,
+ CSVParser.DEFAULT_QUOTE_CHARACTER, CSVParser.DEFAULT_ESCAPE_CHARACTER)) {
String[] allHeader = csvReader.readNext();
if (allHeader == null) {
return;
@@ -43,7 +37,7 @@ public class HighCsvReader {
String columnName = allHeader[i];
columnNames.put(columnName, i);
}
- if (columns.size() == 0) {
+ if (columns.isEmpty()) {
columns.addAll(columnNames.keySet());
} else {
columns.removeIf(next -> !columnNames.containsKey(next));
@@ -60,14 +54,6 @@ public class HighCsvReader {
}
} catch (Exception e) {
logger.error("csv parse error, error exception: " + e.getMessage(), e);
- } finally {
- if (csvReader != null) {
- try {
- csvReader.close();
- } catch (Exception ignored) {
-
- }
- }
}
}
@@ -83,16 +69,6 @@ public class HighCsvReader {
return columnNames.get(fieldName);
}
- private InputStreamReader getReader(byte[] bytes) {
- return new InputStreamReader(new ByteArrayInputStream(bytes));
- }
-
- private InputStreamReader getReader(String fileRelativePath) {
- FileUtils fileUtils = new FileUtils("SINGLE");
- byte[] bytes = fileUtils.getFileBytes(fileRelativePath);
- return getReader(bytes);
- }
-
public class CsvIterator implements Iterator<Map<String, String>>{
private String[] header;
@@ -116,7 +92,10 @@ public class HighCsvReader {
@Override
public Map<String, String> next() {
- HashMap<String, String> lineMap = new HashMap<String, String>(header.length);
+ if (!hasNext()) {
+ throw new NoSuchElementException();
+ }
+ HashMap<String, String> lineMap = new HashMap<>(header.length);
String[] line = data.get(currentLineNumber);
for (int i = 0; i < line.length; i++) {
lineMap.put(header[i], line[i]);