Phaser Concurrent Phaser

Phaser, introduced in JDK1.7, is a complex and powerful synchronization helper class that is a comprehensive upgrade to the synchronization utility classes CountDownLatch and CyclicBarrier and supports waiting scenarios in stages.

Recall that CountDownLatch specifies N threads, and the other threads wait until the N threads are finished, while CyclicBarrier specifies N threads. Such as N threads here everyone working at the same time (multiple travelers can meet to travel, go to need to wait for the later), and Phaser is a combination of both, can understand first specified N threads, such as N threads here began to do the first phase of the work, after the first stage all threads are finished work, then N threads started to do the second stage of the work, Until all phases have completed their work and the program is finished, it is important to note that each phase can add or remove threads as required by the business. It is not necessary to specify the number of threads per phase at the beginning.

An introduction to experience

The concept may not be easy to understand, so let’s start with a small demo

Public Class PhaserDemo1 {private static Random Random = new Random(System.currentTimemillis ()); public static void main(String[] args) { Phaser phaser = new Phaser(); // Register the thread with phaser phaser.register(); for (int i = 0; i <5 ; i++) { Task task = new Task(phaser); task.start(); } phaser.arriveAndAwaitAdvance(); System.out.println("all task execute close"); } static class Task extends Thread{ Phaser phaser; public Task(Phaser phaser){ this.phaser = phaser; this.phaser.register(); } @override public void run() {try {system.out.println (thread.currentThread ().getName()+" execute "); TimeUnit.SECONDS.sleep(random.nextInt(5)); System.out.println(thread.currentThread ().getName()+" execute done "); / / in the similar CountDownLatch await phaser. ArriveAndAwaitAdvance (); } catch (InterruptedException e) { e.printStackTrace(); }}}}Copy the code

Phaser. register registers the thread with phaser, so why register the main thread?

Actually very simple main thread needs to wait for all children to continue to perform under the thread has been completed so must phaser. ArriveAndAwaitAdvance (); Block waiting for, and this statement is the meaning of the current thread has reached the barrier, after this wait for a period of time and conditions meet the need to continue down a barrier, if you are not the main thread of phaser. Register, direct call phaser. ArriveAndAwaitAdvance, It is mentioned in the source code that exceptions are possible, so you must register phaser.register() in the main program;

/* <p>It is a usage error for an unregistered party to invoke this * method. However, this error may result in an {@code * IllegalStateException} only upon some subsequent operation on * this phaser, An unregistered party calling this function is using an error method. However, this may lead to wrong {@ codeIllegalStateException} only in some subsequent operating the phaser, if any.Copy the code

Phaser solves subject-based exam questions

It’s not clear from the experience examples that this is an advantage. CountDownLatch is perfectly fine for the appeal scenario, so let’s switch to a different scenario to illustrate Phaser’s advantages.

Suppose a school holds a final exam, and there are three exams: Chinese, math and English. Students are allowed to hand in their papers in advance for each course, and the next exam can be held only after all students complete the exam. This is a typical phased task processing, as shown in the figure below.

The appeal scenario is semantically described as follows

public class PhaserExam { public static Random random = new Random(System.currentTimeMillis()); Public static void main(String[] args) {// Register Phaser Phaser = new Phaser(2); for (int i = 0; i <2 ; i++) { Exam exam = new Exam(phaser,random.nextLong()); exam.start(); } } static class Exam extends Thread{ Phaser phaser; Long id; public Exam(Phaser phaser,Long id){ this.phaser = phaser; this.id = id; } @override public void run() {try {system.out.println (thread.currentThread ().getName()+"=== "); TimeUnit.SECONDS.sleep(random.nextInt(5)); System.out.println(thread.currentThread ().getName()+"=== end "); phaser.arriveAndAwaitAdvance(); System.out.println(thread.currentThread ().getName()+"=== "); TimeUnit.SECONDS.sleep(random.nextInt(5)); System.out.println(thread.currentThread ().getName()+"=== end "); phaser.arriveAndAwaitAdvance(); System.out.println(thread.currentThread ().getName()+"=== "); TimeUnit.SECONDS.sleep(random.nextInt(5)); System.out.println(thread.currentThread ().getName()+"=== end "); phaser.arriveAndAwaitAdvance(); } catch (InterruptedException e) { e.printStackTrace(); }}}}Copy the code

The code execution result is as follows. You can see that the three phases wait for all threads to complete execution before proceeding, which is equivalent to multiple fences.

Note that the number of parties, or threads, that are built through the Phaser class constructor needs to correspond to the number of loops, otherwise it may affect subsequent phases.

Two important states

There are two important states within a Phaser, phase and Party, that are hard to understand at first glance, and they are defined as follows.

Phase is a phase, such as the Chinese, math and English tests mentioned above, each of which corresponds to a phase. However, phase starts from 0. When all tasks are completed and it is ready to enter the next phase, phase will increase by one.

Party corresponds to the number of Phaser threads registered, and the initial value of party can take two forms

  • The first method is to initialize the party value through a Phaser’s parameter construction.

  • Methods using dynamic registration method phaser. The register () or phaser. BulkRegister (threads) to specify the number of threads, cancellation of thread calls phaser. ArriveAndDeregister party value is minus one () method.

Phaser commonly used API

The common Phaser apis are summarized below

Public final int getPhase(); Public int register(); Public int bulkRegister(int parties); Public int getRegisteredParties(); public int getRegisteredParties(); Public int arriveAndAwaitAdvance(); Public int arriveAndDeregister() public int arriveAndDeregister(); Public int getArrivedParties(); Public int getUnarrivedParties(); Public Boolean isTerminated(); // End Phaser public void forceTermination();Copy the code

The code is shown below

public class PhaserApiTest { public static void main(String[] args) throws InterruptedException { Phaser phaser = new Phaser(5); Println (" current phase "+phaser.getPhase()); system.out.println (" current phase "+ Phaser.getPhase ()); System. The out. Println (" the registered number of threads = = = "+ phaser. GetRegisteredParties ()); // Register a thread with phaser.register(); System. The out. Println (" the registered number of threads = = = "+ phaser. GetRegisteredParties ()); BulkRegister (4); // Register multiple threads with phaser. System. The out. Println (" the registered number of threads = = = "+ phaser. GetRegisteredParties ()); New Thread (() - > {/ / arrived and waiting for the phaser arriveAndAwaitAdvance (); System. The out. Println (Thread. CurrentThread (). The getName () + "= = = 1"); }).start(); New Thread (() - > {/ / to not wait and cancellation of a Thread from the phaser phaser. ArriveAndDeregister (); System. The out. Println (Thread. CurrentThread (). The getName () + "= = = 2"); }).start(); TimeUnit.SECONDS.sleep(3); System.out.println(" Number of reached threads ==="+ phaser.getarrivedparties ()); System. The out. Println (" the number of threads, = = = "+ phaser. GetUnarrivedParties ()); System.out.println("Phaser is over "+ phaser.isterminated ()); phaser.forceTermination(); System.out.println("Phaser is over "+ phaser.isterminated ()); }}Copy the code

The following information is displayed

ArriveAndAwaitAdvance parsing

ArriveAndAwaitAdvance is an important Phaser blocking API. ArriveAndAwaitAdvance is a combination of the Arrive and awaitAdvance methods

  • Arrive: Barrier reached but not blocked, returns the stage number reached.

  • AwaitAdvance (int) : Stage number that receives an int value and blocks at the specified barrier.

The test code is as follows

public class PhaserTestArrive { public static Random random = new Random(System.currentTimeMillis()); public static void main(String[] args) { Phaser phaser = new Phaser(5); for (int i = 0; i <5 ; i++) { new Task(i,phaser).start(); } phaser.register(); // The main thread needs to call ARRIVE because the sixth thread registered by the main thread has not arrived yet and needs to arrive manually to call awaitAdvance blocking barrier phaser.arrive(); Phaser. AwaitAdvance (0); Phaser. AwaitAdvance (0); System.out.println("all task is end"); } static class Task extends Thread{ Phaser phaser; public Task(int num,Phaser phaser){ super("Thread--"+String.valueOf(num)); this.phaser = phaser; } @Override public void run() { try { System.out.println(Thread.currentThread().getName()+"===task1 is start"); TimeUnit.SECONDS.sleep(random.nextInt(3)); System.out.println(Thread.currentThread().getName()+"===task1 is end"); // Arrive without waiting for phaser.arrive(); System.out.println(Thread.currentThread().getName()+"===task2 is start"); TimeUnit.SECONDS.sleep(random.nextInt(3)); System.out.println(Thread.currentThread().getName()+"===task2 is end"); } catch (InterruptedException e) { e.printStackTrace(); }}}}Copy the code

Interrupt response

We only need to pay special attention to is all Phaser API awaitAdvanceInterruptibly is interrupt response and all the rest will not interrupt response so don’t need to exception handling, demonstration as follows

public static void main(String[] args) { Phaser phaser = new Phaser(3); Thread T1 = new Thread(()->{ try { phaser.awaitAdvanceInterruptibly(phaser.getPhase()); } catch (InterruptedException e) {system.out.println (" InterruptedException "); e.printStackTrace(); } //phaser.arriveAndAwaitAdvance(); }); T1.start(); T1.interrupt(); phaser.arriveAndAwaitAdvance(); }Copy the code

This article uses the article synchronization assistant to synchronize