8 files changed, 602 insertions, 54 deletions
diff --git a/pom.xml b/pom.xml
@@ -6,7 +6,7 @@
     <groupId>com.zdjizhi</groupId>
     <artifactId>app-protocol-stat-traffic-merge</artifactId>
-    <version>2.3.0</version>
+    <version>2.3.1</version>
 
     <name>app-protocol-stat-traffic-merge</name>
     <url>http://www.example.com</url>
@@ -115,20 +115,20 @@
     </build>
 
     <dependencies>
-        <dependency>
+        <!--<dependency>
             <groupId>cglib</groupId>
             <artifactId>cglib-nodep</artifactId>
             <version>3.2.4</version>
-        </dependency>
+        </dependency>-->
 
         <dependency>
             <groupId>org.junit.jupiter</groupId>
             <artifactId>junit-jupiter-api</artifactId>
             <version>5.3.2</version>
-            <scope>compile</scope>
+            <scope>test</scope>
         </dependency>
 
-        <dependency>
+        <!-- <dependency>
             <groupId>com.zdjizhi</groupId>
             <artifactId>galaxy</artifactId>
             <version>${zdjz.tools.version}</version>
@@ -142,7 +142,7 @@
                     <groupId>org.slf4j</groupId>
                 </exclusion>
             </exclusions>
-        </dependency>
+        </dependency>-->
 
         <!-- https://mvnrepository.com/artifact/org.apache.flink/flink-core -->
         <dependency>
@@ -175,6 +175,18 @@
             <artifactId>flink-connector-kafka_2.12</artifactId>
             <version>${flink.version}</version>
             <!--<scope>${scope.type}</scope>-->
+            <exclusions>
+                <exclusion>
+                    <groupId>org.xerial.snappy</groupId>
+                    <artifactId>snappy-java</artifactId>
+                </exclusion>
+            </exclusions>
+        </dependency>
+
+        <dependency>
+            <groupId>org.xerial.snappy</groupId>
+            <artifactId>snappy-java</artifactId>
+            <version>1.1.8.3</version>
         </dependency>
 
         <!-- https://mvnrepository.com/artifact/org.apache.flink/flink-java -->
@@ -192,11 +204,11 @@
         </dependency>
 
         <!-- https://mvnrepository.com/artifact/org.jasypt/jasypt -->
-        <dependency>
+        <!-- <dependency>
             <groupId>org.jasypt</groupId>
             <artifactId>jasypt</artifactId>
             <version>${jasypt.version}</version>
-        </dependency>
+        </dependency>-->
 
         <dependency>
             <groupId>junit</groupId>
