It is impossible to make one’s whole life comfortable and happy, because a person must have an attitude for coping with adversity. (Rousseau)
Author: Mahua, an ordinary programmer in the capital. Originally published on the official account “source interest circle”
Preface
When we implement a distributed lock ourselves, we usually consider only the key points and the performance aspects, which may not be very comprehensive. Redisson is the best in the industry when it comes to distributed locks
However, whether to introduce it depends on your own project. If you only need the distributed lock feature, I do not think it is necessary, since you can implement that yourself; if you rely heavily on Redisson’s distributed functionality, it is worth introducing
Before reading the Redisson source code, you need to understand where distributed locks come from and the advantages and disadvantages of each custom implementation. Click the link to view
ReentrantLock
Before we talk about Redisson, let’s talk about the JDK’s ReentrantLock
ReentrantLock ensures that only a single thread can operate on a shared resource within one JVM at any one time
Implementation approach
ReentrantLock’s internal fair lock and non-fair lock both inherit from AQS [AbstractQueuedSynchronizer]
1. AQS uses an int variable, state, declared volatile, to control thread safety and lock reentrancy under concurrency
2. Threads that fail to acquire the lock are placed in the AQS queue; they are suspended with LockSupport#park and woken up with LockSupport#unpark
See the linked article for the principles of AQS and ReentrantLock
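To make the reentrancy counter concrete, here is a minimal sketch that uses only the JDK. The class and method names (ReentrantDemo, nestedHoldCount) are made up for this illustration; getHoldCount() reports how many times the current thread holds the lock, which is a convenient way to observe the counter described above.

```java
import java.util.concurrent.locks.ReentrantLock;

class ReentrantDemo {
    // Returns the hold count seen while the same thread holds the lock twice.
    static int nestedHoldCount(ReentrantLock lock) {
        lock.lock();                  // first acquisition by this thread
        try {
            lock.lock();              // re-entry by the owner thread, no blocking
            try {
                return lock.getHoldCount();
            } finally {
                lock.unlock();        // undo the re-entry
            }
        } finally {
            lock.unlock();            // undo the first acquisition, lock is free again
        }
    }

    public static void main(String[] args) {
        ReentrantLock lock = new ReentrantLock(); // non-fair by default
        System.out.println(nestedHoldCount(lock)); // 2
        System.out.println(lock.isLocked());       // false
    }
}
```

Every lock() must be paired with an unlock(); only when the count drops back to zero can another thread take the lock.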
Redisson
If you’re not familiar with Redisson, check out the WIKI directory of the Redisson project on GitHub to see how Redisson has armed Redis to the teeth
Here’s a look at the parts relevant to this article
The project introduction is very well written. From its first paragraph alone we learn the answers to two questions
What is Redisson
Redisson is a Java in-memory data grid (In-Memory Data Grid) built on Redis. It makes full use of the advantages of the Redis key-value database and, based on the common interfaces in the Java utility toolkit, provides users with a series of common tool classes that have distributed characteristics
The advantage of Redisson
As a result, a toolkit that was originally used to coordinate multithreaded concurrent programs on a single machine gains the ability to coordinate multithreaded concurrent systems across multiple machines, which greatly reduces the difficulty of designing and developing large-scale distributed systems
At the same time, combined with various distributed services, it further simplifies cooperation between programs in a distributed environment
That’s about it; I won’t expand further. If you want to know more about what it can be used for, check out the WIKI directory mentioned above
Redisson reentrant lock
Because Redisson is so complex, and most of its API calls go through Netty, the analysis here covers only how locking works, how reentrancy is implemented, and how the lock is renewed
Create a lock
I have downloaded Redisson’s source code locally
This simple program uses Redisson to create a non-fair reentrant lock
The lock() method has a default expiration time of 30 seconds and supports the “watchdog” renewal function
import org.redisson.Redisson;
import org.redisson.api.RLock;
import org.redisson.api.RedissonClient;
import org.redisson.config.Config;

public static void main(String[] args) {
    Config config = new Config();
    config.useSingleServer()
            .setPassword("123456")
            .setAddress("redis://127.0.0.1:6379");
    RedissonClient redisson = Redisson.create(config);
    RLock lock = redisson.getLock("myLock");
    // Acquire before try, so unlock() only runs if the lock is actually held
    lock.lock();
    try {
        // Business logic
    } finally {
        lock.unlock();
    }
}
Let’s first look at the declaration of the RLock interface
public interface RLock extends Lock, RLockAsync {}
RLock inherits the Lock interface from the JDK’s JUC package, as well as RLockAsync
As the name suggests, RLockAsync is the asynchronous side of the lock: it shows that the lock can also be acquired asynchronously
Anyone who has read Redisson’s source code knows that comments there are scarcer than gold 🙃️
Because there are many APIs for acquiring the lock, we will use lock() for the source code walkthrough. The interface definition is quite simple
/**
 * Acquires the lock without specifying an expiration time; the default is 30 seconds.
 * If the lock is acquired, it is renewed automatically.
 */
void lock();
Get lock instance
Based on the small demo above, how is the lock instance obtained in step 1?
RLock lock = redisson.getLock("myLock");

// name is the lock name
public RLock getLock(String name) {
    // Uses the synchronous executor by default (asynchronous executors also exist;
    // acquiring and releasing a lock require strong consistency, so the default is synchronous)
    return new RedissonLock(connectionManager.getCommandExecutor(), name);
}
All Redis commands in Redisson are executed through …Executor implementations
After the default synchronous executor is obtained, RedissonLock is initialized
public RedissonLock(CommandAsyncExecutor commandExecutor, String name) {
    super(commandExecutor, name);
    this.commandExecutor = commandExecutor;
    // Unique ID
    this.id = commandExecutor.getConnectionManager().getId();
    // Lock lease time, initialized from the watchdog timeout
    this.internalLockLeaseTime = commandExecutor.getConnectionManager().getCfg().getLockWatchdogTimeout();
    // ID + lock name
    this.entryName = id + ":" + name;
    // Publish/subscribe, used later when locking and unlocking
    this.pubSub = commandExecutor.getConnectionManager().getSubscribeService().getLockPubSub();
}
Attempt to acquire a lock
How does RLock#lock() acquire the lock?
@Override
public void lock() {
    try {
        lock(-1, null, false);
    } catch (InterruptedException e) {
        throw new IllegalStateException();
    }
}
leaseTime: the lock expiration time; -1 means use the default of 30 seconds
unit: the time unit (milliseconds, seconds, minutes, hours…)
interruptibly: whether waiting for the lock can be interrupted
private void lock(long leaseTime, TimeUnit unit, boolean interruptibly) throws InterruptedException {
    // Get the current thread ID
    long threadId = Thread.currentThread().getId();
    // 🚩 Try to acquire the lock
    Long ttl = tryAcquire(-1, leaseTime, unit, threadId);
    // A null TTL means the lock was acquired
    if (ttl == null) {
        return;
    }
    // Subscribe to the lock's channel so that we are notified when it is unlocked
    RFuture<RedissonLockEntry> future = subscribe(threadId);
    if (interruptibly) {
        commandExecutor.syncSubscriptionInterrupted(future);
    } else {
        commandExecutor.syncSubscription(future);
    }
    try {
        while (true) {
            // Try to acquire the lock again
            ttl = tryAcquire(-1, leaseTime, unit, threadId);
            // A null TTL means the lock was acquired; leave the loop
            if (ttl == null) {
                break;
            }
            // The remaining TTL is non-negative: wait for at most that long
            if (ttl >= 0) {
                try {
                    // Block here on a Semaphore; unlocking releases the Semaphore as a notification
                    future.getNow().getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS);
                } catch (InterruptedException e) {
                    if (interruptibly) {
                        throw e;
                    }
                    future.getNow().getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS);
                }
            // The remaining TTL is negative: wait without a timeout
            } else {
                if (interruptibly) {
                    future.getNow().getLatch().acquire();
                } else {
                    future.getNow().getLatch().acquireUninterruptibly();
                }
            }
        }
    } finally {
        // Unsubscribe
        unsubscribe(future, threadId);
    }
}
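The waiting part of that loop can be imitated with a plain java.util.concurrent.Semaphore. This is only a sketch of the idea, not Redisson’s code: WaitNotifySketch, onUnlockMessage and awaitUnlock are hypothetical names, and the pub/sub unlock message is replaced by a direct method call.

```java
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;

class WaitNotifySketch {
    // Starts with zero permits, so a waiter blocks until someone releases one.
    private final Semaphore latch = new Semaphore(0);

    // Stand-in for the listener that fires when an unlock message arrives.
    void onUnlockMessage() {
        latch.release();
    }

    // Waits at most ttlMillis for an unlock notification; true if one arrived.
    boolean awaitUnlock(long ttlMillis) {
        try {
            return latch.tryAcquire(ttlMillis, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // keep the interrupt flag for the caller
            return false;
        }
    }

    public static void main(String[] args) {
        WaitNotifySketch entry = new WaitNotifySketch();
        System.out.println(entry.awaitUnlock(50)); // false: nobody unlocked within 50 ms
        entry.onUnlockMessage();
        System.out.println(entry.awaitUnlock(50)); // true: a permit had been released
    }
}
```

Waiting with the remaining TTL as the timeout means a waiter retries either when it is notified or when the current holder’s lease would have expired anyway.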
The following line is what performs the locking, so let’s move on to its implementation
Long ttl = tryAcquire(-1, leaseTime, unit, threadId);

private Long tryAcquire(long waitTime, long leaseTime, TimeUnit unit, long threadId) {
    return get(tryAcquireAsync(waitTime, leaseTime, unit, threadId));
}
Both lock() and tryLock(…) eventually call this method, which splits into two branches
1. tryLock(…) with a leaseTime: lock asynchronously and return
2. lock() and tryLock() without a leaseTime: lock asynchronously, then keep renewing the lock
private <T> RFuture<Long> tryAcquireAsync(long waitTime, long leaseTime, TimeUnit unit, long threadId) {
    // Calls such as tryLock(...) that pass a leaseTime enter this branch
    if (leaseTime != -1) {
        // Acquire the lock asynchronously
        return tryLockInnerAsync(waitTime, leaseTime, unit, threadId, RedisCommands.EVAL_LONG);
    }
    // Try to acquire the lock asynchronously. On success null is returned;
    // otherwise the remaining expiration time of the lock is returned
    RFuture<Long> ttlRemainingFuture = tryLockInnerAsync(waitTime,
            commandExecutor.getConnectionManager().getCfg().getLockWatchdogTimeout(),
            TimeUnit.MILLISECONDS, threadId, RedisCommands.EVAL_LONG);
    // Triggered once ttlRemainingFuture completes
    ttlRemainingFuture.onComplete((ttlRemaining, e) -> {
        if (e != null) {
            return;
        }
        // ttlRemaining == null means the lock was acquired
        // Start renewing the lock after acquiring it
        if (ttlRemaining == null) {
            scheduleExpirationRenewal(threadId);
        }
    });
    return ttlRemainingFuture;
}
Next, look at the detailed locking process in tryLockInnerAsync(…); it uses a Lua script internally to make the operation atomic
At this point it should be clear that the Lua script is wrapped by Redisson and then sent to Redis via Netty
<T> RFuture<T> tryLockInnerAsync(long waitTime, long leaseTime, TimeUnit unit, long threadId, RedisStrictCommand<T> command) {
    internalLockLeaseTime = unit.toMillis(leaseTime);
    return evalWriteAsync(getName(), LongCodec.INSTANCE, command,
            "if (redis.call('exists', KEYS[1]) == 0) then " +
                    "redis.call('hincrby', KEYS[1], ARGV[2], 1); " +
                    "redis.call('pexpire', KEYS[1], ARGV[1]); " +
                    "return nil; " +
                    "end; " +
                    "if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " +
                    "redis.call('hincrby', KEYS[1], ARGV[2], 1); " +
                    "redis.call('pexpire', KEYS[1], ARGV[1]); " +
                    "return nil; " +
                    "end; " +
                    "return redis.call('pttl', KEYS[1]);",
            Collections.singletonList(getName()), internalLockLeaseTime, getLockName(threadId));
}
evalWriteAsync(…) executes the Eval command and involves Netty; we will not follow it any further
Lock Lua script
Here is the Redis locking Lua script, with its parameters and what they mean
KEYS[1]: myLock
ARGV[1]: 36000… the expiration time, in milliseconds (the value from my own test)
ARGV[2]: UUID + thread ID
-- KEYS[1] is myLock
-- Check whether KEYS[1] exists: returns 1 if it does, 0 if it does not
if (redis.call('exists', KEYS[1]) == 0) then
    -- KEYS[1] does not exist
    -- hincrby finds that KEYS[1] does not exist and creates a hash
    -- ARGV[2] becomes the first key in the hash, with value 1
    -- hincrby myLock 91089b45... 1
    redis.call('hincrby', KEYS[1], ARGV[2], 1);
    -- Set the expiration time of KEYS[1], in milliseconds
    redis.call('pexpire', KEYS[1], ARGV[1]);
    return nil;
end;
-- Check whether key ARGV[2] exists in the hash KEYS[1]: returns 1 if it does
if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then
    -- Increment the value stored under ARGV[2] by 1
    redis.call('hincrby', KEYS[1], ARGV[2], 1);
    -- Same as above
    redis.call('pexpire', KEYS[1], ARGV[1]);
    return nil;
end;
-- Return the remaining expiration time of KEYS[1], in milliseconds
return redis.call('pttl', KEYS[1]);
The whole Lua script lock process is illustrated as follows:
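In code form, the same flow can be traced with a small in-memory model. This is a sketch for following the three branches of the script, not a Redis client: LockScriptSketch and its fields are hypothetical, the owner string stands for ARGV[2] (UUID + thread ID), and the clock is passed in as nowMillis so the result is deterministic.

```java
import java.util.HashMap;
import java.util.Map;

class LockScriptSketch {
    // key -> (owner -> reentrancy count); stands in for the Redis hash
    private final Map<String, Map<String, Integer>> store = new HashMap<>();
    // key -> absolute expiry time in ms; stands in for the key's TTL
    private final Map<String, Long> expiresAt = new HashMap<>();

    // Returns null when the lock is acquired, otherwise the remaining TTL in ms.
    synchronized Long tryLock(String key, String owner, long leaseMillis, long nowMillis) {
        Long expiry = expiresAt.get(key);
        if (expiry != null && expiry <= nowMillis) {      // emulate Redis dropping an expired key
            store.remove(key);
            expiresAt.remove(key);
        }
        Map<String, Integer> hash = store.get(key);
        if (hash == null) {                               // exists == 0
            hash = new HashMap<>();
            hash.put(owner, 1);                           // hincrby key owner 1
            store.put(key, hash);
            expiresAt.put(key, nowMillis + leaseMillis);  // pexpire
            return null;
        }
        if (hash.containsKey(owner)) {                    // hexists == 1: re-entry by the owner
            hash.merge(owner, 1, Integer::sum);           // hincrby key owner 1
            expiresAt.put(key, nowMillis + leaseMillis);  // pexpire
            return null;
        }
        return expiresAt.get(key) - nowMillis;            // pttl: held by someone else
    }

    synchronized int holdCount(String key, String owner) {
        Map<String, Integer> hash = store.get(key);
        return hash == null ? 0 : hash.getOrDefault(owner, 0);
    }

    public static void main(String[] args) {
        LockScriptSketch redis = new LockScriptSketch();
        System.out.println(redis.tryLock("myLock", "uuid:1", 30_000, 0));     // null: acquired
        System.out.println(redis.tryLock("myLock", "uuid:1", 30_000, 1_000)); // null: re-entered
        System.out.println(redis.tryLock("myLock", "uuid:2", 30_000, 5_000)); // 26000: held by uuid:1
    }
}
```

The synchronized keyword plays the role that the Lua script plays in Redis: the check and the update happen as one step.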
Now let’s go back to how the lock is renewed once it has been acquired
Lock renewal
I have talked about this topic with my friends before, and the idea is basically the same as the one Redisson implements
Let’s look at Redisson’s approach, which it calls the “watchdog”
1. Start the “watchdog” process after the lock is acquired
2. Use Netty’s Timeout to implement the scheduled delay
3. For example, if the lock expires after 30 seconds, the watchdog checks whether the lock still exists every 1/3 of that time, that is, every 10 seconds, and resets the lock’s expiration time if it does
What if the check reports that the lock exists, but the lock is released just as the new expiration time is being set?
Asking this question shows you have really thought through the possible scenarios, but there is no need to worry
Redisson uses a Lua script, so the check and the expiration update are atomic, and this situation cannot occur
If you do not want to depend on Netty, you can also implement the “watchdog” with tools such as a delay queue
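As one way to do this without Netty, the sketch below uses the JDK’s ScheduledExecutorService in place of Netty’s timer (my substitution, not what Redisson does). WatchdogSketch is a hypothetical class, and the renew callback is an assumption: it stands for the atomic “check that the lock exists and extend its expiration” step and returns whether the lock is still held.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.BooleanSupplier;

class WatchdogSketch {
    private final ScheduledExecutorService timer = Executors.newSingleThreadScheduledExecutor();
    private volatile boolean cancelled;

    // The check runs after one third of the lease time, e.g. 30 s -> every 10 s.
    static long renewalDelayMillis(long leaseMillis) {
        return leaseMillis / 3;
    }

    // Arms one delayed check; the check re-arms itself while the lock is still held.
    void schedule(long leaseMillis, BooleanSupplier renew) {
        if (cancelled) {
            return;
        }
        timer.schedule(() -> {
            if (!cancelled && renew.getAsBoolean()) {
                schedule(leaseMillis, renew);
            }
        }, renewalDelayMillis(leaseMillis), TimeUnit.MILLISECONDS);
    }

    // Called on unlock: stop renewing.
    void cancel() {
        cancelled = true;
        timer.shutdown();
    }

    // Demo with a fake lock that "disappears" on the third check, so three checks run.
    static int runDemo() {
        WatchdogSketch dog = new WatchdogSketch();
        AtomicInteger checks = new AtomicInteger();
        CountDownLatch done = new CountDownLatch(1);
        dog.schedule(30, () -> {
            boolean stillHeld = checks.incrementAndGet() < 3;
            if (!stillHeld) {
                done.countDown();
            }
            return stillHeld;
        });
        try {
            done.await(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } finally {
            dog.cancel();
        }
        return checks.get();
    }

    public static void main(String[] args) {
        System.out.println(renewalDelayMillis(30_000)); // 10000
        System.out.println(runDemo());                  // 3
    }
}
```

Each check schedules the next one only after it has succeeded, so renewal stops on its own once the lock is gone.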
Here is the relevant code as well, so that readers can see more directly how the lock is renewed
RedissonLock#tryAcquireAsync(…)
private <T> RFuture<Long> tryAcquireAsync(long waitTime, long leaseTime, TimeUnit unit, long threadId) {
    // ...
    // Try to acquire the lock asynchronously. On success null is returned;
    // otherwise the remaining expiration time of the lock is returned
    RFuture<Long> ttlRemainingFuture = tryLockInnerAsync(waitTime,
            commandExecutor.getConnectionManager().getCfg().getLockWatchdogTimeout(),
            TimeUnit.MILLISECONDS, threadId, RedisCommands.EVAL_LONG);
    // Triggered once ttlRemainingFuture completes
    ttlRemainingFuture.onComplete((ttlRemaining, e) -> {
        if (e != null) {
            return;
        }
        // Start renewing the lock after acquiring it
        if (ttlRemaining == null) {
            scheduleExpirationRenewal(threadId);
        }
    });
    return ttlRemainingFuture;
}
You can see that the renewal method takes threadId as the identifier
private void scheduleExpirationRenewal(long threadId) {
    ExpirationEntry entry = new ExpirationEntry();
    ExpirationEntry oldEntry = EXPIRATION_RENEWAL_MAP.putIfAbsent(getEntryName(), entry);
    if (oldEntry != null) {
        oldEntry.addThreadId(threadId);
    } else {
        entry.addThreadId(threadId);
        renewExpiration();
    }
}
It’s good to know the core idea, no need to study every line of code
private void renewExpiration() {
    ExpirationEntry ee = EXPIRATION_RENEWAL_MAP.get(getEntryName());
    if (ee == null) {
        return;
    }
    Timeout task = commandExecutor.getConnectionManager().newTimeout(new TimerTask() {
        @Override
        public void run(Timeout timeout) throws Exception {
            ExpirationEntry ent = EXPIRATION_RENEWAL_MAP.get(getEntryName());
            if (ent == null) {
                return;
            }
            Long threadId = ent.getFirstThreadId();
            if (threadId == null) {
                return;
            }
            RFuture<Boolean> future = renewExpirationAsync(threadId);
            future.onComplete((res, e) -> {
                if (e != null) {
                    log.error("Can't update lock " + getName() + " expiration", e);
                    return;
                }
                if (res) {
                    // Schedule itself again
                    renewExpiration();
                }
            });
        }
    }, internalLockLeaseTime / 3, TimeUnit.MILLISECONDS);
    ee.setTimeout(task);
}
Unlock operation
The operation of unlocking is relatively simple
@Override
public void unlock() {
    try {
        get(unlockAsync(Thread.currentThread().getId()));
    } catch (RedisException e) {
        if (e.getCause() instanceof IllegalMonitorStateException) {
            throw (IllegalMonitorStateException) e.getCause();
        } else {
            throw e;
        }
    }
}
After the unlock succeeds, the “watchdog” Timeout started earlier is cancelled and success is returned
@Override
public RFuture<Void> unlockAsync(long threadId) {
    RPromise<Void> result = new RedissonPromise<Void>();
    RFuture<Boolean> future = unlockInnerAsync(threadId);
    future.onComplete((opStatus, e) -> {
        // Cancel the automatic renewal
        cancelExpirationRenewal(threadId);
        if (e != null) {
            // Failed
            result.tryFailure(e);
            return;
        }
        if (opStatus == null) {
            IllegalMonitorStateException cause = new IllegalMonitorStateException("attempt to unlock lock, not locked by current thread by node id: "
                    + id + " thread-id: " + threadId);
            result.tryFailure(cause);
            return;
        }
        // Unlocked successfully
        result.trySuccess(null);
    });
    return result;
}
Another highlight: the Lua script definition for unlocking
protected RFuture<Boolean> unlockInnerAsync(long threadId) {
    return evalWriteAsync(getName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,
            "if (redis.call('hexists', KEYS[1], ARGV[3]) == 0) then " +
                    "return nil;" +
                    "end; " +
                    "local counter = redis.call('hincrby', KEYS[1], ARGV[3], -1); " +
                    "if (counter > 0) then " +
                    "redis.call('pexpire', KEYS[1], ARGV[2]); " +
                    "return 0; " +
                    "else " +
                    "redis.call('del', KEYS[1]); " +
                    "redis.call('publish', KEYS[2], ARGV[1]); " +
                    "return 1; " +
                    "end; " +
                    "return nil;",
            Arrays.asList(getName(), getChannelName()), LockPubSub.UNLOCK_MESSAGE, internalLockLeaseTime, getLockName(threadId));
}
Let’s analyze the Lua script in detail
Unlock Lua script
Same routine as before: the parameters and their meanings first
KEYS[1]: myLock
KEYS[2]: redisson_lock_channel:{myLock}
ARGV[1]: 0
ARGV[2]: 360000… (Expiration time)
ARGV[3]: 7f0c54e2… (Lock Key in Hash)
-- Check whether key ARGV[3] exists in the hash KEYS[1]
if (redis.call('hexists', KEYS[1], ARGV[3]) == 0) then
    return nil;
end;
-- Decrement the value stored under ARGV[3] in KEYS[1] by 1
local counter = redis.call('hincrby', KEYS[1], ARGV[3], -1);
-- A result greater than 0 means the lock was re-entered and is still held
if (counter > 0) then
    -- Reset the expiration time
    redis.call('pexpire', KEYS[1], ARGV[2]);
    return 0;
else
    -- Delete KEYS[1]
    redis.call('del', KEYS[1]);
    -- Publish the unlock message to the channel KEYS[2]
    redis.call('publish', KEYS[2], ARGV[1]);
    return 1;
end;
return nil;
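The three possible results of the unlock script (nil, 0, 1) can be followed in a small in-memory model. This is only a sketch: UnlockScriptSketch is a hypothetical class that models the hash of a single lock key, its lock() helper skips the ownership checks of the real locking script, and a counter stands in for the publish call.

```java
import java.util.HashMap;
import java.util.Map;

class UnlockScriptSketch {
    // owner -> reentrancy count; models the hash stored under one lock key
    private final Map<String, Integer> hash = new HashMap<>();
    // number of unlock messages that would have been published
    int published;

    // Simplified lock step, only here to set up state for unlock().
    void lock(String owner) {
        hash.merge(owner, 1, Integer::sum);
    }

    // null: the caller does not hold the lock; 0: still held after a re-entry; 1: fully released.
    Integer unlock(String owner) {
        if (!hash.containsKey(owner)) {                     // hexists == 0
            return null;
        }
        int counter = hash.merge(owner, -1, Integer::sum);  // hincrby key owner -1
        if (counter > 0) {
            return 0;                                       // the real script also refreshes the TTL here
        }
        hash.remove(owner);                                 // del KEYS[1]
        published++;                                        // publish the unlock message
        return 1;
    }

    public static void main(String[] args) {
        UnlockScriptSketch redis = new UnlockScriptSketch();
        redis.lock("uuid:1");
        redis.lock("uuid:1");                       // re-entry, count is now 2
        System.out.println(redis.unlock("uuid:2")); // null: not the owner
        System.out.println(redis.unlock("uuid:1")); // 0: one level released, still held
        System.out.println(redis.unlock("uuid:1")); // 1: released and published
    }
}
```

The null result is what unlockAsync turns into the IllegalMonitorStateException shown earlier.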
Redlock algorithm
Admittedly, Redisson’s distributed lock design is excellent, but it still does not solve the problem of locks being lost because data is replicated asynchronously between master and replica nodes
Therefore Antirez, the author of Redis, introduced the Redlock algorithm. The essence of this algorithm is that there are no replica nodes: if multiple Redis instances are deployed, each instance is independent of the others, with no master-replica replication or any other cluster coordination mechanism
How to use
Create multiple Redisson clients, which together form one complete distributed lock
public static void main(String[] args) {
    String lockKey = "myLock";
    Config config = new Config();
    config.useSingleServer().setPassword("123456").setAddress("redis://127.0.0.1:6379");
    Config config2 = new Config();
    config2.useSingleServer().setPassword("123456").setAddress("redis://127.0.0.1:6380");
    Config config3 = new Config();
    config3.useSingleServer().setPassword("123456").setAddress("redis://127.0.0.1:6381");
    RLock lock = Redisson.create(config).getLock(lockKey);
    RLock lock2 = Redisson.create(config2).getLock(lockKey);
    RLock lock3 = Redisson.create(config3).getLock(lockKey);
    RedissonRedLock redLock = new RedissonRedLock(lock, lock2, lock3);
    // Acquire before try, so unlock() only runs if the lock is actually held
    redLock.lock();
    try {
        // Business logic
    } finally {
        redLock.unlock();
    }
}
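The core rule of the Redlock algorithm, as Antirez describes it, is that the lock counts as acquired only when a majority of the independent instances (N/2 + 1) grant it. The sketch below shows just that counting step. RedLockSketch and the tryLockOn callback are hypothetical names, and the other parts of the algorithm (checking the elapsed time against the lock’s validity time, releasing on all instances after a failed attempt) are deliberately left out.

```java
import java.util.Arrays;
import java.util.List;
import java.util.function.Predicate;

class RedLockSketch {
    // Minimum number of instances that must grant the lock: N/2 + 1.
    static int quorum(int instances) {
        return instances / 2 + 1;
    }

    // tryLockOn is an assumption: it reports whether one independent instance granted the lock.
    static <T> boolean tryLockAll(List<T> instances, Predicate<T> tryLockOn) {
        int granted = 0;
        for (T instance : instances) {
            if (tryLockOn.test(instance)) {
                granted++;
            }
        }
        return granted >= quorum(instances.size());
    }

    public static void main(String[] args) {
        List<String> nodes = Arrays.asList("6379", "6380", "6381");
        System.out.println(quorum(nodes.size()));                        // 2
        System.out.println(tryLockAll(nodes, n -> !n.equals("6380")));   // true: 2 of 3 granted
        System.out.println(tryLockAll(nodes, n -> n.equals("6380")));    // false: only 1 of 3 granted
    }
}
```

With three instances, one of them can be down or already locked by someone else and the lock can still be acquired.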
Of course, the Redlock algorithm is not beyond question; you can go to the Redis website to read the debate between Martin Kleppmann and Antirez, the author of Redis
Trade-offs under the CAP principle
The CAP principle, also known as the CAP theorem, concerns three properties of a distributed system: Consistency, Availability and Partition tolerance, of which at most two can be fully guaranteed at the same time
Consistency (C): whether all replicas of the data in a distributed system hold the same value at the same moment (equivalent to all nodes accessing the same, latest copy of the data)
Availability (A): whether the cluster as a whole can still respond to clients’ read and write requests after some nodes fail (high availability for data updates)
Partition tolerance (P): in practical terms, a partition amounts to a time limit on communication. If the system cannot reach data consistency within the time limit, a partition has occurred, and the system must choose between C and A for the current operation
Distributed lock selection
If a distributed lock must be strongly consistent, you can use a ZooKeeper distributed lock, because its underlying ZAB protocol (an atomic broadcast protocol) naturally satisfies CP
But this also means lower performance. So, without looking at concrete figures, choosing between Redis and ZooKeeper is a trade-off between performance and consistency
If the project does not rely heavily on ZK, Redis is good enough, because Redis is so widely used these days that most projects already depend on it
There is no need to introduce a new component just for this. If the business scenario cannot tolerate lock loss caused by Redis’s asynchronous replication, that can be handled in the business layer
Final words
I have recently been writing about multithreading source code; JUC source code analyses will follow
1. CountDownLatch
2. ThreadLocal
3
These, like the two recent distributed lock articles, are relatively long; I hope you will have the patience to read them
I hope you will give feedback and point out anything wrong or inaccurate in the article 🙏. Your support is the biggest encouragement for me, and finally I hope you will like, comment and tap “Wow”, all three!
Recommended reading:
- Essay | How to consume tasks quickly in a JDK thread pool without exceeding the maximum number of threads
- 10,000-word illustrated guide | A chat about ReentrantLock and AQS (if you can’t follow it, come find me)
- How to handle thread execution exceptions in the JDK thread pool?
- ParallelStream, a new Java 8 feature: use with caution