summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorgujinkai <[email protected]>2024-02-27 17:53:01 +0800
committergujinkai <[email protected]>2024-02-28 11:10:35 +0800
commitd4e7a0e1b6bbb3491346613e01417e177e602b6c (patch)
tree1fe71bfe3a1a90f5e1602f1557232043fec9df1b
parent31969400245dad3fa480c1db0aa7bdb031ee2fcf (diff)
feat: add location_subscriber agg
-rw-r--r--module-CN-pre-metrics/pom.xml6
-rw-r--r--module-CN-pre-metrics/src/main/java/com/zdjizhi/pre/base/CnPreMetric.java (renamed from module-CN-pre-metrics/src/main/java/com/zdjizhi/pre/CnPreMetric.java)14
-rw-r--r--module-CN-pre-metrics/src/main/java/com/zdjizhi/pre/base/common/CnMetricLog.java (renamed from module-CN-pre-metrics/src/main/java/com/zdjizhi/pre/common/CnMetricLog.java)2
-rw-r--r--module-CN-pre-metrics/src/main/java/com/zdjizhi/pre/base/common/CommonConfig.java (renamed from module-CN-pre-metrics/src/main/java/com/zdjizhi/pre/common/CommonConfig.java)2
-rw-r--r--module-CN-pre-metrics/src/main/java/com/zdjizhi/pre/base/common/MetricKeyConfig.java (renamed from module-CN-pre-metrics/src/main/java/com/zdjizhi/pre/common/MetricKeyConfig.java)2
-rw-r--r--module-CN-pre-metrics/src/main/java/com/zdjizhi/pre/base/common/MetricResultLog.java (renamed from module-CN-pre-metrics/src/main/java/com/zdjizhi/pre/common/MetricResultLog.java)2
-rw-r--r--module-CN-pre-metrics/src/main/java/com/zdjizhi/pre/base/function/AbstractMetricProcessFunc.java (renamed from module-CN-pre-metrics/src/main/java/com/zdjizhi/pre/function/AbstractMetricProcessFunc.java)10
-rw-r--r--module-CN-pre-metrics/src/main/java/com/zdjizhi/pre/base/function/MetricAppProcessFunc.java (renamed from module-CN-pre-metrics/src/main/java/com/zdjizhi/pre/function/MetricAppProcessFunc.java)6
-rw-r--r--module-CN-pre-metrics/src/main/java/com/zdjizhi/pre/base/function/MetricAsnProcessFunc.java (renamed from module-CN-pre-metrics/src/main/java/com/zdjizhi/pre/function/MetricAsnProcessFunc.java)6
-rw-r--r--module-CN-pre-metrics/src/main/java/com/zdjizhi/pre/base/function/MetricDomainProcessFunc.java (renamed from module-CN-pre-metrics/src/main/java/com/zdjizhi/pre/function/MetricDomainProcessFunc.java)6
-rw-r--r--module-CN-pre-metrics/src/main/java/com/zdjizhi/pre/base/function/MetricIpProcessWindowFunc.java (renamed from module-CN-pre-metrics/src/main/java/com/zdjizhi/pre/function/MetricIpProcessWindowFunc.java)6
-rw-r--r--module-CN-pre-metrics/src/main/java/com/zdjizhi/pre/base/function/MetricLinkProcessFunc.java (renamed from module-CN-pre-metrics/src/main/java/com/zdjizhi/pre/function/MetricLinkProcessFunc.java)6
-rw-r--r--module-CN-pre-metrics/src/main/java/com/zdjizhi/pre/base/function/MetricProtocolProcessFunc.java (renamed from module-CN-pre-metrics/src/main/java/com/zdjizhi/pre/function/MetricProtocolProcessFunc.java)6
-rw-r--r--module-CN-pre-metrics/src/main/java/com/zdjizhi/pre/base/function/MetricRegionProcessWindowFunc.java (renamed from module-CN-pre-metrics/src/main/java/com/zdjizhi/pre/function/MetricRegionProcessWindowFunc.java)6
-rw-r--r--module-CN-pre-metrics/src/main/java/com/zdjizhi/pre/base/function/MetricSubscriberAppProcessFunc.java19
-rw-r--r--module-CN-pre-metrics/src/main/java/com/zdjizhi/pre/base/handler/CommonMetric.java (renamed from module-CN-pre-metrics/src/main/java/com/zdjizhi/pre/handler/CommonMetric.java)8
-rw-r--r--module-CN-pre-metrics/src/main/java/com/zdjizhi/pre/base/operator/AbstractFirstAggregation.java (renamed from module-CN-pre-metrics/src/main/java/com/zdjizhi/pre/operator/AbstractFirstAggregation.java)4
-rw-r--r--module-CN-pre-metrics/src/main/java/com/zdjizhi/pre/base/operator/FirstAggregation.java (renamed from module-CN-pre-metrics/src/main/java/com/zdjizhi/pre/operator/FirstAggregation.java)6
-rw-r--r--module-CN-pre-metrics/src/main/java/com/zdjizhi/pre/base/operator/SecondAggregationReduce.java (renamed from module-CN-pre-metrics/src/main/java/com/zdjizhi/pre/operator/SecondAggregationReduce.java)4
-rw-r--r--module-CN-pre-metrics/src/main/java/com/zdjizhi/pre/dns/DnsPreMetric.java2
-rw-r--r--module-CN-pre-metrics/src/main/java/com/zdjizhi/pre/dns/process/FirstAggregation.java4
-rw-r--r--module-CN-pre-metrics/src/main/java/com/zdjizhi/pre/function/MetricSubscriberAppProcessFunc.java49
-rw-r--r--module-CN-pre-metrics/src/main/java/com/zdjizhi/pre/location/LocationMetric.java48
-rw-r--r--module-CN-pre-metrics/src/main/java/com/zdjizhi/pre/location/common/CommonConfig.java39
-rw-r--r--module-CN-pre-metrics/src/main/java/com/zdjizhi/pre/location/common/LocationMetricResult.java23
-rw-r--r--module-CN-pre-metrics/src/main/java/com/zdjizhi/pre/location/common/LocationSubscriber.java20
-rw-r--r--module-CN-pre-metrics/src/main/java/com/zdjizhi/pre/location/function/MetricSubscriberProcessWindowFunc.java57
-rw-r--r--module-CN-pre-metrics/src/main/java/com/zdjizhi/pre/location/operator/FirstAggregation.java64
-rw-r--r--module-CN-pre-metrics/src/main/java/com/zdjizhi/pre/location/operator/SecondAggregationReduce.java20
-rw-r--r--module-CN-pre-metrics/src/main/java/com/zdjizhi/pre/location/utils/SubscriberUtils.java56
-rw-r--r--module-CN-pre-metrics/src/main/java/com/zdjizhi/pre/relation/CnRelationMetric.java2
-rw-r--r--module-CN-pre-metrics/src/main/java/com/zdjizhi/pre/relation/function/FirstAggregation.java3
-rw-r--r--module-CN-pre-metrics/src/test/java/com.zdjizhi.pre/DNSMetricTest.java17
-rw-r--r--module-CN-pre-metrics/src/test/java/com.zdjizhi.pre/PreMetricTest.java17
-rw-r--r--platform-schedule/src/main/resources/business.properties6
-rw-r--r--platform-schedule/src/main/resources/common.properties23
36 files changed, 433 insertions, 138 deletions
diff --git a/module-CN-pre-metrics/pom.xml b/module-CN-pre-metrics/pom.xml
index 3e74bdd..45b93b9 100644
--- a/module-CN-pre-metrics/pom.xml
+++ b/module-CN-pre-metrics/pom.xml
@@ -17,6 +17,12 @@
<artifactId>platform-etl</artifactId>
<version>23.10-SNAPSHOT</version>
</dependency>
+
+ <dependency>
+ <groupId>com.uber</groupId>
+ <artifactId>h3</artifactId>
+ <version>4.1.1</version>
+ </dependency>
</dependencies>
</project> \ No newline at end of file
diff --git a/module-CN-pre-metrics/src/main/java/com/zdjizhi/pre/CnPreMetric.java b/module-CN-pre-metrics/src/main/java/com/zdjizhi/pre/base/CnPreMetric.java
index 4a63216..e048f08 100644
--- a/module-CN-pre-metrics/src/main/java/com/zdjizhi/pre/CnPreMetric.java
+++ b/module-CN-pre-metrics/src/main/java/com/zdjizhi/pre/base/CnPreMetric.java
@@ -1,16 +1,16 @@
-package com.zdjizhi.pre;
+package com.zdjizhi.pre.base;
import com.zdjizhi.base.common.CnRecordLog;
import com.zdjizhi.base.config.Configs;
import com.zdjizhi.base.platform.Schedule;
import com.zdjizhi.base.utils.KafkaUtils;
import com.zdjizhi.etl.CnRecordEtl;
-import com.zdjizhi.pre.common.CnMetricLog;
-import com.zdjizhi.pre.common.CommonConfig;
-import com.zdjizhi.pre.common.MetricKeyConfig;
-import com.zdjizhi.pre.function.*;
-import com.zdjizhi.pre.operator.FirstAggregation;
-import com.zdjizhi.pre.operator.SecondAggregationReduce;
+import com.zdjizhi.pre.base.common.CnMetricLog;
+import com.zdjizhi.pre.base.common.CommonConfig;
+import com.zdjizhi.pre.base.common.MetricKeyConfig;
+import com.zdjizhi.pre.base.function.*;
+import com.zdjizhi.pre.base.operator.FirstAggregation;
+import com.zdjizhi.pre.base.operator.SecondAggregationReduce;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple12;
import org.apache.flink.api.java.tuple.Tuple2;
diff --git a/module-CN-pre-metrics/src/main/java/com/zdjizhi/pre/common/CnMetricLog.java b/module-CN-pre-metrics/src/main/java/com/zdjizhi/pre/base/common/CnMetricLog.java
index c8b135f..e89087c 100644
--- a/module-CN-pre-metrics/src/main/java/com/zdjizhi/pre/common/CnMetricLog.java
+++ b/module-CN-pre-metrics/src/main/java/com/zdjizhi/pre/base/common/CnMetricLog.java
@@ -1,4 +1,4 @@
-package com.zdjizhi.pre.common;
+package com.zdjizhi.pre.base.common;
public class CnMetricLog {
diff --git a/module-CN-pre-metrics/src/main/java/com/zdjizhi/pre/common/CommonConfig.java b/module-CN-pre-metrics/src/main/java/com/zdjizhi/pre/base/common/CommonConfig.java
index 3d29b5c..03857fa 100644
--- a/module-CN-pre-metrics/src/main/java/com/zdjizhi/pre/common/CommonConfig.java
+++ b/module-CN-pre-metrics/src/main/java/com/zdjizhi/pre/base/common/CommonConfig.java
@@ -1,4 +1,4 @@
-package com.zdjizhi.pre.common;
+package com.zdjizhi.pre.base.common;
import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.configuration.ConfigOptions;
diff --git a/module-CN-pre-metrics/src/main/java/com/zdjizhi/pre/common/MetricKeyConfig.java b/module-CN-pre-metrics/src/main/java/com/zdjizhi/pre/base/common/MetricKeyConfig.java
index e2af60b..0932d65 100644
--- a/module-CN-pre-metrics/src/main/java/com/zdjizhi/pre/common/MetricKeyConfig.java
+++ b/module-CN-pre-metrics/src/main/java/com/zdjizhi/pre/base/common/MetricKeyConfig.java
@@ -1,4 +1,4 @@
-package com.zdjizhi.pre.common;
+package com.zdjizhi.pre.base.common;
public class MetricKeyConfig {
diff --git a/module-CN-pre-metrics/src/main/java/com/zdjizhi/pre/common/MetricResultLog.java b/module-CN-pre-metrics/src/main/java/com/zdjizhi/pre/base/common/MetricResultLog.java
index e5b80a2..a0ac350 100644
--- a/module-CN-pre-metrics/src/main/java/com/zdjizhi/pre/common/MetricResultLog.java
+++ b/module-CN-pre-metrics/src/main/java/com/zdjizhi/pre/base/common/MetricResultLog.java
@@ -1,4 +1,4 @@
-package com.zdjizhi.pre.common;
+package com.zdjizhi.pre.base.common;
import com.alibaba.fastjson2.annotation.JSONField;
diff --git a/module-CN-pre-metrics/src/main/java/com/zdjizhi/pre/function/AbstractMetricProcessFunc.java b/module-CN-pre-metrics/src/main/java/com/zdjizhi/pre/base/function/AbstractMetricProcessFunc.java
index 95095c0..9363d27 100644
--- a/module-CN-pre-metrics/src/main/java/com/zdjizhi/pre/function/AbstractMetricProcessFunc.java
+++ b/module-CN-pre-metrics/src/main/java/com/zdjizhi/pre/base/function/AbstractMetricProcessFunc.java
@@ -1,10 +1,10 @@
-package com.zdjizhi.pre.function;
+package com.zdjizhi.pre.base.function;
import com.alibaba.fastjson2.JSON;
-import com.zdjizhi.pre.common.CnMetricLog;
-import com.zdjizhi.pre.common.CommonConfig;
-import com.zdjizhi.pre.common.MetricResultLog;
-import com.zdjizhi.pre.handler.CommonMetric;
+import com.zdjizhi.pre.base.common.CnMetricLog;
+import com.zdjizhi.pre.base.common.CommonConfig;
+import com.zdjizhi.pre.base.common.MetricResultLog;
+import com.zdjizhi.pre.base.handler.CommonMetric;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
diff --git a/module-CN-pre-metrics/src/main/java/com/zdjizhi/pre/function/MetricAppProcessFunc.java b/module-CN-pre-metrics/src/main/java/com/zdjizhi/pre/base/function/MetricAppProcessFunc.java
index ebe77c6..f5929ac 100644
--- a/module-CN-pre-metrics/src/main/java/com/zdjizhi/pre/function/MetricAppProcessFunc.java
+++ b/module-CN-pre-metrics/src/main/java/com/zdjizhi/pre/base/function/MetricAppProcessFunc.java
@@ -1,7 +1,7 @@
-package com.zdjizhi.pre.function;
+package com.zdjizhi.pre.base.function;
-import com.zdjizhi.pre.common.CnMetricLog;
-import com.zdjizhi.pre.common.MetricResultLog;
+import com.zdjizhi.pre.base.common.CnMetricLog;
+import com.zdjizhi.pre.base.common.MetricResultLog;
public class MetricAppProcessFunc extends AbstractMetricProcessFunc<String> {
diff --git a/module-CN-pre-metrics/src/main/java/com/zdjizhi/pre/function/MetricAsnProcessFunc.java b/module-CN-pre-metrics/src/main/java/com/zdjizhi/pre/base/function/MetricAsnProcessFunc.java
index f1a36e8..7c1ce3e 100644
--- a/module-CN-pre-metrics/src/main/java/com/zdjizhi/pre/function/MetricAsnProcessFunc.java
+++ b/module-CN-pre-metrics/src/main/java/com/zdjizhi/pre/base/function/MetricAsnProcessFunc.java
@@ -1,7 +1,7 @@
-package com.zdjizhi.pre.function;
+package com.zdjizhi.pre.base.function;
-import com.zdjizhi.pre.common.CnMetricLog;
-import com.zdjizhi.pre.common.MetricResultLog;
+import com.zdjizhi.pre.base.common.CnMetricLog;
+import com.zdjizhi.pre.base.common.MetricResultLog;
import org.apache.flink.api.java.tuple.Tuple2;
public class MetricAsnProcessFunc extends AbstractMetricProcessFunc<Tuple2<String, String>> {
diff --git a/module-CN-pre-metrics/src/main/java/com/zdjizhi/pre/function/MetricDomainProcessFunc.java b/module-CN-pre-metrics/src/main/java/com/zdjizhi/pre/base/function/MetricDomainProcessFunc.java
index 7b1e981..86d86c2 100644
--- a/module-CN-pre-metrics/src/main/java/com/zdjizhi/pre/function/MetricDomainProcessFunc.java
+++ b/module-CN-pre-metrics/src/main/java/com/zdjizhi/pre/base/function/MetricDomainProcessFunc.java
@@ -1,7 +1,7 @@
-package com.zdjizhi.pre.function;
+package com.zdjizhi.pre.base.function;
-import com.zdjizhi.pre.common.CnMetricLog;
-import com.zdjizhi.pre.common.MetricResultLog;
+import com.zdjizhi.pre.base.common.CnMetricLog;
+import com.zdjizhi.pre.base.common.MetricResultLog;
public class MetricDomainProcessFunc extends AbstractMetricProcessFunc<String> {
diff --git a/module-CN-pre-metrics/src/main/java/com/zdjizhi/pre/function/MetricIpProcessWindowFunc.java b/module-CN-pre-metrics/src/main/java/com/zdjizhi/pre/base/function/MetricIpProcessWindowFunc.java
index 3d8b93b..3917d35 100644
--- a/module-CN-pre-metrics/src/main/java/com/zdjizhi/pre/function/MetricIpProcessWindowFunc.java
+++ b/module-CN-pre-metrics/src/main/java/com/zdjizhi/pre/base/function/MetricIpProcessWindowFunc.java
@@ -1,7 +1,7 @@
-package com.zdjizhi.pre.function;
+package com.zdjizhi.pre.base.function;
-import com.zdjizhi.pre.common.CnMetricLog;
-import com.zdjizhi.pre.common.MetricResultLog;
+import com.zdjizhi.pre.base.common.CnMetricLog;
+import com.zdjizhi.pre.base.common.MetricResultLog;
public class MetricIpProcessWindowFunc extends AbstractMetricProcessFunc<String> {
private String side;
diff --git a/module-CN-pre-metrics/src/main/java/com/zdjizhi/pre/function/MetricLinkProcessFunc.java b/module-CN-pre-metrics/src/main/java/com/zdjizhi/pre/base/function/MetricLinkProcessFunc.java
index d532cc1..391e672 100644
--- a/module-CN-pre-metrics/src/main/java/com/zdjizhi/pre/function/MetricLinkProcessFunc.java
+++ b/module-CN-pre-metrics/src/main/java/com/zdjizhi/pre/base/function/MetricLinkProcessFunc.java
@@ -1,7 +1,7 @@
-package com.zdjizhi.pre.function;
+package com.zdjizhi.pre.base.function;
-import com.zdjizhi.pre.common.CnMetricLog;
-import com.zdjizhi.pre.common.MetricResultLog;
+import com.zdjizhi.pre.base.common.CnMetricLog;
+import com.zdjizhi.pre.base.common.MetricResultLog;
import org.apache.flink.api.java.tuple.Tuple12;
public class MetricLinkProcessFunc extends AbstractMetricProcessFunc<Tuple12<String, String, String, String, String, String, String, String, Long, Long ,String, String>> {
diff --git a/module-CN-pre-metrics/src/main/java/com/zdjizhi/pre/function/MetricProtocolProcessFunc.java b/module-CN-pre-metrics/src/main/java/com/zdjizhi/pre/base/function/MetricProtocolProcessFunc.java
index 255329c..63f4b7a 100644
--- a/module-CN-pre-metrics/src/main/java/com/zdjizhi/pre/function/MetricProtocolProcessFunc.java
+++ b/module-CN-pre-metrics/src/main/java/com/zdjizhi/pre/base/function/MetricProtocolProcessFunc.java
@@ -1,7 +1,7 @@
-package com.zdjizhi.pre.function;
+package com.zdjizhi.pre.base.function;
-import com.zdjizhi.pre.common.CnMetricLog;
-import com.zdjizhi.pre.common.MetricResultLog;
+import com.zdjizhi.pre.base.common.CnMetricLog;
+import com.zdjizhi.pre.base.common.MetricResultLog;
import org.apache.flink.api.java.tuple.Tuple2;
public class MetricProtocolProcessFunc extends AbstractMetricProcessFunc<Tuple2<String, Integer>> {
diff --git a/module-CN-pre-metrics/src/main/java/com/zdjizhi/pre/function/MetricRegionProcessWindowFunc.java b/module-CN-pre-metrics/src/main/java/com/zdjizhi/pre/base/function/MetricRegionProcessWindowFunc.java
index 8f07306..65019de 100644
--- a/module-CN-pre-metrics/src/main/java/com/zdjizhi/pre/function/MetricRegionProcessWindowFunc.java
+++ b/module-CN-pre-metrics/src/main/java/com/zdjizhi/pre/base/function/MetricRegionProcessWindowFunc.java
@@ -1,7 +1,7 @@
-package com.zdjizhi.pre.function;
+package com.zdjizhi.pre.base.function;
-import com.zdjizhi.pre.common.CnMetricLog;
-import com.zdjizhi.pre.common.MetricResultLog;
+import com.zdjizhi.pre.base.common.CnMetricLog;
+import com.zdjizhi.pre.base.common.MetricResultLog;
import org.apache.flink.api.java.tuple.Tuple3;
public class MetricRegionProcessWindowFunc extends AbstractMetricProcessFunc<Tuple3<String, String, String>> {
diff --git a/module-CN-pre-metrics/src/main/java/com/zdjizhi/pre/base/function/MetricSubscriberAppProcessFunc.java b/module-CN-pre-metrics/src/main/java/com/zdjizhi/pre/base/function/MetricSubscriberAppProcessFunc.java
new file mode 100644
index 0000000..6c46a1a
--- /dev/null
+++ b/module-CN-pre-metrics/src/main/java/com/zdjizhi/pre/base/function/MetricSubscriberAppProcessFunc.java
@@ -0,0 +1,19 @@
+package com.zdjizhi.pre.base.function;
+
+import com.zdjizhi.pre.base.common.CnMetricLog;
+import com.zdjizhi.pre.base.common.MetricResultLog;
+import org.apache.flink.api.java.tuple.Tuple2;
+
+
+public class MetricSubscriberAppProcessFunc extends AbstractMetricProcessFunc<Tuple2<String, String>> {
+
+ @Override
+ public void setCommonFields(Tuple2<String, String> key, CnMetricLog cnMetricLog, MetricResultLog metricResultLog) {
+ metricResultLog.setSubscriber_id(key.f0);
+ metricResultLog.setCommon_app_label(key.f1);
+ metricResultLog.setImei(cnMetricLog.getImei());
+ metricResultLog.setImsi(cnMetricLog.getImsi());
+ metricResultLog.setPhone_number(cnMetricLog.getPhone_number());
+ metricResultLog.setApn(cnMetricLog.getApn());
+ }
+}
diff --git a/module-CN-pre-metrics/src/main/java/com/zdjizhi/pre/handler/CommonMetric.java b/module-CN-pre-metrics/src/main/java/com/zdjizhi/pre/base/handler/CommonMetric.java
index e0693b9..83ec861 100644
--- a/module-CN-pre-metrics/src/main/java/com/zdjizhi/pre/handler/CommonMetric.java
+++ b/module-CN-pre-metrics/src/main/java/com/zdjizhi/pre/base/handler/CommonMetric.java
@@ -1,9 +1,7 @@
-package com.zdjizhi.pre.handler;
-
-import com.zdjizhi.pre.common.CnMetricLog;
-import com.zdjizhi.pre.common.CommonConfig;
-import com.zdjizhi.pre.common.MetricResultLog;
+package com.zdjizhi.pre.base.handler;
+import com.zdjizhi.pre.base.common.CnMetricLog;
+import com.zdjizhi.pre.base.common.MetricResultLog;
import java.math.BigDecimal;
import java.math.RoundingMode;
diff --git a/module-CN-pre-metrics/src/main/java/com/zdjizhi/pre/operator/AbstractFirstAggregation.java b/module-CN-pre-metrics/src/main/java/com/zdjizhi/pre/base/operator/AbstractFirstAggregation.java
index 9c6f463..9e59702 100644
--- a/module-CN-pre-metrics/src/main/java/com/zdjizhi/pre/operator/AbstractFirstAggregation.java
+++ b/module-CN-pre-metrics/src/main/java/com/zdjizhi/pre/base/operator/AbstractFirstAggregation.java
@@ -1,7 +1,7 @@
-package com.zdjizhi.pre.operator;
+package com.zdjizhi.pre.base.operator;
import com.zdjizhi.base.common.CnRecordLog;
-import com.zdjizhi.pre.common.CommonConfig;
+import com.zdjizhi.pre.base.common.CommonConfig;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.streaming.api.windowing.time.Time;
diff --git a/module-CN-pre-metrics/src/main/java/com/zdjizhi/pre/operator/FirstAggregation.java b/module-CN-pre-metrics/src/main/java/com/zdjizhi/pre/base/operator/FirstAggregation.java
index 0c767a9..d6ba3ee 100644
--- a/module-CN-pre-metrics/src/main/java/com/zdjizhi/pre/operator/FirstAggregation.java
+++ b/module-CN-pre-metrics/src/main/java/com/zdjizhi/pre/base/operator/FirstAggregation.java
@@ -1,8 +1,8 @@
-package com.zdjizhi.pre.operator;
+package com.zdjizhi.pre.base.operator;
import com.zdjizhi.base.common.CnRecordLog;
-import com.zdjizhi.pre.common.CnMetricLog;
-import com.zdjizhi.pre.common.MetricKeyConfig;
+import com.zdjizhi.pre.base.common.CnMetricLog;
+import com.zdjizhi.pre.base.common.MetricKeyConfig;
import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
diff --git a/module-CN-pre-metrics/src/main/java/com/zdjizhi/pre/operator/SecondAggregationReduce.java b/module-CN-pre-metrics/src/main/java/com/zdjizhi/pre/base/operator/SecondAggregationReduce.java
index 19fc3f5..2817c63 100644
--- a/module-CN-pre-metrics/src/main/java/com/zdjizhi/pre/operator/SecondAggregationReduce.java
+++ b/module-CN-pre-metrics/src/main/java/com/zdjizhi/pre/base/operator/SecondAggregationReduce.java
@@ -1,6 +1,6 @@
-package com.zdjizhi.pre.operator;
+package com.zdjizhi.pre.base.operator;
-import com.zdjizhi.pre.common.CnMetricLog;
+import com.zdjizhi.pre.base.common.CnMetricLog;
import org.apache.flink.api.common.functions.ReduceFunction;
public class SecondAggregationReduce implements ReduceFunction<CnMetricLog> {
diff --git a/module-CN-pre-metrics/src/main/java/com/zdjizhi/pre/dns/DnsPreMetric.java b/module-CN-pre-metrics/src/main/java/com/zdjizhi/pre/dns/DnsPreMetric.java
index ca39841..ef1ac6f 100644
--- a/module-CN-pre-metrics/src/main/java/com/zdjizhi/pre/dns/DnsPreMetric.java
+++ b/module-CN-pre-metrics/src/main/java/com/zdjizhi/pre/dns/DnsPreMetric.java
@@ -5,7 +5,7 @@ import com.zdjizhi.base.config.Configs;
import com.zdjizhi.base.platform.Schedule;
import com.zdjizhi.base.utils.KafkaUtils;
import com.zdjizhi.etl.CnRecordEtl;
-import com.zdjizhi.pre.common.MetricKeyConfig;
+import com.zdjizhi.pre.base.common.MetricKeyConfig;
import com.zdjizhi.pre.dns.common.CommonConfig;
import com.zdjizhi.pre.dns.common.DnsMetricLog;
import com.zdjizhi.pre.dns.function.*;
diff --git a/module-CN-pre-metrics/src/main/java/com/zdjizhi/pre/dns/process/FirstAggregation.java b/module-CN-pre-metrics/src/main/java/com/zdjizhi/pre/dns/process/FirstAggregation.java
index 117bed8..8884480 100644
--- a/module-CN-pre-metrics/src/main/java/com/zdjizhi/pre/dns/process/FirstAggregation.java
+++ b/module-CN-pre-metrics/src/main/java/com/zdjizhi/pre/dns/process/FirstAggregation.java
@@ -4,9 +4,9 @@ import com.alibaba.fastjson2.JSON;
import com.alibaba.fastjson2.TypeReference;
import com.geedgenetworks.utils.FormatUtils;
import com.zdjizhi.base.common.CnRecordLog;
-import com.zdjizhi.pre.common.MetricKeyConfig;
+import com.zdjizhi.pre.base.common.MetricKeyConfig;
+import com.zdjizhi.pre.base.operator.AbstractFirstAggregation;
import com.zdjizhi.pre.dns.common.DnsMetricLog;
-import com.zdjizhi.pre.operator.AbstractFirstAggregation;
import org.apache.commons.lang3.StringUtils;
import org.apache.flink.configuration.Configuration;
import org.slf4j.Logger;
diff --git a/module-CN-pre-metrics/src/main/java/com/zdjizhi/pre/function/MetricSubscriberAppProcessFunc.java b/module-CN-pre-metrics/src/main/java/com/zdjizhi/pre/function/MetricSubscriberAppProcessFunc.java
deleted file mode 100644
index 868345a..0000000
--- a/module-CN-pre-metrics/src/main/java/com/zdjizhi/pre/function/MetricSubscriberAppProcessFunc.java
+++ /dev/null
@@ -1,49 +0,0 @@
-package com.zdjizhi.pre.function;
-
-import com.zdjizhi.pre.common.CnMetricLog;
-import com.zdjizhi.pre.common.MetricResultLog;
-import org.apache.flink.api.java.tuple.Tuple2;
-
-import java.util.Random;
-
-
-public class MetricSubscriberAppProcessFunc extends AbstractMetricProcessFunc<Tuple2<String, String>> {
-
- private Locate[] locates = new Locate[]{Locate.L1, Locate.L2, Locate.L3, Locate.L4, Locate.L5};
-
- @Override
- public void setCommonFields(Tuple2<String, String> key, CnMetricLog cnMetricLog, MetricResultLog metricResultLog) {
- metricResultLog.setSubscriber_id(key.f0);
- metricResultLog.setCommon_app_label(key.f1);
- metricResultLog.setImei(cnMetricLog.getImei());
- metricResultLog.setImsi(cnMetricLog.getImsi());
- metricResultLog.setPhone_number(cnMetricLog.getPhone_number());
- metricResultLog.setApn(cnMetricLog.getApn());
- Locate randomLocate = getRandomLocate();
- metricResultLog.setSubscriber_longitude(randomLocate.longitude);
- metricResultLog.setSubscriber_latitude(randomLocate.latitude);
- }
-
- private Locate getRandomLocate() {
- Random random = new Random();
- int i = random.nextInt(5);
- return locates[i];
- }
-
- private enum Locate {
- L1(116.391721, 39.906094),
- L2(116.386739, 39.906266),
- L3(116.384863, 39.902834),
- L4(116.387807, 39.900607),
- L5(116.39328, 39.898911);
-
-
- private Double longitude;
- private Double latitude;
-
- Locate(Double longitude, Double latitude) {
- this.longitude = longitude;
- this.latitude = latitude;
- }
- }
-}
diff --git a/module-CN-pre-metrics/src/main/java/com/zdjizhi/pre/location/LocationMetric.java b/module-CN-pre-metrics/src/main/java/com/zdjizhi/pre/location/LocationMetric.java
new file mode 100644
index 0000000..28a462b
--- /dev/null
+++ b/module-CN-pre-metrics/src/main/java/com/zdjizhi/pre/location/LocationMetric.java
@@ -0,0 +1,48 @@
+package com.zdjizhi.pre.location;
+
+import com.zdjizhi.base.common.CnRecordLog;
+import com.zdjizhi.base.config.Configs;
+import com.zdjizhi.base.platform.Schedule;
+import com.zdjizhi.base.utils.KafkaUtils;
+import com.zdjizhi.etl.CnRecordEtl;
+import com.zdjizhi.pre.location.common.CommonConfig;
+import com.zdjizhi.pre.location.common.LocationSubscriber;
+import com.zdjizhi.pre.location.function.MetricSubscriberProcessWindowFunc;
+import com.zdjizhi.pre.location.operator.FirstAggregation;
+import com.zdjizhi.pre.location.operator.SecondAggregationReduce;
+import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
+import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
+import org.apache.flink.streaming.api.windowing.time.Time;
+
+/**
+ * @author gujinkai
+ * @version 1.0
+ * @date 2024/2/27 16:35
+ */
+public class LocationMetric implements Schedule {
+
+ private int windowsTime = 1;
+
+ private int outputParallelism = 1;
+
+ public LocationMetric() {
+ this.windowsTime = Configs.get(CommonConfig.LOCATION_METRICS_WINDOW_TIME);
+ this.outputParallelism = Configs.get(CommonConfig.LOCATION_METRIC_OUTPUT_PARALLELISM);
+ }
+
+ @Override
+ public void schedule() throws Exception {
+ //todo 先临时使用session数据源,后续替换为专门的数据源
+ SingleOutputStreamOperator<CnRecordLog> source = CnRecordEtl.singleOutputStreamOperator;
+ SingleOutputStreamOperator<LocationSubscriber> process = source.process(new FirstAggregation()).name("locationFirstAggProcess");
+
+ SingleOutputStreamOperator<String> locationSubscriberMetric = process.keyBy(LocationSubscriber::getSubscriber_id)
+ .window(TumblingEventTimeWindows.of(Time.minutes(windowsTime)))
+ .reduce(new SecondAggregationReduce(), new MetricSubscriberProcessWindowFunc());
+
+ locationSubscriberMetric
+ //.addSink(ClickHouseTableFactory.getSinkFunction(Configs.get(CommonConfig.LOCATION_SUBSCRIBER_TABLE)))
+ .addSink(KafkaUtils.getKafkaSink(Configs.get(CommonConfig.LOCATION_SUBSCRIBER_TOPIC)))
+ .setParallelism(outputParallelism);
+ }
+}
diff --git a/module-CN-pre-metrics/src/main/java/com/zdjizhi/pre/location/common/CommonConfig.java b/module-CN-pre-metrics/src/main/java/com/zdjizhi/pre/location/common/CommonConfig.java
new file mode 100644
index 0000000..5c52bfb
--- /dev/null
+++ b/module-CN-pre-metrics/src/main/java/com/zdjizhi/pre/location/common/CommonConfig.java
@@ -0,0 +1,39 @@
+package com.zdjizhi.pre.location.common;
+
+import org.apache.flink.configuration.ConfigOption;
+import org.apache.flink.configuration.ConfigOptions;
+
+/**
+ * @author wlh
+ * @date 2021/1/6
+ */
+public class CommonConfig {
+
+ public static final ConfigOption<Integer> LOCATION_METRICS_WINDOW_TIME = ConfigOptions.key("location.metrics.window.time")
+ .intType()
+ .defaultValue(1);
+
+ public static final ConfigOption<Integer> LOCATION_METRIC_OUTPUT_PARALLELISM = ConfigOptions.key("location.metric.output.parallelism")
+ .intType()
+ .defaultValue(1);
+
+ public static final ConfigOption<String> LOCATION_SUBSCRIBER_TOPIC = ConfigOptions.key("location.subscriber.topic")
+ .stringType()
+ .defaultValue("LOCATION-SUBSCRIBER");
+
+ public static final ConfigOption<String> LOCATION_SUBSCRIBER_TABLE = ConfigOptions.key("location.subscriber.table")
+ .stringType()
+ .defaultValue("location_subscriber_local");
+
+ public static final ConfigOption<Integer> H3_FIRST_RES = ConfigOptions.key("h3.first.res")
+ .intType()
+ .defaultValue(9);
+
+ public static final ConfigOption<Integer> H3_SECOND_RES = ConfigOptions.key("h3.second.res")
+ .intType()
+ .defaultValue(8);
+
+ public static final ConfigOption<Integer> H3_THIRD_RES = ConfigOptions.key("h3.third.res")
+ .intType()
+ .defaultValue(7);
+}
diff --git a/module-CN-pre-metrics/src/main/java/com/zdjizhi/pre/location/common/LocationMetricResult.java b/module-CN-pre-metrics/src/main/java/com/zdjizhi/pre/location/common/LocationMetricResult.java
new file mode 100644
index 0000000..1c493fd
--- /dev/null
+++ b/module-CN-pre-metrics/src/main/java/com/zdjizhi/pre/location/common/LocationMetricResult.java
@@ -0,0 +1,23 @@
+package com.zdjizhi.pre.location.common;
+
+import lombok.Data;
+
+/**
+ * @author gujinkai
+ * @version 1.0
+ * @date 2024/2/27 16:56
+ */
+@Data
+public class LocationMetricResult {
+ private String subscriber_id;
+ private String imei;
+ private String imsi;
+ private String phone_number;
+ private String apn;
+ private Double subscriber_longitude;
+ private Double subscriber_latitude;
+ private String first_location;
+ private String second_location;
+ private String third_location;
+ private Long stat_time;
+}
diff --git a/module-CN-pre-metrics/src/main/java/com/zdjizhi/pre/location/common/LocationSubscriber.java b/module-CN-pre-metrics/src/main/java/com/zdjizhi/pre/location/common/LocationSubscriber.java
new file mode 100644
index 0000000..e0bf0c7
--- /dev/null
+++ b/module-CN-pre-metrics/src/main/java/com/zdjizhi/pre/location/common/LocationSubscriber.java
@@ -0,0 +1,20 @@
+package com.zdjizhi.pre.location.common;
+
+import lombok.Data;
+
+/**
+ * @author gujinkai
+ * @version 1.0
+ * @date 2024/2/27 16:25
+ */
+@Data
+public class LocationSubscriber {
+ private String subscriber_id;
+ private String imei;
+ private String imsi;
+ private String phone_number;
+ private String apn;
+ private Double subscriber_longitude;
+ private Double subscriber_latitude;
+ private Long recv_time;
+}
diff --git a/module-CN-pre-metrics/src/main/java/com/zdjizhi/pre/location/function/MetricSubscriberProcessWindowFunc.java b/module-CN-pre-metrics/src/main/java/com/zdjizhi/pre/location/function/MetricSubscriberProcessWindowFunc.java
new file mode 100644
index 0000000..8b09dd1
--- /dev/null
+++ b/module-CN-pre-metrics/src/main/java/com/zdjizhi/pre/location/function/MetricSubscriberProcessWindowFunc.java
@@ -0,0 +1,57 @@
+package com.zdjizhi.pre.location.function;
+
+import com.alibaba.fastjson2.JSON;
+import com.uber.h3core.H3Core;
+import com.zdjizhi.pre.location.common.CommonConfig;
+import com.zdjizhi.pre.location.common.LocationMetricResult;
+import com.zdjizhi.pre.location.common.LocationSubscriber;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
+import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
+import org.apache.flink.util.Collector;
+
+/**
+ * @author gujinkai
+ * @version 1.0
+ * @date 2024/2/27 17:02
+ */
+public class MetricSubscriberProcessWindowFunc extends ProcessWindowFunction<LocationSubscriber, String, String, TimeWindow> {
+
+ private H3Core h3;
+
+ protected int firstRes = 9;
+
+ protected int secondRes = 8;
+
+ protected int thirdRes = 7;
+
+ @Override
+ public void open(Configuration parameters) throws Exception {
+ super.open(parameters);
+ h3 = H3Core.newInstance();
+ final Configuration configuration = (Configuration) getRuntimeContext()
+ .getExecutionConfig().getGlobalJobParameters();
+ firstRes = configuration.get(CommonConfig.H3_FIRST_RES);
+ secondRes = configuration.get(CommonConfig.H3_SECOND_RES);
+ thirdRes = configuration.get(CommonConfig.H3_THIRD_RES);
+ }
+
+ @Override
+ public void process(String s, ProcessWindowFunction<LocationSubscriber, String, String, TimeWindow>.Context context, Iterable<LocationSubscriber> elements, Collector<String> out) throws Exception {
+ LocationSubscriber next = elements.iterator().next();
+ LocationMetricResult locationMetricResult = new LocationMetricResult();
+ locationMetricResult.setSubscriber_id(next.getSubscriber_id());
+ locationMetricResult.setImei(next.getImei());
+ locationMetricResult.setImsi(next.getImsi());
+ locationMetricResult.setPhone_number(next.getPhone_number());
+ locationMetricResult.setApn(next.getApn());
+ locationMetricResult.setSubscriber_longitude(next.getSubscriber_longitude());
+ locationMetricResult.setSubscriber_latitude(next.getSubscriber_latitude());
+ locationMetricResult.setFirst_location(h3.latLngToCellAddress(locationMetricResult.getSubscriber_latitude(), locationMetricResult.getSubscriber_longitude(), firstRes));
+ locationMetricResult.setSecond_location(h3.latLngToCellAddress(locationMetricResult.getSubscriber_latitude(), locationMetricResult.getSubscriber_longitude(), secondRes));
+ locationMetricResult.setThird_location(h3.latLngToCellAddress(locationMetricResult.getSubscriber_latitude(), locationMetricResult.getSubscriber_longitude(), thirdRes));
+ locationMetricResult.setStat_time(context.window().getStart() / 1000);
+ String metricResultJsonStr = JSON.toJSONString(locationMetricResult);
+ out.collect(metricResultJsonStr);
+ }
+}
diff --git a/module-CN-pre-metrics/src/main/java/com/zdjizhi/pre/location/operator/FirstAggregation.java b/module-CN-pre-metrics/src/main/java/com/zdjizhi/pre/location/operator/FirstAggregation.java
new file mode 100644
index 0000000..b095404
--- /dev/null
+++ b/module-CN-pre-metrics/src/main/java/com/zdjizhi/pre/location/operator/FirstAggregation.java
@@ -0,0 +1,64 @@
+package com.zdjizhi.pre.location.operator;
+
+import com.zdjizhi.base.common.CnRecordLog;
+import com.zdjizhi.pre.base.operator.AbstractFirstAggregation;
+import com.zdjizhi.pre.location.common.LocationSubscriber;
+import com.zdjizhi.pre.location.utils.SubscriberUtils;
+import org.apache.commons.lang3.StringUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * @author gujinkai
+ * @version 1.0
+ * @date 2024/2/27 16:40
+ */
+public class FirstAggregation extends AbstractFirstAggregation<LocationSubscriber> {
+
+ private final Logger logger = LoggerFactory.getLogger(FirstAggregation.class);
+
+ private final String locationSubscriberKey = "locationSubscriberMetric";
+
+ @Override
+ public Map<String, Map<String, LocationSubscriber>> createAccumulator() {
+ Map<String, Map<String, LocationSubscriber>> accumulator = new HashMap<>();
+ accumulator.put(locationSubscriberKey, new HashMap<>());
+ return accumulator;
+ }
+
+ @Override
+ public void add(CnRecordLog value, Map<String, Map<String, LocationSubscriber>> accumulator) {
+ try {
+ String subscriberId = value.getSubscriber_id();
+ if (StringUtils.isNotBlank(subscriberId)) {
+ Map<String, LocationSubscriber> locationSubscriberMap = accumulator.get(locationSubscriberKey);
+ LocationSubscriber locationSubscriber = locationSubscriberMap.get(subscriberId);
+ if (locationSubscriber != null) {
+ updateLocationSubscriber(value, locationSubscriber);
+ } else {
+ locationSubscriber = new LocationSubscriber();
+ locationSubscriber.setSubscriber_id(subscriberId);
+ updateLocationSubscriber(value, locationSubscriber);
+ }
+ locationSubscriberMap.put(subscriberId, locationSubscriber);
+ }
+ } catch (Exception e) {
+ logger.error("FirstAggregation add error", e);
+ }
+ }
+
+ private void updateLocationSubscriber(CnRecordLog value, LocationSubscriber locationSubscriber) {
+ locationSubscriber.setImei(value.getImei());
+ locationSubscriber.setImsi(value.getImsi());
+ locationSubscriber.setPhone_number(value.getPhone_number());
+ locationSubscriber.setApn(value.getApn());
+ //todo 目前数据源字段不确定,先用随机经纬度代替
+ SubscriberUtils.Locate randomLocate = SubscriberUtils.getRandomLocate();
+ locationSubscriber.setSubscriber_longitude(randomLocate.longitude);
+ locationSubscriber.setSubscriber_latitude(randomLocate.latitude);
+ locationSubscriber.setRecv_time(value.getCommon_recv_time());
+ }
+}
diff --git a/module-CN-pre-metrics/src/main/java/com/zdjizhi/pre/location/operator/SecondAggregationReduce.java b/module-CN-pre-metrics/src/main/java/com/zdjizhi/pre/location/operator/SecondAggregationReduce.java
new file mode 100644
index 0000000..8d9e4a7
--- /dev/null
+++ b/module-CN-pre-metrics/src/main/java/com/zdjizhi/pre/location/operator/SecondAggregationReduce.java
@@ -0,0 +1,20 @@
+package com.zdjizhi.pre.location.operator;
+
+import com.zdjizhi.pre.location.common.LocationSubscriber;
+import org.apache.flink.api.common.functions.ReduceFunction;
+
+/**
+ * @author gujinkai
+ * @version 1.0
+ * @date 2024/2/27 16:54
+ */
+public class SecondAggregationReduce implements ReduceFunction<LocationSubscriber> {
+
+ @Override
+ public LocationSubscriber reduce(LocationSubscriber value1, LocationSubscriber value2) throws Exception {
+ if (value1.getRecv_time() > value2.getRecv_time()) {
+ return value1;
+ }
+ return value2;
+ }
+}
diff --git a/module-CN-pre-metrics/src/main/java/com/zdjizhi/pre/location/utils/SubscriberUtils.java b/module-CN-pre-metrics/src/main/java/com/zdjizhi/pre/location/utils/SubscriberUtils.java
new file mode 100644
index 0000000..fdd4192
--- /dev/null
+++ b/module-CN-pre-metrics/src/main/java/com/zdjizhi/pre/location/utils/SubscriberUtils.java
@@ -0,0 +1,56 @@
+package com.zdjizhi.pre.location.utils;
+
+import java.util.Random;
+
+/**
+ * @author gujinkai
+ * @version 1.0
+ * @date 2024/2/27 16:29
+ */
+public class SubscriberUtils {
+
+ private static Locate[] locates = new Locate[]{Locate.L1, Locate.L2, Locate.L3, Locate.L4, Locate.L5, Locate.L6, Locate.L7, Locate.L8, Locate.L9, Locate.L10, Locate.L11, Locate.L12, Locate.L13, Locate.L14, Locate.L15, Locate.L16, Locate.L17, Locate.L18, Locate.L19, Locate.L20, Locate.L21, Locate.L22, Locate.L23, Locate.L24, Locate.L25};
+
+ public static Locate getRandomLocate() {
+ Random random = new Random();
+ int i = random.nextInt(25);
+ return locates[i];
+ }
+
+ public enum Locate {
+ L1(116.391721, 39.906094),
+ L2(116.386739, 39.906266),
+ L3(116.384863, 39.902834),
+ L4(116.387807, 39.900607),
+ L5(116.39328, 39.898911),
+ L6(116.381896, 39.906917),
+ L7(116.376717, 39.906147),
+ L8(116.376113, 39.909896),
+ L9(116.372452, 39.913832),
+ L10(116.375427, 39.920883),
+ L11(116.382980, 39.925003),
+ L12(116.38813, 39.926376),
+ L13(116.400375, 39.926834),
+ L14(116.399913, 39.932007),
+ L15(116.393221, 39.934137),
+ L16(116.38916, 39.935303),
+ L17(116.385799, 39.935684),
+ L18(116.384354, 39.935303),
+ L19(116.38298, 39.934273),
+ L20(116.37999, 39.93446),
+ L21(116.378914, 39.93676),
+ L22(116.37787, 39.942691),
+ L23(116.374113, 39.943599),
+ L24(116.370041, 39.944283),
+ L25(116.367037, 39.947116);
+
+
+ public Double longitude;
+ public Double latitude;
+
+ Locate(Double longitude, Double latitude) {
+ this.longitude = longitude;
+ this.latitude = latitude;
+ }
+ }
+}
diff --git a/module-CN-pre-metrics/src/main/java/com/zdjizhi/pre/relation/CnRelationMetric.java b/module-CN-pre-metrics/src/main/java/com/zdjizhi/pre/relation/CnRelationMetric.java
index 18da277..a05c575 100644
--- a/module-CN-pre-metrics/src/main/java/com/zdjizhi/pre/relation/CnRelationMetric.java
+++ b/module-CN-pre-metrics/src/main/java/com/zdjizhi/pre/relation/CnRelationMetric.java
@@ -6,7 +6,7 @@ import com.zdjizhi.base.config.Configs;
import com.zdjizhi.base.platform.Schedule;
import com.zdjizhi.base.utils.KafkaUtils;
import com.zdjizhi.etl.CnRecordEtl;
-import com.zdjizhi.pre.common.CommonConfig;
+import com.zdjizhi.pre.base.common.CommonConfig;
import com.zdjizhi.pre.relation.common.RelationMetricLog;
import com.zdjizhi.pre.relation.function.FirstAggregation;
import org.apache.flink.api.common.functions.ReduceFunction;
diff --git a/module-CN-pre-metrics/src/main/java/com/zdjizhi/pre/relation/function/FirstAggregation.java b/module-CN-pre-metrics/src/main/java/com/zdjizhi/pre/relation/function/FirstAggregation.java
index c2f415a..114a579 100644
--- a/module-CN-pre-metrics/src/main/java/com/zdjizhi/pre/relation/function/FirstAggregation.java
+++ b/module-CN-pre-metrics/src/main/java/com/zdjizhi/pre/relation/function/FirstAggregation.java
@@ -1,8 +1,7 @@
package com.zdjizhi.pre.relation.function;
import com.zdjizhi.base.common.CnRecordLog;
-import com.zdjizhi.pre.common.CnMetricLog;
-import com.zdjizhi.pre.operator.AbstractFirstAggregation;
+import com.zdjizhi.pre.base.operator.AbstractFirstAggregation;
import com.zdjizhi.pre.relation.common.RelationMetricLog;
import java.util.ArrayList;
diff --git a/module-CN-pre-metrics/src/test/java/com.zdjizhi.pre/DNSMetricTest.java b/module-CN-pre-metrics/src/test/java/com.zdjizhi.pre/DNSMetricTest.java
index f8a6a5b..cff1b2b 100644
--- a/module-CN-pre-metrics/src/test/java/com.zdjizhi.pre/DNSMetricTest.java
+++ b/module-CN-pre-metrics/src/test/java/com.zdjizhi.pre/DNSMetricTest.java
@@ -5,15 +5,12 @@ import com.alibaba.fastjson2.JSON;
import com.alibaba.fastjson2.JSONArray;
import com.zdjizhi.base.common.CnRecordLog;
import com.zdjizhi.base.config.Configs;
-import com.zdjizhi.base.utils.FlinkEnvironmentUtils;
-import com.zdjizhi.pre.common.CnMetricLog;
-import com.zdjizhi.pre.common.CommonConfig;
-import com.zdjizhi.pre.common.MetricKeyConfig;
-
-
-import com.zdjizhi.pre.function.MetricIpProcessWindowFunc;
-import com.zdjizhi.pre.operator.FirstAggregation;
-import com.zdjizhi.pre.operator.SecondAggregationReduce;
+import com.zdjizhi.pre.base.common.CnMetricLog;
+import com.zdjizhi.pre.base.common.CommonConfig;
+import com.zdjizhi.pre.base.common.MetricKeyConfig;
+import com.zdjizhi.pre.base.function.MetricIpProcessWindowFunc;
+import com.zdjizhi.pre.base.operator.FirstAggregation;
+import com.zdjizhi.pre.base.operator.SecondAggregationReduce;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
@@ -30,8 +27,6 @@ import org.junit.Test;
import java.io.File;
import java.time.Duration;
-import static org.junit.Assert.assertEquals;
-
public class DNSMetricTest {
@ClassRule
diff --git a/module-CN-pre-metrics/src/test/java/com.zdjizhi.pre/PreMetricTest.java b/module-CN-pre-metrics/src/test/java/com.zdjizhi.pre/PreMetricTest.java
index 2c69b6c..15d7a67 100644
--- a/module-CN-pre-metrics/src/test/java/com.zdjizhi.pre/PreMetricTest.java
+++ b/module-CN-pre-metrics/src/test/java/com.zdjizhi.pre/PreMetricTest.java
@@ -5,16 +5,13 @@ import com.alibaba.fastjson2.JSON;
import com.alibaba.fastjson2.JSONArray;
import com.zdjizhi.base.common.CnRecordLog;
import com.zdjizhi.base.config.Configs;
-import com.zdjizhi.base.utils.FlinkEnvironmentUtils;
-import com.zdjizhi.pre.common.CnMetricLog;
-import com.zdjizhi.pre.common.CommonConfig;
-import com.zdjizhi.pre.common.MetricKeyConfig;
-
-
-import com.zdjizhi.pre.common.MetricResultLog;
-import com.zdjizhi.pre.function.MetricIpProcessWindowFunc;
-import com.zdjizhi.pre.operator.FirstAggregation;
-import com.zdjizhi.pre.operator.SecondAggregationReduce;
+import com.zdjizhi.pre.base.common.CnMetricLog;
+import com.zdjizhi.pre.base.common.CommonConfig;
+import com.zdjizhi.pre.base.common.MetricKeyConfig;
+import com.zdjizhi.pre.base.common.MetricResultLog;
+import com.zdjizhi.pre.base.function.MetricIpProcessWindowFunc;
+import com.zdjizhi.pre.base.operator.FirstAggregation;
+import com.zdjizhi.pre.base.operator.SecondAggregationReduce;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
diff --git a/platform-schedule/src/main/resources/business.properties b/platform-schedule/src/main/resources/business.properties
index c6c8197..fca0668 100644
--- a/platform-schedule/src/main/resources/business.properties
+++ b/platform-schedule/src/main/resources/business.properties
@@ -1,7 +1,9 @@
# session-record-cn??
-#cn.record.etl.class=com.zdjizhi.etl.CnRecordPersistence
+cn.record.etl.class=com.zdjizhi.etl.CnRecordPersistence
# ???
-cn.pre.metric.class=com.zdjizhi.pre.CnPreMetric
+cn.pre.metric.class=com.zdjizhi.pre.base.CnPreMetric
+# location
+cn.location.metric.class=com.zdjizhi.pre.location.LocationMetric
# ?????????
cn.pre.relation.metric.class=com.zdjizhi.pre.relation.CnRelationMetric
# dns???
diff --git a/platform-schedule/src/main/resources/common.properties b/platform-schedule/src/main/resources/common.properties
index fc5577f..cca2a64 100644
--- a/platform-schedule/src/main/resources/common.properties
+++ b/platform-schedule/src/main/resources/common.properties
@@ -1,19 +1,21 @@
# ????
-stream.execution.job.name=ETL-METRIC
+stream.execution.job.name=CN-STREAM-PROCESSING-PLATFORM
# ?????
-stream.execution.environment.parallelism=1
+stream.execution.environment.parallelism=2
# kafka source???
-session.record.completed.parallelism=1
+session.record.completed.parallelism=2
# session-record-cn sink???
-cn.record.parallelism=1
+cn.record.parallelism=2
# ???sink???
-metric.output.parallelism=1
+metric.output.parallelism=2
# dns???sink???
-dns.metric.output.parallelism=1
+dns.metric.output.parallelism=2
# ????sink???
-metric.entity.relation.output.parallelism=1
+metric.entity.relation.output.parallelism=2
# ????sink???
-metric.dynamic.attribute.output.parallelism=1
+metric.dynamic.attribute.output.parallelism=2
+# location sink???
+location.metric.output.parallelism=2
# kafka???
kafka.input.bootstrap.servers=192.168.44.55:9092
session.record.completed.topic=SESSION-RECORD
@@ -32,10 +34,9 @@ cn.record.topic=SESSION-RECORD-CN
output.sasl.jaas.config.flag=0
# flink checkpoint?? 0:? 1:?
flink.enable.checkpoint.flag=1
-# nacos??
-nacos.server.addr=192.168.44.55:8848
# api detection url
rule.full.url=http://192.168.44.54:8090/v1/rule/detection
rule.inc.url=http://192.168.44.54:8090/v1/rule/detection/increase
+# ??host
gateway.host=192.168.44.55
-etl.enable=false \ No newline at end of file
+#etl.enable=false \ No newline at end of file