summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorgujinkai <[email protected]>2024-03-13 16:03:43 +0800
committergujinkai <[email protected]>2024-03-13 16:04:14 +0800
commitb2fb3ac24da57c1f4f064df535ecc49bec16c29c (patch)
treedb8be31532b9b895230e6da152ec55c0c983a3bb
parent6c65d10e05c827c4924f40029ee703c8e35d6115 (diff)
perf: add metric: the relation of subscriber and app
-rw-r--r--module-CN-pre-metrics/src/main/java/com/zdjizhi/pre/base/common/CommonConfig.java12
-rw-r--r--module-CN-pre-metrics/src/main/java/com/zdjizhi/pre/location/LocationMetric.java3
-rw-r--r--module-CN-pre-metrics/src/main/java/com/zdjizhi/pre/location/common/LocationMetricResult.java1
-rw-r--r--module-CN-pre-metrics/src/main/java/com/zdjizhi/pre/location/function/MetricSubscriberProcessWindowFunc.java1
-rw-r--r--module-CN-pre-metrics/src/main/java/com/zdjizhi/pre/relation/CnRelationMetric.java31
-rw-r--r--module-CN-pre-metrics/src/main/java/com/zdjizhi/pre/relation/common/RelationMetricLog.java53
-rw-r--r--module-CN-pre-metrics/src/main/java/com/zdjizhi/pre/relation/function/FirstAggregation.java22
7 files changed, 121 insertions, 2 deletions
diff --git a/module-CN-pre-metrics/src/main/java/com/zdjizhi/pre/base/common/CommonConfig.java b/module-CN-pre-metrics/src/main/java/com/zdjizhi/pre/base/common/CommonConfig.java
index 03857fa..ea5a464 100644
--- a/module-CN-pre-metrics/src/main/java/com/zdjizhi/pre/base/common/CommonConfig.java
+++ b/module-CN-pre-metrics/src/main/java/com/zdjizhi/pre/base/common/CommonConfig.java
@@ -61,6 +61,10 @@ public class CommonConfig {
.stringType()
.defaultValue("METRIC-ATTRIBUTE");
+ public static final ConfigOption<String> METRIC_SUBSCRIBER_APP_RELATION_TOPIC = ConfigOptions.key("metric.subscriber_app_relation.topic")
+ .stringType()
+ .defaultValue("METRIC-RELATION-SUBSCRIBER-APP");
+
public static final ConfigOption<String> METRIC_IP_TABLE = ConfigOptions.key("metric.ip.table")
.stringType()
.defaultValue("metric_ip_local");
@@ -101,6 +105,10 @@ public class CommonConfig {
.stringType()
.defaultValue("metric_ip_dynamic_attribute_local");
+ public static final ConfigOption<String> METRIC_SUBSCRIBER_APP_RELATION_TABLE = ConfigOptions.key("metric.subscriber_app_relation.table")
+ .stringType()
+ .defaultValue("metric_relation_subscriber_app_local");
+
public static final ConfigOption<Integer> METRIC_OUTPUT_PARALLELISM = ConfigOptions.key("metric.output.parallelism")
.intType()
.defaultValue(1);
@@ -112,4 +120,8 @@ public class CommonConfig {
public static final ConfigOption<Integer> DYNAMIC_ATTRIBUTE_OUTPUT_PARALLELISM = ConfigOptions.key("metric.dynamic.attribute.output.parallelism")
.intType()
.defaultValue(1);
+
+ public static final ConfigOption<Integer> SUBSCRIBER_APP_RELATION_OUTPUT_PARALLELISM = ConfigOptions.key("metric.subscriber.app.relation.output.parallelism")
+ .intType()
+ .defaultValue(1);
}
diff --git a/module-CN-pre-metrics/src/main/java/com/zdjizhi/pre/location/LocationMetric.java b/module-CN-pre-metrics/src/main/java/com/zdjizhi/pre/location/LocationMetric.java
index 2c3f04c..574c256 100644
--- a/module-CN-pre-metrics/src/main/java/com/zdjizhi/pre/location/LocationMetric.java
+++ b/module-CN-pre-metrics/src/main/java/com/zdjizhi/pre/location/LocationMetric.java
@@ -32,7 +32,6 @@ public class LocationMetric implements Schedule {
@Override
public void schedule() throws Exception {
- //todo 先临时使用session数据源,后续替换为专门的数据源
SingleOutputStreamOperator<CnRecordLog> source = CnRecordEtl.singleOutputStreamOperator;
SingleOutputStreamOperator<LocationSubscriber> process = source.process(new FirstAggregation()).name("locationFirstAggProcess");
@@ -44,5 +43,7 @@ public class LocationMetric implements Schedule {
.addSink(ClickHouseTableFactory.getSinkFunction(Configs.get(CommonConfig.LOCATION_SUBSCRIBER_TABLE)))
//.addSink(KafkaUtils.getKafkaSink(Configs.get(CommonConfig.LOCATION_SUBSCRIBER_TOPIC)))
.setParallelism(outputParallelism);
+
+ //todo GMLC数据源字段详情不清楚,后续再开发GMLC->location_subscriber的数据流
}
}
diff --git a/module-CN-pre-metrics/src/main/java/com/zdjizhi/pre/location/common/LocationMetricResult.java b/module-CN-pre-metrics/src/main/java/com/zdjizhi/pre/location/common/LocationMetricResult.java
index 1c493fd..f08463b 100644
--- a/module-CN-pre-metrics/src/main/java/com/zdjizhi/pre/location/common/LocationMetricResult.java
+++ b/module-CN-pre-metrics/src/main/java/com/zdjizhi/pre/location/common/LocationMetricResult.java
@@ -20,4 +20,5 @@ public class LocationMetricResult {
private String second_location;
private String third_location;
private Long stat_time;
+ private String data_source;
}
diff --git a/module-CN-pre-metrics/src/main/java/com/zdjizhi/pre/location/function/MetricSubscriberProcessWindowFunc.java b/module-CN-pre-metrics/src/main/java/com/zdjizhi/pre/location/function/MetricSubscriberProcessWindowFunc.java
index 8b09dd1..d209571 100644
--- a/module-CN-pre-metrics/src/main/java/com/zdjizhi/pre/location/function/MetricSubscriberProcessWindowFunc.java
+++ b/module-CN-pre-metrics/src/main/java/com/zdjizhi/pre/location/function/MetricSubscriberProcessWindowFunc.java
@@ -51,6 +51,7 @@ public class MetricSubscriberProcessWindowFunc extends ProcessWindowFunction<Loc
locationMetricResult.setSecond_location(h3.latLngToCellAddress(locationMetricResult.getSubscriber_latitude(), locationMetricResult.getSubscriber_longitude(), secondRes));
locationMetricResult.setThird_location(h3.latLngToCellAddress(locationMetricResult.getSubscriber_latitude(), locationMetricResult.getSubscriber_longitude(), thirdRes));
locationMetricResult.setStat_time(context.window().getStart() / 1000);
+ locationMetricResult.setData_source("Session Record");
String metricResultJsonStr = JSON.toJSONString(locationMetricResult);
out.collect(metricResultJsonStr);
}
diff --git a/module-CN-pre-metrics/src/main/java/com/zdjizhi/pre/relation/CnRelationMetric.java b/module-CN-pre-metrics/src/main/java/com/zdjizhi/pre/relation/CnRelationMetric.java
index c4d5b91..15917e8 100644
--- a/module-CN-pre-metrics/src/main/java/com/zdjizhi/pre/relation/CnRelationMetric.java
+++ b/module-CN-pre-metrics/src/main/java/com/zdjizhi/pre/relation/CnRelationMetric.java
@@ -11,6 +11,7 @@ import com.zdjizhi.pre.relation.common.RelationMetricLog;
import com.zdjizhi.pre.relation.function.FirstAggregation;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.java.functions.KeySelector;
+import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
@@ -80,5 +81,35 @@ public class CnRelationMetric implements Schedule {
.addSink(ClickHouseTableFactory.getSinkFunction(Configs.get(CommonConfig.METRIC_ATTRIBUTE_TABLE)))
//.addSink(KafkaUtils.getKafkaSink(Configs.get(CommonConfig.METRIC_ATTRIBUTE_TOPIC)))
.setParallelism(Configs.get(CommonConfig.DYNAMIC_ATTRIBUTE_OUTPUT_PARALLELISM));
+
+ SingleOutputStreamOperator<String> subscriberAppRelationMetric = process.filter(log -> log.getMetricKey() == 3)
+ .keyBy(new KeySelector<RelationMetricLog, Tuple2<String, String>>() {
+ @Override
+ public Tuple2<String, String> getKey(RelationMetricLog value) throws Exception {
+ return Tuple2.of(value.getSubscriber_id(), value.getApp_name());
+ }
+ })
+ .window(TumblingEventTimeWindows.of(Time.minutes(Configs.get(CommonConfig.PRE_METRICS_WINDOW_TIME))))
+ .reduce(new ReduceFunction<RelationMetricLog>() {
+ @Override
+ public RelationMetricLog reduce(RelationMetricLog value1, RelationMetricLog value2) throws Exception {
+ return value1;
+ }
+ }, new ProcessWindowFunction<RelationMetricLog, String, Tuple2<String, String>, TimeWindow>() {
+ @Override
+ public void process(Tuple2<String, String> key, ProcessWindowFunction<RelationMetricLog, String, Tuple2<String, String>, TimeWindow>.Context context, Iterable<RelationMetricLog> elements, Collector<String> out) throws Exception {
+ RelationMetricLog next = elements.iterator().next();
+ next.setStat_time(context.window().getStart() / 1000);
+ String metricResultJsonStr = JSON.toJSONString(next);
+ out.collect(metricResultJsonStr);
+ }
+ });
+
+ subscriberAppRelationMetric
+ .addSink(ClickHouseTableFactory.getSinkFunction(Configs.get(CommonConfig.METRIC_SUBSCRIBER_APP_RELATION_TABLE)))
+ //.addSink(KafkaUtils.getKafkaSink(Configs.get(CommonConfig.METRIC_SUBSCRIBER_APP_RELATION_TOPIC)))
+ .setParallelism(Configs.get(CommonConfig.SUBSCRIBER_APP_RELATION_OUTPUT_PARALLELISM));
}
+
+ //todo GMLC数据源字段详情不清楚,后续再开发GMLC->relation_subscriber_app的数据流
}
diff --git a/module-CN-pre-metrics/src/main/java/com/zdjizhi/pre/relation/common/RelationMetricLog.java b/module-CN-pre-metrics/src/main/java/com/zdjizhi/pre/relation/common/RelationMetricLog.java
index 5616f6c..f641620 100644
--- a/module-CN-pre-metrics/src/main/java/com/zdjizhi/pre/relation/common/RelationMetricLog.java
+++ b/module-CN-pre-metrics/src/main/java/com/zdjizhi/pre/relation/common/RelationMetricLog.java
@@ -2,7 +2,6 @@ package com.zdjizhi.pre.relation.common;
import com.alibaba.fastjson2.annotation.JSONField;
-import java.util.ArrayList;
import java.util.List;
public class RelationMetricLog {
@@ -24,6 +23,13 @@ public class RelationMetricLog {
private String app_subcategory;
private List<String> entity_tags;
+ //subscriber
+ private String subscriber_id;
+ private String imei;
+ private String imsi;
+ private String phone_number;
+ private String apn;
+
private String l7_protocol;
private int port;
@@ -169,6 +175,46 @@ public class RelationMetricLog {
this.stat_time = stat_time;
}
+ public String getSubscriber_id() {
+ return subscriber_id;
+ }
+
+ public void setSubscriber_id(String subscriber_id) {
+ this.subscriber_id = subscriber_id;
+ }
+
+ public String getImei() {
+ return imei;
+ }
+
+ public void setImei(String imei) {
+ this.imei = imei;
+ }
+
+ public String getImsi() {
+ return imsi;
+ }
+
+ public void setImsi(String imsi) {
+ this.imsi = imsi;
+ }
+
+ public String getPhone_number() {
+ return phone_number;
+ }
+
+ public void setPhone_number(String phone_number) {
+ this.phone_number = phone_number;
+ }
+
+ public String getApn() {
+ return apn;
+ }
+
+ public void setApn(String apn) {
+ this.apn = apn;
+ }
+
@Override
public String toString() {
return "RelationMetricLog{" +
@@ -186,6 +232,11 @@ public class RelationMetricLog {
", app_category='" + app_category + '\'' +
", app_subcategory='" + app_subcategory + '\'' +
", entity_tags=" + entity_tags +
+ ", subscriber_id='" + subscriber_id + '\'' +
+ ", imei='" + imei + '\'' +
+ ", imsi='" + imsi + '\'' +
+ ", phone_number='" + phone_number + '\'' +
+ ", apn='" + apn + '\'' +
", l7_protocol='" + l7_protocol + '\'' +
", port=" + port +
", stat_time=" + stat_time +
diff --git a/module-CN-pre-metrics/src/main/java/com/zdjizhi/pre/relation/function/FirstAggregation.java b/module-CN-pre-metrics/src/main/java/com/zdjizhi/pre/relation/function/FirstAggregation.java
index 114a579..a30ccd3 100644
--- a/module-CN-pre-metrics/src/main/java/com/zdjizhi/pre/relation/function/FirstAggregation.java
+++ b/module-CN-pre-metrics/src/main/java/com/zdjizhi/pre/relation/function/FirstAggregation.java
@@ -13,12 +13,14 @@ public class FirstAggregation extends AbstractFirstAggregation<RelationMetricLog
private final String relationMetricKey = "relationMetric";
private final String attributeMetricKey = "relationMetric";
+ private final String subscriberAppRelationMetricKey = "subscriberAppRelationMetric";
@Override
public Map<String, Map<String, RelationMetricLog>> createAccumulator() {
Map<String, Map<String, RelationMetricLog>> accumulator = new HashMap<>();
accumulator.put(relationMetricKey, new HashMap<>());
accumulator.put(attributeMetricKey, new HashMap<>());
+ accumulator.put(subscriberAppRelationMetricKey, new HashMap<>());
return accumulator;
}
@@ -29,6 +31,7 @@ public class FirstAggregation extends AbstractFirstAggregation<RelationMetricLog
String serverIp = value.getCommon_server_ip();
String l7Protocol = value.getCommon_l7_protocol();
int serverPort = value.getCommon_server_port();
+ String subscriberId = value.getSubscriber_id();
String relationKey = appName + "-" + domain + "-" + serverIp;
@@ -45,6 +48,25 @@ public class FirstAggregation extends AbstractFirstAggregation<RelationMetricLog
attributeMetricMap.put(attributeKey, initAttribute(value));
}
+ String subscriberAppRelationKey = subscriberId + "-" + appName;
+ Map<String, RelationMetricLog> subscriberAppRelationMetricMap = accumulator.get(subscriberAppRelationMetricKey);
+ RelationMetricLog subscriberAppRelationMetricLog = subscriberAppRelationMetricMap.get(subscriberAppRelationKey);
+ if (subscriberAppRelationMetricLog == null) {
+ subscriberAppRelationMetricMap.put(subscriberAppRelationKey, initSubscriberAppRelationMetric(value));
+ }
+ }
+
+ private RelationMetricLog initSubscriberAppRelationMetric(CnRecordLog cnRecordLog) {
+ RelationMetricLog relationMetricLog = new RelationMetricLog(3);
+ relationMetricLog.setApp_name(cnRecordLog.getCommon_app_label());
+ relationMetricLog.setApp_category(cnRecordLog.getApp_category());
+ relationMetricLog.setApp_subcategory(cnRecordLog.getApp_subcategory());
+ relationMetricLog.setSubscriber_id(cnRecordLog.getSubscriber_id());
+ relationMetricLog.setImei(cnRecordLog.getImei());
+ relationMetricLog.setImsi(cnRecordLog.getImsi());
+ relationMetricLog.setPhone_number(cnRecordLog.getPhone_number());
+ relationMetricLog.setApn(cnRecordLog.getApn());
+ return relationMetricLog;
}
private RelationMetricLog initRelationMetric(CnRecordLog cnRecordLog) {