Wechat public account: Tangerine Java Technology pit article first gold mining platform, follow-up synchronization update public account, after attention reply “group” can join the Internet technology exchange & internal push group, and discuss interview problems with a group of big factory executives. Reply “666” to obtain all information packs available on the First-line Internet (including development software, development specifications, classic e-PDFs, and some premium learning courses).

preface

Java8 is already very common in everyday development code, and it can be used well in development with a few lines of simplified code to accomplish the required functionality. Today I’ll show you how to use CompletableFuture in a production environment. The CompletableFuture class was introduced as an improvement for the Java 8 Concurrency API, and those familiar with it should know that there are also some improvements for CompletableFuture in Java 9. To read this article, you need to know the knowledge of functional programming, the principle of thread pools, and so on. For those of you who are not familiar with it, you can read the previous article, so let’s get started.

Scenario simulation

In order to better express, we will explain with examples. Suppose that Xiaotangerine received the TL task today, and required to complete the function of real-time data pulling. After the task was completed, it was told that the pulling was complete. Suppose that the pull data needs to be obtained from services A, B and C, and service D needs to be invoked after the pull and push is completed.

Requirement change 1: Pull data needs to be obtained from service E, but will depend on the results obtained from service A. Requirement change 2:10,000 + data can be pulled from service A at A time, but the performance of service E can not support large calls, so limited flow is the bottom line on the Provider side. Requirement Change 3: Data integrity must be ensured during data pulling. Statistical errors must not occur.

This can be implemented using the Future provided by JDK5.0. We will pull the data from the three service interfaces A, B, C into the FutureTask, asynchronously execute to get the data results, and then call the D service synchronously. OK, simple implementation of this function is no problem, but what is the defect, what can be improved? We can see from the source code comments that the result returned by the Future class needs to be blocked until the get method returns the result. It provides the isDone() method to detect whether the asynchronous operation has finished, and the get() method to wait for the asynchronous operation to finish and get the result of the calculation. When all Future tasks are complete, the thread is notified to get the results and merge them.

In terms of performance, you need to wait for all tasks in the Future set to be completed (no problem, see below), and in terms of robustness, the Futrue interface has no way to compute combinations or handle possible errors. In terms of functional extension, the Future interface cannot perform multiple asynchronous computations independently of each other, while the second depends on the results of the first. Today’s main character, CompletableFuture, can fulfill all of these functions, with about 50 different constructs that combine, perform asynchronous computation steps, and handle errors. (It is not realistic to learn all the methods, the soul and core methods can be processed according to law)

CompletableFuture API to use

API too many, simple list. The focus of this article is not on the API

// After the execution of task A, execute B. B does not depend on the result of A and B does not return A result.
CompletableFuture.supplyAsync(() -> "resultA").thenRun(() -> {}); 

/** Perform B after task A is executed. B depends on the result of A and B does not return A result */
CompletableFuture.supplyAsync(() -> "resultA").thenAccept(resultA -> {});

/** After the execution of task A, execute B. B depends on the result of A and B returns the result */
CompletableFuture.supplyAsync(() -> "resultA").thenApply(resultA -> resultA + " resultB");
Copy the code

CompletableFuture<String> completableFuture = CompletableFuture.supplyAsync(() -> "orange")
.thenCompose(s -> CompletableFuture.supplyAsync(() -> s + " csong"));
//trueassertEquals("orangecsong", completableFuture.get());
Copy the code

Does the thenCompose method not implement the result merge calculation the same as thenApply? In Java 8, the Stream and Optional classes map and flatMap methods have similar design ideas. Both take a CompletableFuture and apply it to the result of the calculation, but the thenCompose (flatMap) method takes a function that returns another CompletableFuture object of the same type.

CompletableFuture<String> completableFuture = CompletableFuture.supplyAsync(() -> "orange")
.thenCombine(CompletableFuture.supplyAsync(() -> " chizongzi"), (s1, s2) -> s1 + s2));
// assertEquals("orange chizongzi", completableFuture.get());
Copy the code

