Before Java 8, getting the result of an asynchronous computation was inconvenient: you had to use a Future, and the calling thread had to call future.get() and block until the result was ready. Java 8 added CompletableFuture, which supports asynchronous computation with callbacks: when the task completes, a registered callback is invoked, normally on the thread that executed the task.

The CompletableFuture class implements both the CompletionStage and the Future interfaces, so you can still use a CompletableFuture exactly as you used a Future before, although this is no longer recommended.

Start with a concrete problem. Suppose you run several tasks, each of which returns a result, and once all of them have finished you need to print the results together. How can you do this? The constraint is that business threads must not block waiting for the results, i.e., do not use future.get.

You can use the CompletableFuture class to do this. After each task completes, check whether all tasks have completed; if so, print all the results. The implementation looks like this (3-task scenario):

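import java.util.Random;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.LockSupport;

public class CompletableFutureDemo {

    public static void main(String[] args) {
        int cnt = 3;
        AtomicInteger taskCnt = new AtomicInteger(cnt);
        DemoTask<String> demoTask = new DemoTask<>(taskCnt, new String[taskCnt.get()]);

        for (int i = 0; i < cnt; i++) {
            CompletableFuture.supplyAsync(() -> {
                String result = format("[supplyAsync] result");

                // Simulate work by pausing for a random number of milliseconds
                int ms = new Random().nextInt(100);
                LockSupport.parkNanos(TimeUnit.MILLISECONDS.toNanos(ms));

                System.out.println(result + String.format("[sleep %s ms]", ms));
                return result;
            }).whenComplete((s, throwable) -> demoTask.afterHandle(s));
        }

        // Give the asynchronous tasks time to finish before main returns
        LockSupport.parkNanos(TimeUnit.SECONDS.toNanos(1));
    }

    static class DemoTask<T> {
        // Note: the initial value of taskCnt must equal resultList.length
        private final AtomicInteger taskCnt;
        private final AtomicInteger nextSlot = new AtomicInteger();
        private final T[] resultList;

        DemoTask(AtomicInteger taskCnt, T[] resultList) {
            this.taskCnt = taskCnt;
            this.resultList = resultList;
        }

        public void afterHandle(T result) {
            // Store the result before decrementing the counter, so the task
            // that reaches zero is guaranteed to see every other result
            resultList[nextSlot.getAndIncrement()] = result;

            if (taskCnt.decrementAndGet() == 0) {
                // all done
                System.out.println("all done:");
                for (T t : resultList) {
                    System.out.println("" + t);
                }
            }
        }
    }

    private static String format(String msg) {
        return String.format("[%s] %s", Thread.currentThread().getName(), msg);
    }
}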

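Running it prints one line per task as each task finishes (the order varies from run to run because of the random pauses), followed by "all done:" and the three collected results.

Let’s take a closer look at how to use the CompletableFuture class.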

Creating a CompletableFuture

You can create a CompletableFuture directly and set its result yourself, as shown in the following example:

// Create a CompletableFuture with result
CompletableFuture<String> future = CompletableFuture.completedFuture("result");
future.get();
 
// A CompletableFuture created with new has no result yet, so future.get() blocks until it is completed with a result or an exception
future = new CompletableFuture<>();
try {
    future.get(1, TimeUnit.SECONDS);
} catch (Exception e) {
    // a TimeoutException is expected here, because nothing has completed the future yet
}
 
// Fill the future with a result
future.complete("result");
assert "result".equals(future.get());
 
// Fill the future with an exception
future = new CompletableFuture<>();
future.completeExceptionally(new RuntimeException("exception"));
try {
    future.get();
} catch (Exception e) {
    assert "exception".equals(e.getCause().getMessage());
}
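
To make the idea of setting a future's result yourself more concrete, here is a minimal sketch in which a second thread completes the future while the caller waits. The class and method names are made up for illustration, and join() is used instead of get() only to avoid checked exceptions.

```java
import java.util.concurrent.CompletableFuture;

class ManualCompletionSketch {
    // A separate thread fills in the result; the caller waits for it.
    static String completeFromAnotherThread() {
        CompletableFuture<String> future = new CompletableFuture<>();
        new Thread(() -> future.complete("result")).start();
        return future.join();
    }

    // Only the first completion takes effect; later calls return false.
    static boolean secondCompleteIsIgnored() {
        CompletableFuture<String> future = new CompletableFuture<>();
        future.complete("first");
        boolean changed = future.complete("second");
        return !changed && "first".equals(future.join());
    }

    public static void main(String[] args) {
        System.out.println(completeFromAnotherThread());
        System.out.println(secondCompleteIsIgnored());
    }
}
```

The second method shows why complete returns a boolean: a future can be completed only once.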

The example above sets the result of the future by hand. Normally we let another thread or a thread pool run the asynchronous task and complete the future for us. Besides creating a CompletableFuture object directly (which is not recommended), you can create one with the following four static methods:

// runAsync runs a Runnable task that has no return value. If an Executor is passed, it is used to execute the asynchronous task
public static CompletableFuture<Void> runAsync(Runnable runnable)
public static CompletableFuture<Void> runAsync(Runnable runnable, Executor executor)
// supplyAsync runs an asynchronous task that returns a result
public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier)
public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier, Executor executor)

If no executor is passed, ForkJoinPool.commonPool() is used by default as the thread pool that executes the asynchronous task, as shown in the following example:

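// Illustrative executor; any java.util.concurrent.Executor works (shut it down when you are finished)
ExecutorService executor = Executors.newFixedThreadPool(2);
CompletableFuture.runAsync(() -> {
    System.out.println("hello world");
}, executor);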
CompletableFuture.supplyAsync(() -> {
    System.out.println("hello world");
    return "result";
});
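
One way to see which executor actually runs a task is to return the name of the executing thread. The sketch below assumes a single-thread executor whose thread is named "demo-worker" (a made-up name); the class and method names are illustrative as well.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

class ExecutorChoiceSketch {
    // Runs the supplier on a custom executor and reports the thread name.
    static String threadNameWithCustomExecutor() {
        ExecutorService executor =
                Executors.newSingleThreadExecutor(r -> new Thread(r, "demo-worker"));
        try {
            return CompletableFuture
                    .supplyAsync(() -> Thread.currentThread().getName(), executor)
                    .join();
        } finally {
            executor.shutdown();
        }
    }

    // Runs the supplier on the default executor and reports the thread name.
    static String threadNameWithDefaultExecutor() {
        return CompletableFuture
                .supplyAsync(() -> Thread.currentThread().getName())
                .join();
    }

    public static void main(String[] args) {
        System.out.println(threadNameWithCustomExecutor());
        System.out.println(threadNameWithDefaultExecutor());
    }
}
```

The name printed for the default executor depends on the JVM and the number of available processors, so the sketch does not assume a particular value for it.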

The completion action of the CompletableFuture

public CompletableFuture<T>     whenComplete(BiConsumer<? super T,? super Throwable> action)
public CompletableFuture<T>     whenCompleteAsync(BiConsumer<? super T,? super Throwable> action)
public CompletableFuture<T>     whenCompleteAsync(BiConsumer<? super T,? super Throwable> action, Executor executor)
public CompletableFuture<T>     exceptionally(Function<Throwable,? extends T> fn)

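whenComplete runs the action when the task completes, on the thread that completed the task (or immediately on the calling thread if the task has already completed). The Async variants hand the action to an executor instead. Without an executor argument, the default asynchronous executor (normally ForkJoinPool.commonPool()) is used, so the action may still end up on the same pool thread as the asynchronous task. With an executor argument, the action is submitted to that executor. The exceptionally method runs only when the previous stage completed with an exception, and it supplies a replacement result; like whenComplete, it is not asynchronous.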

// whenCompleteAsync and exceptionally example:

CompletableFuture.supplyAsync(() -> {
    System.out.println("hello world");
    return "result";
}).whenCompleteAsync((result, e) -> {
    System.out.println(result + "" + e);
}).exceptionally((e) -> {
    System.out.println("exception " + e);
    return "exception";
});
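
The following sketch runs the same chain once with a successful task and once with a failing one, to show that whenComplete only observes the outcome while exceptionally replaces a failure with a fallback value. The class name, the "boom" message, and the fallback text are illustrative assumptions.

```java
import java.util.concurrent.CompletableFuture;

class CompletionActionSketch {
    static String run(boolean fail) {
        return CompletableFuture.supplyAsync(() -> {
            if (fail) {
                throw new IllegalStateException("boom");
            }
            return "result";
        }).whenComplete((result, error) -> {
            // Observes the outcome without changing it.
            System.out.println("result=" + result + ", error=" + error);
        }).exceptionally(error -> {
            // The failure usually arrives wrapped in a CompletionException.
            Throwable root = error.getCause() != null ? error.getCause() : error;
            return "fallback: " + root.getMessage();
        }).join();
    }

    public static void main(String[] args) {
        System.out.println(run(false));
        System.out.println(run(true));
    }
}
```

On success the exceptionally function is never invoked and the original value passes through unchanged.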

Besides whenComplete, you can also attach a completion action with the handle method. Unlike whenComplete, handle can change the result type of the previous CompletableFuture:

public <U> CompletableFuture<U>  handle(BiFunction<? super T,Throwable,? extends U> fn)
public <U> CompletableFuture<U>  handleAsync(BiFunction<? super T,Throwable,? extends U> fn)
public <U> CompletableFuture<U>  handleAsync(BiFunction<? super T,Throwable,? extends U> fn, Executor executor)

// handle method example:
CompletableFuture<String> f1 = CompletableFuture.supplyAsync(() -> {
    System.out.println("hello world");
    return "result";
});
CompletableFuture<Integer> f2 = f1.handle((r, e) -> {
    System.out.println("handle");
    return 1;
});
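
Because handle receives both the result and the exception, a single function can cover both outcomes while converting the type. A minimal sketch, with illustrative names and -1 chosen arbitrarily as the failure value:

```java
import java.util.concurrent.CompletableFuture;

class HandleSketch {
    // Converts a String future into an int: the length on success, -1 on failure.
    static int lengthOrMinusOne(CompletableFuture<String> source) {
        return source
                .handle((value, error) -> error == null ? value.length() : -1)
                .join();
    }

    static int onSuccess() {
        return lengthOrMinusOne(CompletableFuture.completedFuture("result"));
    }

    static int onFailure() {
        CompletableFuture<String> failed = new CompletableFuture<>();
        failed.completeExceptionally(new RuntimeException("exception"));
        return lengthOrMinusOne(failed);
    }

    public static void main(String[] args) {
        System.out.println(onSuccess());
        System.out.println(onFailure());
    }
}
```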

Besides handle, you can also use thenApply to convert the result type of a CompletableFuture. The difference is that handle receives both the normal return value and the exception, so it can absorb an exception and stop it from propagating further. thenApply, on the other hand, only handles normal return values: if the previous stage failed, the function is skipped and the exception is passed on.

public <U> CompletableFuture<U>  thenApply(Function<? super T,? extends U> fn)
public <U> CompletableFuture<U>  thenApplyAsync(Function<? super T,? extends U> fn)
public <U> CompletableFuture<U>  thenApplyAsync(Function<? super T,? extends U> fn, Executor executor)

// thenApply method example:
CompletableFuture.supplyAsync(() -> {
    System.out.println("hello world");
    return "result";
}).thenApply((r) -> {
    System.out.println(r);
    return "aaa";
}).thenApply((r) -> {
    System.out.println(r);
    return 1;
});
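
To check how thenApply behaves when the previous stage fails, the sketch below records whether the function was invoked at all. The names and the "boom" message are assumptions made for illustration.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.atomic.AtomicBoolean;

class ThenApplySkipSketch {
    static String chainOutcome(boolean fail) {
        CompletableFuture<String> source = new CompletableFuture<>();
        if (fail) {
            source.completeExceptionally(new IllegalStateException("boom"));
        } else {
            source.complete("result");
        }

        AtomicBoolean applied = new AtomicBoolean(false);
        CompletableFuture<Integer> length = source.thenApply(s -> {
            applied.set(true); // reached only when source completed normally
            return s.length();
        });

        try {
            return "value=" + length.join() + ", applied=" + applied.get();
        } catch (CompletionException e) {
            // The original exception is available as the cause.
            return "failed=" + e.getCause().getMessage() + ", applied=" + applied.get();
        }
    }

    public static void main(String[] args) {
        System.out.println(chainOutcome(false));
        System.out.println(chainOutcome(true));
    }
}
```

In the failing case the function never runs, and the exception surfaces only when the final result is read.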

Both handle and thenApply return a CompletableFuture that carries a new result. If you only want to consume the result and have no new value to produce, use thenAccept, which returns a CompletableFuture<Void>.

public CompletableFuture<Void>  thenAccept(Consumer<? super T> action)
public CompletableFuture<Void>  thenAcceptAsync(Consumer<? super T> action)
public CompletableFuture<Void>  thenAcceptAsync(Consumer<? super T> action, Executor executor)

// thenAccept method example:
CompletableFuture.supplyAsync(() -> {
    System.out.println("hello world");
    return "result";
}).thenAccept(r -> {
    System.out.println(r);
}).thenAccept(r -> {
    // r is Void (null)
    System.out.println(r);
});

handle, thenApply, and thenAccept all operate on the result of a single preceding CompletableFuture. Can an action also use the result of a second CompletableFuture? It can, with the thenAcceptBoth method. thenAcceptBoth works the same way as handle/thenApply/thenAccept, except that it takes another CompletionStage as an argument and runs the action once both have completed.

public <U> CompletableFuture<Void>   thenAcceptBoth(CompletionStage<? extends U> other, BiConsumer<? super T,? super U> action)
public <U> CompletableFuture<Void>   thenAcceptBothAsync(CompletionStage<? extends U> other, BiConsumer<? super T,? super U> action)
public <U> CompletableFuture<Void>   thenAcceptBothAsync(CompletionStage<? extends U> other, BiConsumer<? super T,? super U> action, Executor executor)
public     CompletableFuture<Void>  runAfterBoth(CompletionStage<?> other,  Runnable action)
  
// thenAcceptBoth method example:
CompletableFuture.supplyAsync(() -> {
    System.out.println("hello world");
    return "result";
}).thenAcceptBoth(CompletableFuture.completedFuture("result2"), (r1, r2) -> {
    System.out.println(r1 + "-" + r2);
});

Note that the thenAcceptBoth method does not return a value (its type is CompletableFuture<Void>). If you want something like thenAcceptBoth that does return a value, you can use thenCombine.

public <U,V> CompletableFuture<V>    thenCombine(CompletionStage<? extends U> other, BiFunction<? super T,? super U,? extends V> fn)
public <U,V> CompletableFuture<V>    thenCombineAsync(CompletionStage<? extends U> other, BiFunction<? super T,? super U,? extends V> fn)
public <U,V> CompletableFuture<V>    thenCombineAsync(CompletionStage<? extends U> other, BiFunction<? super T,? super U,? extends V> fn, Executor executor)
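
A minimal thenCombine sketch: two independent asynchronous tasks produce numbers, and the combining function adds them once both are available. The class name, method name, and values are made up for illustration.

```java
import java.util.concurrent.CompletableFuture;

class ThenCombineSketch {
    static int total() {
        CompletableFuture<Integer> price = CompletableFuture.supplyAsync(() -> 40);
        CompletableFuture<Integer> shipping = CompletableFuture.supplyAsync(() -> 2);

        // Integer::sum runs only after both futures have completed.
        return price.thenCombine(shipping, Integer::sum).join();
    }

    public static void main(String[] args) {
        System.out.println(total());
    }
}
```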

If you don’t want to use the result of the CompletableFuture when the task completes, you can use thenRun to execute a Runnable.

public CompletableFuture<Void>  thenRun(Runnable action)
public CompletableFuture<Void>  thenRunAsync(Runnable action)
public CompletableFuture<Void>  thenRunAsync(Runnable action, Executor executor)
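
The sketch below chains thenRun after thenAccept to show the difference between the two: thenAccept receives the result, while thenRun receives nothing and simply runs after the previous stage completes. The event strings and names are illustrative.

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CopyOnWriteArrayList;

class ThenRunSketch {
    static List<String> events() {
        List<String> events = new CopyOnWriteArrayList<>();
        CompletableFuture.supplyAsync(() -> "result")
                .thenAccept(r -> events.add("accepted " + r)) // sees the result
                .thenRun(() -> events.add("done"))            // sees nothing
                .join();
        return events;
    }

    public static void main(String[] args) {
        System.out.println(events());
    }
}
```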

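In all of the methods above, the function you pass in returns a plain value (or nothing). The function can also return another CompletableFuture, which is what thenCompose is for: it chains two asynchronous steps into one CompletableFuture, much like composing functions.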
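
In the JDK, the method whose function returns another CompletionStage is thenCompose. The sketch below assumes two hypothetical asynchronous steps, findUserName and greet, and chains them so that the second starts with the result of the first.

```java
import java.util.concurrent.CompletableFuture;

class ThenComposeSketch {
    // Hypothetical first step: look up a name asynchronously.
    static CompletableFuture<String> findUserName(int id) {
        return CompletableFuture.supplyAsync(() -> "user-" + id);
    }

    // Hypothetical second step: build a greeting asynchronously.
    static CompletableFuture<String> greet(String name) {
        return CompletableFuture.supplyAsync(() -> "hello " + name);
    }

    static String composed() {
        // thenCompose flattens the nested future into a CompletableFuture<String>.
        return findUserName(7).thenCompose(ThenComposeSketch::greet).join();
    }

    public static void main(String[] args) {
        System.out.println(composed());
    }
}
```

Using thenApply here instead would yield a CompletableFuture<CompletableFuture<String>>, which is rarely what you want.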

Auxiliary methods allOf and anyOf

The allOf method returns a CompletableFuture<Void> that completes once all of the given CompletableFutures have completed.

The anyOf method returns a CompletableFuture<Object> that completes as soon as any one of the given CompletableFutures completes, with that future's result.

public static CompletableFuture<Void>   allOf(CompletableFuture<?>... cfs)
public static CompletableFuture<Object> anyOf(CompletableFuture<?>... cfs)
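
As a sketch of how these two helpers might be used, the first method below collects the results of three tasks once allOf reports that all of them are done, and the second shows anyOf returning the result of whichever future finishes first. Task names and values are illustrative assumptions.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.stream.Collectors;

class AllOfAnyOfSketch {
    static List<String> collectAll() {
        List<CompletableFuture<String>> tasks = new ArrayList<>();
        for (int i = 0; i < 3; i++) {
            int id = i;
            tasks.add(CompletableFuture.supplyAsync(() -> "task-" + id));
        }
        // allOf carries no value itself, so read each result once it completes.
        return CompletableFuture.allOf(tasks.toArray(new CompletableFuture[0]))
                .thenApply(v -> tasks.stream()
                        .map(CompletableFuture::join) // does not block: all are done
                        .collect(Collectors.toList()))
                .join();
    }

    static Object firstCompleted() {
        CompletableFuture<String> pending = new CompletableFuture<>(); // never completed
        CompletableFuture<String> done = CompletableFuture.completedFuture("fast");
        return CompletableFuture.anyOf(pending, done).join();
    }

    public static void main(String[] args) {
        System.out.println(collectAll());
        System.out.println(firstCompleted());
    }
}
```

The results come back in the order the tasks were submitted, not the order in which they finished, because the list is walked in submission order.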
