diff options
| author | gujinkai <[email protected]> | 2024-03-13 16:03:43 +0800 |
|---|---|---|
| committer | gujinkai <[email protected]> | 2024-03-13 16:04:14 +0800 |
| commit | b2fb3ac24da57c1f4f064df535ecc49bec16c29c (patch) | |
| tree | db8be31532b9b895230e6da152ec55c0c983a3bb | |
| parent | 6c65d10e05c827c4924f40029ee703c8e35d6115 (diff) | |
perf: add metric: the relation of subscriber and app
7 files changed, 121 insertions, 2 deletions
diff --git a/module-CN-pre-metrics/src/main/java/com/zdjizhi/pre/base/common/CommonConfig.java b/module-CN-pre-metrics/src/main/java/com/zdjizhi/pre/base/common/CommonConfig.java index 03857fa..ea5a464 100644 --- a/module-CN-pre-metrics/src/main/java/com/zdjizhi/pre/base/common/CommonConfig.java +++ b/module-CN-pre-metrics/src/main/java/com/zdjizhi/pre/base/common/CommonConfig.java @@ -61,6 +61,10 @@ public class CommonConfig { .stringType() .defaultValue("METRIC-ATTRIBUTE"); + public static final ConfigOption<String> METRIC_SUBSCRIBER_APP_RELATION_TOPIC = ConfigOptions.key("metric.subscriber_app_relation.topic") + .stringType() + .defaultValue("METRIC-RELATION-SUBSCRIBER-APP"); + public static final ConfigOption<String> METRIC_IP_TABLE = ConfigOptions.key("metric.ip.table") .stringType() .defaultValue("metric_ip_local"); @@ -101,6 +105,10 @@ public class CommonConfig { .stringType() .defaultValue("metric_ip_dynamic_attribute_local"); + public static final ConfigOption<String> METRIC_SUBSCRIBER_APP_RELATION_TABLE = ConfigOptions.key("metric.subscriber_app_relation.table") + .stringType() + .defaultValue("metric_relation_subscriber_app_local"); + public static final ConfigOption<Integer> METRIC_OUTPUT_PARALLELISM = ConfigOptions.key("metric.output.parallelism") .intType() .defaultValue(1); @@ -112,4 +120,8 @@ public class CommonConfig { public static final ConfigOption<Integer> DYNAMIC_ATTRIBUTE_OUTPUT_PARALLELISM = ConfigOptions.key("metric.dynamic.attribute.output.parallelism") .intType() .defaultValue(1); + + public static final ConfigOption<Integer> SUBSCRIBER_APP_RELATION_OUTPUT_PARALLELISM = ConfigOptions.key("metric.subscriber.app.relation.output.parallelism") + .intType() + .defaultValue(1); } diff --git a/module-CN-pre-metrics/src/main/java/com/zdjizhi/pre/location/LocationMetric.java b/module-CN-pre-metrics/src/main/java/com/zdjizhi/pre/location/LocationMetric.java index 2c3f04c..574c256 100644 --- a/module-CN-pre-metrics/src/main/java/com/zdjizhi/pre/location/LocationMetric.java +++ b/module-CN-pre-metrics/src/main/java/com/zdjizhi/pre/location/LocationMetric.java @@ -32,7 +32,6 @@ public class LocationMetric implements Schedule { @Override public void schedule() throws Exception { - //todo 先临时使用session数据源,后续替换为专门的数据源 SingleOutputStreamOperator<CnRecordLog> source = CnRecordEtl.singleOutputStreamOperator; SingleOutputStreamOperator<LocationSubscriber> process = source.process(new FirstAggregation()).name("locationFirstAggProcess"); @@ -44,5 +43,7 @@ public class LocationMetric implements Schedule { .addSink(ClickHouseTableFactory.getSinkFunction(Configs.get(CommonConfig.LOCATION_SUBSCRIBER_TABLE))) //.addSink(KafkaUtils.getKafkaSink(Configs.get(CommonConfig.LOCATION_SUBSCRIBER_TOPIC))) .setParallelism(outputParallelism); + + //todo GMLC数据源字段详情不清楚,后续再开发GMLC->location_subscriber的数据流 } } diff --git a/module-CN-pre-metrics/src/main/java/com/zdjizhi/pre/location/common/LocationMetricResult.java b/module-CN-pre-metrics/src/main/java/com/zdjizhi/pre/location/common/LocationMetricResult.java index 1c493fd..f08463b 100644 --- a/module-CN-pre-metrics/src/main/java/com/zdjizhi/pre/location/common/LocationMetricResult.java +++ b/module-CN-pre-metrics/src/main/java/com/zdjizhi/pre/location/common/LocationMetricResult.java @@ -20,4 +20,5 @@ public class LocationMetricResult { private String second_location; private String third_location; private Long stat_time; + private String data_source; } diff --git a/module-CN-pre-metrics/src/main/java/com/zdjizhi/pre/location/function/MetricSubscriberProcessWindowFunc.java b/module-CN-pre-metrics/src/main/java/com/zdjizhi/pre/location/function/MetricSubscriberProcessWindowFunc.java index 8b09dd1..d209571 100644 --- a/module-CN-pre-metrics/src/main/java/com/zdjizhi/pre/location/function/MetricSubscriberProcessWindowFunc.java +++ b/module-CN-pre-metrics/src/main/java/com/zdjizhi/pre/location/function/MetricSubscriberProcessWindowFunc.java @@ -51,6 +51,7 @@ public class MetricSubscriberProcessWindowFunc extends ProcessWindowFunction<Loc locationMetricResult.setSecond_location(h3.latLngToCellAddress(locationMetricResult.getSubscriber_latitude(), locationMetricResult.getSubscriber_longitude(), secondRes)); locationMetricResult.setThird_location(h3.latLngToCellAddress(locationMetricResult.getSubscriber_latitude(), locationMetricResult.getSubscriber_longitude(), thirdRes)); locationMetricResult.setStat_time(context.window().getStart() / 1000); + locationMetricResult.setData_source("Session Record"); String metricResultJsonStr = JSON.toJSONString(locationMetricResult); out.collect(metricResultJsonStr); } diff --git a/module-CN-pre-metrics/src/main/java/com/zdjizhi/pre/relation/CnRelationMetric.java b/module-CN-pre-metrics/src/main/java/com/zdjizhi/pre/relation/CnRelationMetric.java index c4d5b91..15917e8 100644 --- a/module-CN-pre-metrics/src/main/java/com/zdjizhi/pre/relation/CnRelationMetric.java +++ b/module-CN-pre-metrics/src/main/java/com/zdjizhi/pre/relation/CnRelationMetric.java @@ -11,6 +11,7 @@ import com.zdjizhi.pre.relation.common.RelationMetricLog; import com.zdjizhi.pre.relation.function.FirstAggregation; import org.apache.flink.api.common.functions.ReduceFunction; import org.apache.flink.api.java.functions.KeySelector; +import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.api.java.tuple.Tuple3; import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator; import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction; @@ -80,5 +81,35 @@ public class CnRelationMetric implements Schedule { .addSink(ClickHouseTableFactory.getSinkFunction(Configs.get(CommonConfig.METRIC_ATTRIBUTE_TABLE))) //.addSink(KafkaUtils.getKafkaSink(Configs.get(CommonConfig.METRIC_ATTRIBUTE_TOPIC))) .setParallelism(Configs.get(CommonConfig.DYNAMIC_ATTRIBUTE_OUTPUT_PARALLELISM)); + + SingleOutputStreamOperator<String> subscriberAppRelationMetric = process.filter(log -> log.getMetricKey() == 3) + .keyBy(new KeySelector<RelationMetricLog, Tuple2<String, String>>() { + @Override + public Tuple2<String, String> getKey(RelationMetricLog value) throws Exception { + return Tuple2.of(value.getSubscriber_id(), value.getApp_name()); + } + }) + .window(TumblingEventTimeWindows.of(Time.minutes(Configs.get(CommonConfig.PRE_METRICS_WINDOW_TIME)))) + .reduce(new ReduceFunction<RelationMetricLog>() { + @Override + public RelationMetricLog reduce(RelationMetricLog value1, RelationMetricLog value2) throws Exception { + return value1; + } + }, new ProcessWindowFunction<RelationMetricLog, String, Tuple2<String, String>, TimeWindow>() { + @Override + public void process(Tuple2<String, String> key, ProcessWindowFunction<RelationMetricLog, String, Tuple2<String, String>, TimeWindow>.Context context, Iterable<RelationMetricLog> elements, Collector<String> out) throws Exception { + RelationMetricLog next = elements.iterator().next(); + next.setStat_time(context.window().getStart() / 1000); + String metricResultJsonStr = JSON.toJSONString(next); + out.collect(metricResultJsonStr); + } + }); + + subscriberAppRelationMetric + .addSink(ClickHouseTableFactory.getSinkFunction(Configs.get(CommonConfig.METRIC_SUBSCRIBER_APP_RELATION_TABLE))) + //.addSink(KafkaUtils.getKafkaSink(Configs.get(CommonConfig.METRIC_SUBSCRIBER_APP_RELATION_TOPIC))) + .setParallelism(Configs.get(CommonConfig.SUBSCRIBER_APP_RELATION_OUTPUT_PARALLELISM)); } + + //todo GMLC数据源字段详情不清楚,后续再开发GMLC->relation_subscriber_app的数据流 } diff --git a/module-CN-pre-metrics/src/main/java/com/zdjizhi/pre/relation/common/RelationMetricLog.java b/module-CN-pre-metrics/src/main/java/com/zdjizhi/pre/relation/common/RelationMetricLog.java index 5616f6c..f641620 100644 --- a/module-CN-pre-metrics/src/main/java/com/zdjizhi/pre/relation/common/RelationMetricLog.java +++ b/module-CN-pre-metrics/src/main/java/com/zdjizhi/pre/relation/common/RelationMetricLog.java @@ -2,7 +2,6 @@ package com.zdjizhi.pre.relation.common; import com.alibaba.fastjson2.annotation.JSONField; -import java.util.ArrayList; import java.util.List; public class RelationMetricLog { @@ -24,6 +23,13 @@ public class RelationMetricLog { private String app_subcategory; private List<String> entity_tags; + //subscriber + private String subscriber_id; + private String imei; + private String imsi; + private String phone_number; + private String apn; + private String l7_protocol; private int port; @@ -169,6 +175,46 @@ public class RelationMetricLog { this.stat_time = stat_time; } + public String getSubscriber_id() { + return subscriber_id; + } + + public void setSubscriber_id(String subscriber_id) { + this.subscriber_id = subscriber_id; + } + + public String getImei() { + return imei; + } + + public void setImei(String imei) { + this.imei = imei; + } + + public String getImsi() { + return imsi; + } + + public void setImsi(String imsi) { + this.imsi = imsi; + } + + public String getPhone_number() { + return phone_number; + } + + public void setPhone_number(String phone_number) { + this.phone_number = phone_number; + } + + public String getApn() { + return apn; + } + + public void setApn(String apn) { + this.apn = apn; + } + @Override public String toString() { return "RelationMetricLog{" + @@ -186,6 +232,11 @@ public class RelationMetricLog { ", app_category='" + app_category + '\'' + ", app_subcategory='" + app_subcategory + '\'' + ", entity_tags=" + entity_tags + + ", subscriber_id='" + subscriber_id + '\'' + + ", imei='" + imei + '\'' + + ", imsi='" + imsi + '\'' + + ", phone_number='" + phone_number + '\'' + + ", apn='" + apn + '\'' + ", l7_protocol='" + l7_protocol + '\'' + ", port=" + port + ", stat_time=" + stat_time + diff --git a/module-CN-pre-metrics/src/main/java/com/zdjizhi/pre/relation/function/FirstAggregation.java b/module-CN-pre-metrics/src/main/java/com/zdjizhi/pre/relation/function/FirstAggregation.java index 114a579..a30ccd3 100644 --- a/module-CN-pre-metrics/src/main/java/com/zdjizhi/pre/relation/function/FirstAggregation.java +++ b/module-CN-pre-metrics/src/main/java/com/zdjizhi/pre/relation/function/FirstAggregation.java @@ -13,12 +13,14 @@ public class FirstAggregation extends AbstractFirstAggregation<RelationMetricLog private final String relationMetricKey = "relationMetric"; private final String attributeMetricKey = "relationMetric"; + private final String subscriberAppRelationMetricKey = "subscriberAppRelationMetric"; @Override public Map<String, Map<String, RelationMetricLog>> createAccumulator() { Map<String, Map<String, RelationMetricLog>> accumulator = new HashMap<>(); accumulator.put(relationMetricKey, new HashMap<>()); accumulator.put(attributeMetricKey, new HashMap<>()); + accumulator.put(subscriberAppRelationMetricKey, new HashMap<>()); return accumulator; } @@ -29,6 +31,7 @@ public class FirstAggregation extends AbstractFirstAggregation<RelationMetricLog String serverIp = value.getCommon_server_ip(); String l7Protocol = value.getCommon_l7_protocol(); int serverPort = value.getCommon_server_port(); + String subscriberId = value.getSubscriber_id(); String relationKey = appName + "-" + domain + "-" + serverIp; @@ -45,6 +48,25 @@ public class FirstAggregation extends AbstractFirstAggregation<RelationMetricLog attributeMetricMap.put(attributeKey, initAttribute(value)); } + String subscriberAppRelationKey = subscriberId + "-" + appName; + Map<String, RelationMetricLog> subscriberAppRelationMetricMap = accumulator.get(subscriberAppRelationMetricKey); + RelationMetricLog subscriberAppRelationMetricLog = subscriberAppRelationMetricMap.get(subscriberAppRelationKey); + if (subscriberAppRelationMetricLog == null) { + subscriberAppRelationMetricMap.put(subscriberAppRelationKey, initSubscriberAppRelationMetric(value)); + } + } + + private RelationMetricLog initSubscriberAppRelationMetric(CnRecordLog cnRecordLog) { + RelationMetricLog relationMetricLog = new RelationMetricLog(3); + relationMetricLog.setApp_name(cnRecordLog.getCommon_app_label()); + relationMetricLog.setApp_category(cnRecordLog.getApp_category()); + relationMetricLog.setApp_subcategory(cnRecordLog.getApp_subcategory()); + relationMetricLog.setSubscriber_id(cnRecordLog.getSubscriber_id()); + relationMetricLog.setImei(cnRecordLog.getImei()); + relationMetricLog.setImsi(cnRecordLog.getImsi()); + relationMetricLog.setPhone_number(cnRecordLog.getPhone_number()); + relationMetricLog.setApn(cnRecordLog.getApn()); + return relationMetricLog; } private RelationMetricLog initRelationMetric(CnRecordLog cnRecordLog) { |
