author    zhanghongqing <[email protected]>  2022-07-18 18:08:47 +0800
committer zhanghongqing <[email protected]>  2022-07-18 18:08:47 +0800
commit    fa3f628658fc4d9197053e8755c685b3d80ff3b5 (patch)
tree      b550a0f3a7a24b0b09b517bb574aff8a43bd0250 /src/main/java/com
parent    95eefbd8b791f91f2b38e335dd77ce2816d81a1c (diff)
Optimize code: add batch-size parameters for database writes
Diffstat (limited to 'src/main/java/com')
-rw-r--r--  src/main/java/com/zdjizhi/common/FlowWriteConfig.java                  36
-rw-r--r--  src/main/java/com/zdjizhi/etl/CountTriggerWithTimeout.java            132
-rw-r--r--  src/main/java/com/zdjizhi/topology/LogFlowWriteTopology.java            68
-rw-r--r--  src/main/java/com/zdjizhi/utils/arangodb/ArangoDBConnect.java            2
-rw-r--r--  src/main/java/com/zdjizhi/utils/ck/BalancedClickhouseDataSource.java   335
-rw-r--r--  src/main/java/com/zdjizhi/utils/ck/ClickhouseSink.java                   3
-rw-r--r--  src/main/java/com/zdjizhi/utils/ck/ClickhouseUtil.java                  52
7 files changed, 206 insertions, 422 deletions
diff --git a/src/main/java/com/zdjizhi/common/FlowWriteConfig.java b/src/main/java/com/zdjizhi/common/FlowWriteConfig.java
index 3b68285..7889c88 100644
--- a/src/main/java/com/zdjizhi/common/FlowWriteConfig.java
+++ b/src/main/java/com/zdjizhi/common/FlowWriteConfig.java
@@ -44,8 +44,8 @@ public class FlowWriteConfig {
public static final Integer SOURCE_PARALLELISM = FlowWriteConfigurations.getIntProperty(0, "source.parallelism");
public static final Integer SINK_PARALLELISM = FlowWriteConfigurations.getIntProperty(0, "sink.parallelism");
public static final Integer TRANSFORM_PARALLELISM = FlowWriteConfigurations.getIntProperty(0, "transform.parallelism");
- public static final Integer DATA_CENTER_ID_NUM = FlowWriteConfigurations.getIntProperty(0, "data.center.id.num");
- public static final Integer LOG_NEED_COMPLETE = FlowWriteConfigurations.getIntProperty(0, "log.need.complete");
+ public static final Integer DATA_CENTER_ID_NUM = 0;
+ public static final Integer LOG_TYPE = FlowWriteConfigurations.getIntProperty(0, "log.type");
public static final String MAIL_DEFAULT_CHARSET = FlowWriteConfigurations.getStringProperty(1, "mail.default.charset");
public static final Integer LOG_TRANSFORM_TYPE = FlowWriteConfigurations.getIntProperty(1, "log.transform.type");
public static final Integer BUFFER_TIMEOUT = FlowWriteConfigurations.getIntProperty(1, "buffer.timeout");
@@ -85,7 +85,7 @@ public class FlowWriteConfig {
*/
public static final String SOURCE_KAFKA_SERVERS = FlowWriteConfigurations.getStringProperty(0, "source.kafka.servers");
public static final String SINK_KAFKA_SERVERS = FlowWriteConfigurations.getStringProperty(0, "sink.kafka.servers");
- public static final String ZOOKEEPER_SERVERS = FlowWriteConfigurations.getStringProperty(0, "zookeeper.servers");
+ public static final String ZOOKEEPER_SERVERS = "zookeeper.servers";
public static final String TOOLS_LIBRARY = FlowWriteConfigurations.getStringProperty(0, "tools.library");
@@ -114,22 +114,24 @@ public class FlowWriteConfig {
public static final String SINK_CK_TABLE_RELATION_CONNECTION = FlowWriteConfigurations.getStringProperty(0, "sink.ck.table.relation.connection");
public static final String SINK_CK_TABLE_RELATION_DNS = FlowWriteConfigurations.getStringProperty(0, "sink.ck.table.relation.dns");
- public static final String R_VISIT_IP2IP = FlowWriteConfigurations.getStringProperty(0, "sink.arango.table.r.visit.ip2ip");
- public static final String R_CNAME_DOMAIN2DOMAIN = FlowWriteConfigurations.getStringProperty(0, "sink.arango.table.r.cname.domain2domain");
- public static final String R_MX_DOMAIN2DOMAIN = FlowWriteConfigurations.getStringProperty(0, "sink.arango.table.r.mx.domain2domain");
- public static final String R_RESOLVE_DOMAIN2IP = FlowWriteConfigurations.getStringProperty(0, "sink.arango.table.r.resolve.domain2ip");
- public static final String R_NX_DOMAIN2DOMAIN = FlowWriteConfigurations.getStringProperty(0, "sink.arango.table.r.nx.domain2domain");
+ public static final String R_VISIT_IP2IP = FlowWriteConfigurations.getStringProperty(0, "sink.arangodb.table.r.visit.ip2ip");
+ public static final String R_CNAME_DOMAIN2DOMAIN = FlowWriteConfigurations.getStringProperty(0, "sink.arangodb.table.r.cname.domain2domain");
+ public static final String R_MX_DOMAIN2DOMAIN = FlowWriteConfigurations.getStringProperty(0, "sink.arangodb.table.r.mx.domain2domain");
+ public static final String R_RESOLVE_DOMAIN2IP = FlowWriteConfigurations.getStringProperty(0, "sink.arangodb.table.r.resolve.domain2ip");
+ public static final String R_NX_DOMAIN2DOMAIN = FlowWriteConfigurations.getStringProperty(0, "sink.arangodb.table.r.nx.domain2domain");
- public static final String ARANGODB_HOST = FlowWriteConfigurations.getStringProperty(0, "arangoDB.host");
- public static final Integer ARANGODB_PORT = FlowWriteConfigurations.getIntProperty(0, "arangoDB.port");
- public static final String ARANGODB_USER = FlowWriteConfigurations.getStringProperty(0, "arangoDB.user");
- public static final String ARANGODB_PASSWORD = FlowWriteConfigurations.getStringProperty(0, "arangoDB.password");
- public static final String ARANGODB_DB_NAME = FlowWriteConfigurations.getStringProperty(0, "arangoDB.DB.name");
- public static final Integer ARANGODB_TTL = FlowWriteConfigurations.getIntProperty(0, "arangoDB.ttl");
- public static final Integer ARANGODB_BATCH = FlowWriteConfigurations.getIntProperty(0, "arangoDB.batch");
- public static final Integer THREAD_POOL_NUMBER = FlowWriteConfigurations.getIntProperty(0, "thread.pool.number");
+ public static final String ARANGODB_HOST = FlowWriteConfigurations.getStringProperty(0, "arangodb.host");
+ public static final Integer ARANGODB_PORT = FlowWriteConfigurations.getIntProperty(0, "arangodb.port");
+ public static final String ARANGODB_USER = FlowWriteConfigurations.getStringProperty(0, "arangodb.user");
+ public static final String ARANGODB_PASSWORD = FlowWriteConfigurations.getStringProperty(0, "arangodb.password");
+ public static final String ARANGODB_DB_NAME = FlowWriteConfigurations.getStringProperty(0, "arangodb.db.name");
+ public static final Integer ARANGODB_TTL = FlowWriteConfigurations.getIntProperty(0, "arangodb.ttl");
+ public static final Integer ARANGODB_BATCH = FlowWriteConfigurations.getIntProperty(0, "arangodb.batch");
+ public static final Integer ARANGODB_THREAD_POOL_NUMBER = FlowWriteConfigurations.getIntProperty(0, "arangodb.thread.pool.number");
- public static final Integer SINK_BATCH_DELAY_TIME = FlowWriteConfigurations.getIntProperty(0, "sink.batch.delay.time");
+ public static final Integer SINK_CK_BATCH_DELAY_TIME = FlowWriteConfigurations.getIntProperty(0, "sink.ck.batch.delay.time");
+ public static final Integer SINK_ARANGODB_BATCH_DELAY_TIME = FlowWriteConfigurations.getIntProperty(0, "sink.arangodb.batch.delay.time");
public static final Integer CK_BATCH = FlowWriteConfigurations.getIntProperty(0, "ck.batch");
+ public static final Integer SINK_CK_RAW_LOG_INSERT_OPEN = FlowWriteConfigurations.getIntProperty(0, "sink.ck.raw.log.insert.open");
} \ No newline at end of file
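The renamed and newly added keys above imply configuration entries along the following lines; the key names come from the constants in this hunk, but every value shown is an illustrative assumption, not part of this commit:

    # Illustrative values only; the properties file itself is not in this diff.
    # log.type: 1 = connection logs, 2 = dns logs
    log.type=1
    # max records per ClickHouse insert batch
    ck.batch=5000
    # ms before a partially filled ClickHouse batch is flushed
    sink.ck.batch.delay.time=5000
    # ms before a partially filled ArangoDB batch is flushed
    sink.arangodb.batch.delay.time=5000
    # 1 = raw logs are also written to ClickHouse
    sink.ck.raw.log.insert.open=1
    # max documents per ArangoDB write
    arangodb.batch=2000
    # max connections for the ArangoDB client
    arangodb.thread.pool.number=4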
diff --git a/src/main/java/com/zdjizhi/etl/CountTriggerWithTimeout.java b/src/main/java/com/zdjizhi/etl/CountTriggerWithTimeout.java
new file mode 100644
index 0000000..ba348bd
--- /dev/null
+++ b/src/main/java/com/zdjizhi/etl/CountTriggerWithTimeout.java
@@ -0,0 +1,132 @@
+package com.zdjizhi.etl;
+
+import cn.hutool.log.Log;
+import cn.hutool.log.LogFactory;
+import org.apache.flink.api.common.functions.ReduceFunction;
+import org.apache.flink.api.common.state.ReducingState;
+import org.apache.flink.api.common.state.ReducingStateDescriptor;
+import org.apache.flink.api.common.typeutils.base.LongSerializer;
+import org.apache.flink.streaming.api.TimeCharacteristic;
+import org.apache.flink.streaming.api.windowing.triggers.Trigger;
+import org.apache.flink.streaming.api.windowing.triggers.TriggerResult;
+import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
+
+
+/**
+ * Count-based window trigger with a timeout: fires and purges once the element count reaches maxCount or the window's end time passes
+ */
+public class CountTriggerWithTimeout<T> extends Trigger<T, TimeWindow> {
+ private static Log logger = LogFactory.get();
+
+ /**
+ * Maximum number of elements buffered in a window before an early fire
+ */
+ private int maxCount;
+ /**
+ * Whether the timeout side of the trigger reacts to event time or processing time
+ */
+ private TimeCharacteristic timeType;
+
+ private String stateName;
+
+ public String getStateName() {
+ return stateName;
+ }
+
+ public void setStateName(String stateName) {
+ this.stateName = stateName;
+ }
+
+ public CountTriggerWithTimeout(String stateName) {
+ this.stateName = stateName;
+ this.countStateDescriptor = new ReducingStateDescriptor<>(stateName + "counter", new Sum(), LongSerializer.INSTANCE);
+ }
+
+ /**
+ * Counter state descriptor; created in the constructors so that stateName is already set when the state name is derived from it
+ */
+ private ReducingStateDescriptor<Long> countStateDescriptor;
+
+ public CountTriggerWithTimeout(String stateName, int maxCount, TimeCharacteristic timeType) {
+ this.maxCount = maxCount;
+ this.timeType = timeType;
+ this.stateName = stateName;
+ this.countStateDescriptor = new ReducingStateDescriptor<>(stateName + "counter", new Sum(), LongSerializer.INSTANCE);
+ }
+
+
+ private TriggerResult fireAndPurge(TimeWindow window, TriggerContext ctx) throws Exception {
+ clear(window, ctx);
+ return TriggerResult.FIRE_AND_PURGE;
+ }
+
+
+ @Override
+ public TriggerResult onElement(T element, long timestamp, TimeWindow window, TriggerContext ctx) throws Exception {
+ ReducingState<Long> countState = ctx.getPartitionedState(countStateDescriptor);
+ countState.add(1L);
+
+
+ if (countState.get() >= maxCount) {
+ logger.info("fire with count: " + countState.get());
+ return fireAndPurge(window, ctx);
+ }
+ if (timestamp >= window.getEnd()) {
+ logger.info("fire with time: " + timestamp);
+ return fireAndPurge(window, ctx);
+ } else {
+ return TriggerResult.CONTINUE;
+ }
+ }
+
+
+ @Override
+ public TriggerResult onProcessingTime(long time, TimeWindow window, TriggerContext ctx) throws Exception {
+ if (timeType != TimeCharacteristic.ProcessingTime) {
+ return TriggerResult.CONTINUE;
+ }
+
+ // Flink's end-of-window timer fires at window.maxTimestamp() == getEnd() - 1, so a timer below getEnd() means the window timed out: flush it.
+ if (time >= window.getEnd()) {
+ return TriggerResult.CONTINUE;
+ } else {
+ logger.info("fire with processing time: " + time);
+ return fireAndPurge(window, ctx);
+ }
+ }
+
+
+ @Override
+ public TriggerResult onEventTime(long time, TimeWindow window, TriggerContext ctx) throws Exception {
+ if (timeType != TimeCharacteristic.EventTime) {
+ return TriggerResult.CONTINUE;
+ }
+
+
+ if (time >= window.getEnd()) {
+ return TriggerResult.CONTINUE;
+ } else {
+ logger.info("fire with event time: " + time);
+ return fireAndPurge(window, ctx);
+ }
+ }
+
+
+ @Override
+ public void clear(TimeWindow window, TriggerContext ctx) throws Exception {
+ ReducingState<Long> countState = ctx.getPartitionedState(countStateDescriptor);
+ countState.clear();
+ }
+
+ /**
+ * Sums the per-element increments kept in the counter state
+ */
+ class Sum implements ReduceFunction<Long> {
+
+
+ @Override
+ public Long reduce(Long value1, Long value2) throws Exception {
+ return value1 + value2;
+ }
+ }
+} \ No newline at end of file
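For orientation, a minimal sketch of how this trigger attaches to a non-keyed window; the stream, table name, batch size, and timeout below are assumptions for the example (the real wiring is in LogFlowWriteTopology further down):

    // Flush after 1000 records, or when the 5 s processing-time window ends,
    // whichever comes first (names and sizes assumed for illustration).
    stream.windowAll(TumblingProcessingTimeWindows.of(Time.milliseconds(5000)))
            .trigger(new CountTriggerWithTimeout<>("demo_table", 1000, TimeCharacteristic.ProcessingTime))
            .apply(new CKBatchWindow())
            .addSink(new ClickhouseSink("demo_table"));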
diff --git a/src/main/java/com/zdjizhi/topology/LogFlowWriteTopology.java b/src/main/java/com/zdjizhi/topology/LogFlowWriteTopology.java
index 7ca7acb..e7ac08e 100644
--- a/src/main/java/com/zdjizhi/topology/LogFlowWriteTopology.java
+++ b/src/main/java/com/zdjizhi/topology/LogFlowWriteTopology.java
@@ -7,12 +7,14 @@ import cn.hutool.log.LogFactory;
import com.zdjizhi.common.FlowWriteConfig;
import com.zdjizhi.enums.DnsType;
import com.zdjizhi.etl.CKBatchWindow;
+import com.zdjizhi.etl.CountTriggerWithTimeout;
import com.zdjizhi.etl.connection.*;
import com.zdjizhi.etl.dns.*;
import com.zdjizhi.utils.arangodb.ArangoDBSink;
import com.zdjizhi.utils.ck.ClickhouseSink;
import com.zdjizhi.utils.kafka.KafkaConsumer;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
+import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
@@ -35,9 +37,10 @@ public class LogFlowWriteTopology {
env.setBufferTimeout(FlowWriteConfig.BUFFER_TIMEOUT);
//log.type: 1 = connection, 2 = dns
- if (FlowWriteConfig.LOG_NEED_COMPLETE == 1) {
+ if (FlowWriteConfig.LOG_TYPE == 1) {
//connection
DataStream<Map<String, Object>> connSource = env.addSource(KafkaConsumer.myDeserializationConsumer(SOURCE_KAFKA_TOPIC_CONNECTION))
+ .setParallelism(FlowWriteConfig.SOURCE_PARALLELISM)
.filter(Objects::nonNull)
.setParallelism(SOURCE_PARALLELISM)
.name(SOURCE_KAFKA_TOPIC_CONNECTION);
@@ -53,9 +56,11 @@ public class LogFlowWriteTopology {
.assignTimestampsAndWatermarks(WatermarkStrategy
.<Map<String, Object>>forBoundedOutOfOrderness(Duration.ofSeconds(FlowWriteConfig.FLINK_WATERMARK_MAX_DELAY_TIME))
.withTimestampAssigner((event, timestamp) -> Convert.toLong(event.get("conn_start_time")) * 1000))
+ .setParallelism(FlowWriteConfig.TRANSFORM_PARALLELISM)
.keyBy(new IpKeysSelector())
.window(TumblingProcessingTimeWindows.of(Time.seconds(FlowWriteConfig.LOG_AGGREGATE_DURATION)))
.process(new ConnProcessFunction())
+ .setParallelism(FlowWriteConfig.TRANSFORM_PARALLELISM)
.filter(x -> Objects.nonNull(x))
.setParallelism(FlowWriteConfig.TRANSFORM_PARALLELISM);
@@ -74,20 +79,43 @@ public class LogFlowWriteTopology {
.keyBy(new IpKeysSelector())
.window(TumblingProcessingTimeWindows.of(Time.seconds(LOG_AGGREGATE_DURATION_GRAPH)))
.process(new Ip2IpGraphProcessFunction())
+ .setParallelism(TRANSFORM_PARALLELISM)
.filter(Objects::nonNull)
.setParallelism(TRANSFORM_PARALLELISM);
//write to the ClickHouse sink in batches
- connSource.windowAll(TumblingProcessingTimeWindows.of(Time.seconds(SINK_BATCH_DELAY_TIME))).apply(new CKBatchWindow()).addSink(new ClickhouseSink(SINK_CK_TABLE_CONNECTION)).name("CKSink");
- sketchSource.windowAll(TumblingProcessingTimeWindows.of(Time.seconds(SINK_BATCH_DELAY_TIME))).apply(new CKBatchWindow()).addSink(new ClickhouseSink(SINK_CK_TABLE_SKETCH)).name("CKSink");
- sketchTransformStream.windowAll(TumblingProcessingTimeWindows.of(Time.seconds(SINK_BATCH_DELAY_TIME))).apply(new CKBatchWindow()).addSink(new ClickhouseSink(SINK_CK_TABLE_RELATION_CONNECTION)).name("CKSink");
+ if (SINK_CK_RAW_LOG_INSERT_OPEN == 1) {
+ connSource.windowAll(TumblingProcessingTimeWindows.of(Time.milliseconds(SINK_CK_BATCH_DELAY_TIME)))
+ .trigger(new CountTriggerWithTimeout<>(SINK_CK_TABLE_CONNECTION, CK_BATCH, TimeCharacteristic.ProcessingTime))
+ .apply(new CKBatchWindow())
+ .addSink(new ClickhouseSink(SINK_CK_TABLE_CONNECTION))
+ .setParallelism(FlowWriteConfig.SINK_PARALLELISM)
+ .name("CKSink");
+
+ sketchSource.windowAll(TumblingProcessingTimeWindows.of(Time.milliseconds(SINK_CK_BATCH_DELAY_TIME)))
+ .trigger(new CountTriggerWithTimeout<>(SINK_CK_TABLE_SKETCH, CK_BATCH, TimeCharacteristic.ProcessingTime))
+ .apply(new CKBatchWindow())
+ .addSink(new ClickhouseSink(SINK_CK_TABLE_SKETCH))
+ .setParallelism(FlowWriteConfig.SINK_PARALLELISM)
+ .name("CKSink");
+ }
+ sketchTransformStream.windowAll(TumblingProcessingTimeWindows.of(Time.milliseconds(SINK_CK_BATCH_DELAY_TIME)))
+ .trigger(new CountTriggerWithTimeout<>(SINK_CK_TABLE_RELATION_CONNECTION, CK_BATCH, TimeCharacteristic.ProcessingTime))
+ .apply(new CKBatchWindow())
+ .addSink(new ClickhouseSink(SINK_CK_TABLE_RELATION_CONNECTION))
+ .setParallelism(FlowWriteConfig.SINK_PARALLELISM)
+ .name("CKSink");
//write to ArangoDB
- ip2ipGraph.windowAll(TumblingProcessingTimeWindows.of(Time.seconds(SINK_BATCH_DELAY_TIME))).apply(new ArangodbBatchIPWindow()).addSink(new ArangoDBSink(R_VISIT_IP2IP)).name(R_VISIT_IP2IP);
-
- } else if (FlowWriteConfig.LOG_NEED_COMPLETE == 2) {
+ ip2ipGraph.windowAll(TumblingProcessingTimeWindows.of(Time.milliseconds(SINK_ARANGODB_BATCH_DELAY_TIME)))
+ .trigger(new CountTriggerWithTimeout<>(R_VISIT_IP2IP, ARANGODB_BATCH, TimeCharacteristic.ProcessingTime))
+ .apply(new ArangodbBatchIPWindow())
+ .addSink(new ArangoDBSink(R_VISIT_IP2IP))
+ .setParallelism(FlowWriteConfig.SINK_PARALLELISM).name(R_VISIT_IP2IP);
+ } else if (FlowWriteConfig.LOG_TYPE == 2) {
DataStream<Map<String, Object>> dnsSource = env.addSource(KafkaConsumer.myDeserializationConsumer(SOURCE_KAFKA_TOPIC_DNS))
+ .setParallelism(FlowWriteConfig.SOURCE_PARALLELISM)
.filter(Objects::nonNull)
.map(new DnsMapFunction())
.setParallelism(FlowWriteConfig.SOURCE_PARALLELISM)
@@ -96,23 +124,30 @@ public class LogFlowWriteTopology {
DataStream<Map<String, Object>> dnsTransform = dnsSource.assignTimestampsAndWatermarks(WatermarkStrategy
.<Map<String, Object>>forBoundedOutOfOrderness(Duration.ofSeconds(FLINK_WATERMARK_MAX_DELAY_TIME))
.withTimestampAssigner((event, timestamp) -> Convert.toLong(event.get("capture_time")) * 1000))
+ .setParallelism(TRANSFORM_PARALLELISM)
.flatMap(new DnsSplitFlatMapFunction())
+ .setParallelism(TRANSFORM_PARALLELISM)
.keyBy(new DnsGraphKeysSelector())
.window(TumblingProcessingTimeWindows.of(Time.seconds(LOG_AGGREGATE_DURATION)))
.process(new DnsRelationProcessFunction())
+ .setParallelism(TRANSFORM_PARALLELISM)
.filter(Objects::nonNull)
.setParallelism(TRANSFORM_PARALLELISM);
//raw DNS logs into ClickHouse
- dnsSource.filter(Objects::nonNull)
- .setParallelism(FlowWriteConfig.SINK_PARALLELISM)
- .windowAll(TumblingProcessingTimeWindows.of(Time.seconds(SINK_BATCH_DELAY_TIME))).apply(new CKBatchWindow())
- .addSink(new ClickhouseSink(SINK_CK_TABLE_DNS))
- .setParallelism(FlowWriteConfig.SINK_PARALLELISM)
- .name("CKSink");
+ if (SINK_CK_RAW_LOG_INSERT_OPEN == 1) {
+ dnsSource.windowAll(TumblingProcessingTimeWindows.of(Time.milliseconds(SINK_CK_BATCH_DELAY_TIME)))
+ .trigger(new CountTriggerWithTimeout<>(SINK_CK_TABLE_DNS, CK_BATCH, TimeCharacteristic.ProcessingTime))
+ .apply(new CKBatchWindow())
+ .addSink(new ClickhouseSink(SINK_CK_TABLE_DNS))
+ .setParallelism(FlowWriteConfig.SINK_PARALLELISM)
+ .name("CKSink");
+ }
//DNS relation logs (after splitting) into ClickHouse
- dnsTransform.windowAll(TumblingProcessingTimeWindows.of(Time.seconds(SINK_BATCH_DELAY_TIME))).apply(new CKBatchWindow())
+ dnsTransform.windowAll(TumblingProcessingTimeWindows.of(Time.milliseconds(SINK_CK_BATCH_DELAY_TIME)))
+ .trigger(new CountTriggerWithTimeout<>(SINK_CK_TABLE_RELATION_DNS, CK_BATCH, TimeCharacteristic.ProcessingTime))
+ .apply(new CKBatchWindow())
.addSink(new ClickhouseSink(SINK_CK_TABLE_RELATION_DNS))
.setParallelism(SINK_PARALLELISM)
.name("CKSink");
@@ -125,7 +160,10 @@ public class LogFlowWriteTopology {
for (DnsType dnsEnum : DnsType.values()) {
dnsGraph.filter(x -> ObjectUtil.equal(dnsEnum.getType(), x.get("record_type")))
- .windowAll(TumblingProcessingTimeWindows.of(Time.seconds(SINK_BATCH_DELAY_TIME))).apply(new ArangodbBatchDnsWindow())
+ .setParallelism(SINK_PARALLELISM)
+ .windowAll(TumblingProcessingTimeWindows.of(Time.milliseconds(SINK_ARANGODB_BATCH_DELAY_TIME)))
+ .trigger(new CountTriggerWithTimeout<>(dnsEnum.getSink(), ARANGODB_BATCH, TimeCharacteristic.ProcessingTime))
+ .apply(new ArangodbBatchDnsWindow())
.addSink(new ArangoDBSink(dnsEnum.getSink()))
.setParallelism(SINK_PARALLELISM)
.name("ArangodbSink");
diff --git a/src/main/java/com/zdjizhi/utils/arangodb/ArangoDBConnect.java b/src/main/java/com/zdjizhi/utils/arangodb/ArangoDBConnect.java
index caf4e79..4902e5f 100644
--- a/src/main/java/com/zdjizhi/utils/arangodb/ArangoDBConnect.java
+++ b/src/main/java/com/zdjizhi/utils/arangodb/ArangoDBConnect.java
@@ -29,7 +29,7 @@ public class ArangoDBConnect {
private static void getArangoDB() {
arangoDB = new ArangoDB.Builder()
- .maxConnections(FlowWriteConfig.THREAD_POOL_NUMBER)
+ .maxConnections(FlowWriteConfig.ARANGODB_THREAD_POOL_NUMBER)
.host(FlowWriteConfig.ARANGODB_HOST, FlowWriteConfig.ARANGODB_PORT)
.user(FlowWriteConfig.ARANGODB_USER)
.password(FlowWriteConfig.ARANGODB_PASSWORD)
diff --git a/src/main/java/com/zdjizhi/utils/ck/BalancedClickhouseDataSource.java b/src/main/java/com/zdjizhi/utils/ck/BalancedClickhouseDataSource.java
deleted file mode 100644
index 2449244..0000000
--- a/src/main/java/com/zdjizhi/utils/ck/BalancedClickhouseDataSource.java
+++ /dev/null
@@ -1,335 +0,0 @@
-/*
-package com.zdjizhi.utils.ck;
-
-import org.apache.commons.lang3.StringUtils;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import ru.yandex.clickhouse.ClickHouseConnection;
-import ru.yandex.clickhouse.ClickHouseDriver;
-import ru.yandex.clickhouse.ClickhouseJdbcUrlParser;
-import ru.yandex.clickhouse.settings.ClickHouseProperties;
-
-import javax.sql.DataSource;
-import java.io.PrintWriter;
-import java.net.URISyntaxException;
-import java.sql.SQLException;
-import java.sql.SQLFeatureNotSupportedException;
-import java.util.*;
-import java.util.concurrent.TimeUnit;
-import java.util.regex.Matcher;
-import java.util.regex.Pattern;
-
-*/
-/**
- * DataSource implementation that load-balances across ClickHouse nodes
- *//*
-
-public class BalancedClickhouseDataSource implements DataSource {
- private static final Logger log = LoggerFactory.getLogger(BalancedClickhouseDataSource.class);
- private static final Pattern URL_TEMPLATE = Pattern.compile("jdbc:clickhouse://([a-zA-Z0-9_:,.-]+)(/[a-zA-Z0-9_]+([?][a-zA-Z0-9_]+[=][a-zA-Z0-9_]+([&][a-zA-Z0-9_]+[=][a-zA-Z0-9_]+)*)?)?");
- private PrintWriter printWriter;
- private int loginTimeoutSeconds;
- //per-thread random number generator
- private final ThreadLocal<Random> randomThreadLocal;
- //all configured urls
- private final List<String> allUrls;
- //urls currently considered reachable
- private volatile List<String> enabledUrls;
- private final ClickHouseProperties properties;
- private final ClickHouseDriver driver;
-
- public BalancedClickhouseDataSource(String url) {
- this(splitUrl(url), getFromUrl(url));
- }
-
- public BalancedClickhouseDataSource(String url, Properties properties) {
- this(splitUrl(url), new ClickHouseProperties(properties));
- }
-
- public BalancedClickhouseDataSource(String url, ClickHouseProperties properties) {
- this(splitUrl(url), properties.merge(getFromUrlWithoutDefault(url)));
- }
-
- private BalancedClickhouseDataSource(List<String> urls) {
- this(urls, new ClickHouseProperties());
- }
-
- private BalancedClickhouseDataSource(List<String> urls, Properties info) {
- this(urls, new ClickHouseProperties(info));
- }
-
- private BalancedClickhouseDataSource(List<String> urls, ClickHouseProperties properties) {
- this.loginTimeoutSeconds = 0;
- this.randomThreadLocal = new ThreadLocal();
- this.driver = new ClickHouseDriver();
- if (urls.isEmpty()) {
- throw new IllegalArgumentException("Incorrect ClickHouse jdbc url list. It must be not empty");
- } else {
- try {
- //parse connection properties from the first url
- ClickHouseProperties localProperties = ClickhouseJdbcUrlParser.parse((String)urls.get(0), properties.asProperties());
- localProperties.setHost((String)null);
- localProperties.setPort(-1);
- this.properties = localProperties;
- } catch (URISyntaxException var8) {
- throw new IllegalArgumentException(var8);
- }
-
- List<String> allUrls = new ArrayList(urls.size());
- Iterator var4 = urls.iterator();
-
- while(var4.hasNext()) {
- String url = (String)var4.next();
-
- try {
- //if the url is valid
- if (this.driver.acceptsURL(url)) {
- //add it to the list of all urls
- allUrls.add(url);
- } else {
- log.error("that url is has not correct format: {}", url);
- }
- } catch (SQLException var7) {
- throw new IllegalArgumentException("error while checking url: " + url, var7);
- }
- }
-
- if (allUrls.isEmpty()) {
- throw new IllegalArgumentException("there are no correct urls");
- } else {
- //all urls
- this.allUrls = Collections.unmodifiableList(allUrls);
- //initially every url counts as enabled
- this.enabledUrls = this.allUrls;
- }
- }
- }
-
- */
-/**
- * Split a comma-separated jdbc url into one url per host
- * @param url
- * @return
- *//*
-
- static List<String> splitUrl(String url) {
- //validate the url format
- Matcher m = URL_TEMPLATE.matcher(url);
- if (!m.matches()) {
- throw new IllegalArgumentException("Incorrect url");
- } else {
- String database = m.group(2);
- if (database == null) {
- database = "";
- }
-
- //split the host list
- String[] hosts = m.group(1).split(",");
- List<String> result = new ArrayList(hosts.length);
- String[] var5 = hosts;
- int var6 = hosts.length;
-
- //build one jdbc url per host
- for(int var7 = 0; var7 < var6; ++var7) {
- String host = var5[var7];
- result.add("jdbc:clickhouse://" + host + database);
- }
-
- return result;
- }
- }
-
- */
-/**
- * Ping a url to check whether it is reachable
- * @param url
- * @return
- *//*
-
- private boolean ping(String url) {
- try {
- //run a trivial query to test the connection
- this.driver.connect(url, this.properties).createStatement().execute("SELECT 1");
- return true;
- } catch (Exception var3) {
- return false;
- }
- }
-
- */
-/**
- * Ping every url and rebuild the list of enabled urls
- * @return
- *//*
-
- public synchronized int actualize() {
- //fresh list of enabled urls
- List<String> enabledUrls = new ArrayList(this.allUrls.size());
- Iterator var2 = this.allUrls.iterator();
-
- while(var2.hasNext()) {
- String url = (String)var2.next();
- log.debug("Pinging disabled url: {}", url);
- if (this.ping(url)) {
- log.debug("Url is alive now: {}", url);
- //keep only urls that answer the ping
- enabledUrls.add(url);
- } else {
- log.debug("Url is dead now: {}", url);
- }
- }
-
- //swap in the refreshed enabled url list
- this.enabledUrls = Collections.unmodifiableList(enabledUrls);
- return enabledUrls.size();
- }
-
- */
-/**
- * Return a randomly chosen enabled url
- * @return
- * @throws java.sql.SQLException
- *//*
-
- private String getAnyUrl() throws SQLException {
- //snapshot of the enabled url list
- List<String> localEnabledUrls = this.enabledUrls;
- if (localEnabledUrls.isEmpty()) {
- throw new SQLException("Unable to get connection: there are no enabled urls");
- } else {
- Random random = (Random)this.randomThreadLocal.get();
- if (random == null) {
- this.randomThreadLocal.set(new Random());
- //generate a random number
- random = (Random)this.randomThreadLocal.get();
- }
-
- int index = random.nextInt(localEnabledUrls.size());
- //use the random index to pick an enabled url
- return (String)localEnabledUrls.get(index);
- }
- }
-
- public ClickHouseConnection getConnection() throws SQLException {
- return this.driver.connect(this.getAnyUrl(), this.properties);
- }
-
- public ClickHouseConnection getConnection(String username, String password) throws SQLException {
- return this.driver.connect(this.getAnyUrl(), this.properties.withCredentials(username, password));
- }
-
- public <T> T unwrap(Class<T> iface) throws SQLException {
- if (iface.isAssignableFrom(this.getClass())) {
- return iface.cast(this);
- } else {
- throw new SQLException("Cannot unwrap to " + iface.getName());
- }
- }
-
- public boolean isWrapperFor(Class<?> iface) throws SQLException {
- return iface.isAssignableFrom(this.getClass());
- }
-
- public PrintWriter getLogWriter() throws SQLException {
- return this.printWriter;
- }
-
- public void setLogWriter(PrintWriter printWriter) throws SQLException {
- this.printWriter = printWriter;
- }
-
- public void setLoginTimeout(int seconds) throws SQLException {
- this.loginTimeoutSeconds = seconds;
- }
-
- public int getLoginTimeout() throws SQLException {
- return this.loginTimeoutSeconds;
- }
-
- public java.util.logging.Logger getParentLogger() throws SQLFeatureNotSupportedException {
- throw new SQLFeatureNotSupportedException();
- }
-
- */
-/**
- * Periodically clean up idle connections
- * @param rate
- * @param timeUnit
- * @return
- *//*
-
- public BalancedClickhouseDataSource withConnectionsCleaning(int rate, TimeUnit timeUnit) {
- this.driver.scheduleConnectionsCleaning(rate, timeUnit);
- return this;
- }
-
- */
-/**
- * Periodically re-check the urls via a scheduled task to keep the enabled list current
- * @param delay
- * @param timeUnit
- * @return
- *//*
-
- public BalancedClickhouseDataSource scheduleActualization(int delay, TimeUnit timeUnit) {
- ScheduledConnectionCleaner.INSTANCE.scheduleWithFixedDelay(new Runnable() {
- public void run() {
- try {
- BalancedClickhouseDataSource.this.actualize();
- } catch (Exception var2) {
- BalancedClickhouseDataSource.log.error("Unable to actualize urls", var2);
- }
-
- }
- }, 0L, (long)delay, timeUnit);
- return this;
- }
-
- public List<String> getAllClickhouseUrls() {
- return this.allUrls;
- }
-
- public List<String> getEnabledClickHouseUrls() {
- return this.enabledUrls;
- }
-
- */
-/**
- * Return the set of disabled urls,
- * computed as the difference between all urls and the enabled ones
- *
- * @return
- *//*
-
- public List<String> getDisabledUrls() {
- List<String> enabledUrls = this.enabledUrls;
- if (!this.hasDisabledUrls()) {
- return Collections.emptyList();
- } else {
- List<String> disabledUrls = new ArrayList(this.allUrls);
- disabledUrls.removeAll(enabledUrls);
- return disabledUrls;
- }
- }
-
- public boolean hasDisabledUrls() {
- return this.allUrls.size() != this.enabledUrls.size();
- }
-
- public ClickHouseProperties getProperties() {
- return this.properties;
- }
-
- private static ClickHouseProperties getFromUrl(String url) {
- return new ClickHouseProperties(getFromUrlWithoutDefault(url));
- }
-
- private static Properties getFromUrlWithoutDefault(String url) {
- if (StringUtils.isBlank(url)) {
- return new Properties();
- } else {
- int index = url.indexOf("?");
- return index == -1 ? new Properties() : ClickhouseJdbcUrlParser.parseUriQueryPart(url.substring(index + 1), new Properties());
- }
- }
-}*/
diff --git a/src/main/java/com/zdjizhi/utils/ck/ClickhouseSink.java b/src/main/java/com/zdjizhi/utils/ck/ClickhouseSink.java
index d3e14cc..7f9c63d 100644
--- a/src/main/java/com/zdjizhi/utils/ck/ClickhouseSink.java
+++ b/src/main/java/com/zdjizhi/utils/ck/ClickhouseSink.java
@@ -50,9 +50,8 @@ public class ClickhouseSink extends RichSinkFunction<List<Map<String, Object>>>
public void open(Configuration parameters) throws Exception {
super.open(parameters);
try {
-// Class.forName("ru.yandex.clickhouse.ClickHouseDriver");
-// connection = DriverManager.getConnection("jdbc:clickhouse://" + CK_HOSTS + "/" + CK_DATABASE, CK_USERNAME, CK_PIN);
ClickHouseProperties properties = new ClickHouseProperties();
+
properties.setDatabase(CK_DATABASE);
properties.setUser(CK_USERNAME);
properties.setPassword(CK_PIN);
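The hunk ends inside open(); judging from the identical connection pattern in the deleted ClickhouseUtil further down, the method presumably continues roughly as follows (a sketch under that assumption, not shown in this hunk):

    // Sketch only; assumed to mirror the deleted ClickhouseUtil.getConnection().
    properties.setConnectionTimeout(CK_CONNECTION_TIMEOUT);
    properties.setSocketTimeout(CK_SOCKET_TIMEOUT);
    BalancedClickhouseDataSource dataSource =
            new BalancedClickhouseDataSource("jdbc:clickhouse://" + CK_HOSTS, properties);
    connection = dataSource.getConnection();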
diff --git a/src/main/java/com/zdjizhi/utils/ck/ClickhouseUtil.java b/src/main/java/com/zdjizhi/utils/ck/ClickhouseUtil.java
deleted file mode 100644
index 975731c..0000000
--- a/src/main/java/com/zdjizhi/utils/ck/ClickhouseUtil.java
+++ /dev/null
@@ -1,52 +0,0 @@
-package com.zdjizhi.utils.ck;
-
-import cn.hutool.core.io.IoUtil;
-import cn.hutool.log.Log;
-import cn.hutool.log.LogFactory;
-import ru.yandex.clickhouse.BalancedClickhouseDataSource;
-import ru.yandex.clickhouse.settings.ClickHouseProperties;
-
-import java.sql.Connection;
-import java.sql.SQLException;
-
-import static com.zdjizhi.common.FlowWriteConfig.*;
-
-/**
- * @description:
- * @author: zhq
- * @create: 2022-07-10
- **/
-public class ClickhouseUtil {
-
- private static final Log log = LogFactory.get();
-
- private static Connection connection;
-
-
- public static Connection getConnection() {
- try {
-// Class.forName("ru.yandex.clickhouse.ClickHouseDriver");
-// connection = DriverManager.getConnection("jdbc:clickhouse://" + CK_HOSTS + "/" + CK_DATABASE, CK_USERNAME, CK_PIN);
- ClickHouseProperties properties = new ClickHouseProperties();
- properties.setDatabase(CK_DATABASE);
- properties.setUser(CK_USERNAME);
- properties.setPassword(CK_PIN);
- properties.setConnectionTimeout(CK_CONNECTION_TIMEOUT);
- properties.setSocketTimeout(CK_SOCKET_TIMEOUT);
-
- BalancedClickhouseDataSource dataSource = new BalancedClickhouseDataSource("jdbc:clickhouse://" + CK_HOSTS, properties);
- connection = dataSource.getConnection();
-
- log.info("get clickhouse connection success");
- return connection;
- } catch (SQLException e) {
- log.error("clickhouse connection error ,{}", e);
- }
- return null;
- }
-
- public static void close() {
- IoUtil.close(connection);
- }
-
-}