preface

In our minds, programs that are executed synchronously are more in tune with our way of thinking, while things that are asynchronous are often more difficult to process. In the case of asynchronous computation, the actions represented as callbacks tend to be scattered across the code and may be nested internally with each other, making the situation worse if you need to deal with errors that might occur in one of the steps. Java 8 introduces a number of new features, including the CompletableFuture class, which makes it much easier to write clean, readable asynchronous code. The class is very powerful and contains over 50 methods…

What is a CompletableFuture

The CompletableFuture class is inspired by Google Guava’s ListenableFuture class, which implements the Future and CompletionStage interfaces and adds many new methods. It supports lambda, The asynchronous programming model is improved by using non-blocking methods through callbacks. It allows us to write non-blocking code by running tasks on a different thread from the main application thread (that is, asynchronously) and notifying the main thread of the task’s progress, completion, or failure.

Why introduce CompletableFuture

Version 1.5 of Java introduced a Future, which you can think of simply as a placeholder for the result of an operation, providing two methods to retrieve the result.

  • get(): The thread calling the method will wait indefinitely for the result of the operation.
  • get(long timeout, TimeUnit unit): The thread that calls this method will only call at the specified timetimeoutThe result of waiting inside, and thrown if waiting times outTimeoutExceptionThe exception.

The Future can use Runnable or Callable instances to complete the submitted task, and as you can see from its source code, it has several problems:

  • blockingcallget()The method blocks until the calculation is complete, provides no method to notify when it is done, and does not have the ability to attach a callback function.
  • Chain calls and result aggregation processingA lot of times we want to link more than oneFutureTo perform a long calculation that requires merging the results and sending them to another task, which is difficult for the interface to do.
  • Exception handling FutureNo exception handling is provided.

All of these problems have been solved in CompletableFuture, so let’s look at how to use CompletableFuture.

How do I create a CompletableFuture

The simplest way to create is called CompletableFuture.com pletedFuture (U value) method to get a completed CompletableFuture object.

@Test
public void testSimpleCompletableFuture(a) {
    CompletableFuture<String> completableFuture = CompletableFuture.completedFuture("Hello mghio");
    assertTrue(completableFuture.isDone());
    try {
        assertEquals("Hello mghio", completableFuture.get());
    } catch(InterruptedException | ExecutionException e) { e.printStackTrace(); }}Copy the code

Note that when we call the get method on an incomplete CompleteableFuture, the Future is incomplete, so the get call will always block, At this point you can use CompletableFuture.com plete method done manually Future.

Asynchronous task processing

When we want our program to execute tasks asynchronously in the background without caring about the result of processing the task, we can use the runAsync method, which takes a parameter of type Runnable and returns CompletableFuture

.

@Test
public void testCompletableFutureRunAsync(a) {
    AtomicInteger variable = new AtomicInteger(0);
    CompletableFuture<Void> runAsync = CompletableFuture.runAsync(() -> process(variable));
    runAsync.join();
    assertEquals(100, variable.get());
}

public void process(AtomicInteger variable) {
    System.out.println(Thread.currentThread() + " Process...");
    variable.set(100);
}
Copy the code

If we want a task to execute asynchronously in the background and need to retrieve the result of processing the task, we can use the supplyAsync method, which takes an argument of type Supplier

and returns a CompletableFuture

.

@Test
public void testCompletableFutureSupplyAsync(a) {
    CompletableFuture<String> supplyAsync = CompletableFuture.supplyAsync(this::process);
    try {
        assertEquals("Hello mghio", supplyAsync.get()); // Blocking
    } catch(ExecutionException | InterruptedException e) { e.printStackTrace(); }}public String process(a) {
    return "Hello mghio";
} 
Copy the code

Looking at this, you might have a question: Where are the threads that perform runAsync and supplyAsync tasks coming from and who created them? In fact, similar to parallelStream in Java 8, CompletableFuture performs these tasks from threads obtained by the global ForkJoinPool.commonPool(). Both methods also provide custom thread pools to perform tasks. In fact, if you look at the source code for CompletableFuture, you’ll see that all methods in the API have an overloaded version, with or without a custom Executor.

@Test
public void testCompletableFutureSupplyAsyncWithExecutor(a) {
    ExecutorService newFixedThreadPool = Executors.newFixedThreadPool(2);
    CompletableFuture<String> supplyAsync = CompletableFuture.supplyAsync(this::process, newFixedThreadPool);
    try {
        assertEquals("Hello mghio", supplyAsync.get()); // Blocking
    } catch(ExecutionException | InterruptedException e) { e.printStackTrace(); }}public String process(a) {
    return "Hello mghio";
}
Copy the code

Chain calls and result aggregation processing

