summary | refs | log | tree | commit | diff
diff options
context:
space:
mode:
author: lifengchao <[email protected]> 2024-07-16 11:47:05 +0800
committer: lifengchao <[email protected]> 2024-07-16 11:47:05 +0800
commit: 354f8df5f77bed8e52e627f8ccdf389466904ee1 (patch)
tree: 2901f992879b3c9102d2be3e612c47fa3b155336
parent: 42e72d63c5484f96b86acb82acd7abee5c25017e (diff)
[feature][connector-clickhouse] GAL-619 groot ck sink 添加插入列参数
-rw-r--r-- groot-connectors/connector-clickhouse/src/main/java/com/geedgenetworks/connectors/clickhouse/sink/AbstractBatchIntervalClickHouseSink.java | 22
-rw-r--r-- groot-connectors/connector-clickhouse/src/main/java/com/geedgenetworks/connectors/clickhouse/sink/EventBatchIntervalClickHouseSink.java | 6
-rw-r--r-- groot-connectors/connector-clickhouse/src/main/java/com/geedgenetworks/connectors/clickhouse/util/ClickHouseUtils.java | 31
3 files changed, 43 insertions, 16 deletions
diff --git a/groot-connectors/connector-clickhouse/src/main/java/com/geedgenetworks/connectors/clickhouse/sink/AbstractBatchIntervalClickHouseSink.java b/groot-connectors/connector-clickhouse/src/main/java/com/geedgenetworks/connectors/clickhouse/sink/AbstractBatchIntervalClickHouseSink.java
index 2f29b8f..98f5f8b 100644
--- a/groot-connectors/connector-clickhouse/src/main/java/com/geedgenetworks/connectors/clickhouse/sink/AbstractBatchIntervalClickHouseSink.java
+++ b/groot-connectors/connector-clickhouse/src/main/java/com/geedgenetworks/connectors/clickhouse/sink/AbstractBatchIntervalClickHouseSink.java
@@ -26,6 +26,7 @@ import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import javax.annotation.Nullable;
import java.io.Serializable;
import java.io.UnsupportedEncodingException;
import java.lang.reflect.Field;
@@ -36,6 +37,8 @@ import java.sql.Date;
import java.sql.Timestamp;
import java.time.*;
import java.time.format.DateTimeFormatter;
+import java.time.format.DateTimeFormatterBuilder;
+import java.time.temporal.ChronoField;
import java.util.*;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
@@ -56,6 +59,7 @@ public abstract class AbstractBatchIntervalClickHouseSink<T> extends RichSinkFun
// 标准日期时间格式,精确到毫秒:yyyy-MM-dd HH:mm:ss.SSS
public static final String NORM_DATETIME_MS_PATTERN = "yyyy-MM-dd HH:mm:ss.SSS";
public static final DateTimeFormatter NORM_DATETIME_MS_FORMATTER = DateTimeFormatter.ofPattern(NORM_DATETIME_MS_PATTERN);
+ public static final DateTimeFormatter DATETIME_FORMATTER = new DateTimeFormatterBuilder().appendPattern("yyyy-MM-dd HH:mm:ss").appendFraction(ChronoField.NANO_OF_SECOND, 0, 9, true).toFormatter();
private final int batchSize;
private final int batchByteSize;
private final long batchIntervalMs;
@@ -75,6 +79,7 @@ public abstract class AbstractBatchIntervalClickHouseSink<T> extends RichSinkFun
private int urlIndex;
private final Properties connInfo;
private final String table;
+ private final String[] insertColumns;
private String insertSql;
protected ZoneId tz;
protected String[] columnNames;
@@ -85,12 +90,17 @@ public abstract class AbstractBatchIntervalClickHouseSink<T> extends RichSinkFun
protected final SizeHelper writeSizeHelper;
public AbstractBatchIntervalClickHouseSink(int batchSize, int batchByteSize, long batchIntervalMs, String host, String table, Properties connInfo) {
+ this(batchSize, batchByteSize, batchIntervalMs, host, table, null, connInfo);
+ }
+
+ public AbstractBatchIntervalClickHouseSink(int batchSize, int batchByteSize, long batchIntervalMs, String host, String table, @Nullable String[] insertColumns, Properties connInfo) {
this.batchSize = batchSize;
this.batchByteSize = batchByteSize;
this.batchIntervalMs = batchIntervalMs;
this.writeSizeHelper = new SizeHelper();
this.urls = ClickHouseUtils.buildUrlsFromHost(host);
this.table = table;
+ this.insertColumns = insertColumns;
this.connInfo = connInfo;
if(!this.connInfo.containsKey(SettingKey.connect_timeout.name())){
this.connInfo.setProperty(SettingKey.connect_timeout.name(), "30");
@@ -152,7 +162,7 @@ public abstract class AbstractBatchIntervalClickHouseSink<T> extends RichSinkFun
urlIndex = getRuntimeContext().getIndexOfThisSubtask() % urls.length;
// 获取要插入的列信息
- Tuple3<String[], Object[], int[]> columnsAndDefaultValuesAndDefaultSizes = ClickHouseUtils.getInsertColumnsAndDefaultValuesAndDefaultSizesForTable( urls, urlIndex, connInfo, table);
+ Tuple3<String[], Object[], int[]> columnsAndDefaultValuesAndDefaultSizes = ClickHouseUtils.getInsertColumnsAndDefaultValuesAndDefaultSizesForTable(urls, urlIndex, connInfo, table, insertColumns);
columnNames = columnsAndDefaultValuesAndDefaultSizes.f0;
columnDefaultValues = columnsAndDefaultValuesAndDefaultSizes.f1;
columnDefaultSizes = columnsAndDefaultValuesAndDefaultSizes.f2;
@@ -212,10 +222,10 @@ public abstract class AbstractBatchIntervalClickHouseSink<T> extends RichSinkFun
if (flushException != null) throw flushException;
}
- void onInit(Configuration parameters) throws Exception {}
- void onClose() throws Exception {}
+ protected void onInit(Configuration parameters) throws Exception {}
+ protected void onClose() throws Exception {}
- abstract int addBatch(Block batch, T data) throws Exception;
+ protected abstract int addBatch(Block batch, T data) throws Exception;
@Override
public final void invoke(T value, Context context) throws Exception {
@@ -623,6 +633,8 @@ public abstract class AbstractBatchIntervalClickHouseSink<T> extends RichSinkFun
return LocalDateTime.parse(str, NORM_DATETIME_FORMATTER).atZone(tz);
} else if (str.length() == 23) {
return LocalDateTime.parse(str, NORM_DATETIME_MS_FORMATTER).atZone(tz);
+ } else if (str.length() >= 19) {
+ return LocalDateTime.parse(str, DATETIME_FORMATTER).atZone(tz);
}
}
@@ -650,6 +662,8 @@ public abstract class AbstractBatchIntervalClickHouseSink<T> extends RichSinkFun
return LocalDateTime.parse(str, NORM_DATETIME_FORMATTER).atZone(tz);
} else if (str.length() == 23) {
return LocalDateTime.parse(str, NORM_DATETIME_MS_FORMATTER).atZone(tz);
+ } else if (str.length() >= 19) {
+ return LocalDateTime.parse(str, DATETIME_FORMATTER).atZone(tz);
}
}
diff --git a/groot-connectors/connector-clickhouse/src/main/java/com/geedgenetworks/connectors/clickhouse/sink/EventBatchIntervalClickHouseSink.java b/groot-connectors/connector-clickhouse/src/main/java/com/geedgenetworks/connectors/clickhouse/sink/EventBatchIntervalClickHouseSink.java
index ab7a9b8..1b9396c 100644
--- a/groot-connectors/connector-clickhouse/src/main/java/com/geedgenetworks/connectors/clickhouse/sink/EventBatchIntervalClickHouseSink.java
+++ b/groot-connectors/connector-clickhouse/src/main/java/com/geedgenetworks/connectors/clickhouse/sink/EventBatchIntervalClickHouseSink.java
@@ -23,7 +23,7 @@ public class EventBatchIntervalClickHouseSink extends AbstractBatchIntervalClick
}
@Override
- void onInit(Configuration parameters) throws Exception {
+ protected void onInit(Configuration parameters) throws Exception {
super.onInit(parameters);
simpleName = this.getClass().getSimpleName() + "_" + getRuntimeContext().getIndexOfThisSubtask();
if(schema != null){
@@ -35,7 +35,7 @@ public class EventBatchIntervalClickHouseSink extends AbstractBatchIntervalClick
}
@Override
- int addBatch(Block batch, Event event) throws Exception {
+ protected int addBatch(Block batch, Event event) throws Exception {
int writeSize = 0;
Map<String, Object> map = event.getExtractedFields();
String columnName;
@@ -80,7 +80,7 @@ public class EventBatchIntervalClickHouseSink extends AbstractBatchIntervalClick
}
@Override
- void onClose() throws Exception {
+ protected void onClose() throws Exception {
super.onClose();
if(schema != null && schema.isDynamic()){
Schema.unregisterSchemaChangeAware(schema, this);
diff --git a/groot-connectors/connector-clickhouse/src/main/java/com/geedgenetworks/connectors/clickhouse/util/ClickHouseUtils.java b/groot-connectors/connector-clickhouse/src/main/java/com/geedgenetworks/connectors/clickhouse/util/ClickHouseUtils.java
index 49f8235..0209431 100644
--- a/groot-connectors/connector-clickhouse/src/main/java/com/geedgenetworks/connectors/clickhouse/util/ClickHouseUtils.java
+++ b/groot-connectors/connector-clickhouse/src/main/java/com/geedgenetworks/connectors/clickhouse/util/ClickHouseUtils.java
@@ -16,6 +16,7 @@ import org.apache.flink.util.Preconditions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import javax.annotation.Nullable;
import java.lang.reflect.Field;
import java.math.BigDecimal;
import java.math.BigInteger;
@@ -85,9 +86,14 @@ public class ClickHouseUtils {
}
public static Tuple3<String[], Object[], int[]> getInsertColumnsAndDefaultValuesAndDefaultSizesForTable(
- String[] urls, int urlIndex, Properties connInfo, String table) throws Exception {
+ String[] urls, int urlIndex, Properties connInfo, String table, @Nullable String[] insertColumns) throws Exception {
Class.forName("com.github.housepower.jdbc.ClickHouseDriver");
+ boolean insertColumnsPresent = insertColumns != null;
+ Set<String> insertColumnSet = insertColumnsPresent ? Arrays.stream(insertColumns).collect(Collectors.toSet()):null;
+ Preconditions.checkArgument(!insertColumnsPresent || insertColumnSet.size() == insertColumns.length, "column names must be unique");
+
+ List<Tuple3<String, Object, Integer>> columnInfos = new ArrayList<>();
int retryCount = 0;
while (true) {
retryCount++;
@@ -98,15 +104,14 @@ public class ClickHouseUtils {
connection = (ClickHouseConnection) DriverManager.getConnection(urls[urlIndex], connInfo);
stmt = connection.createStatement();
rst = stmt.executeQuery("desc " + table);
-
- List<String> columnNames = new ArrayList<>();
- List<Object> columnDefaultValues = new ArrayList<>();
- List<Integer> columnDefaultSizes = new ArrayList<>();
while (rst.next()) {
String name = rst.getString("name");
String typeStr = rst.getString("type");
String defaultTypeStr = rst.getString("default_type");
String defaultExpression = rst.getString("default_expression");
+ if (insertColumnsPresent && !insertColumnSet.contains(name)) {
+ continue;
+ }
if ("LowCardinality(String)".equals(typeStr)) {
typeStr = "String";
}
@@ -125,12 +130,20 @@ public class ClickHouseUtils {
defaultValue = type.defaultValue();
}
}
- columnNames.add(name);
- columnDefaultValues.add(defaultValue);
- columnDefaultSizes.add(getDefaultValueSize(type, defaultExpression));
+ columnInfos.add(Tuple3.of(name, defaultValue, getDefaultValueSize(type, defaultExpression)));
}
- return new Tuple3<>(columnNames.toArray(new String[columnNames.size()]), columnDefaultValues.toArray(new Object[columnDefaultValues.size()]), columnDefaultSizes.stream().mapToInt(x -> x).toArray());
+ Map<String, Tuple3<String, Object, Integer>> columnInfoMap = columnInfos.stream().collect(Collectors.toMap(x -> x.f0, x -> x));
+ String[] columnNames = insertColumnsPresent?insertColumns:columnInfos.stream().map(x->x.f0).toArray(String[]::new);
+ Object[] columnDefaultValues = new Object[columnNames.length];
+ int[] columnDefaultSizes = new int[columnNames.length];
+ for (int i = 0; i < columnNames.length; i++) {
+ Tuple3<String, Object, Integer> columnInfo = columnInfoMap.get(columnNames[i]);
+ Preconditions.checkNotNull(columnInfo, "not exist column:" + columnNames[i]);
+ columnDefaultValues[i] = columnInfo.f1;
+ columnDefaultSizes[i] = columnInfo.f2;
+ }
+ return Tuple3.of(columnNames, columnDefaultValues, columnDefaultSizes);
} catch (SQLException e) {
LOG.error("ClickHouse Connection Exception url:" + urls[urlIndex], e);
if (retryCount >= 3) {