preface
Mainstream distributed locks are generally implemented in three ways:
- Optimistic database locking
- Distributed lock based on Redis
- Distributed lock based on ZooKeeper
I’ve blogged about mysql and Redis implementing distributed locks: www.cnblogs.com/wang-meng/p… It’s basically an implementation principle.
This [distributed lock] series of articles is mainly in-depth redis client Reddision source code and ZK these two distributed lock implementation principle.
reliability
First, to ensure that distributed locks are available, we need to ensure that at least four of the following conditions are met:
- Mutual exclusivity. Only one client can hold the lock at any time.
- Deadlocks do not occur. Even if one client crashes while holding the lock and does not unlock actively, it is guaranteed that subsequent clients can lock it.
- It has fault tolerance. Clients can lock and unlock as long as most Redis nodes are running properly.
- You must tie the bell. Lock and unlock must be the same client, the client can not unlock the lock added by others.
Redisson locking principle
Redisson is a very powerful open source Redis client framework, official address: redisson.org/
It’s easy to use. After configuring Maven and connection information, here’s the code:
RLock lock = redisson.getLock("anyLock");
lock.lock();
lock.unlock();
Copy the code
All of Redisson’s locking logic is done using Lua scripts, which ensure atomicity.
Let’s take a look at the RLock initialization code:
public class Redisson implements RedissonClient {
@Override
public RLock getLock(String name) {
return newRedissonLock(connectionManager.getCommandExecutor(), name); }}public class RedissonLock extends RedissonExpirable implements RLock {
public RedissonLock(CommandAsyncExecutor commandExecutor, String name) {
super(commandExecutor, name);
this.commandExecutor = commandExecutor;
this.id = commandExecutor.getConnectionManager().getId();
this.internalLockLeaseTime = commandExecutor.getConnectionManager().getCfg().getLockWatchdogTimeout();
this.entryName = id + ":" + name;
}
Copy the code
The RedissonLock id returns a UUID object. Each machine has its own ID attribute, with an ID value similar to “8743C9c0-0795-4907-87FD-6C719a6b4586”.
Moving on to the code implementation of lock() :
public class RedissonLock extends RedissonExpirable implements RLock {
@Override
public void lock(a) {
try {
lockInterruptibly();
} catch(InterruptedException e) { Thread.currentThread().interrupt(); }}@Override
public void lockInterruptibly(a) throws InterruptedException {
lockInterruptibly(-1.null);
}
@Override
public void lockInterruptibly(long leaseTime, TimeUnit unit) throws InterruptedException {
// Get the current thread ID
long threadId = Thread.currentThread().getId();
Long ttl = tryAcquire(leaseTime, unit, threadId);
// lock acquired
if (ttl == null) {
return;
}
RFuture<RedissonLockEntry> future = subscribe(threadId);
commandExecutor.syncSubscription(future);
try {
while (true) {
ttl = tryAcquire(leaseTime, unit, threadId);
// lock acquired
if (ttl == null) {
break;
}
// waiting for message
if (ttl >= 0) {
getEntry(threadId).getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS);
} else{ getEntry(threadId).getLatch().acquire(); }}}finally {
unsubscribe(future, threadId);
}
}
<T> RFuture<T> tryLockInnerAsync(long leaseTime, TimeUnit unit, long threadId, RedisStrictCommand<T> command) {
internalLockLeaseTime = unit.toMillis(leaseTime);
return commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, command,
"if (redis.call('exists', KEYS[1]) == 0) then " +
"redis.call('hset', KEYS[1], ARGV[2], 1); " +
"redis.call('pexpire', KEYS[1], ARGV[1]); " +
"return nil; " +
"end; " +
"if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " +
"redis.call('hincrby', KEYS[1], ARGV[2], 1); " +
"redis.call('pexpire', KEYS[1], ARGV[1]); " +
"return nil; " +
"end; " +
"return redis.call('pttl', KEYS[1]);", Collections.<Object>singletonList(getName()), internalLockLeaseTime, getLockName(threadId)); }}Copy the code
The tryAcquire() method is passed with an expiration time of -1, followed by the current thread ID, followed by the core lua script execution flow. Let’s take a step by step look at how this works:
"if (redis.call('exists', KEYS[1]) == 0) then " +
"redis.call('hset', KEYS[1], ARGV[2], 1); " +
"redis.call('pexpire', KEYS[1], ARGV[1]); " +
"return nil; " +
"end; " +
Copy the code
KEYS[1] parameter is: anyLock ARGV[2] Yes: id + “:” + threadId
First, use exists to determine whether the current key exists in redis. If not, it is equal to 0. Then run the hset command to store “anyLock ID :threadId 1” in Redis.
{
"8743c9c0-0795-4907-87fd-6c719a6b4586:1":1
}
Copy the code
And just for the record, the last one is for reentrant counting, which we’ll talk about later.
Look below, and then use pEXPIRE to set the expiration time, which defaults to 30 seconds with internalLockLeaseTime. The result is null, and the lock is successfully added.
Redisson’s reentrant principle
Let’s take a look at the lock key exists, the same machine, the same thread how to lock?
"if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " +
"redis.call('hincrby', KEYS[1], ARGV[2], 1); " +
"redis.call('pexpire', KEYS[1], ARGV[1]); " +
"return nil; " +
"end; " +
"return redis.call('pttl', KEYS[1]);".Copy the code
ARGV[2] is :” id + “:” + threadId “if the same thread requests again from the same machine, this will be 1, then execute hincrby, hset value+1 becomes 2, and then continue to set expiration time.
Similarly, after a thread reenters, value – 1 is unlocked
Redisson watchDog principle
If you have A scenario where A and B are running services, and A has A distributed lock, but the production environment is different, if in case A lock times out, but A’s services are still running. In this case, the lock of USER A is released due to timeout, user B obtains the lock, and user B executes the service logic. So distributed locking is meaningless, right?
Therefore, Redisson introduced the concept of Watch Dog. After A obtains the lock execution, if the lock does not expire, A background thread will automatically extend the lock expiration time to prevent the lock expiration due to incomplete service execution.
Let’s look at the implementation:
private <T> RFuture<Long> tryAcquireAsync(long leaseTime, TimeUnit unit, final long threadId) {
if(leaseTime ! = -1) {
return tryLockInnerAsync(leaseTime, unit, threadId, RedisCommands.EVAL_LONG);
}
RFuture<Long> ttlRemainingFuture = tryLockInnerAsync(commandExecutor.getConnectionManager().getCfg().getLockWatchdogTimeout(), TimeUnit.MILLISECONDS, threadId, RedisCommands.EVAL_LONG);
ttlRemainingFuture.addListener(new FutureListener<Long>() {
@Override
public void operationComplete(Future<Long> future) throws Exception {
if(! future.isSuccess()) {return;
}
Long ttlRemaining = future.getNow();
// lock acquired
if (ttlRemaining == null) { scheduleExpirationRenewal(threadId); }}});return ttlRemainingFuture;
}
Copy the code
When we’re done with tryLockInnerAsync, we’ll add a listener. Look at the implementation in the listener:
protected RFuture<Boolean> renewExpirationAsync(long threadId) {
return commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,
"if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " +
"redis.call('pexpire', KEYS[1], ARGV[1]); " +
"return 1; " +
"end; " +
"return 0;",
Collections.<Object>singletonList(getName()),
internalLockLeaseTime, getLockName(threadId));
}
Copy the code
In this case, the scheduling task is executed every 10 seconds. In the Lua script, the renewal expiration time is used so that the lock held by the current thread will not be invalid when the expiration time is reached
Redisson’s mutual exclusion principle
Lua lock: lua lock: lua lock
"return redis.call('pttl', KEYS[1]);".Copy the code
Returning how long the lock will expire, we continue to look at the code:
@Override
public void lockInterruptibly(long leaseTime, TimeUnit unit) throws InterruptedException {
long threadId = Thread.currentThread().getId();
Long ttl = tryAcquire(leaseTime, unit, threadId);
// If TTL is returned, the lock is successful. If it is not empty, the lock fails
if (ttl == null) {
return;
}
RFuture<RedissonLockEntry> future = subscribe(threadId);
commandExecutor.syncSubscription(future);
try {
// Loop endlessly to try to get the lock
while (true) {
// Try locking again
ttl = tryAcquire(leaseTime, unit, threadId);
// If TTL =null, the lock is preempted successfully
if (ttl == null) {
break;
}
// If the TTL is greater than 0, the lock fails to preempt
if (ttl >= 0) {
getEntry(threadId).getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS);
} else{ getEntry(threadId).getLatch().acquire(); }}}finally{ unsubscribe(future, threadId); }}Copy the code
Redisson lock release principle
Look directly at the Lua code:
protected RFuture<Boolean> unlockInnerAsync(long threadId) {
return commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,
// Check whether the lock key exists
"if (redis.call('exists', KEYS[1]) == 0) then " +
"redis.call('publish', KEYS[2], ARGV[1]); " +
"return 1; " +
"end;" +
// Check whether the key corresponding to the current machine and thread ID exists
"if (redis.call('hexists', KEYS[1], ARGV[3]) == 0) then " +
"return nil;" +
"end; " +
// Count -1 can be reentrant lock
"local counter = redis.call('hincrby', KEYS[1], ARGV[3], -1); " +
// If the counter is greater than 0, the lock is still held
"if (counter > 0) then " +
"redis.call('pexpire', KEYS[1], ARGV[2]); " +
"return 0; " +
"else " +
// Delete the key with the del command
"redis.call('del', KEYS[1]); " +
"redis.call('publish', KEYS[2], ARGV[1]); " +
"return 1; "+
"end; " +
"return nil;",
Arrays.<Object>asList(getName(), getChannelName()), LockPubSub.unlockMessage, internalLockLeaseTime, getLockName(threadId));
}
Copy the code
conclusion
Summary of a picture:
statement
This article first from my public number: a flower is not romantic, if reprinted please indicate the source!
Interested partners can pay attention to personal public account: a flower is not romantic