I am Chen Pi, a developer in the internet industry. Search for “Chen PI’s JavaLib” on WeChat to read my latest articles first.

Phaser

Phaser, introduced in JDK 1.7, is a reusable synchronization barrier similar in function to CyclicBarrier and CountDownLatch, but more flexible to use.

Let me briefly explain what this class does. Suppose a large project is divided into several stages, and several people take part in each stage. Everyone must finish the current stage before anyone is allowed to enter the next one; then everyone continues with the next stage, and so on until all stages are complete. Along the way, any participant can quit at any time, and the whole project can also be terminated midway.

For example, one day Chen Pi invites Xiao Mei and Xiao Xue to his house for dinner. The evening can be divided into three stages: the first is buying ingredients at the supermarket, the second is cooking, and the third is eating. The next stage can begin only after everyone has finished the current one.

First, define a Phaser subclass that overrides the onAdvance method to perform a different action as each phase completes.

package com.chenpi;

import java.util.concurrent.Phaser;

/**
 * @Description Dining phaser
 * @Author Chen Pi
 * @Date 2021/7/4
 * @Version 1.0
 */
public class DiningPhaser extends Phaser {

  /**
   * Called each time a phase completes; afterwards the phase number is
   * incremented by 1 to mark the next phase.
   *
   * @param phase the phase that just completed, starting from 0
   * @param registeredParties the number of registered tasks
   * @return whether to terminate the phaser
   */
  @Override
  protected boolean onAdvance(int phase, int registeredParties) {
    switch (phase) {
      case 0:
        System.out.println("Stage one, grocery shopping done! Total number of participants: " + registeredParties);
        return false;
      case 1:
        System.out.println("Stage two, the cooking is done! Total number of participants: " + registeredParties);
        return false;
      case 2:
        System.out.println("Stage three, dinner over! Total number of participants: " + registeredParties);
        return false;
      default:
        return true;
    }
  }
}

Next, define the tasks involved and write what each task needs to do at each stage.

package com.chenpi;

import java.util.concurrent.Phaser;

/**
 * @Description Task representing Chen Pi
 * @Author Chen Pi
 * @Date 2021/7/4
 * @Version 1.0
 */
public class ChenPiTask implements Runnable {

  private Phaser phaser;

  public ChenPiTask(Phaser phaser) {
    this.phaser = phaser;
  }

  @Override
  public void run() {
    System.out.println(Thread.currentThread().getName() + " bought pork...");
    // First stage done: wait for the others before moving on to the next stage
    phaser.arriveAndAwaitAdvance();

    System.out.println(Thread.currentThread().getName() + " cooked the pork...");
    // Second stage done: wait for the others before moving on to the next stage
    phaser.arriveAndAwaitAdvance();

    System.out.println(Thread.currentThread().getName() + " is full...");
    // Third stage done: wait for the others to finish
    phaser.arriveAndAwaitAdvance();
  }
}

package com.chenpi;

import java.util.concurrent.Phaser;

/**
 * @Description Task representing Xiao Mei
 * @Author Chen Pi
 * @Date 2021/7/4
 * @Version 1.0
 */
public class XiaoMeiTask implements Runnable {

  private Phaser phaser;

  public XiaoMeiTask(Phaser phaser) {
    this.phaser = phaser;
  }

  @Override
  public void run() {
    System.out.println(Thread.currentThread().getName() + " bought cabbage...");
    // First stage done: wait for the others before moving on to the next stage
    phaser.arriveAndAwaitAdvance();

    System.out.println(Thread.currentThread().getName() + " cooked the cabbage...");
    // Second stage done: wait for the others before moving on to the next stage
    phaser.arriveAndAwaitAdvance();

    System.out.println(Thread.currentThread().getName() + " is full...");
    // Third stage done: wait for the others to finish
    phaser.arriveAndAwaitAdvance();
  }
}

package com.chenpi;

import java.util.concurrent.Phaser;

/**
 * @Description Task representing Xiao Xue
 * @Author Chen Pi
 * @Date 2021/7/4
 * @Version 1.0
 */
public class XiaoXueTask implements Runnable {

  private Phaser phaser;

  public XiaoXueTask(Phaser phaser) {
    this.phaser = phaser;
  }

  @Override
  public void run() {
    System.out.println(Thread.currentThread().getName() + " bought abalone...");
    // First stage done: wait for the others before moving on to the next stage
    phaser.arriveAndAwaitAdvance();

    System.out.println(Thread.currentThread().getName() + " sauteed the abalone...");
    // Second stage done: wait for the others before moving on to the next stage
    phaser.arriveAndAwaitAdvance();

    System.out.println(Thread.currentThread().getName() + " is full...");
    // Third stage done: wait for the others to finish
    phaser.arriveAndAwaitAdvance();
  }
}

Finally, write a test class to verify the behavior.

package com.chenpi;

/**
 * @Description
 * @Author Chen Pi
 * @Date 2021/7/4
 * @Version 1.0
 */
public class ChenPiMain {

  public static void main(String[] args) {

    // Register 3 tasks (people)
    DiningPhaser diningPhaser = new DiningPhaser();
    diningPhaser.bulkRegister(3);

    // The three people start working at the same time
    Thread thread1 = new Thread(new ChenPiTask(diningPhaser));
    thread1.setName("Chen Pi");
    thread1.start();

    Thread thread2 = new Thread(new XiaoMeiTask(diningPhaser));
    thread2.setName("Xiao Mei");
    thread2.start();

    Thread thread3 = new Thread(new XiaoXueTask(diningPhaser));
    thread3.setName("Xiao Xue");
    thread3.start();
  }
}

Run the program; the output looks like this:

Chen Pi bought pork...
Xiao Mei bought cabbage...
Xiao Xue bought abalone...
Stage one, grocery shopping done...
Xiao Xue sauteed the abalone...
Chen Pi cooked the pork...
Xiao Mei cooked the cabbage...
Stage two, the cooking is done...
Xiao Mei is full...
Chen Pi is full...
Xiao Xue is full...
Stage three, dinner over...

Phaser in detail

This simple example shows the basic use of Phaser, but it can do more than that.

The number of registered tasks can be adjusted dynamically (the maximum is 65535). Tasks can be registered at any time (with the register and bulkRegister methods, or by passing an initial number of parties to the constructor) and can be deregistered upon any arrival (with the arriveAndDeregister method).

package com.chenpi;

import java.util.concurrent.Phaser;

/**
 * @Description Task representing Xiao Xue
 * @Author Chen Pi
 * @Date 2021/7/4
 * @Version 1.0
 */
public class XiaoXueTask implements Runnable {

  private Phaser phaser;

  public XiaoXueTask(Phaser phaser) {
    this.phaser = phaser;
  }

  @Override
  public void run() {
    System.out.println(Thread.currentThread().getName() + " bought abalone...");
    // First stage done: wait for the others before moving on to the next stage
    phaser.arriveAndAwaitAdvance();

    System.out.println(Thread.currentThread().getName() + " sauteed the abalone...");
    // Second stage done: Xiao Xue leaves early, so she arrives and deregisters
    phaser.arriveAndDeregister();
    System.out.println(Thread.currentThread().getName() + " had to leave early...");
  }
}

We modify Xiao Xue's task so that she has to leave early after finishing the second stage, that is, she deregisters. The result is as follows:

Chen Pi bought pork...
Xiao Mei bought cabbage...
Xiao Xue bought abalone...
Stage one, grocery shopping done! Total number of participants: 3
Xiao Xue sauteed the abalone...
Xiao Xue had to leave early...
Chen Pi cooked the pork...
Xiao Mei cooked the cabbage...
Stage two, the cooking is done! Total number of participants: 2
Xiao Mei is full...
Chen Pi is full...
Stage three, dinner over! Total number of participants: 2
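The dynamic registration described above can also be checked in a single thread. The following is a minimal sketch; the class and method names (RegistrationDemo, counts, rejectsTooManyParties) are made up for illustration, and only the Phaser calls themselves come from the JDK.

```java
import java.util.concurrent.Phaser;

class RegistrationDemo {

  // Returns the registered-party count after each registration step.
  static int[] counts() {
    Phaser phaser = new Phaser(1);                      // the constructor registers 1 party
    int afterConstructor = phaser.getRegisteredParties();
    phaser.register();                                  // one more party
    int afterRegister = phaser.getRegisteredParties();
    phaser.bulkRegister(3);                             // three more parties at once
    int afterBulk = phaser.getRegisteredParties();
    phaser.arriveAndDeregister();                       // one party arrives and leaves
    int afterDeregister = phaser.getRegisteredParties();
    return new int[] {afterConstructor, afterRegister, afterBulk, afterDeregister};
  }

  // The party count is limited to 65535, so asking for 65536 is rejected.
  static boolean rejectsTooManyParties() {
    try {
      new Phaser(65536);
      return false;
    } catch (IllegalArgumentException expected) {
      return true;
    }
  }

  public static void main(String[] args) {
    for (int count : counts()) {
      System.out.println(count);                        // prints 1, 2, 5, 4
    }
    System.out.println(rejectsTooManyParties());        // prints true
  }
}
```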

Registration and deregistration affect only internal counts; the Phaser keeps no record of which tasks registered, so a task cannot query whether it is registered. We can, however, write a Phaser subclass that records the registered tasks.

package com.chenpi;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Phaser;

/**
 * @Description Dining phaser
 * @Author Chen Pi
 * @Date 2021/7/4
 * @Version 1.0
 */
public class DiningPhaser extends Phaser {

  // Record the registered tasks
  private List<Runnable> registeredTask = new ArrayList<>();

  public int register(Runnable task) {
    registeredTask.add(task);
    return super.register();
  }

  public List<Runnable> getRegisteredTask() {
    return registeredTask;
  }

  /**
   * Called each time a phase completes; afterwards the phase number is
   * incremented by 1 to mark the next phase.
   *
   * @param phase the phase that just completed, starting from 0
   * @param registeredParties the number of registered tasks
   * @return whether to terminate the phaser
   */
  @Override
  protected boolean onAdvance(int phase, int registeredParties) {
    switch (phase) {
      case 0:
        System.out.println("Stage one, grocery shopping done! Total number of participants: " + registeredParties);
        return false;
      case 1:
        System.out.println("Stage two, the cooking is done! Total number of participants: " + registeredParties);
        return false;
      case 2:
        System.out.println("Stage three, dinner over! Total number of participants: " + registeredParties);
        return false;
      default:
        return true;
    }
  }
}

In the test class, each task is registered and recorded separately.

package com.chenpi;

/**
 * @Description
 * @Author Chen Pi
 * @Date 2021/7/4
 * @Version 1.0
 */
public class ChenPiMain {

  public static void main(String[] args) {

    DiningPhaser diningPhaser = new DiningPhaser();

    // Register and record all 3 tasks (people) before starting any thread,
    // so that no phase can complete before everyone is registered
    Thread thread1 = new Thread(new ChenPiTask(diningPhaser));
    thread1.setName("Chen Pi");
    diningPhaser.register(thread1);

    Thread thread2 = new Thread(new XiaoMeiTask(diningPhaser));
    thread2.setName("Xiao Mei");
    diningPhaser.register(thread2);

    Thread thread3 = new Thread(new XiaoXueTask(diningPhaser));
    thread3.setName("Xiao Xue");
    diningPhaser.register(thread3);

    // The three people start working at the same time
    thread1.start();
    thread2.start();
    thread3.start();

    System.out.println("Registered tasks: " + diningPhaser.getRegisteredTask());
  }
}

Run the program; it prints the tasks registered with the phaser.

Chen Pi bought pork...
Registered tasks: [Thread[Chen Pi,5,main], Thread[Xiao Mei,5,main], Thread[Xiao Xue,5,main]]
Xiao Mei bought cabbage...
Xiao Xue bought abalone...
Stage one, grocery shopping done! Total number of participants: 3
Xiao Xue sauteed the abalone...
Chen Pi cooked the pork...
Xiao Mei cooked the cabbage...
Stage two, the cooking is done! Total number of participants: 3
Chen Pi is full...
Xiao Xue is full...
Xiao Mei is full...
Stage three, dinner over! Total number of participants: 3

In terms of synchronization, a Phaser can be awaited repeatedly, just like a CyclicBarrier. Phaser's arriveAndAwaitAdvance method plays the same role as CyclicBarrier's await method.
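To make the "reusable barrier" point concrete, here is a small sketch in which several threads go through several rounds on one Phaser. The names ReusableBarrierDemo and countEarlyPasses are invented for this illustration; the check simply counts how often a thread gets past the barrier before all parties have finished the round, which should never happen.

```java
import java.util.concurrent.Phaser;
import java.util.concurrent.atomic.AtomicInteger;

class ReusableBarrierDemo {

  // Runs `parties` threads through `rounds` rounds and counts how often a thread
  // got past the barrier before every party had finished that round.
  static int countEarlyPasses(int parties, int rounds) {
    Phaser phaser = new Phaser(parties);
    AtomicInteger[] finished = new AtomicInteger[rounds];
    for (int i = 0; i < rounds; i++) {
      finished[i] = new AtomicInteger();
    }
    AtomicInteger earlyPasses = new AtomicInteger();

    Thread[] workers = new Thread[parties];
    for (int t = 0; t < parties; t++) {
      workers[t] = new Thread(() -> {
        for (int round = 0; round < rounds; round++) {
          finished[round].incrementAndGet();  // this round's "work"
          phaser.arriveAndAwaitAdvance();     // same role as CyclicBarrier.await()
          if (finished[round].get() != parties) {
            earlyPasses.incrementAndGet();    // someone had not finished yet
          }
        }
      });
      workers[t].start();
    }
    for (Thread worker : workers) {
      try {
        worker.join();
      } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
        throw new IllegalStateException(e);
      }
    }
    return earlyPasses.get();
  }

  public static void main(String[] args) {
    System.out.println("early passes: " + countEarlyPasses(3, 3));  // prints "early passes: 0"
  }
}
```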

Each Phaser has an associated phase number. It starts at 0 and is incremented by 1 each time all registered tasks have arrived. After reaching Integer.MAX_VALUE, it wraps around to 0.
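The phase numbering can be observed with a single-party phaser, where every arrival completes the phase. This is only a sketch; PhaseNumberDemo and phaseAfter are hypothetical names.

```java
import java.util.concurrent.Phaser;

class PhaseNumberDemo {

  // Advances a single-party phaser `rounds` times and returns the resulting phase number.
  static int phaseAfter(int rounds) {
    Phaser phaser = new Phaser(1);     // one party, so each arrival completes the phase
    for (int i = 0; i < rounds; i++) {
      phaser.arriveAndAwaitAdvance();  // returns at once because nobody else is awaited
    }
    return phaser.getPhase();
  }

  public static void main(String[] args) {
    System.out.println(phaseAfter(0));  // prints 0
    System.out.println(phaseAfter(3));  // prints 3
  }
}
```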

The arrive and arriveAndDeregister methods record an arrival. Neither method blocks, and both return the phase number at which the arrival was recorded.

The arrive method indicates that the current task has reached the current phase but does not wait for the other tasks. The arriveAndDeregister method indicates that the current task has reached the phase and also deregisters it.
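A short single-threaded sketch of these two calls follows. The class name ArriveDemo and its run method are assumptions made for the example; the return values in the comments follow from the documented behavior of arrive and arriveAndDeregister.

```java
import java.util.concurrent.Phaser;

class ArriveDemo {

  // Returns {first arrival phase, second arrival phase,
  //          parties before deregistering, parties after, phase afterwards}.
  static int[] run() {
    Phaser phaser = new Phaser(2);
    int first = phaser.arrive();                 // 0: recorded, does not wait for the other party
    int before = phaser.getRegisteredParties();  // 2
    int second = phaser.arriveAndDeregister();   // 0: last arrival of phase 0, and this party leaves
    int after = phaser.getRegisteredParties();   // 1
    int phaseNow = phaser.getPhase();            // 1: both parties arrived, so the phase advanced
    return new int[] {first, second, before, after, phaseNow};
  }

  public static void main(String[] args) {
    for (int value : run()) {
      System.out.println(value);                 // prints 0, 0, 2, 1, 1
    }
  }
}
```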

In each phase, once all tasks have arrived, the onAdvance method is executed by the task whose arrival completes the phase, and the phaser then moves on to the next phase. The onAdvance method also controls whether the Phaser terminates. If we subclass Phaser, we can override onAdvance, which is called each time a phase completes, to do whatever we need at that point.

package com.chenpi;

import java.util.concurrent.Phaser;

/**
 * @Description Dining phaser
 * @Author Chen Pi
 * @Date 2021/7/4
 * @Version 1.0
 */
public class DiningPhaser extends Phaser {

  /**
   * Called each time a phase completes; afterwards the phase number is
   * incremented by 1 to mark the next phase.
   *
   * @param phase the phase that just completed, starting from 0
   * @param registeredParties the number of registered tasks
   * @return whether to terminate the phaser
   */
  @Override
  protected boolean onAdvance(int phase, int registeredParties) {
    System.out.println(Thread.currentThread().getName() + " called the onAdvance method.");
    switch (phase) {
      case 0:
        System.out.println("Stage one, grocery shopping done! Total number of participants: " + registeredParties);
        return false;
      case 1:
        System.out.println("Stage two, the cooking is done! Total number of participants: " + registeredParties);
        return false;
      case 2:
        System.out.println("Stage three, dinner over! Total number of participants: " + registeredParties);
        return false;
      default:
        return true;
    }
  }
}

We print the current thread in onAdvance. The result confirms that onAdvance is executed by the task that arrives last in each phase:

Chen Pi bought pork...
Xiao Mei bought cabbage...
Xiao Xue bought abalone...
Xiao Xue called the onAdvance method.
Stage one, grocery shopping done! Total number of participants: 3
Xiao Xue sauteed the abalone...
Chen Pi cooked the pork...
Xiao Mei cooked the cabbage...
Xiao Mei called the onAdvance method.
Stage two, the cooking is done! Total number of participants: 3
Xiao Mei is full...
Chen Pi is full...
Xiao Xue is full...
Xiao Xue called the onAdvance method.
Stage three, dinner over! Total number of participants: 3
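Because thread scheduling decides who arrives last in the dining example, a deterministic version may be easier to reason about. In the sketch below (OnAdvanceThreadDemo and the thread name "last-arriver" are made up), the main thread arrives first and a second thread arrives afterwards, so the second thread is the one that runs onAdvance.

```java
import java.util.concurrent.Phaser;

class OnAdvanceThreadDemo {

  // Returns the name of the thread that executed onAdvance.
  static String advancingThreadName() {
    String[] seen = new String[1];
    Phaser phaser = new Phaser(2) {
      @Override
      protected boolean onAdvance(int phase, int registeredParties) {
        seen[0] = Thread.currentThread().getName();
        return false;
      }
    };

    phaser.arrive();  // the calling thread arrives first; the phase is not complete yet
    Thread last = new Thread(() -> phaser.arrive(), "last-arriver");
    last.start();     // this arrival completes the phase and triggers onAdvance
    try {
      last.join();
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      throw new IllegalStateException(e);
    }
    return seen[0];
  }

  public static void main(String[] args) {
    System.out.println(advancingThreadName());  // prints "last-arriver"
  }
}
```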

A Phaser can be terminated at any time. Once it is terminated, synchronization methods such as arriveAndAwaitAdvance return immediately with a negative value instead of blocking. Likewise, registering a task after termination has no effect. The isTerminated method reports whether the Phaser has terminated.
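These post-termination rules can be checked directly. The sketch below terminates a phaser with forceTermination (covered later in this article) and then probes it; TerminatedPhaserDemo and check are hypothetical names.

```java
import java.util.concurrent.Phaser;

class TerminatedPhaserDemo {

  // Returns {isTerminated, arriveAndAwaitAdvance returned < 0, register returned < 0,
  //          party count unchanged by the late register}.
  static boolean[] check() {
    Phaser phaser = new Phaser(1);
    phaser.forceTermination();

    boolean terminated = phaser.isTerminated();
    boolean awaitNegative = phaser.arriveAndAwaitAdvance() < 0;  // returns at once, no blocking
    int partiesBefore = phaser.getRegisteredParties();
    boolean registerNegative = phaser.register() < 0;            // registration is ignored
    boolean partiesUnchanged = phaser.getRegisteredParties() == partiesBefore;
    return new boolean[] {terminated, awaitNegative, registerNegative, partiesUnchanged};
  }

  public static void main(String[] args) {
    for (boolean value : check()) {
      System.out.println(value);  // prints true four times
    }
  }
}
```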

A Phaser terminates when its onAdvance method returns true. For example, a subclass of Phaser can override this method and return true once a certain phase is reached.

package com.chenpi;

import java.util.concurrent.Phaser;

/**
 * @Description Dining phaser
 * @Author Chen Pi
 * @Date 2021/7/4
 * @Version 1.0
 */
public class DiningPhaser extends Phaser {

  /**
   * Called each time a phase completes; afterwards the phase number is
   * incremented by 1 to mark the next phase.
   *
   * @param phase the phase that just completed, starting from 0
   * @param registeredParties the number of registered tasks
   * @return whether to terminate the phaser
   */
  @Override
  protected boolean onAdvance(int phase, int registeredParties) {
    System.out.println(Thread.currentThread().getName() + " called the onAdvance method.");
    switch (phase) {
      case 0:
        System.out.println("Stage one, grocery shopping done! Total number of participants: " + registeredParties);
        return false;
      case 1:
        System.out.println("Stage two, the cooking is done! Total number of participants: " + registeredParties);
        return false;
      case 2:
        System.out.println("Stage three, dinner over! Total number of participants: " + registeredParties);
        return false;
      default:
        // Any later phase terminates the phaser
        return true;
    }
  }
}
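A more compact way to see termination through onAdvance is to loop until the phaser reports that it has terminated. This sketch uses an anonymous subclass and a hypothetical lastPhase parameter; it is one possible arrangement, not the only one.

```java
import java.util.concurrent.Phaser;

class PhaseLimitDemo {

  // Counts how many phases complete before onAdvance terminates the phaser.
  static int runUntilTerminated(int lastPhase) {
    Phaser phaser = new Phaser(1) {
      @Override
      protected boolean onAdvance(int phase, int registeredParties) {
        // Terminate once the chosen phase completes, or when nobody is registered any more
        return phase >= lastPhase || registeredParties == 0;
      }
    };

    int completed = 0;
    while (!phaser.isTerminated()) {
      phaser.arriveAndAwaitAdvance();  // single party, so each call completes a phase
      completed++;
    }
    return completed;
  }

  public static void main(String[] args) {
    System.out.println(runUntilTerminated(2));  // prints 3 (phases 0, 1 and 2)
  }
}
```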

The default implementation of onAdvance returns true when the number of registered tasks drops to 0.

protected boolean onAdvance(int phase, int registeredParties) {
    return registeredParties == 0;
}
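With the default onAdvance, a plain Phaser therefore terminates by itself once its last party deregisters. A minimal sketch (DefaultTerminationDemo is a made-up name):

```java
import java.util.concurrent.Phaser;

class DefaultTerminationDemo {

  // Returns {terminated after the first deregistration, terminated after the second}.
  static boolean[] run() {
    Phaser phaser = new Phaser(2);
    phaser.arriveAndDeregister();
    boolean afterFirst = phaser.isTerminated();   // false: one party is still registered
    phaser.arriveAndDeregister();
    boolean afterSecond = phaser.isTerminated();  // true: onAdvance saw 0 registered parties
    return new boolean[] {afterFirst, afterSecond};
  }

  public static void main(String[] args) {
    boolean[] result = run();
    System.out.println(result[0]);  // prints false
    System.out.println(result[1]);  // prints true
  }
}
```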

Phaser's forceTermination method forcibly terminates the phaser. Suppose that once the first stage is over, Xiao Mei cooks her cabbage and then decides that waiting around for the others takes too long, so she terminates the phaser.

package com.chenpi;

import java.util.concurrent.Phaser;

/**
 * @Description Task representing Xiao Mei
 * @Author Chen Pi
 * @Date 2021/7/4
 * @Version 1.0
 */
public class XiaoMeiTask implements Runnable {

  private Phaser phaser;

  public XiaoMeiTask(Phaser phaser) {
    this.phaser = phaser;
  }

  @Override
  public void run() {
    System.out.println(Thread.currentThread().getName() + " bought cabbage...");
    // First stage done: wait for the others before moving on to the next stage
    phaser.arriveAndAwaitAdvance();

    System.out.println(Thread.currentThread().getName() + " cooked the cabbage...");
    // Xiao Mei decides that waiting around takes too long, so she terminates the phaser
    phaser.forceTermination();

    System.out.println(Thread.currentThread().getName() + " is full...");
    // The phaser is already terminated, so this call returns immediately
    phaser.arriveAndAwaitAdvance();
  }
}

Once the phaser is terminated, the remaining work is no longer synchronized stage by stage. The result is as follows:

Chen Pi bought pork...
Xiao Mei bought cabbage...
Xiao Xue bought abalone...
Xiao Xue called the onAdvance method.
Stage one, grocery shopping done! Total number of participants: 3
Xiao Xue sauteed the abalone...
Xiao Mei cooked the cabbage...
Xiao Mei is full...
Xiao Xue is full...
Chen Pi cooked the pork...
Chen Pi is full...
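The same effect can be isolated in a smaller sketch: a thread that is waiting at the barrier is released when another thread calls forceTermination, and its arriveAndAwaitAdvance call returns a negative value. The names ForceTerminationDemo and releasedWaiterResult are assumptions for the example.

```java
import java.util.concurrent.Phaser;

class ForceTerminationDemo {

  // Returns the value that arriveAndAwaitAdvance gives a waiter released by forceTermination.
  static int releasedWaiterResult() {
    Phaser phaser = new Phaser(2);  // the second party never arrives in this sketch
    int[] result = new int[1];
    Thread waiter = new Thread(() -> result[0] = phaser.arriveAndAwaitAdvance());
    waiter.start();

    while (phaser.getArrivedParties() == 0) {
      Thread.yield();               // wait until the waiter has recorded its arrival
    }
    phaser.forceTermination();      // releases the waiting thread

    try {
      waiter.join();
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      throw new IllegalStateException(e);
    }
    return result[0];
  }

  public static void main(String[] args) {
    System.out.println(releasedWaiterResult() < 0);  // prints true
  }
}
```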

Phaser supports tiering: you can build a tree of phasers with the constructors Phaser(Phaser parent) and Phaser(Phaser parent, int parties). This reduces the contention caused by registering too many tasks on a single Phaser and so increases throughput, at the cost of more overhead per operation.
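As a rough illustration of tiering, the sketch below builds a root with two child phasers of two parties each and drives them from a single thread. TieredPhaserDemo and the variable names are invented; the point is that each non-empty child counts as one party of the root, and the shared phase advances only after every child has collected all of its arrivals.

```java
import java.util.concurrent.Phaser;

class TieredPhaserDemo {

  // Returns {parties registered on the root, phase after only the left child is done,
  //          root phase after both children are done, left child's phase at that point}.
  static int[] run() {
    Phaser root = new Phaser();
    Phaser left = new Phaser(root, 2);   // registers itself with the root as one party
    Phaser right = new Phaser(root, 2);  // likewise

    int rootParties = root.getRegisteredParties();

    left.arrive();
    left.arrive();                       // left is complete and arrives at the root
    int phaseAfterLeft = root.getPhase();

    right.arrive();
    right.arrive();                      // right is complete, so the root advances
    int rootPhase = root.getPhase();
    int leftPhase = left.getPhase();     // children report the shared phase

    return new int[] {rootParties, phaseAfterLeft, rootPhase, leftPhase};
  }

  public static void main(String[] args) {
    for (int value : run()) {
      System.out.println(value);         // prints 2, 0, 1, 1
    }
  }
}
```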

Note that arriveAndAwaitAdvance does not throw InterruptedException: if the current thread is interrupted while waiting, the method keeps waiting rather than returning. If you need a wait that can be interrupted or can time out, use one of the following methods:

// Waits while the current phase equals the given phase; returns immediately if it does not.
// Throws InterruptedException if the thread is interrupted while waiting.
int awaitAdvanceInterruptibly(int phase)
// Same as above, but also throws TimeoutException if the wait times out.
int awaitAdvanceInterruptibly(int phase, long timeout, TimeUnit unit)
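One way to use these methods is to record the arrival with arrive() and then wait on the returned phase number. The sketch below does this with a timeout on a phaser whose second party never shows up; TimedWaitDemo, waitBriefly and the 100 ms timeout are arbitrary choices for the example.

```java
import java.util.concurrent.Phaser;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

class TimedWaitDemo {

  // Arrives, then waits at most 100 ms for the phase to advance.
  static String waitBriefly() {
    Phaser phaser = new Phaser(2);  // the second party never arrives in this sketch
    int phase = phaser.arrive();    // record our arrival without blocking
    try {
      phaser.awaitAdvanceInterruptibly(phase, 100, TimeUnit.MILLISECONDS);
      return "advanced";
    } catch (TimeoutException e) {
      return "timed out";
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      return "interrupted";
    }
  }

  public static void main(String[] args) {
    System.out.println(waitBriefly());  // prints "timed out"
  }
}
```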

Here are some other common methods:

// Get the number of tasks that have arrived in the current phase
int getArrivedParties()

// Get the parent Phaser
Phaser getParent()

// Get the current phase number
final int getPhase()

// Get the number of tasks currently registered
int getRegisteredParties()

// Get the number of tasks that have not yet arrived in the current phase
int getUnarrivedParties()

// Determine whether the phaser has terminated
boolean isTerminated()
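A quick snapshot of these query methods on a three-party phaser after one arrival might look like the following sketch (PhaserQueryDemo and snapshot are hypothetical names).

```java
import java.util.concurrent.Phaser;

class PhaserQueryDemo {

  // Returns {registered, arrived, unarrived, phase} after one of three parties has arrived.
  static int[] snapshot() {
    Phaser phaser = new Phaser(3);
    phaser.arrive();
    return new int[] {
        phaser.getRegisteredParties(),
        phaser.getArrivedParties(),
        phaser.getUnarrivedParties(),
        phaser.getPhase()
    };
  }

  public static void main(String[] args) {
    for (int value : snapshot()) {
      System.out.println(value);                  // prints 3, 1, 2, 0
    }
    Phaser standalone = new Phaser(3);
    System.out.println(standalone.getParent());   // prints null: no parent phaser
    System.out.println(standalone.isTerminated()); // prints false
  }
}
```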

Doug Lea, the master

Doug Lea really likes to pack several pieces of state and several counters into different bits of a single integer and then handle them with all sorts of clever bit operations; you have to admire it. For example, the Phaser class uses a single variable of type long to hold the number of tasks that have not yet reached the barrier, the number of registered tasks, the phase number, and the termination state of the Phaser. If we had to represent this information ourselves, we would probably define four variables.

/**
 * unarrived  -- the number of parties yet to hit barrier (bits  0-15)
 * parties    -- the number of parties to wait            (bits 16-31)
 * phase      -- the generation of the barrier            (bits 32-62)
 * terminated -- set if barrier is terminated             (bit  63 / sign)
 */
private volatile long state;

You will also find that many classes in JUC define constants in hexadecimal and rely on shifts and other bit operations. Bitwise operations are extremely fast, which is probably why he likes to use them.

private static final int  MAX_PARTIES     = 0xffff;
private static final int  MAX_PHASE       = Integer.MAX_VALUE;
private static final int  PARTIES_SHIFT   = 16;
private static final int  PHASE_SHIFT     = 32;
private static final int  UNARRIVED_MASK  = 0xffff;      // to mask ints
private static final long PARTIES_MASK    = 0xffff0000L; // to mask longs
private static final long COUNTS_MASK     = 0xffffffffL;
private static final long TERMINATION_BIT = 1L << 63;

// some special values
private static final int  ONE_ARRIVAL     = 1;
private static final int  ONE_PARTY       = 1 << PARTIES_SHIFT;
private static final int  ONE_DEREGISTER  = ONE_ARRIVAL|ONE_PARTY;
private static final int  EMPTY           = 1;

// The following unpacking methods are usually manually inlined

private static int unarrivedOf(long s) {
    int counts = (int)s;
    return (counts == EMPTY) ? 0 : (counts & UNARRIVED_MASK);
}

private static int partiesOf(long s) {
    return (int)s >>> PARTIES_SHIFT;
}

private static int phaseOf(long s) {
    return (int)(s >>> PHASE_SHIFT);
}

private static int arrivedOf(long s) {
    int counts = (int)s;
    return (counts == EMPTY) ? 0 :
    (counts >>> PARTIES_SHIFT) - (counts & UNARRIVED_MASK);
}
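To see the layout at work, the unpacking helpers above can be copied into a small class and applied to a hand-built state value. In this sketch the pack method is a hypothetical helper that is not part of the JDK; it assumes a non-negative phase and a party count of at most 0xffff.

```java
class PhaserStateDemo {

  private static final int  PARTIES_SHIFT   = 16;
  private static final int  PHASE_SHIFT     = 32;
  private static final int  UNARRIVED_MASK  = 0xffff;
  private static final long TERMINATION_BIT = 1L << 63;
  private static final int  EMPTY           = 1;

  // Hypothetical helper (not in the JDK): builds a state word from its three fields.
  static long pack(int phase, int parties, int unarrived) {
    return ((long) phase << PHASE_SHIFT) | ((long) parties << PARTIES_SHIFT) | unarrived;
  }

  // The four methods below mirror the unpacking methods quoted from Phaser.
  static int unarrivedOf(long s) {
    int counts = (int) s;
    return (counts == EMPTY) ? 0 : (counts & UNARRIVED_MASK);
  }

  static int partiesOf(long s) {
    return (int) s >>> PARTIES_SHIFT;
  }

  static int phaseOf(long s) {
    return (int) (s >>> PHASE_SHIFT);
  }

  static int arrivedOf(long s) {
    int counts = (int) s;
    return (counts == EMPTY) ? 0 : (counts >>> PARTIES_SHIFT) - (counts & UNARRIVED_MASK);
  }

  public static void main(String[] args) {
    long state = pack(5, 3, 2);  // phase 5, 3 parties, 2 of them not yet arrived
    System.out.println(phaseOf(state));      // prints 5
    System.out.println(partiesOf(state));    // prints 3
    System.out.println(unarrivedOf(state));  // prints 2
    System.out.println(arrivedOf(state));    // prints 1
    // Setting the sign bit makes the extracted phase negative, which is how
    // a terminated phaser ends up reporting a negative phase number.
    System.out.println(phaseOf(state | TERMINATION_BIT) < 0);  // prints true
  }
}
```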

Hats off to Doug Lea!