This is day 6 of my participation in the November Gwen Challenge. For event details, see: The Last Gwen Challenge of 2021.

JDK 7 provides a framework for “divide and conquer” tasks: Fork/Join. It splits a large task into subtasks that are small enough to compute directly, splitting any subtask further if it is still too large. The subtasks are placed in double-ended queues (deques), and worker threads take tasks from these deques and execute them. Finally, the results of the subtasks are merged to produce the result of the original task.

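In short, the idea behind Fork/Join has two steps: fork (split a task into subtasks and run them, possibly in parallel) and join (wait for the subtasks to finish and combine their results).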

RecursiveTask

RecursiveTask is used for divide-and-conquer tasks that return a value. For example, here is how to calculate the sum of the integers from 1 to 100:

```java
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.ForkJoinTask;
import java.util.concurrent.RecursiveTask;
import java.util.stream.IntStream;

public class RecursiveTest {
    // Ranges whose width is at most this threshold are summed directly
    private final static int MAX_THRESHOLD = 10;

    public static void main(String[] args) {
        final ForkJoinPool forkJoinPool = new ForkJoinPool();
        ForkJoinTask<Integer> future = forkJoinPool.submit(new CalculateRecursiveTask(1, 100));
        try {
            Integer result = future.get();
            System.out.println(result);
        } catch (InterruptedException | ExecutionException e) {
            e.printStackTrace();
        }
    }

    private static class CalculateRecursiveTask extends RecursiveTask<Integer> {
        // Start of the range (inclusive)
        private int start;
        // End of the range (inclusive)
        private int end;

        public CalculateRecursiveTask(int start, int end) {
            this.start = start;
            this.end = end;
        }

        @Override
        protected Integer compute() {
            // If the range is within the threshold, compute the sum directly
            if ((end - start) <= MAX_THRESHOLD) {
                return IntStream.rangeClosed(start, end).sum();
            } else {
                // Otherwise, split the range into two subtasks
                int middle = (start + end) / 2;
                CalculateRecursiveTask leftTask = new CalculateRecursiveTask(start, middle);
                CalculateRecursiveTask rightTask = new CalculateRecursiveTask(middle + 1, end);
                // Execute the subtasks asynchronously
                leftTask.fork();
                rightTask.fork();

                // Wait for both subtasks and combine their results
                return leftTask.join() + rightTask.join();
            }
        }
    }
}
```

ForkJoinPool offers two ways to run a task: submit() and invoke(). invoke() executes synchronously: it waits for the task to complete before the following code runs. submit() executes asynchronously and returns a ForkJoinTask (which implements Future); the caller blocks only when it calls get() on that Future.

Running the program prints:

5050
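To make the submit()/invoke() difference concrete, here is a minimal sketch. SubmitVsInvoke and its small SumTask are hypothetical names for illustration; the pool methods invoke() and submit(), and get() on the returned ForkJoinTask, are the standard JDK ones.

```java
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.ForkJoinTask;
import java.util.concurrent.RecursiveTask;

public class SubmitVsInvoke {
    // Hypothetical task: sums the integers in [start, end] sequentially
    static class SumTask extends RecursiveTask<Integer> {
        private final int start;
        private final int end;

        SumTask(int start, int end) {
            this.start = start;
            this.end = end;
        }

        @Override
        protected Integer compute() {
            int sum = 0;
            for (int i = start; i <= end; i++) {
                sum += i;
            }
            return sum;
        }
    }

    // invoke(): the calling thread blocks until the result is available
    static int runWithInvoke(ForkJoinPool pool) {
        return pool.invoke(new SumTask(1, 100));
    }

    // submit(): returns immediately with a Future; get() is where we block
    static int runWithSubmit(ForkJoinPool pool) throws InterruptedException, ExecutionException {
        ForkJoinTask<Integer> future = pool.submit(new SumTask(1, 100));
        // ... the calling thread is free to do other work here ...
        return future.get();
    }

    public static void main(String[] args) throws Exception {
        ForkJoinPool pool = new ForkJoinPool();
        System.out.println(runWithInvoke(pool)); // 5050
        System.out.println(runWithSubmit(pool)); // 5050
        pool.shutdown();
    }
}
```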

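Calling fork() on both subtasks, as CalculateRecursiveTask does, is not the best choice; invokeAll() is better. In the JDK implementation, invokeAll() forks one subtask and executes the other directly in the current thread, so the current worker does useful work instead of only waiting: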

```java
// Execute the subtasks
// leftTask.fork();
// rightTask.fork();
invokeAll(leftTask, rightTask);
// Combine the subtask results
return leftTask.join() + rightTask.join();
```
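Another idiom commonly used for the same purpose is to fork only one subtask, compute the other directly in the current thread, and then join the forked one. The sketch below applies it to the same 1-to-100 sum; the class ForkComputeJoin is a hypothetical example, not code from the program above. The order matters: fork() first, then compute(), then join(), so the forked half can run (or be stolen by an idle worker) while the current thread is busy.

```java
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveTask;
import java.util.stream.IntStream;

public class ForkComputeJoin {
    // Same cut-off as in RecursiveTest
    private static final int MAX_THRESHOLD = 10;

    static class SumTask extends RecursiveTask<Integer> {
        private final int start; // inclusive
        private final int end;   // inclusive

        SumTask(int start, int end) {
            this.start = start;
            this.end = end;
        }

        @Override
        protected Integer compute() {
            if (end - start <= MAX_THRESHOLD) {
                return IntStream.rangeClosed(start, end).sum();
            }
            int middle = (start + end) / 2;
            SumTask leftTask = new SumTask(start, middle);
            SumTask rightTask = new SumTask(middle + 1, end);
            // Hand the left half to the pool, where an idle worker may steal it...
            leftTask.fork();
            // ...while the current thread computes the right half itself
            int rightResult = rightTask.compute();
            // Then wait for the forked half and combine the two results
            return leftTask.join() + rightResult;
        }
    }

    static int sum(int start, int end) {
        return ForkJoinPool.commonPool().invoke(new SumTask(start, end));
    }

    public static void main(String[] args) {
        System.out.println(sum(1, 100)); // 5050
    }
}
```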

RecursiveAction

RecursiveAction is used the same way as RecursiveTask, except that it does not return a value:

```java
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveAction;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.IntStream;

public class RecursiveActionTest {
    // Ranges whose width is at most this threshold are summed directly
    private final static int MAX_THRESHOLD = 10;
    // Shared accumulator: each leaf action adds its partial sum here
    private final static AtomicInteger SUM = new AtomicInteger(0);

    public static void main(String[] args) {
        final ForkJoinPool forkJoinPool = new ForkJoinPool();
        // invoke() blocks until the whole task tree has finished, so SUM is complete when printed
        forkJoinPool.invoke(new CalculateRecursiveAction(0, 100));
        System.out.println(SUM);
        forkJoinPool.shutdown();
    }

    private static class CalculateRecursiveAction extends RecursiveAction {
        // Start of the range (inclusive)
        private final int start;
        // End of the range (inclusive)
        private final int end;

        private CalculateRecursiveAction(int start, int end) {
            this.start = start;
            this.end = end;
        }

        @Override
        protected void compute() {
            // If the range is within the threshold, compute directly
            if ((end - start) <= MAX_THRESHOLD) {
                SUM.addAndGet(IntStream.rangeClosed(start, end).sum());
            } else {
                // Otherwise, split the range into two subtasks
                int middle = (end + start) / 2;
                CalculateRecursiveAction leftAction = new CalculateRecursiveAction(start, middle);
                CalculateRecursiveAction rightAction = new CalculateRecursiveAction(middle + 1, end);
                // Execute the subtasks and wait for both to finish
                invokeAll(leftAction, rightAction);
                // No results to combine: each leaf has already added its part to SUM
            }
        }
    }
}
```

The output is also 5050.

Understanding the Fork/Join framework API

The Fork/Join framework lives in the java.util.concurrent package. Four classes form its core:

  • ForkJoinTask<V>: an abstract task class whose instances run in a ForkJoinPool.
  • ForkJoinPool: a thread pool that manages and runs ForkJoinTask tasks.
  • RecursiveAction: a subclass of ForkJoinTask that has no return value.
  • RecursiveTask<V>: a subclass of ForkJoinTask that has a return value.

In practice, we put the code that solves the problem in a RecursiveAction or RecursiveTask and then submit the task to a ForkJoinPool, which handles everything from thread management to making use of multiple processor cores.

Let’s start by understanding the key methods in these classes.

ForkJoinTask

ForkJoinTask<V> is an abstract task class that runs in a ForkJoinPool; the type parameter V is the type of the task's result. A ForkJoinTask is a thread-like entity: it is a lightweight abstraction of a task, not an actual thread of execution. This allows a large number of tasks to be managed by a small number of actual threads in a ForkJoinPool. Its key methods are:

  • final ForkJoinTask<V> fork()
  • final V join()
  • final V invoke()

The fork() method arranges for the task to be executed asynchronously. It returns the task itself, and the calling thread keeps running.

The join() method waits for the task to complete and returns its result.

The invoke() method combines fork() and join(): it starts performing the task, waits for it to complete, and returns its result.
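A small sketch of the three methods, assuming a hypothetical SquareTask that just squares its input. When fork() is called from a thread that is not part of any pool (such as main here), the task is executed asynchronously in ForkJoinPool.commonPool(); invoke() performs the task and returns its result directly.

```java
import java.util.concurrent.ForkJoinTask;
import java.util.concurrent.RecursiveTask;

public class ForkJoinInvokeDemo {
    // Hypothetical task that squares a number
    static class SquareTask extends RecursiveTask<Integer> {
        private final int n;

        SquareTask(int n) {
            this.n = n;
        }

        @Override
        protected Integer compute() {
            return n * n;
        }
    }

    // fork() then join(): start asynchronously, collect the result later
    static int forkThenJoin(int n) {
        SquareTask task = new SquareTask(n);
        ForkJoinTask<Integer> forked = task.fork(); // returns the same task object
        // ... the caller could do other work here ...
        return forked.join();
    }

    // invoke(): start the task and wait for its result in one call
    static int invokeDirectly(int n) {
        return new SquareTask(n).invoke();
    }

    public static void main(String[] args) {
        System.out.println(forkThenJoin(7));   // 49
        System.out.println(invokeDirectly(7)); // 49
    }
}
```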

In addition, ForkJoinTask provides static invokeAll() methods for running more than one task at a time, including:

  • static void invokeAll(ForkJoinTask<?> task1, ForkJoinTask<?> task2): executes two tasks
  • static void invokeAll(ForkJoinTask<?>... taskList): executes all the given tasks
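Besides the two forms listed, ForkJoinTask also has an invokeAll(Collection<T> tasks) overload, which is handy when the number of subtasks is only known at run time. The sketch below, with hypothetical ChunkSum and ArraySum tasks, splits an array into fixed-size chunks and runs them all with one invokeAll() call; chunkSize is assumed to be positive.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveTask;

public class InvokeAllListDemo {
    // Hypothetical leaf task: sums values[from, to)
    static class ChunkSum extends RecursiveTask<Long> {
        private final int[] values;
        private final int from;
        private final int to;

        ChunkSum(int[] values, int from, int to) {
            this.values = values;
            this.from = from;
            this.to = to;
        }

        @Override
        protected Long compute() {
            long sum = 0;
            for (int i = from; i < to; i++) {
                sum += values[i];
            }
            return sum;
        }
    }

    // Hypothetical parent task: one ChunkSum per chunk, all run via invokeAll(Collection)
    static class ArraySum extends RecursiveTask<Long> {
        private final int[] values;
        private final int chunkSize; // assumed > 0

        ArraySum(int[] values, int chunkSize) {
            this.values = values;
            this.chunkSize = chunkSize;
        }

        @Override
        protected Long compute() {
            List<ChunkSum> chunks = new ArrayList<>();
            for (int from = 0; from < values.length; from += chunkSize) {
                chunks.add(new ChunkSum(values, from, Math.min(from + chunkSize, values.length)));
            }
            // Forks every task in the list and returns once all of them are done
            invokeAll(chunks);
            long total = 0;
            for (ChunkSum chunk : chunks) {
                total += chunk.join(); // already complete, so join() returns immediately
            }
            return total;
        }
    }

    static long sum(int[] values, int chunkSize) {
        return ForkJoinPool.commonPool().invoke(new ArraySum(values, chunkSize));
    }

    public static void main(String[] args) {
        int[] values = new int[100];
        for (int i = 0; i < values.length; i++) {
            values[i] = i + 1;
        }
        System.out.println(sum(values, 10)); // 5050
    }
}
```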

RecursiveAction

This is a recursive ForkJoinTask subclass that does not return a result. “Recursive” means that a task can be divided into subtasks of its own type using a divide-and-conquer strategy, as in the examples earlier in this article.

We override the compute() method and put the task's logic in it:

```java
protected abstract void compute();
```
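Because RecursiveAction returns nothing, it fits work whose effect is a side effect on disjoint data, for example updating separate slices of an array in place. This also avoids a shared counter such as the AtomicInteger in RecursiveActionTest. The sketch below is in the spirit of the example in the RecursiveAction Javadoc; DoubleInPlace, DoubleTask and THRESHOLD are hypothetical names.

```java
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveAction;

public class DoubleInPlace {
    // Hypothetical cut-off: slices up to this size are processed sequentially
    private static final int THRESHOLD = 1_000;

    // Hypothetical action: doubles every element of array[lo, hi) in place
    static class DoubleTask extends RecursiveAction {
        private final long[] array;
        private final int lo;
        private final int hi;

        DoubleTask(long[] array, int lo, int hi) {
            this.array = array;
            this.lo = lo;
            this.hi = hi;
        }

        @Override
        protected void compute() {
            if (hi - lo <= THRESHOLD) {
                for (int i = lo; i < hi; i++) {
                    array[i] *= 2;
                }
            } else {
                int mid = (lo + hi) >>> 1;
                // The two halves never overlap, so no synchronization is needed
                invokeAll(new DoubleTask(array, lo, mid), new DoubleTask(array, mid, hi));
            }
        }
    }

    static void doubleAll(long[] array) {
        ForkJoinPool.commonPool().invoke(new DoubleTask(array, 0, array.length));
    }

    public static void main(String[] args) {
        long[] data = new long[10_000];
        for (int i = 0; i < data.length; i++) {
            data[i] = i;
        }
        doubleAll(data);
        System.out.println(data[9_999]); // 19998
    }
}
```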

RecursiveTask

RecursiveTask<V> is the same as RecursiveAction, except that it returns a result of type V. We still need to override the compute() method:

```java
protected abstract V compute();
```

ForkJoinPool

This is the core class of the Fork/Join framework. It is responsible for thread management and execution of a ForkJoinTask. In order to execute a ForkJoinTask, a ForkJoinPool instance must be obtained.

The first way to obtain a ForkJoinPool instance is through a constructor. Two common ones are:

  • ForkJoinPool(): the default constructor creates a pool whose parallelism level equals the number of processors available to the JVM.
  • ForkJoinPool(int parallelism): creates a pool with a custom parallelism level. The level must be greater than 0 (and no larger than an implementation limit); it may be larger than the number of available processors.

The parallelism level is the target number of threads actively executing tasks at the same time. In other words, it determines how many tasks can run concurrently, although truly simultaneous execution is still limited by the number of processor cores.

However, this does not limit the number of tasks the pool can manage: a ForkJoinPool can manage many more tasks than its parallelism level.
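The following sketch illustrates both points with a hypothetical ParallelismDemo class: the no-argument constructor picks Runtime.availableProcessors() as the parallelism level, an explicit level is reported back by getParallelism(), and a pool with parallelism 2 still completes 1,000 submitted tasks, because the level limits concurrently running threads, not queued tasks.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.ForkJoinTask;
import java.util.concurrent.RecursiveTask;

public class ParallelismDemo {
    // Trivial hypothetical task so that many of them can be submitted
    static class Echo extends RecursiveTask<Integer> {
        private final int value;

        Echo(int value) {
            this.value = value;
        }

        @Override
        protected Integer compute() {
            return value;
        }
    }

    // Submits taskCount tasks (values 1..taskCount) to a pool and sums their results
    static long runMany(int parallelism, int taskCount) {
        ForkJoinPool pool = new ForkJoinPool(parallelism);
        try {
            List<ForkJoinTask<Integer>> futures = new ArrayList<>();
            for (int i = 1; i <= taskCount; i++) {
                futures.add(pool.submit(new Echo(i)));
            }
            long total = 0;
            for (ForkJoinTask<Integer> f : futures) {
                total += f.join();
            }
            return total;
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        ForkJoinPool defaultPool = new ForkJoinPool();
        // The no-arg constructor uses Runtime.availableProcessors() as the parallelism level
        System.out.println(defaultPool.getParallelism() == Runtime.getRuntime().availableProcessors()); // true
        defaultPool.shutdown();

        ForkJoinPool twoThreads = new ForkJoinPool(2);
        System.out.println(twoThreads.getParallelism()); // 2
        twoThreads.shutdown();

        // 1,000 tasks on a pool whose parallelism is only 2
        System.out.println(runMany(2, 1_000)); // 500500
    }
}
```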

The second way to obtain a ForkJoinPool instance is to use the following static method, which returns the shared common pool:

```java
public static ForkJoinPool commonPool();
```

A pool obtained this way is not affected by shutdown() or shutdownNow(), but it is terminated automatically on System.exit(). Any program that relies on asynchronous task processing finishing before the program exits should therefore call awaitQuiescence() on the common pool before the main program terminates. The common pool is created statically and is used automatically by any ForkJoinTask that is not explicitly submitted to a specific pool.
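A sketch of fire-and-forget use of the common pool, assuming a fixed 5-second timeout is acceptable: tasks are handed over with execute(), shutdown() is called only to show that it does not affect the common pool, and awaitQuiescence() waits for the work before the program moves on. CommonPoolDemo and Work are hypothetical names.

```java
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveAction;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

public class CommonPoolDemo {
    static final AtomicInteger COMPLETED = new AtomicInteger();

    // Hypothetical fire-and-forget action: its only effect is bumping a counter
    static class Work extends RecursiveAction {
        @Override
        protected void compute() {
            COMPLETED.incrementAndGet();
        }
    }

    // Runs taskCount actions on the common pool; returns how many completed, or -1 on timeout
    static int runAndWait(int taskCount) {
        COMPLETED.set(0);
        ForkJoinPool common = ForkJoinPool.commonPool();
        for (int i = 0; i < taskCount; i++) {
            common.execute(new Work()); // asynchronous; we keep no handle to the task
        }
        common.shutdown(); // has no effect on the common pool
        // Block (up to 5 seconds) until the pool has no running or queued work
        boolean quiescent = common.awaitQuiescence(5, TimeUnit.SECONDS);
        return quiescent ? COMPLETED.get() : -1;
    }

    public static void main(String[] args) {
        System.out.println(runAndWait(100));
        System.out.println(ForkJoinPool.commonPool().isShutdown()); // false
    }
}
```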

Work-stealing algorithm

Work stealing is an algorithm in which a thread steals tasks from other threads' queues and executes them.

So why use work stealing? Suppose we need to do a big task. We can divide it into a number of mutually independent subtasks and, to reduce contention between threads, put them into separate queues, creating one thread per queue to execute the tasks in that queue. Threads and queues correspond one to one: for example, thread A handles the tasks in queue A. However, some threads finish the tasks in their own queue early while other queues still hold pending tasks. Rather than sit idle, a thread that has finished can help another thread by stealing a task from that thread's queue and executing it. Because the owner thread and the stealing thread then access the same queue, a double-ended queue (deque) is usually used to reduce contention between them: the owner thread always takes tasks from the head of the deque, while the stealing thread always takes tasks from the tail.
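The idea can be illustrated with a deliberately simplified toy, which is not how ForkJoinPool is actually implemented (its work queues are specialized internal structures). Each hypothetical worker thread owns a ConcurrentLinkedDeque, takes its own tasks from the head, and steals from the tail of another worker's deque once its own is empty. The “tasks” here are just integers added to a total, and they never spawn new tasks, so a worker can stop as soon as every deque is empty.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ConcurrentLinkedDeque;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;

public class ToyWorkStealing {
    // tasksPerWorker[w] = how many tasks (the integers 1..k) start in worker w's deque
    static long run(int[] tasksPerWorker) throws InterruptedException {
        int n = tasksPerWorker.length;
        List<ConcurrentLinkedDeque<Integer>> deques = new ArrayList<>();
        for (int w = 0; w < n; w++) {
            ConcurrentLinkedDeque<Integer> deque = new ConcurrentLinkedDeque<>();
            for (int t = 1; t <= tasksPerWorker[w]; t++) {
                deque.addLast(t);
            }
            deques.add(deque);
        }

        AtomicLong total = new AtomicLong();
        AtomicInteger steals = new AtomicInteger();
        Thread[] workers = new Thread[n];
        for (int w = 0; w < n; w++) {
            final int me = w;
            workers[w] = new Thread(() -> {
                while (true) {
                    // The owner always takes from the head of its own deque
                    Integer task = deques.get(me).pollFirst();
                    // Own deque empty: steal from the tail of another worker's deque
                    for (int v = 1; task == null && v < n; v++) {
                        task = deques.get((me + v) % n).pollLast();
                        if (task != null) {
                            steals.incrementAndGet();
                        }
                    }
                    if (task == null) {
                        return; // every deque is empty and no new tasks can appear
                    }
                    total.addAndGet(task);
                }
            });
            workers[w].start();
        }
        for (Thread worker : workers) {
            worker.join();
        }
        System.out.println("steals: " + steals.get());
        return total.get();
    }

    public static void main(String[] args) throws InterruptedException {
        // Worker 1 starts empty, so every task it runs is stolen from worker 0;
        // the total is 500500 regardless of how many steals happen
        System.out.println(run(new int[] {1_000, 0}));
    }
}
```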

The advantage of work stealing is that it makes full use of the threads for parallel computation while reducing contention between them. Its disadvantage is that contention still occurs in some cases, for example when a deque holds only one task. It also consumes more system resources, because it creates multiple threads and multiple deques.

When to use it

The examples above only demonstrate how to use Fork/Join. For a task as small as summing 1 to 100, Fork/Join is actually slower than a plain loop, because splitting tasks and scheduling them onto threads has overhead. For a task that is large enough, however, don't hesitate to use Fork/Join if it proves faster than conventional processing.
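To judge whether Fork/Join pays off, compare it against a plain loop on realistic data. The sketch below (hypothetical WhenToUse class; the threshold of 100,000 elements is an arbitrary assumption to tune) sums a large long[] both ways. A single run timed with System.nanoTime() is only a rough indication because of JIT warm-up and other noise; a benchmark harness such as JMH gives more trustworthy numbers.

```java
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveTask;

public class WhenToUse {
    // Hypothetical task: sums values[lo, hi), splitting while the range exceeds threshold
    static class ArraySumTask extends RecursiveTask<Long> {
        private final long[] values;
        private final int lo;
        private final int hi;
        private final int threshold;

        ArraySumTask(long[] values, int lo, int hi, int threshold) {
            this.values = values;
            this.lo = lo;
            this.hi = hi;
            this.threshold = threshold;
        }

        @Override
        protected Long compute() {
            if (hi - lo <= threshold) {
                long sum = 0;
                for (int i = lo; i < hi; i++) {
                    sum += values[i];
                }
                return sum;
            }
            int mid = (lo + hi) >>> 1;
            ArraySumTask left = new ArraySumTask(values, lo, mid, threshold);
            left.fork();                                                   // left half asynchronously
            long right = new ArraySumTask(values, mid, hi, threshold).compute(); // right half here
            return left.join() + right;
        }
    }

    static long sequentialSum(long[] values) {
        long sum = 0;
        for (long v : values) {
            sum += v;
        }
        return sum;
    }

    static long forkJoinSum(long[] values, int threshold) {
        // threshold must be at least 1, otherwise a one-element range would split forever
        if (threshold < 1) {
            throw new IllegalArgumentException("threshold must be >= 1");
        }
        return ForkJoinPool.commonPool().invoke(new ArraySumTask(values, 0, values.length, threshold));
    }

    public static void main(String[] args) {
        long[] values = new long[5_000_000];
        for (int i = 0; i < values.length; i++) {
            values[i] = i;
        }
        long t0 = System.nanoTime();
        long seq = sequentialSum(values);
        long t1 = System.nanoTime();
        long par = forkJoinSum(values, 100_000);
        long t2 = System.nanoTime();
        System.out.printf("sequential: %d in %d ms%n", seq, (t1 - t0) / 1_000_000);
        System.out.printf("fork/join:  %d in %d ms%n", par, (t2 - t1) / 1_000_000);
    }
}
```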