summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorwangchengcheng <[email protected]>2023-05-09 13:58:02 +0800
committerwangchengcheng <[email protected]>2023-05-09 13:58:02 +0800
commit2e795e3dfa3d4de6337bcc80beee863f98be43c5 (patch)
tree7da22963c9a43db9d2ef498db1eadf316aeacdad
parentb6aed3d341dffb0ed6b88bf9c1a07d48022c916e (diff)
1.Change Fields to Fields
2.Modify app_full_path logic
-rw-r--r--pom.xml2
-rw-r--r--properties/service_flow_config.properties4
-rw-r--r--src/main/java/com/zdjizhi/common/AppProtocolMetricsConfig.java62
-rw-r--r--src/main/java/com/zdjizhi/common/Fields.java (renamed from src/main/java/com/zdjizhi/common/Fileds.java)4
-rw-r--r--src/main/java/com/zdjizhi/common/InterimSessionRecord.java10
-rw-r--r--src/main/java/com/zdjizhi/common/Metrics.java10
-rw-r--r--src/main/java/com/zdjizhi/common/SessionRecord.java8
-rw-r--r--src/main/java/com/zdjizhi/tools/function/CountAppProtocolWindowProcess.java12
-rw-r--r--src/main/java/com/zdjizhi/tools/function/KeyByFunction.java6
-rw-r--r--src/main/java/com/zdjizhi/tools/function/ParseInterimSessionFunction.java22
-rw-r--r--src/main/java/com/zdjizhi/tools/function/ParseSessionFunction.java21
-rw-r--r--src/main/java/com/zdjizhi/tools/kafka/CertUtils.java14
-rw-r--r--src/main/java/com/zdjizhi/tools/kafka/KafkaConsumer.java12
-rw-r--r--src/main/java/com/zdjizhi/tools/kafka/KafkaProducer.java26
-rw-r--r--src/main/java/com/zdjizhi/tools/system/AppProtocolMetricsConfigurations.java6
-rw-r--r--src/main/java/com/zdjizhi/topology/LogAppProtocolMetricsTopology.java35
16 files changed, 122 insertions, 132 deletions
diff --git a/pom.xml b/pom.xml
index 3a48680..8e52ab9 100644
--- a/pom.xml
+++ b/pom.xml
@@ -39,7 +39,7 @@
<hbase.version>2.2.3</hbase.version>
<nacos.version>1.2.0</nacos.version>
<zdjz.tools.version>1.0.8</zdjz.tools.version>
- <scope.type>provided</scope.type>
+<!-- <scope.type>provided</scope.type>-->
<!-- <scope.type>compile</scope.type>-->
</properties>
diff --git a/properties/service_flow_config.properties b/properties/service_flow_config.properties
index 3208551..d7ac7b1 100644
--- a/properties/service_flow_config.properties
+++ b/properties/service_flow_config.properties
@@ -1,10 +1,10 @@
#--------------------------------地址配置------------------------------#
#管理kafka地址
-source.kafka.servers=192.168.44.12:9094
+source.kafka.servers=192.168.45.102:9094
#管理输出kafka地址
-sink.kafka.servers=192.168.44.12:9094
+sink.kafka.servers=192.168.45.102:9094
tools.library=D:\\workerspace\\dat\\
#--------------------------------Kafka消费配置(session)------------------------------#
diff --git a/src/main/java/com/zdjizhi/common/AppProtocolMetricsConfig.java b/src/main/java/com/zdjizhi/common/AppProtocolMetricsConfig.java
index 5889110..ba42597 100644
--- a/src/main/java/com/zdjizhi/common/AppProtocolMetricsConfig.java
+++ b/src/main/java/com/zdjizhi/common/AppProtocolMetricsConfig.java
@@ -1,13 +1,13 @@
package com.zdjizhi.common;
-import com.zdjizhi.tools.system.FlowWriteConfigurations;
+import com.zdjizhi.tools.system.AppProtocolMetricsConfigurations;
import org.jasypt.encryption.pbe.StandardPBEStringEncryptor;
/**
* @author Administrator
*/
-public class FlowWriteConfig {
+public class AppProtocolMetricsConfig {
private static StandardPBEStringEncryptor encryptor = new StandardPBEStringEncryptor();
@@ -17,24 +17,24 @@ public class FlowWriteConfig {
/**
* kafka common
*/
- public static final String KAFKA_SASL_JAAS_USER = encryptor.decrypt(FlowWriteConfigurations.getStringProperty(1, "kafka.user"));
- public static final String KAFKA_SASL_JAAS_PIN = encryptor.decrypt(FlowWriteConfigurations.getStringProperty(1, "kafka.pin"));
+ public static final String KAFKA_SASL_JAAS_USER = encryptor.decrypt(AppProtocolMetricsConfigurations.getStringProperty(1, "kafka.user"));
+ public static final String KAFKA_SASL_JAAS_PIN = encryptor.decrypt(AppProtocolMetricsConfigurations.getStringProperty(1, "kafka.pin"));
/**
* consumer session-record config
*/
- public static final String SESSION_SOURCE_KAFKA_TOPIC = FlowWriteConfigurations.getStringProperty(0, "session.source.kafka.topic");
- public static final String SESSION_GROUP_ID = FlowWriteConfigurations.getStringProperty(0, "session.group.id");
- public static final Integer SESSION_SOURCE_PARALLELISM = FlowWriteConfigurations.getIntProperty(0, "session.source.parallelism");
+ public static final String SESSION_SOURCE_KAFKA_TOPIC = AppProtocolMetricsConfigurations.getStringProperty(0, "session.source.kafka.topic");
+ public static final String SESSION_GROUP_ID = AppProtocolMetricsConfigurations.getStringProperty(0, "session.group.id");
+ public static final Integer SESSION_SOURCE_PARALLELISM = AppProtocolMetricsConfigurations.getIntProperty(0, "session.source.parallelism");
/**
* consumer interim-session-record config
*/
- public static final String INTERIM_SESSION_SOURCE_KAFKA_TOPIC = FlowWriteConfigurations.getStringProperty(0, "interim.session.source.kafka.topic");
- public static final String INTERIM_SESSION_GROUP_ID = FlowWriteConfigurations.getStringProperty(0, "interim.session.group.id");
- public static final Integer INTERIM_SESSION_SOURCE_PARALLELISM = FlowWriteConfigurations.getIntProperty(0, "interim.session.source.parallelism");
+ public static final String INTERIM_SESSION_SOURCE_KAFKA_TOPIC = AppProtocolMetricsConfigurations.getStringProperty(0, "interim.session.source.kafka.topic");
+ public static final String INTERIM_SESSION_GROUP_ID = AppProtocolMetricsConfigurations.getStringProperty(0, "interim.session.group.id");
+ public static final Integer INTERIM_SESSION_SOURCE_PARALLELISM = AppProtocolMetricsConfigurations.getIntProperty(0, "interim.session.source.parallelism");
@@ -45,56 +45,56 @@ public class FlowWriteConfig {
/**
* kafka source config
*/
- public static final String SESSION_TIMEOUT_MS = FlowWriteConfigurations.getStringProperty(1, "session.timeout.ms");
- public static final String MAX_POLL_RECORDS = FlowWriteConfigurations.getStringProperty(1, "max.poll.records");
- public static final String MAX_PARTITION_FETCH_BYTES = FlowWriteConfigurations.getStringProperty(1, "max.partition.fetch.bytes");
+ public static final String SESSION_TIMEOUT_MS = AppProtocolMetricsConfigurations.getStringProperty(1, "session.timeout.ms");
+ public static final String MAX_POLL_RECORDS = AppProtocolMetricsConfigurations.getStringProperty(1, "max.poll.records");
+ public static final String MAX_PARTITION_FETCH_BYTES = AppProtocolMetricsConfigurations.getStringProperty(1, "max.partition.fetch.bytes");
/**
* kafka sink config
*/
- public static final String PRODUCER_ACK = FlowWriteConfigurations.getStringProperty(1, "producer.ack");
- public static final String PRODUCER_KAFKA_COMPRESSION_TYPE = FlowWriteConfigurations.getStringProperty(0, "producer.kafka.compression.type");
- public static final String SINK_KAFKA_TOPIC = FlowWriteConfigurations.getStringProperty(0,"sink.kafka.topic");
- public static final Integer SINK_KAFKA_PARALLELISM = FlowWriteConfigurations.getIntProperty(0,"sink.kafka.parallelism");
+ public static final String PRODUCER_ACK = AppProtocolMetricsConfigurations.getStringProperty(1, "producer.ack");
+ public static final String PRODUCER_KAFKA_COMPRESSION_TYPE = AppProtocolMetricsConfigurations.getStringProperty(0, "producer.kafka.compression.type");
+ public static final String SINK_KAFKA_TOPIC = AppProtocolMetricsConfigurations.getStringProperty(0,"sink.kafka.topic");
+ public static final Integer SINK_KAFKA_PARALLELISM = AppProtocolMetricsConfigurations.getIntProperty(0,"sink.kafka.parallelism");
/**
* connection kafka
*/
- public static final String RETRIES = FlowWriteConfigurations.getStringProperty(1, "retries");
- public static final String LINGER_MS = FlowWriteConfigurations.getStringProperty(1, "linger.ms");
- public static final Integer REQUEST_TIMEOUT_MS = FlowWriteConfigurations.getIntProperty(1, "request.timeout.ms");
- public static final Integer BATCH_SIZE = FlowWriteConfigurations.getIntProperty(1, "batch.size");
- public static final Integer BUFFER_MEMORY = FlowWriteConfigurations.getIntProperty(1, "buffer.memory");
- public static final Integer MAX_REQUEST_SIZE = FlowWriteConfigurations.getIntProperty(1, "max.request.size");
+ public static final String RETRIES = AppProtocolMetricsConfigurations.getStringProperty(1, "retries");
+ public static final String LINGER_MS = AppProtocolMetricsConfigurations.getStringProperty(1, "linger.ms");
+ public static final Integer REQUEST_TIMEOUT_MS = AppProtocolMetricsConfigurations.getIntProperty(1, "request.timeout.ms");
+ public static final Integer BATCH_SIZE = AppProtocolMetricsConfigurations.getIntProperty(1, "batch.size");
+ public static final Integer BUFFER_MEMORY = AppProtocolMetricsConfigurations.getIntProperty(1, "buffer.memory");
+ public static final Integer MAX_REQUEST_SIZE = AppProtocolMetricsConfigurations.getIntProperty(1, "max.request.size");
/**
* common config
*/
- public static final String SOURCE_KAFKA_SERVERS = FlowWriteConfigurations.getStringProperty(0, "source.kafka.servers");
- public static final String SINK_KAFKA_SERVERS = FlowWriteConfigurations.getStringProperty(0, "sink.kafka.servers");
- public static final String TOOLS_LIBRARY = FlowWriteConfigurations.getStringProperty(0, "tools.library");
+ public static final String SOURCE_KAFKA_SERVERS = AppProtocolMetricsConfigurations.getStringProperty(0, "source.kafka.servers");
+ public static final String SINK_KAFKA_SERVERS = AppProtocolMetricsConfigurations.getStringProperty(0, "sink.kafka.servers");
+ public static final String TOOLS_LIBRARY = AppProtocolMetricsConfigurations.getStringProperty(0, "tools.library");
/**
* window config
*/
- public static final Integer COUNT_WINDOW_SECONDS = FlowWriteConfigurations.getIntProperty(0, "count.window.seconds");
- public static final Integer COUNT_WINDOW_PARALLELISM = FlowWriteConfigurations.getIntProperty(0, "count.window.parallelism");
+ public static final Integer COUNT_WINDOW_SECONDS = AppProtocolMetricsConfigurations.getIntProperty(0, "count.window.seconds");
+ public static final Integer COUNT_WINDOW_PARALLELISM = AppProtocolMetricsConfigurations.getIntProperty(0, "count.window.parallelism");
/**
* metrics config
*/
- public static final String METRICS_NAME = FlowWriteConfigurations.getStringProperty(0, "metrics.name");
+ public static final String METRICS_NAME = AppProtocolMetricsConfigurations.getStringProperty(0, "metrics.name");
- public static final Integer RANDOM_RANGE_NUM = FlowWriteConfigurations.getIntProperty(1, "random.range.num");
+ public static final Integer RANDOM_RANGE_NUM = AppProtocolMetricsConfigurations.getIntProperty(1, "random.range.num");
- public static final Integer BUFFER_TIMEOUT = FlowWriteConfigurations.getIntProperty(1, "buffer.timeout");
+ public static final Integer BUFFER_TIMEOUT = AppProtocolMetricsConfigurations.getIntProperty(1, "buffer.timeout");
} \ No newline at end of file
diff --git a/src/main/java/com/zdjizhi/common/Fileds.java b/src/main/java/com/zdjizhi/common/Fields.java
index ed290d5..e81a4d8 100644
--- a/src/main/java/com/zdjizhi/common/Fileds.java
+++ b/src/main/java/com/zdjizhi/common/Fields.java
@@ -2,7 +2,7 @@ package com.zdjizhi.common;
-public class Fileds {
+public class Fields {
private long sessions;
private long in_bytes;
private long out_bytes;
@@ -24,7 +24,7 @@ public class Fileds {
private long s2c_tcp_retransmitted_bytes;
private String client_ip_sketch;
- public Fileds(long sessions, long in_bytes, long out_bytes, long in_pkts, long out_pkts, long c2s_pkts, long s2c_pkts, long c2s_bytes, long s2c_bytes, long c2s_fragments, long s2c_fragments, long c2s_tcp_lost_bytes, long s2c_tcp_lost_bytes, long c2s_tcp_ooorder_pkts, long s2c_tcp_ooorder_pkts, long c2s_tcp_retransmitted_pkts, long s2c_tcp_retransmitted_pkts, long c2s_tcp_retransmitted_bytes, long s2c_tcp_retransmitted_bytes,String client_ip_sketch) {
+ public Fields(long sessions, long in_bytes, long out_bytes, long in_pkts, long out_pkts, long c2s_pkts, long s2c_pkts, long c2s_bytes, long s2c_bytes, long c2s_fragments, long s2c_fragments, long c2s_tcp_lost_bytes, long s2c_tcp_lost_bytes, long c2s_tcp_ooorder_pkts, long s2c_tcp_ooorder_pkts, long c2s_tcp_retransmitted_pkts, long s2c_tcp_retransmitted_pkts, long c2s_tcp_retransmitted_bytes, long s2c_tcp_retransmitted_bytes, String client_ip_sketch) {
this.sessions = sessions;
this.in_bytes = in_bytes;
this.out_bytes = out_bytes;
diff --git a/src/main/java/com/zdjizhi/common/InterimSessionRecord.java b/src/main/java/com/zdjizhi/common/InterimSessionRecord.java
index efbcea4..61faacf 100644
--- a/src/main/java/com/zdjizhi/common/InterimSessionRecord.java
+++ b/src/main/java/com/zdjizhi/common/InterimSessionRecord.java
@@ -1,9 +1,7 @@
package com.zdjizhi.common;
import com.alibaba.fastjson2.JSON;
-import com.jayway.jsonpath.JsonPath;
-import java.util.ArrayList;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ThreadLocalRandom;
@@ -159,14 +157,14 @@ public class InterimSessionRecord {
tagsMap.put("device_id", common_device_id);
tagsMap.put("common_device_tag", common_device_tag);
tagsMap.put("protocol_label", common_protocol_label);
- tagsMap.put("app_full_path", common_protocol_label + "." + common_l7_protocol);
- tagsMap.put("randomNum",ThreadLocalRandom.current().nextInt(FlowWriteConfig.RANDOM_RANGE_NUM));
+ tagsMap.put("app_full_path", common_app_full_path);
+ tagsMap.put("randomNum",ThreadLocalRandom.current().nextInt(AppProtocolMetricsConfig.RANDOM_RANGE_NUM));
return JSON.toJSONString(tagsMap);
}
- public Fileds getInterimSessionFileds() {
+ public Fields getInterimSessionFileds() {
long out_bytes;
long in_bytes;
long in_pkts;
@@ -183,7 +181,7 @@ public class InterimSessionRecord {
in_pkts = common_c2s_pkt_diff;
out_pkts = common_s2c_pkt_diff;
}
- return new Fileds(common_sessions, in_bytes, out_bytes, in_pkts, out_pkts, common_c2s_pkt_diff, common_s2c_pkt_diff, common_c2s_byte_diff, common_s2c_byte_diff, 0, 0, 0, 0, 0, 0,
+ return new Fields(common_sessions, in_bytes, out_bytes, in_pkts, out_pkts, common_c2s_pkt_diff, common_s2c_pkt_diff, common_c2s_byte_diff, common_s2c_byte_diff, 0, 0, 0, 0, 0, 0,
0, 0, 0, 0, "");
}
diff --git a/src/main/java/com/zdjizhi/common/Metrics.java b/src/main/java/com/zdjizhi/common/Metrics.java
index 17c9a61..1c4fca2 100644
--- a/src/main/java/com/zdjizhi/common/Metrics.java
+++ b/src/main/java/com/zdjizhi/common/Metrics.java
@@ -3,12 +3,12 @@ package com.zdjizhi.common;
import java.util.Map;
public class Metrics {
- private final static String name = FlowWriteConfig.METRICS_NAME;
+ private final static String name = AppProtocolMetricsConfig.METRICS_NAME;
private Map<String, Object> tags;
- private Fileds fields;
+ private Fields fields;
private long timestamp;
- public Metrics( Map<String, Object> tags, Fileds fields, long timestamp) {
+ public Metrics(Map<String, Object> tags, Fields fields, long timestamp) {
this.tags = tags;
this.fields = fields;
this.timestamp = timestamp;
@@ -26,11 +26,11 @@ public class Metrics {
this.tags = tags;
}
- public Fileds getFields() {
+ public Fields getFields() {
return fields;
}
- public void setFields(Fileds fields) {
+ public void setFields(Fields fields) {
this.fields = fields;
}
diff --git a/src/main/java/com/zdjizhi/common/SessionRecord.java b/src/main/java/com/zdjizhi/common/SessionRecord.java
index 6fa9a52..e3dc9d9 100644
--- a/src/main/java/com/zdjizhi/common/SessionRecord.java
+++ b/src/main/java/com/zdjizhi/common/SessionRecord.java
@@ -284,14 +284,14 @@ public class SessionRecord {
tagsMap.put("device_id", common_device_id);
tagsMap.put("common_device_tag", common_device_tag);
tagsMap.put("protocol_label", common_protocol_label);
- tagsMap.put("app_full_path", common_protocol_label + "." + common_l7_protocol);
- tagsMap.put("randomNum",ThreadLocalRandom.current().nextInt(FlowWriteConfig.RANDOM_RANGE_NUM));
+ tagsMap.put("app_full_path", common_app_full_path);
+ tagsMap.put("randomNum",ThreadLocalRandom.current().nextInt(AppProtocolMetricsConfig.RANDOM_RANGE_NUM));
return JSON.toJSONString(tagsMap);
}
- public Fileds getSessionFileds() {
+ public Fields getSessionFileds() {
long out_bytes;
long in_bytes;
long in_pkts;
@@ -308,7 +308,7 @@ public class SessionRecord {
in_pkts = common_c2s_pkt_diff;
out_pkts = common_s2c_pkt_diff;
}
- return new Fileds(common_sessions, in_bytes, out_bytes, in_pkts, out_pkts, common_c2s_pkt_diff, common_s2c_pkt_diff, common_c2s_byte_diff, common_s2c_byte_diff, common_c2s_ipfrag_num, common_s2c_ipfrag_num, common_c2s_tcp_lostlen, common_s2c_tcp_lostlen, common_c2s_tcp_unorder_num, common_s2c_tcp_unorder_num,
+ return new Fields(common_sessions, in_bytes, out_bytes, in_pkts, out_pkts, common_c2s_pkt_diff, common_s2c_pkt_diff, common_c2s_byte_diff, common_s2c_byte_diff, common_c2s_ipfrag_num, common_s2c_ipfrag_num, common_c2s_tcp_lostlen, common_s2c_tcp_lostlen, common_c2s_tcp_unorder_num, common_s2c_tcp_unorder_num,
common_c2s_pkt_retrans, common_s2c_pkt_retrans, common_c2s_byte_retrans, common_s2c_byte_retrans, "");
}
diff --git a/src/main/java/com/zdjizhi/tools/function/CountAppProtocolWindowProcess.java b/src/main/java/com/zdjizhi/tools/function/CountAppProtocolWindowProcess.java
index 92f0b57..5271b7e 100644
--- a/src/main/java/com/zdjizhi/tools/function/CountAppProtocolWindowProcess.java
+++ b/src/main/java/com/zdjizhi/tools/function/CountAppProtocolWindowProcess.java
@@ -5,7 +5,7 @@ import cn.hutool.log.LogFactory;
import com.alibaba.fastjson2.JSON;
import com.jayway.jsonpath.DocumentContext;
import com.jayway.jsonpath.JsonPath;
-import com.zdjizhi.common.Fileds;
+import com.zdjizhi.common.Fields;
import com.zdjizhi.common.Metrics;
import com.zdjizhi.utils.StringUtil;
@@ -21,7 +21,7 @@ import java.util.Base64;
import java.util.HashMap;
import java.util.Map;
-public class CountWindowProcess extends ProcessWindowFunction<Tuple3<String, Fileds, String>, String, String, TimeWindow> {
+public class CountAppProtocolWindowProcess extends ProcessWindowFunction<Tuple3<String, Fields, String>, String, String, TimeWindow> {
private static final Log logger = LogFactory.get();
private long sessions;
@@ -56,11 +56,11 @@ public class CountWindowProcess extends ProcessWindowFunction<Tuple3<String, Fil
private Map<String, Object> cacheMap = new HashMap<>(20);
@Override
- public void process(String key, Context context, Iterable<Tuple3<String, Fileds, String>> iterable, Collector<String> collector) throws Exception {
+ public void process(String key, Context context, Iterable<Tuple3<String, Fields, String>> iterable, Collector<String> collector) throws Exception {
if (StringUtil.isNotBlank(key)) {
try {
HllSketch hllSketch = new HllSketch(12);
- for (Tuple3<String, Fileds, String> record : iterable) {
+ for (Tuple3<String, Fields, String> record : iterable) {
sessions = sessions + record.f1.getSessions();
in_bytes = in_bytes + record.f1.getIn_bytes();
@@ -94,7 +94,7 @@ public class CountWindowProcess extends ProcessWindowFunction<Tuple3<String, Fil
}
client_ip_sketch = Base64.getEncoder().encodeToString(hllSketch.toUpdatableByteArray());
- Fileds fileds = new Fileds(sessions, in_bytes, out_bytes, in_pkts, out_pkts, c2s_pkts, s2c_pkts, c2s_bytes, s2c_bytes, c2s_fragments, s2c_fragments, c2s_tcp_lost_bytes, s2c_tcp_lost_bytes, c2s_tcp_ooorder_pkts, s2c_tcp_ooorder_pkts, c2s_tcp_retransmitted_pkts, s2c_tcp_retransmitted_pkts, c2s_tcp_retransmitted_bytes, s2c_tcp_retransmitted_bytes, client_ip_sketch);
+ Fields field = new Fields(sessions, in_bytes, out_bytes, in_pkts, out_pkts, c2s_pkts, s2c_pkts, c2s_bytes, s2c_bytes, c2s_fragments, s2c_fragments, c2s_tcp_lost_bytes, s2c_tcp_lost_bytes, c2s_tcp_ooorder_pkts, s2c_tcp_ooorder_pkts, c2s_tcp_retransmitted_pkts, s2c_tcp_retransmitted_pkts, c2s_tcp_retransmitted_bytes, s2c_tcp_retransmitted_bytes, client_ip_sketch);
@@ -118,7 +118,7 @@ public class CountWindowProcess extends ProcessWindowFunction<Tuple3<String, Fil
keyMap.put("data_center", data_center);
keyMap.remove("randomNum");
- collector.collect(JSON.toJSONString(new Metrics(keyMap, fileds, context.window().getEnd() / 1000)));
+ collector.collect(JSON.toJSONString(new Metrics(keyMap, field, context.window().getEnd() / 1000)));
sessions = 0;
in_bytes = 0;
out_bytes = 0;
diff --git a/src/main/java/com/zdjizhi/tools/function/KeyByFunction.java b/src/main/java/com/zdjizhi/tools/function/KeyByFunction.java
index 49e7a7c..2c58353 100644
--- a/src/main/java/com/zdjizhi/tools/function/KeyByFunction.java
+++ b/src/main/java/com/zdjizhi/tools/function/KeyByFunction.java
@@ -1,12 +1,12 @@
package com.zdjizhi.tools.function;
-import com.zdjizhi.common.Fileds;
+import com.zdjizhi.common.Fields;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple3;
-public class KeyByFunction implements KeySelector<Tuple3<String, Fileds, String>, String> {
+public class KeyByFunction implements KeySelector<Tuple3<String, Fields, String>, String> {
@Override
- public String getKey(Tuple3<String, Fileds, String> value) throws Exception {
+ public String getKey(Tuple3<String, Fields, String> value) throws Exception {
return value.f0;
}
diff --git a/src/main/java/com/zdjizhi/tools/function/ParseInterimSessionFunction.java b/src/main/java/com/zdjizhi/tools/function/ParseInterimSessionFunction.java
index 8d273b0..90ee567 100644
--- a/src/main/java/com/zdjizhi/tools/function/ParseInterimSessionFunction.java
+++ b/src/main/java/com/zdjizhi/tools/function/ParseInterimSessionFunction.java
@@ -3,30 +3,26 @@ package com.zdjizhi.tools.function;
import cn.hutool.log.Log;
import cn.hutool.log.LogFactory;
import com.alibaba.fastjson2.JSON;
-import com.zdjizhi.common.Fileds;
+import com.zdjizhi.common.Fields;
import com.zdjizhi.common.InterimSessionRecord;
-import com.zdjizhi.common.SessionRecord;
-import com.zdjizhi.common.Tags;
import com.zdjizhi.utils.StringUtil;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.tuple.Tuple3;
-import java.util.Map;
-
/**
* @author wangchengcheng
* @Package com.zdjizhi.utils.functions
* @Description:
* @date 2023/04/20
*/
-public class ParseInterimSessionFunction implements MapFunction<String, Tuple3<String, Fileds, String>> {
+public class ParseInterimSessionFunction implements MapFunction<String, Tuple3<String, Fields, String>> {
private static final Log logger = LogFactory.get();
private InterimSessionRecord interimsessionRecord;
private String tags="";
- private Fileds sessionFileds;
+ private Fields sessionField;
@Override
- public Tuple3<String, Fileds, String> map(String message) throws Exception {
+ public Tuple3<String, Fields, String> map(String message) throws Exception {
try {
if (StringUtil.isNotBlank(message)) {
@@ -34,17 +30,17 @@ public class ParseInterimSessionFunction implements MapFunction<String, Tuple3<S
//过滤common_protocol_label不为空的数据
if (StringUtil.isNotBlank(interimsessionRecord.getCommon_protocol_label())) {
tags = interimsessionRecord.getInterimSessionTags();
- sessionFileds = interimsessionRecord.getInterimSessionFileds();
- return new Tuple3<>(tags, sessionFileds, interimsessionRecord.getCommon_client_ip());
+ sessionField = interimsessionRecord.getInterimSessionFileds();
+ return new Tuple3<>(tags, sessionField, interimsessionRecord.getCommon_client_ip());
}else {
- return new Tuple3<>("", sessionFileds, "");
+ return new Tuple3<>("", sessionField, "");
}
}else {
- return new Tuple3<>("", sessionFileds, "");
+ return new Tuple3<>("", sessionField, "");
}
} catch (RuntimeException e) {
logger.error("An error occurred in the interim-session-record parsing,error message is:" + e + ",The original log is" + message);
- return new Tuple3<>("", sessionFileds, "");
+ return new Tuple3<>("", sessionField, "");
}
}
}
diff --git a/src/main/java/com/zdjizhi/tools/function/ParseSessionFunction.java b/src/main/java/com/zdjizhi/tools/function/ParseSessionFunction.java
index 01b0826..a8425b9 100644
--- a/src/main/java/com/zdjizhi/tools/function/ParseSessionFunction.java
+++ b/src/main/java/com/zdjizhi/tools/function/ParseSessionFunction.java
@@ -3,30 +3,27 @@ package com.zdjizhi.tools.function;
import cn.hutool.log.Log;
import cn.hutool.log.LogFactory;
import com.alibaba.fastjson2.JSON;
-import com.zdjizhi.common.Fileds;
+import com.zdjizhi.common.Fields;
import com.zdjizhi.common.SessionRecord;
-import com.zdjizhi.common.Tags;
import com.zdjizhi.utils.StringUtil;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.tuple.Tuple3;
-import java.util.Map;
-
/**
* @author wangchengcheng
* @Package com.zdjizhi.utils.functions
* @Description:
* @date 2023/04/20
*/
-public class ParseSessionFunction implements MapFunction<String, Tuple3<String, Fileds, String>> {
+public class ParseSessionFunction implements MapFunction<String, Tuple3<String, Fields, String>> {
private static final Log logger = LogFactory.get();
private SessionRecord sessionRecord;
private String tags;
- private Fileds sessionFileds;
+ private Fields sessionField;
@Override
- public Tuple3<String, Fileds, String> map(String message) {
+ public Tuple3<String, Fields, String> map(String message) {
try {
if (StringUtil.isNotBlank(message)) {
@@ -34,18 +31,18 @@ public class ParseSessionFunction implements MapFunction<String, Tuple3<String,
//过滤common_protocol_label不为空的数据
if (StringUtil.isNotBlank(sessionRecord.getCommon_protocol_label())) {
tags = sessionRecord.getSessionTags();
- sessionFileds = sessionRecord.getSessionFileds();
- return new Tuple3<>(tags, sessionFileds, sessionRecord.getCommon_client_ip());
+ sessionField = sessionRecord.getSessionFileds();
+ return new Tuple3<>(tags, sessionField, sessionRecord.getCommon_client_ip());
}else {
- return new Tuple3<>("", sessionFileds, "");
+ return new Tuple3<>("", sessionField, "");
}
}else {
- return new Tuple3<>("", sessionFileds, "");
+ return new Tuple3<>("", sessionField, "");
}
} catch (Exception e) {
logger.error("An error occurred in the session-record parsing,error message is:" + e + ",The original log is" + message);
- return new Tuple3<>("", sessionFileds, "");
+ return new Tuple3<>("", sessionField, "");
}
}
}
diff --git a/src/main/java/com/zdjizhi/tools/kafka/CertUtils.java b/src/main/java/com/zdjizhi/tools/kafka/CertUtils.java
index ad93f29..20c99db 100644
--- a/src/main/java/com/zdjizhi/tools/kafka/CertUtils.java
+++ b/src/main/java/com/zdjizhi/tools/kafka/CertUtils.java
@@ -1,6 +1,6 @@
package com.zdjizhi.tools.kafka;
-import com.zdjizhi.common.FlowWriteConfig;
+import com.zdjizhi.common.AppProtocolMetricsConfig;
import org.apache.kafka.common.config.SslConfigs;
import java.util.Properties;
@@ -33,15 +33,15 @@ class CertUtils {
properties.put("security.protocol", "SASL_PLAINTEXT");
properties.put("sasl.mechanism", "PLAIN");
properties.put("sasl.jaas.config", "org.apache.kafka.common.security.plain.PlainLoginModule required username="
- + FlowWriteConfig.KAFKA_SASL_JAAS_USER + " password=" + FlowWriteConfig.KAFKA_SASL_JAAS_PIN + ";");
+ + AppProtocolMetricsConfig.KAFKA_SASL_JAAS_USER + " password=" + AppProtocolMetricsConfig.KAFKA_SASL_JAAS_PIN + ";");
} else if (servers.contains(SSL_PORT)) {
properties.put("security.protocol", "SSL");
properties.put(SslConfigs.SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_CONFIG, "");
- properties.put("ssl.keystore.location", FlowWriteConfig.TOOLS_LIBRARY + "keystore.jks");
- properties.put("ssl.keystore.password", FlowWriteConfig.KAFKA_SASL_JAAS_PIN);
- properties.put("ssl.truststore.location", FlowWriteConfig.TOOLS_LIBRARY + "truststore.jks");
- properties.put("ssl.truststore.password", FlowWriteConfig.KAFKA_SASL_JAAS_PIN);
- properties.put("ssl.key.password", FlowWriteConfig.KAFKA_SASL_JAAS_PIN);
+ properties.put("ssl.keystore.location", AppProtocolMetricsConfig.TOOLS_LIBRARY + "keystore.jks");
+ properties.put("ssl.keystore.password", AppProtocolMetricsConfig.KAFKA_SASL_JAAS_PIN);
+ properties.put("ssl.truststore.location", AppProtocolMetricsConfig.TOOLS_LIBRARY + "truststore.jks");
+ properties.put("ssl.truststore.password", AppProtocolMetricsConfig.KAFKA_SASL_JAAS_PIN);
+ properties.put("ssl.key.password", AppProtocolMetricsConfig.KAFKA_SASL_JAAS_PIN);
}
}
diff --git a/src/main/java/com/zdjizhi/tools/kafka/KafkaConsumer.java b/src/main/java/com/zdjizhi/tools/kafka/KafkaConsumer.java
index d0fa01f..a0ebab6 100644
--- a/src/main/java/com/zdjizhi/tools/kafka/KafkaConsumer.java
+++ b/src/main/java/com/zdjizhi/tools/kafka/KafkaConsumer.java
@@ -1,6 +1,6 @@
package com.zdjizhi.tools.kafka;
-import com.zdjizhi.common.FlowWriteConfig;
+import com.zdjizhi.common.AppProtocolMetricsConfig;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
@@ -20,13 +20,13 @@ public class KafkaConsumer {
//消费session-record配置
private static Properties createConsumerConfig(String groupId) {
Properties properties = new Properties();
- properties.put("bootstrap.servers", FlowWriteConfig.SOURCE_KAFKA_SERVERS);
+ properties.put("bootstrap.servers", AppProtocolMetricsConfig.SOURCE_KAFKA_SERVERS);
properties.put("group.id", groupId);
- properties.put("session.timeout.ms", FlowWriteConfig.SESSION_TIMEOUT_MS);
- properties.put("max.poll.records", FlowWriteConfig.MAX_POLL_RECORDS);
- properties.put("max.partition.fetch.bytes", FlowWriteConfig.MAX_PARTITION_FETCH_BYTES);
+ properties.put("session.timeout.ms", AppProtocolMetricsConfig.SESSION_TIMEOUT_MS);
+ properties.put("max.poll.records", AppProtocolMetricsConfig.MAX_POLL_RECORDS);
+ properties.put("max.partition.fetch.bytes", AppProtocolMetricsConfig.MAX_PARTITION_FETCH_BYTES);
properties.put("partition.discovery.interval.ms", "10000");
- CertUtils.chooseCert(FlowWriteConfig.SOURCE_KAFKA_SERVERS, properties);
+ CertUtils.chooseCert(AppProtocolMetricsConfig.SOURCE_KAFKA_SERVERS, properties);
return properties;
}
diff --git a/src/main/java/com/zdjizhi/tools/kafka/KafkaProducer.java b/src/main/java/com/zdjizhi/tools/kafka/KafkaProducer.java
index ee31bc2..f67736b 100644
--- a/src/main/java/com/zdjizhi/tools/kafka/KafkaProducer.java
+++ b/src/main/java/com/zdjizhi/tools/kafka/KafkaProducer.java
@@ -1,6 +1,6 @@
package com.zdjizhi.tools.kafka;
-import com.zdjizhi.common.FlowWriteConfig;
+import com.zdjizhi.common.AppProtocolMetricsConfig;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;
@@ -17,17 +17,17 @@ public class KafkaProducer {
private static Properties createProducerConfig() {
Properties properties = new Properties();
- properties.put("bootstrap.servers", FlowWriteConfig.SINK_KAFKA_SERVERS);
- properties.put("acks", FlowWriteConfig.PRODUCER_ACK);
- properties.put("retries", FlowWriteConfig.RETRIES);
- properties.put("linger.ms", FlowWriteConfig.LINGER_MS);
- properties.put("request.timeout.ms", FlowWriteConfig.REQUEST_TIMEOUT_MS);
- properties.put("batch.size", FlowWriteConfig.BATCH_SIZE);
- properties.put("buffer.memory", FlowWriteConfig.BUFFER_MEMORY);
- properties.put("max.request.size", FlowWriteConfig.MAX_REQUEST_SIZE);
- properties.put("compression.type", FlowWriteConfig.PRODUCER_KAFKA_COMPRESSION_TYPE);
-
- CertUtils.chooseCert(FlowWriteConfig.SINK_KAFKA_SERVERS, properties);
+ properties.put("bootstrap.servers", AppProtocolMetricsConfig.SINK_KAFKA_SERVERS);
+ properties.put("acks", AppProtocolMetricsConfig.PRODUCER_ACK);
+ properties.put("retries", AppProtocolMetricsConfig.RETRIES);
+ properties.put("linger.ms", AppProtocolMetricsConfig.LINGER_MS);
+ properties.put("request.timeout.ms", AppProtocolMetricsConfig.REQUEST_TIMEOUT_MS);
+ properties.put("batch.size", AppProtocolMetricsConfig.BATCH_SIZE);
+ properties.put("buffer.memory", AppProtocolMetricsConfig.BUFFER_MEMORY);
+ properties.put("max.request.size", AppProtocolMetricsConfig.MAX_REQUEST_SIZE);
+ properties.put("compression.type", AppProtocolMetricsConfig.PRODUCER_KAFKA_COMPRESSION_TYPE);
+
+ CertUtils.chooseCert(AppProtocolMetricsConfig.SINK_KAFKA_SERVERS, properties);
return properties;
}
@@ -35,7 +35,7 @@ public class KafkaProducer {
public static FlinkKafkaProducer<String> getKafkaProducer() {
FlinkKafkaProducer<String> kafkaProducer = new FlinkKafkaProducer<String>(
- FlowWriteConfig.SINK_KAFKA_TOPIC,
+ AppProtocolMetricsConfig.SINK_KAFKA_TOPIC,
new SimpleStringSchema(),
createProducerConfig(),
//sink与所有分区建立连接,轮询写入;
diff --git a/src/main/java/com/zdjizhi/tools/system/AppProtocolMetricsConfigurations.java b/src/main/java/com/zdjizhi/tools/system/AppProtocolMetricsConfigurations.java
index d429def..e6dcf91 100644
--- a/src/main/java/com/zdjizhi/tools/system/AppProtocolMetricsConfigurations.java
+++ b/src/main/java/com/zdjizhi/tools/system/AppProtocolMetricsConfigurations.java
@@ -11,7 +11,7 @@ import java.util.Properties;
* @author Administrator
*/
-public final class FlowWriteConfigurations {
+public final class AppProtocolMetricsConfigurations {
private static Properties propDefault = new Properties();
private static Properties propService = new Properties();
@@ -59,8 +59,8 @@ public final class FlowWriteConfigurations {
static {
try {
- propService.load(FlowWriteConfigurations.class.getClassLoader().getResourceAsStream("service_flow_config.properties"));
- propDefault.load(FlowWriteConfigurations.class.getClassLoader().getResourceAsStream("default_config.properties"));
+ propService.load(AppProtocolMetricsConfigurations.class.getClassLoader().getResourceAsStream("service_flow_config.properties"));
+ propDefault.load(AppProtocolMetricsConfigurations.class.getClassLoader().getResourceAsStream("default_config.properties"));
} catch (IOException | RuntimeException e) {
propDefault = null;
propService = null;
diff --git a/src/main/java/com/zdjizhi/topology/LogAppProtocolMetricsTopology.java b/src/main/java/com/zdjizhi/topology/LogAppProtocolMetricsTopology.java
index 170755f..22ceb22 100644
--- a/src/main/java/com/zdjizhi/topology/LogAppProtocolMetricsTopology.java
+++ b/src/main/java/com/zdjizhi/topology/LogAppProtocolMetricsTopology.java
@@ -2,16 +2,15 @@ package com.zdjizhi.topology;
import cn.hutool.log.Log;
import cn.hutool.log.LogFactory;
-import com.zdjizhi.common.Fileds;
-import com.zdjizhi.common.FlowWriteConfig;
-import com.zdjizhi.tools.function.CountWindowProcess;
+import com.zdjizhi.common.Fields;
+import com.zdjizhi.common.AppProtocolMetricsConfig;
+import com.zdjizhi.tools.function.CountAppProtocolWindowProcess;
import com.zdjizhi.tools.function.KeyByFunction;
import com.zdjizhi.tools.function.ParseInterimSessionFunction;
import com.zdjizhi.tools.function.ParseSessionFunction;
import com.zdjizhi.tools.kafka.KafkaConsumer;
import com.zdjizhi.tools.kafka.KafkaProducer;
-import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.datastream.WindowedStream;
@@ -29,38 +28,38 @@ import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
* @Description:
* @date 2023/04/19
*/
-public class LogFlowWriteTopology {
+public class LogAppProtocolMetricsTopology {
private static final Log logger = LogFactory.get();
public static void main(String[] args) {
final StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment();
- environment.setBufferTimeout(FlowWriteConfig.BUFFER_TIMEOUT);
+ environment.setBufferTimeout(AppProtocolMetricsConfig.BUFFER_TIMEOUT);
//消费、清洗会话日志
- SingleOutputStreamOperator<String> sessionSource = environment.addSource(KafkaConsumer.getKafkaConsumer(FlowWriteConfig.SESSION_SOURCE_KAFKA_TOPIC,FlowWriteConfig.SESSION_GROUP_ID))
- .setParallelism(FlowWriteConfig.SESSION_SOURCE_PARALLELISM).name(FlowWriteConfig.SESSION_SOURCE_KAFKA_TOPIC);
+ SingleOutputStreamOperator<String> sessionSource = environment.addSource(KafkaConsumer.getKafkaConsumer(AppProtocolMetricsConfig.SESSION_SOURCE_KAFKA_TOPIC, AppProtocolMetricsConfig.SESSION_GROUP_ID))
+ .setParallelism(AppProtocolMetricsConfig.SESSION_SOURCE_PARALLELISM).name(AppProtocolMetricsConfig.SESSION_SOURCE_KAFKA_TOPIC);
- SingleOutputStreamOperator<Tuple3<String, Fileds, String>> sessionParseMap = sessionSource.map(new ParseSessionFunction())
- .setParallelism(FlowWriteConfig.SESSION_SOURCE_PARALLELISM).name("sessionParseMap");
+ SingleOutputStreamOperator<Tuple3<String, Fields, String>> sessionParseMap = sessionSource.map(new ParseSessionFunction())
+ .setParallelism(AppProtocolMetricsConfig.SESSION_SOURCE_PARALLELISM).name("sessionParseMap");
//消费、清洗过渡会话
- SingleOutputStreamOperator<String> interimSessionSource = environment.addSource(KafkaConsumer.getKafkaConsumer(FlowWriteConfig.INTERIM_SESSION_SOURCE_KAFKA_TOPIC,FlowWriteConfig.INTERIM_SESSION_GROUP_ID))
- .setParallelism(FlowWriteConfig.INTERIM_SESSION_SOURCE_PARALLELISM).name(FlowWriteConfig.INTERIM_SESSION_SOURCE_KAFKA_TOPIC);
+ SingleOutputStreamOperator<String> interimSessionSource = environment.addSource(KafkaConsumer.getKafkaConsumer(AppProtocolMetricsConfig.INTERIM_SESSION_SOURCE_KAFKA_TOPIC, AppProtocolMetricsConfig.INTERIM_SESSION_GROUP_ID))
+ .setParallelism(AppProtocolMetricsConfig.INTERIM_SESSION_SOURCE_PARALLELISM).name(AppProtocolMetricsConfig.INTERIM_SESSION_SOURCE_KAFKA_TOPIC);
- SingleOutputStreamOperator<Tuple3<String, Fileds, String>> interimSessionParseMap = interimSessionSource.map(new ParseInterimSessionFunction())
- .setParallelism(FlowWriteConfig.INTERIM_SESSION_SOURCE_PARALLELISM).name("interimSessionParseMap");
+ SingleOutputStreamOperator<Tuple3<String, Fields, String>> interimSessionParseMap = interimSessionSource.map(new ParseInterimSessionFunction())
+ .setParallelism(AppProtocolMetricsConfig.INTERIM_SESSION_SOURCE_PARALLELISM).name("interimSessionParseMap");
//将会话、过渡会话汇聚并按照key分组后进入滑动窗口
- WindowedStream<Tuple3<String, Fileds, String>, String, TimeWindow> window = sessionParseMap.union(interimSessionParseMap).keyBy(new KeyByFunction())
- .window(TumblingProcessingTimeWindows.of(Time.seconds(FlowWriteConfig.COUNT_WINDOW_SECONDS)));
+ WindowedStream<Tuple3<String, Fields, String>, String, TimeWindow> window = sessionParseMap.union(interimSessionParseMap).keyBy(new KeyByFunction())
+ .window(TumblingProcessingTimeWindows.of(Time.seconds(AppProtocolMetricsConfig.COUNT_WINDOW_SECONDS)));
- SingleOutputStreamOperator<String> countWindow = window.process(new CountWindowProcess()).setParallelism(FlowWriteConfig.COUNT_WINDOW_PARALLELISM).name("countWindow");
+ SingleOutputStreamOperator<String> countWindow = window.process(new CountAppProtocolWindowProcess()).setParallelism(AppProtocolMetricsConfig.COUNT_WINDOW_PARALLELISM).name("countWindow");
countWindow.addSink(KafkaProducer.getKafkaProducer()).name("LogSinkKafka")
- .setParallelism(FlowWriteConfig.SINK_KAFKA_PARALLELISM).name(FlowWriteConfig.SINK_KAFKA_TOPIC);
+ .setParallelism(AppProtocolMetricsConfig.SINK_KAFKA_PARALLELISM).name(AppProtocolMetricsConfig.SINK_KAFKA_TOPIC);
try {
environment.execute(args[0]);