Welcome to King of Concurrency. This is the 26th article in the series, and the third in Brick.

Since Java 8, the JDK has gained many new features, including lambda expressions, streams, and CompletableFuture, the subject of this article. CompletableFuture may first remind you of the Future interface, which is familiar from ThreadPoolExecutor and ForkJoinPool. If that does not ring a bell, read the previous two articles first.

The Future interface is not complicated and is easy to use. At its core are the get() and isDone() methods. That simplicity also brings inherent limitations: in some scenarios Future cannot meet our needs, for example when we want to orchestrate several concurrent tasks. Fortunately, CompletableFuture makes up for many of these shortcomings and is often the better choice, which is why this article is devoted to it.

In this article, we’ll explore how a CompletableFuture differs from a Future, as well as its core capabilities and best practices, in combination with Futures and thread pools.

I. Understanding CompletableFuture

1. Future’s limitations

Essentially, a Future represents the result of an asynchronous computation. It provides isDone() to detect whether the calculation has completed, and the result can be retrieved by the get() method when the calculation is complete. A Future is a really good interface for asynchronous computing. However, it does have its own limitations:

  • Running several tasks concurrently: Future only offers get() to obtain a result, and get() blocks, so all the caller can do is wait;
  • No chaining of tasks: if you want to perform a follow-up action, such as sending an email, after a computation completes, Future does not provide that capability;
  • No combining of tasks: if you run 10 tasks and want to perform an action after all of them have finished, Future alone gives you no way to do it;
  • No exception handling: the Future interface has no methods for handling exceptions.
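The first two limitations are easy to see in code. The sketch below is only an illustration: the class name FutureLimitationsDemo and the trivial task are made up for this example. It contrasts a plain Future, where the value can only be fetched by blocking, with a CompletableFuture, where the follow-up step is registered as a callback.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

class FutureLimitationsDemo {

    // Plain Future: the only way to obtain the value is the blocking get().
    static int withFuture(ExecutorService pool) {
        Future<Integer> future = pool.submit(() -> 21 * 2);
        try {
            return future.get(); // the caller waits here until the task ends
        } catch (InterruptedException | ExecutionException e) {
            throw new IllegalStateException(e);
        }
    }

    // CompletableFuture: the follow-up step is attached as a callback.
    static String withCompletableFuture(ExecutorService pool) {
        return CompletableFuture
                .supplyAsync(() -> 21 * 2, pool)
                .thenApply(answer -> "answer=" + answer) // chained, no blocking here
                .join(); // block once, at the very end, only to read the result
    }

    public static void main(String[] args) {
        ExecutorService pool = Executors.newFixedThreadPool(2);
        try {
            System.out.println(withFuture(pool));
            System.out.println(withCompletableFuture(pool));
        } finally {
            pool.shutdown();
        }
    }
}
```

With Future, any follow-up logic has to live in the caller after get() returns; with CompletableFuture it travels with the task.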

2. The difference between CompletableFuture and Future

Simply put, CompletableFuture is an extension and enhancement of the Future interface. It implements Future in full and extends it substantially, making up for the limitations listed above. More importantly, CompletableFuture can orchestrate tasks: we can easily define the order, rules, and manner in which different tasks run. To some extent this is its core capability. In the past, the same could be done with tools such as CountDownLatch, but it required complex logic that was laborious to write and hard to maintain.

3. Initial experience of CompletableFuture

Seeing is better than hearing, of course. Since CompletableFuture sounds so capable, let's try it out in a concrete scene.

As every player knows, the game has a trio of heroes notorious for ambushing from the grass, and Daji is one of them; her ambush technique is second to none. One day, Daji spots little Lu Ban in the distance, bouncing along. A squishy target like that is perfect for an ambush from the grass. So Daji slips into the grass, and as little Lu Ban cheerfully passes by, she lands a practiced 2-3-1 combo and kills him in an instant. Little Lu Ban dies with his eyes wide open, never having seen his opponent's face. It is over that fast!

In this process, there are several sets of actions: Capture Lu Ban, hit skill 2, hit skill 3 and hit skill 1. We can express these actions with a chained call to CompletableFuture:

CompletableFuture.supplyAsync(CompletableFutureDemo::captureLuBan)
    .thenAccept(player -> note(player.getName())) // Receive the result of supplyAsync() and note the opponent's name
    .thenRun(() -> attack("Skill 2 - Idol Charm: Lu Ban takes 285 spell damage from Daji and is stunned for 1.5 seconds..."))
    .thenRun(() -> attack("Skill 3 - Queen Worship: Daji casts 5 bands of fox fire. Lu Ban takes 325++ spell damage."))
    .thenRun(() -> attack("Skill 1 - Soul Shock: Lu Ban takes 520 points of spell damage from Daji..."))
    .thenRunAsync(() -> note("Lu Ban is dead...")); // Use another thread from the thread pool

See how easy it is to arrange actions with CompletableFuture? In just six lines of code we used four different methods, including supplyAsync() and thenAccept(), and mixed synchronous and asynchronous steps. A manual implementation would have taken dozens of lines at least. So how does CompletableFuture do it? Read on.
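The chain above relies on helpers that the article does not show. Here is a self-contained sketch of the same ambush; the Player class and the captureLuBan(), note(), and attack() helpers are hypothetical stand-ins written for this example, and the messages are shortened.

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CopyOnWriteArrayList;

class AmbushDemo {

    // Hypothetical stand-in for the article's player type.
    static final class Player {
        private final String name;
        Player(String name) { this.name = name; }
        String getName() { return name; }
    }

    // Thread-safe log, because the last step runs on a pool thread.
    static final List<String> LOG = new CopyOnWriteArrayList<>();

    static Player captureLuBan() { return new Player("Lu Ban"); }
    static void note(String text) { LOG.add("note: " + text); }
    static void attack(String text) { LOG.add("attack: " + text); }

    static List<String> ambush() {
        LOG.clear();
        CompletableFuture.supplyAsync(AmbushDemo::captureLuBan)  // produce the target
                .thenAccept(player -> note(player.getName()))    // consume the result
                .thenRun(() -> attack("Skill 2"))                // each stage waits for the previous one
                .thenRun(() -> attack("Skill 3"))
                .thenRun(() -> attack("Skill 1"))
                .thenRunAsync(() -> note("Lu Ban is down"))      // handed to a pool thread
                .join();                                         // wait for the whole chain
        return LOG;
    }

    public static void main(String[] args) {
        ambush().forEach(System.out::println);
    }
}
```

Because every stage depends on the one before it, the log entries always appear in chain order, even though the steps may run on different threads.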

II. Core design of CompletableFuture

Overall, CompletableFuture implements both the Future and CompletionStage interfaces and has only a few fields. However, its source runs to nearly 2,400 lines and the internal relationships are complex, so we will not dig into every detail of its design.

By now you know that the Future interface offers only simple methods such as get() and isDone(), so Future alone cannot account for the rich capabilities of CompletableFuture. Where do they come from? That brings us to the CompletionStage interface, which is the core of CompletableFuture and the focus of this section.

As the “Stage” in its name suggests, a CompletionStage can be thought of as one step in a task orchestration. A step is the basic unit of orchestration; it can be a pure computation or a specific action. An orchestration contains multiple steps, and these steps can depend on, chain to, or combine with one another, running in parallel or in series. The result resembles a pipeline or stream processing.

Since this is orchestration, tasks have to be created and the relationships between them established. To that end, CompletableFuture offers more than 50 methods, a huge number that is impossible, and certainly unnecessary, to memorize one by one. Still, the methods follow clear patterns. If we classify them and understand the types and their variants, we have basically mastered the core capabilities of CompletableFuture.

By type, these methods can be summarized into the following four categories, and most other methods are based on variations of these four types:

Type      Receives parameter   Returns result   Supports async
Supply    -                    ✔                ✔
Apply     ✔                    ✔                ✔
Accept    ✔                    -                ✔
Run       -                    -                ✔
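The four types in the table can be seen side by side in a single chain. The following is a minimal sketch; the class name FourKindsDemo and the values are made up for illustration.

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CopyOnWriteArrayList;

class FourKindsDemo {

    static List<String> run() {
        List<String> events = new CopyOnWriteArrayList<>();
        CompletableFuture
                .supplyAsync(() -> "Daji")                        // Supply: no input, produces a value
                .thenApply(name -> name.length())                 // Apply: input in, value out
                .thenAccept(len -> events.add("length=" + len))   // Accept: input in, nothing out
                .thenRun(() -> events.add("done"))                // Run: no input, nothing out
                .join();
        return events;
    }

    public static void main(String[] args) {
        System.out.println(run());
    }
}
```

Reading a chain this way, by asking whether each step takes an input and whether it produces an output, is usually enough to pick the right method.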

Variations on methods

The methods of the types above generally come in three variants: synchronous, asynchronous, and asynchronous with a specified thread pool. For example, the three variants of thenApply() look like this:

<U> CompletableFuture<U> thenApply(Function<? super T,? extends U> fn)
<U> CompletableFuture<U> thenApplyAsync(Function<? super T,? extends U> fn)
<U> CompletableFuture<U> thenApplyAsync(Function<? super T,? extends U> fn, Executor executor)
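The practical difference between the three variants is which thread runs the function. The sketch below records the thread name for each variant; the class name VariantsDemo and the pool thread name "my-pool-1" are assumptions made for this example. The source future is already completed, so the synchronous variant runs on the calling thread.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

class VariantsDemo {

    // Returns the thread names used by: thenApply, thenApplyAsync, thenApplyAsync(executor).
    static String[] threadsUsed() {
        ExecutorService pool =
                Executors.newSingleThreadExecutor(r -> new Thread(r, "my-pool-1"));
        try {
            CompletableFuture<String> done = CompletableFuture.completedFuture("x");

            // Synchronous: the source is already complete, so this runs on the caller.
            String sync = done.thenApply(v -> Thread.currentThread().getName()).join();

            // Asynchronous: runs on CompletableFuture's default executor.
            String async = done.thenApplyAsync(v -> Thread.currentThread().getName()).join();

            // Asynchronous with an executor: runs on the pool we passed in.
            String custom = done.thenApplyAsync(v -> Thread.currentThread().getName(), pool).join();

            return new String[] {sync, async, custom};
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        for (String name : threadsUsed()) {
            System.out.println(name);
        }
    }
}
```

The name printed for the second variant depends on the machine, since the default executor is chosen at class initialization; the first and third are determined by the code.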

The following class diagram shows the relationship between a CompletableFuture and Future, CompletionStage, and Completion. Of course, because there are many methods, this picture does not show all of them, but only selected some important methods.

III. Core usage of CompletableFuture

As mentioned earlier, the core methods of CompletableFuture fall into four types, and each type comes in two modes, synchronous and asynchronous. From these we have picked the core APIs that are used most often.

  • Synchronous: the task runs on the current thread, that is, the thread that completes the previous stage, or the calling thread if that stage has already completed;
  • Asynchronous: the task runs on another thread taken from the CompletableFuture thread pool; the names of asynchronous methods end with Async.

1. runAsync

runAsync() is one of the most commonly used methods of CompletableFuture. It takes a task to run and returns a CompletableFuture.

When we wanted to run a task asynchronously, we used to create a Thread by hand or go through an Executor. With runAsync() it is much easier. For example, we can pass in a Runnable directly:

CompletableFuture.runAsync(new Runnable() {
    @Override
    public void run() {
        note("Daji went into the grass to squat... Waiting for little Lu Ban to appear.");
    }
});

In addition, in Java 8 and later, we can use a lambda expression to simplify the code further:

CompletableFuture.runAsync(() -> note("Daji went into the grass to squat... Waiting for little Lu Ban to appear."));

Doesn't that look easy? Many readers probably use runAsync() in exactly this way. Be careful, though: used like this, runAsync() hides a pitfall. The section on thread pools near the end of the article explains the CompletableFuture thread pool and how to avoid it.
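One detail is worth seeing in runnable form: runAsync() returns a CompletableFuture<Void>, and nothing waits for the task unless we ask. The sketch below uses a made-up class name, RunAsyncDemo, and a flag in place of the note() helper.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicBoolean;

class RunAsyncDemo {

    static boolean runAndWait() {
        AtomicBoolean ran = new AtomicBoolean(false);

        // The task carries no result, so the future's type is CompletableFuture<Void>.
        CompletableFuture<Void> future = CompletableFuture.runAsync(() -> ran.set(true));

        // Wait for completion; join() returns null for a Void future.
        future.join();
        return ran.get();
    }

    public static void main(String[] args) {
        System.out.println("task ran: " + runAndWait());
    }
}
```

When the common pool is in use, its worker threads are daemon threads, so a program that exits without calling join() or get() may end before the task has run.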

2. supplyAsync

The name supplyAsync() can be confusing at first, and many people are not sure what it does. But the name says it all: to “supply” is to provide a result. In other words, when we use supplyAsync(), the task returns a result that subsequent tasks can use. Note that CompletableFuture has no synchronous supply() method; supplyAsync() is the only Supply-type method.

For example, in the code below we produce a result with supplyAsync(), which thenApply() then consumes.

// Create nameFuture, which supplies the name
CompletableFuture<String> nameFuture = CompletableFuture.supplyAsync(() -> {
    return "Daji";
});

// Use thenApply() to receive the result of nameFuture and run a callback
CompletableFuture<String> sayLoveFuture = nameFuture.thenApply(name -> {
    return "Love you, " + name;
});

// Block until the confession is available
System.out.println(sayLoveFuture.get()); // Love you, Daji

You see, once you understand what “supply” means, it is that simple. supplyAsync() always runs the supplier asynchronously, on a thread from the pool rather than on the calling thread.

3. thenApply and thenApplyAsync

We have just seen that supplyAsync() provides a result, and we mentioned thenApply() in passing. As you have probably guessed, thenApply() is the partner of supplyAsync(): it receives the result, runs some logic on it, and returns a new CompletableFuture holding the transformed result.


// Use thenApply() to receive the result of nameFuture and run a callback
CompletableFuture<String> sayLoveFuture = nameFuture.thenApply(name -> {
    return "Love you, " + name;
});

// JDK source of thenApply(); the null executor means "run synchronously"
public <U> CompletableFuture<U> thenApply(
    Function<? super T, ? extends U> fn) {
    return uniApplyStage(null, fn);
}

4. thenAccept and thenAcceptAsync

thenApply() is not the only partner of supplyAsync(); thenAccept() is another. Unlike thenApply(), thenAccept() only consumes the value and returns nothing, so the resulting type is CompletableFuture<Void>.


CompletableFuture<Void> sayLoveFuture = nameFuture.thenAccept(name -> {
    System.out.println("Love you, " + name);
});

// JDK source of thenAccept()
public CompletableFuture<Void> thenAccept(Consumer<? super T> action) {
    return uniAcceptStage(null, action);
}

5. thenRun

thenRun() is simpler still. It does not receive the result of the previous task; it just runs a specific task and returns no result.

public CompletableFuture<Void> thenRun(Runnable action) {
    return uniRunStage(null, action);
}

So if a callback does not need to return anything and only runs some logic, consider thenAccept() or thenRun(). Typically, these two methods are used at the very end of a call chain.

6. thenCompose and thenCombine

The methods above each work with the result of a single preceding task. thenCompose() and thenCombine() orchestrate two tasks: thenCompose() is for two tasks where one depends on the other, and thenCombine() is for two independent tasks.

Choreograph two tasks that have dependencies

In the previous example, we used thenApply() to receive the result of the previous task. In other words, sayLoveFuture depends on the completion of nameFuture; until nameFuture finishes, there is nothing for it to run.

// Create the future
CompletableFuture<String> nameFuture = CompletableFuture.supplyAsync(() -> {
    return "Daji";
});

// Use thenApply() to receive the result of nameFuture and run a callback
CompletableFuture<String> sayLoveFuture = nameFuture.thenApply(name -> {
    return "Love you, " + name;
});

But in fact, in addition to thenApply(), we can use thenCompose() to orchestrate two tasks that have dependencies. For example, the sample code above could be written as:

// Create the future
CompletableFuture<String> nameFuture = CompletableFuture.supplyAsync(() -> {
    return "Daji";
});

CompletableFuture<String> sayLoveFuture2 = nameFuture.thenCompose(name -> {
    return CompletableFuture.supplyAsync(() -> "Love you, " + name);
});

As you can see, the core difference between thenCompose() and thenApply() lies in the return type of the function passed to them:

  • thenApply(): the function returns the computed value itself, for example a String;
  • thenCompose(): the function returns another stage, for example a CompletableFuture<String>, which thenCompose() flattens into the result.
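The difference shows up most clearly when the callback itself returns a future. In the sketch below, greet() is a hypothetical helper written for this example: passing it to thenApply() yields a nested future, while thenCompose() yields a flat one.

```java
import java.util.concurrent.CompletableFuture;

class ComposeVsApplyDemo {

    // Hypothetical asynchronous step that depends on the name.
    static CompletableFuture<String> greet(String name) {
        return CompletableFuture.supplyAsync(() -> "Love you, " + name);
    }

    // thenApply() wraps whatever the function returns, so we get a future of a future.
    static String viaThenApply() {
        CompletableFuture<CompletableFuture<String>> nested =
                CompletableFuture.supplyAsync(() -> "Daji")
                        .thenApply(ComposeVsApplyDemo::greet);
        return nested.join().join(); // two levels to unwrap
    }

    // thenCompose() flattens the inner future into the result.
    static String viaThenCompose() {
        CompletableFuture<String> flat =
                CompletableFuture.supplyAsync(() -> "Daji")
                        .thenCompose(ComposeVsApplyDemo::greet);
        return flat.join(); // a single level
    }

    public static void main(String[] args) {
        System.out.println(viaThenApply());
        System.out.println(viaThenCompose());
    }
}
```

As a rule of thumb, use thenApply() when the callback computes a value directly, and thenCompose() when the callback starts another asynchronous task.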

Combine two separate tasks

Consider a scenario where a task can only start once the results of other tasks are ready. What should we do? Such scenarios are common, and can be implemented either with the concurrency utilities we learned earlier or with thenCombine().

For example, to calculate the win rate of a hero such as Daji, we need the total number of games she has played and the number of games she has won, and then compute winRounds/rounds. We can do it like this:

CompletableFuture<Integer> roundsFuture = CompletableFuture.supplyAsync(() -> 500);
CompletableFuture<Integer> winRoundsFuture = CompletableFuture.supplyAsync(() -> 365);

// Runs once both futures have completed and combines their results
CompletableFuture<String> winRateFuture = roundsFuture
    .thenCombine(winRoundsFuture, (rounds, winRounds) -> {
        if (rounds == 0) {
            return "0.00"; // no games played yet
        }
        DecimalFormat df = new DecimalFormat("0.00");
        return df.format((float) winRounds / rounds);
    });
System.out.println(winRateFuture.get());

thenCombine() passes the results of the two tasks as arguments to its own computation. It waits until both results are ready.

7. allOf and anyOf

allOf() and anyOf() are also a pair, worth considering when we need to coordinate several futures:

  • allOf(): given a set of tasks, waits for all of them to finish;
  • anyOf(): given a set of tasks, waits for any one of them to finish.

The method signatures of allOf() and anyOf() are as follows:

static CompletableFuture<Void> allOf(CompletableFuture<?>... cfs)
static CompletableFuture<Object> anyOf(CompletableFuture<?>... cfs)

Note that anyOf() returns the result of the first task to finish, while allOf() returns no result; its type is CompletableFuture<Void>.

The sample code for allOf() and anyOf() is shown below. We created roundsFuture and winRoundsFuture and simulated their execution times with sleep. When executed, winRoundsFuture will return the result first, so when we call CompletableFuture.anyOf we will also find that the output is 365.

CompletableFuture<Integer> roundsFuture = CompletableFuture.supplyAsync(() -> {
    try {
        Thread.sleep(200);
        return 500;
    } catch (InterruptedException e) {
        return null;
    }
});
CompletableFuture<Integer> winRoundsFuture = CompletableFuture.supplyAsync(() -> {
    try {
        Thread.sleep(100);
        return 365;
    } catch (InterruptedException e) {
        return null;
    }
});

CompletableFuture<Object> completedFuture = CompletableFuture.anyOf(winRoundsFuture, roundsFuture);
System.out.println(completedFuture.get()); // returns 365

CompletableFuture<Void> completedFutures = CompletableFuture.allOf(winRoundsFuture, roundsFuture);

Before CompletableFuture, if we wanted to run an action once all tasks had finished, we would reach for a utility class such as CountDownLatch. Now there is one more option: CompletableFuture.allOf().
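Since allOf() itself carries no result, a common way to use it is to wait on the combined future and then read each original future. The following is a sketch of that pattern; the class name AllOfDemo and the values are made up for illustration.

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.stream.Collectors;

class AllOfDemo {

    static List<Integer> collect() {
        List<CompletableFuture<Integer>> futures = Arrays.asList(
                CompletableFuture.supplyAsync(() -> 500),
                CompletableFuture.supplyAsync(() -> 365));

        return CompletableFuture
                .allOf(futures.toArray(new CompletableFuture[0]))  // completes when every future has
                .thenApply(ignored -> futures.stream()
                        .map(CompletableFuture::join)              // no waiting here: all are done
                        .collect(Collectors.toList()))
                .join();
    }

    public static void main(String[] args) {
        System.out.println(collect());
    }
}
```

The results come back in the order of the input list, regardless of which task finished first.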

IV. Exception handling in CompletableFuture

Exception handling is a must for any framework, and CompletableFuture is no exception. Previously, we have looked at the core method for CompletableFuture. Now, let’s look at how to handle exceptions during computation.

Consider the case where rounds=0 will throw a runtime exception. What should we do?

CompletableFuture<String> winRateFuture = roundsFuture
    .thenCombine(winRoundsFuture, (rounds, winRounds) -> {
        if (rounds == 0) {
            throw new RuntimeException("Total number of sessions wrong");
        }
        DecimalFormat df = new DecimalFormat("0.00");
        return df.format((float) winRounds / rounds);
    });
System.out.println(winRateFuture.get());

In a chained call on a CompletableFuture, if one task throws an exception, the tasks that depend on it are skipped and the exception is passed down the chain. There are two ways to handle exceptions: exceptionally() and handle().
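Before looking at the two handlers, the propagation rule itself can be checked with a small sketch. The class name PropagationDemo and the "boom" failure are made up for this example.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.atomic.AtomicBoolean;

class PropagationDemo {

    static String observe() {
        AtomicBoolean laterStageRan = new AtomicBoolean(false);

        CompletableFuture<Integer> failing = CompletableFuture
                .<Integer>supplyAsync(() -> { throw new IllegalStateException("boom"); })
                .thenApply(v -> { laterStageRan.set(true); return v + 1; }); // never runs

        try {
            failing.join();
            return "no exception";
        } catch (CompletionException e) {
            // join() reports the failure wrapped in a CompletionException.
            return "skipped=" + !laterStageRan.get()
                    + ", cause=" + e.getCause().getClass().getSimpleName()
                    + ", message=" + e.getCause().getMessage();
        }
    }

    public static void main(String[] args) {
        System.out.println(observe());
    }
}
```

The dependent stage's function is never invoked; the stage simply completes with the same failure. With get() instead of join(), the wrapper would be an ExecutionException.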

1. Use the exceptionally() callback to handle exceptions

Use exceptionally() at the end of a chained call to catch an exception and return the default value in the case of an error. Note that exceptionally() is called only when an exception occurs.

CompletableFuture<String> winRateFuture = roundsFuture
    .thenCombine(winRoundsFuture, (rounds, winRounds) -> {
        if (rounds == 0) {
            throw new RuntimeException("Total number of sessions wrong");
        }
        DecimalFormat df = new DecimalFormat("0.00");
        return df.format((float) winRounds / rounds);
    }).exceptionally(ex -> {
        System.out.println("Error:" + ex.getMessage());
        return "";
    });
System.out.println(winRateFuture.get());

2. Use handle() to handle the exception

In addition to exceptionally(), CompletableFuture provides handle() for dealing with exceptions. Unlike exceptionally(), handle() is invoked whether or not an exception occurred, so inside it we need to check if (ex != null) to tell whether something went wrong.

CompletableFuture<String> winRateFuture = roundsFuture
    .thenCombine(winRoundsFuture, (rounds, winRounds) -> {
        if (rounds == 0) {
            throw new RuntimeException("Total number of sessions wrong");
        }
        DecimalFormat df = new DecimalFormat("0.00");
        return df.format((float) winRounds / rounds);
    }).handle((res, ex) -> {
        if (ex != null) {
            System.out.println("Error:" + ex.getMessage());
            return "";
        }
        return res;
    });
System.out.println(winRateFuture.get());
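One detail is easy to miss in both handlers: when a stage's function throws, the exception is stored wrapped in a CompletionException, so ex.getMessage() includes the wrapped exception's class name. Below is a sketch of unwrapping it; rootCause() and winRate() are hypothetical helpers, and String.format is used instead of DecimalFormat only to keep the output independent of the default locale.

```java
import java.util.Locale;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;

class UnwrapDemo {

    // Hypothetical helper: peel off the CompletionException wrapper if present.
    static Throwable rootCause(Throwable ex) {
        return (ex instanceof CompletionException && ex.getCause() != null)
                ? ex.getCause()
                : ex;
    }

    static String winRate(int rounds, int winRounds) {
        return CompletableFuture.supplyAsync(() -> rounds)
                .thenCombine(CompletableFuture.supplyAsync(() -> winRounds), (r, w) -> {
                    if (r == 0) {
                        throw new IllegalArgumentException("Total number of rounds is 0");
                    }
                    return String.format(Locale.ROOT, "%.2f", (double) w / r);
                })
                // handle() runs on success and on failure; exactly one of res/ex is meaningful.
                .handle((res, ex) -> ex == null
                        ? res
                        : "error: " + rootCause(ex).getMessage())
                .join();
    }

    public static void main(String[] args) {
        System.out.println(winRate(500, 365));
        System.out.println(winRate(0, 5));
    }
}
```

A future completed directly with completeExceptionally() hands its own handlers the raw exception, which is why the helper checks the type before unwrapping.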

V. Thread pool in CompletableFuture

Earlier we said that methods in CompletableFuture come in synchronous, asynchronous, and specified-thread-pool variants. For example, thenAccept() does not use a new thread; it runs on the current thread. thenAcceptAsync(), by contrast, hands the task to another thread. Yet in all of the previous examples we never created a thread ourselves. So where does CompletableFuture get its threads?

The answer is ForkJoinPool.commonPool(); our old friend is back again. When another thread is needed, CompletableFuture takes it from the commonPool. As the source below shows, when the common pool is not used, it falls back to starting a new thread for each task.

public static CompletableFuture<Void> runAsync(Runnable runnable) {
    return asyncRunStage(asyncPool, runnable);
}
private static final Executor asyncPool = useCommonPool ? ForkJoinPool.commonPool() : new ThreadPerTaskExecutor();

The problem is that we already know the risks of commonPool; relying on it in production is digging a hole for yourself. So what do we do? Use a custom thread pool, of course; important things should stay in your own hands. In other words, once we decide to use CompletableFuture, creating our own thread pool should be the default. Don't be lazy, and don't take chances.

Each of the core method types in CompletableFuture has an overload that accepts a custom thread pool, and it is easy to use:


// JDK source: the supplyAsync() overload that accepts a thread pool
public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier,
    Executor executor) {
    return asyncSupplyStage(screenExecutor(executor), supplier);
}

// Custom thread pool example
Executor executor = Executors.newFixedThreadPool(10);

CompletableFuture<Integer> roundsFuture = CompletableFuture.supplyAsync(() -> {
    try {
        Thread.sleep(200);
        return 500;
    } catch (InterruptedException e) {
        return null;
    }
}, executor);
Summary

That wraps up our tour of CompletableFuture. As we have seen, it extends and enhances Future and offers many powerful features that are fun to use. These features let us solve, gracefully, scenarios that used to be costly to implement.

Of course, the rose of CompletableFuture is beautiful, but its thorns are just as sharp; it is not perfect. There are some constraints to observe when using it:

  • Custom thread pool: when you decide to use CompletableFuture in production, you should also have a thread pool strategy, rather than lazily relying on the default thread pool;
  • Team consensus: with any technology, people hold different standards of good and bad. What you approve of, your teammates may not, and you may disagree with some of their technical views in turn. So when you decide to adopt CompletableFuture, it is best to align your strategy with the team and make sure everyone understands its advantages and potential risks. Going it alone is never a good strategy.

Finally, the source code of CompletableFuture runs to nearly 2,400 lines and exposes a large number of APIs. To be honest, of all the source code analyzed in this series, CompletableFuture's is by far the hardest to follow. Explaining it in full would take tens of thousands of words and would scare off more than 90 percent of readers. So we do not recommend swallowing the whole source; instead, classify the methods and focus on the key parts. Of course, if you do read all of it out of interest, you have my thumbs up.

That is the end of the article. Congratulations on earning another star ✨

The teacher’s trial

  • Hands-on: write code that uses runAsync() with a specified thread pool.

