“This is the 17th day of my participation in the First Challenge 2022. For details: First Challenge 2022.”
Are you familiar with FutureTask? Or do you need asynchronous computation? FutureTask is a great way to do asynchronous computations and get the results of asynchronous tasks synchronously. Let’s examine FutureTask from source code.
What is FutureTask?
FutureTask is a cancelable asynchronous computation.
FutureTask provides a basic implementation of the Future, which can call methods to start and cancel a calculation, query whether the calculation is complete, and retrieve the result.
FutureTask can only get the result after the calculation is complete. Once the calculation is complete, it cannot be restarted or cancelled unless the runAndReset method is called.
FutureTask implements the Future interface as well as the Runnable interface, so FutureTask can be executed by executors in the thread pool or directly using an asynchronous thread call (futureTask.run()).
How is FutureTask implemented?
First, let’s look at the inheritance structure of the FutureTask class. It implements the RunnableFuture interface. RunnableFuture inherits from the Future and the functional interface Runnable, so FutureTask is essentially a RunnableFuture.
The Future interface specifies some functions that asynchronous computing classes must implement, source code is as follows:
package java.util.concurrent; Public interface Future<V> {/** ** Attempts to cancel the task and returns the cancellation result. * mayInterruptIfRunning: Whether to interrupt the thread. */ boolean cancel(boolean mayInterruptIfRunning); /** * Check whether the task has been cancelled (true if cancelled before normal end) */ Boolean isCancelled(); /** * Check whether the current task is completed, including complete, abnormal, or canceled. */ boolean isDone(); /** * Gets the execution result of the task. The task will be blocked before it ends. */ V get() throws InterruptedException, ExecutionException; /** * Attempts to obtain execution results within the specified time. TimeoutException */ V GET (long timeout, TimeUnit Unit) throws InterruptedException, ExecutionException, TimeoutException; }Copy the code
We are all familiar with the Runnable interface, which is a functional interface that we often use to create a thread.
package java.lang;
@FunctionalInterface
public interface Runnable {
public abstract void run();
}
Copy the code
FutureTask is a task to be executed, which contains the implementation of the above interface. FutureTask internally defines the task’s state and some state constants. The core of FutureTask is a Callable. We can pass either a Callable or a runnable through the constructor, which will be internally converted to a Callable because we need to get the result of an asynchronous task, and only threads created with a Callable will return the result.
We can judge the return result of isCancelled(), isDone() in the Future by the state at this time.
Below is the FutureTask source code, with core source analysis comments
package java.util.concurrent;
import java.util.concurrent.locks.LockSupport;
public class FutureTask<V> implements RunnableFuture<V> {
/** * Task running status */
private volatile int state;
private static final int NEW = 0; / / new
private static final int COMPLETING = 1; / / finish
private static final int NORMAL = 2; / / normal
private static final int EXCEPTIONAL = 3; / / exception
private static final int CANCELLED = 4; / / cancel
private static final int INTERRUPTING = 5; / / the interrupt
private static final int INTERRUPTED = 6; / / the interrupt
private Callable<V> callable;
/** * returns the result */
private Object outcome;
private volatile Thread runner;
private volatileWaitNode waiters; .public FutureTask(Callable<V> callable) {
if (callable == null)
throw new NullPointerException();
this.callable = callable;
this.state = NEW;
}
public FutureTask(Runnable runnable, V result) {
this.callable = Executors.callable(runnable, result);
this.state = NEW;
}
public boolean isCancelled(a) {
return state >= CANCELLED;
}
public boolean isDone(a) {
returnstate ! = NEW; }/* * Cancel task implementation * If cancel(true) is called before the task is started, the task will never be executed. * If the task is already started, the mayInterruptIfRunning parameter determines whether the task should interrupt the thread executing the task to try to interrupt the task. * If the task task has been cancelled, completed, or cannot be cancelled for some other reason, the attempt will fail. * /
public boolean cancel(boolean mayInterruptIfRunning) {
if(! (state == NEW && UNSAFE.compareAndSwapInt(this, stateOffset, NEW,
mayInterruptIfRunning ? INTERRUPTING : CANCELLED)))
return false;
try { // in case call to interrupt throws exception
if (mayInterruptIfRunning) {
try {
Thread t = runner;
if(t ! =null)
t.interrupt();
} finally { // final state
UNSAFE.putOrderedInt(this, stateOffset, INTERRUPTED); }}}finally {
finishCompletion();
}
return true;
}
/* * Wait to obtain the result * Obtain the current status and check whether the execution is complete. If the task is not completed, it blocks and waits for completion. If it times out, it throws a timeout wait exception. * /
public V get(a) throws InterruptedException, ExecutionException {
int s = state;
if (s <= COMPLETING)
s = awaitDone(false.0L);
return report(s);
}
/* * Wait to obtain the result * Obtain the current status and check whether the execution is complete. * If the task is not completed, block and wait for completion. * /
public V get(long timeout, TimeUnit unit)
throws InterruptedException, ExecutionException, TimeoutException {
if (unit == null)
throw new NullPointerException();
int s = state;
if (s <= COMPLETING &&
(s = awaitDone(true, unit.toNanos(timeout))) <= COMPLETING)
throw new TimeoutException();
return report(s);
}
/** * Check whether the result is abnormal */
private V report(int s) throws ExecutionException {
Object x = outcome;
if (s == NORMAL)
return (V)x;
if (s >= CANCELLED)
throw new CancellationException();
throw new ExecutionException((Throwable)x);
}
protected void done(a) {}/** * Check whether the state is complete with CAS */
protected void set(V v) {
if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {
outcome = v;
UNSAFE.putOrderedInt(this, stateOffset, NORMAL); // final statefinishCompletion(); }}/** * Set exception, when the operation is complete, set exception state */
protected void setException(Throwable t) {
if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {
outcome = t;
UNSAFE.putOrderedInt(this, stateOffset, EXCEPTIONAL); // final statefinishCompletion(); }}/* * Execute callable to obtain the result, or exception * to determine whether the state is started, if it is new to run method */
public void run(a) {
if(state ! = NEW || ! UNSAFE.compareAndSwapObject(this, runnerOffset,
null, Thread.currentThread()))
return;
try {
Callable<V> c = callable;
if(c ! =null && state == NEW) {
V result;
boolean ran;
try {
result = c.call();
ran = true;
} catch (Throwable ex) {
result = null;
ran = false;
setException(ex);
}
if(ran) set(result); }}finally {
runner = null;
int s = state;
if(s >= INTERRUPTING) handlePossibleCancellationInterrupt(s); }}/** * re-execute */
protected boolean runAndReset(a) {
if(state ! = NEW || ! UNSAFE.compareAndSwapObject(this, runnerOffset,
null, Thread.currentThread()))
return false;
boolean ran = false;
int s = state;
try {
Callable<V> c = callable;
if(c ! =null && s == NEW) {
try {
c.call(); // don't set result
ran = true;
} catch(Throwable ex) { setException(ex); }}}finally {
runner = null;
s = state;
if (s >= INTERRUPTING)
handlePossibleCancellationInterrupt(s);
}
return ran && s == NEW;
}
/* * Handle interrupts that may be cancelled */
private void handlePossibleCancellationInterrupt(int s) {
if (s == INTERRUPTING)
while (state == INTERRUPTING)
Thread.yield();
}
static final class WaitNode {
volatile Thread thread;
volatileWaitNode next; WaitNode() { thread = Thread.currentThread(); }}/** * remove and wake up all waiting threads, execute done, and null callable */
private void finishCompletion(a) {
// assert state > COMPLETING;
for(WaitNode q; (q = waiters) ! =null;) {
if (UNSAFE.compareAndSwapObject(this, waitersOffset, q, null)) {
for (;;) {
Thread t = q.thread;
if(t ! =null) {
q.thread = null;
LockSupport.unpark(t);
}
WaitNode next = q.next;
if (next == null)
break;
q.next = null; // unlink to help gc
q = next;
}
break;
}
}
done();
callable = null; // to reduce footprint
}
/** * wait for completion * first check whether timeout * interrupt, then handle exception state, complete... * /
private int awaitDone(boolean timed, long nanos)
throws InterruptedException {
final long deadline = timed ? System.nanoTime() + nanos : 0L;
WaitNode q = null;
boolean queued = false;
for (;;) {
if (Thread.interrupted()) {
removeWaiter(q);
throw new InterruptedException();
}
int s = state;
if (s > COMPLETING) {
if(q ! =null)
q.thread = null;
return s;
}
else if (s == COMPLETING) // cannot time out yet
Thread.yield();
else if (q == null)
q = new WaitNode();
else if(! queued) queued = UNSAFE.compareAndSwapObject(this, waitersOffset,
q.next = waiters, q);
else if (timed) {
nanos = deadline - System.nanoTime();
if (nanos <= 0L) {
removeWaiter(q);
return state;
}
LockSupport.parkNanos(this, nanos);
}
else
LockSupport.park(this); }}/** ** remove wait */
private void removeWaiter(WaitNode node) {
if(node ! =null) {
node.thread = null;
retry:
for (;;) { // restart on removeWaiter race
for (WaitNode pred = null, q = waiters, s; q ! =null; q = s) {
s = q.next;
if(q.thread ! =null)
pred = q;
else if(pred ! =null) {
pred.next = s;
if (pred.thread == null) // check for race
continue retry;
}
else if(! UNSAFE.compareAndSwapObject(this, waitersOffset,
q, s))
continue retry;
}
break; }}}// Unsafe mechanics
private static final sun.misc.Unsafe UNSAFE;
private static final long stateOffset;
private static final long runnerOffset;
private static final long waitersOffset;
static {
try{ UNSAFE = sun.misc.Unsafe.getUnsafe(); Class<? > k = FutureTask.class; stateOffset = UNSAFE.objectFieldOffset (k.getDeclaredField("state"));
runnerOffset = UNSAFE.objectFieldOffset
(k.getDeclaredField("runner"));
waitersOffset = UNSAFE.objectFieldOffset
(k.getDeclaredField("waiters"));
} catch (Exception e) {
throw newError(e); }}}Copy the code
FutureTask runs the process
In general, we can think of FutureTask as having three states:
-
Not started: The new FutureTask is not started until run() is executed.
private static final int NEW = 0; / / new Copy the code
-
Started: FutureTask is started when the Run method of the FutureTask object is started and executed.
-
Completed: The FutureTask is completed when it finishes normally, when the FutureTask execution is cancelled (the FutureTask cancel method), or when the FutureTask run method throws an exception and terminates.
private static final int COMPLETING = 1; / / finish private static final int NORMAL = 2; // Result of normal setting after completion private static final int EXCEPTIONAL = 3; // The setting is abnormal private static final int CANCELLED = 4; // Execute cancel private static final int INTERRUPTING = 5; / / the interrupt private static final int INTERRUPTED = 6; / / the interrupt Copy the code
The use of FutureTask
Use one (directly create a new thread call) :
FutureTask<Integer> task = new FutureTask<>(new Callable() {
@Override
public Integer call(a) throws Exception {
returnsum(); }});new Thread(task).stat();
Integer result = task.get();
Copy the code
Use two (in conjunction with thread pools)
FutureTask<Integer> task = new FutureTask<>(new Callable() {
@Override
public Integer call(a) throws Exception {
returnsum(); }}); Executors.newCachedThreadPool().submit(task); Integer result = task.get();Copy the code