diff options
| author | 顾金凯 <[email protected]> | 2023-10-18 08:49:54 +0000 |
|---|---|---|
| committer | 顾金凯 <[email protected]> | 2023-10-18 08:49:54 +0000 |
| commit | 87418340dcd6d7fcd36abbe066e91bc255d55414 (patch) | |
| tree | 34b4e5c448c660417b2a30216280afc748437d3f | |
| parent | f052583e9c648750f24154fdbafb8fe194e3b7e2 (diff) | |
| parent | 0a4be2a79d0d8a933bfec9df5981964d37b2f2ec (diff) | |
Merge branch 'feature/indicator-match' into '23.10'
fix: ended event
See merge request cyber_narrator/flink-stream-schedule-platform!17
| -rw-r--r-- | module-CN-indicator-match/src/main/java/com/zdjizhi/schedule/indicator/functions/MatchKeyedProcessFunction.java | 22 |
1 files changed, 15 insertions, 7 deletions
diff --git a/module-CN-indicator-match/src/main/java/com/zdjizhi/schedule/indicator/functions/MatchKeyedProcessFunction.java b/module-CN-indicator-match/src/main/java/com/zdjizhi/schedule/indicator/functions/MatchKeyedProcessFunction.java index 4fcda19..ecee40b 100644 --- a/module-CN-indicator-match/src/main/java/com/zdjizhi/schedule/indicator/functions/MatchKeyedProcessFunction.java +++ b/module-CN-indicator-match/src/main/java/com/zdjizhi/schedule/indicator/functions/MatchKeyedProcessFunction.java @@ -18,6 +18,7 @@ import org.apache.flink.streaming.api.functions.KeyedProcessFunction; import org.apache.flink.util.Collector; import java.time.Duration; +import java.util.Collections; import java.util.LinkedList; import java.util.List; import java.util.Optional; @@ -71,10 +72,7 @@ public class MatchKeyedProcessFunction final KeyedProcessFunction<MatchGroup, SlideAggregate<CnRecordLog>, SecurityEvent>.Context ctx, final Collector<SecurityEvent> out) throws Exception { - final TimerService timerService = ctx.timerService(); - final long current = timerService.currentWatermark(); - final long interval = TIMER_INTERVAL - (current % TIMER_INTERVAL); - timerService.registerEventTimeTimer(current + interval); + registerNextTimer(ctx.timerService()); final long ruleId = ctx.getCurrentKey().getRuleId(); Optional<RuleInfo> optional = cache.get(ruleId); @@ -103,6 +101,8 @@ public class MatchKeyedProcessFunction final KeyedProcessFunction<MatchGroup, SlideAggregate<CnRecordLog>, SecurityEvent>.OnTimerContext ctx, final Collector<SecurityEvent> out) throws Exception { + registerNextTimer(ctx.timerService()); + final long ruleId = ctx.getCurrentKey().getRuleId(); final Optional<RuleInfo> optional = cache.get(ruleId); @@ -131,6 +131,7 @@ public class MatchKeyedProcessFunction final SecurityEvent e = securityEventOption.get(); out.collect(e); + eventState.update(e); slideAggregateState.clear(); } else { @@ -267,9 +268,10 @@ public class MatchKeyedProcessFunction final int lastTime = onGoingEvent.getStartTime() + onGoingEvent.getDurationS(); - final List<SlideAggregate<CnRecordLog>> incLogs = logs.stream() - .filter(slide -> slide.getLastTime() / 1000L > lastTime) - .collect(Collectors.toList()); + final List<SlideAggregate<CnRecordLog>> incLogs = + logs == null ? Collections.emptyList() : logs.stream() + .filter(slide -> slide.getLastTime() / 1000L > lastTime) + .collect(Collectors.toList()); final int endTime = (int) (currentTime / 1000L); if (incLogs.size() > 0) { @@ -293,6 +295,12 @@ public class MatchKeyedProcessFunction return slideValues.stream().mapToInt(SlideAggregate::getTimes).sum(); } + private void registerNextTimer(final TimerService timerService) { + final long current = timerService.currentWatermark(); + final long interval = TIMER_INTERVAL - (current % TIMER_INTERVAL); + timerService.registerEventTimeTimer(current + interval); + } + private static class LimitedSizeList<E> extends LinkedList<E> { private final int maxSize; |
