path: root/groot-connectors
author     lifengchao <[email protected]>  2024-03-13 14:36:22 +0800
committer  lifengchao <[email protected]>  2024-03-13 14:36:22 +0800
commit     40fe896585953874c09576a576e057c41ffe1288 (patch)
tree       375e393e7e5720fbd213818c9785a8d4e9c042b6 /groot-connectors
parent     420dc8afbc8721b343bf29af238a8578043e34a2 (diff)
* [feature][connector-kafka] Rename the extended Kafka connector classes to use the Groot prefix (feature/dynamicschema)
Diffstat (limited to 'groot-connectors')
-rw-r--r--  groot-connectors/connector-kafka/src/main/java/com/geedgenetworks/connectors/kafka/KafkaSinkProvider.java    4
-rw-r--r--  groot-connectors/connector-kafka/src/main/java/com/geedgenetworks/connectors/kafka/KafkaSourceProvider.java  4
-rw-r--r--  groot-connectors/connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/GrootFlinkKafkaConsumer.java (renamed from groot-connectors/connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer2.java)    8
-rw-r--r--  groot-connectors/connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/GrootFlinkKafkaProducer.java (renamed from groot-connectors/connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer2.java)  252
-rw-r--r--  groot-connectors/connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/GrootKafkaFetcher.java (renamed from groot-connectors/connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/KafkaFetcher2.java)    6
5 files changed, 136 insertions(+), 138 deletions(-)
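
Since this commit is a pure rename, downstream call sites migrate with an import swap. A minimal, hypothetical sketch (the three-argument constructor is the one shown later in this diff; the call site itself is illustrative):

    // before: import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer2;
    // after:
    import org.apache.flink.streaming.connectors.kafka.GrootFlinkKafkaProducer;

    GrootFlinkKafkaProducer<Event> producer =
            new GrootFlinkKafkaProducer<>(topicId, serializationSchema, producerConfig); // same arguments as before the rename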
diff --git a/groot-connectors/connector-kafka/src/main/java/com/geedgenetworks/connectors/kafka/KafkaSinkProvider.java b/groot-connectors/connector-kafka/src/main/java/com/geedgenetworks/connectors/kafka/KafkaSinkProvider.java
index a50c1e4..761d1ad 100644
--- a/groot-connectors/connector-kafka/src/main/java/com/geedgenetworks/connectors/kafka/KafkaSinkProvider.java
+++ b/groot-connectors/connector-kafka/src/main/java/com/geedgenetworks/connectors/kafka/KafkaSinkProvider.java
@@ -7,7 +7,7 @@ import com.geedgenetworks.core.types.StructType;
import org.apache.flink.api.common.serialization.SerializationSchema;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSink;
-import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer2;
+import org.apache.flink.streaming.connectors.kafka.GrootFlinkKafkaProducer;
import java.util.Optional;
import java.util.Properties;
@@ -36,7 +36,7 @@ public class KafkaSinkProvider implements SinkProvider {
@Override
public DataStreamSink<?> consumeDataStream(DataStream<Event> dataStream) {
- FlinkKafkaProducer2<Event> kafkaProducer = new FlinkKafkaProducer2<>(
+ GrootFlinkKafkaProducer<Event> kafkaProducer = new GrootFlinkKafkaProducer<>(
topic,
valueSerialization,
properties,
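
The hunk truncates the constructor call after `properties`. A hedged reconstruction of consumeDataStream, assuming the remaining argument is the optional custom partitioner (the SerializationSchema-based constructors shown later in this diff take either nothing more, an Optional partitioner, or partitioner/semantic/pool size after it):

    // Illustrative only — topic, valueSerialization (a SerializationSchema<Event>)
    // and properties are the provider's fields; Optional.empty() is an assumption.
    GrootFlinkKafkaProducer<Event> kafkaProducer = new GrootFlinkKafkaProducer<>(
            topic,
            valueSerialization,
            properties,
            Optional.empty()); // no custom partitioner
    return dataStream.addSink(kafkaProducer);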
diff --git a/groot-connectors/connector-kafka/src/main/java/com/geedgenetworks/connectors/kafka/KafkaSourceProvider.java b/groot-connectors/connector-kafka/src/main/java/com/geedgenetworks/connectors/kafka/KafkaSourceProvider.java
index ce0ddf8..ad34557 100644
--- a/groot-connectors/connector-kafka/src/main/java/com/geedgenetworks/connectors/kafka/KafkaSourceProvider.java
+++ b/groot-connectors/connector-kafka/src/main/java/com/geedgenetworks/connectors/kafka/KafkaSourceProvider.java
@@ -7,7 +7,7 @@ import com.geedgenetworks.core.types.StructType;
import org.apache.flink.api.common.serialization.DeserializationSchema;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer2;
+import org.apache.flink.streaming.connectors.kafka.GrootFlinkKafkaConsumer;
import java.util.List;
import java.util.Properties;
@@ -34,7 +34,7 @@ public class KafkaSourceProvider implements SourceProvider {
@Override
public SingleOutputStreamOperator<Event> produceDataStream(StreamExecutionEnvironment env) {
- FlinkKafkaConsumer2<Event> kafkaConsumer = new FlinkKafkaConsumer2<>(
+ GrootFlinkKafkaConsumer<Event> kafkaConsumer = new GrootFlinkKafkaConsumer<>(
topics,
new EventKafkaDeserializationSchema(valueDeserialization),
properties
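
The consumer side is symmetric; only the final addSource call is assumed here, everything else is verbatim from the hunk:

    // 'topics', 'valueDeserialization' and 'properties' are the provider's fields.
    GrootFlinkKafkaConsumer<Event> kafkaConsumer = new GrootFlinkKafkaConsumer<>(
            topics,
            new EventKafkaDeserializationSchema(valueDeserialization),
            properties);
    return env.addSource(kafkaConsumer); // a SingleOutputStreamOperator<Event>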
diff --git a/groot-connectors/connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer2.java b/groot-connectors/connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/GrootFlinkKafkaConsumer.java
index d37121d..4721969 100644
--- a/groot-connectors/connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer2.java
+++ b/groot-connectors/connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/GrootFlinkKafkaConsumer.java
@@ -6,7 +6,7 @@ import org.apache.flink.metrics.MetricGroup;
import org.apache.flink.streaming.api.operators.StreamingRuntimeContext;
import org.apache.flink.streaming.connectors.kafka.config.OffsetCommitMode;
import org.apache.flink.streaming.connectors.kafka.internals.AbstractFetcher;
-import org.apache.flink.streaming.connectors.kafka.internals.KafkaFetcher2;
+import org.apache.flink.streaming.connectors.kafka.internals.GrootKafkaFetcher;
import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartition;
import org.apache.flink.util.SerializedValue;
@@ -15,9 +15,9 @@ import java.util.Map;
import java.util.Properties;
@PublicEvolving
-public class FlinkKafkaConsumer2<T> extends FlinkKafkaConsumer<T> {
+public class GrootFlinkKafkaConsumer<T> extends FlinkKafkaConsumer<T> {
- public FlinkKafkaConsumer2(List<String> topics, KafkaDeserializationSchema<T> deserializer, Properties props) {
+ public GrootFlinkKafkaConsumer(List<String> topics, KafkaDeserializationSchema<T> deserializer, Properties props) {
super(topics, deserializer, props);
}
@@ -36,7 +36,7 @@ public class FlinkKafkaConsumer2<T> extends FlinkKafkaConsumer<T> {
// this overwrites whatever setting the user configured in the properties
adjustAutoCommitConfig(properties, offsetCommitMode);
- return new KafkaFetcher2<>(
+ return new GrootKafkaFetcher<>(
sourceContext,
assignedPartitionsWithInitialOffsets,
watermarkStrategy,
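
Apart from the createFetcher override above, GrootFlinkKafkaConsumer inherits its whole surface from FlinkKafkaConsumer, so standard FlinkKafkaConsumerBase configuration still applies after the rename. A hedged usage sketch (both setters are stock Flink; values illustrative):

    GrootFlinkKafkaConsumer<Event> consumer =
            new GrootFlinkKafkaConsumer<>(topics, deserializer, props);
    consumer.setStartFromGroupOffsets();          // inherited: resume from committed group offsets
    consumer.setCommitOffsetsOnCheckpoints(true); // inherited: commit offsets as checkpoints complete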
diff --git a/groot-connectors/connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer2.java b/groot-connectors/connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/GrootFlinkKafkaProducer.java
index 818df4e..ff0bb34 100644
--- a/groot-connectors/connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer2.java
+++ b/groot-connectors/connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/GrootFlinkKafkaProducer.java
@@ -77,15 +77,15 @@ import static org.apache.flink.util.Preconditions.checkState;
/**
* Flink Sink to produce data into a Kafka topic. By default producer will use {@link
- * FlinkKafkaProducer2.Semantic#AT_LEAST_ONCE} semantic. Before using {@link
- * FlinkKafkaProducer2.Semantic#EXACTLY_ONCE} please refer to Flink's Kafka connector documentation.
+ * GrootFlinkKafkaProducer.Semantic#AT_LEAST_ONCE} semantic. Before using {@link
+ * GrootFlinkKafkaProducer.Semantic#EXACTLY_ONCE} please refer to Flink's Kafka connector documentation.
*/
@PublicEvolving
-public class FlinkKafkaProducer2<IN>
+public class GrootFlinkKafkaProducer<IN>
extends TwoPhaseCommitSinkFunction<
IN,
- FlinkKafkaProducer2.KafkaTransactionState,
- FlinkKafkaProducer2.KafkaTransactionContext> {
+ GrootFlinkKafkaProducer.KafkaTransactionState,
+ GrootFlinkKafkaProducer.KafkaTransactionContext> {
/**
* Semantics that can be chosen.
@@ -99,13 +99,13 @@ public class FlinkKafkaProducer2<IN>
* Semantic.EXACTLY_ONCE the Flink producer will write all messages in a Kafka transaction
* that will be committed to Kafka on a checkpoint.
*
- * <p>In this mode {@link FlinkKafkaProducer2} sets up a pool of {@link
+ * <p>In this mode {@link GrootFlinkKafkaProducer} sets up a pool of {@link
* FlinkKafkaInternalProducer}. Between each checkpoint a Kafka transaction is created,
- * which is committed on {@link FlinkKafkaProducer2#notifyCheckpointComplete(long)}. If
- * checkpoint complete notifications are running late, {@link FlinkKafkaProducer2} can run
+ * which is committed on {@link GrootFlinkKafkaProducer#notifyCheckpointComplete(long)}. If
+ * checkpoint complete notifications are running late, {@link GrootFlinkKafkaProducer} can run
* out of {@link FlinkKafkaInternalProducer}s in the pool. In that case any subsequent
- * {@link FlinkKafkaProducer2#snapshotState(FunctionSnapshotContext)} requests will fail and
- * {@link FlinkKafkaProducer2} will keep using the {@link FlinkKafkaInternalProducer} from
+ * {@link GrootFlinkKafkaProducer#snapshotState(FunctionSnapshotContext)} requests will fail and
+ * {@link GrootFlinkKafkaProducer} will keep using the {@link FlinkKafkaInternalProducer} from
* the previous checkpoint. To decrease the chance of failing checkpoints there are four
* options:
* <li>decrease number of max concurrent checkpoints
@@ -128,7 +128,7 @@ public class FlinkKafkaProducer2<IN>
NONE
}
- private static final Logger LOG = LoggerFactory.getLogger(FlinkKafkaProducer2.class);
+ private static final Logger LOG = LoggerFactory.getLogger(GrootFlinkKafkaProducer.class);
private static final long serialVersionUID = 1L;
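
The javadoc above explains why EXACTLY_ONCE needs a pool of internal producers: one Kafka transaction per in-flight checkpoint, and late checkpoint-complete notifications can exhaust the pool. A hedged construction sketch using the (topic, schema, config, semantic, pool size) constructor shown further down in this diff ('eventSchema' is a placeholder KafkaSerializationSchema<Event>):

    GrootFlinkKafkaProducer<Event> producer = new GrootFlinkKafkaProducer<>(
            "output-topic",
            eventSchema,
            producerConfig,
            GrootFlinkKafkaProducer.Semantic.EXACTLY_ONCE,
            10); // pool size: keep it above the max number of concurrent checkpoints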
@@ -142,8 +142,8 @@ public class FlinkKafkaProducer2<IN>
* This coefficient determines what is the safe scale down factor.
*
* <p>If the Flink application previously failed before first checkpoint completed or we are
- * starting new batch of {@link FlinkKafkaProducer2} from scratch without clean shutdown of the
- * previous one, {@link FlinkKafkaProducer2} doesn't know what was the set of previously used
+ * starting new batch of {@link GrootFlinkKafkaProducer} from scratch without clean shutdown of the
+ * previous one, {@link GrootFlinkKafkaProducer} doesn't know what was the set of previously used
* Kafka's transactionalId's. In that case, it will try to play safe and abort all of the
* possible transactionalIds from the range of: {@code [0, getNumberOfParallelSubtasks() *
* kafkaProducersPoolSize * SAFE_SCALE_DOWN_FACTOR) }
@@ -158,7 +158,7 @@ public class FlinkKafkaProducer2<IN>
/**
* Default number of KafkaProducers in the pool. See {@link
- * FlinkKafkaProducer2.Semantic#EXACTLY_ONCE}.
+ * GrootFlinkKafkaProducer.Semantic#EXACTLY_ONCE}.
*/
public static final int DEFAULT_KAFKA_PRODUCERS_POOL_SIZE = 5;
@@ -174,27 +174,27 @@ public class FlinkKafkaProducer2<IN>
* NEXT_TRANSACTIONAL_ID_HINT_DESCRIPTOR_V2.
*/
@Deprecated
- private static final ListStateDescriptor<FlinkKafkaProducer2.NextTransactionalIdHint>
+ private static final ListStateDescriptor<GrootFlinkKafkaProducer.NextTransactionalIdHint>
NEXT_TRANSACTIONAL_ID_HINT_DESCRIPTOR =
new ListStateDescriptor<>(
"next-transactional-id-hint",
TypeInformation.of(NextTransactionalIdHint.class));
- private static final ListStateDescriptor<FlinkKafkaProducer2.NextTransactionalIdHint>
+ private static final ListStateDescriptor<GrootFlinkKafkaProducer.NextTransactionalIdHint>
NEXT_TRANSACTIONAL_ID_HINT_DESCRIPTOR_V2 =
new ListStateDescriptor<>(
"next-transactional-id-hint-v2",
new NextTransactionalIdHintSerializer());
/** State for nextTransactionalIdHint. */
- private transient ListState<FlinkKafkaProducer2.NextTransactionalIdHint>
+ private transient ListState<GrootFlinkKafkaProducer.NextTransactionalIdHint>
nextTransactionalIdHintState;
/** Generator for Transactional IDs. */
private transient TransactionalIdsGenerator transactionalIdsGenerator;
/** Hint for picking next transactional id. */
- private transient FlinkKafkaProducer2.NextTransactionalIdHint nextTransactionalIdHint;
+ private transient GrootFlinkKafkaProducer.NextTransactionalIdHint nextTransactionalIdHint;
/** User defined properties for the Producer. */
protected final Properties producerConfig;
@@ -236,7 +236,7 @@ public class FlinkKafkaProducer2<IN>
private boolean logFailuresOnly;
/** Semantic chosen for this instance. */
- protected FlinkKafkaProducer2.Semantic semantic;
+ protected GrootFlinkKafkaProducer.Semantic semantic;
// -------------------------------- Runtime fields ------------------------------------------
@@ -263,7 +263,7 @@ public class FlinkKafkaProducer2<IN>
* @param topicId ID of the Kafka topic.
* @param serializationSchema User defined (keyless) serialization schema.
*/
- public FlinkKafkaProducer2(
+ public GrootFlinkKafkaProducer(
String brokerList, String topicId, SerializationSchema<IN> serializationSchema) {
this(topicId, serializationSchema, getPropertiesFromBrokerList(brokerList));
}
@@ -275,14 +275,14 @@ public class FlinkKafkaProducer2<IN>
* partitioner. This default partitioner maps each sink subtask to a single Kafka partition
* (i.e. all records received by a sink subtask will end up in the same Kafka partition).
*
- * <p>To use a custom partitioner, please use {@link #FlinkKafkaProducer2(String,
+ * <p>To use a custom partitioner, please use {@link #GrootFlinkKafkaProducer(String,
* SerializationSchema, Properties, Optional)} instead.
*
* @param topicId ID of the Kafka topic.
* @param serializationSchema User defined key-less serialization schema.
* @param producerConfig Properties with the producer configuration.
*/
- public FlinkKafkaProducer2(
+ public GrootFlinkKafkaProducer(
String topicId,
SerializationSchema<IN> serializationSchema,
Properties producerConfig) {
@@ -311,7 +311,7 @@ public class FlinkKafkaProducer2<IN>
* partitions. If a partitioner is not provided, records will be distributed to Kafka
* partitions in a round-robin fashion.
*/
- public FlinkKafkaProducer2(
+ public GrootFlinkKafkaProducer(
String topicId,
SerializationSchema<IN> serializationSchema,
Properties producerConfig,
@@ -343,16 +343,16 @@ public class FlinkKafkaProducer2<IN>
* partitions. If a partitioner is not provided, records will be distributed to Kafka
* partitions in a round-robin fashion.
* @param semantic Defines semantic that will be used by this producer (see {@link
- * FlinkKafkaProducer2.Semantic}).
+ * GrootFlinkKafkaProducer.Semantic}).
* @param kafkaProducersPoolSize Overwrite default KafkaProducers pool size (see {@link
- * FlinkKafkaProducer2.Semantic#EXACTLY_ONCE}).
+ * GrootFlinkKafkaProducer.Semantic#EXACTLY_ONCE}).
*/
- public FlinkKafkaProducer2(
+ public GrootFlinkKafkaProducer(
String topicId,
SerializationSchema<IN> serializationSchema,
Properties producerConfig,
@Nullable FlinkKafkaPartitioner<IN> customPartitioner,
- FlinkKafkaProducer2.Semantic semantic,
+ GrootFlinkKafkaProducer.Semantic semantic,
int kafkaProducersPoolSize) {
this(
topicId,
@@ -374,17 +374,17 @@ public class FlinkKafkaProducer2<IN>
* partitioner. This default partitioner maps each sink subtask to a single Kafka partition
* (i.e. all records received by a sink subtask will end up in the same Kafka partition).
*
- * <p>To use a custom partitioner, please use {@link #FlinkKafkaProducer2(String,
+ * <p>To use a custom partitioner, please use {@link #GrootFlinkKafkaProducer(String,
* KeyedSerializationSchema, Properties, Optional)} instead.
*
* @param brokerList Comma separated addresses of the brokers
* @param topicId ID of the Kafka topic.
* @param serializationSchema User defined serialization schema supporting key/value messages
- * @deprecated use {@link #FlinkKafkaProducer2(String, KafkaSerializationSchema, Properties,
- * FlinkKafkaProducer2.Semantic)}
+ * @deprecated use {@link #GrootFlinkKafkaProducer(String, KafkaSerializationSchema, Properties,
+ * GrootFlinkKafkaProducer.Semantic)}
*/
@Deprecated
- public FlinkKafkaProducer2(
+ public GrootFlinkKafkaProducer(
String brokerList, String topicId, KeyedSerializationSchema<IN> serializationSchema) {
this(
topicId,
@@ -400,17 +400,17 @@ public class FlinkKafkaProducer2<IN>
* partitioner. This default partitioner maps each sink subtask to a single Kafka partition
* (i.e. all records received by a sink subtask will end up in the same Kafka partition).
*
- * <p>To use a custom partitioner, please use {@link #FlinkKafkaProducer2(String,
+ * <p>To use a custom partitioner, please use {@link #GrootFlinkKafkaProducer(String,
* KeyedSerializationSchema, Properties, Optional)} instead.
*
* @param topicId ID of the Kafka topic.
* @param serializationSchema User defined serialization schema supporting key/value messages
* @param producerConfig Properties with the producer configuration.
- * @deprecated use {@link #FlinkKafkaProducer2(String, KafkaSerializationSchema, Properties,
- * FlinkKafkaProducer2.Semantic)}
+ * @deprecated use {@link #GrootFlinkKafkaProducer(String, KafkaSerializationSchema, Properties,
+ * GrootFlinkKafkaProducer.Semantic)}
*/
@Deprecated
- public FlinkKafkaProducer2(
+ public GrootFlinkKafkaProducer(
String topicId,
KeyedSerializationSchema<IN> serializationSchema,
Properties producerConfig) {
@@ -432,16 +432,16 @@ public class FlinkKafkaProducer2<IN>
* @param serializationSchema User defined serialization schema supporting key/value messages
* @param producerConfig Properties with the producer configuration.
* @param semantic Defines semantic that will be used by this producer (see {@link
- * FlinkKafkaProducer2.Semantic}).
- * @deprecated use {@link #FlinkKafkaProducer2(String, KafkaSerializationSchema, Properties,
- * FlinkKafkaProducer2.Semantic)}
+ * GrootFlinkKafkaProducer.Semantic}).
+ * @deprecated use {@link #GrootFlinkKafkaProducer(String, KafkaSerializationSchema, Properties,
+ * GrootFlinkKafkaProducer.Semantic)}
*/
@Deprecated
- public FlinkKafkaProducer2(
+ public GrootFlinkKafkaProducer(
String topicId,
KeyedSerializationSchema<IN> serializationSchema,
Properties producerConfig,
- FlinkKafkaProducer2.Semantic semantic) {
+ GrootFlinkKafkaProducer.Semantic semantic) {
this(
topicId,
serializationSchema,
@@ -472,11 +472,11 @@ public class FlinkKafkaProducer2<IN>
* each record (determined by {@link KeyedSerializationSchema#serializeKey(Object)}). If the
* keys are {@code null}, then records will be distributed to Kafka partitions in a
* round-robin fashion.
- * @deprecated use {@link #FlinkKafkaProducer2(String, KafkaSerializationSchema, Properties,
- * FlinkKafkaProducer2.Semantic)}
+ * @deprecated use {@link #GrootFlinkKafkaProducer(String, KafkaSerializationSchema, Properties,
+ * GrootFlinkKafkaProducer.Semantic)}
*/
@Deprecated
- public FlinkKafkaProducer2(
+ public GrootFlinkKafkaProducer(
String defaultTopicId,
KeyedSerializationSchema<IN> serializationSchema,
Properties producerConfig,
@@ -486,7 +486,7 @@ public class FlinkKafkaProducer2<IN>
serializationSchema,
producerConfig,
customPartitioner,
- FlinkKafkaProducer2.Semantic.AT_LEAST_ONCE,
+ GrootFlinkKafkaProducer.Semantic.AT_LEAST_ONCE,
DEFAULT_KAFKA_PRODUCERS_POOL_SIZE);
}
@@ -512,19 +512,19 @@ public class FlinkKafkaProducer2<IN>
* keys are {@code null}, then records will be distributed to Kafka partitions in a
* round-robin fashion.
* @param semantic Defines semantic that will be used by this producer (see {@link
- * FlinkKafkaProducer2.Semantic}).
+ * GrootFlinkKafkaProducer.Semantic}).
* @param kafkaProducersPoolSize Overwrite default KafkaProducers pool size (see {@link
- * FlinkKafkaProducer2.Semantic#EXACTLY_ONCE}).
- * @deprecated use {@link #FlinkKafkaProducer2(String, KafkaSerializationSchema, Properties,
- * FlinkKafkaProducer2.Semantic)}
+ * GrootFlinkKafkaProducer.Semantic#EXACTLY_ONCE}).
+ * @deprecated use {@link #GrootFlinkKafkaProducer(String, KafkaSerializationSchema, Properties,
+ * GrootFlinkKafkaProducer.Semantic)}
*/
@Deprecated
- public FlinkKafkaProducer2(
+ public GrootFlinkKafkaProducer(
String defaultTopicId,
KeyedSerializationSchema<IN> serializationSchema,
Properties producerConfig,
Optional<FlinkKafkaPartitioner<IN>> customPartitioner,
- FlinkKafkaProducer2.Semantic semantic,
+ GrootFlinkKafkaProducer.Semantic semantic,
int kafkaProducersPoolSize) {
this(
defaultTopicId,
@@ -537,7 +537,7 @@ public class FlinkKafkaProducer2<IN>
}
/**
- * Creates a {@link FlinkKafkaProducer2} for a given topic. The sink produces its input to the
+ * Creates a {@link GrootFlinkKafkaProducer} for a given topic. The sink produces its input to the
* topic. It accepts a {@link KafkaSerializationSchema} for serializing records to a {@link
* ProducerRecord}, including partitioning information.
*
@@ -547,13 +547,13 @@ public class FlinkKafkaProducer2<IN>
* @param producerConfig Configuration properties for the KafkaProducer. 'bootstrap.servers.' is
* the only required argument.
* @param semantic Defines semantic that will be used by this producer (see {@link
- * FlinkKafkaProducer2.Semantic}).
+ * GrootFlinkKafkaProducer.Semantic}).
*/
- public FlinkKafkaProducer2(
+ public GrootFlinkKafkaProducer(
String defaultTopic,
KafkaSerializationSchema<IN> serializationSchema,
Properties producerConfig,
- FlinkKafkaProducer2.Semantic semantic) {
+ GrootFlinkKafkaProducer.Semantic semantic) {
this(
defaultTopic,
serializationSchema,
@@ -573,15 +573,15 @@ public class FlinkKafkaProducer2<IN>
* @param producerConfig Configuration properties for the KafkaProducer. 'bootstrap.servers.' is
* the only required argument.
* @param semantic Defines semantic that will be used by this producer (see {@link
- * FlinkKafkaProducer2.Semantic}).
+ * GrootFlinkKafkaProducer.Semantic}).
* @param kafkaProducersPoolSize Overwrite default KafkaProducers pool size (see {@link
- * FlinkKafkaProducer2.Semantic#EXACTLY_ONCE}).
+ * GrootFlinkKafkaProducer.Semantic#EXACTLY_ONCE}).
*/
- public FlinkKafkaProducer2(
+ public GrootFlinkKafkaProducer(
String defaultTopic,
KafkaSerializationSchema<IN> serializationSchema,
Properties producerConfig,
- FlinkKafkaProducer2.Semantic semantic,
+ GrootFlinkKafkaProducer.Semantic semantic,
int kafkaProducersPoolSize) {
this(
defaultTopic,
@@ -617,21 +617,21 @@ public class FlinkKafkaProducer2<IN>
* @param producerConfig Configuration properties for the KafkaProducer. 'bootstrap.servers.' is
* the only required argument.
* @param semantic Defines semantic that will be used by this producer (see {@link
- * FlinkKafkaProducer2.Semantic}).
+ * GrootFlinkKafkaProducer.Semantic}).
* @param kafkaProducersPoolSize Overwrite default KafkaProducers pool size (see {@link
- * FlinkKafkaProducer2.Semantic#EXACTLY_ONCE}).
+ * GrootFlinkKafkaProducer.Semantic#EXACTLY_ONCE}).
*/
- private FlinkKafkaProducer2(
+ private GrootFlinkKafkaProducer(
String defaultTopic,
KeyedSerializationSchema<IN> keyedSchema,
FlinkKafkaPartitioner<IN> customPartitioner,
KafkaSerializationSchema<IN> kafkaSchema,
Properties producerConfig,
- FlinkKafkaProducer2.Semantic semantic,
+ GrootFlinkKafkaProducer.Semantic semantic,
int kafkaProducersPoolSize) {
super(
- new FlinkKafkaProducer2.TransactionStateSerializer(),
- new FlinkKafkaProducer2.ContextStateSerializer());
+ new GrootFlinkKafkaProducer.TransactionStateSerializer(),
+ new GrootFlinkKafkaProducer.ContextStateSerializer());
this.defaultTopicId = checkNotNull(defaultTopic, "defaultTopic is null");
@@ -711,7 +711,7 @@ public class FlinkKafkaProducer2<IN>
// Enable transactionTimeoutWarnings to avoid silent data loss
// See KAFKA-6119 (affects versions 0.11.0.0 and 0.11.0.1):
// The KafkaProducer may not throw an exception if the transaction failed to commit
- if (semantic == FlinkKafkaProducer2.Semantic.EXACTLY_ONCE) {
+ if (semantic == GrootFlinkKafkaProducer.Semantic.EXACTLY_ONCE) {
final Object object =
this.producerConfig.get(ProducerConfig.TRANSACTION_TIMEOUT_CONFIG);
final long transactionTimeout;
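
As the KAFKA-6119 note above warns, the producer may not surface a failed transaction commit, which is why the timeout is inspected here. A hedged configuration sketch (values illustrative; the broker-side transaction.max.timeout.ms must be at least this large):

    Properties producerConfig = new Properties();
    producerConfig.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "broker:9092");
    // 15 minutes — should cover the longest checkpoint interval plus recovery time
    producerConfig.setProperty(ProducerConfig.TRANSACTION_TIMEOUT_CONFIG, String.valueOf(15 * 60 * 1000));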
@@ -771,7 +771,7 @@ public class FlinkKafkaProducer2<IN>
* attempt at least one commit of the transaction before giving up.
*/
@Override
- public FlinkKafkaProducer2<IN> ignoreFailuresAfterTransactionTimeout() {
+ public GrootFlinkKafkaProducer<IN> ignoreFailuresAfterTransactionTimeout() {
super.ignoreFailuresAfterTransactionTimeout();
return this;
}
@@ -850,7 +850,7 @@ public class FlinkKafkaProducer2<IN>
@Override
public void invoke(
- FlinkKafkaProducer2.KafkaTransactionState transaction, IN next, Context context)
+ GrootFlinkKafkaProducer.KafkaTransactionState transaction, IN next, Context context)
throws FlinkKafkaException {
checkErroneous();
@@ -976,24 +976,24 @@ public class FlinkKafkaProducer2<IN>
// ------------------- Logic for handling checkpoint flushing -------------------------- //
@Override
- protected FlinkKafkaProducer2.KafkaTransactionState beginTransaction()
+ protected GrootFlinkKafkaProducer.KafkaTransactionState beginTransaction()
throws FlinkKafkaException {
switch (semantic) {
case EXACTLY_ONCE:
FlinkKafkaInternalProducer<byte[], byte[]> producer = createTransactionalProducer();
producer.beginTransaction();
- return new FlinkKafkaProducer2.KafkaTransactionState(
+ return new GrootFlinkKafkaProducer.KafkaTransactionState(
producer.getTransactionalId(), producer);
case AT_LEAST_ONCE:
case NONE:
// Do not create new producer on each beginTransaction() if it is not necessary
- final FlinkKafkaProducer2.KafkaTransactionState currentTransaction =
+ final GrootFlinkKafkaProducer.KafkaTransactionState currentTransaction =
currentTransaction();
if (currentTransaction != null && currentTransaction.producer != null) {
- return new FlinkKafkaProducer2.KafkaTransactionState(
+ return new GrootFlinkKafkaProducer.KafkaTransactionState(
currentTransaction.producer);
}
- return new FlinkKafkaProducer2.KafkaTransactionState(
+ return new GrootFlinkKafkaProducer.KafkaTransactionState(
initNonTransactionalProducer(true));
default:
throw new UnsupportedOperationException("Not implemented semantic");
@@ -1001,7 +1001,7 @@ public class FlinkKafkaProducer2<IN>
}
@Override
- protected void preCommit(FlinkKafkaProducer2.KafkaTransactionState transaction)
+ protected void preCommit(GrootFlinkKafkaProducer.KafkaTransactionState transaction)
throws FlinkKafkaException {
switch (semantic) {
case EXACTLY_ONCE:
@@ -1017,7 +1017,7 @@ public class FlinkKafkaProducer2<IN>
}
@Override
- protected void commit(FlinkKafkaProducer2.KafkaTransactionState transaction) {
+ protected void commit(GrootFlinkKafkaProducer.KafkaTransactionState transaction) {
if (transaction.isTransactional()) {
try {
transaction.producer.commitTransaction();
@@ -1028,7 +1028,7 @@ public class FlinkKafkaProducer2<IN>
}
@Override
- protected void recoverAndCommit(FlinkKafkaProducer2.KafkaTransactionState transaction) {
+ protected void recoverAndCommit(GrootFlinkKafkaProducer.KafkaTransactionState transaction) {
if (transaction.isTransactional()) {
FlinkKafkaInternalProducer<byte[], byte[]> producer = null;
try {
@@ -1051,7 +1051,7 @@ public class FlinkKafkaProducer2<IN>
}
@Override
- protected void abort(FlinkKafkaProducer2.KafkaTransactionState transaction) {
+ protected void abort(GrootFlinkKafkaProducer.KafkaTransactionState transaction) {
if (transaction.isTransactional()) {
transaction.producer.abortTransaction();
recycleTransactionalProducer(transaction.producer);
@@ -1059,7 +1059,7 @@ public class FlinkKafkaProducer2<IN>
}
@Override
- protected void recoverAndAbort(FlinkKafkaProducer2.KafkaTransactionState transaction) {
+ protected void recoverAndAbort(GrootFlinkKafkaProducer.KafkaTransactionState transaction) {
if (transaction.isTransactional()) {
FlinkKafkaInternalProducer<byte[], byte[]> producer = null;
try {
@@ -1087,7 +1087,7 @@ public class FlinkKafkaProducer2<IN>
*
* @param transaction
*/
- private void flush(FlinkKafkaProducer2.KafkaTransactionState transaction)
+ private void flush(GrootFlinkKafkaProducer.KafkaTransactionState transaction)
throws FlinkKafkaException {
if (transaction.producer != null) {
transaction.producer.flush();
@@ -1111,7 +1111,7 @@ public class FlinkKafkaProducer2<IN>
// Otherwise all of the
// subtasks would write exactly same information.
if (getRuntimeContext().getIndexOfThisSubtask() == 0
- && semantic == FlinkKafkaProducer2.Semantic.EXACTLY_ONCE) {
+ && semantic == GrootFlinkKafkaProducer.Semantic.EXACTLY_ONCE) {
checkState(
nextTransactionalIdHint != null,
"nextTransactionalIdHint must be set for EXACTLY_ONCE");
@@ -1129,7 +1129,7 @@ public class FlinkKafkaProducer2<IN>
}
nextTransactionalIdHintState.add(
- new FlinkKafkaProducer2.NextTransactionalIdHint(
+ new GrootFlinkKafkaProducer.NextTransactionalIdHint(
getRuntimeContext().getNumberOfParallelSubtasks(),
nextFreeTransactionalId));
}
@@ -1137,13 +1137,13 @@ public class FlinkKafkaProducer2<IN>
@Override
public void initializeState(FunctionInitializationContext context) throws Exception {
- if (semantic != FlinkKafkaProducer2.Semantic.NONE
+ if (semantic != GrootFlinkKafkaProducer.Semantic.NONE
&& !((StreamingRuntimeContext) this.getRuntimeContext()).isCheckpointingEnabled()) {
LOG.warn(
"Using {} semantic, but checkpointing is not enabled. Switching to {} semantic.",
semantic,
- FlinkKafkaProducer2.Semantic.NONE);
- semantic = FlinkKafkaProducer2.Semantic.NONE;
+ GrootFlinkKafkaProducer.Semantic.NONE);
+ semantic = GrootFlinkKafkaProducer.Semantic.NONE;
}
nextTransactionalIdHintState =
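
The guard above silently downgrades the sink to Semantic.NONE when checkpointing is disabled, so jobs relying on AT_LEAST_ONCE or EXACTLY_ONCE must enable it. A minimal sketch (interval illustrative):

    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    env.enableCheckpointing(60_000L); // without this, the producer falls back to Semantic.NONE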
@@ -1177,16 +1177,16 @@ public class FlinkKafkaProducer2<IN>
kafkaProducersPoolSize,
SAFE_SCALE_DOWN_FACTOR);
- if (semantic != FlinkKafkaProducer2.Semantic.EXACTLY_ONCE) {
+ if (semantic != GrootFlinkKafkaProducer.Semantic.EXACTLY_ONCE) {
nextTransactionalIdHint = null;
} else {
- ArrayList<FlinkKafkaProducer2.NextTransactionalIdHint> transactionalIdHints =
+ ArrayList<GrootFlinkKafkaProducer.NextTransactionalIdHint> transactionalIdHints =
Lists.newArrayList(nextTransactionalIdHintState.get());
if (transactionalIdHints.size() > 1) {
throw new IllegalStateException(
"There should be at most one next transactional id hint written by the first subtask");
} else if (transactionalIdHints.size() == 0) {
- nextTransactionalIdHint = new FlinkKafkaProducer2.NextTransactionalIdHint(0, 0);
+ nextTransactionalIdHint = new GrootFlinkKafkaProducer.NextTransactionalIdHint(0, 0);
// this means that this is either:
// (1) the first execution of this application
@@ -1203,14 +1203,14 @@ public class FlinkKafkaProducer2<IN>
}
@Override
- protected Optional<FlinkKafkaProducer2.KafkaTransactionContext> initializeUserContext() {
- if (semantic != FlinkKafkaProducer2.Semantic.EXACTLY_ONCE) {
+ protected Optional<GrootFlinkKafkaProducer.KafkaTransactionContext> initializeUserContext() {
+ if (semantic != GrootFlinkKafkaProducer.Semantic.EXACTLY_ONCE) {
return Optional.empty();
}
Set<String> transactionalIds = generateNewTransactionalIds();
resetAvailableTransactionalIdsPool(transactionalIds);
- return Optional.of(new FlinkKafkaProducer2.KafkaTransactionContext(transactionalIds));
+ return Optional.of(new GrootFlinkKafkaProducer.KafkaTransactionContext(transactionalIds));
}
private Set<String> generateNewTransactionalIds() {
@@ -1227,7 +1227,7 @@ public class FlinkKafkaProducer2<IN>
@Override
protected void finishRecoveringContext(
- Collection<FlinkKafkaProducer2.KafkaTransactionState> handledTransactions) {
+ Collection<GrootFlinkKafkaProducer.KafkaTransactionState> handledTransactions) {
cleanUpUserContext(handledTransactions);
resetAvailableTransactionalIdsPool(getUserContext().get().transactionalIds);
LOG.info("Recovered transactionalIds {}", getUserContext().get().transactionalIds);
@@ -1245,7 +1245,7 @@ public class FlinkKafkaProducer2<IN>
* need further handling
*/
private void cleanUpUserContext(
- Collection<FlinkKafkaProducer2.KafkaTransactionState> handledTransactions) {
+ Collection<GrootFlinkKafkaProducer.KafkaTransactionState> handledTransactions) {
if (!getUserContext().isPresent()) {
return;
}
@@ -1300,7 +1300,7 @@ public class FlinkKafkaProducer2<IN>
}
int getTransactionCoordinatorId() {
- final FlinkKafkaProducer2.KafkaTransactionState currentTransaction = currentTransaction();
+ final GrootFlinkKafkaProducer.KafkaTransactionState currentTransaction = currentTransaction();
if (currentTransaction == null || currentTransaction.producer == null) {
throw new IllegalArgumentException();
}
@@ -1520,8 +1520,8 @@ public class FlinkKafkaProducer2<IN>
return false;
}
- FlinkKafkaProducer2.KafkaTransactionState that =
- (FlinkKafkaProducer2.KafkaTransactionState) o;
+ GrootFlinkKafkaProducer.KafkaTransactionState that =
+ (GrootFlinkKafkaProducer.KafkaTransactionState) o;
if (producerId != that.producerId) {
return false;
@@ -1544,7 +1544,7 @@ public class FlinkKafkaProducer2<IN>
}
/**
- * Context associated to this instance of the {@link FlinkKafkaProducer2}. User for keeping track
+ * Context associated to this instance of the {@link GrootFlinkKafkaProducer}. User for keeping track
* of the transactionalIds.
*/
@VisibleForTesting
@@ -1567,8 +1567,8 @@ public class FlinkKafkaProducer2<IN>
return false;
}
- FlinkKafkaProducer2.KafkaTransactionContext that =
- (FlinkKafkaProducer2.KafkaTransactionContext) o;
+ GrootFlinkKafkaProducer.KafkaTransactionContext that =
+ (GrootFlinkKafkaProducer.KafkaTransactionContext) o;
return transactionalIds.equals(that.transactionalIds);
}
@@ -1581,12 +1581,12 @@ public class FlinkKafkaProducer2<IN>
/**
* {@link org.apache.flink.api.common.typeutils.TypeSerializer} for {@link
- * FlinkKafkaProducer2.KafkaTransactionState}.
+ * GrootFlinkKafkaProducer.KafkaTransactionState}.
*/
@VisibleForTesting
@Internal
public static class TransactionStateSerializer
- extends TypeSerializerSingleton<FlinkKafkaProducer2.KafkaTransactionState> {
+ extends TypeSerializerSingleton<GrootFlinkKafkaProducer.KafkaTransactionState> {
private static final long serialVersionUID = 1L;
@@ -1596,20 +1596,20 @@ public class FlinkKafkaProducer2<IN>
}
@Override
- public FlinkKafkaProducer2.KafkaTransactionState createInstance() {
+ public GrootFlinkKafkaProducer.KafkaTransactionState createInstance() {
return null;
}
@Override
- public FlinkKafkaProducer2.KafkaTransactionState copy(
- FlinkKafkaProducer2.KafkaTransactionState from) {
+ public GrootFlinkKafkaProducer.KafkaTransactionState copy(
+ GrootFlinkKafkaProducer.KafkaTransactionState from) {
return from;
}
@Override
- public FlinkKafkaProducer2.KafkaTransactionState copy(
- FlinkKafkaProducer2.KafkaTransactionState from,
- FlinkKafkaProducer2.KafkaTransactionState reuse) {
+ public GrootFlinkKafkaProducer.KafkaTransactionState copy(
+ GrootFlinkKafkaProducer.KafkaTransactionState from,
+ GrootFlinkKafkaProducer.KafkaTransactionState reuse) {
return from;
}
@@ -1620,7 +1620,7 @@ public class FlinkKafkaProducer2<IN>
@Override
public void serialize(
- FlinkKafkaProducer2.KafkaTransactionState record, DataOutputView target)
+ GrootFlinkKafkaProducer.KafkaTransactionState record, DataOutputView target)
throws IOException {
if (record.transactionalId == null) {
target.writeBoolean(false);
@@ -1633,7 +1633,7 @@ public class FlinkKafkaProducer2<IN>
}
@Override
- public FlinkKafkaProducer2.KafkaTransactionState deserialize(DataInputView source)
+ public GrootFlinkKafkaProducer.KafkaTransactionState deserialize(DataInputView source)
throws IOException {
String transactionalId = null;
if (source.readBoolean()) {
@@ -1641,13 +1641,13 @@ public class FlinkKafkaProducer2<IN>
}
long producerId = source.readLong();
short epoch = source.readShort();
- return new FlinkKafkaProducer2.KafkaTransactionState(
+ return new GrootFlinkKafkaProducer.KafkaTransactionState(
transactionalId, producerId, epoch, null);
}
@Override
- public FlinkKafkaProducer2.KafkaTransactionState deserialize(
- FlinkKafkaProducer2.KafkaTransactionState reuse, DataInputView source)
+ public GrootFlinkKafkaProducer.KafkaTransactionState deserialize(
+ GrootFlinkKafkaProducer.KafkaTransactionState reuse, DataInputView source)
throws IOException {
return deserialize(source);
}
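
From the serialize/deserialize pair above, the on-wire layout of KafkaTransactionState can be read off directly; the rename does not change it:

    // [boolean hasTransactionalId] [UTF transactionalId, only if the flag is true]
    // [long producerId] [short epoch]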
@@ -1666,7 +1666,7 @@ public class FlinkKafkaProducer2<IN>
// -----------------------------------------------------------------------------------
@Override
- public TypeSerializerSnapshot<FlinkKafkaProducer2.KafkaTransactionState>
+ public TypeSerializerSnapshot<GrootFlinkKafkaProducer.KafkaTransactionState>
snapshotConfiguration() {
return new TransactionStateSerializerSnapshot();
}
@@ -1674,7 +1674,7 @@ public class FlinkKafkaProducer2<IN>
/** Serializer configuration snapshot for compatibility and format evolution. */
@SuppressWarnings("WeakerAccess")
public static final class TransactionStateSerializerSnapshot
- extends SimpleTypeSerializerSnapshot<FlinkKafkaProducer2.KafkaTransactionState> {
+ extends SimpleTypeSerializerSnapshot<GrootFlinkKafkaProducer.KafkaTransactionState> {
public TransactionStateSerializerSnapshot() {
super(TransactionStateSerializer::new);
@@ -1684,12 +1684,12 @@ public class FlinkKafkaProducer2<IN>
/**
* {@link org.apache.flink.api.common.typeutils.TypeSerializer} for {@link
- * FlinkKafkaProducer2.KafkaTransactionContext}.
+ * GrootFlinkKafkaProducer.KafkaTransactionContext}.
*/
@VisibleForTesting
@Internal
public static class ContextStateSerializer
- extends TypeSerializerSingleton<FlinkKafkaProducer2.KafkaTransactionContext> {
+ extends TypeSerializerSingleton<GrootFlinkKafkaProducer.KafkaTransactionContext> {
private static final long serialVersionUID = 1L;
@@ -1699,20 +1699,20 @@ public class FlinkKafkaProducer2<IN>
}
@Override
- public FlinkKafkaProducer2.KafkaTransactionContext createInstance() {
+ public GrootFlinkKafkaProducer.KafkaTransactionContext createInstance() {
return null;
}
@Override
- public FlinkKafkaProducer2.KafkaTransactionContext copy(
- FlinkKafkaProducer2.KafkaTransactionContext from) {
+ public GrootFlinkKafkaProducer.KafkaTransactionContext copy(
+ GrootFlinkKafkaProducer.KafkaTransactionContext from) {
return from;
}
@Override
- public FlinkKafkaProducer2.KafkaTransactionContext copy(
- FlinkKafkaProducer2.KafkaTransactionContext from,
- FlinkKafkaProducer2.KafkaTransactionContext reuse) {
+ public GrootFlinkKafkaProducer.KafkaTransactionContext copy(
+ GrootFlinkKafkaProducer.KafkaTransactionContext from,
+ GrootFlinkKafkaProducer.KafkaTransactionContext reuse) {
return from;
}
@@ -1723,7 +1723,7 @@ public class FlinkKafkaProducer2<IN>
@Override
public void serialize(
- FlinkKafkaProducer2.KafkaTransactionContext record, DataOutputView target)
+ GrootFlinkKafkaProducer.KafkaTransactionContext record, DataOutputView target)
throws IOException {
int numIds = record.transactionalIds.size();
target.writeInt(numIds);
@@ -1733,19 +1733,19 @@ public class FlinkKafkaProducer2<IN>
}
@Override
- public FlinkKafkaProducer2.KafkaTransactionContext deserialize(DataInputView source)
+ public GrootFlinkKafkaProducer.KafkaTransactionContext deserialize(DataInputView source)
throws IOException {
int numIds = source.readInt();
Set<String> ids = new HashSet<>(numIds);
for (int i = 0; i < numIds; i++) {
ids.add(source.readUTF());
}
- return new FlinkKafkaProducer2.KafkaTransactionContext(ids);
+ return new GrootFlinkKafkaProducer.KafkaTransactionContext(ids);
}
@Override
- public FlinkKafkaProducer2.KafkaTransactionContext deserialize(
- FlinkKafkaProducer2.KafkaTransactionContext reuse, DataInputView source)
+ public GrootFlinkKafkaProducer.KafkaTransactionContext deserialize(
+ GrootFlinkKafkaProducer.KafkaTransactionContext reuse, DataInputView source)
throws IOException {
return deserialize(source);
}
@@ -1830,7 +1830,7 @@ public class FlinkKafkaProducer2<IN>
/**
* {@link org.apache.flink.api.common.typeutils.TypeSerializer} for {@link
- * FlinkKafkaProducer2.NextTransactionalIdHint}.
+ * GrootFlinkKafkaProducer.NextTransactionalIdHint}.
*/
@VisibleForTesting
@Internal
diff --git a/groot-connectors/connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/KafkaFetcher2.java b/groot-connectors/connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/GrootKafkaFetcher.java
index 4000079..2cfc473 100644
--- a/groot-connectors/connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/KafkaFetcher2.java
+++ b/groot-connectors/connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/GrootKafkaFetcher.java
@@ -6,16 +6,14 @@ import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.streaming.api.operators.StreamingRuntimeContext;
import org.apache.flink.streaming.connectors.kafka.KafkaDeserializationSchema;
import org.apache.flink.streaming.connectors.kafka.RuntimeContextAware;
-import org.apache.flink.streaming.connectors.kafka.internals.KafkaFetcher;
-import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartition;
import org.apache.flink.streaming.runtime.tasks.ProcessingTimeService;
import org.apache.flink.util.SerializedValue;
import java.util.Map;
import java.util.Properties;
-public class KafkaFetcher2<T> extends KafkaFetcher<T> {
- public KafkaFetcher2(
+public class GrootKafkaFetcher<T> extends KafkaFetcher<T> {
+ public GrootKafkaFetcher(
SourceFunction.SourceContext<T> sourceContext,
Map<KafkaTopicPartition, Long> assignedPartitionsWithInitialOffsets,
SerializedValue<WatermarkStrategy<T>> watermarkStrategy,