
Mainstream distributed locks are generally implemented in three ways:

  1. Optimistic database locking
  2. Distributed lock based on Redis
  3. Distributed lock based on ZooKeeper

First, to ensure that distributed locks are available, we need to ensure that at least four of the following conditions are met:

  1. Mutual exclusivity. Only one client can hold the lock at any time.
  2. Deadlocks do not occur. Even if one client crashes while holding the lock and does not unlock actively, it is guaranteed that subsequent clients can lock it.
  3. It has fault tolerance. Clients can lock and unlock as long as most Redis nodes are running properly.
  4. You must tie the bell. Lock and unlock must be the same client, the client can not unlock the lock added by others.

Redisson locking principle

Redisson is a very powerful open source Redis client framework, official address:

It’s easy to use. After configuring Maven and connection information, here’s the code:

RLock lock = redisson.getLock("anyLock");

All of Redisson’s locking logic is done using Lua scripts, which ensure atomicity.

Let’s take a look at the RLock initialization code:

public class Redisson implements RedissonClient {
    public RLock getLock(String name) {
        return newRedissonLock(connectionManager.getCommandExecutor(), name); }}public class RedissonLock extends RedissonExpirable implements RLock {
	public RedissonLock(CommandAsyncExecutor commandExecutor, String name) {
    super(commandExecutor, name);
    this.commandExecutor = commandExecutor; = commandExecutor.getConnectionManager().getId();
    this.internalLockLeaseTime = commandExecutor.getConnectionManager().getCfg().getLockWatchdogTimeout();
    this.entryName = id + ":" + name;
The RedissonLock id returns a UUID object. Each machine has its own ID attribute, with an ID value similar to “8743C9c0-0795-4907-87FD-6C719a6b4586”.

Moving on to the code implementation of lock() :

public class RedissonLock extends RedissonExpirable implements RLock {
	public void lock(a) {
	    try {
	    } catch(InterruptedException e) { Thread.currentThread().interrupt(); }}@Override
	public void lockInterruptibly(a) throws InterruptedException {

	public void lockInterruptibly(long leaseTime, TimeUnit unit) throws InterruptedException {
		// Get the current thread ID
	    long threadId = Thread.currentThread().getId();
	    Long ttl = tryAcquire(leaseTime, unit, threadId);
	    // lock acquired
	    if (ttl == null) {

	    RFuture<RedissonLockEntry> future = subscribe(threadId);

	    try {
	        while (true) {
	            ttl = tryAcquire(leaseTime, unit, threadId);
	            // lock acquired
	            if (ttl == null) {

	            // waiting for message
	            if (ttl >= 0) {
	                getEntry(threadId).getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS);
	            } else{ getEntry(threadId).getLatch().acquire(); }}}finally {
	        unsubscribe(future, threadId);

	<T> RFuture<T> tryLockInnerAsync(long leaseTime, TimeUnit unit, long threadId, RedisStrictCommand<T> command) {
	    internalLockLeaseTime = unit.toMillis(leaseTime);

	    return commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, command,
	              "if ('exists', KEYS[1]) == 0) then " +
	                  "'hset', KEYS[1], ARGV[2], 1); " +
	                  "'pexpire', KEYS[1], ARGV[1]); " +
	                  "return nil; " +
	              "end; " +
	              "if ('hexists', KEYS[1], ARGV[2]) == 1) then " +
	                  "'hincrby', KEYS[1], ARGV[2], 1); " +
	                  "'pexpire', KEYS[1], ARGV[1]); " +
	                  "return nil; " +
	              "end; " +
	              "return'pttl', KEYS[1]);", Collections.<Object>singletonList(getName()), internalLockLeaseTime, getLockName(threadId)); }}Copy the code

The tryAcquire() method is passed with an expiration time of -1, followed by the current thread ID, followed by the core lua script execution flow. Let’s take a step by step look at how this works:

"if ('exists', KEYS[1]) == 0) then " +
  "'hset', KEYS[1], ARGV[2], 1); " +
  "'pexpire', KEYS[1], ARGV[1]); " +
  "return nil; " +
"end; " +
KEYS[1] parameter is: anyLock ARGV[2] Yes: id + “:” + threadId

First, use exists to determine whether the current key exists in redis. If not, it is equal to 0. Then run the hset command to store “anyLock ID :threadId 1” in Redis.

And just for the record, the last one is for reentrant counting, which we’ll talk about later.

Look below, and then use pEXPIRE to set the expiration time, which defaults to 30 seconds with internalLockLeaseTime. The result is null, and the lock is successfully added.

Redisson’s reentrant principle

Let’s take a look at the lock key exists, the same machine, the same thread how to lock?

"if ('hexists', KEYS[1], ARGV[2]) == 1) then " +
  "'hincrby', KEYS[1], ARGV[2], 1); " +
  "'pexpire', KEYS[1], ARGV[1]); " +
  "return nil; " +
"end; " +
"return'pttl', KEYS[1]);".Copy the code

ARGV[2] is :” id + “:” + threadId “if the same thread requests again from the same machine, this will be 1, then execute hincrby, hset value+1 becomes 2, and then continue to set expiration time.

Similarly, after a thread reenters, value – 1 is unlocked

Redisson watchDog principle

If you have A scenario where A and B are running services, and A has A distributed lock, but the production environment is different, if in case A lock times out, but A’s services are still running. In this case, the lock of USER A is released due to timeout, user B obtains the lock, and user B executes the service logic. So distributed locking is meaningless, right?

Therefore, Redisson introduced the concept of Watch Dog. After A obtains the lock execution, if the lock does not expire, A background thread will automatically extend the lock expiration time to prevent the lock expiration due to incomplete service execution.

Let’s look at the implementation:

private <T> RFuture<Long> tryAcquireAsync(long leaseTime, TimeUnit unit, final long threadId) {
    if(leaseTime ! = -1) {
        return tryLockInnerAsync(leaseTime, unit, threadId, RedisCommands.EVAL_LONG);
    RFuture<Long> ttlRemainingFuture = tryLockInnerAsync(commandExecutor.getConnectionManager().getCfg().getLockWatchdogTimeout(), TimeUnit.MILLISECONDS, threadId, RedisCommands.EVAL_LONG);
    ttlRemainingFuture.addListener(new FutureListener<Long>() {
        public void operationComplete(Future<Long> future) throws Exception {
            if(! future.isSuccess()) {return;

            Long ttlRemaining = future.getNow();
            // lock acquired
            if (ttlRemaining == null) { scheduleExpirationRenewal(threadId); }}});return ttlRemainingFuture;
When we’re done with tryLockInnerAsync, we’ll add a listener. Look at the implementation in the listener:

protected RFuture<Boolean> renewExpirationAsync(long threadId) {
    return commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,
            "if ('hexists', KEYS[1], ARGV[2]) == 1) then " +
                "'pexpire', KEYS[1], ARGV[1]); " +
                "return 1; " +
            "end; " +
            "return 0;",
        internalLockLeaseTime, getLockName(threadId));
Copy the code

In this case, the scheduling task is executed every 10 seconds. In the Lua script, the renewal expiration time is used so that the lock held by the current thread will not be invalid when the expiration time is reached

Redisson’s mutual exclusion principle

Lua lock: lua lock: lua lock

"return'pttl', KEYS[1]);".Copy the code

Returning how long the lock will expire, we continue to look at the code:

public void lockInterruptibly(long leaseTime, TimeUnit unit) throws InterruptedException {
    long threadId = Thread.currentThread().getId();
    Long ttl = tryAcquire(leaseTime, unit, threadId);
    // If TTL is returned, the lock is successful. If it is not empty, the lock fails
    if (ttl == null) {

    RFuture<RedissonLockEntry> future = subscribe(threadId);

    try {
    	// Loop endlessly to try to get the lock
        while (true) {
        	// Try locking again
            ttl = tryAcquire(leaseTime, unit, threadId);
            // If TTL =null, the lock is preempted successfully
            if (ttl == null) {

            // If the TTL is greater than 0, the lock fails to preempt
            if (ttl >= 0) {
                getEntry(threadId).getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS);
            } else{ getEntry(threadId).getLatch().acquire(); }}}finally{ unsubscribe(future, threadId); }}Copy the code

Redisson lock release principle

Look directly at the Lua code:

protected RFuture<Boolean> unlockInnerAsync(long threadId) {
    return commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,
		// Check whether the lock key exists
        "if ('exists', KEYS[1]) == 0) then " +
            "'publish', KEYS[2], ARGV[1]); " +
            "return 1; " +
        "end;" +
        // Check whether the key corresponding to the current machine and thread ID exists
        "if ('hexists', KEYS[1], ARGV[3]) == 0) then " +
            "return nil;" +
        "end; " +
        // Count -1 can be reentrant lock
        "local counter ='hincrby', KEYS[1], ARGV[3], -1); " +
        // If the counter is greater than 0, the lock is still held
        "if (counter > 0) then " +
            "'pexpire', KEYS[1], ARGV[2]); " +
            "return 0; " +
        "else " +
        	// Delete the key with the del command
            "'del', KEYS[1]); " +
            "'publish', KEYS[2], ARGV[1]); " +
            "return 1; "+
        "end; " +
        "return nil;",
        Arrays.<Object>asList(getName(), getChannelName()), LockPubSub.unlockMessage, internalLockLeaseTime, getLockName(threadId));
