In distributed micro service system is popular today, the concept of distributed lock has been deep in the heart of every program monkey, there are many ways to achieve distributed lock, such as the use of database, Redis, ZooKeeper can achieve distributed lock, today we will analyze the use of Redis to achieve distributed lock underlying implementation principle.

In distributed scenarios, it is possible that multiple instances need to mutually exclusive access to shared resources. The most typical case is seckill, where instantaneous traffic is high and the number of objects is limited, to prevent oversold.

Distributed locks generally have the following characteristics:

  • Mutual exclusion: Only one thread can hold the lock at a time
  • Reentrancy: Once the lock is acquired by the same thread on the same node, it can be acquired again
  • Lock timeout: the thread that holds the lock cannot hold the lock all the time and must have a timeout mechanism to prevent deadlocks
  • High performance and high availability: Locking and unlocking must be efficient and do not occupy too many system resources and processing time. High availability must prevent lock failure
  • Response interrupt: Can wake up from a blocked state in time

##Redis theoretical low-level support for distributed lock implementation

About using Redis to achieve distributed lock theory knowledge, can refer to millet information department technical team, here don’t go into the link below: xiaomi – info. Making. IO / 2019/12/17 /…

Today we will focus on how to use the Spring Integration project to implement Redis distributed locks.

The Spring Integration project supports lightweight messaging in Spring-based applications and Integration with external systems through declarative adapters. These adapters provide a higher level of abstraction on top of Spring’s support for remoting, messaging, and scheduling. The primary goal of Spring Integration is to provide a simple model for building enterprise Integration solutions while maintaining the separation of concerns that is critical to generating maintainable, testable code. RedisLockRegistry


package org.springframework.integration.redis.util;

import java.text.SimpleDateFormat;
import java.util.Collections;
import java.util.Date;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;

import org.springframework.beans.factory.DisposableBean;
import org.springframework.dao.CannotAcquireLockException;
import org.springframework.data.redis.connection.RedisConnectionFactory;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.data.redis.core.script.DefaultRedisScript;
import org.springframework.data.redis.core.script.RedisScript;
import org.springframework.integration.support.locks.ExpirableLockRegistry;
import org.springframework.scheduling.concurrent.CustomizableThreadFactory;
import org.springframework.util.Assert;
import org.springframework.util.ReflectionUtils;

/**
 * Implementation of {@link ExpirableLockRegistry} providing a distributed lock using Redis.
 * Locks are stored under the key {@code registryKey:lockKey}. Locks expire after
 * (default 60) seconds. Threads unlocking an
 * expired lock will get an {@link IllegalStateException}. This should be
 * considered as a critical error because it is possible the protected
 * resources were compromised.
 * <p>
 * Locks are reentrant.
 * <p>
 * <b>However, locks are scoped by the registry; a lock from a different registry with the
 * same key (even if the registry uses the same 'registryKey') are different
 * locks, and the second cannot be acquired by the same thread while the first is
 * locked.</b>
 * <p>
 * <b>Note: This is not intended for low latency applications.</b> It is intended
 * for resource locking across multiple JVMs.
 * <p>
 * {@link Condition}s are not supported.
 *
 * @author Gary Russell
 * @author Konstantin Yakimov
 * @author Artem Bilan
 * @author Vedran Pavic
 *
 * @since 4.0
 *
 */
public final class RedisLockRegistry implements ExpirableLockRegistry, DisposableBean {

	private static final Log LOGGER = LogFactory.getLog(RedisLockRegistry.class);

	private static final long DEFAULT_EXPIRE_AFTER = 60000L;

	private static final String OBTAIN_LOCK_SCRIPT =
			"local lockClientId = redis.call('GET', KEYS[1])\n" +
					"if lockClientId == ARGV[1] then\n" +
					"  redis.call('PEXPIRE', KEYS[1], ARGV[2])\n" +
					"  return true\n" +
					"elseif not lockClientId then\n" +
					"  redis.call('SET', KEYS[1], ARGV[1], 'PX', ARGV[2])\n" +
					"  return true\n" +
					"end\n" +
					"return false";


