summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorwangchengcheng <[email protected]>2023-05-22 16:23:03 +0800
committerwangchengcheng <[email protected]>2023-05-22 16:23:03 +0800
commit578bcbbb2a0c1d0e5970df6b46f2764e25d915f2 (patch)
treef60b7cf6bb2c16e99064a29016f27859c7f14acb /src
parent5e21ca97f4896dd2f875c72cb9d771e632d6f90d (diff)
1.修改out_bytes计算逻辑错误。HEADmaster
Diffstat (limited to 'src')
-rw-r--r--src/main/java/com/zdjizhi/tools/function/CountAppProtocolWindowProcess.java3
-rw-r--r--src/main/java/com/zdjizhi/tools/kafka/KafkaConsumer.java1
2 files changed, 3 insertions, 1 deletions
diff --git a/src/main/java/com/zdjizhi/tools/function/CountAppProtocolWindowProcess.java b/src/main/java/com/zdjizhi/tools/function/CountAppProtocolWindowProcess.java
index 5271b7e..27a6a75 100644
--- a/src/main/java/com/zdjizhi/tools/function/CountAppProtocolWindowProcess.java
+++ b/src/main/java/com/zdjizhi/tools/function/CountAppProtocolWindowProcess.java
@@ -61,10 +61,11 @@ public class CountAppProtocolWindowProcess extends ProcessWindowFunction<Tuple3<
try {
HllSketch hllSketch = new HllSketch(12);
for (Tuple3<String, Fields, String> record : iterable) {
+
sessions = sessions + record.f1.getSessions();
in_bytes = in_bytes + record.f1.getIn_bytes();
- out_bytes = out_pkts + record.f1.getOut_bytes();
+ out_bytes = out_bytes + record.f1.getOut_bytes();
in_pkts = in_pkts + record.f1.getIn_pkts();
out_pkts = out_pkts + record.f1.getOut_pkts();
diff --git a/src/main/java/com/zdjizhi/tools/kafka/KafkaConsumer.java b/src/main/java/com/zdjizhi/tools/kafka/KafkaConsumer.java
index a0ebab6..5b00d0d 100644
--- a/src/main/java/com/zdjizhi/tools/kafka/KafkaConsumer.java
+++ b/src/main/java/com/zdjizhi/tools/kafka/KafkaConsumer.java
@@ -36,6 +36,7 @@ public class KafkaConsumer {
FlinkKafkaConsumer<String> kafkaConsumer = new FlinkKafkaConsumer<>(topic,
new SimpleStringSchema(), createConsumerConfig(groupId));
+
//随着checkpoint提交,将offset提交到kafka
kafkaConsumer.setCommitOffsetsOnCheckpoints(true);