This is the fifth day of my participation in the November Gwen Challenge.

Preface

We often need a query interface that initializes data lazily: on the first call, if the query finds nothing, it runs an initialization method to create the data, and subsequent calls read the data directly from the database. This approach is useful when the data to initialize is too large to process in a single call, or when not every record needs to be initialized. Whichever records are queried first get initialized first.

The problem

This scheme brings a problem with it. Query interfaces are normally idempotent by nature and need no extra idempotency handling. But in this scenario the query is not just a query: when it finds nothing, the initialization method runs, and that is essentially insert logic. If two requests for the same record arrive at the same time, both can find nothing and both can run the initialization. So we have to guarantee idempotency ourselves.

Solution

In a single service, we can use a Java lock to achieve idempotency, with each record's primary key ID as the lock key. But these days almost every service is deployed as a distributed service. As mentioned in the previous article, we can use Redisson's distributed lock, RedissonLock. A thread that does not get the lock waits for it; once the lock is released, it first checks whether the data has already been initialized, and if so, reads it directly from the database. How does RedissonLock implement this waiting?
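
For the single-service case, the idea can be sketched roughly as follows. This is a minimal illustration, not production code: the class name LazyInitService, the in-memory map standing in for the database table, and the initCount counter are all assumptions made up for this example.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.ReentrantLock;

/** Hypothetical single-JVM sketch: query first, initialize once per ID if nothing is found. */
class LazyInitService {
    // Stand-in for the database table; a real service would call its DAO instead.
    private final Map<Long, String> table = new ConcurrentHashMap<>();
    // One lock per primary key ID, so different IDs never block each other.
    // Note: this map is never cleaned up in the sketch.
    private final Map<Long, ReentrantLock> locks = new ConcurrentHashMap<>();
    // Counts how many times the initialization logic actually ran.
    final AtomicInteger initCount = new AtomicInteger();

    String query(long id) {
        String row = table.get(id);              // 1. plain query
        if (row != null) {
            return row;
        }
        ReentrantLock lock = locks.computeIfAbsent(id, k -> new ReentrantLock());
        lock.lock();                             // 2. wait for the current initializer, if any
        try {
            row = table.get(id);                 // 3. query again after getting the lock
            if (row == null) {
                row = initialize(id);            // 4. only the first thread reaches the insert
                table.put(id, row);
            }
            return row;
        } finally {
            lock.unlock();
        }
    }

    private String initialize(long id) {
        initCount.incrementAndGet();
        return "row-" + id;
    }
}
```

The second query inside the lock is what makes the interface idempotent: threads that waited for the lock find the data already there and skip the insert.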

tryLock

Among its locking methods, RedissonLock provides a tryLock API that takes a waitTime parameter, the maximum time to wait for the lock.

public boolean tryLock(long waitTime, long leaseTime, TimeUnit unit)
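
A rough sketch of how a caller might use a bounded wait like this is shown below. To keep it runnable without a Redis server, it is written against the JDK's java.util.concurrent.locks.Lock interface and its two-argument tryLock(time, unit); Redisson's RLock extends that interface, so the assumption is that an RLock could be passed in production. The helper name TryLockInit and the query and init suppliers are hypothetical.

```java
import java.util.Optional;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Lock;
import java.util.function.Supplier;

/** Hypothetical helper: wait a bounded time for the lock, then re-check before initializing. */
class TryLockInit {
    /**
     * @param lock  any Lock (assumption: a Redisson RLock in a distributed deployment)
     * @param query returns the existing row, or null if it has not been initialized yet
     * @param init  creates and stores the row, then returns it
     * @return the row, or empty if the lock could not be obtained within waitTime
     */
    static <T> Optional<T> queryOrInit(Lock lock, long waitTime, TimeUnit unit,
                                       Supplier<T> query, Supplier<T> init)
            throws InterruptedException {
        T row = query.get();
        if (row != null) {
            return Optional.of(row);
        }
        // Blocks for at most waitTime; false means we gave up waiting.
        if (!lock.tryLock(waitTime, unit)) {
            return Optional.empty();
        }
        try {
            row = query.get();   // the previous holder may have initialized the data
            return Optional.ofNullable(row != null ? row : init.get());
        } finally {
            lock.unlock();
        }
    }
}
```

Returning an empty Optional on timeout is just one choice; a caller could equally retry the query or report a busy error.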

If the lock cannot be acquired right away, the thread subscribes to the lock's release message and waits for it within waitTime, using Redis's built-in publish/subscribe feature.

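// Subscribe to the lock's channel, waiting at most the remaining wait time.
RFuture<RedissonLockEntry> subscribeFuture = subscribe(threadId);
if (!subscribeFuture.await(time, TimeUnit.MILLISECONDS)) {
    // The subscription did not complete in time: cancel it,
    // or unsubscribe as soon as it does complete.
    if (!subscribeFuture.cancel(false)) {
        subscribeFuture.onComplete((res, e) -> {
            if (e == null) {
                unsubscribe(subscribeFuture, threadId);
            }
        });
    }
    acquireFailed(threadId);
    return false;
}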

This way, when the lock is released, a message is published at the same time. All threads subscribed to the lock are notified and compete for it again, which gives us the idempotent behavior we need. Next, let's look at the lock-release logic: does it actually publish a message?
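
To make the "wait for a release signal, then compete again" idea concrete, here is a toy in-JVM analogy. It is not Redisson's implementation: the class ToyNotifyLock is invented for this example, a Semaphore stands in for the Redis channel, and the lock is neither reentrant nor owner-checked.

```java
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

/** Toy analogy of "subscribe, wait for the unlock message, retry". Not Redisson's code. */
class ToyNotifyLock {
    private final AtomicBoolean held = new AtomicBoolean(false);
    // Plays the role of the unlock channel: each unlock "publishes" one permit.
    private final Semaphore unlockSignal = new Semaphore(0);

    boolean tryLock(long waitTime, TimeUnit unit) throws InterruptedException {
        long deadline = System.nanoTime() + unit.toNanos(waitTime);
        while (true) {
            if (held.compareAndSet(false, true)) {
                return true;                     // won the competition for the lock
            }
            long remaining = deadline - System.nanoTime();
            if (remaining <= 0) {
                return false;                    // waitTime is used up
            }
            // Sleep until an unlock is signalled or the remaining time runs out, then retry.
            unlockSignal.tryAcquire(remaining, TimeUnit.NANOSECONDS);
        }
    }

    void unlock() {
        held.set(false);           // comparable to deleting the lock key
        unlockSignal.release();    // comparable to publishing the unlock message
    }
}
```

The point of the signal is that waiters do not have to poll: they sleep until a release is announced or their wait time expires.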

unlockInnerAsync

protected RFuture<Boolean> unlockInnerAsync(long threadId) {
    return commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,
            "if (redis.call('hexists', KEYS[1], ARGV[3]) == 0) then " +
                "return nil; " +
            "end; " +
            "local counter = redis.call('hincrby', KEYS[1], ARGV[3], -1); " +
            "if (counter > 0) then " +
                "redis.call('pexpire', KEYS[1], ARGV[2]); " +
                "return 0; " +
            "else " +
                "redis.call('del', KEYS[1]); " +
                "redis.call('publish', KEYS[2], ARGV[1]); " +
                "return 1; " +
            "end; " +
            "return nil;",
            Arrays.<Object>asList(getName(), getChannelName()),
            LockPubSub.UNLOCK_MESSAGE, internalLockLeaseTime, getLockName(threadId));
}

The unlockInnerAsync method executes a Lua script. In the script we can see that once the holder's counter drops to zero, the lock key is deleted and the publish command sends the unlock message to the lock's channel.
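
The three branches of the script can be walked through in plain Java as a rough sketch. Nothing here talks to Redis: the class UnlockScriptSketch, the map standing in for the lock hash, and the list standing in for the channel are assumptions made for illustration only.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Plain-Java walk-through of the unlock script's branches. Illustration only, no Redis involved. */
class UnlockScriptSketch {
    // Models the hash at KEYS[1]: field = holder name (ARGV[3]), value = hold counter.
    final Map<String, Integer> lockHash = new HashMap<>();
    // Models the messages published to the channel at KEYS[2].
    final List<String> published = new ArrayList<>();

    /** @return null if the caller does not hold the lock, false if it still holds it, true if released */
    Boolean unlock(String holder, String unlockMessage) {
        if (!lockHash.containsKey(holder)) {                        // hexists == 0 -> return nil
            return null;
        }
        int counter = lockHash.merge(holder, -1, Integer::sum);     // hincrby ... -1
        if (counter > 0) {
            // The script refreshes the key's expiry here (pexpire); this sketch has no expiry.
            return false;                                           // return 0
        }
        lockHash.clear();                                           // del KEYS[1]
        published.add(unlockMessage);                               // publish KEYS[2] ARGV[1]
        return true;                                                // return 1
    }
}
```

For example, if a holder's counter is 2, the first unlock returns false and publishes nothing; only the second unlock removes the lock and publishes the message.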

Thoughts

Redisson cleverly uses Redis's publish/subscribe feature to implement the waiting behavior of its distributed lock. So what other scenarios are there for Redis publish/subscribe in real business applications? Feel free to share in the comments!