• Java thread pool FutureTask source code

1. Introduction to FutureTask

Future is one of the most commonly used interfaces for asynchronous work in Java: we submit a Callable to a thread pool and obtain the result of its execution through the returned Future object. FutureTask is the standard implementation of that interface.

  • Usage scenarios of FutureTask: FutureTask can be used to obtain a task's result asynchronously or to cancel the task. You pass a Runnable or Callable to FutureTask, then either call its run method directly or submit it to a thread pool; the result can later be retrieved from outside through FutureTask's get method. This makes FutureTask well suited to time-consuming computations: the main thread can finish its own work first and then fetch the result. In addition, FutureTask guarantees that the Runnable or Callable is executed only once even if run is called multiple times, and the execution can be cancelled through cancel.
  • The basic use of FutureTask can be found in the article FutureTask Usage
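
As a quick illustration of the usage scenarios above, here is a minimal sketch. The class name FutureTaskUsageDemo and its helper methods are made up for this example; only FutureTask, ExecutorService and Executors come from the JDK. It submits a FutureTask to a single-thread pool, then shows that calling run twice executes the Callable only once.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.FutureTask;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical demo class, not part of the JDK.
class FutureTaskUsageDemo {

    // Submits a FutureTask to a pool and reads the result on the calling thread.
    static int sumViaPool(int n) {
        FutureTask<Integer> task = new FutureTask<>(() -> {
            int sum = 0;
            for (int i = 1; i <= n; i++) sum += i;
            return sum;
        });
        ExecutorService pool = Executors.newSingleThreadExecutor();
        try {
            pool.execute(task);   // FutureTask is a Runnable, so execute() accepts it
            return task.get();    // blocks until the worker thread has finished
        } catch (Exception e) {
            throw new IllegalStateException(e);
        } finally {
            pool.shutdown();
        }
    }

    // Calls run() twice on the calling thread and reports how often the Callable ran.
    static int callsAfterRunningTwice() {
        AtomicInteger calls = new AtomicInteger();
        FutureTask<Integer> task = new FutureTask<>(calls::incrementAndGet);
        task.run();
        task.run();               // returns immediately: the state is no longer NEW
        return calls.get();
    }

    public static void main(String[] args) {
        System.out.println(sumViaPool(100));          // 5050
        System.out.println(callsAfterRunningTwice()); // 1
    }
}
```

The second method is the observable side of the state check at the top of run(), which is analyzed in the source code section.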

2. Source code analysis

Member fields

// Indicates the current task status
private volatile int state;
// Indicates that the task is new: not yet executed, or still executing
private static final int NEW          = 0;
// Indicates that the task is finishing but its outcome is not yet set: a transient state
private static final int COMPLETING   = 1;
// Indicates that the current task is successfully completed
private static final int NORMAL       = 2;
// Indicates that an exception occurred during execution: the wrapped callable.call() threw it
private static final int EXCEPTIONAL  = 3;
// Indicates that the current task is canceled
private static final int CANCELLED    = 4;
// Indicates that the current task is being interrupted (a transient state)
private static final int INTERRUPTING = 5;
// Indicates that the current task has been interrupted
private static final int INTERRUPTED  = 6;

// submit(runnable/callable): a Runnable is adapted into a Callable through Executors.callable.
private Callable<V> callable;

// Normal case: the task ends normally and outcome holds the execution result, i.e. the callable's return value.
// Abnormal case: the callable throws an exception and outcome holds that exception.
private Object outcome;

// The thread object reference of the currently executing task is saved while the current task is being executed by the thread.
private volatile Thread runner;

// Many threads may call get() for this task's result, so the waiting threads are kept in a stack-like linked list: nodes are inserted at the head and removed from the head.
private volatile WaitNode waiters;

static final class WaitNode {
    volatile Thread thread;
    volatile WaitNode next;
    WaitNode() { thread = Thread.currentThread(); }
}
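
The state field is private, but its value can be observed indirectly through the public isDone() and isCancelled() methods. The sketch below assumes the JDK behaviour that isDone() is true for every state other than NEW and isCancelled() is true for CANCELLED and above; the class FutureTaskStateDemo and its method names are invented for illustration.

```java
import java.util.concurrent.FutureTask;

// Hypothetical demo class, not part of the JDK.
class FutureTaskStateDemo {

    // Describes a task using only its public accessors.
    static String describe(FutureTask<?> task) {
        if (task.isCancelled()) return "cancelled"; // CANCELLED, INTERRUPTING or INTERRUPTED
        if (task.isDone()) return "done";           // COMPLETING, NORMAL or EXCEPTIONAL
        return "new";                               // NEW
    }

    static String fresh() {
        FutureTask<String> task = new FutureTask<>(() -> "x");
        return describe(task);
    }

    static String completed() {
        FutureTask<String> task = new FutureTask<>(() -> "x");
        task.run();                                 // NEW -> COMPLETING -> NORMAL
        return describe(task);
    }

    static String failed() {
        FutureTask<String> task = new FutureTask<>(() -> {
            throw new IllegalStateException("boom");
        });
        task.run();                                 // NEW -> COMPLETING -> EXCEPTIONAL
        return describe(task);
    }

    static String cancelled() {
        FutureTask<String> task = new FutureTask<>(() -> "x");
        task.cancel(false);                         // NEW -> CANCELLED
        return describe(task);
    }

    public static void main(String[] args) {
        System.out.println(fresh() + " " + completed() + " " + failed() + " " + cancelled());
    }
}
```

Note that a task that threw an exception still reports "done": EXCEPTIONAL is a completed state, and the failure only surfaces when get() is called.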


Constructors

public FutureTask(Callable<V> callable) {
    if (callable == null)
        throw new NullPointerException();
    // callable is the business logic implemented by the programmer
    this.callable = callable;
    // Set the current task status to NEW: indicates that the current task is not executed
    this.state = NEW;
}

public FutureTask(Runnable runnable, V result) {
    // Adapt the Runnable to the Callable interface. When an external thread calls get()
    // after the task has run, it receives the result passed in here, which may be null.
    this.callable = Executors.callable(runnable, result);
    // Set the current task status to NEW: indicates that the current task is not executed
    this.state = NEW;
}
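
To make the second constructor concrete, here is a small sketch. RunnableResultDemo and its methods are hypothetical names; FutureTask(Runnable, V) and Executors.callable(Runnable, T) are the JDK APIs used by the constructor above. It shows that the value passed in is simply handed back by get() once the Runnable has run.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.Executors;
import java.util.concurrent.FutureTask;

// Hypothetical demo class, not part of the JDK.
class RunnableResultDemo {

    // Runs a Runnable through FutureTask and returns "<side effect>:<result>".
    static String runWithFixedResult() {
        StringBuilder log = new StringBuilder();
        Runnable work = () -> log.append("ran");
        FutureTask<String> task = new FutureTask<>(work, "fixed-result");
        task.run();
        try {
            return log + ":" + task.get();
        } catch (Exception e) {
            throw new IllegalStateException(e);
        }
    }

    // Performs the same adaptation by hand, without FutureTask.
    static int adaptDirectly() {
        Callable<Integer> adapted = Executors.callable(() -> { }, 42);
        try {
            return adapted.call();   // runs the Runnable, then returns the given value
        } catch (Exception e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(runWithFixedResult()); // ran:fixed-result
        System.out.println(adaptDirectly());      // 42
    }
}
```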

Member methods

1. The run method

// submit(runnable/callable) -> newTaskFor(runnable) -> execute(task) -> pool
// Entry point executed by the worker thread
public void run() {
    // Condition 1: state != NEW
    // If true, the task has already been executed or cancelled,
    // so this thread does not process it and returns.
    // Condition 2: !UNSAFE.compareAndSwapObject(this, runnerOffset, null, Thread.currentThread())
    // If true, the CAS failed: another thread has already claimed this task.
    if (state != NEW ||
        !UNSAFE.compareAndSwapObject(this, runnerOffset,
                                     null, Thread.currentThread()))
        return;

    // Reaching here, the task is in the NEW state and the current thread has claimed it.
    try {
        // callable is the programmer's own Callable, or an adapted Runnable
        Callable<V> c = callable;
        // c != null guards against a null pointer
        // state == NEW guards against an external thread having cancelled the task
        if (c != null && state == NEW) {
            // Holds the result
            V result;
            // true: callable.call() completed without throwing
            // false: callable.call() threw an exception
            boolean ran;
            try {
                // Invoke the programmer's Callable, or the adapted Runnable
                result = c.call();
                // c.call() did not throw, so ran is set to true
                ran = true;
            } catch (Throwable ex) {
                // The programmer's logic threw an exception
                result = null;
                // ran is set to false
                ran = false;
                // Store the exception ex in outcome
                setException(ex);
            }

            // ran:
            // true  -> the code block executed without throwing
            // false -> the code block threw an exception
            if (ran)
                // c.call() has completed normally.
                // set: stores result in outcome using CAS
                set(result);
        }
    } finally {
        // Clear the reference to the executing thread
        runner = null;
        // Re-read the task state
        int s = state;
        // s >= INTERRUPTING: the task is being interrupted or has been interrupted
        if (s >= INTERRUPTING)
            // Wait for the interrupt issued by cancel(true) to finish
            handlePossibleCancellationInterrupt(s);
    }
}

/** Stores v in outcome using CAS. */
protected void set(V v) {
    // Use CAS to move the task state from NEW to COMPLETING.
    // Can this fail? Yes: an external thread may cancel the task just before set performs the CAS (a low-probability event).
    if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {
        // Assign v to outcome
        outcome = v;
        // Immediately after assigning outcome, change the task state to NORMAL.
        UNSAFE.putOrderedInt(this, stateOffset, NORMAL); // final state

        // Threads suspended in get() are woken up once the task thread finishes.
        // That happens in finishCompletion, discussed later when we analyze get():
        // it removes and wakes all waiting threads, calls done(), and nulls out callable.
        finishCompletion();
    }
}

/** Stores the exception t in outcome. */
protected void setException(Throwable t) {
    // Use CAS to move the task state from NEW to COMPLETING.
    // Can this fail? Yes: an external thread may cancel the task just before setException performs the CAS (a low-probability event).
    if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {
        // outcome references the exception thrown by the callable.
        outcome = t;
        // Change the task state to EXCEPTIONAL
        UNSAFE.putOrderedInt(this, stateOffset, EXCEPTIONAL); // final state

        // Threads suspended in get() are woken up once the task thread finishes.
        // That happens in finishCompletion, discussed later when we analyze get():
        // it removes and wakes all waiting threads, calls done(), and nulls out callable.
        finishCompletion();
    }
}
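
The effect of setException and of the done() hook can be observed from outside without touching any private field. The following is a sketch under the assumption that everything runs on a single thread; RunOutcomeDemo and its method names are hypothetical. It shows that run() itself never rethrows the callable's exception (it only surfaces from get(), wrapped in ExecutionException) and that done() is invoked after the state has reached a final value.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.FutureTask;

// Hypothetical demo class, not part of the JDK.
class RunOutcomeDemo {

    // Returns a description of the exception that get() reports for a failing task.
    static String failureCause() {
        FutureTask<String> task = new FutureTask<>(() -> {
            throw new IllegalArgumentException("bad input");
        });
        task.run(); // does not throw: setException stores the Throwable in outcome
        try {
            task.get();
            return "no exception";
        } catch (ExecutionException e) {
            Throwable cause = e.getCause();
            return cause.getClass().getSimpleName() + ": " + cause.getMessage();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
    }

    // Records the order in which call() and the done() hook are invoked.
    static List<String> doneHookOrder() {
        List<String> events = new ArrayList<>();
        FutureTask<String> task = new FutureTask<String>(() -> {
            events.add("call");
            return "ok";
        }) {
            @Override
            protected void done() {
                // Invoked from finishCompletion, after the final state is set.
                events.add("done:" + isDone());
            }
        };
        task.run();
        return events;
    }

    public static void main(String[] args) {
        System.out.println(failureCause());
        System.out.println(doneHookOrder());
    }
}
```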

2. The get method

// get returns the result once the task has finished.
// Note: get may be called by more than one thread; several threads can wait for the result of the same task.
public V get() throws InterruptedException, ExecutionException {
    // Read the current task state
    int s = state;
    // s <= COMPLETING:
    // If true, the task is in NEW (not yet executed, or still executing) or COMPLETING, i.e. it has not finished yet,
    // so the external thread calling get blocks here.
    if (s <= COMPLETING)
        // awaitDone returns the task state; if the waiting thread is interrupted, it throws InterruptedException.
        // awaitDone is the key to FutureTask's blocking: it waits for the task to complete and stops waiting on cancellation or timeout.
        s = awaitDone(false, 0L);
    // report(s) returns the final result of the task
    return report(s);
}

// awaitDone is the key method through which FutureTask implements blocking.
/**
 * Waits for the task to complete; stops waiting if the task is cancelled,
 * the thread is interrupted, or the timeout elapses.
 * @param timed true if a timeout is used
 * @param nanos time to wait, if timed
 * @return the state upon completion
 * @throws InterruptedException if the waiting thread is interrupted
 */
private int awaitDone(boolean timed, long nanos)
        throws InterruptedException {
    // 0 means no timeout
    final long deadline = timed ? System.nanoTime() + nanos : 0L;
    // The WaitNode wrapping the current thread (the list is head-inserted, head-removed)
    WaitNode q = null;
    // Whether the current thread's WaitNode has been pushed onto the list
    boolean queued = false;
    // Spin
    for (;;) {
        // Condition true: the current thread was woken by an interrupt from another thread.
        // Thread.interrupted() returns true and resets the thread's interrupt flag to false.
        if (Thread.interrupted()) {
            // Remove the current thread's node from the list
            removeWaiter(q);
            // Throw so that get() propagates the InterruptedException.
            throw new InterruptedException();
        }

        // If the current thread was woken by another thread via unpark(thread), it keeps spinning through the logic below.

        // Read the latest task state
        int s = state;
        // Condition true: the task already has an outcome (normal completion, exception, cancellation, ...)
        if (s > COMPLETING) {
            // If a WaitNode was created for the current thread, clear its thread reference to help GC
            if (q != null)
                q.thread = null;
            // Return the current state directly.
            return s;
        }
        // Condition true: the task is about to complete, normally or exceptionally.
        // Yield the CPU and re-check on the next iteration.
        else if (s == COMPLETING) // cannot time out yet
            Thread.yield();
        // Condition true: first spin; no WaitNode exists for the current thread yet, so create one
        else if (q == null)
            q = new WaitNode();
        // Condition true: second spin; the WaitNode exists but has not been pushed onto the list yet
        else if (!queued)
            // Point the node's next at the current head, then CAS waiters from that same head to the node.
            // waiters is read only once, so the value stored in q.next is exactly the value the CAS expects.
            // queued == true on success; on failure another thread pushed first, so retry on the next spin.
            queued = UNSAFE.compareAndSwapObject(this, waitersOffset,
                                                 q.next = waiters, q);
        // Third spin reaches here: a timeout was requested
        else if (timed) {
            nanos = deadline - System.nanoTime();
            if (nanos <= 0L) {
                // Timed out: remove the waiting node
                removeWaiter(q);
                return state;
            }
            // Not timed out yet: park the current thread for the remaining time
            LockSupport.parkNanos(this, nanos);
        }
        else
            // Park the thread that called get; it enters the WAITING state
            // until another thread unparks it or interrupts it.
            // Once woken, execution continues from here, back into the spin loop.
            // (Parked threads are woken in finishCompletion.)
            LockSupport.park(this);
    }
}

// report(s) returns the final result of the task
@SuppressWarnings("unchecked")
private V report(int s) throws ExecutionException {
    // Normal case: outcome holds the callable's return value.
    // Abnormal case: outcome holds the exception thrown by the callable.
    Object x = outcome;
    // Condition true (normal case): the task completed normally
    if (s == NORMAL)
        // Return the callable's result directly
        return (V)x;

    // Condition true (abnormal case): the task was cancelled or interrupted
    if (s >= CANCELLED)
        // Throw CancellationException
        throw new CancellationException();

    // Otherwise the callable itself threw: wrap its exception in ExecutionException.
    throw new ExecutionException((Throwable)x);
}

// Removes a node from the wait list:
private void removeWaiter(WaitNode node) {
    if (node != null) {
        node.thread = null;
        retry:
        for (;;) {          // restart on removeWaiter race
            for (WaitNode pred = null, q = waiters, s; q != null; q = s) {
                s = q.next;
                if (q.thread != null)
                    pred = q;
                else if (pred != null) {
                    pred.next = s;
                    if (pred.thread == null) // check for race
                        continue retry;
                }
                else if (!UNSAFE.compareAndSwapObject(this, waitersOffset,
                                                      q, s))
                    continue retry;
            }
            break;
        }
    }
}

// Threads suspended in get() are woken once the task thread finishes; that is done in finishCompletion.
/** Removes and wakes all waiting threads, calls done(), and nulls out callable. */
private void finishCompletion() {
    // Walk the wait list:
    // q points to the head of the waiters list
    for (WaitNode q; (q = waiters) != null;) {
        // Use CAS to set waiters to null.
        // CAS is needed because an external thread calling cancel also triggers finishCompletion (a low-probability event).
        if (UNSAFE.compareAndSwapObject(this, waitersOffset, q, null)) {
            for (;;) {
                // The thread wrapped by the current node
                Thread t = q.thread;
                // Condition true: the node's thread is not null
                if (t != null) {
                    q.thread = null; // help GC
                    // Wake the node's thread (it parked in the last else branch of awaitDone)
                    LockSupport.unpark(t);
                }
                // next: the node after the current node
                WaitNode next = q.next;
                // next == null: this was the last node, so stop
                if (next == null)
                    break;
                q.next = null; // unlink to help gc
                q = next;
            }
            break;
        }
    }
    // Template method; subclasses may override it
    done();
    // Set callable to null to help GC
    callable = null;
}
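
The interplay between awaitDone and finishCompletion is easier to see in isolation. The class below is a simplified sketch, not the JDK implementation: WaitStackSketch, await, complete and wakeAll are invented names, AtomicReference stands in for the UNSAFE CAS calls, and timeouts, cancellation and the full removeWaiter logic are deliberately left out. It keeps the same three-spin structure (create node, push at head, park) and the same drain-and-unpark loop.

```java
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.LockSupport;

// Hypothetical teaching class, not part of the JDK.
class WaitStackSketch<V> {
    private static final class Node {
        volatile Thread thread = Thread.currentThread();
        volatile Node next;
    }

    private final AtomicReference<Node> waiters = new AtomicReference<>();
    private volatile boolean done;
    private V value; // published by the volatile write to done

    // Blocks until complete() has been called, like get() -> awaitDone().
    V await() throws InterruptedException {
        Node q = null;
        boolean queued = false;
        for (;;) {
            if (Thread.interrupted()) {
                if (q != null) q.thread = null; // simplified stand-in for removeWaiter
                throw new InterruptedException();
            }
            if (done) {
                if (q != null) q.thread = null;
                return value;
            } else if (q == null) {
                q = new Node();                 // first spin: create the node
            } else if (!queued) {
                Node head = waiters.get();      // second spin: push at the head
                q.next = head;
                queued = waiters.compareAndSet(head, q);
            } else {
                LockSupport.park(this);         // third spin: wait to be unparked
            }
        }
    }

    // Publishes the value and wakes every waiter, like set() -> finishCompletion().
    void complete(V v) {
        value = v;
        done = true;
        for (Node q; (q = waiters.get()) != null;) {
            if (waiters.compareAndSet(q, null)) {
                for (; q != null; q = q.next) {
                    Thread t = q.thread;
                    if (t != null) {
                        q.thread = null;
                        LockSupport.unpark(t);
                    }
                }
                break;
            }
        }
    }

    // Starts the given number of waiters, completes with 42, and counts how many saw it.
    static int wakeAll(int threads) {
        WaitStackSketch<Integer> cell = new WaitStackSketch<>();
        AtomicInteger got = new AtomicInteger();
        Thread[] ts = new Thread[threads];
        for (int i = 0; i < threads; i++) {
            ts[i] = new Thread(() -> {
                try {
                    if (cell.await() == 42) got.incrementAndGet();
                } catch (InterruptedException ignored) {
                    // not expected in this demo
                }
            });
            ts[i].start();
        }
        cell.complete(42);
        try {
            for (Thread t : ts) t.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return got.get();
    }

    public static void main(String[] args) {
        System.out.println(wakeAll(4));
    }
}
```

No waiter can be missed in this sketch: a thread that pushes its node after the drain re-checks done on its next spin before parking, and a thread that is unparked before it parks keeps the permit, so its park call returns immediately.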

3. The cancel method

// Cancels the current task:
public boolean cancel(boolean mayInterruptIfRunning) {
    // Condition 1: state == NEW, meaning the task is running or still waiting in the thread pool's task queue.
    // Condition 2: UNSAFE.compareAndSwapInt(this, stateOffset, NEW, mayInterruptIfRunning ? INTERRUPTING : CANCELLED)
    // If both hold, the CAS changed the state and the logic below runs. Otherwise false is returned: cancel failed.
    if (!(state == NEW &&
          UNSAFE.compareAndSwapInt(this, stateOffset, NEW,
              mayInterruptIfRunning ? INTERRUPTING : CANCELLED)))
        return false;

    try {
        if (mayInterruptIfRunning) {
            try {
                // The thread executing this FutureTask; it may be null if the task is still in the queue and no thread has picked it up.
                Thread t = runner;
                // Condition true: the runner thread is executing the task.
                if (t != null)
                    // Send the runner thread an interrupt signal. If the task's code responds to interrupts, it follows its interrupt logic; if not, nothing happens.
                    t.interrupt();
            } finally { // final state
                // Set the task state to INTERRUPTED.
                UNSAFE.putOrderedInt(this, stateOffset, INTERRUPTED);
            }
        }
    } finally {
        // Wake up all threads blocked in get().
        finishCompletion();
    }
    return true;
}
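
The behaviour of cancel can be checked with a short experiment. In this sketch, CancelDemo and its methods are hypothetical names, and the task body uses Thread.sleep as a stand-in for work that responds to interrupts. A CountDownLatch ensures that cancel(true) is called only after the worker thread has started executing the task.

```java
import java.util.concurrent.CancellationException;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.FutureTask;

// Hypothetical demo class, not part of the JDK.
class CancelDemo {

    // Cancels a task that is already running and reports what the caller observes.
    static String cancelRunningTask() {
        CountDownLatch started = new CountDownLatch(1);
        FutureTask<String> task = new FutureTask<>(() -> {
            started.countDown();
            Thread.sleep(60_000);   // interruptible: stands in for long-running work
            return "finished";
        });
        Thread worker = new Thread(task);
        worker.start();
        try {
            started.await();                        // the task is now executing
            boolean cancelled = task.cancel(true);  // NEW -> INTERRUPTING -> INTERRUPTED
            worker.join();                          // sleep() is interrupted, so the worker exits
            try {
                task.get();
                return "unexpected result";
            } catch (CancellationException e) {
                return cancelled + "/" + task.isCancelled() + "/CancellationException";
            }
        } catch (Exception e) {
            throw new IllegalStateException(e);
        }
    }

    // cancel() on a task that has already completed fails and returns false.
    static boolean cancelAfterCompletion() {
        FutureTask<String> task = new FutureTask<>(() -> "ok");
        task.run();
        return task.cancel(true);
    }

    public static void main(String[] args) {
        System.out.println(cancelRunningTask());
        System.out.println(cancelAfterCompletion());
    }
}
```

The InterruptedException thrown by sleep inside the task is never seen by the caller: by the time run() calls setException, the state is no longer NEW, so its CAS fails and get() reports CancellationException instead.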
