From cbdd7f96b4e342c9e8b296e231ea655fca139f5d Mon Sep 17 00:00:00 2001 From: lifengchao Date: Thu, 15 Aug 2024 15:28:24 +0800 Subject: [feature][core]添加sketche聚合函数和sketche mock类型 MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- config/udf.plugins | 7 +- groot-common/src/main/resources/udf.plugins | 7 +- groot-connectors/connector-kafka/pom.xml | 11 +++ .../connectors/mock/faker/FakerUtils.java | 18 ++++ .../connectors/mock/faker/HdrHistogramFaker.java | 35 +++++++ .../connectors/mock/faker/HlldFaker.java | 39 ++++++++ groot-core/pom.xml | 5 + .../core/udf/udaf/HdrHistogram/HdrHistogram.java | 42 +++++++++ .../HdrHistogram/HdrHistogramBaseAggregate.java | 101 ++++++++++++++++++++ .../udaf/HdrHistogram/HdrHistogramQuantile.java | 36 ++++++++ .../udaf/HdrHistogram/HdrHistogramQuantiles.java | 51 +++++++++++ .../geedgenetworks/core/udf/udaf/hlld/Hlld.java | 40 ++++++++ .../udf/udaf/hlld/HlldApproxCountDistinct.java | 25 +++++ .../core/udf/udaf/hlld/HlldBaseAggregate.java | 98 ++++++++++++++++++++ .../HdrHistogram/HdrHistogramQuantileTest.java | 89 ++++++++++++++++++ .../HdrHistogram/HdrHistogramQuantilesTest.java | 98 ++++++++++++++++++++ .../udf/udaf/HdrHistogram/HdrHistogramTest.java | 102 +++++++++++++++++++++ .../udf/udaf/hlld/HlldApproxCountDistinctTest.java | 87 ++++++++++++++++++ .../core/udf/udaf/hlld/HlldTest.java | 86 +++++++++++++++++ pom.xml | 5 + 20 files changed, 980 insertions(+), 2 deletions(-) create mode 100644 groot-connectors/connector-mock/src/main/java/com/geedgenetworks/connectors/mock/faker/HdrHistogramFaker.java create mode 100644 groot-connectors/connector-mock/src/main/java/com/geedgenetworks/connectors/mock/faker/HlldFaker.java create mode 100644 groot-core/src/main/java/com/geedgenetworks/core/udf/udaf/HdrHistogram/HdrHistogram.java create mode 100644 groot-core/src/main/java/com/geedgenetworks/core/udf/udaf/HdrHistogram/HdrHistogramBaseAggregate.java create mode 100644 groot-core/src/main/java/com/geedgenetworks/core/udf/udaf/HdrHistogram/HdrHistogramQuantile.java create mode 100644 groot-core/src/main/java/com/geedgenetworks/core/udf/udaf/HdrHistogram/HdrHistogramQuantiles.java create mode 100644 groot-core/src/main/java/com/geedgenetworks/core/udf/udaf/hlld/Hlld.java create mode 100644 groot-core/src/main/java/com/geedgenetworks/core/udf/udaf/hlld/HlldApproxCountDistinct.java create mode 100644 groot-core/src/main/java/com/geedgenetworks/core/udf/udaf/hlld/HlldBaseAggregate.java create mode 100644 groot-core/src/test/java/com/geedgenetworks/core/udf/udaf/HdrHistogram/HdrHistogramQuantileTest.java create mode 100644 groot-core/src/test/java/com/geedgenetworks/core/udf/udaf/HdrHistogram/HdrHistogramQuantilesTest.java create mode 100644 groot-core/src/test/java/com/geedgenetworks/core/udf/udaf/HdrHistogram/HdrHistogramTest.java create mode 100644 groot-core/src/test/java/com/geedgenetworks/core/udf/udaf/hlld/HlldApproxCountDistinctTest.java create mode 100644 groot-core/src/test/java/com/geedgenetworks/core/udf/udaf/hlld/HlldTest.java diff --git a/config/udf.plugins b/config/udf.plugins index 31d1b21..772d2bc 100644 --- a/config/udf.plugins +++ b/config/udf.plugins @@ -23,4 +23,9 @@ com.geedgenetworks.core.udf.udaf.Mean com.geedgenetworks.core.udf.udaf.LastValue com.geedgenetworks.core.udf.udaf.FirstValue com.geedgenetworks.core.udf.udtf.JsonUnroll -com.geedgenetworks.core.udf.udtf.Unroll \ No newline at end of file +com.geedgenetworks.core.udf.udtf.Unroll +com.geedgenetworks.core.udf.udaf.hlld.Hlld +com.geedgenetworks.core.udf.udaf.hlld.HlldApproxCountDistinct +com.geedgenetworks.core.udf.udaf.HdrHistogram.HdrHistogram +com.geedgenetworks.core.udf.udaf.HdrHistogram.HdrHistogramQuantile +com.geedgenetworks.core.udf.udaf.HdrHistogram.HdrHistogramQuantiles \ No newline at end of file diff --git a/groot-common/src/main/resources/udf.plugins b/groot-common/src/main/resources/udf.plugins index 18446c9..0062c1a 100644 --- a/groot-common/src/main/resources/udf.plugins +++ b/groot-common/src/main/resources/udf.plugins @@ -22,4 +22,9 @@ com.geedgenetworks.core.udf.udaf.Mean com.geedgenetworks.core.udf.udaf.LastValue com.geedgenetworks.core.udf.udaf.FirstValue com.geedgenetworks.core.udf.udtf.JsonUnroll -com.geedgenetworks.core.udf.udtf.Unroll \ No newline at end of file +com.geedgenetworks.core.udf.udtf.Unroll +com.geedgenetworks.core.udf.udaf.hlld.Hlld +com.geedgenetworks.core.udf.udaf.hlld.HlldApproxCountDistinct +com.geedgenetworks.core.udf.udaf.HdrHistogram.HdrHistogram +com.geedgenetworks.core.udf.udaf.HdrHistogram.HdrHistogramQuantile +com.geedgenetworks.core.udf.udaf.HdrHistogram.HdrHistogramQuantiles \ No newline at end of file diff --git a/groot-connectors/connector-kafka/pom.xml b/groot-connectors/connector-kafka/pom.xml index 7ec7d86..448383b 100644 --- a/groot-connectors/connector-kafka/pom.xml +++ b/groot-connectors/connector-kafka/pom.xml @@ -15,6 +15,17 @@ org.apache.flink flink-connector-kafka_${scala.version} ${flink.version} + + + org.xerial.snappy + snappy-java + + + + + org.xerial.snappy + snappy-java + 1.1.8.3 diff --git a/groot-connectors/connector-mock/src/main/java/com/geedgenetworks/connectors/mock/faker/FakerUtils.java b/groot-connectors/connector-mock/src/main/java/com/geedgenetworks/connectors/mock/faker/FakerUtils.java index 09cc8f8..5101fa1 100644 --- a/groot-connectors/connector-mock/src/main/java/com/geedgenetworks/connectors/mock/faker/FakerUtils.java +++ b/groot-connectors/connector-mock/src/main/java/com/geedgenetworks/connectors/mock/faker/FakerUtils.java @@ -46,6 +46,10 @@ public class FakerUtils { return wrapFaker(parseIPv4Faker(obj), obj); } else if ("Expression".equalsIgnoreCase(type)) { return wrapFaker(parseExpressionFaker(obj), obj); + } else if ("Hlld".equalsIgnoreCase(type)) { + return wrapFaker(parseHlldFaker(obj), obj); + } else if ("HdrHistogram".equalsIgnoreCase(type)) { + return wrapFaker(parseHdrHistogramFaker(obj), obj); } else if ("Object".equalsIgnoreCase(type)) { return wrapFaker(parseObjectFaker(obj.getJSONArray("fields")), obj); } else if ("Union".equalsIgnoreCase(type)) { @@ -109,6 +113,20 @@ public class FakerUtils { return new ExpressionFaker(expression); } + private static Faker parseHlldFaker(JSONObject obj) { + long itemCount = obj.getLongValue("itemCount", 1000000L); + int batchCount = obj.getIntValue("batchCount", 10000); + int precision = obj.getIntValue("precision", 12); + return new HlldFaker(itemCount, batchCount, precision); + } + + private static Faker parseHdrHistogramFaker(JSONObject obj) { + int max = obj.getIntValue("max", 100000); + int batchCount = obj.getIntValue("batchCount", 1000); + int numberOfSignificantValueDigits = obj.getIntValue("numberOfSignificantValueDigits", 1); + return new HdrHistogramFaker(max, batchCount, numberOfSignificantValueDigits); + } + private static Faker parseIPv4Faker(JSONObject obj) { String start = obj.getString("start"); String end = obj.getString("end"); diff --git a/groot-connectors/connector-mock/src/main/java/com/geedgenetworks/connectors/mock/faker/HdrHistogramFaker.java b/groot-connectors/connector-mock/src/main/java/com/geedgenetworks/connectors/mock/faker/HdrHistogramFaker.java new file mode 100644 index 0000000..393bf8e --- /dev/null +++ b/groot-connectors/connector-mock/src/main/java/com/geedgenetworks/connectors/mock/faker/HdrHistogramFaker.java @@ -0,0 +1,35 @@ +package com.geedgenetworks.connectors.mock.faker; + +import com.geedgenetworks.sketch.util.StringUtils; +import org.HdrHistogram.HistogramSketch; + +import java.util.concurrent.ThreadLocalRandom; + +public class HdrHistogramFaker extends Faker { + private final int max; + private final int batchCount; + private final int numberOfSignificantValueDigits; + private HistogramSketch his; + + public HdrHistogramFaker(int max, int batchCount, int numberOfSignificantValueDigits) { + this.max = max; + this.batchCount = batchCount; + this.numberOfSignificantValueDigits = numberOfSignificantValueDigits; + } + + @Override + public void init(int instanceCount, int instanceIndex) throws Exception { + his = new HistogramSketch(1L, max, numberOfSignificantValueDigits, false); + } + + @Override + public Object geneValue() throws Exception { + his.reset(); + ThreadLocalRandom random = ThreadLocalRandom.current(); + for (int i = 0; i < batchCount; i++) { + his.recordValue(random.nextInt(max)); + } + return StringUtils.encodeBase64String(his.toBytes()); + } + +} diff --git a/groot-connectors/connector-mock/src/main/java/com/geedgenetworks/connectors/mock/faker/HlldFaker.java b/groot-connectors/connector-mock/src/main/java/com/geedgenetworks/connectors/mock/faker/HlldFaker.java new file mode 100644 index 0000000..5af2b35 --- /dev/null +++ b/groot-connectors/connector-mock/src/main/java/com/geedgenetworks/connectors/mock/faker/HlldFaker.java @@ -0,0 +1,39 @@ +package com.geedgenetworks.connectors.mock.faker; + +import com.geedgenetworks.sketch.hlld.Hll; +import com.geedgenetworks.sketch.util.StringUtils; + +import java.util.concurrent.ThreadLocalRandom; + +public class HlldFaker extends Faker { + private final long itemCount; + private final int batchCount; + private final int precision; + private Hll hll; + + public HlldFaker(long itemCount, int batchCount, int precision) { + this.itemCount = itemCount; + this.batchCount = batchCount; + this.precision = precision; + } + + public HlldFaker(long itemCount, int batchCount) { + this(itemCount, batchCount, 12); + } + + @Override + public void init(int instanceCount, int instanceIndex) throws Exception { + hll = new Hll(precision); + } + + @Override + public Object geneValue() throws Exception { + hll.reset(); + ThreadLocalRandom random = ThreadLocalRandom.current(); + for (int i = 0; i < batchCount; i++) { + hll.add(random.nextLong(itemCount)); + } + return StringUtils.encodeBase64String(hll.toBytes()); + } + +} diff --git a/groot-core/pom.xml b/groot-core/pom.xml index 18ae33b..e723fa5 100644 --- a/groot-core/pom.xml +++ b/groot-core/pom.xml @@ -65,6 +65,11 @@ provided + + com.geedgenetworks + sketches + + com.alibaba.nacos nacos-client diff --git a/groot-core/src/main/java/com/geedgenetworks/core/udf/udaf/HdrHistogram/HdrHistogram.java b/groot-core/src/main/java/com/geedgenetworks/core/udf/udaf/HdrHistogram/HdrHistogram.java new file mode 100644 index 0000000..368e8c1 --- /dev/null +++ b/groot-core/src/main/java/com/geedgenetworks/core/udf/udaf/HdrHistogram/HdrHistogram.java @@ -0,0 +1,42 @@ +package com.geedgenetworks.core.udf.udaf.HdrHistogram; + +import com.geedgenetworks.common.Accumulator; +import com.geedgenetworks.common.udf.UDFContext; +import com.geedgenetworks.sketch.util.StringUtils; +import org.HdrHistogram.Histogramer; + +import java.util.Map; + +public class HdrHistogram extends HdrHistogramBaseAggregate { + boolean outputBase64; + + @Override + public void open(UDFContext c) { + super.open(c); + Map params = c.getParameters(); + outputBase64 = "base64".equalsIgnoreCase(params.getOrDefault("output_format", "base64").toString()); + } + + @Override + public Accumulator getResult(Accumulator acc) { + Object agg = acc.getMetricsFields().get(outputField); + if (agg == null) { + return acc; + } + + byte[] bytes = ((Histogramer) agg).toBytes(); + if (outputBase64) { + acc.getMetricsFields().put(outputField, StringUtils.encodeBase64String(bytes)); + } else { + acc.getMetricsFields().put(outputField, bytes); + } + + return acc; + } + + @Override + public String functionName() { + return "HDR_HISTOGRAM"; + } + +} diff --git a/groot-core/src/main/java/com/geedgenetworks/core/udf/udaf/HdrHistogram/HdrHistogramBaseAggregate.java b/groot-core/src/main/java/com/geedgenetworks/core/udf/udaf/HdrHistogram/HdrHistogramBaseAggregate.java new file mode 100644 index 0000000..1648fa5 --- /dev/null +++ b/groot-core/src/main/java/com/geedgenetworks/core/udf/udaf/HdrHistogram/HdrHistogramBaseAggregate.java @@ -0,0 +1,101 @@ +package com.geedgenetworks.core.udf.udaf.HdrHistogram; + +import com.geedgenetworks.common.Accumulator; +import com.geedgenetworks.common.Event; +import com.geedgenetworks.common.udf.AggregateFunction; +import com.geedgenetworks.common.udf.UDFContext; +import com.geedgenetworks.sketch.util.StringUtils; +import org.HdrHistogram.ArrayHistogram; +import org.HdrHistogram.DirectMapHistogram; +import org.HdrHistogram.Histogramer; +import org.apache.commons.collections.CollectionUtils; + +import java.nio.charset.StandardCharsets; +import java.util.Map; + +public abstract class HdrHistogramBaseAggregate implements AggregateFunction { + protected String inputField; + protected String outputField; + protected boolean inputSketch; + protected long lowestDiscernibleValue; + protected long highestTrackableValue; + protected int numberOfSignificantValueDigits; + protected boolean autoResize; + + @Override + public void open(UDFContext c) { + inputField = c.getLookup_fields().get(0); + if (CollectionUtils.isNotEmpty(c.getOutput_fields())) { + outputField = c.getOutput_fields().get(0); + } else { + outputField = inputField; + } + Map params = c.getParameters(); + lowestDiscernibleValue = Long.parseLong(params.getOrDefault("lowestDiscernibleValue", "1").toString()); + highestTrackableValue = Long.parseLong(params.getOrDefault("highestTrackableValue", "2").toString()); + numberOfSignificantValueDigits = Integer.parseInt(params.getOrDefault("numberOfSignificantValueDigits", "1").toString()); + autoResize = Boolean.valueOf(params.getOrDefault("autoResize", "true").toString()); + inputSketch = "sketch".equalsIgnoreCase(params.getOrDefault("input_type", "sketch").toString()); + } + + @Override + public Accumulator initAccumulator(Accumulator acc) { + return acc; + } + + @Override + public Accumulator add(Event event, Accumulator acc) { + Object value = event.getExtractedFields().get(inputField); + if (value == null) { + return acc; + } + + if (inputSketch) { + updateHdrMerge(acc, value); + } else { + updateHdr(acc, value); + } + + return acc; + } + + protected void updateHdr(Accumulator acc, Object value) { + Map aggs = acc.getMetricsFields(); + ArrayHistogram his = (ArrayHistogram) aggs.get(outputField); + if (his == null) { + his = new ArrayHistogram(lowestDiscernibleValue, highestTrackableValue, numberOfSignificantValueDigits); + his.setAutoResize(autoResize); + aggs.put(outputField, his); + } + + his.recordValue(((Number) value).longValue()); + } + + + protected void updateHdrMerge(Accumulator acc, Object value) { + Map aggs = acc.getMetricsFields(); + ArrayHistogram his = (ArrayHistogram) aggs.get(outputField); + if (his == null) { + his = new ArrayHistogram(lowestDiscernibleValue, highestTrackableValue, numberOfSignificantValueDigits); + his.setAutoResize(autoResize); + aggs.put(outputField, his); + } + + Histogramer h; + if (value instanceof String) { + byte[] bytes = StringUtils.decodeBase64(((String) value).getBytes(StandardCharsets.UTF_8)); + h = DirectMapHistogram.wrapBytes(bytes); + } else if (value instanceof byte[]) { + h = DirectMapHistogram.wrapBytes((byte[]) value); + } else if (value instanceof Histogramer) { + h = (Histogramer) value; + } else { + throw new IllegalArgumentException("Unsupported type " + value.getClass()); + } + + his.merge(h); + } + + @Override + public void close() {} +} diff --git a/groot-core/src/main/java/com/geedgenetworks/core/udf/udaf/HdrHistogram/HdrHistogramQuantile.java b/groot-core/src/main/java/com/geedgenetworks/core/udf/udaf/HdrHistogram/HdrHistogramQuantile.java new file mode 100644 index 0000000..b9f7d5b --- /dev/null +++ b/groot-core/src/main/java/com/geedgenetworks/core/udf/udaf/HdrHistogram/HdrHistogramQuantile.java @@ -0,0 +1,36 @@ +package com.geedgenetworks.core.udf.udaf.HdrHistogram; + +import com.geedgenetworks.common.Accumulator; +import com.geedgenetworks.common.udf.UDFContext; +import org.HdrHistogram.Histogramer; + +import java.util.Map; + +public class HdrHistogramQuantile extends HdrHistogramBaseAggregate { + Double probability; + + @Override + public void open(UDFContext c) { + super.open(c); + Map params = c.getParameters(); + probability = Double.parseDouble(params.getOrDefault("probability", "0.5").toString()); + } + + @Override + public Accumulator getResult(Accumulator acc) { + Object agg = acc.getMetricsFields().get(outputField); + if (agg == null) { + return acc; + } + + long percentile = ((Histogramer) agg).getValueAtPercentile(probability * 100); + acc.getMetricsFields().put(outputField, percentile); + return acc; + } + + @Override + public String functionName() { + return "APPROX_QUANTILE_HDR"; + } + +} diff --git a/groot-core/src/main/java/com/geedgenetworks/core/udf/udaf/HdrHistogram/HdrHistogramQuantiles.java b/groot-core/src/main/java/com/geedgenetworks/core/udf/udaf/HdrHistogram/HdrHistogramQuantiles.java new file mode 100644 index 0000000..ccfffd3 --- /dev/null +++ b/groot-core/src/main/java/com/geedgenetworks/core/udf/udaf/HdrHistogram/HdrHistogramQuantiles.java @@ -0,0 +1,51 @@ +package com.geedgenetworks.core.udf.udaf.HdrHistogram; + +import com.alibaba.fastjson2.JSON; +import com.geedgenetworks.common.Accumulator; +import com.geedgenetworks.common.udf.UDFContext; +import org.HdrHistogram.Histogramer; + +import java.util.ArrayList; +import java.util.List; +import java.util.Map; + +public class HdrHistogramQuantiles extends HdrHistogramBaseAggregate { + double[] probabilities; + + @Override + public void open(UDFContext c) { + super.open(c); + Map params = c.getParameters(); + Object ps = params.get("probabilities"); + if(ps == null){ + throw new IllegalArgumentException("probabilities param is requested"); + } + List floats = JSON.parseArray(ps instanceof String ? ps.toString(): JSON.toJSONString(ps), Double.class); + probabilities = new double[floats.size()]; + for (int i = 0; i < floats.size(); i++) { + probabilities[i] = floats.get(i); + } + } + + @Override + public Accumulator getResult(Accumulator acc) { + Object agg = acc.getMetricsFields().get(outputField); + if (agg == null) { + return acc; + } + + Histogramer his = ((Histogramer) agg); + final List counts = new ArrayList<>(probabilities.length); + for (int i = 0; i < probabilities.length; i++) { + counts.add(his.getValueAtPercentile(probabilities[i] * 100)); + } + acc.getMetricsFields().put(outputField, counts); + return acc; + } + + @Override + public String functionName() { + return "APPROX_QUANTILES_HDR"; + } + +} diff --git a/groot-core/src/main/java/com/geedgenetworks/core/udf/udaf/hlld/Hlld.java b/groot-core/src/main/java/com/geedgenetworks/core/udf/udaf/hlld/Hlld.java new file mode 100644 index 0000000..e373a7a --- /dev/null +++ b/groot-core/src/main/java/com/geedgenetworks/core/udf/udaf/hlld/Hlld.java @@ -0,0 +1,40 @@ +package com.geedgenetworks.core.udf.udaf.hlld; + +import com.geedgenetworks.common.Accumulator; +import com.geedgenetworks.common.udf.UDFContext; +import com.geedgenetworks.sketch.hlld.Hll; +import com.geedgenetworks.sketch.util.StringUtils; + +import java.util.Map; + +public class Hlld extends HlldBaseAggregate { + boolean outputBase64; + + @Override + public void open(UDFContext c) { + super.open(c); + Map params = c.getParameters(); + outputBase64 = "base64".equalsIgnoreCase(params.getOrDefault("output_format", "base64").toString()); + } + + @Override + public Accumulator getResult(Accumulator acc) { + Hll hll = getResultHll(acc); + if (hll == null) { + return acc; + } + + if (outputBase64) { + acc.getMetricsFields().put(outputField, StringUtils.encodeBase64String(hll.toBytes())); + } else { + acc.getMetricsFields().put(outputField, hll.toBytes()); + } + + return acc; + } + + @Override + public String functionName() { + return "HLLD"; + } +} diff --git a/groot-core/src/main/java/com/geedgenetworks/core/udf/udaf/hlld/HlldApproxCountDistinct.java b/groot-core/src/main/java/com/geedgenetworks/core/udf/udaf/hlld/HlldApproxCountDistinct.java new file mode 100644 index 0000000..ec003f8 --- /dev/null +++ b/groot-core/src/main/java/com/geedgenetworks/core/udf/udaf/hlld/HlldApproxCountDistinct.java @@ -0,0 +1,25 @@ +package com.geedgenetworks.core.udf.udaf.hlld; + +import com.geedgenetworks.common.Accumulator; +import com.geedgenetworks.sketch.hlld.Hll; + +public class HlldApproxCountDistinct extends HlldBaseAggregate { + + @Override + public Accumulator getResult(Accumulator acc) { + Hll hll = getResultHll(acc); + if (hll == null) { + return acc; + } + + acc.getMetricsFields().put(outputField, (long)hll.size()); + + return acc; + } + + @Override + public String functionName() { + return "APPROX_COUNT_DISTINCT_HLLD"; + } + +} diff --git a/groot-core/src/main/java/com/geedgenetworks/core/udf/udaf/hlld/HlldBaseAggregate.java b/groot-core/src/main/java/com/geedgenetworks/core/udf/udaf/hlld/HlldBaseAggregate.java new file mode 100644 index 0000000..71d61dc --- /dev/null +++ b/groot-core/src/main/java/com/geedgenetworks/core/udf/udaf/hlld/HlldBaseAggregate.java @@ -0,0 +1,98 @@ +package com.geedgenetworks.core.udf.udaf.hlld; + +import com.geedgenetworks.common.Accumulator; +import com.geedgenetworks.common.Event; +import com.geedgenetworks.common.udf.AggregateFunction; +import com.geedgenetworks.common.udf.UDFContext; +import com.geedgenetworks.sketch.hlld.Hll; +import com.geedgenetworks.sketch.hlld.HllUnion; +import com.geedgenetworks.sketch.hlld.HllUtils; +import org.apache.commons.collections.CollectionUtils; + +import java.util.Map; + +public abstract class HlldBaseAggregate implements AggregateFunction { + protected String inputField; + protected String outputField; + protected boolean inputSketch; + protected int precision = 12; + + @Override + public void open(UDFContext c) { + inputField = c.getLookup_fields().get(0); + if (CollectionUtils.isNotEmpty(c.getOutput_fields())) { + outputField = c.getOutput_fields().get(0); + } else { + outputField = inputField; + } + Map params = c.getParameters(); + precision = Integer.parseInt(params.getOrDefault("precision", "12").toString()); + inputSketch = "sketch".equalsIgnoreCase(params.getOrDefault("input_type", "sketch").toString()); + } + + @Override + public Accumulator initAccumulator(Accumulator acc) { + return acc; + } + + @Override + public Accumulator add(Event event, Accumulator acc) { + Object value = event.getExtractedFields().get(inputField); + if (value == null) { + return acc; + } + + if (inputSketch) { + updateHllUnion(acc, value); + } else { + updateHll(acc, value); + } + + return acc; + } + + protected Hll getResultHll(Accumulator acc){ + Object agg = acc.getMetricsFields().get(outputField); + if (agg == null) { + return null; + } + + return inputSketch ? ((HllUnion) agg).getResult() : (Hll) agg; + } + + protected void updateHll(Accumulator acc, Object value) { + Map aggs = acc.getMetricsFields(); + Hll hll = (Hll) aggs.get(outputField); + if (hll == null) { + hll = new Hll(precision); + aggs.put(outputField, hll); + } + + if (value instanceof Integer || value instanceof Long) { + hll.add(((Number) value).longValue()); + } else if (value instanceof Float || value instanceof Double) { + hll.add(((Number) value).doubleValue()); + } else if (value instanceof String) { + hll.add((String) value); + } else if (value instanceof byte[]) { + hll.add((byte[]) value); + } else { + throw new IllegalArgumentException("Unsupported type " + value.getClass()); + } + } + + protected void updateHllUnion(Accumulator acc, Object value) { + Map aggs = acc.getMetricsFields(); + HllUnion hllUnion = (HllUnion) aggs.get(outputField); + if (hllUnion == null) { + hllUnion = new HllUnion(precision); + aggs.put(outputField, hllUnion); + } + + Hll hll = HllUtils.deserializeHll(value); + hllUnion.update(hll); + } + + @Override + public void close() {} +} diff --git a/groot-core/src/test/java/com/geedgenetworks/core/udf/udaf/HdrHistogram/HdrHistogramQuantileTest.java b/groot-core/src/test/java/com/geedgenetworks/core/udf/udaf/HdrHistogram/HdrHistogramQuantileTest.java new file mode 100644 index 0000000..33f7bad --- /dev/null +++ b/groot-core/src/test/java/com/geedgenetworks/core/udf/udaf/HdrHistogram/HdrHistogramQuantileTest.java @@ -0,0 +1,89 @@ +package com.geedgenetworks.core.udf.udaf.HdrHistogram; + +import com.geedgenetworks.common.Accumulator; +import com.geedgenetworks.common.Event; +import com.geedgenetworks.common.udf.AggregateFunction; +import com.geedgenetworks.common.udf.UDFContext; +import com.geedgenetworks.sketch.util.StringUtils; +import org.HdrHistogram.ArrayHistogram; +import org.junit.jupiter.api.Test; + +import java.util.Collections; +import java.util.HashMap; +import java.util.Map; + +import static org.junit.jupiter.api.Assertions.*; + +public class HdrHistogramQuantileTest { + AggregateFunction agg; + Accumulator acc; + Event event; + + @Test + public void inputRegular() { + double probability = 0.5; + initData( "regular", 2, probability); + long count = 100000; + Map fields = event.getExtractedFields(); + for (int i = 1; i <= count; i++) { + fields.put("ms", i); + agg.add(event, acc); + } + + long expect = (long) (count * probability); + long rst = (long)agg.getResult(acc).getMetricsFields().get("ms_his"); + double error = Math.abs(rst - expect) / (double) expect; + System.out.println(String.format("%s:%d,%d,%.4f", probability, expect , rst , error)); + assertTrue(error <= 0.05); + } + + @Test + public void inputSketch() { + double probability = 0.5; + initData( "sketch", 2, probability); + long count = 100000; + Map fields = event.getExtractedFields(); + + ArrayHistogram his = new ArrayHistogram(2); + for (int i = 1; i <= count; i++) { + his.recordValue(i); + } + fields.put("ms", StringUtils.encodeBase64String(his.toBytes())); + agg.add(event, acc); + + his = new ArrayHistogram(2); + for (int i = 1; i <= count; i++) { + his.recordValue(i); + } + fields.put("ms", his.toBytes()); + agg.add(event, acc); + + long expect = (long) (count * probability); + long rst = (long)agg.getResult(acc).getMetricsFields().get("ms_his"); + double error = Math.abs(rst - expect) / (double) expect; + System.out.println(String.format("%s:%d,%d,%.4f", probability, expect , rst , error)); + assertTrue(error <= 0.05); + } + + private void initData(String input_type, int numberOfSignificantValueDigits, double probability){ + agg = new HdrHistogramQuantile(); + UDFContext c = new UDFContext(); + Map parameters = new HashMap<>(); + parameters.put("input_type", input_type); + parameters.put("numberOfSignificantValueDigits", numberOfSignificantValueDigits); + parameters.put("probability", probability); + c.setParameters(parameters); + c.setLookup_fields(Collections.singletonList("ms")); + c.setOutput_fields(Collections.singletonList("ms_his")); + + agg.open(c); + Map map = new HashMap<>(); + acc = new Accumulator(); + acc.setMetricsFields(map); + agg.initAccumulator(acc); + + event = new Event(); + Map fields = new HashMap<>(); + event.setExtractedFields(fields); + } +} \ No newline at end of file diff --git a/groot-core/src/test/java/com/geedgenetworks/core/udf/udaf/HdrHistogram/HdrHistogramQuantilesTest.java b/groot-core/src/test/java/com/geedgenetworks/core/udf/udaf/HdrHistogram/HdrHistogramQuantilesTest.java new file mode 100644 index 0000000..4eefd9a --- /dev/null +++ b/groot-core/src/test/java/com/geedgenetworks/core/udf/udaf/HdrHistogram/HdrHistogramQuantilesTest.java @@ -0,0 +1,98 @@ +package com.geedgenetworks.core.udf.udaf.HdrHistogram; + +import com.geedgenetworks.common.Accumulator; +import com.geedgenetworks.common.Event; +import com.geedgenetworks.common.udf.AggregateFunction; +import com.geedgenetworks.common.udf.UDFContext; +import com.geedgenetworks.sketch.util.StringUtils; +import org.HdrHistogram.ArrayHistogram; +import org.junit.jupiter.api.Test; + +import java.util.*; + +import static org.junit.jupiter.api.Assertions.assertTrue; + +public class HdrHistogramQuantilesTest { + AggregateFunction agg; + Accumulator acc; + Event event; + + @Test + public void inputRegular() { + double[] probabilities = new double[]{0, 0.1, 0.25, 0.5, 0.75, 1}; + initData( "regular", 2, probabilities); + long count = 100000; + Map fields = event.getExtractedFields(); + for (int i = 1; i <= count; i++) { + fields.put("ms", i); + agg.add(event, acc); + } + + long[] expects = Arrays.stream(probabilities).mapToLong(x -> (long) (count * x)).toArray(); + + List rsts = (List)agg.getResult(acc).getMetricsFields().get("ms_his"); + for (int i = 0; i < expects.length; i++) { + long rst = rsts.get(i); + long expect = expects[i]; + double probability = probabilities[i]; + double error = probability <= 0.01? 0 : Math.abs(rst - expect) / (double) expect; + System.out.println(String.format("%s:%d,%d,%.4f", probability, expect , rst , error)); + assertTrue(error <= 0.05); + } + } + + @Test + public void inputSketch() { + double[] probabilities = new double[]{0, 0.1, 0.25, 0.5, 0.75, 1}; + initData( "sketch", 2, probabilities); + long count = 100000; + Map fields = event.getExtractedFields(); + long[] expects = Arrays.stream(probabilities).mapToLong(x -> (long) (count * x)).toArray(); + + ArrayHistogram his = new ArrayHistogram(2); + for (int i = 1; i <= count; i++) { + his.recordValue(i); + } + fields.put("ms", StringUtils.encodeBase64String(his.toBytes())); + agg.add(event, acc); + + his = new ArrayHistogram(2); + for (int i = 1; i <= count; i++) { + his.recordValue(i); + } + fields.put("ms", his.toBytes()); + agg.add(event, acc); + + List rsts = (List)agg.getResult(acc).getMetricsFields().get("ms_his"); + for (int i = 0; i < expects.length; i++) { + long rst = rsts.get(i); + long expect = expects[i]; + double probability = probabilities[i]; + double error = probability <= 0.01? 0 : Math.abs(rst - expect) / (double) expect; + System.out.println(String.format("%s:%d,%d,%.4f", probability, expect , rst , error)); + assertTrue(error <= 0.05); + } + } + + private void initData(String input_type, int numberOfSignificantValueDigits, double[] probabilities){ + agg = new HdrHistogramQuantiles(); + UDFContext c = new UDFContext(); + Map parameters = new HashMap<>(); + parameters.put("input_type", input_type); + parameters.put("numberOfSignificantValueDigits", numberOfSignificantValueDigits); + parameters.put("probabilities", probabilities); + c.setParameters(parameters); + c.setLookup_fields(Collections.singletonList("ms")); + c.setOutput_fields(Collections.singletonList("ms_his")); + + agg.open(c); + Map map = new HashMap<>(); + acc = new Accumulator(); + acc.setMetricsFields(map); + agg.initAccumulator(acc); + + event = new Event(); + Map fields = new HashMap<>(); + event.setExtractedFields(fields); + } +} \ No newline at end of file diff --git a/groot-core/src/test/java/com/geedgenetworks/core/udf/udaf/HdrHistogram/HdrHistogramTest.java b/groot-core/src/test/java/com/geedgenetworks/core/udf/udaf/HdrHistogram/HdrHistogramTest.java new file mode 100644 index 0000000..f177ca5 --- /dev/null +++ b/groot-core/src/test/java/com/geedgenetworks/core/udf/udaf/HdrHistogram/HdrHistogramTest.java @@ -0,0 +1,102 @@ +package com.geedgenetworks.core.udf.udaf.HdrHistogram; + +import com.geedgenetworks.common.Accumulator; +import com.geedgenetworks.common.Event; +import com.geedgenetworks.common.udf.AggregateFunction; +import com.geedgenetworks.common.udf.UDFContext; +import com.geedgenetworks.sketch.util.StringUtils; +import org.HdrHistogram.ArrayHistogram; +import org.junit.jupiter.api.Test; + +import java.nio.charset.StandardCharsets; +import java.util.*; + +import static org.junit.jupiter.api.Assertions.assertTrue; + +public class HdrHistogramTest { + AggregateFunction agg; + Accumulator acc; + Event event; + + @Test + public void inputRegular() { + double[] probabilities = new double[]{0, 0.1, 0.25, 0.5, 0.75, 1}; + initData( "regular", 2, "base64"); + long count = 100000; + Map fields = event.getExtractedFields(); + for (int i = 1; i <= count; i++) { + fields.put("ms", i); + agg.add(event, acc); + } + + long[] expects = Arrays.stream(probabilities).mapToLong(x -> (long) (count * x)).toArray(); + String str = (String) agg.getResult(acc).getMetricsFields().get("ms_his"); + ArrayHistogram his = ArrayHistogram.fromBytes(StringUtils.decodeBase64(str.getBytes(StandardCharsets.UTF_8))); + + for (int i = 0; i < expects.length; i++) { + long rst = his.getValueAtPercentile(probabilities[i] * 100); + long expect = expects[i]; + double probability = probabilities[i]; + double error = probability <= 0.01? 0 : Math.abs(rst - expect) / (double) expect; + System.out.println(String.format("%s:%d,%d,%.4f", probability, expect , rst , error)); + assertTrue(error <= 0.05); + } + } + + @Test + public void inputSketch() { + double[] probabilities = new double[]{0, 0.1, 0.25, 0.5, 0.75, 1}; + initData( "sketch", 2, "binary"); + long count = 100000; + Map fields = event.getExtractedFields(); + long[] expects = Arrays.stream(probabilities).mapToLong(x -> (long) (count * x)).toArray(); + + ArrayHistogram his = new ArrayHistogram(2); + for (int i = 1; i <= count; i++) { + his.recordValue(i); + } + fields.put("ms", StringUtils.encodeBase64String(his.toBytes())); + agg.add(event, acc); + + his = new ArrayHistogram(2); + for (int i = 1; i <= count; i++) { + his.recordValue(i); + } + fields.put("ms", his.toBytes()); + agg.add(event, acc); + + byte[] bytes = (byte[]) agg.getResult(acc).getMetricsFields().get("ms_his"); + ArrayHistogram h = ArrayHistogram.fromBytes(bytes); + + for (int i = 0; i < expects.length; i++) { + long rst = h.getValueAtPercentile(probabilities[i] * 100); + long expect = expects[i]; + double probability = probabilities[i]; + double error = probability <= 0.01? 0 : Math.abs(rst - expect) / (double) expect; + System.out.println(String.format("%s:%d,%d,%.4f", probability, expect , rst , error)); + assertTrue(error <= 0.05); + } + } + + private void initData(String input_type, int numberOfSignificantValueDigits, String output_format){ + agg = new HdrHistogram(); + UDFContext c = new UDFContext(); + Map parameters = new HashMap<>(); + parameters.put("input_type", input_type); + parameters.put("numberOfSignificantValueDigits", numberOfSignificantValueDigits); + parameters.put("output_format", output_format); + c.setParameters(parameters); + c.setLookup_fields(Collections.singletonList("ms")); + c.setOutput_fields(Collections.singletonList("ms_his")); + + agg.open(c); + Map map = new HashMap<>(); + acc = new Accumulator(); + acc.setMetricsFields(map); + agg.initAccumulator(acc); + + event = new Event(); + Map fields = new HashMap<>(); + event.setExtractedFields(fields); + } +} \ No newline at end of file diff --git a/groot-core/src/test/java/com/geedgenetworks/core/udf/udaf/hlld/HlldApproxCountDistinctTest.java b/groot-core/src/test/java/com/geedgenetworks/core/udf/udaf/hlld/HlldApproxCountDistinctTest.java new file mode 100644 index 0000000..eae356d --- /dev/null +++ b/groot-core/src/test/java/com/geedgenetworks/core/udf/udaf/hlld/HlldApproxCountDistinctTest.java @@ -0,0 +1,87 @@ +package com.geedgenetworks.core.udf.udaf.hlld; + + +import com.geedgenetworks.common.Accumulator; +import com.geedgenetworks.common.Event; +import com.geedgenetworks.common.udf.AggregateFunction; +import com.geedgenetworks.common.udf.UDFContext; +import com.geedgenetworks.sketch.hlld.Hll; +import com.geedgenetworks.sketch.util.StringUtils; +import org.junit.jupiter.api.Test; + +import java.util.Collections; +import java.util.HashMap; +import java.util.Map; + +import static org.junit.jupiter.api.Assertions.assertTrue; + + +public class HlldApproxCountDistinctTest { + AggregateFunction agg; + Accumulator acc; + Event event; + + + @Test + public void inputRegular() { + initData(14, "regular"); + long count = 100000; + Map fields = event.getExtractedFields(); + for (int i = 0; i < count; i++) { + fields.put("ip", i); + agg.add(event, acc); + } + + long rst = (long)agg.getResult(acc).getMetricsFields().get("ip_cnt"); + double error = Math.abs(rst - count) / (double) count; + System.out.println(String.format("%d,%d,%.4f", count , rst , error)); + assertTrue(error <= 0.05); + } + + @Test + public void inputSketch() { + initData(14, "sketch"); + long count = 150000; + Map fields = event.getExtractedFields(); + + Hll hll = new Hll(12); + for (int i = 0; i < 100000; i++) { + hll.add(i); + } + fields.put("ip", StringUtils.encodeBase64String(hll.toBytes())); + agg.add(event, acc); + + hll = new Hll(13); + for (int i = 50000; i < 150000; i++) { + hll.add(i); + } + fields.put("ip", hll.toBytes()); + agg.add(event, acc); + + long rst = (long)agg.getResult(acc).getMetricsFields().get("ip_cnt"); + double error = Math.abs(rst - count) / (double) count; + System.out.println(String.format("%d,%d,%.4f", count , rst , error)); + assertTrue(error <= 0.05); + } + + private void initData(int precision, String input_type){ + agg = new HlldApproxCountDistinct(); + UDFContext c = new UDFContext(); + Map parameters = new HashMap<>(); + parameters.put("precision", precision); + parameters.put("input_type", input_type); + c.setParameters(parameters); + c.setLookup_fields(Collections.singletonList("ip")); + c.setOutput_fields(Collections.singletonList("ip_cnt")); + + agg.open(c); + Map map = new HashMap<>(); + acc = new Accumulator(); + acc.setMetricsFields(map); + agg.initAccumulator(acc); + + event = new Event(); + Map fields = new HashMap<>(); + event.setExtractedFields(fields); + } +} \ No newline at end of file diff --git a/groot-core/src/test/java/com/geedgenetworks/core/udf/udaf/hlld/HlldTest.java b/groot-core/src/test/java/com/geedgenetworks/core/udf/udaf/hlld/HlldTest.java new file mode 100644 index 0000000..f489ee4 --- /dev/null +++ b/groot-core/src/test/java/com/geedgenetworks/core/udf/udaf/hlld/HlldTest.java @@ -0,0 +1,86 @@ +package com.geedgenetworks.core.udf.udaf.hlld; + +import com.geedgenetworks.common.Accumulator; +import com.geedgenetworks.common.Event; +import com.geedgenetworks.common.udf.AggregateFunction; +import com.geedgenetworks.common.udf.UDFContext; +import com.geedgenetworks.sketch.hlld.Hll; +import com.geedgenetworks.sketch.util.StringUtils; +import org.junit.jupiter.api.Test; + +import java.nio.charset.StandardCharsets; +import java.util.Collections; +import java.util.HashMap; +import java.util.Map; + +import static org.junit.jupiter.api.Assertions.assertTrue; + +public class HlldTest { + AggregateFunction agg; + Accumulator acc; + Event event; + + @Test + public void inputRegular() { + initData(14, "regular", "base64"); + long count = 100000; + Map fields = event.getExtractedFields(); + for (int i = 0; i < count; i++) { + fields.put("ip", i); + agg.add(event, acc); + } + + String hllStr = (String)agg.getResult(acc).getMetricsFields().get("ip_cnt"); + long rst = (long) Hll.fromBytes(StringUtils.decodeBase64(hllStr.getBytes(StandardCharsets.UTF_8))).size(); + double error = Math.abs(rst - count) / (double) count; + System.out.println(String.format("%d,%d,%.4f", count , rst , error)); + assertTrue(error <= 0.05); + } + + @Test + public void inputSketch() { + initData(14, "sketch", "binary"); + long count = 150000; + Map fields = event.getExtractedFields(); + for (int i = 0; i < 100000; i++) { + Hll hll = new Hll(12); + hll.add(i); + fields.put("ip", StringUtils.encodeBase64String(hll.toBytes())); + agg.add(event, acc); + } + for (int i = 50000; i < 150000; i++) { + Hll hll = new Hll(13); + hll.add(i); + fields.put("ip", hll.toBytes()); + agg.add(event, acc); + } + + byte[] hllBytes = (byte[])agg.getResult(acc).getMetricsFields().get("ip_cnt"); + long rst = (long) Hll.fromBytes(hllBytes).size(); + double error = Math.abs(rst - count) / (double) count; + System.out.println(String.format("%d,%d,%.4f", count , rst , error)); + assertTrue(error <= 0.05); + } + + private void initData(int precision, String input_type, String output_format){ + agg = new Hlld(); + UDFContext c = new UDFContext(); + Map parameters = new HashMap<>(); + parameters.put("precision", precision); + parameters.put("input_type", input_type); + parameters.put("output_format", output_format); + c.setParameters(parameters); + c.setLookup_fields(Collections.singletonList("ip")); + c.setOutput_fields(Collections.singletonList("ip_cnt")); + + agg.open(c); + Map map = new HashMap<>(); + acc = new Accumulator(); + acc.setMetricsFields(map); + agg.initAccumulator(acc); + + event = new Event(); + Map fields = new HashMap<>(); + event.setExtractedFields(fields); + } +} diff --git a/pom.xml b/pom.xml index a8f919b..53bcf1f 100644 --- a/pom.xml +++ b/pom.xml @@ -268,6 +268,11 @@ fastjson ${fastjson2.version} + + com.geedgenetworks + sketches + 1.0.0 + com.alibaba.nacos nacos-client -- cgit v1.2.3