One, definition
Fork/Join is a framework for parallel task execution introduced in Java 7. Fork splits a large task into several smaller subtasks that run in parallel, and Join merges the results of those subtasks to obtain the result of the large task.
For example, computing 1 + 2 + 3 + ... + 10000 can be divided into 10 subtasks, each of which sums 1000 numbers, and the results of the 10 subtasks are then added together.
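The 10-subtask split described above can be sketched roughly as follows. This is only an illustration: the class names ChunkedSum and RangeSum are hypothetical, and the sketch assumes the range divides evenly into the requested number of chunks.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveTask;

class ChunkedSum {

    // Hypothetical helper task: sums the integers in [from, to] sequentially.
    static class RangeSum extends RecursiveTask<Long> {
        private final int from;
        private final int to;

        RangeSum(int from, int to) {
            this.from = from;
            this.to = to;
        }

        @Override
        protected Long compute() {
            long sum = 0;
            for (int i = from; i <= to; i++) {
                sum += i;
            }
            return sum;
        }
    }

    // Splits [1, n] into `parts` equal chunks, runs one task per chunk,
    // then merges the partial sums. Assumes n is divisible by parts.
    static long sum(int n, int parts) {
        int chunk = n / parts;
        List<RangeSum> tasks = new ArrayList<>();
        for (int p = 0; p < parts; p++) {
            tasks.add(new RangeSum(p * chunk + 1, (p + 1) * chunk));
        }
        ForkJoinPool pool = new ForkJoinPool();
        try {
            for (RangeSum t : tasks) {
                pool.submit(t);      // hand each chunk to the pool
            }
            long total = 0;
            for (RangeSum t : tasks) {
                total += t.join();   // wait for each chunk and merge its result
            }
            return total;
        } finally {
            pool.shutdown();
        }
    }
}
```

Calling ChunkedSum.sum(10000, 10) creates ten tasks of 1000 numbers each and adds up their partial results.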
Two, the implementation
Requirement: Calculate 1+2+3+4
To use the Fork/Join framework, first consider how to split the task. In this example we want each subtask to add two numbers, so we set the split threshold to 2. Since there are four numbers to add, the framework forks the task into two subtasks: one computes 1 + 2 and the other computes 3 + 4. The results of the two subtasks are then joined. Because the task returns a result, it must extend RecursiveTask. The implementation is as follows:
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.Future;
import java.util.concurrent.RecursiveTask;

public class CountTask extends RecursiveTask<Integer> {

    private static final int THRESHOLD = 2; // split threshold
    private int start;
    private int end;

    public CountTask(int start, int end) {
        this.start = start;
        this.end = end;
    }

    @Override
    protected Integer compute() {
        int sum = 0;
        // Compute directly if the task is small enough
        boolean canCompute = (end - start) <= THRESHOLD;
        if (canCompute) {
            for (int i = start; i <= end; i++) {
                sum += i;
            }
        } else {
            // Otherwise split the task into two subtasks
            int middle = (start + end) / 2;
            CountTask leftTask = new CountTask(start, middle);
            CountTask rightTask = new CountTask(middle + 1, end);
            // Execute the subtasks
            leftTask.fork();
            rightTask.fork();
            // Wait for the subtasks to finish and get their results
            int leftResult = leftTask.join();
            int rightResult = rightTask.join();
            // Merge the subtask results
            sum = leftResult + rightResult;
        }
        return sum;
    }

    public static void main(String[] strs) {
        ForkJoinPool forkJoinPool = new ForkJoinPool();
        // Create a task that computes 1+2+3+4
        CountTask task = new CountTask(1, 4);
        // Execute the task
        Future<Integer> result = forkJoinPool.submit(task);
        try {
            System.out.println(result.get());
        } catch (InterruptedException e) {
            e.printStackTrace();
        } catch (ExecutionException e) {
            e.printStackTrace();
        }
        if (task.isCompletedAbnormally()) {
            System.out.println(task.getException());
        }
    }
}
The main difference between a ForkJoinTask and an ordinary task is that a ForkJoinTask must implement the compute method. In this method the task first checks whether it is small enough. If it is, the task is executed directly; if it is not, the task must be split into two subtasks. When fork is called on a subtask, that subtask's compute method runs in turn and checks whether it needs to be split further. If not, the subtask is executed and its result is returned. The join method waits for the subtask to complete and obtains its result.
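As a variation on the split step, the RecursiveTask documentation's own example forks only one subtask and computes the other in the current thread, which avoids queueing a task that the current worker could simply run itself. A minimal sketch of that idiom, using a hypothetical class name HalfForkSum and the same threshold as in the example above:

```java
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveTask;

class HalfForkSum extends RecursiveTask<Integer> {

    private static final int THRESHOLD = 2;
    private final int start;
    private final int end;

    HalfForkSum(int start, int end) {
        this.start = start;
        this.end = end;
    }

    @Override
    protected Integer compute() {
        // Small enough: sum the range directly.
        if (end - start <= THRESHOLD) {
            int sum = 0;
            for (int i = start; i <= end; i++) {
                sum += i;
            }
            return sum;
        }
        int middle = (start + end) / 2;
        HalfForkSum left = new HalfForkSum(start, middle);
        HalfForkSum right = new HalfForkSum(middle + 1, end);
        left.fork();                         // queue the left half for asynchronous execution
        int rightResult = right.compute();   // run the right half in the current thread
        return left.join() + rightResult;    // wait for the left half and merge
    }

    // Convenience entry point (hypothetical): runs the task in a fresh pool.
    static int sum(int start, int end) {
        ForkJoinPool pool = new ForkJoinPool();
        try {
            return pool.invoke(new HalfForkSum(start, end));
        } finally {
            pool.shutdown();
        }
    }
}
```

Both forms produce the same result; which one performs better depends on the workload.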
A ForkJoinTask may throw an exception while it is executing, but that exception cannot be caught directly in the main thread. For this reason ForkJoinTask provides the isCompletedAbnormally() method to check whether a task threw an exception or was cancelled, and the exception itself can be obtained through ForkJoinTask's getException() method. getException() returns a Throwable: a CancellationException if the task was cancelled, or null if the task has not completed or did not throw.
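The check described above might look like the following sketch. FailingTaskDemo and FailingTask are hypothetical names, and the task is written to fail on purpose so that the abnormal-completion path is exercised.

```java
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveTask;

class FailingTaskDemo {

    // Hypothetical task that always fails inside compute().
    static class FailingTask extends RecursiveTask<Integer> {
        @Override
        protected Integer compute() {
            throw new IllegalStateException("boom");
        }
    }

    // Runs the failing task and returns what getException() reports,
    // or null if the task unexpectedly completed normally.
    static Throwable runAndInspect() {
        ForkJoinPool pool = new ForkJoinPool();
        FailingTask task = new FailingTask();
        try {
            pool.submit(task).get();
        } catch (InterruptedException | ExecutionException e) {
            // get() reports the failure wrapped in an ExecutionException;
            // the task object itself is inspected below instead.
        } finally {
            pool.shutdown();
        }
        return task.isCompletedAbnormally() ? task.getException() : null;
    }
}
```

The returned Throwable has the same type as the exception thrown in compute(); depending on the JDK version it may be a re-created copy rather than the original instance, so code that inspects it is safer checking the type than the message.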
Three, the implementation principle
In the Java 7 implementation, ForkJoinPool consists of a ForkJoinTask array and a ForkJoinWorkerThread array. The ForkJoinTask array holds the tasks that the program submits to the ForkJoinPool, and the ForkJoinWorkerThread array is responsible for executing these tasks.
(1) Implementation principle of ForkJoinTask's fork method
When we call ForkJoinTask's fork method, the program calls ForkJoinWorkerThread's pushTask method to execute the task asynchronously and then returns immediately. The code is as follows:
public final ForkJoinTask<V> fork() {
    ((ForkJoinWorkerThread) Thread.currentThread())
        .pushTask(this);
    return this;
}
The pushTask method stores the current task in the ForkJoinTask array queue and then calls ForkJoinPool's signalWork() method to wake up or create a worker thread to execute the task. The code is as follows:
final void pushTask(ForkJoinTask<?> t) {
    ForkJoinTask<?>[] q; int s, m;
    if ((q = queue) != null) {    // ignore if queue removed
        long u = (((s = queueTop) & (m = q.length - 1)) << ASHIFT) + ABASE;
        UNSAFE.putOrderedObject(q, u, t);
        queueTop = s + 1;         // or use putOrderedInt
        if ((s -= queueBase) <= 2)
            pool.signalWork();
        else if (s == m)
            growQueue();
    }
}
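The index arithmetic in pushTask relies on the array length being a power of two, so that queueTop & (q.length - 1) works as a cheap modulo. The simplified, single-threaded sketch below shows only that idea. MiniWorkQueue is a hypothetical class, it uses a plain array instead of Unsafe, and its grow step (doubling the array) and its pop method are assumptions added to make the sketch self-contained.

```java
class MiniWorkQueue {

    private Object[] queue = new Object[8]; // length must be a power of two
    private int queueBase;                  // index of the oldest element
    private int queueTop;                   // index of the next free slot

    // Mirrors the shape of pushTask: mask the top index, store, advance, grow if full.
    void push(Object task) {
        int s = queueTop;
        int m = queue.length - 1;
        queue[s & m] = task;                // s & m == s % queue.length
        queueTop = s + 1;
        if (s - queueBase == m) {
            grow();                         // the array is full after this push
        }
    }

    // Assumption: the owner takes tasks back from the top (LIFO order).
    Object pop() {
        if (queueTop == queueBase) {
            return null;
        }
        int s = --queueTop;
        int i = s & (queue.length - 1);
        Object task = queue[i];
        queue[i] = null;
        return task;
    }

    // Assumption: growing doubles the array and re-masks every live index.
    private void grow() {
        Object[] old = queue;
        Object[] bigger = new Object[old.length << 1];
        int oldMask = old.length - 1;
        int newMask = bigger.length - 1;
        for (int b = queueBase; b != queueTop; b++) {
            bigger[b & newMask] = old[b & oldMask];
        }
        queue = bigger;
    }

    int size() {
        return queueTop - queueBase;
    }

    int capacity() {
        return queue.length;
    }
}
```

The real method additionally converts the masked index into a raw memory offset (<< ASHIFT, + ABASE) and writes through Unsafe so that other threads can observe the task safely; none of that is modelled here.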
(2) Implementation principle of ForkJoinTask join method
The join method blocks the current thread and waits for the result. The implementation of ForkJoinTask's join method is as follows:
public final V join() {
    if (doJoin() != NORMAL)
        return reportResult();
    else
        return getRawResult();
}

private V reportResult() {
    int s; Throwable ex;
    if ((s = status) == CANCELLED)
        throw new CancellationException();
    if (s == EXCEPTIONAL && (ex = getThrowableException()) != null)
        UNSAFE.throwException(ex);
    return getRawResult();
}
First, join calls the doJoin() method to obtain the status of the current task, and that status determines what is returned. There are four task statuses: completed (NORMAL), cancelled (CANCELLED), signal (SIGNAL), and exceptional (EXCEPTIONAL).
①: If the task status is completed, the task result is returned directly.
②: If the task status is cancelled, a CancellationException is thrown directly.
③: If the task status is exceptional, the corresponding exception is thrown directly.
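The three outcomes listed above can be observed from calling code with a sketch like the one below. JoinOutcomes and Answer are hypothetical names, and the strings returned by describeJoin are illustrative labels only.

```java
import java.util.concurrent.CancellationException;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveTask;

class JoinOutcomes {

    // Hypothetical task: returns 42, or throws if constructed with fail == true.
    static class Answer extends RecursiveTask<Integer> {
        private final boolean fail;

        Answer(boolean fail) {
            this.fail = fail;
        }

        @Override
        protected Integer compute() {
            if (fail) {
                throw new IllegalArgumentException("bad input");
            }
            return 42;
        }
    }

    // Maps the three possible outcomes of join() to a label.
    static String describeJoin(Answer task) {
        try {
            return "result " + task.join();   // completed normally
        } catch (CancellationException e) {
            return "cancelled";               // task was cancelled
        } catch (IllegalArgumentException e) {
            return "failed";                  // compute() threw
        }
    }

    static String[] run() {
        ForkJoinPool pool = new ForkJoinPool();
        try {
            Answer ok = new Answer(false);
            Answer cancelled = new Answer(false);
            Answer failing = new Answer(true);
            pool.execute(ok);
            cancelled.cancel(false);          // cancelled before it ever runs
            pool.execute(failing);
            return new String[] {
                describeJoin(ok), describeJoin(cancelled), describeJoin(failing)
            };
        } finally {
            pool.shutdown();
        }
    }
}
```

Unlike Future.get(), join() does not wrap the failure in an ExecutionException, which is why the catch clauses name the unchecked exception types directly.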
The doJoin method code is as follows:
private int doJoin() {
    Thread t; ForkJoinWorkerThread w; int s; boolean completed;
    if ((t = Thread.currentThread()) instanceof ForkJoinWorkerThread) {
        if ((s = status) < 0)
            return s;
        if ((w = (ForkJoinWorkerThread)t).unpushTask(this)) {
            try {
                completed = exec();
            } catch (Throwable rex) {
                return setExceptionalCompletion(rex);
            }
            if (completed)
                return setCompletion(NORMAL);
        }
        return w.joinTask(this);
    }
    else
        return externalAwaitDone();
}
In the doJoin() method, the task's status is checked first to see whether the task has already finished. If it has, the status is returned directly. If it has not, the task is taken out of the task array and executed. If the task completes normally, its status is set to NORMAL; if an exception occurs, the exception is recorded and the status is set to EXCEPTIONAL. If the task cannot be taken off the current worker's queue, the worker falls back to joinTask, and a caller that is not a ForkJoinWorkerThread waits in externalAwaitDone().