summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorgujinkai <[email protected]>2024-04-08 18:10:38 +0800
committergujinkai <[email protected]>2024-04-08 18:10:38 +0800
commitf9e052008a776ad476fe61920d3fd933e166e65e (patch)
tree7611b99b881ac216dbfaff4be40308447d3a7fcc
parentca48c7dfd0151a6ff38f3e743cbf96ba8b5e755e (diff)
feature: remove filter before subscriber_app relation
-rw-r--r--module-CN-pre-metrics/src/main/java/com/zdjizhi/pre/relation/CnRelationMetric.java15
-rw-r--r--module-CN-pre-metrics/src/main/java/com/zdjizhi/pre/relation/function/DomainIpAppFirstAggregation.java (renamed from module-CN-pre-metrics/src/main/java/com/zdjizhi/pre/relation/function/FirstAggregation.java)27
-rw-r--r--module-CN-pre-metrics/src/main/java/com/zdjizhi/pre/relation/function/SubscriberAppFirstAggregation.java51
3 files changed, 62 insertions, 31 deletions
diff --git a/module-CN-pre-metrics/src/main/java/com/zdjizhi/pre/relation/CnRelationMetric.java b/module-CN-pre-metrics/src/main/java/com/zdjizhi/pre/relation/CnRelationMetric.java
index 15917e8..1546eca 100644
--- a/module-CN-pre-metrics/src/main/java/com/zdjizhi/pre/relation/CnRelationMetric.java
+++ b/module-CN-pre-metrics/src/main/java/com/zdjizhi/pre/relation/CnRelationMetric.java
@@ -8,7 +8,8 @@ import com.zdjizhi.base.sink.clickhouse.ClickHouseTableFactory;
import com.zdjizhi.etl.CnRecordEtl;
import com.zdjizhi.pre.base.common.CommonConfig;
import com.zdjizhi.pre.relation.common.RelationMetricLog;
-import com.zdjizhi.pre.relation.function.FirstAggregation;
+import com.zdjizhi.pre.relation.function.DomainIpAppFirstAggregation;
+import com.zdjizhi.pre.relation.function.SubscriberAppFirstAggregation;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple2;
@@ -24,10 +25,10 @@ public class CnRelationMetric implements Schedule {
@Override
public void schedule() throws Exception {
SingleOutputStreamOperator<CnRecordLog> source = CnRecordEtl.singleOutputStreamOperator;
- SingleOutputStreamOperator<RelationMetricLog> process = source.filter(log -> "TCP".equalsIgnoreCase(log.getCommon_l4_protocol()) || log.getCommon_server_port() == 53 || log.getCommon_server_port() == 443)
- .process(new FirstAggregation()).name("relationProcess");
+ SingleOutputStreamOperator<RelationMetricLog> domainIpAppRelationProcess = source.filter(log -> "TCP".equalsIgnoreCase(log.getCommon_l4_protocol()) || log.getCommon_server_port() == 53 || log.getCommon_server_port() == 443)
+ .process(new DomainIpAppFirstAggregation()).name("domainIpAppRelationProcess");
- SingleOutputStreamOperator<String> relationMetric = process.filter(log -> log.getMetricKey() == 1)
+ SingleOutputStreamOperator<String> relationMetric = domainIpAppRelationProcess.filter(log -> log.getMetricKey() == 1)
.keyBy(new KeySelector<RelationMetricLog, Tuple3<String, String, String>>() {
@Override
public Tuple3<String, String, String> getKey(RelationMetricLog value) throws Exception {
@@ -55,7 +56,7 @@ public class CnRelationMetric implements Schedule {
.setParallelism(Configs.get(CommonConfig.ENTITY_RELATION_OUTPUT_PARALLELISM));
- SingleOutputStreamOperator<String> attributeMetric = process.filter(log -> log.getMetricKey() == 2)
+ SingleOutputStreamOperator<String> attributeMetric = domainIpAppRelationProcess.filter(log -> log.getMetricKey() == 2)
.keyBy(new KeySelector<RelationMetricLog, Tuple3<String, String, Integer>>() {
@Override
public Tuple3<String, String, Integer> getKey(RelationMetricLog value) throws Exception {
@@ -82,7 +83,9 @@ public class CnRelationMetric implements Schedule {
//.addSink(KafkaUtils.getKafkaSink(Configs.get(CommonConfig.METRIC_ATTRIBUTE_TOPIC)))
.setParallelism(Configs.get(CommonConfig.DYNAMIC_ATTRIBUTE_OUTPUT_PARALLELISM));
- SingleOutputStreamOperator<String> subscriberAppRelationMetric = process.filter(log -> log.getMetricKey() == 3)
+ SingleOutputStreamOperator<RelationMetricLog> subscriberAppRelationProcess = source.process(new SubscriberAppFirstAggregation()).name("subscriberAppRelationProcess");
+
+ SingleOutputStreamOperator<String> subscriberAppRelationMetric = subscriberAppRelationProcess.filter(log -> log.getMetricKey() == 3)
.keyBy(new KeySelector<RelationMetricLog, Tuple2<String, String>>() {
@Override
public Tuple2<String, String> getKey(RelationMetricLog value) throws Exception {
diff --git a/module-CN-pre-metrics/src/main/java/com/zdjizhi/pre/relation/function/FirstAggregation.java b/module-CN-pre-metrics/src/main/java/com/zdjizhi/pre/relation/function/DomainIpAppFirstAggregation.java
index a30ccd3..3a5440b 100644
--- a/module-CN-pre-metrics/src/main/java/com/zdjizhi/pre/relation/function/FirstAggregation.java
+++ b/module-CN-pre-metrics/src/main/java/com/zdjizhi/pre/relation/function/DomainIpAppFirstAggregation.java
@@ -9,18 +9,16 @@ import java.util.HashMap;
import java.util.List;
import java.util.Map;
-public class FirstAggregation extends AbstractFirstAggregation<RelationMetricLog> {
+public class DomainIpAppFirstAggregation extends AbstractFirstAggregation<RelationMetricLog> {
private final String relationMetricKey = "relationMetric";
- private final String attributeMetricKey = "relationMetric";
- private final String subscriberAppRelationMetricKey = "subscriberAppRelationMetric";
+ private final String attributeMetricKey = "attributeMetric";
@Override
public Map<String, Map<String, RelationMetricLog>> createAccumulator() {
Map<String, Map<String, RelationMetricLog>> accumulator = new HashMap<>();
accumulator.put(relationMetricKey, new HashMap<>());
accumulator.put(attributeMetricKey, new HashMap<>());
- accumulator.put(subscriberAppRelationMetricKey, new HashMap<>());
return accumulator;
}
@@ -31,7 +29,6 @@ public class FirstAggregation extends AbstractFirstAggregation<RelationMetricLog
String serverIp = value.getCommon_server_ip();
String l7Protocol = value.getCommon_l7_protocol();
int serverPort = value.getCommon_server_port();
- String subscriberId = value.getSubscriber_id();
String relationKey = appName + "-" + domain + "-" + serverIp;
@@ -47,26 +44,6 @@ public class FirstAggregation extends AbstractFirstAggregation<RelationMetricLog
if (attributeMetricLog == null) {
attributeMetricMap.put(attributeKey, initAttribute(value));
}
-
- String subscriberAppRelationKey = subscriberId + "-" + appName;
- Map<String, RelationMetricLog> subscriberAppRelationMetricMap = accumulator.get(subscriberAppRelationMetricKey);
- RelationMetricLog subscriberAppRelationMetricLog = subscriberAppRelationMetricMap.get(subscriberAppRelationKey);
- if (subscriberAppRelationMetricLog == null) {
- subscriberAppRelationMetricMap.put(subscriberAppRelationKey, initSubscriberAppRelationMetric(value));
- }
- }
-
- private RelationMetricLog initSubscriberAppRelationMetric(CnRecordLog cnRecordLog) {
- RelationMetricLog relationMetricLog = new RelationMetricLog(3);
- relationMetricLog.setApp_name(cnRecordLog.getCommon_app_label());
- relationMetricLog.setApp_category(cnRecordLog.getApp_category());
- relationMetricLog.setApp_subcategory(cnRecordLog.getApp_subcategory());
- relationMetricLog.setSubscriber_id(cnRecordLog.getSubscriber_id());
- relationMetricLog.setImei(cnRecordLog.getImei());
- relationMetricLog.setImsi(cnRecordLog.getImsi());
- relationMetricLog.setPhone_number(cnRecordLog.getPhone_number());
- relationMetricLog.setApn(cnRecordLog.getApn());
- return relationMetricLog;
}
private RelationMetricLog initRelationMetric(CnRecordLog cnRecordLog) {
diff --git a/module-CN-pre-metrics/src/main/java/com/zdjizhi/pre/relation/function/SubscriberAppFirstAggregation.java b/module-CN-pre-metrics/src/main/java/com/zdjizhi/pre/relation/function/SubscriberAppFirstAggregation.java
new file mode 100644
index 0000000..9566481
--- /dev/null
+++ b/module-CN-pre-metrics/src/main/java/com/zdjizhi/pre/relation/function/SubscriberAppFirstAggregation.java
@@ -0,0 +1,51 @@
+package com.zdjizhi.pre.relation.function;
+
+import com.zdjizhi.base.common.CnRecordLog;
+import com.zdjizhi.pre.base.operator.AbstractFirstAggregation;
+import com.zdjizhi.pre.relation.common.RelationMetricLog;
+
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * @author gujinkai
+ * @version 1.0
+ * @date 2024/4/8 17:58
+ */
+public class SubscriberAppFirstAggregation extends AbstractFirstAggregation<RelationMetricLog> {
+
+ private final String subscriberAppRelationMetricKey = "subscriberAppRelationMetric";
+
+ @Override
+ public Map<String, Map<String, RelationMetricLog>> createAccumulator() {
+ Map<String, Map<String, RelationMetricLog>> accumulator = new HashMap<>();
+ accumulator.put(subscriberAppRelationMetricKey, new HashMap<>());
+ return accumulator;
+ }
+
+ @Override
+ public void add(CnRecordLog value, Map<String, Map<String, RelationMetricLog>> accumulator) {
+ String appName = value.getCommon_app_label();
+ String subscriberId = value.getSubscriber_id();
+
+ String subscriberAppRelationKey = subscriberId + "-" + appName;
+ Map<String, RelationMetricLog> subscriberAppRelationMetricMap = accumulator.get(subscriberAppRelationMetricKey);
+ RelationMetricLog subscriberAppRelationMetricLog = subscriberAppRelationMetricMap.get(subscriberAppRelationKey);
+ if (subscriberAppRelationMetricLog == null) {
+ subscriberAppRelationMetricMap.put(subscriberAppRelationKey, initSubscriberAppRelationMetric(value));
+ }
+ }
+
+ private RelationMetricLog initSubscriberAppRelationMetric(CnRecordLog cnRecordLog) {
+ RelationMetricLog relationMetricLog = new RelationMetricLog(3);
+ relationMetricLog.setApp_name(cnRecordLog.getCommon_app_label());
+ relationMetricLog.setApp_category(cnRecordLog.getApp_category());
+ relationMetricLog.setApp_subcategory(cnRecordLog.getApp_subcategory());
+ relationMetricLog.setSubscriber_id(cnRecordLog.getSubscriber_id());
+ relationMetricLog.setImei(cnRecordLog.getImei());
+ relationMetricLog.setImsi(cnRecordLog.getImsi());
+ relationMetricLog.setPhone_number(cnRecordLog.getPhone_number());
+ relationMetricLog.setApn(cnRecordLog.getApn());
+ return relationMetricLog;
+ }
+}