	private final Map<String, RedisLock> locks = new ConcurrentHashMap<>();

	private final String clientId = UUID.randomUUID().toString();

	private final String registryKey;

	private final StringRedisTemplate redisTemplate;

	private final RedisScript<Boolean> obtainLockScript;

	private final long expireAfter;

	/**
	 * An {@link ExecutorService} to call {@link StringRedisTemplate#delete} in
	 * the separate thread when the current one is interrupted.
	 */
	private Executor executor =
			Executors.newCachedThreadPool(new CustomizableThreadFactory("redis-lock-registry-"));

	/**
	 * Flag to denote whether the {@link ExecutorService} was provided via the setter and
	 * thus should not be shutdown when {@link #destroy()} is called
	 */
	private boolean executorExplicitlySet;

	private volatile boolean unlinkAvailable = true;

	/**
	 * Constructs a lock registry with the default (60 second) lock expiration.
	 * @param connectionFactory The connection factory.
	 * @param registryKey The key prefix for locks.
	 */
	public RedisLockRegistry(RedisConnectionFactory connectionFactory, String registryKey) {
		this(connectionFactory, registryKey, DEFAULT_EXPIRE_AFTER);
	}

	/**
	 * Constructs a lock registry with the supplied lock expiration.
	 * @param connectionFactory The connection factory.
	 * @param registryKey The key prefix for locks.
	 * @param expireAfter The expiration in milliseconds.
	 */
	public RedisLockRegistry(RedisConnectionFactory connectionFactory, String registryKey, long expireAfter) {
		Assert.notNull(connectionFactory, "'connectionFactory' cannot be null");
		Assert.notNull(registryKey, "'registryKey' cannot be null");
		this.redisTemplate = new StringRedisTemplate(connectionFactory);
		this.obtainLockScript = new DefaultRedisScript<>(OBTAIN_LOCK_SCRIPT, Boolean.class);
		this.registryKey = registryKey;
		this.expireAfter = expireAfter;
	}

	/**
	 * Set the {@link Executor}, where is not provided then a default of
	 * cached thread pool Executor will be used.
	 * @param executor the executor service
	 * @since 5.0.5
	 */
	public void setExecutor(Executor executor) {
		this.executor = executor;
		this.executorExplicitlySet = true;
	}

	@Override
	public Lock obtain(Object lockKey) {
		Assert.isInstanceOf(String.class, lockKey);
		String path = (String) lockKey;
		return this.locks.computeIfAbsent(path, RedisLock::new);
	}

	@Override
	public void expireUnusedOlderThan(long age) {
		long now = System.currentTimeMillis();
		this.locks.entrySet()
				.removeIf((entry) -> {
					RedisLock lock = entry.getValue();
					return now - lock.getLockedAt() > age && !lock.isAcquiredInThisProcess();
				});
	}

	@Override
	public void destroy() {
		if (!this.executorExplicitlySet) {
			((ExecutorService) this.executor).shutdown();
		}
	}

	private final class RedisLock implements Lock {

		private final String lockKey;

		private final ReentrantLock localLock = new ReentrantLock();

		private volatile long lockedAt;

		private RedisLock(String path) {
			this.lockKey = constructLockKey(path);
		}

		private String constructLockKey(String path) {
			return RedisLockRegistry.this.registryKey + ':' + path;
		}

		public long getLockedAt() {
			return this.lockedAt;
		}

		@Override
		public void lock() {
			this.localLock.lock();
			while (true) {
				try {
					while (!obtainLock()) {
						Thread.sleep(100); //NOSONAR
					}
					break;
				}
				catch (InterruptedException e) {
					/*
					 * This method must be uninterruptible so catch and ignore
					 * interrupts and only break out of the while loop when
					 * we get the lock.
					 */
				}
				catch (Exception e) {
					this.localLock.unlock();
					rethrowAsLockException(e);
				}
			}
		}

