Disadvantages of custom Redis distributed locks

A custom Redis distributed lock can solve the problem of a scheduled task being pulled by multiple nodes at once (avoiding duplicate execution), but it has several flaws.

Existing problems:

The lock operation is not atomic (SETNX and EXPIRE are two separate commands, and a crash between them can result in a deadlock):

```java
public boolean tryLock(String lockKey, Object value, long expireTime, TimeUnit timeUnit) {
    // Step 1: SETNX
    Boolean lock = redisTemplate.opsForValue().setIfAbsent(lockKey, value);
    if (Boolean.TRUE.equals(lock)) {
        // Step 2: set the expiration time on lockKey.
        // If the process dies between step 1 and step 2, the lock never expires: deadlock.
        redisTemplate.expire(lockKey, expireTime, timeUnit);
    }
    return Boolean.TRUE.equals(lock);
}
```

Newer versions of the Spring Boot Redis dependency actually provide atomic locking (a single SET ... NX PX command under the hood):

```java
@Override
public boolean tryLockAtomic(String lockKey, Object value, long expireTime, TimeUnit timeUnit) {
    // setIfAbsent with a timeout maps to one atomic Redis command: SET key value NX PX <ms>
    Boolean lock = redisTemplate.opsForValue().setIfAbsent(lockKey, value, expireTime, timeUnit);
    return Boolean.TRUE.equals(lock);
}
```

The unlock operation is not atomic (one node may delete a lock that by then belongs to another node):

The unlock designed in the previous article checks the value to keep different nodes from deleting each other's locks, but it is still not an atomic operation: the lock can expire and be re-acquired by another node between the GET and the DELETE.

```java
@Override
public boolean unLock(String lockKey, Object value) {
    Object originValue = redisTemplate.opsForValue().get(lockKey);
    // Only delete the lock if we still own it -- but GET and DELETE are two separate commands
    if (originValue != null && value.equals(originValue)) {
        redisTemplate.delete(lockKey);
        return true;
    }
    return false;
}
```

Lock renewal: to shorten the execution time of the scheduled task, tasks are simply thrown into a queue instead of executed inline. But tasks may pile up there, and this situation can occur: because the queue is busy, a task has not been executed yet, so its status in the database has not been changed to status = 1 (executed). The next poll then pulls the same task again and it runs twice. (A simple mitigation: duplicate tasks cannot be kept out of the queue, but the consumer can re-check that status = 0 before executing.)
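A minimal sketch of that consumer-side guard (the Task type, taskMapper DAO, and executeTask are hypothetical names, not from the original article):

```java
public void consume(long taskId) {
    Task task = taskMapper.findById(taskId);
    // Re-check right before executing: if another run already set status = 1, skip.
    if (task == null || task.getStatus() != 0) {
        return;
    }
    executeTask(task);                     // business logic
    taskMapper.updateStatus(taskId, 1);    // mark as executed
    // For full safety the check-and-mark should itself be atomic, e.g.
    // UPDATE task SET status = 1 WHERE id = ? AND status = 0
}
```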

Introducing a Redis message queue makes the system more complex, and if the model above produces occasional bugs, they are very hard to track down. A scheduled task should therefore be designed simply:

To design a complete Redis distributed lock, at least three problems must be solved:

  • Locking atomicity (SETNX and EXPIRE must be atomic, otherwise deadlocks occur easily)
  • Unlocking atomicity (you must not delete someone else's lock by mistake)
  • Lock renewal (the lock must account for how long the business/scheduled task actually runs)

Performance aside, unlocking atomicity can be achieved with a Lua script (leveraging Redis's single-threaded execution).

Redis executes one Lua script at a time; the script either succeeds or fails as a whole, with no other commands interleaved.
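For example, the non-atomic unlock above can be fixed with a short Lua script. Here is a minimal sketch using Spring Data Redis (class and method names are illustrative, not from the original article):

```java
import java.util.Collections;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.data.redis.core.script.DefaultRedisScript;

public class AtomicUnlock {

    // GET-compare-DEL runs inside one script, so no other command can interleave
    private static final DefaultRedisScript<Long> UNLOCK_SCRIPT = new DefaultRedisScript<>(
            "if redis.call('get', KEYS[1]) == ARGV[1] then " +
            "    return redis.call('del', KEYS[1]) " +
            "else " +
            "    return 0 " +
            "end", Long.class);

    private final StringRedisTemplate redisTemplate;

    public AtomicUnlock(StringRedisTemplate redisTemplate) {
        this.redisTemplate = redisTemplate;
    }

    public boolean unLock(String lockKey, String value) {
        Long deleted = redisTemplate.execute(UNLOCK_SCRIPT,
                Collections.singletonList(lockKey), value);
        return deleted != null && deleted == 1L;
    }
}
```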

The hardest part is how to renew the lock based on the execution time of the actual business.

Although we can prevent different nodes from deleting each other's locks by checking a MACHINE_ID, what we essentially need is lock renewal, and Redisson already implements it. Let's understand Redisson's lock renewal mechanism.

A Redisson example
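A minimal sketch of the usage under discussion (assuming an injected RedissonClient; the lock name matches the later test examples, and executeTask is a hypothetical business method):

```java
RLock lock = redissonClient.getLock("bravo1988_distributed_lock");
lock.lock();              // blocks until the lock is acquired
try {
    executeTask();        // business logic
} finally {
    lock.unlock();
}
```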

The above code raises several questions:

  • Is the lock() method atomic?
  • Does the lock set by lock() have an expiration time? If so, how long is it?
  • Does lock() renew the lock?
  • How does the lock() method block? How is it woken up?

lock() source code analysis

Setting exceptional situations aside, lock() has only two outcomes, success and failure. First, the successful locking flow:

RLock#lock() —> ttl = tryAcquire(-1, leaseTime, unit, threadId) —> tryAcquireAsync(waitTime, leaseTime, unit, threadId) —> tryLockInnerAsync —> evalWriteAsync —> executorService.evalWriteAsync

The procedure:

1. A lock attempt is made: it returns null on success, or the remaining TTL on failure.
2. tryAcquireAsync is called to obtain an asynchronous result (a Future); calling get() on that Future yields the TTL.
3. tryLockInnerAsync is called to execute the Lua script (it is passed the script, the lock name, and so on) and returns an RFuture awaiting a callback; evalWriteAsync sends the script to Redis for execution and returns the RFuture.
4. The RFuture serves two purposes: per the Lua script, ttlRemaining is null when locking succeeds, otherwise the remaining TTL of the existing lock is returned; and, on success, an additional thread is started for lock renewal.
5. The RFuture is returned.
6. ttlRemaining is read from the RFuture; whether the TTL is null tells us whether locking succeeded.

In short, the whole flow executes a Lua script (atomic: it either succeeds or fails) to try to lock and returns an RFuture. From the RFuture you can get the TTL and determine whether locking succeeded; you can also set a callback on it, and in that callback (when locking succeeded) an additional thread is started for lock renewal.

Two difficulties:

  • The Lua script
  • What ttlRemaining.onComplete() does

Lua script interpretation

What are KEYS and ARGV when the Lua script is executed?

```
KEYS: Collections.singletonList(getName())
ARGV: internalLockLeaseTime, getLockName(threadId)
```

```lua
-- If the lock key does not exist, take the lock with hincrby:
-- e.g. hincrby bravo1988_distributed_lock a1b2c3d4:666 1
if (redis.call('exists', KEYS[1]) == 0) then
    redis.call('hincrby', KEYS[1], ARGV[2], 1);
    -- Set the expiration time. ARGV[1] == internalLockLeaseTime
    redis.call('pexpire', KEYS[1], ARGV[1]);
    -- Return null
    return nil;
end;

-- If the current node/thread already holds the lock (ARGV[2] == getLockName(threadId))
if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then
    -- Reentrant: increment the counter
    redis.call('hincrby', KEYS[1], ARGV[2], 1);
    -- Set the expiration time. ARGV[1] == internalLockLeaseTime
    redis.call('pexpire', KEYS[1], ARGV[1]);
    -- Return null
    return nil;
end;

-- Lock held by someone else: return its remaining TTL
return redis.call('pttl', KEYS[1]);
```

In a nutshell:

  • 1. If the lock key does not exist, lock it with the hincrby command and set the expiration time, returning nil
  • 2. If the current node already holds the lock, hincrby increments the count (reentrancy), the expiration time is reset, and nil is returned
  • 3. If the lock exists but is held by another node, the remaining TTL of that lock is returned

Redisson's distributed lock uses a hash structure:

  • Redis key: the lock name
  • Hash field: client ID + thread ID (getLockName(threadId))
  • Hash value: count (the reentrancy count)
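As a quick illustration (a sketch, assuming a StringRedisTemplate and example field values), the underlying hash can be inspected after a reentrant lock():

```java
// Assuming the same thread has called lock() twice on "bravo1988_distributed_lock":
Map<Object, Object> entries = redisTemplate.opsForHash()
        .entries("bravo1988_distributed_lock");
// e.g. {"0bee051f-...:666" : "2"}
//        field = getLockName(threadId), value = reentrancy count
```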

The callback function does the following:

The CompletableFuture-style callback mechanism, RFuture#onComplete:

```java
ttlRemainingFuture.onComplete((ttlRemaining, e) -> {
    // If an exception occurred, return directly
    if (e != null) {
        return;
    }
    // Locking succeeded
    if (ttlRemaining == null) {
        // Start an additional thread that renews the current lock on a schedule
        scheduleExpirationRenewal(threadId);
    }
});
```

onComplete pushes the callback onto a task stack, from which an asynchronous thread later pops and executes it.

At this point, we've solved the previous two problems:

  • What the Lua script means: it atomically locks and sets the expiration time, and handles the case where the current node already holds the lock (reentrancy).
  • What ttlRemainingFuture.onComplete() does: it sets the callback; the BiConsumer passed to onComplete is wrapped as an object and pushed onto the task stack, from which an asynchronous thread later calls it back.

Redisson uses a Lua script to attempt locking and returns an RFuture. The RFuture does two things:

  • ttlRemaining is the remaining TTL of the existing lock: if it is null, locking succeeded; otherwise locking failed and the remaining time must be waited out
  • A callback function can be set on the RFuture

**Now the questions are:**

  • Which threads are the asynchronous threads? Where do they come from?
  • What does the callback set by onComplete() do?
  • Where do the callback arguments ttlRemaining and e come from?

Debugging shows that once the TTL is returned, a different thread calls back into BiConsumer#accept, which reaches scheduleExpirationRenewal.

The two statements are executed by different threads: the main thread and an asynchronous thread.

Lock renewal is tied to the RFuture callback.

How does Redisson implement lock renewal

```java
private void renewExpiration() {
    ExpirationEntry ee = EXPIRATION_RENEWAL_MAP.get(getEntryName());
    if (ee == null) {
        return;
    }

    /**
     * Start a timer: Timeout newTimeout(TimerTask task, long delay, TimeUnit unit);
     * Note that each scheduled task is executed only once, and only after the delay.
     * So the questions become:
     * 1. How long is internalLockLeaseTime / 3?
     * 2. If the scheduled task runs only once, it doesn't seem to solve the problem --
     *    essentially the same as setting the expiration time manually: how long is appropriate?
     */
    Timeout task = commandExecutor.getConnectionManager().newTimeout(new TimerTask() {
        @Override
        public void run(Timeout timeout) throws Exception {
            ExpirationEntry ent = EXPIRATION_RENEWAL_MAP.get(getEntryName());
            if (ent == null) {
                return;
            }
            Long threadId = ent.getFirstThreadId();
            if (threadId == null) {
                return;
            }

            // Execute the renewal Lua script asynchronously
            RFuture<Boolean> future = renewExpirationAsync(threadId);
            // Set a callback
            future.onComplete((res, e) -> {
                if (e != null) {
                    // e.g. Redis is down: the lock is simply no longer renewed
                    log.error("Can't update lock " + getName() + " expiration", e);
                    return;
                }
                if (res) {
                    // Reschedule itself
                    renewExpiration();
                }
            });
        }
    }, internalLockLeaseTime / 3, TimeUnit.MILLISECONDS);

    ee.setTimeout(task);
}
```

```java
/**
 * Re-executes evalWriteAsync() with a Lua script similar to, but slightly
 * different from, the one used when locking.
 * The pexpire argument is again internalLockLeaseTime.
 * Looks like we'll have to investigate internalLockLeaseTime!
 */
protected RFuture<Boolean> renewExpirationAsync(long threadId) {
    return evalWriteAsync(getName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,
            "if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " +
                "redis.call('pexpire', KEYS[1], ARGV[1]); " +
                "return 1; " +
            "end; " +
            "return 0;",
            Collections.singletonList(getName()),
            internalLockLeaseTime, getLockName(threadId));
}
```

The watchdog lock renewal mechanism: when lock() succeeds for the first time, the lock expiration time defaults to 30 seconds.

```java
// Signature annotated with the actual values passed in by lock():
// waitTime = -1, leaseTime = -1, unit = null, threadId = 666 (example)
private RFuture<Long> tryAcquireAsync(long waitTime, long leaseTime, TimeUnit unit, long threadId) {
    // lock() defaults to leaseTime = -1, so this branch is skipped
    if (leaseTime != -1) {
        return tryLockInnerAsync(waitTime, leaseTime, unit, threadId, RedisCommands.EVAL_LONG);
    }
    // Execute the Lua script to lock and return an RFuture.
    // The second argument is the leaseTime, taken from lockWatchdogTimeout!
    RFuture<Long> ttlRemainingFuture = tryLockInnerAsync(
            waitTime,                                                                  // -1
            commandExecutor.getConnectionManager().getCfg().getLockWatchdogTimeout(),  // 30 seconds
            TimeUnit.MILLISECONDS,
            threadId,                                                                  // 666
            RedisCommands.EVAL_LONG);
    // Set the callback
    ttlRemainingFuture.onComplete((ttlRemaining, e) -> {
        // If an exception occurred, return directly
        if (e != null) {
            return;
        }
        // Locking succeeded
        if (ttlRemaining == null) {
            // Start an additional thread that renews the current lock on a schedule
            scheduleExpirationRenewal(threadId);
        }
    });
    // Return the RFuture, from which ttlRemaining can be read
    return ttlRemainingFuture;
}
```

```java
// Values received by tryLockInnerAsync when called from lock():
<T> RFuture<T> tryLockInnerAsync(long waitTime /* -1 */, long leaseTime /* 30 * 1000 */,
                                 TimeUnit unit /* MILLISECONDS */, long threadId /* 666 */,
                                 RedisStrictCommand<T> command) { /* ... */ }
```

onComplete() sets the callback; after the Redis call returns, an asynchronous thread invokes BiConsumer#accept, enters scheduleExpirationRenewal, and the lock starts being renewed every 10 seconds.

As with locking, the Lua script actually executes quickly, so although future.onComplete is asynchronous it is invoked almost immediately; renewExpiration then runs and schedules another TimerTask that renews the lock after 10 seconds. In other words, Redisson's watchdog timed task executes only once per scheduling, but each run reschedules itself recursively, which amounts to repeated delayed execution. The asynchronous thread performing the renewal is a daemon thread: as long as the main thread has not ended, it keeps renewing the lock.
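The "run once, then reschedule yourself" pattern is easy to reproduce with a plain ScheduledExecutorService. A minimal sketch, not Redisson's code (renewLease stands in for the "pexpire if still held" Lua call):

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

public class RenewalSketch {

    // Daemon timer thread: it dies with the JVM, like Redisson's renewal thread
    private final ScheduledExecutorService timer =
            Executors.newSingleThreadScheduledExecutor(r -> {
                Thread t = new Thread(r, "lock-watchdog");
                t.setDaemon(true);
                return t;
            });

    public void scheduleRenewal(String lockKey, long leaseTimeMs) {
        timer.schedule(() -> {
            if (renewLease(lockKey, leaseTimeMs)) {
                // Reschedule itself: repeated delayed execution
                scheduleRenewal(lockKey, leaseTimeMs);
            }
        }, leaseTimeMs / 3, TimeUnit.MILLISECONDS);
    }

    // Hypothetical "pexpire if still held by me" call (e.g. a Lua script); stubbed here
    private boolean renewLease(String lockKey, long leaseTimeMs) {
        return true;
    }
}
```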

There are two cases of lock release:

  • unlock() is called and deletes the lock
  • unlock() is never called, but the daemon thread has ended, so no background thread renews the lock and it expires automatically after 30 seconds

The locking failure logic involves:

  • A while (true) loop
  • A Redis publish notification when the lock is released (we will see this later in the unlock flow)
  • After receiving the release signal, the other nodes compete for the lock again
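Putting those three pieces together, the blocking path of lock() looks roughly like this (a simplified sketch in the spirit of Redisson 3.x; details vary by version):

```java
long threadId = Thread.currentThread().getId();
Long ttl = tryAcquire(-1, leaseTime, unit, threadId);
if (ttl == null) {
    return;                                     // acquired on the first attempt
}
// Subscribe to the lock's channel so this thread is woken by UNLOCK_MESSAGE
RFuture<RedissonLockEntry> future = subscribe(threadId);
commandExecutor.syncSubscription(future);
try {
    while (true) {                              // the "infinite loop"
        ttl = tryAcquire(-1, leaseTime, unit, threadId);
        if (ttl == null) {
            break;                              // won the lock this round
        }
        if (ttl >= 0) {
            // Park until the unlock message arrives or the remaining TTL elapses
            future.getNow().getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS);
        } else {
            future.getNow().getLatch().acquire();
        }
    }
} finally {
    unsubscribe(future, threadId);
}
```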

unlock() source code

```lua
-- Parameters:
-- KEYS[1] => "distributed_lock" (the lock name)
-- KEYS[2] => getChannelName()
-- ARGV[1] => LockPubSub.UNLOCK_MESSAGE
-- ARGV[2] => internalLockLeaseTime
-- ARGV[3] => getLockName(threadId)

-- If the current thread does not hold the lock, return nil
if (redis.call('hexists', KEYS[1], ARGV[3]) == 0) then
    return nil;
end;

-- Decrement the reentrancy counter
local counter = redis.call('hincrby', KEYS[1], ARGV[3], -1);

if (counter > 0) then
    -- Still reentrantly held: refresh the expiration time and return 0
    redis.call('pexpire', KEYS[1], ARGV[2]);
    return 0;
else
    -- counter <= 0: delete the lock and publish UNLOCK_MESSAGE to the channel
    redis.call('del', KEYS[1]);
    redis.call('publish', KEYS[2], ARGV[1]);
    return 1;
end;

return nil;
```

In other words, when the lock is released, the node that originally held it publishes LockPubSub.UNLOCK_MESSAGE through an NIO channel to the other subscribed clients: "I have released the lock, come and grab it!" At that point the other blocked nodes compete for the lock again.

Reentrant locking and unlocking:

```java
redisson.lock();      // count: 0 -> 1
redisson.lock();      // count: 1 -> 2
redisson.lock();      // count: 2 -> 3
// Execute the business logic
executeTask();
redisson.unlock();    // count: 3 -> 2
redisson.unlock();    // count: 2 -> 1
redisson.unlock();    // count: 1 -> 0, lock deleted
```

Real development code is not written like this, but sometimes parent- and child-class calls, or several methods on the same thread, take the same lock repeatedly. Each reentry increments the lock's count, and as these methods complete, frame by frame, the matching group of unlock() calls decrements it.
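For instance (a hedged sketch; method names are hypothetical):

```java
public void outer() {
    lock.lock();        // count: 0 -> 1 (hincrby +1)
    try {
        inner();        // same thread re-enters below
    } finally {
        lock.unlock();  // count: 1 -> 0, lock deleted, UNLOCK_MESSAGE published
    }
}

private void inner() {
    lock.lock();        // same thread: count 1 -> 2 (hincrby +1), pexpire refreshed
    try {
        // nested business logic
    } finally {
        lock.unlock();  // count: 2 -> 1 (hincrby -1), pexpire refreshed
    }
}
```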

lock(leaseTime, TimeUnit) with a custom expiration time does not renew the lock. When a custom leaseTime is passed in, leaseTime is no longer -1 (and unit is no longer null):

```java
// Signature annotated with example values: waitTime = -1, threadId = 666
private RFuture<Long> tryAcquireAsync(long waitTime, long leaseTime, TimeUnit unit, long threadId) {
    // lock() defaults to leaseTime = -1, skipping this if and running the code below.
    // But with lock(10, TimeUnit.SECONDS), the if branch executes and the rest is skipped.
    if (leaseTime != -1) {
        return tryLockInnerAsync(waitTime, leaseTime, unit, threadId, RedisCommands.EVAL_LONG);
    }
    /* Not executed when leaseTime != -1:
    RFuture<Long> ttlRemainingFuture = tryLockInnerAsync(
            waitTime,
            commandExecutor.getConnectionManager().getCfg().getLockWatchdogTimeout(), // 30s
            TimeUnit.MILLISECONDS,
            threadId,
            RedisCommands.EVAL_LONG);
    // The callback is never set, so scheduleExpirationRenewal never runs!
    ttlRemainingFuture.onComplete((ttlRemaining, e) -> {
        if (e != null) {
            return;
        }
        if (ttlRemaining == null) {
            scheduleExpirationRenewal(threadId);
        }
    });
    return ttlRemainingFuture;
    */
}
```

```java
// With a custom leaseTime, tryLockInnerAsync receives that value directly
// (e.g. 10 * 1000 ms for lock(10, TimeUnit.SECONDS)):
<T> RFuture<T> tryLockInnerAsync(long waitTime /* -1 */, long leaseTime /* 10 * 1000 */,
                                 TimeUnit unit /* MILLISECONDS */, long threadId /* 666 */,
                                 RedisStrictCommand<T> command) { /* ... */ }
```

When leaseTime is not -1, the method returns right after the Lua script, without setting the callback that renews the lock.

The tryLock family: let the caller decide what to do when locking fails

If multiple nodes call lock(), the threads on the nodes that did not acquire the lock subscribe and block until the node holding the lock deletes it and publishes LockPubSub.UNLOCK_MESSAGE. But what if a caller doesn't want to block? It might reason: if locking fails, I'll just give up.

Purpose of locking:

There are two purposes. One is, while ensuring thread safety, to let all threads eventually execute successfully; the other is, while ensuring thread safety, to let only one thread execute successfully. The former suits flash-sale (seckill) and ordering operations, where we want as many requests as possible to complete; the latter suits scheduled tasks that only one node may execute, which should fail fast when the lock is not acquired.

In other words, when a node fails to acquire the lock, it can respond in various ways:

  • Block and wait
  • Give up directly (see the sketch after this list)
  • Retry N times, then give up
  • …
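For the scheduled-task case, "give up directly" can look like this (a sketch assuming Spring's @Scheduled; pullTasks and executeTask are hypothetical names):

```java
// Runs on every node, but only the node that wins the lock executes the pull
@Scheduled(cron = "0 */5 * * * *")
public void pullTasks() {
    RLock lock = redissonClient.getLock("bravo1988_distributed_lock");
    if (!lock.tryLock()) {
        return;            // another node holds the lock: fail fast, try again next round
    }
    try {
        executeTask();     // business logic
    } finally {
        lock.unlock();
    }
}
```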

The lock() interface blocks and waits after a failed lock attempt.

tryLock removes that built-in decision and returns the result directly to the caller, who decides what to do when locking fails:

```java
@Test
public void testTryLock() {
    RLock lock = redissonClient.getLock("bravo1988_distributed_lock");
    boolean b = lock.tryLock();
    if (b) {
        // Business operations...
    }
    // The call returns immediately without blocking
}
```

tryLock returns true on success and false on failure; subsequent behavior is up to each node. Comparing lock and tryLock:

tryLock, like lock, triggers lock renewal on success (only the post-failure logic is left to the caller).

Handling after a failed lock attempt:

```java
@Test
public void testLockSuccess() throws InterruptedException {
    RLock lock = redissonClient.getLock("bravo1988_distributed_lock");
    // Basically the same as lock(): the lock renews automatically, but on failure
    // the call waits up to 10 seconds for the current holder to release the lock.
    lock.tryLock(10, TimeUnit.SECONDS);
    // Variant with an explicit 30s leaseTime: also waits up to 10 seconds, but does not renew.
    lock.tryLock(10, 30, TimeUnit.SECONDS);
}
```

As before, the lock is renewed only when leaseTime is -1.

So what does waitTime control?

A bare tryLock() returns false immediately when locking fails. Setting waitTime makes the call wait up to that long for the lock; in tryLock(waitTime, unit), leaseTime still defaults to -1, so watchdog renewal applies.
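A compact summary of the three variants under the semantics described above (a sketch; the two waiting variants throw InterruptedException):

```java
RLock lock = redissonClient.getLock("bravo1988_distributed_lock");

lock.tryLock();                          // no wait: false immediately on failure; watchdog renews on success
lock.tryLock(10, TimeUnit.SECONDS);      // waits up to 10s; leaseTime stays -1, watchdog renews on success
lock.tryLock(10, 30, TimeUnit.SECONDS);  // waits up to 10s; fixed 30s lease, no watchdog renewal
```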

Defects in Redisson distributed locks

In sentinel or master-slave mode, if the master instance goes down, more than one client may end up holding the lock at the same time.

Take master-slave mode as an example. All writes go to the master first and are then replicated asynchronously to the slaves, so the data on each slave lags the master slightly. Suppose a client has just written a Redisson lock to the master, the master goes down before the lock is replicated, and one of the slaves is elected as the new master. That slave has not yet synchronized the Redisson lock, so another client may acquire the same lock again.