1. Preface
FutureTask objects turn up often when reading the source of classes such as ThreadPoolExecutor and ScheduledThreadPoolExecutor, but my understanding of FutureTask had stayed on the surface. This article records a deeper study of it, including a walk through its source code. I hope it helps you.
2. Introduction
1 What is FutureTask
1.1 Official Description
A cancellable asynchronous computation. This class provides a base implementation of Future, with methods to start and cancel a computation, query to see if the computation is complete, and retrieve the result of the computation.
The result can only be retrieved when the computation has completed; the get methods will block if the computation has not yet completed. Once the computation has completed, the computation cannot be restarted or cancelled (unless the computation is invoked using runAndReset).
A FutureTask can be used to wrap a Callable or Runnable object. Because FutureTask implements Runnable, a FutureTask can be submitted to an Executor for execution.
In addition to serving as a standalone class, this class provides protected functionality that may be useful when creating customized task classes.
1.2 Personal Understanding
I think of FutureTask as a task class that separates running a task from retrieving its result: the computation can be deferred, the result can be fetched later, and both are safe under concurrency. It can be run on its own or submitted to a thread pool, which makes concurrent asynchronous execution easier and improves efficiency.
1.3 FutureTask class diagram
Looking at the FutureTask class diagram, we can see that FutureTask implements the RunnableFuture interface, and RunnableFuture in turn extends both Runnable and Future. In other words, FutureTask has the abilities of both a Future and a Runnable.
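The two roles can be seen side by side in a minimal sketch. The class name RunnableFutureSketch, the method runThenGet, and the thread name are made up for this illustration; only FutureTask, Runnable, Future, and Thread come from the JDK.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.FutureTask;

// Illustrative helper, not part of the JDK
final class RunnableFutureSketch {
    /** Runs the task through its Runnable side, then reads it through its Future side. */
    static String runThenGet(Callable<String> work) {
        FutureTask<String> task = new FutureTask<>(work);
        Runnable asRunnable = task;      // the view a Thread or Executor works with
        Future<String> asFuture = task;  // the view the caller waiting for the result works with
        new Thread(asRunnable, "sketch-worker").start();
        try {
            return asFuture.get();       // blocks until the worker thread has finished run()
        } catch (InterruptedException | ExecutionException e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(runThenGet(() -> "hello from " + Thread.currentThread().getName()));
    }
}
```

The same object is handed to the thread as a Runnable and kept by the caller as a Future, which is all that RunnableFuture expresses.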
1.4 Core Methods
public class FutureTask<V> implements RunnableFuture<V> {
    // Returns the result; blocks the calling thread until the computation completes
    public V get() throws InterruptedException, ExecutionException {... }
    // Returns the result; blocks the calling thread for at most the given time
    // and throws a TimeoutException if the computation has not completed by then
    public V get(long timeout, TimeUnit unit)
            throws InterruptedException, ExecutionException, TimeoutException {... }
    // Runs the computation: invokes the Callable's call method and stores the result
    public void run() {... }
    // Runs the computation and leaves the state at NEW without storing the result,
    // so it can run repeatedly; protected, intended for subclasses such as ScheduledFutureTask
    protected boolean runAndReset() {... }
    // Cancels the task; mayInterruptIfRunning says whether the thread running the task should be interrupted
    public boolean cancel(boolean mayInterruptIfRunning) {... }
    // Whether the task was cancelled
    public boolean isCancelled() {... }
    // Whether the task has completed
    public boolean isDone() {... }
}
2 Use cases of FutureTask
2.1 Independent Use
import java.util.concurrent.Callable;
import java.util.concurrent.CancellationException;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.FutureTask;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

/**
 * @author CodderFengzi
 * @date 2021/1/16
 */
public class FutureTaskDemo {
    private Response response = new Response("No response");

    /**
     * Gets the response.
     *
     * @param delay    maximum time to wait
     * @param timeUnit unit of the delay
     * @return {@link Response}
     */
    private Response getResponse(long delay, TimeUnit timeUnit) {
        FutureTask<Response> ft = new FutureTask<>(new Task());
        // Run the task on its own thread. Calling ft.run() directly would block this
        // thread until the task finished, so the timed get() below could never time out
        new Thread(ft).start();
        try {
            response = ft.get(delay, timeUnit);
        } catch (InterruptedException e) {
            response = new Response("Interrupted");
        } catch (ExecutionException e) {
            response = new Response("Error");
        } catch (CancellationException e) {
            response = new Response("Cancelled");
        } catch (TimeoutException e) {
            response = new Response("Request timed out");
            ft.cancel(true);
        }
        return response;
    }

    private static class Response {
        private final String content;

        private Response(String content) {
            this.content = content;
        }

        @Override
        public String toString() {
            return "Response{" + "content='" + content + '\'' + '}';
        }
    }

    private static class Task implements Callable<Response> {
        @Override
        public Response call() throws Exception {
            // Simulate the time taken by a network call
            Thread.sleep(2000L);
            return new Response("Correct response");
        }
    }

    public static void main(String[] args) {
        FutureTaskDemo taskDemo = new FutureTaskDemo();
        Response timeoutResponse = taskDemo.getResponse(1000L, TimeUnit.MILLISECONDS);
        Response normalResponse = taskDemo.getResponse(3000L, TimeUnit.MILLISECONDS);
        // Prints: Response{content='Request timed out'}
        System.out.println(timeoutResponse);
        // Prints: Response{content='Correct response'}
        System.out.println(normalResponse);
    }
}
2.2 Used in thread pools
The following simple example creates a FutureTask and submits it to a ThreadPoolExecutor. A timeout can be set; if the task has not completed when the timeout expires, a TimeoutException is thrown.
import java.util.concurrent.Callable;
import java.util.concurrent.CancellationException;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.FutureTask;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicInteger;

/**
 * @author CodderFengzi
 * @date 2021/1/16
 */
public class FutureTaskDemo1 {
    private Response response = new Response("No response");
    private final ThreadPoolExecutor executor = new ThreadPoolExecutor(
            4, 4, 0, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>(), new NamedThreadFactory("CoderFengzi"));

    /**
     * Gets the response.
     *
     * @param delay    maximum time to wait
     * @param timeUnit unit of the delay
     * @return {@link Response}
     */
    private Response getResponse(long delay, TimeUnit timeUnit) {
        FutureTask<Response> ft = new FutureTask<>(new Task());
        executor.execute(ft);
        try {
            response = ft.get(delay, timeUnit);
        } catch (InterruptedException e) {
            response = new Response("Interrupted");
        } catch (ExecutionException e) {
            response = new Response("Error");
        } catch (CancellationException e) {
            response = new Response("Cancelled");
        } catch (TimeoutException e) {
            response = new Response("Request timed out");
            ft.cancel(true);
        }
        return response;
    }

    private static class Response {
        private final String content;

        private Response(String content) {
            this.content = content;
        }

        @Override
        public String toString() {
            return "Response{" + "content='" + content + '\'' + '}';
        }
    }

    private static class Task implements Callable<Response> {
        @Override
        public Response call() throws Exception {
            // Simulate the time taken by a network call
            Thread.sleep(2000L);
            return new Response("Correct response");
        }
    }

    private static class NamedThreadFactory implements ThreadFactory {
        private final static AtomicInteger COUNT = new AtomicInteger(1);
        private final String name;

        private NamedThreadFactory(String name) {
            this.name = name;
        }

        @Override
        public Thread newThread(Runnable r) {
            return new Thread(r, name + "-" + COUNT.getAndIncrement());
        }
    }

    public static void main(String[] args) {
        FutureTaskDemo1 taskDemo = new FutureTaskDemo1();
        Response timeoutResponse = taskDemo.getResponse(1000L, TimeUnit.MILLISECONDS);
        Response normalResponse = taskDemo.getResponse(3000L, TimeUnit.MILLISECONDS);
        // Prints: Response{content='Request timed out'}
        System.out.println(timeoutResponse);
        // Prints: Response{content='Correct response'}
        System.out.println(normalResponse);
        // Let the pool threads exit so the JVM can terminate
        taskDemo.executor.shutdown();
    }
}
2.3 Used as a cache value
FutureTask can also be used as a cache value. The example below is deliberately simple and contains no cache eviction logic.
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.FutureTask;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

/**
 * @author CodderFengzi
 * @date 2021/1/16
 */
public class FutureTaskDemo2 {
    private final ConcurrentHashMap<Integer, Future<String>> map = new ConcurrentHashMap<>();
    private static final ThreadPoolExecutor EXECUTOR = new ThreadPoolExecutor(
            4, 4, 0, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>(), new NamedThreadFactory("CoderFengzi"));

    /**
     * Gets the result.
     *
     * @param key the cache key
     * @return {@link String}
     */
    private String getResult(int key) throws InterruptedException, ExecutionException {
        Future<String> f;
        if ((f = map.get(key)) == null) {
            // Create a task
            FutureTask<String> fu = new FutureTask<>(new Task(key));
            // putIfAbsent returns null if this call inserted the value,
            // otherwise it returns the value inserted earlier
            f = map.putIfAbsent(key, fu);
            if (f == null) {
                f = fu;
                fu.run();
            }
        }
        try {
            return f.get();
        } catch (InterruptedException e) {
            e.printStackTrace();
            map.remove(key);
            throw e;
        } catch (ExecutionException e) {
            // Remove the failed task
            map.remove(key);
            throw e;
        }
    }

    private static class Task implements Callable<String> {
        private final int number;

        public Task(int number) {
            this.number = number;
        }

        @Override
        public String call() throws Exception {
            System.out.println(Thread.currentThread().getName() + " performs the calculation");
            return "CoderFengzi" + number;
        }
    }

    private static class NamedThreadFactory implements ThreadFactory {
        private final static AtomicInteger COUNT = new AtomicInteger(1);
        private final String name;

        private NamedThreadFactory(String name) {
            this.name = name;
        }

        @Override
        public Thread newThread(Runnable r) {
            return new Thread(r, name + "-" + COUNT.getAndIncrement());
        }
    }

    public static void main(String[] args) throws InterruptedException {
        final FutureTaskDemo2 taskDemo = new FutureTaskDemo2();
        final CountDownLatch cdl = new CountDownLatch(1);
        final int count = 1000;
        for (int i = 0; i < count; i++) {
            EXECUTOR.execute(new Runnable() {
                @Override
                public void run() {
                    try {
                        // Hold the threads back so that they call getResult concurrently
                        cdl.await();
                        String result = taskDemo.getResult(666);
                        System.out.println(Thread.currentThread().getName() + " result = " + result);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    } catch (ExecutionException e) {
                        System.out.println("Execution error");
                    }
                }
            });
        }
        Thread.sleep(5000L);
        cdl.countDown();
        // Already submitted tasks still run; the pool threads exit afterwards
        EXECUTOR.shutdown();
        // Sample run: the calculation is done only once, every other call reads the cached value
        // CoderFengzi-1 performs the calculation
        // CoderFengzi-3 result = CoderFengzi666
        // CoderFengzi-4 result = CoderFengzi666
        // CoderFengzi-3 result = CoderFengzi666
        // CoderFengzi-2 result = CoderFengzi666
        // CoderFengzi-1 result = CoderFengzi666
        // CoderFengzi-2 result = CoderFengzi666
    }
}
2.4 Customize subclasses for extension
If we want to extend FutureTask, we can subclass it. Here we implement a custom subclass whose task can be run repeatedly.
import java.util.concurrent.Callable;
import java.util.concurrent.FutureTask;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

/**
 * @author CoderFengzi
 * @date 2021/1/17
 */
public class RepeatableFutureTask<V> extends FutureTask<V> {
    private static final ThreadPoolExecutor EXECUTOR = new ThreadPoolExecutor(
            4, 4, 0, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>(), new NamedThreadFactory("CoderFengzi"));
    /** Number of runs per call to run() */
    private final int count;

    public RepeatableFutureTask(Callable<V> callable, int count) {
        super(callable);
        this.count = count;
    }

    @Override
    protected void done() {
        super.done();
        // Not reached in this demo: runAndReset never moves the task to a terminal state
        System.out.println("Assignment is complete and all threads are released.");
    }

    @Override
    public void run() {
        runAndReset(count);
    }

    /**
     * Runs the task repeatedly.
     *
     * @param count number of runs
     * @return true if every run completed and the task was reset
     */
    public synchronized boolean runAndReset(int count) {
        if (count < 0) {
            throw new IllegalArgumentException("count must not be negative");
        }
        for (int i = 0; i < count; i++) {
            if (!super.runAndReset()) {
                return false;
            }
        }
        return true;
    }

    private static class Task implements Callable<Void> {
        public static final String CODER_FENGZI = " CoderFengzi 666";

        @Override
        public Void call() throws Exception {
            System.out.println(Thread.currentThread().getName() + CODER_FENGZI);
            return null;
        }
    }

    private static class NamedThreadFactory implements ThreadFactory {
        private final static AtomicInteger COUNT = new AtomicInteger(1);
        private final String name;

        private NamedThreadFactory(String name) {
            this.name = name;
        }

        @Override
        public Thread newThread(Runnable r) {
            return new Thread(r, name + "-" + COUNT.getAndIncrement());
        }
    }

    public static void main(String[] args) throws InterruptedException {
        final RepeatableFutureTask<Void> rfu = new RepeatableFutureTask<>(new Task(), 3);
        final int num = 4;
        for (int i = 0; i < num; i++) {
            EXECUTOR.execute(rfu);
        }
        // Already submitted tasks still run; the pool threads exit afterwards
        EXECUTOR.shutdown();
        // Sample run:
        // CoderFengzi-1 CoderFengzi 666
        // CoderFengzi-1 CoderFengzi 666
        // CoderFengzi-1 CoderFengzi 666
        // CoderFengzi-3 CoderFengzi 666
        // CoderFengzi-3 CoderFengzi 666
        // CoderFengzi-3 CoderFengzi 666
        // CoderFengzi-2 CoderFengzi 666
        // CoderFengzi-2 CoderFengzi 666
        // CoderFengzi-2 CoderFengzi 666
        // CoderFengzi-4 CoderFengzi 666
        // CoderFengzi-4 CoderFengzi 666
        // CoderFengzi-4 CoderFengzi 666
    }
}
3. Source Code Analysis
1 Core variables
Before getting into the source code, let's take a look at some of the important variables of FutureTask.
1.1 Status Description
state is the variable FutureTask uses to represent its status. There are seven states: NEW, COMPLETING, NORMAL, EXCEPTIONAL, CANCELLED, INTERRUPTING, and INTERRUPTED, represented by the integers 0 to 6. The initial state is NEW; COMPLETING and INTERRUPTING are intermediate states; NORMAL, EXCEPTIONAL, CANCELLED, and INTERRUPTED are final states that cannot be left.
/**
 * The run state of this task, initially NEW. The run state transitions to a
 * terminal state only in the set, setException, and cancel methods. During
 * completion, state may take on the transient values COMPLETING (while the
 * outcome is being set) or INTERRUPTING (only while interrupting the runner
 * to satisfy a cancel(true)). Transitions from these intermediate states to
 * final states use cheaper ordered/lazy writes because the values are unique
 * and cannot be further modified.
 *
 * Possible state transitions:
 * NEW -> COMPLETING -> NORMAL
 * NEW -> COMPLETING -> EXCEPTIONAL
 * NEW -> CANCELLED
 * NEW -> INTERRUPTING -> INTERRUPTED
 */
private volatile int state;
// Initial state when the FutureTask is created
private static final int NEW = 0;
// The Callable's call method has finished, but the outcome has not been assigned yet
private static final int COMPLETING = 1;
// Completed normally: the result was obtained and assigned to outcome
private static final int NORMAL = 2;
// Completed exceptionally: call threw an exception, which was assigned to outcome
private static final int EXCEPTIONAL = 3;
// Cancelled: cancel(false) was called while the state was NEW
private static final int CANCELLED = 4;
// Interrupting: cancel(true) was called while the state was NEW, but the runner thread has not been interrupted yet
private static final int INTERRUPTING = 5;
// Interrupted: the state after INTERRUPTING, once the runner thread's interrupt() method has been called
private static final int INTERRUPTED = 6;
FutureTask is in effect a small state machine. The following diagram shows the transitions between these states:
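The state field itself is private, but each transition path can be observed through isDone() and isCancelled(). The sketch below is an illustration only: the class StateTransitionSketch and its method names are invented here, and the comments name the internal states each call is expected to pass through according to the transitions listed above.

```java
import java.util.concurrent.FutureTask;

// Illustrative helper, not part of the JDK
final class StateTransitionSketch {
    /** Summarises what the public API reports for a task. */
    static String describe(FutureTask<?> task) {
        return "done=" + task.isDone() + ", cancelled=" + task.isCancelled();
    }

    static String freshTask() {
        return describe(new FutureTask<>(() -> "value"));   // still NEW
    }

    static String afterNormalRun() {
        FutureTask<String> task = new FutureTask<>(() -> "value");
        task.run();                    // NEW -> COMPLETING -> NORMAL
        return describe(task);
    }

    static String afterException() {
        FutureTask<String> task = new FutureTask<>(() -> {
            throw new IllegalStateException("boom");
        });
        task.run();                    // NEW -> COMPLETING -> EXCEPTIONAL
        return describe(task);
    }

    static String afterCancel(boolean mayInterruptIfRunning) {
        FutureTask<String> task = new FutureTask<>(() -> "never computed");
        // false: NEW -> CANCELLED; true: NEW -> INTERRUPTING -> INTERRUPTED
        task.cancel(mayInterruptIfRunning);
        task.run();                    // has no effect, the state is no longer NEW
        return describe(task);
    }

    public static void main(String[] args) {
        System.out.println("fresh:         " + freshTask());
        System.out.println("normal run:    " + afterNormalRun());
        System.out.println("exception:     " + afterException());
        System.out.println("cancel(false): " + afterCancel(false));
        System.out.println("cancel(true):  " + afterCancel(true));
    }
}
```

Note that isDone() is true for every state other than NEW, so a failed or cancelled task also counts as done.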
1.2 Other Variables
/**
 * The underlying callable; nulled out after running.
 * This is the Callable object to be executed.
 */
private Callable<V> callable;
/**
 * The result to return or exception to throw from get().
 * outcome is written before the final write to state and read only after state has been read;
 * because state is volatile, the visibility of outcome is guaranteed without declaring it volatile.
 * See the Zhihu answer by Forest Wang: https://www.zhihu.com/question/41016480/answer/551056899
 */
private Object outcome; // non-volatile, protected by state reads/writes
/**
 * The thread running the callable; CASed during run().
 * The thread that wins the CAS stores a reference to itself in runner.
 * The field is volatile to ensure memory visibility.
 */
private volatile Thread runner;
/**
 * Treiber stack of waiting threads.
 * See https://segmentfault.com/a/1190000012463330
 */
private volatile WaitNode waiters;
2 Core data structure – Treiber Stack
FutureTask relies on a Treiber stack for part of its concurrency safety: the stack stores the threads that are blocked after calling the get method. The get method is analyzed later; first, let's look at what a Treiber stack is and how FutureTask uses it.
- The Treiber stack is a lock-free stack that supports concurrent operations and is based on CAS (compare-and-swap). Every push and every pop is performed with a CAS operation.
- To push, read the top of the stack and point the new node's next at it. Then read the top again and compare it with the value read earlier: if they are equal, set the top to the new node; otherwise repeat the whole step until the CAS succeeds.
- To pop, read the top of the stack and return null if it is null. Otherwise read the top's next, then compare the current top with the value read earlier: if they are equal, set the top to next and return the old top; otherwise repeat the whole step until the CAS succeeds.
Here is a simple implementation of the Treiber stack, from the book Java Concurrency in Practice (of which Doug Lea is a co-author).
import java.util.concurrent.atomic.AtomicReference;

public class ConcurrentStack<E> {
    private AtomicReference<Node<E>> top = new AtomicReference<>();

    public void push(E item) {
        Node<E> newHead = new Node<>(item);
        Node<E> oldHead;
        do {
            // Read the current top and link the new node in front of it
            oldHead = top.get();
            newHead.next = oldHead;
            // Retry if another thread changed the top in the meantime
        } while (!top.compareAndSet(oldHead, newHead));
    }

    public E pop() {
        Node<E> oldHead;
        Node<E> newHead;
        do {
            oldHead = top.get();
            if (oldHead == null)
                return null;
            newHead = oldHead.next;
            // Retry if another thread changed the top in the meantime
        } while (!top.compareAndSet(oldHead, newHead));
        return oldHead.item;
    }

    private static class Node<E> {
        public final E item;
        public Node<E> next;

        public Node(E item) {
            this.item = item;
        }
    }
}
If the above code is not easy to understand, you can use the following figure to understand:
3 the get method
java.util.concurrent.FutureTask#get
When a thread calls this method and the computation has completed, it returns the result; if the computation threw an exception, that exception is reported instead. There are two versions, one without a time limit and one with a time limit; the timed version throws a TimeoutException when the limit expires.
// Returns the result; blocks the current thread until the computation completes
public V get() throws InterruptedException, ExecutionException {
    int s = state;
    // State is NEW or COMPLETING: the computation has not finished
    if (s <= COMPLETING)
        // Wait for completion; awaitDone is explained below
        s = awaitDone(false, 0L);
    // State is now NORMAL, EXCEPTIONAL, CANCELLED, INTERRUPTING, or INTERRUPTED
    // Return the result or throw the matching exception
    return report(s);
}
// Returns the result; blocks the current thread for at most the given time
// and throws a TimeoutException if the computation has not completed by then
public V get(long timeout, TimeUnit unit)
        throws InterruptedException, ExecutionException, TimeoutException {
    if (unit == null)
        throw new NullPointerException();
    int s = state;
    // State is NEW or COMPLETING: wait with a time limit.
    // If the state is still NEW or COMPLETING after waiting, the wait timed out
    if (s <= COMPLETING &&
        (s = awaitDone(true, unit.toNanos(timeout))) <= COMPLETING)
        // Timed out: throw a TimeoutException
        throw new TimeoutException();
    // State is now NORMAL, EXCEPTIONAL, CANCELLED, INTERRUPTING, or INTERRUPTED
    // Return the result or throw the matching exception
    return report(s);
}
java.util.concurrent.FutureTask#awaitDone
This method blocks the calling thread until a result is available. It wraps the thread in a WaitNode object and uses a CAS operation to push that node onto the waiters stack.
// Blocks the current thread until run() completes, or aborts on interrupt or timeout.
// timed says whether there is a time limit; nanos is the time to wait in nanoseconds
private int awaitDone(boolean timed, long nanos)
        throws InterruptedException {
    final long deadline = timed ? System.nanoTime() + nanos : 0L;
    WaitNode q = null;
    boolean queued = false;
    // Spin
    for (;;) {
        // Thread.interrupted() is a static method that reports whether the current
        // thread has been interrupted and clears the interrupt status
        if (Thread.interrupted()) {
            // The thread was interrupted: remove its node from the Treiber stack (see removeWaiter below)
            removeWaiter(q);
            // Throw an InterruptedException
            throw new InterruptedException();
        }
        int s = state;
        // State is past COMPLETING: NORMAL, EXCEPTIONAL, CANCELLED, INTERRUPTING, or INTERRUPTED
        if (s > COMPLETING) {
            if (q != null)
                // Clear the node's thread. If it stayed non-null, finishCompletion
                // would later call LockSupport.unpark on it; this avoids an unnecessary unpark
                q.thread = null;
            // Return the state directly
            return s;
        }
        // The outcome is being assigned
        else if (s == COMPLETING) // cannot time out yet
            // Another thread is about to finish, so yield instead of
            // pushing a node onto the wait stack and parking
            Thread.yield();
        else if (q == null)
            // Create a wait node whose thread field is the current thread
            q = new WaitNode();
        else if (!queued)
            // Treiber stack push: CAS the node onto the top of the stack;
            // if the CAS fails, keep spinning and try again
            queued = UNSAFE.compareAndSwapObject(this, waitersOffset,
                                                 q.next = waiters, q);
        // Reaching this point means the task is still NEW and the node is queued
        // If there is a time limit
        else if (timed) {
            nanos = deadline - System.nanoTime();
            if (nanos <= 0L) {
                // Timed out: remove the node
                removeWaiter(q);
                return state;
            }
            // Park for at most nanos nanoseconds, then loop again
            LockSupport.parkNanos(this, nanos);
        }
        else
            // Park without a time limit. When another thread's run() has assigned
            // the outcome, finishCompletion unparks every waiting thread
            LockSupport.park(this);
    }
}

// Simple linked list nodes that record waiting threads in a Treiber stack.
static final class WaitNode {
    volatile Thread thread;
    volatile WaitNode next;
    // Sets thread to the current thread
    WaitNode() { thread = Thread.currentThread(); }
}
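The interrupt branch of awaitDone can be observed from the outside. In the sketch below, a thread waits in get() on a task that nobody has run yet and is then interrupted; the waiter is expected to see an InterruptedException while the task itself stays usable. InterruptedWaiterSketch and interruptWaiter are names made up for this illustration.

```java
import java.util.concurrent.ExecutionException;
import java.util.concurrent.FutureTask;
import java.util.concurrent.atomic.AtomicReference;

// Illustrative helper, not part of the JDK
final class InterruptedWaiterSketch {
    static String interruptWaiter() {
        FutureTask<String> task = new FutureTask<>(() -> "late result");
        AtomicReference<String> seenByWaiter = new AtomicReference<>("nothing");
        Thread waiter = new Thread(() -> {
            try {
                seenByWaiter.set(task.get());   // the task is still NEW, so this waits
            } catch (InterruptedException e) {
                seenByWaiter.set("interrupted");
            } catch (ExecutionException e) {
                seenByWaiter.set("failed");
            }
        });
        waiter.start();
        // Whether the interrupt arrives before or while the waiter is parked,
        // awaitDone notices it and throws InterruptedException
        waiter.interrupt();
        try {
            waiter.join();
            task.run();                          // interrupting a waiter does not cancel the task
            return seenByWaiter.get() + " / " + task.get();
        } catch (InterruptedException | ExecutionException e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(interruptWaiter());
    }
}
```

Only the waiting thread is affected: interrupting a waiter is different from calling cancel(true), which targets the thread running the task.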
java.util.concurrent.FutureTask#removeWaiter
This method removes the wait nodes of threads that timed out or were interrupted. It works on the Treiber stack: the top node is removed with a CAS, internal nodes are simply unlinked.
/**
 * Tries to unlink a timed-out or interrupted wait node to avoid accumulating garbage.
 * Internal nodes are simply unspliced without CAS, since it is harmless if they are
 * traversed anyway by releasers. To avoid effects of unsplicing from already removed
 * nodes, the list is retraversed in case of an apparent race.
 */
private void removeWaiter(WaitNode node) {
    // If the wait node is not null
    if (node != null) {
        // Clear the node's thread to mark the node as removable
        node.thread = null;
        retry:
        for (;;) {          // restart on removeWaiter race
            // Traverse the stack from the top
            for (WaitNode pred = null, q = waiters, s; q != null; q = s) {
                // s is the successor of q
                s = q.next;
                // q.thread is non-null: the node still has a blocked thread, so keep it
                if (q.thread != null)
                    // pred records the last node that is being kept
                    pred = q;
                // q.thread is null and q has a predecessor, so q is an internal node
                else if (pred != null) {
                    // Unsplice q: pred -> q -> s becomes pred -> s
                    // No CAS is needed for an internal node
                    pred.next = s;
                    // pred.thread == null means pred itself was removed by another thread in the meantime
                    if (pred.thread == null) // check for race
                        // Restart the outer loop, i.e. re-read waiters and traverse again
                        continue retry;
                }
                // q.thread is null and q is the top of the stack: CAS waiters from q to s
                else if (!UNSAFE.compareAndSwapObject(this, waitersOffset,
                                                      q, s))
                    // The CAS failed because the top changed; re-read waiters and traverse again
                    continue retry;
            }
            // A full traversal finished without detecting a race
            break;
        }
    }
}
4 run method
java.util.concurrent.FutureTask#run
This method calls the call method of the Callable object and assigns the returned result to the outcome variable; if call throws an exception, the exception is assigned to outcome instead.
// Sets this Future to the result of its computation unless it has been cancelled.
public void run() {
    // Return if the state is not NEW, or if the state is NEW but the CAS on runner fails;
    // this means only one thread can get past this check
    if (state != NEW ||
        !UNSAFE.compareAndSwapObject(this, runnerOffset,
                                     null, Thread.currentThread()))
        return;
    try {
        Callable<V> c = callable;
        // If callable is not null and the state is still NEW
        if (c != null && state == NEW) {
            V result;
            boolean ran;
            try {
                // Call the call method to get the result
                result = c.call();
                ran = true;
            } catch (Throwable ex) {
                result = null;
                ran = false;
                // Set outcome to ex
                setException(ex);
            }
            // ran is true: call completed without an exception
            if (ran)
                // Set outcome to result
                set(result);
        }
    } finally {
        // runner must be non-null until state is settled to
        // prevent concurrent calls to run()
        runner = null;
        // state must be re-read after nulling runner to prevent
        // leaked interrupts
        int s = state;
        // If the state is INTERRUPTING or INTERRUPTED
        if (s >= INTERRUPTING)
            // Spins with Thread.yield() until the state is no longer INTERRUPTING
            handlePossibleCancellationInterrupt(s);
    }
}

// Sets the result to return
protected void set(V v) {
    // CAS the state from NEW to COMPLETING
    if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {
        // Assign the result to outcome
        outcome = v;
        // The result has been assigned; change the state to NORMAL
        UNSAFE.putOrderedInt(this, stateOffset, NORMAL); // final state
        // Release all blocked threads and call the done method
        finishCompletion();
    }
}

// Sets the exception to report
protected void setException(Throwable t) {
    // CAS the state from NEW to COMPLETING
    if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {
        // Assign the exception to outcome
        outcome = t;
        // The exception has been assigned; change the state to EXCEPTIONAL
        UNSAFE.putOrderedInt(this, stateOffset, EXCEPTIONAL); // final state
        // Release all blocked threads and call the done method
        finishCompletion();
    }
}
java.util.concurrent.FutureTask#finishCompletion
This method releases all the threads waiting for the result and empties the waiters stack.
// Removes and signals all waiting threads,
// then calls done() and nulls out callable.
private void finishCompletion() {
    // assert state > COMPLETING;
    // Loop while waiters is non-null: because of concurrency, other threads
    // may still be pushing nodes onto the stack at this point
    for (WaitNode q; (q = waiters) != null;) {
        // CAS waiters to null
        if (UNSAFE.compareAndSwapObject(this, waitersOffset, q, null)) {
            // Traverse the list starting from the top node
            for (;;) {
                Thread t = q.thread;
                if (t != null) {
                    q.thread = null;
                    // Release the blocked thread
                    LockSupport.unpark(t);
                }
                // Continue traversing the list
                WaitNode next = q.next;
                if (next == null)
                    // End of the list: exit the loop
                    break;
                q.next = null; // unlink to help gc
                q = next;
            }
            break;
        }
    }
    // Empty method; an extension point for subclasses
    done();
    // Null out the task to help GC
    callable = null;        // to reduce footprint
}
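The effect of finishCompletion can be checked with a small subclass that counts calls to done(). This is a sketch under stated assumptions: DoneHookSketch, CountingTask, and runWithWaiters are names invented here, and the waiter threads may or may not have parked by the time run() is called, which does not change the outcome.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.FutureTask;
import java.util.concurrent.atomic.AtomicInteger;

// Illustrative helper, not part of the JDK
final class DoneHookSketch {
    /** FutureTask subclass that counts how often the done() hook fires. */
    static final class CountingTask extends FutureTask<Integer> {
        final AtomicInteger doneCalls = new AtomicInteger();

        CountingTask(Callable<Integer> callable) {
            super(callable);
        }

        @Override
        protected void done() {
            doneCalls.incrementAndGet();
        }
    }

    /** Returns {number of waiters that received the result, number of done() calls}. */
    static int[] runWithWaiters(int waiterCount) {
        CountingTask task = new CountingTask(() -> 42);
        AtomicInteger released = new AtomicInteger();
        Thread[] waiters = new Thread[waiterCount];
        for (int i = 0; i < waiterCount; i++) {
            waiters[i] = new Thread(() -> {
                try {
                    if (task.get() == 42) {
                        released.incrementAndGet();
                    }
                } catch (InterruptedException | ExecutionException e) {
                    // not expected in this sketch; the waiter is simply not counted
                }
            });
            waiters[i].start();
        }
        task.run();   // set() -> finishCompletion(): unparks queued waiters, then calls done()
        try {
            for (Thread waiter : waiters) {
                waiter.join();
            }
        } catch (InterruptedException e) {
            throw new IllegalStateException(e);
        }
        task.run();   // the state is no longer NEW, so this returns at once and done() is not called again
        return new int[] {released.get(), task.doneCalls.get()};
    }

    public static void main(String[] args) {
        int[] counts = runWithWaiters(3);
        System.out.println("released=" + counts[0] + ", doneCalls=" + counts[1]);
    }
}
```

Every waiter gets the value, whether it was parked on the stack or arrived after completion, and done() fires exactly once.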
4. Points to Note
- The state of a FutureTask only moves forward and never goes back, and the computation can run only once: once a terminal state (NORMAL, EXCEPTIONAL, CANCELLED, or INTERRUPTED) has been reached, it cannot change.
- If we want to run a task repeatedly, we can extend FutureTask and call the runAndReset method, which leaves the state at NEW. Note that this method does not assign outcome; see ScheduledFutureTask, a subclass of FutureTask.
- Before calling the get() method, always remember to call the run() method or submit the task to a thread pool (which ultimately calls run()); otherwise the calling thread blocks indefinitely. For this reason the get(long, TimeUnit) method is recommended.
- If we want to extend FutureTask, we can subclass it: it leaves the done method as an extension point and has several other protected methods.
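The second and third points can be tried out together. The sketch below exposes the protected runAndReset() through a small subclass, then shows that the task is still not done and that a timed get() gives up instead of blocking forever. RunAndResetSketch, Resettable, runOnceAndReset, and the 50 ms timeout are assumptions made for this illustration.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.FutureTask;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicInteger;

// Illustrative helper, not part of the JDK
final class RunAndResetSketch {
    /** Subclass that exposes the protected runAndReset() for the demonstration. */
    static final class Resettable extends FutureTask<Integer> {
        Resettable(Callable<Integer> callable) {
            super(callable);
        }

        boolean runOnceAndReset() {
            return runAndReset();
        }
    }

    static String demo() {
        AtomicInteger calls = new AtomicInteger();
        Resettable task = new Resettable(calls::incrementAndGet);
        task.runOnceAndReset();   // runs call(), discards the result, state stays NEW
        task.runOnceAndReset();
        String timedGet;
        try {
            // No outcome has been set, so an untimed get() would block indefinitely here
            timedGet = "got " + task.get(50, TimeUnit.MILLISECONDS);
        } catch (TimeoutException e) {
            timedGet = "timed out";
        } catch (InterruptedException | ExecutionException e) {
            timedGet = "failed";
        }
        String beforeRun = "calls=" + calls.get() + ", done=" + task.isDone()
                + ", timed get=" + timedGet;
        task.run();               // the normal path: sets the outcome and reaches a terminal state
        try {
            return beforeRun + ", after run()=" + task.get();
        } catch (InterruptedException | ExecutionException e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```

Only the final run() makes a result available; the two earlier executions ran the Callable but left nothing for get() to return.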
5. Reference Articles
- Treiber Stack simple analysis