Preface

Mainstream distributed locks are generally implemented in three ways:

  1. Optimistic database locking
  2. Distributed lock based on Redis
  3. Distributed lock based on ZooKeeper

I have previously blogged about implementing distributed locks with MySQL and Redis: www.cnblogs.com/wang-meng/p… That post mainly covers the basic implementation principles.

This [distributed lock] series of articles takes a deeper look at the source code of the Redis client Redisson and at ZooKeeper, and explains how each of the two implements distributed locks.

Reliability

First, for a distributed lock to be usable, it must satisfy at least the following four conditions:

  1. Mutual exclusion. Only one client can hold the lock at any time.
  2. No deadlock. Even if a client crashes while holding the lock and never unlocks it explicitly, other clients must still be able to acquire the lock afterwards.
  3. Fault tolerance. Clients can lock and unlock as long as a majority of Redis nodes are running properly.
  4. Only the holder may unlock. The lock must be released by the same client that acquired it; a client cannot release a lock held by another client.

Redisson locking principle

Redisson is a very powerful open source Redis client framework, official address: redisson.org/

It’s easy to use. After configuring Maven and connection information, here’s the code:

RLock lock = redisson.getLock("anyLock");

lock.lock();
lock.unlock();
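
In real code the unlock call usually goes in a finally block, so the lock is released even if the business logic throws. Because RLock extends java.util.concurrent.locks.Lock, the standard idiom applies. The following is a minimal sketch rather than Redisson code: the helper name LockUsage.runLocked is hypothetical, and a local ReentrantLock stands in for the RLock so that the example runs without a Redis server.

```java
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

class LockUsage {
    // Hypothetical helper: runs the task while holding the lock and always unlocks.
    static void runLocked(Lock lock, Runnable task) {
        lock.lock();
        try {
            task.run();
        } finally {
            lock.unlock();
        }
    }

    public static void main(String[] args) {
        // Stand-in for redisson.getLock("anyLock"); any Lock implementation works here.
        Lock lock = new ReentrantLock();
        runLocked(lock, () -> System.out.println("in critical section"));
    }
}
```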

All of Redisson’s locking logic is done using Lua scripts, which ensure atomicity.

Let’s take a look at the RLock initialization code:

public class Redisson implements RedissonClient {

    @Override
    public RLock getLock(String name) {
        return new RedissonLock(connectionManager.getCommandExecutor(), name);
    }
}

public class RedissonLock extends RedissonExpirable implements RLock {

    public RedissonLock(CommandAsyncExecutor commandExecutor, String name) {
        super(commandExecutor, name);
        this.commandExecutor = commandExecutor;
        // UUID of this Redisson client instance
        this.id = commandExecutor.getConnectionManager().getId();
        // Default lease time, taken from the watchdog timeout setting
        this.internalLockLeaseTime = commandExecutor.getConnectionManager().getCfg().getLockWatchdogTimeout();
        this.entryName = id + ":" + name;
    }
}

The id field of RedissonLock holds the UUID provided by the connection manager. Each Redisson client instance (in practice, each machine) has its own id, with a value like “8743c9c0-0795-4907-87fd-6c719a6b4586”.
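
To make the role of the id concrete, here is a small sketch of how the per-thread identifier that the Lua script later receives (id + ":" + threadId) could be built. LockNameDemo and its lockName method are hypothetical names used only for illustration, and a random UUID stands in for the one the connection manager provides.

```java
import java.util.UUID;

class LockNameDemo {
    // One id per client instance (stand-in for the connection manager's UUID).
    static final UUID CLIENT_ID = UUID.randomUUID();

    // Builds the identifier for "this client + this thread".
    static String lockName(UUID clientId, long threadId) {
        return clientId + ":" + threadId;
    }

    public static void main(String[] args) throws InterruptedException {
        // Two threads of the same client share the UUID prefix but differ in the thread ID.
        System.out.println(lockName(CLIENT_ID, Thread.currentThread().getId()));
        Thread other = new Thread(() ->
            System.out.println(lockName(CLIENT_ID, Thread.currentThread().getId())));
        other.start();
        other.join();
    }
}
```

Combining both parts is what lets the lock distinguish threads on the same machine as well as threads on different machines that happen to share a thread ID.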

Moving on to the implementation of lock():

public class RedissonLock extends RedissonExpirable implements RLock {
	@Override
	public void lock() {
	    try {
	        lockInterruptibly();
	    } catch (InterruptedException e) {
	        Thread.currentThread().interrupt();
	    }
	}

	@Override
	public void lockInterruptibly() throws InterruptedException {
	    // -1 means no explicit lease time was given
	    lockInterruptibly(-1, null);
	}

	@Override
	public void lockInterruptibly(long leaseTime, TimeUnit unit) throws InterruptedException {
		// Get the current thread ID
	    long threadId = Thread.currentThread().getId();
	    Long ttl = tryAcquire(leaseTime, unit, threadId);
	    // lock acquired
	    if (ttl == null) {
	        return;
	    }

	    RFuture<RedissonLockEntry> future = subscribe(threadId);
	    commandExecutor.syncSubscription(future);

	    try {
	        while (true) {
	            ttl = tryAcquire(leaseTime, unit, threadId);
	            // lock acquired
	            if (ttl == null) {
	                break;
	            }

	            // waiting for message
	            if (ttl >= 0) {
	                getEntry(threadId).getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS);
	            } else {
	                getEntry(threadId).getLatch().acquire();
	            }
	        }
	    } finally {
	        unsubscribe(future, threadId);
	    }
	}

	<T> RFuture<T> tryLockInnerAsync(long leaseTime, TimeUnit unit, long threadId, RedisStrictCommand<T> command) {
	    internalLockLeaseTime = unit.toMillis(leaseTime);

	    return commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, command,
	              "if (redis.call('exists', KEYS[1]) == 0) then " +
	                  "redis.call('hset', KEYS[1], ARGV[2], 1); " +
	                  "redis.call('pexpire', KEYS[1], ARGV[1]); " +
	                  "return nil; " +
	              "end; " +
	              "if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " +
	                  "redis.call('hincrby', KEYS[1], ARGV[2], 1); " +
	                  "redis.call('pexpire', KEYS[1], ARGV[1]); " +
	                  "return nil; " +
	              "end; " +
	              "return redis.call('pttl', KEYS[1]);",
	              Collections.<Object>singletonList(getName()), internalLockLeaseTime, getLockName(threadId));
	}
}

lock() ends up calling tryAcquire() with a lease time of -1 and the current thread ID, which in turn runs the core Lua script in tryLockInnerAsync(). Let’s walk through the script step by step:

"if (redis.call('exists', KEYS[1]) == 0) then " +
  "redis.call('hset', KEYS[1], ARGV[2], 1); " +
  "redis.call('pexpire', KEYS[1], ARGV[1]); " +
  "return nil; " +
"end; " +

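KEYS[1] is the lock name, anyLock. ARGV[1] is the lease time in milliseconds (internalLockLeaseTime), and ARGV[2] is id + ":" + threadId.

First, the script uses exists to check whether the key is present in Redis. A result of 0 means the key does not exist, so nobody holds the lock. The script then runs hset to store the field id:threadId with the value 1 in the hash anyLock: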

{
  "8743c9c0-0795-4907-87fd-6c719a6b4586:1":1
}

Note that the value 1 at the end is the reentrancy count, which is covered in the next section.

Next, the script uses pexpire to set the key’s expiration time to internalLockLeaseTime, which defaults to 30 seconds. It then returns nil, which the Java side receives as null, meaning the lock was acquired.

Redisson’s reentrant principle

What happens when the lock key already exists and the same thread on the same machine locks again?

"if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " +
  "redis.call('hincrby', KEYS[1], ARGV[2], 1); " +
  "redis.call('pexpire', KEYS[1], ARGV[1]); " +
  "return nil; " +
"end; " +
"return redis.call('pttl', KEYS[1]);"

ARGV[2] is id + ":" + threadId. If the same thread on the same machine requests the lock again, hexists returns 1. The script then runs hincrby, which increments the value stored for that field from 1 to 2, and resets the expiration time.

Likewise, each unlock by a thread that has reentered decrements the value by 1; the lock is released only when the count reaches 0.
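
The three branches of the lock script can be modeled in plain Java. This is an in-memory sketch for illustration only, not Redisson code: the class LockScriptModel and its methods are hypothetical, the map stands in for the Redis hash, and time is passed in explicitly so that expiration is deterministic.

```java
import java.util.HashMap;
import java.util.Map;

class LockScriptModel {
    // field (id:threadId) -> reentrancy count; models the Redis hash at the lock key
    private final Map<String, Integer> holds = new HashMap<>();
    private long expiresAtMs = 0; // models the key's expiration time

    // Returns null on success, otherwise the remaining time to live in ms (like pttl).
    Long tryLock(String owner, long leaseMs, long nowMs) {
        if (!holds.isEmpty() && nowMs >= expiresAtMs) {
            holds.clear(); // Redis would have expired the key by now
        }
        if (holds.isEmpty()) {           // exists == 0: first acquisition
            holds.put(owner, 1);
            expiresAtMs = nowMs + leaseMs;
            return null;
        }
        if (holds.containsKey(owner)) {  // hexists == 1: reentrant acquisition
            holds.merge(owner, 1, Integer::sum);
            expiresAtMs = nowMs + leaseMs;
            return null;
        }
        return expiresAtMs - nowMs;      // held by another owner
    }

    int count(String owner) {
        return holds.getOrDefault(owner, 0);
    }

    public static void main(String[] args) {
        LockScriptModel lock = new LockScriptModel();
        System.out.println(lock.tryLock("uuid-a:1", 30_000, 0));     // first acquisition
        System.out.println(lock.tryLock("uuid-a:1", 30_000, 1_000)); // same owner reenters
        System.out.println(lock.tryLock("uuid-b:7", 30_000, 2_000)); // different owner
    }
}
```

In Redis the whole script runs atomically, so no additional synchronization is needed there; this single-threaded model only mirrors the branching.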

Redisson watchDog principle

Consider a scenario where services A and B are both running and A holds the distributed lock. Production environments are unpredictable: suppose the lock times out while A’s business logic is still running. The lock held by A is then released by the timeout, B acquires it, and B starts executing the same business logic. At that point the distributed lock has lost its purpose.

Redisson therefore introduced the concept of a watchdog. After A acquires the lock, and for as long as A still holds it, a background task automatically extends the lock’s expiration time, so the lock does not expire just because the business logic has not finished. As the code below shows, the watchdog is only started when no explicit lease time is given (leaseTime == -1).

Let’s look at the implementation:

private <T> RFuture<Long> tryAcquireAsync(long leaseTime, TimeUnit unit, final long threadId) {
    // An explicit lease time was given: lock with it and do not start the watchdog
    if (leaseTime != -1) {
        return tryLockInnerAsync(leaseTime, unit, threadId, RedisCommands.EVAL_LONG);
    }
    // No lease time: lock with the watchdog timeout
    RFuture<Long> ttlRemainingFuture = tryLockInnerAsync(commandExecutor.getConnectionManager().getCfg().getLockWatchdogTimeout(), TimeUnit.MILLISECONDS, threadId, RedisCommands.EVAL_LONG);
    ttlRemainingFuture.addListener(new FutureListener<Long>() {
        @Override
        public void operationComplete(Future<Long> future) throws Exception {
            if (!future.isSuccess()) {
                return;
            }

            Long ttlRemaining = future.getNow();
            // lock acquired: start the periodic renewal
            if (ttlRemaining == null) {
                scheduleExpirationRenewal(threadId);
            }
        }
    });
    return ttlRemainingFuture;
}

After tryLockInnerAsync is called, a listener is added to its result. If the lock was acquired, the listener calls scheduleExpirationRenewal, which schedules the following renewal:

protected RFuture<Boolean> renewExpirationAsync(long threadId) {
    return commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,
            "if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " +
                "redis.call('pexpire', KEYS[1], ARGV[1]); " +
                "return 1; " +
            "end; " +
            "return 0;",
        Collections.<Object>singletonList(getName()), 
        internalLockLeaseTime, getLockName(threadId));
}

The scheduled task runs every 10 seconds, which is one third of the default 30-second internalLockLeaseTime. Its Lua script resets the expiration time to internalLockLeaseTime as long as the current thread still holds the lock, so the lock does not expire while it is in use.
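
The effect of the periodic renewal can be checked with a small deterministic model. This is a sketch under stated assumptions, not Redisson’s implementation: WatchdogModel and its methods are hypothetical names, time is simulated with plain numbers instead of a real timer, and the 30-second lease with a renewal every 10 seconds follows the defaults described above.

```java
class WatchdogModel {
    static final long LEASE_MS = 30_000;             // default lease time
    static final long RENEW_EVERY_MS = LEASE_MS / 3; // renewal period: 10 seconds

    private String owner;     // current holder (id:threadId), or null
    private long expiresAtMs; // models the key's expiration time

    void lock(String who, long nowMs) {
        owner = who;
        expiresAtMs = nowMs + LEASE_MS;
    }

    boolean isHeldBy(String who, long nowMs) {
        return who.equals(owner) && nowMs < expiresAtMs;
    }

    // Mirrors the renewal script: extend the expiration only if 'who' still holds the lock.
    boolean renew(String who, long nowMs) {
        if (!isHeldBy(who, nowMs)) {
            return false;
        }
        expiresAtMs = nowMs + LEASE_MS;
        return true;
    }

    public static void main(String[] args) {
        WatchdogModel lock = new WatchdogModel();
        lock.lock("uuid-a:1", 0);
        // Simulate 60 seconds of business logic with a renewal every 10 seconds.
        for (long now = RENEW_EVERY_MS; now <= 60_000; now += RENEW_EVERY_MS) {
            lock.renew("uuid-a:1", now);
        }
        System.out.println("still held at 60s: " + lock.isHeldBy("uuid-a:1", 60_000));
    }
}
```

Renewing well before the lease runs out leaves room for a late or failed renewal attempt. If the client crashes, the renewals stop and the key expires on its own, which is what keeps the no-deadlock property intact.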

Redisson’s mutual exclusion principle

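Back to the locking Lua script above: when the lock is held by a different thread, neither if branch matches and the script falls through to its last line:

"return redis.call('pttl', KEYS[1]);"

This returns the remaining time, in milliseconds, before the lock expires. Let’s continue with the Java code: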

@Override
public void lockInterruptibly(long leaseTime, TimeUnit unit) throws InterruptedException {
    long threadId = Thread.currentThread().getId();
    Long ttl = tryAcquire(leaseTime, unit, threadId);
    // A null ttl means the lock was acquired; a non-null ttl means another holder has it
    if (ttl == null) {
        return;
    }

    // Subscribe so that this thread is notified when the lock is released
    RFuture<RedissonLockEntry> future = subscribe(threadId);
    commandExecutor.syncSubscription(future);

    try {
        // Loop until the lock is acquired
        while (true) {
            // Try locking again
            ttl = tryAcquire(leaseTime, unit, threadId);
            // A null ttl means the lock was acquired
            if (ttl == null) {
                break;
            }

            // Otherwise wait for an unlock message, at most ttl milliseconds, then retry
            if (ttl >= 0) {
                getEntry(threadId).getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS);
            } else {
                getEntry(threadId).getLatch().acquire();
            }
        }
    } finally {
        unsubscribe(future, threadId);
    }
}
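
The retry loop can be illustrated without Redis. The sketch below assumes the latch behaves like a java.util.concurrent.Semaphore, which matches the tryAcquire(ttl, unit) and acquire() calls in the code; everything else (the class WaitLoopSketch, the in-memory holder field, and the fixed 50 ms stand-in for the TTL) is hypothetical and only for illustration.

```java
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;

class WaitLoopSketch {
    private String holder;                            // null means the lock is free
    private final Semaphore latch = new Semaphore(0); // released on every unlock

    // Stand-in for tryAcquire(): null means acquired, otherwise a fake TTL in ms.
    private synchronized Long tryAcquire(String who) {
        if (holder == null) {
            holder = who;
            return null;
        }
        return 50L;
    }

    void lock(String who) throws InterruptedException {
        while (true) {
            Long ttl = tryAcquire(who);
            if (ttl == null) {
                return; // lock acquired
            }
            // Sleep until an unlock notification arrives or the TTL elapses, then retry.
            latch.tryAcquire(ttl, TimeUnit.MILLISECONDS);
        }
    }

    void unlock(String who) {
        boolean released;
        synchronized (this) {
            released = who.equals(holder);
            if (released) {
                holder = null;
            }
        }
        if (released) {
            latch.release(); // models the published unlock message waking a waiter
        }
    }

    synchronized String holder() {
        return holder;
    }

    public static void main(String[] args) throws InterruptedException {
        WaitLoopSketch lock = new WaitLoopSketch();
        lock.lock("a");
        Thread waiter = new Thread(() -> {
            try {
                lock.lock("b");
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        waiter.start();
        Thread.sleep(100); // "a" works while "b" waits
        lock.unlock("a");
        waiter.join();
        System.out.println("holder: " + lock.holder());
    }
}
```

Waiting on the latch with a timeout means a waiter wakes up either when it is notified or when the lock should have expired anyway, so a lost notification cannot block it forever.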

Redisson lock release principle

Look directly at the Lua code:

protected RFuture<Boolean> unlockInnerAsync(long threadId) {
    return commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,
        // If the lock key no longer exists, just notify the waiters
        "if (redis.call('exists', KEYS[1]) == 0) then " +
            "redis.call('publish', KEYS[2], ARGV[1]); " +
            "return 1; " +
        "end;" +
        // If the hash has no field for this machine and thread, the caller does not own the lock
        "if (redis.call('hexists', KEYS[1], ARGV[3]) == 0) then " +
            "return nil;" +
        "end; " +
        // Decrement the reentrancy count
        "local counter = redis.call('hincrby', KEYS[1], ARGV[3], -1); " +
        // If the counter is still greater than 0, the lock is still held: refresh its expiration
        "if (counter > 0) then " +
            "redis.call('pexpire', KEYS[1], ARGV[2]); " +
            "return 0; " +
        "else " +
            // Otherwise delete the key and publish the unlock message
            "redis.call('del', KEYS[1]); " +
            "redis.call('publish', KEYS[2], ARGV[1]); " +
            "return 1; "+
        "end; " +
        "return nil;",
        Arrays.<Object>asList(getName(), getChannelName()), LockPubSub.unlockMessage, internalLockLeaseTime, getLockName(threadId));
}
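
The three possible outcomes of the unlock script (not the owner, still held, fully released) can also be modeled in memory. As before, this is a hypothetical sketch and not Redisson code: UnlockScriptModel, hold and the published counter are names invented for illustration, and the expiration refresh is left out to keep the example short.

```java
import java.util.HashMap;
import java.util.Map;

class UnlockScriptModel {
    private final Map<String, Integer> holds = new HashMap<>(); // field -> reentrancy count
    int published = 0; // number of unlock messages sent to waiters

    // Test helper: pretend 'owner' has locked 'count' times.
    void hold(String owner, int count) {
        holds.put(owner, count);
    }

    // Returns 1 if the lock is fully released, 0 if the owner still holds it,
    // and null if the caller does not own the lock.
    Integer unlock(String owner) {
        if (holds.isEmpty()) {            // key does not exist: just notify waiters
            published++;
            return 1;
        }
        if (!holds.containsKey(owner)) {  // held by another client or thread
            return null;
        }
        int counter = holds.merge(owner, -1, Integer::sum);
        if (counter > 0) {                // still reentered (the real script also refreshes the expiration)
            return 0;
        }
        holds.clear();                    // count reached 0: delete the key and notify
        published++;
        return 1;
    }

    public static void main(String[] args) {
        UnlockScriptModel lock = new UnlockScriptModel();
        lock.hold("uuid-a:1", 2);
        System.out.println(lock.unlock("uuid-b:7")); // not the owner
        System.out.println(lock.unlock("uuid-a:1")); // one level of reentrancy left
        System.out.println(lock.unlock("uuid-a:1")); // fully released
    }
}
```

The owner check in the second branch is what enforces the fourth reliability condition: a client cannot release a lock that another client holds.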

Conclusion

The whole flow summarized in one picture:

Statement

This article was first published on my public account, “a flower is not romantic”. If you repost it, please credit the source!

Interested readers can follow my personal public account: a flower is not romantic