Questions
(1) What is a Phaser?
(2) What features does Phaser have?
(3) What advantages does Phaser have over CyclicBarrier and CountDownLatch?
Introduction
Phaser, as its name suggests, deals with phases. It suits a scenario where a large task is completed in multiple phases and the tasks within each phase can be executed concurrently by multiple threads, but all tasks of one phase must complete before the next phase can begin.
This scenario can also be implemented with CyclicBarrier or CountDownLatch, but doing so is considerably more complex: first, the number of phases may change, and second, the number of tasks per phase may also change. Phaser is more flexible and convenient than CyclicBarrier and CountDownLatch for this.
Usage
Let’s look at the simplest use case:
import java.util.concurrent.Phaser;

public class PhaserTest {
    public static final int PARTIES = 3;
    public static final int PHASES = 4;

    public static void main(String[] args) {
        Phaser phaser = new Phaser(PARTIES) {
            // Called by the last party to arrive, before the phase advances
            @Override
            protected boolean onAdvance(int phase, int registeredParties) {
                System.out.println("=======phase: " + phase + " finished=============");
                return super.onAdvance(phase, registeredParties);
            }
        };
        for (int i = 0; i < PARTIES; i++) {
            new Thread(() -> {
                for (int j = 0; j < PHASES; j++) {
                    System.out.println(String.format("%s: phase: %d", Thread.currentThread().getName(), j));
                    phaser.arriveAndAwaitAdvance();
                }
            }, "Thread " + i).start();
        }
    }
}
Here we define a large task that is completed in four phases, each of which consists of three small tasks. We start three threads to execute these small tasks, and the output looks as follows (the order of the threads within a phase may vary between runs):
Thread 0: phase: 0
Thread 2: phase: 0
Thread 1: phase: 0
=======phase: 0 finished=============
Thread 2: phase: 1
Thread 0: phase: 1
Thread 1: phase: 1
=======phase: 1 finished=============
Thread 1: phase: 2
Thread 0: phase: 2
Thread 2: phase: 2
=======phase: 2 finished=============
Thread 0: phase: 3
Thread 2: phase: 3
Thread 1: phase: 3
=======phase: 3 finished=============
As you can see, each phase is completed by all three threads before the next phase begins. How is this implemented? Let's find out.
Guessing the principle
Based on the principles of AQS that we learned earlier, let's make a rough guess at how Phaser is implemented.
First, we need to store the current phase (phase), the number of participants, i.e. tasks, in the current phase (parties), and the number of participants that have not yet finished (unarrived). All three values can be packed into a single variable, state.
Second, we need a queue to hold the participants that finish first, so that they can be woken up when the last participant finishes.
Well, that's about it.
Applied to the example above:
At the beginning, the current phase is 0, the number of participants is 3, and the number of unfinished participants is 3.
The first thread reaches phaser.arriveAndAwaitAdvance() and enters the queue;
The second thread reaches phaser.arriveAndAwaitAdvance() and enters the queue;
The third thread reaches phaser.arriveAndAwaitAdvance(), runs the phase summary onAdvance(), and wakes up the first two threads so that they continue with the next phase.
Well, this all seems to make sense. As for whether it really works this way, let's look at the source code.
Source code analysis
Main inner class
static final class QNode implements ForkJoinPool.ManagedBlocker {
    final Phaser phaser;
    final int phase;
    final boolean interruptible;
    final boolean timed;
    boolean wasInterrupted;
    long nanos;
    final long deadline;
    volatile Thread thread; // nulled to cancel wait
    QNode next;

    QNode(Phaser phaser, int phase, boolean interruptible,
          boolean timed, long nanos) {
        this.phaser = phaser;
        this.phase = phase;
        this.interruptible = interruptible;
        this.nanos = nanos;
        this.timed = timed;
        this.deadline = timed ? System.nanoTime() + nanos : 0L;
        thread = Thread.currentThread();
    }
    // isReleasable() and block() are omitted here
}
Participants that arrive first are wrapped in nodes and placed in a queue. We only need to focus on the thread and next fields: this is clearly a singly linked list that stores the queued threads.
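To make the linked-list idea concrete, here is a minimal sketch of a lock-free list whose head is held in an AtomicReference, the same kind of field Phaser uses for evenQ and oddQ. The class WaiterStackSketch, its Node type and the drain() method are hypothetical names made up for this illustration; the real QNode also carries the phase, the timeout and the interrupt flags.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicReference;

final class WaiterStackSketch {
    // Hypothetical stand-in for QNode: only a label and the next pointer
    static final class Node {
        final String name;
        Node next;
        Node(String name) { this.name = name; }
    }

    private final AtomicReference<Node> head = new AtomicReference<>();

    // Push onto the head of the list, retrying if another thread won the CAS
    void push(Node node) {
        Node q;
        do {
            q = head.get();
            node.next = q;
        } while (!head.compareAndSet(q, node));
    }

    // Pop every node, newest first, and return the labels in pop order
    List<String> drain() {
        List<String> names = new ArrayList<>();
        Node q;
        while ((q = head.get()) != null) {
            if (head.compareAndSet(q, q.next))
                names.add(q.name);
        }
        return names;
    }

    public static void main(String[] args) {
        WaiterStackSketch stack = new WaiterStackSketch();
        stack.push(new Node("Thread 0"));
        stack.push(new Node("Thread 1"));
        System.out.println(stack.drain()); // [Thread 1, Thread 0]
    }
}
```

Because new nodes are always linked in at the head, the most recent waiter is released first; for a barrier this order does not matter, since every waiter of the phase is released anyway.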
The main properties
// State variable: stores the current phase, the number of participants (parties) and the number of participants that have not arrived (unarrived)
private volatile long state;
// The maximum number of participants, i.e. the maximum number of tasks per phase
private static final int MAX_PARTIES = 0xffff;
// The maximum phase number
private static final int MAX_PHASE = Integer.MAX_VALUE;
// Bit offset of the number of participants
private static final int PARTIES_SHIFT = 16;
// Bit offset of the current phase
private static final int PHASE_SHIFT = 32;
// Mask for the number of unarrived participants, low 16 bits
private static final int UNARRIVED_MASK = 0xffff; // to mask ints
// Mask for the number of participants, middle 16 bits
private static final long PARTIES_MASK = 0xffff0000L; // to mask longs
// Mask for counts, where counts is the parties field and the unarrived field combined with '|' (low 32 bits)
private static final long COUNTS_MASK = 0xffffffffL;
// Termination flag, the highest bit of state
private static final long TERMINATION_BIT = 1L << 63;
// One arrival: subtracted from unarrived
private static final int ONE_ARRIVAL = 1;
// One participant in the parties field, used when adding or removing participants
private static final int ONE_PARTY = 1 << PARTIES_SHIFT;
// Deregistering one participant: decrements both parties and unarrived
private static final int ONE_DEREGISTER = ONE_ARRIVAL|ONE_PARTY;
// Value of the low 32 bits when there are no participants
private static final int EMPTY = 1;

// The number of participants that have not arrived (low 16 bits)
private static int unarrivedOf(long s) {
    int counts = (int)s;
    return (counts == EMPTY) ? 0 : (counts & UNARRIVED_MASK);
}
// The number of participants (middle 16 bits); note that the cast to int happens before the shift
private static int partiesOf(long s) {
    return (int)s >>> PARTIES_SHIFT;
}
// The current phase (high 32 bits)
private static int phaseOf(long s) {
    return (int)(s >>> PHASE_SHIFT);
}
// The number of participants that have arrived
private static int arrivedOf(long s) {
    int counts = (int)s; // low 32 bits
    return (counts == EMPTY) ? 0 :
        (counts >>> PARTIES_SHIFT) - (counts & UNARRIVED_MASK);
}

// Queues holding the threads of participants that have arrived; which one is used depends on the parity of the current phase
private final AtomicReference<QNode> evenQ;
private final AtomicReference<QNode> oddQ;
The main properties are state, evenQ and oddQ:
(1) state, the state variable: the high 32 bits store the current phase, the middle 16 bits store the number of participants, and the low 16 bits store the number of participants that have not yet arrived;
(2) evenQ and oddQ, the queues that hold the participants that have already arrived. When the last participant arrives, the participants in the queue are woken up to continue with the next phase or to finish.
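The bit layout of state can be tried out in isolation. The following is a minimal sketch, not the JDK code: the class PhaserStateLayout and its pack() helper are made up for this illustration, the shift and mask values are copied from the listing above, and the EMPTY special case is deliberately left out.

```java
final class PhaserStateLayout {
    static final int PARTIES_SHIFT = 16;
    static final int PHASE_SHIFT = 32;
    static final int UNARRIVED_MASK = 0xffff;

    // Hypothetical helper: packs the three fields into one long
    static long pack(int phase, int parties, int unarrived) {
        return ((long) phase << PHASE_SHIFT)
             | ((long) parties << PARTIES_SHIFT)
             | (long) unarrived;
    }

    // High 32 bits
    static int phaseOf(long s) { return (int) (s >>> PHASE_SHIFT); }

    // Middle 16 bits: cast to int first, so the phase bits are dropped
    static int partiesOf(long s) { return (int) s >>> PARTIES_SHIFT; }

    // Low 16 bits
    static int unarrivedOf(long s) { return (int) s & UNARRIVED_MASK; }

    public static void main(String[] args) {
        long s = pack(2, 3, 1);
        System.out.println(Long.toHexString(s)); // 200030001
        System.out.println(phaseOf(s));          // 2
        System.out.println(partiesOf(s));        // 3
        System.out.println(unarrivedOf(s));      // 1
    }
}
```

Keeping all three fields in one long is what allows a single CAS to update the phase and both counters together.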
Constructors
public Phaser() {
    this(null, 0);
}
public Phaser(int parties) {
    this(null, parties);
}
public Phaser(Phaser parent) {
    this(parent, 0);
}
public Phaser(Phaser parent, int parties) {
    if (parties >>> PARTIES_SHIFT != 0)
        throw new IllegalArgumentException("Illegal number of parties");
    int phase = 0;
    this.parent = parent;
    if (parent != null) {
        final Phaser root = parent.root;
        this.root = root;
        this.evenQ = root.evenQ;
        this.oddQ = root.oddQ;
        if (parties != 0)
            phase = parent.doRegister(1);
    }
    else {
        this.root = this;
        this.evenQ = new AtomicReference<QNode>();
        this.oddQ = new AtomicReference<QNode>();
    }
    // The state variable is stored in three segments
    this.state = (parties == 0) ? (long)EMPTY :
        ((long)phase << PHASE_SHIFT) |
        ((long)parties << PARTIES_SHIFT) |
        ((long)parties);
}
The constructors also involve parent and root, which are used to build tiered (multi-level) Phasers and are beyond the scope of this article.
The focus is on how state is assigned: the high 32 bits store the current phase, the middle 16 bits store the number of participants, and the low 16 bits store the number of participants that have not yet arrived.
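The result of this assignment can be observed through Phaser's public accessors. The sketch below is only an illustration: the class PhaserInitialState and its snapshot() helper are made-up names, while getPhase(), getRegisteredParties() and getUnarrivedParties() are the standard java.util.concurrent.Phaser methods.

```java
import java.util.Arrays;
import java.util.concurrent.Phaser;

final class PhaserInitialState {
    // Hypothetical helper: reads phase, parties and unarrived via the public API
    static int[] snapshot(Phaser p) {
        return new int[] { p.getPhase(), p.getRegisteredParties(), p.getUnarrivedParties() };
    }

    public static void main(String[] args) {
        // parties = 3: phase 0, parties 3, unarrived 3
        System.out.println(Arrays.toString(snapshot(new Phaser(3)))); // [0, 3, 3]
        // parties = 0: state is EMPTY, which the accessors report as zero parties
        System.out.println(Arrays.toString(snapshot(new Phaser())));  // [0, 0, 0]
    }
}
```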
Let’s take a look at the source of several main methods:
The register() method
Registers a participant. If register() is called while onAdvance() is running, it waits for onAdvance() to finish.
public int register() {
    return doRegister(1);
}
private int doRegister(int registrations) {
    // The value to add to state: both parties and unarrived are increased
    long adjust = ((long)registrations << PARTIES_SHIFT) | registrations;
    final Phaser parent = this.parent;
    int phase;
    for (;;) {
        // The value of state
        long s = (parent == null) ? state : reconcileState();
        // The low 32 bits of state, i.e. the values of parties and unarrived
        int counts = (int)s;
        // The value of parties
        int parties = counts >>> PARTIES_SHIFT;
        // The value of unarrived
        int unarrived = counts & UNARRIVED_MASK;
        // Check for overflow
        if (registrations > MAX_PARTIES - parties)
            throw new IllegalStateException(badRegister(s));
        // Current phase
        phase = (int)(s >>> PHASE_SHIFT);
        if (phase < 0)
            break;
        // Not the first participant
        if (counts != EMPTY) { // not 1st registration
            if (parent == null || reconcileState() == s) {
                // unarrived == 0 means onAdvance() is running for the current phase, so wait for it to finish
                if (unarrived == 0) // wait out advance
                    root.internalAwaitAdvance(phase, null);
                // Otherwise add adjust to state and break out of the loop on success
                else if (UNSAFE.compareAndSwapLong(this, stateOffset,
                                                   s, s + adjust))
                    break;
            }
        }
        // The first participant
        else if (parent == null) { // 1st root registration
            // Calculate the value of state
            long next = ((long)phase << PHASE_SHIFT) | adjust;
            // Change the value of state and break out of the loop on success
            if (UNSAFE.compareAndSwapLong(this, stateOffset, s, next))
                break;
        }
        else {
            // Tiered Phaser handling
            synchronized (this) { // 1st sub registration
                if (state == s) { // recheck under lock
                    phase = parent.doRegister(1);
                    if (phase < 0)
                        break;
                    // finish registration whenever parent registration
                    // succeeded, even when racing with termination,
                    // since these are part of the same "transaction".
                    while (!UNSAFE.compareAndSwapLong(this, stateOffset, s,
                               ((long)phase << PHASE_SHIFT) | adjust)) {
                        s = state;
                        phase = (int)(root.state >>> PHASE_SHIFT);
                        // assert (int)s == EMPTY;
                    }
                    break;
                }
            }
        }
    }
    return phase;
}
// Wait for the onAdvance() method to complete.
// The idea is to spin a limited number of times; if the phase advances in the meantime, this method returns directly.
// If the phase has still not advanced after spinning, the current thread is queued and woken up once onAdvance() completes.
private int internalAwaitAdvance(int phase, QNode node) {
    // Release any waiters left in the queue of the previous phase
    releaseWaiters(phase-1); // ensure old queue clean
    boolean queued = false; // true when node is enqueued
    int lastUnarrived = 0; // to increase spins upon change
    // Number of spins
    int spins = SPINS_PER_ARRIVAL;
    long s;
    int p;
    // Check whether the current phase has changed; if it has, we are in the next phase and there is no need to spin
    while ((p = (int)((s = state) >>> PHASE_SHIFT)) == phase) {
        // node is null here, as it is when called from registration
        if (node == null) { // spinning in noninterruptible mode
            // Number of participants that have not arrived
            int unarrived = (int)s & UNARRIVED_MASK;
            // unarrived changed, so increase the number of spins
            if (unarrived != lastUnarrived &&
                (lastUnarrived = unarrived) < NCPU)
                spins += SPINS_PER_ARRIVAL;
            boolean interrupted = Thread.interrupted();
            // Out of spins or interrupted: create a new node
            if (interrupted || --spins < 0) { // need node to record intr
                node = new QNode(this, phase, false, false, 0L);
                node.wasInterrupted = interrupted;
            }
        }
        else if (node.isReleasable()) // done or aborted
            break;
        else if (!queued) { // push onto queue
            // Enqueue the node, choosing the queue by the parity of the phase
            AtomicReference<QNode> head = (phase & 1) == 0 ? evenQ : oddQ;
            QNode q = node.next = head.get();
            if ((q == null || q.phase == phase) &&
                (int)(state >>> PHASE_SHIFT) == phase) // avoid stale enq
                queued = head.compareAndSet(q, node);
        }
        else {
            try {
                // Block the current thread until it is woken up, similar to calling LockSupport.park()
                ForkJoinPool.managedBlock(node);
            } catch (InterruptedException ie) {
                node.wasInterrupted = true;
            }
        }
    }
    // The loop has exited: the phase advanced or the wait was aborted
    if (node != null) {
        // Clear the thread in the node
        if (node.thread != null)
            node.thread = null; // avoid need for unpark()
        if (node.wasInterrupted && !node.interruptible)
            Thread.currentThread().interrupt();
        if (p == phase && (p = (int)(state >>> PHASE_SHIFT)) == phase)
            return abortWait(phase); // possibly clean up on abort
    }
    // Wake up the threads still waiting on this phase
    releaseWaiters(phase);
    return p;
}
The overall logic of adding a participant is:
(1) Adding a participant increments both parties and unarrived, i.e. the middle 16 bits and the low 16 bits of state;
(2) If it is the first participant, try to update state atomically and exit on success;
(3) If it is not the first participant, check whether onAdvance() is running. If it is, wait for onAdvance() to finish; if not, retry the atomic update of state until it succeeds, then exit;
(4) Waiting for onAdvance() to finish means spinning first and only then entering the queue, which reduces thread context switches.
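Step (1) can be checked from the outside with the public API. This is a small single-threaded sketch, assuming a root Phaser with no parent; the class PhaserRegisterDemo and its snapshot() helper are made-up names, while register(), bulkRegister() and arrive() are standard Phaser methods.

```java
import java.util.Arrays;
import java.util.concurrent.Phaser;

final class PhaserRegisterDemo {
    // Hypothetical helper: reads phase, parties and unarrived via the public API
    static int[] snapshot(Phaser p) {
        return new int[] { p.getPhase(), p.getRegisteredParties(), p.getUnarrivedParties() };
    }

    public static void main(String[] args) {
        Phaser phaser = new Phaser();                     // no participants yet
        phaser.register();                                // first registration
        System.out.println(Arrays.toString(snapshot(phaser))); // [0, 1, 1]
        phaser.bulkRegister(2);                           // two more in one call
        System.out.println(Arrays.toString(snapshot(phaser))); // [0, 3, 3]
        phaser.arrive();                                  // one participant arrives
        System.out.println(Arrays.toString(snapshot(phaser))); // [0, 3, 2]
        phaser.register();                                // raises parties AND unarrived
        System.out.println(Arrays.toString(snapshot(phaser))); // [0, 4, 3]
    }
}
```

Registering while the phase is in progress raises the number of arrivals still required for the current phase, which is why both fields move together.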
The arriveAndAwaitAdvance() method
The current thread completes the current phase and waits for other threads to complete the current phase.
If the current thread is the last to arrive in the phase, the current thread executes the onAdvance() method and wakes the other threads to the next phase.
public int arriveAndAwaitAdvance() {
    // Specialization of doArrive+awaitAdvance eliminating some reads/paths
    final Phaser root = this.root;
    for (;;) {
        // The value of state
        long s = (root == this) ? state : reconcileState();
        // Current phase
        int phase = (int)(s >>> PHASE_SHIFT);
        if (phase < 0)
            return phase;
        // The values of parties and unarrived
        int counts = (int)s;
        // The value of unarrived (low 16 bits of state)
        int unarrived = (counts == EMPTY) ? 0 : (counts & UNARRIVED_MASK);
        if (unarrived <= 0)
            throw new IllegalStateException(badArrive(s));
        // Change the value of state: unarrived is decreased by 1
        if (UNSAFE.compareAndSwapLong(this, stateOffset, s,
                                      s -= ONE_ARRIVAL)) {
            // Not the last to arrive: call internalAwaitAdvance() to spin or queue
            // (internalAwaitAdvance() was analyzed together with register())
            if (unarrived > 1)
                return root.internalAwaitAdvance(phase, null);
            // From here on, this is the last participant to arrive
            if (root != this)
                return parent.arriveAndAwaitAdvance();
            // n only preserves the parties part of state, i.e. the middle 16 bits
            long n = s & PARTIES_MASK; // base of next state
            // The value of parties, i.e. the number of participants that must arrive in the next phase
            int nextUnarrived = (int)n >>> PARTIES_SHIFT;
            // Run onAdvance(); a return value of true means the Phaser should terminate
            // (the default implementation returns true when no participants remain)
            if (onAdvance(phase, nextUnarrived))
                n |= TERMINATION_BIT;
            else if (nextUnarrived == 0)
                n |= EMPTY;
            else
                // Set the unarrived part of n
                n |= nextUnarrived;
            // The next phase is the current phase plus 1
            int nextPhase = (phase + 1) & MAX_PHASE;
            // Set the phase part of n
            n |= (long)nextPhase << PHASE_SHIFT;
            // Change the value of state to n
            if (!UNSAFE.compareAndSwapLong(this, stateOffset, s, n))
                return (int)(state >>> PHASE_SHIFT); // terminated
            // Wake up the other participants so they proceed to the next phase
            releaseWaiters(phase);
            // Return the number of the next phase
            return nextPhase;
        }
    }
}
The rough logic of arriveAndAwaitAdvance() is:
(1) Decrease the unarrived part of state by 1;
(2) If this is not the last participant to arrive, call internalAwaitAdvance() to spin or queue;
(3) If this is the last participant to arrive, call onAdvance(), then change state to the value for the next phase and wake up the other waiting threads;
(4) Return the number of the next phase.
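The last-arriver path can be exercised with a single participant, since that participant is always the last to arrive and therefore never blocks. The sketch below is an illustration under that assumption; the class PhaserAdvanceDemo and its newTwoPhasePhaser() factory are made-up names, and the onAdvance() override that terminates once phase 1 completes is chosen only for the demo.

```java
import java.util.concurrent.Phaser;

final class PhaserAdvanceDemo {
    // Hypothetical factory: one participant, terminate once phase 1 has completed
    static Phaser newTwoPhasePhaser() {
        return new Phaser(1) {
            @Override
            protected boolean onAdvance(int phase, int registeredParties) {
                return phase >= 1; // true asks the Phaser to terminate
            }
        };
    }

    public static void main(String[] args) {
        Phaser phaser = newTwoPhasePhaser();
        // Phase 0 completes; the number of the next phase is returned
        System.out.println(phaser.arriveAndAwaitAdvance()); // 1
        System.out.println(phaser.isTerminated());          // false
        // Phase 1 completes; onAdvance() returns true, so the termination bit is set
        phaser.arriveAndAwaitAdvance();
        System.out.println(phaser.isTerminated());          // true
        // With the highest bit of state set, the phase reads as a negative int
        System.out.println(phaser.getPhase() < 0);          // true
        // Later arrivals return immediately with that negative phase
        System.out.println(phaser.arriveAndAwaitAdvance() < 0); // true
    }
}
```

This matches the `if (phase < 0) return phase;` check at the top of the loop: once the Phaser has terminated, arrivals no longer touch state.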
Summary
(1) Phaser is suitable for multi-phase, multi-task scenarios in which the tasks of each phase can be controlled at a fine-grained level;
(2) Internally, Phaser implements its entire logic with a state variable and queues;
(3) The high 32 bits of state store the current phase (phase), the middle 16 bits store the number of participants, i.e. tasks, in the current phase (parties), and the low 16 bits store the number of participants that have not yet arrived (unarrived);
(4) Different queues are selected according to the parity of the current phase;
(5) A participant that is not the last to arrive spins or enters the queue to wait until all participants have arrived;
(6) When the last participant arrives, it wakes up the threads in the queue and the Phaser enters the next phase.
Bonus
What advantages does Phaser have over CyclicBarrier and CountDownLatch?
A: There are two main advantages:
(1) Phaser can coordinate many phases and tracks the phase number itself, whereas CountDownLatch is one-shot and CyclicBarrier, although reusable, exposes no phase number;
(2) The number of tasks per phase of a Phaser can be changed while it is running, whereas the count of a CyclicBarrier or CountDownLatch cannot be modified once it has been set.
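Point (2) can be illustrated with a small single-threaded sketch in which the number of participants differs from phase to phase. The class PhaserDynamicParties and its snapshot() helper are made-up names; arriveAndDeregister(), arrive() and bulkRegister() are standard Phaser methods. A real program would call these from the worker threads themselves.

```java
import java.util.Arrays;
import java.util.concurrent.Phaser;

final class PhaserDynamicParties {
    // Hypothetical helper: reads phase, parties and unarrived via the public API
    static int[] snapshot(Phaser p) {
        return new int[] { p.getPhase(), p.getRegisteredParties(), p.getUnarrivedParties() };
    }

    public static void main(String[] args) {
        Phaser phaser = new Phaser(2);                    // phase 0 starts with 2 participants
        phaser.arriveAndDeregister();                     // one participant leaves for good
        System.out.println(Arrays.toString(snapshot(phaser))); // [0, 1, 1]
        phaser.arrive();                                  // last arrival, phase advances
        System.out.println(Arrays.toString(snapshot(phaser))); // [1, 1, 1]
        phaser.bulkRegister(2);                           // phase 1 now needs 3 arrivals
        System.out.println(Arrays.toString(snapshot(phaser))); // [1, 3, 3]
    }
}
```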