| author | wangchengcheng <[email protected]> | 2023-05-09 13:58:02 +0800 |
|---|---|---|
| committer | wangchengcheng <[email protected]> | 2023-05-09 13:58:02 +0800 |
| commit | 2e795e3dfa3d4de6337bcc80beee863f98be43c5 (patch) | |
| tree | 7da22963c9a43db9d2ef498db1eadf316aeacdad | |
| parent | b6aed3d341dffb0ed6b88bf9c1a07d48022c916e (diff) | |
1. Change Fileds to Fields
2. Modify app_full_path logic
16 files changed, 122 insertions, 132 deletions
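For context on item 2, the diff below stops deriving the `app_full_path` tag by concatenating the protocol label with the L7 protocol and instead takes the record's own `common_app_full_path` field. A minimal sketch of the before/after tag-building logic, using field names from the diff; the record class shown here is a hypothetical stand-in, and the sample values are assumptions:

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in for SessionRecord/InterimSessionRecord, reduced to the
// three fields the app_full_path change touches (names taken from the diff).
class AppPathSketch {
    String common_protocol_label = "BASE.TCP";              // assumed sample value
    String common_l7_protocol = "http";                     // assumed sample value
    String common_app_full_path = "BASE.TCP.http.example";  // assumed sample value

    Map<String, Object> buildTags() {
        Map<String, Object> tagsMap = new HashMap<>();
        // Before: the tag was derived by concatenation.
        // tagsMap.put("app_full_path", common_protocol_label + "." + common_l7_protocol);
        // After: the tag is taken directly from the record's own field.
        tagsMap.put("app_full_path", common_app_full_path);
        return tagsMap;
    }
}
```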
```diff
@@ -39,7 +39,7 @@
     <hbase.version>2.2.3</hbase.version>
     <nacos.version>1.2.0</nacos.version>
     <zdjz.tools.version>1.0.8</zdjz.tools.version>
-    <scope.type>provided</scope.type>
+<!--    <scope.type>provided</scope.type>-->
 <!--    <scope.type>compile</scope.type>-->
 </properties>
diff --git a/properties/service_flow_config.properties b/properties/service_flow_config.properties
index 3208551..d7ac7b1 100644
--- a/properties/service_flow_config.properties
+++ b/properties/service_flow_config.properties
@@ -1,10 +1,10 @@
 #-------------------------------- Address configuration ------------------------------#
 # Management Kafka address
-source.kafka.servers=192.168.44.12:9094
+source.kafka.servers=192.168.45.102:9094
 # Management sink Kafka address
-sink.kafka.servers=192.168.44.12:9094
+sink.kafka.servers=192.168.45.102:9094
 
 tools.library=D:\\workerspace\\dat\\
 
 #-------------------------------- Kafka consumer configuration (session) ------------------------------#
diff --git a/src/main/java/com/zdjizhi/common/AppProtocolMetricsConfig.java b/src/main/java/com/zdjizhi/common/AppProtocolMetricsConfig.java
index 5889110..ba42597 100644
--- a/src/main/java/com/zdjizhi/common/AppProtocolMetricsConfig.java
+++ b/src/main/java/com/zdjizhi/common/AppProtocolMetricsConfig.java
@@ -1,13 +1,13 @@
 package com.zdjizhi.common;
 
-import com.zdjizhi.tools.system.FlowWriteConfigurations;
+import com.zdjizhi.tools.system.AppProtocolMetricsConfigurations;
 
 import org.jasypt.encryption.pbe.StandardPBEStringEncryptor;
 
 /**
  * @author Administrator
  */
-public class FlowWriteConfig {
+public class AppProtocolMetricsConfig {
 
     private static StandardPBEStringEncryptor encryptor = new StandardPBEStringEncryptor();
 
@@ -17,24 +17,24 @@ public class FlowWriteConfig {
     /**
      * kafka common
      */
-    public static final String KAFKA_SASL_JAAS_USER = encryptor.decrypt(FlowWriteConfigurations.getStringProperty(1, "kafka.user"));
-    public static final String KAFKA_SASL_JAAS_PIN = encryptor.decrypt(FlowWriteConfigurations.getStringProperty(1, "kafka.pin"));
+    public static final String KAFKA_SASL_JAAS_USER = encryptor.decrypt(AppProtocolMetricsConfigurations.getStringProperty(1, "kafka.user"));
+    public static final String KAFKA_SASL_JAAS_PIN = encryptor.decrypt(AppProtocolMetricsConfigurations.getStringProperty(1, "kafka.pin"));
 
     /**
      * consumer session-record config
      */
-    public static final String SESSION_SOURCE_KAFKA_TOPIC = FlowWriteConfigurations.getStringProperty(0, "session.source.kafka.topic");
-    public static final String SESSION_GROUP_ID = FlowWriteConfigurations.getStringProperty(0, "session.group.id");
-    public static final Integer SESSION_SOURCE_PARALLELISM = FlowWriteConfigurations.getIntProperty(0, "session.source.parallelism");
+    public static final String SESSION_SOURCE_KAFKA_TOPIC = AppProtocolMetricsConfigurations.getStringProperty(0, "session.source.kafka.topic");
+    public static final String SESSION_GROUP_ID = AppProtocolMetricsConfigurations.getStringProperty(0, "session.group.id");
+    public static final Integer SESSION_SOURCE_PARALLELISM = AppProtocolMetricsConfigurations.getIntProperty(0, "session.source.parallelism");
 
     /**
      * consumer interim-session-record config
      */
-    public static final String INTERIM_SESSION_SOURCE_KAFKA_TOPIC = FlowWriteConfigurations.getStringProperty(0, "interim.session.source.kafka.topic");
-    public static final String INTERIM_SESSION_GROUP_ID = FlowWriteConfigurations.getStringProperty(0, "interim.session.group.id");
-    public static final Integer INTERIM_SESSION_SOURCE_PARALLELISM = FlowWriteConfigurations.getIntProperty(0, "interim.session.source.parallelism");
+    public static final String INTERIM_SESSION_SOURCE_KAFKA_TOPIC = AppProtocolMetricsConfigurations.getStringProperty(0, "interim.session.source.kafka.topic");
+    public static final String INTERIM_SESSION_GROUP_ID = AppProtocolMetricsConfigurations.getStringProperty(0, "interim.session.group.id");
+    public static final Integer INTERIM_SESSION_SOURCE_PARALLELISM = AppProtocolMetricsConfigurations.getIntProperty(0, "interim.session.source.parallelism");
 
@@ -45,56 +45,56 @@ public class FlowWriteConfig {
     /**
      * kafka source config
      */
-    public static final String SESSION_TIMEOUT_MS = FlowWriteConfigurations.getStringProperty(1, "session.timeout.ms");
-    public static final String MAX_POLL_RECORDS = FlowWriteConfigurations.getStringProperty(1, "max.poll.records");
-    public static final String MAX_PARTITION_FETCH_BYTES = FlowWriteConfigurations.getStringProperty(1, "max.partition.fetch.bytes");
+    public static final String SESSION_TIMEOUT_MS = AppProtocolMetricsConfigurations.getStringProperty(1, "session.timeout.ms");
+    public static final String MAX_POLL_RECORDS = AppProtocolMetricsConfigurations.getStringProperty(1, "max.poll.records");
+    public static final String MAX_PARTITION_FETCH_BYTES = AppProtocolMetricsConfigurations.getStringProperty(1, "max.partition.fetch.bytes");
 
     /**
      * kafka sink config
      */
-    public static final String PRODUCER_ACK = FlowWriteConfigurations.getStringProperty(1, "producer.ack");
-    public static final String PRODUCER_KAFKA_COMPRESSION_TYPE = FlowWriteConfigurations.getStringProperty(0, "producer.kafka.compression.type");
-    public static final String SINK_KAFKA_TOPIC = FlowWriteConfigurations.getStringProperty(0,"sink.kafka.topic");
-    public static final Integer SINK_KAFKA_PARALLELISM = FlowWriteConfigurations.getIntProperty(0,"sink.kafka.parallelism");
+    public static final String PRODUCER_ACK = AppProtocolMetricsConfigurations.getStringProperty(1, "producer.ack");
+    public static final String PRODUCER_KAFKA_COMPRESSION_TYPE = AppProtocolMetricsConfigurations.getStringProperty(0, "producer.kafka.compression.type");
+    public static final String SINK_KAFKA_TOPIC = AppProtocolMetricsConfigurations.getStringProperty(0,"sink.kafka.topic");
+    public static final Integer SINK_KAFKA_PARALLELISM = AppProtocolMetricsConfigurations.getIntProperty(0,"sink.kafka.parallelism");
 
     /**
      * connection kafka
      */
-    public static final String RETRIES = FlowWriteConfigurations.getStringProperty(1, "retries");
-    public static final String LINGER_MS = FlowWriteConfigurations.getStringProperty(1, "linger.ms");
-    public static final Integer REQUEST_TIMEOUT_MS = FlowWriteConfigurations.getIntProperty(1, "request.timeout.ms");
-    public static final Integer BATCH_SIZE = FlowWriteConfigurations.getIntProperty(1, "batch.size");
-    public static final Integer BUFFER_MEMORY = FlowWriteConfigurations.getIntProperty(1, "buffer.memory");
-    public static final Integer MAX_REQUEST_SIZE = FlowWriteConfigurations.getIntProperty(1, "max.request.size");
+    public static final String RETRIES = AppProtocolMetricsConfigurations.getStringProperty(1, "retries");
+    public static final String LINGER_MS = AppProtocolMetricsConfigurations.getStringProperty(1, "linger.ms");
+    public static final Integer REQUEST_TIMEOUT_MS = AppProtocolMetricsConfigurations.getIntProperty(1, "request.timeout.ms");
+    public static final Integer BATCH_SIZE = AppProtocolMetricsConfigurations.getIntProperty(1, "batch.size");
+    public static final Integer BUFFER_MEMORY = AppProtocolMetricsConfigurations.getIntProperty(1, "buffer.memory");
+    public static final Integer MAX_REQUEST_SIZE = AppProtocolMetricsConfigurations.getIntProperty(1, "max.request.size");
 
     /**
      * common config
      */
-    public static final String SOURCE_KAFKA_SERVERS = FlowWriteConfigurations.getStringProperty(0, "source.kafka.servers");
-    public static final String SINK_KAFKA_SERVERS = FlowWriteConfigurations.getStringProperty(0, "sink.kafka.servers");
-    public static final String TOOLS_LIBRARY = FlowWriteConfigurations.getStringProperty(0, "tools.library");
+    public static final String SOURCE_KAFKA_SERVERS = AppProtocolMetricsConfigurations.getStringProperty(0, "source.kafka.servers");
+    public static final String SINK_KAFKA_SERVERS = AppProtocolMetricsConfigurations.getStringProperty(0, "sink.kafka.servers");
+    public static final String TOOLS_LIBRARY = AppProtocolMetricsConfigurations.getStringProperty(0, "tools.library");
 
     /**
      * window config
      */
-    public static final Integer COUNT_WINDOW_SECONDS = FlowWriteConfigurations.getIntProperty(0, "count.window.seconds");
-    public static final Integer COUNT_WINDOW_PARALLELISM = FlowWriteConfigurations.getIntProperty(0, "count.window.parallelism");
+    public static final Integer COUNT_WINDOW_SECONDS = AppProtocolMetricsConfigurations.getIntProperty(0, "count.window.seconds");
+    public static final Integer COUNT_WINDOW_PARALLELISM = AppProtocolMetricsConfigurations.getIntProperty(0, "count.window.parallelism");
 
     /**
      * metrics config
      */
-    public static final String METRICS_NAME = FlowWriteConfigurations.getStringProperty(0, "metrics.name");
+    public static final String METRICS_NAME = AppProtocolMetricsConfigurations.getStringProperty(0, "metrics.name");
 
-    public static final Integer RANDOM_RANGE_NUM = FlowWriteConfigurations.getIntProperty(1, "random.range.num");
+    public static final Integer RANDOM_RANGE_NUM = AppProtocolMetricsConfigurations.getIntProperty(1, "random.range.num");
 
-    public static final Integer BUFFER_TIMEOUT = FlowWriteConfigurations.getIntProperty(1, "buffer.timeout");
+    public static final Integer BUFFER_TIMEOUT = AppProtocolMetricsConfigurations.getIntProperty(1, "buffer.timeout");
 }
\ No newline at end of file
diff --git a/src/main/java/com/zdjizhi/common/Fileds.java b/src/main/java/com/zdjizhi/common/Fields.java
index ed290d5..e81a4d8 100644
--- a/src/main/java/com/zdjizhi/common/Fileds.java
+++ b/src/main/java/com/zdjizhi/common/Fields.java
@@ -2,7 +2,7 @@
 package com.zdjizhi.common;
 
 
-public class Fileds {
+public class Fields {
     private long sessions;
     private long in_bytes;
     private long out_bytes;
@@ -24,7 +24,7 @@ public class Fileds {
     private long s2c_tcp_retransmitted_bytes;
     private String client_ip_sketch;
 
-    public Fileds(long sessions, long in_bytes, long out_bytes, long in_pkts, long out_pkts, long c2s_pkts, long s2c_pkts, long c2s_bytes, long s2c_bytes, long c2s_fragments, long s2c_fragments, long c2s_tcp_lost_bytes, long s2c_tcp_lost_bytes, long c2s_tcp_ooorder_pkts, long s2c_tcp_ooorder_pkts, long c2s_tcp_retransmitted_pkts, long s2c_tcp_retransmitted_pkts, long c2s_tcp_retransmitted_bytes, long s2c_tcp_retransmitted_bytes,String client_ip_sketch) {
+    public Fields(long sessions, long in_bytes, long out_bytes, long in_pkts, long out_pkts, long c2s_pkts, long s2c_pkts, long c2s_bytes, long s2c_bytes, long c2s_fragments, long s2c_fragments, long c2s_tcp_lost_bytes, long s2c_tcp_lost_bytes, long c2s_tcp_ooorder_pkts, long s2c_tcp_ooorder_pkts, long c2s_tcp_retransmitted_pkts, long s2c_tcp_retransmitted_pkts, long c2s_tcp_retransmitted_bytes, long s2c_tcp_retransmitted_bytes, String client_ip_sketch) {
         this.sessions = sessions;
         this.in_bytes = in_bytes;
         this.out_bytes = out_bytes;
diff --git a/src/main/java/com/zdjizhi/common/InterimSessionRecord.java b/src/main/java/com/zdjizhi/common/InterimSessionRecord.java
index efbcea4..61faacf 100644
--- a/src/main/java/com/zdjizhi/common/InterimSessionRecord.java
+++ b/src/main/java/com/zdjizhi/common/InterimSessionRecord.java
@@ -1,9 +1,7 @@
 package com.zdjizhi.common;
 
 import com.alibaba.fastjson2.JSON;
-import com.jayway.jsonpath.JsonPath;
 
-import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.Map;
 import java.util.concurrent.ThreadLocalRandom;
@@ -159,14 +157,14 @@ public class InterimSessionRecord {
         tagsMap.put("device_id", common_device_id);
         tagsMap.put("common_device_tag", common_device_tag);
         tagsMap.put("protocol_label", common_protocol_label);
-        tagsMap.put("app_full_path", common_protocol_label + "." + common_l7_protocol);
-        tagsMap.put("randomNum",ThreadLocalRandom.current().nextInt(FlowWriteConfig.RANDOM_RANGE_NUM));
+        tagsMap.put("app_full_path", common_app_full_path);
+        tagsMap.put("randomNum",ThreadLocalRandom.current().nextInt(AppProtocolMetricsConfig.RANDOM_RANGE_NUM));
 
         return JSON.toJSONString(tagsMap);
     }
 
-    public Fileds getInterimSessionFileds() {
+    public Fields getInterimSessionFileds() {
         long out_bytes;
         long in_bytes;
         long in_pkts;
@@ -183,7 +181,7 @@ public class InterimSessionRecord {
             in_pkts = common_c2s_pkt_diff;
             out_pkts = common_s2c_pkt_diff;
         }
-        return new Fileds(common_sessions, in_bytes, out_bytes, in_pkts, out_pkts, common_c2s_pkt_diff, common_s2c_pkt_diff, common_c2s_byte_diff, common_s2c_byte_diff, 0, 0, 0, 0, 0, 0,
+        return new Fields(common_sessions, in_bytes, out_bytes, in_pkts, out_pkts, common_c2s_pkt_diff, common_s2c_pkt_diff, common_c2s_byte_diff, common_s2c_byte_diff, 0, 0, 0, 0, 0, 0,
                 0, 0, 0, 0, "");
     }
diff --git a/src/main/java/com/zdjizhi/common/Metrics.java b/src/main/java/com/zdjizhi/common/Metrics.java
index 17c9a61..1c4fca2 100644
--- a/src/main/java/com/zdjizhi/common/Metrics.java
+++ b/src/main/java/com/zdjizhi/common/Metrics.java
@@ -3,12 +3,12 @@ package com.zdjizhi.common;
 import java.util.Map;
 
 public class Metrics {
-    private final static String name = FlowWriteConfig.METRICS_NAME;
+    private final static String name = AppProtocolMetricsConfig.METRICS_NAME;
     private Map<String, Object> tags;
-    private Fileds fields;
+    private Fields fields;
     private long timestamp;
 
-    public Metrics( Map<String, Object> tags, Fileds fields, long timestamp) {
+    public Metrics(Map<String, Object> tags, Fields fields, long timestamp) {
         this.tags = tags;
         this.fields = fields;
         this.timestamp = timestamp;
@@ -26,11 +26,11 @@ public class Metrics {
         this.tags = tags;
     }
 
-    public Fileds getFields() {
+    public Fields getFields() {
         return fields;
     }
 
-    public void setFields(Fileds fields) {
+    public void setFields(Fields fields) {
         this.fields = fields;
     }
diff --git a/src/main/java/com/zdjizhi/common/SessionRecord.java b/src/main/java/com/zdjizhi/common/SessionRecord.java
index 6fa9a52..e3dc9d9 100644
--- a/src/main/java/com/zdjizhi/common/SessionRecord.java
+++ b/src/main/java/com/zdjizhi/common/SessionRecord.java
@@ -284,14 +284,14 @@ public class SessionRecord {
         tagsMap.put("device_id", common_device_id);
         tagsMap.put("common_device_tag", common_device_tag);
         tagsMap.put("protocol_label", common_protocol_label);
-        tagsMap.put("app_full_path", common_protocol_label + "." + common_l7_protocol);
-        tagsMap.put("randomNum",ThreadLocalRandom.current().nextInt(FlowWriteConfig.RANDOM_RANGE_NUM));
+        tagsMap.put("app_full_path", common_app_full_path);
+        tagsMap.put("randomNum",ThreadLocalRandom.current().nextInt(AppProtocolMetricsConfig.RANDOM_RANGE_NUM));
 
         return JSON.toJSONString(tagsMap);
     }
 
-    public Fileds getSessionFileds() {
+    public Fields getSessionFileds() {
         long out_bytes;
         long in_bytes;
         long in_pkts;
@@ -308,7 +308,7 @@ public class SessionRecord {
             in_pkts = common_c2s_pkt_diff;
             out_pkts = common_s2c_pkt_diff;
         }
-        return new Fileds(common_sessions, in_bytes, out_bytes, in_pkts, out_pkts, common_c2s_pkt_diff, common_s2c_pkt_diff, common_c2s_byte_diff, common_s2c_byte_diff, common_c2s_ipfrag_num, common_s2c_ipfrag_num, common_c2s_tcp_lostlen, common_s2c_tcp_lostlen, common_c2s_tcp_unorder_num, common_s2c_tcp_unorder_num,
+        return new Fields(common_sessions, in_bytes, out_bytes, in_pkts, out_pkts, common_c2s_pkt_diff, common_s2c_pkt_diff, common_c2s_byte_diff, common_s2c_byte_diff, common_c2s_ipfrag_num, common_s2c_ipfrag_num, common_c2s_tcp_lostlen, common_s2c_tcp_lostlen, common_c2s_tcp_unorder_num, common_s2c_tcp_unorder_num,
                 common_c2s_pkt_retrans, common_s2c_pkt_retrans, common_c2s_byte_retrans, common_s2c_byte_retrans, "");
     }
diff --git a/src/main/java/com/zdjizhi/tools/function/CountAppProtocolWindowProcess.java b/src/main/java/com/zdjizhi/tools/function/CountAppProtocolWindowProcess.java
index 92f0b57..5271b7e 100644
--- a/src/main/java/com/zdjizhi/tools/function/CountAppProtocolWindowProcess.java
+++ b/src/main/java/com/zdjizhi/tools/function/CountAppProtocolWindowProcess.java
@@ -5,7 +5,7 @@ import cn.hutool.log.LogFactory;
 import com.alibaba.fastjson2.JSON;
 import com.jayway.jsonpath.DocumentContext;
 import com.jayway.jsonpath.JsonPath;
-import com.zdjizhi.common.Fileds;
+import com.zdjizhi.common.Fields;
 import com.zdjizhi.common.Metrics;
 import com.zdjizhi.utils.StringUtil;
 
@@ -21,7 +21,7 @@ import java.util.Base64;
 import java.util.HashMap;
 import java.util.Map;
 
-public class CountWindowProcess extends ProcessWindowFunction<Tuple3<String, Fileds, String>, String, String, TimeWindow> {
+public class CountAppProtocolWindowProcess extends ProcessWindowFunction<Tuple3<String, Fields, String>, String, String, TimeWindow> {
     private static final Log logger = LogFactory.get();
 
     private long sessions;
@@ -56,11 +56,11 @@ public class CountWindowProcess extends ProcessWindowFunction<Tuple3<String, Fil
     private Map<String, Object> cacheMap = new HashMap<>(20);
 
     @Override
-    public void process(String key, Context context, Iterable<Tuple3<String, Fileds, String>> iterable, Collector<String> collector) throws Exception {
+    public void process(String key, Context context, Iterable<Tuple3<String, Fields, String>> iterable, Collector<String> collector) throws Exception {
         if (StringUtil.isNotBlank(key)) {
             try {
                 HllSketch hllSketch = new HllSketch(12);
-                for (Tuple3<String, Fileds, String> record : iterable) {
+                for (Tuple3<String, Fields, String> record : iterable) {
 
                     sessions = sessions + record.f1.getSessions();
                     in_bytes = in_bytes + record.f1.getIn_bytes();
@@ -94,7 +94,7 @@ public class CountWindowProcess extends ProcessWindowFunction<Tuple3<String, Fil
                 }
                 client_ip_sketch = Base64.getEncoder().encodeToString(hllSketch.toUpdatableByteArray());
 
-                Fileds fileds = new Fileds(sessions, in_bytes, out_bytes, in_pkts, out_pkts, c2s_pkts, s2c_pkts, c2s_bytes, s2c_bytes, c2s_fragments, s2c_fragments, c2s_tcp_lost_bytes, s2c_tcp_lost_bytes, c2s_tcp_ooorder_pkts, s2c_tcp_ooorder_pkts, c2s_tcp_retransmitted_pkts, s2c_tcp_retransmitted_pkts, c2s_tcp_retransmitted_bytes, s2c_tcp_retransmitted_bytes, client_ip_sketch);
+                Fields field = new Fields(sessions, in_bytes, out_bytes, in_pkts, out_pkts, c2s_pkts, s2c_pkts, c2s_bytes, s2c_bytes, c2s_fragments, s2c_fragments, c2s_tcp_lost_bytes, s2c_tcp_lost_bytes, c2s_tcp_ooorder_pkts, s2c_tcp_ooorder_pkts, c2s_tcp_retransmitted_pkts, s2c_tcp_retransmitted_pkts, c2s_tcp_retransmitted_bytes, s2c_tcp_retransmitted_bytes, client_ip_sketch);
 
@@ -118,7 +118,7 @@ public class CountWindowProcess extends ProcessWindowFunction<Tuple3<String, Fil
                 keyMap.put("data_center", data_center);
                 keyMap.remove("randomNum");
 
-                collector.collect(JSON.toJSONString(new Metrics(keyMap, fileds, context.window().getEnd() / 1000)));
+                collector.collect(JSON.toJSONString(new Metrics(keyMap, field, context.window().getEnd() / 1000)));
                 sessions = 0;
                 in_bytes = 0;
                 out_bytes = 0;
diff --git a/src/main/java/com/zdjizhi/tools/function/KeyByFunction.java b/src/main/java/com/zdjizhi/tools/function/KeyByFunction.java
index 49e7a7c..2c58353 100644
--- a/src/main/java/com/zdjizhi/tools/function/KeyByFunction.java
+++ b/src/main/java/com/zdjizhi/tools/function/KeyByFunction.java
@@ -1,12 +1,12 @@
 package com.zdjizhi.tools.function;
 
-import com.zdjizhi.common.Fileds;
+import com.zdjizhi.common.Fields;
 import org.apache.flink.api.java.functions.KeySelector;
 import org.apache.flink.api.java.tuple.Tuple3;
 
-public class KeyByFunction implements KeySelector<Tuple3<String, Fileds, String>, String> {
+public class KeyByFunction implements KeySelector<Tuple3<String, Fields, String>, String> {
 
     @Override
-    public String getKey(Tuple3<String, Fileds, String> value) throws Exception {
+    public String getKey(Tuple3<String, Fields, String> value) throws Exception {
         return value.f0;
     }
diff --git a/src/main/java/com/zdjizhi/tools/function/ParseInterimSessionFunction.java b/src/main/java/com/zdjizhi/tools/function/ParseInterimSessionFunction.java
index 8d273b0..90ee567 100644
--- a/src/main/java/com/zdjizhi/tools/function/ParseInterimSessionFunction.java
+++ b/src/main/java/com/zdjizhi/tools/function/ParseInterimSessionFunction.java
@@ -3,30 +3,26 @@ package com.zdjizhi.tools.function;
 import cn.hutool.log.Log;
 import cn.hutool.log.LogFactory;
 import com.alibaba.fastjson2.JSON;
-import com.zdjizhi.common.Fileds;
+import com.zdjizhi.common.Fields;
 import com.zdjizhi.common.InterimSessionRecord;
-import com.zdjizhi.common.SessionRecord;
-import com.zdjizhi.common.Tags;
 import com.zdjizhi.utils.StringUtil;
 import org.apache.flink.api.common.functions.MapFunction;
 import org.apache.flink.api.java.tuple.Tuple3;
 
-import java.util.Map;
-
 /**
  * @author wangchengcheng
  * @Package com.zdjizhi.utils.functions
  * @Description:
  * @date 2023/04/20
  */
-public class ParseInterimSessionFunction implements MapFunction<String, Tuple3<String, Fileds, String>> {
+public class ParseInterimSessionFunction implements MapFunction<String, Tuple3<String, Fields, String>> {
     private static final Log logger = LogFactory.get();
 
     private InterimSessionRecord interimsessionRecord;
     private String tags="";
-    private Fileds sessionFileds;
+    private Fields sessionField;
 
     @Override
-    public Tuple3<String, Fileds, String> map(String message) throws Exception {
+    public Tuple3<String, Fields, String> map(String message) throws Exception {
         try {
             if (StringUtil.isNotBlank(message)) {
@@ -34,17 +30,17 @@ public class ParseInterimSessionFunction implements MapFunction<String, Tuple3<S
                 // Filter for records whose common_protocol_label is not empty
                 if (StringUtil.isNotBlank(interimsessionRecord.getCommon_protocol_label())) {
                     tags = interimsessionRecord.getInterimSessionTags();
-                    sessionFileds = interimsessionRecord.getInterimSessionFileds();
-                    return new Tuple3<>(tags, sessionFileds, interimsessionRecord.getCommon_client_ip());
+                    sessionField = interimsessionRecord.getInterimSessionFileds();
+                    return new Tuple3<>(tags, sessionField, interimsessionRecord.getCommon_client_ip());
                 }else {
-                    return new Tuple3<>("", sessionFileds, "");
+                    return new Tuple3<>("", sessionField, "");
                 }
             }else {
-                return new Tuple3<>("", sessionFileds, "");
+                return new Tuple3<>("", sessionField, "");
             }
         } catch (RuntimeException e) {
             logger.error("An error occurred in the interim-session-record parsing,error message is:" + e + ",The original log is" + message);
-            return new Tuple3<>("", sessionFileds, "");
+            return new Tuple3<>("", sessionField, "");
         }
     }
 }
diff --git a/src/main/java/com/zdjizhi/tools/function/ParseSessionFunction.java b/src/main/java/com/zdjizhi/tools/function/ParseSessionFunction.java
index 01b0826..a8425b9 100644
--- a/src/main/java/com/zdjizhi/tools/function/ParseSessionFunction.java
+++ b/src/main/java/com/zdjizhi/tools/function/ParseSessionFunction.java
@@ -3,30 +3,27 @@ package com.zdjizhi.tools.function;
 import cn.hutool.log.Log;
 import cn.hutool.log.LogFactory;
 import com.alibaba.fastjson2.JSON;
-import com.zdjizhi.common.Fileds;
+import com.zdjizhi.common.Fields;
 import com.zdjizhi.common.SessionRecord;
-import com.zdjizhi.common.Tags;
 import com.zdjizhi.utils.StringUtil;
 import org.apache.flink.api.common.functions.MapFunction;
 import org.apache.flink.api.java.tuple.Tuple3;
 
-import java.util.Map;
-
 /**
  * @author wangchengcheng
 * @Package com.zdjizhi.utils.functions
 * @Description:
 * @date 2023/04/20
 */
-public class ParseSessionFunction implements MapFunction<String, Tuple3<String, Fileds, String>> {
+public class ParseSessionFunction implements MapFunction<String, Tuple3<String, Fields, String>> {
     private static final Log logger = LogFactory.get();
 
     private SessionRecord sessionRecord;
     private String tags;
-    private Fileds sessionFileds;
+    private Fields sessionField;
 
     @Override
-    public Tuple3<String, Fileds, String> map(String message) {
+    public Tuple3<String, Fields, String> map(String message) {
         try {
             if (StringUtil.isNotBlank(message)) {
@@ -34,18 +31,18 @@ public class ParseSessionFunction implements MapFunction<String, Tuple3<String,
                 // Filter for records whose common_protocol_label is not empty
                 if (StringUtil.isNotBlank(sessionRecord.getCommon_protocol_label())) {
                     tags = sessionRecord.getSessionTags();
-                    sessionFileds = sessionRecord.getSessionFileds();
-                    return new Tuple3<>(tags, sessionFileds, sessionRecord.getCommon_client_ip());
+                    sessionField = sessionRecord.getSessionFileds();
+                    return new Tuple3<>(tags, sessionField, sessionRecord.getCommon_client_ip());
                 }else {
-                    return new Tuple3<>("", sessionFileds, "");
+                    return new Tuple3<>("", sessionField, "");
                 }
             }else {
-                return new Tuple3<>("", sessionFileds, "");
+                return new Tuple3<>("", sessionField, "");
             }
         } catch (Exception e) {
             logger.error("An error occurred in the session-record parsing,error message is:" + e + ",The original log is" + message);
-            return new Tuple3<>("", sessionFileds, "");
+            return new Tuple3<>("", sessionField, "");
         }
     }
 }
diff --git a/src/main/java/com/zdjizhi/tools/kafka/CertUtils.java b/src/main/java/com/zdjizhi/tools/kafka/CertUtils.java
index ad93f29..20c99db 100644
--- a/src/main/java/com/zdjizhi/tools/kafka/CertUtils.java
+++ b/src/main/java/com/zdjizhi/tools/kafka/CertUtils.java
@@ -1,6 +1,6 @@
 package com.zdjizhi.tools.kafka;
 
-import com.zdjizhi.common.FlowWriteConfig;
+import com.zdjizhi.common.AppProtocolMetricsConfig;
 import org.apache.kafka.common.config.SslConfigs;
 
 import java.util.Properties;
@@ -33,15 +33,15 @@ class CertUtils {
             properties.put("security.protocol", "SASL_PLAINTEXT");
             properties.put("sasl.mechanism", "PLAIN");
             properties.put("sasl.jaas.config", "org.apache.kafka.common.security.plain.PlainLoginModule required username="
-                    + FlowWriteConfig.KAFKA_SASL_JAAS_USER + " password=" + FlowWriteConfig.KAFKA_SASL_JAAS_PIN + ";");
+                    + AppProtocolMetricsConfig.KAFKA_SASL_JAAS_USER + " password=" + AppProtocolMetricsConfig.KAFKA_SASL_JAAS_PIN + ";");
         } else if (servers.contains(SSL_PORT)) {
             properties.put("security.protocol", "SSL");
             properties.put(SslConfigs.SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_CONFIG, "");
-            properties.put("ssl.keystore.location", FlowWriteConfig.TOOLS_LIBRARY + "keystore.jks");
-            properties.put("ssl.keystore.password", FlowWriteConfig.KAFKA_SASL_JAAS_PIN);
-            properties.put("ssl.truststore.location", FlowWriteConfig.TOOLS_LIBRARY + "truststore.jks");
-            properties.put("ssl.truststore.password", FlowWriteConfig.KAFKA_SASL_JAAS_PIN);
-            properties.put("ssl.key.password", FlowWriteConfig.KAFKA_SASL_JAAS_PIN);
+            properties.put("ssl.keystore.location", AppProtocolMetricsConfig.TOOLS_LIBRARY + "keystore.jks");
+            properties.put("ssl.keystore.password", AppProtocolMetricsConfig.KAFKA_SASL_JAAS_PIN);
+            properties.put("ssl.truststore.location", AppProtocolMetricsConfig.TOOLS_LIBRARY + "truststore.jks");
+            properties.put("ssl.truststore.password", AppProtocolMetricsConfig.KAFKA_SASL_JAAS_PIN);
+            properties.put("ssl.key.password", AppProtocolMetricsConfig.KAFKA_SASL_JAAS_PIN);
         }
     }
diff --git a/src/main/java/com/zdjizhi/tools/kafka/KafkaConsumer.java b/src/main/java/com/zdjizhi/tools/kafka/KafkaConsumer.java
index d0fa01f..a0ebab6 100644
--- a/src/main/java/com/zdjizhi/tools/kafka/KafkaConsumer.java
+++ b/src/main/java/com/zdjizhi/tools/kafka/KafkaConsumer.java
@@ -1,6 +1,6 @@
 package com.zdjizhi.tools.kafka;
 
-import com.zdjizhi.common.FlowWriteConfig;
+import com.zdjizhi.common.AppProtocolMetricsConfig;
 import org.apache.flink.api.common.serialization.SimpleStringSchema;
 import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
 
@@ -20,13 +20,13 @@ public class KafkaConsumer {
     // Consumer config for session-record
     private static Properties createConsumerConfig(String groupId) {
         Properties properties = new Properties();
-        properties.put("bootstrap.servers", FlowWriteConfig.SOURCE_KAFKA_SERVERS);
+        properties.put("bootstrap.servers", AppProtocolMetricsConfig.SOURCE_KAFKA_SERVERS);
         properties.put("group.id", groupId);
-        properties.put("session.timeout.ms", FlowWriteConfig.SESSION_TIMEOUT_MS);
-        properties.put("max.poll.records", FlowWriteConfig.MAX_POLL_RECORDS);
-        properties.put("max.partition.fetch.bytes", FlowWriteConfig.MAX_PARTITION_FETCH_BYTES);
+        properties.put("session.timeout.ms", AppProtocolMetricsConfig.SESSION_TIMEOUT_MS);
+        properties.put("max.poll.records", AppProtocolMetricsConfig.MAX_POLL_RECORDS);
+        properties.put("max.partition.fetch.bytes", AppProtocolMetricsConfig.MAX_PARTITION_FETCH_BYTES);
         properties.put("partition.discovery.interval.ms", "10000");
-        CertUtils.chooseCert(FlowWriteConfig.SOURCE_KAFKA_SERVERS, properties);
+        CertUtils.chooseCert(AppProtocolMetricsConfig.SOURCE_KAFKA_SERVERS, properties);
         return properties;
     }
diff --git a/src/main/java/com/zdjizhi/tools/kafka/KafkaProducer.java b/src/main/java/com/zdjizhi/tools/kafka/KafkaProducer.java
index ee31bc2..f67736b 100644
--- a/src/main/java/com/zdjizhi/tools/kafka/KafkaProducer.java
+++ b/src/main/java/com/zdjizhi/tools/kafka/KafkaProducer.java
@@ -1,6 +1,6 @@
 package com.zdjizhi.tools.kafka;
 
-import com.zdjizhi.common.FlowWriteConfig;
+import com.zdjizhi.common.AppProtocolMetricsConfig;
 import org.apache.flink.api.common.serialization.SimpleStringSchema;
 import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;
 
@@ -17,17 +17,17 @@ public class KafkaProducer {
 
     private static Properties createProducerConfig() {
         Properties properties = new Properties();
-        properties.put("bootstrap.servers", FlowWriteConfig.SINK_KAFKA_SERVERS);
-        properties.put("acks", FlowWriteConfig.PRODUCER_ACK);
-        properties.put("retries", FlowWriteConfig.RETRIES);
-        properties.put("linger.ms", FlowWriteConfig.LINGER_MS);
-        properties.put("request.timeout.ms", FlowWriteConfig.REQUEST_TIMEOUT_MS);
-        properties.put("batch.size", FlowWriteConfig.BATCH_SIZE);
-        properties.put("buffer.memory", FlowWriteConfig.BUFFER_MEMORY);
-        properties.put("max.request.size", FlowWriteConfig.MAX_REQUEST_SIZE);
-        properties.put("compression.type", FlowWriteConfig.PRODUCER_KAFKA_COMPRESSION_TYPE);
-
-        CertUtils.chooseCert(FlowWriteConfig.SINK_KAFKA_SERVERS, properties);
+        properties.put("bootstrap.servers", AppProtocolMetricsConfig.SINK_KAFKA_SERVERS);
+        properties.put("acks", AppProtocolMetricsConfig.PRODUCER_ACK);
+        properties.put("retries", AppProtocolMetricsConfig.RETRIES);
+        properties.put("linger.ms", AppProtocolMetricsConfig.LINGER_MS);
+        properties.put("request.timeout.ms", AppProtocolMetricsConfig.REQUEST_TIMEOUT_MS);
+        properties.put("batch.size", AppProtocolMetricsConfig.BATCH_SIZE);
+        properties.put("buffer.memory", AppProtocolMetricsConfig.BUFFER_MEMORY);
+        properties.put("max.request.size", AppProtocolMetricsConfig.MAX_REQUEST_SIZE);
+        properties.put("compression.type", AppProtocolMetricsConfig.PRODUCER_KAFKA_COMPRESSION_TYPE);
+
+        CertUtils.chooseCert(AppProtocolMetricsConfig.SINK_KAFKA_SERVERS, properties);
         return properties;
     }
 
@@ -35,7 +35,7 @@ public class KafkaProducer {
     public static FlinkKafkaProducer<String> getKafkaProducer() {
 
         FlinkKafkaProducer<String> kafkaProducer = new FlinkKafkaProducer<String>(
-                FlowWriteConfig.SINK_KAFKA_TOPIC,
+                AppProtocolMetricsConfig.SINK_KAFKA_TOPIC,
                 new SimpleStringSchema(),
                 createProducerConfig(),
                 // The sink connects to all partitions and writes round-robin;
diff --git a/src/main/java/com/zdjizhi/tools/system/AppProtocolMetricsConfigurations.java b/src/main/java/com/zdjizhi/tools/system/AppProtocolMetricsConfigurations.java
index d429def..e6dcf91 100644
--- a/src/main/java/com/zdjizhi/tools/system/AppProtocolMetricsConfigurations.java
+++ b/src/main/java/com/zdjizhi/tools/system/AppProtocolMetricsConfigurations.java
@@ -11,7 +11,7 @@ import java.util.Properties;
 /**
  * @author Administrator
  */
-public final class FlowWriteConfigurations {
+public final class AppProtocolMetricsConfigurations {
 
     private static Properties propDefault = new Properties();
     private static Properties propService = new Properties();
@@ -59,8 +59,8 @@ public final class FlowWriteConfigurations {
     static {
         try {
-            propService.load(FlowWriteConfigurations.class.getClassLoader().getResourceAsStream("service_flow_config.properties"));
-            propDefault.load(FlowWriteConfigurations.class.getClassLoader().getResourceAsStream("default_config.properties"));
+            propService.load(AppProtocolMetricsConfigurations.class.getClassLoader().getResourceAsStream("service_flow_config.properties"));
+            propDefault.load(AppProtocolMetricsConfigurations.class.getClassLoader().getResourceAsStream("default_config.properties"));
         } catch (IOException | RuntimeException e) {
             propDefault = null;
             propService = null;
diff --git a/src/main/java/com/zdjizhi/topology/LogAppProtocolMetricsTopology.java b/src/main/java/com/zdjizhi/topology/LogAppProtocolMetricsTopology.java
index 170755f..22ceb22 100644
--- a/src/main/java/com/zdjizhi/topology/LogAppProtocolMetricsTopology.java
+++ b/src/main/java/com/zdjizhi/topology/LogAppProtocolMetricsTopology.java
@@ -2,16 +2,15 @@ package com.zdjizhi.topology;
 
 import cn.hutool.log.Log;
 import cn.hutool.log.LogFactory;
-import com.zdjizhi.common.Fileds;
-import com.zdjizhi.common.FlowWriteConfig;
-import com.zdjizhi.tools.function.CountWindowProcess;
+import com.zdjizhi.common.Fields;
+import com.zdjizhi.common.AppProtocolMetricsConfig;
+import com.zdjizhi.tools.function.CountAppProtocolWindowProcess;
 import com.zdjizhi.tools.function.KeyByFunction;
 import com.zdjizhi.tools.function.ParseInterimSessionFunction;
 import com.zdjizhi.tools.function.ParseSessionFunction;
 import com.zdjizhi.tools.kafka.KafkaConsumer;
 import com.zdjizhi.tools.kafka.KafkaProducer;
-import org.apache.flink.api.java.functions.KeySelector;
 import org.apache.flink.api.java.tuple.Tuple3;
 import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
 import org.apache.flink.streaming.api.datastream.WindowedStream;
@@ -29,38 +28,38 @@ import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
  * @Description:
  * @date 2023/04/19
  */
-public class LogFlowWriteTopology {
+public class LogAppProtocolMetricsTopology {
     private static final Log logger = LogFactory.get();
 
     public static void main(String[] args) {
         final StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment();
-        environment.setBufferTimeout(FlowWriteConfig.BUFFER_TIMEOUT);
+        environment.setBufferTimeout(AppProtocolMetricsConfig.BUFFER_TIMEOUT);
 
         // Consume and parse session logs
-        SingleOutputStreamOperator<String> sessionSource = environment.addSource(KafkaConsumer.getKafkaConsumer(FlowWriteConfig.SESSION_SOURCE_KAFKA_TOPIC,FlowWriteConfig.SESSION_GROUP_ID))
-                .setParallelism(FlowWriteConfig.SESSION_SOURCE_PARALLELISM).name(FlowWriteConfig.SESSION_SOURCE_KAFKA_TOPIC);
+        SingleOutputStreamOperator<String> sessionSource = environment.addSource(KafkaConsumer.getKafkaConsumer(AppProtocolMetricsConfig.SESSION_SOURCE_KAFKA_TOPIC, AppProtocolMetricsConfig.SESSION_GROUP_ID))
+                .setParallelism(AppProtocolMetricsConfig.SESSION_SOURCE_PARALLELISM).name(AppProtocolMetricsConfig.SESSION_SOURCE_KAFKA_TOPIC);
 
-        SingleOutputStreamOperator<Tuple3<String, Fileds, String>> sessionParseMap = sessionSource.map(new ParseSessionFunction())
-                .setParallelism(FlowWriteConfig.SESSION_SOURCE_PARALLELISM).name("sessionParseMap");
+        SingleOutputStreamOperator<Tuple3<String, Fields, String>> sessionParseMap = sessionSource.map(new ParseSessionFunction())
+                .setParallelism(AppProtocolMetricsConfig.SESSION_SOURCE_PARALLELISM).name("sessionParseMap");
 
         // Consume and parse interim sessions
-        SingleOutputStreamOperator<String> interimSessionSource = environment.addSource(KafkaConsumer.getKafkaConsumer(FlowWriteConfig.INTERIM_SESSION_SOURCE_KAFKA_TOPIC,FlowWriteConfig.INTERIM_SESSION_GROUP_ID))
-                .setParallelism(FlowWriteConfig.INTERIM_SESSION_SOURCE_PARALLELISM).name(FlowWriteConfig.INTERIM_SESSION_SOURCE_KAFKA_TOPIC);
+        SingleOutputStreamOperator<String> interimSessionSource = environment.addSource(KafkaConsumer.getKafkaConsumer(AppProtocolMetricsConfig.INTERIM_SESSION_SOURCE_KAFKA_TOPIC, AppProtocolMetricsConfig.INTERIM_SESSION_GROUP_ID))
+                .setParallelism(AppProtocolMetricsConfig.INTERIM_SESSION_SOURCE_PARALLELISM).name(AppProtocolMetricsConfig.INTERIM_SESSION_SOURCE_KAFKA_TOPIC);
 
-        SingleOutputStreamOperator<Tuple3<String, Fileds, String>> interimSessionParseMap = interimSessionSource.map(new ParseInterimSessionFunction())
-                .setParallelism(FlowWriteConfig.INTERIM_SESSION_SOURCE_PARALLELISM).name("interimSessionParseMap");
+        SingleOutputStreamOperator<Tuple3<String, Fields, String>> interimSessionParseMap = interimSessionSource.map(new ParseInterimSessionFunction())
+                .setParallelism(AppProtocolMetricsConfig.INTERIM_SESSION_SOURCE_PARALLELISM).name("interimSessionParseMap");
 
         // Union sessions and interim sessions, group by key, and feed the tumbling window
-        WindowedStream<Tuple3<String, Fileds, String>, String, TimeWindow> window = sessionParseMap.union(interimSessionParseMap).keyBy(new KeyByFunction())
-                .window(TumblingProcessingTimeWindows.of(Time.seconds(FlowWriteConfig.COUNT_WINDOW_SECONDS)));
+        WindowedStream<Tuple3<String, Fields, String>, String, TimeWindow> window = sessionParseMap.union(interimSessionParseMap).keyBy(new KeyByFunction())
+                .window(TumblingProcessingTimeWindows.of(Time.seconds(AppProtocolMetricsConfig.COUNT_WINDOW_SECONDS)));
 
-        SingleOutputStreamOperator<String> countWindow = window.process(new CountWindowProcess()).setParallelism(FlowWriteConfig.COUNT_WINDOW_PARALLELISM).name("countWindow");
+        SingleOutputStreamOperator<String> countWindow = window.process(new CountAppProtocolWindowProcess()).setParallelism(AppProtocolMetricsConfig.COUNT_WINDOW_PARALLELISM).name("countWindow");
 
         countWindow.addSink(KafkaProducer.getKafkaProducer()).name("LogSinkKafka")
-                .setParallelism(FlowWriteConfig.SINK_KAFKA_PARALLELISM).name(FlowWriteConfig.SINK_KAFKA_TOPIC);
+                .setParallelism(AppProtocolMetricsConfig.SINK_KAFKA_PARALLELISM).name(AppProtocolMetricsConfig.SINK_KAFKA_TOPIC);
 
         try {
             environment.execute(args[0]);
```
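All of the constants above resolve through the renamed AppProtocolMetricsConfigurations class, which loads two property files in a static block. A minimal sketch of that lookup pattern, assuming (inferred from which keys appear in service_flow_config.properties in the diff) that the first argument selects the service file with 0 and the default file with 1; ConfigLookupSketch is a hypothetical name, and the real class may differ in details not shown:

```java
import java.io.IOException;
import java.util.Properties;

// Sketch of the two-file property lookup seen in AppProtocolMetricsConfigurations.
// The 0/1 index mapping is an assumption, not confirmed by the diff.
public final class ConfigLookupSketch {

    private static Properties propDefault = new Properties();
    private static Properties propService = new Properties();

    static {
        try {
            // Same resources the diff shows being loaded.
            propService.load(ConfigLookupSketch.class.getClassLoader()
                    .getResourceAsStream("service_flow_config.properties"));
            propDefault.load(ConfigLookupSketch.class.getClassLoader()
                    .getResourceAsStream("default_config.properties"));
        } catch (IOException | RuntimeException e) {
            // Mirrors the diff: on any load failure both tables are nulled out.
            propDefault = null;
            propService = null;
        }
    }

    public static String getStringProperty(int type, String key) {
        // Assumed mapping: 0 -> service overrides, 1 -> shipped defaults.
        Properties props = (type == 0) ? propService : propDefault;
        return props == null ? null : props.getProperty(key);
    }

    public static Integer getIntProperty(int type, String key) {
        String value = getStringProperty(type, key);
        return value == null ? null : Integer.valueOf(value.trim());
    }
}
```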
