diff options
| author | 顾金凯 <[email protected]> | 2023-10-18 05:57:38 +0000 |
|---|---|---|
| committer | 顾金凯 <[email protected]> | 2023-10-18 05:57:38 +0000 |
| commit | 0d2d31c30b79f71aad8448df13c3c153b325a301 (patch) | |
| tree | 7c54e48f8da9ec310b9dcbe7570993ad1ab29ebe | |
| parent | f4d222161c6cba815988068d079db60f52d53df9 (diff) | |
| parent | 73f0c96237c400913117b89d89c5a5ccb16a5d57 (diff) | |
Merge branch 'feature/indicator-match' into '23.10'
fix: some bugfix
See merge request cyber_narrator/flink-stream-schedule-platform!14
3 files changed, 19 insertions, 4 deletions
diff --git a/module-CN-indicator-match/pom.xml b/module-CN-indicator-match/pom.xml index 8613147..db7e1d8 100644 --- a/module-CN-indicator-match/pom.xml +++ b/module-CN-indicator-match/pom.xml @@ -21,6 +21,12 @@ </dependency> <dependency> + <groupId>cn.hutool</groupId> + <artifactId>hutool-all</artifactId> + <version>5.8.22</version> + </dependency> + + <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-streaming-java_${scala.version}</artifactId> <version>${flink.version}</version> diff --git a/module-CN-indicator-match/src/main/java/com/zdjizhi/schedule/indicator/functions/MatchKeyedProcessFunction.java b/module-CN-indicator-match/src/main/java/com/zdjizhi/schedule/indicator/functions/MatchKeyedProcessFunction.java index fbcef44..8e9a407 100644 --- a/module-CN-indicator-match/src/main/java/com/zdjizhi/schedule/indicator/functions/MatchKeyedProcessFunction.java +++ b/module-CN-indicator-match/src/main/java/com/zdjizhi/schedule/indicator/functions/MatchKeyedProcessFunction.java @@ -1,9 +1,10 @@ package com.zdjizhi.schedule.indicator.functions; +import cn.hutool.core.lang.Snowflake; +import cn.hutool.core.util.IdUtil; import com.zdjizhi.base.common.CnRecordLog; import com.zdjizhi.base.rule.cache.RuleCache; import com.zdjizhi.base.rule.info.RuleInfo; -import com.zdjizhi.base.utils.SnowflakeId; import com.zdjizhi.schedule.indicator.record.MatchGroup; import com.zdjizhi.schedule.indicator.record.SecurityEvent; import com.zdjizhi.schedule.indicator.record.SlideAggregate; @@ -33,16 +34,24 @@ public class MatchKeyedProcessFunction private final static long TIMER_INTERVAL = 30 * 1000L; - private final transient RuleCache cache = RuleCache.getInstance(); + private transient RuleCache cache; private transient ValueState<LimitedSizeList<SlideAggregate<CnRecordLog>>> slideAggregateState; private transient ValueState<SecurityEvent> eventState; + private transient Snowflake snowflake; + @Override public void open(Configuration parameters) throws Exception { super.open(parameters); + snowflake = IdUtil.getSnowflake( + Math.abs(getRuntimeContext().getJobId().hashCode() % (1 << 5)), + Math.abs(getRuntimeContext().getTaskName().hashCode() % (1 << 5))); + + cache = RuleCache.getInstance(); + final ValueStateDescriptor<LimitedSizeList<SlideAggregate<CnRecordLog>>> listStateDescriptor = new ValueStateDescriptor<>("slide-aggregate-state", TypeInformation.of( @@ -165,7 +174,7 @@ public class MatchKeyedProcessFunction } final SecurityEvent event = new SecurityEvent(); - event.setEventId(SnowflakeId.generateId()); + event.setEventId(snowflake.nextId()); event.setEventType(info.getEventType()); event.setEventName(info.getName()); event.setEventKey(group.toString()); diff --git a/module-CN-indicator-match/src/main/java/com/zdjizhi/schedule/indicator/record/SlideAggregate.java b/module-CN-indicator-match/src/main/java/com/zdjizhi/schedule/indicator/record/SlideAggregate.java index aec0a7e..cfb3c57 100644 --- a/module-CN-indicator-match/src/main/java/com/zdjizhi/schedule/indicator/record/SlideAggregate.java +++ b/module-CN-indicator-match/src/main/java/com/zdjizhi/schedule/indicator/record/SlideAggregate.java @@ -26,7 +26,7 @@ public class SlideAggregate<T> { private int times; - private long firstTime; + private long firstTime = Long.MAX_VALUE; private long lastTime; |
