diff options
| author | 李奉超 <[email protected]> | 2024-07-02 06:51:30 +0000 |
|---|---|---|
| committer | 李奉超 <[email protected]> | 2024-07-02 06:51:30 +0000 |
| commit | bbb50013547e0bf83bb64a7611b911a5a92a41c0 (patch) | |
| tree | 38f57e87d8442678727256641e48ee13c5f88077 | |
| parent | 7982696e8f826374c183fdf3e265fce857a051c4 (diff) | |
| parent | 06c905e9385f96f978d07cdbb23bd8ec390993e8 (diff) | |
Merge branch 'ck-map' into 'develop'
[feature][connector-clickhouse] GAL-604 ck sink支持map类型列写入
See merge request galaxy/platform/groot-stream!69
3 files changed, 63 insertions, 5 deletions
diff --git a/docs/connector/source/file.md b/docs/connector/source/file.md index edb4aab..6858fd6 100644 --- a/docs/connector/source/file.md +++ b/docs/connector/source/file.md @@ -54,4 +54,14 @@ application: downstream: [ ] ``` +put file to hdfs: +```sh +# make dir +hadoop fs -mkdir hdfs://ns1/test +# put local file to hdfs +hadoop fs -put logs.json hdfs://ns1/test + +# list hdfs dir +hadoop fs -ls logs.json hdfs://ns1/test +``` diff --git a/groot-connectors/connector-clickhouse/src/main/java/com/geedgenetworks/connectors/clickhouse/sink/AbstractBatchIntervalClickHouseSink.java b/groot-connectors/connector-clickhouse/src/main/java/com/geedgenetworks/connectors/clickhouse/sink/AbstractBatchIntervalClickHouseSink.java index 6761d42..2f29b8f 100644 --- a/groot-connectors/connector-clickhouse/src/main/java/com/geedgenetworks/connectors/clickhouse/sink/AbstractBatchIntervalClickHouseSink.java +++ b/groot-connectors/connector-clickhouse/src/main/java/com/geedgenetworks/connectors/clickhouse/sink/AbstractBatchIntervalClickHouseSink.java @@ -392,6 +392,10 @@ public abstract class AbstractBatchIntervalClickHouseSink<T> extends RichSinkFun }
protected ValueConverter makeConverter(IDataType<?, ?> type) {
+ return makeConverter(type, true); // delegates to the two-argument overload with objectReuse enabled
+ }
+
+ protected ValueConverter makeConverter(IDataType<?, ?> type, boolean objectReuse) {
// put the most common cast at first to avoid `instanceof` test overhead
if (type instanceof DataTypeString || type instanceof DataTypeFixedString) {
return makeStringConverter();
@@ -482,13 +486,18 @@ public abstract class AbstractBatchIntervalClickHouseSink<T> extends RichSinkFun if (type instanceof DataTypeArray) {
IDataType<?, ?> eleDataType = ((DataTypeArray) type).getElemDataType();
- ValueConverter eleConverter = this.makeConverter(eleDataType);
+ ValueConverter eleConverter = this.makeConverter(eleDataType, false);
Object defaultValue = new ClickHouseArray(eleDataType, new Object[0]);
return (obj, sizeHelper) -> {
return this.convertArray(obj, eleDataType, eleConverter, defaultValue, sizeHelper);
};
}
+ if (type instanceof DataTypeMap) {
+ IDataType<?, ?>[] kvTypes = ((DataTypeMap) type).getNestedTypes();
+ return makeMapConverter(kvTypes, objectReuse);
+ }
+
throw new UnsupportedOperationException("Unsupported type: " + type);
}
@@ -818,6 +827,42 @@ public abstract class AbstractBatchIntervalClickHouseSink<T> extends RichSinkFun throw new ClickHouseSQLException(-1, "require ClickHouseArray for column, but found " + obj.getClass());
}
+ private ValueConverter makeMapConverter(IDataType<?, ?>[] kvTypes, boolean objectReuse){ // Builds the converter for Map columns: kvTypes[0] is used as the key type, kvTypes[1] as the value type.
+ final ValueConverter keyConverter = this.makeConverter(kvTypes[0]); // single-argument overload, i.e. objectReuse = true for the key converter
+ final ValueConverter valueConverter = this.makeConverter(kvTypes[1], false); // value converter is built with objectReuse = false, so a nested map value is always a fresh HashMap
+ return new ValueConverter() {
+ Map<Object, Object> map = objectReuse? new HashMap<>():null; // scratch map kept across convert() calls; only allocated when reuse is enabled
+ @Override
+ public Object convert(Object obj, SizeHelper sizeHelper) throws ClickHouseSQLException {
+ if (obj == null) {
+ throw new ClickHouseSQLException(-1, "type doesn't support null value"); // null input is rejected outright
+ }
+ if (obj instanceof Map){
+ Map<?, ?> dataMap = (Map<?, ?>) obj;
+ if(dataMap.isEmpty()){
+ return dataMap; // empty input is returned as-is: no copy and no key/value conversion
+ }
+ Map<Object, Object> result;
+ if(!objectReuse || dataMap.size() > 100){ // fresh map when reuse is off or the input has more than 100 entries; presumably the threshold keeps the scratch map small -- TODO confirm
+ result = new HashMap<>();
+ } else {
+ result = map; // reuse path: the returned map is the shared scratch instance
+ result.clear(); // NOTE(review): this wipes the map returned by the previous call; assumes callers consume each result before converting the next row -- confirm
+ }
+
+ for (Map.Entry<?, ?> entry : dataMap.entrySet()) {
+ Object key = keyConverter.convert(entry.getKey(), sizeHelper); // key and value go through their own converters, sharing the same sizeHelper
+ Object value = valueConverter.convert(entry.getValue(), sizeHelper);
+ result.put(key, value); // entries whose converted keys are equal overwrite each other
+ }
+ return result;
+ }
+
+ throw new ClickHouseSQLException(-1, "require Map for column, but found " + obj.getClass()); // any non-Map input is a type error
+ }
+ };
+ }
+
// copy from org.apache.flink.table.runtime.util.StringUtf8Utils
static int encodeUTF8(String str, byte[] bytes) {
int offset = 0;
diff --git a/groot-connectors/connector-clickhouse/src/main/java/com/geedgenetworks/connectors/clickhouse/util/ClickHouseUtils.java b/groot-connectors/connector-clickhouse/src/main/java/com/geedgenetworks/connectors/clickhouse/util/ClickHouseUtils.java index 7c42498..49f8235 100644 --- a/groot-connectors/connector-clickhouse/src/main/java/com/geedgenetworks/connectors/clickhouse/util/ClickHouseUtils.java +++ b/groot-connectors/connector-clickhouse/src/main/java/com/geedgenetworks/connectors/clickhouse/util/ClickHouseUtils.java @@ -23,10 +23,7 @@ import java.nio.ByteBuffer; import java.nio.charset.StandardCharsets;
import java.sql.*;
import java.time.ZoneId;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.List;
-import java.util.Properties;
+import java.util.*;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
@@ -122,6 +119,8 @@ public class ClickHouseUtils { if (defaultValue == null && !type.nullable()) {
if (type instanceof DataTypeArray) {
defaultValue = new ClickHouseArray(((DataTypeArray) type).getElemDataType(), new Object[0]);
+ } else if (type instanceof DataTypeMap) {
+ defaultValue = Collections.emptyMap();
} else {
defaultValue = type.defaultValue();
}
@@ -378,6 +377,10 @@ public class ClickHouseUtils { return 0;
}
+ if (type instanceof DataTypeMap) {
+ return 0;
+ }
+
throw new UnsupportedOperationException("Unsupported type: " + type);
}
|
