When you have time-consuming tasks, you often want to run them on multiple threads and then collect each result asynchronously for combined processing. Since Java 5, you can do this with a thread pool (ExecutorService) and the Future interface.

Introduction to the Future API documentation

  • public interface Future<V>

    A Future represents the result of an asynchronous computation. It provides methods to check whether the computation is complete, to wait for its completion, and to retrieve its result. The result can only be retrieved with the get method once the computation has completed; get blocks if necessary until the result is ready. Cancellation is performed by the cancel method. Additional methods are provided to determine whether the task completed normally or was cancelled. Once a computation has completed, it cannot be cancelled. If you want to use a Future for the sake of cancellability but do not need a usable result, you can declare the type as Future<?> and return null as the result of the underlying task.

    There are only a few simple methods:

Return type   Method and description
boolean       cancel(boolean mayInterruptIfRunning): Attempts to cancel execution of this task.
V             get(): Waits if necessary for the computation to complete, and then retrieves its result.
V             get(long timeout, TimeUnit unit): Waits if necessary for at most the given time for the computation to complete, and then retrieves its result, if available.
boolean       isCancelled(): Returns true if this task was cancelled before it completed normally.
boolean       isDone(): Returns true if this task completed.
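
To make the methods above concrete, here is a minimal sketch that waits for a result with a timeout and then cancels the task. The class name FutureMethodsDemo, the 5-second sleep and the 100 ms timeout are assumptions chosen for illustration.

```java
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

public class FutureMethodsDemo {

    /** Submits a slow task, gives up waiting after a short timeout, then cancels it. */
    static String timeoutThenCancel() throws InterruptedException, ExecutionException {
        ExecutorService pool = Executors.newSingleThreadExecutor();
        try {
            Future<String> slow = pool.submit(() -> {
                Thread.sleep(5000); // hypothetical slow computation
                return "done";
            });
            try {
                // Wait at most 100 ms for the result.
                return slow.get(100, TimeUnit.MILLISECONDS);
            } catch (TimeoutException e) {
                // Not finished in time: cancel and interrupt the worker thread.
                boolean cancelled = slow.cancel(true);
                return "cancelled=" + cancelled
                        + ", isCancelled=" + slow.isCancelled()
                        + ", isDone=" + slow.isDone();
            }
        } finally {
            pool.shutdownNow();
        }
    }

    public static void main(String[] args) throws Exception {
        // prints: cancelled=true, isCancelled=true, isDone=true
        System.out.println(timeoutThenCancel());
    }
}
```

Note that isDone() also returns true for a cancelled task: "done" means the task is no longer running, not that it produced a result.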

Let me give you an example

Create two tasks

// Each public class goes in its own file (Task1.java, Task2.java), both with this import.
import java.util.concurrent.Callable;

public class Task1 implements Callable<String> {
    @Override
    public String call() throws Exception {
        // Returns a string after a calculation that takes 1 second.
        Thread.sleep(1000);
        return "hello";
    }
}

public class Task2 implements Callable<String> {
    @Override
    public String call() throws Exception {
        // Returns a string after a calculation that takes 3 seconds.
        Thread.sleep(3000);
        return "world";
    }
}

Create a method to perform the above two tasks and merge the results

    /**
     * Runs both tasks and merges the results.
     *
     * @return the merged string
     * @throws ExecutionException if either task throws an exception
     * @throws InterruptedException if the calling thread is interrupted while waiting
     */
    public static String test() throws ExecutionException, InterruptedException {
        long start = System.currentTimeMillis();
        ExecutorService executorService = Executors.newFixedThreadPool(5);
        /*
         * submit accepts a Callable, which is a functional interface,
         * so the task can also be written as a lambda:
         *
         * Future<String> task1 = executorService.submit(() -> {
         *     Thread.sleep(1000);
         *     return "hello";
         * });
         */

        Future<String> task1 = executorService.submit(new Task1());
        Future<String> task2 = executorService.submit(new Task2());

        // Do other work on the calling thread while the two tasks run.
        Thread.sleep(3000);
        String s3 = ", I'm coming";

        String s = task1.get();
        String s1 = task2.get();

        long end = System.currentTimeMillis();

        System.out.println("Total time:" + (end - start) / 1000);

        executorService.shutdown();
        return s + " " + s1 + s3;
    }

Running the method shows that the whole thing takes about three seconds. Submitting the two time-consuming operations to the thread pool returns a Future for each one immediately, which frees the calling thread to do other useful work instead of waiting for them to complete.

   public static void main(String[] args) throws ExecutionException, InterruptedException {

        String result = test();
        System.out.println(result);
        // Total time:3
        // hello world, I'm coming
    }

Limitations of the Future interface

The Future interface lets the calling thread carry on with other work and later wait for a task to finish and fetch its result. That is not enough to write clean code for everyday development needs, such as:

  • Combining several asynchronous computations into one, especially when one computation depends on the result of another

  • Waiting for all asynchronous computations to complete

  • Handling only the task that completes first (for example, calling two equivalent third-party APIs and using whichever responds first, which reduces response time)
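
To illustrate the last point, here is a rough sketch of what "take whichever finishes first" looks like with plain Future objects. Future offers no completion callback, so the caller has to poll isDone(). The class name, the helper firstCompleted and the sleep durations are assumptions made for this example.

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class FirstCompletedWithFuture {

    /** Returns the result of whichever future finishes first, by polling isDone(). */
    static <T> T firstCompleted(List<Future<T>> futures)
            throws InterruptedException, ExecutionException {
        while (true) {
            for (Future<T> f : futures) {
                if (f.isDone()) {
                    return f.get(); // already done, so this does not block
                }
            }
            Thread.sleep(10); // Future has no callback, so we have to poll
        }
    }

    static String demo() throws InterruptedException, ExecutionException {
        ExecutorService pool = Executors.newFixedThreadPool(2);
        try {
            Future<String> fast = pool.submit(() -> { Thread.sleep(200); return "fast"; });
            Future<String> slow = pool.submit(() -> { Thread.sleep(2000); return "slow"; });
            return firstCompleted(Arrays.asList(slow, fast));
        } finally {
            pool.shutdownNow(); // interrupt the task we no longer need
        }
    }

    public static void main(String[] args) throws Exception {
        System.out.println(demo()); // prints: fast
    }
}
```

The polling loop works, but it ties up the calling thread and has to be written by hand every time, which is the kind of boilerplate the bullets above describe.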

Using CompletableFuture

Java 8 introduced CompletableFuture to address these limitations of the Future interface. The following examples show how it makes up for them.

1. Sequencing

        CompletableFuture<String> stringCompletableFuture = CompletableFuture.supplyAsync(() -> {
            // the calculation returns a string
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            return "hello";
        }).thenCombine(CompletableFuture.supplyAsync(() -> {
            try {
                Thread.sleep(3000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            return "world";
        }), (x, y) -> {
            return x + y;
        });

        System.out.println("s:"+stringCompletableFuture.join());

        // Output: s:helloworld
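
The example above uses thenCombine to merge two independent results. When the second computation needs the result of the first one as input, thenCompose chains them in order. A minimal sketch, where the class name and the two step methods are hypothetical names made up for illustration:

```java
import java.util.concurrent.CompletableFuture;

public class ThenComposeDemo {

    // Hypothetical first step: produces a value asynchronously.
    static CompletableFuture<String> firstStep() {
        return CompletableFuture.supplyAsync(() -> "hello");
    }

    // Hypothetical second step: needs the first step's result as input.
    static CompletableFuture<String> secondStep(String previous) {
        return CompletableFuture.supplyAsync(() -> previous + " world");
    }

    static String run() {
        // secondStep only starts after firstStep has produced its result.
        return firstStep().thenCompose(ThenComposeDemo::secondStep).join();
    }

    public static void main(String[] args) {
        System.out.println(run()); // prints: hello world
    }
}
```

As a rule of thumb, thenCombine fits two tasks that can run at the same time, while thenCompose fits a pipeline where each step depends on the previous one.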

2. Handle only the task that completes first

        CompletableFuture<String> sCf = CompletableFuture.supplyAsync(() -> {
            /** * the calculation returns a string */
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            return "a";
        }).applyToEitherAsync(CompletableFuture.supplyAsync(() -> {
            try {
                Thread.sleep(3000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            return "b";
        }), (x) -> {
            return x;
        });
        System.out.println("s:"+sCf.join());
        // Output: s:a

Or, using anyOf:

        CompletableFuture<String> sCf = CompletableFuture.supplyAsync(() -> {
            // the calculation returns a string
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            return "a";
        });
        CompletableFuture<String> sCf2 = CompletableFuture.supplyAsync(() -> {
            try {
                Thread.sleep(3000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            return "b";
        });
        CompletableFuture<Object> objectCompletableFuture = CompletableFuture.anyOf(sCf, sCf2);

        // join is "a", the result of the task that finished first
        String join = (String) objectCompletableFuture.join();

3. Wait for all asynchronous computing tasks to complete

        long start = System.currentTimeMillis();
        CompletableFuture<String> sCf = CompletableFuture.supplyAsync(() -> {
            /** * the calculation returns a string */
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            return "a";
        });
        CompletableFuture<String> sCf2 = CompletableFuture.supplyAsync(() -> {
            try {
                Thread.sleep(3000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            return "b";
        });
        CompletableFuture<String> sCf3= CompletableFuture.supplyAsync(() -> {
            try {
                Thread.sleep(5000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            return "c";
        });
        CompletableFuture<Void> voidCompletableFuture = CompletableFuture.allOf(sCf, sCf2,sCf3);
        voidCompletableFuture.join();
        long end = System.currentTimeMillis();
        System.out.println("Total time:" + (end - start) / 1000);

        System.out.println();
        ArrayList<CompletableFuture<String>> cfList = new ArrayList<>();
        cfList.add(sCf);
        cfList.add(sCf2);
        cfList.add(sCf3);
        cfList.stream().map(CompletableFuture::join).forEach(System.out::println);

        /*
         * Output:
         * Total time:5
         *
         * a
         * b
         * c
         */

4. Using a custom thread pool

By default, CompletableFuture runs asynchronous tasks on the shared ForkJoinPool.commonPool(), whose parallelism defaults to the number of available processor cores minus one. In production, if the machine has few cores and the tasks are slow or blocking, tasks queue up waiting for a free thread and the whole service slows down. It is therefore common to pass a custom thread pool instead, for example:

        ExecutorService executorService = Executors.newFixedThreadPool(5);
        CompletableFuture<String> sCf3= CompletableFuture.supplyAsync(() -> {
            try {
                Thread.sleep(5000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            return "c";
        },executorService);
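
To check which pool actually runs a task, one option is to give the custom pool a recognizable thread name and return it from the task. This is only a sketch: the class name and the thread name "custom-pool-worker" are assumptions for illustration.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ForkJoinPool;

public class CustomPoolDemo {

    /** Runs a supplier on a custom pool and returns the name of the thread that ran it. */
    static String workerThreadName() {
        // Hypothetical thread name so the worker is easy to spot in logs.
        ExecutorService pool = Executors.newFixedThreadPool(2, r -> {
            Thread t = new Thread(r, "custom-pool-worker");
            t.setDaemon(true);
            return t;
        });
        try {
            return CompletableFuture
                    .supplyAsync(() -> Thread.currentThread().getName(), pool)
                    .join();
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        // The parallelism of the default pool depends on the machine.
        System.out.println("common pool parallelism: "
                + ForkJoinPool.getCommonPoolParallelism());
        System.out.println("ran on: " + workerThreadName()); // ran on: custom-pool-worker
    }
}
```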

5. Exception handling

CompletableFuture provides many methods to cover the usual needs of asynchronous computation. Methods with the Async suffix submit the next step to a thread pool (the default pool, or the executor passed as an argument). Methods without the suffix run the step on the thread that completes the previous stage, or on the calling thread if that stage has already completed. CompletableFuture also provides exception handling, such as exceptionally, so you can inspect the exception thrown by the asynchronous task and supply a fallback value:

        ExecutorService executorService = Executors.newFixedThreadPool(5);
        CompletableFuture<String> sCf3 = CompletableFuture.supplyAsync(() -> {
            int i = 1 / 0; // throws ArithmeticException
            return "c";
        }, executorService).exceptionally(e -> {
            // e is a CompletionException wrapping the ArithmeticException
            e.printStackTrace();
            return "1"; // fallback value
        });
        String str = sCf3.join();
        System.out.println("str:"+str);
        executorService.shutdown();
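
Besides exceptionally, the handle method receives both the result and the exception, so success and failure can be processed in one place. The sketch below also uses thenApplyAsync with an explicit executor to show an Async-suffixed step. The class name and the divide method are hypothetical, chosen only for this example.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class HandleDemo {

    /** Divides on a pool thread; handle() sees either the result or the exception. */
    static String divide(int a, int b) {
        ExecutorService pool = Executors.newFixedThreadPool(2);
        try {
            return CompletableFuture
                    .supplyAsync(() -> a / b, pool)
                    // The Async variant with an executor runs this step on the pool too.
                    .thenApplyAsync(n -> n * 10, pool)
                    .handle((value, error) -> {
                        if (error != null) {
                            // Failures from earlier stages arrive wrapped in CompletionException.
                            Throwable cause = error instanceof CompletionException
                                    ? error.getCause() : error;
                            return "failed: " + cause.getClass().getSimpleName();
                        }
                        return "ok: " + value;
                    })
                    .join();
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(divide(10, 2)); // prints: ok: 50
        System.out.println(divide(1, 0));  // prints: failed: ArithmeticException
    }
}
```

When a stage fails, the later thenApplyAsync step is skipped and the exception travels down the chain until a handler such as handle or exceptionally deals with it.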