Introduction

Future is the top-level abstract interface for asynchronous programming, introduced in JDK 1.5, and FutureTask is its basic implementation class. FutureTask also implements the Runnable interface, so it can be used either as a handle to an encapsulated task or run directly as a stand-alone Runnable. This article uses FutureTask as the starting point for learning about asynchronous programming.

Inheritance relationships

Notes

  • FutureTask does not implement the Future interface directly; it implements it indirectly through RunnableFuture, which simply extends both Runnable and Future
  • FunctionalInterface, at the top of the hierarchy, is an annotation. An interface marked with it is a functional interface and can be the target of a lambda expression, as Runnable is
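Because FutureTask is both a Runnable and a Future, the same object can be handed to a thread and then used to wait for the result. The following is a minimal sketch of that dual role; the class and method names are made up for illustration.

```java
import java.util.concurrent.ExecutionException;
import java.util.concurrent.FutureTask;

public class FutureTaskAsRunnableDemo {
    // Runs a FutureTask on a plain Thread and returns the computed value.
    static int computeOnThread() {
        FutureTask<Integer> task = new FutureTask<>(() -> 6 * 7);
        // FutureTask is a Runnable, so a Thread can execute it directly.
        Thread worker = new Thread(task);
        worker.start();
        try {
            // FutureTask is also a Future, so the caller can block for the result.
            return task.get();
        } catch (InterruptedException | ExecutionException e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(computeOnThread()); // prints 42
    }
}
```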

The Future interface

public interface Future<V> {
    // Cancel the task
    boolean cancel(boolean mayInterruptIfRunning);
    // Check whether the task was cancelled
    boolean isCancelled();
    // Check whether the task is done
    boolean isDone();
    // Block until the result of the task is available
    V get() throws InterruptedException, ExecutionException;
    // Block until the result is available or the timeout elapses
    V get(long timeout, TimeUnit unit)
        throws InterruptedException, ExecutionException, TimeoutException;
}

RunnableFuture interface

public interface RunnableFuture<V> extends Runnable, Future<V> {
    /**
     * Sets this Future to the result of its computation
     * unless it has been cancelled.
     */
    void run();
}

Fields

public class FutureTask<V> implements RunnableFuture<V> {
    // State machine: there are 7 states
    private volatile int state;
    // Newly created, not yet finished
    private static final int NEW          = 0;
    // Finishing: the outcome is being set
    private static final int COMPLETING   = 1;
    // Completed normally
    private static final int NORMAL       = 2;
    // Completed with an exception
    private static final int EXCEPTIONAL  = 3;
    // Cancelled
    private static final int CANCELLED    = 4;
    // Cancelled; the runner thread is being interrupted
    private static final int INTERRUPTING = 5;
    // Cancelled; the runner thread has been interrupted
    private static final int INTERRUPTED  = 6;

    // The underlying task, which returns a result
    private Callable<V> callable;

    // The result of the task, or the exception it threw; both are reported through get()
    private Object outcome;

    // The thread running the task
    private volatile Thread runner;

    // Stack of waiting threads; this field is the top node of the stack
    private volatile WaitNode waiters;
}

Notes

  • FutureTask uses the state machine to track the progress of the task. The comments in the source code also list the possible state transitions:
    • Normal completion: NEW -> COMPLETING -> NORMAL
    • Exception: NEW -> COMPLETING -> EXCEPTIONAL
    • Cancellation: NEW -> CANCELLED
    • Interruption: NEW -> INTERRUPTING -> INTERRUPTED
  • callable is the task that FutureTask executes; it is passed in from outside
  • outcome is the result produced by the task
  • runner is the thread that actually executes the task
  • waiters is a wait node, specifically the top node of the stack, much like a head node. The wait stack exists because several threads may call get at the same time and all block; each blocked thread is recorded in a wait node, and the nodes are linked into a stack
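The state field is private, but its effect can be observed from outside through isDone and isCancelled. The sketch below follows two of the transitions listed above; the class and method names are hypothetical, and the state names in the comments are the ones from the field listing.

```java
import java.util.concurrent.FutureTask;

public class FutureTaskStateDemo {
    // Returns {isDone before run, isDone after run, isCancelled after run}.
    static boolean[] normalPath() {
        FutureTask<String> task = new FutureTask<>(() -> "ok");
        boolean before = task.isDone();          // state is NEW
        task.run();                              // NEW -> COMPLETING -> NORMAL
        return new boolean[] { before, task.isDone(), task.isCancelled() };
    }

    // Returns {cancel result, isDone, isCancelled} for a task cancelled before it runs.
    static boolean[] cancelledPath() {
        FutureTask<String> task = new FutureTask<>(() -> "never");
        boolean cancelled = task.cancel(false);  // NEW -> CANCELLED
        task.run();                              // returns at once: state is no longer NEW
        return new boolean[] { cancelled, task.isDone(), task.isCancelled() };
    }

    public static void main(String[] args) {
        System.out.println(java.util.Arrays.toString(normalPath()));    // [false, true, false]
        System.out.println(java.util.Arrays.toString(cancelledPath())); // [true, true, true]
    }
}
```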

Constructors

// Pass the callable task directly
public FutureTask(Callable<V> callable) {
    if (callable == null)
        throw new NullPointerException();
    this.callable = callable;
    this.state = NEW;       // ensure visibility of callable
}

// Pass in the runnable task and result variables
public FutureTask(Runnable runnable, V result) {
    this.callable = Executors.callable(runnable, result);
    this.state = NEW;       // ensure visibility of callable
}
    

Notes

  • FutureTask provides two constructors, one for Callable tasks and one for Runnable tasks, but both end up storing a Callable
  • A Runnable is converted to a Callable by the RunnableAdapter class. RunnableAdapter implements Callable, and its call method runs the runnable's run method and then returns the result argument that was passed in. The relevant source code is as follows:
public static <T> Callable<T> callable(Runnable task, T result) {
    if (task == null)
        throw new NullPointerException();
    return new RunnableAdapter<T>(task, result);
}

static final class RunnableAdapter<T> implements Callable<T> {
    final Runnable task;
    final T result;
    RunnableAdapter(Runnable task, T result) {
        this.task = task;
        this.result = result;
    }
    public T call() {
        task.run();
        return result;
    }
}
  • The usefulness of the result parameter in the second constructor is somewhat questionable. At the very least it cannot be used to tell whether the task succeeded, because it is fixed before the task runs. Failure is reported in a different way: although the exception thrown by the runnable is caught inside FutureTask's run method, the get method still throws an exception, based on the state machine, if the task failed or was cancelled
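The behaviour of the Runnable constructor can be checked with a small experiment. This is only a sketch with made-up names: the first method shows that get returns the fixed result, and the second shows that a failing runnable surfaces as an ExecutionException rather than as the result value.

```java
import java.util.concurrent.ExecutionException;
import java.util.concurrent.FutureTask;

public class RunnableResultDemo {
    // The runnable succeeds, so get() hands back the value given to the constructor.
    static String successCase() {
        FutureTask<String> task = new FutureTask<>(() -> { }, "fixed");
        task.run();
        try {
            return task.get();
        } catch (InterruptedException | ExecutionException e) {
            throw new IllegalStateException(e);
        }
    }

    // The runnable throws, so get() throws instead of returning "fixed".
    static String failureCase() {
        FutureTask<String> task = new FutureTask<>(() -> {
            throw new IllegalStateException("boom");
        }, "fixed");
        task.run(); // the exception is captured here, not propagated
        try {
            return task.get();
        } catch (ExecutionException e) {
            return "failed: " + e.getCause().getMessage();
        } catch (InterruptedException e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(successCase()); // prints fixed
        System.out.println(failureCase()); // prints failed: boom
    }
}
```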

Main methods

run() - Task execution

  • The run method is the body that actually executes the task. It calls the call method of the wrapped callable and saves the result to outcome. It also catches any exception thrown by call and saves it instead of rethrowing it.
  • The run method also drives the state machine, for example NEW -> COMPLETING -> NORMAL or NEW -> COMPLETING -> EXCEPTIONAL, so that the outcome of the task is recorded correctly
  • At the start, run uses a CAS to set runner to the current thread, which prevents the task from being executed more than once when several threads call run.
public void run() {
    // Return immediately if the state is not NEW (the task has already run or been cancelled),
    // or if the CAS that sets runner from null to the current thread fails (another thread is
    // already running the task). This ensures that only one thread executes the task
    if (state != NEW ||
        !UNSAFE.compareAndSwapObject(this, runnerOffset,
                                     null, Thread.currentThread()))
        return;
    try {
        Callable<V> c = callable;
        // Check that the task is non-null and re-check that the state is still NEW,
        // in case the task was cancelled between the first check and this point
        if (c != null && state == NEW) {
            V result;
            boolean ran;
            try {
                // Execute the task
                result = c.call();
                ran = true;
            } catch (Throwable ex) {
                result = null;
                ran = false;
                // When the task throws, save the exception instead of rethrowing it
                setException(ex);
            }
            // If the task ran successfully, save the result
            if (ran)
                set(result);
        }
    } finally {
        // runner must be non-null until state is settled to
        // prevent concurrent calls to run()
        runner = null;
        // state must be re-read after nulling runner to prevent
        // leaked interrupts
        int s = state;
        // If the task is being interrupted, wait for the interrupt to finish
        if (s >= INTERRUPTING)
            handlePossibleCancellationInterrupt(s);
    }
}
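The guard at the top of run can be exercised from outside. The following sketch, with hypothetical names, starts several threads on the same FutureTask and counts how often the callable is actually invoked; with the state check and the CAS on runner described above, the count should be exactly one.

```java
import java.util.concurrent.FutureTask;
import java.util.concurrent.atomic.AtomicInteger;

public class RunOnceDemo {
    // Starts the same task on several threads and returns how many times the callable ran.
    static int callsAfterConcurrentRuns(int threads) {
        AtomicInteger calls = new AtomicInteger();
        FutureTask<Integer> task = new FutureTask<>(() -> calls.incrementAndGet());
        Thread[] workers = new Thread[threads];
        for (int i = 0; i < threads; i++) {
            workers[i] = new Thread(task);
            workers[i].start();
        }
        try {
            for (Thread worker : workers)
                worker.join();
        } catch (InterruptedException e) {
            throw new IllegalStateException(e);
        }
        // One more call on this thread: the state is no longer NEW, so it returns at once.
        task.run();
        return calls.get();
    }

    public static void main(String[] args) {
        System.out.println(callsAfterConcurrentRuns(8)); // prints 1
    }
}
```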

get() - Block to obtain the result of the task

  • There are two get methods: one blocks indefinitely and one blocks with a timeout
  • get returns the result of the task. If the task has not completed, the calling thread blocks until it does.
  • When a calling thread has to block, a wait node is created for it and pushed onto the wait stack.
  • When the task completes, or a timed wait expires, the thread's node is removed from the stack. Removal is fairly complex because it has to cope with concurrent access by multiple threads.
// Block indefinitely until the result is available
public V get() throws InterruptedException, ExecutionException {
    int s = state;
    // If the task has not finished, block in awaitDone until it has
    if (s <= COMPLETING)
        s = awaitDone(false, 0L);
    return report(s);
}

// Block until the result is available or the timeout elapses
public V get(long timeout, TimeUnit unit)
    throws InterruptedException, ExecutionException, TimeoutException {
    if (unit == null)
        throw new NullPointerException();
    int s = state;
    // Block in awaitDone; if the task is still unfinished when the timeout elapses, throw TimeoutException
    if (s <= COMPLETING &&
        (s = awaitDone(true, unit.toNanos(timeout))) <= COMPLETING)
        throw new TimeoutException();
    return report(s);
}

// Block the calling thread until the task completes, the thread is interrupted, or the wait times out
private int awaitDone(boolean timed, long nanos) throws InterruptedException {
    // Compute the deadline for a timed wait
    final long deadline = timed ? System.nanoTime() + nanos : 0L;
    WaitNode q = null;
    // Not yet pushed onto the wait stack
    boolean queued = false;
    for (;;) {
        // If the waiting thread was interrupted, remove its node from the wait stack
        if (Thread.interrupted()) {
            removeWaiter(q);
            throw new InterruptedException();
        }

        int s = state;
        if (s > COMPLETING) {
            // The task has reached a final state: return it
            if (q != null)
                q.thread = null;
            return s;
        }
        else if (s == COMPLETING)
            // The task has finished but the outcome is still being set:
            // yield the CPU so that the task thread can finish
            Thread.yield();
        else if (q == null)
            // Create a wait node for the current thread
            q = new WaitNode();
        else if (!queued)
            // The node created in the previous iteration is not on the stack yet:
            // push it so that it becomes the top of the waiters stack
            queued = UNSAFE.compareAndSwapObject(this, waitersOffset,
                                                 q.next = waiters, q);
        else if (timed) {
            // Timed wait: if the deadline has passed, remove the current thread's node
            // and return; otherwise park for the remaining time
            nanos = deadline - System.nanoTime();
            if (nanos <= 0L) {
                removeWaiter(q);
                return state;
            }
            LockSupport.parkNanos(this, nanos);
        }
        else
            // Untimed wait
            LockSupport.park(this);
    }
}

// Return the result if the task completed normally; otherwise throw an exception
private V report(int s) throws ExecutionException {
    Object x = outcome;
    if (s == NORMAL)
        return (V)x;
    if (s >= CANCELLED)
        throw new CancellationException();
    throw new ExecutionException((Throwable)x);
}

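Both flavours of get can be tried out with a task that nobody has run yet. The sketch below uses made-up names: timedGet shows the timeout path followed by a successful read, and wakeAllWaiters shows several threads calling get on the same task and all receiving the result once it completes.

```java
import java.util.concurrent.ExecutionException;
import java.util.concurrent.FutureTask;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicInteger;

public class GetTimeoutDemo {
    // A timed get on an unfinished task times out; after run() the same call succeeds.
    static String timedGet() {
        FutureTask<String> task = new FutureTask<>(() -> "done");
        String first;
        try {
            task.get(50, TimeUnit.MILLISECONDS); // nobody has run the task yet
            first = "unexpected result";
        } catch (TimeoutException e) {
            first = "timeout";
        } catch (InterruptedException | ExecutionException e) {
            throw new IllegalStateException(e);
        }
        task.run();
        try {
            return first + ", then " + task.get(50, TimeUnit.MILLISECONDS);
        } catch (InterruptedException | ExecutionException | TimeoutException e) {
            throw new IllegalStateException(e);
        }
    }

    // Several threads call get(); returns how many of them received the result.
    static int wakeAllWaiters(int waiters) {
        FutureTask<String> task = new FutureTask<>(() -> "done");
        AtomicInteger received = new AtomicInteger();
        Thread[] threads = new Thread[waiters];
        for (int i = 0; i < waiters; i++) {
            threads[i] = new Thread(() -> {
                try {
                    if ("done".equals(task.get()))
                        received.incrementAndGet();
                } catch (InterruptedException | ExecutionException e) {
                    // leave the counter untouched
                }
            });
            threads[i].start();
        }
        // Completes the task: threads already parked in get() are woken up,
        // and threads that call get() later return immediately.
        task.run();
        try {
            for (Thread t : threads)
                t.join();
        } catch (InterruptedException e) {
            throw new IllegalStateException(e);
        }
        return received.get();
    }

    public static void main(String[] args) {
        System.out.println(timedGet());        // prints timeout, then done
        System.out.println(wakeAllWaiters(4)); // prints 4
    }
}
```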

cancel() - Cancel the task

  • Before cancelling, cancel first checks whether the task can still be cancelled. A task cannot be cancelled once it has completed or is in the process of completing (that is, its normal result or its exception is being recorded).
  • The cancel method takes a boolean parameter. If it is false, cancel only wakes up all waiting threads and does not interrupt the thread executing the task. If it is true, cancel also interrupts the thread executing the task and moves the state machine to INTERRUPTED.
public boolean cancel(boolean mayInterruptIfRunning) {
    // Cancellation is refused if the state is not NEW or the CAS on the state fails
    if (!(state == NEW &&
          UNSAFE.compareAndSwapInt(this, stateOffset, NEW,
              mayInterruptIfRunning ? INTERRUPTING : CANCELLED)))
        return false;
    try {    // in case call to interrupt throws exception
        // If interruption was requested, interrupt the thread running the task
        // and then move the state machine to its final state, INTERRUPTED
        if (mayInterruptIfRunning) {
            try {
                Thread t = runner;
                if (t != null)
                    t.interrupt();
            } finally { // final state
                UNSAFE.putOrderedInt(this, stateOffset, INTERRUPTED);
            }
        }
    } finally {
        finishCompletion();
    }
    return true;
}
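The three situations described for cancel can be reproduced with a few lines of test code. This is a sketch under the assumption that the task body reacts to interruption (here through Thread.sleep); all class and method names are hypothetical.

```java
import java.util.concurrent.CancellationException;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.FutureTask;
import java.util.concurrent.atomic.AtomicBoolean;

public class CancelDemo {
    // A finished task can no longer be cancelled.
    static boolean cancelAfterCompletion() {
        FutureTask<String> task = new FutureTask<>(() -> "done");
        task.run();
        return task.cancel(true);
    }

    // cancel(true) on a running task interrupts the thread that runs it.
    static boolean cancelInterruptsRunner() {
        CountDownLatch started = new CountDownLatch(1);
        AtomicBoolean interrupted = new AtomicBoolean();
        FutureTask<String> task = new FutureTask<>(() -> {
            started.countDown();
            try {
                Thread.sleep(10_000);
            } catch (InterruptedException e) {
                interrupted.set(true);
            }
            return "late"; // discarded: the state is no longer NEW
        });
        Thread worker = new Thread(task);
        worker.start();
        try {
            started.await();                       // the task body is now running
            boolean cancelled = task.cancel(true);
            worker.join();
            return cancelled && interrupted.get() && task.isCancelled();
        } catch (InterruptedException e) {
            throw new IllegalStateException(e);
        }
    }

    // get() on a cancelled task throws CancellationException.
    static String getAfterCancel() {
        FutureTask<String> task = new FutureTask<>(() -> "never");
        task.cancel(false);
        try {
            return task.get();
        } catch (CancellationException e) {
            return "cancelled";
        } catch (InterruptedException | ExecutionException e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(cancelAfterCompletion());  // prints false
        System.out.println(cancelInterruptsRunner()); // prints true
        System.out.println(getAfterCancel());         // prints cancelled
    }
}
```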

Other methods

setException() - Handle a task execution exception

The setException method handles the case where the task throws an exception. It saves the exception to outcome, moves the state machine from NEW through COMPLETING to EXCEPTIONAL, and wakes up all threads in the waiters stack that are blocked in get

protected void setException(Throwable t) {
    // Move the state machine from NEW to COMPLETING
    if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {
        // Save the exception as the outcome
        outcome = t;
        // Move the state machine to its final state, EXCEPTIONAL
        UNSAFE.putOrderedInt(this, stateOffset, EXCEPTIONAL); // final state
        // Common completion step: wake up the threads in the waiters stack that are blocked in get
        finishCompletion();
    }
}

set() - Handle normal task completion

Handling normal completion is essentially the same as handling an exception, except that the state changes are NEW -> COMPLETING -> NORMAL

protected void set(V v) {
    if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {
        outcome = v;
        UNSAFE.putOrderedInt(this, stateOffset, NORMAL); // final state
        finishCompletion();
    }
}

handlePossibleCancellationInterrupt() - Handle a possible cancellation interrupt

This method has a long and rather opaque name, but what it does is simple. When thread A, which called cancel(true), has moved the state machine to INTERRUPTING but has not yet finished, the task thread B yields the CPU so that thread A can finish interrupting B. (Read this together with the cancel method to understand it fully.)

private void handlePossibleCancellationInterrupt(int s) {
    if (s == INTERRUPTING)
        while (state == INTERRUPTING)
            Thread.yield(); // wait out pending interrupt
}

removeWaiter() - Remove a wait node

The removeWaiter method unlinks the wait node of a thread that stops waiting early, which in awaitDone happens when the thread is interrupted or its timeout expires. Most of its complexity comes from coping with concurrent updates to the stack by multiple threads

private void removeWaiter(WaitNode node) {
    if (node != null) {
        node.thread = null;
        retry:
        for (;;) {          // restart on removeWaiter race
            for (WaitNode pred = null, q = waiters, s; q != null; q = s) {
                s = q.next;
                if (q.thread != null)
                    pred = q;
                else if (pred != null) {
                    pred.next = s;
                    if (pred.thread == null) // check for race
                        continue retry;
                }
                else if (!UNSAFE.compareAndSwapObject(this, waitersOffset,
                                                      q, s))
                    continue retry;
            }
            break;
        }
    }
}
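The retry loops in removeWaiter are a consequence of the waiters stack being lock-free: every change to the top node goes through a CAS that can fail when another thread gets there first. The sketch below shows that pattern in isolation, using AtomicReference instead of Unsafe. It is a simplified illustration and not FutureTask's code; Node, push and drain are hypothetical names, and removal of a node from the middle of the stack is deliberately left out.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicReference;

public class WaitStackSketch {
    // Hypothetical stand-in for a wait node: records who is waiting.
    static final class Node {
        final String owner;
        volatile Node next;
        Node(String owner) { this.owner = owner; }
    }

    private final AtomicReference<Node> top = new AtomicReference<>();

    // Push with a CAS retry loop: re-read the top and try again if another thread changed it.
    void push(Node node) {
        Node current;
        do {
            current = top.get();
            node.next = current;
        } while (!top.compareAndSet(current, node));
    }

    // Detach the whole stack in one step and list the owners from top to bottom.
    List<String> drain() {
        List<String> owners = new ArrayList<>();
        for (Node n = top.getAndSet(null); n != null; n = n.next)
            owners.add(n.owner);
        return owners;
    }

    static List<String> demo() {
        WaitStackSketch stack = new WaitStackSketch();
        stack.push(new Node("t1"));
        stack.push(new Node("t2"));
        stack.push(new Node("t3"));
        return stack.drain();
    }

    public static void main(String[] args) {
        System.out.println(demo()); // prints [t3, t2, t1]
    }
}
```

The last node pushed ends up on top, which matches the description of waiters as the top node of a stack.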

Conclusion

That covers most of FutureTask's source code. Overall, FutureTask is the simplest implementation of Future: it essentially implements the methods of the Future interface without adding much extra functionality. To summarize its main functions and features:

  • FutureTask implements the Runnable interface, so it can be executed by a thread as a task. For example, the submit method of a thread pool wraps a Runnable or Callable task in a FutureTask.
  • FutureTask has an internal state machine that records the progress of the task. There are three kinds of final outcome: normal completion, execution exception, and cancellation (CANCELLED or INTERRUPTED)
  • The get method blocks until the result of the task is available. Internally, a wait stack records the threads that call get concurrently and are blocked, so that they can be woken up once the task has finished
  • Cancellation is supported, but only while the task has not yet completed. There are two kinds of cancellation: one only wakes up the threads blocked waiting for the result; the other also interrupts the thread executing the task
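The thread pool case mentioned in the first point looks like this from the caller's side. This is a minimal sketch with made-up names; the instanceof check in main reflects how the standard ThreadPoolExecutor behaves in the JDK versions I am aware of, so treat it as an observation rather than a guarantee of the ExecutorService contract.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.FutureTask;

public class SubmitDemo {
    // Submits a Callable to a pool and waits for its result through the returned Future.
    static int submitAndGet() {
        ExecutorService pool = Executors.newFixedThreadPool(2);
        try {
            Callable<Integer> job = () -> 1 + 2;
            Future<Integer> future = pool.submit(job);
            return future.get();
        } catch (InterruptedException | ExecutionException e) {
            throw new IllegalStateException(e);
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(submitAndGet()); // prints 3

        ExecutorService pool = Executors.newFixedThreadPool(1);
        Future<?> future = pool.submit(() -> { });
        // Shows whether the pool wrapped the task in a FutureTask.
        System.out.println(future instanceof FutureTask);
        pool.shutdown();
    }
}
```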