path: root/src/main/java
author     zhanghongqing <[email protected]>  2022-07-08 09:39:15 +0800
committer  zhanghongqing <[email protected]>  2022-07-08 09:39:15 +0800
commit     e9c92fb2866bff0cc0457dd3a0a5d87fc0bc2fb6 (patch)
tree       1c6ad982119ce3e50a6a1943c8ee3fc21acb28e3 /src/main/java
parent     f552793230d0428cbc63714ee296c1ce4971a31b (diff)
Add ArangoDB ingestion (write) operations
Diffstat (limited to 'src/main/java')
-rw-r--r--  src/main/java/com/zdjizhi/common/FlowWriteConfig.java | 49
-rw-r--r--  src/main/java/com/zdjizhi/common/IpKeysSelector.java | 23
-rw-r--r--  src/main/java/com/zdjizhi/common/TopMetricProcessV2.java | 65
-rw-r--r--  src/main/java/com/zdjizhi/etl/ConnProcessFunction.java | 1
-rw-r--r--  src/main/java/com/zdjizhi/etl/DnsFlatMapFunction.java | 2
-rw-r--r--  src/main/java/com/zdjizhi/etl/DnsProcessFunction.java | 1
-rw-r--r--  src/main/java/com/zdjizhi/etl/Ip2IpGraphProcessFunction.java | 45
-rw-r--r--  src/main/java/com/zdjizhi/etl/SketchProcessFunction.java | 2
-rw-r--r--  src/main/java/com/zdjizhi/topology/LogFlowWriteTopology.java | 36
-rw-r--r--  src/main/java/com/zdjizhi/utils/arangodb/ArangoDBConnect.java | 95
-rw-r--r--  src/main/java/com/zdjizhi/utils/arangodb/ArangoDBSink.java | 47
-rw-r--r--  src/main/java/com/zdjizhi/utils/ck/CKSink.java | 165
-rw-r--r--  src/main/java/com/zdjizhi/utils/ck/CKSinkFlatMap.java | 7
-rw-r--r--  src/main/java/com/zdjizhi/utils/ck/ClickHouseJDBCDialect.java | 41
-rw-r--r--  src/main/java/com/zdjizhi/utils/ck/ClickhouseSink.java | 20
-rw-r--r--  src/main/java/com/zdjizhi/utils/ck/ClickhouseUtil.java | 56
-rw-r--r--  src/main/java/com/zdjizhi/utils/ck/JDBCDialect.java | 39
-rw-r--r--  src/main/java/com/zdjizhi/utils/ck/JDBCDialects.java | 14
-rw-r--r--  src/main/java/com/zdjizhi/utils/kafka/KafkaConsumer.java | 9
-rw-r--r--  src/main/java/com/zdjizhi/utils/kafka/TimestampDeserializationSchema.java | 2
-rw-r--r--  src/main/java/com/zdjizhi/utils/system/FlowWriteConfigurations.java | 6
21 files changed, 660 insertions(+), 65 deletions(-)
diff --git a/src/main/java/com/zdjizhi/common/FlowWriteConfig.java b/src/main/java/com/zdjizhi/common/FlowWriteConfig.java
index a84ebae..34674f4 100644
--- a/src/main/java/com/zdjizhi/common/FlowWriteConfig.java
+++ b/src/main/java/com/zdjizhi/common/FlowWriteConfig.java
@@ -2,6 +2,7 @@ package com.zdjizhi.common;
import com.zdjizhi.utils.system.FlowWriteConfigurations;
+import org.apache.flink.configuration.ConfigUtils;
import org.jasypt.encryption.pbe.StandardPBEStringEncryptor;
/**
@@ -108,24 +109,26 @@ public class FlowWriteConfig {
/**
* common config
*/
- public static final String SOURCE_KAFKA_SERVERS = FlowWriteConfigurations.getStringProperty(0,"source.kafka.servers");
- public static final String SINK_KAFKA_SERVERS = FlowWriteConfigurations.getStringProperty(0,"sink.kafka.servers");
- public static final String ZOOKEEPER_SERVERS = FlowWriteConfigurations.getStringProperty(0,"zookeeper.servers");
- public static final String TOOLS_LIBRARY = FlowWriteConfigurations.getStringProperty(0,"tools.library");
- public static final String HBASE_ZOOKEEPER_SERVERS = FlowWriteConfigurations.getStringProperty(0,"hbase.zookeeper.servers");
+ public static final String SOURCE_KAFKA_SERVERS = FlowWriteConfigurations.getStringProperty(0, "source.kafka.servers");
+ public static final String SINK_KAFKA_SERVERS = FlowWriteConfigurations.getStringProperty(0, "sink.kafka.servers");
+ public static final String ZOOKEEPER_SERVERS = FlowWriteConfigurations.getStringProperty(0, "zookeeper.servers");
+ public static final String TOOLS_LIBRARY = FlowWriteConfigurations.getStringProperty(0, "tools.library");
+ public static final String HBASE_ZOOKEEPER_SERVERS = FlowWriteConfigurations.getStringProperty(0, "hbase.zookeeper.servers");
/*
- * ck
- * */
- public static final String CK_HOSTS = FlowWriteConfigurations.getStringProperty(0,"ck.hosts");
- public static final String CK_USERNAME = FlowWriteConfigurations.getStringProperty(0,"ck.username");
- public static final String CK_PIN = FlowWriteConfigurations.getStringProperty(0,"ck.pin");
- public static final String CK_DATABASE = FlowWriteConfigurations.getStringProperty(0,"ck.database");
-
- public static final int FLINK_WATERMARK_MAX_ORDERNESS = FlowWriteConfigurations.getIntProperty(0,"flink.watermark.max.orderness");
- public static final int LOG_AGGREGATE_DURATION = FlowWriteConfigurations.getIntProperty(0,"log.aggregate.duration");
- public static final String SOURCE_KAFKA_TOPIC_DNS = FlowWriteConfigurations.getStringProperty(0, "source.kafka.topic.dns");;
+ * ck
+ * */
+ public static final String CK_HOSTS = FlowWriteConfigurations.getStringProperty(0, "ck.hosts");
+ public static final String CK_USERNAME = FlowWriteConfigurations.getStringProperty(0, "ck.username");
+ public static final String CK_PIN = FlowWriteConfigurations.getStringProperty(0, "ck.pin");
+ public static final String CK_DATABASE = FlowWriteConfigurations.getStringProperty(0, "ck.database");
+
+ public static final int FLINK_WATERMARK_MAX_ORDERNESS = FlowWriteConfigurations.getIntProperty(0, "flink.watermark.max.orderness");
+ public static final int LOG_AGGREGATE_DURATION = FlowWriteConfigurations.getIntProperty(0, "log.aggregate.duration");
+ public static final int LOG_AGGREGATE_DURATION_GRAPH = FlowWriteConfigurations.getIntProperty(0, "log.aggregate.duration.graph");
+ public static final String SOURCE_KAFKA_TOPIC_DNS = FlowWriteConfigurations.getStringProperty(0, "source.kafka.topic.dns");
+ ;
public static final String SOURCE_KAFKA_TOPIC_CONNECTION = FlowWriteConfigurations.getStringProperty(0, "source.kafka.topic.connection");
public static final String SOURCE_KAFKA_TOPIC_SKETCH = FlowWriteConfigurations.getStringProperty(0, "source.kafka.topic.sketch");
@@ -135,4 +138,20 @@ public class FlowWriteConfig {
public static final String SINK_CK_TABLE_DNS = FlowWriteConfigurations.getStringProperty(0, "sink.ck.table.dns");
public static final String SINK_CK_TABLE_RELATION_CONNECTION = FlowWriteConfigurations.getStringProperty(0, "sink.ck.table.relation.connection");
public static final String SINK_CK_TABLE_RELATION_DNS = FlowWriteConfigurations.getStringProperty(0, "sink.ck.table.relation.dns");
+
+
+ public static final String ARANGODB_HOST = FlowWriteConfigurations.getStringProperty(0, "arangoDB.host");
+ public static final Integer ARANGODB_PORT = FlowWriteConfigurations.getIntProperty(0, "arangoDB.port");
+ public static final String ARANGODB_USER = FlowWriteConfigurations.getStringProperty(0, "arangoDB.user");
+ public static final String ARANGODB_PASSWORD = FlowWriteConfigurations.getStringProperty(0, "arangoDB.password");
+ public static final String ARANGODB_DB_NAME = FlowWriteConfigurations.getStringProperty(0, "arangoDB.DB.name");
+ public static final Integer ARANGODB_TTL = FlowWriteConfigurations.getIntProperty(0, "arangoDB.ttl");
+ public static final Integer ARANGODB_BATCH = FlowWriteConfigurations.getIntProperty(0, "arangoDB.batch");
+
+ public static final Integer UPDATE_ARANGO_BATCH = FlowWriteConfigurations.getIntProperty(0, "update.arango.batch");
+ public static final String ARANGODB_READ_LIMIT = FlowWriteConfigurations.getStringProperty(0, "arangoDB.read.limit");
+ public static final Integer THREAD_POOL_NUMBER = FlowWriteConfigurations.getIntProperty(0, "thread.pool.number");
+ public static final Integer THREAD_AWAIT_TERMINATION_TIME = FlowWriteConfigurations.getIntProperty(0, "thread.await.termination.time");
+ public static final Integer SINK_BATCH_TIME_OUT = FlowWriteConfigurations.getIntProperty(0, "sink.batch.time.out");
+ public static final Integer SINK_BATCH = FlowWriteConfigurations.getIntProperty(0, "sink.batch");
} \ No newline at end of file
diff --git a/src/main/java/com/zdjizhi/common/IpKeysSelector.java b/src/main/java/com/zdjizhi/common/IpKeysSelector.java
new file mode 100644
index 0000000..9528a1b
--- /dev/null
+++ b/src/main/java/com/zdjizhi/common/IpKeysSelector.java
@@ -0,0 +1,23 @@
+package com.zdjizhi.common;
+
+
+import org.apache.flink.api.java.functions.KeySelector;
+import org.apache.flink.api.java.tuple.Tuple2;
+
+import java.util.Map;
+
+/**
+ * @description:
+ * @author: zhq
+ * @create: 2022-07-05
+ **/
+public class IpKeysSelector implements KeySelector<Map<String, Object>, Tuple2<String, String>> {
+
+ @Override
+ public Tuple2<String, String> getKey(Map<String,Object> log) throws Exception {
+
+ return Tuple2.of(
+ String.valueOf(log.get("src_ip")),
+ String.valueOf(log.get("dst_ip")));
+ }
+}
diff --git a/src/main/java/com/zdjizhi/common/TopMetricProcessV2.java b/src/main/java/com/zdjizhi/common/TopMetricProcessV2.java
new file mode 100644
index 0000000..46d308d
--- /dev/null
+++ b/src/main/java/com/zdjizhi/common/TopMetricProcessV2.java
@@ -0,0 +1,65 @@
+package com.zdjizhi.common;
+
+import org.apache.flink.api.common.state.ListState;
+import org.apache.flink.api.common.state.ListStateDescriptor;
+import org.apache.flink.api.common.state.ValueState;
+import org.apache.flink.api.common.state.ValueStateDescriptor;
+import org.apache.flink.api.common.typeinfo.Types;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.streaming.api.functions.ProcessFunction;
+import org.apache.flink.util.Collector;
+
+import java.util.Comparator;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.TreeSet;
+
+public class TopMetricProcessV2 extends ProcessFunction<Map<String,Object>, Map<String,Object>> {
+
+
+ private ValueState<Long> currentTimer;
+ private ListState<Map<String,Object>> itemState;
+
+ @Override
+ public void open(Configuration parameters) throws Exception {
+ currentTimer = getRuntimeContext().getState(new ValueStateDescriptor<>("_timer", Types.LONG));
+ ListStateDescriptor<Map<String,Object>> itemViewStateDesc = new ListStateDescriptor("_state", Map.class);
+ itemState = getRuntimeContext().getListState(itemViewStateDesc);
+ }
+
+ @Override
+ public void processElement(Map<String,Object> value, Context context, Collector<Map<String,Object>> collector) throws Exception {
+ // If no timer is registered yet, register a new event-time timer
+ Long curTimeStamp = currentTimer.value();
+ if (curTimeStamp == null || curTimeStamp == 0) {
+ long onTimer = context.timestamp() + FlowWriteConfig.SINK_BATCH_TIME_OUT * 1000;
+ context.timerService().registerEventTimeTimer(onTimer);
+ currentTimer.update(onTimer);
+ }
+ itemState.add(value);
+ }
+
+ @Override
+ public void onTimer(long timestamp, OnTimerContext ctx, Collector<Map<String, Object>> out) throws Exception {
+ super.onTimer(timestamp, ctx, out);
+
+ Iterator<Map<String,Object>> iterator = itemState.get().iterator();
+ while (iterator.hasNext()) {
+ out.collect(iterator.next());
+ }
+// if (baseLogs.size() > FlowWriteConfig.SINK_BATCH) {
+// Map last = baseLogs.last();
+// if (Double.compare(map.get(orderBy).doubleValue(), last.get(orderBy).doubleValue()) > 0) {
+// baseLogs.pollLast();
+// baseLogs.add(map);
+// }
+// } else {
+// baseLogs.add(map);
+// }
+// }
+ currentTimer.clear();
+ itemState.clear();
+
+
+ }
+}
diff --git a/src/main/java/com/zdjizhi/etl/ConnProcessFunction.java b/src/main/java/com/zdjizhi/etl/ConnProcessFunction.java
index fa2b5bb..6ed9eef 100644
--- a/src/main/java/com/zdjizhi/etl/ConnProcessFunction.java
+++ b/src/main/java/com/zdjizhi/etl/ConnProcessFunction.java
@@ -3,7 +3,6 @@ package com.zdjizhi.etl;
import cn.hutool.core.convert.Convert;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple5;
-import org.apache.flink.api.java.tuple.Tuple6;
import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;
diff --git a/src/main/java/com/zdjizhi/etl/DnsFlatMapFunction.java b/src/main/java/com/zdjizhi/etl/DnsFlatMapFunction.java
index db69974..5e852e0 100644
--- a/src/main/java/com/zdjizhi/etl/DnsFlatMapFunction.java
+++ b/src/main/java/com/zdjizhi/etl/DnsFlatMapFunction.java
@@ -5,8 +5,6 @@ import cn.hutool.core.util.StrUtil;
import com.zdjizhi.enums.DnsType;
import com.zdjizhi.pojo.DbLogEntity;
import org.apache.flink.api.common.functions.FlatMapFunction;
-import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
-import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
diff --git a/src/main/java/com/zdjizhi/etl/DnsProcessFunction.java b/src/main/java/com/zdjizhi/etl/DnsProcessFunction.java
index 46d0814..c9bc596 100644
--- a/src/main/java/com/zdjizhi/etl/DnsProcessFunction.java
+++ b/src/main/java/com/zdjizhi/etl/DnsProcessFunction.java
@@ -9,7 +9,6 @@ import org.apache.flink.util.Collector;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
diff --git a/src/main/java/com/zdjizhi/etl/Ip2IpGraphProcessFunction.java b/src/main/java/com/zdjizhi/etl/Ip2IpGraphProcessFunction.java
new file mode 100644
index 0000000..0181b2d
--- /dev/null
+++ b/src/main/java/com/zdjizhi/etl/Ip2IpGraphProcessFunction.java
@@ -0,0 +1,45 @@
+package com.zdjizhi.etl;
+
+import cn.hutool.core.convert.Convert;
+import cn.hutool.core.date.DateUtil;
+import com.arangodb.entity.BaseDocument;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
+import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
+import org.apache.flink.util.Collector;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Map;
+
+
+/**
+ * Deduplicate by (src_ip, dst_ip) pair
+ */
+public class Ip2IpGraphProcessFunction extends ProcessWindowFunction<Map<String, Object>, BaseDocument, Tuple2<String, String>, TimeWindow> {
+
+ private static final Logger logger = LoggerFactory.getLogger(Ip2IpGraphProcessFunction.class);
+
+ @Override
+ public void process(Tuple2<String, String> keys, Context context, Iterable<Map<String, Object>> elements, Collector<BaseDocument> out) {
+
+ try {
+ long lastFoundTime = DateUtil.currentSeconds();
+ for (Map<String, Object> log : elements) {
+ long connStartTimetime = Convert.toLong(log.get("conn_start_time"));
+ lastFoundTime = connStartTimetime > lastFoundTime ? connStartTimetime : lastFoundTime;
+ }
+ BaseDocument baseDocument = new BaseDocument();
+ baseDocument.setKey(String.join("-", keys.f0, keys.f1));
+ baseDocument.addAttribute("src_ip", keys.f0);
+ baseDocument.addAttribute("dst_ip", keys.f1);
+ baseDocument.addAttribute("last_found_time", lastFoundTime);
+ out.collect(baseDocument);
+ logger.debug("Intermediate aggregation result: {}", baseDocument.toString());
+
+ } catch (Exception e) {
+ logger.error("Failed to build intermediate aggregation result, middleResult: {}", e);
+ }
+ }
+
+}
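
For orientation, this is roughly the document the window function above emits for one (src_ip, dst_ip) key; the IP addresses and timestamp are made-up illustration values, not taken from the commit:

    // Illustrative sketch: the upsert document produced for one deduplicated IP pair
    BaseDocument doc = new BaseDocument();
    doc.setKey("10.0.0.1-10.0.0.2");                  // _key = src_ip + "-" + dst_ip, so later windows overwrite the same document
    doc.addAttribute("src_ip", "10.0.0.1");
    doc.addAttribute("dst_ip", "10.0.0.2");
    doc.addAttribute("last_found_time", 1657242000L); // max conn_start_time seen in the window (epoch seconds)
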
diff --git a/src/main/java/com/zdjizhi/etl/SketchProcessFunction.java b/src/main/java/com/zdjizhi/etl/SketchProcessFunction.java
index 98a2fe5..09317fe 100644
--- a/src/main/java/com/zdjizhi/etl/SketchProcessFunction.java
+++ b/src/main/java/com/zdjizhi/etl/SketchProcessFunction.java
@@ -1,10 +1,8 @@
package com.zdjizhi.etl;
import cn.hutool.core.convert.Convert;
-import com.zdjizhi.enums.LogMetadata;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple5;
-import org.apache.flink.api.java.tuple.Tuple6;
import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;
diff --git a/src/main/java/com/zdjizhi/topology/LogFlowWriteTopology.java b/src/main/java/com/zdjizhi/topology/LogFlowWriteTopology.java
index cfbc18b..b372f9e 100644
--- a/src/main/java/com/zdjizhi/topology/LogFlowWriteTopology.java
+++ b/src/main/java/com/zdjizhi/topology/LogFlowWriteTopology.java
@@ -3,16 +3,20 @@ package com.zdjizhi.topology;
import cn.hutool.core.convert.Convert;
import cn.hutool.log.Log;
import cn.hutool.log.LogFactory;
+import com.arangodb.entity.BaseDocument;
import com.zdjizhi.common.DnsKeysSelector;
import com.zdjizhi.common.FlowWriteConfig;
-import com.zdjizhi.common.KeysSelector;
+import com.zdjizhi.common.IpKeysSelector;
import com.zdjizhi.etl.ConnProcessFunction;
+import com.zdjizhi.etl.Ip2IpGraphProcessFunction;
import com.zdjizhi.etl.DnsProcessFunction;
import com.zdjizhi.etl.SketchProcessFunction;
+import com.zdjizhi.utils.arangodb.ArangoDBSink;
import com.zdjizhi.utils.ck.ClickhouseSink;
import com.zdjizhi.utils.kafka.KafkaConsumer;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
@@ -33,17 +37,17 @@ public class LogFlowWriteTopology {
// Maximum time between two output flushes (in milliseconds)
env.setBufferTimeout(FlowWriteConfig.BUFFER_TIMEOUT);
- //1connection,2dns
+ //1 connection,2 dns
if (FlowWriteConfig.LOG_NEED_COMPLETE == 1) {
//connection
DataStream<Map<String, Object>> connSource = env.addSource(KafkaConsumer.myDeserializationConsumer(SOURCE_KAFKA_TOPIC_CONNECTION))
- .filter(x -> Objects.nonNull(x))
+ .filter(Objects::nonNull)
.setParallelism(SOURCE_PARALLELISM)
.name(SOURCE_KAFKA_TOPIC_CONNECTION);
//sketch
DataStream<Map<String, Object>> sketchSource = env.addSource(KafkaConsumer.myDeserializationConsumer(SOURCE_KAFKA_TOPIC_SKETCH))
- .filter(x -> Objects.nonNull(x))
+ .filter(Objects::nonNull)
.setParallelism(FlowWriteConfig.SOURCE_PARALLELISM)
.name(SOURCE_KAFKA_TOPIC_SKETCH);
@@ -52,7 +56,7 @@ public class LogFlowWriteTopology {
.assignTimestampsAndWatermarks(WatermarkStrategy
.<Map<String, Object>>forBoundedOutOfOrderness(Duration.ofSeconds(FlowWriteConfig.FLINK_WATERMARK_MAX_ORDERNESS))
.withTimestampAssigner((event, timestamp) -> Convert.toLong(event.get("conn_start_time")) * 1000))
- .keyBy(new KeysSelector())
+ .keyBy(new IpKeysSelector())
.window(TumblingEventTimeWindows.of(Time.seconds(FlowWriteConfig.LOG_AGGREGATE_DURATION)))
.process(new ConnProcessFunction())
.filter(x -> Objects.nonNull(x))
@@ -61,22 +65,34 @@ public class LogFlowWriteTopology {
DataStream<Map<String, Object>> sketchTransformStream = sketchSource.assignTimestampsAndWatermarks(WatermarkStrategy
.<Map<String, Object>>forBoundedOutOfOrderness(Duration.ofSeconds(FlowWriteConfig.FLINK_WATERMARK_MAX_ORDERNESS))
.withTimestampAssigner((event, timestamp) -> Convert.toLong(event.get("sketch_start_time")) * 1000))
- .keyBy(new KeysSelector())
+ .keyBy(new IpKeysSelector())
.window(TumblingEventTimeWindows.of(Time.seconds(FlowWriteConfig.LOG_AGGREGATE_DURATION)))
.process(new SketchProcessFunction())
.filter(x -> Objects.nonNull(x))
.setParallelism(FlowWriteConfig.TRANSFORM_PARALLELISM);
- //Write to ClickHouse sink
+ //Write to ArangoDB
+ SingleOutputStreamOperator<BaseDocument> ip2ipGraph = connTransformStream.union(sketchTransformStream)
+ .keyBy(new IpKeysSelector())
+ .window(TumblingEventTimeWindows.of(Time.seconds(LOG_AGGREGATE_DURATION_GRAPH)))
+ .process(new Ip2IpGraphProcessFunction())
+ .filter(x -> Objects.nonNull(x))
+ .setParallelism(TRANSFORM_PARALLELISM);
+
+ //Write to ClickHouse sink, batched
connSource.addSink(new ClickhouseSink(SINK_CK_TABLE_CONNECTION)).name("CKSink");
sketchSource.addSink(new ClickhouseSink(SINK_CK_TABLE_SKETCH)).name("CKSink");
connTransformStream.addSink(new ClickhouseSink(SINK_CK_TABLE_RELATION_CONNECTION)).name("CKSink");
sketchTransformStream.addSink(new ClickhouseSink(SINK_CK_TABLE_RELATION_CONNECTION)).name("CKSink");
+// ip2ipGraph.addSink(new ArangoDBSink("R_VISIT_IP2IP"));
+
+
+
} else if (FlowWriteConfig.LOG_NEED_COMPLETE == 2) {
DataStream<Map<String, Object>> dnsSource = env.addSource(KafkaConsumer.myDeserializationConsumer(SOURCE_KAFKA_TOPIC_DNS))
- .filter(x -> Objects.nonNull(x))
+ .filter(Objects::nonNull)
.setParallelism(FlowWriteConfig.SOURCE_PARALLELISM)
.name(FlowWriteConfig.SOURCE_KAFKA_TOPIC_DNS);
@@ -90,14 +106,14 @@ public class LogFlowWriteTopology {
.setParallelism(TRANSFORM_PARALLELISM);
//Filter out null records so they are not sent to Kafka
- dnsSource.filter(x -> Objects.nonNull(x))
+ dnsSource.filter(Objects::nonNull)
.setParallelism(FlowWriteConfig.SINK_PARALLELISM)
.name("FilterOriginalData")
.addSink(new ClickhouseSink(SINK_CK_TABLE_DNS))
.setParallelism(FlowWriteConfig.SINK_PARALLELISM)
.name("CKSink");
- dnsTransform.filter(x -> Objects.nonNull(x))
+ dnsTransform.filter(Objects::nonNull)
.setParallelism(FlowWriteConfig.SINK_PARALLELISM)
.name("FilterOriginalData")
.addSink(new ClickhouseSink(SINK_CK_TABLE_DNS))
diff --git a/src/main/java/com/zdjizhi/utils/arangodb/ArangoDBConnect.java b/src/main/java/com/zdjizhi/utils/arangodb/ArangoDBConnect.java
new file mode 100644
index 0000000..d2306e1
--- /dev/null
+++ b/src/main/java/com/zdjizhi/utils/arangodb/ArangoDBConnect.java
@@ -0,0 +1,95 @@
+package com.zdjizhi.utils.arangodb;
+
+import com.arangodb.ArangoCollection;
+import com.arangodb.ArangoCursor;
+import com.arangodb.ArangoDB;
+import com.arangodb.ArangoDatabase;
+import com.arangodb.entity.DocumentCreateEntity;
+import com.arangodb.entity.ErrorEntity;
+import com.arangodb.entity.MultiDocumentEntity;
+import com.arangodb.model.AqlQueryOptions;
+import com.arangodb.model.DocumentCreateOptions;
+import com.arangodb.util.MapBuilder;
+import com.zdjizhi.common.FlowWriteConfig;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+
+public class ArangoDBConnect {
+ private static final Logger LOG = LoggerFactory.getLogger(ArangoDBConnect.class);
+ private static ArangoDB arangoDB = null;
+ private static ArangoDBConnect conn = null;
+ static {
+ getArangoDB();
+ }
+
+ private static void getArangoDB(){
+ arangoDB = new ArangoDB.Builder()
+ .maxConnections(FlowWriteConfig.THREAD_POOL_NUMBER)
+ .host(FlowWriteConfig.ARANGODB_HOST, FlowWriteConfig.ARANGODB_PORT)
+ .user(FlowWriteConfig.ARANGODB_USER)
+ .password(FlowWriteConfig.ARANGODB_PASSWORD)
+ .build();
+ }
+
+ public static synchronized ArangoDBConnect getInstance(){
+ if (null == conn){
+ conn = new ArangoDBConnect();
+ }
+ return conn;
+ }
+
+ private ArangoDatabase getDatabase(){
+ return arangoDB.db(FlowWriteConfig.ARANGODB_DB_NAME);
+ }
+
+ public void clean(){
+ try {
+ if (arangoDB != null){
+ arangoDB.shutdown();
+ }
+ }catch (Exception e){
+ LOG.error(e.getMessage());
+ }
+ }
+
+ public <T> ArangoCursor<T> executorQuery(String query,Class<T> type){
+ ArangoDatabase database = getDatabase();
+ Map<String, Object> bindVars = new MapBuilder().get();
+ AqlQueryOptions options = new AqlQueryOptions()
+ .ttl(FlowWriteConfig.ARANGODB_TTL);
+ try {
+ return database.query(query, bindVars, options, type);
+ }catch (Exception e){
+ LOG.error(e.getMessage());
+ return null;
+ }finally {
+ bindVars.clear();
+ }
+ }
+
+ public <T> void overwrite(List<T> docOverwrite, String collectionName){
+ ArangoDatabase database = getDatabase();
+ try {
+ ArangoCollection collection = database.collection(collectionName);
+ if (!docOverwrite.isEmpty()){
+ DocumentCreateOptions documentCreateOptions = new DocumentCreateOptions();
+ documentCreateOptions.overwrite(true);
+ documentCreateOptions.silent(true);
+ MultiDocumentEntity<DocumentCreateEntity<T>> documentCreateEntityMultiDocumentEntity = collection.insertDocuments(docOverwrite, documentCreateOptions);
+ Collection<ErrorEntity> errors = documentCreateEntityMultiDocumentEntity.getErrors();
+ for (ErrorEntity errorEntity:errors){
+ LOG.error("写入arangoDB异常:"+errorEntity.getErrorMessage());
+ }
+ }
+ }catch (Exception e){
+ LOG.error("更新失败:"+e.toString());
+ }finally {
+ docOverwrite.clear();
+ }
+ }
+
+}
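
A minimal usage sketch of the connector above (the collection name is taken from the commented-out sink wiring in LogFlowWriteTopology; java.util and hutool imports are assumed):

    // Sketch: upsert a small batch through ArangoDBConnect. overwrite(true) inside overwrite()
    // makes insertDocuments replace any existing document that has the same _key.
    ArangoDBConnect arango = ArangoDBConnect.getInstance();
    List<BaseDocument> batch = new ArrayList<>();
    BaseDocument doc = new BaseDocument();
    doc.setKey("10.0.0.1-10.0.0.2");
    doc.addAttribute("last_found_time", DateUtil.currentSeconds());
    batch.add(doc);
    arango.overwrite(batch, "R_VISIT_IP2IP");  // the batch list is cleared by overwrite() before it returns
    arango.clean();                            // shuts the shared ArangoDB driver down
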
diff --git a/src/main/java/com/zdjizhi/utils/arangodb/ArangoDBSink.java b/src/main/java/com/zdjizhi/utils/arangodb/ArangoDBSink.java
new file mode 100644
index 0000000..a14ce16
--- /dev/null
+++ b/src/main/java/com/zdjizhi/utils/arangodb/ArangoDBSink.java
@@ -0,0 +1,47 @@
+package com.zdjizhi.utils.arangodb;
+
+import com.arangodb.entity.BaseDocument;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
+
+import java.util.List;
+
+/**
+ * @description:
+ * @author: zhq
+ * @create: 2022-07-07
+ **/
+public class ArangoDBSink extends RichSinkFunction<List<BaseDocument>> {
+
+ private static ArangoDBConnect arangoDBConnect;
+ private String collection;
+
+ @Override
+ public void invoke(List<BaseDocument> baseDocuments, Context context) throws Exception {
+ arangoDBConnect.overwrite(baseDocuments, getCollection());
+ }
+
+ @Override
+ public void open(Configuration parameters) throws Exception {
+ super.open(parameters);
+ arangoDBConnect = ArangoDBConnect.getInstance();
+ }
+
+ @Override
+ public void close() throws Exception {
+ super.close();
+ arangoDBConnect.clean();
+ }
+
+ public ArangoDBSink(String collection) {
+ this.collection = collection;
+ }
+
+ public String getCollection() {
+ return collection;
+ }
+
+ public void setCollection(String collection) {
+ this.collection = collection;
+ }
+}
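
Note that this sink consumes List<BaseDocument>, while ip2ipGraph in LogFlowWriteTopology is a stream of single BaseDocument elements, which is presumably why its addSink call is still commented out there. One possible way to bridge the two is sketched below; the countWindowAll batching step is an assumption of this note, not part of the commit (imports for AllWindowFunction and GlobalWindow assumed):

    // Sketch only: group single documents into batches before handing them to ArangoDBSink
    ip2ipGraph
            .countWindowAll(FlowWriteConfig.ARANGODB_BATCH)
            .apply(new AllWindowFunction<BaseDocument, List<BaseDocument>, GlobalWindow>() {
                @Override
                public void apply(GlobalWindow window, Iterable<BaseDocument> values, Collector<List<BaseDocument>> out) {
                    List<BaseDocument> batch = new ArrayList<>();
                    values.forEach(batch::add);   // collect the window's documents into one batch
                    out.collect(batch);
                }
            })
            .addSink(new ArangoDBSink("R_VISIT_IP2IP"));
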
diff --git a/src/main/java/com/zdjizhi/utils/ck/CKSink.java b/src/main/java/com/zdjizhi/utils/ck/CKSink.java
new file mode 100644
index 0000000..99a579a
--- /dev/null
+++ b/src/main/java/com/zdjizhi/utils/ck/CKSink.java
@@ -0,0 +1,165 @@
+package com.zdjizhi.utils.ck;
+
+import cn.hutool.core.util.StrUtil;
+import cn.hutool.log.Log;
+import cn.hutool.log.LogFactory;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
+import ru.yandex.clickhouse.ClickHouseConnection;
+import ru.yandex.clickhouse.ClickHouseDataSource;
+import ru.yandex.clickhouse.settings.ClickHouseProperties;
+
+import java.sql.PreparedStatement;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+
+public class CKSink extends RichSinkFunction<Map<String, Object>> {
+
+ private static final Log log = LogFactory.get();
+
+ private static int count = 1;
+ private static ClickHouseConnection connection = null;
+ private static PreparedStatement preparedStatement = null;
+
+ static String database = "default";
+ static String address = "jdbc:clickhouse://192.168.45.102:8123/"+database;
+ static String username = "default";
+ static String password = "galaxy2019";
+ static String fieldStr = "id,name,age";
+ static String tableName = "user_table";
+
+ private String insertSql;
+
+ //Create the connection and prepared statement
+ @Override
+ public void open(Configuration parameters) throws Exception {
+ super.open(parameters);
+ try {
+ connection = getConn();
+ log.info("get clickhouse connection success !");
+ insertSql = preparedSql(fieldStr, tableName);
+ connection.setAutoCommit(false);
+ preparedStatement = connection.prepareStatement(insertSql);
+ } catch (Exception e) {
+ log.error("clickhouse初始化连接报错:", e);
+ }
+ }
+
+// @Override
+// public void close() throws Exception {
+// super.close();
+// //Close the connection and release resources
+// if (connection != null) {
+// connection.close();
+// }
+// if (preparedStatement != null) {
+// preparedStatement.close();
+// }
+// }
+
+ //Write in batches with auto-commit disabled
+ @Override
+ public void invoke(Map<String, Object> data, Context context) {
+ log.info(" invoke methed ");
+
+ try {
+
+ LinkedList<Object> values = new LinkedList<>(data.values());
+ for (int i = 1; i <= values.size(); i++) {
+ Object val = values.get(i - 1);
+ if (val instanceof Long) {
+ preparedStatement.setLong((i), Long.valueOf(StrUtil.toString(val)));
+ } else if (val instanceof Integer) {
+ preparedStatement.setLong((i), Long.valueOf(StrUtil.toString(val)));
+ } else if (val instanceof Boolean) {
+ preparedStatement.setBoolean((i), Boolean.valueOf(StrUtil.toString(val)));
+ } else {
+ preparedStatement.setString((i), StrUtil.toString(val));
+ }
+ }
+
+ preparedStatement.addBatch();
+ count = count + 1;
+ try {
+// if (count >= 50000) {
+// preparedStatement.executeBatch();
+// connection.commit();
+// preparedStatement.clearBatch();
+// count = 1;
+// }
+
+ //Commit once every 10,000 records
+// if (count % 10000 == 0) {
+// preparedStatement.executeBatch();
+// connection.commit();
+// preparedStatement.clearBatch();
+// }
+ preparedStatement.executeBatch();
+ connection.commit();
+
+ } catch (Exception ee) {
+ log.error("数据插入click house 报错:", ee);
+ }
+ } catch (Exception ex) {
+ log.error("ClickhouseSink插入报错====", ex);
+ }
+ }
+
+ public static ClickHouseConnection getConn() {
+
+ int socketTimeout = 600000;
+ ClickHouseProperties properties = new ClickHouseProperties();
+ properties.setUser(username);
+ properties.setPassword(password);
+ properties.setDatabase(database);
+ properties.setSocketTimeout(socketTimeout);
+ ClickHouseDataSource clickHouseDataSource = new ClickHouseDataSource(address, properties);
+ ClickHouseConnection conn = null;
+ try {
+ conn = clickHouseDataSource.getConnection();
+ return conn;
+ } catch (Exception e) {
+ log.error(e.getMessage());
+ e.printStackTrace();
+ }
+ return null;
+ }
+
+ public static Map getField() {
+
+ return null;
+ }
+
+
+ public String preparedSql(String fieldStr, String tableName) {
+ List<String> fields = StrUtil.split(fieldStr, ",");
+ return getInsertSql(fields, tableName);
+ }
+
+ public String getInsertSql(List<String> fields, String tableName) {
+ String sql = "";
+ String sqlStr1 = "INSERT INTO `" + database + "`." + tableName + " (";
+ String sqlStr2 = ") VALUES (";
+ String sqlStr3 = ")";
+ String sqlKey = "";
+ String sqlValue = "";
+ for (String key : fields) {
+ sqlKey += key + ",";
+ sqlValue += "?,";
+ }
+ sqlKey = sqlKey.substring(0, sqlKey.length() - 1);
+ sqlValue = sqlValue.substring(0, sqlValue.length() - 1);
+ sql = StrUtil.concat(true, sqlStr1, sqlKey, sqlStr2, sqlValue, sqlStr3);
+
+// String placeholders = Arrays.stream(fieldNames)
+// .map(f -> "?")
+// .collect(Collectors.joining(", "));
+// return "INSERT INTO " + quoteIdentifier(tableName) +
+// "(" + columns + ")" + " VALUES (" + placeholders + ")";
+
+
+ log.info(sql);
+ return sql;
+ }
+}
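
For reference, with the hard-coded defaults in this file (database "default", fieldStr "id,name,age", tableName "user_table"), the statement built above works out to:

    // Worked example of the SQL produced by CKSink.preparedSql / getInsertSql
    String sql = new CKSink().preparedSql("id,name,age", "user_table");
    // sql == "INSERT INTO `default`.user_table (id,name,age) VALUES (?,?,?)"
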
diff --git a/src/main/java/com/zdjizhi/utils/ck/CKSinkFlatMap.java b/src/main/java/com/zdjizhi/utils/ck/CKSinkFlatMap.java
index e4d7a8c..55c99c4 100644
--- a/src/main/java/com/zdjizhi/utils/ck/CKSinkFlatMap.java
+++ b/src/main/java/com/zdjizhi/utils/ck/CKSinkFlatMap.java
@@ -6,21 +6,14 @@ import cn.hutool.log.LogFactory;
import org.apache.flink.api.common.functions.RichFlatMapFunction;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.util.Collector;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
import ru.yandex.clickhouse.ClickHouseConnection;
import ru.yandex.clickhouse.ClickHouseDataSource;
import ru.yandex.clickhouse.settings.ClickHouseProperties;
import java.sql.PreparedStatement;
-import java.sql.SQLException;
-import java.util.Arrays;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
-import java.util.stream.Collectors;
-
-import static ru.yandex.clickhouse.ClickHouseUtil.quoteIdentifier;
public class CKSinkFlatMap extends RichFlatMapFunction<Map<String, Object>, String> {
diff --git a/src/main/java/com/zdjizhi/utils/ck/ClickHouseJDBCDialect.java b/src/main/java/com/zdjizhi/utils/ck/ClickHouseJDBCDialect.java
new file mode 100644
index 0000000..963eef1
--- /dev/null
+++ b/src/main/java/com/zdjizhi/utils/ck/ClickHouseJDBCDialect.java
@@ -0,0 +1,41 @@
+/*
+package com.zdjizhi.utils.ck;
+
+import java.util.Optional;
+
+*/
+/**
+ * ClickHouse dialect
+ *//*
+
+public class ClickHouseJDBCDialect implements JDBCDialect {
+
+ private static final long serialVersionUID = 1L;
+
+ @Override
+ public boolean canHandle(String url) {
+ return url.startsWith("jdbc:clickhouse:");
+ }
+
+ @Override
+ public Optional<String> defaultDriverName() {
+ return Optional.of("ru.yandex.clickhouse.ClickHouseDriver");
+ }
+
+ @Override
+ public String quoteIdentifier(String identifier) {
+ return "`" + identifier + "`";
+ }
+
+ @Override
+ public Optional<String> getUpsertStatement(String tableName, String[] fieldNames, String[] uniqueKeyFields) {
+ return Optional.of(getInsertIntoStatement(tableName, fieldNames));
+ }
+
+ @Override
+ public String getUpdateStatement(String tableName, String[] fieldNames, String[] conditionFields) {
+ return null;
+ }
+
+}
+*/
diff --git a/src/main/java/com/zdjizhi/utils/ck/ClickhouseSink.java b/src/main/java/com/zdjizhi/utils/ck/ClickhouseSink.java
index 0407544..80a1d0c 100644
--- a/src/main/java/com/zdjizhi/utils/ck/ClickhouseSink.java
+++ b/src/main/java/com/zdjizhi/utils/ck/ClickhouseSink.java
@@ -27,15 +27,7 @@ public class ClickhouseSink extends RichSinkFunction<Map<String,Object>> {
public String sink;
static {
- try {
- Class.forName("ru.yandex.clickhouse.ClickHouseDriver");
- connection = DriverManager.getConnection("jdbc:clickhouse://" + CK_HOSTS + "/" + CK_DATABASE, CK_USERNAME, CK_PIN);
-// BalancedClickhouseDataSource dataSource = new BalancedClickhouseDataSource("jdbc:clickhouse://node01:8123,node02:8123,node03:8123/default", props);
-// connection = dataSource.getConnection();
- log.info("get clickhouse connection success");
- } catch (ClassNotFoundException | SQLException e) {
- log.error("clickhouse connection error ,{}", e);
- }
+
}
public ClickhouseSink(String sink) {
@@ -57,7 +49,15 @@ public class ClickhouseSink extends RichSinkFunction<Map<String,Object>> {
@Override
public void open(Configuration parameters) throws Exception {
-
+ try {
+ Class.forName("ru.yandex.clickhouse.ClickHouseDriver");
+ connection = DriverManager.getConnection("jdbc:clickhouse://" + CK_HOSTS + "/" + CK_DATABASE, CK_USERNAME, CK_PIN);
+// BalancedClickhouseDataSource dataSource = new BalancedClickhouseDataSource("jdbc:clickhouse://node01:8123,node02:8123,node03:8123/default", props);
+// connection = dataSource.getConnection();
+ log.info("get clickhouse connection success");
+ } catch (ClassNotFoundException | SQLException e) {
+ log.error("clickhouse connection error ,{}", e);
+ }
}
@Override
diff --git a/src/main/java/com/zdjizhi/utils/ck/ClickhouseUtil.java b/src/main/java/com/zdjizhi/utils/ck/ClickhouseUtil.java
new file mode 100644
index 0000000..5fc1894
--- /dev/null
+++ b/src/main/java/com/zdjizhi/utils/ck/ClickhouseUtil.java
@@ -0,0 +1,56 @@
+/*
+package com.zdjizhi.utils.ck;
+
+import org.apache.flink.api.java.utils.ParameterTool;
+import ru.ivi.opensource.flinkclickhousesink.model.ClickHouseClusterSettings;
+import ru.ivi.opensource.flinkclickhousesink.model.ClickHouseSinkConst;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Properties;
+
+*/
+/**
+ * @description:
+ * @author: zhq
+ * @create: 2022-06-29
+ **//*
+
+public class ClickhouseUtil {
+
+
+ public static ParameterTool getGlobalPro() {
+ Map<String, String> sinkPro = new HashMap<>();
+ //sink Properties
+ sinkPro.put(ClickHouseClusterSettings.CLICKHOUSE_HOSTS, "sc.chproxy.bigdata.services.org:10000");
+
+ // ClickHouse local write credentials
+ sinkPro.put(ClickHouseClusterSettings.CLICKHOUSE_USER, "default");
+ sinkPro.put(ClickHouseClusterSettings.CLICKHOUSE_PASSWORD, "galaxy2019");
+ // sink common
+ sinkPro.put(ClickHouseSinkConst.TIMEOUT_SEC, "10");
+ sinkPro.put(ClickHouseSinkConst.NUM_WRITERS, "10");
+ sinkPro.put(ClickHouseSinkConst.NUM_RETRIES, "3");
+ sinkPro.put(ClickHouseSinkConst.QUEUE_MAX_CAPACITY, "1000000");
+ sinkPro.put(ClickHouseSinkConst.IGNORING_CLICKHOUSE_SENDING_EXCEPTION_ENABLED, "false");
+ sinkPro.put(ClickHouseSinkConst.FAILED_RECORDS_PATH, "d:/");//When run locally this creates a folder named "d:" inside the project to store records that failed to be written
+
+ // env - sinkPro
+ ParameterTool parameters = ParameterTool.fromMap(sinkPro);
+
+
+ return parameters;
+
+ }
+
+ public static Properties getCKPro() {
+ // ClickHouseSink - sinkPro
+ Properties props = new Properties();
+ props.put(ClickHouseSinkConst.TARGET_TABLE_NAME, "database_1564.ch_zjk_test_local");
+ props.put(ClickHouseSinkConst.MAX_BUFFER_SIZE, "10000");
+ return props;
+ }
+
+
+}
+*/
diff --git a/src/main/java/com/zdjizhi/utils/ck/JDBCDialect.java b/src/main/java/com/zdjizhi/utils/ck/JDBCDialect.java
new file mode 100644
index 0000000..9cd448a
--- /dev/null
+++ b/src/main/java/com/zdjizhi/utils/ck/JDBCDialect.java
@@ -0,0 +1,39 @@
+/*
+package com.zdjizhi.utils.ck;
+
+import java.io.Serializable;
+import java.util.Arrays;
+import java.util.Optional;
+import java.util.stream.Collectors;
+
+import static ru.yandex.clickhouse.ClickHouseUtil.quoteIdentifier;
+
+*/
+/**
+ * Handle the SQL dialect of jdbc driver.
+ *//*
+
+public interface JDBCDialect extends Serializable {
+ default Optional<String> getUpsertStatement(
+ String tableName, String[] fieldNames, String[] uniqueKeyFields) {
+ return Optional.empty();
+ }
+ default String getInsertIntoStatement(String tableName, String[] fieldNames) {
+ String columns = Arrays.stream(fieldNames)
+ .map(this::quoteIdentifier)
+ .collect(Collectors.joining(", "));
+ String placeholders = Arrays.stream(fieldNames)
+ .map(f -> "?")
+ .collect(Collectors.joining(", "));
+ return "INSERT INTO " + quoteIdentifier(tableName) +
+ "(" + columns + ")" + " VALUES (" + placeholders + ")";
+ }
+
+ default String getDeleteStatement(String tableName, String[] conditionFields) {
+ String conditionClause = Arrays.stream(conditionFields)
+ .map(f -> quoteIdentifier(f) + "=?")
+ .collect(Collectors.joining(" AND "));
+ return "DELETE FROM " + quoteIdentifier(tableName) + " WHERE " + conditionClause;
+ }
+}
+*/
diff --git a/src/main/java/com/zdjizhi/utils/ck/JDBCDialects.java b/src/main/java/com/zdjizhi/utils/ck/JDBCDialects.java
new file mode 100644
index 0000000..9c951e4
--- /dev/null
+++ b/src/main/java/com/zdjizhi/utils/ck/JDBCDialects.java
@@ -0,0 +1,14 @@
+/*
+package com.zdjizhi.utils.ck;
+
+import java.util.Arrays;
+import java.util.List;
+
+public final class JDBCDialects {
+
+ private static final List<JDBCDialect> DIALECTS = Arrays.asList(
+// new DerbyDialect(),
+// new MySQLDialect(),
+// new PostgresDialect()
+ );
+}*/
diff --git a/src/main/java/com/zdjizhi/utils/kafka/KafkaConsumer.java b/src/main/java/com/zdjizhi/utils/kafka/KafkaConsumer.java
index 4b8c8f0..97a53da 100644
--- a/src/main/java/com/zdjizhi/utils/kafka/KafkaConsumer.java
+++ b/src/main/java/com/zdjizhi/utils/kafka/KafkaConsumer.java
@@ -1,17 +1,8 @@
package com.zdjizhi.utils.kafka;
import com.zdjizhi.common.FlowWriteConfig;
-import com.zdjizhi.pojo.DbLogEntity;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
-import org.apache.flink.api.common.serialization.TypeInformationSerializationSchema;
-import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
-import org.apache.flink.api.common.typeutils.base.StringSerializer;
-import org.apache.flink.connector.kafka.source.KafkaSource;
-import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
-import org.apache.flink.connector.kafka.source.reader.deserializer.KafkaRecordDeserializationSchema;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
-import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase;
-
import java.util.Map;
import java.util.Properties;
diff --git a/src/main/java/com/zdjizhi/utils/kafka/TimestampDeserializationSchema.java b/src/main/java/com/zdjizhi/utils/kafka/TimestampDeserializationSchema.java
index 7b18ab5..8f3fccf 100644
--- a/src/main/java/com/zdjizhi/utils/kafka/TimestampDeserializationSchema.java
+++ b/src/main/java/com/zdjizhi/utils/kafka/TimestampDeserializationSchema.java
@@ -3,8 +3,6 @@ package com.zdjizhi.utils.kafka;
import cn.hutool.log.Log;
import cn.hutool.log.LogFactory;
import com.zdjizhi.common.FlowWriteConfig;
-import com.zdjizhi.enums.LogMetadata;
-import com.zdjizhi.pojo.DbLogEntity;
import com.zdjizhi.utils.JsonMapper;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.streaming.connectors.kafka.KafkaDeserializationSchema;
diff --git a/src/main/java/com/zdjizhi/utils/system/FlowWriteConfigurations.java b/src/main/java/com/zdjizhi/utils/system/FlowWriteConfigurations.java
index d793628..7275e06 100644
--- a/src/main/java/com/zdjizhi/utils/system/FlowWriteConfigurations.java
+++ b/src/main/java/com/zdjizhi/utils/system/FlowWriteConfigurations.java
@@ -1,14 +1,8 @@
package com.zdjizhi.utils.system;
-import com.alibaba.nacos.api.NacosFactory;
-import com.alibaba.nacos.api.PropertyKeyConst;
-import com.alibaba.nacos.api.config.ConfigService;
-import com.alibaba.nacos.api.exception.NacosException;
-import com.zdjizhi.common.FlowWriteConfig;
import com.zdjizhi.utils.StringUtil;
import java.io.IOException;
-import java.io.StringReader;
import java.util.Locale;
import java.util.Properties;