preface
Having covered most of Redisson’s content, let’s take a look at what components Redisson has on its website:
With Semaphore and CountDownLatch remaining, let’s see how Redisson does it while the iron is hot.
Semaphore and CountDownLatch are two brothers we know about in the JDK, but we won’t go into details here.
Semaphore example
Here’s the Semaphore schematic:
Then let’s look at the example used in Redisson:
RSemaphore semaphore = redisson.getSemaphore("semaphore");
A maximum of 3 threads are allowed to acquire the lock at the same time
semaphore.trySetPermits(3);
for(int i = 0; i < 10; i++) {
new Thread(new Runnable() {
@Override
public void run(a) {
try {
System.out.println(new Date() + ": thread [" + Thread.currentThread().getName() + "] Attempt to acquire Semaphore lock");
semaphore.acquire();
System.out.println(new Date() + ": thread [" + Thread.currentThread().getName() + "] Semaphore lock was successfully acquired and started working.");
Thread.sleep(3000);
semaphore.release();
System.out.println(new Date() + ": thread [" + Thread.currentThread().getName() + "] Release Semaphore lock");
} catch (Exception e) {
e.printStackTrace();
}
}
}).start();
}
Copy the code
Semaphore source code analysis
Let’s see how the source code is implemented based on the example above:
Semaphore. TrySetPermits (3);
public class RedissonSemaphore extends RedissonExpirable implements RSemaphore {
@Override
public boolean trySetPermits(int permits) {
return get(trySetPermitsAsync(permits));
}
@Override
public RFuture<Boolean> trySetPermitsAsync(int permits) {
return commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,
"local value = redis.call('get', KEYS[1]); " +
"if (value == false or value == 0) then "
+ "redis.call('set', KEYS[1], ARGV[1]); "
+ "redis.call('publish', KEYS[2], ARGV[1]); "
+ "return 1;"
+ "end;"
+ "return 0;", Arrays.<Object>asList(getName(), getChannelName()), permits); }}Copy the code
The execution process is as follows:
- Get semaphore: gets a current value
- The first time the data is 0, then use Set Semaphore 3 to set the number of clients that this semaphore can allow to acquire locks at the same time to 3
- Then publish some messages that return 1
Next look at semaphore.acquire(); And semaphore release (); Logic:
public class RedissonSemaphore extends RedissonExpirable implements RSemaphore {
@Override
public RFuture<Boolean> tryAcquireAsync(int permits) {
if (permits < 0) {
throw new IllegalArgumentException("Permits amount can't be negative");
}
if (permits == 0) {
return RedissonPromise.newSucceededFuture(true);
}
return commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,
"local value = redis.call('get', KEYS[1]); " +
"if (value ~= false and tonumber(value) >= tonumber(ARGV[1])) then " +
"local val = redis.call('decrby', KEYS[1], ARGV[1]); " +
"return 1; " +
"end; " +
"return 0;",
Collections.<Object>singletonList(getName()), permits);
}
@Override
public RFuture<Void> releaseAsync(int permits) {
if (permits < 0) {
throw new IllegalArgumentException("Permits amount can't be negative");
}
if (permits == 0) {
return RedissonPromise.newSucceededFuture(null);
}
return commandExecutor.evalWriteAsync(getName(), StringCodec.INSTANCE, RedisCommands.EVAL_VOID,
"local value = redis.call('incrby', KEYS[1], ARGV[1]); " +
"redis.call('publish', KEYS[2], value); ", Arrays.<Object>asList(getName(), getChannelName()), permits); }}Copy the code
TryAcquireAsync () :
- Get semaphore, get a current value, say 3,3 > 1
- Decrby semaphore 1, decrement the number of clients allowed to acquire locks by 1 to 2
- decrby semaphore 1
- decrby semaphore 1
- After three locks, the semaphore value is 0
At this point, if the lock is added again, it will directly return 0, and then enter an infinite loop to obtain the lock, as shown in the following figure:
Then look at the unlock logic releaseAsync() :
- Incrby semaphore 1: each time a client releases the lock, the semaphore value is incremented by 1, so the semaphore value is not zero
This shows how easy it is for Redisson to implement Semaphore
CountDownLatch example use
Use cases:
RCountDownLatch latch = redisson.getCountDownLatch("anyCountDownLatch");
latch.trySetCount(3);
System.out.println(new Date() + ": thread [" + Thread.currentThread().getName() + "] Sets that there must be 3 threads executing countDown to enter the wait...");
for(int i = 0; i < 3; i++) {
new Thread(new Runnable() {
@Override
public void run(a) {
try {
System.out.println(new Date() + ": thread [" + Thread.currentThread().getName() + "] Some operations are being performed, please be patient...");
Thread.sleep(3000);
RCountDownLatch localLatch = redisson.getCountDownLatch("anyCountDownLatch");
localLatch.countDown();
System.out.println(new Date() + ": thread [" + Thread.currentThread().getName() + "] Execute countDown");
} catch (Exception e) {
e.printStackTrace();
}
}
}).start();
}
latch.await();
System.out.println(new Date() + ": thread [" + Thread.currentThread().getName() + "] Received notification that three threads have all performed countDown, can proceed");
Copy the code
CountDownLatch source code parsing
The source code is as follows:
public class RedissonCountDownLatch extends RedissonObject implements RCountDownLatch {
@Override
public RFuture<Boolean> trySetCountAsync(long count) {
return commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,
"if redis.call('exists', KEYS[1]) == 0 then "
+ "redis.call('set', KEYS[1], ARGV[2]); "
+ "redis.call('publish', KEYS[2], ARGV[1]); "
+ "return 1 "
+ "else "
+ "return 0 "
+ "end",
Arrays.<Object>asList(getName(), getChannelName()), newCountMessage, count);
}
@Override
public RFuture<Void> countDownAsync(a) {
return commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,
"local v = redis.call('decr', KEYS[1]);" +
"if v <= 0 then redis.call('del', KEYS[1]) end;" +
"if v == 0 then redis.call('publish', KEYS[2], ARGV[1]) end;", Arrays.<Object>asList(getName(), getChannelName()), zeroCountMessage); }}Copy the code
First analysistrySetCount()
Method logic:
- Exists anyCountDownLatch, the first time definitely does not exist
- set redisson_countdownlatch__channel__anyCountDownLatch 3
- Returns 1
Then analysislatch.await();
Method, as shown below:
If the latch is still greater than 0, the latch will continue the loop. Otherwise, the latch will exit the loop
In the final analysislocalLatch.countDown();
Methods:
- Decr anyCountDownLatch, which countDown the cocuntDownLatch one client at a time, decreases the value of the cocuntDownLatch by one
await()
The aspect has been parsed to determine in an infinite loop whether anyCountDownLatch stores a value of 0, and if 0 then executes its own logic
conclusion
See how easy these two components are here?
This is the end of the Redisson section, and we will also learn how ZK implements distributed locking.
statement
This article first from my public number: a flower is not romantic, if reprinted please indicate the source!
Interested partners can pay attention to personal public account: a flower is not romantic