diff options
| author | 李奉超 <[email protected]> | 2024-08-15 07:35:05 +0000 |
|---|---|---|
| committer | 李奉超 <[email protected]> | 2024-08-15 07:35:05 +0000 |
| commit | 91549fc30b0d621edd73ab834149a5c36ca37c09 (patch) | |
| tree | 7689fa7f5901e8b57224bbd8215add4acef36680 | |
| parent | cba6a89bade113b208fc24411300a931a64c5db1 (diff) | |
| parent | cbdd7f96b4e342c9e8b296e231ea655fca139f5d (diff) | |
Merge branch 'sketch' into 'develop'
[feature][core]添加sketche聚合函数和sketche mock类型
See merge request galaxy/platform/groot-stream!93
20 files changed, 980 insertions, 2 deletions
diff --git a/config/udf.plugins b/config/udf.plugins index 31d1b21..772d2bc 100644 --- a/config/udf.plugins +++ b/config/udf.plugins @@ -23,4 +23,9 @@ com.geedgenetworks.core.udf.udaf.Mean com.geedgenetworks.core.udf.udaf.LastValue com.geedgenetworks.core.udf.udaf.FirstValue com.geedgenetworks.core.udf.udtf.JsonUnroll -com.geedgenetworks.core.udf.udtf.Unroll
\ No newline at end of file +com.geedgenetworks.core.udf.udtf.Unroll +com.geedgenetworks.core.udf.udaf.hlld.Hlld +com.geedgenetworks.core.udf.udaf.hlld.HlldApproxCountDistinct +com.geedgenetworks.core.udf.udaf.HdrHistogram.HdrHistogram +com.geedgenetworks.core.udf.udaf.HdrHistogram.HdrHistogramQuantile +com.geedgenetworks.core.udf.udaf.HdrHistogram.HdrHistogramQuantiles
\ No newline at end of file diff --git a/groot-common/src/main/resources/udf.plugins b/groot-common/src/main/resources/udf.plugins index 18446c9..0062c1a 100644 --- a/groot-common/src/main/resources/udf.plugins +++ b/groot-common/src/main/resources/udf.plugins @@ -22,4 +22,9 @@ com.geedgenetworks.core.udf.udaf.Mean com.geedgenetworks.core.udf.udaf.LastValue com.geedgenetworks.core.udf.udaf.FirstValue com.geedgenetworks.core.udf.udtf.JsonUnroll -com.geedgenetworks.core.udf.udtf.Unroll
\ No newline at end of file +com.geedgenetworks.core.udf.udtf.Unroll +com.geedgenetworks.core.udf.udaf.hlld.Hlld +com.geedgenetworks.core.udf.udaf.hlld.HlldApproxCountDistinct +com.geedgenetworks.core.udf.udaf.HdrHistogram.HdrHistogram +com.geedgenetworks.core.udf.udaf.HdrHistogram.HdrHistogramQuantile +com.geedgenetworks.core.udf.udaf.HdrHistogram.HdrHistogramQuantiles
\ No newline at end of file diff --git a/groot-connectors/connector-kafka/pom.xml b/groot-connectors/connector-kafka/pom.xml index 7ec7d86..448383b 100644 --- a/groot-connectors/connector-kafka/pom.xml +++ b/groot-connectors/connector-kafka/pom.xml @@ -15,6 +15,17 @@ <groupId>org.apache.flink</groupId> <artifactId>flink-connector-kafka_${scala.version}</artifactId> <version>${flink.version}</version> + <exclusions> + <exclusion> + <groupId>org.xerial.snappy</groupId> + <artifactId>snappy-java</artifactId> + </exclusion> + </exclusions> + </dependency> + <dependency> + <groupId>org.xerial.snappy</groupId> + <artifactId>snappy-java</artifactId> + <version>1.1.8.3</version> </dependency> </dependencies> diff --git a/groot-connectors/connector-mock/src/main/java/com/geedgenetworks/connectors/mock/faker/FakerUtils.java b/groot-connectors/connector-mock/src/main/java/com/geedgenetworks/connectors/mock/faker/FakerUtils.java index 09cc8f8..5101fa1 100644 --- a/groot-connectors/connector-mock/src/main/java/com/geedgenetworks/connectors/mock/faker/FakerUtils.java +++ b/groot-connectors/connector-mock/src/main/java/com/geedgenetworks/connectors/mock/faker/FakerUtils.java @@ -46,6 +46,10 @@ public class FakerUtils { return wrapFaker(parseIPv4Faker(obj), obj);
} else if ("Expression".equalsIgnoreCase(type)) {
return wrapFaker(parseExpressionFaker(obj), obj);
+ } else if ("Hlld".equalsIgnoreCase(type)) {
+ return wrapFaker(parseHlldFaker(obj), obj);
+ } else if ("HdrHistogram".equalsIgnoreCase(type)) {
+ return wrapFaker(parseHdrHistogramFaker(obj), obj);
} else if ("Object".equalsIgnoreCase(type)) {
return wrapFaker(parseObjectFaker(obj.getJSONArray("fields")), obj);
} else if ("Union".equalsIgnoreCase(type)) {
@@ -109,6 +113,20 @@ public class FakerUtils { return new ExpressionFaker(expression);
}
+ private static Faker<?> parseHlldFaker(JSONObject obj) {
+ long itemCount = obj.getLongValue("itemCount", 1000000L);
+ int batchCount = obj.getIntValue("batchCount", 10000);
+ int precision = obj.getIntValue("precision", 12);
+ return new HlldFaker(itemCount, batchCount, precision);
+ }
+
+ private static Faker<?> parseHdrHistogramFaker(JSONObject obj) {
+ int max = obj.getIntValue("max", 100000);
+ int batchCount = obj.getIntValue("batchCount", 1000);
+ int numberOfSignificantValueDigits = obj.getIntValue("numberOfSignificantValueDigits", 1);
+ return new HdrHistogramFaker(max, batchCount, numberOfSignificantValueDigits);
+ }
+
private static Faker<?> parseIPv4Faker(JSONObject obj) {
String start = obj.getString("start");
String end = obj.getString("end");
diff --git a/groot-connectors/connector-mock/src/main/java/com/geedgenetworks/connectors/mock/faker/HdrHistogramFaker.java b/groot-connectors/connector-mock/src/main/java/com/geedgenetworks/connectors/mock/faker/HdrHistogramFaker.java new file mode 100644 index 0000000..393bf8e --- /dev/null +++ b/groot-connectors/connector-mock/src/main/java/com/geedgenetworks/connectors/mock/faker/HdrHistogramFaker.java @@ -0,0 +1,35 @@ +package com.geedgenetworks.connectors.mock.faker;
+
+import com.geedgenetworks.sketch.util.StringUtils;
+import org.HdrHistogram.HistogramSketch;
+
+import java.util.concurrent.ThreadLocalRandom;
+
+public class HdrHistogramFaker extends Faker<Object> {
+ private final int max;
+ private final int batchCount;
+ private final int numberOfSignificantValueDigits;
+ private HistogramSketch his;
+
+ public HdrHistogramFaker(int max, int batchCount, int numberOfSignificantValueDigits) {
+ this.max = max;
+ this.batchCount = batchCount;
+ this.numberOfSignificantValueDigits = numberOfSignificantValueDigits;
+ }
+
+ @Override
+ public void init(int instanceCount, int instanceIndex) throws Exception {
+ his = new HistogramSketch(1L, max, numberOfSignificantValueDigits, false);
+ }
+
+ @Override
+ public Object geneValue() throws Exception {
+ his.reset();
+ ThreadLocalRandom random = ThreadLocalRandom.current();
+ for (int i = 0; i < batchCount; i++) {
+ his.recordValue(random.nextInt(max));
+ }
+ return StringUtils.encodeBase64String(his.toBytes());
+ }
+
+}
diff --git a/groot-connectors/connector-mock/src/main/java/com/geedgenetworks/connectors/mock/faker/HlldFaker.java b/groot-connectors/connector-mock/src/main/java/com/geedgenetworks/connectors/mock/faker/HlldFaker.java new file mode 100644 index 0000000..5af2b35 --- /dev/null +++ b/groot-connectors/connector-mock/src/main/java/com/geedgenetworks/connectors/mock/faker/HlldFaker.java @@ -0,0 +1,39 @@ +package com.geedgenetworks.connectors.mock.faker;
+
+import com.geedgenetworks.sketch.hlld.Hll;
+import com.geedgenetworks.sketch.util.StringUtils;
+
+import java.util.concurrent.ThreadLocalRandom;
+
+public class HlldFaker extends Faker<Object> {
+ private final long itemCount;
+ private final int batchCount;
+ private final int precision;
+ private Hll hll;
+
+ public HlldFaker(long itemCount, int batchCount, int precision) {
+ this.itemCount = itemCount;
+ this.batchCount = batchCount;
+ this.precision = precision;
+ }
+
+ public HlldFaker(long itemCount, int batchCount) {
+ this(itemCount, batchCount, 12);
+ }
+
+ @Override
+ public void init(int instanceCount, int instanceIndex) throws Exception {
+ hll = new Hll(precision);
+ }
+
+ @Override
+ public Object geneValue() throws Exception {
+ hll.reset();
+ ThreadLocalRandom random = ThreadLocalRandom.current();
+ for (int i = 0; i < batchCount; i++) {
+ hll.add(random.nextLong(itemCount));
+ }
+ return StringUtils.encodeBase64String(hll.toBytes());
+ }
+
+}
diff --git a/groot-core/pom.xml b/groot-core/pom.xml index 18ae33b..e723fa5 100644 --- a/groot-core/pom.xml +++ b/groot-core/pom.xml @@ -66,6 +66,11 @@ </dependency> <dependency> + <groupId>com.geedgenetworks</groupId> + <artifactId>sketches</artifactId> + </dependency> + + <dependency> <groupId>com.alibaba.nacos</groupId> <artifactId>nacos-client</artifactId> <exclusions> diff --git a/groot-core/src/main/java/com/geedgenetworks/core/udf/udaf/HdrHistogram/HdrHistogram.java b/groot-core/src/main/java/com/geedgenetworks/core/udf/udaf/HdrHistogram/HdrHistogram.java new file mode 100644 index 0000000..368e8c1 --- /dev/null +++ b/groot-core/src/main/java/com/geedgenetworks/core/udf/udaf/HdrHistogram/HdrHistogram.java @@ -0,0 +1,42 @@ +package com.geedgenetworks.core.udf.udaf.HdrHistogram;
+
+import com.geedgenetworks.common.Accumulator;
+import com.geedgenetworks.common.udf.UDFContext;
+import com.geedgenetworks.sketch.util.StringUtils;
+import org.HdrHistogram.Histogramer;
+
+import java.util.Map;
+
+public class HdrHistogram extends HdrHistogramBaseAggregate {
+ boolean outputBase64;
+
+ @Override
+ public void open(UDFContext c) {
+ super.open(c);
+ Map<String, Object> params = c.getParameters();
+ outputBase64 = "base64".equalsIgnoreCase(params.getOrDefault("output_format", "base64").toString());
+ }
+
+ @Override
+ public Accumulator getResult(Accumulator acc) {
+ Object agg = acc.getMetricsFields().get(outputField);
+ if (agg == null) {
+ return acc;
+ }
+
+ byte[] bytes = ((Histogramer) agg).toBytes();
+ if (outputBase64) {
+ acc.getMetricsFields().put(outputField, StringUtils.encodeBase64String(bytes));
+ } else {
+ acc.getMetricsFields().put(outputField, bytes);
+ }
+
+ return acc;
+ }
+
+ @Override
+ public String functionName() {
+ return "HDR_HISTOGRAM";
+ }
+
+}
diff --git a/groot-core/src/main/java/com/geedgenetworks/core/udf/udaf/HdrHistogram/HdrHistogramBaseAggregate.java b/groot-core/src/main/java/com/geedgenetworks/core/udf/udaf/HdrHistogram/HdrHistogramBaseAggregate.java new file mode 100644 index 0000000..1648fa5 --- /dev/null +++ b/groot-core/src/main/java/com/geedgenetworks/core/udf/udaf/HdrHistogram/HdrHistogramBaseAggregate.java @@ -0,0 +1,101 @@ +package com.geedgenetworks.core.udf.udaf.HdrHistogram;
+
+import com.geedgenetworks.common.Accumulator;
+import com.geedgenetworks.common.Event;
+import com.geedgenetworks.common.udf.AggregateFunction;
+import com.geedgenetworks.common.udf.UDFContext;
+import com.geedgenetworks.sketch.util.StringUtils;
+import org.HdrHistogram.ArrayHistogram;
+import org.HdrHistogram.DirectMapHistogram;
+import org.HdrHistogram.Histogramer;
+import org.apache.commons.collections.CollectionUtils;
+
+import java.nio.charset.StandardCharsets;
+import java.util.Map;
+
+public abstract class HdrHistogramBaseAggregate implements AggregateFunction {
+ protected String inputField;
+ protected String outputField;
+ protected boolean inputSketch;
+ protected long lowestDiscernibleValue;
+ protected long highestTrackableValue;
+ protected int numberOfSignificantValueDigits;
+ protected boolean autoResize;
+
+ @Override
+ public void open(UDFContext c) {
+ inputField = c.getLookup_fields().get(0);
+ if (CollectionUtils.isNotEmpty(c.getOutput_fields())) {
+ outputField = c.getOutput_fields().get(0);
+ } else {
+ outputField = inputField;
+ }
+ Map<String, Object> params = c.getParameters();
+ lowestDiscernibleValue = Long.parseLong(params.getOrDefault("lowestDiscernibleValue", "1").toString());
+ highestTrackableValue = Long.parseLong(params.getOrDefault("highestTrackableValue", "2").toString());
+ numberOfSignificantValueDigits = Integer.parseInt(params.getOrDefault("numberOfSignificantValueDigits", "1").toString());
+ autoResize = Boolean.valueOf(params.getOrDefault("autoResize", "true").toString());
+ inputSketch = "sketch".equalsIgnoreCase(params.getOrDefault("input_type", "sketch").toString());
+ }
+
+ @Override
+ public Accumulator initAccumulator(Accumulator acc) {
+ return acc;
+ }
+
+ @Override
+ public Accumulator add(Event event, Accumulator acc) {
+ Object value = event.getExtractedFields().get(inputField);
+ if (value == null) {
+ return acc;
+ }
+
+ if (inputSketch) {
+ updateHdrMerge(acc, value);
+ } else {
+ updateHdr(acc, value);
+ }
+
+ return acc;
+ }
+
+ protected void updateHdr(Accumulator acc, Object value) {
+ Map<String, Object> aggs = acc.getMetricsFields();
+ ArrayHistogram his = (ArrayHistogram) aggs.get(outputField);
+ if (his == null) {
+ his = new ArrayHistogram(lowestDiscernibleValue, highestTrackableValue, numberOfSignificantValueDigits);
+ his.setAutoResize(autoResize);
+ aggs.put(outputField, his);
+ }
+
+ his.recordValue(((Number) value).longValue());
+ }
+
+
+ protected void updateHdrMerge(Accumulator acc, Object value) {
+ Map<String, Object> aggs = acc.getMetricsFields();
+ ArrayHistogram his = (ArrayHistogram) aggs.get(outputField);
+ if (his == null) {
+ his = new ArrayHistogram(lowestDiscernibleValue, highestTrackableValue, numberOfSignificantValueDigits);
+ his.setAutoResize(autoResize);
+ aggs.put(outputField, his);
+ }
+
+ Histogramer h;
+ if (value instanceof String) {
+ byte[] bytes = StringUtils.decodeBase64(((String) value).getBytes(StandardCharsets.UTF_8));
+ h = DirectMapHistogram.wrapBytes(bytes);
+ } else if (value instanceof byte[]) {
+ h = DirectMapHistogram.wrapBytes((byte[]) value);
+ } else if (value instanceof Histogramer) {
+ h = (Histogramer) value;
+ } else {
+ throw new IllegalArgumentException("Unsupported type " + value.getClass());
+ }
+
+ his.merge(h);
+ }
+
+ @Override
+ public void close() {}
+}
diff --git a/groot-core/src/main/java/com/geedgenetworks/core/udf/udaf/HdrHistogram/HdrHistogramQuantile.java b/groot-core/src/main/java/com/geedgenetworks/core/udf/udaf/HdrHistogram/HdrHistogramQuantile.java new file mode 100644 index 0000000..b9f7d5b --- /dev/null +++ b/groot-core/src/main/java/com/geedgenetworks/core/udf/udaf/HdrHistogram/HdrHistogramQuantile.java @@ -0,0 +1,36 @@ +package com.geedgenetworks.core.udf.udaf.HdrHistogram;
+
+import com.geedgenetworks.common.Accumulator;
+import com.geedgenetworks.common.udf.UDFContext;
+import org.HdrHistogram.Histogramer;
+
+import java.util.Map;
+
+public class HdrHistogramQuantile extends HdrHistogramBaseAggregate {
+ Double probability;
+
+ @Override
+ public void open(UDFContext c) {
+ super.open(c);
+ Map<String, Object> params = c.getParameters();
+ probability = Double.parseDouble(params.getOrDefault("probability", "0.5").toString());
+ }
+
+ @Override
+ public Accumulator getResult(Accumulator acc) {
+ Object agg = acc.getMetricsFields().get(outputField);
+ if (agg == null) {
+ return acc;
+ }
+
+ long percentile = ((Histogramer) agg).getValueAtPercentile(probability * 100);
+ acc.getMetricsFields().put(outputField, percentile);
+ return acc;
+ }
+
+ @Override
+ public String functionName() {
+ return "APPROX_QUANTILE_HDR";
+ }
+
+}
diff --git a/groot-core/src/main/java/com/geedgenetworks/core/udf/udaf/HdrHistogram/HdrHistogramQuantiles.java b/groot-core/src/main/java/com/geedgenetworks/core/udf/udaf/HdrHistogram/HdrHistogramQuantiles.java new file mode 100644 index 0000000..ccfffd3 --- /dev/null +++ b/groot-core/src/main/java/com/geedgenetworks/core/udf/udaf/HdrHistogram/HdrHistogramQuantiles.java @@ -0,0 +1,51 @@ +package com.geedgenetworks.core.udf.udaf.HdrHistogram;
+
+import com.alibaba.fastjson2.JSON;
+import com.geedgenetworks.common.Accumulator;
+import com.geedgenetworks.common.udf.UDFContext;
+import org.HdrHistogram.Histogramer;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+
+public class HdrHistogramQuantiles extends HdrHistogramBaseAggregate {
+ double[] probabilities;
+
+ @Override
+ public void open(UDFContext c) {
+ super.open(c);
+ Map<String, Object> params = c.getParameters();
+ Object ps = params.get("probabilities");
+ if(ps == null){
+ throw new IllegalArgumentException("probabilities param is requested");
+ }
+ List<Double> floats = JSON.parseArray(ps instanceof String ? ps.toString(): JSON.toJSONString(ps), Double.class);
+ probabilities = new double[floats.size()];
+ for (int i = 0; i < floats.size(); i++) {
+ probabilities[i] = floats.get(i);
+ }
+ }
+
+ @Override
+ public Accumulator getResult(Accumulator acc) {
+ Object agg = acc.getMetricsFields().get(outputField);
+ if (agg == null) {
+ return acc;
+ }
+
+ Histogramer his = ((Histogramer) agg);
+ final List<Long> counts = new ArrayList<>(probabilities.length);
+ for (int i = 0; i < probabilities.length; i++) {
+ counts.add(his.getValueAtPercentile(probabilities[i] * 100));
+ }
+ acc.getMetricsFields().put(outputField, counts);
+ return acc;
+ }
+
+ @Override
+ public String functionName() {
+ return "APPROX_QUANTILES_HDR";
+ }
+
+}
diff --git a/groot-core/src/main/java/com/geedgenetworks/core/udf/udaf/hlld/Hlld.java b/groot-core/src/main/java/com/geedgenetworks/core/udf/udaf/hlld/Hlld.java new file mode 100644 index 0000000..e373a7a --- /dev/null +++ b/groot-core/src/main/java/com/geedgenetworks/core/udf/udaf/hlld/Hlld.java @@ -0,0 +1,40 @@ +package com.geedgenetworks.core.udf.udaf.hlld;
+
+import com.geedgenetworks.common.Accumulator;
+import com.geedgenetworks.common.udf.UDFContext;
+import com.geedgenetworks.sketch.hlld.Hll;
+import com.geedgenetworks.sketch.util.StringUtils;
+
+import java.util.Map;
+
+public class Hlld extends HlldBaseAggregate {
+ boolean outputBase64;
+
+ @Override
+ public void open(UDFContext c) {
+ super.open(c);
+ Map<String, Object> params = c.getParameters();
+ outputBase64 = "base64".equalsIgnoreCase(params.getOrDefault("output_format", "base64").toString());
+ }
+
+ @Override
+ public Accumulator getResult(Accumulator acc) {
+ Hll hll = getResultHll(acc);
+ if (hll == null) {
+ return acc;
+ }
+
+ if (outputBase64) {
+ acc.getMetricsFields().put(outputField, StringUtils.encodeBase64String(hll.toBytes()));
+ } else {
+ acc.getMetricsFields().put(outputField, hll.toBytes());
+ }
+
+ return acc;
+ }
+
+ @Override
+ public String functionName() {
+ return "HLLD";
+ }
+}
diff --git a/groot-core/src/main/java/com/geedgenetworks/core/udf/udaf/hlld/HlldApproxCountDistinct.java b/groot-core/src/main/java/com/geedgenetworks/core/udf/udaf/hlld/HlldApproxCountDistinct.java new file mode 100644 index 0000000..ec003f8 --- /dev/null +++ b/groot-core/src/main/java/com/geedgenetworks/core/udf/udaf/hlld/HlldApproxCountDistinct.java @@ -0,0 +1,25 @@ +package com.geedgenetworks.core.udf.udaf.hlld;
+
+import com.geedgenetworks.common.Accumulator;
+import com.geedgenetworks.sketch.hlld.Hll;
+
+public class HlldApproxCountDistinct extends HlldBaseAggregate {
+
+ @Override
+ public Accumulator getResult(Accumulator acc) {
+ Hll hll = getResultHll(acc);
+ if (hll == null) {
+ return acc;
+ }
+
+ acc.getMetricsFields().put(outputField, (long)hll.size());
+
+ return acc;
+ }
+
+ @Override
+ public String functionName() {
+ return "APPROX_COUNT_DISTINCT_HLLD";
+ }
+
+}
diff --git a/groot-core/src/main/java/com/geedgenetworks/core/udf/udaf/hlld/HlldBaseAggregate.java b/groot-core/src/main/java/com/geedgenetworks/core/udf/udaf/hlld/HlldBaseAggregate.java new file mode 100644 index 0000000..71d61dc --- /dev/null +++ b/groot-core/src/main/java/com/geedgenetworks/core/udf/udaf/hlld/HlldBaseAggregate.java @@ -0,0 +1,98 @@ +package com.geedgenetworks.core.udf.udaf.hlld;
+
+import com.geedgenetworks.common.Accumulator;
+import com.geedgenetworks.common.Event;
+import com.geedgenetworks.common.udf.AggregateFunction;
+import com.geedgenetworks.common.udf.UDFContext;
+import com.geedgenetworks.sketch.hlld.Hll;
+import com.geedgenetworks.sketch.hlld.HllUnion;
+import com.geedgenetworks.sketch.hlld.HllUtils;
+import org.apache.commons.collections.CollectionUtils;
+
+import java.util.Map;
+
+public abstract class HlldBaseAggregate implements AggregateFunction {
+ protected String inputField;
+ protected String outputField;
+ protected boolean inputSketch;
+ protected int precision = 12;
+
+ @Override
+ public void open(UDFContext c) {
+ inputField = c.getLookup_fields().get(0);
+ if (CollectionUtils.isNotEmpty(c.getOutput_fields())) {
+ outputField = c.getOutput_fields().get(0);
+ } else {
+ outputField = inputField;
+ }
+ Map<String, Object> params = c.getParameters();
+ precision = Integer.parseInt(params.getOrDefault("precision", "12").toString());
+ inputSketch = "sketch".equalsIgnoreCase(params.getOrDefault("input_type", "sketch").toString());
+ }
+
+ @Override
+ public Accumulator initAccumulator(Accumulator acc) {
+ return acc;
+ }
+
+ @Override
+ public Accumulator add(Event event, Accumulator acc) {
+ Object value = event.getExtractedFields().get(inputField);
+ if (value == null) {
+ return acc;
+ }
+
+ if (inputSketch) {
+ updateHllUnion(acc, value);
+ } else {
+ updateHll(acc, value);
+ }
+
+ return acc;
+ }
+
+ protected Hll getResultHll(Accumulator acc){
+ Object agg = acc.getMetricsFields().get(outputField);
+ if (agg == null) {
+ return null;
+ }
+
+ return inputSketch ? ((HllUnion) agg).getResult() : (Hll) agg;
+ }
+
+ protected void updateHll(Accumulator acc, Object value) {
+ Map<String, Object> aggs = acc.getMetricsFields();
+ Hll hll = (Hll) aggs.get(outputField);
+ if (hll == null) {
+ hll = new Hll(precision);
+ aggs.put(outputField, hll);
+ }
+
+ if (value instanceof Integer || value instanceof Long) {
+ hll.add(((Number) value).longValue());
+ } else if (value instanceof Float || value instanceof Double) {
+ hll.add(((Number) value).doubleValue());
+ } else if (value instanceof String) {
+ hll.add((String) value);
+ } else if (value instanceof byte[]) {
+ hll.add((byte[]) value);
+ } else {
+ throw new IllegalArgumentException("Unsupported type " + value.getClass());
+ }
+ }
+
+ protected void updateHllUnion(Accumulator acc, Object value) {
+ Map<String, Object> aggs = acc.getMetricsFields();
+ HllUnion hllUnion = (HllUnion) aggs.get(outputField);
+ if (hllUnion == null) {
+ hllUnion = new HllUnion(precision);
+ aggs.put(outputField, hllUnion);
+ }
+
+ Hll hll = HllUtils.deserializeHll(value);
+ hllUnion.update(hll);
+ }
+
+ @Override
+ public void close() {}
+}
diff --git a/groot-core/src/test/java/com/geedgenetworks/core/udf/udaf/HdrHistogram/HdrHistogramQuantileTest.java b/groot-core/src/test/java/com/geedgenetworks/core/udf/udaf/HdrHistogram/HdrHistogramQuantileTest.java new file mode 100644 index 0000000..33f7bad --- /dev/null +++ b/groot-core/src/test/java/com/geedgenetworks/core/udf/udaf/HdrHistogram/HdrHistogramQuantileTest.java @@ -0,0 +1,89 @@ +package com.geedgenetworks.core.udf.udaf.HdrHistogram;
+
+import com.geedgenetworks.common.Accumulator;
+import com.geedgenetworks.common.Event;
+import com.geedgenetworks.common.udf.AggregateFunction;
+import com.geedgenetworks.common.udf.UDFContext;
+import com.geedgenetworks.sketch.util.StringUtils;
+import org.HdrHistogram.ArrayHistogram;
+import org.junit.jupiter.api.Test;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+
+import static org.junit.jupiter.api.Assertions.*;
+
+public class HdrHistogramQuantileTest {
+ AggregateFunction agg;
+ Accumulator acc;
+ Event event;
+
+ @Test
+ public void inputRegular() {
+ double probability = 0.5;
+ initData( "regular", 2, probability);
+ long count = 100000;
+ Map<String, Object> fields = event.getExtractedFields();
+ for (int i = 1; i <= count; i++) {
+ fields.put("ms", i);
+ agg.add(event, acc);
+ }
+
+ long expect = (long) (count * probability);
+ long rst = (long)agg.getResult(acc).getMetricsFields().get("ms_his");
+ double error = Math.abs(rst - expect) / (double) expect;
+ System.out.println(String.format("%s:%d,%d,%.4f", probability, expect , rst , error));
+ assertTrue(error <= 0.05);
+ }
+
+ @Test
+ public void inputSketch() {
+ double probability = 0.5;
+ initData( "sketch", 2, probability);
+ long count = 100000;
+ Map<String, Object> fields = event.getExtractedFields();
+
+ ArrayHistogram his = new ArrayHistogram(2);
+ for (int i = 1; i <= count; i++) {
+ his.recordValue(i);
+ }
+ fields.put("ms", StringUtils.encodeBase64String(his.toBytes()));
+ agg.add(event, acc);
+
+ his = new ArrayHistogram(2);
+ for (int i = 1; i <= count; i++) {
+ his.recordValue(i);
+ }
+ fields.put("ms", his.toBytes());
+ agg.add(event, acc);
+
+ long expect = (long) (count * probability);
+ long rst = (long)agg.getResult(acc).getMetricsFields().get("ms_his");
+ double error = Math.abs(rst - expect) / (double) expect;
+ System.out.println(String.format("%s:%d,%d,%.4f", probability, expect , rst , error));
+ assertTrue(error <= 0.05);
+ }
+
+ private void initData(String input_type, int numberOfSignificantValueDigits, double probability){
+ agg = new HdrHistogramQuantile();
+ UDFContext c = new UDFContext();
+ Map<String, Object> parameters = new HashMap<>();
+ parameters.put("input_type", input_type);
+ parameters.put("numberOfSignificantValueDigits", numberOfSignificantValueDigits);
+ parameters.put("probability", probability);
+ c.setParameters(parameters);
+ c.setLookup_fields(Collections.singletonList("ms"));
+ c.setOutput_fields(Collections.singletonList("ms_his"));
+
+ agg.open(c);
+ Map<String, Object> map = new HashMap<>();
+ acc = new Accumulator();
+ acc.setMetricsFields(map);
+ agg.initAccumulator(acc);
+
+ event = new Event();
+ Map<String, Object> fields = new HashMap<>();
+ event.setExtractedFields(fields);
+ }
+}
\ No newline at end of file diff --git a/groot-core/src/test/java/com/geedgenetworks/core/udf/udaf/HdrHistogram/HdrHistogramQuantilesTest.java b/groot-core/src/test/java/com/geedgenetworks/core/udf/udaf/HdrHistogram/HdrHistogramQuantilesTest.java new file mode 100644 index 0000000..4eefd9a --- /dev/null +++ b/groot-core/src/test/java/com/geedgenetworks/core/udf/udaf/HdrHistogram/HdrHistogramQuantilesTest.java @@ -0,0 +1,98 @@ +package com.geedgenetworks.core.udf.udaf.HdrHistogram;
+
+import com.geedgenetworks.common.Accumulator;
+import com.geedgenetworks.common.Event;
+import com.geedgenetworks.common.udf.AggregateFunction;
+import com.geedgenetworks.common.udf.UDFContext;
+import com.geedgenetworks.sketch.util.StringUtils;
+import org.HdrHistogram.ArrayHistogram;
+import org.junit.jupiter.api.Test;
+
+import java.util.*;
+
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+public class HdrHistogramQuantilesTest {
+ AggregateFunction agg;
+ Accumulator acc;
+ Event event;
+
+ @Test
+ public void inputRegular() {
+ double[] probabilities = new double[]{0, 0.1, 0.25, 0.5, 0.75, 1};
+ initData( "regular", 2, probabilities);
+ long count = 100000;
+ Map<String, Object> fields = event.getExtractedFields();
+ for (int i = 1; i <= count; i++) {
+ fields.put("ms", i);
+ agg.add(event, acc);
+ }
+
+ long[] expects = Arrays.stream(probabilities).mapToLong(x -> (long) (count * x)).toArray();
+
+ List<Long> rsts = (List<Long>)agg.getResult(acc).getMetricsFields().get("ms_his");
+ for (int i = 0; i < expects.length; i++) {
+ long rst = rsts.get(i);
+ long expect = expects[i];
+ double probability = probabilities[i];
+ double error = probability <= 0.01? 0 : Math.abs(rst - expect) / (double) expect;
+ System.out.println(String.format("%s:%d,%d,%.4f", probability, expect , rst , error));
+ assertTrue(error <= 0.05);
+ }
+ }
+
+ @Test
+ public void inputSketch() {
+ double[] probabilities = new double[]{0, 0.1, 0.25, 0.5, 0.75, 1};
+ initData( "sketch", 2, probabilities);
+ long count = 100000;
+ Map<String, Object> fields = event.getExtractedFields();
+ long[] expects = Arrays.stream(probabilities).mapToLong(x -> (long) (count * x)).toArray();
+
+ ArrayHistogram his = new ArrayHistogram(2);
+ for (int i = 1; i <= count; i++) {
+ his.recordValue(i);
+ }
+ fields.put("ms", StringUtils.encodeBase64String(his.toBytes()));
+ agg.add(event, acc);
+
+ his = new ArrayHistogram(2);
+ for (int i = 1; i <= count; i++) {
+ his.recordValue(i);
+ }
+ fields.put("ms", his.toBytes());
+ agg.add(event, acc);
+
+ List<Long> rsts = (List<Long>)agg.getResult(acc).getMetricsFields().get("ms_his");
+ for (int i = 0; i < expects.length; i++) {
+ long rst = rsts.get(i);
+ long expect = expects[i];
+ double probability = probabilities[i];
+ double error = probability <= 0.01? 0 : Math.abs(rst - expect) / (double) expect;
+ System.out.println(String.format("%s:%d,%d,%.4f", probability, expect , rst , error));
+ assertTrue(error <= 0.05);
+ }
+ }
+
+ private void initData(String input_type, int numberOfSignificantValueDigits, double[] probabilities){
+ agg = new HdrHistogramQuantiles();
+ UDFContext c = new UDFContext();
+ Map<String, Object> parameters = new HashMap<>();
+ parameters.put("input_type", input_type);
+ parameters.put("numberOfSignificantValueDigits", numberOfSignificantValueDigits);
+ parameters.put("probabilities", probabilities);
+ c.setParameters(parameters);
+ c.setLookup_fields(Collections.singletonList("ms"));
+ c.setOutput_fields(Collections.singletonList("ms_his"));
+
+ agg.open(c);
+ Map<String, Object> map = new HashMap<>();
+ acc = new Accumulator();
+ acc.setMetricsFields(map);
+ agg.initAccumulator(acc);
+
+ event = new Event();
+ Map<String, Object> fields = new HashMap<>();
+ event.setExtractedFields(fields);
+ }
+}
\ No newline at end of file diff --git a/groot-core/src/test/java/com/geedgenetworks/core/udf/udaf/HdrHistogram/HdrHistogramTest.java b/groot-core/src/test/java/com/geedgenetworks/core/udf/udaf/HdrHistogram/HdrHistogramTest.java new file mode 100644 index 0000000..f177ca5 --- /dev/null +++ b/groot-core/src/test/java/com/geedgenetworks/core/udf/udaf/HdrHistogram/HdrHistogramTest.java @@ -0,0 +1,102 @@ +package com.geedgenetworks.core.udf.udaf.HdrHistogram;
+
+import com.geedgenetworks.common.Accumulator;
+import com.geedgenetworks.common.Event;
+import com.geedgenetworks.common.udf.AggregateFunction;
+import com.geedgenetworks.common.udf.UDFContext;
+import com.geedgenetworks.sketch.util.StringUtils;
+import org.HdrHistogram.ArrayHistogram;
+import org.junit.jupiter.api.Test;
+
+import java.nio.charset.StandardCharsets;
+import java.util.*;
+
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+public class HdrHistogramTest {
+ AggregateFunction agg;
+ Accumulator acc;
+ Event event;
+
+ @Test
+ public void inputRegular() {
+ double[] probabilities = new double[]{0, 0.1, 0.25, 0.5, 0.75, 1};
+ initData( "regular", 2, "base64");
+ long count = 100000;
+ Map<String, Object> fields = event.getExtractedFields();
+ for (int i = 1; i <= count; i++) {
+ fields.put("ms", i);
+ agg.add(event, acc);
+ }
+
+ long[] expects = Arrays.stream(probabilities).mapToLong(x -> (long) (count * x)).toArray();
+ String str = (String) agg.getResult(acc).getMetricsFields().get("ms_his");
+ ArrayHistogram his = ArrayHistogram.fromBytes(StringUtils.decodeBase64(str.getBytes(StandardCharsets.UTF_8)));
+
+ for (int i = 0; i < expects.length; i++) {
+ long rst = his.getValueAtPercentile(probabilities[i] * 100);
+ long expect = expects[i];
+ double probability = probabilities[i];
+ double error = probability <= 0.01? 0 : Math.abs(rst - expect) / (double) expect;
+ System.out.println(String.format("%s:%d,%d,%.4f", probability, expect , rst , error));
+ assertTrue(error <= 0.05);
+ }
+ }
+
+ @Test
+ public void inputSketch() {
+ double[] probabilities = new double[]{0, 0.1, 0.25, 0.5, 0.75, 1};
+ initData( "sketch", 2, "binary");
+ long count = 100000;
+ Map<String, Object> fields = event.getExtractedFields();
+ long[] expects = Arrays.stream(probabilities).mapToLong(x -> (long) (count * x)).toArray();
+
+ ArrayHistogram his = new ArrayHistogram(2);
+ for (int i = 1; i <= count; i++) {
+ his.recordValue(i);
+ }
+ fields.put("ms", StringUtils.encodeBase64String(his.toBytes()));
+ agg.add(event, acc);
+
+ his = new ArrayHistogram(2);
+ for (int i = 1; i <= count; i++) {
+ his.recordValue(i);
+ }
+ fields.put("ms", his.toBytes());
+ agg.add(event, acc);
+
+ byte[] bytes = (byte[]) agg.getResult(acc).getMetricsFields().get("ms_his");
+ ArrayHistogram h = ArrayHistogram.fromBytes(bytes);
+
+ for (int i = 0; i < expects.length; i++) {
+ long rst = h.getValueAtPercentile(probabilities[i] * 100);
+ long expect = expects[i];
+ double probability = probabilities[i];
+ double error = probability <= 0.01? 0 : Math.abs(rst - expect) / (double) expect;
+ System.out.println(String.format("%s:%d,%d,%.4f", probability, expect , rst , error));
+ assertTrue(error <= 0.05);
+ }
+ }
+
+ private void initData(String input_type, int numberOfSignificantValueDigits, String output_format){
+ agg = new HdrHistogram();
+ UDFContext c = new UDFContext();
+ Map<String, Object> parameters = new HashMap<>();
+ parameters.put("input_type", input_type);
+ parameters.put("numberOfSignificantValueDigits", numberOfSignificantValueDigits);
+ parameters.put("output_format", output_format);
+ c.setParameters(parameters);
+ c.setLookup_fields(Collections.singletonList("ms"));
+ c.setOutput_fields(Collections.singletonList("ms_his"));
+
+ agg.open(c);
+ Map<String, Object> map = new HashMap<>();
+ acc = new Accumulator();
+ acc.setMetricsFields(map);
+ agg.initAccumulator(acc);
+
+ event = new Event();
+ Map<String, Object> fields = new HashMap<>();
+ event.setExtractedFields(fields);
+ }
+}
\ No newline at end of file diff --git a/groot-core/src/test/java/com/geedgenetworks/core/udf/udaf/hlld/HlldApproxCountDistinctTest.java b/groot-core/src/test/java/com/geedgenetworks/core/udf/udaf/hlld/HlldApproxCountDistinctTest.java new file mode 100644 index 0000000..eae356d --- /dev/null +++ b/groot-core/src/test/java/com/geedgenetworks/core/udf/udaf/hlld/HlldApproxCountDistinctTest.java @@ -0,0 +1,87 @@ +package com.geedgenetworks.core.udf.udaf.hlld;
+
+
+import com.geedgenetworks.common.Accumulator;
+import com.geedgenetworks.common.Event;
+import com.geedgenetworks.common.udf.AggregateFunction;
+import com.geedgenetworks.common.udf.UDFContext;
+import com.geedgenetworks.sketch.hlld.Hll;
+import com.geedgenetworks.sketch.util.StringUtils;
+import org.junit.jupiter.api.Test;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+
+public class HlldApproxCountDistinctTest {
+ AggregateFunction agg;
+ Accumulator acc;
+ Event event;
+
+
+ @Test
+ public void inputRegular() {
+ initData(14, "regular");
+ long count = 100000;
+ Map<String, Object> fields = event.getExtractedFields();
+ for (int i = 0; i < count; i++) {
+ fields.put("ip", i);
+ agg.add(event, acc);
+ }
+
+ long rst = (long)agg.getResult(acc).getMetricsFields().get("ip_cnt");
+ double error = Math.abs(rst - count) / (double) count;
+ System.out.println(String.format("%d,%d,%.4f", count , rst , error));
+ assertTrue(error <= 0.05);
+ }
+
+ @Test
+ public void inputSketch() {
+ initData(14, "sketch");
+ long count = 150000;
+ Map<String, Object> fields = event.getExtractedFields();
+
+ Hll hll = new Hll(12);
+ for (int i = 0; i < 100000; i++) {
+ hll.add(i);
+ }
+ fields.put("ip", StringUtils.encodeBase64String(hll.toBytes()));
+ agg.add(event, acc);
+
+ hll = new Hll(13);
+ for (int i = 50000; i < 150000; i++) {
+ hll.add(i);
+ }
+ fields.put("ip", hll.toBytes());
+ agg.add(event, acc);
+
+ long rst = (long)agg.getResult(acc).getMetricsFields().get("ip_cnt");
+ double error = Math.abs(rst - count) / (double) count;
+ System.out.println(String.format("%d,%d,%.4f", count , rst , error));
+ assertTrue(error <= 0.05);
+ }
+
+ private void initData(int precision, String input_type){
+ agg = new HlldApproxCountDistinct();
+ UDFContext c = new UDFContext();
+ Map<String, Object> parameters = new HashMap<>();
+ parameters.put("precision", precision);
+ parameters.put("input_type", input_type);
+ c.setParameters(parameters);
+ c.setLookup_fields(Collections.singletonList("ip"));
+ c.setOutput_fields(Collections.singletonList("ip_cnt"));
+
+ agg.open(c);
+ Map<String, Object> map = new HashMap<>();
+ acc = new Accumulator();
+ acc.setMetricsFields(map);
+ agg.initAccumulator(acc);
+
+ event = new Event();
+ Map<String, Object> fields = new HashMap<>();
+ event.setExtractedFields(fields);
+ }
+}
\ No newline at end of file diff --git a/groot-core/src/test/java/com/geedgenetworks/core/udf/udaf/hlld/HlldTest.java b/groot-core/src/test/java/com/geedgenetworks/core/udf/udaf/hlld/HlldTest.java new file mode 100644 index 0000000..f489ee4 --- /dev/null +++ b/groot-core/src/test/java/com/geedgenetworks/core/udf/udaf/hlld/HlldTest.java @@ -0,0 +1,86 @@ +package com.geedgenetworks.core.udf.udaf.hlld;
+
+import com.geedgenetworks.common.Accumulator;
+import com.geedgenetworks.common.Event;
+import com.geedgenetworks.common.udf.AggregateFunction;
+import com.geedgenetworks.common.udf.UDFContext;
+import com.geedgenetworks.sketch.hlld.Hll;
+import com.geedgenetworks.sketch.util.StringUtils;
+import org.junit.jupiter.api.Test;
+
+import java.nio.charset.StandardCharsets;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+public class HlldTest {
+ AggregateFunction agg;
+ Accumulator acc;
+ Event event;
+
+ @Test
+ public void inputRegular() {
+ initData(14, "regular", "base64");
+ long count = 100000;
+ Map<String, Object> fields = event.getExtractedFields();
+ for (int i = 0; i < count; i++) {
+ fields.put("ip", i);
+ agg.add(event, acc);
+ }
+
+ String hllStr = (String)agg.getResult(acc).getMetricsFields().get("ip_cnt");
+ long rst = (long) Hll.fromBytes(StringUtils.decodeBase64(hllStr.getBytes(StandardCharsets.UTF_8))).size();
+ double error = Math.abs(rst - count) / (double) count;
+ System.out.println(String.format("%d,%d,%.4f", count , rst , error));
+ assertTrue(error <= 0.05);
+ }
+
+ @Test
+ public void inputSketch() {
+ initData(14, "sketch", "binary");
+ long count = 150000;
+ Map<String, Object> fields = event.getExtractedFields();
+ for (int i = 0; i < 100000; i++) {
+ Hll hll = new Hll(12);
+ hll.add(i);
+ fields.put("ip", StringUtils.encodeBase64String(hll.toBytes()));
+ agg.add(event, acc);
+ }
+ for (int i = 50000; i < 150000; i++) {
+ Hll hll = new Hll(13);
+ hll.add(i);
+ fields.put("ip", hll.toBytes());
+ agg.add(event, acc);
+ }
+
+ byte[] hllBytes = (byte[])agg.getResult(acc).getMetricsFields().get("ip_cnt");
+ long rst = (long) Hll.fromBytes(hllBytes).size();
+ double error = Math.abs(rst - count) / (double) count;
+ System.out.println(String.format("%d,%d,%.4f", count , rst , error));
+ assertTrue(error <= 0.05);
+ }
+
+ private void initData(int precision, String input_type, String output_format){
+ agg = new Hlld();
+ UDFContext c = new UDFContext();
+ Map<String, Object> parameters = new HashMap<>();
+ parameters.put("precision", precision);
+ parameters.put("input_type", input_type);
+ parameters.put("output_format", output_format);
+ c.setParameters(parameters);
+ c.setLookup_fields(Collections.singletonList("ip"));
+ c.setOutput_fields(Collections.singletonList("ip_cnt"));
+
+ agg.open(c);
+ Map<String, Object> map = new HashMap<>();
+ acc = new Accumulator();
+ acc.setMetricsFields(map);
+ agg.initAccumulator(acc);
+
+ event = new Event();
+ Map<String, Object> fields = new HashMap<>();
+ event.setExtractedFields(fields);
+ }
+}
@@ -269,6 +269,11 @@ <version>${fastjson2.version}</version> </dependency> <dependency> + <groupId>com.geedgenetworks</groupId> + <artifactId>sketches</artifactId> + <version>1.0.0</version> + </dependency> + <dependency> <groupId>com.alibaba.nacos</groupId> <artifactId>nacos-client</artifactId> <version>${nacos.version}</version> |
