“This is the ninth day of my participation in the First Challenge 2022. For details: First Challenge 2022”
Today, I found the use of CompletableFuture in the project when READING the project code. Although I have known about this class for a long time, I can also use it, but I have never explored the implementation logic of the code. Today, I have a chance to pick this class from the inside out, hoping to explain it clearly.
Thread,Runnable, Callable
1.1 The concept of threads
Something about thread first, I want to beginner students all know what is the thread, the thread is in order to improve the CPU efficiency, prevent blocking the execution of the unit, for example, you are cooking, for example, found that no soy sauce in the home, you have two choices, one is to stop what live, to play soy sauce, another way is that call your son to play soy sauce, Your son is a new thread, helping you solve your problems. This is a task.
1.2 Thread Creation
All things can be found in life, as we can see from the above example, a thread is an execution unit, can be concretely understood as a person, and the task to be done can be understood as a task, so migrating to a program we can understand threads.
Thread – >
Runnable-> No reply task
Callable -> Reply task
The difference between Runnable and Callable is that there is no reply to the result. For example, when issuing a command in development, there is no reply to the result. When storing the database, there is no concern about the result. This is where callable is appropriate.
1.2.1 Creating a Thread directly
Create Thread by inheriting Thread, overwrite the implementation of run method, write task code in run, start Thread.
class MyThread extends Thread{ // Inherits the Thread class as the implementation class of the Thread
private String name ; // Represents the name of the thread
public MyThread(String name){
this.name = name ; // Configure the name attribute using the constructor
}
public void run(a){ // Override the run() method as a thread task
for(int i=0; i<10; i++){ System.out.println(name +"Run, I ="+ i) ; }}};public class ThreadDemo{
public static void main(String args[]){
MyThread mt1 = new MyThread("Coriander A");// instantiate the object
MyThread mt2 = new MyThread("Coriander B");// instantiate the object
mt1.start() ; // Start the task
mt2.start() ; // Start the task}}Copy the code
1.2.2 Implementing Runnable definition tasks
Runnable is an interface, or specification, that defines the basic method run for a task, which in some cases is called a contract.
In this way, the task and thread are separated, and the thread is told what task to perform when it is started. Only define tasks, which can be executed by any thread
class MyThread implements Runnable{ // Implement the Runnable interface
public void run(a){ // Override the run() method
while(true){
System.out.println(Thread.currentThread().getName() + "It's running."); }}};public class ThreadDemo{
public static void main(String args[]){
MyThread mt = new MyThread() ; // instantiate the Runnable subclass object
Thread t = new Thread(mt,"Thread"); // Instantiate the Thread object
t.setDaemon(true);// This thread runs in the background
t.start() ; // Start the thread}};Copy the code
1.2.3 Using Callable to create a task that can return a value
Implement the Callable interface. Methods can return values and throw exceptions as opposed to the Runnable interface.
Using Callable requires FutureTask’s support. Again, using the example above, you tell your son to get soy sauce and put it directly in the top left corner of the table when he comes back. When you find soy sauce in the top left corner of the table during cooking, the task is completed.
FutureTask can be understood as a place of convention where a thread executes and puts the result in the Result container of FutureTask. How can there be a Future without understanding it?
public class TestCallable {
public static void main(String[] args) {
ThreadDemo td = new ThreadDemo();
//1. Perform Callable, FutureTask implementation class support, used to receive operation results.
FutureTask<Integer> result = new FutureTask<>(td);
new Thread(result).start();
//2. Receive the result of the thread operation
try {
Integer sum = result.get(); //FutureTask can be used to latch something like CountDownLatch, which will not execute until all threads have executed
System.out.println(sum);
System.out.println("-- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -");
} catch(InterruptedException | ExecutionException e) { e.printStackTrace(); }}}class ThreadDemo implements Callable<Integer> {
@Override
public Integer call(a) throws Exception {
int sum = 0;
for (int i = 0; i <= 100000; i++) {
sum += i;
}
returnsum; }}Copy the code
Summary: When understanding multithreading, it is mainly mapped to the requirements of real life. Thread is a person, and what needs to be done is a task, which can be divided into the task with reply and the task without reply, which is also divided into Runnable and Callable.
2, function, consumer
The second topic is actually the new features of Java8. In my previous work experience, I found that many people are still stuck in the syntax of Java6, and are not willing to learn or accept the new syntax features. The reason is that they do not understand and cannot grasp the new syntax features well.
I will master 41 classes in lambda and Function in one article. I believe I can master all functions after reading it.
In a word, all functions are functionally the same as defined functions, but they only define a common structure, equivalent to a common rule, as long as you fill in the code, and this group of functions can be divided into different requirements
Function A Function that returns a value, using the accept Function
Consumer must have functions that take arguments, using the apply function
Supplier has a function that returns a value, using the get function
Executtor, Executors, ExecutorService
3.1 Why use thread pools
Thread pool that is known to all, it is in order to improve the efficiency, we can live analogy, if open a shop, need a few employees, normal operations are hiring, rather than the use of temporary workers every day, so out of fire, hiring cost is too high for the owner, also need training, I think normally wouldn’t do it, the same is true with a thread pool, The overhead of creating and destroying threads is avoided.
3.2 Executor
A top-level interface in a Java thread pool that defines a method executor that receives a Runnable object. The method is signed Executor (Runnable Command) and receives a Runable instance that is used to perform a task. A task is a class that implements the Runnable interface. In general, within executors, threads can be created using executors without being explicitly created:
executor.execute(new RunnableTask()); 3.3 ExecutorService: Is a subclass interface that uses a broader scope than Executor, providing methods for life cycle management, methods for returning Future objects, and methods for returning Future by tracking the execution of one or more asynchronous tasks; You can shutdown the ExecutorService by calling the shutdown () method of the ExecutorService, which causes the ExecutorService to stop accepting any new tasks and wait for committed tasks to complete. (committed tasks fall into two categories: ExecutorService is disabled when all committed tasks are completed. Therefore, we generally use this interface to implement and manage multithreading.
3.4 Executors
Executors are static factory classes. It returns objects of the ExecutorService, ScheduledExecutorService, ThreadFactory, and Callable classes through static factory methods.
NewScheduledThreadPool Specifies the thread pool for periodic execution
NewCachedThreadPool Cache used threads
NewFixedThreadPool Specifies a fixed number of thread pools
NewWorkStealingPool a thread pool that breaks up large tasks into smaller ones
You can see that some specific thread pool models are provided that you can use according to your needs.
4, CompletableFuture
Finally, what I want to say today, basically all of the above can be found in my blog related topics, today mainly talk about CompletableFuture.
4.1 CompletionStage
The interface to the CompletionStage generally returns a new CompletionStage, which means that after executing some logic, a new CompletionStage is generated, forming a chained staged operation.
The interface methods of CompletionStage can be classified from a variety of angles, but
To classify by the names and functions of subordinate functions
Each function name can be split into three paragraphs, with the first word indicating when to fire, the second describing the relationship between stages, and the last indicating how to execute
The arguments to the function can be seen as function with return values, Consumer and runnable
4.2 CompletableFuture
Let’s start with an example. You can run the following example to see how soy sauce works, without affecting your cooking.
public static void main(String[] args) throws InterruptedException {
CompletableFuture
// Let the son go to the soy sauce
.supplyAsync(()-> {
try {
System.out.println("Son runs to get soy sauce.");
TimeUnit.SECONDS.sleep(1);
System.out.println("Soy sauce is ready.");
} catch (InterruptedException e) {
e.printStackTrace();
}
return "Soy sauce";
})
// Let me know
.thenAccept(oil->{
System.out.println("Sauce for cooking :" + oil);
});
System.out.println("Keep cooking.");
Thread.currentThread().join();
}
Copy the code
A completetableFuture represents a task, given its relationship to the Future by its name. Using future requires waiting for isDone to be true before the task is complete. Or blocking when called with the get method. CompletableFuture can be used to prevent blocking and polling isDone by using then, when and so on.
4.3 look at the source code in the end what is going on
4.3.1 What about thread pools
There is no thread pool. There is no thread pool.
Open the source code and take a look.
public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier) {
return asyncSupplyStage(asyncPool, supplier);
}
Copy the code
Supplier is a function that returns a value, and in the above example we reversed a soy string.
In the return statement above we see an asyncPool being used. What is that? Let’s look for the definition
/** * Default executor -- ForkJoinPool.commonPool() unless it cannot * support parallelism. */
private static final Executor asyncPool = useCommonPool ?
ForkJoinPool.commonPool() : new ThreadPerTaskExecutor();
Copy the code
From the code comments above, we can see that this is the default thread pool, ForkJoinPool.commonPool(). This shows that the default thread pool is used. The default number of threads created by this thread pool is the number of CPU cores
4.3.2 How serial?
The CompletionStage’s purpose is for chained programming, so you can guess that the CompletionStage plays an important role. Take a look at the source code.
/ / 1
public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier) {
return asyncSupplyStage(asyncPool, supplier);
}
/ / 2
static <U> CompletableFuture<U> asyncSupplyStage(Executor e, Supplier f) {
if (f == null) throw new NullPointerException();
CompletableFuture<U> d = new CompletableFuture<U>();
e.execute(new AsyncSupply<U>(d, f));
return d;
}
Copy the code
AsyncSupplyStage () calls the specified thread pool and executes (new AsyncSupply(d,f)). ThenApply () relies on the source task for subsequent logical operations. F is the functional programming of Supplier.
static final class AsyncSupply<T> extends ForkJoinTask<Void>
implements Runnable.AsynchronousCompletionTask {
CompletableFuture<T> dep; Supplier<T> fn;
AsyncSupply(CompletableFuture<T> dep, Supplier<T> fn) {
this.dep = dep; this.fn = fn;
}
public final Void getRawResult(a) { return null; }
public final void setRawResult(Void v) {}
public final boolean exec(a) { run(); return true; }
public void run(a) {
CompletableFuture<T> d; Supplier<T> f;
if((d = dep) ! =null&& (f = fn) ! =null) {
dep = null; fn = null;
if (d.result == null) {
try {
d.completeValue(f.get());
} catch(Throwable ex) { d.completeThrowable(ex); } } d.postComplete(); }}}Copy the code
In the run() method. In the run() method, check d.result == null to determine whether the task has completed, in case another thread completes the task in the case of concurrency. F.et () is the functional programming of the called Supplier.
The main thread returns D, our “dependent task,” in the asyncSupplyStage() method, which is still blocked. Then the main thread continues to execute the thenAccept of CompletableFuture (Comsumer<? Super T> Action), and then calls the uniAcceptStage() method of The CompletableFuture.
CompletableFuture has a “source task” and a “dependent task”, and the completion of the “source task” triggers the execution of the “dependent task”, which can either return a normal result or an exception.
conclusion
CompletableFuture is a default thread pool, and supports chained programming, eliminating the relationship between threads class, in multithreaded programming can reduce the amount of code, reduce the thread call, recommended
Code word is not easy, ask for three, ask for support, power generation for love