diff options
| author | gujinkai <[email protected]> | 2024-04-08 18:10:38 +0800 |
|---|---|---|
| committer | gujinkai <[email protected]> | 2024-04-08 18:10:38 +0800 |
| commit | f9e052008a776ad476fe61920d3fd933e166e65e (patch) | |
| tree | 7611b99b881ac216dbfaff4be40308447d3a7fcc | |
| parent | ca48c7dfd0151a6ff38f3e743cbf96ba8b5e755e (diff) | |
feature: remove filter before subscriber_app relation
| -rw-r--r-- | module-CN-pre-metrics/src/main/java/com/zdjizhi/pre/relation/CnRelationMetric.java | 15 | ||||
| -rw-r--r-- | module-CN-pre-metrics/src/main/java/com/zdjizhi/pre/relation/function/DomainIpAppFirstAggregation.java (renamed from module-CN-pre-metrics/src/main/java/com/zdjizhi/pre/relation/function/FirstAggregation.java) | 27 | ||||
| -rw-r--r-- | module-CN-pre-metrics/src/main/java/com/zdjizhi/pre/relation/function/SubscriberAppFirstAggregation.java | 51 |
3 files changed, 62 insertions, 31 deletions
diff --git a/module-CN-pre-metrics/src/main/java/com/zdjizhi/pre/relation/CnRelationMetric.java b/module-CN-pre-metrics/src/main/java/com/zdjizhi/pre/relation/CnRelationMetric.java index 15917e8..1546eca 100644 --- a/module-CN-pre-metrics/src/main/java/com/zdjizhi/pre/relation/CnRelationMetric.java +++ b/module-CN-pre-metrics/src/main/java/com/zdjizhi/pre/relation/CnRelationMetric.java @@ -8,7 +8,8 @@ import com.zdjizhi.base.sink.clickhouse.ClickHouseTableFactory; import com.zdjizhi.etl.CnRecordEtl; import com.zdjizhi.pre.base.common.CommonConfig; import com.zdjizhi.pre.relation.common.RelationMetricLog; -import com.zdjizhi.pre.relation.function.FirstAggregation; +import com.zdjizhi.pre.relation.function.DomainIpAppFirstAggregation; +import com.zdjizhi.pre.relation.function.SubscriberAppFirstAggregation; import org.apache.flink.api.common.functions.ReduceFunction; import org.apache.flink.api.java.functions.KeySelector; import org.apache.flink.api.java.tuple.Tuple2; @@ -24,10 +25,10 @@ public class CnRelationMetric implements Schedule { @Override public void schedule() throws Exception { SingleOutputStreamOperator<CnRecordLog> source = CnRecordEtl.singleOutputStreamOperator; - SingleOutputStreamOperator<RelationMetricLog> process = source.filter(log -> "TCP".equalsIgnoreCase(log.getCommon_l4_protocol()) || log.getCommon_server_port() == 53 || log.getCommon_server_port() == 443) - .process(new FirstAggregation()).name("relationProcess"); + SingleOutputStreamOperator<RelationMetricLog> domainIpAppRelationProcess = source.filter(log -> "TCP".equalsIgnoreCase(log.getCommon_l4_protocol()) || log.getCommon_server_port() == 53 || log.getCommon_server_port() == 443) + .process(new DomainIpAppFirstAggregation()).name("domainIpAppRelationProcess"); - SingleOutputStreamOperator<String> relationMetric = process.filter(log -> log.getMetricKey() == 1) + SingleOutputStreamOperator<String> relationMetric = domainIpAppRelationProcess.filter(log -> log.getMetricKey() == 1) .keyBy(new KeySelector<RelationMetricLog, Tuple3<String, String, String>>() { @Override public Tuple3<String, String, String> getKey(RelationMetricLog value) throws Exception { @@ -55,7 +56,7 @@ public class CnRelationMetric implements Schedule { .setParallelism(Configs.get(CommonConfig.ENTITY_RELATION_OUTPUT_PARALLELISM)); - SingleOutputStreamOperator<String> attributeMetric = process.filter(log -> log.getMetricKey() == 2) + SingleOutputStreamOperator<String> attributeMetric = domainIpAppRelationProcess.filter(log -> log.getMetricKey() == 2) .keyBy(new KeySelector<RelationMetricLog, Tuple3<String, String, Integer>>() { @Override public Tuple3<String, String, Integer> getKey(RelationMetricLog value) throws Exception { @@ -82,7 +83,9 @@ public class CnRelationMetric implements Schedule { //.addSink(KafkaUtils.getKafkaSink(Configs.get(CommonConfig.METRIC_ATTRIBUTE_TOPIC))) .setParallelism(Configs.get(CommonConfig.DYNAMIC_ATTRIBUTE_OUTPUT_PARALLELISM)); - SingleOutputStreamOperator<String> subscriberAppRelationMetric = process.filter(log -> log.getMetricKey() == 3) + SingleOutputStreamOperator<RelationMetricLog> subscriberAppRelationProcess = source.process(new SubscriberAppFirstAggregation()).name("subscriberAppRelationProcess"); + + SingleOutputStreamOperator<String> subscriberAppRelationMetric = subscriberAppRelationProcess.filter(log -> log.getMetricKey() == 3) .keyBy(new KeySelector<RelationMetricLog, Tuple2<String, String>>() { @Override public Tuple2<String, String> getKey(RelationMetricLog value) throws Exception { diff --git a/module-CN-pre-metrics/src/main/java/com/zdjizhi/pre/relation/function/FirstAggregation.java b/module-CN-pre-metrics/src/main/java/com/zdjizhi/pre/relation/function/DomainIpAppFirstAggregation.java index a30ccd3..3a5440b 100644 --- a/module-CN-pre-metrics/src/main/java/com/zdjizhi/pre/relation/function/FirstAggregation.java +++ b/module-CN-pre-metrics/src/main/java/com/zdjizhi/pre/relation/function/DomainIpAppFirstAggregation.java @@ -9,18 +9,16 @@ import java.util.HashMap; import java.util.List; import java.util.Map; -public class FirstAggregation extends AbstractFirstAggregation<RelationMetricLog> { +public class DomainIpAppFirstAggregation extends AbstractFirstAggregation<RelationMetricLog> { private final String relationMetricKey = "relationMetric"; - private final String attributeMetricKey = "relationMetric"; - private final String subscriberAppRelationMetricKey = "subscriberAppRelationMetric"; + private final String attributeMetricKey = "attributeMetric"; @Override public Map<String, Map<String, RelationMetricLog>> createAccumulator() { Map<String, Map<String, RelationMetricLog>> accumulator = new HashMap<>(); accumulator.put(relationMetricKey, new HashMap<>()); accumulator.put(attributeMetricKey, new HashMap<>()); - accumulator.put(subscriberAppRelationMetricKey, new HashMap<>()); return accumulator; } @@ -31,7 +29,6 @@ public class FirstAggregation extends AbstractFirstAggregation<RelationMetricLog String serverIp = value.getCommon_server_ip(); String l7Protocol = value.getCommon_l7_protocol(); int serverPort = value.getCommon_server_port(); - String subscriberId = value.getSubscriber_id(); String relationKey = appName + "-" + domain + "-" + serverIp; @@ -47,26 +44,6 @@ public class FirstAggregation extends AbstractFirstAggregation<RelationMetricLog if (attributeMetricLog == null) { attributeMetricMap.put(attributeKey, initAttribute(value)); } - - String subscriberAppRelationKey = subscriberId + "-" + appName; - Map<String, RelationMetricLog> subscriberAppRelationMetricMap = accumulator.get(subscriberAppRelationMetricKey); - RelationMetricLog subscriberAppRelationMetricLog = subscriberAppRelationMetricMap.get(subscriberAppRelationKey); - if (subscriberAppRelationMetricLog == null) { - subscriberAppRelationMetricMap.put(subscriberAppRelationKey, initSubscriberAppRelationMetric(value)); - } - } - - private RelationMetricLog initSubscriberAppRelationMetric(CnRecordLog cnRecordLog) { - RelationMetricLog relationMetricLog = new RelationMetricLog(3); - relationMetricLog.setApp_name(cnRecordLog.getCommon_app_label()); - relationMetricLog.setApp_category(cnRecordLog.getApp_category()); - relationMetricLog.setApp_subcategory(cnRecordLog.getApp_subcategory()); - relationMetricLog.setSubscriber_id(cnRecordLog.getSubscriber_id()); - relationMetricLog.setImei(cnRecordLog.getImei()); - relationMetricLog.setImsi(cnRecordLog.getImsi()); - relationMetricLog.setPhone_number(cnRecordLog.getPhone_number()); - relationMetricLog.setApn(cnRecordLog.getApn()); - return relationMetricLog; } private RelationMetricLog initRelationMetric(CnRecordLog cnRecordLog) { diff --git a/module-CN-pre-metrics/src/main/java/com/zdjizhi/pre/relation/function/SubscriberAppFirstAggregation.java b/module-CN-pre-metrics/src/main/java/com/zdjizhi/pre/relation/function/SubscriberAppFirstAggregation.java new file mode 100644 index 0000000..9566481 --- /dev/null +++ b/module-CN-pre-metrics/src/main/java/com/zdjizhi/pre/relation/function/SubscriberAppFirstAggregation.java @@ -0,0 +1,51 @@ +package com.zdjizhi.pre.relation.function; + +import com.zdjizhi.base.common.CnRecordLog; +import com.zdjizhi.pre.base.operator.AbstractFirstAggregation; +import com.zdjizhi.pre.relation.common.RelationMetricLog; + +import java.util.HashMap; +import java.util.Map; + +/** + * @author gujinkai + * @version 1.0 + * @date 2024/4/8 17:58 + */ +public class SubscriberAppFirstAggregation extends AbstractFirstAggregation<RelationMetricLog> { + + private final String subscriberAppRelationMetricKey = "subscriberAppRelationMetric"; + + @Override + public Map<String, Map<String, RelationMetricLog>> createAccumulator() { + Map<String, Map<String, RelationMetricLog>> accumulator = new HashMap<>(); + accumulator.put(subscriberAppRelationMetricKey, new HashMap<>()); + return accumulator; + } + + @Override + public void add(CnRecordLog value, Map<String, Map<String, RelationMetricLog>> accumulator) { + String appName = value.getCommon_app_label(); + String subscriberId = value.getSubscriber_id(); + + String subscriberAppRelationKey = subscriberId + "-" + appName; + Map<String, RelationMetricLog> subscriberAppRelationMetricMap = accumulator.get(subscriberAppRelationMetricKey); + RelationMetricLog subscriberAppRelationMetricLog = subscriberAppRelationMetricMap.get(subscriberAppRelationKey); + if (subscriberAppRelationMetricLog == null) { + subscriberAppRelationMetricMap.put(subscriberAppRelationKey, initSubscriberAppRelationMetric(value)); + } + } + + private RelationMetricLog initSubscriberAppRelationMetric(CnRecordLog cnRecordLog) { + RelationMetricLog relationMetricLog = new RelationMetricLog(3); + relationMetricLog.setApp_name(cnRecordLog.getCommon_app_label()); + relationMetricLog.setApp_category(cnRecordLog.getApp_category()); + relationMetricLog.setApp_subcategory(cnRecordLog.getApp_subcategory()); + relationMetricLog.setSubscriber_id(cnRecordLog.getSubscriber_id()); + relationMetricLog.setImei(cnRecordLog.getImei()); + relationMetricLog.setImsi(cnRecordLog.getImsi()); + relationMetricLog.setPhone_number(cnRecordLog.getPhone_number()); + relationMetricLog.setApn(cnRecordLog.getApn()); + return relationMetricLog; + } +} |
