summaryrefslogtreecommitdiff
path: root/groot-core
diff options
context:
space:
mode:
author王宽 <[email protected]>2024-09-13 01:52:36 +0000
committer王宽 <[email protected]>2024-09-13 01:52:36 +0000
commit536ad900967495d1c1d8640422057c5a2b5c697c (patch)
treedabe8f556004f830f96d3e8973e6f20820d21bfd /groot-core
parent71a3d8bf0183c80dbd7dbf427e0c169c8ee26bd6 (diff)
parentc5d3f1c45899d580916e36c1eb689779c0431caa (diff)
Merge branch 'fix/pre-agg' into 'develop'
[fix][core]修复预聚合功能相关bug,增加测试用例 See merge request galaxy/platform/groot-stream!108
Diffstat (limited to 'groot-core')
-rw-r--r--groot-core/src/main/java/com/geedgenetworks/core/processor/aggregate/AbstractFirstAggregation.java23
-rw-r--r--groot-core/src/main/java/com/geedgenetworks/core/processor/aggregate/FirstAggregationEventTime.java11
-rw-r--r--groot-core/src/main/java/com/geedgenetworks/core/processor/aggregate/FirstAggregationProcessingTime.java11
-rw-r--r--groot-core/src/main/java/com/geedgenetworks/core/processor/aggregate/KeySelector.java10
-rw-r--r--groot-core/src/main/java/com/geedgenetworks/core/processor/aggregate/PreKeySelector.java10
5 files changed, 35 insertions, 30 deletions
diff --git a/groot-core/src/main/java/com/geedgenetworks/core/processor/aggregate/AbstractFirstAggregation.java b/groot-core/src/main/java/com/geedgenetworks/core/processor/aggregate/AbstractFirstAggregation.java
index 3632ba7..ce77ee8 100644
--- a/groot-core/src/main/java/com/geedgenetworks/core/processor/aggregate/AbstractFirstAggregation.java
+++ b/groot-core/src/main/java/com/geedgenetworks/core/processor/aggregate/AbstractFirstAggregation.java
@@ -6,6 +6,7 @@ import com.alibaba.fastjson.JSON;
import com.geedgenetworks.common.Accumulator;
import com.geedgenetworks.common.Constants;
import com.geedgenetworks.common.Event;
+import com.geedgenetworks.common.KeybyEntity;
import com.geedgenetworks.common.exception.CommonErrorCode;
import com.geedgenetworks.common.exception.GrootStreamRuntimeException;
import com.geedgenetworks.common.udf.AggregateFunction;
@@ -139,12 +140,10 @@ public class AbstractFirstAggregation extends ProcessFunction<Event, Accumulator
return (long) (ThreadLocalRandom.current().nextDouble() * (double) windowSize);
}
- public Accumulator createAccumulator() {
+ public Accumulator createAccumulator(Map<String, Object> keysMap) {
- Map<String, Object> map = new HashMap<>();
Accumulator accumulator = new Accumulator();
-
- accumulator.setMetricsFields(map);
+ accumulator.setMetricsFields(keysMap);
for (UdfEntity udfEntity : functions) {
udfEntity.getAggregateFunction().initAccumulator(accumulator);
}
@@ -152,17 +151,21 @@ public class AbstractFirstAggregation extends ProcessFunction<Event, Accumulator
}
- public String getKey(Event value, List<String> keys) {
+ public KeybyEntity getKey(Event value, List<String> keys) {
+ KeybyEntity keybyEntity = new KeybyEntity(new HashMap<>());
StringBuilder stringBuilder = new StringBuilder();
for (String key : keys) {
-
- if (value.getExtractedFields().containsKey(key)) {
- stringBuilder.append(value.getExtractedFields().get(key).toString());
- } else {
+ Object object = value.getExtractedFields().get(key);
+ if(object==null){
stringBuilder.append(",");
+ }else{
+ keybyEntity.getKeys().put(key, object);
+ stringBuilder.append(object);
}
+
}
- return SecureUtil.md5(stringBuilder.toString());
+ keybyEntity.setKeysToString(SecureUtil.md5(stringBuilder.toString()));
+ return keybyEntity;
}
diff --git a/groot-core/src/main/java/com/geedgenetworks/core/processor/aggregate/FirstAggregationEventTime.java b/groot-core/src/main/java/com/geedgenetworks/core/processor/aggregate/FirstAggregationEventTime.java
index 156c0ed..5adc6d1 100644
--- a/groot-core/src/main/java/com/geedgenetworks/core/processor/aggregate/FirstAggregationEventTime.java
+++ b/groot-core/src/main/java/com/geedgenetworks/core/processor/aggregate/FirstAggregationEventTime.java
@@ -3,6 +3,7 @@ package com.geedgenetworks.core.processor.aggregate;
import com.geedgenetworks.common.Accumulator;
import com.geedgenetworks.common.Event;
+import com.geedgenetworks.common.KeybyEntity;
import com.geedgenetworks.core.pojo.AggregateConfig;
import lombok.extern.slf4j.Slf4j;
import org.apache.flink.configuration.Configuration;
@@ -33,7 +34,7 @@ public class FirstAggregationEventTime extends AbstractFirstAggregation {
Long timestamp;
internalMetrics.incrementInEvents();
try {
- String key = getKey(value, groupByFields);
+ KeybyEntity keybyEntity = getKey(value, groupByFields);
while ((timestamp = eventTimeTimersQueue.peek()) != null && timestamp <= ctx.timerService().currentWatermark()) {
eventTimeTimersQueue.poll();
onTimer(timestamp, out);
@@ -41,15 +42,15 @@ public class FirstAggregationEventTime extends AbstractFirstAggregation {
long windowEnd = assignWindowEnd(ctx.timerService().currentWatermark());
if (!windows.containsKey(windowEnd)) {
Map<String, Accumulator> map = new HashMap<>();
- map.put(key, createAccumulator());
+ map.put(keybyEntity.getKeysToString(), createAccumulator(keybyEntity.getKeys()));
windows.put(windowEnd, map);
eventTimeTimersQueue.add(windowEnd);
} else {
- if (!windows.get(windowEnd).containsKey(key)) {
- windows.get(windowEnd).put(key, createAccumulator());
+ if (!windows.get(windowEnd).containsKey(keybyEntity.getKeysToString())) {
+ windows.get(windowEnd).put(keybyEntity.getKeysToString(), createAccumulator(keybyEntity.getKeys()));
}
}
- add(value, windows.get(windowEnd).get(key));
+ add(value, windows.get(windowEnd).get(keybyEntity.getKeysToString()));
} catch (Exception e) {
log.error("Error in pre-aggregate processElement", e);
internalMetrics.incrementErrorEvents();
diff --git a/groot-core/src/main/java/com/geedgenetworks/core/processor/aggregate/FirstAggregationProcessingTime.java b/groot-core/src/main/java/com/geedgenetworks/core/processor/aggregate/FirstAggregationProcessingTime.java
index e98daa5..01c346f 100644
--- a/groot-core/src/main/java/com/geedgenetworks/core/processor/aggregate/FirstAggregationProcessingTime.java
+++ b/groot-core/src/main/java/com/geedgenetworks/core/processor/aggregate/FirstAggregationProcessingTime.java
@@ -3,6 +3,7 @@ package com.geedgenetworks.core.processor.aggregate;
import com.geedgenetworks.common.Accumulator;
import com.geedgenetworks.common.Event;
+import com.geedgenetworks.common.KeybyEntity;
import com.geedgenetworks.core.pojo.AggregateConfig;
import lombok.extern.slf4j.Slf4j;
import org.apache.flink.configuration.Configuration;
@@ -32,7 +33,7 @@ public class FirstAggregationProcessingTime extends AbstractFirstAggregation {
Long timestamp;
internalMetrics.incrementInEvents();
try {
- String key = getKey(value, groupByFields);
+ KeybyEntity keybyEntity = getKey(value, groupByFields);
while ((timestamp = processingTimeTimersQueue.peek()) != null && timestamp <= ctx.timerService().currentProcessingTime()) {
processingTimeTimersQueue.poll();
onTimer(timestamp, out);
@@ -40,15 +41,15 @@ public class FirstAggregationProcessingTime extends AbstractFirstAggregation {
long windowEnd = assignWindowEnd(ctx.timerService().currentProcessingTime());
if (!windows.containsKey(windowEnd)) {
Map<String, Accumulator> map = new HashMap<>();
- map.put(key, createAccumulator());
+ map.put(keybyEntity.getKeysToString(), createAccumulator(keybyEntity.getKeys()));
windows.put(windowEnd, map);
processingTimeTimersQueue.add(windowEnd);
} else {
- if (!windows.get(windowEnd).containsKey(key)) {
- windows.get(windowEnd).put(key, createAccumulator());
+ if (!windows.get(windowEnd).containsKey(keybyEntity.getKeysToString())) {
+ windows.get(windowEnd).put(keybyEntity.getKeysToString(), createAccumulator(keybyEntity.getKeys()));
}
}
- add(value, windows.get(windowEnd).get(key));
+ add(value, windows.get(windowEnd).get(keybyEntity.getKeysToString()));
} catch (Exception e) {
log.error("Error in pre-aggregate processElement", e);
internalMetrics.incrementErrorEvents();
diff --git a/groot-core/src/main/java/com/geedgenetworks/core/processor/aggregate/KeySelector.java b/groot-core/src/main/java/com/geedgenetworks/core/processor/aggregate/KeySelector.java
index da09690..a6fb294 100644
--- a/groot-core/src/main/java/com/geedgenetworks/core/processor/aggregate/KeySelector.java
+++ b/groot-core/src/main/java/com/geedgenetworks/core/processor/aggregate/KeySelector.java
@@ -22,12 +22,12 @@ public class KeySelector implements org.apache.flink.api.java.functions.KeySelec
KeybyEntity keybyEntity = new KeybyEntity(new HashMap<>());
StringBuilder stringBuilder = new StringBuilder();
for (String key : keys) {
-
- if (value.getExtractedFields().containsKey(key)) {
- keybyEntity.getKeys().put(key, value.getExtractedFields().get(key));
- stringBuilder.append(value.getExtractedFields().get(key).toString());
- } else {
+ Object object = value.getExtractedFields().get(key);
+ if(object==null){
stringBuilder.append(",");
+ }else{
+ keybyEntity.getKeys().put(key, object);
+ stringBuilder.append(object);
}
}
String hashedKey = SecureUtil.md5(stringBuilder.toString());
diff --git a/groot-core/src/main/java/com/geedgenetworks/core/processor/aggregate/PreKeySelector.java b/groot-core/src/main/java/com/geedgenetworks/core/processor/aggregate/PreKeySelector.java
index 6e43184..4b21ba7 100644
--- a/groot-core/src/main/java/com/geedgenetworks/core/processor/aggregate/PreKeySelector.java
+++ b/groot-core/src/main/java/com/geedgenetworks/core/processor/aggregate/PreKeySelector.java
@@ -22,12 +22,12 @@ public class PreKeySelector implements org.apache.flink.api.java.functions.KeySe
KeybyEntity keybyEntity = new KeybyEntity(new HashMap<>());
StringBuilder stringBuilder = new StringBuilder();
for (String key : keys) {
-
- if (value.getMetricsFields().containsKey(key)) {
- keybyEntity.getKeys().put(key, value.getMetricsFields().get(key));
- stringBuilder.append(value.getMetricsFields().get(key).toString());
- } else {
+ Object object = value.getMetricsFields().get(key);
+ if(object==null){
stringBuilder.append(",");
+ }else{
+ keybyEntity.getKeys().put(key, object);
+ stringBuilder.append(object);
}
}
String hashedKey = SecureUtil.md5(stringBuilder.toString());