summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorlifengchao <[email protected]>2023-12-04 16:08:24 +0800
committerlifengchao <[email protected]>2023-12-04 16:08:24 +0800
commit6bb40af602031fd8bab30dd1680ff81f8b92d0bd (patch)
tree680508d76ff60224ce145e2790da878a492f7e52
parenta6fc25fee94af1f44542b117362ae715e689ae29 (diff)
source支持配置解析struct类型
-rw-r--r--groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/SourceExecuteProcessor.java13
-rw-r--r--groot-core/src/main/java/com/geedgenetworks/core/types/ArrayType.java9
-rw-r--r--groot-core/src/main/java/com/geedgenetworks/core/types/DataType.java13
-rw-r--r--groot-core/src/main/java/com/geedgenetworks/core/types/StructType.java54
-rw-r--r--groot-core/src/main/java/com/geedgenetworks/core/types/Types.java90
-rw-r--r--groot-core/src/test/java/com/geedgenetworks/core/types/TypesTest.java23
6 files changed, 185 insertions, 17 deletions
diff --git a/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/SourceExecuteProcessor.java b/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/SourceExecuteProcessor.java
index 3846aa7..7229a03 100644
--- a/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/SourceExecuteProcessor.java
+++ b/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/SourceExecuteProcessor.java
@@ -12,6 +12,7 @@ import com.geedgenetworks.core.factories.SourceTableFactory;
import com.geedgenetworks.core.factories.TableFactory;
import com.geedgenetworks.core.pojo.Event;
import com.geedgenetworks.core.pojo.SourceConfig;
+import com.geedgenetworks.core.types.StructType;
import com.geedgenetworks.core.types.Types;
import com.geedgenetworks.utils.StringUtil;
import com.google.common.collect.Maps;
@@ -46,7 +47,8 @@ public class SourceExecuteProcessor extends AbstractExecuteProcessor<String, Sou
checkSourceConfig(sourceConfig);
sourceConfig.setName(key);
if(CollectionUtils.isNotEmpty(sourceConfig.getFields())){
- sourceConfig.setDataType(Types.parseSchemaFromJson(JSON.toJSONString(sourceConfig.getFields())));
+ StructType schema = Types.parseSchemaFromJson(JSON.toJSONString(sourceConfig.getFields()));
+ sourceConfig.setDataType(schema);
}
sourceConfigMap.put(key, sourceConfig);
});
@@ -63,8 +65,13 @@ public class SourceExecuteProcessor extends AbstractExecuteProcessor<String, Sou
SourceTableFactory tableFactory = FactoryUtil.discoverTableFactory(SourceTableFactory.class, sourceConfig.getType());
Map<String, String> options = sourceConfig.getProperties();
Configuration configuration = Configuration.fromMap(options);
- TableFactory.Context context = new TableFactory.Context(sourceConfig.getDataType(), sourceConfig.getDataType(), options, configuration);
+ StructType dataType = sourceConfig.getDataType();
+ TableFactory.Context context = new TableFactory.Context(dataType, dataType, options, configuration);
SourceProvider sourceProvider = tableFactory.getSourceProvider(context);
+ if(dataType != null){
+ System.out.println(String.format("source(%s) dataType:\n%s", sourceConfig.getName(), dataType.toString()));
+ System.out.println(String.format("source(%s) schema:\n%s", sourceConfig.getName(), dataType.treeString()));
+ }
dataStream = sourceProvider.produceDataStream(jobRuntimeEnvironment.getStreamExecutionEnvironment());
if (node.getParallelism() > 0) {
@@ -95,7 +102,7 @@ public class SourceExecuteProcessor extends AbstractExecuteProcessor<String, Sou
return Long.parseLong(ms.toString());
}
};
- } else if ("ms".equals(timestampUnit)) {
+ } else if ("s".equals(timestampUnit)) {
timestampAssigner = (event, timestamp) -> {
Object s = event.getExtractedFields().get(watermarkTimestamp);
if(s instanceof Number){
diff --git a/groot-core/src/main/java/com/geedgenetworks/core/types/ArrayType.java b/groot-core/src/main/java/com/geedgenetworks/core/types/ArrayType.java
index 21ccb67..7a526ac 100644
--- a/groot-core/src/main/java/com/geedgenetworks/core/types/ArrayType.java
+++ b/groot-core/src/main/java/com/geedgenetworks/core/types/ArrayType.java
@@ -9,6 +9,13 @@ public class ArrayType extends DataType {
@Override
public String simpleString() {
- return "array";
+ return String.format("array<%s>", elementType.simpleString());
+ }
+
+ void buildFormattedString(String prefix, StringBuilder sb, int maxDepth){
+ if (maxDepth > 0) {
+ sb.append(String.format("%s-- element: %s\n", prefix, elementType.typeName()));
+ Types.buildFormattedString(elementType, prefix + " |", sb, maxDepth);
+ }
}
}
diff --git a/groot-core/src/main/java/com/geedgenetworks/core/types/DataType.java b/groot-core/src/main/java/com/geedgenetworks/core/types/DataType.java
index 20102aa..f357015 100644
--- a/groot-core/src/main/java/com/geedgenetworks/core/types/DataType.java
+++ b/groot-core/src/main/java/com/geedgenetworks/core/types/DataType.java
@@ -4,4 +4,17 @@ import java.io.Serializable;
public abstract class DataType implements Serializable {
public abstract String simpleString();
+
+ public String typeName(){
+ String typeName = this.getClass().getSimpleName();
+ if(typeName.endsWith("Type")){
+ typeName = typeName.substring(0, typeName.length() - 4);
+ }
+ return typeName.toLowerCase();
+ }
+
+ @Override
+ public String toString() {
+ return simpleString();
+ }
}
diff --git a/groot-core/src/main/java/com/geedgenetworks/core/types/StructType.java b/groot-core/src/main/java/com/geedgenetworks/core/types/StructType.java
index f1c72db..b16530d 100644
--- a/groot-core/src/main/java/com/geedgenetworks/core/types/StructType.java
+++ b/groot-core/src/main/java/com/geedgenetworks/core/types/StructType.java
@@ -1,17 +1,62 @@
package com.geedgenetworks.core.types;
+import org.apache.commons.lang3.StringUtils;
+
import java.io.Serializable;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.Set;
+import java.util.stream.Collectors;
public class StructType extends DataType {
public final StructField[] fields;
public StructType(StructField[] fields) {
this.fields = fields;
+ validateFields(fields);
}
@Override
public String simpleString() {
- return "struct";
+ return String.format("struct<%s>", Arrays.stream(fields).map(f -> f.name + ":" + f.dataType.simpleString()).collect(Collectors.joining(", ")));
+ }
+
+ public String treeString() {
+ return treeString(Integer.MAX_VALUE);
+ }
+
+ public String treeString(int maxDepth) {
+ StringBuilder sb = new StringBuilder();
+ sb.append("root\n");
+ String prefix = " |";
+ int depth = maxDepth > 0? maxDepth: Integer.MAX_VALUE;
+ for (StructField field : fields) {
+ field.buildFormattedString(prefix, sb, depth);
+ }
+ return sb.toString();
+ }
+
+ void buildFormattedString(String prefix, StringBuilder sb, int maxDepth){
+ for (StructField field : fields) {
+ field.buildFormattedString(prefix, sb, maxDepth);
+ }
+ }
+
+ private static void validateFields(StructField[] fields) {
+ List<String> fieldNames = Arrays.stream(fields).map(f -> f.name).collect(Collectors.toList());
+ if (fieldNames.stream().anyMatch(StringUtils::isBlank)) {
+ throw new IllegalArgumentException("Field names must contain at least one non-whitespace character.");
+ }
+
+ final Set<String> duplicates =
+ fieldNames.stream()
+ .filter(n -> Collections.frequency(fieldNames, n) > 1)
+ .collect(Collectors.toSet());
+ if (!duplicates.isEmpty()) {
+ throw new IllegalArgumentException(
+ String.format("Field names must be unique. Found duplicates: %s", duplicates));
+ }
}
public static final class StructField implements Serializable {
@@ -22,6 +67,13 @@ public class StructType extends DataType {
this.name = name;
this.dataType = dataType;
}
+
+ void buildFormattedString(String prefix, StringBuilder sb, int maxDepth){
+ if(maxDepth > 0){
+ sb.append(String.format("%s-- %s: %s\n", prefix, name, dataType.typeName()));
+ Types.buildFormattedString(dataType, prefix + " |", sb, maxDepth);
+ }
+ }
}
}
diff --git a/groot-core/src/main/java/com/geedgenetworks/core/types/Types.java b/groot-core/src/main/java/com/geedgenetworks/core/types/Types.java
index af02d4b..9f1069e 100644
--- a/groot-core/src/main/java/com/geedgenetworks/core/types/Types.java
+++ b/groot-core/src/main/java/com/geedgenetworks/core/types/Types.java
@@ -5,6 +5,8 @@ import com.alibaba.fastjson2.JSONArray;
import com.alibaba.fastjson2.JSONObject;
import com.geedgenetworks.core.types.StructType.StructField;
+import java.util.ArrayList;
+import java.util.List;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
@@ -15,14 +17,17 @@ public class Types {
public static final FloatType FLOAT = new FloatType();
public static final DoubleType DOUBLE = new DoubleType();
+ public static final Pattern ARRAY_RE = Pattern.compile("array\\s*<(.+)>", Pattern.CASE_INSENSITIVE);
+ public static final Pattern STRUCT_RE = Pattern.compile("struct\\s*<(.+)>", Pattern.CASE_INSENSITIVE);
+
public static StructType parseSchemaFromJson(String jsonFields) {
JSONArray fieldArray = JSON.parseArray(jsonFields);
StructField[] fields = new StructField[fieldArray.size()];
for (int i = 0; i < fieldArray.size(); i++) {
JSONObject fieldObject = fieldArray.getJSONObject(i);
- String name = fieldObject.getString("name");
- String type = fieldObject.getString("type").toLowerCase().trim();
+ String name = fieldObject.getString("name").trim();
+ String type = fieldObject.getString("type").trim();
DataType dataType = parseDataType(type);
fields[i] = new StructField(name, dataType);
}
@@ -30,23 +35,22 @@ public class Types {
return new StructType(fields);
}
- private static DataType parseDataType(String type){
-
- if("int".equals(type)){
+ static DataType parseDataType(String type){
+ type = type.trim();
+ if("int".equalsIgnoreCase(type)){
return INT;
- } else if ("bigint".equals(type)){
+ } else if ("bigint".equalsIgnoreCase(type)){
return BIGINT;
- } else if ("string".equals(type)){
+ } else if ("string".equalsIgnoreCase(type)){
return STRING;
- } else if ("float".equals(type)){
+ } else if ("float".equalsIgnoreCase(type)){
return FLOAT;
- } else if ("double".equals(type)){
+ } else if ("double".equalsIgnoreCase(type)){
return DOUBLE;
}
// array类型
- Pattern arrayRe = Pattern.compile("array<(.+)>", Pattern.CASE_INSENSITIVE );
- Matcher matcher = arrayRe.matcher(type);
+ Matcher matcher = ARRAY_RE.matcher(type);
if(matcher.matches()){
String eleType = matcher.group(1);
DataType elementType = parseDataType(eleType);
@@ -54,8 +58,70 @@ public class Types {
}
// struct类型
- // TODO
+ matcher = STRUCT_RE.matcher(type);
+ if(matcher.matches()){
+ List<StructField> fields = new ArrayList<>();
+ String str = matcher.group(1);
+ int startPos = 0, endPos = -1;
+ int i = startPos + 1;
+ int level = 0;
+ while (i < str.length()){
+ while (i < str.length()){
+ if(str.charAt(i) == ':'){
+ endPos = i;
+ break;
+ }
+ i++;
+ }
+
+ if(endPos <= startPos){
+ throw new UnsupportedOperationException("不支持的类型:" + type);
+ }
+
+ String name = str.substring(startPos, endPos).trim();
+ startPos = i + 1;
+ endPos = -1;
+ i = startPos + 1;
+ while (i < str.length()){
+ if(str.charAt(i) == ',' && level == 0){
+ endPos = i;
+ break;
+ }
+ if(str.charAt(i) == '<'){
+ level++;
+ }
+ if(str.charAt(i) == '>'){
+ level--;
+ }
+ i++;
+ }
+
+ if(i == str.length()){
+ endPos = i;
+ }
+ if(endPos <= startPos){
+ throw new UnsupportedOperationException("不支持的类型:" + type);
+ }
+
+ String tp = str.substring(startPos, endPos).trim();
+ fields.add(new StructField(name, parseDataType(tp)));
+
+ i++;
+ startPos = i;
+ endPos = -1;
+ }
+
+ return new StructType(fields.toArray(new StructField[fields.size()]));
+ }
throw new UnsupportedOperationException("不支持的类型:" + type);
}
+
+ static void buildFormattedString(DataType dataType, String prefix, StringBuilder sb, int maxDepth){
+ if(dataType instanceof ArrayType){
+ ((ArrayType)dataType).buildFormattedString(prefix, sb, maxDepth - 1);
+ } else if (dataType instanceof StructType) {
+ ((StructType)dataType).buildFormattedString(prefix, sb, maxDepth - 1);
+ }
+ }
}
diff --git a/groot-core/src/test/java/com/geedgenetworks/core/types/TypesTest.java b/groot-core/src/test/java/com/geedgenetworks/core/types/TypesTest.java
new file mode 100644
index 0000000..5651a0d
--- /dev/null
+++ b/groot-core/src/test/java/com/geedgenetworks/core/types/TypesTest.java
@@ -0,0 +1,23 @@
+package com.geedgenetworks.core.types;
+
+public class TypesTest {
+
+ public static void main(String[] args) {
+ TypesTest test = new TypesTest();
+ test.parseDataType();
+ }
+
+ public void parseDataType(){
+ String ddl = "struct< id: int, obj: struct <id:int, arr: array < int >, name:string>, name : string>";
+ StructType dataType = (StructType)Types.parseDataType(ddl);
+ System.out.println(dataType);
+ System.out.println(dataType.treeString());
+
+ System.out.println();
+ ddl = "struct< id: int, objs: array<struct<id:int, name:string>>, arr: array<int>, name : string>";
+ dataType = (StructType)Types.parseDataType(ddl);
+ System.out.println(dataType);
+ System.out.println(dataType.treeString());
+ }
+
+} \ No newline at end of file