
Introduction

Prior to Java 8, multithreaded programming in Java usually went through the run method of Runnable, which has the obvious disadvantage of returning no value. To get a result back, you can use the call method of Callable and retrieve the result through a Future, as follows:

import java.util.concurrent.*;

public static void main(String[] args) throws Exception {
    ExecutorService executor = Executors.newSingleThreadExecutor();
    Future<String> stringFuture = executor.submit(new Callable<String>() {
        @Override
        public String call() throws Exception {
            Thread.sleep(2000);
            return "async thread";
        }
    });
    Thread.sleep(1000);
    System.out.println("main thread");
    // get() blocks the main thread until the task has finished
    System.out.println(stringFuture.get());
    executor.shutdown(); // let the JVM exit once the task is done
}
  • Looking at the console, "main thread" is printed first and "async thread" one second later, which seems to meet our needs. On closer inspection, though, the main thread is blocked while it calls the Future's get() method, which is not what we want.

  • Another way to get the result is to poll: call isDone() repeatedly and call get() only once the task is done. That is not satisfactory either, because the caller still spends its own time checking. Plain Future also handles scenarios such as the following poorly:

  1. Multiple asynchronous tasks may take different amounts of time, and the main thread cannot wait forever. I may want to wait only for the fastest task or for the most important one, or I may want to wait at most one second and use default values for the tasks that have not returned a result by then.

  2. Two asynchronous tasks run on their own threads, but the second one depends on the result of the first.
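
The two workarounds available with a plain Future can be sketched roughly as follows. This is only an illustration: the class FutureWorkarounds, the helper getOrDefault, and the delays are made up for this sketch and are not part of the original article.

```java
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

final class FutureWorkarounds {

    // Hypothetical helper: waits at most timeoutMillis, then falls back to a default value.
    static String getOrDefault(Future<String> future, long timeoutMillis, String fallback) {
        try {
            return future.get(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (TimeoutException e) {
            future.cancel(true); // give up on the slow task
            return fallback;
        } catch (InterruptedException | ExecutionException e) {
            throw new IllegalStateException(e);
        }
    }

    // Runs one slow task but waits for it for only 200 ms.
    static String timeoutDemo() {
        ExecutorService executor = Executors.newSingleThreadExecutor();
        try {
            Future<String> slow = executor.submit(() -> {
                Thread.sleep(2000);
                return "slow result";
            });
            return getOrDefault(slow, 200, "default value");
        } finally {
            executor.shutdownNow();
        }
    }

    // Polls isDone() until the task has finished, then fetches the result.
    static String pollingDemo() {
        ExecutorService executor = Executors.newSingleThreadExecutor();
        try {
            Future<String> task = executor.submit(() -> {
                Thread.sleep(100);
                return "polled result";
            });
            while (!task.isDone()) {
                Thread.sleep(10); // the caller still spends its own time checking
            }
            return task.get(); // returns immediately because the task is done
        } catch (InterruptedException | ExecutionException e) {
            throw new IllegalStateException(e);
        } finally {
            executor.shutdownNow();
        }
    }

    public static void main(String[] args) {
        System.out.println(timeoutDemo()); // default value
        System.out.println(pollingDemo()); // polled result
    }
}
```

Both variants work, but the waiting and fallback logic has to be written by hand around every Future, and neither expresses "run B with the result of A" without blocking somewhere.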

Java 8's CompletableFuture made its debut in this messy and imperfect multithreading landscape. It greatly improves and extends what Future can do and where it can be used: it supports a functional programming style that makes code cleaner, lets you process results through callbacks, and offers better ways to handle exceptions.

The CompletableFuture source code has four static methods for performing asynchronous tasks:

Create a task

public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier){..}
public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier, Executor executor){..}
public static CompletableFuture<Void> runAsync(Runnable runnable){..}
public static CompletableFuture<Void> runAsync(Runnable runnable, Executor executor){..}
  • If you know the basics of multithreading, it is easy to see that the two methods whose names start with run are for tasks that do not return a value, because their argument is a Runnable.

  • The methods whose names start with supply run tasks that return a value. As for the arguments: if no Executor is passed in, the asynchronous code runs on ForkJoinPool.commonPool(). In practice it is usually better to pass in your own thread pool, so that slow or blocking tasks do not compete with everything else that shares the common pool.

Running an asynchronous task is simple; just call one of the static methods listed above:

CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
    // ... perform a task
    return "hello";
}, executor);
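
As a rough, self-contained illustration of the difference between passing an Executor and not passing one, consider the sketch below. The class name CreateTasks and the thread name "demo-pool" are invented for this example.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

final class CreateTasks {

    // Returns the name of the thread that ran a supplyAsync task on a custom pool.
    static String threadNameWithCustomPool() {
        // "demo-pool" is a thread name made up for this sketch.
        ExecutorService executor = Executors.newFixedThreadPool(2, r -> {
            Thread t = new Thread(r, "demo-pool");
            t.setDaemon(true);
            return t;
        });
        try {
            return CompletableFuture
                    .supplyAsync(() -> Thread.currentThread().getName(), executor)
                    .join();
        } finally {
            executor.shutdown();
        }
    }

    // runAsync takes a Runnable, so the resulting future carries no value (Void).
    static Void runAsyncResult() {
        CompletableFuture<Void> future =
                CompletableFuture.runAsync(() -> System.out.println("side effect only"));
        return future.join(); // always null for a Void future
    }

    public static void main(String[] args) {
        System.out.println(threadNameWithCustomPool()); // demo-pool
        System.out.println(runAsyncResult());           // null
        // Without an Executor the task typically runs on a common-pool worker;
        // the exact thread name depends on the JVM and the machine.
        System.out.println(
                CompletableFuture.supplyAsync(() -> Thread.currentThread().getName()).join());
    }
}
```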

Let’s take a look at a few ways to get results.

V get();
V get(long timeout, TimeUnit unit);
T getNow(T defaultValue);
T join();
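  • The first two methods come from the Future interface. get() blocks the current thread, which creates a problem: if the task never completes, get() waits forever. The overload with a timeout gives up after the given time and throws a TimeoutException.

  • The getNow() method is interesting because it does not block: if the result is already available it returns it, otherwise it immediately returns the default value you passed in. If the task has already failed, getNow() throws the exception instead of returning the default.

  • join() waits for the result like get(), but reports a failed task with an unchecked CompletionException.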
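
The behaviour of these methods can be checked with a small sketch like the one below. The class and method names are hypothetical; only get, getNow, join, and completedFuture are real CompletableFuture methods.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

final class GetResults {

    // getNow on a future that has not completed yet returns the supplied default.
    static String getNowOnPending() {
        CompletableFuture<String> pending = new CompletableFuture<>(); // never completed here
        return pending.getNow("default");
    }

    // getNow on a completed future returns the real result and ignores the default.
    static String getNowOnCompleted() {
        return CompletableFuture.completedFuture("hello").getNow("default");
    }

    // get(timeout, unit) gives up with a TimeoutException instead of waiting forever.
    static boolean getTimesOut() {
        CompletableFuture<String> pending = new CompletableFuture<>();
        try {
            pending.get(100, TimeUnit.MILLISECONDS);
            return false;
        } catch (TimeoutException e) {
            return true;
        } catch (Exception e) {
            throw new IllegalStateException(e);
        }
    }

    // join() reports a failed task with an unchecked CompletionException.
    static String joinOnFailure() {
        int zero = 0;
        CompletableFuture<Integer> failed = CompletableFuture.supplyAsync(() -> 100 / zero);
        try {
            failed.join();
            return "no exception";
        } catch (CompletionException e) {
            return e.getCause().getClass().getSimpleName();
        }
    }

    public static void main(String[] args) {
        System.out.println(getNowOnPending());   // default
        System.out.println(getNowOnCompleted()); // hello
        System.out.println(getTimesOut());       // true
        System.out.println(joinOnFailure());     // ArithmeticException
    }
}
```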

Let’s look at some of the other commonly used methods in CompletableFuture with some scenario examples

thenAccept()
public CompletionStage<Void> thenAccept(Consumer<? super T> action);
public CompletionStage<Void> thenAcceptAsync(Consumer<? super T> action);
public CompletionStage<Void> thenAcceptAsync(Consumer<? super T> action,Executor executor);
  • Function: runs a follow-up action after the current task completes. The result of the current task is passed to the action as its input; the action itself returns nothing.
  • Scenario: tasks A and B are started asynchronously. When task B completes normally, its return value is passed to task C, which has no return value.
CompletableFuture<String> futureA = CompletableFuture.supplyAsync(() -> "task A");
CompletableFuture<String> futureB = CompletableFuture.supplyAsync(() -> "task B");
CompletableFuture<Void> futureC = futureB.thenAccept(b -> {
      System.out.println("task C");
      System.out.println("Parameter: " + b); // Parameter: task B
});
thenRun(..)
public CompletionStage<Void> thenRun(Runnable action);
public CompletionStage<Void> thenRunAsync(Runnable action);
public CompletionStage<Void> thenRunAsync(Runnable action,Executor executor);
  • Function: runs the next action without caring about the result of the previous step.
  • Scenario: task A is executed, and task B runs after A has finished. Task B does not receive A's return value (whether or not A returns one) and has no return value itself.
CompletableFuture<String> futureA = CompletableFuture.supplyAsync(() -> "task A");
futureA.thenRun(() -> System.out.println("task B"));
thenApply(..)
public <U> CompletableFuture<U> thenApply(Function<? super T,? extends U> fn)
public <U> CompletableFuture<U> thenApplyAsync(Function<? super T,? extends U> fn)
public <U> CompletableFuture<U>  thenApplyAsync(Function<? super T,? extends U> fn, Executor executor)
  • Function: runs a follow-up task after the current task completes. The result of the current task is the input of the next task, and the next task returns a value.
  • Scenario: several tasks run in series; each task depends on the result of the previous one, and each has an input and an output.

Task A runs asynchronously. When it completes, its result (resultA) is passed as the input to task B. Any number of tasks can be chained this way.

CompletableFuture<String> futureA = CompletableFuture.supplyAsync(() -> "hello");
CompletableFuture<String> futureB = futureA.thenApply(s->s + " world");
CompletableFuture<String> future3 = futureB.thenApply(String::toUpperCase);
System.out.println(future3.join());

In the code above, we could have called futureA.join() to get the return value of task A and then passed it to task B ourselves. thenApply makes that step unnecessary, so the calling thread is not blocked while it waits for a computation to finish. It tells the CompletableFuture to move on to the next step as soon as the current one is done, and the tasks end up strung together.
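
To make the contrast concrete, here is a minimal sketch of both styles side by side. The class name BlockingVersusChaining and its method names are made up for illustration.

```java
import java.util.concurrent.CompletableFuture;

final class BlockingVersusChaining {

    // Blocking style: the caller must wait for A before it can even start B.
    static String blockingStyle() {
        CompletableFuture<String> futureA = CompletableFuture.supplyAsync(() -> "hello");
        String resultA = futureA.join(); // the calling thread waits here
        CompletableFuture<String> futureB =
                CompletableFuture.supplyAsync(() -> resultA + " world");
        return futureB.join();
    }

    // Chained style: the whole pipeline is declared up front,
    // and the caller decides if and when to wait for the final result.
    static CompletableFuture<String> chainedStyle() {
        return CompletableFuture.supplyAsync(() -> "hello")
                .thenApply(s -> s + " world")
                .thenApply(String::toUpperCase);
    }

    public static void main(String[] args) {
        System.out.println(blockingStyle());       // hello world
        System.out.println(chainedStyle().join()); // HELLO WORLD
    }
}
```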

thenCombine(..)  thenAcceptBoth(..)  runAfterBoth(..)
public <U,V> CompletableFuture<V>  thenCombine(CompletionStage<? extends U> other, BiFunction<? super T,? super U,? extends V> fn)
public <U,V> CompletableFuture<V>  thenCombineAsync(CompletionStage<? extends U> other, BiFunction<? super T,? super U,? extends V> fn)
public <U,V> CompletableFuture<V>  thenCombineAsync(CompletionStage<? extends U> other, BiFunction<? super T,? super U,? extends V> fn, Executor executor)
  • Function: combines the results of two CompletionStages, transforms them, and returns the new value.
  • Scenario: you need the current price of a product, looked up by product ID. The original price and the discount are queried independently of each other; once both are available, multiply the original price by the discount to get the current price. Use thenCombine(..):
CompletableFuture<Double> futurePrice = CompletableFuture.supplyAsync(() -> 100d);
CompletableFuture<Double> futureDiscount = CompletableFuture.supplyAsync(() -> 0.8);
CompletableFuture<Double> futureResult = futurePrice.thenCombine(futureDiscount, (price, discount) -> price * discount);
System.out.println("Final price: " + futureResult.join()); // Final price: 80.0
  • thenCombine(..) returns a value. If you do not need a return value, use thenAcceptBoth(..).
  • Likewise, if the follow-up action does not need either result, use runAfterBoth(..). If you understand thenApply, thenAccept, and thenRun, these two methods need no separate explanation, so they are only mentioned here.
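
For completeness, the two siblings can be sketched as follows, reusing the price and discount scenario. The class BothVariants and the AtomicReference used to capture the side effect are illustration-only choices.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicReference;

final class BothVariants {

    // thenAcceptBoth: consumes both results and produces nothing.
    static String acceptBothDemo() {
        CompletableFuture<Double> price = CompletableFuture.supplyAsync(() -> 100d);
        CompletableFuture<Double> discount = CompletableFuture.supplyAsync(() -> 0.8);
        AtomicReference<String> seen = new AtomicReference<>();
        CompletableFuture<Void> done =
                price.thenAcceptBoth(discount, (p, d) -> seen.set("final price: " + p * d));
        done.join();
        return seen.get();
    }

    // runAfterBoth: ignores both results and runs an action once both have finished.
    static String runAfterBothDemo() {
        CompletableFuture<Double> price = CompletableFuture.supplyAsync(() -> 100d);
        CompletableFuture<Double> discount = CompletableFuture.supplyAsync(() -> 0.8);
        AtomicReference<String> seen = new AtomicReference<>();
        price.runAfterBoth(discount, () -> seen.set("both queries finished")).join();
        return seen.get();
    }

    public static void main(String[] args) {
        System.out.println(acceptBothDemo());   // final price: 80.0
        System.out.println(runAfterBothDemo()); // both queries finished
    }
}
```
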
thenCompose(..)
public <U> CompletableFuture<U>  thenCompose(Function<? super T,? extends CompletionStage<U>> fn)
public <U> CompletableFuture<U>  thenComposeAsync(Function<? super T,? extends CompletionStage<U>> fn)
public <U> CompletableFuture<U>  thenComposeAsync(Function<? super T,? extends CompletionStage<U>> fn, Executor executor)

Function: the function passed to this method receives the computed value of the current CompletableFuture and returns a new CompletableFuture.

Much like thenApply, this method takes the result of the previous task as input, does its own work, and returns a result. So what is the difference?

  • thenApply(): the function maps a value to a value, which is the equivalent of converting a CompletableFuture<T> into a CompletableFuture<U>; only the type of the carried value changes.
  • thenCompose(): the function itself returns a CompletableFuture, so this connects two CompletableFutures and returns a new one.
CompletableFuture<String> futureA = CompletableFuture.supplyAsync(() -> "hello");
CompletableFuture<String> futureB = futureA.thenCompose(s -> CompletableFuture.supplyAsync(() -> s + " world"));
CompletableFuture<String> future3 = futureB.thenCompose(s -> CompletableFuture.supplyAsync(s::toUpperCase));
System.out.println(future3.join()); // HELLO WORLD
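
One way to see the difference is to pass the same future-returning function to both methods and compare the resulting types. In the sketch below, appendWorld is a hypothetical asynchronous step invented for this example.

```java
import java.util.concurrent.CompletableFuture;

final class ApplyVersusCompose {

    // Hypothetical asynchronous step used only for this sketch.
    static CompletableFuture<String> appendWorld(String s) {
        return CompletableFuture.supplyAsync(() -> s + " world");
    }

    // thenApply with a function that itself returns a future yields a nested future.
    static CompletableFuture<CompletableFuture<String>> nested() {
        return CompletableFuture.supplyAsync(() -> "hello")
                .thenApply(ApplyVersusCompose::appendWorld);
    }

    // thenCompose flattens the two levels into a single future.
    static CompletableFuture<String> flat() {
        return CompletableFuture.supplyAsync(() -> "hello")
                .thenCompose(ApplyVersusCompose::appendWorld);
    }

    public static void main(String[] args) {
        System.out.println(nested().join().join()); // hello world (two joins needed)
        System.out.println(flat().join());          // hello world
    }
}
```
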
applyToEither(..)  acceptEither(..)  runAfterEither(..)
public <U> CompletionStage<U> applyToEither(CompletionStage<? extends T> other,Function<? super T, U> fn);
public <U> CompletionStage<U> applyToEitherAsync(CompletionStage<? extends T> other,Function<? super T, U> fn);
public <U> CompletionStage<U> applyToEitherAsync(CompletionStage<? extends T> other,Function<? super T, U> fn,Executor executor);

Scenario: product A can be queried in two ways, method A and method B, which run at different speeds. We want to use whichever result comes back first.

CompletableFuture<String> futureA = CompletableFuture.supplyAsync(() -> {
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            return "Obtain good A through Method A";
        });
CompletableFuture<String> futureB = CompletableFuture.supplyAsync(() -> {
            try {
                Thread.sleep(2000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            return "Obtain Good A through mode B";
        });

CompletableFuture<String> futureC = futureA.applyToEither(futureB, product -> "Result: " + product);
System.out.println(futureC.join()); // Result: Obtain good A through Method A

Similarly, acceptEither() and runAfterEither() are siblings of applyToEither() and are used in the same way, so they need little separate explanation.
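
A brief sketch of the two siblings follows, assuming a made-up delayed helper that simulates a slow query; the class name and the delays are not from the original article.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicReference;

final class EitherVariants {

    // Hypothetical helper: completes with the given value after a delay.
    static CompletableFuture<String> delayed(String value, long millis) {
        return CompletableFuture.supplyAsync(() -> {
            try {
                Thread.sleep(millis);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
            return value;
        });
    }

    // acceptEither: consumes whichever result arrives first and returns nothing.
    static String acceptEitherDemo() {
        AtomicReference<String> winner = new AtomicReference<>();
        delayed("method A", 100)
                .acceptEither(delayed("method B", 1500), winner::set)
                .join();
        return winner.get();
    }

    // runAfterEither: ignores the results and runs once either task has finished.
    static String runAfterEitherDemo() {
        AtomicReference<String> message = new AtomicReference<>();
        delayed("method A", 100)
                .runAfterEither(delayed("method B", 1500), () -> message.set("one query finished"))
                .join();
        return message.get();
    }

    public static void main(String[] args) {
        System.out.println(acceptEitherDemo());   // method A
        System.out.println(runAfterEitherDemo()); // one query finished
    }
}
```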

exceptionally(..)
public CompletionStage<T> exceptionally(Function<Throwable, ? extends T> fn);
  • Function: called when an exception occurs, so that you can compensate for it, for example by supplying a default value.
  • Scenario: run task A asynchronously and get its result. If an exception is thrown while task A runs, return the default value 100.
CompletableFuture<String> futureA = CompletableFuture.
                supplyAsync(() -> "Execution Result :" + (100 / 0))
                .thenApply(s -> "futureA result:" + s)
                .exceptionally(e -> {
                    System.out.println(e.getMessage()); //java.lang.ArithmeticException: / by zero
                    return "futureA result: 100";
                });
CompletableFuture<String> futureB = CompletableFuture.
                supplyAsync(() -> "Execution Result :" + 50)
                .thenApply(s -> "futureB result:" + s)
                .exceptionally(e -> "futureB result: 100");
System.out.println(futureA.join()); // futureA result: 100
System.out.println(futureB.join()); // futureB result:Execution Result :50

The code above shows both the normal flow and the exceptional flow. exceptionally can be understood as a catch block: it runs only when an exception has occurred, and the value it returns replaces the result.
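
The message printed inside exceptionally above is the text of the wrapped ArithmeticException, which suggests that the Throwable handed to the callback is a wrapper (in this chain, a CompletionException). If the callback needs the original exception type, a small unwrapping helper can be used. The sketch below is one possible approach; rootCause is a made-up helper name.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;

final class ExceptionallyCause {

    // Hypothetical helper: strips the CompletionException wrapper, if there is one.
    static Throwable rootCause(Throwable e) {
        return (e instanceof CompletionException && e.getCause() != null) ? e.getCause() : e;
    }

    // Fails in the first stage and recovers with a message naming the original exception.
    static String recover() {
        int zero = 0;
        return CompletableFuture
                .supplyAsync(() -> "Execution Result :" + (100 / zero))
                .thenApply(s -> "futureA result:" + s) // skipped because the first stage failed
                .exceptionally(e -> "recovered from " + rootCause(e).getClass().getSimpleName())
                .join();
    }

    public static void main(String[] args) {
        System.out.println(recover()); // recovered from ArithmeticException
    }
}
```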

whenComplete(..)
public CompletionStage<T> whenComplete(BiConsumer<? super T, ? super Throwable> action);
public CompletionStage<T> whenCompleteAsync(BiConsumer<? super T, ? super Throwable> action);
public CompletionStage<T> whenCompleteAsync(BiConsumer<? super T, ? super Throwable> action,Executor executor);

Function: whenComplete runs when the CompletableFuture finishes, whether it produced a result or threw an exception. For example:

CompletableFuture<String> futureA = CompletableFuture.
                supplyAsync(() -> "Execution Result :" + (100 / 0))
                .thenApply(s -> "apply result:" + s)
                .whenComplete((s, e) -> {
                    if (s != null) {
                        System.out.println(s); // not executed
                    }
                    if (e == null) {
                        System.out.println(s); // not executed
                    } else {
                        System.out.println(e.getMessage()); // java.lang.ArithmeticException: / by zero
                    }
                })
                .exceptionally(e -> {
                    System.out.println("ex:" + e.getMessage()); // ex:java.lang.ArithmeticException: / by zero
                    return "futureA result: 100";
                });
System.out.println(futureA.join()); // futureA result: 100

The chain is built in the order supplyAsync -> thenApply -> whenComplete -> exceptionally. Because supplyAsync throws, thenApply is skipped; it runs only when the previous stage completes normally. Note that the whenComplete callback has no return value, whether or not the task completed normally.

The code above uses a chained, functional style and calls whenComplete before exceptionally. What happens if exceptionally comes first and whenComplete follows?

CompletableFuture<String> futureA = CompletableFuture.
                supplyAsync(() -> "Execution Result :" + (100 / 0))
                .thenApply(s -> "apply result:" + s)
                .exceptionally(e -> {
                    System.out.println("ex:"+e.getMessage()); //ex:java.lang.ArithmeticException: / by zero
                    return "futureA result: 100";
                })
                .whenComplete((s, e) -> {
                    if (e == null) {
                        System.out.println(s);//futureA result: 100
                    } else {
                        System.out.println(e.getMessage()); // not executed
                    }
                });
System.out.println(futureA.join()); // futureA result: 100

Here exceptionally comes first and whenComplete follows. As the comments show, whenComplete receives a normal result: the exception has already been handled by exceptionally, which replaced it with the default value. This is something to watch out for.
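
Because the whenComplete callback returns nothing, the stage it produces simply carries the original outcome forward. The following sketch illustrates this for both the success and the failure case; the class and method names are made up for the example.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.atomic.AtomicReference;

final class WhenCompletePassThrough {

    // On success, the stage returned by whenComplete carries the original result unchanged.
    static String onSuccess() {
        AtomicReference<String> observed = new AtomicReference<>();
        String result = CompletableFuture.supplyAsync(() -> "hello")
                .whenComplete((s, e) -> observed.set("saw " + s + ", error=" + e))
                .join();
        return result + " / " + observed.get();
    }

    // On failure, whenComplete observes the exception but does not swallow it.
    static boolean failureStillPropagates() {
        int zero = 0;
        CompletableFuture<Integer> stage = CompletableFuture.supplyAsync(() -> 100 / zero)
                .whenComplete((value, e) -> System.out.println("observed: " + e));
        try {
            stage.join();
            return false;
        } catch (CompletionException ex) {
            return true; // the original failure reaches the caller
        }
    }

    public static void main(String[] args) {
        System.out.println(onSuccess());              // hello / saw hello, error=null
        System.out.println(failureStillPropagates()); // true
    }
}
```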

handle(..)
public <U> CompletionStage<U> handle(BiFunction<? super T, Throwable, ? extends U> fn);
public <U> CompletionStage<U> handleAsync(BiFunction<? super T, Throwable, ? extends U> fn);
public <U> CompletionStage<U> handleAsync(BiFunction<? super T, Throwable, ? extends U> fn,Executor executor);

Function: handle runs when the CompletableFuture has computed its result or thrown an exception; it can process either outcome and return a new value.

CompletableFuture<String> futureA = CompletableFuture.
                supplyAsync(() -> "Execution Result :" + (100 / 0))
                .thenApply(s -> "apply result:" + s)
                .exceptionally(e -> {
                    System.out.println("ex:" + e.getMessage()); // ex:java.lang.ArithmeticException: / by zero
                    return "futureA result: 100";
                })
                .handle((s, e) -> {
                    if (e == null) {
                        System.out.println(s); // futureA result: 100
                    } else {
                        System.out.println(e.getMessage()); // not executed
                    }
                    return "handle result:" + (s == null ? "500" : s);
                });
System.out.println(futureA.join()); // handle result:futureA result: 100

From the console we can see that the last line printed is handle result:futureA result: 100. exceptionally ran first and returned the default value, so handle received the value returned by exceptionally rather than the exception. Now put handle before exceptionally:

CompletableFuture<String> futureA = CompletableFuture.
                supplyAsync(() -> "Execution Result :" + (100 / 0))
                .thenApply(s -> "apply result:" + s)
                .handle((s, e) -> {
                    if (e == null) {
                        System.out.println(s); // not executed
                    } else {
                        System.out.println(e.getMessage()); // java.lang.ArithmeticException: / by zero
                    }
                    return "handle result:" + (s == null ? "500" : s);
                })
                .exceptionally(e -> {
                    System.out.println("ex:" + e.getMessage()); // not executed
                    return "futureA result: 100";
                });
System.out.println(futureA.join()); // handle result:500

According to the console output, handle ran first: it printed the exception and returned the default value 500. exceptionally did not run, because it received the normal value returned by handle. From this we can work out the difference between handle and whenComplete:

  1. handle has a return value; whenComplete does not.
  2. Because of point 1, handle can also do the job of exceptionally: it can replace an exception with a fallback value.
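
As a compact illustration of point 2, the sketch below uses a single handle call to cover both the success path and the failure path. The divide helper and its messages are hypothetical.

```java
import java.util.concurrent.CompletableFuture;

final class HandleBothOutcomes {

    // Hypothetical helper: divides asynchronously and maps success or failure to a message.
    static String divide(int a, int b) {
        return CompletableFuture.supplyAsync(() -> a / b)
                .handle((value, e) -> e == null
                        ? "ok: " + value
                        : "failed, using default: 100")
                .join();
    }

    public static void main(String[] args) {
        System.out.println(divide(100, 5)); // ok: 20
        System.out.println(divide(100, 0)); // failed, using default: 100
    }
}
```
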
allOf(..)  anyOf(..)
public static CompletableFuture<Void> allOf(CompletableFuture<?>... cfs)
public static CompletableFuture<Object> anyOf(CompletableFuture<?>... cfs)
  • allOf: the returned future completes once all of the given CompletableFutures have completed.
  • anyOf: the returned future completes as soon as any one of the given CompletableFutures has completed, with that future's result.
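
A minimal anyOf sketch, assuming two made-up services with different latencies (the delayed helper and the service names are invented for illustration):

```java
import java.util.concurrent.CompletableFuture;

final class AnyOfDemo {

    // Hypothetical helper: completes with the given value after a delay.
    static CompletableFuture<String> delayed(String value, long millis) {
        return CompletableFuture.supplyAsync(() -> {
            try {
                Thread.sleep(millis);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
            return value;
        });
    }

    // Returns the result of whichever future completes first.
    static Object fastest() {
        CompletableFuture<Object> first = CompletableFuture.anyOf(
                delayed("fast service", 100),
                delayed("slow service", 1500));
        return first.join(); // typed as Object, so callers usually have to cast
    }

    public static void main(String[] args) {
        System.out.println(fastest()); // fast service
    }
}
```

Note that anyOf does not cancel the slower futures; they keep running in the background.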

Scenario: to show the details of a product, you need to query the product information, seller information, inventory information, and order information. The queries are independent of each other and go to different services. Assume each query takes one to two seconds; the total query time must stay under two seconds.

public static void main(String[] args) throws Exception {

        ExecutorService executorService = Executors.newFixedThreadPool(4);

        long start = System.currentTimeMillis();
        CompletableFuture<String> futureA = CompletableFuture.supplyAsync(() -> {
            try {
                Thread.sleep(1000 + RandomUtils.nextInt(1000));
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            return "Details of Commodity";
        },executorService);

        CompletableFuture<String> futureB = CompletableFuture.supplyAsync(() -> {
            try {
                Thread.sleep(1000 + RandomUtils.nextInt(1000));
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            return "Seller information";
        },executorService);

        CompletableFuture<String> futureC = CompletableFuture.supplyAsync(() -> {
            try {
                Thread.sleep(1000 + RandomUtils.nextInt(1000));
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            return "Inventory information";
        },executorService);

        CompletableFuture<String> futureD = CompletableFuture.supplyAsync(() -> {
            try {
                Thread.sleep(1000 + RandomUtils.nextInt(1000));
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            return "Order Information";
        },executorService);

        CompletableFuture<Void> allFuture = CompletableFuture.allOf(futureA, futureB, futureC, futureD);
        allFuture.join();

        System.out.println(futureA.join() + futureB.join() + futureC.join() + futureD.join());
        System.out.println("Total time: " + (System.currentTimeMillis() - start));
        executorService.shutdown(); // without this, the pool's threads keep the JVM alive
    }
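
The same scenario can be written more compactly by moving the repeated query code into a helper and collecting the results in a list. The following is only a sketch under a few assumptions: query is a made-up stand-in for a remote call, ThreadLocalRandom from the JDK replaces RandomUtils, and the list of names is assumed to be non-empty.

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadLocalRandom;
import java.util.stream.Collectors;

final class AllOfCollect {

    // Hypothetical stand-in for a remote query that takes one to two seconds.
    static CompletableFuture<String> query(String name, ExecutorService executor) {
        return CompletableFuture.supplyAsync(() -> {
            try {
                Thread.sleep(1000 + ThreadLocalRandom.current().nextInt(1000));
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
            return name;
        }, executor);
    }

    // Starts all queries in parallel and gathers the results in submission order.
    // Assumes names is non-empty (a fixed thread pool needs at least one thread).
    static List<String> queryAll(List<String> names) {
        ExecutorService executor = Executors.newFixedThreadPool(names.size());
        try {
            List<CompletableFuture<String>> futures = names.stream()
                    .map(name -> query(name, executor))
                    .collect(Collectors.toList());
            return CompletableFuture.allOf(futures.toArray(new CompletableFuture[0]))
                    .thenApply(ignored -> futures.stream()
                            .map(CompletableFuture::join) // already complete, does not block
                            .collect(Collectors.toList()))
                    .join();
        } finally {
            executor.shutdown();
        }
    }

    public static void main(String[] args) {
        long start = System.currentTimeMillis();
        List<String> results = queryAll(Arrays.asList(
                "Details of Commodity", "Seller information",
                "Inventory information", "Order Information"));
        System.out.println(results);
        System.out.println("Total time: " + (System.currentTimeMillis() - start));
    }
}
```

Since the four queries run in parallel, the total time should be close to the slowest single query rather than the sum of all four.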