Hello, I am Xiao Hei, a migrant worker who lives on the Internet.

A new Fork/Join thread pool was introduced in JDK1.7, which can split a large task into smaller tasks to execute in parallel and aggregate the results.

Fork/Join is based on the idea of divide and conquer: a complex task is split, according to a specified threshold, into multiple simple small tasks, and the results of these small tasks are then combined to produce the final result.

Divide and conquer method

Divide and conquer is one of the most commonly used algorithm design techniques in computer science. The main idea is to decompose a problem of size N into K smaller subproblems that are independent of each other and have the same nature as the original problem. Once the subproblems are solved, their solutions are combined into the solution of the original problem.

Problem-solving steps

  • Split the problem;
  • Solve the subproblems;
  • Merge the solutions of the subproblems into the solution of the original problem.
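
The three steps above can be sketched with a plain, single-threaded merge sort. This is only an illustration of the idea, not part of the Fork/Join framework; the class name MergeSortSketch is made up for this example.

```java
import java.util.Arrays;

class MergeSortSketch {
    // Sorts the array in place using divide and conquer.
    static void mergeSort(int[] a) {
        if (a.length < 2) {
            return; // a problem of size 0 or 1 is already solved
        }
        // 1. Split the problem into two smaller subproblems of the same kind.
        int mid = a.length / 2;
        int[] left = Arrays.copyOfRange(a, 0, mid);
        int[] right = Arrays.copyOfRange(a, mid, a.length);
        // 2. Solve each subproblem recursively.
        mergeSort(left);
        mergeSort(right);
        // 3. Merge the sub-solutions into the solution of the original problem.
        int i = 0, j = 0, k = 0;
        while (i < left.length && j < right.length) {
            a[k++] = (left[i] <= right[j]) ? left[i++] : right[j++];
        }
        while (i < left.length) {
            a[k++] = left[i++];
        }
        while (j < right.length) {
            a[k++] = right[j++];
        }
    }

    public static void main(String[] args) {
        int[] data = {5, 2, 9, 1, 7};
        mergeSort(data);
        System.out.println(Arrays.toString(data)); // prints [1, 2, 5, 7, 9]
    }
}
```

The two recursive calls work on separate copies of the data, which is exactly the independence that later makes it possible to run subproblems in parallel.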

Usage scenarios

Binary search, factorial calculation, merge sort, quicksort, and the fast Fourier transform all use the idea of divide and conquer.

ForkJoin parallel processing framework

The ForkJoinPool thread pool introduced in JDK1.7 is used to execute ForkJoinTask tasks. A ForkJoinTask is a thread-like entity that is lighter than a normal thread.

The following example uses the Fork/Join framework to sum the integers from 1 to 1 billion.

import java.util.concurrent.ExecutionException;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.ForkJoinTask;
import java.util.concurrent.RecursiveTask;

public class ForkJoinMain {
    public static void main(String[] args) throws ExecutionException, InterruptedException {
        ForkJoinPool forkJoinPool = new ForkJoinPool();
        // Sum the integers from 1 to 1 billion, both ends inclusive
        ForkJoinTask<Long> rootTask = forkJoinPool.submit(new SumForkJoinTask(1L, 10_0000_0000L));
        System.out.println("Calculation result:" + rootTask.get());
    }
}

class SumForkJoinTask extends RecursiveTask<Long> {
    private final long min;
    private final long max;
    private final long threshold = 1000L;

    public SumForkJoinTask(long min, long max) {
        this.min = min;
        this.max = max;
    }

    @Override
    protected Long compute() {
        // If the range is no larger than the threshold, sum it directly
        if ((max - min) <= threshold) {
            long sum = 0;
            for (long i = min; i <= max; i++) {
                sum = sum + i;
            }
            return sum;
        }
        // Break the range [min, max] down into two smaller tasks
        long middle = (max + min) >>> 1;
        SumForkJoinTask leftTask = new SumForkJoinTask(min, middle);
        leftTask.fork();
        SumForkJoinTask rightTask = new SumForkJoinTask(middle + 1, max);
        rightTask.fork();
        // Aggregate the results
        return leftTask.join() + rightTask.join();
    }
}

The above code logic can be seen more intuitively in the following figure.

ForkJoin framework implementation

Some of the interfaces and classes that are important in the ForkJoin framework are shown in the following figure.

ForkJoinPool

ForkJoinPool is a thread pool for running ForkJoinTasks that implements the Executor interface.

A ForkJoinPool object can be created directly with new ForkJoinPool().

public ForkJoinPool() {
    this(Math.min(MAX_CAP, Runtime.getRuntime().availableProcessors()),
         defaultForkJoinWorkerThreadFactory, null, false);
}

public ForkJoinPool(int parallelism,
                    ForkJoinWorkerThreadFactory factory,
                    UncaughtExceptionHandler handler,
                    boolean asyncMode) {
    this(checkParallelism(parallelism),
         checkFactory(factory),
         handler,
         asyncMode ? FIFO_QUEUE : LIFO_QUEUE,
         "ForkJoinPool-" + nextPoolId() + "-worker-");
    checkPermission();
}

By looking at the constructor source, we can see that a ForkJoinPool is created with the following four parameters:

  • parallelism: the target parallelism level, that is, the expected number of worker threads. The default is the value of Runtime.getRuntime().availableProcessors()
  • factory: the factory used to create ForkJoinWorkerThread worker threads. The default is defaultForkJoinWorkerThreadFactory
  • handler: the handler for unrecoverable errors encountered while executing a task. The default is null
  • asyncMode: whether a worker thread takes tasks from its queue in FIFO or LIFO order. The default is false, which means LIFO
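
As a minimal sketch of how these four parameters fit together, the following builds a pool with every argument set explicitly. The class name PoolConfigSketch, the parallelism of 4, and the logging handler are assumptions chosen for illustration.

```java
import java.util.concurrent.ForkJoinPool;

class PoolConfigSketch {
    // Builds a pool with all four constructor parameters set explicitly.
    static ForkJoinPool newPool() {
        return new ForkJoinPool(
                4,                                               // parallelism
                ForkJoinPool.defaultForkJoinWorkerThreadFactory, // factory
                (thread, error) ->                               // handler
                        System.err.println(thread.getName() + " failed: " + error),
                true);                                           // asyncMode: FIFO
    }

    public static void main(String[] args) {
        ForkJoinPool pool = newPool();
        System.out.println(pool.getParallelism()); // prints 4
        System.out.println(pool.getAsyncMode());   // prints true
        pool.shutdown();
    }
}
```

For recursive tasks that fork and then join, the default LIFO mode is usually the better fit; FIFO mode is intended for tasks that are submitted but never joined.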

ForkJoinTask

ForkJoinTask is the abstract base class for tasks that run in a ForkJoinPool.

ForkJoinTask implements the Future interface and lets a small number of threads process a large number of tasks and subtasks. A task is scheduled for asynchronous execution with the fork() method, and its result is awaited with the join() method.

If you want to use ForkJoinTask to handle a large number of tasks with a small number of threads, you need to accept some restrictions.

  • Avoid synchronized methods and synchronized blocks in the split tasks;
  • Avoid blocking I/O in the subtasks; ideally, each subtask works only on variables that are accessed completely independently of other running tasks;
  • Subtasks must not throw checked exceptions.
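
The last restriction follows from the signature of compute(), which declares no checked exceptions. One common way to live with it, shown here as a sketch, is to wrap the checked exception in an unchecked one. InvalidInputException and parsePositive are hypothetical names invented for this example.

```java
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveTask;

class CheckedExceptionSketch {
    // Hypothetical checked exception used only for this illustration.
    static class InvalidInputException extends Exception {
        InvalidInputException(String message) {
            super(message);
        }
    }

    // Hypothetical helper that declares a checked exception.
    static int parsePositive(String text) throws InvalidInputException {
        int value = Integer.parseInt(text);
        if (value <= 0) {
            throw new InvalidInputException("not positive: " + text);
        }
        return value;
    }

    static class ParseTask extends RecursiveTask<Integer> {
        private final String text;

        ParseTask(String text) {
            this.text = text;
        }

        @Override
        protected Integer compute() {
            try {
                return parsePositive(text);
            } catch (InvalidInputException e) {
                // compute() declares no checked exceptions, so rethrow unchecked.
                throw new IllegalStateException(e);
            }
        }
    }

    // Runs the task and reports whether it failed with the unchecked wrapper.
    static boolean failsUnchecked(String text) {
        ForkJoinPool pool = new ForkJoinPool(2);
        try {
            pool.invoke(new ParseTask(text));
            return false;
        } catch (IllegalStateException e) {
            return true; // invoke() rethrows the unchecked exception to the caller
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(failsUnchecked("42")); // prints false
        System.out.println(failsUnchecked("-1")); // prints true
    }
}
```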

ForkJoinTask is an abstract class that cannot be instantiated directly, so the JDK provides three abstract subclasses of ForkJoinTask that we can extend as needed.

  • RecursiveAction: a subtask that does not return a result
  • RecursiveTask: a subtask that returns a result
  • CountedCompleter: a task that triggers a completion action once it and its pending subtasks have finished (added in JDK 1.8)
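
The sum example earlier in this article extends RecursiveTask. For comparison, here is a minimal sketch of a RecursiveAction, which changes data in place and returns nothing. The class name DoubleArrayAction and the threshold of 1,000 are assumptions for illustration.

```java
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveAction;

class DoubleArrayAction extends RecursiveAction {
    private static final int THRESHOLD = 1_000; // assumed cut-off for direct work
    private final long[] data;
    private final int from; // inclusive
    private final int to;   // exclusive

    DoubleArrayAction(long[] data, int from, int to) {
        this.data = data;
        this.from = from;
        this.to = to;
    }

    @Override
    protected void compute() {
        // Small enough: double the elements directly, nothing is returned.
        if (to - from <= THRESHOLD) {
            for (int i = from; i < to; i++) {
                data[i] *= 2;
            }
            return;
        }
        // Otherwise split the range in half and run both halves.
        int mid = (from + to) >>> 1;
        invokeAll(new DoubleArrayAction(data, from, mid),
                  new DoubleArrayAction(data, mid, to));
    }

    public static void main(String[] args) {
        long[] data = new long[10_000];
        for (int i = 0; i < data.length; i++) {
            data[i] = i;
        }
        ForkJoinPool pool = new ForkJoinPool();
        pool.invoke(new DoubleArrayAction(data, 0, data.length));
        pool.shutdown();
        System.out.println(data[9_999]); // prints 19998
    }
}
```

Each subtask writes to its own slice of the array, so no synchronization is needed.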

ForkJoinWorkerThread

A ForkJoinPool thread that executes a ForkJoinTask.

Since ForkJoinPool also implements the Executor interface, how does it differ from the ThreadPoolExecutor we already had?

If you use ThreadPoolExecutor for divide-and-conquer logic, every task that waits for its subtasks blocks a thread, so in effect you need one thread per subtask. If the number of subtasks is very large, perhaps tens of thousands, creating tens of thousands of threads with ThreadPoolExecutor is neither feasible nor reasonable.

When ForkJoinPool processes a task, it does not start a thread per task. Instead, it creates only as many threads as the target parallelism. While a worker thread runs, the subtasks forked by its current task are pushed onto that ForkJoinWorkerThread's own task queue and processed recursively, and the results are joined back up until the outermost task is complete.
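
To make this concrete, the sketch below runs several thousand leaf tasks on a pool with a parallelism of 4 and records which threads executed them. The class name FewThreadsSketch and the chosen numbers are assumptions; the exact thread count can vary slightly from run to run, but it stays far below the number of tasks.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveAction;
import java.util.concurrent.atomic.AtomicInteger;

class FewThreadsSketch {
    static class SplitAction extends RecursiveAction {
        private final int depth;
        private final Set<String> workerNames;
        private final AtomicInteger leafCount;

        SplitAction(int depth, Set<String> workerNames, AtomicInteger leafCount) {
            this.depth = depth;
            this.workerNames = workerNames;
            this.leafCount = leafCount;
        }

        @Override
        protected void compute() {
            // Record which thread is executing this task.
            workerNames.add(Thread.currentThread().getName());
            if (depth == 0) {
                leafCount.incrementAndGet();
                return;
            }
            // Split into two subtasks, each one level shallower.
            invokeAll(new SplitAction(depth - 1, workerNames, leafCount),
                      new SplitAction(depth - 1, workerNames, leafCount));
        }
    }

    // Returns {number of leaf tasks, number of distinct threads that ran tasks}.
    static int[] run(int parallelism, int depth) {
        Set<String> workerNames = ConcurrentHashMap.newKeySet();
        AtomicInteger leafCount = new AtomicInteger();
        ForkJoinPool pool = new ForkJoinPool(parallelism);
        pool.invoke(new SplitAction(depth, workerNames, leafCount));
        pool.shutdown();
        return new int[] {leafCount.get(), workerNames.size()};
    }

    public static void main(String[] args) {
        int[] result = run(4, 12); // depth 12 gives 2^12 = 4096 leaf tasks
        System.out.println(result[0] + " leaf tasks ran on " + result[1] + " threads");
    }
}
```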

Work-stealing algorithm

Each ForkJoinPool worker thread maintains its own task queue to reduce contention between threads.

Each thread first finishes the tasks in its own queue. When its own tasks are done, it checks whether the task queues of other threads still hold unfinished tasks and, if so, helps those threads execute them.

To reduce contention while helping other threads, tasks are kept in a double-ended queue: a stealing thread takes tasks only from the head of the queue, while the owning thread always takes tasks from the tail.
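
The two access patterns can be illustrated with a toy, single-threaded model built on java.util.ArrayDeque. This is only a sketch of the ordering; the real queues inside ForkJoinPool are specialized concurrent structures, and the names ownerTake and steal are made up for this example.

```java
import java.util.ArrayDeque;
import java.util.Deque;

class WorkStealingDequeSketch {
    // The owning thread takes its most recently pushed task from the tail.
    static String ownerTake(Deque<String> queue) {
        return queue.pollLast();
    }

    // A stealing thread takes the oldest task from the head.
    static String steal(Deque<String> queue) {
        return queue.pollFirst();
    }

    public static void main(String[] args) {
        Deque<String> queue = new ArrayDeque<>();
        // The owner pushes forked tasks onto the tail of its own queue.
        queue.addLast("task-1");
        queue.addLast("task-2");
        queue.addLast("task-3");

        System.out.println(ownerTake(queue)); // prints task-3
        System.out.println(steal(queue));     // prints task-1
        System.out.println(queue.size());     // prints 1
    }
}
```

Because the owner and the thief work at opposite ends, they only collide when a single task is left in the queue.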

Advantages

It makes full use of thread resources, avoids the waste of resources, and reduces the competition between threads.

Disadvantages

Each thread needs its own queue, which costs extra space, and threads still contend with each other when only one task is left in a work queue.

Finally

If you found this article helpful, give it a thumbs up. I'm Xiao Hei. See you next time.