
Implementing distributed locks with MySQL

Distributed locks built on MySQL are relatively rare in real-world development. They are generally used only when performance requirements are modest and you do not want to introduce another component. Their main advantage is that they are easy to understand.

It can be implemented in the following two ways:

  1. Based on table record implementation
  2. Based on MySQL pessimistic lock implementation

Based on table record implementation

Create a table:

CREATE TABLE `mysql_lock` (
	`id` BIGINT NOT NULL AUTO_INCREMENT,
	`resource` int NOT NULL COMMENT 'Locked resources',
	`description` varchar(1024) NOT NULL DEFAULT '' COMMENT 'description',
	PRIMARY KEY (`id`),
	UNIQUE KEY `uiq_idx_resource` (`resource`) 
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COMMENT='Table record based implementation';

The lock operation is to insert a corresponding record into the table:

INSERT INTO mysql_lock (resource, description) VALUES (1, 'Application Resource: 1');

The unlock operation is to delete the corresponding record from the table:

DELETE FROM mysql_lock WHERE resource = 1;

Implementation principle:

We created the table with a unique constraint on resource, so the same resource cannot be inserted twice, which gives mutual exclusion. The previous section explained in detail the features a distributed lock should have; the example above implements none of the others. Some specific optimizations:

  1. Timeouts: write a program that periodically cleans up expired lock records.
  2. Reentrancy and exclusivity: add a field that records the ID of the thread holding the lock. The same thread is then allowed to acquire the lock again, and the thread ID is verified each time the record is deleted, so that a lock can only be released by its holder.
  3. MySQL reliability: configure primary/standby replication or a cluster to prevent a single point of failure.
  4. Blocking: acquiring the lock does not block. Each attempt is a single insert, and a failed insert is not retried, so any waiting or retrying has to be implemented in your own code.
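
The retry logic from the last point can be sketched as a small helper. This is only an illustration, not part of the original example: the class name LockRetry and its parameters are hypothetical, and the attempt argument stands in for whatever code runs the INSERT into mysql_lock and reports whether the row was inserted.

```java
import java.util.function.BooleanSupplier;

/** Hypothetical helper: retries a non-blocking lock attempt until it succeeds or a deadline passes. */
final class LockRetry {

    private LockRetry() {
    }

    /**
     * @param attempt    one non-blocking try (for example the INSERT into mysql_lock);
     *                   returns true when the lock row was inserted
     * @param timeoutMs  total time to keep trying, in milliseconds
     * @param intervalMs pause between two attempts, in milliseconds
     * @return true if an attempt succeeded before the deadline
     */
    static boolean acquire(BooleanSupplier attempt, long timeoutMs, long intervalMs) {
        long deadline = System.nanoTime() + timeoutMs * 1_000_000L;
        while (true) {
            if (attempt.getAsBoolean()) {
                return true;
            }
            if (System.nanoTime() - deadline >= 0) {
                // Deadline reached without getting the lock
                return false;
            }
            try {
                Thread.sleep(intervalMs);
            } catch (InterruptedException e) {
                // Restore the interrupt flag and give up
                Thread.currentThread().interrupt();
                return false;
            }
        }
    }
}
```

A caller would pass a lambda that executes the insert and returns false when the unique constraint rejects it, for example LockRetry.acquire(() -> tryInsert(1), 3000, 50), where tryInsert is an assumed method of your own.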

MySQL pessimistic lock implementation

To improve the efficiency of the distributed lock, you can use a query with the FOR UPDATE keyword to place a pessimistic lock on the queried record. Until the transaction ends, no other transaction can modify or lock that record, which protects the shared resource.

Points to note when using row locks

  1. MySQL commits transactions automatically by default, so you need to turn this off manually: SET AUTOCOMMIT = 0;
  2. Row locks are taken on index records. If the query cannot use an index, MySQL scans the whole table and locks every row it reads, which in effect behaves like a table lock.

Let’s continue with the table above to illustrate:

  1. Lock acquisition: SELECT * FROM mysql_lock WHERE id = 1 FOR UPDATE; If the query returns, the lock has been acquired. If another transaction holds the lock, the query blocks. The blocking timeout is configured with MySQL's innodb_lock_wait_timeout. Note that the query condition WHERE id = 1 uses an index.
  2. Lock release: COMMIT; Once the transaction is committed, other threads can access the record.
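
The two steps above could look roughly like this over plain JDBC. It is a minimal sketch under assumptions, not the author's code: the class PessimisticLockSketch and the method runLocked are hypothetical names, the Connection is assumed to point at the MySQL database holding mysql_lock, and the lock row is assumed to exist already.

```java
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;

/** Hypothetical sketch of the SELECT ... FOR UPDATE / COMMIT cycle described above. */
final class PessimisticLockSketch {

    static final String LOCK_SQL = "SELECT * FROM mysql_lock WHERE id = ? FOR UPDATE";

    private PessimisticLockSketch() {
    }

    /**
     * Runs criticalSection while holding the row lock on the given id.
     *
     * @return false if the lock row does not exist (nothing was locked or run)
     */
    static boolean runLocked(Connection conn, long id, Runnable criticalSection) throws SQLException {
        boolean previousAutoCommit = conn.getAutoCommit();
        // Same effect as SET AUTOCOMMIT = 0 for this connection
        conn.setAutoCommit(false);
        try (PreparedStatement ps = conn.prepareStatement(LOCK_SQL)) {
            ps.setLong(1, id);
            boolean rowFound;
            // Blocks here while another transaction holds the row lock
            try (ResultSet rs = ps.executeQuery()) {
                rowFound = rs.next();
            }
            if (rowFound) {
                criticalSection.run();
            }
            // COMMIT ends the transaction and releases the row lock
            conn.commit();
            return rowFound;
        } catch (SQLException | RuntimeException e) {
            // Roll back so the lock is not held after a failure
            conn.rollback();
            throw e;
        } finally {
            conn.setAutoCommit(previousAutoCommit);
        }
    }
}
```

If the wait exceeds innodb_lock_wait_timeout, the query fails with an SQLException, which this sketch rolls back and rethrows to the caller.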

Note: these are only two ideas for implementing a distributed lock with MySQL, and they are not recommended for real-world development. If you do use one, the second is the better choice. As mentioned above, most of the features a distributed lock should have must be implemented by hand when a database is used, which is not very developer-friendly.

Implementing distributed locks with ZooKeeper

Before implementing distributed locks using ZooKeeper, let’s take a look at some of the basics:

ZK node types

  1. Persistent node: the node remains after the client disconnects
  2. Persistent sequential node: a persistent node whose name also carries an ordered sequence number
  3. Ephemeral (temporary) node: the node is deleted when the client's session ends, for example after it disconnects
  4. Ephemeral sequential node: an ephemeral node whose name also carries an ordered sequence number

The idea behind a ZK distributed lock

  1. Node names under the same parent are unique in ZK, which provides the mutual exclusion of the lock.
  2. ZK ephemeral nodes prevent the situation where the thread holding the lock goes down unexpectedly without releasing it, so that no other thread can ever acquire the lock.
  3. ZK watcher events on nodes make it easy to notify other threads that they can compete for the lock.
  4. ZK sequential nodes make a fair lock possible: blocked threads are woken in the order in which they requested the lock, which prevents the herd effect.

Herd effect: under high concurrency only one thread gets the lock and many threads are blocked. When the thread holding the lock releases it, all the waiting threads compete for the lock at once, which can momentarily put a heavy load on the server. This is what we call the herd effect.
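
The ordering rule behind the fair lock can be shown without a ZooKeeper server. The following is a minimal sketch: the class SequentialNodeOrder and the method nodeToWatch are hypothetical names, and the child names are made-up examples. It relies on the fact that ZooKeeper pads sequence numbers to a fixed width of ten digits, so sorting the names as strings matches creation order.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

/** Hypothetical sketch: decide whether a sequential node holds the lock or which node it should watch. */
final class SequentialNodeOrder {

    private SequentialNodeOrder() {
    }

    /**
     * @param children names of all child nodes under the lock root, in any order
     * @param self     name of the node created by the current thread
     * @return null if self is the smallest node (it holds the lock),
     *         otherwise the name of the node directly in front of it
     */
    static String nodeToWatch(List<String> children, String self) {
        List<String> sorted = new ArrayList<>(children);
        Collections.sort(sorted);
        int index = sorted.indexOf(self);
        if (index < 0) {
            throw new IllegalArgumentException("Unknown node: " + self);
        }
        // Each waiter watches only its direct predecessor, so one release wakes one waiter
        return index == 0 ? null : sorted.get(index - 1);
    }
}
```

Because every waiter watches a different node, releasing the lock notifies a single thread instead of all of them, which is how the herd effect is avoided.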

Custom ZK distributed lock

Code examples:

package com.aha.lock.zk;

import lombok.extern.slf4j.Slf4j;
import org.I0Itec.zkclient.IZkDataListener;
import org.I0Itec.zkclient.ZkClient;
import org.springframework.util.StringUtils;

import java.util.Collections;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;

/**
 * Implement distributed locking using ZK.
 * Implements Lock, following the specification provided by JUC.
 *
 * @author: WT
 * @date: 2021/11/23 16:41
 */
@Slf4j
public class ZkDistributedLock implements Lock {

    /**
     * Coordinates thread execution timing.
     * Note: once countDown() has brought the count to zero, calling await() again no longer blocks.
     * A thread-isolated variable cannot be used here, otherwise threads could not notify each other:
     * private ThreadLocal<CountDownLatch> countDownLatch = ThreadLocal.withInitial(() -> new CountDownLatch(1));
     */
    private CountDownLatch countDownLatch = new CountDownLatch(1);

    private static final String IP_PORT = "10.211.55.3:2181";

    /** * Root node path */
    private static final String ROOT_NODE = "/LOCK";

    /** * The path of the front node of the current node */
    private ThreadLocal<String> beforeNodePath = new ThreadLocal<>();

    /** * Current node path */
    private ThreadLocal<String> nodePath = new ThreadLocal<>();

    private ZkClient zkClient = new ZkClient(IP_PORT);

    public ZkDistributedLock() {

        // Initialize the path of zK when creating a distributed lock object
        if (!zkClient.exists(ROOT_NODE)) {
            zkClient.createPersistent(ROOT_NODE);
        }

    }

    /** Lock method */
    @Override
    public void lock() {
        if (tryLock()) {
            log.info("{} lock succeeded", Thread.currentThread().getName());
            return;
        }
        // block - wait for the next time to lock
        waitForLock();
        // Try locking again
        lock();
    }

    /** * block - waiting for the next time to lock */
    private void waitForLock() {

        // Listen for deletion events on the front node with an anonymous inner class
        IZkDataListener zkDataListener = new IZkDataListener() {

            @Override
            public void handleDataChange(String dataPath, Object data) throws Exception {
            }

            /**
             * Listen for the deletion event of the node
             * @param dataPath node path
             * @throws Exception exception
             */
            @Override
            public void handleDataDeleted(String dataPath) throws Exception {
                log.info("{} front node deleted", dataPath);
                // TODO: 2021/11/24 countDown wakes up all waiting threads, not just the thread of the next node
                countDownLatch.countDown();
            }
        };

        // Subscribe to the deletion event of the front node
        zkClient.subscribeDataChanges(beforeNodePath.get(), zkDataListener);
        // After subscribing, check whether the front node has already been deleted
        if (zkClient.exists(beforeNodePath.get())) {
            // The front node still exists: block the thread until the front node is removed
            try {
                countDownLatch.await();
                log.info("Block thread: {}, nodePath: {}", Thread.currentThread().getName(), nodePath.get());
            } catch (InterruptedException e) {
                log.info("Failed to block {} thread, interrupt this thread", Thread.currentThread().getName());
                e.printStackTrace();
                Thread.currentThread().interrupt();
            }
        }
        // Reset countDownLatch; the front node has been removed, so unsubscribe from the event
        countDownLatch = new CountDownLatch(1);
        zkClient.unsubscribeDataChanges(beforeNodePath.get(), zkDataListener);

    }

    @Override
    public void lockInterruptibly() throws InterruptedException {
    }

    /**
     * Try to lock
     * @return boolean: whether the lock was acquired
     */
    @Override
    public boolean tryLock() {

        // 1. Check whether nodePath is empty. If it is, this thread is applying for the lock for the first time and an ephemeral sequential node must be created in ZK
        if (!StringUtils.hasText(nodePath.get())) {
            nodePath.set(zkClient.createEphemeralSequential(ROOT_NODE + "/", "lock"));
            // log.info("ZkDistributedLock applies for the first time, zk needs to create a temporary node: {}", nodePath);
            log.info("NodePath empty, create temporary node: {}", nodePath.get());
        }

        // 2. Get all the children of the root node
        List<String> childrenNodeList = zkClient.getChildren(ROOT_NODE);

        // 3. Sort the child node list
        Collections.sort(childrenNodeList);

        log.info("nodePath: {}, nodeList:{}", nodePath.get(), childrenNodeList);

        // 4. If the node of the current thread is the smallest node, the lock is acquired; otherwise find the node in front of it
        if (nodePath.get().equals(ROOT_NODE + "/" + childrenNodeList.get(0))) {
            log.info("Thread name: {}, nodePath: {} is the smallest node, obtained the lock successfully.", Thread.currentThread().getName(), nodePath.get());
            return true;
        } else {
            // Find the position of the current node in the sorted list in order to get the node in front of it
            int i = Collections.binarySearch(childrenNodeList, nodePath.get().substring(ROOT_NODE.length() + 1));
            // Get the path of the front node
            beforeNodePath.set(ROOT_NODE + "/" + childrenNodeList.get(i - 1));
            log.info("The node before {} is: {}", nodePath.get(), beforeNodePath.get());
        }

        return false;
    }

    @Override
    public boolean tryLock(long time, TimeUnit unit) throws InterruptedException {
        return false;
    }

    @Override
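    public void unlock() {
        zkClient.delete(nodePath.get());
        // Clear the thread-local paths so that the next lock() on this thread creates a new node
        nodePath.remove();
        beforeNodePath.remove();
    }

    @Override
    public Condition newCondition() {
        return null;
    }
}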

Code parsing:

  1. implements Lock: Lock follows the specification provided by JUC.
  2. zkClient.createEphemeralSequential() creates ephemeral sequential nodes, which avoids the herd effect and the deadlock caused by the thread that owns the lock going down unexpectedly.
  3. Blocking uses CountDownLatch. When one thread has grabbed the lock, the other threads block in CountDownLatch's await method. When the front node of a blocked thread is removed, that thread should be woken up; because the sequential nodes are ordered, only the thread of the next node should be woken. The wake-up is implemented with zkClient.subscribeDataChanges(beforeNodePath.get(), zkDataListener); after the listener detects that the front node has been deleted, it calls CountDownLatch's countDown method, and when the count reaches zero the waiting thread is woken.
  4. nodePath and beforeNodePath should be thread-private, so that each thread keeps track of its own nodePath and beforeNodePath.
  5. Refer to the comments in the code for implementation details.

Problems with step 3:

  1. When countDown() brings the count to zero, all waiting threads are woken up. Ideally only the thread of the next node should be woken.
  2. After countDown(), the latch object has to be created again with new, otherwise the await method cannot block the thread again.
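
One possible way around both problems is to give every waiter its own latch instead of sharing one field. The sketch below only illustrates that idea and is not the implementation used above: PerWaiterLatch, DeletionSource and awaitDeletion are hypothetical names, and DeletionSource is a stand-in for a ZK client's node-deleted subscription.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

/** Hypothetical sketch: each waiting thread blocks on a latch that only its own callback can release. */
final class PerWaiterLatch {

    /** Stand-in for a ZK client's "node deleted" subscription (an assumption, not a real API). */
    interface DeletionSource {
        void onDeleted(String path, Runnable callback);
    }

    private PerWaiterLatch() {
    }

    /**
     * Blocks until the node at path is reported as deleted or the timeout expires.
     *
     * @return true if the deletion was observed, false on timeout or interruption
     */
    static boolean awaitDeletion(DeletionSource source, String path, long timeoutMs) {
        // Local variable: a fresh latch per call, so nothing needs to be reset afterwards
        CountDownLatch latch = new CountDownLatch(1);
        // Only the callback registered for this path counts this latch down
        source.onDeleted(path, latch::countDown);
        try {
            return latch.await(timeoutMs, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }
}
```

Because the latch is created per call and captured by that call's callback, the deletion of one node wakes only the thread watching it, and no shared latch has to be recreated.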

Test custom ZK distributed lock:

package com.aha.lock.zk;

import lombok.extern.slf4j.Slf4j;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;

/**
 * @author: WT
 * @date: 2021/11/23
 */
@RestController
@Slf4j
public class ZkDistributeLockTest {

    static int inventory = 10;
    private static final int NUM = 10;

    private final ZkDistributedLock zkDistributedLock = new ZkDistributedLock();

    @GetMapping("/zk/lock")
    public void zkLockTest() {
        try {
            for (int i = 0; i < NUM; i++) {
                new Thread(() -> {
                    try {
                        zkDistributedLock.lock();
                        Thread.sleep(200);
                        if (inventory > 0) {
                            inventory--;
                        }
                        log.warn("Inventory after deduction is: {}", inventory);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                        Thread.currentThread().interrupt();
                    } finally {
                        zkDistributedLock.unlock();
                    }
                }).start();
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}

ZK distributed locks using Curator

Several distributed locking schemes provided by Curator

  1. InterProcessMutex: distributed reentrant exclusive lock
  2. InterProcessSemaphoreMutex: Distributed exclusive lock
  3. InterProcessReadWriteLock: Distributed read/write lock

InterProcessMutex example

Configure the curatorFramework client

zookeeper:
  address: 10.211.55.3:2181   # Use commas to separate the ZooKeeper server addresses if there are several, such as ip1:port1,ip2:port2,ip3:port3
  retryCount: 5               # Number of retries
  initElapsedTimeMs: 1000     # Initial retry interval
  maxElapsedTimeMs: 5000      # Maximum retry interval
  sessionTimeoutMs: 30000     # Session timeout
  connectionTimeoutMs: 10000  # Connection timeout

The ZooKeeper configuration properties class:

package com.aha.config;

import lombok.Data;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.context.annotation.Configuration;

/**
 * ZooKeeper connection configuration class
 *
 * @author: WT
 * @date: 2021/11/22
 */
@Data
@Configuration
@ConfigurationProperties(prefix = "zookeeper")
public class ZkClientProperties {

    /** Number of retries */
    private int retryCount;

    /** Initial retry interval */
    private int initElapsedTimeMs;

    /** Maximum retry interval */
    private int maxElapsedTimeMs;

    /** Connection address */
    private String address;

    /**Session expiration time */
    private int sessionTimeoutMs;

    /** Connection timeout time */
    private int connectionTimeoutMs;

}

The configuration class that creates the Curator client:

package com.aha.client;

import com.aha.config.ZkClientProperties;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

/**
 * Generate the ZK client
 *
 * @author: WT
 * @date: 2021/11/22 18:18
 */
@Configuration
public class ZookeeperClient {

    /**
     * initMethod = "start":
     * after the CuratorFramework object is created, call the start method of the CuratorFramework instance
     */
//    @Bean(initMethod = "start")
//    public CuratorFramework curatorFramework(ZkClientProperties zookeeperProperties) {
//        return CuratorFrameworkFactory.newClient(
//                zookeeperProperties.getAddress(),
//                zookeeperProperties.getSessionTimeoutMs(),
//                zookeeperProperties.getConnectionTimeoutMs(),
//                new RetryNTimes(zookeeperProperties.getRetryCount(), zookeeperProperties.getInitElapsedTimeMs())
//        );
//    }

    @Bean(initMethod = "start")
    private static CuratorFramework getZkClient(ZkClientProperties zookeeperProperties) {
        // Exponential backoff between retries, using the intervals from the configuration (1000 ms and 5000 ms above)
        ExponentialBackoffRetry retryPolicy = new ExponentialBackoffRetry(
                zookeeperProperties.getInitElapsedTimeMs(),
                zookeeperProperties.getRetryCount(),
                zookeeperProperties.getMaxElapsedTimeMs());
        return CuratorFrameworkFactory.builder()
                .connectString(zookeeperProperties.getAddress())
                .sessionTimeoutMs(zookeeperProperties.getSessionTimeoutMs())
                .connectionTimeoutMs(zookeeperProperties.getConnectionTimeoutMs())
                .retryPolicy(retryPolicy)
                .build();
    }
}

Simulate 50 threads scrambling for locks:

package com.aha.lock.service;

import lombok.extern.slf4j.Slf4j;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.recipes.locks.InterProcessMutex;
import org.springframework.stereotype.Service;

/**
 * @author: WT
 * @date: 2021/11/22 17:41
 */
@Slf4j
@Service
public class InterprocessMutexLock {

    private final CuratorFramework curatorFramework;

    public InterprocessMutexLock (CuratorFramework curatorFramework) {
        this.curatorFramework = curatorFramework;
    }

    public void test(String lockPath)  {

        InterProcessMutex lock = new InterProcessMutex(curatorFramework, lockPath);
        // Simulate 50 thread snatch lock
        for (int i = 0; i < 50; i++) {
            new Thread(new TestThread(i, lock)).start();
        }
    }

    static class TestThread implements Runnable {

        private final Integer threadFlag;
        private final InterProcessMutex lock;

        public TestThread(Integer threadFlag, InterProcessMutex lock) {
            this.threadFlag = threadFlag;
            this.lock = lock;
        }

        @Override
        public void run() {
            try {
                lock.acquire();
                log.info("{} thread acquired the lock", threadFlag);
                // Wait 1 second to release the lock
                Thread.sleep(1000);
            } catch (Exception e) {
                e.printStackTrace();
            }finally {
                try {
                    lock.release();
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
        }
    }

}

The controller that triggers the test:

package com.aha.lock.controller;

import com.aha.lock.service.InterprocessMutexLock;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;

/**
 * @author: WT
 * @date: 2021/11/23 14:13
 */
@RestController
public class TestController {

    private final InterprocessMutexLock interprocessMutexLock;

    public TestController (InterprocessMutexLock interprocessMutexLock) {
        this.interprocessMutexLock = interprocessMutexLock;
    }

    @GetMapping("/lock/mutex")
    public void testMutexLock() {
        interprocessMutexLock.test("/lock/mutex");
    }
}