| author | qidaijie <[email protected]> | 2021-06-11 11:10:16 +0800 |
|---|---|---|
| committer | qidaijie <[email protected]> | 2021-06-11 11:10:16 +0800 |
| commit | d7f3e40340299b7f8f528b0e93e43cb13144b021 (patch) | |
| tree | 0f905607662a43fb09d224a035f21d56666f3d22 | |
| parent | 88ffda19a25fa2b39e70bc9f1b5910c22b1724ef (diff) | |
1: Add a default configuration file.
2: Fix a bug where a parsing exception could terminate the program.
19 files changed, 74 insertions, 86 deletions
```diff
diff --git a/pom.xml b/pom.xml
@@ -4,7 +4,7 @@
 	<groupId>com.zdjizhi</groupId>
 	<artifactId>log-stream-completion-schema</artifactId>
-	<version>v3.21.04.23-appId</version>
+	<version>v3.21.06.07-Array</version>
 	<packaging>jar</packaging>
 	<name>log-stream-completion-schema</name>
@@ -34,7 +34,7 @@
 	<executions>
 		<execution>
-			<phase>package</phase>
+			<phase>install</phase>
 			<goals>
 				<goal>shade</goal>
 			</goals>
@@ -128,7 +128,7 @@
 	<groupId>org.apache.storm</groupId>
 	<artifactId>storm-core</artifactId>
 	<version>${storm.version}</version>
-	<!--<scope>provided</scope>-->
+	<scope>provided</scope>
 	<exclusions>
 		<exclusion>
 			<artifactId>slf4j-log4j12</artifactId>
diff --git a/properties/kafka_config.properties b/properties/default_config.properties
index 9cfa949..a8c1ea0 100644
--- a/properties/kafka_config.properties
+++ b/properties/default_config.properties
@@ -14,4 +14,13 @@ batch.size=262144
 buffer.memory=67108864
 
 # Maximum size of each request sent to Kafka; default 1048576
-max.request.size=5242880
\ No newline at end of file
+max.request.size=5242880
+
+# Size of the worker's outbound message transfer buffer
+transfer_buffer_size=32
+
+# Size of the executor thread's receive queue; must be a power of 2
+executor_receive_buffer_size=16384
+
+# Size of the executor thread's send queue; must be a power of 2
+executor_send_buffer_size=16384
```
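Note: the executor queues these new properties feed are Storm Disruptor ring buffers, so the two `executor_*_buffer_size` values must be powers of 2 (16384 qualifies). A minimal sketch of a startup guard one could add when loading this file — the `BufferSizeCheck` class is illustrative, not part of this commit:

```java
import java.io.InputStream;
import java.util.Properties;

public class BufferSizeCheck {
    // True when n is a positive power of two, as Storm's ring buffers require.
    static boolean isPowerOfTwo(int n) {
        return n > 0 && (n & (n - 1)) == 0;
    }

    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        try (InputStream in = BufferSizeCheck.class.getClassLoader()
                .getResourceAsStream("default_config.properties")) {
            props.load(in); // throws NullPointerException if the resource is missing
        }
        for (String key : new String[]{"executor_receive_buffer_size", "executor_send_buffer_size"}) {
            int size = Integer.parseInt(props.getProperty(key).trim());
            if (!isPowerOfTwo(size)) {
                throw new IllegalStateException(key + "=" + size + " is not a power of 2");
            }
        }
    }
}
```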
```diff
diff --git a/properties/service_flow_config.properties b/properties/service_flow_config.properties
index edcb30e..33c1667 100644
--- a/properties/service_flow_config.properties
+++ b/properties/service_flow_config.properties
@@ -18,7 +18,7 @@ ip.library=D:\\K18-Phase2\\tsgSpace\\dat\\
 #ip.library=/home/bigdata/topology/dat/
 
 #网关的schema位置
-schema.http=http://192.168.44.12:9999/metadata/schema/v1/fields/security_event_log
+schema.http=http://192.168.44.12:9999/metadata/schema/v1/fields/connection_record_log
 
 #网关APP_ID 获取接口
 app.id.http=http://192.168.44.67:9999/open-api/appDicList
@@ -26,10 +26,10 @@ app.id.http=http://192.168.44.67:9999/open-api/appDicList
 #--------------------------------Kafka消费组信息------------------------------#
 
 #kafka 接收数据topic
-kafka.topic=test
+kafka.topic=PROXY-EVENT-LOG
 
 #补全数据 输出 topic
-results.output.topic=test-result
+results.output.topic=PROXY-EVENT-COMPLETED-LOG
 
 #读取topic,存储该spout id的消费offset信息,可通过该拓扑命名;具体存储offset的位置,确定下次读取不重复的数据;
 group.id=connection-record-log-20200818-1-test
@@ -66,10 +66,10 @@ kafka.bolt.parallelism=6
 
 #数据中心(UID)
 data.center.id.num=15
 
-#hbase 更新时间
+#hbase 更新时间,如填写0则不更新缓存
 hbase.tick.tuple.freq.secs=60
 
-#app_id 更新时间
+#app_id 更新时间,如填写0则不更新缓存
 app.tick.tuple.freq.secs=60
 #--------------------------------默认值配置------------------------------#
@@ -101,3 +101,4 @@ mail.default.charset=UTF-8
 
 #需不要补全,不需要则原样日志输出
 log.need.complete=yes
+
diff --git a/src/main/java/com/zdjizhi/bolt/CompletionBolt.java b/src/main/java/com/zdjizhi/bolt/CompletionBolt.java
index b20858f..d1ca4fa 100644
--- a/src/main/java/com/zdjizhi/bolt/CompletionBolt.java
+++ b/src/main/java/com/zdjizhi/bolt/CompletionBolt.java
@@ -1,10 +1,8 @@
 package com.zdjizhi.bolt;
 
-import com.zdjizhi.common.FlowWriteConfig;
 import cn.hutool.log.Log;
 import cn.hutool.log.LogFactory;
 import com.zdjizhi.utils.StringUtil;
-import com.zdjizhi.utils.exception.StreamCompletionException;
 import org.apache.storm.task.TopologyContext;
 import org.apache.storm.topology.BasicOutputCollector;
 import org.apache.storm.topology.OutputFieldsDeclarer;
@@ -39,8 +37,8 @@ public class CompletionBolt extends BaseBasicBolt {
             if (StringUtil.isNotBlank(message)) {
                 basicOutputCollector.emit(new Values(dealCommonMessage(message)));
             }
-        } catch (StreamCompletionException e) {
-            logger.error(FlowWriteConfig.KAFKA_TOPIC + "接收/解析过程出现异常");
+        } catch (RuntimeException e) {
+            logger.error("处理原始日志下发过程异常,异常信息:" + e);
         }
     }
diff --git a/src/main/java/com/zdjizhi/bolt/kafka/LogSendBolt.java b/src/main/java/com/zdjizhi/bolt/LogSendBolt.java
index a61edcf..46c6353 100644
--- a/src/main/java/com/zdjizhi/bolt/kafka/LogSendBolt.java
+++ b/src/main/java/com/zdjizhi/bolt/LogSendBolt.java
@@ -1,7 +1,6 @@
-package com.zdjizhi.bolt.kafka;
+package com.zdjizhi.bolt;
 
 import com.zdjizhi.common.FlowWriteConfig;
-import com.zdjizhi.utils.exception.StreamCompletionException;
 import com.zdjizhi.utils.kafka.KafkaLogSend;
 import com.zdjizhi.utils.system.TupleUtils;
 import cn.hutool.log.Log;
@@ -53,8 +52,8 @@ public class LogSendBolt extends BaseBasicBolt {
                     list.clear();
                 }
             }
-        } catch (StreamCompletionException e) {
-            logger.error(FlowWriteConfig.KAFKA_TOPIC + "日志发送Kafka过程出现异常");
+        } catch (RuntimeException e) {
+            logger.error("补全日志发送Kafka过程出现异常,异常信息:" + e);
         }
     }
```
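The switch from the now-deleted `StreamCompletionException` to `RuntimeException` in both bolts is what fixes the "parsing exception terminates the program" bug: fastjson and the completion helpers throw plain runtime exceptions that the old catch clause never matched, so they propagated and killed the executor. A minimal sketch of the resulting pattern — the class name and the `message.trim()` stand-in for `dealCommonMessage` are illustrative:

```java
import cn.hutool.log.Log;
import cn.hutool.log.LogFactory;
import org.apache.storm.topology.BasicOutputCollector;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseBasicBolt;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Tuple;
import org.apache.storm.tuple.Values;

// Post-commit error-handling pattern: a bad record is logged and dropped
// instead of crashing the executor thread.
public class SafeCompletionBolt extends BaseBasicBolt {
    private static final Log logger = LogFactory.get();

    @Override
    public void execute(Tuple tuple, BasicOutputCollector collector) {
        try {
            String message = tuple.getString(0);
            if (message != null && !message.isEmpty()) {
                collector.emit(new Values(message.trim())); // stand-in for dealCommonMessage
            }
        } catch (RuntimeException e) {
            // Catching RuntimeException (not a custom subclass that nothing
            // actually throws) is what keeps the worker alive.
            logger.error("Failed to process record: " + e);
        }
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("message"));
    }
}
```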
```diff
diff --git a/src/main/java/com/zdjizhi/common/KafkaProConfig.java b/src/main/java/com/zdjizhi/common/DefaultProConfig.java
index 6e7d616..3fdb013 100644
--- a/src/main/java/com/zdjizhi/common/KafkaProConfig.java
+++ b/src/main/java/com/zdjizhi/common/DefaultProConfig.java
@@ -6,7 +6,7 @@ import com.zdjizhi.utils.system.FlowWriteConfigurations;
 /**
  * @author Administrator
  */
-public class KafkaProConfig {
+public class DefaultProConfig {
 
     public static final String RETRIES = FlowWriteConfigurations.getStringProperty(1, "retries");
 
@@ -15,6 +15,8 @@ public class KafkaProConfig {
     public static final Integer BATCH_SIZE = FlowWriteConfigurations.getIntProperty(1, "batch.size");
     public static final Integer BUFFER_MEMORY = FlowWriteConfigurations.getIntProperty(1, "buffer.memory");
     public static final Integer MAX_REQUEST_SIZE = FlowWriteConfigurations.getIntProperty(1, "max.request.size");
-
+    public static final Integer TRANSFER_BUFFER_SIZE = FlowWriteConfigurations.getIntProperty(1, "transfer_buffer_size");
+    public static final Integer EXECUTOR_RECEIVE_BUFFER_SIZE = FlowWriteConfigurations.getIntProperty(1, "executor_receive_buffer_size");
+    public static final Integer EXECUTOR_SEND_BUFFER_SIZE = FlowWriteConfigurations.getIntProperty(1, "executor_send_buffer_size");
 }
\ No newline at end of file
```
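The integer `1` in `getIntProperty(1, ...)` appears to select the `default_config.properties` source (`propKafka`) rather than `service_flow_config.properties`. The accessor itself is not shown in this diff; the sketch below is a guess at its shape based on the static initializer visible further down, not the project's actual code:

```java
import java.io.IOException;
import java.util.Properties;

// Hypothetical reconstruction of a two-source accessor in the style of
// FlowWriteConfigurations: source 0 = service_flow_config.properties,
// source 1 = default_config.properties.
public final class ConfigSketch {
    private static Properties propService = new Properties();
    private static Properties propDefault = new Properties();

    static {
        try {
            propService.load(ConfigSketch.class.getClassLoader()
                    .getResourceAsStream("service_flow_config.properties"));
            propDefault.load(ConfigSketch.class.getClassLoader()
                    .getResourceAsStream("default_config.properties"));
        } catch (IOException | RuntimeException e) {
            propService = null;
            propDefault = null;
        }
    }

    public static Integer getIntProperty(int source, String key) {
        Properties props = (source == 1) ? propDefault : propService;
        return Integer.valueOf(props.getProperty(key).trim());
    }
}
```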
```diff
diff --git a/src/main/java/com/zdjizhi/common/FlowWriteConfig.java b/src/main/java/com/zdjizhi/common/FlowWriteConfig.java
index 36f2ea4..fc2e116 100644
--- a/src/main/java/com/zdjizhi/common/FlowWriteConfig.java
+++ b/src/main/java/com/zdjizhi/common/FlowWriteConfig.java
@@ -14,6 +14,7 @@ public class FlowWriteConfig {
     public static final String IS_JSON_KEY_TAG = "$.";
     public static final String IF_CONDITION_SPLITTER = "=";
     public static final String MODEL = "remote";
+    public static final String PROTOCOL_SPLITTER = "\\.";
     /**
      * System
      */
diff --git a/src/main/java/com/zdjizhi/spout/CustomizedKafkaSpout.java b/src/main/java/com/zdjizhi/spout/CustomizedKafkaSpout.java
index 150e02c..b2aad25 100644
--- a/src/main/java/com/zdjizhi/spout/CustomizedKafkaSpout.java
+++ b/src/main/java/com/zdjizhi/spout/CustomizedKafkaSpout.java
@@ -4,8 +4,6 @@
 import cn.hutool.core.thread.ThreadUtil;
 import com.zdjizhi.common.FlowWriteConfig;
 import cn.hutool.log.Log;
 import cn.hutool.log.LogFactory;
-import com.zdjizhi.common.KafkaProConfig;
-import com.zdjizhi.utils.exception.StreamCompletionException;
 import org.apache.kafka.clients.consumer.ConsumerConfig;
 import org.apache.kafka.clients.consumer.ConsumerRecord;
 import org.apache.kafka.clients.consumer.ConsumerRecords;
@@ -78,8 +76,8 @@ public class CustomizedKafkaSpout extends BaseRichSpout {
             for (ConsumerRecord<String, String> record : records) {
                 this.collector.emit(new Values(record.value()));
             }
-        } catch (StreamCompletionException e) {
-            logger.error("KafkaSpout发送消息出现异常!", e);
+        } catch (RuntimeException e) {
+            logger.error("KafkaSpout发送消息出现异常,异常信息:", e);
         }
     }
diff --git a/src/main/java/com/zdjizhi/topology/LogFlowWriteTopology.java b/src/main/java/com/zdjizhi/topology/LogFlowWriteTopology.java
index ccde5df..036f922 100644
--- a/src/main/java/com/zdjizhi/topology/LogFlowWriteTopology.java
+++ b/src/main/java/com/zdjizhi/topology/LogFlowWriteTopology.java
@@ -2,12 +2,12 @@ package com.zdjizhi.topology;
 
 import com.zdjizhi.bolt.CompletionBolt;
-import com.zdjizhi.bolt.kafka.LogSendBolt;
+import com.zdjizhi.bolt.LogSendBolt;
+import com.zdjizhi.common.DefaultProConfig;
 import com.zdjizhi.common.FlowWriteConfig;
 import com.zdjizhi.spout.CustomizedKafkaSpout;
 import cn.hutool.log.Log;
 import cn.hutool.log.LogFactory;
-import com.zdjizhi.utils.exception.StreamCompletionException;
 import org.apache.storm.Config;
 import org.apache.storm.generated.AlreadyAliveException;
 import org.apache.storm.generated.AuthorizationException;
@@ -51,8 +51,9 @@ public class LogFlowWriteTopology {
     private void runRemotely() throws AlreadyAliveException, InvalidTopologyException, AuthorizationException {
         topologyConfig.setNumWorkers(FlowWriteConfig.TOPOLOGY_WORKERS);
-        //设置过高会导致很多问题,如心跳线程饿死、吞吐量大幅下跌
-        topologyConfig.put(Config.TOPOLOGY_EXECUTOR_RECEIVE_BUFFER_SIZE, 8);
+        topologyConfig.put(Config.TOPOLOGY_TRANSFER_BUFFER_SIZE, DefaultProConfig.TRANSFER_BUFFER_SIZE);
+        topologyConfig.put(Config.TOPOLOGY_EXECUTOR_RECEIVE_BUFFER_SIZE, DefaultProConfig.EXECUTOR_RECEIVE_BUFFER_SIZE);
+        topologyConfig.put(Config.TOPOLOGY_EXECUTOR_SEND_BUFFER_SIZE, DefaultProConfig.EXECUTOR_SEND_BUFFER_SIZE);
         StormRunner.runTopologyRemotely(builder, topologyName, topologyConfig);
     }
@@ -63,7 +64,7 @@ public class LogFlowWriteTopology {
         if (need.equals(FlowWriteConfig.LOG_NEED_COMPLETE)) {
             builder.setBolt("LogCompletionBolt", new CompletionBolt(), FlowWriteConfig.COMPLETION_BOLT_PARALLELISM).localOrShuffleGrouping("LogFlowWriteSpout");
-            builder.setBolt("CompletionLogSendBolt", new LogSendBolt(),
+            builder.setBolt("LogSendBolt", new LogSendBolt(),
                     FlowWriteConfig.KAFKA_BOLT_PARALLELISM).localOrShuffleGrouping("LogCompletionBolt");
         } else {
             builder.setBolt("LogSendBolt", new LogSendBolt(),
@@ -92,7 +93,7 @@ public class LogFlowWriteTopology {
                 logger.info("执行远程部署模式...");
                 flowWriteTopology.runRemotely();
             }
-        } catch (StreamCompletionException | InterruptedException | InvalidTopologyException | AlreadyAliveException | AuthorizationException e) {
+        } catch (RuntimeException | InterruptedException | InvalidTopologyException | AlreadyAliveException | AuthorizationException e) {
             logger.error("Topology Start ERROR! message is:" + e);
         }
     }
```
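With the hard-coded receive buffer of 8 gone, all three ring-buffer sizes are now tunable from `default_config.properties`. A minimal sketch of how the remote-mode config is assembled — values are inlined here for illustration; the real code reads them from `DefaultProConfig`:

```java
import org.apache.storm.Config;

// Sketch: the three Disruptor queue sizes this commit makes configurable,
// using the new defaults from default_config.properties.
public class TopologyConfigSketch {
    public static Config build(int numWorkers) {
        Config conf = new Config();
        conf.setNumWorkers(numWorkers);
        conf.put(Config.TOPOLOGY_TRANSFER_BUFFER_SIZE, 32);            // worker outbound queue
        conf.put(Config.TOPOLOGY_EXECUTOR_RECEIVE_BUFFER_SIZE, 16384); // per-executor inbound, power of 2
        conf.put(Config.TOPOLOGY_EXECUTOR_SEND_BUFFER_SIZE, 16384);    // per-executor outbound, power of 2
        return conf;
    }
}
```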
```diff
diff --git a/src/main/java/com/zdjizhi/utils/app/AppUtils.java b/src/main/java/com/zdjizhi/utils/app/AppUtils.java
index 08890a6..1193b13 100644
--- a/src/main/java/com/zdjizhi/utils/app/AppUtils.java
+++ b/src/main/java/com/zdjizhi/utils/app/AppUtils.java
@@ -44,7 +44,7 @@ public class AppUtils {
      */
     private AppUtils() {
         //定时更新
-        updateHabaseCache();
+        updateAppIdCache();
     }
 
     /**
@@ -92,13 +92,15 @@ public class AppUtils {
     /**
      * 验证定时器,每隔一段时间验证一次-验证获取新的Cookie
      */
-    private void updateHabaseCache() {
+    private void updateAppIdCache() {
         ScheduledExecutorService executorService = new ScheduledThreadPoolExecutor(1);
         executorService.scheduleAtFixedRate(new Runnable() {
             @Override
             public void run() {
                 try {
-                    change();
+                    if (FlowWriteConfig.APP_TICK_TUPLE_FREQ_SECS != 0) {
+                        change();
+                    }
                 } catch (RuntimeException e) {
                     logger.error("AppUtils update AppCache is error===>{" + e + "}<===");
                 }
diff --git a/src/main/java/com/zdjizhi/utils/exception/StreamCompletionException.java b/src/main/java/com/zdjizhi/utils/exception/StreamCompletionException.java
deleted file mode 100644
index 2a31b11..0000000
--- a/src/main/java/com/zdjizhi/utils/exception/StreamCompletionException.java
+++ /dev/null
@@ -1,18 +0,0 @@
-package com.zdjizhi.utils.exception;
-
-/**
- * @author qidaijie
- * @Package com.zdjizhi.utils.exception
- * @Description:
- * @date 2021/3/2510:14
- */
-public class StreamCompletionException extends RuntimeException {
-
-    public StreamCompletionException(Exception e) {
-        super(e);
-    }
-
-    public StreamCompletionException(String e) {
-        super(e);
-    }
-}
diff --git a/src/main/java/com/zdjizhi/utils/general/SnowflakeId.java b/src/main/java/com/zdjizhi/utils/general/SnowflakeId.java
index 9663dc6..f67a17b 100644
--- a/src/main/java/com/zdjizhi/utils/general/SnowflakeId.java
+++ b/src/main/java/com/zdjizhi/utils/general/SnowflakeId.java
@@ -1,7 +1,6 @@
 package com.zdjizhi.utils.general;
 
 import com.zdjizhi.common.FlowWriteConfig;
-import com.zdjizhi.utils.exception.StreamCompletionException;
 import com.zdjizhi.utils.zookeeper.DistributedLock;
 import com.zdjizhi.utils.zookeeper.ZookeeperUtils;
 import cn.hutool.log.Log;
@@ -127,7 +126,7 @@
             }
             this.workerId = tmpWorkerId;
             this.dataCenterId = dataCenterIdNum;
-        } catch (StreamCompletionException e) {
+        } catch (RuntimeException e) {
             logger.error("This is not usual error!!!===>>>" + e + "<<<===");
         } finally {
             lock.unlock();
```
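Both caches (app-id above, HBase below) now treat a refresh frequency of 0 as "never refresh": the timer still fires, but the guard skips the work. A sketch of the pattern with simplified names — `FREQ_SECS` stands in for the `*_TICK_TUPLE_FREQ_SECS` constants, and `refresh.run()` for `change()`; note the period passed to the scheduler must stay positive, which is an assumption about the surrounding code this diff does not show:

```java
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

// Sketch of the refresh-guard pattern added to AppUtils and HBaseUtils:
// a configured frequency of 0 disables the cache refresh.
public class CacheRefresher {
    private static final int FREQ_SECS = 60; // stand-in for *_TICK_TUPLE_FREQ_SECS

    public static void start(Runnable refresh) {
        ScheduledExecutorService executor = new ScheduledThreadPoolExecutor(1);
        executor.scheduleAtFixedRate(() -> {
            try {
                if (FREQ_SECS != 0) {
                    refresh.run(); // stand-in for change()
                }
            } catch (RuntimeException e) {
                System.err.println("cache refresh failed: " + e);
            }
        }, FREQ_SECS, Math.max(FREQ_SECS, 1), TimeUnit.SECONDS); // period must be > 0
    }
}
```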
```diff
diff --git a/src/main/java/com/zdjizhi/utils/general/TransFormUtils.java b/src/main/java/com/zdjizhi/utils/general/TransFormUtils.java
index a5e26fb..66eadde 100644
--- a/src/main/java/com/zdjizhi/utils/general/TransFormUtils.java
+++ b/src/main/java/com/zdjizhi/utils/general/TransFormUtils.java
@@ -2,7 +2,6 @@ package com.zdjizhi.utils.general;
 
 import com.zdjizhi.common.FlowWriteConfig;
-import com.zdjizhi.utils.exception.StreamCompletionException;
 import com.zdjizhi.utils.json.JsonParseUtil;
 import cn.hutool.log.Log;
 import cn.hutool.log.LogFactory;
@@ -38,11 +37,6 @@ public class TransFormUtils {
     private static ArrayList<String[]> jobList = JsonParseUtil.getJobListFromHttp(FlowWriteConfig.SCHEMA_HTTP);
 
     /**
-     * 补全工具类
-     */
-//    private static FormatUtils build = new FormatUtils.Builder(false).build();
-
-    /**
      * IP定位库工具类
      */
     private static IpLookup ipLookup = new IpLookup.Builder(false)
@@ -61,10 +55,8 @@ public class TransFormUtils {
      * @return 补全后的日志
      */
     public static String dealCommonMessage(String message) {
-
-        Object object = JSONObject.parseObject(message, mapObject.getClass());
-
         try {
+            Object object = JSONObject.parseObject(message, mapObject.getClass());
             for (String[] strings : jobList) {
                 //用到的参数的值
                 Object name = JsonParseUtil.getValue(object, strings[0]);
@@ -80,8 +72,8 @@
                 functionSet(function, object, appendToKeyName, appendTo, name, param);
             }
             return JSONObject.toJSONString(object);
-        } catch (StreamCompletionException e) {
-            logger.error(FlowWriteConfig.KAFKA_TOPIC + "日志预处理过程出现异常");
+        } catch (RuntimeException e) {
+            logger.error("解析补全日志信息过程异常,异常信息:" + e);
             return "";
         }
     }
@@ -105,8 +97,6 @@
                 }
                 break;
             case "snowflake_id":
-//                JsonParseUtil.setValue(object, appendToKeyName,
-//                        build.getSnowflakeId(FlowWriteConfig.ZOOKEEPER_SERVERS, FlowWriteConfig.DATA_CENTER_ID_NUM));
                 JsonParseUtil.setValue(object, appendToKeyName, SnowflakeId.generateId());
                 break;
             case "geo_ip_detail":
@@ -150,8 +140,8 @@
                 }
                 break;
             case "app_match":
-                if ((int) name != 0 && appendTo == null) {
-                    JsonParseUtil.setValue(object, appendToKeyName, TransFunction.appMatch(Integer.parseInt(name.toString())));
+                if (name != null && appendTo == null) {
+                    JsonParseUtil.setValue(object, appendToKeyName, TransFunction.appMatch(name.toString()));
                 }
                 break;
             case "decode_of_base64":
diff --git a/src/main/java/com/zdjizhi/utils/general/TransFunction.java b/src/main/java/com/zdjizhi/utils/general/TransFunction.java
index 9d8a355..bfb71a2 100644
--- a/src/main/java/com/zdjizhi/utils/general/TransFunction.java
+++ b/src/main/java/com/zdjizhi/utils/general/TransFunction.java
@@ -85,11 +85,12 @@ class TransFunction {
     /**
      * appId与缓存中对应关系补全appName
      *
-     * @param appId id
+     * @param appIds app id 列表
      * @return appName
      */
-    static String appMatch(int appId) {
-        String appName = AppUtils.getAppName(appId);
+    static String appMatch(String appIds) {
+        String appId = appIds.split(FlowWriteConfig.FORMAT_SPLITTER)[0];
+        String appName = AppUtils.getAppName(Integer.parseInt(appId));
         if (StringUtil.isBlank(appName)) {
             logger.warn("AppMap get appName is null, ID is :{}", appId);
         }
```
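In line with the "-Array" version suffix, `appMatch` now accepts the stringified app-id list rather than a single `int`, and resolves only the first entry. A sketch of the new contract, assuming `FlowWriteConfig.FORMAT_SPLITTER` is a comma (its actual value is defined elsewhere in the codebase); the lookup table here is a toy for illustration:

```java
// Sketch: the field now arrives as a delimited id list (e.g. "1045,22,7")
// and only the first id is resolved to an application name.
public class AppMatchSketch {
    static String appMatch(String appIds) {
        String first = appIds.split(",")[0]; // "," assumed for FORMAT_SPLITTER
        return lookupAppName(Integer.parseInt(first)); // stand-in for AppUtils.getAppName
    }

    private static String lookupAppName(int appId) {
        return appId == 1045 ? "http" : ""; // toy mapping for illustration only
    }

    public static void main(String[] args) {
        System.out.println(appMatch("1045,22,7")); // -> http
    }
}
```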
```diff
diff --git a/src/main/java/com/zdjizhi/utils/hbase/HBaseUtils.java b/src/main/java/com/zdjizhi/utils/hbase/HBaseUtils.java
index 042a930..89814dc 100644
--- a/src/main/java/com/zdjizhi/utils/hbase/HBaseUtils.java
+++ b/src/main/java/com/zdjizhi/utils/hbase/HBaseUtils.java
@@ -51,7 +51,7 @@ public class HBaseUtils {
         //拉取所有
         getAll();
         //定时更新
-        updateHabaseCache();
+        updateHBaseCache();
     }
 
     private static void getHbaseConn() {
@@ -164,7 +164,7 @@
     /**
      * 验证定时器,每隔一段时间验证一次-验证获取新的Cookie
      */
-    private void updateHabaseCache() {
+    private void updateHBaseCache() {
 //        ScheduledExecutorService executorService = new ScheduledThreadPoolExecutor(1,
 //                new BasicThreadFactory.Builder().namingPattern("hbase-change-pool-%d").daemon(true).build());
         ScheduledExecutorService executorService = new ScheduledThreadPoolExecutor(1);
@@ -172,7 +172,9 @@ public class HBaseUtils {
             @Override
             public void run() {
                 try {
-                    change();
+                    if (FlowWriteConfig.HBASE_TICK_TUPLE_FREQ_SECS != 0) {
+                        change();
+                    }
                 } catch (RuntimeException e) {
                     logger.error("HBaseUtils update hbaseCache is error===>{" + e + "}<===");
                 }
diff --git a/src/main/java/com/zdjizhi/utils/json/JsonParseUtil.java b/src/main/java/com/zdjizhi/utils/json/JsonParseUtil.java
index 10e07d1..e4ee207 100644
--- a/src/main/java/com/zdjizhi/utils/json/JsonParseUtil.java
+++ b/src/main/java/com/zdjizhi/utils/json/JsonParseUtil.java
@@ -43,6 +43,9 @@ public class JsonParseUtil {
             case "long":
                 clazz = long.class;
                 break;
+            case "array":
+                clazz = JSONArray.class;
+                break;
             case "Integer":
                 clazz = Integer.class;
                 break;
@@ -135,6 +138,9 @@
             if (checkKeepField(filedStr)) {
                 String name = JsonPath.read(filedStr, "$.name").toString();
                 String type = JsonPath.read(filedStr, "$.type").toString();
+                if (type.contains("{")) {
+                    type = JsonPath.read(filedStr, "$.type.type").toString();
+                }
                 //组合用来生成实体类的map
                 map.put(name, getClassName(type));
             }
diff --git a/src/main/java/com/zdjizhi/utils/kafka/KafkaLogSend.java b/src/main/java/com/zdjizhi/utils/kafka/KafkaLogSend.java
index 87509b4..d4c86fc 100644
--- a/src/main/java/com/zdjizhi/utils/kafka/KafkaLogSend.java
+++ b/src/main/java/com/zdjizhi/utils/kafka/KafkaLogSend.java
@@ -1,7 +1,7 @@
 package com.zdjizhi.utils.kafka;
 
 import com.zdjizhi.common.FlowWriteConfig;
-import com.zdjizhi.common.KafkaProConfig;
+import com.zdjizhi.common.DefaultProConfig;
 import cn.hutool.log.Log;
 import cn.hutool.log.LogFactory;
 import org.apache.kafka.clients.producer.*;
@@ -70,12 +70,12 @@ public class KafkaLogSend {
         properties.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
         properties.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
         properties.put("acks", FlowWriteConfig.PRODUCER_ACK);
-        properties.put("retries", KafkaProConfig.RETRIES);
-        properties.put("linger.ms", KafkaProConfig.LINGER_MS);
-        properties.put("request.timeout.ms", KafkaProConfig.REQUEST_TIMEOUT_MS);
-        properties.put("batch.size", KafkaProConfig.BATCH_SIZE);
-        properties.put("buffer.memory", KafkaProConfig.BUFFER_MEMORY);
-        properties.put("max.request.size", KafkaProConfig.MAX_REQUEST_SIZE);
+        properties.put("retries", DefaultProConfig.RETRIES);
+        properties.put("linger.ms", DefaultProConfig.LINGER_MS);
+        properties.put("request.timeout.ms", DefaultProConfig.REQUEST_TIMEOUT_MS);
+        properties.put("batch.size", DefaultProConfig.BATCH_SIZE);
+        properties.put("buffer.memory", DefaultProConfig.BUFFER_MEMORY);
+        properties.put("max.request.size", DefaultProConfig.MAX_REQUEST_SIZE);
 //        properties.put("compression.type", FlowWriteConfig.KAFKA_COMPRESSION_TYPE);
```
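The new `type.contains("{")` branch in `JsonParseUtil` handles schema fields whose `type` is itself an object, as in Avro-style array declarations, by reading the element type one level deeper at `$.type.type`. A self-contained sketch of the two field shapes the parser now accepts — the field names are hypothetical:

```java
import com.jayway.jsonpath.JsonPath;

// Sketch: the two schema-field shapes the parser now accepts.
public class SchemaFieldSketch {
    static String resolveType(String fieldJson) {
        String type = JsonPath.read(fieldJson, "$.type").toString();
        if (type.contains("{")) {
            // "type" is an object; the real type name sits at $.type.type.
            type = JsonPath.read(fieldJson, "$.type.type").toString();
        }
        return type;
    }

    public static void main(String[] args) {
        // Scalar field: "type" is a plain string.
        String scalar = "{\"name\":\"app_name\",\"type\":\"string\"}";
        // Array field: "type" is a nested object.
        String array = "{\"name\":\"app_id\",\"type\":{\"type\":\"array\",\"items\":\"int\"}}";

        System.out.println(resolveType(scalar)); // -> string
        System.out.println(resolveType(array));  // -> array
    }
}
```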
```diff
diff --git a/src/main/java/com/zdjizhi/utils/system/FlowWriteConfigurations.java b/src/main/java/com/zdjizhi/utils/system/FlowWriteConfigurations.java
index 837d881..08fa29b 100644
--- a/src/main/java/com/zdjizhi/utils/system/FlowWriteConfigurations.java
+++ b/src/main/java/com/zdjizhi/utils/system/FlowWriteConfigurations.java
@@ -1,7 +1,6 @@
 package com.zdjizhi.utils.system;
 
 import com.zdjizhi.utils.StringUtil;
-import com.zdjizhi.utils.exception.StreamCompletionException;
 
 import java.io.IOException;
 import java.util.Locale;
@@ -62,8 +61,8 @@ public final class FlowWriteConfigurations {
     static {
         try {
             propService.load(FlowWriteConfigurations.class.getClassLoader().getResourceAsStream("service_flow_config.properties"));
-            propKafka.load(FlowWriteConfigurations.class.getClassLoader().getResourceAsStream("kafka_config.properties"));
-        } catch (IOException | StreamCompletionException e) {
+            propKafka.load(FlowWriteConfigurations.class.getClassLoader().getResourceAsStream("default_config.properties"));
+        } catch (IOException | RuntimeException e) {
             propKafka = null;
             propService = null;
         }
diff --git a/src/main/java/com/zdjizhi/utils/zookeeper/DistributedLock.java b/src/main/java/com/zdjizhi/utils/zookeeper/DistributedLock.java
index a8a7312..2afab03 100644
--- a/src/main/java/com/zdjizhi/utils/zookeeper/DistributedLock.java
+++ b/src/main/java/com/zdjizhi/utils/zookeeper/DistributedLock.java
@@ -2,8 +2,6 @@ package com.zdjizhi.utils.zookeeper;
 
 import cn.hutool.log.Log;
 import cn.hutool.log.LogFactory;
-import com.zdjizhi.utils.exception.StreamCompletionException;
-import org.apache.log4j.Logger;
 import org.apache.zookeeper.*;
 import org.apache.zookeeper.data.Stat;
 
@@ -136,7 +134,7 @@ public class DistributedLock implements Lock, Watcher {
             return true;
         }
         return waitForLock(waitLock, timeout);
-        } catch (KeeperException | InterruptedException | StreamCompletionException e) {
+        } catch (KeeperException | InterruptedException | RuntimeException e) {
             logger.error("判断是否锁定异常" + e);
         }
         return false;
```
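After the `KafkaProConfig` → `DefaultProConfig` rename, all producer tuning flows from `default_config.properties`. A sketch of the resulting producer wiring — the broker address is a placeholder, the `acks`/`retries`/`linger.ms` values are illustrative, and the sizes are the values visible in this commit's properties diff:

```java
import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

// Sketch of the producer configuration after the rename; in the real code
// these values come from DefaultProConfig / FlowWriteConfig.
public class ProducerSketch {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092"); // placeholder broker
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("acks", "1");                  // illustrative; FlowWriteConfig.PRODUCER_ACK
        props.put("retries", "3");               // illustrative; DefaultProConfig.RETRIES
        props.put("linger.ms", 10);              // illustrative; DefaultProConfig.LINGER_MS
        props.put("batch.size", 262144);         // value from default_config.properties
        props.put("buffer.memory", 67108864);    // value from default_config.properties
        props.put("max.request.size", 5242880);  // value from default_config.properties

        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            producer.send(new ProducerRecord<>("PROXY-EVENT-COMPLETED-LOG", "{\"k\":\"v\"}"));
        }
    }
}
```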
