summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorlifengchao <[email protected]>2024-09-03 14:35:51 +0800
committerlifengchao <[email protected]>2024-09-03 14:35:51 +0800
commite3efdcac80dc1ca8fb0bdd08f69318f745f9bf7c (patch)
treefad19eeae6e1fa5b435697521f1fc87bc0ff617c
parent0d5ce165d30383a4b9b9945b61150d8e8015893d (diff)
[feature][core] sketch聚合函数实现merge方法支持两阶段聚合
-rw-r--r--groot-core/src/main/java/com/geedgenetworks/core/udf/udaf/HdrHistogram/HdrHistogramBaseAggregate.java25
-rw-r--r--groot-core/src/main/java/com/geedgenetworks/core/udf/udaf/hlld/HlldApproxCountDistinct.java5
-rw-r--r--groot-core/src/main/java/com/geedgenetworks/core/udf/udaf/hlld/HlldBaseAggregate.java33
3 files changed, 51 insertions, 12 deletions
diff --git a/groot-core/src/main/java/com/geedgenetworks/core/udf/udaf/HdrHistogram/HdrHistogramBaseAggregate.java b/groot-core/src/main/java/com/geedgenetworks/core/udf/udaf/HdrHistogram/HdrHistogramBaseAggregate.java
index 6af0be3..a099fde 100644
--- a/groot-core/src/main/java/com/geedgenetworks/core/udf/udaf/HdrHistogram/HdrHistogramBaseAggregate.java
+++ b/groot-core/src/main/java/com/geedgenetworks/core/udf/udaf/HdrHistogram/HdrHistogramBaseAggregate.java
@@ -59,6 +59,26 @@ public abstract class HdrHistogramBaseAggregate implements AggregateFunction {
return acc;
}
+ @Override
+ public Accumulator merge(Accumulator acc, Accumulator other) {
+ Object agg = acc.getMetricsFields().get(outputField);
+ Object aggOther = other.getMetricsFields().get(outputField);
+ Object rst;
+
+ if(agg == null){
+ rst = aggOther;
+ } else if (aggOther == null) {
+ rst = agg;
+ }else{
+ rst = ((Histogramer)agg).merge(((Histogramer) aggOther));
+ }
+
+ if(rst != null){
+ acc.getMetricsFields().put(outputField, rst);
+ }
+ return acc;
+ }
+
protected void updateHdr(Accumulator acc, Object value) {
Map<String, Object> aggs = acc.getMetricsFields();
ArrayHistogram his = (ArrayHistogram) aggs.get(outputField);
@@ -95,10 +115,7 @@ public abstract class HdrHistogramBaseAggregate implements AggregateFunction {
his.merge(h);
}
- @Override
- public Accumulator merge(Accumulator a, Accumulator b) {
- return null;
- }
+
@Override
public void close() {}
}
diff --git a/groot-core/src/main/java/com/geedgenetworks/core/udf/udaf/hlld/HlldApproxCountDistinct.java b/groot-core/src/main/java/com/geedgenetworks/core/udf/udaf/hlld/HlldApproxCountDistinct.java
index 041bad9..ec003f8 100644
--- a/groot-core/src/main/java/com/geedgenetworks/core/udf/udaf/hlld/HlldApproxCountDistinct.java
+++ b/groot-core/src/main/java/com/geedgenetworks/core/udf/udaf/hlld/HlldApproxCountDistinct.java
@@ -16,10 +16,7 @@ public class HlldApproxCountDistinct extends HlldBaseAggregate {
return acc;
}
- @Override
- public Accumulator merge(Accumulator a, Accumulator b) {
- return null;
- }
+
@Override
public String functionName() {
return "APPROX_COUNT_DISTINCT_HLLD";
diff --git a/groot-core/src/main/java/com/geedgenetworks/core/udf/udaf/hlld/HlldBaseAggregate.java b/groot-core/src/main/java/com/geedgenetworks/core/udf/udaf/hlld/HlldBaseAggregate.java
index d6c3a44..0802c22 100644
--- a/groot-core/src/main/java/com/geedgenetworks/core/udf/udaf/hlld/HlldBaseAggregate.java
+++ b/groot-core/src/main/java/com/geedgenetworks/core/udf/udaf/hlld/HlldBaseAggregate.java
@@ -51,6 +51,34 @@ public abstract class HlldBaseAggregate implements AggregateFunction {
return acc;
}
+ @Override
+ public Accumulator merge(Accumulator acc, Accumulator other) {
+ Object agg = acc.getMetricsFields().get(outputField);
+ Object aggOther = other.getMetricsFields().get(outputField);
+ Object rst;
+
+ if(agg == null){
+ rst = aggOther;
+ } else if (aggOther == null) {
+ rst = agg;
+ }else{
+ if(inputSketch){
+ ((HllUnion)agg).update(((HllUnion) aggOther).getResult());
+ rst = agg;
+ }else{
+ final HllUnion union = new HllUnion(precision);
+ union.update((Hll) agg);
+ union.update((Hll) aggOther);
+ rst = union.getResult();
+ }
+ }
+
+ if(rst != null){
+ acc.getMetricsFields().put(outputField, rst);
+ }
+ return acc;
+ }
+
protected Hll getResultHll(Accumulator acc){
Object agg = acc.getMetricsFields().get(outputField);
if (agg == null) {
@@ -92,10 +120,7 @@ public abstract class HlldBaseAggregate implements AggregateFunction {
Hll hll = HllUtils.deserializeHll(value);
hllUnion.update(hll);
}
- @Override
- public Accumulator merge(Accumulator a, Accumulator b) {
- return null;
- }
+
@Override
public void close() {}
}