summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
author qidaijie <[email protected]> 2023-08-21 17:22:37 +0800
committer qidaijie <[email protected]> 2023-08-21 17:22:37 +0800
commit 7b2302234ac385f2850ce584573151f5d0930446 (patch)
tree 3613bdd1b8c740a16916c5fadc80d57546dc8027 /src
parent 345b7fd6013ac6404263c9a9c408a88d76c34d00 (diff)
协议与应用统计程序基于事件时间处理,且结果数据时间戳为毫秒级。(TSG-16737)
Diffstat (limited to 'src')
-rw-r--r-- src/main/java/com/zdjizhi/common/config/GlobalConfig.java 1
-rw-r--r-- src/main/java/com/zdjizhi/topology/ApplicationProtocolTopology.java 37
-rw-r--r-- src/main/java/com/zdjizhi/utils/functions/keyby/DimensionKeyBy.java 7
-rw-r--r-- src/main/java/com/zdjizhi/utils/functions/statistics/DispersionCountWindow.java 10
-rw-r--r-- src/main/java/com/zdjizhi/utils/functions/statistics/MergeCountWindow.java 12
-rw-r--r-- src/test/java/com/zdjizhi/FlagsTest.java 5
6 files changed, 46 insertions, 26 deletions
diff --git a/src/main/java/com/zdjizhi/common/config/GlobalConfig.java b/src/main/java/com/zdjizhi/common/config/GlobalConfig.java
index 84008df..24702a5 100644
--- a/src/main/java/com/zdjizhi/common/config/GlobalConfig.java
+++ b/src/main/java/com/zdjizhi/common/config/GlobalConfig.java
@@ -27,6 +27,7 @@ public class GlobalConfig {
public static final String MEASUREMENT_NAME = GlobalConfigLoad.getStringProperty(1, "measurement.name");
public static final Integer PARSE_PARALLELISM = GlobalConfigLoad.getIntProperty(0, "parse.parallelism");
public static final Integer WINDOW_PARALLELISM = GlobalConfigLoad.getIntProperty(0, "window.parallelism");
+ public static final Integer WARTERMARK_MAX_ORDERNESS = GlobalConfigLoad.getIntProperty(0, "watermark.max.orderness");
public static final Integer COUNT_WINDOW_TIME = GlobalConfigLoad.getIntProperty(0, "count.window.time");
public static final String TOOLS_LIBRARY = GlobalConfigLoad.getStringProperty(0, "tools.library");
public static final Integer SINK_PARALLELISM = GlobalConfigLoad.getIntProperty(0, "sink.parallelism");
diff --git a/src/main/java/com/zdjizhi/topology/ApplicationProtocolTopology.java b/src/main/java/com/zdjizhi/topology/ApplicationProtocolTopology.java
index 0f34770..bcb0f63 100644
--- a/src/main/java/com/zdjizhi/topology/ApplicationProtocolTopology.java
+++ b/src/main/java/com/zdjizhi/topology/ApplicationProtocolTopology.java
@@ -6,21 +6,22 @@ import com.zdjizhi.common.config.GlobalConfig;
import com.zdjizhi.common.pojo.Fields;
import com.zdjizhi.common.pojo.Metrics;
import com.zdjizhi.common.pojo.Tags;
-import com.zdjizhi.utils.functions.filter.DataTypeFilter;
import com.zdjizhi.utils.functions.keyby.DimensionKeyBy;
-import com.zdjizhi.utils.functions.map.MetricsParseMap;
import com.zdjizhi.utils.functions.map.ResultFlatMap;
+import com.zdjizhi.utils.functions.process.ParsingData;
import com.zdjizhi.utils.functions.statistics.DispersionCountWindow;
import com.zdjizhi.utils.functions.statistics.MergeCountWindow;
import com.zdjizhi.utils.kafka.KafkaConsumer;
import com.zdjizhi.utils.kafka.KafkaProducer;
-import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.common.eventtime.*;
+import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
+import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
+import java.time.Duration;
/**
* @author qidaijie
@@ -35,33 +36,39 @@ public class ApplicationProtocolTopology {
try {
final StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment();
- //解析原始日志
+ WatermarkStrategy<Tuple3<Tags, Fields, Long>> strategyForSession = WatermarkStrategy
+ .<Tuple3<Tags, Fields, Long>>forBoundedOutOfOrderness(Duration.ofSeconds(GlobalConfig.WARTERMARK_MAX_ORDERNESS))
+ .withTimestampAssigner((element,timestamp) -> element.f2);
+
+ //数据源
DataStream<String> streamSource = environment.addSource(KafkaConsumer.getKafkaConsumer())
.setParallelism(GlobalConfig.SOURCE_PARALLELISM).name(GlobalConfig.SOURCE_KAFKA_TOPIC);
- SingleOutputStreamOperator<String> appProtocolFilter = streamSource.filter(new DataTypeFilter())
- .name("appProtocolFilter").setParallelism(GlobalConfig.SOURCE_PARALLELISM);
-
+ //解析数据
+ SingleOutputStreamOperator<Tuple3<Tags, Fields, Long>> parseDataProcess = streamSource.process(new ParsingData())
+ .assignTimestampsAndWatermarks(strategyForSession)
+ .name("ParseDataProcess")
+ .setParallelism(GlobalConfig.PARSE_PARALLELISM);
- SingleOutputStreamOperator<Tuple2<Tags, Fields>> parseDataMap = appProtocolFilter.map(new MetricsParseMap())
- .name("ParseDataMap").setParallelism(GlobalConfig.PARSE_PARALLELISM);
-
- SingleOutputStreamOperator<Metrics> dispersionCountWindow = parseDataMap.keyBy(new DimensionKeyBy())
- .window(TumblingProcessingTimeWindows.of(Time.seconds(GlobalConfig.COUNT_WINDOW_TIME)))
+ //增量聚合窗口
+ SingleOutputStreamOperator<Metrics> dispersionCountWindow = parseDataProcess.keyBy(new DimensionKeyBy())
+ .window(TumblingEventTimeWindows.of(Time.seconds(GlobalConfig.COUNT_WINDOW_TIME)))
.reduce(new DispersionCountWindow(), new MergeCountWindow())
.name("DispersionCountWindow")
.setParallelism(GlobalConfig.WINDOW_PARALLELISM);
+ //拆分数据
SingleOutputStreamOperator<String> resultFlatMap = dispersionCountWindow.flatMap(new ResultFlatMap())
.name("ResultFlatMap").setParallelism(GlobalConfig.SINK_PARALLELISM);
-
+ //输出
resultFlatMap.addSink(KafkaProducer.getKafkaProducer())
.setParallelism(GlobalConfig.SINK_PARALLELISM).name(GlobalConfig.SINK_KAFKA_TOPIC);
environment.execute(args[0]);
} catch (Exception e) {
- logger.error("This Flink task start ERROR! Exception information is :" + e);
+ logger.error("This Flink task start ERROR! Exception information is :");
+ e.printStackTrace();
}
}
diff --git a/src/main/java/com/zdjizhi/utils/functions/keyby/DimensionKeyBy.java b/src/main/java/com/zdjizhi/utils/functions/keyby/DimensionKeyBy.java
index 4393729..eed832f 100644
--- a/src/main/java/com/zdjizhi/utils/functions/keyby/DimensionKeyBy.java
+++ b/src/main/java/com/zdjizhi/utils/functions/keyby/DimensionKeyBy.java
@@ -4,6 +4,9 @@ import com.zdjizhi.common.pojo.Fields;
import com.zdjizhi.common.pojo.Tags;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.tuple.Tuple3;
+
+import java.sql.Timestamp;
/**
* @author qidaijie
@@ -11,10 +14,10 @@ import org.apache.flink.api.java.tuple.Tuple2;
* @Description:
* @date 2021/7/2112:13
*/
-public class DimensionKeyBy implements KeySelector<Tuple2<Tags, Fields>, String> {
+public class DimensionKeyBy implements KeySelector<Tuple3<Tags, Fields, Long>, String> {
@Override
- public String getKey(Tuple2<Tags, Fields> value) throws Exception {
+ public String getKey(Tuple3<Tags, Fields, Long> value) throws Exception {
//以map拼接的key分组
return value.f0.toString();
}
diff --git a/src/main/java/com/zdjizhi/utils/functions/statistics/DispersionCountWindow.java b/src/main/java/com/zdjizhi/utils/functions/statistics/DispersionCountWindow.java
index 57ebde1..8216320 100644
--- a/src/main/java/com/zdjizhi/utils/functions/statistics/DispersionCountWindow.java
+++ b/src/main/java/com/zdjizhi/utils/functions/statistics/DispersionCountWindow.java
@@ -3,10 +3,12 @@ package com.zdjizhi.utils.functions.statistics;
import cn.hutool.log.Log;
import cn.hutool.log.LogFactory;
import com.zdjizhi.common.pojo.Fields;
+import com.zdjizhi.common.pojo.Metrics;
import com.zdjizhi.common.pojo.Tags;
import com.zdjizhi.utils.general.MetricUtil;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.tuple.Tuple3;
/**
* @author qidaijie
@@ -14,21 +16,23 @@ import org.apache.flink.api.java.tuple.Tuple2;
* @Description:
* @date 2023/4/2314:02
*/
-public class DispersionCountWindow implements ReduceFunction<Tuple2<Tags, Fields>> {
+public class DispersionCountWindow implements ReduceFunction<Tuple3<Tags, Fields, Long>> {
private static final Log logger = LogFactory.get();
@Override
- public Tuple2<Tags, Fields> reduce(Tuple2<Tags, Fields> value1, Tuple2<Tags, Fields> value2) throws Exception {
+ public Tuple3<Tags, Fields, Long> reduce(Tuple3<Tags, Fields, Long> value1, Tuple3<Tags, Fields, Long> value2) throws Exception {
try {
Fields cacheData = value1.f1;
Fields newData = value2.f1;
Fields metricsResult = MetricUtil.statisticsMetrics(cacheData, newData);
- return new Tuple2<>(value1.f0, metricsResult);
+ return new Tuple3<>(value1.f0, metricsResult, value1.f2);
} catch (RuntimeException e) {
logger.error("An exception occurred during incremental aggregation! The message is:" + e.getMessage());
return value1;
}
}
+
+
}
diff --git a/src/main/java/com/zdjizhi/utils/functions/statistics/MergeCountWindow.java b/src/main/java/com/zdjizhi/utils/functions/statistics/MergeCountWindow.java
index e3179a7..52f08d2 100644
--- a/src/main/java/com/zdjizhi/utils/functions/statistics/MergeCountWindow.java
+++ b/src/main/java/com/zdjizhi/utils/functions/statistics/MergeCountWindow.java
@@ -7,6 +7,7 @@ import com.zdjizhi.common.pojo.Fields;
import com.zdjizhi.common.pojo.Metrics;
import com.zdjizhi.common.pojo.Tags;
import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;
@@ -17,19 +18,20 @@ import org.apache.flink.util.Collector;
* @Description:
* @date 2023/4/2314:43
*/
-public class MergeCountWindow extends ProcessWindowFunction<Tuple2<Tags, Fields>, Metrics, String, TimeWindow> {
+public class MergeCountWindow extends ProcessWindowFunction<Tuple3<Tags, Fields,Long>, Metrics, String, TimeWindow> {
private static final Log logger = LogFactory.get();
@Override
- public void process(String windowKey, Context context, Iterable<Tuple2<Tags, Fields>> input, Collector<Metrics> output) throws Exception {
+ public void process(String windowKey, Context context, Iterable<Tuple3<Tags, Fields,Long>> input, Collector<Metrics> output) throws Exception {
try {
- Long endTime = context.window().getStart() / 1000;
- for (Tuple2<Tags, Fields> tuple : input) {
+ long timestamp = context.window().getStart();
+ for (Tuple3<Tags, Fields,Long> tuple : input) {
Tags tags = tuple.f0;
Fields fields = tuple.f1;
- Metrics metrics = new Metrics(GlobalConfig.MEASUREMENT_NAME, tags, fields, endTime);
+ Metrics metrics = new Metrics(GlobalConfig.MEASUREMENT_NAME, tags, fields, timestamp);
output.collect(metrics);
}
+
} catch (RuntimeException e) {
logger.error("An exception occurred in the process of full data aggregation! The message is:" + e.getMessage());
}
diff --git a/src/test/java/com/zdjizhi/FlagsTest.java b/src/test/java/com/zdjizhi/FlagsTest.java
index 15ee4aa..ce59840 100644
--- a/src/test/java/com/zdjizhi/FlagsTest.java
+++ b/src/test/java/com/zdjizhi/FlagsTest.java
@@ -43,9 +43,12 @@ public class FlagsTest {
common_flags = 16400L;
System.out.println("common_flags & clientIsLocal = " + (common_flags & clientIsLocal));
- System.out.println("common_flags & serverIsLocal = " + (common_flags & serverIsLocal));
+ System.out.println("common_flags & serverIsLocal = " + (common_flags & serverIsLocal)+"\n\n");
+ common_flags = 1062135466L;
+ System.out.println("common_flags & clientIsLocal = " + (common_flags & 128));
+ System.out.println("common_flags & serverIsLocal = " + (common_flags & 256)+"\n\n");
if ((0L & clientIsLocal) == 0L){
System.out.println("yes");