summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorgujinkai <[email protected]>2024-04-26 16:41:51 +0800
committergujinkai <[email protected]>2024-04-26 16:43:31 +0800
commitec50236ef15db8a613d0acc74a141f3027983068 (patch)
tree180898552c1965c8ef49d2e4ccab7d2313b2acc6
parentf5f10f2064441234ebee74128a82b5eaea7835b3 (diff)
feature: add count of delay time
-rw-r--r--module-CN-pre-metrics/src/main/java/com/zdjizhi/pre/base/operator/AbstractFirstAggregation.java4
1 files changed, 4 insertions, 0 deletions
diff --git a/module-CN-pre-metrics/src/main/java/com/zdjizhi/pre/base/operator/AbstractFirstAggregation.java b/module-CN-pre-metrics/src/main/java/com/zdjizhi/pre/base/operator/AbstractFirstAggregation.java
index 799d221..93f5cc7 100644
--- a/module-CN-pre-metrics/src/main/java/com/zdjizhi/pre/base/operator/AbstractFirstAggregation.java
+++ b/module-CN-pre-metrics/src/main/java/com/zdjizhi/pre/base/operator/AbstractFirstAggregation.java
@@ -41,6 +41,8 @@ public abstract class AbstractFirstAggregation<OUT> extends ProcessFunction<CnRe
private Counter delayEvents;
+ private Counter delayTime;
+
@Override
public void open(Configuration parameters) throws Exception {
super.open(parameters);
@@ -52,6 +54,7 @@ public abstract class AbstractFirstAggregation<OUT> extends ProcessFunction<CnRe
outEvents = internalMetrics.counter("out_events");
errorEvents = internalMetrics.counter("error_events");
delayEvents = internalMetrics.counter("delay_events");
+ delayTime = internalMetrics.counter("delay_time");
internalMetrics.meter("delay_events_rate", new MeterView(delayEvents));
}
@@ -72,6 +75,7 @@ public abstract class AbstractFirstAggregation<OUT> extends ProcessFunction<CnRe
if (ctx.timestamp() <= ctx.timerService().currentWatermark()) {
count++;
delayEvents.inc();
+ delayTime.inc(ctx.timerService().currentWatermark() - ctx.timestamp());
if (nextPrintTime == null) {
nextPrintTime = currentProcessingTime + 60 * 1000;
}