author    lifengchao <[email protected]> 2023-12-01 18:25:43 +0800
committer lifengchao <[email protected]> 2023-12-01 18:25:43 +0800
commit    5f21adf326bd9c1db7f1bb9faf65d63643c596b9
tree      403f0951a6d2444ad6dc6303258d776e0f70a28e
parent    4bfceb141b4e68fd96c572a566f650d13ba76b3f
ClickHouse Connector: add table factory and connector options; improve batch-interval sink flush and error handling
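
Adds a ClickHouse sink table factory (identifier "clickhouse") with the options table, host, batch.size and batch.interval, passes through connection properties under the connection. prefix, and registers the factory via META-INF/services. Below is a minimal sketch of how the new options are expected to be populated and read, assuming they arrive through a plain Flink Configuration as in ClickHouseTableFactory; the class name and the host/table values are illustrative placeholders, not part of this commit:

    import org.apache.flink.configuration.Configuration;

    import static com.geedgenetworks.connectors.clickhouse.ClickHouseConnectorOptions.*;

    // Hypothetical example class, for illustration only.
    public class ClickHouseOptionsSketch {
        public static void main(String[] args) {
            Configuration conf = new Configuration();
            conf.set(HOST, "ch1:9000,ch2:9000"); // placeholder hosts, format host:port,host:port
            conf.set(TABLE, "default.events");   // placeholder table name
            // batch.size and batch.interval fall back to their defaults (100000 rows / 30 seconds)
            int batchSize = conf.get(BATCH_SIZE);
            long batchIntervalMs = conf.get(BATCH_INTERVAL).toMillis();
            System.out.println(batchSize + " rows / " + batchIntervalMs + " ms");
        }
    }
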
-rw-r--r--  groot-connectors/connector-clickhouse/src/main/java/com/geedgenetworks/connectors/clickhouse/ClickHouseConnectorOptions.java  34
-rw-r--r--  groot-connectors/connector-clickhouse/src/main/java/com/geedgenetworks/connectors/clickhouse/ClickHouseConnectorOptionsUtil.java  36
-rw-r--r--  groot-connectors/connector-clickhouse/src/main/java/com/geedgenetworks/connectors/clickhouse/ClickHouseTableFactory.java  72
-rw-r--r--  groot-connectors/connector-clickhouse/src/main/java/com/geedgenetworks/connectors/clickhouse/sink/AbstractBatchIntervalClickHouseSink.java  99
-rw-r--r--  groot-connectors/connector-clickhouse/src/main/java/com/geedgenetworks/connectors/clickhouse/util/ClickHouseUtils.java  6
-rw-r--r--  groot-connectors/connector-clickhouse/src/main/resources/META-INF/services/com.geedgenetworks.core.factories.Factory  1
-rw-r--r--  groot-core/src/main/java/com/geedgenetworks/core/connector/print/PrintTableFactory.java  3
7 files changed, 206 insertions(+), 45 deletions(-)
diff --git a/groot-connectors/connector-clickhouse/src/main/java/com/geedgenetworks/connectors/clickhouse/ClickHouseConnectorOptions.java b/groot-connectors/connector-clickhouse/src/main/java/com/geedgenetworks/connectors/clickhouse/ClickHouseConnectorOptions.java
new file mode 100644
index 0000000..3ae2414
--- /dev/null
+++ b/groot-connectors/connector-clickhouse/src/main/java/com/geedgenetworks/connectors/clickhouse/ClickHouseConnectorOptions.java
@@ -0,0 +1,34 @@
+package com.geedgenetworks.connectors.clickhouse;
+
+import org.apache.flink.configuration.ConfigOption;
+import org.apache.flink.configuration.ConfigOptions;
+
+import java.time.Duration;
+
+public class ClickHouseConnectorOptions {
+ public static final ConfigOption<String> TABLE =
+ ConfigOptions.key("table")
+ .stringType()
+ .noDefaultValue()
+ .withDescription("The ClickHouse table name.");
+
+ public static final ConfigOption<String> HOST =
+ ConfigOptions.key("host")
+ .stringType()
+ .noDefaultValue()
+ .withDescription("ClickHouse hosts and TCP ports, in the format: host1:port,host2:port.");
+
+ public static final ConfigOption<Integer> BATCH_SIZE =
+ ConfigOptions.key("batch.size")
+ .intType()
+ .defaultValue(100000)
+ .withDescription("The maximum number of buffered records; once this many records have accumulated, the data is flushed.");
+
+ public static final ConfigOption<Duration> BATCH_INTERVAL =
+ ConfigOptions.key("batch.interval")
+ .durationType()
+ .defaultValue(Duration.ofSeconds(30))
+ .withDescription("The flush interval; once this much time has elapsed, an asynchronous thread flushes the buffered data.");
+
+
+}
diff --git a/groot-connectors/connector-clickhouse/src/main/java/com/geedgenetworks/connectors/clickhouse/ClickHouseConnectorOptionsUtil.java b/groot-connectors/connector-clickhouse/src/main/java/com/geedgenetworks/connectors/clickhouse/ClickHouseConnectorOptionsUtil.java
new file mode 100644
index 0000000..20f9b23
--- /dev/null
+++ b/groot-connectors/connector-clickhouse/src/main/java/com/geedgenetworks/connectors/clickhouse/ClickHouseConnectorOptionsUtil.java
@@ -0,0 +1,36 @@
+package com.geedgenetworks.connectors.clickhouse;
+
+import org.apache.commons.lang3.StringUtils;
+
+import java.util.Map;
+import java.util.Properties;
+
+public class ClickHouseConnectorOptionsUtil {
+ public static final String CONNECTION_INFO_PREFIX = "connection.";
+
+ public static Properties getClickHouseConnInfo(Map<String, String> tableOptions) {
+ final Properties connInfo = new Properties();
+
+ if (hasClickHouseConnInfo(tableOptions)) {
+ tableOptions.keySet().stream()
+ .filter(key -> key.startsWith(CONNECTION_INFO_PREFIX))
+ .forEach(
+ key -> {
+ final String value = tableOptions.get(key);
+ final String subKey = key.substring((CONNECTION_INFO_PREFIX).length());
+ if (!StringUtils.isBlank(value)) {
+ connInfo.put(subKey, value);
+ }
+ });
+ }
+ return connInfo;
+ }
+
+ /**
+ * Decides if the table options contain ClickHouse connection properties that start with the
+ * prefix 'connection.'.
+ */
+ private static boolean hasClickHouseConnInfo(Map<String, String> tableOptions) {
+ return tableOptions.keySet().stream().anyMatch(k -> k.startsWith(CONNECTION_INFO_PREFIX));
+ }
+}
diff --git a/groot-connectors/connector-clickhouse/src/main/java/com/geedgenetworks/connectors/clickhouse/ClickHouseTableFactory.java b/groot-connectors/connector-clickhouse/src/main/java/com/geedgenetworks/connectors/clickhouse/ClickHouseTableFactory.java
new file mode 100644
index 0000000..1fca98a
--- /dev/null
+++ b/groot-connectors/connector-clickhouse/src/main/java/com/geedgenetworks/connectors/clickhouse/ClickHouseTableFactory.java
@@ -0,0 +1,72 @@
+package com.geedgenetworks.connectors.clickhouse;
+
+import com.geedgenetworks.connectors.clickhouse.sink.EventBatchIntervalClickHouseSink;
+import com.geedgenetworks.core.connector.sink.SinkProvider;
+import com.geedgenetworks.core.factories.FactoryUtil;
+import com.geedgenetworks.core.factories.FactoryUtil.TableFactoryHelper;
+import com.geedgenetworks.core.factories.SinkTableFactory;
+import com.geedgenetworks.core.pojo.Event;
+import com.geedgenetworks.core.types.StructType;
+import org.apache.flink.configuration.ConfigOption;
+import org.apache.flink.configuration.ReadableConfig;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.datastream.DataStreamSink;
+import org.apache.flink.streaming.api.functions.sink.SinkFunction;
+
+import java.util.HashSet;
+import java.util.Properties;
+import java.util.Set;
+
+import static com.geedgenetworks.connectors.clickhouse.ClickHouseConnectorOptions.*;
+import static com.geedgenetworks.connectors.clickhouse.ClickHouseConnectorOptionsUtil.*;
+
+public class ClickHouseTableFactory implements SinkTableFactory {
+ public static final String IDENTIFIER = "clickhouse";
+ @Override
+ public String factoryIdentifier() {
+ return IDENTIFIER;
+ }
+
+ @Override
+ public SinkProvider getSinkProvider(Context context) {
+ final TableFactoryHelper helper = FactoryUtil.createTableFactoryHelper(this, context);
+
+ helper.validateExcept(CONNECTION_INFO_PREFIX); // validate options, excluding the connection.* pass-through keys
+
+ // The sink does not use a dataType for now.
+ //StructType dataType = context.getSchema();
+ ReadableConfig config = context.getConfiguration();
+
+ String host = config.get(HOST);
+ String table = config.get(TABLE);
+ int batchSize = config.get(BATCH_SIZE);
+ long batchIntervalMs = config.get(BATCH_INTERVAL).toMillis();
+ Properties connInfo = getClickHouseConnInfo(context.getOptions());
+
+ final SinkFunction<Event> sinkFunction = new EventBatchIntervalClickHouseSink(batchSize, batchIntervalMs, host, table, connInfo);
+
+ return new SinkProvider() {
+ @Override
+ public DataStreamSink<?> consumeDataStream(DataStream<Event> dataStream) {
+ return dataStream.addSink(sinkFunction);
+ }
+ };
+ }
+
+ @Override
+ public Set<ConfigOption<?>> requiredOptions() {
+ final Set<ConfigOption<?>> options = new HashSet<>();
+ options.add(HOST);
+ options.add(TABLE);
+ return options;
+ }
+
+ @Override
+ public Set<ConfigOption<?>> optionalOptions() {
+ final Set<ConfigOption<?>> options = new HashSet<>();
+ options.add(BATCH_SIZE);
+ options.add(BATCH_INTERVAL);
+ return options;
+ }
+
+}
diff --git a/groot-connectors/connector-clickhouse/src/main/java/com/geedgenetworks/connectors/clickhouse/sink/AbstractBatchIntervalClickHouseSink.java b/groot-connectors/connector-clickhouse/src/main/java/com/geedgenetworks/connectors/clickhouse/sink/AbstractBatchIntervalClickHouseSink.java
index e295a87..e2607a2 100644
--- a/groot-connectors/connector-clickhouse/src/main/java/com/geedgenetworks/connectors/clickhouse/sink/AbstractBatchIntervalClickHouseSink.java
+++ b/groot-connectors/connector-clickhouse/src/main/java/com/geedgenetworks/connectors/clickhouse/sink/AbstractBatchIntervalClickHouseSink.java
@@ -94,35 +94,37 @@ public abstract class AbstractBatchIntervalClickHouseSink<T> extends RichSinkFun
initClickHouseParams();
onInit(parameters);
lastFlushTs = System.currentTimeMillis();
- outThread =
- new Thread(
- () -> {
- while (!closed) {
+ final String threadName = "BatchIntervalSink-" + (getRuntimeContext().getIndexOfThisSubtask() + 1) + "/" + getRuntimeContext().getNumberOfParallelSubtasks();
+ outThread = new Thread(() -> {
+ while (!closed) {
+ try {
+ Block list;
+ try {
+ list = outBatchQueue.poll(2, TimeUnit.SECONDS);
+ if (list == null) {
+ if (System.currentTimeMillis() - lastFlushTs >= batchIntervalMs) {
+ lock.lock();
try {
- Block list;
- try {
- list = outBatchQueue.poll(2, TimeUnit.SECONDS);
- if (list == null) {
- if (System.currentTimeMillis() - lastFlushTs
- >= batchIntervalMs) {
- flush();
- }
- continue;
- }
- } catch (InterruptedException e) {
- continue;
+ // Normally one thread produces and one consumes. Guard against the edge case where the producer has just enqueued a block, which could otherwise cause a deadlock.
+ if (outBatchQueue.isEmpty()) {
+ flush();
}
-
- onFlush(list);
- } catch (Exception e) {
- flushException = e;
+ } finally {
+ lock.unlock();
}
}
- },
- "BatchIntervalSink-"
- + (getRuntimeContext().getIndexOfThisSubtask() + 1)
- + "/"
- + getRuntimeContext().getNumberOfParallelSubtasks());
+ continue;
+ }
+ } catch (InterruptedException e) {
+ continue;
+ }
+
+ doFlushAndResetBlock(list, true);
+ } catch (Exception e) {
+ flushException = e;
+ }
+ }
+ }, threadName);
outThread.start();
}
@@ -214,7 +216,9 @@ public abstract class AbstractBatchIntervalClickHouseSink<T> extends RichSinkFun
// LOG.warn("flush");
flush();
}
- } finally {
+ } catch (Exception e) {
+ LOG.error("Failed to convert record to ClickHouse column types", e);
+ } finally {
lock.unlock();
}
}
@@ -233,12 +237,23 @@ public abstract class AbstractBatchIntervalClickHouseSink<T> extends RichSinkFun
}
}
- private void onFlush(Block block) throws Exception {
+ private void doFlushAndResetBlock(Block block, boolean recycle) throws Exception {
+ try {
+ doFlush(block);
+ } finally {
+ ClickHouseUtils.resetInsertBlockColumns(block);
+ if(recycle){
+ // The block must always be returned to freeBatchQueue; if an exception prevents that, the block is lost and freeBatchQueue.take() will block forever when acquiring a block.
+ freeBatchQueue.put(block);
+ }
+ }
+ }
+
+ private void doFlush(Block block) throws Exception {
long start = System.currentTimeMillis();
int rowCnt = block.rowCnt();
ClickHouseUtils.showBlockColumnsByteSize(block); // for testing only
- LOG.warn(
- "flush " + rowCnt + " start:" + new Timestamp(start) + "," + (start - lastFlushTs));
+ LOG.warn("flush " + rowCnt + " start:" + new Timestamp(start) + "," + (start - lastFlushTs));
lastFlushTs = System.currentTimeMillis();
int retryCount = 0;
@@ -259,16 +274,7 @@ public abstract class AbstractBatchIntervalClickHouseSink<T> extends RichSinkFun
ClickHouseUtils.copyInsertBlockColumns(block, stmt.getBlock());
stmt.executeBatch();
- ClickHouseUtils.resetInsertBlockColumns(block);
- freeBatchQueue.put(block);
-
- LOG.warn(
- "flush "
- + rowCnt
- + " end:"
- + new Timestamp(System.currentTimeMillis())
- + ","
- + (System.currentTimeMillis() - start));
+ LOG.warn("flush " + rowCnt + " end:" + new Timestamp(System.currentTimeMillis()) + "," + (System.currentTimeMillis() - start));
return;
} catch (SQLException e) {
@@ -279,7 +285,18 @@ public abstract class AbstractBatchIntervalClickHouseSink<T> extends RichSinkFun
}
} finally {
if (stmt != null) {
- stmt.close();
+ try {
+ stmt.close();
+ } catch (SQLException e) {
+ LOG.error("Failed to close ClickHouse statement, url:" + url, e);
+ if (retryCount >= 3) {
+ //throw e;
+ if (connection != null) {
+ connection.close();
+ }
+ return;
+ }
+ }
}
if (connection != null) {
connection.close();
@@ -308,7 +325,7 @@ public abstract class AbstractBatchIntervalClickHouseSink<T> extends RichSinkFun
lock.lock();
try {
if (batch.rowCnt() > 0) {
- onFlush(batch);
+ doFlushAndResetBlock(batch, false);
}
} catch (Exception e) {
flushException = e;
diff --git a/groot-connectors/connector-clickhouse/src/main/java/com/geedgenetworks/connectors/clickhouse/util/ClickHouseUtils.java b/groot-connectors/connector-clickhouse/src/main/java/com/geedgenetworks/connectors/clickhouse/util/ClickHouseUtils.java
index 617ec53..95e39a5 100644
--- a/groot-connectors/connector-clickhouse/src/main/java/com/geedgenetworks/connectors/clickhouse/util/ClickHouseUtils.java
+++ b/groot-connectors/connector-clickhouse/src/main/java/com/geedgenetworks/connectors/clickhouse/util/ClickHouseUtils.java
@@ -135,7 +135,7 @@ public class ClickHouseUtils {
columnNames.toArray(new String[columnNames.size()]),
columnDefaultValues.toArray(new Object[columnDefaultValues.size()]));
} catch (SQLException e) {
- e.printStackTrace();
+ LOG.error("ClickHouse Connection Exception url:" + urls[urlIndex], e);
if (retryCount >= 3) {
throw e;
}
@@ -172,7 +172,7 @@ public class ClickHouseUtils {
((ClickHouseConnection) connection).serverContext());
return tz;
} catch (SQLException e) {
- e.printStackTrace();
+ LOG.error("ClickHouse Connection Exception url:" + urls[urlIndex], e);
if (retryCount >= 3) {
throw e;
}
@@ -207,7 +207,7 @@ public class ClickHouseUtils {
Block block = ((ClickHouseConnection) connection).getSampleBlock(sql);
return block;
} catch (SQLException e) {
- e.printStackTrace();
+ LOG.error("ClickHouse getInsertBlock Exception url:" + urls[urlIndex], e);
if (retryCount >= 3) {
throw e;
}
diff --git a/groot-connectors/connector-clickhouse/src/main/resources/META-INF/services/com.geedgenetworks.core.factories.Factory b/groot-connectors/connector-clickhouse/src/main/resources/META-INF/services/com.geedgenetworks.core.factories.Factory
new file mode 100644
index 0000000..7950033
--- /dev/null
+++ b/groot-connectors/connector-clickhouse/src/main/resources/META-INF/services/com.geedgenetworks.core.factories.Factory
@@ -0,0 +1 @@
+com.geedgenetworks.connectors.clickhouse.ClickHouseTableFactory
diff --git a/groot-core/src/main/java/com/geedgenetworks/core/connector/print/PrintTableFactory.java b/groot-core/src/main/java/com/geedgenetworks/core/connector/print/PrintTableFactory.java
index 64db685..98b29cf 100644
--- a/groot-core/src/main/java/com/geedgenetworks/core/connector/print/PrintTableFactory.java
+++ b/groot-core/src/main/java/com/geedgenetworks/core/connector/print/PrintTableFactory.java
@@ -32,13 +32,14 @@ public class PrintTableFactory implements SinkTableFactory {
// obtain the encodingFormat
EncodingFormat encodingFormat = helper.discoverEncodingFormat(EncodingFormatFactory.class, FactoryUtil.FORMAT);
+ helper.validate(); // validate options
+
StructType dataType = context.getSchema();
ReadableConfig config = context.getConfiguration();
int type = config.get(TYPE);
final PrintSinkFunction sinkFunction = new PrintSinkFunction(dataType, encodingFormat, type);
return new SinkProvider(){
-
@Override
public DataStreamSink<?> consumeDataStream(DataStream<Event> dataStream) {
return dataStream.addSink(sinkFunction);