You have one thought, I have one thought, and when we exchange, one person has two thoughts
If you can NOT explain it simply, you do NOT understand it well enough
Now the Demo code and technical articles are organized together Github practice selection, convenient for everyone to read and view, this article is also included in this, feel good, please also Star🌟
preface
How many ways can you create a thread? The answer to this question is easy to blurt out
- Thread class inheritance
- Implement the Runnable interface
But these two ways to create threads are “three products” :
- No parameters
- No return value
- No way to throw an exception
class MyThread implements Runnable{
@Override
public void run(a) {
log.info("my thread");
}
} Copy the code
The Runnable interface is a core artifact of JDK1.0
/ * * * @since JDK1.0
* /
@FunctionalInterface
public interface Runnable {
public abstract void run(a); } Copy the code
There are always some drawbacks to using a “three nothing product”, of which not being able to get a return value is one of the most annoying, and Callable was born
Callable
It’s Master Doug Lea again, and the magic of Java 1.5
/ * * * @see Executor
* @since 1.5
* @author Doug Lea
* @param <V> the result type of method {@code call} * / @FunctionalInterface public interface Callable<V> { V call(a) throws Exception; } Copy the code
Callable is a generic interface with a single call() method that returns the generic value V and is used like this:
Callable<String> callable = () -> {
// Perform some computation
Thread.sleep(2000);
return "Return some result";
};
Copy the code
Both Runnable and Callable are functional interfaces that have only one method in them.
Runnable VS Callable
Both interfaces are designed for multithreading tasks, but there are significant differences between them
Enforcement mechanism
Runnable can be used in both the Thread class and the ExecutorService class with Thread pools. Bu~~~~t, Callable is available only from the ExecutorService. You cannot find Callable from the Thread class
Exception handling
There is no throws on the signature of the RUN method on the Runnable interface, so the checked exceptions cannot be propagated upward. However, Callable’s Call () method signature has throws, so it can handle checked exceptions.
So the main differences are as follows:
The overall difference is small, but the difference is significant
Returning values and handling exceptions is easy to understand, and in practice we often use thread pools to manage threads (the reason is already why? ExecutorService), so how do you use both
ExecutorService
Take a look at the ExecutorService class diagram first
I have put the method marked above in its own place here
void execute(Runnable command);
<T> Future<T> submit(Callable<T> task);
<T> Future<T> submit(Runnable task, T result);
Future<? > submit(Runnable task);Copy the code
As you can see, the execute() method of the ExecutorService still returns no value, while the Submit () method uniformly returns a Future value
The submit() method has a silly distinction between CountDownLatch and CyclicBarrier. It’s used several times in this article, but we didn’t get the return value, so
- What exactly is Future?
- How do I get the return value from that?
Let’s take these questions one by one
Future
The Future is an interface that contains only five methods:
You can already see what these methods do from their names
// Cancel the task
boolean cancel(boolean mayInterruptIfRunning);
// Obtain the task execution result
V get(a) throws InterruptedException, ExecutionException;
// Get the execution result of the task with a timeout limit V get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException; // Check whether the task has been canceled boolean isCancelled(a); // Determine whether the task has ended boolean isDone(a); Copy the code
So with all this stuff going on, you might be a little bit confused by this, but let’s just go through an example and show you how some of these things work
@Slf4j
public class FutureAndCallableExample {
public static void main(String[] args) throws InterruptedException, ExecutionException {
ExecutorService executorService = Executors.newSingleThreadExecutor();
// Use Callable to get the return value Callable<String> callable = () -> { log.info("Call method into Callable"); // Simulate subthread task, sleep 2s here, // Minor detail: Since the call method throws exceptions, there is no need to try/catch like the Runnable run method Thread.sleep(5000); return "Hello from Callable"; }; log.info("Submit Callable to thread pool"); Future<String> future = executorService.submit(callable); log.info("Main thread continues execution"); log.info("Main thread waiting for Future result"); // Future.get() blocks until the result is available String result = future.get(); log.info("Main thread gets Future result: {}", result); executorService.shutdown(); } } Copy the code
The running results of the program are as follows:
If you run the example code above, the main thread calling the future.get() method will block itself until the subtask completes. We can also use the isDone method provided by the Future method, which can be used to check whether the task has completed. Let’s make some minor changes to the above procedure:
// If the child thread does not terminate, sleep 1s to re-check
while(! future.isDone()) { System.out.println("Task is still not done...");
Thread.sleep(1000);
}
Copy the code
To see the results:
If the subroutine is running too long, or for some other reason, and we want to cancel the subroutine, we can continue to make some changes to the program using the Cancel method provided by the Future
while(! future.isDone()) { System.out.println("Child thread task not finished yet...");
Thread.sleep(1000);
double elapsedTimeInSec = (System.nanoTime() - startTime)/1000000000.0;
// Cancel the child thread if the program runs for more than 1s if(elapsedTimeInSec > 1) { future.cancel(true); } } Copy the code
To see the results:
Why does a CancellationException appear when you call the Cancel method? Because when the get() method is called, it explicitly states:
When the get() method is called, a CancellationException is thrown if the result of the calculation is cancelled (as you will see in the source code analysis below).
It is very unprofessional not to handle exceptions, so we need to further modify the program to handle exceptions in a friendlier way
// Determine whether the program has been cancelled by isCancelled. If cancelled, the log is printed. If not, get() is called normally
if(! future.isCancelled()){ log.info("Child thread task completed");
String result = future.get();
log.info("Main thread gets Future result: {}", result);
}else { log.warn("Child thread task cancelled"); } Copy the code
View the running results of the program:
While Future is an interface, the executorService.submit () method always returns FutureTask, the implementation class of Future
Let’s dive into the core implementation class to see what’s going on
FutureTask
Again, let’s look at the class structure
public interface RunnableFuture<V> extends Runnable.Future<V> {
void run(a);
}
Copy the code
FutureTask implements the RunnableFuture interface. The RunnableFuture interface implements the Runnable and Future interfaces respectively, so it can be inferred that FutureTask has the characteristics of both interfaces:
- There are
Runnable
Properties, so can be used inExecutorService
In conjunction with thread pools - There are
Future
Property, so you can get execution results from it
FutureTask source code analysis
If you’ve read the full AQS analysis, you may have noticed that reading the source code of the Java concurrent utility class is all about the following three points:
- State (primary control of code logic) - queue (waiting queue) - CAS (secure set value)Copy the code
With these three points in mind, let’s take a look at the FutureTask source code and see how it implements the logic around these three points
As mentioned earlier in this article, threads that implement Runnable do not get the return value, but those that implement Callable do. Therefore, FutureTask must be associated with Callable if it wants to get the return value. As can be seen from the constructor:
public FutureTask(Callable<V> callable) {
if (callable == null)
throw new NullPointerException();
this.callable = callable;
this.state = NEW; // ensure visibility of callable
} Copy the code
The FutureTask constructor converts the thread to the Callable type by running the Executors. Callable factory method:
public FutureTask(Runnable runnable, V result) {
this.callable = Executors.callable(runnable, result);
this.state = NEW; // ensure visibility of callable
}
Copy the code
But FutureTask implements the Runnable interface, which means that the run() method can only be overridden, and the run() method does not return a value.
- How does FutureTask get the return value in the run() method?
- Where does it put the return value?
- How does the get() method get this return value?
Let’s take a look at the run() method (the key code is annotated)
public void run(a) {
// If the status is not NEW, the task has been executed or cancelled
// If the state is NEW, try to save the thread of execution in runnerOffset (runner field), and return directly if assignment fails
if(state ! = NEW ||! UNSAFE.compareAndSwapObject(this, runnerOffset,
null, Thread.currentThread())) return; try { // Get the Callable value passed in by the constructor Callable<V> c = callable; if(c ! =null && state == NEW) { V result; boolean ran; try { // The return value can be obtained by calling the call method of Callable normally result = c.call(); ran = true; } catch (Throwable ex) { result = null; ran = false; Save the exception thrown by the call method setException(ex); } if (ran) // Save the result of the call method set(result); } } finally { runner = null; int s = state; // If the task is interrupted, interrupt processing is performed if (s >= INTERRUPTING) handlePossibleCancellationInterrupt(s); } } Copy the code
The run() method does not return a value. How does the run() method save both the call() method’s return and the exception? In fact, it is very simple to use set(result) to save the normal program running results, or use setException(ex) to save the program exception information
/** The result to return or exception to throw from get() */
private Object outcome; // non-volatile, protected by state reads/writes
// Save the exception result
protected void setException(Throwable t) {
if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) { outcome = t; UNSAFE.putOrderedInt(this, stateOffset, EXCEPTIONAL); // final state finishCompletion(); } } // Save normal results protected void set(V v) { if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) { outcome = v; UNSAFE.putOrderedInt(this, stateOffset, NORMAL); // final state finishCompletion(); } } Copy the code
SetException and SET methods are very similar in that they both save exceptions or results in the Outcome variable of Object type. The outcome is a member variable, which requires thread safety, so they need to set the value of the outcome variable through CAS. Since the value of the outcome is changed after CAS is successful, this is why the outcome is not modified by volatile.
Save the normal result value (set method) and save the abnormal result value (setException method) two method code logic, the only difference is the state passed by CAS is different. As we mentioned above, state is mostly used to control code logic, as is FutureTask, so to understand code logic, we need to understand state changes
/ * *
* Possible state transitions:
* NEW -> COMPLETING -> NORMAL //* NEW -> EXCEPTIONAL (实 习* NEW -> CANCELLED //* NEW -> interrupt -> INTERRUPTED // The thread is INTERRUPTED during execution* / private volatile int state; private static final int NEW = 0; private static final int COMPLETING = 1; private static final int NORMAL = 2; private static final int EXCEPTIONAL = 3; private static final int CANCELLED = 4; private static final int INTERRUPTING = 5; private static final int INTERRUPTED = 6; Copy the code
Seven states, don’t panic, the whole state flow is really only four lines
The FutureTask object is created, and the state is NEW. As you can see from the constructor above, the four final states are NORMAL, EXCEPTIONAL, CANCELLED, and INTERRUPTED. Two intermediate states are slightly confusing:
- COMPLETING: Outcome is being set
- INTERRUPTING: When a thread is being interrupted by the cancel(true) method
In general, these two intermediate states represent an instantaneous state. Let’s graphically show several states:
Now that we know how the run() method saves results, and now that we know how to store normal/abnormal outcomes in the outcome variable, we need to look at how FutureTask gets results using the get() method:
public V get(a) throws InterruptedException, ExecutionException {
int s = state;
If state has not reached the set outcome, the awaitDone() method is called to block itself
if (s <= COMPLETING)
s = awaitDone(false.0L);
// Return the result return report(s); } Copy the code
The awaitDone method is one of the core methods of FutureTask
The // get method supports timeout limits, and if no timeout is passed, the arguments taken are false and 0L
InterruptedException (InterruptedException) InterruptedException (InterruptedException
private int awaitDone(boolean timed, long nanos)
throws InterruptedException {
// Calculate the wait cutoff time
final long deadline = timed ? System.nanoTime() + nanos : 0L; WaitNode q = null; boolean queued = false; for (;;) { // If the current thread is interrupted, if so, the node is deleted while waiting and InterruptedException is thrown if (Thread.interrupted()) { removeWaiter(q); throw new InterruptedException(); } int s = state; // A final state (normal end/abnormal end/cancellation) has been achieved // Empty thread and return the result if (s > COMPLETING) { if(q ! =null) q.thread = null; return s; } COMPLETING the task (实 实) : COMPLETING the task well (实 实); COMPLETING the task well (实 实) : COMPLETING the task well (实 实); COMPLETING the task well (实 实); COMPLETING the task well (实 实); else if (s == COMPLETING) // cannot time out yet Thread.yield(); // Wait node is empty else if (q == null) // Construct the node from the current thread q = new WaitNode(); // If the server is not yet queued, please add the current node to the first node and replace the original waiters else if(! queued) queued = UNSAFE.compareAndSwapObject(this, waitersOffset, q.next = waiters, q); // If the timeout period is set else if (timed) { nanos = deadline - System.nanoTime(); // When the time is up, no longer wait for the result if (nanos <= 0L) { removeWaiter(q); return state; } // Block waiting for a specific time LockSupport.parkNanos(this, nanos); } else // Suspend the current thread until it is woken up by another thread LockSupport.park(this); } } Copy the code
In general, entering this method usually takes three rounds
- In the first for loop, the logic is
q == null
, a new node q will be created, and the first cycle will end. - In the second for loop, the logic is
! queue
“, the next pointer to the waiters generated in the first run of the loop points to the waiters, and the CAS node replaces node Q with the waiters, which means that a newly generated node is added to the first node in the waiters. If the replacement succeeds, queued=true. The second cycle ends. - The third for loop blocks and waits. Either block for a certain amount of time, or block until it is woken up by another thread.
For the second loop, if you’re a little bit confused, as we said earlier, if there’s a block, there’s a queue, and if there’s a queue, there’s a queue, and FutureTask also maintains a queue
/** Treiber stack of waiting threads */
private volatile WaitNode waiters;
Copy the code
The wait queue is really a Treiber stack, and as a stack, it works like a pistol’s magazine (imagine the bullet inside the magazine), so the second loop adds a newly generated node to the front of the waiters Stack
If the program is running properly, the usual call to get() suspends the current thread. Who wakes it up? Naturally, the run() method will wake up after it has run, and the finishCompletion method will be called in either of the methods that return the result (set method) or the exception (setException method), which will wake up the thread in the waiting queue
private void finishCompletion(a) {
// assert state > COMPLETING;
for(WaitNode q; (q = waiters) ! =null;) {
if (UNSAFE.compareAndSwapObject(this, waitersOffset, q, null)) {
for (;;) {
Thread t = q.thread; if(t ! =null) { q.thread = null; // Wake up the thread in the wait queue LockSupport.unpark(t); } WaitNode next = q.next; if (next == null) break; q.next = null; // unlink to help gc q = next; } break; } } done(); callable = null; // to reduce footprint } Copy the code
There are only three ways to set the state of a task to terminated:
- set
- setException
- cancel
Now that the first two methods have been analyzed, let’s look at the Cancel method
Look at the Future Cancel (), which is annotated to specify three cases in which the Cancel operation must fail
- The mission has been carried out
- The mission has already been canceled
- The task cannot be cancelled for some reason
In other cases, cancel returns true. It is worth noting that the fact that the cancel operation returns true does not mean that the task is actually canceled, depending on the state in which the task is at the time the Cancel state is invoked
- If the task is not already running when cancel is issued, the task will not be executed subsequently;
- This is required if the task is already running when cancel is initiated
mayInterruptIfRunning
The parameters are:- If mayInterruptIfRunning is true, the current task is interrupted
- If mayInterruptIfRunning is false, the task in progress can be allowed to continue running until it finishes
With that in mind, take a look at the logic of the Cancel code
public boolean cancel(boolean mayInterruptIfRunning) {
if(! (state == NEW && UNSAFE.compareAndSwapInt(this, stateOffset, NEW,
mayInterruptIfRunning ? INTERRUPTING : CANCELLED)))
return false; try { // in case call to interrupt throws exception // The task execution thread needs to be interrupted if (mayInterruptIfRunning) { try { Thread t = runner; // Interrupt the thread if(t ! =null) t.interrupt(); } finally { // final state // Change to the final status INTERRUPTED UNSAFE.putOrderedInt(this, stateOffset, INTERRUPTED); } } } finally { // Wake up the waiting thread finishCompletion(); } return true; } Copy the code
Now that the core method has been analyzed, let’s take a tea break
I mean, use FutureTask to practice the classic procedure of boiling water and making tea
As shown above:
- Wash the kettle for one minute
- Boil water for 15 minutes
- Wash the teapot for 1 minute
- Wash the cup for 1 minute
- Hold the tea for 2 minutes
In the end make tea
Let me do the math in my head, it would take a total of 20 minutes in serial, but obviously we can wash the teapot/cup/tea while the water is boiling
In this era of high concurrency, there are too many things that can be done in 4 minutes. It is inevitable to learn how to use Future optimization program (in fact, optimization program is to find the critical path, and the critical path was found. Non-critical path tasks can usually be executed in parallel with the contents of the critical path.
@Slf4j
public class MakeTeaExample {
public static void main(String[] args) throws ExecutionException, InterruptedException {
ExecutorService executorService = Executors.newFixedThreadPool(2);
// Create thread 1's FutureTask FutureTask<String> ft1 = new FutureTask<String>(new T1Task()); // Create thread 2's FutureTask FutureTask<String> ft2 = new FutureTask<String>(new T2Task()); executorService.submit(ft1); executorService.submit(ft2); log.info(ft1.get() + ft2.get()); log.info("Start making tea"); executorService.shutdown(); } static class T1Task implements Callable<String> { @Override public String call(a) throws Exception { log.info("T1: Wash the kettle..."); TimeUnit.SECONDS.sleep(1); log.info("T1: Boil water..."); TimeUnit.SECONDS.sleep(15); return T1: Boiling water is ready.; } } static class T2Task implements Callable<String> { @Override public String call(a) throws Exception { log.info("T2: Wash the teapot..."); TimeUnit.SECONDS.sleep(1); log.info("T2: Wash the teacup..."); TimeUnit.SECONDS.sleep(2); log.info("T2: Get the tea..."); TimeUnit.SECONDS.sleep(1); return "T2: Fuding White Tea got it"; } } } Copy the code
Thread 1 takes longer to boil the water. Thread 1 wants to get the tea when the water boils. How to make the tea?
We just need to get the result of thread 2’s FutureTask from thread 1. Let’s modify the program slightly:
@Slf4j
public class MakeTeaExample1 {
public static void main(String[] args) throws ExecutionException, InterruptedException {
ExecutorService executorService = Executors.newFixedThreadPool(2);
// Create thread 2's FutureTask FutureTask<String> ft2 = new FutureTask<String>(new T2Task()); // Create thread 1's FutureTask FutureTask<String> ft1 = new FutureTask<String>(new T1Task(ft2)); executorService.submit(ft1); executorService.submit(ft2); executorService.shutdown(); } static class T1Task implements Callable<String> { private FutureTask<String> ft2; public T1Task(FutureTask<String> ft2) { this.ft2 = ft2; } @Override public String call(a) throws Exception { log.info("T1: Wash the kettle..."); TimeUnit.SECONDS.sleep(1); log.info("T1: Boil water..."); TimeUnit.SECONDS.sleep(15); String t2Result = ft2.get(); log.info("T1 gets the {} of T2 and starts making tea.", t2Result); return "T1: 上茶!!!"; } } static class T2Task implements Callable<String> { @Override public String call(a) throws Exception { log.info("T2: Wash the teapot..."); TimeUnit.SECONDS.sleep(1); log.info("T2: Wash the teacup..."); TimeUnit.SECONDS.sleep(2); log.info("T2: Get the tea..."); TimeUnit.SECONDS.sleep(1); return "Fuding White Tea"; } } } Copy the code
To see the results of the program:
With this change in mind, let’s go back to the three submit methods of the ExecutorService:
<T> Future<T> submit(Runnable task, T result);
Future<? > submit(Runnable task);<T> Future<T> submit(Callable<T> task);
Copy the code
The first method, layer by layer code, is shown here:
You’ll notice that, in a similar way to the way we thought about our program for boiling water and making tea, you can pass in a result that acts as a bridge between the main thread and the child thread, through which the main child thread can share data
The second method argument is of type Runnable, which returns null even when the get() method is called, so it can only be used to declare that the task has ended, similar to thread.join ().
The third method argument is of type Callable, and the return value of the call() method can be explicitly retrieved through the get() method
That’s the end of the whole lecture on Future, which needs to be digested briefly
conclusion
For those of you familiar with Javascript, Future’s features are similar to Javascript promises, and private banter often compares it to a boyfriend’s Promise
Back to Java, let’s take a look at the evolution of the JDK, and talk about the birth of Callable, which makes up for the absence of Runnable returns, through a simple demo of Callable and Future use. FutureTask is the core implementation class of the Future interface, through reading the source code to understand the entire implementation logic, and finally combined with FutureTask and thread pool demonstration of boiling water and tea procedures, I believe that here, you can easily obtain the results of the thread
It is very simple to boil water and make tea. If the business logic is more complex, using Future in this way will definitely bring great chaos (there is no active notification at the end of the program, and the link and integration of Future need to be manually operated). In order to solve this shortcoming, yes, Doug Lea again, the CompletableFuture utility class appears in Java1.8, and with the use of Lambda, it makes writing asynchronous programs as easy as writing serial code
Now let’s look at the use of CompletableFuture
Soul asking
- How do you divide tasks and collaborate on a daily basis? Are there any basic rules?
- How do you batch asynchronous tasks?
reference
- Java concurrent programming practice
- The art of Concurrent programming in Java
- The beauty of Concurrent programming in Java