Preface

A few days ago we covered one distributed lock implementation, ZooKeeper. This time we'll look at Redis, which is just as popular and, in my view, even better suited to the job.

Besides these two, a distributed lock can also be built on a database, but that approach is not mainstream, so I will skip it here and stick to the mainstream options.

Features of a distributed lock

  • Mutual exclusion. At any moment only one client can hold the lock; this is also called uniqueness.
  • No deadlock. Even if a client crashes while holding the lock and never releases it, later clients are still guaranteed to be able to acquire it.
  • Only the owner can unlock (as the saying goes, whoever tied the bell must untie it). The lock must be released by the same client that acquired it; a client cannot release a lock held by someone else, i.e. it cannot unlock by mistake.
  • The lock does not expire on its own. While the program is still executing normally, the lock must not lapse for any reason.
  • Fault tolerance. As long as most Redis nodes are up and running, clients can acquire and release locks.

Here are some implementations to get a feel for these features.

First implementation (elementary)

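public void wrongRedisLock(Jedis jedis, String lockKey, int expireTime) {
    // Absolute expiration time, stored as the lock's value
    long expires = System.currentTimeMillis() + expireTime;
    String expiresStr = String.valueOf(expires);

    if (jedis.setnx(lockKey, expiresStr) == 1) {
        // Let Redis delete the key after expireTime milliseconds.
        // SETNX and PEXPIRE are two separate commands, so this pair is not atomic.
        jedis.pexpire(lockKey, expireTime);
        // Start executing the code logic
    }
}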

Mutual exclusivity

The first step is the setnx command. Its defining behavior is that it sets the key only if the key does not exist yet; if the key already exists, the set fails.

This ensures that the key is unique in Redis: when a group of clients try to set the same key at the same time, only one of them succeeds.

That is what guarantees the first feature, mutual exclusion.
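To make the "only one client wins" behavior concrete, here is a minimal sketch that needs no Redis server. It assumes that ConcurrentHashMap.putIfAbsent is a fair stand-in for SETNX; the class SetNxDemo, the method countWinners, and the key names are hypothetical and exist only for this illustration.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicInteger;

class SetNxDemo {
    // Assumption: an in-memory map stands in for Redis; putIfAbsent writes only if the key is missing.
    static final ConcurrentHashMap<String, String> store = new ConcurrentHashMap<>();

    // Returns 1 if the key was set and 0 if it already existed, mirroring the SETNX reply.
    static long setnx(String key, String value) {
        return store.putIfAbsent(key, value) == null ? 1 : 0;
    }

    // Starts `clients` threads that all race for the same key and counts how many succeed.
    static int countWinners(String key, int clients) {
        AtomicInteger winners = new AtomicInteger();
        CountDownLatch start = new CountDownLatch(1);
        Thread[] threads = new Thread[clients];
        for (int i = 0; i < clients; i++) {
            String clientId = "client-" + i;
            threads[i] = new Thread(() -> {
                try {
                    start.await(); // line everyone up so the attempts overlap
                    if (setnx(key, clientId) == 1) {
                        winners.incrementAndGet();
                    }
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });
            threads[i].start();
        }
        start.countDown();
        for (Thread t : threads) {
            try {
                t.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
        return winners.get();
    }

    public static void main(String[] args) {
        System.out.println("winners: " + countWinners("lock:order", 16));
    }
}
```

However many clients race, exactly one of them sees the reply 1. Every other client sees 0 and knows the lock is taken.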

Deadlocks do not occur

The lock is released automatically even if the client goes down, because Redis deletes the key once its expiration time is reached.

That guarantees the second feature: no deadlock.

Apart from these two, the remaining three features are not met.
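The effect of the expiration time can be sketched without a server. The example below is an assumption-laden model, not Redis itself: a plain map records when each key expires, and the clock is passed in as a parameter so the timeline is easy to follow. The class ExpiringLockDemo and its tryLock method are hypothetical names.

```java
import java.util.HashMap;
import java.util.Map;

class ExpiringLockDemo {
    // key -> absolute expiry time in milliseconds; a stand-in for a Redis key with a TTL.
    private final Map<String, Long> expiresAt = new HashMap<>();

    // Succeeds if the key is absent or its TTL has already elapsed at time `now`.
    synchronized boolean tryLock(String key, long ttlMillis, long now) {
        Long expiry = expiresAt.get(key);
        if (expiry != null && expiry > now) {
            return false; // still held and not yet expired
        }
        expiresAt.put(key, now + ttlMillis);
        return true;
    }

    public static void main(String[] args) {
        ExpiringLockDemo redis = new ExpiringLockDemo();
        long t0 = 0; // simulated clock, in milliseconds
        // Client A acquires the lock and then crashes without unlocking.
        System.out.println(redis.tryLock("lock:order", 5_000, t0));         // true
        // Client B tries one second later: the key has not expired yet.
        System.out.println(redis.tryLock("lock:order", 5_000, t0 + 1_000)); // false
        // Client B tries again once the TTL has elapsed: the lock is free.
        System.out.println(redis.tryLock("lock:order", 5_000, t0 + 5_000)); // true
    }
}
```

Client A never calls unlock, yet client B is not blocked forever. The price is that the lock can also lapse while A is still working, which is the fourth feature this implementation does not meet.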

Second implementation (intermediate)

Locking implementation

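// Constants not shown in the original listing; values assumed from the Redis SET command (Jedis 2.x-style arguments)
private static final String LOCK_SUCCESS = "OK";
private static final String SET_IF_NOT_EXIST = "NX";
private static final String SET_WITH_EXPIRE_TIME = "PX";

/**
 * Acquire the distributed lock (locking code).
 * @param jedis Redis client
 * @param lockKey lock key
 * @param requestId unique identifier of the requesting client
 * @param expireTime expiration time (milliseconds, given the PX option assumed above)
 * @return whether the lock was acquired
 */
public static boolean getDistributedLock(Jedis jedis, String lockKey, String requestId, int expireTime) {
    String result = jedis.set(lockKey, requestId, SET_IF_NOT_EXIST, SET_WITH_EXPIRE_TIME, expireTime);
    return LOCK_SUCCESS.equals(result);
}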

Only the owner can unlock

This lock differs from the first one in that the value it sets is a unique identifier for the client.

The value records which client acquired the lock. On unlock, the stored value is compared with the caller's identifier: if they match, the unlock succeeds; otherwise it fails.

This ensures the third feature: whoever tied the bell must untie it, i.e. only the owner can unlock.

Unlocking implementation

// Not shown in the original listing; assumed to be the reply of a successful DEL
private static final Long RELEASE_SUCCESS = 1L;

/**
 * Release the distributed lock (unlocking code).
 * @param jedis Redis client
 * @param lockKey lock key
 * @param requestId unique identifier of the requesting client
 * @return whether the release succeeded
 */
public static boolean releaseDistributedLock(Jedis jedis, String lockKey, String requestId) {
    // Each fragment ends with a space so the concatenated script stays valid Lua
    String script = "if redis.call('get', KEYS[1]) == ARGV[1] " +
                    "then " +
                        "return redis.call('del', KEYS[1]) " +
                    "else " +
                        "return 0 " +
                    "end";

    Object result = jedis.eval(script, Collections.singletonList(lockKey), Collections.singletonList(requestId));
    return RELEASE_SUCCESS.equals(result);
}

A Lua script is used here. Why?

Written as an ordinary if/else on the client, the check and the delete are two separate steps, so the unlock is not atomic and is open to concurrency problems.

Redis executes a Lua script atomically, so running the check and the delete inside one script removes that race. That is the purpose of the script.

Here is what the script above means.

redis.call is the API for invoking Redis commands from Lua; here it calls get and del. KEYS[1] is a placeholder for the key, and ARGV[1] is a placeholder for the argument.

If the value stored under lockKey equals the requestId passed into the method, lockKey is deleted; otherwise 0 is returned.
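The difference between the atomic and the non-atomic unlock can be sketched in plain Java. This is an illustration under assumptions, not the Jedis API: ConcurrentHashMap.remove(key, value) plays the role of the Lua script because it compares and deletes in one atomic step. The class OwnerUnlockDemo and its methods are hypothetical.

```java
import java.util.concurrent.ConcurrentHashMap;

class OwnerUnlockDemo {
    // Assumption: an in-memory map stands in for Redis.
    static final ConcurrentHashMap<String, String> store = new ConcurrentHashMap<>();

    static boolean lock(String key, String requestId) {
        return store.putIfAbsent(key, requestId) == null;
    }

    // Atomic "compare, then delete": the counterpart of the Lua script.
    static boolean unlock(String key, String requestId) {
        return store.remove(key, requestId);
    }

    // Non-atomic variant, shown only to mark where the race window is.
    static boolean unsafeUnlock(String key, String requestId) {
        if (requestId.equals(store.get(key))) {
            // If the lock expires here and another client acquires it,
            // the delete below removes that other client's lock.
            store.remove(key);
            return true;
        }
        return false;
    }

    public static void main(String[] args) {
        System.out.println(lock("lock:order", "req-A"));   // true: A acquires
        System.out.println(unlock("lock:order", "req-B")); // false: B is not the owner
        System.out.println(unlock("lock:order", "req-A")); // true: A releases its own lock
    }
}
```

In unsafeUnlock the check and the delete are separate calls, so another client can slip in between them. In unlock there is no such gap, which is what the script buys on the Redis side.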

This second approach guarantees mutual exclusion, no deadlock, and owner-only unlock, which is enough for most scenarios. The fourth and fifth features are still not satisfied, so let's move on.

Third implementation (advanced)

Redisson

Redisson is a Java in-memory data grid built on Redis.

It not only provides a series of distributed versions of common Java objects, but also implements reentrant lock, fair lock, MultiLock, RedLock, ReadWriteLock, and more, along with many distributed services.

Redisson aims to be the simplest and most convenient way to use Redis. Its goal is to promote separation of concerns around Redis so that users can focus more on business logic.

A usage example:

public void testReentrantLock(RedissonClient redisson) {

  RLock lock = redisson.getLock("anyLock");
  boolean res = false;
  try {
      // 1. Most common usage
      //lock.lock();

      // 2. Lock with a lease time; the lock is released automatically after 10 seconds
      //lock.lock(10, TimeUnit.SECONDS);

      // 3. Try to lock: wait at most 3 seconds, release automatically 10 seconds after locking
      res = lock.tryLock(3, 10, TimeUnit.SECONDS);
      if (res) {    // success
          // do your business
      }
  } catch (InterruptedException e) {
      e.printStackTrace();
  } finally {
      // Unlock only if the lock was actually acquired
      if (res) {
          lock.unlock();
      }
  }
}

As you can see, the usage is similar to Java's ReentrantLock. Next, let's read its source code.
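For comparison, here is the same try, work, unlock pattern with the JDK's own java.util.concurrent.locks.ReentrantLock. It is only a single-process analogy: ReentrantLock has no lease time, and the helper runWithLock and the class LocalTryLockDemo are hypothetical names for this sketch.

```java
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.ReentrantLock;

class LocalTryLockDemo {
    static final ReentrantLock lock = new ReentrantLock();

    // Returns true if the work ran under the lock, false if the wait timed out or was interrupted.
    static boolean runWithLock(long waitMillis, Runnable work) {
        boolean acquired = false;
        try {
            acquired = lock.tryLock(waitMillis, TimeUnit.MILLISECONDS);
            if (acquired) {
                work.run();
            }
            return acquired;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        } finally {
            // Unlock only if this call actually acquired the lock.
            if (acquired) {
                lock.unlock();
            }
        }
    }

    public static void main(String[] args) {
        AtomicInteger counter = new AtomicInteger();
        boolean ran = runWithLock(3_000, counter::incrementAndGet);
        System.out.println("ran=" + ran + ", counter=" + counter.get());
    }
}
```

The shape is the same as the Redisson example: a bounded wait, the business logic, and an unlock in finally that is guarded by whether the lock was acquired.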

List of key methods

public class RedissonLock {
    // ---------------------- Lock interface methods ----------------------

    /** Acquires the lock. The default lock validity period is 30 seconds. */
    void lock();
    /** Attempts to acquire the lock: returns true on success, or false on failure (that is, the lock is already held by another thread). */
    boolean tryLock();
    /**
     * Like tryLock(), except that it waits up to the given time when the lock is not immediately available.
     * Returns true if the lock was acquired right away or during the wait.
     * @param time wait time
     * @param unit time unit, such as hour, minute, second, or millisecond
     */
    boolean tryLock(long time, TimeUnit unit) throws InterruptedException;
    /** Releases the lock. */
    void unlock();
    /** Acquires the lock interruptibly: calling interrupt() on the waiting thread actually interrupts the wait. */
    void lockInterruptibly();

    /**
     * Acquires the lock with an explicit validity period.
     * @param leaseTime lock validity time
     * @param unit time unit, such as hour, minute, second, or millisecond
     */
    void lock(long leaseTime, TimeUnit unit);
    /**
     * Attempts to acquire the lock, waiting up to waitTime and holding it for at most leaseTime.
     * @param waitTime wait time
     * @param leaseTime lock validity time
     * @param unit time unit, such as hour, minute, second, or millisecond
     */
    boolean tryLock(long waitTime, long leaseTime, TimeUnit unit) throws InterruptedException;
    /** Checks whether the lock is held by any thread; returns true if so. */
    boolean isLocked();
    /** Checks whether the current thread holds the lock. Unlike isLocked(), it tells you whether the current thread has acquired the lock, not whether some thread has. */
    boolean isHeldByCurrentThread();
    /**
     * Acquires the lock interruptibly with an explicit validity period.
     * @param leaseTime lock validity time
     * @param unit time unit, such as hour, minute, second, or millisecond
     */
    void lockInterruptibly(long leaseTime, TimeUnit unit);
}

Let's walk through one of the locking methods, tryLock. You can read the rest yourself; the principle is similar.

tryLock locking source code walkthrough

Let's first look at the locking flow chart.

Main flow of the code

The code is annotated; follow the comments as you read the source.

@Override
public boolean tryLock(long waitTime, long leaseTime, TimeUnit unit) throws InterruptedException {
    // Get the maximum waiting time
    long time = unit.toMillis(waitTime);
    // Record the current time
    long current = System.currentTimeMillis();
    // Get the current thread id
    long threadId = Thread.currentThread().getId();
    //1. Try to apply for a lock and return the remaining lock expiration time
    Long ttl = tryAcquire(leaseTime, unit, threadId);
    //2. If the command output is empty, the lock application is successful
    if (ttl == null) {
        return true;
    }
    //3. If the lock application time is greater than or equal to the maximum waiting time, the lock application fails
    time -= System.currentTimeMillis() - current;
    if (time <= 0) {
        /** * Sets the result of asynchronous execution to null with promise.trySuccess * promise from Uncompleted-->Completed notifies the Future that asynchronous execution is complete */
        acquireFailed(threadId);
        return false;
    }

    current = System.currentTimeMillis();

    /**
     * 4. Subscribe to the lock-release event and block in await() until the subscription completes or the wait times out.
     * If await returns false, the wait has exceeded the maximum time allowed for acquiring the lock: unsubscribe and report failure.
     * If await returns true, enter the loop that tries to acquire the lock.
     */
    RFuture<RedissonLockEntry> subscribeFuture = subscribe(threadId);
    // await blocks on a CountDownLatch while the subscription completes asynchronously (built on Netty's Future)
    if (!await(subscribeFuture, time, TimeUnit.MILLISECONDS)) {
        if (!subscribeFuture.cancel(false)) {
            subscribeFuture.onComplete((res, e) -> {
                if (e == null) {
                    unsubscribe(subscribeFuture, threadId);
                }
            });
        }
        acquireFailed(threadId);
        return false;
    }

    try {
        // Calculate the total time taken to obtain the lock. If the time is greater than or equal to the maximum waiting time, the lock fails to be obtained
        time -= System.currentTimeMillis() - current;
        if (time <= 0) {
            acquireFailed(threadId);
            return false;
        }

        /**
         * 5. After the lock-release signal arrives, keep trying to acquire the lock within the maximum wait time.
         * On success, return true immediately.
         * If the lock still has not been acquired when the maximum wait time runs out, treat it as a failure and return false, ending the loop.
         */
        while (true) {
            long currentTime = System.currentTimeMillis();
            // Try the lock again
            ttl = tryAcquire(leaseTime, unit, threadId);
            // If the lock is successfully acquired, return true to terminate the loop
            if (ttl == null) {
                return true;
            }

            // If the maximum wait time is exceeded, return false to end the loop
            time -= System.currentTimeMillis() - currentTime;
            if (time <= 0) {
                acquireFailed(threadId);
                return false;
            }

            /** 6. Block and wait for the lock (blocks on a semaphore, i.e. a shared lock, until the unlock message arrives). */
            currentTime = System.currentTimeMillis();
            if (ttl >= 0 && ttl < time) {
                // If the lock's remaining TTL is shorter than the remaining wait time, try to take a permit from the entry's semaphore within the TTL (returns early if a permit becomes available or the thread is interrupted).
                getEntry(threadId).getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS);
            } else {
                // The semaphore can pass within the wait time range
                getEntry(threadId).getLatch().tryAcquire(time, TimeUnit.MILLISECONDS);
            }

            //7. Update the remaining wait time (maximum wait time - block time consumed)
            time -= System.currentTimeMillis() - currentTime;
            if (time <= 0) {
                acquireFailed(threadId);
                return false;
            }
        }
    } finally {
        // 8. Unsubscribe from unlock messages whether or not the lock was acquired
        unsubscribe(subscribeFuture, threadId);
    }
}
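// The wait-budget pattern in the listing above can be reduced to a small, self-contained sketch.
// It is a simplified single-JVM model under stated assumptions, not Redisson's code: an AtomicReference
// stands in for the lock key, and a Semaphore stands in for the unlock message. The class WaitBudgetLock
// and its methods are hypothetical names.

```java
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;

class WaitBudgetLock {
    // Current owner of the lock, or null when it is free (stand-in for the Redis key).
    private final AtomicReference<String> owner = new AtomicReference<>();
    // Waiters park here; unlock() releases a permit, playing the role of the unlock message.
    private final Semaphore latch = new Semaphore(0);

    private boolean tryAcquire(String client) {
        return owner.compareAndSet(null, client);
    }

    // Tries to lock for at most waitMillis, charging every step against the remaining budget.
    boolean tryLock(String client, long waitMillis) {
        long remaining = waitMillis;
        try {
            while (true) {
                long start = System.currentTimeMillis();
                if (tryAcquire(client)) {
                    return true;
                }
                remaining -= System.currentTimeMillis() - start;
                if (remaining <= 0) {
                    return false;
                }
                // Sleep until an unlock signal arrives or the budget runs out.
                start = System.currentTimeMillis();
                latch.tryAcquire(remaining, TimeUnit.MILLISECONDS);
                remaining -= System.currentTimeMillis() - start;
                if (remaining <= 0) {
                    return false;
                }
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }

    void unlock(String client) {
        if (owner.compareAndSet(client, null)) {
            latch.release(); // wake one waiter so it retries
        }
    }
}
```

// As in the listing, time spent both trying and waiting is subtracted from the same budget,
// so the method can never block longer than the caller asked for.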

Core locking code

private Long tryAcquire(long leaseTime, TimeUnit unit, long threadId) {
    return get(tryAcquireAsync(leaseTime, unit, threadId));
}

private <T> RFuture<Long> tryAcquireAsync(long leaseTime, TimeUnit unit, long threadId) {
    // A lock holding time was set explicitly
    if (leaseTime != -1) {
        return tryLockInnerAsync(leaseTime, unit, threadId, RedisCommands.EVAL_LONG);
    }

    // No lock holding time was set: use the watchdog default of 30 seconds
    RFuture<Long> ttlRemainingFuture = tryLockInnerAsync(commandExecutor.getConnectionManager().getCfg().getLockWatchdogTimeout(), TimeUnit.MILLISECONDS, threadId, RedisCommands.EVAL_LONG);

    // If the lock is acquired, start the scheduled task that renews it
    ttlRemainingFuture.onComplete((ttlRemaining, e) -> {
        if (e != null) {
            return;
        }

        // Start the watchdog
        if (ttlRemaining == null) {
            scheduleExpirationRenewal(threadId);
        }
    });
    return ttlRemainingFuture;
}

<T> RFuture<T> tryLockInnerAsync(long leaseTime, TimeUnit unit, long threadId, RedisStrictCommand<T> command) {
    internalLockLeaseTime = unit.toMillis(leaseTime);

    /** The Lua script is executed with the EVAL command to acquire the lock, which guarantees atomicity. */
    return commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, command,
              // 1. If the key does not exist, run hset (hset key UUID+threadId 1), then pexpire to set the lock's expiration time.
              // Return nil: the lock was acquired
              "if (redis.call('exists', KEYS[1]) == 0) then " +
                  "redis.call('hset', KEYS[1], ARGV[2], 1); " +
                  "redis.call('pexpire', KEYS[1], ARGV[1]); " +
                  "return nil; " +
              "end; " +
              // 2. If the key exists and the field matches, the lock is held by the current thread: hincrby increases the reentrancy count by 1 and pexpire resets the expiration time
              "if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " +
                  "redis.call('hincrby', KEYS[1], ARGV[2], 1); " +
                  "redis.call('pexpire', KEYS[1], ARGV[1]); " +
                  "return nil; " +
              "end; " +
              // 3. If the key exists but the field does not match, the lock is held by another thread: pttl returns the lock's remaining time to live
              "return redis.call('pttl', KEYS[1]);",
              // KEYS[1], ARGV[1], ARGV[2]
              Collections.<Object>singletonList(getName()), internalLockLeaseTime, getLockName(threadId));
}

Parameter Description:

  • KEYS[1] is Collections.singletonList(getName()), the key of the distributed lock;
  • ARGV[1] is internalLockLeaseTime, the lock's lease time in milliseconds;
  • ARGV[2] is getLockName(threadId), the unique value set when the lock is acquired, i.e. UUID+threadId.
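The three branches of the locking script can be replayed in plain Java to see how the hash field and its counter give reentrancy. This is a sketch under assumptions: one in-memory hash models a single lock key, the clock is passed in explicitly, and the unlock method anticipates the hincrby -1 logic of the unlock script shown later in the article. The class ReentrantScriptDemo is a hypothetical name.

```java
import java.util.HashMap;
import java.util.Map;

class ReentrantScriptDemo {
    // One lock key modeled as a hash: field = owner id (UUID+threadId), value = hold count.
    private final Map<String, Integer> holds = new HashMap<>();
    private long expiresAt; // absolute expiry of the key, in milliseconds

    // Follows the locking script: returns null on success, otherwise the remaining TTL in ms.
    synchronized Long tryLock(String owner, long leaseMillis, long now) {
        if (!holds.isEmpty() && expiresAt <= now) {
            holds.clear(); // the key has expired
        }
        if (holds.isEmpty()) {                   // exists == 0
            holds.put(owner, 1);                 // hset key owner 1
            expiresAt = now + leaseMillis;       // pexpire
            return null;
        }
        if (holds.containsKey(owner)) {          // hexists == 1
            holds.merge(owner, 1, Integer::sum); // hincrby +1
            expiresAt = now + leaseMillis;       // pexpire
            return null;
        }
        return expiresAt - now;                  // pttl
    }

    // Follows the unlock script: null = not the owner, false = still held, true = released.
    synchronized Boolean unlock(String owner, long leaseMillis, long now) {
        if (!holds.containsKey(owner)) {
            return null;
        }
        int counter = holds.merge(owner, -1, Integer::sum); // hincrby -1
        if (counter > 0) {
            expiresAt = now + leaseMillis;
            return false;
        }
        holds.clear(); // del
        return true;
    }
}
```

A second tryLock by the same owner only bumps the counter, and the key is deleted only when the matching number of unlock calls brings the counter back to zero.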

Did you notice the watchdog? The job of scheduleExpirationRenewal(threadId) is to renew the lock's lease.

Put simply, it is a scheduled task that periodically checks how long the lock has left and, if the lock is about to expire, extends its expiration time.

That gives us the fourth feature: the lock does not expire on its own.
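The renewal idea can be sketched with a ScheduledExecutorService. Everything here is an assumption made for illustration rather than Redisson's code: the expiry is a local AtomicLong instead of a Redis TTL, the task renews unconditionally while the lock is held, and the period of one third of the lease is a choice of this sketch. The class WatchdogDemo and its methods are hypothetical.

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;

class WatchdogDemo {
    // Absolute expiry time of the lock in ms; stands in for the key's TTL in Redis.
    final AtomicLong expiresAt = new AtomicLong();
    volatile boolean held;

    void acquire(long leaseMillis, long now) {
        held = true;
        expiresAt.set(now + leaseMillis);
    }

    // One renewal step: push the expiry out to a full lease again, but only while the lock is held.
    boolean renewOnce(long leaseMillis, long now) {
        if (!held) {
            return false;
        }
        expiresAt.set(now + leaseMillis);
        return true;
    }

    // Runs renewOnce every leaseMillis / 3 until the returned future is cancelled (assumed period).
    ScheduledFuture<?> startWatchdog(ScheduledExecutorService timer, long leaseMillis) {
        long period = leaseMillis / 3;
        return timer.scheduleAtFixedRate(
                () -> renewOnce(leaseMillis, System.currentTimeMillis()),
                period, period, TimeUnit.MILLISECONDS);
    }

    public static void main(String[] args) throws InterruptedException {
        ScheduledExecutorService timer = Executors.newSingleThreadScheduledExecutor();
        WatchdogDemo lock = new WatchdogDemo();
        long lease = 300;
        lock.acquire(lease, System.currentTimeMillis());
        ScheduledFuture<?> watchdog = lock.startWatchdog(timer, lease);
        Thread.sleep(1_000); // "business logic" that outlasts the original lease
        System.out.println("still valid: " + (lock.expiresAt.get() > System.currentTimeMillis()));
        lock.held = false;      // unlock
        watchdog.cancel(false); // stop renewing once the lock is released
        timer.shutdown();
    }
}
```

The renewal task must be cancelled on unlock, which corresponds to the cancelExpirationRenewal(threadId) call in unlockAsync. If the client process dies, the task dies with it and the lease runs out normally, so the no-deadlock property is kept.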

Unlock source code walkthrough

Let's first look at the unlock flow chart.

The source code

Call chain: unlock -> unlockAsync -> unlockInnerAsync. unlockInnerAsync is the core unlocking code.

@Override
public void unlock() {
    try {
        get(unlockAsync(Thread.currentThread().getId()));
    } catch (RedisException e) {
        if (e.getCause() instanceof IllegalMonitorStateException) {
            throw (IllegalMonitorStateException) e.getCause();
        } else {
            throw e;
        }
    }
}

@Override
public RFuture<Void> unlockAsync(long threadId) {
    RPromise<Void> result = new RedissonPromise<Void>();
    RFuture<Boolean> future = unlockInnerAsync(threadId);

    future.onComplete((opStatus, e) -> {
        cancelExpirationRenewal(threadId);

        if (e != null) {
            result.tryFailure(e);
            return;
        }

        if (opStatus == null) {
            IllegalMonitorStateException cause = new IllegalMonitorStateException("attempt to unlock lock, not locked by current thread by node id: "
                                                                                  + id + " thread-id: " + threadId);
            result.tryFailure(cause);
            return;
        }

        result.trySuccess(null);
    });

    return result;
}

// Core unlock code
protected RFuture<Boolean> unlockInnerAsync(long threadId) {
    /** The Lua script is executed with the EVAL command to release the lock, which guarantees atomicity. */
    return commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,
            // If the lock's hash has no field for the current thread, the lock is not held by this thread (it may be held by another), so releasing it is not allowed.
            "if (redis.call('hexists', KEYS[1], ARGV[3]) == 0) then " +
                "return nil;" +
            "end; " +
            // If the field matches, the current thread owns the distributed lock: decrease the reentrancy count by 1
            "local counter = redis.call('hincrby', KEYS[1], ARGV[3], -1); " +
            // If the count is still greater than 0 after the decrement, the lock was acquired reentrantly, so only refresh the expiration time and do not delete the key
            "if (counter > 0) then " +
                "redis.call('pexpire', KEYS[1], ARGV[2]); " +
                "return 0; " +
            "else " +
            // If the count has reached 0, delete the key, publish the unlock message, and return 1
                "redis.call('del', KEYS[1]); " +
                "redis.call('publish', KEYS[2], ARGV[1]); " +
                "return 1; " +
            "end; " +
            "return nil;",
            // KEYS[1], KEYS[2], ARGV[1], ARGV[2], ARGV[3]
            Arrays.<Object>asList(getName(), getChannelName()), LockPubSub.UNLOCK_MESSAGE, internalLockLeaseTime, getLockName(threadId));

}

Unlock message notification:

As noted in the locking source above, a client that fails to acquire the lock subscribes to it and waits to hear when it is released. So on unlock, this message must be published to notify the other clients that want the lock.

public class LockPubSub extends PublishSubscribe<RedissonLockEntry> {

    public static final Long UNLOCK_MESSAGE = 0L;
    public static final Long READ_UNLOCK_MESSAGE = 1L;

    public LockPubSub(PublishSubscribeService service) {
        super(service);
    }
    
    @Override
    protected RedissonLockEntry createEntry(RPromise<RedissonLockEntry> newPromise) {
        return new RedissonLockEntry(newPromise);
    }

    @Override
    protected void onMessage(RedissonLockEntry value, Long message) {

        /** Check whether it is an unlock message */
        if (message.equals(UNLOCK_MESSAGE)) {
            Runnable runnableToExecute = value.getListeners().poll();
            if (runnableToExecute != null) {
                runnableToExecute.run();
            }
            /** Release one permit to wake up a thread blocked in entry.getLatch().tryAcquire so it tries to acquire the lock again */
            value.getLatch().release();
        } else if (message.equals(READ_UNLOCK_MESSAGE)) {
            while (true) {
                /** If there are other listeners, run them as well */
                Runnable runnableToExecute = value.getListeners().poll();
                if (runnableToExecute == null) {
                    break;
                }
                runnableToExecute.run();
            }
            value.getLatch().release(value.getLatch().getQueueLength());
        }
    }
}

That wraps up the source of Redisson's distributed lock. One problem remains: it does deliver the feature that the lock does not expire on its own, but it still does not provide fault tolerance.

A fault tolerance scenario

Because Redis runs as a cluster in production, node failures have to be considered. Here is an example:

  • 1. Client A asks the master node for the lock and acquires it.
  • 2. The master node goes down before it has synchronized the lock information to the slave nodes.
  • 3. Because the master is down, a master/slave switchover begins. A slave node is promoted to master and keeps serving, but the new master has no record of client A's lock.
  • 4. Client B now tries to lock. The new master has no information about any other client's lock, so client B acquires the lock successfully.
  • 5. Here is the problem: A and B hold the lock at the same time and execute the protected code at the same time, so the distributed lock has failed.
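The five steps can be replayed with two in-memory maps. This is a toy model under assumptions, not a real Redis deployment: replication is an explicit method call that the scenario simply never makes, and failover just promotes the replica's data. The class FailoverDemo and its methods are hypothetical.

```java
import java.util.HashMap;
import java.util.Map;

class FailoverDemo {
    private Map<String, String> master = new HashMap<>();
    private Map<String, String> replica = new HashMap<>();

    // Set-if-absent against the current master only; nothing is replicated here.
    boolean lock(String key, String client) {
        return master.putIfAbsent(key, client) == null;
    }

    // Asynchronous replication, modeled as a separate step that may not happen in time.
    void replicate() {
        replica.putAll(master);
    }

    // The master dies and the replica is promoted with whatever data it has.
    void failover() {
        master = replica;
        replica = new HashMap<>();
    }

    public static void main(String[] args) {
        FailoverDemo redis = new FailoverDemo();
        boolean a = redis.lock("lock:order", "A"); // step 1: A acquires on the master
        redis.failover();                          // steps 2-3: master dies before replicating
        boolean b = redis.lock("lock:order", "B"); // step 4: B acquires on the new master
        System.out.println("A holds: " + a + ", B holds: " + b); // step 5: both are true
    }
}
```

If replicate() had run before failover(), B's attempt would have failed. The window between the write on the master and its replication is exactly where the lock can be lost.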

You may wonder why Redisson ships a distributed lock implementation without solving this problem. The reason is that the probability of this happening is low, and the cost of solving it is somewhat high.

If your business scenario can tolerate this small chance of error, RedissonLock is recommended. If it cannot, here is something to look into.

RedissonRedLock (red lock) is designed to solve this cluster fault tolerance problem. I leave it to you as an exercise. Don't slack off; go and study it properly.

Partly from: Gopher_39b2