When implementing a distributed lock, three schemes come to mind: InnoDB row locks in a database, SETNX in Redis, and the ZAB protocol of ZooKeeper.
One, distributed lock based on the database
- Pessimistic locking
Use a select … where … for update statement to take an exclusive lock.
Note: in a condition such as “where name=lock”, the name field must be indexed; otherwise the whole table is locked. In some cases, such as when the table is small, the MySQL optimizer may decide not to use the index, which also leads to table locking.
- Optimistic locking
The biggest difference between an optimistic lock and the pessimistic lock above is that it is based on the CAS idea: it is not mutually exclusive and does not consume resources waiting for a lock. The operation proceeds on the assumption that there is no concurrent conflict, and a conflict is detected only when the versioned update fails. We use this approach to prevent overselling in our flash sales (seckill). Optimistic locking is implemented by adding an incrementing version number field.
Two, distributed lock based on a cache (Redis, etc.)
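The version-number check can be sketched in memory. This is only an illustration under stated assumptions: `VersionedStock` and `OptimisticLockDemo` are hypothetical names, the `synchronized` methods stand in for the atomicity a database gives a single statement, and the SQL in the comments (table `item`, columns `stock` and `version`) is an assumed schema, not one taken from this article.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicInteger;

/** Hypothetical in-memory stand-in for one table row holding (stock, version). */
final class VersionedStock {
    private int stock;
    private int version;

    VersionedStock(int stock) {
        this.stock = stock;
    }

    /** Snapshot of the row, like "SELECT stock, version FROM item WHERE id = ?". */
    synchronized int[] read() {
        return new int[] {stock, version};
    }

    /**
     * Conditional update, like "UPDATE item SET stock = stock - 1, version = version + 1
     * WHERE id = ? AND version = ? AND stock > 0".
     * Returns the number of rows changed: 1 on success, 0 if the version has moved on.
     */
    synchronized int decrementIfVersion(int expectedVersion) {
        if (version != expectedVersion || stock <= 0) {
            return 0;
        }
        stock--;
        version++;
        return 1;
    }

    synchronized int stock() {
        return stock;
    }
}

final class OptimisticLockDemo {
    /** One purchase: re-read and retry until the update lands or the stock runs out. */
    static boolean buy(VersionedStock row) {
        while (true) {
            int[] snapshot = row.read();
            if (snapshot[0] <= 0) {
                return false; // sold out
            }
            if (row.decrementIfVersion(snapshot[1]) == 1) {
                return true;
            }
            // Version conflict: another buyer updated the row first, so retry.
        }
    }

    /** Runs the given number of concurrent purchases and returns how many succeeded. */
    static int runSale(VersionedStock row, int buyers) {
        AtomicInteger sold = new AtomicInteger();
        CountDownLatch done = new CountDownLatch(buyers);
        for (int i = 0; i < buyers; i++) {
            new Thread(() -> {
                if (buy(row)) {
                    sold.incrementAndGet();
                }
                done.countDown();
            }).start();
        }
        try {
            done.await();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return sold.get();
    }

    public static void main(String[] args) {
        VersionedStock row = new VersionedStock(10);
        int sold = runSale(row, 50);
        System.out.println("sold=" + sold + ", remaining=" + row.stock());
    }
}
```

No buyer ever waits for a lock here; a losing buyer simply sees 0 rows changed and tries again, which is why 50 buyers competing for 10 items cannot oversell.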
- The following commands are used:
(1) SETNX key val: sets the key and returns 1 if and only if the key does not exist. If the key exists, it does nothing and returns 0.
(2) EXPIRE key timeout: sets a timeout (in seconds) for the key. After the timeout, the lock is released automatically, which avoids deadlocks.
(3) DEL key: deletes the key.
These three commands are mainly used when implementing distributed locks using Redis.
- Implementation idea:
(1) When acquiring the lock, use SETNX to take it and use the EXPIRE command to give it a timeout, after which the lock is released automatically. The value of the lock is a randomly generated UUID.
(2) Set a timeout for lock acquisition. If acquisition takes longer than this, give up.
(3) When releasing the lock, use the UUID to check whether the lock is still your own. If it is, execute DEL to release it.
Code:
import java.util.List;
import java.util.UUID;

import redis.clients.jedis.Jedis;
import redis.clients.jedis.JedisPool;
import redis.clients.jedis.Transaction;
import redis.clients.jedis.exceptions.JedisException;

public class DistributedLock {
    private final JedisPool jedisPool;

    public DistributedLock(JedisPool jedisPool) {
        this.jedisPool = jedisPool;
    }

    /**
     * Acquire the lock.
     * @param lockName lock key
     * @param acquireTimeout how long to keep trying to acquire the lock (ms)
     * @param timeout lock timeout (ms)
     * @return lock identifier, or null if the lock was not acquired
     */
    public String lockWithTimeout(String lockName, long acquireTimeout, long timeout) {
        Jedis conn = null;
        String retIdentifier = null;
        try {
            conn = jedisPool.getResource();
            // Randomly generate the lock value
            String identifier = UUID.randomUUID().toString();
            String lockKey = "lock:" + lockName;
            // EXPIRE takes seconds, so convert from milliseconds
            int lockExpire = (int) (timeout / 1000);
            long end = System.currentTimeMillis() + acquireTimeout;
            while (System.currentTimeMillis() < end) {
                if (conn.setnx(lockKey, identifier) == 1) {
                    conn.expire(lockKey, lockExpire);
                    // Return the value; it is needed to release the lock
                    retIdentifier = identifier;
                    return retIdentifier;
                }
                // A TTL of -1 means the key has no expiry (for example, the holder
                // died between SETNX and EXPIRE), so set one here
                if (conn.ttl(lockKey) == -1) {
                    conn.expire(lockKey, lockExpire);
                }
                try {
                    // Short back-off; the exact value was lost in the source, 10 ms is assumed
                    Thread.sleep(10);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    break;
                }
            }
        } catch (JedisException e) {
            e.printStackTrace();
        } finally {
            if (conn != null) {
                conn.close();
            }
        }
        return retIdentifier;
    }

    /**
     * Release the lock.
     * @param lockName lock key
     * @param identifier lock identifier returned by lockWithTimeout
     * @return true if the lock was released
     */
    public boolean releaseLock(String lockName, String identifier) {
        Jedis conn = null;
        String lockKey = "lock:" + lockName;
        boolean retFlag = false;
        try {
            conn = jedisPool.getResource();
            while (true) {
                // Watch the lock key before starting the transaction
                conn.watch(lockKey);
                // Delete only if the stored value shows the lock is still ours
                if (identifier.equals(conn.get(lockKey))) {
                    Transaction transaction = conn.multi();
                    transaction.del(lockKey);
                    List<Object> results = transaction.exec();
                    if (results == null) {
                        // The key changed while watched; retry
                        continue;
                    }
                    retFlag = true;
                }
                conn.unwatch();
                break;
            }
        } catch (JedisException e) {
            e.printStackTrace();
        } finally {
            if (conn != null) {
                conn.close();
            }
        }
        return retFlag;
    }
}
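To experiment with the same acquire/release protocol without a Redis server, the three steps can be modelled in memory. This is a sketch under stated assumptions, not a Redis client: `InMemoryLock` is a hypothetical class, `putIfAbsent` plays the role of SETNX, a stored expiry timestamp plays the role of EXPIRE, and the 5 ms retry interval is arbitrary.

```java
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;

/** Hypothetical in-memory model of SETNX + EXPIRE + owner-checked DEL. */
final class InMemoryLock {
    /** Lock value (owner token) plus its absolute expiry time. */
    private static final class Entry {
        final String owner;
        final long expiresAtMillis;

        Entry(String owner, long expiresAtMillis) {
            this.owner = owner;
            this.expiresAtMillis = expiresAtMillis;
        }
    }

    private final ConcurrentHashMap<String, Entry> store = new ConcurrentHashMap<>();

    /** Tries at least once, then until acquireTimeoutMs elapses; returns the token or null. */
    String lockWithTimeout(String key, long acquireTimeoutMs, long lockTimeoutMs) {
        String token = UUID.randomUUID().toString();
        long deadline = System.currentTimeMillis() + acquireTimeoutMs;
        do {
            long now = System.currentTimeMillis();
            // Drop an expired entry, as Redis does once a key's timeout has passed.
            store.computeIfPresent(key, (k, e) -> e.expiresAtMillis <= now ? null : e);
            // putIfAbsent succeeds only when the key is absent, like SETNX returning 1.
            if (store.putIfAbsent(key, new Entry(token, now + lockTimeoutMs)) == null) {
                return token;
            }
            try {
                Thread.sleep(5);
            } catch (InterruptedException ex) {
                Thread.currentThread().interrupt();
                return null;
            }
        } while (System.currentTimeMillis() < deadline);
        return null;
    }

    /** Deletes the key only if it is still held by this token. */
    boolean releaseLock(String key, String token) {
        boolean[] removed = {false};
        store.computeIfPresent(key, (k, e) -> {
            if (e.owner.equals(token)) {
                removed[0] = true;
                return null; // returning null removes the mapping
            }
            return e;
        });
        return removed[0];
    }

    public static void main(String[] args) {
        InMemoryLock lock = new InMemoryLock();
        String first = lock.lockWithTimeout("resource", 0, 20);      // expires after 20 ms
        String second = lock.lockWithTimeout("resource", 1000, 5000); // waits for the expiry
        System.out.println("stale release: " + lock.releaseLock("resource", first));
        System.out.println("owner release: " + lock.releaseLock("resource", second));
    }
}
```

The stale release is rejected because the first holder's lock expired and the key now carries the second holder's token. That is the case the UUID check in step (3) exists for: without it, a slow client could delete a lock that already belongs to someone else.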
Simulate threads running the seckill (flash-sale) service:
public class ThreadA extends Thread {
    private Service service;

    public ThreadA(Service service) {
        this.service = service;
    }

    @Override
    public void run() {
        service.seckill();
    }
}

public class Test {
    public static void main(String[] args) {
        Service service = new Service();
        for (int i = 0; i < 50; i++) {
            ThreadA threadA = new ThreadA(service);
            threadA.start();
        }
    }
}
The result is as follows; the output is in order:
Comment out the part that uses the lock:
//String indentifier = lock.lockWithTimeout("resource", 5000, 1000);
System.out.println(Thread.currentThread().getName() + " acquired the lock");
System.out.println(--n);
//lock.releaseLock("resource", indentifier);
}
As the results show, some of the output is now out of order:
Three, distributed lock based on ZooKeeper
ZooKeeper is an open source component that provides consistency services for distributed applications. It maintains a hierarchical, file-system-like tree of nodes, in which a name can appear only once within a directory. The steps for implementing a distributed lock based on ZooKeeper are as follows:
(1) Create a directory mylock;
(2) Thread A wants to acquire the lock and creates an ephemeral sequential node under mylock;
(3) Thread A gets all the child nodes of mylock and looks for a sibling with a smaller sequence number. If there is none, its own sequence number is the smallest and it holds the lock;
(4) Thread B gets all the nodes, finds that its node is not the smallest, and sets a watch on the node just below its own;
(5) When thread A finishes processing, it deletes its own node. Thread B receives the change event and checks whether its node is now the smallest. If so, it holds the lock.
Curator, an Apache open source library, is a ZooKeeper client. Its InterProcessMutex class implements a distributed lock: the acquire method acquires the lock and the release method releases it.
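The ordering logic in steps (2) to (5) can be sketched without a ZooKeeper server. This is a minimal in-memory model under stated assumptions: `SequentialNodeLock` is a hypothetical class, a sorted set of sequence numbers stands in for the children of mylock, and real watches and session expiry are left out.

```java
import java.util.concurrent.ConcurrentSkipListSet;
import java.util.concurrent.atomic.AtomicLong;

/** Hypothetical in-memory model of the children of the lock directory. */
final class SequentialNodeLock {
    private final AtomicLong nextSequence = new AtomicLong();
    private final ConcurrentSkipListSet<Long> children = new ConcurrentSkipListSet<>();

    /** Step 2: create a sequential child node and return its sequence number. */
    long createNode() {
        long seq = nextSequence.getAndIncrement();
        children.add(seq);
        return seq;
    }

    /** Step 3: a node holds the lock when no sibling has a smaller sequence number. */
    boolean holdsLock(long seq) {
        return children.contains(seq) && children.lower(seq) == null;
    }

    /** Step 4: the node to watch is the next smaller sibling, or null if there is none. */
    Long nodeToWatch(long seq) {
        return children.lower(seq);
    }

    /** Step 5: deleting the node releases the lock (or abandons the wait). */
    void deleteNode(long seq) {
        children.remove(seq);
    }

    public static void main(String[] args) {
        SequentialNodeLock lock = new SequentialNodeLock();
        long a = lock.createNode();
        long b = lock.createNode();
        System.out.println("A holds lock: " + lock.holdsLock(a));
        System.out.println("B watches node: " + lock.nodeToWatch(b));
        lock.deleteNode(a);
        System.out.println("B holds lock after A releases: " + lock.holdsLock(b));
    }
}
```

Each waiter watches only the node directly in front of it, so a release wakes a single waiter rather than every client queued on the lock.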
The implementation source code is as follows:
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang.StringUtils;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.retry.RetryNTimes;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.data.Stat;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.stereotype.Component;

@Slf4j
@Component
public class ZkLock implements DistributionLock {
    private String zkAddress = "zk_adress";
    private static final String root = "package root";
    private CuratorFramework zkClient;
    private final String LOCK_PREFIX = "/lock_";

    @Bean
    public DistributionLock initZkLock() {
        if (StringUtils.isBlank(root)) {
            throw new RuntimeException("zookeeper 'root' can't be null");
        }
        zkClient = CuratorFrameworkFactory
                .builder()
                .connectString(zkAddress)
                .retryPolicy(new RetryNTimes(2000, 20000))
                .namespace(root)
                .build();
        zkClient.start();
        return this;
    }

    public boolean tryLock(String lockName) {
        lockName = LOCK_PREFIX + lockName;
        boolean locked = true;
        try {
            Stat stat = zkClient.checkExists().forPath(lockName);
            if (stat == null) {
                log.info("tryLock:{}", lockName);
                // Double-check before creating the ephemeral node
                stat = zkClient.checkExists().forPath(lockName);
                if (stat == null) {
                    zkClient
                            .create()
                            .creatingParentsIfNeeded()
                            .withMode(CreateMode.EPHEMERAL)
                            .forPath(lockName, "1".getBytes());
                } else {
                    log.warn("double-check stat.version:{}", stat.getAversion());
                    locked = false;
                }
            } else {
                log.warn("check stat.version:{}", stat.getAversion());
                locked = false;
            }
        } catch (Exception e) {
            // Includes the case where another client created the node first
            locked = false;
        }
        return locked;
    }

    // Not implemented: always reports failure
    public boolean tryLock(String key, long timeout) {
        return false;
    }

    public void release(String lockName) {
        lockName = LOCK_PREFIX + lockName;
        try {
            zkClient
                    .delete()
                    .guaranteed()
                    .deletingChildrenIfNeeded()
                    .forPath(lockName);
            log.info("release:{}", lockName);
        } catch (Exception e) {
            log.error("delete failed", e);
        }
    }

    public void setZkAddress(String zkAddress) {
        this.zkAddress = zkAddress;
    }
}
Advantages: high availability, support for reentrant and blocking locks, and no deadlock when a lock holder fails.
Disadvantages: performance is lower than Redis because nodes must be created and deleted frequently.
Four, comparison
Disadvantages of database distributed lock implementation:
1. The DB performance is poor and tables may be locked.
2. The operation is non-blocking, so after a failure the caller has to poll, which occupies CPU resources.
3. Not committing for a long time, or polling for a long time, may tie up connection resources.
Disadvantages of the Redis (cache) distributed lock implementation:
1. If deleting the lock fails, the expiration time is hard to control.
Disadvantages of the ZK distributed lock implementation: performance is lower than the Redis implementation, mainly because all write operations (acquiring and releasing locks) must be performed on the leader and then synchronized to the followers.
In conclusion, ZooKeeper offers good reliability with reasonable performance, although it is slower than Redis.
In terms of difficulty of understanding (lowest first): database > cache > ZooKeeper
In terms of implementation complexity (lowest first): ZooKeeper >= cache > database
In terms of performance (highest first): cache > ZooKeeper >= database
In terms of reliability (highest first): ZooKeeper > cache > database
xxl-job implements its distributed lock at the database layer.
The source code:
package com.xxl.job.admin.core.thread;
import com.xxl.job.admin.core.conf.XxlJobAdminConfig;
import com.xxl.job.admin.core.cron.CronExpression;
import com.xxl.job.admin.core.model.XxlJobInfo;
import com.xxl.job.admin.core.scheduler.MisfireStrategyEnum;
import com.xxl.job.admin.core.scheduler.ScheduleTypeEnum;
import com.xxl.job.admin.core.trigger.TriggerTypeEnum;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.SQLException;
import java.util.*;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
/**
* @author xuxueli 2019-05-21
*/
public class JobScheduleHelper {
private static Logger logger = LoggerFactory.getLogger(JobScheduleHelper.class);
private static JobScheduleHelper instance = new JobScheduleHelper();
public static JobScheduleHelper getInstance(){
return instance;
}
public static final long PRE_READ_MS = 5000; // pre read
private Thread scheduleThread;
private Thread ringThread;
private volatile boolean scheduleThreadToStop = false;
private volatile boolean ringThreadToStop = false;
private volatile static Map<Integer, List<Integer>> ringData = new ConcurrentHashMap<>();
public void start(){
// schedule thread
scheduleThread = new Thread(new Runnable() {
@Override
public void run() {
try {
TimeUnit.MILLISECONDS.sleep(5000 - System.currentTimeMillis()%1000 );
} catch (InterruptedException e) {
if (!scheduleThreadToStop) {
logger.error(e.getMessage(), e);
}
}
logger.info(">>>>>>>>> init xxl-job admin scheduler success.");
// pre-read count: threadpool-size * trigger-qps (each trigger cost 50ms, qps = 1000/50 = 20)
int preReadCount = (XxlJobAdminConfig.getAdminConfig().getTriggerPoolFastMax() + XxlJobAdminConfig.getAdminConfig().getTriggerPoolSlowMax()) * 20;
while (!scheduleThreadToStop) {
// Scan Job
long start = System.currentTimeMillis();
Connection conn = null;
Boolean connAutoCommit = null;
PreparedStatement preparedStatement = null;
boolean preReadSuc = true;
try {
conn = XxlJobAdminConfig.getAdminConfig().getDataSource().getConnection();
connAutoCommit = conn.getAutoCommit();
conn.setAutoCommit(false);
preparedStatement = conn.prepareStatement( "select * from xxl_job_lock where lock_name = 'schedule_lock' for update" );
preparedStatement.execute();
// tx start
// 1、pre read
long nowTime = System.currentTimeMillis();
List<XxlJobInfo> scheduleList = XxlJobAdminConfig.getAdminConfig().getXxlJobInfoDao().scheduleJobQuery(nowTime + PRE_READ_MS, preReadCount);
if (scheduleList!=null && scheduleList.size()>0) {
// 2、push time-ring
for (XxlJobInfo jobInfo: scheduleList) {
// time-ring jump
if (nowTime > jobInfo.getTriggerNextTime() + PRE_READ_MS) {
// 2.1、trigger-expire > 5s:pass && make next-trigger-time
logger.warn(">>>>>>>>>>> xxl-job, schedule misfire, jobId = " + jobInfo.getId());
// 1、misfire match
MisfireStrategyEnum misfireStrategyEnum = MisfireStrategyEnum.match(jobInfo.getMisfireStrategy(), MisfireStrategyEnum.DO_NOTHING);
if (MisfireStrategyEnum.FIRE_ONCE_NOW == misfireStrategyEnum) {
// FIRE_ONCE_NOW 》 trigger
JobTriggerPoolHelper.trigger(jobInfo.getId(), TriggerTypeEnum.MISFIRE, -1, null, null, null);
logger.debug(">>>>>>>>>>> xxl-job, schedule push trigger : jobId = " + jobInfo.getId() );
}
// 2、fresh next
refreshNextValidTime(jobInfo, new Date());
} else if (nowTime > jobInfo.getTriggerNextTime()) {
// 2.2、trigger-expire < 5s:direct-trigger && make next-trigger-time
// 1、trigger
JobTriggerPoolHelper.trigger(jobInfo.getId(), TriggerTypeEnum.CRON, -1, null, null, null);
logger.debug(">>>>>>>>>>> xxl-job, schedule push trigger : jobId = " + jobInfo.getId() );
// 2、fresh next
refreshNextValidTime(jobInfo, new Date());
// next-trigger-time in 5s, pre-read again
if (jobInfo.getTriggerStatus()==1 && nowTime + PRE_READ_MS > jobInfo.getTriggerNextTime()) {
// 1、make ring second
int ringSecond = (int)((jobInfo.getTriggerNextTime()/1000)%60);
// 2、push time ring
pushTimeRing(ringSecond, jobInfo.getId());
// 3、fresh next
refreshNextValidTime(jobInfo, new Date(jobInfo.getTriggerNextTime()));
}
} else {
// 2.3、trigger-pre-read:time-ring trigger && make next-trigger-time
// 1、make ring second
int ringSecond = (int)((jobInfo.getTriggerNextTime()/1000)%60);
// 2、push time ring
pushTimeRing(ringSecond, jobInfo.getId());
// 3、fresh next
refreshNextValidTime(jobInfo, new Date(jobInfo.getTriggerNextTime()));
}
}
// 3、update trigger info
for (XxlJobInfo jobInfo: scheduleList) {
XxlJobAdminConfig.getAdminConfig().getXxlJobInfoDao().scheduleUpdate(jobInfo);
}
} else {
preReadSuc = false;
}
// tx stop
} catch (Exception e) {
if (!scheduleThreadToStop) {
logger.error(">>>>>>>>>>> xxl-job, JobScheduleHelper#scheduleThread error:{}", e);
}
} finally {
// commit
if (conn != null) {
try {
conn.commit();
} catch (SQLException e) {
if (!scheduleThreadToStop) {
logger.error(e.getMessage(), e);
}
}
try {
conn.setAutoCommit(connAutoCommit);
} catch (SQLException e) {
if (!scheduleThreadToStop) {
logger.error(e.getMessage(), e);
}
}
try {
conn.close();
} catch (SQLException e) {
if (!scheduleThreadToStop) {
logger.error(e.getMessage(), e);
}
}
}
// close PreparedStatement
if (null != preparedStatement) {
try {
preparedStatement.close();
} catch (SQLException e) {
if (!scheduleThreadToStop) {
logger.error(e.getMessage(), e);
}
}
}
}
long cost = System.currentTimeMillis()-start;
// Wait seconds, align second
if (cost < 1000) { // scan-overtime, not wait
try {
// pre-read period: success > scan each second; fail > skip this period;
TimeUnit.MILLISECONDS.sleep((preReadSuc?1000:PRE_READ_MS) - System.currentTimeMillis()%1000);
} catch (InterruptedException e) {
if (!scheduleThreadToStop) {
logger.error(e.getMessage(), e);
}
}
}
}
logger.info(">>>>>>>>>>> xxl-job, JobScheduleHelper#scheduleThread stop");
}
});
scheduleThread.setDaemon(true);
scheduleThread.setName("xxl-job, admin JobScheduleHelper#scheduleThread");
scheduleThread.start();
// ring thread
ringThread = new Thread(new Runnable() {
@Override
public void run() {
while (!ringThreadToStop) {
// align second
try {
TimeUnit.MILLISECONDS.sleep(1000 - System.currentTimeMillis() % 1000);
} catch (InterruptedException e) {
if (!ringThreadToStop) {
logger.error(e.getMessage(), e);
}
}
try {
// second data
List<Integer> ringItemData = new ArrayList<>();
int nowSecond = Calendar.getInstance().get(Calendar.SECOND); // if processing takes too long a tick may be skipped, so also check the previous tick
for (int i = 0; i < 2; i++) {
List<Integer> tmpData = ringData.remove( (nowSecond+60-i)%60 );
if (tmpData != null) {
ringItemData.addAll(tmpData);
}
}
// ring trigger
logger.debug(">>>>>>>>>>> xxl-job, time-ring beat : " + nowSecond + " = " + Arrays.asList(ringItemData) );
if (ringItemData.size() > 0) {
// do trigger
for (int jobId: ringItemData) {
// do trigger
JobTriggerPoolHelper.trigger(jobId, TriggerTypeEnum.CRON, -1, null, null, null);
}
// clear
ringItemData.clear();
}
} catch (Exception e) {
if (!ringThreadToStop) {
logger.error(">>>>>>>>>>> xxl-job, JobScheduleHelper#ringThread error:{}", e);
}
}
}
logger.info(">>>>>>>>>>> xxl-job, JobScheduleHelper#ringThread stop");
}
});
ringThread.setDaemon(true);
ringThread.setName("xxl-job, admin JobScheduleHelper#ringThread");
ringThread.start();
}
private void refreshNextValidTime(XxlJobInfo jobInfo, Date fromTime) throws Exception {
Date nextValidTime = generateNextValidTime(jobInfo, fromTime);
if (nextValidTime != null) {
jobInfo.setTriggerLastTime(jobInfo.getTriggerNextTime());
jobInfo.setTriggerNextTime(nextValidTime.getTime());
} else {
jobInfo.setTriggerStatus(0);
jobInfo.setTriggerLastTime(0);
jobInfo.setTriggerNextTime(0);
logger.warn(">>>>>>>>>>> xxl-job, refreshNextValidTime fail for job: jobId={}, scheduleType={}, scheduleConf={}",
jobInfo.getId(), jobInfo.getScheduleType(), jobInfo.getScheduleConf());
}
}
private void pushTimeRing(int ringSecond, int jobId){
// push async ring
List<Integer> ringItemData = ringData.get(ringSecond);
if (ringItemData == null) {
ringItemData = new ArrayList<Integer>();
ringData.put(ringSecond, ringItemData);
}
ringItemData.add(jobId);
logger.debug(">>>>>>>>>>> xxl-job, schedule push time-ring : " + ringSecond + " = " + Arrays.asList(ringItemData) );
}
public void toStop(){
// 1、stop schedule
scheduleThreadToStop = true;
try {
TimeUnit.SECONDS.sleep(1); // wait
} catch (InterruptedException e) {
logger.error(e.getMessage(), e);
}
if (scheduleThread.getState() != Thread.State.TERMINATED){
// interrupt and wait
scheduleThread.interrupt();
try {
scheduleThread.join();
} catch (InterruptedException e) {
logger.error(e.getMessage(), e);
}
}
// if has ring data
boolean hasRingData = false;
if (!ringData.isEmpty()) {
for (int second : ringData.keySet()) {
List<Integer> tmpData = ringData.get(second);
if (tmpData!=null && tmpData.size()>0) {
hasRingData = true;
break;
}
}
}
if (hasRingData) {
try {
TimeUnit.SECONDS.sleep(8);
} catch (InterruptedException e) {
logger.error(e.getMessage(), e);
}
}
// stop ring (wait job-in-memory stop)
ringThreadToStop = true;
try {
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e) {
logger.error(e.getMessage(), e);
}
if (ringThread.getState() != Thread.State.TERMINATED){
// interrupt and wait
ringThread.interrupt();
try {
ringThread.join();
} catch (InterruptedException e) {
logger.error(e.getMessage(), e);
}
}
logger.info(">>>>>>>>>>> xxl-job, JobScheduleHelper stop");
}
// ---------------------- tools ----------------------
public static Date generateNextValidTime(XxlJobInfo jobInfo, Date fromTime) throws Exception {
ScheduleTypeEnum scheduleTypeEnum = ScheduleTypeEnum.match(jobInfo.getScheduleType(), null);
if (ScheduleTypeEnum.CRON == scheduleTypeEnum) {
Date nextValidTime = new CronExpression(jobInfo.getScheduleConf()).getNextValidTimeAfter(fromTime);
return nextValidTime;
} else if (ScheduleTypeEnum.FIX_RATE == scheduleTypeEnum /*|| ScheduleTypeEnum.FIX_DELAY == scheduleTypeEnum*/) {
return new Date(fromTime.getTime() + Integer.valueOf(jobInfo.getScheduleConf())*1000 );
}
return null;
}
}