diff options
| author | wangchengcheng <[email protected]> | 2023-05-22 16:23:03 +0800 |
|---|---|---|
| committer | wangchengcheng <[email protected]> | 2023-05-22 16:23:03 +0800 |
| commit | 578bcbbb2a0c1d0e5970df6b46f2764e25d915f2 (patch) | |
| tree | f60b7cf6bb2c16e99064a29016f27859c7f14acb /src | |
| parent | 5e21ca97f4896dd2f875c72cb9d771e632d6f90d (diff) | |
Diffstat (limited to 'src')
| -rw-r--r-- | src/main/java/com/zdjizhi/tools/function/CountAppProtocolWindowProcess.java | 3 | ||||
| -rw-r--r-- | src/main/java/com/zdjizhi/tools/kafka/KafkaConsumer.java | 1 |
2 files changed, 3 insertions, 1 deletions
diff --git a/src/main/java/com/zdjizhi/tools/function/CountAppProtocolWindowProcess.java b/src/main/java/com/zdjizhi/tools/function/CountAppProtocolWindowProcess.java index 5271b7e..27a6a75 100644 --- a/src/main/java/com/zdjizhi/tools/function/CountAppProtocolWindowProcess.java +++ b/src/main/java/com/zdjizhi/tools/function/CountAppProtocolWindowProcess.java @@ -61,10 +61,11 @@ public class CountAppProtocolWindowProcess extends ProcessWindowFunction<Tuple3< try { HllSketch hllSketch = new HllSketch(12); for (Tuple3<String, Fields, String> record : iterable) { + sessions = sessions + record.f1.getSessions(); in_bytes = in_bytes + record.f1.getIn_bytes(); - out_bytes = out_pkts + record.f1.getOut_bytes(); + out_bytes = out_bytes + record.f1.getOut_bytes(); in_pkts = in_pkts + record.f1.getIn_pkts(); out_pkts = out_pkts + record.f1.getOut_pkts(); diff --git a/src/main/java/com/zdjizhi/tools/kafka/KafkaConsumer.java b/src/main/java/com/zdjizhi/tools/kafka/KafkaConsumer.java index a0ebab6..5b00d0d 100644 --- a/src/main/java/com/zdjizhi/tools/kafka/KafkaConsumer.java +++ b/src/main/java/com/zdjizhi/tools/kafka/KafkaConsumer.java @@ -36,6 +36,7 @@ public class KafkaConsumer { FlinkKafkaConsumer<String> kafkaConsumer = new FlinkKafkaConsumer<>(topic, new SimpleStringSchema(), createConsumerConfig(groupId)); + //随着checkpoint提交,将offset提交到kafka kafkaConsumer.setCommitOffsetsOnCheckpoints(true); |