We know that CompletableFuture’s get() method blocks until it gets the result, and CompletableFuture provides thenApply, thenAccept, and thenRun methods to avoid this, We can also add a callback notification when the task is complete. These methods can be used in the following scenarios:

  • thenApplyWhen we’re going fromFutureYou can use this method when you run custom business code before a task after receiving values, and then want to return some values for that task
  • thenAcceptIf we wish to be fromFutureYou can use this method when you run custom business code after receiving some values and before executing the task, without caring about returning the resulting value
  • ThenRun this method can be used if we want to run custom business code after the Future is complete and do not want to return any values for this purpose
@Test
public void testCompletableFutureThenApply(a) {
    Integer notificationId = CompletableFuture.supplyAsync(this::thenApplyProcess)
        .thenApply(this::thenApplyNotify) // Non Blocking
        .join();
    assertEquals(new Integer(1), notificationId);
}

@Test
public void testCompletableFutureThenAccept(a) {
    CompletableFuture.supplyAsync(this::processVariable)
        .thenAccept(this::thenAcceptNotify) // Non Blocking
        .join();
    assertEquals(100, variable.get());
}

@Test
public void testCompletableFutureThenRun(a) {
    CompletableFuture.supplyAsync(this::processVariable)
        .thenRun(this::thenRunNotify)
        .join();
    assertEquals(100, variable.get());
}

private String processVariable(a) {
    variable.set(100);
    return "success";
}

private void thenRunNotify(a) {
    System.out.println("thenRun completed notify ....");
}

private Integer thenApplyNotify(Integer integer) {
    return integer;
}

private void thenAcceptNotify(String s) {
    System.out.println(
    String.format("Thread %s completed notify ....", Thread.currentThread().getName()));
}

public Integer thenApplyProcess(a) {
    return 1;
}
Copy the code

If there is a lot of asynchronous computation, we can continue to pass values from one callback to another, using the chain call method, which is very simple to use.

@Test
public void testCompletableFutureThenApplyAccept(a) {
    CompletableFuture.supplyAsync(this::findAccountNumber)
        .thenApply(this::calculateBalance)
        .thenApply(this::notifyBalance)
        .thenAccept((i) -> notifyByEmail()).join();
}

private void notifyByEmail(a) {
    // business code
    System.out.println("send notify by email ...");
}

private Double notifyBalance(Double d) {
    // business code
    System.out.println(String.format("your balance is $%s", d));
    return 9527D;
}

private Double calculateBalance(Object o) {
    // business code
    return 9527D;
}

private Double findAccountNumber(a) {
    // business code
    return 9527D;
}
Copy the code

More observant friends may have noticed that in all of the previous method examples, all methods were executed on the same thread. If we want these tasks to run on separate threads, then we can use the corresponding asynchronous versions of these methods.

@Test
public void testCompletableFutureApplyAsync(a) {
    ExecutorService newFixedThreadPool = Executors.newFixedThreadPool(2);
    ScheduledExecutorService newSingleThreadScheduledExecutor = Executors
        .newSingleThreadScheduledExecutor();
    CompletableFuture<Double> completableFuture =
        CompletableFuture
            .supplyAsync(this::findAccountNumber,
                newFixedThreadPool) // Get the thread to execute the task from the thread pool newFixedThreadPool
            .thenApplyAsync(this::calculateBalance,
                newSingleThreadScheduledExecutor)
            .thenApplyAsync(this::notifyBalance);
    Double balance = completableFuture.join();
    assertEquals(9527D, balance);
}
Copy the code

Execution Result Processing

The thenCompose method is suitable for dependent tasks, such as a business that calculates account balances: first we find the account, then calculate the balance for that account, and then send a notification when the calculation is complete. All of these tasks rely on the CompletableFuture result returned by the previous task, at which point we need to use the thenCompose method, which is somewhat similar to the flatMap operation for Java 8 streams.

@Test
public void testCompletableFutureThenCompose(a) {
    Double balance = this.doFindAccountNumber()
        .thenCompose(this::doCalculateBalance)
        .thenCompose(this::doSendNotifyBalance).join();
    assertEquals(9527D, balance);
}

private CompletableFuture<Double> doSendNotifyBalance(Double aDouble) {
    sleepSeconds(2);
    // business code
    System.out.println(String.format("%s doSendNotifyBalance ....", Thread.currentThread().getName()));
    return CompletableFuture.completedFuture(9527D);
}

private CompletableFuture<Double> doCalculateBalance(Double d) {
    sleepSeconds(2);
    // business code
    System.out.println(String.format("%s doCalculateBalance ....", Thread.currentThread().getName()));
    return CompletableFuture.completedFuture(9527D);
}

private CompletableFuture<Double> doFindAccountNumber(a) {
    sleepSeconds(2);
    // business code
    System.out.println(String.format("%s doFindAccountNumber ....", Thread.currentThread().getName()));
    return CompletableFuture.completedFuture(9527D);
}

