preface

Most cpus today are multi-core, and we all know that if we want to increase the efficiency of our applications, we must take full advantage of the computing power of multi-core cpus.

Java has long provided us with a multithreaded API, but the implementation is a bit cumbersome, and today we will take a look at the improvements that Java8 offers in this area.

What-if scenarios

Now you need to provide an API for online education platform to query user details. This interface needs to return user basic information, tag information, these two information stored in different locations, need to remote call to obtain these two information;

To simulate the remote call, we need to delay it for 1s in the code.

public interface RemoteLoader {

    String load();

    default void delay() {
        try {
            Thread.sleep(1000L);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}

public class CustomerInfoService implements RemoteLoader {
    public String load() {
        this.delay();
        return "Basic Information";
    }
}

public class LearnRecordService implements RemoteLoader {
    public String load() {
        this.delay();
        return "Learning information"; }}Copy the code

The version is implemented in synchronous mode

If we implement the API in a synchronous manner, our implementation code:

@Test
public void testSync() {
    long start = System.currentTimeMillis();
    List<RemoteLoader> remoteLoaders = Arrays.asList(new CustomerInfoService(), new LearnRecordService());
    List<String> customerDetail = remoteLoaders.stream().map(RemoteLoader::load).collect(toList());
    System.out.println(customerDetail);
    long end = System.currentTimeMillis();
    System.out.println("Total time spent :" + (end - start));
}
Copy the code

Unsurprisingly, since both interfaces are called 1s late, the result is greater than 2 seconds

Future implementation version

Let’s use this example to implement an asynchronous version of a Future provided by Java7. The code is as follows:

@Test
public void testFuture() {
    long start = System.currentTimeMillis();
    ExecutorService executorService = Executors.newFixedThreadPool(2);
    List<RemoteLoader> remoteLoaders = Arrays.asList(new CustomerInfoService(), new LearnRecordService());
    List<Future<String>> futures = remoteLoaders.stream()
            .map(remoteLoader -> executorService.submit(remoteLoader::load))
            .collect(toList());

    List<String> customerDetail = futures.stream()
            .map(future -> {
                try {
                    return future.get();
                } catch (InterruptedException | ExecutionException e) {
                    e.printStackTrace();
                }
                return null;
            })
            .filter(Objects::nonNull)
            .collect(toList());
    System.out.println(customerDetail);
    long end = System.currentTimeMillis();
    System.out.println("Total time spent :" + (end - start));
}
Copy the code

This time we used multi-threaded way to transform our example, the result is relatively satisfactory, the time spent about a little more than 1s

Future.get () will block the execution of one task and then commit the next. This will not be asynchronous

Here we can see that while the Future does what we want it to do, it gets a little messy if we need to implement merging two asynchronous results.

I’m not going to go into detail here, but I’m going to focus on what CompletableFuture has done in this regard

Java8 parallel flows

We use Java8 before the implementation of the method, let’s look at the Java8 to provide parallel flow to practice our example.

@Test
public void testParallelStream() {
    long start = System.currentTimeMillis();
    List<RemoteLoader> remoteLoaders = Arrays.asList(new CustomerInfoService(), new LearnRecordService());
    List<String> customerDetail = remoteLoaders.parallelStream().map(RemoteLoader::load).collect(toList());
    System.out.println(customerDetail);
    long end = System.currentTimeMillis();
    System.out.println("Total time spent :" + (end - start));
}
Copy the code

The result of the run is still quite satisfactory, taking a little more than 1s

Compared to previous Java8 implementations, we found that the entire code is much more concise;

To change our example, the interface for querying user details also needs to return video viewing history, user tag information, and purchase order

public class WatchRecordService implements RemoteLoader {
    @Override
    public String load() {
        this.delay();
        return "Viewing record";
    }
}

public class OrderService implements RemoteLoader {
    @Override
    public String load() {
        this.delay();
        return "Order Information";
    }
}

public class LabelService implements RemoteLoader {
    @Override
    public String load() {
        this.delay();
        return "Label Information"; }}Copy the code

Let’s go ahead and use the parallel flow provided by Java8 to see if it works

@Test
public void testParallelStream2() {
    long start = System.currentTimeMillis();
    List<RemoteLoader> remoteLoaders = Arrays.asList(
            new CustomerInfoService(),
            new LearnRecordService(),
            new LabelService(),
            new OrderService(),
            new WatchRecordService());
    List<String> customerDetail = remoteLoaders.parallelStream().map(RemoteLoader::load).collect(toList());
    System.out.println(customerDetail);
    long end = System.currentTimeMillis();
    System.out.println("Total time spent :" + (end - start));
}
Copy the code

However, the results of this run were not very good, taking more than 2 seconds

CompletableFuture

Basic usage

@Test
public void testCompletableFuture() {
    CompletableFuture<String> future = new CompletableFuture<>();
    new Thread(() -> {
        doSomething();
        future.complete("Finish");          // Set the result returned after the task is completed
    }).start();
    System.out.println(future.join());      // Get the result returned by the task thread
}

private void doSomething() {
    System.out.println("doSomething...");
}
Copy the code

A problem with this usage is that the main thread will not be aware of the exception, and the task thread will not throw the exception.

This causes the main thread to wait forever, and usually we need to know what exception has occurred and respond accordingly;

Improved way is try-catch all anomalies in the task, and then call future.com pleteExceptionally (e), the code is as follows:

@Test
public void testCompletableFuture() throws ExecutionException, InterruptedException {
    CompletableFuture<String> future = new CompletableFuture<>();
    new Thread(() -> {
        try {
            doSomething();
            future.complete("Finish");
        } catch (Exception e) {
            future.completeExceptionally(e);
        }
    }).start();
    System.out.println(future.get());
}

private void doSomething() {
    System.out.println("doSomething...");
    throw new RuntimeException("Test Exception");
}
Copy the code

Now, using CompletableFuture has a lot of things to do, and it’s not very neat, and it looks like a lot of trouble;

But that’s just the surface. Java8 encapsulates this process and provides a lot of simplicity. Now let’s look at how to modify the above code. Okay

@Test
public void testCompletableFuture2() throws ExecutionException, InterruptedException {
    CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
        doSomething();
        return "Finish";
    });
    System.out.println(future.get());
}
Copy the code

