Those familiar with Java multithreaded programming know that when we create too many threads, it is easy to cause memory overflow, so it is necessary to use thread pool technology. Recently read some relevant articles, and personally studied the source code, found some articles or some problems, so I also summed up, in this dedication to you.

Advantages of thread pools

In general, thread pools have the following advantages: (1) reduce resource consumption. Reduce the cost of thread creation and destruction by reusing created threads. (2) Improve the response speed. When a task arrives, it can be executed immediately without waiting for the thread to be created. (3) Improve the manageability of threads. Threads are scarce resources. If they are created without limit, they will not only consume system resources, but also reduce system stability. Thread pools can be used for unified allocation, tuning, and monitoring.

Use of thread pools

The real implementation class for a thread pool is ThreadPoolExecutor, which has four constructors:

public ThreadPoolExecutor(int corePoolSize,
                          int maximumPoolSize,
                          long keepAliveTime,
                          TimeUnit unit,
                          BlockingQueue<Runnable> workQueue) {
    this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
         Executors.defaultThreadFactory(), defaultHandler);
}

public ThreadPoolExecutor(int corePoolSize,
                          int maximumPoolSize,
                          long keepAliveTime,
                          TimeUnit unit,
                          BlockingQueue<Runnable> workQueue,
                          ThreadFactory threadFactory) {
    this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
         threadFactory, defaultHandler);
}

public ThreadPoolExecutor(int corePoolSize,
                          int maximumPoolSize,
                          long keepAliveTime,
                          TimeUnit unit,
                          BlockingQueue<Runnable> workQueue,
                          RejectedExecutionHandler handler) {
    this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
         Executors.defaultThreadFactory(), handler);
}

public ThreadPoolExecutor(int corePoolSize,
                          int maximumPoolSize,
                          long keepAliveTime,
                          TimeUnit unit,
                          BlockingQueue<Runnable> workQueue,
                          ThreadFactory threadFactory,
                          RejectedExecutionHandler handler) {
    if (corePoolSize < 0 ||
        maximumPoolSize <= 0 ||
        maximumPoolSize < corePoolSize ||
        keepAliveTime < 0)
        throw new IllegalArgumentException();
    if (workQueue == null || threadFactory == null || handler == null)
        throw new NullPointerException();
    this.corePoolSize = corePoolSize;
    this.maximumPoolSize = maximumPoolSize;
    this.workQueue = workQueue;
    this.keepAliveTime = unit.toNanos(keepAliveTime);
    this.threadFactory = threadFactory;
    this.handler = handler;
}
Copy the code

As you can see, it takes the following parameters:

  • corePoolSizeRequired: Number of core threads. By default, the core thread is always alive, but when theallowCoreThreadTimeoutWhen set to true, the core thread will also timeout reclaim.
  • MaximumPoolSize (required) : The maximum number of threads that a thread pool can hold. When the number of active threads reaches this value, subsequent new tasks will block.
  • keepAliveTime(Required) : Idle timeout of a thread. If this time is exceeded, non-core threads are reclaimed. If you haveallowCoreThreadTimeoutWhen set to true, the core thread will also timeout reclaim.
  • unit(Required) : Specifies the time unit for the keepAliveTime parameter. Commonly used are:TimeUnit.MILLISECONDS(ms),TimeUnit.SECONDS(in seconds),TimeUnit.MINUTES(points).
  • WorkQueue (required) : Task queue. Runnable objects submitted via the thread pool’s execute() method will be stored in this parameter.
  • ThreadFactory (optional) : threadFactory. Used to specify how new threads are created for the thread pool.
  • Handler (Optional) : rejects the policy. The saturation strategy that needs to be executed when the maximum number of threads is reached.

The thread pool usage flow is as follows:

// Create a thread pool
Executor threadPool = new ThreadPoolExecutor(CORE_POOL_SIZE,
                                             MAXIMUM_POOL_SIZE,
                                             KEEP_ALIVE,
                                             TimeUnit.SECONDS,
                                             sPoolWorkQueue,
                                             sThreadFactory);
