This is the 26th day of my participation in the August More Text Challenge

In this article:

  • An introduction to the Future pattern and its core idea;
  • The difference between the core pool size and the maximum pool size, and what the queue capacity represents;
  • The saturation policies of ThreadPoolTaskExecutor;
  • Asynchronous programming with SpringBoot in practice, with a walkthrough of the code logic.

The Future pattern

  • Asynchronous programming is especially useful for time-consuming operations and multitasking scenarios, where it lets us make better use of the machine's CPU and memory.

  • There are many multithreaded design patterns. The Future pattern is one of the most common in multithreaded development, and this article uses it to explain asynchronous programming in SpringBoot.

Let me briefly introduce the core idea of the Future pattern before we get to the hands-on part.

The core idea of Future

The core idea of the Future pattern is asynchronous invocation. When a method contains several time-consuming tasks that can run at the same time, and we do not have to wait for their results, we can return to the client immediately and let the tasks be computed in the background. Of course, we can also choose to wait for these tasks to complete and only then return to the client.
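The idea can be sketched in plain Java, without Spring. This is only a minimal illustration: the class FuturePatternSketch and the methods slowLookup and lookupAsync are hypothetical names invented for this example, and the 200 ms sleep stands in for any slow operation.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class FuturePatternSketch {

  // Hypothetical slow task: sleeps for a while, then returns a value.
  static String slowLookup(String key) {
    try {
      Thread.sleep(200L);
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
    }
    return "result-for-" + key;
  }

  // Starts the slow task on the pool and returns immediately with a handle to the result.
  static CompletableFuture<String> lookupAsync(String key, ExecutorService pool) {
    return CompletableFuture.supplyAsync(() -> slowLookup(key), pool);
  }

  public static void main(String[] args) {
    ExecutorService pool = Executors.newFixedThreadPool(2);
    try {
      CompletableFuture<String> future = lookupAsync("F", pool);
      // The caller is free to do other work here while the task runs in the background.
      System.out.println("Task submitted, done yet? " + future.isDone());
      // Optionally wait for the result before answering the client.
      System.out.println(future.join());
    } finally {
      pool.shutdown();
    }
  }
}
```

The caller decides whether to block: calling join() waits for the result, while simply dropping the future lets the task finish on its own.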

SpringBoot asynchronous programming practice

If we need to do asynchronous programming in Spring/SpringBoot, Spring provides two annotations that make it very easy.

  1. @EnableAsync: enables support for asynchronous methods when added to a configuration class or the main class.
  2. @Async: can be applied to a class or to a method. When applied to a class, all methods of that class become asynchronous methods.
TaskExecutor
  • Many people are not familiar with TaskExecutor, so let's take a moment to introduce it first. As the name suggests, it is the executor of tasks: it directs threads to carry out tasks, like a commander, while the threads are like an army that can attack the enemy asynchronously.

  • Spring provides the TaskExecutor interface as an abstraction over task executors, much like the Executor interface in the java.util.concurrent package. One small difference is that the TaskExecutor interface is annotated with Java 8's @FunctionalInterface, which declares it to be a functional interface.

org.springframework.core.task.TaskExecutor
@FunctionalInterface
public interface TaskExecutor extends Executor {
    void execute(Runnable var1);
}

If there is no custom Executor, Spring creates a SimpleAsyncTaskExecutor and uses it.
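Because the interface has a single abstract method, an executor can be written as a lambda or a method reference. The sketch below uses the JDK's java.util.concurrent.Executor as a stand-in for Spring's TaskExecutor so that it runs without Spring on the classpath. The class name ExecutorLambdaSketch and the helper submit are made up for this illustration.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Executor;

public class ExecutorLambdaSketch {

  // Hands the task to whatever executor is passed in; the executor decides the threading.
  static void submit(Executor executor, Runnable task) {
    executor.execute(task);
  }

  public static void main(String[] args) {
    List<String> log = new ArrayList<>();

    // Executor has one abstract method, so a method reference or lambda is enough.
    Executor direct = Runnable::run;                            // runs in the calling thread
    Executor threadPerTask = task -> new Thread(task).start();  // starts a new thread per task

    submit(direct, () -> log.add("ran on " + Thread.currentThread().getName()));
    System.out.println(log);

    submit(threadPerTask,
        () -> System.out.println("ran on " + Thread.currentThread().getName()));
  }
}
```

The thread-per-task variant is similar in spirit to SimpleAsyncTaskExecutor, which does not reuse threads; that is one reason to configure a pooled executor instead.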

Custom AsyncConfigurer
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.scheduling.annotation.AsyncConfigurer;
import org.springframework.scheduling.annotation.EnableAsync;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import java.util.concurrent.Executor;
import java.util.concurrent.ThreadPoolExecutor;


@Configuration
@EnableAsync
public class AsyncConfig implements AsyncConfigurer {

  private static final int CORE_POOL_SIZE = 6;
  private static final int MAX_POOL_SIZE = 10;
  private static final int QUEUE_CAPACITY = 100;

  @Bean
  public Executor taskExecutor() {
    // Spring's default configuration is to have a core thread size of 1, unlimited maximum thread capacity, and unlimited queue capacity.
    ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
    // Number of core threads
    executor.setCorePoolSize(CORE_POOL_SIZE);
    // Maximum number of threads
    executor.setMaxPoolSize(MAX_POOL_SIZE);
    // Queue size
    executor.setQueueCapacity(QUEUE_CAPACITY);
    // When the maximum pool is full, this policy guarantees that no task requests will be lost, but it may affect overall application performance.
    executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
    executor.setThreadNamePrefix("My ThreadPoolTaskExecutor-");
    executor.initialize();
    return executor;
  }
}
Common concepts of ThreadPoolTaskExecutor
  • Core Pool Size: the number of core threads defines the minimum number of threads that can run at the same time.
  • Queue Capacity: when a new task arrives, the pool first checks whether the number of running threads has reached the core pool size. If so, the new task is stored in the queue.
  • Maximum Pool Size: when the number of tasks in the queue reaches the queue capacity, the number of threads that can run concurrently grows up to the maximum number of threads.

In general, do not set the queue capacity to Integer.MAX_VALUE, and do not set the number of core threads equal to the maximum number of threads. In either case the maximum number of threads loses its meaning, and you also lose sight of the current CPU and memory usage.
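The order described above (core threads first, then the queue, then extra threads up to the maximum) can be observed with the JDK's ThreadPoolExecutor, which ThreadPoolTaskExecutor wraps. The following is a sketch under assumed values: a core size of 2, a maximum of 4 and a queue capacity of 2 were picked only to keep the numbers small, and PoolGrowthSketch and snapshotAfter are hypothetical names.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class PoolGrowthSketch {

  // Submits `tasks` blocking tasks (at most 6) to a pool with core 2, max 4, queue 2 and
  // returns {pool size, queued tasks} while all submitted tasks are still blocked.
  static int[] snapshotAfter(int tasks) {
    ThreadPoolExecutor pool = new ThreadPoolExecutor(
        2, 4, 60L, TimeUnit.SECONDS, new ArrayBlockingQueue<>(2));
    CountDownLatch release = new CountDownLatch(1);
    try {
      for (int i = 0; i < tasks; i++) {
        pool.execute(() -> {
          try {
            release.await(); // keep the thread busy until the snapshot has been taken
          } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
          }
        });
      }
      return new int[] {pool.getPoolSize(), pool.getQueue().size()};
    } finally {
      release.countDown(); // let the blocked tasks finish
      pool.shutdown();
    }
  }

  public static void main(String[] args) {
    for (int tasks = 1; tasks <= 6; tasks++) {
      int[] s = snapshotAfter(tasks);
      System.out.println(tasks + " tasks -> threads=" + s[0] + ", queued=" + s[1]);
    }
  }
}
```

With these values the pool stays at two threads until the queue holds two tasks, and only then grows towards four. A seventh task would be handed to the saturation policy.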

If the queue is full and the number of threads currently running has reached the maximum, what happens when a new task comes in? By default Spring uses ThreadPoolExecutor.AbortPolicy: the ThreadPoolExecutor throws a RejectedExecutionException to reject the new task, which means that task will never be processed.

For scalable applications, ThreadPoolExecutor.CallerRunsPolicy is recommended. When the maximum pool is full, this policy effectively gives us a scalable queue.

ThreadPoolTaskExecutor saturation policy definition:

If the number of threads currently running has reached the maximum and the queue is full, ThreadPoolTaskExecutor can apply one of the following policies:

  • ThreadPoolExecutor.AbortPolicy: throws a RejectedExecutionException to reject the new task.
  • ThreadPoolExecutor.CallerRunsPolicy: runs the task in the caller's own thread, so no task request is dropped. However, this policy slows down the submission of new tasks and affects the overall performance of the program.
    • In effect, this policy behaves like an increase in queue capacity. Choose it if your application can tolerate the delay and cannot afford to drop a single task request.
  • ThreadPoolExecutor.DiscardPolicy: does not process the new task and discards it silently.
  • ThreadPoolExecutor.DiscardOldestPolicy: discards the oldest unprocessed task request.
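The policies listed above can be compared with a deliberately tiny pool. In this sketch the pool has one thread and a queue of one, both kept busy by blocking tasks, and a third task is then submitted. SaturationPolicySketch and extraTaskOutcome are hypothetical names, and the JDK's ThreadPoolExecutor is used directly instead of Spring's wrapper. DiscardOldestPolicy is left out because it swaps the queued task for the new one rather than deciding the new task's fate at submission time.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;

public class SaturationPolicySketch {

  // Saturates a pool (1 thread, queue of 1) and reports what happens to one extra task.
  static String extraTaskOutcome(RejectedExecutionHandler policy) {
    ThreadPoolExecutor pool = new ThreadPoolExecutor(
        1, 1, 0L, TimeUnit.SECONDS, new ArrayBlockingQueue<>(1), policy);
    CountDownLatch release = new CountDownLatch(1);
    Runnable blocker = () -> {
      try {
        release.await();
      } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
      }
    };
    AtomicReference<Thread> ranOn = new AtomicReference<>();
    try {
      pool.execute(blocker); // occupies the only thread
      pool.execute(blocker); // fills the queue
      pool.execute(() -> ranOn.set(Thread.currentThread())); // the extra task
      return ranOn.get() == Thread.currentThread() ? "ran in caller thread" : "not run";
    } catch (RejectedExecutionException e) {
      return "rejected with exception";
    } finally {
      release.countDown(); // unblock the pool so it can shut down
      pool.shutdown();
    }
  }

  public static void main(String[] args) {
    System.out.println("AbortPolicy:      "
        + extraTaskOutcome(new ThreadPoolExecutor.AbortPolicy()));
    System.out.println("CallerRunsPolicy: "
        + extraTaskOutcome(new ThreadPoolExecutor.CallerRunsPolicy()));
    System.out.println("DiscardPolicy:    "
        + extraTaskOutcome(new ThreadPoolExecutor.DiscardPolicy()));
  }
}
```

Note that with CallerRunsPolicy the submitting thread is busy running the task itself, which is exactly why it slows down the submission of further tasks.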
  1. Write an asynchronous method

The @Async annotation tells Spring that this method is asynchronous. In addition, the method returns CompletableFuture.completedFuture(results), which means we need to return a result: the program must finish the task before responding to the user.

Notice that the first line of the completableFutureTask method prints the log, which will be used later in the analysis program. It’s important!

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.scheduling.annotation.Async;
import org.springframework.stereotype.Service;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.stream.Collectors;

@Service
public class AsyncService {
  private static final Logger logger = LoggerFactory.getLogger(AsyncService.class);
  private List<String> movies =
      new ArrayList<>(
          Arrays.asList(
              "Forrest Gump", "Titanic", "Spirited Away", "The Shawshank Redemption", "Zootopia", "Farewell ", "Joker", "Crawl"));
  /** Example: find the movies whose titles start with the specified character/string. */
  @Async
  public CompletableFuture<List<String>> completableFutureTask(String start) {
    // Prints logs
    logger.warn(Thread.currentThread().getName() + "start this task!");
    // Find the movie at the beginning of the specified character/string
    List<String> results =
        movies.stream().filter(movie -> movie.startsWith(start)).collect(Collectors.toList());
    // This is a time-consuming task
    try {
      Thread.sleep(1000L);
    } catch (InterruptedException e) {
      e.printStackTrace();
    }
    // Returns a new CompletableFuture that has been completed with the given value.
    return CompletableFuture.completedFuture(results);
  }
}
  2. Test the asynchronous method
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.stream.Collectors;

@RestController
@RequestMapping("/async")
public class AsyncController {

  @Autowired
  AsyncService asyncService;

  @GetMapping("/movies")
  public String completableFutureTask() throws ExecutionException, InterruptedException {
    // Start time
    long start = System.currentTimeMillis();
    // Start several asynchronous tasks
    List<String> words = Arrays.asList("F", "T", "S", "Z", "J", "C");
    List<CompletableFuture<List<String>>> completableFutureList = words.stream()
            .map(word -> asyncService.completableFutureTask(word))
            .collect(Collectors.toList());
    // CompletableFuture.join() waits for each task and returns its result; the results are collected into a list
    List<List<String>> results = completableFutureList.stream().map(CompletableFuture::join).collect(Collectors.toList());
    // Print how long the whole run took
    System.out.println("Elapsed time: " + (System.currentTimeMillis() - start));
    return results.toString();
  }
}

Requesting this interface, the console prints the following:

2019-10-01 13:50:17.007  WARN 18793 --- [lTaskExecutor-1] g.j.a.service.AsyncService : My ThreadPoolTaskExecutor-1start this task!
2019-10-01 13:50:17.007  WARN 18793 --- [lTaskExecutor-6] g.j.a.service.AsyncService : My ThreadPoolTaskExecutor-6start this task!
2019-10-01 13:50:17.007  WARN 18793 --- [lTaskExecutor-5] g.j.a.service.AsyncService : My ThreadPoolTaskExecutor-5start this task!
2019-10-01 13:50:17.007  WARN 18793 --- [lTaskExecutor-4] g.j.a.service.AsyncService : My ThreadPoolTaskExecutor-4start this task!
2019-10-01 13:50:17.007  WARN 18793 --- [lTaskExecutor-3] g.j.a.service.AsyncService : My ThreadPoolTaskExecutor-3start this task!
2019-10-01 13:50:17.007  WARN 18793 --- [lTaskExecutor-2] g.j.a.service.AsyncService : My ThreadPoolTaskExecutor-2start this task!
Elapsed time: 1010

First of all, we can see that it takes about 1 second to process all the tasks. This follows from our custom ThreadPoolTaskExecutor: we configured six core threads, and the code below hands six tasks to the system. Each thread is therefore assigned one task, and since each task takes 1 s and they all run in parallel, processing all six takes about 1 s in total.

List<String> words = Arrays.asList("F", "T", "S", "Z", "J", "C");
List<CompletableFuture<List<String>>> completableFutureList =
        words.stream()
            .map(word -> asyncService.completableFutureTask(word))
            .collect(Collectors.toList());

Try changing the number of core threads to 3 and request the interface again and you’ll see that it takes about 2 seconds to process all the tasks.
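The 1 s and 2 s figures follow from the tasks running in "waves" of at most one task per thread. The sketch below reproduces this with a plain fixed-size pool and 200 ms tasks instead of 1 s ones. It assumes, as in the configuration above, that the queue is large enough that the pool never grows beyond its core size. WaveTimingSketch, elapsedMillis and estimatedMillis are hypothetical names used only here.

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

public class WaveTimingSketch {

  static void sleep(long millis) {
    try {
      Thread.sleep(millis);
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
    }
  }

  // Runs `tasks` sleeping tasks on a fixed pool of `threads` and returns the elapsed milliseconds.
  static long elapsedMillis(int threads, int tasks, long taskMillis) {
    ExecutorService pool = Executors.newFixedThreadPool(threads);
    try {
      long start = System.nanoTime();
      List<CompletableFuture<Void>> futures = IntStream.range(0, tasks)
          .mapToObj(i -> CompletableFuture.runAsync(() -> sleep(taskMillis), pool))
          .collect(Collectors.toList());
      futures.forEach(CompletableFuture::join); // wait for every task, like the controller does
      return (System.nanoTime() - start) / 1_000_000;
    } finally {
      pool.shutdown();
    }
  }

  // Rough estimate: the tasks run in waves of at most `threads` tasks at a time.
  static long estimatedMillis(int threads, int tasks, long taskMillis) {
    long waves = (tasks + threads - 1) / threads; // ceiling division
    return waves * taskMillis;
  }

  public static void main(String[] args) {
    for (int threads : new int[] {6, 3}) {
      System.out.println(threads + " threads: estimated "
          + estimatedMillis(threads, 6, 200L) + " ms, measured "
          + elapsedMillis(threads, 6, 200L) + " ms");
    }
  }
}
```

The measured values include scheduling overhead, so they will typically sit slightly above the estimate.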

Special case: no return value

As the run above shows, the response is not returned until all tasks have completed. That fits the case where the request needs the result to be returned to the client, but what if we do not need to return the task's result to the client? For example, when a user uploads a large file to the system, we can report success as long as the format of the file meets the requirements. Normally we would have to wait for the file to be fully processed before sending a message back to the user, which would be slow. With asynchrony, a message is returned to the user as soon as the upload is submitted, and the system handles the upload quietly in the background. This also adds some complexity, because the upload may still fail, so the system needs a compensation mechanism, such as sending a message to the user when a problem occurs.

Here is an example of a case where the result does not need to be returned to the client:

Change the completableFutureTask method to return void:
@Async
public void completableFutureTask(String start) {
  // ......
  // The system may process the result of the task here, for example by saving it to the database
  // doSomeThingWithResults(results);
}
The Controller code is modified as follows:
  @GetMapping("/movies")
  public String completableFutureTask() throws ExecutionException, InterruptedException {
    // Start the clock
    long start = System.currentTimeMillis();
    // Kick off multiple asynchronous lookups
    List<String> words = Arrays.asList("F", "T", "S", "Z", "J", "C");
    words.forEach(word -> asyncService.completableFutureTask(word));
    // Nothing waits for the tasks here; they keep running in the background
    // Print the elapsed time
    System.out.println("Elapsed time: " + (System.currentTimeMillis() - start));
    return "Done";
  }

Requesting this interface, the console prints the following:

Elapsed time: 0
2019-10-01 14:02:44.052  WARN 19051 --- [lTaskExecutor-4] g.j.a.service.AsyncService : My ThreadPoolTaskExecutor-4start this task!
2019-10-01 14:02:44.052  WARN 19051 --- [lTaskExecutor-3] g.j.a.service.AsyncService : My ThreadPoolTaskExecutor-3start this task!
2019-10-01 14:02:44.052  WARN 19051 --- [lTaskExecutor-2] g.j.a.service.AsyncService : My ThreadPoolTaskExecutor-2start this task!
2019-10-01 14:02:44.052  WARN 19051 --- [lTaskExecutor-1] g.j.a.service.AsyncService : My ThreadPoolTaskExecutor-1start this task!
2019-10-01 14:02:44.052  WARN 19051 --- [lTaskExecutor-6] g.j.a.service.AsyncService : My ThreadPoolTaskExecutor-6start this task!
2019-10-01 14:02:44.052  WARN 19051 --- [lTaskExecutor-5] g.j.a.service.AsyncService : My ThreadPoolTaskExecutor-5start this task!

You can see that the system responds to the user immediately, before the tasks have actually been executed.
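Since nobody waits for a fire-and-forget task, a failure goes unnoticed unless something reports it, which is the compensation mechanism mentioned earlier. One possible shape for it, sketched in plain Java, is a completion callback on the future. Everything named here is hypothetical: FireAndForgetSketch, processUpload, the ".bad" file suffix used to force a failure, and the notifications list that stands in for "send a message to the user".

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class FireAndForgetSketch {

  // Hypothetical stand-in for notifying the user; it only records the message.
  static final List<String> notifications = new CopyOnWriteArrayList<>();

  // Hypothetical background job that fails for file names ending in ".bad".
  static void processUpload(String fileName) {
    if (fileName.endsWith(".bad")) {
      throw new IllegalStateException("cannot process " + fileName);
    }
  }

  // Starts the job and returns at once; a failure is reported through the callback.
  static CompletableFuture<Void> submit(String fileName, ExecutorService pool) {
    return CompletableFuture.runAsync(() -> processUpload(fileName), pool)
        .whenComplete((ignored, error) -> {
          if (error != null) {
            notifications.add("Upload failed: " + fileName);
          }
        });
  }

  public static void main(String[] args) throws InterruptedException {
    ExecutorService pool = Executors.newFixedThreadPool(2);
    submit("movie.mp4", pool);
    submit("movie.bad", pool);
    System.out.println("Done"); // the caller gets its answer without waiting for the jobs
    pool.shutdown();
    pool.awaitTermination(5, TimeUnit.SECONDS);
    System.out.println(notifications);
  }
}
```

In a Spring application the same idea could be applied inside the @Async void method itself, for example with a try/catch around the work, since a void method gives the caller no future to attach a callback to.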

In the next article:

  • A comparative analysis of Future vs. CompletableFuture
  • CompletableFuture and Future source code analysis
