The Future pattern is an extension of multithreaded concurrent programming that allows you to return results and block the current thread using the GET () method. The core idea is asynchronous invocation. When we execute a function, it may be slow, but we’re not in a hurry for the result. Therefore, we can make it return immediately, make it perform the request asynchronously, and block the calling thread to get the result when we need it.

A Future

interface represents a result that may be returned in the Future. It defines methods such as:

  • get(): Get results (may wait)
  • get(long timeout, TimeUnit unit): Gets the result, but only waits for the specified time;
  • cancel(boolean mayInterruptIfRunning): Cancel the current task.
  • isDone(): Checks whether the task is complete.

Goods query

A simple example of a production practice that needs to query commodity information (including basic commodity information, commodity price, commodity inventory, commodity picture, commodity sales status, etc.) while maintaining a promotional campaign. This information is distributed in different business centers and served by different systems. Assuming that an interface needs 50ms, then a commodity query needs 200ms-300ms, which is not satisfactory for us. If the Future transformation is used, all you need is the interface for the longest time service, which is about 50ms.

Of course, what is not solved here is a problem where the interface service is suddenly slow, and to solve this problem, you need to assist other components (flow control, degradation, etc.). The pseudocode is as follows:

public class FutureTest {
    static class T1Task implements Callable<String> {
        @Override
        public String call(a) throws Exception {
            System.out.println("T1: Query basic commodity information...");
            TimeUnit.MILLISECONDS.sleep(50);
            return "Basic commodity information query succeeded"; }}static class T2Task implements Callable<String> {
        @Override
        public String call(a) throws Exception {
            System.out.println("T2: Query commodity price...");
            TimeUnit.MILLISECONDS.sleep(50);
            return "Commodity price enquiry successful"; }}static class T3Task implements Callable<String> {
        @Override
        public String call(a) throws Exception {
            System.out.println("T3: Query commodity inventory...");
            TimeUnit.MILLISECONDS.sleep(50);
            return "Merchandise inventory query successful"; }}static class T4Task implements Callable<String> {
        @Override
        public String call(a) throws Exception {
            System.out.println(T4: Query commodity picture...);
            TimeUnit.MILLISECONDS.sleep(50);
            return "Product picture query succeeded"; }}static class T5Task implements Callable<String> {
        @Override
        public String call(a) throws Exception {
            System.out.println("T5: Query product sales status...");
            TimeUnit.MILLISECONDS.sleep(50);
            return "Product sales status query successful"; }}public static void main(String[] args) throws InterruptedException {
        FutureTask<String> ft1 = new FutureTask<>(new T1Task());
        FutureTask<String> ft2 = new FutureTask<>(new T2Task());
        FutureTask<String> ft3 = new FutureTask<>(new T3Task());
        FutureTask<String> ft4 = new FutureTask<>(new T4Task());
        FutureTask<String> ft5 = new FutureTask<>(new T5Task());
        ExecutorService executorService = Executors.newFixedThreadPool(5);
        executorService.submit(ft1);
        executorService.submit(ft2);
        executorService.submit(ft3);
        executorService.submit(ft4);
        executorService.submit(ft5);
        // Create a blocking queue
        BlockingQueue<String> bq = new LinkedBlockingQueue<>();
        System.out.println(System.currentTimeMillis());
        executorService.execute(() -> {
            try {
                bq.put(ft1.get());
                bq.put(ft2.get());
                bq.put(ft3.get());
                bq.put(ft4.get());
                bq.put(ft4.get());
            } catch(Exception e) { e.printStackTrace(); }}); System.out.println(System.currentTimeMillis());for (int i = 0; i < 5; i++) { System.out.println(bq.take()); } executorService.shutdown(); }}Copy the code

The working principle of

Enhanced Future: CompletableFuture

CompletableFuture is a very large tool class added to Java8. It implements not only the Future interface, but also the CompletionStage interface, which has over 40 methods in total (for process calls in functional programming).

When you get asynchronous execution results using a Future, you either call the blocking method get() or poll to see if isDone() is true, neither of which is very good because the main thread is forced to wait, too.

