diff options
| author | lifengchao <[email protected]> | 2024-03-13 09:58:36 +0800 |
|---|---|---|
| committer | lifengchao <[email protected]> | 2024-03-13 09:58:36 +0800 |
| commit | 4e2a9108198a32d9ea491765dd8ddd1d1c163877 (patch) | |
| tree | 03e9d132739d39370ae2d222a33e886cdbb65688 /groot-bootstrap | |
| parent | 039d9c0d0bf081865a3fc8a9cdc4359e0bac6331 (diff) | |
[feature][connector] GAL-508 Source/Sink Connector 支持引用外部的schema, ck sink支持http动态schema
Diffstat (limited to 'groot-bootstrap')
3 files changed, 101 insertions, 17 deletions
diff --git a/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/SinkExecutor.java b/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/SinkExecutor.java index 7aee40f..a0bedc9 100644 --- a/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/SinkExecutor.java +++ b/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/SinkExecutor.java @@ -2,21 +2,20 @@ package com.geedgenetworks.bootstrap.execution; import com.alibaba.fastjson.JSONObject; import com.geedgenetworks.bootstrap.enums.ProcessorType; -import com.geedgenetworks.bootstrap.exception.ConfigCheckException; import com.geedgenetworks.bootstrap.exception.JobExecuteException; +import com.geedgenetworks.bootstrap.utils.SchemaConfigParse; import com.geedgenetworks.common.Constants; import com.geedgenetworks.common.config.CheckConfigUtil; import com.geedgenetworks.common.config.CheckResult; import com.geedgenetworks.common.config.SinkConfigOptions; -import com.geedgenetworks.common.config.SourceConfigOptions; import com.geedgenetworks.common.exception.CommonErrorCode; import com.geedgenetworks.common.exception.ConfigValidationException; +import com.geedgenetworks.core.connector.schema.Schema; import com.geedgenetworks.core.connector.sink.SinkProvider; import com.geedgenetworks.core.factories.FactoryUtil; import com.geedgenetworks.core.factories.SinkTableFactory; import com.geedgenetworks.core.factories.TableFactory; import com.geedgenetworks.core.pojo.SinkConfig; -import com.geedgenetworks.utils.StringUtil; import com.google.common.collect.Maps; import com.typesafe.config.Config; import lombok.extern.slf4j.Slf4j; @@ -70,8 +69,20 @@ public class SinkExecutor extends AbstractExecutor<String, SinkConfig> { SinkTableFactory sinkTableFactory = FactoryUtil.discoverTableFactory(SinkTableFactory.class, sinkConfig.getType()); Map<String, String> options = sinkConfig.getProperties(); Configuration configuration = Configuration.fromMap(options); - TableFactory.Context context = new TableFactory.Context(null, null, options, configuration); + Schema schema = null; + if(sinkConfig.getSchema() != null && !sinkConfig.getSchema().isEmpty()){ + schema = SchemaConfigParse.parseSchemaConfig(sinkConfig.getSchema()); + } + TableFactory.Context context = new TableFactory.Context(schema, options, configuration); SinkProvider sinkProvider = sinkTableFactory.getSinkProvider(context); + if(!sinkProvider.supportDynamicSchema() && schema != null && schema.isDynamic()){ + throw new UnsupportedOperationException(String.format("sink(%s) not support DynamicSchema", sinkConfig.getName())); + } + if(schema != null){ + System.out.println(String.format("sink(%s) dataType:\n%s", sinkConfig.getName(), schema.getDataType().toString())); + System.out.println(String.format("sink(%s) schema:\n%s", sinkConfig.getName(), schema.getDataType().treeString())); + } + DataStreamSink<?> dataStreamSink = sinkProvider.consumeDataStream(dataStream); if (node.getParallelism() > 0) { dataStreamSink.setParallelism(node.getParallelism()); diff --git a/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/SourceExecutor.java b/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/SourceExecutor.java index eee78a2..f46e3fc 100644 --- a/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/SourceExecutor.java +++ b/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/SourceExecutor.java @@ -1,26 +1,24 @@ package com.geedgenetworks.bootstrap.execution; -import com.alibaba.fastjson2.JSON; import com.alibaba.fastjson2.JSONObject; import com.geedgenetworks.bootstrap.enums.ProcessorType; import com.geedgenetworks.bootstrap.exception.JobExecuteException; +import com.geedgenetworks.bootstrap.utils.SchemaConfigParse; import com.geedgenetworks.common.Constants; import com.geedgenetworks.common.config.CheckConfigUtil; import com.geedgenetworks.common.config.CheckResult; import com.geedgenetworks.common.config.SourceConfigOptions; import com.geedgenetworks.common.exception.CommonErrorCode; import com.geedgenetworks.common.exception.ConfigValidationException; +import com.geedgenetworks.core.connector.schema.Schema; import com.geedgenetworks.core.connector.source.SourceProvider; import com.geedgenetworks.core.factories.FactoryUtil; import com.geedgenetworks.core.factories.SourceTableFactory; import com.geedgenetworks.core.factories.TableFactory; import com.geedgenetworks.core.pojo.SourceConfig; -import com.geedgenetworks.core.types.StructType; -import com.geedgenetworks.core.types.Types; import com.google.common.collect.Maps; import com.typesafe.config.*; import lombok.extern.slf4j.Slf4j; -import org.apache.commons.collections.CollectionUtils; import org.apache.flink.configuration.Configuration; import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator; import java.net.URL; @@ -53,10 +51,6 @@ public class SourceExecutor extends AbstractExecutor<String, SourceConfig> { SourceConfig sourceConfig = new JSONObject((Map<String, Object>) value).toJavaObject(SourceConfig.class); sourceConfig.setName(key); - if(CollectionUtils.isNotEmpty(sourceConfig.getFields())){ - StructType schema = Types.parseSchemaFromJson(JSON.toJSONString(sourceConfig.getFields())); - sourceConfig.setDataType(schema); - } sourceConfigMap.put(key, sourceConfig); }); @@ -73,12 +67,18 @@ public class SourceExecutor extends AbstractExecutor<String, SourceConfig> { SourceTableFactory tableFactory = FactoryUtil.discoverTableFactory(SourceTableFactory.class, sourceConfig.getType()); Map<String, String> options = sourceConfig.getProperties(); Configuration configuration = Configuration.fromMap(options); - StructType dataType = sourceConfig.getDataType(); - TableFactory.Context context = new TableFactory.Context(dataType, dataType, options, configuration); + Schema schema = null; + if(sourceConfig.getSchema() != null && !sourceConfig.getSchema().isEmpty()){ + schema = SchemaConfigParse.parseSchemaConfig(sourceConfig.getSchema()); + } + TableFactory.Context context = new TableFactory.Context(schema, options, configuration); SourceProvider sourceProvider = tableFactory.getSourceProvider(context); - if(dataType != null){ - System.out.println(String.format("source(%s) dataType:\n%s", sourceConfig.getName(), dataType.toString())); - System.out.println(String.format("source(%s) schema:\n%s", sourceConfig.getName(), dataType.treeString())); + if(!sourceProvider.supportDynamicSchema() && schema != null && schema.isDynamic()){ + throw new UnsupportedOperationException(String.format("source(%s) not support DynamicSchema", sourceConfig.getName())); + } + if(schema != null){ + System.out.println(String.format("source(%s) dataType:\n%s", sourceConfig.getName(), schema.getDataType().toString())); + System.out.println(String.format("source(%s) schema:\n%s", sourceConfig.getName(), schema.getDataType().treeString())); } sourceSingleOutputStreamOperator = sourceProvider.produceDataStream(jobRuntimeEnvironment.getStreamExecutionEnvironment()); diff --git a/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/utils/SchemaConfigParse.java b/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/utils/SchemaConfigParse.java new file mode 100644 index 0000000..c3076b4 --- /dev/null +++ b/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/utils/SchemaConfigParse.java @@ -0,0 +1,73 @@ +package com.geedgenetworks.bootstrap.utils;
+
+import com.alibaba.fastjson2.JSON;
+import com.geedgenetworks.common.exception.CommonErrorCode;
+import com.geedgenetworks.common.exception.ConfigValidationException;
+import com.geedgenetworks.core.connector.schema.Schema;
+import com.geedgenetworks.core.connector.schema.SchemaParser;
+import com.geedgenetworks.core.types.StructType;
+import com.geedgenetworks.core.types.Types;
+import org.apache.commons.io.FileUtils;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+import java.util.List;
+import java.util.Map;
+
+public class SchemaConfigParse {
+ static final String KEY_BUILTIN = "fields";
+ static final String KEY_LOCAL_FILE = "local_file";
+ static final String KEY_HTTP = "url";
+
+ public static Schema parseSchemaConfig(Map<String, Object> schemaConfig){
+ if(schemaConfig == null && schemaConfig.isEmpty()){
+ return null;
+ }
+
+ int builtin = 0, localFile = 0, http = 0;
+ if(schemaConfig.containsKey(KEY_BUILTIN)){
+ builtin = 1;
+ }
+ if(schemaConfig.containsKey(KEY_LOCAL_FILE)){
+ localFile = 1;
+ }
+ if(schemaConfig.containsKey(KEY_HTTP)){
+ http = 1;
+ }
+ if(builtin + localFile + http > 1){
+ throw new ConfigValidationException(CommonErrorCode.CONFIG_VALIDATION_FAILED, "only support one type schema:" + schemaConfig);
+ }
+
+ if(builtin == 1){
+ Object fields = schemaConfig.get(KEY_BUILTIN);
+ if(fields instanceof List){
+ StructType dataType = Types.parseSchemaFromJson(JSON.toJSONString(fields));
+ return Schema.newSchema(dataType);
+ }else if(fields instanceof String){
+ StructType dataType = Types.parseStructType((String) fields);
+ return Schema.newSchema(dataType);
+ }else{
+ throw new ConfigValidationException(CommonErrorCode.CONFIG_VALIDATION_FAILED, "only support schema fields:" + fields);
+ }
+ }
+
+ if(localFile == 1){
+ String path = schemaConfig.get(KEY_LOCAL_FILE).toString();
+ try {
+ String content = FileUtils.readFileToString(new File(path), StandardCharsets.UTF_8);
+ StructType dataType = SchemaParser.PARSER_AVRO.parser(content);
+ return Schema.newSchema(dataType);
+ } catch (IOException e) {
+ throw new ConfigValidationException(CommonErrorCode.CONFIG_VALIDATION_FAILED, "schema path read error:" + path, e);
+ }
+ }
+
+ if(http == 1){
+ String url = schemaConfig.get(KEY_HTTP).toString();
+ return Schema.newHttpDynamicSchema(url);
+ }
+
+ return null;
+ }
+}
|
