diff options
| author | lifengchao <[email protected]> | 2024-07-23 18:28:48 +0800 |
|---|---|---|
| committer | lifengchao <[email protected]> | 2024-07-23 18:28:48 +0800 |
| commit | f8879f87c9f3298c7bb28e8987657dca38527c2a (patch) | |
| tree | b7a1a4e5cff56b62e802ec400e4bec3a11b87172 | |
| parent | e819d35d0adfb6cc8dae823bf3635e1f93ced81a (diff) | |
TSG-21885 application_protocol_stat摄入Metric更改为JSON扁平结构
| -rw-r--r-- | pom.xml | 2 | ||||
| -rw-r--r-- | src/main/java/com/zdjizhi/common/pojo/Data.java | 251 | ||||
| -rw-r--r-- | src/main/java/com/zdjizhi/common/pojo/ResultData.java | 251 | ||||
| -rw-r--r-- | src/main/java/com/zdjizhi/topology/ApplicationProtocolLegacyTopology.java | 95 | ||||
| -rw-r--r-- | src/main/java/com/zdjizhi/topology/ApplicationProtocolTopology.java | 299 |
5 files changed, 833 insertions, 65 deletions
@@ -6,7 +6,7 @@ <groupId>com.zdjizhi</groupId> <artifactId>app-protocol-stat-traffic-merge</artifactId> - <version>2.2.2</version> + <version>2.3.0</version> <name>app-protocol-stat-traffic-merge</name> <url>http://www.example.com</url> diff --git a/src/main/java/com/zdjizhi/common/pojo/Data.java b/src/main/java/com/zdjizhi/common/pojo/Data.java new file mode 100644 index 0000000..3be235c --- /dev/null +++ b/src/main/java/com/zdjizhi/common/pojo/Data.java @@ -0,0 +1,251 @@ +package com.zdjizhi.common.pojo;
+
+import java.io.Serializable;
+
/**
 * Flat JSON metric record consumed from the "traffic_application_protocol_stat"
 * Kafka topic (bound via fastjson2 {@code JSON.parseObject(value, Data.class)}).
 *
 * <p>Fields are intentionally {@code public} and snake_case: fastjson2 binds them
 * by field name to the flat JSON keys, and the topology reads them directly
 * (e.g. {@code data.vsys_id} in the key selector). Do not rename or privatize
 * them without updating both the JSON schema and the topology.</p>
 *
 * <p>The getter/setter pairs mirror the fields so the class also qualifies as a
 * JavaBean/Flink POJO.</p>
 */
public class Data implements Serializable {
    // Explicit UID keeps Java serialization (Flink closure/state shipping) stable
    // across recompiles instead of relying on the compiler-derived default.
    private static final long serialVersionUID = 1L;

    // --- metric envelope ---
    public long timestamp_ms;   // event time, epoch milliseconds
    public String name;         // metric name; expected "traffic_application_protocol_stat"

    // --- grouping dimensions ---
    public int vsys_id;
    public String device_id;
    public String device_group;
    public String data_center;
    public String decoded_path; // protocol stack path, e.g. "ETHERNET.IPv4.TCP.http"
    public String app;          // application label; may itself be dotted, e.g. "ssl.port_444"

    // --- additive counters (summed per key per window) ---
    public long sessions;
    public long in_bytes;
    public long out_bytes;
    public long in_pkts;
    public long out_pkts;
    public long c2s_pkts;
    public long s2c_pkts;
    public long c2s_bytes;
    public long s2c_bytes;
    public long c2s_fragments;
    public long s2c_fragments;
    public long c2s_tcp_lost_bytes;
    public long s2c_tcp_lost_bytes;
    public long c2s_tcp_ooorder_pkts;
    public long s2c_tcp_ooorder_pkts;
    public long c2s_tcp_retransmitted_pkts;
    public long s2c_tcp_retransmitted_pkts;
    public long c2s_tcp_retransmitted_bytes;
    public long s2c_tcp_retransmitted_bytes;

    // Accessors intentionally keep the snake_case property names so bean-style
    // frameworks resolve the same JSON keys as direct field binding.
    public long getTimestamp_ms() { return timestamp_ms; }
    public void setTimestamp_ms(long timestamp_ms) { this.timestamp_ms = timestamp_ms; }

    public String getName() { return name; }
    public void setName(String name) { this.name = name; }

    public int getVsys_id() { return vsys_id; }
    public void setVsys_id(int vsys_id) { this.vsys_id = vsys_id; }

    public String getDevice_id() { return device_id; }
    public void setDevice_id(String device_id) { this.device_id = device_id; }

    public String getDevice_group() { return device_group; }
    public void setDevice_group(String device_group) { this.device_group = device_group; }

    public String getData_center() { return data_center; }
    public void setData_center(String data_center) { this.data_center = data_center; }

    public String getDecoded_path() { return decoded_path; }
    public void setDecoded_path(String decoded_path) { this.decoded_path = decoded_path; }

    public String getApp() { return app; }
    public void setApp(String app) { this.app = app; }

    public long getSessions() { return sessions; }
    public void setSessions(long sessions) { this.sessions = sessions; }

