summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--pom.xml10
-rw-r--r--src/main/java/com/galaxy/tsg/Toptask.java4
-rw-r--r--src/main/java/com/galaxy/tsg/config/CommonConfig.java4
-rw-r--r--src/main/java/com/galaxy/tsg/util/KafkaUtils.java7
-rw-r--r--src/main/resources/common.properties23
5 files changed, 31 insertions, 17 deletions
diff --git a/pom.xml b/pom.xml
index eb7787b..104515c 100644
--- a/pom.xml
+++ b/pom.xml
@@ -6,7 +6,7 @@
<groupId>com.galaxy.tsg</groupId>
<artifactId>topn-metrics-job</artifactId>
- <version>24-02-07</version>
+ <version>24-02-07-hotfix</version>
<repositories>
<repository>
@@ -40,6 +40,11 @@
</exclusions>
</dependency>
+ <dependency>
+ <groupId>com.googlecode.aviator</groupId>
+ <artifactId>aviator</artifactId>
+ <version>5.2.6</version>
+ </dependency>
<dependency>
<groupId>cn.hutool</groupId>
<artifactId>hutool-all</artifactId>
@@ -89,6 +94,7 @@
<version>3.4.9</version>
</dependency>
+
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-json</artifactId>
@@ -165,7 +171,7 @@
</goals>
<configuration>
- <finalName>topn-metrics-job-24-02-07</finalName>
+ <finalName>topn-metrics-job-24-02-07-hotfix</finalName>
<filters>
<filter>
<!-- Do not copy the signatures in the META-INF folder.
diff --git a/src/main/java/com/galaxy/tsg/Toptask.java b/src/main/java/com/galaxy/tsg/Toptask.java
index 6926c3c..1cc4af2 100644
--- a/src/main/java/com/galaxy/tsg/Toptask.java
+++ b/src/main/java/com/galaxy/tsg/Toptask.java
@@ -38,6 +38,10 @@ public class Toptask {
Configuration configurationService = serviceConfig.getConfiguration();
// global register
env.getConfig().setGlobalJobParameters(configurationService);
+ if(configurationService.get(CommonConfig.JOB_CHECKPOINT_INTERVAL)!=0) {
+ env.enableCheckpointing(configurationService.get(CommonConfig.JOB_CHECKPOINT_INTERVAL));
+ env.getCheckpointConfig().setCheckpointTimeout(configurationService.get(CommonConfig.JOB_CHECKPOINT_TIMEOUT));
+ }
WatermarkStrategy<SessionEntity> strategyForSession = WatermarkStrategy
.<SessionEntity>forBoundedOutOfOrderness(Duration.ofSeconds(configurationService.get(CommonConfig.WATERMARK_TIME)))
diff --git a/src/main/java/com/galaxy/tsg/config/CommonConfig.java b/src/main/java/com/galaxy/tsg/config/CommonConfig.java
index db1896a..a82b9a3 100644
--- a/src/main/java/com/galaxy/tsg/config/CommonConfig.java
+++ b/src/main/java/com/galaxy/tsg/config/CommonConfig.java
@@ -27,8 +27,8 @@ public class CommonConfig {
public static final ConfigOption<String> KAFKA_PRODUCER_MAX_REQUEST_SIZE = ConfigOptions.key("kafka.producer.max.request.size").stringType().defaultValue("10485760");
public static final ConfigOption<String> KAFKA_PRODUCER_COMPRESSION_TYPE = ConfigOptions.key("kafka.producer.compression.type").stringType().defaultValue("none");
public static final ConfigOption<String> KAFKA_PRODUCER_BROKER = ConfigOptions.key("kafka.producer.broker").stringType().defaultValue("");
-
-
+ public static final ConfigOption<Integer> JOB_CHECKPOINT_INTERVAL = ConfigOptions.key("job.checkpoint.interval").intType().defaultValue(0);
+ public static final ConfigOption<Integer> JOB_CHECKPOINT_TIMEOUT = ConfigOptions.key("job.checkpoint.timeout").intType().defaultValue(600000);
public static final ConfigOption<String> JOB_NAME = ConfigOptions.key("job.name").stringType().defaultValue("");
public static final ConfigOption<Integer> TASK_PARALLELISM = ConfigOptions.key("task.parallelism").intType().defaultValue(0);
public static final ConfigOption<Integer> ORDERBY_PARALLELISM = ConfigOptions.key("orderby.parallelism").intType().defaultValue(0);
diff --git a/src/main/java/com/galaxy/tsg/util/KafkaUtils.java b/src/main/java/com/galaxy/tsg/util/KafkaUtils.java
index 422d9e4..bb5d19e 100644
--- a/src/main/java/com/galaxy/tsg/util/KafkaUtils.java
+++ b/src/main/java/com/galaxy/tsg/util/KafkaUtils.java
@@ -7,8 +7,11 @@ import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;
+import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartition;
import org.apache.kafka.common.config.SslConfigs;
+import java.util.HashMap;
+import java.util.Map;
import java.util.Optional;
import java.util.Properties;
@@ -84,10 +87,8 @@ public class KafkaUtils {
public static FlinkKafkaConsumer<SessionEntity> getKafkaConsumer(Configuration configuration) {
FlinkKafkaConsumer<SessionEntity> kafkaConsumer = new FlinkKafkaConsumer<>(configuration.getString(KAFKA_CONSUMER_TOPIC),
new TimestampDeserializationSchema(), getKafkaSourceProperty(configuration));
-
kafkaConsumer.setCommitOffsetsOnCheckpoints(true);
- kafkaConsumer.setStartFromGroupOffsets();
-
+ // kafkaConsumer.setStartFromGroupOffsets();
return kafkaConsumer;
}
diff --git a/src/main/resources/common.properties b/src/main/resources/common.properties
index ca8057f..fa846c3 100644
--- a/src/main/resources/common.properties
+++ b/src/main/resources/common.properties
@@ -1,43 +1,46 @@
#--------------------------------Kafka consumer information------------------------------#
#kafka broker address
-kafka.consumer.broker=192.168.54.241:9092
+kafka.consumer.broker=192.168.40.151:9092
#kafka topic to consume from
kafka.consumer.topic=SESSION-RECORD
#consumer group id
-kafka.consumer.group.id=topn-metrics-job-20231101-t1-t
+kafka.consumer.group.id=testtop
#--------------------------------Kafka producer information------------------------------#
#kafka broker address
kafka.producer.broker=192.168.44.12:9094
-kafka.producer.topic=TRAFFIC-TOP-METRIC
+kafka.producer.topic=ETL-TEST-RESULT
#--------------------------------topology configuration------------------------------#
#job name
job.name=agg_session_record_topn
+job.checkpoint.interval=60000
+
+job.checkpoint.timeout=600000
#source parallelism
-kafka.consumer.parallelism=3
+kafka.consumer.parallelism=1
#task parallelism
-task.parallelism=3
+task.parallelism=1
#sort (order-by) task parallelism
-orderby.parallelism=3
+orderby.parallelism=1
#sink parallelism, usually equal to orderby.parallelism
-sink.parallelism=3
+sink.parallelism=1
#wait time for late data, unit: seconds
-watermark.time=90
+watermark.time=20
#top-N result limit
-top.limit=10000
+top.limit=100
#window time, unit: minutes
-window.time.minute=5
+window.time.minute=1
#--------------------------------Kafka��������------------------------------#
#kafka source poll