summaryrefslogtreecommitdiff
path: root/groot-bootstrap
diff options
context:
space:
mode:
authorlifengchao <[email protected]>2023-12-04 16:08:24 +0800
committerlifengchao <[email protected]>2023-12-04 16:08:24 +0800
commit6bb40af602031fd8bab30dd1680ff81f8b92d0bd (patch)
tree680508d76ff60224ce145e2790da878a492f7e52 /groot-bootstrap
parenta6fc25fee94af1f44542b117362ae715e689ae29 (diff)
source支持配置解析struct类型
Diffstat (limited to 'groot-bootstrap')
-rw-r--r--groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/SourceExecuteProcessor.java13
1 files changed, 10 insertions, 3 deletions
diff --git a/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/SourceExecuteProcessor.java b/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/SourceExecuteProcessor.java
index 3846aa7..7229a03 100644
--- a/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/SourceExecuteProcessor.java
+++ b/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/SourceExecuteProcessor.java
@@ -12,6 +12,7 @@ import com.geedgenetworks.core.factories.SourceTableFactory;
import com.geedgenetworks.core.factories.TableFactory;
import com.geedgenetworks.core.pojo.Event;
import com.geedgenetworks.core.pojo.SourceConfig;
+import com.geedgenetworks.core.types.StructType;
import com.geedgenetworks.core.types.Types;
import com.geedgenetworks.utils.StringUtil;
import com.google.common.collect.Maps;
@@ -46,7 +47,8 @@ public class SourceExecuteProcessor extends AbstractExecuteProcessor<String, Sou
checkSourceConfig(sourceConfig);
sourceConfig.setName(key);
if(CollectionUtils.isNotEmpty(sourceConfig.getFields())){
- sourceConfig.setDataType(Types.parseSchemaFromJson(JSON.toJSONString(sourceConfig.getFields())));
+ StructType schema = Types.parseSchemaFromJson(JSON.toJSONString(sourceConfig.getFields()));
+ sourceConfig.setDataType(schema);
}
sourceConfigMap.put(key, sourceConfig);
});
@@ -63,8 +65,13 @@ public class SourceExecuteProcessor extends AbstractExecuteProcessor<String, Sou
SourceTableFactory tableFactory = FactoryUtil.discoverTableFactory(SourceTableFactory.class, sourceConfig.getType());
Map<String, String> options = sourceConfig.getProperties();
Configuration configuration = Configuration.fromMap(options);
- TableFactory.Context context = new TableFactory.Context(sourceConfig.getDataType(), sourceConfig.getDataType(), options, configuration);
+ StructType dataType = sourceConfig.getDataType();
+ TableFactory.Context context = new TableFactory.Context(dataType, dataType, options, configuration);
SourceProvider sourceProvider = tableFactory.getSourceProvider(context);
+ if(dataType != null){
+ System.out.println(String.format("source(%s) dataType:\n%s", sourceConfig.getName(), dataType.toString()));
+ System.out.println(String.format("source(%s) schema:\n%s", sourceConfig.getName(), dataType.treeString()));
+ }
dataStream = sourceProvider.produceDataStream(jobRuntimeEnvironment.getStreamExecutionEnvironment());
if (node.getParallelism() > 0) {
@@ -95,7 +102,7 @@ public class SourceExecuteProcessor extends AbstractExecuteProcessor<String, Sou
return Long.parseLong(ms.toString());
}
};
- } else if ("ms".equals(timestampUnit)) {
+ } else if ("s".equals(timestampUnit)) {
timestampAssigner = (event, timestamp) -> {
Object s = event.getExtractedFields().get(watermarkTimestamp);
if(s instanceof Number){