private void sleepSeconds(int timeout) {
    try {
        TimeUnit.SECONDS.sleep(timeout);
    } catch(InterruptedException e) { e.printStackTrace(); }}Copy the code

The thenCombine method is primarily used to combine the processing results of multiple independent tasks. Suppose we need to find the name and address of a person, we can use different tasks to obtain them respectively, and then to obtain the complete information of this person (name + address), we need to combine the results of the two methods, then we can use thenCombine method.

@Test
public void testCompletableFutureThenCombine(a) {
    CompletableFuture<String> thenCombine = this.findName().thenCombine(this.findAddress(), (name, address) -> name + address);
    String personInfo = thenCombine.join();
    assertEquals("mghio Shanghai, China", personInfo);
}

private CompletableFuture<String> findAddress(a) {
    return CompletableFuture.supplyAsync(() -> {
        sleepSeconds(2);
        // business code
        return "Shanghai, China";
    });
}

private CompletableFuture<String> findName(a) {
    return CompletableFuture.supplyAsync(() -> {
        sleepSeconds(2);
        // business code
        return "mghio ";
    });
}
Copy the code

Wait for multiple tasks to complete

In many cases, we want to run multiple tasks in parallel and do some processing after all the tasks are complete. Suppose we want to look up the names of three different users and merge the results. At this point, we can use the static method allOf of CompletableFuture, which waits for all tasks to complete. Note that this method does not return the combined results of all tasks, so we have to manually combine the execution results of the tasks.

@Test
public void testCompletableFutureAllof(a) {
    List<CompletableFuture<String>> list = Lists.newArrayListWithCapacity(4);
    IntStream.range(0.3).forEach(num -> list.add(findName(num)));

    CompletableFuture<Void> allFuture = CompletableFuture
        .allOf(list.toArray(new CompletableFuture[0]));

    CompletableFuture<List<String>> allFutureList = allFuture
        .thenApply(val -> list.stream().map(CompletableFuture::join).collect(Collectors.toList()));

    CompletableFuture<String> futureHavingAllValues = allFutureList
        .thenApply(fn -> String.join("", fn));

    String result = futureHavingAllValues.join();
    assertEquals("mghio0mghio1mghio2", result);
}

private CompletableFuture<String> findName(int num) {
    return CompletableFuture.supplyAsync(() -> {
        sleepSeconds(2);
        // business code
        return "mghio" + num;
    });
} 
Copy the code

Exception handling

Program exceptions are actually a lot harder to handle in multithreading, but luckily CompletableFuture gives us a handy way to handle exceptions, as in our example code above:

@Test
public void testCompletableFutureThenCompose(a) {
    Double balance = this.doFindAccountNumber()
        .thenCompose(this::doCalculateBalance)
        .thenCompose(this::doSendNotifyBalance).join();
}
Copy the code

In the above code, methods doFindAccountNumber, doCalculateBalance, and doSendNotifyBalance will not run if any of them fails. CompletableFuture provides three methods for handling exceptions — exceptionally, Handle, and whenComplete. The first approach is to use the exceptionally method to handle exceptions, and if the previous method fails and an exception occurs, the exception callback is called.

@Test
public void testCompletableFutureExceptionally(a) {
    CompletableFuture<Double> thenApply = CompletableFuture.supplyAsync(this::findAccountNumber)
        .thenApply(this::calculateBalance)
        .thenApply(this::notifyBalance)
        .exceptionally(ex -> {
            System.out.println("Exception " + ex.getMessage());
            return 0D;
        });
    Double join = thenApply.join();
    assertEquals(9527D, join);
}
Copy the code

The second method, which uses the Handle method, is exceptionally more flexible than the exceptionally method, which allows us to obtain both the exception object and the current result.

@Test
public void testCompletableFutureHandle(a) {
    CompletableFuture.supplyAsync(this::findAccountNumber)
        .thenApply(this::calculateBalance)
        .thenApply(this::notifyBalance)
        .handle((ok, ex) -> {
            System.out.println("The final code to run...");
            if(ok ! =null) {
            System.out.println("No Exception !!");
            } else {
            System.out.println("Exception " + ex.getMessage());
            return -1D;
            }
            return ok;
        });
}
Copy the code

The third is to use the whenComplete method to handle the exception.

@Test
public void testCompletableFutureWhenComplete(a) {
    CompletableFuture.supplyAsync(this::findAccountNumber)
        .thenApply(this::calculateBalance)
        .thenApply(this::notifyBalance)
        .whenComplete((result, ex) -> {
            System.out.println("result = " + result + ", ex = " + ex);
            System.out.println("The final code to run...");
        });
}
Copy the code

conclusion

In this article, the CompletableFuture class is introduced in part of the method and use, the method of this class many at the same time to provide a very powerful function, in the asynchronous programming used more, familiar with the basic method of use to understand or to in-depth source analysis of its implementation principle.