diff options
| author | gujinkai <[email protected]> | 2023-11-23 11:52:45 +0800 |
|---|---|---|
| committer | gujinkai <[email protected]> | 2023-11-23 11:52:45 +0800 |
| commit | 56603f5aff1bd9b0d89f79e1365d5da7c134d83b (patch) | |
| tree | 2231a7a005ef7d1758cf24eee6dcb6c1f8cc99a8 | |
| parent | ebdbae07be5548c300c0449911dfb2e49ee84696 (diff) | |
feat: enhance subscriber and subscriber_app pre-aggregate statistics
16 files changed, 213 insertions, 98 deletions
diff --git a/module-CN-pre-metrics/src/main/java/com/zdjizhi/pre/CnPreMetric.java b/module-CN-pre-metrics/src/main/java/com/zdjizhi/pre/CnPreMetric.java index bebc844..4f9d199 100644 --- a/module-CN-pre-metrics/src/main/java/com/zdjizhi/pre/CnPreMetric.java +++ b/module-CN-pre-metrics/src/main/java/com/zdjizhi/pre/CnPreMetric.java @@ -135,7 +135,7 @@ public class CnPreMetric implements Schedule { public Tuple12<String, String, String, String, String, String, String, String, Long, Long ,String, String> getKey(CnMetricLog value) throws Exception { return Tuple12.of(value.getClient_country_region(), value.getClient_super_admin_area(), value.getClient_admin_area(), value.getClient_zone(), value.getServer_country_region(), value.getServer_super_admin_area(), value.getServer_admin_area(), value.getServer_zone(), - value.getCommon_in_link_id(),value.getCommon_out_link_id(), + value.getCommon_in_link_id(), value.getCommon_out_link_id(), value.getIn_link_direction(), value.getOut_link_direction()); } }) @@ -143,6 +143,22 @@ public class CnPreMetric implements Schedule { .reduce(new SecondAggregationReduce(), new MetricLinkProcessFunc()); linkMetric.addSink(KafkaUtils.getKafkaSink(Configs.get(CommonConfig.METRIC_LINK_TOPIC))).setParallelism(outputParallelism); + SingleOutputStreamOperator<String> subscriberMetric = process.filter((log) -> log.getMetricKey() == MetricKeyConfig.SUBSCRIBER) + .keyBy(CnMetricLog::getSubscriber_id) + .window(TumblingEventTimeWindows.of(Time.minutes(windowsTime))) + .reduce(new SecondAggregationReduce(), new MetricSubscriberProcessFunc()); + subscriberMetric.addSink(KafkaUtils.getKafkaSink(Configs.get(CommonConfig.METRIC_SUBSCRIBER_TOPIC))).setParallelism(outputParallelism); + + SingleOutputStreamOperator<String> subscriberAppMetric = process.filter((log) -> log.getMetricKey() == MetricKeyConfig.SUBSCRIBER_APP) + .keyBy(new KeySelector<CnMetricLog, Tuple2<String, String>>() { + @Override + public Tuple2<String, String> getKey(CnMetricLog value) throws Exception { + return Tuple2.of(value.getSubscriber_id(), value.getCommon_app_label()); + } + }) + .window(TumblingEventTimeWindows.of(Time.minutes(windowsTime))) + .reduce(new SecondAggregationReduce(), new MetricSubscriberAppProcessFunc()); + subscriberAppMetric.addSink(KafkaUtils.getKafkaSink(Configs.get(CommonConfig.METRIC_SUBSCRIBER_APP_TOPIC))).setParallelism(outputParallelism); } } diff --git a/module-CN-pre-metrics/src/main/java/com/zdjizhi/pre/common/CnMetricLog.java b/module-CN-pre-metrics/src/main/java/com/zdjizhi/pre/common/CnMetricLog.java index d548327..c8b135f 100644 --- a/module-CN-pre-metrics/src/main/java/com/zdjizhi/pre/common/CnMetricLog.java +++ b/module-CN-pre-metrics/src/main/java/com/zdjizhi/pre/common/CnMetricLog.java @@ -10,6 +10,11 @@ public class CnMetricLog { private int common_server_port; private String common_app_label; private String common_l7_protocol; + private String subscriber_id; + private String imei; + private String imsi; + private String phone_number; + private String apn; private String domain; private String domain_sld; @@ -150,6 +155,46 @@ public class CnMetricLog { this.common_app_label = common_app_label; } + public String getSubscriber_id() { + return subscriber_id; + } + + public void setSubscriber_id(String subscriber_id) { + this.subscriber_id = subscriber_id; + } + + public String getImei() { + return imei; + } + + public void setImei(String imei) { + this.imei = imei; + } + + public String getImsi() { + return imsi; + } + + public void setImsi(String imsi) { + this.imsi = imsi; + } + + public String getPhone_number() { + return phone_number; + } + + public void setPhone_number(String phone_number) { + this.phone_number = phone_number; + } + + public String getApn() { + return apn; + } + + public void setApn(String apn) { + this.apn = apn; + } + public String getDomain() { return domain; } diff --git a/module-CN-pre-metrics/src/main/java/com/zdjizhi/pre/common/CommonConfig.java b/module-CN-pre-metrics/src/main/java/com/zdjizhi/pre/common/CommonConfig.java index 471a16b..225d4b5 100644 --- a/module-CN-pre-metrics/src/main/java/com/zdjizhi/pre/common/CommonConfig.java +++ b/module-CN-pre-metrics/src/main/java/com/zdjizhi/pre/common/CommonConfig.java @@ -45,6 +45,14 @@ public class CommonConfig { .stringType() .defaultValue("METRIC-LINK"); + public static final ConfigOption<String> METRIC_SUBSCRIBER_TOPIC = ConfigOptions.key("metric.subscriber.topic") + .stringType() + .defaultValue("METRIC-SUBSCRIBER"); + + public static final ConfigOption<String> METRIC_SUBSCRIBER_APP_TOPIC = ConfigOptions.key("metric.subscriber_app.topic") + .stringType() + .defaultValue("METRIC-SUBSCRIBER_APP"); + public static final ConfigOption<String> METRIC_RELATION_TOPIC = ConfigOptions.key("metric.relation.topic") .stringType() .defaultValue("METRIC-RELATION"); diff --git a/module-CN-pre-metrics/src/main/java/com/zdjizhi/pre/common/MetricKeyConfig.java b/module-CN-pre-metrics/src/main/java/com/zdjizhi/pre/common/MetricKeyConfig.java index 538aeb5..08e8add 100644 --- a/module-CN-pre-metrics/src/main/java/com/zdjizhi/pre/common/MetricKeyConfig.java +++ b/module-CN-pre-metrics/src/main/java/com/zdjizhi/pre/common/MetricKeyConfig.java @@ -45,7 +45,9 @@ public class MetricKeyConfig { public static final int DNS_RRCNAME = 21; + public static final int SUBSCRIBER = 22; + public static final int SUBSCRIBER_APP = 23; } diff --git a/module-CN-pre-metrics/src/main/java/com/zdjizhi/pre/common/MetricResultLog.java b/module-CN-pre-metrics/src/main/java/com/zdjizhi/pre/common/MetricResultLog.java index bd17699..d50dd94 100644 --- a/module-CN-pre-metrics/src/main/java/com/zdjizhi/pre/common/MetricResultLog.java +++ b/module-CN-pre-metrics/src/main/java/com/zdjizhi/pre/common/MetricResultLog.java @@ -18,6 +18,12 @@ public class MetricResultLog { private String app_subcategory; private String app_company; + private String subscriber_id; + private String imei; + private String imsi; + private String phone_number; + private String apn; + private String domain; private String domain_sld; private String domain_category_name; @@ -179,6 +185,47 @@ public class MetricResultLog { public void setApp_company(String app_company) { this.app_company = app_company; } + + public String getSubscriber_id() { + return subscriber_id; + } + + public void setSubscriber_id(String subscriber_id) { + this.subscriber_id = subscriber_id; + } + + public String getImei() { + return imei; + } + + public void setImei(String imei) { + this.imei = imei; + } + + public String getImsi() { + return imsi; + } + + public void setImsi(String imsi) { + this.imsi = imsi; + } + + public String getPhone_number() { + return phone_number; + } + + public void setPhone_number(String phone_number) { + this.phone_number = phone_number; + } + + public String getApn() { + return apn; + } + + public void setApn(String apn) { + this.apn = apn; + } + public String getDomain() { return domain; } diff --git a/module-CN-pre-metrics/src/main/java/com/zdjizhi/pre/function/AbstractMetricProcessFunc.java b/module-CN-pre-metrics/src/main/java/com/zdjizhi/pre/function/AbstractMetricProcessFunc.java index da88bd6..95095c0 100644 --- a/module-CN-pre-metrics/src/main/java/com/zdjizhi/pre/function/AbstractMetricProcessFunc.java +++ b/module-CN-pre-metrics/src/main/java/com/zdjizhi/pre/function/AbstractMetricProcessFunc.java @@ -1,10 +1,14 @@ package com.zdjizhi.pre.function; +import com.alibaba.fastjson2.JSON; import com.zdjizhi.pre.common.CnMetricLog; import com.zdjizhi.pre.common.CommonConfig; +import com.zdjizhi.pre.common.MetricResultLog; +import com.zdjizhi.pre.handler.CommonMetric; import org.apache.flink.configuration.Configuration; import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction; import org.apache.flink.streaming.api.windowing.windows.TimeWindow; +import org.apache.flink.util.Collector; public abstract class AbstractMetricProcessFunc<KEY> extends ProcessWindowFunction<CnMetricLog, String, KEY, TimeWindow> { @@ -17,4 +21,17 @@ public abstract class AbstractMetricProcessFunc<KEY> extends ProcessWindowFunct .getExecutionConfig().getGlobalJobParameters(); preMetricsRoundScale = configuration.get(CommonConfig.PRE_METRICS_ROUND_SCALE); } + + @Override + public void process(KEY key, ProcessWindowFunction<CnMetricLog, String, KEY, TimeWindow>.Context context, Iterable<CnMetricLog> elements, Collector<String> out) throws Exception { + CnMetricLog next = elements.iterator().next(); + MetricResultLog metricResultLog = new MetricResultLog(); + setCommonFields(key, next, metricResultLog); + metricResultLog.setStat_time(context.window().getStart() / 1000); + CommonMetric.setCommonMetric(next, metricResultLog, preMetricsRoundScale); + String metricResultJsonStr = JSON.toJSONString(metricResultLog); + out.collect(metricResultJsonStr); + } + + public abstract void setCommonFields(KEY key, CnMetricLog cnMetricLog, MetricResultLog metricResultLog); }
\ No newline at end of file diff --git a/module-CN-pre-metrics/src/main/java/com/zdjizhi/pre/function/MetricAppProcessFunc.java b/module-CN-pre-metrics/src/main/java/com/zdjizhi/pre/function/MetricAppProcessFunc.java index 363c41b..ebe77c6 100644 --- a/module-CN-pre-metrics/src/main/java/com/zdjizhi/pre/function/MetricAppProcessFunc.java +++ b/module-CN-pre-metrics/src/main/java/com/zdjizhi/pre/function/MetricAppProcessFunc.java @@ -1,26 +1,15 @@ package com.zdjizhi.pre.function; -import com.alibaba.fastjson2.JSON; import com.zdjizhi.pre.common.CnMetricLog; import com.zdjizhi.pre.common.MetricResultLog; -import com.zdjizhi.pre.handler.CommonMetric; -import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction; -import org.apache.flink.streaming.api.windowing.windows.TimeWindow; -import org.apache.flink.util.Collector; public class MetricAppProcessFunc extends AbstractMetricProcessFunc<String> { + @Override - public void process(String key, Context context, Iterable<CnMetricLog> elements, Collector<String> out) throws Exception { - CnMetricLog next = elements.iterator().next(); - MetricResultLog metricResultLog = new MetricResultLog(); + public void setCommonFields(String key, CnMetricLog next, MetricResultLog metricResultLog) { metricResultLog.setCommon_app_label(next.getCommon_app_label()); metricResultLog.setApp_category(next.getApp_category()); metricResultLog.setApp_company(next.getApp_company()); metricResultLog.setApp_subcategory(next.getApp_subcategory()); - metricResultLog.setStat_time(context.window().getStart()/1000); - - CommonMetric.setCommonMetric(next, metricResultLog, preMetricsRoundScale); - String metricResultJsonStr = JSON.toJSONString(metricResultLog); - out.collect(metricResultJsonStr); } } diff --git a/module-CN-pre-metrics/src/main/java/com/zdjizhi/pre/function/MetricAsnProcessFunc.java b/module-CN-pre-metrics/src/main/java/com/zdjizhi/pre/function/MetricAsnProcessFunc.java index 91e7bb1..f1a36e8 100644 --- a/module-CN-pre-metrics/src/main/java/com/zdjizhi/pre/function/MetricAsnProcessFunc.java +++ b/module-CN-pre-metrics/src/main/java/com/zdjizhi/pre/function/MetricAsnProcessFunc.java @@ -1,13 +1,8 @@ package com.zdjizhi.pre.function; -import com.alibaba.fastjson2.JSON; import com.zdjizhi.pre.common.CnMetricLog; import com.zdjizhi.pre.common.MetricResultLog; -import com.zdjizhi.pre.handler.CommonMetric; import org.apache.flink.api.java.tuple.Tuple2; -import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction; -import org.apache.flink.streaming.api.windowing.windows.TimeWindow; -import org.apache.flink.util.Collector; public class MetricAsnProcessFunc extends AbstractMetricProcessFunc<Tuple2<String, String>> { private String side; @@ -17,17 +12,9 @@ public class MetricAsnProcessFunc extends AbstractMetricProcessFunc<Tuple2<Strin } @Override - public void process(Tuple2<String, String> key, Context context, Iterable<CnMetricLog> elements, Collector<String> out) throws Exception { - CnMetricLog next = elements.iterator().next(); - - MetricResultLog metricResultLog = new MetricResultLog(); + public void setCommonFields(Tuple2<String, String> key, CnMetricLog cnMetricLog, MetricResultLog metricResultLog) { metricResultLog.setAsn(key.f0); metricResultLog.setIsp(key.f1); metricResultLog.setSide(side); - metricResultLog.setStat_time(context.window().getStart() / 1000); - - CommonMetric.setCommonMetric(next, metricResultLog, preMetricsRoundScale); - String metricResultJsonStr = JSON.toJSONString(metricResultLog); - out.collect(metricResultJsonStr); } } diff --git a/module-CN-pre-metrics/src/main/java/com/zdjizhi/pre/function/MetricDomainProcessFunc.java b/module-CN-pre-metrics/src/main/java/com/zdjizhi/pre/function/MetricDomainProcessFunc.java index d3feca3..7b1e981 100644 --- a/module-CN-pre-metrics/src/main/java/com/zdjizhi/pre/function/MetricDomainProcessFunc.java +++ b/module-CN-pre-metrics/src/main/java/com/zdjizhi/pre/function/MetricDomainProcessFunc.java @@ -1,26 +1,15 @@ package com.zdjizhi.pre.function; -import com.alibaba.fastjson2.JSON; import com.zdjizhi.pre.common.CnMetricLog; import com.zdjizhi.pre.common.MetricResultLog; -import com.zdjizhi.pre.handler.CommonMetric; -import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction; -import org.apache.flink.streaming.api.windowing.windows.TimeWindow; -import org.apache.flink.util.Collector; public class MetricDomainProcessFunc extends AbstractMetricProcessFunc<String> { + @Override - public void process(String key, Context context, Iterable<CnMetricLog> elements, Collector<String> out) throws Exception { - CnMetricLog next = elements.iterator().next(); - MetricResultLog metricResultLog = new MetricResultLog(); + public void setCommonFields(String s, CnMetricLog next, MetricResultLog metricResultLog) { metricResultLog.setDomain(next.getDomain()); metricResultLog.setDomain_sld(next.getDomain_sld()); metricResultLog.setDomain_category_group(next.getDomain_category_group()); metricResultLog.setDomain_category_name(next.getDomain_category_name()); - metricResultLog.setStat_time(context.window().getStart()/1000); - - CommonMetric.setCommonMetric(next, metricResultLog, preMetricsRoundScale); - String metricResultJsonStr = JSON.toJSONString(metricResultLog); - out.collect(metricResultJsonStr); } } diff --git a/module-CN-pre-metrics/src/main/java/com/zdjizhi/pre/function/MetricIpProcessWindowFunc.java b/module-CN-pre-metrics/src/main/java/com/zdjizhi/pre/function/MetricIpProcessWindowFunc.java index 7bae391..3d8b93b 100644 --- a/module-CN-pre-metrics/src/main/java/com/zdjizhi/pre/function/MetricIpProcessWindowFunc.java +++ b/module-CN-pre-metrics/src/main/java/com/zdjizhi/pre/function/MetricIpProcessWindowFunc.java @@ -1,12 +1,7 @@ package com.zdjizhi.pre.function; -import com.alibaba.fastjson2.JSON; import com.zdjizhi.pre.common.CnMetricLog; import com.zdjizhi.pre.common.MetricResultLog; -import com.zdjizhi.pre.handler.CommonMetric; -import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction; -import org.apache.flink.streaming.api.windowing.windows.TimeWindow; -import org.apache.flink.util.Collector; public class MetricIpProcessWindowFunc extends AbstractMetricProcessFunc<String> { private String side; @@ -16,15 +11,10 @@ public class MetricIpProcessWindowFunc extends AbstractMetricProcessFunc<String> } @Override - public void process(String ip, Context context, Iterable<CnMetricLog> elements, Collector<String> out) throws Exception { - CnMetricLog next = elements.iterator().next(); - MetricResultLog metricResultLog = new MetricResultLog(); - metricResultLog.setIp(ip); + public void setCommonFields(String key, CnMetricLog next, MetricResultLog metricResultLog) { + metricResultLog.setIp(key); metricResultLog.setSide(side); metricResultLog.setZone(next.getZone()); - metricResultLog.setStat_time(context.window().getStart()/1000); - - CommonMetric.setCommonMetric(next,metricResultLog, preMetricsRoundScale); metricResultLog.setAsymmetric_sessions(next.getAsymmetric_sessions()); metricResultLog.setBulky_sessions(next.getBulky_sessions()); metricResultLog.setCbr_streaming_sessions(next.getCbr_streaming_sessions()); @@ -35,10 +25,7 @@ public class MetricIpProcessWindowFunc extends AbstractMetricProcessFunc<String> metricResultLog.setUnidirectional_sessions(next.getUnidirectional_sessions()); metricResultLog.setRandom_looking_sessions(next.getRandom_looking_sessions()); metricResultLog.setBidirectional_sessions(next.getBidirectional_sessions()); - String metricResultJsonStr = JSON.toJSONString(metricResultLog); - out.collect(metricResultJsonStr); } - } diff --git a/module-CN-pre-metrics/src/main/java/com/zdjizhi/pre/function/MetricLinkProcessFunc.java b/module-CN-pre-metrics/src/main/java/com/zdjizhi/pre/function/MetricLinkProcessFunc.java index fa56bae..d532cc1 100644 --- a/module-CN-pre-metrics/src/main/java/com/zdjizhi/pre/function/MetricLinkProcessFunc.java +++ b/module-CN-pre-metrics/src/main/java/com/zdjizhi/pre/function/MetricLinkProcessFunc.java @@ -1,20 +1,13 @@ package com.zdjizhi.pre.function; -import com.alibaba.fastjson2.JSON; import com.zdjizhi.pre.common.CnMetricLog; import com.zdjizhi.pre.common.MetricResultLog; -import com.zdjizhi.pre.handler.CommonMetric; import org.apache.flink.api.java.tuple.Tuple12; -import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction; -import org.apache.flink.streaming.api.windowing.windows.TimeWindow; -import org.apache.flink.util.Collector; public class MetricLinkProcessFunc extends AbstractMetricProcessFunc<Tuple12<String, String, String, String, String, String, String, String, Long, Long ,String, String>> { @Override - public void process(Tuple12<String, String, String, String, String, String, String, String, Long, Long ,String, String> key, Context context, Iterable<CnMetricLog> elements, Collector<String> out) throws Exception { - CnMetricLog next = elements.iterator().next(); - MetricResultLog metricResultLog = new MetricResultLog(); + public void setCommonFields(Tuple12<String, String, String, String, String, String, String, String, Long, Long, String, String> key, CnMetricLog next, MetricResultLog metricResultLog) { metricResultLog.setClient_country_region(next.getClient_country_region()); metricResultLog.setClient_super_admin_area(next.getClient_super_admin_area()); metricResultLog.setClient_admin_area(next.getClient_admin_area()); @@ -27,11 +20,5 @@ public class MetricLinkProcessFunc extends AbstractMetricProcessFunc<Tuple12<Str metricResultLog.setCommon_in_link_id(next.getCommon_in_link_id()); metricResultLog.setOut_link_direction(next.getOut_link_direction()); metricResultLog.setIn_link_direction(next.getIn_link_direction()); - - metricResultLog.setStat_time(context.window().getStart()/1000); - - CommonMetric.setCommonMetric(next, metricResultLog, preMetricsRoundScale); - String metricResultJsonStr = JSON.toJSONString(metricResultLog); - out.collect(metricResultJsonStr); } } diff --git a/module-CN-pre-metrics/src/main/java/com/zdjizhi/pre/function/MetricProtocolProcessFunc.java b/module-CN-pre-metrics/src/main/java/com/zdjizhi/pre/function/MetricProtocolProcessFunc.java index dfc0929..255329c 100644 --- a/module-CN-pre-metrics/src/main/java/com/zdjizhi/pre/function/MetricProtocolProcessFunc.java +++ b/module-CN-pre-metrics/src/main/java/com/zdjizhi/pre/function/MetricProtocolProcessFunc.java @@ -1,25 +1,14 @@ package com.zdjizhi.pre.function; -import com.alibaba.fastjson2.JSON; import com.zdjizhi.pre.common.CnMetricLog; import com.zdjizhi.pre.common.MetricResultLog; -import com.zdjizhi.pre.handler.CommonMetric; import org.apache.flink.api.java.tuple.Tuple2; -import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction; -import org.apache.flink.streaming.api.windowing.windows.TimeWindow; -import org.apache.flink.util.Collector; public class MetricProtocolProcessFunc extends AbstractMetricProcessFunc<Tuple2<String, Integer>> { + @Override - public void process(Tuple2<String, Integer> stringIntegerTuple2, Context context, Iterable<CnMetricLog> elements, Collector<String> out) throws Exception { - CnMetricLog next = elements.iterator().next(); - MetricResultLog metricResultLog = new MetricResultLog(); + public void setCommonFields(Tuple2<String, Integer> stringIntegerTuple2, CnMetricLog next, MetricResultLog metricResultLog) { metricResultLog.setCommon_l7_protocol(next.getCommon_l7_protocol()); metricResultLog.setCommon_server_port(next.getCommon_server_port()); - metricResultLog.setStat_time(context.window().getStart()/1000); - - CommonMetric.setCommonMetric(next, metricResultLog, preMetricsRoundScale); - String metricResultJsonStr = JSON.toJSONString(metricResultLog); - out.collect(metricResultJsonStr); } } diff --git a/module-CN-pre-metrics/src/main/java/com/zdjizhi/pre/function/MetricRegionProcessWindowFunc.java b/module-CN-pre-metrics/src/main/java/com/zdjizhi/pre/function/MetricRegionProcessWindowFunc.java index 9e7ec61..8f07306 100644 --- a/module-CN-pre-metrics/src/main/java/com/zdjizhi/pre/function/MetricRegionProcessWindowFunc.java +++ b/module-CN-pre-metrics/src/main/java/com/zdjizhi/pre/function/MetricRegionProcessWindowFunc.java @@ -1,13 +1,8 @@ package com.zdjizhi.pre.function; -import com.alibaba.fastjson2.JSON; import com.zdjizhi.pre.common.CnMetricLog; import com.zdjizhi.pre.common.MetricResultLog; -import com.zdjizhi.pre.handler.CommonMetric; import org.apache.flink.api.java.tuple.Tuple3; -import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction; -import org.apache.flink.streaming.api.windowing.windows.TimeWindow; -import org.apache.flink.util.Collector; public class MetricRegionProcessWindowFunc extends AbstractMetricProcessFunc<Tuple3<String, String, String>> { private String side; @@ -17,18 +12,10 @@ public class MetricRegionProcessWindowFunc extends AbstractMetricProcessFunc<Tup } @Override - public void process(Tuple3<String, String, String> key, Context context, Iterable<CnMetricLog> elements, Collector<String> out) throws Exception { - CnMetricLog next = elements.iterator().next(); - - MetricResultLog metricResultLog = new MetricResultLog(); + public void setCommonFields(Tuple3<String, String, String> key, CnMetricLog cnMetricLog, MetricResultLog metricResultLog) { metricResultLog.setCountry_region(key.f0); metricResultLog.setSuper_admin_area(key.f1); metricResultLog.setAdmin_area(key.f2); metricResultLog.setSide(side); - metricResultLog.setStat_time(context.window().getStart() / 1000); - - CommonMetric.setCommonMetric(next, metricResultLog, preMetricsRoundScale); - String metricResultJsonStr = JSON.toJSONString(metricResultLog); - out.collect(metricResultJsonStr); } } diff --git a/module-CN-pre-metrics/src/main/java/com/zdjizhi/pre/function/MetricSubscriberAppProcessFunc.java b/module-CN-pre-metrics/src/main/java/com/zdjizhi/pre/function/MetricSubscriberAppProcessFunc.java new file mode 100644 index 0000000..15ce6fd --- /dev/null +++ b/module-CN-pre-metrics/src/main/java/com/zdjizhi/pre/function/MetricSubscriberAppProcessFunc.java @@ -0,0 +1,15 @@ +package com.zdjizhi.pre.function; + +import com.zdjizhi.pre.common.CnMetricLog; +import com.zdjizhi.pre.common.MetricResultLog; +import org.apache.flink.api.java.tuple.Tuple2; + + +public class MetricSubscriberAppProcessFunc extends AbstractMetricProcessFunc<Tuple2<String, String>> { + + @Override + public void setCommonFields(Tuple2<String, String> key, CnMetricLog cnMetricLog, MetricResultLog metricResultLog) { + metricResultLog.setSubscriber_id(key.f0); + metricResultLog.setCommon_app_label(key.f1); + } +} diff --git a/module-CN-pre-metrics/src/main/java/com/zdjizhi/pre/function/MetricSubscriberProcessFunc.java b/module-CN-pre-metrics/src/main/java/com/zdjizhi/pre/function/MetricSubscriberProcessFunc.java new file mode 100644 index 0000000..1eaf366 --- /dev/null +++ b/module-CN-pre-metrics/src/main/java/com/zdjizhi/pre/function/MetricSubscriberProcessFunc.java @@ -0,0 +1,16 @@ +package com.zdjizhi.pre.function; + +import com.zdjizhi.pre.common.CnMetricLog; +import com.zdjizhi.pre.common.MetricResultLog; + +public class MetricSubscriberProcessFunc extends AbstractMetricProcessFunc<String> { + + @Override + public void setCommonFields(String key, CnMetricLog cnMetricLog, MetricResultLog metricResultLog) { + metricResultLog.setSubscriber_id(key); + metricResultLog.setImei(cnMetricLog.getImei()); + metricResultLog.setImsi(cnMetricLog.getImsi()); + metricResultLog.setPhone_number(cnMetricLog.getPhone_number()); + metricResultLog.setApn(cnMetricLog.getApn()); + } +} diff --git a/module-CN-pre-metrics/src/main/java/com/zdjizhi/pre/operator/FirstAggregation.java b/module-CN-pre-metrics/src/main/java/com/zdjizhi/pre/operator/FirstAggregation.java index 74d3528..6b4112e 100644 --- a/module-CN-pre-metrics/src/main/java/com/zdjizhi/pre/operator/FirstAggregation.java +++ b/module-CN-pre-metrics/src/main/java/com/zdjizhi/pre/operator/FirstAggregation.java @@ -6,7 +6,6 @@ import com.zdjizhi.pre.common.MetricKeyConfig; import org.apache.commons.lang3.StringUtils; import java.util.HashMap; -import java.util.List; import java.util.Map; public class FirstAggregation extends AbstractFirstAggregation<CnMetricLog> { @@ -25,6 +24,8 @@ public class FirstAggregation extends AbstractFirstAggregation<CnMetricLog> { private final String sslSniMetricKey = "sslSniMetric"; private final String protocolMetricKey = "protocolMetric"; private final String linkMetricKey = "linkMetric"; + private final String subscriberKey = "subscriberMetric"; + private final String subscriberAppKey = "subscriberAppMetric"; @Override public Map<String, Map<String, CnMetricLog>> createAccumulator() { @@ -43,6 +44,8 @@ public class FirstAggregation extends AbstractFirstAggregation<CnMetricLog> { accumulator.put(sslSniMetricKey, new HashMap<>()); accumulator.put(protocolMetricKey, new HashMap<>()); accumulator.put(linkMetricKey, new HashMap<>()); + accumulator.put(subscriberKey, new HashMap<>()); + accumulator.put(subscriberAppKey, new HashMap<>()); return accumulator; } @@ -254,6 +257,37 @@ public class FirstAggregation extends AbstractFirstAggregation<CnMetricLog> { } linkMetricMap.put(key, metricLog); } + + String subscriberId = value.getSubscriber_id(); + if (StringUtils.isNotBlank(subscriberId)) { + Map<String, CnMetricLog> subscriberMetricMap = accumulator.get(subscriberKey); + CnMetricLog cnMetricLog = subscriberMetricMap.get(subscriberId); + if (cnMetricLog != null) { + updateMetric(value, cnMetricLog); + } else { + cnMetricLog = new CnMetricLog(MetricKeyConfig.SUBSCRIBER); + initMetric(value, cnMetricLog); + cnMetricLog.setSubscriber_id(subscriberId); + cnMetricLog.setImei(value.getImei()); + cnMetricLog.setImsi(value.getImsi()); + cnMetricLog.setPhone_number(value.getPhone_number()); + cnMetricLog.setApn(value.getApn()); + } + } + + String appLabel = value.getCommon_app_label(); + if (StringUtils.isNotBlank(subscriberId) && StringUtils.isNotBlank(appLabel)) { + String key = subscriberId + "-" + appLabel; + Map<String, CnMetricLog> subscriberAppMetricMap = accumulator.get(subscriberAppKey); + CnMetricLog cnMetricLog = subscriberAppMetricMap.get(key); + if (cnMetricLog != null) { + updateMetric(value, cnMetricLog); + } else { + cnMetricLog = new CnMetricLog(MetricKeyConfig.SUBSCRIBER_APP); + cnMetricLog.setSubscriber_id(subscriberId); + cnMetricLog.setCommon_app_label(appLabel); + } + } } catch (Exception e) { e.printStackTrace(System.out); } |
