diff options
| author | wangchengcheng <[email protected]> | 2023-05-22 16:23:03 +0800 |
|---|---|---|
| committer | wangchengcheng <[email protected]> | 2023-05-22 16:23:03 +0800 |
| commit | 578bcbbb2a0c1d0e5970df6b46f2764e25d915f2 (patch) | |
| tree | f60b7cf6bb2c16e99064a29016f27859c7f14acb | |
| parent | 5e21ca97f4896dd2f875c72cb9d771e632d6f90d (diff) | |
| -rw-r--r-- | pom.xml | 2 | ||||
| -rw-r--r-- | properties/default_config.properties | 2 | ||||
| -rw-r--r-- | properties/service_flow_config.properties | 8 | ||||
| -rw-r--r-- | src/main/java/com/zdjizhi/tools/function/CountAppProtocolWindowProcess.java | 3 | ||||
| -rw-r--r-- | src/main/java/com/zdjizhi/tools/kafka/KafkaConsumer.java | 1 |
5 files changed, 9 insertions, 7 deletions
@@ -6,7 +6,7 @@ <groupId>com.zdjizhi</groupId> <artifactId>app-protocol-stat-traffic-agent</artifactId> - <version>20230425-test</version> + <version>20230522</version> <name>app-protocol-stat-traffic-agent</name> <url>http://www.example.com</url> diff --git a/properties/default_config.properties b/properties/default_config.properties index 976e210..c25005c 100644 --- a/properties/default_config.properties +++ b/properties/default_config.properties @@ -42,4 +42,4 @@ buffer.timeout=100 -random.range.num=20 +random.range.num=1 diff --git a/properties/service_flow_config.properties b/properties/service_flow_config.properties index d7ac7b1..8fa687d 100644 --- a/properties/service_flow_config.properties +++ b/properties/service_flow_config.properties @@ -1,16 +1,16 @@ #--------------------------------地址配置------------------------------# #管理kafka地址 -source.kafka.servers=192.168.45.102:9094 +source.kafka.servers=192.168.44.12:9094 #管理输出kafka地址 -sink.kafka.servers=192.168.45.102:9094 +sink.kafka.servers=192.168.44.12:9094 tools.library=D:\\workerspace\\dat\\ #--------------------------------Kafka消费配置(session)------------------------------# #已关闭会话 -session.source.kafka.topic=SESSION-RECORD +session.source.kafka.topic=test #读取topic,存储该spout id的消费offset信息,可通过该拓扑命名;具体存储offset的位置,确定下次读取不重复的数据; session.group.id=session-record-app-protocol-stat-traffic-agent-2 @@ -22,7 +22,7 @@ session.source.parallelism=1 #--------------------------------Kafka消费配置(interim Session)------------------------------# #过渡会话 -interim.session.source.kafka.topic=SESSION-RECORD +interim.session.source.kafka.topic=test-11 #读取topic,存储该spout id的消费offset信息,可通过该拓扑命名;具体存储offset的位置,确定下次读取不重复的数据; diff --git a/src/main/java/com/zdjizhi/tools/function/CountAppProtocolWindowProcess.java b/src/main/java/com/zdjizhi/tools/function/CountAppProtocolWindowProcess.java index 5271b7e..27a6a75 100644 --- a/src/main/java/com/zdjizhi/tools/function/CountAppProtocolWindowProcess.java +++ b/src/main/java/com/zdjizhi/tools/function/CountAppProtocolWindowProcess.java @@ -61,10 +61,11 @@ public class CountAppProtocolWindowProcess extends ProcessWindowFunction<Tuple3< try { HllSketch hllSketch = new HllSketch(12); for (Tuple3<String, Fields, String> record : iterable) { + sessions = sessions + record.f1.getSessions(); in_bytes = in_bytes + record.f1.getIn_bytes(); - out_bytes = out_pkts + record.f1.getOut_bytes(); + out_bytes = out_bytes + record.f1.getOut_bytes(); in_pkts = in_pkts + record.f1.getIn_pkts(); out_pkts = out_pkts + record.f1.getOut_pkts(); diff --git a/src/main/java/com/zdjizhi/tools/kafka/KafkaConsumer.java b/src/main/java/com/zdjizhi/tools/kafka/KafkaConsumer.java index a0ebab6..5b00d0d 100644 --- a/src/main/java/com/zdjizhi/tools/kafka/KafkaConsumer.java +++ b/src/main/java/com/zdjizhi/tools/kafka/KafkaConsumer.java @@ -36,6 +36,7 @@ public class KafkaConsumer { FlinkKafkaConsumer<String> kafkaConsumer = new FlinkKafkaConsumer<>(topic, new SimpleStringSchema(), createConsumerConfig(groupId)); + //随着checkpoint提交,将offset提交到kafka kafkaConsumer.setCommitOffsetsOnCheckpoints(true); |
