Welcome to github.com/hsfxuebao/j… , hope to help you, if you feel ok, please click on the Star
3. Task execution
Returning to our original flowchart, after creating a worker thread in forkJoinPool.createWorker (), the worker thread is started, The ForkJoinWorkerThread executes the run() method after the CPU is allocated to the worker thread.
3.1 ForkJoinWorkerThread. The run ()
public void run() { if (workQueue.array == null) { // only run once Throwable exception = null; try { onStart(); // Pool. RunWorker (workQueue); } catch (Throwable ex) { exception = ex; } finally { try { onTermination(exception); } catch (Throwable ex) {if (exception == null) exception = ex; } finally { pool.deregisterWorker(this, exception); // Handle exceptions}}}}Copy the code
Forkjoinpool.runworker () is executed by calling the custom hook functions onStart and onTermination before and after the worker thread runs. Or if all the task has been completed during an anomaly, by ForkJoinPool. DeregisterWorker closed working thread and deal with abnormal information (deregisterWorker way behind us in detail).
3.2 ForkJoinPool. RunWorker (WorkQueue w)
final void runWorker(WorkQueue w) { w.growArray(); // allocate queue int seed = w.hint; // initially holds randomization hint int r = (seed == 0) ? 1 : seed; // avoid 0 for xorShift for (ForkJoinTask<? > t; ;) { if ((t = scan(w, r)) ! = null)// Scan task execution w.runtask (t); else if (! awaitWork(w, r)) break; r ^= r << 13; r ^= r >>> 17; r ^= r << 5; // xorshift } }Copy the code
Description: runWorker is the main running method of a ForkJoinWorkerThread that executes tasks in sequence in the current worker thread. The flow of the function is simple: call the scan method to get the tasks in turn, and then call workqueue.runTask to run the tasks; If no task is scanned, awaitWork is called to wait until the worker thread/thread pool terminates or the wait times out.
Forkjoinpool. scan(WorkQueue w, int r)
private ForkJoinTask<? > scan(WorkQueue w, int r) { WorkQueue[] ws; int m; if ((ws = workQueues) ! = null && (m = ws.length - 1) > 0 && w ! = null) { int ss = w.scanState; // initially non-negative // Initial scan starting point, spin scan for (int origin = r & m, k = origin, oldSum = 0, checkSum = 0; ;) { WorkQueue q; ForkJoinTask<? >[] a; ForkJoinTask<? > t; int b, n; long c; if ((q = ws[k]) ! = null) {/ / get workQueue if ((n = = q.b ase (b) - q.t op) < 0 && (a = q.a rray)! Long I = (((a.length-1) &b) << ASHIFT) + ABASE; if ((t = ((ForkJoinTask<? >) U.getObjectVolatile(a, i))) ! //stable if (ss >= 0) {// scanning if (U.compareAndSwapObject(a, I, t, null)) {// q.base = b + 1; If (n < -1) // Signal others signalWork(ws, q); // Create or wake up a worker thread to run the task return t; } else if (oldSum == 0 && // try to activate the tryRelease(c = CTL, ws[m & (int) c], AC_UNIT); If (ss < 0) // refresh ss = w.scannState; // Refresh ss = w.scannState; r ^= r << 1; r ^= r >>> 3; r ^= r << 10; origin = k = r & m; // move and rescan oldSum = checkSum = 0; continue; } checkSum += b; // Queue tasks are empty, If ((k = (k + 1) &m) == origin) {// Continue until stable But did not scan to task if ((ss > = 0 | | (ss = = (ss = baron canState))) && oldSum = = (oldSum = checkSum)) {if (ss < 0 | | w.q lock < 0) / / already inactive break; / / has been inactivated or stop, jump out of the loop / / inactivated operation on current WorkQueue int ns = ss | INACTIVE; // try to inactivate long nc = ((SP_MASK & ns) | (UC_MASK & ((c = ctl) - AC_UNIT))); W.tackpred = (int) c; // Calculate CTL INACTIVE and reduce the number of active threads. // hold prev stack top U.putInt(w, QSCANSTATE, ns); If (U.compareAndSwapLong(this, CTL, c, nc))// Update scanState to inactive ss = ns; else w.scanState = ss; // back out } checkSum = 0; // reset checkSum, continue loop}}} return null; }Copy the code
Description: Scan and try to steal a task. The WorkQueue is indexed randomly with w.int, that is, tasks in the current WorkQueue are not necessarily executed, but are stolen from other workers to be executed.
The general execution flow of the function is as follows:
-
Take a WorkQueue at a random location;
-
ForkJoinTask retrieves the base bit, updates the base bit and returns to the task. If the number of tasks in the WorkQueue is greater than 1, call signalWork to create or wake up other worker threads.
-
If the current worker thread is INACTIVE (INACTIVE), tryRelease is called in an attempt to wake up the top stack worker thread to execute. TryRelease:
private boolean tryRelease(long c, WorkQueue v, long inc) { int sp = (int) c, vs = (sp + SS_SEQ) & ~INACTIVE; Thread p; // CTL can wake up the Parker thread if (v! // count the number of active threads (high 32 bits) and update to scanState (low 32 bits) at the top of the next stack long nc = (UC_MASK & (c) + inc)) | (SP_MASK & v.stackPred); if (U.compareAndSwapLong(this, CTL, c, nc)) { v.scanState = vs; if ((p = v.parker) ! = null) U.unpark(p); Return true; } } return false; }
-
If the base bit task is empty or offset, the index bit is randomly shifted and then rescan;
-
If the entire workQueues have been scanned and no task has been captured, set the current worker thread to INACTIVE. It then resets the checkSum and returns null if there are no more tasks after another scan.
ForkJoinPool. AwaitWork (WorkQueue w, int r)
private boolean awaitWork(WorkQueue w, int r) { if (w == null || w.qlock < 0) // w is terminating return false; for (int pred = w.stackPred, spins = SPINS, ss; ;) {if ((ss = w.seccanState) >= 0) else if (spins > 0) { r ^= r << 6; r ^= r >>> 21; r ^= r << 7; if (r >= 0 && --spins == 0) { // randomize spins WorkQueue v; WorkQueue[] ws; int s, j; AtomicLong sc; if (pred ! = 0 && (ws = workQueues) ! = null && (j = pred & SMASK) < ws.length && (v = ws[j]) ! = null && // see if pred parking (v.parker == null || v.scanState >= 0)) spins = SPINS; // continue spins}} else if (w.lock < 0) // workQueue terminates, returns false recheck after spins return false; else if (! Thread.interrupted()) {long c, prevctl, parkTime, deadline; int ac = (int) ((c = ctl) >> AC_SHIFT) + (config & SMASK); / / active threads if ((ac < = 0 && tryTerminate (false, false)) | | / / no active threads, try to terminate (runState & STOP)! = 0) // pool terminating return false; If (ac <= 0 && ss == (int) c) {// is last waiter // Count the number of active threads (high 32 bits) and update to scanState (low 32 bits) at the top of the next stack prevCTL = (UC_MASK & (c +) AC_UNIT)) | (SP_MASK & pred); int t = (short) (c >>> TC_SHIFT); // shrink excess spares if (t > 2 &&u.com pareAndSwapLong(this, CTL, c, prevctl)) return false; // else use timed wait parkTime = IDLE_TIMEOUT * ((t >= 0)? 1 : 1 - t); deadline = System.nanoTime() + parkTime - TIMEOUT_SLOP; } else prevctl = parkTime = deadline = 0L; Thread wt = Thread.currentThread(); U.putObject(wt, PARKBLOCKER, this); // emulate LockSupport w.parker = wt; // set parker to block if (w.seccanState < 0 &&ctl == c) // recheck before park u.park (false, parkTime); // block the specified time u.puOrderedobject (w, QPARKER, null); U.putObject(wt, PARKBLOCKER, null); If (w.scannState >= 0)// If (w.scannState >= 0) if (parkTime ! = 0l&&ctl == c&&deadline-system.nanotime () <= 0l&&u.com pareAndSwapLong(this, CTL, c, prevctl)) Return false return false; // shrink pool } } return true; }Copy the code
Note: Back to the runWorker method, if the scan method does not detect a task, awaitWork is called to wait for the task. Return false if a worker thread or thread pool has terminated while waiting for a task. If there are no active threads, try to terminate the thread pool and return false. If the termination fails and the current Worker is the last to wait, block for the specified time (IDLE_TIMEOUT). If you find that you are in scanning (scanState >= 0) state when waiting for the session or waking up, it indicates that the task is waiting. If you skip waiting and return true to continue scan, otherwise, update the CTL and return false.
3.5 WorkQueue. RunTask ()
final void runTask(ForkJoinTask<? > task) { if (task ! = null) { scanState &= ~SCANNING; // mark as busy (currentSteal = task).doExec(); // Update currentSteal and execute the task u.PuOrderEdobject (this, QCURRENTSTEAL, NULL); // release for GC execLocalTasks(); // Execute local tasks ForkJoinWorkerThread thread = owner; if (++nsteals < 0) // collect on overflow transferStealCount(pool); / / stealing number of jobs increased scanState | = SCANNING; if (thread ! = null) thread.afterTopLevelExec(); // Execute hook function}}Copy the code
Workqueue.runtask () is called to execute the obtained task after the scan method has detected the task. The process is as follows:
-
Flag scanState as the executing state;
-
Update currentSteal to the currently obtained task and execute it using forkJoinTask.doexec ().
//ForkJoinTask.doExec() final int doExec() { int s; boolean completed; if ((s = status) >= 0) { try { completed = exec(); } catch (Throwable rex) {return setExceptionalCompletion(rex); } if (completed) s = setCompletion(NORMAL); } return s; }
-
Call execLocalTasks to execute the tasks in the current WorkerQueue.
Final void execLocalTasks() {int b = base, m, s; ForkJoinTask[] a = array; if (b – (s = top – 1) <= 0 && a ! = null && (m = a. length-1) >= 0) {if (config & FIFO_QUEUE) == 0) {// For (ForkJoinTask t; 😉 { if ((t = (ForkJoinTask<? >) u.port (a, ((m & s) << ASHIFT) + ABASE, null)) == null) U.putOrderedInt(this, QTOP, s); t.doExec(); // Execute if (base – (s = top-1) > 0) break; } } else pollAndExecAll(); // Execute in LIFO mode, take base task}}
-
Update the number of stolen tasks;
-
Restore scanState and execute the hook function.
3.6 ForkJoinPool. DeregisterWorker (ForkJoinWorkerThread wt, Throwable ex)
final void deregisterWorker(ForkJoinWorkerThread wt, Throwable ex) { WorkQueue w = null; //1. Remove workQueue if (wt! = null && (w = wt.workQueue) ! [] ws; = null) {ForkJoinWorkerThread [] ws; // remove index from array int idx = w.config & SMASK; Int rs = lockRunState(); If ((ws = workQueues)! = null && ws.length > idx && ws[idx] == w) ws[idx] = null; // Remove workQueue unlockRunState(rs, rs & ~RSLOCK); } //2. Reduce CTL count long c; // decrement counts do {} while (! U.compareAndSwapLong (this, CTL, c = ctl, ((AC_MASK & (c - AC_UNIT)) | (TC_MASK & (c - TC_UNIT)) | (SP_MASK & c)))); //3. Process internal parameters of the removed workQueue if (w! = null) { w.qlock = -1; // ensure set w.transferStealCount(this); w.cancelAll(); // cancel remaining tasks } //4. If the thread is not terminated, replace the removed workQueue and wake up the internal thread for (;;). { // possibly replace WorkQueue[] ws; int m, sp; / / try to terminate the thread pool if (tryTerminate (false, false) | | w = = null | | w. rray = = null | | (runState & STOP)! = 0 || (ws = workQueues) == null || (m = ws.length - 1) < 0) // already terminating break; // Wake up the replaced thread depending on the next step if ((sp = (int)(c = CTL))! = 0) { // wake up replacement if (tryRelease(c, ws[sp & m], AC_UNIT)) break; } // Create a worker thread to replace else if (ex! = null && (c & ADD_WORKER) ! = 0L) { tryAddWorker(c); // create replacement break; } else // don't need replacement break; } / / 5. Handle exceptions if (ex = = null) / / help clean on way out ForkJoinTask. HelpExpungeStaleExceptions (); else // rethrow ForkJoinTask.rethrow(ex); }Copy the code
Note: The deregisterWorker method is used to terminate a worker thread after it has finished running or to handle a worker exception. This method is used to remove a closed worker thread or to roll back operations that occurred before the thread was created, and to throw an incoming exception to ForkJoinTask. See the source code comments for details.
3.7 summary
In this section we describe the process of executing a task, and we will continue with the result fetch (Join/Invoke) of the task.
Join ()/ForkJoinTask.invoke()
The join () :
Public final V join() {int s; if ((s = doJoin() & DONE_MASK) ! = NORMAL) reportException(s); return getRawResult(); } //join, get, quietlyJoin private int doJoin() {int s; Thread t; ForkJoinWorkerThread wt; ForkJoinPool.WorkQueue w; return (s = status) < 0 ? s : ((t = Thread.currentThread()) instanceof ForkJoinWorkerThread) ? (w = (wt = (ForkJoinWorkerThread)t).workQueue). tryUnpush(this) && (s = doExec()) < 0 ? s : wt.pool.awaitJoin(w, this, 0L) : externalAwaitDone(); }Copy the code
Invoke () :
Public Final V invoke() {int s; if ((s = doInvoke() & DONE_MASK) ! = NORMAL) reportException(s); return getRawResult(); } //invoke, quietlyInvoke () private int doInvoke() {int s; Thread t; ForkJoinWorkerThread wt; return (s = doExec()) < 0 ? s : ((t = Thread.currentThread()) instanceof ForkJoinWorkerThread) ? (wt = (ForkJoinWorkerThread)t).pool. awaitJoin(wt.workQueue, this, 0L) : externalAwaitDone(); }Copy the code
Note: The join() method is called after the task fork() and is used to retrieve (or “merge”) the results of the task.
ForkJoinTask’s Join () and Invoke () methods can be used to retrieve the result of a task (as well as the get method that calls doJoin to retrieve the result of a task, but responds to a runtime exception). The externalAwaitDone method is used to wait for execution results. The difference is that the invoke() method executes the current task directly; Join () is executed when the task is at the top of the queue (as determined by tryUnpush). If the task is not at the top or fails, forkJoinPool. awaitJoin is called to help execute or block the join. (The official documentation suggests the order in which ForkJoinTask is called. A pair of fork-join operations are called in the following order: a. fronk (); b.fork(); b.join(); a.join(); . Since task B is queued later, that is, at the top of the stack, a call to join() after it forks () can be executed without calling forkJoinPool.aWaitJoin.
Among these methods, join() is relatively comprehensive, so we will start from join() and go down step by step. First, we will look at the execution process of join() :
Join Execution process
In the rest of the source code analysis, we will start with the simpler external join task (externalAwaitDone) and then the internal join task (starting with ForkJoinPool.awaitJoin()).
4.1 ForkJoinTask. ExternalAwaitDone ()
Private int externalAwaitDone() {// Execute the task int s = ((this instanceof CountedCompleter)? / / try helping ForkJoinPool.com mon. ExternalHelpComplete (/ / CountedCompleter task (CountedCompleter <? >) this, 0) : ForkJoinPool.common.tryExternalUnpush(this) ? doExec() : 0); // ForkJoinTask if (s >= 0 && (s = status) >= 0) {Boolean interrupted = false; do { if (U.compareAndSwapInt(this, STATUS, s, S | SIGNAL)) {/ / update the state synchronized (this) {if (status > = 0) {/ / SIGNAL wait SIGNAL try {wait (0, l); } catch (InterruptedException ie) { interrupted = true; } } else notifyAll(); } } } while ((s = status) >= 0); if (interrupted) Thread.currentThread().interrupt(); } return s; }Copy the code
Note: If the current JOIN is an external call, this method is called to execute the task. If the task fails, wait. The method itself is quite simple, but it should be noted that there are two cases for different task types:
-
If our task is of type CountedCompleter, the externalHelpComplete method is called to execute the task. (THE author will open a new chapter on CountedCompleter later, so I won’t introduce it in detail here. If you are interested, you can have a look first.)
-
Other types of ForkJoinTask tasks are executed by calling tryExternalUnpush.
// Provide tryUnpush for external submitters. Final Boolean tryExternalUnpush(ForkJoinTask task) {WorkQueue[] ws; WorkQueue w; ForkJoinTask[] a; int m, s; int r = ThreadLocalRandom.getProbe(); if ((ws = workQueues) ! = null && (m = ws.length – 1) >= 0 && (w = ws[m & r & SQMASK]) ! = null && (a = w.array) ! = null && (s = w.top) ! = w.base) { long j = (((a.length – 1) & (s – 1)) << ASHIFT) + ABASE; If (U.compareAndSwapInt(w, QLOCK, 0, 1)) {w.tab == s &&w.ray == a &&u.getobject (a, J) == task &&u.com pareAndSwapObject(a, j, task, null)) { // Update top u.puOrderedInt (w, QLOCK, 0); // Unlock, return true return true; } U.compareAndSwapInt(w, QLOCK, 1, 0); }} return false; }
TryExternalUnpush determines whether the current task is in the top position, if it is, ejects the task, and then calls doExec() in externalAwaitDone to execute the task.
4.2 ForkJoinPool. AwaitJoin ()
final int awaitJoin(WorkQueue w, ForkJoinTask<? > task, long deadline) { int s = 0; if (task ! = null && w ! = null) { ForkJoinTask<? > prevJoin = w.currentJoin; // Get the join task of the given Worker u.puOrderEdobject (w, QCURRENTJOIN, task); CountedCompleter<? CountedCompleter<? CountedCompleter<? > cc = (task instanceof CountedCompleter) ? (CountedCompleter<? >) task : null; for (; ;) {if ((s = task status) < 0) / / finished cancel the | | abnormal jump out of the loop break; if (cc ! Join helpComplete(w, cc, 0); Else if (w.b ase = = w.t op | | w.t ryRemoveAndExec (task)) / / try to perform helpStealer (w, task); / / the queue is empty or failure, the task may be stolen, help steal perform this task if ((s = task. The status) < 0) / / finished cancel the | | abnormalities, jump out of the loop break; Long ms, ns; if (deadline == 0L) ms = 0L; else if ((ns = deadline - System.nanoTime()) <= 0L) break; else if ((ms = TimeUnit.NANOSECONDS.toMillis(ns)) <= 0L) ms = 1L; If (tryCompensate(w)) {task.internalWait(ms); U.goetandaddlong (this, CTL, AC_UNIT); // Update active thread count}} u.putOrderedobject (w, QCURRENTJOIN, prevJoin); } return s; }Copy the code
Note: If the current JOIN task is not in the top position of the Worker waiting queue, or the task fails to be executed, call this method to help execute or block the current join task. The function execution flow is as follows:
- Because every time I call
awaitJoin
The task of the current join takes precedence, so updates are made firstcurrentJoin
Is the current join task; - Enter spin:
-
First check to see if the task has been completed (through the task. The status < 0), cancel the | | if a given task has been completed abnormal return to execution state s jump out of the loop;
-
If the task type is CountedCompleter, call helpComplete to complete the join.
-
The CountedCompleter WorkQueue task type call. TryRemoveAndExec attempt to perform a task;
-
HelpStealer is called to help the stealer perform the task (that is, the stealer performs the task for me, and I will perform its task for the stealer) if the wait queue for a given WorkQueue is empty or the task fails.
-
Again, check whether the task is completed (task.status < 0). If the task fails, calculate a waiting time to prepare for compensation operation.
-
Call the tryCompensate method to attempt a compensation operation for the given WorkQueue. During the period of executive compensation, if it is found that resource contention in the unstable state | | pool current Worker terminated, call the ForkJoinTask. InternalWait () method is waiting for the specified time, task continues to spin after wake up, ForkJoinTask. InternalWait () the source code is as follows:
final void internalWait(long timeout) { int s; if ((s = status) >= 0 && // force completer to issue notify U.compareAndSwapInt(this, STATUS, s, S | SIGNAL)) {/ / update the task status to SIGNAL (waiting for wake up) synchronized (this) {if (status > = 0) try {wait (timeout); } catch (InterruptedException ie) { } else notifyAll(); }}}
inawaitJoin
In, we call three complex methods altogether:TryRemoveAndExec, helpStealer and tryCompensate
Let’s explain them one by one.
2 WorkQueue. TryRemoveAndExec (ForkJoinTask task)
final boolean tryRemoveAndExec(ForkJoinTask<? > task) { ForkJoinTask<? >[] a; int m, s, b, n; if ((a = array) ! = null && (m = a.length - 1) >= 0 && task ! = null) {while ((n = (s = top) - (b = base)) > 0) {ForkJoinTask<? > t; ;) { // traverse from s to b long j = ((--s & m) << ASHIFT) + ABASE; If ((t = (ForkJoinTask<? >) u.tobject (a, j)) == null) // shorter than expected else if (t == task) {Boolean removed = false; If (s + 1 == top) {// pop if (U.compareAndSwapObject(a, j, task, null)) {// Pop task u.puOrderedInt (this, QTOP, s); Removed = true; } } else if (base == b) // replace with proxy removed = U.compareAndSwapObject( a, j, task, new EmptyTask()); (removed) task.doexec (); // The join task has been removed and replaced with a placeholder task if (removed) task.doexec (); / / execution break; } else if (t.status < 0 &&s + 1 == top) {// If (U.compareAndSwapObject(a, j, t, Null)) // Popup task u.puOrderedInt (this, QTOP, s); // Update top break; // was cancelled} if (--n == 0) // Return false; } if (task.status < 0) return false; } } return true; }Copy the code
Note: Start from top and spin down to find the given task. If found, remove it from the current Worker’s task queue and execute it. Note the arguments returned: True if the task queue is empty or the task has not completed; False is returned when the task is complete.
HelpStealer (WorkQueue W, ForkJoinTask Task)
private void helpStealer(WorkQueue w, ForkJoinTask<? > task) { WorkQueue[] ws = workQueues; int oldSum = 0, checkSum, m; if (ws ! = null && (m = ws.length - 1) >= 0 && w ! = null && task ! = null) { do { // restart point checkSum = 0; // for stability check ForkJoinTask<? > subtask; WorkQueue j = w, v; // v is subtask stealer descent: for (subtask = task; subtask.status >= 0; ) {/ / 1. Find a given WorkQueue for stealing of v (int h = j.h int | 1, k = 0, I; ; // Can't find Stealer break descent; // Can't find stealer break descent; if ((v = ws[i = (h + k) & m]) ! = null) {if (v.currentSteal == subtask) {// Locate the thief j.int = I; // Update stealer index break; } checkSum += v.base; }} //2. Help thief v to perform tasks for (; ;) { // help v or descend ForkJoinTask<? >[] a; // The task inside the stealer int b; checkSum += (b = v.base); ForkJoinTask<? > next = v.currentJoin; / / get the join of the steal tasks if (subtask. Status < 0 | | j.carol carroll urrentJoin! = subtask || v.currentSteal ! = subtask) // stale break descent; / / stale, jump out of the descent circulation again if (b - v.t op > = 0 | | (a = says v. rray) = = null) {if ((subtask = next) = = null) / / steal the join task is null, Break descent; j = v; break; // The internal task of the stealer is empty, maybe the task is also stolen; } int I = (((a. length-1) &b) << ASHIFT) + ABASE; ForkJoinTask<? > t = ((ForkJoinTask<? >) U.getObjectVolatile(a, i)); If (v.case == b) {if (t == null) // stale break descent; If (U.compareAndSwapObject(a, I, t, null)) {// start task v.case = b + 1; // Update the stealer's base ForkJoinTask<? > ps = w.currentSteal; // Get the task that the caller stole int top = w.tab; // Update the currentSteal of the given workQueue to the base task of the stealer, and then execute that task. // Then pop task (LIFO)-> Update currentSteal-> Execute the task do {u.putOrderEDobject (w, QCURRENTSTEAL, t); t.doExec(); // clear local tasks too } while (task.status >= 0 && w.top ! = top && // internal tasks, one by one pop-up execution (t = w.pop())! = null); U.putOrderedObject(w, QCURRENTSTEAL, ps); // Restore currentSteal if (w.case! = w.tab)// If the workQueue has its own task, help ends, return; // can't further help } } } } } while (task.status >= 0 && oldSum ! = (oldSum = checkSum)); }}Copy the code
Note: If the queue is empty or the task fails, it indicates that the task may be stolen. Call this method to help the thief execute the task. The basic idea is: The stealer helps me perform my task, and I help the stealer perform its task. The function execution flow is as follows:
- The thief is positioned in a loop. Since the Worker is in odd index bits, two index bits will be jumped each time. After the thief is located, update the values of the caller WorkQueue
hint
For stealing index, convenient next positioning; - After locating the stealer, start to help the stealer perform the task. From the thief
base
Index starts, stealing one task at a time to execute. After helping the stealer perform a task, if the caller finds that he already has a task (w.top ! = top
), then pop up their own tasks (LIFO order) and execute them (i.e. steal their own task execution).
Holdings ForkJoinPool. TryCompensate (WorkQueue w)
Private Boolean tryCompensate(WorkQueue w) {Boolean canBlock; WorkQueue[] ws; long c; int m, pc, sp; if (w == null || w.qlock < 0 || // caller terminating (ws = workQueues) == null || (m = ws.length - 1) <= 0 || (pc = config & SMASK) == 0) // parallelism disabled canBlock = false; Else if ((sp = (int) (c = CTL))! = 0) // release idle worker canBlock = tryRelease(c, ws[sp & m], 0L); // Wake up the waiting worker else {// No idle thread int ac = (int) (c >> AC_SHIFT) + PC; Int tc = (short) (c >> TC_SHIFT) + PC; Int nbusy = 0; // validate saturation for (int i = 0; i <= m; ++i) { // two passes of odd indices WorkQueue v; if ((v = ws[((i << 1) | 1) & m]) ! = null) {// select odd index bit if ((v.scannstate & SCANNING)! = 0)// No task is running, break; ++nbusy; }} if (nbusy! = (tc << 1) || ctl ! = c) canBlock = false; If (tc >= PC &&ac > 1 &&w.i sEmpty()) {if (tc >= PC &&ac > 1 &&w.i sEmpty()) { Do not need to compensate for long nc = ((AC_MASK & (c - AC_UNIT)) | (~ AC_MASK & c)); // uncompensated canBlock = U.compareAndSwapLong(this, CTL, c, nc); / / update the active threads} else if (tc > = MAX_CAP | | (this = = common && tc > = PC + commonMaxSpares)) / / exceed the maximum number of threads throw new RejectedExecutionException( "Thread limit exceeded replacing blocked worker"); else { // similar to tryAddWorker boolean add = false; int rs; // CAS within lock long nc = ((AC_MASK & c) | (TC_MASK & (c + TC_UNIT))); If ((rs = lockRunState() &stop) == 0) add = U.compareAndSwapLong(this, CTL, c, nc); UnlockRunState (rs, rs & ~RSLOCK); CanBlock = add && createWorker(); canBlock = add && createWorker(); // throws on exception } } return canBlock; }Copy the code
Note: the specific implementation of the source code and notes, here we simply summarize the need and do not need to compensate for several cases:
- Need compensation:
- The caller queue is not empty and there are idle worker threads, which wake up the idle thread (call
tryRelease
Methods) - The pool has not stopped and there are not enough active threads, a new worker thread will be created (call
createWorker
Methods)
- The caller queue is not empty and there are idle worker threads, which wake up the idle thread (call
- No compensation required:
- The caller has terminated or the pool is in an unstable state
- The total number of threads is greater than the degree of parallelism && the number of active threads is greater than 1 && the caller task queue is empty
conclusion
ForkJoinPool’s internal code implementation is very complex. This article has been written for nearly two months. If you want to challenge yourself, take a closer look at the whole article. Generally speaking, we just need to understand its inner mind. Key points of this chapter:
- Running mechanism of Fork/Join tasks
- ForkJoinPool work-stealing
** a little not very understand, first forward **
From:
JUC source code Analysis – Thread Pool (5) : ForkJoinPool 2