Questions

(1) How does ZooKeeper implement a distributed lock?

(2) What are the advantages of the ZooKeeper distributed lock?

(3) What are the disadvantages of the ZooKeeper distributed lock?

Introduction

ZooKeeper is an open-source, distributed coordination service that provides consistency guarantees for distributed applications. It is an important component of Hadoop and HBase, and it can also serve as a configuration center or service registry in microservice systems.

This chapter introduces how ZooKeeper implements distributed locks in distributed systems.

Basic knowledge

What is a ZNode?

ZooKeeper stores and maintains data nodes, called ZNodes, which are organized in a hierarchical tree structure similar to a file system. If a ZNode contains data, the data is stored as a byte array.

In addition, when multiple clients try to create the same node at the same time, only one of them will succeed; the others will fail.

The node type

There are four types of ZNodes:

  • Persistent (unordered)
  • Persistent ordered (sequential)
  • Temporary (unordered)
  • Temporary ordered (sequential)

Persistent nodes exist until they are explicitly deleted, while temporary nodes are deleted automatically when the client session expires.
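For illustration, the four types correspond to the CreateMode values of the ZooKeeper Java client. Below is a minimal sketch (the address 127.0.0.1:2181 and the /demo paths are placeholders; it assumes a reachable ZooKeeper server, and re-running it would fail with NodeExistsException for the non-sequential paths):

import java.util.concurrent.CountDownLatch;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooDefs;
import org.apache.zookeeper.ZooKeeper;

public class ZNodeTypesDemo {
    public static void main(String[] args) throws Exception {
        CountDownLatch connected = new CountDownLatch(1);
        // Wait for the session to be established before using the client
        ZooKeeper zk = new ZooKeeper("127.0.0.1:2181", 6000, event -> {
            if (event.getState() == Watcher.Event.KeeperState.SyncConnected) {
                connected.countDown();
            }
        });
        connected.await();

        // Persistent: stays until explicitly deleted
        zk.create("/demo", "".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
        // Persistent ordered: the server appends an increasing sequence number to the name
        zk.create("/demo/pseq-", "".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT_SEQUENTIAL);
        // Temporary: removed automatically when this client session ends
        zk.create("/demo-ephemeral", "".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);
        // Temporary ordered: ephemeral plus a sequence number, the basis of scheme 2 below
        zk.create("/demo/eseq-", "".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);

        zk.close();
    }
}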

What is a watcher?

Watcher (event listener) is an important feature in ZooKeeper.

ZooKeeper allows clients to register Watchers on a specified node; when certain events occur on that node, the ZooKeeper server notifies the interested clients. This mechanism is an important feature that ZooKeeper uses to implement its distributed coordination service.

| KeeperState | EventType | Trigger condition | Description | Example operation |
| --- | --- | --- | --- | --- |
| SyncConnected (3) | None (-1) | The connection between client and server is successfully established | The client is connected to the server | - |
| SyncConnected (3) | NodeCreated (1) | The data node the Watcher listens to is created | Same as above | create /znode |
| SyncConnected (3) | NodeDeleted (2) | The data node the Watcher listens to is deleted | Same as above | delete /znode |
| SyncConnected (3) | NodeDataChanged (3) | The data content of the node the Watcher listens to changes | Same as above | setData /znode |
| SyncConnected (3) | NodeChildrenChanged (4) | The list of children of the node the Watcher listens to changes | Same as above | create /znode/child |
| Disconnected (0) | None (-1) | The client is disconnected from the ZooKeeper server | The client is disconnected from the server | - |
| Expired (-112) | None (-1) | The session times out | The client session is invalid; a SessionExpiredException is usually thrown | - |
| AuthFailed (4) | None (-1) | Permission check with an incorrect scheme, or SASL authentication fails | An AuthFailedException is also received | - |
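As a rough illustration of registering and receiving a Watcher, here is a minimal sketch (the address and the /locker path are placeholders; note that a watch registered this way is one-shot and must be re-registered after it fires):

import java.util.concurrent.CountDownLatch;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooKeeper;

public class WatcherDemo {
    public static void main(String[] args) throws Exception {
        CountDownLatch connected = new CountDownLatch(1);
        ZooKeeper zk = new ZooKeeper("127.0.0.1:2181", 6000, event -> {
            if (event.getState() == Watcher.Event.KeeperState.SyncConnected) {
                connected.countDown();
            }
        });
        connected.await();

        // Register a one-shot watch on /locker; exists() works whether or not the node is there yet
        zk.exists("/locker", new Watcher() {
            @Override
            public void process(WatchedEvent event) {
                // Fires once for NodeCreated, NodeDeleted or NodeDataChanged on /locker
                System.out.println("event: " + event.getType() + " on " + event.getPath());
            }
        });

        Thread.sleep(Long.MAX_VALUE); // keep the session alive so the callback can fire
    }
}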

Principle analysis

Scheme 1

Since the same node can only be created once, the client acquiring the lock checks whether the node exists and creates it if it does not. If the node already exists (or creation fails), the client listens for the node's delete event. When the lock is released, the listening clients compete again to create the node: the one that succeeds acquires the lock, and the others go back to listening on the node.

For example, if three clients, client1, client2, and client3, try to acquire the /locker/user_1 lock at the same time, the process runs as follows:

(1) All three try to create /locker/user_1 at the same time;

(2) Client1 creates it successfully and acquires the lock;

(3) Client2 and client3 fail to create it and listen for the delete event of /locker/user_1;

(4) Client1 performs the business logic inside the lock;

(5) Client1 releases the lock by deleting the node /locker/user_1;

(6) Client2 and client3 both catch the delete event of /locker/user_1 and wake up;

(7) Client2 and client3 create /locker/user_1 at the same time;

(8) Back to step (2), and so on.
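For concreteness, here is a minimal sketch of what scheme 1 might look like with the native API (it assumes the parent node /locker already exists and omits error handling; it is only an illustration of the idea, not the implementation used later in this article):

import java.util.concurrent.CountDownLatch;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooDefs;
import org.apache.zookeeper.ZooKeeper;

public class SchemeOneLock {
    private final ZooKeeper zk;
    private final String lockPath; // e.g. "/locker/user_1"

    public SchemeOneLock(ZooKeeper zk, String lockPath) {
        this.zk = zk;
        this.lockPath = lockPath;
    }

    public void lock() throws Exception {
        while (true) {
            try {
                // Only one client can create the node; success means we hold the lock
                zk.create(lockPath, new byte[0],
                        ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);
                return;
            } catch (KeeperException.NodeExistsException e) {
                // The lock is held by someone else: watch the node and wait for its deletion
                CountDownLatch deleted = new CountDownLatch(1);
                if (zk.exists(lockPath, event -> {
                    if (event.getType() == Watcher.Event.EventType.NodeDeleted) {
                        deleted.countDown();
                    }
                }) != null) {
                    deleted.await(); // every waiter wakes up here: the herd effect described below
                }
                // Loop and try to create the node again
            }
        }
    }

    public void unlock() throws Exception {
        zk.delete(lockPath, -1);
    }
}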

However, this scheme has a serious drawback: the herd effect (also known as the thundering herd or stampede effect).

If concurrency is high, many clients listen on the same node at the same time. When the lock is released, they are all woken up at once and compete again, but only one of them can acquire the lock; all the others have to go back to waiting. Waking up those clients achieves nothing and greatly wastes system resources. Is there a better solution? Yes, see scheme 2.

Scheme 2

To solve the herd effect of scheme 1, we can implement the distributed lock with ordered child nodes; and to avoid the risk that a client suddenly disconnects after obtaining the lock, the nodes need to be temporary ordered nodes.

For example, if three clients, client1, client2, and client3, try to acquire the /locker/user_1 lock at the same time, the process runs as follows:

(1) The three clients each create a temporary ordered child node under /locker/user_1/;

(2) The nodes created are, say, /locker/user_1/0000000001 (client1), /locker/user_1/0000000002 (client3), and /locker/user_1/0000000003 (client2);

(3) Each client checks whether the node it created is the smallest among the children;

(4) Client1 finds that its node is the smallest and acquires the lock;

(5) Client2 and client3 find that their nodes are not the smallest and cannot acquire the lock;

(6) Client2, whose node is /locker/user_1/0000000003, listens for the delete event of the previous node /locker/user_1/0000000002;

(7) Client3, whose node is /locker/user_1/0000000002, listens for the delete event of the previous node /locker/user_1/0000000001;

(8) Client1 performs the business logic inside the lock;

(9) Client1 releases the lock by deleting its node /locker/user_1/0000000001;

(10) Client3 catches the delete event of /locker/user_1/0000000001 and wakes up;

(11) Client3 checks again whether its node is the smallest; it is, so it acquires the lock;

(12) Client3 performs the business logic inside the lock;

(13) Client3 releases the lock by deleting its node /locker/user_1/0000000002;

(14) Client2 catches the delete event of /locker/user_1/0000000002 and wakes up;

(15) Client2 checks again whether its node is the smallest; it is, so it acquires the lock and performs the business logic inside the lock;

(16) Client2 releases the lock by deleting its node /locker/user_1/0000000003;

(17) Client2 checks whether there are any children left under /locker/user_1/, and deletes /locker/user_1 if there are none;

(18) The process is complete.

Compared with scheme 1, this scheme wakes up only one client each time the lock is released, which reduces the cost of waking threads and improves efficiency.

ZooKeeper native API implementation

POM file

Introduce the following dependency in the POM file:

<dependency>
    <groupId>org.apache.zookeeper</groupId>
    <artifactId>zookeeper</artifactId>
    <version>2.6.2</version>
</dependency>

The Locker interface

Define a Locker interface, the same interface used for the MySQL distributed lock in the previous chapter.

public interface Locker {
    void lock(String key, Runnable command);
}

ZooKeeper distributed lock implementation

An inner class, ZkLockerWatcher, handles the ZooKeeper operations. Note the following:

(1) Do not perform operations before the ZooKeeper connection is established, otherwise a ConnectionLoss error is reported. Here LockSupport.park() is used to block the connecting thread, and the listener thread wakes it up;

(2) The client thread and the listener thread are different threads, so LockSupport.park() and LockSupport.unpark(thread) can be used to coordinate them (see the small standalone sketch below);

(3) Many of the intermediate steps are not atomic (a common pitfall), so some checks have to be repeated; see the comments in the code.
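To make the park/unpark handoff in notes (1) and (2) concrete, here is a tiny standalone sketch, separate from the lock code itself: one thread parks, and another thread unparks it, just as the Watcher callback thread wakes the blocked client thread in the implementation below.

import java.util.concurrent.locks.LockSupport;

public class ParkUnparkDemo {
    public static void main(String[] args) throws InterruptedException {
        Thread worker = new Thread(() -> {
            System.out.println("worker: parking until the event thread wakes me up");
            LockSupport.park(); // blocks here, like the thread waiting for SyncConnected or NodeDeleted
            System.out.println("worker: woken up, continuing");
        });
        worker.start();

        Thread.sleep(1000); // simulate the time until the watched event fires
        System.out.println("event thread: unparking the worker");
        LockSupport.unpark(worker); // like the Watcher callback waking the blocked client thread
    }
}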

@Slf4j
@Component
public class ZkLocker implements Locker {
    @Override
    public void lock(String key, Runnable command) {
        ZkLockerWatcher watcher = ZkLockerWatcher.conn(key);
        try {
            if (watcher.getLock()) {
                command.run();
            }
        } finally {
            watcher.releaseLock();
        }
    }

    private static class ZkLockerWatcher implements Watcher {
        public static final String connAddr = "127.0.0.1:2181";
        public static final int timeout = 6000;
        public static final String LOCKER_ROOT = "/locker";

        ZooKeeper zooKeeper;
        String parentLockPath;
        String childLockPath;
        Thread thread;

        public static ZkLockerWatcher conn(String key) {
            ZkLockerWatcher watcher = new ZkLockerWatcher();
            try {
                ZooKeeper zooKeeper = watcher.zooKeeper = new ZooKeeper(connAddr, timeout, watcher);
                watcher.thread = Thread.currentThread();
                // Block until the connection is established; process() wakes us up
                LockSupport.park();

                // If the root node does not exist, create it (concurrency pitfall: two clients may both
                // see it missing and try to create it, but only one will succeed)
                if (zooKeeper.exists(LOCKER_ROOT, false) == null) {
                    try {
                        zooKeeper.create(LOCKER_ROOT, "".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
                    } catch (KeeperException e) {
                        // Another client created it first; safe to ignore
                        log.info("failed to create node {}", LOCKER_ROOT);
                    }
                }

                // If the parent node of this lock does not exist, create it (same concurrency caveat)
                watcher.parentLockPath = LOCKER_ROOT + "/" + key;
                if (zooKeeper.exists(watcher.parentLockPath, false) == null) {
                    try {
                        zooKeeper.create(watcher.parentLockPath, "".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
                    } catch (KeeperException e) {
                        log.info("failed to create node {}", watcher.parentLockPath);
                    }
                }
            } catch (Exception e) {
                log.error("conn to zk error", e);
                throw new RuntimeException("conn to zk error");
            }
            return watcher;
        }

        public boolean getLock() {
            try {
                // Create a temporary ordered child node under the parent lock path
                this.childLockPath = zooKeeper.create(parentLockPath + "/", "".getBytes(),
                        ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);
                // Acquire the lock, or watch the node just before ours
                return getLockOrWatchLast();
            } catch (Exception e) {
                log.error("get lock error", e);
                throw new RuntimeException("get lock error");
            } finally {
                // System.out.println("getLock: " + childLockPath);
            }
        }

        public void releaseLock() {
            try {
                if (childLockPath != null) {
                    // Delete the child node we created
                    zooKeeper.delete(childLockPath, -1);
                }
                // If there are no children left, delete the parent node
                List<String> children = zooKeeper.getChildren(parentLockPath, false);
                if (children.isEmpty()) {
                    try {
                        zooKeeper.delete(parentLockPath, -1);
                    } catch (KeeperException e) {
                        // Another client created a child in the meantime, so deletion fails; that is fine
                        log.info("failed to delete node {}", parentLockPath);
                    }
                }
                // Close the zk connection
                if (zooKeeper != null) {
                    zooKeeper.close();
                }
            } catch (Exception e) {
                log.error("release lock error", e);
                throw new RuntimeException("release lock error");
            } finally {
                // System.out.println("releaseLock: " + childLockPath);
            }
        }

        private boolean getLockOrWatchLast() throws KeeperException, InterruptedException {
            List<String> children = zooKeeper.getChildren(parentLockPath, false);
            // The returned children are not necessarily ordered, so sort them first
            Collections.sort(children);

            // If our node is the first child, we hold the lock
            if ((parentLockPath + "/" + children.get(0)).equals(childLockPath)) {
                return true;
            }

            // Otherwise, find the node just before ours and watch it
            String last = "";
            for (String child : children) {
                if ((parentLockPath + "/" + child).equals(childLockPath)) {
                    break;
                }
                last = child;
            }
            if (zooKeeper.exists(parentLockPath + "/" + last, true) != null) {
                this.thread = Thread.currentThread();
                // Block the current thread until the previous node is deleted
                LockSupport.park();
                // Check again: ours may still not be the smallest node (another pitfall)
                return getLockOrWatchLast();
            } else {
                // The previous node was already deleted before we could watch it; check again
                return getLockOrWatchLast();
            }
        }

        @Override
        public void process(WatchedEvent event) {
            if (this.thread != null) {
                // Wake up the blocked thread (this runs on the listener thread, not the thread acquiring the lock)
                LockSupport.unpark(this.thread);
                this.thread = null;
            }
        }
    }
}

The test code

Start two batches of threads: one batch acquires the lock user_1, the other acquires the lock user_2.

@RunWith(SpringRunner.class)
@SpringBootTest(classes = Application.class)
public class ZkLockerTest {
    @Autowired
    private Locker locker;

    @Test
    public void testZkLocker() throws IOException {
        for (int i = 0; i < 1000; i++) {
            new Thread(() -> {
                locker.lock("user_1", () -> {
                    try {
                        System.out.println(String.format("user_1 time: %d, threadName: %s",
                                System.currentTimeMillis(), Thread.currentThread().getName()));
                        Thread.sleep(500);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                });
            }, "Thread-" + i).start();
        }
        for (int i = 1000; i < 2000; i++) {
            new Thread(() -> {
                locker.lock("user_2", () -> {
                    try {
                        System.out.println(String.format("user_2 time: %d, threadName: %s",
                                System.currentTimeMillis(), Thread.currentThread().getName()));
                        Thread.sleep(500);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                });
            }, "Thread-" + i).start();
        }
        System.in.read();
    }
}

Running results:

You can see that the two locks each print steadily at roughly 500 ms intervals.

user_1 time: 1568973299578, threadName: Thread-10
user_2 time: 1568973299579, threadName: Thread-1780
user_1 time: 1568973300091, threadName: Thread-887
user_2 time: 1568973300091, threadName: Thread-1542
user_1 time: 1568973300594, threadName: Thread-882
user_2 time: 1568973300594, threadName: Thread-1539
user_2 time: 1568973301098, threadName: Thread-1592
user_1 time: 1568973301098, threadName: Thread-799
user_1 time: 1568973301601, threadName: Thread-444
user_2 time: 1568973301601, threadName: Thread-1096
user_1 time: 1568973302104, threadName: Thread-908
user_2 time: 1568973302104, threadName: Thread-1574
user_2 time: 1568973302607, threadName: Thread-1515
user_1 time: 1568973302607, threadName: Thread-80
user_1 time: 1568973303110, threadName: Thread-274
user_2 time: 1568973303110, threadName: Thread-1774
user_1 time: 1568973303615, threadName: Thread-324
user_2 time: 1568973303615, threadName: Thread-1621

Curator implementation

The native API implementation above makes the logic of ZooKeeper's distributed lock easier to understand, but it is hard to guarantee that it is problem-free, and it lacks features such as reentrant locks and read/write locks.

So let's take a look at Curator, an existing "wheel" (ready-made library), and how it does it.

POM file

Introduce the following dependencies in the POM file:

<dependency>
    <groupId>org.apache.curator</groupId>
    <artifactId>curator-recipes</artifactId>
    <version>4.0.0</version>
</dependency>
<dependency>
    <groupId>org.apache.curator</groupId>
    <artifactId>curator-framework</artifactId>
    <version>4.0.0</version>
</dependency>

Code implementation

Here is an implementation of the mutex:

@Component
@Slf4j
public class ZkCuratorLocker implements Locker {
    public static final String connAddr = "127.0.0.1:2181";
    public static final int timeout = 6000;
    public static final String LOCKER_ROOT = "/locker";

    private CuratorFramework cf;

    @PostConstruct
    public void init() {
        this.cf = CuratorFrameworkFactory.builder()
                .connectString(connAddr)
                .sessionTimeoutMs(timeout)
                .retryPolicy(new ExponentialBackoffRetry(1000, 3))
                .build();
        cf.start();
    }

    @Override
    public void lock(String key, Runnable command) {
        String path = LOCKER_ROOT + "/" + key;
        InterProcessLock lock = new InterProcessMutex(cf, path);
        try {
            lock.acquire();
            command.run();
        } catch (Exception e) {
            log.error("get lock error", e);
            throw new RuntimeException("get lock error", e);
        } finally {
            try {
                lock.release();
            } catch (Exception e) {
                log.error("release lock error", e);
                throw new RuntimeException("release lock error", e);
            }
        }
    }
}

In addition to the mutex, Curator provides read/write locks, multi-locks, semaphores, and so on, and its locks are reentrant.
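For example, a read/write lock can be used roughly as follows (a minimal sketch that reuses the CuratorFramework client built above; the path /locker/rw_demo is a placeholder):

import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.recipes.locks.InterProcessReadWriteLock;

public class ReadWriteLockDemo {
    public static void demo(CuratorFramework cf) throws Exception {
        InterProcessReadWriteLock rwLock = new InterProcessReadWriteLock(cf, "/locker/rw_demo");

        // Multiple clients may hold the read lock at the same time
        rwLock.readLock().acquire();
        try {
            // ... read shared state ...
        } finally {
            rwLock.readLock().release();
        }

        // The write lock is exclusive
        rwLock.writeLock().acquire();
        try {
            // ... modify shared state ...
        } finally {
            rwLock.writeLock().release();
        }
    }
}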

Conclusion

(1) There are four types of nodes in ZooKeeper: persistent, persistent ordered, temporary, and temporary ordered;

(2) ZooKeeper provides a very important feature, the watch (listening) mechanism, which can be used to listen for changes to nodes;

(3) The ZooKeeper distributed lock is implemented with temporary ordered nodes plus the watch mechanism;

(4) To acquire the ZooKeeper distributed lock, a temporary ordered node is created under the lock path;

(5) If it is the first (smallest) node, the lock is acquired;

(6) If it is not the first node, it listens on the previous node and the current thread is blocked;

(7) When the delete event of the previous node is caught, the blocked thread wakes up and checks again whether its node is now the first;

(8) Temporary ordered nodes are used instead of persistent ordered nodes so that the lock is released automatically if the client disconnects unexpectedly.

Bonus

What are the advantages of the ZooKeeper distributed lock?

A: 1) ZooKeeper itself can be deployed as a cluster, which is more reliable than a single-point MySQL;

2) It does not consume MySQL connections or add pressure on MySQL;

3) The listening mechanism reduces the number of thread context switches;

4) The lock is released automatically when the client disconnects, which is very safe;

5) There is an existing "wheel", Curator, ready to use;

6) The Curator implementation is reentrant, so the cost of retrofitting existing code is small;

What are the disadvantages of the ZooKeeper distributed lock?

A: 1) Locking and unlocking "write" to ZooKeeper frequently, which adds pressure on ZooKeeper;

2) Writes to ZooKeeper are synchronized across the cluster; the more nodes there are, the slower the synchronization and the slower the lock acquisition;

3) The system has to depend on ZooKeeper, yet most services do not otherwise use ZooKeeper, which increases system complexity;

4) Compared with the Redis distributed lock, performance is slightly worse;

Recommended reading

The beginning of the Java Synchronization series

2, Unbroadening Java magic class parsing

JMM (Java Memory Model)

Volatile parsing of the Java Synchronization series

Synchronized parsing of Java series

6, Deadknock Java synchronization series write a Lock Lock yourself

7. AQS of The Java Synchronization series

ReentrantLock (a) — fair lock, unfair lock

ReentrantLock – Conditional lock

ReentrantLock VS Synchronized Java series

ReentrantReadWriteLock source code parsing

Semaphore source code analysis of Semaphore Java synchronization series

CountDownLatch source code parsing

The AQS finale of the Java Sync series

Java synchronization series StampedLock source code parsing

CyclicBarrier Java synchronization series source code parsing

Phaser source code analysis for Java Sync series

Mysql distributed lock for Java synchronous series

Welcome to pay attention to my public number “Tong Elder brother read source code”, view more source code series articles, with Tong elder brother tour the ocean of source code.