“This article has participated in the call for good writing activities, click to view: the back end, the big front end double track submission, 20,000 yuan prize pool waiting for you to challenge!”
preface
When you talk about FutureTask, you have to talk about Callabl and Future; Where Callabl is an interface, used to define tasks, and where there is a return value, and can have a return value. The Future is used to retrieve the results of Callabl execution. This note mainly writes FutureTask source code.
The body of the
public class FutureTask<V> implements RunnableFuture<V>{}
public interface RunnableFuture<V> extends Runnable.Future<V> {}
Copy the code
FutureTask implements the RunnableFuture interface. The RunnableFuture interface in turn inherits the Runnable and Future interfaces.
So, FutureTask can be passed as a Runnable parameter to Thread and then start the Thread to execute the task.
constant
/** The underlying callable; nulled out after running */
private Callable<V> callable; // The task being performed
/** The result to return or exception to throw from get() */
private Object outcome; // non-volatile, protected by state reads/ index //
/** The thread running the callable; CASed during run() */
private volatile Thread runner; // The thread that executes the task
/** Treiber stack of waiting threads */
private volatile WaitNode waiters; // Wait node, used to store wait results of the thread, linked list structure
Copy the code
A state is defined in FutureTask to mark the execution status of the task.
/* * NEW -> COMPLETING -> NORMAL * NEW -> COMPLETING -> EXCEPTIONAL * NEW -> CANCELLED * NEW -> INTERRUPTING -> INTERRUPTED */
private volatile int state;
private static final int NEW = 0; / / new
private static final int COMPLETING = 1; / / implementation
private static final int NORMAL = 2; // The execution is complete
private static final int EXCEPTIONAL = 3; // Execute an error
private static final int CANCELLED = 4; / / cancel
private static final int INTERRUPTING = 5; / / the interrupt
private static final int INTERRUPTED = 6; / / the interrupt
Copy the code
Transitions between states are also explained in comments.
A constructor
public FutureTask(Callable<V> callable) {
if (callable == null)
throw new NullPointerException();
this.callable = callable;
this.state = NEW; // ensure visibility of callable
}
public FutureTask(Runnable runnable, V result) {
this.callable = Executors.callable(runnable, result);
this.state = NEW; // ensure visibility of callable
}
Copy the code
There are two constructors, which set the state to NEW; The construction method that receives the Runnable parameter needs to pass in a fixed value and use execnable. Callable to convert Runnable and result to Callable. The adaptor mode is used. Interested can go into the source code to see.
Run method
The run method is where the thread calls directly. In fact, the basic principle is the same as the execution of Runnable, but there is one more wrapper, the result of the execution is processed.
public void run(a) {
// Determine the status
if(state ! = NEW ||! UNSAFE.compareAndSwapObject(this, runnerOffset,null, Thread.currentThread()))
return;
try {
Callable<V> c = callable;
if(c ! =null && state == NEW) {
V result;
boolean ran;
try {
result = c.call();
ran = true;
} catch (Throwable ex) {
result = null;
ran = false;
setException(ex);
}
if(ran) set(result); }}finally {
// runner must be non-null until state is settled to
// prevent concurrent calls to run()
runner = null;
// state must be re-read after nulling runner to prevent
// leaked interrupts
int s = state;
if(s >= INTERRUPTING) handlePossibleCancellationInterrupt(s); }}Copy the code
The run method still looks simple, just executing the call method and saving the result. I’m going to focus on two methods here, setException and set.
setException
This method is called when a task fails, taking the error message as an argument. Here is the source code:
protected void setException(Throwable t) {
if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {
outcome = t;
UNSAFE.putOrderedInt(this, stateOffset, EXCEPTIONAL); // final statefinishCompletion(); }}Copy the code
The source code is also relatively simple:
CAS
willstate
fromNEW
toCOMPLETING
- Assign error messages to
outcome
- will
state
toEXCEPTIONAL
- perform
finishCompletion
methods
Set method
This method is called after the task has been successfully executed, passing in the execution result as a parameter. The source code is as follows:
protected void set(V v) {
if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {
outcome = v;
UNSAFE.putOrderedInt(this, stateOffset, NORMAL); // final statefinishCompletion(); }}Copy the code
The process is the same as the setException method, except that the third step is to convert state to NORMAL.
finishCompletion
This method is called in both set and setException. Remove all waiting threads, signal the thread, execute the done() method, and set callable to null; Look at the source code below:
private void finishCompletion(a) {
// assert state > COMPLETING;
for(WaitNode q; (q = waiters) ! =null;) { // Loop stores a list of threads
if (UNSAFE.compareAndSwapObject(this, waitersOffset, q, null)) { // CAS modifies the waiters value to NULL
for (;;) {
Thread t = q.thread; // Get the thread
if(t ! =null) {
q.thread = null; / / set to null
LockSupport.unpark(t); // Notify the thread
}
WaitNode next = q.next; // point to the next node
if (next == null)
break;
q.next = null; // unlink to help gc
q = next;
}
break;
}
}
done();
callable = null; // to reduce footprint
}
Copy the code
The source code is relatively simple, that is, the for loop traverses the list of waiting threads, and then notifies the thread through locksupport.unpark. This section focuses on the use and principle of locksupport. unpark.
In fact, the thread to perform the task of the general flow is:
- Thread execution task
- Determine the execution result and save it (failure or success)
- Modify status and save result information
- Notify waiting threads one by one (
LockSupport.unpark
)
isCancelled
Determine whether to cancel based on the status
public boolean isCancelled(a) {
return state >= CANCELLED;
}
Copy the code
isDone
Determines whether the thread is complete
public boolean isDone(a) {
returnstate ! = NEW; }Copy the code
cancel
Cancel the task
The source code is as follows:
public boolean cancel(boolean mayInterruptIfRunning) {
// CAS modifies the status to interrupt or CANCELLED according to mayInterruptIfRunning
If the modification fails, return false. If the modification succeeds, go to the next step
if(! (state == NEW && UNSAFE.compareAndSwapInt(this, stateOffset, NEW,
mayInterruptIfRunning ? INTERRUPTING : CANCELLED)))
return false;
try { // in case call to interrupt throws exception
// Whether to cancel a task in progress
if (mayInterruptIfRunning) {
try {
Thread t = runner;
if(t ! =null)
t.interrupt(); / / use
} finally { // final state
UNSAFE.putOrderedInt(this, stateOffset, INTERRUPTED); }}}finally {
finishCompletion();
}
return true;
}
Copy the code
The source code is also simpler, with the mayInterruptIfRunning parameter, which is used to determine whether to cancel a task in progress. If true, Thread interrupt is used to cancel the task. Finally, the waiting thread is notified to cancel the wait.
get
This name has two methods, one is always waiting, and one is waiting for a finite amount of time.
// This method is to wait until there is a result
public V get(a) throws InterruptedException, ExecutionException {
int s = state;
if (s <= COMPLETING) // If the state is running or NEW, wait (call awaitDone)
s = awaitDone(false.0L);
return report(s);
}
// This is a finite waiting time
public V get(long timeout, TimeUnit unit)
throws InterruptedException, ExecutionException, TimeoutException {
if (unit == null)
throw new NullPointerException();
int s = state;
if (s <= COMPLETING &&
(s = awaitDone(true, unit.toNanos(timeout))) <= COMPLETING)
throw new TimeoutException();
return report(s);
}
Copy the code
The process is as follows:
- Determine the status, whether to call
awaitDone
- Returns results based on status
Both methods are similar, calling the awaitDone method with different arguments.
awaitDone
private int awaitDone(boolean timed, long nanos)throws InterruptedException {
final long deadline = timed ? System.nanoTime() + nanos : 0L;
WaitNode q = null;
boolean queued = false;
for (;;) {
if (Thread.interrupted()) {
removeWaiter(q);
throw new InterruptedException();
}
int s = state;
// The task is complete, and the status code is returned
if (s > COMPLETING) {
if(q ! =null)
q.thread = null;
return s;
}
else if (s == COMPLETING) // cannot time out yet
Thread.yield();
else if (q == null) // Build the node
q = new WaitNode();
else if(! queued)// Waiters modify CAS
queued = UNSAFE.compareAndSwapObject(this, waitersOffset,
q.next = waiters, q);
else if (timed) { // Use this method for limited time
nanos = deadline - System.nanoTime();
if (nanos <= 0L) {
removeWaiter(q);
return state;
}
LockSupport.parkNanos(this, nanos);
}
else // Unlimited time
LockSupport.park(this); }}Copy the code
There are three main points in this method:
- Build with the current thread
WaitNode
And then useCAS
willWaitNode
joinwaiters
,waiters
It’s just a linked list - use
LockSupport
To block the thread (for finite or infinite duration) - A status code is returned on success
report
This method is also relatively simple
private V report(int s) throws ExecutionException {
Object x = outcome; // Get the result
if (s == NORMAL)
return (V)x; // The result will be converted if completed normally
if (s >= CANCELLED)
throw new CancellationException(); // Cancel return
throw new ExecutionException((Throwable)x); // Execute an error return
}
Copy the code
The last
conclusion
Overall, the source code is relatively simple. Conceptually, FutureTask is an intermediate class, essentially a Runnable that wraps the Callable inside (overriding the run method) and then blocks and notifies the thread using LockSupport to wait for the task to complete and get the result.
In FutureTask, from the perspective of the callable of performing the task, the state is not completely one-to-one in a strict sense. For example, the NEW state may have completed the task, or the execution may have gone wrong (only the outcome has not been saved to the shared variable). This can be seen in the run method, where the state changes after the Call method of the Callable is executed. If FutureTask is viewed as a whole, it’s different.
Callable can be started in two ways: First, build FutureTask with Callable as a parameter, then create Thread instance with FutureTask as a parameter, and then start start. The second option is to put the Callable directly into the thread pool’s Submit method and return FutureTask. The principle of thread pool and the first way is the same, thread pool is implemented in the submit method of the first step.