| author | 李奉超 <[email protected]> | 2024-03-13 06:44:33 +0000 |
|---|---|---|
| committer | 李奉超 <[email protected]> | 2024-03-13 06:44:33 +0000 |
| commit | 7347323b963fad7f972e6f872711207415c513ce (patch) | |
| tree | 375e393e7e5720fbd213818c9785a8d4e9c042b6 | |
| parent | 039d9c0d0bf081865a3fc8a9cdc4359e0bac6331 (diff) | |
| parent | 40fe896585953874c09576a576e057c41ffe1288 (diff) | |
Merge branch 'feature/dynamicschema' into 'develop'
Feature/dynamicschema
See merge request galaxy/platform/groot-stream!23
47 files changed, 1785 insertions, 547 deletions
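The changes below replace the top-level `fields` list on sources with a `schema` block, add the same block to sinks, and allow the schema to be configured through `fields`, `local_file`, or `url` (URL-based schemas are refreshed periodically and enable dynamic schema where the connector supports it). A minimal sketch of the new block, using placeholder values taken from the debug template changed in this merge:

```yaml
sources:
  kafka_source:
    type: kafka
    schema:
      # exactly one of the following three keys may be set
      fields: "struct<log_id:bigint, recv_time:bigint, client_ip:string>"  # inline: SQL-style string or an array of {name, type}
      # local_file: "schema/test_schema.json"                              # schema file read from disk (Avro-style parser)
      # url: "http://127.0.0.1/schema.json"                                # polled periodically; enables dynamic schema
    properties:
      topic: SESSION-RECORD
      kafka.bootstrap.servers: 192.168.44.11:9092
```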
diff --git a/config/template/grootstream_job_debug.yaml b/config/template/grootstream_job_debug.yaml index 1c8c5e9..a1a287d 100644 --- a/config/template/grootstream_job_debug.yaml +++ b/config/template/grootstream_job_debug.yaml @@ -1,7 +1,11 @@ sources: kafka_source: type : kafka - # fields: # [array of object] Field List, if not set, all fields(Map<String, Object>) will be output. + # source table schema, config through fields or local_file or url. if not set schema, all fields(Map<String, Object>) will be output. +# schema: +# fields: "struct<log_id:bigint, recv_time:bigint,client_ip: string>" +# local_file: "schema/test_schema.json" +# url: "http://127.0.0.1/schema.json" properties: # [object] Source Properties topic: SESSION-RECORD kafka.bootstrap.servers: 192.168.44.11:9092 @@ -25,7 +29,6 @@ sources: inline_source: type : inline - fields: # [array of object] Field List, if not set, all fields(Map<String, Object>) will be output. properties: data: '{"log_id": 1, "recv_time":"111", "client_ip":"192.168.0.1","server_ip":"120.233.20.242","common_schema_type":"BASE"}' format: json @@ -42,9 +45,6 @@ sources: ipfix_source: type: ipfix -# fields: # [array of object] Field List, if not set, all fields(Map<String, Object>) will be output. -# - name: log_id -# type: bigint properties: port.range: 12345-12347 max.packet.size: 65535 @@ -220,6 +220,11 @@ postprocessing_pipelines: sinks: kafka_sink_a: type: kafka + # sink table schema, config through fields or local_file or url. if not set schema, all fields(Map<String, Object>) will be output. +# schema: +# fields: "struct<log_id:bigint, recv_time:bigint,client_ip: string>" +# local_file: "schema/test_schema.json" +# url: "http://127.0.0.1/schema.json" properties: topic: SESSION-RECORD-JSON kafka.bootstrap.servers: 192.168.44.12:9092 diff --git a/config/template/grootstream_job_template.yaml b/config/template/grootstream_job_template.yaml index 0cb39ad..2606d56 100644 --- a/config/template/grootstream_job_template.yaml +++ b/config/template/grootstream_job_template.yaml @@ -22,27 +22,28 @@ sources: # [object] Define connector source inline_source: # [object] Inline source connector name, must be unique. It used to define the source node of the job topology. type: inline - fields: # [array of object] Schema field projection, support read data only from specified fields. - - name: log_id - type: bigint - - name: recv_time - type: bigint - - name: server_fqdn - type: string - - name: server_domain - type: string - - name: client_ip - type: string - - name: server_ip - type: string - - name: server_asn - type: string - - name: decoded_as - type: string - - name: device_group - type: string - - name: device_tag - type: string + schema: + fields: # [array of object] Schema field projection, support read data only from specified fields. + - name: log_id + type: bigint + - name: recv_time + type: bigint + - name: server_fqdn + type: string + - name: server_domain + type: string + - name: client_ip + type: string + - name: server_ip + type: string + - name: server_asn + type: string + - name: decoded_as + type: string + - name: device_group + type: string + - name: device_tag + type: string properties: # # [string] Event Data, it will be parsed to Map<String, Object> by the specified format. @@ -64,9 +65,6 @@ sources: # [object] Define connector source ipfix_source: # [object] IPFIX source connector name, must be unique. It used to define the source node of the job topology. 
type: ipfix -# fields: # [array of object] Field List, if not set, all fields(Map<String, Object>) will be output. -# - name: log_id -# type: bigint properties: port.range: 12345-12347 max.packet.size: 65535 @@ -254,6 +252,11 @@ postprocessing_pipelines: # [object] Define Processors for postprocessing pipeli sinks: # [object] Define connector sink kafka_sink_a: # [object] Kafka sink connector name, must be unique. It used to define the sink node of the job topology. type: kafka + # sink table schema, config through fields or local_file or url. if not set schema, all fields(Map<String, Object>) will be output. +# schema: +# fields: "struct<log_id:bigint, recv_time:bigint,client_ip: string>" +# local_file: "schema/test_schema.json" +# url: "http://127.0.0.1/schema.json" properties: topic: SESSION-RECORD-A kafka.bootstrap.servers: 127.0.0.1:9092 diff --git a/docs/connector/connector.md b/docs/connector/connector.md index 6df1e23..6bcc878 100644 --- a/docs/connector/connector.md +++ b/docs/connector/connector.md @@ -7,23 +7,68 @@ Source Connector contains some common core features, and each source connector s sources: ${source_name}: type: ${source_connector_type} - fields: - - name: ${field_name} - type: ${field_type} + # source table schema, config through fields or local_file or url + schema: + fields: + - name: ${field_name} + type: ${field_type} + # local_file: "schema path" + # url: "schema http url" properties: ${prop_key}: ${prop_value} ``` -| Name | Type | Required | Default | Description | -|-------------------------------------|-----------------------------------------------------------------------------|----------|--------------------------|--------------------------------------------------------------------------------------------------------------------------| -| type | String | Yes | - | The type of the source connector. The `SourceTableFactory` will use this value as identifier to create source connector. | -| fields | Array of `Field` | No | - | The structure of the data, including field names and field types. | -| properties | Map of String | Yes | - | The source connector customize properties, more details see the [Source](source) documentation. | +| Name | Type | Required | Default | Description | +|-------------------------------------|------------------------------------------------------------------|----------|--------------------------|--------------------------------------------------------------------------------------------------------------------------| +| type | String | Yes | - | The type of the source connector. The `SourceTableFactory` will use this value as identifier to create source connector. | +| schema | Map | No | - | The source table schema, config through fields or local_file or url. | +| properties | Map of String | Yes | - | The source connector customize properties, more details see the [Source](source) documentation. | ## Schema Field Projection The source connector supports reading only specified fields from the data source. For example `KafkaSource` will read all content from topic and then use `fields` to filter unnecessary columns. The Schema Structure refer to [Schema Structure](../user-guide.md#schema-structure). +## Schema Config +Schema can config through fields or local_file or url. + +### fields +```yaml +schema: + # by array + fields: + - name: ${field_name} + type: ${field_type} +``` + +```yaml +schema: + # by sql + fields: "struct<field_name:field_type, ...>" + # can also without outer struct<> + # fields: "field_name:field_type, ..." 
+``` + +### local_file + +```yaml +schema: + # by array + fields: + local_file: "schema path" +``` + +### url +Retrieve updated schema from URL for cycle, support dynamic schema. Not all connector support dynamic schema. + +The connectors that currently support dynamic schema include: clickHouse sink. + +```yaml +schema: + # by array + fields: + url: "schema http url" +``` + # Sink Connector Sink Connector contains some common core features, and each sink connector supports them to varying degrees. @@ -33,12 +78,18 @@ Sink Connector contains some common core features, and each sink connector suppo sinks: ${sink_name}: type: ${sink_connector_type} + # sink table schema, config through fields or local_file or url. if not set schema, all fields(Map<String, Object>) will be output. + schema: + fields: "struct<field_name:field_type, ...>" + # local_file: "schema path" + # url: "schema url" properties: ${prop_key}: ${prop_value} ``` -| Name | Type | Required | Default | Description | -|-------------------------------------|-----------------------------------------------------------------------------|----------|--------------------------|--------------------------------------------------------------------------------------------------------------------| -| type | String | Yes | - | The type of the sink connector. The `SinkTableFactory` will use this value as identifier to create sink connector. | -| properties | Map of String | Yes | - | The sink connector customize properties, more details see the [Sink](sink) documentation. | +| Name | Type | Required | Default | Description | +|-------------------------------------|--------------------------------------------------------------------|----------|--------------------------|--------------------------------------------------------------------------------------------------------------------| +| type | String | Yes | - | The type of the sink connector. The `SinkTableFactory` will use this value as identifier to create sink connector. | +| schema | Map | No | - | The sink table schema, config through fields or local_file or url. | +| properties | Map of String | Yes | - | The sink connector customize properties, more details see the [Sink](sink) documentation. | diff --git a/docs/connector/sink/clickhouse.md b/docs/connector/sink/clickhouse.md index d794767..ac37d24 100644 --- a/docs/connector/sink/clickhouse.md +++ b/docs/connector/sink/clickhouse.md @@ -50,27 +50,28 @@ This example read data of inline test source and write to ClickHouse table `test sources: # [object] Define connector source inline_source: type: inline - fields: # [array of object] Schema field projection, support read data only from specified fields. - - name: log_id - type: bigint - - name: recv_time - type: bigint - - name: server_fqdn - type: string - - name: server_domain - type: string - - name: client_ip - type: string - - name: server_ip - type: string - - name: server_asn - type: string - - name: decoded_as - type: string - - name: device_group - type: string - - name: device_tag - type: string + schema: + fields: # [array of object] Schema field projection, support read data only from specified fields. 
+ - name: log_id + type: bigint + - name: recv_time + type: bigint + - name: server_fqdn + type: string + - name: server_domain + type: string + - name: client_ip + type: string + - name: server_ip + type: string + - name: server_asn + type: string + - name: decoded_as + type: string + - name: device_group + type: string + - name: device_tag + type: string properties: # # [string] Event Data, it will be parsed to Map<String, Object> by the specified format. diff --git a/docs/connector/sink/kafka.md b/docs/connector/sink/kafka.md index 6793b21..92976d8 100644 --- a/docs/connector/sink/kafka.md +++ b/docs/connector/sink/kafka.md @@ -26,27 +26,28 @@ This example read data of inline test source and write to kafka topic `SESSION-R sources: # [object] Define connector source inline_source: type: inline - fields: # [array of object] Schema field projection, support read data only from specified fields. - - name: log_id - type: bigint - - name: recv_time - type: bigint - - name: server_fqdn - type: string - - name: server_domain - type: string - - name: client_ip - type: string - - name: server_ip - type: string - - name: server_asn - type: string - - name: decoded_as - type: string - - name: device_group - type: string - - name: device_tag - type: string + schema: + fields: # [array of object] Schema field projection, support read data only from specified fields. + - name: log_id + type: bigint + - name: recv_time + type: bigint + - name: server_fqdn + type: string + - name: server_domain + type: string + - name: client_ip + type: string + - name: server_ip + type: string + - name: server_asn + type: string + - name: decoded_as + type: string + - name: device_group + type: string + - name: device_tag + type: string properties: # # [string] Event Data, it will be parsed to Map<String, Object> by the specified format. 
diff --git a/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/SinkExecutor.java b/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/SinkExecutor.java index 7aee40f..a0bedc9 100644 --- a/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/SinkExecutor.java +++ b/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/SinkExecutor.java @@ -2,21 +2,20 @@ package com.geedgenetworks.bootstrap.execution; import com.alibaba.fastjson.JSONObject; import com.geedgenetworks.bootstrap.enums.ProcessorType; -import com.geedgenetworks.bootstrap.exception.ConfigCheckException; import com.geedgenetworks.bootstrap.exception.JobExecuteException; +import com.geedgenetworks.bootstrap.utils.SchemaConfigParse; import com.geedgenetworks.common.Constants; import com.geedgenetworks.common.config.CheckConfigUtil; import com.geedgenetworks.common.config.CheckResult; import com.geedgenetworks.common.config.SinkConfigOptions; -import com.geedgenetworks.common.config.SourceConfigOptions; import com.geedgenetworks.common.exception.CommonErrorCode; import com.geedgenetworks.common.exception.ConfigValidationException; +import com.geedgenetworks.core.connector.schema.Schema; import com.geedgenetworks.core.connector.sink.SinkProvider; import com.geedgenetworks.core.factories.FactoryUtil; import com.geedgenetworks.core.factories.SinkTableFactory; import com.geedgenetworks.core.factories.TableFactory; import com.geedgenetworks.core.pojo.SinkConfig; -import com.geedgenetworks.utils.StringUtil; import com.google.common.collect.Maps; import com.typesafe.config.Config; import lombok.extern.slf4j.Slf4j; @@ -70,8 +69,20 @@ public class SinkExecutor extends AbstractExecutor<String, SinkConfig> { SinkTableFactory sinkTableFactory = FactoryUtil.discoverTableFactory(SinkTableFactory.class, sinkConfig.getType()); Map<String, String> options = sinkConfig.getProperties(); Configuration configuration = Configuration.fromMap(options); - TableFactory.Context context = new TableFactory.Context(null, null, options, configuration); + Schema schema = null; + if(sinkConfig.getSchema() != null && !sinkConfig.getSchema().isEmpty()){ + schema = SchemaConfigParse.parseSchemaConfig(sinkConfig.getSchema()); + } + TableFactory.Context context = new TableFactory.Context(schema, options, configuration); SinkProvider sinkProvider = sinkTableFactory.getSinkProvider(context); + if(!sinkProvider.supportDynamicSchema() && schema != null && schema.isDynamic()){ + throw new UnsupportedOperationException(String.format("sink(%s) not support DynamicSchema", sinkConfig.getName())); + } + if(schema != null){ + System.out.println(String.format("sink(%s) dataType:\n%s", sinkConfig.getName(), schema.getDataType().toString())); + System.out.println(String.format("sink(%s) schema:\n%s", sinkConfig.getName(), schema.getDataType().treeString())); + } + DataStreamSink<?> dataStreamSink = sinkProvider.consumeDataStream(dataStream); if (node.getParallelism() > 0) { dataStreamSink.setParallelism(node.getParallelism()); diff --git a/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/SourceExecutor.java b/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/SourceExecutor.java index eee78a2..f46e3fc 100644 --- a/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/SourceExecutor.java +++ b/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/SourceExecutor.java @@ -1,26 +1,24 @@ package com.geedgenetworks.bootstrap.execution; -import 
com.alibaba.fastjson2.JSON; import com.alibaba.fastjson2.JSONObject; import com.geedgenetworks.bootstrap.enums.ProcessorType; import com.geedgenetworks.bootstrap.exception.JobExecuteException; +import com.geedgenetworks.bootstrap.utils.SchemaConfigParse; import com.geedgenetworks.common.Constants; import com.geedgenetworks.common.config.CheckConfigUtil; import com.geedgenetworks.common.config.CheckResult; import com.geedgenetworks.common.config.SourceConfigOptions; import com.geedgenetworks.common.exception.CommonErrorCode; import com.geedgenetworks.common.exception.ConfigValidationException; +import com.geedgenetworks.core.connector.schema.Schema; import com.geedgenetworks.core.connector.source.SourceProvider; import com.geedgenetworks.core.factories.FactoryUtil; import com.geedgenetworks.core.factories.SourceTableFactory; import com.geedgenetworks.core.factories.TableFactory; import com.geedgenetworks.core.pojo.SourceConfig; -import com.geedgenetworks.core.types.StructType; -import com.geedgenetworks.core.types.Types; import com.google.common.collect.Maps; import com.typesafe.config.*; import lombok.extern.slf4j.Slf4j; -import org.apache.commons.collections.CollectionUtils; import org.apache.flink.configuration.Configuration; import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator; import java.net.URL; @@ -53,10 +51,6 @@ public class SourceExecutor extends AbstractExecutor<String, SourceConfig> { SourceConfig sourceConfig = new JSONObject((Map<String, Object>) value).toJavaObject(SourceConfig.class); sourceConfig.setName(key); - if(CollectionUtils.isNotEmpty(sourceConfig.getFields())){ - StructType schema = Types.parseSchemaFromJson(JSON.toJSONString(sourceConfig.getFields())); - sourceConfig.setDataType(schema); - } sourceConfigMap.put(key, sourceConfig); }); @@ -73,12 +67,18 @@ public class SourceExecutor extends AbstractExecutor<String, SourceConfig> { SourceTableFactory tableFactory = FactoryUtil.discoverTableFactory(SourceTableFactory.class, sourceConfig.getType()); Map<String, String> options = sourceConfig.getProperties(); Configuration configuration = Configuration.fromMap(options); - StructType dataType = sourceConfig.getDataType(); - TableFactory.Context context = new TableFactory.Context(dataType, dataType, options, configuration); + Schema schema = null; + if(sourceConfig.getSchema() != null && !sourceConfig.getSchema().isEmpty()){ + schema = SchemaConfigParse.parseSchemaConfig(sourceConfig.getSchema()); + } + TableFactory.Context context = new TableFactory.Context(schema, options, configuration); SourceProvider sourceProvider = tableFactory.getSourceProvider(context); - if(dataType != null){ - System.out.println(String.format("source(%s) dataType:\n%s", sourceConfig.getName(), dataType.toString())); - System.out.println(String.format("source(%s) schema:\n%s", sourceConfig.getName(), dataType.treeString())); + if(!sourceProvider.supportDynamicSchema() && schema != null && schema.isDynamic()){ + throw new UnsupportedOperationException(String.format("source(%s) not support DynamicSchema", sourceConfig.getName())); + } + if(schema != null){ + System.out.println(String.format("source(%s) dataType:\n%s", sourceConfig.getName(), schema.getDataType().toString())); + System.out.println(String.format("source(%s) schema:\n%s", sourceConfig.getName(), schema.getDataType().treeString())); } sourceSingleOutputStreamOperator = sourceProvider.produceDataStream(jobRuntimeEnvironment.getStreamExecutionEnvironment()); diff --git 
a/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/utils/SchemaConfigParse.java b/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/utils/SchemaConfigParse.java
new file mode 100644
index 0000000..c3076b4
--- /dev/null
+++ b/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/utils/SchemaConfigParse.java
@@ -0,0 +1,73 @@
+package com.geedgenetworks.bootstrap.utils;
+
+import com.alibaba.fastjson2.JSON;
+import com.geedgenetworks.common.exception.CommonErrorCode;
+import com.geedgenetworks.common.exception.ConfigValidationException;
+import com.geedgenetworks.core.connector.schema.Schema;
+import com.geedgenetworks.core.connector.schema.SchemaParser;
+import com.geedgenetworks.core.types.StructType;
+import com.geedgenetworks.core.types.Types;
+import org.apache.commons.io.FileUtils;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+import java.util.List;
+import java.util.Map;
+
+public class SchemaConfigParse {
+    static final String KEY_BUILTIN = "fields";
+    static final String KEY_LOCAL_FILE = "local_file";
+    static final String KEY_HTTP = "url";
+
+    public static Schema parseSchemaConfig(Map<String, Object> schemaConfig) {
+        if (schemaConfig == null || schemaConfig.isEmpty()) {
+            return null;
+        }
+
+        int builtin = 0, localFile = 0, http = 0;
+        if (schemaConfig.containsKey(KEY_BUILTIN)) {
+            builtin = 1;
+        }
+        if (schemaConfig.containsKey(KEY_LOCAL_FILE)) {
+            localFile = 1;
+        }
+        if (schemaConfig.containsKey(KEY_HTTP)) {
+            http = 1;
+        }
+        if (builtin + localFile + http > 1) {
+            throw new ConfigValidationException(CommonErrorCode.CONFIG_VALIDATION_FAILED, "schema must be configured through only one of fields/local_file/url: " + schemaConfig);
+        }
+
+        if (builtin == 1) {
+            Object fields = schemaConfig.get(KEY_BUILTIN);
+            if (fields instanceof List) {
+                StructType dataType = Types.parseSchemaFromJson(JSON.toJSONString(fields));
+                return Schema.newSchema(dataType);
+            } else if (fields instanceof String) {
+                StructType dataType = Types.parseStructType((String) fields);
+                return Schema.newSchema(dataType);
+            } else {
+                throw new ConfigValidationException(CommonErrorCode.CONFIG_VALIDATION_FAILED, "schema fields must be a list or a string: " + fields);
+            }
+        }
+
+        if (localFile == 1) {
+            String path = schemaConfig.get(KEY_LOCAL_FILE).toString();
+            try {
+                String content = FileUtils.readFileToString(new File(path), StandardCharsets.UTF_8);
+                StructType dataType = SchemaParser.PARSER_AVRO.parser(content);
+                return Schema.newSchema(dataType);
+            } catch (IOException e) {
+                throw new ConfigValidationException(CommonErrorCode.CONFIG_VALIDATION_FAILED, "schema path read error: " + path, e);
+            }
+        }
+
+        if (http == 1) {
+            String url = schemaConfig.get(KEY_HTTP).toString();
+            return Schema.newHttpDynamicSchema(url);
+        }
+
+        return null;
+    }
+}
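For reference, a brief sketch of how this parser can be exercised, consistent with how SourceExecutor and SinkExecutor call it above. The wrapper class and variable names are illustrative only and not part of the merge:

```java
// Illustrative only: exercises SchemaConfigParse with two of the three config variants.
// Schema and its getDataType()/isDynamic() methods come from groot-core as used in the executors above.
import com.geedgenetworks.bootstrap.utils.SchemaConfigParse;
import com.geedgenetworks.core.connector.schema.Schema;

import java.util.HashMap;
import java.util.Map;

public class SchemaConfigParseExample {
    public static void main(String[] args) {
        // Inline fields given as a SQL-style struct string: yields a static schema.
        Map<String, Object> inline = new HashMap<>();
        inline.put("fields", "struct<log_id:bigint, recv_time:bigint, client_ip:string>");
        Schema staticSchema = SchemaConfigParse.parseSchemaConfig(inline);
        System.out.println(staticSchema.getDataType().treeString());

        // URL-based config yields a dynamic schema; the executors reject it unless the
        // connector's provider reports supportDynamicSchema() == true.
        Map<String, Object> byUrl = new HashMap<>();
        byUrl.put("url", "http://127.0.0.1/schema.json");
        Schema dynamicSchema = SchemaConfigParse.parseSchemaConfig(byUrl);
        System.out.println(dynamicSchema.isDynamic()); // expected: true for URL-based schema
    }
}
```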
diff --git a/groot-common/pom.xml b/groot-common/pom.xml index 8ff4910..d6463cb 100644 --- a/groot-common/pom.xml +++ b/groot-common/pom.xml @@ -42,6 +42,11 @@ </dependency> <dependency> + <groupId>org.apache.avro</groupId> + <artifactId>avro</artifactId> + </dependency> + + <dependency> <groupId>com.geedgenetworks</groupId> <artifactId>galaxy</artifactId> <exclusions> diff --git a/groot-connectors/connector-clickhouse/src/main/java/com/geedgenetworks/connectors/clickhouse/ClickHouseTableFactory.java b/groot-connectors/connector-clickhouse/src/main/java/com/geedgenetworks/connectors/clickhouse/ClickHouseTableFactory.java index 9462808..96a73b9 100644 --- a/groot-connectors/connector-clickhouse/src/main/java/com/geedgenetworks/connectors/clickhouse/ClickHouseTableFactory.java +++ b/groot-connectors/connector-clickhouse/src/main/java/com/geedgenetworks/connectors/clickhouse/ClickHouseTableFactory.java @@ -1,6 +1,7 @@ package com.geedgenetworks.connectors.clickhouse; import com.geedgenetworks.connectors.clickhouse.sink.EventBatchIntervalClickHouseSink; +import com.geedgenetworks.core.connector.schema.Schema; import com.geedgenetworks.core.connector.sink.SinkProvider; import com.geedgenetworks.core.factories.FactoryUtil; import com.geedgenetworks.core.factories.FactoryUtil.TableFactoryHelper; @@ -32,8 +33,7 @@ public class ClickHouseTableFactory implements SinkTableFactory { helper.validateExcept(CONNECTION_INFO_PREFIX); // 校验参数 - // sink暂时没有dataType - //StructType dataType = context.getSchema(); + Schema schema = context.getSchema(); ReadableConfig config = context.getConfiguration(); String host = config.get(HOST); @@ -42,13 +42,18 @@ public class ClickHouseTableFactory implements SinkTableFactory { long batchIntervalMs = config.get(BATCH_INTERVAL).toMillis(); Properties connInfo = getClickHouseConnInfo(context.getOptions()); - final SinkFunction<Event> sinkFunction = new EventBatchIntervalClickHouseSink(batchSize, batchIntervalMs, host, table, connInfo); + final SinkFunction<Event> sinkFunction = new EventBatchIntervalClickHouseSink(schema, batchSize, batchIntervalMs, host, table, connInfo); return new SinkProvider() { @Override public DataStreamSink<?> consumeDataStream(DataStream<Event> dataStream) { return dataStream.addSink(sinkFunction); } + + @Override + public boolean supportDynamicSchema() { + return true; + } }; } diff --git a/groot-connectors/connector-clickhouse/src/main/java/com/geedgenetworks/connectors/clickhouse/sink/AbstractBatchIntervalClickHouseSink.java b/groot-connectors/connector-clickhouse/src/main/java/com/geedgenetworks/connectors/clickhouse/sink/AbstractBatchIntervalClickHouseSink.java index ce58cda..18ce5b4 100644 --- a/groot-connectors/connector-clickhouse/src/main/java/com/geedgenetworks/connectors/clickhouse/sink/AbstractBatchIntervalClickHouseSink.java +++ b/groot-connectors/connector-clickhouse/src/main/java/com/geedgenetworks/connectors/clickhouse/sink/AbstractBatchIntervalClickHouseSink.java @@ -196,6 +196,7 @@ public abstract class AbstractBatchIntervalClickHouseSink<T> extends RichSinkFun }
void onInit(Configuration parameters) throws Exception {}
+ void onClose() throws Exception {}
abstract boolean addBatch(Block batch, T data) throws Exception;
@@ -327,7 +328,11 @@ public abstract class AbstractBatchIntervalClickHouseSink<T> extends RichSinkFun
             closed = true;
if (outThread != null) {
- outThread.join();
+ try {
+ outThread.join();
+ } catch (Exception e) {
+ // swallow InterruptedException so close() can proceed even if the flush thread is interrupted
+ }
}
// init may have thrown an exception
@@ -340,13 +345,19 @@ public abstract class AbstractBatchIntervalClickHouseSink<T> extends RichSinkFun
             }
// Cached Blocks do not need to return the ColumnWriterBuffer allocated by each IColumn; it will be garbage collected.
// The ConcurrentLinkedDeque<ColumnWriterBuffer> cache pool does not track the total or in-use size of the list and does not cap its size, so not returning the ColumnWriterBuffer is fine.
- } catch (Exception e) {
- flushException = e;
+ } catch (Throwable t) {
+ if(t instanceof Exception){
+ flushException = (Exception) t;
+ }else{
+ flushException = new Exception(t);
+ }
} finally {
lock.unlock();
}
}
+ onClose();
+
LOG.warn("ck_sink_close_end");
}
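The EventBatchIntervalClickHouseSink diff below builds on these hooks. As a quick orientation, here is a minimal hypothetical subclass showing where onInit, addBatch, and the newly added onClose fit; it assumes addBatch is the only abstract member and mirrors the package-private visibility shown above:

```java
// Illustrative subclass only; the real implementation in this merge is EventBatchIntervalClickHouseSink.
package com.geedgenetworks.connectors.clickhouse.sink;

import com.geedgenetworks.common.Event;
import com.github.housepower.data.Block;
import org.apache.flink.configuration.Configuration;

import java.util.Properties;

public class LoggingClickHouseSink extends AbstractBatchIntervalClickHouseSink<Event> {

    public LoggingClickHouseSink(int batchSize, long batchIntervalMs,
                                 String host, String table, Properties connInfo) {
        super(batchSize, batchIntervalMs, host, table, connInfo);
    }

    @Override
    void onInit(Configuration parameters) throws Exception {
        super.onInit(parameters);
        // one-time per-subtask setup, e.g. registering for schema changes
    }

    @Override
    boolean addBatch(Block batch, Event event) throws Exception {
        // map event fields onto the block columns; return true to count the row
        return true;
    }

    @Override
    void onClose() throws Exception {
        super.onClose();
        // release per-subtask resources; invoked after the flush thread has stopped
    }
}
```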
diff --git a/groot-connectors/connector-clickhouse/src/main/java/com/geedgenetworks/connectors/clickhouse/sink/EventBatchIntervalClickHouseSink.java b/groot-connectors/connector-clickhouse/src/main/java/com/geedgenetworks/connectors/clickhouse/sink/EventBatchIntervalClickHouseSink.java index 497f14c..c38324a 100644 --- a/groot-connectors/connector-clickhouse/src/main/java/com/geedgenetworks/connectors/clickhouse/sink/EventBatchIntervalClickHouseSink.java +++ b/groot-connectors/connector-clickhouse/src/main/java/com/geedgenetworks/connectors/clickhouse/sink/EventBatchIntervalClickHouseSink.java @@ -3,24 +3,51 @@ package com.geedgenetworks.connectors.clickhouse.sink; import com.alibaba.fastjson2.JSON; import com.geedgenetworks.common.Event; +import com.geedgenetworks.core.connector.schema.Schema; +import com.geedgenetworks.core.connector.schema.SchemaChangeAware; +import com.geedgenetworks.core.types.StructType; import com.github.housepower.data.Block; +import org.apache.flink.configuration.Configuration; -import java.util.Map; -import java.util.Properties; +import java.util.*; +import java.util.stream.Collectors; -public class EventBatchIntervalClickHouseSink extends AbstractBatchIntervalClickHouseSink<Event> { +public class EventBatchIntervalClickHouseSink extends AbstractBatchIntervalClickHouseSink<Event> implements SchemaChangeAware { + private Schema schema; + private Set<String> disabledFields; + private String simpleName; - public EventBatchIntervalClickHouseSink( - int batchSize, long batchIntervalMs, String host, String table, Properties connInfo) { + public EventBatchIntervalClickHouseSink(Schema schema, int batchSize, long batchIntervalMs, String host, String table, Properties connInfo) { super(batchSize, batchIntervalMs, host, table, connInfo); + this.schema = schema; + } + + @Override + void onInit(Configuration parameters) throws Exception { + super.onInit(parameters); + simpleName = this.getClass().getSimpleName() + "_" + getRuntimeContext().getIndexOfThisSubtask(); + if(schema != null){ + updateDisabledFields(schema.getDataType()); + if(schema.isDynamic()){ + Schema.registerSchemaChangeAware(schema, this); + } + } } @Override boolean addBatch(Block batch, Event event) throws Exception { Map<String, Object> map = event.getExtractedFields(); + String columnName; Object value; for (int i = 0; i < columnNames.length; i++) { - value = map.get(columnNames[i]); + columnName = columnNames[i]; + if(disabledFields != null && disabledFields.contains(columnName)){ + value = columnDefaultValues[i]; + batch.setObject(i, value); // 默认值不用转换 + continue; + } + + value = map.get(columnName); if (value == null) { value = columnDefaultValues[i]; @@ -39,4 +66,36 @@ public class EventBatchIntervalClickHouseSink extends AbstractBatchIntervalClick return true; } + + @Override + public void schemaChange(StructType dataType) { + if(schema != null && schema.isDynamic()){ + updateDisabledFields(dataType); + } + } + + @Override + void onClose() throws Exception { + super.onClose(); + if(schema != null && schema.isDynamic()){ + Schema.unregisterSchemaChangeAware(schema, this); + } + } + + private void updateDisabledFields(StructType dataType){ + Set<String> disabledFields = new HashSet<>(); + Set<String> names = Arrays.stream(dataType.fields).map(x -> x.name).collect(Collectors.toSet()); + for (String columnName : this.columnNames) { + if(!names.contains(columnName)){ + disabledFields.add(columnName); + } + } + LOG.info("disabledFields: {}", disabledFields); + this.disabledFields = 
disabledFields.isEmpty()?null: disabledFields; + } + + @Override + public String toString() { + return simpleName; + } } diff --git a/groot-connectors/connector-kafka/src/main/java/com/geedgenetworks/connectors/kafka/KafkaSinkProvider.java b/groot-connectors/connector-kafka/src/main/java/com/geedgenetworks/connectors/kafka/KafkaSinkProvider.java index a50c1e4..761d1ad 100644 --- a/groot-connectors/connector-kafka/src/main/java/com/geedgenetworks/connectors/kafka/KafkaSinkProvider.java +++ b/groot-connectors/connector-kafka/src/main/java/com/geedgenetworks/connectors/kafka/KafkaSinkProvider.java @@ -7,7 +7,7 @@ import com.geedgenetworks.core.types.StructType; import org.apache.flink.api.common.serialization.SerializationSchema; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.datastream.DataStreamSink; -import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer2; +import org.apache.flink.streaming.connectors.kafka.GrootFlinkKafkaProducer; import java.util.Optional; import java.util.Properties; @@ -36,7 +36,7 @@ public class KafkaSinkProvider implements SinkProvider { @Override public DataStreamSink<?> consumeDataStream(DataStream<Event> dataStream) { - FlinkKafkaProducer2<Event> kafkaProducer = new FlinkKafkaProducer2<>( + GrootFlinkKafkaProducer<Event> kafkaProducer = new GrootFlinkKafkaProducer<>( topic, valueSerialization, properties, diff --git a/groot-connectors/connector-kafka/src/main/java/com/geedgenetworks/connectors/kafka/KafkaSourceProvider.java b/groot-connectors/connector-kafka/src/main/java/com/geedgenetworks/connectors/kafka/KafkaSourceProvider.java index ce0ddf8..ad34557 100644 --- a/groot-connectors/connector-kafka/src/main/java/com/geedgenetworks/connectors/kafka/KafkaSourceProvider.java +++ b/groot-connectors/connector-kafka/src/main/java/com/geedgenetworks/connectors/kafka/KafkaSourceProvider.java @@ -7,7 +7,7 @@ import com.geedgenetworks.core.types.StructType; import org.apache.flink.api.common.serialization.DeserializationSchema; import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; -import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer2; +import org.apache.flink.streaming.connectors.kafka.GrootFlinkKafkaConsumer; import java.util.List; import java.util.Properties; @@ -34,7 +34,7 @@ public class KafkaSourceProvider implements SourceProvider { @Override public SingleOutputStreamOperator<Event> produceDataStream(StreamExecutionEnvironment env) { - FlinkKafkaConsumer2<Event> kafkaConsumer = new FlinkKafkaConsumer2<>( + GrootFlinkKafkaConsumer<Event> kafkaConsumer = new GrootFlinkKafkaConsumer<>( topics, new EventKafkaDeserializationSchema(valueDeserialization), properties diff --git a/groot-connectors/connector-kafka/src/main/java/com/geedgenetworks/connectors/kafka/KafkaTableFactory.java b/groot-connectors/connector-kafka/src/main/java/com/geedgenetworks/connectors/kafka/KafkaTableFactory.java index b26c161..8829076 100644 --- a/groot-connectors/connector-kafka/src/main/java/com/geedgenetworks/connectors/kafka/KafkaTableFactory.java +++ b/groot-connectors/connector-kafka/src/main/java/com/geedgenetworks/connectors/kafka/KafkaTableFactory.java @@ -51,7 +51,7 @@ public class KafkaTableFactory implements SourceTableFactory, SinkTableFactory { helper.validateExcept(PROPERTIES_PREFIX); // 校验参数,排除properties.参数
- StructType dataType = context.getSchema();
+ StructType dataType = context.getDataType();
ReadableConfig config = context.getConfiguration();
String topic = config.get(TOPIC).get(0);
diff --git a/groot-connectors/connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer2.java b/groot-connectors/connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/GrootFlinkKafkaConsumer.java index d37121d..4721969 100644 --- a/groot-connectors/connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer2.java +++ b/groot-connectors/connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/GrootFlinkKafkaConsumer.java @@ -6,7 +6,7 @@ import org.apache.flink.metrics.MetricGroup; import org.apache.flink.streaming.api.operators.StreamingRuntimeContext;
import org.apache.flink.streaming.connectors.kafka.config.OffsetCommitMode;
import org.apache.flink.streaming.connectors.kafka.internals.AbstractFetcher;
-import org.apache.flink.streaming.connectors.kafka.internals.KafkaFetcher2;
+import org.apache.flink.streaming.connectors.kafka.internals.GrootKafkaFetcher;
import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartition;
import org.apache.flink.util.SerializedValue;
@@ -15,9 +15,9 @@ import java.util.Map; import java.util.Properties;
@PublicEvolving
-public class FlinkKafkaConsumer2<T> extends FlinkKafkaConsumer<T> {
+public class GrootFlinkKafkaConsumer<T> extends FlinkKafkaConsumer<T> {
- public FlinkKafkaConsumer2(List<String> topics, KafkaDeserializationSchema<T> deserializer, Properties props) {
+ public GrootFlinkKafkaConsumer(List<String> topics, KafkaDeserializationSchema<T> deserializer, Properties props) {
super(topics, deserializer, props);
}
@@ -36,7 +36,7 @@ public class FlinkKafkaConsumer2<T> extends FlinkKafkaConsumer<T> { // this overwrites whatever setting the user configured in the properties
adjustAutoCommitConfig(properties, offsetCommitMode);
- return new KafkaFetcher2<>(
+ return new GrootKafkaFetcher<>(
sourceContext,
assignedPartitionsWithInitialOffsets,
watermarkStrategy,
diff --git a/groot-connectors/connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer2.java b/groot-connectors/connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/GrootFlinkKafkaProducer.java index 818df4e..ff0bb34 100644 --- a/groot-connectors/connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer2.java +++ b/groot-connectors/connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/GrootFlinkKafkaProducer.java @@ -77,15 +77,15 @@ import static org.apache.flink.util.Preconditions.checkState; /** * Flink Sink to produce data into a Kafka topic. By default producer will use {@link - * FlinkKafkaProducer2.Semantic#AT_LEAST_ONCE} semantic. Before using {@link - * FlinkKafkaProducer2.Semantic#EXACTLY_ONCE} please refer to Flink's Kafka connector documentation. + * GrootFlinkKafkaProducer.Semantic#AT_LEAST_ONCE} semantic. Before using {@link + * GrootFlinkKafkaProducer.Semantic#EXACTLY_ONCE} please refer to Flink's Kafka connector documentation. */ @PublicEvolving -public class FlinkKafkaProducer2<IN> +public class GrootFlinkKafkaProducer<IN> extends TwoPhaseCommitSinkFunction< IN, - FlinkKafkaProducer2.KafkaTransactionState, - FlinkKafkaProducer2.KafkaTransactionContext> { + GrootFlinkKafkaProducer.KafkaTransactionState, + GrootFlinkKafkaProducer.KafkaTransactionContext> { /** * Semantics that can be chosen. @@ -99,13 +99,13 @@ public class FlinkKafkaProducer2<IN> * Semantic.EXACTLY_ONCE the Flink producer will write all messages in a Kafka transaction * that will be committed to Kafka on a checkpoint. * - * <p>In this mode {@link FlinkKafkaProducer2} sets up a pool of {@link + * <p>In this mode {@link GrootFlinkKafkaProducer} sets up a pool of {@link * FlinkKafkaInternalProducer}. Between each checkpoint a Kafka transaction is created, - * which is committed on {@link FlinkKafkaProducer2#notifyCheckpointComplete(long)}. If - * checkpoint complete notifications are running late, {@link FlinkKafkaProducer2} can run + * which is committed on {@link GrootFlinkKafkaProducer#notifyCheckpointComplete(long)}. If + * checkpoint complete notifications are running late, {@link GrootFlinkKafkaProducer} can run * out of {@link FlinkKafkaInternalProducer}s in the pool. In that case any subsequent - * {@link FlinkKafkaProducer2#snapshotState(FunctionSnapshotContext)} requests will fail and - * {@link FlinkKafkaProducer2} will keep using the {@link FlinkKafkaInternalProducer} from + * {@link GrootFlinkKafkaProducer#snapshotState(FunctionSnapshotContext)} requests will fail and + * {@link GrootFlinkKafkaProducer} will keep using the {@link FlinkKafkaInternalProducer} from * the previous checkpoint. To decrease the chance of failing checkpoints there are four * options: * <li>decrease number of max concurrent checkpoints @@ -128,7 +128,7 @@ public class FlinkKafkaProducer2<IN> NONE } - private static final Logger LOG = LoggerFactory.getLogger(FlinkKafkaProducer2.class); + private static final Logger LOG = LoggerFactory.getLogger(GrootFlinkKafkaProducer.class); private static final long serialVersionUID = 1L; @@ -142,8 +142,8 @@ public class FlinkKafkaProducer2<IN> * This coefficient determines what is the safe scale down factor. 
* * <p>If the Flink application previously failed before first checkpoint completed or we are - * starting new batch of {@link FlinkKafkaProducer2} from scratch without clean shutdown of the - * previous one, {@link FlinkKafkaProducer2} doesn't know what was the set of previously used + * starting new batch of {@link GrootFlinkKafkaProducer} from scratch without clean shutdown of the + * previous one, {@link GrootFlinkKafkaProducer} doesn't know what was the set of previously used * Kafka's transactionalId's. In that case, it will try to play safe and abort all of the * possible transactionalIds from the range of: {@code [0, getNumberOfParallelSubtasks() * * kafkaProducersPoolSize * SAFE_SCALE_DOWN_FACTOR) } @@ -158,7 +158,7 @@ public class FlinkKafkaProducer2<IN> /** * Default number of KafkaProducers in the pool. See {@link - * FlinkKafkaProducer2.Semantic#EXACTLY_ONCE}. + * GrootFlinkKafkaProducer.Semantic#EXACTLY_ONCE}. */ public static final int DEFAULT_KAFKA_PRODUCERS_POOL_SIZE = 5; @@ -174,27 +174,27 @@ public class FlinkKafkaProducer2<IN> * NEXT_TRANSACTIONAL_ID_HINT_DESCRIPTOR_V2. */ @Deprecated - private static final ListStateDescriptor<FlinkKafkaProducer2.NextTransactionalIdHint> + private static final ListStateDescriptor<GrootFlinkKafkaProducer.NextTransactionalIdHint> NEXT_TRANSACTIONAL_ID_HINT_DESCRIPTOR = new ListStateDescriptor<>( "next-transactional-id-hint", TypeInformation.of(NextTransactionalIdHint.class)); - private static final ListStateDescriptor<FlinkKafkaProducer2.NextTransactionalIdHint> + private static final ListStateDescriptor<GrootFlinkKafkaProducer.NextTransactionalIdHint> NEXT_TRANSACTIONAL_ID_HINT_DESCRIPTOR_V2 = new ListStateDescriptor<>( "next-transactional-id-hint-v2", new NextTransactionalIdHintSerializer()); /** State for nextTransactionalIdHint. */ - private transient ListState<FlinkKafkaProducer2.NextTransactionalIdHint> + private transient ListState<GrootFlinkKafkaProducer.NextTransactionalIdHint> nextTransactionalIdHintState; /** Generator for Transactional IDs. */ private transient TransactionalIdsGenerator transactionalIdsGenerator; /** Hint for picking next transactional id. */ - private transient FlinkKafkaProducer2.NextTransactionalIdHint nextTransactionalIdHint; + private transient GrootFlinkKafkaProducer.NextTransactionalIdHint nextTransactionalIdHint; /** User defined properties for the Producer. */ protected final Properties producerConfig; @@ -236,7 +236,7 @@ public class FlinkKafkaProducer2<IN> private boolean logFailuresOnly; /** Semantic chosen for this instance. */ - protected FlinkKafkaProducer2.Semantic semantic; + protected GrootFlinkKafkaProducer.Semantic semantic; // -------------------------------- Runtime fields ------------------------------------------ @@ -263,7 +263,7 @@ public class FlinkKafkaProducer2<IN> * @param topicId ID of the Kafka topic. * @param serializationSchema User defined (keyless) serialization schema. */ - public FlinkKafkaProducer2( + public GrootFlinkKafkaProducer( String brokerList, String topicId, SerializationSchema<IN> serializationSchema) { this(topicId, serializationSchema, getPropertiesFromBrokerList(brokerList)); } @@ -275,14 +275,14 @@ public class FlinkKafkaProducer2<IN> * partitioner. This default partitioner maps each sink subtask to a single Kafka partition * (i.e. all records received by a sink subtask will end up in the same Kafka partition). 
* - * <p>To use a custom partitioner, please use {@link #FlinkKafkaProducer2(String, + * <p>To use a custom partitioner, please use {@link #GrootFlinkKafkaProducer(String, * SerializationSchema, Properties, Optional)} instead. * * @param topicId ID of the Kafka topic. * @param serializationSchema User defined key-less serialization schema. * @param producerConfig Properties with the producer configuration. */ - public FlinkKafkaProducer2( + public GrootFlinkKafkaProducer( String topicId, SerializationSchema<IN> serializationSchema, Properties producerConfig) { @@ -311,7 +311,7 @@ public class FlinkKafkaProducer2<IN> * partitions. If a partitioner is not provided, records will be distributed to Kafka * partitions in a round-robin fashion. */ - public FlinkKafkaProducer2( + public GrootFlinkKafkaProducer( String topicId, SerializationSchema<IN> serializationSchema, Properties producerConfig, @@ -343,16 +343,16 @@ public class FlinkKafkaProducer2<IN> * partitions. If a partitioner is not provided, records will be distributed to Kafka * partitions in a round-robin fashion. * @param semantic Defines semantic that will be used by this producer (see {@link - * FlinkKafkaProducer2.Semantic}). + * GrootFlinkKafkaProducer.Semantic}). * @param kafkaProducersPoolSize Overwrite default KafkaProducers pool size (see {@link - * FlinkKafkaProducer2.Semantic#EXACTLY_ONCE}). + * GrootFlinkKafkaProducer.Semantic#EXACTLY_ONCE}). */ - public FlinkKafkaProducer2( + public GrootFlinkKafkaProducer( String topicId, SerializationSchema<IN> serializationSchema, Properties producerConfig, @Nullable FlinkKafkaPartitioner<IN> customPartitioner, - FlinkKafkaProducer2.Semantic semantic, + GrootFlinkKafkaProducer.Semantic semantic, int kafkaProducersPoolSize) { this( topicId, @@ -374,17 +374,17 @@ public class FlinkKafkaProducer2<IN> * partitioner. This default partitioner maps each sink subtask to a single Kafka partition * (i.e. all records received by a sink subtask will end up in the same Kafka partition). * - * <p>To use a custom partitioner, please use {@link #FlinkKafkaProducer2(String, + * <p>To use a custom partitioner, please use {@link #GrootFlinkKafkaProducer(String, * KeyedSerializationSchema, Properties, Optional)} instead. * * @param brokerList Comma separated addresses of the brokers * @param topicId ID of the Kafka topic. * @param serializationSchema User defined serialization schema supporting key/value messages - * @deprecated use {@link #FlinkKafkaProducer2(String, KafkaSerializationSchema, Properties, - * FlinkKafkaProducer2.Semantic)} + * @deprecated use {@link #GrootFlinkKafkaProducer(String, KafkaSerializationSchema, Properties, + * GrootFlinkKafkaProducer.Semantic)} */ @Deprecated - public FlinkKafkaProducer2( + public GrootFlinkKafkaProducer( String brokerList, String topicId, KeyedSerializationSchema<IN> serializationSchema) { this( topicId, @@ -400,17 +400,17 @@ public class FlinkKafkaProducer2<IN> * partitioner. This default partitioner maps each sink subtask to a single Kafka partition * (i.e. all records received by a sink subtask will end up in the same Kafka partition). * - * <p>To use a custom partitioner, please use {@link #FlinkKafkaProducer2(String, + * <p>To use a custom partitioner, please use {@link #GrootFlinkKafkaProducer(String, * KeyedSerializationSchema, Properties, Optional)} instead. * * @param topicId ID of the Kafka topic. 
* @param serializationSchema User defined serialization schema supporting key/value messages * @param producerConfig Properties with the producer configuration. - * @deprecated use {@link #FlinkKafkaProducer2(String, KafkaSerializationSchema, Properties, - * FlinkKafkaProducer2.Semantic)} + * @deprecated use {@link #GrootFlinkKafkaProducer(String, KafkaSerializationSchema, Properties, + * GrootFlinkKafkaProducer.Semantic)} */ @Deprecated - public FlinkKafkaProducer2( + public GrootFlinkKafkaProducer( String topicId, KeyedSerializationSchema<IN> serializationSchema, Properties producerConfig) { @@ -432,16 +432,16 @@ public class FlinkKafkaProducer2<IN> * @param serializationSchema User defined serialization schema supporting key/value messages * @param producerConfig Properties with the producer configuration. * @param semantic Defines semantic that will be used by this producer (see {@link - * FlinkKafkaProducer2.Semantic}). - * @deprecated use {@link #FlinkKafkaProducer2(String, KafkaSerializationSchema, Properties, - * FlinkKafkaProducer2.Semantic)} + * GrootFlinkKafkaProducer.Semantic}). + * @deprecated use {@link #GrootFlinkKafkaProducer(String, KafkaSerializationSchema, Properties, + * GrootFlinkKafkaProducer.Semantic)} */ @Deprecated - public FlinkKafkaProducer2( + public GrootFlinkKafkaProducer( String topicId, KeyedSerializationSchema<IN> serializationSchema, Properties producerConfig, - FlinkKafkaProducer2.Semantic semantic) { + GrootFlinkKafkaProducer.Semantic semantic) { this( topicId, serializationSchema, @@ -472,11 +472,11 @@ public class FlinkKafkaProducer2<IN> * each record (determined by {@link KeyedSerializationSchema#serializeKey(Object)}). If the * keys are {@code null}, then records will be distributed to Kafka partitions in a * round-robin fashion. - * @deprecated use {@link #FlinkKafkaProducer2(String, KafkaSerializationSchema, Properties, - * FlinkKafkaProducer2.Semantic)} + * @deprecated use {@link #GrootFlinkKafkaProducer(String, KafkaSerializationSchema, Properties, + * GrootFlinkKafkaProducer.Semantic)} */ @Deprecated - public FlinkKafkaProducer2( + public GrootFlinkKafkaProducer( String defaultTopicId, KeyedSerializationSchema<IN> serializationSchema, Properties producerConfig, @@ -486,7 +486,7 @@ public class FlinkKafkaProducer2<IN> serializationSchema, producerConfig, customPartitioner, - FlinkKafkaProducer2.Semantic.AT_LEAST_ONCE, + GrootFlinkKafkaProducer.Semantic.AT_LEAST_ONCE, DEFAULT_KAFKA_PRODUCERS_POOL_SIZE); } @@ -512,19 +512,19 @@ public class FlinkKafkaProducer2<IN> * keys are {@code null}, then records will be distributed to Kafka partitions in a * round-robin fashion. * @param semantic Defines semantic that will be used by this producer (see {@link - * FlinkKafkaProducer2.Semantic}). + * GrootFlinkKafkaProducer.Semantic}). * @param kafkaProducersPoolSize Overwrite default KafkaProducers pool size (see {@link - * FlinkKafkaProducer2.Semantic#EXACTLY_ONCE}). - * @deprecated use {@link #FlinkKafkaProducer2(String, KafkaSerializationSchema, Properties, - * FlinkKafkaProducer2.Semantic)} + * GrootFlinkKafkaProducer.Semantic#EXACTLY_ONCE}). 
+ * @deprecated use {@link #GrootFlinkKafkaProducer(String, KafkaSerializationSchema, Properties, + * GrootFlinkKafkaProducer.Semantic)} */ @Deprecated - public FlinkKafkaProducer2( + public GrootFlinkKafkaProducer( String defaultTopicId, KeyedSerializationSchema<IN> serializationSchema, Properties producerConfig, Optional<FlinkKafkaPartitioner<IN>> customPartitioner, - FlinkKafkaProducer2.Semantic semantic, + GrootFlinkKafkaProducer.Semantic semantic, int kafkaProducersPoolSize) { this( defaultTopicId, @@ -537,7 +537,7 @@ public class FlinkKafkaProducer2<IN> } /** - * Creates a {@link FlinkKafkaProducer2} for a given topic. The sink produces its input to the + * Creates a {@link GrootFlinkKafkaProducer} for a given topic. The sink produces its input to the * topic. It accepts a {@link KafkaSerializationSchema} for serializing records to a {@link * ProducerRecord}, including partitioning information. * @@ -547,13 +547,13 @@ public class FlinkKafkaProducer2<IN> * @param producerConfig Configuration properties for the KafkaProducer. 'bootstrap.servers.' is * the only required argument. * @param semantic Defines semantic that will be used by this producer (see {@link - * FlinkKafkaProducer2.Semantic}). + * GrootFlinkKafkaProducer.Semantic}). */ - public FlinkKafkaProducer2( + public GrootFlinkKafkaProducer( String defaultTopic, KafkaSerializationSchema<IN> serializationSchema, Properties producerConfig, - FlinkKafkaProducer2.Semantic semantic) { + GrootFlinkKafkaProducer.Semantic semantic) { this( defaultTopic, serializationSchema, @@ -573,15 +573,15 @@ public class FlinkKafkaProducer2<IN> * @param producerConfig Configuration properties for the KafkaProducer. 'bootstrap.servers.' is * the only required argument. * @param semantic Defines semantic that will be used by this producer (see {@link - * FlinkKafkaProducer2.Semantic}). + * GrootFlinkKafkaProducer.Semantic}). * @param kafkaProducersPoolSize Overwrite default KafkaProducers pool size (see {@link - * FlinkKafkaProducer2.Semantic#EXACTLY_ONCE}). + * GrootFlinkKafkaProducer.Semantic#EXACTLY_ONCE}). */ - public FlinkKafkaProducer2( + public GrootFlinkKafkaProducer( String defaultTopic, KafkaSerializationSchema<IN> serializationSchema, Properties producerConfig, - FlinkKafkaProducer2.Semantic semantic, + GrootFlinkKafkaProducer.Semantic semantic, int kafkaProducersPoolSize) { this( defaultTopic, @@ -617,21 +617,21 @@ public class FlinkKafkaProducer2<IN> * @param producerConfig Configuration properties for the KafkaProducer. 'bootstrap.servers.' is * the only required argument. * @param semantic Defines semantic that will be used by this producer (see {@link - * FlinkKafkaProducer2.Semantic}). + * GrootFlinkKafkaProducer.Semantic}). * @param kafkaProducersPoolSize Overwrite default KafkaProducers pool size (see {@link - * FlinkKafkaProducer2.Semantic#EXACTLY_ONCE}). + * GrootFlinkKafkaProducer.Semantic#EXACTLY_ONCE}). 
*/ - private FlinkKafkaProducer2( + private GrootFlinkKafkaProducer( String defaultTopic, KeyedSerializationSchema<IN> keyedSchema, FlinkKafkaPartitioner<IN> customPartitioner, KafkaSerializationSchema<IN> kafkaSchema, Properties producerConfig, - FlinkKafkaProducer2.Semantic semantic, + GrootFlinkKafkaProducer.Semantic semantic, int kafkaProducersPoolSize) { super( - new FlinkKafkaProducer2.TransactionStateSerializer(), - new FlinkKafkaProducer2.ContextStateSerializer()); + new GrootFlinkKafkaProducer.TransactionStateSerializer(), + new GrootFlinkKafkaProducer.ContextStateSerializer()); this.defaultTopicId = checkNotNull(defaultTopic, "defaultTopic is null"); @@ -711,7 +711,7 @@ public class FlinkKafkaProducer2<IN> // Enable transactionTimeoutWarnings to avoid silent data loss // See KAFKA-6119 (affects versions 0.11.0.0 and 0.11.0.1): // The KafkaProducer may not throw an exception if the transaction failed to commit - if (semantic == FlinkKafkaProducer2.Semantic.EXACTLY_ONCE) { + if (semantic == GrootFlinkKafkaProducer.Semantic.EXACTLY_ONCE) { final Object object = this.producerConfig.get(ProducerConfig.TRANSACTION_TIMEOUT_CONFIG); final long transactionTimeout; @@ -771,7 +771,7 @@ public class FlinkKafkaProducer2<IN> * attempt at least one commit of the transaction before giving up. */ @Override - public FlinkKafkaProducer2<IN> ignoreFailuresAfterTransactionTimeout() { + public GrootFlinkKafkaProducer<IN> ignoreFailuresAfterTransactionTimeout() { super.ignoreFailuresAfterTransactionTimeout(); return this; } @@ -850,7 +850,7 @@ public class FlinkKafkaProducer2<IN> @Override public void invoke( - FlinkKafkaProducer2.KafkaTransactionState transaction, IN next, Context context) + GrootFlinkKafkaProducer.KafkaTransactionState transaction, IN next, Context context) throws FlinkKafkaException { checkErroneous(); @@ -976,24 +976,24 @@ public class FlinkKafkaProducer2<IN> // ------------------- Logic for handling checkpoint flushing -------------------------- // @Override - protected FlinkKafkaProducer2.KafkaTransactionState beginTransaction() + protected GrootFlinkKafkaProducer.KafkaTransactionState beginTransaction() throws FlinkKafkaException { switch (semantic) { case EXACTLY_ONCE: FlinkKafkaInternalProducer<byte[], byte[]> producer = createTransactionalProducer(); producer.beginTransaction(); - return new FlinkKafkaProducer2.KafkaTransactionState( + return new GrootFlinkKafkaProducer.KafkaTransactionState( producer.getTransactionalId(), producer); case AT_LEAST_ONCE: case NONE: // Do not create new producer on each beginTransaction() if it is not necessary - final FlinkKafkaProducer2.KafkaTransactionState currentTransaction = + final GrootFlinkKafkaProducer.KafkaTransactionState currentTransaction = currentTransaction(); if (currentTransaction != null && currentTransaction.producer != null) { - return new FlinkKafkaProducer2.KafkaTransactionState( + return new GrootFlinkKafkaProducer.KafkaTransactionState( currentTransaction.producer); } - return new FlinkKafkaProducer2.KafkaTransactionState( + return new GrootFlinkKafkaProducer.KafkaTransactionState( initNonTransactionalProducer(true)); default: throw new UnsupportedOperationException("Not implemented semantic"); @@ -1001,7 +1001,7 @@ public class FlinkKafkaProducer2<IN> } @Override - protected void preCommit(FlinkKafkaProducer2.KafkaTransactionState transaction) + protected void preCommit(GrootFlinkKafkaProducer.KafkaTransactionState transaction) throws FlinkKafkaException { switch (semantic) { case EXACTLY_ONCE: @@ -1017,7 +1017,7 @@ 
public class FlinkKafkaProducer2<IN> } @Override - protected void commit(FlinkKafkaProducer2.KafkaTransactionState transaction) { + protected void commit(GrootFlinkKafkaProducer.KafkaTransactionState transaction) { if (transaction.isTransactional()) { try { transaction.producer.commitTransaction(); @@ -1028,7 +1028,7 @@ public class FlinkKafkaProducer2<IN> } @Override - protected void recoverAndCommit(FlinkKafkaProducer2.KafkaTransactionState transaction) { + protected void recoverAndCommit(GrootFlinkKafkaProducer.KafkaTransactionState transaction) { if (transaction.isTransactional()) { FlinkKafkaInternalProducer<byte[], byte[]> producer = null; try { @@ -1051,7 +1051,7 @@ public class FlinkKafkaProducer2<IN> } @Override - protected void abort(FlinkKafkaProducer2.KafkaTransactionState transaction) { + protected void abort(GrootFlinkKafkaProducer.KafkaTransactionState transaction) { if (transaction.isTransactional()) { transaction.producer.abortTransaction(); recycleTransactionalProducer(transaction.producer); @@ -1059,7 +1059,7 @@ public class FlinkKafkaProducer2<IN> } @Override - protected void recoverAndAbort(FlinkKafkaProducer2.KafkaTransactionState transaction) { + protected void recoverAndAbort(GrootFlinkKafkaProducer.KafkaTransactionState transaction) { if (transaction.isTransactional()) { FlinkKafkaInternalProducer<byte[], byte[]> producer = null; try { @@ -1087,7 +1087,7 @@ public class FlinkKafkaProducer2<IN> * * @param transaction */ - private void flush(FlinkKafkaProducer2.KafkaTransactionState transaction) + private void flush(GrootFlinkKafkaProducer.KafkaTransactionState transaction) throws FlinkKafkaException { if (transaction.producer != null) { transaction.producer.flush(); @@ -1111,7 +1111,7 @@ public class FlinkKafkaProducer2<IN> // Otherwise all of the // subtasks would write exactly same information. if (getRuntimeContext().getIndexOfThisSubtask() == 0 - && semantic == FlinkKafkaProducer2.Semantic.EXACTLY_ONCE) { + && semantic == GrootFlinkKafkaProducer.Semantic.EXACTLY_ONCE) { checkState( nextTransactionalIdHint != null, "nextTransactionalIdHint must be set for EXACTLY_ONCE"); @@ -1129,7 +1129,7 @@ public class FlinkKafkaProducer2<IN> } nextTransactionalIdHintState.add( - new FlinkKafkaProducer2.NextTransactionalIdHint( + new GrootFlinkKafkaProducer.NextTransactionalIdHint( getRuntimeContext().getNumberOfParallelSubtasks(), nextFreeTransactionalId)); } @@ -1137,13 +1137,13 @@ public class FlinkKafkaProducer2<IN> @Override public void initializeState(FunctionInitializationContext context) throws Exception { - if (semantic != FlinkKafkaProducer2.Semantic.NONE + if (semantic != GrootFlinkKafkaProducer.Semantic.NONE && !((StreamingRuntimeContext) this.getRuntimeContext()).isCheckpointingEnabled()) { LOG.warn( "Using {} semantic, but checkpointing is not enabled. 
Switching to {} semantic.", semantic, - FlinkKafkaProducer2.Semantic.NONE); - semantic = FlinkKafkaProducer2.Semantic.NONE; + GrootFlinkKafkaProducer.Semantic.NONE); + semantic = GrootFlinkKafkaProducer.Semantic.NONE; } nextTransactionalIdHintState = @@ -1177,16 +1177,16 @@ public class FlinkKafkaProducer2<IN> kafkaProducersPoolSize, SAFE_SCALE_DOWN_FACTOR); - if (semantic != FlinkKafkaProducer2.Semantic.EXACTLY_ONCE) { + if (semantic != GrootFlinkKafkaProducer.Semantic.EXACTLY_ONCE) { nextTransactionalIdHint = null; } else { - ArrayList<FlinkKafkaProducer2.NextTransactionalIdHint> transactionalIdHints = + ArrayList<GrootFlinkKafkaProducer.NextTransactionalIdHint> transactionalIdHints = Lists.newArrayList(nextTransactionalIdHintState.get()); if (transactionalIdHints.size() > 1) { throw new IllegalStateException( "There should be at most one next transactional id hint written by the first subtask"); } else if (transactionalIdHints.size() == 0) { - nextTransactionalIdHint = new FlinkKafkaProducer2.NextTransactionalIdHint(0, 0); + nextTransactionalIdHint = new GrootFlinkKafkaProducer.NextTransactionalIdHint(0, 0); // this means that this is either: // (1) the first execution of this application @@ -1203,14 +1203,14 @@ public class FlinkKafkaProducer2<IN> } @Override - protected Optional<FlinkKafkaProducer2.KafkaTransactionContext> initializeUserContext() { - if (semantic != FlinkKafkaProducer2.Semantic.EXACTLY_ONCE) { + protected Optional<GrootFlinkKafkaProducer.KafkaTransactionContext> initializeUserContext() { + if (semantic != GrootFlinkKafkaProducer.Semantic.EXACTLY_ONCE) { return Optional.empty(); } Set<String> transactionalIds = generateNewTransactionalIds(); resetAvailableTransactionalIdsPool(transactionalIds); - return Optional.of(new FlinkKafkaProducer2.KafkaTransactionContext(transactionalIds)); + return Optional.of(new GrootFlinkKafkaProducer.KafkaTransactionContext(transactionalIds)); } private Set<String> generateNewTransactionalIds() { @@ -1227,7 +1227,7 @@ public class FlinkKafkaProducer2<IN> @Override protected void finishRecoveringContext( - Collection<FlinkKafkaProducer2.KafkaTransactionState> handledTransactions) { + Collection<GrootFlinkKafkaProducer.KafkaTransactionState> handledTransactions) { cleanUpUserContext(handledTransactions); resetAvailableTransactionalIdsPool(getUserContext().get().transactionalIds); LOG.info("Recovered transactionalIds {}", getUserContext().get().transactionalIds); @@ -1245,7 +1245,7 @@ public class FlinkKafkaProducer2<IN> * need further handling */ private void cleanUpUserContext( - Collection<FlinkKafkaProducer2.KafkaTransactionState> handledTransactions) { + Collection<GrootFlinkKafkaProducer.KafkaTransactionState> handledTransactions) { if (!getUserContext().isPresent()) { return; } @@ -1300,7 +1300,7 @@ public class FlinkKafkaProducer2<IN> } int getTransactionCoordinatorId() { - final FlinkKafkaProducer2.KafkaTransactionState currentTransaction = currentTransaction(); + final GrootFlinkKafkaProducer.KafkaTransactionState currentTransaction = currentTransaction(); if (currentTransaction == null || currentTransaction.producer == null) { throw new IllegalArgumentException(); } @@ -1520,8 +1520,8 @@ public class FlinkKafkaProducer2<IN> return false; } - FlinkKafkaProducer2.KafkaTransactionState that = - (FlinkKafkaProducer2.KafkaTransactionState) o; + GrootFlinkKafkaProducer.KafkaTransactionState that = + (GrootFlinkKafkaProducer.KafkaTransactionState) o; if (producerId != that.producerId) { return false; @@ -1544,7 +1544,7 @@ public class 
FlinkKafkaProducer2<IN> } /** - * Context associated to this instance of the {@link FlinkKafkaProducer2}. User for keeping track + * Context associated to this instance of the {@link GrootFlinkKafkaProducer}. User for keeping track * of the transactionalIds. */ @VisibleForTesting @@ -1567,8 +1567,8 @@ public class FlinkKafkaProducer2<IN> return false; } - FlinkKafkaProducer2.KafkaTransactionContext that = - (FlinkKafkaProducer2.KafkaTransactionContext) o; + GrootFlinkKafkaProducer.KafkaTransactionContext that = + (GrootFlinkKafkaProducer.KafkaTransactionContext) o; return transactionalIds.equals(that.transactionalIds); } @@ -1581,12 +1581,12 @@ public class FlinkKafkaProducer2<IN> /** * {@link org.apache.flink.api.common.typeutils.TypeSerializer} for {@link - * FlinkKafkaProducer2.KafkaTransactionState}. + * GrootFlinkKafkaProducer.KafkaTransactionState}. */ @VisibleForTesting @Internal public static class TransactionStateSerializer - extends TypeSerializerSingleton<FlinkKafkaProducer2.KafkaTransactionState> { + extends TypeSerializerSingleton<GrootFlinkKafkaProducer.KafkaTransactionState> { private static final long serialVersionUID = 1L; @@ -1596,20 +1596,20 @@ public class FlinkKafkaProducer2<IN> } @Override - public FlinkKafkaProducer2.KafkaTransactionState createInstance() { + public GrootFlinkKafkaProducer.KafkaTransactionState createInstance() { return null; } @Override - public FlinkKafkaProducer2.KafkaTransactionState copy( - FlinkKafkaProducer2.KafkaTransactionState from) { + public GrootFlinkKafkaProducer.KafkaTransactionState copy( + GrootFlinkKafkaProducer.KafkaTransactionState from) { return from; } @Override - public FlinkKafkaProducer2.KafkaTransactionState copy( - FlinkKafkaProducer2.KafkaTransactionState from, - FlinkKafkaProducer2.KafkaTransactionState reuse) { + public GrootFlinkKafkaProducer.KafkaTransactionState copy( + GrootFlinkKafkaProducer.KafkaTransactionState from, + GrootFlinkKafkaProducer.KafkaTransactionState reuse) { return from; } @@ -1620,7 +1620,7 @@ public class FlinkKafkaProducer2<IN> @Override public void serialize( - FlinkKafkaProducer2.KafkaTransactionState record, DataOutputView target) + GrootFlinkKafkaProducer.KafkaTransactionState record, DataOutputView target) throws IOException { if (record.transactionalId == null) { target.writeBoolean(false); @@ -1633,7 +1633,7 @@ public class FlinkKafkaProducer2<IN> } @Override - public FlinkKafkaProducer2.KafkaTransactionState deserialize(DataInputView source) + public GrootFlinkKafkaProducer.KafkaTransactionState deserialize(DataInputView source) throws IOException { String transactionalId = null; if (source.readBoolean()) { @@ -1641,13 +1641,13 @@ public class FlinkKafkaProducer2<IN> } long producerId = source.readLong(); short epoch = source.readShort(); - return new FlinkKafkaProducer2.KafkaTransactionState( + return new GrootFlinkKafkaProducer.KafkaTransactionState( transactionalId, producerId, epoch, null); } @Override - public FlinkKafkaProducer2.KafkaTransactionState deserialize( - FlinkKafkaProducer2.KafkaTransactionState reuse, DataInputView source) + public GrootFlinkKafkaProducer.KafkaTransactionState deserialize( + GrootFlinkKafkaProducer.KafkaTransactionState reuse, DataInputView source) throws IOException { return deserialize(source); } @@ -1666,7 +1666,7 @@ public class FlinkKafkaProducer2<IN> // ----------------------------------------------------------------------------------- @Override - public TypeSerializerSnapshot<FlinkKafkaProducer2.KafkaTransactionState> + public 
TypeSerializerSnapshot<GrootFlinkKafkaProducer.KafkaTransactionState> snapshotConfiguration() { return new TransactionStateSerializerSnapshot(); } @@ -1674,7 +1674,7 @@ public class FlinkKafkaProducer2<IN> /** Serializer configuration snapshot for compatibility and format evolution. */ @SuppressWarnings("WeakerAccess") public static final class TransactionStateSerializerSnapshot - extends SimpleTypeSerializerSnapshot<FlinkKafkaProducer2.KafkaTransactionState> { + extends SimpleTypeSerializerSnapshot<GrootFlinkKafkaProducer.KafkaTransactionState> { public TransactionStateSerializerSnapshot() { super(TransactionStateSerializer::new); @@ -1684,12 +1684,12 @@ public class FlinkKafkaProducer2<IN> /** * {@link org.apache.flink.api.common.typeutils.TypeSerializer} for {@link - * FlinkKafkaProducer2.KafkaTransactionContext}. + * GrootFlinkKafkaProducer.KafkaTransactionContext}. */ @VisibleForTesting @Internal public static class ContextStateSerializer - extends TypeSerializerSingleton<FlinkKafkaProducer2.KafkaTransactionContext> { + extends TypeSerializerSingleton<GrootFlinkKafkaProducer.KafkaTransactionContext> { private static final long serialVersionUID = 1L; @@ -1699,20 +1699,20 @@ public class FlinkKafkaProducer2<IN> } @Override - public FlinkKafkaProducer2.KafkaTransactionContext createInstance() { + public GrootFlinkKafkaProducer.KafkaTransactionContext createInstance() { return null; } @Override - public FlinkKafkaProducer2.KafkaTransactionContext copy( - FlinkKafkaProducer2.KafkaTransactionContext from) { + public GrootFlinkKafkaProducer.KafkaTransactionContext copy( + GrootFlinkKafkaProducer.KafkaTransactionContext from) { return from; } @Override - public FlinkKafkaProducer2.KafkaTransactionContext copy( - FlinkKafkaProducer2.KafkaTransactionContext from, - FlinkKafkaProducer2.KafkaTransactionContext reuse) { + public GrootFlinkKafkaProducer.KafkaTransactionContext copy( + GrootFlinkKafkaProducer.KafkaTransactionContext from, + GrootFlinkKafkaProducer.KafkaTransactionContext reuse) { return from; } @@ -1723,7 +1723,7 @@ public class FlinkKafkaProducer2<IN> @Override public void serialize( - FlinkKafkaProducer2.KafkaTransactionContext record, DataOutputView target) + GrootFlinkKafkaProducer.KafkaTransactionContext record, DataOutputView target) throws IOException { int numIds = record.transactionalIds.size(); target.writeInt(numIds); @@ -1733,19 +1733,19 @@ public class FlinkKafkaProducer2<IN> } @Override - public FlinkKafkaProducer2.KafkaTransactionContext deserialize(DataInputView source) + public GrootFlinkKafkaProducer.KafkaTransactionContext deserialize(DataInputView source) throws IOException { int numIds = source.readInt(); Set<String> ids = new HashSet<>(numIds); for (int i = 0; i < numIds; i++) { ids.add(source.readUTF()); } - return new FlinkKafkaProducer2.KafkaTransactionContext(ids); + return new GrootFlinkKafkaProducer.KafkaTransactionContext(ids); } @Override - public FlinkKafkaProducer2.KafkaTransactionContext deserialize( - FlinkKafkaProducer2.KafkaTransactionContext reuse, DataInputView source) + public GrootFlinkKafkaProducer.KafkaTransactionContext deserialize( + GrootFlinkKafkaProducer.KafkaTransactionContext reuse, DataInputView source) throws IOException { return deserialize(source); } @@ -1830,7 +1830,7 @@ public class FlinkKafkaProducer2<IN> /** * {@link org.apache.flink.api.common.typeutils.TypeSerializer} for {@link - * FlinkKafkaProducer2.NextTransactionalIdHint}. + * GrootFlinkKafkaProducer.NextTransactionalIdHint}. 
*/ @VisibleForTesting @Internal diff --git a/groot-connectors/connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/KafkaFetcher2.java b/groot-connectors/connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/GrootKafkaFetcher.java index 4000079..2cfc473 100644 --- a/groot-connectors/connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/KafkaFetcher2.java +++ b/groot-connectors/connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/GrootKafkaFetcher.java @@ -6,16 +6,14 @@ import org.apache.flink.streaming.api.functions.source.SourceFunction; import org.apache.flink.streaming.api.operators.StreamingRuntimeContext;
import org.apache.flink.streaming.connectors.kafka.KafkaDeserializationSchema;
import org.apache.flink.streaming.connectors.kafka.RuntimeContextAware;
-import org.apache.flink.streaming.connectors.kafka.internals.KafkaFetcher;
-import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartition;
import org.apache.flink.streaming.runtime.tasks.ProcessingTimeService;
import org.apache.flink.util.SerializedValue;
import java.util.Map;
import java.util.Properties;
-public class KafkaFetcher2<T> extends KafkaFetcher<T> {
- public KafkaFetcher2(
+public class GrootKafkaFetcher<T> extends KafkaFetcher<T> {
+ public GrootKafkaFetcher(
SourceFunction.SourceContext<T> sourceContext,
Map<KafkaTopicPartition, Long> assignedPartitionsWithInitialOffsets,
SerializedValue<WatermarkStrategy<T>> watermarkStrategy,
diff --git a/groot-core/src/main/java/com/geedgenetworks/core/connector/print/PrintTableFactory.java b/groot-core/src/main/java/com/geedgenetworks/core/connector/print/PrintTableFactory.java index 5882c9d..3bc4910 100644 --- a/groot-core/src/main/java/com/geedgenetworks/core/connector/print/PrintTableFactory.java +++ b/groot-core/src/main/java/com/geedgenetworks/core/connector/print/PrintTableFactory.java @@ -37,7 +37,7 @@ public class PrintTableFactory implements SinkTableFactory { helper.validate(); // 校验参数 - StructType dataType = context.getSchema(); + StructType dataType = context.getDataType(); ReadableConfig config = context.getConfiguration(); PrintMode printMode = Optional.ofNullable(PrintMode.fromName(config.get(MODE))).orElse(STDOUT); diff --git a/groot-core/src/main/java/com/geedgenetworks/core/connector/schema/DynamicSchema.java b/groot-core/src/main/java/com/geedgenetworks/core/connector/schema/DynamicSchema.java new file mode 100644 index 0000000..1b4f755 --- /dev/null +++ b/groot-core/src/main/java/com/geedgenetworks/core/connector/schema/DynamicSchema.java @@ -0,0 +1,86 @@ +package com.geedgenetworks.core.connector.schema;
+
+import com.geedgenetworks.core.connector.schema.utils.DynamicSchemaManager;
+import com.geedgenetworks.core.types.StructType;
+import org.apache.commons.codec.digest.DigestUtils;
+import org.apache.commons.lang3.StringUtils;
+
+import static org.apache.flink.util.Preconditions.checkArgument;
+
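+/**
+ * Base class for schemas whose definition can change at runtime. The raw schema text is
+ * fingerprinted with MD5 so that {@link #updateDataType()} only re-parses and reports a change
+ * when the content (and the resulting {@link StructType}) actually differs from the cached one.
+ */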
+public abstract class DynamicSchema implements Schema {
+ protected SchemaParser.Parser parser;
+ protected StructType dataType;
+ private String contentMd5;
+ protected final long intervalMs;
+
+ public DynamicSchema(SchemaParser.Parser parser, long intervalMs) {
+ this.parser = parser;
+ this.intervalMs = intervalMs;
+ }
+
+ public abstract String getCacheKey();
+ protected abstract String getDataTypeContent();
+
+ @Override
+ public StructType getDataType() {
+ return dataType;
+ }
+
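+ // Skips re-parsing when the MD5 of the content is unchanged; returns true only if the
+ // parsed StructType differs from the cached one.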
+ protected boolean parseDataType(String _content){
+ checkArgument(StringUtils.isNotBlank(_content), "DataType content is blank");
+ String _contentMd5 = computeMd5(_content);
+ if(_contentMd5.equals(contentMd5)){
+ return false;
+ }
+
+ StructType type;
+ if(dataType == null){
+ type = parser.parser(_content);
+ contentMd5 = _contentMd5;
+ dataType = type;
+ return true;
+ }
+
+ type = parser.parser(_content);
+ if(dataType.equals(type)){
+ return false;
+ }else{
+ contentMd5 = _contentMd5;
+ dataType = type;
+ return true;
+ }
+ }
+
+ // Update and return the refreshed dataType; returns null when nothing has changed since the last update.
+ public StructType updateDataType(){
+ String content = getDataTypeContent();
+ if(StringUtils.isBlank(content)){
+ return null;
+ }
+ if(parseDataType(content)){
+ return dataType;
+ }
+ return null;
+ }
+
+ final public void registerSchemaChangeAware(SchemaChangeAware aware){
+ DynamicSchemaManager.registerSchemaChangeAware(this, aware);
+ }
+
+ final public void unregisterSchemaChangeAware(SchemaChangeAware aware){
+ DynamicSchemaManager.unregisterSchemaChangeAware(this, aware);
+ }
+
+ String computeMd5(String text){
+ return DigestUtils.md5Hex(text);
+ }
+
+ public long getIntervalMs() {
+ return intervalMs;
+ }
+
+ @Override
+ final public boolean isDynamic() {
+ return true;
+ }
+}
diff --git a/groot-core/src/main/java/com/geedgenetworks/core/connector/schema/HttpDynamicSchema.java b/groot-core/src/main/java/com/geedgenetworks/core/connector/schema/HttpDynamicSchema.java new file mode 100644 index 0000000..5eb6b87 --- /dev/null +++ b/groot-core/src/main/java/com/geedgenetworks/core/connector/schema/HttpDynamicSchema.java @@ -0,0 +1,43 @@ +package com.geedgenetworks.core.connector.schema;
+
+import com.geedgenetworks.core.utils.HttpClientPoolUtil;
+import com.geedgenetworks.shaded.org.apache.http.Header;
+import com.geedgenetworks.shaded.org.apache.http.message.BasicHeader;
+import org.apache.flink.util.TimeUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.net.URI;
+import java.time.Duration;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
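+/**
+ * {@link DynamicSchema} backed by an HTTP endpoint: the schema text is fetched with
+ * {@link HttpClientPoolUtil} and re-parsed on the configured refresh interval.
+ */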
+public class HttpDynamicSchema extends DynamicSchema{
+ static final Logger LOG = LoggerFactory.getLogger(HttpDynamicSchema.class);
+ private static final Header header = new BasicHeader("Content-Type", "application/x-www-form-urlencoded");
+ private final String url;
+ private final String key;
+ public HttpDynamicSchema(String url, SchemaParser.Parser parser, long intervalMs) {
+ super(parser, intervalMs);
+ checkNotNull(url);
+ this.url = url;
+ this.key = String.format("%s_%s", url, TimeUtils.formatWithHighestUnit(Duration.ofMillis(intervalMs)));
+ parseDataType(getDataTypeContent());
+ }
+
+ @Override
+ public String getCacheKey() {
+ return key;
+ }
+
+ @Override
+ protected String getDataTypeContent() {
+ try {
+ String response = HttpClientPoolUtil.getInstance().httpGet(URI.create(url), header);
+ return response;
+ } catch (Exception e) {
+ LOG.error("request " + url + " error", e);
+ return null;
+ }
+ }
+}
diff --git a/groot-core/src/main/java/com/geedgenetworks/core/connector/schema/Schema.java b/groot-core/src/main/java/com/geedgenetworks/core/connector/schema/Schema.java new file mode 100644 index 0000000..6bd6764 --- /dev/null +++ b/groot-core/src/main/java/com/geedgenetworks/core/connector/schema/Schema.java @@ -0,0 +1,35 @@ +package com.geedgenetworks.core.connector.schema;
+
+import com.geedgenetworks.core.connector.schema.utils.DynamicSchemaManager;
+import com.geedgenetworks.core.types.StructType;
+import org.apache.flink.util.Preconditions;
+
+import java.io.Serializable;
+
+import static org.apache.flink.util.Preconditions.checkArgument;
+
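+/**
+ * Common abstraction over static and dynamic table schemas. Provides factory methods for both
+ * and static helpers to register/unregister schema-change listeners on dynamic schemas.
+ */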
+public interface Schema extends Serializable {
+ StructType getDataType();
+
+ boolean isDynamic();
+
+ public static Schema newSchema(StructType dataType){
+ return new StaticSchema(dataType);
+ }
+
+ public static Schema newHttpDynamicSchema(String url){
+ HttpDynamicSchema dynamicSchema = new HttpDynamicSchema(url, SchemaParser.PARSER_AVRO, 1000 * 60 * 30);
+ checkArgument(dynamicSchema.getDataType() != null);
+ return dynamicSchema;
+ }
+
+ public static void registerSchemaChangeAware(Schema schema, SchemaChangeAware aware){
+ Preconditions.checkArgument(schema.isDynamic());
+ DynamicSchemaManager.registerSchemaChangeAware((DynamicSchema)schema, aware);
+ }
+
+ public static void unregisterSchemaChangeAware(Schema schema, SchemaChangeAware aware){
+ Preconditions.checkArgument(schema.isDynamic());
+ DynamicSchemaManager.unregisterSchemaChangeAware((DynamicSchema)schema, aware);
+ }
+}
diff --git a/groot-core/src/main/java/com/geedgenetworks/core/connector/schema/SchemaChangeAware.java b/groot-core/src/main/java/com/geedgenetworks/core/connector/schema/SchemaChangeAware.java new file mode 100644 index 0000000..a70df24 --- /dev/null +++ b/groot-core/src/main/java/com/geedgenetworks/core/connector/schema/SchemaChangeAware.java @@ -0,0 +1,7 @@ +package com.geedgenetworks.core.connector.schema;
+
+import com.geedgenetworks.core.types.StructType;
+
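+/** Callback notified by the DynamicSchemaManager whenever a dynamic schema's StructType changes. */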
+public interface SchemaChangeAware {
+ void schemaChange(StructType dataType);
+}
diff --git a/groot-core/src/main/java/com/geedgenetworks/core/connector/schema/SchemaParser.java b/groot-core/src/main/java/com/geedgenetworks/core/connector/schema/SchemaParser.java new file mode 100644 index 0000000..a2fcc21 --- /dev/null +++ b/groot-core/src/main/java/com/geedgenetworks/core/connector/schema/SchemaParser.java @@ -0,0 +1,112 @@ +package com.geedgenetworks.core.connector.schema;
+
+import com.alibaba.fastjson2.JSON;
+import com.alibaba.fastjson2.JSONArray;
+import com.alibaba.fastjson2.JSONObject;
+import com.geedgenetworks.core.types.ArrayType;
+import com.geedgenetworks.core.types.DataType;
+import com.geedgenetworks.core.types.StructType;
+import com.geedgenetworks.core.types.StructType.StructField;
+import com.geedgenetworks.core.types.Types;
+
+import java.io.Serializable;
+import java.util.*;
+import java.util.stream.Collectors;
+
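+/**
+ * Parsers that turn schema text into a {@link StructType}: the builtin parser accepts either a
+ * JSON field array or a struct<...> definition, while the Avro parser accepts an Avro record
+ * schema and skips fields whose doc marks them as disabled.
+ */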
+public class SchemaParser {
+ public static final String TYPE_BUILTIN = "builtin";
+ public static final String TYPE_AVRO = "avro";
+
+ public static final Parser PARSER_BUILTIN = new BuiltinParser();
+ public static final Parser PARSER_AVRO = new AvroParser();
+
+
+ public static Parser getParser(String type){
+ if(TYPE_BUILTIN.equals(type)){
+ return PARSER_BUILTIN;
+ }else if(TYPE_AVRO.equals(type)){
+ return PARSER_AVRO;
+ }
+
+ throw new UnsupportedOperationException("unsupported parser type: " + type);
+ }
+
+ public static class BuiltinParser implements Parser{
+ @Override
+ public StructType parser(String content){
+ if(JSON.isValidArray(content)){
+ return Types.parseSchemaFromJson(content);
+ }else{
+ return Types.parseStructType(content);
+ }
+ // throw new IllegalArgumentException("can not parse schema for:" + content);
+ }
+ }
+
+ public static class AvroParser implements Parser{
+ @Override
+ public StructType parser(String content) {
+ org.apache.avro.Schema schema = new org.apache.avro.Schema.Parser().parse(content);
+ Set<String> disabledFields = getDisabledFields(JSON.parseObject(content).getJSONArray("fields"));
+ return convert2StructType(schema, disabledFields);
+ }
+
+ private StructType convert2StructType(org.apache.avro.Schema schema, Set<String> disabledFields){
+ List<org.apache.avro.Schema.Field> fields = schema.getFields();
+ List<StructField> _fields = new ArrayList<>(fields.size());
+ for (int i = 0; i < fields.size(); i++) {
+ String fieldName = fields.get(i).name();
+ if(disabledFields.contains(fieldName)){
+ continue;
+ }
+ org.apache.avro.Schema fieldSchema = fields.get(i).schema();
+ _fields.add(new StructField(fieldName, convert(fieldSchema)));
+ }
+ return new StructType(_fields.toArray(new StructField[_fields.size()]));
+ }
+
+ private DataType convert(org.apache.avro.Schema schema){
+ switch (schema.getType()){
+ case INT:
+ return Types.INT;
+ case LONG:
+ return Types.BIGINT;
+ case FLOAT:
+ return Types.FLOAT;
+ case DOUBLE:
+ return Types.DOUBLE;
+ case BOOLEAN:
+ return Types.BOOLEAN;
+ case STRING:
+ return Types.STRING;
+ case BYTES:
+ return Types.BINARY;
+ case ARRAY:
+ return new ArrayType(convert(schema.getElementType()));
+ case RECORD:
+ return convert2StructType(schema, Collections.emptySet());
+ default:
+ throw new UnsupportedOperationException(schema.toString());
+ }
+ }
+
+ private Set<String> getDisabledFields(JSONArray fields){
+ Set<String> disabledFields = new HashSet<>();
+ JSONObject fieldObject;
+ JSONObject doc;
+ for (int i = 0; i < fields.size(); i++) {
+ fieldObject = fields.getJSONObject(i);
+ doc = fieldObject.getJSONObject("doc");
+ // Skip fields whose doc marks their visibility as disabled
+ if(doc != null && "disabled".equals(doc.getString("visibility"))){
+ disabledFields.add(fieldObject.getString("name"));
+ }
+ }
+ return disabledFields;
+ }
+ }
+
+ public interface Parser extends Serializable {
+ StructType parser(String content);
+ }
+}
diff --git a/groot-core/src/main/java/com/geedgenetworks/core/connector/schema/StaticSchema.java b/groot-core/src/main/java/com/geedgenetworks/core/connector/schema/StaticSchema.java new file mode 100644 index 0000000..ab6893d --- /dev/null +++ b/groot-core/src/main/java/com/geedgenetworks/core/connector/schema/StaticSchema.java @@ -0,0 +1,21 @@ +package com.geedgenetworks.core.connector.schema;
+
+import com.geedgenetworks.core.types.StructType;
+
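+/** Immutable {@link Schema} that wraps a fixed {@link StructType}. */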
+public class StaticSchema implements Schema{
+ private final StructType dataType;
+
+ public StaticSchema(StructType dataType) {
+ this.dataType = dataType;
+ }
+
+ @Override
+ public StructType getDataType() {
+ return dataType;
+ }
+
+ @Override
+ final public boolean isDynamic() {
+ return false;
+ }
+}
diff --git a/groot-core/src/main/java/com/geedgenetworks/core/connector/schema/utils/DynamicSchemaManager.java b/groot-core/src/main/java/com/geedgenetworks/core/connector/schema/utils/DynamicSchemaManager.java new file mode 100644 index 0000000..0ee04d2 --- /dev/null +++ b/groot-core/src/main/java/com/geedgenetworks/core/connector/schema/utils/DynamicSchemaManager.java @@ -0,0 +1,148 @@ +package com.geedgenetworks.core.connector.schema.utils;
+
+import com.geedgenetworks.core.connector.schema.DynamicSchema;
+import com.geedgenetworks.core.connector.schema.SchemaChangeAware;
+import com.geedgenetworks.core.types.StructType;
+import org.apache.flink.runtime.util.ExecutorThreadFactory;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.*;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
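+
+/**
+ * Keeps one shared refresh schedule per dynamic-schema cache key. Listeners are registered and
+ * unregistered here; a single scheduler thread periodically refreshes each schema and notifies
+ * its listeners whenever the parsed data type changes.
+ */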
+public class DynamicSchemaManager {
+ private static final Logger LOG = LoggerFactory.getLogger(DynamicSchemaManager.class);
+ private static final Map<String, DynamicSchemaWithAwares> registeredSchemaWithAwares = new LinkedHashMap<>();
+ private static ScheduledExecutorService scheduler = null;
+
+ // Register a schema-change listener for the given dynamicSchema
+ public static synchronized void registerSchemaChangeAware(DynamicSchema dynamicSchema, SchemaChangeAware aware){
+ checkNotNull(dynamicSchema);
+ checkNotNull(aware);
+
+ String key = dynamicSchema.getCacheKey();
+ DynamicSchemaWithAwares schemaWithAwares = registeredSchemaWithAwares.get(key);
+ if(schemaWithAwares == null){
+ schemaWithAwares = new DynamicSchemaWithAwares(dynamicSchema);
+ schedule(schemaWithAwares);
+ registeredSchemaWithAwares.put(key, schemaWithAwares);
+ LOG.info("start schedule for {}, current contained schedules:{}", schemaWithAwares.dynamicSchema.getCacheKey(), registeredSchemaWithAwares.keySet());
+ }
+
+ for (SchemaChangeAware registeredAware : schemaWithAwares.awares) {
+ if(registeredAware == aware){
+ LOG.error("aware({}) for {} has already registered", aware, key);
+ return;
+ }
+ }
+
+ schemaWithAwares.awares.add(aware);
+ LOG.info("register aware({}) for {}", aware, key);
+ }
+
+ // Unregister a schema-change listener for the given dynamicSchema
+ public static synchronized void unregisterSchemaChangeAware(DynamicSchema dynamicSchema, SchemaChangeAware aware){
+ checkNotNull(dynamicSchema);
+ checkNotNull(aware);
+
+ String key = dynamicSchema.getCacheKey();
+ DynamicSchemaWithAwares schemaWithAwares = registeredSchemaWithAwares.get(key);
+ if(schemaWithAwares == null){
+ LOG.error("not register aware for {}", key);
+ return;
+ }
+
+ Iterator<SchemaChangeAware> iter = schemaWithAwares.awares.iterator();
+ SchemaChangeAware registeredAware;
+ boolean find = false;
+ while (iter.hasNext()){
+ registeredAware = iter.next();
+ if(registeredAware == aware){
+ iter.remove();
+ find = true;
+ break;
+ }
+ }
+
+ if(find){
+ LOG.info("unregister aware({}) for {}", aware, schemaWithAwares.dynamicSchema.getCacheKey());
+ if(schemaWithAwares.awares.isEmpty()){
+ registeredSchemaWithAwares.remove(key);
+ LOG.info("stop schedule for {}, current contained schedules:{}", schemaWithAwares.dynamicSchema.getCacheKey(), registeredSchemaWithAwares.keySet());
+ }
+ if(registeredSchemaWithAwares.isEmpty()){
+ destroySchedule();
+ }
+ }else{
+ LOG.error("can not find register aware({}) for {}", aware, schemaWithAwares.dynamicSchema.getCacheKey());
+ }
+ }
+
+ private static void schedule(DynamicSchemaWithAwares schemaWithAwares){
+ if(scheduler == null){
+ scheduler = Executors.newScheduledThreadPool(1, new ExecutorThreadFactory("DynamicSchemaUpdateScheduler"));
+ LOG.info("create SchemaUpdateScheduler");
+ }
+ scheduler.schedule(schemaWithAwares, schemaWithAwares.dynamicSchema.getIntervalMs(), TimeUnit.MILLISECONDS);
+ }
+
+ private static void destroySchedule(){
+ if(scheduler != null){
+ try {
+ scheduler.shutdownNow();
+ LOG.info("destroy SchemaUpdateScheduler");
+ } catch (Exception e) {
+ LOG.error("shutdown error", e);
+ }
+ scheduler = null;
+ }
+ }
+
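+ // Pairs a dynamic schema with its registered listeners; runs on the shared scheduler to
+ // refresh the schema, fans out changes to every listener, and reschedules itself while
+ // listeners remain.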
+ private static class DynamicSchemaWithAwares implements Runnable{
+ DynamicSchema dynamicSchema;
+ private List<SchemaChangeAware> awares;
+
+ public DynamicSchemaWithAwares(DynamicSchema dynamicSchema) {
+ this.dynamicSchema = dynamicSchema;
+ awares = new ArrayList<>();
+ }
+
+ @Override
+ public void run() {
+ if(awares.isEmpty()){
+ return;
+ }
+
+ try {
+ update();
+ } catch (Throwable e) {
+ LOG.error("schema update error", e);
+ }
+
+ if(!awares.isEmpty()){
+ scheduler.schedule(this, dynamicSchema.getIntervalMs(), TimeUnit.MILLISECONDS);
+ }
+ }
+
+ public void update() {
+ StructType dataType = dynamicSchema.updateDataType();
+ // No change since the last refresh
+ if(dataType == null){
+ return;
+ }
+
+ LOG.warn("schema for {} change to:{}", dynamicSchema.getCacheKey(), dataType.simpleString());
+ for (SchemaChangeAware aware : awares) {
+ try {
+ aware.schemaChange(dataType);
+ } catch (Exception e) {
+ LOG.error("schema change aware error", e);
+ }
+ }
+ }
+
+ }
+}
diff --git a/groot-core/src/main/java/com/geedgenetworks/core/connector/sink/SinkProvider.java b/groot-core/src/main/java/com/geedgenetworks/core/connector/sink/SinkProvider.java index ba31bf9..f143f7f 100644 --- a/groot-core/src/main/java/com/geedgenetworks/core/connector/sink/SinkProvider.java +++ b/groot-core/src/main/java/com/geedgenetworks/core/connector/sink/SinkProvider.java @@ -8,4 +8,8 @@ import java.io.Serializable; public interface SinkProvider extends Serializable { DataStreamSink<?> consumeDataStream(DataStream<Event> dataStream); + + default boolean supportDynamicSchema(){ + return false; + } } diff --git a/groot-core/src/main/java/com/geedgenetworks/core/connector/source/SourceProvider.java b/groot-core/src/main/java/com/geedgenetworks/core/connector/source/SourceProvider.java index f183ee9..4fc08dd 100644 --- a/groot-core/src/main/java/com/geedgenetworks/core/connector/source/SourceProvider.java +++ b/groot-core/src/main/java/com/geedgenetworks/core/connector/source/SourceProvider.java @@ -15,4 +15,8 @@ public interface SourceProvider extends Serializable { default StructType schema(){ return getPhysicalDataType(); } + + default boolean supportDynamicSchema(){ + return false; + } } diff --git a/groot-core/src/main/java/com/geedgenetworks/core/factories/TableFactory.java b/groot-core/src/main/java/com/geedgenetworks/core/factories/TableFactory.java index 69c462d..affeead 100644 --- a/groot-core/src/main/java/com/geedgenetworks/core/factories/TableFactory.java +++ b/groot-core/src/main/java/com/geedgenetworks/core/factories/TableFactory.java @@ -1,39 +1,56 @@ -package com.geedgenetworks.core.factories; - -import com.geedgenetworks.core.types.StructType; -import org.apache.flink.configuration.Configuration; - -import java.util.Map; - -public interface TableFactory extends Factory { - - public static class Context { - private final StructType schema; - private final StructType physicalDataType; - private final Map<String, String> options; - private final Configuration configuration; - - public Context(StructType schema, StructType physicalDataType, Map<String, String> options, Configuration configuration) { - this.schema = schema; - this.physicalDataType = physicalDataType; - this.options = options; - this.configuration = configuration; - } - - public StructType getSchema() { - return schema; - } - - public StructType getPhysicalDataType() { - return physicalDataType; - } - - public Map<String, String> getOptions() { - return options; - } - - public Configuration getConfiguration() { - return configuration; - } - } -} +package com.geedgenetworks.core.factories;
+
+import com.geedgenetworks.core.connector.schema.Schema;
+import com.geedgenetworks.core.types.StructType;
+import org.apache.flink.configuration.Configuration;
+
+import java.util.Map;
+
+public interface TableFactory extends Factory {
+
+ public static class Context {
+ private final Schema schema;
+ private final Map<String, String> options;
+ private final Configuration configuration;
+
+ public Context(Schema schema, Map<String, String> options, Configuration configuration) {
+ this.schema = schema;
+ this.options = options;
+ this.configuration = configuration;
+ }
+
+ public Schema getSchema() {
+ return schema;
+ }
+
+ public StructType getPhysicalDataType() {
+ if(schema == null){
+ return null;
+ }else{
+ if(schema.isDynamic()){
+ throw new UnsupportedOperationException("DynamicSchema");
+ }
+ return schema.getDataType();
+ }
+ }
+
+ public StructType getDataType() {
+ // Same contract as getPhysicalDataType(); kept as a separate accessor name.
+ return getPhysicalDataType();
+ }
+
+ public Map<String, String> getOptions() {
+ return options;
+ }
+
+ public Configuration getConfiguration() {
+ return configuration;
+ }
+ }
+}
diff --git a/groot-core/src/main/java/com/geedgenetworks/core/pojo/SinkConfig.java b/groot-core/src/main/java/com/geedgenetworks/core/pojo/SinkConfig.java index 2818474..66275d9 100644 --- a/groot-core/src/main/java/com/geedgenetworks/core/pojo/SinkConfig.java +++ b/groot-core/src/main/java/com/geedgenetworks/core/pojo/SinkConfig.java @@ -1,34 +1,43 @@ -package com.geedgenetworks.core.pojo; - -import java.io.Serializable; -import java.util.Map; - -public class SinkConfig implements Serializable { - private String type; - private Map<String, String> properties; - private String name; - - public String getType() { - return type; - } - - public void setType(String type) { - this.type = type; - } - - public Map<String, String> getProperties() { - return properties; - } - - public void setProperties(Map<String, String> properties) { - this.properties = properties; - } - - public String getName() { - return name; - } - - public void setName(String name) { - this.name = name; - } -} +package com.geedgenetworks.core.pojo;
+
+import java.io.Serializable;
+import java.util.Map;
+
+public class SinkConfig implements Serializable {
+ private String type;
+ private Map<String, Object> schema;
+ private Map<String, String> properties;
+ private String name;
+
+ public String getType() {
+ return type;
+ }
+
+ public void setType(String type) {
+ this.type = type;
+ }
+
+ public Map<String, Object> getSchema() {
+ return schema;
+ }
+
+ public void setSchema(Map<String, Object> schema) {
+ this.schema = schema;
+ }
+
+ public Map<String, String> getProperties() {
+ return properties;
+ }
+
+ public void setProperties(Map<String, String> properties) {
+ this.properties = properties;
+ }
+
+ public String getName() {
+ return name;
+ }
+
+ public void setName(String name) {
+ this.name = name;
+ }
+}
diff --git a/groot-core/src/main/java/com/geedgenetworks/core/pojo/SourceConfig.java b/groot-core/src/main/java/com/geedgenetworks/core/pojo/SourceConfig.java index 99e2d85..6d019e9 100644 --- a/groot-core/src/main/java/com/geedgenetworks/core/pojo/SourceConfig.java +++ b/groot-core/src/main/java/com/geedgenetworks/core/pojo/SourceConfig.java @@ -8,8 +8,7 @@ import java.util.Map; public class SourceConfig implements Serializable { private String type; - private List<Object> fields; - private StructType dataType; + private Map<String, Object> schema; private Map<String, String> properties; private String name; public String getType() { @@ -20,20 +19,12 @@ public class SourceConfig implements Serializable { this.type = type; } - public List<Object> getFields() { - return fields; + public Map<String, Object> getSchema() { + return schema; } - public void setFields(List<Object> fields) { - this.fields = fields; - } - - public StructType getDataType() { - return dataType; - } - - public void setDataType(StructType dataType) { - this.dataType = dataType; + public void setSchema(Map<String, Object> schema) { + this.schema = schema; } public Map<String, String> getProperties() { diff --git a/groot-core/src/main/java/com/geedgenetworks/core/types/Types.java b/groot-core/src/main/java/com/geedgenetworks/core/types/Types.java index 0bd1544..7cc3d3a 100644 --- a/groot-core/src/main/java/com/geedgenetworks/core/types/Types.java +++ b/groot-core/src/main/java/com/geedgenetworks/core/types/Types.java @@ -1,133 +1,144 @@ -package com.geedgenetworks.core.types; - -import com.alibaba.fastjson2.JSON; -import com.alibaba.fastjson2.JSONArray; -import com.alibaba.fastjson2.JSONObject; -import com.geedgenetworks.core.types.StructType.StructField; - -import java.util.ArrayList; -import java.util.List; -import java.util.regex.Matcher; -import java.util.regex.Pattern; - -public class Types { - public static final IntegerType INT = new IntegerType(); - public static final LongType BIGINT = new LongType(); - public static final StringType STRING = new StringType(); - public static final FloatType FLOAT = new FloatType(); - public static final DoubleType DOUBLE = new DoubleType(); - public static final BooleanType BOOLEAN = new BooleanType(); - public static final BinaryType BINARY = new BinaryType(); - - public static final Pattern ARRAY_RE = Pattern.compile("array\\s*<(.+)>", Pattern.CASE_INSENSITIVE); - public static final Pattern STRUCT_RE = Pattern.compile("struct\\s*<(.+)>", Pattern.CASE_INSENSITIVE); - - public static StructType parseSchemaFromJson(String jsonFields) { - JSONArray fieldArray = JSON.parseArray(jsonFields); - StructField[] fields = new StructField[fieldArray.size()]; - - for (int i = 0; i < fieldArray.size(); i++) { - JSONObject fieldObject = fieldArray.getJSONObject(i); - String name = fieldObject.getString("name").trim(); - String type = fieldObject.getString("type").trim(); - DataType dataType = parseDataType(type); - fields[i] = new StructField(name, dataType); - } - - return new StructType(fields); - } - - public static DataType parseDataType(String type){ - type = type.trim(); - if("int".equalsIgnoreCase(type)){ - return INT; - } else if ("bigint".equalsIgnoreCase(type)){ - return BIGINT; - } else if ("string".equalsIgnoreCase(type)){ - return STRING; - } else if ("float".equalsIgnoreCase(type)){ - return FLOAT; - } else if ("double".equalsIgnoreCase(type)){ - return DOUBLE; - } else if ("boolean".equalsIgnoreCase(type)){ - return BOOLEAN; - } else if ("binary".equalsIgnoreCase(type)){ - 
return BINARY; - } - - // array类型 - Matcher matcher = ARRAY_RE.matcher(type); - if(matcher.matches()){ - String eleType = matcher.group(1); - DataType elementType = parseDataType(eleType); - return new ArrayType(elementType); - } - - // struct类型 - matcher = STRUCT_RE.matcher(type); - if(matcher.matches()){ - List<StructField> fields = new ArrayList<>(); - String str = matcher.group(1); - int startPos = 0, endPos = -1; - int i = startPos + 1; - int level = 0; - while (i < str.length()){ - while (i < str.length()){ - if(str.charAt(i) == ':'){ - endPos = i; - break; - } - i++; - } - - if(endPos <= startPos){ - throw new UnsupportedOperationException("不支持的类型:" + type); - } - - String name = str.substring(startPos, endPos).trim(); - startPos = i + 1; - endPos = -1; - i = startPos + 1; - while (i < str.length()){ - if(str.charAt(i) == ',' && level == 0){ - endPos = i; - break; - } - if(str.charAt(i) == '<'){ - level++; - } - if(str.charAt(i) == '>'){ - level--; - } - i++; - } - - if(i == str.length()){ - endPos = i; - } - if(endPos <= startPos){ - throw new UnsupportedOperationException("不支持的类型:" + type); - } - - String tp = str.substring(startPos, endPos).trim(); - fields.add(new StructField(name, parseDataType(tp))); - - i++; - startPos = i; - endPos = -1; - } - - return new StructType(fields.toArray(new StructField[fields.size()])); - } - - throw new UnsupportedOperationException("不支持的类型:" + type); - } - - static void buildFormattedString(DataType dataType, String prefix, StringBuilder sb, int maxDepth){ - if(dataType instanceof ArrayType){ - ((ArrayType)dataType).buildFormattedString(prefix, sb, maxDepth - 1); - } else if (dataType instanceof StructType) { - ((StructType)dataType).buildFormattedString(prefix, sb, maxDepth - 1); - } - } -} +package com.geedgenetworks.core.types;
+
+import com.alibaba.fastjson2.JSON;
+import com.alibaba.fastjson2.JSONArray;
+import com.alibaba.fastjson2.JSONObject;
+import com.geedgenetworks.core.types.StructType.StructField;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+public class Types {
+ public static final IntegerType INT = new IntegerType();
+ public static final LongType BIGINT = new LongType();
+ public static final StringType STRING = new StringType();
+ public static final FloatType FLOAT = new FloatType();
+ public static final DoubleType DOUBLE = new DoubleType();
+ public static final BooleanType BOOLEAN = new BooleanType();
+ public static final BinaryType BINARY = new BinaryType();
+
+ public static final Pattern ARRAY_RE = Pattern.compile("array\\s*<(.+)>", Pattern.CASE_INSENSITIVE);
+ public static final Pattern STRUCT_RE = Pattern.compile("struct\\s*<(.+)>", Pattern.CASE_INSENSITIVE);
+
+ public static StructType parseSchemaFromJson(String jsonFields) {
+ JSONArray fieldArray = JSON.parseArray(jsonFields);
+ StructField[] fields = new StructField[fieldArray.size()];
+
+ for (int i = 0; i < fieldArray.size(); i++) {
+ JSONObject fieldObject = fieldArray.getJSONObject(i);
+ String name = fieldObject.getString("name").trim();
+ String type = fieldObject.getString("type").trim();
+ DataType dataType = parseDataType(type);
+ fields[i] = new StructField(name, dataType);
+ }
+
+ return new StructType(fields);
+ }
+
+ // Parse the field list of a struct<...> definition
+ public static StructType parseStructType(String str){
+ // Accepts the field list with or without the surrounding struct<...> wrapper
+ Matcher matcher = STRUCT_RE.matcher(str);
+ if(matcher.matches()){
+ str = matcher.group(1);
+ }
+
+ List<StructField> fields = new ArrayList<>();
+ int startPos = 0, endPos = -1;
+ int i = startPos + 1;
+ int level = 0;
+ while (i < str.length()){
+ while (i < str.length()){
+ if(str.charAt(i) == ':'){
+ endPos = i;
+ break;
+ }
+ i++;
+ }
+
+ if(endPos <= startPos){
+ throw new UnsupportedOperationException("can not parse " + str);
+ }
+
+ String name = str.substring(startPos, endPos).trim();
+ startPos = i + 1;
+ endPos = -1;
+ i = startPos + 1;
+ while (i < str.length()){
+ if(str.charAt(i) == ',' && level == 0){
+ endPos = i;
+ break;
+ }
+ if(str.charAt(i) == '<'){
+ level++;
+ }
+ if(str.charAt(i) == '>'){
+ level--;
+ }
+ i++;
+ }
+
+ if(i == str.length()){
+ endPos = i;
+ }
+ if(endPos <= startPos){
+ throw new UnsupportedOperationException("can not parse " + str);
+ }
+
+ String tp = str.substring(startPos, endPos).trim();
+ fields.add(new StructField(name, parseDataType(tp)));
+
+ i++;
+ startPos = i;
+ endPos = -1;
+ }
+
+ return new StructType(fields.toArray(new StructField[fields.size()]));
+ }
+
+ public static DataType parseDataType(String type){
+ type = type.trim();
+ if("int".equalsIgnoreCase(type)){
+ return INT;
+ } else if ("bigint".equalsIgnoreCase(type)){
+ return BIGINT;
+ } else if ("string".equalsIgnoreCase(type)){
+ return STRING;
+ } else if ("float".equalsIgnoreCase(type)){
+ return FLOAT;
+ } else if ("double".equalsIgnoreCase(type)){
+ return DOUBLE;
+ } else if ("boolean".equalsIgnoreCase(type)){
+ return BOOLEAN;
+ } else if ("binary".equalsIgnoreCase(type)){
+ return BINARY;
+ }
+
+ // array type
+ Matcher matcher = ARRAY_RE.matcher(type);
+ if(matcher.matches()){
+ String eleType = matcher.group(1);
+ DataType elementType = parseDataType(eleType);
+ return new ArrayType(elementType);
+ }
+
+ // struct type
+ matcher = STRUCT_RE.matcher(type);
+ if(matcher.matches()){
+ String str = matcher.group(1);
+ return parseStructType(str);
+ }
+
+ throw new UnsupportedOperationException("unsupported type: " + type);
+ }
+
+ static void buildFormattedString(DataType dataType, String prefix, StringBuilder sb, int maxDepth){
+ if(dataType instanceof ArrayType){
+ ((ArrayType)dataType).buildFormattedString(prefix, sb, maxDepth - 1);
+ } else if (dataType instanceof StructType) {
+ ((StructType)dataType).buildFormattedString(prefix, sb, maxDepth - 1);
+ }
+ }
+}
diff --git a/groot-core/src/main/java/com/geedgenetworks/core/utils/HttpClientPoolUtil.java b/groot-core/src/main/java/com/geedgenetworks/core/utils/HttpClientPoolUtil.java index 2238aeb..dd2c710 100644 --- a/groot-core/src/main/java/com/geedgenetworks/core/utils/HttpClientPoolUtil.java +++ b/groot-core/src/main/java/com/geedgenetworks/core/utils/HttpClientPoolUtil.java @@ -101,7 +101,7 @@ public class HttpClientPoolUtil { if (StringUtil.isNotEmpty(headers)) { for (Header h : headers) { httpGet.addHeader(h); - log.info("request header : {}", h); + // log.info("request header : {}", h); } } try(CloseableHttpResponse response = httpClient.execute(httpGet)) { diff --git a/groot-core/src/test/java/com/geedgenetworks/core/connector/schema/SchemaParserTest.java b/groot-core/src/test/java/com/geedgenetworks/core/connector/schema/SchemaParserTest.java new file mode 100644 index 0000000..fbeaed7 --- /dev/null +++ b/groot-core/src/test/java/com/geedgenetworks/core/connector/schema/SchemaParserTest.java @@ -0,0 +1,22 @@ +package com.geedgenetworks.core.connector.schema;
+
+import com.geedgenetworks.core.types.StructType;
+import org.apache.commons.io.FileUtils;
+
+import java.io.File;
+import java.nio.charset.StandardCharsets;
+
+import static org.junit.jupiter.api.Assertions.*;
+
+class SchemaParserTest {
+
+ public static void main(String[] args) throws Exception{
+ String str = FileUtils.readFileToString(new File("D:\\WorkSpace\\groot-stream\\session_record_schema.json"), StandardCharsets.UTF_8);
+ SchemaParser.Parser parser = SchemaParser.PARSER_AVRO;
+
+ StructType structType = parser.parser(str);
+ System.out.println(structType.treeString());
+
+ }
+
+}
\ No newline at end of file diff --git a/groot-core/src/test/java/com/geedgenetworks/core/connector/schema/utils/DynamicSchemaManagerTest.java b/groot-core/src/test/java/com/geedgenetworks/core/connector/schema/utils/DynamicSchemaManagerTest.java new file mode 100644 index 0000000..b5b672e --- /dev/null +++ b/groot-core/src/test/java/com/geedgenetworks/core/connector/schema/utils/DynamicSchemaManagerTest.java @@ -0,0 +1,218 @@ +package com.geedgenetworks.core.connector.schema.utils;
+
+import com.geedgenetworks.core.connector.schema.DynamicSchema;
+import com.geedgenetworks.core.connector.schema.HttpDynamicSchema;
+import com.geedgenetworks.core.connector.schema.SchemaChangeAware;
+import com.geedgenetworks.core.connector.schema.SchemaParser;
+import com.geedgenetworks.core.types.StructType;
+import com.geedgenetworks.core.types.Types;
+import org.apache.flink.util.TimeUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.ThreadLocalRandom;
+
+public class DynamicSchemaManagerTest {
+ static final Logger LOG = LoggerFactory.getLogger(DynamicSchemaManagerTest.class.getSimpleName());
+
+ public static void main(String[] args) throws Exception{
+ //testOneThread();
+ //testMultiThread();
+ testMultiThreadForHttpDynamicSchema();
+ }
+
+ public static void testOneThread() throws Exception{
+ RandomDynamicSchema schema1 = new RandomDynamicSchema( 1000 * 5, "Schema1", 0.25);
+ RandomDynamicSchema schema11 = new RandomDynamicSchema( 1000 * 5, "Schema1", 0.25);
+ RandomDynamicSchema schema2 = new RandomDynamicSchema( 1000 * 10, "Schema1", 0.9);
+
+ LOG.info("start");
+ PrintSchemaChangeAware aware1 = new PrintSchemaChangeAware("aware1");
+ PrintSchemaChangeAware aware2 = new PrintSchemaChangeAware("aware2");
+ PrintSchemaChangeAware aware11 = new PrintSchemaChangeAware("aware11");
+ PrintSchemaChangeAware aware22 = new PrintSchemaChangeAware("aware22");
+
+ schema1.registerSchemaChangeAware(aware1);
+ schema1.registerSchemaChangeAware(aware2);
+ schema1.registerSchemaChangeAware(aware1);
+ schema1.registerSchemaChangeAware(aware2);
+
+ schema11.registerSchemaChangeAware(aware11);
+ schema11.registerSchemaChangeAware(aware22);
+
+
+ schema2.registerSchemaChangeAware(aware1);
+ schema2.registerSchemaChangeAware(aware2);
+ schema2.registerSchemaChangeAware(aware1);
+ schema2.registerSchemaChangeAware(aware2);
+
+ Thread.sleep(1000 * 60 * 2);
+ schema1.unregisterSchemaChangeAware(aware1);
+ schema1.unregisterSchemaChangeAware(aware2);
+ schema1.unregisterSchemaChangeAware(aware11);
+ schema1.unregisterSchemaChangeAware(aware22);
+
+ Thread.sleep(1000 * 20);
+ schema2.unregisterSchemaChangeAware(aware1);
+ schema2.unregisterSchemaChangeAware(aware2);
+
+ Thread.sleep(1000 * 3);
+
+
+ schema1.registerSchemaChangeAware(aware2);
+ schema2.registerSchemaChangeAware(aware1);
+ Thread.sleep(1000 * 60 * 1);
+ schema1.unregisterSchemaChangeAware(aware2);
+ schema2.unregisterSchemaChangeAware(aware1);
+ Thread.sleep(1000 * 3);
+ }
+
+ public static void testMultiThreadForHttpDynamicSchema() throws Exception{
+ LOG.info("start");
+
+ List<Thread> threads = new ArrayList<>(10);
+ Thread thread;
+ for (int i = 0; i < 5; i++) {
+ int finalI = i + 1;
+ thread = new Thread(() -> {
+ DynamicSchema schema1 = new HttpDynamicSchema( "http://127.0.0.1/session_record_schema.json", SchemaParser.PARSER_AVRO, 1000 * 5);
+ System.out.println(schema1.getDataType());
+ PrintSchemaChangeAware aware1 = new PrintSchemaChangeAware("aware1_" + finalI);
+ schema1.registerSchemaChangeAware(aware1);
+ try {
+ Thread.sleep(1000 * 60 * 1);
+ } catch (InterruptedException e) {
+ e.printStackTrace();
+ }
+ schema1.unregisterSchemaChangeAware(aware1);
+ });
+ threads.add(thread);
+
+ thread = new Thread(() -> {
+ DynamicSchema schema2 = new HttpDynamicSchema( "http://127.0.0.1/session_record_schema.json", SchemaParser.PARSER_AVRO, 1000 * 5);
+ System.out.println(schema2.getDataType());
+ PrintSchemaChangeAware aware2 = new PrintSchemaChangeAware("aware2_" + finalI);
+ schema2.registerSchemaChangeAware(aware2);
+ try {
+ Thread.sleep(1000 * 60 * 1);
+ } catch (InterruptedException e) {
+ e.printStackTrace();
+ }
+ schema2.unregisterSchemaChangeAware(aware2);
+ });
+ threads.add(thread);
+ }
+
+ for (int i = 0; i < threads.size(); i++) {
+ thread = threads.get(i);
+ thread.start();
+ }
+
+ for (int i = 0; i < threads.size(); i++) {
+ thread = threads.get(i);
+ thread.join();
+ }
+ Thread.sleep(1000 * 3);
+ LOG.info("done");
+ }
+
+ public static void testMultiThread() throws Exception{
+ LOG.info("start");
+
+ List<Thread> threads = new ArrayList<>(10);
+ Thread thread;
+ for (int i = 0; i < 5; i++) {
+ int finalI = i + 1;
+ thread = new Thread(() -> {
+ RandomDynamicSchema schema1 = new RandomDynamicSchema( 1000 * 5, "Schema1", 0.25);
+ PrintSchemaChangeAware aware1 = new PrintSchemaChangeAware("aware1_" + finalI);
+ schema1.registerSchemaChangeAware(aware1);
+ try {
+ Thread.sleep(1000 * 60 * 1);
+ } catch (InterruptedException e) {
+ e.printStackTrace();
+ }
+ schema1.unregisterSchemaChangeAware(aware1);
+ });
+ threads.add(thread);
+
+ thread = new Thread(() -> {
+ RandomDynamicSchema schema2 = new RandomDynamicSchema(1000 * 10, "Schema1", 0.9);
+ PrintSchemaChangeAware aware2 = new PrintSchemaChangeAware("aware2_" + finalI);
+ schema2.registerSchemaChangeAware(aware2);
+ try {
+ Thread.sleep(1000 * 60 * 1);
+ } catch (InterruptedException e) {
+ e.printStackTrace();
+ }
+ schema2.unregisterSchemaChangeAware(aware2);
+ });
+ threads.add(thread);
+ }
+
+ for (int i = 0; i < threads.size(); i++) {
+ thread = threads.get(i);
+ thread.start();
+ }
+
+ for (int i = 0; i < threads.size(); i++) {
+ thread = threads.get(i);
+ thread.join();
+ }
+ Thread.sleep(1000 * 3);
+ LOG.info("done");
+ }
+
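+ // Test listener that simply logs every schema change it receives.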
+ public static class PrintSchemaChangeAware implements SchemaChangeAware {
+ private final String name;
+
+ public PrintSchemaChangeAware(String name) {
+ this.name = name;
+ }
+
+ @Override
+ public void schemaChange(StructType dataType) {
+ String info = String.format("%s receive schema change:%s", name, dataType);
+ //System.out.println(info);
+ LOG.info(info);
+ }
+
+ @Override
+ public String toString() {
+ return name;
+ }
+ }
+
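+ // Test schema that simulates a schema change with the given probability on each refresh,
+ // so the manager can be exercised without an external schema source.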
+ public static class RandomDynamicSchema extends DynamicSchema{
+ private final String key;
+ private final double probability;
+
+ public RandomDynamicSchema(long intervalMs, String identifier, double probability) {
+ super(null, intervalMs);
+ this.key = identifier + "_" + TimeUtils.formatWithHighestUnit(Duration.ofMillis(intervalMs));
+ this.probability = probability;
+ }
+
+ @Override
+ public String getCacheKey() {
+ return key;
+ }
+
+ @Override
+ protected String getDataTypeContent() {
+ return null;
+ }
+
+ @Override
+ public StructType updateDataType() {
+ if(ThreadLocalRandom.current().nextDouble() < probability){
+ return (StructType) Types.parseDataType(String.format("struct<name_%s: string>", key));
+ }
+ return null;
+ }
+
+ }
+}
\ No newline at end of file
diff --git a/groot-examples/cn-udf-example/src/main/resources/example/cn_grootstream_job_local_template.yaml b/groot-examples/cn-udf-example/src/main/resources/example/cn_grootstream_job_local_template.yaml
index 5a8fcb0..b328f01 100644
--- a/groot-examples/cn-udf-example/src/main/resources/example/cn_grootstream_job_local_template.yaml
+++ b/groot-examples/cn-udf-example/src/main/resources/example/cn_grootstream_job_local_template.yaml
@@ -1,7 +1,6 @@
 sources:
   kafka_source:
     type: kafka
-    # fields: # [array of object] Field List, if not set, all fields(Map<String, Object>) will be output.
     properties: # [object] Source Properties
       topic: SESSION-RECORD
       kafka.bootstrap.servers: 192.168.44.11:9094,192.168.44.13:9094,192.168.44.14:9094,192.168.44.15:9094,192.168.44.16:9094
diff --git a/groot-examples/cn-udf-example/src/main/resources/example/cn_grootstream_job_template.yaml b/groot-examples/cn-udf-example/src/main/resources/example/cn_grootstream_job_template.yaml
index 7c448f6..61f4d9e 100644
--- a/groot-examples/cn-udf-example/src/main/resources/example/cn_grootstream_job_template.yaml
+++ b/groot-examples/cn-udf-example/src/main/resources/example/cn_grootstream_job_template.yaml
@@ -1,7 +1,6 @@
 sources:
   kafka_source:
     type: kafka
-    # fields: # [array of object] Field List, if not set, all fields(Map<String, Object>) will be output.
     properties: # [object] Source Properties
       topic: SESSION-RECORD
       kafka.bootstrap.servers: 192.168.44.11:9094,192.168.44.13:9094,192.168.44.14:9094,192.168.44.15:9094,192.168.44.16:9094
diff --git a/groot-examples/end-to-end-example/src/main/resources/examples/inline_to_clickhouse.yaml b/groot-examples/end-to-end-example/src/main/resources/examples/inline_to_clickhouse.yaml
index 370b7a8..829741d 100644
--- a/groot-examples/end-to-end-example/src/main/resources/examples/inline_to_clickhouse.yaml
+++ b/groot-examples/end-to-end-example/src/main/resources/examples/inline_to_clickhouse.yaml
@@ -1,27 +1,28 @@
 sources: # [object] Define connector source
   inline_source:
     type: inline
-    fields: # [array of object] Schema field projection, support read data only from specified fields.
-      - name: log_id
-        type: bigint
-      - name: recv_time
-        type: bigint
-      - name: server_fqdn
-        type: string
-      - name: server_domain
-        type: string
-      - name: client_ip
-        type: string
-      - name: server_ip
-        type: string
-      - name: server_asn
-        type: string
-      - name: decoded_as
-        type: string
-      - name: device_group
-        type: string
-      - name: device_tag
-        type: string
+    schema:
+      fields: # [array of object] Schema field projection, support read data only from specified fields.
+        - name: log_id
+          type: bigint
+        - name: recv_time
+          type: bigint
+        - name: server_fqdn
+          type: string
+        - name: server_domain
+          type: string
+        - name: client_ip
+          type: string
+        - name: server_ip
+          type: string
+        - name: server_asn
+          type: string
+        - name: decoded_as
+          type: string
+        - name: device_group
+          type: string
+        - name: device_tag
+          type: string
     properties:
       #
       # [string] Event Data, it will be parsed to Map<String, Object> by the specified format.
diff --git a/groot-examples/end-to-end-example/src/main/resources/examples/inline_to_kafka.yaml b/groot-examples/end-to-end-example/src/main/resources/examples/inline_to_kafka.yaml
index a70f588..a5c5ece 100644
--- a/groot-examples/end-to-end-example/src/main/resources/examples/inline_to_kafka.yaml
+++ b/groot-examples/end-to-end-example/src/main/resources/examples/inline_to_kafka.yaml
@@ -1,27 +1,28 @@
 sources: # [object] Define connector source
   inline_source:
     type: inline
-    fields: # [array of object] Schema field projection, support read data only from specified fields.
-      - name: log_id
-        type: bigint
-      - name: recv_time
-        type: bigint
-      - name: server_fqdn
-        type: string
-      - name: server_domain
-        type: string
-      - name: client_ip
-        type: string
-      - name: server_ip
-        type: string
-      - name: server_asn
-        type: string
-      - name: decoded_as
-        type: string
-      - name: device_group
-        type: string
-      - name: device_tag
-        type: string
+    schema:
+      fields: # [array of object] Schema field projection, support read data only from specified fields.
+        - name: log_id
+          type: bigint
+        - name: recv_time
+          type: bigint
+        - name: server_fqdn
+          type: string
+        - name: server_domain
+          type: string
+        - name: client_ip
+          type: string
+        - name: server_ip
+          type: string
+        - name: server_asn
+          type: string
+        - name: decoded_as
+          type: string
+        - name: device_group
+          type: string
+        - name: device_tag
+          type: string
     properties:
       #
       # [string] Event Data, it will be parsed to Map<String, Object> by the specified format.
diff --git a/groot-examples/end-to-end-example/src/main/resources/examples/inline_to_print_test.yaml b/groot-examples/end-to-end-example/src/main/resources/examples/inline_to_print_test.yaml
index d42c05a..cfd3917 100644
--- a/groot-examples/end-to-end-example/src/main/resources/examples/inline_to_print_test.yaml
+++ b/groot-examples/end-to-end-example/src/main/resources/examples/inline_to_print_test.yaml
@@ -1,27 +1,28 @@
 sources:
   inline_source:
     type: inline
-    fields:
-      - name: log_id
-        type: bigint
-      - name: recv_time
-        type: bigint
-      - name: server_fqdn
-        type: string
-      - name: server_domain
-        type: string
-      - name: client_ip
-        type: string
-      - name: server_ip
-        type: string
-      - name: server_asn
-        type: string
-      - name: decoded_as
-        type: string
-      - name: device_group
-        type: string
-      - name: device_tag
-        type: string
+    schema:
+      fields:
+        - name: log_id
+          type: bigint
+        - name: recv_time
+          type: bigint
+        - name: server_fqdn
+          type: string
+        - name: server_domain
+          type: string
+        - name: client_ip
+          type: string
+        - name: server_ip
+          type: string
+        - name: server_asn
+          type: string
+        - name: decoded_as
+          type: string
+        - name: device_group
+          type: string
+        - name: device_tag
+          type: string
     properties:
       data: '{"tcp_rtt_ms":128,"decoded_as":"HTTP", "http_version":"http1","http_request_line":"GET / HTTP/1.1","http_host":"www.ct.cn","http_url":"www.ct.cn/","http_user_agent":"curl/8.0.1","http_status_code":200,"http_response_line":"HTTP/1.1 200 OK","http_response_content_type":"text/html; charset=UTF-8","http_response_latency_ms":31,"http_session_duration_ms":5451,"in_src_mac":"ba:bb:a7:3c:67:1c","in_dest_mac":"86:dd:7a:8f:ae:e2","out_src_mac":"86:dd:7a:8f:ae:e2","out_dest_mac":"ba:bb:a7:3c:67:1c","tcp_client_isn":678677906,"tcp_server_isn":1006700307,"address_type":4,"client_ip":"192.11.22.22","server_ip":"8.8.8.8","client_port":42751,"server_port":80,"in_link_id":65535,"out_link_id":65535,"start_timestamp_ms":1703646546127,"end_timestamp_ms":1703646551702,"duration_ms":5575,"sent_pkts":97,"sent_bytes":5892,"received_pkts":250,"received_bytes":333931,"tcp_c2s_ip_fragments":0,"tcp_s2c_ip_fragments":0,"tcp_c2s_rtx_pkts":0,"tcp_c2s_rtx_bytes":0,"tcp_s2c_rtx_pkts":0,"tcp_s2c_rtx_bytes":0,"tcp_c2s_o3_pkts":0,"tcp_s2c_o3_pkts":0,"tcp_c2s_lost_bytes":0,"tcp_s2c_lost_bytes":0,"flags":26418,"flags_identify_info":[100,1,100,60,150,100,1,2],"app_transition":"http.1111.test_1_1","decoded_as":"HTTP","server_fqdn":"www.ct.cn","app":"test_1_1","decoded_path":"ETHERNET.IPv4.TCP.http","fqdn_category_list":[1767],"t_vsys_id":1,"vsys_id":1,"session_id":290538039798223400,"tcp_handshake_latency_ms":41,"client_os_desc":"Windows","server_os_desc":"Linux","data_center":"center-xxg-tsgx","device_group":"group-xxg-tsgx","device_tag":"{\"tags\":[{\"tag\":\"data_center\",\"value\":\"center-xxg-tsgx\"},{\"tag\":\"device_group\",\"value\":\"group-xxg-tsgx\"}]}","device_id":"9800165603247024","sled_ip":"192.168.40.39","dup_traffic_flag":0}'
       format: json
diff --git a/groot-examples/end-to-end-example/src/main/resources/examples/kafka_to_print.yaml b/groot-examples/end-to-end-example/src/main/resources/examples/kafka_to_print.yaml
index d3c46b7..e883bce 100644
--- a/groot-examples/end-to-end-example/src/main/resources/examples/kafka_to_print.yaml
+++ b/groot-examples/end-to-end-example/src/main/resources/examples/kafka_to_print.yaml
@@ -1,11 +1,12 @@
 sources:
   kafka_source:
     type : kafka
-    fields: # [array of object] Schema field projection, support read data only from specified fields.
-      - name: client_ip
-        type: string
-      - name: server_ip
-        type: string
+    schema:
+      fields: # [array of object] Schema field projection, support read data only from specified fields.
+        - name: client_ip
+          type: string
+        - name: server_ip
+          type: string
     properties: # [object] Kafka source properties
       topic: SESSION-RECORD
       kafka.bootstrap.servers: 192.168.44.11:9092
diff --git a/groot-formats/format-json/src/main/java/com/geedgenetworks/formats/json/JsonEventSerializationSchema.java b/groot-formats/format-json/src/main/java/com/geedgenetworks/formats/json/JsonEventSerializationSchema.java
index 32a3191..260e35a 100644
--- a/groot-formats/format-json/src/main/java/com/geedgenetworks/formats/json/JsonEventSerializationSchema.java
+++ b/groot-formats/format-json/src/main/java/com/geedgenetworks/formats/json/JsonEventSerializationSchema.java
@@ -18,14 +18,19 @@ public class JsonEventSerializationSchema implements SerializationSchema<Event>
         }
     };
     private final StructType dataType;
+    private final JsonSerializer serializer;

     public JsonEventSerializationSchema(StructType dataType) {
         this.dataType = dataType;
+        this.serializer = dataType != null? new JsonSerializer(dataType): null;
     }

     @Override
     public byte[] serialize(Event element) {
-        // sink does not support type inference yet, so dataType is null
-        return JSON.toJSONBytes(element.getExtractedFields(), proFilter);
+        if(dataType == null){
+            return JSON.toJSONBytes(element.getExtractedFields(), proFilter);
+        } else {
+            return serializer.serialize(element.getExtractedFields());
+        }
     }
 }
diff --git a/groot-formats/format-json/src/main/java/com/geedgenetworks/formats/json/JsonSerializer.java b/groot-formats/format-json/src/main/java/com/geedgenetworks/formats/json/JsonSerializer.java
new file mode 100644
index 0000000..fac90c8
--- /dev/null
+++ b/groot-formats/format-json/src/main/java/com/geedgenetworks/formats/json/JsonSerializer.java
@@ -0,0 +1,176 @@
+package com.geedgenetworks.formats.json;
+
+import com.alibaba.fastjson2.JSONWriter;
+import com.geedgenetworks.core.types.*;
+
+import java.io.Serializable;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+
+public class JsonSerializer implements Serializable{
+
+ private final StructType dataType;
+ private final ValueWriter valueWriter;
+
+ public JsonSerializer(StructType dataType) {
+ this.dataType = dataType;
+ this.valueWriter = makeWriter(dataType);
+ }
+
+ public byte[] serialize(Map<String, Object> data){
+ try (JSONWriter writer = JSONWriter.ofUTF8()) {
+ if (data == null) {
+ writer.writeNull();
+ } else {
+ valueWriter.write(writer, data);
+ }
+ return writer.getBytes();
+ }
+ }
+
+ private ValueWriter makeWriter(DataType dataType) {
+ if (dataType instanceof StringType) {
+ return JsonSerializer::writeString;
+ }
+
+ if (dataType instanceof IntegerType) {
+ return JsonSerializer::writeInt;
+ }
+
+ if (dataType instanceof LongType) {
+ return JsonSerializer::writeLong;
+ }
+
+ if (dataType instanceof FloatType) {
+ return JsonSerializer::writeFloat;
+ }
+
+ if (dataType instanceof DoubleType) {
+ return JsonSerializer::writeDouble;
+ }
+
+ if (dataType instanceof StructType) {
+ final Map<String, ValueWriter> fieldWriters = Arrays.stream(((StructType) dataType).fields).collect(Collectors.toMap(f -> f.name, f -> this.makeWriter(f.dataType)));
+ return (writer, obj) -> {
+ writeObject(writer, obj, fieldWriters);
+ };
+ }
+
+ if (dataType instanceof ArrayType) {
+ final ValueWriter elementWriter = this.makeWriter(((ArrayType) dataType).elementType);
+ return (writer, obj) -> {
+ writeArray(writer, obj, elementWriter);
+ };
+ }
+
+ throw new UnsupportedOperationException("unsupported dataType: " + dataType);
+ }
+
+ static void writeString(JSONWriter writer, Object obj) {
+ writer.writeString(obj.toString());
+ }
+
+ static void writeInt(JSONWriter writer, Object obj){
+ if(obj instanceof Number){
+ writer.writeInt32(((Number) obj).intValue());
+ } else if(obj instanceof String){
+ writer.writeInt32(Integer.parseInt((String) obj));
+ } else {
+ throw new IllegalArgumentException(String.format("can not convert %s to int", obj));
+ }
+ }
+
+ static void writeLong(JSONWriter writer, Object obj) {
+ if(obj instanceof Number){
+ writer.writeInt64(((Number) obj).longValue());
+ } else if(obj instanceof String){
+ writer.writeInt64(Long.parseLong((String) obj));
+ } else {
+ throw new IllegalArgumentException(String.format("can not convert %s to long", obj));
+ }
+ }
+
+ static void writeFloat(JSONWriter writer, Object obj) {
+ if(obj instanceof Number){
+ writer.writeFloat(((Number) obj).floatValue());
+ } else if(obj instanceof String){
+ writer.writeFloat(Float.parseFloat((String) obj));
+ } else {
+ throw new IllegalArgumentException(String.format("can not convert %s to float", obj));
+ }
+ }
+
+ static void writeDouble(JSONWriter writer, Object obj){
+ if(obj instanceof Number){
+ writer.writeDouble(((Number) obj).doubleValue());
+ } else if(obj instanceof String){
+ writer.writeDouble(Double.parseDouble((String) obj));
+ } else {
+ throw new IllegalArgumentException(String.format("can not convert %s to double", obj));
+ }
+ }
+
+ static void writeObject(JSONWriter writer, Object obj, Map<String, ValueWriter> fieldWriters){
+ if(obj instanceof Map){
+ Map<String, Object> map = (Map<String, Object>) obj;
+ writer.startObject();
+
+ String key;
+ Object value;
+ ValueWriter valueWriter;
+ for (Map.Entry<String, Object> entry : map.entrySet()) {
+ key = entry.getKey();
+ /*if (key.startsWith("__")) {
+ continue;
+ }*/
+ value = entry.getValue();
+ if(value == null){
+ continue;
+ }
+ valueWriter = fieldWriters.get(key);
+ if(valueWriter != null){
+ writer.writeName(key);
+ writer.writeColon();
+ valueWriter.write(writer, value);
+ }
+ }
+
+ writer.endObject();
+ } else {
+ throw new IllegalArgumentException(String.format("can not convert %s to map", obj));
+ }
+ }
+
+ static void writeArray(JSONWriter writer, Object obj, ValueWriter elementWriter){
+ if(obj instanceof List){
+ List<Object> list = (List<Object>) obj;
+ writer.startArray();
+
+ Object element;
+ for (int i = 0; i < list.size(); i++) {
+ if (i != 0) {
+ writer.writeComma();
+ }
+
+ element = list.get(i);
+ if (element == null) {
+ writer.writeNull();
+ continue;
+ }
+
+ elementWriter.write(writer, element);
+ }
+
+ writer.endArray();
+ } else {
+ throw new IllegalArgumentException(String.format("can not convert %s to list", obj));
+ }
+ }
+
+ @FunctionalInterface
+ public interface ValueWriter extends Serializable {
+ void write(JSONWriter writer, Object obj);
+ }
+}
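A minimal usage sketch of the new schema-driven serializer, modeled on JsonSerializerTest further below: JsonSerializer builds one ValueWriter per schema field up front, then writes only the fields that exist in the schema and are non-null, coercing numeric strings the way writeInt/writeLong/writeFloat/writeDouble do. The wrapper class name here is illustrative; Types.parseStructType and the field-list syntax are taken from that test.

package com.geedgenetworks.formats.json;

import com.geedgenetworks.core.types.StructType;
import com.geedgenetworks.core.types.Types;

import java.nio.charset.StandardCharsets;
import java.util.LinkedHashMap;
import java.util.Map;

public class JsonSerializerUsageSketch {
    public static void main(String[] args) {
        // Field-list syntax as used in JsonSerializerTest; "bigint" maps to the long writer.
        StructType dataType = Types.parseStructType("log_id: bigint, recv_time: bigint, client_ip: string, server_ip: string");
        JsonSerializer serializer = new JsonSerializer(dataType);

        Map<String, Object> event = new LinkedHashMap<>();
        event.put("log_id", 1L);
        event.put("recv_time", "1703646546127"); // numeric string, coerced by writeLong
        event.put("client_ip", "192.168.0.1");
        event.put("server_ip", null);            // null values are skipped by writeObject
        event.put("http_host", "www.ct.cn");     // not in the schema, so it has no ValueWriter and is dropped

        byte[] bytes = serializer.serialize(event);
        // Expected to print: {"log_id":1,"recv_time":1703646546127,"client_ip":"192.168.0.1"}
        System.out.println(new String(bytes, StandardCharsets.UTF_8));
    }
}

Precomputing the writer tree in the constructor keeps serialize() to a single pass over the event map, which suits the per-event sink path in JsonEventSerializationSchema above.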
diff --git a/groot-formats/format-json/src/test/java/com/geedgenetworks/formats/json/JsonEventSerializationSchemaTest.java b/groot-formats/format-json/src/test/java/com/geedgenetworks/formats/json/JsonEventSerializationSchemaTest.java
new file mode 100644
index 0000000..c61bf0a
--- /dev/null
+++ b/groot-formats/format-json/src/test/java/com/geedgenetworks/formats/json/JsonEventSerializationSchemaTest.java
@@ -0,0 +1,19 @@
+package com.geedgenetworks.formats.json;
+
+import com.alibaba.fastjson2.JSON;
+
+import java.util.LinkedHashMap;
+import java.util.Map;
+
+public class JsonEventSerializationSchemaTest {
+
+
+ public static void main(String[] args) {
+ Map<String, Object> map = new LinkedHashMap<>();
+ map.put("id", 1);
+ map.put("name", "aaa");
+ byte[] bytes = JSON.toJSONBytes(map);
+ System.out.println(bytes);
+ }
+
+}
\ No newline at end of file
diff --git a/groot-formats/format-json/src/test/java/com/geedgenetworks/formats/json/JsonSerializerTest.java b/groot-formats/format-json/src/test/java/com/geedgenetworks/formats/json/JsonSerializerTest.java
new file mode 100644
index 0000000..e5d6c10
--- /dev/null
+++ b/groot-formats/format-json/src/test/java/com/geedgenetworks/formats/json/JsonSerializerTest.java
@@ -0,0 +1,79 @@
+package com.geedgenetworks.formats.json;
+
+import com.alibaba.fastjson2.JSON;
+import com.geedgenetworks.core.types.StructType;
+import com.geedgenetworks.core.types.Types;
+import org.junit.jupiter.api.Test;
+
+import java.nio.charset.StandardCharsets;
+import java.util.*;
+import java.util.concurrent.ThreadLocalRandom;
+
+public class JsonSerializerTest {
+
+ @Test
+ public void testSerSimpleData(){
+ ThreadLocalRandom random = ThreadLocalRandom.current();
+ Map<String, Object> map = new LinkedHashMap<>();
+ map.put("int", random.nextInt(1, Integer.MAX_VALUE));
+ map.put("int_null", null);
+ map.put("int_str", Integer.toString(random.nextInt(1, Integer.MAX_VALUE)));
+
+ map.put("int64", random.nextLong(1, Long.MAX_VALUE));
+ map.put("int64_null", null);
+ map.put("int64_str", Long.toString(random.nextLong(1, Long.MAX_VALUE)));
+
+ map.put("double", random.nextDouble(1, Integer.MAX_VALUE));
+ map.put("double_null", null);
+ map.put("double_str", Double.toString(random.nextDouble(1, Integer.MAX_VALUE)));
+
+ map.put("str", "ut8字符串");
+ map.put("str_null", null);
+ map.put("str_int", random.nextInt(1, Integer.MAX_VALUE));
+
+ map.put("int32_array", Arrays.asList(1, 3, 5));
+ map.put("int32_array_null", null);
+ map.put("int32_array_empty", Collections.emptyList());
+
+ map.put("int64_array", Arrays.asList(1, 3, 5));
+ map.put("int64_array_null", null);
+ map.put("int64_array_empty", Collections.emptyList());
+
+ map.put("str_array", Arrays.asList(1, 3, 5));
+
+ Map<String, Object> obj = new LinkedHashMap<>();
+ obj.put("id", 1);
+ obj.put("name", "name");
+ map.put("obj", obj);
+
+ List<Object> list = new ArrayList<>();
+ list.add(obj);
+ obj = new LinkedHashMap<>();
+ obj.put("id", 2);
+ obj.put("name", "name2");
+ list.add(obj);
+ map.put("obj_array", list);
+
+ StructType dataType = Types.parseStructType("int: int, int_null: int, int_str: int, int64: bigint, int64_null: bigint, int64_str: bigint, double: double, double_null: double, double_str: double, " +
+ "str: string, str_null: string, str_int: string, int32_array: array<int>, int32_array_null: array<int>, int32_array_empty: array<int>, int64_array: array<bigint>, int64_array_null: array<bigint>, int64_array_empty: array<bigint>," +
+ " str_array : array<string>, obj : struct<id :int, name: string>, obj_array : array<struct<id :int, name: string>>");
+ JsonSerializer serializer = new JsonSerializer(dataType);
+
+ byte[] bytes = serializer.serialize(map);
+ System.out.println(map);
+ System.out.println(bytes.length);
+ System.out.println(new String(bytes, StandardCharsets.UTF_8));
+ System.out.println(JSON.toJSONString(map));
+
+ JsonToMapDataConverter converter = new JsonToMapDataConverter(dataType, false);
+ Map<String, Object> rst = converter.convert(new String(bytes, StandardCharsets.UTF_8));
+ System.out.println(map);
+ System.out.println(rst);
+
+ System.out.println(serializer.serialize(rst).length);
+ System.out.println(new String(serializer.serialize(rst), StandardCharsets.UTF_8));
+ System.out.println(JSON.toJSONString(map));
+ }
+
+
+}
\ No newline at end of file
diff --git a/groot-formats/format-protobuf/src/test/java/com/geedgenetworks/formats/protobuf/ProtobufFormatFactoryTest.java b/groot-formats/format-protobuf/src/test/java/com/geedgenetworks/formats/protobuf/ProtobufFormatFactoryTest.java
index 633f5d8..72f4ac9 100644
--- a/groot-formats/format-protobuf/src/test/java/com/geedgenetworks/formats/protobuf/ProtobufFormatFactoryTest.java
+++ b/groot-formats/format-protobuf/src/test/java/com/geedgenetworks/formats/protobuf/ProtobufFormatFactoryTest.java
@@ -34,14 +34,14 @@ class ProtobufFormatFactoryTest {
         options.put("protobuf.message.name", messageName);
         Configuration configuration = Configuration.fromMap(options);
-        TableFactory.Context context = new TableFactory.Context(null, null, options, configuration);
+        TableFactory.Context context = new TableFactory.Context( null, options, configuration);
         SourceProvider sourceProvider = tableFactory.getSourceProvider(context);

         SinkTableFactory sinkTableFactory = FactoryUtil.discoverTableFactory(SinkTableFactory.class, "print");
         options = new HashMap<>();
         options.put("format", "json");
         configuration = Configuration.fromMap(options);
-        context = new TableFactory.Context(null, null, options, configuration);
+        context = new TableFactory.Context( null, options, configuration);
         SinkProvider sinkProvider = sinkTableFactory.getSinkProvider(context);

         StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
@@ -60,6 +60,7 @@
     <nacos.version>1.2.0</nacos.version>
     <antlr4.version>4.8</antlr4.version>
     <jcommander.version>1.81</jcommander.version>
+    <avro.version>1.9.1</avro.version>
     <lombok.version>1.18.24</lombok.version>
     <config.version>1.3.3</config.version>
     <hazelcast.version>5.1</hazelcast.version>
@@ -274,6 +275,11 @@
       <version>${hazelcast.version}</version>
     </dependency>

+    <dependency>
+      <groupId>org.apache.avro</groupId>
+      <artifactId>avro</artifactId>
+      <version>${avro.version}</version>
+    </dependency>
     <!-- flink dependencies -->
     <dependency>
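Tying the JSON format changes above together, a small round-trip sketch modeled on JsonSerializerTest: serialize a map against a StructType, then parse the bytes back with JsonToMapDataConverter. The converter and its (dataType, false) constructor are used exactly as in that test; the class and variable names below are illustrative.

package com.geedgenetworks.formats.json;

import com.geedgenetworks.core.types.StructType;
import com.geedgenetworks.core.types.Types;

import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.Map;

public class JsonRoundTripSketch {
    public static void main(String[] args) {
        StructType dataType = Types.parseStructType("log_id: bigint, client_ip: string, ports: array<int>");
        JsonSerializer serializer = new JsonSerializer(dataType);

        Map<String, Object> event = new LinkedHashMap<>();
        event.put("log_id", "42");                  // numeric string, written as a bigint
        event.put("client_ip", "10.0.0.1");
        event.put("ports", Arrays.asList(80, 443));

        String json = new String(serializer.serialize(event), StandardCharsets.UTF_8);

        // Parse back with the converter exercised in JsonSerializerTest; the boolean argument mirrors that test.
        JsonToMapDataConverter converter = new JsonToMapDataConverter(dataType, false);
        Map<String, Object> parsed = converter.convert(json);

        // After the round trip the values are typed, so a second serialize over the parsed map
        // is expected to produce the same JSON, which the test above inspects by printing both.
        System.out.println(new String(serializer.serialize(parsed), StandardCharsets.UTF_8));
    }
}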
