In many scenarios, data consistency is an important concern. In a stand-alone environment it can be addressed with the concurrency APIs provided by Java, but a distributed environment (where network failures, duplicated messages, lost messages, and other problems can occur) is much more complex. Common solutions include distributed transactions and distributed locks.

This article mainly discusses how to implement distributed locks with Zookeeper.

About Distributed Locking

Distributed locking is a way to control synchronous access to shared resources between distributed systems.

Note the following when implementing distributed locks:

  • Reentrancy of locks (recursive calls should not block to avoid deadlocks)

  • Timeout of locks (to avoid unexpected situations such as deadlocks and loops)

  • Blocking of locks (atomicity, etc.)

  • Lock feature support (blocking locks, reentrant locks, fair locks, mutexes, semaphores, read/write locks)

Note when using distributed locks:

  • Cost of distributed locks (consider whether a distributed lock is really necessary; in some scenarios optimistic locking can be used instead)

  • Granularity of locking (Controlling the granularity of locking can optimize system performance)

  • The way the lock is acquired

The following are several common solutions for implementing distributed locks and their advantages and disadvantages.

Database-based

1. Based on database tables

The simplest approach is probably to create a lock table. When we want to lock a method or resource, we insert a record into the table, and we delete the record when we want to release the lock. With a unique constraint on a field, if multiple requests are submitted to the database at the same time, the database guarantees that only one insert succeeds, so we can consider the thread whose insert succeeded to have acquired the lock and be allowed to execute the method body.
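As a rough sketch only (the method_lock table, its column, and the connection handling are hypothetical), acquiring and releasing such a lock with plain JDBC could look like this:

import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.SQLException;

public class TableLock {
    // Sketch only: assumes a table created roughly as
    //   CREATE TABLE method_lock (method_name VARCHAR(64) NOT NULL, UNIQUE KEY uk_method_name (method_name));
    public boolean tryLock(Connection conn, String methodName) {
        try (PreparedStatement ps =
                 conn.prepareStatement("INSERT INTO method_lock (method_name) VALUES (?)")) {
            ps.setString(1, methodName);
            ps.executeUpdate();
            return true;   // the insert succeeded, so this thread holds the lock
        } catch (SQLException e) {
            return false;  // unique-key violation: another thread already holds the lock
        }
    }

    public void unlock(Connection conn, String methodName) throws SQLException {
        try (PreparedStatement ps =
                 conn.prepareStatement("DELETE FROM method_lock WHERE method_name = ?")) {
            ps.setString(1, methodName);
            ps.executeUpdate();
        }
    }
}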

This introduces problems such as a database single point of failure, no lock expiration, no blocking, and no reentrancy.

2. Database based exclusive locking

If you are using the MySQL InnoDB engine, appending FOR UPDATE to the query statement makes the database add an exclusive lock during the query (querying through a unique index acquires a row-level lock). The thread that obtains the exclusive lock can be considered to hold the distributed lock, and the lock is released with the connection.commit() operation.
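A minimal sketch of this idea (reusing the hypothetical method_lock table from the sketch above; auto-commit must be disabled so the lock is held until commit):

public void executeWithLock(Connection conn, String methodName) throws SQLException {
    conn.setAutoCommit(false);
    try (PreparedStatement ps = conn.prepareStatement(
            "SELECT method_name FROM method_lock WHERE method_name = ? FOR UPDATE")) {
        ps.setString(1, methodName);
        ps.executeQuery();   // blocks until the exclusive (row) lock on this record is granted
        // ... execute the protected business logic here ...
    } finally {
        conn.commit();       // committing releases the exclusive lock
    }
}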

This still has problems: the database remains a single point, the lock is not reentrant, row locking cannot be guaranteed (in some cases MySQL uses table locks instead of row locks), and an exclusive lock that is not committed for a long time can occupy a database connection.

3. Summary of database-based distributed locks

Advantages:

  • Use the database directly, easy to understand.

Disadvantages:

  • More problems will be introduced, making the whole scheme more and more complicated

  • Operating the database requires some overhead and has certain performance problems

  • Relying on the database's row-level locking is not always reliable, especially when the lock table is small (MySQL may use table locks instead)

Cache-based

Compared with database-based distributed locks, cache-based implementations perform better. There are many mature caching products, including Redis, Memcached, Tair, and more.

Here, Redis is used as an example to give several implementation methods:

1. Distributed lock based on the Redis setnx() and expire() methods

setnx stands for SET if Not Exists and takes two parameters: setnx(key, value). The operation is atomic. If the key does not exist, the key is set and 1 is returned; if the key already exists, the set fails and 0 is returned.

expire() sets the expiration time. Note that the setnx command itself does not set an expiration time for the key; that can only be done with expire().
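A minimal sketch of this approach, assuming the Jedis client (the key name, value, and expiry are illustrative):

public boolean tryLock(Jedis jedis, String lockKey, String requestId) {
    if (jedis.setnx(lockKey, requestId) == 1) {  // only one client can create the key
        jedis.expire(lockKey, 30);               // add an expiry so a crashed holder does not block others forever
        return true;
    }
    return false;
}

Note that setnx and expire are two separate commands here, so a client that crashes between them leaves a lock with no expiry; newer Redis versions can set the value and the expiry atomically with SET key value NX PX milliseconds.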

2. Distributed lock based on the Redis setnx(), get(), and getSet() methods

The getset command takes two arguments: getSet(key, newValue). The operation is atomic: it sets the key to newValue and returns the old value of the key.
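A sketch of the usual pattern built on these commands (Jedis assumed): the value stored under the key is an absolute expiry timestamp, and getSet() is used to take over an expired lock without a race.

public boolean tryLock(Jedis jedis, String lockKey, long lockMillis) {
    String newExpire = String.valueOf(System.currentTimeMillis() + lockMillis);
    if (jedis.setnx(lockKey, newExpire) == 1) {
        return true;                                 // no previous holder, lock acquired
    }
    String currentExpire = jedis.get(lockKey);
    if (currentExpire != null && Long.parseLong(currentExpire) < System.currentTimeMillis()) {
        // The previous lock has expired: try to take it over atomically with getSet
        String oldExpire = jedis.getSet(lockKey, newExpire);
        // Only the client that still sees the same expired value has won the race
        return currentExpire.equals(oldExpire);
    }
    return false;                                    // lock is held and has not expired
}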

3. Distributed lock based on Redlock

Redlock is a cluster-mode Redis distributed lock proposed by antirez, the author of Redis. It is based on N completely independent Redis nodes (N is usually set to 5).
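As a usage sketch (Redisson's RedissonRedLock is used here for illustration; redisson1, redisson2 and redisson3 are assumed to be RedissonClient instances connected to independent Redis nodes):

RLock lock1 = redisson1.getLock("resourceLock");
RLock lock2 = redisson2.getLock("resourceLock");
RLock lock3 = redisson3.getLock("resourceLock");
RedissonRedLock redLock = new RedissonRedLock(lock1, lock2, lock3);
redLock.lock();      // succeeds only when the lock is acquired on a majority of the nodes
try {
    // ... protected business logic ...
} finally {
    redLock.unlock();
}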

4. Distributed lock based on Redisson

Redisson is a Redis Java client recommended on the official Redis website that provides distributed lock components. Project address: https://github.com/redisson/redisson
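A minimal Redisson usage sketch (the server address and lock name are placeholders):

Config config = new Config();
config.useSingleServer().setAddress("redis://127.0.0.1:6379");
RedissonClient redisson = Redisson.create(config);

RLock lock = redisson.getLock("myLock");
lock.lock(30, TimeUnit.SECONDS);   // acquire with an automatic lease time
try {
    // ... protected business logic ...
} finally {
    lock.unlock();
}
redisson.shutdown();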

Summary of cache-based distributed locks

Advantages:

  • Performance is good

Disadvantages:

  • There are too many factors to consider in the implementation

  • Controlling lock expiration via a timeout is not very reliable

Zookeeper-based

The general idea: when a client locks a method, it creates a unique ephemeral sequential node under the directory node on Zookeeper that corresponds to that method. Determining whether the lock has been acquired is simple: just check whether the node has the smallest sequence number among the ordered nodes. To release the lock, simply delete that ephemeral node. Because the node is ephemeral, a lock that cannot be released due to a service crash will not cause a deadlock.

Summary of Zookeeper-based distributed locks

Advantages:

  • Effectively solves the single-point problem, non-reentrancy, non-blocking, and locks that cannot be released

  • Relatively simple implementation

Disadvantages:

  • Performance is inferior to distributed locks implemented with caching because temporary nodes are created and destroyed dynamically each time a lock is created and released

  • You need to be familiar with how Zookeeper works

How does Zookeeper implement distributed locks?

Here’s how to implement exclusive and shared locks, and how to solve herding.

Exclusive lock

An exclusive lock is also known as a write lock or X lock. If transaction T1 holds an exclusive lock on data object O1, then only T1 is allowed to read or update O1 for the duration of the lock; no other transaction is allowed to perform any operation on this data object until T1 releases the lock.

The core of exclusive locking is to ensure that one and only one transaction currently has the lock, and after the lock is released, all transactions waiting to acquire the lock can be notified.

Zookeeper's strong consistency guarantees global uniqueness when creating nodes, even under high concurrency in a distributed environment: Zookeeper ensures that clients cannot create a data node that already exists. This feature can be used to implement an exclusive lock.

  • Define locks: A lock is represented by a data node on Zookeeper

  • Obtain the lock: the client calls the create method to create an ephemeral node representing the lock; the client that creates it successfully is considered to have acquired the lock. At the same time, clients that did not acquire the lock can register a Watcher on this node so they are notified of changes to the lock node in real time (a raw-API sketch follows this list)

  • Release lock: The lock can be released in either of the following ways

    • If the client currently holding the lock crashes or disconnects, the ephemeral node on Zookeeper is removed automatically

    • After the business logic has finished executing, the client deletes the ephemeral node it created
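A rough sketch of these steps with the raw ZooKeeper API (the /exclusive_lock parent node is assumed to already exist, and connection handling is omitted):

public void lock(ZooKeeper zk) throws Exception {
    while (true) {
        try {
            // Only one client can create this ephemeral node; creating it means the lock is acquired
            zk.create("/exclusive_lock/lock", new byte[0],
                      ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);
            return;
        } catch (KeeperException.NodeExistsException e) {
            // The lock is held by another client: watch the lock node and wait for it to be deleted
            final CountDownLatch latch = new CountDownLatch(1);
            Stat stat = zk.exists("/exclusive_lock/lock", event -> {
                if (event.getType() == Watcher.Event.EventType.NodeDeleted) {
                    latch.countDown();
                }
            });
            if (stat != null) {
                latch.await();
            }
        }
    }
}

public void unlock(ZooKeeper zk) throws Exception {
    zk.delete("/exclusive_lock/lock", -1);  // -1 matches any node version
}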

Zookeeper based exclusive lock process:

(Figure: exclusive lock process based on Zookeeper)

Shared lock

A shared lock is also called a read lock. If transaction T1 holds a shared lock on data object O1, then T1 can only read O1, and other transactions can only acquire shared locks on O1 until all shared locks on it are released.

The difference between a shared lock and an exclusive lock is that with an exclusive lock, the data object is visible only to the current transaction, whereas with a shared lock, the data object is visible to all transactions.

  • Define the lock: a lock is represented by a data node on Zookeeper, an ephemeral sequential node named like /lockpath/[hostname]-requestType-sequenceNumber

  • Obtain the lock: the client creates an ephemeral sequential node representing the lock by calling the create method. For read requests it creates a node like /lockpath/[hostname]-R-sequenceNumber, and for write requests a node like /lockpath/[hostname]-W-sequenceNumber

  • Determine the read/write order, in roughly 4 steps (a helper sketch follows this list):

    • 1) After creating its own node, get all child nodes under the /lockpath node and register a Watcher for child-node changes on that node

    • 2) Determine its own position, by sequence number, among all the child nodes

    • 3.1) For read requests: if there is no child node with a smaller sequence number, or all child nodes with smaller sequence numbers are read requests, the shared lock is acquired successfully and the read logic is executed; if any child node with a smaller sequence number is a write request, wait

    • 3.2) For write requests: if the node is not the one with the smallest sequence number, wait

    • 4) After receiving the Watcher notification, repeat Step 1.)

  • Release lock: Consistent with exclusive lock logic
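As an illustration of step 3.1, a helper that decides whether a read request may proceed could look like the sketch below (node-name parsing is simplified and assumes child names of the form host-R-0000000001 or host-W-0000000001):

public boolean readRequestCanAcquire(List<String> children, long mySequence) {
    for (String child : children) {
        String[] parts = child.split("-");
        String type = parts[parts.length - 2];               // "R" for read, "W" for write
        long seq = Long.parseLong(parts[parts.length - 1]);  // trailing sequence number
        // A read request only has to wait if a write request has a smaller sequence number
        if (seq < mySequence && "W".equals(type)) {
            return false;
        }
    }
    return true;  // no earlier write request: the shared (read) lock is acquired
}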

(Figure: the Zookeeper shared lock node tree)

Zookeeper based shared lock process:

(Figure: shared lock process based on Zookeeper)

The herd effect

The first step of the shared-lock implementation above is that, after creating its node, a client fetches all child nodes under the /lockpath node and registers a Watcher for child-node changes on that node. As a result, whenever any client releases a shared lock, Zookeeper sends a child-node-change Watcher notification to all machines, and the system performs a large number of repeated "Watcher notification" and "fetch child node list" operations. Every node then checks whether it has the smallest sequence number (for a write request) or whether all child nodes with smaller sequence numbers are read requests (for a read request), and otherwise keeps waiting for the next notification.

However, many of these repeated operations are "useless": in fact each lock competitor only needs to pay attention to whether the node immediately ahead of it (the one with the next smaller sequence number) still exists.

When the cluster is large, these "useless" operations not only have a significant impact on Zookeeper's performance and on the network; worse, if multiple clients release their shared locks at the same time, the Zookeeper server sends a large number of event notifications to the remaining clients in a short period of time. This is known as the "herd effect".

Improved distributed lock implementation:

The concrete implementation is as follows (a sketch follows the list):

  • 1. The client calls the create method to create an ephemeral sequential node named like /lockpath/[hostname]-requestType-sequenceNumber

  • 2. The client calls the getChildren method to get a list of all created child nodes (no Watcher is registered here).

  • 3. If the shared lock cannot be acquired, call exists() to register a Watcher on the relevant node with a smaller sequence number:

    • Read requests: register a Watcher on the last write-request node whose sequence number is smaller than its own

    • Write requests: register a Watcher on the last node whose sequence number is smaller than its own

  • 4. Wait for the Watcher notification, then go back to step 2
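A sketch of step 3, registering a Watcher only on the relevant predecessor node through exists() (the zk handle and node paths follow the conventions above):

public void waitForPredecessor(ZooKeeper zk, String predecessorPath) throws Exception {
    final CountDownLatch latch = new CountDownLatch(1);
    // exists() registers a one-shot Watcher that fires when the predecessor node is deleted
    Stat stat = zk.exists(predecessorPath, event -> {
        if (event.getType() == Watcher.Event.EventType.NodeDeleted) {
            latch.countDown();
        }
    });
    if (stat == null) {
        return;       // the predecessor is already gone: go back to step 2 and re-check the children
    }
    latch.await();     // this client is woken only when its predecessor is removed, avoiding the herd effect
}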

(Figure: Watcher listeners before and after the herd-effect improvement)

Distributed locking based on the Curator client

Apache Curator is an open-source Zookeeper client that provides high-level abstractions (Recipes) for common Zookeeper usage scenarios, such as shared lock services, master election, and distributed counters. Next, the classes provided by Curator are used to implement distributed locks.

There are five classes for distributed locking provided by Curator:

  • Shared Reentrant Lock: a shared reentrant lock

  • Shared Lock: a shared, non-reentrant lock

  • Shared Reentrant Read Write Lock: a shared reentrant read/write lock

  • Shared Semaphore: a shared semaphore

  • Multi Shared Lock: a container holding multiple locks

Regarding error handling: it is strongly recommended to use a ConnectionStateListener to handle connection state changes. When the connection state becomes LOST, you no longer hold the lock.
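A minimal sketch of such a listener (the reaction logic is illustrative):

client.getConnectionStateListenable().addListener(new ConnectionStateListener() {
    @Override
    public void stateChanged(CuratorFramework client, ConnectionState newState) {
        if (newState == ConnectionState.SUSPENDED || newState == ConnectionState.LOST) {
            // The session may be gone (or is gone), so the lock can no longer be assumed to be held:
            // stop the protected work here and re-acquire the lock after reconnecting.
        }
    }
});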

Reentrant lock

A Shared Reentrant Lock is a global reentrant lock that all clients can request. The same client can acquire the lock multiple times without blocking. It is implemented by the class InterProcessMutex, whose main methods are:

// Constructors
public InterProcessMutex(CuratorFramework client, String path)
public InterProcessMutex(CuratorFramework client, String path, LockInternalsDriver driver)

// Acquire the lock; an overload with a timeout is also provided
public void acquire() throws Exception
public boolean acquire(long time, TimeUnit unit) throws Exception

// Make the lock revocable
public void makeRevocable(RevocationListener<InterProcessMutex> listener)
public void makeRevocable(final RevocationListener<InterProcessMutex> listener, Executor executor)

Define a FakeLimitedResource class to simulate a shared resource that may only be used by one thread at a time; if it is used by more than one thread concurrently, it throws an exception.

public class FakeLimitedResource {
    private final AtomicBoolean inUse = new AtomicBoolean(false);

    public void use() throws InterruptedException {
        // If the lock is used correctly, this exception can never be thrown
        if (!inUse.compareAndSet(false, true)) {
            throw new IllegalStateException("Needs to be used by one client at a time");
        }
        try {
            Thread.sleep((long) (100 * Math.random()));
        } finally {
            inUse.set(false);
        }
    }
}

The following code starts N threads to simulate nodes in a distributed system that coordinate access to the resource through InterProcessMutex. Each node makes 10 requests, and each request goes through: acquire the lock, access the resource, acquire the lock again, release the lock, release the lock again. A client acquires the lock with acquire() and releases it with release(); acquiring the lock a second time while already holding it exercises reentrancy. The shared resource can only be used by one thread at a time, and an exception is thrown if synchronization control fails.

public class SharedReentrantLockTest {
    private static final String lockPath = "/testZK/sharedreentrantlock";
    private static final Integer clientNums = 5;
    final static FakeLimitedResource resource = new FakeLimitedResource();
    private static CountDownLatch countDownLatch = new CountDownLatch(clientNums);

    public static void main(String[] args) throws InterruptedException {
        for (int i = 0; i < clientNums; i++) {
            String clientName = "client#" + i;
            new Thread(new Runnable() {
                @Override
                public void run() {
                    CuratorFramework client = ZKUtils.getClient();
                    client.start();
                    Random random = new Random();
                    try {
                        final InterProcessMutex lock = new InterProcessMutex(client, lockPath);
                        for (int j = 0; j < 10; j++) {
                            if (!lock.acquire(10, TimeUnit.SECONDS)) {
                                throw new IllegalStateException(j + ". " + clientName + " cannot get the mutex");
                            }
                            try {
                                System.out.println(j + ". " + clientName + " has obtained the mutex");
                                resource.use(); // Use the resource
                                // Acquire the lock again to exercise reentrancy
                                if (!lock.acquire(10, TimeUnit.SECONDS)) {
                                    throw new IllegalStateException(j + ". " + clientName + " cannot get the mutex again");
                                }
                                System.out.println(j + ". " + clientName + " has obtained the mutex again");
                                lock.release();
                            } finally {
                                System.out.println(j + ". " + clientName + " releases the mutex");
                                lock.release();
                            }
                            Thread.sleep(random.nextInt(100));
                        }
                    } catch (Throwable e) {
                        System.out.println(e.getMessage());
                    } finally {
                        CloseableUtils.closeQuietly(client);
                        System.out.println(clientName + " client closed!");
                        countDownLatch.countDown();
                    }
                }
            }).start();
        }
        countDownLatch.await();
        System.out.println("The end!");
    }
}

The console log shows that synchronized access to the resource was controlled correctly and that the lock is reentrant:

client#3 has obtained the mutex again. client#3 releases the mutex. client#1 has obtained the mutex again. client#2 has obtained the mutex again. client#2 has obtained the mutex again. client#2 releases the mutex. client#4 has obtained the mutex again. 0. client#4 has obtained the mutex again. client#1 has obtained the mutex again. client#1 has obtained the mutex again. client#1 releases the mutex. client#3 has obtained the mutex again 1. client#3 releases the mutex 1. client#2 has obtained the mutex again 1. client#4 has obtained the mutex. 2. client#4 has obtained the mutex again. client#2 client closed! client#0 has obtained the mutex again 9. client#0 releases the mutex 9. client#3 has obtained the mutex again 9. releases the mutex. client#0 client closed! client#4 has obtained the mutex again. client#4 releases the mutex. 9. client#4 has obtained the mutex. 9. releases the mutex. client#3 client closed! client#4 client closed! The end!

Looking at the Zookeeper node tree while the program is running, you can see that each lock request corresponds to an ephemeral sequential node:

[zk: localhost:2181(CONNECTED) 42] ls /testZK/sharedreentrantlock
[leases, _c_208d461b-716d-43ea-ac94-1d2be1206db3-lock-0000001659, locks, _c_64b19dba-3efa-46a6-9344-19a52e9e424f-lock-0000001658, _c_cee02916-d7d5-4186-8867-f921210b8815-lock-0000001657]

Non-reentrant lock

A Shared Lock is similar to a Shared Reentrant Lock, but it is not reentrant. This non-reentrant lock is implemented by the class InterProcessSemaphoreMutex and is used in much the same way as the class above.

Replace InterProcessMutex with InterProcessSemaphoreMutex in the program above and run it again: each thread now blocks on the second acquire until it times out, which shows that this lock is not reentrant.
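A minimal sketch of the change (reusing the ZKUtils helper and lockPath from the example above):

CuratorFramework client = ZKUtils.getClient();
client.start();
InterProcessSemaphoreMutex lock = new InterProcessSemaphoreMutex(client, lockPath);
if (lock.acquire(10, TimeUnit.SECONDS)) {
    try {
        // The same thread tries to acquire again: this blocks and eventually times out,
        // because InterProcessSemaphoreMutex is not reentrant
        boolean acquiredAgain = lock.acquire(10, TimeUnit.SECONDS);
        System.out.println("acquired again: " + acquiredAgain);  // false after the 10s timeout
    } finally {
        lock.release();
    }
}
CloseableUtils.closeQuietly(client);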

Console Output Logs

client#2 has obtained the mutex
0. client#1 cannot get the mutex
0. client#4 cannot get the mutex
0. client#3 cannot get the mutex
client#4 client closed!
client#3 client closed!
client#0 client closed!
0. client#2 cannot get the mutex again
client#2 client closed!
The end!

If the second acquisition of the lock is commented out, the program runs normally:

client#1 has obtained the mutex. client#1 releases the mutex. client#2 has obtained the mutex. client#0 has obtained the mutex. client#3 has obtained the mutex. 0. client#3 has obtained the mutex. 1. client#1 has obtained the mutex. client#2 releases the mutex ........ client#4 has obtained the mutex. client#4 releases the mutex. client#0 has obtained the mutex. client#2 client closed! 9. client#1 has obtained the mutex. client#0 client closed! client#4 client closed! 9. client#3 has obtained the mutex. client#1 client closed! client#3 releases the mutex. client#3 client closed! The end!

Reentrant read/write lock

Shared Reentrant Read Write Lock: a read/write lock manages a pair of related locks, one for read operations and one for write operations. The read lock can be held by multiple processes at the same time as long as the write lock is not held, while the write lock is exclusive. The lock is reentrant: a thread holding the write lock can also acquire the read lock, but not the other way around. This means a write lock can be downgraded to a read lock (for example, acquire the write lock, then the read lock, then release the write lock), but upgrading from a read lock to a write lock is not possible.

The reentrant read/write lock involves two classes: InterProcessReadWriteLock and InterProcessMutex. To use it, first create an InterProcessReadWriteLock instance, then obtain the read lock or the write lock from it as needed; both locks are of type InterProcessMutex.

public static void main(String[] args) throws InterruptedException {
    for (int i = 0; i < clientNums; i++) {
        final String clientName = "client#" + i;
        new Thread(new Runnable() {
            @Override
            public void run() {
                CuratorFramework client = ZKUtils.getClient();
                client.start();
                final InterProcessReadWriteLock lock = new InterProcessReadWriteLock(client, lockPath);
                final InterProcessMutex readLock = lock.readLock();
                final InterProcessMutex writeLock = lock.writeLock();
                try {
                    // Note: the write lock must be acquired first, then the read lock (downgrade)
                    if (!writeLock.acquire(10, TimeUnit.SECONDS)) {
                        throw new IllegalStateException(clientName + " cannot get the write lock");
                    }
                    System.out.println(clientName + " has obtained the write lock");
                    if (!readLock.acquire(10, TimeUnit.SECONDS)) {
                        throw new IllegalStateException(clientName + " cannot get the read lock");
                    }
                    System.out.println(clientName + " has obtained the read lock");
                    try {
                        resource.use();
                    } finally {
                        System.out.println(clientName + " releases the read and write locks");
                        readLock.release();
                        writeLock.release();
                    }
                } catch (Exception e) {
                    System.out.println(e.getMessage());
                } finally {
                    CloseableUtils.closeQuietly(client);
                    countDownLatch.countDown();
                }
            }
        }).start();
    }
    countDownLatch.await();
    System.out.println("The end!");
}

Console Logs

client#1 has obtained the write lock
client#1 has obtained the read lock
client#2 has obtained the read lock
client#4 has obtained the write lock
client#4 has obtained the read lock
client#3 has obtained the write lock

Semaphore

Shared Semaphore: a counting semaphore similar to the JDK's Semaphore. The JDK Semaphore maintains a set of permits, which Curator calls leases. There are two ways to determine the maximum number of leases a semaphore allows: the first is specified by the user for a given path, and the second uses the SharedCountReader class. If SharedCountReader is not used, there is no internal check that all processes agree, so one process might assume there are 10 leases while process B assumes there are 20; therefore, all instances must use the same numberOfLeases value.

The main semaphore implementation classes are:

InterProcessSemaphoreV2 - the semaphore implementation class
Lease - a lease (a single permit)
SharedCountReader - a counter used to compute the maximum number of leases

Calling acquire() returns a Lease object, which the client must close in a finally block or the lease will be leaked. However, if the client session is lost for some reason, such as a crash, the leases held by that client are automatically closed so that other clients can use them. Leases can also be returned in the following ways:

public void returnLease(Lease lease)
public void returnAll(Collection<Lease> leases)

Note that several leases can be requested at once; if the semaphore does not currently have enough leases available, the requesting thread blocks. Overloads with a timeout are also provided:

public Lease acquire() throws Exception
public Collection<Lease> acquire(int qty) throws Exception
public Lease acquire(long time, TimeUnit unit) throws Exception
public Collection<Lease> acquire(int qty, long time, TimeUnit unit) throws Exception

A Demo program is shown below

public class SharedSemaphoreTest {
    private static final int MAX_LEASE = 10;
    private static final String PATH = "/testZK/semaphore";
    private static final FakeLimitedResource resource = new FakeLimitedResource();

    public static void main(String[] args) throws Exception {
        CuratorFramework client = ZKUtils.getClient();
        client.start();
        InterProcessSemaphoreV2 semaphore = new InterProcessSemaphoreV2(client, PATH, MAX_LEASE);
        Collection<Lease> leases = semaphore.acquire(5);
        System.out.println("Number of leases obtained: " + leases.size());
        Lease lease = semaphore.acquire();
        System.out.println("Obtained a single lease");
        resource.use();
        // Only 4 leases are left, so this request times out
        Collection<Lease> leases2 = semaphore.acquire(5, 10, TimeUnit.SECONDS);
        System.out.println("Obtained leases, null on timeout: " + leases2);
        System.out.println("Release one lease");
        semaphore.returnLease(lease);
        // 5 leases are available again, so this request succeeds
        leases2 = semaphore.acquire(5, 10, TimeUnit.SECONDS);
        System.out.println("Obtained leases, null on timeout: " + leases2);
        System.out.println("Release all leases in the collections");
        semaphore.returnAll(leases);
        semaphore.returnAll(leases2);
        client.close();
        System.out.println("The end!");
    }
}

Console Logs

Number of leases obtained: 5
Obtained a single lease
Obtained leases, null on timeout: null
Release one lease
Obtained leases, null on timeout: [org.apache.curator.framework.recipes.locks.InterProcessSemaphoreV2$3@3108bc, org.apache.curator.framework.recipes.locks.InterProcessSemaphoreV2$3@370736d9, org.apache.curator.framework.recipes.locks.InterProcessSemaphoreV2$3@5f9d02cb, org.apache.curator.framework.recipes.locks.InterProcessSemaphoreV2$3@63753b6d, org.apache.curator.framework.recipes.locks.InterProcessSemaphoreV2$3@6b09bb57]
Release all leases in the collections
The end!

Note: all of the locks above are fair locks. From ZooKeeper's point of view, each client acquires the lock in the order in which it asked for it.

Multi lock

A Multi Shared Lock is a container for locks. When acquire() is called, all contained locks are acquired; if any acquisition fails, every lock that was acquired is released. Likewise, when release() is called, all contained locks are released (failures are ignored). Essentially it represents a group of locks: acquire and release requests on it are passed on to all the locks it contains.

There are two main classes involved:

InterProcessMultiLock - the multi-lock implementation class
InterProcessLock - the distributed lock interface

Its constructor takes the collection of locks it should contain, or a set of ZooKeeper paths. It is used in the same way as a Shared Lock:

public InterProcessMultiLock(CuratorFramework client, List<String> paths)
public InterProcessMultiLock(List<InterProcessLock> locks)

A Demo program is shown below

public class MultiSharedLockTest {
    private static final String lockPath1 = "/testZK/MSLock1";
    private static final String lockPath2 = "/testZK/MSLock2";
    private static final FakeLimitedResource resource = new FakeLimitedResource();

    public static void main(String[] args) throws Exception {
        CuratorFramework client = ZKUtils.getClient();
        client.start();
        InterProcessLock lock1 = new InterProcessMutex(client, lockPath1);           // Reentrant lock
        InterProcessLock lock2 = new InterProcessSemaphoreMutex(client, lockPath2);  // Non-reentrant lock
        InterProcessMultiLock lock = new InterProcessMultiLock(Arrays.asList(lock1, lock2));
        if (!lock.acquire(10, TimeUnit.SECONDS)) {
            throw new IllegalStateException("Cannot acquire the multi lock");
        }
        System.out.println("The multi lock has been obtained");
        System.out.println("Is the first lock held: " + lock1.isAcquiredInThisProcess());
        System.out.println("Is the second lock held: " + lock2.isAcquiredInThisProcess());
        try {
            resource.use();
        } finally {
            System.out.println("Release the multi lock");
            lock.release(); // Releases both lock1 and lock2
        }
        System.out.println("Is the first lock held: " + lock1.isAcquiredInThisProcess());
        System.out.println("Is the second lock held: " + lock2.isAcquiredInThisProcess());
        client.close();
        System.out.println("The end!");
    }
}

Download the code at http://t.cn/EtVc1s4

References

1. From Paxos to Zookeeper: Distributed Consistency Principles and Practice
2. Apache Curator Recipes docs
3. Several implementations of distributed locks
4. Technical symposium, issue 4
5. This article is all you need on distributed locks
