diff options
| author | gujinkai <[email protected]> | 2024-02-27 17:53:01 +0800 |
|---|---|---|
| committer | gujinkai <[email protected]> | 2024-02-28 11:10:35 +0800 |
| commit | d4e7a0e1b6bbb3491346613e01417e177e602b6c (patch) | |
| tree | 1fe71bfe3a1a90f5e1602f1557232043fec9df1b | |
| parent | 31969400245dad3fa480c1db0aa7bdb031ee2fcf (diff) | |
feat: add location_subscriber agg
36 files changed, 433 insertions, 138 deletions
diff --git a/module-CN-pre-metrics/pom.xml b/module-CN-pre-metrics/pom.xml index 3e74bdd..45b93b9 100644 --- a/module-CN-pre-metrics/pom.xml +++ b/module-CN-pre-metrics/pom.xml @@ -17,6 +17,12 @@ <artifactId>platform-etl</artifactId> <version>23.10-SNAPSHOT</version> </dependency> + + <dependency> + <groupId>com.uber</groupId> + <artifactId>h3</artifactId> + <version>4.1.1</version> + </dependency> </dependencies> </project>
\ No newline at end of file diff --git a/module-CN-pre-metrics/src/main/java/com/zdjizhi/pre/CnPreMetric.java b/module-CN-pre-metrics/src/main/java/com/zdjizhi/pre/base/CnPreMetric.java index 4a63216..e048f08 100644 --- a/module-CN-pre-metrics/src/main/java/com/zdjizhi/pre/CnPreMetric.java +++ b/module-CN-pre-metrics/src/main/java/com/zdjizhi/pre/base/CnPreMetric.java @@ -1,16 +1,16 @@ -package com.zdjizhi.pre; +package com.zdjizhi.pre.base; import com.zdjizhi.base.common.CnRecordLog; import com.zdjizhi.base.config.Configs; import com.zdjizhi.base.platform.Schedule; import com.zdjizhi.base.utils.KafkaUtils; import com.zdjizhi.etl.CnRecordEtl; -import com.zdjizhi.pre.common.CnMetricLog; -import com.zdjizhi.pre.common.CommonConfig; -import com.zdjizhi.pre.common.MetricKeyConfig; -import com.zdjizhi.pre.function.*; -import com.zdjizhi.pre.operator.FirstAggregation; -import com.zdjizhi.pre.operator.SecondAggregationReduce; +import com.zdjizhi.pre.base.common.CnMetricLog; +import com.zdjizhi.pre.base.common.CommonConfig; +import com.zdjizhi.pre.base.common.MetricKeyConfig; +import com.zdjizhi.pre.base.function.*; +import com.zdjizhi.pre.base.operator.FirstAggregation; +import com.zdjizhi.pre.base.operator.SecondAggregationReduce; import org.apache.flink.api.java.functions.KeySelector; import org.apache.flink.api.java.tuple.Tuple12; import org.apache.flink.api.java.tuple.Tuple2; diff --git a/module-CN-pre-metrics/src/main/java/com/zdjizhi/pre/common/CnMetricLog.java b/module-CN-pre-metrics/src/main/java/com/zdjizhi/pre/base/common/CnMetricLog.java index c8b135f..e89087c 100644 --- a/module-CN-pre-metrics/src/main/java/com/zdjizhi/pre/common/CnMetricLog.java +++ b/module-CN-pre-metrics/src/main/java/com/zdjizhi/pre/base/common/CnMetricLog.java @@ -1,4 +1,4 @@ -package com.zdjizhi.pre.common; +package com.zdjizhi.pre.base.common; public class CnMetricLog { diff --git a/module-CN-pre-metrics/src/main/java/com/zdjizhi/pre/common/CommonConfig.java b/module-CN-pre-metrics/src/main/java/com/zdjizhi/pre/base/common/CommonConfig.java index 3d29b5c..03857fa 100644 --- a/module-CN-pre-metrics/src/main/java/com/zdjizhi/pre/common/CommonConfig.java +++ b/module-CN-pre-metrics/src/main/java/com/zdjizhi/pre/base/common/CommonConfig.java @@ -1,4 +1,4 @@ -package com.zdjizhi.pre.common; +package com.zdjizhi.pre.base.common; import org.apache.flink.configuration.ConfigOption; import org.apache.flink.configuration.ConfigOptions; diff --git a/module-CN-pre-metrics/src/main/java/com/zdjizhi/pre/common/MetricKeyConfig.java b/module-CN-pre-metrics/src/main/java/com/zdjizhi/pre/base/common/MetricKeyConfig.java index e2af60b..0932d65 100644 --- a/module-CN-pre-metrics/src/main/java/com/zdjizhi/pre/common/MetricKeyConfig.java +++ b/module-CN-pre-metrics/src/main/java/com/zdjizhi/pre/base/common/MetricKeyConfig.java @@ -1,4 +1,4 @@ -package com.zdjizhi.pre.common; +package com.zdjizhi.pre.base.common; public class MetricKeyConfig { diff --git a/module-CN-pre-metrics/src/main/java/com/zdjizhi/pre/common/MetricResultLog.java b/module-CN-pre-metrics/src/main/java/com/zdjizhi/pre/base/common/MetricResultLog.java index e5b80a2..a0ac350 100644 --- a/module-CN-pre-metrics/src/main/java/com/zdjizhi/pre/common/MetricResultLog.java +++ b/module-CN-pre-metrics/src/main/java/com/zdjizhi/pre/base/common/MetricResultLog.java @@ -1,4 +1,4 @@ -package com.zdjizhi.pre.common; +package com.zdjizhi.pre.base.common; import com.alibaba.fastjson2.annotation.JSONField; diff --git a/module-CN-pre-metrics/src/main/java/com/zdjizhi/pre/function/AbstractMetricProcessFunc.java b/module-CN-pre-metrics/src/main/java/com/zdjizhi/pre/base/function/AbstractMetricProcessFunc.java index 95095c0..9363d27 100644 --- a/module-CN-pre-metrics/src/main/java/com/zdjizhi/pre/function/AbstractMetricProcessFunc.java +++ b/module-CN-pre-metrics/src/main/java/com/zdjizhi/pre/base/function/AbstractMetricProcessFunc.java @@ -1,10 +1,10 @@ -package com.zdjizhi.pre.function; +package com.zdjizhi.pre.base.function; import com.alibaba.fastjson2.JSON; -import com.zdjizhi.pre.common.CnMetricLog; -import com.zdjizhi.pre.common.CommonConfig; -import com.zdjizhi.pre.common.MetricResultLog; -import com.zdjizhi.pre.handler.CommonMetric; +import com.zdjizhi.pre.base.common.CnMetricLog; +import com.zdjizhi.pre.base.common.CommonConfig; +import com.zdjizhi.pre.base.common.MetricResultLog; +import com.zdjizhi.pre.base.handler.CommonMetric; import org.apache.flink.configuration.Configuration; import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction; import org.apache.flink.streaming.api.windowing.windows.TimeWindow; diff --git a/module-CN-pre-metrics/src/main/java/com/zdjizhi/pre/function/MetricAppProcessFunc.java b/module-CN-pre-metrics/src/main/java/com/zdjizhi/pre/base/function/MetricAppProcessFunc.java index ebe77c6..f5929ac 100644 --- a/module-CN-pre-metrics/src/main/java/com/zdjizhi/pre/function/MetricAppProcessFunc.java +++ b/module-CN-pre-metrics/src/main/java/com/zdjizhi/pre/base/function/MetricAppProcessFunc.java @@ -1,7 +1,7 @@ -package com.zdjizhi.pre.function; +package com.zdjizhi.pre.base.function; -import com.zdjizhi.pre.common.CnMetricLog; -import com.zdjizhi.pre.common.MetricResultLog; +import com.zdjizhi.pre.base.common.CnMetricLog; +import com.zdjizhi.pre.base.common.MetricResultLog; public class MetricAppProcessFunc extends AbstractMetricProcessFunc<String> { diff --git a/module-CN-pre-metrics/src/main/java/com/zdjizhi/pre/function/MetricAsnProcessFunc.java b/module-CN-pre-metrics/src/main/java/com/zdjizhi/pre/base/function/MetricAsnProcessFunc.java index f1a36e8..7c1ce3e 100644 --- a/module-CN-pre-metrics/src/main/java/com/zdjizhi/pre/function/MetricAsnProcessFunc.java +++ b/module-CN-pre-metrics/src/main/java/com/zdjizhi/pre/base/function/MetricAsnProcessFunc.java @@ -1,7 +1,7 @@ -package com.zdjizhi.pre.function; +package com.zdjizhi.pre.base.function; -import com.zdjizhi.pre.common.CnMetricLog; -import com.zdjizhi.pre.common.MetricResultLog; +import com.zdjizhi.pre.base.common.CnMetricLog; +import com.zdjizhi.pre.base.common.MetricResultLog; import org.apache.flink.api.java.tuple.Tuple2; public class MetricAsnProcessFunc extends AbstractMetricProcessFunc<Tuple2<String, String>> { diff --git a/module-CN-pre-metrics/src/main/java/com/zdjizhi/pre/function/MetricDomainProcessFunc.java b/module-CN-pre-metrics/src/main/java/com/zdjizhi/pre/base/function/MetricDomainProcessFunc.java index 7b1e981..86d86c2 100644 --- a/module-CN-pre-metrics/src/main/java/com/zdjizhi/pre/function/MetricDomainProcessFunc.java +++ b/module-CN-pre-metrics/src/main/java/com/zdjizhi/pre/base/function/MetricDomainProcessFunc.java @@ -1,7 +1,7 @@ -package com.zdjizhi.pre.function; +package com.zdjizhi.pre.base.function; -import com.zdjizhi.pre.common.CnMetricLog; -import com.zdjizhi.pre.common.MetricResultLog; +import com.zdjizhi.pre.base.common.CnMetricLog; +import com.zdjizhi.pre.base.common.MetricResultLog; public class MetricDomainProcessFunc extends AbstractMetricProcessFunc<String> { diff --git a/module-CN-pre-metrics/src/main/java/com/zdjizhi/pre/function/MetricIpProcessWindowFunc.java b/module-CN-pre-metrics/src/main/java/com/zdjizhi/pre/base/function/MetricIpProcessWindowFunc.java index 3d8b93b..3917d35 100644 --- a/module-CN-pre-metrics/src/main/java/com/zdjizhi/pre/function/MetricIpProcessWindowFunc.java +++ b/module-CN-pre-metrics/src/main/java/com/zdjizhi/pre/base/function/MetricIpProcessWindowFunc.java @@ -1,7 +1,7 @@ -package com.zdjizhi.pre.function; +package com.zdjizhi.pre.base.function; -import com.zdjizhi.pre.common.CnMetricLog; -import com.zdjizhi.pre.common.MetricResultLog; +import com.zdjizhi.pre.base.common.CnMetricLog; +import com.zdjizhi.pre.base.common.MetricResultLog; public class MetricIpProcessWindowFunc extends AbstractMetricProcessFunc<String> { private String side; diff --git a/module-CN-pre-metrics/src/main/java/com/zdjizhi/pre/function/MetricLinkProcessFunc.java b/module-CN-pre-metrics/src/main/java/com/zdjizhi/pre/base/function/MetricLinkProcessFunc.java index d532cc1..391e672 100644 --- a/module-CN-pre-metrics/src/main/java/com/zdjizhi/pre/function/MetricLinkProcessFunc.java +++ b/module-CN-pre-metrics/src/main/java/com/zdjizhi/pre/base/function/MetricLinkProcessFunc.java @@ -1,7 +1,7 @@ -package com.zdjizhi.pre.function; +package com.zdjizhi.pre.base.function; -import com.zdjizhi.pre.common.CnMetricLog; -import com.zdjizhi.pre.common.MetricResultLog; +import com.zdjizhi.pre.base.common.CnMetricLog; +import com.zdjizhi.pre.base.common.MetricResultLog; import org.apache.flink.api.java.tuple.Tuple12; public class MetricLinkProcessFunc extends AbstractMetricProcessFunc<Tuple12<String, String, String, String, String, String, String, String, Long, Long ,String, String>> { diff --git a/module-CN-pre-metrics/src/main/java/com/zdjizhi/pre/function/MetricProtocolProcessFunc.java b/module-CN-pre-metrics/src/main/java/com/zdjizhi/pre/base/function/MetricProtocolProcessFunc.java index 255329c..63f4b7a 100644 --- a/module-CN-pre-metrics/src/main/java/com/zdjizhi/pre/function/MetricProtocolProcessFunc.java +++ b/module-CN-pre-metrics/src/main/java/com/zdjizhi/pre/base/function/MetricProtocolProcessFunc.java @@ -1,7 +1,7 @@ -package com.zdjizhi.pre.function; +package com.zdjizhi.pre.base.function; -import com.zdjizhi.pre.common.CnMetricLog; -import com.zdjizhi.pre.common.MetricResultLog; +import com.zdjizhi.pre.base.common.CnMetricLog; +import com.zdjizhi.pre.base.common.MetricResultLog; import org.apache.flink.api.java.tuple.Tuple2; public class MetricProtocolProcessFunc extends AbstractMetricProcessFunc<Tuple2<String, Integer>> { diff --git a/module-CN-pre-metrics/src/main/java/com/zdjizhi/pre/function/MetricRegionProcessWindowFunc.java b/module-CN-pre-metrics/src/main/java/com/zdjizhi/pre/base/function/MetricRegionProcessWindowFunc.java index 8f07306..65019de 100644 --- a/module-CN-pre-metrics/src/main/java/com/zdjizhi/pre/function/MetricRegionProcessWindowFunc.java +++ b/module-CN-pre-metrics/src/main/java/com/zdjizhi/pre/base/function/MetricRegionProcessWindowFunc.java @@ -1,7 +1,7 @@ -package com.zdjizhi.pre.function; +package com.zdjizhi.pre.base.function; -import com.zdjizhi.pre.common.CnMetricLog; -import com.zdjizhi.pre.common.MetricResultLog; +import com.zdjizhi.pre.base.common.CnMetricLog; +import com.zdjizhi.pre.base.common.MetricResultLog; import org.apache.flink.api.java.tuple.Tuple3; public class MetricRegionProcessWindowFunc extends AbstractMetricProcessFunc<Tuple3<String, String, String>> { diff --git a/module-CN-pre-metrics/src/main/java/com/zdjizhi/pre/base/function/MetricSubscriberAppProcessFunc.java b/module-CN-pre-metrics/src/main/java/com/zdjizhi/pre/base/function/MetricSubscriberAppProcessFunc.java new file mode 100644 index 0000000..6c46a1a --- /dev/null +++ b/module-CN-pre-metrics/src/main/java/com/zdjizhi/pre/base/function/MetricSubscriberAppProcessFunc.java @@ -0,0 +1,19 @@ +package com.zdjizhi.pre.base.function; + +import com.zdjizhi.pre.base.common.CnMetricLog; +import com.zdjizhi.pre.base.common.MetricResultLog; +import org.apache.flink.api.java.tuple.Tuple2; + + +public class MetricSubscriberAppProcessFunc extends AbstractMetricProcessFunc<Tuple2<String, String>> { + + @Override + public void setCommonFields(Tuple2<String, String> key, CnMetricLog cnMetricLog, MetricResultLog metricResultLog) { + metricResultLog.setSubscriber_id(key.f0); + metricResultLog.setCommon_app_label(key.f1); + metricResultLog.setImei(cnMetricLog.getImei()); + metricResultLog.setImsi(cnMetricLog.getImsi()); + metricResultLog.setPhone_number(cnMetricLog.getPhone_number()); + metricResultLog.setApn(cnMetricLog.getApn()); + } +} diff --git a/module-CN-pre-metrics/src/main/java/com/zdjizhi/pre/handler/CommonMetric.java b/module-CN-pre-metrics/src/main/java/com/zdjizhi/pre/base/handler/CommonMetric.java index e0693b9..83ec861 100644 --- a/module-CN-pre-metrics/src/main/java/com/zdjizhi/pre/handler/CommonMetric.java +++ b/module-CN-pre-metrics/src/main/java/com/zdjizhi/pre/base/handler/CommonMetric.java @@ -1,9 +1,7 @@ -package com.zdjizhi.pre.handler; - -import com.zdjizhi.pre.common.CnMetricLog; -import com.zdjizhi.pre.common.CommonConfig; -import com.zdjizhi.pre.common.MetricResultLog; +package com.zdjizhi.pre.base.handler; +import com.zdjizhi.pre.base.common.CnMetricLog; +import com.zdjizhi.pre.base.common.MetricResultLog; import java.math.BigDecimal; import java.math.RoundingMode; diff --git a/module-CN-pre-metrics/src/main/java/com/zdjizhi/pre/operator/AbstractFirstAggregation.java b/module-CN-pre-metrics/src/main/java/com/zdjizhi/pre/base/operator/AbstractFirstAggregation.java index 9c6f463..9e59702 100644 --- a/module-CN-pre-metrics/src/main/java/com/zdjizhi/pre/operator/AbstractFirstAggregation.java +++ b/module-CN-pre-metrics/src/main/java/com/zdjizhi/pre/base/operator/AbstractFirstAggregation.java @@ -1,7 +1,7 @@ -package com.zdjizhi.pre.operator; +package com.zdjizhi.pre.base.operator; import com.zdjizhi.base.common.CnRecordLog; -import com.zdjizhi.pre.common.CommonConfig; +import com.zdjizhi.pre.base.common.CommonConfig; import org.apache.flink.configuration.Configuration; import org.apache.flink.streaming.api.functions.ProcessFunction; import org.apache.flink.streaming.api.windowing.time.Time; diff --git a/module-CN-pre-metrics/src/main/java/com/zdjizhi/pre/operator/FirstAggregation.java b/module-CN-pre-metrics/src/main/java/com/zdjizhi/pre/base/operator/FirstAggregation.java index 0c767a9..d6ba3ee 100644 --- a/module-CN-pre-metrics/src/main/java/com/zdjizhi/pre/operator/FirstAggregation.java +++ b/module-CN-pre-metrics/src/main/java/com/zdjizhi/pre/base/operator/FirstAggregation.java @@ -1,8 +1,8 @@ -package com.zdjizhi.pre.operator; +package com.zdjizhi.pre.base.operator; import com.zdjizhi.base.common.CnRecordLog; -import com.zdjizhi.pre.common.CnMetricLog; -import com.zdjizhi.pre.common.MetricKeyConfig; +import com.zdjizhi.pre.base.common.CnMetricLog; +import com.zdjizhi.pre.base.common.MetricKeyConfig; import org.apache.commons.lang3.StringUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; diff --git a/module-CN-pre-metrics/src/main/java/com/zdjizhi/pre/operator/SecondAggregationReduce.java b/module-CN-pre-metrics/src/main/java/com/zdjizhi/pre/base/operator/SecondAggregationReduce.java index 19fc3f5..2817c63 100644 --- a/module-CN-pre-metrics/src/main/java/com/zdjizhi/pre/operator/SecondAggregationReduce.java +++ b/module-CN-pre-metrics/src/main/java/com/zdjizhi/pre/base/operator/SecondAggregationReduce.java @@ -1,6 +1,6 @@ -package com.zdjizhi.pre.operator; +package com.zdjizhi.pre.base.operator; -import com.zdjizhi.pre.common.CnMetricLog; +import com.zdjizhi.pre.base.common.CnMetricLog; import org.apache.flink.api.common.functions.ReduceFunction; public class SecondAggregationReduce implements ReduceFunction<CnMetricLog> { diff --git a/module-CN-pre-metrics/src/main/java/com/zdjizhi/pre/dns/DnsPreMetric.java b/module-CN-pre-metrics/src/main/java/com/zdjizhi/pre/dns/DnsPreMetric.java index ca39841..ef1ac6f 100644 --- a/module-CN-pre-metrics/src/main/java/com/zdjizhi/pre/dns/DnsPreMetric.java +++ b/module-CN-pre-metrics/src/main/java/com/zdjizhi/pre/dns/DnsPreMetric.java @@ -5,7 +5,7 @@ import com.zdjizhi.base.config.Configs; import com.zdjizhi.base.platform.Schedule; import com.zdjizhi.base.utils.KafkaUtils; import com.zdjizhi.etl.CnRecordEtl; -import com.zdjizhi.pre.common.MetricKeyConfig; +import com.zdjizhi.pre.base.common.MetricKeyConfig; import com.zdjizhi.pre.dns.common.CommonConfig; import com.zdjizhi.pre.dns.common.DnsMetricLog; import com.zdjizhi.pre.dns.function.*; diff --git a/module-CN-pre-metrics/src/main/java/com/zdjizhi/pre/dns/process/FirstAggregation.java b/module-CN-pre-metrics/src/main/java/com/zdjizhi/pre/dns/process/FirstAggregation.java index 117bed8..8884480 100644 --- a/module-CN-pre-metrics/src/main/java/com/zdjizhi/pre/dns/process/FirstAggregation.java +++ b/module-CN-pre-metrics/src/main/java/com/zdjizhi/pre/dns/process/FirstAggregation.java @@ -4,9 +4,9 @@ import com.alibaba.fastjson2.JSON; import com.alibaba.fastjson2.TypeReference; import com.geedgenetworks.utils.FormatUtils; import com.zdjizhi.base.common.CnRecordLog; -import com.zdjizhi.pre.common.MetricKeyConfig; +import com.zdjizhi.pre.base.common.MetricKeyConfig; +import com.zdjizhi.pre.base.operator.AbstractFirstAggregation; import com.zdjizhi.pre.dns.common.DnsMetricLog; -import com.zdjizhi.pre.operator.AbstractFirstAggregation; import org.apache.commons.lang3.StringUtils; import org.apache.flink.configuration.Configuration; import org.slf4j.Logger; diff --git a/module-CN-pre-metrics/src/main/java/com/zdjizhi/pre/function/MetricSubscriberAppProcessFunc.java b/module-CN-pre-metrics/src/main/java/com/zdjizhi/pre/function/MetricSubscriberAppProcessFunc.java deleted file mode 100644 index 868345a..0000000 --- a/module-CN-pre-metrics/src/main/java/com/zdjizhi/pre/function/MetricSubscriberAppProcessFunc.java +++ /dev/null @@ -1,49 +0,0 @@ -package com.zdjizhi.pre.function; - -import com.zdjizhi.pre.common.CnMetricLog; -import com.zdjizhi.pre.common.MetricResultLog; -import org.apache.flink.api.java.tuple.Tuple2; - -import java.util.Random; - - -public class MetricSubscriberAppProcessFunc extends AbstractMetricProcessFunc<Tuple2<String, String>> { - - private Locate[] locates = new Locate[]{Locate.L1, Locate.L2, Locate.L3, Locate.L4, Locate.L5}; - - @Override - public void setCommonFields(Tuple2<String, String> key, CnMetricLog cnMetricLog, MetricResultLog metricResultLog) { - metricResultLog.setSubscriber_id(key.f0); - metricResultLog.setCommon_app_label(key.f1); - metricResultLog.setImei(cnMetricLog.getImei()); - metricResultLog.setImsi(cnMetricLog.getImsi()); - metricResultLog.setPhone_number(cnMetricLog.getPhone_number()); - metricResultLog.setApn(cnMetricLog.getApn()); - Locate randomLocate = getRandomLocate(); - metricResultLog.setSubscriber_longitude(randomLocate.longitude); - metricResultLog.setSubscriber_latitude(randomLocate.latitude); - } - - private Locate getRandomLocate() { - Random random = new Random(); - int i = random.nextInt(5); - return locates[i]; - } - - private enum Locate { - L1(116.391721, 39.906094), - L2(116.386739, 39.906266), - L3(116.384863, 39.902834), - L4(116.387807, 39.900607), - L5(116.39328, 39.898911); - - - private Double longitude; - private Double latitude; - - Locate(Double longitude, Double latitude) { - this.longitude = longitude; - this.latitude = latitude; - } - } -} diff --git a/module-CN-pre-metrics/src/main/java/com/zdjizhi/pre/location/LocationMetric.java b/module-CN-pre-metrics/src/main/java/com/zdjizhi/pre/location/LocationMetric.java new file mode 100644 index 0000000..28a462b --- /dev/null +++ b/module-CN-pre-metrics/src/main/java/com/zdjizhi/pre/location/LocationMetric.java @@ -0,0 +1,48 @@ +package com.zdjizhi.pre.location; + +import com.zdjizhi.base.common.CnRecordLog; +import com.zdjizhi.base.config.Configs; +import com.zdjizhi.base.platform.Schedule; +import com.zdjizhi.base.utils.KafkaUtils; +import com.zdjizhi.etl.CnRecordEtl; +import com.zdjizhi.pre.location.common.CommonConfig; +import com.zdjizhi.pre.location.common.LocationSubscriber; +import com.zdjizhi.pre.location.function.MetricSubscriberProcessWindowFunc; +import com.zdjizhi.pre.location.operator.FirstAggregation; +import com.zdjizhi.pre.location.operator.SecondAggregationReduce; +import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator; +import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows; +import org.apache.flink.streaming.api.windowing.time.Time; + +/** + * @author gujinkai + * @version 1.0 + * @date 2024/2/27 16:35 + */ +public class LocationMetric implements Schedule { + + private int windowsTime = 1; + + private int outputParallelism = 1; + + public LocationMetric() { + this.windowsTime = Configs.get(CommonConfig.LOCATION_METRICS_WINDOW_TIME); + this.outputParallelism = Configs.get(CommonConfig.LOCATION_METRIC_OUTPUT_PARALLELISM); + } + + @Override + public void schedule() throws Exception { + //todo 先临时使用session数据源,后续替换为专门的数据源 + SingleOutputStreamOperator<CnRecordLog> source = CnRecordEtl.singleOutputStreamOperator; + SingleOutputStreamOperator<LocationSubscriber> process = source.process(new FirstAggregation()).name("locationFirstAggProcess"); + + SingleOutputStreamOperator<String> locationSubscriberMetric = process.keyBy(LocationSubscriber::getSubscriber_id) + .window(TumblingEventTimeWindows.of(Time.minutes(windowsTime))) + .reduce(new SecondAggregationReduce(), new MetricSubscriberProcessWindowFunc()); + + locationSubscriberMetric + //.addSink(ClickHouseTableFactory.getSinkFunction(Configs.get(CommonConfig.LOCATION_SUBSCRIBER_TABLE))) + .addSink(KafkaUtils.getKafkaSink(Configs.get(CommonConfig.LOCATION_SUBSCRIBER_TOPIC))) + .setParallelism(outputParallelism); + } +} diff --git a/module-CN-pre-metrics/src/main/java/com/zdjizhi/pre/location/common/CommonConfig.java b/module-CN-pre-metrics/src/main/java/com/zdjizhi/pre/location/common/CommonConfig.java new file mode 100644 index 0000000..5c52bfb --- /dev/null +++ b/module-CN-pre-metrics/src/main/java/com/zdjizhi/pre/location/common/CommonConfig.java @@ -0,0 +1,39 @@ +package com.zdjizhi.pre.location.common; + +import org.apache.flink.configuration.ConfigOption; +import org.apache.flink.configuration.ConfigOptions; + +/** + * @author wlh + * @date 2021/1/6 + */ +public class CommonConfig { + + public static final ConfigOption<Integer> LOCATION_METRICS_WINDOW_TIME = ConfigOptions.key("location.metrics.window.time") + .intType() + .defaultValue(1); + + public static final ConfigOption<Integer> LOCATION_METRIC_OUTPUT_PARALLELISM = ConfigOptions.key("location.metric.output.parallelism") + .intType() + .defaultValue(1); + + public static final ConfigOption<String> LOCATION_SUBSCRIBER_TOPIC = ConfigOptions.key("location.subscriber.topic") + .stringType() + .defaultValue("LOCATION-SUBSCRIBER"); + + public static final ConfigOption<String> LOCATION_SUBSCRIBER_TABLE = ConfigOptions.key("location.subscriber.table") + .stringType() + .defaultValue("location_subscriber_local"); + + public static final ConfigOption<Integer> H3_FIRST_RES = ConfigOptions.key("h3.first.res") + .intType() + .defaultValue(9); + + public static final ConfigOption<Integer> H3_SECOND_RES = ConfigOptions.key("h3.second.res") + .intType() + .defaultValue(8); + + public static final ConfigOption<Integer> H3_THIRD_RES = ConfigOptions.key("h3.third.res") + .intType() + .defaultValue(7); +} diff --git a/module-CN-pre-metrics/src/main/java/com/zdjizhi/pre/location/common/LocationMetricResult.java b/module-CN-pre-metrics/src/main/java/com/zdjizhi/pre/location/common/LocationMetricResult.java new file mode 100644 index 0000000..1c493fd --- /dev/null +++ b/module-CN-pre-metrics/src/main/java/com/zdjizhi/pre/location/common/LocationMetricResult.java @@ -0,0 +1,23 @@ +package com.zdjizhi.pre.location.common; + +import lombok.Data; + +/** + * @author gujinkai + * @version 1.0 + * @date 2024/2/27 16:56 + */ +@Data +public class LocationMetricResult { + private String subscriber_id; + private String imei; + private String imsi; + private String phone_number; + private String apn; + private Double subscriber_longitude; + private Double subscriber_latitude; + private String first_location; + private String second_location; + private String third_location; + private Long stat_time; +} diff --git a/module-CN-pre-metrics/src/main/java/com/zdjizhi/pre/location/common/LocationSubscriber.java b/module-CN-pre-metrics/src/main/java/com/zdjizhi/pre/location/common/LocationSubscriber.java new file mode 100644 index 0000000..e0bf0c7 --- /dev/null +++ b/module-CN-pre-metrics/src/main/java/com/zdjizhi/pre/location/common/LocationSubscriber.java @@ -0,0 +1,20 @@ +package com.zdjizhi.pre.location.common; + +import lombok.Data; + +/** + * @author gujinkai + * @version 1.0 + * @date 2024/2/27 16:25 + */ +@Data +public class LocationSubscriber { + private String subscriber_id; + private String imei; + private String imsi; + private String phone_number; + private String apn; + private Double subscriber_longitude; + private Double subscriber_latitude; + private Long recv_time; +} diff --git a/module-CN-pre-metrics/src/main/java/com/zdjizhi/pre/location/function/MetricSubscriberProcessWindowFunc.java b/module-CN-pre-metrics/src/main/java/com/zdjizhi/pre/location/function/MetricSubscriberProcessWindowFunc.java new file mode 100644 index 0000000..8b09dd1 --- /dev/null +++ b/module-CN-pre-metrics/src/main/java/com/zdjizhi/pre/location/function/MetricSubscriberProcessWindowFunc.java @@ -0,0 +1,57 @@ +package com.zdjizhi.pre.location.function; + +import com.alibaba.fastjson2.JSON; +import com.uber.h3core.H3Core; +import com.zdjizhi.pre.location.common.CommonConfig; +import com.zdjizhi.pre.location.common.LocationMetricResult; +import com.zdjizhi.pre.location.common.LocationSubscriber; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction; +import org.apache.flink.streaming.api.windowing.windows.TimeWindow; +import org.apache.flink.util.Collector; + +/** + * @author gujinkai + * @version 1.0 + * @date 2024/2/27 17:02 + */ +public class MetricSubscriberProcessWindowFunc extends ProcessWindowFunction<LocationSubscriber, String, String, TimeWindow> { + + private H3Core h3; + + protected int firstRes = 9; + + protected int secondRes = 8; + + protected int thirdRes = 7; + + @Override + public void open(Configuration parameters) throws Exception { + super.open(parameters); + h3 = H3Core.newInstance(); + final Configuration configuration = (Configuration) getRuntimeContext() + .getExecutionConfig().getGlobalJobParameters(); + firstRes = configuration.get(CommonConfig.H3_FIRST_RES); + secondRes = configuration.get(CommonConfig.H3_SECOND_RES); + thirdRes = configuration.get(CommonConfig.H3_THIRD_RES); + } + + @Override + public void process(String s, ProcessWindowFunction<LocationSubscriber, String, String, TimeWindow>.Context context, Iterable<LocationSubscriber> elements, Collector<String> out) throws Exception { + LocationSubscriber next = elements.iterator().next(); + LocationMetricResult locationMetricResult = new LocationMetricResult(); + locationMetricResult.setSubscriber_id(next.getSubscriber_id()); + locationMetricResult.setImei(next.getImei()); + locationMetricResult.setImsi(next.getImsi()); + locationMetricResult.setPhone_number(next.getPhone_number()); + locationMetricResult.setApn(next.getApn()); + locationMetricResult.setSubscriber_longitude(next.getSubscriber_longitude()); + locationMetricResult.setSubscriber_latitude(next.getSubscriber_latitude()); + locationMetricResult.setFirst_location(h3.latLngToCellAddress(locationMetricResult.getSubscriber_latitude(), locationMetricResult.getSubscriber_longitude(), firstRes)); + locationMetricResult.setSecond_location(h3.latLngToCellAddress(locationMetricResult.getSubscriber_latitude(), locationMetricResult.getSubscriber_longitude(), secondRes)); + locationMetricResult.setThird_location(h3.latLngToCellAddress(locationMetricResult.getSubscriber_latitude(), locationMetricResult.getSubscriber_longitude(), thirdRes)); + locationMetricResult.setStat_time(context.window().getStart() / 1000); + String metricResultJsonStr = JSON.toJSONString(locationMetricResult); + out.collect(metricResultJsonStr); + } +} diff --git a/module-CN-pre-metrics/src/main/java/com/zdjizhi/pre/location/operator/FirstAggregation.java b/module-CN-pre-metrics/src/main/java/com/zdjizhi/pre/location/operator/FirstAggregation.java new file mode 100644 index 0000000..b095404 --- /dev/null +++ b/module-CN-pre-metrics/src/main/java/com/zdjizhi/pre/location/operator/FirstAggregation.java @@ -0,0 +1,64 @@ +package com.zdjizhi.pre.location.operator; + +import com.zdjizhi.base.common.CnRecordLog; +import com.zdjizhi.pre.base.operator.AbstractFirstAggregation; +import com.zdjizhi.pre.location.common.LocationSubscriber; +import com.zdjizhi.pre.location.utils.SubscriberUtils; +import org.apache.commons.lang3.StringUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.HashMap; +import java.util.Map; + +/** + * @author gujinkai + * @version 1.0 + * @date 2024/2/27 16:40 + */ +public class FirstAggregation extends AbstractFirstAggregation<LocationSubscriber> { + + private final Logger logger = LoggerFactory.getLogger(FirstAggregation.class); + + private final String locationSubscriberKey = "locationSubscriberMetric"; + + @Override + public Map<String, Map<String, LocationSubscriber>> createAccumulator() { + Map<String, Map<String, LocationSubscriber>> accumulator = new HashMap<>(); + accumulator.put(locationSubscriberKey, new HashMap<>()); + return accumulator; + } + + @Override + public void add(CnRecordLog value, Map<String, Map<String, LocationSubscriber>> accumulator) { + try { + String subscriberId = value.getSubscriber_id(); + if (StringUtils.isNotBlank(subscriberId)) { + Map<String, LocationSubscriber> locationSubscriberMap = accumulator.get(locationSubscriberKey); + LocationSubscriber locationSubscriber = locationSubscriberMap.get(subscriberId); + if (locationSubscriber != null) { + updateLocationSubscriber(value, locationSubscriber); + } else { + locationSubscriber = new LocationSubscriber(); + locationSubscriber.setSubscriber_id(subscriberId); + updateLocationSubscriber(value, locationSubscriber); + } + locationSubscriberMap.put(subscriberId, locationSubscriber); + } + } catch (Exception e) { + logger.error("FirstAggregation add error", e); + } + } + + private void updateLocationSubscriber(CnRecordLog value, LocationSubscriber locationSubscriber) { + locationSubscriber.setImei(value.getImei()); + locationSubscriber.setImsi(value.getImsi()); + locationSubscriber.setPhone_number(value.getPhone_number()); + locationSubscriber.setApn(value.getApn()); + //todo 目前数据源字段不确定,先用随机经纬度代替 + SubscriberUtils.Locate randomLocate = SubscriberUtils.getRandomLocate(); + locationSubscriber.setSubscriber_longitude(randomLocate.longitude); + locationSubscriber.setSubscriber_latitude(randomLocate.latitude); + locationSubscriber.setRecv_time(value.getCommon_recv_time()); + } +} diff --git a/module-CN-pre-metrics/src/main/java/com/zdjizhi/pre/location/operator/SecondAggregationReduce.java b/module-CN-pre-metrics/src/main/java/com/zdjizhi/pre/location/operator/SecondAggregationReduce.java new file mode 100644 index 0000000..8d9e4a7 --- /dev/null +++ b/module-CN-pre-metrics/src/main/java/com/zdjizhi/pre/location/operator/SecondAggregationReduce.java @@ -0,0 +1,20 @@ +package com.zdjizhi.pre.location.operator; + +import com.zdjizhi.pre.location.common.LocationSubscriber; +import org.apache.flink.api.common.functions.ReduceFunction; + +/** + * @author gujinkai + * @version 1.0 + * @date 2024/2/27 16:54 + */ +public class SecondAggregationReduce implements ReduceFunction<LocationSubscriber> { + + @Override + public LocationSubscriber reduce(LocationSubscriber value1, LocationSubscriber value2) throws Exception { + if (value1.getRecv_time() > value2.getRecv_time()) { + return value1; + } + return value2; + } +} diff --git a/module-CN-pre-metrics/src/main/java/com/zdjizhi/pre/location/utils/SubscriberUtils.java b/module-CN-pre-metrics/src/main/java/com/zdjizhi/pre/location/utils/SubscriberUtils.java new file mode 100644 index 0000000..fdd4192 --- /dev/null +++ b/module-CN-pre-metrics/src/main/java/com/zdjizhi/pre/location/utils/SubscriberUtils.java @@ -0,0 +1,56 @@ +package com.zdjizhi.pre.location.utils; + +import java.util.Random; + +/** + * @author gujinkai + * @version 1.0 + * @date 2024/2/27 16:29 + */ +public class SubscriberUtils { + + private static Locate[] locates = new Locate[]{Locate.L1, Locate.L2, Locate.L3, Locate.L4, Locate.L5, Locate.L6, Locate.L7, Locate.L8, Locate.L9, Locate.L10, Locate.L11, Locate.L12, Locate.L13, Locate.L14, Locate.L15, Locate.L16, Locate.L17, Locate.L18, Locate.L19, Locate.L20, Locate.L21, Locate.L22, Locate.L23, Locate.L24, Locate.L25}; + + public static Locate getRandomLocate() { + Random random = new Random(); + int i = random.nextInt(25); + return locates[i]; + } + + public enum Locate { + L1(116.391721, 39.906094), + L2(116.386739, 39.906266), + L3(116.384863, 39.902834), + L4(116.387807, 39.900607), + L5(116.39328, 39.898911), + L6(116.381896, 39.906917), + L7(116.376717, 39.906147), + L8(116.376113, 39.909896), + L9(116.372452, 39.913832), + L10(116.375427, 39.920883), + L11(116.382980, 39.925003), + L12(116.38813, 39.926376), + L13(116.400375, 39.926834), + L14(116.399913, 39.932007), + L15(116.393221, 39.934137), + L16(116.38916, 39.935303), + L17(116.385799, 39.935684), + L18(116.384354, 39.935303), + L19(116.38298, 39.934273), + L20(116.37999, 39.93446), + L21(116.378914, 39.93676), + L22(116.37787, 39.942691), + L23(116.374113, 39.943599), + L24(116.370041, 39.944283), + L25(116.367037, 39.947116); + + + public Double longitude; + public Double latitude; + + Locate(Double longitude, Double latitude) { + this.longitude = longitude; + this.latitude = latitude; + } + } +} diff --git a/module-CN-pre-metrics/src/main/java/com/zdjizhi/pre/relation/CnRelationMetric.java b/module-CN-pre-metrics/src/main/java/com/zdjizhi/pre/relation/CnRelationMetric.java index 18da277..a05c575 100644 --- a/module-CN-pre-metrics/src/main/java/com/zdjizhi/pre/relation/CnRelationMetric.java +++ b/module-CN-pre-metrics/src/main/java/com/zdjizhi/pre/relation/CnRelationMetric.java @@ -6,7 +6,7 @@ import com.zdjizhi.base.config.Configs; import com.zdjizhi.base.platform.Schedule; import com.zdjizhi.base.utils.KafkaUtils; import com.zdjizhi.etl.CnRecordEtl; -import com.zdjizhi.pre.common.CommonConfig; +import com.zdjizhi.pre.base.common.CommonConfig; import com.zdjizhi.pre.relation.common.RelationMetricLog; import com.zdjizhi.pre.relation.function.FirstAggregation; import org.apache.flink.api.common.functions.ReduceFunction; diff --git a/module-CN-pre-metrics/src/main/java/com/zdjizhi/pre/relation/function/FirstAggregation.java b/module-CN-pre-metrics/src/main/java/com/zdjizhi/pre/relation/function/FirstAggregation.java index c2f415a..114a579 100644 --- a/module-CN-pre-metrics/src/main/java/com/zdjizhi/pre/relation/function/FirstAggregation.java +++ b/module-CN-pre-metrics/src/main/java/com/zdjizhi/pre/relation/function/FirstAggregation.java @@ -1,8 +1,7 @@ package com.zdjizhi.pre.relation.function; import com.zdjizhi.base.common.CnRecordLog; -import com.zdjizhi.pre.common.CnMetricLog; -import com.zdjizhi.pre.operator.AbstractFirstAggregation; +import com.zdjizhi.pre.base.operator.AbstractFirstAggregation; import com.zdjizhi.pre.relation.common.RelationMetricLog; import java.util.ArrayList; diff --git a/module-CN-pre-metrics/src/test/java/com.zdjizhi.pre/DNSMetricTest.java b/module-CN-pre-metrics/src/test/java/com.zdjizhi.pre/DNSMetricTest.java index f8a6a5b..cff1b2b 100644 --- a/module-CN-pre-metrics/src/test/java/com.zdjizhi.pre/DNSMetricTest.java +++ b/module-CN-pre-metrics/src/test/java/com.zdjizhi.pre/DNSMetricTest.java @@ -5,15 +5,12 @@ import com.alibaba.fastjson2.JSON; import com.alibaba.fastjson2.JSONArray; import com.zdjizhi.base.common.CnRecordLog; import com.zdjizhi.base.config.Configs; -import com.zdjizhi.base.utils.FlinkEnvironmentUtils; -import com.zdjizhi.pre.common.CnMetricLog; -import com.zdjizhi.pre.common.CommonConfig; -import com.zdjizhi.pre.common.MetricKeyConfig; - - -import com.zdjizhi.pre.function.MetricIpProcessWindowFunc; -import com.zdjizhi.pre.operator.FirstAggregation; -import com.zdjizhi.pre.operator.SecondAggregationReduce; +import com.zdjizhi.pre.base.common.CnMetricLog; +import com.zdjizhi.pre.base.common.CommonConfig; +import com.zdjizhi.pre.base.common.MetricKeyConfig; +import com.zdjizhi.pre.base.function.MetricIpProcessWindowFunc; +import com.zdjizhi.pre.base.operator.FirstAggregation; +import com.zdjizhi.pre.base.operator.SecondAggregationReduce; import org.apache.flink.api.common.eventtime.WatermarkStrategy; import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration; import org.apache.flink.streaming.api.datastream.DataStreamSource; @@ -30,8 +27,6 @@ import org.junit.Test; import java.io.File; import java.time.Duration; -import static org.junit.Assert.assertEquals; - public class DNSMetricTest { @ClassRule diff --git a/module-CN-pre-metrics/src/test/java/com.zdjizhi.pre/PreMetricTest.java b/module-CN-pre-metrics/src/test/java/com.zdjizhi.pre/PreMetricTest.java index 2c69b6c..15d7a67 100644 --- a/module-CN-pre-metrics/src/test/java/com.zdjizhi.pre/PreMetricTest.java +++ b/module-CN-pre-metrics/src/test/java/com.zdjizhi.pre/PreMetricTest.java @@ -5,16 +5,13 @@ import com.alibaba.fastjson2.JSON; import com.alibaba.fastjson2.JSONArray; import com.zdjizhi.base.common.CnRecordLog; import com.zdjizhi.base.config.Configs; -import com.zdjizhi.base.utils.FlinkEnvironmentUtils; -import com.zdjizhi.pre.common.CnMetricLog; -import com.zdjizhi.pre.common.CommonConfig; -import com.zdjizhi.pre.common.MetricKeyConfig; - - -import com.zdjizhi.pre.common.MetricResultLog; -import com.zdjizhi.pre.function.MetricIpProcessWindowFunc; -import com.zdjizhi.pre.operator.FirstAggregation; -import com.zdjizhi.pre.operator.SecondAggregationReduce; +import com.zdjizhi.pre.base.common.CnMetricLog; +import com.zdjizhi.pre.base.common.CommonConfig; +import com.zdjizhi.pre.base.common.MetricKeyConfig; +import com.zdjizhi.pre.base.common.MetricResultLog; +import com.zdjizhi.pre.base.function.MetricIpProcessWindowFunc; +import com.zdjizhi.pre.base.operator.FirstAggregation; +import com.zdjizhi.pre.base.operator.SecondAggregationReduce; import org.apache.flink.api.common.eventtime.WatermarkStrategy; import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration; import org.apache.flink.streaming.api.datastream.DataStreamSource; diff --git a/platform-schedule/src/main/resources/business.properties b/platform-schedule/src/main/resources/business.properties index c6c8197..fca0668 100644 --- a/platform-schedule/src/main/resources/business.properties +++ b/platform-schedule/src/main/resources/business.properties @@ -1,7 +1,9 @@ # session-record-cn?? -#cn.record.etl.class=com.zdjizhi.etl.CnRecordPersistence +cn.record.etl.class=com.zdjizhi.etl.CnRecordPersistence # ??? -cn.pre.metric.class=com.zdjizhi.pre.CnPreMetric +cn.pre.metric.class=com.zdjizhi.pre.base.CnPreMetric +# location +cn.location.metric.class=com.zdjizhi.pre.location.LocationMetric # ????????? cn.pre.relation.metric.class=com.zdjizhi.pre.relation.CnRelationMetric # dns??? diff --git a/platform-schedule/src/main/resources/common.properties b/platform-schedule/src/main/resources/common.properties index fc5577f..cca2a64 100644 --- a/platform-schedule/src/main/resources/common.properties +++ b/platform-schedule/src/main/resources/common.properties @@ -1,19 +1,21 @@ # ???? -stream.execution.job.name=ETL-METRIC +stream.execution.job.name=CN-STREAM-PROCESSING-PLATFORM # ????? -stream.execution.environment.parallelism=1 +stream.execution.environment.parallelism=2 # kafka source??? -session.record.completed.parallelism=1 +session.record.completed.parallelism=2 # session-record-cn sink??? -cn.record.parallelism=1 +cn.record.parallelism=2 # ???sink??? -metric.output.parallelism=1 +metric.output.parallelism=2 # dns???sink??? -dns.metric.output.parallelism=1 +dns.metric.output.parallelism=2 # ????sink??? -metric.entity.relation.output.parallelism=1 +metric.entity.relation.output.parallelism=2 # ????sink??? -metric.dynamic.attribute.output.parallelism=1 +metric.dynamic.attribute.output.parallelism=2 +# location sink??? +location.metric.output.parallelism=2 # kafka??? kafka.input.bootstrap.servers=192.168.44.55:9092 session.record.completed.topic=SESSION-RECORD @@ -32,10 +34,9 @@ cn.record.topic=SESSION-RECORD-CN output.sasl.jaas.config.flag=0 # flink checkpoint?? 0:? 1:? flink.enable.checkpoint.flag=1 -# nacos?? -nacos.server.addr=192.168.44.55:8848 # api detection url rule.full.url=http://192.168.44.54:8090/v1/rule/detection rule.inc.url=http://192.168.44.54:8090/v1/rule/detection/increase +# ??host gateway.host=192.168.44.55 -etl.enable=false
\ No newline at end of file +#etl.enable=false
\ No newline at end of file |
