summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
author顾金凯 <[email protected]>2023-10-18 08:49:54 +0000
committer顾金凯 <[email protected]>2023-10-18 08:49:54 +0000
commit87418340dcd6d7fcd36abbe066e91bc255d55414 (patch)
tree34b4e5c448c660417b2a30216280afc748437d3f
parentf052583e9c648750f24154fdbafb8fe194e3b7e2 (diff)
parent0a4be2a79d0d8a933bfec9df5981964d37b2f2ec (diff)
Merge branch 'feature/indicator-match' into '23.10'
fix: ended event See merge request cyber_narrator/flink-stream-schedule-platform!17
-rw-r--r--module-CN-indicator-match/src/main/java/com/zdjizhi/schedule/indicator/functions/MatchKeyedProcessFunction.java22
1 files changed, 15 insertions, 7 deletions
diff --git a/module-CN-indicator-match/src/main/java/com/zdjizhi/schedule/indicator/functions/MatchKeyedProcessFunction.java b/module-CN-indicator-match/src/main/java/com/zdjizhi/schedule/indicator/functions/MatchKeyedProcessFunction.java
index 4fcda19..ecee40b 100644
--- a/module-CN-indicator-match/src/main/java/com/zdjizhi/schedule/indicator/functions/MatchKeyedProcessFunction.java
+++ b/module-CN-indicator-match/src/main/java/com/zdjizhi/schedule/indicator/functions/MatchKeyedProcessFunction.java
@@ -18,6 +18,7 @@ import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;
import java.time.Duration;
+import java.util.Collections;
import java.util.LinkedList;
import java.util.List;
import java.util.Optional;
@@ -71,10 +72,7 @@ public class MatchKeyedProcessFunction
final KeyedProcessFunction<MatchGroup, SlideAggregate<CnRecordLog>, SecurityEvent>.Context ctx,
final Collector<SecurityEvent> out) throws Exception {
- final TimerService timerService = ctx.timerService();
- final long current = timerService.currentWatermark();
- final long interval = TIMER_INTERVAL - (current % TIMER_INTERVAL);
- timerService.registerEventTimeTimer(current + interval);
+ registerNextTimer(ctx.timerService());
final long ruleId = ctx.getCurrentKey().getRuleId();
Optional<RuleInfo> optional = cache.get(ruleId);
@@ -103,6 +101,8 @@ public class MatchKeyedProcessFunction
final KeyedProcessFunction<MatchGroup, SlideAggregate<CnRecordLog>, SecurityEvent>.OnTimerContext ctx,
final Collector<SecurityEvent> out) throws Exception {
+ registerNextTimer(ctx.timerService());
+
final long ruleId = ctx.getCurrentKey().getRuleId();
final Optional<RuleInfo> optional = cache.get(ruleId);
@@ -131,6 +131,7 @@ public class MatchKeyedProcessFunction
final SecurityEvent e = securityEventOption.get();
out.collect(e);
+ eventState.update(e);
slideAggregateState.clear();
} else {
@@ -267,9 +268,10 @@ public class MatchKeyedProcessFunction
final int lastTime = onGoingEvent.getStartTime() + onGoingEvent.getDurationS();
- final List<SlideAggregate<CnRecordLog>> incLogs = logs.stream()
- .filter(slide -> slide.getLastTime() / 1000L > lastTime)
- .collect(Collectors.toList());
+ final List<SlideAggregate<CnRecordLog>> incLogs =
+ logs == null ? Collections.emptyList() : logs.stream()
+ .filter(slide -> slide.getLastTime() / 1000L > lastTime)
+ .collect(Collectors.toList());
final int endTime = (int) (currentTime / 1000L);
if (incLogs.size() > 0) {
@@ -293,6 +295,12 @@ public class MatchKeyedProcessFunction
return slideValues.stream().mapToInt(SlideAggregate::getTimes).sum();
}
+ private void registerNextTimer(final TimerService timerService) {
+ final long current = timerService.currentWatermark();
+ final long interval = TIMER_INTERVAL - (current % TIMER_INTERVAL);
+ timerService.registerEventTimeTimer(current + interval);
+ }
+
private static class LimitedSizeList<E> extends LinkedList<E> {
private final int maxSize;