Overview

This article describes three kinds of distributed lock implemented with Zookeeper:

  1. Unfair lock
  2. Fair lock
  3. Shared (read/write) lock

Environment: JDK 1.8 and Zookeeper 3.6.x

Flash-sale (seckill) scenario example

In a flash-sale scenario, we need to prevent concurrency problems such as overselling inventory or duplicate deductions. We usually use a distributed lock to avoid the data inconsistency caused by contention for shared resources.

Take a mobile-phone flash sale as an example; the purchase flow usually has three steps:

  1. Deduct the inventory of the corresponding commodity;
  2. Create the order for the goods;
  3. User payment.

We can solve such a scenario with a distributed lock: when a user enters the "place order" step of the flash sale, we lock the commodity's inventory, complete the inventory deduction and related operations, and then release the lock so that the next user can proceed. This keeps the inventory safe, and it also reduces the number of DB rollbacks caused by failed purchase attempts. The whole process is shown in the figure below:

Note: The granularity of locks depends on specific scenarios and requirements.

Three types of distributed locks

The Zookeeper distributed lock implementation mainly relies on two Zookeeper features:

  1. A Zookeeper node (znode) with the same path cannot be created twice
  2. Zookeeper's Watcher notification mechanism

Unfair lock

For an unfair lock, the locking process is shown in the figure below.
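The unfair lock relies on the first Zookeeper feature above: a znode path can only be created once, so whoever creates the lock node first holds the lock, and everyone else watches that node and retries when it is deleted. The following is a minimal in-memory analogy of that flow, purely for illustration — `putIfAbsent` stands in for Zookeeper's `create()` and `remove()` for deleting the znode; real code would use the ZooKeeper or Curator API against a live ensemble, and the class and path names here are hypothetical:

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

// In-memory analogy of the unfair lock: putIfAbsent stands in for
// ZooKeeper's create() (which fails if the znode already exists),
// and remove() stands in for deleting the lock znode on unlock.
public class UnfairLockSketch {
    private final ConcurrentMap<String, String> znodes = new ConcurrentHashMap<>();

    // Try to "create" the lock node; only one caller can succeed.
    public boolean tryLock(String lockPath, String owner) {
        return znodes.putIfAbsent(lockPath, owner) == null;
    }

    // "Delete" the lock node; in ZooKeeper this would fire the Watcher
    // of every waiting client at once — the stampede described below.
    public void unlock(String lockPath, String owner) {
        znodes.remove(lockPath, owner);
    }

    public static void main(String[] args) {
        UnfairLockSketch zk = new UnfairLockSketch();
        System.out.println(zk.tryLock("/lock/order", "client-1")); // true: node created
        System.out.println(zk.tryLock("/lock/order", "client-2")); // false: node exists
        zk.unlock("/lock/order", "client-1");
        System.out.println(zk.tryLock("/lock/order", "client-2")); // true after delete
    }
}
```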

Advantages and disadvantages

The above implementation has both advantages and disadvantages:

  • Advantages: relatively simple to implement; the notification mechanism provides fast responses, somewhat similar to the idea behind ReentrantLock; if deleting the node fails, the session timeout guarantees that the ephemeral node is eventually removed.
  • Disadvantages: heavyweight, and with many contending clients it suffers from the "stampede" (herd) effect.

A stampede occurs when a node is deleted and the Watcher callbacks of all the clients subscribed to that node fire at once, which puts heavy pressure on the Zookeeper cluster. We need to avoid this phenomenon.

Solving the stampede problem:

To solve the stampede problem, we need to abandon the strategy of having every client subscribe to the same node. So how do we do that?

  1. We abstract the lock into a directory, and multiple threads create ephemeral sequential nodes under this directory. Because Zookeeper guarantees the ordering of sequential nodes, we can use that ordering to decide who holds the lock.
  2. After creating its sequential node, each client fetches the smallest node in the directory; if the smallest node is its own, it has acquired the lock, otherwise it has not.
  3. A client that fails to acquire the lock finds the node immediately preceding its own, registers a listener on that node, and is notified when that node is deleted.
  4. On unlock, the client deletes its own node, which notifies the next node in line.
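The core of steps 2–3 is pure ordering logic over the sequential node names: sort the children, hold the lock if yours is the smallest, otherwise watch only your immediate predecessor. That selection step can be sketched independently of a live Zookeeper; the node names below are hypothetical, in the zero-padded style Zookeeper produces for sequential nodes:

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

// Sketch of the fair-lock ordering step: given the children of the lock
// directory, decide whether we hold the lock and, if not, which single
// node we should watch (our immediate predecessor).
public class FairLockOrdering {

    /** Returns null if `self` is the smallest node (lock acquired),
     *  otherwise the name of the predecessor node to watch. */
    public static String predecessorToWatch(List<String> children, String self) {
        List<String> sorted = new ArrayList<>(children);
        Collections.sort(sorted); // zero-padded suffixes sort correctly lexically
        int idx = sorted.indexOf(self);
        return idx == 0 ? null : sorted.get(idx - 1);
    }

    public static void main(String[] args) {
        List<String> children = Arrays.asList(
                "lock-0000000003", "lock-0000000001", "lock-0000000002");
        // The holder of the smallest node owns the lock:
        System.out.println(predecessorToWatch(children, "lock-0000000001")); // null
        // Everyone else watches exactly one node — no stampede:
        System.out.println(predecessorToWatch(children, "lock-0000000003")); // lock-0000000002
    }
}
```

Because each waiter watches exactly one predecessor, a delete wakes exactly one client instead of the whole herd.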

Fair lock

Given the disadvantages of the unfair lock, we can avoid them with the following scheme.

Advantages and disadvantages

Advantages: thanks to ephemeral sequential nodes, concurrent contention by many clients on a single lock node is avoided, which relieves pressure on the server.

Disadvantages: it does not distinguish read/write scenarios; if reads also have to acquire the lock, performance degrades. For such cases we can use a read/write lock, similar to the JDK's ReadWriteLock.

Read/write lock implementation

A read/write lock has the following characteristics: multiple threads can read concurrently, which is effectively a lock-free state among readers; if a write lock is held, reads must wait for the write to finish; and when acquiring a write lock, all preceding read locks may still be running concurrently, so the writer needs to listen for the last preceding lock to complete. The steps are as follows:

  1. Read request: if only read locks precede it, it can read directly without listening. If one or more write locks precede it, it only needs to listen on the last preceding write lock.
  2. Write request: it only needs to listen on the immediately preceding node; the Watcher mechanism is the same as for the mutex.
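The watch-target selection in the two steps above can likewise be sketched as plain logic over the sorted children that precede the current node. The `READ-`/`WRITE-` prefixes and node names below are illustrative only, not Curator's actual naming scheme:

```java
import java.util.Arrays;
import java.util.List;

// Sketch of read/write watch selection over the (already sorted) children
// that precede the current node. READ-/WRITE- prefixes are illustrative.
public class RwWatchTarget {

    /** Read request: watch the last preceding WRITE node, or nothing. */
    public static String readWatchTarget(List<String> predecessors) {
        String target = null;
        for (String node : predecessors) {
            if (node.startsWith("WRITE-")) {
                target = node; // remember the last write lock seen
            }
        }
        return target; // null => only reads ahead, read proceeds immediately
    }

    /** Write request: watch the immediately preceding node, like a mutex. */
    public static String writeWatchTarget(List<String> predecessors) {
        return predecessors.isEmpty() ? null : predecessors.get(predecessors.size() - 1);
    }

    public static void main(String[] args) {
        List<String> before = Arrays.asList(
                "READ-0000000001", "WRITE-0000000003", "READ-0000000004");
        System.out.println(readWatchTarget(before));  // WRITE-0000000003
        System.out.println(writeWatchTarget(before)); // READ-0000000004
    }
}
```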

Distributed lock in practice

Zookeeper distributed lock based on Curator

POM dependencies

<dependency>
  <groupId>org.apache.curator</groupId>
  <artifactId>curator-framework</artifactId>
  <version>5.1.0</version>
</dependency>

<dependency>
  <groupId>org.apache.curator</groupId>
  <artifactId>curator-recipes</artifactId>
  <version>5.1.0</version>
</dependency>

Note: Curator 5.x is the line that targets Zookeeper 3.6.x (our environment); the Curator 2.x line only supports Zookeeper 3.4.x.

Mutex application

Because of the "stampede" effect, an unfair lock is not the best choice with Zookeeper. The following example simulates a flash sale using the Zookeeper distributed lock (Curator's InterProcessMutex, which is a fair mutex).

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.recipes.locks.InterProcessMutex;
import org.apache.curator.retry.ExponentialBackoffRetry;

public class MutexTest {

    static ExecutorService executor = Executors.newFixedThreadPool(8);
    static AtomicInteger stock = new AtomicInteger(3);

    public static void main(String[] args) throws InterruptedException {
        CuratorFramework client = getZkClient();
        String key = "/lock/lockId_111/111";
        final InterProcessMutex mutex = new InterProcessMutex(client, key);

        for (int i = 0; i < 99; i++) {
            executor.submit(() -> {
                // Fast-path check: skip lock acquisition when stock is already gone.
                if (stock.get() <= 0) {
                    System.err.println("Out of stock, returning directly.");
                    return;
                }
                try {
                    boolean acquire = mutex.acquire(200, TimeUnit.MILLISECONDS);
                    if (acquire) {
                        int s = stock.decrementAndGet();
                        if (s < 0) {
                            System.err.println("Entered the seckill, but stock is insufficient.");
                        } else {
                            System.out.println("Purchase succeeded, remaining inventory: " + s);
                        }
                    }
                } catch (Exception e) {
                    e.printStackTrace();
                } finally {
                    try {
                        if (mutex.isAcquiredInThisProcess()) {
                            mutex.release();
                        }
                    } catch (Exception e) {
                        e.printStackTrace();
                    }
                }
            });
        }

        // Stop accepting tasks, then wait for the submitted ones to finish.
        executor.shutdown();
        while (!executor.isTerminated()) {
            TimeUnit.MILLISECONDS.sleep(100);
        }
        System.out.println("Remaining inventory after the seckill: " + stock.get());
    }

    private static CuratorFramework getZkClient() {
        String zkServerAddress = "127.0.0.1:2181";
        ExponentialBackoffRetry retryPolicy = new ExponentialBackoffRetry(1000, 3, 5000);
        CuratorFramework zkClient = CuratorFrameworkFactory.builder()
                .connectString(zkServerAddress)
                .sessionTimeoutMs(5000)
                .connectionTimeoutMs(5000)
                .retryPolicy(retryPolicy)
                .build();
        zkClient.start();
        return zkClient;
    }
}

Read/write lock application

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.recipes.locks.InterProcessMutex;
import org.apache.curator.framework.recipes.locks.InterProcessReadWriteLock;
import org.apache.curator.retry.ExponentialBackoffRetry;

public class ReadWriteLockTest {

    static ExecutorService executor = Executors.newFixedThreadPool(8);
    static AtomicInteger stock = new AtomicInteger(3);

    static InterProcessMutex readLock;
    static InterProcessMutex writeLock;

    public static void main(String[] args) throws InterruptedException {
        CuratorFramework client = getZkClient();
        String key = "/lock/lockId_111/1111";
        InterProcessReadWriteLock readWriteLock = new InterProcessReadWriteLock(client, key);
        readLock = readWriteLock.readLock();
        writeLock = readWriteLock.writeLock();

        for (int i = 0; i < 16; i++) {
            executor.submit(() -> {
                // Read phase: check the inventory under the read lock.
                try {
                    boolean read = readLock.acquire(2000, TimeUnit.MILLISECONDS);
                    if (read) {
                        int num = stock.get();
                        System.out.println("Read inventory, current inventory: " + num);
                        if (num < 0) {
                            System.err.println("Out of stock, returning directly.");
                            return;
                        }
                    }
                } catch (Exception e) {
                    e.printStackTrace();
                } finally {
                    if (readLock.isAcquiredInThisProcess()) {
                        try {
                            readLock.release();
                        } catch (Exception e) {
                            e.printStackTrace();
                        }
                    }
                }

                // Write phase: deduct the inventory under the write lock.
                try {
                    boolean acquire = writeLock.acquire(2000, TimeUnit.MILLISECONDS);
                    if (acquire) {
                        int s = stock.get();
                        if (s <= 0) {
                            System.err.println("Entered the seckill, but stock is insufficient.");
                        } else {
                            s = stock.decrementAndGet();
                            System.out.println("Purchase succeeded, remaining inventory: " + s);
                        }
                    }
                } catch (Exception e) {
                    e.printStackTrace();
                } finally {
                    try {
                        if (writeLock.isAcquiredInThisProcess()) {
                            writeLock.release();
                        }
                    } catch (Exception e) {
                        e.printStackTrace();
                    }
                }
            });
        }

        // Stop accepting tasks, then wait for the submitted ones to finish.
        executor.shutdown();
        while (!executor.isTerminated()) {
            TimeUnit.MILLISECONDS.sleep(100);
        }
        System.out.println("Remaining inventory after the seckill: " + stock.get());
    }

    private static CuratorFramework getZkClient() {
        String zkServerAddress = "127.0.0.1:2181";
        ExponentialBackoffRetry retryPolicy = new ExponentialBackoffRetry(1000, 3, 5000);
        CuratorFramework zkClient = CuratorFrameworkFactory.builder()
                .connectString(zkServerAddress)
                .sessionTimeoutMs(5000)
                .connectionTimeoutMs(5000)
                .retryPolicy(retryPolicy)
                .build();
        zkClient.start();
        return zkClient;
    }
}

The output is as follows: first, eight tasks read the inventory (current inventory: 3), then the write locks take turns decrementing it.

Read inventory, current inventory: 3    (printed 8 times)
Purchase succeeded, remaining inventory: 2
Purchase succeeded, remaining inventory: 1
Purchase succeeded, remaining inventory: 0
Entered the seckill, but stock is insufficient    (printed 3 times)
Read inventory, current inventory: 0    (printed 8 times)
Entered the seckill, but stock is insufficient    (printed for the remaining tasks)

Distributed lock selection

The most commonly used distributed locks are Redis-based and Zookeeper-based. In terms of performance, Redis can easily handle tens of thousands of requests per second, so for large-scale, high-concurrency scenarios I recommend the Redis distributed lock. If the concurrency requirements are not very high, Zookeeper is a sound choice for distributed scenarios.

Resources

  • Apache Curator documentation
  • Zookeeper distributed lock implementation (cnblogs)
  • Application and principle analysis of the distributed lock InterProcessMutex (CSDN)