1. Introduction
When we do asynchronous programming in Java, most of the time we use a Future: we submit a Callable to a thread pool through its submit method, then call the Future’s get method to wait for the return value. FutureTask is an implementation of Future, and it is our hero today.
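As a reminder of the usage pattern described above, here is a minimal sketch. The class and method names (SubmitAndGet, computeOnPool) are made up for illustration; only the java.util.concurrent calls are real API.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class SubmitAndGet {
    // Submits a Callable to a pool and blocks in get() until the value is ready.
    static int computeOnPool() {
        ExecutorService pool = Executors.newSingleThreadExecutor();
        try {
            Callable<Integer> task = () -> 6 * 7;
            Future<Integer> future = pool.submit(task); // a FutureTask under the hood
            return future.get();                        // blocks until the worker finishes
        } catch (InterruptedException | ExecutionException e) {
            // Simplified error handling for the sketch.
            throw new IllegalStateException(e);
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(computeOnPool());
    }
}
```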
Let’s analyze FutureTask at the source level.
2. Initial experience of FutureTask
We tend to work with Future, which is an interface, and FutureTask, which is its standard implementation. When we submit a task to the thread pool, the pool creates a FutureTask and returns it.
public <T> Future<T> submit(Callable<T> task) {
    if (task == null) throw new NullPointerException();
    RunnableFuture<T> ftask = newTaskFor(task);
    execute(ftask);
    return ftask;
}
The newTaskFor method creates a FutureTask and returns it.
protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) {
    return new FutureTask<T>(callable);
}
public FutureTask(Callable<V> callable) {
    if (callable == null)
        throw new NullPointerException();
    this.callable = callable;
    this.state = NEW; // ensure visibility of callable
}
The thread pool executes FutureTask’s run method.
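Because FutureTask is itself a Runnable, nothing ties it to a thread pool. The sketch below hands one to a plain Thread instead; the class name, method name and thread name are illustrative assumptions.

```java
import java.util.concurrent.ExecutionException;
import java.util.concurrent.FutureTask;

public class ManualFutureTask {
    // Runs a FutureTask on a plain thread, which is what a pool worker does too.
    static String runOnPlainThread() {
        FutureTask<String> task =
            new FutureTask<>(() -> "done by " + Thread.currentThread().getName());
        Thread worker = new Thread(task, "worker-1"); // the thread calls task.run()
        worker.start();
        try {
            return task.get(); // waits for run() to publish the result
        } catch (InterruptedException | ExecutionException e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(runOnPlainThread());
    }
}
```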
So, let’s look at FutureTask’s class hierarchy (UML).
FutureTask implements RunnableFuture, which extends both Runnable and Future. Runnable needs no introduction, since it only declares the run method. But what does Future declare?
boolean cancel(boolean mayInterruptIfRunning);
boolean isCancelled();
boolean isDone();
V get() throws InterruptedException, ExecutionException;
V get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException;
Future is built mainly around these five methods. Its capabilities are relatively limited: the caller can only poll, block or cancel, and cannot register a callback. After all, it is only a Future, not a Promise.
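To make the five methods concrete, the following sketch exercises each of them against a deliberately slow task. FutureMethodsDemo and exercise are hypothetical names, and the 10-second sleep and 100 ms timeout are arbitrary values chosen for the demo.

```java
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

public class FutureMethodsDemo {
    // Calls isDone, timed get, cancel and isCancelled on one slow task.
    static String exercise() {
        ExecutorService pool = Executors.newSingleThreadExecutor();
        StringBuilder log = new StringBuilder();
        try {
            Future<String> slow = pool.submit(() -> {
                Thread.sleep(10_000); // far longer than the timeout below
                return "late";
            });
            log.append("done=").append(slow.isDone());
            try {
                slow.get(100, TimeUnit.MILLISECONDS); // timed get gives up
            } catch (TimeoutException e) {
                log.append(" timeout");
            }
            log.append(" cancel=").append(slow.cancel(true));
            log.append(" cancelled=").append(slow.isCancelled());
            log.append(" done=").append(slow.isDone());
        } catch (InterruptedException | ExecutionException e) {
            throw new IllegalStateException(e);
        } finally {
            pool.shutdownNow();
        }
        return log.toString();
    }

    public static void main(String[] args) {
        System.out.println(exercise());
    }
}
```

Note that isDone reports true after cancellation as well: "done" means "no longer pending", not "finished successfully".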
FutureTask also has an inner class, WaitNode, structured as follows:
static final class WaitNode {
    volatile Thread thread;
    volatile WaitNode next;
    WaitNode() { thread = Thread.currentThread(); }
}
Doesn’t it look similar to an AQS node?
FutureTask maintains a stack structure internally, which is different from AQS queues.
In fact, previous versions of FutureTask did rely on AQS directly, but Doug Lea later rewrote the class. According to the revision notes in the source, the main purpose was to avoid surprising users about retaining interrupt status during cancellation races.
Internally, a volatile state variable is used to control state, and a stack structure is used to hold waiting threads.
The reason, of course, is that FutureTask’s get method supports concurrent callers. Multiple threads can get the same result from the same FutureTask, and each of them must be suspended while get blocks.
Now we know the structure of FutureTask. We also know that the thread pool must execute FutureTask’s run method, so we will go through its run method.
Along the way, we will also look at the other key method, get.
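The concurrent-get behaviour can be observed with a small experiment. In this sketch (ConcurrentGetters and collect are illustrative names) several threads block in get on the same FutureTask, and the calling thread then completes it by invoking run directly.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.FutureTask;

public class ConcurrentGetters {
    // Starts `waiters` threads that all call get() on one FutureTask.
    static List<Integer> collect(int waiters) {
        FutureTask<Integer> task = new FutureTask<>(() -> 42);
        List<Integer> seen = new CopyOnWriteArrayList<>();
        List<Thread> threads = new ArrayList<>();
        for (int i = 0; i < waiters; i++) {
            Thread t = new Thread(() -> {
                try {
                    seen.add(task.get()); // every waiter receives the same value
                } catch (Exception e) {
                    throw new IllegalStateException(e);
                }
            });
            threads.add(t);
            t.start();
        }
        task.run(); // completes the task and wakes every parked waiter
        try {
            for (Thread t : threads) {
                t.join();
            }
        } catch (InterruptedException e) {
            throw new IllegalStateException(e);
        }
        return seen;
    }

    public static void main(String[] args) {
        System.out.println(collect(3));
    }
}
```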
3. FutureTask get method
The code is as follows:
public V get() throws InterruptedException, ExecutionException {
    int s = state;
    if (s <= COMPLETING)
        s = awaitDone(false, 0L);
    return report(s);
}
First check the state; if the task has not finished yet, suspend and wait in awaitDone; finally, return the result through report. The code is very simple.
Note: FutureTask has seven states:
private volatile int state;
private static final int NEW = 0;
private static final int COMPLETING = 1;
private static final int NORMAL = 2;
private static final int EXCEPTIONAL = 3;
private static final int CANCELLED = 4;
private static final int INTERRUPTING = 5;
private static final int INTERRUPTED = 6;
The state is NEW when the task is constructed. It becomes COMPLETING while the result is being stored, and changes to NORMAL once the task has completed normally.
Let’s focus on the awaitDone and report methods.
The awaitDone method:
private int awaitDone(boolean timed, long nanos)
    throws InterruptedException {
    final long deadline = timed ? System.nanoTime() + nanos : 0L;
    WaitNode q = null;
    boolean queued = false;
    for (;;) {
        if (Thread.interrupted()) {
            removeWaiter(q);
            throw new InterruptedException();
        }
        int s = state;
        if (s > COMPLETING) {
            if (q != null)
                q.thread = null;
            return s;
        }
        else if (s == COMPLETING) // cannot time out yet
            Thread.yield();
        else if (q == null)
            q = new WaitNode();
        else if (!queued)
            queued = UNSAFE.compareAndSwapObject(this, waitersOffset,
                                                 q.next = waiters, q);
        else if (timed) {
            nanos = deadline - System.nanoTime();
            if (nanos <= 0L) {
                removeWaiter(q);
                return state;
            }
            LockSupport.parkNanos(this, nanos);
        }
        else
            LockSupport.park(this);
    }
}
The above method is relatively simple compared to other JUC classes. One thing to note is that get can be called concurrently, and when it is, the waiting threads need to be stored in a stack inside FutureTask.
A brief description of the steps, each evaluated on every pass through the loop:
- If the thread has been interrupted, remove its node and throw an InterruptedException.
- If the state is greater than COMPLETING, the task has finished, so return the state.
- If the state equals COMPLETING, the task is about to finish, so yield briefly and check again.
- If q is null, this is the first pass: create a new node, which saves a reference to the current thread.
- If the node has not been queued yet, use CAS to make it the new head of waiters, pointing its next field at the old head. This is the stack structure.
- Otherwise, suspend the current thread, with or without a deadline depending on the timed flag.
- When the thread wakes up, it repeats the checks above and normally returns the state.
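The steps above can be sketched with public APIs only. The class below is a simplified illustration, not the JDK implementation: MiniFuture, await, complete and demo are invented names, AtomicReference stands in for the Unsafe CAS, there are only two states, and timeouts, removeWaiter and protection against a second complete call are left out.

```java
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.LockSupport;

public class MiniFuture<V> {
    private static final int NEW = 0, DONE = 1;

    private static final class WaitNode {
        volatile Thread thread = Thread.currentThread();
        volatile WaitNode next;
    }

    private volatile int state = NEW;
    private V outcome; // published by the volatile write to state
    private final AtomicReference<WaitNode> waiters = new AtomicReference<>();

    // Same loop shape as awaitDone: check, create node, push, park, re-check.
    public V await() throws InterruptedException {
        WaitNode q = null;
        boolean queued = false;
        for (;;) {
            if (Thread.interrupted())
                throw new InterruptedException(); // the real class also unlinks q
            if (state == DONE) {
                if (q != null)
                    q.thread = null;
                return outcome;
            }
            if (q == null) {
                q = new WaitNode();
            } else if (!queued) {
                WaitNode head = waiters.get();
                q.next = head;                          // push onto the stack
                queued = waiters.compareAndSet(head, q);
            } else {
                LockSupport.park(this);                 // state is re-checked after waking
            }
        }
    }

    // Assumes a single completer; stores the value, then wakes every waiter.
    public void complete(V value) {
        outcome = value;
        state = DONE;
        WaitNode q = waiters.getAndSet(null);           // detach the whole stack
        while (q != null) {
            Thread t = q.thread;
            if (t != null) {
                q.thread = null;
                LockSupport.unpark(t);
            }
            q = q.next;
        }
    }

    // Small driver: one thread completes, the caller waits.
    static int demo() {
        MiniFuture<Integer> future = new MiniFuture<>();
        new Thread(() -> future.complete(7)).start();
        try {
            return future.await();
        } catch (InterruptedException e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```

A waiter never sleeps through a completion, because the state is always re-read before parking and an unpark that arrives before park makes the next park return immediately.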
Now look at the report method:
private V report(int s) throws ExecutionException {
    Object x = outcome;
    if (s == NORMAL)
        return (V)x;
    if (s >= CANCELLED)
        throw new CancellationException();
    throw new ExecutionException((Throwable)x);
}
Again, it’s very simple: take the outcome and check the state. If the state is NORMAL, return the value. If the task was cancelled or interrupted, throw a CancellationException. Otherwise the task failed, so wrap its exception in an ExecutionException and throw that.
To summarize the get method:
The thread that calls get suspends itself and waits for the asynchronous thread to wake it up, then takes the result set by the asynchronous thread.
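From the caller’s side, the three branches of report show up as a return value or one of two exceptions. The sketch below triggers each of them; ReportOutcomes and its methods are illustrative names.

```java
import java.util.concurrent.CancellationException;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.FutureTask;

public class ReportOutcomes {
    // Maps the result of get() to a short description.
    static String describe(FutureTask<String> task) {
        try {
            return "value: " + task.get();                 // state NORMAL
        } catch (CancellationException e) {
            return "cancelled";                            // state >= CANCELLED
        } catch (ExecutionException e) {
            return "failed: " + e.getCause().getMessage(); // state EXCEPTIONAL
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return "interrupted";
        }
    }

    static String normal() {
        FutureTask<String> task = new FutureTask<>(() -> "ok");
        task.run();
        return describe(task);
    }

    static String exceptional() {
        FutureTask<String> task = new FutureTask<>(() -> {
            throw new IllegalStateException("boom");
        });
        task.run(); // the exception is captured, not propagated
        return describe(task);
    }

    static String cancelled() {
        FutureTask<String> task = new FutureTask<>(() -> "never runs");
        task.cancel(false);
        return describe(task);
    }

    public static void main(String[] args) {
        System.out.println(normal());
        System.out.println(exceptional());
        System.out.println(cancelled());
    }
}
```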
4. FutureTask’s run method
As summarized above, the thread that calls get suspends itself and waits for the asynchronous thread to set the result and wake it up.
So where does that happen? The answer is in the run method. We know that when a thread pool executes a FutureTask, it must execute its run method. So, let’s look at the run method:
public void run() {
    if (state != NEW ||
        !UNSAFE.compareAndSwapObject(this, runnerOffset,
                                     null, Thread.currentThread()))
        return;
    try {
        Callable<V> c = callable;
        if (c != null && state == NEW) {
            V result;
            boolean ran;
            try {
                result = c.call();
                ran = true;
            } catch (Throwable ex) {
                result = null;
                ran = false;
                setException(ex);
            }
            if (ran)
                set(result);
        }
    } finally {
        runner = null;
        int s = state;
        if (s >= INTERRUPTING)
            handlePossibleCancellationInterrupt(s);
    }
}
The method logic is as follows:
- Check that the state is NEW and claim the runner field with CAS; otherwise return immediately.
- Execute the call method of the callable.
- Set the result (or the exception) and wake up all waiting threads.
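One visible consequence of the state check in the first step is that a FutureTask runs its callable at most once. The sketch below counts invocations; RunOnlyOnce and callsAfterTwoRuns are illustrative names.

```java
import java.util.concurrent.FutureTask;
import java.util.concurrent.atomic.AtomicInteger;

public class RunOnlyOnce {
    // Calls run() twice and reports how often the callable was invoked.
    static int callsAfterTwoRuns() {
        AtomicInteger calls = new AtomicInteger();
        FutureTask<Integer> task = new FutureTask<>(() -> calls.incrementAndGet());
        task.run(); // state NEW: the callable runs and the state becomes NORMAL
        task.run(); // state is no longer NEW: returns immediately
        return calls.get();
    }

    public static void main(String[] args) {
        System.out.println(callsAfterTwoRuns());
    }
}
```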
See how the set method sets the result:
protected void set(V v) {
    if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {
        outcome = v;
        UNSAFE.putOrderedInt(this, stateOffset, NORMAL); // final state
        finishCompletion();
    }
}
Change the state to COMPLETING first, then set the result, then set the state to NORMAL, and finally wake up the waiting thread by executing the finishCompletion method.
The finishCompletion code looks like this:
private void finishCompletion() {
    // assert state > COMPLETING;
    for (WaitNode q; (q = waiters) != null;) {
        if (UNSAFE.compareAndSwapObject(this, waitersOffset, q, null)) {
            for (;;) {
                Thread t = q.thread;
                if (t != null) {
                    q.thread = null;
                    LockSupport.unpark(t);
                }
                WaitNode next = q.next;
                if (next == null)
                    break;
                q.next = null; // unlink to help gc
                q = next;
            }
            break;
        }
    }
    done();
    callable = null; // to reduce footprint
}
First the waiters field is set to null with CAS, then all the nodes in the stack, that is, all the waiting threads, are traversed and woken up in turn.
Finally, the done method is executed. It is a hook left for subclasses to override and is an empty method in FutureTask itself. Spring’s ListenableFutureTask, for example, overrides this method. The QueueingFuture class in JUC also overrides it.
If the callable throws an exception, setException follows the same pattern as set, except that the throwable is stored as the outcome and the final state is EXCEPTIONAL.
Now consider the case where the user calls cancel(true). The Javadoc for this method reads as follows:
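As a minimal sketch of how the done hook can be used, the subclass below invokes a callback when the task completes. CallbackFutureTask, onDone and demo are hypothetical names, and this is not how Spring or QueueingFuture are actually written.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.FutureTask;
import java.util.function.Consumer;

public class CallbackFutureTask<V> extends FutureTask<V> {
    private final Consumer<FutureTask<V>> onDone;

    public CallbackFutureTask(Callable<V> callable, Consumer<FutureTask<V>> onDone) {
        super(callable);
        this.onDone = onDone;
    }

    // Called by finishCompletion after normal completion, failure or cancellation.
    @Override
    protected void done() {
        onDone.accept(this);
    }

    // Small driver: records what the callback observed.
    static String demo() {
        StringBuilder log = new StringBuilder();
        CallbackFutureTask<String> task = new CallbackFutureTask<>(
            () -> "hello",
            t -> log.append("done=").append(t.isDone()));
        task.run(); // the callback fires on the thread that ran the task
        return log.toString();
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```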
Attempts to cancel execution of this task. This attempt will fail if the task has already completed, has already been cancelled, or could not be cancelled for some other reason. If successful, and this task has not started when cancel is called, this task should never run. If the task has already started, then the mayInterruptIfRunning parameter determines whether the thread executing this task should be interrupted in an attempt to stop the task.
That is, mayInterruptIfRunning decides whether the running thread is interrupted when the task has already started. If mayInterruptIfRunning is true, the state is changed to INTERRUPTING, the runner thread’s interrupt method is called, and finally the state is set to INTERRUPTED. If it is false, the state simply changes to CANCELLED.
In the finally block of the run method, the state is compared with INTERRUPTING. If it is INTERRUPTING or INTERRUPTED, handlePossibleCancellationInterrupt is called, and if the interrupt is still pending, that method spins until the state becomes INTERRUPTED.
The specific code is as follows:
private void handlePossibleCancellationInterrupt(int s) {
    if (s == INTERRUPTING)
        while (state == INTERRUPTING)
            Thread.yield(); // wait out pending interrupt
}
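The effect of cancel(true) on a running task can be observed from outside. In this sketch (CancelRunningTask and demo are illustrative names, and the 10-second sleep is arbitrary) the task records whether it was interrupted.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.FutureTask;
import java.util.concurrent.atomic.AtomicBoolean;

public class CancelRunningTask {
    // Cancels a task that is already running and reports what happened.
    static String demo() {
        CountDownLatch started = new CountDownLatch(1);
        AtomicBoolean interrupted = new AtomicBoolean();
        FutureTask<Void> task = new FutureTask<>(() -> {
            started.countDown();
            try {
                Thread.sleep(10_000);
            } catch (InterruptedException e) {
                interrupted.set(true); // delivered by cancel(true)
            }
            return null;
        });
        Thread worker = new Thread(task);
        worker.start();
        try {
            started.await();                       // make sure the task is running
            boolean cancelled = task.cancel(true); // NEW -> INTERRUPTING -> INTERRUPTED
            worker.join();
            return "cancel=" + cancelled
                + " interrupted=" + interrupted.get()
                + " isCancelled=" + task.isCancelled();
        } catch (InterruptedException e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```

The interrupt is only a request: a callable that never checks the interrupt flag and never blocks will keep running, although get on the cancelled task still throws CancellationException.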
5. Summary
The run method is responsible for executing the call method of the callable and storing the return value in a field. The get method is responsible for blocking until the run method finishes executing the task and wakes it up. Then the get method returns the result.
Meanwhile, FutureTask uses a stack structure to hold all waiting threads so that multiple threads can call get concurrently. That is, every one of those threads is waiting inside get for the same result.
While FutureTask is well designed, I still feel that a fully asynchronous style, which does not block a thread in get, is the better choice and more efficient.