diff options
| author | wangkuan <[email protected]> | 2024-08-30 11:10:40 +0800 |
|---|---|---|
| committer | wangkuan <[email protected]> | 2024-08-30 11:10:40 +0800 |
| commit | d4e7f873064a9d95578a64977eadf15b11ed4e11 (patch) | |
| tree | 69c41d7b0f895fa002bcea229e60e23122bcd2d7 | |
| parent | 13323b1fe6315cedc5e312fe084fb883442fe066 (diff) | |
[improve][bootstrap][core]完善监控输出及单元测试,修改部分问题feature/pre-agg
4 files changed, 7 insertions, 12 deletions
diff --git a/groot-bootstrap/src/test/java/com/geedgenetworks/bootstrap/main/simple/JobSplitWithAggTest.java b/groot-bootstrap/src/test/java/com/geedgenetworks/bootstrap/main/simple/JobSplitWithAggTest.java index 6ed3888..9fa81c0 100644 --- a/groot-bootstrap/src/test/java/com/geedgenetworks/bootstrap/main/simple/JobSplitWithAggTest.java +++ b/groot-bootstrap/src/test/java/com/geedgenetworks/bootstrap/main/simple/JobSplitWithAggTest.java @@ -67,8 +67,8 @@ public class JobSplitWithAggTest { } Assert.assertEquals(2, CollectSink.values.size()); - Assert.assertEquals("2", CollectSink.values.get(0).getExtractedFields().get("sessions").toString()); - Assert.assertEquals("1.5", CollectSink.values.get(0).getExtractedFields().get("pkts").toString()); + Assert.assertEquals("3", CollectSink.values.get(1).getExtractedFields().get("sessions").toString()); + Assert.assertEquals("3.0", CollectSink.values.get(1).getExtractedFields().get("pkts").toString()); } } diff --git a/groot-bootstrap/src/test/resources/grootstream_job_split_agg_test.yaml b/groot-bootstrap/src/test/resources/grootstream_job_split_agg_test.yaml index 6a7011a..5163642 100644 --- a/groot-bootstrap/src/test/resources/grootstream_job_split_agg_test.yaml +++ b/groot-bootstrap/src/test/resources/grootstream_job_split_agg_test.yaml @@ -3,7 +3,7 @@ sources: type : inline fields: # [array of object] Field List, if not set, all fields(Map<String, Object>) will be output. properties: - data: '[{"pkts":2,"sessions":1,"decoded_as":"DNS","log_id": 1, "recv_time":"1724925692000", "client_ip":"192.168.0.2","server_ip":"2600:1015:b002::"},{"sessions":1,"decoded_as":"DNS","log_id": 1, "recv_time":"1724925692000", "client_ip":"192.168.0.2","pkts":1,"server_ip":"2600:1015:b002::"},{"sessions":1,"decoded_as":"DNS","log_id": 1, "recv_time":"1724925692000", "client_ip":"192.168.0.2","pkts":1,"server_ip":"2600:1015:b002::"},{"sessions":0,"decoded_as":"DNS","log_id": 1, "recv_time":"1724925799000", "client_ip":"192.168.0.2","pkts":1,"server_ip":"2600:1015:b002::"}]' + data: '[{"pkts":1,"sessions":1,"decoded_as":"DNS","log_id": 1, "recv_time":"1724925692000", "client_ip":"192.168.0.2","server_ip":"2600:1015:b002::"},{"pkts":2,"sessions":1,"decoded_as":"DNS","log_id": 1, "recv_time":"1724925692000", "client_ip":"192.168.0.2","server_ip":"2600:1015:b002::"},{"sessions":1,"decoded_as":"DNS","log_id": 1, "recv_time":"1724925692000", "client_ip":"192.168.0.2","pkts":3,"server_ip":"2600:1015:b002::"},{"sessions":1,"decoded_as":"DNS","log_id": 1, "recv_time":"1724936692000", "client_ip":"192.168.0.2","pkts":4,"server_ip":"2600:1015:b002::"},{"sessions":1,"decoded_as":"DNS","log_id": 1, "recv_time":"1724937692000", "client_ip":"192.168.0.2","pkts":5,"server_ip":"2600:1015:b002::"}]' interval.per.row: 1s # 可选 repeat.count: 1 # 可选 format: json diff --git a/groot-core/src/main/java/com/geedgenetworks/core/processor/aggregate/FirstAggregationEventTime.java b/groot-core/src/main/java/com/geedgenetworks/core/processor/aggregate/FirstAggregationEventTime.java index 9390de4..156c0ed 100644 --- a/groot-core/src/main/java/com/geedgenetworks/core/processor/aggregate/FirstAggregationEventTime.java +++ b/groot-core/src/main/java/com/geedgenetworks/core/processor/aggregate/FirstAggregationEventTime.java @@ -34,11 +34,11 @@ public class FirstAggregationEventTime extends AbstractFirstAggregation { internalMetrics.incrementInEvents(); try { String key = getKey(value, groupByFields); - while ((timestamp = eventTimeTimersQueue.peek()) != null && timestamp <= ctx.timerService().currentProcessingTime()) { + while ((timestamp = eventTimeTimersQueue.peek()) != null && timestamp <= ctx.timerService().currentWatermark()) { eventTimeTimersQueue.poll(); onTimer(timestamp, out); } - long windowEnd = assignWindowEnd(ctx.timerService().currentProcessingTime()); + long windowEnd = assignWindowEnd(ctx.timerService().currentWatermark()); if (!windows.containsKey(windowEnd)) { Map<String, Accumulator> map = new HashMap<>(); map.put(key, createAccumulator()); diff --git a/groot-core/src/main/java/com/geedgenetworks/core/processor/aggregate/SecondAggregateProcessorFunction.java b/groot-core/src/main/java/com/geedgenetworks/core/processor/aggregate/SecondAggregateProcessorFunction.java index 7c0a434..68fa53e 100644 --- a/groot-core/src/main/java/com/geedgenetworks/core/processor/aggregate/SecondAggregateProcessorFunction.java +++ b/groot-core/src/main/java/com/geedgenetworks/core/processor/aggregate/SecondAggregateProcessorFunction.java @@ -107,15 +107,10 @@ public class SecondAggregateProcessorFunction implements org.apache.flink.api.co @Override public Accumulator merge(Accumulator acc1, Accumulator acc2) { - acc1.setInEvents(acc1.getInEvents() + acc2.getInEvents()); - acc1.setOutEvents(acc1.getOutEvents() + acc2.getOutEvents()); - acc1.setErrorCount(acc1.getErrorCount() + acc2.getErrorCount()); + acc1.setInEvents(acc1.getInEvents() + 1); for (UdfEntity udafEntity : functions) { try { - boolean result = udafEntity.getFilterExpression() != null ? filterExecute(udafEntity.getFilterExpression(), udafEntity.getFilterExpression().newEnv("event", acc1.getMetricsFields())) : true; - if (result) { - udafEntity.getAggregateFunction().merge(acc1, acc2); - } + udafEntity.getAggregateFunction().merge(acc1, acc2); } catch (ExpressionRuntimeException ignore) { log.error("Function " + udafEntity.getName() + " Invalid filter ! "); acc1.setErrorCount(acc1.getErrorCount() + 1); |
