diff options
| author | gujinkai <[email protected]> | 2024-05-09 14:27:01 +0800 |
|---|---|---|
| committer | gujinkai <[email protected]> | 2024-05-09 14:34:04 +0800 |
| commit | d1fbab9662807bfc1e5bb1bce2069275ece937c2 (patch) | |
| tree | abac3dd5316386d671a54354b87c3577faf70448 | |
| parent | 92cd5791c681de13172d27b98f08c73428bc0344 (diff) | |
perf: fix the bugs from sonarqube
10 files changed, 48 insertions, 518 deletions
diff --git a/module-CN-pre-metrics/src/main/java/com/zdjizhi/pre/location/operator/FirstAggregation.java b/module-CN-pre-metrics/src/main/java/com/zdjizhi/pre/location/operator/FirstAggregation.java index b095404..0d85a77 100644 --- a/module-CN-pre-metrics/src/main/java/com/zdjizhi/pre/location/operator/FirstAggregation.java +++ b/module-CN-pre-metrics/src/main/java/com/zdjizhi/pre/location/operator/FirstAggregation.java @@ -18,14 +18,14 @@ import java.util.Map; */ public class FirstAggregation extends AbstractFirstAggregation<LocationSubscriber> { - private final Logger logger = LoggerFactory.getLogger(FirstAggregation.class); + private final transient Logger logger = LoggerFactory.getLogger(FirstAggregation.class); - private final String locationSubscriberKey = "locationSubscriberMetric"; + private static final String LOCATION_SUBSCRIBER_KEY = "locationSubscriberMetric"; @Override public Map<String, Map<String, LocationSubscriber>> createAccumulator() { Map<String, Map<String, LocationSubscriber>> accumulator = new HashMap<>(); - accumulator.put(locationSubscriberKey, new HashMap<>()); + accumulator.put(LOCATION_SUBSCRIBER_KEY, new HashMap<>()); return accumulator; } @@ -34,7 +34,7 @@ public class FirstAggregation extends AbstractFirstAggregation<LocationSubscribe try { String subscriberId = value.getSubscriber_id(); if (StringUtils.isNotBlank(subscriberId)) { - Map<String, LocationSubscriber> locationSubscriberMap = accumulator.get(locationSubscriberKey); + Map<String, LocationSubscriber> locationSubscriberMap = accumulator.get(LOCATION_SUBSCRIBER_KEY); LocationSubscriber locationSubscriber = locationSubscriberMap.get(subscriberId); if (locationSubscriber != null) { updateLocationSubscriber(value, locationSubscriber); @@ -57,8 +57,8 @@ public class FirstAggregation extends AbstractFirstAggregation<LocationSubscribe locationSubscriber.setApn(value.getApn()); //todo 目前数据源字段不确定,先用随机经纬度代替 SubscriberUtils.Locate randomLocate = SubscriberUtils.getRandomLocate(); - locationSubscriber.setSubscriber_longitude(randomLocate.longitude); - locationSubscriber.setSubscriber_latitude(randomLocate.latitude); + locationSubscriber.setSubscriber_longitude(randomLocate.getLongitude()); + locationSubscriber.setSubscriber_latitude(randomLocate.getLatitude()); locationSubscriber.setRecv_time(value.getCommon_recv_time()); } } diff --git a/module-CN-pre-metrics/src/main/java/com/zdjizhi/pre/location/utils/SubscriberUtils.java b/module-CN-pre-metrics/src/main/java/com/zdjizhi/pre/location/utils/SubscriberUtils.java index fdd4192..5e3f673 100644 --- a/module-CN-pre-metrics/src/main/java/com/zdjizhi/pre/location/utils/SubscriberUtils.java +++ b/module-CN-pre-metrics/src/main/java/com/zdjizhi/pre/location/utils/SubscriberUtils.java @@ -11,8 +11,9 @@ public class SubscriberUtils { private static Locate[] locates = new Locate[]{Locate.L1, Locate.L2, Locate.L3, Locate.L4, Locate.L5, Locate.L6, Locate.L7, Locate.L8, Locate.L9, Locate.L10, Locate.L11, Locate.L12, Locate.L13, Locate.L14, Locate.L15, Locate.L16, Locate.L17, Locate.L18, Locate.L19, Locate.L20, Locate.L21, Locate.L22, Locate.L23, Locate.L24, Locate.L25}; + private static final Random random = new Random(); + public static Locate getRandomLocate() { - Random random = new Random(); int i = random.nextInt(25); return locates[i]; } @@ -45,12 +46,20 @@ public class SubscriberUtils { L25(116.367037, 39.947116); - public Double longitude; - public Double latitude; + private Double longitude; + private Double latitude; Locate(Double longitude, Double latitude) { this.longitude = longitude; this.latitude = latitude; } + + public Double getLongitude() { + return longitude; + } + + public Double getLatitude() { + return latitude; + } } } diff --git a/platform-base/src/main/java/com/zdjizhi/base/rule/RuleManager.java b/platform-base/src/main/java/com/zdjizhi/base/rule/RuleManager.java index 9b808f4..22a4bc6 100644 --- a/platform-base/src/main/java/com/zdjizhi/base/rule/RuleManager.java +++ b/platform-base/src/main/java/com/zdjizhi/base/rule/RuleManager.java @@ -26,21 +26,19 @@ public interface RuleManager<K, V> { * @param update The consumer for handling updates to the rule. * @throws Exception Thrown if an error occurs during the start of asynchronous flushing. */ - void startAsyncFlush(long intervalMs, Consumer<RuleClient.Updatable<K, V>> update) throws Exception; + void startAsyncFlush(long intervalMs, Consumer<RuleClient.Updatable<K, V>> update); /** * Waits for the asynchronous rule flush to finish. * * @throws InterruptedException if interrupted while waiting. */ - void awaitAsyncFlushFinish() throws InterruptedException; + boolean awaitAsyncFlushFinish() throws InterruptedException; /** * Stops the asynchronous rule flush. - * - * @throws Exception if an error occurs while stopping. */ - void stopAsyncFlush() throws Exception; + void stopAsyncFlush(); /** * Flushes the rules synchronously. diff --git a/platform-base/src/main/java/com/zdjizhi/base/rule/SimpleRuleManager.java b/platform-base/src/main/java/com/zdjizhi/base/rule/SimpleRuleManager.java index c0163ff..d6e59aa 100644 --- a/platform-base/src/main/java/com/zdjizhi/base/rule/SimpleRuleManager.java +++ b/platform-base/src/main/java/com/zdjizhi/base/rule/SimpleRuleManager.java @@ -25,10 +25,10 @@ public class SimpleRuleManager<K, V> implements RuleManager<K, V> { private static final Logger LOG = LoggerFactory.getLogger(SimpleRuleManager.class); // Atomic boolean to track if asynchronous flush is running. - private final static AtomicBoolean ASYNC_FLUSH_RUNNING = new AtomicBoolean(false); + private static final AtomicBoolean ASYNC_FLUSH_RUNNING = new AtomicBoolean(false); // CountDownLatch for asynchronous flush synchronization. - private final static CountDownLatch ASYNC_FLUSH_LATCH = new CountDownLatch(1); + private static final CountDownLatch ASYNC_FLUSH_LATCH = new CountDownLatch(1); private final RuleClient<K, V> client; @@ -73,11 +73,11 @@ public class SimpleRuleManager<K, V> implements RuleManager<K, V> { @SuppressWarnings("ResultOfMethodCallIgnored") @Override - public void awaitAsyncFlushFinish() throws InterruptedException { - ASYNC_FLUSH_LATCH.await(30, TimeUnit.SECONDS); + public boolean awaitAsyncFlushFinish() throws InterruptedException { + return ASYNC_FLUSH_LATCH.await(30, TimeUnit.SECONDS); } - public void stopAsyncFlush() throws Exception { + public void stopAsyncFlush() { if (this.fetcher != null) { this.fetcher.close(); } diff --git a/platform-base/src/main/java/com/zdjizhi/base/rule/async/Fetcher.java b/platform-base/src/main/java/com/zdjizhi/base/rule/async/Fetcher.java index 8fe1b98..2e6a91e 100644 --- a/platform-base/src/main/java/com/zdjizhi/base/rule/async/Fetcher.java +++ b/platform-base/src/main/java/com/zdjizhi/base/rule/async/Fetcher.java @@ -28,8 +28,6 @@ public interface Fetcher { /** * Closes the fetcher and releases any allocated resources. * Impl should perform cleanup operations here. - * - * @throws Exception If an error occurs during the closing process. */ - void close() throws Exception; + void close(); } diff --git a/platform-base/src/main/java/com/zdjizhi/base/rule/async/FixedRateFetcher.java b/platform-base/src/main/java/com/zdjizhi/base/rule/async/FixedRateFetcher.java index 9902fc4..a651bb5 100644 --- a/platform-base/src/main/java/com/zdjizhi/base/rule/async/FixedRateFetcher.java +++ b/platform-base/src/main/java/com/zdjizhi/base/rule/async/FixedRateFetcher.java @@ -50,22 +50,16 @@ public class FixedRateFetcher implements Fetcher, ScheduledSupport { callback(e); } }, 0, rateIntervalMs, TimeUnit.MILLISECONDS); - Runtime.getRuntime().addShutdownHook(new Thread(() -> { - try { - close(); - } catch (Exception e) { - throw new RuntimeException("Fetcher close error.", e); - } - })); + Runtime.getRuntime().addShutdownHook(new Thread(this::close)); } /** * Closes the FixedRateFetcher, canceling the scheduled future. * - * @throws Exception if an error occurs while closing. + * @throws IllegalStateException if an error occurs while closing. */ @Override - public void close() throws Exception { + public void close() { if (scheduledFuture == null) { throw new IllegalStateException("Schedule task be not started."); } diff --git a/platform-base/src/main/java/com/zdjizhi/base/utils/DistributedLock.java b/platform-base/src/main/java/com/zdjizhi/base/utils/DistributedLock.java deleted file mode 100644 index 6b55661..0000000 --- a/platform-base/src/main/java/com/zdjizhi/base/utils/DistributedLock.java +++ /dev/null @@ -1,198 +0,0 @@ -package com.zdjizhi.base.utils; - -import org.apache.zookeeper.*; -import org.apache.zookeeper.data.Stat; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.io.IOException; -import java.util.ArrayList; -import java.util.Collections; -import java.util.List; -import java.util.concurrent.CountDownLatch; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.locks.Condition; -import java.util.concurrent.locks.Lock; - - -public class DistributedLock implements Lock, Watcher { - private static final Logger logger = LoggerFactory.getLogger(DistributedLock.class); - - private ZooKeeper zk = null; - /** - * 根节点 - */ - private final String ROOT_LOCK = "/locks"; - /** - * 竞争的资源 - */ - private String lockName; - /** - * 等待的前一个锁 - */ - private String waitLock; - /** - * 当前锁 - */ - private String currentLock; - /** - * 计数器 - */ - private CountDownLatch countDownLatch; - - private int sessionTimeout = 2000; - - private List<Exception> exceptionList = new ArrayList<Exception>(); - - /** - * 配置分布式锁 - * - * @param config 连接的url - * @param lockName 竞争资源 - */ - public DistributedLock(String config, String lockName) { - this.lockName = lockName; - try { - // 连接zookeeper - zk = new ZooKeeper(config, sessionTimeout, this); - Stat stat = zk.exists(ROOT_LOCK, false); - if (stat == null) { - // 如果根节点不存在,则创建根节点 - zk.create(ROOT_LOCK, new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); - } - } catch (IOException | InterruptedException | KeeperException e) { - logger.error("Node already exists!"); - } - } - - /** - * 节点监视器 - */ - @Override - public void process(WatchedEvent event) { - if (this.countDownLatch != null) { - this.countDownLatch.countDown(); - } - } - - @Override - public void lock() { - if (exceptionList.size() > 0) { - throw new LockException(exceptionList.get(0)); - } - try { - if (this.tryLock()) { - logger.info(Thread.currentThread().getName() + " " + lockName + "获得了锁"); - } else { - // 等待锁 - waitForLock(waitLock, sessionTimeout); - } - } catch (InterruptedException | KeeperException e) { - logger.error("获取锁异常" + e); - } - } - - @Override - public boolean tryLock() { - try { - String splitStr = "_lock_"; - if (lockName.contains(splitStr)) { - throw new LockException("锁名有误"); - } - // 创建临时有序节点 - currentLock = zk.create(ROOT_LOCK + "/" + lockName + splitStr, new byte[0], - ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL); - // 取所有子节点 - List<String> subNodes = zk.getChildren(ROOT_LOCK, false); - // 取出所有lockName的锁 - List<String> lockObjects = new ArrayList<String>(); - for (String node : subNodes) { - String tmpNode = node.split(splitStr)[0]; - if (tmpNode.equals(lockName)) { - lockObjects.add(node); - } - } - Collections.sort(lockObjects); - // 若当前节点为最小节点,则获取锁成功 - if (currentLock.equals(ROOT_LOCK + "/" + lockObjects.get(0))) { - return true; - } - // 若不是最小节点,则找到自己的前一个节点 - String prevNode = currentLock.substring(currentLock.lastIndexOf("/") + 1); - waitLock = lockObjects.get(Collections.binarySearch(lockObjects, prevNode) - 1); - } catch (InterruptedException | KeeperException e) { - logger.error("获取锁过程异常" + e); - } - return false; - } - - - @Override - public boolean tryLock(long timeout, TimeUnit unit) { - try { - if (this.tryLock()) { - return true; - } - return waitForLock(waitLock, timeout); - } catch (KeeperException | InterruptedException | RuntimeException e) { - logger.error("判断是否锁定异常" + e); - } - return false; - } - - /** - * 等待锁 - * - * @param prev 锁名称 - * @param waitTime 等待时间 - * @return - * @throws KeeperException - * @throws InterruptedException - */ - private boolean waitForLock(String prev, long waitTime) throws KeeperException, InterruptedException { - Stat stat = zk.exists(ROOT_LOCK + "/" + prev, true); - - if (stat != null) { - this.countDownLatch = new CountDownLatch(1); - // 计数等待,若等到前一个节点消失,则precess中进行countDown,停止等待,获取锁 - this.countDownLatch.await(waitTime, TimeUnit.MILLISECONDS); - this.countDownLatch = null; - } - return true; - } - - @Override - public void unlock() { - try { - zk.delete(currentLock, -1); - currentLock = null; - zk.close(); - } catch (InterruptedException | KeeperException e) { - logger.error("关闭锁异常" + e); - } - } - - @Override - public Condition newCondition() { - return null; - } - - @Override - public void lockInterruptibly() throws InterruptedException { - this.lock(); - } - - - public class LockException extends RuntimeException { - private static final long serialVersionUID = 1L; - - public LockException(String e) { - super(e); - } - - public LockException(Exception e) { - super(e); - } - } - -} diff --git a/platform-base/src/main/java/com/zdjizhi/base/utils/HttpClientUtils.java b/platform-base/src/main/java/com/zdjizhi/base/utils/HttpClientUtils.java index 6889a4e..2c8a15f 100644 --- a/platform-base/src/main/java/com/zdjizhi/base/utils/HttpClientUtils.java +++ b/platform-base/src/main/java/com/zdjizhi/base/utils/HttpClientUtils.java @@ -5,103 +5,43 @@ import com.zdjizhi.base.common.CommonConfig; import com.zdjizhi.base.config.Configs; import org.apache.commons.io.IOUtils; import org.apache.commons.lang3.StringUtils; -import org.apache.http.*; +import org.apache.http.Header; +import org.apache.http.HttpEntity; +import org.apache.http.HttpStatus; +import org.apache.http.ParseException; import org.apache.http.client.ClientProtocolException; -import org.apache.http.client.HttpRequestRetryHandler; import org.apache.http.client.config.RequestConfig; import org.apache.http.client.methods.CloseableHttpResponse; import org.apache.http.client.methods.HttpGet; import org.apache.http.client.methods.HttpPost; -import org.apache.http.client.protocol.HttpClientContext; import org.apache.http.client.utils.URIBuilder; -import org.apache.http.config.Registry; -import org.apache.http.config.RegistryBuilder; -import org.apache.http.conn.ConnectTimeoutException; -import org.apache.http.conn.ConnectionKeepAliveStrategy; -import org.apache.http.conn.HttpHostConnectException; -import org.apache.http.conn.socket.ConnectionSocketFactory; -import org.apache.http.conn.socket.PlainConnectionSocketFactory; -import org.apache.http.conn.ssl.NoopHostnameVerifier; -import org.apache.http.conn.ssl.SSLConnectionSocketFactory; import org.apache.http.entity.ByteArrayEntity; import org.apache.http.impl.client.CloseableHttpClient; import org.apache.http.impl.client.HttpClients; import org.apache.http.impl.conn.PoolingHttpClientConnectionManager; -import org.apache.http.message.BasicHeaderElementIterator; -import org.apache.http.protocol.HTTP; import org.apache.http.util.EntityUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import javax.net.ssl.*; import java.io.IOException; -import java.io.InterruptedIOException; import java.net.URI; -import java.net.UnknownHostException; import java.nio.charset.StandardCharsets; -import java.security.KeyManagementException; -import java.security.NoSuchAlgorithmException; -import java.security.cert.X509Certificate; import java.util.Map; public class HttpClientUtils { - /** 全局连接池对象 */ - private static final PoolingHttpClientConnectionManager CONN_MANAGER = new PoolingHttpClientConnectionManager(); private static Logger logger = LoggerFactory.getLogger(HttpClientUtils.class); public static final String ERROR_MESSAGE = "-1"; - /* - * 静态代码块配置连接池信息 - */ - static { - + private PoolingHttpClientConnectionManager getSslClientManager() { + // 创建ConnectionManager,添加Connection配置信息 + PoolingHttpClientConnectionManager connManager = new PoolingHttpClientConnectionManager(); // 设置最大连接数 - CONN_MANAGER.setMaxTotal(Configs.get(CommonConfig.HTTP_POOL_MAX_CONNECTION)); + connManager.setMaxTotal(Configs.get(CommonConfig.HTTP_POOL_MAX_CONNECTION)); // 设置每个连接的路由数 - CONN_MANAGER.setDefaultMaxPerRoute(Configs.get(CommonConfig.HTTP_POOL_MAX_PER_ROUTE)); - - } - - - /** - * 在调用SSL之前需要重写验证方法,取消检测SSL - * 创建ConnectionManager,添加Connection配置信息 - * - * @return HttpClient 支持https - */ - private PoolingHttpClientConnectionManager getSslClientManager() { - try { - // 在调用SSL之前需要重写验证方法,取消检测SSL - X509TrustManager trustManager = new X509TrustManager() { - @Override - public X509Certificate[] getAcceptedIssuers() { - return null; - } - @Override - public void checkClientTrusted(X509Certificate[] xcs, String str) { - } - @Override - public void checkServerTrusted(X509Certificate[] xcs, String str) { - } - }; - SSLContext ctx = SSLContext.getInstance(SSLConnectionSocketFactory.TLS); - ctx.init(null, new TrustManager[]{trustManager}, null); - SSLConnectionSocketFactory socketFactory = new SSLConnectionSocketFactory(ctx, NoopHostnameVerifier.INSTANCE); - Registry<ConnectionSocketFactory> socketFactoryRegistry = RegistryBuilder.<ConnectionSocketFactory>create() - .register("http", PlainConnectionSocketFactory.INSTANCE) - .register("https", socketFactory).build(); - // 创建ConnectionManager,添加Connection配置信息 - PoolingHttpClientConnectionManager connManager = new PoolingHttpClientConnectionManager(socketFactoryRegistry); - // 设置最大连接数 - connManager.setMaxTotal(Configs.get(CommonConfig.HTTP_POOL_MAX_CONNECTION)); - // 设置每个连接的路由数 - connManager.setDefaultMaxPerRoute(Configs.get(CommonConfig.HTTP_POOL_MAX_PER_ROUTE)); - return connManager; - } catch (KeyManagementException | NoSuchAlgorithmException e) { - throw new RuntimeException(e.getMessage()); - } + connManager.setDefaultMaxPerRoute(Configs.get(CommonConfig.HTTP_POOL_MAX_PER_ROUTE)); + return connManager; } /** @@ -119,70 +59,15 @@ public class HttpClientUtils { .setSocketTimeout(Configs.get(CommonConfig.HTTP_POOL_RESPONSE_TIMEOUT)) .build(); - /* - * 测出超时重试机制为了防止超时不生效而设置 - * 如果直接放回false,不重试 - * 这里会根据情况进行判断是否重试 - */ - HttpRequestRetryHandler retry = (exception, executionCount, context) -> { - if (executionCount >= 3) {// 如果已经重试了3次,就放弃 - return false; - } - if (exception instanceof NoHttpResponseException) {// 如果服务器丢掉了连接,那么就重试 - return true; - } - if (exception instanceof SSLHandshakeException) {// 不要重试SSL握手异常 - return false; - } - if (exception instanceof UnknownHostException) {// 目标服务器不可达 - return false; - } - if (exception instanceof ConnectTimeoutException) {// 连接被拒绝 - return false; - } - if (exception instanceof HttpHostConnectException) {// 连接被拒绝 - return false; - } - if (exception instanceof SSLException) {// ssl握手异常 - return false; - } - if (exception instanceof InterruptedIOException) {// 超时 - return true; - } - HttpClientContext clientContext = HttpClientContext.adapt(context); - HttpRequest request = clientContext.getRequest(); - // 如果请求是幂等的,就再次尝试 - return !(request instanceof HttpEntityEnclosingRequest); - }; - - - ConnectionKeepAliveStrategy myStrategy = (response, context) -> { - HeaderElementIterator it = new BasicHeaderElementIterator - (response.headerIterator(HTTP.CONN_KEEP_ALIVE)); - while (it.hasNext()) { - HeaderElement he = it.nextElement(); - String param = he.getName(); - String value = he.getValue(); - if (value != null && "timeout".equalsIgnoreCase(param)) { - return Long.parseLong(value) * 1000; - } - } - return 60 * 1000;//如果没有约定,则默认定义时长为60s - }; - // 创建httpClient return HttpClients.custom() // 把请求相关的超时信息设置到连接客户端 .setDefaultRequestConfig(requestConfig) - // 把请求重试设置到连接客户端 - .setRetryHandler(retry) - .setKeepAliveStrategy(myStrategy) // 配置连接池管理对象 .setConnectionManager(getSslClientManager()) .build(); } - /** * GET请求 * @@ -309,7 +194,7 @@ public class HttpClientUtils { } } } catch (Exception e) { - logger.error("拼接url出错,uri : {}, path : {},参数: {}",uriBuilder.toString(),path,params); + logger.error("拼接url出错,uri : {}, path : {},参数: {}", uriBuilder, path, params); } } diff --git a/platform-base/src/main/java/com/zdjizhi/base/utils/ZookeeperUtils.java b/platform-base/src/main/java/com/zdjizhi/base/utils/ZookeeperUtils.java deleted file mode 100644 index 6075521..0000000 --- a/platform-base/src/main/java/com/zdjizhi/base/utils/ZookeeperUtils.java +++ /dev/null @@ -1,135 +0,0 @@ -package com.zdjizhi.base.utils; - -import cn.hutool.core.util.StrUtil; -import org.apache.zookeeper.*; -import org.apache.zookeeper.data.ACL; -import org.apache.zookeeper.data.Stat; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.io.IOException; -import java.util.List; -import java.util.concurrent.CountDownLatch; - - -public class ZookeeperUtils implements Watcher { - private static final Logger logger = LoggerFactory.getLogger(ZookeeperUtils.class); - - private ZooKeeper zookeeper; - - private static final int SESSION_TIME_OUT = 20000; - - private CountDownLatch countDownLatch = new CountDownLatch(1); - - @Override - public void process(WatchedEvent event) { - if (event.getState() == Event.KeeperState.SyncConnected) { - countDownLatch.countDown(); - } - } - - - /** - * 修改节点信息 - * - * @param path 节点路径 - */ - int modifyNode(String path, String zookeeperIp) { - createNode(path, "0".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, zookeeperIp); - int workerId = 0; - try { - connectZookeeper(zookeeperIp); - Stat stat = zookeeper.exists(path, true); - workerId = Integer.parseInt(getNodeDate(path)); - if (workerId > 63) { - workerId = 0; - zookeeper.setData(path, "1".getBytes(), stat.getVersion()); - } else { - String result = String.valueOf(workerId + 1); - if (stat != null) { - zookeeper.setData(path, result.getBytes(), stat.getVersion()); - } else { - logger.error("Node does not exist!,Can't modify"); - } - } - } catch (KeeperException | InterruptedException e) { - logger.error("modify error Can't modify," + e); - } finally { - closeConn(); - } - logger.warn("workerID is:" + workerId); - return workerId; - } - - /** - * 连接zookeeper - * - * @param host 地址 - */ - private void connectZookeeper(String host) { - try { - zookeeper = new ZooKeeper(host, SESSION_TIME_OUT, this); - countDownLatch.await(); - } catch (IOException | InterruptedException e) { - logger.error("Connection to the Zookeeper Exception! message:" + e); - } - } - - /** - * 关闭连接 - */ - private void closeConn() { - try { - if (zookeeper != null) { - zookeeper.close(); - } - } catch (InterruptedException e) { - logger.error("Close the Zookeeper connection Exception! message:" + e); - } - } - - /** - * 获取节点内容 - * - * @param path 节点路径 - * @return 内容/异常null - */ - private String getNodeDate(String path) { - String result = null; - Stat stat = new Stat(); - try { - byte[] resByte = zookeeper.getData(path, true, stat); - - result = StrUtil.str(resByte, "UTF-8"); - } catch (KeeperException | InterruptedException e) { - logger.error("Get node information exception" + e); - } - return result; - } - - /** - * @param path 节点创建的路径 - * @param date 节点所存储的数据的byte[] - * @param acls 控制权限策略 - */ - private void createNode(String path, byte[] date, List<ACL> acls, String zookeeperIp) { - try { - connectZookeeper(zookeeperIp); - Stat exists = zookeeper.exists(path, true); - if (exists == null) { - Stat existsSnowflakeld = zookeeper.exists("/Snowflake", true); - if (existsSnowflakeld == null) { - zookeeper.create("/Snowflake", null, acls, CreateMode.PERSISTENT); - } - zookeeper.create(path, date, acls, CreateMode.PERSISTENT); - } else { - logger.warn("Node already exists ! Don't need to create"); - } - } catch (KeeperException | InterruptedException e) { - logger.error(e.toString()); - } finally { - closeConn(); - } - } - -} diff --git a/platform-etl/src/main/java/com/zdjizhi/etl/utils/csv/HighCsvReader.java b/platform-etl/src/main/java/com/zdjizhi/etl/utils/csv/HighCsvReader.java index 7a95f45..69b7a29 100644 --- a/platform-etl/src/main/java/com/zdjizhi/etl/utils/csv/HighCsvReader.java +++ b/platform-etl/src/main/java/com/zdjizhi/etl/utils/csv/HighCsvReader.java @@ -2,12 +2,9 @@ package com.zdjizhi.etl.utils.csv; import com.opencsv.CSVParser; import com.opencsv.CSVReader; -import com.zdjizhi.base.utils.FileUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.io.ByteArrayInputStream; -import java.io.InputStreamReader; import java.io.Reader; import java.util.*; @@ -15,8 +12,6 @@ public class HighCsvReader { private final Logger logger = LoggerFactory.getLogger(HighCsvReader.class); - private CSVReader csvReader; - private String[] header = null; private final Map<String, Integer> columnNames = new HashMap<>(); @@ -32,9 +27,8 @@ public class HighCsvReader { } private void init(Reader reader, List<String> columns) { - try { - this.csvReader = new CSVReader(reader, CSVParser.DEFAULT_SEPARATOR, - CSVParser.DEFAULT_QUOTE_CHARACTER, CSVParser.DEFAULT_ESCAPE_CHARACTER); + try (CSVReader csvReader = new CSVReader(reader, CSVParser.DEFAULT_SEPARATOR, + CSVParser.DEFAULT_QUOTE_CHARACTER, CSVParser.DEFAULT_ESCAPE_CHARACTER)) { String[] allHeader = csvReader.readNext(); if (allHeader == null) { return; @@ -43,7 +37,7 @@ public class HighCsvReader { String columnName = allHeader[i]; columnNames.put(columnName, i); } - if (columns.size() == 0) { + if (columns.isEmpty()) { columns.addAll(columnNames.keySet()); } else { columns.removeIf(next -> !columnNames.containsKey(next)); @@ -60,14 +54,6 @@ public class HighCsvReader { } } catch (Exception e) { logger.error("csv parse error, error exception: " + e.getMessage(), e); - } finally { - if (csvReader != null) { - try { - csvReader.close(); - } catch (Exception ignored) { - - } - } } } @@ -83,16 +69,6 @@ public class HighCsvReader { return columnNames.get(fieldName); } - private InputStreamReader getReader(byte[] bytes) { - return new InputStreamReader(new ByteArrayInputStream(bytes)); - } - - private InputStreamReader getReader(String fileRelativePath) { - FileUtils fileUtils = new FileUtils("SINGLE"); - byte[] bytes = fileUtils.getFileBytes(fileRelativePath); - return getReader(bytes); - } - public class CsvIterator implements Iterator<Map<String, String>>{ private String[] header; @@ -116,7 +92,10 @@ public class HighCsvReader { @Override public Map<String, String> next() { - HashMap<String, String> lineMap = new HashMap<String, String>(header.length); + if (!hasNext()) { + throw new NoSuchElementException(); + } + HashMap<String, String> lineMap = new HashMap<>(header.length); String[] line = data.get(currentLineNumber); for (int i = 0; i < line.length; i++) { lineMap.put(header[i], line[i]); |
