1. Introduction

When creating threads directly, we can either extend Thread or implement the Runnable interface, but neither approach can return the result of the execution. Since JDK 1.5, the Callable and Future interfaces make it possible to run tasks that return a result once they complete.

The Future pattern lets a thread obtain the result of a task asynchronously: the caller starts the task, carries on with other work, and fetches the result later, instead of waiting for the task logic to finish before doing anything else.
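To make this concrete, here is a minimal sketch of the pattern using an ExecutorService. The class name AsyncResultSketch, the helper sumWithOverlap and the numbers are made up for illustration; only submit() and get() come from the JDK.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class AsyncResultSketch {
    // Hypothetical helper: submits a slow computation and overlaps it with other work.
    static int sumWithOverlap() throws Exception {
        ExecutorService pool = Executors.newSingleThreadExecutor();
        try {
            Callable<Integer> slow = () -> {
                Thread.sleep(200); // stand-in for a slow computation
                return 40;
            };
            Future<Integer> future = pool.submit(slow); // returns immediately
            int local = 2;                              // the caller keeps working meanwhile
            return future.get() + local;                // blocks only if the task is not done yet
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) throws Exception {
        System.out.println(sumWithOverlap());
    }
}
```

The caller only pays the waiting cost at the point where it actually needs the value.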

Here is an analogy for asynchrony:

Picture water flowing through two ports: port A is the main thread and port B is the worker thread. When port B is blocked, the water flow through port A is not affected.

Let’s write a little demo to prove that the Future thread is asynchronous.

// Future is an interface, and FutureTask is an implementation class of Future
package FutureTask;

import java.util.concurrent.*;

public class Demo02 {
    public static void main(String[] args) throws ExecutionException, InterruptedException, TimeoutException {
        long start = System.currentTimeMillis();
        FutureTask<Integer> futureTask1 = new FutureTask<Integer>(new Callable<Integer>() {
            @Override
            public Integer call() throws Exception {
                // Sleep for 2000 milliseconds
                Thread.sleep(2000);
                return 1;
            }
        });
        FutureTask<Integer> futureTask2 = new FutureTask<Integer>(new Callable<Integer>() {
            @Override
            public Integer call() throws Exception {
                // Sleep for 1000 milliseconds
                Thread.sleep(1000);
                return 2;
            }
        });
        // This thread sleeps for 2000 milliseconds
        new Thread(futureTask1).start();
        // This thread sleeps for 1000 milliseconds
        new Thread(futureTask2).start();
        Integer s1 = futureTask1.get();
        Integer s2 = futureTask2.get();
        Integer s3 = s1 + s2;
        long end = System.currentTimeMillis();
        System.out.println("Result:" + s3);
        // Output the total execution time
        System.out.println("Take time:" + (end - start) + " ms");
    }
}
/* If the two tasks ran one after the other instead of asynchronously, the elapsed time would be at least 3000 ms. */

The results are as follows:

As you can see, the elapsed time is less than 3000 milliseconds (roughly the 2000 ms of the slower task), which shows that the two tasks ran concurrently on their worker threads while the main thread waited for the results. Future is an interface, so to use its functionality you need an implementation. FutureTask, which we introduce next, is that implementation class.

2. FutureTask inheritance system

FutureTask implements the RunnableFuture interface, which in turn extends both Runnable and Future, so FutureTask is both a Runnable and a Future. Note, however, that FutureTask does not implement Callable. Instead it holds the Callable to execute in a field, which is not visible in the inheritance hierarchy but can be seen in the FutureTask source code. Let's take a look at the part of the source that declares FutureTask and RunnableFuture.

  • FutureTask inheritance system
// <V> is the generic type parameter: the type of the result to return
public class FutureTask<V> implements RunnableFuture<V>
  • RunnableFuture inheritance system
// <V> is the generic type parameter: the type of the result to return
public interface RunnableFuture<V> extends Runnable, Future<V> {
    void run();
}
FutureTask usage scenario

FutureTask is used to execute a task asynchronously and, if needed, to cancel it. A FutureTask wraps either a Runnable or a Callable and is then handed to a Thread or a thread pool for execution. The get() method returns the result of the execution; if the task is not yet complete, the thread calling get() blocks until it is. The run() logic in FutureTask is executed only once, no matter how many threads call get(). We can also call the FutureTask object's cancel() method to cancel the task.
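The following sketch illustrates the "many callers of get(), one execution" behaviour described above. The class SharedResultSketch and the helper sumSeenByWaiters are hypothetical names, and the 100 ms sleep is only there so that the waiters are likely to block before the result exists.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.FutureTask;
import java.util.concurrent.atomic.AtomicInteger;

public class SharedResultSketch {
    // Hypothetical helper: starts `waiters` threads that all call get() on one task
    // and returns the sum of the values they observed.
    static int sumSeenByWaiters(int waiters) throws Exception {
        AtomicInteger calls = new AtomicInteger();
        FutureTask<Integer> task = new FutureTask<>(() -> {
            Thread.sleep(100);              // the waiters block in get() meanwhile
            return calls.incrementAndGet(); // yields 1 if call() runs exactly once
        });
        Callable<Integer> waiter = () -> task.get();
        ExecutorService pool = Executors.newFixedThreadPool(waiters);
        try {
            List<Future<Integer>> seen = new ArrayList<>();
            for (int i = 0; i < waiters; i++) {
                seen.add(pool.submit(waiter));
            }
            new Thread(task).start();       // a FutureTask is a Runnable, so a Thread can run it
            int sum = 0;
            for (Future<Integer> f : seen) {
                sum += f.get();
            }
            return sum;
        } finally {
            pool.shutdown();
        }
    }
}
```

If every waiter sees the value 1, the sum equals the number of waiters, which is what we would expect when the wrapped Callable runs exactly once.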

3. Source code analysis

3.1 Member Variables

The state field indicates the status of the current task:

  • NEW: the task has been created and has not completed yet (a task that is currently running is still NEW).
  • COMPLETING: the task has finished running but the outcome has not been written yet; a short transitional state.
  • NORMAL: the task terminated normally (no exception, interruption, or cancellation occurred).
  • EXCEPTIONAL: the task terminated abnormally because an exception was thrown during execution.
  • CANCELLED: the task was cancelled by a call to cancel() without interrupting the running thread.
  • INTERRUPTING: cancel(true) was called and the running thread is about to be interrupted; a transitional state.
  • INTERRUPTED: the running thread has been interrupted.
    // Indicates the status of the current task
    private volatile int state;
    // The task is newly created and has not completed yet
    private static final int NEW          = 0;
    // The task is about to end: it has finished running, but the outcome has not been written yet
    private static final int COMPLETING   = 1;
    // The task completed successfully
    private static final int NORMAL       = 2;
    // An exception occurred during execution: the wrapped callable.call() threw
    private static final int EXCEPTIONAL  = 3;
    // The task was cancelled
    private static final int CANCELLED    = 4;
    // The task is being interrupted
    private static final int INTERRUPTING = 5;
    // The task has been interrupted
    private static final int INTERRUPTED  = 6;

    // The task to run. A Callable passed to the constructor is stored directly;
    // a Runnable is first wrapped as a Callable by an adapter (Executors.callable)
    private Callable<V> callable;

    // Under normal circumstances, outcome stores the result returned by the task
    // In abnormal cases, outcome stores the exception thrown by the task
    private Object outcome; // non-volatile, protected by state reads/writes

    // Reference to the thread executing the task while the task is running
    private volatile Thread runner;

    // Many threads may be waiting for the result, so each one is wrapped in a WaitNode;
    // the nodes form a stack, and new nodes are inserted at the head
    private volatile WaitNode waiters;

    static final class WaitNode {
        // Thread object
        volatile Thread thread;
        // Next WaitNode
        volatile WaitNode next;
        WaitNode() { thread = Thread.currentThread(); }
    }
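The waiters field is a lock-free stack with insertion at the head. As a rough illustration of that data structure only, here is a simplified sketch that uses AtomicReference instead of Unsafe and stores names instead of threads. WaiterStackSketch, Node, push and drain are hypothetical names and are not part of FutureTask.

```java
import java.util.concurrent.atomic.AtomicReference;

public class WaiterStackSketch {
    // Simplified stand-in for WaitNode: holds a name instead of a Thread.
    static final class Node {
        final String name;
        Node next;
        Node(String name) { this.name = name; }
    }

    private final AtomicReference<Node> head = new AtomicReference<>();

    // Head insertion: link the new node to the current head, then CAS the head to the new node.
    void push(String name) {
        Node node = new Node(name);
        do {
            node.next = head.get();
        } while (!head.compareAndSet(node.next, node));
    }

    // Detach the whole stack with one CAS, then walk the detached nodes.
    String drain() {
        Node q;
        do {
            q = head.get();
        } while (!head.compareAndSet(q, null));
        StringBuilder names = new StringBuilder();
        for (; q != null; q = q.next) {
            names.append(name(q));
        }
        return names.toString();
    }

    private static String name(Node n) { return n.name; }
}
```

Because insertion happens at the head, the most recently pushed node is visited first when the stack is drained.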

3.2 Construction method

There are two constructors. The first takes a single Callable, and the value returned by that Callable is the result of the FutureTask. The second takes a Runnable and a result value: the Runnable is wrapped as a Callable, and the given value is what the FutureTask returns once the task has run.

// This constructor passes a callable, and the result returned by the call to get is the result returned by callable
public FutureTask(Callable<V> callable) {
    if (callable == null)
        throw new NullPointerException();
    this.callable = callable;
    // Set the state to newly created
    this.state = NEW;       // ensure visibility of callable
}
// This constructor passes in a runnable, a result variable, which is returned by calling get
public FutureTask(Runnable runnable, V result) {
    // Adapter pattern: Executors.callable wraps the runnable as a Callable that returns result
    this.callable = Executors.callable(runnable, result);
    // Set the state to newly created
    this.state = NEW;       // ensure visibility of callable
}
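A short usage sketch of the second constructor and of the Executors.callable adapter it relies on. ConstructorSketch, viaRunnable and viaAdapter are hypothetical names chosen for this example.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.Executors;
import java.util.concurrent.FutureTask;

public class ConstructorSketch {
    // FutureTask(Runnable, V): the Runnable does the work, the fixed value is what get() returns.
    static String viaRunnable() throws Exception {
        StringBuilder log = new StringBuilder();
        Runnable work = () -> log.append("ran");
        FutureTask<String> task = new FutureTask<>(work, "fixed-result");
        task.run(); // run on the calling thread to keep the sketch simple
        return log + ":" + task.get();
    }

    // The same adaptation done by hand: a Runnable plus a result becomes a Callable.
    static Integer viaAdapter() throws Exception {
        Callable<Integer> adapted = Executors.callable(() -> { }, 7);
        return adapted.call();
    }
}
```

This constructor is mostly useful when the work is already expressed as a Runnable and the caller only needs a completion signal or a known value.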

3.3 Member methods

3.3.1 The run() method and related methods

run() executes the actual logic of the task. It involves the setException, set, handlePossibleCancellationInterrupt, and finishCompletion methods, each of which we will cover in turn.

The run() method

It invokes the call() method of the Callable object, which holds the task logic, and then handles the task state and the returned result.

public void run() {
        // Return immediately if the task is not NEW, or if the CAS that sets runner
        // from null to the current thread fails (another thread is already running it).
        // This guarantees that the task is executed only once.
        if (state != NEW ||
            !UNSAFE.compareAndSwapObject(this, runnerOffset,
                                         null, Thread.currentThread()))
            return;
        // Reached only if the state is NEW and this thread won the CAS on runner
        try {
            // The Callable task that was passed in
            Callable<V> c = callable;
            // The task is executed only when it is not null and the state is still NEW
            // Condition 1: prevent null pointer exceptions
            // Condition 2: guard against another thread having cancelled the task
            if (c != null && state == NEW) {
                // Stores the result of the task
                V result;
                // Records whether the task executed successfully
                boolean ran;
                try {
                    // Call callable.call() and keep the result
                    result = c.call();
                    // Set ran to true
                    ran = true;
                } catch (Throwable ex) {
                    // callable.call() threw an exception
                    // Set the result to null
                    result = null;
                    // Execution failed, so set ran to false
                    ran = false;
                    // Internally sets outcome to the exception thrown,
                    // updates the task status to EXCEPTIONAL and wakes up the blocked threads
                    setException(ex);
                }
                // If the task executed successfully
                if (ran)
                    // Internally sets outcome to the result of the callable, updates the
                    // task status to NORMAL and wakes up the blocked threads
                    set(result);
            }
        } finally {
            // runner must be non-null until state is settled to
            // prevent concurrent calls to run()
            // Set the thread of the current task to null
            runner = null;
            // state must be re-read after nulling runner to prevent
            // leaked interrupts
            // Current task status
            int s = state;
            // If state >= INTERRUPTING, the task is being interrupted or has been interrupted
            if (s >= INTERRUPTING)
                // If the state is INTERRUPTING, the thread executing this method keeps
                // yielding the CPU until the interrupt has been delivered
                handlePossibleCancellationInterrupt(s);
        }
    }
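The "execute once" guard at the top of run() can be observed from the outside. In this sketch, which uses the hypothetical names DirectRunSketch and callsAfterTwoRuns, run() is invoked twice on the calling thread and a counter records how often the Callable actually ran.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.FutureTask;
import java.util.concurrent.atomic.AtomicInteger;

public class DirectRunSketch {
    // Hypothetical helper: calls run() twice and reports how many times call() executed.
    static int callsAfterTwoRuns() throws Exception {
        AtomicInteger calls = new AtomicInteger();
        Callable<Integer> body = calls::incrementAndGet;
        FutureTask<Integer> task = new FutureTask<>(body);
        task.run(); // state is NEW: the Callable is invoked and the result stored
        task.run(); // state is no longer NEW: returns immediately
        task.get(); // already complete, so this does not block
        return calls.get();
    }
}
```

To run the same logic again, a fresh FutureTask has to be created, since a completed task never goes back to NEW.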

The setException(Throwable t) method

This method is executed if an exception occurs while run() is executing the task. The logic is as follows:

  • Move the state from NEW to COMPLETING with a CAS; if the CAS fails (for example because the task has been cancelled), do nothing.
  • Set the outcome to the exception thrown by the call() method of the Callable object.
  • Set the state of the task to EXCEPTIONAL.
  • Execute the finishCompletion() method to wake up the threads that are blocked in get().
protected void setException(Throwable t) {
        // If the task is in the NEW state, move it to COMPLETING (about to complete)
        if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {
            // Set outcome to the exception thrown by callable.call()
            outcome = t;
            // Set the status of the task to EXCEPTIONAL
            UNSAFE.putOrderedInt(this, stateOffset, EXCEPTIONAL); // final state
            // Wake up all threads waiting in get() and empty the stack
            finishCompletion();
        }
    }
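From the caller's side, the stored exception comes back wrapped in an ExecutionException. The sketch below, with the hypothetical names ExceptionOutcomeSketch and causeIsOriginal, checks that the cause is the very object the task threw.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.FutureTask;

public class ExceptionOutcomeSketch {
    // Hypothetical helper: true if get() reports the exact exception instance the task threw.
    static boolean causeIsOriginal() throws InterruptedException {
        IllegalStateException thrown = new IllegalStateException("boom");
        Callable<Integer> failing = () -> { throw thrown; };
        FutureTask<Integer> task = new FutureTask<>(failing);
        task.run(); // run() catches the Throwable and records it instead of propagating it
        try {
            task.get();
            return false; // not expected: the task failed
        } catch (ExecutionException e) {
            return e.getCause() == thrown;
        }
    }
}
```

Note that run() itself does not throw here; the failure only surfaces when a thread asks for the result.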

The set(V v) method

This method is executed if the task finishes in run() without an exception. The logic is as follows:

  • Move the state from NEW to COMPLETING with a CAS; if the CAS fails (for example because the task has been cancelled), do nothing.
  • Set the outcome to the result returned by the call() method of the Callable object.
  • Set the status of the task to NORMAL.
  • Wake up the threads that are blocked in get().
protected void set(V v) {
        // If the task is in the NEW state, move it to COMPLETING (about to complete)
        if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {
            // Set outcome to the result returned by callable.call()
            outcome = v;
            // Set the status of the task to NORMAL
            UNSAFE.putOrderedInt(this, stateOffset, NORMAL); // final state
            // Wake up all threads waiting in get() and empty the stack
            finishCompletion();
        }
    }
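Since set() is protected, a subclass can call it directly, which makes the NEW to COMPLETING guard easy to observe. SettableTaskSketch, SettableTask, complete and firstWriterWins are hypothetical names used only for this illustration.

```java
import java.util.concurrent.FutureTask;

public class SettableTaskSketch {
    // Hypothetical subclass: exposes the protected set() so a caller can complete the task by hand.
    static final class SettableTask<V> extends FutureTask<V> {
        SettableTask() {
            super(() -> null); // placeholder Callable, never run in this sketch
        }

        void complete(V value) {
            set(value);
        }
    }

    static String firstWriterWins() throws Exception {
        SettableTask<String> task = new SettableTask<>();
        task.complete("first");  // CAS NEW -> COMPLETING succeeds, outcome is stored
        task.complete("second"); // state is already final, so the CAS fails and nothing changes
        return task.get();
    }
}
```

The same guard is what keeps a late result from overwriting a cancellation.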

The handlePossibleCancellationInterrupt(int s) method

This method may be called at the end of run(). If the task state is INTERRUPTING (a cancel(true) call is in progress), the thread that ran the task keeps yielding the CPU until the state has been updated to INTERRUPTED.

private void handlePossibleCancellationInterrupt(int s) {

        if (s == INTERRUPTING)
            while (state == INTERRUPTING)
                Thread.yield(); // wait out pending interrupt

    }

The finishCompletion() method

When a task completes (normally or abnormally) or is cancelled, this method is called to wake up every thread that is blocked in get().

private void finishCompletion() {
        // assert state > COMPLETING;
        // The loop condition is true while there are still blocked threads
        for (WaitNode q; (q = waiters) != null;) {
            // Detach the whole stack by setting waiters to null with a CAS
            if (UNSAFE.compareAndSwapObject(this, waitersOffset, q, null)) {
                for (;;) {
                    // Get the thread wrapped by the current node
                    Thread t = q.thread;
                    // Prevent null pointer exceptions
                    if (t != null) {
                        // Set q.thread to null for GC
                        q.thread = null;
                        // Wake up the thread wrapped by the current node
                        LockSupport.unpark(t);
                    }
                    // Get the next WaitNode
                    WaitNode next = q.next;
                    // If next is null, the stack is empty and all threads blocked in get() have been woken up
                    if (next == null)
                        break;
                    // There are more blocked threads, so keep looping to wake them up
                    // q.next is set to null to help GC
                    q.next = null; // unlink to help gc
                    // Move to the next WaitNode
                    q = next;
                }
                // All waiters have been woken up, so leave the outer loop
                break;
            }
        }
        // Empty hook method that subclasses can override
        done();
        // Set callable to null for GC
        callable = null;        // to reduce footprint
    }
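The done() hook called at the end of finishCompletion() is the supported extension point for completion callbacks. Here is a small sketch of overriding it; DoneHookSketch and hookOrder are hypothetical names, and the task is run on the calling thread so that a plain StringBuilder is safe to use.

```java
import java.util.concurrent.FutureTask;

public class DoneHookSketch {
    // Hypothetical helper: records the order in which call() and done() are invoked.
    static String hookOrder() {
        StringBuilder events = new StringBuilder();
        FutureTask<String> task = new FutureTask<String>(() -> {
            events.append("call,");
            return "v";
        }) {
            @Override
            protected void done() {
                // Invoked once the task has reached a final state
                events.append("done");
            }
        };
        task.run();
        return events.toString();
    }
}
```

The hook also fires when the task is cancelled, so an override should check isCancelled() if it only cares about real results.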

3.3.2 The get() method and related methods

The get() method

The get() method retrieves the result of the task after it has executed. With the no-argument get(), a thread that asks for the result before the task has finished will block. The blocking is done by awaitDone, as we'll see below.

public V get() throws InterruptedException, ExecutionException {
        int s = state;
        // The task has not finished yet (the state is NEW or COMPLETING)
        if (s <= COMPLETING)
            // Block in awaitDone until the task is complete, then read the final state
            s = awaitDone(false, 0L);
        return report(s);
    }

The timed version, get(long timeout, TimeUnit unit), throws a TimeoutException if no result is available within the specified time.

public V get(long timeout, TimeUnit unit)
    throws InterruptedException, ExecutionException, TimeoutException {
    if (unit == null)
        throw new NullPointerException();
    int s = state;
    if (s <= COMPLETING &&
        (s = awaitDone(true, unit.toNanos(timeout))) <= COMPLETING)
        throw new TimeoutException();
    return report(s);
}
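A usage sketch of the timed get(). The names TimedGetSketch and timesOutWhileRunning are hypothetical, and the 50 ms and 500 ms values are arbitrary numbers chosen so that the timeout fires well before the task finishes.

```java
import java.util.concurrent.FutureTask;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

public class TimedGetSketch {
    // Hypothetical helper: true if get() timed out while the task was still running.
    static boolean timesOutWhileRunning() throws Exception {
        FutureTask<Integer> task = new FutureTask<>(() -> {
            Thread.sleep(500);
            return 1;
        });
        Thread worker = new Thread(task);
        worker.start();
        try {
            task.get(50, TimeUnit.MILLISECONDS);
            return false; // not expected: the task needs about 500 ms
        } catch (TimeoutException e) {
            // A timeout does not cancel the task; it keeps running on its thread
            return !task.isDone();
        } finally {
            worker.join(); // let the worker finish before returning
        }
    }
}
```

A caller that no longer needs the result after a timeout has to call cancel() explicitly.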

The awaitDone(boolean timed, long nanos) method

This method blocks every thread that calls get() before the FutureTask has finished executing. It is called inside the get() method.

awaitDone is reached only when both of the following hold:

  1. The task has not finished yet.
  2. A thread calls the get() method.
// Waits for the task to complete (normally or abnormally), or aborts on interrupt or timeout
private int awaitDone(boolean timed, long nanos)
        throws InterruptedException {
        final long deadline = timed ? System.nanoTime() + nanos : 0L;
        // The WaitNode that wraps the current thread
        WaitNode q = null;
        // Whether the current thread's WaitNode has been pushed onto the stack
        boolean queued = false;
        for (;;) {
            // If the current thread has been interrupted, remove its WaitNode from the stack and throw InterruptedException
            if (Thread.interrupted()) {
                removeWaiter(q);
                throw new InterruptedException();
            }
            // Get the status of the current task
            int s = state;
            // The task has reached a final state (normal, exceptional, cancelled, or interrupted): return that state directly
            if (s > COMPLETING) {
                if (q != null)
                    // Set q.thread to null for GC
                    q.thread = null;
                return s;
            }
            // The task is in the transitional COMPLETING state, so the current thread just yields the CPU
            else if (s == COMPLETING) // cannot time out yet
                Thread.yield();
            // First pass: if there is no WaitNode for the current thread yet, create one
            else if (q == null)
                q = new WaitNode();
            // Second pass: if the WaitNode has not been pushed yet, try to push it at the head with a CAS
            else if (!queued)
                queued = UNSAFE.compareAndSwapObject(this, waitersOffset,
                                                     q.next = waiters, q);
            // Third pass: a timeout was specified
            else if (timed) {
                nanos = deadline - System.nanoTime();
                // When the specified time is exceeded, remove the current node and return the task status
                if (nanos <= 0L) {
                    removeWaiter(q);
                    return state;
                }
                // Suspend the current thread for at most the remaining time
                LockSupport.parkNanos(this, nanos);
            }
            else
                // Suspend the current thread until another thread calls unpark() on it or interrupts it
                LockSupport.park(this);
        }
    }
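The blocking in awaitDone rests on LockSupport.park() and unpark(). The following stand-alone sketch shows that primitive in isolation, with the usual loop around park() because it may return without a matching unpark(). ParkUnparkSketch, parkUntilReleased and the flag names are made up for this example.

```java
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.LockSupport;

public class ParkUnparkSketch {
    // Hypothetical helper: one thread parks until another sets a flag and unparks it.
    static boolean parkUntilReleased() throws InterruptedException {
        AtomicBoolean ready = new AtomicBoolean(false);
        AtomicBoolean sawReady = new AtomicBoolean(false);
        Thread waiter = new Thread(() -> {
            // park() may return spuriously, so the condition is re-checked in a loop
            while (!ready.get()) {
                LockSupport.park(ready);
            }
            sawReady.set(true);
        });
        waiter.start();
        Thread.sleep(50);           // give the waiter time to park (not required for correctness)
        ready.set(true);            // publish the condition first ...
        LockSupport.unpark(waiter); // ... then wake the waiter
        waiter.join(1000);
        return sawReady.get() && !waiter.isAlive();
    }
}
```

Setting the flag before calling unpark() matters: an unpark() that arrives before park() is not lost, but the waiter must find the condition true when it re-checks.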

The removeWaiter(WaitNode node) method

When a waiting thread is interrupted or times out, its node is removed from the stack.

private void removeWaiter(WaitNode node) {
        if (node != null) {
            // Mark the node as stale by clearing its thread (this also helps GC)
            node.thread = null;
            retry:
            for (;;) {          // restart on removeWaiter race
                for (WaitNode pred = null, q = waiters, s; q != null; q = s) {
                    s = q.next;
                    // The current node still holds a thread, so keep it and move on
                    if (q.thread != null)
                        pred = q;
                    // The current node is stale and has a predecessor
                    else if (pred != null) {
                        // Point the predecessor at the successor of the current node, which unlinks the current node
                        pred.next = s;
                        if (pred.thread == null) // check for race
                            continue retry;
                    }
                    // The current node is stale and is the head of the stack: CAS the head to its successor
                    else if (!UNSAFE.compareAndSwapObject(this, waitersOffset,
                                                          q, s))
                        continue retry;
                }
                break;
            }
        }
    }

The report(int s) method

This method produces the return value of get(). It is called inside the get() method, and by the time it is called the task has finished or the timed wait has ended.

private V report(int s) throws ExecutionException {
        // Get the value of outcome
        Object x = outcome;
        // If the task ended normally, return the outcome value
        if (s == NORMAL)
            return (V)x;
        // If the state is >= CANCELLED, the task was cancelled, is being interrupted, or has been interrupted
        if (s >= CANCELLED)
            throw new CancellationException();
        // Otherwise the task failed: wrap the stored exception
        throw new ExecutionException((Throwable)x);
    }
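Seen from the caller, the three branches of report() map to three outcomes of get(). The sketch below exercises all of them; OutcomeSketch, describe and allThree are hypothetical names.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.CancellationException;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.FutureTask;

public class OutcomeSketch {
    // Hypothetical helper: turns the outcome of get() into a short label.
    static String describe(FutureTask<?> task) throws InterruptedException {
        try {
            return "value=" + task.get();
        } catch (CancellationException e) {
            return "cancelled";
        } catch (ExecutionException e) {
            return "failed:" + e.getCause().getClass().getSimpleName();
        }
    }

    static String allThree() throws InterruptedException {
        FutureTask<Integer> ok = new FutureTask<>(() -> 1);
        Callable<Integer> failing = () -> { throw new ArithmeticException(); };
        FutureTask<Integer> bad = new FutureTask<>(failing);
        FutureTask<Integer> cancelled = new FutureTask<>(() -> 3);
        ok.run();                // ends in NORMAL
        bad.run();               // ends in EXCEPTIONAL
        cancelled.cancel(false); // ends in CANCELLED without ever running
        return describe(ok) + " " + describe(bad) + " " + describe(cancelled);
    }
}
```

CancellationException is unchecked, so callers that may cancel a task need to remember to handle it themselves.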

3.3.3 The cancel(boolean mayInterruptIfRunning) method

This method cancels the current task. It succeeds only if the task has not yet completed.

public boolean cancel(boolean mayInterruptIfRunning) {
        // Condition 1: state == NEW means the task has not completed yet (it is running or still waiting in a thread pool queue)
        // Condition 2: the CAS on the state must succeed; if either condition fails, return false to indicate that cancel failed
        if (!(state == NEW &&
              UNSAFE.compareAndSwapInt(this, stateOffset, NEW,
                  mayInterruptIfRunning ? INTERRUPTING : CANCELLED)))
            return false;
        try {    // in case call to interrupt throws exception
            if (mayInterruptIfRunning) {
                try {
                    // The thread running the task; null means no thread has picked up the task yet
                    Thread t = runner;
                    // Send the thread an interrupt signal. Code that responds to interrupts follows its interrupt logic; code that does not is unaffected
                    if (t != null)
                        t.interrupt();
                } finally { // final state
                    // Set the task status to INTERRUPTED
                    UNSAFE.putOrderedInt(this, stateOffset, INTERRUPTED);
                }
            }
        } finally {
            // Wake up all threads blocked by the call to get()
            finishCompletion();
        }
        return true;
    }
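A usage sketch of cancel(true) against a task that is already running. CancelSketch and cancelRunningTask are hypothetical names, and the latches are only there to make the timing deterministic.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.FutureTask;
import java.util.concurrent.atomic.AtomicBoolean;

public class CancelSketch {
    // Hypothetical helper: cancels a running task and reports what happened.
    static String cancelRunningTask() throws Exception {
        CountDownLatch started = new CountDownLatch(1);
        CountDownLatch finished = new CountDownLatch(1);
        AtomicBoolean interrupted = new AtomicBoolean(false);
        FutureTask<Integer> task = new FutureTask<>(() -> {
            started.countDown();
            try {
                Thread.sleep(5_000); // responds to interruption
            } catch (InterruptedException e) {
                interrupted.set(true);
            } finally {
                finished.countDown();
            }
            return 1; // discarded: the task has already been cancelled
        });
        new Thread(task).start();
        started.await();                       // make sure the task is running
        boolean cancelled = task.cancel(true); // NEW -> INTERRUPTING -> INTERRUPTED
        finished.await();                      // wait until the task body has seen the interrupt
        return cancelled + "," + task.isCancelled() + "," + interrupted.get();
    }
}
```

With cancel(false) the running thread would not be interrupted and the sleep would run to completion, although get() would still throw CancellationException.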

4. Summary

  • A FutureTask runs on another thread or in a thread pool, asynchronously with respect to the thread that created it.
  • The get() method of a FutureTask returns the result of the execution; if the task is not complete, the thread calling get() blocks until it is.
  • The cancel() method of a FutureTask cancels the task, provided the task has not yet completed.