// Submit tasks to the thread pool
threadPool.execute(new Runnable() {
    @Override
    public void run(a) {...// The task executed by the thread}});// Close the thread pool
threadPool.shutdown(); // Set the state of the thread pool to SHUTDOWN, and then interrupt all threads that are not executing tasks
threadPool.shutdownNow(); // Set the state of the thread pool to STOP, then try to STOP all threads executing or suspending tasks and return to the list of waiting tasks
Copy the code

3 How thread pools work

Let’s describe how thread pools work and get a better understanding of the parameters. Its working principle flow chart is as follows:

From the diagram above, I think you have an idea of all the parameters. In fact, there is no distinction between core threads and threads in the thread pool. Let’s talk more about task queues, thread factories, and rejection policies.

4 Thread pool parameters

4.1 workQueue

Task queues are implemented based on blocking queues, using the producer-consumer pattern, with the BlockingQueue interface implemented in Java. But Java already provides us with seven implementations of blocking queues:

  1. ArrayBlockingQueue: A bounded blocking queue consisting of an array structure that implements a circular queue with Pointers.
  2. LinkedBlockingQueue: a bounded blocking queue consisting of a linked list structure. If the capacity is not specified, the capacity defaults toInteger.MAX_VALUE.
  3. PriorityBlockingQueue: an unbounded blocking queue that supports priority sorting, with no requirements on elements, can be implementedComparableInterfaces can also provide comparators to compare elements in queues. It has nothing to do with time, it’s just prioritizing tasks.
  4. DelayQueue: similar toPriorityBlockingQueue, is an unbounded priority blocking queue implemented by binary heap. Elements are required to be implementedDelayedInterface to extract tasks from the queue by executing the delay. The task cannot be extracted until the time is up.
  5. SynchronousQueue: a blocking queue that does not store elements, which blocks when a consumer thread calls take() until a producer thread produces an element, which the consumer thread can retrieve and return; The producer thread also blocks when it calls the put() method and does not return until one of the consumer threads consumes an element.
  6. LinkedBlockingDeque: a bounded two-ended blocking queue implemented using a two-way queue. Double-endian means you can FIFO (first in, first out) like a normal queue, or you can FILO (first in, last out) like a stack.
  7. LinkedTransferQueue: it isConcurrentLinkedQueue,LinkedBlockingQueueandSynchronousQueueBut use it in ThreadPoolExecutor, andLinkedBlockingQueueBehaves identically, but is an unbounded blocking queue.

Note the difference between bounded and unbounded queues: with bounded queues, the rejection policy is executed when the queue is saturated and the maximum number of threads is exceeded. With unbounded queues, however, it makes no sense to set maximumPoolSize because tasks can always be added to a task queue.

4.2 threadFactory

Thread factories specify how threads are created by implementing the ThreadFactory interface and the newThread(Runnable R) method. The Executors framework has implemented a default thread factory for us:

/** * The default thread factory. */
private static class DefaultThreadFactory implements ThreadFactory {
    private static final AtomicInteger poolNumber = new AtomicInteger(1);
    private final ThreadGroup group;
    private final AtomicInteger threadNumber = new AtomicInteger(1);
    private finalString namePrefix; DefaultThreadFactory() { SecurityManager s = System.getSecurityManager(); group = (s ! =null)? s.getThreadGroup() : Thread.currentThread().getThreadGroup(); namePrefix ="pool-" +
                      poolNumber.getAndIncrement() +
                     "-thread-";
    }

    public Thread newThread(Runnable r) {
        Thread t = new Thread(group, r,
                              namePrefix + threadNumber.getAndIncrement(),
                              0);
        if (t.isDaemon())
            t.setDaemon(false);
        if(t.getPriority() ! = Thread.NORM_PRIORITY) t.setPriority(Thread.NORM_PRIORITY);returnt; }}Copy the code

4.3 Rejection Policy (Handler)

When the number of threads in the thread pool reaches the maximum number of threads, a rejection policy needs to be implemented. The RejectedExecutionHandler interface and the rejectedExecution(Runnable r, ThreadPoolExecutor Executor) method are required for the RejectedExecutionHandler policy. But the Executors framework has implemented 4 rejection strategies for us:

  1. AbortPolicy (default): Discards the task and throws itRejectedExecutionExceptionThe exception.
  2. CallerRunsPolicy: This task is handled by the calling thread.
  3. DiscardPolicy: Discards the task without throwing an exception. You can customize the processing with this pattern.
  4. DiscardOldestPolicy: Discards the earliest unprocessed task in the queue and tries to execute the task again.

5 Functional thread pools

Too cumbersome to use thread pools above? 4 common functional thread pools have been wrapped up by Executors, as follows:

  • Fixed length thread pool (FixedThreadPool)
  • Timed thread pool (SchedledThreadPool)
  • Cacheable Thread pool
  • Single threaded thread pool (SingleThreadExecutor)

5.1 Fixed Length ThreadPool (FixedThreadPool)

Create method source:

public static ExecutorService newFixedThreadPool(int nThreads) {
    return new ThreadPoolExecutor(nThreads, nThreads,
                                  0L, TimeUnit.MILLISECONDS,
                                  new LinkedBlockingQueue<Runnable>());
}
public static ExecutorService newFixedThreadPool(int nThreads, ThreadFactory threadFactory) {
    return new ThreadPoolExecutor(nThreads, nThreads,
                                  0L, TimeUnit.MILLISECONDS,
                                  new LinkedBlockingQueue<Runnable>(),
                                  threadFactory);
}
Copy the code
  • Features: there are only core threads, the number of threads is fixed, and the task queue is a bounded queue with linked list structure.
  • Application scenario: Controls the maximum number of concurrent threads.

Example:

Create a thread pool object & set the number of threads in the pool to 3
ExecutorService fixedThreadPool = Executors.newFixedThreadPool(3);
// 2. Create a Runnable thread object & a task to execute
Runnable task =new Runnable(){
  public void run(a) {
     System.out.println("On a mission."); }};// 3. Submit the task to the thread pool
fixedThreadPool.execute(task);
Copy the code

5.2 ScheduledThreadPool (ScheduledThreadPool)

Create method source:

private static final long DEFAULT_KEEPALIVE_MILLIS = 10L;

public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) {
    return new ScheduledThreadPoolExecutor(corePoolSize);
}
public ScheduledThreadPoolExecutor(int corePoolSize) {
    super(corePoolSize, Integer.MAX_VALUE,
          DEFAULT_KEEPALIVE_MILLIS, MILLISECONDS,
          new DelayedWorkQueue());
}

public static ScheduledExecutorService newScheduledThreadPool(
        int corePoolSize, ThreadFactory threadFactory) {
    return new ScheduledThreadPoolExecutor(corePoolSize, threadFactory);
}
public ScheduledThreadPoolExecutor(int corePoolSize,
                                   ThreadFactory threadFactory) {
    super(corePoolSize, Integer.MAX_VALUE,
          DEFAULT_KEEPALIVE_MILLIS, MILLISECONDS,
          new DelayedWorkQueue(), threadFactory);
}
Copy the code
  • Features: the number of core threads is fixed, and the number of non-core threads is unlimited. The task queue is a delayed blocking queue.
  • Application scenario: Perform scheduled or periodic tasks.

Example:

Create a timer thread pool object & set the number of threads in the pool to 5
ScheduledExecutorService scheduledThreadPool = Executors.newScheduledThreadPool(5);
// 2. Create a Runnable thread object & a task to execute
Runnable task =new Runnable(){
  public void run(a) {
     System.out.println("On a mission."); }};// 3. Submit the task to the thread pool
scheduledThreadPool.schedule(task, 1, TimeUnit.SECONDS); // The task will be executed after 1s
scheduledThreadPool.scheduleAtFixedRate(task,10.1000,TimeUnit.MILLISECONDS);// perform tasks every 1000ms after a delay of 10ms
Copy the code

5.3 Cacheable ThreadPool

Create method source:

public static ExecutorService newCachedThreadPool(a) {
    return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                  60L, TimeUnit.SECONDS,
                                  new SynchronousQueue<Runnable>());
}
public static ExecutorService newCachedThreadPool(ThreadFactory threadFactory) {
    return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                  60L, TimeUnit.SECONDS,
                                  new SynchronousQueue<Runnable>(),
                                  threadFactory);
}
Copy the code
  • Features: No core thread, the number of non-core threads is unlimited, the execution is idle for 60 seconds after recycling, the task queue is a blocking queue that does not store elements.
  • Application scenario: Perform a large number of time-consuming tasks.

