Questions

(1) What is a Phaser?

(2) What are the features of Phaser?

(3) What advantages does Phaser have over CyclicBarrier and CountDownLatch?

Introduction

Phaser, as the name suggests, works in phases. It suits scenarios where a large task is completed in multiple phases: the tasks within each phase can run concurrently on multiple threads, but all tasks of one phase must finish before the next phase can start.

This scenario can be handled with CyclicBarrier or CountDownLatch, but the result is much more complex: first, the number of phases may change, and second, the number of tasks per phase may also change. Phaser is more flexible and convenient than CyclicBarrier and CountDownLatch.
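For comparison, here is a minimal sketch of a fixed multi-phase task built on CyclicBarrier. The class name BarrierPhasesDemo and the method run are made up for illustration. The barrier action stands in for a "phase finished" hook and simply counts completed phases.

```java
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical demo class: a fixed number of threads and phases on a CyclicBarrier.
class BarrierPhasesDemo {
    /** Runs `parties` threads through `phases` barrier trips and returns the trip count. */
    static int run(int parties, int phases) {
        AtomicInteger completedPhases = new AtomicInteger();
        // The barrier action runs once per trip, in the last thread to arrive.
        CyclicBarrier barrier = new CyclicBarrier(parties, completedPhases::incrementAndGet);
        Thread[] workers = new Thread[parties];
        for (int i = 0; i < parties; i++) {
            workers[i] = new Thread(() -> {
                try {
                    for (int j = 0; j < phases; j++) {
                        barrier.await(); // the party count is fixed at construction
                    }
                } catch (InterruptedException | BrokenBarrierException e) {
                    Thread.currentThread().interrupt();
                }
            });
            workers[i].start();
        }
        try {
            for (Thread t : workers) {
                t.join();
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return completedPhases.get();
    }

    public static void main(String[] args) {
        System.out.println(run(3, 4)); // prints 4
    }
}
```

This works as long as the number of threads never changes. The barrier offers no way to add or remove a party between phases, and the phase number has to be tracked by hand.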

Usage

Let’s look at the simplest use case:

import java.util.concurrent.Phaser;

public class PhaserTest {

    public static final int PARTIES = 3;
    public static final int PHASES = 4;

    public static void main(String[] args) {
        // onAdvance() runs once per phase, in the last thread to arrive
        Phaser phaser = new Phaser(PARTIES) {
            @Override
            protected boolean onAdvance(int phase, int registeredParties) {
                System.out.println("=======phase: " + phase + " finished=============");
                return super.onAdvance(phase, registeredParties);
            }
        };

        for (int i = 0; i < PARTIES; i++) {
            new Thread(() -> {
                for (int j = 0; j < PHASES; j++) {
                    System.out.println(String.format("%s: phase: %d", Thread.currentThread().getName(), j));
                    // Wait until all PARTIES threads have finished this phase
                    phaser.arriveAndAwaitAdvance();
                }
            }, "Thread " + i).start();
        }
    }
}

Here a large task is completed in four phases, and each phase consists of three small tasks. We start three threads to run these small tasks. The output looks like this (the order of the lines within a phase may differ between runs):

Thread 0: phase: 0
Thread 2: phase: 0
Thread 1: phase: 0
=======phase: 0 finished=============
Thread 2: phase: 1
Thread 0: phase: 1
Thread 1: phase: 1
=======phase: 1 finished=============
Thread 1: phase: 2
Thread 0: phase: 2
Thread 2: phase: 2
=======phase: 2 finished=============
Thread 0: phase: 3
Thread 2: phase: 3
Thread 1: phase: 3
=======phase: 3 finished=============

As you can see, each phase is completed by three threads before moving on to the next phase. How does this work? Let’s learn.

Guessing at the principle

Based on the principles of AQS we learned earlier, let's take a rough guess at how Phaser is implemented.

First, we need to store the current phase (phase), the number of tasks or participants in the current phase (parties), and the number of participants that have not finished yet (unarrived). All three can be packed into a single variable, state.

Second, we need a queue to hold the participants that finish first, so that they can be woken up when the last participant finishes.

Well, that’s about it.

Applying this to the example above:

At the start, the current phase is 0, the number of participants is 3, and the number of participants that have not finished is 3.

The first thread reaches phaser.arriveAndAwaitAdvance() and enters the queue.

The second thread reaches phaser.arriveAndAwaitAdvance() and enters the queue.

The third thread reaches phaser.arriveAndAwaitAdvance(), runs this phase's wrap-up method onAdvance(), and wakes up the first two threads so they can continue with the next phase.
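The guess can be written down as a small toy class. This is only a sketch of the idea: ToyPhaser is a made-up name, it supports a fixed number of participants, and it uses a monitor (synchronized, wait, notifyAll) in place of a packed state variable and an explicit queue.

```java
// Hypothetical toy class: the last arrival advances the phase and wakes the earlier ones.
class ToyPhaser {
    private final int parties;
    private int phase;
    private int unarrived;

    ToyPhaser(int parties) {
        if (parties <= 0) {
            throw new IllegalArgumentException("parties must be positive");
        }
        this.parties = parties;
        this.unarrived = parties;
    }

    /** Arrives at the current phase and blocks until every participant has arrived. */
    synchronized int arriveAndAwaitAdvance() throws InterruptedException {
        int arrivalPhase = phase;
        if (--unarrived == 0) {
            unarrived = parties; // reset the count for the next phase
            phase++;
            notifyAll();         // wake the participants that arrived earlier
        } else {
            while (phase == arrivalPhase) {
                wait();          // the monitor's wait set plays the role of the queue
            }
        }
        return arrivalPhase + 1;
    }

    synchronized int getPhase() {
        return phase;
    }

    /** Runs `threads` workers through `phases` phases and returns the final phase number. */
    static int runDemo(int threads, int phases) {
        ToyPhaser phaser = new ToyPhaser(threads);
        Thread[] workers = new Thread[threads];
        for (int i = 0; i < threads; i++) {
            workers[i] = new Thread(() -> {
                try {
                    for (int j = 0; j < phases; j++) {
                        phaser.arriveAndAwaitAdvance();
                    }
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });
            workers[i].start();
        }
        try {
            for (Thread t : workers) {
                t.join();
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return phaser.getPhase();
    }

    public static void main(String[] args) {
        System.out.println(runDemo(3, 4)); // prints 4
    }
}
```

The toy leaves out registration, deregistration, termination and onAdvance(), and it does not try to avoid blocking the way a lock-free implementation would.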

That all hangs together. To find out whether it is what really happens, let's look at the source code.

Source code analysis

Main inner class

static final class QNode implements ForkJoinPool.ManagedBlocker {
    final Phaser phaser;
    final int phase;
    final boolean interruptible;
    final boolean timed;
    boolean wasInterrupted;
    long nanos;
    final long deadline;
    volatile Thread thread; // nulled to cancel wait
    QNode next;

    QNode(Phaser phaser, int phase, boolean interruptible,
          boolean timed, long nanos) {
        this.phaser = phaser;
        this.phase = phase;
        this.interruptible = interruptible;
        this.nanos = nanos;
        this.timed = timed;
        this.deadline = timed ? System.nanoTime() + nanos : 0L;
        thread = Thread.currentThread();
    }
}

Participants that arrive first are placed in the queue as nodes. We only need to focus on the thread and next fields: this is clearly a singly linked list that stores the queued threads.
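To make the linked-list idea concrete, here is a minimal sketch of pushing nodes onto the head of a singly linked list through an AtomicReference. WaiterStack and its Node are made-up names, and a String stands in for the waiting thread. Phaser itself makes one CAS attempt per pass of its wait loop, whereas this sketch retries inside push.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical demo class: a lock-free push onto the head of a singly linked list.
class WaiterStack {
    static final class Node {
        final String name; // stands in for the waiting thread
        Node next;

        Node(String name) {
            this.name = name;
        }
    }

    private final AtomicReference<Node> head = new AtomicReference<>();

    /** Links the node in front of the current head, retrying if another push wins the race. */
    void push(Node node) {
        Node h;
        do {
            h = head.get();
            node.next = h;
        } while (!head.compareAndSet(h, node));
    }

    /** Detaches the whole list and returns the names, newest first. */
    List<String> drain() {
        List<String> names = new ArrayList<>();
        for (Node n = head.getAndSet(null); n != null; n = n.next) {
            names.add(n.name);
        }
        return names;
    }

    public static void main(String[] args) {
        WaiterStack stack = new WaiterStack();
        stack.push(new Node("Thread 0"));
        stack.push(new Node("Thread 1"));
        System.out.println(stack.drain()); // prints [Thread 1, Thread 0]
    }
}
```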

Main fields

// State variable, which stores the current phase, the number of participants (parties) and the number of participants that have not arrived (unarrived)
private volatile long state;
// The maximum number of participants, i.e. the maximum number of tasks per phase
private static final int  MAX_PARTIES     = 0xffff;
// The maximum phase number
private static final int  MAX_PHASE       = Integer.MAX_VALUE;
// The bit offset of the number of participants
private static final int  PARTIES_SHIFT   = 16;
// The bit offset of the current phase
private static final int  PHASE_SHIFT     = 32;
// Mask for the number of unarrived participants (low 16 bits)
private static final int  UNARRIVED_MASK  = 0xffff;      // to mask ints
// Mask for the number of participants (middle 16 bits)
private static final long PARTIES_MASK    = 0xffff0000L; // to mask longs
// Mask for counts (low 32 bits), where counts is parties and unarrived combined with '|'
private static final long COUNTS_MASK     = 0xffffffffL;
// Termination flag, the sign bit of state; once it is set, the phase reads as negative
private static final long TERMINATION_BIT = 1L << 63;

// Subtracted from state when one participant arrives
private static final int  ONE_ARRIVAL     = 1;
// Used when adding or removing a participant
private static final int  ONE_PARTY       = 1 << PARTIES_SHIFT;
// Subtracted when a participant arrives and deregisters: lowers both parties and unarrived by 1
private static final int  ONE_DEREGISTER  = ONE_ARRIVAL|ONE_PARTY;
// Special value of counts when there are no participants
private static final int  EMPTY           = 1;

// The number of participants that have not arrived
private static int unarrivedOf(long s) {
    int counts = (int)s;
    return (counts == EMPTY) ? 0 : (counts & UNARRIVED_MASK);
}
// The number of participants (middle 16 bits); note that s is cast to int before the shift
private static int partiesOf(long s) {
    return (int)s >>> PARTIES_SHIFT;
}
// The current phase (high 32 bits)
private static int phaseOf(long s) {
    return (int)(s >>> PHASE_SHIFT);
}
// The number of participants that have arrived
private static int arrivedOf(long s) {
    int counts = (int)s; // low 32 bits
    return (counts == EMPTY) ? 0 :
        (counts >>> PARTIES_SHIFT) - (counts & UNARRIVED_MASK);
}
// Queues that hold the threads of participants that have arrived; the queue is chosen by the parity of the current phase
private final AtomicReference<QNode> evenQ;
private final AtomicReference<QNode> oddQ;

The main fields are state, evenQ and oddQ:

(1) state, the state variable: the high 32 bits store the current phase, the middle 16 bits store the number of participants, and the low 16 bits store the number of participants that have not arrived.

(2) evenQ and oddQ, the queues that hold the participants that have already arrived. When the last participant arrives, the participants in the queue are woken up to continue with the next phase or to finish.
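The layout of state can be tried out in isolation. The sketch below is a standalone class with made-up names (StateLayoutDemo, pack) that follows the same bit layout. It ignores the special EMPTY value for simplicity.

```java
// Hypothetical demo class: packs phase, parties and unarrived into one long.
final class StateLayoutDemo {
    static final int PARTIES_SHIFT = 16;
    static final int PHASE_SHIFT = 32;
    static final int UNARRIVED_MASK = 0xffff;

    /** Packs the three fields: high 32 bits, middle 16 bits, low 16 bits. */
    static long pack(int phase, int parties, int unarrived) {
        return ((long) phase << PHASE_SHIFT)
             | ((long) parties << PARTIES_SHIFT)
             | (long) unarrived;
    }

    static int phaseOf(long s) {
        return (int) (s >>> PHASE_SHIFT);
    }

    static int partiesOf(long s) {
        return (int) s >>> PARTIES_SHIFT; // cast to int first, then shift
    }

    static int unarrivedOf(long s) {
        return (int) s & UNARRIVED_MASK;
    }

    public static void main(String[] args) {
        long s = pack(2, 3, 3); // phase 2, 3 participants, none arrived yet
        s -= 1;                 // one arrival only touches the low 16 bits
        System.out.println(phaseOf(s) + " " + partiesOf(s) + " " + unarrivedOf(s)); // prints 2 3 2
    }
}
```

Because all three fields live in one long, a single compare-and-swap can update them together, which is why one arrival is just a subtraction on state.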

Constructors

public Phaser() {
    this(null, 0);
}

public Phaser(int parties) {
    this(null, parties);
}

public Phaser(Phaser parent) {
    this(parent, 0);
}

public Phaser(Phaser parent, int parties) {
    if (parties >>> PARTIES_SHIFT != 0)
        throw new IllegalArgumentException("Illegal number of parties");
    int phase = 0;
    this.parent = parent;
    if (parent != null) {
        final Phaser root = parent.root;
        this.root = root;
        this.evenQ = root.evenQ;
        this.oddQ = root.oddQ;
        if (parties != 0)
            phase = parent.doRegister(1);
    }
    else {
        this.root = this;
        this.evenQ = new AtomicReference<QNode>();
        this.oddQ = new AtomicReference<QNode>();
    }
    // The state variable is stored in three segments
    this.state = (parties == 0) ? (long)EMPTY :
        ((long)phase << PHASE_SHIFT) |
        ((long)parties << PARTIES_SHIFT) |
        ((long)parties);
}

The constructors also set parent and root, which are used to build tiered (multi-level) Phasers and are beyond the scope of this article.

The focus is on how state is assigned, with the high 32 bits storing the current phase, the middle 16 bits storing the number of participants, and the low 16 bits storing the number of unfinished participants.
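The initial values can be observed through Phaser's public getters without touching state directly. This is a small sketch: PhaserStateDemo and snapshot are made-up names, while getPhase(), getRegisteredParties(), getArrivedParties(), getUnarrivedParties() and arrive() are part of java.util.concurrent.Phaser.

```java
import java.util.Arrays;
import java.util.concurrent.Phaser;

// Hypothetical demo class: reads the three parts of state through the public API.
class PhaserStateDemo {
    /** Returns {phase, registered parties, arrived parties, unarrived parties}. */
    static int[] snapshot(Phaser p) {
        return new int[] {
            p.getPhase(), p.getRegisteredParties(),
            p.getArrivedParties(), p.getUnarrivedParties()
        };
    }

    public static void main(String[] args) {
        Phaser p = new Phaser(3);
        System.out.println(Arrays.toString(snapshot(p))); // prints [0, 3, 0, 3]
        p.arrive(); // one participant arrives without waiting for the others
        System.out.println(Arrays.toString(snapshot(p))); // prints [0, 3, 1, 2]
    }
}
```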

Let’s take a look at the source of several main methods:

The register() method

Registers a participant. If onAdvance() is running when register() is called, the call waits for it to finish.

public int register() {
    return doRegister(1);
}
private int doRegister(int registrations) {
    // The amount to add to state: registrations goes into both parties and unarrived
    long adjust = ((long)registrations << PARTIES_SHIFT) | registrations;
    final Phaser parent = this.parent;
    int phase;
    for (;;) {
        // The value of state
        long s = (parent == null) ? state : reconcileState();
        // The lower 32 bits of state, which hold parties and unarrived
        int counts = (int)s;
        // The value of parties
        int parties = counts >>> PARTIES_SHIFT;
        // The value of unarrived
        int unarrived = counts & UNARRIVED_MASK;
        // Check for overflow
        if (registrations > MAX_PARTIES - parties)
            throw new IllegalStateException(badRegister(s));
        // Current phase
        phase = (int)(s >>> PHASE_SHIFT);
        if (phase < 0)
            break;
        // Not the first participant
        if (counts != EMPTY) {                  // not 1st registration
            if (parent == null || reconcileState() == s) {
                // unarrived == 0 means onAdvance() is running for the current phase, so wait for it to finish
                if (unarrived == 0)             // wait out advance
                    root.internalAwaitAdvance(phase, null);
                // Otherwise add adjust to state with a CAS and leave the loop on success
                else if (UNSAFE.compareAndSwapLong(this, stateOffset,
                                                   s, s + adjust))
                    break;
            }
        }
        // This is the first participant
        else if (parent == null) {              // 1st root registration
            // Calculate the value of state
            long next = ((long)phase << PHASE_SHIFT) | adjust;
            // Change the value of state and break out of the loop if successful
            if (UNSAFE.compareAndSwapLong(this, stateOffset, s, next))
                break;
        }
        else {
            // Multilevel phase processing
            synchronized (this) {               // 1st sub registration
                if (state == s) {               // recheck under lock
                    phase = parent.doRegister(1);
                    if (phase < 0)
                        break;
                    // finish registration whenever parent registration
                    // succeeded, even when racing with termination,
                    // since these are part of the same "transaction".
                    while (!UNSAFE.compareAndSwapLong(this, stateOffset, s,
                            ((long)phase << PHASE_SHIFT) | adjust)) {
                        s = state;
                        phase = (int)(root.state >>> PHASE_SHIFT);
                        // assert (int)s == EMPTY;
                    }
                    break;
                }
            }
        }
    }
    return phase;
}
// Waits for onAdvance() to complete.
// The method first spins a bounded number of times and returns as soon as the phase advances.
// If the phase has not advanced when the spins run out, the current thread is queued and woken up after onAdvance() completes.
private int internalAwaitAdvance(int phase, QNode node) {
    // Make sure the queue of the previous phase has been cleared
    releaseWaiters(phase-1);          // ensure old queue clean
    boolean queued = false;           // true when node is enqueued
    int lastUnarrived = 0;            // to increase spins upon change
    // Number of spins
    int spins = SPINS_PER_ARRIVAL;
    long s;
    int p;
    // Loop while the phase is unchanged; once it changes, the next phase has started and there is no need to wait
    while ((p = (int)((s = state) >>> PHASE_SHIFT)) == phase) {
        // node is null when called from register() or arriveAndAwaitAdvance()
        if (node == null) {           // spinning in noninterruptible mode
            // Number of participants that have not arrived
            int unarrived = (int)s & UNARRIVED_MASK;
            // unarrived changed, so allow more spins
            if (unarrived != lastUnarrived &&
                (lastUnarrived = unarrived) < NCPU)
                spins += SPINS_PER_ARRIVAL;
            boolean interrupted = Thread.interrupted();
            // Interrupted or out of spins: create a node
            if (interrupted || --spins < 0) { // need node to record intr
                node = new QNode(this, phase, false, false, 0L);
                node.wasInterrupted = interrupted;
            }
        }
        else if (node.isReleasable()) // done or aborted
            break;
        else if (!queued) {           // push onto queue
            // Enqueue the node
            AtomicReference<QNode> head = (phase & 1) == 0 ? evenQ : oddQ;
            QNode q = node.next = head.get();
            if ((q == null || q.phase == phase) &&
                (int)(state >>> PHASE_SHIFT) == phase) // avoid stale enq
                queued = head.compareAndSet(q, node);
        }
        else {
            try {
                // Block the current thread until it is woken up, much like calling LockSupport.park()
                ForkJoinPool.managedBlock(node);
            } catch (InterruptedException ie) {
                node.wasInterrupted = true;
            }
        }
    }

    // The thread that owns the node has been woken up or has stopped waiting
    if (node != null) {
        // Clear the thread in the node
        if (node.thread != null)
            node.thread = null;       // avoid need for unpark()
        if (node.wasInterrupted && !node.interruptible)
            Thread.currentThread().interrupt();
        if (p == phase && (p = (int)(state >>> PHASE_SHIFT)) == phase)
            return abortWait(phase); // possibly clean up on abort
    }
    // Wake up the threads waiting on this phase
    releaseWaiters(phase);
    return p;
}

The overall logic of registering a participant is:

(1) Registering a participant increases both parties and unarrived, i.e. the middle 16 bits and the low 16 bits of state.

(2) If it is the first participant, try to update state atomically and return on success.

(3) If it is not the first participant, check whether onAdvance() is running. If it is, wait for onAdvance() to complete; if it is not, retry the atomic update of state until it succeeds.

(4) Waiting for onAdvance() to complete means spinning first and entering the queue only afterwards, which reduces thread context switches.
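The effect of registration on parties and unarrived can be checked from the outside. In this sketch RegisterDemo and run are made-up names. bulkRegister(int) is another public Phaser method that registers several participants in one call.

```java
import java.util.Arrays;
import java.util.concurrent.Phaser;

// Hypothetical demo class: registration raises parties and unarrived together.
class RegisterDemo {
    /** Returns {phase seen by register(), phase seen by bulkRegister(), parties, unarrived}. */
    static int[] run() {
        Phaser p = new Phaser();             // starts with no participants
        int phaseAtFirst = p.register();     // the first registration
        int phaseAtBulk = p.bulkRegister(2); // two more participants at once
        return new int[] {
            phaseAtFirst, phaseAtBulk,
            p.getRegisteredParties(), p.getUnarrivedParties()
        };
    }

    public static void main(String[] args) {
        System.out.println(Arrays.toString(run())); // prints [0, 0, 3, 3]
    }
}
```

Both calls return the phase number to which the registration applies, which is still 0 here because nobody has arrived.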

The arriveAndAwaitAdvance() method

The current thread completes the current phase and waits for other threads to complete the current phase.

If the current thread is the last to arrive in the phase, the current thread executes the onAdvance() method and wakes the other threads to the next phase.

public int arriveAndAwaitAdvance() {
    // Specialization of doArrive+awaitAdvance eliminating some reads/paths
    final Phaser root = this.root;
    for (;;) {
        // The value of state
        long s = (root == this) ? state : reconcileState();
        // Current phase
        int phase = (int)(s >>> PHASE_SHIFT);
        if (phase < 0)
            return phase;
        // values for parties and unarrived
        int counts = (int)s;
        // The value of unarrived (lower 16 bits of state)
        int unarrived = (counts == EMPTY) ? 0 : (counts & UNARRIVED_MASK);
        if (unarrived <= 0)
            throw new IllegalStateException(badArrive(s));
        // Change the value of state
        if (UNSAFE.compareAndSwapLong(this, stateOffset, s,
                                      s -= ONE_ARRIVAL)) {
            // If it is not the last to arrive, call internalAwaitAdvance() to spin or queue
            if (unarrived > 1)
                // internalAwaitAdvance() was analyzed above together with register()
                return root.internalAwaitAdvance(phase, null);

            // This is the last participant to arrive
            if (root != this)
                return parent.arriveAndAwaitAdvance();
            // n keeps only the parties part of state, i.e. the middle 16 bits
            long n = s & PARTIES_MASK;  // base of next state
            // The value of parties, i.e. the number of participants that must arrive in the next phase
            int nextUnarrived = (int)n >>> PARTIES_SHIFT;
            // Run onAdvance(); a return value of true means the phaser should terminate
            // (the default implementation returns true when no participants remain registered)
            if (onAdvance(phase, nextUnarrived))
                n |= TERMINATION_BIT;
            else if (nextUnarrived == 0)
                n |= EMPTY;
            else
                // Fill in the unarrived part of n
                n |= nextUnarrived;
            // The next phase is the current phase plus 1
            int nextPhase = (phase + 1) & MAX_PHASE;
            // Fill in the phase part of n
            n |= (long)nextPhase << PHASE_SHIFT;
            // Change state to n
            if (!UNSAFE.compareAndSwapLong(this, stateOffset, s, n))
                return (int)(state >>> PHASE_SHIFT); // terminated
            // Wake up the other participants so that they proceed to the next phase
            releaseWaiters(phase);
            // Return the number of the next phase
            return nextPhase;
        }
    }
}

The rough logic of arriveAndAwaitAdvance() is:

(1) Decrease the unarrived part of state by 1.

(2) If the caller is not the last to arrive, call internalAwaitAdvance() to spin or queue.

(3) If the caller is the last to arrive, call onAdvance(), then change state to the value for the next phase and wake up the other waiting threads.

(4) Return the number of the next phase.
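The last-arrival path can be exercised from a single thread by using a Phaser with one participant, since that participant is always the last to arrive and never blocks. This is a sketch: AdvanceDemo and newPhaser are made-up names, and the rule of terminating once phase 1 completes is chosen only for the demonstration.

```java
import java.util.concurrent.Phaser;

// Hypothetical demo class: onAdvance() decides when the phaser terminates.
class AdvanceDemo {
    /** Builds a single-participant phaser that terminates once phase 1 completes. */
    static Phaser newPhaser() {
        return new Phaser(1) {
            @Override
            protected boolean onAdvance(int phase, int registeredParties) {
                return phase >= 1; // returning true sets the termination bit
            }
        };
    }

    public static void main(String[] args) {
        Phaser p = newPhaser();
        System.out.println(p.arriveAndAwaitAdvance());     // prints 1: phase 0 advanced to phase 1
        p.arriveAndAwaitAdvance();                         // onAdvance(1, 1) returns true here
        System.out.println(p.isTerminated());              // prints true
        System.out.println(p.arriveAndAwaitAdvance() < 0); // prints true: the phase is now negative
    }
}
```

After termination the first check in the method (phase < 0) returns immediately, so further calls neither block nor change state.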

Summary

(1) Phaser suits multi-phase, multi-task scenarios, and the tasks in each phase can be controlled at a fine grain.

(2) Internally, Phaser implements all of its logic with a state variable and queues.

(3) The high 32 bits of state store the current phase (phase), the middle 16 bits store the number of participants or tasks in the current phase (parties), and the low 16 bits store the number of participants that have not arrived (unarrived).

(4) The queue is chosen according to the parity of the current phase.

(5) A participant that is not the last to arrive spins or enters the queue and waits for all participants to complete the phase.

(6) When the last participant arrives, it wakes up the threads in the queue and the next phase begins.

Bonus

What advantages does Phaser have over CyclicBarrier and CountDownLatch?

A: There are two main advantages:

(1) Phaser is built for multiple phases and tracks the phase number itself, whereas a CountDownLatch can be used only once and a CyclicBarrier, although reusable, only repeats the same barrier without a phase number or a termination condition.

(2) The number of tasks in each phase of a Phaser can be changed at run time, whereas the count of a CyclicBarrier or CountDownLatch cannot be modified once it has been set.
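The second point can be illustrated with arriveAndDeregister(), a public Phaser method that arrives and leaves in one step. In this sketch ShrinkingPartiesDemo and run are made-up names. Three workers start together, and worker i leaves after phase i, so each phase has one participant fewer than the one before.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.Phaser;

// Hypothetical demo class: the number of participants shrinks from phase to phase.
class ShrinkingPartiesDemo {
    /** Returns the number of registered participants seen by onAdvance() at each advance. */
    static List<Integer> run() {
        final int workers = 3;
        List<Integer> partiesAtAdvance = Collections.synchronizedList(new ArrayList<>());
        Phaser phaser = new Phaser(workers) {
            @Override
            protected boolean onAdvance(int phase, int registeredParties) {
                partiesAtAdvance.add(registeredParties);
                return super.onAdvance(phase, registeredParties); // true once nobody is left
            }
        };
        List<Thread> threads = new ArrayList<>();
        for (int i = 0; i < workers; i++) {
            final int lastPhase = i;
            Thread t = new Thread(() -> {
                for (int phase = 0; phase < lastPhase; phase++) {
                    phaser.arriveAndAwaitAdvance(); // stay on for the next phase
                }
                phaser.arriveAndDeregister();       // finish the last phase and leave
            });
            threads.add(t);
            t.start();
        }
        try {
            for (Thread t : threads) {
                t.join();
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return partiesAtAdvance;
    }

    public static void main(String[] args) {
        System.out.println(run()); // prints [2, 1, 0]
    }
}
```

When the last worker deregisters, the default onAdvance() sees zero registered participants and terminates the phaser.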



You are welcome to follow my public account “Tong Elder brother read source code” for more articles in the source code series.