summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--pom.xml2
-rw-r--r--properties/default_config.properties2
-rw-r--r--properties/service_flow_config.properties8
-rw-r--r--src/main/java/com/zdjizhi/tools/function/CountAppProtocolWindowProcess.java3
-rw-r--r--src/main/java/com/zdjizhi/tools/kafka/KafkaConsumer.java1
5 files changed, 9 insertions, 7 deletions
diff --git a/pom.xml b/pom.xml
index 3a48680..83d24ca 100644
--- a/pom.xml
+++ b/pom.xml
@@ -6,7 +6,7 @@
<groupId>com.zdjizhi</groupId>
<artifactId>app-protocol-stat-traffic-agent</artifactId>
- <version>20230425-test</version>
+ <version>20230522</version>
<name>app-protocol-stat-traffic-agent</name>
<url>http://www.example.com</url>
diff --git a/properties/default_config.properties b/properties/default_config.properties
index 976e210..c25005c 100644
--- a/properties/default_config.properties
+++ b/properties/default_config.properties
@@ -42,4 +42,4 @@ buffer.timeout=100
-random.range.num=20
+random.range.num=1
diff --git a/properties/service_flow_config.properties b/properties/service_flow_config.properties
index d7ac7b1..8fa687d 100644
--- a/properties/service_flow_config.properties
+++ b/properties/service_flow_config.properties
@@ -1,16 +1,16 @@
#--------------------------------地址配置------------------------------#
#管理kafka地址
-source.kafka.servers=192.168.45.102:9094
+source.kafka.servers=192.168.44.12:9094
#管理输出kafka地址
-sink.kafka.servers=192.168.45.102:9094
+sink.kafka.servers=192.168.44.12:9094
tools.library=D:\\workerspace\\dat\\
#--------------------------------Kafka消费配置(session)------------------------------#
#已关闭会话
-session.source.kafka.topic=SESSION-RECORD
+session.source.kafka.topic=test
#读取topic,存储该spout id的消费offset信息,可通过该拓扑命名;具体存储offset的位置,确定下次读取不重复的数据;
session.group.id=session-record-app-protocol-stat-traffic-agent-2
@@ -22,7 +22,7 @@ session.source.parallelism=1
#--------------------------------Kafka消费配置(interim Session)------------------------------#
#过渡会话
-interim.session.source.kafka.topic=SESSION-RECORD
+interim.session.source.kafka.topic=test-11
#读取topic,存储该spout id的消费offset信息,可通过该拓扑命名;具体存储offset的位置,确定下次读取不重复的数据;
diff --git a/src/main/java/com/zdjizhi/tools/function/CountAppProtocolWindowProcess.java b/src/main/java/com/zdjizhi/tools/function/CountAppProtocolWindowProcess.java
index 5271b7e..27a6a75 100644
--- a/src/main/java/com/zdjizhi/tools/function/CountAppProtocolWindowProcess.java
+++ b/src/main/java/com/zdjizhi/tools/function/CountAppProtocolWindowProcess.java
@@ -61,10 +61,11 @@ public class CountAppProtocolWindowProcess extends ProcessWindowFunction<Tuple3<
try {
HllSketch hllSketch = new HllSketch(12);
for (Tuple3<String, Fields, String> record : iterable) {
+
sessions = sessions + record.f1.getSessions();
in_bytes = in_bytes + record.f1.getIn_bytes();
- out_bytes = out_pkts + record.f1.getOut_bytes();
+ out_bytes = out_bytes + record.f1.getOut_bytes();
in_pkts = in_pkts + record.f1.getIn_pkts();
out_pkts = out_pkts + record.f1.getOut_pkts();
diff --git a/src/main/java/com/zdjizhi/tools/kafka/KafkaConsumer.java b/src/main/java/com/zdjizhi/tools/kafka/KafkaConsumer.java
index a0ebab6..5b00d0d 100644
--- a/src/main/java/com/zdjizhi/tools/kafka/KafkaConsumer.java
+++ b/src/main/java/com/zdjizhi/tools/kafka/KafkaConsumer.java
@@ -36,6 +36,7 @@ public class KafkaConsumer {
FlinkKafkaConsumer<String> kafkaConsumer = new FlinkKafkaConsumer<>(topic,
new SimpleStringSchema(), createConsumerConfig(groupId));
+
//随着checkpoint提交,将offset提交到kafka
kafkaConsumer.setCommitOffsetsOnCheckpoints(true);