		private void rethrowAsLockException(Exception e) {
			throw new CannotAcquireLockException("Failed to lock mutex at " + this.lockKey, e);
		}

		@Override
		public void lockInterruptibly() throws InterruptedException {
			this.localLock.lockInterruptibly();
			try {
				while (!obtainLock()) {
					Thread.sleep(100); //NOSONAR
				}
			}
			catch (InterruptedException ie) {
				this.localLock.unlock();
				Thread.currentThread().interrupt();
				throw ie;
			}
			catch (Exception e) {
				this.localLock.unlock();
				rethrowAsLockException(e);
			}
		}

		@Override
		public boolean tryLock() {
			try {
				return tryLock(0, TimeUnit.MILLISECONDS);
			}
			catch (InterruptedException e) {
				Thread.currentThread().interrupt();
				return false;
			}
		}

		@Override
		public boolean tryLock(long time, TimeUnit unit) throws InterruptedException {
			long now = System.currentTimeMillis();
			if (!this.localLock.tryLock(time, unit)) {
				return false;
			}
			try {
				long expire = now + TimeUnit.MILLISECONDS.convert(time, unit);
				boolean acquired;
				while (!(acquired = obtainLock()) && System.currentTimeMillis() < expire) { //NOSONAR
					Thread.sleep(100); //NOSONAR
				}
				if (!acquired) {
					this.localLock.unlock();
				}
				return acquired;
			}
			catch (Exception e) {
				this.localLock.unlock();
				rethrowAsLockException(e);
			}
			return false;
		}

		private boolean obtainLock() {
			Boolean success =
					RedisLockRegistry.this.redisTemplate.execute(RedisLockRegistry.this.obtainLockScript,
							Collections.singletonList(this.lockKey), RedisLockRegistry.this.clientId,
							String.valueOf(RedisLockRegistry.this.expireAfter));

			boolean result = Boolean.TRUE.equals(success);

			if (result) {
				this.lockedAt = System.currentTimeMillis();
			}
			return result;
		}

		@Override
		public void unlock() {
			if (!this.localLock.isHeldByCurrentThread()) {
				throw new IllegalStateException("You do not own lock at " + this.lockKey);
			}
			if (this.localLock.getHoldCount() > 1) {
				this.localLock.unlock();
				return;
			}
			try {
				if (!isAcquiredInThisProcess()) {
					throw new IllegalStateException("Lock was released in the store due to expiration. " +
							"The integrity of data protected by this lock may have been compromised.");
				}

				if (Thread.currentThread().isInterrupted()) {
					RedisLockRegistry.this.executor.execute(this::removeLockKey);
				}
				else {
					removeLockKey();
				}

				if (LOGGER.isDebugEnabled()) {
					LOGGER.debug("Released lock; " + this);
				}
			}
			catch (Exception e) {
				ReflectionUtils.rethrowRuntimeException(e);
			}
			finally {
				this.localLock.unlock();
			}
		}

		private void removeLockKey() {
			if (RedisLockRegistry.this.unlinkAvailable) {
				try {
					RedisLockRegistry.this.redisTemplate.unlink(this.lockKey);
				}
				catch (Exception ex) {
					RedisLockRegistry.this.unlinkAvailable = false;
					if (LOGGER.isDebugEnabled()) {
						LOGGER.debug("The UNLINK command has failed (not supported on the Redis server?); " +
								"falling back to the regular DELETE command", ex);
					}
					else {
						LOGGER.warn("The UNLINK command has failed (not supported on the Redis server?); " +
								"falling back to the regular DELETE command: " + ex.getMessage());
					}
					RedisLockRegistry.this.redisTemplate.delete(this.lockKey);
				}
			}
			else {
				RedisLockRegistry.this.redisTemplate.delete(this.lockKey);
			}
		}