    public long getIn_bytes() { return in_bytes; }
    public void setIn_bytes(long in_bytes) { this.in_bytes = in_bytes; }

    public long getOut_bytes() { return out_bytes; }
    public void setOut_bytes(long out_bytes) { this.out_bytes = out_bytes; }

    public long getIn_pkts() { return in_pkts; }
    public void setIn_pkts(long in_pkts) { this.in_pkts = in_pkts; }

    public long getOut_pkts() { return out_pkts; }
    public void setOut_pkts(long out_pkts) { this.out_pkts = out_pkts; }

    public long getC2s_pkts() { return c2s_pkts; }
    public void setC2s_pkts(long c2s_pkts) { this.c2s_pkts = c2s_pkts; }

    public long getS2c_pkts() { return s2c_pkts; }
    public void setS2c_pkts(long s2c_pkts) { this.s2c_pkts = s2c_pkts; }

    public long getC2s_bytes() { return c2s_bytes; }
    public void setC2s_bytes(long c2s_bytes) { this.c2s_bytes = c2s_bytes; }

    public long getS2c_bytes() { return s2c_bytes; }
    public void setS2c_bytes(long s2c_bytes) { this.s2c_bytes = s2c_bytes; }

    public long getC2s_fragments() { return c2s_fragments; }
    public void setC2s_fragments(long c2s_fragments) { this.c2s_fragments = c2s_fragments; }

    public long getS2c_fragments() { return s2c_fragments; }
    public void setS2c_fragments(long s2c_fragments) { this.s2c_fragments = s2c_fragments; }

    public long getC2s_tcp_lost_bytes() { return c2s_tcp_lost_bytes; }
    public void setC2s_tcp_lost_bytes(long c2s_tcp_lost_bytes) { this.c2s_tcp_lost_bytes = c2s_tcp_lost_bytes; }

    public long getS2c_tcp_lost_bytes() { return s2c_tcp_lost_bytes; }
    public void setS2c_tcp_lost_bytes(long s2c_tcp_lost_bytes) { this.s2c_tcp_lost_bytes = s2c_tcp_lost_bytes; }

    public long getC2s_tcp_ooorder_pkts() { return c2s_tcp_ooorder_pkts; }
    public void setC2s_tcp_ooorder_pkts(long c2s_tcp_ooorder_pkts) { this.c2s_tcp_ooorder_pkts = c2s_tcp_ooorder_pkts; }

    public long getS2c_tcp_ooorder_pkts() { return s2c_tcp_ooorder_pkts; }
    public void setS2c_tcp_ooorder_pkts(long s2c_tcp_ooorder_pkts) { this.s2c_tcp_ooorder_pkts = s2c_tcp_ooorder_pkts; }

    public long getC2s_tcp_retransmitted_pkts() { return c2s_tcp_retransmitted_pkts; }
    public void setC2s_tcp_retransmitted_pkts(long c2s_tcp_retransmitted_pkts) { this.c2s_tcp_retransmitted_pkts = c2s_tcp_retransmitted_pkts; }

    public long getS2c_tcp_retransmitted_pkts() { return s2c_tcp_retransmitted_pkts; }
    public void setS2c_tcp_retransmitted_pkts(long s2c_tcp_retransmitted_pkts) { this.s2c_tcp_retransmitted_pkts = s2c_tcp_retransmitted_pkts; }

    public long getC2s_tcp_retransmitted_bytes() { return c2s_tcp_retransmitted_bytes; }
    public void setC2s_tcp_retransmitted_bytes(long c2s_tcp_retransmitted_bytes) { this.c2s_tcp_retransmitted_bytes = c2s_tcp_retransmitted_bytes; }

    public long getS2c_tcp_retransmitted_bytes() { return s2c_tcp_retransmitted_bytes; }
    public void setS2c_tcp_retransmitted_bytes(long s2c_tcp_retransmitted_bytes) { this.s2c_tcp_retransmitted_bytes = s2c_tcp_retransmitted_bytes; }
}
diff --git a/src/main/java/com/zdjizhi/common/pojo/ResultData.java b/src/main/java/com/zdjizhi/common/pojo/ResultData.java new file mode 100644 index 0000000..b0907c1 --- /dev/null +++ b/src/main/java/com/zdjizhi/common/pojo/ResultData.java @@ -0,0 +1,251 @@ +package com.zdjizhi.common.pojo;
+
+import java.io.Serializable;
+
/**
 * Window-aggregation accumulator and output record for the merged
 * application-protocol statistics. Serialized to flat JSON
 * ({@code JSON.toJSONString}) before being written to the sink Kafka topic.
 *
 * <p>Differs from {@code Data} in the dimension fields: the input
 * {@code decoded_path}/{@code app} pair is emitted here as
 * {@code protocol_stack_id}/{@code app_name}, one record per protocol-stack
 * prefix.</p>
 *
 * <p>Fields are intentionally {@code public} and snake_case: the aggregate
 * function mutates them directly (e.g. {@code acc.sessions += ...}) and
 * fastjson2 serializes them by field name. The getter/setter pairs mirror the
 * fields so the class also qualifies as a JavaBean/Flink POJO.</p>
 */
