diff options
| author | lifengchao <[email protected]> | 2023-12-04 16:08:24 +0800 |
|---|---|---|
| committer | lifengchao <[email protected]> | 2023-12-04 16:08:24 +0800 |
| commit | 6bb40af602031fd8bab30dd1680ff81f8b92d0bd (patch) | |
| tree | 680508d76ff60224ce145e2790da878a492f7e52 /groot-bootstrap | |
| parent | a6fc25fee94af1f44542b117362ae715e689ae29 (diff) | |
source支持配置解析struct类型
Diffstat (limited to 'groot-bootstrap')
| -rw-r--r-- | groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/SourceExecuteProcessor.java | 13 |
1 files changed, 10 insertions, 3 deletions
diff --git a/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/SourceExecuteProcessor.java b/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/SourceExecuteProcessor.java index 3846aa7..7229a03 100644 --- a/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/SourceExecuteProcessor.java +++ b/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/SourceExecuteProcessor.java @@ -12,6 +12,7 @@ import com.geedgenetworks.core.factories.SourceTableFactory; import com.geedgenetworks.core.factories.TableFactory; import com.geedgenetworks.core.pojo.Event; import com.geedgenetworks.core.pojo.SourceConfig; +import com.geedgenetworks.core.types.StructType; import com.geedgenetworks.core.types.Types; import com.geedgenetworks.utils.StringUtil; import com.google.common.collect.Maps; @@ -46,7 +47,8 @@ public class SourceExecuteProcessor extends AbstractExecuteProcessor<String, Sou checkSourceConfig(sourceConfig); sourceConfig.setName(key); if(CollectionUtils.isNotEmpty(sourceConfig.getFields())){ - sourceConfig.setDataType(Types.parseSchemaFromJson(JSON.toJSONString(sourceConfig.getFields()))); + StructType schema = Types.parseSchemaFromJson(JSON.toJSONString(sourceConfig.getFields())); + sourceConfig.setDataType(schema); } sourceConfigMap.put(key, sourceConfig); }); @@ -63,8 +65,13 @@ public class SourceExecuteProcessor extends AbstractExecuteProcessor<String, Sou SourceTableFactory tableFactory = FactoryUtil.discoverTableFactory(SourceTableFactory.class, sourceConfig.getType()); Map<String, String> options = sourceConfig.getProperties(); Configuration configuration = Configuration.fromMap(options); - TableFactory.Context context = new TableFactory.Context(sourceConfig.getDataType(), sourceConfig.getDataType(), options, configuration); + StructType dataType = sourceConfig.getDataType(); + TableFactory.Context context = new TableFactory.Context(dataType, dataType, options, configuration); SourceProvider sourceProvider = tableFactory.getSourceProvider(context); + if(dataType != null){ + System.out.println(String.format("source(%s) dataType:\n%s", sourceConfig.getName(), dataType.toString())); + System.out.println(String.format("source(%s) schema:\n%s", sourceConfig.getName(), dataType.treeString())); + } dataStream = sourceProvider.produceDataStream(jobRuntimeEnvironment.getStreamExecutionEnvironment()); if (node.getParallelism() > 0) { @@ -95,7 +102,7 @@ public class SourceExecuteProcessor extends AbstractExecuteProcessor<String, Sou return Long.parseLong(ms.toString()); } }; - } else if ("ms".equals(timestampUnit)) { + } else if ("s".equals(timestampUnit)) { timestampAssigner = (event, timestamp) -> { Object s = event.getExtractedFields().get(watermarkTimestamp); if(s instanceof Number){ |
