FutureTask was introduced in JDK1.5 and is the work of concurrency guru Doug Lea. It represents an asynchronous computation task that will complete the computation and get a result at some point in the future. When a thread calls GET () to get the result, it returns immediately if the calculation is complete, otherwise the thread will block until the calculation is complete.
Application scenario For example, if you need to export a copy of Excel but not download it immediately, you can create an instance of FutureTask and submit it to the thread pool to run. When the user wants to download, FutureTask obtains the Excel generation path to find the file and respond to the client.
UML FutureTask realizedRunnable
Interface, which means it can be handed to a thread to execute, and implemented at the same timeFuture
Interface, which means it is an asynchronous task that can get a result at some point in the future, and the task can be canceled.
Introduction to the source code
FutureTask does not rely on AQS and is relatively complete and independent. It has an inner class, WaitNode, which represents nodes that block threads, and these nodes form a simple one-way linked list. When the thread calls GET () to get the result, FutureTask calls awaitDone() to pair and Park the current thread if the calculation has not yet finished.
attribute
The callable variable is used to hold a calculation task with a return value. Outcome is used to store the result of a calculation, and if an exception is calculated, the exception information is saved. Runner is used to record the thread that performs the computation. Waiters are a one-way linked list of blocked threads waiting for the results of a calculation.
// Tasks with return values will be cleared after running
private Callable<V> callable;
// The result of calculation. If there is an exception during calculation, the exception information is saved.
private Object outcome;
// The thread that performs the computation task
private volatile Thread runner;
// Wait queue, one-way linked list
private volatile WaitNode waiters;
Copy the code
State is used to represent the state of FutureTask. It has seven values, all declared as static constants:
// Task status
private volatile int state;
private static final int NEW = 0;// Create a new one
private static final int COMPLETING = 1;// The calculation is complete
private static final int NORMAL = 2;// The calculation is complete and the result is saved
private static final int EXCEPTIONAL = 3;// The calculation is wrong
private static final int CANCELLED = 4;// It was cancelled
private static final int INTERRUPTING = 5;// The thread will be interrupted
private static final int INTERRUPTED = 6;// The thread has been interrupted
Copy the code
The constructor
FutureTask provides two constructors:
- The incoming
Callable
, the task starts calling its call() to get the result. - The incoming
Runnable
And the resultsresult
, run() returns after the execution is completeresult
This is a predefined result.
/* Given a Callable, it is executed in run() with the result */ preserved
public FutureTask(Callable<V> callable) {
if (callable == null)
throw new NullPointerException();
this.callable = callable;
this.state = NEW; // ensure visibility of callable
}
/* Given a Runnable and the result result. It is executed in run() and results are returned after execution. * /
public FutureTask(Runnable runnable, V result) {
this.callable = Executors.callable(runnable, result);
this.state = NEW; // ensure visibility of callable
}
Copy the code
Core method
get
Get () gets the results, and the thread blocks if the task is not finished. FutureTask provides two get() methods, both of which respond to interrupts:
- get()
Get the result, otherwise wait indefinitely until the calculation is complete, or the thread interrupts. 2. Get (long timeout, TimeUnit Unit) Obtains the result of a timeout until the calculation is complete, the timeout occurs, or the thread is interrupted.
(实 习) Call awaitDone(实 习), COMPLETING tasks (实 习), COMPLETING tasks (实 习), COMPLETING tasks (实 习), and COMPLETING tasks (实 习).
/* Get the result of the calculation, this process is blocked, waiting for the calculation to complete. * /
public V get(a) throws InterruptedException, ExecutionException {
int s = state;
if (s <= COMPLETING)
/* Wait in two states: 1.NEW: The task is created and in the process of calculation. 2.COMPLETING: If the CPU has performed a calculation but has not yet saved the result, it will not be queued to Park. Instead, thread.yield () will temporarily yield the execution of the CPU until the result is saved. * /
s = awaitDone(false.0L);
// Calculate complete, or exception, cancel directly return, no wait.
return report(s);
}
Locksupport.parknanos () specifies how long the thread is suspended. * /
public V get(long timeout, TimeUnit unit)
throws InterruptedException, ExecutionException, TimeoutException {
if (unit == null)
throw new NullPointerException();
int s = state;
if (s <= COMPLETING &&
(s = awaitDone(true, unit.toNanos(timeout))) <= COMPLETING)
throw new TimeoutException();
return report(s);
}
Copy the code
First look at the logic of awaitDone() waiting:
- Determine if the thread is interrupted, throw an exception if it is, and remove the node because get() responds to the interrupt.
- judge
state
If more thanCOMPLETING
If yes, the task calculation is complete or fails and there is no need to wait. - judge
state
Whether is equal to theCOMPLETING
If yes, the calculation has been completed, but the result has not been savedThread.yield()
Temporarily relinquish execution from the CPU, and we’ll see what happens later. - If the calculation is not complete, create
WaitNode
Join the team and Park.
/* Wait for calculation to complete: when to terminate? 1. The calculation is complete. 2. I'm out of time. 3. It's interrupted. * /
private int awaitDone(boolean timed, long nanos)
throws InterruptedException {
final long deadline = timed ? System.nanoTime() + nanos : 0L;
WaitNode q = null;
boolean queued = false;
for (;;) {
// The wait process responds to interrupts
if (Thread.interrupted()) {
// If an interrupt occurs, delete the waiting node.
removeWaiter(q);
throw new InterruptedException();
}
int s = state;
if (s > COMPLETING) {
/* 实 习 实 习 : 实 习 实 习 The calculation is done. 2. The mission was canceled. 3. There is an anomaly in the calculation process. 4. The computing task is interrupted. * /
if(q ! =null)
q.thread = null;
return s;
}
else if (s == COMPLETING)
/* The result cannot be returned after the calculation is complete. We need to wait for the calculation result to be saved to outcome and set state to :NORMAL. At this point, CPU execution can be relinquished. * /
Thread.yield();
else if (q == null)
// Create a node and prepare to join the queue
q = new WaitNode();
else if(! queued)/* CAS+ spin mode, head insert method to join the queue. Why the head plug? 1. No separate attribute is used to record the tail. If the tail insertion method is adopted, it needs to search from the chain head to the chain tail every time, which is very inefficient. 2. During concurrency, all nodes may be woken up, but newly created nodes cannot be woken up. * /
queued = UNSAFE.compareAndSwapObject(this, waitersOffset,
q.next = waiters, q);
else if (timed) {
// Support timeout waiting
nanos = deadline - System.nanoTime();
if (nanos <= 0L) {// Remove the node and return
removeWaiter(q);
return state;
}
LockSupport.parkNanos(this, nanos);
}
else
// Join the queue successfully, suspend the current thread, and wait to wake up after the run() calculation completes
LockSupport.park(this); }}Copy the code
The node needs to be removed if the wait times out or the thread is interrupted:
/* If the task is interrupted or times out, the node needs to be removed. Instead of removing only nodes passed in, the entire list is checked and all nodes with Thread =null are removed. * /
private void removeWaiter(WaitNode node) {
if(node ! =null) {
node.thread = null;
retry:
for (;;) { // restart on removeWaiter race
for (WaitNode pred = null, q = waiters, s; q ! =null; q = s) {
s = q.next;
if(q.thread ! =null)
pred = q;
else if(pred ! =null) {
pred.next = s;
/* If a new node needs to be removed, it will jump out of the loop and check again. It doesn't matter what nodes are in the back, because they scan from front to back. * /
if (pred.thread == null)
continue retry;
}
else if(! UNSAFE.compareAndSwapObject(this, waitersOffset,
q, s))
continue retry;
}
break; }}}Copy the code
If the calculation is complete, the result is returned via report(), which has three cases:
- The calculation completes normally and the result is returned.
- The mission has been canceled. Dump
CancellationException
. - The computation task itself is abnormal. Throw
ExecutionException
.
// Report the result of the task calculation, or throw an exception.
private V report(int s) throws ExecutionException {
Object x = outcome;
if (s == NORMAL)
// The task calculation is complete and the calculation result is saved
return (V)x;
if (s >= CANCELLED)
// The task is cancelled or interrupted
throw new CancellationException();
// The task calculation process is thrown abnormally
throw new ExecutionException((Throwable)x);
}
Copy the code
run
The run() method should not be explicitly called, but handed over to the thread pool. The thread pool calls run(), which essentially executes the Callable, retrieves the return value, and wakes up the waiting thread.
// Execute the task
public void run(a) {
/* There are two conditions to start the calculation: 1. State indicates the NEW state. 2. If the executing thread of the task is null, change it to the current thread by CAS. Ensure that multiple threads execute run() concurrently and that only one thread can compute the task. * /
if(state ! = NEW || ! UNSAFE.compareAndSwapObject(this, runnerOffset,
null, Thread.currentThread()))
return;
try {
Callable<V> c = callable;
if(c ! =null && state == NEW) {
V result;
boolean ran;// A marker for successful completion of the calculation
try {
result = c.call();
ran = true;
} catch (Throwable ex) {
// The result is null and ran is false
result = null;
ran = false;
setException(ex);// Save the exception information and notify the node
}
if (ran)
// If the calculation is complete, save the calculation result and notify the nodeset(result); }}finally {
// runner must be non-null until state is settled to
// prevent concurrent calls to run()
runner = null;
// state must be re-read after nulling runner to prevent
// leaked interrupts
int s = state;
if (s >= INTERRUPTING)
// When the task is INTERRUPTED, the spin calls Thread.yield(), temporarily giving the CPU the ability to execute, and waits for state to be set to INTERRUPTED.handlePossibleCancellationInterrupt(s); }}Copy the code
There are two kinds of results: abnormal and normal.
Set (V V)
// Save the calculated result
protected void set(V v) {
if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {
// Save the calculated result
outcome = v;
UNSAFE.putOrderedInt(this, stateOffset, NORMAL); // final state
// Wake up the blocking nodefinishCompletion(); }}Copy the code
Exception :setException(Throwable t)
/* An exception occurred during the task calculation. Save the exception information and return it to the client when calling get(). * /
protected void setException(Throwable t) {
if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {
// Save exception information
outcome = t;
UNSAFE.putOrderedInt(this, stateOffset, EXCEPTIONAL); // final state
// If the computation is abnormal, the blocking node must be woken up.finishCompletion(); }}Copy the code
All waiting threads need to be woken up, whether normal or abnormal.
/* Task complete, wake up blocking node */
private void finishCompletion(a) {
// assert state > COMPLETING;
// Iterate over the unidirectional list
for(WaitNode q; (q = waiters) ! =null;) {
if (UNSAFE.compareAndSwapObject(this, waitersOffset, q, null)) {
for (;;) {
Thread t = q.thread;
if(t ! =null) {
// If the bound thread is not empty, wake it up
q.thread = null;
LockSupport.unpark(t);
}
WaitNode next = q.next;
if (next == null)
break;
q.next = null; // unlink to help gc
q = next;
}
break; }}// The hook function, which triggers the action after completion, does nothing by default and is extended by subclasses
done();
callable = null; // to reduce footprint
}
Copy the code
At the end of the task, FutureTask calls done(), which is a hook function that does nothing by default and is handed over to subclasses to extend.
cancel
FutureTask tasks can be cancelled: mayInterruptIfRunning interrupts the execution of the task, true interrupts the computation, and false interrupts the task, but does not care about its outcome. Tasks can only be cancelled if they are in the NEW state. When the task is cancelled, all waiting threads are woken up and told to stop waiting.
/* Cancel the task: mayInterruptIfRunning: Does it interrupt the execution of the task? True: Interrupts the worker thread and stops the task. (Just modify the interrupt flag of the thread. You need to implement it yourself.) False: After the task is canceled, the worker thread continues to execute the task. * /
public boolean cancel(boolean mayInterruptIfRunning) {
/* State is NEW, and true is returned only when CAS is changed to INTERRUPTING/CANCELLED. * /
if(! (state == NEW && UNSAFE.compareAndSwapInt(this, stateOffset, NEW,
mayInterruptIfRunning ? INTERRUPTING : CANCELLED)))
// If the task has been calculated or cancelled, it will be cancelled if the calculation is abnormal or interrupted.
return false;
try { // Prevent calls to interrupt() to throw exceptions
if (mayInterruptIfRunning) {
// If interrupts are required, the worker thread interrupt()
try {
Thread t = runner;
if(t ! =null)
t.interrupt();
} finally {
// Change the status to Interrupted
UNSAFE.putOrderedInt(this, stateOffset, INTERRUPTED); }}}finally {
// The blocking node needs to be woken up after the task is cancelled
finishCompletion();
}
return true;
}
Copy the code
Other methods
IsDone () task complete:
/* All tasks are completed/all tasks are completed. */
public boolean isDone(a) {
returnstate ! = NEW; }Copy the code
IsCancelled () Mission has not been cancelled:
/* Whether the task is cancelled or not: If the task is cancelled, the calculation is abnormal, and the task is cancelled. * /
public boolean isCancelled(a) {
return state >= CANCELLED;
}
Copy the code
conclusion
The source code for FutureTask is not too difficult and relatively easy to read. It uses a state variable of type int to mark the state of the task. The default is NEW, which remains NEW until the calculation is complete, and only tasks in this state can be cancelled. If not, the thread call get() will block, and FutureTask will create a thread-bound WaitNode, add it to the head of the list, and Park it. Those calls to get() are not executed by Park, but by thread.yield (). We will see later. After that, either the calculation is successfully completed and the calculation result is obtained, or there is an exception, or it is interrupted. In any case, the task is finished and all waiting threads need to be woken up.