Goals

  • Redisson reentrant lock simple application
  • Redisson reentrant lock lock/unlock source code analysis
  • Conclusion

Redisson reentrant lock simple application

POM dependency

    <dependency>
        <groupId>org.redisson</groupId>
        <artifactId>redisson</artifactId>
        <version>3.12.0</version>
    </dependency>

Reentrant lock

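    /**
     * Demonstrates the three ways to acquire a Redisson reentrant lock.
     *
     * @param waitTime   maximum time to wait for the lock, in seconds
     * @param expireTime lease time after which the lock expires, in seconds
     */
    private void reentrantLock(long waitTime, long expireTime) {
        RLock rLock = client.getLock("reentrant-lock-test");
        try {
            // The three calls below are alternative ways to acquire the lock.
            // Real code would use only one of them, because every successful
            // acquisition needs its own matching unlock().

            // 1. Block until the lock is acquired; no explicit lease time
            rLock.lock();
            // 2. Block until the lock is acquired; expire after expireTime seconds
            rLock.lock(expireTime, TimeUnit.SECONDS);
            // 3. Wait at most waitTime seconds; expire after expireTime seconds
            rLock.tryLock(waitTime, expireTime, TimeUnit.SECONDS);
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            rLock.unlock();
        }
    }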
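RLock extends java.util.concurrent.locks.Lock, so the hold-count behaviour of a reentrant lock can be illustrated without a Redis server. The sketch below is not Redisson code: it uses the JDK's ReentrantLock as a stand-in, and the class and method names are made up for illustration. It shows that acquiring a lock several times and unlocking once leaves the lock held.

```java
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical demo class; ReentrantLock stands in for RLock.
class ReentrantHoldCountDemo {

    /** Acquires the lock `times` times, unlocks once, and returns the remaining hold count. */
    public static int holdsAfterOneUnlock(int times) {
        ReentrantLock lock = new ReentrantLock();
        for (int i = 0; i < times; i++) {
            lock.lock(); // each call increments the hold count
        }
        lock.unlock(); // a single unlock only decrements it by one
        int remaining = lock.getHoldCount();
        // Release whatever is still held so the lock ends up free.
        while (lock.isHeldByCurrentThread()) {
            lock.unlock();
        }
        return remaining;
    }

    public static void main(String[] args) {
        System.out.println(holdsAfterOneUnlock(3)); // prints 2
    }
}
```

In practice this means pairing each successful lock() or tryLock() with exactly one unlock() in a finally block.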

Redisson reentrant lock lock/unlock source code analysis

    // Get an object that implements the RLock interface from the Redisson client.
    RLock rLock = client.getLock("reentrant-lock-test");
  • Redisson class getLock

getLock takes a string argument, in this case the name of the lock, and creates a RedissonLock instance. The RedissonLock constructor takes a CommandAsyncExecutor, which is the component that sends commands to Redis asynchronously.

    @Override
    public RLock getLock(String name) {
        return new RedissonLock(connectionManager.getCommandExecutor(), name);
    }
  • lock

The following is the implementation of the no-argument lock() method and the methods it calls

    @Override
    public void lock() {
        try {
            lock(-1, null, false);
        } catch (InterruptedException e) {
            throw new IllegalStateException();
        }
    }

    private void lock(long leaseTime, TimeUnit unit, boolean interruptibly) throws InterruptedException {
        // Get the current thread ID
        long threadId = Thread.currentThread().getId();
        // Try to acquire the lock; a null TTL means the lock was acquired
        Long ttl = tryAcquire(leaseTime, unit, threadId);
        if (ttl == null) {
            return;
        }

        // Not acquired: subscribe to the lock's release channel
        RFuture<RedissonLockEntry> future = subscribe(threadId);
        // Wait for the subscription, interruptibly or not depending on the parameter
        if (interruptibly) {
            commandExecutor.syncSubscriptionInterrupted(future);
        } else {
            commandExecutor.syncSubscription(future);
        }

        try {
            // Loop until the lock is acquired
            while (true) {
                ttl = tryAcquire(leaseTime, unit, threadId);
                if (ttl == null) {
                    // Lock acquired
                    break;
                }

                if (ttl >= 0) {
                    try {
                        // Wait on the semaphore for at most ttl milliseconds
                        future.getNow().getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS);
                    } catch (InterruptedException e) {
                        if (interruptibly) {
                            throw e;
                        }
                        future.getNow().getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS);
                    }
                } else {
                    if (interruptibly) {
                        future.getNow().getLatch().acquire();
                    } else {
                        future.getNow().getLatch().acquireUninterruptibly();
                    }
                }
            }
        } finally {
            // Unsubscribe
            unsubscribe(future, threadId);
        }
        // get(lockAsync(leaseTime, unit));
    }

    // Try to acquire the lock; get() takes an RFuture and blocks until its result is available
    private Long tryAcquire(long leaseTime, TimeUnit unit, long threadId) {
        return get(tryAcquireAsync(leaseTime, unit, threadId));
    }

    private <T> RFuture<Long> tryAcquireAsync(long leaseTime, TimeUnit unit, long threadId) {
        // An explicit lease time was given
        if (leaseTime != -1) {
            return tryLockInnerAsync(leaseTime, unit, threadId, RedisCommands.EVAL_LONG);
        }
        // tryLockInnerAsync performs the actual Redis operation
        RFuture<Long> ttlRemainingFuture = tryLockInnerAsync(
                commandExecutor.getConnectionManager().getCfg().getLockWatchdogTimeout(),
                TimeUnit.MILLISECONDS, threadId, RedisCommands.EVAL_LONG);
        // Asynchronous style: the result is processed in the onComplete callback
        ttlRemainingFuture.onComplete((ttlRemaining, e) -> {
            if (e != null) {
                return;
            }

            // lock acquired: schedule periodic renewal of the expiration
            if (ttlRemaining == null) {
                scheduleExpirationRenewal(threadId);
            }
        });
        return ttlRemainingFuture;
    }

    <T> RFuture<T> tryLockInnerAsync(long leaseTime, TimeUnit unit, long threadId, RedisStrictCommand<T> command) {
        // Compute the lock lease time in milliseconds
        internalLockLeaseTime = unit.toMillis(leaseTime);

        // Execute the Lua script. The general logic is:
        // 1. If the lock does not exist, create it, set the expiration time and return nil.
        // 2. If the lock is already held by the current thread, increment its counter by 1
        //    (a reentrant acquisition), reset the expiration time and return nil.
        // 3. Otherwise another thread holds the lock: acquisition fails and the
        //    remaining time to live of the lock is returned.
        return commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, command,
                "if (redis.call('exists', KEYS[1]) == 0) then " +
                    "redis.call('hset', KEYS[1], ARGV[2], 1); " +
                    "redis.call('pexpire', KEYS[1], ARGV[1]); " +
                    "return nil; " +
                "end; " +
                "if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " +
                    "redis.call('hincrby', KEYS[1], ARGV[2], 1); " +
                    "redis.call('pexpire', KEYS[1], ARGV[1]); " +
                    "return nil; " +
                "end; " +
                "return redis.call('pttl', KEYS[1]);",
                Collections.<Object>singletonList(getName()), internalLockLeaseTime, getLockName(threadId));
    }
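The three branches of the locking Lua script can be modelled in plain Java to make the return values easier to follow. This is only a sketch under stated assumptions: an in-memory map stands in for the Redis hash, the clock is passed in explicitly instead of relying on Redis key expiry, and the class name, method names and holder strings are hypothetical rather than part of Redisson.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical in-memory model of the lock script; not Redisson code.
class LockScriptModel {
    // Models the Redis hash at the lock key: holder name -> reentrancy counter
    private final Map<String, Integer> holders = new HashMap<>();
    // Models the expiry of the lock key (absolute time in milliseconds)
    private long expireAtMillis;

    /** Returns null when the lock is acquired, otherwise the remaining TTL in milliseconds. */
    public Long tryLock(String holder, long leaseMillis, long nowMillis) {
        // Simulate Redis deleting the key once its expiry has passed
        if (!holders.isEmpty() && nowMillis >= expireAtMillis) {
            holders.clear();
        }
        if (holders.isEmpty()) {                       // exists == 0
            holders.put(holder, 1);                    // hset
            expireAtMillis = nowMillis + leaseMillis;  // pexpire
            return null;
        }
        if (holders.containsKey(holder)) {             // hexists == 1
            holders.merge(holder, 1, Integer::sum);    // hincrby ... 1
            expireAtMillis = nowMillis + leaseMillis;  // pexpire
            return null;
        }
        return expireAtMillis - nowMillis;             // pttl
    }

    public int holdCount(String holder) {
        return holders.getOrDefault(holder, 0);
    }

    public static void main(String[] args) {
        LockScriptModel lock = new LockScriptModel();
        System.out.println(lock.tryLock("node-1:1", 30_000, 0));     // null: acquired
        System.out.println(lock.tryLock("node-1:1", 30_000, 1_000)); // null: reentrant
        System.out.println(lock.tryLock("node-1:2", 30_000, 2_000)); // 29000: held by another
    }
}
```

A null result is what makes lock() return immediately, while a non-null TTL tells the waiting thread how long to wait at most before retrying.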
  • Release the lock

Here’s how to release the lock

    @Override
    public RFuture<Void> unlockAsync(long threadId) {
        RPromise<Void> result = new RedissonPromise<Void>();
        RFuture<Boolean> future = unlockInnerAsync(threadId);

        future.onComplete((opStatus, e) -> {
            if (e != null) {
                // The unlock command failed: cancel the expiration renewal task
                cancelExpirationRenewal(threadId);
                result.tryFailure(e);
                return;
            }

            // The lock being unlocked is not held by the current thread
            if (opStatus == null) {
                IllegalMonitorStateException cause = new IllegalMonitorStateException(
                        "attempt to unlock lock, not locked by current thread by node id: "
                        + id + " thread-id: " + threadId);
                result.tryFailure(cause);
                return;
            }

            cancelExpirationRenewal(threadId);
            result.trySuccess(null);
        });

        return result;
    }

    // Unlock Lua script. The general logic is:
    // 1. Check whether the lock is held by the current thread; if not, return nil.
    // 2. Decrement the current thread's hold counter by 1. If the counter is still
    //    greater than 0, reset the expiration time and return 0.
    // 3. Otherwise the lock is fully released: del removes the Redis data,
    //    an unlock message is published, and 1 is returned.
    protected RFuture<Boolean> unlockInnerAsync(long threadId) {
        return commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,
                "if (redis.call('hexists', KEYS[1], ARGV[3]) == 0) then " +
                    "return nil; " +
                "end; " +
                "local counter = redis.call('hincrby', KEYS[1], ARGV[3], -1); " +
                "if (counter > 0) then " +
                    "redis.call('pexpire', KEYS[1], ARGV[2]); " +
                    "return 0; " +
                "else " +
                    "redis.call('del', KEYS[1]); " +
                    "redis.call('publish', KEYS[2], ARGV[1]); " +
                    "return 1; " +
                "end; " +
                "return nil;",
                Arrays.<Object>asList(getName(), getChannelName()),
                LockPubSub.UNLOCK_MESSAGE, internalLockLeaseTime, getLockName(threadId));
    }
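The three outcomes of the unlock script (nil, 0 and 1) map onto the null, false and true values that unlockAsync inspects. The following plain-Java sketch models that mapping with an in-memory map instead of Redis. The class, the acquire helper and the publish counter are hypothetical names introduced for illustration, and the expiry refresh is only noted in a comment.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical in-memory model of the unlock script; not Redisson code.
class UnlockScriptModel {
    // Models the Redis hash at the lock key: holder name -> reentrancy counter
    private final Map<String, Integer> holders = new HashMap<>();
    // Counts how many unlock messages would have been published
    private int publishedUnlocks = 0;

    /** Helper for the demo: records one acquisition by the given holder. */
    public void acquire(String holder) {
        holders.merge(holder, 1, Integer::sum);
    }

    /** Returns null if not the holder, false if still held, true if fully released. */
    public Boolean unlock(String holder) {
        if (!holders.containsKey(holder)) {                     // hexists == 0
            return null;                                        // return nil
        }
        int counter = holders.merge(holder, -1, Integer::sum);  // hincrby ... -1
        if (counter > 0) {
            // The real script also refreshes the expiry here (pexpire)
            return Boolean.FALSE;                               // return 0
        }
        holders.clear();                                        // del
        publishedUnlocks++;                                     // publish
        return Boolean.TRUE;                                    // return 1
    }

    public int publishedUnlocks() {
        return publishedUnlocks;
    }

    public static void main(String[] args) {
        UnlockScriptModel lock = new UnlockScriptModel();
        lock.acquire("node-1:1");
        lock.acquire("node-1:1");
        System.out.println(lock.unlock("node-1:1")); // false: one hold remains
        System.out.println(lock.unlock("node-1:1")); // true: fully released
        System.out.println(lock.unlock("node-1:1")); // null: not the holder
    }
}
```

Only the final release publishes a message, which is what wakes up the threads waiting in the lock() loop.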

Conclusion

That completes a rough walk-through of the Redisson reentrant lock source code. Some code that is not part of the core locking flow was not covered. The Redisson reentrant lock implements the JUC Lock interface. Redisson implements the Redis protocol handling itself, and its networking layer uses Netty. The core of locking and unlocking is two Lua scripts, and the lock state is stored in a Redis hash. rLock.lock(expireTime, TimeUnit.SECONDS), rLock.lock() and the tryLock variants differ somewhat in implementation, but the Lua script part is exactly the same.