CompletableFuture improves on the Future by passing in a callback object whose callback method is automatically called when an asynchronous task completes or an exception occurs. A simple example:

public class CompletableFutureDemo {
    public static void main(String[] args) throws Exception {
        // Create asynchronous execution task:
        CompletableFuture<Double> cf = CompletableFuture.supplyAsync(
            CompletableFutureDemo::fetchPrice);
        // If executed successfully:
        cf.thenAccept((result) -> System.out.println("price: " + result));
        // If the execution is abnormal:
        cf.exceptionally((e) -> {
            e.printStackTrace();
            return null;
        });
        // The main thread should not end immediately, otherwise the thread pool used by CompletableFuture by default will be closed immediately:
        Thread.sleep(200);
    }

    static Double fetchPrice(a) {
        try {
            Thread.sleep(100);
        } catch (InterruptedException e) {
        }
        if (Math.random() < 0.3) {
            throw new RuntimeException("fetch price failed!");
        }
        return 5 + Math.random() * 20; }}Copy the code

Create a CompletableFuture is through CompletableFuture supplyAsync () implementation, it needs a realized: Supplier interface objects:

public interface Supplier<T> {
    T get(a);
}
Copy the code

Here we use the lambda syntax simplifies the direct incoming CompletableFutureDemo: : fetchPrice, Because CompletableFutureDemo. FetchPrice () static method signature is in line with: Supplier interface definition (in addition to the method name).

Next, the CompletableFuture has been submitted to the default thread pool for execution, and all we need to define are instances where the CompletableFuture needs to be called back when it completes and when it fails. When finished, the CompletableFuture calls the Consumer object:

public interface Consumer<T> {
    void accept(T t);
}
Copy the code

In case of an exception, CompletableFuture calls:

public interface Function<T.R> {
    R apply(T t);
}
Copy the code

So the advantages of CompletableFuture are:

  • Methods that are automatically called back to an object when an asynchronous task ends.
  • When an asynchronous task fails, it automatically calls back an object’s method.
  • The main thread no longer cares about the execution of asynchronous tasks after the callback is set.

In addition, compleTableFutures also allow multiple CompletableFutures to be combined.

CompletionStage interface:

Describe and convergence relations:

  1. ThenCombine: Task merge with return value
  2. ThenAccepetBoth: After the two tasks are executed, thenAccepetBoth consumes the result and returns no value.
  3. RunAfterBoth: After both tasks are executed, the next operation (Runnable) is performed.

Describe the OR aggregation relationship

  1. ApplyToEither: Whichever of the two tasks executes faster is used, with a return value.
  2. AcceptEither: The swiftest execution of two tasks consumes the result. No value is returned.
  3. RunAfterEither: After a task is executed, the next operation (Runnable) is performed.

The CompletableFuture class itself provides anyOf() and allOf() to support the parallel execution of multiple CompletableFutures.

Rewrite commodity query

public class CompletableFutureTest {

    public static void main(String[] args) {


        List<String> queryList = Lists.newArrayList("Basic Commodity Information"."Commodity prices"."Commodity inventory"."Commodity picture"."Merchandise Sales Status");

        List<CompletableFuture<String>> futureList = queryList.stream()
                .map(v -> CompletableFuture.supplyAsync(() -> doQuery(v)))
                .collect(Collectors.toList());
        
        CompletableFuture<Void> allCompletableFuture = CompletableFuture
                .allOf(futureList.toArray(new CompletableFuture[0]));
        
        List<String> resultList = allCompletableFuture.thenApply(e -> 
                futureList.stream().map(CompletableFuture::join).collect(Collectors.toList())).join();
        System.out.println(resultList);

    }

    private static String doQuery(String type) {
        try {
            TimeUnit.MILLISECONDS.sleep(50);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        // Omit code
        return type + "Query successful"; }}Copy the code

CompletableFuture has a lot more to do, and there are so many interface methods out there, I’ll leave it to the reader to try it out for himself. Thanks for reading!