MySQL implements distributed locking
In practice, MySQL is rarely used to implement distributed locks; it is generally chosen only when performance requirements are modest and you do not want to introduce additional components. Its greatest advantage is that it is easy to understand.
It can be implemented in the following ways:
- Implementation based on table records
- Implementation based on MySQL pessimistic locks
Implementation based on table records
Create a table:
CREATE TABLE `mysql_lock` (
`id` BIGINT NOT NULL AUTO_INCREMENT,
`resource` int NOT NULL COMMENT 'Locked resources',
`description` varchar(1024) NOT NULL DEFAULT '' COMMENT 'description',
PRIMARY KEY (`id`),
UNIQUE KEY `uiq_idx_resource` (`resource`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COMMENT='Table record based implementation';
The lock operation is to insert a corresponding record into the table:
INSERT INTO mysql_lock (resource, description) VALUES (1, 'Apply for resource: 1');
The unlock operation is to delete the record that was inserted:
DELETE FROM mysql_lock WHERE resource = 1;
Implementation principle:
We created the table with a unique constraint on resource, so the same resource cannot be inserted twice; this gives us mutual exclusion. In the previous section we described in detail the features a distributed lock should have; the example above does not implement the other features. Here are some concrete ways to improve it:
- For timeouts: Write a program that periodically cleans up expired resources
- For reentrancy and exclusivity: add a column that records the identity of the thread holding the lock. The same thread can then be allowed to acquire the lock again, and the recorded identity is verified before the record is deleted, so the lock can only be released by its holder.
- For MySQL reliability: configure an active/standby setup or a cluster to avoid a single point of failure.
- One more caveat: each attempt to acquire the lock is a single insert rather than a blocking wait, and a failed insert is not retried automatically, so retry logic has to be implemented in your application code (a minimal sketch follows this list).
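A minimal sketch of this approach in Java with plain JDBC, run against the mysql_lock table above. The DataSource wiring, the method names, and the 100 ms retry sleep are illustrative assumptions, not part of the original example:
package com.aha.lock.mysql;

import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.SQLException;
import javax.sql.DataSource;

/**
 * Sketch only: lock = insert a row, unlock = delete it.
 * The unique key on `resource` provides the mutual exclusion.
 */
public class MysqlTableLock {

    private final DataSource dataSource;

    public MysqlTableLock(DataSource dataSource) {
        this.dataSource = dataSource;
    }

    /** One attempt: true if the row was inserted, false if another client already holds the lock. */
    public boolean tryLock(int resource, String description) {
        String sql = "INSERT INTO mysql_lock (resource, description) VALUES (?, ?)";
        try (Connection conn = dataSource.getConnection();
             PreparedStatement ps = conn.prepareStatement(sql)) {
            ps.setInt(1, resource);
            ps.setString(2, description);
            return ps.executeUpdate() == 1;
        } catch (SQLException e) {
            // A duplicate-key error means someone else holds the lock
            return false;
        }
    }

    /** The retry loop mentioned in the last bullet: the insert itself never blocks, so we spin. */
    public void lock(int resource, String description) throws InterruptedException {
        while (!tryLock(resource, description)) {
            Thread.sleep(100);
        }
    }

    /** Unlock by deleting the record that was inserted. */
    public void unlock(int resource) {
        String sql = "DELETE FROM mysql_lock WHERE resource = ?";
        try (Connection conn = dataSource.getConnection();
             PreparedStatement ps = conn.prepareStatement(sql)) {
            ps.setInt(1, resource);
            ps.executeUpdate();
        } catch (SQLException e) {
            throw new IllegalStateException("Failed to release lock for resource " + resource, e);
        }
    }
}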
MySQL pessimistic lock implementation
You can also use a query statement with the FOR UPDATE keyword to place a pessimistic lock on the queried record, so that no other transaction can operate on that record, thereby protecting the shared resource.
Points to note when using row locks
- MySQL commits transactions automatically by default, so disable autocommit manually first: SET AUTOCOMMIT = 0;
- Row locks are based on indexes. If the query condition does not hit an index, the row lock is upgraded to a table lock and the query becomes a full table scan.
Let’s continue with the table above to illustrate:
- Lock application operation:
SELECT * FROM mysql_lock WHERE id = 1 FOR UPDATE;
If the record can be queried, the lock has been acquired; if it cannot, the statement blocks. The blocking timeout can be configured through MySQL's innodb_lock_wait_timeout variable. Note that the query condition WHERE id = 1 hits an index.
- Lock release operation:
COMMIT;
This record can be accessed by other threads after the transaction is committed.
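As a rough illustration, the same acquire/work/commit flow could look like this in Java with JDBC; the runWithLock method, the doBusiness callback, and the connection handling are assumptions made for this sketch:
package com.aha.lock.mysql;

import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import javax.sql.DataSource;

/**
 * Sketch only: SELECT ... FOR UPDATE takes the row lock, COMMIT releases it.
 */
public class MysqlForUpdateLock {

    private final DataSource dataSource;

    public MysqlForUpdateLock(DataSource dataSource) {
        this.dataSource = dataSource;
    }

    public void runWithLock(long id, Runnable doBusiness) throws SQLException {
        try (Connection conn = dataSource.getConnection()) {
            // Equivalent of SET AUTOCOMMIT = 0: keep the transaction (and the row lock) open
            conn.setAutoCommit(false);
            try (PreparedStatement ps = conn.prepareStatement(
                    "SELECT * FROM mysql_lock WHERE id = ? FOR UPDATE")) {
                ps.setLong(1, id);
                // Blocks here if another transaction holds the row lock,
                // for at most innodb_lock_wait_timeout seconds
                try (ResultSet rs = ps.executeQuery()) {
                    if (rs.next()) {
                        doBusiness.run();
                    }
                }
                // COMMIT ends the transaction and releases the row lock
                conn.commit();
            } catch (SQLException | RuntimeException e) {
                conn.rollback();
                throw e;
            }
        }
    }
}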
Note: the above only sketches two ideas for implementing a distributed lock with MySQL; neither is recommended for real development. If you do use them, many of the features a distributed lock should provide (as discussed earlier) have to be implemented manually by the developer, which is not very friendly.
Zookeeper implements distributed locks
Before implementing distributed locks using ZooKeeper, let’s take a look at some of the basics:
ZK node types
- Persistent node: The node remains when the client disconnects
- Persistent sequential nodes: Order is guaranteed on the basis of persistent nodes
- Temporary node: The node is deleted when the client disconnects
- Temporary sequential nodes: Order is guaranteed on the basis of temporary nodes
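The four node types above map directly onto ZkClient calls; a small sketch (the paths and data are made up for illustration):
package com.aha.lock.zk;

import org.I0Itec.zkclient.ZkClient;

public class ZkNodeTypesDemo {

    public static void main(String[] args) {
        ZkClient zkClient = new ZkClient("10.211.55.3:2181");

        // Persistent node: survives client disconnects
        zkClient.createPersistent("/demo-persistent");
        // Persistent sequential node: ZK appends a monotonically increasing suffix to the name
        zkClient.createPersistentSequential("/demo-persistent-seq-", "data");
        // Temporary (ephemeral) node: removed automatically when the session ends
        zkClient.createEphemeral("/demo-ephemeral");
        // Temporary sequential node: ordered AND tied to the session - the basis of the lock below
        zkClient.createEphemeralSequential("/demo-ephemeral-seq-", "data");

        zkClient.close();
    }
}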
The idea behind ZK distributed locks
- Use the fact that node names under the same ZK parent are unique to achieve mutual exclusion of the lock.
- Use ZK's temporary (ephemeral) nodes so that if the thread holding the lock goes down before releasing it, the lock is not held forever and other threads can still acquire it.
- Use ZK's node watcher events to conveniently notify the other threads contending for the lock.
- Use ZK's sequential nodes to implement a fair lock: blocked threads are woken in the order in which they requested the lock, which prevents the herd effect.
Herd effect: under high concurrency only one thread gets the lock while many others block. When the holder releases the lock, all the waiting threads scramble for it at once, which can suddenly put far too many runnable threads on the server. This is what we call the herd effect.
Custom ZK distributed lock
Code examples:
package com.aha.lock.zk;

import lombok.extern.slf4j.Slf4j;
import org.I0Itec.zkclient.IZkDataListener;
import org.I0Itec.zkclient.ZkClient;
import org.springframework.util.StringUtils;

import java.util.Collections;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;

/**
 * Implement a distributed lock with ZK, following the Lock interface provided by JUC.
 *
 * @author: WT
 * @date: 2021/11/23 16:41
 */
@Slf4j
public class ZkDistributedLock implements Lock {

    /**
     * Coordinates thread execution order. Note: once countDown() has fired, await() can no longer
     * block, so the latch is re-created after each use. It cannot be a thread-isolated variable,
     * otherwise threads could not notify each other:
     * private ThreadLocal<CountDownLatch> countDownLatch = ThreadLocal.withInitial(() -> new CountDownLatch(1));
     */
    private CountDownLatch countDownLatch = new CountDownLatch(1);

    private static final String IP_PORT = "10.211.55.3:2181";

    /** Root node path */
    private static final String ROOT_NODE = "/LOCK";

    /** Path of the node immediately in front of the current node */
    private ThreadLocal<String> beforeNodePath = new ThreadLocal<>();

    /** Current node path */
    private ThreadLocal<String> nodePath = new ThreadLocal<>();

    private ZkClient zkClient = new ZkClient(IP_PORT);

    public ZkDistributedLock() {
        // Initialize the ZK root path when the distributed lock object is created
        if (!zkClient.exists(ROOT_NODE)) {
            zkClient.createPersistent(ROOT_NODE);
        }
    }

    /** Lock method */
    @Override
    public void lock() {
        if (tryLock()) {
            log.info("{} lock succeeded", Thread.currentThread().getName());
            return;
        }
        // Block - wait for the next chance to lock
        waitForLock();
        // Try locking again
        lock();
    }

    /** Block - waiting for the next chance to lock */
    private void waitForLock() {
        // Listen for the delete event of the front node - listener as an anonymous inner class
        IZkDataListener zkDataListener = new IZkDataListener() {
            @Override
            public void handleDataChange(String dataPath, Object data) throws Exception {
            }

            /**
             * Listen for the delete event of the node
             * @param dataPath node path
             * @throws Exception exception
             */
            @Override
            public void handleDataDeleted(String dataPath) throws Exception {
                log.info("{} front node deleted", dataPath);
                // TODO: 2021/11/24 countDown wakes up all waiting threads, not just the node right behind the deleted one
                countDownLatch.countDown();
            }
        };

        // Subscribe to the delete event of the front node
        zkClient.subscribeDataChanges(beforeNodePath.get(), zkDataListener);

        // Check whether the front node still exists before waiting
        if (zkClient.exists(beforeNodePath.get())) {
            // The front node still exists - block this thread until it is removed
            try {
                countDownLatch.await();
                log.info("Blocked thread: {}, nodePath: {}", Thread.currentThread().getName(), nodePath.get());
            } catch (InterruptedException e) {
                log.info("Failed to block {} thread, interrupting it", Thread.currentThread().getName());
                e.printStackTrace();
                Thread.currentThread().interrupt();
            }
        }

        // Reset countDownLatch; the front node has been removed, so unsubscribe from its events
        countDownLatch = new CountDownLatch(1);
        zkClient.unsubscribeDataChanges(beforeNodePath.get(), zkDataListener);
    }

    @Override
    public void lockInterruptibly() throws InterruptedException {
    }

    /**
     * Try to lock
     * @return boolean: whether the lock was acquired
     */
    @Override
    public boolean tryLock() {
        // 1. If nodePath is empty, this thread is applying for the lock for the first time, so create an ephemeral sequential node
        if (!StringUtils.hasText(nodePath.get())) {
            nodePath.set(zkClient.createEphemeralSequential(ROOT_NODE + "/", "lock"));
            // log.info("ZkDistributedLock applies for the first time, zk needs to create a temporary node: {}", nodePath);
            log.info("nodePath empty, create temporary node: {}", nodePath.get());
        }

        // 2. Get all the children of the root node
        List<String> childrenNodeList = zkClient.getChildren(ROOT_NODE);

        // 3. Sort the child node list
        Collections.sort(childrenNodeList);
        log.info("nodePath: {}, nodeList: {}", nodePath.get(), childrenNodeList);

        // 4. If the current node is the smallest node, the lock is acquired successfully
        if (nodePath.get().equals(ROOT_NODE + "/" + childrenNodeList.get(0))) {
            log.info("Thread name: {}, nodePath: {} is the smallest node, obtained the lock successfully.",
                    Thread.currentThread().getName(), nodePath.get());
            return true;
        } else {
            // Find the position of the current node in the sorted list to get the node just in front of it
            int i = Collections.binarySearch(childrenNodeList, nodePath.get().substring(ROOT_NODE.length() + 1));
            // Record the path of the front node
            beforeNodePath.set(ROOT_NODE + "/" + childrenNodeList.get(i - 1));
            log.info("The node before {} is: {}", nodePath.get(), beforeNodePath.get());
        }
        return false;
    }

    @Override
    public boolean tryLock(long time, TimeUnit unit) throws InterruptedException {
        return false;
    }

    @Override
    public void unlock() {
        zkClient.delete(nodePath.get());
    }

    @Override
    public Condition newCondition() {
        return null;
    }
}
Code parsing:
- implements Lock: the lock follows the Lock specification provided by JUC.
- zkClient.createEphemeralSequential() creates temporary sequential nodes, which avoids the herd effect and avoids deadlocks caused by the lock-holding thread going down unexpectedly.
- Blocking is handled with a CountDownLatch. While one thread holds the lock, the other threads block on the latch's await method. When the node in front of a blocked thread's node is removed, only that thread should be woken, because the sequential nodes are ordered. The wake-up works through zkClient.subscribeDataChanges(beforeNodePath.get(), zkDataListener): after the listener detects that the front node has been deleted, it calls the latch's countDown method, and when countDown() brings the count to zero the waiting thread wakes up.
- nodePath and beforeNodePath should be thread-private, so that each thread keeps track of its own nodePath and beforeNodePath.
- Refer to the comments in the code for the remaining implementation details.
Problems with the CountDownLatch approach in point 3:
- When countDown() brings the count to zero it wakes up all waiting threads; ideally only the next node in line should be woken (a possible fix is sketched after this list).
- After countDown() has fired, the latch has to be re-created with new, otherwise its await method can no longer block a thread.
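One possible way to address both problems, sketched under the assumption that each waiting thread can simply capture its own latch in the listener: create a fresh CountDownLatch per waitForLock() call, so only the thread watching the deleted front node is woken and no shared latch needs to be reset:
// Sketch: a revised waitForLock() for the ZkDistributedLock class above.
// Each call creates its own latch, so the watcher registered on this thread's
// front node wakes only this thread, and no shared latch has to be reset.
private void waitForLock() {
    final CountDownLatch latch = new CountDownLatch(1);

    IZkDataListener zkDataListener = new IZkDataListener() {
        @Override
        public void handleDataChange(String dataPath, Object data) {
        }

        @Override
        public void handleDataDeleted(String dataPath) {
            // Only the latch captured by this listener is released,
            // so only the next node in line is woken up
            latch.countDown();
        }
    };

    zkClient.subscribeDataChanges(beforeNodePath.get(), zkDataListener);
    try {
        // Re-check after subscribing, in case the front node was already gone
        if (zkClient.exists(beforeNodePath.get())) {
            latch.await();
        }
    } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
    } finally {
        zkClient.unsubscribeDataChanges(beforeNodePath.get(), zkDataListener);
    }
}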
Test custom ZK distributed lock:
package com.aha.lock.zk;

import lombok.extern.slf4j.Slf4j;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;

/**
 * @author: WT
 * @date: 2021/11/23
 */
@RestController
@Slf4j
public class ZkDistributeLockTest {

    static int inventory = 10;
    private static final int NUM = 10;

    private final ZkDistributedLock zkDistributedLock = new ZkDistributedLock();

    @GetMapping("/zk/lock")
    public void zkLockTest() {
        try {
            for (int i = 0; i < NUM; i++) {
                new Thread(() -> {
                    try {
                        zkDistributedLock.lock();
                        Thread.sleep(200);
                        if (inventory > 0) {
                            inventory--;
                        }
                        log.warn("Inventory after deduction is: {}", inventory);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                        Thread.currentThread().interrupt();
                    } finally {
                        zkDistributedLock.unlock();
                    }
                }).start();
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}
ZK distributed locks using Curator
Several distributed locking schemes provided by Curator
- InterProcessMutex: distributed reentrant exclusive lock
- InterProcessSemaphoreMutex: distributed exclusive lock
- InterProcessReadWriteLock: distributed read/write lock
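For orientation, a rough sketch of how these three primitives are used; the lock paths are made up and error handling is omitted (the full InterProcessMutex example follows below):
package com.aha.lock.curator;

import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.recipes.locks.InterProcessMutex;
import org.apache.curator.framework.recipes.locks.InterProcessReadWriteLock;
import org.apache.curator.framework.recipes.locks.InterProcessSemaphoreMutex;

import java.util.concurrent.TimeUnit;

public class CuratorLockSketch {

    public void sketch(CuratorFramework client) throws Exception {
        // Reentrant exclusive lock: the same thread may acquire it again while holding it
        InterProcessMutex mutex = new InterProcessMutex(client, "/locks/mutex");
        if (mutex.acquire(3, TimeUnit.SECONDS)) {   // acquire with a timeout
            try {
                // critical section
            } finally {
                mutex.release();
            }
        }

        // Exclusive lock (not reentrant)
        InterProcessSemaphoreMutex semaphoreMutex =
                new InterProcessSemaphoreMutex(client, "/locks/semaphore-mutex");
        semaphoreMutex.acquire();
        semaphoreMutex.release();

        // Read/write lock: readLock() and writeLock() are each used like an InterProcessMutex
        InterProcessReadWriteLock rwLock = new InterProcessReadWriteLock(client, "/locks/rw");
        rwLock.readLock().acquire();
        rwLock.readLock().release();
    }
}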
InterProcessMutex example
Configure the curatorFramework client
zookeeper:
  address: 10.211.55.3:2181 # Use commas to separate the ZooKeeper server addresses if there are multiple, e.g. ip1:port1,ip2:port2,ip3:port3
  retryCount: 5 # Number of retries
  initElapsedTimeMs: 1000 # Initial retry interval
  maxElapsedTimeMs: 5000 # Maximum retry interval
  sessionTimeoutMs: 30000 # Session timeout
  connectionTimeoutMs: 10000 # Connection timeout
package com.aha.config;
import lombok.Data;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.context.annotation.Configuration;
/**
 * ZooKeeper connection configuration class
 *
 * @author: WT
 * @date: 2021/11/22
 */
@Data
@Configuration
@ConfigurationProperties(prefix = "zookeeper")
public class ZkClientProperties {
/** Number of retries */
private int retryCount;
/** Initial retry interval */
private int initElapsedTimeMs;
/** Maximum retry interval */
private int maxElapsedTimeMs;
/** Connection address */
private String address;
/**Session expiration time */
private int sessionTimeoutMs;
/** Connection timeout time */
private int connectionTimeoutMs;
}
package com.aha.client;
import com.aha.config.ZkClientProperties;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
/**
 * Generates the ZK client
 *
 * @author: WT
 * @date: 2021/11/22 18:18
 */
@Configuration
public class ZookeeperClient {
/**
 * initMethod = "start": after the CuratorFramework object is created,
 * the start method of the CuratorFramework instance is called
 */
// @Bean(initMethod = "start")
// public CuratorFramework curatorFramework(ZkClientProperties zookeeperProperties) {
// return CuratorFrameworkFactory.newClient(
// zookeeperProperties.getAddress(),
// zookeeperProperties.getSessionTimeoutMs(),
// zookeeperProperties.getConnectionTimeoutMs(),
// new RetryNTimes(zookeeperProperties.getRetryCount(), zookeeperProperties.getInitElapsedTimeMs())
// );
// }
@Bean(initMethod = "start")
private static CuratorFramework getZkClient(ZkClientProperties zookeeperProperties) {
    ExponentialBackoffRetry retryPolicy = new ExponentialBackoffRetry(1000, zookeeperProperties.getRetryCount(), 5000);
    return CuratorFrameworkFactory.builder()
            .connectString(zookeeperProperties.getAddress())
            .sessionTimeoutMs(zookeeperProperties.getSessionTimeoutMs())
            .connectionTimeoutMs(zookeeperProperties.getConnectionTimeoutMs())
            .retryPolicy(retryPolicy)
            .build();
}
}
Simulate 50 threads scrambling for locks:
package com.aha.lock.service;
import lombok.extern.slf4j.Slf4j;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.recipes.locks.InterProcessMutex;
import org.springframework.stereotype.Service;
/**
 * @author: WT
 * @date: 2021/11/22 17:41
 */
@Slf4j
@Service
public class InterprocessMutexLock {
private final CuratorFramework curatorFramework;
public InterprocessMutexLock (CuratorFramework curatorFramework) {
this.curatorFramework = curatorFramework;
}
public void test(String lockPath) {
InterProcessMutex lock = new InterProcessMutex(curatorFramework, lockPath);
// Simulate 50 threads contending for the lock
for (int i = 0; i < 50; i++) {
    new Thread(new TestThread(i, lock)).start();
}
}

static class TestThread implements Runnable {
private final Integer threadFlag;
private final InterProcessMutex lock;
public TestThread(Integer threadFlag, InterProcessMutex lock) {
this.threadFlag = threadFlag;
this.lock = lock;
}
@Override
public void run() {
try {
lock.acquire();
log.info("{} thread acquired the lock", threadFlag);
// Wait 1 second to release the lock
Thread.sleep(1000);
} catch (Exception e) {
e.printStackTrace();
}finally {
try {
lock.release();
} catch (Exception e) {
e.printStackTrace();
}
}
}
}
}
package com.aha.lock.controller;
import com.aha.lock.service.InterprocessMutexLock;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;
/**
 * @author: WT
 * @date: 2021/11/23 14:13
 */
@RestController
public class TestController {
private final InterprocessMutexLock interprocessMutexLock;
public TestController (InterprocessMutexLock interprocessMutexLock) {
this.interprocessMutexLock = interprocessMutexLock;
}
@GetMapping("/lock/mutex")
public void testMutexLock() {
    interprocessMutexLock.test("/lock/mutex");
}
}