I. Overview of Redisson
What is Redisson? (Redisson Wiki)
Redisson is a Java in-memory data grid built on Redis. It provides not only a set of common distributed Java objects but also a number of distributed services, including BitSet, Set, Multimap, SortedSet, Map, List, Queue, BlockingQueue, Deque, BlockingDeque, Semaphore, Lock, AtomicLong, CountDownLatch, Publish / Subscribe, Bloom filter, Remote service, Spring cache and Executor service. Redisson provides the simplest and most convenient way to use Redis. Its goal is to promote separation of concerns around Redis, so that users can focus on business logic.
In short, it is a distributed toolkit implemented on top of Redis. With basic distributed objects and higher-level, abstract distributed services, it offers solutions to most distributed problems for every programmer who would otherwise reinvent the distributed wheel.
How does Redisson differ from Jedis and Lettuce? Not quite as much as Lei Feng differs from Leifeng Pagoda.
Redisson differs from the other two the way operating a graphical interface with a mouse differs from manipulating files on the command line. Redisson is a higher-level abstraction; Jedis and Lettuce are wrappers around Redis commands.
- Jedis is an official Redis toolkit for connecting to Redis from Java. It supports a wide range of Redis commands.
- Lettuce is an extensible, thread-safe Redis client whose communication layer is based on Netty. It supports advanced Redis features such as Sentinel, clustering, pipelining, automatic reconnection and the Redis data model. Since Spring Boot 2.x, Lettuce has replaced Jedis as the preferred Redis client.
- Redisson is built on top of Redis with Netty-based communication. It is a comprehensive, newer piece of middleware and a best-practice model for using Redis in enterprise development.
Jedis wraps the Redis commands, and Lettuce goes further with a richer API and support for cluster and other modes. However, both stop there, giving you just the scaffolding to work with a Redis database, whereas Redisson builds a mature distributed solution on Redis, Lua and Netty, and is even a tool set that Redis officially recommends.
II. Distributed lock
How is a distributed lock implemented?
A distributed lock is a prerequisite for concurrent services. There are many implementations: ZooKeeper has sequential znodes, databases have table-level locks and optimistic/pessimistic locks, and Redis has setNx. They all arrive at the same destination in the end, which is mutual exclusion. Since this article introduces Redisson, we take Redis as the example.
How do you write a simple Redis distributed lock?
Using Spring Data Redis as an example, RedisTemplate operates on Redis (setIfAbsent already combines setNx + EXPIRE into one command) as follows:
public Boolean tryLock(String key, String value, long timeout, TimeUnit unit) {
    return redisTemplate.opsForValue().setIfAbsent(key, value, timeout, unit);
}

// Unlock. To avoid releasing someone else's lock, first verify that the UUID is our own.
public void unlock(String lockName, String uuid) {
    if (uuid.equals(redisTemplate.opsForValue().get(lockName))) {
        redisTemplate.delete(lockName);
    }
}

// Structure of the calling code
if (tryLock(key, uuid, timeout, unit)) {
    try {
        // todo
    } finally {
        unlock(key, uuid);
    }
}
The simple 1.0 version is complete. Sharp-eyed Xiao Zhang sees at a glance that this is indeed a lock, but the get and del operations are not atomic: once concurrency is high, safety can no longer be guaranteed. So Xiao Zhang suggests using a Lua script.
What is a Lua script?
Lua is a lightweight scripting language embedded in Redis. Scripts are run with the Redis EVAL / EVALSHA commands, and wrapping all the operations in one Lua script lets Redis execute them as a single atomic step.
So version 2.0 deletes the lock via a Lua script.
lockDel.lua is as follows:
if redis.call('get', KEYS[1]) == ARGV[1] then
    -- the value matches our own: delete the lock
    return redis.call('del', KEYS[1])
else
    -- not our lock: do nothing
    return 0
end
Run the Lua script for the delete operation:
// Unlock script
DefaultRedisScript<Object> unlockScript = new DefaultRedisScript<>();
unlockScript.setScriptSource(new ResourceScriptSource(new ClassPathResource("lockDel.lua")));
// Execute the Lua script to unlock
redisTemplate.execute(unlockScript, Collections.singletonList(keyName), value);
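To see why the check and the delete have to happen as one step, here is a minimal in-memory sketch in plain Java. It does not talk to Redis: a ConcurrentHashMap stands in for the key space (an assumption made purely for illustration, with expiry left out), and its atomic remove(key, value) plays the role that lockDel.lua plays on the server. The class and method names are hypothetical.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

// In-memory stand-in for Redis; illustrative only, not the RedisTemplate code above.
class CompareAndDeleteSketch {
    private final ConcurrentMap<String, String> store = new ConcurrentHashMap<>();

    // Analogue of setIfAbsent: succeeds only when the key is absent.
    boolean tryLock(String key, String owner) {
        return store.putIfAbsent(key, owner) == null;
    }

    // Analogue of lockDel.lua: compare the owner and delete in one atomic step.
    boolean unlock(String key, String owner) {
        return store.remove(key, owner);
    }

    public static void main(String[] args) {
        CompareAndDeleteSketch locks = new CompareAndDeleteSketch();
        System.out.println(locks.tryLock("order:1", "uuid-A")); // true
        System.out.println(locks.unlock("order:1", "uuid-B"));  // false: not the owner
        System.out.println(locks.unlock("order:1", "uuid-A"));  // true
    }
}
```

A separate get followed by delete would leave a window between the two calls in which the key could expire and be taken by another client; the single conditional remove closes that window.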
synchronized and ReentrantLock are so smooth to use because they are reentrant locks: a thread that acquires the same lock several times does not deadlock. We need reentrancy too.
How is reentrancy guaranteed?
Reentrancy means that the same thread is allowed to acquire the same lock multiple times without deadlocking. The biased locking of synchronized offers a good idea here. synchronized implements reentrancy at the JVM level: the thread ID and a counter are kept in the Mark Word of the Java object header so that the current thread can re-enter without a CAS every time.
When a thread enters a synchronized block and acquires the lock, the biased thread ID is stored in the object header and in the lock record in the stack frame. Afterwards that thread does not need a CAS operation to lock or unlock when entering and leaving the block; it simply tests whether the Mark Word of the object header holds a biased lock pointing to the current thread. If the test succeeds, the thread has acquired the lock. If it fails, the thread tests whether the biased-lock flag in the Mark Word is set to 1; if it is, the thread uses CAS to point the biased lock in the object header at itself.
A counter is then maintained: it increases by 1 each time the same thread enters and decreases by 1 each time it leaves, and the lock is released when it reaches 0.
Reentrant lock
To copy this scheme, we need to modify the Lua script:
1. Storage: store the lock name lockName, the ID of the thread that holds the lock, and count, the number of times that thread has entered
2. Lock
Each time a thread acquires the lock, check whether the lock already exists
- It does not exist
  - Set the hash key to the thread ID and the value to 1
  - Set the expiration time
  - Return true: the lock was acquired
- It exists
  - Check whether there is a hash key for the current thread ID
  - If there is, add 1 to that key's value (the reentry count) and reset the expiration time
  - If there is not, return failure: the lock was not acquired
3. Unlock
Each time a thread unlocks, check whether the lock exists
- It exists
  - Check whether there is a hash key for this thread's ID. If there is, subtract 1 from its value; if there is not, the unlock fails
  - If the remaining count is 0, the lock is no longer needed: run del to delete it
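The three steps above can be sketched in plain Java before turning them into Lua. This is an in-memory model only, under stated assumptions: a nested HashMap stands in for the Redis hash, expiry is left out, and the class and method names are hypothetical.

```java
import java.util.HashMap;
import java.util.Map;

// In-memory model of the hash-based reentrant lock; expiry is omitted for brevity.
class ReentrantHashLockSketch {
    // lockName -> (threadId -> reentry count), mirroring one Redis hash per lock
    private final Map<String, Map<String, Integer>> locks = new HashMap<>();

    synchronized boolean lock(String lockName, String threadId) {
        Map<String, Integer> holders = locks.get(lockName);
        if (holders == null) {                  // lock does not exist: first acquisition
            holders = new HashMap<>();
            holders.put(threadId, 1);
            locks.put(lockName, holders);
            return true;
        }
        if (holders.containsKey(threadId)) {    // same thread again: count + 1
            holders.merge(threadId, 1, Integer::sum);
            return true;
        }
        return false;                           // held by another thread
    }

    synchronized boolean unlock(String lockName, String threadId) {
        Map<String, Integer> holders = locks.get(lockName);
        if (holders == null || !holders.containsKey(threadId)) {
            return false;                       // not the holder: unlock fails
        }
        int remaining = holders.merge(threadId, -1, Integer::sum);
        if (remaining == 0) {
            locks.remove(lockName);             // count reached 0: delete the lock
        }
        return true;
    }

    synchronized boolean isLocked(String lockName) {
        return locks.containsKey(lockName);
    }

    public static void main(String[] args) {
        ReentrantHashLockSketch sketch = new ReentrantHashLockSketch();
        System.out.println(sketch.lock("lockname1", "t1"));   // true
        System.out.println(sketch.lock("lockname1", "t1"));   // true (reentry)
        System.out.println(sketch.lock("lockname1", "t2"));   // false
        sketch.unlock("lockname1", "t1");
        System.out.println(sketch.isLocked("lockname1"));     // true: count is still 1
        sketch.unlock("lockname1", "t1");
        System.out.println(sketch.isLocked("lockname1"));     // false
    }
}
```

In this sketch the synchronized keyword supplies the atomicity that a Lua script supplies when the same logic runs inside Redis.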
1. Storage structure
To make these fields easy to maintain as one object, we store them in a Hash. A Redis Hash is similar to a Java HashMap and is well suited to storing objects.
hset lockname1 threadId 1
Creates a hash named lockname1 with a hash key of threadId and a value of 1
hget lockname1 threadId
Gets the value of threadId in lockname1
The storage structure is
lockname       the lock name
key1: threadId the unique key, the thread ID
value1: count  the counter, which records how many times the thread has acquired the lock
Structure in Redis
2. Incrementing and decrementing the counter
When the same thread acquires the same lock again, we need to increment or decrement the count for that thread
You can use exists to check whether a Redis key exists, and hexists to check whether a hash key exists
Redis also has an increment command for hashes, hincrby
To add 1 each time: hincrby lockname1 threadId 1
To subtract 1: hincrby lockname1 threadId -1
3. Deciding when to unlock
When the lock is no longer needed, each unlock reduces count by 1, and the lock is deleted only when count reaches 0
Based on the storage structure and decision flow above, the lock and unlock Lua scripts are as follows
Lock: lock.lua
local key = KEYS[1];
local threadId = ARGV[1];
local releaseTime = ARGV[2];

-- the lock does not exist yet
if (redis.call('exists', key) == 0) then
    redis.call('hset', key, threadId, '1');
    redis.call('expire', key, releaseTime);
    return 1;
end;

-- the lock exists and is held by this thread: reentry
if (redis.call('hexists', key, threadId) == 1) then
    redis.call('hincrby', key, threadId, '1');
    redis.call('expire', key, releaseTime);
    return 1;
end;
return 0;
Unlock: unlock.lua
local key = KEYS[1];
local threadId = ARGV[1];

-- the lock is not held by this thread
if (redis.call('hexists', key, threadId) == 0) then
    return nil;
end;

-- counter - 1
local count = redis.call('hincrby', key, threadId, -1);

-- delete the lock
if (count == 0) then
    redis.call('del', key);
    return nil;
end;
Code
import java.util.Collections;
import java.util.UUID;

import lombok.Getter;
import lombok.Setter;
import org.springframework.core.io.ClassPathResource;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.data.redis.core.script.DefaultRedisScript;
import org.springframework.scripting.support.ResourceScriptSource;

/**
 * @description distributed lock implemented on native Redis
 * @date 2021/2/6 10:51 PM
 **/
@Getter
@Setter
public class RedisLock {

    private RedisTemplate redisTemplate;
    private DefaultRedisScript<Long> lockScript;
    private DefaultRedisScript<Object> unlockScript;

    public RedisLock(RedisTemplate redisTemplate) {
        this.redisTemplate = redisTemplate;
        // load the lock script
        this.lockScript = new DefaultRedisScript<>();
        this.lockScript.setScriptSource(new ResourceScriptSource(new ClassPathResource("lock.lua")));
        this.lockScript.setResultType(Long.class);
        // load the unlock script
        this.unlockScript = new DefaultRedisScript<>();
        this.unlockScript.setScriptSource(new ResourceScriptSource(new ClassPathResource("unlock.lua")));
    }

    /**
     * Acquire the lock
     */
    public String tryLock(String lockName, Long releaseTime) {
        // unique key for this acquisition
        String key = UUID.randomUUID().toString();
        // execute the script
        Long result = (Long) redisTemplate.execute(
                lockScript,
                Collections.singletonList(lockName),
                key + Thread.currentThread().getId(),
                releaseTime);
        if (result != null && result.intValue() == 1) {
            return key;
        } else {
            return null;
        }
    }

    /**
     * Unlock
     * @param lockName
     * @param key
     */
    public void unlock(String lockName, String key) {
        redisTemplate.execute(unlockScript,
                Collections.singletonList(lockName),
                key + Thread.currentThread().getId());
    }
}
At this point a distributed lock is complete, with the basic properties of mutual exclusion, reentrancy and protection against deadlock.
Meticulous Xiao Zhang feels that although this is enough for an ordinary mutex, business always brings special cases. For example, process A acquires the lock, but its business operation takes so long that the lock expires and is released while the work is still running. At that moment process B can acquire the lock normally and start its own work, so two processes operate on the shared resource at once, which is still a problem.
And if the Redis node that stores the distributed lock goes down while the lock happens to be held, the lock is stuck in the locked state.
Xiao Zhang is not nitpicking for its own sake: inventory operations always have special cases like these.
So in this situation we want to extend the lock's releaseTime and delay the release until the business has reached the desired result. Continually extending the lock's expiration time so that the business can finish is called lock renewal.
Read/write separation is also common. A service that reads far more than it writes often uses separate read locks and write locks for performance.
At this point the extensions have gone beyond the complexity of a simple wheel. Renewal alone is enough to keep Xiao Zhang busy, let alone performance (maximum wait time for a lock), elegance (handling invalid lock requests), retries (a retry mechanism on failure) and so on. While Xiao Zhang was thinking hard, Xiao Bai leaned over, took a look, and wondered: it is 2021, why not just use Redisson?
Redisson has the lock you are asking for.
III. Redisson distributed lock
Redisson is said to be simple, so what does using its distributed lock look like?
1. Dependencies
<!-- native -->
<dependency>
    <groupId>org.redisson</groupId>
    <artifactId>redisson</artifactId>
    <version>3.13.6</version>
</dependency>
<!-- alternatively, the Spring integration starter -->
<dependency>
    <groupId>org.redisson</groupId>
    <artifactId>redisson-spring-boot-starter</artifactId>
    <version>3.13.6</version>
</dependency>
2. Configuration
@Configuration
public class RedissionConfig {

    @Value("${spring.redis.host}")
    private String redisHost;

    @Value("${spring.redis.password}")
    private String password;

    private int port = 6379;

    @Bean
    public RedissonClient getRedisson() {
        Config config = new Config();
        config.useSingleServer()
                .setAddress("redis://" + redisHost + ":" + port)
                .setPassword(password);
        config.setCodec(new JsonJacksonCodec());
        return Redisson.create(config);
    }
}
3. Using the distributed lock
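@Resource
private RedissonClient redissonClient;

RLock rLock = redissonClient.getLock(lockName);
boolean isLocked = false;
try {
    // tryLock(time, unit): the first argument is the maximum time to wait for the lock
    isLocked = rLock.tryLock(expireTime, TimeUnit.MILLISECONDS);
    if (isLocked) {
        // TODO
    }
} catch (InterruptedException e) {
    Thread.currentThread().interrupt();
} finally {
    // release in finally, and only if this thread actually holds the lock
    if (isLocked && rLock.isHeldByCurrentThread()) {
        rLock.unlock();
    }
}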
Simple and clear: all you need is an RLock. Since I am recommending Redisson, let's go inside and see how it is implemented.
IV. RLock
RLock is the core interface of Redisson's distributed lock. It extends the Lock interface from the concurrent package as well as Redisson's own RLockAsync interface. The methods of RLockAsync all return RFuture; this is the core of Redisson's asynchronous implementation and the place where Netty comes into play.
How does RLock acquire the lock?
Starting from RLock, go to the RedissonLock class, find the tryLock method, and follow it down to tryAcquireOnceAsync, which it delegates to. This is the main locking code (version 3.13.6 is used as the example; the code differs between versions).
private RFuture<Boolean> tryAcquireOnceAsync(long waitTime, long leaseTime, TimeUnit unit, long threadId) {
    if (leaseTime != -1L) {
        return this.tryLockInnerAsync(waitTime, leaseTime, unit, threadId, RedisCommands.EVAL_NULL_BOOLEAN);
    } else {
        RFuture<Boolean> ttlRemainingFuture = this.tryLockInnerAsync(waitTime,
                this.commandExecutor.getConnectionManager().getCfg().getLockWatchdogTimeout(),
                TimeUnit.MILLISECONDS, threadId, RedisCommands.EVAL_NULL_BOOLEAN);
        ttlRemainingFuture.onComplete((ttlRemaining, e) -> {
            if (e == null) {
                if (ttlRemaining) {
                    this.scheduleExpirationRenewal(threadId);
                }
            }
        });
        return ttlRemainingFuture;
    }
}
There are two branches here, chosen by leaseTime, that is, by whether an expiration time was given when locking. If no expiration time was given (-1), the watchDog lock renewal kicks in (covered below): a renewal task is registered on the locking event. Let's first look at tryLockInnerAsync, the part used when an expiration time is given.
evalWriteAsync is the entry point that runs Lua through the eval command
<T> RFuture<T> tryLockInnerAsync(long leaseTime, TimeUnit unit, long threadId, RedisStrictCommand<T> command) {
    this.internalLockLeaseTime = unit.toMillis(leaseTime);
    return this.commandExecutor.evalWriteAsync(this.getName(), LongCodec.INSTANCE, command,
            "if (redis.call('exists', KEYS[1]) == 0) then "
                + "redis.call('hset', KEYS[1], ARGV[2], 1); "
                + "redis.call('pexpire', KEYS[1], ARGV[1]); "
                + "return nil; "
            + "end; "
            + "if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then "
                + "redis.call('hincrby', KEYS[1], ARGV[2], 1); "
                + "redis.call('pexpire', KEYS[1], ARGV[1]); "
                + "return nil; "
            + "end; "
            + "return redis.call('pttl', KEYS[1]);",
            Collections.singletonList(this.getName()),
            new Object[]{this.internalLockLeaseTime, this.getLockName(threadId)});
}
This is where the Lua script is run through the eval command. Expanded, the Lua script reads:
-- the lock does not exist
if (redis.call('exists', KEYS[1]) == 0) then
    -- create the hash entry with a count of 1
    redis.call('hset', KEYS[1], ARGV[2], 1);
    -- set the expiration time
    redis.call('pexpire', KEYS[1], ARGV[1]);
    return nil;
end;
-- the lock exists and is held by the current thread
if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then
    -- reentry count + 1
    redis.call('hincrby', KEYS[1], ARGV[2], 1);
    redis.call('pexpire', KEYS[1], ARGV[1]);
    return nil;
end;
-- held by another thread: return the remaining time of the lock
return redis.call('pttl', KEYS[1]);
This is almost the same as the script of the custom distributed lock we wrote earlier; Redisson implements it the same way. The parameters are:
// keyName
KEYS[1] = Collections.singletonList(this.getName())
// leaseTime
ARGV[1] = this.internalLockLeaseTime
// uuid + threadId, the unique value for the thread
ARGV[2] = this.getLockName(threadId)
These three parameters together complete one piece of logic:
Check whether a hash table already exists for the lock.
• No hash table exists: set an entry in the hash table whose key is lockName (ARGV[2]) and whose value is 1, then set the hash table's expiration time to leaseTime
• A hash table exists and contains lockName: add 1 to the value of lockName, that is, count the reentry, and reset the expiration time to leaseTime
• Otherwise, return the remaining TTL of the lock
In principle this is no different from the custom lock above
That being so, unlocking must contain the matching -1 operation. Look at the unlock method and follow the method names down to unlockInnerAsync
protected RFuture<Boolean> unlockInnerAsync(long threadId) {
    return this.commandExecutor.evalWriteAsync(this.getName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,
            "if (redis.call('exists', KEYS[1]) == 0) then "
                + "redis.call('publish', KEYS[2], ARGV[1]); "
                + "return 1; "
            + "end; "
            + "if (redis.call('hexists', KEYS[1], ARGV[3]) == 0) then "
                + "return nil; "
            + "end; "
            + "local counter = redis.call('hincrby', KEYS[1], ARGV[3], -1); "
            + "if (counter > 0) then "
                + "redis.call('pexpire', KEYS[1], ARGV[2]); "
                + "return 0; "
            + "else "
                + "redis.call('del', KEYS[1]); "
                + "redis.call('publish', KEYS[2], ARGV[1]); "
                + "return 1; "
            + "end; "
            + "return nil;",
            Arrays.asList(this.getName(), this.getChannelName()),
            new Object[]{LockPubSub.unlockMessage, this.internalLockLeaseTime, this.getLockName(threadId)});
}
Pull out the Lua part
-- the current thread does not hold the lock
if (redis.call('hexists', KEYS[1], ARGV[3]) == 0) then
    return nil;
end;
-- counter - 1
local counter = redis.call('hincrby', KEYS[1], ARGV[3], -1);
if (counter > 0) then
    -- still held (reentrant): reset the expiration time
    redis.call('pexpire', KEYS[1], ARGV[2]);
    return 0;
else
    -- delete the lock and publish the unlock message
    redis.call('del', KEYS[1]);
    redis.call('publish', KEYS[2], ARGV[1]);
    return 1;
end;
return nil;
The KEYS come from Arrays.asList(getName(), getChannelName()):
name         the lock name
channelName  the name of the channel that pub/sub publishes messages on
The ARGV variables are three: LockPubSub.UNLOCK_MESSAGE, internalLockLeaseTime and getLockName(threadId)
LockPubSub.UNLOCK_MESSAGE  the type of message sent on the channel; for unlocking the value is 0
internalLockLeaseTime      the timeout configured for the watchDog, 30s by default
lockName                   here this is the unique value made of the UUID and the threadId
The steps are as follows:
1. If the current thread does not hold the lock, return nil;
2. If it does, decrement the counter under this thread's hash key by 1;
3. If counter > 0, reset the expiration time and return 0; otherwise delete the lock, publish the unlock message unlockMessage, and return 1.
Unlocking relies on Redis publish/subscribe (PubSub) to deliver the notification.
The subscription step is in the lock method, the locking entry point of RedissonLock
long threadId = Thread.currentThread().getId();
Long ttl = this.tryAcquire(-1L, leaseTime, unit, threadId);
if (ttl != null) {
    // Subscribe
    RFuture<RedissonLockEntry> future = this.subscribe(threadId);
    if (interruptibly) {
        this.commandExecutor.syncSubscriptionInterrupted(future);
    } else {
        this.commandExecutor.syncSubscription(future);
    }
    // ... omitted
}
When the lock is held by another thread, the waiting thread subscribes and waits for the release notification instead of spinning (the pub/sub notification is published when the holder releases the lock through RedissonLock). Avoiding spin this way is a common efficiency measure.
1. The unlock message
To find out exactly what is notified and what happens after the notification, go to LockPubSub.
It has only one obvious listener method, onMessage. Subscribing and releasing the semaphore both live in the parent class PublishSubscribe; here we only care about what the listener actually does with the event
protected void onMessage(RedissonLockEntry value, Long message) {
    Runnable runnableToExecute;
    if (message.equals(unlockMessage)) {
        runnableToExecute = (Runnable) value.getListeners().poll();
        if (runnableToExecute != null) {
            runnableToExecute.run();
        }
        // getLatch() returns a Semaphore. Releasing it wakes up the thread waiting in
        // entry.getLatch().tryAcquire so that it tries to acquire the lock again
        value.getLatch().release();
    } else if (message.equals(readUnlockMessage)) {
        while (true) {
            runnableToExecute = (Runnable) value.getListeners().poll();
            if (runnableToExecute == null) {
                value.getLatch().release(value.getLatch().getQueueLength());
                break;
            }
            runnableToExecute.run();
        }
    }
}
The second branch handles readUnlockMessage because Redisson also provides read/write locks, whose exclusion rules differ between readers and writers. Here we only look at the default unlockMessage branch above
The LockPubSub listener ends up doing two things
- runnableToExecute.run() executes the listener callback
- value.getLatch().release() releases the semaphore
Redisson listens for unlock messages through LockPubSub, runs the listener callback, and releases the semaphore to notify the waiting thread so that it can try to acquire the lock again.
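The wake-up mechanism can be pictured with a plain java.util.concurrent.Semaphore. The sketch below is not Redisson code: the class and method names are hypothetical, a direct method call stands in for the pub/sub message, and only the "release a permit to wake one waiter" idea is taken from the listener above.

```java
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;

// Illustrative stand-in for the latch held by a lock entry; not Redisson's implementation.
class UnlockNotificationSketch {
    // Starts with 0 permits, so a waiter parks until an unlock message arrives.
    private final Semaphore latch = new Semaphore(0);

    // Stand-in for the listener reacting to an unlock message.
    void onUnlockMessage() {
        latch.release();
    }

    // Waiter side: park for at most ttlMillis instead of spinning.
    boolean awaitUnlock(long ttlMillis) {
        try {
            return latch.tryAcquire(ttlMillis, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }

    public static void main(String[] args) {
        UnlockNotificationSketch entry = new UnlockNotificationSketch();
        // No message yet: the wait times out.
        System.out.println(entry.awaitUnlock(50));   // false
        // The holder "publishes" the unlock message; the next wait returns at once.
        entry.onUnlockMessage();
        System.out.println(entry.awaitUnlock(50));   // true
    }
}
```

After waking up, a waiter would still have to run the lock script again, since another client may have taken the lock in the meantime.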
Now come back to the branches of tryAcquireOnceAsync
private RFuture<Boolean> tryAcquireOnceAsync(long waitTime, long leaseTime, TimeUnit unit, long threadId) {
    if (leaseTime != -1L) {
        return this.tryLockInnerAsync(waitTime, leaseTime, unit, threadId, RedisCommands.EVAL_NULL_BOOLEAN);
    } else {
        RFuture<Boolean> ttlRemainingFuture = this.tryLockInnerAsync(waitTime,
                this.commandExecutor.getConnectionManager().getCfg().getLockWatchdogTimeout(),
                TimeUnit.MILLISECONDS, threadId, RedisCommands.EVAL_NULL_BOOLEAN);
        ttlRemainingFuture.onComplete((ttlRemaining, e) -> {
            if (e == null) {
                if (ttlRemaining) {
                    this.scheduleExpirationRenewal(threadId);
                }
            }
        });
        return ttlRemainingFuture;
    }
}
As you can see, when no timeout is given, a curious piece of logic runs after the lock operation
ttlRemainingFuture.onComplete((ttlRemaining, e) -> {
    if (e == null) {
        if (ttlRemaining) {
            this.scheduleExpirationRenewal(threadId);
        }
    }
})
This involves Netty's Future/Promise-Listener model (see asynchronous programming in Netty). Redisson communicates this way almost everywhere, which is why Redisson is said to be built on Netty's communication mechanism.
With a Java Future, the business logic is a Callable or Runnable implementation, and the end of its call() or run() method means the business logic has finished. With the Promise mechanism, the business logic itself can explicitly mark its own success or failure, which makes it easy to listen for the outcome of your own logic.
On the surface, this code says: after the asynchronous lock operation completes, if locking succeeded, use the ttlRemaining value returned by the lock operation to decide whether to start a scheduled task.
This scheduled task is the core of the watchDog.
2. Lock renewal
Look at this.scheduleExpirationRenewal(threadId) in RedissonLock
private void scheduleExpirationRenewal(long threadId) {
    RedissonLock.ExpirationEntry entry = new RedissonLock.ExpirationEntry();
    RedissonLock.ExpirationEntry oldEntry =
            (RedissonLock.ExpirationEntry) EXPIRATION_RENEWAL_MAP.putIfAbsent(this.getEntryName(), entry);
    if (oldEntry != null) {
        oldEntry.addThreadId(threadId);
    } else {
        entry.addThreadId(threadId);
        this.renewExpiration();
    }
}

private void renewExpiration() {
    RedissonLock.ExpirationEntry ee =
            (RedissonLock.ExpirationEntry) EXPIRATION_RENEWAL_MAP.get(this.getEntryName());
    if (ee != null) {
        Timeout task = this.commandExecutor.getConnectionManager().newTimeout(new TimerTask() {
            public void run(Timeout timeout) throws Exception {
                RedissonLock.ExpirationEntry ent =
                        (RedissonLock.ExpirationEntry) RedissonLock.EXPIRATION_RENEWAL_MAP.get(RedissonLock.this.getEntryName());
                if (ent != null) {
                    Long threadId = ent.getFirstThreadId();
                    if (threadId != null) {
                        RFuture<Boolean> future = RedissonLock.this.renewExpirationAsync(threadId);
                        future.onComplete((res, e) -> {
                            if (e != null) {
                                RedissonLock.log.error("Can't update lock " + RedissonLock.this.getName() + " expiration", e);
                            } else {
                                if (res) {
                                    RedissonLock.this.renewExpiration();
                                }
                            }
                        });
                    }
                }
            }
        }, this.internalLockLeaseTime / 3L, TimeUnit.MILLISECONDS);
        ee.setTimeout(task);
    }
}
Broken down, this deeply nested and verbose code really does only a few things
• it schedules a Netty Timeout task that fires after (internalLockLeaseTime / 3) milliseconds and calls renewExpirationAsync
• renewExpirationAsync resets the lock's timeout, and a listener is registered on its result; when the renewal succeeds, the listener callback calls renewExpiration again, which schedules the next round
The Lua in renewExpirationAsync is as follows
protected RFuture<Boolean> renewExpirationAsync(long threadId) {
    return this.commandExecutor.evalWriteAsync(this.getName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,
            "if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then "
                + "redis.call('pexpire', KEYS[1], ARGV[1]); "
                + "return 1; "
            + "end; "
            + "return 0;",
            Collections.singletonList(this.getName()),
            new Object[]{this.internalLockLeaseTime, this.getLockName(threadId)});
}

if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then
    redis.call('pexpire', KEYS[1], ARGV[1]);
    return 1;
end;
return 0;
In other words, the timeout is reset.
Why does Redisson add this logic?
It keeps the business running safely in certain scenarios, for example when a task runs past the timeout without finishing and the lock would otherwise already have been released.
When a thread holds a lock and no leaseTime was given, Redisson sets the timeout to the default of 30 seconds and starts the watchDog, which renews the lock every 10 seconds and keeps the timeout at 30 seconds until the lock is deleted.
This is Redisson's lock renewal, the basic idea behind the WatchDog.
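The effect of renewing every third of the lease can be checked with a little arithmetic. The sketch below is a simulation on a virtual clock, not Redisson code: the class and method names are hypothetical, and only the 30-second default and the "lease / 3" interval are taken from the text above.

```java
// Virtual-clock simulation of watchdog renewal; illustrative only.
class WatchdogSketch {
    static final long LEASE_MILLIS = 30_000;                  // default timeout named in the text
    static final long RENEW_INTERVAL = LEASE_MILLIS / 3;      // internalLockLeaseTime / 3

    private long expiresAt;   // virtual time at which the key would expire
    private boolean held;

    void lock(long now) {
        held = true;
        expiresAt = now + LEASE_MILLIS;
    }

    // One watchdog tick: reset the expiry if the lock is still held and not yet expired.
    boolean renew(long now) {
        if (!held || now >= expiresAt) {
            return false;
        }
        expiresAt = now + LEASE_MILLIS;
        return true;
    }

    // Runs a task of the given length and returns the TTL left when it finishes.
    static long remainingTtlAfter(long taskMillis) {
        WatchdogSketch lock = new WatchdogSketch();
        lock.lock(0);
        for (long now = RENEW_INTERVAL; now <= taskMillis; now += RENEW_INTERVAL) {
            lock.renew(now);
        }
        return lock.expiresAt - taskMillis;
    }

    public static void main(String[] args) {
        // A 95 s task outlives the 30 s lease, yet the lock is still alive at the end.
        System.out.println(remainingTtlAfter(95_000));   // 25000
    }
}
```

For the 95-second task the last renewal happens at 90 s and pushes the expiry to 120 s, so 25 s of TTL remain when the task ends; without renewal the key would have expired at 30 s.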
3. Process summary
From the walkthrough above, the overall flow is:
- Threads A and B compete for a lock; thread A gets it and thread B blocks
- While blocked, thread B does not actively spin with CAS; it subscribes to the lock's broadcast message through pub/sub
- A finishes its work and releases the lock; thread B receives the subscription notification
- B is woken up, competes for the lock again, and acquires it
The detailed locking and unlocking process is summarized as follows:
V. Fair lock
The reentrant lock described above is an unfair lock. Redisson also implements a fair lock, based on a Redis queue (List) and a ZSet
How is fairness defined?
Fairness means that clients obtain the lock in the order in which they requested it: first come, first served, also known as FIFO. A queue and an ordered container are therefore essential
FairSync
Let's review how JUC's ReentrantLock implements its fair lock
/**
* Sync object for fair locks
*/
static final class FairSync extends Sync {
private static final long serialVersionUID = -3000897897090466540L;
final void lock() {
acquire(1);
}
/**
* Fair version of tryAcquire. Don't grant access unless
* recursive call or no waiters or is first.
*/
protected final boolean tryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
if (c == 0) {
if (!hasQueuedPredecessors() &&
compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}
else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
if (nextc < 0)
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}
}
AQS already provides the whole mechanism; whether a lock is fair depends on whether the implementing class takes nodes strictly in queue order
AbstractQueuedSynchronizer (AQS) is the base framework for building locks and other synchronization components. It queues the threads that are acquiring the resource through a built-in FIFO queue. It does not implement any synchronization interface itself; it only defines a number of methods for acquiring and releasing synchronization state, for use by custom synchronization components (pictured above). It supports both exclusive and shared acquisition, is designed around the template method pattern, and lays the groundwork for fair and unfair locks.
Two diagrams briefly explain the AQS waiting process (from The Art of Concurrent Programming in Java).
The first shows the synchronization queue (a FIFO doubly linked queue), which manages the thread references, wait states, and predecessor and successor nodes of the threads that failed to acquire the synchronization state (that is, failed to grab the lock)
The second shows the overall flow of exclusive acquisition of the synchronization state, that is, the call flow of the core acquire(int arg) method
From these you can see the lock acquisition process
AQS maintains a synchronization queue. A thread that fails to acquire the state joins the queue and spins. It leaves the queue, or stops spinning, when its predecessor is the head node and it has successfully acquired the synchronization state.
Comparing with NonfairSync, the unfair lock class, shows that the key code deciding between fair and unfair is the hasQueuedPredecessors method.
static final class NonfairSync extends Sync {
    private static final long serialVersionUID = 7316153563782823691L;

    /**
     * Performs lock. Try immediate barge, backing up to normal
     * acquire on failure.
     */
    final void lock() {
        if (compareAndSetState(0, 1))
            setExclusiveOwnerThread(Thread.currentThread());
        else
            acquire(1);
    }

    protected final boolean tryAcquire(int acquires) {
        return nonfairTryAcquire(acquires);
    }
}
NonfairSync drops the hasQueuedPredecessors check. What this method does is:
It checks whether the current node in the synchronization queue has a predecessor, and returns true if some thread requested the lock earlier than the current thread.
The fair rule, then, is to make sure that each time the lock is acquired, it goes to the first node (thread) in the queue
Why does JUC default to unfair locking?
Because when a thread requests a lock, it holds the lock as soon as it acquires the synchronization state. Under this rule, the thread that has just released the lock is very likely to acquire the state again, leaving the other threads waiting in the synchronization queue. The benefit, however, is that unfair locking greatly reduces the overhead of thread context switching.
Evidently, the cost of fairness is performance and throughput.
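The default can be confirmed directly on ReentrantLock, whose constructor takes an optional fairness flag and whose isFair() method reports which policy is in use. The wrapper class and helper names below are hypothetical; the ReentrantLock calls are standard JDK API.

```java
import java.util.concurrent.locks.ReentrantLock;

// Small check of ReentrantLock's default fairness policy.
class FairnessFlagSketch {
    // The no-argument constructor picks the unfair policy.
    static boolean defaultIsFair() {
        return new ReentrantLock().isFair();
    }

    // Passing true requests the fair policy.
    static boolean explicitFairIsFair() {
        return new ReentrantLock(true).isFair();
    }

    public static void main(String[] args) {
        System.out.println(defaultIsFair());        // false
        System.out.println(explicitFairIsFair());   // true
    }
}
```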
Redis has no AQS, but it does have List and ZSet. Let's see how Redisson achieves fairness.
RedissonFairLock
RedissonFairLock is just as simple to use
RLock fairLock = redissonClient.getFairLock(lockName);
fairLock.lock();
RedissonFairLock extends RedissonLock; follow it down again to the locking implementation, tryLockInnerAsync.
There are two lengthy Lua sections, but debugging shows that the fair lock enters the branch after command == RedisCommands.EVAL_LONG. That Lua section is long and has many parameters, so we focus on the rules the Lua implements
Parameters
// KEYS = Arrays.<Object>asList(getName(), threadsQueueName, timeoutSetName)
KEYS[1]: lock_name
KEYS[2]: "redisson_lock_queue:{XXX}"
KEYS[3]: "redisson_lock_timeout:{XXX}"
// ARGV = internalLockLeaseTime, getLockName(threadId),
ARGV[1]: "{leaseTime}"
ARGV[2]: "{Redisson.UUID}:{threadId}"
ARGV[3] = current time + thread wait time: (10:00:00) + 5000 ms = 10:00:05
ARGV[4] = current time (10:00:00); this is the time of the server the application is deployed on, not the Redis server's time
Lua script of the fair lock implementation
-- 1. Loop: remove expired waiting threads from the head of the queue
while true do
    local firstThreadId2 = redis.call('lindex', KEYS[2], 0);
    if firstThreadId2 == false then
        break;
    end;
    -- timeout (score) of the head node in the zSet
    local timeout = tonumber(redis.call('zscore', KEYS[3], firstThreadId2));
    if timeout <= tonumber(ARGV[4]) then
        redis.call('zrem', KEYS[3], firstThreadId2);
        redis.call('lpop', KEYS[2]);
    else
        break;
    end;
end;

-- 2. The lock does not exist && (there is no wait queue || the head of the wait queue is the current thread)
if (redis.call('exists', KEYS[1]) == 0)
    and ((redis.call('exists', KEYS[2]) == 0)
        or (redis.call('lindex', KEYS[2], 0) == ARGV[2])) then
    -- remove the current thread from the queue and from the zSet
    redis.call('lpop', KEYS[2]);
    redis.call('zrem', KEYS[3], ARGV[2]);
    -- decrease the score of every remaining key in the zSet
    local keys = redis.call('zrange', KEYS[3], 0, -1);
    for i = 1, #keys, 1 do
        redis.call('zincrby', KEYS[3], -tonumber(ARGV[3]), keys[i]);
    end;
    -- take the lock and set its expiration time
    redis.call('hset', KEYS[1], ARGV[2], 1);
    redis.call('pexpire', KEYS[1], ARGV[1]);
    return nil;
end;

-- 3. Reentry by the thread that already holds the lock
if redis.call('hexists', KEYS[1], ARGV[2]) == 1 then
    redis.call('hincrby', KEYS[1], ARGV[2], 1);
    redis.call('pexpire', KEYS[1], ARGV[1]);
    return nil;
end;

-- 4. The current thread is already queued: return its TTL
local timeout = redis.call('zscore', KEYS[3], ARGV[2]);
if timeout ~= false then
    -- the subtraction computes the TTL of the current thread
    return timeout - tonumber(ARGV[3]) - tonumber(ARGV[4]);
end;

-- 5. Compute the TTL from the tail node of the queue
local lastThreadId = redis.call('lindex', KEYS[2], -1);
local ttl;
if lastThreadId ~= false and lastThreadId ~= ARGV[2] then
    -- remaining time of the last node in the queue
    ttl = tonumber(redis.call('zscore', KEYS[3], lastThreadId)) - tonumber(ARGV[4]);
else
    -- remaining time of the lock itself
    ttl = redis.call('pttl', KEYS[1]);
end;

-- 6. Enqueue at the tail: the zSet timeout (score) is ttl + ARGV[3] + ARGV[4];
--    the thread is pushed onto the queue only if its zSet entry is newly added
local timeout = ttl + tonumber(ARGV[3]) + tonumber(ARGV[4]);
if redis.call('zadd', KEYS[3], timeout, ARGV[2]) == 1 then
    redis.call('rpush', KEYS[2], ARGV[2]);
end;
return ttl;
1. Locking steps of the fair lock
From the above Lua, you can see that the key structures of Lua operations are lists and ordered sets.
The List maintains a queue of waiting threads, redisson_lock_queue:{XXX}, and the zSet maintains an ordered set of thread timeouts, redisson_lock_timeout:{XXX}. Although the Lua script is long, it can be broken down into six steps:
- Queue cleaning
  - Ensures that only unexpired waiting threads remain in the queue
- First lock acquisition
  - hset takes the lock, pexpire sets the expiration time
- Reentrancy check
  - Same as the Lua script of the reentrant lock
- Return the TTL
- Compute the TTL of the tail node
  - The initial value is the remaining expiration time of the lock
- Queue at the tail
  - TTL + 2 * currentTime + waitTime is the default formula for the score
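The six steps can be sketched as a small single-process model in Java. This is only an illustration, not Redisson's code. The class `FairLockModel` and its methods are hypothetical, a `Deque` and a `Map` stand in for the Redis List and zSet, and nothing here is atomic the way a Lua script is. One deliberate simplification: each score is stored as a single absolute deadline (TTL + waitTime + now), so the numbers it produces will not match scores computed with the formula above, and the zincrby adjustment is left out.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.HashMap;
import java.util.Map;

// Simplified in-memory model of the six steps; names and score layout are assumptions.
class FairLockModel {
    private String holder;       // field stored in the lock hash (KEYS[1])
    private int holdCount;       // reentrancy counter of the holder
    private long lockExpiresAt;  // stands in for pexpire on KEYS[1]
    private final Deque<String> queue = new ArrayDeque<>();      // KEYS[2], wait queue
    private final Map<String, Long> timeouts = new HashMap<>();  // KEYS[3], member -> score

    /** Returns null when the lock is acquired, otherwise a TTL in milliseconds. */
    Long tryAcquire(String id, long leaseMs, long waitMs, long now) {
        if (holder != null && lockExpiresAt <= now) { // the lock key has expired
            holder = null;
            holdCount = 0;
        }
        // 1. Queue cleaning: drop expired waiters from the head
        while (!queue.isEmpty() && timeouts.get(queue.peekFirst()) <= now) {
            timeouts.remove(queue.pollFirst());
        }
        // 2. First lock: lock is free and the queue is empty or headed by this thread
        if (holder == null && (queue.isEmpty() || queue.peekFirst().equals(id))) {
            queue.pollFirst();
            timeouts.remove(id);
            holder = id;
            holdCount = 1;
            lockExpiresAt = now + leaseMs;
            return null;
        }
        // 3. Reentrancy check
        if (id.equals(holder)) {
            holdCount++;
            lockExpiresAt = now + leaseMs;
            return null;
        }
        // 4. Already queued: return this thread's remaining TTL
        Long own = timeouts.get(id);
        if (own != null) {
            return own - waitMs - now;
        }
        // 5. TTL of the tail node, or of the lock itself when nobody is queued
        //    (the tail cannot be this thread, because step 4 returned for queued threads)
        String last = queue.peekLast();
        long ttl = (last != null) ? timeouts.get(last) - now : lockExpiresAt - now;
        // 6. Queue at the tail
        timeouts.put(id, ttl + waitMs + now);
        queue.addLast(id);
        return ttl;
    }

    void release(String id) {
        if (id.equals(holder) && --holdCount == 0) {
            holder = null;
        }
    }

    public static void main(String[] args) {
        FairLockModel lock = new FairLockModel();
        System.out.println(lock.tryAcquire("t1", 30_000, 5_000, 0));      // lock is free
        System.out.println(lock.tryAcquire("t2", 30_000, 5_000, 10_000)); // queued behind t1
        lock.release("t1");
        System.out.println(lock.tryAcquire("t2", 30_000, 5_000, 30_000)); // head of the queue
    }
}
```

A thread that is not at the head of the queue gets a TTL back even when the lock is free, which is what makes the lock fair in this model.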
2. Simulation
Walking through the following sequence shows the whole process of the Redisson fair lock.
Suppose T1 = 10:00:00 < T2 = 10:00:10 < T3 = 10:00:20
T1: thread 1 acquires the lock for the first time
1. The wait queue has no head node, so break out of the loop -> 2
2. The condition "the lock does not exist && no wait queue exists" holds
2.1 lpop, zrem and zincrby are all no-ops here; only the locking takes effect, which shows this is the first acquisition. nil is returned after locking
Locking succeeds and thread 1 holds the lock
T2: thread 2 attempts to acquire the lock (thread 1 has not released it)
1. The wait queue has no head node, so break out of the loop -> 2
2. The "lock does not exist" condition fails -> 3
3. Not a reentrant acquisition -> 4
4. The score has no value -> 5
5. The tail node is empty, so the initial TTL is the TTL of lock_name -> 6
6. Set the zSet timeout score to TTL + waitTime + currentTime + currentTime and join the wait queue; thread 2 becomes the head node
score = 20 s + 5000 ms + 10:00:10 + 10:00:10 = 10:00:35 + 10:00:10
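The arithmetic for thread 2 can be checked with a few lines of Java. This is a sketch under stated assumptions: the class `ScoreArithmetic` and its helpers are hypothetical, and milliseconds since midnight stand in for the real `System.currentTimeMillis()` values purely to keep the numbers readable.

```java
import java.time.LocalTime;

// Hypothetical helper that evaluates the score formula from step 6 for thread 2.
class ScoreArithmetic {
    // Milliseconds since midnight for an "HH:mm:ss" string (illustrative time base only).
    static long ms(String hhmmss) {
        return LocalTime.parse(hhmmss).toSecondOfDay() * 1000L;
    }

    // score = TTL + waitTime + currentTime + currentTime
    static long score(long ttlMs, long waitMs, long nowMs) {
        return ttlMs + waitMs + nowMs + nowMs;
    }

    public static void main(String[] args) {
        long now = ms("10:00:10");
        long score = score(20_000L, 5_000L, now);
        // Compare against the right-hand side written above: 10:00:35 + 10:00:10
        System.out.println(score == ms("10:00:35") + ms("10:00:10")); // prints true
    }
}
```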
T3: thread 3 attempts to acquire the lock (thread 1 has not released it)
1. The wait queue has a head node
1.1 It has not expired -> 2
2. The "lock does not exist" condition fails -> 3
3. Not a reentrant acquisition -> 4
4. The score has no value -> 5
5. The tail node is not empty && the tail node is thread 2, not the current thread
5.1 Subtract currentTime from the tail node's score: TTL = score - currentTime -> 6
6. Set the zSet timeout score to TTL + waitTime + currentTime + currentTime and join the wait queue
score = 10 s + 5000 ms + 10:00:20 + 10:00:20 = 10:00:35 + 10:00:20
The three threads competing for the lock have now formed a queue: the IDs of the waiting threads are kept in a list, and their expiration times are stored in a zSet (used for ordering). The clients of thread 2 and thread 3, having received a TTL, re-run the Lua script at intervals to retry locking, which has the same effect as AQS.
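The client-side retry described above might look roughly like the following Java sketch. It is not Redisson's implementation. The class `FairLockClientLoop` is hypothetical, `attempt` stands in for one execution of the Lua script (null means the lock was acquired, any other value is the returned TTL), and `unlockSignal` stands in for the arrival of an unlock message.

```java
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;

// Hypothetical retry loop: re-run the script until it returns null or time runs out.
class FairLockClientLoop {
    static boolean lockWithRetry(Supplier<Long> attempt, Semaphore unlockSignal, long maxWaitMs) {
        long deadline = System.currentTimeMillis() + maxWaitMs;
        while (true) {
            Long ttl = attempt.get();   // one run of the locking script
            if (ttl == null) {
                return true;            // lock acquired
            }
            long remaining = deadline - System.currentTimeMillis();
            if (remaining <= 0) {
                return false;           // gave up waiting
            }
            try {
                // Sleep until an unlock signal arrives or the TTL elapses, whichever is first
                long pause = Math.max(0, Math.min(ttl, remaining));
                unlockSignal.tryAcquire(pause, TimeUnit.MILLISECONDS);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt(); // preserve the interrupt for the caller
                return false;
            }
        }
    }

    public static void main(String[] args) {
        // Always "busy" with a 10 ms TTL, so the loop gives up after about 50 ms
        boolean acquired = lockWithRetry(() -> 10L, new Semaphore(0), 50);
        System.out.println(acquired);
    }
}
```

Waiting on a signal with the TTL as an upper bound means a waiter wakes up early when the lock is released, and still retries on its own if no message arrives.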
Thread 1 releases the lock (the unlock message is published to the other threads via Pub/Sub)
10:00:30: thread 2 attempts to acquire the lock (thread 1 has released it)
1. The wait queue has a head node, and it has not expired -> 2
2. The condition "the lock does not exist && the head of the wait queue is the current thread" holds
2.1 Delete the current thread's entries from the queue and the zSet. The timeouts are:
Thread 2: 10:00:35 + 10:00:10 - 10:00:30 = 10:00:15
Thread 3: 10:00:35 + 10:00:20 - 10:00:30 = 10:00:25
2.2 Thread 2 takes the lock and resets the expiration time
Locking succeeds and thread 2 holds the lock
The queuing structure is shown in the figure.
The release script of the fair lock is similar to that of the reentrant lock. It adds, at the start of the script, the same while true loop that clears expired keys, so it is not described again here.
As can be seen from the above, the Redisson fair lock works much like a delay queue. Its core is the combination of the Redis List and zSet structures, but it also borrows from the AQS implementation and has a similar periodic check of the head node (watchDog), which ensures both fair competition for the lock and mutual exclusion. Under concurrency, the zSet score in the Lua script solves the problems of ordered insertion and prioritization. In addition, so that threads which exited abnormally do not linger in the queue, every request checks whether the head node has expired and removes it if so. On final release, the subscribed threads are notified through the channel to acquire the lock, and the initial steps repeat, handing the lock over smoothly to the next thread in order.
VI. Summary
Redisson's overall implementation of distributed locking and unlocking is somewhat complex. Its author, Rui Gu, has studied Netty, JUC and Redis in depth and uses many advanced features and semantics, which makes it worth further study. This article covers only the lock implementation on a single Redis instance. Redisson also provides MultiLock for the multi-instance case, as well as the officially recommended RedLock; more on those in the next chapter.
So when you really need distributed locks, you might want to look in Redisson first.