		@Override
		public Condition newCondition() {
			throw new UnsupportedOperationException("Conditions are not supported");
		}

		public boolean isAcquiredInThisProcess() {
			return RedisLockRegistry.this.clientId.equals(
					RedisLockRegistry.this.redisTemplate.boundValueOps(this.lockKey).get());
		}

		@Override
		public String toString() {
			SimpleDateFormat dateFormat = new SimpleDateFormat("yyyy-MM-dd@HH:mm:ss.SSS");
			return "RedisLock [lockKey=" + this.lockKey
					+ ",lockedAt=" + dateFormat.format(new Date(this.lockedAt))
					+ ", clientId=" + RedisLockRegistry.this.clientId
					+ "]";
		}

		@Override
		public int hashCode() {
			final int prime = 31;
			int result = 1;
			result = prime * result + getOuterType().hashCode();
			result = prime * result + ((this.lockKey == null) ? 0 : this.lockKey.hashCode());
			result = prime * result + (int) (this.lockedAt ^ (this.lockedAt >>> 32));
			result = prime * result + RedisLockRegistry.this.clientId.hashCode();
			return result;
		}

		@Override
		public boolean equals(Object obj) {
			if (this == obj) {
				return true;
			}
			if (obj == null) {
				return false;
			}
			if (getClass() != obj.getClass()) {
				return false;
			}
			RedisLock other = (RedisLock) obj;
			if (!getOuterType().equals(other.getOuterType())) {
				return false;
			}
			if (!this.lockKey.equals(other.lockKey)) {
				return false;
			}
			return this.lockedAt == other.lockedAt;
		}

		private RedisLockRegistry getOuterType() {
			return RedisLockRegistry.this;
		}

	}

}
Copy the code

First, let’s take a look at the Javadoc of this class. By reading Javadoc, we can quickly understand the author’s intention and realize spiritual communication with the author. When we write code, we should also use javadoc to express the design intention, so that subsequent maintainers can understand the design intention at that time. It is also convenient for me to recall my design plan quickly when I need to modify the code in the future.

RedisLockRegsitry implements ExpirableLockRegistry interface, based on Redis implements distributed lock, lock stored in the registryKey:lockKey this key, the default expiration time is 60 seconds, A thread unlocking an expired lock causes an IllegalStateException. This should be considered a serious mistake as it could lead to the destruction of protected resources. However, the lock is only valid in Registry and is considered to be a different lock if the same registryKey is used and registered in different Registries. Note: This is not a distributed locking implementation designed for low-latency applications, this is an implementation designed for locking resources between different JVMS.

RedisLockRegistry implements not only the ExpirableLockRegistry interface, but also the DisposableBean interface. When the Spring container is destroyed, you can call the Destroy method to release resources, which in this case are the thread pool.

@Override
	public void destroy() {
		if (!this.executorExplicitlySet) {
			((ExecutorService) this.executor).shutdown();
		}
	}
Copy the code
private Executor executor =
			Executors.newCachedThreadPool(new CustomizableThreadFactory("redis-lock-registry-"));
			
Copy the code

This class provides two constructors:

public RedisLockRegistry(RedisConnectionFactory connectionFactory, String registryKey) {
		this(connectionFactory, registryKey, DEFAULT_EXPIRE_AFTER);
	}
public RedisLockRegistry(RedisConnectionFactory connectionFactory, String registryKey, long expireAfter) {
		Assert.notNull(connectionFactory, "'connectionFactory' cannot be null");
		Assert.notNull(registryKey, "'registryKey' cannot be null");
		this.redisTemplate = new StringRedisTemplate(connectionFactory);
		this.obtainLockScript = new DefaultRedisScript<>(OBTAIN_LOCK_SCRIPT, Boolean.class);
		this.registryKey = registryKey;
		this.expireAfter = expireAfter;
	}
Copy the code

To construct an instance of this class, you need to provide the connection factory of Redis, registryKey, and expiration time of registryKey. The expiration time can be 60 seconds by default

