diff options
| author | wangkuan <[email protected]> | 2024-09-13 09:47:31 +0800 |
|---|---|---|
| committer | wangkuan <[email protected]> | 2024-09-13 09:47:31 +0800 |
| commit | c5d3f1c45899d580916e36c1eb689779c0431caa (patch) | |
| tree | e0ddcbd4f7a738cc6bb8fb4a479b6e5605d47e3c | |
| parent | f88fd6a24a4870b47e6a88c852d172c0850caf33 (diff) | |
[fix][core]修复预聚合功能相关bug,增加测试用例
8 files changed, 40 insertions, 35 deletions
diff --git a/groot-bootstrap/src/test/java/com/geedgenetworks/bootstrap/main/simple/JobAggTest.java b/groot-bootstrap/src/test/java/com/geedgenetworks/bootstrap/main/simple/JobAggTest.java index 6c69f64..03b322f 100644 --- a/groot-bootstrap/src/test/java/com/geedgenetworks/bootstrap/main/simple/JobAggTest.java +++ b/groot-bootstrap/src/test/java/com/geedgenetworks/bootstrap/main/simple/JobAggTest.java @@ -60,9 +60,9 @@ public class JobAggTest { JobExecution jobExecution = new JobExecution(config, grootStreamConfig); jobExecution.execute(); - Assert.assertEquals(2, CollectSink.values.size()); - Assert.assertEquals("3", CollectSink.values.get(1).getExtractedFields().get("sessions").toString()); - Assert.assertEquals("3.0", CollectSink.values.get(1).getExtractedFields().get("pkts").toString()); + Assert.assertEquals(4, CollectSink.values.size()); + Assert.assertEquals("2", CollectSink.values.get(1).getExtractedFields().get("sessions").toString()); + Assert.assertEquals("3.5", CollectSink.values.get(1).getExtractedFields().get("pkts").toString()); } } diff --git a/groot-bootstrap/src/test/resources/grootstream_job_agg_test.yaml b/groot-bootstrap/src/test/resources/grootstream_job_agg_test.yaml index ee589ef..4ec36e1 100644 --- a/groot-bootstrap/src/test/resources/grootstream_job_agg_test.yaml +++ b/groot-bootstrap/src/test/resources/grootstream_job_agg_test.yaml @@ -3,7 +3,7 @@ sources: type : inline fields: # [array of object] Field List, if not set, all fields(Map<String, Object>) will be output. properties: - data: '[{"pkts":1,"sessions":1,"decoded_as":"DNS","log_id": 1, "recv_time":"1724925692000", "client_ip":"192.168.0.2","server_ip":"2600:1015:b002::"},{"pkts":2,"sessions":1,"decoded_as":"DNS","log_id": 1, "recv_time":"1724925692000", "client_ip":"192.168.0.2","server_ip":"2600:1015:b002::"},{"sessions":1,"decoded_as":"DNS","log_id": 1, "recv_time":"1724925692000", "client_ip":"192.168.0.2","pkts":3,"server_ip":"2600:1015:b002::"},{"sessions":1,"decoded_as":"DNS","log_id": 1, "recv_time":"1724936692000", "client_ip":"192.168.0.2","pkts":4,"server_ip":"2600:1015:b002::"},{"sessions":1,"decoded_as":"DNS","log_id": 1, "recv_time":"1724937692000", "client_ip":"192.168.0.2","pkts":5,"server_ip":"2600:1015:b002::"}]' + data: '[{"pkts":1,"sessions":1,"log_id": 1, "recv_time":"1724925692000", "client_ip":"192.168.0.2","server_ip":"2600:1015:b002::"},{"pkts":1,"sessions":1,"decoded_as":null,"log_id": 1, "recv_time":"1724925692000", "client_ip":"192.168.0.2","server_ip":"2600:1015:b002::"},{"pkts":2,"sessions":1,"decoded_as":"HTTP","log_id": 1, "recv_time":"1724925692000", "client_ip":"192.168.0.2","server_ip":"2600:1015:b002::"},{"sessions":1,"decoded_as":"DNS","log_id": 1, "recv_time":"1724925692000", "client_ip":"192.168.0.2","pkts":3,"server_ip":"2600:1015:b002::"},{"sessions":1,"decoded_as":"DNS","log_id": 1, "recv_time":"1724936692000", "client_ip":"192.168.0.2","pkts":4,"server_ip":"2600:1015:b002::"},{"sessions":1,"decoded_as":"HTTP","log_id": 1, "recv_time":"1724937692000", "client_ip":"192.168.0.2","pkts":5,"server_ip":"2600:1015:b002::"}]' interval.per.row: 1s # 可选 repeat.count: 1 # 可选 format: json diff --git a/groot-core/src/main/java/com/geedgenetworks/core/processor/aggregate/AbstractFirstAggregation.java b/groot-core/src/main/java/com/geedgenetworks/core/processor/aggregate/AbstractFirstAggregation.java index 3632ba7..ce77ee8 100644 --- a/groot-core/src/main/java/com/geedgenetworks/core/processor/aggregate/AbstractFirstAggregation.java +++ b/groot-core/src/main/java/com/geedgenetworks/core/processor/aggregate/AbstractFirstAggregation.java @@ -6,6 +6,7 @@ import com.alibaba.fastjson.JSON; import com.geedgenetworks.common.Accumulator; import com.geedgenetworks.common.Constants; import com.geedgenetworks.common.Event; +import com.geedgenetworks.common.KeybyEntity; import com.geedgenetworks.common.exception.CommonErrorCode; import com.geedgenetworks.common.exception.GrootStreamRuntimeException; import com.geedgenetworks.common.udf.AggregateFunction; @@ -139,12 +140,10 @@ public class AbstractFirstAggregation extends ProcessFunction<Event, Accumulator return (long) (ThreadLocalRandom.current().nextDouble() * (double) windowSize); } - public Accumulator createAccumulator() { + public Accumulator createAccumulator(Map<String, Object> keysMap) { - Map<String, Object> map = new HashMap<>(); Accumulator accumulator = new Accumulator(); - - accumulator.setMetricsFields(map); + accumulator.setMetricsFields(keysMap); for (UdfEntity udfEntity : functions) { udfEntity.getAggregateFunction().initAccumulator(accumulator); } @@ -152,17 +151,21 @@ public class AbstractFirstAggregation extends ProcessFunction<Event, Accumulator } - public String getKey(Event value, List<String> keys) { + public KeybyEntity getKey(Event value, List<String> keys) { + KeybyEntity keybyEntity = new KeybyEntity(new HashMap<>()); StringBuilder stringBuilder = new StringBuilder(); for (String key : keys) { - - if (value.getExtractedFields().containsKey(key)) { - stringBuilder.append(value.getExtractedFields().get(key).toString()); - } else { + Object object = value.getExtractedFields().get(key); + if(object==null){ stringBuilder.append(","); + }else{ + keybyEntity.getKeys().put(key, object); + stringBuilder.append(object); } + } - return SecureUtil.md5(stringBuilder.toString()); + keybyEntity.setKeysToString(SecureUtil.md5(stringBuilder.toString())); + return keybyEntity; } diff --git a/groot-core/src/main/java/com/geedgenetworks/core/processor/aggregate/FirstAggregationEventTime.java b/groot-core/src/main/java/com/geedgenetworks/core/processor/aggregate/FirstAggregationEventTime.java index 156c0ed..5adc6d1 100644 --- a/groot-core/src/main/java/com/geedgenetworks/core/processor/aggregate/FirstAggregationEventTime.java +++ b/groot-core/src/main/java/com/geedgenetworks/core/processor/aggregate/FirstAggregationEventTime.java @@ -3,6 +3,7 @@ package com.geedgenetworks.core.processor.aggregate; import com.geedgenetworks.common.Accumulator; import com.geedgenetworks.common.Event; +import com.geedgenetworks.common.KeybyEntity; import com.geedgenetworks.core.pojo.AggregateConfig; import lombok.extern.slf4j.Slf4j; import org.apache.flink.configuration.Configuration; @@ -33,7 +34,7 @@ public class FirstAggregationEventTime extends AbstractFirstAggregation { Long timestamp; internalMetrics.incrementInEvents(); try { - String key = getKey(value, groupByFields); + KeybyEntity keybyEntity = getKey(value, groupByFields); while ((timestamp = eventTimeTimersQueue.peek()) != null && timestamp <= ctx.timerService().currentWatermark()) { eventTimeTimersQueue.poll(); onTimer(timestamp, out); @@ -41,15 +42,15 @@ public class FirstAggregationEventTime extends AbstractFirstAggregation { long windowEnd = assignWindowEnd(ctx.timerService().currentWatermark()); if (!windows.containsKey(windowEnd)) { Map<String, Accumulator> map = new HashMap<>(); - map.put(key, createAccumulator()); + map.put(keybyEntity.getKeysToString(), createAccumulator(keybyEntity.getKeys())); windows.put(windowEnd, map); eventTimeTimersQueue.add(windowEnd); } else { - if (!windows.get(windowEnd).containsKey(key)) { - windows.get(windowEnd).put(key, createAccumulator()); + if (!windows.get(windowEnd).containsKey(keybyEntity.getKeysToString())) { + windows.get(windowEnd).put(keybyEntity.getKeysToString(), createAccumulator(keybyEntity.getKeys())); } } - add(value, windows.get(windowEnd).get(key)); + add(value, windows.get(windowEnd).get(keybyEntity.getKeysToString())); } catch (Exception e) { log.error("Error in pre-aggregate processElement", e); internalMetrics.incrementErrorEvents(); diff --git a/groot-core/src/main/java/com/geedgenetworks/core/processor/aggregate/FirstAggregationProcessingTime.java b/groot-core/src/main/java/com/geedgenetworks/core/processor/aggregate/FirstAggregationProcessingTime.java index e98daa5..01c346f 100644 --- a/groot-core/src/main/java/com/geedgenetworks/core/processor/aggregate/FirstAggregationProcessingTime.java +++ b/groot-core/src/main/java/com/geedgenetworks/core/processor/aggregate/FirstAggregationProcessingTime.java @@ -3,6 +3,7 @@ package com.geedgenetworks.core.processor.aggregate; import com.geedgenetworks.common.Accumulator; import com.geedgenetworks.common.Event; +import com.geedgenetworks.common.KeybyEntity; import com.geedgenetworks.core.pojo.AggregateConfig; import lombok.extern.slf4j.Slf4j; import org.apache.flink.configuration.Configuration; @@ -32,7 +33,7 @@ public class FirstAggregationProcessingTime extends AbstractFirstAggregation { Long timestamp; internalMetrics.incrementInEvents(); try { - String key = getKey(value, groupByFields); + KeybyEntity keybyEntity = getKey(value, groupByFields); while ((timestamp = processingTimeTimersQueue.peek()) != null && timestamp <= ctx.timerService().currentProcessingTime()) { processingTimeTimersQueue.poll(); onTimer(timestamp, out); @@ -40,15 +41,15 @@ public class FirstAggregationProcessingTime extends AbstractFirstAggregation { long windowEnd = assignWindowEnd(ctx.timerService().currentProcessingTime()); if (!windows.containsKey(windowEnd)) { Map<String, Accumulator> map = new HashMap<>(); - map.put(key, createAccumulator()); + map.put(keybyEntity.getKeysToString(), createAccumulator(keybyEntity.getKeys())); windows.put(windowEnd, map); processingTimeTimersQueue.add(windowEnd); } else { - if (!windows.get(windowEnd).containsKey(key)) { - windows.get(windowEnd).put(key, createAccumulator()); + if (!windows.get(windowEnd).containsKey(keybyEntity.getKeysToString())) { + windows.get(windowEnd).put(keybyEntity.getKeysToString(), createAccumulator(keybyEntity.getKeys())); } } - add(value, windows.get(windowEnd).get(key)); + add(value, windows.get(windowEnd).get(keybyEntity.getKeysToString())); } catch (Exception e) { log.error("Error in pre-aggregate processElement", e); internalMetrics.incrementErrorEvents(); diff --git a/groot-core/src/main/java/com/geedgenetworks/core/processor/aggregate/KeySelector.java b/groot-core/src/main/java/com/geedgenetworks/core/processor/aggregate/KeySelector.java index da09690..a6fb294 100644 --- a/groot-core/src/main/java/com/geedgenetworks/core/processor/aggregate/KeySelector.java +++ b/groot-core/src/main/java/com/geedgenetworks/core/processor/aggregate/KeySelector.java @@ -22,12 +22,12 @@ public class KeySelector implements org.apache.flink.api.java.functions.KeySelec KeybyEntity keybyEntity = new KeybyEntity(new HashMap<>()); StringBuilder stringBuilder = new StringBuilder(); for (String key : keys) { - - if (value.getExtractedFields().containsKey(key)) { - keybyEntity.getKeys().put(key, value.getExtractedFields().get(key)); - stringBuilder.append(value.getExtractedFields().get(key).toString()); - } else { + Object object = value.getExtractedFields().get(key); + if(object==null){ stringBuilder.append(","); + }else{ + keybyEntity.getKeys().put(key, object); + stringBuilder.append(object); } } String hashedKey = SecureUtil.md5(stringBuilder.toString()); diff --git a/groot-core/src/main/java/com/geedgenetworks/core/processor/aggregate/PreKeySelector.java b/groot-core/src/main/java/com/geedgenetworks/core/processor/aggregate/PreKeySelector.java index 6e43184..4b21ba7 100644 --- a/groot-core/src/main/java/com/geedgenetworks/core/processor/aggregate/PreKeySelector.java +++ b/groot-core/src/main/java/com/geedgenetworks/core/processor/aggregate/PreKeySelector.java @@ -22,12 +22,12 @@ public class PreKeySelector implements org.apache.flink.api.java.functions.KeySe KeybyEntity keybyEntity = new KeybyEntity(new HashMap<>()); StringBuilder stringBuilder = new StringBuilder(); for (String key : keys) { - - if (value.getMetricsFields().containsKey(key)) { - keybyEntity.getKeys().put(key, value.getMetricsFields().get(key)); - stringBuilder.append(value.getMetricsFields().get(key).toString()); - } else { + Object object = value.getMetricsFields().get(key); + if(object==null){ stringBuilder.append(","); + }else{ + keybyEntity.getKeys().put(key, object); + stringBuilder.append(object); } } String hashedKey = SecureUtil.md5(stringBuilder.toString()); diff --git a/groot-examples/cn-udf-example/pom.xml b/groot-examples/cn-udf-example/pom.xml index 38ae4ea..4ec1f18 100644 --- a/groot-examples/cn-udf-example/pom.xml +++ b/groot-examples/cn-udf-example/pom.xml @@ -9,7 +9,7 @@ <version>${revision}</version> </parent> - <artifactId>cn-udf-example</artifactId> + <artifactId>cn-scalarFunction-example</artifactId> <name>Groot : Examples : CN-UDF</name> <properties> <maven.install.skip>true</maven.install.skip> |
