JDK concurrent library

1.1 the Future

The downside of a Future is that it can’t be asynchronous. The main thread calls get() when further processing is needed. But the timing of a call to get() is hard to control; calling it immediately is serial, and calling it after a long time wastes waiting time.

1.2 CompletableFuture

In jdk1.8, Java launched CompletableFuture. After the asynchronous task is completed, there is no need to wait for the return result of the task. ThenAccept, thenApply, thenCompose and other methods are directly used to process the result of the asynchronous call.

2. Business scenarios

Currently, the concurrency scenarios applied in the project are mainly the remote RPC interface calls, which are IO intensive scenarios.

  • In the first scenario, two RPC interfaces need to be called separately, in no order, and data processing needs to rely on the data returned by the two interfaces simultaneously.
  • The second scenario is that the RPC interface needs to be called circularly within 100 times, and the returned results will be processed uniformly after all the calls.

Third, concrete implementation

3.1 Data processing after invocation of the two interfaces

  • supplyAsync

Used to perform asynchronous operations and can support return values. A method that does not specify Executor uses ForkJoinPool.commonPool() as its thread pool to execute asynchronous code. Runs with the specified thread pool, if specified.

  • thenCombineAsync

ThenCombine will deliver the results of both tasks to thenCombine for processing after completing the tasks of both completion stages.

  • join

The Join method in the CompletableFuture class has the same meaning as get in the Future interface, waiting to finish running.

    public List<String> findIdList(String subjectId,Integer classType) {
        CompletableFuture<List<String>> aIds = CompletableFuture
                .supplyAsync(() -> clientA.queryClassIdList("1", classType), TestThreadExecutor.getInstance());

        CompletableFuture<List<String>> bIds = CompletableFuture
                .supplyAsync(() -> clientB.queryClassIdList("2", classType), TestThreadExecutor.getInstance());

        CompletableFuture<List<String>> unionClassIds = aIds
                .thenCombineAsync(bIds, (aIds, bIds) -> {
                    aIds.addAll(bIds);
                    return aIds.stream().distinct().collect(Collectors.toList());
                }, TestThreadExecutor.getInstance());

        return unionClassIds
                .exceptionally(exception -> {
                    System.out.println("Something's wrong.");
                    return List.of();
                })
                .join();
    }
Copy the code

3.2 Circularly calling the same interface

  • allOf

The main thread will block until all the threads in allOf have finished executing, and the thread will wake up.

List<String> res = new ArrayList<>();
List<String> idList = Lists.of("1"."2"."3"."4"."5"."6"); CompletableFuture[] cfs = idList.stream().map(object-> CompletableFuture.supplyAsync(()->client.queryClassIdList(object,  classType), TestThreadExecutor.getInstance()) .thenApply(dtoList->{if(CollectionUtils.isEmpty(dtoList)){
                              return "";
                          } else {
                              returnclaCourseId; }})// If you need to obtain the task completion order, this code can be used
                      .whenComplete((v, e) ->                        
                      	  if(! StringUtils.isEmpty(v)){ bindClassCourseList.add(v); } })).toArray(CompletableFuture[]::new);
              // Wait for the total task to complete, but there is no return value after encapsulation, must be acquired by whenComplete()
              CompletableFuture.allOf(cfs).join();
Copy the code

3.3 Customizing thread Pools

In actual development, we will customize the thread pool on a case-by-case basis. Here is a rough template.

public class TestThreadExecutor {

    /** * Thread pool name */
    private static final String TEST_THREAD_POOL = "teacher-city-code-thread-pool";

    /** * Number of available CPU cores */
    private static final int DEFAULT_CPU_PROCESSORS = Runtime.getRuntime().availableProcessors();

    /** * Spring supports thread pool task wrapper classes */
    private static final ThreadPoolTaskExecutor TEST_THREAD_EXECUTOR = new ThreadPoolTaskExecutor();

    /** * Default thread pool */
    private static final ThreadFactory DEFAULT_THREAD_FACTORY = new DefaultThreadFactory(TEST_THREAD_POOL);

    static {
        TEST_THREAD_EXECUTOR.setThreadFactory(DEFAULT_THREAD_FACTORY);
        // Services in this thread pool are IO intensive. Set the number of core threads to CPU cores x 2
        TEST_THREAD_EXECUTOR.setCorePoolSize(DEFAULT_CPU_PROCESSORS * 2);
        TEST_THREAD_EXECUTOR.setMaxPoolSize(DEFAULT_CPU_PROCESSORS * 25);
        TEST_THREAD_EXECUTOR.setQueueCapacity(1024);
        // The default value is 60seconds. The purpose of the display declaration is to visualize the thread lifetime in the thread pool
        TEST_THREAD_EXECUTOR.setKeepAliveSeconds(60);
        //CallerRunsPolicy This rejection policy represents that if the work queue is full (exceeding QueueCapacity) and MaxPoolSize reaches the threshold, the current task is invoked using the main thread
        TEST_THREAD_EXECUTOR.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
        // Core threads will recycle if they exceed keepTimeOut. The reason for setting this parameter is that other Maven-Modules may not need to rely on this thread pool, resulting in a waste of thread resources
        TEST_THREAD_EXECUTOR.setAllowCoreThreadTimeOut(true);
        TEST_THREAD_EXECUTOR.initialize();
    }

    /** * get the thread pool task instance **@return ThreadPoolTaskExecutor
     */
    public static ThreadPoolTaskExecutor getInstance(a) {
        returnTEST_THREAD_EXECUTOR; }}Copy the code

Iv. Reference materials

  • Some API: www.jianshu.com/p/6bac52527…
  • www.cnblogs.com/dennyzhangd…