summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorgujinkai <[email protected]>2023-11-23 11:52:45 +0800
committergujinkai <[email protected]>2023-11-23 11:52:45 +0800
commit56603f5aff1bd9b0d89f79e1365d5da7c134d83b (patch)
tree2231a7a005ef7d1758cf24eee6dcb6c1f8cc99a8
parentebdbae07be5548c300c0449911dfb2e49ee84696 (diff)
feat: enhance subscriber and subscriber_app pre-aggregate statistics
-rw-r--r--module-CN-pre-metrics/src/main/java/com/zdjizhi/pre/CnPreMetric.java18
-rw-r--r--module-CN-pre-metrics/src/main/java/com/zdjizhi/pre/common/CnMetricLog.java45
-rw-r--r--module-CN-pre-metrics/src/main/java/com/zdjizhi/pre/common/CommonConfig.java8
-rw-r--r--module-CN-pre-metrics/src/main/java/com/zdjizhi/pre/common/MetricKeyConfig.java2
-rw-r--r--module-CN-pre-metrics/src/main/java/com/zdjizhi/pre/common/MetricResultLog.java47
-rw-r--r--module-CN-pre-metrics/src/main/java/com/zdjizhi/pre/function/AbstractMetricProcessFunc.java17
-rw-r--r--module-CN-pre-metrics/src/main/java/com/zdjizhi/pre/function/MetricAppProcessFunc.java15
-rw-r--r--module-CN-pre-metrics/src/main/java/com/zdjizhi/pre/function/MetricAsnProcessFunc.java15
-rw-r--r--module-CN-pre-metrics/src/main/java/com/zdjizhi/pre/function/MetricDomainProcessFunc.java15
-rw-r--r--module-CN-pre-metrics/src/main/java/com/zdjizhi/pre/function/MetricIpProcessWindowFunc.java17
-rw-r--r--module-CN-pre-metrics/src/main/java/com/zdjizhi/pre/function/MetricLinkProcessFunc.java15
-rw-r--r--module-CN-pre-metrics/src/main/java/com/zdjizhi/pre/function/MetricProtocolProcessFunc.java15
-rw-r--r--module-CN-pre-metrics/src/main/java/com/zdjizhi/pre/function/MetricRegionProcessWindowFunc.java15
-rw-r--r--module-CN-pre-metrics/src/main/java/com/zdjizhi/pre/function/MetricSubscriberAppProcessFunc.java15
-rw-r--r--module-CN-pre-metrics/src/main/java/com/zdjizhi/pre/function/MetricSubscriberProcessFunc.java16
-rw-r--r--module-CN-pre-metrics/src/main/java/com/zdjizhi/pre/operator/FirstAggregation.java36
16 files changed, 213 insertions, 98 deletions
diff --git a/module-CN-pre-metrics/src/main/java/com/zdjizhi/pre/CnPreMetric.java b/module-CN-pre-metrics/src/main/java/com/zdjizhi/pre/CnPreMetric.java
index bebc844..4f9d199 100644
--- a/module-CN-pre-metrics/src/main/java/com/zdjizhi/pre/CnPreMetric.java
+++ b/module-CN-pre-metrics/src/main/java/com/zdjizhi/pre/CnPreMetric.java
@@ -135,7 +135,7 @@ public class CnPreMetric implements Schedule {
public Tuple12<String, String, String, String, String, String, String, String, Long, Long ,String, String> getKey(CnMetricLog value) throws Exception {
return Tuple12.of(value.getClient_country_region(), value.getClient_super_admin_area(), value.getClient_admin_area(), value.getClient_zone(),
value.getServer_country_region(), value.getServer_super_admin_area(), value.getServer_admin_area(), value.getServer_zone(),
- value.getCommon_in_link_id(),value.getCommon_out_link_id(),
+ value.getCommon_in_link_id(), value.getCommon_out_link_id(),
value.getIn_link_direction(), value.getOut_link_direction());
}
})
@@ -143,6 +143,22 @@ public class CnPreMetric implements Schedule {
.reduce(new SecondAggregationReduce(), new MetricLinkProcessFunc());
linkMetric.addSink(KafkaUtils.getKafkaSink(Configs.get(CommonConfig.METRIC_LINK_TOPIC))).setParallelism(outputParallelism);
+ SingleOutputStreamOperator<String> subscriberMetric = process.filter((log) -> log.getMetricKey() == MetricKeyConfig.SUBSCRIBER)
+ .keyBy(CnMetricLog::getSubscriber_id)
+ .window(TumblingEventTimeWindows.of(Time.minutes(windowsTime)))
+ .reduce(new SecondAggregationReduce(), new MetricSubscriberProcessFunc());
+ subscriberMetric.addSink(KafkaUtils.getKafkaSink(Configs.get(CommonConfig.METRIC_SUBSCRIBER_TOPIC))).setParallelism(outputParallelism);
+
+ SingleOutputStreamOperator<String> subscriberAppMetric = process.filter((log) -> log.getMetricKey() == MetricKeyConfig.SUBSCRIBER_APP)
+ .keyBy(new KeySelector<CnMetricLog, Tuple2<String, String>>() {
+ @Override
+ public Tuple2<String, String> getKey(CnMetricLog value) throws Exception {
+ return Tuple2.of(value.getSubscriber_id(), value.getCommon_app_label());
+ }
+ })
+ .window(TumblingEventTimeWindows.of(Time.minutes(windowsTime)))
+ .reduce(new SecondAggregationReduce(), new MetricSubscriberAppProcessFunc());
+ subscriberAppMetric.addSink(KafkaUtils.getKafkaSink(Configs.get(CommonConfig.METRIC_SUBSCRIBER_APP_TOPIC))).setParallelism(outputParallelism);
}
}
diff --git a/module-CN-pre-metrics/src/main/java/com/zdjizhi/pre/common/CnMetricLog.java b/module-CN-pre-metrics/src/main/java/com/zdjizhi/pre/common/CnMetricLog.java
index d548327..c8b135f 100644
--- a/module-CN-pre-metrics/src/main/java/com/zdjizhi/pre/common/CnMetricLog.java
+++ b/module-CN-pre-metrics/src/main/java/com/zdjizhi/pre/common/CnMetricLog.java
@@ -10,6 +10,11 @@ public class CnMetricLog {
private int common_server_port;
private String common_app_label;
private String common_l7_protocol;
+ private String subscriber_id;
+ private String imei;
+ private String imsi;
+ private String phone_number;
+ private String apn;
private String domain;
private String domain_sld;
@@ -150,6 +155,46 @@ public class CnMetricLog {
this.common_app_label = common_app_label;
}
+ public String getSubscriber_id() {
+ return subscriber_id;
+ }
+
+ public void setSubscriber_id(String subscriber_id) {
+ this.subscriber_id = subscriber_id;
+ }
+
+ public String getImei() {
+ return imei;
+ }
+
+ public void setImei(String imei) {
+ this.imei = imei;
+ }
+
+ public String getImsi() {
+ return imsi;
+ }
+
+ public void setImsi(String imsi) {
+ this.imsi = imsi;
+ }
+
+ public String getPhone_number() {
+ return phone_number;
+ }
+
+ public void setPhone_number(String phone_number) {
+ this.phone_number = phone_number;
+ }
+
+ public String getApn() {
+ return apn;
+ }
+
+ public void setApn(String apn) {
+ this.apn = apn;
+ }
+
public String getDomain() {
return domain;
}
diff --git a/module-CN-pre-metrics/src/main/java/com/zdjizhi/pre/common/CommonConfig.java b/module-CN-pre-metrics/src/main/java/com/zdjizhi/pre/common/CommonConfig.java
index 471a16b..225d4b5 100644
--- a/module-CN-pre-metrics/src/main/java/com/zdjizhi/pre/common/CommonConfig.java
+++ b/module-CN-pre-metrics/src/main/java/com/zdjizhi/pre/common/CommonConfig.java
@@ -45,6 +45,14 @@ public class CommonConfig {
.stringType()
.defaultValue("METRIC-LINK");
+ public static final ConfigOption<String> METRIC_SUBSCRIBER_TOPIC = ConfigOptions.key("metric.subscriber.topic")
+ .stringType()
+ .defaultValue("METRIC-SUBSCRIBER");
+
+ public static final ConfigOption<String> METRIC_SUBSCRIBER_APP_TOPIC = ConfigOptions.key("metric.subscriber_app.topic")
+ .stringType()
+ .defaultValue("METRIC-SUBSCRIBER_APP");
+
public static final ConfigOption<String> METRIC_RELATION_TOPIC = ConfigOptions.key("metric.relation.topic")
.stringType()
.defaultValue("METRIC-RELATION");
diff --git a/module-CN-pre-metrics/src/main/java/com/zdjizhi/pre/common/MetricKeyConfig.java b/module-CN-pre-metrics/src/main/java/com/zdjizhi/pre/common/MetricKeyConfig.java
index 538aeb5..08e8add 100644
--- a/module-CN-pre-metrics/src/main/java/com/zdjizhi/pre/common/MetricKeyConfig.java
+++ b/module-CN-pre-metrics/src/main/java/com/zdjizhi/pre/common/MetricKeyConfig.java
@@ -45,7 +45,9 @@ public class MetricKeyConfig {
public static final int DNS_RRCNAME = 21;
+ public static final int SUBSCRIBER = 22;
+ public static final int SUBSCRIBER_APP = 23;
}
diff --git a/module-CN-pre-metrics/src/main/java/com/zdjizhi/pre/common/MetricResultLog.java b/module-CN-pre-metrics/src/main/java/com/zdjizhi/pre/common/MetricResultLog.java
index bd17699..d50dd94 100644
--- a/module-CN-pre-metrics/src/main/java/com/zdjizhi/pre/common/MetricResultLog.java
+++ b/module-CN-pre-metrics/src/main/java/com/zdjizhi/pre/common/MetricResultLog.java
@@ -18,6 +18,12 @@ public class MetricResultLog {
private String app_subcategory;
private String app_company;
+ private String subscriber_id;
+ private String imei;
+ private String imsi;
+ private String phone_number;
+ private String apn;
+
private String domain;
private String domain_sld;
private String domain_category_name;
@@ -179,6 +185,47 @@ public class MetricResultLog {
public void setApp_company(String app_company) {
this.app_company = app_company;
}
+
+ public String getSubscriber_id() {
+ return subscriber_id;
+ }
+
+ public void setSubscriber_id(String subscriber_id) {
+ this.subscriber_id = subscriber_id;
+ }
+
+ public String getImei() {
+ return imei;
+ }
+
+ public void setImei(String imei) {
+ this.imei = imei;
+ }
+
+ public String getImsi() {
+ return imsi;
+ }
+
+ public void setImsi(String imsi) {
+ this.imsi = imsi;
+ }
+
+ public String getPhone_number() {
+ return phone_number;
+ }
+
+ public void setPhone_number(String phone_number) {
+ this.phone_number = phone_number;
+ }
+
+ public String getApn() {
+ return apn;
+ }
+
+ public void setApn(String apn) {
+ this.apn = apn;
+ }
+
public String getDomain() {
return domain;
}
diff --git a/module-CN-pre-metrics/src/main/java/com/zdjizhi/pre/function/AbstractMetricProcessFunc.java b/module-CN-pre-metrics/src/main/java/com/zdjizhi/pre/function/AbstractMetricProcessFunc.java
index da88bd6..95095c0 100644
--- a/module-CN-pre-metrics/src/main/java/com/zdjizhi/pre/function/AbstractMetricProcessFunc.java
+++ b/module-CN-pre-metrics/src/main/java/com/zdjizhi/pre/function/AbstractMetricProcessFunc.java
@@ -1,10 +1,14 @@
package com.zdjizhi.pre.function;
+import com.alibaba.fastjson2.JSON;
import com.zdjizhi.pre.common.CnMetricLog;
import com.zdjizhi.pre.common.CommonConfig;
+import com.zdjizhi.pre.common.MetricResultLog;
+import com.zdjizhi.pre.handler.CommonMetric;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
+import org.apache.flink.util.Collector;
public abstract class AbstractMetricProcessFunc<KEY> extends ProcessWindowFunction<CnMetricLog, String, KEY, TimeWindow> {
@@ -17,4 +21,17 @@ public abstract class AbstractMetricProcessFunc<KEY> extends ProcessWindowFunct
.getExecutionConfig().getGlobalJobParameters();
preMetricsRoundScale = configuration.get(CommonConfig.PRE_METRICS_ROUND_SCALE);
}
+
+ @Override
+ public void process(KEY key, ProcessWindowFunction<CnMetricLog, String, KEY, TimeWindow>.Context context, Iterable<CnMetricLog> elements, Collector<String> out) throws Exception {
+ CnMetricLog next = elements.iterator().next();
+ MetricResultLog metricResultLog = new MetricResultLog();
+ setCommonFields(key, next, metricResultLog);
+ metricResultLog.setStat_time(context.window().getStart() / 1000);
+ CommonMetric.setCommonMetric(next, metricResultLog, preMetricsRoundScale);
+ String metricResultJsonStr = JSON.toJSONString(metricResultLog);
+ out.collect(metricResultJsonStr);
+ }
+
+ public abstract void setCommonFields(KEY key, CnMetricLog cnMetricLog, MetricResultLog metricResultLog);
} \ No newline at end of file
diff --git a/module-CN-pre-metrics/src/main/java/com/zdjizhi/pre/function/MetricAppProcessFunc.java b/module-CN-pre-metrics/src/main/java/com/zdjizhi/pre/function/MetricAppProcessFunc.java
index 363c41b..ebe77c6 100644
--- a/module-CN-pre-metrics/src/main/java/com/zdjizhi/pre/function/MetricAppProcessFunc.java
+++ b/module-CN-pre-metrics/src/main/java/com/zdjizhi/pre/function/MetricAppProcessFunc.java
@@ -1,26 +1,15 @@
package com.zdjizhi.pre.function;
-import com.alibaba.fastjson2.JSON;
import com.zdjizhi.pre.common.CnMetricLog;
import com.zdjizhi.pre.common.MetricResultLog;
-import com.zdjizhi.pre.handler.CommonMetric;
-import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
-import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
-import org.apache.flink.util.Collector;
public class MetricAppProcessFunc extends AbstractMetricProcessFunc<String> {
+
@Override
- public void process(String key, Context context, Iterable<CnMetricLog> elements, Collector<String> out) throws Exception {
- CnMetricLog next = elements.iterator().next();
- MetricResultLog metricResultLog = new MetricResultLog();
+ public void setCommonFields(String key, CnMetricLog next, MetricResultLog metricResultLog) {
metricResultLog.setCommon_app_label(next.getCommon_app_label());
metricResultLog.setApp_category(next.getApp_category());
metricResultLog.setApp_company(next.getApp_company());
metricResultLog.setApp_subcategory(next.getApp_subcategory());
- metricResultLog.setStat_time(context.window().getStart()/1000);
-
- CommonMetric.setCommonMetric(next, metricResultLog, preMetricsRoundScale);
- String metricResultJsonStr = JSON.toJSONString(metricResultLog);
- out.collect(metricResultJsonStr);
}
}
diff --git a/module-CN-pre-metrics/src/main/java/com/zdjizhi/pre/function/MetricAsnProcessFunc.java b/module-CN-pre-metrics/src/main/java/com/zdjizhi/pre/function/MetricAsnProcessFunc.java
index 91e7bb1..f1a36e8 100644
--- a/module-CN-pre-metrics/src/main/java/com/zdjizhi/pre/function/MetricAsnProcessFunc.java
+++ b/module-CN-pre-metrics/src/main/java/com/zdjizhi/pre/function/MetricAsnProcessFunc.java
@@ -1,13 +1,8 @@
package com.zdjizhi.pre.function;
-import com.alibaba.fastjson2.JSON;
import com.zdjizhi.pre.common.CnMetricLog;
import com.zdjizhi.pre.common.MetricResultLog;
-import com.zdjizhi.pre.handler.CommonMetric;
import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
-import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
-import org.apache.flink.util.Collector;
public class MetricAsnProcessFunc extends AbstractMetricProcessFunc<Tuple2<String, String>> {
private String side;
@@ -17,17 +12,9 @@ public class MetricAsnProcessFunc extends AbstractMetricProcessFunc<Tuple2<Strin
}
@Override
- public void process(Tuple2<String, String> key, Context context, Iterable<CnMetricLog> elements, Collector<String> out) throws Exception {
- CnMetricLog next = elements.iterator().next();
-
- MetricResultLog metricResultLog = new MetricResultLog();
+ public void setCommonFields(Tuple2<String, String> key, CnMetricLog cnMetricLog, MetricResultLog metricResultLog) {
metricResultLog.setAsn(key.f0);
metricResultLog.setIsp(key.f1);
metricResultLog.setSide(side);
- metricResultLog.setStat_time(context.window().getStart() / 1000);
-
- CommonMetric.setCommonMetric(next, metricResultLog, preMetricsRoundScale);
- String metricResultJsonStr = JSON.toJSONString(metricResultLog);
- out.collect(metricResultJsonStr);
}
}
diff --git a/module-CN-pre-metrics/src/main/java/com/zdjizhi/pre/function/MetricDomainProcessFunc.java b/module-CN-pre-metrics/src/main/java/com/zdjizhi/pre/function/MetricDomainProcessFunc.java
index d3feca3..7b1e981 100644
--- a/module-CN-pre-metrics/src/main/java/com/zdjizhi/pre/function/MetricDomainProcessFunc.java
+++ b/module-CN-pre-metrics/src/main/java/com/zdjizhi/pre/function/MetricDomainProcessFunc.java
@@ -1,26 +1,15 @@
package com.zdjizhi.pre.function;
-import com.alibaba.fastjson2.JSON;
import com.zdjizhi.pre.common.CnMetricLog;
import com.zdjizhi.pre.common.MetricResultLog;
-import com.zdjizhi.pre.handler.CommonMetric;
-import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
-import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
-import org.apache.flink.util.Collector;
public class MetricDomainProcessFunc extends AbstractMetricProcessFunc<String> {
+
@Override
- public void process(String key, Context context, Iterable<CnMetricLog> elements, Collector<String> out) throws Exception {
- CnMetricLog next = elements.iterator().next();
- MetricResultLog metricResultLog = new MetricResultLog();
+ public void setCommonFields(String s, CnMetricLog next, MetricResultLog metricResultLog) {
metricResultLog.setDomain(next.getDomain());
metricResultLog.setDomain_sld(next.getDomain_sld());
metricResultLog.setDomain_category_group(next.getDomain_category_group());
metricResultLog.setDomain_category_name(next.getDomain_category_name());
- metricResultLog.setStat_time(context.window().getStart()/1000);
-
- CommonMetric.setCommonMetric(next, metricResultLog, preMetricsRoundScale);
- String metricResultJsonStr = JSON.toJSONString(metricResultLog);
- out.collect(metricResultJsonStr);
}
}
diff --git a/module-CN-pre-metrics/src/main/java/com/zdjizhi/pre/function/MetricIpProcessWindowFunc.java b/module-CN-pre-metrics/src/main/java/com/zdjizhi/pre/function/MetricIpProcessWindowFunc.java
index 7bae391..3d8b93b 100644
--- a/module-CN-pre-metrics/src/main/java/com/zdjizhi/pre/function/MetricIpProcessWindowFunc.java
+++ b/module-CN-pre-metrics/src/main/java/com/zdjizhi/pre/function/MetricIpProcessWindowFunc.java
@@ -1,12 +1,7 @@
package com.zdjizhi.pre.function;
-import com.alibaba.fastjson2.JSON;
import com.zdjizhi.pre.common.CnMetricLog;
import com.zdjizhi.pre.common.MetricResultLog;
-import com.zdjizhi.pre.handler.CommonMetric;
-import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
-import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
-import org.apache.flink.util.Collector;
public class MetricIpProcessWindowFunc extends AbstractMetricProcessFunc<String> {
private String side;
@@ -16,15 +11,10 @@ public class MetricIpProcessWindowFunc extends AbstractMetricProcessFunc<String>
}
@Override
- public void process(String ip, Context context, Iterable<CnMetricLog> elements, Collector<String> out) throws Exception {
- CnMetricLog next = elements.iterator().next();
- MetricResultLog metricResultLog = new MetricResultLog();
- metricResultLog.setIp(ip);
+ public void setCommonFields(String key, CnMetricLog next, MetricResultLog metricResultLog) {
+ metricResultLog.setIp(key);
metricResultLog.setSide(side);
metricResultLog.setZone(next.getZone());
- metricResultLog.setStat_time(context.window().getStart()/1000);
-
- CommonMetric.setCommonMetric(next,metricResultLog, preMetricsRoundScale);
metricResultLog.setAsymmetric_sessions(next.getAsymmetric_sessions());
metricResultLog.setBulky_sessions(next.getBulky_sessions());
metricResultLog.setCbr_streaming_sessions(next.getCbr_streaming_sessions());
@@ -35,10 +25,7 @@ public class MetricIpProcessWindowFunc extends AbstractMetricProcessFunc<String>
metricResultLog.setUnidirectional_sessions(next.getUnidirectional_sessions());
metricResultLog.setRandom_looking_sessions(next.getRandom_looking_sessions());
metricResultLog.setBidirectional_sessions(next.getBidirectional_sessions());
- String metricResultJsonStr = JSON.toJSONString(metricResultLog);
- out.collect(metricResultJsonStr);
}
-
}
diff --git a/module-CN-pre-metrics/src/main/java/com/zdjizhi/pre/function/MetricLinkProcessFunc.java b/module-CN-pre-metrics/src/main/java/com/zdjizhi/pre/function/MetricLinkProcessFunc.java
index fa56bae..d532cc1 100644
--- a/module-CN-pre-metrics/src/main/java/com/zdjizhi/pre/function/MetricLinkProcessFunc.java
+++ b/module-CN-pre-metrics/src/main/java/com/zdjizhi/pre/function/MetricLinkProcessFunc.java
@@ -1,20 +1,13 @@
package com.zdjizhi.pre.function;
-import com.alibaba.fastjson2.JSON;
import com.zdjizhi.pre.common.CnMetricLog;
import com.zdjizhi.pre.common.MetricResultLog;
-import com.zdjizhi.pre.handler.CommonMetric;
import org.apache.flink.api.java.tuple.Tuple12;
-import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
-import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
-import org.apache.flink.util.Collector;
public class MetricLinkProcessFunc extends AbstractMetricProcessFunc<Tuple12<String, String, String, String, String, String, String, String, Long, Long ,String, String>> {
@Override
- public void process(Tuple12<String, String, String, String, String, String, String, String, Long, Long ,String, String> key, Context context, Iterable<CnMetricLog> elements, Collector<String> out) throws Exception {
- CnMetricLog next = elements.iterator().next();
- MetricResultLog metricResultLog = new MetricResultLog();
+ public void setCommonFields(Tuple12<String, String, String, String, String, String, String, String, Long, Long, String, String> key, CnMetricLog next, MetricResultLog metricResultLog) {
metricResultLog.setClient_country_region(next.getClient_country_region());
metricResultLog.setClient_super_admin_area(next.getClient_super_admin_area());
metricResultLog.setClient_admin_area(next.getClient_admin_area());
@@ -27,11 +20,5 @@ public class MetricLinkProcessFunc extends AbstractMetricProcessFunc<Tuple12<Str
metricResultLog.setCommon_in_link_id(next.getCommon_in_link_id());
metricResultLog.setOut_link_direction(next.getOut_link_direction());
metricResultLog.setIn_link_direction(next.getIn_link_direction());
-
- metricResultLog.setStat_time(context.window().getStart()/1000);
-
- CommonMetric.setCommonMetric(next, metricResultLog, preMetricsRoundScale);
- String metricResultJsonStr = JSON.toJSONString(metricResultLog);
- out.collect(metricResultJsonStr);
}
}
diff --git a/module-CN-pre-metrics/src/main/java/com/zdjizhi/pre/function/MetricProtocolProcessFunc.java b/module-CN-pre-metrics/src/main/java/com/zdjizhi/pre/function/MetricProtocolProcessFunc.java
index dfc0929..255329c 100644
--- a/module-CN-pre-metrics/src/main/java/com/zdjizhi/pre/function/MetricProtocolProcessFunc.java
+++ b/module-CN-pre-metrics/src/main/java/com/zdjizhi/pre/function/MetricProtocolProcessFunc.java
@@ -1,25 +1,14 @@
package com.zdjizhi.pre.function;
-import com.alibaba.fastjson2.JSON;
import com.zdjizhi.pre.common.CnMetricLog;
import com.zdjizhi.pre.common.MetricResultLog;
-import com.zdjizhi.pre.handler.CommonMetric;
import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
-import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
-import org.apache.flink.util.Collector;
public class MetricProtocolProcessFunc extends AbstractMetricProcessFunc<Tuple2<String, Integer>> {
+
@Override
- public void process(Tuple2<String, Integer> stringIntegerTuple2, Context context, Iterable<CnMetricLog> elements, Collector<String> out) throws Exception {
- CnMetricLog next = elements.iterator().next();
- MetricResultLog metricResultLog = new MetricResultLog();
+ public void setCommonFields(Tuple2<String, Integer> stringIntegerTuple2, CnMetricLog next, MetricResultLog metricResultLog) {
metricResultLog.setCommon_l7_protocol(next.getCommon_l7_protocol());
metricResultLog.setCommon_server_port(next.getCommon_server_port());
- metricResultLog.setStat_time(context.window().getStart()/1000);
-
- CommonMetric.setCommonMetric(next, metricResultLog, preMetricsRoundScale);
- String metricResultJsonStr = JSON.toJSONString(metricResultLog);
- out.collect(metricResultJsonStr);
}
}
diff --git a/module-CN-pre-metrics/src/main/java/com/zdjizhi/pre/function/MetricRegionProcessWindowFunc.java b/module-CN-pre-metrics/src/main/java/com/zdjizhi/pre/function/MetricRegionProcessWindowFunc.java
index 9e7ec61..8f07306 100644
--- a/module-CN-pre-metrics/src/main/java/com/zdjizhi/pre/function/MetricRegionProcessWindowFunc.java
+++ b/module-CN-pre-metrics/src/main/java/com/zdjizhi/pre/function/MetricRegionProcessWindowFunc.java
@@ -1,13 +1,8 @@
package com.zdjizhi.pre.function;
-import com.alibaba.fastjson2.JSON;
import com.zdjizhi.pre.common.CnMetricLog;
import com.zdjizhi.pre.common.MetricResultLog;
-import com.zdjizhi.pre.handler.CommonMetric;
import org.apache.flink.api.java.tuple.Tuple3;
-import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
-import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
-import org.apache.flink.util.Collector;
public class MetricRegionProcessWindowFunc extends AbstractMetricProcessFunc<Tuple3<String, String, String>> {
private String side;
@@ -17,18 +12,10 @@ public class MetricRegionProcessWindowFunc extends AbstractMetricProcessFunc<Tup
}
@Override
- public void process(Tuple3<String, String, String> key, Context context, Iterable<CnMetricLog> elements, Collector<String> out) throws Exception {
- CnMetricLog next = elements.iterator().next();
-
- MetricResultLog metricResultLog = new MetricResultLog();
+ public void setCommonFields(Tuple3<String, String, String> key, CnMetricLog cnMetricLog, MetricResultLog metricResultLog) {
metricResultLog.setCountry_region(key.f0);
metricResultLog.setSuper_admin_area(key.f1);
metricResultLog.setAdmin_area(key.f2);
metricResultLog.setSide(side);
- metricResultLog.setStat_time(context.window().getStart() / 1000);
-
- CommonMetric.setCommonMetric(next, metricResultLog, preMetricsRoundScale);
- String metricResultJsonStr = JSON.toJSONString(metricResultLog);
- out.collect(metricResultJsonStr);
}
}
diff --git a/module-CN-pre-metrics/src/main/java/com/zdjizhi/pre/function/MetricSubscriberAppProcessFunc.java b/module-CN-pre-metrics/src/main/java/com/zdjizhi/pre/function/MetricSubscriberAppProcessFunc.java
new file mode 100644
index 0000000..15ce6fd
--- /dev/null
+++ b/module-CN-pre-metrics/src/main/java/com/zdjizhi/pre/function/MetricSubscriberAppProcessFunc.java
@@ -0,0 +1,15 @@
+package com.zdjizhi.pre.function;
+
+import com.zdjizhi.pre.common.CnMetricLog;
+import com.zdjizhi.pre.common.MetricResultLog;
+import org.apache.flink.api.java.tuple.Tuple2;
+
+
+public class MetricSubscriberAppProcessFunc extends AbstractMetricProcessFunc<Tuple2<String, String>> {
+
+ @Override
+ public void setCommonFields(Tuple2<String, String> key, CnMetricLog cnMetricLog, MetricResultLog metricResultLog) {
+ metricResultLog.setSubscriber_id(key.f0);
+ metricResultLog.setCommon_app_label(key.f1);
+ }
+}
diff --git a/module-CN-pre-metrics/src/main/java/com/zdjizhi/pre/function/MetricSubscriberProcessFunc.java b/module-CN-pre-metrics/src/main/java/com/zdjizhi/pre/function/MetricSubscriberProcessFunc.java
new file mode 100644
index 0000000..1eaf366
--- /dev/null
+++ b/module-CN-pre-metrics/src/main/java/com/zdjizhi/pre/function/MetricSubscriberProcessFunc.java
@@ -0,0 +1,16 @@
+package com.zdjizhi.pre.function;
+
+import com.zdjizhi.pre.common.CnMetricLog;
+import com.zdjizhi.pre.common.MetricResultLog;
+
+public class MetricSubscriberProcessFunc extends AbstractMetricProcessFunc<String> {
+
+ @Override
+ public void setCommonFields(String key, CnMetricLog cnMetricLog, MetricResultLog metricResultLog) {
+ metricResultLog.setSubscriber_id(key);
+ metricResultLog.setImei(cnMetricLog.getImei());
+ metricResultLog.setImsi(cnMetricLog.getImsi());
+ metricResultLog.setPhone_number(cnMetricLog.getPhone_number());
+ metricResultLog.setApn(cnMetricLog.getApn());
+ }
+}
diff --git a/module-CN-pre-metrics/src/main/java/com/zdjizhi/pre/operator/FirstAggregation.java b/module-CN-pre-metrics/src/main/java/com/zdjizhi/pre/operator/FirstAggregation.java
index 74d3528..6b4112e 100644
--- a/module-CN-pre-metrics/src/main/java/com/zdjizhi/pre/operator/FirstAggregation.java
+++ b/module-CN-pre-metrics/src/main/java/com/zdjizhi/pre/operator/FirstAggregation.java
@@ -6,7 +6,6 @@ import com.zdjizhi.pre.common.MetricKeyConfig;
import org.apache.commons.lang3.StringUtils;
import java.util.HashMap;
-import java.util.List;
import java.util.Map;
public class FirstAggregation extends AbstractFirstAggregation<CnMetricLog> {
@@ -25,6 +24,8 @@ public class FirstAggregation extends AbstractFirstAggregation<CnMetricLog> {
private final String sslSniMetricKey = "sslSniMetric";
private final String protocolMetricKey = "protocolMetric";
private final String linkMetricKey = "linkMetric";
+ private final String subscriberKey = "subscriberMetric";
+ private final String subscriberAppKey = "subscriberAppMetric";
@Override
public Map<String, Map<String, CnMetricLog>> createAccumulator() {
@@ -43,6 +44,8 @@ public class FirstAggregation extends AbstractFirstAggregation<CnMetricLog> {
accumulator.put(sslSniMetricKey, new HashMap<>());
accumulator.put(protocolMetricKey, new HashMap<>());
accumulator.put(linkMetricKey, new HashMap<>());
+ accumulator.put(subscriberKey, new HashMap<>());
+ accumulator.put(subscriberAppKey, new HashMap<>());
return accumulator;
}
@@ -254,6 +257,37 @@ public class FirstAggregation extends AbstractFirstAggregation<CnMetricLog> {
}
linkMetricMap.put(key, metricLog);
}
+
+ String subscriberId = value.getSubscriber_id();
+ if (StringUtils.isNotBlank(subscriberId)) {
+ Map<String, CnMetricLog> subscriberMetricMap = accumulator.get(subscriberKey);
+ CnMetricLog cnMetricLog = subscriberMetricMap.get(subscriberId);
+ if (cnMetricLog != null) {
+ updateMetric(value, cnMetricLog);
+ } else {
+ cnMetricLog = new CnMetricLog(MetricKeyConfig.SUBSCRIBER);
+ initMetric(value, cnMetricLog);
+ cnMetricLog.setSubscriber_id(subscriberId);
+ cnMetricLog.setImei(value.getImei());
+ cnMetricLog.setImsi(value.getImsi());
+ cnMetricLog.setPhone_number(value.getPhone_number());
+ cnMetricLog.setApn(value.getApn());
+ }
+ }
+
+ String appLabel = value.getCommon_app_label();
+ if (StringUtils.isNotBlank(subscriberId) && StringUtils.isNotBlank(appLabel)) {
+ String key = subscriberId + "-" + appLabel;
+ Map<String, CnMetricLog> subscriberAppMetricMap = accumulator.get(subscriberAppKey);
+ CnMetricLog cnMetricLog = subscriberAppMetricMap.get(key);
+ if (cnMetricLog != null) {
+ updateMetric(value, cnMetricLog);
+ } else {
+ cnMetricLog = new CnMetricLog(MetricKeyConfig.SUBSCRIBER_APP);
+ cnMetricLog.setSubscriber_id(subscriberId);
+ cnMetricLog.setCommon_app_label(appLabel);
+ }
+ }
} catch (Exception e) {
e.printStackTrace(System.out);
}