Today is 520 (May 20), but as a happily single person I spent the dead of night writing this analysis of the FutureTask source code. Writing is not easy, so a follow is appreciated. The article was written in a hurry; if anything is wrong, please point it out in the comments. Happy learning, everyone.

Introduction

FutureTask is a class that represents a cancellable asynchronous computation. It implements the basic methods of Future: methods to start and cancel a computation, to check whether the computation has completed, and to retrieve its result. The result can only be retrieved once the computation has completed; the get method blocks until then. Once the computation has completed, it cannot be restarted or cancelled (unless it is invoked through runAndReset). A FutureTask can wrap either a Callable or a Runnable object. Because FutureTask implements Runnable, a FutureTask can be submitted to an Executor for execution.

FutureTask is suited to scenarios where a result must be retrieved asynchronously or a task may need to be cancelled. Pass a Runnable or Callable to FutureTask, then call its run method directly or hand it to a thread pool; the result can later be retrieved from outside through the get method. This makes FutureTask a good fit for time-consuming computations: the main thread can carry on with its own work and fetch the result afterwards. In addition, FutureTask guarantees that the wrapped Runnable or Callable executes only once even if run is called multiple times, and the task can be cancelled through the cancel method.
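
The usage described above can be sketched as follows. This is a minimal illustration, not code from the JDK: the class name FutureTaskBasicsDemo and the helper slowSum are made up for the example and stand in for a real time-consuming computation.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.FutureTask;

final class FutureTaskBasicsDemo {

    // Hypothetical stand-in for a time-consuming computation.
    static int slowSum(int n) {
        int sum = 0;
        for (int i = 1; i <= n; i++) {
            sum += i;
        }
        return sum;
    }

    // Runs the computation on a worker thread and fetches the result via get().
    static int computeAsync(int n) {
        Callable<Integer> work = () -> slowSum(n);
        FutureTask<Integer> task = new FutureTask<>(work);
        // FutureTask implements Runnable, so a plain Thread can execute it.
        new Thread(task, "worker").start();
        try {
            // The caller could do other work here before asking for the result.
            return task.get(); // blocks until the worker has finished
        } catch (InterruptedException | ExecutionException e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(computeAsync(100)); // sum of 1..100
    }
}
```

The same task object could be passed to an Executor instead of a hand-made Thread; the get call would look the same.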

To support fetching the result, FutureTask internally maintains a simple linked list implemented by the WaitNode class. The list holds every thread waiting for the result; each such thread is suspended and is woken up only when the task completes, its wait times out, or it is interrupted.

1. State

FutureTask keeps its current state in a volatile int field named state. There are seven states, as shown in the source code below:

// Task state
private volatile int state;
// Newly created; the task has not finished yet (it may be running)
private static final int NEW          = 0;
// Transient: the result or exception is being stored in outcome
private static final int COMPLETING   = 1;
// Completed normally
private static final int NORMAL       = 2;
// Completed with an exception
private static final int EXCEPTIONAL  = 3;
// Cancelled without interrupting the runner
private static final int CANCELLED    = 4;
// Transient: cancel(true) is interrupting the runner
private static final int INTERRUPTING = 5;
// The runner has been interrupted
private static final int INTERRUPTED  = 6;

Possible state transitions:

  • NEW -> COMPLETING -> NORMAL
  • NEW -> COMPLETING -> EXCEPTIONAL
  • NEW -> CANCELLED
  • NEW -> INTERRUPTING -> INTERRUPTED
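
The state field is private, but the end points of these transitions can be observed indirectly through the public isDone and isCancelled methods. The following is a small sketch under that assumption; the class FutureTaskStateDemo and its helper methods are hypothetical names used only for illustration.

```java
import java.util.concurrent.FutureTask;

final class FutureTaskStateDemo {

    // Formats the two publicly visible state flags of a task.
    static String flags(FutureTask<?> task) {
        return "done=" + task.isDone() + ", cancelled=" + task.isCancelled();
    }

    // State NEW: nothing has happened yet.
    static String freshTask() {
        FutureTask<String> task = new FutureTask<>(() -> "ok");
        return flags(task);
    }

    // NEW -> COMPLETING -> NORMAL
    static String completedTask() {
        FutureTask<String> task = new FutureTask<>(() -> "ok");
        task.run();
        return flags(task);
    }

    // NEW -> COMPLETING -> EXCEPTIONAL
    static String failedTask() {
        FutureTask<String> task = new FutureTask<>(() -> {
            throw new IllegalStateException("boom");
        });
        task.run();
        return flags(task);
    }

    // NEW -> CANCELLED
    static String cancelledTask() {
        FutureTask<String> task = new FutureTask<>(() -> "ok");
        task.cancel(false);
        return flags(task);
    }

    public static void main(String[] args) {
        System.out.println("fresh:     " + freshTask());
        System.out.println("completed: " + completedTask());
        System.out.println("failed:    " + failedTask());
        System.out.println("cancelled: " + cancelledTask());
    }
}
```

Note that isDone reports true for every state other than NEW, so a failed or cancelled task also counts as done.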

2. Source code analysis

2.1 Construction method

FutureTask provides two constructors, which wrap a Callable and a Runnable object respectively and initialize the state to NEW. Since a Runnable returns no value, the second constructor wraps it in a RunnableAdapter through Executors.callable; the adapter's call method runs the Runnable and then returns the result that was passed to the constructor. The source code is as follows:

public FutureTask(Callable<V> callable) {
    if (callable == null)
        throw new NullPointerException();
    this.callable = callable;
    // Initialize the state to NEW
    this.state = NEW;
}

public FutureTask(Runnable runnable, V result) {
    // Wrap the Runnable in a RunnableAdapter
    this.callable = Executors.callable(runnable, result);
    // Initialize the state to NEW
    this.state = NEW;
}
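
As a quick illustration of the second constructor, the sketch below wraps a Runnable together with a fixed result. The class name RunnableResultDemo and the string values are assumptions made for the example.

```java
import java.util.concurrent.ExecutionException;
import java.util.concurrent.FutureTask;

final class RunnableResultDemo {

    // Wraps a Runnable with a preset result and returns "<side effect>:<result>".
    static String runWithFixedResult() {
        StringBuilder log = new StringBuilder();
        Runnable job = () -> log.append("ran");
        // The Runnable produces no value, so get() returns the preset "fixed".
        FutureTask<String> task = new FutureTask<>(job, "fixed");
        task.run();
        try {
            return log + ":" + task.get();
        } catch (InterruptedException | ExecutionException e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(runWithFixedResult());
    }
}
```

The preset result is only a value handed back after the Runnable has run; the Runnable itself has no way to change it.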

2.2 Inner class WaitNode

FutureTask uses the inner class WaitNode to build a simple singly linked list of the threads waiting for the task's result, and stores the head of that list in the waiters member variable. The definitions are as follows:

// The thread waiting for the result
private volatile WaitNode waiters;

static final class WaitNode {
    // Waiting thread
    volatile Thread thread;
    // Next node
    volatile WaitNode next;
    // Constructor that assigns the current thread to thread
    WaitNode() { thread = Thread.currentThread(); }
}
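
The waiters list behaves like a lock-free stack: new nodes are pushed at the head with a CAS, and the whole list is detached at once when the task finishes. The sketch below shows that idea in isolation. It is a simplification and not the JDK code: it uses AtomicReference instead of Unsafe, stores a name instead of a Thread, and the class WaiterStackSketch is a hypothetical name.

```java
import java.util.concurrent.atomic.AtomicReference;

final class WaiterStackSketch {

    // Simplified counterpart of WaitNode; name stands in for the waiting Thread.
    static final class Node {
        final String name;
        Node next;
        Node(String name) { this.name = name; }
    }

    // Head of the singly linked list, the counterpart of the waiters field.
    private final AtomicReference<Node> head = new AtomicReference<>();

    // Pushes a node at the head, retrying the CAS until it succeeds.
    void push(String name) {
        Node node = new Node(name);
        do {
            node.next = head.get();
        } while (!head.compareAndSet(node.next, node));
    }

    // Detaches the whole list atomically and returns the names in list order.
    String drain() {
        StringBuilder sb = new StringBuilder();
        for (Node q = head.getAndSet(null); q != null; q = q.next) {
            if (sb.length() > 0) {
                sb.append(',');
            }
            sb.append(q.name);
        }
        return sb.toString();
    }

    static String demo() {
        WaiterStackSketch stack = new WaiterStackSketch();
        stack.push("t1");
        stack.push("t2");
        stack.push("t3");
        return stack.drain();
    }

    public static void main(String[] args) {
        System.out.println(demo()); // most recently pushed node comes first
    }
}
```

Because nodes are always added at the head, the most recent waiter is the first one visited when the list is drained.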

2.3 run

The run method is the key method through which a thread executes the task. Invoking the task logic, obtaining the return value and catching exceptions all happen in this method. The process is as follows:

  1. If the current state is not NEW, return immediately.
  2. Assign the current thread to the runner member variable via CAS, so that the thread can be interrupted when cancel is called. If the CAS fails, another thread is already running the task, so return; this ensures that the task is executed only once.
  3. Check again that the state is NEW, then call the call method of the Callable object to execute the task.
  4. On success, take the return value of call and assign it to the outcome member variable through the set method; the state changes NEW -> COMPLETING -> NORMAL.
  5. If call throws, assign the exception to outcome through the setException method; the state changes NEW -> COMPLETING -> EXCEPTIONAL.
  6. Finally, set runner to null. If the state is INTERRUPTING or INTERRUPTED, call handlePossibleCancellationInterrupt, which makes the executing thread yield so that any interrupt from cancel(true) is delivered to the task only while it is in run or runAndReset.

The specific source code is as follows:

/** Task execution method */
public void run() {
    // 1. Check whether the state is NEW
    // 2. Assign the current thread to runner via CAS to ensure that the task is executed only once
    if (state != NEW ||
        !UNSAFE.compareAndSwapObject(this, runnerOffset,
                                     null, Thread.currentThread()))
        // Either check failed: return directly
        return;
    try {
        // Callable object wrapped by this task
        Callable<V> c = callable;
        // Re-check the state
        if (c != null && state == NEW) {
            V result;
            boolean ran;
            try {
                // Execute the task
                result = c.call();
                ran = true;
            } catch (Throwable ex) {
                // Catch the exception
                result = null;
                ran = false;
                // Record the exception
                setException(ex);
            }
            if (ran)
                // Assign the result to the outcome member variable
                set(result);
        }
    } finally {
        // runner must be non-null until the state is settled
        // to prevent concurrent calls to run()
        runner = null;
        // The state must be re-read after nulling runner to avoid missing an interrupt
        int s = state;
        if (s >= INTERRUPTING)
            // The state is INTERRUPTING or INTERRUPTED:
            // call handlePossibleCancellationInterrupt so the executing thread yields,
            // ensuring that any interrupt from cancel(true) is delivered
            // to the task only while it is in run or runAndReset
            handlePossibleCancellationInterrupt(s);
    }
}
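
The effect of the state check at the top of run can be seen from the outside: calling run a second time does nothing. A minimal sketch, with RunOnceDemo as a made-up class name and a counter standing in for real work:

```java
import java.util.concurrent.FutureTask;
import java.util.concurrent.atomic.AtomicInteger;

final class RunOnceDemo {

    // Calls run() twice and reports how often the wrapped Callable executed.
    static int callsAfterTwoRuns() {
        AtomicInteger calls = new AtomicInteger();
        FutureTask<Integer> task = new FutureTask<>(() -> calls.incrementAndGet());
        task.run(); // executes the Callable, state ends up NORMAL
        task.run(); // state is no longer NEW, so this returns immediately
        return calls.get();
    }

    public static void main(String[] args) {
        System.out.println("call() executed " + callsAfterTwoRuns() + " time(s)");
    }
}
```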

2.3.1 set

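The set method assigns the task result to outcome and changes the state NEW -> COMPLETING -> NORMAL. It then calls finishCompletion to wake up and remove all threads waiting for the result. The source code is as follows: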

/** Stores the task result and transitions the state to NORMAL. */
protected void set(V v) {
    if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {
        // Assign the result to outcome
        outcome = v;
        // Mark the state as NORMAL (final state)
        UNSAFE.putOrderedInt(this, stateOffset, NORMAL);
        // Wake up and remove all threads waiting for the result
        finishCompletion();
    }
}

/** Wakes up and removes all threads waiting for the result. */
private void finishCompletion() {
    // assert state > COMPLETING;
    for (WaitNode q; (q = waiters) != null;) {
        // Set the waiters field to null via CAS
        if (UNSAFE.compareAndSwapObject(this, waitersOffset, q, null)) {
            // Traverse the singly linked list
            for (;;) {
                Thread t = q.thread;
                if (t != null) {
                    q.thread = null;
                    // Wake up the waiting thread
                    LockSupport.unpark(t);
                }
                WaitNode next = q.next;
                if (next == null)
                    break;
                q.next = null; // unlink to help gc
                q = next;
            }
            break;
        }
    }

    done();

    callable = null;        // to reduce footprint
}
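
finishCompletion ends by calling done(), a protected hook that does nothing by default. A subclass can override it to react when the task reaches a final state. The sketch below assumes a hypothetical subclass LoggingTask that records an event in a list; none of these names come from the JDK.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.FutureTask;

final class DoneHookDemo {

    // Hypothetical subclass that records when the task reaches a final state.
    static final class LoggingTask extends FutureTask<String> {
        private final List<String> events;

        LoggingTask(Callable<String> callable, List<String> events) {
            super(callable);
            this.events = events;
        }

        @Override
        protected void done() {
            // Invoked from finishCompletion after the waiters have been woken.
            events.add("done(cancelled=" + isCancelled() + ")");
        }
    }

    static List<String> events() {
        List<String> events = new ArrayList<>();
        // Normal completion: run() -> set() -> finishCompletion() -> done()
        new LoggingTask(() -> "ok", events).run();
        // Cancellation: cancel() -> finishCompletion() -> done()
        new LoggingTask(() -> "ok", events).cancel(false);
        return events;
    }

    public static void main(String[] args) {
        System.out.println(events());
    }
}
```

Since every path into a final state goes through finishCompletion, the hook fires once per task whether it completed, failed or was cancelled.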

2.3.2 setException

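The setException method assigns the exception to outcome and changes the state NEW -> COMPLETING -> EXCEPTIONAL. It then calls finishCompletion to wake up and remove all threads waiting for the result. The source code is as follows: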

/** Stores the exception and transitions the state to EXCEPTIONAL. */
protected void setException(Throwable t) {
    if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {
        // Assign the exception to outcome
        outcome = t;
        // Mark the state as EXCEPTIONAL (final state)
        UNSAFE.putOrderedInt(this, stateOffset, EXCEPTIONAL);
        // Wake up and remove all threads waiting for the result
        finishCompletion();
    }
}
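
Both set and setException are protected and both start with the same CAS from NEW to COMPLETING, so only the first of them to be called has any effect. The sketch below makes this visible with a hypothetical subclass SettableTask that exposes the two methods; the subclass and its method names are assumptions for illustration only.

```java
import java.util.concurrent.ExecutionException;
import java.util.concurrent.FutureTask;

final class SettableTaskDemo {

    // Hypothetical subclass that is completed from outside instead of by run().
    static final class SettableTask<V> extends FutureTask<V> {
        SettableTask() {
            super(() -> {
                throw new IllegalStateException("not meant to be run");
            });
        }

        void complete(V value) { set(value); }

        void fail(Throwable cause) { setException(cause); }
    }

    static String firstCompletionWins() {
        SettableTask<String> task = new SettableTask<>();
        task.fail(new IllegalArgumentException("bad input")); // NEW -> COMPLETING -> EXCEPTIONAL
        task.complete("late value"); // CAS from NEW fails, so the call is ignored
        try {
            return "value: " + task.get();
        } catch (ExecutionException e) {
            // get() wraps the stored exception in an ExecutionException.
            return "failed: " + e.getCause().getMessage();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return "interrupted";
        }
    }

    public static void main(String[] args) {
        System.out.println(firstCompletionWins());
    }
}
```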

2.3.3 handlePossibleCancellationInterrupt

The handlePossibleCancellationInterrupt method makes the executing thread yield for as long as the state is INTERRUPTING. This ensures that any interrupt from cancel(true) is delivered to the task only while it is in run or runAndReset. The source code is as follows:

/** Ensures that any interrupt from cancel(true) is delivered to the task only while it is in run or runAndReset. */
private void handlePossibleCancellationInterrupt(int s) {
    // It is possible for our interrupter to stall before getting a
    // chance to interrupt us. Let's spin-wait patiently.
    if (s == INTERRUPTING)
        while (state == INTERRUPTING)
            // Wait for interrupts to be handled
            Thread.yield();
}

2.4 get

The get method retrieves the result of the task. There are two overloads, one with a timeout and one without. Both suspend the calling thread while the task is not yet complete. The version with a timeout throws a TimeoutException if the task is still not complete when the timeout elapses; the version without a timeout stays suspended until the task completes. Both throw InterruptedException if the waiting thread is interrupted. Once the wait is over, the thread calls the report method to obtain and return the result of the task.

/** get without a timeout */
public V get() throws InterruptedException, ExecutionException {
    int s = state;
    if (s <= COMPLETING)
        // The task has not completed yet:
        // suspend the current thread with no timeout
        s = awaitDone(false, 0L);
    // Get and return the task execution result
    return report(s);
}

/** * with timeout */
public V get(long timeout, TimeUnit unit)
    throws InterruptedException, ExecutionException, TimeoutException {
    if (unit == null)
        throw new NullPointerException();
    int s = state;
    if (s <= COMPLETING &&
        (s = awaitDone(true, unit.toNanos(timeout))) <= COMPLETING)
        // The task is still not complete after the timeout elapsed: throw a TimeoutException
        throw new TimeoutException();
    // Get and return the task execution result
    return report(s);
}
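
The timed overload can be tried out with a task that nobody has run yet. The sketch below is only an illustration; the class name TimedGetDemo and the 50 ms timeout are arbitrary choices.

```java
import java.util.concurrent.ExecutionException;
import java.util.concurrent.FutureTask;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

final class TimedGetDemo {

    static String timeoutThenRun() {
        FutureTask<String> task = new FutureTask<>(() -> "ok");
        String first;
        try {
            // Nobody has run the task yet, so the timed get gives up.
            first = task.get(50, TimeUnit.MILLISECONDS);
        } catch (TimeoutException e) {
            first = "timed out";
        } catch (InterruptedException | ExecutionException e) {
            throw new IllegalStateException(e);
        }
        // A timeout does not cancel the task: it is still NEW and can run.
        task.run();
        try {
            return first + ", then " + task.get();
        } catch (InterruptedException | ExecutionException e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(timeoutThenRun());
    }
}
```

A TimeoutException only ends the wait of the calling thread; the task itself is left untouched and can still complete later.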

2.4.1 awaitDone

The awaitDone method waits until the task completes, the waiting thread is interrupted, or the wait times out. The source code is as follows:

/** Waits for completion, or aborts on interrupt or timeout. */
private int awaitDone(boolean timed, long nanos)
    throws InterruptedException {
    final long deadline = timed ? System.nanoTime() + nanos : 0L;
    // Wait node for the current thread
    WaitNode q = null;
    // Whether the wait node has been added to the wait list
    boolean queued = false;
    for (;;) {
        if (Thread.interrupted()) {
            // The thread was interrupted: remove its wait node
            // and throw InterruptedException
            removeWaiter(q);
            throw new InterruptedException();
        }

        int s = state;
        if (s > COMPLETING) {
            // The task has completed normally, failed, or been cancelled or interrupted
            if (q != null)
                // Clear thread so that the wait node can be reclaimed
                q.thread = null;
            return s;
        }
        else if (s == COMPLETING)
            // The result is being stored;
            // yield until the final state is set
            Thread.yield();
        else if (q == null)
            // Create the wait node for the current thread
            q = new WaitNode();
        else if (!queued)
            // Add the wait node at the head of the waiters list
            queued = UNSAFE.compareAndSwapObject(this, waitersOffset,
                                                 q.next = waiters, q);
        else if (timed) {
            nanos = deadline - System.nanoTime();
            if (nanos <= 0L) {
                // The wait has timed out: remove the node
                removeWaiter(q);
                return state;
            }
            // Suspend the thread for at most nanos
            LockSupport.parkNanos(this, nanos);
        }
        else
            // Suspend the thread
            LockSupport.park(this);
    }
}

/** Removes a timed-out or interrupted wait node. */
private void removeWaiter(WaitNode node) {
    if (node != null) {
        node.thread = null;
        retry:
        for (;;) {
            // Restart the traversal on a removeWaiter race
            for (WaitNode pred = null, q = waiters, s; q != null; q = s) {
                s = q.next;
                if (q.thread != null)
                    // A node whose thread is not null becomes the predecessor
                    pred = q;
                else if (pred != null) {
                    // q.thread is null and a predecessor exists:
                    // unlink q
                    pred.next = s;
                    if (pred.thread == null)
                        // The predecessor was removed concurrently:
                        // traverse again
                        continue retry;
                }
                else if (!UNSAFE.compareAndSwapObject(this, waitersOffset,
                                                      q, s))
                    // q.thread is null and q is the head:
                    // CAS the head to the next node s;
                    // if the CAS fails, traverse again
                    continue retry;
            }
            break;
        }
    }
}
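
The core of awaitDone is a loop that re-checks a condition and parks until another thread changes the condition and calls unpark. The sketch below isolates that pattern. It is a simplified stand-in and not the JDK code: an AtomicBoolean plays the role of the state check, and the class name ParkUnparkSketch is hypothetical.

```java
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.LockSupport;

final class ParkUnparkSketch {

    static boolean waitAndWake() {
        // Stands in for the "state > COMPLETING" check in awaitDone.
        AtomicBoolean completed = new AtomicBoolean(false);
        AtomicBoolean sawCompletion = new AtomicBoolean(false);

        Thread waiter = new Thread(() -> {
            // Always re-check the condition in a loop: park may return
            // without a matching unpark.
            while (!completed.get()) {
                LockSupport.park();
            }
            sawCompletion.set(true);
        }, "waiter");
        waiter.start();

        // Publish the state change first, then wake the waiter, in the same
        // order as set() followed by finishCompletion().
        completed.set(true);
        LockSupport.unpark(waiter);

        try {
            waiter.join();
        } catch (InterruptedException e) {
            throw new IllegalStateException(e);
        }
        return sawCompletion.get();
    }

    public static void main(String[] args) {
        System.out.println("waiter observed completion: " + waitAndWake());
    }
}
```

If unpark happens before the waiter parks, the permit is kept and the later park returns at once, so the wake-up is not lost in either ordering.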

2.4.2 report

The report method returns the result of a finished task or throws the exception that matches its final state. The source code is as follows:

/** * Obtain and return the execution result */
@SuppressWarnings("unchecked")
private V report(int s) throws ExecutionException {
    Object x = outcome;
    if (s == NORMAL)
        // If the status is NORMAL, the task is complete
        // Returns a normal result
        return (V)x;
    if (s >= CANCELLED)
        // s >= CANCELLED means the task was cancelled or interrupted:
        // throw CancellationException
        throw new CancellationException();
    // Otherwise, task execution fails
    throw new ExecutionException((Throwable)x);
}
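
From the caller's side, the three branches of report show up as three different outcomes of get. A minimal sketch of that mapping, where ReportOutcomeDemo and its describe helper are made-up names:

```java
import java.util.concurrent.CancellationException;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.FutureTask;

final class ReportOutcomeDemo {

    // Maps the outcome of get() on a finished task to a short description.
    static String describe(FutureTask<String> task) {
        try {
            return "value: " + task.get();                  // state NORMAL
        } catch (ExecutionException e) {
            return "failed: " + e.getCause().getMessage();  // state EXCEPTIONAL
        } catch (CancellationException e) {
            return "cancelled";                             // state >= CANCELLED
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return "interrupted";
        }
    }

    static String normal() {
        FutureTask<String> task = new FutureTask<>(() -> "ok");
        task.run();
        return describe(task);
    }

    static String exceptional() {
        FutureTask<String> task = new FutureTask<>(() -> {
            throw new IllegalStateException("boom");
        });
        task.run();
        return describe(task);
    }

    static String cancelled() {
        FutureTask<String> task = new FutureTask<>(() -> "ok");
        task.cancel(false);
        return describe(task);
    }

    public static void main(String[] args) {
        System.out.println(normal());
        System.out.println(exceptional());
        System.out.println(cancelled());
    }
}
```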

2.5 cancel

The cancel method cancels the execution of the task. It succeeds only if the state is still NEW. The source code is as follows:

/**
 * Cancels the task.
 * @param mayInterruptIfRunning true: interrupt the thread running the task;
 *        false: cancel without interrupting
 */
public boolean cancel(boolean mayInterruptIfRunning) {
    if (!(state == NEW &&
          UNSAFE.compareAndSwapInt(this, stateOffset, NEW,
              mayInterruptIfRunning ? INTERRUPTING : CANCELLED)))
        // The state is not NEW, or the CAS on the state failed:
        // return false
        return false;
    try {
        if (mayInterruptIfRunning) {
            // Interrupt the thread running the task
            try {
                Thread t = runner;
                if (t != null)
                    // Set the thread's interrupt flag
                    t.interrupt();
            } finally {
                // Move to the final INTERRUPTED state
                UNSAFE.putOrderedInt(this, stateOffset, INTERRUPTED);
            }
        }
    } finally {
        // Wake up and remove all threads waiting for the result
        finishCompletion();
    }
    return true;
}
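
The two main cases of cancel can be exercised as follows: cancelling a task that is currently running, and cancelling one that has already completed. This is a sketch under assumptions; the class CancelDemo, the latch used to wait for the task to start, and the 10 second sleep are all choices made for the example.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.FutureTask;
import java.util.concurrent.atomic.AtomicBoolean;

final class CancelDemo {

    // cancel(true) on a running task interrupts the thread stored in runner.
    static boolean cancelInterruptsRunner() {
        CountDownLatch started = new CountDownLatch(1);
        AtomicBoolean interrupted = new AtomicBoolean(false);
        FutureTask<Void> task = new FutureTask<>(() -> {
            started.countDown();
            try {
                Thread.sleep(10_000); // simulated long-running work
            } catch (InterruptedException e) {
                interrupted.set(true); // the interrupt from cancel(true) arrived
            }
            return null;
        });
        Thread worker = new Thread(task, "worker");
        worker.start();
        try {
            started.await();                      // the task is now inside call()
            boolean accepted = task.cancel(true); // NEW -> INTERRUPTING -> INTERRUPTED
            worker.join();
            return accepted && interrupted.get() && task.isCancelled();
        } catch (InterruptedException e) {
            throw new IllegalStateException(e);
        }
    }

    // Once the task has completed, the state is no longer NEW and cancel fails.
    static boolean cancelAfterCompletion() {
        FutureTask<String> task = new FutureTask<>(() -> "ok");
        task.run();
        return task.cancel(true);
    }

    public static void main(String[] args) {
        System.out.println("running task cancelled:   " + cancelInterruptsRunner());
        System.out.println("completed task cancelled: " + cancelAfterCompletion());
    }
}
```

cancel only sets the interrupt flag of the runner thread; whether the work actually stops depends on the task reacting to the interrupt, as the sleep does here.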

3. Summary

  1. FutureTask suspends waiting threads and wakes them up with LockSupport (park, parkNanos and unpark).

  2. Member variables that are accessed by multiple threads (state, runner and waiters) are updated through CAS operations.

Wechat id: Silentao_com