This article introduces Java 8’s CompletionStage API and its implementation of the standard library CompletableFuture. The API demonstrates its behavior with examples, one or two of which are shown in each example.
Since the CompletableFuture class implements the CompletionStage interface, first we need to understand the contract for this interface. It represents a specific stage of computation that can be performed synchronously or asynchronously. You can think of it as a unit in a computational pipeline that eventually produces a final result, which means that several CompletionStages can be concatenated, and one completed stage can trigger the next execution, and then the next, and then…
In addition to implementing the CompletionStage interface, the CompletableFuture also implements the Future interface, which represents an unfinished asynchronous event. The completableFuture provides methods that can explicitly complete the Future, so it is called completableFuture.
1. Create a completableFuture
The simplest example is to create a completed completableFuture with a predefined result, which is usually used at the beginning of a calculation.
static void completedFutureExample() {
CompletableFuture cf = CompletableFuture.completedFuture("message");
assertTrue(cf.isDone());
assertEquals("message", cf.getNow(null));
} getNow(NULL) returns the result if the future is complete, as in the example above, otherwise it returns NULL (the passed argument).
2. Run a simple asynchronous phase
This example creates a phase that executes asynchronously:
static void runAsyncExample() {
CompletableFuture cf = CompletableFuture.runAsync(() -> {
assertTrue(Thread.currentThread().isDaemon());
randomSleep();
});
assertFalse(cf.isDone());
sleepEnough();
assertTrue(cf.isDone());
} Two things can be learned from this example:
A completableFuture method that ends in Async will execute asynchronously (if no executor is specified) through a ForkJoinPool, which uses a daemon thread to execute the task. Note that this is a feature of the completableFuture, and other completionstages can override this default behavior.
Read: The magic of parallel task execution: the FORK &JOIN framework
3. Apply the function on the previous stage
The following example takes the completed future from #1, which returns the result as the string message, and then applies a function to change it to uppercase.
static void thenApplyExample() {
CompletableFuture cf = CompletableFuture.completedFuture("message").thenApply(s -> {
assertFalse(Thread.currentThread().isDaemon());
return s.toUpperCase();
});
assertEquals("MESSAGE", cf.getNow(null));
} Note the behavior that the TheApply method name represents.
Then means that the action of this phase occurs after the normal completion of the current phase. In this case, the current node completes and returns the string message.
Apply means that the returned stage will Apply a function to the result of the previous stage.
The execution of the function is blocked, which means that getNow() returns only after the skew operation has been completed.
In addition, pay attention to the public number Java technology stack, reply in the background: interview, you can get my finishing Java concurrent multi-threaded series of interview questions and answers, very complete.
4. Apply functions asynchronously on the previous stage
By calling an asynchronous method (followed by the Async suffix), the CompletableFuture is concatenated and can be executed asynchronously (using ForkJoinPool.commonPool()).
static void thenApplyAsyncExample() {
CompletableFuture cf = CompletableFuture.completedFuture("message").thenApplyAsync(s -> {
assertTrue(Thread.currentThread().isDaemon());
randomSleep();
return s.toUpperCase();
});
assertNull(cf.getNow(null));
assertEquals("MESSAGE", cf.join());
} 5, Use custom Executors to apply functions asynchronously on the previous stage
A very useful feature of asynchronous methods is the ability to provide an Executor to execute a CompletableFuture asynchronously. A thorough analysis of thread pools is recommended.
This example demonstrates how to apply uppercase functions using a fixed size thread pool.
static ExecutorService executor = Executors.newFixedThreadPool(3, new ThreadFactory() {
int count = 1;
@Override
public Thread newThread(Runnable runnable) {
return new Thread(runnable, "custom-executor-" + count++);
}
});
static void thenApplyAsyncWithExecutorExample() {
CompletableFuture cf = CompletableFuture.completedFuture("message").thenApplyAsync(s -> {
assertTrue(Thread.currentThread().getName().startsWith("custom-executor-"));
assertFalse(Thread.currentThread().isDaemon());
randomSleep();
return s.toUpperCase();
}, executor);
assertNull(cf.getNow(null));
assertEquals("MESSAGE", cf.join());
} 6. The results of the previous period of consumption
If the next stage receives the result of the current stage, but does not need to return the value when it is evaluated (its return type is void), then it can apply a consumer instead of a function, and the calling method becomes theAccept:
static void thenAcceptExample() {
StringBuilder result = new StringBuilder();
CompletableFuture.completedFuture("thenAccept message")
.thenAccept(s -> result.append(s));
assertTrue("Result was empty", result.length() > 0);
} In this case the consumer executes synchronously, so we don’t need to call the join method on the completableFuture.
Consume the results of the migration phase asynchronously
Again, you can use the TheAcceptAsync method, and the concatenated CompletableFuture can be executed asynchronously.
static void thenAcceptAsyncExample() {
StringBuilder result = new StringBuilder();
CompletableFuture cf = CompletableFuture.completedFuture("thenAcceptAsync message")
.thenAcceptAsync(s -> result.append(s));
cf.join();
assertTrue("Result was empty", result.length() > 0);
} 8. Complete the calculation of exceptions
Now let’s look at how an asynchronous operation can explicitly return an exception to indicate that the computation failed. Let’s simplify this example by manipulating a string, converting it to an acknowledgment, and we simulate a delay of one second.
We use the ThenApplyAsync (Function, Executor) method, and the first argument is passed to the uppercase Function, Executor is a delayed Executor with a delay of one second before execution.
static void completeExceptionallyExample() {
CompletableFuture cf = CompletableFuture.completedFuture("message").thenApplyAsync(String::toUpperCase, CompletableFuture.delayedExecutor(1, TimeUnit.SECONDS)); CompletableFuture exceptionHandler = cf.handle((s, th) -> { return (th ! = null) ? "message upon cancel" : ""; }); cf.completeExceptionally(new RuntimeException("completed exceptionally"));
assertTrue(“Was not completed exceptionally”, cf.isCompletedExceptionally());
try {
cf.join();
fail("Should have thrown an exception");
} catch(CompletionException ex) { // just for testing
assertEquals("completed exceptionally", ex.getCause().getMessage());
}
assertEquals("message upon cancel", exceptionHandler.join());
} Let’s look at the details.
So first we create a completableFuture, and when it’s done it returns a string message, and then we call the TheApplyAsync method, which returns a completableFuture. This method applies the uppercase function asynchronously after the first function completes.
This example also demonstrates how to delay the execution of an asynchronous task with a DelayedExecutor (timeout, timeUnit).
We create a separate handler stage: ExceptionHandler, which handles the exception exception and returns Message Upon Cancel in the case of an exception.
Next we complete the second stage explicitly with exceptions. The join method is called on the phase, which performs the uppercase conversion and then throws a CompletionException. (A normal join waits for 1 second and then gets the uppercase string. But our example completes the exception before it executes), and then it triggers the Handler phase.
9. Cancel the calculation
Similar to completing the exception, we can cancel the computation by calling Cancel (Boolean MayInterruptifrunning). For the CompletableFuture class, the Boolean parameter is not used because it does not use an interrupt to cancel the operation; instead, Cancel is equivalent to CompleteException (new CancellationException()).
static void cancelExample() {
CompletableFuture cf = CompletableFuture.completedFuture("message").thenApplyAsync(String::toUpperCase,
CompletableFuture.delayedExecutor(1, TimeUnit.SECONDS));
CompletableFuture cf2 = cf.exceptionally(throwable -> "canceled message");
assertTrue("Was not canceled", cf.cancel(true));
assertTrue("Was not completed exceptionally", cf.isCompletedExceptionally());
assertEquals("canceled message", cf2.join());
} 10, Apply the function to one of the two completion phases
The following example creates a completableFuture, and applyToEither handles two phases and applies the function on one of them (the package guarantees which one is executed). The two phases in this example are one to apply uppercase conversions to the original string, and the other to apply smaller conversions.
static void applyToEitherExample() {
String original = "Message";
CompletableFuture cf1 = CompletableFuture.completedFuture(original)
.thenApplyAsync(s -> delayedUpperCase(s));
CompletableFuture cf2 = cf1.applyToEither(
CompletableFuture.completedFuture(original).thenApplyAsync(s -> delayedLowerCase(s)),
s -> s + " from applyToEither");
assertTrue(cf2.join().endsWith(" from applyToEither"));
} 11, Call the consumption function on one of the two completion stages
This is very similar to the previous example, except that we call the Consumer Function (Function becomes Consumer):
static void acceptEitherExample() {
String original = "Message";
StringBuilder result = new StringBuilder();
CompletableFuture cf = CompletableFuture.completedFuture(original)
.thenApplyAsync(s -> delayedUpperCase(s))
.acceptEither(CompletableFuture.completedFuture(original).thenApplyAsync(s -> delayedLowerCase(s)),
s -> result.append(s).append("acceptEither"));
cf.join();
assertTrue("Result was empty", result.toString().endsWith("acceptEither"));
} 12, Run a Runnable after both phases have finished executing
This example demonstrates how the dependent completableFuture executes a Runnable after waiting for two phases to complete.
Note that all the following phases are performed synchronously, with the first phase performing the uppercase conversion and the second phase performing the lowercase conversion.
static void runAfterBothExample() {
String original = "Message";
StringBuilder result = new StringBuilder();
CompletableFuture.completedFuture(original).thenApply(String::toUpperCase).runAfterBoth(
CompletableFuture.completedFuture(original).thenApply(String::toLowerCase),
() -> result.append("done"));
assertTrue("Result was empty", result.length() > 0);
} 13, Use Biconsumer to process the results of two phases
The above example can also be implemented via BiConsumer:
static void thenAcceptBothExample() {
String original = "Message";
StringBuilder result = new StringBuilder();
CompletableFuture.completedFuture(original).thenApply(String::toUpperCase).thenAcceptBoth(
CompletableFuture.completedFuture(original).thenApply(String::toLowerCase),
(s1, s2) -> result.append(s1 + s2));
assertEquals("MESSAGEmessage", result.toString());
} 14, Use biFunction to process the results of both phases
If the completableFuture relies on the results of two previous phases, and it compiles the results of both phases and returns one result, we can use the TheCombine () function. The pipeline is synchronized, so getNow() gets the final result, concatenating the uppercase and lowercase strings.
static void thenCombineExample() {
String original = "Message";
CompletableFuture cf = CompletableFuture.completedFuture(original).thenApply(s -> delayedUpperCase(s))
.thenCombine(CompletableFuture.completedFuture(original).thenApply(s -> delayedLowerCase(s)),
(s1, s2) -> s1 + s2);
assertEquals("MESSAGEmessage", cf.getNow(null));
} 15, Use biFunction asynchronously to process the results of both phases
Similar to the example above, but with one difference: the first two phases of the dependency are executed asynchronously, so TheCombine () is also executed asynchronously, even if it doesn’t have the Async suffix.
There are comments in Javadoc:
Actions supplied for dependent completions of non-async methods may be performed by the thread that completes the current CompletableFuture, or by any other caller of a completion method
So we need the join method to wait for the result to complete.
static void thenCombineAsyncExample() {
String original = "Message";
CompletableFuture cf = CompletableFuture.completedFuture(original)
.thenApplyAsync(s -> delayedUpperCase(s))
.thenCombine(CompletableFuture.completedFuture(original).thenApplyAsync(s -> delayedLowerCase(s)),
(s1, s2) -> s1 + s2);
assertEquals("MESSAGEmessage", cf.join());
} 16, Assemble completableFuture
We can complete the above two examples by using theCompose(). This method waits for the completion of the first stage (uppercase conversion), and its result is passed to a specified completableFuture function, whose result is the result of the returned completableFuture.
It’s a mouthful, but let’s look at an example to understand. The function takes an uppercase string and returns a completableFuture that converts the string to lowercase and concatenates it after the uppercase string.
static void thenComposeExample() {
String original = "Message";
CompletableFuture cf = CompletableFuture.completedFuture(original).thenApply(s -> delayedUpperCase(s))
.thenCompose(upper -> CompletableFuture.completedFuture(original).thenApply(s -> delayedLowerCase(s))
.thenApply(s -> upper + s));
assertEquals("MESSAGEmessage", cf.join());
} 17. When one of several phases is complete, create a completed phase
The following example demonstrates creating a completableFuture when any of the completableFuture is complete.
The stages to be processed are created first, and each stage converts a string to uppercase. Since in this case the stages are executed synchronously (theApply), the completableFuture created from anyOf will complete immediately so that all the stages have completed, we use whenComplete(BiConsumer<? super Object, ? Super Throwable> Action) processes the completed result.
static void anyOfExample() {
StringBuilder result = new StringBuilder(); List messages = Arrays.asList("a", "b", "c"); List<CompletableFuture> futures = messages.stream() .map(msg -> CompletableFuture.completedFuture(msg).thenApply(s -> delayedUpperCase(s))) .collect(Collectors.toList()); CompletableFuture.anyOf(futures.toArray(new CompletableFuture[futures.size()])).whenComplete((res, th) -> { if(th == null) { assertTrue(isUpperCase((String) res)); result.append(res); }}); assertTrue("Result was empty", result.length() > 0);
} 18. Create a phase when all phases are complete
The previous example shows processing after any phase is complete, and the next two examples show processing after all phases are complete, both synchronously and asynchronously.
static void allOfExample() {
StringBuilder result = new StringBuilder();
List messages = Arrays.asList("a", "b", "c");
List<CompletableFuture> futures = messages.stream()
.map(msg -> CompletableFuture.completedFuture(msg).thenApply(s -> delayedUpperCase(s)))
.collect(Collectors.toList());
CompletableFuture.allOf(futures.toArray(new CompletableFuture[futures.size()])).whenComplete((v, th) -> {
futures.forEach(cf -> assertTrue(isUpperCase(cf.getNow(null))));
result.append("done");
});
assertTrue("Result was empty", result.length() > 0);
} 19, Create a phase asynchronously when all phases are complete
By replacing those single CompletableFutures methods with TheNApplyAsync (), allOf() is executed asynchronously in the thread in the common pool. So we need to call the join method and wait for it to finish.
static void allOfAsyncExample() {
StringBuilder result = new StringBuilder();
List messages = Arrays.asList("a", "b", "c");
List<CompletableFuture> futures = messages.stream()
.map(msg -> CompletableFuture.completedFuture(msg).thenApplyAsync(s -> delayedUpperCase(s)))
.collect(Collectors.toList());
CompletableFuture allOf = CompletableFuture.allOf(futures.toArray(new CompletableFuture[futures.size()]))
.whenComplete((v, th) -> {
futures.forEach(cf -> assertTrue(isUpperCase(cf.getNow(null))));
result.append("done");
});
allOf.join();
assertTrue("Result was empty", result.length() > 0);
} 20. Real examples
Now that you know what some of the functions of CompletionStage and CompletableFuture are capable of, the following example is a practical scenario:
We first call the cars method asynchronously to get the list of cars, which returns the CompletionStage scene. CARS consumes a remote REST API.
Then we’ll compound a CompletionStage to fill in the score for each car, and return a CompletionStage through the Rating (ManufacturerId), which asynchronously gets the score for the car (probably another REST API call).
We end the list when all the cars have filled in their scores, so we call allOf to get the final stage, which is completed after all the previous stages have been completed.
In the final stage whenComplete() is called, we print out each car and its score.
cars().thenCompose(cars -> {
List<CompletionStage> updatedCars = cars.stream()
.map(car -> rating(car.manufacturerId).thenApply(r -> {
car.setRating(r);
return car;
})).collect(Collectors.toList());
CompletableFuture done = CompletableFuture
.allOf(updatedCars.toArray(new CompletableFuture[updatedCars.size()]));
return done.thenApply(v -> updatedCars.stream().map(CompletionStage::toCompletableFuture)
.map(CompletableFuture::join).collect(Collectors.toList()));
}).whenComplete((cars, th) -> {
if (th == null) {
cars.forEach(System.out::println);
} else {
throw new RuntimeException(th);
}
}).toCompletableFuture().join(); Because each instance of the car is independent, the score for each car can be performed asynchronously, which improves system performance (latency). Also, waiting for all car scores to be processed uses the allOf method, rather than a manual thread wait (Thread#join() or a countDownlatch).
Finally, pay attention to the public number Java technology stack, reply in the background: interview, can get my Java concurrent multi-threaded series of interview questions and answers, very complete.