JUC contains some common synchronization tool classes. Today, we will discuss in detail how to use CountDownLatch, CyclicBarrier, Semaphore and the differences between them.
1 CountDownLatch
1.1 Function Description
CountDownLatch is a synchronization utility class that coordinates synchronization between multiple threads. It allows one or more threads to wait until the other threads have completed their operations before executing
CountDownLatch enables one thread to wait for other threads to complete their work before continuing. This is implemented using a counter. The initial value of the counter is the number of threads. As each thread completes its task, the counter is reduced by one. When the counter value is 0, all threads are finished, and the threads waiting on CountDownLatch can resume to perform the next task.
Countdownlatch provides two methods, countDown and await. When countdownlatch is initialized, an integer is passed in. Any program that calls await must wait until the integer counts down to zero. Then countDown by countDown.
1.2 Simple Cases
public class CountDownLatchDemo {
private static final CountDownLatch countDownLatch = new CountDownLatch(3);
public static void main(String[] args) {
try {
System.out.println("Main thread starts executing");
for (int i = 0; i < 3; i++) {
new Thread(new CountDownLatchThread(), String.valueOf(i)).start();
TimeUnit.SECONDS.sleep(1);
}
// block until countDownLatch is 0
// The main thread wakes up after all three child threads have finished executing
countDownLatch.await();
System.out.println("Main thread completes execution");
} catch(InterruptedException e) { e.printStackTrace(); }}static class CountDownLatchThread implements Runnable {
@Override
public void run(a) {
try {
System.out.println(Child thread + Thread.currentThread().getName() + "Commence execution");
TimeUnit.SECONDS.sleep(10);
// When the current thread calls this method, the count is reduced by one
countDownLatch.countDown();
System.out.println(Child thread + Thread.currentThread().getName() + "Execution completed");
} catch(InterruptedException e) { e.printStackTrace(); }}}}Copy the code
The main thread needs to wait for the other three child threads to finish before continuing to execute code. Somewhat similar to join, but more flexible than join.
Join: If the join method of a thread is called, the current thread will be blocked until the thread completes execution. The principle of join is to continuously check whether the thread is alive. If the thread is alive, let the current thread wait until the thread terminates, and the this.notifyAll of the thread is called.
The main thread only needs to wait for the child thread to complete the first phase before it can execute. This is not possible with a join, so change the position of countDown() using countDownLatch.
1.3 Source Code Analysis
1.3.1 UML class diagrams
Its underlying implementation is an AQs-based shared lock.
1.3.2 initialization
CountDownLatch uses the shared lock mode. CountDownLatch uses an internal class Sync for synchronization control of CountDownLatch. Sync is an implementation class of AQS that uses the state of AQS to represent count.
public CountDownLatch(int count) {
if (count < 0) throw new IllegalArgumentException("count < 0");
this.sync = new Sync(count);
}
Copy the code
private static final class Sync extends AbstractQueuedSynchronizer {
private static final long serialVersionUID = 4982264981922014374L;
Sync(int count) {
setState(count);
}
int getCount(a) {
return getState();
}
protected int tryAcquireShared(int acquires) {
return (getState() == 0)?1 : -1;
}
protected boolean tryReleaseShared(int releases) {
// Decrement count; signal when transition to zero
for (;;) {
int c = getState();
if (c == 0)
return false;
int nextc = c-1;
if (compareAndSetState(c, nextc))
return nextc == 0; }}}Copy the code
1.3.3 await ()
Causes the current thread to wait until count is counted down to zero, or the thread is interrupted.
public void await(a) throws InterruptedException {
sync.acquireSharedInterruptibly(1);
}
Copy the code
public final void acquireSharedInterruptibly(int arg)
throws InterruptedException {
// The thread is interrupted, and an interrupt exception is thrown
if (Thread.interrupted())
throw new InterruptedException();
// Try to obtain the shared lock
if (tryAcquireShared(arg) < 0)
// Failed to obtain the lock
doAcquireSharedInterruptibly(arg);
}
Copy the code
TryAcquireShared Can obtain the lock only when State is 0
protected int tryAcquireShared(int acquires) {
return (getState() == 0)?1 : -1;
}
Copy the code
DoAcquireSharedInterruptibly: failed to get the lock to wait queue (interruptible), the ReadLoack ReentrantReadWriteLock with code, as shown in the previous code analysis.
private void doAcquireSharedInterruptibly(int arg)
throws InterruptedException {
final Node node = addWaiter(Node.SHARED);
boolean failed = true;
try {
for (;;) {
final Node p = node.predecessor();
if (p == head) {
int r = tryAcquireShared(arg);
if (r >= 0) {
setHeadAndPropagate(node, r);
p.next = null; // help GC
failed = false;
return; }}if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
throw newInterruptedException(); }}finally {
if(failed) cancelAcquire(node); }}Copy the code
1.3.4 countDown ()
public void countDown(a) {
sync.releaseShared(1);
}
Copy the code
public final boolean releaseShared(int arg) {
// Release the shared lock. State is 0 after release. Wakes up the thread waiting in the synchronization queue of the shared lock.
if (tryReleaseShared(arg)) {
doReleaseShared();
return true;
}
return false;
}
Copy the code
protected boolean tryReleaseShared(int releases) {
// Decrement count; signal when transition to zero
for (;;) {
int c = getState();
if (c == 0)
return false;
int nextc = c-1;
if (compareAndSetState(c, nextc))
return nextc == 0; }}Copy the code
private void doReleaseShared(a) {
for (;;) {
Node h = head;
if(h ! =null&& h ! = tail) {int ws = h.waitStatus;
if (ws == Node.SIGNAL) {
if(! compareAndSetWaitStatus(h, Node.SIGNAL,0))
continue; // loop to recheck cases
unparkSuccessor(h);
}
else if (ws == 0 &&
!compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
continue; // loop on failed CAS
}
if (h == head) // loop if head changed
break; }}Copy the code
2 CyclicBarrier
2.1 Function Description
Effect: Causes all threads to wait for completion before proceeding to the next step.
A fence is similar to a lock, blocking a group of threads until an event occurs. The key difference between a fence and a lock is that all threads must reach the fence at the same time before execution can continue. A lock is used to wait for events, and a fence is used to wait for other threads.
A CyclicBarrier causes a number of threads to converge repeatedly at the fence location. The await method is called when the thread reaches the fence position and blocks until all threads have reached the fence position. If all threads reach the fence, the fence will open, all threads will be released, and the fence will be reset for next use.
2.2 Simple Cases
public class CyclicBarrierDemo {
private static final CyclicBarrier cyclicBarrier = new CyclicBarrier(4.new BarrierAction());
public static void main(String[] args) throws Exception {
for (int i = 0; i < 4; i++) {
new Thread(new BarrierThread(), String.valueOf(i)).start();
TimeUnit.MILLISECONDS.sleep(1); }}static class BarrierThread implements Runnable {
@Override
public void run(a) {
try {
System.out.println("Thread:" + Thread.currentThread().getName() + "Start");
TimeUnit.SECONDS.sleep(5);
cyclicBarrier.await();
System.out.println("Thread:" + Thread.currentThread().getName() + "The end");
} catch (InterruptedException e) {
e.printStackTrace();
} catch(BrokenBarrierException e) { e.printStackTrace(); }}}/** * CyclicBarrier can be used again */
static class CyclicBarrierThread implements Runnable {
@Override
public void run(a) {
try {
System.out.println("Thread:" + Thread.currentThread().getName() + "Start");
cyclicBarrier.await();
System.out.println("Thread:" + Thread.currentThread().getName() + "The end");
TimeUnit.SECONDS.sleep(10);
System.out.println("Thread:" + Thread.currentThread().getName() + "Start again");
cyclicBarrier.await();
System.out.println("Thread:" + Thread.currentThread().getName() + "Over again.");
} catch(InterruptedException | BrokenBarrierException e) { e.printStackTrace(); }}}static class BarrierAction implements Runnable {
@Override
public void run(a) {
System.out.println("The first thread to execute after all the fences have arrived."); }}} thread:0Start thread:1Start thread:2Start thread:3After the start fence is reached, the first thread to execute is thread:3End thread:0End thread:2End thread:1The end of theCopy the code
What is the output of changing parties to 2?
Execute two before the other two. Execute in two batches. This is the recycling of the fence
Thread:0Start thread:1Start thread:2Start thread:3After the start fence is reached, the first thread to execute is thread:1End thread:0When the end fence is reached, the first thread to execute is thread:3End thread:2The end of theCopy the code
2.3 Source Code Analysis
The underlying implementation uses ReentrantLock and Condition.
2.3.1 initialization
/** The lock for guarding barrier entry */
// Reentrant exclusive lock
private final ReentrantLock lock = new ReentrantLock();
/** Condition to wait on until tripped */
// Wait for the queue
private final Condition trip = lock.newCondition();
/** The number of parties */
// The number of threads participating
private final int parties;
/* The command to run when tripped */
// The operation performed by the last thread to enter the barrier
private final Runnable barrierCommand;
/** The current generation */
/ / the current generation
private Generation generation = new Generation();
// The number of threads waiting to enter the barrier
private int count;
Copy the code
private static class Generation {
boolean broken = false;
}
Copy the code
The main member variable in CyclicBarrier.
- Cyclicbarriers are built using the ReentrantLock and Condition classes. Both classes have been examined in previous source code analyses, as described in previous articles.
- CyclicBarrier has an inner class, Generation, which can be used as an instance of Generation every time you use CyclicBarrier.
public CyclicBarrier(int parties) {
this(parties, null);
}
Copy the code
public CyclicBarrier(int parties, Runnable barrierAction) {
If the number of threads involved is less than or equal to zero, an exception is raised
if (parties <= 0) throw new IllegalArgumentException();
/ / set the parties
this.parties = parties;
/ / set the count
this.count = parties;
/ / set barrierCommand
this.barrierCommand = barrierAction;
}
Copy the code
CyclicBarrier has two constructors
The CyclicBarrier’s default constructor is CyclicBarrier(int parties), whose argument is the number of threads that the barrier blocks, and each thread calls the await method to tell the CyclicBarrier that the current thread has reached the barrier, and that the current thread is blocked.
CyclicBarrier’s other constructor, CyclicBarrier(int parties, Runnable barrierAction), is used to preferentially execute barrierAction when threads reach the barrier, making it easier to handle more complex business scenarios. The action performed by the last thread to enter the barrier.
2.3.2 await
The thread calling the await method tells the CyclicBarrier that it has reached the synchronization point, and the current thread blocks until the parties’ participating threads call the await method. CyclicBarrier also provides await with timeout and await methods without timeout.
Await the timeout after the time arrives. Will destroy the fence first, then wake up all threads.
public int await(a) throws InterruptedException, BrokenBarrierException {
try {
// Wait without timeout
return dowait(false.0L);
} catch (TimeoutException toe) {
throw new Error(toe); // cannot happen}}Copy the code
public int await(long timeout, TimeUnit unit)
throws InterruptedException,
BrokenBarrierException,
TimeoutException {
return dowait(true, unit.toNanos(timeout));
}
Copy the code
Both methods will eventually call the dowait(Boolean, Long) method, which is also the core method of CyclicBarrier
2.3.3 dowait
The main logical handling of the dowait(Boolean, long) method is simple. If the thread is not the last to call the await method, it will remain waiting unless:
- The last thread arrives, index == 0
- A participating thread has timed out
- A participating thread was interrupted
- The CyclicBarrier’s reset() method is called. This method resets the barrier to its initial state
private int dowait(boolean timed, long nanos)
throws InterruptedException, BrokenBarrierException,
TimeoutException {
// Get the exclusive lock
final ReentrantLock lock = this.lock;
// Current thread is locked
lock.lock();
try {
/ / the current generation
final Generation g = generation;
// If this generation is corrupted, throw an exception
if (g.broken)
throw new BrokenBarrierException();
// If the thread breaks, throw an exception
if (Thread.interrupted()) {
// Set the damage state to true
// And notify other threads blocking on the fence
breakBarrier();
throw new InterruptedException();
}
// Reduce the number of threads waiting to enter the fence
// The number of threads waiting to enter the fence
int index = --count;
// The number of threads waiting to enter the fence is 0, and all threads have entered
if (index == 0) { // tripped
// Run the action identifier
boolean ranAction = false;
try {
// Run action during initialization
final Runnable command = barrierCommand;
// Run action is not empty
if(command ! =null)
/ / run
command.run();
// Set ranAction status
ranAction = true;
// Into the next generation, the important principle of recycling
nextGeneration();
return 0;
} finally {
// There is an exception
if(! ranAction)// Damage the current barrierbreakBarrier(); }}// loop until tripped, broken, interrupted, or timed out
// Infinite loop
for (;;) {
try {
// Wait time is not set
if(! timed)// The current thread is waiting and joins the wait queue
trip.await();
// set the wait time
else if (nanos > 0L)
// The current thread is waiting and joins the wait queue. The current thread wakes up automatically after timeout
nanos = trip.awaitNanos(nanos);
} catch (InterruptedException ie) { // The thread is waiting to be interrupted in the await
// is equal to the current generation and the fence is not damaged
if (g == generation && ! g.broken) {
// Damage the current fence
breakBarrier();
// Throw an exception
throw ie;
} else {
// If the above condition is not met, this thread is not of this generation
// This will not affect the execution of the current generation of fences, so mark an interrupt
// Interrupt the current threadThread.currentThread().interrupt(); }}// When any thread breaks, the breakBarrier method is called
// Will wake up other threads, which will also throw an exception when they wake up
if (g.broken)
throw new BrokenBarrierException();
// does not equal the current generation
// g ! = generation Returns the subscript of the current thread's fence
// If g == generation, there is no generation yet.
// Since one thread can use more than one fence, when another fence wakes the thread, it will go here, so it needs to check if it is the current generation.
// It is for this reason that generation is needed to ensure correctness.
if(g ! = generation)// The number of threads waiting to enter the fence
return index;
// The wait time is set and the wait time is less than 0
// await(long timeout, TimeUnit unit)
// Fence broken, wake up all await threads
if (timed && nanos <= 0L) {
// Damage the fence
breakBarrier();
// Throw an exception
throw newTimeoutException(); }}}finally {
/ / releases the locklock.unlock(); }}Copy the code
We may need to pay attention to the Generation object. In the above code we always see a BrokenBarrierException thrown. When is the exception thrown? BrokenBarrierException is thrown if another thread calls reset() while a thread is waiting, or if the barrier is already broken. Meanwhile, if any thread is interrupted while waiting, all other threads throw a BrokenBarrierException and place the Barrier in a broken state.
At the same time, Generation describes the updating of CyclicBarrier. In a CyclicBarrier, the same set of threads belong to the same generation. When parties threads reach the barrier, the generation is updated. Broken indicates whether the current CyclicBarrier has been interrupted.
2.3.4 breakBarrier
When a barrier is broken or a thread breaks, all threads are terminated by breakBarrier().
In addition to setting broken to true in breakBarrier(), signalAll is called to wake up all threads waiting on CyclicBarrier.
private void breakBarrier(a) {
// Set the state
generation.broken = true;
// Restore the number of threads waiting to enter the fence
count = parties;
// Wake up all threads
trip.signalAll();
}
Copy the code
2.3.5 nextGeneration
When all threads have reached the barrier (index == 0), an update swap is performed via nextGeneration(), which does three things: wake up all threads, reset count, and generation
private void nextGeneration(a) {
// signal completion of last generation
// Wake up all threads
trip.signalAll();
// set up next generation
// Reset the number of threads waiting to enter the barrier
count = parties;
// The new generation
generation = new Generation();
}
Copy the code
In addition to the CyclicBarrier updates and damage states mentioned above, we should also pay attention to the following points when using cyclicbarriers:
- CyclicBarrier uses an exclusive lock to execute await methods and concurrency may not be very high
- If the thread is interrupted while waiting, an exception is thrown. But if the interrupted thread’s CyclicBarrier is not of this generation, for example, the “generation” object was updated after the last time the thread executed signalAll. In this interval, the thread is interrupted, and the JDK considers the task finished and doesn’t care about the interruption, just to mark it. This portion of the source code is annotated in the dowait(Boolean, Long) method.
- If the thread is woken up by another CyclicBarrier, then g must be equal to generation, and the event cannot return, but continues the loop blocking. Conversely, if the current CyclicBarrier is awakened, the subscript of the thread within the CyclicBarrier is returned. Completed a dash through the fence. This portion of the source code is annotated in the dowait(Boolean, Long) method.
3 Semaphore
3.1 Function Description
A semaphore can control the number of threads accessing at the same time, acquire a license through acquire, wait if there is no license, release a license through release. It’s kind of like limiting the flow. For example, there are only five parking Spaces in a shopping mall, and only one car can be parked in each parking space. If 10 cars arrive at this time, they have to wait for an empty parking space in front of them to enter.
3.2 Simple Cases
public class SemaphoreDemo {
private static Semaphore semaphore = new Semaphore(5);
public static void main(String[] args) throws InterruptedException {
for (int i = 0; i < 10; i++) {
new Thread(new ParkCar(), "Vehicle" + i).start();
TimeUnit.MILLISECONDS.sleep(100); }}static class ParkCar implements Runnable {
@Override
public void run(a) {
try {
// Applying for a permit is similar to checking for parking space
semaphore.acquire();
System.out.println(Thread.currentThread().getName() + "Stop the car.");
// Simulate the stop for 10s
TimeUnit.SECONDS.sleep(10);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(Thread.currentThread().getName() + "Stop the car");
// A release permit is similar to a car leaving a parking spacesemaphore.release(); }}}Copy the code
Output result: After listening to 5 cars at a time, when the car behind arrives, there is no empty seat in the parking lot. We need to wait for the car in front to leave the parking lot. When one car leaves, another car can join the car behind.
There are only 5 parking Spaces in total. Who should be given to in the second batch after the first batch of vehicles are used up and released?
All the waiting vehicles want to get permission first, first pass, what to do. And that’s where the lock comes in. Everybody grabs it. Whoever gets it first, stops first.
vehicle0Start stop vehicle1Start stop vehicle2Start stop vehicle3Start stop vehicle4Start stop vehicle0Stop vehicle5Start stop vehicle1Stop vehicle6Start stop vehicle2Stop vehicle7Start stop vehicle3Stop vehicle8Start stop vehicle4Stop vehicle9Start stop vehicle5Stop vehicle6Stop vehicle7Stop vehicle8Stop vehicle9The end of the parkingCopy the code
3.3 Source Code Analysis
3.3.1 UML class diagram
Its underlying implementation is an AQs-based shared lock.
3.3.2 rainfall distribution on 10-12 initialization
public Semaphore(int permits) {
sync = new NonfairSync(permits);
}
Copy the code
public Semaphore(int permits, boolean fair) {
sync = fair ? new FairSync(permits) : new NonfairSync(permits);
}
Copy the code
Semaphore implements unfair locks by default, but we can also specify the type of lock and whether it is a fair lock.
static final class NonfairSync extends Sync {
private static final long serialVersionUID = -2694183684443567898L;
NonfairSync(int permits) {
super(permits);
}
protected int tryAcquireShared(int acquires) {
return nonfairTryAcquireShared(acquires);
}
}
Sync(int permits) {
setState(permits);
}
protected final void setState(int newState) {
state = newState;
}
Copy the code
We can see that the NonfairSync class inherits Sync, which in turn inherits AQS. Semaphore is implemented based on AQS. AQS synchronization status value state Indicates the number of storage licenses
The following code uses an unfair lock as an example
3.3.3 acquire
public void acquire(a) throws InterruptedException {
sync.acquireSharedInterruptibly(1);
}
Copy the code
public final void acquireSharedInterruptibly(int arg)
throws InterruptedException {
// Throw an exception if the thread breaks
if (Thread.interrupted())
throw new InterruptedException();
// Apply for a shared lock
if (tryAcquireShared(arg) < 0)
// Failed to apply for the shared lock
doAcquireSharedInterruptibly(arg);
}
Copy the code
3.3.4 tryAcquireShared
protected int tryAcquireShared(int arg) {
throw new UnsupportedOperationException();
}
Copy the code
protected int tryAcquireShared(int acquires) {
return nonfairTryAcquireShared(acquires);
}
Copy the code
final int nonfairTryAcquireShared(int acquires) {
/ / spin
for (;;) {
// Get the license number
int available = getState();
// Number of remaining licenses
int remaining = available - acquires;
If the number of remaining permissions is greater than 0, the current thread can execute. The CAS modified the number of permissions
// If the number of remaining permissions is less than 0, join the synchronization queue and wait.
if (remaining < 0 ||
compareAndSetState(available, remaining))
returnremaining; }}Copy the code
3.3.5 doAcquireSharedInterruptibly
When the number of permissions is used up, join the synchronization queue. Waiting for clearance to be released.
private void doAcquireSharedInterruptibly(int arg)
throws InterruptedException {
final Node node = addWaiter(Node.SHARED);
boolean failed = true;
try {
for (;;) {
final Node p = node.predecessor();
if (p == head) {
int r = tryAcquireShared(arg);
if (r >= 0) {
setHeadAndPropagate(node, r);
p.next = null; // help GC
failed = false;
return; }}if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
throw newInterruptedException(); }}finally {
if(failed) cancelAcquire(node); }}Copy the code
Previous analysis ReentrantReadWriteLock ReadLock analyzed in detail.
3.3.6 release
public void release(a) {
sync.releaseShared(1);
}
Copy the code
public final boolean releaseShared(int arg) {
if (tryReleaseShared(arg)) {
doReleaseShared();
return true;
}
return false;
}
Copy the code
Here sync calls the AQS method releaseShared, in which the doReleaseShared method is called if the release is successful. This method was explained in the PREVIOUS AQS Shared mode article and is not covered in detail here. Its main function is to release the nodes in the queue.
protected final boolean tryReleaseShared(int releases) {
for (;;) {
// Get the current license number
int current = getState();
// The number of permissions after release
int next = current + releases;
if (next < current) // overflow
throw new Error("Maximum permit count exceeded");
// Update the new license number
if (compareAndSetState(current, next))
return true; }}Copy the code
private void doReleaseShared(a) {
for (;;) {
Node h = head;
if(h ! =null&& h ! = tail) {int ws = h.waitStatus;
if (ws == Node.SIGNAL) {
if(! compareAndSetWaitStatus(h, Node.SIGNAL,0))
continue; // loop to recheck cases
unparkSuccessor(h);
}
else if (ws == 0 &&
!compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
continue; // loop on failed CAS
}
if (h == head) // loop if head changed
break; }}Copy the code