Scan the qr code below or search the wechat official account, cainiao Feiyafei, you can follow the wechat official account, read more Spring source code analysis and Java concurrent programming articles.
A Runnable and Callable
As we all know, when running a Runnable task with a thread, we do not support getting a return value because the Runnable interface’s run() method is void qualified and does not support returning values. In many scenarios, we need to execute tasks asynchronously through threads to improve performance, and expect to obtain the execution results of tasks. Especially in RPC framework, asynchronously obtaining the return value of a task is almost a function to be realized by every RPC interface. At this point, it became clear that Runnable was no longer enough for our needs, hence Callable.
Like Runnable, Callable is an interface that has only one method: call(), except that the call() method of Callable has a return value. The return value is a generic type, which is specified when the Callable object is created.
public interface Callable<V> {
V call(a) throws Exception;
}
Copy the code
Runnable objects can be passed into the Thread constructor to run Runnable tasks. Callable interfaces cannot be passed into threads to run tasks. Callable interfaces are usually used in conjunction with Thread pools. In addition to the execute() method, the thread pool ThreadPoolExecutor provides three overloaded methods of submit() to submit tasks, all of which return values. The ThreadPoolExecutor class inherits the Abstract AbstractExecutorService class, which defines three methods for the submit() overload. The specific definition is as follows.
The method name | instructions |
---|---|
Future<? >submit(Runnable task) |
This method returns a Future, but is used because it submits a task of type RunnableFuture.get() Null is returned when the result is retrieved. |
Future submit(Runnable task,T result) |
The return value object of the method is Future, which passesFuture.get() When we get the concrete return value, the result is equal to the second argument to the method, result. |
Future submit(Callable task) |
The argument to this method is oneCallable type The method has a return value. Call future.get () to get the value returned by the Call () method of the Callable interface |
The three overloaded methods of submit() return values of type Future. What is Future?
The Future with FutureTask
When a task is submitted to the thread pool, we may need to get the return value of the task, or we may want to know if the task is completed, or we may even need to cancel the task due to special circumstances.
Under the JUC package, we are provided with a utility class: Future. Future is an interface that provides five methods. When a task is submitted to the thread pool via submit(), the thread pool returns an object of type Future. We can obtain the state of the task in the thread pool through these five methods of the Future object. These methods are defined as follows.
The method name | instructions |
---|---|
boolean cancel(boolean mayInterruptIfRunning) |
The mayInterruptIfRunning parameter is used to indicate whether the thread needs to be interrupted. If true is passed to indicate that the thread needs to be interrupted, the status of the task is set toINTERRUPTING ; If false, the state of the task is set toCANCELLED (About the status of the taskINTERRUPTING andCANCELLED Explained later) |
boolean isCancelled() |
Checks if the task has been canceled, returning true indicates that the task has been canceled |
boolean isDone() |
Determine if the task is complete |
V get() |
Getting the return value of the task blocks the current thread until it gets the return value of the task |
V get(long timeout, TimeUnit unit) |
Retrieves the return value of the task as a timeout, or throws a TimeoutException if the task return value is not retrieved within the timeout period |
The Future interface has a concrete implementation class: FutureTask. In fact, the three submit() overloaded methods of ThreadPoolExecutor return objects of type Future that are instance objects of FutureTask. The UML diagram for FutureTask is shown below.
As can be seen from the UML diagram, FutureTask directly implements the RunnableFuture interface, which in turn inherits the Runnable and Future interfaces. Therefore, FutureTask is both a Runnable and a Future type. When you call the Submit () method to submit a task to the thread pool, you end up encapsulating the task as a FutureTask object, whether you submit a task of type Runnable or Callable. Future
submit(Callable
task) method as an example, look at the source code.
public <T> Future<T> submit(Callable<T> task) {
if (task == null) throw new NullPointerException();
// Call newTaskFor() to encapsulate the Callable task into a FutureTask
RunnableFuture<T> ftask = newTaskFor(task);
// Execute the task
execute(ftask);
return ftask;
}
// newTaskFor
protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) {
// Directly new a FutureTask object
return new FutureTask<T>(callable);
}
Copy the code
When the submit() method submits a Runnable task, another overloaded method of the newTaskFor() method is called to encapsulate the task as a FutureTask object, so all futureTask-type tasks are executed in the thread pool. (Note that the Runnable object will eventually be wrapped into a Callable object by Executors. Callable (). Executors. Callable () works by using the adapter mode, which is the RunnableAdapter class)
Now that we know how futures relate to these classes, how does a thread pool perform a FutureTask? And how does the future.get () method get the return value of the task?
Design principles and data structures
To understand how the methods of the Future interface work, you must first understand the data interface of FutureTask and how it is designed.
Since you want to get the state of the tasks in the pool from other threads outside the thread pool, and the tasks in the thread pool are of type FutureTask, there must be a variable related to the task state in the FutureTask object.
Very important attributes are defined in FutureTask. See the following table.
attribute | meaning |
---|---|
int state | The state variable is used to store the status of a task. Its value ranges from 0 to 6 and 7. Each value has a different meaning |
Callable callable | Callable indicates the tasks we submitted. Runnable tasks will be converted to Callable by Executors.callable() |
Object outcome | Used to hold the return value of Callable’s call() method |
Thread runner | Saves the thread executing the current task |
WaitNode waiters | The wait queue is used to hold the threads waiting to fetch the return value of the task. When we call the future.get () method on the main thread, we wrap the main thread as a WaitNode. When multiple threads call the future.get () method at the same time, WaitNode maintains a linked list through the next property |
There are seven possible values of state. The meaning of each value is commented in the code below.
private volatile int state;
// The initial state of the task. When a NEW FutureTask is created, the state value defaults to NEW
private static final int NEW = 0;
// A task is in completion. What is in completion? There are two scenarios
The task has been completed normally by the thread, but the return value has not yet been assigned to the outcome attribute
// 2. An exception occurred during execution of the task, was caught, and then handled, before assigning the exception object to the outcome attribute
private static final int COMPLETING = 1;
The task will be in NORMAL state after the task is completed and the outcome attribute is assigned to the returned value of the task
private static final int NORMAL = 2;
// The task is abnormal and the exception object is assigned to the outcome attribute
private static final int EXCEPTIONAL = 3;
Cancle (false) cancels the task
private static final int CANCELLED = 4;
Cancle (true) cancels the task, but before the thread interrupts
private static final int INTERRUPTING = 5;
Cancle (true) cancels the task, but after the thread is interrupted
private static final int INTERRUPTED = 6;
Copy the code
Although the state of a task has 7 values, it can be roughly divided into three categories: initial state, intermediate state and final state. The relationship between these states is shown in the figure below.
- When a task is submitted to the thread pool, its initial state is
NEW
; After a task is successfully executed, the task status is set toCOMPLETING
; The return value of the task (that is, the return value of Callable’s call() method) is then assigned to FutureTask’soutcome
Property to set the state of the task to when assignment is completeNORMAL
. This is a normal process of task execution, which is shown in the corresponding figure1.
The line shown. - When a task is submitted to the thread pool, an exception occurs during the execution of the task, and the state of the task is changed from
NEW
Set toCOMPLETING
; Then assign the exception object tooutcome
Property, and when assignment is complete, set the task state toEXCEPTIONAL
. This is the case where the task is abnormal, which is the corresponding diagram2.
The line shown. - When a task is submitted to the thread pool, if cancle() is called on the Future object, the state of the task will be changed directly from false when cancle() is passed in
NEW
Set toCANCELLED
, which is the corresponding graph3.
The corresponding route. - When the cancle() method passes true, the task state is first set to
INTERRUPTING
; The thread executing the current task is then calledinterrupt()
Method, and finally set the task status toINTERRUPTED
That’s the picture(4)
The corresponding line.
Source code analysis
When the submit() method is called to submit a task to the thread pool, the newTaskFor() method is called to encapsulate the task into a FutureTask object, and the execute() method is called to execute the task. The execute() method starts the Worker thread first, and when it starts, the thread’s runWorker() method is called. The task.run() method is eventually called in the runWorker() method, which is FutureTask’s run() method. A detailed source code analysis of this step can be found in the article ThreadPoolExecutor Implementation
Let’s just examine the futureTask.run () method. In the run() method, the call() method of the Callable property is eventually called. When the task completes, the set() method of FutureTask is called to update the state of the task, save the return value of the task, and finally wake up the waiting thread that fetched the result of the task. If an exception occurs, the setException() method is called to update the task state, save the exception, and wake up the waiting thread. Here is the source code for the run() method, which I have stripped down to keep only the core logic.
public void run(a) {...try {
Callable<V> c = callable;
if(c ! =null && state == NEW) {
V result;
boolean ran;
try {
// Execute the task
result = c.call();
ran = true;
} catch (Throwable ex) {
Exception exception exception exception exception state set to EXCEPTIONAL
setException(ex);
}
if (ran)
// Set the task state to COMPLETING, save the return value, and finally set it to NORMALset(result); }}finally {
// Other processing. }}Copy the code
For the SET () and setException() methods, it is simple to update the state of the task via CAS, assign the return value of the task to the outcome attribute, and wake up the threads in the waiting queue formed by the waiters attribute by calling the finishCompletion() method. (For more information about CAS, see the following two articles: Implementation principles of CAS and the unaddressed source code for the Unsafe class.)
Next, analyze the execution process of future.get () method with specific source code. When the future.get () method is called, the Get () method of FutureTask is called. In the get() method, the task is first checked to see if it is complete, if it is, the result is returned, and if it is not, it waits.
public V get(a) throws InterruptedException, ExecutionException {
int s = state;
// If the state is in the NEW or COMPLETING state, the task has not been completed and needs to wait
if (s <= COMPLETING)
// awaitDone() to wait
s = awaitDone(false.0L);
// Return the result
return report(s);
}
Copy the code
Call report(s) to return the result. In the report() method, the system checks whether the task is in NORMAL state first, that is, whether the task is successfully executed. The system returns the result only when the task is successfully executed.
private V report(int s) throws ExecutionException {
Object x = outcome;
// Return only when the task ends normally
if (s == NORMAL)
return (V)x;
if (s >= CANCELLED)
throw new CancellationException();
throw new ExecutionException((Throwable)x);
}
Copy the code
When the task is in the NEW or COMPLETING state, it means that the task is in execution or the return value of the task has not been assigned to the outcome attribute. Therefore, it is necessary to enter the wait state, that is, call awaitDone(). In the awaitDone() method, there is an infinite for loop, first determining whether the task is in the COMPLETING state. Those in the COMPLETING state, let the current line first abandon the scheduling power of the CPU. That is because the transition from COMPLETING NORMAL, or any other state, is so short that the current line must abandon CPU scheduling so that other threads can obtain CPU resources. The state of the task will most likely change to NORMAL or some other final state, and since the code is in the for loop, it will enter the next loop). The COMPLETING task must be completed in the COMPLETING state, and the thread must park wait. Determined by the argument passed in to the awaitDone() method. When do these threads wake up after the park() method is called? When the task’s state reaches its final state, the finishCompletion() method is called to wake up the waiting threads.)
Here is part of the source code for the awaitDone() method, which I’ve stripped down to keep only the main logic.
private int awaitDone(boolean timed, long nanos)
throws InterruptedException {...for(;;) {...// Task in COMPLETING those tasks, let the current line first temporarily relinquishes CPU execution
else if (s == COMPLETING) // cannot time out yetThread.yield(); .else if (timed) {
// Calculate the timeout
nanos = deadline - System.nanoTime();
if (nanos <= 0L) {
removeWaiter(q);
return state;
}
// Wait a while
LockSupport.parkNanos(this, nanos);
}
else
/ / wait for
LockSupport.park(this); }}Copy the code
conclusion
- This article mainly introduces the difference between Runnable interface and Callable interface. Runnable interface has no return value and can be executed by Thread directly. The latter has a return value that cannot be executed directly by Thread and needs to be executed through the Thread pool.
- It then introduces the five methods of the Future interface, as well as several important properties and data structures of its implementation class, FutureTask. Both Runnable and Callable objects, when submitted to the thread pool, are encapsulated into a FutureTask and executed. For Future usage scenarios, there are a lot of applications in Netty and Dubbo.
- Finally, the realization principle of FutureTask’s GET () method and RUN () method is introduced in detail with the source code.
recommended
- The implementation principle of ReadWriteLock
- Semaphore source code analysis and use scenarios
- Concurrency tool class CountDownLatch source analysis and usage scenarios
- CyclicBarrier concurrency tool class source analysis and usage scenarios
- Do wait() and notify() always come in pairs? Thread.join()
- How ThreadPoolExecutor is implemented
- Why is the Alibaba Java Development Manual banning the use of Executors to create thread pools