Example:

// 1. Create cacheable thread pool objects
ExecutorService cachedThreadPool = Executors.newCachedThreadPool();
// 2. Create a Runnable thread object & a task to execute
Runnable task =new Runnable(){
  public void run(a) {
     System.out.println("On a mission."); }};// 3. Submit the task to the thread pool
cachedThreadPool.execute(task);
Copy the code

5.4 Single Threaded Thread Pool (SingleThreadExecutor)

Create method source:

public static ExecutorService newSingleThreadExecutor(a) {
    return new FinalizableDelegatedExecutorService
        (new ThreadPoolExecutor(1.1.0L, TimeUnit.MILLISECONDS,
                                new LinkedBlockingQueue<Runnable>()));
}
public static ExecutorService newSingleThreadExecutor(ThreadFactory threadFactory) {
    return new FinalizableDelegatedExecutorService
        (new ThreadPoolExecutor(1.1.0L, TimeUnit.MILLISECONDS,
                                new LinkedBlockingQueue<Runnable>(),
                                threadFactory));
}
Copy the code
  • Features: only 1 core thread, no more than the core thread, the execution of the immediate recovery, the task queue for the linked list structure of the bounded queue.
  • Application scenario: The operations, such as database operations and file operations, are not suitable for concurrent operations but may cause I/O block and affect UI thread response.

Example:

// 1. Create a single thread pool
ExecutorService singleThreadExecutor = Executors.newSingleThreadExecutor();
// 2. Create a Runnable thread object & a task to execute
Runnable task =new Runnable(){
  public void run(a) {
     System.out.println("On a mission."); }};// 3. Submit the task to the thread pool
singleThreadExecutor.execute(task);
Copy the code

5.5 contrast

6 summarizes

Use the Thread pool for the Executors. If so, you can clear the running rules of the thread pool and avoid resource depletion. If so, you can complete the following steps: 1.

Executors have the following disadvantages:

  • FixedThreadPoolandSingleThreadExecutor: The main problem is the use of stacked request processing queuesLinkedBlockingQueue, may consume very large memory, or even OOM.
  • CachedThreadPoolandScheduledThreadPoolThe main problem is that the maximum number of threads isInteger.MAX_VALUE, may create a very large number of threads, or even OOM.

reference

  • Android multithreading: ThreadPool full parsing
  • Still creating a thread pool using Executors? Beware of Memory overflow
  • Alibaba Java Development Manual