summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorlifengchao <[email protected]>2024-07-02 14:40:13 +0800
committerlifengchao <[email protected]>2024-07-02 14:40:13 +0800
commit06c905e9385f96f978d07cdbb23bd8ec390993e8 (patch)
tree38f57e87d8442678727256641e48ee13c5f88077
parent7982696e8f826374c183fdf3e265fce857a051c4 (diff)
[feature][connector-clickhouse] GAL-604 ck sink支持map类型列写入
-rw-r--r--docs/connector/source/file.md10
-rw-r--r--groot-connectors/connector-clickhouse/src/main/java/com/geedgenetworks/connectors/clickhouse/sink/AbstractBatchIntervalClickHouseSink.java47
-rw-r--r--groot-connectors/connector-clickhouse/src/main/java/com/geedgenetworks/connectors/clickhouse/util/ClickHouseUtils.java11
3 files changed, 63 insertions, 5 deletions
diff --git a/docs/connector/source/file.md b/docs/connector/source/file.md
index edb4aab..6858fd6 100644
--- a/docs/connector/source/file.md
+++ b/docs/connector/source/file.md
@@ -54,4 +54,14 @@ application:
downstream: [ ]
```
+put file to hdfs:
+```sh
+# maka dir
+hadoop fs -mkdir hdfs://ns1/test
+# put local file to hdfs
+hadoop fs -put logs.json hdfs://ns1/test
+
+# list hdfs dir
+hadoop fs -ls logs.json hdfs://ns1/test
+```
diff --git a/groot-connectors/connector-clickhouse/src/main/java/com/geedgenetworks/connectors/clickhouse/sink/AbstractBatchIntervalClickHouseSink.java b/groot-connectors/connector-clickhouse/src/main/java/com/geedgenetworks/connectors/clickhouse/sink/AbstractBatchIntervalClickHouseSink.java
index 6761d42..2f29b8f 100644
--- a/groot-connectors/connector-clickhouse/src/main/java/com/geedgenetworks/connectors/clickhouse/sink/AbstractBatchIntervalClickHouseSink.java
+++ b/groot-connectors/connector-clickhouse/src/main/java/com/geedgenetworks/connectors/clickhouse/sink/AbstractBatchIntervalClickHouseSink.java
@@ -392,6 +392,10 @@ public abstract class AbstractBatchIntervalClickHouseSink<T> extends RichSinkFun
}
protected ValueConverter makeConverter(IDataType<?, ?> type) {
+ return makeConverter(type, true);
+ }
+
+ protected ValueConverter makeConverter(IDataType<?, ?> type, boolean objectReuse) {
// put the most common cast at first to avoid `instanceof` test overhead
if (type instanceof DataTypeString || type instanceof DataTypeFixedString) {
return makeStringConverter();
@@ -482,13 +486,18 @@ public abstract class AbstractBatchIntervalClickHouseSink<T> extends RichSinkFun
if (type instanceof DataTypeArray) {
IDataType<?, ?> eleDataType = ((DataTypeArray) type).getElemDataType();
- ValueConverter eleConverter = this.makeConverter(eleDataType);
+ ValueConverter eleConverter = this.makeConverter(eleDataType, false);
Object defaultValue = new ClickHouseArray(eleDataType, new Object[0]);
return (obj, sizeHelper) -> {
return this.convertArray(obj, eleDataType, eleConverter, defaultValue, sizeHelper);
};
}
+ if (type instanceof DataTypeMap) {
+ IDataType<?, ?>[] kvTypes = ((DataTypeMap) type).getNestedTypes();
+ return makeMapConverter(kvTypes, objectReuse);
+ }
+
throw new UnsupportedOperationException("Unsupported type: " + type);
}
@@ -818,6 +827,42 @@ public abstract class AbstractBatchIntervalClickHouseSink<T> extends RichSinkFun
throw new ClickHouseSQLException(-1, "require ClickHouseArray for column, but found " + obj.getClass());
}
+ private ValueConverter makeMapConverter(IDataType<?, ?>[] kvTypes, boolean objectReuse){
+ final ValueConverter keyConverter = this.makeConverter(kvTypes[0]);
+ final ValueConverter valueConverter = this.makeConverter(kvTypes[1], false);
+ return new ValueConverter() {
+ Map<Object, Object> map = objectReuse? new HashMap<>():null;
+ @Override
+ public Object convert(Object obj, SizeHelper sizeHelper) throws ClickHouseSQLException {
+ if (obj == null) {
+ throw new ClickHouseSQLException(-1, "type doesn't support null value");
+ }
+ if (obj instanceof Map){
+ Map<?, ?> dataMap = (Map<?, ?>) obj;
+ if(dataMap.isEmpty()){
+ return dataMap;
+ }
+ Map<Object, Object> result;
+ if(!objectReuse || dataMap.size() > 100){
+ result = new HashMap<>();
+ } else {
+ result = map;
+ result.clear();
+ }
+
+ for (Map.Entry<?, ?> entry : dataMap.entrySet()) {
+ Object key = keyConverter.convert(entry.getKey(), sizeHelper);
+ Object value = valueConverter.convert(entry.getValue(), sizeHelper);
+ result.put(key, value);
+ }
+ return result;
+ }
+
+ throw new ClickHouseSQLException(-1, "require Map for column, but found " + obj.getClass());
+ }
+ };
+ }
+
// copy from org.apache.flink.table.runtime.util.StringUtf8Utils
static int encodeUTF8(String str, byte[] bytes) {
int offset = 0;
diff --git a/groot-connectors/connector-clickhouse/src/main/java/com/geedgenetworks/connectors/clickhouse/util/ClickHouseUtils.java b/groot-connectors/connector-clickhouse/src/main/java/com/geedgenetworks/connectors/clickhouse/util/ClickHouseUtils.java
index 7c42498..49f8235 100644
--- a/groot-connectors/connector-clickhouse/src/main/java/com/geedgenetworks/connectors/clickhouse/util/ClickHouseUtils.java
+++ b/groot-connectors/connector-clickhouse/src/main/java/com/geedgenetworks/connectors/clickhouse/util/ClickHouseUtils.java
@@ -23,10 +23,7 @@ import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.sql.*;
import java.time.ZoneId;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.List;
-import java.util.Properties;
+import java.util.*;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
@@ -122,6 +119,8 @@ public class ClickHouseUtils {
if (defaultValue == null && !type.nullable()) {
if (type instanceof DataTypeArray) {
defaultValue = new ClickHouseArray(((DataTypeArray) type).getElemDataType(), new Object[0]);
+ } else if (type instanceof DataTypeMap) {
+ defaultValue = Collections.emptyMap();
} else {
defaultValue = type.defaultValue();
}
@@ -378,6 +377,10 @@ public class ClickHouseUtils {
return 0;
}
+ if (type instanceof DataTypeMap) {
+ return 0;
+ }
+
throw new UnsupportedOperationException("Unsupported type: " + type);
}