summaryrefslogtreecommitdiff
path: root/groot-core
diff options
context:
space:
mode:
authorwangkuan <[email protected]>2024-08-30 11:10:40 +0800
committerwangkuan <[email protected]>2024-08-30 11:10:40 +0800
commitd4e7f873064a9d95578a64977eadf15b11ed4e11 (patch)
tree69c41d7b0f895fa002bcea229e60e23122bcd2d7 /groot-core
parent13323b1fe6315cedc5e312fe084fb883442fe066 (diff)
[improve][bootstrap][core]完善监控输出及单元测试,修改部分问题feature/pre-agg
Diffstat (limited to 'groot-core')
-rw-r--r--groot-core/src/main/java/com/geedgenetworks/core/processor/aggregate/FirstAggregationEventTime.java4
-rw-r--r--groot-core/src/main/java/com/geedgenetworks/core/processor/aggregate/SecondAggregateProcessorFunction.java9
2 files changed, 4 insertions, 9 deletions
diff --git a/groot-core/src/main/java/com/geedgenetworks/core/processor/aggregate/FirstAggregationEventTime.java b/groot-core/src/main/java/com/geedgenetworks/core/processor/aggregate/FirstAggregationEventTime.java
index 9390de4..156c0ed 100644
--- a/groot-core/src/main/java/com/geedgenetworks/core/processor/aggregate/FirstAggregationEventTime.java
+++ b/groot-core/src/main/java/com/geedgenetworks/core/processor/aggregate/FirstAggregationEventTime.java
@@ -34,11 +34,11 @@ public class FirstAggregationEventTime extends AbstractFirstAggregation {
internalMetrics.incrementInEvents();
try {
String key = getKey(value, groupByFields);
- while ((timestamp = eventTimeTimersQueue.peek()) != null && timestamp <= ctx.timerService().currentProcessingTime()) {
+ while ((timestamp = eventTimeTimersQueue.peek()) != null && timestamp <= ctx.timerService().currentWatermark()) {
eventTimeTimersQueue.poll();
onTimer(timestamp, out);
}
- long windowEnd = assignWindowEnd(ctx.timerService().currentProcessingTime());
+ long windowEnd = assignWindowEnd(ctx.timerService().currentWatermark());
if (!windows.containsKey(windowEnd)) {
Map<String, Accumulator> map = new HashMap<>();
map.put(key, createAccumulator());
diff --git a/groot-core/src/main/java/com/geedgenetworks/core/processor/aggregate/SecondAggregateProcessorFunction.java b/groot-core/src/main/java/com/geedgenetworks/core/processor/aggregate/SecondAggregateProcessorFunction.java
index 7c0a434..68fa53e 100644
--- a/groot-core/src/main/java/com/geedgenetworks/core/processor/aggregate/SecondAggregateProcessorFunction.java
+++ b/groot-core/src/main/java/com/geedgenetworks/core/processor/aggregate/SecondAggregateProcessorFunction.java
@@ -107,15 +107,10 @@ public class SecondAggregateProcessorFunction implements org.apache.flink.api.co
@Override
public Accumulator merge(Accumulator acc1, Accumulator acc2) {
- acc1.setInEvents(acc1.getInEvents() + acc2.getInEvents());
- acc1.setOutEvents(acc1.getOutEvents() + acc2.getOutEvents());
- acc1.setErrorCount(acc1.getErrorCount() + acc2.getErrorCount());
+ acc1.setInEvents(acc1.getInEvents() + 1);
for (UdfEntity udafEntity : functions) {
try {
- boolean result = udafEntity.getFilterExpression() != null ? filterExecute(udafEntity.getFilterExpression(), udafEntity.getFilterExpression().newEnv("event", acc1.getMetricsFields())) : true;
- if (result) {
- udafEntity.getAggregateFunction().merge(acc1, acc2);
- }
+ udafEntity.getAggregateFunction().merge(acc1, acc2);
} catch (ExpressionRuntimeException ignore) {
log.error("Function " + udafEntity.getName() + " Invalid filter ! ");
acc1.setErrorCount(acc1.getErrorCount() + 1);