summaryrefslogtreecommitdiff
path: root/src/main
diff options
context:
space:
mode:
authordoufenghu <[email protected]>2024-05-13 17:40:20 +0800
committerdoufenghu <[email protected]>2024-05-13 17:40:20 +0800
commitdf696e2f65227b950fcde0a51b7ca0f717b2becd (patch)
tree60632303c95e43001e9b3e9b4b14c59938bfc7ae /src/main
parentfc650bb9e525adc737058896f7c372597586d8f0 (diff)
[fix] 修复自定义kafka配置被默认配置覆盖的问题
Diffstat (limited to 'src/main')
-rw-r--r--src/main/java/com/zdjizhi/utils/kafka/KafkaConsumer.java10
-rw-r--r--src/main/java/com/zdjizhi/utils/kafka/KafkaProducer.java14
2 files changed, 1 insertions, 23 deletions
diff --git a/src/main/java/com/zdjizhi/utils/kafka/KafkaConsumer.java b/src/main/java/com/zdjizhi/utils/kafka/KafkaConsumer.java
index 397814f..ca360bc 100644
--- a/src/main/java/com/zdjizhi/utils/kafka/KafkaConsumer.java
+++ b/src/main/java/com/zdjizhi/utils/kafka/KafkaConsumer.java
@@ -21,10 +21,6 @@ public class KafkaConsumer {
*/
public static FlinkKafkaConsumer<String> getKafkaConsumer(Properties properties, String topic, String startupMode) {
- setDefaultConfig(properties, ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 60000);
- setDefaultConfig(properties, ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 5000);
- setDefaultConfig(properties, ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG, 31457280);
-
FlinkKafkaConsumer<String> kafkaConsumer = new FlinkKafkaConsumer<>(topic, new SimpleStringSchema(), properties);
switch (startupMode) {
@@ -43,10 +39,4 @@ public class KafkaConsumer {
return kafkaConsumer;
}
-
- private static void setDefaultConfig(Properties properties, String key, Object value) {
- if (!properties.contains(key)) {
- properties.put(key, value);
- }
- }
}
diff --git a/src/main/java/com/zdjizhi/utils/kafka/KafkaProducer.java b/src/main/java/com/zdjizhi/utils/kafka/KafkaProducer.java
index d337e04..88cd058 100644
--- a/src/main/java/com/zdjizhi/utils/kafka/KafkaProducer.java
+++ b/src/main/java/com/zdjizhi/utils/kafka/KafkaProducer.java
@@ -15,14 +15,7 @@ import java.util.Properties;
public class KafkaProducer {
public static FlinkKafkaProducer<String> getKafkaProducer(Properties properties, String topic, boolean logFailuresOnly) {
- setDefaultConfig(properties, "acks", "1");
- setDefaultConfig(properties, "retries", 0);
- setDefaultConfig(properties, "linger.ms", 10);
- setDefaultConfig(properties, "request.timeout.ms", 30000);
- setDefaultConfig(properties, "batch.size", 262144);
- setDefaultConfig(properties, "buffer.memory", 134217728);
- setDefaultConfig(properties, "max.request.size", 10485760);
- setDefaultConfig(properties, "compression.type", "snappy");
+
FlinkKafkaProducer<String> kafkaProducer = new FlinkKafkaProducer<>(
topic,
@@ -34,9 +27,4 @@ public class KafkaProducer {
return kafkaProducer;
}
- private static void setDefaultConfig(Properties properties, String key, Object value) {
- if (!properties.contains(key)) {
- properties.put(key, value);
- }
- }
}