author     李奉超 <[email protected]>    2024-03-13 06:44:33 +0000
committer  李奉超 <[email protected]>    2024-03-13 06:44:33 +0000
commit     7347323b963fad7f972e6f872711207415c513ce (patch)
tree       375e393e7e5720fbd213818c9785a8d4e9c042b6
parent     039d9c0d0bf081865a3fc8a9cdc4359e0bac6331 (diff)
parent     40fe896585953874c09576a576e057c41ffe1288 (diff)
Merge branch 'feature/dynamicschema' into 'develop'
Feature/dynamicschema
See merge request galaxy/platform/groot-stream!23
-rw-r--r--  config/template/grootstream_job_debug.yaml  15
-rw-r--r--  config/template/grootstream_job_template.yaml  51
-rw-r--r--  docs/connector/connector.md  75
-rw-r--r--  docs/connector/sink/clickhouse.md  43
-rw-r--r--  docs/connector/sink/kafka.md  43
-rw-r--r--  groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/SinkExecutor.java  19
-rw-r--r--  groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/SourceExecutor.java  26
-rw-r--r--  groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/utils/SchemaConfigParse.java  73
-rw-r--r--  groot-common/pom.xml  5
-rw-r--r--  groot-connectors/connector-clickhouse/src/main/java/com/geedgenetworks/connectors/clickhouse/ClickHouseTableFactory.java  11
-rw-r--r--  groot-connectors/connector-clickhouse/src/main/java/com/geedgenetworks/connectors/clickhouse/sink/AbstractBatchIntervalClickHouseSink.java  17
-rw-r--r--  groot-connectors/connector-clickhouse/src/main/java/com/geedgenetworks/connectors/clickhouse/sink/EventBatchIntervalClickHouseSink.java  71
-rw-r--r--  groot-connectors/connector-kafka/src/main/java/com/geedgenetworks/connectors/kafka/KafkaSinkProvider.java  4
-rw-r--r--  groot-connectors/connector-kafka/src/main/java/com/geedgenetworks/connectors/kafka/KafkaSourceProvider.java  4
-rw-r--r--  groot-connectors/connector-kafka/src/main/java/com/geedgenetworks/connectors/kafka/KafkaTableFactory.java  2
-rw-r--r--  groot-connectors/connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/GrootFlinkKafkaConsumer.java (renamed from groot-connectors/connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer2.java)  8
-rw-r--r--  groot-connectors/connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/GrootFlinkKafkaProducer.java (renamed from groot-connectors/connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer2.java)  252
-rw-r--r--  groot-connectors/connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/GrootKafkaFetcher.java (renamed from groot-connectors/connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/KafkaFetcher2.java)  6
-rw-r--r--  groot-core/src/main/java/com/geedgenetworks/core/connector/print/PrintTableFactory.java  2
-rw-r--r--  groot-core/src/main/java/com/geedgenetworks/core/connector/schema/DynamicSchema.java  86
-rw-r--r--  groot-core/src/main/java/com/geedgenetworks/core/connector/schema/HttpDynamicSchema.java  43
-rw-r--r--  groot-core/src/main/java/com/geedgenetworks/core/connector/schema/Schema.java  35
-rw-r--r--  groot-core/src/main/java/com/geedgenetworks/core/connector/schema/SchemaChangeAware.java  7
-rw-r--r--  groot-core/src/main/java/com/geedgenetworks/core/connector/schema/SchemaParser.java  112
-rw-r--r--  groot-core/src/main/java/com/geedgenetworks/core/connector/schema/StaticSchema.java  21
-rw-r--r--  groot-core/src/main/java/com/geedgenetworks/core/connector/schema/utils/DynamicSchemaManager.java  148
-rw-r--r--  groot-core/src/main/java/com/geedgenetworks/core/connector/sink/SinkProvider.java  4
-rw-r--r--  groot-core/src/main/java/com/geedgenetworks/core/connector/source/SourceProvider.java  4
-rw-r--r--  groot-core/src/main/java/com/geedgenetworks/core/factories/TableFactory.java  95
-rw-r--r--  groot-core/src/main/java/com/geedgenetworks/core/pojo/SinkConfig.java  77
-rw-r--r--  groot-core/src/main/java/com/geedgenetworks/core/pojo/SourceConfig.java  19
-rw-r--r--  groot-core/src/main/java/com/geedgenetworks/core/types/Types.java  277
-rw-r--r--  groot-core/src/main/java/com/geedgenetworks/core/utils/HttpClientPoolUtil.java  2
-rw-r--r--  groot-core/src/test/java/com/geedgenetworks/core/connector/schema/SchemaParserTest.java  22
-rw-r--r--  groot-core/src/test/java/com/geedgenetworks/core/connector/schema/utils/DynamicSchemaManagerTest.java  218
-rw-r--r--  groot-examples/cn-udf-example/src/main/resources/example/cn_grootstream_job_local_template.yaml  1
-rw-r--r--  groot-examples/cn-udf-example/src/main/resources/example/cn_grootstream_job_template.yaml  1
-rw-r--r--  groot-examples/end-to-end-example/src/main/resources/examples/inline_to_clickhouse.yaml  43
-rw-r--r--  groot-examples/end-to-end-example/src/main/resources/examples/inline_to_kafka.yaml  43
-rw-r--r--  groot-examples/end-to-end-example/src/main/resources/examples/inline_to_print_test.yaml  43
-rw-r--r--  groot-examples/end-to-end-example/src/main/resources/examples/kafka_to_print.yaml  11
-rw-r--r--  groot-formats/format-json/src/main/java/com/geedgenetworks/formats/json/JsonEventSerializationSchema.java  9
-rw-r--r--  groot-formats/format-json/src/main/java/com/geedgenetworks/formats/json/JsonSerializer.java  176
-rw-r--r--  groot-formats/format-json/src/test/java/com/geedgenetworks/formats/json/JsonEventSerializationSchemaTest.java  19
-rw-r--r--  groot-formats/format-json/src/test/java/com/geedgenetworks/formats/json/JsonSerializerTest.java  79
-rw-r--r--  groot-formats/format-protobuf/src/test/java/com/geedgenetworks/formats/protobuf/ProtobufFormatFactoryTest.java  4
-rw-r--r--  pom.xml  6
47 files changed, 1785 insertions, 547 deletions
diff --git a/config/template/grootstream_job_debug.yaml b/config/template/grootstream_job_debug.yaml
index 1c8c5e9..a1a287d 100644
--- a/config/template/grootstream_job_debug.yaml
+++ b/config/template/grootstream_job_debug.yaml
@@ -1,7 +1,11 @@
sources:
kafka_source:
type : kafka
- # fields: # [array of object] Field List, if not set, all fields(Map<String, Object>) will be output.
+    # source table schema, configured via fields, local_file, or url. If no schema is set, all fields (Map<String, Object>) are output.
+# schema:
+# fields: "struct<log_id:bigint, recv_time:bigint,client_ip: string>"
+# local_file: "schema/test_schema.json"
+# url: "http://127.0.0.1/schema.json"
properties: # [object] Source Properties
topic: SESSION-RECORD
kafka.bootstrap.servers: 192.168.44.11:9092
@@ -25,7 +29,6 @@ sources:
inline_source:
type : inline
- fields: # [array of object] Field List, if not set, all fields(Map<String, Object>) will be output.
properties:
data: '{"log_id": 1, "recv_time":"111", "client_ip":"192.168.0.1","server_ip":"120.233.20.242","common_schema_type":"BASE"}'
format: json
@@ -42,9 +45,6 @@ sources:
ipfix_source:
type: ipfix
-# fields: # [array of object] Field List, if not set, all fields(Map<String, Object>) will be output.
-# - name: log_id
-# type: bigint
properties:
port.range: 12345-12347
max.packet.size: 65535
@@ -220,6 +220,11 @@ postprocessing_pipelines:
sinks:
kafka_sink_a:
type: kafka
+    # sink table schema, configured via fields, local_file, or url. If no schema is set, all fields (Map<String, Object>) are output.
+# schema:
+# fields: "struct<log_id:bigint, recv_time:bigint,client_ip: string>"
+# local_file: "schema/test_schema.json"
+# url: "http://127.0.0.1/schema.json"
properties:
topic: SESSION-RECORD-JSON
kafka.bootstrap.servers: 192.168.44.12:9092
diff --git a/config/template/grootstream_job_template.yaml b/config/template/grootstream_job_template.yaml
index 0cb39ad..2606d56 100644
--- a/config/template/grootstream_job_template.yaml
+++ b/config/template/grootstream_job_template.yaml
@@ -22,27 +22,28 @@ sources: # [object] Define connector source
inline_source: # [object] Inline source connector name, must be unique. It used to define the source node of the job topology.
type: inline
- fields: # [array of object] Schema field projection, support read data only from specified fields.
- - name: log_id
- type: bigint
- - name: recv_time
- type: bigint
- - name: server_fqdn
- type: string
- - name: server_domain
- type: string
- - name: client_ip
- type: string
- - name: server_ip
- type: string
- - name: server_asn
- type: string
- - name: decoded_as
- type: string
- - name: device_group
- type: string
- - name: device_tag
- type: string
+ schema:
+ fields: # [array of object] Schema field projection, support read data only from specified fields.
+ - name: log_id
+ type: bigint
+ - name: recv_time
+ type: bigint
+ - name: server_fqdn
+ type: string
+ - name: server_domain
+ type: string
+ - name: client_ip
+ type: string
+ - name: server_ip
+ type: string
+ - name: server_asn
+ type: string
+ - name: decoded_as
+ type: string
+ - name: device_group
+ type: string
+ - name: device_tag
+ type: string
properties:
#
# [string] Event Data, it will be parsed to Map<String, Object> by the specified format.
@@ -64,9 +65,6 @@ sources: # [object] Define connector source
ipfix_source: # [object] IPFIX source connector name, must be unique. It used to define the source node of the job topology.
type: ipfix
-# fields: # [array of object] Field List, if not set, all fields(Map<String, Object>) will be output.
-# - name: log_id
-# type: bigint
properties:
port.range: 12345-12347
max.packet.size: 65535
@@ -254,6 +252,11 @@ postprocessing_pipelines: # [object] Define Processors for postprocessing pipeli
sinks: # [object] Define connector sink
kafka_sink_a: # [object] Kafka sink connector name, must be unique. It used to define the sink node of the job topology.
type: kafka
+    # sink table schema, configured via fields, local_file, or url. If no schema is set, all fields (Map<String, Object>) are output.
+# schema:
+# fields: "struct<log_id:bigint, recv_time:bigint,client_ip: string>"
+# local_file: "schema/test_schema.json"
+# url: "http://127.0.0.1/schema.json"
properties:
topic: SESSION-RECORD-A
kafka.bootstrap.servers: 127.0.0.1:9092
diff --git a/docs/connector/connector.md b/docs/connector/connector.md
index 6df1e23..6bcc878 100644
--- a/docs/connector/connector.md
+++ b/docs/connector/connector.md
@@ -7,23 +7,68 @@ Source Connector contains some common core features, and each source connector s
sources:
${source_name}:
type: ${source_connector_type}
- fields:
- - name: ${field_name}
- type: ${field_type}
+    # source table schema, configured via fields, local_file, or url
+ schema:
+ fields:
+ - name: ${field_name}
+ type: ${field_type}
+ # local_file: "schema path"
+ # url: "schema http url"
properties:
${prop_key}: ${prop_value}
```
-| Name | Type | Required | Default | Description |
-|-------------------------------------|-----------------------------------------------------------------------------|----------|--------------------------|--------------------------------------------------------------------------------------------------------------------------|
-| type | String | Yes | - | The type of the source connector. The `SourceTableFactory` will use this value as identifier to create source connector. |
-| fields | Array of `Field` | No | - | The structure of the data, including field names and field types. |
-| properties | Map of String | Yes | - | The source connector customize properties, more details see the [Source](source) documentation. |
+| Name | Type | Required | Default | Description |
+|-------------------------------------|------------------------------------------------------------------|----------|--------------------------|--------------------------------------------------------------------------------------------------------------------------|
+| type | String | Yes | - | The type of the source connector. The `SourceTableFactory` will use this value as identifier to create source connector. |
+| schema                              | Map                                                                | No       | -                        | The source table schema, configured via fields, local_file, or url.                                                       |
+| properties | Map of String | Yes | - | The source connector customize properties, more details see the [Source](source) documentation. |
## Schema Field Projection
The source connector supports reading only specified fields from the data source. For example `KafkaSource` will read all content from topic and then use `fields` to filter unnecessary columns.
The Schema Structure refer to [Schema Structure](../user-guide.md#schema-structure).
+## Schema Config
+The schema can be configured through fields, local_file, or url; exactly one of the three may be set.
+
+### fields
+```yaml
+schema:
+ # by array
+ fields:
+ - name: ${field_name}
+ type: ${field_type}
+```
+
+```yaml
+schema:
+ # by sql
+ fields: "struct<field_name:field_type, ...>"
+  # the outer struct<> wrapper can also be omitted
+ # fields: "field_name:field_type, ..."
+```
+
+### local_file
+
+```yaml
+schema:
+  # by local file (mutually exclusive with fields and url)
+  # the schema definition is read from this file
+  local_file: "schema path"
+```
+
+### url
+The schema is retrieved from the URL and refreshed periodically, enabling dynamic schema. Not all connectors support dynamic schema.
+
+The connectors that currently support dynamic schema are: ClickHouse sink.
+
+```yaml
+schema:
+  # by url (mutually exclusive with fields and local_file)
+  # the schema is fetched from this HTTP endpoint and refreshed periodically
+  url: "schema http url"
+```
+
# Sink Connector
Sink Connector contains some common core features, and each sink connector supports them to varying degrees.
@@ -33,12 +78,18 @@ Sink Connector contains some common core features, and each sink connector suppo
sinks:
${sink_name}:
type: ${sink_connector_type}
+  # sink table schema, configured via fields, local_file, or url. If no schema is set, all fields (Map<String, Object>) are output.
+ schema:
+ fields: "struct<field_name:field_type, ...>"
+ # local_file: "schema path"
+ # url: "schema url"
properties:
${prop_key}: ${prop_value}
```
-| Name | Type | Required | Default | Description |
-|-------------------------------------|-----------------------------------------------------------------------------|----------|--------------------------|--------------------------------------------------------------------------------------------------------------------|
-| type | String | Yes | - | The type of the sink connector. The `SinkTableFactory` will use this value as identifier to create sink connector. |
-| properties | Map of String | Yes | - | The sink connector customize properties, more details see the [Sink](sink) documentation. |
+| Name | Type | Required | Default | Description |
+|-------------------------------------|--------------------------------------------------------------------|----------|--------------------------|--------------------------------------------------------------------------------------------------------------------|
+| type | String | Yes | - | The type of the sink connector. The `SinkTableFactory` will use this value as identifier to create sink connector. |
+| schema                              | Map                                                                  | No       | -                        | The sink table schema, configured via fields, local_file, or url.                                                   |
+| properties | Map of String | Yes | - | The sink connector customize properties, more details see the [Sink](sink) documentation. |
diff --git a/docs/connector/sink/clickhouse.md b/docs/connector/sink/clickhouse.md
index d794767..ac37d24 100644
--- a/docs/connector/sink/clickhouse.md
+++ b/docs/connector/sink/clickhouse.md
@@ -50,27 +50,28 @@ This example read data of inline test source and write to ClickHouse table `test
sources: # [object] Define connector source
inline_source:
type: inline
- fields: # [array of object] Schema field projection, support read data only from specified fields.
- - name: log_id
- type: bigint
- - name: recv_time
- type: bigint
- - name: server_fqdn
- type: string
- - name: server_domain
- type: string
- - name: client_ip
- type: string
- - name: server_ip
- type: string
- - name: server_asn
- type: string
- - name: decoded_as
- type: string
- - name: device_group
- type: string
- - name: device_tag
- type: string
+ schema:
+ fields: # [array of object] Schema field projection, support read data only from specified fields.
+ - name: log_id
+ type: bigint
+ - name: recv_time
+ type: bigint
+ - name: server_fqdn
+ type: string
+ - name: server_domain
+ type: string
+ - name: client_ip
+ type: string
+ - name: server_ip
+ type: string
+ - name: server_asn
+ type: string
+ - name: decoded_as
+ type: string
+ - name: device_group
+ type: string
+ - name: device_tag
+ type: string
properties:
#
# [string] Event Data, it will be parsed to Map<String, Object> by the specified format.
diff --git a/docs/connector/sink/kafka.md b/docs/connector/sink/kafka.md
index 6793b21..92976d8 100644
--- a/docs/connector/sink/kafka.md
+++ b/docs/connector/sink/kafka.md
@@ -26,27 +26,28 @@ This example read data of inline test source and write to kafka topic `SESSION-R
sources: # [object] Define connector source
inline_source:
type: inline
- fields: # [array of object] Schema field projection, support read data only from specified fields.
- - name: log_id
- type: bigint
- - name: recv_time
- type: bigint
- - name: server_fqdn
- type: string
- - name: server_domain
- type: string
- - name: client_ip
- type: string
- - name: server_ip
- type: string
- - name: server_asn
- type: string
- - name: decoded_as
- type: string
- - name: device_group
- type: string
- - name: device_tag
- type: string
+ schema:
+ fields: # [array of object] Schema field projection, support read data only from specified fields.
+ - name: log_id
+ type: bigint
+ - name: recv_time
+ type: bigint
+ - name: server_fqdn
+ type: string
+ - name: server_domain
+ type: string
+ - name: client_ip
+ type: string
+ - name: server_ip
+ type: string
+ - name: server_asn
+ type: string
+ - name: decoded_as
+ type: string
+ - name: device_group
+ type: string
+ - name: device_tag
+ type: string
properties:
#
# [string] Event Data, it will be parsed to Map<String, Object> by the specified format.
diff --git a/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/SinkExecutor.java b/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/SinkExecutor.java
index 7aee40f..a0bedc9 100644
--- a/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/SinkExecutor.java
+++ b/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/SinkExecutor.java
@@ -2,21 +2,20 @@ package com.geedgenetworks.bootstrap.execution;
import com.alibaba.fastjson.JSONObject;
import com.geedgenetworks.bootstrap.enums.ProcessorType;
-import com.geedgenetworks.bootstrap.exception.ConfigCheckException;
import com.geedgenetworks.bootstrap.exception.JobExecuteException;
+import com.geedgenetworks.bootstrap.utils.SchemaConfigParse;
import com.geedgenetworks.common.Constants;
import com.geedgenetworks.common.config.CheckConfigUtil;
import com.geedgenetworks.common.config.CheckResult;
import com.geedgenetworks.common.config.SinkConfigOptions;
-import com.geedgenetworks.common.config.SourceConfigOptions;
import com.geedgenetworks.common.exception.CommonErrorCode;
import com.geedgenetworks.common.exception.ConfigValidationException;
+import com.geedgenetworks.core.connector.schema.Schema;
import com.geedgenetworks.core.connector.sink.SinkProvider;
import com.geedgenetworks.core.factories.FactoryUtil;
import com.geedgenetworks.core.factories.SinkTableFactory;
import com.geedgenetworks.core.factories.TableFactory;
import com.geedgenetworks.core.pojo.SinkConfig;
-import com.geedgenetworks.utils.StringUtil;
import com.google.common.collect.Maps;
import com.typesafe.config.Config;
import lombok.extern.slf4j.Slf4j;
@@ -70,8 +69,20 @@ public class SinkExecutor extends AbstractExecutor<String, SinkConfig> {
SinkTableFactory sinkTableFactory = FactoryUtil.discoverTableFactory(SinkTableFactory.class, sinkConfig.getType());
Map<String, String> options = sinkConfig.getProperties();
Configuration configuration = Configuration.fromMap(options);
- TableFactory.Context context = new TableFactory.Context(null, null, options, configuration);
+ Schema schema = null;
+ if(sinkConfig.getSchema() != null && !sinkConfig.getSchema().isEmpty()){
+ schema = SchemaConfigParse.parseSchemaConfig(sinkConfig.getSchema());
+ }
+ TableFactory.Context context = new TableFactory.Context(schema, options, configuration);
SinkProvider sinkProvider = sinkTableFactory.getSinkProvider(context);
+ if(!sinkProvider.supportDynamicSchema() && schema != null && schema.isDynamic()){
+ throw new UnsupportedOperationException(String.format("sink(%s) not support DynamicSchema", sinkConfig.getName()));
+ }
+ if(schema != null){
+ System.out.println(String.format("sink(%s) dataType:\n%s", sinkConfig.getName(), schema.getDataType().toString()));
+ System.out.println(String.format("sink(%s) schema:\n%s", sinkConfig.getName(), schema.getDataType().treeString()));
+ }
+
DataStreamSink<?> dataStreamSink = sinkProvider.consumeDataStream(dataStream);
if (node.getParallelism() > 0) {
dataStreamSink.setParallelism(node.getParallelism());
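
To make the new check concrete: a sink whose provider does not override `supportDynamicSchema()` (for example the Kafka sink in this merge request) rejects a url-based schema at job startup, while a `fields`-based schema passes because it is static. A minimal sketch, reusing the layout and Kafka properties from the updated debug template; the sink name is illustrative:

```yaml
sinks:
  kafka_sink_a:
    type: kafka
    schema:
      # dynamic schema -> SinkExecutor throws
      # UnsupportedOperationException: sink(kafka_sink_a) not support DynamicSchema
      url: "http://127.0.0.1/schema.json"
      # a static schema would be accepted instead:
      # fields: "struct<log_id:bigint, recv_time:bigint, client_ip:string>"
    properties:
      topic: SESSION-RECORD-JSON
      kafka.bootstrap.servers: 192.168.44.12:9092
```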
diff --git a/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/SourceExecutor.java b/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/SourceExecutor.java
index eee78a2..f46e3fc 100644
--- a/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/SourceExecutor.java
+++ b/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/SourceExecutor.java
@@ -1,26 +1,24 @@
package com.geedgenetworks.bootstrap.execution;
-import com.alibaba.fastjson2.JSON;
import com.alibaba.fastjson2.JSONObject;
import com.geedgenetworks.bootstrap.enums.ProcessorType;
import com.geedgenetworks.bootstrap.exception.JobExecuteException;
+import com.geedgenetworks.bootstrap.utils.SchemaConfigParse;
import com.geedgenetworks.common.Constants;
import com.geedgenetworks.common.config.CheckConfigUtil;
import com.geedgenetworks.common.config.CheckResult;
import com.geedgenetworks.common.config.SourceConfigOptions;
import com.geedgenetworks.common.exception.CommonErrorCode;
import com.geedgenetworks.common.exception.ConfigValidationException;
+import com.geedgenetworks.core.connector.schema.Schema;
import com.geedgenetworks.core.connector.source.SourceProvider;
import com.geedgenetworks.core.factories.FactoryUtil;
import com.geedgenetworks.core.factories.SourceTableFactory;
import com.geedgenetworks.core.factories.TableFactory;
import com.geedgenetworks.core.pojo.SourceConfig;
-import com.geedgenetworks.core.types.StructType;
-import com.geedgenetworks.core.types.Types;
import com.google.common.collect.Maps;
import com.typesafe.config.*;
import lombok.extern.slf4j.Slf4j;
-import org.apache.commons.collections.CollectionUtils;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import java.net.URL;
@@ -53,10 +51,6 @@ public class SourceExecutor extends AbstractExecutor<String, SourceConfig> {
SourceConfig sourceConfig = new JSONObject((Map<String, Object>) value).toJavaObject(SourceConfig.class);
sourceConfig.setName(key);
- if(CollectionUtils.isNotEmpty(sourceConfig.getFields())){
- StructType schema = Types.parseSchemaFromJson(JSON.toJSONString(sourceConfig.getFields()));
- sourceConfig.setDataType(schema);
- }
sourceConfigMap.put(key, sourceConfig);
});
@@ -73,12 +67,18 @@ public class SourceExecutor extends AbstractExecutor<String, SourceConfig> {
SourceTableFactory tableFactory = FactoryUtil.discoverTableFactory(SourceTableFactory.class, sourceConfig.getType());
Map<String, String> options = sourceConfig.getProperties();
Configuration configuration = Configuration.fromMap(options);
- StructType dataType = sourceConfig.getDataType();
- TableFactory.Context context = new TableFactory.Context(dataType, dataType, options, configuration);
+ Schema schema = null;
+ if(sourceConfig.getSchema() != null && !sourceConfig.getSchema().isEmpty()){
+ schema = SchemaConfigParse.parseSchemaConfig(sourceConfig.getSchema());
+ }
+ TableFactory.Context context = new TableFactory.Context(schema, options, configuration);
SourceProvider sourceProvider = tableFactory.getSourceProvider(context);
- if(dataType != null){
- System.out.println(String.format("source(%s) dataType:\n%s", sourceConfig.getName(), dataType.toString()));
- System.out.println(String.format("source(%s) schema:\n%s", sourceConfig.getName(), dataType.treeString()));
+ if(!sourceProvider.supportDynamicSchema() && schema != null && schema.isDynamic()){
+ throw new UnsupportedOperationException(String.format("source(%s) not support DynamicSchema", sourceConfig.getName()));
+ }
+ if(schema != null){
+ System.out.println(String.format("source(%s) dataType:\n%s", sourceConfig.getName(), schema.getDataType().toString()));
+ System.out.println(String.format("source(%s) schema:\n%s", sourceConfig.getName(), schema.getDataType().treeString()));
}
sourceSingleOutputStreamOperator = sourceProvider.produceDataStream(jobRuntimeEnvironment.getStreamExecutionEnvironment());
diff --git a/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/utils/SchemaConfigParse.java b/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/utils/SchemaConfigParse.java
new file mode 100644
index 0000000..c3076b4
--- /dev/null
+++ b/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/utils/SchemaConfigParse.java
@@ -0,0 +1,73 @@
+package com.geedgenetworks.bootstrap.utils;
+
+import com.alibaba.fastjson2.JSON;
+import com.geedgenetworks.common.exception.CommonErrorCode;
+import com.geedgenetworks.common.exception.ConfigValidationException;
+import com.geedgenetworks.core.connector.schema.Schema;
+import com.geedgenetworks.core.connector.schema.SchemaParser;
+import com.geedgenetworks.core.types.StructType;
+import com.geedgenetworks.core.types.Types;
+import org.apache.commons.io.FileUtils;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+import java.util.List;
+import java.util.Map;
+
+public class SchemaConfigParse {
+ static final String KEY_BUILTIN = "fields";
+ static final String KEY_LOCAL_FILE = "local_file";
+ static final String KEY_HTTP = "url";
+
+ public static Schema parseSchemaConfig(Map<String, Object> schemaConfig){
+        if(schemaConfig == null || schemaConfig.isEmpty()){
+ return null;
+ }
+
+ int builtin = 0, localFile = 0, http = 0;
+ if(schemaConfig.containsKey(KEY_BUILTIN)){
+ builtin = 1;
+ }
+ if(schemaConfig.containsKey(KEY_LOCAL_FILE)){
+ localFile = 1;
+ }
+ if(schemaConfig.containsKey(KEY_HTTP)){
+ http = 1;
+ }
+ if(builtin + localFile + http > 1){
+ throw new ConfigValidationException(CommonErrorCode.CONFIG_VALIDATION_FAILED, "only support one type schema:" + schemaConfig);
+ }
+
+ if(builtin == 1){
+ Object fields = schemaConfig.get(KEY_BUILTIN);
+ if(fields instanceof List){
+ StructType dataType = Types.parseSchemaFromJson(JSON.toJSONString(fields));
+ return Schema.newSchema(dataType);
+ }else if(fields instanceof String){
+ StructType dataType = Types.parseStructType((String) fields);
+ return Schema.newSchema(dataType);
+ }else{
+ throw new ConfigValidationException(CommonErrorCode.CONFIG_VALIDATION_FAILED, "only support schema fields:" + fields);
+ }
+ }
+
+ if(localFile == 1){
+ String path = schemaConfig.get(KEY_LOCAL_FILE).toString();
+ try {
+ String content = FileUtils.readFileToString(new File(path), StandardCharsets.UTF_8);
+ StructType dataType = SchemaParser.PARSER_AVRO.parser(content);
+ return Schema.newSchema(dataType);
+ } catch (IOException e) {
+ throw new ConfigValidationException(CommonErrorCode.CONFIG_VALIDATION_FAILED, "schema path read error:" + path, e);
+ }
+ }
+
+ if(http == 1){
+ String url = schemaConfig.get(KEY_HTTP).toString();
+ return Schema.newHttpDynamicSchema(url);
+ }
+
+ return null;
+ }
+}
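
To map the parsing rules above back to job configuration, here is a sketch of schema blocks this parser would accept or reject (YAML documents separated by `---`; key names come from the constants in this class, field names are illustrative):

```yaml
# fields as an array of {name, type} objects -> Types.parseSchemaFromJson
schema:
  fields:
    - name: log_id
      type: bigint
    - name: client_ip
      type: string
---
# fields as a struct type string -> Types.parseStructType
schema:
  fields: "struct<log_id:bigint, client_ip:string>"
---
# local file whose content is parsed by SchemaParser.PARSER_AVRO
schema:
  local_file: "schema/test_schema.json"
---
# HTTP URL -> Schema.newHttpDynamicSchema (dynamic schema)
schema:
  url: "http://127.0.0.1/schema.json"
---
# rejected: more than one of fields / local_file / url -> ConfigValidationException
schema:
  fields: "struct<log_id:bigint>"
  url: "http://127.0.0.1/schema.json"
```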
diff --git a/groot-common/pom.xml b/groot-common/pom.xml
index 8ff4910..d6463cb 100644
--- a/groot-common/pom.xml
+++ b/groot-common/pom.xml
@@ -42,6 +42,11 @@
</dependency>
<dependency>
+ <groupId>org.apache.avro</groupId>
+ <artifactId>avro</artifactId>
+ </dependency>
+
+ <dependency>
<groupId>com.geedgenetworks</groupId>
<artifactId>galaxy</artifactId>
<exclusions>
diff --git a/groot-connectors/connector-clickhouse/src/main/java/com/geedgenetworks/connectors/clickhouse/ClickHouseTableFactory.java b/groot-connectors/connector-clickhouse/src/main/java/com/geedgenetworks/connectors/clickhouse/ClickHouseTableFactory.java
index 9462808..96a73b9 100644
--- a/groot-connectors/connector-clickhouse/src/main/java/com/geedgenetworks/connectors/clickhouse/ClickHouseTableFactory.java
+++ b/groot-connectors/connector-clickhouse/src/main/java/com/geedgenetworks/connectors/clickhouse/ClickHouseTableFactory.java
@@ -1,6 +1,7 @@
package com.geedgenetworks.connectors.clickhouse;
import com.geedgenetworks.connectors.clickhouse.sink.EventBatchIntervalClickHouseSink;
+import com.geedgenetworks.core.connector.schema.Schema;
import com.geedgenetworks.core.connector.sink.SinkProvider;
import com.geedgenetworks.core.factories.FactoryUtil;
import com.geedgenetworks.core.factories.FactoryUtil.TableFactoryHelper;
@@ -32,8 +33,7 @@ public class ClickHouseTableFactory implements SinkTableFactory {
helper.validateExcept(CONNECTION_INFO_PREFIX); // validate parameters
- // the sink does not have a dataType yet
- //StructType dataType = context.getSchema();
+ Schema schema = context.getSchema();
ReadableConfig config = context.getConfiguration();
String host = config.get(HOST);
@@ -42,13 +42,18 @@ public class ClickHouseTableFactory implements SinkTableFactory {
long batchIntervalMs = config.get(BATCH_INTERVAL).toMillis();
Properties connInfo = getClickHouseConnInfo(context.getOptions());
- final SinkFunction<Event> sinkFunction = new EventBatchIntervalClickHouseSink(batchSize, batchIntervalMs, host, table, connInfo);
+ final SinkFunction<Event> sinkFunction = new EventBatchIntervalClickHouseSink(schema, batchSize, batchIntervalMs, host, table, connInfo);
return new SinkProvider() {
@Override
public DataStreamSink<?> consumeDataStream(DataStream<Event> dataStream) {
return dataStream.addSink(sinkFunction);
}
+
+ @Override
+ public boolean supportDynamicSchema() {
+ return true;
+ }
};
}
diff --git a/groot-connectors/connector-clickhouse/src/main/java/com/geedgenetworks/connectors/clickhouse/sink/AbstractBatchIntervalClickHouseSink.java b/groot-connectors/connector-clickhouse/src/main/java/com/geedgenetworks/connectors/clickhouse/sink/AbstractBatchIntervalClickHouseSink.java
index ce58cda..18ce5b4 100644
--- a/groot-connectors/connector-clickhouse/src/main/java/com/geedgenetworks/connectors/clickhouse/sink/AbstractBatchIntervalClickHouseSink.java
+++ b/groot-connectors/connector-clickhouse/src/main/java/com/geedgenetworks/connectors/clickhouse/sink/AbstractBatchIntervalClickHouseSink.java
@@ -196,6 +196,7 @@ public abstract class AbstractBatchIntervalClickHouseSink<T> extends RichSinkFun
}
void onInit(Configuration parameters) throws Exception {}
+ void onClose() throws Exception {}
abstract boolean addBatch(Block batch, T data) throws Exception;
@@ -327,7 +328,11 @@ public abstract class AbstractBatchIntervalClickHouseSink<T> extends RichSinkFun
closed = true;
if (outThread != null) {
- outThread.join();
+ try {
+ outThread.join();
+ } catch (Exception e) {
+ // ignore interruption while waiting for the flush thread to exit
+ }
}
// an exception may have been thrown in init
@@ -340,13 +345,19 @@ public abstract class AbstractBatchIntervalClickHouseSink<T> extends RichSinkFun
}
// Cached Blocks do not need to return the ColumnWriterBuffer allocated by each IColumn; it will be reclaimed by GC.
// The ConcurrentLinkedDeque<ColumnWriterBuffer> stack cache pool does not track total/used size and does not cap the list size, so not returning the ColumnWriterBuffer is fine.
- } catch (Exception e) {
- flushException = e;
+ } catch (Throwable t) {
+ if(t instanceof Exception){
+ flushException = (Exception) t;
+ }else{
+ flushException = new Exception(t);
+ }
} finally {
lock.unlock();
}
}
+ onClose();
+
LOG.warn("ck_sink_close_end");
}
diff --git a/groot-connectors/connector-clickhouse/src/main/java/com/geedgenetworks/connectors/clickhouse/sink/EventBatchIntervalClickHouseSink.java b/groot-connectors/connector-clickhouse/src/main/java/com/geedgenetworks/connectors/clickhouse/sink/EventBatchIntervalClickHouseSink.java
index 497f14c..c38324a 100644
--- a/groot-connectors/connector-clickhouse/src/main/java/com/geedgenetworks/connectors/clickhouse/sink/EventBatchIntervalClickHouseSink.java
+++ b/groot-connectors/connector-clickhouse/src/main/java/com/geedgenetworks/connectors/clickhouse/sink/EventBatchIntervalClickHouseSink.java
@@ -3,24 +3,51 @@ package com.geedgenetworks.connectors.clickhouse.sink;
import com.alibaba.fastjson2.JSON;
import com.geedgenetworks.common.Event;
+import com.geedgenetworks.core.connector.schema.Schema;
+import com.geedgenetworks.core.connector.schema.SchemaChangeAware;
+import com.geedgenetworks.core.types.StructType;
import com.github.housepower.data.Block;
+import org.apache.flink.configuration.Configuration;
-import java.util.Map;
-import java.util.Properties;
+import java.util.*;
+import java.util.stream.Collectors;
-public class EventBatchIntervalClickHouseSink extends AbstractBatchIntervalClickHouseSink<Event> {
+public class EventBatchIntervalClickHouseSink extends AbstractBatchIntervalClickHouseSink<Event> implements SchemaChangeAware {
+ private Schema schema;
+ private Set<String> disabledFields;
+ private String simpleName;
- public EventBatchIntervalClickHouseSink(
- int batchSize, long batchIntervalMs, String host, String table, Properties connInfo) {
+ public EventBatchIntervalClickHouseSink(Schema schema, int batchSize, long batchIntervalMs, String host, String table, Properties connInfo) {
super(batchSize, batchIntervalMs, host, table, connInfo);
+ this.schema = schema;
+ }
+
+ @Override
+ void onInit(Configuration parameters) throws Exception {
+ super.onInit(parameters);
+ simpleName = this.getClass().getSimpleName() + "_" + getRuntimeContext().getIndexOfThisSubtask();
+ if(schema != null){
+ updateDisabledFields(schema.getDataType());
+ if(schema.isDynamic()){
+ Schema.registerSchemaChangeAware(schema, this);
+ }
+ }
}
@Override
boolean addBatch(Block batch, Event event) throws Exception {
Map<String, Object> map = event.getExtractedFields();
+ String columnName;
Object value;
for (int i = 0; i < columnNames.length; i++) {
- value = map.get(columnNames[i]);
+ columnName = columnNames[i];
+ if(disabledFields != null && disabledFields.contains(columnName)){
+ value = columnDefaultValues[i];
+ batch.setObject(i, value); // default values need no conversion
+ continue;
+ }
+
+ value = map.get(columnName);
if (value == null) {
value = columnDefaultValues[i];
@@ -39,4 +66,36 @@ public class EventBatchIntervalClickHouseSink extends AbstractBatchIntervalClick
return true;
}
+
+ @Override
+ public void schemaChange(StructType dataType) {
+ if(schema != null && schema.isDynamic()){
+ updateDisabledFields(dataType);
+ }
+ }
+
+ @Override
+ void onClose() throws Exception {
+ super.onClose();
+ if(schema != null && schema.isDynamic()){
+ Schema.unregisterSchemaChangeAware(schema, this);
+ }
+ }
+
+ private void updateDisabledFields(StructType dataType){
+ Set<String> disabledFields = new HashSet<>();
+ Set<String> names = Arrays.stream(dataType.fields).map(x -> x.name).collect(Collectors.toSet());
+ for (String columnName : this.columnNames) {
+ if(!names.contains(columnName)){
+ disabledFields.add(columnName);
+ }
+ }
+ LOG.info("disabledFields: {}", disabledFields);
+ this.disabledFields = disabledFields.isEmpty() ? null : disabledFields;
+ }
+
+ @Override
+ public String toString() {
+ return simpleName;
+ }
}
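
Tying this sink back to configuration: with a url-based schema it registers itself for schema-change callbacks, and any ClickHouse column missing from the most recently fetched schema is written with the column default value instead of the event field. A hedged sketch; the `clickhouse` type identifier and the property placeholders are assumptions not shown in this diff:

```yaml
sinks:
  clickhouse_sink:            # illustrative sink name
    type: clickhouse          # assumed identifier for ClickHouseTableFactory
    schema:
      # dynamic schema: refreshed from this URL; columns absent from the
      # fetched schema fall back to their ClickHouse defaults
      url: "http://127.0.0.1/schema.json"
    properties:
      ${prop_key}: ${prop_value}   # host, table, batch options per the ClickHouse sink docs
```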
diff --git a/groot-connectors/connector-kafka/src/main/java/com/geedgenetworks/connectors/kafka/KafkaSinkProvider.java b/groot-connectors/connector-kafka/src/main/java/com/geedgenetworks/connectors/kafka/KafkaSinkProvider.java
index a50c1e4..761d1ad 100644
--- a/groot-connectors/connector-kafka/src/main/java/com/geedgenetworks/connectors/kafka/KafkaSinkProvider.java
+++ b/groot-connectors/connector-kafka/src/main/java/com/geedgenetworks/connectors/kafka/KafkaSinkProvider.java
@@ -7,7 +7,7 @@ import com.geedgenetworks.core.types.StructType;
import org.apache.flink.api.common.serialization.SerializationSchema;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSink;
-import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer2;
+import org.apache.flink.streaming.connectors.kafka.GrootFlinkKafkaProducer;
import java.util.Optional;
import java.util.Properties;
@@ -36,7 +36,7 @@ public class KafkaSinkProvider implements SinkProvider {
@Override
public DataStreamSink<?> consumeDataStream(DataStream<Event> dataStream) {
- FlinkKafkaProducer2<Event> kafkaProducer = new FlinkKafkaProducer2<>(
+ GrootFlinkKafkaProducer<Event> kafkaProducer = new GrootFlinkKafkaProducer<>(
topic,
valueSerialization,
properties,
diff --git a/groot-connectors/connector-kafka/src/main/java/com/geedgenetworks/connectors/kafka/KafkaSourceProvider.java b/groot-connectors/connector-kafka/src/main/java/com/geedgenetworks/connectors/kafka/KafkaSourceProvider.java
index ce0ddf8..ad34557 100644
--- a/groot-connectors/connector-kafka/src/main/java/com/geedgenetworks/connectors/kafka/KafkaSourceProvider.java
+++ b/groot-connectors/connector-kafka/src/main/java/com/geedgenetworks/connectors/kafka/KafkaSourceProvider.java
@@ -7,7 +7,7 @@ import com.geedgenetworks.core.types.StructType;
import org.apache.flink.api.common.serialization.DeserializationSchema;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer2;
+import org.apache.flink.streaming.connectors.kafka.GrootFlinkKafkaConsumer;
import java.util.List;
import java.util.Properties;
@@ -34,7 +34,7 @@ public class KafkaSourceProvider implements SourceProvider {
@Override
public SingleOutputStreamOperator<Event> produceDataStream(StreamExecutionEnvironment env) {
- FlinkKafkaConsumer2<Event> kafkaConsumer = new FlinkKafkaConsumer2<>(
+ GrootFlinkKafkaConsumer<Event> kafkaConsumer = new GrootFlinkKafkaConsumer<>(
topics,
new EventKafkaDeserializationSchema(valueDeserialization),
properties
diff --git a/groot-connectors/connector-kafka/src/main/java/com/geedgenetworks/connectors/kafka/KafkaTableFactory.java b/groot-connectors/connector-kafka/src/main/java/com/geedgenetworks/connectors/kafka/KafkaTableFactory.java
index b26c161..8829076 100644
--- a/groot-connectors/connector-kafka/src/main/java/com/geedgenetworks/connectors/kafka/KafkaTableFactory.java
+++ b/groot-connectors/connector-kafka/src/main/java/com/geedgenetworks/connectors/kafka/KafkaTableFactory.java
@@ -51,7 +51,7 @@ public class KafkaTableFactory implements SourceTableFactory, SinkTableFactory {
helper.validateExcept(PROPERTIES_PREFIX); // validate parameters, excluding properties.* keys
- StructType dataType = context.getSchema();
+ StructType dataType = context.getDataType();
ReadableConfig config = context.getConfiguration();
String topic = config.get(TOPIC).get(0);
diff --git a/groot-connectors/connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer2.java b/groot-connectors/connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/GrootFlinkKafkaConsumer.java
index d37121d..4721969 100644
--- a/groot-connectors/connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer2.java
+++ b/groot-connectors/connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/GrootFlinkKafkaConsumer.java
@@ -6,7 +6,7 @@ import org.apache.flink.metrics.MetricGroup;
import org.apache.flink.streaming.api.operators.StreamingRuntimeContext;
import org.apache.flink.streaming.connectors.kafka.config.OffsetCommitMode;
import org.apache.flink.streaming.connectors.kafka.internals.AbstractFetcher;
-import org.apache.flink.streaming.connectors.kafka.internals.KafkaFetcher2;
+import org.apache.flink.streaming.connectors.kafka.internals.GrootKafkaFetcher;
import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartition;
import org.apache.flink.util.SerializedValue;
@@ -15,9 +15,9 @@ import java.util.Map;
import java.util.Properties;
@PublicEvolving
-public class FlinkKafkaConsumer2<T> extends FlinkKafkaConsumer<T> {
+public class GrootFlinkKafkaConsumer<T> extends FlinkKafkaConsumer<T> {
- public FlinkKafkaConsumer2(List<String> topics, KafkaDeserializationSchema<T> deserializer, Properties props) {
+ public GrootFlinkKafkaConsumer(List<String> topics, KafkaDeserializationSchema<T> deserializer, Properties props) {
super(topics, deserializer, props);
}
@@ -36,7 +36,7 @@ public class FlinkKafkaConsumer2<T> extends FlinkKafkaConsumer<T> {
// this overwrites whatever setting the user configured in the properties
adjustAutoCommitConfig(properties, offsetCommitMode);
- return new KafkaFetcher2<>(
+ return new GrootKafkaFetcher<>(
sourceContext,
assignedPartitionsWithInitialOffsets,
watermarkStrategy,
diff --git a/groot-connectors/connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer2.java b/groot-connectors/connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/GrootFlinkKafkaProducer.java
index 818df4e..ff0bb34 100644
--- a/groot-connectors/connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer2.java
+++ b/groot-connectors/connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/GrootFlinkKafkaProducer.java
@@ -77,15 +77,15 @@ import static org.apache.flink.util.Preconditions.checkState;
/**
* Flink Sink to produce data into a Kafka topic. By default producer will use {@link
- * FlinkKafkaProducer2.Semantic#AT_LEAST_ONCE} semantic. Before using {@link
- * FlinkKafkaProducer2.Semantic#EXACTLY_ONCE} please refer to Flink's Kafka connector documentation.
+ * GrootFlinkKafkaProducer.Semantic#AT_LEAST_ONCE} semantic. Before using {@link
+ * GrootFlinkKafkaProducer.Semantic#EXACTLY_ONCE} please refer to Flink's Kafka connector documentation.
*/
@PublicEvolving
-public class FlinkKafkaProducer2<IN>
+public class GrootFlinkKafkaProducer<IN>
extends TwoPhaseCommitSinkFunction<
IN,
- FlinkKafkaProducer2.KafkaTransactionState,
- FlinkKafkaProducer2.KafkaTransactionContext> {
+ GrootFlinkKafkaProducer.KafkaTransactionState,
+ GrootFlinkKafkaProducer.KafkaTransactionContext> {
/**
* Semantics that can be chosen.
@@ -99,13 +99,13 @@ public class FlinkKafkaProducer2<IN>
* Semantic.EXACTLY_ONCE the Flink producer will write all messages in a Kafka transaction
* that will be committed to Kafka on a checkpoint.
*
- * <p>In this mode {@link FlinkKafkaProducer2} sets up a pool of {@link
+ * <p>In this mode {@link GrootFlinkKafkaProducer} sets up a pool of {@link
* FlinkKafkaInternalProducer}. Between each checkpoint a Kafka transaction is created,
- * which is committed on {@link FlinkKafkaProducer2#notifyCheckpointComplete(long)}. If
- * checkpoint complete notifications are running late, {@link FlinkKafkaProducer2} can run
+ * which is committed on {@link GrootFlinkKafkaProducer#notifyCheckpointComplete(long)}. If
+ * checkpoint complete notifications are running late, {@link GrootFlinkKafkaProducer} can run
* out of {@link FlinkKafkaInternalProducer}s in the pool. In that case any subsequent
- * {@link FlinkKafkaProducer2#snapshotState(FunctionSnapshotContext)} requests will fail and
- * {@link FlinkKafkaProducer2} will keep using the {@link FlinkKafkaInternalProducer} from
+ * {@link GrootFlinkKafkaProducer#snapshotState(FunctionSnapshotContext)} requests will fail and
+ * {@link GrootFlinkKafkaProducer} will keep using the {@link FlinkKafkaInternalProducer} from
* the previous checkpoint. To decrease the chance of failing checkpoints there are four
* options:
* <li>decrease number of max concurrent checkpoints
@@ -128,7 +128,7 @@ public class FlinkKafkaProducer2<IN>
NONE
}
- private static final Logger LOG = LoggerFactory.getLogger(FlinkKafkaProducer2.class);
+ private static final Logger LOG = LoggerFactory.getLogger(GrootFlinkKafkaProducer.class);
private static final long serialVersionUID = 1L;
@@ -142,8 +142,8 @@ public class FlinkKafkaProducer2<IN>
* This coefficient determines what is the safe scale down factor.
*
* <p>If the Flink application previously failed before first checkpoint completed or we are
- * starting new batch of {@link FlinkKafkaProducer2} from scratch without clean shutdown of the
- * previous one, {@link FlinkKafkaProducer2} doesn't know what was the set of previously used
+ * starting new batch of {@link GrootFlinkKafkaProducer} from scratch without clean shutdown of the
+ * previous one, {@link GrootFlinkKafkaProducer} doesn't know what was the set of previously used
* Kafka's transactionalId's. In that case, it will try to play safe and abort all of the
* possible transactionalIds from the range of: {@code [0, getNumberOfParallelSubtasks() *
* kafkaProducersPoolSize * SAFE_SCALE_DOWN_FACTOR) }
@@ -158,7 +158,7 @@ public class FlinkKafkaProducer2<IN>
/**
* Default number of KafkaProducers in the pool. See {@link
- * FlinkKafkaProducer2.Semantic#EXACTLY_ONCE}.
+ * GrootFlinkKafkaProducer.Semantic#EXACTLY_ONCE}.
*/
public static final int DEFAULT_KAFKA_PRODUCERS_POOL_SIZE = 5;
@@ -174,27 +174,27 @@ public class FlinkKafkaProducer2<IN>
* NEXT_TRANSACTIONAL_ID_HINT_DESCRIPTOR_V2.
*/
@Deprecated
- private static final ListStateDescriptor<FlinkKafkaProducer2.NextTransactionalIdHint>
+ private static final ListStateDescriptor<GrootFlinkKafkaProducer.NextTransactionalIdHint>
NEXT_TRANSACTIONAL_ID_HINT_DESCRIPTOR =
new ListStateDescriptor<>(
"next-transactional-id-hint",
TypeInformation.of(NextTransactionalIdHint.class));
- private static final ListStateDescriptor<FlinkKafkaProducer2.NextTransactionalIdHint>
+ private static final ListStateDescriptor<GrootFlinkKafkaProducer.NextTransactionalIdHint>
NEXT_TRANSACTIONAL_ID_HINT_DESCRIPTOR_V2 =
new ListStateDescriptor<>(
"next-transactional-id-hint-v2",
new NextTransactionalIdHintSerializer());
/** State for nextTransactionalIdHint. */
- private transient ListState<FlinkKafkaProducer2.NextTransactionalIdHint>
+ private transient ListState<GrootFlinkKafkaProducer.NextTransactionalIdHint>
nextTransactionalIdHintState;
/** Generator for Transactional IDs. */
private transient TransactionalIdsGenerator transactionalIdsGenerator;
/** Hint for picking next transactional id. */
- private transient FlinkKafkaProducer2.NextTransactionalIdHint nextTransactionalIdHint;
+ private transient GrootFlinkKafkaProducer.NextTransactionalIdHint nextTransactionalIdHint;
/** User defined properties for the Producer. */
protected final Properties producerConfig;
@@ -236,7 +236,7 @@ public class FlinkKafkaProducer2<IN>
private boolean logFailuresOnly;
/** Semantic chosen for this instance. */
- protected FlinkKafkaProducer2.Semantic semantic;
+ protected GrootFlinkKafkaProducer.Semantic semantic;
// -------------------------------- Runtime fields ------------------------------------------
@@ -263,7 +263,7 @@ public class FlinkKafkaProducer2<IN>
* @param topicId ID of the Kafka topic.
* @param serializationSchema User defined (keyless) serialization schema.
*/
- public FlinkKafkaProducer2(
+ public GrootFlinkKafkaProducer(
String brokerList, String topicId, SerializationSchema<IN> serializationSchema) {
this(topicId, serializationSchema, getPropertiesFromBrokerList(brokerList));
}
@@ -275,14 +275,14 @@ public class FlinkKafkaProducer2<IN>
* partitioner. This default partitioner maps each sink subtask to a single Kafka partition
* (i.e. all records received by a sink subtask will end up in the same Kafka partition).
*
- * <p>To use a custom partitioner, please use {@link #FlinkKafkaProducer2(String,
+ * <p>To use a custom partitioner, please use {@link #GrootFlinkKafkaProducer(String,
* SerializationSchema, Properties, Optional)} instead.
*
* @param topicId ID of the Kafka topic.
* @param serializationSchema User defined key-less serialization schema.
* @param producerConfig Properties with the producer configuration.
*/
- public FlinkKafkaProducer2(
+ public GrootFlinkKafkaProducer(
String topicId,
SerializationSchema<IN> serializationSchema,
Properties producerConfig) {
@@ -311,7 +311,7 @@ public class FlinkKafkaProducer2<IN>
* partitions. If a partitioner is not provided, records will be distributed to Kafka
* partitions in a round-robin fashion.
*/
- public FlinkKafkaProducer2(
+ public GrootFlinkKafkaProducer(
String topicId,
SerializationSchema<IN> serializationSchema,
Properties producerConfig,
@@ -343,16 +343,16 @@ public class FlinkKafkaProducer2<IN>
* partitions. If a partitioner is not provided, records will be distributed to Kafka
* partitions in a round-robin fashion.
* @param semantic Defines semantic that will be used by this producer (see {@link
- * FlinkKafkaProducer2.Semantic}).
+ * GrootFlinkKafkaProducer.Semantic}).
* @param kafkaProducersPoolSize Overwrite default KafkaProducers pool size (see {@link
- * FlinkKafkaProducer2.Semantic#EXACTLY_ONCE}).
+ * GrootFlinkKafkaProducer.Semantic#EXACTLY_ONCE}).
*/
- public FlinkKafkaProducer2(
+ public GrootFlinkKafkaProducer(
String topicId,
SerializationSchema<IN> serializationSchema,
Properties producerConfig,
@Nullable FlinkKafkaPartitioner<IN> customPartitioner,
- FlinkKafkaProducer2.Semantic semantic,
+ GrootFlinkKafkaProducer.Semantic semantic,
int kafkaProducersPoolSize) {
this(
topicId,
@@ -374,17 +374,17 @@ public class FlinkKafkaProducer2<IN>
* partitioner. This default partitioner maps each sink subtask to a single Kafka partition
* (i.e. all records received by a sink subtask will end up in the same Kafka partition).
*
- * <p>To use a custom partitioner, please use {@link #FlinkKafkaProducer2(String,
+ * <p>To use a custom partitioner, please use {@link #GrootFlinkKafkaProducer(String,
* KeyedSerializationSchema, Properties, Optional)} instead.
*
* @param brokerList Comma separated addresses of the brokers
* @param topicId ID of the Kafka topic.
* @param serializationSchema User defined serialization schema supporting key/value messages
- * @deprecated use {@link #FlinkKafkaProducer2(String, KafkaSerializationSchema, Properties,
- * FlinkKafkaProducer2.Semantic)}
+ * @deprecated use {@link #GrootFlinkKafkaProducer(String, KafkaSerializationSchema, Properties,
+ * GrootFlinkKafkaProducer.Semantic)}
*/
@Deprecated
- public FlinkKafkaProducer2(
+ public GrootFlinkKafkaProducer(
String brokerList, String topicId, KeyedSerializationSchema<IN> serializationSchema) {
this(
topicId,
@@ -400,17 +400,17 @@ public class FlinkKafkaProducer2<IN>
* partitioner. This default partitioner maps each sink subtask to a single Kafka partition
* (i.e. all records received by a sink subtask will end up in the same Kafka partition).
*
- * <p>To use a custom partitioner, please use {@link #FlinkKafkaProducer2(String,
+ * <p>To use a custom partitioner, please use {@link #GrootFlinkKafkaProducer(String,
* KeyedSerializationSchema, Properties, Optional)} instead.
*
* @param topicId ID of the Kafka topic.
* @param serializationSchema User defined serialization schema supporting key/value messages
* @param producerConfig Properties with the producer configuration.
- * @deprecated use {@link #FlinkKafkaProducer2(String, KafkaSerializationSchema, Properties,
- * FlinkKafkaProducer2.Semantic)}
+ * @deprecated use {@link #GrootFlinkKafkaProducer(String, KafkaSerializationSchema, Properties,
+ * GrootFlinkKafkaProducer.Semantic)}
*/
@Deprecated
- public FlinkKafkaProducer2(
+ public GrootFlinkKafkaProducer(
String topicId,
KeyedSerializationSchema<IN> serializationSchema,
Properties producerConfig) {
@@ -432,16 +432,16 @@ public class FlinkKafkaProducer2<IN>
* @param serializationSchema User defined serialization schema supporting key/value messages
* @param producerConfig Properties with the producer configuration.
* @param semantic Defines semantic that will be used by this producer (see {@link
- * FlinkKafkaProducer2.Semantic}).
- * @deprecated use {@link #FlinkKafkaProducer2(String, KafkaSerializationSchema, Properties,
- * FlinkKafkaProducer2.Semantic)}
+ * GrootFlinkKafkaProducer.Semantic}).
+ * @deprecated use {@link #GrootFlinkKafkaProducer(String, KafkaSerializationSchema, Properties,
+ * GrootFlinkKafkaProducer.Semantic)}
*/
@Deprecated
- public FlinkKafkaProducer2(
+ public GrootFlinkKafkaProducer(
String topicId,
KeyedSerializationSchema<IN> serializationSchema,
Properties producerConfig,
- FlinkKafkaProducer2.Semantic semantic) {
+ GrootFlinkKafkaProducer.Semantic semantic) {
this(
topicId,
serializationSchema,
@@ -472,11 +472,11 @@ public class FlinkKafkaProducer2<IN>
* each record (determined by {@link KeyedSerializationSchema#serializeKey(Object)}). If the
* keys are {@code null}, then records will be distributed to Kafka partitions in a
* round-robin fashion.
- * @deprecated use {@link #FlinkKafkaProducer2(String, KafkaSerializationSchema, Properties,
- * FlinkKafkaProducer2.Semantic)}
+ * @deprecated use {@link #GrootFlinkKafkaProducer(String, KafkaSerializationSchema, Properties,
+ * GrootFlinkKafkaProducer.Semantic)}
*/
@Deprecated
- public FlinkKafkaProducer2(
+ public GrootFlinkKafkaProducer(
String defaultTopicId,
KeyedSerializationSchema<IN> serializationSchema,
Properties producerConfig,
@@ -486,7 +486,7 @@ public class FlinkKafkaProducer2<IN>
serializationSchema,
producerConfig,
customPartitioner,
- FlinkKafkaProducer2.Semantic.AT_LEAST_ONCE,
+ GrootFlinkKafkaProducer.Semantic.AT_LEAST_ONCE,
DEFAULT_KAFKA_PRODUCERS_POOL_SIZE);
}
@@ -512,19 +512,19 @@ public class FlinkKafkaProducer2<IN>
* keys are {@code null}, then records will be distributed to Kafka partitions in a
* round-robin fashion.
* @param semantic Defines semantic that will be used by this producer (see {@link
- * FlinkKafkaProducer2.Semantic}).
+ * GrootFlinkKafkaProducer.Semantic}).
* @param kafkaProducersPoolSize Overwrite default KafkaProducers pool size (see {@link
- * FlinkKafkaProducer2.Semantic#EXACTLY_ONCE}).
- * @deprecated use {@link #FlinkKafkaProducer2(String, KafkaSerializationSchema, Properties,
- * FlinkKafkaProducer2.Semantic)}
+ * GrootFlinkKafkaProducer.Semantic#EXACTLY_ONCE}).
+ * @deprecated use {@link #GrootFlinkKafkaProducer(String, KafkaSerializationSchema, Properties,
+ * GrootFlinkKafkaProducer.Semantic)}
*/
@Deprecated
- public FlinkKafkaProducer2(
+ public GrootFlinkKafkaProducer(
String defaultTopicId,
KeyedSerializationSchema<IN> serializationSchema,
Properties producerConfig,
Optional<FlinkKafkaPartitioner<IN>> customPartitioner,
- FlinkKafkaProducer2.Semantic semantic,
+ GrootFlinkKafkaProducer.Semantic semantic,
int kafkaProducersPoolSize) {
this(
defaultTopicId,
@@ -537,7 +537,7 @@ public class FlinkKafkaProducer2<IN>
}
/**
- * Creates a {@link FlinkKafkaProducer2} for a given topic. The sink produces its input to the
+ * Creates a {@link GrootFlinkKafkaProducer} for a given topic. The sink produces its input to the
* topic. It accepts a {@link KafkaSerializationSchema} for serializing records to a {@link
* ProducerRecord}, including partitioning information.
*
@@ -547,13 +547,13 @@ public class FlinkKafkaProducer2<IN>
* @param producerConfig Configuration properties for the KafkaProducer. 'bootstrap.servers.' is
* the only required argument.
* @param semantic Defines semantic that will be used by this producer (see {@link
- * FlinkKafkaProducer2.Semantic}).
+ * GrootFlinkKafkaProducer.Semantic}).
*/
- public FlinkKafkaProducer2(
+ public GrootFlinkKafkaProducer(
String defaultTopic,
KafkaSerializationSchema<IN> serializationSchema,
Properties producerConfig,
- FlinkKafkaProducer2.Semantic semantic) {
+ GrootFlinkKafkaProducer.Semantic semantic) {
this(
defaultTopic,
serializationSchema,
@@ -573,15 +573,15 @@ public class FlinkKafkaProducer2<IN>
* @param producerConfig Configuration properties for the KafkaProducer. 'bootstrap.servers.' is
* the only required argument.
* @param semantic Defines semantic that will be used by this producer (see {@link
- * FlinkKafkaProducer2.Semantic}).
+ * GrootFlinkKafkaProducer.Semantic}).
* @param kafkaProducersPoolSize Overwrite default KafkaProducers pool size (see {@link
- * FlinkKafkaProducer2.Semantic#EXACTLY_ONCE}).
+ * GrootFlinkKafkaProducer.Semantic#EXACTLY_ONCE}).
*/
- public FlinkKafkaProducer2(
+ public GrootFlinkKafkaProducer(
String defaultTopic,
KafkaSerializationSchema<IN> serializationSchema,
Properties producerConfig,
- FlinkKafkaProducer2.Semantic semantic,
+ GrootFlinkKafkaProducer.Semantic semantic,
int kafkaProducersPoolSize) {
this(
defaultTopic,
@@ -617,21 +617,21 @@ public class FlinkKafkaProducer2<IN>
* @param producerConfig Configuration properties for the KafkaProducer. 'bootstrap.servers.' is
* the only required argument.
* @param semantic Defines semantic that will be used by this producer (see {@link
- * FlinkKafkaProducer2.Semantic}).
+ * GrootFlinkKafkaProducer.Semantic}).
* @param kafkaProducersPoolSize Overwrite default KafkaProducers pool size (see {@link
- * FlinkKafkaProducer2.Semantic#EXACTLY_ONCE}).
+ * GrootFlinkKafkaProducer.Semantic#EXACTLY_ONCE}).
*/
- private FlinkKafkaProducer2(
+ private GrootFlinkKafkaProducer(
String defaultTopic,
KeyedSerializationSchema<IN> keyedSchema,
FlinkKafkaPartitioner<IN> customPartitioner,
KafkaSerializationSchema<IN> kafkaSchema,
Properties producerConfig,
- FlinkKafkaProducer2.Semantic semantic,
+ GrootFlinkKafkaProducer.Semantic semantic,
int kafkaProducersPoolSize) {
super(
- new FlinkKafkaProducer2.TransactionStateSerializer(),
- new FlinkKafkaProducer2.ContextStateSerializer());
+ new GrootFlinkKafkaProducer.TransactionStateSerializer(),
+ new GrootFlinkKafkaProducer.ContextStateSerializer());
this.defaultTopicId = checkNotNull(defaultTopic, "defaultTopic is null");
@@ -711,7 +711,7 @@ public class FlinkKafkaProducer2<IN>
// Enable transactionTimeoutWarnings to avoid silent data loss
// See KAFKA-6119 (affects versions 0.11.0.0 and 0.11.0.1):
// The KafkaProducer may not throw an exception if the transaction failed to commit
- if (semantic == FlinkKafkaProducer2.Semantic.EXACTLY_ONCE) {
+ if (semantic == GrootFlinkKafkaProducer.Semantic.EXACTLY_ONCE) {
final Object object =
this.producerConfig.get(ProducerConfig.TRANSACTION_TIMEOUT_CONFIG);
final long transactionTimeout;
@@ -771,7 +771,7 @@ public class FlinkKafkaProducer2<IN>
* attempt at least one commit of the transaction before giving up.
*/
@Override
- public FlinkKafkaProducer2<IN> ignoreFailuresAfterTransactionTimeout() {
+ public GrootFlinkKafkaProducer<IN> ignoreFailuresAfterTransactionTimeout() {
super.ignoreFailuresAfterTransactionTimeout();
return this;
}
@@ -850,7 +850,7 @@ public class FlinkKafkaProducer2<IN>
@Override
public void invoke(
- FlinkKafkaProducer2.KafkaTransactionState transaction, IN next, Context context)
+ GrootFlinkKafkaProducer.KafkaTransactionState transaction, IN next, Context context)
throws FlinkKafkaException {
checkErroneous();
@@ -976,24 +976,24 @@ public class FlinkKafkaProducer2<IN>
// ------------------- Logic for handling checkpoint flushing -------------------------- //
@Override
- protected FlinkKafkaProducer2.KafkaTransactionState beginTransaction()
+ protected GrootFlinkKafkaProducer.KafkaTransactionState beginTransaction()
throws FlinkKafkaException {
switch (semantic) {
case EXACTLY_ONCE:
FlinkKafkaInternalProducer<byte[], byte[]> producer = createTransactionalProducer();
producer.beginTransaction();
- return new FlinkKafkaProducer2.KafkaTransactionState(
+ return new GrootFlinkKafkaProducer.KafkaTransactionState(
producer.getTransactionalId(), producer);
case AT_LEAST_ONCE:
case NONE:
// Do not create new producer on each beginTransaction() if it is not necessary
- final FlinkKafkaProducer2.KafkaTransactionState currentTransaction =
+ final GrootFlinkKafkaProducer.KafkaTransactionState currentTransaction =
currentTransaction();
if (currentTransaction != null && currentTransaction.producer != null) {
- return new FlinkKafkaProducer2.KafkaTransactionState(
+ return new GrootFlinkKafkaProducer.KafkaTransactionState(
currentTransaction.producer);
}
- return new FlinkKafkaProducer2.KafkaTransactionState(
+ return new GrootFlinkKafkaProducer.KafkaTransactionState(
initNonTransactionalProducer(true));
default:
throw new UnsupportedOperationException("Not implemented semantic");
@@ -1001,7 +1001,7 @@ public class FlinkKafkaProducer2<IN>
}
@Override
- protected void preCommit(FlinkKafkaProducer2.KafkaTransactionState transaction)
+ protected void preCommit(GrootFlinkKafkaProducer.KafkaTransactionState transaction)
throws FlinkKafkaException {
switch (semantic) {
case EXACTLY_ONCE:
@@ -1017,7 +1017,7 @@ public class FlinkKafkaProducer2<IN>
}
@Override
- protected void commit(FlinkKafkaProducer2.KafkaTransactionState transaction) {
+ protected void commit(GrootFlinkKafkaProducer.KafkaTransactionState transaction) {
if (transaction.isTransactional()) {
try {
transaction.producer.commitTransaction();
@@ -1028,7 +1028,7 @@ public class FlinkKafkaProducer2<IN>
}
@Override
- protected void recoverAndCommit(FlinkKafkaProducer2.KafkaTransactionState transaction) {
+ protected void recoverAndCommit(GrootFlinkKafkaProducer.KafkaTransactionState transaction) {
if (transaction.isTransactional()) {
FlinkKafkaInternalProducer<byte[], byte[]> producer = null;
try {
@@ -1051,7 +1051,7 @@ public class FlinkKafkaProducer2<IN>
}
@Override
- protected void abort(FlinkKafkaProducer2.KafkaTransactionState transaction) {
+ protected void abort(GrootFlinkKafkaProducer.KafkaTransactionState transaction) {
if (transaction.isTransactional()) {
transaction.producer.abortTransaction();
recycleTransactionalProducer(transaction.producer);
@@ -1059,7 +1059,7 @@ public class FlinkKafkaProducer2<IN>
}
@Override
- protected void recoverAndAbort(FlinkKafkaProducer2.KafkaTransactionState transaction) {
+ protected void recoverAndAbort(GrootFlinkKafkaProducer.KafkaTransactionState transaction) {
if (transaction.isTransactional()) {
FlinkKafkaInternalProducer<byte[], byte[]> producer = null;
try {
@@ -1087,7 +1087,7 @@ public class FlinkKafkaProducer2<IN>
*
* @param transaction
*/
- private void flush(FlinkKafkaProducer2.KafkaTransactionState transaction)
+ private void flush(GrootFlinkKafkaProducer.KafkaTransactionState transaction)
throws FlinkKafkaException {
if (transaction.producer != null) {
transaction.producer.flush();
@@ -1111,7 +1111,7 @@ public class FlinkKafkaProducer2<IN>
// Otherwise all of the
// subtasks would write exactly same information.
if (getRuntimeContext().getIndexOfThisSubtask() == 0
- && semantic == FlinkKafkaProducer2.Semantic.EXACTLY_ONCE) {
+ && semantic == GrootFlinkKafkaProducer.Semantic.EXACTLY_ONCE) {
checkState(
nextTransactionalIdHint != null,
"nextTransactionalIdHint must be set for EXACTLY_ONCE");
@@ -1129,7 +1129,7 @@ public class FlinkKafkaProducer2<IN>
}
nextTransactionalIdHintState.add(
- new FlinkKafkaProducer2.NextTransactionalIdHint(
+ new GrootFlinkKafkaProducer.NextTransactionalIdHint(
getRuntimeContext().getNumberOfParallelSubtasks(),
nextFreeTransactionalId));
}
@@ -1137,13 +1137,13 @@ public class FlinkKafkaProducer2<IN>
@Override
public void initializeState(FunctionInitializationContext context) throws Exception {
- if (semantic != FlinkKafkaProducer2.Semantic.NONE
+ if (semantic != GrootFlinkKafkaProducer.Semantic.NONE
&& !((StreamingRuntimeContext) this.getRuntimeContext()).isCheckpointingEnabled()) {
LOG.warn(
"Using {} semantic, but checkpointing is not enabled. Switching to {} semantic.",
semantic,
- FlinkKafkaProducer2.Semantic.NONE);
- semantic = FlinkKafkaProducer2.Semantic.NONE;
+ GrootFlinkKafkaProducer.Semantic.NONE);
+ semantic = GrootFlinkKafkaProducer.Semantic.NONE;
}
nextTransactionalIdHintState =
@@ -1177,16 +1177,16 @@ public class FlinkKafkaProducer2<IN>
kafkaProducersPoolSize,
SAFE_SCALE_DOWN_FACTOR);
- if (semantic != FlinkKafkaProducer2.Semantic.EXACTLY_ONCE) {
+ if (semantic != GrootFlinkKafkaProducer.Semantic.EXACTLY_ONCE) {
nextTransactionalIdHint = null;
} else {
- ArrayList<FlinkKafkaProducer2.NextTransactionalIdHint> transactionalIdHints =
+ ArrayList<GrootFlinkKafkaProducer.NextTransactionalIdHint> transactionalIdHints =
Lists.newArrayList(nextTransactionalIdHintState.get());
if (transactionalIdHints.size() > 1) {
throw new IllegalStateException(
"There should be at most one next transactional id hint written by the first subtask");
} else if (transactionalIdHints.size() == 0) {
- nextTransactionalIdHint = new FlinkKafkaProducer2.NextTransactionalIdHint(0, 0);
+ nextTransactionalIdHint = new GrootFlinkKafkaProducer.NextTransactionalIdHint(0, 0);
// this means that this is either:
// (1) the first execution of this application
@@ -1203,14 +1203,14 @@ public class FlinkKafkaProducer2<IN>
}
@Override
- protected Optional<FlinkKafkaProducer2.KafkaTransactionContext> initializeUserContext() {
- if (semantic != FlinkKafkaProducer2.Semantic.EXACTLY_ONCE) {
+ protected Optional<GrootFlinkKafkaProducer.KafkaTransactionContext> initializeUserContext() {
+ if (semantic != GrootFlinkKafkaProducer.Semantic.EXACTLY_ONCE) {
return Optional.empty();
}
Set<String> transactionalIds = generateNewTransactionalIds();
resetAvailableTransactionalIdsPool(transactionalIds);
- return Optional.of(new FlinkKafkaProducer2.KafkaTransactionContext(transactionalIds));
+ return Optional.of(new GrootFlinkKafkaProducer.KafkaTransactionContext(transactionalIds));
}
private Set<String> generateNewTransactionalIds() {
@@ -1227,7 +1227,7 @@ public class FlinkKafkaProducer2<IN>
@Override
protected void finishRecoveringContext(
- Collection<FlinkKafkaProducer2.KafkaTransactionState> handledTransactions) {
+ Collection<GrootFlinkKafkaProducer.KafkaTransactionState> handledTransactions) {
cleanUpUserContext(handledTransactions);
resetAvailableTransactionalIdsPool(getUserContext().get().transactionalIds);
LOG.info("Recovered transactionalIds {}", getUserContext().get().transactionalIds);
@@ -1245,7 +1245,7 @@ public class FlinkKafkaProducer2<IN>
* need further handling
*/
private void cleanUpUserContext(
- Collection<FlinkKafkaProducer2.KafkaTransactionState> handledTransactions) {
+ Collection<GrootFlinkKafkaProducer.KafkaTransactionState> handledTransactions) {
if (!getUserContext().isPresent()) {
return;
}
@@ -1300,7 +1300,7 @@ public class FlinkKafkaProducer2<IN>
}
int getTransactionCoordinatorId() {
- final FlinkKafkaProducer2.KafkaTransactionState currentTransaction = currentTransaction();
+ final GrootFlinkKafkaProducer.KafkaTransactionState currentTransaction = currentTransaction();
if (currentTransaction == null || currentTransaction.producer == null) {
throw new IllegalArgumentException();
}
@@ -1520,8 +1520,8 @@ public class FlinkKafkaProducer2<IN>
return false;
}
- FlinkKafkaProducer2.KafkaTransactionState that =
- (FlinkKafkaProducer2.KafkaTransactionState) o;
+ GrootFlinkKafkaProducer.KafkaTransactionState that =
+ (GrootFlinkKafkaProducer.KafkaTransactionState) o;
if (producerId != that.producerId) {
return false;
@@ -1544,7 +1544,7 @@ public class FlinkKafkaProducer2<IN>
}
/**
- * Context associated to this instance of the {@link FlinkKafkaProducer2}. User for keeping track
+ * Context associated with this instance of the {@link GrootFlinkKafkaProducer}. Used for keeping track
* of the transactionalIds.
*/
@VisibleForTesting
@@ -1567,8 +1567,8 @@ public class FlinkKafkaProducer2<IN>
return false;
}
- FlinkKafkaProducer2.KafkaTransactionContext that =
- (FlinkKafkaProducer2.KafkaTransactionContext) o;
+ GrootFlinkKafkaProducer.KafkaTransactionContext that =
+ (GrootFlinkKafkaProducer.KafkaTransactionContext) o;
return transactionalIds.equals(that.transactionalIds);
}
@@ -1581,12 +1581,12 @@ public class FlinkKafkaProducer2<IN>
/**
* {@link org.apache.flink.api.common.typeutils.TypeSerializer} for {@link
- * FlinkKafkaProducer2.KafkaTransactionState}.
+ * GrootFlinkKafkaProducer.KafkaTransactionState}.
*/
@VisibleForTesting
@Internal
public static class TransactionStateSerializer
- extends TypeSerializerSingleton<FlinkKafkaProducer2.KafkaTransactionState> {
+ extends TypeSerializerSingleton<GrootFlinkKafkaProducer.KafkaTransactionState> {
private static final long serialVersionUID = 1L;
@@ -1596,20 +1596,20 @@ public class FlinkKafkaProducer2<IN>
}
@Override
- public FlinkKafkaProducer2.KafkaTransactionState createInstance() {
+ public GrootFlinkKafkaProducer.KafkaTransactionState createInstance() {
return null;
}
@Override
- public FlinkKafkaProducer2.KafkaTransactionState copy(
- FlinkKafkaProducer2.KafkaTransactionState from) {
+ public GrootFlinkKafkaProducer.KafkaTransactionState copy(
+ GrootFlinkKafkaProducer.KafkaTransactionState from) {
return from;
}
@Override
- public FlinkKafkaProducer2.KafkaTransactionState copy(
- FlinkKafkaProducer2.KafkaTransactionState from,
- FlinkKafkaProducer2.KafkaTransactionState reuse) {
+ public GrootFlinkKafkaProducer.KafkaTransactionState copy(
+ GrootFlinkKafkaProducer.KafkaTransactionState from,
+ GrootFlinkKafkaProducer.KafkaTransactionState reuse) {
return from;
}
@@ -1620,7 +1620,7 @@ public class FlinkKafkaProducer2<IN>
@Override
public void serialize(
- FlinkKafkaProducer2.KafkaTransactionState record, DataOutputView target)
+ GrootFlinkKafkaProducer.KafkaTransactionState record, DataOutputView target)
throws IOException {
if (record.transactionalId == null) {
target.writeBoolean(false);
@@ -1633,7 +1633,7 @@ public class FlinkKafkaProducer2<IN>
}
@Override
- public FlinkKafkaProducer2.KafkaTransactionState deserialize(DataInputView source)
+ public GrootFlinkKafkaProducer.KafkaTransactionState deserialize(DataInputView source)
throws IOException {
String transactionalId = null;
if (source.readBoolean()) {
@@ -1641,13 +1641,13 @@ public class FlinkKafkaProducer2<IN>
}
long producerId = source.readLong();
short epoch = source.readShort();
- return new FlinkKafkaProducer2.KafkaTransactionState(
+ return new GrootFlinkKafkaProducer.KafkaTransactionState(
transactionalId, producerId, epoch, null);
}
@Override
- public FlinkKafkaProducer2.KafkaTransactionState deserialize(
- FlinkKafkaProducer2.KafkaTransactionState reuse, DataInputView source)
+ public GrootFlinkKafkaProducer.KafkaTransactionState deserialize(
+ GrootFlinkKafkaProducer.KafkaTransactionState reuse, DataInputView source)
throws IOException {
return deserialize(source);
}
@@ -1666,7 +1666,7 @@ public class FlinkKafkaProducer2<IN>
// -----------------------------------------------------------------------------------
@Override
- public TypeSerializerSnapshot<FlinkKafkaProducer2.KafkaTransactionState>
+ public TypeSerializerSnapshot<GrootFlinkKafkaProducer.KafkaTransactionState>
snapshotConfiguration() {
return new TransactionStateSerializerSnapshot();
}
@@ -1674,7 +1674,7 @@ public class FlinkKafkaProducer2<IN>
/** Serializer configuration snapshot for compatibility and format evolution. */
@SuppressWarnings("WeakerAccess")
public static final class TransactionStateSerializerSnapshot
- extends SimpleTypeSerializerSnapshot<FlinkKafkaProducer2.KafkaTransactionState> {
+ extends SimpleTypeSerializerSnapshot<GrootFlinkKafkaProducer.KafkaTransactionState> {
public TransactionStateSerializerSnapshot() {
super(TransactionStateSerializer::new);
@@ -1684,12 +1684,12 @@ public class FlinkKafkaProducer2<IN>
/**
* {@link org.apache.flink.api.common.typeutils.TypeSerializer} for {@link
- * FlinkKafkaProducer2.KafkaTransactionContext}.
+ * GrootFlinkKafkaProducer.KafkaTransactionContext}.
*/
@VisibleForTesting
@Internal
public static class ContextStateSerializer
- extends TypeSerializerSingleton<FlinkKafkaProducer2.KafkaTransactionContext> {
+ extends TypeSerializerSingleton<GrootFlinkKafkaProducer.KafkaTransactionContext> {
private static final long serialVersionUID = 1L;
@@ -1699,20 +1699,20 @@ public class FlinkKafkaProducer2<IN>
}
@Override
- public FlinkKafkaProducer2.KafkaTransactionContext createInstance() {
+ public GrootFlinkKafkaProducer.KafkaTransactionContext createInstance() {
return null;
}
@Override
- public FlinkKafkaProducer2.KafkaTransactionContext copy(
- FlinkKafkaProducer2.KafkaTransactionContext from) {
+ public GrootFlinkKafkaProducer.KafkaTransactionContext copy(
+ GrootFlinkKafkaProducer.KafkaTransactionContext from) {
return from;
}
@Override
- public FlinkKafkaProducer2.KafkaTransactionContext copy(
- FlinkKafkaProducer2.KafkaTransactionContext from,
- FlinkKafkaProducer2.KafkaTransactionContext reuse) {
+ public GrootFlinkKafkaProducer.KafkaTransactionContext copy(
+ GrootFlinkKafkaProducer.KafkaTransactionContext from,
+ GrootFlinkKafkaProducer.KafkaTransactionContext reuse) {
return from;
}
@@ -1723,7 +1723,7 @@ public class FlinkKafkaProducer2<IN>
@Override
public void serialize(
- FlinkKafkaProducer2.KafkaTransactionContext record, DataOutputView target)
+ GrootFlinkKafkaProducer.KafkaTransactionContext record, DataOutputView target)
throws IOException {
int numIds = record.transactionalIds.size();
target.writeInt(numIds);
@@ -1733,19 +1733,19 @@ public class FlinkKafkaProducer2<IN>
}
@Override
- public FlinkKafkaProducer2.KafkaTransactionContext deserialize(DataInputView source)
+ public GrootFlinkKafkaProducer.KafkaTransactionContext deserialize(DataInputView source)
throws IOException {
int numIds = source.readInt();
Set<String> ids = new HashSet<>(numIds);
for (int i = 0; i < numIds; i++) {
ids.add(source.readUTF());
}
- return new FlinkKafkaProducer2.KafkaTransactionContext(ids);
+ return new GrootFlinkKafkaProducer.KafkaTransactionContext(ids);
}
@Override
- public FlinkKafkaProducer2.KafkaTransactionContext deserialize(
- FlinkKafkaProducer2.KafkaTransactionContext reuse, DataInputView source)
+ public GrootFlinkKafkaProducer.KafkaTransactionContext deserialize(
+ GrootFlinkKafkaProducer.KafkaTransactionContext reuse, DataInputView source)
throws IOException {
return deserialize(source);
}
@@ -1830,7 +1830,7 @@ public class FlinkKafkaProducer2<IN>
/**
* {@link org.apache.flink.api.common.typeutils.TypeSerializer} for {@link
- * FlinkKafkaProducer2.NextTransactionalIdHint}.
+ * GrootFlinkKafkaProducer.NextTransactionalIdHint}.
*/
@VisibleForTesting
@Internal
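
Usage sketch (not part of this patch): the rename is mechanical, so the renamed producer is constructed exactly like the old FlinkKafkaProducer2. The topic name, bootstrap servers, String payload type and timeout value below are illustrative assumptions, not values from this merge request.

    import java.nio.charset.StandardCharsets;
    import java.util.Properties;
    import org.apache.flink.streaming.connectors.kafka.GrootFlinkKafkaProducer;
    import org.apache.flink.streaming.connectors.kafka.KafkaSerializationSchema;
    import org.apache.kafka.clients.producer.ProducerConfig;
    import org.apache.kafka.clients.producer.ProducerRecord;

    public class ProducerWiringSketch {
        public static GrootFlinkKafkaProducer<String> build() {
            Properties producerConfig = new Properties();
            producerConfig.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // placeholder
            // Keep the transaction timeout below the broker's transaction.max.timeout.ms;
            // the constructor inspects this property when EXACTLY_ONCE is requested.
            producerConfig.setProperty(ProducerConfig.TRANSACTION_TIMEOUT_CONFIG, String.valueOf(15 * 60 * 1000));

            // Serialize each String element into a ProducerRecord for an illustrative topic.
            KafkaSerializationSchema<String> serializationSchema = (element, timestamp) ->
                    new ProducerRecord<>("example-topic", element.getBytes(StandardCharsets.UTF_8));

            return new GrootFlinkKafkaProducer<>(
                    "example-topic",
                    serializationSchema,
                    producerConfig,
                    GrootFlinkKafkaProducer.Semantic.EXACTLY_ONCE);
        }
    }
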
diff --git a/groot-connectors/connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/KafkaFetcher2.java b/groot-connectors/connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/GrootKafkaFetcher.java
index 4000079..2cfc473 100644
--- a/groot-connectors/connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/KafkaFetcher2.java
+++ b/groot-connectors/connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/GrootKafkaFetcher.java
@@ -6,16 +6,14 @@ import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.streaming.api.operators.StreamingRuntimeContext;
import org.apache.flink.streaming.connectors.kafka.KafkaDeserializationSchema;
import org.apache.flink.streaming.connectors.kafka.RuntimeContextAware;
-import org.apache.flink.streaming.connectors.kafka.internals.KafkaFetcher;
-import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartition;
import org.apache.flink.streaming.runtime.tasks.ProcessingTimeService;
import org.apache.flink.util.SerializedValue;
import java.util.Map;
import java.util.Properties;
-public class KafkaFetcher2<T> extends KafkaFetcher<T> {
- public KafkaFetcher2(
+public class GrootKafkaFetcher<T> extends KafkaFetcher<T> {
+ public GrootKafkaFetcher(
SourceFunction.SourceContext<T> sourceContext,
Map<KafkaTopicPartition, Long> assignedPartitionsWithInitialOffsets,
SerializedValue<WatermarkStrategy<T>> watermarkStrategy,
diff --git a/groot-core/src/main/java/com/geedgenetworks/core/connector/print/PrintTableFactory.java b/groot-core/src/main/java/com/geedgenetworks/core/connector/print/PrintTableFactory.java
index 5882c9d..3bc4910 100644
--- a/groot-core/src/main/java/com/geedgenetworks/core/connector/print/PrintTableFactory.java
+++ b/groot-core/src/main/java/com/geedgenetworks/core/connector/print/PrintTableFactory.java
@@ -37,7 +37,7 @@ public class PrintTableFactory implements SinkTableFactory {
        helper.validate(); // validate parameters
- StructType dataType = context.getSchema();
+ StructType dataType = context.getDataType();
ReadableConfig config = context.getConfiguration();
PrintMode printMode = Optional.ofNullable(PrintMode.fromName(config.get(MODE))).orElse(STDOUT);
diff --git a/groot-core/src/main/java/com/geedgenetworks/core/connector/schema/DynamicSchema.java b/groot-core/src/main/java/com/geedgenetworks/core/connector/schema/DynamicSchema.java
new file mode 100644
index 0000000..1b4f755
--- /dev/null
+++ b/groot-core/src/main/java/com/geedgenetworks/core/connector/schema/DynamicSchema.java
@@ -0,0 +1,86 @@
+package com.geedgenetworks.core.connector.schema;
+
+import com.geedgenetworks.core.connector.schema.utils.DynamicSchemaManager;
+import com.geedgenetworks.core.types.StructType;
+import org.apache.commons.codec.digest.DigestUtils;
+import org.apache.commons.lang3.StringUtils;
+
+import static org.apache.flink.util.Preconditions.checkArgument;
+
+public abstract class DynamicSchema implements Schema {
+ protected SchemaParser.Parser parser;
+ protected StructType dataType;
+ private String contentMd5;
+ protected final long intervalMs;
+
+ public DynamicSchema(SchemaParser.Parser parser, long intervalMs) {
+ this.parser = parser;
+ this.intervalMs = intervalMs;
+ }
+
+ public abstract String getCacheKey();
+ protected abstract String getDataTypeContent();
+
+ @Override
+ public StructType getDataType() {
+ return dataType;
+ }
+
+ protected boolean parseDataType(String _content){
+ checkArgument(StringUtils.isNotBlank(_content), "DataType is null");
+ String _contentMd5 = computeMd5(_content);
+ if(_contentMd5.equals(contentMd5)){
+ return false;
+ }
+
+ StructType type;
+ if(dataType == null){
+ type = parser.parser(_content);
+ contentMd5 = _contentMd5;
+ dataType = type;
+ return true;
+ }
+
+ type = parser.parser(_content);
+ if(dataType.equals(type)){
+ return false;
+ }else{
+ contentMd5 = _contentMd5;
+ dataType = type;
+ return true;
+ }
+ }
+
+    // Update and return the new dataType; return null if nothing changed
+ public StructType updateDataType(){
+ String content = getDataTypeContent();
+ if(StringUtils.isBlank(content)){
+ return null;
+ }
+ if(parseDataType(content)){
+ return dataType;
+ }
+ return null;
+ }
+
+ final public void registerSchemaChangeAware(SchemaChangeAware aware){
+ DynamicSchemaManager.registerSchemaChangeAware(this, aware);
+ }
+
+ final public void unregisterSchemaChangeAware(SchemaChangeAware aware){
+ DynamicSchemaManager.unregisterSchemaChangeAware(this, aware);
+ }
+
+ String computeMd5(String text){
+ return DigestUtils.md5Hex(text);
+ }
+
+ public long getIntervalMs() {
+ return intervalMs;
+ }
+
+ @Override
+ final public boolean isDynamic() {
+ return true;
+ }
+}
diff --git a/groot-core/src/main/java/com/geedgenetworks/core/connector/schema/HttpDynamicSchema.java b/groot-core/src/main/java/com/geedgenetworks/core/connector/schema/HttpDynamicSchema.java
new file mode 100644
index 0000000..5eb6b87
--- /dev/null
+++ b/groot-core/src/main/java/com/geedgenetworks/core/connector/schema/HttpDynamicSchema.java
@@ -0,0 +1,43 @@
+package com.geedgenetworks.core.connector.schema;
+
+import com.geedgenetworks.core.utils.HttpClientPoolUtil;
+import com.geedgenetworks.shaded.org.apache.http.Header;
+import com.geedgenetworks.shaded.org.apache.http.message.BasicHeader;
+import org.apache.flink.util.TimeUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.net.URI;
+import java.time.Duration;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+public class HttpDynamicSchema extends DynamicSchema{
+ static final Logger LOG = LoggerFactory.getLogger(HttpDynamicSchema.class);
+ private static final Header header = new BasicHeader("Content-Type", "application/x-www-form-urlencoded");
+ private final String url;
+ private final String key;
+ public HttpDynamicSchema(String url, SchemaParser.Parser parser, long intervalMs) {
+ super(parser, intervalMs);
+ checkNotNull(url);
+ this.url = url;
+ this.key = String.format("%s_%s", url, TimeUtils.formatWithHighestUnit(Duration.ofMillis(intervalMs)));
+ parseDataType(getDataTypeContent());
+ }
+
+ @Override
+ public String getCacheKey() {
+ return key;
+ }
+
+ @Override
+ protected String getDataTypeContent() {
+ try {
+ String response = HttpClientPoolUtil.getInstance().httpGet(URI.create(url), header);
+ return response;
+ } catch (Exception e) {
+ LOG.error("request " + url + " error", e);
+ return null;
+ }
+ }
+}
diff --git a/groot-core/src/main/java/com/geedgenetworks/core/connector/schema/Schema.java b/groot-core/src/main/java/com/geedgenetworks/core/connector/schema/Schema.java
new file mode 100644
index 0000000..6bd6764
--- /dev/null
+++ b/groot-core/src/main/java/com/geedgenetworks/core/connector/schema/Schema.java
@@ -0,0 +1,35 @@
+package com.geedgenetworks.core.connector.schema;
+
+import com.geedgenetworks.core.connector.schema.utils.DynamicSchemaManager;
+import com.geedgenetworks.core.types.StructType;
+import org.apache.flink.util.Preconditions;
+
+import java.io.Serializable;
+
+import static org.apache.flink.util.Preconditions.checkArgument;
+
+public interface Schema extends Serializable {
+ StructType getDataType();
+
+ boolean isDynamic();
+
+ public static Schema newSchema(StructType dataType){
+ return new StaticSchema(dataType);
+ }
+
+ public static Schema newHttpDynamicSchema(String url){
+ HttpDynamicSchema dynamicSchema = new HttpDynamicSchema(url, SchemaParser.PARSER_AVRO, 1000 * 60 * 30);
+ checkArgument(dynamicSchema.getDataType() != null);
+ return dynamicSchema;
+ }
+
+ public static void registerSchemaChangeAware(Schema schema, SchemaChangeAware aware){
+ Preconditions.checkArgument(schema.isDynamic());
+ DynamicSchemaManager.registerSchemaChangeAware((DynamicSchema)schema, aware);
+ }
+
+ public static void unregisterSchemaChangeAware(Schema schema, SchemaChangeAware aware){
+ Preconditions.checkArgument(schema.isDynamic());
+ DynamicSchemaManager.unregisterSchemaChangeAware((DynamicSchema)schema, aware);
+ }
+}
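
Usage sketch (not part of this patch) of the two factory paths on Schema; the struct layout and the schema URL are placeholders, and the endpoint must serve an Avro record schema because newHttpDynamicSchema hard-codes PARSER_AVRO with a 30-minute refresh interval.

    import com.geedgenetworks.core.connector.schema.Schema;
    import com.geedgenetworks.core.connector.schema.SchemaChangeAware;
    import com.geedgenetworks.core.types.StructType;
    import com.geedgenetworks.core.types.Types;

    public class SchemaFactorySketch {
        public static void main(String[] args) {
            // Static schema: the data type is fixed when the job is assembled.
            StructType fixed = Types.parseStructType("struct<user_id:bigint, event:string>");
            Schema staticSchema = Schema.newSchema(fixed);
            System.out.println(staticSchema.isDynamic()); // false

            // Dynamic schema: the Avro definition is re-fetched from an HTTP endpoint.
            Schema dynamicSchema = Schema.newHttpDynamicSchema("http://127.0.0.1/session_record_schema.json");
            System.out.println(dynamicSchema.isDynamic()); // true

            // Registration goes through the static helpers, which reject non-dynamic schemas.
            SchemaChangeAware aware = dataType ->
                    System.out.println("schema changed to " + dataType.simpleString());
            Schema.registerSchemaChangeAware(dynamicSchema, aware);
            // ... later, when the consumer shuts down ...
            Schema.unregisterSchemaChangeAware(dynamicSchema, aware);
        }
    }
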
diff --git a/groot-core/src/main/java/com/geedgenetworks/core/connector/schema/SchemaChangeAware.java b/groot-core/src/main/java/com/geedgenetworks/core/connector/schema/SchemaChangeAware.java
new file mode 100644
index 0000000..a70df24
--- /dev/null
+++ b/groot-core/src/main/java/com/geedgenetworks/core/connector/schema/SchemaChangeAware.java
@@ -0,0 +1,7 @@
+package com.geedgenetworks.core.connector.schema;
+
+import com.geedgenetworks.core.types.StructType;
+
+public interface SchemaChangeAware {
+ void schemaChange(StructType dataType);
+}
diff --git a/groot-core/src/main/java/com/geedgenetworks/core/connector/schema/SchemaParser.java b/groot-core/src/main/java/com/geedgenetworks/core/connector/schema/SchemaParser.java
new file mode 100644
index 0000000..a2fcc21
--- /dev/null
+++ b/groot-core/src/main/java/com/geedgenetworks/core/connector/schema/SchemaParser.java
@@ -0,0 +1,112 @@
+package com.geedgenetworks.core.connector.schema;
+
+import com.alibaba.fastjson2.JSON;
+import com.alibaba.fastjson2.JSONArray;
+import com.alibaba.fastjson2.JSONObject;
+import com.geedgenetworks.core.types.ArrayType;
+import com.geedgenetworks.core.types.DataType;
+import com.geedgenetworks.core.types.StructType;
+import com.geedgenetworks.core.types.StructType.StructField;
+import com.geedgenetworks.core.types.Types;
+
+import java.io.Serializable;
+import java.util.*;
+import java.util.stream.Collectors;
+
+public class SchemaParser {
+ public static final String TYPE_BUILTIN = "builtin";
+ public static final String TYPE_AVRO = "avro";
+
+ public static final Parser PARSER_BUILTIN = new BuiltinParser();
+ public static final Parser PARSER_AVRO = new AvroParser();
+
+
+ public static Parser getParser(String type){
+ if(TYPE_BUILTIN.equals(type)){
+ return PARSER_BUILTIN;
+ }else if(TYPE_AVRO.equals(type)){
+ return PARSER_AVRO;
+ }
+
+        throw new UnsupportedOperationException("unsupported parser: " + type);
+ }
+
+ public static class BuiltinParser implements Parser{
+ @Override
+ public StructType parser(String content){
+ if(JSON.isValidArray(content)){
+ return Types.parseSchemaFromJson(content);
+ }else{
+ return Types.parseStructType(content);
+ }
+ // throw new IllegalArgumentException("can not parse schema for:" + content);
+ }
+ }
+
+ public static class AvroParser implements Parser{
+ @Override
+ public StructType parser(String content) {
+ org.apache.avro.Schema schema = new org.apache.avro.Schema.Parser().parse(content);
+ Set<String> disabledFields = getDisabledFields(JSON.parseObject(content).getJSONArray("fields"));
+ return convert2StructType(schema, disabledFields);
+ }
+
+ private StructType convert2StructType(org.apache.avro.Schema schema, Set<String> disabledFields){
+ List<org.apache.avro.Schema.Field> fields = schema.getFields();
+ List<StructField> _fields = new ArrayList<>(fields.size());
+ for (int i = 0; i < fields.size(); i++) {
+ String fieldName = fields.get(i).name();
+ if(disabledFields.contains(fieldName)){
+ continue;
+ }
+ org.apache.avro.Schema fieldSchema = fields.get(i).schema();
+ _fields.add(new StructField(fieldName, convert(fieldSchema)));
+ }
+ return new StructType(_fields.toArray(new StructField[_fields.size()]));
+ }
+
+ private DataType convert(org.apache.avro.Schema schema){
+ switch (schema.getType()){
+ case INT:
+ return Types.INT;
+ case LONG:
+ return Types.BIGINT;
+ case FLOAT:
+ return Types.FLOAT;
+ case DOUBLE:
+ return Types.DOUBLE;
+ case BOOLEAN:
+ return Types.BOOLEAN;
+ case STRING:
+ return Types.STRING;
+ case BYTES:
+ return Types.BINARY;
+ case ARRAY:
+ return new ArrayType(convert(schema.getElementType()));
+ case RECORD:
+ return convert2StructType(schema, Collections.EMPTY_SET);
+ default:
+ throw new UnsupportedOperationException(schema.toString());
+ }
+ }
+
+ private Set<String> getDisabledFields(JSONArray fields){
+ Set<String> disabledFields = new HashSet<>();
+ JSONObject fieldObject;
+ JSONObject doc;
+ for (int i = 0; i < fields.size(); i++) {
+ fieldObject = fields.getJSONObject(i);
+ doc = fieldObject.getJSONObject("doc");
+                // Skip fields whose doc marks them as disabled
+ if(doc != null && "disabled".equals(doc.getString("visibility"))){
+ disabledFields.add(fieldObject.getString("name"));
+ }
+ }
+ return disabledFields;
+ }
+ }
+
+ public interface Parser extends Serializable {
+ StructType parser(String content);
+ }
+}
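
A short sketch (not from this patch) of both parser flavours; the schema strings are made up for illustration. The "builtin" parser accepts either a JSON field array or a struct type string, while the "avro" parser drops any field whose doc object carries {"visibility":"disabled"}.

    import com.geedgenetworks.core.connector.schema.SchemaParser;
    import com.geedgenetworks.core.types.StructType;

    public class SchemaParserSketch {
        public static void main(String[] args) {
            SchemaParser.Parser builtin = SchemaParser.getParser(SchemaParser.TYPE_BUILTIN);
            // Struct type string form.
            StructType fromTypeString = builtin.parser("struct<id:bigint, tags:array<string>>");
            // JSON field array form.
            StructType fromJsonArray = builtin.parser(
                    "[{\"name\":\"id\",\"type\":\"bigint\"},{\"name\":\"tags\",\"type\":\"array<string>\"}]");

            SchemaParser.Parser avro = SchemaParser.getParser(SchemaParser.TYPE_AVRO);
            // The "debug" field is marked disabled in its doc object, so it is filtered out.
            StructType fromAvro = avro.parser(
                    "{\"type\":\"record\",\"name\":\"Event\",\"fields\":["
                            + "{\"name\":\"id\",\"type\":\"long\"},"
                            + "{\"name\":\"debug\",\"type\":\"string\",\"doc\":{\"visibility\":\"disabled\"}}]}");

            System.out.println(fromTypeString.treeString());
            System.out.println(fromJsonArray.treeString());
            System.out.println(fromAvro.treeString()); // only "id" remains
        }
    }
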
diff --git a/groot-core/src/main/java/com/geedgenetworks/core/connector/schema/StaticSchema.java b/groot-core/src/main/java/com/geedgenetworks/core/connector/schema/StaticSchema.java
new file mode 100644
index 0000000..ab6893d
--- /dev/null
+++ b/groot-core/src/main/java/com/geedgenetworks/core/connector/schema/StaticSchema.java
@@ -0,0 +1,21 @@
+package com.geedgenetworks.core.connector.schema;
+
+import com.geedgenetworks.core.types.StructType;
+
+public class StaticSchema implements Schema{
+ private final StructType dataType;
+
+ public StaticSchema(StructType dataType) {
+ this.dataType = dataType;
+ }
+
+ @Override
+ public StructType getDataType() {
+ return dataType;
+ }
+
+ @Override
+ final public boolean isDynamic() {
+ return false;
+ }
+}
diff --git a/groot-core/src/main/java/com/geedgenetworks/core/connector/schema/utils/DynamicSchemaManager.java b/groot-core/src/main/java/com/geedgenetworks/core/connector/schema/utils/DynamicSchemaManager.java
new file mode 100644
index 0000000..0ee04d2
--- /dev/null
+++ b/groot-core/src/main/java/com/geedgenetworks/core/connector/schema/utils/DynamicSchemaManager.java
@@ -0,0 +1,148 @@
+package com.geedgenetworks.core.connector.schema.utils;
+
+import com.geedgenetworks.core.connector.schema.DynamicSchema;
+import com.geedgenetworks.core.connector.schema.SchemaChangeAware;
+import com.geedgenetworks.core.types.StructType;
+import org.apache.flink.runtime.util.ExecutorThreadFactory;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.*;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+public class DynamicSchemaManager {
+ private static final Logger LOG = LoggerFactory.getLogger(DynamicSchemaManager.class);
+ private static final Map<String, DynamicSchemaWithAwares> registeredSchemaWithAwares = new LinkedHashMap<>();
+ private static ScheduledExecutorService scheduler = null;
+
+    // Register a schema-change listener for the given dynamicSchema
+ public static synchronized void registerSchemaChangeAware(DynamicSchema dynamicSchema, SchemaChangeAware aware){
+ checkNotNull(dynamicSchema);
+ checkNotNull(aware);
+
+ String key = dynamicSchema.getCacheKey();
+ DynamicSchemaWithAwares schemaWithAwares = registeredSchemaWithAwares.get(key);
+ if(schemaWithAwares == null){
+ schemaWithAwares = new DynamicSchemaWithAwares(dynamicSchema);
+ schedule(schemaWithAwares);
+ registeredSchemaWithAwares.put(key, schemaWithAwares);
+ LOG.info("start schedule for {}, current contained schedules:{}", schemaWithAwares.dynamicSchema.getCacheKey(), registeredSchemaWithAwares.keySet());
+ }
+
+ for (SchemaChangeAware registeredAware : schemaWithAwares.awares) {
+ if(registeredAware == aware){
+                LOG.error("aware({}) for {} has already been registered", aware, key);
+ return;
+ }
+ }
+
+ schemaWithAwares.awares.add(aware);
+ LOG.info("register aware({}) for {}", aware, key);
+ }
+
+    // Unregister a schema-change listener for the given dynamicSchema
+ public static synchronized void unregisterSchemaChangeAware(DynamicSchema dynamicSchema, SchemaChangeAware aware){
+ checkNotNull(dynamicSchema);
+ checkNotNull(aware);
+
+ String key = dynamicSchema.getCacheKey();
+ DynamicSchemaWithAwares schemaWithAwares = registeredSchemaWithAwares.get(key);
+ if(schemaWithAwares == null){
+            LOG.error("no aware registered for {}", key);
+ return;
+ }
+
+ Iterator<SchemaChangeAware> iter = schemaWithAwares.awares.iterator();
+ SchemaChangeAware registeredAware;
+ boolean find = false;
+ while (iter.hasNext()){
+ registeredAware = iter.next();
+ if(registeredAware == aware){
+ iter.remove();
+ find = true;
+ break;
+ }
+ }
+
+ if(find){
+ LOG.info("unregister aware({}) for {}", aware, schemaWithAwares.dynamicSchema.getCacheKey());
+ if(schemaWithAwares.awares.isEmpty()){
+ registeredSchemaWithAwares.remove(key);
+ LOG.info("stop schedule for {}, current contained schedules:{}", schemaWithAwares.dynamicSchema.getCacheKey(), registeredSchemaWithAwares.keySet());
+ }
+ if(registeredSchemaWithAwares.isEmpty()){
+ destroySchedule();
+ }
+ }else{
+            LOG.error("can not find a registered aware({}) for {}", aware, schemaWithAwares.dynamicSchema.getCacheKey());
+ }
+ }
+
+ private static void schedule(DynamicSchemaWithAwares schemaWithAwares){
+ if(scheduler == null){
+ scheduler = Executors.newScheduledThreadPool(1, new ExecutorThreadFactory("DynamicSchemaUpdateScheduler"));
+ LOG.info("create SchemaUpdateScheduler");
+ }
+ scheduler.schedule(schemaWithAwares, schemaWithAwares.dynamicSchema.getIntervalMs(), TimeUnit.MILLISECONDS);
+ }
+
+ private static void destroySchedule(){
+ if(scheduler != null){
+ try {
+ scheduler.shutdownNow();
+ LOG.info("destroy SchemaUpdateScheduler");
+ } catch (Exception e) {
+ LOG.error("shutdown error", e);
+ }
+ scheduler = null;
+ }
+ }
+
+ private static class DynamicSchemaWithAwares implements Runnable{
+ DynamicSchema dynamicSchema;
+ private List<SchemaChangeAware> awares;
+
+ public DynamicSchemaWithAwares(DynamicSchema dynamicSchema) {
+ this.dynamicSchema = dynamicSchema;
+ awares = new ArrayList<>();
+ }
+
+ @Override
+ public void run() {
+ if(awares.isEmpty()){
+ return;
+ }
+
+ try {
+ update();
+ } catch (Throwable e) {
+ LOG.error("schema update error", e);
+ }
+
+ if(!awares.isEmpty()){
+ scheduler.schedule(this, dynamicSchema.getIntervalMs(), TimeUnit.MILLISECONDS);
+ }
+ }
+
+ public void update() {
+ StructType dataType = dynamicSchema.updateDataType();
+            // No change since the last refresh
+ if(dataType == null){
+ return;
+ }
+
+        LOG.warn("schema for {} changed to: {}", dynamicSchema.getCacheKey(), dataType.simpleString());
+ for (SchemaChangeAware aware : awares) {
+ try {
+ aware.schemaChange(dataType);
+ } catch (Exception e) {
+ LOG.error("schema change aware error", e);
+ }
+ }
+ }
+
+ }
+}
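
A sketch (not from this patch) of how the manager de-duplicates refresh work: awares are keyed by DynamicSchema.getCacheKey(), so two HttpDynamicSchema instances with the same URL and interval share one scheduled refresh task, and the shared scheduler is shut down once the last aware is unregistered. The URL and interval below are placeholders.

    import com.geedgenetworks.core.connector.schema.HttpDynamicSchema;
    import com.geedgenetworks.core.connector.schema.SchemaChangeAware;
    import com.geedgenetworks.core.connector.schema.SchemaParser;

    public class DynamicSchemaManagerSketch {
        public static void main(String[] args) throws InterruptedException {
            HttpDynamicSchema a = new HttpDynamicSchema(
                    "http://127.0.0.1/session_record_schema.json", SchemaParser.PARSER_AVRO, 60_000L);
            HttpDynamicSchema b = new HttpDynamicSchema(
                    "http://127.0.0.1/session_record_schema.json", SchemaParser.PARSER_AVRO, 60_000L);

            SchemaChangeAware awareA = t -> System.out.println("A sees " + t.simpleString());
            SchemaChangeAware awareB = t -> System.out.println("B sees " + t.simpleString());

            // Same URL + same interval => same cache key => a single refresh schedule.
            a.registerSchemaChangeAware(awareA);
            b.registerSchemaChangeAware(awareB);

            Thread.sleep(5 * 60_000L); // let a few refresh cycles run

            // Removing the last aware for a key stops its schedule; removing the last key
            // shuts the shared scheduler down.
            a.unregisterSchemaChangeAware(awareA);
            b.unregisterSchemaChangeAware(awareB);
        }
    }
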
diff --git a/groot-core/src/main/java/com/geedgenetworks/core/connector/sink/SinkProvider.java b/groot-core/src/main/java/com/geedgenetworks/core/connector/sink/SinkProvider.java
index ba31bf9..f143f7f 100644
--- a/groot-core/src/main/java/com/geedgenetworks/core/connector/sink/SinkProvider.java
+++ b/groot-core/src/main/java/com/geedgenetworks/core/connector/sink/SinkProvider.java
@@ -8,4 +8,8 @@ import java.io.Serializable;
public interface SinkProvider extends Serializable {
DataStreamSink<?> consumeDataStream(DataStream<Event> dataStream);
+
+ default boolean supportDynamicSchema(){
+ return false;
+ }
}
diff --git a/groot-core/src/main/java/com/geedgenetworks/core/connector/source/SourceProvider.java b/groot-core/src/main/java/com/geedgenetworks/core/connector/source/SourceProvider.java
index f183ee9..4fc08dd 100644
--- a/groot-core/src/main/java/com/geedgenetworks/core/connector/source/SourceProvider.java
+++ b/groot-core/src/main/java/com/geedgenetworks/core/connector/source/SourceProvider.java
@@ -15,4 +15,8 @@ public interface SourceProvider extends Serializable {
default StructType schema(){
return getPhysicalDataType();
}
+
+ default boolean supportDynamicSchema(){
+ return false;
+ }
}
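
Sketch (not from this patch) of a provider opting in to the new supportDynamicSchema() flag added to both SinkProvider and SourceProvider; SinkProvider is shown because its abstract method appears in this diff. The Event import location and the print-based sink body are assumptions made only for illustration.

    import com.geedgenetworks.core.connector.sink.SinkProvider;
    import com.geedgenetworks.core.pojo.Event; // assumed location of Event
    import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.flink.streaming.api.datastream.DataStreamSink;

    public class DynamicAwareSinkProvider implements SinkProvider {
        @Override
        public DataStreamSink<?> consumeDataStream(DataStream<Event> dataStream) {
            // Illustrative sink; a real provider would build its connector-specific sink here.
            return dataStream.print();
        }

        @Override
        public boolean supportDynamicSchema() {
            // Presumably tells the job bootstrap that this sink can be wired against a
            // DynamicSchema instead of a fixed StructType; the default remains false.
            return true;
        }
    }
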
diff --git a/groot-core/src/main/java/com/geedgenetworks/core/factories/TableFactory.java b/groot-core/src/main/java/com/geedgenetworks/core/factories/TableFactory.java
index 69c462d..affeead 100644
--- a/groot-core/src/main/java/com/geedgenetworks/core/factories/TableFactory.java
+++ b/groot-core/src/main/java/com/geedgenetworks/core/factories/TableFactory.java
@@ -1,39 +1,56 @@
-package com.geedgenetworks.core.factories;
-
-import com.geedgenetworks.core.types.StructType;
-import org.apache.flink.configuration.Configuration;
-
-import java.util.Map;
-
-public interface TableFactory extends Factory {
-
- public static class Context {
- private final StructType schema;
- private final StructType physicalDataType;
- private final Map<String, String> options;
- private final Configuration configuration;
-
- public Context(StructType schema, StructType physicalDataType, Map<String, String> options, Configuration configuration) {
- this.schema = schema;
- this.physicalDataType = physicalDataType;
- this.options = options;
- this.configuration = configuration;
- }
-
- public StructType getSchema() {
- return schema;
- }
-
- public StructType getPhysicalDataType() {
- return physicalDataType;
- }
-
- public Map<String, String> getOptions() {
- return options;
- }
-
- public Configuration getConfiguration() {
- return configuration;
- }
- }
-}
+package com.geedgenetworks.core.factories;
+
+import com.geedgenetworks.core.connector.schema.Schema;
+import com.geedgenetworks.core.types.StructType;
+import org.apache.flink.configuration.Configuration;
+
+import java.util.Map;
+
+public interface TableFactory extends Factory {
+
+ public static class Context {
+ private final Schema schema;
+ private final Map<String, String> options;
+ private final Configuration configuration;
+
+ public Context(Schema schema, Map<String, String> options, Configuration configuration) {
+ this.schema = schema;
+ this.options = options;
+ this.configuration = configuration;
+ }
+
+ public Schema getSchema() {
+ return schema;
+ }
+
+ public StructType getPhysicalDataType() {
+ if(schema == null){
+ return null;
+ }else{
+ if(schema.isDynamic()){
+ throw new UnsupportedOperationException("DynamicSchema");
+ }
+ return schema.getDataType();
+ }
+ }
+
+ public StructType getDataType() {
+ if(schema == null){
+ return null;
+ }else{
+ if(schema.isDynamic()){
+ throw new UnsupportedOperationException("DynamicSchema");
+ }
+ return schema.getDataType();
+ }
+ }
+
+ public Map<String, String> getOptions() {
+ return options;
+ }
+
+ public Configuration getConfiguration() {
+ return configuration;
+ }
+ }
+}
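
Sketch (not from this patch) of the reworked Context: it now carries a Schema, and both type accessors refuse to flatten a dynamic schema. The field layout, options and configuration below are placeholders.

    import java.util.Collections;
    import com.geedgenetworks.core.connector.schema.Schema;
    import com.geedgenetworks.core.factories.TableFactory;
    import com.geedgenetworks.core.types.StructType;
    import com.geedgenetworks.core.types.Types;
    import org.apache.flink.configuration.Configuration;

    public class TableFactoryContextSketch {
        public static void main(String[] args) {
            StructType type = Types.parseStructType("struct<id:bigint, name:string>");
            TableFactory.Context ctx = new TableFactory.Context(
                    Schema.newSchema(type), Collections.emptyMap(), new Configuration());

            // Fine for a static schema:
            System.out.println(ctx.getDataType().simpleString());

            // For a dynamic schema, getDataType()/getPhysicalDataType() throw
            // UnsupportedOperationException("DynamicSchema"); such factories must call
            // ctx.getSchema() and handle the dynamic case themselves.
        }
    }
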
diff --git a/groot-core/src/main/java/com/geedgenetworks/core/pojo/SinkConfig.java b/groot-core/src/main/java/com/geedgenetworks/core/pojo/SinkConfig.java
index 2818474..66275d9 100644
--- a/groot-core/src/main/java/com/geedgenetworks/core/pojo/SinkConfig.java
+++ b/groot-core/src/main/java/com/geedgenetworks/core/pojo/SinkConfig.java
@@ -1,34 +1,43 @@
-package com.geedgenetworks.core.pojo;
-
-import java.io.Serializable;
-import java.util.Map;
-
-public class SinkConfig implements Serializable {
- private String type;
- private Map<String, String> properties;
- private String name;
-
- public String getType() {
- return type;
- }
-
- public void setType(String type) {
- this.type = type;
- }
-
- public Map<String, String> getProperties() {
- return properties;
- }
-
- public void setProperties(Map<String, String> properties) {
- this.properties = properties;
- }
-
- public String getName() {
- return name;
- }
-
- public void setName(String name) {
- this.name = name;
- }
-}
+package com.geedgenetworks.core.pojo;
+
+import java.io.Serializable;
+import java.util.Map;
+
+public class SinkConfig implements Serializable {
+ private String type;
+ private Map<String, Object> schema;
+ private Map<String, String> properties;
+ private String name;
+
+ public String getType() {
+ return type;
+ }
+
+ public void setType(String type) {
+ this.type = type;
+ }
+
+ public Map<String, Object> getSchema() {
+ return schema;
+ }
+
+ public void setSchema(Map<String, Object> schema) {
+ this.schema = schema;
+ }
+
+ public Map<String, String> getProperties() {
+ return properties;
+ }
+
+ public void setProperties(Map<String, String> properties) {
+ this.properties = properties;
+ }
+
+ public String getName() {
+ return name;
+ }
+
+ public void setName(String name) {
+ this.name = name;
+ }
+}
diff --git a/groot-core/src/main/java/com/geedgenetworks/core/pojo/SourceConfig.java b/groot-core/src/main/java/com/geedgenetworks/core/pojo/SourceConfig.java
index 99e2d85..6d019e9 100644
--- a/groot-core/src/main/java/com/geedgenetworks/core/pojo/SourceConfig.java
+++ b/groot-core/src/main/java/com/geedgenetworks/core/pojo/SourceConfig.java
@@ -8,8 +8,7 @@ import java.util.Map;
public class SourceConfig implements Serializable {
private String type;
- private List<Object> fields;
- private StructType dataType;
+ private Map<String, Object> schema;
private Map<String, String> properties;
private String name;
public String getType() {
@@ -20,20 +19,12 @@ public class SourceConfig implements Serializable {
this.type = type;
}
- public List<Object> getFields() {
- return fields;
+ public Map<String, Object> getSchema() {
+ return schema;
}
- public void setFields(List<Object> fields) {
- this.fields = fields;
- }
-
- public StructType getDataType() {
- return dataType;
- }
-
- public void setDataType(StructType dataType) {
- this.dataType = dataType;
+ public void setSchema(Map<String, Object> schema) {
+ this.schema = schema;
}
public Map<String, String> getProperties() {
diff --git a/groot-core/src/main/java/com/geedgenetworks/core/types/Types.java b/groot-core/src/main/java/com/geedgenetworks/core/types/Types.java
index 0bd1544..7cc3d3a 100644
--- a/groot-core/src/main/java/com/geedgenetworks/core/types/Types.java
+++ b/groot-core/src/main/java/com/geedgenetworks/core/types/Types.java
@@ -1,133 +1,144 @@
-package com.geedgenetworks.core.types;
-
-import com.alibaba.fastjson2.JSON;
-import com.alibaba.fastjson2.JSONArray;
-import com.alibaba.fastjson2.JSONObject;
-import com.geedgenetworks.core.types.StructType.StructField;
-
-import java.util.ArrayList;
-import java.util.List;
-import java.util.regex.Matcher;
-import java.util.regex.Pattern;
-
-public class Types {
- public static final IntegerType INT = new IntegerType();
- public static final LongType BIGINT = new LongType();
- public static final StringType STRING = new StringType();
- public static final FloatType FLOAT = new FloatType();
- public static final DoubleType DOUBLE = new DoubleType();
- public static final BooleanType BOOLEAN = new BooleanType();
- public static final BinaryType BINARY = new BinaryType();
-
- public static final Pattern ARRAY_RE = Pattern.compile("array\\s*<(.+)>", Pattern.CASE_INSENSITIVE);
- public static final Pattern STRUCT_RE = Pattern.compile("struct\\s*<(.+)>", Pattern.CASE_INSENSITIVE);
-
- public static StructType parseSchemaFromJson(String jsonFields) {
- JSONArray fieldArray = JSON.parseArray(jsonFields);
- StructField[] fields = new StructField[fieldArray.size()];
-
- for (int i = 0; i < fieldArray.size(); i++) {
- JSONObject fieldObject = fieldArray.getJSONObject(i);
- String name = fieldObject.getString("name").trim();
- String type = fieldObject.getString("type").trim();
- DataType dataType = parseDataType(type);
- fields[i] = new StructField(name, dataType);
- }
-
- return new StructType(fields);
- }
-
- public static DataType parseDataType(String type){
- type = type.trim();
- if("int".equalsIgnoreCase(type)){
- return INT;
- } else if ("bigint".equalsIgnoreCase(type)){
- return BIGINT;
- } else if ("string".equalsIgnoreCase(type)){
- return STRING;
- } else if ("float".equalsIgnoreCase(type)){
- return FLOAT;
- } else if ("double".equalsIgnoreCase(type)){
- return DOUBLE;
- } else if ("boolean".equalsIgnoreCase(type)){
- return BOOLEAN;
- } else if ("binary".equalsIgnoreCase(type)){
- return BINARY;
- }
-
- // array类型
- Matcher matcher = ARRAY_RE.matcher(type);
- if(matcher.matches()){
- String eleType = matcher.group(1);
- DataType elementType = parseDataType(eleType);
- return new ArrayType(elementType);
- }
-
- // struct类型
- matcher = STRUCT_RE.matcher(type);
- if(matcher.matches()){
- List<StructField> fields = new ArrayList<>();
- String str = matcher.group(1);
- int startPos = 0, endPos = -1;
- int i = startPos + 1;
- int level = 0;
- while (i < str.length()){
- while (i < str.length()){
- if(str.charAt(i) == ':'){
- endPos = i;
- break;
- }
- i++;
- }
-
- if(endPos <= startPos){
- throw new UnsupportedOperationException("不支持的类型:" + type);
- }
-
- String name = str.substring(startPos, endPos).trim();
- startPos = i + 1;
- endPos = -1;
- i = startPos + 1;
- while (i < str.length()){
- if(str.charAt(i) == ',' && level == 0){
- endPos = i;
- break;
- }
- if(str.charAt(i) == '<'){
- level++;
- }
- if(str.charAt(i) == '>'){
- level--;
- }
- i++;
- }
-
- if(i == str.length()){
- endPos = i;
- }
- if(endPos <= startPos){
- throw new UnsupportedOperationException("不支持的类型:" + type);
- }
-
- String tp = str.substring(startPos, endPos).trim();
- fields.add(new StructField(name, parseDataType(tp)));
-
- i++;
- startPos = i;
- endPos = -1;
- }
-
- return new StructType(fields.toArray(new StructField[fields.size()]));
- }
-
- throw new UnsupportedOperationException("不支持的类型:" + type);
- }
-
- static void buildFormattedString(DataType dataType, String prefix, StringBuilder sb, int maxDepth){
- if(dataType instanceof ArrayType){
- ((ArrayType)dataType).buildFormattedString(prefix, sb, maxDepth - 1);
- } else if (dataType instanceof StructType) {
- ((StructType)dataType).buildFormattedString(prefix, sb, maxDepth - 1);
- }
- }
-}
+package com.geedgenetworks.core.types;
+
+import com.alibaba.fastjson2.JSON;
+import com.alibaba.fastjson2.JSONArray;
+import com.alibaba.fastjson2.JSONObject;
+import com.geedgenetworks.core.types.StructType.StructField;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+public class Types {
+ public static final IntegerType INT = new IntegerType();
+ public static final LongType BIGINT = new LongType();
+ public static final StringType STRING = new StringType();
+ public static final FloatType FLOAT = new FloatType();
+ public static final DoubleType DOUBLE = new DoubleType();
+ public static final BooleanType BOOLEAN = new BooleanType();
+ public static final BinaryType BINARY = new BinaryType();
+
+ public static final Pattern ARRAY_RE = Pattern.compile("array\\s*<(.+)>", Pattern.CASE_INSENSITIVE);
+ public static final Pattern STRUCT_RE = Pattern.compile("struct\\s*<(.+)>", Pattern.CASE_INSENSITIVE);
+
+ public static StructType parseSchemaFromJson(String jsonFields) {
+ JSONArray fieldArray = JSON.parseArray(jsonFields);
+ StructField[] fields = new StructField[fieldArray.size()];
+
+ for (int i = 0; i < fieldArray.size(); i++) {
+ JSONObject fieldObject = fieldArray.getJSONObject(i);
+ String name = fieldObject.getString("name").trim();
+ String type = fieldObject.getString("type").trim();
+ DataType dataType = parseDataType(type);
+ fields[i] = new StructField(name, dataType);
+ }
+
+ return new StructType(fields);
+ }
+
+    // Parse the field list inside struct<...>
+ public static StructType parseStructType(String str){
+        // Accepts input with or without the surrounding struct<...> wrapper
+ Matcher matcher = STRUCT_RE.matcher(str);
+ if(matcher.matches()){
+ str = matcher.group(1);
+ }
+
+ List<StructField> fields = new ArrayList<>();
+ int startPos = 0, endPos = -1;
+ int i = startPos + 1;
+ int level = 0;
+ while (i < str.length()){
+ while (i < str.length()){
+ if(str.charAt(i) == ':'){
+ endPos = i;
+ break;
+ }
+ i++;
+ }
+
+ if(endPos <= startPos){
+ throw new UnsupportedOperationException("can not parse " + str);
+ }
+
+ String name = str.substring(startPos, endPos).trim();
+ startPos = i + 1;
+ endPos = -1;
+ i = startPos + 1;
+ while (i < str.length()){
+ if(str.charAt(i) == ',' && level == 0){
+ endPos = i;
+ break;
+ }
+ if(str.charAt(i) == '<'){
+ level++;
+ }
+ if(str.charAt(i) == '>'){
+ level--;
+ }
+ i++;
+ }
+
+ if(i == str.length()){
+ endPos = i;
+ }
+ if(endPos <= startPos){
+ throw new UnsupportedOperationException("can not parse " + str);
+ }
+
+ String tp = str.substring(startPos, endPos).trim();
+ fields.add(new StructField(name, parseDataType(tp)));
+
+ i++;
+ startPos = i;
+ endPos = -1;
+ }
+
+ return new StructType(fields.toArray(new StructField[fields.size()]));
+ }
+
+ public static DataType parseDataType(String type){
+ type = type.trim();
+ if("int".equalsIgnoreCase(type)){
+ return INT;
+ } else if ("bigint".equalsIgnoreCase(type)){
+ return BIGINT;
+ } else if ("string".equalsIgnoreCase(type)){
+ return STRING;
+ } else if ("float".equalsIgnoreCase(type)){
+ return FLOAT;
+ } else if ("double".equalsIgnoreCase(type)){
+ return DOUBLE;
+ } else if ("boolean".equalsIgnoreCase(type)){
+ return BOOLEAN;
+ } else if ("binary".equalsIgnoreCase(type)){
+ return BINARY;
+ }
+
+        // array type
+ Matcher matcher = ARRAY_RE.matcher(type);
+ if(matcher.matches()){
+ String eleType = matcher.group(1);
+ DataType elementType = parseDataType(eleType);
+ return new ArrayType(elementType);
+ }
+
+        // struct type
+ matcher = STRUCT_RE.matcher(type);
+ if(matcher.matches()){
+ String str = matcher.group(1);
+ return parseStructType(str);
+ }
+
+        throw new UnsupportedOperationException("unsupported type: " + type);
+ }
+
+ static void buildFormattedString(DataType dataType, String prefix, StringBuilder sb, int maxDepth){
+ if(dataType instanceof ArrayType){
+ ((ArrayType)dataType).buildFormattedString(prefix, sb, maxDepth - 1);
+ } else if (dataType instanceof StructType) {
+ ((StructType)dataType).buildFormattedString(prefix, sb, maxDepth - 1);
+ }
+ }
+}
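
Sketch (not from this patch) of the extracted parseStructType helper; the type strings are illustrative.

    import com.geedgenetworks.core.types.DataType;
    import com.geedgenetworks.core.types.StructType;
    import com.geedgenetworks.core.types.Types;

    public class TypesParsingSketch {
        public static void main(String[] args) {
            // Both the wrapped and the bare field-list form are accepted.
            StructType wrapped = Types.parseStructType("struct<id:bigint, tags:array<string>>");
            StructType bare = Types.parseStructType("id:bigint, tags:array<string>");
            System.out.println(wrapped.simpleString());
            System.out.println(bare.simpleString()); // same layout as above

            // parseDataType still resolves scalars, arrays and nested structs recursively.
            DataType nested = Types.parseDataType("array<struct<k:string, v:double>>");
            System.out.println(nested.getClass().getSimpleName()); // ArrayType
        }
    }
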
diff --git a/groot-core/src/main/java/com/geedgenetworks/core/utils/HttpClientPoolUtil.java b/groot-core/src/main/java/com/geedgenetworks/core/utils/HttpClientPoolUtil.java
index 2238aeb..dd2c710 100644
--- a/groot-core/src/main/java/com/geedgenetworks/core/utils/HttpClientPoolUtil.java
+++ b/groot-core/src/main/java/com/geedgenetworks/core/utils/HttpClientPoolUtil.java
@@ -101,7 +101,7 @@ public class HttpClientPoolUtil {
if (StringUtil.isNotEmpty(headers)) {
for (Header h : headers) {
httpGet.addHeader(h);
- log.info("request header : {}", h);
+ // log.info("request header : {}", h);
}
}
try(CloseableHttpResponse response = httpClient.execute(httpGet)) {
diff --git a/groot-core/src/test/java/com/geedgenetworks/core/connector/schema/SchemaParserTest.java b/groot-core/src/test/java/com/geedgenetworks/core/connector/schema/SchemaParserTest.java
new file mode 100644
index 0000000..fbeaed7
--- /dev/null
+++ b/groot-core/src/test/java/com/geedgenetworks/core/connector/schema/SchemaParserTest.java
@@ -0,0 +1,22 @@
+package com.geedgenetworks.core.connector.schema;
+
+import com.geedgenetworks.core.types.StructType;
+import org.apache.commons.io.FileUtils;
+
+import java.io.File;
+import java.nio.charset.StandardCharsets;
+
+import static org.junit.jupiter.api.Assertions.*;
+
+class SchemaParserTest {
+
+ public static void main(String[] args) throws Exception{
+ String str = FileUtils.readFileToString(new File("D:\\WorkSpace\\groot-stream\\session_record_schema.json"), StandardCharsets.UTF_8);
+ SchemaParser.Parser parser = SchemaParser.PARSER_AVRO;
+
+ StructType structType = parser.parser(str);
+ System.out.println(structType.treeString());
+
+ }
+
+}
\ No newline at end of file
diff --git a/groot-core/src/test/java/com/geedgenetworks/core/connector/schema/utils/DynamicSchemaManagerTest.java b/groot-core/src/test/java/com/geedgenetworks/core/connector/schema/utils/DynamicSchemaManagerTest.java
new file mode 100644
index 0000000..b5b672e
--- /dev/null
+++ b/groot-core/src/test/java/com/geedgenetworks/core/connector/schema/utils/DynamicSchemaManagerTest.java
@@ -0,0 +1,218 @@
+package com.geedgenetworks.core.connector.schema.utils;
+
+import com.geedgenetworks.core.connector.schema.DynamicSchema;
+import com.geedgenetworks.core.connector.schema.HttpDynamicSchema;
+import com.geedgenetworks.core.connector.schema.SchemaChangeAware;
+import com.geedgenetworks.core.connector.schema.SchemaParser;
+import com.geedgenetworks.core.types.StructType;
+import com.geedgenetworks.core.types.Types;
+import org.apache.flink.util.TimeUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.ThreadLocalRandom;
+
+public class DynamicSchemaManagerTest {
+ static final Logger LOG = LoggerFactory.getLogger(DynamicSchemaManagerTest.class.getSimpleName());
+
+ public static void main(String[] args) throws Exception{
+ //testOneThread();
+ //testMultiThread();
+ testMultiThreadForHttpDynamicSchema();
+ }
+
+ public static void testOneThread() throws Exception{
+ RandomDynamicSchema schema1 = new RandomDynamicSchema( 1000 * 5, "Schema1", 0.25);
+ RandomDynamicSchema schema11 = new RandomDynamicSchema( 1000 * 5, "Schema1", 0.25);
+ RandomDynamicSchema schema2 = new RandomDynamicSchema( 1000 * 10, "Schema1", 0.9);
+
+ LOG.info("start");
+ PrintSchemaChangeAware aware1 = new PrintSchemaChangeAware("aware1");
+ PrintSchemaChangeAware aware2 = new PrintSchemaChangeAware("aware2");
+ PrintSchemaChangeAware aware11 = new PrintSchemaChangeAware("aware11");
+ PrintSchemaChangeAware aware22 = new PrintSchemaChangeAware("aware22");
+
+ schema1.registerSchemaChangeAware(aware1);
+ schema1.registerSchemaChangeAware(aware2);
+ schema1.registerSchemaChangeAware(aware1);
+ schema1.registerSchemaChangeAware(aware2);
+
+ schema11.registerSchemaChangeAware(aware11);
+ schema11.registerSchemaChangeAware(aware22);
+
+
+ schema2.registerSchemaChangeAware(aware1);
+ schema2.registerSchemaChangeAware(aware2);
+ schema2.registerSchemaChangeAware(aware1);
+ schema2.registerSchemaChangeAware(aware2);
+
+ Thread.sleep(1000 * 60 * 2);
+ schema1.unregisterSchemaChangeAware(aware1);
+ schema1.unregisterSchemaChangeAware(aware2);
+ schema1.unregisterSchemaChangeAware(aware11);
+ schema1.unregisterSchemaChangeAware(aware22);
+
+ Thread.sleep(1000 * 20);
+ schema2.unregisterSchemaChangeAware(aware1);
+ schema2.unregisterSchemaChangeAware(aware2);
+
+ Thread.sleep(1000 * 3);
+
+
+ schema1.registerSchemaChangeAware(aware2);
+ schema2.registerSchemaChangeAware(aware1);
+ Thread.sleep(1000 * 60 * 1);
+ schema1.unregisterSchemaChangeAware(aware2);
+ schema2.unregisterSchemaChangeAware(aware1);
+ Thread.sleep(1000 * 3);
+ }
+
+ public static void testMultiThreadForHttpDynamicSchema() throws Exception{
+ LOG.info("start");
+
+ List<Thread> threads = new ArrayList<>(10);
+ Thread thread;
+ for (int i = 0; i < 5; i++) {
+ int finalI = i + 1;
+ thread = new Thread(() -> {
+ DynamicSchema schema1 = new HttpDynamicSchema( "http://127.0.0.1/session_record_schema.json", SchemaParser.PARSER_AVRO, 1000 * 5);
+ System.out.println(schema1.getDataType());
+ PrintSchemaChangeAware aware1 = new PrintSchemaChangeAware("aware1_" + finalI);
+ schema1.registerSchemaChangeAware(aware1);
+ try {
+ Thread.sleep(1000 * 60 * 1);
+ } catch (InterruptedException e) {
+ e.printStackTrace();
+ }
+ schema1.unregisterSchemaChangeAware(aware1);
+ });
+ threads.add(thread);
+
+ thread = new Thread(() -> {
+ DynamicSchema schema2 = new HttpDynamicSchema( "http://127.0.0.1/session_record_schema.json", SchemaParser.PARSER_AVRO, 1000 * 5);
+ System.out.println(schema2.getDataType());
+ PrintSchemaChangeAware aware2 = new PrintSchemaChangeAware("aware2_" + finalI);
+ schema2.registerSchemaChangeAware(aware2);
+ try {
+ Thread.sleep(1000 * 60 * 1);
+ } catch (InterruptedException e) {
+ e.printStackTrace();
+ }
+ schema2.unregisterSchemaChangeAware(aware2);
+ });
+ threads.add(thread);
+ }
+
+ for (int i = 0; i < threads.size(); i++) {
+ thread = threads.get(i);
+ thread.start();
+ }
+
+ for (int i = 0; i < threads.size(); i++) {
+ thread = threads.get(i);
+ thread.join();
+ }
+ Thread.sleep(1000 * 3);
+ LOG.info("done");
+ }
+
+ public static void testMultiThread() throws Exception{
+ LOG.info("start");
+
+ List<Thread> threads = new ArrayList<>(10);
+ Thread thread;
+ for (int i = 0; i < 5; i++) {
+ int finalI = i + 1;
+ thread = new Thread(() -> {
+ RandomDynamicSchema schema1 = new RandomDynamicSchema( 1000 * 5, "Schema1", 0.25);
+ PrintSchemaChangeAware aware1 = new PrintSchemaChangeAware("aware1_" + finalI);
+ schema1.registerSchemaChangeAware(aware1);
+ try {
+ Thread.sleep(1000 * 60 * 1);
+ } catch (InterruptedException e) {
+ e.printStackTrace();
+ }
+ schema1.unregisterSchemaChangeAware(aware1);
+ });
+ threads.add(thread);
+
+ thread = new Thread(() -> {
+ RandomDynamicSchema schema2 = new RandomDynamicSchema(1000 * 10, "Schema1", 0.9);
+ PrintSchemaChangeAware aware2 = new PrintSchemaChangeAware("aware2_" + finalI);
+ schema2.registerSchemaChangeAware(aware2);
+ try {
+ Thread.sleep(1000 * 60 * 1);
+ } catch (InterruptedException e) {
+ e.printStackTrace();
+ }
+ schema2.unregisterSchemaChangeAware(aware2);
+ });
+ threads.add(thread);
+ }
+
+ for (int i = 0; i < threads.size(); i++) {
+ thread = threads.get(i);
+ thread.start();
+ }
+
+ for (int i = 0; i < threads.size(); i++) {
+ thread = threads.get(i);
+ thread.join();
+ }
+ Thread.sleep(1000 * 3);
+ LOG.info("done");
+ }
+
+ public static class PrintSchemaChangeAware implements SchemaChangeAware {
+ private final String name;
+
+ public PrintSchemaChangeAware(String name) {
+ this.name = name;
+ }
+
+ @Override
+ public void schemaChange(StructType dataType) {
+            String info = String.format("%s received schema change: %s", name, dataType);
+ //System.out.println(info);
+ LOG.info(info);
+ }
+
+ @Override
+ public String toString() {
+ return name;
+ }
+ }
+
+ public static class RandomDynamicSchema extends DynamicSchema{
+ private final String key;
+ private final double probability;
+
+ public RandomDynamicSchema(long intervalMs, String identifier, double probability) {
+ super(null, intervalMs);
+ this.key = identifier + "_" + TimeUtils.formatWithHighestUnit(Duration.ofMillis(intervalMs));
+ this.probability = probability;
+ }
+
+ @Override
+ public String getCacheKey() {
+ return key;
+ }
+
+ @Override
+ protected String getDataTypeContent() {
+ return null;
+ }
+
+ @Override
+ public StructType updateDataType() {
+ if(ThreadLocalRandom.current().nextDouble() < probability){
+ return (StructType) Types.parseDataType(String.format("struct<name_%s: string>", key));
+ }
+ return null;
+ }
+
+ }
+} \ No newline at end of file
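
For reference, a minimal usage sketch of the subscription API exercised by this test, shown as a fragment in the same scope (the endpoint URL, refresh interval, and log wording are illustrative, and it assumes SchemaChangeAware declares only the schemaChange callback, as the PrintSchemaChangeAware helper above suggests):

    // Poll an HTTP schema endpoint and react to schema changes.
    DynamicSchema schema = new HttpDynamicSchema(
            "http://127.0.0.1/session_record_schema.json", SchemaParser.PARSER_AVRO, 30 * 1000);
    SchemaChangeAware consumer = new SchemaChangeAware() {
        @Override
        public void schemaChange(StructType dataType) {
            System.out.println("schema changed to " + dataType);
        }
    };
    schema.registerSchemaChangeAware(consumer);    // start receiving change notifications
    StructType current = schema.getDataType();     // current schema, refreshed at the configured interval
    // ... build serializers/writers from `current` ...
    schema.unregisterSchemaChangeAware(consumer);  // release the subscription when the operator closes
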
diff --git a/groot-examples/cn-udf-example/src/main/resources/example/cn_grootstream_job_local_template.yaml b/groot-examples/cn-udf-example/src/main/resources/example/cn_grootstream_job_local_template.yaml
index 5a8fcb0..b328f01 100644
--- a/groot-examples/cn-udf-example/src/main/resources/example/cn_grootstream_job_local_template.yaml
+++ b/groot-examples/cn-udf-example/src/main/resources/example/cn_grootstream_job_local_template.yaml
@@ -1,7 +1,6 @@
sources:
kafka_source:
type: kafka
- # fields: # [array of object] Field List, if not set, all fields(Map<String, Object>) will be output.
properties: # [object] Source Properties
topic: SESSION-RECORD
kafka.bootstrap.servers: 192.168.44.11:9094,192.168.44.13:9094,192.168.44.14:9094,192.168.44.15:9094,192.168.44.16:9094
diff --git a/groot-examples/cn-udf-example/src/main/resources/example/cn_grootstream_job_template.yaml b/groot-examples/cn-udf-example/src/main/resources/example/cn_grootstream_job_template.yaml
index 7c448f6..61f4d9e 100644
--- a/groot-examples/cn-udf-example/src/main/resources/example/cn_grootstream_job_template.yaml
+++ b/groot-examples/cn-udf-example/src/main/resources/example/cn_grootstream_job_template.yaml
@@ -1,7 +1,6 @@
sources:
kafka_source:
type: kafka
- # fields: # [array of object] Field List, if not set, all fields(Map<String, Object>) will be output.
properties: # [object] Source Properties
topic: SESSION-RECORD
kafka.bootstrap.servers: 192.168.44.11:9094,192.168.44.13:9094,192.168.44.14:9094,192.168.44.15:9094,192.168.44.16:9094
diff --git a/groot-examples/end-to-end-example/src/main/resources/examples/inline_to_clickhouse.yaml b/groot-examples/end-to-end-example/src/main/resources/examples/inline_to_clickhouse.yaml
index 370b7a8..829741d 100644
--- a/groot-examples/end-to-end-example/src/main/resources/examples/inline_to_clickhouse.yaml
+++ b/groot-examples/end-to-end-example/src/main/resources/examples/inline_to_clickhouse.yaml
@@ -1,27 +1,28 @@
sources: # [object] Define connector source
inline_source:
type: inline
- fields: # [array of object] Schema field projection, support read data only from specified fields.
- - name: log_id
- type: bigint
- - name: recv_time
- type: bigint
- - name: server_fqdn
- type: string
- - name: server_domain
- type: string
- - name: client_ip
- type: string
- - name: server_ip
- type: string
- - name: server_asn
- type: string
- - name: decoded_as
- type: string
- - name: device_group
- type: string
- - name: device_tag
- type: string
+ schema:
+ fields: # [array of object] Schema field projection, support read data only from specified fields.
+ - name: log_id
+ type: bigint
+ - name: recv_time
+ type: bigint
+ - name: server_fqdn
+ type: string
+ - name: server_domain
+ type: string
+ - name: client_ip
+ type: string
+ - name: server_ip
+ type: string
+ - name: server_asn
+ type: string
+ - name: decoded_as
+ type: string
+ - name: device_group
+ type: string
+ - name: device_tag
+ type: string
properties:
#
# [string] Event Data, it will be parsed to Map<String, Object> by the specified format.
diff --git a/groot-examples/end-to-end-example/src/main/resources/examples/inline_to_kafka.yaml b/groot-examples/end-to-end-example/src/main/resources/examples/inline_to_kafka.yaml
index a70f588..a5c5ece 100644
--- a/groot-examples/end-to-end-example/src/main/resources/examples/inline_to_kafka.yaml
+++ b/groot-examples/end-to-end-example/src/main/resources/examples/inline_to_kafka.yaml
@@ -1,27 +1,28 @@
sources: # [object] Define connector source
inline_source:
type: inline
- fields: # [array of object] Schema field projection, support read data only from specified fields.
- - name: log_id
- type: bigint
- - name: recv_time
- type: bigint
- - name: server_fqdn
- type: string
- - name: server_domain
- type: string
- - name: client_ip
- type: string
- - name: server_ip
- type: string
- - name: server_asn
- type: string
- - name: decoded_as
- type: string
- - name: device_group
- type: string
- - name: device_tag
- type: string
+ schema:
+ fields: # [array of object] Schema field projection, support read data only from specified fields.
+ - name: log_id
+ type: bigint
+ - name: recv_time
+ type: bigint
+ - name: server_fqdn
+ type: string
+ - name: server_domain
+ type: string
+ - name: client_ip
+ type: string
+ - name: server_ip
+ type: string
+ - name: server_asn
+ type: string
+ - name: decoded_as
+ type: string
+ - name: device_group
+ type: string
+ - name: device_tag
+ type: string
properties:
#
# [string] Event Data, it will be parsed to Map<String, Object> by the specified format.
diff --git a/groot-examples/end-to-end-example/src/main/resources/examples/inline_to_print_test.yaml b/groot-examples/end-to-end-example/src/main/resources/examples/inline_to_print_test.yaml
index d42c05a..cfd3917 100644
--- a/groot-examples/end-to-end-example/src/main/resources/examples/inline_to_print_test.yaml
+++ b/groot-examples/end-to-end-example/src/main/resources/examples/inline_to_print_test.yaml
@@ -1,27 +1,28 @@
sources:
inline_source:
type: inline
- fields:
- - name: log_id
- type: bigint
- - name: recv_time
- type: bigint
- - name: server_fqdn
- type: string
- - name: server_domain
- type: string
- - name: client_ip
- type: string
- - name: server_ip
- type: string
- - name: server_asn
- type: string
- - name: decoded_as
- type: string
- - name: device_group
- type: string
- - name: device_tag
- type: string
+ schema:
+ fields:
+ - name: log_id
+ type: bigint
+ - name: recv_time
+ type: bigint
+ - name: server_fqdn
+ type: string
+ - name: server_domain
+ type: string
+ - name: client_ip
+ type: string
+ - name: server_ip
+ type: string
+ - name: server_asn
+ type: string
+ - name: decoded_as
+ type: string
+ - name: device_group
+ type: string
+ - name: device_tag
+ type: string
properties:
data: '{"tcp_rtt_ms":128,"decoded_as":"HTTP", "http_version":"http1","http_request_line":"GET / HTTP/1.1","http_host":"www.ct.cn","http_url":"www.ct.cn/","http_user_agent":"curl/8.0.1","http_status_code":200,"http_response_line":"HTTP/1.1 200 OK","http_response_content_type":"text/html; charset=UTF-8","http_response_latency_ms":31,"http_session_duration_ms":5451,"in_src_mac":"ba:bb:a7:3c:67:1c","in_dest_mac":"86:dd:7a:8f:ae:e2","out_src_mac":"86:dd:7a:8f:ae:e2","out_dest_mac":"ba:bb:a7:3c:67:1c","tcp_client_isn":678677906,"tcp_server_isn":1006700307,"address_type":4,"client_ip":"192.11.22.22","server_ip":"8.8.8.8","client_port":42751,"server_port":80,"in_link_id":65535,"out_link_id":65535,"start_timestamp_ms":1703646546127,"end_timestamp_ms":1703646551702,"duration_ms":5575,"sent_pkts":97,"sent_bytes":5892,"received_pkts":250,"received_bytes":333931,"tcp_c2s_ip_fragments":0,"tcp_s2c_ip_fragments":0,"tcp_c2s_rtx_pkts":0,"tcp_c2s_rtx_bytes":0,"tcp_s2c_rtx_pkts":0,"tcp_s2c_rtx_bytes":0,"tcp_c2s_o3_pkts":0,"tcp_s2c_o3_pkts":0,"tcp_c2s_lost_bytes":0,"tcp_s2c_lost_bytes":0,"flags":26418,"flags_identify_info":[100,1,100,60,150,100,1,2],"app_transition":"http.1111.test_1_1","decoded_as":"HTTP","server_fqdn":"www.ct.cn","app":"test_1_1","decoded_path":"ETHERNET.IPv4.TCP.http","fqdn_category_list":[1767],"t_vsys_id":1,"vsys_id":1,"session_id":290538039798223400,"tcp_handshake_latency_ms":41,"client_os_desc":"Windows","server_os_desc":"Linux","data_center":"center-xxg-tsgx","device_group":"group-xxg-tsgx","device_tag":"{\"tags\":[{\"tag\":\"data_center\",\"value\":\"center-xxg-tsgx\"},{\"tag\":\"device_group\",\"value\":\"group-xxg-tsgx\"}]}","device_id":"9800165603247024","sled_ip":"192.168.40.39","dup_traffic_flag":0}'
format: json
diff --git a/groot-examples/end-to-end-example/src/main/resources/examples/kafka_to_print.yaml b/groot-examples/end-to-end-example/src/main/resources/examples/kafka_to_print.yaml
index d3c46b7..e883bce 100644
--- a/groot-examples/end-to-end-example/src/main/resources/examples/kafka_to_print.yaml
+++ b/groot-examples/end-to-end-example/src/main/resources/examples/kafka_to_print.yaml
@@ -1,11 +1,12 @@
sources:
kafka_source:
type : kafka
- fields: # [array of object] Schema field projection, support read data only from specified fields.
- - name: client_ip
- type: string
- - name: server_ip
- type: string
+ schema:
+ fields: # [array of object] Schema field projection, support read data only from specified fields.
+ - name: client_ip
+ type: string
+ - name: server_ip
+ type: string
properties: # [object] Kafka source properties
topic: SESSION-RECORD
kafka.bootstrap.servers: 192.168.44.11:9092
diff --git a/groot-formats/format-json/src/main/java/com/geedgenetworks/formats/json/JsonEventSerializationSchema.java b/groot-formats/format-json/src/main/java/com/geedgenetworks/formats/json/JsonEventSerializationSchema.java
index 32a3191..260e35a 100644
--- a/groot-formats/format-json/src/main/java/com/geedgenetworks/formats/json/JsonEventSerializationSchema.java
+++ b/groot-formats/format-json/src/main/java/com/geedgenetworks/formats/json/JsonEventSerializationSchema.java
@@ -18,14 +18,19 @@ public class JsonEventSerializationSchema implements SerializationSchema<Event>
}
};
private final StructType dataType;
+ private final JsonSerializer serializer;
public JsonEventSerializationSchema(StructType dataType) {
this.dataType = dataType;
+ this.serializer = dataType != null? new JsonSerializer(dataType): null;
}
@Override
public byte[] serialize(Event element) {
-        // the sink does not yet support type inference, so dataType is null
- return JSON.toJSONBytes(element.getExtractedFields(), proFilter);
+ if(dataType == null){
+ return JSON.toJSONBytes(element.getExtractedFields(), proFilter);
+ } else {
+ return serializer.serialize(element.getExtractedFields());
+ }
}
}
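
For context, a brief sketch of how the two serialization paths above are chosen (the field list is illustrative):

    // With a declared schema, the field-aware JsonSerializer is used: declared types drive
    // coercion and keys that are not in the schema are dropped.
    StructType dataType = Types.parseStructType("client_ip: string, sent_bytes: bigint");
    SerializationSchema<Event> withSchema = new JsonEventSerializationSchema(dataType);

    // Without a schema (dataType == null), serialization falls back to fastjson2 with the
    // existing property filter, preserving the previous behavior.
    SerializationSchema<Event> withoutSchema = new JsonEventSerializationSchema(null);
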
diff --git a/groot-formats/format-json/src/main/java/com/geedgenetworks/formats/json/JsonSerializer.java b/groot-formats/format-json/src/main/java/com/geedgenetworks/formats/json/JsonSerializer.java
new file mode 100644
index 0000000..fac90c8
--- /dev/null
+++ b/groot-formats/format-json/src/main/java/com/geedgenetworks/formats/json/JsonSerializer.java
@@ -0,0 +1,176 @@
+package com.geedgenetworks.formats.json;
+
+import com.alibaba.fastjson2.JSONWriter;
+import com.geedgenetworks.core.types.*;
+
+import java.io.Serializable;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+
+public class JsonSerializer implements Serializable{
+
+ private final StructType dataType;
+ private final ValueWriter valueWriter;
+
+ public JsonSerializer(StructType dataType) {
+ this.dataType = dataType;
+ this.valueWriter = makeWriter(dataType);
+ }
+
+ public byte[] serialize(Map<String, Object> data){
+ try (JSONWriter writer = JSONWriter.ofUTF8()) {
+ if (data == null) {
+ writer.writeNull();
+ } else {
+ valueWriter.write(writer, data);
+ }
+ return writer.getBytes();
+ }
+ }
+
+ private ValueWriter makeWriter(DataType dataType) {
+ if (dataType instanceof StringType) {
+ return JsonSerializer::writeString;
+ }
+
+ if (dataType instanceof IntegerType) {
+ return JsonSerializer::writeInt;
+ }
+
+ if (dataType instanceof LongType) {
+ return JsonSerializer::writeLong;
+ }
+
+ if (dataType instanceof FloatType) {
+ return JsonSerializer::writeFloat;
+ }
+
+ if (dataType instanceof DoubleType) {
+ return JsonSerializer::writeDouble;
+ }
+
+ if (dataType instanceof StructType) {
+ final Map<String, ValueWriter> fieldWriters = Arrays.stream(((StructType) dataType).fields).collect(Collectors.toMap(f -> f.name, f -> this.makeWriter(f.dataType)));
+ return (writer, obj) -> {
+ writeObject(writer, obj, fieldWriters);
+ };
+ }
+
+ if (dataType instanceof ArrayType) {
+ final ValueWriter elementWriter = this.makeWriter(((ArrayType) dataType).elementType);
+ return (writer, obj) -> {
+ writeArray(writer, obj, elementWriter);
+ };
+ }
+
+ throw new UnsupportedOperationException("unsupported dataType: " + dataType);
+ }
+
+ static void writeString(JSONWriter writer, Object obj) {
+ writer.writeString(obj.toString());
+ }
+
+ static void writeInt(JSONWriter writer, Object obj){
+ if(obj instanceof Number){
+ writer.writeInt32(((Number) obj).intValue());
+ } else if(obj instanceof String){
+ writer.writeInt32(Integer.parseInt((String) obj));
+ } else {
+ throw new IllegalArgumentException(String.format("can not convert %s to int", obj));
+ }
+ }
+
+ static void writeLong(JSONWriter writer, Object obj) {
+ if(obj instanceof Number){
+ writer.writeInt64(((Number) obj).longValue());
+ } else if(obj instanceof String){
+ writer.writeInt64(Long.parseLong((String) obj));
+ } else {
+ throw new IllegalArgumentException(String.format("can not convert %s to long", obj));
+ }
+ }
+
+ static void writeFloat(JSONWriter writer, Object obj) {
+ if(obj instanceof Number){
+ writer.writeFloat(((Number) obj).floatValue());
+ } else if(obj instanceof String){
+ writer.writeFloat(Float.parseFloat((String) obj));
+ } else {
+ throw new IllegalArgumentException(String.format("can not convert %s to float", obj));
+ }
+ }
+
+ static void writeDouble(JSONWriter writer, Object obj){
+ if(obj instanceof Number){
+ writer.writeDouble(((Number) obj).doubleValue());
+ } else if(obj instanceof String){
+ writer.writeDouble(Double.parseDouble((String) obj));
+ } else {
+ throw new IllegalArgumentException(String.format("can not convert %s to double", obj));
+ }
+ }
+
+ static void writeObject(JSONWriter writer, Object obj, Map<String, ValueWriter> fieldWriters){
+ if(obj instanceof Map){
+ Map<String, Object> map = (Map<String, Object>) obj;
+ writer.startObject();
+
+ String key;
+ Object value;
+ ValueWriter valueWriter;
+ for (Map.Entry<String, Object> entry : map.entrySet()) {
+ key = entry.getKey();
+ /*if (key.startsWith("__")) {
+ continue;
+ }*/
+ value = entry.getValue();
+ if(value == null){
+ continue;
+ }
+ valueWriter = fieldWriters.get(key);
+ if(valueWriter != null){
+ writer.writeName(key);
+ writer.writeColon();
+ valueWriter.write(writer, value);
+ }
+ }
+
+ writer.endObject();
+ } else {
+ throw new IllegalArgumentException(String.format("can not convert %s to map", obj));
+ }
+ }
+
+ static void writeArray(JSONWriter writer, Object obj, ValueWriter elementWriter){
+ if(obj instanceof List){
+ List<Object> list = (List<Object>) obj;
+ writer.startArray();
+
+ Object element;
+ for (int i = 0; i < list.size(); i++) {
+ if (i != 0) {
+ writer.writeComma();
+ }
+
+ element = list.get(i);
+ if (element == null) {
+ writer.writeNull();
+ continue;
+ }
+
+ elementWriter.write(writer, element);
+ }
+
+ writer.endArray();
+ } else {
+ throw new IllegalArgumentException(String.format("can not convert %s to list", obj));
+ }
+ }
+
+ @FunctionalInterface
+ public interface ValueWriter extends Serializable {
+ void write(JSONWriter writer, Object obj);
+ }
+}
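
A standalone usage sketch of the serializer above, mirroring what JsonSerializerTest exercises (field names and values are illustrative):

    StructType dataType = Types.parseStructType("id: int, name: string, note: string, tags: array<string>");
    JsonSerializer serializer = new JsonSerializer(dataType);

    Map<String, Object> row = new LinkedHashMap<>();
    row.put("id", "42");                       // string input is coerced to the declared int type
    row.put("name", "example");
    row.put("note", null);                     // null values are skipped
    row.put("tags", Arrays.asList("a", "b"));
    row.put("ignored", 1);                     // keys without a declared field are skipped

    byte[] bytes = serializer.serialize(row);
    System.out.println(new String(bytes, StandardCharsets.UTF_8));
    // e.g. {"id":42,"name":"example","tags":["a","b"]}
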
diff --git a/groot-formats/format-json/src/test/java/com/geedgenetworks/formats/json/JsonEventSerializationSchemaTest.java b/groot-formats/format-json/src/test/java/com/geedgenetworks/formats/json/JsonEventSerializationSchemaTest.java
new file mode 100644
index 0000000..c61bf0a
--- /dev/null
+++ b/groot-formats/format-json/src/test/java/com/geedgenetworks/formats/json/JsonEventSerializationSchemaTest.java
@@ -0,0 +1,20 @@
+package com.geedgenetworks.formats.json;
+
+import com.alibaba.fastjson2.JSON;
+
+import java.nio.charset.StandardCharsets;
+import java.util.LinkedHashMap;
+import java.util.Map;
+
+public class JsonEventSerializationSchemaTest {
+
+
+ public static void main(String[] args) {
+ Map<String, Object> map = new LinkedHashMap<>();
+ map.put("id", 1);
+ map.put("name", "aaa");
+ byte[] bytes = JSON.toJSONBytes(map);
+        System.out.println(new String(bytes, StandardCharsets.UTF_8));
+ }
+
+} \ No newline at end of file
diff --git a/groot-formats/format-json/src/test/java/com/geedgenetworks/formats/json/JsonSerializerTest.java b/groot-formats/format-json/src/test/java/com/geedgenetworks/formats/json/JsonSerializerTest.java
new file mode 100644
index 0000000..e5d6c10
--- /dev/null
+++ b/groot-formats/format-json/src/test/java/com/geedgenetworks/formats/json/JsonSerializerTest.java
@@ -0,0 +1,79 @@
+package com.geedgenetworks.formats.json;
+
+import com.alibaba.fastjson2.JSON;
+import com.geedgenetworks.core.types.StructType;
+import com.geedgenetworks.core.types.Types;
+import org.junit.jupiter.api.Test;
+
+import java.nio.charset.StandardCharsets;
+import java.util.*;
+import java.util.concurrent.ThreadLocalRandom;
+
+public class JsonSerializerTest {
+
+ @Test
+ public void testSerSimpleData(){
+ ThreadLocalRandom random = ThreadLocalRandom.current();
+ Map<String, Object> map = new LinkedHashMap<>();
+ map.put("int", random.nextInt(1, Integer.MAX_VALUE));
+ map.put("int_null", null);
+ map.put("int_str", Integer.toString(random.nextInt(1, Integer.MAX_VALUE)));
+
+ map.put("int64", random.nextLong(1, Long.MAX_VALUE));
+ map.put("int64_null", null);
+ map.put("int64_str", Long.toString(random.nextLong(1, Long.MAX_VALUE)));
+
+ map.put("double", random.nextDouble(1, Integer.MAX_VALUE));
+ map.put("double_null", null);
+ map.put("double_str", Double.toString(random.nextDouble(1, Integer.MAX_VALUE)));
+
+        map.put("str", "utf8字符串");
+ map.put("str_null", null);
+ map.put("str_int", random.nextInt(1, Integer.MAX_VALUE));
+
+ map.put("int32_array", Arrays.asList(1, 3, 5));
+ map.put("int32_array_null", null);
+ map.put("int32_array_empty", Collections.emptyList());
+
+ map.put("int64_array", Arrays.asList(1, 3, 5));
+ map.put("int64_array_null", null);
+ map.put("int64_array_empty", Collections.emptyList());
+
+ map.put("str_array", Arrays.asList(1, 3, 5));
+
+ Map<String, Object> obj = new LinkedHashMap<>();
+ obj.put("id", 1);
+ obj.put("name", "name");
+ map.put("obj", obj);
+
+ List<Object> list = new ArrayList<>();
+ list.add(obj);
+ obj = new LinkedHashMap<>();
+ obj.put("id", 2);
+ obj.put("name", "name2");
+ list.add(obj);
+ map.put("obj_array", list);
+
+ StructType dataType = Types.parseStructType("int: int, int_null: int, int_str: int, int64: bigint, int64_null: bigint, int64_str: bigint, double: double, double_null: double, double_str: double, " +
+ "str: string, str_null: string, str_int: string, int32_array: array<int>, int32_array_null: array<int>, int32_array_empty: array<int>, int64_array: array<bigint>, int64_array_null: array<bigint>, int64_array_empty: array<bigint>," +
+ " str_array : array<string>, obj : struct<id :int, name: string>, obj_array : array<struct<id :int, name: string>>");
+ JsonSerializer serializer = new JsonSerializer(dataType);
+
+ byte[] bytes = serializer.serialize(map);
+ System.out.println(map);
+ System.out.println(bytes.length);
+ System.out.println(new String(bytes, StandardCharsets.UTF_8));
+ System.out.println(JSON.toJSONString(map));
+
+ JsonToMapDataConverter converter = new JsonToMapDataConverter(dataType, false);
+ Map<String, Object> rst = converter.convert(new String(bytes, StandardCharsets.UTF_8));
+ System.out.println(map);
+ System.out.println(rst);
+
+ System.out.println(serializer.serialize(rst).length);
+ System.out.println(new String(serializer.serialize(rst), StandardCharsets.UTF_8));
+ System.out.println(JSON.toJSONString(map));
+ }
+
+
+} \ No newline at end of file
diff --git a/groot-formats/format-protobuf/src/test/java/com/geedgenetworks/formats/protobuf/ProtobufFormatFactoryTest.java b/groot-formats/format-protobuf/src/test/java/com/geedgenetworks/formats/protobuf/ProtobufFormatFactoryTest.java
index 633f5d8..72f4ac9 100644
--- a/groot-formats/format-protobuf/src/test/java/com/geedgenetworks/formats/protobuf/ProtobufFormatFactoryTest.java
+++ b/groot-formats/format-protobuf/src/test/java/com/geedgenetworks/formats/protobuf/ProtobufFormatFactoryTest.java
@@ -34,14 +34,14 @@ class ProtobufFormatFactoryTest {
options.put("protobuf.message.name", messageName);
Configuration configuration = Configuration.fromMap(options);
- TableFactory.Context context = new TableFactory.Context(null, null, options, configuration);
+ TableFactory.Context context = new TableFactory.Context( null, options, configuration);
SourceProvider sourceProvider = tableFactory.getSourceProvider(context);
SinkTableFactory sinkTableFactory = FactoryUtil.discoverTableFactory(SinkTableFactory.class, "print");
options = new HashMap<>();
options.put("format", "json");
configuration = Configuration.fromMap(options);
- context = new TableFactory.Context(null, null, options, configuration);
+ context = new TableFactory.Context( null, options, configuration);
SinkProvider sinkProvider = sinkTableFactory.getSinkProvider(context);
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
diff --git a/pom.xml b/pom.xml
index 266eaed..cf9731d 100644
--- a/pom.xml
+++ b/pom.xml
@@ -60,6 +60,7 @@
<nacos.version>1.2.0</nacos.version>
<antlr4.version>4.8</antlr4.version>
<jcommander.version>1.81</jcommander.version>
+ <avro.version>1.9.1</avro.version>
<lombok.version>1.18.24</lombok.version>
<config.version>1.3.3</config.version>
<hazelcast.version>5.1</hazelcast.version>
@@ -274,6 +275,11 @@
<version>${hazelcast.version}</version>
</dependency>
+ <dependency>
+ <groupId>org.apache.avro</groupId>
+ <artifactId>avro</artifactId>
+ <version>${avro.version}</version>
+ </dependency>
<!-- flink dependencies -->
<dependency>