In addition to the generic ThreadPoolExecutor, Java provides a special-purpose thread pool called ForkJoinPool. This class is broadly similar to ThreadPoolExecutor and implements the Executor and ExecutorService interfaces. When used through those interfaces, ForkJoinPool stores tasks in an unbounded queue and executes them with the number of threads specified in its constructor. If the number of threads is not specified, it defaults to the number of CPUs currently available on the machine (or the number of CPUs configured for the Docker container).

ForkJoinPool is designed for divide-and-conquer algorithms, in which a task is broken into subtasks that can be executed in parallel and then aggregated into a final result; quicksort is a classic example.

One thing to note about divide-and-conquer is that it tends to create a large number of tasks, far more than the number of threads that will execute them. For example, to sort an array of 10 million elements, the decomposition proceeds like this: split the array in half, recursively sort each half, and merge the two sorted subarrays, recursing until the subarrays are small enough to sort directly.

Assuming decomposition continues until a subarray has at most 47 elements, there are 262,144 leaf tasks that sort subarrays, 131,072 tasks that merge those sorted subarrays, 65,536 additional tasks that merge those, and so on, for a total of 524,287 tasks.
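The arithmetic above can be checked with a small sketch. Note the assumption: this counts a tree that splits each range exactly in half, which matches the counting in the text but not a real quicksort, whose partitions are uneven.

```java
public class TaskCount {
    // Count the tasks created by a binary divide-and-conquer that
    // splits a range in half until it is at most `leafSize` elements.
    static long countTasks(long size, long leafSize) {
        if (size <= leafSize) {
            return 1;  // a leaf task that sorts its subarray directly
        }
        long left = (size + 1) / 2;  // sizes of the two halves
        long right = size / 2;
        // one merge task at this level plus both subtrees
        return 1 + countTasks(left, leafSize) + countTasks(right, leafSize);
    }
}
```

For a 10-million-element array and a leaf size of 47, this yields the 524,287 tasks cited above.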

As you can see, a parent task cannot complete until its subtasks are complete, so if we implemented this algorithm using ThreadPoolExecutor, performance would be quite poor: each thread would block waiting for its subtasks. A ForkJoinPool thread, on the other hand, does not have to sit idle while a task waits for its subtasks; it can execute other tasks while the current one is suspended.

Take a simple example: We have an array of type double and need to count the number of elements in the array that are less than 0.5. We use a divide-and-conquer strategy to do this.

```java
import java.util.concurrent.RecursiveTask;

public class TestForkJoinPool {
    private static double[] d;

    private static class ForkJoinTask extends RecursiveTask<Integer> {
        private final int first;
        private final int last;

        public ForkJoinTask(int first, int last) {
            this.first = first;
            this.last = last;
        }

        @Override
        protected Integer compute() {
            int subCount = 0;
            if (last - first < 10) {
                // Small enough: count directly
                for (int i = first; i <= last; i++) {
                    if (d[i] < 0.5) {
                        subCount++;
                    }
                }
            } else {
                // Split the range and process both halves as subtasks
                int mid = (first + last) >>> 1;
                ForkJoinTask left = new ForkJoinTask(first, mid);
                left.fork();
                ForkJoinTask right = new ForkJoinTask(mid + 1, last);
                right.fork();
                subCount = left.join();
                subCount += right.join();
            }
            return subCount;
        }
    }
}
```
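Here is a self-contained, runnable variant with a small driver. The class and method names (CountTask, countBelowHalf) are illustrative, and it uses a common refinement: instead of forking both halves, the current thread computes one half itself and only forks the other, which saves one task submission per split.

```java
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveTask;

public class ForkJoinCountDemo {
    static class CountTask extends RecursiveTask<Integer> {
        private final double[] d;
        private final int first, last;

        CountTask(double[] d, int first, int last) {
            this.d = d;
            this.first = first;
            this.last = last;
        }

        @Override
        protected Integer compute() {
            if (last - first < 10) {
                int count = 0;
                for (int i = first; i <= last; i++) {
                    if (d[i] < 0.5) count++;
                }
                return count;
            }
            int mid = (first + last) >>> 1;
            CountTask left = new CountTask(d, first, mid);
            left.fork();                          // left half runs as a separate task
            CountTask right = new CountTask(d, mid + 1, last);
            return right.compute() + left.join(); // right half runs here; then wait
        }
    }

    static int countBelowHalf(double[] d) {
        return ForkJoinPool.commonPool().invoke(new CountTask(d, 0, d.length - 1));
    }
}
```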

The fork() and join() methods are the key here; you cannot achieve this kind of recursion with ThreadPoolExecutor. These two methods rely on a set of internal per-thread queues and on switching threads between tasks, and those details are transparent to the developer. So how do you choose between ForkJoinPool and ThreadPoolExecutor in practice?

First, fork/join can suspend tasks in progress, which allows all of the tasks to run on just a few threads. Passing a 2-million-element array into the code above produces up to 4 million tasks, yet only a few threads, or even one, are needed to run them. Running the equivalent algorithm on a ThreadPoolExecutor would require 4 million threads, because each thread must wait for its subtasks to complete, and those subtasks can complete only if additional threads are available in the pool. So the suspension provided by fork/join lets us use algorithms that would otherwise be impractical, which is a big performance advantage.

Of course, the usage scenario in this example is not very common in actual production; ForkJoinPool is better suited to scenarios where:

  • The results must be merged in a nontrivial way (not the simple summation in the example).
  • The algorithm is designed so that the number of tasks is well bounded.

In other cases, it is easier to split the array into subranges yourself and use ThreadPoolExecutor with one thread per subrange. For example, use a thread pool with a core and maximum size of 4 and split the array into four parts. Four threads traversing four subarrays will not create too many tasks, and performance is better. Here's a test comparison:

Number of threads | ForkJoinPool | ThreadPoolExecutor
1                 | 285 ± 15 ms  | 5 ms
4                 | 86 ± 20 ms   | 1 ms

The main reason for this gap is that the divide-and-conquer algorithm generates a large number of task objects, and the overhead of managing these task objects hampers ForkJoinPool performance and has an impact on GC. This should therefore be avoided if alternatives are available.
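The manual-splitting approach described above can be sketched as follows. The pool parameters and method name are illustrative; each thread gets one contiguous chunk, so only `nThreads` task objects are ever created.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class PartitionedCount {
    // Split the array into one contiguous chunk per thread and submit
    // a single task per chunk, rather than recursively decomposing.
    static int countBelowHalf(double[] d, int nThreads) throws Exception {
        ExecutorService pool = new ThreadPoolExecutor(
                nThreads, nThreads, 0L, TimeUnit.MILLISECONDS,
                new LinkedBlockingQueue<>());
        int chunk = (d.length + nThreads - 1) / nThreads;
        List<Future<Integer>> futures = new ArrayList<>();
        for (int t = 0; t < nThreads; t++) {
            final int first = t * chunk;
            final int last = Math.min(first + chunk, d.length);
            futures.add(pool.submit(() -> {
                int count = 0;
                for (int i = first; i < last; i++) {
                    if (d[i] < 0.5) count++;
                }
                return count;
            }));
        }
        int total = 0;
        for (Future<Integer> f : futures) total += f.get();
        pool.shutdown();
        return total;
    }
}
```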

Work stealing

As mentioned above, the first principle of using ForkJoinPool is to split tasks appropriately. Beyond suspending tasks, an even more powerful feature is that it implements work stealing: each thread in the pool has its own queue of tasks, and a thread preferentially processes tasks from its own queue. If its queue is empty, it steals tasks from other threads' queues. Thus, even if one of the 4 million tasks takes a long time, the other threads in ForkJoinPool can complete the rest. ThreadPoolExecutor cannot do this: when a thread is stuck on a long task, the other threads have no way to take over its remaining work.

Next, modify the original example so that the work done for each element depends on its index in the array.

```java
for (int i = first; i <= last; i++) {
    if (d[i] < 0.5) {
        subCount++;
    }
    // Extra work proportional to the element's index
    for (int j = 0; j < i; j++) {
        d[i] += j;
    }
}
```

Since the inner loop runs once per preceding index, the calculation time is proportional to the element's position in the array. For example, d[0] is computed very quickly, but d[d.length - 1] takes much longer.

In this scenario, if ThreadPoolExecutor is used and the array is divided into four parts, the time to evaluate the fourth subarray (assuming sequential partitioning) is much longer than the time to evaluate the first subarray. Once the thread that computed the first subarray completes its task, it will enter the idle state.

With ForkJoinPool, one thread gets bogged down in the fourth subarray, but the other threads stay busy by stealing the remaining tasks rather than going idle. Here's how the tests compare:

Number of threads | ForkJoinPool | ThreadPoolExecutor
1                 | 31 ± 3 s     | 30 ± 3 s
4                 | 6 ± 1 s      | 10 ± 2 s

With only one thread, the results are basically the same; once the thread count reaches four, ForkJoinPool has the advantage, because some tasks in the series take much longer than others and the workload is unbalanced. The conclusion: when tasks can be divided into a balanced set, splitting manually and using ThreadPoolExecutor is better; when the tasks are unbalanced, ForkJoinPool is better.

Further performance tuning is possible here, but it is more algorithmic: deciding when to stop recursing. In the example above, recursion ends when the subarray size is less than 10, but for the balanced workload a much larger leaf size, such as 500,000, is more appropriate.
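To experiment with the cutoff, it helps to expose it as a parameter rather than hard-coding 10. A hedged sketch (the class name and constructor shape are illustrative, not from the original):

```java
import java.util.concurrent.RecursiveTask;

public class ThresholdCountTask extends RecursiveTask<Integer> {
    // Same counting task as before, but with the recursion cutoff
    // exposed as a parameter so it can be tuned per workload.
    private final double[] d;
    private final int first, last, threshold;

    public ThresholdCountTask(double[] d, int first, int last, int threshold) {
        this.d = d;
        this.first = first;
        this.last = last;
        this.threshold = threshold;
    }

    @Override
    protected Integer compute() {
        if (last - first < threshold) {
            int count = 0;
            for (int i = first; i <= last; i++) {
                if (d[i] < 0.5) count++;
            }
            return count;
        }
        int mid = (first + last) >>> 1;
        ThresholdCountTask left =
                new ThresholdCountTask(d, first, mid, threshold);
        left.fork();
        ThresholdCountTask right =
                new ThresholdCountTask(d, mid + 1, last, threshold);
        right.fork();
        return left.join() + right.join();
    }
}
```

Running the same workload with different threshold values then makes the trade-off in the following table easy to reproduce.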

However, in the unbalanced case, smaller subarrays give better performance. The following results are for the modified example, where the work per element depends on its index (reduced to 200,000 elements to save time):

Subarray size | ForkJoinPool
100,000       | 17,988 ± 100 ms
50,000        | 10,613 ± 100 ms
10,000        | 4,964 ± 100 ms
1,000         | 3,940 ± 100 ms
100           | 3,735 ± 100 ms
10            | 3,687 ± 100 ms

This kind of leaf-size tuning is common in such algorithms; Java's quicksort implementation, for example, uses a leaf value of 47.

Automatic parallelism

Java has the ability to automatically parallelize certain types of code, and this relies on ForkJoinPool. For this purpose the JVM creates a common fork-join pool, a static object of the ForkJoinPool class whose default size is the number of processors available on the machine.

This kind of automatic parallelism appears in methods of the Arrays class, such as sorting an array with a parallel quicksort or operating on each element of an array. It also appears in stream processing, where each element of a collection can be processed serially or in parallel.
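For example, the parallel variants in the Arrays class run on the common pool. A small sketch (the helper names are illustrative; note that parallelSort may fall back to a sequential sort for small arrays):

```java
import java.util.Arrays;

public class ParallelArraysDemo {
    // Returns a sorted copy; Arrays.parallelSort may use the common
    // ForkJoinPool for large arrays.
    public static int[] sortedCopy(int[] src) {
        int[] copy = Arrays.copyOf(src, src.length);
        Arrays.parallelSort(copy);
        return copy;
    }

    // Computes each element from its index, possibly in parallel.
    public static double[] fillSquares(int n) {
        double[] a = new double[n];
        Arrays.parallelSetAll(a, i -> (double) i * i);
        return a;
    }
}
```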

Here is an example of creating a collection of user objects and then calculating the activity coefficient for each user:

```java
List<User> users = ...;
Stream<User> stream = users.parallelStream();
stream.forEach(u -> {
    int val = calculate(u);
    ...
});
```

The forEach() method creates a task for each user object, and each task is processed by the JVM's common ForkJoinPool.
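A self-contained variant of the same pattern, using integers in place of the hypothetical User and calculate(); an ordered parallel stream collected this way still preserves element order:

```java
import java.util.List;
import java.util.stream.Collectors;

public class ParallelStreamDemo {
    // Transform each element in parallel on the common fork-join pool.
    public static List<Integer> squares(List<Integer> values) {
        return values.parallelStream()
                     .map(v -> v * v)
                     .collect(Collectors.toList());
    }
}
```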

Adjusting the size of the common ForkJoinPool is as important as adjusting the size of any other thread pool. By default, the common pool has as many threads as the machine has available CPUs. If multiple JVMs run on one machine, consider limiting the thread count so the JVMs do not compete for resources. Similarly, if a server handles other requests in parallel and you want to ensure enough CPU is available for them, consider reducing the common pool's thread count. Conversely, if tasks in the common pool often block waiting on I/O, you may need to increase its size.

To adjust the common pool size, set the Java system property -Djava.util.concurrent.ForkJoinPool.common.parallelism=N. (The exact behavior is version-dependent; prior to Java 8u192 it had to be set manually.) You can check the current size of the common pool with:

```java
ForkJoinPool.commonPool().getParallelism()
```

Note that the property cannot be changed at runtime: it must be set before the ForkJoinPool class is loaded, for example:

```java
System.setProperty("java.util.concurrent.ForkJoinPool.common.parallelism", "20");
```

An additional point to note is that forEach() uses both the thread executing the statement and threads from the common pool to process stream elements. Therefore, if you use parallel streams or other automatic parallelization and need to tune the common pool size, consider setting it to one less than the total concurrency you want.

Resources: O'Reilly, Java Performance