summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
author王宽 <[email protected]>2024-08-30 03:33:17 +0000
committer王宽 <[email protected]>2024-08-30 03:33:17 +0000
commit2947507b84076e5c6a2accc7b142bd27594ba3d8 (patch)
tree69c41d7b0f895fa002bcea229e60e23122bcd2d7
parentb17b5a08a5b51180ab8d9f0f210c1294e4f11fe2 (diff)
parentd4e7f873064a9d95578a64977eadf15b11ed4e11 (diff)
Merge branch 'feature/pre-agg' into 'develop'v1.6.0-SNAPSHOT
[improve][bootstrap][core]完善监控输出及单元测试,修改部分问题 See merge request galaxy/platform/groot-stream!102
-rw-r--r--groot-bootstrap/src/test/java/com/geedgenetworks/bootstrap/main/simple/JobSplitWithAggTest.java4
-rw-r--r--groot-bootstrap/src/test/resources/grootstream_job_split_agg_test.yaml2
-rw-r--r--groot-core/src/main/java/com/geedgenetworks/core/processor/aggregate/FirstAggregationEventTime.java4
-rw-r--r--groot-core/src/main/java/com/geedgenetworks/core/processor/aggregate/SecondAggregateProcessorFunction.java9
4 files changed, 7 insertions, 12 deletions
diff --git a/groot-bootstrap/src/test/java/com/geedgenetworks/bootstrap/main/simple/JobSplitWithAggTest.java b/groot-bootstrap/src/test/java/com/geedgenetworks/bootstrap/main/simple/JobSplitWithAggTest.java
index 6ed3888..9fa81c0 100644
--- a/groot-bootstrap/src/test/java/com/geedgenetworks/bootstrap/main/simple/JobSplitWithAggTest.java
+++ b/groot-bootstrap/src/test/java/com/geedgenetworks/bootstrap/main/simple/JobSplitWithAggTest.java
@@ -67,8 +67,8 @@ public class JobSplitWithAggTest {
}
Assert.assertEquals(2, CollectSink.values.size());
- Assert.assertEquals("2", CollectSink.values.get(0).getExtractedFields().get("sessions").toString());
- Assert.assertEquals("1.5", CollectSink.values.get(0).getExtractedFields().get("pkts").toString());
+ Assert.assertEquals("3", CollectSink.values.get(1).getExtractedFields().get("sessions").toString());
+ Assert.assertEquals("3.0", CollectSink.values.get(1).getExtractedFields().get("pkts").toString());
}
}
diff --git a/groot-bootstrap/src/test/resources/grootstream_job_split_agg_test.yaml b/groot-bootstrap/src/test/resources/grootstream_job_split_agg_test.yaml
index 6a7011a..5163642 100644
--- a/groot-bootstrap/src/test/resources/grootstream_job_split_agg_test.yaml
+++ b/groot-bootstrap/src/test/resources/grootstream_job_split_agg_test.yaml
@@ -3,7 +3,7 @@ sources:
type : inline
fields: # [array of object] Field List, if not set, all fields(Map<String, Object>) will be output.
properties:
- data: '[{"pkts":2,"sessions":1,"decoded_as":"DNS","log_id": 1, "recv_time":"1724925692000", "client_ip":"192.168.0.2","server_ip":"2600:1015:b002::"},{"sessions":1,"decoded_as":"DNS","log_id": 1, "recv_time":"1724925692000", "client_ip":"192.168.0.2","pkts":1,"server_ip":"2600:1015:b002::"},{"sessions":1,"decoded_as":"DNS","log_id": 1, "recv_time":"1724925692000", "client_ip":"192.168.0.2","pkts":1,"server_ip":"2600:1015:b002::"},{"sessions":0,"decoded_as":"DNS","log_id": 1, "recv_time":"1724925799000", "client_ip":"192.168.0.2","pkts":1,"server_ip":"2600:1015:b002::"}]'
+ data: '[{"pkts":1,"sessions":1,"decoded_as":"DNS","log_id": 1, "recv_time":"1724925692000", "client_ip":"192.168.0.2","server_ip":"2600:1015:b002::"},{"pkts":2,"sessions":1,"decoded_as":"DNS","log_id": 1, "recv_time":"1724925692000", "client_ip":"192.168.0.2","server_ip":"2600:1015:b002::"},{"sessions":1,"decoded_as":"DNS","log_id": 1, "recv_time":"1724925692000", "client_ip":"192.168.0.2","pkts":3,"server_ip":"2600:1015:b002::"},{"sessions":1,"decoded_as":"DNS","log_id": 1, "recv_time":"1724936692000", "client_ip":"192.168.0.2","pkts":4,"server_ip":"2600:1015:b002::"},{"sessions":1,"decoded_as":"DNS","log_id": 1, "recv_time":"1724937692000", "client_ip":"192.168.0.2","pkts":5,"server_ip":"2600:1015:b002::"}]'
interval.per.row: 1s # 可选
repeat.count: 1 # 可选
format: json
diff --git a/groot-core/src/main/java/com/geedgenetworks/core/processor/aggregate/FirstAggregationEventTime.java b/groot-core/src/main/java/com/geedgenetworks/core/processor/aggregate/FirstAggregationEventTime.java
index 9390de4..156c0ed 100644
--- a/groot-core/src/main/java/com/geedgenetworks/core/processor/aggregate/FirstAggregationEventTime.java
+++ b/groot-core/src/main/java/com/geedgenetworks/core/processor/aggregate/FirstAggregationEventTime.java
@@ -34,11 +34,11 @@ public class FirstAggregationEventTime extends AbstractFirstAggregation {
internalMetrics.incrementInEvents();
try {
String key = getKey(value, groupByFields);
- while ((timestamp = eventTimeTimersQueue.peek()) != null && timestamp <= ctx.timerService().currentProcessingTime()) {
+ while ((timestamp = eventTimeTimersQueue.peek()) != null && timestamp <= ctx.timerService().currentWatermark()) {
eventTimeTimersQueue.poll();
onTimer(timestamp, out);
}
- long windowEnd = assignWindowEnd(ctx.timerService().currentProcessingTime());
+ long windowEnd = assignWindowEnd(ctx.timerService().currentWatermark());
if (!windows.containsKey(windowEnd)) {
Map<String, Accumulator> map = new HashMap<>();
map.put(key, createAccumulator());
diff --git a/groot-core/src/main/java/com/geedgenetworks/core/processor/aggregate/SecondAggregateProcessorFunction.java b/groot-core/src/main/java/com/geedgenetworks/core/processor/aggregate/SecondAggregateProcessorFunction.java
index 7c0a434..68fa53e 100644
--- a/groot-core/src/main/java/com/geedgenetworks/core/processor/aggregate/SecondAggregateProcessorFunction.java
+++ b/groot-core/src/main/java/com/geedgenetworks/core/processor/aggregate/SecondAggregateProcessorFunction.java
@@ -107,15 +107,10 @@ public class SecondAggregateProcessorFunction implements org.apache.flink.api.co
@Override
public Accumulator merge(Accumulator acc1, Accumulator acc2) {
- acc1.setInEvents(acc1.getInEvents() + acc2.getInEvents());
- acc1.setOutEvents(acc1.getOutEvents() + acc2.getOutEvents());
- acc1.setErrorCount(acc1.getErrorCount() + acc2.getErrorCount());
+ acc1.setInEvents(acc1.getInEvents() + 1);
for (UdfEntity udafEntity : functions) {
try {
- boolean result = udafEntity.getFilterExpression() != null ? filterExecute(udafEntity.getFilterExpression(), udafEntity.getFilterExpression().newEnv("event", acc1.getMetricsFields())) : true;
- if (result) {
- udafEntity.getAggregateFunction().merge(acc1, acc2);
- }
+ udafEntity.getAggregateFunction().merge(acc1, acc2);
} catch (ExpressionRuntimeException ignore) {
log.error("Function " + udafEntity.getName() + " Invalid filter ! ");
acc1.setErrorCount(acc1.getErrorCount() + 1);