summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorchaoc <[email protected]>2023-11-03 11:02:01 +0800
committerchaoc <[email protected]>2023-11-03 11:02:01 +0800
commita310e525e6190ee5a1c52afb882966845f41322d (patch)
tree83d3f331863fa69308a00b2c7f6c7bf704dc6afc
parent2a1690c2fd1654153c11c6ae1e6a3857ffbf1af2 (diff)
style: update the Snowflake Algorithm Library
-rw-r--r--module-CN-indicator-match/src/main/java/com/zdjizhi/schedule/indicator/common/SnowflakeId.java177
-rw-r--r--module-CN-indicator-match/src/main/java/com/zdjizhi/schedule/indicator/functions/MatchKeyedProcessFunction.java10
2 files changed, 181 insertions, 6 deletions
diff --git a/module-CN-indicator-match/src/main/java/com/zdjizhi/schedule/indicator/common/SnowflakeId.java b/module-CN-indicator-match/src/main/java/com/zdjizhi/schedule/indicator/common/SnowflakeId.java
new file mode 100644
index 0000000..30abc83
--- /dev/null
+++ b/module-CN-indicator-match/src/main/java/com/zdjizhi/schedule/indicator/common/SnowflakeId.java
@@ -0,0 +1,177 @@
+package com.zdjizhi.schedule.indicator.common;
+
+/**
+ * 雪花算法
+ *
+ * @author qidaijie
+ */
+@SuppressWarnings("FieldCanBeLocal")
+public class SnowflakeId {
+
+ /**
+ * 共 64 位 第一位为符号位 默认 0
+ * 时间戳 39 位 (17 year), centerId (关联每个环境或任务数) 5位 (0-32),
+ * workerId(关联进程):8(0-255) ,序列号:11位(2047/ms)
+ * <p>
+ * 序列号 /ms = (-1L ^ (-1L << 11))
+ * 最大使用年 = (1L << 39) / (1000L * 60 * 60 * 24 * 365)
+ * 开始时间截 (2020-11-14 00:00:00) max 17years
+ */
+ private final long twepoch = 1693274481297L;
+
+ /**
+ * 机器id所占的位数
+ */
+ private final long workerIdBits = 10L;
+
+ /**
+ * 数据标识id所占的位数
+ */
+ private final long dataCenterIdBits = 3L;
+
+ /**
+ * 支持的最大机器id,结果是63 (这个移位算法可以很快的计算出几位二进制数所能表示的最大十进制数)
+ * M << n = M * 2^n
+ */
+ private final long maxWorkerId = ~(-1L << workerIdBits);
+
+ /**
+ * 支持的最大数据标识id,结果是31
+ */
+ private final long maxDataCenterId = ~(-1L << dataCenterIdBits);
+
+ /**
+ * 序列在id中占的位数
+ */
+ private final long sequenceBits = 11L;
+
+ /**
+ * 机器ID向左移12位
+ */
+ private final long workerIdShift = sequenceBits;
+
+ /**
+ * 数据标识id向左移17位(14+6)
+ */
+ private final long dataCenterIdShift = sequenceBits + workerIdBits;
+
+ /**
+ * 时间截向左移22位(4+6+14)
+ */
+ private final long timestampLeftShift = sequenceBits + workerIdBits + dataCenterIdBits;
+
+ /**
+ * 生成序列的掩码,这里为2047
+ */
+ private final long sequenceMask = ~(-1L << sequenceBits);
+
+ /**
+ * 工作机器ID(0~255)
+ */
+ private final long workerId;
+
+ /**
+ * 数据中心ID(0~31)
+ */
+ private final long dataCenterId;
+
+ /**
+ * 毫秒内序列(0~2047)
+ */
+ private long sequence = 0L;
+
+ /**
+ * 上次生成ID的时间截
+ */
+ private long lastTimestamp = -1L;
+
+
+ /**
+ * 设置允许时间回拨的最大限制10s
+ */
+ private static final long rollBackTime = 10000L;
+
+ //==============================Constructors=====================================
+
+ /**
+ * 构造函数
+ */
+ public SnowflakeId(long dataCenterIdNum, long tmpWorkerId) {
+ if (tmpWorkerId > maxWorkerId || tmpWorkerId < 0) {
+ throw new IllegalArgumentException(String.format("worker Id can't be greater than %d or less than 0", maxWorkerId));
+ }
+ if (dataCenterIdNum > maxDataCenterId || dataCenterIdNum < 0) {
+ throw new IllegalArgumentException(String.format("datacenter Id can't be greater than %d or less than ", maxDataCenterId));
+ }
+ this.workerId = tmpWorkerId;
+ this.dataCenterId = dataCenterIdNum;
+
+ }
+
+ // ==============================Methods==========================================
+
+ /**
+ * 获得下一个ID (该方法是线程安全的)
+ *
+ * @return SnowflakeId
+ */
+ public synchronized long nextId() {
+ long timestamp = timeGen();
+ //设置一个允许回拨限制时间,系统时间回拨范围在rollBackTime内可以等待校准
+ if (lastTimestamp - timestamp > 0 && lastTimestamp - timestamp < rollBackTime) {
+ timestamp = tilNextMillis(lastTimestamp);
+ }
+ //如果当前时间小于上一次ID生成的时间戳,说明系统时钟回退过这个时候应当抛出异常
+ if (timestamp < lastTimestamp) {
+ throw new RuntimeException(
+ String.format("Clock moved backwards. Refusing to generate id for %d milliseconds", lastTimestamp - timestamp));
+ }
+
+ //如果是同一时间生成的,则进行毫秒内序列
+ if (lastTimestamp == timestamp) {
+ sequence = (sequence + 1) & sequenceMask;
+ //毫秒内序列溢出
+ if (sequence == 0) {
+ //阻塞到下一个毫秒,获得新的时间戳
+ timestamp = tilNextMillis(lastTimestamp);
+ }
+ }
+ //时间戳改变,毫秒内序列重置
+ else {
+ sequence = 0L;
+ }
+
+ //上次生成ID的时间截
+ lastTimestamp = timestamp;
+
+ //移位并通过或运算拼到一起组成64位的ID
+ return ((timestamp - twepoch) << timestampLeftShift)
+ | (dataCenterId << dataCenterIdShift)
+ | (workerId << workerIdShift)
+ | sequence;
+ }
+
+ /**
+ * 阻塞到下一个毫秒,直到获得新的时间戳
+ *
+ * @param lastTimestamp 上次生成ID的时间截
+ * @return 当前时间戳
+ */
+ private long tilNextMillis(long lastTimestamp) {
+ long timestamp = timeGen();
+ while (timestamp <= lastTimestamp) {
+ timestamp = timeGen();
+ }
+ return timestamp;
+ }
+
+ /**
+ * 返回以毫秒为单位的当前时间
+ *
+ * @return 当前时间(毫秒)
+ */
+ private long timeGen() {
+ return System.currentTimeMillis();
+ }
+
+} \ No newline at end of file
diff --git a/module-CN-indicator-match/src/main/java/com/zdjizhi/schedule/indicator/functions/MatchKeyedProcessFunction.java b/module-CN-indicator-match/src/main/java/com/zdjizhi/schedule/indicator/functions/MatchKeyedProcessFunction.java
index 0d1b8d1..5373f13 100644
--- a/module-CN-indicator-match/src/main/java/com/zdjizhi/schedule/indicator/functions/MatchKeyedProcessFunction.java
+++ b/module-CN-indicator-match/src/main/java/com/zdjizhi/schedule/indicator/functions/MatchKeyedProcessFunction.java
@@ -1,10 +1,9 @@
package com.zdjizhi.schedule.indicator.functions;
-import cn.hutool.core.lang.Snowflake;
-import cn.hutool.core.util.IdUtil;
import com.zdjizhi.base.common.CnRecordLog;
import com.zdjizhi.base.rule.cache.RuleCache;
import com.zdjizhi.base.rule.info.RuleInfo;
+import com.zdjizhi.schedule.indicator.common.SnowflakeId;
import com.zdjizhi.schedule.indicator.record.MatchGroup;
import com.zdjizhi.schedule.indicator.record.SecurityEvent;
import com.zdjizhi.schedule.indicator.record.SlideAggregate;
@@ -41,15 +40,14 @@ public class MatchKeyedProcessFunction
private transient ValueState<SecurityEvent> eventState;
- private transient Snowflake snowflake;
+ private transient SnowflakeId snowflake;
@Override
public void open(Configuration parameters) throws Exception {
super.open(parameters);
- snowflake = IdUtil.getSnowflake(
- Math.abs(getRuntimeContext().getJobId().hashCode() % (1 << 5)),
- Math.abs(getRuntimeContext().getTaskName().hashCode() % (1 << 5)));
+ snowflake = new SnowflakeId(Math.abs((long) getRuntimeContext().getJobId().hashCode() % (1 << 3)),
+ getRuntimeContext().getIndexOfThisSubtask());
cache = RuleCache.getInstance();