public class ResultData implements Serializable {
    // Explicit UID keeps Java serialization (Flink closure/state shipping) stable
    // across recompiles instead of relying on the compiler-derived default.
    private static final long serialVersionUID = 1L;

    // --- metric envelope ---
    public long timestamp_ms;       // window start, epoch milliseconds
    public String name;             // output measurement name from job config

    // --- grouping dimensions ---
    public int vsys_id;
    public String device_id;
    public String device_group;
    public String data_center;
    public String protocol_stack_id; // protocol stack prefix, e.g. "ETHERNET.IPv4.TCP"
    public String app_name;          // application label; null on pure-protocol rows

    // --- additive counters (sums over the window) ---
    public long sessions;
    public long in_bytes;
    public long out_bytes;
    public long in_pkts;
    public long out_pkts;
    public long c2s_pkts;
    public long s2c_pkts;
    public long c2s_bytes;
    public long s2c_bytes;
    public long c2s_fragments;
    public long s2c_fragments;
    public long c2s_tcp_lost_bytes;
    public long s2c_tcp_lost_bytes;
    public long c2s_tcp_ooorder_pkts;
    public long s2c_tcp_ooorder_pkts;
    public long c2s_tcp_retransmitted_pkts;
    public long s2c_tcp_retransmitted_pkts;
    public long c2s_tcp_retransmitted_bytes;
    public long s2c_tcp_retransmitted_bytes;

    // Accessors intentionally keep the snake_case property names so bean-style
    // frameworks resolve the same JSON keys as direct field binding.
    public long getTimestamp_ms() { return timestamp_ms; }
    public void setTimestamp_ms(long timestamp_ms) { this.timestamp_ms = timestamp_ms; }

    public String getName() { return name; }
    public void setName(String name) { this.name = name; }

    public int getVsys_id() { return vsys_id; }
    public void setVsys_id(int vsys_id) { this.vsys_id = vsys_id; }

    public String getDevice_id() { return device_id; }
    public void setDevice_id(String device_id) { this.device_id = device_id; }

    public String getDevice_group() { return device_group; }
    public void setDevice_group(String device_group) { this.device_group = device_group; }

    public String getData_center() { return data_center; }
    public void setData_center(String data_center) { this.data_center = data_center; }

    public String getProtocol_stack_id() { return protocol_stack_id; }
    public void setProtocol_stack_id(String protocol_stack_id) { this.protocol_stack_id = protocol_stack_id; }

    public String getApp_name() { return app_name; }
    public void setApp_name(String app_name) { this.app_name = app_name; }

    public long getSessions() { return sessions; }
    public void setSessions(long sessions) { this.sessions = sessions; }

    public long getIn_bytes() { return in_bytes; }
    public void setIn_bytes(long in_bytes) { this.in_bytes = in_bytes; }

    public long getOut_bytes() { return out_bytes; }
    public void setOut_bytes(long out_bytes) { this.out_bytes = out_bytes; }

    public long getIn_pkts() { return in_pkts; }
    public void setIn_pkts(long in_pkts) { this.in_pkts = in_pkts; }

    public long getOut_pkts() { return out_pkts; }
    public void setOut_pkts(long out_pkts) { this.out_pkts = out_pkts; }

    public long getC2s_pkts() { return c2s_pkts; }
    public void setC2s_pkts(long c2s_pkts) { this.c2s_pkts = c2s_pkts; }

    public long getS2c_pkts() { return s2c_pkts; }
    public void setS2c_pkts(long s2c_pkts) { this.s2c_pkts = s2c_pkts; }

    public long getC2s_bytes() { return c2s_bytes; }
    public void setC2s_bytes(long c2s_bytes) { this.c2s_bytes = c2s_bytes; }

    public long getS2c_bytes() { return s2c_bytes; }
    public void setS2c_bytes(long s2c_bytes) { this.s2c_bytes = s2c_bytes; }

    public long getC2s_fragments() { return c2s_fragments; }
    public void setC2s_fragments(long c2s_fragments) { this.c2s_fragments = c2s_fragments; }

    public long getS2c_fragments() { return s2c_fragments; }
    public void setS2c_fragments(long s2c_fragments) { this.s2c_fragments = s2c_fragments; }

    public long getC2s_tcp_lost_bytes() { return c2s_tcp_lost_bytes; }
    public void setC2s_tcp_lost_bytes(long c2s_tcp_lost_bytes) { this.c2s_tcp_lost_bytes = c2s_tcp_lost_bytes; }

    public long getS2c_tcp_lost_bytes() { return s2c_tcp_lost_bytes; }
    public void setS2c_tcp_lost_bytes(long s2c_tcp_lost_bytes) { this.s2c_tcp_lost_bytes = s2c_tcp_lost_bytes; }

    public long getC2s_tcp_ooorder_pkts() { return c2s_tcp_ooorder_pkts; }
    public void setC2s_tcp_ooorder_pkts(long c2s_tcp_ooorder_pkts) { this.c2s_tcp_ooorder_pkts = c2s_tcp_ooorder_pkts; }

    public long getS2c_tcp_ooorder_pkts() { return s2c_tcp_ooorder_pkts; }
    public void setS2c_tcp_ooorder_pkts(long s2c_tcp_ooorder_pkts) { this.s2c_tcp_ooorder_pkts = s2c_tcp_ooorder_pkts; }