private static final long DEFAULT_EXPIRE_AFTER = 60000L;
Copy the code

So how does RedisLockRegistry set locks on the Redis server? This uses the power of Lua scripting:

private static final String OBTAIN_LOCK_SCRIPT =
			"local lockClientId = redis.call('GET', KEYS[1])\n" +
					"if lockClientId == ARGV[1] then\n" +
					"  redis.call('PEXPIRE', KEYS[1], ARGV[2])\n" +
					"  return true\n" +
					"elseif not lockClientId then\n" +
					"  redis.call('SET', KEYS[1], ARGV[1], 'PX', ARGV[2])\n" +
					"  return true\n" +
					"end\n" +
					"return false";
Copy the code

Void expire KEYS[1], ARGV[2]; void expire KEYS[1], ARGV[2]; If you don’t get to the value, then perform SET KEYS [1], ARGV [1], the PX ARGV [2], translation means if Redis already exist in the value of ARGV [1] key, then get the lock, just for the key SET expiration time ARGV [2], if you don’t have to value, Set the key value and expiration time.

Let’s continue to look at how to acquire locks:

@Override
	public Lock obtain(Object lockKey) {
		Assert.isInstanceOf(String.class, lockKey);
		String path = (String) lockKey;
		return this.locks.computeIfAbsent(path, RedisLock::new);
	}
Copy the code

The obtain method is the method defined in the LockRegistry interface, locks is an implementation of Map

LOCKS = new ConcurrentHashMap<>(), RedisLock is an internal class of RedisLockRegistry that implements the Lock interface and holds an internal ReentrantLock, which is the implementation of the ReentrantLock provided by J.U.C. When a lock is acquired, the specified RedisLock is generated by specifying a lockKey:
,>

private RedisLock(String path) {
			this.lockKey = constructLockKey(path);
		}
Copy the code

RegistryKey +”:”+path is the final locked key in Redis, so no suffixed *’:’* is required when setting registryKey.

This.locks.com puteIfAbsent (path, RedisLock: : new) this method, if can’t get to the key from the locks with the value of path, then the new a RedisLock and put it in to the locks, if you can take to, Therefore, only one lock with key path exists in the current RedisLockRegistry, ensuring the uniqueness of the lock.

Once we have an instance of the Lock, we also need to try to apply for the Lock. The Lock interface defines three ways to apply for the Lock:

  • lock()
  • tryLock()
  • tryLock(long time,TimeUnit unit)

The first method will try to acquire the lock until it is acquired, the second method will try to acquire the lock once and stop if it is not acquired, and the third method will try to acquire the lock for a specified period of time until the lock is acquired or the time expires. Obviously, we will use the third method to acquire the lock most of the time. Let’s look at the implementation of the third method overridden in RedisLock:

