diff options
| author | 王宽 <[email protected]> | 2024-09-13 01:52:36 +0000 |
|---|---|---|
| committer | 王宽 <[email protected]> | 2024-09-13 01:52:36 +0000 |
| commit | 536ad900967495d1c1d8640422057c5a2b5c697c (patch) | |
| tree | dabe8f556004f830f96d3e8973e6f20820d21bfd /groot-core | |
| parent | 71a3d8bf0183c80dbd7dbf427e0c169c8ee26bd6 (diff) | |
| parent | c5d3f1c45899d580916e36c1eb689779c0431caa (diff) | |
Merge branch 'fix/pre-agg' into 'develop'
[fix][core]修复预聚合功能相关bug,增加测试用例
See merge request galaxy/platform/groot-stream!108
Diffstat (limited to 'groot-core')
5 files changed, 35 insertions, 30 deletions
diff --git a/groot-core/src/main/java/com/geedgenetworks/core/processor/aggregate/AbstractFirstAggregation.java b/groot-core/src/main/java/com/geedgenetworks/core/processor/aggregate/AbstractFirstAggregation.java index 3632ba7..ce77ee8 100644 --- a/groot-core/src/main/java/com/geedgenetworks/core/processor/aggregate/AbstractFirstAggregation.java +++ b/groot-core/src/main/java/com/geedgenetworks/core/processor/aggregate/AbstractFirstAggregation.java @@ -6,6 +6,7 @@ import com.alibaba.fastjson.JSON; import com.geedgenetworks.common.Accumulator; import com.geedgenetworks.common.Constants; import com.geedgenetworks.common.Event; +import com.geedgenetworks.common.KeybyEntity; import com.geedgenetworks.common.exception.CommonErrorCode; import com.geedgenetworks.common.exception.GrootStreamRuntimeException; import com.geedgenetworks.common.udf.AggregateFunction; @@ -139,12 +140,10 @@ public class AbstractFirstAggregation extends ProcessFunction<Event, Accumulator return (long) (ThreadLocalRandom.current().nextDouble() * (double) windowSize); } - public Accumulator createAccumulator() { + public Accumulator createAccumulator(Map<String, Object> keysMap) { - Map<String, Object> map = new HashMap<>(); Accumulator accumulator = new Accumulator(); - - accumulator.setMetricsFields(map); + accumulator.setMetricsFields(keysMap); for (UdfEntity udfEntity : functions) { udfEntity.getAggregateFunction().initAccumulator(accumulator); } @@ -152,17 +151,21 @@ public class AbstractFirstAggregation extends ProcessFunction<Event, Accumulator } - public String getKey(Event value, List<String> keys) { + public KeybyEntity getKey(Event value, List<String> keys) { + KeybyEntity keybyEntity = new KeybyEntity(new HashMap<>()); StringBuilder stringBuilder = new StringBuilder(); for (String key : keys) { - - if (value.getExtractedFields().containsKey(key)) { - stringBuilder.append(value.getExtractedFields().get(key).toString()); - } else { + Object object = value.getExtractedFields().get(key); + if(object==null){ stringBuilder.append(","); + }else{ + keybyEntity.getKeys().put(key, object); + stringBuilder.append(object); } + } - return SecureUtil.md5(stringBuilder.toString()); + keybyEntity.setKeysToString(SecureUtil.md5(stringBuilder.toString())); + return keybyEntity; } diff --git a/groot-core/src/main/java/com/geedgenetworks/core/processor/aggregate/FirstAggregationEventTime.java b/groot-core/src/main/java/com/geedgenetworks/core/processor/aggregate/FirstAggregationEventTime.java index 156c0ed..5adc6d1 100644 --- a/groot-core/src/main/java/com/geedgenetworks/core/processor/aggregate/FirstAggregationEventTime.java +++ b/groot-core/src/main/java/com/geedgenetworks/core/processor/aggregate/FirstAggregationEventTime.java @@ -3,6 +3,7 @@ package com.geedgenetworks.core.processor.aggregate; import com.geedgenetworks.common.Accumulator; import com.geedgenetworks.common.Event; +import com.geedgenetworks.common.KeybyEntity; import com.geedgenetworks.core.pojo.AggregateConfig; import lombok.extern.slf4j.Slf4j; import org.apache.flink.configuration.Configuration; @@ -33,7 +34,7 @@ public class FirstAggregationEventTime extends AbstractFirstAggregation { Long timestamp; internalMetrics.incrementInEvents(); try { - String key = getKey(value, groupByFields); + KeybyEntity keybyEntity = getKey(value, groupByFields); while ((timestamp = eventTimeTimersQueue.peek()) != null && timestamp <= ctx.timerService().currentWatermark()) { eventTimeTimersQueue.poll(); onTimer(timestamp, out); @@ -41,15 +42,15 @@ public class FirstAggregationEventTime extends AbstractFirstAggregation { long windowEnd = assignWindowEnd(ctx.timerService().currentWatermark()); if (!windows.containsKey(windowEnd)) { Map<String, Accumulator> map = new HashMap<>(); - map.put(key, createAccumulator()); + map.put(keybyEntity.getKeysToString(), createAccumulator(keybyEntity.getKeys())); windows.put(windowEnd, map); eventTimeTimersQueue.add(windowEnd); } else { - if (!windows.get(windowEnd).containsKey(key)) { - windows.get(windowEnd).put(key, createAccumulator()); + if (!windows.get(windowEnd).containsKey(keybyEntity.getKeysToString())) { + windows.get(windowEnd).put(keybyEntity.getKeysToString(), createAccumulator(keybyEntity.getKeys())); } } - add(value, windows.get(windowEnd).get(key)); + add(value, windows.get(windowEnd).get(keybyEntity.getKeysToString())); } catch (Exception e) { log.error("Error in pre-aggregate processElement", e); internalMetrics.incrementErrorEvents(); diff --git a/groot-core/src/main/java/com/geedgenetworks/core/processor/aggregate/FirstAggregationProcessingTime.java b/groot-core/src/main/java/com/geedgenetworks/core/processor/aggregate/FirstAggregationProcessingTime.java index e98daa5..01c346f 100644 --- a/groot-core/src/main/java/com/geedgenetworks/core/processor/aggregate/FirstAggregationProcessingTime.java +++ b/groot-core/src/main/java/com/geedgenetworks/core/processor/aggregate/FirstAggregationProcessingTime.java @@ -3,6 +3,7 @@ package com.geedgenetworks.core.processor.aggregate; import com.geedgenetworks.common.Accumulator; import com.geedgenetworks.common.Event; +import com.geedgenetworks.common.KeybyEntity; import com.geedgenetworks.core.pojo.AggregateConfig; import lombok.extern.slf4j.Slf4j; import org.apache.flink.configuration.Configuration; @@ -32,7 +33,7 @@ public class FirstAggregationProcessingTime extends AbstractFirstAggregation { Long timestamp; internalMetrics.incrementInEvents(); try { - String key = getKey(value, groupByFields); + KeybyEntity keybyEntity = getKey(value, groupByFields); while ((timestamp = processingTimeTimersQueue.peek()) != null && timestamp <= ctx.timerService().currentProcessingTime()) { processingTimeTimersQueue.poll(); onTimer(timestamp, out); @@ -40,15 +41,15 @@ public class FirstAggregationProcessingTime extends AbstractFirstAggregation { long windowEnd = assignWindowEnd(ctx.timerService().currentProcessingTime()); if (!windows.containsKey(windowEnd)) { Map<String, Accumulator> map = new HashMap<>(); - map.put(key, createAccumulator()); + map.put(keybyEntity.getKeysToString(), createAccumulator(keybyEntity.getKeys())); windows.put(windowEnd, map); processingTimeTimersQueue.add(windowEnd); } else { - if (!windows.get(windowEnd).containsKey(key)) { - windows.get(windowEnd).put(key, createAccumulator()); + if (!windows.get(windowEnd).containsKey(keybyEntity.getKeysToString())) { + windows.get(windowEnd).put(keybyEntity.getKeysToString(), createAccumulator(keybyEntity.getKeys())); } } - add(value, windows.get(windowEnd).get(key)); + add(value, windows.get(windowEnd).get(keybyEntity.getKeysToString())); } catch (Exception e) { log.error("Error in pre-aggregate processElement", e); internalMetrics.incrementErrorEvents(); diff --git a/groot-core/src/main/java/com/geedgenetworks/core/processor/aggregate/KeySelector.java b/groot-core/src/main/java/com/geedgenetworks/core/processor/aggregate/KeySelector.java index da09690..a6fb294 100644 --- a/groot-core/src/main/java/com/geedgenetworks/core/processor/aggregate/KeySelector.java +++ b/groot-core/src/main/java/com/geedgenetworks/core/processor/aggregate/KeySelector.java @@ -22,12 +22,12 @@ public class KeySelector implements org.apache.flink.api.java.functions.KeySelec KeybyEntity keybyEntity = new KeybyEntity(new HashMap<>()); StringBuilder stringBuilder = new StringBuilder(); for (String key : keys) { - - if (value.getExtractedFields().containsKey(key)) { - keybyEntity.getKeys().put(key, value.getExtractedFields().get(key)); - stringBuilder.append(value.getExtractedFields().get(key).toString()); - } else { + Object object = value.getExtractedFields().get(key); + if(object==null){ stringBuilder.append(","); + }else{ + keybyEntity.getKeys().put(key, object); + stringBuilder.append(object); } } String hashedKey = SecureUtil.md5(stringBuilder.toString()); diff --git a/groot-core/src/main/java/com/geedgenetworks/core/processor/aggregate/PreKeySelector.java b/groot-core/src/main/java/com/geedgenetworks/core/processor/aggregate/PreKeySelector.java index 6e43184..4b21ba7 100644 --- a/groot-core/src/main/java/com/geedgenetworks/core/processor/aggregate/PreKeySelector.java +++ b/groot-core/src/main/java/com/geedgenetworks/core/processor/aggregate/PreKeySelector.java @@ -22,12 +22,12 @@ public class PreKeySelector implements org.apache.flink.api.java.functions.KeySe KeybyEntity keybyEntity = new KeybyEntity(new HashMap<>()); StringBuilder stringBuilder = new StringBuilder(); for (String key : keys) { - - if (value.getMetricsFields().containsKey(key)) { - keybyEntity.getKeys().put(key, value.getMetricsFields().get(key)); - stringBuilder.append(value.getMetricsFields().get(key).toString()); - } else { + Object object = value.getMetricsFields().get(key); + if(object==null){ stringBuilder.append(","); + }else{ + keybyEntity.getKeys().put(key, object); + stringBuilder.append(object); } } String hashedKey = SecureUtil.md5(stringBuilder.toString()); |
