| field | value | date |
|---|---|---|
| author | zhanghongqing <[email protected]> | 2022-07-18 18:08:47 +0800 |
| committer | zhanghongqing <[email protected]> | 2022-07-18 18:08:47 +0800 |
| commit | fa3f628658fc4d9197053e8755c685b3d80ff3b5 (patch) | |
| tree | b550a0f3a7a24b0b09b517bb574aff8a43bd0250 /src | |
| parent | 95eefbd8b791f91f2b38e335dd77ce2816d81a1c (diff) | |
Code optimization: add batch-size parameters for database writes
Diffstat (limited to 'src')
7 files changed, 206 insertions, 422 deletions
diff --git a/src/main/java/com/zdjizhi/common/FlowWriteConfig.java b/src/main/java/com/zdjizhi/common/FlowWriteConfig.java
index 3b68285..7889c88 100644
--- a/src/main/java/com/zdjizhi/common/FlowWriteConfig.java
+++ b/src/main/java/com/zdjizhi/common/FlowWriteConfig.java
@@ -44,8 +44,8 @@ public class FlowWriteConfig {
     public static final Integer SOURCE_PARALLELISM = FlowWriteConfigurations.getIntProperty(0, "source.parallelism");
     public static final Integer SINK_PARALLELISM = FlowWriteConfigurations.getIntProperty(0, "sink.parallelism");
     public static final Integer TRANSFORM_PARALLELISM = FlowWriteConfigurations.getIntProperty(0, "transform.parallelism");
-    public static final Integer DATA_CENTER_ID_NUM = FlowWriteConfigurations.getIntProperty(0, "data.center.id.num");
-    public static final Integer LOG_NEED_COMPLETE = FlowWriteConfigurations.getIntProperty(0, "log.need.complete");
+    public static final Integer DATA_CENTER_ID_NUM = 0;
+    public static final Integer LOG_TYPE = FlowWriteConfigurations.getIntProperty(0, "log.type");
     public static final String MAIL_DEFAULT_CHARSET = FlowWriteConfigurations.getStringProperty(1, "mail.default.charset");
     public static final Integer LOG_TRANSFORM_TYPE = FlowWriteConfigurations.getIntProperty(1, "log.transform.type");
     public static final Integer BUFFER_TIMEOUT = FlowWriteConfigurations.getIntProperty(1, "buffer.timeout");
@@ -85,7 +85,7 @@ public class FlowWriteConfig {
      */
     public static final String SOURCE_KAFKA_SERVERS = FlowWriteConfigurations.getStringProperty(0, "source.kafka.servers");
     public static final String SINK_KAFKA_SERVERS = FlowWriteConfigurations.getStringProperty(0, "sink.kafka.servers");
-    public static final String ZOOKEEPER_SERVERS = FlowWriteConfigurations.getStringProperty(0, "zookeeper.servers");
+    public static final String ZOOKEEPER_SERVERS = "zookeeper.servers";
 
     public static final String TOOLS_LIBRARY = FlowWriteConfigurations.getStringProperty(0, "tools.library");
 
@@ -114,22 +114,24 @@ public class FlowWriteConfig {
     public static final String SINK_CK_TABLE_RELATION_CONNECTION = FlowWriteConfigurations.getStringProperty(0, "sink.ck.table.relation.connection");
     public static final String SINK_CK_TABLE_RELATION_DNS = FlowWriteConfigurations.getStringProperty(0, "sink.ck.table.relation.dns");
 
-    public static final String R_VISIT_IP2IP = FlowWriteConfigurations.getStringProperty(0, "sink.arango.table.r.visit.ip2ip");
-    public static final String R_CNAME_DOMAIN2DOMAIN = FlowWriteConfigurations.getStringProperty(0, "sink.arango.table.r.cname.domain2domain");
-    public static final String R_MX_DOMAIN2DOMAIN = FlowWriteConfigurations.getStringProperty(0, "sink.arango.table.r.mx.domain2domain");
-    public static final String R_RESOLVE_DOMAIN2IP = FlowWriteConfigurations.getStringProperty(0, "sink.arango.table.r.resolve.domain2ip");
-    public static final String R_NX_DOMAIN2DOMAIN = FlowWriteConfigurations.getStringProperty(0, "sink.arango.table.r.nx.domain2domain");
+    public static final String R_VISIT_IP2IP = FlowWriteConfigurations.getStringProperty(0, "sink.arangodb.table.r.visit.ip2ip");
+    public static final String R_CNAME_DOMAIN2DOMAIN = FlowWriteConfigurations.getStringProperty(0, "sink.arangodb.table.r.cname.domain2domain");
+    public static final String R_MX_DOMAIN2DOMAIN = FlowWriteConfigurations.getStringProperty(0, "sink.arangodb.table.r.mx.domain2domain");
+    public static final String R_RESOLVE_DOMAIN2IP = FlowWriteConfigurations.getStringProperty(0, "sink.arangodb.table.r.resolve.domain2ip");
+    public static final String R_NX_DOMAIN2DOMAIN = FlowWriteConfigurations.getStringProperty(0, "sink.arangodb.table.r.nx.domain2domain");
 
-    public static final String ARANGODB_HOST = FlowWriteConfigurations.getStringProperty(0, "arangoDB.host");
-    public static final Integer ARANGODB_PORT = FlowWriteConfigurations.getIntProperty(0, "arangoDB.port");
-    public static final String ARANGODB_USER = FlowWriteConfigurations.getStringProperty(0, "arangoDB.user");
-    public static final String ARANGODB_PASSWORD = FlowWriteConfigurations.getStringProperty(0, "arangoDB.password");
-    public static final String ARANGODB_DB_NAME = FlowWriteConfigurations.getStringProperty(0, "arangoDB.DB.name");
-    public static final Integer ARANGODB_TTL = FlowWriteConfigurations.getIntProperty(0, "arangoDB.ttl");
-    public static final Integer ARANGODB_BATCH = FlowWriteConfigurations.getIntProperty(0, "arangoDB.batch");
-    public static final Integer THREAD_POOL_NUMBER = FlowWriteConfigurations.getIntProperty(0, "thread.pool.number");
+    public static final String ARANGODB_HOST = FlowWriteConfigurations.getStringProperty(0, "arangodb.host");
+    public static final Integer ARANGODB_PORT = FlowWriteConfigurations.getIntProperty(0, "arangodb.port");
+    public static final String ARANGODB_USER = FlowWriteConfigurations.getStringProperty(0, "arangodb.user");
+    public static final String ARANGODB_PASSWORD = FlowWriteConfigurations.getStringProperty(0, "arangodb.password");
+    public static final String ARANGODB_DB_NAME = FlowWriteConfigurations.getStringProperty(0, "arangodb.db.name");
+    public static final Integer ARANGODB_TTL = FlowWriteConfigurations.getIntProperty(0, "arangodb.ttl");
+    public static final Integer ARANGODB_BATCH = FlowWriteConfigurations.getIntProperty(0, "arangodb.batch");
+    public static final Integer ARANGODB_THREAD_POOL_NUMBER = FlowWriteConfigurations.getIntProperty(0, "arangodb.thread.pool.number");
 
-    public static final Integer SINK_BATCH_DELAY_TIME = FlowWriteConfigurations.getIntProperty(0, "sink.batch.delay.time");
+    public static final Integer SINK_CK_BATCH_DELAY_TIME = FlowWriteConfigurations.getIntProperty(0, "sink.ck.batch.delay.time");
+    public static final Integer SINK_ARANGODB_BATCH_DELAY_TIME = FlowWriteConfigurations.getIntProperty(0, "sink.arangodb.batch.delay.time");
     public static final Integer CK_BATCH = FlowWriteConfigurations.getIntProperty(0, "ck.batch");
+    public static final Integer SINK_CK_RAW_LOG_INSERT_OPEN = FlowWriteConfigurations.getIntProperty(0, "sink.ck.raw.log.insert.open");
 }
\ No newline at end of file
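The new constants above are resolved through `FlowWriteConfigurations`, so each one maps to a key in the job's configuration. A rough sketch of the new entries — the key names come from the hunk above, but the file format and every value here are assumptions, not part of the commit:

```properties
# Batch-size / flush-delay controls introduced by this commit (values illustrative)
ck.batch=10000
sink.ck.batch.delay.time=5000
arangodb.batch=5000
sink.arangodb.batch.delay.time=5000
# 1 = also write raw connection/dns logs to ClickHouse, 0 = skip them
sink.ck.raw.log.insert.open=1
# renamed from thread.pool.number
arangodb.thread.pool.number=8
```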
diff --git a/src/main/java/com/zdjizhi/etl/CountTriggerWithTimeout.java b/src/main/java/com/zdjizhi/etl/CountTriggerWithTimeout.java
new file mode 100644
index 0000000..ba348bd
--- /dev/null
+++ b/src/main/java/com/zdjizhi/etl/CountTriggerWithTimeout.java
@@ -0,0 +1,132 @@
+package com.zdjizhi.etl;
+
+import cn.hutool.log.Log;
+import cn.hutool.log.LogFactory;
+import org.apache.flink.api.common.functions.ReduceFunction;
+import org.apache.flink.api.common.state.ReducingState;
+import org.apache.flink.api.common.state.ReducingStateDescriptor;
+import org.apache.flink.api.common.typeutils.base.LongSerializer;
+import org.apache.flink.streaming.api.TimeCharacteristic;
+import org.apache.flink.streaming.api.windowing.triggers.Trigger;
+import org.apache.flink.streaming.api.windowing.triggers.TriggerResult;
+import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
+
+
+/**
+ * Count window trigger with a timeout
+ */
+public class CountTriggerWithTimeout<T> extends Trigger<T, TimeWindow> {
+    private static Log logger = LogFactory.get();
+
+    /**
+     * maximum number of elements in the window
+     */
+    private int maxCount;
+    /**
+     * event time / processing time
+     */
+    private TimeCharacteristic timeType;
+
+    private String stateName;
+
+    public String getStateName() {
+        return stateName;
+    }
+
+    public void setStateName(String stateName) {
+        this.stateName = stateName;
+    }
+
+    public CountTriggerWithTimeout(String stateName) {
+        this.stateName = stateName;
+    }
+
+    /**
+     * state descriptor holding the window's current element count
+     */
+    private ReducingStateDescriptor<Long> countStateDescriptor = new ReducingStateDescriptor(getStateName() + "counter", new Sum(), LongSerializer.INSTANCE);
+
+
+    public CountTriggerWithTimeout(String stateName, int maxCount, TimeCharacteristic timeType) {
+        this.maxCount = maxCount;
+        this.timeType = timeType;
+        this.stateName = stateName;
+    }
+
+
+    private TriggerResult fireAndPurge(TimeWindow window, TriggerContext ctx) throws Exception {
+        clear(window, ctx);
+        return TriggerResult.FIRE_AND_PURGE;
+    }
+
+
+    @Override
+    public TriggerResult onElement(T element, long timestamp, TimeWindow window, TriggerContext ctx) throws Exception {
+        ReducingState<Long> countState = ctx.getPartitionedState(countStateDescriptor);
+        countState.add(1L);
+
+        if (countState.get() >= maxCount) {
+            logger.info("fire with count: " + countState.get());
+            return fireAndPurge(window, ctx);
+        }
+        if (timestamp >= window.getEnd()) {
+            logger.info("fire with time: " + timestamp);
+            return fireAndPurge(window, ctx);
+        } else {
+            return TriggerResult.CONTINUE;
+        }
+    }
+
+
+    @Override
+    public TriggerResult onProcessingTime(long time, TimeWindow window, TriggerContext ctx) throws Exception {
+        if (timeType != TimeCharacteristic.ProcessingTime) {
+            return TriggerResult.CONTINUE;
+        }
+
+        if (time >= window.getEnd()) {
+            return TriggerResult.CONTINUE;
+        } else {
+            logger.info("fire with process time: " + time);
+            return fireAndPurge(window, ctx);
+        }
+    }
+
+
+    @Override
+    public TriggerResult onEventTime(long time, TimeWindow window, TriggerContext ctx) throws Exception {
+        if (timeType != TimeCharacteristic.EventTime) {
+            return TriggerResult.CONTINUE;
+        }
+
+        if (time >= window.getEnd()) {
+            return TriggerResult.CONTINUE;
+        } else {
+            logger.info("fire with event time: " + time);
+            return fireAndPurge(window, ctx);
+        }
+    }
+
+
+    @Override
+    public void clear(TimeWindow window, TriggerContext ctx) throws Exception {
+        ReducingState<Long> countState = ctx.getPartitionedState(countStateDescriptor);
+        countState.clear();
+    }
+
+    /**
+     * counting (sum) function
+     */
+    class Sum implements ReduceFunction<Long> {
+
+        @Override
+        public Long reduce(Long value1, Long value2) throws Exception {
+            return value1 + value2;
+        }
+    }
+}
\ No newline at end of file
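This trigger is what turns a plain processing-time `windowAll` into a bounded batch: it fires and purges as soon as `maxCount` elements have accumulated, and the window end acts as the timeout. A minimal wiring sketch, assuming the repository's own `CKBatchWindow` and `ClickhouseSink` classes; the literal values stand in for `CK_BATCH` and `SINK_CK_BATCH_DELAY_TIME`, and the "conn_log" name is a placeholder:

```java
import java.util.Map;

import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;

import com.zdjizhi.etl.CKBatchWindow;
import com.zdjizhi.etl.CountTriggerWithTimeout;
import com.zdjizhi.utils.ck.ClickhouseSink;

class BatchSinkSketch {
    // Flush to ClickHouse after 10_000 elements, or at the 5 s window end, whichever comes first.
    static void wire(DataStream<Map<String, Object>> stream) {
        stream.windowAll(TumblingProcessingTimeWindows.of(Time.milliseconds(5_000)))
                // fire-and-purge once the ReducingState count reaches maxCount
                .trigger(new CountTriggerWithTimeout<>("conn_log", 10_000, TimeCharacteristic.ProcessingTime))
                // collapse the fired pane into one List<Map<String, Object>> batch
                .apply(new CKBatchWindow())
                .addSink(new ClickhouseSink("conn_log"));
    }
}
```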
diff --git a/src/main/java/com/zdjizhi/topology/LogFlowWriteTopology.java b/src/main/java/com/zdjizhi/topology/LogFlowWriteTopology.java
index 7ca7acb..e7ac08e 100644
--- a/src/main/java/com/zdjizhi/topology/LogFlowWriteTopology.java
+++ b/src/main/java/com/zdjizhi/topology/LogFlowWriteTopology.java
@@ -7,12 +7,14 @@ import cn.hutool.log.LogFactory;
 import com.zdjizhi.common.FlowWriteConfig;
 import com.zdjizhi.enums.DnsType;
 import com.zdjizhi.etl.CKBatchWindow;
+import com.zdjizhi.etl.CountTriggerWithTimeout;
 import com.zdjizhi.etl.connection.*;
 import com.zdjizhi.etl.dns.*;
 import com.zdjizhi.utils.arangodb.ArangoDBSink;
 import com.zdjizhi.utils.ck.ClickhouseSink;
 import com.zdjizhi.utils.kafka.KafkaConsumer;
 import org.apache.flink.api.common.eventtime.WatermarkStrategy;
+import org.apache.flink.streaming.api.TimeCharacteristic;
 import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
@@ -35,9 +37,10 @@ public class LogFlowWriteTopology {
         env.setBufferTimeout(FlowWriteConfig.BUFFER_TIMEOUT);
 
         //1 connection, 2 dns
-        if (FlowWriteConfig.LOG_NEED_COMPLETE == 1) {
+        if (FlowWriteConfig.LOG_TYPE == 1) {
             //connection
             DataStream<Map<String, Object>> connSource = env.addSource(KafkaConsumer.myDeserializationConsumer(SOURCE_KAFKA_TOPIC_CONNECTION))
+                    .setParallelism(FlowWriteConfig.SOURCE_PARALLELISM)
                     .filter(Objects::nonNull)
                     .setParallelism(SOURCE_PARALLELISM)
                     .name(SOURCE_KAFKA_TOPIC_CONNECTION);
@@ -53,9 +56,11 @@ public class LogFlowWriteTopology {
                     .assignTimestampsAndWatermarks(WatermarkStrategy
                             .<Map<String, Object>>forBoundedOutOfOrderness(Duration.ofSeconds(FlowWriteConfig.FLINK_WATERMARK_MAX_DELAY_TIME))
                             .withTimestampAssigner((event, timestamp) -> Convert.toLong(event.get("conn_start_time")) * 1000))
+                    .setParallelism(FlowWriteConfig.TRANSFORM_PARALLELISM)
                     .keyBy(new IpKeysSelector())
                     .window(TumblingProcessingTimeWindows.of(Time.seconds(FlowWriteConfig.LOG_AGGREGATE_DURATION)))
                     .process(new ConnProcessFunction())
+                    .setParallelism(FlowWriteConfig.TRANSFORM_PARALLELISM)
                     .filter(x -> Objects.nonNull(x))
                     .setParallelism(FlowWriteConfig.TRANSFORM_PARALLELISM);
@@ -74,20 +79,43 @@ public class LogFlowWriteTopology {
                     .keyBy(new IpKeysSelector())
                     .window(TumblingProcessingTimeWindows.of(Time.seconds(LOG_AGGREGATE_DURATION_GRAPH)))
                     .process(new Ip2IpGraphProcessFunction())
+                    .setParallelism(TRANSFORM_PARALLELISM)
                     .filter(Objects::nonNull)
                     .setParallelism(TRANSFORM_PARALLELISM);
 
             //write to the ClickHouse sink in batches
-            connSource.windowAll(TumblingProcessingTimeWindows.of(Time.seconds(SINK_BATCH_DELAY_TIME))).apply(new CKBatchWindow()).addSink(new ClickhouseSink(SINK_CK_TABLE_CONNECTION)).name("CKSink");
-            sketchSource.windowAll(TumblingProcessingTimeWindows.of(Time.seconds(SINK_BATCH_DELAY_TIME))).apply(new CKBatchWindow()).addSink(new ClickhouseSink(SINK_CK_TABLE_SKETCH)).name("CKSink");
-            sketchTransformStream.windowAll(TumblingProcessingTimeWindows.of(Time.seconds(SINK_BATCH_DELAY_TIME))).apply(new CKBatchWindow()).addSink(new ClickhouseSink(SINK_CK_TABLE_RELATION_CONNECTION)).name("CKSink");
+            if (SINK_CK_RAW_LOG_INSERT_OPEN == 1) {
+                connSource.windowAll(TumblingProcessingTimeWindows.of(Time.milliseconds(SINK_CK_BATCH_DELAY_TIME)))
+                        .trigger(new CountTriggerWithTimeout<>(SINK_CK_TABLE_CONNECTION, CK_BATCH, TimeCharacteristic.ProcessingTime))
+                        .apply(new CKBatchWindow())
+                        .addSink(new ClickhouseSink(SINK_CK_TABLE_CONNECTION))
+                        .setParallelism(FlowWriteConfig.SINK_PARALLELISM)
+                        .name("CKSink");
+
+                sketchSource.windowAll(TumblingProcessingTimeWindows.of(Time.milliseconds(SINK_CK_BATCH_DELAY_TIME)))
+                        .trigger(new CountTriggerWithTimeout<>(SINK_CK_TABLE_SKETCH, CK_BATCH, TimeCharacteristic.ProcessingTime))
+                        .apply(new CKBatchWindow())
+                        .addSink(new ClickhouseSink(SINK_CK_TABLE_SKETCH))
+                        .setParallelism(FlowWriteConfig.SINK_PARALLELISM)
+                        .name("CKSink");
+            }
+            sketchTransformStream.windowAll(TumblingProcessingTimeWindows.of(Time.milliseconds(SINK_CK_BATCH_DELAY_TIME)))
+                    .trigger(new CountTriggerWithTimeout<>(SINK_CK_TABLE_RELATION_CONNECTION, CK_BATCH, TimeCharacteristic.ProcessingTime))
+                    .apply(new CKBatchWindow())
+                    .addSink(new ClickhouseSink(SINK_CK_TABLE_RELATION_CONNECTION))
+                    .setParallelism(FlowWriteConfig.SINK_PARALLELISM)
+                    .name("CKSink");
 
             //write to ArangoDB
-            ip2ipGraph.windowAll(TumblingProcessingTimeWindows.of(Time.seconds(SINK_BATCH_DELAY_TIME))).apply(new ArangodbBatchIPWindow()).addSink(new ArangoDBSink(R_VISIT_IP2IP)).name(R_VISIT_IP2IP);
-
-        } else if (FlowWriteConfig.LOG_NEED_COMPLETE == 2) {
+            ip2ipGraph.windowAll(TumblingProcessingTimeWindows.of(Time.milliseconds(SINK_ARANGODB_BATCH_DELAY_TIME)))
+                    .trigger(new CountTriggerWithTimeout<>(R_VISIT_IP2IP, ARANGODB_BATCH, TimeCharacteristic.ProcessingTime))
+                    .apply(new ArangodbBatchIPWindow())
+                    .addSink(new ArangoDBSink(R_VISIT_IP2IP))
+                    .setParallelism(FlowWriteConfig.SINK_PARALLELISM).name(R_VISIT_IP2IP);
+        } else if (FlowWriteConfig.LOG_TYPE == 2) {
             DataStream<Map<String, Object>> dnsSource = env.addSource(KafkaConsumer.myDeserializationConsumer(SOURCE_KAFKA_TOPIC_DNS))
+                    .setParallelism(FlowWriteConfig.SOURCE_PARALLELISM)
                     .filter(Objects::nonNull)
                     .map(new DnsMapFunction())
                     .setParallelism(FlowWriteConfig.SOURCE_PARALLELISM)
@@ -96,23 +124,30 @@ public class LogFlowWriteTopology {
             DataStream<Map<String, Object>> dnsTransform = dnsSource.assignTimestampsAndWatermarks(WatermarkStrategy
                     .<Map<String, Object>>forBoundedOutOfOrderness(Duration.ofSeconds(FLINK_WATERMARK_MAX_DELAY_TIME))
                     .withTimestampAssigner((event, timestamp) -> Convert.toLong(event.get("capture_time")) * 1000))
+                    .setParallelism(TRANSFORM_PARALLELISM)
                     .flatMap(new DnsSplitFlatMapFunction())
+                    .setParallelism(TRANSFORM_PARALLELISM)
                     .keyBy(new DnsGraphKeysSelector())
                     .window(TumblingProcessingTimeWindows.of(Time.seconds(LOG_AGGREGATE_DURATION)))
                     .process(new DnsRelationProcessFunction())
+                    .setParallelism(TRANSFORM_PARALLELISM)
                     .filter(Objects::nonNull)
                     .setParallelism(TRANSFORM_PARALLELISM);
 
             //raw dns logs into ClickHouse
-            dnsSource.filter(Objects::nonNull)
-                    .setParallelism(FlowWriteConfig.SINK_PARALLELISM)
-                    .windowAll(TumblingProcessingTimeWindows.of(Time.seconds(SINK_BATCH_DELAY_TIME))).apply(new CKBatchWindow())
-                    .addSink(new ClickhouseSink(SINK_CK_TABLE_DNS))
-                    .setParallelism(FlowWriteConfig.SINK_PARALLELISM)
-                    .name("CKSink");
+            if (SINK_CK_RAW_LOG_INSERT_OPEN == 1) {
+                dnsSource.windowAll(TumblingProcessingTimeWindows.of(Time.milliseconds(SINK_CK_BATCH_DELAY_TIME)))
+                        .trigger(new CountTriggerWithTimeout<>(SINK_CK_TABLE_DNS, CK_BATCH, TimeCharacteristic.ProcessingTime))
+                        .apply(new CKBatchWindow())
+                        .addSink(new ClickhouseSink(SINK_CK_TABLE_DNS))
+                        .setParallelism(FlowWriteConfig.SINK_PARALLELISM)
+                        .name("CKSink");
+            }
 
             //split dns relation logs into ClickHouse
-            dnsTransform.windowAll(TumblingProcessingTimeWindows.of(Time.seconds(SINK_BATCH_DELAY_TIME))).apply(new CKBatchWindow())
+            dnsTransform.windowAll(TumblingProcessingTimeWindows.of(Time.milliseconds(SINK_CK_BATCH_DELAY_TIME)))
+                    .trigger(new CountTriggerWithTimeout<>(SINK_CK_TABLE_RELATION_DNS, CK_BATCH, TimeCharacteristic.ProcessingTime))
+                    .apply(new CKBatchWindow())
                     .addSink(new ClickhouseSink(SINK_CK_TABLE_RELATION_DNS))
                     .setParallelism(SINK_PARALLELISM)
                     .name("CKSink");
@@ -125,7 +160,10 @@ public class LogFlowWriteTopology {
 
             for (DnsType dnsEnum : DnsType.values()) {
                 dnsGraph.filter(x -> ObjectUtil.equal(dnsEnum.getType(), x.get("record_type")))
-                        .windowAll(TumblingProcessingTimeWindows.of(Time.seconds(SINK_BATCH_DELAY_TIME))).apply(new ArangodbBatchDnsWindow())
+                        .setParallelism(SINK_PARALLELISM)
+                        .windowAll(TumblingProcessingTimeWindows.of(Time.milliseconds(SINK_ARANGODB_BATCH_DELAY_TIME)))
+                        .trigger(new CountTriggerWithTimeout<>(dnsEnum.getSink(), ARANGODB_BATCH, TimeCharacteristic.ProcessingTime))
+                        .apply(new ArangodbBatchDnsWindow())
                        .addSink(new ArangoDBSink(dnsEnum.getSink()))
                        .setParallelism(SINK_PARALLELISM)
                        .name("ArangodbSink");
diff --git a/src/main/java/com/zdjizhi/utils/arangodb/ArangoDBConnect.java b/src/main/java/com/zdjizhi/utils/arangodb/ArangoDBConnect.java
index caf4e79..4902e5f 100644
--- a/src/main/java/com/zdjizhi/utils/arangodb/ArangoDBConnect.java
+++ b/src/main/java/com/zdjizhi/utils/arangodb/ArangoDBConnect.java
@@ -29,7 +29,7 @@ public class ArangoDBConnect {
 
     private static void getArangoDB() {
         arangoDB = new ArangoDB.Builder()
-                .maxConnections(FlowWriteConfig.THREAD_POOL_NUMBER)
+                .maxConnections(FlowWriteConfig.ARANGODB_THREAD_POOL_NUMBER)
                 .host(FlowWriteConfig.ARANGODB_HOST, FlowWriteConfig.ARANGODB_PORT)
                 .user(FlowWriteConfig.ARANGODB_USER)
                 .password(FlowWriteConfig.ARANGODB_PASSWORD)
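For context, the renamed constant feeds the ArangoDB client's connection pool. A sketch of the builder call this hunk touches, using the arangodb-java-driver API; the host, port, and credential literals are placeholders for the `FlowWriteConfig` values:

```java
import com.arangodb.ArangoDB;

class ArangoClientSketch {
    static ArangoDB build() {
        return new ArangoDB.Builder()
                .maxConnections(8)        // ARANGODB_THREAD_POOL_NUMBER (key renamed in this commit)
                .host("127.0.0.1", 8529)  // ARANGODB_HOST / ARANGODB_PORT
                .user("root")             // ARANGODB_USER
                .password("secret")       // ARANGODB_PASSWORD
                .build();
    }
}
```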
diff --git a/src/main/java/com/zdjizhi/utils/ck/BalancedClickhouseDataSource.java b/src/main/java/com/zdjizhi/utils/ck/BalancedClickhouseDataSource.java
deleted file mode 100644
index 2449244..0000000
--- a/src/main/java/com/zdjizhi/utils/ck/BalancedClickhouseDataSource.java
+++ /dev/null
@@ -1,335 +0,0 @@
-/*
-package com.zdjizhi.utils.ck;
-
-import org.apache.commons.lang3.StringUtils;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import ru.yandex.clickhouse.ClickHouseConnection;
-import ru.yandex.clickhouse.ClickHouseDriver;
-import ru.yandex.clickhouse.ClickhouseJdbcUrlParser;
-import ru.yandex.clickhouse.settings.ClickHouseProperties;
-
-import javax.sql.DataSource;
-import java.io.PrintWriter;
-import java.net.URISyntaxException;
-import java.sql.SQLException;
-import java.sql.SQLFeatureNotSupportedException;
-import java.util.*;
-import java.util.concurrent.TimeUnit;
-import java.util.regex.Matcher;
-import java.util.regex.Pattern;
-
-*/
-/**
- * DataSource implementation with load-balancing support
- *//*
-
-public class BalancedClickhouseDataSource implements DataSource {
-    private static final Logger log = LoggerFactory.getLogger(BalancedClickhouseDataSource.class);
-    private static final Pattern URL_TEMPLATE = Pattern.compile("jdbc:clickhouse://([a-zA-Z0-9_:,.-]+)(/[a-zA-Z0-9_]+([?][a-zA-Z0-9_]+[=][a-zA-Z0-9_]+([&][a-zA-Z0-9_]+[=][a-zA-Z0-9_]+)*)?)?");
-    private PrintWriter printWriter;
-    private int loginTimeoutSeconds;
-    //random generator
-    private final ThreadLocal<Random> randomThreadLocal;
-    //all urls
-    private final List<String> allUrls;
-    //available urls
-    private volatile List<String> enabledUrls;
-    private final ClickHouseProperties properties;
-    private final ClickHouseDriver driver;
-
-    public BalancedClickhouseDataSource(String url) {
-        this(splitUrl(url), getFromUrl(url));
-    }
-
-    public BalancedClickhouseDataSource(String url, Properties properties) {
-        this(splitUrl(url), new ClickHouseProperties(properties));
-    }
-
-    public BalancedClickhouseDataSource(String url, ClickHouseProperties properties) {
-        this(splitUrl(url), properties.merge(getFromUrlWithoutDefault(url)));
-    }
-
-    private BalancedClickhouseDataSource(List<String> urls) {
-        this(urls, new ClickHouseProperties());
-    }
-
-    private BalancedClickhouseDataSource(List<String> urls, Properties info) {
-        this(urls, new ClickHouseProperties(info));
-    }
-
-    private BalancedClickhouseDataSource(List<String> urls, ClickHouseProperties properties) {
-        this.loginTimeoutSeconds = 0;
-        this.randomThreadLocal = new ThreadLocal();
-        this.driver = new ClickHouseDriver();
-        if (urls.isEmpty()) {
-            throw new IllegalArgumentException("Incorrect ClickHouse jdbc url list. It must be not empty");
-        } else {
-            try {
-                //parse the configuration
-                ClickHouseProperties localProperties = ClickhouseJdbcUrlParser.parse((String)urls.get(0), properties.asProperties());
-                localProperties.setHost((String)null);
-                localProperties.setPort(-1);
-                this.properties = localProperties;
-            } catch (URISyntaxException var8) {
-                throw new IllegalArgumentException(var8);
-            }
-
-            List<String> allUrls = new ArrayList(urls.size());
-            Iterator var4 = urls.iterator();
-
-            while(var4.hasNext()) {
-                String url = (String)var4.next();
-
-                try {
-                    //if the url is valid
-                    if (this.driver.acceptsURL(url)) {
-                        //add it to the list of all urls
-                        allUrls.add(url);
-                    } else {
-                        log.error("that url is has not correct format: {}", url);
-                    }
-                } catch (SQLException var7) {
-                    throw new IllegalArgumentException("error while checking url: " + url, var7);
-                }
-            }
-
-            if (allUrls.isEmpty()) {
-                throw new IllegalArgumentException("there are no correct urls");
-            } else {
-                //all urls
-                this.allUrls = Collections.unmodifiableList(allUrls);
-                //available urls
-                this.enabledUrls = this.allUrls;
-            }
-        }
-    }
-
-    */
-/**
- * split the url
- * @param url
- * @return
- *//*
-
-    static List<String> splitUrl(String url) {
-        //validate the url
-        Matcher m = URL_TEMPLATE.matcher(url);
-        if (!m.matches()) {
-            throw new IllegalArgumentException("Incorrect url");
-        } else {
-            String database = m.group(2);
-            if (database == null) {
-                database = "";
-            }
-
-            //split the url string
-            String[] hosts = m.group(1).split(",");
-            List<String> result = new ArrayList(hosts.length);
-            String[] var5 = hosts;
-            int var6 = hosts.length;
-
-            //iterate and add each split url
-            for(int var7 = 0; var7 < var6; ++var7) {
-                String host = var5[var7];
-                result.add("jdbc:clickhouse://" + host + database);
-            }
-
-            return result;
-        }
-    }
-
-    */
-/**
- * ping a url to check availability
- * @param url
- * @return
- *//*
-
-    private boolean ping(String url) {
-        try {
-            //run a trivial SQL statement to test connectivity
-            this.driver.connect(url, this.properties).createStatement().execute("SELECT 1");
-            return true;
-        } catch (Exception var3) {
-            return false;
-        }
-    }
-
-    */
-/**
- * iterate over all urls and keep the ones that respond to ping
- * @return
- *//*
-
-    public synchronized int actualize() {
-        //build a fresh list of available urls
-        List<String> enabledUrls = new ArrayList(this.allUrls.size());
-        Iterator var2 = this.allUrls.iterator();
-
-        while(var2.hasNext()) {
-            String url = (String)var2.next();
-            log.debug("Pinging disabled url: {}", url);
-            if (this.ping(url)) {
-                log.debug("Url is alive now: {}", url);
-                //only urls that answer the ping are added
-                enabledUrls.add(url);
-            } else {
-                log.debug("Url is dead now: {}", url);
-            }
-        }
-
-        //reset the list of available urls
-        this.enabledUrls = Collections.unmodifiableList(enabledUrls);
-        return enabledUrls.size();
-    }
-
-    */
-/**
- * randomly pick an available url
- * @return
- * @throws java.sql.SQLException
- *//*
-
-    private String getAnyUrl() throws SQLException {
-        //list of available urls
-        List<String> localEnabledUrls = this.enabledUrls;
-        if (localEnabledUrls.isEmpty()) {
-            throw new SQLException("Unable to get connection: there are no enabled urls");
-        } else {
-            Random random = (Random)this.randomThreadLocal.get();
-            if (random == null) {
-                this.randomThreadLocal.set(new Random());
-                //create a random generator
-                random = (Random)this.randomThreadLocal.get();
-            }
-
-            int index = random.nextInt(localEnabledUrls.size());
-            //use the random index to select an available url
-            return (String)localEnabledUrls.get(index);
-        }
-    }
-
-    public ClickHouseConnection getConnection() throws SQLException {
-        return this.driver.connect(this.getAnyUrl(), this.properties);
-    }
-
-    public ClickHouseConnection getConnection(String username, String password) throws SQLException {
-        return this.driver.connect(this.getAnyUrl(), this.properties.withCredentials(username, password));
-    }
-
-    public <T> T unwrap(Class<T> iface) throws SQLException {
-        if (iface.isAssignableFrom(this.getClass())) {
-            return iface.cast(this);
-        } else {
-            throw new SQLException("Cannot unwrap to " + iface.getName());
-        }
-    }
-
-    public boolean isWrapperFor(Class<?> iface) throws SQLException {
-        return iface.isAssignableFrom(this.getClass());
-    }
-
-    public PrintWriter getLogWriter() throws SQLException {
-        return this.printWriter;
-    }
-
-    public void setLogWriter(PrintWriter printWriter) throws SQLException {
-        this.printWriter = printWriter;
-    }
-
-    public void setLoginTimeout(int seconds) throws SQLException {
-        this.loginTimeoutSeconds = seconds;
-    }
-
-    public int getLoginTimeout() throws SQLException {
-        return this.loginTimeoutSeconds;
-    }
-
-    public java.util.logging.Logger getParentLogger() throws SQLFeatureNotSupportedException {
-        throw new SQLFeatureNotSupportedException();
-    }
-
-    */
-/**
- * periodically clean up idle connections
- * @param rate
- * @param timeUnit
- * @return
- *//*
-
-    public BalancedClickhouseDataSource withConnectionsCleaning(int rate, TimeUnit timeUnit) {
-        this.driver.scheduleConnectionsCleaning(rate, timeUnit);
-        return this;
-    }
-
-    */
-/**
- * re-check the urls on a schedule so the list of available urls stays current
- * @param delay
- * @param timeUnit
- * @return
- *//*
-
-    public BalancedClickhouseDataSource scheduleActualization(int delay, TimeUnit timeUnit) {
-        ScheduledConnectionCleaner.INSTANCE.scheduleWithFixedDelay(new Runnable() {
-            public void run() {
-                try {
-                    BalancedClickhouseDataSource.this.actualize();
-                } catch (Exception var2) {
-                    BalancedClickhouseDataSource.log.error("Unable to actualize urls", var2);
-                }
-
-            }
-        }, 0L, (long)delay, timeUnit);
-        return this;
-    }
-
-    public List<String> getAllClickhouseUrls() {
-        return this.allUrls;
-    }
-
-    public List<String> getEnabledClickHouseUrls() {
-        return this.enabledUrls;
-    }
-
-    */
-/**
- * return the set of unavailable urls,
- * computed as the difference between all urls and the available ones
- *
- * @return
- *//*
-
-    public List<String> getDisabledUrls() {
-        List<String> enabledUrls = this.enabledUrls;
-        if (!this.hasDisabledUrls()) {
-            return Collections.emptyList();
-        } else {
-            List<String> disabledUrls = new ArrayList(this.allUrls);
-            disabledUrls.removeAll(enabledUrls);
-            return disabledUrls;
-        }
-    }
-
-    public boolean hasDisabledUrls() {
-        return this.allUrls.size() != this.enabledUrls.size();
-    }
-
-    public ClickHouseProperties getProperties() {
-        return this.properties;
-    }
-
-    private static ClickHouseProperties getFromUrl(String url) {
-        return new ClickHouseProperties(getFromUrlWithoutDefault(url));
-    }
-
-    private static Properties getFromUrlWithoutDefault(String url) {
-        if (StringUtils.isBlank(url)) {
-            return new Properties();
-        } else {
-            int index = url.indexOf("?");
-            return index == -1 ? new Properties() : ClickhouseJdbcUrlParser.parseUriQueryPart(url.substring(index + 1), new Properties());
-        }
-    }
-}*/
diff --git a/src/main/java/com/zdjizhi/utils/ck/ClickhouseSink.java b/src/main/java/com/zdjizhi/utils/ck/ClickhouseSink.java
index d3e14cc..7f9c63d 100644
--- a/src/main/java/com/zdjizhi/utils/ck/ClickhouseSink.java
+++ b/src/main/java/com/zdjizhi/utils/ck/ClickhouseSink.java
@@ -50,9 +50,8 @@ public class ClickhouseSink extends RichSinkFunction<List<Map<String, Object>>> {
     public void open(Configuration parameters) throws Exception {
         super.open(parameters);
         try {
-//            Class.forName("ru.yandex.clickhouse.ClickHouseDriver");
-//            connection = DriverManager.getConnection("jdbc:clickhouse://" + CK_HOSTS + "/" + CK_DATABASE, CK_USERNAME, CK_PIN);
             ClickHouseProperties properties = new ClickHouseProperties();
+            properties.setDatabase(CK_DATABASE);
             properties.setUser(CK_USERNAME);
             properties.setPassword(CK_PIN);
diff --git a/src/main/java/com/zdjizhi/utils/ck/ClickhouseUtil.java b/src/main/java/com/zdjizhi/utils/ck/ClickhouseUtil.java
deleted file mode 100644
index 975731c..0000000
--- a/src/main/java/com/zdjizhi/utils/ck/ClickhouseUtil.java
+++ /dev/null
@@ -1,52 +0,0 @@
-package com.zdjizhi.utils.ck;
-
-import cn.hutool.core.io.IoUtil;
-import cn.hutool.log.Log;
-import cn.hutool.log.LogFactory;
-import ru.yandex.clickhouse.BalancedClickhouseDataSource;
-import ru.yandex.clickhouse.settings.ClickHouseProperties;
-
-import java.sql.Connection;
-import java.sql.SQLException;
-
-import static com.zdjizhi.common.FlowWriteConfig.*;
-
-/**
- * @description:
- * @author: zhq
- * @create: 2022-07-10
- **/
-public class ClickhouseUtil {
-
-    private static final Log log = LogFactory.get();
-
-    private static Connection connection;
-
-
-    public static Connection getConnection() {
-        try {
-//            Class.forName("ru.yandex.clickhouse.ClickHouseDriver");
-//            connection = DriverManager.getConnection("jdbc:clickhouse://" + CK_HOSTS + "/" + CK_DATABASE, CK_USERNAME, CK_PIN);
-            ClickHouseProperties properties = new ClickHouseProperties();
-            properties.setDatabase(CK_DATABASE);
-            properties.setUser(CK_USERNAME);
-            properties.setPassword(CK_PIN);
-            properties.setConnectionTimeout(CK_CONNECTION_TIMEOUT);
-            properties.setSocketTimeout(CK_SOCKET_TIMEOUT);
-
-            BalancedClickhouseDataSource dataSource = new BalancedClickhouseDataSource("jdbc:clickhouse://" + CK_HOSTS, properties);
-            connection = dataSource.getConnection();
-
-            log.info("get clickhouse connection success");
-            return connection;
-        } catch (SQLException e) {
-            log.error("clickhouse connection error ,{}", e);
-        }
-        return null;
-    }
-
-    public static void close() {
-        IoUtil.close(connection);
-    }
-
-}
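With `ClickhouseUtil` removed, each `ClickhouseSink` now builds its own load-balanced connection in `open()`, including the newly added `setDatabase` call. A standalone sketch of that pattern, following the deleted utility above; the host list, database name, and credentials are placeholders:

```java
// Same pattern as the deleted ClickhouseUtil.getConnection().
import ru.yandex.clickhouse.BalancedClickhouseDataSource;
import ru.yandex.clickhouse.settings.ClickHouseProperties;

import java.sql.Connection;
import java.sql.SQLException;

class CkConnectionSketch {
    static Connection open() throws SQLException {
        ClickHouseProperties properties = new ClickHouseProperties();
        properties.setDatabase("flow");      // new in this commit: setDatabase(CK_DATABASE)
        properties.setUser("default");
        properties.setPassword("secret");
        // load-balances across every host in the comma-separated list
        BalancedClickhouseDataSource dataSource =
                new BalancedClickhouseDataSource("jdbc:clickhouse://ck1:8123,ck2:8123", properties);
        return dataSource.getConnection();
    }
}
```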
