FutureTask is a cancellable asynchronous task whose result is retrieved with the get method. Note that get blocks until the result is available. FutureTask suits time-consuming calculations: the main thread hands off the calculation, finishes its own work, and then retrieves the result.
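As a rough illustration of that usage pattern, the sketch below runs a FutureTask on a separate thread while the main thread stays free, then collects the result with get. The class name FutureTaskBasics and the helper slowSum are made up for this example, and the 100 ms sleep only stands in for a real time-consuming calculation.

```java
import java.util.concurrent.FutureTask;

final class FutureTaskBasics {
    // Hypothetical stand-in for a time-consuming calculation.
    static int slowSum(int n) throws InterruptedException {
        Thread.sleep(100);
        int sum = 0;
        for (int i = 1; i <= n; i++) {
            sum += i;
        }
        return sum;
    }

    static int runDemo() throws Exception {
        FutureTask<Integer> task = new FutureTask<>(() -> slowSum(100));
        new Thread(task, "worker").start();   // FutureTask is a Runnable

        // The main thread is free to do its own work here.

        return task.get();                    // blocks until the worker has finished
    }

    public static void main(String[] args) throws Exception {
        System.out.println(runDemo());
    }
}
```

If the main thread reaches get before the worker is done, it simply waits; if the worker is already done, get returns at once.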
01
Inheritance structure
FutureTask implements RunnableFuture, which in turn extends both Future and Runnable. Future provides the get method that returns the result. This raises a question: why does the submit method return Future instead of FutureTask? The reason is that external callers should only see the get method (and the rest of the Future interface), not FutureTask's run method.
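A small sketch of what this looks like from the caller's side, assuming a pool created with Executors.newFixedThreadPool (a plain ThreadPoolExecutor). The class name SubmitReturnsFuture is illustrative only.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.FutureTask;

final class SubmitReturnsFuture {
    static String demo() throws Exception {
        ExecutorService pool = Executors.newFixedThreadPool(1);
        try {
            // The static type is Future: the caller can get(), cancel() or
            // query isDone(), but has no run() method to call.
            Future<String> future = pool.submit(() -> "hello");
            String value = future.get();

            // Assumption of this sketch: with a plain ThreadPoolExecutor the
            // object behind the interface is a FutureTask.
            boolean isFutureTask = future instanceof FutureTask;
            return value + " / FutureTask=" + isFutureTask;
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) throws Exception {
        System.out.println(demo());
    }
}
```

Returning the narrower interface keeps the ability to execute the task inside the executor, while the caller only observes and cancels it.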
02
Attribute resolution
// Current state of the task
private volatile int state;
// The task has not been executed yet
private static final int NEW          = 0;
// The task is finishing; the outcome is about to be written
private static final int COMPLETING   = 1;
// The task finished normally
private static final int NORMAL       = 2;
// The task threw an exception during execution
private static final int EXCEPTIONAL  = 3;
// The task was cancelled
private static final int CANCELLED    = 4;
// The task is being interrupted
private static final int INTERRUPTING = 5;
// The task has been interrupted
private static final int INTERRUPTED  = 6;

// The task passed to submit; a Runnable is adapted into a Callable
private Callable<V> callable;
// Normal case: holds the value returned by the Callable.
// Abnormal case: holds the exception thrown by the Callable.
private Object outcome; // non-volatile, protected by state reads/writes
// The thread that is executing the task
private volatile Thread runner;
// Many threads may call get() on the same task, so the waiting threads
// are kept in a stack (new nodes are pushed at the head)
private volatile WaitNode waiters;
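The entire life cycle of a FutureTask is driven by the state field. A task starts in NEW and ends in NORMAL, EXCEPTIONAL, CANCELLED, or INTERRUPTED; COMPLETING and INTERRUPTING are short-lived intermediate states.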
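The state field itself is private, but its effect can be observed through isDone and isCancelled. The sketch below is only an illustration (the class name FutureTaskStates is invented here): it drives four tasks down different paths and prints what the public methods report.

```java
import java.util.concurrent.FutureTask;

final class FutureTaskStates {
    static String describe(FutureTask<?> task) {
        return "done=" + task.isDone() + ", cancelled=" + task.isCancelled();
    }

    static String[] demo() {
        FutureTask<Integer> fresh = new FutureTask<>(() -> 1);
        // never run: state stays NEW

        FutureTask<Integer> finished = new FutureTask<>(() -> 1);
        finished.run();            // NEW -> COMPLETING -> NORMAL

        FutureTask<Integer> failed = new FutureTask<>(() -> {
            throw new IllegalStateException("boom");
        });
        failed.run();              // NEW -> COMPLETING -> EXCEPTIONAL

        FutureTask<Integer> cancelled = new FutureTask<>(() -> 1);
        cancelled.cancel(false);   // NEW -> CANCELLED

        return new String[] {
            describe(fresh), describe(finished), describe(failed), describe(cancelled)
        };
    }

    public static void main(String[] args) {
        for (String line : demo()) {
            System.out.println(line);
        }
    }
}
```

Note that isDone only means "no longer NEW", so it is true for normal completion, failure and cancellation alike.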
03
The run method
When a task is submitted through submit, it is the executor's execute method that eventually runs it. We know that execute() ends up calling the task's run() method, and since submit wraps the task in a FutureTask, it is FutureTask's run() method that gets called.
public void run() {
    // state != NEW: the task has already been run or cancelled.
    // CAS failed: another thread has already claimed the task.
    if (state != NEW ||
        !UNSAFE.compareAndSwapObject(this, runnerOffset,
                                     null, Thread.currentThread()))
        return;
    // From here on, the task was NEW and the current thread claimed it.
    try {
        Callable<V> c = callable;
        // c != null: guards against a null callable
        // state == NEW: guards against an external thread cancelling the task
        if (c != null && state == NEW) {
            // Reference to the result
            V result;
            // true: the callable completed normally
            // false: the callable threw an exception
            boolean ran;
            try {
                // Call the user-supplied callable (or the adapted runnable)
                result = c.call();
                // call() did not throw, so ran is set to true
                ran = true;
            } catch (Throwable ex) {
                result = null;
                ran = false;
                // Store the exception in outcome
                setException(ex);
            }
            if (ran)
                // Store the result in outcome
                set(result);
        }
    } finally {
        // runner must be non-null until state is settled to
        // prevent concurrent calls to run()
        runner = null;
        // state must be re-read after nulling runner to prevent
        // leaked interrupts
        int s = state;
        if (s >= INTERRUPTING)
            handlePossibleCancellationInterrupt(s);
    }
}
- First, check the state of the task. If the task is not in the NEW state, or the current thread fails to CAS itself into the runner field, return immediately.
- If the task is still NEW and callable is not null, invoke the Callable's call method to run the task.
- If call returns normally, set stores the returned value in outcome; if it throws, setException stores the exception instead. Both wake up all threads waiting in the waiters stack.
- Along the way, the task state is updated to reflect how the task ended.
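One visible consequence of the state check at the top of run is that the wrapped callable executes at most once. The following sketch, with an invented class name RunOnlyOnce, calls run twice on the same task and counts how often the callable was actually invoked.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.FutureTask;
import java.util.concurrent.atomic.AtomicInteger;

final class RunOnlyOnce {
    static int[] demo() throws Exception {
        AtomicInteger calls = new AtomicInteger();
        Callable<Integer> counting = calls::incrementAndGet;
        FutureTask<Integer> task = new FutureTask<>(counting);

        task.run();   // state is NEW: the callable runs and the outcome is stored
        task.run();   // state is no longer NEW: returns immediately

        // First element: number of callable invocations; second: stored result.
        return new int[] { calls.get(), task.get() };
    }

    public static void main(String[] args) throws Exception {
        int[] r = demo();
        System.out.println("calls=" + r[0] + ", result=" + r[1]);
    }
}
```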
04
The set method
protected void set(V v) {
    // Move the state from NEW to COMPLETING; only one thread can win this CAS
    if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {
        // Store the result
        outcome = v;
        // Move the state to NORMAL, the final state of a successful task
        UNSAFE.putOrderedInt(this, stateOffset, NORMAL);
        // Wake up all threads waiting in the waiters stack
        finishCompletion();
    }
}
The set method does three main things:
- It updates the task state, first to COMPLETING and then to NORMAL.
- It assigns the result value to outcome.
- It calls finishCompletion to wake up all threads that blocked while calling get.
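Because set only proceeds when the CAS from NEW succeeds, the first completion wins and later ones are ignored. The sketch below shows this with a hypothetical subclass, SettableTask, that exposes the protected set method purely for demonstration; it is not something the article or the JDK provides.

```java
import java.util.concurrent.FutureTask;

final class FirstSetWins {
    // Hypothetical subclass that exposes the protected set() for demonstration.
    static final class SettableTask extends FutureTask<String> {
        SettableTask() {
            super(() -> "from callable");
        }

        void complete(String value) {
            set(value);
        }
    }

    static String demo() throws Exception {
        SettableTask task = new SettableTask();
        task.complete("first");    // CAS NEW -> COMPLETING succeeds, outcome = "first"
        task.complete("second");   // CAS fails because state is NORMAL: no effect
        task.run();                // state != NEW, so the callable is never invoked
        return task.get();
    }

    public static void main(String[] args) throws Exception {
        System.out.println(demo());
    }
}
```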
05
The finishCompletion method
private void finishCompletion() {
    // Loop until the waiters stack has been detached
    for (WaitNode q; (q = waiters) != null;) {
        // Use CAS to set waiters to null, because an external thread that
        // cancels the task also triggers finishCompletion
        if (UNSAFE.compareAndSwapObject(this, waitersOffset, q, null)) {
            for (;;) {
                Thread t = q.thread;
                if (t != null) {
                    q.thread = null;
                    // Wake up the thread that belongs to the current node
                    LockSupport.unpark(t);
                }
                // next: the node after the current node
                WaitNode next = q.next;
                if (next == null)
                    break;
                q.next = null; // unlink to help gc
                q = next;
            }
            break;
        }
    }

    done();

    callable = null;        // to reduce footprint
}
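The finishCompletion method is also simple: it detaches the waiters stack, walks through every WaitNode in it, and wakes up the thread held by each node. Afterwards it calls the done() hook and clears callable.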
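The effect of finishCompletion can be observed from the outside: every thread blocked in get is released once the task completes, and done() is invoked afterwards. The sketch below is an illustration under assumed names (WakeAllWaiters, the value 7, the 50 ms pause); the pause only makes it likely that the waiters are parked before the task runs and is not needed for correctness.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.FutureTask;
import java.util.concurrent.atomic.AtomicInteger;

final class WakeAllWaiters {
    static int demo(int waiterCount) throws Exception {
        AtomicInteger sum = new AtomicInteger();
        CountDownLatch doneHook = new CountDownLatch(1);

        FutureTask<Integer> task = new FutureTask<Integer>(() -> 7) {
            @Override
            protected void done() {          // called once the task has completed
                doneHook.countDown();
            }
        };

        Thread[] waiters = new Thread[waiterCount];
        for (int i = 0; i < waiterCount; i++) {
            waiters[i] = new Thread(() -> {
                try {
                    sum.addAndGet(task.get());   // waits until the task completes
                } catch (Exception e) {
                    throw new IllegalStateException(e);
                }
            });
            waiters[i].start();
        }

        Thread.sleep(50);   // let the waiters reach get() (timing aid only)
        task.run();         // completes the task and releases every waiter
        for (Thread t : waiters) {
            t.join();
        }
        doneHook.await();   // done() has been called
        return sum.get();
    }

    public static void main(String[] args) throws Exception {
        System.out.println(demo(3));
    }
}
```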
06
The get method
public V get() throws InterruptedException, ExecutionException {
    // Read the current task state
    int s = state;
    // The task has not finished yet
    if (s <= COMPLETING)
        // Wrap the caller in a WaitNode, enqueue it, and wait
        s = awaitDone(false, 0L);
    // Return the result or throw, depending on the final state
    return report(s);
}
The get method first checks the task state. If the task has not finished, it calls awaitDone, which adds the current thread to the waiters stack and blocks it. Finally, the report method either returns the execution result or throws the corresponding exception.
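From the caller's point of view, the three possible endings surface as a return value, an ExecutionException, or a CancellationException. The following sketch shows all three; the class name GetOutcomes and the string formats are invented for the example.

```java
import java.util.concurrent.CancellationException;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.FutureTask;

final class GetOutcomes {
    static String outcomeOf(FutureTask<String> task) throws InterruptedException {
        try {
            return "value: " + task.get();
        } catch (ExecutionException e) {
            // The exception thrown by the callable is available as the cause.
            return "failed: " + e.getCause().getMessage();
        } catch (CancellationException e) {
            return "cancelled";
        }
    }

    static String[] demo() throws InterruptedException {
        FutureTask<String> ok = new FutureTask<>(() -> "ok");
        ok.run();

        FutureTask<String> failing = new FutureTask<>(() -> {
            throw new IllegalStateException("boom");
        });
        failing.run();

        FutureTask<String> cancelled = new FutureTask<>(() -> "never");
        cancelled.cancel(false);

        return new String[] { outcomeOf(ok), outcomeOf(failing), outcomeOf(cancelled) };
    }

    public static void main(String[] args) throws InterruptedException {
        for (String line : demo()) {
            System.out.println(line);
        }
    }
}
```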
07
The awaitDone method
private int awaitDone(boolean timed, long nanos)
    throws InterruptedException {
    // 0 means no timeout
    final long deadline = timed ? System.nanoTime() + nanos : 0L;
    // The current thread, wrapped as a WaitNode object
    WaitNode q = null;
    // Whether the node has been pushed onto the waiters stack
    boolean queued = false;
    for (;;) {
        // Thread.interrupted() returns true if the thread was interrupted
        // and resets the thread's interrupt flag back to false
        if (Thread.interrupted()) {
            removeWaiter(q);
            // get() throws InterruptedException to its caller
            throw new InterruptedException();
        }

        // If the thread was woken by unpark, it simply loops again and
        // re-reads the latest state of the task
        int s = state;
        // 4. The task has finished: leave the loop and return the state
        if (s > COMPLETING) {
            // A node was already created for the current thread;
            // clear its thread reference to help gc
            if (q != null)
                q.thread = null;
            return s;
        }
        else if (s == COMPLETING) // cannot time out yet
            // The result is about to be published: give up the CPU briefly
            Thread.yield();
        // 1. First pass: no node yet, so create one for the current thread
        else if (q == null)
            q = new WaitNode();
        // 2. Second pass: the node exists but is not in the stack yet.
        // Point next at the old head, then CAS waiters to the new node;
        // waiters always points at the head of the stack
        else if (!queued)
            queued = UNSAFE.compareAndSwapObject(this, waitersOffset,
                                                 q.next = waiters, q);
        // Timed wait
        else if (timed) {
            nanos = deadline - System.nanoTime();
            if (nanos <= 0L) {
                removeWaiter(q);
                return state;
            }
            LockSupport.parkNanos(this, nanos);
        }
        // 3. Park the current thread until it is unparked or interrupted
        else
            LockSupport.park(this);
    }
}
Here we assume that the task has not yet been executed when we call get(), i.e. its state is NEW, and we follow the steps marked 1, 2, 3, 4 in the code above:
- In the first iteration the state is NEW, so we go to 1 and wrap the caller thread in a new WaitNode.
- In the second iteration the state is still NEW and the node exists, so we go to 2 and push the WaitNode that holds the caller thread onto the waiters stack.
- In the third iteration the state is NEW, the node exists and has been queued, so we go to 3 and the caller thread is parked.
- Assume that after a while the task finishes. The run() method eventually unparks the caller thread, which wakes up at 3.
- In the fourth iteration the state must be greater than COMPLETING, so we go to 4, exit the loop and return.
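The timed branch of awaitDone is reachable through get(timeout, unit). The sketch below, with an assumed class name TimedGet and an arbitrary 50 ms timeout, waits on a task that nobody runs, so the wait expires, and then completes the task to show that a later get returns at once.

```java
import java.util.concurrent.FutureTask;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

final class TimedGet {
    static String demo() throws Exception {
        FutureTask<String> task = new FutureTask<>(() -> "done");

        String first;
        try {
            // Nobody has called run() yet, so the state stays NEW and the
            // timed wait expires.
            first = task.get(50, TimeUnit.MILLISECONDS);
        } catch (TimeoutException e) {
            first = "timed out";
        }

        task.run();                               // complete the task
        return first + ", then " + task.get();    // no waiting this time
    }

    public static void main(String[] args) throws Exception {
        System.out.println(demo());
    }
}
```

A timeout does not cancel the task; the caller can keep waiting or call cancel explicitly.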
Conclusion
- Tasks that return a Future are implemented by wrapping ordinary tasks in a FutureTask.
- FutureTask can not only obtain the result of task execution, but also report the exception thrown during execution, and even cancel the task.
- AbstractExecutorService defines many template methods. This is an important design pattern.
- FutureTask is a typical implementation of an asynchronous call.