diff options
| author | lifengchao <[email protected]> | 2023-12-01 18:25:43 +0800 |
|---|---|---|
| committer | lifengchao <[email protected]> | 2023-12-01 18:25:43 +0800 |
| commit | 5f21adf326bd9c1db7f1bb9faf65d63643c596b9 (patch) | |
| tree | 403f0951a6d2444ad6dc6303258d776e0f70a28e | |
| parent | 4bfceb141b4e68fd96c572a566f650d13ba76b3f (diff) | |
ClickHouse Connector
7 files changed, 206 insertions, 45 deletions
diff --git a/groot-connectors/connector-clickhouse/src/main/java/com/geedgenetworks/connectors/clickhouse/ClickHouseConnectorOptions.java b/groot-connectors/connector-clickhouse/src/main/java/com/geedgenetworks/connectors/clickhouse/ClickHouseConnectorOptions.java new file mode 100644 index 0000000..3ae2414 --- /dev/null +++ b/groot-connectors/connector-clickhouse/src/main/java/com/geedgenetworks/connectors/clickhouse/ClickHouseConnectorOptions.java @@ -0,0 +1,34 @@ +package com.geedgenetworks.connectors.clickhouse; + +import org.apache.flink.configuration.ConfigOption; +import org.apache.flink.configuration.ConfigOptions; + +import java.time.Duration; + +public class ClickHouseConnectorOptions { + public static final ConfigOption<String> TABLE = + ConfigOptions.key("table") + .stringType() + .noDefaultValue() + .withDescription("clickhouse table name."); + + public static final ConfigOption<String> HOST = + ConfigOptions.key("host") + .stringType() + .noDefaultValue() + .withDescription("clickhouse host and tcp port info. format: host1:port,host2:port"); + + public static final ConfigOption<Integer> BATCH_SIZE = + ConfigOptions.key("batch.size") + .intType() + .defaultValue(100000) + .withDescription("The flush max size , over this number of records, will flush data."); + + public static final ConfigOption<Duration> BATCH_INTERVAL = + ConfigOptions.key("batch.interval") + .durationType() + .defaultValue(Duration.ofSeconds(30)) + .withDescription("The flush interval mills, over this time, asynchronous threads will flush data."); + + +} diff --git a/groot-connectors/connector-clickhouse/src/main/java/com/geedgenetworks/connectors/clickhouse/ClickHouseConnectorOptionsUtil.java b/groot-connectors/connector-clickhouse/src/main/java/com/geedgenetworks/connectors/clickhouse/ClickHouseConnectorOptionsUtil.java new file mode 100644 index 0000000..20f9b23 --- /dev/null +++ b/groot-connectors/connector-clickhouse/src/main/java/com/geedgenetworks/connectors/clickhouse/ClickHouseConnectorOptionsUtil.java @@ -0,0 +1,36 @@ +package com.geedgenetworks.connectors.clickhouse; + +import org.apache.commons.lang3.StringUtils; + +import java.util.Map; +import java.util.Properties; + +public class ClickHouseConnectorOptionsUtil { + public static final String CONNECTION_INFO_PREFIX = "connection."; + + public static Properties getClickHouseConnInfo(Map<String, String> tableOptions) { + final Properties connInfo = new Properties(); + + if (hasClickHouseConnInfo(tableOptions)) { + tableOptions.keySet().stream() + .filter(key -> key.startsWith(CONNECTION_INFO_PREFIX)) + .forEach( + key -> { + final String value = tableOptions.get(key); + final String subKey = key.substring((CONNECTION_INFO_PREFIX).length()); + if(!StringUtils.isBlank(value)){ + connInfo.put(subKey, value); + } + }); + } + return connInfo; + } + + /** + * Decides if the table options contains Kafka client properties that start with prefix + * 'properties'. + */ + private static boolean hasClickHouseConnInfo(Map<String, String> tableOptions) { + return tableOptions.keySet().stream().anyMatch(k -> k.startsWith(CONNECTION_INFO_PREFIX)); + } +} diff --git a/groot-connectors/connector-clickhouse/src/main/java/com/geedgenetworks/connectors/clickhouse/ClickHouseTableFactory.java b/groot-connectors/connector-clickhouse/src/main/java/com/geedgenetworks/connectors/clickhouse/ClickHouseTableFactory.java new file mode 100644 index 0000000..1fca98a --- /dev/null +++ b/groot-connectors/connector-clickhouse/src/main/java/com/geedgenetworks/connectors/clickhouse/ClickHouseTableFactory.java @@ -0,0 +1,72 @@ +package com.geedgenetworks.connectors.clickhouse; + +import com.geedgenetworks.connectors.clickhouse.sink.EventBatchIntervalClickHouseSink; +import com.geedgenetworks.core.connector.sink.SinkProvider; +import com.geedgenetworks.core.factories.FactoryUtil; +import com.geedgenetworks.core.factories.FactoryUtil.TableFactoryHelper; +import com.geedgenetworks.core.factories.SinkTableFactory; +import com.geedgenetworks.core.pojo.Event; +import com.geedgenetworks.core.types.StructType; +import org.apache.flink.configuration.ConfigOption; +import org.apache.flink.configuration.ReadableConfig; +import org.apache.flink.streaming.api.datastream.DataStream; +import org.apache.flink.streaming.api.datastream.DataStreamSink; +import org.apache.flink.streaming.api.functions.sink.SinkFunction; + +import java.util.HashSet; +import java.util.Properties; +import java.util.Set; + +import static com.geedgenetworks.connectors.clickhouse.ClickHouseConnectorOptions.*; +import static com.geedgenetworks.connectors.clickhouse.ClickHouseConnectorOptionsUtil.*; + +public class ClickHouseTableFactory implements SinkTableFactory { + public static final String IDENTIFIER = "clickhouse"; + @Override + public String factoryIdentifier() { + return IDENTIFIER; + } + + @Override + public SinkProvider getSinkProvider(Context context) { + final TableFactoryHelper helper = FactoryUtil.createTableFactoryHelper(this, context); + + helper.validateExcept(CONNECTION_INFO_PREFIX); // 校验参数 + + // sink暂时没有dataType + //StructType dataType = context.getSchema(); + ReadableConfig config = context.getConfiguration(); + + String host = config.get(HOST); + String table = config.get(TABLE); + int batchSize = config.get(BATCH_SIZE); + long batchIntervalMs = config.get(BATCH_INTERVAL).toMillis(); + Properties connInfo = getClickHouseConnInfo(context.getOptions()); + + final SinkFunction<Event> sinkFunction = new EventBatchIntervalClickHouseSink(batchSize, batchIntervalMs, host, table, connInfo); + + return new SinkProvider() { + @Override + public DataStreamSink<?> consumeDataStream(DataStream<Event> dataStream) { + return dataStream.addSink(sinkFunction); + } + }; + } + + @Override + public Set<ConfigOption<?>> requiredOptions() { + final Set<ConfigOption<?>> options = new HashSet<>(); + options.add(HOST); + options.add(TABLE); + return options; + } + + @Override + public Set<ConfigOption<?>> optionalOptions() { + final Set<ConfigOption<?>> options = new HashSet<>(); + options.add(BATCH_SIZE); + options.add(BATCH_INTERVAL); + return options; + } + +} diff --git a/groot-connectors/connector-clickhouse/src/main/java/com/geedgenetworks/connectors/clickhouse/sink/AbstractBatchIntervalClickHouseSink.java b/groot-connectors/connector-clickhouse/src/main/java/com/geedgenetworks/connectors/clickhouse/sink/AbstractBatchIntervalClickHouseSink.java index e295a87..e2607a2 100644 --- a/groot-connectors/connector-clickhouse/src/main/java/com/geedgenetworks/connectors/clickhouse/sink/AbstractBatchIntervalClickHouseSink.java +++ b/groot-connectors/connector-clickhouse/src/main/java/com/geedgenetworks/connectors/clickhouse/sink/AbstractBatchIntervalClickHouseSink.java @@ -94,35 +94,37 @@ public abstract class AbstractBatchIntervalClickHouseSink<T> extends RichSinkFun initClickHouseParams(); onInit(parameters); lastFlushTs = System.currentTimeMillis(); - outThread = - new Thread( - () -> { - while (!closed) { + final String threadName = "BatchIntervalSink-" + (getRuntimeContext().getIndexOfThisSubtask() + 1) + "/" + getRuntimeContext().getNumberOfParallelSubtasks(); + outThread = new Thread(() -> { + while (!closed) { + try { + Block list; + try { + list = outBatchQueue.poll(2, TimeUnit.SECONDS); + if (list == null) { + if (System.currentTimeMillis() - lastFlushTs >= batchIntervalMs) { + lock.lock(); try { - Block list; - try { - list = outBatchQueue.poll(2, TimeUnit.SECONDS); - if (list == null) { - if (System.currentTimeMillis() - lastFlushTs - >= batchIntervalMs) { - flush(); - } - continue; - } - } catch (InterruptedException e) { - continue; + // 正常情况应该是一个线程生产,一个消费。防止极限情况生产线程刚好生产,造成死锁。 + if(outBatchQueue.isEmpty()){ + flush(); } - - onFlush(list); - } catch (Exception e) { - flushException = e; + } finally { + lock.unlock(); } } - }, - "BatchIntervalSink-" - + (getRuntimeContext().getIndexOfThisSubtask() + 1) - + "/" - + getRuntimeContext().getNumberOfParallelSubtasks()); + continue; + } + } catch (InterruptedException e) { + continue; + } + + doFlushAndResetBlock(list, true); + } catch (Exception e) { + flushException = e; + } + } + }, threadName); outThread.start(); } @@ -214,7 +216,9 @@ public abstract class AbstractBatchIntervalClickHouseSink<T> extends RichSinkFun // LOG.warn("flush"); flush(); } - } finally { + } catch (Exception e) { + LOG.error("转换ck类型异常", e); + } finally{ lock.unlock(); } } @@ -233,12 +237,23 @@ public abstract class AbstractBatchIntervalClickHouseSink<T> extends RichSinkFun } } - private void onFlush(Block block) throws Exception { + private void doFlushAndResetBlock(Block block, boolean recycle) throws Exception { + try { + doFlush(block); + } finally { + ClickHouseUtils.resetInsertBlockColumns(block); + if(recycle){ + // 必须保证放入freeBatchQueue,否则发生异常没放入freeBatchQueue会导致block丢失,freeBatchQueue.take()申请block时会一直阻塞 + freeBatchQueue.put(block); + } + } + } + + private void doFlush(Block block) throws Exception { long start = System.currentTimeMillis(); int rowCnt = block.rowCnt(); ClickHouseUtils.showBlockColumnsByteSize(block); // 仅用于测试 - LOG.warn( - "flush " + rowCnt + " start:" + new Timestamp(start) + "," + (start - lastFlushTs)); + LOG.warn("flush " + rowCnt + " start:" + new Timestamp(start) + "," + (start - lastFlushTs)); lastFlushTs = System.currentTimeMillis(); int retryCount = 0; @@ -259,16 +274,7 @@ public abstract class AbstractBatchIntervalClickHouseSink<T> extends RichSinkFun ClickHouseUtils.copyInsertBlockColumns(block, stmt.getBlock()); stmt.executeBatch(); - ClickHouseUtils.resetInsertBlockColumns(block); - freeBatchQueue.put(block); - - LOG.warn( - "flush " - + rowCnt - + " end:" - + new Timestamp(System.currentTimeMillis()) - + "," - + (System.currentTimeMillis() - start)); + LOG.warn("flush " + rowCnt + " end:" + new java.sql.Timestamp(System.currentTimeMillis()) + "," + (System.currentTimeMillis() - start)); return; } catch (SQLException e) { @@ -279,7 +285,18 @@ public abstract class AbstractBatchIntervalClickHouseSink<T> extends RichSinkFun } } finally { if (stmt != null) { - stmt.close(); + try { + stmt.close(); + } catch (SQLException e) { + LOG.error("ClickHouseBatchInsertFail url:" + url, e); + if (retryCount >= 3) { + //throw e; + if (connection != null) { + connection.close(); + } + return; + } + } } if (connection != null) { connection.close(); @@ -308,7 +325,7 @@ public abstract class AbstractBatchIntervalClickHouseSink<T> extends RichSinkFun lock.lock(); try { if (batch.rowCnt() > 0) { - onFlush(batch); + doFlushAndResetBlock(batch, false); } } catch (Exception e) { flushException = e; diff --git a/groot-connectors/connector-clickhouse/src/main/java/com/geedgenetworks/connectors/clickhouse/util/ClickHouseUtils.java b/groot-connectors/connector-clickhouse/src/main/java/com/geedgenetworks/connectors/clickhouse/util/ClickHouseUtils.java index 617ec53..95e39a5 100644 --- a/groot-connectors/connector-clickhouse/src/main/java/com/geedgenetworks/connectors/clickhouse/util/ClickHouseUtils.java +++ b/groot-connectors/connector-clickhouse/src/main/java/com/geedgenetworks/connectors/clickhouse/util/ClickHouseUtils.java @@ -135,7 +135,7 @@ public class ClickHouseUtils { columnNames.toArray(new String[columnNames.size()]), columnDefaultValues.toArray(new Object[columnDefaultValues.size()])); } catch (SQLException e) { - e.printStackTrace(); + LOG.error("ClickHouse Connection Exception url:" + urls[urlIndex], e); if (retryCount >= 3) { throw e; } @@ -172,7 +172,7 @@ public class ClickHouseUtils { ((ClickHouseConnection) connection).serverContext()); return tz; } catch (SQLException e) { - e.printStackTrace(); + LOG.error("ClickHouse Connection Exception url:" + urls[urlIndex], e); if (retryCount >= 3) { throw e; } @@ -207,7 +207,7 @@ public class ClickHouseUtils { Block block = ((ClickHouseConnection) connection).getSampleBlock(sql); return block; } catch (SQLException e) { - e.printStackTrace(); + LOG.error("ClickHouse getInsertBlock Exception url:" + urls[urlIndex], e); if (retryCount >= 3) { throw e; } diff --git a/groot-connectors/connector-clickhouse/src/main/resources/META-INF/services/com.geedgenetworks.core.factories.Factory b/groot-connectors/connector-clickhouse/src/main/resources/META-INF/services/com.geedgenetworks.core.factories.Factory new file mode 100644 index 0000000..7950033 --- /dev/null +++ b/groot-connectors/connector-clickhouse/src/main/resources/META-INF/services/com.geedgenetworks.core.factories.Factory @@ -0,0 +1 @@ +com.geedgenetworks.connectors.clickhouse.ClickHouseTableFactory diff --git a/groot-core/src/main/java/com/geedgenetworks/core/connector/print/PrintTableFactory.java b/groot-core/src/main/java/com/geedgenetworks/core/connector/print/PrintTableFactory.java index 64db685..98b29cf 100644 --- a/groot-core/src/main/java/com/geedgenetworks/core/connector/print/PrintTableFactory.java +++ b/groot-core/src/main/java/com/geedgenetworks/core/connector/print/PrintTableFactory.java @@ -32,13 +32,14 @@ public class PrintTableFactory implements SinkTableFactory { // 获取encodingFormat EncodingFormat encodingFormat = helper.discoverEncodingFormat(EncodingFormatFactory.class, FactoryUtil.FORMAT); + helper.validate(); // 校验参数 + StructType dataType = context.getSchema(); ReadableConfig config = context.getConfiguration(); int type = config.get(TYPE); final PrintSinkFunction sinkFunction = new PrintSinkFunction(dataType, encodingFormat, type); return new SinkProvider(){ - @Override public DataStreamSink<?> consumeDataStream(DataStream<Event> dataStream) { return dataStream.addSink(sinkFunction); |
