summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorgujinkai <[email protected]>2024-04-07 13:50:32 +0800
committergujinkai <[email protected]>2024-04-07 13:50:32 +0800
commit0c0cc27ade94850b21568e70fb455b00c20d7c1d (patch)
tree46cc639c49a4de38024a8bd14cfa62d2ae07fcf2
parentfa70521afab4fcaf125eeeb895a33c3c23b75014 (diff)
feature: add tag agg metric
-rw-r--r--module-CN-pre-metrics/pom.xml6
-rw-r--r--module-CN-pre-metrics/src/main/java/com/zdjizhi/pre/base/CnPreMetric.java9
-rw-r--r--module-CN-pre-metrics/src/main/java/com/zdjizhi/pre/base/common/CnMetricLog.java50
-rw-r--r--module-CN-pre-metrics/src/main/java/com/zdjizhi/pre/base/common/CommonConfig.java4
-rw-r--r--module-CN-pre-metrics/src/main/java/com/zdjizhi/pre/base/common/MetricKeyConfig.java1
-rw-r--r--module-CN-pre-metrics/src/main/java/com/zdjizhi/pre/base/common/MetricResultLog.java30
-rw-r--r--module-CN-pre-metrics/src/main/java/com/zdjizhi/pre/base/function/MetricTagProcessFunc.java71
-rw-r--r--module-CN-pre-metrics/src/main/java/com/zdjizhi/pre/base/operator/FirstAggregation.java48
-rw-r--r--module-CN-pre-metrics/src/main/java/com/zdjizhi/pre/base/operator/TagSecondAggregationReduce.java55
9 files changed, 274 insertions, 0 deletions
diff --git a/module-CN-pre-metrics/pom.xml b/module-CN-pre-metrics/pom.xml
index 45b93b9..c6cbe32 100644
--- a/module-CN-pre-metrics/pom.xml
+++ b/module-CN-pre-metrics/pom.xml
@@ -23,6 +23,12 @@
<artifactId>h3</artifactId>
<version>4.1.1</version>
</dependency>
+
+ <dependency>
+ <groupId>org.apache.datasketches</groupId>
+ <artifactId>datasketches-java</artifactId>
+ <version>5.0.1</version>
+ </dependency>
</dependencies>
</project> \ No newline at end of file
diff --git a/module-CN-pre-metrics/src/main/java/com/zdjizhi/pre/base/CnPreMetric.java b/module-CN-pre-metrics/src/main/java/com/zdjizhi/pre/base/CnPreMetric.java
index 325591c..581cccc 100644
--- a/module-CN-pre-metrics/src/main/java/com/zdjizhi/pre/base/CnPreMetric.java
+++ b/module-CN-pre-metrics/src/main/java/com/zdjizhi/pre/base/CnPreMetric.java
@@ -11,6 +11,7 @@ import com.zdjizhi.pre.base.common.MetricKeyConfig;
import com.zdjizhi.pre.base.function.*;
import com.zdjizhi.pre.base.operator.FirstAggregation;
import com.zdjizhi.pre.base.operator.SecondAggregationReduce;
+import com.zdjizhi.pre.base.operator.TagSecondAggregationReduce;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple12;
import org.apache.flink.api.java.tuple.Tuple2;
@@ -186,6 +187,14 @@ public class CnPreMetric implements Schedule {
.addSink(ClickHouseTableFactory.getSinkFunction(Configs.get(CommonConfig.METRIC_SUBSCRIBER_APP_TABLE)))
//.addSink(KafkaUtils.getKafkaSink(Configs.get(CommonConfig.METRIC_SUBSCRIBER_APP_TOPIC)))
.setParallelism(outputParallelism);
+
+ SingleOutputStreamOperator<String> tagMetric = process.filter((log) -> log.getMetricKey() == MetricKeyConfig.TAG)
+ .keyBy(CnMetricLog::getTag)
+ .window(TumblingEventTimeWindows.of(Time.minutes(windowsTime)))
+ .reduce(new TagSecondAggregationReduce(), new MetricTagProcessFunc());
+ tagMetric
+ .addSink(ClickHouseTableFactory.getSinkFunction(Configs.get(CommonConfig.METRIC_TAG_TABLE)))
+ .setParallelism(outputParallelism);
}
}
diff --git a/module-CN-pre-metrics/src/main/java/com/zdjizhi/pre/base/common/CnMetricLog.java b/module-CN-pre-metrics/src/main/java/com/zdjizhi/pre/base/common/CnMetricLog.java
index e89087c..31be6b9 100644
--- a/module-CN-pre-metrics/src/main/java/com/zdjizhi/pre/base/common/CnMetricLog.java
+++ b/module-CN-pre-metrics/src/main/java/com/zdjizhi/pre/base/common/CnMetricLog.java
@@ -1,5 +1,8 @@
package com.zdjizhi.pre.base.common;
+import org.apache.datasketches.theta.Union;
+import org.apache.datasketches.theta.UpdateSketch;
+
public class CnMetricLog {
private int metricKey;
@@ -37,6 +40,12 @@ public class CnMetricLog {
private String app_category;
private String app_subcategory;
private String app_company;
+
+ private String tag;
+ private UpdateSketch tag_server_ip_sketch;
+ private UpdateSketch tag_domain_sketch;
+ private Union tag_server_ip_union;
+ private Union tag_domain_union;
private long common_c2s_pkt_num;
private long common_c2s_byte_num;
private long common_s2c_pkt_num;
@@ -354,6 +363,47 @@ public class CnMetricLog {
public void setApp_company(String app_company) {
this.app_company = app_company;
}
+
+ public String getTag() {
+ return tag;
+ }
+
+ public void setTag(String tag) {
+ this.tag = tag;
+ }
+
+ public UpdateSketch getTag_server_ip_sketch() {
+ return tag_server_ip_sketch;
+ }
+
+ public void setTag_server_ip_sketch(UpdateSketch tag_server_ip_sketch) {
+ this.tag_server_ip_sketch = tag_server_ip_sketch;
+ }
+
+ public UpdateSketch getTag_domain_sketch() {
+ return tag_domain_sketch;
+ }
+
+ public void setTag_domain_sketch(UpdateSketch tag_domain_sketch) {
+ this.tag_domain_sketch = tag_domain_sketch;
+ }
+
+ public Union getTag_server_ip_union() {
+ return tag_server_ip_union;
+ }
+
+ public void setTag_server_ip_union(Union tag_server_ip_union) {
+ this.tag_server_ip_union = tag_server_ip_union;
+ }
+
+ public Union getTag_domain_union() {
+ return tag_domain_union;
+ }
+
+ public void setTag_domain_union(Union tag_domain_union) {
+ this.tag_domain_union = tag_domain_union;
+ }
+
public long getCommon_c2s_pkt_num() {
return common_c2s_pkt_num;
}
diff --git a/module-CN-pre-metrics/src/main/java/com/zdjizhi/pre/base/common/CommonConfig.java b/module-CN-pre-metrics/src/main/java/com/zdjizhi/pre/base/common/CommonConfig.java
index 2f0b2b3..0208992 100644
--- a/module-CN-pre-metrics/src/main/java/com/zdjizhi/pre/base/common/CommonConfig.java
+++ b/module-CN-pre-metrics/src/main/java/com/zdjizhi/pre/base/common/CommonConfig.java
@@ -93,6 +93,10 @@ public class CommonConfig {
.stringType()
.defaultValue("metric_link_local");
+ public static final ConfigOption<String> METRIC_TAG_TABLE = ConfigOptions.key("metric.tag.table")
+ .stringType()
+ .defaultValue("metric_tag_local");
+
public static final ConfigOption<String> METRIC_SUBSCRIBER_APP_TABLE = ConfigOptions.key("metric.subscriber_app.table")
.stringType()
.defaultValue("metric_subscriber_app_local");
diff --git a/module-CN-pre-metrics/src/main/java/com/zdjizhi/pre/base/common/MetricKeyConfig.java b/module-CN-pre-metrics/src/main/java/com/zdjizhi/pre/base/common/MetricKeyConfig.java
index 0932d65..c47801f 100644
--- a/module-CN-pre-metrics/src/main/java/com/zdjizhi/pre/base/common/MetricKeyConfig.java
+++ b/module-CN-pre-metrics/src/main/java/com/zdjizhi/pre/base/common/MetricKeyConfig.java
@@ -46,6 +46,7 @@ public class MetricKeyConfig {
public static final int DNS_RRCNAME = 21;
public static final int SUBSCRIBER_APP = 22;
+ public static final int TAG = 23;
}
diff --git a/module-CN-pre-metrics/src/main/java/com/zdjizhi/pre/base/common/MetricResultLog.java b/module-CN-pre-metrics/src/main/java/com/zdjizhi/pre/base/common/MetricResultLog.java
index a0ac350..4b8aa02 100644
--- a/module-CN-pre-metrics/src/main/java/com/zdjizhi/pre/base/common/MetricResultLog.java
+++ b/module-CN-pre-metrics/src/main/java/com/zdjizhi/pre/base/common/MetricResultLog.java
@@ -39,6 +39,12 @@ public class MetricResultLog {
@JSONField(name = "server_port")
private Integer common_server_port;
+ private String tag;
+
+ private String ip_sketch;
+
+ private String domain_sketch;
+
private Long stat_time;
@JSONField(name = "send_pkts")
@@ -319,6 +325,30 @@ public class MetricResultLog {
this.common_server_port = common_server_port;
}
+ public String getTag() {
+ return tag;
+ }
+
+ public void setTag(String tag) {
+ this.tag = tag;
+ }
+
+ public String getIp_sketch() {
+ return ip_sketch;
+ }
+
+ public void setIp_sketch(String ip_sketch) {
+ this.ip_sketch = ip_sketch;
+ }
+
+ public String getDomain_sketch() {
+ return domain_sketch;
+ }
+
+ public void setDomain_sketch(String domain_sketch) {
+ this.domain_sketch = domain_sketch;
+ }
+
public Long getStat_time() {
return stat_time;
}
diff --git a/module-CN-pre-metrics/src/main/java/com/zdjizhi/pre/base/function/MetricTagProcessFunc.java b/module-CN-pre-metrics/src/main/java/com/zdjizhi/pre/base/function/MetricTagProcessFunc.java
new file mode 100644
index 0000000..652deb5
--- /dev/null
+++ b/module-CN-pre-metrics/src/main/java/com/zdjizhi/pre/base/function/MetricTagProcessFunc.java
@@ -0,0 +1,71 @@
+package com.zdjizhi.pre.base.function;
+
+import com.alibaba.fastjson2.JSON;
+import com.zdjizhi.pre.base.common.CnMetricLog;
+import com.zdjizhi.pre.base.common.MetricResultLog;
+import org.apache.datasketches.theta.Union;
+import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
+import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
+import org.apache.flink.util.Collector;
+
+import java.util.ArrayList;
+import java.util.Base64;
+import java.util.List;
+
+/**
+ * @author gujinkai
+ * @version 1.0
+ * @date 2024/4/7 11:42
+ */
+public class MetricTagProcessFunc extends ProcessWindowFunction<CnMetricLog, String, String, TimeWindow> {
+ @Override
+ public void process(String s, ProcessWindowFunction<CnMetricLog, String, String, TimeWindow>.Context context, Iterable<CnMetricLog> elements, Collector<String> out) throws Exception {
+ CnMetricLog next = elements.iterator().next();
+ MetricResultLog metricResultLog = new MetricResultLog();
+ metricResultLog.setTag(s);
+ Union tagServerIpUnion = next.getTag_server_ip_union();
+ byte[] ipSketchBytes = tagServerIpUnion.getResult().toByteArray();
+ metricResultLog.setIp_sketch(Base64.getEncoder().encodeToString(addHeader(ipSketchBytes)));
+ Union tagDomainUnion = next.getTag_domain_union();
+ byte[] domainSketchBytes = tagDomainUnion.getResult().toByteArray();
+ metricResultLog.setDomain_sketch(Base64.getEncoder().encodeToString(addHeader(domainSketchBytes)));
+ metricResultLog.setStat_time(context.window().getStart() / 1000);
+ String metricResultJsonStr = JSON.toJSONString(metricResultLog);
+ out.collect(metricResultJsonStr);
+ }
+
+ public static byte[] getHeaderByteArray(long l) {
+ List<Byte> byteList = new ArrayList<>();
+
+ while (l > 0x7F) {
+ byte b = (byte) ((l & 0x7F) | 0x80);
+ byteList.add(b);
+ l >>>= 7;
+ }
+ byteList.add((byte) l);
+ byte[] bytes = new byte[byteList.size()];
+ for (int i = 0; i < byteList.size(); i++) {
+ bytes[i] = byteList.get(i);
+ }
+ return bytes;
+ }
+
+ public static byte[] mergeByteArray(byte[]... bytes) {
+ int length = 0;
+ for (byte[] aByte : bytes) {
+ length += aByte.length;
+ }
+ byte[] result = new byte[length];
+ int index = 0;
+ for (byte[] aByte : bytes) {
+ System.arraycopy(aByte, 0, result, index, aByte.length);
+ index += aByte.length;
+ }
+ return result;
+ }
+
+ public static byte[] addHeader(byte[] bytes) {
+ byte[] header = getHeaderByteArray(bytes.length);
+ return mergeByteArray(header, bytes);
+ }
+}
diff --git a/module-CN-pre-metrics/src/main/java/com/zdjizhi/pre/base/operator/FirstAggregation.java b/module-CN-pre-metrics/src/main/java/com/zdjizhi/pre/base/operator/FirstAggregation.java
index d6ba3ee..bb37b92 100644
--- a/module-CN-pre-metrics/src/main/java/com/zdjizhi/pre/base/operator/FirstAggregation.java
+++ b/module-CN-pre-metrics/src/main/java/com/zdjizhi/pre/base/operator/FirstAggregation.java
@@ -4,10 +4,12 @@ import com.zdjizhi.base.common.CnRecordLog;
import com.zdjizhi.pre.base.common.CnMetricLog;
import com.zdjizhi.pre.base.common.MetricKeyConfig;
import org.apache.commons.lang3.StringUtils;
+import org.apache.datasketches.theta.UpdateSketch;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.HashMap;
+import java.util.List;
import java.util.Map;
public class FirstAggregation extends AbstractFirstAggregation<CnMetricLog> {
@@ -29,6 +31,7 @@ public class FirstAggregation extends AbstractFirstAggregation<CnMetricLog> {
private final String protocolMetricKey = "protocolMetric";
private final String linkMetricKey = "linkMetric";
private final String subscriberAppKey = "subscriberAppMetric";
+ private final String tagKey = "tagMetric";
@Override
public Map<String, Map<String, CnMetricLog>> createAccumulator() {
@@ -48,6 +51,7 @@ public class FirstAggregation extends AbstractFirstAggregation<CnMetricLog> {
accumulator.put(protocolMetricKey, new HashMap<>());
accumulator.put(linkMetricKey, new HashMap<>());
accumulator.put(subscriberAppKey, new HashMap<>());
+ accumulator.put(tagKey, new HashMap<>());
return accumulator;
}
@@ -286,6 +290,50 @@ public class FirstAggregation extends AbstractFirstAggregation<CnMetricLog> {
} catch (Exception e) {
logger.error("pre metric first agg add failed: " + e.getMessage(), e);
}
+
+ List<String> serverIpTags = value.getServer_ip_tags();
+ List<String> domainTags = value.getDomain_tags();
+ Map<String, CnMetricLog> tagMetricMap = accumulator.get(tagKey);
+ for (String tag : serverIpTags) {
+ CnMetricLog cnMetricLog = tagMetricMap.get(tag);
+ if (cnMetricLog != null) {
+ if (cnMetricLog.getTag_server_ip_sketch() != null) {
+ UpdateSketch tag_server_ip_sketch = cnMetricLog.getTag_server_ip_sketch();
+ tag_server_ip_sketch.update(value.getCommon_server_ip());
+ } else {
+ UpdateSketch sketch = UpdateSketch.builder().build();
+ sketch.update(value.getCommon_server_ip());
+ cnMetricLog.setTag_server_ip_sketch(sketch);
+ }
+ } else {
+ cnMetricLog = new CnMetricLog(MetricKeyConfig.TAG);
+ cnMetricLog.setTag(tag);
+ UpdateSketch sketch = UpdateSketch.builder().build();
+ sketch.update(value.getCommon_server_ip());
+ cnMetricLog.setTag_server_ip_sketch(sketch);
+ }
+ tagMetricMap.put(tag, cnMetricLog);
+ }
+ for (String tag : domainTags) {
+ CnMetricLog cnMetricLog = tagMetricMap.get(tag);
+ if (cnMetricLog != null) {
+ if (cnMetricLog.getTag_domain_sketch() != null) {
+ UpdateSketch tag_domain_sketch = cnMetricLog.getTag_domain_sketch();
+ tag_domain_sketch.update(value.getDomain());
+ } else {
+ UpdateSketch sketch = UpdateSketch.builder().build();
+ sketch.update(value.getDomain());
+ cnMetricLog.setTag_domain_sketch(sketch);
+ }
+ } else {
+ cnMetricLog = new CnMetricLog(MetricKeyConfig.TAG);
+ cnMetricLog.setTag(tag);
+ UpdateSketch sketch = UpdateSketch.builder().build();
+ sketch.update(value.getDomain());
+ cnMetricLog.setTag_domain_sketch(sketch);
+ }
+ tagMetricMap.put(tag, cnMetricLog);
+ }
}
private void initMetric(CnRecordLog cnRecordLog, CnMetricLog cnMetricLog) {
diff --git a/module-CN-pre-metrics/src/main/java/com/zdjizhi/pre/base/operator/TagSecondAggregationReduce.java b/module-CN-pre-metrics/src/main/java/com/zdjizhi/pre/base/operator/TagSecondAggregationReduce.java
new file mode 100644
index 0000000..8ee5a14
--- /dev/null
+++ b/module-CN-pre-metrics/src/main/java/com/zdjizhi/pre/base/operator/TagSecondAggregationReduce.java
@@ -0,0 +1,55 @@
+package com.zdjizhi.pre.base.operator;
+
+import com.zdjizhi.pre.base.common.CnMetricLog;
+import org.apache.datasketches.theta.SetOperation;
+import org.apache.datasketches.theta.Union;
+import org.apache.flink.api.common.functions.ReduceFunction;
+
+/**
+ * @author gujinkai
+ * @version 1.0
+ * @date 2024/4/7 10:53
+ */
+public class TagSecondAggregationReduce implements ReduceFunction<CnMetricLog> {
+ @Override
+ public CnMetricLog reduce(CnMetricLog value1, CnMetricLog value2) throws Exception {
+ if (value1.getTag_server_ip_union() != null) {
+ Union union = value1.getTag_server_ip_union();
+ if (value2.getTag_server_ip_sketch() != null) {
+ union.union(value2.getTag_server_ip_sketch());
+ } else {
+ union.union(value2.getTag_server_ip_union().getResult());
+ }
+ } else {
+ Union union = SetOperation.builder().buildUnion();
+ union.union(value1.getTag_server_ip_sketch());
+ if (value2.getTag_server_ip_sketch() != null) {
+ union.union(value2.getTag_server_ip_sketch());
+ } else {
+ union.union(value2.getTag_server_ip_union().getResult());
+ }
+ value1.setTag_server_ip_union(union);
+ value1.setTag_server_ip_sketch(null);
+ }
+
+ if (value1.getTag_domain_union() != null) {
+ Union union = value1.getTag_domain_union();
+ if (value2.getTag_domain_sketch() != null) {
+ union.union(value2.getTag_domain_sketch());
+ } else {
+ union.union(value2.getTag_domain_union().getResult());
+ }
+ } else {
+ Union union = SetOperation.builder().buildUnion();
+ union.union(value1.getTag_domain_sketch());
+ if (value2.getTag_domain_sketch() != null) {
+ union.union(value2.getTag_domain_sketch());
+ } else {
+ union.union(value2.getTag_domain_union().getResult());
+ }
+ value1.setTag_domain_union(union);
+ value1.setTag_domain_sketch(null);
+ }
+ return value1;
+ }
+}