diff options
| author | wangkuan <[email protected]> | 2024-08-30 11:10:40 +0800 |
|---|---|---|
| committer | wangkuan <[email protected]> | 2024-08-30 11:10:40 +0800 |
| commit | d4e7f873064a9d95578a64977eadf15b11ed4e11 (patch) | |
| tree | 69c41d7b0f895fa002bcea229e60e23122bcd2d7 /groot-core | |
| parent | 13323b1fe6315cedc5e312fe084fb883442fe066 (diff) | |
[improve][bootstrap][core]完善监控输出及单元测试,修改部分问题feature/pre-agg
Diffstat (limited to 'groot-core')
2 files changed, 4 insertions, 9 deletions
diff --git a/groot-core/src/main/java/com/geedgenetworks/core/processor/aggregate/FirstAggregationEventTime.java b/groot-core/src/main/java/com/geedgenetworks/core/processor/aggregate/FirstAggregationEventTime.java index 9390de4..156c0ed 100644 --- a/groot-core/src/main/java/com/geedgenetworks/core/processor/aggregate/FirstAggregationEventTime.java +++ b/groot-core/src/main/java/com/geedgenetworks/core/processor/aggregate/FirstAggregationEventTime.java @@ -34,11 +34,11 @@ public class FirstAggregationEventTime extends AbstractFirstAggregation { internalMetrics.incrementInEvents(); try { String key = getKey(value, groupByFields); - while ((timestamp = eventTimeTimersQueue.peek()) != null && timestamp <= ctx.timerService().currentProcessingTime()) { + while ((timestamp = eventTimeTimersQueue.peek()) != null && timestamp <= ctx.timerService().currentWatermark()) { eventTimeTimersQueue.poll(); onTimer(timestamp, out); } - long windowEnd = assignWindowEnd(ctx.timerService().currentProcessingTime()); + long windowEnd = assignWindowEnd(ctx.timerService().currentWatermark()); if (!windows.containsKey(windowEnd)) { Map<String, Accumulator> map = new HashMap<>(); map.put(key, createAccumulator()); diff --git a/groot-core/src/main/java/com/geedgenetworks/core/processor/aggregate/SecondAggregateProcessorFunction.java b/groot-core/src/main/java/com/geedgenetworks/core/processor/aggregate/SecondAggregateProcessorFunction.java index 7c0a434..68fa53e 100644 --- a/groot-core/src/main/java/com/geedgenetworks/core/processor/aggregate/SecondAggregateProcessorFunction.java +++ b/groot-core/src/main/java/com/geedgenetworks/core/processor/aggregate/SecondAggregateProcessorFunction.java @@ -107,15 +107,10 @@ public class SecondAggregateProcessorFunction implements org.apache.flink.api.co @Override public Accumulator merge(Accumulator acc1, Accumulator acc2) { - acc1.setInEvents(acc1.getInEvents() + acc2.getInEvents()); - acc1.setOutEvents(acc1.getOutEvents() + acc2.getOutEvents()); - acc1.setErrorCount(acc1.getErrorCount() + acc2.getErrorCount()); + acc1.setInEvents(acc1.getInEvents() + 1); for (UdfEntity udafEntity : functions) { try { - boolean result = udafEntity.getFilterExpression() != null ? filterExecute(udafEntity.getFilterExpression(), udafEntity.getFilterExpression().newEnv("event", acc1.getMetricsFields())) : true; - if (result) { - udafEntity.getAggregateFunction().merge(acc1, acc2); - } + udafEntity.getAggregateFunction().merge(acc1, acc2); } catch (ExpressionRuntimeException ignore) { log.error("Function " + udafEntity.getName() + " Invalid filter ! "); acc1.setErrorCount(acc1.getErrorCount() + 1); |
