What is the Fork/Join framework
The Fork/Join framework, also known as the branch/merge framework, was introduced in Java 7 for executing tasks in parallel. It splits a large task into several smaller tasks and then combines the results of the small tasks into the result of the large task. Like MapReduce, it achieves parallel computation through divide and conquer.
Fork divides a large task into several subtasks that execute in parallel, and Join merges the results of those subtasks to produce the result of the large task. For example, 1 + 2 + ... + 10000 can be divided into 10 subtasks, each summing 1000 numbers, and the results of the 10 subtasks are then added together. The running flow of Fork/Join is shown in the figure below.
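As a minimal sketch of the 1 to 10000 example, the code below splits the range into 10 fixed chunks of 1000 numbers and adds up the partial sums. The class names `TenChunkSum` and `RangeSum` are hypothetical and exist only for this illustration; a real task would usually split recursively rather than into a fixed number of chunks.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.ForkJoinTask;
import java.util.concurrent.RecursiveTask;

class TenChunkSum {
    // Fork: split 1..10000 into 10 subtasks of 1000 numbers each.
    // Join: add up the 10 partial results.
    static long sumOneToTenThousand() {
        ForkJoinPool pool = new ForkJoinPool();
        try {
            return pool.invoke(new RecursiveTask<Long>() {
                @Override
                protected Long compute() {
                    List<RangeSum> parts = new ArrayList<>();
                    for (int k = 0; k < 10; k++) {
                        parts.add(new RangeSum(k * 1000 + 1, (k + 1) * 1000));
                    }
                    long total = 0;
                    // invokeAll returns once every subtask has completed.
                    for (RangeSum part : ForkJoinTask.invokeAll(parts)) {
                        total += part.join();
                    }
                    return total;
                }
            });
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(sumOneToTenThousand()); // 50005000
    }
}

// Hypothetical subtask: sums the integers in [from, to] sequentially.
class RangeSum extends RecursiveTask<Long> {
    private final int from;
    private final int to;

    RangeSum(int from, int to) {
        this.from = from;
        this.to = to;
    }

    @Override
    protected Long compute() {
        long sum = 0;
        for (int i = from; i <= to; i++) {
            sum += i;
        }
        return sum;
    }
}
```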
Tasks created with the Fork/Join framework must be run by a ForkJoinPool. ForkJoinPool is a thread pool whose default number of threads is set according to the number of CPU cores, and it uses a work-stealing algorithm to increase CPU utilization.
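The relationship between pool size and core count can be inspected with `getParallelism()`. The sketch below assumes only standard JDK calls; the class name `PoolSizeDemo` and the explicit value 4 are illustrative choices.

```java
import java.util.concurrent.ForkJoinPool;

class PoolSizeDemo {
    // Parallelism of a pool created with the no-argument constructor,
    // which derives its target from the available processors.
    static int defaultParallelism() {
        ForkJoinPool pool = new ForkJoinPool();
        try {
            return pool.getParallelism();
        } finally {
            pool.shutdown();
        }
    }

    // Parallelism of a pool created with an explicit target.
    static int explicitParallelism(int n) {
        ForkJoinPool pool = new ForkJoinPool(n);
        try {
            return pool.getParallelism();
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println("cores: " + Runtime.getRuntime().availableProcessors());
        System.out.println("default parallelism: " + defaultParallelism());
        System.out.println("explicit parallelism: " + explicitParallelism(4));
    }
}
```

Passing an explicit parallelism is useful when the pool should not occupy every core, for example when it shares the machine with other work.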
Work-stealing algorithm
In a work-stealing algorithm, a thread steals tasks from other threads' queues and executes them. The workflow of work stealing is shown below.
Each thread maintains a double-ended queue (deque) that stores the tasks it needs to execute. Work stealing lets a thread steal the oldest task (at the bottom of the queue) from another thread's deque and execute it, which avoids competing with the thread that owns the queue.
To reduce contention between the stealing thread and the thread being stolen from, a deque is used: the thread being stolen from always takes tasks from the head of its deque, while the stealing thread always takes tasks from the tail.
As shown in the figure above, Thread2 steals Task1 from the tail of Thread1's queue while Thread1 takes Task2 from the head of its own queue, thus avoiding a race.
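The head/tail discipline described above can be sketched with a plain `ArrayDeque`. This is a single-threaded illustration only: the class name `DequeStealingSketch` and the three task names are made up, and a real pool uses its own concurrent work queue rather than `ArrayDeque`.

```java
import java.util.ArrayDeque;
import java.util.Deque;

class DequeStealingSketch {
    // Returns { task taken by the owner, task taken by the thief }.
    static String[] ownerAndThiefPicks() {
        Deque<String> queue = new ArrayDeque<>();
        // The owner adds new tasks at the head, so the oldest task ends up at the tail.
        queue.addFirst("Task1");
        queue.addFirst("Task2");
        queue.addFirst("Task3");

        String takenByOwner = queue.pollFirst(); // owner works from the head
        String takenByThief = queue.pollLast();  // thief steals from the tail
        return new String[] { takenByOwner, takenByThief };
    }

    public static void main(String[] args) {
        String[] picks = ownerAndThiefPicks();
        System.out.println("owner took " + picks[0] + ", thief took " + picks[1]);
    }
}
```

Because the two threads work at opposite ends, they only touch the same element when a single task remains in the queue.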
Advantages of the work-stealing algorithm
- Makes full use of threads for parallel computation
- Reduces contention between threads
Shortcomings of the work-stealing algorithm
- Contention still occurs in some cases (for example, when a deque holds only one task)
- More system resources are consumed
In practice, work stealing redistributes tasks so that they are spread more or less evenly among all the worker threads in the ForkJoinPool. This process is shown in the figure below. When a task in a worker thread's queue is split into two subtasks, one of the subtasks can be "stolen" by an idle worker thread. As mentioned earlier, this process continues recursively until the condition for executing a subtask sequentially is met.
Fork/Join framework base classes
Let's consider how to design a Fork/Join framework.
- Step 1: Split the task. First, we need a fork class that divides the large task into subtasks. The subtasks may still be large, so we keep splitting until they are small enough.
- Step 2: Execute the tasks and merge the results. The split subtasks are placed in double-ended queues, and several threads are started, each taking tasks from a deque to execute. The results of the subtasks are placed in one queue, and a thread is started to take data from that queue and merge it.
Fork/Join uses two classes to do both:
- ForkJoinTask: To use the Fork/Join framework, we must first create a ForkJoinTask. It provides the fork() and join() operations for use within a task. Usually we do not extend ForkJoinTask directly but one of its subclasses. The Fork/Join framework provides the following two subclasses:
  - RecursiveAction: for tasks that do not return a result.
  - RecursiveTask: for tasks that return a result.
- ForkJoinPool: A ForkJoinTask must be executed through a ForkJoinPool. The subtasks split off from a task are added to the deque maintained by the current worker thread, at the head of the queue. When a worker thread temporarily has no work in its own queue, it randomly takes a task from the tail of another worker thread's queue.
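For the no-result case, a RecursiveAction might look like the sketch below, which doubles every element of an array in place. The names `DoubleInPlaceDemo` and `DoubleAction` and the threshold of 2 are assumptions made for illustration.

```java
import java.util.Arrays;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveAction;

class DoubleInPlaceDemo {
    // Returns a copy of the input with every element doubled.
    static int[] doubled(int[] input) {
        int[] data = input.clone();
        ForkJoinPool pool = new ForkJoinPool();
        try {
            pool.invoke(new DoubleAction(data, 0, data.length));
        } finally {
            pool.shutdown();
        }
        return data;
    }

    public static void main(String[] args) {
        System.out.println(Arrays.toString(doubled(new int[] {1, 2, 3, 4, 5, 6, 7, 8})));
    }
}

// Hypothetical RecursiveAction: doubles data[lo, hi) in place and returns nothing.
class DoubleAction extends RecursiveAction {
    private static final int THRESHOLD = 2; // illustrative cutoff
    private final int[] data;
    private final int lo;
    private final int hi;

    DoubleAction(int[] data, int lo, int hi) {
        this.data = data;
        this.lo = lo;
        this.hi = hi;
    }

    @Override
    protected void compute() {
        if (hi - lo <= THRESHOLD) {
            // Small enough: process sequentially.
            for (int i = lo; i < hi; i++) {
                data[i] *= 2;
            }
            return;
        }
        int mid = (lo + hi) >>> 1;
        DoubleAction left = new DoubleAction(data, lo, mid);
        DoubleAction right = new DoubleAction(data, mid, hi);
        left.fork();     // queued on this worker; may be stolen by another worker
        right.compute(); // run the other half in the current thread
        left.join();     // wait for the forked half to finish
    }
}
```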
In addition, the framework provides the following two classes:
- ForkJoinWorkerThread: a worker thread inside the ForkJoinPool that executes ForkJoinTasks. Internally it uses a ForkJoinPool.WorkQueue to hold the ForkJoinTasks to be executed.
- ForkJoinPool.WorkQueue: holds the ForkJoinTasks to be executed.
Execution flow of the Fork/Join framework
- Each worker thread in a ForkJoinPool maintains a double-ended work queue (WorkQueue) that holds tasks (ForkJoinTask).
- When a worker thread generates a new task at run time (by calling fork()), the task is put at the head of its work queue (the task at the head is the one that has waited the shortest time). By default the worker thread processes its own work queue in LIFO order, that is, it takes the most recently added task from the head of the queue each time.
- While working on its own work queue, each worker thread also tries to steal a task (either one that has just been submitted to the pool or one from another worker thread's work queue). A stolen task comes from the tail of the other thread's work queue, that is, a worker steals from another worker thread in FIFO order, oldest task first.
- On join(), if the task to be joined has not yet completed, the worker processes other tasks first and waits for it to complete.
- A worker goes to sleep when it has neither tasks of its own nor tasks to steal.
FIFO: First in, First out. LIFO: Last in, First out.
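The local scheduling order is visible through the `asyncMode` constructor flag of ForkJoinPool. According to the JDK documentation, the default is stack-like (LIFO) processing of a worker's own forked tasks, and `asyncMode = true` switches the local order to FIFO for tasks that are never joined. The sketch below only reads the flag back; the class name `SchedulingModeDemo` and the parallelism of 2 are arbitrary choices for illustration.

```java
import java.util.concurrent.ForkJoinPool;

class SchedulingModeDemo {
    // Builds a pool with the given asyncMode flag and reports whether
    // its workers use local FIFO scheduling for their own queues.
    static boolean usesLocalFifo(boolean asyncMode) {
        ForkJoinPool pool = new ForkJoinPool(
                2,                                               // parallelism, arbitrary here
                ForkJoinPool.defaultForkJoinWorkerThreadFactory, // default worker thread factory
                null,                                            // no uncaught-exception handler
                asyncMode);
        try {
            return pool.getAsyncMode();
        } finally {
            pool.shutdown();
        }
    }

    // Same check for a pool created with the no-argument constructor.
    static boolean defaultPoolUsesLocalFifo() {
        ForkJoinPool pool = new ForkJoinPool();
        try {
            return pool.getAsyncMode();
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println("default pool local FIFO: " + defaultPoolUsesLocalFifo());
        System.out.println("asyncMode pool local FIFO: " + usesLocalFifo(true));
    }
}
```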
Fork/Join usage demo
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.Future;
import java.util.concurrent.RecursiveTask;

public class CountTest {
    public static void main(String[] args) throws InterruptedException, ExecutionException {
        ForkJoinPool forkJoinPool = new ForkJoinPool();
        // Create a task that sums the integers from 1 to 12
        CountTask countTask = new CountTask(1, 12);
        Future<Integer> future = forkJoinPool.submit(countTask);
        System.out.println("Final calculation result: " + future.get());
    }
}

class CountTask extends RecursiveTask<Integer> {
    private static final int THRESHOLD = 2;
    private int start;
    private int end;

    public CountTask(int start, int end) {
        this.start = start;
        this.end = end;
    }

    @Override
    protected Integer compute() {
        int sum = 0;
        boolean canCompute = (end - start) <= THRESHOLD;
        // The task is small enough: calculate directly and return the result
        if (canCompute) {
            for (int i = start; i <= end; i++) {
                sum += i;
            }
            System.out.println("Computing task: sum of " + start + " to " + end + ", result: " + sum + ", thread: " + Thread.currentThread().getName());
        } else { // The task is too large: split it
            System.out.println("Task too large, splitting: sum of " + start + " to " + end + ", thread: " + Thread.currentThread().getName());
            int middle = (start + end) / 2;
            // Split into two subtasks
            CountTask leftTask = new CountTask(start, middle);
            CountTask rightTask = new CountTask(middle + 1, end);
            // Execute the subtasks
            leftTask.fork();
            rightTask.fork();
            // Wait for the subtasks to complete and get their results
            int leftResult = leftTask.join();
            int rightResult = rightTask.join();
            // Merge the subtask results
            sum = leftResult + rightResult;
        }
        return sum;
    }
}
The results are as follows
Task too large, splitting: sum of 1 to 12, thread: ForkJoinPool-1-worker-1
Task too large, splitting: sum of 7 to 12, thread: ForkJoinPool-1-worker-3
Task too large, splitting: sum of 1 to 6, thread: ForkJoinPool-1-worker-2
Computing task: sum of 7 to 9, result: 24, thread: ForkJoinPool-1-worker-3
Computing task: sum of 1 to 3, result: 6, thread: ForkJoinPool-1-worker-1
Computing task: sum of 4 to 6, result: 15, thread: ForkJoinPool-1-worker-1
Computing task: sum of 10 to 12, result: 33, thread: ForkJoinPool-1-worker-3
Final calculation result: 78
As the output shows, the submitted task is executed by thread 1, which makes the first split into two subtasks, "sum of 7 to 12" and "sum of 1 to 6", and forks both. These two subtasks are then stolen by thread 3 and thread 2. Thread 1's own queue is now empty, while thread 2 and thread 3 each split their task and fork two more subtasks, so thread 1 steals work in turn (in this run, the subtasks of thread 2).
Exception handling in the Fork/Join framework
A ForkJoinTask may throw an exception while it is executing, but the main thread cannot catch that exception directly at the point where it is thrown. ForkJoinTask therefore provides the isCompletedAbnormally() method to check whether a task threw an exception or was cancelled, and the exception itself can be obtained through the getException() method of ForkJoinTask. Use the following code:
if (task.isCompletedAbnormally()) {
    System.out.println(task.getException());
}
The getException() method returns the Throwable thrown by the task, or a CancellationException if the task was cancelled. It returns null if the task has not completed or threw no exception.
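A self-contained version of this check might look like the sketch below. The names `ExceptionCheckDemo` and `FailingTask` are hypothetical, and `quietlyJoin()` is used here (an addition to the snippet above) so that the main thread waits for completion without the exception being rethrown.

```java
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveTask;

class ExceptionCheckDemo {
    // Runs a task that always fails and returns the exception recorded on it,
    // or null if the task completed normally.
    static Throwable runFailingTask() {
        ForkJoinPool pool = new ForkJoinPool();
        try {
            FailingTask task = new FailingTask();
            pool.execute(task);
            task.quietlyJoin(); // wait for completion without rethrowing
            return task.isCompletedAbnormally() ? task.getException() : null;
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(runFailingTask());
    }
}

// Hypothetical task that fails on purpose.
class FailingTask extends RecursiveTask<Integer> {
    @Override
    protected Integer compute() {
        throw new IllegalStateException("simulated failure");
    }
}
```

Calling `join()` or `get()` instead of `quietlyJoin()` would surface the failure as a thrown exception, so the explicit check is mainly useful when the caller wants to inspect the outcome without a try/catch.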
FAQ
What is the difference between submitting a task to a ForkJoinPool with submit and with invoke?
- invoke: executes synchronously. The caller waits for the task to complete before the code that follows can run.
- submit: executes asynchronously. The caller blocks only when it calls get on the returned Future.
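The two submission styles can be compared side by side. In this sketch `InvokeVsSubmitDemo` and `SquareTask` are hypothetical names, and the checked exceptions from `get()` are wrapped only to keep the example short.

```java
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.ForkJoinTask;
import java.util.concurrent.RecursiveTask;

class InvokeVsSubmitDemo {
    // Returns { result obtained via invoke, result obtained via submit + get }.
    static int[] invokeThenSubmit() {
        ForkJoinPool pool = new ForkJoinPool();
        try {
            // invoke: blocks here until the task has finished and returns its result.
            int direct = pool.invoke(new SquareTask(7));

            // submit: returns immediately with a ForkJoinTask, which is also a Future.
            ForkJoinTask<Integer> future = pool.submit(new SquareTask(7));
            // ... other work could run here while the task executes ...
            int later = future.get(); // blocks only at this point

            return new int[] { direct, later };
        } catch (InterruptedException | ExecutionException e) {
            throw new IllegalStateException(e);
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        int[] results = invokeThenSubmit();
        System.out.println(results[0] + " " + results[1]);
    }
}

// Hypothetical task that returns the square of its input.
class SquareTask extends RecursiveTask<Integer> {
    private final int n;

    SquareTask(int n) {
        this.n = n;
    }

    @Override
    protected Integer compute() {
        return n * n;
    }
}
```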
What is the difference between inheriting RecursiveTask and RecursiveAction?
- Extending RecursiveTask: suitable for scenarios with a return value.
- Extending RecursiveAction: suitable for scenarios with no return value.
What is the difference between invoking fork and invokeAll for subtasks?
- fork: when every subtask is forked, the subtasks are left to other threads while the parent thread only waits for them, which can waste the parent thread.
- invokeAll: the parent thread works alongside the other threads, so the thread pool is better utilized.
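As a sketch of the invokeAll style, the task below sums an array by handing both halves to `invokeAll` instead of forking each one. The names `InvokeAllSumDemo` and `ArraySum` and the threshold are assumptions for illustration. How the work is divided between the calling thread and the pool is an implementation detail of the JDK, so treat the "parent thread also works" behaviour as typical rather than guaranteed.

```java
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveTask;

class InvokeAllSumDemo {
    static long sum(int[] data) {
        ForkJoinPool pool = new ForkJoinPool();
        try {
            return pool.invoke(new ArraySum(data, 0, data.length));
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(sum(new int[] {1, 2, 3, 4, 5, 6, 7, 8, 9, 10}));
    }
}

// Hypothetical task: sums data[lo, hi).
class ArraySum extends RecursiveTask<Long> {
    private static final int THRESHOLD = 3; // illustrative cutoff
    private final int[] data;
    private final int lo;
    private final int hi;

    ArraySum(int[] data, int lo, int hi) {
        this.data = data;
        this.lo = lo;
        this.hi = hi;
    }

    @Override
    protected Long compute() {
        if (hi - lo <= THRESHOLD) {
            long sum = 0;
            for (int i = lo; i < hi; i++) {
                sum += data[i];
            }
            return sum;
        }
        int mid = (lo + hi) >>> 1;
        ArraySum left = new ArraySum(data, lo, mid);
        ArraySum right = new ArraySum(data, mid, hi);
        // invokeAll returns once both subtasks are done, so the joins below do not wait.
        invokeAll(left, right);
        return left.join() + right.join();
    }
}
```

An alternative with a similar effect is to fork one half, call `compute()` on the other half directly, and then join the forked half.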