diff options
| author | wangkuan <[email protected]> | 2024-04-30 17:55:08 +0800 |
|---|---|---|
| committer | wangkuan <[email protected]> | 2024-04-30 17:55:08 +0800 |
| commit | 75e4e73e01be9e5faea3b9c2aad0fab8eabc238b (patch) | |
| tree | a04ec38b1c1869852052f866e5cbc98725de8278 | |
| parent | 67a4ede2e6217f57e4f311c888ae18fbce67aff6 (diff) | |
开启offset设置24.02
| -rw-r--r-- | pom.xml | 10 | ||||
| -rw-r--r-- | src/main/java/com/galaxy/tsg/Toptask.java | 4 | ||||
| -rw-r--r-- | src/main/java/com/galaxy/tsg/config/CommonConfig.java | 4 | ||||
| -rw-r--r-- | src/main/java/com/galaxy/tsg/util/KafkaUtils.java | 7 | ||||
| -rw-r--r-- | src/main/resources/common.properties | 23 |
5 files changed, 31 insertions, 17 deletions
@@ -6,7 +6,7 @@ <groupId>com.galaxy.tsg</groupId> <artifactId>topn-metrics-job</artifactId> - <version>24-02-07</version> + <version>24-02-07-hotfix</version> <repositories> <repository> @@ -40,6 +40,11 @@ </exclusions> </dependency> + <dependency> + <groupId>com.googlecode.aviator</groupId> + <artifactId>aviator</artifactId> + <version>5.2.6</version> + </dependency> <dependency> <groupId>cn.hutool</groupId> <artifactId>hutool-all</artifactId> @@ -89,6 +94,7 @@ <version>3.4.9</version> </dependency> + <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-json</artifactId> @@ -165,7 +171,7 @@ </goals> <configuration> - <finalName>topn-metrics-job-24-02-07</finalName> + <finalName>topn-metrics-job-24-02-07-hotfix</finalName> <filters> <filter> <!-- Do not copy the signatures in the META-INF folder. diff --git a/src/main/java/com/galaxy/tsg/Toptask.java b/src/main/java/com/galaxy/tsg/Toptask.java index 6926c3c..1cc4af2 100644 --- a/src/main/java/com/galaxy/tsg/Toptask.java +++ b/src/main/java/com/galaxy/tsg/Toptask.java @@ -38,6 +38,10 @@ public class Toptask { Configuration configurationService = serviceConfig.getConfiguration(); // global register env.getConfig().setGlobalJobParameters(configurationService); + if(configurationService.get(CommonConfig.JOB_CHECKPOINT_INTERVAL)!=0) { + env.enableCheckpointing(configurationService.get(CommonConfig.JOB_CHECKPOINT_INTERVAL)); + env.getCheckpointConfig().setCheckpointTimeout(configurationService.get(CommonConfig.JOB_CHECKPOINT_TIMEOUT)); + } WatermarkStrategy<SessionEntity> strategyForSession = WatermarkStrategy .<SessionEntity>forBoundedOutOfOrderness(Duration.ofSeconds(configurationService.get(CommonConfig.WATERMARK_TIME))) diff --git a/src/main/java/com/galaxy/tsg/config/CommonConfig.java b/src/main/java/com/galaxy/tsg/config/CommonConfig.java index db1896a..a82b9a3 100644 --- a/src/main/java/com/galaxy/tsg/config/CommonConfig.java +++ b/src/main/java/com/galaxy/tsg/config/CommonConfig.java @@ -27,8 +27,8 @@ public class CommonConfig { public static final ConfigOption<String> KAFKA_PRODUCER_MAX_REQUEST_SIZE = ConfigOptions.key("kafka.producer.max.request.size").stringType().defaultValue("10485760"); public static final ConfigOption<String> KAFKA_PRODUCER_COMPRESSION_TYPE = ConfigOptions.key("kafka.producer.compression.type").stringType().defaultValue("none"); public static final ConfigOption<String> KAFKA_PRODUCER_BROKER = ConfigOptions.key("kafka.producer.broker").stringType().defaultValue(""); - - + public static final ConfigOption<Integer> JOB_CHECKPOINT_INTERVAL = ConfigOptions.key("job.checkpoint.interval").intType().defaultValue(0); + public static final ConfigOption<Integer> JOB_CHECKPOINT_TIMEOUT = ConfigOptions.key("job.checkpoint.timeout").intType().defaultValue(600000); public static final ConfigOption<String> JOB_NAME = ConfigOptions.key("job.name").stringType().defaultValue(""); public static final ConfigOption<Integer> TASK_PARALLELISM = ConfigOptions.key("task.parallelism").intType().defaultValue(0); public static final ConfigOption<Integer> ORDERBY_PARALLELISM = ConfigOptions.key("orderby.parallelism").intType().defaultValue(0); diff --git a/src/main/java/com/galaxy/tsg/util/KafkaUtils.java b/src/main/java/com/galaxy/tsg/util/KafkaUtils.java index 422d9e4..bb5d19e 100644 --- a/src/main/java/com/galaxy/tsg/util/KafkaUtils.java +++ b/src/main/java/com/galaxy/tsg/util/KafkaUtils.java @@ -7,8 +7,11 @@ import org.apache.flink.configuration.Configuration; import org.apache.flink.streaming.api.functions.sink.SinkFunction; import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer; import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer; +import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartition; import org.apache.kafka.common.config.SslConfigs; +import java.util.HashMap; +import java.util.Map; import java.util.Optional; import java.util.Properties; @@ -84,10 +87,8 @@ public class KafkaUtils { public static FlinkKafkaConsumer<SessionEntity> getKafkaConsumer(Configuration configuration) { FlinkKafkaConsumer<SessionEntity> kafkaConsumer = new FlinkKafkaConsumer<>(configuration.getString(KAFKA_CONSUMER_TOPIC), new TimestampDeserializationSchema(), getKafkaSourceProperty(configuration)); - kafkaConsumer.setCommitOffsetsOnCheckpoints(true); - kafkaConsumer.setStartFromGroupOffsets(); - + // kafkaConsumer.setStartFromGroupOffsets(); return kafkaConsumer; } diff --git a/src/main/resources/common.properties b/src/main/resources/common.properties index ca8057f..fa846c3 100644 --- a/src/main/resources/common.properties +++ b/src/main/resources/common.properties @@ -1,43 +1,46 @@ #--------------------------------Kafka��������Ϣ------------------------------# #kafka�ĵ�ַ��Ϣ -kafka.consumer.broker=192.168.54.241:9092 +kafka.consumer.broker=192.168.40.151:9092 #kafka ��������topic kafka.consumer.topic=SESSION-RECORD #���������� -kafka.consumer.group.id=topn-metrics-job-20231101-t1-t +kafka.consumer.group.id=testtop #--------------------------------Kafka��������Ϣ------------------------------# #kafka�ĵ�ַ��Ϣ kafka.producer.broker=192.168.44.12:9094 -kafka.producer.topic=TRAFFIC-TOP-METRIC +kafka.producer.topic=ETL-TEST-RESULT #--------------------------------topology����------------------------------# #�������� job.name=agg_session_record_topn +job.checkpoint.interval=60000 + +job.checkpoint.timeout=600000 #source���ж� -kafka.consumer.parallelism=3 +kafka.consumer.parallelism=1 #�����ж� -task.parallelism=3 +task.parallelism=1 #���������ж� -orderby.parallelism=3 +orderby.parallelism=1 #��Ⲣ�жȣ�ͨ������orderby.parallelism -sink.parallelism=3 +sink.parallelism=1 #�����ӳٵȴ�ʱ�䵥λ�� -watermark.time=90 +watermark.time=20 #top������� -top.limit=10000 +top.limit=100 #����������ʱ�䵥λ���� -window.time.minute=5 +window.time.minute=1 #--------------------------------Kafka��������------------------------------# #kafka source poll |
