diff options
| author | chaoc <[email protected]> | 2023-10-30 11:23:15 +0800 |
|---|---|---|
| committer | chaoc <[email protected]> | 2023-10-30 11:23:15 +0800 |
| commit | 6447031df61cb50908485315d5b5b6ebcc69738d (patch) | |
| tree | d9d7668212ad7864c760f0ade68705c8c3a9c52a | |
| parent | 28643a308b3f6dc90414a289e29d4c3373908b6c (diff) | |
fix: duration match error
| -rw-r--r-- | module-CN-indicator-match/src/main/java/com/zdjizhi/schedule/indicator/functions/MatchKeyedProcessFunction.java | 16 |
1 files changed, 9 insertions, 7 deletions
diff --git a/module-CN-indicator-match/src/main/java/com/zdjizhi/schedule/indicator/functions/MatchKeyedProcessFunction.java b/module-CN-indicator-match/src/main/java/com/zdjizhi/schedule/indicator/functions/MatchKeyedProcessFunction.java index a44afa2..d96f63b 100644 --- a/module-CN-indicator-match/src/main/java/com/zdjizhi/schedule/indicator/functions/MatchKeyedProcessFunction.java +++ b/module-CN-indicator-match/src/main/java/com/zdjizhi/schedule/indicator/functions/MatchKeyedProcessFunction.java @@ -56,7 +56,8 @@ public class MatchKeyedProcessFunction final ValueStateDescriptor<LimitedSizeList<SlideAggregate<CnRecordLog>>> listStateDescriptor = new ValueStateDescriptor<>("slide-aggregate-state", TypeInformation.of( - new TypeHint<LimitedSizeList<SlideAggregate<CnRecordLog>>>() {} + new TypeHint<LimitedSizeList<SlideAggregate<CnRecordLog>>>() { + } ) ); slideAggregateState = getRuntimeContext().getState(listStateDescriptor); @@ -161,11 +162,15 @@ public class MatchKeyedProcessFunction final RuleInfo info, final MatchGroup group, final Duration matchDuration, final int matchTimes, final long currentTime) { - if (logs == null) { + if (logs == null || logs.size() == 0) { return Optional.empty(); } + + final SlideAggregate<CnRecordLog> first = logs.getFirst(); + final long firstTime = first.getFirstTime(); + // match_duration - if (logs.size() < matchDuration.toMinutes()) { + if ((currentTime - firstTime) / 1000 / 60 < matchDuration.toMinutes()) { return Optional.empty(); } // match_times @@ -202,9 +207,6 @@ public class MatchKeyedProcessFunction event.setOffenderIp(group.getClientIp()); event.setVictimIp(group.getServerIp()); - final SlideAggregate<CnRecordLog> first = logs.getFirst(); - - long firstTime = first.getFirstTime(); event.setStartTime((int) (firstTime / 1000L)); // Use the last element's time @@ -307,7 +309,7 @@ public class MatchKeyedProcessFunction timerService.registerEventTimeTimer(current + interval); } - private void deleteTimer(final TimerService timerService, final long current){ + private void deleteTimer(final TimerService timerService, final long current) { final long interval = TIMER_INTERVAL - (current % TIMER_INTERVAL); timerService.deleteEventTimeTimer(current + interval); } |