We’re using supplyAsync, so it looks a lot simpler, the world is a lot brighter;

Java8 not only provides supplyAsync, which allows tasks to return results, but also provides runAsync with no return value;

We can pay more attention to the development of business, do not need to deal with the management of exceptions and errors

CompletableFuture Exception handling

The main thread is exceptionally needed if it needs to care about what exception happened to the task

@Test
public void testCompletableFuture2() throws ExecutionException, InterruptedException {
    CompletableFuture<String> future = CompletableFuture
            .supplyAsync(() -> {
                doSomething();
                return "Finish";
            })
            .exceptionally(throwable -> "Throwable exception message:" + throwable.getMessage());
    System.out.println(future.get());
}
Copy the code

Use CompletableFuture to complete our API for querying user details

@Test
public void testCompletableFuture3() throws ExecutionException, InterruptedException {
    long start = System.currentTimeMillis();
    List<RemoteLoader> remoteLoaders = Arrays.asList(
            new CustomerInfoService(),
            new LearnRecordService(),
            new LabelService(),
            new OrderService(),
            new WatchRecordService());
    List<CompletableFuture<String>> completableFutures = remoteLoaders
            .stream()
            .map(loader -> CompletableFuture.supplyAsync(loader::load))
            .collect(toList());

    List<String> customerDetail = completableFutures
            .stream()
            .map(CompletableFuture::join)
            .collect(toList());
    
    System.out.println(customerDetail);
    long end = System.currentTimeMillis();
    System.out.println("Total time spent :" + (end - start));
}
Copy the code

Here, the two streams are used again, and the result is as follows:

This result is not satisfactory, and is similar to that of the parallel flow, consuming more than 2 seconds.

We’re doing so much work with CompletableFuture in this scenario, and it’s not working so well, is there another way to make it faster?

To solve this problem, we need to take a closer look at how parallel streams and CompletableFuture are implemented.

The size of the underlying thread pool they use is the number of CPU cores runtime.getruntime ().availableProcessors();

So let’s try changing the size of the thread pool and see what happens?

Custom thread pool, optimized for CompletableFuture

You can’t customize a thread pool with parallel streams, but CompletableFuture can

@Test
public void testCompletableFuture4() throws ExecutionException, InterruptedException {
    long start = System.currentTimeMillis();
    List<RemoteLoader> remoteLoaders = Arrays.asList(
            new CustomerInfoService(),
            new LearnRecordService(),
            new LabelService(),
            new OrderService(),
            new WatchRecordService());
    
    ExecutorService executorService = Executors.newFixedThreadPool(Math.min(remoteLoaders.size(), 50));
    
    List<CompletableFuture<String>> completableFutures = remoteLoaders
            .stream()
            .map(loader -> CompletableFuture.supplyAsync(loader::load, executorService))
            .collect(toList());

    List<String> customerDetail = completableFutures
            .stream()
            .map(CompletableFuture::join)
            .collect(toList());

    System.out.println(customerDetail);
    long end = System.currentTimeMillis();
    System.out.println("Total time spent :" + (end - start));
}
Copy the code

Let’s use a custom thread pool, set the maximum number of thread pools to 50, and see what happens

The result of this execution is more satisfactory, more than 1 second; In theory, this result can continue until the thread pool size is 50

How do I choose between a parallel stream and a CompletableFuture

How do you choose between the two

  • If your task is computationally intensive and there are no I/O operations, then it is recommended that you choose the parallel Stream of Stream as simple parallelism is the most efficient
  • This is recommended if your task involves frequent I/O or network connection operationsCompletableFuture, using a custom thread pool, according to the situation of the server to set the size of the thread pool, as much as possible to keep the CPU busy

Other common methods for CompletableFuture

  • thenApply,thenApplyAsync: If the task is completed, there is a need for subsequent operations, such as parsing the return results, etc. There are two ways to do this
  • thenCompose,thenComposeAsync: allows you to pipeline two asynchronous operations, passing the result of the first operation to the second operation when the first operation is complete
  • thenCombine,thenCombineAsync: allows you to combine two asynchronous operations. For example, concatenate the results of the first and second operations

conclusion

  • Java8 parallel flow usage
  • The way CompletableFuture is used, the exception handling mechanism, gives us the opportunity to manage the exceptions that are sent during task execution
  • Java8 parallel streams andCompletableFutureHow to choose between the two
  • A common method for CompletableFuture

From: juejin. Cn/post / 6897844374093496328