| author | lifengchao <[email protected]> | 2024-10-24 17:25:50 +0800 |
|---|---|---|
| committer | lifengchao <[email protected]> | 2024-10-24 17:25:50 +0800 |
| commit | cd80b3884314d5d457c973f0ef99d589313e30e7 | |
| tree | 4a8635db5e9c57e49501927e1670f137670f10cd | |
| parent | 1d422a44bbaaa845f6bccb2b4751fa7dcac81144 | |
[feature][connector-kafka] GAL-681 Kafka connector: support configuring and parsing Kafka record headers
8 files changed, 83 insertions, 17 deletions
diff --git a/docs/connector/sink/kafka.md b/docs/connector/sink/kafka.md
index 716a179..78b7f34 100644
--- a/docs/connector/sink/kafka.md
+++ b/docs/connector/sink/kafka.md
@@ -20,19 +20,20 @@ In order to use the Kafka connector, the following dependencies are required. Th
 Kafka sink custom properties. If a property belongs to the Kafka Producer Config, you can set it with the `kafka.` prefix.
 
-| Name | Type | Required | Default | Description |
-|------|------|----------|---------|-------------|
-| topic | String | Yes | (none) | Topic name is required. It used to write data to kafka. |
-| kafka.bootstrap.servers | String | Yes | (none) | A list of host/port pairs to use for establishing the initial connection to the Kafka cluster. This list should be in the form `host1:port1,host2:port2,...`. |
-| log.failures.only | Boolean | No | true | Defines whether the producer should fail on errors, or only log them. If this is set to true, then exceptions will be only logged, if set to false, exceptions will be eventually thrown and cause the streaming program to fail (and enter recovery). |
-| format | String | No | json | Data format. The default value is `json`. The Optional values are `json`, `protobuf`. |
-| [format].config | / | No | (none) | Data format properties. Please refer to [Format Options](../formats) for details. |
-| rate.limiting.strategy | String | No | (none) | The rate limiting strategy to use. The Optional values are `none`, `sliding_window`. |
-| rate.limiting.window.size | Integer | No | 5 | The window size of the rate limiting. For example, limit rate less than 10Mbps in 5 seconds time interval. |
-| rate.limiting.limit.rate | String | No | 10Mbps | A maximum rate of traffic that can be transmitted over a network or between networks. The units of the bytes rate are Mbps, Kbps,and bps. For example, 10Mbps, 100Kbps, 1000bps. |
-| rate.limiting.block.duration | String | No | 5min | If the rate limit is exceeded, the data will be blocked for the specified duration. The units of the duration are seconds, minutes, and hours. For example, 10s, 1m, 1h. |
-| rate.limiting.block.reset.duration | String | No | 30s | The time interval for resetting the rate limit. The units of the duration are seconds, minutes, and hours. For example, 10s, 1m, 1h. |
-| kafka.config | / | No | (none) | Kafka producer properties. Please refer to [Kafka Producer Config](https://kafka.apache.org/documentation/#producerconfigs) for details. |
+| Name | Type | Required | Default | Description |
+|------|------|----------|---------|-------------|
+| topic | String | Yes | (none) | Topic name is required. It is used to write data to Kafka. |
+| kafka.bootstrap.servers | String | Yes | (none) | A list of host/port pairs to use for establishing the initial connection to the Kafka cluster. This list should be in the form `host1:port1,host2:port2,...`. |
+| log.failures.only | Boolean | No | true | Defines whether the producer should fail on errors, or only log them. If set to true, exceptions are only logged; if set to false, exceptions are eventually thrown and cause the streaming program to fail (and enter recovery). |
+| format | String | No | json | Data format. The default value is `json`. The optional values are `json`, `protobuf`. |
+| [format].config | Map | No | (none) | Data format properties. Please refer to [Format Options](../formats) for details. |
+| headers.config | Map | No | (none) | Kafka record headers. For example, `headers.key: value` puts `key` and `value` into the record headers. |
+| rate.limiting.strategy | String | No | (none) | The rate limiting strategy to use. The optional values are `none`, `sliding_window`. |
+| rate.limiting.window.size | Integer | No | 5 | The window size of the rate limiting. For example, limit the rate to less than 10Mbps in a 5-second time interval. |
+| rate.limiting.limit.rate | String | No | 10Mbps | The maximum rate of traffic that can be transmitted over a network or between networks. The units of the byte rate are Mbps, Kbps, and bps. For example, 10Mbps, 100Kbps, 1000bps. |
+| rate.limiting.block.duration | String | No | 5min | If the rate limit is exceeded, the data will be blocked for the specified duration. The units of the duration are seconds, minutes, and hours. For example, 10s, 1m, 1h. |
+| rate.limiting.block.reset.duration | String | No | 30s | The time interval for resetting the rate limit. The units of the duration are seconds, minutes, and hours. For example, 10s, 1m, 1h. |
+| kafka.config | Map | No | (none) | Kafka producer properties. Please refer to [Kafka Producer Config](https://kafka.apache.org/documentation/#producerconfigs) for details. |
 
 ## Example
diff --git a/docs/connector/source/kafka.md b/docs/connector/source/kafka.md
index 07dff22..680d1c1 100644
--- a/docs/connector/source/kafka.md
+++ b/docs/connector/source/kafka.md
@@ -24,6 +24,13 @@ Kafka source custom properties. if properties belongs to Kafka Consumer Config,
 | [format].config | Map | No | (none) | Data format properties. Please refer to [Format Options](../formats) for details. |
 | kafka.config | Map | No | (none) | Kafka consumer properties. Please refer to [Kafka Consumer Config](https://kafka.apache.org/documentation/#consumerconfigs) for details. |
 
+## Internal Fields
+
+| Name | Type | Description |
+|------|------|-------------|
+| __timestamp | Long | The timestamp of this Kafka record. |
+| __headers | Map[String, String] | The headers of this Kafka record. |
+
 ## Example
 
 This example reads data from the Kafka topic `SESSION-RECORD` and prints it to the console.
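For doc readers, here is a minimal, hypothetical sketch of consuming these internal fields downstream; it is not part of this commit. The class name `InternalFieldsDemo` is invented, and it assumes `Event#getExtractedFields()` exposes a `Map<String, Object>`, which is what the deserialization change below implies.

```java
// Hypothetical usage sketch: reading the internal fields the Kafka source
// attaches to every Event. Not part of this commit.
import com.geedgenetworks.common.Event;

import java.util.Collections;
import java.util.Map;

public class InternalFieldsDemo {
    @SuppressWarnings("unchecked")
    static void inspect(Event event) {
        Map<String, Object> fields = event.getExtractedFields();
        // "__timestamp" is set from ConsumerRecord#timestamp()
        Long timestamp = (Long) fields.get(Event.INTERNAL_TIMESTAMP_KEY);
        // "__headers" holds the record headers decoded as UTF-8 strings
        Map<String, String> headers = (Map<String, String>)
                fields.getOrDefault(Event.INTERNAL_HEADERS_KEY, Collections.emptyMap());
        System.out.println("record ts=" + timestamp + ", headers=" + headers);
    }
}
```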
diff --git a/groot-common/src/main/java/com/geedgenetworks/common/Event.java b/groot-common/src/main/java/com/geedgenetworks/common/Event.java
index 4ab4aef..7733c66 100644
--- a/groot-common/src/main/java/com/geedgenetworks/common/Event.java
+++ b/groot-common/src/main/java/com/geedgenetworks/common/Event.java
@@ -8,6 +8,7 @@ import java.util.Map;
 
 @Data
 public class Event implements Serializable {
     public static final String INTERNAL_TIMESTAMP_KEY = "__timestamp";
+    public static final String INTERNAL_HEADERS_KEY = "__headers";
     public static final String WINDOW_START_TIMESTAMP = "__window_start_timestamp";
     public static final String WINDOW_END_TIMESTAMP = "__window_end_timestamp";
diff --git a/groot-connectors/connector-kafka/src/main/java/com/geedgenetworks/connectors/kafka/EventKafkaDeserializationSchema.java b/groot-connectors/connector-kafka/src/main/java/com/geedgenetworks/connectors/kafka/EventKafkaDeserializationSchema.java
index aae6678..bd95dd6 100644
--- a/groot-connectors/connector-kafka/src/main/java/com/geedgenetworks/connectors/kafka/EventKafkaDeserializationSchema.java
+++ b/groot-connectors/connector-kafka/src/main/java/com/geedgenetworks/connectors/kafka/EventKafkaDeserializationSchema.java
@@ -9,9 +9,15 @@ import org.apache.flink.streaming.connectors.kafka.KafkaDeserializationSchema;
 import org.apache.flink.streaming.connectors.kafka.RuntimeContextAware;
 import org.apache.flink.util.Collector;
 import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.common.header.Header;
+import org.apache.kafka.common.header.Headers;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.nio.charset.StandardCharsets;
+import java.util.HashMap;
+import java.util.Map;
+
 public class EventKafkaDeserializationSchema implements KafkaDeserializationSchema<Event>, RuntimeContextAware {
     private static final Logger LOG = LoggerFactory.getLogger(EventKafkaDeserializationSchema.class);
     private final DeserializationSchema<Event> valueDeserialization;
@@ -46,6 +52,12 @@ public class EventKafkaDeserializationSchema implements KafkaDeserializationSche
         Event event = valueDeserialization.deserialize(record.value());
         if (event != null) {
             event.getExtractedFields().put(Event.INTERNAL_TIMESTAMP_KEY, record.timestamp());
+            Headers headers = record.headers();
+            Map<String, String> headersMap = new HashMap<>();
+            for (Header header : headers) {
+                headersMap.put(header.key(), new String(header.value(), StandardCharsets.UTF_8));
+            }
+            event.getExtractedFields().put(Event.INTERNAL_HEADERS_KEY, headersMap);
             out.collect(event);
             internalMetrics.incrementOutEvents();
             return;
diff --git a/groot-connectors/connector-kafka/src/main/java/com/geedgenetworks/connectors/kafka/KafkaConnectorOptionsUtil.java b/groot-connectors/connector-kafka/src/main/java/com/geedgenetworks/connectors/kafka/KafkaConnectorOptionsUtil.java
index 48734ea..7924f8d 100644
--- a/groot-connectors/connector-kafka/src/main/java/com/geedgenetworks/connectors/kafka/KafkaConnectorOptionsUtil.java
+++ b/groot-connectors/connector-kafka/src/main/java/com/geedgenetworks/connectors/kafka/KafkaConnectorOptionsUtil.java
@@ -1,11 +1,13 @@
 package com.geedgenetworks.connectors.kafka;
 
+import java.util.HashMap;
 import java.util.Map;
 import java.util.Properties;
 
 public class KafkaConnectorOptionsUtil {
 
     public static final String PROPERTIES_PREFIX = "kafka.";
+    public static final String HEADERS_PREFIX = "headers.";
 
     public static Properties getKafkaProperties(Map<String, String> tableOptions) {
         final Properties kafkaProperties = new Properties();
@@ -23,6 +25,21 @@ public class KafkaConnectorOptionsUtil {
         return kafkaProperties;
     }
 
+    public static Map<String, String> getKafkaHeaders(Map<String, String> tableOptions) {
+        final Map<String, String> headers = new HashMap<>();
+
+        tableOptions.keySet().stream()
+                .filter(key -> key.startsWith(HEADERS_PREFIX))
+                .forEach(
+                        key -> {
+                            final String value = tableOptions.get(key);
+                            final String subKey = key.substring(HEADERS_PREFIX.length());
+                            headers.put(subKey, value);
+                        });
+
+        return headers;
+    }
+
     /**
      * Decides if the table options contains Kafka client properties that start with prefix
      * 'properties'.
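For clarity, a minimal usage sketch of the new helper (the option values are invented): only `headers.`-prefixed options are kept, with the prefix stripped from the key.

```java
// Demonstrates KafkaConnectorOptionsUtil#getKafkaHeaders with made-up options.
import com.geedgenetworks.connectors.kafka.KafkaConnectorOptionsUtil;

import java.util.HashMap;
import java.util.Map;

public class GetKafkaHeadersDemo {
    public static void main(String[] args) {
        Map<String, String> tableOptions = new HashMap<>();
        tableOptions.put("topic", "SESSION-RECORD");    // not "headers."-prefixed: ignored
        tableOptions.put("headers.trace-id", "abc123"); // becomes header "trace-id"
        tableOptions.put("headers.source", "groot");    // becomes header "source"

        Map<String, String> headers = KafkaConnectorOptionsUtil.getKafkaHeaders(tableOptions);
        System.out.println(headers); // prints something like {trace-id=abc123, source=groot}
    }
}
```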
diff --git a/groot-connectors/connector-kafka/src/main/java/com/geedgenetworks/connectors/kafka/KafkaSinkProvider.java b/groot-connectors/connector-kafka/src/main/java/com/geedgenetworks/connectors/kafka/KafkaSinkProvider.java
index 964aa94..496e6a3 100644
--- a/groot-connectors/connector-kafka/src/main/java/com/geedgenetworks/connectors/kafka/KafkaSinkProvider.java
+++ b/groot-connectors/connector-kafka/src/main/java/com/geedgenetworks/connectors/kafka/KafkaSinkProvider.java
@@ -10,6 +10,7 @@ import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.datastream.DataStreamSink;
 import org.apache.flink.streaming.connectors.kafka.GrootFlinkKafkaProducer;
 
+import java.util.Map;
 import java.util.Optional;
 import java.util.Properties;
 
@@ -19,6 +20,7 @@ public class KafkaSinkProvider implements SinkProvider {
     private final String topic;
     private final Properties properties;
+    private final Map<String, String> headers;
     private final boolean logFailuresOnly;
     private final RateLimitingStrategy rateLimitingStrategy;
 
@@ -27,6 +29,7 @@ public class KafkaSinkProvider implements SinkProvider {
             EncodingFormat valueEncodingFormat,
             String topic,
             Properties properties,
+            Map<String, String> headers,
             boolean logFailuresOnly,
             RateLimitingStrategy rateLimitingStrategy
     ) {
@@ -34,6 +37,7 @@ public class KafkaSinkProvider implements SinkProvider {
         this.valueSerialization = valueEncodingFormat.createRuntimeEncoder(dataType);
         this.topic = topic;
         this.properties = properties;
+        this.headers = headers;
         this.logFailuresOnly = logFailuresOnly;
         this.rateLimitingStrategy = rateLimitingStrategy;
     }
@@ -48,6 +52,7 @@ public class KafkaSinkProvider implements SinkProvider {
         );
         kafkaProducer.setLogFailuresOnly(logFailuresOnly);
         kafkaProducer.setRateLimitingStrategy(rateLimitingStrategy);
+        kafkaProducer.setHeaders(headers);
         return dataStream.addSink(kafkaProducer);
     }
 }
diff --git a/groot-connectors/connector-kafka/src/main/java/com/geedgenetworks/connectors/kafka/KafkaTableFactory.java b/groot-connectors/connector-kafka/src/main/java/com/geedgenetworks/connectors/kafka/KafkaTableFactory.java
index fbbaed2..394e618 100644
--- a/groot-connectors/connector-kafka/src/main/java/com/geedgenetworks/connectors/kafka/KafkaTableFactory.java
+++ b/groot-connectors/connector-kafka/src/main/java/com/geedgenetworks/connectors/kafka/KafkaTableFactory.java
@@ -18,8 +18,7 @@ import org.apache.flink.util.Preconditions;
 
 import java.util.*;
import static com.geedgenetworks.connectors.kafka.KafkaConnectorOptions.*;
-import static com.geedgenetworks.connectors.kafka.KafkaConnectorOptionsUtil.PROPERTIES_PREFIX;
-import static com.geedgenetworks.connectors.kafka.KafkaConnectorOptionsUtil.getKafkaProperties;
+import static com.geedgenetworks.connectors.kafka.KafkaConnectorOptionsUtil.*;
public class KafkaTableFactory implements SourceTableFactory, SinkTableFactory {
public static final String IDENTIFIER = "kafka";
@@ -51,7 +50,7 @@ public class KafkaTableFactory implements SourceTableFactory, SinkTableFactory {
         // get the valueEncodingFormat
EncodingFormat valueEncodingFormat = helper.discoverEncodingFormat(EncodingFormatFactory.class, FactoryUtil.FORMAT);
-        helper.validateExcept(PROPERTIES_PREFIX); // validate parameters, excluding `properties.` parameters
+        helper.validateExcept(PROPERTIES_PREFIX, HEADERS_PREFIX); // validate parameters, excluding `properties.` parameters
StructType dataType = context.getDataType();
ReadableConfig config = context.getConfiguration();
@@ -59,8 +58,9 @@ public class KafkaTableFactory implements SourceTableFactory, SinkTableFactory {
         String topic = config.get(TOPIC).get(0);
boolean logFailuresOnly = config.get(LOG_FAILURES_ONLY);
final Properties properties = getKafkaProperties(context.getOptions());
+ Map<String, String> headers = getKafkaHeaders(context.getOptions());
- return new KafkaSinkProvider(dataType, valueEncodingFormat, topic, properties, logFailuresOnly, getRateLimitingStrategy(config));
+ return new KafkaSinkProvider(dataType, valueEncodingFormat, topic, properties, headers, logFailuresOnly, getRateLimitingStrategy(config));
}
@Override
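Taken together, the factory now routes one flat option map into two prefixed namespaces, and `validateExcept(PROPERTIES_PREFIX, HEADERS_PREFIX)` is what lets both families pass option validation. A hedged sketch of that routing (option values invented; it also assumes, per the sink docs above, that `getKafkaProperties` strips the `kafka.` prefix):

```java
// Illustrative routing sketch mirroring createSinkProvider: "kafka." options
// feed the producer Properties, "headers." options become per-record headers.
import com.geedgenetworks.connectors.kafka.KafkaConnectorOptionsUtil;

import java.util.HashMap;
import java.util.Map;
import java.util.Properties;

public class SinkOptionRoutingDemo {
    public static void main(String[] args) {
        Map<String, String> options = new HashMap<>();
        options.put("kafka.bootstrap.servers", "host1:9092,host2:9092"); // producer config
        options.put("kafka.acks", "all");                                // producer config
        options.put("headers.env", "prod");                              // record header

        Properties producerProps = KafkaConnectorOptionsUtil.getKafkaProperties(options);
        Map<String, String> headers = KafkaConnectorOptionsUtil.getKafkaHeaders(options);

        System.out.println(producerProps); // expected: {acks=all, bootstrap.servers=host1:9092,host2:9092}
        System.out.println(headers);       // expected: {env=prod}
    }
}
```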
diff --git a/groot-connectors/connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/GrootFlinkKafkaProducer.java b/groot-connectors/connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/GrootFlinkKafkaProducer.java
index 09e8190..3b7e0c5 100644
--- a/groot-connectors/connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/GrootFlinkKafkaProducer.java
+++ b/groot-connectors/connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/GrootFlinkKafkaProducer.java
@@ -61,12 +61,15 @@ import org.apache.kafka.common.MetricName;
 import org.apache.kafka.common.PartitionInfo;
 import org.apache.kafka.common.errors.InvalidTxnStateException;
 import org.apache.kafka.common.errors.ProducerFencedException;
+import org.apache.kafka.common.header.Header;
+import org.apache.kafka.common.header.internals.RecordHeader;
 import org.apache.kafka.common.serialization.ByteArraySerializer;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import javax.annotation.Nullable;
 
 import java.io.IOException;
+import java.nio.charset.StandardCharsets;
 import java.time.Duration;
 import java.util.*;
 import java.util.concurrent.BlockingDeque;
@@ -257,6 +260,8 @@ public class GrootFlinkKafkaProducer<IN>
     private final Map<String, KafkaMetricMutableWrapper> previouslyCreatedMetrics = new HashMap<>();
     private RateLimitingStrategy rateLimitingStrategy;
     private boolean rateLimitingEnable;
+    private Map<String, String> headers;
+    private Header[] recordHeaders;
 
     private transient InternalMetrics internalMetrics;
@@ -769,6 +774,10 @@ public class GrootFlinkKafkaProducer<IN>
         this.rateLimitingEnable = rateLimitingStrategy != null && rateLimitingStrategy.rateLimited();
     }
 
+    public void setHeaders(Map<String, String> headers) {
+        this.headers = headers;
+    }
+
     /**
      * Disables the propagation of exceptions thrown when committing presumably timed out Kafka
      * transactions during recovery of the job. If a Kafka transaction is timed out, a commit will
@@ -796,6 +805,17 @@ public class GrootFlinkKafkaProducer<IN>
             rateLimitingStrategy = rateLimitingStrategy.withMaxRate(subTaskMaxRate);
             LOG.error("rateLimitingStrategy: {}", rateLimitingStrategy);
         }
+        if (headers != null && !headers.isEmpty()) {
+            recordHeaders = new Header[headers.size()];
+            int i = 0;
+            for (Map.Entry<String, String> entry : headers.entrySet()) {
+                recordHeaders[i++] = new RecordHeader(entry.getKey(), entry.getValue().getBytes(StandardCharsets.UTF_8));
+            }
+            checkState(i == headers.size());
+        } else {
+            recordHeaders = new Header[0];
+        }
+
         internalMetrics = new InternalMetrics(getRuntimeContext());
         if (logFailuresOnly) {
             callback =
@@ -929,6 +949,9 @@ public class GrootFlinkKafkaProducer<IN>
             record = kafkaSchema.serialize(next, context.timestamp());
         }
 
+        for (int i = 0; i < recordHeaders.length; i++) {
+            record.headers().add(recordHeaders[i]);
+        }
         if (!rateLimitingEnable) {
             pendingRecords.incrementAndGet();
             transaction.producer.send(record, callback);
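For reference, a self-contained kafka-clients sketch of what the producer change above does: convert the configured map to a `Header[]` once at `open()` time, then append each header to every outgoing record. The topic and payload below are made-up placeholders.

```java
// Standalone sketch of the static-headers technique added to
// GrootFlinkKafkaProducer, using only the kafka-clients API.
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.header.Header;
import org.apache.kafka.common.header.internals.RecordHeader;

import java.nio.charset.StandardCharsets;
import java.util.LinkedHashMap;
import java.util.Map;

public class StaticHeadersDemo {
    public static void main(String[] args) {
        Map<String, String> configured = new LinkedHashMap<>();
        configured.put("env", "prod");
        configured.put("source", "groot");

        // One-time conversion, as in the open() change above.
        Header[] recordHeaders = new Header[configured.size()];
        int i = 0;
        for (Map.Entry<String, String> e : configured.entrySet()) {
            recordHeaders[i++] = new RecordHeader(e.getKey(), e.getValue().getBytes(StandardCharsets.UTF_8));
        }

        // Per-record attachment, as in the send path above.
        ProducerRecord<byte[], byte[]> record =
                new ProducerRecord<>("demo-topic", "payload".getBytes(StandardCharsets.UTF_8));
        for (Header h : recordHeaders) {
            record.headers().add(h);
        }
        record.headers().forEach(h ->
                System.out.println(h.key() + "=" + new String(h.value(), StandardCharsets.UTF_8)));
    }
}
```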