The thenCombine method is designed to concatenate results when you want to use more than one result and subsequent processing depends on the return value at the same time, the first result returns “orange”, the second result returns “chizongzi”, So the result is orange Chizongzi. You might ask what if the results don’t have to be dealt with? ThenAcceptBoth will be able to implement your functionality. So what’s the difference between that and thenApply? The thenCompose() method takes the previous Future as a parameter. This will directly make the result a new Future, not the nested Future we found in thenApply(), but a connection between two CompletableFutures that will generate a new CompletableFuture. Therefore, If you want to continue nesting the link CompletableFuture method, then it’s best to use thenCompose().

 public static CompletableFuture<Void> allOf(CompletableFuture
       ... cfs){... }Copy the code

When we need to execute multiple tasks in parallel, we usually want to wait for all of them to execute and then process their combined results. CompletableFuture provides the allOf static method that allows you to wait for all completed tasks, but it returns type CompletableFuture. The limitation is that it does not return a comprehensive result for all tasks. Instead, you have to manually get the results from Futures. To solve this problem, CompletableFuture provides a join() to solve this problem, and here the Stream implementation can also be used.

String multiFutures= Stream.of(future1, future2, future3).map(CompletableFuture::join)
.collect(Collectors.joining("")); assertEquals("Today is sun", multiFutures);
Copy the code

So how does the CompletableFuture handle exceptions?

public CompletableFuture<T> exceptionally(Function<Throwable, ? extends T> fn);
public <U> CompletionStage<U> handle(BiFunction<? super T, Throwable, ? extends U> fn);
Copy the code

If there is an exception in obtaining resultA,resultB, or resultC

CompletableFuture.supplyAsync(() -> "resultA").thenApply(resultA -> resultA + " resultB")
.thenApply(resultB -> resultB + " resultC")

CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {throw newRuntimeException(); }) .exceptionally(ex ->"errorResultA")
.thenApply(resultA -> resultA + " resultB")
.thenApply(resultB -> resultB + " resultC")
Copy the code

In the above code, task A throws an exception. It then handles the exception through the exceptionally() method and returns A new result, which is passed on to task B. If the inovke Future. join method results in “errorResultA resultB result C”, the above method is basically the use of the underlying functional API, Talk is cheap, show me code. Smart orange guys, let’s do it!

At the end of this article, we talk about a large number of calls, which are sequential invoke calls. In fact, we analyze how asynchronous calls with CompletableFuture need to be implemented.

CompletableFuture of actual combat

public class AsyncInvokeUtil {

    private AsyncInvokeUtil(a) {}

    / * * *@paramParamList Source data (data carrier to be processed) *@paramBuildParam transfer function (take the result and do a trans to satisfy the call service condition) *@paramTransParam transfer function (get the result to do a trans layer to satisfy the call service condition) *@paramProcessFunction Transfers the processing function *@paramSize Batch size *@paramExecutorService exposes an external custom implementation thread pool (if demo is not empty, you can make it optional) *@param <R>
     * @param <T>
     * @param <P>
     * @param <k>
     * @return
     * @throws ExecutionException
     * @throws InterruptedException
     */
    public static <R, T, P, k> List<R> partitionAsyncInvokeWithRes(List
       
         paramList, Function
        
         , P> buildParam, Function
         

> transParam, Function ,List > processFunction, Integer size, ExecutorService executorService)

,>
throws ExecutionException, InterruptedException
{ List<CompletableFuture<List<R>>> completableFutures = Lists.partition(paramList, size).stream() .map(buildParam) .map(transParam) .map(eachList -> CompletableFuture.supplyAsync(() -> processFunction.apply(eachList), executorService)) .collect(Collectors.toList()); //get CompletableFuture<Void> finishCompletableFuture = CompletableFuture.allOf(completableFutures.toArray(new CompletableFuture[0])); finishCompletableFuture.get(); return completableFutures.stream().map(CompletableFuture::join) .filter(Objects::nonNull).reduce(new ArrayList<>(), (resList1, resList2) -> { resList1.addAll(resList2); returnresList1; }); }}Copy the code

The last

  • Articles are original, original is not easy, thanks to the mining platform, feel fruitful, help three even ha, thank you
  • Wechat search public number: Orange pine Java technology nest, make a friend, enter the Internet technology exchange group
  • All the code, sequence diagram and architecture diagram involved in the article are shared and can be requested for free through the public number plus group
  • Article if there is a mistake, welcome to comment messages pointed out, also welcome to reprint, the trouble to mark the source is good