summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorchaoc <[email protected]>2023-10-30 11:23:15 +0800
committerchaoc <[email protected]>2023-10-30 11:23:15 +0800
commit6447031df61cb50908485315d5b5b6ebcc69738d (patch)
treed9d7668212ad7864c760f0ade68705c8c3a9c52a
parent28643a308b3f6dc90414a289e29d4c3373908b6c (diff)
fix: duration match error
-rw-r--r--module-CN-indicator-match/src/main/java/com/zdjizhi/schedule/indicator/functions/MatchKeyedProcessFunction.java16
1 files changed, 9 insertions, 7 deletions
diff --git a/module-CN-indicator-match/src/main/java/com/zdjizhi/schedule/indicator/functions/MatchKeyedProcessFunction.java b/module-CN-indicator-match/src/main/java/com/zdjizhi/schedule/indicator/functions/MatchKeyedProcessFunction.java
index a44afa2..d96f63b 100644
--- a/module-CN-indicator-match/src/main/java/com/zdjizhi/schedule/indicator/functions/MatchKeyedProcessFunction.java
+++ b/module-CN-indicator-match/src/main/java/com/zdjizhi/schedule/indicator/functions/MatchKeyedProcessFunction.java
@@ -56,7 +56,8 @@ public class MatchKeyedProcessFunction
final ValueStateDescriptor<LimitedSizeList<SlideAggregate<CnRecordLog>>> listStateDescriptor =
new ValueStateDescriptor<>("slide-aggregate-state",
TypeInformation.of(
- new TypeHint<LimitedSizeList<SlideAggregate<CnRecordLog>>>() {}
+ new TypeHint<LimitedSizeList<SlideAggregate<CnRecordLog>>>() {
+ }
)
);
slideAggregateState = getRuntimeContext().getState(listStateDescriptor);
@@ -161,11 +162,15 @@ public class MatchKeyedProcessFunction
final RuleInfo info, final MatchGroup group,
final Duration matchDuration,
final int matchTimes, final long currentTime) {
- if (logs == null) {
+ if (logs == null || logs.size() == 0) {
return Optional.empty();
}
+
+ final SlideAggregate<CnRecordLog> first = logs.getFirst();
+ final long firstTime = first.getFirstTime();
+
// match_duration
- if (logs.size() < matchDuration.toMinutes()) {
+ if ((currentTime - firstTime) / 1000 / 60 < matchDuration.toMinutes()) {
return Optional.empty();
}
// match_times
@@ -202,9 +207,6 @@ public class MatchKeyedProcessFunction
event.setOffenderIp(group.getClientIp());
event.setVictimIp(group.getServerIp());
- final SlideAggregate<CnRecordLog> first = logs.getFirst();
-
- long firstTime = first.getFirstTime();
event.setStartTime((int) (firstTime / 1000L));
// Use the last element's time
@@ -307,7 +309,7 @@ public class MatchKeyedProcessFunction
timerService.registerEventTimeTimer(current + interval);
}
- private void deleteTimer(final TimerService timerService, final long current){
+ private void deleteTimer(final TimerService timerService, final long current) {
final long interval = TIMER_INTERVAL - (current % TIMER_INTERVAL);
timerService.deleteEventTimeTimer(current + interval);
}