synchronized, CAS, and ReentrantLock are all locks inside a single JVM, so they do not work in a cluster. There we need a lock that can determine the order of execution across multiple JVMs. Distributed locks are currently implemented mainly with Redis, ZooKeeper, or a database. Their performance is lower than that of in-JVM locks, because a third party has to do the coordination.
Background
While writing a Kafka message consumer recently, I found that with many consumers online, several machines often process data with the same primary key at the same time. If the final operation is an update, the only issue is the order of the updates. If the data has to be inserted, however, duplicate primary key errors occur. That is not acceptable in production (the company monitors exceptions and deducts points for them), so a distributed lock was needed. After some consideration I chose a Redis implementation, because there are many examples online.
Analysis
A Redis distributed lock is built on the SET command. When multiple threads send the request at the same time, only one of them succeeds, and an expiry can be set on the key so that a deadlock cannot occur. That all looks perfect, but there is one problem: SET returns its result, success or failure, immediately and does not block. We have to handle the threads that failed ourselves, in one of two ways:
- Discard the request.
- Retry. Since our system needs this data, we have to try again to get the lock. Here a Redis List is used as the waiting queue.
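The non-blocking behaviour described above can be modelled without a Redis server. The following is a minimal in-memory sketch, not a Redis client: the class `NxPxModel`, its method `setNxPx`, and the explicit `nowMillis` clock parameter are hypothetical names introduced only for illustration. It assumes the semantics of `SET key value NX PX ttl`: the call returns at once, succeeds for only one caller, and can succeed again once the expiry has passed.

```java
import java.util.HashMap;
import java.util.Map;

/** In-memory model of Redis "SET key value NX PX ttl" (illustration only). */
class NxPxModel {

    /** A stored value together with the time at which it expires. */
    private static final class Entry {
        final String value;
        final long expiresAtMillis;

        Entry(String value, long expiresAtMillis) {
            this.value = value;
            this.expiresAtMillis = expiresAtMillis;
        }
    }

    private final Map<String, Entry> store = new HashMap<>();

    /**
     * Sets the key only if it is absent or already expired.
     * Returns immediately: true means the lock was acquired, false means
     * someone else holds it. The call never blocks.
     * nowMillis is passed in so that the example is deterministic.
     */
    synchronized boolean setNxPx(String key, String value, long ttlMillis, long nowMillis) {
        Entry current = store.get(key);
        if (current != null && current.expiresAtMillis > nowMillis) {
            return false; // still held: the caller must discard or retry
        }
        store.put(key, new Entry(value, nowMillis + ttlMillis));
        return true;
    }
}
```

A caller that gets `false` back has to pick one of the two options in the list: drop the work, or park it somewhere (here, a Redis List) and retry later.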
Code
Straight to the code. A single Redis utility class is enough:
    package com.test;

    import redis.clients.jedis.Jedis;

    import java.util.Collections;
    import java.util.List;

    /**
     * Redis helpers for the waiting queue and the distributed lock.
     */
    public class RedisUcUitl {

        private static final String LOCK_SUCCESS = "OK";
        private static final String SET_IF_NOT_EXIST = "NX";
        private static final String SET_WITH_EXPIRE_TIME = "PX";
        private static final Long RELEASE_SUCCESS = 1L;

        private RedisUcUitl() {
        }

        /**
         * Push a value onto the head of a Redis list.
         *
         * @param key   list key, as bytes
         * @param value value, as bytes
         */
        public static Long lpush(Jedis jedis, final byte[] key, final byte[] value) {
            return jedis.lpush(key, value);
        }

        /**
         * Remove the last element of a list and push it onto the head of another list.
         * If the source list is empty, block the connection until the timeout expires.
         *
         * @param srckey  source list
         * @param dstkey  destination list
         * @param timeout seconds to block; 0 means never time out
         * @return the moved element, or null on timeout
         */
        public static byte[] brpoplpush(Jedis jedis, final byte[] srckey, final byte[] dstkey, final int timeout) {
            return jedis.brpoplpush(srckey, dstkey, timeout);
        }

        /**
         * Return the elements of a list between two positions.
         *
         * @param redisKey list key
         * @param start    start position
         * @param end      end position; -1 means up to the end
         */
        public static List<byte[]> lrange(Jedis jedis, final byte[] redisKey, final long start, final long end) {
            return jedis.lrange(redisKey, start, end);
        }

        public static void delete(Jedis jedis, final byte[] redisKey) {
            jedis.del(redisKey);
        }

        /**
         * Try to acquire the lock.
         *
         * @param lockKey    key name
         * @param requestId  identity of the caller
         * @param expireTime expiration time in milliseconds (PX)
         * @return true if the lock was acquired
         */
        public static boolean tryGetDistributedLock(Jedis jedis, final String lockKey, final String requestId, final int expireTime) {
            // Jedis 2.x signature: set(key, value, nxxx, expx, time)
            String result = jedis.set(lockKey, requestId, SET_IF_NOT_EXIST, SET_WITH_EXPIRE_TIME, expireTime);
            return LOCK_SUCCESS.equals(result);
        }

        /**
         * Release the lock.
         *
         * @param lockKey   key name
         * @param requestId identity of the caller
         * @return true if the lock was held by requestId and has been deleted
         */
        public static boolean releaseDistributedLock(Jedis jedis, final String lockKey, final String requestId) {
            // Delete the key only if it still holds our requestId
            final String script = "if redis.call('get', KEYS[1]) == ARGV[1] then return redis.call('del', KEYS[1]) else return 0 end";
            Object result = jedis.eval(script, Collections.singletonList(lockKey), Collections.singletonList(requestId));
            return RELEASE_SUCCESS.equals(result);
        }
    }
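The release script compares the stored value with `requestId` before deleting, and it helps to see what that check prevents. The sketch below is an in-memory model under stated assumptions, not the Redis implementation: `OwnerCheckedRelease`, `acquire`, `expire`, and `release` are hypothetical names, and `ConcurrentHashMap.remove(key, value)` stands in for the atomic get-then-del that the Lua script performs on the server.

```java
import java.util.concurrent.ConcurrentHashMap;

/** In-memory model of "delete the lock only if I still own it" (illustration only). */
class OwnerCheckedRelease {

    private final ConcurrentHashMap<String, String> locks = new ConcurrentHashMap<>();

    /** Models SET NX: succeeds only when the key is absent. */
    boolean acquire(String lockKey, String requestId) {
        return locks.putIfAbsent(lockKey, requestId) == null;
    }

    /** Models key expiry: the lock disappears without the owner's involvement. */
    void expire(String lockKey) {
        locks.remove(lockKey);
    }

    /** Atomic compare-and-delete: removes the key only if it maps to requestId. */
    boolean release(String lockKey, String requestId) {
        return locks.remove(lockKey, requestId);
    }
}
```

If a slow worker's lock expires and a second worker acquires it, the slow worker's late `release` returns `false` and leaves the second worker's lock in place. An unconditional delete would have removed a lock that belongs to someone else.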
The main business logic is as follows:
    // jedis, keyStr, dstKeyStr and param come from the surrounding consumer code (not shown).
    // lockKeyStr is an assumed, separate key for the lock: in Redis one key cannot
    // hold both the waiting list and the lock string.

    // 1. Consume the waiting queue first
    while (true) {
        try {
            byte[] bytes = RedisUcUitl.brpoplpush(jedis, keyStr.getBytes(UTF_8), dstKeyStr.getBytes(UTF_8), 1);
            if (bytes == null || bytes.length == 0) {
                // Exit when there is no data in the queue
                break;
            }
            // Deserialize the object
            Map<String, Object> singleMap = (Map<String, Object>) ObjectSerialUtil.bytesToObject(bytes);
            // Unique value, so that the lock cannot be released by another thread by mistake
            String requestId = UUID.randomUUID().toString();
            boolean lockGetFlag = RedisUcUitl.tryGetDistributedLock(jedis, lockKeyStr, requestId, 100);
            if (lockGetFlag) {
                // Lock acquired: do the business processing
                // TODO
                // Processing finished: release the lock
                boolean freeLock = RedisUcUitl.releaseDistributedLock(jedis, lockKeyStr, requestId);
            } else {
                // Failed to get the lock: put the data into the waiting queue
                RedisUcUitl.lpush(jedis, keyStr.getBytes(UTF_8), ObjectSerialUtil.objectToBytes(param));
            }
        } catch (Exception e) {
            break;
        }
    }
    // 2. Process the latest received data
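The loop above relies on two list operations: LPUSH adds at the head and BRPOPLPUSH removes from the tail, so the list behaves as a first-in, first-out waiting queue while the destination list keeps a copy of everything taken out. A minimal in-memory sketch of that ordering follows. `ListQueueModel` and its methods are hypothetical names, and the blocking and timeout of BRPOPLPUSH are deliberately left out, so an empty list simply yields `null`.

```java
import java.util.ArrayDeque;
import java.util.Deque;

/** In-memory model of LPUSH plus (non-blocking) RPOPLPUSH (illustration only). */
class ListQueueModel {

    private final Deque<String> waiting = new ArrayDeque<>(); // models the source list
    private final Deque<String> backup = new ArrayDeque<>();  // models the destination list

    /** LPUSH: insert at the head of the waiting list. */
    void lpush(String value) {
        waiting.addFirst(value);
    }

    /**
     * RPOPLPUSH: take the tail of the waiting list and push it onto the head
     * of the backup list. Returns null when the waiting list is empty.
     */
    String rpoplpush() {
        String value = waiting.pollLast();
        if (value != null) {
            backup.addFirst(value);
        }
        return value;
    }

    /** Backup list contents from head to tail, comma separated. */
    String backupHeadToTail() {
        return String.join(",", backup);
    }
}
```

Pushing `a`, `b`, `c` and then draining returns them in the same order, which is why retried messages are processed in the order in which they failed to get the lock.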
For serialization you could generally use fastjson. Here the JDK's built-in serialization is used, with the following utility class:
    import java.io.ByteArrayInputStream;
    import java.io.ByteArrayOutputStream;
    import java.io.IOException;
    import java.io.ObjectInputStream;
    import java.io.ObjectOutputStream;

    public class ObjectSerialUtil {

        private ObjectSerialUtil() {
            // utility class
        }

        /**
         * Serialize an object to a byte array.
         *
         * @param obj the object; it must implement Serializable
         * @return byte array
         * @throws IOException if serialization fails
         */
        public static byte[] objectToBytes(Object obj) throws IOException {
            ByteArrayOutputStream bos = new ByteArrayOutputStream();
            // Closing the ObjectOutputStream flushes it before the bytes are read
            try (ObjectOutputStream oos = new ObjectOutputStream(bos)) {
                oos.writeObject(obj);
            }
            return bos.toByteArray();
        }

        /**
         * Restore a byte array to an object.
         *
         * @param bytes serialized bytes
         * @return the deserialized object
         */
        public static Object bytesToObject(byte[] bytes) {
            // BaseException is assumed to be the project's own unchecked exception (not shown)
            try (ObjectInputStream ois = new ObjectInputStream(new ByteArrayInputStream(bytes))) {
                return ois.readObject();
            } catch (Exception e) {
                throw new BaseException("Deserialization error!", e);
            }
        }
    }
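As a quick check of how JDK serialization behaves for the kind of `Map<String, Object>` message used in the business logic, here is a self-contained round-trip sketch. `SerialRoundTrip`, `toBytes`, and `fromBytes` are hypothetical stand-ins for the utility class above, written so that the example compiles without the project's `BaseException`; checked exceptions are wrapped in unchecked ones purely to keep the example short.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.UncheckedIOException;

/** Round trip through JDK serialization (illustration only). */
class SerialRoundTrip {

    /** Serialize obj; every object in its graph must implement Serializable. */
    static byte[] toBytes(Object obj) {
        ByteArrayOutputStream bos = new ByteArrayOutputStream();
        try (ObjectOutputStream oos = new ObjectOutputStream(bos)) {
            oos.writeObject(obj);
        } catch (IOException e) {
            // NotSerializableException is an IOException and ends up here as the cause
            throw new UncheckedIOException(e);
        }
        return bos.toByteArray();
    }

    /** Deserialize bytes produced by toBytes. */
    static Object fromBytes(byte[] bytes) {
        try (ObjectInputStream ois = new ObjectInputStream(new ByteArrayInputStream(bytes))) {
            return ois.readObject();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        } catch (ClassNotFoundException e) {
            throw new IllegalStateException(e);
        }
    }
}
```

A `HashMap` whose values are `String`, `Long`, and similar JDK types survives the round trip and compares equal to the original. A map containing a value that does not implement `Serializable` fails at write time, so it is worth checking what ends up in the message before it is pushed to the waiting queue.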