Overview

FutureTask implements the RunnableFuture interface, so it can be submitted to an Executor as a Runnable, used as a Future to retrieve the result of an asynchronous task, or cancelled. In one sentence: FutureTask represents a cancellable asynchronous task.
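
As a small illustration of the two roles described above, the following sketch hands a FutureTask to an executor as a Runnable and then reads its result through the Future side. The class and method names are made up for this example; only the java.util.concurrent calls come from the JDK.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.FutureTask;

class FutureTaskOverviewDemo {

    // Submits a FutureTask as a Runnable, then reads its result as a Future.
    static int computeOnExecutor() throws Exception {
        FutureTask<Integer> task = new FutureTask<>(() -> 6 * 7);
        ExecutorService executor = Executors.newSingleThreadExecutor();
        try {
            executor.execute(task);   // used as a Runnable
            return task.get();        // used as a Future; blocks until the task finishes
        } finally {
            executor.shutdown();
        }
    }

    public static void main(String[] args) throws Exception {
        System.out.println(computeOnExecutor()); // prints 42
    }
}
```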

State machine model for FutureTask

In JDK 1.8, FutureTask is implemented as a state machine. It maintains an internal state variable that represents the stage of task execution, and the numeric values of the states are chosen so that simple comparisons such as state > COMPLETING or state >= CANCELLED are meaningful.

    // volatile guarantees that a write to state is visible to other threads
    // as soon as they read it
    private volatile int state;

    // Initial state: the task has not started or is still running
    private static final int NEW          = 0;
    // Transient: the task has finished and its outcome is being set
    private static final int COMPLETING   = 1;
    // Final: completed normally
    private static final int NORMAL       = 2;
    // Final: completed with an exception
    private static final int EXCEPTIONAL  = 3;
    // Final: cancelled without interrupting the runner
    private static final int CANCELLED    = 4;
    // Transient: cancelled, and the runner is being interrupted
    private static final int INTERRUPTING = 5;
    // Final: cancelled, and the runner has been interrupted
    private static final int INTERRUPTED  = 6;

COMPLETING and INTERRUPTING are transient states; NORMAL, EXCEPTIONAL, CANCELLED, and INTERRUPTED are final states. State transitions are not arbitrary. For example, a task cannot move from COMPLETING to INTERRUPTING. The only possible transitions are the following:

    /**
     * Possible state transitions:
     *
     * NEW -> COMPLETING -> NORMAL
     * NEW -> COMPLETING -> EXCEPTIONAL
     * NEW -> CANCELLED
     * NEW -> INTERRUPTING -> INTERRUPTED
     */
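
The transition list above can be modelled as a small lookup, which makes it easy to check that a path such as COMPLETING -> INTERRUPTING is rejected. This is only an illustrative model: FutureTask has no such method, and the class name and isLegal are hypothetical. In the real class the transitions are enforced by CAS operations on state.

```java
class FutureTaskStateModel {
    static final int NEW = 0, COMPLETING = 1, NORMAL = 2, EXCEPTIONAL = 3,
                     CANCELLED = 4, INTERRUPTING = 5, INTERRUPTED = 6;

    // Returns true only for the single-step transitions listed in the comment above.
    static boolean isLegal(int from, int to) {
        switch (from) {
            case NEW:
                return to == COMPLETING || to == CANCELLED || to == INTERRUPTING;
            case COMPLETING:
                return to == NORMAL || to == EXCEPTIONAL;
            case INTERRUPTING:
                return to == INTERRUPTED;
            default:
                // NORMAL, EXCEPTIONAL, CANCELLED and INTERRUPTED are final
                return false;
        }
    }

    public static void main(String[] args) {
        System.out.println(isLegal(NEW, COMPLETING));          // prints true
        System.out.println(isLegal(COMPLETING, INTERRUPTING)); // prints false
    }
}
```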

Instance variables of FutureTask

    /** The actual Callable that is executed, because Runnable cannot return the result */
    private Callable<V> callable;
    /** The execution result of the task or exceptions during the execution */
    private Object outcome;
    /** The thread that executes the Callable */
    private volatile Thread runner;
    /**
     * Treiber stack of waiting threads,
     * because multiple threads may call Future#get()
     * to get the result of the task
     */
    private volatile WaitNode waiters;

    /** Node of the Treiber stack */
    static final class WaitNode {
        volatile Thread thread;
        volatile WaitNode next;
        WaitNode() { thread = Thread.currentThread(); }
    }

The term Treiber stack appears frequently throughout the java.util.concurrent package, so what exactly is it? Simply put, a Treiber stack is a lock-free stack built on a fine-grained concurrency primitive, CAS (compare-and-swap). It keeps a reference to the head of a linked list and uses CAS on that reference to make push and pop thread-safe. Here is a simple implementation of a Treiber stack.

import java.util.concurrent.atomic.AtomicReference;

public class TreiberStack<E> {

    /** Top of the stack, held in an AtomicReference so that it can be updated with CAS */
    private final AtomicReference<Node<E>> top = new AtomicReference<>();

    /**
     * Push: read the current top, link the new node to it, then CAS the top.
     * The CAS succeeds only if no other thread changed the top in between; otherwise retry.
     */
    public void push(E e) {
        Node<E> old;
        Node<E> cur = new Node<>(e);
        do {
            old = top.get();
            cur.next = old;
        } while (!top.compareAndSet(old, cur));
    }

    /**
     * Pop: read the current top, then CAS the top to its successor.
     * Returns null if the stack is empty; retries if the CAS fails.
     */
    public E pop() {
        Node<E> old;
        Node<E> cur;
        do {
            old = top.get();
            if (old == null) {
                return null;
            }
            cur = old.next;
        } while (!top.compareAndSet(old, cur));
        return old.value;
    }

    private static class Node<E> {
        final E value;
        Node<E> next;

        Node(E value) {
            this.value = value;
        }
    }
}

Constructors

    public FutureTask(Callable<V> callable) {
        if (callable == null)
            throw new NullPointerException();
        this.callable = callable;
        this.state = NEW;       // ensure visibility of callable
    }

    public FutureTask(Runnable runnable, V result) {
        this.callable = Executors.callable(runnable, result);
        this.state = NEW;       // ensure visibility of callable
    }

FutureTask wraps a Runnable as a Callable so that every asynchronous task can produce a result. The explicit write to state at the end of each constructor relies on two happens-before rules, the program order rule and the volatile variable rule, to guarantee that callable is visible to other threads:

  • Program order rule: within a single thread, an action that comes earlier in control-flow order happens-before an action that comes later.
  • Volatile variable rule: a write to a volatile variable happens-before every subsequent read of that variable, where "subsequent" refers to chronological order.
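
The two rules combine into the usual "write plain data, then publish through a volatile field" pattern. The sketch below is not FutureTask code: the class VolatilePublication and its fields are hypothetical stand-ins, with payload playing the role of callable and ready playing the role of state.

```java
class VolatilePublication {
    private int payload;              // plain field, analogous to FutureTask.callable
    private volatile boolean ready;   // volatile field, analogous to FutureTask.state

    void publish(int value) {
        payload = value;   // (1) plain write
        ready = true;      // (2) volatile write; (1) happens-before (2) by program order
    }

    // Spins until the flag is set, then reads the payload.
    int awaitPayload() {
        while (!ready) {   // (3) volatile read; (2) happens-before the read that sees true
            Thread.yield();
        }
        return payload;    // (4) guaranteed to observe the value written in (1)
    }

    static int demo() throws InterruptedException {
        VolatilePublication box = new VolatilePublication();
        int[] seen = new int[1];
        Thread reader = new Thread(() -> seen[0] = box.awaitPayload());
        reader.start();
        box.publish(42);
        reader.join();     // join() makes the reader's write to seen[0] visible here
        return seen[0];
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(demo()); // prints 42
    }
}
```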

Core methods

run
/** run() ensures that the task is executed at most once */
public void run() {
    // 1. The task must be in the initial state NEW
    // 2. No thread has yet been assigned to run the underlying callable
    // The task starts only if both conditions hold
    if (state != NEW ||
        !UNSAFE.compareAndSwapObject(this, runnerOffset,
                                     null, Thread.currentThread()))
        return;
    try {
        Callable<V> c = callable;
        // callable is set to null once the task completes, and
        // runAndReset() leaves state at NEW after a run,
        // so both are re-checked before actually running
        if (c != null && state == NEW) {
            V result;
            boolean ran;
            try {
                result = c.call();
                // Execution completed normally
                ran = true;
            } catch (Throwable ex) {
                result = null;
                // An exception was thrown during execution
                ran = false;
                // Move the state to EXCEPTIONAL
                setException(ex);
            }
            // If execution completed normally
            if (ran)
                // Move the state to NORMAL
                set(result);
        }
    } finally {
        // runner stays non-null for the whole execution,
        // which prevents concurrent calls to run()
        runner = null;
        // state must be re-read after runner is cleared:
        // a concurrent cancel(true) may have moved it to INTERRUPTING
        // without yet reaching INTERRUPTED
        int s = state;
        if (s >= INTERRUPTING)
            // Spin until state reaches the final state INTERRUPTED
            handlePossibleCancellationInterrupt(s);
    }
}
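
The "at most once" behaviour of run() can be observed from outside with a counter. This is a minimal sketch using only the public FutureTask API; the class RunOnceDemo and its method name are invented for the example.

```java
import java.util.concurrent.FutureTask;
import java.util.concurrent.atomic.AtomicInteger;

class RunOnceDemo {

    // Calls run() twice and reports how often the underlying callable executed.
    static int callsAfterTwoRuns() {
        AtomicInteger calls = new AtomicInteger();
        FutureTask<Integer> task = new FutureTask<>(() -> calls.incrementAndGet());
        task.run();   // state: NEW -> COMPLETING -> NORMAL
        task.run();   // state is no longer NEW, so run() returns immediately
        return calls.get();
    }

    public static void main(String[] args) {
        System.out.println(callsAfterTwoRuns()); // prints 1
    }
}
```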
    /** Called when #run() completes normally */
    protected void set(V v) {
        // First CAS to the transient state COMPLETING
        if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {
            // Store the return value
            outcome = v;
            // Mark normal completion
            // No further transition is possible from a final state, so a lazySet-style write is enough
            UNSAFE.putOrderedInt(this, stateOffset, NORMAL); // final state
            // Wake up the threads blocked in #get()
            finishCompletion();
        }
    }

    /** Called when #run() fails with an exception */
    protected void setException(Throwable t) {
        // First CAS to the transient state COMPLETING
        if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {
            // Store the exception
            outcome = t;
            // Mark exceptional completion
            // No further transition is possible from a final state, so a lazySet-style write is enough
            UNSAFE.putOrderedInt(this, stateOffset, EXCEPTIONAL); // final state
            // Wake up the threads blocked in #get()
            finishCompletion();
        }
    }
    /** Common completion logic: remove and wake up all threads blocked in #get() */
    private void finishCompletion() {
        // assert state > COMPLETING;
        // Read the head of the list
        for (WaitNode q; (q = waiters) != null;) {
            // Clear the head; afterwards only the local variable q refers to the list
            if (UNSAFE.compareAndSwapObject(this, waitersOffset, q, null)) {
                // Walk through every node of the list
                for (;;) {
                    // Wake up the thread blocked in #get()
                    Thread t = q.thread;
                    if (t != null) {
                        q.thread = null;
                        LockSupport.unpark(t);
                    }
                    // Check whether the tail has been reached
                    WaitNode next = q.next;
                    if (next == null)
                        // If so, the traversal is finished
                        break;
                    // Break the link so that the node can be
                    // garbage-collected promptly
                    q.next = null; // unlink to help gc
                    // Move on to the next node
                    q = next;
                }
                break;
            }
        }
        // Call the hook method
        done();
        // callable is set to null at the end
        callable = null;        // to reduce footprint
        // waiters is now null, and the local variable q goes out of scope when the method returns,
        // which completes the cleanup of the Treiber stack
    }
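
Because finishCompletion() calls the protected hook done() on every path into a final state, a subclass can react to completion without polling. The following is a hedged sketch: NotifyingTask is a hypothetical subclass written for this example, not a JDK class.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.FutureTask;
import java.util.concurrent.atomic.AtomicBoolean;

class DoneHookDemo {

    // Hypothetical subclass that records whether done() has been invoked.
    static class NotifyingTask<V> extends FutureTask<V> {
        final AtomicBoolean doneCalled = new AtomicBoolean();

        NotifyingTask(Callable<V> callable) {
            super(callable);
        }

        @Override
        protected void done() {
            doneCalled.set(true);
        }
    }

    // done() fires after a normal run.
    static boolean doneCalledAfterRun() {
        NotifyingTask<String> task = new NotifyingTask<>(() -> "ok");
        task.run();
        return task.doneCalled.get();
    }

    // done() also fires after a successful cancellation.
    static boolean doneCalledAfterCancel() {
        NotifyingTask<String> task = new NotifyingTask<>(() -> "ok");
        task.cancel(false);
        return task.doneCalled.get();
    }

    public static void main(String[] args) {
        System.out.println(doneCalledAfterRun());    // prints true
        System.out.println(doneCalledAfterCancel()); // prints true
    }
}
```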
    /** Make sure state reaches its final state before #run() exits */
    private void handlePossibleCancellationInterrupt(int s) {
        // If the state really is the transient INTERRUPTING
        if (s == INTERRUPTING)
            // then spin
            while (state == INTERRUPTING)
                // Hint to the scheduler that this thread can give up the CPU
                Thread.yield(); // wait out pending interrupt
        // By design, INTERRUPTING is always followed by INTERRUPTED
        // assert state == INTERRUPTED;
    }
cancel
    /**
     * Cancel the task.
     * @param mayInterruptIfRunning whether to interrupt the task if it is already running
     */
    public boolean cancel(boolean mayInterruptIfRunning) {
        // Only a task whose state is still NEW can be cancelled.
        // The task may already be running; the parameter decides whether
        // the target state is INTERRUPTING or CANCELLED.
        // CAS is used to guard against concurrent state changes.
        if (!(state == NEW &&
              UNSAFE.compareAndSwapInt(this, stateOffset, NEW,
                  mayInterruptIfRunning ? INTERRUPTING : CANCELLED)))
            return false;
        try {    // in case call to interrupt throws exception
            // If the runner should be interrupted
            if (mayInterruptIfRunning) {
                try {
                    // Interrupt the runner thread, if there is one
                    Thread t = runner;
                    if (t != null)
                        t.interrupt();
                } finally { // final state
                    // The interrupt has been delivered; enter the final state
                    UNSAFE.putOrderedInt(this, stateOffset, INTERRUPTED);
                }
            }
        } finally {
            // A successful cancellation also needs the common cleanup
            finishCompletion();
        }
        return true;
    }
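
The effect of the state == NEW guard in cancel() is visible through the public API: the first cancellation wins, a second one is rejected, and a later run() does nothing. A minimal sketch, with the class and method names made up for illustration:

```java
import java.util.concurrent.CancellationException;
import java.util.concurrent.FutureTask;

class CancelDemo {

    // Cancels a task that was never started and reports what each call observed.
    static String cancelBeforeRun() throws Exception {
        FutureTask<String> task = new FutureTask<>(() -> "never computed");
        boolean first = task.cancel(false);    // NEW -> CANCELLED succeeds
        boolean second = task.cancel(false);   // state is no longer NEW, so this returns false
        task.run();                            // returns immediately for the same reason
        try {
            task.get();
            return "unexpected result";
        } catch (CancellationException e) {
            return first + "," + second + ",cancelled";
        }
    }

    public static void main(String[] args) throws Exception {
        System.out.println(cancelBeforeRun()); // prints true,false,cancelled
    }
}
```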
get
    /** Obtain the result of the task */
    public V get() throws InterruptedException, ExecutionException {
        int s = state;
        // If the task has not yet reached a final state, the caller has to wait
        if (s <= COMPLETING)
            // Block the current thread until a final state is reached
            s = awaitDone(false, 0L);
        // Produce the result according to the state
        return report(s);
    }

    public V get(long timeout, TimeUnit unit)
        throws InterruptedException, ExecutionException, TimeoutException {
        if (unit == null)
            throw new NullPointerException();
        int s = state;
        // If the task has not yet reached a final state, the caller has to wait
        if (s <= COMPLETING &&
            // If a final state is still not reached after waiting, the call times out
            (s = awaitDone(true, unit.toNanos(timeout))) <= COMPLETING)
            throw new TimeoutException();
        // Produce the result according to the state
        return report(s);
    }
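
One consequence of the timed get() worth seeing in action: a timeout only ends the wait and leaves the task in state NEW, so the task can still run and deliver its result afterwards. The sketch below assumes nothing beyond the public API; TimedGetDemo is a name invented for this example.

```java
import java.util.concurrent.FutureTask;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

class TimedGetDemo {

    // Waits on a task that nobody runs, then runs it and reads the result.
    static String timeoutThenResult() throws Exception {
        FutureTask<String> task = new FutureTask<>(() -> "late");
        String first;
        try {
            first = task.get(50, TimeUnit.MILLISECONDS); // state stays NEW, so this times out
        } catch (TimeoutException e) {
            first = "timeout";
        }
        task.run();                                      // the timeout did not cancel the task
        return first + "," + task.get();
    }

    public static void main(String[] args) throws Exception {
        System.out.println(timeoutThenResult()); // prints timeout,late
    }
}
```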
    /**
     * Wait (blocking) for the task to produce a result; throw an exception if the wait is interrupted.
     * @param timed whether the wait has a time limit
     * @param nanos maximum time to wait, in nanoseconds
     */
    private int awaitDone(boolean timed, long nanos)
        throws InterruptedException {
        // Compute the deadline of the wait
        final long deadline = timed ? System.nanoTime() + nanos : 0L;
        WaitNode q = null;
        boolean queued = false;
        // According to the documentation of the LockSupport.park family,
        // a park call may return because
        // 1. another thread called unpark()
        // 2. the time given to the timed variant has elapsed
        // 3. the thread was interrupted
        // 4. for no reason at all (spuriously)
        // so the condition must be re-checked in a loop every time park returns
        for (;;) {
            // Check whether the thread was interrupted
            if (Thread.interrupted()) {
                // If so, remove the node
                removeWaiter(q);
                // and throw an exception
                throw new InterruptedException();
            }
            // Read the current state
            int s = state;
            // The state values are carefully chosen:
            // every state greater than COMPLETING is either a final state
            // or INTERRUPTING, a cancellation that is already in progress,
            // so s > COMPLETING means the wait is over
            if (s > COMPLETING) {
                if (q != null)
                    q.thread = null;
                return s;
            }
            // In COMPLETING the task is about to finish,
            // so instead of parking, call Thread.yield()
            // to give the completing thread a chance to run
            else if (s == COMPLETING) // cannot time out yet
                Thread.yield();
            // s == NEW: this is the state that really requires blocking
            // If there is no node yet, create one
            else if (q == null)
                q = new WaitNode();
            // If the node has not been pushed yet, push it
            // These two steps are an ordinary Treiber stack push
            // After them, the thread calling #get() is queued
            else if (!queued)
                queued = UNSAFE.compareAndSwapObject(this, waitersOffset,
                                                     q.next = waiters, q);
            else if (timed) {
                // Compute the remaining time
                nanos = deadline - System.nanoTime();
                // <= 0 means the wait has timed out
                if (nanos <= 0L) {
                    // Remove the node from the Treiber stack
                    removeWaiter(q);
                    return state;
                }
                // Wait with a time limit
                LockSupport.parkNanos(this, nanos);
            }
            else
                // Wait without a time limit
                LockSupport.park(this);
        }
    }

    /** Report the result of the task */
    @SuppressWarnings("unchecked")
    private V report(int s) throws ExecutionException {
        Object x = outcome;
        // NORMAL means the task completed successfully
        if (s == NORMAL)
            // outcome holds the return value of the callable
            return (V)x;
        // If the task was cancelled, throw an exception
        if (s >= CANCELLED)
            throw new CancellationException();
        // Otherwise the task failed, and outcome holds the exception
        throw new ExecutionException((Throwable)x);
    }
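
The last branch of report() is what callers see when the callable throws: the original exception arrives wrapped in an ExecutionException and is available through getCause(). A minimal sketch using only the public API, with ReportDemo as an invented name:

```java
import java.util.concurrent.ExecutionException;
import java.util.concurrent.FutureTask;

class ReportDemo {

    // Runs a failing task and describes the exception that get() reports.
    static String causeOfFailure() throws InterruptedException {
        FutureTask<String> task = new FutureTask<>(() -> {
            throw new IllegalStateException("boom");
        });
        task.run();   // call() throws; setException() stores the exception in outcome
        try {
            return task.get();
        } catch (ExecutionException e) {
            Throwable cause = e.getCause();
            return cause.getClass().getSimpleName() + ": " + cause.getMessage();
        }
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(causeOfFailure()); // prints IllegalStateException: boom
    }
}
```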
isCancelled/isDone
    public boolean isCancelled() {
        // Only INTERRUPTING and INTERRUPTED are greater than CANCELLED
        // INTERRUPTING and INTERRUPTED also count as cancelled states:
        // they represent a forced cancellation of a task that may already be running
        return state >= CANCELLED;
    }

    public boolean isDone() {
        return state != NEW;
    }
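
The two predicates can be compared side by side for a fresh task and for each kind of final state. This is an illustrative sketch; StatusDemo and describe are names invented for the example.

```java
import java.util.concurrent.FutureTask;

class StatusDemo {

    static String describe(FutureTask<?> task) {
        return "done=" + task.isDone() + " cancelled=" + task.isCancelled();
    }

    // Returns the status of a fresh, a completed, a failed and a cancelled task, in that order.
    static String[] statuses() {
        FutureTask<String> fresh = new FutureTask<>(() -> "x");

        FutureTask<String> normal = new FutureTask<>(() -> "x");
        normal.run();                                   // ends in NORMAL

        FutureTask<String> failed = new FutureTask<>(() -> {
            throw new RuntimeException("failure");
        });
        failed.run();                                   // ends in EXCEPTIONAL

        FutureTask<String> cancelled = new FutureTask<>(() -> "x");
        cancelled.cancel(true);                         // ends in INTERRUPTED

        return new String[] {
            describe(fresh), describe(normal), describe(failed), describe(cancelled)
        };
    }

    public static void main(String[] args) {
        for (String s : statuses()) {
            System.out.println(s);
        }
    }
}
```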

Thanks to the carefully chosen state values, isCancelled and isDone reduce to simple comparisons on state. Note that isDone does not only mean that the task completed normally or threw an exception: cancelled and interrupted tasks also count as done. That concludes the FutureTask source code analysis; next time I will share an analysis of the thread pool source code.