This article is based on JDK 1.7 and draws on the book The Art of Java Concurrent Programming to explain the Fork/Join framework.

What is the Fork/Join framework

The Fork/Join framework, introduced in Java 7, is a framework for executing tasks in parallel. It splits a large task into several smaller tasks and then combines the results of those smaller tasks to obtain the result of the large task.

Its main idea is divide and conquer.

Work-stealing algorithm

In a work-stealing algorithm, a thread steals tasks from other threads' queues and executes them.

Why use a work-stealing algorithm? Suppose we have a large task. We can divide it into a number of mutually independent subtasks and, to reduce contention between threads, put those subtasks into separate queues, creating one thread per queue to execute the tasks in it. Threads and queues correspond one to one; for example, thread A handles the tasks in queue A. However, some threads finish the tasks in their own queue early while other threads still have tasks waiting. Rather than sit idle, a thread that has finished can help out by stealing a task from another thread's queue and executing it. To reduce contention between the thread being stolen from and the stealing thread, a double-ended queue (deque) is usually used: the thread that owns the queue always takes tasks from the head of the deque, while the stealing thread always takes tasks from the tail.

The advantage of work stealing is that it makes full use of the threads for parallel computation while reducing contention between them. Its disadvantages are that contention still occurs in some cases, for example when a deque holds only one task, and that it consumes more system resources, because it creates multiple threads and multiple deques.
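To make the deque discipline concrete, here is a minimal sketch; it is not how ForkJoinPool itself is implemented. Each worker owns a java.util.concurrent.ConcurrentLinkedDeque, takes its own tasks from the head with pollFirst(), and steals from the tail of other workers' deques with pollLast(). The class name WorkStealingDemo and the fixed round-robin choice of victim (instead of a random one) are assumptions made for illustration. The tasks are simple counters and no new tasks are added once the workers start, so a worker can stop as soon as every deque it checks is empty.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ConcurrentLinkedDeque;
import java.util.concurrent.atomic.AtomicInteger;

public class WorkStealingDemo {

    /**
     * Gives worker i tasksPerQueue[i] trivial tasks in its own deque; workers steal when they run dry.
     * Returns how many tasks were executed in total.
     */
    public static int run(int[] tasksPerQueue) {
        final int n = tasksPerQueue.length;
        final AtomicInteger executed = new AtomicInteger();
        final List<ConcurrentLinkedDeque<Runnable>> queues = new ArrayList<>();
        for (int count : tasksPerQueue) {
            ConcurrentLinkedDeque<Runnable> q = new ConcurrentLinkedDeque<>();
            for (int j = 0; j < count; j++) {
                q.addLast(executed::incrementAndGet); // each task just records that it ran
            }
            queues.add(q);
        }

        Thread[] workers = new Thread[n];
        for (int i = 0; i < n; i++) {
            final int self = i;
            workers[i] = new Thread(() -> {
                while (true) {
                    // Owner: take from the head of its own deque.
                    Runnable task = queues.get(self).pollFirst();
                    // Thief: own deque is empty, so try the tail of the other deques in turn.
                    for (int k = 1; task == null && k < n; k++) {
                        task = queues.get((self + k) % n).pollLast();
                    }
                    if (task == null) {
                        return; // every deque was empty and no tasks are added later, so stop
                    }
                    task.run();
                }
            });
            workers[i].start();
        }

        for (Thread w : workers) {
            try {
                w.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException("interrupted while waiting for workers", e);
            }
        }
        return executed.get();
    }

    public static void main(String[] args) {
        // Deliberately unbalanced: worker 0 starts with almost all of the work.
        System.out.println(run(new int[] {1000, 10, 0, 5})); // prints 1015
    }
}
```

Even though worker 0 starts with nearly all the tasks, the other workers drain its deque from the tail, and every task runs exactly once because each poll removes a task atomically.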

Design

If we were to design a framework like Fork/Join, it would take two steps:

Step 1: split the task. We need a fork class that splits a large task into subtasks. A subtask may itself still be large, so splitting continues until every subtask is small enough.

Step 2: execute the tasks and merge the results. The subtasks are placed in double-ended queues, and several threads are started to take tasks from those deques and execute them. The results of the subtasks are all placed in one queue, and another thread is started to take results from that queue and merge them.

Fork/Join uses two classes to carry out these two steps:

  • ForkJoinTask: To use the Fork/Join framework, we must first create a ForkJoin task. ForkJoinTask provides the mechanism for performing fork() and join() operations on tasks. Usually we do not subclass ForkJoinTask directly; instead we subclass one of the two subclasses the framework provides:

    ◦ RecursiveAction: for tasks that do not return a result.
    ◦ RecursiveTask: for tasks that return a result.

  • ForkJoinPool: ForkJoinTasks are executed by a ForkJoinPool. Subtasks split off by a task are added to the deque maintained by the current worker thread, at the head of that deque. When a worker thread temporarily has no tasks in its own deque, it takes a task from the tail of a randomly chosen worker thread's deque.
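The example later in this article uses RecursiveTask. For the RecursiveAction case, here is a minimal sketch; the class name DoubleArrayAction and the threshold of 1000 are illustrative assumptions. It doubles every element of an array in place, so compute() returns void and there is no result to join.

```java
import java.util.Arrays;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveAction;

public class DoubleArrayAction extends RecursiveAction {
    private static final int THRESHOLD = 1000; // illustrative cut-off, not tuned
    private final long[] data;
    private final int from; // inclusive
    private final int to;   // exclusive

    public DoubleArrayAction(long[] data, int from, int to) {
        this.data = data;
        this.from = from;
        this.to = to;
    }

    @Override
    protected void compute() {
        if (to - from <= THRESHOLD) {
            // Small enough: do the work directly. There is nothing to return.
            for (int i = from; i < to; i++) {
                data[i] *= 2;
            }
        } else {
            // Too big: split in half and run both halves, waiting until both are done.
            int mid = (from + to) >>> 1;
            invokeAll(new DoubleArrayAction(data, from, mid),
                      new DoubleArrayAction(data, mid, to));
        }
    }

    /** Doubles every element of data using a dedicated pool. */
    public static void doubleAll(long[] data) {
        ForkJoinPool pool = new ForkJoinPool();
        try {
            pool.invoke(new DoubleArrayAction(data, 0, data.length));
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        long[] data = new long[10_000];
        Arrays.fill(data, 21);
        doubleAll(data);
        System.out.println(data[0] + " " + data[data.length - 1]); // prints "42 42"
    }
}
```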

Usage

Use the Fork/Join framework to compute 1 + 2 + 3 + … + 100000000.

When using the Fork/Join framework, the first thing to consider is how to split the task. If we want each subtask to add up at most about 10,000 numbers, we set the splitting threshold to 10000. Since we are summing 100000000 numbers, the range is repeatedly divided in two. The first split gives 1~50000000 and 50000001~100000000; the second split divides 1~50000000 into 1~25000000 and 25000001~50000000, and 50000001~100000000 into 50000001~75000000 and 75000001~100000000, and so on, until the difference between the start and end of every range is less than or equal to 10,000.
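The halving rule can be checked without running any threads. The hypothetical helper SplitSketch.leaves applies the same midpoint and threshold test as the CountTask class that follows and collects the resulting leaf ranges. It is only a sequential sketch of the splitting step, not part of the framework.

```java
import java.util.ArrayList;
import java.util.List;

public class SplitSketch {

    /** Collects the leaf ranges produced by CountTask's halving rule. */
    public static List<long[]> leaves(long start, long end, long threshold) {
        List<long[]> out = new ArrayList<>();
        split(start, end, threshold, out);
        return out;
    }

    private static void split(long start, long end, long threshold, List<long[]> out) {
        if (end - start <= threshold) {
            out.add(new long[] {start, end});     // small enough: this range becomes a leaf task
            return;
        }
        long middle = (start + end) / 2;          // same midpoint as CountTask
        split(start, middle, threshold, out);     // left half: start..middle
        split(middle + 1, end, threshold, out);   // right half: middle+1..end
    }

    public static void main(String[] args) {
        List<long[]> ranges = leaves(1, 100_000_000L, 10_000);
        System.out.println("leaf tasks: " + ranges.size());                      // leaf tasks: 16384
        System.out.println("first leaf: " + ranges.get(0)[0] + "~" + ranges.get(0)[1]); // first leaf: 1~6104
    }
}
```

For 1 to 100000000 every range is split 14 times, giving 2^14 = 16384 leaf tasks of about 6,100 numbers each, so no leaf comes close to the 10,000 limit.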

import java.util.concurrent.ExecutionException;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.ForkJoinTask;
import java.util.concurrent.RecursiveTask;

public class CountTask extends RecursiveTask<Long> {

    /** The splitting threshold */
    private static final long THRESHOLD = 10000;
    // start of the range (inclusive)
    private final long start;
    // end of the range (inclusive)
    private final long end;

    public CountTask(long start, long end) {
        this.start = start;
        this.end = end;
    }

    @Override
    protected Long compute() {
        long sum = 0;
        // Compute directly if the task is small enough
        boolean canCompute = (end - start) <= THRESHOLD;
        if (canCompute) {
            for (long i = start; i <= end; i++) {
                sum += i;
            }
        } else {
            // Otherwise, split the large task in half
            long middle = (start + end) / 2;
            CountTask left = new CountTask(start, middle);
            CountTask right = new CountTask(middle + 1, end);
            // Execute both subtasks; invokeAll returns once both are done
            invokeAll(left, right);
            // Get the subtask results and merge them
            long lResult = left.join();
            long rResult = right.join();
            sum = lResult + rResult;
        }
        return sum;
    }

    public static void main(String[] args) {
        long s = System.currentTimeMillis();
        // ForkJoinPool.commonPool() requires JDK 8 or later
        ForkJoinPool pool = ForkJoinPool.commonPool();
        CountTask countTask = new CountTask(1, 100000000);   // arguments are the start and end values
        ForkJoinTask<Long> result = pool.submit(countTask);
        // Skip reading the result if the task has already failed or been cancelled
        if (!result.isCompletedAbnormally()) {
            try {
                // get() blocks until the task completes, then returns its result
                System.out.println("Fork/Join calculation: " + result.get());
            } catch (InterruptedException e) {
                e.printStackTrace();
            } catch (ExecutionException e) {
                e.printStackTrace();
            }
        }
        System.out.println("Fork/Join calculation time: " + (System.currentTimeMillis() - s) + "ms");

        s = System.currentTimeMillis();
        long sum = 0;
        for (int i = 1; i <= 100000000; i++) {
            sum += i;
        }
        System.out.println("Calculation result: " + sum);
        System.out.println("Common calculation time: " + (System.currentTimeMillis() - s) + "ms");
    }
}

Fork/Join calculation: 5000000050000000
Fork/Join calculation time: 53ms
Calculation result: 5000000050000000
Common calculation time: 55ms

There are three ways to submit a task to a ForkJoinPool:

  1. execute(): executes the task asynchronously and returns nothing.
  2. invoke(): executes the task synchronously: the caller waits for the task to complete, receives its result, and only then continues with the following code.
  3. submit(): executes the task asynchronously and immediately returns a Future (a ForkJoinTask) that can be used to check the task's status and obtain its result; calling get() on it blocks until the task completes.
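A minimal sketch of the three submission styles, assuming a dedicated two-thread pool and a hypothetical RangeSum task that is deliberately not split, so that the focus stays on the calls themselves:

```java
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.ForkJoinTask;
import java.util.concurrent.RecursiveTask;

public class SubmitStyles {

    /** Hypothetical leaf task that sums start..end in one go (no splitting). */
    static class RangeSum extends RecursiveTask<Long> {
        private final long start;
        private final long end;

        RangeSum(long start, long end) {
            this.start = start;
            this.end = end;
        }

        @Override
        protected Long compute() {
            long sum = 0;
            for (long i = start; i <= end; i++) {
                sum += i;
            }
            return sum;
        }
    }

    /** Submits the same computation three ways and returns the three results. */
    public static long[] runAll() {
        ForkJoinPool pool = new ForkJoinPool(2);
        try {
            // 1. execute(): asynchronous, and the call itself returns nothing (void).
            RangeSum viaExecute = new RangeSum(1, 100);
            pool.execute(viaExecute);
            long a = viaExecute.join(); // the task object we kept can still be joined later

            // 2. invoke(): the caller blocks until the task finishes and gets the result back.
            long b = pool.invoke(new RangeSum(1, 100));

            // 3. submit(): returns a ForkJoinTask (which is a Future) right away; get() blocks.
            ForkJoinTask<Long> future = pool.submit(new RangeSum(1, 100));
            long c = future.get();

            return new long[] {a, b, c};
        } catch (InterruptedException | ExecutionException e) {
            throw new IllegalStateException(e);
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        long[] r = runAll();
        System.out.println(r[0] + " " + r[1] + " " + r[2]); // prints "5050 5050 5050"
    }
}
```

execute() really does return void; the only way to get a result afterwards is to keep a reference to the task and join() it, as the sketch does.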
Java 8 also added a shared common pool, obtained as follows:
ForkJoinPool commonPool = ForkJoinPool.commonPool();

ForkJoinPool.commonPool() returns a reference to this common pool. Using the predefined common pool reduces resource consumption, because it avoids creating a separate thread pool for every task.
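As a small illustration, assuming a hypothetical Square task: callers reuse ForkJoinPool.commonPool() instead of creating and shutting down a pool of their own, and every call to commonPool() returns the same shared instance. ForkJoinPool.getCommonPoolParallelism() reports the common pool's target parallelism; the exact value depends on the machine.

```java
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveTask;

public class CommonPoolSketch {

    /** Hypothetical tiny task: squares a number. */
    static class Square extends RecursiveTask<Integer> {
        private final int x;

        Square(int x) {
            this.x = x;
        }

        @Override
        protected Integer compute() {
            return x * x;
        }
    }

    /** Runs the task on the shared common pool rather than on a pool created per call. */
    public static int squareOnCommonPool(int x) {
        return ForkJoinPool.commonPool().invoke(new Square(x));
    }

    /** commonPool() hands back the same shared instance on every call. */
    public static boolean isShared() {
        return ForkJoinPool.commonPool() == ForkJoinPool.commonPool();
    }

    public static void main(String[] args) {
        System.out.println(squareOnCommonPool(12)); // prints 144
        System.out.println("shared: " + isShared());
        System.out.println("parallelism: " + ForkJoinPool.getCommonPoolParallelism());
    }
}
```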

Check the running status of tasks

  • isDone() returns true once the task has finished, however it finished (normally, with an exception, or by cancellation);
  • isCompletedNormally() returns true if the task completed without being cancelled and without throwing an exception;
  • isCancelled() returns true if the task was cancelled;
  • isCompletedAbnormally() returns true if the task was cancelled or threw an exception.
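The following sketch, built around a hypothetical MaybeFail task, collects the four flags for a task that completed normally, one that threw, one cancelled before it ran, and one never started. It uses quietlyInvoke(), which runs a task and waits for it without rethrowing its exception.

```java
import java.util.Arrays;
import java.util.concurrent.ForkJoinTask;
import java.util.concurrent.RecursiveTask;

public class StatusSketch {

    /** Hypothetical task that returns 1, or throws if fail is true. */
    static class MaybeFail extends RecursiveTask<Integer> {
        private final boolean fail;

        MaybeFail(boolean fail) {
            this.fail = fail;
        }

        @Override
        protected Integer compute() {
            if (fail) {
                throw new IllegalStateException("boom");
            }
            return 1;
        }
    }

    /** {isDone, isCompletedNormally, isCancelled, isCompletedAbnormally} for one task. */
    static boolean[] status(ForkJoinTask<?> t) {
        return new boolean[] {
            t.isDone(), t.isCompletedNormally(), t.isCancelled(), t.isCompletedAbnormally()
        };
    }

    /** Flags for a normal, a failed, a cancelled and a never-started task, in that order. */
    public static boolean[][] demo() {
        MaybeFail ok = new MaybeFail(false);
        ok.quietlyInvoke();          // runs in the calling thread and never throws

        MaybeFail failed = new MaybeFail(true);
        failed.quietlyInvoke();      // the exception is recorded in the task instead of thrown

        MaybeFail cancelled = new MaybeFail(false);
        cancelled.cancel(true);      // cancelled before it ever runs

        MaybeFail pending = new MaybeFail(false); // never started

        return new boolean[][] {status(ok), status(failed), status(cancelled), status(pending)};
    }

    public static void main(String[] args) {
        for (boolean[] row : demo()) {
            System.out.println(Arrays.toString(row));
        }
    }
}
```

The expected rows are [true, true, false, false] for the normal task, [true, false, false, true] for the failed one, [true, false, true, true] for the cancelled one (cancellation counts as abnormal completion), and all false for the task that never started.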

Exception handling

A ForkJoinTask may throw an exception while it is executing, but the main thread cannot catch that exception directly, so ForkJoinTask provides the isCompletedAbnormally() method to check whether a task threw an exception or was cancelled, and the getException() method to obtain the exception. For example:

if(task.isCompletedAbnormally()) {
    System.out.println(task.getException());
}

getException() returns the Throwable thrown by the task, or a CancellationException if the task was cancelled. It returns null if the task has not completed yet or completed without throwing an exception.
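Expanding the snippet into a runnable sketch (the Divide task and the describe helper are hypothetical): tasks run on a pool, the caller waits for them with quietlyJoin(), which does not throw, and then inspects them with isCompletedAbnormally() and getException().

```java
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.ForkJoinTask;
import java.util.concurrent.RecursiveTask;

public class ExceptionSketch {

    /** Hypothetical task: 100 / divisor, which fails with ArithmeticException when divisor is 0. */
    static class Divide extends RecursiveTask<Integer> {
        private final int divisor;

        Divide(int divisor) {
            this.divisor = divisor;
        }

        @Override
        protected Integer compute() {
            return 100 / divisor;
        }
    }

    /** Checks isCompletedAbnormally() first, then reads getException() or the result. */
    static String describe(ForkJoinTask<Integer> task) {
        if (task.isCompletedAbnormally()) {
            // The recorded Throwable, or a CancellationException if the task was cancelled.
            return "abnormal: " + task.getException().getClass().getSimpleName();
        }
        // Completed normally: join() will not throw and getException() is null.
        return "normal: " + task.join() + ", exception=" + task.getException();
    }

    public static String[] demo() {
        ForkJoinPool pool = new ForkJoinPool(2);
        try {
            Divide ok = new Divide(4);
            Divide bad = new Divide(0);
            Divide cancelled = new Divide(5);
            cancelled.cancel(true);   // cancelled before it is ever submitted
            pool.execute(ok);
            pool.execute(bad);
            ok.quietlyJoin();         // wait for completion without throwing
            bad.quietlyJoin();
            return new String[] {describe(ok), describe(bad), describe(cancelled)};
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        for (String line : demo()) {
            System.out.println(line);
        }
    }
}
```

demo() should return "normal: 25, exception=null", "abnormal: ArithmeticException" and "abnormal: CancellationException". When getException() is called from a thread other than the one that threw, the JDK may return a new exception of the same type whose cause is the original, so compare exception types rather than object identity.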

Differences from ExecutorService

Fork/Join uses work stealing: when a task executes, it can be broken into smaller subtasks that are added to the current thread's queue, and an idle thread can then steal a task from the queue of a randomly chosen thread and execute it.

For example, when threads are running on two different CPUs and one of them runs out of work, it steals a task from the tail of the other thread's queue instead of sitting idle, so Fork/Join can use CPU resources more efficiently than a traditional thread pool.

Implementation principles

ForkJoinPool consists of a ForkJoinTask array and a ForkJoinWorkerThread array: the ForkJoinTask array stores the tasks that programs submit to the pool, and the ForkJoinWorkerThread array holds the worker threads that execute those tasks.

When fork() is called on a ForkJoinTask, it calls pushTask on the current ForkJoinWorkerThread to schedule the task for asynchronous execution and then returns immediately (fork() returns the task itself, not its result). The code is as follows:

public final ForkJoinTask<V> fork() {
    // JDK 7: casts the current thread, so fork() must be called from a pool worker thread
    ((ForkJoinWorkerThread) Thread.currentThread())
        .pushTask(this);
    return this;
}
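Because fork() returns immediately, a common way to use it (the RecursiveTask Javadoc's Fibonacci example follows this shape) is to fork one half, compute the other half in the current thread, and only then join the forked half. ForkComputeJoin is a hypothetical variant of CountTask written that way; it assumes JDK 8 or later for ForkJoinPool.commonPool().

```java
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveTask;

public class ForkComputeJoin extends RecursiveTask<Long> {
    private static final long THRESHOLD = 10_000; // same cut-off as CountTask
    private final long start;
    private final long end;

    public ForkComputeJoin(long start, long end) {
        this.start = start;
        this.end = end;
    }

    @Override
    protected Long compute() {
        if (end - start <= THRESHOLD) {
            long sum = 0;
            for (long i = start; i <= end; i++) {
                sum += i;
            }
            return sum;
        }
        long middle = (start + end) / 2;
        ForkComputeJoin left = new ForkComputeJoin(start, middle);
        ForkComputeJoin right = new ForkComputeJoin(middle + 1, end);
        left.fork();                        // push the left half onto this worker's deque; returns at once
        long rightResult = right.compute(); // work on the right half in the current thread
        long leftResult = left.join();      // then wait for the left half, or run it here if nobody stole it
        return leftResult + rightResult;
    }

    /** Sums start..end on the common pool. */
    public static long sum(long start, long end) {
        return ForkJoinPool.commonPool().invoke(new ForkComputeJoin(start, end));
    }

    public static void main(String[] args) {
        System.out.println(sum(1, 100_000_000L)); // prints 5000000050000000
    }
}
```

Compared with forking both halves, this keeps the current worker busy with the right half instead of leaving it waiting.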

pushTask stores the current task in the worker's ForkJoinTask array queue and then calls ForkJoinPool's signalWork() method to wake up or create a worker thread to execute the task. The code is as follows:

final void pushTask(ForkJoinTask<?> t) {
    ForkJoinTask<?>[] q; int s, m;
    if ((q = queue) != null) {    // ignore if queue removed
        // slot = queueTop masked by (capacity - 1), converted to a raw array offset
        long u = (((s = queueTop) & (m = q.length - 1)) << ASHIFT) + ABASE;
        UNSAFE.putOrderedObject(q, u, t);
        queueTop = s + 1;         // or use putOrderedInt
        if ((s -= queueBase) <= 2)
            pool.signalWork();    // few queued tasks: wake up or create a worker
        else if (s == m)
            growQueue();          // the array is now full: enlarge it
    }
}
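The indexing in pushTask is easier to follow without Unsafe and without concurrency. The following single-threaded model is hypothetical (MiniWorkQueue, with fields named after queueTop and queueBase). It keeps the power-of-two array, the top & (length - 1) slot calculation and the doubling when the array fills up, and adds an owner-side pop() and a thief-side steal(). It is a sketch of the idea only: the real JDK 7 code publishes slots with ordered writes so that stealing threads can read them safely, which this model does not attempt.

```java
public class MiniWorkQueue<T> {
    private Object[] queue = new Object[4]; // capacity must stay a power of two
    private int queueTop;  // next slot the owner pushes into
    private int queueBase; // next slot a thief steals from

    /** Owner pushes at the top; the slot is top & (length - 1), as in pushTask. */
    public void push(T task) {
        int m = queue.length - 1;
        queue[queueTop & m] = task;
        queueTop++;
        if (queueTop - queueBase == queue.length) {
            grow(); // array is full: double it, in the spirit of growQueue()
        }
    }

    /** Owner takes its most recently pushed task (LIFO end). */
    @SuppressWarnings("unchecked")
    public T pop() {
        if (queueTop == queueBase) {
            return null;
        }
        int m = queue.length - 1;
        queueTop--;
        T t = (T) queue[queueTop & m];
        queue[queueTop & m] = null;
        return t;
    }

    /** A thief takes the oldest task from the base end (FIFO end). */
    @SuppressWarnings("unchecked")
    public T steal() {
        if (queueTop == queueBase) {
            return null;
        }
        int m = queue.length - 1;
        T t = (T) queue[queueBase & m];
        queue[queueBase & m] = null;
        queueBase++;
        return t;
    }

    public int size() {
        return queueTop - queueBase;
    }

    private void grow() {
        Object[] old = queue;
        int oldMask = old.length - 1;
        Object[] bigger = new Object[old.length * 2];
        int newMask = bigger.length - 1;
        // Move each live element to the slot its logical index maps to in the larger array.
        for (int i = queueBase; i != queueTop; i++) {
            bigger[i & newMask] = old[i & oldMask];
        }
        queue = bigger;
    }

    public static void main(String[] args) {
        MiniWorkQueue<String> q = new MiniWorkQueue<>();
        for (String s : new String[] {"t1", "t2", "t3", "t4", "t5"}) {
            q.push(s);
        }
        System.out.println(q.pop());   // t5: the owner works on its newest task
        System.out.println(q.steal()); // t1: a thief takes the oldest task
        System.out.println(q.size());  // 3
    }
}
```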

ForkJoinTask's join() method first calls doJoin(), which uses the task's status (NORMAL, CANCELLED, SIGNAL or EXCEPTIONAL) to determine what join() returns:

  • If the task completed normally, its result is returned.
  • If the task was cancelled, a CancellationException is thrown.
  • If the task threw an exception, that exception is rethrown.
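The three outcomes can be observed through the public API. In this sketch (the Job task and the joinOutcome helper are hypothetical), join() returns the result of a normally completed task, throws CancellationException for a cancelled task, and rethrows the exception of a failed task.

```java
import java.util.concurrent.CancellationException;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveTask;

public class JoinOutcomes {

    /** Hypothetical task: returns "done", or throws if fail is true. */
    static class Job extends RecursiveTask<String> {
        private final boolean fail;

        Job(boolean fail) {
            this.fail = fail;
        }

        @Override
        protected String compute() {
            if (fail) {
                throw new IllegalArgumentException("bad input");
            }
            return "done";
        }
    }

    /** Calls join() and describes what happened. */
    static String joinOutcome(Job job) {
        try {
            return "result " + job.join();         // completed normally
        } catch (CancellationException e) {       // must come first: it is a RuntimeException too
            return "cancelled";
        } catch (RuntimeException e) {            // the task's own exception, rethrown by join()
            return "threw " + e.getClass().getSimpleName();
        }
    }

    public static String[] demo() {
        ForkJoinPool pool = new ForkJoinPool(2);
        try {
            Job normal = new Job(false);
            Job failing = new Job(true);
            Job cancelled = new Job(false);
            cancelled.cancel(true);               // cancelled before it is ever submitted
            pool.execute(normal);
            pool.execute(failing);
            return new String[] {joinOutcome(normal), joinOutcome(failing), joinOutcome(cancelled)};
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        for (String s : demo()) {
            System.out.println(s);
        }
    }
}
```

demo() should produce "result done", "threw IllegalArgumentException" and "cancelled".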

Let’s look at the implementation code of the doJoin() method:

private int doJoin() {
    Thread t; ForkJoinWorkerThread w; int s; boolean completed;
    if ((t = Thread.currentThread()) instanceof ForkJoinWorkerThread) {
        if ((s = status) < 0)            // negative status: the task has already finished
            return s;
        // still on top of this worker's own queue: take it back and run it here
        if ((w = (ForkJoinWorkerThread)t).unpushTask(this)) {
            try {
                completed = exec();
            } catch (Throwable rex) {
                return setExceptionalCompletion(rex);
            }
            if (completed)
                return setCompletion(NORMAL);
        }
        return w.joinTask(this);         // otherwise wait for the task (e.g. it was stolen)
    }
    else
        return externalAwaitDone();      // the caller is not a pool worker thread
}

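doJoin() first checks the task's status to see whether the task has already finished (a negative status); if so, it returns that status directly. If not, and the current thread is a worker, it tries to take the task off its own queue (unpushTask) and execute it. If the task executes successfully, its status is set to NORMAL; if an exception occurs, the exception is recorded and the status is set to EXCEPTIONAL. If the task cannot be taken off the queue, for example because another thread has stolen it, the worker waits for it in joinTask(); a thread that is not a ForkJoinWorkerThread waits in externalAwaitDone() instead.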
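To summarize the control flow of join() and doJoin() without the worker-thread machinery, here is a single-threaded model. MiniJoin and its status constants are hypothetical: only the convention that a negative status means "finished" is taken from the code above. Unlike the real join(), this model wraps a failure in a RuntimeException instead of rethrowing the original exception.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.CancellationException;

public class MiniJoin<V> {
    // Hypothetical codes; only "negative means finished" mirrors the (s = status) < 0 check in doJoin().
    static final int NORMAL = -1;
    static final int CANCELLED = -2;
    static final int EXCEPTIONAL = -3;

    private final Callable<V> body;
    private int status;        // 0 = not run yet
    private V result;
    private Throwable exception;

    public MiniJoin(Callable<V> body) {
        this.body = body;
    }

    /** Cancels the task if it has not finished yet. */
    public void cancel() {
        if (status >= 0) {
            status = CANCELLED;
        }
    }

    /** Single-threaded analogue of doJoin(): return early if finished, otherwise run the task here. */
    int doJoin() {
        if (status < 0) {
            return status;              // already finished: report the stored status
        }
        try {
            result = body.call();       // plays the role of exec()
        } catch (Throwable ex) {
            exception = ex;             // plays the role of setExceptionalCompletion(rex)
            return status = EXCEPTIONAL;
        }
        return status = NORMAL;         // plays the role of setCompletion(NORMAL)
    }

    /** Analogue of join(): turn the final status into a result or an exception. */
    public V join() {
        int s = doJoin();
        if (s == CANCELLED) {
            throw new CancellationException();
        }
        if (s == EXCEPTIONAL) {
            throw new RuntimeException(exception); // simplification: the real join() rethrows the original type
        }
        return result;
    }

    public static void main(String[] args) {
        MiniJoin<Integer> task = new MiniJoin<>(() -> 6 * 7);
        System.out.println(task.join()); // prints 42
    }
}
```

Because a finished task keeps its negative status, a second join() returns the stored outcome without running the body again.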
