diff options
| author | lifengchao <[email protected]> | 2023-12-04 16:08:24 +0800 |
|---|---|---|
| committer | lifengchao <[email protected]> | 2023-12-04 16:08:24 +0800 |
| commit | 6bb40af602031fd8bab30dd1680ff81f8b92d0bd (patch) | |
| tree | 680508d76ff60224ce145e2790da878a492f7e52 | |
| parent | a6fc25fee94af1f44542b117362ae715e689ae29 (diff) | |
source支持配置解析struct类型
6 files changed, 185 insertions, 17 deletions
diff --git a/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/SourceExecuteProcessor.java b/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/SourceExecuteProcessor.java index 3846aa7..7229a03 100644 --- a/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/SourceExecuteProcessor.java +++ b/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/SourceExecuteProcessor.java @@ -12,6 +12,7 @@ import com.geedgenetworks.core.factories.SourceTableFactory; import com.geedgenetworks.core.factories.TableFactory; import com.geedgenetworks.core.pojo.Event; import com.geedgenetworks.core.pojo.SourceConfig; +import com.geedgenetworks.core.types.StructType; import com.geedgenetworks.core.types.Types; import com.geedgenetworks.utils.StringUtil; import com.google.common.collect.Maps; @@ -46,7 +47,8 @@ public class SourceExecuteProcessor extends AbstractExecuteProcessor<String, Sou checkSourceConfig(sourceConfig); sourceConfig.setName(key); if(CollectionUtils.isNotEmpty(sourceConfig.getFields())){ - sourceConfig.setDataType(Types.parseSchemaFromJson(JSON.toJSONString(sourceConfig.getFields()))); + StructType schema = Types.parseSchemaFromJson(JSON.toJSONString(sourceConfig.getFields())); + sourceConfig.setDataType(schema); } sourceConfigMap.put(key, sourceConfig); }); @@ -63,8 +65,13 @@ public class SourceExecuteProcessor extends AbstractExecuteProcessor<String, Sou SourceTableFactory tableFactory = FactoryUtil.discoverTableFactory(SourceTableFactory.class, sourceConfig.getType()); Map<String, String> options = sourceConfig.getProperties(); Configuration configuration = Configuration.fromMap(options); - TableFactory.Context context = new TableFactory.Context(sourceConfig.getDataType(), sourceConfig.getDataType(), options, configuration); + StructType dataType = sourceConfig.getDataType(); + TableFactory.Context context = new TableFactory.Context(dataType, dataType, options, configuration); SourceProvider sourceProvider = tableFactory.getSourceProvider(context); + if(dataType != null){ + System.out.println(String.format("source(%s) dataType:\n%s", sourceConfig.getName(), dataType.toString())); + System.out.println(String.format("source(%s) schema:\n%s", sourceConfig.getName(), dataType.treeString())); + } dataStream = sourceProvider.produceDataStream(jobRuntimeEnvironment.getStreamExecutionEnvironment()); if (node.getParallelism() > 0) { @@ -95,7 +102,7 @@ public class SourceExecuteProcessor extends AbstractExecuteProcessor<String, Sou return Long.parseLong(ms.toString()); } }; - } else if ("ms".equals(timestampUnit)) { + } else if ("s".equals(timestampUnit)) { timestampAssigner = (event, timestamp) -> { Object s = event.getExtractedFields().get(watermarkTimestamp); if(s instanceof Number){ diff --git a/groot-core/src/main/java/com/geedgenetworks/core/types/ArrayType.java b/groot-core/src/main/java/com/geedgenetworks/core/types/ArrayType.java index 21ccb67..7a526ac 100644 --- a/groot-core/src/main/java/com/geedgenetworks/core/types/ArrayType.java +++ b/groot-core/src/main/java/com/geedgenetworks/core/types/ArrayType.java @@ -9,6 +9,13 @@ public class ArrayType extends DataType { @Override public String simpleString() { - return "array"; + return String.format("array<%s>", elementType.simpleString()); + } + + void buildFormattedString(String prefix, StringBuilder sb, int maxDepth){ + if (maxDepth > 0) { + sb.append(String.format("%s-- element: %s\n", prefix, elementType.typeName())); + Types.buildFormattedString(elementType, prefix + " |", sb, maxDepth); + } } } diff --git a/groot-core/src/main/java/com/geedgenetworks/core/types/DataType.java b/groot-core/src/main/java/com/geedgenetworks/core/types/DataType.java index 20102aa..f357015 100644 --- a/groot-core/src/main/java/com/geedgenetworks/core/types/DataType.java +++ b/groot-core/src/main/java/com/geedgenetworks/core/types/DataType.java @@ -4,4 +4,17 @@ import java.io.Serializable; public abstract class DataType implements Serializable { public abstract String simpleString(); + + public String typeName(){ + String typeName = this.getClass().getSimpleName(); + if(typeName.endsWith("Type")){ + typeName = typeName.substring(0, typeName.length() - 4); + } + return typeName.toLowerCase(); + } + + @Override + public String toString() { + return simpleString(); + } } diff --git a/groot-core/src/main/java/com/geedgenetworks/core/types/StructType.java b/groot-core/src/main/java/com/geedgenetworks/core/types/StructType.java index f1c72db..b16530d 100644 --- a/groot-core/src/main/java/com/geedgenetworks/core/types/StructType.java +++ b/groot-core/src/main/java/com/geedgenetworks/core/types/StructType.java @@ -1,17 +1,62 @@ package com.geedgenetworks.core.types; +import org.apache.commons.lang3.StringUtils; + import java.io.Serializable; +import java.util.Arrays; +import java.util.Collections; +import java.util.List; +import java.util.Set; +import java.util.stream.Collectors; public class StructType extends DataType { public final StructField[] fields; public StructType(StructField[] fields) { this.fields = fields; + validateFields(fields); } @Override public String simpleString() { - return "struct"; + return String.format("struct<%s>", Arrays.stream(fields).map(f -> f.name + ":" + f.dataType.simpleString()).collect(Collectors.joining(", "))); + } + + public String treeString() { + return treeString(Integer.MAX_VALUE); + } + + public String treeString(int maxDepth) { + StringBuilder sb = new StringBuilder(); + sb.append("root\n"); + String prefix = " |"; + int depth = maxDepth > 0? maxDepth: Integer.MAX_VALUE; + for (StructField field : fields) { + field.buildFormattedString(prefix, sb, depth); + } + return sb.toString(); + } + + void buildFormattedString(String prefix, StringBuilder sb, int maxDepth){ + for (StructField field : fields) { + field.buildFormattedString(prefix, sb, maxDepth); + } + } + + private static void validateFields(StructField[] fields) { + List<String> fieldNames = Arrays.stream(fields).map(f -> f.name).collect(Collectors.toList()); + if (fieldNames.stream().anyMatch(StringUtils::isBlank)) { + throw new IllegalArgumentException("Field names must contain at least one non-whitespace character."); + } + + final Set<String> duplicates = + fieldNames.stream() + .filter(n -> Collections.frequency(fieldNames, n) > 1) + .collect(Collectors.toSet()); + if (!duplicates.isEmpty()) { + throw new IllegalArgumentException( + String.format("Field names must be unique. Found duplicates: %s", duplicates)); + } } public static final class StructField implements Serializable { @@ -22,6 +67,13 @@ public class StructType extends DataType { this.name = name; this.dataType = dataType; } + + void buildFormattedString(String prefix, StringBuilder sb, int maxDepth){ + if(maxDepth > 0){ + sb.append(String.format("%s-- %s: %s\n", prefix, name, dataType.typeName())); + Types.buildFormattedString(dataType, prefix + " |", sb, maxDepth); + } + } } } diff --git a/groot-core/src/main/java/com/geedgenetworks/core/types/Types.java b/groot-core/src/main/java/com/geedgenetworks/core/types/Types.java index af02d4b..9f1069e 100644 --- a/groot-core/src/main/java/com/geedgenetworks/core/types/Types.java +++ b/groot-core/src/main/java/com/geedgenetworks/core/types/Types.java @@ -5,6 +5,8 @@ import com.alibaba.fastjson2.JSONArray; import com.alibaba.fastjson2.JSONObject; import com.geedgenetworks.core.types.StructType.StructField; +import java.util.ArrayList; +import java.util.List; import java.util.regex.Matcher; import java.util.regex.Pattern; @@ -15,14 +17,17 @@ public class Types { public static final FloatType FLOAT = new FloatType(); public static final DoubleType DOUBLE = new DoubleType(); + public static final Pattern ARRAY_RE = Pattern.compile("array\\s*<(.+)>", Pattern.CASE_INSENSITIVE); + public static final Pattern STRUCT_RE = Pattern.compile("struct\\s*<(.+)>", Pattern.CASE_INSENSITIVE); + public static StructType parseSchemaFromJson(String jsonFields) { JSONArray fieldArray = JSON.parseArray(jsonFields); StructField[] fields = new StructField[fieldArray.size()]; for (int i = 0; i < fieldArray.size(); i++) { JSONObject fieldObject = fieldArray.getJSONObject(i); - String name = fieldObject.getString("name"); - String type = fieldObject.getString("type").toLowerCase().trim(); + String name = fieldObject.getString("name").trim(); + String type = fieldObject.getString("type").trim(); DataType dataType = parseDataType(type); fields[i] = new StructField(name, dataType); } @@ -30,23 +35,22 @@ public class Types { return new StructType(fields); } - private static DataType parseDataType(String type){ - - if("int".equals(type)){ + static DataType parseDataType(String type){ + type = type.trim(); + if("int".equalsIgnoreCase(type)){ return INT; - } else if ("bigint".equals(type)){ + } else if ("bigint".equalsIgnoreCase(type)){ return BIGINT; - } else if ("string".equals(type)){ + } else if ("string".equalsIgnoreCase(type)){ return STRING; - } else if ("float".equals(type)){ + } else if ("float".equalsIgnoreCase(type)){ return FLOAT; - } else if ("double".equals(type)){ + } else if ("double".equalsIgnoreCase(type)){ return DOUBLE; } // array类型 - Pattern arrayRe = Pattern.compile("array<(.+)>", Pattern.CASE_INSENSITIVE ); - Matcher matcher = arrayRe.matcher(type); + Matcher matcher = ARRAY_RE.matcher(type); if(matcher.matches()){ String eleType = matcher.group(1); DataType elementType = parseDataType(eleType); @@ -54,8 +58,70 @@ public class Types { } // struct类型 - // TODO + matcher = STRUCT_RE.matcher(type); + if(matcher.matches()){ + List<StructField> fields = new ArrayList<>(); + String str = matcher.group(1); + int startPos = 0, endPos = -1; + int i = startPos + 1; + int level = 0; + while (i < str.length()){ + while (i < str.length()){ + if(str.charAt(i) == ':'){ + endPos = i; + break; + } + i++; + } + + if(endPos <= startPos){ + throw new UnsupportedOperationException("不支持的类型:" + type); + } + + String name = str.substring(startPos, endPos).trim(); + startPos = i + 1; + endPos = -1; + i = startPos + 1; + while (i < str.length()){ + if(str.charAt(i) == ',' && level == 0){ + endPos = i; + break; + } + if(str.charAt(i) == '<'){ + level++; + } + if(str.charAt(i) == '>'){ + level--; + } + i++; + } + + if(i == str.length()){ + endPos = i; + } + if(endPos <= startPos){ + throw new UnsupportedOperationException("不支持的类型:" + type); + } + + String tp = str.substring(startPos, endPos).trim(); + fields.add(new StructField(name, parseDataType(tp))); + + i++; + startPos = i; + endPos = -1; + } + + return new StructType(fields.toArray(new StructField[fields.size()])); + } throw new UnsupportedOperationException("不支持的类型:" + type); } + + static void buildFormattedString(DataType dataType, String prefix, StringBuilder sb, int maxDepth){ + if(dataType instanceof ArrayType){ + ((ArrayType)dataType).buildFormattedString(prefix, sb, maxDepth - 1); + } else if (dataType instanceof StructType) { + ((StructType)dataType).buildFormattedString(prefix, sb, maxDepth - 1); + } + } } diff --git a/groot-core/src/test/java/com/geedgenetworks/core/types/TypesTest.java b/groot-core/src/test/java/com/geedgenetworks/core/types/TypesTest.java new file mode 100644 index 0000000..5651a0d --- /dev/null +++ b/groot-core/src/test/java/com/geedgenetworks/core/types/TypesTest.java @@ -0,0 +1,23 @@ +package com.geedgenetworks.core.types; + +public class TypesTest { + + public static void main(String[] args) { + TypesTest test = new TypesTest(); + test.parseDataType(); + } + + public void parseDataType(){ + String ddl = "struct< id: int, obj: struct <id:int, arr: array < int >, name:string>, name : string>"; + StructType dataType = (StructType)Types.parseDataType(ddl); + System.out.println(dataType); + System.out.println(dataType.treeString()); + + System.out.println(); + ddl = "struct< id: int, objs: array<struct<id:int, name:string>>, arr: array<int>, name : string>"; + dataType = (StructType)Types.parseDataType(ddl); + System.out.println(dataType); + System.out.println(dataType.treeString()); + } + +}
\ No newline at end of file |
