Fork/Join is a framework for parallel task execution. Its core idea is to cut a large operation into several small parts so that resources are used as efficiently as possible. It mainly involves three classes: ForkJoinPool, ForkJoinTask and RecursiveTask.
1. Overview
java.util.concurrent.ForkJoinPool was written mainly by Doug Lea. It can split a large task into many subtasks for parallel processing and finally merge the subtask results into one final result, which is then returned. This article explains the Fork/Join framework based on its implementation in JDK 1.8+, and the source code referenced is also from JDK 1.8+.
This article first introduces RecursiveTask and the basic use of the Fork/Join framework, then uses the framework's working principle to understand the points that need attention when using it, and finally applies the Fork/Join framework to a practical problem.
2. RecursiveTask
RecursiveTask is a recursive, result-bearing implementation of ForkJoinTask. For example, it can be used to calculate Fibonacci numbers:
class Fibonacci extends RecursiveTask<Integer> {
    final int n;

    Fibonacci(int n) {
        this.n = n;
    }

    protected Integer compute() {
        if (n <= 1) {
            return n;
        }
        Fibonacci f1 = new Fibonacci(n - 1);
        f1.fork();
        Fibonacci f2 = new Fibonacci(n - 2);
        return f2.compute() + f1.join();
    }
}
RecursiveTask extends the abstract class ForkJoinTask. Its main members are the following:
// Point 1: the field that stores the final computed result
V result;

// Point 2: the compute method that subclasses must implement
protected abstract V compute();

// Point 3: returns the stored result
public final V getRawResult() {
    return result;
}

// Point 4: exec() runs compute() and stores its result
protected final boolean exec() {
    result = compute();
    return true;
}
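To complete the picture, here is a minimal usage sketch (not from the original article): it submits the Fibonacci task defined above to a ForkJoinPool and waits for the result. The pool construction and the argument 10 are illustrative assumptions.

import java.util.concurrent.ForkJoinPool;

public class FibonacciDemo {
    public static void main(String[] args) {
        // Assumes the Fibonacci class shown above is in the same package.
        ForkJoinPool pool = new ForkJoinPool();
        // invoke() submits the task and blocks until the whole task tree completes
        Integer fib10 = pool.invoke(new Fibonacci(10));
        System.out.println("fib(10) = " + fib10); // prints 55
    }
}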
3. Basic use of the Fork/Join framework
Here is a simple example of using the Fork/Join framework, in which we calculate the sum of the integers from 1 to 1001:
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.ForkJoinTask;
import java.util.concurrent.RecursiveTask;

public class TestForkJoinPool {

    // The maximum size of a range that a single subtask will sum directly
    private static final Integer MAX = 200;

    static class MyForkJoinTask extends RecursiveTask<Integer> {
        // The start value of this subtask's range
        private Integer startValue;
        // The end value of this subtask's range
        private Integer endValue;

        public MyForkJoinTask(Integer startValue, Integer endValue) {
            this.startValue = startValue;
            this.endValue = endValue;
        }

        @Override
        protected Integer compute() {
            // If this condition is true, the range is small enough to sum directly
            if (endValue - startValue < MAX) {
                System.out.println("startValue = " + startValue + "; endValue = " + endValue);
                Integer totalValue = 0;
                for (int index = this.startValue; index <= this.endValue; index++) {
                    totalValue += index;
                }
                return totalValue;
            }
            // Otherwise split the task into two subtasks
            else {
                MyForkJoinTask subTask1 = new MyForkJoinTask(startValue, (startValue + endValue) / 2);
                subTask1.fork();
                MyForkJoinTask subTask2 = new MyForkJoinTask((startValue + endValue) / 2 + 1, endValue);
                subTask2.fork();
                return subTask1.join() + subTask2.join();
            }
        }
    }

    public static void main(String[] args) {
        // This is the thread pool of the Fork/Join framework
        ForkJoinPool pool = new ForkJoinPool();
        ForkJoinTask<Integer> taskFuture = pool.submit(new MyForkJoinTask(1, 1001));
        try {
            Integer result = taskFuture.get();
            System.out.println("result = " + result);
        } catch (InterruptedException | ExecutionException e) {
            e.printStackTrace(System.out);
        }
    }
}
The code above is simple, with comments at the key points; the rest of this article explains the main points it illustrates. First, take a look at a possible execution result of the example code:
startValue = 1; endValue = 126
startValue = 127; endValue = 251
startValue = 252; endValue = 376
startValue = 377; endValue = 501
startValue = 502; endValue = 626
startValue = 627; endValue = 751
startValue = 752; endValue = 876
startValue = 877; endValue = 1001
result = 501501
4. Work sequence diagram
The following figure gives an overview of how this code works. The actual inner workings of the Fork/Join framework are far more complex than this: for example, how it decides which thread runs a given recursive task, or how it decides whether a task/subtask submitted to the framework should be run by a new thread or wait in a queue.
Therefore, without a deep understanding of how the Fork/Join framework operates, and observing only the running behavior of the simple example above, we can only tell that subtasks are broken down inside the Fork/Join framework until they are small enough, that multiple internal threads complete the calculation of these small tasks in parallel, and that the results are then merged upward until the top-level result is formed. Take it one step at a time; let's start from this outline of the process.
The topmost task (T1) in the diagram is submitted to the Fork/Join framework, which hands it to a thread to run; the compute method of the task then starts working. If the range of numbers the current task has to sum is too large (a difference of 200 or more in the code), the task is split into two subtasks (T1.1 and T1.2), each responsible for summing half of the data; see the fork calls in the code. If the number range in the current (sub)task is small enough (a difference of less than 200), the numbers are summed directly and the result is returned to the parent task.
1. The ForkJoinPool constructors
ForkJoinPool has four constructors. The one with the most complete parameter list looks like this:
public ForkJoinPool(int parallelism,
ForkJoinWorkerThreadFactory factory,
UncaughtExceptionHandler handler,
boolean asyncMode)
- parallelism: the parallelism level set for the Fork/Join framework, which determines the number of threads executing in parallel inside the framework. Each parallel task will be processed by a thread, but do not interpret this property as the maximum number of threads in the framework, and do not equate it with the corePoolSize and maximumPoolSize properties of a ThreadPoolExecutor, because ForkJoinPool is organized and works differently. Later in the discussion the reader will see that the number of threads existing in the Fork/Join framework is related to this parameter's value, but is not determined by it alone.
- factory: the thread factory used when the Fork/Join framework creates a new thread. This factory does not implement the ThreadFactory interface, but the ForkJoinWorkerThreadFactory interface instead. The latter is a functional interface with a single method named newThread. The Fork/Join framework provides a default implementation of ForkJoinWorkerThreadFactory: DefaultForkJoinWorkerThreadFactory.
- handler: the handler for uncaught exceptions. When an executing task throws an exception that escapes the task, this handler catches it.
- asyncMode: this parameter is also important. Literally it means "asynchronous mode", but it does not control whether the Fork/Join framework works synchronously or asynchronously. The Fork/Join framework maintains a task queue for each independent worker thread; this queue is a double-ended queue (deque) backed by an array, so the tasks in it can be served in either first-in, first-out (FIFO) or last-in, first-out (LIFO) order.
- When asyncMode is set to true, the queue works in FIFO mode; otherwise it works in LIFO mode. The default value is false:
......
asyncMode ? FIFO_QUEUE : LIFO_QUEUE,
......
ForkJoinPool has two other public constructors. One takes only the parallelism argument, which sets the framework's parallelism level as described above. The other takes no arguments and uses a default value for the parallelism level: the number of CPU cores currently available to the JVM (Runtime.getRuntime().availableProcessors()). ForkJoinPool also has a private base constructor; the three constructors mentioned above all delegate to it.
......
private ForkJoinPool(int parallelism,
                     ForkJoinWorkerThreadFactory factory,
                     UncaughtExceptionHandler handler,
                     int mode,
                     String workerNamePrefix) {
    this.workerNamePrefix = workerNamePrefix;
    this.factory = factory;
    this.ueh = handler;
    this.config = (parallelism & SMASK) | mode;
    long np = (long)(-parallelism); // offset ctl counts
    this.ctl = ((np << AC_SHIFT) & AC_MASK) | ((np << TC_SHIFT) & TC_MASK);
}
......
If you have no specific execution requirements for the Fork/Join framework, you can simply use the constructor with no arguments. In other words, it is recommended to use the number of CPU cores available to the current operating system as the parallelism level of the Fork/Join framework, which keeps running-state switching between task threads as low as possible while the CPU processes the parallel tasks (in practice, thread switching on a single CPU core is basically unavoidable, since the operating system runs multiple threads and processes simultaneously).
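As a hedged illustration of the four parameters discussed above (the handler lambda, the asyncMode value and the class name below are arbitrary choices for the sketch, not requirements of the framework):

import java.util.concurrent.ForkJoinPool;

public class PoolConstructionDemo {
    public static void main(String[] args) {
        // Pass all four parameters explicitly; the values here are only examples.
        ForkJoinPool pool = new ForkJoinPool(
                Runtime.getRuntime().availableProcessors(),      // parallelism level
                ForkJoinPool.defaultForkJoinWorkerThreadFactory, // default worker thread factory
                (thread, throwable) ->                           // uncaught exception handler
                        System.err.println(thread.getName() + " failed: " + throwable),
                true);                                           // asyncMode = true: FIFO work queues
        System.out.println("parallelism = " + pool.getParallelism());
        pool.shutdown();
    }
}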
2. Fork and Join methods
The fork and join methods provided by the Fork/Join framework are arguably its two most important methods, and they work together with the parallelism level ("number of parallel tasks"). This can lead to different behavior for the split subtasks T1.1, T1.2 and even TX: a TX subtask may wait for another existing thread to run the related subtask, or be executed "recursively" by the thread that is running TX, or be run by a newly started thread...
When fork is called, the Fork/Join framework decides, based on the state of the ForkJoinWorkerThread currently executing the ForkJoinTask, whether the new subtask waits in a queue, whether a new ForkJoinWorkerThread is created to run it, or whether another ForkJoinWorkerThread waiting for work is woken up to run it.
A ForkJoinTask is the specific type of task that can run inside the Fork/Join framework, and only this type of task can be split and merged within the framework. A ForkJoinWorkerThread is the specific type of thread that runs inside the Fork/Join framework; besides the characteristics of an ordinary thread, its main feature is that each ForkJoinWorkerThread has its own task queue, which stores the subtasks split off within that thread.
The join method blocks the current thread until the corresponding subtask finishes running and returns its result. Alternatively, if the subtask is still in the current thread's task queue, it is taken out and executed "recursively" in the current thread. The goal is to obtain the result of the current subtask as soon as possible and then continue.
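The following sketch shows a common variant of the earlier MyForkJoinTask.compute method that relies on exactly this fork/join behavior: fork only one subtask and compute the other in the current thread, so the current worker stays busy instead of blocking immediately. It reuses the startValue/endValue/MAX fields from the earlier example; this variant is an assumption for illustration, since the original code forks both halves.

// Sketch only: assumes the startValue, endValue and MAX fields of the earlier MyForkJoinTask example.
@Override
protected Integer compute() {
    // Small enough: sum the range directly
    if (endValue - startValue < MAX) {
        int totalValue = 0;
        for (int index = startValue; index <= endValue; index++) {
            totalValue += index;
        }
        return totalValue;
    }
    int mid = (startValue + endValue) / 2;
    MyForkJoinTask subTask1 = new MyForkJoinTask(startValue, mid);
    MyForkJoinTask subTask2 = new MyForkJoinTask(mid + 1, endValue);
    subTask1.fork();                      // push subTask1 onto this worker thread's task queue
    Integer result2 = subTask2.compute(); // keep the current thread busy with subTask2
    Integer result1 = subTask1.join();    // wait for subTask1, or run it here if it is still queued
    return result1 + result2;
}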
Use Fork/Join to solve practical problems
The previous example used the Fork/Join framework to sum the integers from 1 to 1001. That example is fine for demonstrating how to use the Fork/Join framework, but it is still some distance from the problems you face in real work. Here we use the Fork/Join framework to solve a practical problem: efficient sorting.
1. Solving the sorting problem with merge sort
Sorting is a common problem in our work, and many algorithms have been invented to solve it, such as the various insertion sorts and exchange sorts. Among all sorting algorithms, merge sort has both a good average time complexity (O(n log n)) and good stability. The core idea of the algorithm is to decompose a large problem into several smaller problems and then merge the results.
The splitting phase recursively splits an unsorted large set of numbers into smaller sets until each set contains at most two elements, or is considered small enough that it need not be split further.
Sorting the elements then becomes two problems: 1) ordering the at most two elements in each small set; 2) merging two ordered sets into a new ordered set. The first problem is easy to solve. Is the second complicated? In fact it is also simple: it only requires a single pass over both sets, repeatedly comparing the smallest remaining element of each set and placing the smaller one into the new set, with time complexity O(n).
Here is a simple implementation of the merge sort algorithm:
package test.thread.pool.merge;

import java.util.Arrays;
import java.util.Random;

/**
 * Single-threaded merge sort
 * @author yinwenjie
 */
public class Merge1 {

    private static int MAX = 10000;

    private static int inits[] = new int[MAX];

    // Initialize the array to be sorted with random values
    static {
        Random r = new Random();
        for (int index = 1; index <= MAX; index++) {
            inits[index - 1] = r.nextInt(10000000);
        }
    }

    public static void main(String[] args) {
        long beginTime = System.currentTimeMillis();
        int results[] = forkits(inits);
        long endTime = System.currentTimeMillis();
        // If the amount of data being sorted is very large, remember to remove this kind of printing
        System.out.println("time = " + (endTime - beginTime) + " | " + Arrays.toString(results));
    }

    // The splitting phase
    private static int[] forkits(int source[]) {
        int sourceLen = source.length;
        if (sourceLen > 2) {
            int midIndex = sourceLen / 2;
            int result1[] = forkits(Arrays.copyOf(source, midIndex));
            int result2[] = forkits(Arrays.copyOfRange(source, midIndex, sourceLen));
            // Merge the two ordered arrays into one ordered array
            int mer[] = joinInts(result1, result2);
            return mer;
        }
        // If this branch is reached, the array has either only one element,
        // or its two elements are already in order
        else {
            if (sourceLen == 1 || source[0] <= source[1]) {
                return source;
            } else {
                int targetp[] = new int[sourceLen];
                targetp[0] = source[1];
                targetp[1] = source[0];
                return targetp;
            }
        }
    }

    /**
     * Merge two ordered arrays into a new ordered array
     * @param array1
     * @param array2
     */
    private static int[] joinInts(int array1[], int array2[]) {
        int destInts[] = new int[array1.length + array2.length];
        int array1Len = array1.length;
        int array2Len = array2.length;
        int destLen = destInts.length;
        // On each pass, compare the current smallest elements of the two arrays
        // and place the smaller one into the new array
        for (int index = 0, array1Index = 0, array2Index = 0; index < destLen; index++) {
            int value1 = array1Index >= array1Len ? Integer.MAX_VALUE : array1[array1Index];
            int value2 = array2Index >= array2Len ? Integer.MAX_VALUE : array2[array2Index];
            if (value1 < value2) {
                array1Index++;
                destInts[index] = value1;
            } else {
                array2Index++;
                destInts[index] = value2;
            }
        }
        return destInts;
    }
}
The merge sort above takes only 2-3 milliseconds to sort 10,000 random numbers, around 20 milliseconds for 100,000 random numbers, and about 160 milliseconds on average for one million random numbers (depending on how disordered the randomly generated array is). So the merge algorithm itself already performs well. Monitoring the running application with JMX tools and the operating system's CPU monitor shows that the whole algorithm runs in a single thread, and at any moment only a single CPU core is doing the main processing work.
CPU operation:
2. Running merge sort with the Fork/Join framework
However, as the amount of data in the set to be sorted keeps growing, the implementation above starts to fall short. For example, sorting a set of 100 million random numbers with it takes about 27 seconds.
We can then use the Fork/Join framework to optimize the performance of the merge algorithm: encapsulate the subtasks as multiple ForkJoinTask instances waiting in queues, and let the Fork/Join framework schedule these tasks across multiple ForkJoinWorkerThreads, as shown in the figure below:
The following is the merge algorithm implemented with the Fork/Join framework. Note that the code in the joinInts method, which merges two ordered sets into a new ordered set, is unchanged (see the previous section), so it is not repeated below:
......
/**
 * Merge sort parallelized with Fork/Join
 * @author yinwenjie
 */
public class Merge2 {

    private static int MAX = 100000000;

    private static int inits[] = new int[MAX];

    // Random array initialization is the same as before and is not repeated here
    static {
        ......
    }

    public static void main(String[] args) throws Exception {
        // Start timing
        long beginTime = System.currentTimeMillis();
        ForkJoinPool pool = new ForkJoinPool();
        MyTask task = new MyTask(inits);
        ForkJoinTask<int[]> taskResult = pool.submit(task);
        try {
            taskResult.get();
        } catch (InterruptedException | ExecutionException e) {
            e.printStackTrace(System.out);
        }
        long endTime = System.currentTimeMillis();
        System.out.println("time = " + (endTime - beginTime));
    }

    /**
     * @author yinwenjie
     */
    static class MyTask extends RecursiveTask<int[]> {
        private int source[];

        public MyTask(int source[]) {
            this.source = source;
        }

        /* (non-Javadoc)
         * @see java.util.concurrent.RecursiveTask#compute()
         */
        @Override
        protected int[] compute() {
            int sourceLen = source.length;
            // If there are more than two elements, keep splitting into subtasks
            if (sourceLen > 2) {
                int midIndex = sourceLen / 2;
                MyTask task1 = new MyTask(Arrays.copyOf(source, midIndex));
                task1.fork();
                MyTask task2 = new MyTask(Arrays.copyOfRange(source, midIndex, sourceLen));
                task2.fork();
                // Merge the two ordered arrays into one ordered array
                int result1[] = task1.join();
                int result2[] = task2.join();
                int mer[] = joinInts(result1, result2);
                return mer;
            }
            // If this branch is reached, the array has either only one element,
            // or its two elements are already in order
            else {
                if (sourceLen == 1 || source[0] <= source[1]) {
                    return source;
                } else {
                    int targetp[] = new int[sourceLen];
                    targetp[0] = source[1];
                    targetp[1] = source[0];
                    return targetp;
                }
            }
        }

        private int[] joinInts(int array1[], int array2[]) {
            // Same as in Merge1; omitted here
            ......
        }
    }
}
......
With the Fork/Join optimization, sorting the same 100 million random numbers takes about 14 seconds. Of course, this also depends on how disordered the collection is, the CPU performance, and so on. But overall, this approach gives roughly a 30% performance improvement over merge sort without Fork/Join. The following are the CPU and thread states observed during execution:
JMX memory, thread state:
CPU usage:
With the Fork/Join framework, in addition to the optimizations inside the merge algorithm code itself, we essentially use the computing resources of every CPU core at the same time while keeping the number of operating system threads under control.
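If you want to confirm this in code rather than only through JMX, one option is to print the pool's own counters after the sort finishes. The accessor methods below are standard ForkJoinPool methods; placing these lines after taskResult.get() in the Merge2 example is an assumption for illustration.

// Fragment intended to go after taskResult.get() in Merge2's main method (an assumption).
System.out.println("parallelism = " + pool.getParallelism()); // configured parallelism level
System.out.println("pool size   = " + pool.getPoolSize());    // worker threads actually started
System.out.println("steal count = " + pool.getStealCount());  // tasks taken from other workers' queues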