1. What is the Fork/Join framework

The Fork/Join framework, introduced in Java 7, is a framework for executing tasks in parallel. It divides a large task into several small tasks and then combines the results of those small tasks to obtain the result of the large task.

Fork divides a large task into several subtasks that execute in parallel; Join merges the results of those subtasks to obtain the result of the large task. For example, to compute 1 + 2 + ... + 10000, we can divide the work into 10 subtasks, each summing 1000 numbers, and then combine the results of the 10 subtasks. The flow of a Fork/Join run is shown in the following figure:
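The 1 + 2 + ... + 10000 example can be sketched with the JDK's ForkJoinPool and RecursiveTask. This is a minimal illustration rather than code from the article: the class names TenWaySum and RangeSum are hypothetical, and the split is hard-coded to 10 subtasks of 1000 numbers to mirror the text instead of being chosen recursively.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveTask;

// Sums 1..10000 by forking 10 subtasks of 1000 numbers each, then joining their results.
class TenWaySum extends RecursiveTask<Long> {
    static final int TOTAL = 10000;
    static final int PARTS = 10;

    @Override
    protected Long compute() {
        int chunk = TOTAL / PARTS; // 1000 numbers per subtask
        List<RangeSum> subtasks = new ArrayList<>();
        for (int i = 0; i < PARTS; i++) {
            RangeSum sub = new RangeSum(i * chunk + 1, (i + 1) * chunk);
            sub.fork();          // Fork: schedule the subtask for asynchronous execution
            subtasks.add(sub);
        }
        long total = 0;
        for (RangeSum sub : subtasks) {
            total += sub.join(); // Join: wait for each subtask and merge its result
        }
        return total;
    }

    // Leaf subtask: sums the closed range [from, to] directly.
    static class RangeSum extends RecursiveTask<Long> {
        private final int from;
        private final int to;

        RangeSum(int from, int to) {
            this.from = from;
            this.to = to;
        }

        @Override
        protected Long compute() {
            long sum = 0;
            for (int n = from; n <= to; n++) {
                sum += n;
            }
            return sum;
        }
    }

    static long run() {
        ForkJoinPool pool = new ForkJoinPool();
        try {
            return pool.invoke(new TenWaySum());
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(run()); // 50005000
    }
}
```

All ten subtasks are forked before any of them is joined, so they can run in parallel; joining each one immediately after forking it would largely serialize the work.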

2. Work-stealing algorithm

In a work-stealing algorithm, a thread steals tasks from other threads' queues and executes them. The workflow of work stealing is shown in the following figure:

Why use work stealing? Suppose we need to process a large task. We can divide it into several mutually independent subtasks and, to reduce contention between threads, put these subtasks into different queues, creating a separate thread for each queue to execute the tasks in it, so that threads and queues correspond one to one. For example, thread A handles the tasks in queue A. However, some threads finish the tasks in their own queue early while other threads still have tasks waiting. Rather than sit idle, a thread that has finished can help another thread by stealing a task from that thread's queue and executing it. Because the stealing thread and the victim thread then access the same queue, a double-ended queue (deque) is usually used to reduce contention between them: the victim thread always takes tasks from the head of its deque, while the stealing thread always takes tasks from the tail.
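The scheme above can be sketched as a toy scheduler. This is an illustration under simplifying assumptions, not how ForkJoinPool is built: each worker owns a java.util.concurrent.ConcurrentLinkedDeque, takes its own work from the head with pollFirst(), and steals from other workers' tails with pollLast(); a "task" is just an integer to add; and all tasks are created up front, which is what makes it safe for a worker to exit once every deque is empty. The class name WorkStealingSketch is hypothetical.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ConcurrentLinkedDeque;
import java.util.concurrent.atomic.AtomicLong;

// Toy work-stealing scheduler: one deque per worker thread.
class WorkStealingSketch {

    static long run(int workers, int tasks) {
        List<ConcurrentLinkedDeque<Integer>> deques = new ArrayList<>();
        for (int i = 0; i < workers; i++) {
            deques.add(new ConcurrentLinkedDeque<>());
        }
        // Deliberately unbalanced: every task starts in worker 0's deque,
        // so the other workers can only help by stealing.
        for (int t = 1; t <= tasks; t++) {
            deques.get(0).addLast(t);
        }

        AtomicLong total = new AtomicLong();
        List<Thread> threads = new ArrayList<>();
        for (int w = 0; w < workers; w++) {
            final int me = w;
            Thread thread = new Thread(() -> {
                while (true) {
                    Integer task = deques.get(me).pollFirst();   // owner takes from the head
                    if (task == null) task = steal(deques, me);  // own deque empty: try to steal
                    if (task == null) return;                    // all deques empty: no new tasks appear, so stop
                    total.addAndGet(task);                       // "execute" the task
                }
            });
            threads.add(thread);
            thread.start();
        }
        for (Thread thread : threads) {
            try {
                thread.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException(e);
            }
        }
        return total.get();
    }

    // The thief scans the other workers' deques and takes from the tail.
    private static Integer steal(List<ConcurrentLinkedDeque<Integer>> deques, int me) {
        for (int i = 1; i < deques.size(); i++) {
            Integer task = deques.get((me + i) % deques.size()).pollLast();
            if (task != null) return task;
        }
        return null;
    }

    public static void main(String[] args) {
        System.out.println(run(4, 10000)); // 50005000
    }
}
```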

The advantage of work stealing is that it makes full use of threads for parallel computation while reducing contention between them. Its disadvantages are that contention still occurs in some cases, for example when a deque holds only one task, and that it consumes more system resources, since it creates multiple threads and multiple double-ended queues.

3. Introduction to the Fork/Join framework

Now that we understand what Fork/Join needs to do, let's think about how we would design such a framework ourselves. This exercise will help in understanding the actual design of the Fork/Join framework.

The first step is to split tasks. We need a fork class that splits the large task into subtasks. A subtask may still be large, so we keep splitting until every subtask is small enough.

The second step is to execute the tasks and merge the results. The subtasks are placed in double-ended queues, and several threads are started to take tasks from the deques and execute them. The results of the subtasks are all placed in a single queue, and one thread is started to take results from that queue and merge them.
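The naive design from these two steps might look like the following sketch. It is hypothetical and simplified: a single shared LinkedBlockingDeque stands in for the double-ended queues, each subtask is a [lo, hi] range of numbers to sum, there is no stealing or recursive splitting, and the calling thread plays the role of the merging thread. The class name NaiveSplitMerge is illustrative.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.LinkedBlockingQueue;

// Naive split / execute / merge design, built from plain threads and queues.
class NaiveSplitMerge {

    static long sum(long from, long to, long chunk, int workers) {
        // Step 1: split [from, to] into subtasks of at most `chunk` numbers.
        LinkedBlockingDeque<long[]> tasks = new LinkedBlockingDeque<>();
        for (long lo = from; lo <= to; lo += chunk) {
            tasks.addLast(new long[] {lo, Math.min(lo + chunk - 1, to)});
        }
        int taskCount = tasks.size();
        BlockingQueue<Long> results = new LinkedBlockingQueue<>();

        // Step 2a: worker threads take subtasks from the deque and publish partial sums.
        for (int i = 0; i < workers; i++) {
            new Thread(() -> {
                long[] range;
                while ((range = tasks.pollFirst()) != null) {
                    long partial = 0;
                    for (long n = range[0]; n <= range[1]; n++) {
                        partial += n;
                    }
                    results.add(partial);
                }
            }).start();
        }

        // Step 2b: the merging thread (here, the caller) takes every partial result and combines them.
        long total = 0;
        try {
            for (int i = 0; i < taskCount; i++) {
                total += results.take();
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return total;
    }

    public static void main(String[] args) {
        System.out.println(sum(1, 10000, 1000, 4)); // 50005000
    }
}
```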

Fork/Join uses two classes to do both things:

  • ForkJoinTask: To use the Fork/Join framework, we must first create a ForkJoin task. ForkJoinTask provides the mechanism for performing fork() and join() operations on tasks. Usually we do not extend ForkJoinTask directly; instead we extend one of its subclasses. The Fork/Join framework provides the following two subclasses:
    • RecursiveAction: used for tasks that do not return a result.
    • RecursiveTask: used for tasks that return a result.
  • ForkJoinPool: A ForkJoinTask is executed by a ForkJoinPool. Subtasks are added to the double-ended queue maintained by the current worker thread, at the head of that queue. When a worker thread temporarily has no work in its own queue, it takes a task from the tail of a randomly chosen worker thread's queue.
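As an illustration of the RecursiveAction case (a task with no result), the following sketch doubles every element of an array in place. The class name DoubleInPlace and the threshold of 4 are hypothetical choices; the static ForkJoinTask.invokeAll(t1, t2) call forks both halves and waits for them. Because the two halves touch disjoint index ranges, the array needs no extra synchronization.

```java
import java.util.Arrays;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveAction;

// RecursiveAction: no result; the work is a side effect on the array.
class DoubleInPlace extends RecursiveAction {
    private static final int THRESHOLD = 4; // hypothetical cut-off for direct processing
    private final int[] data;
    private final int from; // inclusive
    private final int to;   // exclusive

    DoubleInPlace(int[] data, int from, int to) {
        this.data = data;
        this.from = from;
        this.to = to;
    }

    @Override
    protected void compute() {
        if (to - from <= THRESHOLD) {
            for (int i = from; i < to; i++) {
                data[i] *= 2;
            }
            return;
        }
        int mid = (from + to) >>> 1;
        // Fork both halves and wait until both have finished.
        invokeAll(new DoubleInPlace(data, from, mid), new DoubleInPlace(data, mid, to));
    }

    static int[] doubled(int[] input) {
        int[] copy = Arrays.copyOf(input, input.length);
        ForkJoinPool pool = new ForkJoinPool();
        try {
            pool.invoke(new DoubleInPlace(copy, 0, copy.length));
        } finally {
            pool.shutdown();
        }
        return copy;
    }

    public static void main(String[] args) {
        int[] input = {1, 2, 3, 4, 5, 6, 7, 8, 9, 10};
        System.out.println(Arrays.toString(doubled(input)));
        // [2, 4, 6, 8, 10, 12, 14, 16, 18, 20]
    }
}
```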

4. Using the Fork/Join framework

Let's apply the Fork/Join framework to a simple requirement: computing 1+2+3+4.

When using the Fork/Join framework, we should first consider how to split the task. If we want each subtask to add at most two numbers, we set the splitting threshold to 2. Since there are four numbers to add, the framework forks the task into two subtasks: subtask 1 computes 1+2 and subtask 2 computes 3+4. The results of the two subtasks are then joined.

Since this task returns a result, it must extend RecursiveTask.

Through this example, let's take a closer look at ForkJoinTask. The main difference between a ForkJoinTask and an ordinary task is that it must implement the compute method. In compute, the task first decides whether it is small enough; if so, it executes the work directly. If not, it splits itself into two subtasks. When a subtask is forked, its compute method runs in turn and again decides whether to split further or to execute the work and return the result. Calling join waits for a subtask to complete and retrieves its result.
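A minimal sketch of the 1+2+3+4 task described above, assuming a RecursiveTask&lt;Integer&gt; subclass named CountTask with a threshold of 2 numbers per subtask (the names are illustrative). compute() adds the numbers directly when the range is small enough; otherwise it splits at the midpoint, forks both halves, and joins them.

```java
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.Future;
import java.util.concurrent.RecursiveTask;

class CountTask extends RecursiveTask<Integer> {
    private static final int THRESHOLD = 2; // each subtask adds at most two numbers
    private final int start;
    private final int end;

    CountTask(int start, int end) {
        this.start = start;
        this.end = end;
    }

    @Override
    protected Integer compute() {
        // Small enough: add the numbers in [start, end] directly.
        if (end - start + 1 <= THRESHOLD) {
            int sum = 0;
            for (int i = start; i <= end; i++) {
                sum += i;
            }
            return sum;
        }
        // Too large: split into two subtasks at the midpoint.
        int middle = (start + end) >>> 1;
        CountTask left = new CountTask(start, middle);
        CountTask right = new CountTask(middle + 1, end);
        left.fork();   // each forked subtask runs compute() again
        right.fork();
        return left.join() + right.join(); // wait for both and merge
    }

    static int sum(int start, int end) {
        ForkJoinPool pool = new ForkJoinPool();
        try {
            return pool.invoke(new CountTask(start, end));
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) throws InterruptedException, ExecutionException {
        ForkJoinPool pool = new ForkJoinPool();
        Future<Integer> result = pool.submit(new CountTask(1, 4));
        System.out.println(result.get()); // 10
        pool.shutdown();
    }
}
```

With start = 1 and end = 4 the range holds four numbers, so compute() splits it into 1..2 and 3..4, each of which is small enough to add directly.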

5. Exception handling in the Fork/Join framework

A ForkJoinTask may throw an exception while it executes, but the main thread has no way to catch that exception directly. ForkJoinTask therefore provides the isCompletedAbnormally() method to check whether a task threw an exception or was cancelled, and the getException() method to retrieve the exception. For example:

if (task.isCompletedAbnormally()) {
    System.out.println(task.getException());
}

The getException method returns the Throwable that the task threw, or a CancellationException if the task was cancelled. It returns null if the task has not completed or completed without throwing an exception.
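The following sketch exercises these cases through public ForkJoinTask methods. FailingTask is a hypothetical task that always throws; quietlyJoin() waits for completion without rethrowing, so the exception can be inspected afterwards. Depending on the JDK version, getException() may return a new exception of the same type whose cause is the original rather than the original object itself, so the sketch checks only the type.

```java
import java.util.concurrent.CancellationException;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveTask;

class ExceptionDemo {
    // Hypothetical task that always fails, used only to trigger abnormal completion.
    static class FailingTask extends RecursiveTask<Integer> {
        @Override
        protected Integer compute() {
            throw new ArithmeticException("simulated failure");
        }
    }

    // Case 1: the task threw while running.
    static Throwable failedException() {
        ForkJoinPool pool = new ForkJoinPool();
        try {
            FailingTask task = new FailingTask();
            pool.execute(task);
            task.quietlyJoin(); // wait for completion without rethrowing
            return task.isCompletedAbnormally() ? task.getException() : null;
        } finally {
            pool.shutdown();
        }
    }

    // Case 2: the task was cancelled before it ever ran.
    static Throwable cancelledException() {
        FailingTask task = new FailingTask();
        task.cancel(true);
        return task.getException();
    }

    // Case 3: the task has not completed (it was never started).
    static Throwable notCompletedException() {
        return new FailingTask().getException();
    }

    public static void main(String[] args) {
        System.out.println(failedException() instanceof ArithmeticException);      // true
        System.out.println(cancelledException() instanceof CancellationException); // true
        System.out.println(notCompletedException());                               // null
    }
}
```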

6. How the Fork/Join framework is implemented

A ForkJoinPool consists of a ForkJoinTask array, which stores the tasks that programs submit to the pool, and a ForkJoinWorkerThread array, whose threads execute these tasks.

ForkJoinTask implements the fork method. When fork() is called on a ForkJoinTask, it calls pushTask on the current ForkJoinWorkerThread to schedule the task for asynchronous execution, and then returns immediately without waiting for the result. The code is as follows:

public final ForkJoinTask<V> fork() {
    ((ForkJoinWorkerThread) Thread.currentThread())
        .pushTask(this);
    return this;
}

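pushTask places the current task in the worker's ForkJoinTask array (queue), then calls ForkJoinPool's signalWork() method to wake up or create a worker thread to execute tasks. As the code shows, signalWork() is called only when the queue held at most two tasks before this push, and growQueue() enlarges the array when it becomes full. The code is as follows: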

final void pushTask(ForkJoinTask<?> t) {
    ForkJoinTask<?>[] q; int s, m;
    if ((q = queue) != null) {    // ignore if queue removed
        long u = (((s = queueTop) & (m = q.length - 1)) << ASHIFT) + ABASE;
        UNSAFE.putOrderedObject(q, u, t);
        queueTop = s + 1;         // or use putOrderedInt
        if ((s -= queueBase) <= 2)
            pool.signalWork();
        else if (s == m)
            growQueue();
    }
}
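pushTask stores tasks in a power-of-two array indexed by queueTop &amp; (length - 1). The following is a single-threaded sketch of that circular-array deque, for illustration only: it borrows the queueTop/queueBase names from the JDK 7 code but omits Unsafe, memory ordering, signalWork(), and all of the concurrency control that lets thieves run safely in the real implementation. The owner pushes and pops at queueTop; a thief takes the oldest task at queueBase. The class name ArrayTaskDeque is hypothetical.

```java
// Single-threaded sketch of an array-based work-stealing deque (no thread safety).
class ArrayTaskDeque<T> {
    private Object[] queue = new Object[4]; // capacity is always a power of two
    private int queueTop;  // next free slot; the owner pushes and pops here
    private int queueBase; // oldest task; a thief takes from here

    void push(T task) {
        queue[queueTop & (queue.length - 1)] = task; // masking wraps the index around the array
        queueTop++;
        if (queueTop - queueBase == queue.length) {  // array is full: double it
            grow();
        }
    }

    @SuppressWarnings("unchecked")
    T pop() { // owner side: newest task first (LIFO)
        if (queueTop == queueBase) return null;
        queueTop--;
        int i = queueTop & (queue.length - 1);
        T task = (T) queue[i];
        queue[i] = null;
        return task;
    }

    @SuppressWarnings("unchecked")
    T steal() { // thief side: oldest task first (FIFO)
        if (queueTop == queueBase) return null;
        int i = queueBase & (queue.length - 1);
        T task = (T) queue[i];
        queue[i] = null;
        queueBase++;
        return task;
    }

    int size() {
        return queueTop - queueBase;
    }

    // Copy live tasks into an array twice as large, keeping their logical indices.
    private void grow() {
        Object[] old = queue;
        Object[] bigger = new Object[old.length * 2];
        for (int k = queueBase; k != queueTop; k++) {
            bigger[k & (bigger.length - 1)] = old[k & (old.length - 1)];
        }
        queue = bigger;
    }

    public static void main(String[] args) {
        ArrayTaskDeque<String> deque = new ArrayTaskDeque<>();
        deque.push("a");
        deque.push("b");
        deque.push("c");
        System.out.println(deque.pop());   // c
        System.out.println(deque.steal()); // a
    }
}
```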

ForkJoinTask also implements the join method, which blocks the current thread until the task's result is available. The implementation of ForkJoinTask's join method is as follows:

public final V join() {
    if (doJoin() != NORMAL)
        return reportResult();
    else
        return getRawResult();
}

private V reportResult() {
    int s; Throwable ex;
    if ((s = status) == CANCELLED)
        throw new CancellationException();
    if (s == EXCEPTIONAL && (ex = getThrowableException()) != null)
        UNSAFE.throwException(ex);
    return getRawResult();
}

join first calls the doJoin() method, which returns the status of the task (NORMAL, CANCELLED, SIGNAL, or EXCEPTIONAL). The status determines what join does:

  • If the task completed normally (NORMAL), its result is returned.
  • If the task was cancelled (CANCELLED), a CancellationException is thrown.
  • If the task threw an exception (EXCEPTIONAL), that exception is rethrown.
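The three outcomes can be observed from the public API, as in this sketch. ValueTask is a hypothetical task that either returns its value or throws an ArithmeticException; cancelling it before it runs puts it in the cancelled state.

```java
import java.util.concurrent.CancellationException;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveTask;

class JoinOutcomes {
    // Hypothetical task: returns its value, or throws if asked to fail.
    static class ValueTask extends RecursiveTask<Integer> {
        private final int value;
        private final boolean fail;

        ValueTask(int value, boolean fail) {
            this.value = value;
            this.fail = fail;
        }

        @Override
        protected Integer compute() {
            if (fail) {
                throw new ArithmeticException("task failed");
            }
            return value;
        }
    }

    // Runs (or cancels) the task, then reports which join() outcome occurred.
    static String outcome(ValueTask task, boolean cancelFirst) {
        ForkJoinPool pool = new ForkJoinPool();
        try {
            if (cancelFirst) {
                task.cancel(true);  // the task becomes cancelled and never runs
            } else {
                pool.execute(task);
            }
            return "result " + task.join(); // completed normally: join returns the result
        } catch (CancellationException e) {
            return "cancelled";             // cancelled: join throws CancellationException
        } catch (ArithmeticException e) {
            return "exception";             // failed: join rethrows the task's exception
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(outcome(new ValueTask(42, false), false)); // result 42
        System.out.println(outcome(new ValueTask(42, false), true));  // cancelled
        System.out.println(outcome(new ValueTask(42, true), false));  // exception
    }
}
```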

Let’s look at the implementation code of the doJoin() method:

private int doJoin() {
    Thread t; ForkJoinWorkerThread w; int s; boolean completed;
    if ((t = Thread.currentThread()) instanceof ForkJoinWorkerThread) {
        if ((s = status) < 0)
            return s;
        if ((w = (ForkJoinWorkerThread)t).unpushTask(this)) {
            try {
                completed = exec();
            } catch (Throwable rex) {
                return setExceptionalCompletion(rex);
            }
            if (completed)
                return setCompletion(NORMAL);
        }
        return w.joinTask(this);
    }
    else
        return externalAwaitDone();
}

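The doJoin() method first checks the task's status to see whether it has already finished; if so, it returns the status directly. Otherwise, if the current thread is a worker thread, it tries to remove the task from that worker's queue (unpushTask) and execute it. If execution completes successfully, the status is set to NORMAL; if an exception is thrown, the exception is recorded and the status is set to EXCEPTIONAL. If the task cannot be removed (for example, because another thread has stolen it), the worker waits for it through joinTask; a thread that is not a ForkJoinWorkerThread waits through externalAwaitDone().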
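ForkJoinTask.tryUnfork() is a public counterpart of the unpushTask step: it typically succeeds when the task is the one most recently forked by the current thread and has not been stolen or started. The sketch below mirrors doJoin's fast path by hand in a hypothetical UnforkSum task: it forks the left half, computes the right half itself, then either runs the left half in the current thread (if tryUnfork() succeeds) or falls back to join(). It is for illustration only; join() already performs a similar check internally, so ordinary code can simply call join().

```java
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveTask;

class UnforkSum extends RecursiveTask<Long> {
    private static final long THRESHOLD = 1000; // hypothetical: sum directly at or below this many numbers
    private final long from;
    private final long to;

    UnforkSum(long from, long to) {
        this.from = from;
        this.to = to;
    }

    @Override
    protected Long compute() {
        if (to - from + 1 <= THRESHOLD) {
            long sum = 0;
            for (long n = from; n <= to; n++) {
                sum += n;
            }
            return sum;
        }
        long mid = (from + to) >>> 1;
        UnforkSum left = new UnforkSum(from, mid);
        UnforkSum right = new UnforkSum(mid + 1, to);

        left.fork();                     // push the left half onto this worker's queue
        long rightSum = right.compute(); // compute the right half in this thread
        long leftSum;
        if (left.tryUnfork()) {
            // Still in our queue and not stolen: run it here,
            // like the unpushTask + exec() branch of doJoin.
            leftSum = left.invoke();
        } else {
            // Stolen or already running elsewhere: wait for it,
            // like the joinTask branch of doJoin.
            leftSum = left.join();
        }
        return leftSum + rightSum;
    }

    static long sum(long from, long to) {
        ForkJoinPool pool = new ForkJoinPool();
        try {
            return pool.invoke(new UnforkSum(from, to));
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(sum(1, 100000)); // 5000050000
    }
}
```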

7. Reference materials

  • JDK 1.7 source code
  • http://ifeve.com/fork-join-5/

8. About the author

Fang Tengfei (Alibaba alias: Qingying), webmaster of the Concurrent Programming Network. I currently work in Alibaba's Micro Loan Division. Concurrent Programming Network: ifeve.com; personal Weibo: weibo.com/kirals. You are welcome to reach me on Weibo for technical discussion.

Thanks to Zhang Long for correcting this article.

To contribute or translate InfoQ Chinese, please email [email protected]. You are also welcome to follow us on Sina Weibo (@InfoQ) or Tencent Weibo (@InfoQ) and communicate with our editors and other readers.
