Goals

  • Redisson's distributed lock watchdog mechanism
  • Source code analysis

Redisson's distributed lock watchdog mechanism

In the previous article, we analyzed how Redisson acquires and releases locks. The overall logic is clear: the lock is acquired mainly by executing a Lua script asynchronously. However, some details were not covered, such as the internalLockLeaseTime field in RedissonLock. This field defaults to 30,000 milliseconds, and the corresponding setting in the Config class has an interesting name: lockWatchdogTimeout, the timeout of the "watchdog" that guards the lock. Let's dig into why it exists.

Suppose several service instances in a distributed environment compete for a lock, and service instance 1 acquires it. What happens if instance 1 crashes or freezes after acquiring the lock? If instance 1 goes down, the lock is still released once it expires. But what if instance 1 is still alive and its business logic simply takes longer than the lock's timeout? Then instance 1 is still executing while the lock has already been released, and other instances can grab it. A mechanism that extends the expiration time is therefore needed, and that is where the watchdog comes in. Let's look at the source code for the watchdog mechanism of Redisson's reentrant lock.
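To make the failure mode concrete, here is a minimal, deterministic Java sketch. It is not Redisson code: ExpiringLock and LeaseScenario are hypothetical classes that model a lock key with a 30-second lease in memory, with time passed in explicitly instead of read from a clock. Client A holds the lock and is still working at t = 40 s when client B tries to acquire it.

```java
// Hypothetical in-memory stand-in for a Redis lock key with a TTL; not Redisson code.
// Time is passed in explicitly so the scenario is deterministic.
class ExpiringLock {
    private String owner;      // current holder, or null if free
    private long expiresAt;    // absolute expiry time in ms

    // Acquire if the lock is free or the previous lease has already expired.
    synchronized boolean tryLock(String client, long leaseMs, long now) {
        if (owner == null || now >= expiresAt) {
            owner = client;
            expiresAt = now + leaseMs;
            return true;
        }
        return false;
    }

    // Watchdog-style renewal: only the current holder, before expiry, may reset the lease.
    synchronized boolean renew(String client, long leaseMs, long now) {
        if (client.equals(owner) && now < expiresAt) {
            expiresAt = now + leaseMs;
            return true;
        }
        return false;
    }
}

class LeaseScenario {
    // Client A takes a 30 s lease and is still busy at t = 40 s, when client B retries.
    // Returns true if B grabs the lock while A is still working.
    static boolean otherClientGetsLock(boolean withWatchdog) {
        ExpiringLock lock = new ExpiringLock();
        long lease = 30_000;
        lock.tryLock("A", lease, 0);
        if (withWatchdog) {
            // Renew every lease / 3 = 10 s while A is still busy.
            for (long t = 10_000; t <= 40_000; t += 10_000) {
                lock.renew("A", lease, t);
            }
        }
        return lock.tryLock("B", lease, 40_000);
    }
}
```

Without renewal, otherClientGetsLock(false) returns true because A's lease ran out at t = 30 s; with a renewal every 10 seconds, otherClientGetsLock(true) returns false and A keeps the lock.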

Source code analysis

Redisson asynchronous locking

After sending the Lua script asynchronously, the RedisExecutor class registers a listener on the write future, as follows:

    writeFuture.addListener(new ChannelFutureListener() {
        @Override
        public void operationComplete(ChannelFuture future) throws Exception {
            checkWriteFuture(writeFuture, attemptPromise, connection);
        }
    });

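I won't walk through that code line by line. What matters is the callback in tryAcquireAsync below. If no explicit leaseTime was given (leaseTime == -1), the lock is acquired with lockWatchdogTimeout as its lease, and a callback is attached to the result. If the asynchronous call completed with an exception, the callback simply returns. Otherwise it inspects the result: the locking Lua script returns nil when the lock is acquired (and the remaining TTL of the existing lock when it is not), so ttlRemaining is null on success, and in that case scheduleExpirationRenewal is called. When an explicit leaseTime is passed, the method returns early and no renewal is ever scheduled.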

    private <T> RFuture<Long> tryAcquireAsync(long leaseTime, TimeUnit unit, long threadId) {
        // explicit lease: the key simply expires, no watchdog
        if (leaseTime != -1) {
            return tryLockInnerAsync(leaseTime, unit, threadId, RedisCommands.EVAL_LONG);
        }
        RFuture<Long> ttlRemainingFuture = tryLockInnerAsync(
                commandExecutor.getConnectionManager().getCfg().getLockWatchdogTimeout(),
                TimeUnit.MILLISECONDS, threadId, RedisCommands.EVAL_LONG);
        ttlRemainingFuture.onComplete((ttlRemaining, e) -> {
            // the command itself failed: nothing to renew
            if (e != null) {
                return;
            }

            // lock acquired (the script returned nil): start the watchdog
            if (ttlRemaining == null) {
                scheduleExpirationRenewal(threadId);
            }
        });
        return ttlRemainingFuture;
    }
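The branching in tryAcquireAsync can be reproduced with the JDK's CompletableFuture in place of Redisson's RFuture. In this hypothetical sketch, AcquireSketch and the watchdogStarted flag are illustrative names; the script result is supplied by the caller, with null meaning "lock acquired" and a number meaning the remaining TTL of a lock held by someone else.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical sketch of the tryAcquireAsync branching using CompletableFuture
// instead of RFuture. The lock script result is simulated by the caller:
// null = lock acquired, a number = remaining TTL of someone else's lock.
class AcquireSketch {
    static final long NO_LEASE = -1;   // mirrors leaseTime == -1 in the source

    static CompletableFuture<Long> tryAcquire(long leaseTime,
                                              CompletableFuture<Long> scriptResult,
                                              AtomicBoolean watchdogStarted) {
        if (leaseTime != NO_LEASE) {
            // Explicit lease: the key simply expires; no watchdog is scheduled.
            return scriptResult;
        }
        scriptResult.whenComplete((ttlRemaining, e) -> {
            if (e != null) {
                return;                      // the command failed; nothing to renew
            }
            if (ttlRemaining == null) {      // lock acquired
                watchdogStarted.set(true);   // stands in for scheduleExpirationRenewal
            }
        });
        return scriptResult;
    }
}
```

Only the combination of leaseTime == -1 and a null result starts the watchdog; an explicit lease, a busy lock, or a failed command does not.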

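Below is the scheduleExpirationRenewal method. EXPIRATION_RENEWAL_MAP stores an entry for every lock whose expiration time is being extended, keyed by the lock's entry name. The method tries to insert a new ExpirationEntry with putIfAbsent. If an entry already exists (for example, when the lock is re-entered), it only records the thread ID on the existing entry. On the first acquisition there is no entry yet, so the else branch runs: the thread ID is added to the new entry and renewExpiration() is called to start the watchdog.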

    private void scheduleExpirationRenewal(long threadId) {
        ExpirationEntry entry = new ExpirationEntry();
        ExpirationEntry oldEntry = EXPIRATION_RENEWAL_MAP.putIfAbsent(getEntryName(), entry);
        if (oldEntry != null) {
            oldEntry.addThreadId(threadId);
        } else {
            entry.addThreadId(threadId);
            renewExpiration();
        }
    }
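The putIfAbsent pattern can be isolated in a small, self-contained model. RenewalRegistry, its Entry class and the renewalTasksStarted counter are hypothetical stand-ins for EXPIRATION_RENEWAL_MAP, ExpirationEntry and the call to renewExpiration(); tracking a per-thread count in a LinkedHashMap is an assumption made for this sketch.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical, simplified model of EXPIRATION_RENEWAL_MAP and ExpirationEntry.
// Only the first registration for a lock name starts a renewal chain.
class RenewalRegistry {
    static class Entry {
        // threadId -> number of registrations; insertion order keeps the first thread first
        private final Map<Long, Integer> threadIds = new LinkedHashMap<>();

        synchronized void addThreadId(long threadId) {
            threadIds.merge(threadId, 1, Integer::sum);
        }

        synchronized Long firstThreadId() {
            return threadIds.isEmpty() ? null : threadIds.keySet().iterator().next();
        }

        synchronized int count(long threadId) {
            return threadIds.getOrDefault(threadId, 0);
        }
    }

    final ConcurrentMap<String, Entry> renewals = new ConcurrentHashMap<>();
    final AtomicInteger renewalTasksStarted = new AtomicInteger(); // stands in for renewExpiration()

    void scheduleExpirationRenewal(String entryName, long threadId) {
        Entry entry = new Entry();
        Entry old = renewals.putIfAbsent(entryName, entry);
        if (old != null) {
            old.addThreadId(threadId);             // already renewed: just record the thread
        } else {
            entry.addThreadId(threadId);
            renewalTasksStarted.incrementAndGet(); // first registration: start the watchdog
        }
    }
}
```

Because putIfAbsent is atomic, two concurrent callers cannot both find the slot empty, so at most one renewal chain is started per lock name in this model; later callers only update the existing entry.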

The renewExpiration method first looks up the lock's entry in EXPIRATION_RENEWAL_MAP and returns immediately if there is none. Otherwise it schedules a TimerTask that fires after internalLockLeaseTime / 3, which is 10 seconds with the default value. When the task runs, it fetches the entry from the map again and returns if the entry (or its first thread ID) is gone, which means the lock has been released in the meantime. Otherwise it calls renewExpirationAsync, which executes a Lua script asynchronously (covered next). In the onComplete callback, if an exception occurred, it logs an error and returns; if the script reports that the lock is still held (res is true), it calls renewExpiration() again to schedule the next renewal. The Timeout handle is stored on the entry with ee.setTimeout(task) so that it can be cancelled later.

    private void renewExpiration() {
        ExpirationEntry ee = EXPIRATION_RENEWAL_MAP.get(getEntryName());
        if (ee == null) {
            return;
        }
        
        Timeout task = commandExecutor.getConnectionManager().newTimeout(new TimerTask() {
            @Override
            public void run(Timeout timeout) throws Exception {
                ExpirationEntry ent = EXPIRATION_RENEWAL_MAP.get(getEntryName());
                if (ent == null) {
                    return;
                }
                Long threadId = ent.getFirstThreadId();
                if (threadId == null) {
                    return;
                }
                
                RFuture<Boolean> future = renewExpirationAsync(threadId);
                future.onComplete((res, e) -> {
                    if (e != null) {
                        log.error("Can't update lock " + getName() + " expiration", e);
                        return;
                    }
                    
                    if (res) {
                        // reschedule itself
                        renewExpiration();
                    }
                });
            }
        }, internalLockLeaseTime / 3, TimeUnit.MILLISECONDS);
        
        ee.setTimeout(task);
    }
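The self-rescheduling timer in renewExpiration can be sketched with a JDK ScheduledExecutorService instead of the timer used in the source (newTimeout with a TimerTask). WatchdogSketch is a hypothetical class, and its BooleanSupplier stands in for renewExpirationAsync: it returns true while the lock is still held.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.BooleanSupplier;

// Hypothetical sketch of the renewExpiration pattern: a one-shot task that,
// after a successful renewal, schedules the next one.
class WatchdogSketch {
    private final ScheduledExecutorService timer = Executors.newSingleThreadScheduledExecutor();
    private final long intervalMs;
    private final BooleanSupplier renew;   // stands in for renewExpirationAsync: true = still held
    final AtomicInteger renewals = new AtomicInteger();
    final CountDownLatch stopped = new CountDownLatch(1);

    WatchdogSketch(long leaseMs, BooleanSupplier renew) {
        this.intervalMs = leaseMs / 3;     // same ratio as internalLockLeaseTime / 3
        this.renew = renew;
    }

    void renewExpiration() {
        timer.schedule(() -> {
            boolean stillHeld;
            try {
                stillHeld = renew.getAsBoolean();
            } catch (RuntimeException e) {
                stillHeld = false;         // the source logs the error and stops here
            }
            if (stillHeld) {
                renewals.incrementAndGet();
                renewExpiration();         // reschedule itself
            } else {
                stopped.countDown();       // lock gone or error: the chain ends
                timer.shutdown();
            }
        }, intervalMs, TimeUnit.MILLISECONDS);
    }

    boolean awaitStop(long timeoutMs) {
        try {
            return stopped.await(timeoutMs, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }
}
```

Scheduling one task at a time, and only after the previous renewal has succeeded, means renewals never overlap and the chain ends on its own once the lock is gone; a fixed-rate schedule would keep firing until explicitly cancelled.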

renewExpirationAsync executes a Lua script asynchronously. The script checks whether the hash stored at the lock key contains a field for the current lock holder (the lock name built from the client ID and thread ID). If it does, it calls PEXPIRE to reset the key's expiration to internalLockLeaseTime and returns 1; otherwise it returns 0. With the default settings, as long as the service instance has not released the lock, the watchdog resets the lock's expiration to 30 seconds every 10 seconds. This ensures that the instance holding the lock keeps it even when its business logic runs longer than the original lease.

    protected RFuture<Boolean> renewExpirationAsync(long threadId) {
        return commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,
                "if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " +
                    "redis.call('pexpire', KEYS[1], ARGV[1]); " +
                    "return 1; " +
                "end; " +
                "return 0;",
                Collections.<Object>singletonList(getName()),
                internalLockLeaseTime, getLockName(threadId));
    }
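The renewal script is small enough to model in plain Java. RenewScriptModel is a hypothetical in-memory stand-in for Redis: a key maps to a hash of lock-name fields plus an absolute expiry time, and the "client-uuid:threadId" field format used in the test is only illustrative. The model highlights that PEXPIRE resets the remaining lifetime rather than adding to it: a renewal at t = 10 s with a 30 s lease moves the expiry to t = 40 s, not t = 60 s.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical in-memory model of the renewal script:
//   if hexists(KEYS[1], ARGV[2]) == 1 then pexpire(KEYS[1], ARGV[1]); return 1 end; return 0
// A key holds a hash of lockName -> reentry count plus an absolute expiry time.
class RenewScriptModel {
    static class HashKey {
        final Map<String, Long> fields = new HashMap<>();
        long expiresAtMs;
    }

    final Map<String, HashKey> db = new HashMap<>();

    // Returns 1 if the caller still holds the lock and its TTL was reset, else 0.
    int renew(String key, long leaseMs, String lockName, long nowMs) {
        HashKey h = db.get(key);
        if (h != null && nowMs >= h.expiresAtMs) {   // an expired key no longer exists
            db.remove(key);
            h = null;
        }
        if (h != null && h.fields.containsKey(lockName)) {
            h.expiresAtMs = nowMs + leaseMs;         // PEXPIRE resets, it does not add
            return 1;
        }
        return 0;
    }
}
```

With the defaults, the remaining TTL of a held lock therefore normally stays between about 20 and 30 seconds, since each renewal happens roughly 10 seconds after the previous reset.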

Conclusion

Between the previous article and this one, we have covered how Redisson's reentrant lock is acquired and released and how its watchdog mechanism works. The following diagram illustrates the process.