diff options
| author | gujinkai <[email protected]> | 2024-04-26 16:41:51 +0800 |
|---|---|---|
| committer | gujinkai <[email protected]> | 2024-04-26 16:43:31 +0800 |
| commit | ec50236ef15db8a613d0acc74a141f3027983068 (patch) | |
| tree | 180898552c1965c8ef49d2e4ccab7d2313b2acc6 | |
| parent | f5f10f2064441234ebee74128a82b5eaea7835b3 (diff) | |
feature: add count of delay time
| -rw-r--r-- | module-CN-pre-metrics/src/main/java/com/zdjizhi/pre/base/operator/AbstractFirstAggregation.java | 4 |
1 files changed, 4 insertions, 0 deletions
diff --git a/module-CN-pre-metrics/src/main/java/com/zdjizhi/pre/base/operator/AbstractFirstAggregation.java b/module-CN-pre-metrics/src/main/java/com/zdjizhi/pre/base/operator/AbstractFirstAggregation.java index 799d221..93f5cc7 100644 --- a/module-CN-pre-metrics/src/main/java/com/zdjizhi/pre/base/operator/AbstractFirstAggregation.java +++ b/module-CN-pre-metrics/src/main/java/com/zdjizhi/pre/base/operator/AbstractFirstAggregation.java @@ -41,6 +41,8 @@ public abstract class AbstractFirstAggregation<OUT> extends ProcessFunction<CnRe private Counter delayEvents; + private Counter delayTime; + @Override public void open(Configuration parameters) throws Exception { super.open(parameters); @@ -52,6 +54,7 @@ public abstract class AbstractFirstAggregation<OUT> extends ProcessFunction<CnRe outEvents = internalMetrics.counter("out_events"); errorEvents = internalMetrics.counter("error_events"); delayEvents = internalMetrics.counter("delay_events"); + delayTime = internalMetrics.counter("delay_time"); internalMetrics.meter("delay_events_rate", new MeterView(delayEvents)); } @@ -72,6 +75,7 @@ public abstract class AbstractFirstAggregation<OUT> extends ProcessFunction<CnRe if (ctx.timestamp() <= ctx.timerService().currentWatermark()) { count++; delayEvents.inc(); + delayTime.inc(ctx.timerService().currentWatermark() - ctx.timestamp()); if (nextPrintTime == null) { nextPrintTime = currentProcessingTime + 60 * 1000; } |
