diff options
| author | gujinkai <[email protected]> | 2024-04-07 13:50:32 +0800 |
|---|---|---|
| committer | gujinkai <[email protected]> | 2024-04-07 13:50:32 +0800 |
| commit | 0c0cc27ade94850b21568e70fb455b00c20d7c1d (patch) | |
| tree | 46cc639c49a4de38024a8bd14cfa62d2ae07fcf2 | |
| parent | fa70521afab4fcaf125eeeb895a33c3c23b75014 (diff) | |
feature: add tag agg metric
9 files changed, 274 insertions, 0 deletions
diff --git a/module-CN-pre-metrics/pom.xml b/module-CN-pre-metrics/pom.xml index 45b93b9..c6cbe32 100644 --- a/module-CN-pre-metrics/pom.xml +++ b/module-CN-pre-metrics/pom.xml @@ -23,6 +23,12 @@ <artifactId>h3</artifactId> <version>4.1.1</version> </dependency> + + <dependency> + <groupId>org.apache.datasketches</groupId> + <artifactId>datasketches-java</artifactId> + <version>5.0.1</version> + </dependency> </dependencies> </project>
\ No newline at end of file diff --git a/module-CN-pre-metrics/src/main/java/com/zdjizhi/pre/base/CnPreMetric.java b/module-CN-pre-metrics/src/main/java/com/zdjizhi/pre/base/CnPreMetric.java index 325591c..581cccc 100644 --- a/module-CN-pre-metrics/src/main/java/com/zdjizhi/pre/base/CnPreMetric.java +++ b/module-CN-pre-metrics/src/main/java/com/zdjizhi/pre/base/CnPreMetric.java @@ -11,6 +11,7 @@ import com.zdjizhi.pre.base.common.MetricKeyConfig; import com.zdjizhi.pre.base.function.*; import com.zdjizhi.pre.base.operator.FirstAggregation; import com.zdjizhi.pre.base.operator.SecondAggregationReduce; +import com.zdjizhi.pre.base.operator.TagSecondAggregationReduce; import org.apache.flink.api.java.functions.KeySelector; import org.apache.flink.api.java.tuple.Tuple12; import org.apache.flink.api.java.tuple.Tuple2; @@ -186,6 +187,14 @@ public class CnPreMetric implements Schedule { .addSink(ClickHouseTableFactory.getSinkFunction(Configs.get(CommonConfig.METRIC_SUBSCRIBER_APP_TABLE))) //.addSink(KafkaUtils.getKafkaSink(Configs.get(CommonConfig.METRIC_SUBSCRIBER_APP_TOPIC))) .setParallelism(outputParallelism); + + SingleOutputStreamOperator<String> tagMetric = process.filter((log) -> log.getMetricKey() == MetricKeyConfig.TAG) + .keyBy(CnMetricLog::getTag) + .window(TumblingEventTimeWindows.of(Time.minutes(windowsTime))) + .reduce(new TagSecondAggregationReduce(), new MetricTagProcessFunc()); + tagMetric + .addSink(ClickHouseTableFactory.getSinkFunction(Configs.get(CommonConfig.METRIC_TAG_TABLE))) + .setParallelism(outputParallelism); } } diff --git a/module-CN-pre-metrics/src/main/java/com/zdjizhi/pre/base/common/CnMetricLog.java b/module-CN-pre-metrics/src/main/java/com/zdjizhi/pre/base/common/CnMetricLog.java index e89087c..31be6b9 100644 --- a/module-CN-pre-metrics/src/main/java/com/zdjizhi/pre/base/common/CnMetricLog.java +++ b/module-CN-pre-metrics/src/main/java/com/zdjizhi/pre/base/common/CnMetricLog.java @@ -1,5 +1,8 @@ package com.zdjizhi.pre.base.common; +import org.apache.datasketches.theta.Union; +import org.apache.datasketches.theta.UpdateSketch; + public class CnMetricLog { private int metricKey; @@ -37,6 +40,12 @@ public class CnMetricLog { private String app_category; private String app_subcategory; private String app_company; + + private String tag; + private UpdateSketch tag_server_ip_sketch; + private UpdateSketch tag_domain_sketch; + private Union tag_server_ip_union; + private Union tag_domain_union; private long common_c2s_pkt_num; private long common_c2s_byte_num; private long common_s2c_pkt_num; @@ -354,6 +363,47 @@ public class CnMetricLog { public void setApp_company(String app_company) { this.app_company = app_company; } + + public String getTag() { + return tag; + } + + public void setTag(String tag) { + this.tag = tag; + } + + public UpdateSketch getTag_server_ip_sketch() { + return tag_server_ip_sketch; + } + + public void setTag_server_ip_sketch(UpdateSketch tag_server_ip_sketch) { + this.tag_server_ip_sketch = tag_server_ip_sketch; + } + + public UpdateSketch getTag_domain_sketch() { + return tag_domain_sketch; + } + + public void setTag_domain_sketch(UpdateSketch tag_domain_sketch) { + this.tag_domain_sketch = tag_domain_sketch; + } + + public Union getTag_server_ip_union() { + return tag_server_ip_union; + } + + public void setTag_server_ip_union(Union tag_server_ip_union) { + this.tag_server_ip_union = tag_server_ip_union; + } + + public Union getTag_domain_union() { + return tag_domain_union; + } + + public void setTag_domain_union(Union tag_domain_union) { + this.tag_domain_union = tag_domain_union; + } + public long getCommon_c2s_pkt_num() { return common_c2s_pkt_num; } diff --git a/module-CN-pre-metrics/src/main/java/com/zdjizhi/pre/base/common/CommonConfig.java b/module-CN-pre-metrics/src/main/java/com/zdjizhi/pre/base/common/CommonConfig.java index 2f0b2b3..0208992 100644 --- a/module-CN-pre-metrics/src/main/java/com/zdjizhi/pre/base/common/CommonConfig.java +++ b/module-CN-pre-metrics/src/main/java/com/zdjizhi/pre/base/common/CommonConfig.java @@ -93,6 +93,10 @@ public class CommonConfig { .stringType() .defaultValue("metric_link_local"); + public static final ConfigOption<String> METRIC_TAG_TABLE = ConfigOptions.key("metric.tag.table") + .stringType() + .defaultValue("metric_tag_local"); + public static final ConfigOption<String> METRIC_SUBSCRIBER_APP_TABLE = ConfigOptions.key("metric.subscriber_app.table") .stringType() .defaultValue("metric_subscriber_app_local"); diff --git a/module-CN-pre-metrics/src/main/java/com/zdjizhi/pre/base/common/MetricKeyConfig.java b/module-CN-pre-metrics/src/main/java/com/zdjizhi/pre/base/common/MetricKeyConfig.java index 0932d65..c47801f 100644 --- a/module-CN-pre-metrics/src/main/java/com/zdjizhi/pre/base/common/MetricKeyConfig.java +++ b/module-CN-pre-metrics/src/main/java/com/zdjizhi/pre/base/common/MetricKeyConfig.java @@ -46,6 +46,7 @@ public class MetricKeyConfig { public static final int DNS_RRCNAME = 21; public static final int SUBSCRIBER_APP = 22; + public static final int TAG = 23; } diff --git a/module-CN-pre-metrics/src/main/java/com/zdjizhi/pre/base/common/MetricResultLog.java b/module-CN-pre-metrics/src/main/java/com/zdjizhi/pre/base/common/MetricResultLog.java index a0ac350..4b8aa02 100644 --- a/module-CN-pre-metrics/src/main/java/com/zdjizhi/pre/base/common/MetricResultLog.java +++ b/module-CN-pre-metrics/src/main/java/com/zdjizhi/pre/base/common/MetricResultLog.java @@ -39,6 +39,12 @@ public class MetricResultLog { @JSONField(name = "server_port") private Integer common_server_port; + private String tag; + + private String ip_sketch; + + private String domain_sketch; + private Long stat_time; @JSONField(name = "send_pkts") @@ -319,6 +325,30 @@ public class MetricResultLog { this.common_server_port = common_server_port; } + public String getTag() { + return tag; + } + + public void setTag(String tag) { + this.tag = tag; + } + + public String getIp_sketch() { + return ip_sketch; + } + + public void setIp_sketch(String ip_sketch) { + this.ip_sketch = ip_sketch; + } + + public String getDomain_sketch() { + return domain_sketch; + } + + public void setDomain_sketch(String domain_sketch) { + this.domain_sketch = domain_sketch; + } + public Long getStat_time() { return stat_time; } diff --git a/module-CN-pre-metrics/src/main/java/com/zdjizhi/pre/base/function/MetricTagProcessFunc.java b/module-CN-pre-metrics/src/main/java/com/zdjizhi/pre/base/function/MetricTagProcessFunc.java new file mode 100644 index 0000000..652deb5 --- /dev/null +++ b/module-CN-pre-metrics/src/main/java/com/zdjizhi/pre/base/function/MetricTagProcessFunc.java @@ -0,0 +1,71 @@ +package com.zdjizhi.pre.base.function; + +import com.alibaba.fastjson2.JSON; +import com.zdjizhi.pre.base.common.CnMetricLog; +import com.zdjizhi.pre.base.common.MetricResultLog; +import org.apache.datasketches.theta.Union; +import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction; +import org.apache.flink.streaming.api.windowing.windows.TimeWindow; +import org.apache.flink.util.Collector; + +import java.util.ArrayList; +import java.util.Base64; +import java.util.List; + +/** + * @author gujinkai + * @version 1.0 + * @date 2024/4/7 11:42 + */ +public class MetricTagProcessFunc extends ProcessWindowFunction<CnMetricLog, String, String, TimeWindow> { + @Override + public void process(String s, ProcessWindowFunction<CnMetricLog, String, String, TimeWindow>.Context context, Iterable<CnMetricLog> elements, Collector<String> out) throws Exception { + CnMetricLog next = elements.iterator().next(); + MetricResultLog metricResultLog = new MetricResultLog(); + metricResultLog.setTag(s); + Union tagServerIpUnion = next.getTag_server_ip_union(); + byte[] ipSketchBytes = tagServerIpUnion.getResult().toByteArray(); + metricResultLog.setIp_sketch(Base64.getEncoder().encodeToString(addHeader(ipSketchBytes))); + Union tagDomainUnion = next.getTag_domain_union(); + byte[] domainSketchBytes = tagDomainUnion.getResult().toByteArray(); + metricResultLog.setDomain_sketch(Base64.getEncoder().encodeToString(addHeader(domainSketchBytes))); + metricResultLog.setStat_time(context.window().getStart() / 1000); + String metricResultJsonStr = JSON.toJSONString(metricResultLog); + out.collect(metricResultJsonStr); + } + + public static byte[] getHeaderByteArray(long l) { + List<Byte> byteList = new ArrayList<>(); + + while (l > 0x7F) { + byte b = (byte) ((l & 0x7F) | 0x80); + byteList.add(b); + l >>>= 7; + } + byteList.add((byte) l); + byte[] bytes = new byte[byteList.size()]; + for (int i = 0; i < byteList.size(); i++) { + bytes[i] = byteList.get(i); + } + return bytes; + } + + public static byte[] mergeByteArray(byte[]... bytes) { + int length = 0; + for (byte[] aByte : bytes) { + length += aByte.length; + } + byte[] result = new byte[length]; + int index = 0; + for (byte[] aByte : bytes) { + System.arraycopy(aByte, 0, result, index, aByte.length); + index += aByte.length; + } + return result; + } + + public static byte[] addHeader(byte[] bytes) { + byte[] header = getHeaderByteArray(bytes.length); + return mergeByteArray(header, bytes); + } +} diff --git a/module-CN-pre-metrics/src/main/java/com/zdjizhi/pre/base/operator/FirstAggregation.java b/module-CN-pre-metrics/src/main/java/com/zdjizhi/pre/base/operator/FirstAggregation.java index d6ba3ee..bb37b92 100644 --- a/module-CN-pre-metrics/src/main/java/com/zdjizhi/pre/base/operator/FirstAggregation.java +++ b/module-CN-pre-metrics/src/main/java/com/zdjizhi/pre/base/operator/FirstAggregation.java @@ -4,10 +4,12 @@ import com.zdjizhi.base.common.CnRecordLog; import com.zdjizhi.pre.base.common.CnMetricLog; import com.zdjizhi.pre.base.common.MetricKeyConfig; import org.apache.commons.lang3.StringUtils; +import org.apache.datasketches.theta.UpdateSketch; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.util.HashMap; +import java.util.List; import java.util.Map; public class FirstAggregation extends AbstractFirstAggregation<CnMetricLog> { @@ -29,6 +31,7 @@ public class FirstAggregation extends AbstractFirstAggregation<CnMetricLog> { private final String protocolMetricKey = "protocolMetric"; private final String linkMetricKey = "linkMetric"; private final String subscriberAppKey = "subscriberAppMetric"; + private final String tagKey = "tagMetric"; @Override public Map<String, Map<String, CnMetricLog>> createAccumulator() { @@ -48,6 +51,7 @@ public class FirstAggregation extends AbstractFirstAggregation<CnMetricLog> { accumulator.put(protocolMetricKey, new HashMap<>()); accumulator.put(linkMetricKey, new HashMap<>()); accumulator.put(subscriberAppKey, new HashMap<>()); + accumulator.put(tagKey, new HashMap<>()); return accumulator; } @@ -286,6 +290,50 @@ public class FirstAggregation extends AbstractFirstAggregation<CnMetricLog> { } catch (Exception e) { logger.error("pre metric first agg add failed: " + e.getMessage(), e); } + + List<String> serverIpTags = value.getServer_ip_tags(); + List<String> domainTags = value.getDomain_tags(); + Map<String, CnMetricLog> tagMetricMap = accumulator.get(tagKey); + for (String tag : serverIpTags) { + CnMetricLog cnMetricLog = tagMetricMap.get(tag); + if (cnMetricLog != null) { + if (cnMetricLog.getTag_server_ip_sketch() != null) { + UpdateSketch tag_server_ip_sketch = cnMetricLog.getTag_server_ip_sketch(); + tag_server_ip_sketch.update(value.getCommon_server_ip()); + } else { + UpdateSketch sketch = UpdateSketch.builder().build(); + sketch.update(value.getCommon_server_ip()); + cnMetricLog.setTag_server_ip_sketch(sketch); + } + } else { + cnMetricLog = new CnMetricLog(MetricKeyConfig.TAG); + cnMetricLog.setTag(tag); + UpdateSketch sketch = UpdateSketch.builder().build(); + sketch.update(value.getCommon_server_ip()); + cnMetricLog.setTag_server_ip_sketch(sketch); + } + tagMetricMap.put(tag, cnMetricLog); + } + for (String tag : domainTags) { + CnMetricLog cnMetricLog = tagMetricMap.get(tag); + if (cnMetricLog != null) { + if (cnMetricLog.getTag_domain_sketch() != null) { + UpdateSketch tag_domain_sketch = cnMetricLog.getTag_domain_sketch(); + tag_domain_sketch.update(value.getDomain()); + } else { + UpdateSketch sketch = UpdateSketch.builder().build(); + sketch.update(value.getDomain()); + cnMetricLog.setTag_domain_sketch(sketch); + } + } else { + cnMetricLog = new CnMetricLog(MetricKeyConfig.TAG); + cnMetricLog.setTag(tag); + UpdateSketch sketch = UpdateSketch.builder().build(); + sketch.update(value.getDomain()); + cnMetricLog.setTag_domain_sketch(sketch); + } + tagMetricMap.put(tag, cnMetricLog); + } } private void initMetric(CnRecordLog cnRecordLog, CnMetricLog cnMetricLog) { diff --git a/module-CN-pre-metrics/src/main/java/com/zdjizhi/pre/base/operator/TagSecondAggregationReduce.java b/module-CN-pre-metrics/src/main/java/com/zdjizhi/pre/base/operator/TagSecondAggregationReduce.java new file mode 100644 index 0000000..8ee5a14 --- /dev/null +++ b/module-CN-pre-metrics/src/main/java/com/zdjizhi/pre/base/operator/TagSecondAggregationReduce.java @@ -0,0 +1,55 @@ +package com.zdjizhi.pre.base.operator; + +import com.zdjizhi.pre.base.common.CnMetricLog; +import org.apache.datasketches.theta.SetOperation; +import org.apache.datasketches.theta.Union; +import org.apache.flink.api.common.functions.ReduceFunction; + +/** + * @author gujinkai + * @version 1.0 + * @date 2024/4/7 10:53 + */ +public class TagSecondAggregationReduce implements ReduceFunction<CnMetricLog> { + @Override + public CnMetricLog reduce(CnMetricLog value1, CnMetricLog value2) throws Exception { + if (value1.getTag_server_ip_union() != null) { + Union union = value1.getTag_server_ip_union(); + if (value2.getTag_server_ip_sketch() != null) { + union.union(value2.getTag_server_ip_sketch()); + } else { + union.union(value2.getTag_server_ip_union().getResult()); + } + } else { + Union union = SetOperation.builder().buildUnion(); + union.union(value1.getTag_server_ip_sketch()); + if (value2.getTag_server_ip_sketch() != null) { + union.union(value2.getTag_server_ip_sketch()); + } else { + union.union(value2.getTag_server_ip_union().getResult()); + } + value1.setTag_server_ip_union(union); + value1.setTag_server_ip_sketch(null); + } + + if (value1.getTag_domain_union() != null) { + Union union = value1.getTag_domain_union(); + if (value2.getTag_domain_sketch() != null) { + union.union(value2.getTag_domain_sketch()); + } else { + union.union(value2.getTag_domain_union().getResult()); + } + } else { + Union union = SetOperation.builder().buildUnion(); + union.union(value1.getTag_domain_sketch()); + if (value2.getTag_domain_sketch() != null) { + union.union(value2.getTag_domain_sketch()); + } else { + union.union(value2.getTag_domain_union().getResult()); + } + value1.setTag_domain_union(union); + value1.setTag_domain_sketch(null); + } + return value1; + } +} |
