
This article walks through parts of the ForkJoinPool source code.

ForkJoinTask task status

/** The run status of this task */
volatile int status; // accessed directly by pool and workers
static final int DONE_MASK   = 0xf0000000;  // mask out non-completion bits
static final int NORMAL      = 0xf0000000;  // must be negative
static final int CANCELLED   = 0xc0000000;  // must be < NORMAL
static final int EXCEPTIONAL = 0x80000000;  // must be < CANCELLED
static final int SIGNAL      = 0x00010000;  // must be >= 1 << 16
static final int SMASK       = 0x0000ffff;  // short bits for tags 

The status field of a ForkJoinTask starts at 0, which means the task has not completed yet. The other values are:

NORMAL: the task completed normally.

CANCELLED: the task was cancelled.

EXCEPTIONAL: the task completed with an exception.

SIGNAL: another thread is waiting to join this task, so the waiters must be notified when the task completes.
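As a rough illustration of how these bits can be read, the sketch below copies the constants from the listing above and decodes a status value. The class TaskStatusSketch and its helpers describe() and tag() are hypothetical names made up for this example; they are not part of ForkJoinTask.

```java
// Sketch only: the constants are copied from the listing above;
// TaskStatusSketch, describe() and tag() are hypothetical helpers.
final class TaskStatusSketch {
    static final int DONE_MASK   = 0xf0000000;
    static final int NORMAL      = 0xf0000000;
    static final int CANCELLED   = 0xc0000000;
    static final int EXCEPTIONAL = 0x80000000;
    static final int SIGNAL      = 0x00010000;
    static final int SMASK       = 0x0000ffff;

    /** Turns a raw status value into a readable label. */
    static String describe(int status) {
        if (status >= 0) {
            // Non-negative: the task has not completed yet
            return (status & SIGNAL) != 0 ? "INCOMPLETE (joiner waiting)" : "INCOMPLETE";
        }
        // Negative: the top four bits say how the task completed
        switch (status & DONE_MASK) {
            case NORMAL:      return "NORMAL";
            case CANCELLED:   return "CANCELLED";
            case EXCEPTIONAL: return "EXCEPTIONAL";
            default:          return "UNKNOWN";
        }
    }

    /** Extracts the user tag stored in the low 16 bits. */
    static int tag(int status) {
        return status & SMASK;
    }

    public static void main(String[] args) {
        System.out.println(describe(0));
        System.out.println(describe(NORMAL | SIGNAL | 7));
        System.out.println(tag(NORMAL | SIGNAL | 7));
    }
}
```

Because all three completion values have the sign bit set, a plain `status < 0` test is enough to tell whether a task is done, and the ordering EXCEPTIONAL < CANCELLED < NORMAL follows from the hexadecimal values.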

ForkJoinTask

ForkJoinTask implements the Future interface. It splits a task into independent subtasks and merges the results of the subtasks into the result of the parent task. Its status field is an int: the upper 16 bits hold the run state (NORMAL, CANCELLED, or EXCEPTIONAL in the top four bits, plus the SIGNAL bit), and the lower 16 bits are reserved for user-defined task tags. An outline of the class is as follows:

// Outline only: method bodies are omitted
public abstract class ForkJoinTask<V> implements Future<V>, Serializable {
    // Table of exceptions thrown by tasks, with its lock and reference queue
    private static final ExceptionNode[] exceptionTable;
    private static final ReentrantLock exceptionTableLock;
    private static final ReferenceQueue<Object> exceptionTableRefQueue;
    // Fixed capacity of exceptionTable
    private static final int EXCEPTION_MAP_CAPACITY = 32;

    // Node in the exception table; holds a weak reference to the task that threw
    static final class ExceptionNode extends WeakReference<ForkJoinTask<?>> {
        final Throwable ex;
        ExceptionNode next;
        final long thrower;  // id of the thread that threw
        final int hashCode;  // stored hash code of the task
        ExceptionNode(ForkJoinTask<?> task, Throwable ex, ExceptionNode next) {
            // After the task is reclaimed by GC, this node is enqueued on exceptionTableRefQueue
            super(task, exceptionTableRefQueue);
            this.ex = ex;
            this.next = next;
            this.thrower = Thread.currentThread().getId();
            this.hashCode = System.identityHashCode(task);
        }
    }

    // Returns the result, or null if the task has not completed
    public abstract V getRawResult();
    // Sets the result
    protected abstract void setRawResult(V value);

    // Wait for completion if necessary, then return the result
    public final V get();
    public final V get(long timeout, TimeUnit unit);
    // Blocks a non-worker thread until the task completes or the thread is
    // interrupted (it may help run tasks in the meantime); returns the task status
    private int externalInterruptibleAwaitDone();
    // Attempts to cancel the task
    public boolean cancel(boolean mayInterruptIfRunning);
    public final boolean isDone();
    public final boolean isCancelled();

    // Runs the task. If it throws, the exception is recorded and the status becomes
    // EXCEPTIONAL; if it ends normally, the status becomes NORMAL. If SIGNAL was set,
    // the threads waiting to join this task are notified.
    final int doExec();
    // Sets the completion status
    private int setCompletion(int completion);

    public final ForkJoinTask<V> fork();
    public final V join();
    private int doJoin();
    public final V invoke();
    private int doInvoke();
    // Called from an external (non-worker) thread: tries to help execute the task, otherwise waits
    private int externalAwaitDone();

    public static void invokeAll(ForkJoinTask<?> t1, ForkJoinTask<?> t2);
    public static void invokeAll(ForkJoinTask<?>... tasks);
    public static <T extends ForkJoinTask<?>> Collection<T> invokeAll(Collection<T> tasks);

    // Records the exception and sets the task status
    final int recordExceptionalCompletion(Throwable ex);
    private void clearExceptionalCompletion();
    private static void expungeStaleExceptions();
    private Throwable getThrowableException();
}

Apart from status, which holds the run state of the task, the other fields of ForkJoinTask are all used to record the exceptions thrown by tasks.
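To see the exception bookkeeping from the outside, here is a minimal sketch that uses only the public API. FailingTask and runAndCapture() are hypothetical names invented for this example. It relies on the documented behaviour that a failed task reports isCompletedAbnormally() and exposes the failure through getException().

```java
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveTask;

// Sketch only: FailingTask and runAndCapture() are hypothetical names.
final class ExceptionRecordingSketch {
    /** A task whose computation always fails. */
    static final class FailingTask extends RecursiveTask<Integer> {
        @Override
        protected Integer compute() {
            throw new IllegalStateException("boom");
        }
    }

    /** Runs the failing task and returns what the task itself recorded. */
    static Throwable runAndCapture() {
        ForkJoinPool pool = new ForkJoinPool(2);
        FailingTask task = new FailingTask();
        try {
            pool.invoke(task);            // rethrows the failure to the caller
            return null;                  // not reached for this task
        } catch (RuntimeException rethrown) {
            // The task object still remembers that it failed and why
            return task.isCompletedAbnormally() ? task.getException() : null;
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(runAndCapture());
    }
}
```

The exception seen by the caller may be a new instance of the same type that wraps the original, so the sketch checks only the type and not the identity of the exception.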

fork() and join()

The main methods of ForkJoinTask are fork(), join(), and invoke().

public final ForkJoinTask<V> fork() {
    Thread t;
    // A ForkJoinWorkerThread pushes the task onto its own workQueue;
    // any other thread hands the task to the common pool
    if ((t = Thread.currentThread()) instanceof ForkJoinWorkerThread)
        ((ForkJoinWorkerThread)t).workQueue.push(this);
    else
        ForkJoinPool.common.externalPush(this);
    return this;
}

// join method
public final V join() {
    int s;
    // If the task did not end normally, report it: CANCELLED throws
    // CancellationException, EXCEPTIONAL rethrows the recorded exception
    if ((s = doJoin() & DONE_MASK) != NORMAL)
        reportException(s);
    return getRawResult();
}

private int doJoin() {
    int s; Thread t; ForkJoinWorkerThread wt; ForkJoinPool.WorkQueue w;
    // 1. status < 0: the task is already done, return the status.
    // 2. The current thread is a ForkJoinWorkerThread: if the task can be removed
    //    from the top of its work queue (tryUnpush), run it with doExec() and
    //    return the status if it is done; otherwise wait in awaitJoin().
    // 3. Any other thread: wait in externalAwaitDone().
    return (s = status) < 0 ? s :
        ((t = Thread.currentThread()) instanceof ForkJoinWorkerThread) ?
        (w = (wt = (ForkJoinWorkerThread)t).workQueue).
        tryUnpush(this) && (s = doExec()) < 0 ? s :
        wt.pool.awaitJoin(w, this, 0L) :
        externalAwaitDone();
}

fork: checks whether the current thread is a worker thread of a pool. Yes: the task is pushed onto the work queue of the current thread. No: the task is pushed onto a shared submission queue of the common pool.

join: checks whether the status of the task is less than 0:

Less than 0: the task is complete, and the status value is returned.

Not less than 0: checks whether the current thread is a ForkJoinWorkerThread.

Yes: tries to remove the task from the top of the thread's work queue and execute it, then checks whether execution is complete:

Complete: returns the status value at the end of the execution.

Not complete (or the task could not be removed): calls awaitJoin() to wait for the task to finish.

No: calls externalAwaitDone() to wait for the task to complete.
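From the user's side, the fork and join steps above typically appear as in the following sketch, which sums a range of integers with a RecursiveTask. SumTask, THRESHOLD, and sum() are hypothetical names chosen for this example, and the threshold value is arbitrary.

```java
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveTask;

// Sketch only: SumTask, THRESHOLD and sum() are hypothetical names.
final class RangeSumSketch {
    /** Sums the integers in the half-open range [from, to). */
    static final class SumTask extends RecursiveTask<Long> {
        private static final int THRESHOLD = 1_000; // arbitrary cutoff for splitting
        private final int from;
        private final int to;

        SumTask(int from, int to) {
            this.from = from;
            this.to = to;
        }

        @Override
        protected Long compute() {
            if (to - from <= THRESHOLD) {
                long sum = 0;
                for (int i = from; i < to; i++) {
                    sum += i;
                }
                return sum;
            }
            int mid = from + (to - from) / 2;
            SumTask left = new SumTask(from, mid);
            SumTask right = new SumTask(mid, to);
            left.fork();                          // pushed onto this worker's own queue
            long rightSum = right.compute();      // run the other half directly
            return left.join() + rightSum;        // run left ourselves if not stolen, else wait
        }
    }

    static long sum(int from, int to) {
        ForkJoinPool pool = new ForkJoinPool();
        try {
            return pool.invoke(new SumTask(from, to));
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(sum(0, 100_000)); // prints 4999950000
    }
}
```

Forking one half and computing the other in the current thread keeps the worker busy. If no other worker has stolen the forked half by the time join() is called, it is still on top of the worker's own queue and is executed in place.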

invoke() source code

Next comes the invoke source code. A ForkJoinPool accepts tasks in three ways: invoke(), execute(), and submit(). The listing below shows invoke() and invokeAll() of ForkJoinTask itself, which start executing a task in the calling thread. The source code is as follows:

public final V invoke() {
    int s;
    // Execute the task and get its status, similar to doJoin()
    if ((s = doInvoke() & DONE_MASK) != NORMAL)
        reportException(s);
    return getRawResult();
}

private int doInvoke() {
    int s; Thread t; ForkJoinWorkerThread wt;
    // Execute the task in the current thread; if it is not done afterwards, wait for it
    return (s = doExec()) < 0 ? s :
        ((t = Thread.currentThread()) instanceof ForkJoinWorkerThread) ?
        (wt = (ForkJoinWorkerThread)t).pool.
        awaitJoin(wt.workQueue, this, 0L) :
        externalAwaitDone();
}

public static void invokeAll(ForkJoinTask<?> t1, ForkJoinTask<?> t2) {
    int s1, s2;
    // Submit t2 to the pool, then execute t1 in the current thread
    t2.fork();
    if ((s1 = t1.doInvoke() & DONE_MASK) != NORMAL)
        t1.reportException(s1);
    // Wait for t2
    if ((s2 = t2.doJoin() & DONE_MASK) != NORMAL)
        t2.reportException(s2);
}

public static void invokeAll(ForkJoinTask<?>... tasks) {
    Throwable ex = null;
    int last = tasks.length - 1;
    for (int i = last; i >= 0; --i) {
        ForkJoinTask<?> t = tasks[i];
        if (t == null) {
            if (ex == null)
                ex = new NullPointerException();
        }
        // Every task except the first is pushed to the pool
        else if (i != 0)
            t.fork();
        // The first task is executed in the current thread
        else if (t.doInvoke() < NORMAL && ex == null)
            ex = t.getException();
    }
    for (int i = 1; i <= last; ++i) {
        ForkJoinTask<?> t = tasks[i];
        if (t != null) {
            // After a failure, cancel the remaining tasks; otherwise join them
            if (ex != null)
                t.cancel(false);
            else if (t.doJoin() < NORMAL)
                ex = t.getException();
        }
    }
    if (ex != null)
        rethrow(ex);
}
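The three submission methods of ForkJoinPool and the static invokeAll() differ mainly in what the caller gets back. The sketch below is a small illustration with a made-up task; Square and run() are hypothetical names, and the pool size of 2 is arbitrary.

```java
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.ForkJoinTask;
import java.util.concurrent.RecursiveTask;

// Sketch only: Square and run() are hypothetical names.
final class SubmitWaysSketch {
    /** Trivial task that squares a number. */
    static final class Square extends RecursiveTask<Integer> {
        private final int n;

        Square(int n) {
            this.n = n;
        }

        @Override
        protected Integer compute() {
            return n * n;
        }
    }

    static int[] run() {
        ForkJoinPool pool = new ForkJoinPool(2);
        try {
            // invoke(): submits the task and blocks until the result is available
            int viaInvoke = pool.invoke(new Square(2));

            // execute(): asynchronous and returns nothing, so keep the task to join it
            Square executed = new Square(3);
            pool.execute(executed);
            int viaExecute = executed.join();

            // submit(): asynchronous and returns the task as a Future-like handle
            ForkJoinTask<Integer> handle = pool.submit(new Square(4));
            int viaSubmit = handle.join();

            // Static invokeAll(): called here from a non-worker thread, so the
            // forked task goes to the common pool and the other runs in the caller
            Square a = new Square(5);
            Square b = new Square(6);
            ForkJoinTask.invokeAll(a, b);

            return new int[] {viaInvoke, viaExecute, viaSubmit, a.join(), b.join()};
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(java.util.Arrays.toString(run())); // prints [4, 9, 16, 25, 36]
    }
}
```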

WorkQueue source code

WorkQueue is the work queue used by ForkJoinPool. It holds the submitted ForkJoinTasks together with references to the pool, the ForkJoinWorkerThread that executes the tasks, and other task-related data. Each ForkJoinWorkerThread has its own WorkQueue. ForkJoinTasks are stored in a WorkQueue, and ForkJoinPool uses a WorkQueue array to manage the queues of all worker threads. The fields are as follows:

volatile int scanState;
int stackPred;
int nsteals;
int hint;
int config;
volatile int qlock;
volatile int base;
int top;
ForkJoinTask<?>[] array;
final ForkJoinPool pool;
final ForkJoinWorkerThread owner;
volatile Thread parker;
volatile ForkJoinTask<?> currentJoin;
volatile ForkJoinTask<?> currentSteal;

scanState: a negative value means the queue is inactive, which is always the case for a WorkQueue without an owner (one at an even index). If the queue has an owner, the initial value is its index in the WorkQueue[] array, which is always odd. An even value means the thread that owns the queue is executing a task.

stackPred: the previous top-of-stack value taken from ctl; chaining these values forms the stack of idle workers.

nsteals: the number of stolen tasks.

hint: records the index in the WorkQueue[] array of a queue chosen at random for stealing.

config: records the working mode and the index of this queue in the array. The upper 16 bits hold the working mode and the lower 16 bits hold the array index. A WorkQueue at an even index is in shared mode; a queue with its own owner defaults to LIFO.

qlock: the lock flag (similar to the state field in AQS), used to acquire the queue lock. 1 means the queue is locked, 0 means it is unlocked, and a value below 0 means the queue has been deactivated or the pool is shutting down (-1 on terminate).

base: the index of the next slot for a poll operation, used by workers that steal (bottom of the stack / head of the queue).

top: the index of the next slot for a push operation, used by the owner to run its own tasks (top of the stack / tail of the queue).

array: the array that stores the tasks. It is not allocated when the queue is created but lazily, on first use.

pool: a reference to the pool that the queue belongs to.

owner: a reference to the thread that owns the queue. It is null for a shared queue that holds externally submitted tasks.

parker: while the owner is parked (suspended), this field records the owner thread.

currentJoin: the task that the owner is currently joining, that is, waiting on for a result.

currentSteal: the stolen task that is currently being executed.
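The way config combines a mode and an index in one int can be sketched as follows. This is only an illustration of the layout described above (mode in the upper 16 bits, index in the lower 16 bits). The class name, the mask names, and pack() are hypothetical and are not the constants used by ForkJoinPool.

```java
// Sketch only: QueueConfigSketch, INDEX_MASK, MODE_MASK and pack() are hypothetical;
// they illustrate the described bit layout, not the JDK's actual constants.
final class QueueConfigSketch {
    static final int INDEX_MASK = 0x0000ffff; // low 16 bits: index in the WorkQueue[] array
    static final int MODE_MASK  = 0xffff0000; // high 16 bits: working mode

    /** Packs a 16-bit mode and a 16-bit index into one int. */
    static int pack(int mode, int index) {
        return (mode << 16) | (index & INDEX_MASK);
    }

    static int index(int config) {
        return config & INDEX_MASK;
    }

    static int mode(int config) {
        return (config & MODE_MASK) >>> 16;
    }

    /** Even index: shared queue without an owner; odd index: owned by a worker. */
    static boolean isShared(int config) {
        return (index(config) & 1) == 0;
    }

    public static void main(String[] args) {
        int config = pack(1, 5);
        System.out.println(mode(config) + " " + index(config) + " " + isShared(config));
    }
}
```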

// Outline only: method bodies are omitted
static final class WorkQueue {
    WorkQueue(ForkJoinPool pool, ForkJoinWorkerThread owner) {
        this.pool = pool;
        this.owner = owner;
        // Start both indices in the middle of the (not yet allocated) array
        base = top = INITIAL_QUEUE_CAPACITY >>> 1;
    }
    // Pushes a task onto the top of the queue
    final void push(ForkJoinTask<?> task);
    // Expansion method: when the number of queued elements reaches the capacity,
    // double the capacity and move the elements to a new array
    final ForkJoinTask<?>[] growArray();
    // Takes the next task in LIFO order (from top)
    final ForkJoinTask<?> pop();
    // Takes the next task in FIFO order (from base)
    final ForkJoinTask<?> poll();
}

A WorkQueue uses an array to store all the tasks that need to be executed. When a worker thread runs, it takes tasks from its own queue; if the queue is empty, it tries to steal a task from the queue of another thread. ForkJoinPool holds an array of WorkQueues (workQueues), and each WorkQueue holds an array of ForkJoinTasks (array).
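The two ends of the queue can be pictured with a deliberately simplified, single-threaded sketch. WorkQueueSketch is a hypothetical class written for this article: it has no synchronization, starts its indices at 0, and uses an arbitrary initial capacity of 8, so it shows only the indexing idea (owner works at top, thieves take from base), not the real implementation.

```java
// Sketch only: a single-threaded model of the base/top indexing idea.
// WorkQueueSketch is hypothetical and is NOT thread-safe, unlike the real WorkQueue.
final class WorkQueueSketch<T> {
    private Object[] array;   // allocated lazily on first push
    private int base;         // next slot to poll (the stealing end)
    private int top;          // next slot to push (the owner's end)

    /** Owner side: adds a task at top, growing the array when it is full. */
    void push(T task) {
        if (array == null) {
            array = new Object[8];            // arbitrary power-of-two capacity
        } else if (top - base == array.length) {
            grow();
        }
        array[top++ & (array.length - 1)] = task;
    }

    /** Owner side: takes the most recently pushed task (LIFO), or null if empty. */
    @SuppressWarnings("unchecked")
    T pop() {
        if (top == base) {
            return null;
        }
        int i = --top & (array.length - 1);
        T task = (T) array[i];
        array[i] = null;
        return task;
    }

    /** Stealing side: takes the oldest task (FIFO), or null if empty. */
    @SuppressWarnings("unchecked")
    T poll() {
        if (top == base) {
            return null;
        }
        int i = base++ & (array.length - 1);
        T task = (T) array[i];
        array[i] = null;
        return task;
    }

    /** Doubles the capacity and moves the queued tasks to the new array. */
    private void grow() {
        Object[] old = array;
        Object[] bigger = new Object[old.length << 1];
        for (int i = base; i != top; i++) {
            bigger[i & (bigger.length - 1)] = old[i & (old.length - 1)];
        }
        array = bigger;
    }

    public static void main(String[] args) {
        WorkQueueSketch<String> queue = new WorkQueueSketch<>();
        queue.push("a");
        queue.push("b");
        queue.push("c");
        System.out.println(queue.poll()); // prints a (oldest task, as a thief would take)
        System.out.println(queue.pop());  // prints c (newest task, as the owner would take)
    }
}
```

Letting the owner and the thieves work at opposite ends means they rarely touch the same slot, which is the main reason a deque is used here instead of a plain stack or queue.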

Conclusion

This article took a brief look at the ForkJoinPool source code. Many details were left out: externalSubmit and externalPush behind invoke, how tasks are executed and threads are parked, how stealing is implemented, and how tasks are split and merged.