diff --git a/src/main/java/com/zdjizhi/common/config/MergeConfigs.java b/src/main/java/com/zdjizhi/common/config/MergeConfigs.java
index 8cf604a..c17dabf 100644
--- a/src/main/java/com/zdjizhi/common/config/MergeConfigs.java
+++ b/src/main/java/com/zdjizhi/common/config/MergeConfigs.java
@@ -35,6 +35,11 @@ public class MergeConfigs {
                     .noDefaultValue()
                     .withDescription("The Kafka topic used in the sink.");
 
+    public static final ConfigOption<Long> GRANULARITY_SECOND =
+            ConfigOptions.key("granularity.second")
+                    .longType()
+                    .defaultValue(1L)
+                    .withDescription("granularity second");
 
     public static final ConfigOption<Integer> COUNT_WINDOW_TIME =
             ConfigOptions.key("count.window.time")
diff --git a/src/main/java/com/zdjizhi/topology/ApplicationProtocolTopology.java b/src/main/java/com/zdjizhi/topology/ApplicationProtocolTopology.java
index 26111e0..66e6603 100644
--- a/src/main/java/com/zdjizhi/topology/ApplicationProtocolTopology.java
+++ b/src/main/java/com/zdjizhi/topology/ApplicationProtocolTopology.java
@@ -1,27 +1,24 @@
 package com.zdjizhi.topology;
 
 import com.alibaba.fastjson2.JSON;
-import com.alibaba.fastjson2.JSONPath;
-import com.alibaba.fastjson2.JSONReader;
 import com.zdjizhi.common.config.MergeConfigs;
 import com.zdjizhi.common.config.MergeConfiguration;
 import com.zdjizhi.common.pojo.*;
 import com.zdjizhi.utils.kafka.KafkaConsumer;
 import com.zdjizhi.utils.kafka.KafkaProducer;
 import org.apache.commons.lang3.StringUtils;
-import org.apache.flink.api.common.eventtime.*;
 import org.apache.flink.api.common.functions.AggregateFunction;
 import org.apache.flink.api.common.functions.FlatMapFunction;
 import org.apache.flink.api.common.functions.RichFlatMapFunction;
 import org.apache.flink.api.java.functions.KeySelector;
-import org.apache.flink.api.java.tuple.Tuple6;
+import org.apache.flink.api.java.tuple.Tuple7;
 import org.apache.flink.api.java.utils.ParameterTool;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
-import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
+import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
 import org.apache.flink.streaming.api.windowing.time.Time;
 import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
 import org.apache.flink.util.Collector;
@@ -29,7 +26,6 @@ import org.apache.flink.util.Preconditions;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.time.Duration;
 import java.util.Arrays;
 
 import static com.zdjizhi.common.config.MergeConfigs.*;
@@ -57,23 +53,22 @@ public class ApplicationProtocolTopology {
         environment.getConfig().setGlobalJobParameters(config);
         final MergeConfiguration fusionConfiguration = new MergeConfiguration(config);
 
-        // Watermark
-        WatermarkStrategy<Data> strategyForSession = WatermarkStrategy
-                .<Data>forBoundedOutOfOrderness(Duration.ofSeconds(config.get(WARTERMARK_MAX_ORDERNESS)))
-                .withTimestampAssigner((element, timestamp) -> element.getTimestamp_ms());
-
         // Data source
         DataStream<String> streamSource = environment.addSource(
                 KafkaConsumer.getKafkaConsumer(fusionConfiguration.getProperties(SOURCE_KAFKA_PROPERTIES_PREFIX),
                         config.get(SOURCE_KAFKA_TOPIC),
                         config.get(STARTUP_MODE)));
 
+        final long granularityMs = config.get(GRANULARITY_SECOND) * 1000L;
+        Preconditions.checkArgument(granularityMs >= 1000L && granularityMs <= 60000L);
+        LOG.warn("granularityMs:{}", granularityMs);
+
         // Parse data
-        SingleOutputStreamOperator<Data> dataStream = streamSource.flatMap(parseFlatMapFunction()).assignTimestampsAndWatermarks(strategyForSession);
+        SingleOutputStreamOperator<Data> dataStream = streamSource.flatMap(parseFlatMapFunction(granularityMs));
 
         // Aggregate + split
         SingleOutputStreamOperator<String> rstStream = dataStream.keyBy(keySelector())
-                .window(TumblingEventTimeWindows.of(Time.seconds(config.get(COUNT_WINDOW_TIME))))
+                .window(TumblingProcessingTimeWindows.of(Time.seconds(config.get(COUNT_WINDOW_TIME))))
                 .aggregate(aggregateFunction(), processWindowFunction());
 
         // Output
@@ -82,35 +77,38 @@
         environment.execute(config.get(JOB_NAME));
     }
 
-    private static KeySelector<Data, Tuple6<Integer, String, String, String, String, String>> keySelector(){
-        return new KeySelector<Data, Tuple6<Integer, String, String, String, String, String>>() {
+    private static KeySelector<Data, Tuple7<Long, Integer, String, String, String, String, String>> keySelector(){
+        return new KeySelector<Data, Tuple7<Long, Integer, String, String, String, String, String>>() {
             @Override
-            public Tuple6<Integer, String, String, String, String, String> getKey(Data data) throws Exception {
-                return new Tuple6<>(data.vsys_id, data.device_id, data.device_group, data.data_center, data.decoded_path, data.app);
+            public Tuple7<Long, Integer, String, String, String, String, String> getKey(Data data) throws Exception {
+                return new Tuple7<>(data.timestamp_ms, data.vsys_id, data.device_id, data.device_group, data.data_center, data.decoded_path, data.app);
             }
         };
     }
 
-    private static FlatMapFunction<String, Data> parseFlatMapFunction(){
+    private static FlatMapFunction<String, Data> parseFlatMapFunction(final long granularityMs){
         return new RichFlatMapFunction<String, Data>() {
-            private JSONPath namePath;
 
             @Override
             public void open(Configuration parameters) throws Exception {
                 super.open(parameters);
-                namePath = JSONPath.of("$.name");
+                LOG.warn("granularityMs:{}", granularityMs);
             }
 
             @Override
             public void flatMap(String value, Collector<Data> out) throws Exception {
-                JSONReader parser = JSONReader.of(value);
                 try {
-                    Object name = namePath.extract(parser);
-                    if(!"traffic_application_protocol_stat".equals(name)){
+                    // Fast approximate filter first
+                    if(!value.contains("traffic_application_protocol_stat")){
                         return;
                     }
                     Data data = JSON.parseObject(value, Data.class);
+                    // Exact filter
+                    if(!"traffic_application_protocol_stat".equals(data.name)){
+                        return;
+                    }
+                    data.timestamp_ms = data.timestamp_ms / granularityMs * granularityMs;
+
                     String decodedPath = data.getDecoded_path();
                     if(StringUtils.isBlank(decodedPath)){
                         return;
                     }
@@ -138,8 +136,6 @@ public class ApplicationProtocolTopology {
                     out.collect(data);
                 } catch (Exception e) {
                     LOG.error("parse error for value:" + value, e);
-                } finally {
-                    parser.close();
                 }
             }
         };
@@ -208,8 +204,8 @@ public class ApplicationProtocolTopology {
         };
     }
 
-    private static ProcessWindowFunction<ResultData, String, Tuple6<Integer, String, String, String, String, String>, TimeWindow> processWindowFunction(){
-        return new ProcessWindowFunction<ResultData, String, Tuple6<Integer, String, String, String, String, String>, TimeWindow>() {
+    private static ProcessWindowFunction<ResultData, String, Tuple7<Long, Integer, String, String, String, String, String>, TimeWindow> processWindowFunction(){
+        return new ProcessWindowFunction<ResultData, String, Tuple7<Long, Integer, String, String, String, String, String>, TimeWindow>() {
             private String NAME = null;
 
             @Override
@@ -221,18 +217,17 @@
             }
 
             @Override
-            public void process(Tuple6<Integer, String, String, String, String, String> key, ProcessWindowFunction<ResultData, String, Tuple6<Integer, String, String, String, String, String>, TimeWindow>.Context context, Iterable<ResultData> elements, Collector<String> out) throws Exception {
-                long timestamp_ms = context.window().getStart();
+            public void process(Tuple7<Long, Integer, String, String, String, String, String> key, ProcessWindowFunction<ResultData, String, Tuple7<Long, Integer, String, String, String, String, String>, TimeWindow>.Context context, Iterable<ResultData> elements, Collector<String> out) throws Exception {
                 for (ResultData ele : elements) {
-                    ele.timestamp_ms = timestamp_ms;
                     ele.name = NAME;
-                    ele.vsys_id = key.f0;
-                    ele.device_id = key.f1;
-                    ele.device_group = key.f2;
-                    ele.data_center = key.f3;
+                    ele.timestamp_ms = key.f0;
+                    ele.vsys_id = key.f1;
+                    ele.device_id = key.f2;
+                    ele.device_group = key.f3;
+                    ele.data_center = key.f4;
                     ele.app_name = null;
-                    String decodedPath = key.f4;
-                    String app = key.f5;
+                    String decodedPath = key.f5;
+                    String app = key.f6;
 
                     // Split
                     int index = decodedPath.indexOf('.');
diff --git a/src/main/java/com/zdjizhi/topology/ApplicationProtocolTopologyEventTime.java b/src/main/java/com/zdjizhi/topology/ApplicationProtocolTopologyEventTime.java
new file mode 100644
index 0000000..45edd6b
--- /dev/null
+++ b/src/main/java/com/zdjizhi/topology/ApplicationProtocolTopologyEventTime.java
@@ -0,0 +1,267 @@
+package com.zdjizhi.topology;
+
+import com.alibaba.fastjson2.JSON;
+import com.alibaba.fastjson2.JSONPath;
+import com.alibaba.fastjson2.JSONReader;
+import com.zdjizhi.common.config.MergeConfigs;
+import com.zdjizhi.common.config.MergeConfiguration;
+import com.zdjizhi.common.pojo.Data;
+import com.zdjizhi.common.pojo.ResultData;
+import com.zdjizhi.utils.kafka.KafkaConsumer;
+import com.zdjizhi.utils.kafka.KafkaProducer;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.flink.api.common.eventtime.WatermarkStrategy;
+import org.apache.flink.api.common.functions.AggregateFunction;
+import org.apache.flink.api.common.functions.FlatMapFunction;
+import org.apache.flink.api.common.functions.RichFlatMapFunction;
+import org.apache.flink.api.java.functions.KeySelector;
+import org.apache.flink.api.java.tuple.Tuple6;
+import org.apache.flink.api.java.utils.ParameterTool;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
+import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
+import org.apache.flink.streaming.api.windowing.time.Time;
+import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
+import org.apache.flink.util.Collector;
+import org.apache.flink.util.Preconditions;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.time.Duration;
+import java.util.Arrays;
+
+import static com.zdjizhi.common.config.MergeConfigs.*;
+
+/**
+ * @author lifengchao
+ * @Package com.zdjizhi.topology
+ * @Description:
+ * @date 2024/7/23 11:20
+ */
+public class ApplicationProtocolTopologyEventTime {
+    static final Logger LOG = LoggerFactory.getLogger(ApplicationProtocolTopologyEventTime.class);
+
+    public static void main(String[] args) throws Exception{
+        // param check
+        if (args.length < 1) {
+            throw new IllegalArgumentException("Error: Not found properties path. " +
" + + "\nUsage: flink -c xxx xxx.jar app.properties."); + } + + final StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment(); + + ParameterTool tool = ParameterTool.fromPropertiesFile(args[0]); + final Configuration config = tool.getConfiguration(); + environment.getConfig().setGlobalJobParameters(config); + final MergeConfiguration fusionConfiguration = new MergeConfiguration(config); + + //水印 + WatermarkStrategy<Data> strategyForSession = WatermarkStrategy + .<Data>forBoundedOutOfOrderness(Duration.ofSeconds(config.get(WARTERMARK_MAX_ORDERNESS))) + .withTimestampAssigner((element, timestamp) -> element.getTimestamp_ms()); + + //数据源 + DataStream<String> streamSource = environment.addSource( + KafkaConsumer.getKafkaConsumer(fusionConfiguration.getProperties(SOURCE_KAFKA_PROPERTIES_PREFIX), + config.get(SOURCE_KAFKA_TOPIC), + config.get(STARTUP_MODE))); + + //解析数据 + SingleOutputStreamOperator<Data> dataStream = streamSource.flatMap(parseFlatMapFunction()).assignTimestampsAndWatermarks(strategyForSession); + + //聚合 + 拆分 + SingleOutputStreamOperator<String> rstStream = dataStream.keyBy(keySelector()) + .window(TumblingEventTimeWindows.of(Time.seconds(config.get(COUNT_WINDOW_TIME)))) + .aggregate(aggregateFunction(), processWindowFunction()); + + //输出 + rstStream.addSink(KafkaProducer.getKafkaProducer(fusionConfiguration.getProperties(SINK_KAFKA_PROPERTIES_PREFIX), config.get(SINK_KAFKA_TOPIC), config.get(LOG_FAILURES_ONLY))); + + environment.execute(config.get(JOB_NAME)); + } + + private static KeySelector<Data, Tuple6<Integer, String, String, String, String, String>> keySelector(){ + return new KeySelector<Data, Tuple6<Integer, String, String, String, String, String>>() { + @Override + public Tuple6<Integer, String, String, String, String, String> getKey(Data data) throws Exception { + return new Tuple6<>(data.vsys_id, data.device_id, data.device_group, data.data_center, data.decoded_path, data.app); + } + }; + } + + private static FlatMapFunction<String, Data> parseFlatMapFunction(){ + return new RichFlatMapFunction<String, Data>() { + private JSONPath namePath; + + @Override + public void open(Configuration parameters) throws Exception { + super.open(parameters); + namePath = JSONPath.of("$.name"); + } + + @Override + public void flatMap(String value, Collector<Data> out) throws Exception { + JSONReader parser = JSONReader.of(value); + try { + Object name = namePath.extract(parser); + if(!"traffic_application_protocol_stat".equals(name)){ + return; + } + + Data data = JSON.parseObject(value, Data.class); + String decodedPath = data.getDecoded_path(); + if(StringUtils.isBlank(decodedPath)){ + return; + } + + String appFullPath = data.getApp(); + if(StringUtils.isBlank(appFullPath)){ + out.collect(data); + return; + } + // decoded_path和app进行拼接,格式化 + String[] appSplits = appFullPath.split("\\."); + data.setApp(appSplits[appSplits.length -1]); + String firstAppProtocol = appSplits[0]; + String endProtocol = decodedPath.substring(decodedPath.lastIndexOf(".") + 1); + if (endProtocol.equals(firstAppProtocol)) { + if(appSplits.length > 1){ + decodedPath = decodedPath + appFullPath.substring(appFullPath.indexOf(".")); + data.setDecoded_path(decodedPath); + } + }else{ + decodedPath = decodedPath + "." 
+                        data.setDecoded_path(decodedPath);
+                    }
+                    out.collect(data);
+                } catch (Exception e) {
+                    LOG.error("parse error for value:" + value, e);
+                } finally {
+                    parser.close();
+                }
+            }
+        };
+    }
+
+    private static AggregateFunction<Data, ResultData, ResultData> aggregateFunction(){
+        return new AggregateFunction<Data, ResultData, ResultData>() {
+
+            @Override
+            public ResultData createAccumulator() {
+                return new ResultData();
+            }
+
+            @Override
+            public ResultData add(Data value, ResultData acc) {
+                acc.sessions = acc.sessions + value.sessions;
+                acc.in_bytes = acc.in_bytes + value.in_bytes;
+                acc.out_bytes = acc.out_bytes + value.out_bytes;
+                acc.in_pkts = acc.in_pkts + value.in_pkts;
+                acc.out_pkts = acc.out_pkts + value.out_pkts;
+                acc.c2s_pkts = acc.c2s_pkts + value.c2s_pkts;
+                acc.s2c_pkts = acc.s2c_pkts + value.s2c_pkts;
+                acc.c2s_bytes = acc.c2s_bytes + value.c2s_bytes;
+                acc.s2c_bytes = acc.s2c_bytes + value.s2c_bytes;
+                acc.c2s_fragments = acc.c2s_fragments + value.c2s_fragments;
+                acc.s2c_fragments = acc.s2c_fragments + value.s2c_fragments;
+                acc.c2s_tcp_lost_bytes = acc.c2s_tcp_lost_bytes + value.c2s_tcp_lost_bytes;
+                acc.s2c_tcp_lost_bytes = acc.s2c_tcp_lost_bytes + value.s2c_tcp_lost_bytes;
+                acc.c2s_tcp_ooorder_pkts = acc.c2s_tcp_ooorder_pkts + value.c2s_tcp_ooorder_pkts;
+                acc.s2c_tcp_ooorder_pkts = acc.s2c_tcp_ooorder_pkts + value.s2c_tcp_ooorder_pkts;
+                acc.c2s_tcp_retransmitted_pkts = acc.c2s_tcp_retransmitted_pkts + value.c2s_tcp_retransmitted_pkts;
+                acc.s2c_tcp_retransmitted_pkts = acc.s2c_tcp_retransmitted_pkts + value.s2c_tcp_retransmitted_pkts;
+                acc.c2s_tcp_retransmitted_bytes = acc.c2s_tcp_retransmitted_bytes + value.c2s_tcp_retransmitted_bytes;
+                acc.s2c_tcp_retransmitted_bytes = acc.s2c_tcp_retransmitted_bytes + value.s2c_tcp_retransmitted_bytes;
+                return acc;
+            }
+
+            @Override
+            public ResultData getResult(ResultData acc) {
+                return acc;
+            }
+
+            @Override
+            public ResultData merge(ResultData a, ResultData b) {
+                a.sessions = a.sessions + b.sessions;
+                a.in_bytes = a.in_bytes + b.in_bytes;
+                a.out_bytes = a.out_bytes + b.out_bytes;
+                a.in_pkts = a.in_pkts + b.in_pkts;
+                a.out_pkts = a.out_pkts + b.out_pkts;
+                a.c2s_pkts = a.c2s_pkts + b.c2s_pkts;
+                a.s2c_pkts = a.s2c_pkts + b.s2c_pkts;
+                a.c2s_bytes = a.c2s_bytes + b.c2s_bytes;
+                a.s2c_bytes = a.s2c_bytes + b.s2c_bytes;
+                a.c2s_fragments = a.c2s_fragments + b.c2s_fragments;
+                a.s2c_fragments = a.s2c_fragments + b.s2c_fragments;
+                a.c2s_tcp_lost_bytes = a.c2s_tcp_lost_bytes + b.c2s_tcp_lost_bytes;
+                a.s2c_tcp_lost_bytes = a.s2c_tcp_lost_bytes + b.s2c_tcp_lost_bytes;
+                a.c2s_tcp_ooorder_pkts = a.c2s_tcp_ooorder_pkts + b.c2s_tcp_ooorder_pkts;
+                a.s2c_tcp_ooorder_pkts = a.s2c_tcp_ooorder_pkts + b.s2c_tcp_ooorder_pkts;
+                a.c2s_tcp_retransmitted_pkts = a.c2s_tcp_retransmitted_pkts + b.c2s_tcp_retransmitted_pkts;
+                a.s2c_tcp_retransmitted_pkts = a.s2c_tcp_retransmitted_pkts + b.s2c_tcp_retransmitted_pkts;
+                a.c2s_tcp_retransmitted_bytes = a.c2s_tcp_retransmitted_bytes + b.c2s_tcp_retransmitted_bytes;
+                a.s2c_tcp_retransmitted_bytes = a.s2c_tcp_retransmitted_bytes + b.s2c_tcp_retransmitted_bytes;
+                return a;
+            }
+        };
+    }
+
+    private static ProcessWindowFunction<ResultData, String, Tuple6<Integer, String, String, String, String, String>, TimeWindow> processWindowFunction(){
+        return new ProcessWindowFunction<ResultData, String, Tuple6<Integer, String, String, String, String, String>, TimeWindow>() {
+            private String NAME = null;
+
+            @Override
+            public void open(Configuration parameters) throws Exception {
+                super.open(parameters);
+                final Configuration configuration = (Configuration) getRuntimeContext().getExecutionConfig().getGlobalJobParameters();
+                NAME = configuration.get(MergeConfigs.MEASUREMENT_NAME);
+                Preconditions.checkArgument(StringUtils.isNotBlank(NAME));
+            }
+
+            @Override
+            public void process(Tuple6<Integer, String, String, String, String, String> key, ProcessWindowFunction<ResultData, String, Tuple6<Integer, String, String, String, String, String>, TimeWindow>.Context context, Iterable<ResultData> elements, Collector<String> out) throws Exception {
+                long timestamp_ms = context.window().getStart();
+                for (ResultData ele : elements) {
+                    ele.timestamp_ms = timestamp_ms;
+                    ele.name = NAME;
+                    ele.vsys_id = key.f0;
+                    ele.device_id = key.f1;
+                    ele.device_group = key.f2;
+                    ele.data_center = key.f3;
+                    ele.app_name = null;
+                    String decodedPath = key.f4;
+                    String app = key.f5;
+
+                    // Split
+                    int index = decodedPath.indexOf('.');
+                    String subDecodedPath;
+                    while (index > 0) {
+                        subDecodedPath = decodedPath.substring(0, index);
+                        ele.protocol_stack_id = subDecodedPath;
+                        out.collect(JSON.toJSONString(ele));
+                        index = decodedPath.indexOf('.', index + 1);
+                    }
+
+                    ele.app_name = app;
+                    ele.protocol_stack_id = decodedPath;
+                    out.collect(JSON.toJSONString(ele));
+                }
+            }
+        };
+    }
+
+    private static DataStream<String> testSource(StreamExecutionEnvironment environment){
+        return environment.fromCollection(Arrays.asList(
+                "{\"name\":\"traffic_application_protocol_stat\",\"decoded_path\":\"ETHERNET.IPv4.TCP.ssh\",\"app\":\"ssh\",\"device_id\":\"9800165603191151\",\"device_group\":\"group-xxg-tsgx\",\"data_center\":\"center-xxg-tsgx\",\"vsys_id\":1,\"in_bytes\":720,\"out_bytes\":1200,\"in_pkts\":8,\"out_pkts\":16,\"c2s_pkts\":16,\"s2c_pkts\":8,\"c2s_bytes\":1200,\"s2c_bytes\":720,\"timestamp_ms\":1719990000033}",
+                "{\"name\":\"traffic_application_protocol_stat\",\"decoded_path\":\"ETHERNET.IPv4.TCP.http\",\"app\":\"Gary-ApplicationTest\",\"device_id\":\"9800165603191151\",\"device_group\":\"group-xxg-tsgx\",\"data_center\":\"center-xxg-tsgx\",\"vsys_id\":1,\"in_bytes\":2536,\"out_bytes\":2237,\"in_pkts\":8,\"out_pkts\":7,\"c2s_pkts\":7,\"s2c_pkts\":8,\"c2s_bytes\":2237,\"s2c_bytes\":2536,\"c2s_tcp_retransmitted_pkts\":2,\"c2s_tcp_retransmitted_bytes\":120,\"timestamp_ms\":1719990000033}",
+                "{\"name\":\"traffic_application_protocol_stat\",\"decoded_path\":\"ETHERNET.IPv4.TCP.http\",\"app\":\"ms_edge\",\"device_id\":\"9800165603191151\",\"device_group\":\"group-xxg-tsgx\",\"data_center\":\"center-xxg-tsgx\",\"vsys_id\":1,\"in_bytes\":326282,\"out_bytes\":8322,\"in_pkts\":241,\"out_pkts\":125,\"c2s_pkts\":125,\"s2c_pkts\":241,\"c2s_bytes\":8322,\"s2c_bytes\":326282,\"s2c_tcp_ooorder_pkts\":1,\"timestamp_ms\":1719990000033}",
+                "{\"name\":\"traffic_application_protocol_stat\",\"decoded_path\":\"ETHERNET.IPv4.TCP\",\"app\":\"port_443\",\"device_id\":\"9800165603191151\",\"device_group\":\"group-xxg-tsgx\",\"data_center\":\"center-xxg-tsgx\",\"vsys_id\":1,\"in_bytes\":66,\"in_pkts\":1,\"s2c_pkts\":1,\"s2c_bytes\":66,\"timestamp_ms\":1719990000033}",
+                "{\"name\":\"traffic_application_protocol_stat\",\"decoded_path\":\"ETHERNET.IPv4.TCP.ssl\",\"app\":\"port_443\",\"device_id\":\"9800165603191151\",\"device_group\":\"group-xxg-tsgx\",\"data_center\":\"center-xxg-tsgx\",\"vsys_id\":1,\"in_bytes\":66,\"out_bytes\":60,\"in_pkts\":1,\"out_pkts\":1,\"c2s_pkts\":1,\"s2c_pkts\":1,\"c2s_bytes\":60,\"s2c_bytes\":66,\"c2s_tcp_retransmitted_pkts\":1,\"c2s_tcp_retransmitted_bytes\":60,\"timestamp_ms\":1719990000033}",
"{\"name\":\"traffic_application_protocol_stat\",\"decoded_path\":\"ETHERNET.IPv4.TCP.ssl\",\"app\":\"ssl.port_444\",\"device_id\":\"9800165603191151\",\"device_group\":\"group-xxg-tsgx\",\"data_center\":\"center-xxg-tsgx\",\"vsys_id\":1,\"in_bytes\":66,\"out_bytes\":60,\"in_pkts\":1,\"out_pkts\":1,\"c2s_pkts\":1,\"s2c_pkts\":1,\"c2s_bytes\":60,\"s2c_bytes\":66,\"c2s_tcp_retransmitted_pkts\":1,\"c2s_tcp_retransmitted_bytes\":60,\"timestamp_ms\":1719990000033}", + "{}" + )); + } +} diff --git a/src/main/java/com/zdjizhi/topology/ApplicationProtocolTopologyTest.java b/src/main/java/com/zdjizhi/topology/ApplicationProtocolTopologyTest.java new file mode 100644 index 0000000..f42c021 --- /dev/null +++ b/src/main/java/com/zdjizhi/topology/ApplicationProtocolTopologyTest.java @@ -0,0 +1,269 @@ +package com.zdjizhi.topology; + +import com.alibaba.fastjson2.JSON; +import com.zdjizhi.common.config.MergeConfigs; +import com.zdjizhi.common.config.MergeConfiguration; +import com.zdjizhi.common.pojo.Data; +import com.zdjizhi.common.pojo.ResultData; +import com.zdjizhi.utils.kafka.KafkaConsumer; +import org.apache.commons.lang3.StringUtils; +import org.apache.flink.api.common.functions.AggregateFunction; +import org.apache.flink.api.common.functions.FlatMapFunction; +import org.apache.flink.api.common.functions.RichFlatMapFunction; +import org.apache.flink.api.java.functions.KeySelector; +import org.apache.flink.api.java.tuple.Tuple7; +import org.apache.flink.api.java.utils.ParameterTool; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.streaming.api.datastream.DataStream; +import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator; +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import org.apache.flink.streaming.api.functions.sink.SinkFunction; +import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction; +import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows; +import org.apache.flink.streaming.api.windowing.time.Time; +import org.apache.flink.streaming.api.windowing.windows.TimeWindow; +import org.apache.flink.util.Collector; +import org.apache.flink.util.Preconditions; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.Arrays; + +import static com.zdjizhi.common.config.MergeConfigs.*; + +/** + * @author lifengchao + * @Package com.zdjizhi.topology + * @Description: + * @date 2024/7/23 11:20 + */ +public class ApplicationProtocolTopologyTest { + static final Logger LOG = LoggerFactory.getLogger(ApplicationProtocolTopologyTest.class); + + public static void main(String[] args) throws Exception{ + // param check + if (args.length < 1) { + throw new IllegalArgumentException("Error: Not found properties path. 
" + + "\nUsage: flink -c xxx xxx.jar app.properties."); + } + + final StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment(); + environment.setParallelism(1); + + ParameterTool tool = ParameterTool.fromPropertiesFile(args[0]); + final Configuration config = tool.getConfiguration(); + environment.getConfig().setGlobalJobParameters(config); + final MergeConfiguration fusionConfiguration = new MergeConfiguration(config); + + //数据源 + //DataStream<String> streamSource = testSource(environment); + DataStream<String> streamSource = environment.addSource( + KafkaConsumer.getKafkaConsumer(fusionConfiguration.getProperties(SOURCE_KAFKA_PROPERTIES_PREFIX), + config.get(SOURCE_KAFKA_TOPIC), + config.get(STARTUP_MODE))); + + final long granularityMs = config.get(GRANULARITY_SECOND) * 1000L; + Preconditions.checkArgument(granularityMs >= 1000L && granularityMs <= 60000L); + LOG.info("granularityMs:{}", granularityMs); + + //解析数据 + SingleOutputStreamOperator<Data> dataStream = streamSource.flatMap(parseFlatMapFunction(granularityMs)); + + //聚合 + 拆分 + SingleOutputStreamOperator<String> rstStream = dataStream.keyBy(keySelector()) + .window(TumblingProcessingTimeWindows.of(Time.seconds(config.get(COUNT_WINDOW_TIME)))) + .aggregate(aggregateFunction(), processWindowFunction()); + + //输出 + rstStream.addSink(new SinkFunction<String>() { + @Override + public void invoke(String value, Context context) throws Exception { + System.out.println(value); + } + }); + + environment.execute(config.get(JOB_NAME)); + } + + private static KeySelector<Data, Tuple7<Long, Integer, String, String, String, String, String>> keySelector(){ + return new KeySelector<Data, Tuple7<Long, Integer, String, String, String, String, String>>() { + @Override + public Tuple7<Long, Integer, String, String, String, String, String> getKey(Data data) throws Exception { + return new Tuple7<>(data.timestamp_ms, data.vsys_id, data.device_id, data.device_group, data.data_center, data.decoded_path, data.app); + } + }; + } + + private static FlatMapFunction<String, Data> parseFlatMapFunction(final long granularityMs){ + return new RichFlatMapFunction<String, Data>() { + + @Override + public void open(Configuration parameters) throws Exception { + super.open(parameters); + LOG.info("granularityMs:{}", granularityMs); + } + + @Override + public void flatMap(String value, Collector<Data> out) throws Exception { + try { + // 先快速近似过滤 + if(!value.contains("traffic_application_protocol_stat")){ + return; + } + Data data = JSON.parseObject(value, Data.class); + // 精确过滤 + if(!"traffic_application_protocol_stat".equals(data.name)){ + return; + } + data.timestamp_ms = data.timestamp_ms / granularityMs * granularityMs; + + String decodedPath = data.getDecoded_path(); + if(StringUtils.isBlank(decodedPath)){ + return; + } + + String appFullPath = data.getApp(); + if(StringUtils.isBlank(appFullPath)){ + out.collect(data); + return; + } + // decoded_path和app进行拼接,格式化 + String[] appSplits = appFullPath.split("\\."); + data.setApp(appSplits[appSplits.length -1]); + String firstAppProtocol = appSplits[0]; + String endProtocol = decodedPath.substring(decodedPath.lastIndexOf(".") + 1); + if (endProtocol.equals(firstAppProtocol)) { + if(appSplits.length > 1){ + decodedPath = decodedPath + appFullPath.substring(appFullPath.indexOf(".")); + data.setDecoded_path(decodedPath); + } + }else{ + decodedPath = decodedPath + "." 
+                        data.setDecoded_path(decodedPath);
+                    }
+                    out.collect(data);
+                } catch (Exception e) {
+                    LOG.error("parse error for value:" + value, e);
+                }
+            }
+        };
+    }
+
+    private static AggregateFunction<Data, ResultData, ResultData> aggregateFunction(){
+        return new AggregateFunction<Data, ResultData, ResultData>() {
+
+            @Override
+            public ResultData createAccumulator() {
+                return new ResultData();
+            }
+
+            @Override
+            public ResultData add(Data value, ResultData acc) {
+                acc.sessions = acc.sessions + value.sessions;
+                acc.in_bytes = acc.in_bytes + value.in_bytes;
+                acc.out_bytes = acc.out_bytes + value.out_bytes;
+                acc.in_pkts = acc.in_pkts + value.in_pkts;
+                acc.out_pkts = acc.out_pkts + value.out_pkts;
+                acc.c2s_pkts = acc.c2s_pkts + value.c2s_pkts;
+                acc.s2c_pkts = acc.s2c_pkts + value.s2c_pkts;
+                acc.c2s_bytes = acc.c2s_bytes + value.c2s_bytes;
+                acc.s2c_bytes = acc.s2c_bytes + value.s2c_bytes;
+                acc.c2s_fragments = acc.c2s_fragments + value.c2s_fragments;
+                acc.s2c_fragments = acc.s2c_fragments + value.s2c_fragments;
+                acc.c2s_tcp_lost_bytes = acc.c2s_tcp_lost_bytes + value.c2s_tcp_lost_bytes;
+                acc.s2c_tcp_lost_bytes = acc.s2c_tcp_lost_bytes + value.s2c_tcp_lost_bytes;
+                acc.c2s_tcp_ooorder_pkts = acc.c2s_tcp_ooorder_pkts + value.c2s_tcp_ooorder_pkts;
+                acc.s2c_tcp_ooorder_pkts = acc.s2c_tcp_ooorder_pkts + value.s2c_tcp_ooorder_pkts;
+                acc.c2s_tcp_retransmitted_pkts = acc.c2s_tcp_retransmitted_pkts + value.c2s_tcp_retransmitted_pkts;
+                acc.s2c_tcp_retransmitted_pkts = acc.s2c_tcp_retransmitted_pkts + value.s2c_tcp_retransmitted_pkts;
+                acc.c2s_tcp_retransmitted_bytes = acc.c2s_tcp_retransmitted_bytes + value.c2s_tcp_retransmitted_bytes;
+                acc.s2c_tcp_retransmitted_bytes = acc.s2c_tcp_retransmitted_bytes + value.s2c_tcp_retransmitted_bytes;
+                return acc;
+            }
+
+            @Override
+            public ResultData getResult(ResultData acc) {
+                return acc;
+            }
+
+            @Override
+            public ResultData merge(ResultData a, ResultData b) {
+                a.sessions = a.sessions + b.sessions;
+                a.in_bytes = a.in_bytes + b.in_bytes;
+                a.out_bytes = a.out_bytes + b.out_bytes;
+                a.in_pkts = a.in_pkts + b.in_pkts;
+                a.out_pkts = a.out_pkts + b.out_pkts;
+                a.c2s_pkts = a.c2s_pkts + b.c2s_pkts;
+                a.s2c_pkts = a.s2c_pkts + b.s2c_pkts;
+                a.c2s_bytes = a.c2s_bytes + b.c2s_bytes;
+                a.s2c_bytes = a.s2c_bytes + b.s2c_bytes;
+                a.c2s_fragments = a.c2s_fragments + b.c2s_fragments;
+                a.s2c_fragments = a.s2c_fragments + b.s2c_fragments;
+                a.c2s_tcp_lost_bytes = a.c2s_tcp_lost_bytes + b.c2s_tcp_lost_bytes;
+                a.s2c_tcp_lost_bytes = a.s2c_tcp_lost_bytes + b.s2c_tcp_lost_bytes;
+                a.c2s_tcp_ooorder_pkts = a.c2s_tcp_ooorder_pkts + b.c2s_tcp_ooorder_pkts;
+                a.s2c_tcp_ooorder_pkts = a.s2c_tcp_ooorder_pkts + b.s2c_tcp_ooorder_pkts;
+                a.c2s_tcp_retransmitted_pkts = a.c2s_tcp_retransmitted_pkts + b.c2s_tcp_retransmitted_pkts;
+                a.s2c_tcp_retransmitted_pkts = a.s2c_tcp_retransmitted_pkts + b.s2c_tcp_retransmitted_pkts;
+                a.c2s_tcp_retransmitted_bytes = a.c2s_tcp_retransmitted_bytes + b.c2s_tcp_retransmitted_bytes;
+                a.s2c_tcp_retransmitted_bytes = a.s2c_tcp_retransmitted_bytes + b.s2c_tcp_retransmitted_bytes;
+                return a;
+            }
+        };
+    }
+
+    private static ProcessWindowFunction<ResultData, String, Tuple7<Long, Integer, String, String, String, String, String>, TimeWindow> processWindowFunction(){
+        return new ProcessWindowFunction<ResultData, String, Tuple7<Long, Integer, String, String, String, String, String>, TimeWindow>() {
+            private String NAME = null;
+
+            @Override
+            public void open(Configuration parameters) throws Exception {
+                super.open(parameters);
+                final Configuration configuration = (Configuration) getRuntimeContext().getExecutionConfig().getGlobalJobParameters();
+                NAME = configuration.get(MergeConfigs.MEASUREMENT_NAME);
+                Preconditions.checkArgument(StringUtils.isNotBlank(NAME));
+            }
+
+            @Override
+            public void process(Tuple7<Long, Integer, String, String, String, String, String> key, ProcessWindowFunction<ResultData, String, Tuple7<Long, Integer, String, String, String, String, String>, TimeWindow>.Context context, Iterable<ResultData> elements, Collector<String> out) throws Exception {
+                for (ResultData ele : elements) {
+                    ele.name = NAME;
+                    ele.timestamp_ms = key.f0;
+                    ele.vsys_id = key.f1;
+                    ele.device_id = key.f2;
+                    ele.device_group = key.f3;
+                    ele.data_center = key.f4;
+                    ele.app_name = null;
+                    String decodedPath = key.f5;
+                    String app = key.f6;
+
+                    // Split
+                    int index = decodedPath.indexOf('.');
+                    String subDecodedPath;
+                    while (index > 0) {
+                        subDecodedPath = decodedPath.substring(0, index);
+                        ele.protocol_stack_id = subDecodedPath;
+                        out.collect(JSON.toJSONString(ele));
+                        index = decodedPath.indexOf('.', index + 1);
+                    }
+
+                    ele.app_name = app;
+                    ele.protocol_stack_id = decodedPath;
+                    out.collect(JSON.toJSONString(ele));
+                }
+            }
+        };
+    }
+
+    private static DataStream<String> testSource(StreamExecutionEnvironment environment){
+        return environment.fromCollection(Arrays.asList(
+                "{\"name\":\"traffic_application_protocol_stat\",\"decoded_path\":\"ETHERNET.IPv4.TCP.ssh\",\"app\":\"ssh\",\"device_id\":\"9800165603191151\",\"device_group\":\"group-xxg-tsgx\",\"data_center\":\"center-xxg-tsgx\",\"vsys_id\":1,\"in_bytes\":720,\"out_bytes\":1200,\"in_pkts\":8,\"out_pkts\":16,\"c2s_pkts\":16,\"s2c_pkts\":8,\"c2s_bytes\":1200,\"s2c_bytes\":720,\"timestamp_ms\":1719990000033}",
+                "{\"name\":\"traffic_application_protocol_stat\",\"decoded_path\":\"ETHERNET.IPv4.TCP.http\",\"app\":\"Gary-ApplicationTest\",\"device_id\":\"9800165603191151\",\"device_group\":\"group-xxg-tsgx\",\"data_center\":\"center-xxg-tsgx\",\"vsys_id\":1,\"in_bytes\":2536,\"out_bytes\":2237,\"in_pkts\":8,\"out_pkts\":7,\"c2s_pkts\":7,\"s2c_pkts\":8,\"c2s_bytes\":2237,\"s2c_bytes\":2536,\"c2s_tcp_retransmitted_pkts\":2,\"c2s_tcp_retransmitted_bytes\":120,\"timestamp_ms\":1719990000033}",
+                "{\"name\":\"traffic_application_protocol_stat\",\"decoded_path\":\"ETHERNET.IPv4.TCP.http\",\"app\":\"ms_edge\",\"device_id\":\"9800165603191151\",\"device_group\":\"group-xxg-tsgx\",\"data_center\":\"center-xxg-tsgx\",\"vsys_id\":1,\"in_bytes\":326282,\"out_bytes\":8322,\"in_pkts\":241,\"out_pkts\":125,\"c2s_pkts\":125,\"s2c_pkts\":241,\"c2s_bytes\":8322,\"s2c_bytes\":326282,\"s2c_tcp_ooorder_pkts\":1,\"timestamp_ms\":1719990000033}",
+                "{\"name\":\"traffic_application_protocol_stat\",\"decoded_path\":\"ETHERNET.IPv4.TCP\",\"app\":\"port_443\",\"device_id\":\"9800165603191151\",\"device_group\":\"group-xxg-tsgx\",\"data_center\":\"center-xxg-tsgx\",\"vsys_id\":1,\"in_bytes\":66,\"in_pkts\":1,\"s2c_pkts\":1,\"s2c_bytes\":66,\"timestamp_ms\":1719990000033}",
+                "{\"name\":\"traffic_application_protocol_stat\",\"decoded_path\":\"ETHERNET.IPv4.TCP.ssl\",\"app\":\"port_443\",\"device_id\":\"9800165603191152\",\"device_group\":\"group-xxg-tsgx\",\"data_center\":\"center-xxg-tsgx\",\"vsys_id\":1,\"in_bytes\":66,\"out_bytes\":60,\"in_pkts\":1,\"out_pkts\":1,\"c2s_pkts\":1,\"s2c_pkts\":1,\"c2s_bytes\":60,\"s2c_bytes\":66,\"c2s_tcp_retransmitted_pkts\":1,\"c2s_tcp_retransmitted_bytes\":60,\"timestamp_ms\":1719990002033}",
"{\"name\":\"traffic_application_protocol_stat\",\"decoded_path\":\"ETHERNET.IPv4.TCP.ssl\",\"app\":\"ssl.port_444\",\"device_id\":\"9800165603191153\",\"device_group\":\"group-xxg-tsgx\",\"data_center\":\"center-xxg-tsgx\",\"vsys_id\":1,\"in_bytes\":66,\"out_bytes\":60,\"in_pkts\":1,\"out_pkts\":1,\"c2s_pkts\":1,\"s2c_pkts\":1,\"c2s_bytes\":60,\"s2c_bytes\":66,\"c2s_tcp_retransmitted_pkts\":1,\"c2s_tcp_retransmitted_bytes\":60,\"timestamp_ms\":1719990003033}", + "{}" + )); + } +} diff --git a/src/main/java/com/zdjizhi/utils/functions/map/ResultFlatMap.java b/src/main/java/com/zdjizhi/utils/functions/map/ResultFlatMap.java index 28c01f5..4a170a8 100644 --- a/src/main/java/com/zdjizhi/utils/functions/map/ResultFlatMap.java +++ b/src/main/java/com/zdjizhi/utils/functions/map/ResultFlatMap.java @@ -5,7 +5,7 @@ import cn.hutool.log.LogFactory; import com.alibaba.fastjson2.JSONObject; import com.zdjizhi.common.pojo.Metrics; import com.zdjizhi.common.pojo.Tags; -import com.zdjizhi.utils.StringUtil; +import org.apache.commons.lang3.StringUtils; import org.apache.flink.api.common.functions.FlatMapFunction; import org.apache.flink.util.Collector; @@ -35,7 +35,7 @@ public class ResultFlatMap implements FlatMapFunction<Metrics, String> { String[] protocolIds = protocolStackId.split(PROTOCOL_SPLITTER); int protocolIdsNum = protocolIds.length; for (int i = 0; i < protocolIdsNum - 1; i++) { - if (StringUtil.isBlank(stringBuilder.toString())) { + if (StringUtils.isBlank(stringBuilder.toString())) { stringBuilder.append(protocolIds[i]); tags.setProtocol_stack_id(stringBuilder.toString()); metrics.setTags(tags); diff --git a/src/main/java/com/zdjizhi/utils/functions/process/ParsingData.java b/src/main/java/com/zdjizhi/utils/functions/process/ParsingData.java index 0f06a26..65302f5 100644 --- a/src/main/java/com/zdjizhi/utils/functions/process/ParsingData.java +++ b/src/main/java/com/zdjizhi/utils/functions/process/ParsingData.java @@ -7,7 +7,7 @@ import com.alibaba.fastjson2.JSONObject; import com.alibaba.fastjson2.JSONPath; import com.zdjizhi.common.pojo.Fields; import com.zdjizhi.common.pojo.Tags; -import com.zdjizhi.utils.StringUtil; +import org.apache.commons.lang3.StringUtils; import org.apache.flink.api.java.tuple.Tuple3; import org.apache.flink.streaming.api.functions.ProcessFunction; import org.apache.flink.util.Collector; @@ -32,7 +32,7 @@ public class ParsingData extends ProcessFunction<String, Tuple3<Tags, Fields, Lo @Override public void processElement(String value, ProcessFunction<String, Tuple3<Tags, Fields, Long>>.Context ctx, Collector<Tuple3<Tags, Fields, Long>> out) { try { - if (StringUtil.isNotBlank(value)) { + if (StringUtils.isNotBlank(value)) { Object isProtocolData = JSONPath.eval(value, dataTypeExpr); if (isProtocolData != null) { JSONObject originalLog = JSON.parseObject(value); @@ -44,7 +44,7 @@ public class ParsingData extends ProcessFunction<String, Tuple3<Tags, Fields, Lo Tags tags = JSONObject.parseObject(originalLog.getString("tags"), Tags.class); Long timestamp_ms = originalLog.getLong("timestamp_ms"); - if (StringUtil.isNotBlank(tags.getProtocol_stack_id())) { + if (StringUtils.isNotBlank(tags.getProtocol_stack_id())) { joinProtocol(tags); out.collect(new Tuple3<>(tags, fields, timestamp_ms)); @@ -85,7 +85,7 @@ public class ParsingData extends ProcessFunction<String, Tuple3<Tags, Fields, Lo private static void joinProtocol(Tags tags) { String appFullPath = tags.getApp_name(); - if (StringUtil.isNotBlank(appFullPath)) { + if (StringUtils.isNotBlank(appFullPath)) { String 
            tags.setApp_name(appName);
 
diff --git a/src/test/java/com/zdjizhi/ConventionalTest.java b/src/test/java/com/zdjizhi/ConventionalTest.java
index 9e5f185..f3339e8 100644
--- a/src/test/java/com/zdjizhi/ConventionalTest.java
+++ b/src/test/java/com/zdjizhi/ConventionalTest.java
@@ -1,6 +1,6 @@
 package com.zdjizhi;
 
-import com.zdjizhi.utils.StringUtil;
+import org.apache.commons.lang3.StringUtils;
 import org.junit.Test;
 
 import java.util.Arrays;
@@ -23,7 +23,7 @@ public class ConventionalTest {
         String appName = "qq_r2";
         String[] protocolIds = protocol.split("\\.");
         for (String proto : protocolIds) {
-            if (StringUtil.isBlank(stringBuffer.toString())) {
+            if (StringUtils.isBlank(stringBuffer.toString())) {
                 stringBuffer.append(proto);
                 System.out.println(stringBuffer.toString());
             } else {
@@ -50,7 +50,7 @@ public class ConventionalTest {
         StringBuilder stringBuilder = new StringBuilder();
         for (int i = 0; i < protocol.split(str).length - 1; i++) {
             String value = protocol.split(str)[i];
-            if (StringUtil.isBlank(stringBuilder.toString())) {
+            if (StringUtils.isBlank(stringBuilder.toString())) {
                 stringBuilder.append(value);
                 System.out.println(stringBuilder.toString());
             } else {
