This blog post reviews Java 8's CompletionStage API and its standard implementation in the Java library, CompletableFuture. Several examples illustrate the various behaviors of the API.
Because CompletableFuture is an implementation of the CompletionStage interface, we first need to understand the contract of that interface. It represents one stage of a synchronous or asynchronous computation. You can think of it as a single unit on an assembly line of computations designed to produce a valuable end result. This means that multiple CompletionStage instances can be chained so that the completion of one stage triggers the execution of the next.
In addition to implementing the CompletionStage interface, CompletableFuture also implements Future, which represents a pending asynchronous event. It is named CompletableFuture because the Future can be explicitly completed.
1. Create a new CompletableFuture
This simple example creates a CompletableFuture that is already completed with a predefined result. Such a future is usually used as the starting point of a computation.
static void completedFutureExample() {
    CompletableFuture<String> cf = CompletableFuture.completedFuture("message");
    assertTrue(cf.isDone());
    assertEquals("message", cf.getNow(null));
}
The getNow(null) method returns the result if the future is already complete (message in this case); otherwise it returns the default value passed in (null here).
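To make the difference between a completed and a pending future concrete, here is a minimal sketch. The class name GetNowDemo and the "fallback" default are made up for illustration; only the CompletableFuture calls come from the JDK.

```java
import java.util.concurrent.CompletableFuture;

class GetNowDemo {
    // Reports what getNow() and complete() return before and after explicit completion.
    static String[] observe() {
        CompletableFuture<String> cf = new CompletableFuture<>(); // not complete yet
        String before = cf.getNow("fallback");   // no result yet, so the default is returned
        boolean first = cf.complete("message");  // completes the future explicitly
        boolean second = cf.complete("ignored"); // has no effect: already completed
        String after = cf.getNow("fallback");
        return new String[] { before, after, String.valueOf(first), String.valueOf(second) };
    }

    public static void main(String[] args) {
        System.out.println(String.join(", ", observe())); // fallback, message, true, false
    }
}
```

The second call to complete() returns false because a CompletableFuture can only be completed once.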
2. Run a simple asynchronous stage
The following example explains how to create a stage that runs Runnable asynchronously.
static void runAsyncExample() {
    CompletableFuture<Void> cf = CompletableFuture.runAsync(() -> {
        assertTrue(Thread.currentThread().isDaemon());
        randomSleep();
    });
    assertFalse(cf.isDone());
    sleepEnough();
    assertTrue(cf.isDone());
}
This example illustrates two things:
- In CompletableFuture, methods whose names end with Async are executed asynchronously.
- By default (that is, when no Executor is passed in), asynchronous execution uses the common ForkJoinPool implementation, whose daemon threads execute the Runnable task. Note that this is specific to CompletableFuture; other CompletionStage implementations can override this default behavior.
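The two points above can be sketched as follows. The class name AsyncThreadDemo and the thread name "demo-worker" are made up for illustration. The sketch deliberately does not assert isDaemon(): according to the CompletableFuture documentation, when the common pool does not support a parallelism level of at least two, a new thread is created for each task instead, so the daemon check depends on the machine.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

class AsyncThreadDemo {
    // True if a task started with supplyAsync() ran on a thread other than the caller's
    // (assuming the caller is an ordinary thread such as main).
    static boolean defaultRunsOffCallerThread() {
        Thread caller = Thread.currentThread();
        Thread worker = CompletableFuture.supplyAsync(Thread::currentThread).join();
        return worker != caller;
    }

    // Name of the thread that runs the task when an Executor is passed explicitly.
    static String threadNameWithExecutor() {
        ExecutorService pool = Executors.newSingleThreadExecutor(r -> new Thread(r, "demo-worker"));
        try {
            return CompletableFuture.supplyAsync(() -> Thread.currentThread().getName(), pool).join();
        } finally {
            pool.shutdown(); // otherwise the non-daemon worker keeps the JVM alive
        }
    }

    public static void main(String[] args) {
        System.out.println("ran off caller thread: " + defaultRunsOffCallerThread());
        System.out.println("explicit executor thread: " + threadNameWithExecutor());
    }
}
```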
3. Apply the method to the previous Stage
The following example takes the completed CompletableFuture created in example 1, which holds the string message, and applies a function that converts the string to uppercase.
static void thenApplyExample() {
    CompletableFuture<String> cf = CompletableFuture.completedFuture("message").thenApply(s -> {
        assertFalse(Thread.currentThread().isDaemon());
        return s.toUpperCase();
    });
    assertEquals("MESSAGE", cf.getNow(null));
}
The key word here is thenApply:
- then means that the operation takes place after the current stage completes normally (that is, without throwing an exception). In this case, the current stage is already completed with the value message.
- Apply means that a Function is applied to the result of the previous stage.
The execution of the Function is blocking, which means that getNow() is reached only after the uppercase operation has completed.
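A small sketch of where the non-async function actually runs, under the assumption that the caller is an ordinary thread. The class name ThenApplyThreadDemo and the thread name "completer" are made up for illustration.

```java
import java.util.concurrent.CompletableFuture;

class ThenApplyThreadDemo {
    // Source already complete: the function runs immediately on the calling thread.
    static boolean runsOnCallerWhenAlreadyComplete() {
        Thread caller = Thread.currentThread();
        CompletableFuture<Thread> cf = CompletableFuture.completedFuture("message")
                .thenApply(s -> Thread.currentThread());
        // The function has already run by the time thenApply() returns.
        return cf.isDone() && cf.getNow(null) == caller;
    }

    // Source still pending: the function runs on the thread that completes the source.
    static String threadNameWhenCompletedLater() {
        CompletableFuture<String> source = new CompletableFuture<>();
        CompletableFuture<String> dependent =
                source.thenApply(s -> Thread.currentThread().getName());
        new Thread(() -> source.complete("message"), "completer").start();
        return dependent.join(); // blocks until the dependent stage has a result
    }

    public static void main(String[] args) {
        System.out.println(runsOnCallerWhenAlreadyComplete());
        System.out.println(threadNameWhenCompletedLater());
    }
}
```

This is why the example above can assert that the thread is not a daemon: the function runs on the test's own thread.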
4. Asynchronously apply the method to the previous Stage
By adding an Async suffix to the end of the method, the CompletableFuture chain is executed asynchronously (using ForkJoinPool.commonPool()).
static void thenApplyAsyncExample() {
    CompletableFuture<String> cf = CompletableFuture.completedFuture("message").thenApplyAsync(s -> {
        assertTrue(Thread.currentThread().isDaemon());
        randomSleep();
        return s.toUpperCase();
    });
    assertNull(cf.getNow(null));
    assertEquals("MESSAGE", cf.join());
}
5. Use a custom Executor to execute the method asynchronously
One advantage of the asynchronous methods is that you can supply an Executor on which to run the desired CompletionStage. This example shows how to use a fixed-size thread pool to perform the uppercase conversion.
static ExecutorService executor = Executors.newFixedThreadPool(3, new ThreadFactory() {
    int count = 1;

    @Override
    public Thread newThread(Runnable runnable) {
        return new Thread(runnable, "custom-executor-" + count++);
    }
});

static void thenApplyAsyncWithExecutorExample() {
    CompletableFuture<String> cf = CompletableFuture.completedFuture("message").thenApplyAsync(s -> {
        assertTrue(Thread.currentThread().getName().startsWith("custom-executor-"));
        assertFalse(Thread.currentThread().isDaemon());
        randomSleep();
        return s.toUpperCase();
    }, executor);
    assertNull(cf.getNow(null));
    assertEquals("MESSAGE", cf.join());
}
6. Consume the results of the previous Stage
If the next stage accepts the result of the current stage but does not need to return a value (that is, its return type is void), then instead of applying a Function it can accept a Consumer, hence the method thenAccept.
static void thenAcceptExample() {
    StringBuilder result = new StringBuilder();
    CompletableFuture.completedFuture("thenAccept message")
            .thenAccept(s -> result.append(s));
    assertTrue("Result was empty", result.length() > 0);
}
The Consumer will execute synchronously, so we don’t need to join on the returned CompletableFuture.
7. Consume the result asynchronously
Again, use the Async suffix (thenAcceptAsync):
static void thenAcceptAsyncExample() {
    StringBuilder result = new StringBuilder();
    CompletableFuture<Void> cf = CompletableFuture.completedFuture("thenAcceptAsync message")
            .thenAcceptAsync(s -> result.append(s));
    cf.join();
    assertTrue("Result was empty", result.length() > 0);
}
8. Complete a computation exceptionally
Let's now see how an asynchronous operation can be explicitly completed exceptionally, indicating a failure in the computation. For brevity, the operation takes a string and converts it to uppercase, and we simulate a delay. We use thenApplyAsync(Function, Executor): the first argument is the uppercase conversion, and the second is a delayed Executor that waits one second before submitting the operation to the common ForkJoinPool. Note that CompletableFuture.delayedExecutor was added in Java 9.
static void completeExceptionallyExample() {
    CompletableFuture<String> cf = CompletableFuture.completedFuture("message").thenApplyAsync(String::toUpperCase,
            CompletableFuture.delayedExecutor(1, TimeUnit.SECONDS));
    CompletableFuture<String> exceptionHandler = cf.handle((s, th) -> (th != null) ? "message upon cancel" : "");
    cf.completeExceptionally(new RuntimeException("completed exceptionally"));
    assertTrue("Was not completed exceptionally", cf.isCompletedExceptionally());
    try {
        cf.join();
        fail("Should have thrown an exception");
    } catch (CompletionException ex) { // just for testing
        assertEquals("completed exceptionally", ex.getCause().getMessage());
    }
    assertEquals("message upon cancel", exceptionHandler.join());
}
- First, we create a new CompletableFuture that is already completed with the value message. We then call thenApplyAsync, which returns a new CompletableFuture. This method performs the uppercase conversion asynchronously, and the example also shows how to delay an asynchronous operation with delayedExecutor(timeout, timeUnit).
- Then we create a handler stage, exceptionHandler, which handles any exception and returns another message, message upon cancel.
- Finally, we explicitly complete the second stage with an exception, so that calling join() on it throws a CompletionException. This also triggers the handler stage.
API note:
<U> CompletableFuture<U> handle(BiFunction<? super T,Throwable,? extends U> fn)
Returns a new CompletableFuture that is executed whether or not the previous stage completed normally. The function receives the result of the previous stage and the exception that was thrown; the one that does not apply is null.
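The following sketch exercises handle() on both paths. The class name HandleDemo, the helper describe, and the message "boom" are made up for illustration; the failing stage is completed with completeExceptionally so that the handler sees the original exception directly.

```java
import java.util.concurrent.CompletableFuture;

class HandleDemo {
    // handle() always runs: error is null on normal completion, value is null on failure.
    static String describe(CompletableFuture<String> stage) {
        return stage
                .handle((value, error) ->
                        error == null ? "ok: " + value : "failed: " + error.getMessage())
                .join();
    }

    static String normalCase() {
        return describe(CompletableFuture.completedFuture("message"));
    }

    static String exceptionalCase() {
        CompletableFuture<String> failed = new CompletableFuture<>();
        failed.completeExceptionally(new IllegalStateException("boom"));
        return describe(failed);
    }

    public static void main(String[] args) {
        System.out.println(normalCase());      // ok: message
        System.out.println(exceptionalCase()); // failed: boom
    }
}
```

Because handle() turns the failure into an ordinary value, the join() in describe does not throw in either case.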
9. Cancel the calculation
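Much like completing a computation exceptionally, we can cancel a computation with cancel(boolean mayInterruptIfRunning) from the Future interface. For CompletableFuture, the boolean parameter has no effect, because interrupts are not used to control processing; cancel() is equivalent to completeExceptionally(new CancellationException()).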
static void cancelExample() {
    CompletableFuture<String> cf = CompletableFuture.completedFuture("message").thenApplyAsync(String::toUpperCase,
            CompletableFuture.delayedExecutor(1, TimeUnit.SECONDS));
    CompletableFuture<String> cf2 = cf.exceptionally(throwable -> "canceled message");
    assertTrue("Was not canceled", cf.cancel(true));
    assertTrue("Was not completed exceptionally", cf.isCompletedExceptionally());
    assertEquals("canceled message", cf2.join());
}
API note:
public CompletableFuture<T> exceptionally(Function<Throwable,? extends T> fn)
Returns a new CompletableFuture. If this stage completes exceptionally, the new stage completes with the result of applying fn to the exception; otherwise it completes with the same value as this stage.
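Here is a minimal sketch of how cancel() and exceptionally() interact, without the one-second delay. The class name CancelDemo and the strings are made up for illustration.

```java
import java.util.concurrent.CancellationException;
import java.util.concurrent.CompletableFuture;

class CancelDemo {
    // A stage attached with exceptionally() recovers once the source is cancelled.
    static String recoverAfterCancel() {
        CompletableFuture<String> cf = new CompletableFuture<>();
        CompletableFuture<String> recovered =
                cf.exceptionally(t -> "recovered from " + t.getClass().getSimpleName());
        cf.cancel(true);
        return recovered.join();
    }

    // join() on the cancelled future itself throws CancellationException.
    static boolean joinThrowsCancellation() {
        CompletableFuture<String> cf = new CompletableFuture<>();
        cf.cancel(true);
        try {
            cf.join();
            return false;
        } catch (CancellationException expected) {
            return cf.isCancelled() && cf.isCompletedExceptionally();
        }
    }

    // On normal completion, exceptionally() passes the value through and fn is not called.
    static String passThrough() {
        return CompletableFuture.completedFuture("value").exceptionally(t -> "unused").join();
    }

    public static void main(String[] args) {
        System.out.println(recoverAfterCancel());
        System.out.println(joinThrowsCancellation());
        System.out.println(passThrough());
    }
}
```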
10. Apply a Function to the result of either of two completed stages
The following example creates a CompletableFuture that applies a Function to the result of whichever of two stages completes first (there is no guarantee which one will be passed to the Function). The two stages are as follows: one converts the original string to uppercase, the other converts it to lowercase.
static void applyToEitherExample() {
    String original = "Message";
    CompletableFuture<String> cf1 = CompletableFuture.completedFuture(original)
            .thenApplyAsync(s -> delayedUpperCase(s));
    CompletableFuture<String> cf2 = cf1.applyToEither(
            CompletableFuture.completedFuture(original).thenApplyAsync(s -> delayedLowerCase(s)),
            s -> s + " from applyToEither");
    assertTrue(cf2.join().endsWith(" from applyToEither"));
}
API note:
public <U> CompletableFuture<U> applyToEither(CompletionStage<? extends T> other,Function<? super T,U> fn)
Returns a new CompletableFuture that, when either this stage or other completes normally, is executed with the corresponding result as the argument to fn.
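Because the example above is a race, its result is not predictable. The sketch below makes the outcome deterministic by pairing a future that never completes with one that is already complete. The class name ApplyToEitherDemo and the value "fast" are made up for illustration.

```java
import java.util.concurrent.CompletableFuture;

class ApplyToEitherDemo {
    // Applies the function to whichever of the two stages has a result first.
    static String firstOf(CompletableFuture<String> a, CompletableFuture<String> b) {
        return a.applyToEither(b, s -> s + " from applyToEither").join();
    }

    static String pendingVersusDone() {
        CompletableFuture<String> pending = new CompletableFuture<>(); // never completed here
        CompletableFuture<String> done = CompletableFuture.completedFuture("fast");
        return firstOf(pending, done);
    }

    public static void main(String[] args) {
        System.out.println(pendingVersusDone()); // fast from applyToEither
    }
}
```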
11. Consume the results of either phase
As in the previous example, but with a Consumer in place of the Function (the dependent CompletableFuture has type Void).
static void acceptEitherExample() {
    String original = "Message";
    StringBuilder result = new StringBuilder();
    CompletableFuture<Void> cf = CompletableFuture.completedFuture(original)
            .thenApplyAsync(s -> delayedUpperCase(s))
            .acceptEither(CompletableFuture.completedFuture(original).thenApplyAsync(s -> delayedLowerCase(s)),
                    s -> result.append(s).append("acceptEither"));
    cf.join();
    assertTrue("Result was empty", result.toString().endsWith("acceptEither"));
}
12. Run Runnable after both phases are complete
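This example shows how a dependent CompletableFuture that executes a Runnable is triggered once both stages have completed. Note that all the stages below run synchronously: one stage converts the original string to uppercase, and another converts the same original string to lowercase.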
static void runAfterBothExample() {
    String original = "Message";
    StringBuilder result = new StringBuilder();
    CompletableFuture.completedFuture(original).thenApply(String::toUpperCase).runAfterBoth(
            CompletableFuture.completedFuture(original).thenApply(String::toLowerCase),
            () -> result.append("done"));
    assertTrue("Result was empty", result.length() > 0);
}
13. Accept the results of both stages with a BiConsumer
Instead of running a Runnable when both stages complete, a BiConsumer lets the dependent stage operate on the results of both stages together.
static void thenAcceptBothExample() {
    String original = "Message";
    StringBuilder result = new StringBuilder();
    CompletableFuture.completedFuture(original).thenApply(String::toUpperCase).thenAcceptBoth(
            CompletableFuture.completedFuture(original).thenApply(String::toLowerCase),
            (s1, s2) -> result.append(s1 + s2));
    assertEquals("MESSAGEmessage", result.toString());
}
14. Apply a BiFunction to the results of both stages
If the dependent CompletableFuture is meant to combine the results of both stages and return a value, we can use thenCombine(). The whole pipeline is synchronous, so the final getNow() yields the final result, the concatenation of the uppercase and lowercase strings.
static void thenCombineExample() {
    String original = "Message";
    CompletableFuture<String> cf = CompletableFuture.completedFuture(original).thenApply(s -> delayedUpperCase(s))
            .thenCombine(CompletableFuture.completedFuture(original).thenApply(s -> delayedLowerCase(s)),
                    (s1, s2) -> s1 + s2);
    assertEquals("MESSAGEmessage", cf.getNow(null));
}
15. Asynchronously apply a BiFunction to the results of both stages
Similar to the previous example, but with one difference: both stages that the dependent CompletableFuture relies on run asynchronously. As a result, thenCombine() also executes asynchronously even though it lacks the Async suffix, so we need join() to wait for the combined result.
static void thenCombineAsyncExample() {
    String original = "Message";
    CompletableFuture<String> cf = CompletableFuture.completedFuture(original)
            .thenApplyAsync(s -> delayedUpperCase(s))
            .thenCombine(CompletableFuture.completedFuture(original).thenApplyAsync(s -> delayedLowerCase(s)),
                    (s1, s2) -> s1 + s2);
    assertEquals("MESSAGEmessage", cf.join());
}
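16. Compose CompletableFutures
We can use thenCompose() to accomplish the same computation as in the previous two examples. This method waits for the first stage (the uppercase conversion) to complete; its result is passed to the supplied Function, which returns a CompletableFuture whose result becomes the result of the returned CompletableFuture.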
static void thenComposeExample() {
    String original = "Message";
    CompletableFuture<String> cf = CompletableFuture.completedFuture(original).thenApply(s -> delayedUpperCase(s))
            .thenCompose(upper -> CompletableFuture.completedFuture(original).thenApply(s -> delayedLowerCase(s))
                    .thenApply(s -> upper + s));
    assertEquals("MESSAGEmessage", cf.join());
}
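To contrast thenCombine() with thenCompose(), here is a sketch in which the second stage of the composed version really depends on the result of the first. The class name CombineVsComposeDemo and the helpers upper and lower are made up for illustration; they stand in for delayedUpperCase and delayedLowerCase, whose definitions are not shown in this post.

```java
import java.util.concurrent.CompletableFuture;

class CombineVsComposeDemo {
    static CompletableFuture<String> upper(String s) {
        return CompletableFuture.supplyAsync(() -> s.toUpperCase());
    }

    static CompletableFuture<String> lower(String s) {
        return CompletableFuture.supplyAsync(() -> s.toLowerCase());
    }

    // thenCombine: both stages are independent and may run at the same time.
    static String combined(String original) {
        return upper(original).thenCombine(lower(original), (u, l) -> u + l).join();
    }

    // thenCompose: the second stage is only created once the first result is available.
    static String composed(String original) {
        return upper(original).thenCompose(u -> lower(u).thenApply(l -> u + l)).join();
    }

    public static void main(String[] args) {
        System.out.println(combined("Message")); // MESSAGEmessage
        System.out.println(composed("Message")); // MESSAGEmessage
    }
}
```

Both variants give the same string here; thenCompose() is the natural choice when the second call needs the first result as input.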
17. Create a stage that completes when any of several stages completes
static void anyOfExample() {
    StringBuilder result = new StringBuilder();
    List<String> messages = Arrays.asList("a", "b", "c");
    List<CompletableFuture<String>> futures = messages.stream()
            .map(msg -> CompletableFuture.completedFuture(msg).thenApply(s -> delayedUpperCase(s)))
            .collect(Collectors.toList());
    CompletableFuture.anyOf(futures.toArray(new CompletableFuture[futures.size()])).whenComplete((res, th) -> {
        if (th == null) {
            assertTrue(isUpperCase((String) res));
            result.append(res);
        }
    });
    assertTrue("Result was empty", result.length() > 0);
}
18. Create a stage that completes when all stages complete
static void allOfExample() {
    StringBuilder result = new StringBuilder();
    List<String> messages = Arrays.asList("a", "b", "c");
    List<CompletableFuture<String>> futures = messages.stream()
            .map(msg -> CompletableFuture.completedFuture(msg).thenApply(s -> delayedUpperCase(s)))
            .collect(Collectors.toList());
    CompletableFuture.allOf(futures.toArray(new CompletableFuture[futures.size()]))
            .whenComplete((v, th) -> {
                futures.forEach(cf -> assertTrue(isUpperCase(cf.getNow(null))));
                result.append("done");
            });
    assertTrue("Result was empty", result.length() > 0);
}
19. Create a stage that completes when all stages complete asynchronously
static void allOfAsyncExample() {
    StringBuilder result = new StringBuilder();
    List<String> messages = Arrays.asList("a", "b", "c");
    List<CompletableFuture<String>> futures = messages.stream()
            .map(msg -> CompletableFuture.completedFuture(msg).thenApplyAsync(s -> delayedUpperCase(s)))
            .collect(Collectors.toList());
    CompletableFuture<Void> allOf = CompletableFuture.allOf(futures.toArray(new CompletableFuture[futures.size()]))
            .whenComplete((v, th) -> {
                futures.forEach(cf -> assertTrue(isUpperCase(cf.getNow(null))));
                result.append("done");
            });
    allOf.join();
    assertTrue("Result was empty", result.length() > 0);
}
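Examples 17 to 19 rely on two details of the API: anyOf() returns a CompletableFuture<Object>, and allOf() returns a CompletableFuture<Void>, so the individual results have to be read from the original futures. A minimal sketch, with the class name AnyOfAllOfDemo and both helper methods made up for illustration:

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.stream.Collectors;

class AnyOfAllOfDemo {
    // anyOf() yields the result of whichever future completes first, typed as Object.
    static Object firstCompleted() {
        CompletableFuture<String> pending = new CompletableFuture<>(); // never completed here
        CompletableFuture<String> done = CompletableFuture.completedFuture("B");
        return CompletableFuture.anyOf(pending, done).join();
    }

    // allOf() carries no value, so the results are collected from the source futures.
    static List<String> allResults(List<String> messages) {
        List<CompletableFuture<String>> futures = messages.stream()
                .map(m -> CompletableFuture.supplyAsync(() -> m.toUpperCase()))
                .collect(Collectors.toList());
        CompletableFuture<Void> all =
                CompletableFuture.allOf(futures.toArray(new CompletableFuture[0]));
        // join() cannot block here: every future is complete once 'all' is.
        return all.thenApply(v -> futures.stream()
                .map(CompletableFuture::join)
                .collect(Collectors.toList())).join();
    }

    public static void main(String[] args) {
        System.out.println(firstCompleted());                        // B
        System.out.println(allResults(Arrays.asList("a", "b", "c"))); // [A, B, C]
    }
}
```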
20. Real life
Here is a scenario that puts CompletableFuture into practice:
- First, we call the cars() method to asynchronously obtain a list of Car objects. It returns a CompletionStage<List<Car>>. The cars() method would be implemented by calling a remote REST endpoint.
- We then compose this stage with another stage that asynchronously fetches the rating of each car by calling rating(manufacturerId).
- When all Car objects have been rated, we call allOf() to obtain a final stage, which executes once all of the rating stages are complete.
- On the final stage, we use whenComplete() to print out each car with its rating.
cars().thenCompose(cars -> {
    List<CompletionStage<Car>> updatedCars = cars.stream()
            .map(car -> rating(car.manufacturerId).thenApply(r -> {
                car.setRating(r);
                return car;
            })).collect(Collectors.toList());
    CompletableFuture<Void> done = CompletableFuture
            .allOf(updatedCars.toArray(new CompletableFuture[updatedCars.size()]));
    return done.thenApply(v -> updatedCars.stream().map(CompletionStage::toCompletableFuture)
            .map(CompletableFuture::join).collect(Collectors.toList()));
}).whenComplete((cars, th) -> {
    if (th == null) {
        cars.forEach(System.out::println);
    } else {
        throw new RuntimeException(th);
    }
}).toCompletableFuture().join();
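The snippet above depends on Car, cars() and rating(), none of which are shown in this post. The following self-contained sketch fills them in with stand-ins so that the pipeline can be run; the Car fields, the model names and the rating values are all hypothetical, and the two stub methods return fixed data instead of calling a REST endpoint.

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.stream.Collectors;

class CarRatingDemo {
    // Hypothetical model class; the original post does not show its definition.
    static final class Car {
        final int manufacturerId;
        final String model;
        float rating;

        Car(int manufacturerId, String model) {
            this.manufacturerId = manufacturerId;
            this.model = model;
        }

        void setRating(float rating) { this.rating = rating; }

        @Override
        public String toString() { return model + " (rating " + rating + ")"; }
    }

    // Stand-in for the remote call that lists cars.
    static CompletionStage<List<Car>> cars() {
        return CompletableFuture.supplyAsync(
                () -> Arrays.asList(new Car(1, "Alpha"), new Car(2, "Beta")));
    }

    // Stand-in for the remote call that rates a manufacturer.
    static CompletionStage<Float> rating(int manufacturerId) {
        return CompletableFuture.supplyAsync(() -> manufacturerId == 1 ? 4.0f : 3.5f);
    }

    // Same pipeline as above, returning the rated cars instead of printing them.
    static List<Car> ratedCars() {
        return cars().thenCompose(cars -> {
            List<CompletableFuture<Car>> updated = cars.stream()
                    .map(car -> rating(car.manufacturerId).thenApply(r -> {
                        car.setRating(r);
                        return car;
                    }).toCompletableFuture())
                    .collect(Collectors.toList());
            return CompletableFuture.allOf(updated.toArray(new CompletableFuture[0]))
                    .thenApply(v -> updated.stream()
                            .map(CompletableFuture::join)
                            .collect(Collectors.toList()));
        }).toCompletableFuture().join();
    }

    public static void main(String[] args) {
        ratedCars().forEach(System.out::println);
    }
}
```

Converting each stage with toCompletableFuture() up front keeps the list typed as CompletableFuture<Car>, which avoids the second conversion step when the results are collected.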
Original link:
20 Examples of Using Java’s CompletableFuture – DZone Java
Translation link:
20 examples of using JAVA CompletableFuture