summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/SinkExecutor.java19
-rw-r--r--groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/SourceExecutor.java26
-rw-r--r--groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/utils/SchemaConfigParse.java73
-rw-r--r--groot-common/pom.xml5
-rw-r--r--groot-connectors/connector-clickhouse/src/main/java/com/geedgenetworks/connectors/clickhouse/ClickHouseTableFactory.java11
-rw-r--r--groot-connectors/connector-clickhouse/src/main/java/com/geedgenetworks/connectors/clickhouse/sink/AbstractBatchIntervalClickHouseSink.java17
-rw-r--r--groot-connectors/connector-clickhouse/src/main/java/com/geedgenetworks/connectors/clickhouse/sink/EventBatchIntervalClickHouseSink.java71
-rw-r--r--groot-connectors/connector-kafka/src/main/java/com/geedgenetworks/connectors/kafka/KafkaTableFactory.java2
-rw-r--r--groot-core/src/main/java/com/geedgenetworks/core/connector/print/PrintTableFactory.java2
-rw-r--r--groot-core/src/main/java/com/geedgenetworks/core/connector/schema/DynamicSchema.java86
-rw-r--r--groot-core/src/main/java/com/geedgenetworks/core/connector/schema/HttpDynamicSchema.java43
-rw-r--r--groot-core/src/main/java/com/geedgenetworks/core/connector/schema/Schema.java35
-rw-r--r--groot-core/src/main/java/com/geedgenetworks/core/connector/schema/SchemaChangeAware.java7
-rw-r--r--groot-core/src/main/java/com/geedgenetworks/core/connector/schema/SchemaParser.java112
-rw-r--r--groot-core/src/main/java/com/geedgenetworks/core/connector/schema/StaticSchema.java21
-rw-r--r--groot-core/src/main/java/com/geedgenetworks/core/connector/schema/utils/DynamicSchemaManager.java148
-rw-r--r--groot-core/src/main/java/com/geedgenetworks/core/connector/sink/SinkProvider.java4
-rw-r--r--groot-core/src/main/java/com/geedgenetworks/core/connector/source/SourceProvider.java4
-rw-r--r--groot-core/src/main/java/com/geedgenetworks/core/factories/TableFactory.java95
-rw-r--r--groot-core/src/main/java/com/geedgenetworks/core/pojo/SinkConfig.java77
-rw-r--r--groot-core/src/main/java/com/geedgenetworks/core/pojo/SourceConfig.java19
-rw-r--r--groot-core/src/main/java/com/geedgenetworks/core/types/Types.java277
-rw-r--r--groot-core/src/main/java/com/geedgenetworks/core/utils/HttpClientPoolUtil.java2
-rw-r--r--groot-core/src/test/java/com/geedgenetworks/core/connector/schema/SchemaParserTest.java22
-rw-r--r--groot-core/src/test/java/com/geedgenetworks/core/connector/schema/utils/DynamicSchemaManagerTest.java218
-rw-r--r--groot-formats/format-json/src/main/java/com/geedgenetworks/formats/json/JsonEventSerializationSchema.java9
-rw-r--r--groot-formats/format-json/src/main/java/com/geedgenetworks/formats/json/JsonSerializer.java176
-rw-r--r--groot-formats/format-json/src/test/java/com/geedgenetworks/formats/json/JsonEventSerializationSchemaTest.java19
-rw-r--r--groot-formats/format-json/src/test/java/com/geedgenetworks/formats/json/JsonSerializerTest.java79
-rw-r--r--groot-formats/format-protobuf/src/test/java/com/geedgenetworks/formats/protobuf/ProtobufFormatFactoryTest.java4
-rw-r--r--pom.xml6
31 files changed, 1433 insertions, 256 deletions
diff --git a/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/SinkExecutor.java b/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/SinkExecutor.java
index 7aee40f..a0bedc9 100644
--- a/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/SinkExecutor.java
+++ b/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/SinkExecutor.java
@@ -2,21 +2,20 @@ package com.geedgenetworks.bootstrap.execution;
import com.alibaba.fastjson.JSONObject;
import com.geedgenetworks.bootstrap.enums.ProcessorType;
-import com.geedgenetworks.bootstrap.exception.ConfigCheckException;
import com.geedgenetworks.bootstrap.exception.JobExecuteException;
+import com.geedgenetworks.bootstrap.utils.SchemaConfigParse;
import com.geedgenetworks.common.Constants;
import com.geedgenetworks.common.config.CheckConfigUtil;
import com.geedgenetworks.common.config.CheckResult;
import com.geedgenetworks.common.config.SinkConfigOptions;
-import com.geedgenetworks.common.config.SourceConfigOptions;
import com.geedgenetworks.common.exception.CommonErrorCode;
import com.geedgenetworks.common.exception.ConfigValidationException;
+import com.geedgenetworks.core.connector.schema.Schema;
import com.geedgenetworks.core.connector.sink.SinkProvider;
import com.geedgenetworks.core.factories.FactoryUtil;
import com.geedgenetworks.core.factories.SinkTableFactory;
import com.geedgenetworks.core.factories.TableFactory;
import com.geedgenetworks.core.pojo.SinkConfig;
-import com.geedgenetworks.utils.StringUtil;
import com.google.common.collect.Maps;
import com.typesafe.config.Config;
import lombok.extern.slf4j.Slf4j;
@@ -70,8 +69,20 @@ public class SinkExecutor extends AbstractExecutor<String, SinkConfig> {
SinkTableFactory sinkTableFactory = FactoryUtil.discoverTableFactory(SinkTableFactory.class, sinkConfig.getType());
Map<String, String> options = sinkConfig.getProperties();
Configuration configuration = Configuration.fromMap(options);
- TableFactory.Context context = new TableFactory.Context(null, null, options, configuration);
+ Schema schema = null;
+ if(sinkConfig.getSchema() != null && !sinkConfig.getSchema().isEmpty()){
+ schema = SchemaConfigParse.parseSchemaConfig(sinkConfig.getSchema());
+ }
+ TableFactory.Context context = new TableFactory.Context(schema, options, configuration);
SinkProvider sinkProvider = sinkTableFactory.getSinkProvider(context);
+ if(!sinkProvider.supportDynamicSchema() && schema != null && schema.isDynamic()){
+ throw new UnsupportedOperationException(String.format("sink(%s) not support DynamicSchema", sinkConfig.getName()));
+ }
+ if(schema != null){
+ System.out.println(String.format("sink(%s) dataType:\n%s", sinkConfig.getName(), schema.getDataType().toString()));
+ System.out.println(String.format("sink(%s) schema:\n%s", sinkConfig.getName(), schema.getDataType().treeString()));
+ }
+
DataStreamSink<?> dataStreamSink = sinkProvider.consumeDataStream(dataStream);
if (node.getParallelism() > 0) {
dataStreamSink.setParallelism(node.getParallelism());
diff --git a/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/SourceExecutor.java b/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/SourceExecutor.java
index eee78a2..f46e3fc 100644
--- a/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/SourceExecutor.java
+++ b/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/SourceExecutor.java
@@ -1,26 +1,24 @@
package com.geedgenetworks.bootstrap.execution;
-import com.alibaba.fastjson2.JSON;
import com.alibaba.fastjson2.JSONObject;
import com.geedgenetworks.bootstrap.enums.ProcessorType;
import com.geedgenetworks.bootstrap.exception.JobExecuteException;
+import com.geedgenetworks.bootstrap.utils.SchemaConfigParse;
import com.geedgenetworks.common.Constants;
import com.geedgenetworks.common.config.CheckConfigUtil;
import com.geedgenetworks.common.config.CheckResult;
import com.geedgenetworks.common.config.SourceConfigOptions;
import com.geedgenetworks.common.exception.CommonErrorCode;
import com.geedgenetworks.common.exception.ConfigValidationException;
+import com.geedgenetworks.core.connector.schema.Schema;
import com.geedgenetworks.core.connector.source.SourceProvider;
import com.geedgenetworks.core.factories.FactoryUtil;
import com.geedgenetworks.core.factories.SourceTableFactory;
import com.geedgenetworks.core.factories.TableFactory;
import com.geedgenetworks.core.pojo.SourceConfig;
-import com.geedgenetworks.core.types.StructType;
-import com.geedgenetworks.core.types.Types;
import com.google.common.collect.Maps;
import com.typesafe.config.*;
import lombok.extern.slf4j.Slf4j;
-import org.apache.commons.collections.CollectionUtils;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import java.net.URL;
@@ -53,10 +51,6 @@ public class SourceExecutor extends AbstractExecutor<String, SourceConfig> {
SourceConfig sourceConfig = new JSONObject((Map<String, Object>) value).toJavaObject(SourceConfig.class);
sourceConfig.setName(key);
- if(CollectionUtils.isNotEmpty(sourceConfig.getFields())){
- StructType schema = Types.parseSchemaFromJson(JSON.toJSONString(sourceConfig.getFields()));
- sourceConfig.setDataType(schema);
- }
sourceConfigMap.put(key, sourceConfig);
});
@@ -73,12 +67,18 @@ public class SourceExecutor extends AbstractExecutor<String, SourceConfig> {
SourceTableFactory tableFactory = FactoryUtil.discoverTableFactory(SourceTableFactory.class, sourceConfig.getType());
Map<String, String> options = sourceConfig.getProperties();
Configuration configuration = Configuration.fromMap(options);
- StructType dataType = sourceConfig.getDataType();
- TableFactory.Context context = new TableFactory.Context(dataType, dataType, options, configuration);
+ Schema schema = null;
+ if(sourceConfig.getSchema() != null && !sourceConfig.getSchema().isEmpty()){
+ schema = SchemaConfigParse.parseSchemaConfig(sourceConfig.getSchema());
+ }
+ TableFactory.Context context = new TableFactory.Context(schema, options, configuration);
SourceProvider sourceProvider = tableFactory.getSourceProvider(context);
- if(dataType != null){
- System.out.println(String.format("source(%s) dataType:\n%s", sourceConfig.getName(), dataType.toString()));
- System.out.println(String.format("source(%s) schema:\n%s", sourceConfig.getName(), dataType.treeString()));
+ if(!sourceProvider.supportDynamicSchema() && schema != null && schema.isDynamic()){
+ throw new UnsupportedOperationException(String.format("source(%s) not support DynamicSchema", sourceConfig.getName()));
+ }
+ if(schema != null){
+ System.out.println(String.format("source(%s) dataType:\n%s", sourceConfig.getName(), schema.getDataType().toString()));
+ System.out.println(String.format("source(%s) schema:\n%s", sourceConfig.getName(), schema.getDataType().treeString()));
}
sourceSingleOutputStreamOperator = sourceProvider.produceDataStream(jobRuntimeEnvironment.getStreamExecutionEnvironment());
diff --git a/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/utils/SchemaConfigParse.java b/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/utils/SchemaConfigParse.java
new file mode 100644
index 0000000..c3076b4
--- /dev/null
+++ b/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/utils/SchemaConfigParse.java
@@ -0,0 +1,73 @@
+package com.geedgenetworks.bootstrap.utils;
+
+import com.alibaba.fastjson2.JSON;
+import com.geedgenetworks.common.exception.CommonErrorCode;
+import com.geedgenetworks.common.exception.ConfigValidationException;
+import com.geedgenetworks.core.connector.schema.Schema;
+import com.geedgenetworks.core.connector.schema.SchemaParser;
+import com.geedgenetworks.core.types.StructType;
+import com.geedgenetworks.core.types.Types;
+import org.apache.commons.io.FileUtils;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+import java.util.List;
+import java.util.Map;
+
+public class SchemaConfigParse {
+ static final String KEY_BUILTIN = "fields";
+ static final String KEY_LOCAL_FILE = "local_file";
+ static final String KEY_HTTP = "url";
+
+ public static Schema parseSchemaConfig(Map<String, Object> schemaConfig){
+ if(schemaConfig == null && schemaConfig.isEmpty()){
+ return null;
+ }
+
+ int builtin = 0, localFile = 0, http = 0;
+ if(schemaConfig.containsKey(KEY_BUILTIN)){
+ builtin = 1;
+ }
+ if(schemaConfig.containsKey(KEY_LOCAL_FILE)){
+ localFile = 1;
+ }
+ if(schemaConfig.containsKey(KEY_HTTP)){
+ http = 1;
+ }
+ if(builtin + localFile + http > 1){
+ throw new ConfigValidationException(CommonErrorCode.CONFIG_VALIDATION_FAILED, "only support one type schema:" + schemaConfig);
+ }
+
+ if(builtin == 1){
+ Object fields = schemaConfig.get(KEY_BUILTIN);
+ if(fields instanceof List){
+ StructType dataType = Types.parseSchemaFromJson(JSON.toJSONString(fields));
+ return Schema.newSchema(dataType);
+ }else if(fields instanceof String){
+ StructType dataType = Types.parseStructType((String) fields);
+ return Schema.newSchema(dataType);
+ }else{
+ throw new ConfigValidationException(CommonErrorCode.CONFIG_VALIDATION_FAILED, "only support schema fields:" + fields);
+ }
+ }
+
+ if(localFile == 1){
+ String path = schemaConfig.get(KEY_LOCAL_FILE).toString();
+ try {
+ String content = FileUtils.readFileToString(new File(path), StandardCharsets.UTF_8);
+ StructType dataType = SchemaParser.PARSER_AVRO.parser(content);
+ return Schema.newSchema(dataType);
+ } catch (IOException e) {
+ throw new ConfigValidationException(CommonErrorCode.CONFIG_VALIDATION_FAILED, "schema path read error:" + path, e);
+ }
+ }
+
+ if(http == 1){
+ String url = schemaConfig.get(KEY_HTTP).toString();
+ return Schema.newHttpDynamicSchema(url);
+ }
+
+ return null;
+ }
+}
diff --git a/groot-common/pom.xml b/groot-common/pom.xml
index 8ff4910..d6463cb 100644
--- a/groot-common/pom.xml
+++ b/groot-common/pom.xml
@@ -42,6 +42,11 @@
</dependency>
<dependency>
+ <groupId>org.apache.avro</groupId>
+ <artifactId>avro</artifactId>
+ </dependency>
+
+ <dependency>
<groupId>com.geedgenetworks</groupId>
<artifactId>galaxy</artifactId>
<exclusions>
diff --git a/groot-connectors/connector-clickhouse/src/main/java/com/geedgenetworks/connectors/clickhouse/ClickHouseTableFactory.java b/groot-connectors/connector-clickhouse/src/main/java/com/geedgenetworks/connectors/clickhouse/ClickHouseTableFactory.java
index 9462808..96a73b9 100644
--- a/groot-connectors/connector-clickhouse/src/main/java/com/geedgenetworks/connectors/clickhouse/ClickHouseTableFactory.java
+++ b/groot-connectors/connector-clickhouse/src/main/java/com/geedgenetworks/connectors/clickhouse/ClickHouseTableFactory.java
@@ -1,6 +1,7 @@
package com.geedgenetworks.connectors.clickhouse;
import com.geedgenetworks.connectors.clickhouse.sink.EventBatchIntervalClickHouseSink;
+import com.geedgenetworks.core.connector.schema.Schema;
import com.geedgenetworks.core.connector.sink.SinkProvider;
import com.geedgenetworks.core.factories.FactoryUtil;
import com.geedgenetworks.core.factories.FactoryUtil.TableFactoryHelper;
@@ -32,8 +33,7 @@ public class ClickHouseTableFactory implements SinkTableFactory {
helper.validateExcept(CONNECTION_INFO_PREFIX); // 校验参数
- // sink暂时没有dataType
- //StructType dataType = context.getSchema();
+ Schema schema = context.getSchema();
ReadableConfig config = context.getConfiguration();
String host = config.get(HOST);
@@ -42,13 +42,18 @@ public class ClickHouseTableFactory implements SinkTableFactory {
long batchIntervalMs = config.get(BATCH_INTERVAL).toMillis();
Properties connInfo = getClickHouseConnInfo(context.getOptions());
- final SinkFunction<Event> sinkFunction = new EventBatchIntervalClickHouseSink(batchSize, batchIntervalMs, host, table, connInfo);
+ final SinkFunction<Event> sinkFunction = new EventBatchIntervalClickHouseSink(schema, batchSize, batchIntervalMs, host, table, connInfo);
return new SinkProvider() {
@Override
public DataStreamSink<?> consumeDataStream(DataStream<Event> dataStream) {
return dataStream.addSink(sinkFunction);
}
+
+ @Override
+ public boolean supportDynamicSchema() {
+ return true;
+ }
};
}
diff --git a/groot-connectors/connector-clickhouse/src/main/java/com/geedgenetworks/connectors/clickhouse/sink/AbstractBatchIntervalClickHouseSink.java b/groot-connectors/connector-clickhouse/src/main/java/com/geedgenetworks/connectors/clickhouse/sink/AbstractBatchIntervalClickHouseSink.java
index ce58cda..18ce5b4 100644
--- a/groot-connectors/connector-clickhouse/src/main/java/com/geedgenetworks/connectors/clickhouse/sink/AbstractBatchIntervalClickHouseSink.java
+++ b/groot-connectors/connector-clickhouse/src/main/java/com/geedgenetworks/connectors/clickhouse/sink/AbstractBatchIntervalClickHouseSink.java
@@ -196,6 +196,7 @@ public abstract class AbstractBatchIntervalClickHouseSink<T> extends RichSinkFun
}
void onInit(Configuration parameters) throws Exception {}
+ void onClose() throws Exception {}
abstract boolean addBatch(Block batch, T data) throws Exception;
@@ -327,7 +328,11 @@ public abstract class AbstractBatchIntervalClickHouseSink<T> extends RichSinkFun
closed = true;
if (outThread != null) {
- outThread.join();
+ try {
+ outThread.join();
+ } catch (Exception e) {
+
+ }
}
// init中可能抛出异常
@@ -340,13 +345,19 @@ public abstract class AbstractBatchIntervalClickHouseSink<T> extends RichSinkFun
}
// 缓存的Block不用归还释放列IColumn申请的ColumnWriterBuffer,会被gc。
// ConcurrentLinkedDeque<ColumnWriterBuffer> stack 缓存池没有记录列表总大小,使用大小等信息,没限制列表大小。不归还ColumnWriterBuffer没问题。
- } catch (Exception e) {
- flushException = e;
+ } catch (Throwable t) {
+ if(t instanceof Exception){
+ flushException = (Exception) t;
+ }else{
+ flushException = new Exception(t);
+ }
} finally {
lock.unlock();
}
}
+ onClose();
+
LOG.warn("ck_sink_close_end");
}
diff --git a/groot-connectors/connector-clickhouse/src/main/java/com/geedgenetworks/connectors/clickhouse/sink/EventBatchIntervalClickHouseSink.java b/groot-connectors/connector-clickhouse/src/main/java/com/geedgenetworks/connectors/clickhouse/sink/EventBatchIntervalClickHouseSink.java
index 497f14c..c38324a 100644
--- a/groot-connectors/connector-clickhouse/src/main/java/com/geedgenetworks/connectors/clickhouse/sink/EventBatchIntervalClickHouseSink.java
+++ b/groot-connectors/connector-clickhouse/src/main/java/com/geedgenetworks/connectors/clickhouse/sink/EventBatchIntervalClickHouseSink.java
@@ -3,24 +3,51 @@ package com.geedgenetworks.connectors.clickhouse.sink;
import com.alibaba.fastjson2.JSON;
import com.geedgenetworks.common.Event;
+import com.geedgenetworks.core.connector.schema.Schema;
+import com.geedgenetworks.core.connector.schema.SchemaChangeAware;
+import com.geedgenetworks.core.types.StructType;
import com.github.housepower.data.Block;
+import org.apache.flink.configuration.Configuration;
-import java.util.Map;
-import java.util.Properties;
+import java.util.*;
+import java.util.stream.Collectors;
-public class EventBatchIntervalClickHouseSink extends AbstractBatchIntervalClickHouseSink<Event> {
+public class EventBatchIntervalClickHouseSink extends AbstractBatchIntervalClickHouseSink<Event> implements SchemaChangeAware {
+ private Schema schema;
+ private Set<String> disabledFields;
+ private String simpleName;
- public EventBatchIntervalClickHouseSink(
- int batchSize, long batchIntervalMs, String host, String table, Properties connInfo) {
+ public EventBatchIntervalClickHouseSink(Schema schema, int batchSize, long batchIntervalMs, String host, String table, Properties connInfo) {
super(batchSize, batchIntervalMs, host, table, connInfo);
+ this.schema = schema;
+ }
+
+ @Override
+ void onInit(Configuration parameters) throws Exception {
+ super.onInit(parameters);
+ simpleName = this.getClass().getSimpleName() + "_" + getRuntimeContext().getIndexOfThisSubtask();
+ if(schema != null){
+ updateDisabledFields(schema.getDataType());
+ if(schema.isDynamic()){
+ Schema.registerSchemaChangeAware(schema, this);
+ }
+ }
}
@Override
boolean addBatch(Block batch, Event event) throws Exception {
Map<String, Object> map = event.getExtractedFields();
+ String columnName;
Object value;
for (int i = 0; i < columnNames.length; i++) {
- value = map.get(columnNames[i]);
+ columnName = columnNames[i];
+ if(disabledFields != null && disabledFields.contains(columnName)){
+ value = columnDefaultValues[i];
+ batch.setObject(i, value); // 默认值不用转换
+ continue;
+ }
+
+ value = map.get(columnName);
if (value == null) {
value = columnDefaultValues[i];
@@ -39,4 +66,36 @@ public class EventBatchIntervalClickHouseSink extends AbstractBatchIntervalClick
return true;
}
+
+ @Override
+ public void schemaChange(StructType dataType) {
+ if(schema != null && schema.isDynamic()){
+ updateDisabledFields(dataType);
+ }
+ }
+
+ @Override
+ void onClose() throws Exception {
+ super.onClose();
+ if(schema != null && schema.isDynamic()){
+ Schema.unregisterSchemaChangeAware(schema, this);
+ }
+ }
+
+ private void updateDisabledFields(StructType dataType){
+ Set<String> disabledFields = new HashSet<>();
+ Set<String> names = Arrays.stream(dataType.fields).map(x -> x.name).collect(Collectors.toSet());
+ for (String columnName : this.columnNames) {
+ if(!names.contains(columnName)){
+ disabledFields.add(columnName);
+ }
+ }
+ LOG.info("disabledFields: {}", disabledFields);
+ this.disabledFields = disabledFields.isEmpty()?null: disabledFields;
+ }
+
+ @Override
+ public String toString() {
+ return simpleName;
+ }
}
diff --git a/groot-connectors/connector-kafka/src/main/java/com/geedgenetworks/connectors/kafka/KafkaTableFactory.java b/groot-connectors/connector-kafka/src/main/java/com/geedgenetworks/connectors/kafka/KafkaTableFactory.java
index b26c161..8829076 100644
--- a/groot-connectors/connector-kafka/src/main/java/com/geedgenetworks/connectors/kafka/KafkaTableFactory.java
+++ b/groot-connectors/connector-kafka/src/main/java/com/geedgenetworks/connectors/kafka/KafkaTableFactory.java
@@ -51,7 +51,7 @@ public class KafkaTableFactory implements SourceTableFactory, SinkTableFactory {
helper.validateExcept(PROPERTIES_PREFIX); // 校验参数,排除properties.参数
- StructType dataType = context.getSchema();
+ StructType dataType = context.getDataType();
ReadableConfig config = context.getConfiguration();
String topic = config.get(TOPIC).get(0);
diff --git a/groot-core/src/main/java/com/geedgenetworks/core/connector/print/PrintTableFactory.java b/groot-core/src/main/java/com/geedgenetworks/core/connector/print/PrintTableFactory.java
index 5882c9d..3bc4910 100644
--- a/groot-core/src/main/java/com/geedgenetworks/core/connector/print/PrintTableFactory.java
+++ b/groot-core/src/main/java/com/geedgenetworks/core/connector/print/PrintTableFactory.java
@@ -37,7 +37,7 @@ public class PrintTableFactory implements SinkTableFactory {
helper.validate(); // 校验参数
- StructType dataType = context.getSchema();
+ StructType dataType = context.getDataType();
ReadableConfig config = context.getConfiguration();
PrintMode printMode = Optional.ofNullable(PrintMode.fromName(config.get(MODE))).orElse(STDOUT);
diff --git a/groot-core/src/main/java/com/geedgenetworks/core/connector/schema/DynamicSchema.java b/groot-core/src/main/java/com/geedgenetworks/core/connector/schema/DynamicSchema.java
new file mode 100644
index 0000000..1b4f755
--- /dev/null
+++ b/groot-core/src/main/java/com/geedgenetworks/core/connector/schema/DynamicSchema.java
@@ -0,0 +1,86 @@
+package com.geedgenetworks.core.connector.schema;
+
+import com.geedgenetworks.core.connector.schema.utils.DynamicSchemaManager;
+import com.geedgenetworks.core.types.StructType;
+import org.apache.commons.codec.digest.DigestUtils;
+import org.apache.commons.lang3.StringUtils;
+
+import static org.apache.flink.util.Preconditions.checkArgument;
+
+public abstract class DynamicSchema implements Schema {
+ protected SchemaParser.Parser parser;
+ protected StructType dataType;
+ private String contentMd5;
+ protected final long intervalMs;
+
+ public DynamicSchema(SchemaParser.Parser parser, long intervalMs) {
+ this.parser = parser;
+ this.intervalMs = intervalMs;
+ }
+
+ public abstract String getCacheKey();
+ protected abstract String getDataTypeContent();
+
+ @Override
+ public StructType getDataType() {
+ return dataType;
+ }
+
+ protected boolean parseDataType(String _content){
+ checkArgument(StringUtils.isNotBlank(_content), "DataType is null");
+ String _contentMd5 = computeMd5(_content);
+ if(_contentMd5.equals(contentMd5)){
+ return false;
+ }
+
+ StructType type;
+ if(dataType == null){
+ type = parser.parser(_content);
+ contentMd5 = _contentMd5;
+ dataType = type;
+ return true;
+ }
+
+ type = parser.parser(_content);
+ if(dataType.equals(type)){
+ return false;
+ }else{
+ contentMd5 = _contentMd5;
+ dataType = type;
+ return true;
+ }
+ }
+
+ // 更新并返回更新后的dataType, 如果没有更新返回null
+ public StructType updateDataType(){
+ String content = getDataTypeContent();
+ if(StringUtils.isBlank(content)){
+ return null;
+ }
+ if(parseDataType(content)){
+ return dataType;
+ }
+ return null;
+ }
+
+ final public void registerSchemaChangeAware(SchemaChangeAware aware){
+ DynamicSchemaManager.registerSchemaChangeAware(this, aware);
+ }
+
+ final public void unregisterSchemaChangeAware(SchemaChangeAware aware){
+ DynamicSchemaManager.unregisterSchemaChangeAware(this, aware);
+ }
+
+ String computeMd5(String text){
+ return DigestUtils.md5Hex(text);
+ }
+
+ public long getIntervalMs() {
+ return intervalMs;
+ }
+
+ @Override
+ final public boolean isDynamic() {
+ return true;
+ }
+}
diff --git a/groot-core/src/main/java/com/geedgenetworks/core/connector/schema/HttpDynamicSchema.java b/groot-core/src/main/java/com/geedgenetworks/core/connector/schema/HttpDynamicSchema.java
new file mode 100644
index 0000000..5eb6b87
--- /dev/null
+++ b/groot-core/src/main/java/com/geedgenetworks/core/connector/schema/HttpDynamicSchema.java
@@ -0,0 +1,43 @@
+package com.geedgenetworks.core.connector.schema;
+
+import com.geedgenetworks.core.utils.HttpClientPoolUtil;
+import com.geedgenetworks.shaded.org.apache.http.Header;
+import com.geedgenetworks.shaded.org.apache.http.message.BasicHeader;
+import org.apache.flink.util.TimeUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.net.URI;
+import java.time.Duration;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+public class HttpDynamicSchema extends DynamicSchema{
+ static final Logger LOG = LoggerFactory.getLogger(HttpDynamicSchema.class);
+ private static final Header header = new BasicHeader("Content-Type", "application/x-www-form-urlencoded");
+ private final String url;
+ private final String key;
+ public HttpDynamicSchema(String url, SchemaParser.Parser parser, long intervalMs) {
+ super(parser, intervalMs);
+ checkNotNull(url);
+ this.url = url;
+ this.key = String.format("%s_%s", url, TimeUtils.formatWithHighestUnit(Duration.ofMillis(intervalMs)));
+ parseDataType(getDataTypeContent());
+ }
+
+ @Override
+ public String getCacheKey() {
+ return key;
+ }
+
+ @Override
+ protected String getDataTypeContent() {
+ try {
+ String response = HttpClientPoolUtil.getInstance().httpGet(URI.create(url), header);
+ return response;
+ } catch (Exception e) {
+ LOG.error("request " + url + " error", e);
+ return null;
+ }
+ }
+}
diff --git a/groot-core/src/main/java/com/geedgenetworks/core/connector/schema/Schema.java b/groot-core/src/main/java/com/geedgenetworks/core/connector/schema/Schema.java
new file mode 100644
index 0000000..6bd6764
--- /dev/null
+++ b/groot-core/src/main/java/com/geedgenetworks/core/connector/schema/Schema.java
@@ -0,0 +1,35 @@
+package com.geedgenetworks.core.connector.schema;
+
+import com.geedgenetworks.core.connector.schema.utils.DynamicSchemaManager;
+import com.geedgenetworks.core.types.StructType;
+import org.apache.flink.util.Preconditions;
+
+import java.io.Serializable;
+
+import static org.apache.flink.util.Preconditions.checkArgument;
+
+public interface Schema extends Serializable {
+ StructType getDataType();
+
+ boolean isDynamic();
+
+ public static Schema newSchema(StructType dataType){
+ return new StaticSchema(dataType);
+ }
+
+ public static Schema newHttpDynamicSchema(String url){
+ HttpDynamicSchema dynamicSchema = new HttpDynamicSchema(url, SchemaParser.PARSER_AVRO, 1000 * 60 * 30);
+ checkArgument(dynamicSchema.getDataType() != null);
+ return dynamicSchema;
+ }
+
+ public static void registerSchemaChangeAware(Schema schema, SchemaChangeAware aware){
+ Preconditions.checkArgument(schema.isDynamic());
+ DynamicSchemaManager.registerSchemaChangeAware((DynamicSchema)schema, aware);
+ }
+
+ public static void unregisterSchemaChangeAware(Schema schema, SchemaChangeAware aware){
+ Preconditions.checkArgument(schema.isDynamic());
+ DynamicSchemaManager.unregisterSchemaChangeAware((DynamicSchema)schema, aware);
+ }
+}
diff --git a/groot-core/src/main/java/com/geedgenetworks/core/connector/schema/SchemaChangeAware.java b/groot-core/src/main/java/com/geedgenetworks/core/connector/schema/SchemaChangeAware.java
new file mode 100644
index 0000000..a70df24
--- /dev/null
+++ b/groot-core/src/main/java/com/geedgenetworks/core/connector/schema/SchemaChangeAware.java
@@ -0,0 +1,7 @@
+package com.geedgenetworks.core.connector.schema;
+
+import com.geedgenetworks.core.types.StructType;
+
+public interface SchemaChangeAware {
+ void schemaChange(StructType dataType);
+}
diff --git a/groot-core/src/main/java/com/geedgenetworks/core/connector/schema/SchemaParser.java b/groot-core/src/main/java/com/geedgenetworks/core/connector/schema/SchemaParser.java
new file mode 100644
index 0000000..a2fcc21
--- /dev/null
+++ b/groot-core/src/main/java/com/geedgenetworks/core/connector/schema/SchemaParser.java
@@ -0,0 +1,112 @@
+package com.geedgenetworks.core.connector.schema;
+
+import com.alibaba.fastjson2.JSON;
+import com.alibaba.fastjson2.JSONArray;
+import com.alibaba.fastjson2.JSONObject;
+import com.geedgenetworks.core.types.ArrayType;
+import com.geedgenetworks.core.types.DataType;
+import com.geedgenetworks.core.types.StructType;
+import com.geedgenetworks.core.types.StructType.StructField;
+import com.geedgenetworks.core.types.Types;
+
+import java.io.Serializable;
+import java.util.*;
+import java.util.stream.Collectors;
+
+public class SchemaParser {
+ public static final String TYPE_BUILTIN = "builtin";
+ public static final String TYPE_AVRO = "avro";
+
+ public static final Parser PARSER_BUILTIN = new BuiltinParser();
+ public static final Parser PARSER_AVRO = new AvroParser();
+
+
+ public static Parser getParser(String type){
+ if(TYPE_BUILTIN.equals(type)){
+ return PARSER_BUILTIN;
+ }else if(TYPE_AVRO.equals(type)){
+ return PARSER_AVRO;
+ }
+
+ throw new UnsupportedOperationException("not supported parser:" + type);
+ }
+
+ public static class BuiltinParser implements Parser{
+ @Override
+ public StructType parser(String content){
+ if(JSON.isValidArray(content)){
+ return Types.parseSchemaFromJson(content);
+ }else{
+ return Types.parseStructType(content);
+ }
+ // throw new IllegalArgumentException("can not parse schema for:" + content);
+ }
+ }
+
+ public static class AvroParser implements Parser{
+ @Override
+ public StructType parser(String content) {
+ org.apache.avro.Schema schema = new org.apache.avro.Schema.Parser().parse(content);
+ Set<String> disabledFields = getDisabledFields(JSON.parseObject(content).getJSONArray("fields"));
+ return convert2StructType(schema, disabledFields);
+ }
+
+ private StructType convert2StructType(org.apache.avro.Schema schema, Set<String> disabledFields){
+ List<org.apache.avro.Schema.Field> fields = schema.getFields();
+ List<StructField> _fields = new ArrayList<>(fields.size());
+ for (int i = 0; i < fields.size(); i++) {
+ String fieldName = fields.get(i).name();
+ if(disabledFields.contains(fieldName)){
+ continue;
+ }
+ org.apache.avro.Schema fieldSchema = fields.get(i).schema();
+ _fields.add(new StructField(fieldName, convert(fieldSchema)));
+ }
+ return new StructType(_fields.toArray(new StructField[_fields.size()]));
+ }
+
+ private DataType convert(org.apache.avro.Schema schema){
+ switch (schema.getType()){
+ case INT:
+ return Types.INT;
+ case LONG:
+ return Types.BIGINT;
+ case FLOAT:
+ return Types.FLOAT;
+ case DOUBLE:
+ return Types.DOUBLE;
+ case BOOLEAN:
+ return Types.BOOLEAN;
+ case STRING:
+ return Types.STRING;
+ case BYTES:
+ return Types.BINARY;
+ case ARRAY:
+ return new ArrayType(convert(schema.getElementType()));
+ case RECORD:
+ return convert2StructType(schema, Collections.EMPTY_SET);
+ default:
+ throw new UnsupportedOperationException(schema.toString());
+ }
+ }
+
+ private Set<String> getDisabledFields(JSONArray fields){
+ Set<String> disabledFields = new HashSet<>();
+ JSONObject fieldObject;
+ JSONObject doc;
+ for (int i = 0; i < fields.size(); i++) {
+ fieldObject = fields.getJSONObject(i);
+ doc = fieldObject.getJSONObject("doc");
+ // 过滤禁用的字段
+ if(doc != null && "disabled".equals(doc.getString("visibility"))){
+ disabledFields.add(fieldObject.getString("name"));
+ }
+ }
+ return disabledFields;
+ }
+ }
+
+ public interface Parser extends Serializable {
+ StructType parser(String content);
+ }
+}
diff --git a/groot-core/src/main/java/com/geedgenetworks/core/connector/schema/StaticSchema.java b/groot-core/src/main/java/com/geedgenetworks/core/connector/schema/StaticSchema.java
new file mode 100644
index 0000000..ab6893d
--- /dev/null
+++ b/groot-core/src/main/java/com/geedgenetworks/core/connector/schema/StaticSchema.java
@@ -0,0 +1,21 @@
+package com.geedgenetworks.core.connector.schema;
+
+import com.geedgenetworks.core.types.StructType;
+
+public class StaticSchema implements Schema{
+ private final StructType dataType;
+
+ public StaticSchema(StructType dataType) {
+ this.dataType = dataType;
+ }
+
+ @Override
+ public StructType getDataType() {
+ return dataType;
+ }
+
+ @Override
+ final public boolean isDynamic() {
+ return false;
+ }
+}
diff --git a/groot-core/src/main/java/com/geedgenetworks/core/connector/schema/utils/DynamicSchemaManager.java b/groot-core/src/main/java/com/geedgenetworks/core/connector/schema/utils/DynamicSchemaManager.java
new file mode 100644
index 0000000..0ee04d2
--- /dev/null
+++ b/groot-core/src/main/java/com/geedgenetworks/core/connector/schema/utils/DynamicSchemaManager.java
@@ -0,0 +1,148 @@
+package com.geedgenetworks.core.connector.schema.utils;
+
+import com.geedgenetworks.core.connector.schema.DynamicSchema;
+import com.geedgenetworks.core.connector.schema.SchemaChangeAware;
+import com.geedgenetworks.core.types.StructType;
+import org.apache.flink.runtime.util.ExecutorThreadFactory;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.*;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+public class DynamicSchemaManager {
+ private static final Logger LOG = LoggerFactory.getLogger(DynamicSchemaManager.class);
+ private static final Map<String, DynamicSchemaWithAwares> registeredSchemaWithAwares = new LinkedHashMap<>();
+ private static ScheduledExecutorService scheduler = null;
+
+ // 注册某个dynamicSchema的监听感知
+ public static synchronized void registerSchemaChangeAware(DynamicSchema dynamicSchema, SchemaChangeAware aware){
+ checkNotNull(dynamicSchema);
+ checkNotNull(aware);
+
+ String key = dynamicSchema.getCacheKey();
+ DynamicSchemaWithAwares schemaWithAwares = registeredSchemaWithAwares.get(key);
+ if(schemaWithAwares == null){
+ schemaWithAwares = new DynamicSchemaWithAwares(dynamicSchema);
+ schedule(schemaWithAwares);
+ registeredSchemaWithAwares.put(key, schemaWithAwares);
+ LOG.info("start schedule for {}, current contained schedules:{}", schemaWithAwares.dynamicSchema.getCacheKey(), registeredSchemaWithAwares.keySet());
+ }
+
+ for (SchemaChangeAware registeredAware : schemaWithAwares.awares) {
+ if(registeredAware == aware){
+ LOG.error("aware({}) for {} has already registered", aware, key);
+ return;
+ }
+ }
+
+ schemaWithAwares.awares.add(aware);
+ LOG.info("register aware({}) for {}", aware, key);
+ }
+
+ // 注销某个dynamicSchema的监听感知
+ public static synchronized void unregisterSchemaChangeAware(DynamicSchema dynamicSchema, SchemaChangeAware aware){
+ checkNotNull(dynamicSchema);
+ checkNotNull(aware);
+
+ String key = dynamicSchema.getCacheKey();
+ DynamicSchemaWithAwares schemaWithAwares = registeredSchemaWithAwares.get(key);
+ if(schemaWithAwares == null){
+ LOG.error("not register aware for {}", key);
+ return;
+ }
+
+ Iterator<SchemaChangeAware> iter = schemaWithAwares.awares.iterator();
+ SchemaChangeAware registeredAware;
+ boolean find = false;
+ while (iter.hasNext()){
+ registeredAware = iter.next();
+ if(registeredAware == aware){
+ iter.remove();
+ find = true;
+ break;
+ }
+ }
+
+ if(find){
+ LOG.info("unregister aware({}) for {}", aware, schemaWithAwares.dynamicSchema.getCacheKey());
+ if(schemaWithAwares.awares.isEmpty()){
+ registeredSchemaWithAwares.remove(key);
+ LOG.info("stop schedule for {}, current contained schedules:{}", schemaWithAwares.dynamicSchema.getCacheKey(), registeredSchemaWithAwares.keySet());
+ }
+ if(registeredSchemaWithAwares.isEmpty()){
+ destroySchedule();
+ }
+ }else{
+ LOG.error("can not find register aware({}) for {}", aware, schemaWithAwares.dynamicSchema.getCacheKey());
+ }
+ }
+
+ private static void schedule(DynamicSchemaWithAwares schemaWithAwares){
+ if(scheduler == null){
+ scheduler = Executors.newScheduledThreadPool(1, new ExecutorThreadFactory("DynamicSchemaUpdateScheduler"));
+ LOG.info("create SchemaUpdateScheduler");
+ }
+ scheduler.schedule(schemaWithAwares, schemaWithAwares.dynamicSchema.getIntervalMs(), TimeUnit.MILLISECONDS);
+ }
+
+ private static void destroySchedule(){
+ if(scheduler != null){
+ try {
+ scheduler.shutdownNow();
+ LOG.info("destroy SchemaUpdateScheduler");
+ } catch (Exception e) {
+ LOG.error("shutdown error", e);
+ }
+ scheduler = null;
+ }
+ }
+
+ private static class DynamicSchemaWithAwares implements Runnable{
+ DynamicSchema dynamicSchema;
+ private List<SchemaChangeAware> awares;
+
+ public DynamicSchemaWithAwares(DynamicSchema dynamicSchema) {
+ this.dynamicSchema = dynamicSchema;
+ awares = new ArrayList<>();
+ }
+
+ @Override
+ public void run() {
+ if(awares.isEmpty()){
+ return;
+ }
+
+ try {
+ update();
+ } catch (Throwable e) {
+ LOG.error("schema update error", e);
+ }
+
+ if(!awares.isEmpty()){
+ scheduler.schedule(this, dynamicSchema.getIntervalMs(), TimeUnit.MILLISECONDS);
+ }
+ }
+
+ public void update() {
+ StructType dataType = dynamicSchema.updateDataType();
+ // 距离上次没有更新
+ if(dataType == null){
+ return;
+ }
+
+ LOG.warn("schema for {} change to:{}", dynamicSchema.getCacheKey(), dataType.simpleString());
+ for (SchemaChangeAware aware : awares) {
+ try {
+ aware.schemaChange(dataType);
+ } catch (Exception e) {
+ LOG.error("schema change aware error", e);
+ }
+ }
+ }
+
+ }
+}
diff --git a/groot-core/src/main/java/com/geedgenetworks/core/connector/sink/SinkProvider.java b/groot-core/src/main/java/com/geedgenetworks/core/connector/sink/SinkProvider.java
index ba31bf9..f143f7f 100644
--- a/groot-core/src/main/java/com/geedgenetworks/core/connector/sink/SinkProvider.java
+++ b/groot-core/src/main/java/com/geedgenetworks/core/connector/sink/SinkProvider.java
@@ -8,4 +8,8 @@ import java.io.Serializable;
public interface SinkProvider extends Serializable {
DataStreamSink<?> consumeDataStream(DataStream<Event> dataStream);
+
+ default boolean supportDynamicSchema(){
+ return false;
+ }
}
diff --git a/groot-core/src/main/java/com/geedgenetworks/core/connector/source/SourceProvider.java b/groot-core/src/main/java/com/geedgenetworks/core/connector/source/SourceProvider.java
index f183ee9..4fc08dd 100644
--- a/groot-core/src/main/java/com/geedgenetworks/core/connector/source/SourceProvider.java
+++ b/groot-core/src/main/java/com/geedgenetworks/core/connector/source/SourceProvider.java
@@ -15,4 +15,8 @@ public interface SourceProvider extends Serializable {
default StructType schema(){
return getPhysicalDataType();
}
+
+ default boolean supportDynamicSchema(){
+ return false;
+ }
}
diff --git a/groot-core/src/main/java/com/geedgenetworks/core/factories/TableFactory.java b/groot-core/src/main/java/com/geedgenetworks/core/factories/TableFactory.java
index 69c462d..affeead 100644
--- a/groot-core/src/main/java/com/geedgenetworks/core/factories/TableFactory.java
+++ b/groot-core/src/main/java/com/geedgenetworks/core/factories/TableFactory.java
@@ -1,39 +1,56 @@
-package com.geedgenetworks.core.factories;
-
-import com.geedgenetworks.core.types.StructType;
-import org.apache.flink.configuration.Configuration;
-
-import java.util.Map;
-
-public interface TableFactory extends Factory {
-
- public static class Context {
- private final StructType schema;
- private final StructType physicalDataType;
- private final Map<String, String> options;
- private final Configuration configuration;
-
- public Context(StructType schema, StructType physicalDataType, Map<String, String> options, Configuration configuration) {
- this.schema = schema;
- this.physicalDataType = physicalDataType;
- this.options = options;
- this.configuration = configuration;
- }
-
- public StructType getSchema() {
- return schema;
- }
-
- public StructType getPhysicalDataType() {
- return physicalDataType;
- }
-
- public Map<String, String> getOptions() {
- return options;
- }
-
- public Configuration getConfiguration() {
- return configuration;
- }
- }
-}
+package com.geedgenetworks.core.factories;
+
+import com.geedgenetworks.core.connector.schema.Schema;
+import com.geedgenetworks.core.types.StructType;
+import org.apache.flink.configuration.Configuration;
+
+import java.util.Map;
+
+public interface TableFactory extends Factory {
+
+ public static class Context {
+ private final Schema schema;
+ private final Map<String, String> options;
+ private final Configuration configuration;
+
+ public Context(Schema schema, Map<String, String> options, Configuration configuration) {
+ this.schema = schema;
+ this.options = options;
+ this.configuration = configuration;
+ }
+
+ public Schema getSchema() {
+ return schema;
+ }
+
+ public StructType getPhysicalDataType() {
+ if(schema == null){
+ return null;
+ }else{
+ if(schema.isDynamic()){
+ throw new UnsupportedOperationException("DynamicSchema");
+ }
+ return schema.getDataType();
+ }
+ }
+
+ public StructType getDataType() {
+ if(schema == null){
+ return null;
+ }else{
+ if(schema.isDynamic()){
+ throw new UnsupportedOperationException("DynamicSchema");
+ }
+ return schema.getDataType();
+ }
+ }
+
+ public Map<String, String> getOptions() {
+ return options;
+ }
+
+ public Configuration getConfiguration() {
+ return configuration;
+ }
+ }
+}
diff --git a/groot-core/src/main/java/com/geedgenetworks/core/pojo/SinkConfig.java b/groot-core/src/main/java/com/geedgenetworks/core/pojo/SinkConfig.java
index 2818474..66275d9 100644
--- a/groot-core/src/main/java/com/geedgenetworks/core/pojo/SinkConfig.java
+++ b/groot-core/src/main/java/com/geedgenetworks/core/pojo/SinkConfig.java
@@ -1,34 +1,43 @@
-package com.geedgenetworks.core.pojo;
-
-import java.io.Serializable;
-import java.util.Map;
-
-public class SinkConfig implements Serializable {
- private String type;
- private Map<String, String> properties;
- private String name;
-
- public String getType() {
- return type;
- }
-
- public void setType(String type) {
- this.type = type;
- }
-
- public Map<String, String> getProperties() {
- return properties;
- }
-
- public void setProperties(Map<String, String> properties) {
- this.properties = properties;
- }
-
- public String getName() {
- return name;
- }
-
- public void setName(String name) {
- this.name = name;
- }
-}
+package com.geedgenetworks.core.pojo;
+
+import java.io.Serializable;
+import java.util.Map;
+
+public class SinkConfig implements Serializable {
+ private String type;
+ private Map<String, Object> schema;
+ private Map<String, String> properties;
+ private String name;
+
+ public String getType() {
+ return type;
+ }
+
+ public void setType(String type) {
+ this.type = type;
+ }
+
+ public Map<String, Object> getSchema() {
+ return schema;
+ }
+
+ public void setSchema(Map<String, Object> schema) {
+ this.schema = schema;
+ }
+
+ public Map<String, String> getProperties() {
+ return properties;
+ }
+
+ public void setProperties(Map<String, String> properties) {
+ this.properties = properties;
+ }
+
+ public String getName() {
+ return name;
+ }
+
+ public void setName(String name) {
+ this.name = name;
+ }
+}
diff --git a/groot-core/src/main/java/com/geedgenetworks/core/pojo/SourceConfig.java b/groot-core/src/main/java/com/geedgenetworks/core/pojo/SourceConfig.java
index 99e2d85..6d019e9 100644
--- a/groot-core/src/main/java/com/geedgenetworks/core/pojo/SourceConfig.java
+++ b/groot-core/src/main/java/com/geedgenetworks/core/pojo/SourceConfig.java
@@ -8,8 +8,7 @@ import java.util.Map;
public class SourceConfig implements Serializable {
private String type;
- private List<Object> fields;
- private StructType dataType;
+ private Map<String, Object> schema;
private Map<String, String> properties;
private String name;
public String getType() {
@@ -20,20 +19,12 @@ public class SourceConfig implements Serializable {
this.type = type;
}
- public List<Object> getFields() {
- return fields;
+ public Map<String, Object> getSchema() {
+ return schema;
}
- public void setFields(List<Object> fields) {
- this.fields = fields;
- }
-
- public StructType getDataType() {
- return dataType;
- }
-
- public void setDataType(StructType dataType) {
- this.dataType = dataType;
+ public void setSchema(Map<String, Object> schema) {
+ this.schema = schema;
}
public Map<String, String> getProperties() {
diff --git a/groot-core/src/main/java/com/geedgenetworks/core/types/Types.java b/groot-core/src/main/java/com/geedgenetworks/core/types/Types.java
index 0bd1544..7cc3d3a 100644
--- a/groot-core/src/main/java/com/geedgenetworks/core/types/Types.java
+++ b/groot-core/src/main/java/com/geedgenetworks/core/types/Types.java
@@ -1,133 +1,144 @@
-package com.geedgenetworks.core.types;
-
-import com.alibaba.fastjson2.JSON;
-import com.alibaba.fastjson2.JSONArray;
-import com.alibaba.fastjson2.JSONObject;
-import com.geedgenetworks.core.types.StructType.StructField;
-
-import java.util.ArrayList;
-import java.util.List;
-import java.util.regex.Matcher;
-import java.util.regex.Pattern;
-
-public class Types {
- public static final IntegerType INT = new IntegerType();
- public static final LongType BIGINT = new LongType();
- public static final StringType STRING = new StringType();
- public static final FloatType FLOAT = new FloatType();
- public static final DoubleType DOUBLE = new DoubleType();
- public static final BooleanType BOOLEAN = new BooleanType();
- public static final BinaryType BINARY = new BinaryType();
-
- public static final Pattern ARRAY_RE = Pattern.compile("array\\s*<(.+)>", Pattern.CASE_INSENSITIVE);
- public static final Pattern STRUCT_RE = Pattern.compile("struct\\s*<(.+)>", Pattern.CASE_INSENSITIVE);
-
- public static StructType parseSchemaFromJson(String jsonFields) {
- JSONArray fieldArray = JSON.parseArray(jsonFields);
- StructField[] fields = new StructField[fieldArray.size()];
-
- for (int i = 0; i < fieldArray.size(); i++) {
- JSONObject fieldObject = fieldArray.getJSONObject(i);
- String name = fieldObject.getString("name").trim();
- String type = fieldObject.getString("type").trim();
- DataType dataType = parseDataType(type);
- fields[i] = new StructField(name, dataType);
- }
-
- return new StructType(fields);
- }
-
- public static DataType parseDataType(String type){
- type = type.trim();
- if("int".equalsIgnoreCase(type)){
- return INT;
- } else if ("bigint".equalsIgnoreCase(type)){
- return BIGINT;
- } else if ("string".equalsIgnoreCase(type)){
- return STRING;
- } else if ("float".equalsIgnoreCase(type)){
- return FLOAT;
- } else if ("double".equalsIgnoreCase(type)){
- return DOUBLE;
- } else if ("boolean".equalsIgnoreCase(type)){
- return BOOLEAN;
- } else if ("binary".equalsIgnoreCase(type)){
- return BINARY;
- }
-
- // array类型
- Matcher matcher = ARRAY_RE.matcher(type);
- if(matcher.matches()){
- String eleType = matcher.group(1);
- DataType elementType = parseDataType(eleType);
- return new ArrayType(elementType);
- }
-
- // struct类型
- matcher = STRUCT_RE.matcher(type);
- if(matcher.matches()){
- List<StructField> fields = new ArrayList<>();
- String str = matcher.group(1);
- int startPos = 0, endPos = -1;
- int i = startPos + 1;
- int level = 0;
- while (i < str.length()){
- while (i < str.length()){
- if(str.charAt(i) == ':'){
- endPos = i;
- break;
- }
- i++;
- }
-
- if(endPos <= startPos){
- throw new UnsupportedOperationException("不支持的类型:" + type);
- }
-
- String name = str.substring(startPos, endPos).trim();
- startPos = i + 1;
- endPos = -1;
- i = startPos + 1;
- while (i < str.length()){
- if(str.charAt(i) == ',' && level == 0){
- endPos = i;
- break;
- }
- if(str.charAt(i) == '<'){
- level++;
- }
- if(str.charAt(i) == '>'){
- level--;
- }
- i++;
- }
-
- if(i == str.length()){
- endPos = i;
- }
- if(endPos <= startPos){
- throw new UnsupportedOperationException("不支持的类型:" + type);
- }
-
- String tp = str.substring(startPos, endPos).trim();
- fields.add(new StructField(name, parseDataType(tp)));
-
- i++;
- startPos = i;
- endPos = -1;
- }
-
- return new StructType(fields.toArray(new StructField[fields.size()]));
- }
-
- throw new UnsupportedOperationException("不支持的类型:" + type);
- }
-
- static void buildFormattedString(DataType dataType, String prefix, StringBuilder sb, int maxDepth){
- if(dataType instanceof ArrayType){
- ((ArrayType)dataType).buildFormattedString(prefix, sb, maxDepth - 1);
- } else if (dataType instanceof StructType) {
- ((StructType)dataType).buildFormattedString(prefix, sb, maxDepth - 1);
- }
- }
-}
+package com.geedgenetworks.core.types;
+
+import com.alibaba.fastjson2.JSON;
+import com.alibaba.fastjson2.JSONArray;
+import com.alibaba.fastjson2.JSONObject;
+import com.geedgenetworks.core.types.StructType.StructField;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+public class Types {
+ public static final IntegerType INT = new IntegerType();
+ public static final LongType BIGINT = new LongType();
+ public static final StringType STRING = new StringType();
+ public static final FloatType FLOAT = new FloatType();
+ public static final DoubleType DOUBLE = new DoubleType();
+ public static final BooleanType BOOLEAN = new BooleanType();
+ public static final BinaryType BINARY = new BinaryType();
+
+ public static final Pattern ARRAY_RE = Pattern.compile("array\\s*<(.+)>", Pattern.CASE_INSENSITIVE);
+ public static final Pattern STRUCT_RE = Pattern.compile("struct\\s*<(.+)>", Pattern.CASE_INSENSITIVE);
+
+ public static StructType parseSchemaFromJson(String jsonFields) {
+ JSONArray fieldArray = JSON.parseArray(jsonFields);
+ StructField[] fields = new StructField[fieldArray.size()];
+
+ for (int i = 0; i < fieldArray.size(); i++) {
+ JSONObject fieldObject = fieldArray.getJSONObject(i);
+ String name = fieldObject.getString("name").trim();
+ String type = fieldObject.getString("type").trim();
+ DataType dataType = parseDataType(type);
+ fields[i] = new StructField(name, dataType);
+ }
+
+ return new StructType(fields);
+ }
+
+ // 解析struct<>中的字段
+ public static StructType parseStructType(String str){
+ // 外面是否包含struct<>都能解析
+ Matcher matcher = STRUCT_RE.matcher(str);
+ if(matcher.matches()){
+ str = matcher.group(1);
+ }
+
+ List<StructField> fields = new ArrayList<>();
+ int startPos = 0, endPos = -1;
+ int i = startPos + 1;
+ int level = 0;
+ while (i < str.length()){
+ while (i < str.length()){
+ if(str.charAt(i) == ':'){
+ endPos = i;
+ break;
+ }
+ i++;
+ }
+
+ if(endPos <= startPos){
+ throw new UnsupportedOperationException("can not parse " + str);
+ }
+
+ String name = str.substring(startPos, endPos).trim();
+ startPos = i + 1;
+ endPos = -1;
+ i = startPos + 1;
+ while (i < str.length()){
+ if(str.charAt(i) == ',' && level == 0){
+ endPos = i;
+ break;
+ }
+ if(str.charAt(i) == '<'){
+ level++;
+ }
+ if(str.charAt(i) == '>'){
+ level--;
+ }
+ i++;
+ }
+
+ if(i == str.length()){
+ endPos = i;
+ }
+ if(endPos <= startPos){
+ throw new UnsupportedOperationException("can not parse " + str);
+ }
+
+ String tp = str.substring(startPos, endPos).trim();
+ fields.add(new StructField(name, parseDataType(tp)));
+
+ i++;
+ startPos = i;
+ endPos = -1;
+ }
+
+ return new StructType(fields.toArray(new StructField[fields.size()]));
+ }
+
+ public static DataType parseDataType(String type){
+ type = type.trim();
+ if("int".equalsIgnoreCase(type)){
+ return INT;
+ } else if ("bigint".equalsIgnoreCase(type)){
+ return BIGINT;
+ } else if ("string".equalsIgnoreCase(type)){
+ return STRING;
+ } else if ("float".equalsIgnoreCase(type)){
+ return FLOAT;
+ } else if ("double".equalsIgnoreCase(type)){
+ return DOUBLE;
+ } else if ("boolean".equalsIgnoreCase(type)){
+ return BOOLEAN;
+ } else if ("binary".equalsIgnoreCase(type)){
+ return BINARY;
+ }
+
+ // array类型
+ Matcher matcher = ARRAY_RE.matcher(type);
+ if(matcher.matches()){
+ String eleType = matcher.group(1);
+ DataType elementType = parseDataType(eleType);
+ return new ArrayType(elementType);
+ }
+
+ // struct类型
+ matcher = STRUCT_RE.matcher(type);
+ if(matcher.matches()){
+ String str = matcher.group(1);
+ return parseStructType(str);
+ }
+
+ throw new UnsupportedOperationException("not support type:" + type);
+ }
+
+ static void buildFormattedString(DataType dataType, String prefix, StringBuilder sb, int maxDepth){
+ if(dataType instanceof ArrayType){
+ ((ArrayType)dataType).buildFormattedString(prefix, sb, maxDepth - 1);
+ } else if (dataType instanceof StructType) {
+ ((StructType)dataType).buildFormattedString(prefix, sb, maxDepth - 1);
+ }
+ }
+}
diff --git a/groot-core/src/main/java/com/geedgenetworks/core/utils/HttpClientPoolUtil.java b/groot-core/src/main/java/com/geedgenetworks/core/utils/HttpClientPoolUtil.java
index 2238aeb..dd2c710 100644
--- a/groot-core/src/main/java/com/geedgenetworks/core/utils/HttpClientPoolUtil.java
+++ b/groot-core/src/main/java/com/geedgenetworks/core/utils/HttpClientPoolUtil.java
@@ -101,7 +101,7 @@ public class HttpClientPoolUtil {
if (StringUtil.isNotEmpty(headers)) {
for (Header h : headers) {
httpGet.addHeader(h);
- log.info("request header : {}", h);
+ // log.info("request header : {}", h);
}
}
try(CloseableHttpResponse response = httpClient.execute(httpGet)) {
diff --git a/groot-core/src/test/java/com/geedgenetworks/core/connector/schema/SchemaParserTest.java b/groot-core/src/test/java/com/geedgenetworks/core/connector/schema/SchemaParserTest.java
new file mode 100644
index 0000000..fbeaed7
--- /dev/null
+++ b/groot-core/src/test/java/com/geedgenetworks/core/connector/schema/SchemaParserTest.java
@@ -0,0 +1,22 @@
+package com.geedgenetworks.core.connector.schema;
+
+import com.geedgenetworks.core.types.StructType;
+import org.apache.commons.io.FileUtils;
+
+import java.io.File;
+import java.nio.charset.StandardCharsets;
+
+import static org.junit.jupiter.api.Assertions.*;
+
+class SchemaParserTest {
+
+ public static void main(String[] args) throws Exception{
+ String str = FileUtils.readFileToString(new File("D:\\WorkSpace\\groot-stream\\session_record_schema.json"), StandardCharsets.UTF_8);
+ SchemaParser.Parser parser = SchemaParser.PARSER_AVRO;
+
+ StructType structType = parser.parser(str);
+ System.out.println(structType.treeString());
+
+ }
+
+} \ No newline at end of file
diff --git a/groot-core/src/test/java/com/geedgenetworks/core/connector/schema/utils/DynamicSchemaManagerTest.java b/groot-core/src/test/java/com/geedgenetworks/core/connector/schema/utils/DynamicSchemaManagerTest.java
new file mode 100644
index 0000000..b5b672e
--- /dev/null
+++ b/groot-core/src/test/java/com/geedgenetworks/core/connector/schema/utils/DynamicSchemaManagerTest.java
@@ -0,0 +1,218 @@
+package com.geedgenetworks.core.connector.schema.utils;
+
+import com.geedgenetworks.core.connector.schema.DynamicSchema;
+import com.geedgenetworks.core.connector.schema.HttpDynamicSchema;
+import com.geedgenetworks.core.connector.schema.SchemaChangeAware;
+import com.geedgenetworks.core.connector.schema.SchemaParser;
+import com.geedgenetworks.core.types.StructType;
+import com.geedgenetworks.core.types.Types;
+import org.apache.flink.util.TimeUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.ThreadLocalRandom;
+
+public class DynamicSchemaManagerTest {
+ static final Logger LOG = LoggerFactory.getLogger(DynamicSchemaManagerTest.class.getSimpleName());
+
+ public static void main(String[] args) throws Exception{
+ //testOneThread();
+ //testMultiThread();
+ testMultiThreadForHttpDynamicSchema();
+ }
+
+ public static void testOneThread() throws Exception{
+ RandomDynamicSchema schema1 = new RandomDynamicSchema( 1000 * 5, "Schema1", 0.25);
+ RandomDynamicSchema schema11 = new RandomDynamicSchema( 1000 * 5, "Schema1", 0.25);
+ RandomDynamicSchema schema2 = new RandomDynamicSchema( 1000 * 10, "Schema1", 0.9);
+
+ LOG.info("start");
+ PrintSchemaChangeAware aware1 = new PrintSchemaChangeAware("aware1");
+ PrintSchemaChangeAware aware2 = new PrintSchemaChangeAware("aware2");
+ PrintSchemaChangeAware aware11 = new PrintSchemaChangeAware("aware11");
+ PrintSchemaChangeAware aware22 = new PrintSchemaChangeAware("aware22");
+
+ schema1.registerSchemaChangeAware(aware1);
+ schema1.registerSchemaChangeAware(aware2);
+ schema1.registerSchemaChangeAware(aware1);
+ schema1.registerSchemaChangeAware(aware2);
+
+ schema11.registerSchemaChangeAware(aware11);
+ schema11.registerSchemaChangeAware(aware22);
+
+
+ schema2.registerSchemaChangeAware(aware1);
+ schema2.registerSchemaChangeAware(aware2);
+ schema2.registerSchemaChangeAware(aware1);
+ schema2.registerSchemaChangeAware(aware2);
+
+ Thread.sleep(1000 * 60 * 2);
+ schema1.unregisterSchemaChangeAware(aware1);
+ schema1.unregisterSchemaChangeAware(aware2);
+ schema1.unregisterSchemaChangeAware(aware11);
+ schema1.unregisterSchemaChangeAware(aware22);
+
+ Thread.sleep(1000 * 20);
+ schema2.unregisterSchemaChangeAware(aware1);
+ schema2.unregisterSchemaChangeAware(aware2);
+
+ Thread.sleep(1000 * 3);
+
+
+ schema1.registerSchemaChangeAware(aware2);
+ schema2.registerSchemaChangeAware(aware1);
+ Thread.sleep(1000 * 60 * 1);
+ schema1.unregisterSchemaChangeAware(aware2);
+ schema2.unregisterSchemaChangeAware(aware1);
+ Thread.sleep(1000 * 3);
+ }
+
+ public static void testMultiThreadForHttpDynamicSchema() throws Exception{
+ LOG.info("start");
+
+ List<Thread> threads = new ArrayList<>(10);
+ Thread thread;
+ for (int i = 0; i < 5; i++) {
+ int finalI = i + 1;
+ thread = new Thread(() -> {
+ DynamicSchema schema1 = new HttpDynamicSchema( "http://127.0.0.1/session_record_schema.json", SchemaParser.PARSER_AVRO, 1000 * 5);
+ System.out.println(schema1.getDataType());
+ PrintSchemaChangeAware aware1 = new PrintSchemaChangeAware("aware1_" + finalI);
+ schema1.registerSchemaChangeAware(aware1);
+ try {
+ Thread.sleep(1000 * 60 * 1);
+ } catch (InterruptedException e) {
+ e.printStackTrace();
+ }
+ schema1.unregisterSchemaChangeAware(aware1);
+ });
+ threads.add(thread);
+
+ thread = new Thread(() -> {
+ DynamicSchema schema2 = new HttpDynamicSchema( "http://127.0.0.1/session_record_schema.json", SchemaParser.PARSER_AVRO, 1000 * 5);
+ System.out.println(schema2.getDataType());
+ PrintSchemaChangeAware aware2 = new PrintSchemaChangeAware("aware2_" + finalI);
+ schema2.registerSchemaChangeAware(aware2);
+ try {
+ Thread.sleep(1000 * 60 * 1);
+ } catch (InterruptedException e) {
+ e.printStackTrace();
+ }
+ schema2.unregisterSchemaChangeAware(aware2);
+ });
+ threads.add(thread);
+ }
+
+ for (int i = 0; i < threads.size(); i++) {
+ thread = threads.get(i);
+ thread.start();
+ }
+
+ for (int i = 0; i < threads.size(); i++) {
+ thread = threads.get(i);
+ thread.join();
+ }
+ Thread.sleep(1000 * 3);
+ LOG.info("done");
+ }
+
+ public static void testMultiThread() throws Exception{
+ LOG.info("start");
+
+ List<Thread> threads = new ArrayList<>(10);
+ Thread thread;
+ for (int i = 0; i < 5; i++) {
+ int finalI = i + 1;
+ thread = new Thread(() -> {
+ RandomDynamicSchema schema1 = new RandomDynamicSchema( 1000 * 5, "Schema1", 0.25);
+ PrintSchemaChangeAware aware1 = new PrintSchemaChangeAware("aware1_" + finalI);
+ schema1.registerSchemaChangeAware(aware1);
+ try {
+ Thread.sleep(1000 * 60 * 1);
+ } catch (InterruptedException e) {
+ e.printStackTrace();
+ }
+ schema1.unregisterSchemaChangeAware(aware1);
+ });
+ threads.add(thread);
+
+ thread = new Thread(() -> {
+ RandomDynamicSchema schema2 = new RandomDynamicSchema(1000 * 10, "Schema1", 0.9);
+ PrintSchemaChangeAware aware2 = new PrintSchemaChangeAware("aware2_" + finalI);
+ schema2.registerSchemaChangeAware(aware2);
+ try {
+ Thread.sleep(1000 * 60 * 1);
+ } catch (InterruptedException e) {
+ e.printStackTrace();
+ }
+ schema2.unregisterSchemaChangeAware(aware2);
+ });
+ threads.add(thread);
+ }
+
+ for (int i = 0; i < threads.size(); i++) {
+ thread = threads.get(i);
+ thread.start();
+ }
+
+ for (int i = 0; i < threads.size(); i++) {
+ thread = threads.get(i);
+ thread.join();
+ }
+ Thread.sleep(1000 * 3);
+ LOG.info("done");
+ }
+
+ public static class PrintSchemaChangeAware implements SchemaChangeAware {
+ private final String name;
+
+ public PrintSchemaChangeAware(String name) {
+ this.name = name;
+ }
+
+ @Override
+ public void schemaChange(StructType dataType) {
+ String info = String.format("%s receive schema change:%s", name, dataType);
+ //System.out.println(info);
+ LOG.info(info);
+ }
+
+ @Override
+ public String toString() {
+ return name;
+ }
+ }
+
+ public static class RandomDynamicSchema extends DynamicSchema{
+ private final String key;
+ private final double probability;
+
+ public RandomDynamicSchema(long intervalMs, String identifier, double probability) {
+ super(null, intervalMs);
+ this.key = identifier + "_" + TimeUtils.formatWithHighestUnit(Duration.ofMillis(intervalMs));
+ this.probability = probability;
+ }
+
+ @Override
+ public String getCacheKey() {
+ return key;
+ }
+
+ @Override
+ protected String getDataTypeContent() {
+ return null;
+ }
+
+ @Override
+ public StructType updateDataType() {
+ if(ThreadLocalRandom.current().nextDouble() < probability){
+ return (StructType) Types.parseDataType(String.format("struct<name_%s: string>", key));
+ }
+ return null;
+ }
+
+ }
+} \ No newline at end of file
diff --git a/groot-formats/format-json/src/main/java/com/geedgenetworks/formats/json/JsonEventSerializationSchema.java b/groot-formats/format-json/src/main/java/com/geedgenetworks/formats/json/JsonEventSerializationSchema.java
index 32a3191..260e35a 100644
--- a/groot-formats/format-json/src/main/java/com/geedgenetworks/formats/json/JsonEventSerializationSchema.java
+++ b/groot-formats/format-json/src/main/java/com/geedgenetworks/formats/json/JsonEventSerializationSchema.java
@@ -18,14 +18,19 @@ public class JsonEventSerializationSchema implements SerializationSchema<Event>
}
};
private final StructType dataType;
+ private final JsonSerializer serializer;
public JsonEventSerializationSchema(StructType dataType) {
this.dataType = dataType;
+ this.serializer = dataType != null? new JsonSerializer(dataType): null;
}
@Override
public byte[] serialize(Event element) {
- // sink暂不支持类型推断, dataType为null
- return JSON.toJSONBytes(element.getExtractedFields(), proFilter);
+ if(dataType == null){
+ return JSON.toJSONBytes(element.getExtractedFields(), proFilter);
+ } else {
+ return serializer.serialize(element.getExtractedFields());
+ }
}
}
diff --git a/groot-formats/format-json/src/main/java/com/geedgenetworks/formats/json/JsonSerializer.java b/groot-formats/format-json/src/main/java/com/geedgenetworks/formats/json/JsonSerializer.java
new file mode 100644
index 0000000..fac90c8
--- /dev/null
+++ b/groot-formats/format-json/src/main/java/com/geedgenetworks/formats/json/JsonSerializer.java
@@ -0,0 +1,176 @@
+package com.geedgenetworks.formats.json;
+
+import com.alibaba.fastjson2.JSONWriter;
+import com.geedgenetworks.core.types.*;
+
+import java.io.Serializable;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+
+public class JsonSerializer implements Serializable{
+
+ private final StructType dataType;
+ private final ValueWriter valueWriter;
+
+ public JsonSerializer(StructType dataType) {
+ this.dataType = dataType;
+ this.valueWriter = makeWriter(dataType);
+ }
+
+ public byte[] serialize(Map<String, Object> data){
+ try (JSONWriter writer = JSONWriter.ofUTF8()) {
+ if (data == null) {
+ writer.writeNull();
+ } else {
+ valueWriter.write(writer, data);
+ }
+ return writer.getBytes();
+ }
+ }
+
+ private ValueWriter makeWriter(DataType dataType) {
+ if (dataType instanceof StringType) {
+ return JsonSerializer::writeString;
+ }
+
+ if (dataType instanceof IntegerType) {
+ return JsonSerializer::writeInt;
+ }
+
+ if (dataType instanceof LongType) {
+ return JsonSerializer::writeLong;
+ }
+
+ if (dataType instanceof FloatType) {
+ return JsonSerializer::writeFloat;
+ }
+
+ if (dataType instanceof DoubleType) {
+ return JsonSerializer::writeDouble;
+ }
+
+ if (dataType instanceof StructType) {
+ final Map<String, ValueWriter> fieldWriters = Arrays.stream(((StructType) dataType).fields).collect(Collectors.toMap(f -> f.name, f -> this.makeWriter(f.dataType)));
+ return (writer, obj) -> {
+ writeObject(writer, obj, fieldWriters);
+ };
+ }
+
+ if (dataType instanceof ArrayType) {
+ final ValueWriter elementWriter = this.makeWriter(((ArrayType) dataType).elementType);
+ return (writer, obj) -> {
+ writeArray(writer, obj, elementWriter);
+ };
+ }
+
+ throw new UnsupportedOperationException("unsupported dataType: " + dataType);
+ }
+
+ static void writeString(JSONWriter writer, Object obj) {
+ writer.writeString(obj.toString());
+ }
+
+ static void writeInt(JSONWriter writer, Object obj){
+ if(obj instanceof Number){
+ writer.writeInt32(((Number) obj).intValue());
+ } else if(obj instanceof String){
+ writer.writeInt32(Integer.parseInt((String) obj));
+ } else {
+ throw new IllegalArgumentException(String.format("can not convert %s to int", obj));
+ }
+ }
+
+ static void writeLong(JSONWriter writer, Object obj) {
+ if(obj instanceof Number){
+ writer.writeInt64(((Number) obj).longValue());
+ } else if(obj instanceof String){
+ writer.writeInt64(Long.parseLong((String) obj));
+ } else {
+ throw new IllegalArgumentException(String.format("can not convert %s to long", obj));
+ }
+ }
+
+ static void writeFloat(JSONWriter writer, Object obj) {
+ if(obj instanceof Number){
+ writer.writeFloat(((Number) obj).floatValue());
+ } else if(obj instanceof String){
+ writer.writeFloat(Float.parseFloat((String) obj));
+ } else {
+ throw new IllegalArgumentException(String.format("can not convert %s to float", obj));
+ }
+ }
+
+ static void writeDouble(JSONWriter writer, Object obj){
+ if(obj instanceof Number){
+ writer.writeDouble(((Number) obj).doubleValue());
+ } else if(obj instanceof String){
+ writer.writeDouble(Double.parseDouble((String) obj));
+ } else {
+ throw new IllegalArgumentException(String.format("can not convert %s to double", obj));
+ }
+ }
+
+ static void writeObject(JSONWriter writer, Object obj, Map<String, ValueWriter> fieldWriters){
+ if(obj instanceof Map){
+ Map<String, Object> map = (Map<String, Object>) obj;
+ writer.startObject();
+
+ String key;
+ Object value;
+ ValueWriter valueWriter;
+ for (Map.Entry<String, Object> entry : map.entrySet()) {
+ key = entry.getKey();
+ /*if (key.startsWith("__")) {
+ continue;
+ }*/
+ value = entry.getValue();
+ if(value == null){
+ continue;
+ }
+ valueWriter = fieldWriters.get(key);
+ if(valueWriter != null){
+ writer.writeName(key);
+ writer.writeColon();
+ valueWriter.write(writer, value);
+ }
+ }
+
+ writer.endObject();
+ } else {
+ throw new IllegalArgumentException(String.format("can not convert %s to map", obj));
+ }
+ }
+
+ static void writeArray(JSONWriter writer, Object obj, ValueWriter elementWriter){
+ if(obj instanceof List){
+ List<Object> list = (List<Object>) obj;
+ writer.startArray();
+
+ Object element;
+ for (int i = 0; i < list.size(); i++) {
+ if (i != 0) {
+ writer.writeComma();
+ }
+
+ element = list.get(i);
+ if (element == null) {
+ writer.writeNull();
+ continue;
+ }
+
+ elementWriter.write(writer, element);
+ }
+
+ writer.endArray();
+ } else {
+ throw new IllegalArgumentException(String.format("can not convert %s to list", obj));
+ }
+ }
+
+ @FunctionalInterface
+ public interface ValueWriter extends Serializable {
+ void write(JSONWriter writer, Object obj);
+ }
+}
diff --git a/groot-formats/format-json/src/test/java/com/geedgenetworks/formats/json/JsonEventSerializationSchemaTest.java b/groot-formats/format-json/src/test/java/com/geedgenetworks/formats/json/JsonEventSerializationSchemaTest.java
new file mode 100644
index 0000000..c61bf0a
--- /dev/null
+++ b/groot-formats/format-json/src/test/java/com/geedgenetworks/formats/json/JsonEventSerializationSchemaTest.java
@@ -0,0 +1,19 @@
+package com.geedgenetworks.formats.json;
+
+import com.alibaba.fastjson2.JSON;
+
+import java.util.LinkedHashMap;
+import java.util.Map;
+
+public class JsonEventSerializationSchemaTest {
+
+
+ public static void main(String[] args) {
+ Map<String, Object> map = new LinkedHashMap<>();
+ map.put("id", 1);
+ map.put("name", "aaa");
+ byte[] bytes = JSON.toJSONBytes(map);
+ System.out.println(bytes);
+ }
+
+} \ No newline at end of file
diff --git a/groot-formats/format-json/src/test/java/com/geedgenetworks/formats/json/JsonSerializerTest.java b/groot-formats/format-json/src/test/java/com/geedgenetworks/formats/json/JsonSerializerTest.java
new file mode 100644
index 0000000..e5d6c10
--- /dev/null
+++ b/groot-formats/format-json/src/test/java/com/geedgenetworks/formats/json/JsonSerializerTest.java
@@ -0,0 +1,79 @@
+package com.geedgenetworks.formats.json;
+
+import com.alibaba.fastjson2.JSON;
+import com.geedgenetworks.core.types.StructType;
+import com.geedgenetworks.core.types.Types;
+import org.junit.jupiter.api.Test;
+
+import java.nio.charset.StandardCharsets;
+import java.util.*;
+import java.util.concurrent.ThreadLocalRandom;
+
+public class JsonSerializerTest {
+
+ @Test
+ public void testSerSimpleData(){
+ ThreadLocalRandom random = ThreadLocalRandom.current();
+ Map<String, Object> map = new LinkedHashMap<>();
+ map.put("int", random.nextInt(1, Integer.MAX_VALUE));
+ map.put("int_null", null);
+ map.put("int_str", Integer.toString(random.nextInt(1, Integer.MAX_VALUE)));
+
+ map.put("int64", random.nextLong(1, Long.MAX_VALUE));
+ map.put("int64_null", null);
+ map.put("int64_str", Long.toString(random.nextLong(1, Long.MAX_VALUE)));
+
+ map.put("double", random.nextDouble(1, Integer.MAX_VALUE));
+ map.put("double_null", null);
+ map.put("double_str", Double.toString(random.nextDouble(1, Integer.MAX_VALUE)));
+
+ map.put("str", "ut8字符串");
+ map.put("str_null", null);
+ map.put("str_int", random.nextInt(1, Integer.MAX_VALUE));
+
+ map.put("int32_array", Arrays.asList(1, 3, 5));
+ map.put("int32_array_null", null);
+ map.put("int32_array_empty", Collections.emptyList());
+
+ map.put("int64_array", Arrays.asList(1, 3, 5));
+ map.put("int64_array_null", null);
+ map.put("int64_array_empty", Collections.emptyList());
+
+ map.put("str_array", Arrays.asList(1, 3, 5));
+
+ Map<String, Object> obj = new LinkedHashMap<>();
+ obj.put("id", 1);
+ obj.put("name", "name");
+ map.put("obj", obj);
+
+ List<Object> list = new ArrayList<>();
+ list.add(obj);
+ obj = new LinkedHashMap<>();
+ obj.put("id", 2);
+ obj.put("name", "name2");
+ list.add(obj);
+ map.put("obj_array", list);
+
+ StructType dataType = Types.parseStructType("int: int, int_null: int, int_str: int, int64: bigint, int64_null: bigint, int64_str: bigint, double: double, double_null: double, double_str: double, " +
+ "str: string, str_null: string, str_int: string, int32_array: array<int>, int32_array_null: array<int>, int32_array_empty: array<int>, int64_array: array<bigint>, int64_array_null: array<bigint>, int64_array_empty: array<bigint>," +
+ " str_array : array<string>, obj : struct<id :int, name: string>, obj_array : array<struct<id :int, name: string>>");
+ JsonSerializer serializer = new JsonSerializer(dataType);
+
+ byte[] bytes = serializer.serialize(map);
+ System.out.println(map);
+ System.out.println(bytes.length);
+ System.out.println(new String(bytes, StandardCharsets.UTF_8));
+ System.out.println(JSON.toJSONString(map));
+
+ JsonToMapDataConverter converter = new JsonToMapDataConverter(dataType, false);
+ Map<String, Object> rst = converter.convert(new String(bytes, StandardCharsets.UTF_8));
+ System.out.println(map);
+ System.out.println(rst);
+
+ System.out.println(serializer.serialize(rst).length);
+ System.out.println(new String(serializer.serialize(rst), StandardCharsets.UTF_8));
+ System.out.println(JSON.toJSONString(map));
+ }
+
+
+} \ No newline at end of file
diff --git a/groot-formats/format-protobuf/src/test/java/com/geedgenetworks/formats/protobuf/ProtobufFormatFactoryTest.java b/groot-formats/format-protobuf/src/test/java/com/geedgenetworks/formats/protobuf/ProtobufFormatFactoryTest.java
index 633f5d8..72f4ac9 100644
--- a/groot-formats/format-protobuf/src/test/java/com/geedgenetworks/formats/protobuf/ProtobufFormatFactoryTest.java
+++ b/groot-formats/format-protobuf/src/test/java/com/geedgenetworks/formats/protobuf/ProtobufFormatFactoryTest.java
@@ -34,14 +34,14 @@ class ProtobufFormatFactoryTest {
options.put("protobuf.message.name", messageName);
Configuration configuration = Configuration.fromMap(options);
- TableFactory.Context context = new TableFactory.Context(null, null, options, configuration);
+ TableFactory.Context context = new TableFactory.Context( null, options, configuration);
SourceProvider sourceProvider = tableFactory.getSourceProvider(context);
SinkTableFactory sinkTableFactory = FactoryUtil.discoverTableFactory(SinkTableFactory.class, "print");
options = new HashMap<>();
options.put("format", "json");
configuration = Configuration.fromMap(options);
- context = new TableFactory.Context(null, null, options, configuration);
+ context = new TableFactory.Context( null, options, configuration);
SinkProvider sinkProvider = sinkTableFactory.getSinkProvider(context);
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
diff --git a/pom.xml b/pom.xml
index 266eaed..cf9731d 100644
--- a/pom.xml
+++ b/pom.xml
@@ -60,6 +60,7 @@
<nacos.version>1.2.0</nacos.version>
<antlr4.version>4.8</antlr4.version>
<jcommander.version>1.81</jcommander.version>
+ <avro.version>1.9.1</avro.version>
<lombok.version>1.18.24</lombok.version>
<config.version>1.3.3</config.version>
<hazelcast.version>5.1</hazelcast.version>
@@ -274,6 +275,11 @@
<version>${hazelcast.version}</version>
</dependency>
+ <dependency>
+ <groupId>org.apache.avro</groupId>
+ <artifactId>avro</artifactId>
+ <version>${avro.version}</version>
+ </dependency>
<!-- flink dependencies -->
<dependency>