diff options
| author | doufenghu <[email protected]> | 2024-05-13 17:40:20 +0800 |
|---|---|---|
| committer | doufenghu <[email protected]> | 2024-05-13 17:40:20 +0800 |
| commit | df696e2f65227b950fcde0a51b7ca0f717b2becd (patch) | |
| tree | 60632303c95e43001e9b3e9b4b14c59938bfc7ae /src/main | |
| parent | fc650bb9e525adc737058896f7c372597586d8f0 (diff) | |
[fix] 修复自定义kafka配置被默认配置覆盖的问题
Diffstat (limited to 'src/main')
| -rw-r--r-- | src/main/java/com/zdjizhi/utils/kafka/KafkaConsumer.java | 10 | ||||
| -rw-r--r-- | src/main/java/com/zdjizhi/utils/kafka/KafkaProducer.java | 14 |
2 files changed, 1 insertions, 23 deletions
diff --git a/src/main/java/com/zdjizhi/utils/kafka/KafkaConsumer.java b/src/main/java/com/zdjizhi/utils/kafka/KafkaConsumer.java index 397814f..ca360bc 100644 --- a/src/main/java/com/zdjizhi/utils/kafka/KafkaConsumer.java +++ b/src/main/java/com/zdjizhi/utils/kafka/KafkaConsumer.java @@ -21,10 +21,6 @@ public class KafkaConsumer { */ public static FlinkKafkaConsumer<String> getKafkaConsumer(Properties properties, String topic, String startupMode) { - setDefaultConfig(properties, ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 60000); - setDefaultConfig(properties, ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 5000); - setDefaultConfig(properties, ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG, 31457280); - FlinkKafkaConsumer<String> kafkaConsumer = new FlinkKafkaConsumer<>(topic, new SimpleStringSchema(), properties); switch (startupMode) { @@ -43,10 +39,4 @@ public class KafkaConsumer { return kafkaConsumer; } - - private static void setDefaultConfig(Properties properties, String key, Object value) { - if (!properties.contains(key)) { - properties.put(key, value); - } - } } diff --git a/src/main/java/com/zdjizhi/utils/kafka/KafkaProducer.java b/src/main/java/com/zdjizhi/utils/kafka/KafkaProducer.java index d337e04..88cd058 100644 --- a/src/main/java/com/zdjizhi/utils/kafka/KafkaProducer.java +++ b/src/main/java/com/zdjizhi/utils/kafka/KafkaProducer.java @@ -15,14 +15,7 @@ import java.util.Properties; public class KafkaProducer { public static FlinkKafkaProducer<String> getKafkaProducer(Properties properties, String topic, boolean logFailuresOnly) { - setDefaultConfig(properties, "acks", "1"); - setDefaultConfig(properties, "retries", 0); - setDefaultConfig(properties, "linger.ms", 10); - setDefaultConfig(properties, "request.timeout.ms", 30000); - setDefaultConfig(properties, "batch.size", 262144); - setDefaultConfig(properties, "buffer.memory", 134217728); - setDefaultConfig(properties, "max.request.size", 10485760); - setDefaultConfig(properties, "compression.type", "snappy"); + FlinkKafkaProducer<String> kafkaProducer = new FlinkKafkaProducer<>( topic, @@ -34,9 +27,4 @@ public class KafkaProducer { return kafkaProducer; } - private static void setDefaultConfig(Properties properties, String key, Object value) { - if (!properties.contains(key)) { - properties.put(key, value); - } - } } |
