Reprint address: http://yhjhappy234.blog.163.com/blog/static/3163283220135875759265/
CountDownLatch is one of the synchronization aids in Java's concurrency package. It is often described as a counter, or as a latch!
CountDownLatch is mainly used in two scenarios. The first is as a switch, letting one thread or a group of threads wait until some event has happened; this is often described as a latch or gate. Put plainly, it is like a door: all threads block in front of the door, and once the door is opened, all of them pass through. But once opened, the door cannot be closed again; the blocking is gone for good and the state can only ever be "open". The second scenario is as a counter: a task is split into N sub-tasks and the main thread waits; each sub-task decrements the counter by one when it finishes, and the main thread is unblocked only when every sub-task is complete.
So let's take a look at the API of CountDownLatch.
Constructor Summary

| Constructor | Description |
|---|---|
| CountDownLatch(int count) | Constructs a CountDownLatch initialized with the given count. |

Method Summary

| Return | Method and Description |
|---|---|
| void | await() Causes the current thread to wait until the latch has counted down to zero, unless the thread is interrupted. |
| boolean | await(long timeout, TimeUnit unit) Causes the current thread to wait until the latch has counted down to zero, unless the thread is interrupted or the specified waiting time elapses. |
| void | countDown() Decrements the count of the latch, releasing all waiting threads if the count reaches zero. |
| long | getCount() Returns the current count. |
| String | toString() Returns a string identifying this latch, as well as its state. |
CountDownLatch maintains a non-negative counter: the countDown method decrements the counter, and the await method waits for the counter to reach zero. All awaiting threads block until the counter reaches zero, the waiting thread is interrupted, or the wait times out.
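The timed variant behaves the same way but gives up after the timeout. Here is a minimal sketch (the class name AwaitTimeoutDemo and the numbers are made up for illustration): since only one of the two expected sub-tasks ever counts down, await(1, TimeUnit.SECONDS) returns false instead of blocking forever.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

public class AwaitTimeoutDemo {
    public static void main(String[] args) throws InterruptedException {
        CountDownLatch latch = new CountDownLatch(2);      // two sub-tasks expected

        new Thread(() -> latch.countDown()).start();       // only one sub-task ever finishes

        // wait at most one second; returns false because the count never reaches zero
        boolean done = latch.await(1, TimeUnit.SECONDS);
        System.out.println("finished in time? " + done + ", remaining count = " + latch.getCount());
    }
}
```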
Let’s take a look at a corresponding application example:
```java
package com.yhj.lauth;

import java.util.Date;
import java.util.concurrent.CountDownLatch;

// Worker thread
class Worker extends Thread {

    private int workNo;                    // worker number
    private CountDownLatch startLauch;     // start signal - the switch
    private CountDownLatch workLauch;      // work progress - the counter

    public Worker(int workNo, CountDownLatch startLauch, CountDownLatch workLauch) {
        this.workNo = workNo;
        this.startLauch = startLauch;
        this.workLauch = workLauch;
    }

    @Override
    public void run() {
        try {
            System.out.println(new Date() + " - yhj" + workNo + " ready to work!");
            startLauch.await();            // wait for the boss's start signal
            System.out.println(new Date() + " - yhj" + workNo + " working...");
            Thread.sleep(100);             // each worker works for 100 ms
        } catch (InterruptedException e) {
            e.printStackTrace();
        } finally {
            System.out.println(new Date() + " - yhj" + workNo + " done!");
            workLauch.countDown();         // one sub-task finished, count down
        }
    }
}

// Test case
public class CountDownLauthTestCase {

    public static void main(String[] args) throws InterruptedException {
        int workerCount = 10;                                         // number of workers
        CountDownLatch startLauch = new CountDownLatch(1);            // the latch used as a switch
        CountDownLatch workLauch = new CountDownLatch(workerCount);   // the latch used as a counter
        System.out.println(new Date() + " - boss: get ready to start!");
        for (int i = 0; i < workerCount; i++) {
            new Worker(i, startLauch, workLauch).start();
        }
        System.out.println(new Date() + " - boss: rest for 2s!");
        Thread.sleep(2000);
        System.out.println(new Date() + " - boss: start!");
        startLauch.countDown();            // open the switch
        workLauch.await();                 // block until every worker has finished
        System.out.println(new Date() + " - boss: OK! Mission accomplished! Call it a day and go home!");
    }
}
```

Execution result:

```
Sat Jun 08 18:59:33 CST 2013 - Boss: Get ready!
Sat Jun 08 18:59:33 CST 2013 - YHJ0 Ready to go!
Sat Jun 08 18:59:33 CST 2013 - YHJ2 Ready to go!
Sat Jun 08 18:59:33 CST 2013 - YHJ1 Ready to go!
Sat Jun 08 18:59:33 CST 2013 - YHJ4 Ready to go!
Sat Jun 08 18:59:33 CST 2013 - Boss: Rest for 2s!
Sat Jun 08 18:59:33 CST 2013 - YHJ8 Ready to go!
Sat Jun 08 18:59:33 CST 2013 - YHJ6 Ready to go!
Sat Jun 08 18:59:33 CST 2013 - YHJ3 Ready to go!
Sat Jun 08 18:59:33 CST 2013 - YHJ7 Ready to go!
Sat Jun 08 18:59:33 CST 2013 - YHJ5 Ready to go!
Sat Jun 08 18:59:33 CST 2013 - YHJ9 Ready to go!
Sat Jun 08 18:59:35 CST 2013 - Boss: Start!
Sat Jun 08 18:59:35 CST 2013 - YHJ0
Sat Jun 08 18:59:35 CST 2013 - YHJ2
Sat Jun 08 18:59:35 CST 2013 - YHJ1
Sat Jun 08 18:59:35 CST 2013 - YHJ4
Sat Jun 08 18:59:35 CST 2013 - YHJ8
Sat Jun 08 18:59:35 CST 2013 - YHJ6
Sat Jun 08 18:59:35 CST 2013 - YHJ3
Sat Jun 08 18:59:35 CST 2013 - YHJ7
Sat Jun 08 18:59:35 CST 2013 - YHJ5
Sat Jun 08 18:59:35 CST 2013 - YHJ9
Sat Jun 08 18:59:35 CST 2013 - YHJ5
Sat Jun 08 18:59:35 CST 2013 - YHJ1
Sat Jun 08 18:59:35 CST 2013 - YHJ3
Sat Jun 08 18:59:35 CST 2013 - YHJ6
Sat Jun 08 18:59:35 CST 2013 - YHJ7
Sat Jun 08 18:59:35 CST 2013 - YHJ9
Sat Jun 08 18:59:35 CST 2013 - YHJ4
Sat Jun 08 18:59:35 CST 2013 - YHJ0
Sat Jun 08 18:59:35 CST 2013 - YHJ2
Sat Jun 08 18:59:35 CST 2013 - YHJ8
Sat Jun 08 18:59:35 CST 2013 - Boss: Good! Mission accomplished! Call it a day and go home!
```
This example uses the two CountDownLatch scenarios respectively. The first latch, startLauch, is the switch: before it is opened no worker thread does any work, and once it is opened all of them can run at the same time. The second latch, workLauch, is really a counter: as long as it has not reached zero the main thread waits, and once every worker has finished the main thread unblocks and continues.
The second scenario is often used in thread pools, which we’ll learn more about later!
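As a rough sketch of that pattern (the pool size, task count, and class name PoolLatchDemo are made-up illustration values), each pooled task counts the latch down in a finally block and the submitting thread simply awaits:

```java
import java.util.Date;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class PoolLatchDemo {
    public static void main(String[] args) throws InterruptedException {
        int taskCount = 5;
        CountDownLatch doneLatch = new CountDownLatch(taskCount);
        ExecutorService pool = Executors.newFixedThreadPool(3);

        for (int i = 0; i < taskCount; i++) {
            final int taskNo = i;
            pool.submit(() -> {
                try {
                    System.out.println(new Date() + " - task " + taskNo + " running");
                } finally {
                    doneLatch.countDown();       // count down even if the task throws
                }
            });
        }

        doneLatch.await();                       // block until every task has counted down
        System.out.println(new Date() + " - all tasks finished");
        pool.shutdown();
    }
}
```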
Another important property here is the memory consistency effect: actions in a thread prior to calling countDown() happen-before actions following a successful return from a corresponding await() in another thread.
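A small sketch of what that guarantee buys us (the class and field names are made up for illustration): the write to a plain, non-volatile field made before countDown() is guaranteed to be visible to the thread that returns from await().

```java
import java.util.concurrent.CountDownLatch;

public class HappensBeforeDemo {
    static int result = 0;                        // deliberately NOT volatile
    static final CountDownLatch latch = new CountDownLatch(1);

    public static void main(String[] args) throws InterruptedException {
        new Thread(() -> {
            result = 42;                          // write before countDown() ...
            latch.countDown();
        }).start();

        latch.await();                            // ... happens-before the read after await() returns
        System.out.println(result);               // guaranteed to print 42
    }
}
```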
Now that we have seen how it is used, what is it built on, and how is it implemented?
Let’s take a look at the corresponding source:
```java
private static final class Sync extends AbstractQueuedSynchronizer
```
At the top of the class we see an internal synchronizer built on AQS (AbstractQueuedSynchronizer). Let's focus on the two methods we used, await and countDown, starting with the await method:
```java
public void await() throws InterruptedException {
    sync.acquireSharedInterruptibly(1);
}
```
This is simply a direct call to the internal synchronizer's interruptible shared-acquire method.
```java
public final void acquireSharedInterruptibly(int arg) throws InterruptedException {
    if (Thread.interrupted())
        throw new InterruptedException();
    if (tryAcquireShared(arg) < 0)
        doAcquireSharedInterruptibly(arg);
}
```
Here, if the thread has already been interrupted, an InterruptedException is thrown immediately; otherwise it tries to acquire the shared lock. Let's look at the implementation of tryAcquireShared(arg):
```java
public int tryAcquireShared(int acquires) {
    return getState() == 0 ? 1 : -1;
}
```
A so-called shared lock means that all the threads sharing it wait on the same resource: once the condition is satisfied, every one of them owns it. In other words, a shared lock is essentially just a flag that all the threads wait on; once the flag is satisfied, all of them are activated (as if each of them had acquired the lock). CountDownLatch is implemented on top of this shared mode. The flag here is simply whether state equals zero, and state is the remaining count, the value greater than zero that was passed in through the constructor. So as long as the count has not reached zero, tryAcquireShared returns -1.
```java
Sync(int count) {
    setState(count);
}
```
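To make the shared-mode convention concrete, here is a minimal one-shot latch sketched on top of AQS (this is not the JDK's code, and the class name OneShotLatch is made up): tryAcquireShared returns a negative value to mean "queue and block" and a non-negative value to mean "go ahead", which is exactly the convention CountDownLatch relies on.

```java
import java.util.concurrent.locks.AbstractQueuedSynchronizer;

public class OneShotLatch {
    private static final class Sync extends AbstractQueuedSynchronizer {
        Sync() { setState(1); }                    // start closed

        @Override
        protected int tryAcquireShared(int ignored) {
            return getState() == 0 ? 1 : -1;       // negative -> caller must queue and block
        }

        @Override
        protected boolean tryReleaseShared(int ignored) {
            setState(0);                           // open the latch
            return true;                           // propagate: wake the queued threads
        }
    }

    private final Sync sync = new Sync();

    public void await() throws InterruptedException {
        sync.acquireSharedInterruptibly(1);        // blocks until open() is called
    }

    public void open() {
        sync.releaseShared(1);                     // opens the gate for everyone, once and for all
    }
}
```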
When tryAcquireShared returns a value less than zero, the shared resource has not been acquired and the thread needs to block, so doAcquireSharedInterruptibly() is executed:
```java
private void doAcquireSharedInterruptibly(int arg) throws InterruptedException {
    final Node node = addWaiter(Node.SHARED);
    try {
        for (;;) {
            final Node p = node.predecessor();
            if (p == head) {
                int r = tryAcquireShared(arg);
                if (r >= 0) {
                    setHeadAndPropagate(node, r);
                    p.next = null; // help GC
                    return;
                }
            }
            if (shouldParkAfterFailedAcquire(p, node) &&
                parkAndCheckInterrupt())
                break;
        }
    } catch (RuntimeException ex) {
        cancelAcquire(node);
        throw ex;
    }
    // Arrive here only if interrupted
    cancelAcquire(node);
    throw new InterruptedException();
}
```
Here a node is first added to the CLH wait queue in shared mode (the thread takes its place in line). Then its predecessor is checked: if the predecessor is the head node and the counter has already reached zero, the acquisition succeeds, the node becomes the new head and the nodes behind it are propagated (woken up); otherwise the code decides whether the current thread needs to block, and if so parks it until it is woken up or interrupted!
```java
private final boolean parkAndCheckInterrupt() {
    LockSupport.park(this);
    return Thread.interrupted();
}
```
The blocker parameter of LockSupport.park(Object) is the object recorded as responsible for the blocking (it is used for monitoring and diagnostics); it is not the thing that gets blocked. What is blocked is the current thread itself. Don't confuse the two!
```java
public static void park(Object blocker) {
    Thread t = Thread.currentThread();
    setBlocker(t, blocker);
    unsafe.park(false, 0L);
    setBlocker(t, null);
}

public static void unpark(Thread thread) {
    if (thread != null)
        unsafe.unpark(thread);
}
```
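A tiny usage sketch of the distinction (the class name ParkDemo is made up): the blocker object is only recorded for diagnostics (it is what shows up in thread dumps), while park and unpark act on the thread itself.

```java
import java.util.concurrent.locks.LockSupport;

public class ParkDemo {
    public static void main(String[] args) throws InterruptedException {
        final Object blocker = new Object();       // reported as the "parking to wait for" object in thread dumps

        Thread worker = new Thread(() -> {
            System.out.println("worker: parking...");
            LockSupport.park(blocker);             // the blocked entity is the worker thread itself
            System.out.println("worker: woken up, continuing");
        });
        worker.start();

        Thread.sleep(500);
        LockSupport.unpark(worker);                // wake the thread, not the blocker object
        worker.join();
    }
}
```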
Let’s look at the implementation of the corresponding countDown method
```java
public void countDown() {
    sync.releaseShared(1);
}
```
Every call to countDown performs one shared release on the internal synchronizer!
```java
public final boolean releaseShared(int arg) {
    if (tryReleaseShared(arg)) {
        Node h = head;
        if (h != null && h.waitStatus != 0)
            unparkSuccessor(h);
        return true;
    }
    return false;
}
```
If the release attempt succeeds, and there is a head node whose status indicates waiters, the successor of that head node is woken up!
```java
public boolean tryReleaseShared(int releases) {
    // Decrement count; signal when transition to zero
    for (;;) {
        int c = getState();
        if (c == 0)
            return false;
        int nextc = c - 1;
        if (compareAndSetState(c, nextc))
            return nextc == 0;
    }
}
```
tryReleaseShared returns false if the count is already zero. Otherwise it decrements the count with a CAS; it returns true if the new count (nextc) has reached zero, and false otherwise.
When tryReleaseShared returns true, the counter has reached zero and the blocked threads need to be released! unparkSuccessor(h) is then executed to wake up the successor of the head node in the queue.
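The same decrement-and-CAS retry pattern can be written against a plain AtomicInteger; this standalone sketch (the class name CasCountDown is made up) may make the loop easier to read: refuse if the count is already zero, otherwise CAS it down by one and report whether this call was the one that reached zero.

```java
import java.util.concurrent.atomic.AtomicInteger;

public class CasCountDown {
    private final AtomicInteger count = new AtomicInteger(3);

    public boolean countDown() {
        for (;;) {
            int c = count.get();
            if (c == 0)
                return false;                      // already released, nothing to do
            int nextc = c - 1;
            if (count.compareAndSet(c, nextc))
                return nextc == 0;                 // true only for the final count-down
        }
    }
}
```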
Instead of a signalAll-like approach that wakes every thread at once, an elegant queue is used to release the blocked threads one by one. So how does that work? Our code only woke up the node right after the head (the head itself is really just an empty dummy node). Let's first look at the implementation of unparkSuccessor:
```java
private void unparkSuccessor(Node node) {
    /*
     * Try to clear status in anticipation of signalling.  It is
     * OK if this fails or if status is changed by waiting thread.
     */
    compareAndSetWaitStatus(node, Node.SIGNAL, 0);

    /*
     * Thread to unpark is held in successor, which is normally
     * just the next node.  But if cancelled or apparently null,
     * traverse backwards from tail to find the actual
     * non-cancelled successor.
     */
    Node s = node.next;
    if (s == null || s.waitStatus > 0) {
        s = null;
        for (Node t = tail; t != null && t != node; t = t.prev)
            if (t.waitStatus <= 0)
                s = t;
    }
    if (s != null)
        LockSupport.unpark(s.thread);
}
```
Clearly, the parameter passed in is the head node; after clearing its status via CAS, the thread held in the head's successor is unparked (note that what is unparked is the thread, not the blocker monitor object), and then the method returns!
So how do the remaining blocked threads get woken up? Let's go back to doAcquireSharedInterruptibly, the method in which await blocks:
```java
private void doAcquireSharedInterruptibly(int arg) throws InterruptedException {
    final Node node = addWaiter(Node.SHARED);
    try {
        for (;;) {
            final Node p = node.predecessor();
            if (p == head) {
                int r = tryAcquireShared(arg);          // tag 2
                if (r >= 0) {
                    setHeadAndPropagate(node, r);       // tag 3
                    p.next = null; // help GC
                    return;
                }
            }
            if (shouldParkAfterFailedAcquire(p, node) &&
                parkAndCheckInterrupt())                // tag 1
                break;
        }
    } catch (RuntimeException ex) {
        cancelAcquire(node);
        throw ex;
    }
    // Arrive here only if interrupted
    cancelAcquire(node);
    throw new InterruptedException();
}
```
We can see that parkAndCheckInterrupt() at tag 1 is where the thread blocks. When the successor of the head (the first node to enter the queue) is unparked, it returns from tag 1, the loop spins again, and this time tryAcquireShared(arg) at tag 2 returns 1. Since r >= 0 the code reaches tag 3, which sets the current node as the new head and then goes on to wake up its own successor.
```java
private void setHeadAndPropagate(Node node, int propagate) {
    setHead(node);                                       // tag 4
    if (propagate > 0 && node.waitStatus != 0) {
        /*
         * Don't bother fully figuring out successor.  If it
         * looks null, call unparkSuccessor anyway to be safe.
         */
        Node s = node.next;
        if (s == null || s.isShared())
            unparkSuccessor(node);                       // tag 5
    }
}
```
After a successor node is woken up, it in turn wakes up the node behind it, so the queued threads are released one after another!
That is the whole of CountDownLatch. In fact, with the earlier material on atomic operations and the principles and implementation of AQS, CountDownLatch is fairly easy to analyze.