summaryrefslogtreecommitdiff
path: root/Zookeeper/DistributedLock.java
diff options
context:
space:
mode:
authorqidaijie <[email protected]>2019-08-09 16:31:25 +0800
committerqidaijie <[email protected]>2019-08-09 16:31:25 +0800
commitd2e310cda068365ca87049b8c4d914ea7046ee69 (patch)
treef316d2714aacf40e2319f4878544bceb41d25346 /Zookeeper/DistributedLock.java
parentb765ffee7bf37daeea60509b4f2c0003f5f0d93e (diff)
提交 雪花算法代码,zookeeper代码
Diffstat (limited to 'Zookeeper/DistributedLock.java')
-rw-r--r--Zookeeper/DistributedLock.java215
1 files changed, 215 insertions, 0 deletions
diff --git a/Zookeeper/DistributedLock.java b/Zookeeper/DistributedLock.java
new file mode 100644
index 0000000..e57b0a5
--- /dev/null
+++ b/Zookeeper/DistributedLock.java
@@ -0,0 +1,215 @@
+package cn.ac.iie.utils.zookeeper;
+
+import cn.ac.iie.common.FlowWriteConfig;
+import cn.ac.iie.utils.system.SnowflakeId;
+import org.apache.log4j.Logger;
+import org.apache.zookeeper.*;
+import org.apache.zookeeper.data.Stat;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.locks.Condition;
+import java.util.concurrent.locks.Lock;
+
+/**
+ * @author qidaijie
+ */
+public class DistributedLock implements Lock, Watcher {
+ private static Logger logger = Logger.getLogger(DistributedLock.class);
+
+ private ZooKeeper zk = null;
+ /**
+ * 根节点
+ */
+ private final String ROOT_LOCK = "/locks";
+ /**
+ * 竞争的资源
+ */
+ private String lockName;
+ /**
+ * 等待的前一个锁
+ */
+ private String waitLock;
+ /**
+ * 当前锁
+ */
+ private String currentLock;
+ /**
+ * 计数器
+ */
+ private CountDownLatch countDownLatch;
+
+ private int sessionTimeout = 2000;
+
+ private List<Exception> exceptionList = new ArrayList<Exception>();
+
+ /**
+ * 配置分布式锁
+ *
+ * @param config 连接的url
+ * @param lockName 竞争资源
+ */
+ public DistributedLock(String config, String lockName) {
+ this.lockName = lockName;
+ try {
+ // 连接zookeeper
+ zk = new ZooKeeper(config, sessionTimeout, this);
+ Stat stat = zk.exists(ROOT_LOCK, false);
+ if (stat == null) {
+ // 如果根节点不存在,则创建根节点
+ zk.create(ROOT_LOCK, new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
+ }
+ } catch (IOException | InterruptedException | KeeperException e) {
+ logger.error("Node already exists!");
+ }
+ }
+
+ // 节点监视器
+ @Override
+ public void process(WatchedEvent event) {
+ if (this.countDownLatch != null) {
+ this.countDownLatch.countDown();
+ }
+ }
+
+ @Override
+ public void lock() {
+ if (exceptionList.size() > 0) {
+ throw new LockException(exceptionList.get(0));
+ }
+ try {
+ if (this.tryLock()) {
+ System.out.println(Thread.currentThread().getName() + " " + lockName + "获得了锁");
+ } else {
+ // 等待锁
+ waitForLock(waitLock, sessionTimeout);
+ }
+ } catch (InterruptedException | KeeperException e) {
+ e.printStackTrace();
+ }
+ }
+
+ @Override
+ public boolean tryLock() {
+ try {
+ String splitStr = "_lock_";
+ if (lockName.contains(splitStr)) {
+ throw new LockException("锁名有误");
+ }
+ // 创建临时有序节点
+ currentLock = zk.create(ROOT_LOCK + "/" + lockName + splitStr, new byte[0],
+ ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);
+ // 取所有子节点
+ List<String> subNodes = zk.getChildren(ROOT_LOCK, false);
+ // 取出所有lockName的锁
+ List<String> lockObjects = new ArrayList<String>();
+ for (String node : subNodes) {
+ String tmpNode = node.split(splitStr)[0];
+ if (tmpNode.equals(lockName)) {
+ lockObjects.add(node);
+ }
+ }
+ Collections.sort(lockObjects);
+ // 若当前节点为最小节点,则获取锁成功
+ if (currentLock.equals(ROOT_LOCK + "/" + lockObjects.get(0))) {
+ return true;
+ }
+ // 若不是最小节点,则找到自己的前一个节点
+ String prevNode = currentLock.substring(currentLock.lastIndexOf("/") + 1);
+ waitLock = lockObjects.get(Collections.binarySearch(lockObjects, prevNode) - 1);
+ } catch (InterruptedException | KeeperException e) {
+ e.printStackTrace();
+ }
+ return false;
+ }
+
+
+ @Override
+ public boolean tryLock(long timeout, TimeUnit unit) {
+ try {
+ if (this.tryLock()) {
+ return true;
+ }
+ return waitForLock(waitLock, timeout);
+ } catch (Exception e) {
+ e.printStackTrace();
+ }
+ return false;
+ }
+
+ // 等待锁
+ private boolean waitForLock(String prev, long waitTime) throws KeeperException, InterruptedException {
+ Stat stat = zk.exists(ROOT_LOCK + "/" + prev, true);
+
+ if (stat != null) {
+ this.countDownLatch = new CountDownLatch(1);
+ // 计数等待,若等到前一个节点消失,则precess中进行countDown,停止等待,获取锁
+ this.countDownLatch.await(waitTime, TimeUnit.MILLISECONDS);
+ this.countDownLatch = null;
+ }
+ return true;
+ }
+
+ @Override
+ public void unlock() {
+ try {
+ zk.delete(currentLock, -1);
+ currentLock = null;
+ zk.close();
+ } catch (InterruptedException | KeeperException e) {
+ e.printStackTrace();
+ }
+ }
+
+ @Override
+ public Condition newCondition() {
+ return null;
+ }
+
+ @Override
+ public void lockInterruptibly() throws InterruptedException {
+ this.lock();
+ }
+
+
+ public class LockException extends RuntimeException {
+ private static final long serialVersionUID = 1L;
+
+ public LockException(String e) {
+ super(e);
+ }
+
+ public LockException(Exception e) {
+ super(e);
+ }
+ }
+
+ public static void main(String[] args) {
+ ZookeeperUtils zookeeperUtils = new ZookeeperUtils();
+
+ Runnable runnable = new Runnable() {
+ @Override
+ public void run() {
+ DistributedLock lock = null;
+ try {
+// lock = new DistributedLock(FlowWriteConfig.ZOOKEEPER_SERVERS, "disLocks1");
+// lock.lock();
+ System.out.println(SnowflakeId.generateId());
+ } finally {
+ if (lock != null) {
+ lock.unlock();
+ }
+ }
+ }
+ };
+
+ for (int i = 0; i < 10; i++) {
+ Thread t = new Thread(runnable);
+ t.start();
+ }
+ }
+} \ No newline at end of file