summary | refs | log | tree | commit | diff
path: root/groot-bootstrap
diff options
context:
space:
mode:
author lifengchao <[email protected]> 2024-03-13 09:58:36 +0800
committer lifengchao <[email protected]> 2024-03-13 09:58:36 +0800
commit 4e2a9108198a32d9ea491765dd8ddd1d1c163877 (patch)
tree 03e9d132739d39370ae2d222a33e886cdbb65688 /groot-bootstrap
parent 039d9c0d0bf081865a3fc8a9cdc4359e0bac6331 (diff)
[feature][connector] GAL-508 Source/Sink Connector 支持引用外部的schema, ck sink支持http动态schema
Diffstat (limited to 'groot-bootstrap')
-rw-r--r-- groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/SinkExecutor.java | 19
-rw-r--r-- groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/SourceExecutor.java | 26
-rw-r--r-- groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/utils/SchemaConfigParse.java | 73
3 files changed, 101 insertions, 17 deletions
diff --git a/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/SinkExecutor.java b/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/SinkExecutor.java
index 7aee40f..a0bedc9 100644
--- a/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/SinkExecutor.java
+++ b/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/SinkExecutor.java
@@ -2,21 +2,20 @@ package com.geedgenetworks.bootstrap.execution;
import com.alibaba.fastjson.JSONObject;
import com.geedgenetworks.bootstrap.enums.ProcessorType;
-import com.geedgenetworks.bootstrap.exception.ConfigCheckException;
import com.geedgenetworks.bootstrap.exception.JobExecuteException;
+import com.geedgenetworks.bootstrap.utils.SchemaConfigParse;
import com.geedgenetworks.common.Constants;
import com.geedgenetworks.common.config.CheckConfigUtil;
import com.geedgenetworks.common.config.CheckResult;
import com.geedgenetworks.common.config.SinkConfigOptions;
-import com.geedgenetworks.common.config.SourceConfigOptions;
import com.geedgenetworks.common.exception.CommonErrorCode;
import com.geedgenetworks.common.exception.ConfigValidationException;
+import com.geedgenetworks.core.connector.schema.Schema;
import com.geedgenetworks.core.connector.sink.SinkProvider;
import com.geedgenetworks.core.factories.FactoryUtil;
import com.geedgenetworks.core.factories.SinkTableFactory;
import com.geedgenetworks.core.factories.TableFactory;
import com.geedgenetworks.core.pojo.SinkConfig;
-import com.geedgenetworks.utils.StringUtil;
import com.google.common.collect.Maps;
import com.typesafe.config.Config;
import lombok.extern.slf4j.Slf4j;
@@ -70,8 +69,20 @@ public class SinkExecutor extends AbstractExecutor<String, SinkConfig> {
SinkTableFactory sinkTableFactory = FactoryUtil.discoverTableFactory(SinkTableFactory.class, sinkConfig.getType());
Map<String, String> options = sinkConfig.getProperties();
Configuration configuration = Configuration.fromMap(options);
- TableFactory.Context context = new TableFactory.Context(null, null, options, configuration);
+ Schema schema = null;
+ if(sinkConfig.getSchema() != null && !sinkConfig.getSchema().isEmpty()){
+ schema = SchemaConfigParse.parseSchemaConfig(sinkConfig.getSchema());
+ }
+ TableFactory.Context context = new TableFactory.Context(schema, options, configuration);
SinkProvider sinkProvider = sinkTableFactory.getSinkProvider(context);
+ if(!sinkProvider.supportDynamicSchema() && schema != null && schema.isDynamic()){
+ throw new UnsupportedOperationException(String.format("sink(%s) not support DynamicSchema", sinkConfig.getName()));
+ }
+ if(schema != null){
+ System.out.println(String.format("sink(%s) dataType:\n%s", sinkConfig.getName(), schema.getDataType().toString()));
+ System.out.println(String.format("sink(%s) schema:\n%s", sinkConfig.getName(), schema.getDataType().treeString()));
+ }
+
DataStreamSink<?> dataStreamSink = sinkProvider.consumeDataStream(dataStream);
if (node.getParallelism() > 0) {
dataStreamSink.setParallelism(node.getParallelism());
diff --git a/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/SourceExecutor.java b/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/SourceExecutor.java
index eee78a2..f46e3fc 100644
--- a/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/SourceExecutor.java
+++ b/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/SourceExecutor.java
@@ -1,26 +1,24 @@
package com.geedgenetworks.bootstrap.execution;
-import com.alibaba.fastjson2.JSON;
import com.alibaba.fastjson2.JSONObject;
import com.geedgenetworks.bootstrap.enums.ProcessorType;
import com.geedgenetworks.bootstrap.exception.JobExecuteException;
+import com.geedgenetworks.bootstrap.utils.SchemaConfigParse;
import com.geedgenetworks.common.Constants;
import com.geedgenetworks.common.config.CheckConfigUtil;
import com.geedgenetworks.common.config.CheckResult;
import com.geedgenetworks.common.config.SourceConfigOptions;
import com.geedgenetworks.common.exception.CommonErrorCode;
import com.geedgenetworks.common.exception.ConfigValidationException;
+import com.geedgenetworks.core.connector.schema.Schema;
import com.geedgenetworks.core.connector.source.SourceProvider;
import com.geedgenetworks.core.factories.FactoryUtil;
import com.geedgenetworks.core.factories.SourceTableFactory;
import com.geedgenetworks.core.factories.TableFactory;
import com.geedgenetworks.core.pojo.SourceConfig;
-import com.geedgenetworks.core.types.StructType;
-import com.geedgenetworks.core.types.Types;
import com.google.common.collect.Maps;
import com.typesafe.config.*;
import lombok.extern.slf4j.Slf4j;
-import org.apache.commons.collections.CollectionUtils;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import java.net.URL;
@@ -53,10 +51,6 @@ public class SourceExecutor extends AbstractExecutor<String, SourceConfig> {
SourceConfig sourceConfig = new JSONObject((Map<String, Object>) value).toJavaObject(SourceConfig.class);
sourceConfig.setName(key);
- if(CollectionUtils.isNotEmpty(sourceConfig.getFields())){
- StructType schema = Types.parseSchemaFromJson(JSON.toJSONString(sourceConfig.getFields()));
- sourceConfig.setDataType(schema);
- }
sourceConfigMap.put(key, sourceConfig);
});
@@ -73,12 +67,18 @@ public class SourceExecutor extends AbstractExecutor<String, SourceConfig> {
SourceTableFactory tableFactory = FactoryUtil.discoverTableFactory(SourceTableFactory.class, sourceConfig.getType());
Map<String, String> options = sourceConfig.getProperties();
Configuration configuration = Configuration.fromMap(options);
- StructType dataType = sourceConfig.getDataType();
- TableFactory.Context context = new TableFactory.Context(dataType, dataType, options, configuration);
+ Schema schema = null;
+ if(sourceConfig.getSchema() != null && !sourceConfig.getSchema().isEmpty()){
+ schema = SchemaConfigParse.parseSchemaConfig(sourceConfig.getSchema());
+ }
+ TableFactory.Context context = new TableFactory.Context(schema, options, configuration);
SourceProvider sourceProvider = tableFactory.getSourceProvider(context);
- if(dataType != null){
- System.out.println(String.format("source(%s) dataType:\n%s", sourceConfig.getName(), dataType.toString()));
- System.out.println(String.format("source(%s) schema:\n%s", sourceConfig.getName(), dataType.treeString()));
+ if(!sourceProvider.supportDynamicSchema() && schema != null && schema.isDynamic()){
+ throw new UnsupportedOperationException(String.format("source(%s) not support DynamicSchema", sourceConfig.getName()));
+ }
+ if(schema != null){
+ System.out.println(String.format("source(%s) dataType:\n%s", sourceConfig.getName(), schema.getDataType().toString()));
+ System.out.println(String.format("source(%s) schema:\n%s", sourceConfig.getName(), schema.getDataType().treeString()));
}
sourceSingleOutputStreamOperator = sourceProvider.produceDataStream(jobRuntimeEnvironment.getStreamExecutionEnvironment());
diff --git a/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/utils/SchemaConfigParse.java b/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/utils/SchemaConfigParse.java
new file mode 100644
index 0000000..c3076b4
--- /dev/null
+++ b/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/utils/SchemaConfigParse.java
@@ -0,0 +1,73 @@
+package com.geedgenetworks.bootstrap.utils;
+
+import com.alibaba.fastjson2.JSON;
+import com.geedgenetworks.common.exception.CommonErrorCode;
+import com.geedgenetworks.common.exception.ConfigValidationException;
+import com.geedgenetworks.core.connector.schema.Schema;
+import com.geedgenetworks.core.connector.schema.SchemaParser;
+import com.geedgenetworks.core.types.StructType;
+import com.geedgenetworks.core.types.Types;
+import org.apache.commons.io.FileUtils;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+import java.util.List;
+import java.util.Map;
+
+public class SchemaConfigParse {
+    static final String KEY_BUILTIN = "fields";
+    static final String KEY_LOCAL_FILE = "local_file";
+    static final String KEY_HTTP = "url";
+
+    /**
+     * Builds a {@link Schema} from a connector's {@code schema} config section.
+     * At most one of the keys {@code fields}, {@code local_file} and {@code url} may be present.
+     *
+     * @param schemaConfig the schema section of a source/sink config; may be null or empty
+     * @return the parsed schema, or {@code null} when the config is null/empty or has none of the keys
+     * @throws ConfigValidationException if several schema keys are given, {@code fields} is neither
+     *         a List nor a String, or the local file cannot be read
+     */
+    public static Schema parseSchemaConfig(Map<String, Object> schemaConfig){
+        // "||" rather than "&&": the original condition dereferenced a null map and never matched.
+        if(schemaConfig == null || schemaConfig.isEmpty()){
+            return null;
+        }
+
+        boolean builtin = schemaConfig.containsKey(KEY_BUILTIN);
+        boolean localFile = schemaConfig.containsKey(KEY_LOCAL_FILE);
+        boolean http = schemaConfig.containsKey(KEY_HTTP);
+        if((builtin ? 1 : 0) + (localFile ? 1 : 0) + (http ? 1 : 0) > 1){
+            throw new ConfigValidationException(CommonErrorCode.CONFIG_VALIDATION_FAILED, "only support one type schema:" + schemaConfig);
+        }
+
+        if(builtin){
+            Object fields = schemaConfig.get(KEY_BUILTIN);
+            if(fields instanceof List){
+                return Schema.newSchema(Types.parseSchemaFromJson(JSON.toJSONString(fields)));
+            }else if(fields instanceof String){
+                return Schema.newSchema(Types.parseStructType((String) fields));
+            }else{
+                throw new ConfigValidationException(CommonErrorCode.CONFIG_VALIDATION_FAILED, "only support schema fields:" + fields);
+            }
+        }
+
+        if(localFile){
+            String path = schemaConfig.get(KEY_LOCAL_FILE).toString();
+            try {
+                String content = FileUtils.readFileToString(new File(path), StandardCharsets.UTF_8);
+                return Schema.newSchema(SchemaParser.PARSER_AVRO.parser(content));
+            } catch (IOException e) {
+                throw new ConfigValidationException(CommonErrorCode.CONFIG_VALIDATION_FAILED, "schema path read error:" + path, e);
+            }
+        }
+
+        if(http){
+            String url = schemaConfig.get(KEY_HTTP).toString();
+            return Schema.newHttpDynamicSchema(url);
+        }
+
+        return null;
+    }
+}