    public long getC2s_tcp_retransmitted_pkts() { return c2s_tcp_retransmitted_pkts; }
    public void setC2s_tcp_retransmitted_pkts(long c2s_tcp_retransmitted_pkts) { this.c2s_tcp_retransmitted_pkts = c2s_tcp_retransmitted_pkts; }

    public long getS2c_tcp_retransmitted_pkts() { return s2c_tcp_retransmitted_pkts; }
    public void setS2c_tcp_retransmitted_pkts(long s2c_tcp_retransmitted_pkts) { this.s2c_tcp_retransmitted_pkts = s2c_tcp_retransmitted_pkts; }

    public long getC2s_tcp_retransmitted_bytes() { return c2s_tcp_retransmitted_bytes; }
    public void setC2s_tcp_retransmitted_bytes(long c2s_tcp_retransmitted_bytes) { this.c2s_tcp_retransmitted_bytes = c2s_tcp_retransmitted_bytes; }

    public long getS2c_tcp_retransmitted_bytes() { return s2c_tcp_retransmitted_bytes; }
    public void setS2c_tcp_retransmitted_bytes(long s2c_tcp_retransmitted_bytes) { this.s2c_tcp_retransmitted_bytes = s2c_tcp_retransmitted_bytes; }
}
diff --git a/src/main/java/com/zdjizhi/topology/ApplicationProtocolLegacyTopology.java b/src/main/java/com/zdjizhi/topology/ApplicationProtocolLegacyTopology.java new file mode 100644 index 0000000..d91f1be --- /dev/null +++ b/src/main/java/com/zdjizhi/topology/ApplicationProtocolLegacyTopology.java @@ -0,0 +1,95 @@ +package com.zdjizhi.topology; + +import cn.hutool.log.Log; +import cn.hutool.log.LogFactory; +import com.zdjizhi.common.config.MergeConfigs; +import com.zdjizhi.common.config.MergeConfiguration; +import com.zdjizhi.common.pojo.Fields; +import com.zdjizhi.common.pojo.Metrics; +import com.zdjizhi.common.pojo.Tags; +import com.zdjizhi.utils.functions.keyby.DimensionKeyBy; +import com.zdjizhi.utils.functions.map.ResultFlatMap; +import com.zdjizhi.utils.functions.process.ParsingData; +import com.zdjizhi.utils.functions.statistics.DispersionCountWindow; +import com.zdjizhi.utils.functions.statistics.MergeCountWindow; +import com.zdjizhi.utils.kafka.KafkaConsumer; +import com.zdjizhi.utils.kafka.KafkaProducer; +import org.apache.flink.api.common.eventtime.WatermarkStrategy; +import org.apache.flink.api.java.tuple.Tuple3; +import org.apache.flink.api.java.utils.ParameterTool; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.streaming.api.datastream.DataStream; +import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator; +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows; +import org.apache.flink.streaming.api.windowing.time.Time; + +import java.time.Duration; + +import static com.zdjizhi.common.config.MergeConfigs.*; + +/** + * @author qidaijie + * @Package com.zdjizhi.topology + * @Description: + * @date 2021/5/2016:42 + */ +public class ApplicationProtocolLegacyTopology { + private static final Log logger = LogFactory.get(); + + public static void main(String[] args) { + try { + + // param check + if 
(args.length < 1) { + throw new IllegalArgumentException("Error: Not found properties path. " + + "\nUsage: flink -c xxx xxx.jar app.properties."); + } + + final StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment(); + + ParameterTool tool = ParameterTool.fromPropertiesFile(args[0]); + final Configuration config = tool.getConfiguration(); + environment.getConfig().setGlobalJobParameters(config); + final MergeConfiguration fusionConfiguration = new MergeConfiguration(config); + + //水印 + WatermarkStrategy<Tuple3<Tags, Fields, Long>> strategyForSession = WatermarkStrategy + .<Tuple3<Tags, Fields, Long>>forBoundedOutOfOrderness(Duration.ofSeconds(config.get(WARTERMARK_MAX_ORDERNESS))) + .withTimestampAssigner((element, timestamp) -> element.f2); + + //数据源 + DataStream<String> streamSource = environment.addSource( + KafkaConsumer.getKafkaConsumer(fusionConfiguration.getProperties(SOURCE_KAFKA_PROPERTIES_PREFIX), + config.get(SOURCE_KAFKA_TOPIC), + config.get(STARTUP_MODE))); + + //解析数据 + SingleOutputStreamOperator<Tuple3<Tags, Fields, Long>> parseDataProcess = streamSource.process(new ParsingData()) + .assignTimestampsAndWatermarks(strategyForSession) + .name("ParseDataProcess"); + + //增量聚合窗口 + SingleOutputStreamOperator<Metrics> dispersionCountWindow = parseDataProcess.keyBy(new DimensionKeyBy()) + .window(TumblingEventTimeWindows.of(Time.seconds(config.get(MergeConfigs.COUNT_WINDOW_TIME)))) + .reduce(new DispersionCountWindow(), new MergeCountWindow()) + .name("DispersionCountWindow"); + + //拆分数据 + SingleOutputStreamOperator<String> resultFlatMap = dispersionCountWindow.flatMap(new ResultFlatMap()) + .name("ResultFlatMap"); + + //输出 + resultFlatMap.addSink(KafkaProducer.getKafkaProducer(fusionConfiguration.getProperties(SINK_KAFKA_PROPERTIES_PREFIX), + config.get(SINK_KAFKA_TOPIC), + config.get(LOG_FAILURES_ONLY))); + + environment.execute(config.get(JOB_NAME)); + } catch (Exception e) { + logger.error("This Flink task start 
ERROR! Exception information is :"); + e.printStackTrace(); + } + + } + +} diff --git a/src/main/java/com/zdjizhi/topology/ApplicationProtocolTopology.java b/src/main/java/com/zdjizhi/topology/ApplicationProtocolTopology.java index d1a60c9..951703c 100644 --- a/src/main/java/com/zdjizhi/topology/ApplicationProtocolTopology.java +++ b/src/main/java/com/zdjizhi/topology/ApplicationProtocolTopology.java @@ -1,96 +1,267 @@ package com.zdjizhi.topology; -import cn.hutool.log.Log; -import cn.hutool.log.LogFactory; +import com.alibaba.fastjson2.JSON; +import com.alibaba.fastjson2.JSONPath; +import com.alibaba.fastjson2.JSONReader; import com.zdjizhi.common.config.MergeConfigs; import com.zdjizhi.common.config.MergeConfiguration; -import com.zdjizhi.common.pojo.Fields; -import com.zdjizhi.common.pojo.Metrics; -import com.zdjizhi.common.pojo.Tags; -import com.zdjizhi.utils.functions.keyby.DimensionKeyBy; -import com.zdjizhi.utils.functions.map.ResultFlatMap; -import com.zdjizhi.utils.functions.process.ParsingData; -import com.zdjizhi.utils.functions.statistics.DispersionCountWindow; -import com.zdjizhi.utils.functions.statistics.MergeCountWindow; +import com.zdjizhi.common.pojo.*; import com.zdjizhi.utils.kafka.KafkaConsumer; import com.zdjizhi.utils.kafka.KafkaProducer; +import org.apache.commons.lang3.StringUtils; import org.apache.flink.api.common.eventtime.*; -import org.apache.flink.api.java.tuple.Tuple3; +import org.apache.flink.api.common.functions.AggregateFunction; +import org.apache.flink.api.common.functions.FlatMapFunction; +import org.apache.flink.api.common.functions.RichFlatMapFunction; +import org.apache.flink.api.java.functions.KeySelector; +import org.apache.flink.api.java.tuple.Tuple6; import org.apache.flink.api.java.utils.ParameterTool; import org.apache.flink.configuration.Configuration; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator; import 
org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction; import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows; import org.apache.flink.streaming.api.windowing.time.Time; +import org.apache.flink.streaming.api.windowing.windows.TimeWindow; +import org.apache.flink.util.Collector; +import org.apache.flink.util.Preconditions; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import java.time.Duration; +import java.util.Arrays; import static com.zdjizhi.common.config.MergeConfigs.*; /** - * @author qidaijie + * @author lifengchao * @Package com.zdjizhi.topology * @Description: - * @date 2021/5/2016:42 + * @date 2024/7/23 11:20 */ public class ApplicationProtocolTopology { - private static final Log logger = LogFactory.get(); + static final Logger LOG = LoggerFactory.getLogger(ApplicationProtocolTopology.class); - public static void main(String[] args) { - try { + public static void main(String[] args) throws Exception{ + // param check + if (args.length < 1) { + throw new IllegalArgumentException("Error: Not found properties path. 
" + + "\nUsage: flink -c xxx xxx.jar app.properties."); + } + + final StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment(); + environment.setParallelism(1); + + ParameterTool tool = ParameterTool.fromPropertiesFile(args[0]); + final Configuration config = tool.getConfiguration(); + environment.getConfig().setGlobalJobParameters(config); + final MergeConfiguration fusionConfiguration = new MergeConfiguration(config); + + //水印 + WatermarkStrategy<Data> strategyForSession = WatermarkStrategy + .<Data>forBoundedOutOfOrderness(Duration.ofSeconds(config.get(WARTERMARK_MAX_ORDERNESS))) + .withTimestampAssigner((element, timestamp) -> element.getTimestamp_ms()); + + //数据源 + DataStream<String> streamSource = environment.addSource( + KafkaConsumer.getKafkaConsumer(fusionConfiguration.getProperties(SOURCE_KAFKA_PROPERTIES_PREFIX), + config.get(SOURCE_KAFKA_TOPIC), + config.get(STARTUP_MODE))); + + //解析数据 + SingleOutputStreamOperator<Data> dataStream = streamSource.flatMap(parseFlatMapFunction()).assignTimestampsAndWatermarks(strategyForSession); + + //聚合 + 拆分 + SingleOutputStreamOperator<String> rstStream = dataStream.keyBy(keySelector()) + .window(TumblingEventTimeWindows.of(Time.seconds(config.get(COUNT_WINDOW_TIME)))) + .aggregate(aggregateFunction(), processWindowFunction()); + + //输出 + rstStream.addSink(KafkaProducer.getKafkaProducer(fusionConfiguration.getProperties(SINK_KAFKA_PROPERTIES_PREFIX), config.get(SINK_KAFKA_TOPIC), config.get(LOG_FAILURES_ONLY))); + + environment.execute(config.get(JOB_NAME)); + } - // param check - if (args.length < 1) { - throw new IllegalArgumentException("Error: Not found properties path. 
" + - "\nUsage: flink -c xxx xxx.jar app.properties."); + private static KeySelector<Data, Tuple6<Integer, String, String, String, String, String>> keySelector(){ + return new KeySelector<Data, Tuple6<Integer, String, String, String, String, String>>() { + @Override + public Tuple6<Integer, String, String, String, String, String> getKey(Data data) throws Exception { + return new Tuple6<>(data.vsys_id, data.device_id, data.device_group, data.data_center, data.decoded_path, data.app); } + }; + } - final StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment(); - - ParameterTool tool = ParameterTool.fromPropertiesFile(args[0]); - final Configuration config = tool.getConfiguration(); - environment.getConfig().setGlobalJobParameters(config); - final MergeConfiguration fusionConfiguration = new MergeConfiguration(config); - - - //水印 - WatermarkStrategy<Tuple3<Tags, Fields, Long>> strategyForSession = WatermarkStrategy - .<Tuple3<Tags, Fields, Long>>forBoundedOutOfOrderness(Duration.ofSeconds(config.get(WARTERMARK_MAX_ORDERNESS))) - .withTimestampAssigner((element, timestamp) -> element.f2); - - //数据源 - DataStream<String> streamSource = environment.addSource( - KafkaConsumer.getKafkaConsumer(fusionConfiguration.getProperties(SOURCE_KAFKA_PROPERTIES_PREFIX), - config.get(SOURCE_KAFKA_TOPIC), - config.get(STARTUP_MODE))); - - //解析数据 - SingleOutputStreamOperator<Tuple3<Tags, Fields, Long>> parseDataProcess = streamSource.process(new ParsingData()) - .assignTimestampsAndWatermarks(strategyForSession) - .name("ParseDataProcess"); - - //增量聚合窗口 - SingleOutputStreamOperator<Metrics> dispersionCountWindow = parseDataProcess.keyBy(new DimensionKeyBy()) - .window(TumblingEventTimeWindows.of(Time.seconds(config.get(MergeConfigs.COUNT_WINDOW_TIME)))) - .reduce(new DispersionCountWindow(), new MergeCountWindow()) - .name("DispersionCountWindow"); - - //拆分数据 - SingleOutputStreamOperator<String> resultFlatMap = dispersionCountWindow.flatMap(new 
ResultFlatMap()) - .name("ResultFlatMap"); - - //输出 - resultFlatMap.addSink(KafkaProducer.getKafkaProducer(fusionConfiguration.getProperties(SINK_KAFKA_PROPERTIES_PREFIX), - config.get(SINK_KAFKA_TOPIC), - config.get(LOG_FAILURES_ONLY))); - - environment.execute(config.get(JOB_NAME)); - } catch (Exception e) { - logger.error("This Flink task start ERROR! Exception information is :"); - e.printStackTrace(); - } + private static FlatMapFunction<String, Data> parseFlatMapFunction(){ + return new RichFlatMapFunction<String, Data>() { + private JSONPath namePath; + + @Override + public void open(Configuration parameters) throws Exception { + super.open(parameters); + namePath = JSONPath.of("$.name"); + } + + @Override + public void flatMap(String value, Collector<Data> out) throws Exception { + JSONReader parser = JSONReader.of(value); + try { + Object name = namePath.extract(parser); + if(!"traffic_application_protocol_stat".equals(name)){ + return; + } + + Data data = JSON.parseObject(value, Data.class); + String decodedPath = data.getDecoded_path(); + if(StringUtils.isBlank(decodedPath)){ + return; + } + + String appFullPath = data.getApp(); + if(StringUtils.isBlank(appFullPath)){ + out.collect(data); + return; + } + // decoded_path和app进行拼接,格式化 + String[] appSplits = appFullPath.split("\\."); + data.setApp(appSplits[appSplits.length -1]); + String firstAppProtocol = appSplits[0]; + String endProtocol = decodedPath.substring(decodedPath.lastIndexOf(".") + 1); + if (endProtocol.equals(firstAppProtocol)) { + if(appSplits.length > 1){ + decodedPath = decodedPath + appFullPath.substring(appFullPath.indexOf(".")); + data.setDecoded_path(decodedPath); + } + }else{ + decodedPath = decodedPath + "." 
+ appFullPath; + data.setDecoded_path(decodedPath); + } + out.collect(data); + } catch (Exception e) { + LOG.error("parse error for value:" + value, e); + } finally { + parser.close(); + } + } + }; + } + + private static AggregateFunction<Data, ResultData, ResultData> aggregateFunction(){ + return new AggregateFunction<Data, ResultData, ResultData>() { + + @Override + public ResultData createAccumulator() { + return new ResultData(); + } + + @Override + public ResultData add(Data value, ResultData acc) { + acc.sessions = acc.sessions + value.sessions; + acc.in_bytes = acc.in_bytes + value.in_bytes; + acc.out_bytes = acc.out_bytes + value.out_bytes; + acc.in_pkts = acc.in_pkts + value.in_pkts; + acc.out_pkts = acc.out_pkts + value.out_pkts; + acc.c2s_pkts = acc.c2s_pkts + value.c2s_pkts; + acc.s2c_pkts = acc.s2c_pkts + value.s2c_pkts; + acc.c2s_bytes = acc.c2s_bytes + value.c2s_bytes; + acc.s2c_bytes = acc.s2c_bytes + value.s2c_bytes; + acc.c2s_fragments = acc.c2s_fragments + value.c2s_fragments; + acc.s2c_fragments = acc.s2c_fragments + value.s2c_fragments; + acc.c2s_tcp_lost_bytes = acc.c2s_tcp_lost_bytes + value.c2s_tcp_lost_bytes; + acc.s2c_tcp_lost_bytes = acc.s2c_tcp_lost_bytes + value.s2c_tcp_lost_bytes; + acc.c2s_tcp_ooorder_pkts = acc.c2s_tcp_ooorder_pkts + value.c2s_tcp_ooorder_pkts; + acc.s2c_tcp_ooorder_pkts = acc.s2c_tcp_ooorder_pkts + value.s2c_tcp_ooorder_pkts; + acc.c2s_tcp_retransmitted_pkts = acc.c2s_tcp_retransmitted_pkts + value.c2s_tcp_retransmitted_pkts; + acc.s2c_tcp_retransmitted_pkts = acc.s2c_tcp_retransmitted_pkts + value.s2c_tcp_retransmitted_pkts; + acc.c2s_tcp_retransmitted_bytes = acc.c2s_tcp_retransmitted_bytes + value.c2s_tcp_retransmitted_bytes; + acc.s2c_tcp_retransmitted_bytes = acc.s2c_tcp_retransmitted_bytes + value.s2c_tcp_retransmitted_bytes; + return acc; + } + + @Override + public ResultData getResult(ResultData acc) { + return acc; + } + + @Override + public ResultData merge(ResultData a, ResultData b) { + a.sessions = 
a.sessions + b.sessions; + a.in_bytes = a.in_bytes + b.in_bytes; + a.out_bytes = a.out_bytes + b.out_bytes; + a.in_pkts = a.in_pkts + b.in_pkts; + a.out_pkts = a.out_pkts + b.out_pkts; + a.c2s_pkts = a.c2s_pkts + b.c2s_pkts; + a.s2c_pkts = a.s2c_pkts + b.s2c_pkts; + a.c2s_bytes = a.c2s_bytes + b.c2s_bytes; + a.s2c_bytes = a.s2c_bytes + b.s2c_bytes; + a.c2s_fragments = a.c2s_fragments + b.c2s_fragments; + a.s2c_fragments = a.s2c_fragments + b.s2c_fragments; + a.c2s_tcp_lost_bytes = a.c2s_tcp_lost_bytes + b.c2s_tcp_lost_bytes; + a.s2c_tcp_lost_bytes = a.s2c_tcp_lost_bytes + b.s2c_tcp_lost_bytes; + a.c2s_tcp_ooorder_pkts = a.c2s_tcp_ooorder_pkts + b.c2s_tcp_ooorder_pkts; + a.s2c_tcp_ooorder_pkts = a.s2c_tcp_ooorder_pkts + b.s2c_tcp_ooorder_pkts; + a.c2s_tcp_retransmitted_pkts = a.c2s_tcp_retransmitted_pkts + b.c2s_tcp_retransmitted_pkts; + a.s2c_tcp_retransmitted_pkts = a.s2c_tcp_retransmitted_pkts + b.s2c_tcp_retransmitted_pkts; + a.c2s_tcp_retransmitted_bytes = a.c2s_tcp_retransmitted_bytes + b.c2s_tcp_retransmitted_bytes; + a.s2c_tcp_retransmitted_bytes = a.s2c_tcp_retransmitted_bytes + b.s2c_tcp_retransmitted_bytes; + return a; + } + }; + } + + private static ProcessWindowFunction<ResultData, String, Tuple6<Integer, String, String, String, String, String>, TimeWindow> processWindowFunction(){ + return new ProcessWindowFunction<ResultData, String, Tuple6<Integer, String, String, String, String, String>, TimeWindow>() { + private String NAME = null; + + @Override + public void open(Configuration parameters) throws Exception { + super.open(parameters); + final Configuration configuration = (Configuration) getRuntimeContext().getExecutionConfig().getGlobalJobParameters(); + NAME = configuration.get(MergeConfigs.MEASUREMENT_NAME); + Preconditions.checkArgument(StringUtils.isNotBlank(NAME)); + } + @Override + public void process(Tuple6<Integer, String, String, String, String, String> key, ProcessWindowFunction<ResultData, String, Tuple6<Integer, String, String, String, 
String, String>, TimeWindow>.Context context, Iterable<ResultData> elements, Collector<String> out) throws Exception { + long timestamp_ms = context.window().getStart(); + for (ResultData ele : elements) { + ele.timestamp_ms = timestamp_ms; + ele.name = NAME; + ele.vsys_id = key.f0; + ele.device_id = key.f1; + ele.device_group = key.f2; + ele.data_center = key.f3; + ele.app_name = null; + String decodedPath = key.f4; + String app = key.f5; + + // 拆分 + int index = decodedPath.indexOf('.'); + String subDecodedPath; + while (index > 0) { + subDecodedPath = decodedPath.substring(0, index); + ele.protocol_stack_id = subDecodedPath; + out.collect(JSON.toJSONString(ele)); + index = decodedPath.indexOf('.', index + 1); + } + + ele.app_name = app; + ele.protocol_stack_id = decodedPath; + out.collect(JSON.toJSONString(ele)); + } + } + }; } + private static DataStream<String> testSource(StreamExecutionEnvironment environment){ + return environment.fromCollection(Arrays.asList( + "{\"name\":\"traffic_application_protocol_stat\",\"decoded_path\":\"ETHERNET.IPv4.TCP.ssh\",\"app\":\"ssh\",\"device_id\":\"9800165603191151\",\"device_group\":\"group-xxg-tsgx\",\"data_center\":\"center-xxg-tsgx\",\"vsys_id\":1,\"in_bytes\":720,\"out_bytes\":1200,\"in_pkts\":8,\"out_pkts\":16,\"c2s_pkts\":16,\"s2c_pkts\":8,\"c2s_bytes\":1200,\"s2c_bytes\":720,\"timestamp_ms\":1719990000033}", + "{\"name\":\"traffic_application_protocol_stat\",\"decoded_path\":\"ETHERNET.IPv4.TCP.http\",\"app\":\"Gary-ApplicationTest\",\"device_id\":\"9800165603191151\",\"device_group\":\"group-xxg-tsgx\",\"data_center\":\"center-xxg-tsgx\",\"vsys_id\":1,\"in_bytes\":2536,\"out_bytes\":2237,\"in_pkts\":8,\"out_pkts\":7,\"c2s_pkts\":7,\"s2c_pkts\":8,\"c2s_bytes\":2237,\"s2c_bytes\":2536,\"c2s_tcp_retransmitted_pkts\":2,\"c2s_tcp_retransmitted_bytes\":120,\"timestamp_ms\":1719990000033}", + 
"{\"name\":\"traffic_application_protocol_stat\",\"decoded_path\":\"ETHERNET.IPv4.TCP.http\",\"app\":\"ms_edge\",\"device_id\":\"9800165603191151\",\"device_group\":\"group-xxg-tsgx\",\"data_center\":\"center-xxg-tsgx\",\"vsys_id\":1,\"in_bytes\":326282,\"out_bytes\":8322,\"in_pkts\":241,\"out_pkts\":125,\"c2s_pkts\":125,\"s2c_pkts\":241,\"c2s_bytes\":8322,\"s2c_bytes\":326282,\"s2c_tcp_ooorder_pkts\":1,\"timestamp_ms\":1719990000033}", + "{\"name\":\"traffic_application_protocol_stat\",\"decoded_path\":\"ETHERNET.IPv4.TCP\",\"app\":\"port_443\",\"device_id\":\"9800165603191151\",\"device_group\":\"group-xxg-tsgx\",\"data_center\":\"center-xxg-tsgx\",\"vsys_id\":1,\"in_bytes\":66,\"in_pkts\":1,\"s2c_pkts\":1,\"s2c_bytes\":66,\"timestamp_ms\":1719990000033}", + "{\"name\":\"traffic_application_protocol_stat\",\"decoded_path\":\"ETHERNET.IPv4.TCP.ssl\",\"app\":\"port_443\",\"device_id\":\"9800165603191151\",\"device_group\":\"group-xxg-tsgx\",\"data_center\":\"center-xxg-tsgx\",\"vsys_id\":1,\"in_bytes\":66,\"out_bytes\":60,\"in_pkts\":1,\"out_pkts\":1,\"c2s_pkts\":1,\"s2c_pkts\":1,\"c2s_bytes\":60,\"s2c_bytes\":66,\"c2s_tcp_retransmitted_pkts\":1,\"c2s_tcp_retransmitted_bytes\":60,\"timestamp_ms\":1719990000033}", + "{\"name\":\"traffic_application_protocol_stat\",\"decoded_path\":\"ETHERNET.IPv4.TCP.ssl\",\"app\":\"ssl.port_444\",\"device_id\":\"9800165603191151\",\"device_group\":\"group-xxg-tsgx\",\"data_center\":\"center-xxg-tsgx\",\"vsys_id\":1,\"in_bytes\":66,\"out_bytes\":60,\"in_pkts\":1,\"out_pkts\":1,\"c2s_pkts\":1,\"s2c_pkts\":1,\"c2s_bytes\":60,\"s2c_bytes\":66,\"c2s_tcp_retransmitted_pkts\":1,\"c2s_tcp_retransmitted_bytes\":60,\"timestamp_ms\":1719990000033}", + "{}" + )); + } } |
