summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorqidaijie <[email protected]>2019-08-09 16:31:25 +0800
committerqidaijie <[email protected]>2019-08-09 16:31:25 +0800
commitd2e310cda068365ca87049b8c4d914ea7046ee69 (patch)
treef316d2714aacf40e2319f4878544bceb41d25346
parentb765ffee7bf37daeea60509b4f2c0003f5f0d93e (diff)
提交 雪花算法代码,zookeeper代码
-rw-r--r--SnowFlake/SnowflakeId.java189
-rw-r--r--Zookeeper/DistributedLock.java215
-rw-r--r--Zookeeper/ZookeeperUtils.java135
3 files changed, 539 insertions, 0 deletions
diff --git a/SnowFlake/SnowflakeId.java b/SnowFlake/SnowflakeId.java
new file mode 100644
index 0000000..e697202
--- /dev/null
+++ b/SnowFlake/SnowflakeId.java
@@ -0,0 +1,189 @@
+package cn.ac.iie.utils.system;
+
+import cn.ac.iie.common.FlowWriteConfig;
+import cn.ac.iie.utils.zookeeper.DistributedLock;
+import cn.ac.iie.utils.zookeeper.ZookeeperUtils;
+import org.apache.log4j.Logger;
+
+/**
+ * 雪花算法
+ *
+ * @author qidaijie
+ */
+public class SnowflakeId {
+ private static Logger logger = Logger.getLogger(SnowflakeId.class);
+
+ // ==============================Fields===========================================
+ /**
+ * 开始时间截 (2018-08-01 00:00:00) max 17years
+ */
+ private final long twepoch = 1564588800000L;
+
+ /**
+ * 机器id所占的位数
+ */
+ private final long workerIdBits = 6L;
+
+ /**
+ * 数据标识id所占的位数
+ */
+ private final long dataCenterIdBits = 4L;
+
+ /**
+ * 支持的最大机器id,结果是3 (这个移位算法可以很快的计算出几位二进制数所能表示的最大十进制数)
+ */
+ private final long maxWorkerId = -1L ^ (-1L << workerIdBits);
+
+ /**
+ * 支持的最大数据标识id,结果是15
+ */
+ private final long maxDataCenterId = -1L ^ (-1L << dataCenterIdBits);
+
+ /**
+ * 序列在id中占的位数
+ */
+ private final long sequenceBits = 14L;
+
+ /**
+ * 机器ID向左移12位
+ */
+ private final long workerIdShift = sequenceBits;
+
+ /**
+ * 数据标识id向左移17位(14+6)
+ */
+ private final long dataCenterIdShift = sequenceBits + workerIdBits;
+
+ /**
+ * 时间截向左移22位(4+6+14)
+ */
+ private final long timestampLeftShift = sequenceBits + workerIdBits + dataCenterIdBits;
+
+ /**
+ * 生成序列的掩码,这里为16383
+ */
+ private final long sequenceMask = -1L ^ (-1L << sequenceBits);
+
+ /**
+ * 工作机器ID(0~63)
+ */
+ private long workerId;
+
+ /**
+ * 数据中心ID(0~15)
+ */
+ private long dataCenterId;
+
+ /**
+ * 毫秒内序列(0~16383)
+ */
+ private long sequence = 0L;
+
+ /**
+ * 上次生成ID的时间截
+ */
+ private long lastTimestamp = -1L;
+
+
+ private static SnowflakeId idWorker;
+
+ private static ZookeeperUtils zookeeperUtils = new ZookeeperUtils();
+
+ static {
+ idWorker = new SnowflakeId();
+ }
+
+ //==============================Constructors=====================================
+
+ /**
+ * 构造函数
+ */
+ private SnowflakeId() {
+ DistributedLock lock = new DistributedLock(FlowWriteConfig.ZOOKEEPER_SERVERS, "disLocks1");
+ lock.lock();
+ int tmpWorkerId = zookeeperUtils.modifyNode("/Snowflake/" + FlowWriteConfig.KAFKA_TOPIC);
+ if (tmpWorkerId > maxWorkerId || tmpWorkerId < 0) {
+ throw new IllegalArgumentException(String.format("worker Id can't be greater than %d or less than 0", maxWorkerId));
+ }
+ if (FlowWriteConfig.DATA_CENTER_ID_NUM > maxDataCenterId || FlowWriteConfig.DATA_CENTER_ID_NUM < 0) {
+ throw new IllegalArgumentException(String.format("datacenter Id can't be greater than %d or less than 0", maxDataCenterId));
+ }
+ this.workerId = tmpWorkerId;
+ this.dataCenterId = FlowWriteConfig.DATA_CENTER_ID_NUM;
+ }
+
+ // ==============================Methods==========================================
+
+ /**
+ * 获得下一个ID (该方法是线程安全的)
+ *
+ * @return SnowflakeId
+ */
+ private synchronized long nextId() {
+ long timestamp = timeGen();
+
+ //如果当前时间小于上一次ID生成的时间戳,说明系统时钟回退过这个时候应当抛出异常
+ if (timestamp < lastTimestamp) {
+ throw new RuntimeException(
+ String.format("Clock moved backwards. Refusing to generate id for %d milliseconds", lastTimestamp - timestamp));
+ }
+
+ //如果是同一时间生成的,则进行毫秒内序列
+ if (lastTimestamp == timestamp) {
+ sequence = (sequence + 1) & sequenceMask;
+ //毫秒内序列溢出
+ if (sequence == 0) {
+ //阻塞到下一个毫秒,获得新的时间戳
+ timestamp = tilNextMillis(lastTimestamp);
+ }
+ }
+ //时间戳改变,毫秒内序列重置
+ else {
+ sequence = 0L;
+ }
+
+ //上次生成ID的时间截
+ lastTimestamp = timestamp;
+
+ //移位并通过或运算拼到一起组成64位的ID
+ return ((timestamp - twepoch) << timestampLeftShift)
+ | (dataCenterId << dataCenterIdShift)
+ | (workerId << workerIdShift)
+ | sequence;
+ }
+
+ /**
+ * 阻塞到下一个毫秒,直到获得新的时间戳
+ *
+ * @param lastTimestamp 上次生成ID的时间截
+ * @return 当前时间戳
+ */
+ protected long tilNextMillis(long lastTimestamp) {
+ long timestamp = timeGen();
+ while (timestamp <= lastTimestamp) {
+ timestamp = timeGen();
+ }
+ return timestamp;
+ }
+
+ /**
+ * 返回以毫秒为单位的当前时间
+ *
+ * @return 当前时间(毫秒)
+ */
+ protected long timeGen() {
+ return System.currentTimeMillis();
+ }
+
+
+ /**
+ * 静态工具类
+ *
+ * @return
+ */
+ public static Long generateId() {
+ return idWorker.nextId();
+ }
+
+
+} \ No newline at end of file
diff --git a/Zookeeper/DistributedLock.java b/Zookeeper/DistributedLock.java
new file mode 100644
index 0000000..e57b0a5
--- /dev/null
+++ b/Zookeeper/DistributedLock.java
@@ -0,0 +1,215 @@
+package cn.ac.iie.utils.zookeeper;
+
+import cn.ac.iie.common.FlowWriteConfig;
+import cn.ac.iie.utils.system.SnowflakeId;
+import org.apache.log4j.Logger;
+import org.apache.zookeeper.*;
+import org.apache.zookeeper.data.Stat;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.locks.Condition;
+import java.util.concurrent.locks.Lock;
+
+/**
+ * @author qidaijie
+ */
+public class DistributedLock implements Lock, Watcher {
+ private static Logger logger = Logger.getLogger(DistributedLock.class);
+
+ private ZooKeeper zk = null;
+ /**
+ * 根节点
+ */
+ private final String ROOT_LOCK = "/locks";
+ /**
+ * 竞争的资源
+ */
+ private String lockName;
+ /**
+ * 等待的前一个锁
+ */
+ private String waitLock;
+ /**
+ * 当前锁
+ */
+ private String currentLock;
+ /**
+ * 计数器
+ */
+ private CountDownLatch countDownLatch;
+
+ private int sessionTimeout = 2000;
+
+ private List<Exception> exceptionList = new ArrayList<Exception>();
+
+ /**
+ * 配置分布式锁
+ *
+ * @param config 连接的url
+ * @param lockName 竞争资源
+ */
+ public DistributedLock(String config, String lockName) {
+ this.lockName = lockName;
+ try {
+ // 连接zookeeper
+ zk = new ZooKeeper(config, sessionTimeout, this);
+ Stat stat = zk.exists(ROOT_LOCK, false);
+ if (stat == null) {
+ // 如果根节点不存在,则创建根节点
+ zk.create(ROOT_LOCK, new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
+ }
+ } catch (IOException | InterruptedException | KeeperException e) {
+ logger.error("Node already exists!");
+ }
+ }
+
+ // 节点监视器
+ @Override
+ public void process(WatchedEvent event) {
+ if (this.countDownLatch != null) {
+ this.countDownLatch.countDown();
+ }
+ }
+
+ @Override
+ public void lock() {
+ if (exceptionList.size() > 0) {
+ throw new LockException(exceptionList.get(0));
+ }
+ try {
+ if (this.tryLock()) {
+ System.out.println(Thread.currentThread().getName() + " " + lockName + "获得了锁");
+ } else {
+ // 等待锁
+ waitForLock(waitLock, sessionTimeout);
+ }
+ } catch (InterruptedException | KeeperException e) {
+ e.printStackTrace();
+ }
+ }
+
+ @Override
+ public boolean tryLock() {
+ try {
+ String splitStr = "_lock_";
+ if (lockName.contains(splitStr)) {
+ throw new LockException("锁名有误");
+ }
+ // 创建临时有序节点
+ currentLock = zk.create(ROOT_LOCK + "/" + lockName + splitStr, new byte[0],
+ ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);
+ // 取所有子节点
+ List<String> subNodes = zk.getChildren(ROOT_LOCK, false);
+ // 取出所有lockName的锁
+ List<String> lockObjects = new ArrayList<String>();
+ for (String node : subNodes) {
+ String tmpNode = node.split(splitStr)[0];
+ if (tmpNode.equals(lockName)) {
+ lockObjects.add(node);
+ }
+ }
+ Collections.sort(lockObjects);
+ // 若当前节点为最小节点,则获取锁成功
+ if (currentLock.equals(ROOT_LOCK + "/" + lockObjects.get(0))) {
+ return true;
+ }
+ // 若不是最小节点,则找到自己的前一个节点
+ String prevNode = currentLock.substring(currentLock.lastIndexOf("/") + 1);
+ waitLock = lockObjects.get(Collections.binarySearch(lockObjects, prevNode) - 1);
+ } catch (InterruptedException | KeeperException e) {
+ e.printStackTrace();
+ }
+ return false;
+ }
+
+
+ @Override
+ public boolean tryLock(long timeout, TimeUnit unit) {
+ try {
+ if (this.tryLock()) {
+ return true;
+ }
+ return waitForLock(waitLock, timeout);
+ } catch (Exception e) {
+ e.printStackTrace();
+ }
+ return false;
+ }
+
+ // 等待锁
+ private boolean waitForLock(String prev, long waitTime) throws KeeperException, InterruptedException {
+ Stat stat = zk.exists(ROOT_LOCK + "/" + prev, true);
+
+ if (stat != null) {
+ this.countDownLatch = new CountDownLatch(1);
+ // 计数等待,若等到前一个节点消失,则precess中进行countDown,停止等待,获取锁
+ this.countDownLatch.await(waitTime, TimeUnit.MILLISECONDS);
+ this.countDownLatch = null;
+ }
+ return true;
+ }
+
+ @Override
+ public void unlock() {
+ try {
+ zk.delete(currentLock, -1);
+ currentLock = null;
+ zk.close();
+ } catch (InterruptedException | KeeperException e) {
+ e.printStackTrace();
+ }
+ }
+
+ @Override
+ public Condition newCondition() {
+ return null;
+ }
+
+ @Override
+ public void lockInterruptibly() throws InterruptedException {
+ this.lock();
+ }
+
+
+ public class LockException extends RuntimeException {
+ private static final long serialVersionUID = 1L;
+
+ public LockException(String e) {
+ super(e);
+ }
+
+ public LockException(Exception e) {
+ super(e);
+ }
+ }
+
+ public static void main(String[] args) {
+ ZookeeperUtils zookeeperUtils = new ZookeeperUtils();
+
+ Runnable runnable = new Runnable() {
+ @Override
+ public void run() {
+ DistributedLock lock = null;
+ try {
+// lock = new DistributedLock(FlowWriteConfig.ZOOKEEPER_SERVERS, "disLocks1");
+// lock.lock();
+ System.out.println(SnowflakeId.generateId());
+ } finally {
+ if (lock != null) {
+ lock.unlock();
+ }
+ }
+ }
+ };
+
+ for (int i = 0; i < 10; i++) {
+ Thread t = new Thread(runnable);
+ t.start();
+ }
+ }
+} \ No newline at end of file
diff --git a/Zookeeper/ZookeeperUtils.java b/Zookeeper/ZookeeperUtils.java
new file mode 100644
index 0000000..612c4e7
--- /dev/null
+++ b/Zookeeper/ZookeeperUtils.java
@@ -0,0 +1,135 @@
+package cn.ac.iie.utils.zookeeper;
+
+import cn.ac.iie.common.FlowWriteConfig;
+import org.apache.commons.lang3.RandomUtils;
+import org.apache.log4j.Logger;
+import org.apache.zookeeper.*;
+import org.apache.zookeeper.data.ACL;
+import org.apache.zookeeper.data.Stat;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.concurrent.CountDownLatch;
+
+/**
+ * @author qidaijie
+ */
+public class ZookeeperUtils implements Watcher {
+ private static Logger logger = Logger.getLogger(ZookeeperUtils.class);
+
+ private ZooKeeper zookeeper;
+
+ private static final int SESSION_TIME_OUT = 20000;
+
+ private CountDownLatch countDownLatch = new CountDownLatch(1);
+
+ @Override
+ public void process(WatchedEvent event) {
+ if (event.getState() == Watcher.Event.KeeperState.SyncConnected) {
+ countDownLatch.countDown();
+ }
+ }
+
+
+ /**
+ * 修改节点信息
+ *
+ * @param path 节点路径
+ */
+ public int modifyNode(String path) {
+ createNode("/Snowflake", null, ZooDefs.Ids.OPEN_ACL_UNSAFE);
+ createNode("/Snowflake/" + FlowWriteConfig.KAFKA_TOPIC, "0".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE);
+ int workerId;
+ try {
+ connectZookeeper(FlowWriteConfig.ZOOKEEPER_SERVERS);
+ Stat stat = zookeeper.exists(path, true);
+ workerId = Integer.parseInt(getNodeDate(path));
+ if (workerId > 55) {
+ workerId = 0;
+ zookeeper.setData(path, "1".getBytes(), stat.getVersion());
+ } else {
+ String result = String.valueOf(workerId + 1);
+ if (stat != null) {
+ zookeeper.setData(path, result.getBytes(), stat.getVersion());
+ } else {
+ logger.error("Node does not exist!,Can't modify");
+ }
+ }
+ } catch (KeeperException | InterruptedException e) {
+ e.printStackTrace();
+ workerId = RandomUtils.nextInt(56, 63);
+ } finally {
+ closeConn();
+ }
+ logger.error("工作ID是:" + workerId);
+ return workerId;
+ }
+
+ /**
+ * 连接zookeeper
+ *
+ * @param host 地址
+ */
+ private void connectZookeeper(String host) {
+ try {
+ zookeeper = new ZooKeeper(host, SESSION_TIME_OUT, this);
+ countDownLatch.await();
+ } catch (IOException | InterruptedException e) {
+ e.printStackTrace();
+ }
+ }
+
+ /**
+ * 关闭连接
+ */
+ private void closeConn() {
+ try {
+ if (zookeeper != null) {
+ zookeeper.close();
+ }
+ } catch (InterruptedException e) {
+ e.printStackTrace();
+ }
+ }
+
+ /**
+ * 获取节点内容
+ *
+ * @param path 节点路径
+ * @return 内容/异常null
+ */
+ private String getNodeDate(String path) {
+ String result = null;
+ Stat stat = new Stat();
+ try {
+ byte[] resByte = zookeeper.getData(path, true, stat);
+ result = new String(resByte);
+ } catch (KeeperException | InterruptedException e) {
+ logger.error("Get node information exception");
+ e.printStackTrace();
+ }
+ return result;
+ }
+
+ /**
+ * @param path 节点创建的路径
+ * @param date 节点所存储的数据的byte[]
+ * @param acls 控制权限策略
+ */
+ private void createNode(String path, byte[] date, List<ACL> acls) {
+ try {
+ connectZookeeper(FlowWriteConfig.ZOOKEEPER_SERVERS);
+ Stat exists = zookeeper.exists(path, true);
+ if (exists == null) {
+ zookeeper.create(path, date, acls, CreateMode.PERSISTENT);
+ } else {
+ logger.warn("Node already exists!,Don't need to create");
+ }
+ } catch (KeeperException | InterruptedException e) {
+ e.printStackTrace();
+ } finally {
+ closeConn();
+ }
+ }
+
+}