summary | refs | log | tree | commit | diff
diff options
context:
space:
mode:
-rw-r--r-- pom.xml | 28
-rw-r--r-- src/main/java/com/zdjizhi/common/config/MergeConfigs.java | 5
-rw-r--r-- src/main/java/com/zdjizhi/topology/ApplicationProtocolTopology.java | 69
-rw-r--r-- src/main/java/com/zdjizhi/topology/ApplicationProtocolTopologyEventTime.java | 267
-rw-r--r-- src/main/java/com/zdjizhi/topology/ApplicationProtocolTopologyTest.java | 269
-rw-r--r-- src/main/java/com/zdjizhi/utils/functions/map/ResultFlatMap.java | 4
-rw-r--r-- src/main/java/com/zdjizhi/utils/functions/process/ParsingData.java | 8
-rw-r--r-- src/test/java/com/zdjizhi/ConventionalTest.java | 6
8 files changed, 602 insertions, 54 deletions
diff --git a/pom.xml b/pom.xml
index 0624c9e..2c6c9d1 100644
--- a/pom.xml
+++ b/pom.xml
@@ -6,7 +6,7 @@
<groupId>com.zdjizhi</groupId>
<artifactId>app-protocol-stat-traffic-merge</artifactId>
- <version>2.3.0</version>
+ <version>2.3.1</version>
<name>app-protocol-stat-traffic-merge</name>
<url>http://www.example.com</url>
@@ -115,20 +115,20 @@
</build>
<dependencies>
- <dependency>
+ <!--<dependency>
<groupId>cglib</groupId>
<artifactId>cglib-nodep</artifactId>
<version>3.2.4</version>
- </dependency>
+ </dependency>-->
<dependency>
<groupId>org.junit.jupiter</groupId>
<artifactId>junit-jupiter-api</artifactId>
<version>5.3.2</version>
- <scope>compile</scope>
+ <scope>test</scope>
</dependency>
- <dependency>
+ <!-- <dependency>
<groupId>com.zdjizhi</groupId>
<artifactId>galaxy</artifactId>
<version>${zdjz.tools.version}</version>
@@ -142,7 +142,7 @@
<groupId>org.slf4j</groupId>
</exclusion>
</exclusions>
- </dependency>
+ </dependency>-->
<!-- https://mvnrepository.com/artifact/org.apache.flink/flink-core -->
<dependency>
@@ -175,6 +175,18 @@
<artifactId>flink-connector-kafka_2.12</artifactId>
<version>${flink.version}</version>
<!--<scope>${scope.type}</scope>-->
+ <exclusions>
+ <exclusion>
+ <groupId>org.xerial.snappy</groupId>
+ <artifactId>snappy-java</artifactId>
+ </exclusion>
+ </exclusions>
+ </dependency>
+
+ <dependency>
+ <groupId>org.xerial.snappy</groupId>
+ <artifactId>snappy-java</artifactId>
+ <version>1.1.8.3</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.flink/flink-java -->
@@ -192,11 +204,11 @@
</dependency>
<!-- https://mvnrepository.com/artifact/org.jasypt/jasypt -->
- <dependency>
+ <!-- <dependency>
<groupId>org.jasypt</groupId>
<artifactId>jasypt</artifactId>
<version>${jasypt.version}</version>
- </dependency>
+ </dependency>-->
<dependency>
<groupId>junit</groupId>
diff --git a/src/main/java/com/zdjizhi/common/config/MergeConfigs.java b/src/main/java/com/zdjizhi/common/config/MergeConfigs.java
index 8cf604a..c17dabf 100644
--- a/src/main/java/com/zdjizhi/common/config/MergeConfigs.java
+++ b/src/main/java/com/zdjizhi/common/config/MergeConfigs.java
@@ -35,6 +35,11 @@ public class MergeConfigs {
.noDefaultValue()
.withDescription("The Kafka topic used in the sink.");
+ public static final ConfigOption<Long> GRANULARITY_SECOND =
+ ConfigOptions.key("granularity.second")
+ .longType()
+ .defaultValue(1L)
+ .withDescription("granularity second");
public static final ConfigOption<Integer> COUNT_WINDOW_TIME =
ConfigOptions.key("count.window.time")
diff --git a/src/main/java/com/zdjizhi/topology/ApplicationProtocolTopology.java b/src/main/java/com/zdjizhi/topology/ApplicationProtocolTopology.java
index 26111e0..66e6603 100644
--- a/src/main/java/com/zdjizhi/topology/ApplicationProtocolTopology.java
+++ b/src/main/java/com/zdjizhi/topology/ApplicationProtocolTopology.java
@@ -1,27 +1,24 @@
package com.zdjizhi.topology;
import com.alibaba.fastjson2.JSON;
-import com.alibaba.fastjson2.JSONPath;
-import com.alibaba.fastjson2.JSONReader;
import com.zdjizhi.common.config.MergeConfigs;
import com.zdjizhi.common.config.MergeConfiguration;
import com.zdjizhi.common.pojo.*;
import com.zdjizhi.utils.kafka.KafkaConsumer;
import com.zdjizhi.utils.kafka.KafkaProducer;
import org.apache.commons.lang3.StringUtils;
-import org.apache.flink.api.common.eventtime.*;
import org.apache.flink.api.common.functions.AggregateFunction;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.RichFlatMapFunction;
import org.apache.flink.api.java.functions.KeySelector;
-import org.apache.flink.api.java.tuple.Tuple6;
+import org.apache.flink.api.java.tuple.Tuple7;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
-import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
+import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;
@@ -29,7 +26,6 @@ import org.apache.flink.util.Preconditions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.time.Duration;
import java.util.Arrays;
import static com.zdjizhi.common.config.MergeConfigs.*;
@@ -57,23 +53,22 @@ public class ApplicationProtocolTopology {
environment.getConfig().setGlobalJobParameters(config);
final MergeConfiguration fusionConfiguration = new MergeConfiguration(config);
- //水印
- WatermarkStrategy<Data> strategyForSession = WatermarkStrategy
- .<Data>forBoundedOutOfOrderness(Duration.ofSeconds(config.get(WARTERMARK_MAX_ORDERNESS)))
- .withTimestampAssigner((element, timestamp) -> element.getTimestamp_ms());
-
//数据源
DataStream<String> streamSource = environment.addSource(
KafkaConsumer.getKafkaConsumer(fusionConfiguration.getProperties(SOURCE_KAFKA_PROPERTIES_PREFIX),
config.get(SOURCE_KAFKA_TOPIC),
config.get(STARTUP_MODE)));
+ final long granularityMs = config.get(GRANULARITY_SECOND) * 1000L;
+ Preconditions.checkArgument(granularityMs >= 1000L && granularityMs <= 60000L);
+ LOG.warn("granularityMs:{}", granularityMs);
+
//解析数据
- SingleOutputStreamOperator<Data> dataStream = streamSource.flatMap(parseFlatMapFunction()).assignTimestampsAndWatermarks(strategyForSession);
+ SingleOutputStreamOperator<Data> dataStream = streamSource.flatMap(parseFlatMapFunction(granularityMs));
//聚合 + 拆分
SingleOutputStreamOperator<String> rstStream = dataStream.keyBy(keySelector())
- .window(TumblingEventTimeWindows.of(Time.seconds(config.get(COUNT_WINDOW_TIME))))
+ .window(TumblingProcessingTimeWindows.of(Time.seconds(config.get(COUNT_WINDOW_TIME))))
.aggregate(aggregateFunction(), processWindowFunction());
//输出
@@ -82,35 +77,38 @@ public class ApplicationProtocolTopology {
environment.execute(config.get(JOB_NAME));
}
- private static KeySelector<Data, Tuple6<Integer, String, String, String, String, String>> keySelector(){
- return new KeySelector<Data, Tuple6<Integer, String, String, String, String, String>>() {
+ private static KeySelector<Data, Tuple7<Long, Integer, String, String, String, String, String>> keySelector(){
+ return new KeySelector<Data, Tuple7<Long, Integer, String, String, String, String, String>>() {
@Override
- public Tuple6<Integer, String, String, String, String, String> getKey(Data data) throws Exception {
- return new Tuple6<>(data.vsys_id, data.device_id, data.device_group, data.data_center, data.decoded_path, data.app);
+ public Tuple7<Long, Integer, String, String, String, String, String> getKey(Data data) throws Exception {
+ return new Tuple7<>(data.timestamp_ms, data.vsys_id, data.device_id, data.device_group, data.data_center, data.decoded_path, data.app);
}
};
}
- private static FlatMapFunction<String, Data> parseFlatMapFunction(){
+ private static FlatMapFunction<String, Data> parseFlatMapFunction(final long granularityMs){
return new RichFlatMapFunction<String, Data>() {
- private JSONPath namePath;
@Override
public void open(Configuration parameters) throws Exception {
super.open(parameters);
- namePath = JSONPath.of("$.name");
+ LOG.warn("granularityMs:{}", granularityMs);
}
@Override
public void flatMap(String value, Collector<Data> out) throws Exception {
- JSONReader parser = JSONReader.of(value);
try {
- Object name = namePath.extract(parser);
- if(!"traffic_application_protocol_stat".equals(name)){
+ // 先快速近似过滤
+ if(!value.contains("traffic_application_protocol_stat")){
return;
}
-
Data data = JSON.parseObject(value, Data.class);
+ // 精确过滤
+ if(!"traffic_application_protocol_stat".equals(data.name)){
+ return;
+ }
+ data.timestamp_ms = data.timestamp_ms / granularityMs * granularityMs;
+
String decodedPath = data.getDecoded_path();
if(StringUtils.isBlank(decodedPath)){
return;
@@ -138,8 +136,6 @@ public class ApplicationProtocolTopology {
out.collect(data);
} catch (Exception e) {
LOG.error("parse error for value:" + value, e);
- } finally {
- parser.close();
}
}
};
@@ -208,8 +204,8 @@ public class ApplicationProtocolTopology {
};
}
- private static ProcessWindowFunction<ResultData, String, Tuple6<Integer, String, String, String, String, String>, TimeWindow> processWindowFunction(){
- return new ProcessWindowFunction<ResultData, String, Tuple6<Integer, String, String, String, String, String>, TimeWindow>() {
+ private static ProcessWindowFunction<ResultData, String, Tuple7<Long, Integer, String, String, String, String, String>, TimeWindow> processWindowFunction(){
+ return new ProcessWindowFunction<ResultData, String, Tuple7<Long, Integer, String, String, String, String, String>, TimeWindow>() {
private String NAME = null;
@Override
@@ -221,18 +217,17 @@ public class ApplicationProtocolTopology {
}
@Override
- public void process(Tuple6<Integer, String, String, String, String, String> key, ProcessWindowFunction<ResultData, String, Tuple6<Integer, String, String, String, String, String>, TimeWindow>.Context context, Iterable<ResultData> elements, Collector<String> out) throws Exception {
- long timestamp_ms = context.window().getStart();
+ public void process(Tuple7<Long, Integer, String, String, String, String, String> key, ProcessWindowFunction<ResultData, String, Tuple7<Long, Integer, String, String, String, String, String>, TimeWindow>.Context context, Iterable<ResultData> elements, Collector<String> out) throws Exception {
for (ResultData ele : elements) {
- ele.timestamp_ms = timestamp_ms;
ele.name = NAME;
- ele.vsys_id = key.f0;
- ele.device_id = key.f1;
- ele.device_group = key.f2;
- ele.data_center = key.f3;
+ ele.timestamp_ms = key.f0;
+ ele.vsys_id = key.f1;
+ ele.device_id = key.f2;
+ ele.device_group = key.f3;
+ ele.data_center = key.f4;
ele.app_name = null;
- String decodedPath = key.f4;
- String app = key.f5;
+ String decodedPath = key.f5;
+ String app = key.f6;
// 拆分
int index = decodedPath.indexOf('.');
diff --git a/src/main/java/com/zdjizhi/topology/ApplicationProtocolTopologyEventTime.java b/src/main/java/com/zdjizhi/topology/ApplicationProtocolTopologyEventTime.java
new file mode 100644
index 0000000..45edd6b
--- /dev/null
+++ b/src/main/java/com/zdjizhi/topology/ApplicationProtocolTopologyEventTime.java
@@ -0,0 +1,267 @@
+package com.zdjizhi.topology;
+
+import com.alibaba.fastjson2.JSON;
+import com.alibaba.fastjson2.JSONPath;
+import com.alibaba.fastjson2.JSONReader;
+import com.zdjizhi.common.config.MergeConfigs;
+import com.zdjizhi.common.config.MergeConfiguration;
+import com.zdjizhi.common.pojo.Data;
+import com.zdjizhi.common.pojo.ResultData;
+import com.zdjizhi.utils.kafka.KafkaConsumer;
+import com.zdjizhi.utils.kafka.KafkaProducer;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.flink.api.common.eventtime.WatermarkStrategy;
+import org.apache.flink.api.common.functions.AggregateFunction;
+import org.apache.flink.api.common.functions.FlatMapFunction;
+import org.apache.flink.api.common.functions.RichFlatMapFunction;
+import org.apache.flink.api.java.functions.KeySelector;
+import org.apache.flink.api.java.tuple.Tuple6;
+import org.apache.flink.api.java.utils.ParameterTool;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
+import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
+import org.apache.flink.streaming.api.windowing.time.Time;
+import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
+import org.apache.flink.util.Collector;
+import org.apache.flink.util.Preconditions;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.time.Duration;
+import java.util.Arrays;
+
+import static com.zdjizhi.common.config.MergeConfigs.*;
+
+/**
+ * @author lifengchao
+ * @Package com.zdjizhi.topology
+ * @Description:
+ * @date 2024/7/23 11:20
+ */
+public class ApplicationProtocolTopologyEventTime {
+ static final Logger LOG = LoggerFactory.getLogger(ApplicationProtocolTopologyEventTime.class);
+
+ public static void main(String[] args) throws Exception{
+ // param check
+ if (args.length < 1) {
+ throw new IllegalArgumentException("Error: Not found properties path. " +
+ "\nUsage: flink -c xxx xxx.jar app.properties.");
+ }
+
+ final StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment();
+
+ ParameterTool tool = ParameterTool.fromPropertiesFile(args[0]);
+ final Configuration config = tool.getConfiguration();
+ environment.getConfig().setGlobalJobParameters(config);
+ final MergeConfiguration fusionConfiguration = new MergeConfiguration(config);
+
+ //水印
+ WatermarkStrategy<Data> strategyForSession = WatermarkStrategy
+ .<Data>forBoundedOutOfOrderness(Duration.ofSeconds(config.get(WARTERMARK_MAX_ORDERNESS)))
+ .withTimestampAssigner((element, timestamp) -> element.getTimestamp_ms());
+
+ //数据源
+ DataStream<String> streamSource = environment.addSource(
+ KafkaConsumer.getKafkaConsumer(fusionConfiguration.getProperties(SOURCE_KAFKA_PROPERTIES_PREFIX),
+ config.get(SOURCE_KAFKA_TOPIC),
+ config.get(STARTUP_MODE)));
+
+ //解析数据
+ SingleOutputStreamOperator<Data> dataStream = streamSource.flatMap(parseFlatMapFunction()).assignTimestampsAndWatermarks(strategyForSession);
+
+ //聚合 + 拆分
+ SingleOutputStreamOperator<String> rstStream = dataStream.keyBy(keySelector())
+ .window(TumblingEventTimeWindows.of(Time.seconds(config.get(COUNT_WINDOW_TIME))))
+ .aggregate(aggregateFunction(), processWindowFunction());
+
+ //输出
+ rstStream.addSink(KafkaProducer.getKafkaProducer(fusionConfiguration.getProperties(SINK_KAFKA_PROPERTIES_PREFIX), config.get(SINK_KAFKA_TOPIC), config.get(LOG_FAILURES_ONLY)));
+
+ environment.execute(config.get(JOB_NAME));
+ }
+
+ private static KeySelector<Data, Tuple6<Integer, String, String, String, String, String>> keySelector(){
+ return new KeySelector<Data, Tuple6<Integer, String, String, String, String, String>>() {
+ @Override
+ public Tuple6<Integer, String, String, String, String, String> getKey(Data data) throws Exception {
+ return new Tuple6<>(data.vsys_id, data.device_id, data.device_group, data.data_center, data.decoded_path, data.app);
+ }
+ };
+ }
+
+ private static FlatMapFunction<String, Data> parseFlatMapFunction(){
+ return new RichFlatMapFunction<String, Data>() {
+ private JSONPath namePath;
+
+ @Override
+ public void open(Configuration parameters) throws Exception {
+ super.open(parameters);
+ namePath = JSONPath.of("$.name");
+ }
+
+ @Override
+ public void flatMap(String value, Collector<Data> out) throws Exception {
+ JSONReader parser = JSONReader.of(value);
+ try {
+ Object name = namePath.extract(parser);
+ if(!"traffic_application_protocol_stat".equals(name)){
+ return;
+ }
+
+ Data data = JSON.parseObject(value, Data.class);
+ String decodedPath = data.getDecoded_path();
+ if(StringUtils.isBlank(decodedPath)){
+ return;
+ }
+
+ String appFullPath = data.getApp();
+ if(StringUtils.isBlank(appFullPath)){
+ out.collect(data);
+ return;
+ }
+ // decoded_path和app进行拼接,格式化
+ String[] appSplits = appFullPath.split("\\.");
+ data.setApp(appSplits[appSplits.length -1]);
+ String firstAppProtocol = appSplits[0];
+ String endProtocol = decodedPath.substring(decodedPath.lastIndexOf(".") + 1);
+ if (endProtocol.equals(firstAppProtocol)) {
+ if(appSplits.length > 1){
+ decodedPath = decodedPath + appFullPath.substring(appFullPath.indexOf("."));
+ data.setDecoded_path(decodedPath);
+ }
+ }else{
+ decodedPath = decodedPath + "." + appFullPath;
+ data.setDecoded_path(decodedPath);
+ }
+ out.collect(data);
+ } catch (Exception e) {
+ LOG.error("parse error for value:" + value, e);
+ } finally {
+ parser.close();
+ }
+ }
+ };
+ }
+
+ private static AggregateFunction<Data, ResultData, ResultData> aggregateFunction(){
+ return new AggregateFunction<Data, ResultData, ResultData>() {
+
+ @Override
+ public ResultData createAccumulator() {
+ return new ResultData();
+ }
+
+ @Override
+ public ResultData add(Data value, ResultData acc) {
+ acc.sessions = acc.sessions + value.sessions;
+ acc.in_bytes = acc.in_bytes + value.in_bytes;
+ acc.out_bytes = acc.out_bytes + value.out_bytes;
+ acc.in_pkts = acc.in_pkts + value.in_pkts;
+ acc.out_pkts = acc.out_pkts + value.out_pkts;
+ acc.c2s_pkts = acc.c2s_pkts + value.c2s_pkts;
+ acc.s2c_pkts = acc.s2c_pkts + value.s2c_pkts;
+ acc.c2s_bytes = acc.c2s_bytes + value.c2s_bytes;
+ acc.s2c_bytes = acc.s2c_bytes + value.s2c_bytes;
+ acc.c2s_fragments = acc.c2s_fragments + value.c2s_fragments;
+ acc.s2c_fragments = acc.s2c_fragments + value.s2c_fragments;
+ acc.c2s_tcp_lost_bytes = acc.c2s_tcp_lost_bytes + value.c2s_tcp_lost_bytes;
+ acc.s2c_tcp_lost_bytes = acc.s2c_tcp_lost_bytes + value.s2c_tcp_lost_bytes;
+ acc.c2s_tcp_ooorder_pkts = acc.c2s_tcp_ooorder_pkts + value.c2s_tcp_ooorder_pkts;
+ acc.s2c_tcp_ooorder_pkts = acc.s2c_tcp_ooorder_pkts + value.s2c_tcp_ooorder_pkts;
+ acc.c2s_tcp_retransmitted_pkts = acc.c2s_tcp_retransmitted_pkts + value.c2s_tcp_retransmitted_pkts;
+ acc.s2c_tcp_retransmitted_pkts = acc.s2c_tcp_retransmitted_pkts + value.s2c_tcp_retransmitted_pkts;
+ acc.c2s_tcp_retransmitted_bytes = acc.c2s_tcp_retransmitted_bytes + value.c2s_tcp_retransmitted_bytes;
+ acc.s2c_tcp_retransmitted_bytes = acc.s2c_tcp_retransmitted_bytes + value.s2c_tcp_retransmitted_bytes;
+ return acc;
+ }
+
+ @Override
+ public ResultData getResult(ResultData acc) {
+ return acc;
+ }
+
+ @Override
+ public ResultData merge(ResultData a, ResultData b) {
+ a.sessions = a.sessions + b.sessions;
+ a.in_bytes = a.in_bytes + b.in_bytes;
+ a.out_bytes = a.out_bytes + b.out_bytes;
+ a.in_pkts = a.in_pkts + b.in_pkts;
+ a.out_pkts = a.out_pkts + b.out_pkts;
+ a.c2s_pkts = a.c2s_pkts + b.c2s_pkts;
+ a.s2c_pkts = a.s2c_pkts + b.s2c_pkts;
+ a.c2s_bytes = a.c2s_bytes + b.c2s_bytes;
+ a.s2c_bytes = a.s2c_bytes + b.s2c_bytes;
+ a.c2s_fragments = a.c2s_fragments + b.c2s_fragments;
+ a.s2c_fragments = a.s2c_fragments + b.s2c_fragments;
+ a.c2s_tcp_lost_bytes = a.c2s_tcp_lost_bytes + b.c2s_tcp_lost_bytes;
+ a.s2c_tcp_lost_bytes = a.s2c_tcp_lost_bytes + b.s2c_tcp_lost_bytes;
+ a.c2s_tcp_ooorder_pkts = a.c2s_tcp_ooorder_pkts + b.c2s_tcp_ooorder_pkts;
+ a.s2c_tcp_ooorder_pkts = a.s2c_tcp_ooorder_pkts + b.s2c_tcp_ooorder_pkts;
+ a.c2s_tcp_retransmitted_pkts = a.c2s_tcp_retransmitted_pkts + b.c2s_tcp_retransmitted_pkts;
+ a.s2c_tcp_retransmitted_pkts = a.s2c_tcp_retransmitted_pkts + b.s2c_tcp_retransmitted_pkts;
+ a.c2s_tcp_retransmitted_bytes = a.c2s_tcp_retransmitted_bytes + b.c2s_tcp_retransmitted_bytes;
+ a.s2c_tcp_retransmitted_bytes = a.s2c_tcp_retransmitted_bytes + b.s2c_tcp_retransmitted_bytes;
+ return a;
+ }
+ };
+ }
+
+ private static ProcessWindowFunction<ResultData, String, Tuple6<Integer, String, String, String, String, String>, TimeWindow> processWindowFunction(){
+ return new ProcessWindowFunction<ResultData, String, Tuple6<Integer, String, String, String, String, String>, TimeWindow>() {
+ private String NAME = null;
+
+ @Override
+ public void open(Configuration parameters) throws Exception {
+ super.open(parameters);
+ final Configuration configuration = (Configuration) getRuntimeContext().getExecutionConfig().getGlobalJobParameters();
+ NAME = configuration.get(MergeConfigs.MEASUREMENT_NAME);
+ Preconditions.checkArgument(StringUtils.isNotBlank(NAME));
+ }
+
+ @Override
+ public void process(Tuple6<Integer, String, String, String, String, String> key, ProcessWindowFunction<ResultData, String, Tuple6<Integer, String, String, String, String, String>, TimeWindow>.Context context, Iterable<ResultData> elements, Collector<String> out) throws Exception {
+ long timestamp_ms = context.window().getStart();
+ for (ResultData ele : elements) {
+ ele.timestamp_ms = timestamp_ms;
+ ele.name = NAME;
+ ele.vsys_id = key.f0;
+ ele.device_id = key.f1;
+ ele.device_group = key.f2;
+ ele.data_center = key.f3;
+ ele.app_name = null;
+ String decodedPath = key.f4;
+ String app = key.f5;
+
+ // 拆分
+ int index = decodedPath.indexOf('.');
+ String subDecodedPath;
+ while (index > 0) {
+ subDecodedPath = decodedPath.substring(0, index);
+ ele.protocol_stack_id = subDecodedPath;
+ out.collect(JSON.toJSONString(ele));
+ index = decodedPath.indexOf('.', index + 1);
+ }
+
+ ele.app_name = app;
+ ele.protocol_stack_id = decodedPath;
+ out.collect(JSON.toJSONString(ele));
+ }
+ }
+ };
+ }
+
+ private static DataStream<String> testSource(StreamExecutionEnvironment environment){
+ return environment.fromCollection(Arrays.asList(
+ "{\"name\":\"traffic_application_protocol_stat\",\"decoded_path\":\"ETHERNET.IPv4.TCP.ssh\",\"app\":\"ssh\",\"device_id\":\"9800165603191151\",\"device_group\":\"group-xxg-tsgx\",\"data_center\":\"center-xxg-tsgx\",\"vsys_id\":1,\"in_bytes\":720,\"out_bytes\":1200,\"in_pkts\":8,\"out_pkts\":16,\"c2s_pkts\":16,\"s2c_pkts\":8,\"c2s_bytes\":1200,\"s2c_bytes\":720,\"timestamp_ms\":1719990000033}",
+ "{\"name\":\"traffic_application_protocol_stat\",\"decoded_path\":\"ETHERNET.IPv4.TCP.http\",\"app\":\"Gary-ApplicationTest\",\"device_id\":\"9800165603191151\",\"device_group\":\"group-xxg-tsgx\",\"data_center\":\"center-xxg-tsgx\",\"vsys_id\":1,\"in_bytes\":2536,\"out_bytes\":2237,\"in_pkts\":8,\"out_pkts\":7,\"c2s_pkts\":7,\"s2c_pkts\":8,\"c2s_bytes\":2237,\"s2c_bytes\":2536,\"c2s_tcp_retransmitted_pkts\":2,\"c2s_tcp_retransmitted_bytes\":120,\"timestamp_ms\":1719990000033}",
+ "{\"name\":\"traffic_application_protocol_stat\",\"decoded_path\":\"ETHERNET.IPv4.TCP.http\",\"app\":\"ms_edge\",\"device_id\":\"9800165603191151\",\"device_group\":\"group-xxg-tsgx\",\"data_center\":\"center-xxg-tsgx\",\"vsys_id\":1,\"in_bytes\":326282,\"out_bytes\":8322,\"in_pkts\":241,\"out_pkts\":125,\"c2s_pkts\":125,\"s2c_pkts\":241,\"c2s_bytes\":8322,\"s2c_bytes\":326282,\"s2c_tcp_ooorder_pkts\":1,\"timestamp_ms\":1719990000033}",
+ "{\"name\":\"traffic_application_protocol_stat\",\"decoded_path\":\"ETHERNET.IPv4.TCP\",\"app\":\"port_443\",\"device_id\":\"9800165603191151\",\"device_group\":\"group-xxg-tsgx\",\"data_center\":\"center-xxg-tsgx\",\"vsys_id\":1,\"in_bytes\":66,\"in_pkts\":1,\"s2c_pkts\":1,\"s2c_bytes\":66,\"timestamp_ms\":1719990000033}",
+ "{\"name\":\"traffic_application_protocol_stat\",\"decoded_path\":\"ETHERNET.IPv4.TCP.ssl\",\"app\":\"port_443\",\"device_id\":\"9800165603191151\",\"device_group\":\"group-xxg-tsgx\",\"data_center\":\"center-xxg-tsgx\",\"vsys_id\":1,\"in_bytes\":66,\"out_bytes\":60,\"in_pkts\":1,\"out_pkts\":1,\"c2s_pkts\":1,\"s2c_pkts\":1,\"c2s_bytes\":60,\"s2c_bytes\":66,\"c2s_tcp_retransmitted_pkts\":1,\"c2s_tcp_retransmitted_bytes\":60,\"timestamp_ms\":1719990000033}",
+ "{\"name\":\"traffic_application_protocol_stat\",\"decoded_path\":\"ETHERNET.IPv4.TCP.ssl\",\"app\":\"ssl.port_444\",\"device_id\":\"9800165603191151\",\"device_group\":\"group-xxg-tsgx\",\"data_center\":\"center-xxg-tsgx\",\"vsys_id\":1,\"in_bytes\":66,\"out_bytes\":60,\"in_pkts\":1,\"out_pkts\":1,\"c2s_pkts\":1,\"s2c_pkts\":1,\"c2s_bytes\":60,\"s2c_bytes\":66,\"c2s_tcp_retransmitted_pkts\":1,\"c2s_tcp_retransmitted_bytes\":60,\"timestamp_ms\":1719990000033}",
+ "{}"
+ ));
+ }
+}
diff --git a/src/main/java/com/zdjizhi/topology/ApplicationProtocolTopologyTest.java b/src/main/java/com/zdjizhi/topology/ApplicationProtocolTopologyTest.java
new file mode 100644
index 0000000..f42c021
--- /dev/null
+++ b/src/main/java/com/zdjizhi/topology/ApplicationProtocolTopologyTest.java
@@ -0,0 +1,269 @@
+package com.zdjizhi.topology;
+
+import com.alibaba.fastjson2.JSON;
+import com.zdjizhi.common.config.MergeConfigs;
+import com.zdjizhi.common.config.MergeConfiguration;
+import com.zdjizhi.common.pojo.Data;
+import com.zdjizhi.common.pojo.ResultData;
+import com.zdjizhi.utils.kafka.KafkaConsumer;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.flink.api.common.functions.AggregateFunction;
+import org.apache.flink.api.common.functions.FlatMapFunction;
+import org.apache.flink.api.common.functions.RichFlatMapFunction;
+import org.apache.flink.api.java.functions.KeySelector;
+import org.apache.flink.api.java.tuple.Tuple7;
+import org.apache.flink.api.java.utils.ParameterTool;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.functions.sink.SinkFunction;
+import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
+import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
+import org.apache.flink.streaming.api.windowing.time.Time;
+import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
+import org.apache.flink.util.Collector;
+import org.apache.flink.util.Preconditions;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Arrays;
+
+import static com.zdjizhi.common.config.MergeConfigs.*;
+
+/**
+ * @author lifengchao
+ * @Package com.zdjizhi.topology
+ * @Description:
+ * @date 2024/7/23 11:20
+ */
+public class ApplicationProtocolTopologyTest {
+ static final Logger LOG = LoggerFactory.getLogger(ApplicationProtocolTopologyTest.class);
+
+ public static void main(String[] args) throws Exception{
+ // param check
+ if (args.length < 1) {
+ throw new IllegalArgumentException("Error: Not found properties path. " +
+ "\nUsage: flink -c xxx xxx.jar app.properties.");
+ }
+
+ final StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment();
+ environment.setParallelism(1);
+
+ ParameterTool tool = ParameterTool.fromPropertiesFile(args[0]);
+ final Configuration config = tool.getConfiguration();
+ environment.getConfig().setGlobalJobParameters(config);
+ final MergeConfiguration fusionConfiguration = new MergeConfiguration(config);
+
+ //数据源
+ //DataStream<String> streamSource = testSource(environment);
+ DataStream<String> streamSource = environment.addSource(
+ KafkaConsumer.getKafkaConsumer(fusionConfiguration.getProperties(SOURCE_KAFKA_PROPERTIES_PREFIX),
+ config.get(SOURCE_KAFKA_TOPIC),
+ config.get(STARTUP_MODE)));
+
+ final long granularityMs = config.get(GRANULARITY_SECOND) * 1000L;
+ Preconditions.checkArgument(granularityMs >= 1000L && granularityMs <= 60000L);
+ LOG.info("granularityMs:{}", granularityMs);
+
+ //解析数据
+ SingleOutputStreamOperator<Data> dataStream = streamSource.flatMap(parseFlatMapFunction(granularityMs));
+
+ //聚合 + 拆分
+ SingleOutputStreamOperator<String> rstStream = dataStream.keyBy(keySelector())
+ .window(TumblingProcessingTimeWindows.of(Time.seconds(config.get(COUNT_WINDOW_TIME))))
+ .aggregate(aggregateFunction(), processWindowFunction());
+
+ //输出
+ rstStream.addSink(new SinkFunction<String>() {
+ @Override
+ public void invoke(String value, Context context) throws Exception {
+ System.out.println(value);
+ }
+ });
+
+ environment.execute(config.get(JOB_NAME));
+ }
+
+ private static KeySelector<Data, Tuple7<Long, Integer, String, String, String, String, String>> keySelector(){
+ return new KeySelector<Data, Tuple7<Long, Integer, String, String, String, String, String>>() {
+ @Override
+ public Tuple7<Long, Integer, String, String, String, String, String> getKey(Data data) throws Exception {
+ return new Tuple7<>(data.timestamp_ms, data.vsys_id, data.device_id, data.device_group, data.data_center, data.decoded_path, data.app);
+ }
+ };
+ }
+
+ private static FlatMapFunction<String, Data> parseFlatMapFunction(final long granularityMs){
+ return new RichFlatMapFunction<String, Data>() {
+
+ @Override
+ public void open(Configuration parameters) throws Exception {
+ super.open(parameters);
+ LOG.info("granularityMs:{}", granularityMs);
+ }
+
+ @Override
+ public void flatMap(String value, Collector<Data> out) throws Exception {
+ try {
+ // 先快速近似过滤
+ if(!value.contains("traffic_application_protocol_stat")){
+ return;
+ }
+ Data data = JSON.parseObject(value, Data.class);
+ // 精确过滤
+ if(!"traffic_application_protocol_stat".equals(data.name)){
+ return;
+ }
+ data.timestamp_ms = data.timestamp_ms / granularityMs * granularityMs;
+
+ String decodedPath = data.getDecoded_path();
+ if(StringUtils.isBlank(decodedPath)){
+ return;
+ }
+
+ String appFullPath = data.getApp();
+ if(StringUtils.isBlank(appFullPath)){
+ out.collect(data);
+ return;
+ }
+ // decoded_path和app进行拼接,格式化
+ String[] appSplits = appFullPath.split("\\.");
+ data.setApp(appSplits[appSplits.length -1]);
+ String firstAppProtocol = appSplits[0];
+ String endProtocol = decodedPath.substring(decodedPath.lastIndexOf(".") + 1);
+ if (endProtocol.equals(firstAppProtocol)) {
+ if(appSplits.length > 1){
+ decodedPath = decodedPath + appFullPath.substring(appFullPath.indexOf("."));
+ data.setDecoded_path(decodedPath);
+ }
+ }else{
+ decodedPath = decodedPath + "." + appFullPath;
+ data.setDecoded_path(decodedPath);
+ }
+ out.collect(data);
+ } catch (Exception e) {
+ LOG.error("parse error for value:" + value, e);
+ }
+ }
+ };
+ }
+
+ /**
+  * Builds the incremental aggregator used by the window: every counter of each incoming
+  * {@code Data} record is summed into one {@code ResultData} accumulator, which is also
+  * returned as the aggregation result.
+  *
+  * @return an aggregate function whose accumulator type and result type are both {@code ResultData}
+  */
+ private static AggregateFunction<Data, ResultData, ResultData> aggregateFunction(){
+     return new AggregateFunction<Data, ResultData, ResultData>() {
+
+         /** Each aggregation starts from a freshly constructed {@code ResultData}. */
+         @Override
+         public ResultData createAccumulator() {
+             return new ResultData();
+         }
+
+         /** Adds every counter of {@code input} onto {@code total} and returns the same {@code total}. */
+         @Override
+         public ResultData add(Data input, ResultData total) {
+             // Session and direction-agnostic traffic counters.
+             total.sessions += input.sessions;
+             total.in_bytes += input.in_bytes;
+             total.out_bytes += input.out_bytes;
+             total.in_pkts += input.in_pkts;
+             total.out_pkts += input.out_pkts;
+             // Client-to-server / server-to-client packet, byte and fragment counters.
+             total.c2s_pkts += input.c2s_pkts;
+             total.s2c_pkts += input.s2c_pkts;
+             total.c2s_bytes += input.c2s_bytes;
+             total.s2c_bytes += input.s2c_bytes;
+             total.c2s_fragments += input.c2s_fragments;
+             total.s2c_fragments += input.s2c_fragments;
+             // TCP loss, out-of-order and retransmission counters.
+             total.c2s_tcp_lost_bytes += input.c2s_tcp_lost_bytes;
+             total.s2c_tcp_lost_bytes += input.s2c_tcp_lost_bytes;
+             total.c2s_tcp_ooorder_pkts += input.c2s_tcp_ooorder_pkts;
+             total.s2c_tcp_ooorder_pkts += input.s2c_tcp_ooorder_pkts;
+             total.c2s_tcp_retransmitted_pkts += input.c2s_tcp_retransmitted_pkts;
+             total.s2c_tcp_retransmitted_pkts += input.s2c_tcp_retransmitted_pkts;
+             total.c2s_tcp_retransmitted_bytes += input.c2s_tcp_retransmitted_bytes;
+             total.s2c_tcp_retransmitted_bytes += input.s2c_tcp_retransmitted_bytes;
+             return total;
+         }
+
+         /** Folds the counters of {@code right} into {@code left} and returns {@code left}. */
+         @Override
+         public ResultData merge(ResultData left, ResultData right) {
+             // Session and direction-agnostic traffic counters.
+             left.sessions += right.sessions;
+             left.in_bytes += right.in_bytes;
+             left.out_bytes += right.out_bytes;
+             left.in_pkts += right.in_pkts;
+             left.out_pkts += right.out_pkts;
+             // Client-to-server / server-to-client packet, byte and fragment counters.
+             left.c2s_pkts += right.c2s_pkts;
+             left.s2c_pkts += right.s2c_pkts;
+             left.c2s_bytes += right.c2s_bytes;
+             left.s2c_bytes += right.s2c_bytes;
+             left.c2s_fragments += right.c2s_fragments;
+             left.s2c_fragments += right.s2c_fragments;
+             // TCP loss, out-of-order and retransmission counters.
+             left.c2s_tcp_lost_bytes += right.c2s_tcp_lost_bytes;
+             left.s2c_tcp_lost_bytes += right.s2c_tcp_lost_bytes;
+             left.c2s_tcp_ooorder_pkts += right.c2s_tcp_ooorder_pkts;
+             left.s2c_tcp_ooorder_pkts += right.s2c_tcp_ooorder_pkts;
+             left.c2s_tcp_retransmitted_pkts += right.c2s_tcp_retransmitted_pkts;
+             left.s2c_tcp_retransmitted_pkts += right.s2c_tcp_retransmitted_pkts;
+             left.c2s_tcp_retransmitted_bytes += right.c2s_tcp_retransmitted_bytes;
+             left.s2c_tcp_retransmitted_bytes += right.s2c_tcp_retransmitted_bytes;
+             return left;
+         }
+
+         /** The accumulator itself is the result; no conversion is applied. */
+         @Override
+         public ResultData getResult(ResultData total) {
+             return total;
+         }
+     };
+ }
+
+ /**
+  * Builds the window function that serializes each aggregated {@code ResultData} to JSON.
+  * For every element it emits one row per dot-terminated prefix of the protocol path taken
+  * from the key (with {@code app_name} left null), followed by one row for the full path
+  * carrying the application name from the key.
+  *
+  * @return a process window function that emits JSON strings
+  */
+ private static ProcessWindowFunction<ResultData, String, Tuple7<Long, Integer, String, String, String, String, String>, TimeWindow> processWindowFunction(){
+     return new ProcessWindowFunction<ResultData, String, Tuple7<Long, Integer, String, String, String, String, String>, TimeWindow>() {
+         // Measurement name written into every emitted row; loaded in open().
+         private String measurementName;
+
+         /** Reads the measurement name from the global job parameters and rejects a blank value. */
+         @Override
+         public void open(Configuration parameters) throws Exception {
+             super.open(parameters);
+             final Configuration jobConfig = (Configuration) getRuntimeContext().getExecutionConfig().getGlobalJobParameters();
+             measurementName = jobConfig.get(MergeConfigs.MEASUREMENT_NAME);
+             Preconditions.checkArgument(StringUtils.isNotBlank(measurementName));
+         }
+
+         /** Stamps the key fields onto each element and emits the per-prefix and full-path rows. */
+         @Override
+         public void process(Tuple7<Long, Integer, String, String, String, String, String> key, ProcessWindowFunction<ResultData, String, Tuple7<Long, Integer, String, String, String, String, String>, TimeWindow>.Context context, Iterable<ResultData> elements, Collector<String> out) throws Exception {
+             final String fullPath = key.f5;
+             final String appName = key.f6;
+
+             for (ResultData row : elements) {
+                 // Copy the grouping key back onto the aggregated row.
+                 row.name = measurementName;
+                 row.timestamp_ms = key.f0;
+                 row.vsys_id = key.f1;
+                 row.device_id = key.f2;
+                 row.device_group = key.f3;
+                 row.data_center = key.f4;
+                 row.app_name = null;
+
+                 // Split: emit one row for each prefix that ends just before a '.' separator.
+                 for (int dot = fullPath.indexOf('.'); dot > 0; dot = fullPath.indexOf('.', dot + 1)) {
+                     row.protocol_stack_id = fullPath.substring(0, dot);
+                     out.collect(JSON.toJSONString(row));
+                 }
+
+                 // Final row: the complete path, tagged with the application name.
+                 row.app_name = appName;
+                 row.protocol_stack_id = fullPath;
+                 out.collect(JSON.toJSONString(row));
+             }
+         }
+     };
+ }
+
+ /**
+  * Creates a bounded in-memory source of hard-coded sample JSON records, ending with an
+  * empty JSON object, for running the topology without an external input.
+  *
+  * @param environment the stream execution environment the source is registered on
+  * @return a stream of the sample JSON strings
+  */
+ private static DataStream<String> testSource(StreamExecutionEnvironment environment){
+     final String[] sampleRecords = {
+         "{\"name\":\"traffic_application_protocol_stat\",\"decoded_path\":\"ETHERNET.IPv4.TCP.ssh\",\"app\":\"ssh\",\"device_id\":\"9800165603191151\",\"device_group\":\"group-xxg-tsgx\",\"data_center\":\"center-xxg-tsgx\",\"vsys_id\":1,\"in_bytes\":720,\"out_bytes\":1200,\"in_pkts\":8,\"out_pkts\":16,\"c2s_pkts\":16,\"s2c_pkts\":8,\"c2s_bytes\":1200,\"s2c_bytes\":720,\"timestamp_ms\":1719990000033}",
+         "{\"name\":\"traffic_application_protocol_stat\",\"decoded_path\":\"ETHERNET.IPv4.TCP.http\",\"app\":\"Gary-ApplicationTest\",\"device_id\":\"9800165603191151\",\"device_group\":\"group-xxg-tsgx\",\"data_center\":\"center-xxg-tsgx\",\"vsys_id\":1,\"in_bytes\":2536,\"out_bytes\":2237,\"in_pkts\":8,\"out_pkts\":7,\"c2s_pkts\":7,\"s2c_pkts\":8,\"c2s_bytes\":2237,\"s2c_bytes\":2536,\"c2s_tcp_retransmitted_pkts\":2,\"c2s_tcp_retransmitted_bytes\":120,\"timestamp_ms\":1719990000033}",
+         "{\"name\":\"traffic_application_protocol_stat\",\"decoded_path\":\"ETHERNET.IPv4.TCP.http\",\"app\":\"ms_edge\",\"device_id\":\"9800165603191151\",\"device_group\":\"group-xxg-tsgx\",\"data_center\":\"center-xxg-tsgx\",\"vsys_id\":1,\"in_bytes\":326282,\"out_bytes\":8322,\"in_pkts\":241,\"out_pkts\":125,\"c2s_pkts\":125,\"s2c_pkts\":241,\"c2s_bytes\":8322,\"s2c_bytes\":326282,\"s2c_tcp_ooorder_pkts\":1,\"timestamp_ms\":1719990000033}",
+         "{\"name\":\"traffic_application_protocol_stat\",\"decoded_path\":\"ETHERNET.IPv4.TCP\",\"app\":\"port_443\",\"device_id\":\"9800165603191151\",\"device_group\":\"group-xxg-tsgx\",\"data_center\":\"center-xxg-tsgx\",\"vsys_id\":1,\"in_bytes\":66,\"in_pkts\":1,\"s2c_pkts\":1,\"s2c_bytes\":66,\"timestamp_ms\":1719990000033}",
+         "{\"name\":\"traffic_application_protocol_stat\",\"decoded_path\":\"ETHERNET.IPv4.TCP.ssl\",\"app\":\"port_443\",\"device_id\":\"9800165603191152\",\"device_group\":\"group-xxg-tsgx\",\"data_center\":\"center-xxg-tsgx\",\"vsys_id\":1,\"in_bytes\":66,\"out_bytes\":60,\"in_pkts\":1,\"out_pkts\":1,\"c2s_pkts\":1,\"s2c_pkts\":1,\"c2s_bytes\":60,\"s2c_bytes\":66,\"c2s_tcp_retransmitted_pkts\":1,\"c2s_tcp_retransmitted_bytes\":60,\"timestamp_ms\":1719990002033}",
+         "{\"name\":\"traffic_application_protocol_stat\",\"decoded_path\":\"ETHERNET.IPv4.TCP.ssl\",\"app\":\"ssl.port_444\",\"device_id\":\"9800165603191153\",\"device_group\":\"group-xxg-tsgx\",\"data_center\":\"center-xxg-tsgx\",\"vsys_id\":1,\"in_bytes\":66,\"out_bytes\":60,\"in_pkts\":1,\"out_pkts\":1,\"c2s_pkts\":1,\"s2c_pkts\":1,\"c2s_bytes\":60,\"s2c_bytes\":66,\"c2s_tcp_retransmitted_pkts\":1,\"c2s_tcp_retransmitted_bytes\":60,\"timestamp_ms\":1719990003033}",
+         "{}"
+     };
+     return environment.fromCollection(Arrays.asList(sampleRecords));
+ }
+}
diff --git a/src/main/java/com/zdjizhi/utils/functions/map/ResultFlatMap.java b/src/main/java/com/zdjizhi/utils/functions/map/ResultFlatMap.java
index 28c01f5..4a170a8 100644
--- a/src/main/java/com/zdjizhi/utils/functions/map/ResultFlatMap.java
+++ b/src/main/java/com/zdjizhi/utils/functions/map/ResultFlatMap.java
@@ -5,7 +5,7 @@ import cn.hutool.log.LogFactory;
import com.alibaba.fastjson2.JSONObject;
import com.zdjizhi.common.pojo.Metrics;
import com.zdjizhi.common.pojo.Tags;
-import com.zdjizhi.utils.StringUtil;
+import org.apache.commons.lang3.StringUtils;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.util.Collector;
@@ -35,7 +35,7 @@ public class ResultFlatMap implements FlatMapFunction<Metrics, String> {
String[] protocolIds = protocolStackId.split(PROTOCOL_SPLITTER);
int protocolIdsNum = protocolIds.length;
for (int i = 0; i < protocolIdsNum - 1; i++) {
- if (StringUtil.isBlank(stringBuilder.toString())) {
+ if (StringUtils.isBlank(stringBuilder.toString())) {
stringBuilder.append(protocolIds[i]);
tags.setProtocol_stack_id(stringBuilder.toString());
metrics.setTags(tags);
diff --git a/src/main/java/com/zdjizhi/utils/functions/process/ParsingData.java b/src/main/java/com/zdjizhi/utils/functions/process/ParsingData.java
index 0f06a26..65302f5 100644
--- a/src/main/java/com/zdjizhi/utils/functions/process/ParsingData.java
+++ b/src/main/java/com/zdjizhi/utils/functions/process/ParsingData.java
@@ -7,7 +7,7 @@ import com.alibaba.fastjson2.JSONObject;
import com.alibaba.fastjson2.JSONPath;
import com.zdjizhi.common.pojo.Fields;
import com.zdjizhi.common.pojo.Tags;
-import com.zdjizhi.utils.StringUtil;
+import org.apache.commons.lang3.StringUtils;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.util.Collector;
@@ -32,7 +32,7 @@ public class ParsingData extends ProcessFunction<String, Tuple3<Tags, Fields, Lo
@Override
public void processElement(String value, ProcessFunction<String, Tuple3<Tags, Fields, Long>>.Context ctx, Collector<Tuple3<Tags, Fields, Long>> out) {
try {
- if (StringUtil.isNotBlank(value)) {
+ if (StringUtils.isNotBlank(value)) {
Object isProtocolData = JSONPath.eval(value, dataTypeExpr);
if (isProtocolData != null) {
JSONObject originalLog = JSON.parseObject(value);
@@ -44,7 +44,7 @@ public class ParsingData extends ProcessFunction<String, Tuple3<Tags, Fields, Lo
Tags tags = JSONObject.parseObject(originalLog.getString("tags"), Tags.class);
Long timestamp_ms = originalLog.getLong("timestamp_ms");
- if (StringUtil.isNotBlank(tags.getProtocol_stack_id())) {
+ if (StringUtils.isNotBlank(tags.getProtocol_stack_id())) {
joinProtocol(tags);
out.collect(new Tuple3<>(tags, fields, timestamp_ms));
@@ -85,7 +85,7 @@ public class ParsingData extends ProcessFunction<String, Tuple3<Tags, Fields, Lo
private static void joinProtocol(Tags tags) {
String appFullPath = tags.getApp_name();
- if (StringUtil.isNotBlank(appFullPath)) {
+ if (StringUtils.isNotBlank(appFullPath)) {
String appName = appFullPath.substring(appFullPath.lastIndexOf(".") + 1);
tags.setApp_name(appName);
diff --git a/src/test/java/com/zdjizhi/ConventionalTest.java b/src/test/java/com/zdjizhi/ConventionalTest.java
index 9e5f185..f3339e8 100644
--- a/src/test/java/com/zdjizhi/ConventionalTest.java
+++ b/src/test/java/com/zdjizhi/ConventionalTest.java
@@ -1,6 +1,6 @@
package com.zdjizhi;
-import com.zdjizhi.utils.StringUtil;
+import org.apache.commons.lang3.StringUtils;
import org.junit.Test;
import java.util.Arrays;
@@ -23,7 +23,7 @@ public class ConventionalTest {
String appName = "qq_r2";
String[] protocolIds = protocol.split("\\.");
for (String proto : protocolIds) {
- if (StringUtil.isBlank(stringBuffer.toString())) {
+ if (StringUtils.isBlank(stringBuffer.toString())) {
stringBuffer.append(proto);
System.out.println(stringBuffer.toString());
} else {
@@ -50,7 +50,7 @@ public class ConventionalTest {
StringBuilder stringBuilder = new StringBuilder();
for (int i = 0; i < protocol.split(str).length - 1; i++) {
String value = protocol.split(str)[i];
- if (StringUtil.isBlank(stringBuilder.toString())) {
+ if (StringUtils.isBlank(stringBuilder.toString())) {
stringBuilder.append(value);
System.out.println(stringBuilder.toString());
} else {