diff options
| author | lifengchao <[email protected]> | 2024-03-13 14:36:22 +0800 |
|---|---|---|
| committer | lifengchao <[email protected]> | 2024-03-13 14:36:22 +0800 |
| commit | 40fe896585953874c09576a576e057c41ffe1288 (patch) | |
| tree | 375e393e7e5720fbd213818c9785a8d4e9c042b6 /groot-connectors | |
| parent | 420dc8afbc8721b343bf29af238a8578043e34a2 (diff) | |
* [feature][connector-kafka] 修改扩展kafka connector类改为以Groot为前缀feature/dynamicschema
Diffstat (limited to 'groot-connectors')
5 files changed, 136 insertions, 138 deletions
diff --git a/groot-connectors/connector-kafka/src/main/java/com/geedgenetworks/connectors/kafka/KafkaSinkProvider.java b/groot-connectors/connector-kafka/src/main/java/com/geedgenetworks/connectors/kafka/KafkaSinkProvider.java index a50c1e4..761d1ad 100644 --- a/groot-connectors/connector-kafka/src/main/java/com/geedgenetworks/connectors/kafka/KafkaSinkProvider.java +++ b/groot-connectors/connector-kafka/src/main/java/com/geedgenetworks/connectors/kafka/KafkaSinkProvider.java @@ -7,7 +7,7 @@ import com.geedgenetworks.core.types.StructType; import org.apache.flink.api.common.serialization.SerializationSchema; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.datastream.DataStreamSink; -import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer2; +import org.apache.flink.streaming.connectors.kafka.GrootFlinkKafkaProducer; import java.util.Optional; import java.util.Properties; @@ -36,7 +36,7 @@ public class KafkaSinkProvider implements SinkProvider { @Override public DataStreamSink<?> consumeDataStream(DataStream<Event> dataStream) { - FlinkKafkaProducer2<Event> kafkaProducer = new FlinkKafkaProducer2<>( + GrootFlinkKafkaProducer<Event> kafkaProducer = new GrootFlinkKafkaProducer<>( topic, valueSerialization, properties, diff --git a/groot-connectors/connector-kafka/src/main/java/com/geedgenetworks/connectors/kafka/KafkaSourceProvider.java b/groot-connectors/connector-kafka/src/main/java/com/geedgenetworks/connectors/kafka/KafkaSourceProvider.java index ce0ddf8..ad34557 100644 --- a/groot-connectors/connector-kafka/src/main/java/com/geedgenetworks/connectors/kafka/KafkaSourceProvider.java +++ b/groot-connectors/connector-kafka/src/main/java/com/geedgenetworks/connectors/kafka/KafkaSourceProvider.java @@ -7,7 +7,7 @@ import com.geedgenetworks.core.types.StructType; import org.apache.flink.api.common.serialization.DeserializationSchema; import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; -import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer2; +import org.apache.flink.streaming.connectors.kafka.GrootFlinkKafkaConsumer; import java.util.List; import java.util.Properties; @@ -34,7 +34,7 @@ public class KafkaSourceProvider implements SourceProvider { @Override public SingleOutputStreamOperator<Event> produceDataStream(StreamExecutionEnvironment env) { - FlinkKafkaConsumer2<Event> kafkaConsumer = new FlinkKafkaConsumer2<>( + GrootFlinkKafkaConsumer<Event> kafkaConsumer = new GrootFlinkKafkaConsumer<>( topics, new EventKafkaDeserializationSchema(valueDeserialization), properties diff --git a/groot-connectors/connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer2.java b/groot-connectors/connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/GrootFlinkKafkaConsumer.java index d37121d..4721969 100644 --- a/groot-connectors/connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer2.java +++ b/groot-connectors/connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/GrootFlinkKafkaConsumer.java @@ -6,7 +6,7 @@ import org.apache.flink.metrics.MetricGroup; import org.apache.flink.streaming.api.operators.StreamingRuntimeContext;
import org.apache.flink.streaming.connectors.kafka.config.OffsetCommitMode;
import org.apache.flink.streaming.connectors.kafka.internals.AbstractFetcher;
-import org.apache.flink.streaming.connectors.kafka.internals.KafkaFetcher2;
+import org.apache.flink.streaming.connectors.kafka.internals.GrootKafkaFetcher;
import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartition;
import org.apache.flink.util.SerializedValue;
@@ -15,9 +15,9 @@ import java.util.Map; import java.util.Properties;
@PublicEvolving
-public class FlinkKafkaConsumer2<T> extends FlinkKafkaConsumer<T> {
+public class GrootFlinkKafkaConsumer<T> extends FlinkKafkaConsumer<T> {
- public FlinkKafkaConsumer2(List<String> topics, KafkaDeserializationSchema<T> deserializer, Properties props) {
+ public GrootFlinkKafkaConsumer(List<String> topics, KafkaDeserializationSchema<T> deserializer, Properties props) {
super(topics, deserializer, props);
}
@@ -36,7 +36,7 @@ public class FlinkKafkaConsumer2<T> extends FlinkKafkaConsumer<T> { // this overwrites whatever setting the user configured in the properties
adjustAutoCommitConfig(properties, offsetCommitMode);
- return new KafkaFetcher2<>(
+ return new GrootKafkaFetcher<>(
sourceContext,
assignedPartitionsWithInitialOffsets,
watermarkStrategy,
diff --git a/groot-connectors/connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer2.java b/groot-connectors/connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/GrootFlinkKafkaProducer.java index 818df4e..ff0bb34 100644 --- a/groot-connectors/connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer2.java +++ b/groot-connectors/connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/GrootFlinkKafkaProducer.java @@ -77,15 +77,15 @@ import static org.apache.flink.util.Preconditions.checkState; /** * Flink Sink to produce data into a Kafka topic. By default producer will use {@link - * FlinkKafkaProducer2.Semantic#AT_LEAST_ONCE} semantic. Before using {@link - * FlinkKafkaProducer2.Semantic#EXACTLY_ONCE} please refer to Flink's Kafka connector documentation. + * GrootFlinkKafkaProducer.Semantic#AT_LEAST_ONCE} semantic. Before using {@link + * GrootFlinkKafkaProducer.Semantic#EXACTLY_ONCE} please refer to Flink's Kafka connector documentation. */ @PublicEvolving -public class FlinkKafkaProducer2<IN> +public class GrootFlinkKafkaProducer<IN> extends TwoPhaseCommitSinkFunction< IN, - FlinkKafkaProducer2.KafkaTransactionState, - FlinkKafkaProducer2.KafkaTransactionContext> { + GrootFlinkKafkaProducer.KafkaTransactionState, + GrootFlinkKafkaProducer.KafkaTransactionContext> { /** * Semantics that can be chosen. @@ -99,13 +99,13 @@ public class FlinkKafkaProducer2<IN> * Semantic.EXACTLY_ONCE the Flink producer will write all messages in a Kafka transaction * that will be committed to Kafka on a checkpoint. * - * <p>In this mode {@link FlinkKafkaProducer2} sets up a pool of {@link + * <p>In this mode {@link GrootFlinkKafkaProducer} sets up a pool of {@link * FlinkKafkaInternalProducer}. Between each checkpoint a Kafka transaction is created, - * which is committed on {@link FlinkKafkaProducer2#notifyCheckpointComplete(long)}. If - * checkpoint complete notifications are running late, {@link FlinkKafkaProducer2} can run + * which is committed on {@link GrootFlinkKafkaProducer#notifyCheckpointComplete(long)}. If + * checkpoint complete notifications are running late, {@link GrootFlinkKafkaProducer} can run * out of {@link FlinkKafkaInternalProducer}s in the pool. In that case any subsequent - * {@link FlinkKafkaProducer2#snapshotState(FunctionSnapshotContext)} requests will fail and - * {@link FlinkKafkaProducer2} will keep using the {@link FlinkKafkaInternalProducer} from + * {@link GrootFlinkKafkaProducer#snapshotState(FunctionSnapshotContext)} requests will fail and + * {@link GrootFlinkKafkaProducer} will keep using the {@link FlinkKafkaInternalProducer} from * the previous checkpoint. To decrease the chance of failing checkpoints there are four * options: * <li>decrease number of max concurrent checkpoints @@ -128,7 +128,7 @@ public class FlinkKafkaProducer2<IN> NONE } - private static final Logger LOG = LoggerFactory.getLogger(FlinkKafkaProducer2.class); + private static final Logger LOG = LoggerFactory.getLogger(GrootFlinkKafkaProducer.class); private static final long serialVersionUID = 1L; @@ -142,8 +142,8 @@ public class FlinkKafkaProducer2<IN> * This coefficient determines what is the safe scale down factor. * * <p>If the Flink application previously failed before first checkpoint completed or we are - * starting new batch of {@link FlinkKafkaProducer2} from scratch without clean shutdown of the - * previous one, {@link FlinkKafkaProducer2} doesn't know what was the set of previously used + * starting new batch of {@link GrootFlinkKafkaProducer} from scratch without clean shutdown of the + * previous one, {@link GrootFlinkKafkaProducer} doesn't know what was the set of previously used * Kafka's transactionalId's. In that case, it will try to play safe and abort all of the * possible transactionalIds from the range of: {@code [0, getNumberOfParallelSubtasks() * * kafkaProducersPoolSize * SAFE_SCALE_DOWN_FACTOR) } @@ -158,7 +158,7 @@ public class FlinkKafkaProducer2<IN> /** * Default number of KafkaProducers in the pool. See {@link - * FlinkKafkaProducer2.Semantic#EXACTLY_ONCE}. + * GrootFlinkKafkaProducer.Semantic#EXACTLY_ONCE}. */ public static final int DEFAULT_KAFKA_PRODUCERS_POOL_SIZE = 5; @@ -174,27 +174,27 @@ public class FlinkKafkaProducer2<IN> * NEXT_TRANSACTIONAL_ID_HINT_DESCRIPTOR_V2. */ @Deprecated - private static final ListStateDescriptor<FlinkKafkaProducer2.NextTransactionalIdHint> + private static final ListStateDescriptor<GrootFlinkKafkaProducer.NextTransactionalIdHint> NEXT_TRANSACTIONAL_ID_HINT_DESCRIPTOR = new ListStateDescriptor<>( "next-transactional-id-hint", TypeInformation.of(NextTransactionalIdHint.class)); - private static final ListStateDescriptor<FlinkKafkaProducer2.NextTransactionalIdHint> + private static final ListStateDescriptor<GrootFlinkKafkaProducer.NextTransactionalIdHint> NEXT_TRANSACTIONAL_ID_HINT_DESCRIPTOR_V2 = new ListStateDescriptor<>( "next-transactional-id-hint-v2", new NextTransactionalIdHintSerializer()); /** State for nextTransactionalIdHint. */ - private transient ListState<FlinkKafkaProducer2.NextTransactionalIdHint> + private transient ListState<GrootFlinkKafkaProducer.NextTransactionalIdHint> nextTransactionalIdHintState; /** Generator for Transactional IDs. */ private transient TransactionalIdsGenerator transactionalIdsGenerator; /** Hint for picking next transactional id. */ - private transient FlinkKafkaProducer2.NextTransactionalIdHint nextTransactionalIdHint; + private transient GrootFlinkKafkaProducer.NextTransactionalIdHint nextTransactionalIdHint; /** User defined properties for the Producer. */ protected final Properties producerConfig; @@ -236,7 +236,7 @@ public class FlinkKafkaProducer2<IN> private boolean logFailuresOnly; /** Semantic chosen for this instance. */ - protected FlinkKafkaProducer2.Semantic semantic; + protected GrootFlinkKafkaProducer.Semantic semantic; // -------------------------------- Runtime fields ------------------------------------------ @@ -263,7 +263,7 @@ public class FlinkKafkaProducer2<IN> * @param topicId ID of the Kafka topic. * @param serializationSchema User defined (keyless) serialization schema. */ - public FlinkKafkaProducer2( + public GrootFlinkKafkaProducer( String brokerList, String topicId, SerializationSchema<IN> serializationSchema) { this(topicId, serializationSchema, getPropertiesFromBrokerList(brokerList)); } @@ -275,14 +275,14 @@ public class FlinkKafkaProducer2<IN> * partitioner. This default partitioner maps each sink subtask to a single Kafka partition * (i.e. all records received by a sink subtask will end up in the same Kafka partition). * - * <p>To use a custom partitioner, please use {@link #FlinkKafkaProducer2(String, + * <p>To use a custom partitioner, please use {@link #GrootFlinkKafkaProducer(String, * SerializationSchema, Properties, Optional)} instead. * * @param topicId ID of the Kafka topic. * @param serializationSchema User defined key-less serialization schema. * @param producerConfig Properties with the producer configuration. */ - public FlinkKafkaProducer2( + public GrootFlinkKafkaProducer( String topicId, SerializationSchema<IN> serializationSchema, Properties producerConfig) { @@ -311,7 +311,7 @@ public class FlinkKafkaProducer2<IN> * partitions. If a partitioner is not provided, records will be distributed to Kafka * partitions in a round-robin fashion. */ - public FlinkKafkaProducer2( + public GrootFlinkKafkaProducer( String topicId, SerializationSchema<IN> serializationSchema, Properties producerConfig, @@ -343,16 +343,16 @@ public class FlinkKafkaProducer2<IN> * partitions. If a partitioner is not provided, records will be distributed to Kafka * partitions in a round-robin fashion. * @param semantic Defines semantic that will be used by this producer (see {@link - * FlinkKafkaProducer2.Semantic}). + * GrootFlinkKafkaProducer.Semantic}). * @param kafkaProducersPoolSize Overwrite default KafkaProducers pool size (see {@link - * FlinkKafkaProducer2.Semantic#EXACTLY_ONCE}). + * GrootFlinkKafkaProducer.Semantic#EXACTLY_ONCE}). */ - public FlinkKafkaProducer2( + public GrootFlinkKafkaProducer( String topicId, SerializationSchema<IN> serializationSchema, Properties producerConfig, @Nullable FlinkKafkaPartitioner<IN> customPartitioner, - FlinkKafkaProducer2.Semantic semantic, + GrootFlinkKafkaProducer.Semantic semantic, int kafkaProducersPoolSize) { this( topicId, @@ -374,17 +374,17 @@ public class FlinkKafkaProducer2<IN> * partitioner. This default partitioner maps each sink subtask to a single Kafka partition * (i.e. all records received by a sink subtask will end up in the same Kafka partition). * - * <p>To use a custom partitioner, please use {@link #FlinkKafkaProducer2(String, + * <p>To use a custom partitioner, please use {@link #GrootFlinkKafkaProducer(String, * KeyedSerializationSchema, Properties, Optional)} instead. * * @param brokerList Comma separated addresses of the brokers * @param topicId ID of the Kafka topic. * @param serializationSchema User defined serialization schema supporting key/value messages - * @deprecated use {@link #FlinkKafkaProducer2(String, KafkaSerializationSchema, Properties, - * FlinkKafkaProducer2.Semantic)} + * @deprecated use {@link #GrootFlinkKafkaProducer(String, KafkaSerializationSchema, Properties, + * GrootFlinkKafkaProducer.Semantic)} */ @Deprecated - public FlinkKafkaProducer2( + public GrootFlinkKafkaProducer( String brokerList, String topicId, KeyedSerializationSchema<IN> serializationSchema) { this( topicId, @@ -400,17 +400,17 @@ public class FlinkKafkaProducer2<IN> * partitioner. This default partitioner maps each sink subtask to a single Kafka partition * (i.e. all records received by a sink subtask will end up in the same Kafka partition). * - * <p>To use a custom partitioner, please use {@link #FlinkKafkaProducer2(String, + * <p>To use a custom partitioner, please use {@link #GrootFlinkKafkaProducer(String, * KeyedSerializationSchema, Properties, Optional)} instead. * * @param topicId ID of the Kafka topic. * @param serializationSchema User defined serialization schema supporting key/value messages * @param producerConfig Properties with the producer configuration. - * @deprecated use {@link #FlinkKafkaProducer2(String, KafkaSerializationSchema, Properties, - * FlinkKafkaProducer2.Semantic)} + * @deprecated use {@link #GrootFlinkKafkaProducer(String, KafkaSerializationSchema, Properties, + * GrootFlinkKafkaProducer.Semantic)} */ @Deprecated - public FlinkKafkaProducer2( + public GrootFlinkKafkaProducer( String topicId, KeyedSerializationSchema<IN> serializationSchema, Properties producerConfig) { @@ -432,16 +432,16 @@ public class FlinkKafkaProducer2<IN> * @param serializationSchema User defined serialization schema supporting key/value messages * @param producerConfig Properties with the producer configuration. * @param semantic Defines semantic that will be used by this producer (see {@link - * FlinkKafkaProducer2.Semantic}). - * @deprecated use {@link #FlinkKafkaProducer2(String, KafkaSerializationSchema, Properties, - * FlinkKafkaProducer2.Semantic)} + * GrootFlinkKafkaProducer.Semantic}). + * @deprecated use {@link #GrootFlinkKafkaProducer(String, KafkaSerializationSchema, Properties, + * GrootFlinkKafkaProducer.Semantic)} */ @Deprecated - public FlinkKafkaProducer2( + public GrootFlinkKafkaProducer( String topicId, KeyedSerializationSchema<IN> serializationSchema, Properties producerConfig, - FlinkKafkaProducer2.Semantic semantic) { + GrootFlinkKafkaProducer.Semantic semantic) { this( topicId, serializationSchema, @@ -472,11 +472,11 @@ public class FlinkKafkaProducer2<IN> * each record (determined by {@link KeyedSerializationSchema#serializeKey(Object)}). If the * keys are {@code null}, then records will be distributed to Kafka partitions in a * round-robin fashion. - * @deprecated use {@link #FlinkKafkaProducer2(String, KafkaSerializationSchema, Properties, - * FlinkKafkaProducer2.Semantic)} + * @deprecated use {@link #GrootFlinkKafkaProducer(String, KafkaSerializationSchema, Properties, + * GrootFlinkKafkaProducer.Semantic)} */ @Deprecated - public FlinkKafkaProducer2( + public GrootFlinkKafkaProducer( String defaultTopicId, KeyedSerializationSchema<IN> serializationSchema, Properties producerConfig, @@ -486,7 +486,7 @@ public class FlinkKafkaProducer2<IN> serializationSchema, producerConfig, customPartitioner, - FlinkKafkaProducer2.Semantic.AT_LEAST_ONCE, + GrootFlinkKafkaProducer.Semantic.AT_LEAST_ONCE, DEFAULT_KAFKA_PRODUCERS_POOL_SIZE); } @@ -512,19 +512,19 @@ public class FlinkKafkaProducer2<IN> * keys are {@code null}, then records will be distributed to Kafka partitions in a * round-robin fashion. * @param semantic Defines semantic that will be used by this producer (see {@link - * FlinkKafkaProducer2.Semantic}). + * GrootFlinkKafkaProducer.Semantic}). * @param kafkaProducersPoolSize Overwrite default KafkaProducers pool size (see {@link - * FlinkKafkaProducer2.Semantic#EXACTLY_ONCE}). - * @deprecated use {@link #FlinkKafkaProducer2(String, KafkaSerializationSchema, Properties, - * FlinkKafkaProducer2.Semantic)} + * GrootFlinkKafkaProducer.Semantic#EXACTLY_ONCE}). + * @deprecated use {@link #GrootFlinkKafkaProducer(String, KafkaSerializationSchema, Properties, + * GrootFlinkKafkaProducer.Semantic)} */ @Deprecated - public FlinkKafkaProducer2( + public GrootFlinkKafkaProducer( String defaultTopicId, KeyedSerializationSchema<IN> serializationSchema, Properties producerConfig, Optional<FlinkKafkaPartitioner<IN>> customPartitioner, - FlinkKafkaProducer2.Semantic semantic, + GrootFlinkKafkaProducer.Semantic semantic, int kafkaProducersPoolSize) { this( defaultTopicId, @@ -537,7 +537,7 @@ public class FlinkKafkaProducer2<IN> } /** - * Creates a {@link FlinkKafkaProducer2} for a given topic. The sink produces its input to the + * Creates a {@link GrootFlinkKafkaProducer} for a given topic. The sink produces its input to the * topic. It accepts a {@link KafkaSerializationSchema} for serializing records to a {@link * ProducerRecord}, including partitioning information. * @@ -547,13 +547,13 @@ public class FlinkKafkaProducer2<IN> * @param producerConfig Configuration properties for the KafkaProducer. 'bootstrap.servers.' is * the only required argument. * @param semantic Defines semantic that will be used by this producer (see {@link - * FlinkKafkaProducer2.Semantic}). + * GrootFlinkKafkaProducer.Semantic}). */ - public FlinkKafkaProducer2( + public GrootFlinkKafkaProducer( String defaultTopic, KafkaSerializationSchema<IN> serializationSchema, Properties producerConfig, - FlinkKafkaProducer2.Semantic semantic) { + GrootFlinkKafkaProducer.Semantic semantic) { this( defaultTopic, serializationSchema, @@ -573,15 +573,15 @@ public class FlinkKafkaProducer2<IN> * @param producerConfig Configuration properties for the KafkaProducer. 'bootstrap.servers.' is * the only required argument. * @param semantic Defines semantic that will be used by this producer (see {@link - * FlinkKafkaProducer2.Semantic}). + * GrootFlinkKafkaProducer.Semantic}). * @param kafkaProducersPoolSize Overwrite default KafkaProducers pool size (see {@link - * FlinkKafkaProducer2.Semantic#EXACTLY_ONCE}). + * GrootFlinkKafkaProducer.Semantic#EXACTLY_ONCE}). */ - public FlinkKafkaProducer2( + public GrootFlinkKafkaProducer( String defaultTopic, KafkaSerializationSchema<IN> serializationSchema, Properties producerConfig, - FlinkKafkaProducer2.Semantic semantic, + GrootFlinkKafkaProducer.Semantic semantic, int kafkaProducersPoolSize) { this( defaultTopic, @@ -617,21 +617,21 @@ public class FlinkKafkaProducer2<IN> * @param producerConfig Configuration properties for the KafkaProducer. 'bootstrap.servers.' is * the only required argument. * @param semantic Defines semantic that will be used by this producer (see {@link - * FlinkKafkaProducer2.Semantic}). + * GrootFlinkKafkaProducer.Semantic}). * @param kafkaProducersPoolSize Overwrite default KafkaProducers pool size (see {@link - * FlinkKafkaProducer2.Semantic#EXACTLY_ONCE}). + * GrootFlinkKafkaProducer.Semantic#EXACTLY_ONCE}). */ - private FlinkKafkaProducer2( + private GrootFlinkKafkaProducer( String defaultTopic, KeyedSerializationSchema<IN> keyedSchema, FlinkKafkaPartitioner<IN> customPartitioner, KafkaSerializationSchema<IN> kafkaSchema, Properties producerConfig, - FlinkKafkaProducer2.Semantic semantic, + GrootFlinkKafkaProducer.Semantic semantic, int kafkaProducersPoolSize) { super( - new FlinkKafkaProducer2.TransactionStateSerializer(), - new FlinkKafkaProducer2.ContextStateSerializer()); + new GrootFlinkKafkaProducer.TransactionStateSerializer(), + new GrootFlinkKafkaProducer.ContextStateSerializer()); this.defaultTopicId = checkNotNull(defaultTopic, "defaultTopic is null"); @@ -711,7 +711,7 @@ public class FlinkKafkaProducer2<IN> // Enable transactionTimeoutWarnings to avoid silent data loss // See KAFKA-6119 (affects versions 0.11.0.0 and 0.11.0.1): // The KafkaProducer may not throw an exception if the transaction failed to commit - if (semantic == FlinkKafkaProducer2.Semantic.EXACTLY_ONCE) { + if (semantic == GrootFlinkKafkaProducer.Semantic.EXACTLY_ONCE) { final Object object = this.producerConfig.get(ProducerConfig.TRANSACTION_TIMEOUT_CONFIG); final long transactionTimeout; @@ -771,7 +771,7 @@ public class FlinkKafkaProducer2<IN> * attempt at least one commit of the transaction before giving up. */ @Override - public FlinkKafkaProducer2<IN> ignoreFailuresAfterTransactionTimeout() { + public GrootFlinkKafkaProducer<IN> ignoreFailuresAfterTransactionTimeout() { super.ignoreFailuresAfterTransactionTimeout(); return this; } @@ -850,7 +850,7 @@ public class FlinkKafkaProducer2<IN> @Override public void invoke( - FlinkKafkaProducer2.KafkaTransactionState transaction, IN next, Context context) + GrootFlinkKafkaProducer.KafkaTransactionState transaction, IN next, Context context) throws FlinkKafkaException { checkErroneous(); @@ -976,24 +976,24 @@ public class FlinkKafkaProducer2<IN> // ------------------- Logic for handling checkpoint flushing -------------------------- // @Override - protected FlinkKafkaProducer2.KafkaTransactionState beginTransaction() + protected GrootFlinkKafkaProducer.KafkaTransactionState beginTransaction() throws FlinkKafkaException { switch (semantic) { case EXACTLY_ONCE: FlinkKafkaInternalProducer<byte[], byte[]> producer = createTransactionalProducer(); producer.beginTransaction(); - return new FlinkKafkaProducer2.KafkaTransactionState( + return new GrootFlinkKafkaProducer.KafkaTransactionState( producer.getTransactionalId(), producer); case AT_LEAST_ONCE: case NONE: // Do not create new producer on each beginTransaction() if it is not necessary - final FlinkKafkaProducer2.KafkaTransactionState currentTransaction = + final GrootFlinkKafkaProducer.KafkaTransactionState currentTransaction = currentTransaction(); if (currentTransaction != null && currentTransaction.producer != null) { - return new FlinkKafkaProducer2.KafkaTransactionState( + return new GrootFlinkKafkaProducer.KafkaTransactionState( currentTransaction.producer); } - return new FlinkKafkaProducer2.KafkaTransactionState( + return new GrootFlinkKafkaProducer.KafkaTransactionState( initNonTransactionalProducer(true)); default: throw new UnsupportedOperationException("Not implemented semantic"); @@ -1001,7 +1001,7 @@ public class FlinkKafkaProducer2<IN> } @Override - protected void preCommit(FlinkKafkaProducer2.KafkaTransactionState transaction) + protected void preCommit(GrootFlinkKafkaProducer.KafkaTransactionState transaction) throws FlinkKafkaException { switch (semantic) { case EXACTLY_ONCE: @@ -1017,7 +1017,7 @@ public class FlinkKafkaProducer2<IN> } @Override - protected void commit(FlinkKafkaProducer2.KafkaTransactionState transaction) { + protected void commit(GrootFlinkKafkaProducer.KafkaTransactionState transaction) { if (transaction.isTransactional()) { try { transaction.producer.commitTransaction(); @@ -1028,7 +1028,7 @@ public class FlinkKafkaProducer2<IN> } @Override - protected void recoverAndCommit(FlinkKafkaProducer2.KafkaTransactionState transaction) { + protected void recoverAndCommit(GrootFlinkKafkaProducer.KafkaTransactionState transaction) { if (transaction.isTransactional()) { FlinkKafkaInternalProducer<byte[], byte[]> producer = null; try { @@ -1051,7 +1051,7 @@ public class FlinkKafkaProducer2<IN> } @Override - protected void abort(FlinkKafkaProducer2.KafkaTransactionState transaction) { + protected void abort(GrootFlinkKafkaProducer.KafkaTransactionState transaction) { if (transaction.isTransactional()) { transaction.producer.abortTransaction(); recycleTransactionalProducer(transaction.producer); @@ -1059,7 +1059,7 @@ public class FlinkKafkaProducer2<IN> } @Override - protected void recoverAndAbort(FlinkKafkaProducer2.KafkaTransactionState transaction) { + protected void recoverAndAbort(GrootFlinkKafkaProducer.KafkaTransactionState transaction) { if (transaction.isTransactional()) { FlinkKafkaInternalProducer<byte[], byte[]> producer = null; try { @@ -1087,7 +1087,7 @@ public class FlinkKafkaProducer2<IN> * * @param transaction */ - private void flush(FlinkKafkaProducer2.KafkaTransactionState transaction) + private void flush(GrootFlinkKafkaProducer.KafkaTransactionState transaction) throws FlinkKafkaException { if (transaction.producer != null) { transaction.producer.flush(); @@ -1111,7 +1111,7 @@ public class FlinkKafkaProducer2<IN> // Otherwise all of the // subtasks would write exactly same information. if (getRuntimeContext().getIndexOfThisSubtask() == 0 - && semantic == FlinkKafkaProducer2.Semantic.EXACTLY_ONCE) { + && semantic == GrootFlinkKafkaProducer.Semantic.EXACTLY_ONCE) { checkState( nextTransactionalIdHint != null, "nextTransactionalIdHint must be set for EXACTLY_ONCE"); @@ -1129,7 +1129,7 @@ public class FlinkKafkaProducer2<IN> } nextTransactionalIdHintState.add( - new FlinkKafkaProducer2.NextTransactionalIdHint( + new GrootFlinkKafkaProducer.NextTransactionalIdHint( getRuntimeContext().getNumberOfParallelSubtasks(), nextFreeTransactionalId)); } @@ -1137,13 +1137,13 @@ public class FlinkKafkaProducer2<IN> @Override public void initializeState(FunctionInitializationContext context) throws Exception { - if (semantic != FlinkKafkaProducer2.Semantic.NONE + if (semantic != GrootFlinkKafkaProducer.Semantic.NONE && !((StreamingRuntimeContext) this.getRuntimeContext()).isCheckpointingEnabled()) { LOG.warn( "Using {} semantic, but checkpointing is not enabled. Switching to {} semantic.", semantic, - FlinkKafkaProducer2.Semantic.NONE); - semantic = FlinkKafkaProducer2.Semantic.NONE; + GrootFlinkKafkaProducer.Semantic.NONE); + semantic = GrootFlinkKafkaProducer.Semantic.NONE; } nextTransactionalIdHintState = @@ -1177,16 +1177,16 @@ public class FlinkKafkaProducer2<IN> kafkaProducersPoolSize, SAFE_SCALE_DOWN_FACTOR); - if (semantic != FlinkKafkaProducer2.Semantic.EXACTLY_ONCE) { + if (semantic != GrootFlinkKafkaProducer.Semantic.EXACTLY_ONCE) { nextTransactionalIdHint = null; } else { - ArrayList<FlinkKafkaProducer2.NextTransactionalIdHint> transactionalIdHints = + ArrayList<GrootFlinkKafkaProducer.NextTransactionalIdHint> transactionalIdHints = Lists.newArrayList(nextTransactionalIdHintState.get()); if (transactionalIdHints.size() > 1) { throw new IllegalStateException( "There should be at most one next transactional id hint written by the first subtask"); } else if (transactionalIdHints.size() == 0) { - nextTransactionalIdHint = new FlinkKafkaProducer2.NextTransactionalIdHint(0, 0); + nextTransactionalIdHint = new GrootFlinkKafkaProducer.NextTransactionalIdHint(0, 0); // this means that this is either: // (1) the first execution of this application @@ -1203,14 +1203,14 @@ public class FlinkKafkaProducer2<IN> } @Override - protected Optional<FlinkKafkaProducer2.KafkaTransactionContext> initializeUserContext() { - if (semantic != FlinkKafkaProducer2.Semantic.EXACTLY_ONCE) { + protected Optional<GrootFlinkKafkaProducer.KafkaTransactionContext> initializeUserContext() { + if (semantic != GrootFlinkKafkaProducer.Semantic.EXACTLY_ONCE) { return Optional.empty(); } Set<String> transactionalIds = generateNewTransactionalIds(); resetAvailableTransactionalIdsPool(transactionalIds); - return Optional.of(new FlinkKafkaProducer2.KafkaTransactionContext(transactionalIds)); + return Optional.of(new GrootFlinkKafkaProducer.KafkaTransactionContext(transactionalIds)); } private Set<String> generateNewTransactionalIds() { @@ -1227,7 +1227,7 @@ public class FlinkKafkaProducer2<IN> @Override protected void finishRecoveringContext( - Collection<FlinkKafkaProducer2.KafkaTransactionState> handledTransactions) { + Collection<GrootFlinkKafkaProducer.KafkaTransactionState> handledTransactions) { cleanUpUserContext(handledTransactions); resetAvailableTransactionalIdsPool(getUserContext().get().transactionalIds); LOG.info("Recovered transactionalIds {}", getUserContext().get().transactionalIds); @@ -1245,7 +1245,7 @@ public class FlinkKafkaProducer2<IN> * need further handling */ private void cleanUpUserContext( - Collection<FlinkKafkaProducer2.KafkaTransactionState> handledTransactions) { + Collection<GrootFlinkKafkaProducer.KafkaTransactionState> handledTransactions) { if (!getUserContext().isPresent()) { return; } @@ -1300,7 +1300,7 @@ public class FlinkKafkaProducer2<IN> } int getTransactionCoordinatorId() { - final FlinkKafkaProducer2.KafkaTransactionState currentTransaction = currentTransaction(); + final GrootFlinkKafkaProducer.KafkaTransactionState currentTransaction = currentTransaction(); if (currentTransaction == null || currentTransaction.producer == null) { throw new IllegalArgumentException(); } @@ -1520,8 +1520,8 @@ public class FlinkKafkaProducer2<IN> return false; } - FlinkKafkaProducer2.KafkaTransactionState that = - (FlinkKafkaProducer2.KafkaTransactionState) o; + GrootFlinkKafkaProducer.KafkaTransactionState that = + (GrootFlinkKafkaProducer.KafkaTransactionState) o; if (producerId != that.producerId) { return false; @@ -1544,7 +1544,7 @@ public class FlinkKafkaProducer2<IN> } /** - * Context associated to this instance of the {@link FlinkKafkaProducer2}. User for keeping track + * Context associated to this instance of the {@link GrootFlinkKafkaProducer}. User for keeping track * of the transactionalIds. */ @VisibleForTesting @@ -1567,8 +1567,8 @@ public class FlinkKafkaProducer2<IN> return false; } - FlinkKafkaProducer2.KafkaTransactionContext that = - (FlinkKafkaProducer2.KafkaTransactionContext) o; + GrootFlinkKafkaProducer.KafkaTransactionContext that = + (GrootFlinkKafkaProducer.KafkaTransactionContext) o; return transactionalIds.equals(that.transactionalIds); } @@ -1581,12 +1581,12 @@ public class FlinkKafkaProducer2<IN> /** * {@link org.apache.flink.api.common.typeutils.TypeSerializer} for {@link - * FlinkKafkaProducer2.KafkaTransactionState}. + * GrootFlinkKafkaProducer.KafkaTransactionState}. */ @VisibleForTesting @Internal public static class TransactionStateSerializer - extends TypeSerializerSingleton<FlinkKafkaProducer2.KafkaTransactionState> { + extends TypeSerializerSingleton<GrootFlinkKafkaProducer.KafkaTransactionState> { private static final long serialVersionUID = 1L; @@ -1596,20 +1596,20 @@ public class FlinkKafkaProducer2<IN> } @Override - public FlinkKafkaProducer2.KafkaTransactionState createInstance() { + public GrootFlinkKafkaProducer.KafkaTransactionState createInstance() { return null; } @Override - public FlinkKafkaProducer2.KafkaTransactionState copy( - FlinkKafkaProducer2.KafkaTransactionState from) { + public GrootFlinkKafkaProducer.KafkaTransactionState copy( + GrootFlinkKafkaProducer.KafkaTransactionState from) { return from; } @Override - public FlinkKafkaProducer2.KafkaTransactionState copy( - FlinkKafkaProducer2.KafkaTransactionState from, - FlinkKafkaProducer2.KafkaTransactionState reuse) { + public GrootFlinkKafkaProducer.KafkaTransactionState copy( + GrootFlinkKafkaProducer.KafkaTransactionState from, + GrootFlinkKafkaProducer.KafkaTransactionState reuse) { return from; } @@ -1620,7 +1620,7 @@ public class FlinkKafkaProducer2<IN> @Override public void serialize( - FlinkKafkaProducer2.KafkaTransactionState record, DataOutputView target) + GrootFlinkKafkaProducer.KafkaTransactionState record, DataOutputView target) throws IOException { if (record.transactionalId == null) { target.writeBoolean(false); @@ -1633,7 +1633,7 @@ public class FlinkKafkaProducer2<IN> } @Override - public FlinkKafkaProducer2.KafkaTransactionState deserialize(DataInputView source) + public GrootFlinkKafkaProducer.KafkaTransactionState deserialize(DataInputView source) throws IOException { String transactionalId = null; if (source.readBoolean()) { @@ -1641,13 +1641,13 @@ public class FlinkKafkaProducer2<IN> } long producerId = source.readLong(); short epoch = source.readShort(); - return new FlinkKafkaProducer2.KafkaTransactionState( + return new GrootFlinkKafkaProducer.KafkaTransactionState( transactionalId, producerId, epoch, null); } @Override - public FlinkKafkaProducer2.KafkaTransactionState deserialize( - FlinkKafkaProducer2.KafkaTransactionState reuse, DataInputView source) + public GrootFlinkKafkaProducer.KafkaTransactionState deserialize( + GrootFlinkKafkaProducer.KafkaTransactionState reuse, DataInputView source) throws IOException { return deserialize(source); } @@ -1666,7 +1666,7 @@ public class FlinkKafkaProducer2<IN> // ----------------------------------------------------------------------------------- @Override - public TypeSerializerSnapshot<FlinkKafkaProducer2.KafkaTransactionState> + public TypeSerializerSnapshot<GrootFlinkKafkaProducer.KafkaTransactionState> snapshotConfiguration() { return new TransactionStateSerializerSnapshot(); } @@ -1674,7 +1674,7 @@ public class FlinkKafkaProducer2<IN> /** Serializer configuration snapshot for compatibility and format evolution. */ @SuppressWarnings("WeakerAccess") public static final class TransactionStateSerializerSnapshot - extends SimpleTypeSerializerSnapshot<FlinkKafkaProducer2.KafkaTransactionState> { + extends SimpleTypeSerializerSnapshot<GrootFlinkKafkaProducer.KafkaTransactionState> { public TransactionStateSerializerSnapshot() { super(TransactionStateSerializer::new); @@ -1684,12 +1684,12 @@ public class FlinkKafkaProducer2<IN> /** * {@link org.apache.flink.api.common.typeutils.TypeSerializer} for {@link - * FlinkKafkaProducer2.KafkaTransactionContext}. + * GrootFlinkKafkaProducer.KafkaTransactionContext}. */ @VisibleForTesting @Internal public static class ContextStateSerializer - extends TypeSerializerSingleton<FlinkKafkaProducer2.KafkaTransactionContext> { + extends TypeSerializerSingleton<GrootFlinkKafkaProducer.KafkaTransactionContext> { private static final long serialVersionUID = 1L; @@ -1699,20 +1699,20 @@ public class FlinkKafkaProducer2<IN> } @Override - public FlinkKafkaProducer2.KafkaTransactionContext createInstance() { + public GrootFlinkKafkaProducer.KafkaTransactionContext createInstance() { return null; } @Override - public FlinkKafkaProducer2.KafkaTransactionContext copy( - FlinkKafkaProducer2.KafkaTransactionContext from) { + public GrootFlinkKafkaProducer.KafkaTransactionContext copy( + GrootFlinkKafkaProducer.KafkaTransactionContext from) { return from; } @Override - public FlinkKafkaProducer2.KafkaTransactionContext copy( - FlinkKafkaProducer2.KafkaTransactionContext from, - FlinkKafkaProducer2.KafkaTransactionContext reuse) { + public GrootFlinkKafkaProducer.KafkaTransactionContext copy( + GrootFlinkKafkaProducer.KafkaTransactionContext from, + GrootFlinkKafkaProducer.KafkaTransactionContext reuse) { return from; } @@ -1723,7 +1723,7 @@ public class FlinkKafkaProducer2<IN> @Override public void serialize( - FlinkKafkaProducer2.KafkaTransactionContext record, DataOutputView target) + GrootFlinkKafkaProducer.KafkaTransactionContext record, DataOutputView target) throws IOException { int numIds = record.transactionalIds.size(); target.writeInt(numIds); @@ -1733,19 +1733,19 @@ public class FlinkKafkaProducer2<IN> } @Override - public FlinkKafkaProducer2.KafkaTransactionContext deserialize(DataInputView source) + public GrootFlinkKafkaProducer.KafkaTransactionContext deserialize(DataInputView source) throws IOException { int numIds = source.readInt(); Set<String> ids = new HashSet<>(numIds); for (int i = 0; i < numIds; i++) { ids.add(source.readUTF()); } - return new FlinkKafkaProducer2.KafkaTransactionContext(ids); + return new GrootFlinkKafkaProducer.KafkaTransactionContext(ids); } @Override - public FlinkKafkaProducer2.KafkaTransactionContext deserialize( - FlinkKafkaProducer2.KafkaTransactionContext reuse, DataInputView source) + public GrootFlinkKafkaProducer.KafkaTransactionContext deserialize( + GrootFlinkKafkaProducer.KafkaTransactionContext reuse, DataInputView source) throws IOException { return deserialize(source); } @@ -1830,7 +1830,7 @@ public class FlinkKafkaProducer2<IN> /** * {@link org.apache.flink.api.common.typeutils.TypeSerializer} for {@link - * FlinkKafkaProducer2.NextTransactionalIdHint}. + * GrootFlinkKafkaProducer.NextTransactionalIdHint}. */ @VisibleForTesting @Internal diff --git a/groot-connectors/connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/KafkaFetcher2.java b/groot-connectors/connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/GrootKafkaFetcher.java index 4000079..2cfc473 100644 --- a/groot-connectors/connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/KafkaFetcher2.java +++ b/groot-connectors/connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/GrootKafkaFetcher.java @@ -6,16 +6,14 @@ import org.apache.flink.streaming.api.functions.source.SourceFunction; import org.apache.flink.streaming.api.operators.StreamingRuntimeContext;
import org.apache.flink.streaming.connectors.kafka.KafkaDeserializationSchema;
import org.apache.flink.streaming.connectors.kafka.RuntimeContextAware;
-import org.apache.flink.streaming.connectors.kafka.internals.KafkaFetcher;
-import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartition;
import org.apache.flink.streaming.runtime.tasks.ProcessingTimeService;
import org.apache.flink.util.SerializedValue;
import java.util.Map;
import java.util.Properties;
-public class KafkaFetcher2<T> extends KafkaFetcher<T> {
- public KafkaFetcher2(
+public class GrootKafkaFetcher<T> extends KafkaFetcher<T> {
+ public GrootKafkaFetcher(
SourceFunction.SourceContext<T> sourceContext,
Map<KafkaTopicPartition, Long> assignedPartitionsWithInitialOffsets,
SerializedValue<WatermarkStrategy<T>> watermarkStrategy,
|
