diff options
Diffstat (limited to 'src/main')
| -rw-r--r-- | src/main/java/com/zdjizhi/tools/function/CountAppProtocolWindowProcess.java | 3 | ||||
| -rw-r--r-- | src/main/java/com/zdjizhi/tools/kafka/KafkaConsumer.java | 1 |
2 files changed, 3 insertions, 1 deletions
diff --git a/src/main/java/com/zdjizhi/tools/function/CountAppProtocolWindowProcess.java b/src/main/java/com/zdjizhi/tools/function/CountAppProtocolWindowProcess.java index 5271b7e..27a6a75 100644 --- a/src/main/java/com/zdjizhi/tools/function/CountAppProtocolWindowProcess.java +++ b/src/main/java/com/zdjizhi/tools/function/CountAppProtocolWindowProcess.java @@ -61,10 +61,11 @@ public class CountAppProtocolWindowProcess extends ProcessWindowFunction<Tuple3< try { HllSketch hllSketch = new HllSketch(12); for (Tuple3<String, Fields, String> record : iterable) { + sessions = sessions + record.f1.getSessions(); in_bytes = in_bytes + record.f1.getIn_bytes(); - out_bytes = out_pkts + record.f1.getOut_bytes(); + out_bytes = out_bytes + record.f1.getOut_bytes(); in_pkts = in_pkts + record.f1.getIn_pkts(); out_pkts = out_pkts + record.f1.getOut_pkts(); diff --git a/src/main/java/com/zdjizhi/tools/kafka/KafkaConsumer.java b/src/main/java/com/zdjizhi/tools/kafka/KafkaConsumer.java index a0ebab6..5b00d0d 100644 --- a/src/main/java/com/zdjizhi/tools/kafka/KafkaConsumer.java +++ b/src/main/java/com/zdjizhi/tools/kafka/KafkaConsumer.java @@ -36,6 +36,7 @@ public class KafkaConsumer { FlinkKafkaConsumer<String> kafkaConsumer = new FlinkKafkaConsumer<>(topic, new SimpleStringSchema(), createConsumerConfig(groupId)); + //随着checkpoint提交,将offset提交到kafka kafkaConsumer.setCommitOffsetsOnCheckpoints(true); |
