diff options
| author | 窦凤虎 <[email protected]> | 2024-05-13 09:41:46 +0000 |
|---|---|---|
| committer | 窦凤虎 <[email protected]> | 2024-05-13 09:41:46 +0000 |
| commit | 9392ead27aaac49c8cdd5387aa3fcceb35607cfc (patch) | |
| tree | 60632303c95e43001e9b3e9b4b14c59938bfc7ae | |
| parent | f0cb64ef9c8762bba2c683ca8d9bbb9a3ee9668e (diff) | |
| parent | df696e2f65227b950fcde0a51b7ca0f717b2becd (diff) | |
Merge branch 'develop' into 'master'v2.2.2
[fix] 修复自定义kafka配置被默认配置覆盖的问题
See merge request galaxy/tsg_olap/app-protocol-stat-traffic-merge!8
| -rw-r--r-- | pom.xml | 2 | ||||
| -rw-r--r-- | properties/application.properties | 32 | ||||
| -rw-r--r-- | src/main/java/com/zdjizhi/utils/kafka/KafkaConsumer.java | 10 | ||||
| -rw-r--r-- | src/main/java/com/zdjizhi/utils/kafka/KafkaProducer.java | 14 | ||||
| -rw-r--r-- | src/test/java/com/zdjizhi/ConfigTest.java | 6 | ||||
| -rw-r--r-- | src/test/java/com/zdjizhi/FastJsonTest.java | 2 |
6 files changed, 34 insertions, 32 deletions
@@ -6,7 +6,7 @@ <groupId>com.zdjizhi</groupId> <artifactId>app-protocol-stat-traffic-merge</artifactId> - <version>2.2.1</version> + <version>2.2.2</version> <name>app-protocol-stat-traffic-merge</name> <url>http://www.example.com</url> diff --git a/properties/application.properties b/properties/application.properties index 16c7a25..7fc84d0 100644 --- a/properties/application.properties +++ b/properties/application.properties @@ -1,27 +1,51 @@ -#kafka 接收数据topic +# Kafka Source properties + source.kafka.topic=NETWORK-TRAFFIC-METRIC source.kafka.props.bootstrap.servers=192.168.44.12:9094 source.kafka.props.group.id=appapp-protocol-merge-231109-1 +source.kafka.props.max.poll.records=1000 + +source.kafka.session.timeout.ms=60000 + +source.kafka.max.partition.fetch.bytes=31457280 + source.kafka.props.security.protocol=SASL_PLAINTEXT source.kafka.props.sasl.mechanism=PLAIN source.kafka.props.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="admin" password="galaxy2019"; -#补全数据 输出 topic +# Kafka Sink properties + sink.kafka.topic=NETWORK-TRAFFIC-METRIC sink.kafka.props.bootstrap.servers=192.168.44.12:9094 +sink.kafka.props.linger.ms=1 + +sink.kafka.acks=1 + +sink.kafka.retries=0 + +sink.kafka.request.timeout.ms=30000 + +sink.kafka.batch.size=262144 + +sink.kafka.buffer.memory=134217728 + +sink.kafka.max.request.size=10485760 + +sink.kafka.compression.type=snappy + sink.kafka.props.security.protocol=SASL_PLAINTEXT sink.kafka.props.sasl.mechanism=PLAIN sink.kafka.props.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="admin" password="galaxy2019"; -count.window.time=5 +count.window.time=1 -watermark.max.orderness=5
\ No newline at end of file +watermark.max.orderness=1
\ No newline at end of file diff --git a/src/main/java/com/zdjizhi/utils/kafka/KafkaConsumer.java b/src/main/java/com/zdjizhi/utils/kafka/KafkaConsumer.java index 397814f..ca360bc 100644 --- a/src/main/java/com/zdjizhi/utils/kafka/KafkaConsumer.java +++ b/src/main/java/com/zdjizhi/utils/kafka/KafkaConsumer.java @@ -21,10 +21,6 @@ public class KafkaConsumer { */ public static FlinkKafkaConsumer<String> getKafkaConsumer(Properties properties, String topic, String startupMode) { - setDefaultConfig(properties, ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 60000); - setDefaultConfig(properties, ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 5000); - setDefaultConfig(properties, ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG, 31457280); - FlinkKafkaConsumer<String> kafkaConsumer = new FlinkKafkaConsumer<>(topic, new SimpleStringSchema(), properties); switch (startupMode) { @@ -43,10 +39,4 @@ public class KafkaConsumer { return kafkaConsumer; } - - private static void setDefaultConfig(Properties properties, String key, Object value) { - if (!properties.contains(key)) { - properties.put(key, value); - } - } } diff --git a/src/main/java/com/zdjizhi/utils/kafka/KafkaProducer.java b/src/main/java/com/zdjizhi/utils/kafka/KafkaProducer.java index d337e04..88cd058 100644 --- a/src/main/java/com/zdjizhi/utils/kafka/KafkaProducer.java +++ b/src/main/java/com/zdjizhi/utils/kafka/KafkaProducer.java @@ -15,14 +15,7 @@ import java.util.Properties; public class KafkaProducer { public static FlinkKafkaProducer<String> getKafkaProducer(Properties properties, String topic, boolean logFailuresOnly) { - setDefaultConfig(properties, "acks", "1"); - setDefaultConfig(properties, "retries", 0); - setDefaultConfig(properties, "linger.ms", 10); - setDefaultConfig(properties, "request.timeout.ms", 30000); - setDefaultConfig(properties, "batch.size", 262144); - setDefaultConfig(properties, "buffer.memory", 134217728); - setDefaultConfig(properties, "max.request.size", 10485760); - setDefaultConfig(properties, "compression.type", "snappy"); + FlinkKafkaProducer<String> kafkaProducer = new FlinkKafkaProducer<>( topic, @@ -34,9 +27,4 @@ public class KafkaProducer { return kafkaProducer; } - private static void setDefaultConfig(Properties properties, String key, Object value) { - if (!properties.contains(key)) { - properties.put(key, value); - } - } } diff --git a/src/test/java/com/zdjizhi/ConfigTest.java b/src/test/java/com/zdjizhi/ConfigTest.java index 7a2b5d3..53cc764 100644 --- a/src/test/java/com/zdjizhi/ConfigTest.java +++ b/src/test/java/com/zdjizhi/ConfigTest.java @@ -19,15 +19,15 @@ public class ConfigTest { final ParameterTool tool; try { - tool = ParameterTool.fromPropertiesFile("D:\\workerspace\\flink\\test.properties"); + tool = ParameterTool.fromPropertiesFile("/Users/darnell/IdeaProjects/app-protocol-stat-traffic-merge/properties/application.properties"); final Configuration config = tool.getConfiguration(); environment.getConfig().setGlobalJobParameters(config); final FusionConfiguration fusionConfiguration = new FusionConfiguration(config); System.out.println(config.get(SOURCE_KAFKA_TOPIC)); System.out.println(config.get(SINK_KAFKA_TOPIC)); - System.out.println(fusionConfiguration.getProperties(SOURCE_KAFKA_PROPERTIES_PREFIX)); - System.out.println(fusionConfiguration.getProperties(SINK_KAFKA_PROPERTIES_PREFIX)); + System.out.println("#####"+ fusionConfiguration.getProperties( SOURCE_KAFKA_PROPERTIES_PREFIX)); + System.out.println("#####"+ fusionConfiguration.getProperties(SINK_KAFKA_PROPERTIES_PREFIX)); final FlinkKafkaConsumer<String> kafkaConsumer = new FlinkKafkaConsumer<>( diff --git a/src/test/java/com/zdjizhi/FastJsonTest.java b/src/test/java/com/zdjizhi/FastJsonTest.java index 3a6df5c..bf04b83 100644 --- a/src/test/java/com/zdjizhi/FastJsonTest.java +++ b/src/test/java/com/zdjizhi/FastJsonTest.java @@ -77,7 +77,7 @@ public class FastJsonTest { } - @Test + @Test(expected=JSONException.class) public void errorJsonTest() { String message = "{\"fields\":{\"c2s_bytes\":2292,\"c2s_fragments\":0,\"c2s_pkts\":13,\"c2s_tcp_lost_bytes\":0,\"c2s_tcp_ooorder_pkts\":0,\"c2s_tcp_retransmitted_bytes\":0,\"c2s_tcp_retransmitted_pkts\":0,\"ytes\":2292,\"out_pkts\":13,\"s2c_bytes\":4695,\"s2c_fragments\":0,\"s2c_pkts\":12,\"s2c_tcp_lost_bytes\":0,\"s2c_tcp_ooorder_pkts\":0,\"s2c_tcp_retransmitted_bytes\":0,\"s2c_tcp_retransmitraffic_application_protocol_stat\",\"tags\":{\"app_full_path\":\"ssl.https.qq_web.wecom\",\"app_name\":\"app_metric\",\"data_center\":\"center-xxg-7400\",\"device_group\":\"group-xxg-7400\",dc-161\",\"protocol_label\":\"ETHERNET.IPv4.TCP\",\"table_name\":\"traffic_application_protocol_stat\",\"vsys_id\":\"1\"},\"timestamp\":1688708725000}"; JSONObject originalLog = JSON.parseObject(message); |
