Introduction to the
Future is the top-level abstract interface for asynchronous programming introduced in version 1.5, and FutureTask is the base implementation class for Future. FutureTask also implements the Runnable interface, so it can be used either as a encapsulated task unit or as a stand-alone Runnable task. Today’s focus is on FutureTask to start learning about asynchronous programming.
Inheritance relationships
instructions
- FutureTask does not implement the Future interface directly, but indirectly through the RunnableFuture interface – the RunnableFuture interface simply inherits the Runnable and Future interfaces
- The uppermost FunctionalInterface is an annotation, and the interface with the annotation supports functional programming, as shown in the Runnable interface
The Future interface
public interface Future<V> {
// Task cancellation method
boolean cancel(boolean mayInterruptIfRunning);
// Check whether the task is canceled
boolean isCancelled(a);
// Check whether the task is complete
boolean isDone(a);
// Block to get the result of the task
V get(a) throws InterruptedException, ExecutionException;
// Block gets the result of the task and sets the blocking timeout
V get(long timeout, TimeUnit unit)
throws InterruptedException, ExecutionException, TimeoutException;
}
Copy the code
RunnableFuture interface
public interface RunnableFuture<V> extends Runnable.Future<V> {
/** * Sets this Future to the result of its computation * unless it has been cancelled. */
void run(a);
}
Copy the code
attribute
public class FutureTask<V> implements RunnableFuture<V> {
// State machine: there are 7 states
private volatile int state;
/ / new
private static final int NEW = 0;
// Task completed
private static final int COMPLETING = 1;
// Task completed
private static final int NORMAL = 2;
// The task is abnormal
private static final int EXCEPTIONAL = 3;
// Task cancelled
private static final int CANCELLED = 4;
// The task is interrupted
private static final int INTERRUPTING = 5;
// The task has been interrupted
private static final int INTERRUPTED = 6;
// Support the result return task
private Callable<V> callable;
// Task execution result: normal and abnormal results can be obtained using the GET method
private Object outcome;
// Task execution thread
private volatile Thread runner;
// Wait queue of stack structure, this node is the topmost node in the stack
private volatile WaitNode waiters;
}
Copy the code
instructions
- The state machine is used by FutureTask to mark the state of the task execution. In the source code, the author also describes the possible changes of these states please:
- 实 习 : NEW -> 实 习 -> NORMAL
- Exception: NEW -> COMPLETING -> EXCEPTIONAL
- Task cancelled: NEW -> CANCELLE
- The task is INTERRUPTED: NEW -> INTERRUPTED -> INTERRUPTED
- Callable is the specific task to be performed by FutureTask and is passed in externally
- Outcome is the result returned after a task has been performed
- Runner is the worker who actually performs the task
- Waiters is a wait node, but a top-most node, like a head node. The main purpose of the FutureTask wait queue is that when multiple threads request the get method to obtain the result, they may block at the same time, so the blocking information of these threads is stored in the wait node, and forms a stack wait structure
A constructor
// Pass the callable task directly
public FutureTask(Callable<V> callable) {
if (callable == null)
throw new NullPointerException();
this.callable = callable;
this.state = NEW; // ensure visibility of callable
}
// Pass in the runnable task and result variables
public FutureTask(Runnable runnable, V result) {
this.callable = Executors.callable(runnable, result);
this.state = NEW; // ensure visibility of callable
}
Copy the code
instructions
- FutureTask provides two constructors that support both runnable and Callable tasks, but ultimately convert to callable tasks
- Runnable is converted to a Callable using the RunnableAdapter adapter. RunnableAdapter implements the Callable interface, and then in the call method, runnable’s Run method is actually executed. In addition, the result parameter passed in is returned as the result. The specific source code is as follows:
public static <T> Callable<T> callable(Runnable task, T result) {
if (task == null)
throw new NullPointerException();
return new RunnableAdapter<T>(task, result);
}
static final class RunnableAdapter<T> implements Callable<T> {
final Runnable task;
final T result;
RunnableAdapter(Runnable task, T result) {
this.task = task;
this.result = result;
}
public T call(a) {
task.run();
returnresult; }}Copy the code
- The effect of result on the second constructor is somewhat questionable, at least because it does not tell whether the task was successfully executed, because even though the runnable exception thrown by the Run method in FutureTask is caught, However, the GET method throws an exception based on the state machine if it is abnormal or cancelled
The main method
Run ()- Task execution
- The run method is the actual execution body of the FutureTask task. It mainly completes the call method execution of the wrapped callable and saves the execution result to the outcome. At the same time, it catches the exception of the call method execution and saves the exception information instead of throwing it directly.
- In addition, the run method exists because it maintains the state machine, such as new-completeing-normal or new-completeing-exceptional, to ensure that the task is processed properly
- The run method initially updates runner to the current thread through CAS, thus avoiding the problem of running being executed multiple times in multiple threads.
public void run(a) {
// If the state machine is not NEW, the execution is complete or the task is canceled
// Set the state machine to NEW, and set runner to the current thread to ensure that only one thread executes the run method at the same time. If the setting fails, it will return directly
if(state ! = NEW || ! UNSAFE.compareAndSwapObject(this, runnerOffset,
null, Thread.currentThread()))
return;
try {
Callable<V> c = callable;
// The task check is not empty and the recheck status is NEW. Just to avoid the possibility of the task being cancelled between the start and this point?)
if(c ! =null && state == NEW) {
V result;
boolean ran;
try {
// Execute the task
result = c.call();
ran = true;
} catch (Throwable ex) {
result = null;
ran = false;
When a task throws an exception, the exception information is saved instead of being thrown directly
setException(ex);
}
// If the command is executed successfully, the result is saved
if(ran) set(result); }}finally {
// runner must be non-null until state is settled to
// prevent concurrent calls to run()
// The executor must not be null in order to avoid running () being called concurrently by multiple threads
runner = null;
// state must be re-read after nulling runner to prevent
// leaked interrupts
// The state must be retrieved after the task is cancelled to prevent missed processing of interrupt requests
int s = state;
// If it is set to an interrupt state, the processing is interrupted
if(s >= INTERRUPTING) handlePossibleCancellationInterrupt(s); }}Copy the code
Get ()- blocks to get the execution result of the task
- There are two types of GET methods, which are always blocking and timeout blocking
- The get method is intended to directly obtain the execution result of a task. However, if the task is not completed, the current thread will be blocked and wait until the task is completed.
- When the requesting thread blocks, a waiter node is created and added to the stack of blocking waits.
- When a task is completed or a thread with a blocking timeout timeout is set, the thread is removed from the blocking stack. The method of removal is very complex, taking into account the concurrency of multiple threads.
// always block fetching
public V get(a) throws InterruptedException, ExecutionException {
int s = state;
// The task is blocked by awaitDone until it is not finished
if (s <= COMPLETING)
s = awaitDone(false.0L);
return report(s);
}
// Timeout blocks access
public V get(long timeout, TimeUnit unit)
throws InterruptedException, ExecutionException, TimeoutException {
if (unit == null)
throw new NullPointerException();
int s = state;
// Block gets and throws a timeout exception when the blocking timeout is reached
if (s <= COMPLETING &&
(s = awaitDone(true, unit.toNanos(timeout))) <= COMPLETING)
throw new TimeoutException();
return report(s);
}
// Thread blocks the wait method
private int awaitDone(boolean timed, long nanos) throws InterruptedException {
// Calculate the blocking timeout
final long deadline = timed ? System.nanoTime() + nanos : 0L;
WaitNode q = null;
// No blocking by default
boolean queued = false;
for (;;) {
If the blocking thread is interrupted, the current thread is removed from the blocking queue
if (Thread.interrupted()) {
removeWaiter(q);
throw new InterruptedException();
}
int s = state;
if (s > COMPLETING) {
// Return the result when the task is complete
if(q ! =null)
q.thread = null;
return s;
}
else if (s == COMPLETING)
// If the task is completed but the last step is not completed, the CPU is released to the task thread to continue execution
Thread.yield();
else if (q == null)
// The new thread adds the wait node
q = new WaitNode();
else if(! queued)// The node is not yet added to the waiters stack after it was created at the previous step, so the next loop will push it here and put the wait node for the current thread at the top of the stack
queued = UNSAFE.compareAndSwapObject(this, waitersOffset,
q.next = waiters, q);
else if (timed) {
// If the blocking timeout period is set, it checks whether the blocking timeout period is reached. If the blocking timeout period is reached, it deletes the waiting node of the current thread and exits the loop. Otherwise, it continues blocking
nanos = deadline - System.nanoTime();
if (nanos <= 0L) {
removeWaiter(q);
return state;
}
LockSupport.parkNanos(this, nanos);
}
else
// Non-timeout blocking
LockSupport.park(this); }}If the task is executed normally, the result is returned directly; otherwise, an exception is thrown
private V report(int s) throws ExecutionException {
Object x = outcome;
if (s == NORMAL)
return (V)x;
if (s >= CANCELLED)
throw new CancellationException();
throw new ExecutionException((Throwable)x);
}
Copy the code
Cancel – The task is cancelled
- When a task is canceled, check whether the task can be canceled first. The task cannot be canceled when it is completed or in the process of being completed (the task is executed normally and the abnormal result is processed or the abnormal result is processed).
- The cancel method has a Boolean input parameter. If false, it wakes up only all waiting threads and does not interrupt the executing thread. True interrupts the task execution thread and changes the state machine to INTERRUPTED.
public boolean cancel(boolean mayInterruptIfRunning) {
// Cancel not allowed: The state machine is not NEW or the CAS fails to update the state machine
if(! (state == NEW && UNSAFE.compareAndSwapInt(this, stateOffset, NEW,
mayInterruptIfRunning ? INTERRUPTING : CANCELLED)))
return false;
try { // in case call to interrupt throws exception
// If a task in progress is INTERRUPTED, the task execution thread is INTERRUPTED and the status machine is updated to the final status INTERRUPTED
if (mayInterruptIfRunning) {
try {
Thread t = runner;
if(t ! =null)
t.interrupt();
} finally { // final state
UNSAFE.putOrderedInt(this, stateOffset, INTERRUPTED); }}}finally {
finishCompletion();
}
return true;
}
Copy the code
Other methods
SetException ()- Task execution exception handling
The setException method is mainly used for work execution exception handling. It saves exception information to outcom results, updates state machine changes from NEW to EXCEPTIONAL, and wakes up blocking all threads that request get in the waiters queue
protected void setException(Throwable t) {
// Update the state machine from NEW to COMPLETING
if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {
// Save the exception information to the output result
outcome = t;
Exception exception exception exception exception exception exception exception exception exception exception
UNSAFE.putOrderedInt(this, stateOffset, EXCEPTIONAL); // final state
// A generic waiters completion operation that woks up blocking threads requesting GET in the waiters queue, more on that belowfinishCompletion(); }}Copy the code
Set ()- Normal task execution processing
The NORMAL task processing process is basically the same as the exception processing process, except that the state changes to new-Completeing-normal
protected void set(V v) {
if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {
outcome = v;
UNSAFE.putOrderedInt(this, stateOffset, NORMAL); // final statefinishCompletion(); }}Copy the code
The cancellation of the handlePossibleCancellationInterrupt () – processing can interrupt
This method has A long and vague name, but what it does is to release the CPU from thread B when the INTERRUPTING thread A updates its state machine with A INTERRUPTING status. Allows thread A, which initiated the interrupt, to continue processing the interrupt B. (You need to combine the cancel method to fully understand this sentence.)
private void handlePossibleCancellationInterrupt(int s) {
if (s == INTERRUPTING)
while (state == INTERRUPTING)
Thread.yield(); // wait out pending interrupt
}
Copy the code
RemoveWaiter ()- Removes the waiting node
Delete the wait node method and clean up the thread wait node that is returned directly due to the task. The main purpose is to avoid concurrent updates in multiple threads
private void removeWaiter(WaitNode node) {
if(node ! =null) {
node.thread = null;
retry:
for (;;) { // restart on removeWaiter race
for (WaitNode pred = null, q = waiters, s; q ! =null; q = s) {
s = q.next;
if(q.thread ! =null)
pred = q;
else if(pred ! =null) {
pred.next = s;
if (pred.thread == null) // check for race
continue retry;
}
else if(! UNSAFE.compareAndSwapObject(this, waitersOffset, q, s))
continue retry;
}
break; }}}Copy the code
conclusion
So far, FutureTask’s source code is basically finished, in general, FutureTask is the simplest implementation of Future, basically is the Future interface method to achieve again, without too much function expansion. Let’s briefly summarize some of its functions and features:
- FutureTask implements the Runnable interface, so it can perform task processing as a thread. For example, a Submit method in a thread pool wraps a Runnable or Callable task in the FutureTask class.
- FutureTask has an internal state machine that records the processing status of the task, for example, there are three final states: normal completed, execution exception, and task canceled
- The get method blocks to obtain the execution result of the task, and a blocking wait stack is maintained internally, which is used for multiple threads to call the GET method concurrently. At the same time, these threads are blocked and their blocking information is saved, so as to wake up after the task is executed
- The cancellation of a task is supported, but it is allowed only when the task is not fully executed successfully. The cancellation can be divided into two types: wake up the thread that blocks and waits for the result, or wake up the thread and forcibly interrupt the thread that executes the task