@Override public boolean tryLock(long time, TimeUnit unit) throws InterruptedException { long now = System.currentTimeMillis(); if (! This.locallock. tryLock(time, unit)) {① return false; } try { long expire = now + TimeUnit.MILLISECONDS.convert(time, unit); boolean acquired; while (! (acquired = obtainLock()) && System.currentTimemillis () < expire) {//NOSONAR ② thread.sleep (100); // sonar ③} if (! acquired) { this.localLock.unlock(); (4)} return acquired; } catch (Exception e) { this.localLock.unlock(); rethrowAsLockException(e); } return false; }Copy the code

First, the ReentrantLock held by RedisLock attempts to acquire the lock. If the lock cannot be acquired within the specified time range, false is returned. After obtaining the lock, the obtainLock method is used to repeatedly attempt to set the lock in Redis within the specified time interval of 100 ms. If the lock cannot be set in Redis within the specified time interval, the ReentrantLock held internally needs to be released. If the lock was successfully set in Redis, true is returned indicating that the lock was successfully acquired. Let’s look at the implementation of the obtainLock method:

private boolean obtainLock() {
			Boolean success =
					RedisLockRegistry.this.redisTemplate.execute(RedisLockRegistry.this.obtainLockScript,
							Collections.singletonList(this.lockKey), RedisLockRegistry.this.clientId,
							String.valueOf(RedisLockRegistry.this.expireAfter));

			boolean result = Boolean.TRUE.equals(success);

			if (result) {
				this.lockedAt = System.currentTimeMillis();
			}
			return result;
		}
Copy the code

Using redisTemplate execution OBTAIN_LOCK_SCRIPT the lua script mentioned above, the key has a value of RedisLockRegistry. Enclosing clientId, We can see that its implementation is uuid.randomuuid ().toString();

When the current thread is finished using the resource, it needs to release the lock so that other threads can acquire the lock.

@Override public void unlock() { if (! This. LocalLock. IsHeldByCurrentThread ()) {(1) throw new an IllegalStateException (" You do not own the lock at "+ enclosing lockKey); } the if (this. LocalLock. GetHoldCount () > 1) {(2) this. LocalLock. Unlock (); return; } try { if (! isAcquiredInThisProcess()) { throw new IllegalStateException("Lock was released in the store due to expiration. " + "The  integrity of data protected by this lock may have been compromised."); } if (Thread.currentThread().isInterrupted()) { RedisLockRegistry.this.executor.execute(this::removeLockKey); } else { removeLockKey(); } if (LOGGER.isDebugEnabled()) { LOGGER.debug("Released lock; " + this); } } catch (Exception e) { ReflectionUtils.rethrowRuntimeException(e); } finally { this.localLock.unlock(); }}Copy the code

First of all, it is necessary to determine whether the current thread to release the lock is the lock adding thread. Lock releasing and lock adding thread must be the same thread to release the lock. SQL > select * from ReentrantLock; SQL > select * from ReentrantLock; SQL > select * from ReentrantLock; SQL > select * from ReentrantLock; The ReentrantLock held in RedisLock can only be reduced by 1. If the number of ReentrantLock held in RedisLock is 1, the Redis lock needs to be released.

private void removeLockKey() {
			if (RedisLockRegistry.this.unlinkAvailable) {
				try {
					RedisLockRegistry.this.redisTemplate.unlink(this.lockKey);
				}
				catch (Exception ex) {
					RedisLockRegistry.this.unlinkAvailable = false;
					if (LOGGER.isDebugEnabled()) {
						LOGGER.debug("The UNLINK command has failed (not supported on the Redis server?); " +
								"falling back to the regular DELETE command", ex);
					}
					else {
						LOGGER.warn("The UNLINK command has failed (not supported on the Redis server?); " +
								"falling back to the regular DELETE command: " + ex.getMessage());
					}
					RedisLockRegistry.this.redisTemplate.delete(this.lockKey);
				}
			}
			else {
				RedisLockRegistry.this.redisTemplate.delete(this.lockKey);
			}
		}
Copy the code

After analyzing the process of locking, the process of releasing the lock is easy to understand.

If a thread can respond to an interrupt, it basically depends on fate. I personally do not recommend using interrupts in the program. When the program is executed, who can say it clearly?

The Java client of Redis, in addition to the mentioned Lettuce and Jedis, also has a Redisson client, which is also very good, later we will look at the implementation of Redisson’s distributed lock.

In addition, in order to improve the high availability of Redis, typically using master-slave mode deployment, so if the master goes down, the Redis data needs to be asynchronous synchronous to the slave node, the node need to synchronize data, and provide service for the client, so this time if the lock data haven’t synchronization is complete, the new client application lock, As a result, two clients hold the same lock at the same time. Therefore, this kind of distributed lock implementation method cannot achieve 100% exclusivity. How to design distributed lock?

In fact, the author of Redis has given the answer, on the official website of Redis, the author introduces Redlock, but the implementation of Redlock is quite complicated, we will introduce it in detail later.