1. Introduction
When creating threads directly, we can either extend Thread or implement the Runnable interface, but neither approach can return the result of the execution. Starting with JDK 1.5, the Callable and Future interfaces make it possible to create tasks that return a result once they have finished.
The Future pattern lets a thread hand work to another thread and fetch the result later, instead of waiting in place for the logic in run() to finish before doing anything else.
Here is what asynchrony means:
Picture a pipe with two outlets: port A is the main thread and port B is the worker thread. When port B is blocked, water still flows through port A.
Let's write a little demo to show that tasks run through a Future execute asynchronously.
// Future is an interface, and FutureTask is an implementation class of Future
package FutureTask;

import java.util.concurrent.*;

public class Demo02 {
    public static void main(String[] args) throws ExecutionException, InterruptedException, TimeoutException {
        long start = System.currentTimeMillis();
        FutureTask<Integer> futureTask1 = new FutureTask<Integer>(new Callable<Integer>() {
            @Override
            public Integer call() throws Exception {
                // Sleep for 2000 milliseconds
                Thread.sleep(2000);
                return 1;
            }
        });
        FutureTask<Integer> futureTask2 = new FutureTask<Integer>(new Callable<Integer>() {
            @Override
            public Integer call() throws Exception {
                // Sleep for 1000 milliseconds
                Thread.sleep(1000);
                return 2;
            }
        });
        // This thread sleeps for 2000 milliseconds
        new Thread(futureTask1).start();
        // This thread sleeps for 1000 milliseconds
        new Thread(futureTask2).start();
        Integer s1 = futureTask1.get();
        Integer s2 = futureTask2.get();
        Integer s3 = s1 + s2;
        long end = System.currentTimeMillis();
        System.out.println("Result:" + s3);
        // Outputs the total execution time
        System.out.println("Take time:" + (end - start) + "ms");
    }
}
/* If the two tasks ran one after the other instead of concurrently, the elapsed time would be at least 3000 ms. */
The results are as follows:
The elapsed time is less than 3000 milliseconds (roughly 2000, the duration of the longer task), which shows that the worker threads driven by the two FutureTask objects run concurrently with each other and with the main thread. Future is an interface, so to use its functionality you need an implementation of it. FutureTask, which we introduce next, is that implementation class.
2. FutureTask inheritance system
FutureTask implements the RunnableFuture interface, which in turn extends both Runnable and Future, so FutureTask is both a Runnable and a Future. Note, however, that FutureTask does not implement Callable. Instead, as its source code shows, it stores the Callable it was given in a member field and invokes it from run(). Let's take a look at the relevant declarations of FutureTask and RunnableFuture.
- FutureTask inheritance system
// <V> is the generic type parameter: the type of the result to return
public class FutureTask<V> implements RunnableFuture<V>
- RunnableFuture inheritance system
// <V> is the generic type parameter: the type of the result to return
public interface RunnableFuture<V> extends Runnable, Future<V> {
    void run();
}
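The type relationships described above can be checked with a few instanceof tests. This is a minimal sketch; the class name TypeCheckDemo and the method describe() are made up for illustration and are not part of the JDK.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.Future;
import java.util.concurrent.FutureTask;
import java.util.concurrent.RunnableFuture;

// Hypothetical demo class: reports which interfaces a FutureTask instance satisfies.
class TypeCheckDemo {
    static String describe() {
        FutureTask<String> task = new FutureTask<>(() -> "done");
        Object o = task; // widen to Object so every instanceof check compiles
        return "Runnable=" + (o instanceof Runnable)
                + ", Future=" + (o instanceof Future)
                + ", RunnableFuture=" + (o instanceof RunnableFuture)
                + ", Callable=" + (o instanceof Callable);
    }

    public static void main(String[] args) {
        // prints "Runnable=true, Future=true, RunnableFuture=true, Callable=false"
        System.out.println(describe());
    }
}
```

The Callable check comes out false because FutureTask only stores a Callable in a field; it does not implement that interface itself.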
FutureTask usage scenario:
FutureTask is used to execute a task asynchronously and, if needed, to cancel it. A FutureTask wraps either a Callable or a Runnable and is then handed to a Thread or submitted to a thread pool for execution. The get() method returns the result of the execution; if the task has not finished yet, the thread calling get() blocks until it does. The task logic behind run() is executed only once, no matter how many threads call get(). We can also call the FutureTask object's cancel() method to cancel the task.
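As a rough illustration of this scenario, the sketch below hands a FutureTask to a thread pool and calls get() twice. The class PoolUsageDemo, the method runOnPool(), and the value 42 are assumptions chosen for the example.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.FutureTask;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical demo class: runs one FutureTask on a pool and reads its result twice.
class PoolUsageDemo {
    // Returns {first get(), second get(), number of times the task body ran}.
    static int[] runOnPool() throws Exception {
        AtomicInteger runs = new AtomicInteger();
        FutureTask<Integer> task = new FutureTask<>(() -> {
            runs.incrementAndGet();
            return 42;
        });
        ExecutorService pool = Executors.newFixedThreadPool(2);
        try {
            pool.execute(task);      // accepted because FutureTask is a Runnable
            int first = task.get();  // blocks until the task has finished
            int second = task.get(); // the task is done, so the stored result is returned
            return new int[] {first, second, runs.get()};
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) throws Exception {
        int[] r = runOnPool();
        System.out.println(r[0] + " " + r[1] + " runs=" + r[2]);
    }
}
```

Both calls to get() see the same value, and the counter shows that the task body ran a single time.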
3. Source code analysis
3.1 Member Variables
state indicates the status of the current task:
- NEW: The task has been created and has not completed yet. This state also covers a task that is currently running.
- COMPLETING: The task has finished running but its outcome has not been written yet. This is a transient state.
- NORMAL: The task completed normally (no exception, interruption, or cancellation occurred).
- EXCEPTIONAL: The task terminated abnormally because callable.call() threw an exception.
- CANCELLED: The task was cancelled by a call to cancel(false).
- INTERRUPTING: cancel(true) was called and the thread running the task is in the process of being interrupted. This is a transient state.
- INTERRUPTED: cancel(true) was called and the thread running the task has been interrupted.
// The status of the current task
private volatile int state;
// The task is newly created and has not completed yet
private static final int NEW = 0;
// The task is about to finish, but the outcome has not been written yet
private static final int COMPLETING = 1;
// The task completed normally
private static final int NORMAL = 2;
// callable.call() threw an exception while the task was running
private static final int EXCEPTIONAL = 3;
// The task was cancelled
private static final int CANCELLED = 4;
// The task is in the process of being interrupted
private static final int INTERRUPTING = 5;
// The task has been interrupted
private static final int INTERRUPTED = 6;

// Stores the Callable passed to the constructor. A Runnable passed to the
// constructor (or submitted to a thread pool) is first adapted to a Callable
// using the adapter pattern.
private Callable<V> callable;
// Under normal circumstances, outcome stores the result returned by the task.
// In abnormal cases, outcome stores the exception thrown by the task.
private Object outcome; // non-volatile, protected by state reads/writes
// The thread executing the task while the task is running
private volatile Thread runner;
// Many threads may be waiting for the result, so each one is wrapped in a WaitNode.
// The nodes form a stack, and new nodes are pushed at the head.
private volatile WaitNode waiters;

static final class WaitNode {
    // The waiting thread
    volatile Thread thread;
    // Next WaitNode
    volatile WaitNode next;
    WaitNode() { thread = Thread.currentThread(); }
}
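The state field is private, but its effect is visible through the public isDone() and isCancelled() methods. The sketch below drives three tasks into different final states and records what those two methods report; StateObservationDemo, flags() and observe() are hypothetical names used only for this example.

```java
import java.util.concurrent.FutureTask;

// Hypothetical demo class: observes task state through isDone() and isCancelled().
class StateObservationDemo {
    // Formats the two public status flags of a task.
    static String flags(FutureTask<?> task) {
        return "done=" + task.isDone() + ",cancelled=" + task.isCancelled();
    }

    static String[] observe() {
        FutureTask<String> normal = new FutureTask<>(() -> "ok");
        String fresh = flags(normal);          // state is NEW
        normal.run();
        String completed = flags(normal);      // state is NORMAL

        FutureTask<String> failing = new FutureTask<>(() -> {
            throw new IllegalStateException("boom");
        });
        failing.run();                         // the exception is captured, not rethrown
        String failed = flags(failing);        // state is EXCEPTIONAL

        FutureTask<String> cancelled = new FutureTask<>(() -> "never");
        cancelled.cancel(false);
        String afterCancel = flags(cancelled); // state is CANCELLED

        return new String[] {fresh, completed, failed, afterCancel};
    }

    public static void main(String[] args) {
        for (String line : observe()) {
            System.out.println(line);
        }
    }
}
```

Note that isDone() is true for every state other than NEW, so a failed or cancelled task also counts as done.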
3.2 Constructors
There are two constructors. The first takes a single Callable, and the result returned by that Callable is the result returned by the FutureTask. The second takes a Runnable and a result value: the Runnable is adapted to a Callable, and the given value is what the FutureTask returns after the task has run.
// This constructor takes a Callable; the result returned by get() is the result returned by the Callable
public FutureTask(Callable<V> callable) {
    if (callable == null)
        throw new NullPointerException();
    this.callable = callable;
    // Set the state to NEW
    this.state = NEW;       // ensure visibility of callable
}

// This constructor takes a Runnable and a result value; get() returns that value
public FutureTask(Runnable runnable, V result) {
    // Adapter pattern: wrap the Runnable in a Callable
    this.callable = Executors.callable(runnable, result);
    // Set the state to NEW
    this.state = NEW;       // ensure visibility of callable
}
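A short sketch of the second constructor follows. The names RunnableCtorDemo and runWithFixedResult() and the string "fixed-result" are illustrative assumptions.

```java
import java.util.concurrent.FutureTask;

// Hypothetical demo class: a Runnable plus a preset result value.
class RunnableCtorDemo {
    static String runWithFixedResult() throws Exception {
        StringBuilder log = new StringBuilder();
        Runnable work = () -> log.append("ran");
        // The Runnable produces no value, so get() returns the second argument.
        FutureTask<String> task = new FutureTask<>(work, "fixed-result");
        task.run();
        return log + ":" + task.get();
    }

    public static void main(String[] args) throws Exception {
        System.out.println(runWithFixedResult()); // prints "ran:fixed-result"
    }
}
```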
3.3 Member methods
3.3.1 The run() method and its related methods
run() executes the logic of the current task. It relies on the setException, set, handlePossibleCancellationInterrupt, and finishCompletion methods, each of which is covered below.
The run() method
It invokes the call() method of the wrapped Callable and handles the bookkeeping for the task's state and result.
public void run() {
    // If the task state is not NEW, or the CAS that changes runner from null to the
    // current thread fails, return immediately. This guarantees that the task body
    // is executed at most once.
    if (state != NEW ||
        !UNSAFE.compareAndSwapObject(this, runnerOffset,
                                     null, Thread.currentThread()))
        return;
    // Reached only if the state is NEW and this thread won the CAS on runner
    try {
        // The Callable that was passed in
        Callable<V> c = callable;
        // Condition 1: guard against a null Callable
        // Condition 2: guard against another thread having cancelled the task
        if (c != null && state == NEW) {
            // Stores the result of the task
            V result;
            // Whether the task ran successfully
            boolean ran;
            try {
                // Call callable.call() and keep the result
                result = c.call();
                ran = true;
            } catch (Throwable ex) {
                // callable.call() threw an exception
                result = null;
                ran = false;
                // Stores the exception in outcome, updates the state to
                // EXCEPTIONAL, and wakes up the blocked threads
                setException(ex);
            }
            if (ran)
                // Stores the result in outcome, updates the state to NORMAL,
                // and wakes up the blocked threads
                set(result);
        }
    } finally {
        // runner must be non-null until state is settled to
        // prevent concurrent calls to run()
        runner = null;
        // state must be re-read after nulling runner to prevent
        // leaked interrupts
        int s = state;
        // s >= INTERRUPTING means cancel(true) is interrupting, or has interrupted, the task
        if (s >= INTERRUPTING)
            // While the state is INTERRUPTING, yield the CPU until it becomes INTERRUPTED
            handlePossibleCancellationInterrupt(s);
    }
}
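The run-at-most-once guarantee enforced by the state check at the top of run() can be observed directly. In this sketch, RunOnceDemo and countExecutions() are hypothetical names.

```java
import java.util.concurrent.FutureTask;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical demo class: calls run() twice and counts how often the body executes.
class RunOnceDemo {
    static int countExecutions() {
        AtomicInteger calls = new AtomicInteger();
        FutureTask<Integer> task = new FutureTask<>(() -> calls.incrementAndGet());
        task.run(); // state goes from NEW to NORMAL
        task.run(); // state is no longer NEW, so this call returns immediately
        return calls.get();
    }

    public static void main(String[] args) {
        System.out.println(countExecutions()); // prints 1
    }
}
```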
The setException(Throwable t) method
This method is executed if callable.call() throws an exception during run(). The logic is as follows:
- Move the state of the task from NEW to COMPLETING with a CAS.
- Set outcome to the exception thrown by the call() method of the Callable object, then set the state to EXCEPTIONAL.
- Execute the finishCompletion() method to wake up the threads that are blocked in get().
protected void setException(Throwable t) {
    // If the task is in the NEW state, move it to the transient COMPLETING state
    if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {
        // Store the exception thrown by callable.call() in outcome
        outcome = t;
        // Set the final state of the task to EXCEPTIONAL
        UNSAFE.putOrderedInt(this, stateOffset, EXCEPTIONAL); // final state
        // Wake up all threads waiting in get() and empty the stack
        finishCompletion();
    }
}
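From the caller's side, the stored exception surfaces as the cause of an ExecutionException thrown by get(). A minimal sketch, with ExceptionOutcomeDemo and causeDescription() as made-up names:

```java
import java.util.concurrent.ExecutionException;
import java.util.concurrent.FutureTask;

// Hypothetical demo class: shows how a failure inside call() reaches the caller of get().
class ExceptionOutcomeDemo {
    static String causeDescription() throws InterruptedException {
        FutureTask<Integer> task = new FutureTask<>(() -> {
            throw new IllegalStateException("boom");
        });
        task.run(); // run() captures the exception instead of propagating it
        try {
            task.get();
            return "no exception";
        } catch (ExecutionException e) {
            Throwable cause = e.getCause();
            return cause.getClass().getSimpleName() + ": " + cause.getMessage();
        }
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(causeDescription()); // prints "IllegalStateException: boom"
    }
}
```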
The set(V v) method
This method is executed if callable.call() returns without throwing an exception. The logic is as follows:
- Move the state of the task from NEW to COMPLETING with a CAS.
- Set outcome to the result returned by the call() method of the Callable object, then set the state to NORMAL.
- Wake up the threads that are blocked in get().
protected void set(V v) {
    // If the task is in the NEW state, move it to the transient COMPLETING state
    if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {
        // Store the result returned by callable.call() in outcome
        outcome = v;
        // Set the final state of the task to NORMAL
        UNSAFE.putOrderedInt(this, stateOffset, NORMAL); // final state
        // Wake up all threads waiting in get() and empty the stack
        finishCompletion();
    }
}
The handlePossibleCancellationInterrupt(int s) method
This method may be called at the end of run(). If the task state is INTERRUPTING, meaning cancel(true) is still in the middle of interrupting the running thread, that thread yields the CPU in a loop until the state is updated to INTERRUPTED.
private void handlePossibleCancellationInterrupt(int s) {
    if (s == INTERRUPTING)
        while (state == INTERRUPTING)
            Thread.yield(); // wait out pending interrupt
}
The finishCompletion() method
Whenever the task reaches a final state (normal completion, exceptional completion, or cancellation), this method is called to wake up every thread that is blocked in get().
private void finishCompletion() {
    // assert state > COMPLETING;
    // If the condition is true, there are threads waiting on the stack
    for (WaitNode q; (q = waiters) != null;) {
        // Detach the whole stack by setting waiters to null with a CAS
        if (UNSAFE.compareAndSwapObject(this, waitersOffset, q, null)) {
            for (;;) {
                // Get the thread wrapped by the current node
                Thread t = q.thread;
                // A null thread means the waiter already gave up (timeout or interrupt)
                if (t != null) {
                    // Set q.thread to null to help GC
                    q.thread = null;
                    // Wake up the thread wrapped by the current node
                    LockSupport.unpark(t);
                }
                // Get the next WaitNode
                WaitNode next = q.next;
                // If next is null, the whole stack has been walked and every waiter has been woken
                if (next == null)
                    break;
                // More waiters remain: unlink this node and move on to the next one
                q.next = null; // unlink to help gc
                q = next;
            }
            // The stack has been drained, so leave the outer loop
            break;
        }
    }
    // Empty hook method that subclasses can override
    done();
    // Set callable to null to help GC
    callable = null;        // to reduce footprint
}
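Because finishCompletion() calls the protected done() hook, a subclass can react to completion without polling. The sketch below overrides done() in an anonymous subclass; DoneHookDemo, runWithHook() and the event strings are assumptions made for illustration.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.FutureTask;

// Hypothetical demo class: records the order in which call(), done() and get() happen.
class DoneHookDemo {
    static List<String> runWithHook() throws Exception {
        // A plain ArrayList is enough here because everything runs on one thread.
        List<String> events = new ArrayList<>();
        FutureTask<String> task = new FutureTask<String>(() -> {
            events.add("call");
            return "value";
        }) {
            @Override
            protected void done() {
                events.add("done"); // invoked once the task has reached a final state
            }
        };
        task.run();
        events.add("get=" + task.get());
        return events;
    }

    public static void main(String[] args) throws Exception {
        System.out.println(runWithHook()); // prints "[call, done, get=value]"
    }
}
```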
3.3.2 The get() method and its related methods
The get() method
The get() method retrieves the result once the task has finished. With the no-argument get(), a thread that asks for the result before the task has finished blocks until it does. The blocking happens in awaitDone, as we'll see below.
public V get() throws InterruptedException, ExecutionException {
    int s = state;
    // s <= COMPLETING means the task has not finished yet (NEW or COMPLETING)
    if (s <= COMPLETING)
        // Not finished: call awaitDone to block until the task completes
        s = awaitDone(false, 0L);
    return report(s);
}
The timed variant of get() throws a TimeoutException if the result is not available within the specified time.
public V get(long timeout, TimeUnit unit)
    throws InterruptedException, ExecutionException, TimeoutException {
    if (unit == null)
        throw new NullPointerException();
    int s = state;
    if (s <= COMPLETING &&
        (s = awaitDone(true, unit.toNanos(timeout))) <= COMPLETING)
        throw new TimeoutException();
    return report(s);
}
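The timed get() can be exercised with a task that deliberately takes longer than the caller is willing to wait. This is a sketch only; TimedGetDemo, timesOut(), and the 2000 ms and 100 ms durations are arbitrary choices for the example.

```java
import java.util.concurrent.FutureTask;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// Hypothetical demo class: waits 100 ms for a task that needs about 2000 ms.
class TimedGetDemo {
    static boolean timesOut() throws Exception {
        FutureTask<String> task = new FutureTask<>(() -> {
            Thread.sleep(2000);
            return "late";
        });
        Thread worker = new Thread(task);
        worker.setDaemon(true);
        worker.start();
        try {
            task.get(100, TimeUnit.MILLISECONDS);
            return false; // not expected: the task needs far longer than 100 ms
        } catch (TimeoutException e) {
            return true;
        } finally {
            // A timeout does not stop the task, so cancel it explicitly.
            task.cancel(true);
        }
    }

    public static void main(String[] args) throws Exception {
        System.out.println(timesOut()); // prints true
    }
}
```

A timed-out get() leaves the task itself untouched, which is why the sketch cancels it afterwards.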
The awaitDone(boolean timed, long nanos) method
This method blocks the threads that called get() before the FutureTask finished executing. It is called from inside the get() method.
awaitDone is reached only when both of the following hold:
- The task has not finished yet.
- A thread calls the get() method.
// Waits for the task to complete (normally or abnormally), or aborts on interrupt or timeout
private int awaitDone(boolean timed, long nanos)
    throws InterruptedException {
    final long deadline = timed ? System.nanoTime() + nanos : 0L;
    // The WaitNode that wraps the current thread
    WaitNode q = null;
    // Whether the current thread's WaitNode has been pushed onto the stack
    boolean queued = false;
    for (;;) {
        // If the current thread has been interrupted, remove its WaitNode
        // from the stack and throw InterruptedException
        if (Thread.interrupted()) {
            removeWaiter(q);
            throw new InterruptedException();
        }
        // Get the status of the current task
        int s = state;
        // If the state is greater than COMPLETING (completed, cancelled, or
        // interrupted), return the state directly
        if (s > COMPLETING) {
            if (q != null)
                // Set q.thread to null to help GC
                q.thread = null;
            return s;
        }
        // The task is in the transient COMPLETING state, so just yield the CPU
        else if (s == COMPLETING) // cannot time out yet
            Thread.yield();
        // First pass: there is no WaitNode yet, so create one
        else if (q == null)
            q = new WaitNode();
        // Second pass: the WaitNode has not been pushed yet, so try to push it at the head
        else if (!queued)
            queued = UNSAFE.compareAndSwapObject(this, waitersOffset,
                                                 q.next = waiters, q);
        // Third pass: handle the timed case
        else if (timed) {
            nanos = deadline - System.nanoTime();
            // The deadline has passed: remove the current node and return the task state
            if (nanos <= 0L) {
                removeWaiter(q);
                return state;
            }
            // Park the current thread for at most the remaining time
            LockSupport.parkNanos(this, nanos);
        }
        else
            // Park the current thread until another thread calls unpark() or interrupts it
            LockSupport.park(this);
    }
}
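The push-then-park loop above, together with the wake-up walk in finishCompletion(), can be imitated in a much smaller form. The following is a simplified sketch and not the JDK implementation: it swaps UNSAFE for AtomicReference, replaces the state field with a boolean, and leaves out timeouts, interrupt handling and removeWaiter. All names (WaiterStackSketch, Node, await, complete, demo) are invented for the example.

```java
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.LockSupport;

// Simplified sketch of a lock-free waiter stack; names and structure are assumptions.
class WaiterStackSketch {
    static final class Node {
        volatile Thread thread = Thread.currentThread();
        volatile Node next;
    }

    private final AtomicReference<Node> waiters = new AtomicReference<>();
    private volatile boolean done;

    // Blocks the caller until complete() has been called.
    void await() {
        Node q = null;
        boolean queued = false;
        while (!done) {
            if (q == null) {
                q = new Node();                           // first pass: create the node
            } else if (!queued) {
                Node head = waiters.get();
                q.next = head;
                queued = waiters.compareAndSet(head, q);  // second pass: push at the head
            } else {
                LockSupport.park(this);                   // later passes: sleep until unparked
            }
        }
    }

    // Marks completion, detaches the whole stack, and wakes every waiter on it.
    void complete() {
        done = true;
        for (Node q = waiters.getAndSet(null); q != null; q = q.next) {
            Thread t = q.thread;
            if (t != null) {
                q.thread = null;
                LockSupport.unpark(t);
            }
        }
    }

    // Starts three waiters, completes, and returns how many were released.
    static int demo() throws InterruptedException {
        WaiterStackSketch sketch = new WaiterStackSketch();
        AtomicInteger released = new AtomicInteger();
        Thread[] threads = new Thread[3];
        for (int i = 0; i < threads.length; i++) {
            threads[i] = new Thread(() -> {
                sketch.await();
                released.incrementAndGet();
            });
            threads[i].start();
        }
        Thread.sleep(100); // give the waiters time to park
        sketch.complete();
        for (Thread t : threads) {
            t.join();
        }
        return released.get();
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(demo()); // prints 3
    }
}
```

A waiter that pushes its node after the stack has been detached is still safe, because it re-checks the done flag before parking.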
The removeWaiter method
When a waiting thread times out or is interrupted, this method unlinks its node from the stack.
private void removeWaiter(WaitNode node) {
    if (node != null) {
        // Mark the node as dead (this also helps GC)
        node.thread = null;
        retry:
        for (;;) {          // restart on removeWaiter race
            for (WaitNode pred = null, q = waiters, s; q != null; q = s) {
                s = q.next;
                // The current node still holds a thread, so keep it as the predecessor
                if (q.thread != null)
                    pred = q;
                // The current node is dead and has a live predecessor
                else if (pred != null) {
                    // Point the predecessor at the successor, which unlinks the current node
                    pred.next = s;
                    if (pred.thread == null) // check for race
                        continue retry;
                }
                // The current node is dead and is the head: CAS the head to its successor
                else if (!UNSAFE.compareAndSwapObject(this, waitersOffset,
                                                      q, s))
                    continue retry;
            }
            break;
        }
    }
}
The report(int s) method
This method produces what get() hands back to the caller: either the result of the task or an exception. It is called inside the get() method, and by the time it is called the task has reached a final state.
private V report(int s) throws ExecutionException {
    // Get the value of outcome
    Object x = outcome;
    // If the task completed normally, return the outcome value
    if (s == NORMAL)
        return (V)x;
    // s >= CANCELLED means the task was cancelled, is being interrupted, or has been interrupted
    if (s >= CANCELLED)
        throw new CancellationException();
    // Otherwise the state is EXCEPTIONAL: wrap the stored exception
    throw new ExecutionException((Throwable)x);
}
3.3.3 The cancel(boolean mayInterruptIfRunning) method
This method cancels the current task.
public boolean cancel(boolean mayInterruptIfRunning) {
    // Condition 1: state == NEW means the task is running or has not started yet
    // (for example, it is still sitting in a thread pool queue)
    // Condition 2: the CAS moves the state to INTERRUPTING or CANCELLED
    // If either condition fails, return false: the cancel failed
    if (!(state == NEW &&
          UNSAFE.compareAndSwapInt(this, stateOffset, NEW,
              mayInterruptIfRunning ? INTERRUPTING : CANCELLED)))
        return false;
    try {    // in case call to interrupt throws exception
        if (mayInterruptIfRunning) {
            try {
                // The thread running the task, or null if no thread has picked the task up yet
                Thread t = runner;
                // Send the thread an interrupt signal. Task code that responds to
                // interrupts will stop; code that ignores them keeps running
                if (t != null)
                    t.interrupt();
            } finally { // final state
                // Set the task state to INTERRUPTED
                UNSAFE.putOrderedInt(this, stateOffset, INTERRUPTED);
            }
        }
    } finally {
        // Wake up all threads blocked in get()
        finishCompletion();
    }
    return true;
}
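The behaviour of cancel(true) on a running task can be sketched as follows. CancelDemo, cancelRunningTask() and the latch names are hypothetical, and the 10-second sleep simply stands in for long-running work.

```java
import java.util.concurrent.CancellationException;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.FutureTask;
import java.util.concurrent.TimeUnit;

// Hypothetical demo class: cancels a task while its body is sleeping.
class CancelDemo {
    static String cancelRunningTask() throws Exception {
        CountDownLatch started = new CountDownLatch(1);
        CountDownLatch interrupted = new CountDownLatch(1);
        FutureTask<String> task = new FutureTask<>(() -> {
            started.countDown();
            try {
                Thread.sleep(10_000);
            } catch (InterruptedException e) {
                interrupted.countDown(); // the interrupt sent by cancel(true) arrived
                throw e;
            }
            return "finished";
        });
        new Thread(task).start();
        started.await();                         // make sure the task body is running
        boolean first = task.cancel(true);       // NEW -> INTERRUPTING -> INTERRUPTED
        boolean second = task.cancel(true);      // state is no longer NEW
        boolean sawInterrupt = interrupted.await(5, TimeUnit.SECONDS);
        String outcome;
        try {
            outcome = task.get();
        } catch (CancellationException e) {
            outcome = "CancellationException";
        }
        return first + "," + second + "," + sawInterrupt + "," + outcome + "," + task.isCancelled();
    }

    public static void main(String[] args) throws Exception {
        // prints "true,false,true,CancellationException,true"
        System.out.println(cancelRunningTask());
    }
}
```

The second call returns false because a task can only be cancelled while its state is still NEW.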
4. Summary
- A FutureTask runs its task asynchronously with respect to the thread that later asks for the result.
- A FutureTask returns the result of the execution through the get() method. If the task is not complete, the thread calling get() blocks until it is.
- A FutureTask can be cancelled by calling its cancel() method.