This is the 15th day of my participation in the August Text Challenge.More challenges in August

The Fork frame of the Join

The Fork/Join framework is a framework provided in Java 7 for parallel execution of tasks. It is a framework for dividing large tasks into several smaller tasks and finally summarizing the results of each small task.

package com.example.xppdemo.chapter6;

import java.util.concurrent.ExecutionException;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.Future;
import java.util.concurrent.RecursiveTask;

public class CountTask extends RecursiveTask<Integer> {
    private static final int THRESHOLD = 2; / / threshold
    private int start;
    private int end;
    public CountTask(int start, int end) {
        this.start = start;
        this.end = end;
    }
    @Override
    protected Integer compute(a) {
        int sum = 0;
        // Count tasks if they are small enough
        boolean canCompute = (end - start) <= THRESHOLD;
        if (canCompute) {
            for (inti = start; i <= end; i++) { sum += i; }}else {
            // If the task is larger than the threshold, it is divided into two subtasks
            int middle = (start + end) / 2;
            CountTask leftTask = new CountTask(start, middle);
            CountTask rightTask = new CountTask(middle + 1, end);
            // Execute subtasks
            leftTask.fork();
            rightTask.fork();
            // Wait for subtasks to complete and get their results
            int leftResult=leftTask.join();
            int rightResult=rightTask.join();
            // Merge subtasks
            sum = leftResult + rightResult;
        }
        return sum;
    }
    public static void main(String[] args) {
        ForkJoinPool forkJoinPool = new ForkJoinPool();
        // Create a task that computes 1+2+3+4
        CountTask task = new CountTask(1.4);
        // Execute a task
        Future<Integer> result = forkJoinPool.submit(task);
        try {
            System.out.println(result.get());
        } catch (InterruptedException e) {
        } catch (ExecutionException e) {
        }
    }
}
Copy the code

The main difference between ForkJoinTask and ordinary tasks is that it requires the compute method, in which the task is first determined to be small enough and then executed. If it is not small enough, it must be split into two subtasks, each of which, when fork is called, goes into compute to see if the current subtask needs to be split into subtasks, and if not, executes the current subtask and returns the result. Using the Join method waits for the subtask to complete and get its results.

Fork/Join uses two classes to do both. ForkJoinTask: To use the ForkJoin framework, we must first create a ForkJoin task. It provides a mechanism to perform fork() and join() operations in tasks. In general, we do not need to inherit ForkJoinTask directly, only its subclasses. The Fork/Join framework provides the following two subclasses.

  • RecursiveAction: Used for tasks that do not return results.

  • RecursiveTask: Used for tasks that return results.

â‘¡ForkJoinPool: A ForkJoinTask is executed through a ForkJoinPool. The subtasks segmented by the task are added to the two-ended queue maintained by the current worker thread and enter the head of the queue. When a worker thread has no work in its queue temporarily, it randomly fetches a task from the tail of another worker thread’s queue.

Implementation principle of Fork/Join framework

ForkJoinPool consists of the ForkJoinTask array and the ForkJoinWorkerThread array. The ForkJoinTask array is responsible for submitting programs to the ForkJoinPool. The ForkJoinWorkerThread array performs these tasks.

(1) The implementation principle of ForkJoinTask

When we call ForkJoinTask’s fork, the program asynchronously executes the task by calling ForkJoinWorkerThread’s pushTask, and returns the result immediately. Here’s the code.

public final ForkJoinTask<V> fork(a) {
    Thread t;
    if ((t = Thread.currentThread()) instanceof ForkJoinWorkerThread)
        ((ForkJoinWorkerThread)t).workQueue.push(this);
    else
        ForkJoinPool.common.externalPush(this);
    return this;
}
Copy the code

WorkQueue. Push:

final void push(ForkJoinTask
        task) { ForkJoinTask<? >[] a; ForkJoinPool p;int b = base, s = top, n;
    if((a = array) ! =null) {    // ignore if queue removed
        int m = a.length - 1;     // fenced write for task visibility
        U.putOrderedObject(a, ((m & s) << ASHIFT) + ABASE, task);
        U.putOrderedInt(this, QTOP, s + 1);
        if ((n = s - b) <= 1) {
            if((p = pool) ! =null)
                p.signalWork(p.workQueues, this);
        }
        else if(n >= m) growArray(); }}Copy the code

The pushTask method places the current task in the ForkJoinTask array queue. ForkJoinPool’s signalWork() method is then invoked to wake up or create a worker thread to perform the task.

(2) Implementation principle of ForkJoinTask join method

The Join method blocks the current thread and waits for the result. Let’s take a look at the implementation of ForkJoinTask’s join method

public final V join(a) {
    int s;
    if((s = doJoin() & DONE_MASK) ! = NORMAL) reportException(s);return getRawResult();
}
Copy the code

It calls the doJoin() method, which obtains the status of the task to determine what result is returned. The status of the task is NORMAL, CANCELLED, SIGNAL, and EXCEPTIONAL.

  • If the task status is completed, the result of the task is returned.
  • If the task status is cancelled, a CancellationException is thrown.
  • If the task status is a throw exception, the corresponding exception is thrown directly.
private int doJoin(a) {
    int s; Thread t; ForkJoinWorkerThread wt; ForkJoinPool.WorkQueue w;
    return (s = status) < 0 ? s :
        ((t = Thread.currentThread()) instanceof ForkJoinWorkerThread) ?
        (w = (wt = (ForkJoinWorkerThread)t).workQueue).
        tryUnpush(this) && (s = doExec()) < 0 ? s :
        wt.pool.awaitJoin(w, this.0L) :
        externalAwaitDone();
}
Copy the code

In the doJoin() method, first check the status of the task to see whether the task has been executed. If the execution is completed, the task status is directly returned. If not, the task is fetched from the task array and executed. If the task is successfully executed, set the task status to NORMAL. If an exception occurs, record the exception and set the task status to EXCEPTIONAL.