summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
author王宽 <[email protected]>2024-09-13 01:52:36 +0000
committer王宽 <[email protected]>2024-09-13 01:52:36 +0000
commit536ad900967495d1c1d8640422057c5a2b5c697c (patch)
treedabe8f556004f830f96d3e8973e6f20820d21bfd
parent71a3d8bf0183c80dbd7dbf427e0c169c8ee26bd6 (diff)
parentc5d3f1c45899d580916e36c1eb689779c0431caa (diff)
Merge branch 'fix/pre-agg' into 'develop'
[fix][core]修复预聚合功能相关bug,增加测试用例 See merge request galaxy/platform/groot-stream!108
-rw-r--r--groot-bootstrap/src/test/java/com/geedgenetworks/bootstrap/main/simple/JobAggTest.java6
-rw-r--r--groot-bootstrap/src/test/resources/grootstream_job_agg_test.yaml2
-rw-r--r--groot-core/src/main/java/com/geedgenetworks/core/processor/aggregate/AbstractFirstAggregation.java23
-rw-r--r--groot-core/src/main/java/com/geedgenetworks/core/processor/aggregate/FirstAggregationEventTime.java11
-rw-r--r--groot-core/src/main/java/com/geedgenetworks/core/processor/aggregate/FirstAggregationProcessingTime.java11
-rw-r--r--groot-core/src/main/java/com/geedgenetworks/core/processor/aggregate/KeySelector.java10
-rw-r--r--groot-core/src/main/java/com/geedgenetworks/core/processor/aggregate/PreKeySelector.java10
-rw-r--r--groot-examples/cn-udf-example/pom.xml2
8 files changed, 40 insertions, 35 deletions
diff --git a/groot-bootstrap/src/test/java/com/geedgenetworks/bootstrap/main/simple/JobAggTest.java b/groot-bootstrap/src/test/java/com/geedgenetworks/bootstrap/main/simple/JobAggTest.java
index 6c69f64..03b322f 100644
--- a/groot-bootstrap/src/test/java/com/geedgenetworks/bootstrap/main/simple/JobAggTest.java
+++ b/groot-bootstrap/src/test/java/com/geedgenetworks/bootstrap/main/simple/JobAggTest.java
@@ -60,9 +60,9 @@ public class JobAggTest {
JobExecution jobExecution = new JobExecution(config, grootStreamConfig);
jobExecution.execute();
- Assert.assertEquals(2, CollectSink.values.size());
- Assert.assertEquals("3", CollectSink.values.get(1).getExtractedFields().get("sessions").toString());
- Assert.assertEquals("3.0", CollectSink.values.get(1).getExtractedFields().get("pkts").toString());
+ Assert.assertEquals(4, CollectSink.values.size());
+ Assert.assertEquals("2", CollectSink.values.get(1).getExtractedFields().get("sessions").toString());
+ Assert.assertEquals("3.5", CollectSink.values.get(1).getExtractedFields().get("pkts").toString());
}
}
diff --git a/groot-bootstrap/src/test/resources/grootstream_job_agg_test.yaml b/groot-bootstrap/src/test/resources/grootstream_job_agg_test.yaml
index ee589ef..4ec36e1 100644
--- a/groot-bootstrap/src/test/resources/grootstream_job_agg_test.yaml
+++ b/groot-bootstrap/src/test/resources/grootstream_job_agg_test.yaml
@@ -3,7 +3,7 @@ sources:
type : inline
fields: # [array of object] Field List, if not set, all fields(Map<String, Object>) will be output.
properties:
- data: '[{"pkts":1,"sessions":1,"decoded_as":"DNS","log_id": 1, "recv_time":"1724925692000", "client_ip":"192.168.0.2","server_ip":"2600:1015:b002::"},{"pkts":2,"sessions":1,"decoded_as":"DNS","log_id": 1, "recv_time":"1724925692000", "client_ip":"192.168.0.2","server_ip":"2600:1015:b002::"},{"sessions":1,"decoded_as":"DNS","log_id": 1, "recv_time":"1724925692000", "client_ip":"192.168.0.2","pkts":3,"server_ip":"2600:1015:b002::"},{"sessions":1,"decoded_as":"DNS","log_id": 1, "recv_time":"1724936692000", "client_ip":"192.168.0.2","pkts":4,"server_ip":"2600:1015:b002::"},{"sessions":1,"decoded_as":"DNS","log_id": 1, "recv_time":"1724937692000", "client_ip":"192.168.0.2","pkts":5,"server_ip":"2600:1015:b002::"}]'
+ data: '[{"pkts":1,"sessions":1,"log_id": 1, "recv_time":"1724925692000", "client_ip":"192.168.0.2","server_ip":"2600:1015:b002::"},{"pkts":1,"sessions":1,"decoded_as":null,"log_id": 1, "recv_time":"1724925692000", "client_ip":"192.168.0.2","server_ip":"2600:1015:b002::"},{"pkts":2,"sessions":1,"decoded_as":"HTTP","log_id": 1, "recv_time":"1724925692000", "client_ip":"192.168.0.2","server_ip":"2600:1015:b002::"},{"sessions":1,"decoded_as":"DNS","log_id": 1, "recv_time":"1724925692000", "client_ip":"192.168.0.2","pkts":3,"server_ip":"2600:1015:b002::"},{"sessions":1,"decoded_as":"DNS","log_id": 1, "recv_time":"1724936692000", "client_ip":"192.168.0.2","pkts":4,"server_ip":"2600:1015:b002::"},{"sessions":1,"decoded_as":"HTTP","log_id": 1, "recv_time":"1724937692000", "client_ip":"192.168.0.2","pkts":5,"server_ip":"2600:1015:b002::"}]'
interval.per.row: 1s # 可选
repeat.count: 1 # 可选
format: json
diff --git a/groot-core/src/main/java/com/geedgenetworks/core/processor/aggregate/AbstractFirstAggregation.java b/groot-core/src/main/java/com/geedgenetworks/core/processor/aggregate/AbstractFirstAggregation.java
index 3632ba7..ce77ee8 100644
--- a/groot-core/src/main/java/com/geedgenetworks/core/processor/aggregate/AbstractFirstAggregation.java
+++ b/groot-core/src/main/java/com/geedgenetworks/core/processor/aggregate/AbstractFirstAggregation.java
@@ -6,6 +6,7 @@ import com.alibaba.fastjson.JSON;
import com.geedgenetworks.common.Accumulator;
import com.geedgenetworks.common.Constants;
import com.geedgenetworks.common.Event;
+import com.geedgenetworks.common.KeybyEntity;
import com.geedgenetworks.common.exception.CommonErrorCode;
import com.geedgenetworks.common.exception.GrootStreamRuntimeException;
import com.geedgenetworks.common.udf.AggregateFunction;
@@ -139,12 +140,10 @@ public class AbstractFirstAggregation extends ProcessFunction<Event, Accumulator
return (long) (ThreadLocalRandom.current().nextDouble() * (double) windowSize);
}
- public Accumulator createAccumulator() {
+ public Accumulator createAccumulator(Map<String, Object> keysMap) {
- Map<String, Object> map = new HashMap<>();
Accumulator accumulator = new Accumulator();
-
- accumulator.setMetricsFields(map);
+ accumulator.setMetricsFields(keysMap);
for (UdfEntity udfEntity : functions) {
udfEntity.getAggregateFunction().initAccumulator(accumulator);
}
@@ -152,17 +151,21 @@ public class AbstractFirstAggregation extends ProcessFunction<Event, Accumulator
}
- public String getKey(Event value, List<String> keys) {
+ public KeybyEntity getKey(Event value, List<String> keys) {
+ KeybyEntity keybyEntity = new KeybyEntity(new HashMap<>());
StringBuilder stringBuilder = new StringBuilder();
for (String key : keys) {
-
- if (value.getExtractedFields().containsKey(key)) {
- stringBuilder.append(value.getExtractedFields().get(key).toString());
- } else {
+ Object object = value.getExtractedFields().get(key);
+ if(object==null){
stringBuilder.append(",");
+ }else{
+ keybyEntity.getKeys().put(key, object);
+ stringBuilder.append(object);
}
+
}
- return SecureUtil.md5(stringBuilder.toString());
+ keybyEntity.setKeysToString(SecureUtil.md5(stringBuilder.toString()));
+ return keybyEntity;
}
diff --git a/groot-core/src/main/java/com/geedgenetworks/core/processor/aggregate/FirstAggregationEventTime.java b/groot-core/src/main/java/com/geedgenetworks/core/processor/aggregate/FirstAggregationEventTime.java
index 156c0ed..5adc6d1 100644
--- a/groot-core/src/main/java/com/geedgenetworks/core/processor/aggregate/FirstAggregationEventTime.java
+++ b/groot-core/src/main/java/com/geedgenetworks/core/processor/aggregate/FirstAggregationEventTime.java
@@ -3,6 +3,7 @@ package com.geedgenetworks.core.processor.aggregate;
import com.geedgenetworks.common.Accumulator;
import com.geedgenetworks.common.Event;
+import com.geedgenetworks.common.KeybyEntity;
import com.geedgenetworks.core.pojo.AggregateConfig;
import lombok.extern.slf4j.Slf4j;
import org.apache.flink.configuration.Configuration;
@@ -33,7 +34,7 @@ public class FirstAggregationEventTime extends AbstractFirstAggregation {
Long timestamp;
internalMetrics.incrementInEvents();
try {
- String key = getKey(value, groupByFields);
+ KeybyEntity keybyEntity = getKey(value, groupByFields);
while ((timestamp = eventTimeTimersQueue.peek()) != null && timestamp <= ctx.timerService().currentWatermark()) {
eventTimeTimersQueue.poll();
onTimer(timestamp, out);
@@ -41,15 +42,15 @@ public class FirstAggregationEventTime extends AbstractFirstAggregation {
long windowEnd = assignWindowEnd(ctx.timerService().currentWatermark());
if (!windows.containsKey(windowEnd)) {
Map<String, Accumulator> map = new HashMap<>();
- map.put(key, createAccumulator());
+ map.put(keybyEntity.getKeysToString(), createAccumulator(keybyEntity.getKeys()));
windows.put(windowEnd, map);
eventTimeTimersQueue.add(windowEnd);
} else {
- if (!windows.get(windowEnd).containsKey(key)) {
- windows.get(windowEnd).put(key, createAccumulator());
+ if (!windows.get(windowEnd).containsKey(keybyEntity.getKeysToString())) {
+ windows.get(windowEnd).put(keybyEntity.getKeysToString(), createAccumulator(keybyEntity.getKeys()));
}
}
- add(value, windows.get(windowEnd).get(key));
+ add(value, windows.get(windowEnd).get(keybyEntity.getKeysToString()));
} catch (Exception e) {
log.error("Error in pre-aggregate processElement", e);
internalMetrics.incrementErrorEvents();
diff --git a/groot-core/src/main/java/com/geedgenetworks/core/processor/aggregate/FirstAggregationProcessingTime.java b/groot-core/src/main/java/com/geedgenetworks/core/processor/aggregate/FirstAggregationProcessingTime.java
index e98daa5..01c346f 100644
--- a/groot-core/src/main/java/com/geedgenetworks/core/processor/aggregate/FirstAggregationProcessingTime.java
+++ b/groot-core/src/main/java/com/geedgenetworks/core/processor/aggregate/FirstAggregationProcessingTime.java
@@ -3,6 +3,7 @@ package com.geedgenetworks.core.processor.aggregate;
import com.geedgenetworks.common.Accumulator;
import com.geedgenetworks.common.Event;
+import com.geedgenetworks.common.KeybyEntity;
import com.geedgenetworks.core.pojo.AggregateConfig;
import lombok.extern.slf4j.Slf4j;
import org.apache.flink.configuration.Configuration;
@@ -32,7 +33,7 @@ public class FirstAggregationProcessingTime extends AbstractFirstAggregation {
Long timestamp;
internalMetrics.incrementInEvents();
try {
- String key = getKey(value, groupByFields);
+ KeybyEntity keybyEntity = getKey(value, groupByFields);
while ((timestamp = processingTimeTimersQueue.peek()) != null && timestamp <= ctx.timerService().currentProcessingTime()) {
processingTimeTimersQueue.poll();
onTimer(timestamp, out);
@@ -40,15 +41,15 @@ public class FirstAggregationProcessingTime extends AbstractFirstAggregation {
long windowEnd = assignWindowEnd(ctx.timerService().currentProcessingTime());
if (!windows.containsKey(windowEnd)) {
Map<String, Accumulator> map = new HashMap<>();
- map.put(key, createAccumulator());
+ map.put(keybyEntity.getKeysToString(), createAccumulator(keybyEntity.getKeys()));
windows.put(windowEnd, map);
processingTimeTimersQueue.add(windowEnd);
} else {
- if (!windows.get(windowEnd).containsKey(key)) {
- windows.get(windowEnd).put(key, createAccumulator());
+ if (!windows.get(windowEnd).containsKey(keybyEntity.getKeysToString())) {
+ windows.get(windowEnd).put(keybyEntity.getKeysToString(), createAccumulator(keybyEntity.getKeys()));
}
}
- add(value, windows.get(windowEnd).get(key));
+ add(value, windows.get(windowEnd).get(keybyEntity.getKeysToString()));
} catch (Exception e) {
log.error("Error in pre-aggregate processElement", e);
internalMetrics.incrementErrorEvents();
diff --git a/groot-core/src/main/java/com/geedgenetworks/core/processor/aggregate/KeySelector.java b/groot-core/src/main/java/com/geedgenetworks/core/processor/aggregate/KeySelector.java
index da09690..a6fb294 100644
--- a/groot-core/src/main/java/com/geedgenetworks/core/processor/aggregate/KeySelector.java
+++ b/groot-core/src/main/java/com/geedgenetworks/core/processor/aggregate/KeySelector.java
@@ -22,12 +22,12 @@ public class KeySelector implements org.apache.flink.api.java.functions.KeySelec
KeybyEntity keybyEntity = new KeybyEntity(new HashMap<>());
StringBuilder stringBuilder = new StringBuilder();
for (String key : keys) {
-
- if (value.getExtractedFields().containsKey(key)) {
- keybyEntity.getKeys().put(key, value.getExtractedFields().get(key));
- stringBuilder.append(value.getExtractedFields().get(key).toString());
- } else {
+ Object object = value.getExtractedFields().get(key);
+ if(object==null){
stringBuilder.append(",");
+ }else{
+ keybyEntity.getKeys().put(key, object);
+ stringBuilder.append(object);
}
}
String hashedKey = SecureUtil.md5(stringBuilder.toString());
diff --git a/groot-core/src/main/java/com/geedgenetworks/core/processor/aggregate/PreKeySelector.java b/groot-core/src/main/java/com/geedgenetworks/core/processor/aggregate/PreKeySelector.java
index 6e43184..4b21ba7 100644
--- a/groot-core/src/main/java/com/geedgenetworks/core/processor/aggregate/PreKeySelector.java
+++ b/groot-core/src/main/java/com/geedgenetworks/core/processor/aggregate/PreKeySelector.java
@@ -22,12 +22,12 @@ public class PreKeySelector implements org.apache.flink.api.java.functions.KeySe
KeybyEntity keybyEntity = new KeybyEntity(new HashMap<>());
StringBuilder stringBuilder = new StringBuilder();
for (String key : keys) {
-
- if (value.getMetricsFields().containsKey(key)) {
- keybyEntity.getKeys().put(key, value.getMetricsFields().get(key));
- stringBuilder.append(value.getMetricsFields().get(key).toString());
- } else {
+ Object object = value.getMetricsFields().get(key);
+ if(object==null){
stringBuilder.append(",");
+ }else{
+ keybyEntity.getKeys().put(key, object);
+ stringBuilder.append(object);
}
}
String hashedKey = SecureUtil.md5(stringBuilder.toString());
diff --git a/groot-examples/cn-udf-example/pom.xml b/groot-examples/cn-udf-example/pom.xml
index 38ae4ea..4ec1f18 100644
--- a/groot-examples/cn-udf-example/pom.xml
+++ b/groot-examples/cn-udf-example/pom.xml
@@ -9,7 +9,7 @@
<version>${revision}</version>
</parent>
- <artifactId>cn-udf-example</artifactId>
+ <artifactId>cn-scalarFunction-example</artifactId>
<name>Groot : Examples : CN-UDF</name>
<properties>
<maven.install.skip>true</maven.install.skip>