Introduction
The previous article analyzed in detail how ReentrantLock is built on AQS. ReentrantLock represents an exclusive lock by switching the state variable in AQS between 0 and 1. So what does it mean when state is greater than 1? Does J.U.C contain AQS-based implementations that use state this way, and if so, how do they work? This article answers these questions through a detailed analysis of the Semaphore and CountDownLatch classes in J.U.C.
- Sharing mode of Semaphore and CountDownLatch
- Examples of Semaphore and CountDownLatch use
- 2.1 Use of Semaphore
- 2.2 Use of CountDownLatch
- Source code analysis
- 3.1 Realization of shared lock in AQS
- 3.2 Semaphore source code analysis
- 3.3 CountDownLatch source code analysis
- Summary
1. Sharing mode of Semaphore and CountDownLatch
An exclusive lock means that only one thread can acquire the lock, and all other threads must wait for the lock to be released before proceeding. By analogy, does a shared lock mean that multiple threads can use the lock at the same time, without waiting? If that were the case, the point of the lock would be lost. Sharing in J.U.C means that multiple threads can acquire locks at the same time, but this number is limited, not infinite. Semaphore and CountDownLatch are used to implement finite shared locks in J.U.C.
A Semaphore hands out permits to threads from a shared pool. If the pool holds enough permits, the thread acquires the lock. If it does not, the thread cannot acquire the lock and has to wait until enough permits have been released.
CountDownLatch, also known as a countdown counter, controls when threads may proceed through a shared count. While the count is greater than zero, threads are blocked and cannot acquire the lock. Only when the count reaches zero are all blocked threads released, at the same time.
Both Semaphore and CountDownLatch maintain a shared total, and in both it is stored in the AQS state variable.
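The idea of a shared total can be observed from outside through the public API. The following is a minimal sketch; the class and method names (SharedTotalDemo, semaphoreTotals, latchTotals) are made up for illustration, and only availablePermits() and getCount() are used to peek at the value that AQS keeps in state.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Semaphore;

// Illustrative sketch: the class and method names are hypothetical.
class SharedTotalDemo {

    /** Returns {permits left after acquiring 4, permits after releasing them}. */
    static int[] semaphoreTotals() {
        Semaphore semaphore = new Semaphore(10);     // shared total starts at 10
        semaphore.acquireUninterruptibly(4);         // total drops to 6
        int afterAcquire = semaphore.availablePermits();
        semaphore.release(4);                        // total goes back to 10
        return new int[] { afterAcquire, semaphore.availablePermits() };
    }

    /** Returns {count after one countDown, count after two}. */
    static long[] latchTotals() {
        CountDownLatch latch = new CountDownLatch(2); // shared total starts at 2
        latch.countDown();                            // 2 -> 1
        long afterOne = latch.getCount();
        latch.countDown();                            // 1 -> 0, waiters would be released
        return new long[] { afterOne, latch.getCount() };
    }

    public static void main(String[] args) {
        int[] s = semaphoreTotals();
        long[] l = latchTotals();
        System.out.println("semaphore permits: " + s[0] + " then " + s[1]);
        System.out.println("latch count: " + l[0] + " then " + l[1]);
    }
}
```

The Semaphore total moves in both directions, while the CountDownLatch total only ever decreases.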
2. Examples of Semaphore and CountDownLatch
Before analyzing how Semaphore and CountDownLatch work, let's look at how they are used, which will make the principles easier to follow later: first the what, then the why. The two examples below illustrate the use of Semaphore and CountDownLatch.
2.1 Use of Semaphore
import java.util.concurrent.Semaphore;

public class SemaphoreExample {

    public static void main(String[] args) throws InterruptedException {
        // Initialize the signal packet with 10 permits
        Semaphore semaphore = new Semaphore(10);
        SemaphoreTest(semaphore);
    }

    private static void SemaphoreTest(final Semaphore semaphore) throws InterruptedException {
        // threadA acquires 4 permits, then releases them in three steps (1 + 1 + 2)
        Thread threadA = new Thread(new Runnable() {
            @Override
            public void run() {
                try {
                    semaphore.acquire(4);
                    System.out.println(Thread.currentThread().getName() + " get 4 semaphore");
                    Thread.sleep(2000);
                    System.out.println(Thread.currentThread().getName() + " release 1 semaphore");
                    semaphore.release(1);
                    Thread.sleep(2000);
                    System.out.println(Thread.currentThread().getName() + " release 1 semaphore");
                    semaphore.release(1);
                    Thread.sleep(2000);
                    System.out.println(Thread.currentThread().getName() + " release 2 semaphore");
                    semaphore.release(2);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        });
        threadA.setName("threadA");

        // threadB acquires 5 permits, then releases them in two steps (2 + 3)
        Thread threadB = new Thread(new Runnable() {
            @Override
            public void run() {
                try {
                    semaphore.acquire(5);
                    System.out.println(Thread.currentThread().getName() + " get 5 semaphore");
                    Thread.sleep(2000);
                    System.out.println(Thread.currentThread().getName() + " release 2 semaphore");
                    semaphore.release(2);
                    Thread.sleep(2000);
                    System.out.println(Thread.currentThread().getName() + " release 3 semaphore");
                    semaphore.release(3);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        });
        threadB.setName("threadB");

        // threadC acquires 4 permits, then releases all 4 at once
        Thread threadC = new Thread(new Runnable() {
            @Override
            public void run() {
                try {
                    semaphore.acquire(4);
                    System.out.println(Thread.currentThread().getName() + " get 4 semaphore");
                    Thread.sleep(1000);
                    System.out.println(Thread.currentThread().getName() + " release 4 semaphore");
                    semaphore.release(4);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        });
        threadC.setName("threadC");

        // threadD acquires 10 permits, then releases all 10 at once
        Thread threadD = new Thread(new Runnable() {
            @Override
            public void run() {
                try {
                    semaphore.acquire(10);
                    System.out.println(Thread.currentThread().getName() + " get 10 semaphore");
                    Thread.sleep(1000);
                    System.out.println(Thread.currentThread().getName() + " release 10 semaphore");
                    semaphore.release(10);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        });
        threadD.setName("threadD");

        // threadA and threadB acquire 4 and 5 permits respectively, leaving 1
        threadA.start();
        threadB.start();
        Thread.sleep(1);
        // threadC tries to acquire 4, finds too few permits, and waits
        threadC.start();
        Thread.sleep(1);
        // threadD tries to acquire 10, finds too few permits, and waits
        threadD.start();
    }
}
The result is as follows:
threadB get 5 semaphore
threadA get 4 semaphore
threadA release 1 semaphore
threadB release 2 semaphore
threadC get 4 semaphore
threadA release 1 semaphore
threadC release 4 semaphore
threadB release 3 semaphore
threadA release 2 semaphore
threadD get 10 semaphore
threadD release 10 semaphore
You can see that threadA and threadB together take nine permits, so threadC and threadD have to wait until enough permits are available before they can continue. As long as there are enough permits, threads such as threadA and threadB can run at the same time.
Suppose threadD is queued ahead of threadC and four permits are then released. Will threadC run before threadD, or does it have to wait in line? This question is answered after the detailed analysis of Semaphore's source code.
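The permit arithmetic behind the run above can be replayed on a single thread. This is only a sketch: it substitutes the non-blocking tryAcquire(int) for the blocking acquire(int) used in the example, so a request that would have waited simply reports false. The class name PermitAccountingDemo and the method replay are illustrative.

```java
import java.util.concurrent.Semaphore;

// Illustrative sketch: replays the example's permit accounting without threads.
class PermitAccountingDemo {

    static String replay() {
        Semaphore semaphore = new Semaphore(10);
        StringBuilder log = new StringBuilder();

        log.append(semaphore.tryAcquire(4));              // threadA's request: 10 - 4 = 6 left
        log.append(',').append(semaphore.tryAcquire(5));  // threadB's request: 6 - 5 = 1 left
        log.append(',').append(semaphore.tryAcquire(4));  // threadC's request: only 1 left, refused

        semaphore.release(1);                             // threadA gives back 1 -> 2 available
        semaphore.release(2);                             // threadB gives back 2 -> 4 available

        log.append(',').append(semaphore.tryAcquire(4));  // threadC's request now fits
        log.append(',').append(semaphore.availablePermits());
        return log.toString();
    }

    public static void main(String[] args) {
        System.out.println(replay()); // prints "true,true,false,true,0"
    }
}
```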
2.2 Use of CountDownLatch
import java.util.concurrent.CountDownLatch;

public class CountDownLatchExample {

    public static void main(String[] args) throws InterruptedException {
        // Initialize the counter to 2
        CountDownLatch countDownLatch = new CountDownLatch(2);
        CountDownLatchTest(countDownLatch);
    }

    private static void CountDownLatchTest(final CountDownLatch countDownLatch) throws InterruptedException {
        // threadA and threadB wait until the counter reaches 0
        Thread threadA = new Thread(new Runnable() {
            @Override
            public void run() {
                try {
                    countDownLatch.await();
                    System.out.println(Thread.currentThread().getName() + " await");
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        });
        threadA.setName("threadA");

        Thread threadB = new Thread(new Runnable() {
            @Override
            public void run() {
                try {
                    countDownLatch.await();
                    System.out.println(Thread.currentThread().getName() + " await");
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        });
        threadB.setName("threadB");

        // threadC decrements the counter after 1 second
        Thread threadC = new Thread(new Runnable() {
            @Override
            public void run() {
                try {
                    Thread.sleep(1000);
                    countDownLatch.countDown();
                    System.out.println(Thread.currentThread().getName() + " countDown");
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        });
        threadC.setName("threadC");

        // threadD decrements the counter after 5 seconds
        Thread threadD = new Thread(new Runnable() {
            @Override
            public void run() {
                try {
                    Thread.sleep(5000);
                    countDownLatch.countDown();
                    System.out.println(Thread.currentThread().getName() + " countDown");
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        });
        threadD.setName("threadD");

        threadA.start();
        threadB.start();
        threadC.start();
        threadD.start();
    }
}
The result is as follows:
threadC countDown
threadD countDown
threadA await
threadB await
threadA and threadB block when they call await because the count is 2. Once threadC and threadD have decremented the count to 0, threadA and threadB resume at the same time.
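A common way to put this behavior to work is to let one thread wait for a group of workers. The sketch below assumes a made-up task in which each worker stores a number in its own array slot; the names LatchJoinDemo and runWorkers are illustrative. It relies on the documented guarantee that actions before countDown() happen-before a successful return from await().

```java
import java.util.concurrent.CountDownLatch;

// Illustrative sketch: one thread waits for several workers via a latch.
class LatchJoinDemo {

    /** Starts the given number of workers, waits for all of them, and sums their results. */
    static int runWorkers(int workers) {
        final int[] results = new int[workers];
        final CountDownLatch done = new CountDownLatch(workers);

        for (int i = 0; i < workers; i++) {
            final int id = i;
            new Thread(() -> {
                results[id] = id + 1;   // the worker's (made-up) result
                done.countDown();       // signal that this worker has finished
            }, "worker-" + id).start();
        }

        try {
            done.await();               // blocks until the count reaches zero
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return -1;                  // interrupted before all workers finished
        }

        int sum = 0;
        for (int r : results) {
            sum += r;
        }
        return sum;
    }

    public static void main(String[] args) {
        System.out.println(runWorkers(4)); // 1 + 2 + 3 + 4
    }
}
```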
To recap: Semaphore is like a conveyor-belt sushi restaurant with 10 seats. When a seat frees up, waiting customers can take it. If only two seats are free and a family of three arrives, the family has to wait; a couple, on the other hand, can sit down and eat right away. And if 5 seats free up at once, the family of three and the couple can eat at the same time. CountDownLatch is like a temporary amusement area in a large shopping mall that admits visitors in sessions. During a session, visitors who lose interest may leave at any time, but nobody is allowed in. Once everyone inside has come out, the next batch of people waiting outside can all enter at the same time.
3. Source code analysis
Now that we understand what Semaphore and CountDownLatch do and how to use them, let's look at how this behavior is implemented underneath.
3.1 Realization of shared lock in AQS
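In the previous article, we analyzed ReentrantLock and identified the key members of AQS that implement the exclusive lock:
// State variable; for an exclusive lock it switches between 0 and 1
private volatile int state;

// Calls tryAcquire; on failure, queues and suspends the thread (implemented in AQS)
public final void acquire(int arg);

// Tries to acquire the lock in exclusive mode (implemented in ReentrantLock)
protected boolean tryAcquire(int arg);

// Calls tryRelease, then wakes the waiting thread (implemented in AQS)
public final boolean release(int arg);

// Tries to release the lock in exclusive mode (implemented in ReentrantLock)
protected boolean tryRelease(int arg);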
The logic for acquiring and releasing the exclusive lock itself is implemented in ReentrantLock, while the follow-up work after an acquire fails or a release succeeds (queuing, suspending, and waking threads) is managed by AQS. Does the shared lock follow the same pattern? Indeed, AQS contains the following analogous methods:
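// Calls tryAcquireShared; on failure, queues and suspends the thread (implemented in AQS)
public final void acquireShared(int arg);

// Tries to acquire the lock in shared mode (implemented by the subclass)
protected int tryAcquireShared(int arg);

// Calls tryReleaseShared, then wakes waiting threads (implemented in AQS)
public final boolean releaseShared(int arg);

// Tries to release the lock in shared mode (implemented by the subclass)
protected boolean tryReleaseShared(int arg);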
The core of shared locking lies in these four methods. Let's look at how Semaphore calls them to implement a shared lock.
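The division of labor between AQS and its subclass can be sketched with a toy synchronizer. The class below, SimplePermits, is hypothetical and is not how any J.U.C class is written; it only assumes the documented AQS contract that tryAcquireShared returns a negative value on failure and that the subclass manipulates state through getState, setState, and compareAndSetState.

```java
import java.util.concurrent.locks.AbstractQueuedSynchronizer;

// Hypothetical toy synchronizer: the subclass decides success or failure,
// AQS handles queuing, parking, and waking.
class SimplePermits {

    private static final class Sync extends AbstractQueuedSynchronizer {
        Sync(int permits) {
            setState(permits);                 // state holds the shared total
        }

        int permits() {
            return getState();
        }

        @Override
        protected int tryAcquireShared(int n) {
            for (;;) {
                int available = getState();
                int remaining = available - n;
                // Negative result: not enough permits, AQS will queue the caller.
                if (remaining < 0 || compareAndSetState(available, remaining)) {
                    return remaining;
                }
            }
        }

        @Override
        protected boolean tryReleaseShared(int n) {
            for (;;) {
                int current = getState();
                if (compareAndSetState(current, current + n)) {
                    return true;               // tell AQS to wake waiters
                }
            }
        }
    }

    private final Sync sync;

    SimplePermits(int permits) {
        sync = new Sync(permits);
    }

    void acquire(int n) {
        sync.acquireShared(n);                 // AQS template method, may block
    }

    boolean tryAcquire(int n) {
        return sync.tryAcquireShared(n) >= 0;  // single non-blocking attempt
    }

    void release(int n) {
        sync.releaseShared(n);                 // AQS template method
    }

    int available() {
        return sync.permits();
    }

    public static void main(String[] args) {
        SimplePermits permits = new SimplePermits(3);
        permits.acquire(2);
        System.out.println(permits.available());
        permits.release(2);
        System.out.println(permits.available());
    }
}
```

Only the two try methods contain any policy; everything related to waiting lives in the inherited AQS code.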
3.2 Semaphore source code analysis
Like ReentrantLock, Semaphore has two constructors, which select between a fair and an unfair shared lock. This brings us back to the question raised after the example above: when threads are already waiting, can a later thread take permits directly, or must it queue? Queuing is the fair behavior; jumping the queue is the unfair one.
Take the conveyor-belt sushi restaurant again. Only 2 seats are free, a family of 3 is already waiting, and then a couple arrives. With the fair shared lock, the couple has to wait until the family of 3 has been seated. With the unfair shared lock, the couple can sit down and eat immediately, because exactly 2 seats are free, while the family of 3 keeps waiting (which does seem rather unfair). The advantage of the unfair shared lock is that it maximizes the restaurant's profit (though it may well annoy the waiting customers), and it is Semaphore's default implementation.
public Semaphore(int permits) {
    sync = new NonfairSync(permits);
}

public Semaphore(int permits, boolean fair) {
    sync = fair ? new FairSync(permits) : new NonfairSync(permits);
}
The two core Semaphore methods used in the example are acquire and release, which call the AQS methods acquireSharedInterruptibly and releaseShared respectively:
// Acquire the given number of permits
public void acquire(int permits) throws InterruptedException {
    if (permits < 0) throw new IllegalArgumentException();
    sync.acquireSharedInterruptibly(permits);
}

// Release the given number of permits
public void release(int permits) {
    if (permits < 0) throw new IllegalArgumentException();
    sync.releaseShared(permits);
}

public final void acquireSharedInterruptibly(int arg)
        throws InterruptedException {
    if (Thread.interrupted())
        throw new InterruptedException();
    if (tryAcquireShared(arg) < 0) // try to acquire arg permits
        doAcquireSharedInterruptibly(arg);
}

public final boolean releaseShared(int arg) {
    if (tryReleaseShared(arg)) { // try to release arg permits
        doReleaseShared();
        return true;
    }
    return false;
}
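Because acquire goes through acquireSharedInterruptibly, a thread blocked in acquire can be cancelled by interrupting it. The following sketch shows this with a semaphore that has no permits; the names InterruptibleAcquireDemo and waiterSeesInterrupt are illustrative, and the busy-wait on hasQueuedThreads() is just a simple way to make sure the waiter has queued before it is interrupted.

```java
import java.util.concurrent.Semaphore;
import java.util.concurrent.atomic.AtomicBoolean;

// Illustrative sketch: a thread blocked in acquire() reacts to interruption.
class InterruptibleAcquireDemo {

    /** Returns true if the blocked waiter received an InterruptedException. */
    static boolean waiterSeesInterrupt() {
        final Semaphore semaphore = new Semaphore(0);   // no permits: acquire() must wait
        final AtomicBoolean interrupted = new AtomicBoolean(false);

        Thread waiter = new Thread(() -> {
            try {
                semaphore.acquire();                    // blocks, nothing to acquire
            } catch (InterruptedException e) {
                interrupted.set(true);                  // woken by the interrupt
            }
        }, "waiter");
        waiter.start();

        // Wait until the waiter has joined the semaphore's queue.
        while (!semaphore.hasQueuedThreads()) {
            Thread.yield();
        }

        waiter.interrupt();
        try {
            waiter.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return interrupted.get();
    }

    public static void main(String[] args) {
        System.out.println(waiterSeesInterrupt());
    }
}
```

Callers that must not be cancelled this way can use acquireUninterruptibly instead.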
Semaphore acquires and releases permits through AQS, but whether a given attempt to acquire or release succeeds is decided by Semaphore itself.
// Fair version
protected int tryAcquireShared(int acquires) {
    for (;;) {
        if (hasQueuedPredecessors()) // is anyone queued ahead of us? if so, fail
            return -1;
        int available = getState(); // permits currently available
        int remaining = available - acquires; // permits left after this acquisition
        // Either too few permits are left, or there are enough and we try to take them
        // (the restaurant has too few seats, or two couples compete for the same seats)
        if (remaining < 0 ||
            compareAndSetState(available, remaining))
            return remaining;
    }
}

// Unfair version
final int nonfairTryAcquireShared(int acquires) {
    for (;;) {
        int available = getState(); // permits currently available
        int remaining = available - acquires; // permits left after this acquisition
        // Either too few permits are left, or there are enough and we try to take them
        // (the restaurant has too few seats, or two couples compete for the same seats)
        if (remaining < 0 ||
            compareAndSetState(available, remaining))
            return remaining;
    }
}
You can see that the only difference between the fair and the unfair shared lock is whether the queue is checked for waiting threads first. The fair shared lock checks and gives way to queued threads; the unfair shared lock jumps the queue even when a thread is already waiting.
To verify this conclusion, modify the above example slightly:
threadA.start();
threadB.start();
Thread.sleep(1);
threadD.start(); // threadD is already queued
Thread.sleep(3500);
threadC.start(); // threadC jumps the queue after 3500 milliseconds
Result output:
threadB get 5 semaphore
threadA get 4 semaphore
threadB release 2 semaphore
threadA release 1 semaphore
threadC get 4 semaphore
threadA release 1 semaphore
threadB release 3 semaphore
threadC release 4 semaphore
threadA release 2 semaphore
threadD get 10 semaphore
threadD release 10 semaphore
This example shows that with an unfair lock, a thread first tries to acquire the shared lock and queues only if that attempt fails.
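The same contrast can be checked without relying on sleep timings. The sketch below is an illustration with made-up names (BargingDemo, lateArrivalGetsSeats): a "family" thread queues for 3 permits while only 2 are free, and the calling thread then asks for 2. It uses the timed tryAcquire with a zero timeout, which according to the Semaphore documentation honors the fairness setting without waiting; the untimed tryAcquire() always barges and would not show the difference.

```java
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;

// Illustrative sketch: does a late request for 2 permits succeed
// while an earlier request for 3 permits is still queued?
class BargingDemo {

    static boolean lateArrivalGetsSeats(boolean fair) {
        final Semaphore seats = new Semaphore(2, fair);    // only 2 permits are free

        Thread family = new Thread(() -> {
            try {
                seats.acquire(3);                          // needs 3, so it has to queue
                seats.release(3);
            } catch (InterruptedException e) {
                // interrupted by the demo's cleanup below
            }
        }, "family");
        family.start();

        // Wait until the family is really in the queue.
        while (!seats.hasQueuedThreads()) {
            Thread.yield();
        }

        boolean seated;
        try {
            // Zero timeout: one attempt that respects the fairness policy.
            seated = seats.tryAcquire(2, 0, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            seated = false;
        }

        family.interrupt();                                // let the waiter exit
        try {
            family.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return seated;
    }

    public static void main(String[] args) {
        System.out.println("fair:   " + lateArrivalGetsSeats(true));
        System.out.println("unfair: " + lateArrivalGetsSeats(false));
    }
}
```

With the fair policy the late request is refused because a thread is queued ahead of it; with the unfair policy it takes the 2 free permits.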
When the attempt to acquire permits fails, the thread has to queue. The queuing is implemented by the AQS method doAcquireSharedInterruptibly:
private void doAcquireSharedInterruptibly(int arg)
        throws InterruptedException {
    final Node node = addWaiter(Node.SHARED); // join the queue
    boolean failed = true;
    try {
        for (;;) {
            final Node p = node.predecessor(); // predecessor of the current node
            if (p == head) {
                // The predecessor is the head, so this node is the first waiting
                // thread: try to acquire the shared lock again
                int r = tryAcquireShared(arg);
                if (r >= 0) {
                    // Difference from ReentrantLock: after acquiring the shared lock,
                    // set this node as head and notify the next node
                    setHeadAndPropagate(node, r);
                    p.next = null; // help GC
                    failed = false;
                    return;
                }
            }
            // The predecessor is not the head, or the acquisition failed:
            // check the node status to see whether the thread should be suspended
            if (shouldParkAfterFailedAcquire(p, node) &&
                parkAndCheckInterrupt()) // suspend the thread; it blocks here
                throw new InterruptedException();
        }
    } finally {
        if (failed)
            cancelAcquire(node);
    }
}
This code is basically the same as the acquireQueued(addWaiter(Node.EXCLUSIVE), arg) call used by ReentrantLock, with two differences. First, the node added to the wait queue is of type Node.SHARED. Second, after acquiring the lock successfully, the thread notifies the next node, that is, it wakes the next waiting thread. In the sushi restaurant, suppose a family of 3 and a couple are waiting and 5 seats free up at once: after the family of 3 sits down, it tells the couple behind it to take their seats too, which is what makes the lock shared. shouldParkAfterFailedAcquire and parkAndCheckInterrupt were covered in detail in the previous article and are not explained again here.
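The effect of this propagation can be observed from outside: a single release call that returns enough permits lets every queued thread through, even though only the first waiter is woken directly. The sketch below uses illustrative names (PropagationDemo, wakeAll) and getQueueLength(), which the documentation describes as an estimate, to wait until the waiters have queued.

```java
import java.util.concurrent.Semaphore;
import java.util.concurrent.atomic.AtomicInteger;

// Illustrative sketch: one release(n) call lets n queued waiters proceed.
class PropagationDemo {

    /** Queues the given number of waiters, releases that many permits at once, returns how many got through. */
    static int wakeAll(int waiters) {
        final Semaphore semaphore = new Semaphore(0);
        final AtomicInteger woken = new AtomicInteger();
        Thread[] threads = new Thread[waiters];

        for (int i = 0; i < waiters; i++) {
            threads[i] = new Thread(() -> {
                semaphore.acquireUninterruptibly();  // queues, since no permit is available
                woken.incrementAndGet();
            }, "waiter-" + i);
            threads[i].start();
        }

        // Wait until all waiters have joined the queue.
        while (semaphore.getQueueLength() < waiters) {
            Thread.yield();
        }

        semaphore.release(waiters);                  // a single release call

        for (Thread t : threads) {
            try {
                t.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
        return woken.get();
    }

    public static void main(String[] args) {
        System.out.println(wakeAll(5));
    }
}
```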
To release, tryReleaseShared is called first to try to return the permits; if it succeeds, doReleaseShared determines whether the successor thread needs to be woken.
protected final boolean tryReleaseShared(int releases) {
    for (;;) {
        int current = getState();
        int next = current + releases;
        if (next < current) // overflow
            throw new Error("Maximum permit count exceeded");
        if (compareAndSetState(current, next)) // CAS in the new permit count
            return true;
    }
}

private void doReleaseShared() {
    for (;;) {
        Node h = head;
        if (h != null && h != tail) {
            int ws = h.waitStatus;
            if (ws == Node.SIGNAL) { // SIGNAL state: the successor must be woken
                if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
                    continue;            // loop to recheck cases
                unparkSuccessor(h);      // wake the successor node
            }
            else if (ws == 0 &&
                     !compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
                continue;                // loop on failed CAS
        }
        if (h == head)                   // loop if head changed
            break;
    }
}
The release logic is easy to follow. Compared with ReentrantLock, the only notable difference is in the value of state: each release adds the released permits back to it.
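Two consequences of tryReleaseShared simply adding to state can be checked directly: a release does not have to be matched by an earlier acquire, and pushing the count past Integer.MAX_VALUE triggers the overflow branch. The sketch below uses illustrative names (ReleaseDemo, releaseWithoutAcquire, releasePastMax); the error message it reports is the one thrown in the source shown above.

```java
import java.util.concurrent.Semaphore;

// Illustrative sketch: what "state = state + releases" implies for callers.
class ReleaseDemo {

    /** Releases 2 permits that were never acquired and returns the new total. */
    static int releaseWithoutAcquire() {
        Semaphore semaphore = new Semaphore(1);
        semaphore.release(2);                 // no ownership check: 1 + 2
        return semaphore.availablePermits();
    }

    /** Releases one permit too many and returns the resulting error message. */
    static String releasePastMax() {
        Semaphore semaphore = new Semaphore(Integer.MAX_VALUE);
        try {
            semaphore.release(1);             // next < current, so the overflow check fires
            return "no error";
        } catch (Error e) {
            return e.getMessage();
        }
    }

    public static void main(String[] args) {
        System.out.println(releaseWithoutAcquire());
        System.out.println(releasePastMax());
    }
}
```

This is also why a Semaphore, unlike ReentrantLock, can be released by a thread other than the one that acquired the permits.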
3.3 CountDownLatch source code analysis
CountDownLatch is logically much simpler than Semaphore, and it makes no distinction between fair and unfair: while the counter is not zero, every waiting thread is blocked, and when the counter reaches zero, all waiting threads are released. Let's look directly at CountDownLatch's two core methods, await and countDown.
public void await() throws InterruptedException {
    // The difference from Semaphore is the argument, which is always 1. The argument
    // actually means nothing to CountDownLatch, because its tryAcquireShared
    // only checks getState() == 0
    sync.acquireSharedInterruptibly(1);
}

public boolean await(long timeout, TimeUnit unit)
        throws InterruptedException {
    // Adds a timeout: once the time is up, return false so the caller can
    // continue instead of blocking for a long time
    return sync.tryAcquireSharedNanos(1, unit.toNanos(timeout));
}

public void countDown() {
    sync.releaseShared(1); // release one count each time
}

public final void acquireSharedInterruptibly(int arg)
        throws InterruptedException {
    if (Thread.interrupted())
        throw new InterruptedException();
    if (tryAcquireShared(arg) < 0) // try to acquire arg permits
        doAcquireSharedInterruptibly(arg);
}

protected int tryAcquireShared(int acquires) {
    // The basis for all waiters acquiring the lock at once: whatever the initial
    // state was, acquisition succeeds only when the count equals 0
    return (getState() == 0) ? 1 : -1;
}
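Since tryAcquireShared only asks whether state is 0, the latch behaves like a one-way gate: closed while the count is positive, permanently open afterwards. The sketch below traces this through the public API, with illustrative names (LatchStateDemo, trace). It also calls countDown once more after the count has reached zero; per the CountDownLatch documentation, nothing happens in that case.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

// Illustrative sketch: the latch is closed while count > 0 and stays open at 0.
class LatchStateDemo {

    static String trace() {
        CountDownLatch latch = new CountDownLatch(2);
        StringBuilder out = new StringBuilder();
        try {
            // Count is 2: the timed await gives up and reports false.
            out.append(latch.await(10, TimeUnit.MILLISECONDS));

            latch.countDown();   // 2 -> 1
            latch.countDown();   // 1 -> 0, the gate opens

            // Count is 0: await returns true without blocking.
            out.append(',').append(latch.await(10, TimeUnit.MILLISECONDS));

            latch.countDown();   // extra call: the count stays at 0
            out.append(',').append(latch.getCount());
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            out.append(",interrupted");
        }
        return out.toString();
    }

    public static void main(String[] args) {
        System.out.println(trace()); // prints "false,true,0"
    }
}
```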
There are two differences from the Semaphore code analyzed above. First, state is decremented by 1 on each countDown, and the waiting threads are released, all of them, only when state reaches 0. Second, await has a timed variant (Semaphore's timed tryAcquire relies on the same AQS method, but it was not covered above). acquireSharedInterruptibly is the same method Semaphore uses and is not repeated here; the focus is on tryAcquireSharedNanos.
public final boolean tryAcquireSharedNanos(int arg, long nanosTimeout)
        throws InterruptedException {
    if (Thread.interrupted())
        throw new InterruptedException();
    return tryAcquireShared(arg) >= 0 ||
        doAcquireSharedNanos(arg, nanosTimeout);
}

static final long spinForTimeoutThreshold = 1000L;

private boolean doAcquireSharedNanos(int arg, long nanosTimeout)
        throws InterruptedException {
    if (nanosTimeout <= 0L)
        return false;
    final long deadline = System.nanoTime() + nanosTimeout; // compute the deadline
    final Node node = addWaiter(Node.SHARED);
    boolean failed = true;
    try {
        for (;;) {
            final Node p = node.predecessor();
            if (p == head) {
                int r = tryAcquireShared(arg);
                if (r >= 0) {
                    setHeadAndPropagate(node, r);
                    p.next = null; // help GC
                    failed = false;
                    return true;
                }
            }
            nanosTimeout = deadline - System.nanoTime();
            if (nanosTimeout <= 0L) // timed out: return false and let the caller continue
                return false;
            if (shouldParkAfterFailedAcquire(p, node) &&
                nanosTimeout > spinForTimeoutThreshold)
                LockSupport.parkNanos(this, nanosTimeout); // suspend the thread, also with a time limit
            if (Thread.interrupted())
                throw new InterruptedException();
        }
    } finally {
        if (failed)
            cancelAcquire(node);
    }
}
Focus on the commented lines. A deadline is computed first; once it has passed, the method stops waiting and returns false so that the caller can continue. If the deadline has not passed and the remaining time is longer than spinForTimeoutThreshold (defined here as 1000 ns), the thread is suspended, and the suspension itself is limited to the remaining time. If less time than that remains, the thread simply loops again instead of suspending. This is how the timed wait is implemented.
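The deadline pattern itself can be shown in isolation. The sketch below is not AQS code: it waits for a plain AtomicBoolean instead of a queue node, nobody calls unpark, and interruption is ignored, so it only illustrates the three steps of computing a deadline, recomputing the remaining time on each pass, and parking only when the remaining time exceeds a spin threshold. The names DeadlineWaitDemo, awaitFlag, and SPIN_THRESHOLD_NANOS are hypothetical.

```java
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.LockSupport;

// Illustrative sketch of a deadline-based timed wait (not the AQS implementation).
class DeadlineWaitDemo {

    // Below this remaining time, looping is assumed cheaper than parking.
    static final long SPIN_THRESHOLD_NANOS = 1000L;

    /** Waits until the flag is true or the timeout elapses; returns the final verdict. */
    static boolean awaitFlag(AtomicBoolean flag, long timeoutNanos) {
        final long deadline = System.nanoTime() + timeoutNanos;  // fixed point in time
        while (!flag.get()) {
            long remaining = deadline - System.nanoTime();       // recomputed on every pass
            if (remaining <= 0L) {
                return false;                                    // timed out
            }
            if (remaining > SPIN_THRESHOLD_NANOS) {
                LockSupport.parkNanos(remaining);                // sleep at most until the deadline
            }
            // Otherwise fall through and spin for the last few hundred nanoseconds.
        }
        return true;
    }

    public static void main(String[] args) {
        long start = System.nanoTime();
        boolean result = awaitFlag(new AtomicBoolean(false), 5_000_000L);
        long elapsedMicros = (System.nanoTime() - start) / 1000;
        System.out.println(result + " after about " + elapsedMicros + " microseconds");
    }
}
```

Working from a fixed deadline rather than counting down the timeout means that early or spurious wake-ups from parkNanos do not extend the total wait.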
4. Summary
This article analyzed Semaphore and CountDownLatch, two implementations of the AQS shared lock. Their biggest difference from non-shared locks is that multiple threads can acquire the lock at the same time. I hope you now have a deeper understanding of Semaphore and CountDownLatch. If anything is still unclear, think back to the sushi restaurant and amusement park examples. If you found this article helpful, feel free to give it a thumbs up.