Introduction
To make a program more efficient and keep the CPU busy, we turn to asynchronous programming. The first thing that comes to mind is starting a new thread to do the work. When the new thread needs to hand a result back and tell the main thread that the job is done, Future comes into play. However, Future only lets the main thread actively ask the worker for its result; a callback would be far nicer. To make up for these shortcomings of Future, Java 8 introduced the powerful CompletableFuture.
Future
Traditional multithreading does make a program more efficient: it is asynchronous, after all, and lets the CPU do its full share of work. But that only covers threads whose results the main thread never needs, for example a thread started just to calculate 1 + ... + n and print the result. Sometimes the main thread needs the child thread to return the result of a calculation so that it can continue computing with it, and that is where Future is needed.
In this example, the main thread computes 2 + 4 + 6 + 8 + 10, the child thread computes 1 + 3 + 5 + 7 + 9, and finally the main thread adds the two results together.
import java.util.concurrent.Callable;

public class OddNumber implements Callable<Integer> {
    @Override
    public Integer call() throws Exception {
        Thread.sleep(3000); // simulate a slow computation
        int result = 1 + 3 + 5 + 7 + 9;
        return result;
    }
}
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class FutureTest {
    public static void main(String[] args) {
        ExecutorService executor = Executors.newCachedThreadPool();
        OddNumber oddNumber = new OddNumber();
        Future<Integer> future = executor.submit(oddNumber);
        long startTime = System.currentTimeMillis();
        int evenNumber = 2 + 4 + 6 + 8 + 10;
        try {
            Thread.sleep(1000);
            System.out.println("0. Here we go: " + (System.currentTimeMillis() - startTime) + " ms");
            int oddNumberResult = future.get(); // blocks here until the child thread finishes
            System.out.println("1 + 2 + ... + 9 + 10 = " + (evenNumber + oddNumberResult));
            System.out.println("1. Here we go: " + (System.currentTimeMillis() - startTime) + " ms");
        } catch (Exception e) {
            System.out.println(e);
        } finally {
            executor.shutdown(); // release the pool so the JVM can exit promptly
        }
    }
}
Output:
0. Here we go: 1001 ms
1 + 2 + ... + 9 + 10 = 55
1. Here we go: 3002 ms
The Future interface itself has only five simple methods:
// Attempts to cancel the task; returns false if it could not be cancelled,
// typically because it has already completed
boolean cancel(boolean mayInterruptIfRunning);
// Returns true if the task was cancelled before it completed normally
boolean isCancelled();
// Returns true if the task has finished
boolean isDone();
// Blocks until the task finishes, then returns its result
V get() throws InterruptedException, ExecutionException;
// Same as above, but with a timeout to prevent blocking for too long
V get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException;
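To make these methods concrete, here is a minimal sketch of waiting for a result with the timed get() instead of blocking indefinitely. The class name FutureMethodsDemo and the helpers slowOddSum and awaitWithTimeout are made up for this illustration, and the 500 ms and 100 ms durations are arbitrary.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

class FutureMethodsDemo {

    // Hypothetical helper: the same odd-number sum as above, with a shorter sleep
    static int slowOddSum() throws InterruptedException {
        Thread.sleep(500);
        return 1 + 3 + 5 + 7 + 9;
    }

    // Waits with a bounded get(), reporting isDone() each time the wait times out
    static int awaitWithTimeout(Future<Integer> future) throws Exception {
        while (true) {
            try {
                return future.get(100, TimeUnit.MILLISECONDS);
            } catch (TimeoutException e) {
                System.out.println("Not finished yet, isDone() = " + future.isDone());
            }
        }
    }

    public static void main(String[] args) throws Exception {
        ExecutorService executor = Executors.newSingleThreadExecutor();
        try {
            Callable<Integer> task = FutureMethodsDemo::slowOddSum;
            Future<Integer> future = executor.submit(task);
            System.out.println("Result: " + awaitWithTimeout(future));
            System.out.println("isDone() = " + future.isDone());
            // Cancelling a task that has already finished fails and returns false
            System.out.println("cancel() = " + future.cancel(true));
            System.out.println("isCancelled() = " + future.isCancelled());
        } finally {
            executor.shutdown();
        }
    }
}
```

Even with the timeout, the main thread is still the one doing the asking, which is exactly the limitation described above.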
CompletableFuture
Since it is called the main thread, the main thread should be in charge: rather than repeatedly asking, I would prefer that the child thread notify me when it finishes something. To that end, Java 8 introduced CompletableFuture, a class that implements Future. The most fascinating thing about CompletableFuture is not only that it vastly enriches Future's capabilities, but that it fits neatly with Java 8's functional style, the same lambda-based approach used by streams.
Implementing callbacks and automatic follow-up operations
Here is how CompletableFuture implements a callback, using thenAccept():
public CompletableFuture<Void> thenAccept(Consumer<? super T> action) {
    return uniAcceptStage(null, action);
}
The parameter is a Consumer, which relies on a Java 8 feature called behavior parameterization: an argument no longer has to be a primitive value or an ordinary object, but can also be a piece of behavior, passed as a lambda expression or a method reference.
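As a small illustration of behavior parameterization, the sketch below passes three different behaviors to the same thenAccept() call. The class BehaviorParameterDemo and its deliver helper are hypothetical names used only for this example; completedFuture() is used so that the callback runs immediately.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.function.Consumer;

class BehaviorParameterDemo {

    // Hands the value to whatever behavior the caller supplies, then waits for the callback
    static void deliver(int value, Consumer<Integer> action) {
        CompletableFuture.completedFuture(value).thenAccept(action).join();
    }

    public static void main(String[] args) {
        List<Integer> seen = new ArrayList<>();
        deliver(25, n -> System.out.println("lambda received " + n)); // a lambda expression
        deliver(25, System.out::println);                             // a method reference
        deliver(25, seen::add);                                       // a method reference on an object
        System.out.println("collected: " + seen);
    }
}
```

The deliver method never changes; only the behavior handed to it does.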
import java.util.function.Supplier;

public class OddNumberPlus implements Supplier<Integer> {
    @Override
    public Integer get() {
        try {
            Thread.sleep(3000); // simulate a slow computation
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        return 1 + 3 + 5 + 7 + 9;
    }
}
import java.util.concurrent.CompletableFuture;

public class CompletableFutureTest {
    public static void main(String[] args) {
        long startTime = System.currentTimeMillis();
        final int evenNumber = 2 + 4 + 6 + 8 + 10;
        CompletableFuture<Integer> oddNumber = CompletableFuture.supplyAsync(new OddNumberPlus());
        try {
            Thread.sleep(1000);
            System.out.println("0. Here we go: " + (System.currentTimeMillis() - startTime) + " ms");
            // Register the callback; it runs automatically once oddNumber completes
            CompletableFuture<Void> callback = oddNumber.thenAccept(oddNumberResult -> {
                System.out.println("1. Here we go: " + (System.currentTimeMillis() - startTime) + " ms");
                System.out.println("At this time, the calculation result is: " + (evenNumber + oddNumberResult));
            });
            // Wait for the callback stage so the JVM does not exit before it has run
            callback.get();
        } catch (Exception e) {
            System.out.println(e);
        }
    }
}
Output:
0. Here we go: 1006 ms
1. Here we go: 3006 ms
At this time, the calculation result is: 55
It is worth noting that this example does not specify a thread pool. By default, the task runs on ForkJoinPool.commonPool(), falling back to a new thread per task when the common pool's parallelism is not greater than 1:
private static final Executor asyncPool = useCommonPool ?
    ForkJoinPool.commonPool() : new ThreadPerTaskExecutor();
ForkJoinPool arrived in JDK 7 as the fork/join framework. It recursively splits a task into many subtasks that can be executed in parallel, and pairs this with a powerful work-stealing algorithm, which greatly improves efficiency. Of course, you can also specify the thread pool yourself.
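Specifying the pool yourself means using the two-argument overload supplyAsync(Supplier, Executor). The following is a minimal sketch; the class name CustomExecutorDemo, the runOn helper, the pool size of 2 and the thread name "my-pool-worker" are all choices made for this illustration.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

class CustomExecutorDemo {

    // Runs a supplier on the given executor and returns the name of the thread that ran it
    static String runOn(ExecutorService executor) {
        return CompletableFuture
                .supplyAsync(() -> Thread.currentThread().getName(), executor)
                .join();
    }

    public static void main(String[] args) {
        // A small fixed pool whose threads carry a recognizable name
        ExecutorService executor = Executors.newFixedThreadPool(2, r -> new Thread(r, "my-pool-worker"));
        try {
            System.out.println("Custom executor thread: " + runOn(executor));
            // Without the second argument the task goes to the default pool
            String defaultThread = CompletableFuture
                    .supplyAsync(() -> Thread.currentThread().getName())
                    .join();
            System.out.println("Default executor thread: " + defaultThread);
        } finally {
            executor.shutdown(); // a pool you create is yours to shut down
        }
    }
}
```

Unlike the common pool, a pool created this way uses non-daemon threads by default, so it has to be shut down explicitly for the JVM to exit.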
Combining CompletableFutures
Java 8 does enrich the Future implementation, and CompletableFuture offers many methods, but the example above does nothing that a plain Future could not do. After all, calling get() on a CompletableFuture still blocks, so I could just as well take the return value and do the follow-up work by hand (although that makes the main method rather cluttered).
The next task, however, is genuinely troublesome with Future. Suppose the main method only has to add the odd sum and the even sum, and both sums are computed asynchronously by two child threads ahead of time. What do we need to do? We start two threads and perform the final addition once both have finished. But how do we know which child thread finishes last? We could poll isDone() on each of them. CompletableFuture's rich API instead gives us a way to wait for both child threads to finish and then add the results:
// asyncPool is the default thread pool mentioned above (ForkJoinPool.commonPool())
public <U,V> CompletableFuture<V> thenCombineAsync(
    CompletionStage<? extends U> other,
    BiFunction<? super T,? super U,? extends V> fn) {
    return biApplyStage(asyncPool, other, fn);
}
Here’s an example:
import java.util.function.Supplier;

public class OddCombine implements Supplier<Integer> {
    @Override
    public Integer get() {
        try {
            Thread.sleep(3000); // the slower of the two tasks
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        return 1 + 3 + 5 + 7 + 9;
    }
}
import java.util.function.Supplier;

public class EvenCombine implements Supplier<Integer> {
    @Override
    public Integer get() {
        try {
            Thread.sleep(1000); // the faster of the two tasks
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        return 2 + 4 + 6 + 8 + 10;
    }
}
import java.util.concurrent.CompletableFuture;

public class CompletableCombineTest {
    public static void main(String[] args) throws Exception {
        CompletableFuture<Integer> oddNumber = CompletableFuture.supplyAsync(new OddCombine());
        CompletableFuture<Integer> evenNumber = CompletableFuture.supplyAsync(new EvenCombine());
        long startTime = System.currentTimeMillis();
        // The lambda runs only after both futures have completed
        CompletableFuture<Integer> resultFuture = oddNumber.thenCombine(evenNumber, (odd, even) -> {
            return odd + even;
        });
        System.out.println(resultFuture.get());
        System.out.println("0. Here we go: " + (System.currentTimeMillis() - startTime) + " ms");
    }
}
Output:
55
0. Here we go: 3000 ms
This example simulates the two tasks with a 1-second sleep and a 3-second sleep, whereas real network requests take a variable amount of time; either way, the combined result is ready as soon as the slower task finishes. Isn't that cool? What's cool is not the effect itself, but the fact that the operation is expressed in Java 8's functional style, by passing a lambda to thenCombine().
If two child threads are not enough, there is also the **allOf()** function, which accepts any number of CompletableFutures and completes only when all of them have completed.
public static CompletableFuture<Void> allOf(CompletableFuture<?>... cfs) {
    return andTree(cfs, 0, cfs.length - 1);
}
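Because allOf() returns a CompletableFuture<Void>, the individual results still have to be read from the original futures. The sketch below shows one way to do that; the class AllOfDemo, the sumAll helper and the third task returning 11 are invented for this example.

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletableFuture;

class AllOfDemo {

    // Waits until every future has completed, then adds up their results
    static int sumAll(List<CompletableFuture<Integer>> futures) {
        CompletableFuture<Void> all =
                CompletableFuture.allOf(futures.toArray(new CompletableFuture[0]));
        all.join(); // blocks until the slowest task is done
        // Every future is complete by now, so these join() calls return immediately
        return futures.stream().mapToInt(CompletableFuture::join).sum();
    }

    public static void main(String[] args) {
        CompletableFuture<Integer> odd = CompletableFuture.supplyAsync(() -> 1 + 3 + 5 + 7 + 9);
        CompletableFuture<Integer> even = CompletableFuture.supplyAsync(() -> 2 + 4 + 6 + 8 + 10);
        CompletableFuture<Integer> extra = CompletableFuture.supplyAsync(() -> 11);
        System.out.println(sumAll(Arrays.asList(odd, even, extra))); // prints 66
    }
}
```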
There is a similar method that completes as soon as the first of the tasks finishes, without waiting for the rest; any single completion is sufficient.
public static CompletableFuture<Object> anyOf(CompletableFuture<?>... cfs) {
    return orTree(cfs, 0, cfs.length - 1);
}
Building on the example above, make the OddNumberPlus class sleep longer:
import java.util.function.Supplier;

public class OddNumberPlus implements Supplier<Integer> {
    @Override
    public Integer get() {
        try {
            Thread.sleep(5000); // now the slowest of the three tasks
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        return 1 + 3 + 5 + 7 + 9;
    }
}
import java.util.concurrent.CompletableFuture;

public class CompletableCombineTest {
    public static void main(String[] args) throws Exception {
        CompletableFuture<Integer> oddNumber = CompletableFuture.supplyAsync(new OddCombine());
        CompletableFuture<Integer> evenNumber = CompletableFuture.supplyAsync(new EvenCombine());
        CompletableFuture<Integer> testNumber = CompletableFuture.supplyAsync(new OddNumberPlus());
        long startTime = System.currentTimeMillis();
        // Completes with the result of whichever task finishes first
        CompletableFuture<Object> resultFuture = CompletableFuture.anyOf(oddNumber, evenNumber, testNumber);
        System.out.println(resultFuture.get());
        System.out.println("0. Here we go: " + (System.currentTimeMillis() - startTime) + " ms");
    }
}
Output:
30
0. Here we go: 1000 ms
Summary
CompletableFuture has many other methods, such as runAsync(), which is similar to supplyAsync() but does not return a value. Besides thenAccept(), there is thenApply(), which transforms the result and passes the new value on to the next stage. There are also runAfterBoth() and runAfterEither(), whose behavior is clear from their names. There are many more, and you can check out the source code of the CompletableFuture class. Working with CompletableFuture gave me a better sense of how powerful Java 8 is. Streams, behavior parameterization, efficient parallelism and more not only make Java more enjoyable to write, but also enrich the whole Java ecosystem. Java keeps improving, which is why it has not become obsolete. We Java developers can continue our careers thanks to Java, and keep progressing together with it.
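As a quick tour of the methods named in this summary, here is a minimal sketch. The class OtherMethodsDemo, the doubledOddSum helper and the printed messages are made up for illustration; the relative order of the first two lines of output is not guaranteed, since the two tasks run concurrently.

```java
import java.util.concurrent.CompletableFuture;

class OtherMethodsDemo {

    // thenApply() transforms a result and hands the new value to the next stage
    static int doubledOddSum() {
        return CompletableFuture.supplyAsync(() -> 1 + 3 + 5 + 7 + 9)
                .thenApply(sum -> sum * 2)
                .join();
    }

    public static void main(String[] args) {
        // runAsync() takes a Runnable, so the resulting future carries no value
        CompletableFuture<Void> first = CompletableFuture.runAsync(() -> System.out.println("first task ran"));
        CompletableFuture<Void> second = CompletableFuture.runAsync(() -> System.out.println("second task ran"));

        // runAfterBoth() fires once both stages are done
        first.runAfterBoth(second, () -> System.out.println("both tasks finished")).join();
        // runAfterEither() fires once at least one of the stages is done
        first.runAfterEither(second, () -> System.out.println("at least one task finished")).join();

        System.out.println(doubledOddSum()); // prints 50
    }
}
```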