summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
author顾金凯 <[email protected]>2023-10-18 05:57:38 +0000
committer顾金凯 <[email protected]>2023-10-18 05:57:38 +0000
commit0d2d31c30b79f71aad8448df13c3c153b325a301 (patch)
tree7c54e48f8da9ec310b9dcbe7570993ad1ab29ebe
parentf4d222161c6cba815988068d079db60f52d53df9 (diff)
parent73f0c96237c400913117b89d89c5a5ccb16a5d57 (diff)
Merge branch 'feature/indicator-match' into '23.10'
fix: some bugfix See merge request cyber_narrator/flink-stream-schedule-platform!14
-rw-r--r--module-CN-indicator-match/pom.xml6
-rw-r--r--module-CN-indicator-match/src/main/java/com/zdjizhi/schedule/indicator/functions/MatchKeyedProcessFunction.java15
-rw-r--r--module-CN-indicator-match/src/main/java/com/zdjizhi/schedule/indicator/record/SlideAggregate.java2
3 files changed, 19 insertions, 4 deletions
diff --git a/module-CN-indicator-match/pom.xml b/module-CN-indicator-match/pom.xml
index 8613147..db7e1d8 100644
--- a/module-CN-indicator-match/pom.xml
+++ b/module-CN-indicator-match/pom.xml
@@ -21,6 +21,12 @@
</dependency>
<dependency>
+ <groupId>cn.hutool</groupId>
+ <artifactId>hutool-all</artifactId>
+ <version>5.8.22</version>
+ </dependency>
+
+ <dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-java_${scala.version}</artifactId>
<version>${flink.version}</version>
diff --git a/module-CN-indicator-match/src/main/java/com/zdjizhi/schedule/indicator/functions/MatchKeyedProcessFunction.java b/module-CN-indicator-match/src/main/java/com/zdjizhi/schedule/indicator/functions/MatchKeyedProcessFunction.java
index fbcef44..8e9a407 100644
--- a/module-CN-indicator-match/src/main/java/com/zdjizhi/schedule/indicator/functions/MatchKeyedProcessFunction.java
+++ b/module-CN-indicator-match/src/main/java/com/zdjizhi/schedule/indicator/functions/MatchKeyedProcessFunction.java
@@ -1,9 +1,10 @@
package com.zdjizhi.schedule.indicator.functions;
+import cn.hutool.core.lang.Snowflake;
+import cn.hutool.core.util.IdUtil;
import com.zdjizhi.base.common.CnRecordLog;
import com.zdjizhi.base.rule.cache.RuleCache;
import com.zdjizhi.base.rule.info.RuleInfo;
-import com.zdjizhi.base.utils.SnowflakeId;
import com.zdjizhi.schedule.indicator.record.MatchGroup;
import com.zdjizhi.schedule.indicator.record.SecurityEvent;
import com.zdjizhi.schedule.indicator.record.SlideAggregate;
@@ -33,16 +34,24 @@ public class MatchKeyedProcessFunction
private final static long TIMER_INTERVAL = 30 * 1000L;
- private final transient RuleCache cache = RuleCache.getInstance();
+ private transient RuleCache cache;
private transient ValueState<LimitedSizeList<SlideAggregate<CnRecordLog>>> slideAggregateState;
private transient ValueState<SecurityEvent> eventState;
+ private transient Snowflake snowflake;
+
@Override
public void open(Configuration parameters) throws Exception {
super.open(parameters);
+ snowflake = IdUtil.getSnowflake(
+ Math.abs(getRuntimeContext().getJobId().hashCode() % (1 << 5)),
+ Math.abs(getRuntimeContext().getTaskName().hashCode() % (1 << 5)));
+
+ cache = RuleCache.getInstance();
+
final ValueStateDescriptor<LimitedSizeList<SlideAggregate<CnRecordLog>>> listStateDescriptor =
new ValueStateDescriptor<>("slide-aggregate-state",
TypeInformation.of(
@@ -165,7 +174,7 @@ public class MatchKeyedProcessFunction
}
final SecurityEvent event = new SecurityEvent();
- event.setEventId(SnowflakeId.generateId());
+ event.setEventId(snowflake.nextId());
event.setEventType(info.getEventType());
event.setEventName(info.getName());
event.setEventKey(group.toString());
diff --git a/module-CN-indicator-match/src/main/java/com/zdjizhi/schedule/indicator/record/SlideAggregate.java b/module-CN-indicator-match/src/main/java/com/zdjizhi/schedule/indicator/record/SlideAggregate.java
index aec0a7e..cfb3c57 100644
--- a/module-CN-indicator-match/src/main/java/com/zdjizhi/schedule/indicator/record/SlideAggregate.java
+++ b/module-CN-indicator-match/src/main/java/com/zdjizhi/schedule/indicator/record/SlideAggregate.java
@@ -26,7 +26,7 @@ public class SlideAggregate<T> {
private int times;
- private long firstTime;
+ private long firstTime = Long.MAX_VALUE;
private long lastTime;