Executor framework

The main parameters

  • corePoolSize: the number of core threads in the thread pool. By default, core threads stay alive even when they are idle. If allowCoreThreadTimeOut is set to true, core threads are terminated once they have been idle for longer than keepAliveTime

  • maximumPoolSize: the maximum number of threads allowed in the thread pool

  • keepAliveTime: how long the thread pool allows non-core threads to stay idle before terminating them. Whether this also applies to core threads is controlled by allowCoreThreadTimeOut

  • unit: the time unit of keepAliveTime

  • workQueue: the buffer queue that holds tasks waiting to be executed

  • handler: the saturation policy applied when the work queue is full and the thread count has reached maximumPoolSize. The default is AbortPolicy, which throws a RejectedExecutionException for any further task submitted to the saturated pool. (A construction sketch mapping these parameters follows this list.)
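
To see how these parameters fit together, here is a minimal construction sketch; the class name, pool sizes, queue capacity, and chosen policies are illustrative example values, not recommendations:

import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class PoolConfigDemo {
    public static void main(String[] args) {
        // Each constructor argument corresponds to one parameter described above
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                2,                                      // corePoolSize
                4,                                      // maximumPoolSize
                60L, TimeUnit.SECONDS,                  // keepAliveTime + unit
                new ArrayBlockingQueue<Runnable>(100),  // workQueue (bounded buffer)
                Executors.defaultThreadFactory(),       // threadFactory
                new ThreadPoolExecutor.AbortPolicy());  // handler (the default policy)

        // Optionally let idle core threads time out as well
        pool.allowCoreThreadTimeOut(true);

        pool.execute(() ->
                System.out.println("running on " + Thread.currentThread().getName()));
        pool.shutdown();
    }
}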

Status of the thread pool

There are five thread pool states:

  • RUNNING: accepts new tasks and processes queued tasks. It is also the smallest state value, and the state of a thread pool you have just created
  • SHUTDOWN: no new tasks are accepted, but tasks already received continue to be executed
  • STOP: no new tasks are accepted, queued tasks are not processed, and tasks that are currently executing are interrupted
  • TIDYING: all tasks have stopped and all worker threads have finished; the pool is being cleaned up
  • TERMINATED: the thread pool has terminated

The transitions between these states are: RUNNING → SHUTDOWN when shutdown() is called; RUNNING or SHUTDOWN → STOP when shutdownNow() is called; SHUTDOWN → TIDYING once both the queue and the worker set are empty; STOP → TIDYING once the worker set is empty; and TIDYING → TERMINATED once the terminated() hook completes.

Thread pool task execution

1. Add a task

  • execute(): no return value
  • submit(): returns a Future object, whose result can be retrieved with get() and which can be cancelled with cancel() (see the sketch after this list)
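
A minimal sketch of the difference between the two; the pool size and tasks are arbitrary example values:

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class SubmitVsExecute {
    public static void main(String[] args) throws Exception {
        ExecutorService pool = Executors.newFixedThreadPool(2);

        // execute(): fire-and-forget, no handle to the result
        pool.execute(() -> System.out.println("ran via execute()"));

        // submit(): returns a Future whose result can be fetched with get()
        // and which can be cancelled with cancel()
        Future<Integer> future = pool.submit(() -> 1 + 1);
        System.out.println("submit() result: " + future.get()); // blocks until done

        pool.shutdown();
    }
}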

2. Task submission process

  1. If the number of threads in the thread pool is less than corePoolSize at this point, new threads are created to handle the added tasks even if all threads in the thread pool are idle.

  2. If the number of threads in the thread pool is equal to corePoolSize, but the buffer queue workQueue is not full, then the task is put into the buffer queue.

  3. If the number of threads in the pool is greater than or equal to corePoolSize, the buffer queue is full, and the number of threads in the pool is less than maximumPoolSize, a new thread is created to handle the added task.

  4. If the number of threads in the pool is greater than corePoolSize, the buffer queue workQueue is full, and the number of threads in the pool is equal to maximumPoolSize, the task is processed using the policy specified by the handler.

  5. When the number of threads in the thread pool is greater than corePoolSize, if a thread is idle for longer than keepAliveTime, the thread is terminated. In this way, the thread pool can dynamically adjust the number of threads in the pool.

In short, tasks are handled in the order corePoolSize → workQueue → maximumPoolSize; if all three are saturated, the handler deals with the rejected task, as the sketch below demonstrates.
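
This ordering can be observed with a deliberately tiny pool. A sketch, where a core size of 1, a maximum size of 2, and a queue capacity of 1 are chosen only so that each stage triggers quickly:

import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class SubmissionOrderDemo {
    public static void main(String[] args) {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                1, 2, 30L, TimeUnit.SECONDS, new ArrayBlockingQueue<Runnable>(1));

        Runnable slow = () -> {
            try { Thread.sleep(1000); } catch (InterruptedException ignored) { }
        };

        pool.execute(slow); // 1st task: occupies the single core thread
        pool.execute(slow); // 2nd task: core thread busy, goes into the queue
        pool.execute(slow); // 3rd task: queue full, a non-core thread is created
        try {
            pool.execute(slow); // 4th task: pool saturated, default AbortPolicy rejects it
        } catch (RejectedExecutionException e) {
            System.out.println("rejected: " + e);
        }
        pool.shutdown();
    }
}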

3. Stop a task

  • shutdown(): accepts no new tasks; tasks that are already running or queued are not stopped immediately
  • shutdownNow(): accepts no new tasks and interrupts tasks that are in progress (a sketch of the difference follows)
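
A sketch of the difference between the two calls; the timings and pool size are arbitrary example values:

import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class ShutdownDemo {
    public static void main(String[] args) throws InterruptedException {
        ExecutorService pool = Executors.newFixedThreadPool(2);
        for (int i = 0; i < 4; i++) {
            pool.execute(() -> {
                try { Thread.sleep(500); } catch (InterruptedException e) {
                    System.out.println("interrupted by shutdownNow()");
                }
            });
        }

        pool.shutdown();   // stop accepting new tasks; queued tasks still run
        if (!pool.awaitTermination(2, TimeUnit.SECONDS)) {
            // shutdownNow() interrupts running tasks and returns the tasks
            // that never started
            List<Runnable> notStarted = pool.shutdownNow();
            System.out.println("never started: " + notStarted.size());
        }
    }
}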

Common blocking queues

  • ArrayBlockingQueue: a bounded blocking queue backed by an array, with a fixed capacity
  • LinkedBlockingQueue: a blocking queue backed by a linked list. A capacity can be specified; if it is not, it defaults to Integer.MAX_VALUE
  • SynchronousQueue: a queue with no internal capacity that cannot store data. A put() by one thread must wait for another thread to take() (or poll()); offer() returns false if no thread is currently waiting to receive (see the sketch after this list)
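
The SynchronousQueue behavior in particular is easy to misread, so here is a small sketch:

import java.util.concurrent.SynchronousQueue;

public class SynchronousQueueDemo {
    public static void main(String[] args) throws InterruptedException {
        SynchronousQueue<String> queue = new SynchronousQueue<>();

        // offer() returns false immediately: no consumer is waiting in take()
        System.out.println(queue.offer("task"));   // false

        // With a consumer blocked in take(), put() can hand the element over
        Thread consumer = new Thread(() -> {
            try {
                System.out.println("took: " + queue.take());
            } catch (InterruptedException ignored) { }
        });
        consumer.start();
        Thread.sleep(100);   // give the consumer time to block in take()
        queue.put("task");   // succeeds: handed directly to the waiting consumer
        consumer.join();
    }
}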

Common thread pools

Executors provides factory methods for creating several common thread pools

  • newSingleThreadExecutor(): a single thread
public static ExecutorService newSingleThreadExecutor() {  
        return new FinalizableDelegatedExecutorService  
            (new ThreadPoolExecutor(1, 1,  
                                    0L, TimeUnit.MILLISECONDS,  
                                    new LinkedBlockingQueue<Runnable>()));  
    }  

Creates a single-thread pool. This pool has only one worker thread, which is equivalent to a single thread executing all tasks serially.

The BlockingQueue chosen is LinkedBlockingQueue, whose key characteristic here is that it is unbounded. Because the queue can never fill up, no non-core threads are ever created once the thread count reaches the core size, which is why newSingleThreadExecutor is single-threaded.

  • newFixedThreadPool(int nThreads): fixed-size thread pool
public static ExecutorService newFixedThreadPool(int nThreads) {  
        return new ThreadPoolExecutor(nThreads, nThreads,  
                                      0L, TimeUnit.MILLISECONDS,  
                                      new LinkedBlockingQueue<Runnable>());  
    }  

This is similar to newSingleThreadExecutor, except that the pool has a fixed number of threads rather than a single one.

corePoolSize and maximumPoolSize are equal (in fact, maximumPoolSize has no effect when an unbounded queue is used).

  • newCachedThreadPool(): effectively unbounded thread pool
public static ExecutorService newCachedThreadPool() {  
        return new ThreadPoolExecutor(0, Integer.MAX_VALUE,  
                                      60L, TimeUnit.SECONDS,  
                                      new SynchronousQueue<Runnable>());  
    }  

This pool is effectively unbounded in thread count, which is why maximumPoolSize is Integer.MAX_VALUE. For the BlockingQueue it uses a SynchronousQueue, which hands tasks directly to threads without holding them: an attempt to queue a task fails if no thread is available to run it immediately, so a new thread is constructed instead. This pool is well suited to a large number of short-lived, lightweight tasks.

  • newScheduledThreadPool(int corePoolSize): delayed or periodic tasks
public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) {
    return new ScheduledThreadPoolExecutor(corePoolSize);
}

public ScheduledThreadPoolExecutor(int corePoolSize) {
    super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
          new DelayedWorkQueue());
}

This kind of pool is used in RxJava. It creates a pool with a fixed number of core threads that supports delayed and periodic task execution; the blocking queue is a DelayedWorkQueue.
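
A usage sketch for delayed and periodic execution; the delays and pool size are arbitrary example values:

import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

public class ScheduledDemo {
    public static void main(String[] args) {
        ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(2);

        // Run once after a 1-second delay
        scheduler.schedule(
                () -> System.out.println("delayed task"), 1, TimeUnit.SECONDS);

        // Run every 2 seconds, after an initial 1-second delay
        scheduler.scheduleAtFixedRate(
                () -> System.out.println("periodic task"), 1, 2, TimeUnit.SECONDS);

        // scheduler.shutdown() would stop the periodic task from being rescheduled
    }
}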

Points to note

The Alibaba Java Development Manual requires thread pools to be created directly with ThreadPoolExecutor rather than through Executors. Doing so makes the pool's runtime behavior explicit and avoids the risk of resource exhaustion. (A sketch of such a manually configured pool follows at the end of this section.)

If a thread pool object is obtained from Executors instead, it has the following disadvantages:

  • FixedThreadPool and SingleThreadPool: the allowed request queue length is Integer.MAX_VALUE, which may accumulate a large number of requests and result in OOM.
  • CachedThreadPool and ScheduledThreadPool: the allowed number of threads is Integer.MAX_VALUE, which may create a very large number of threads and result in OOM.

Threads are system resources that incur memory overhead when they are created, and the operating system has a limit on the number of threads that can be created by a single process.

In Android, each thread requires a certain amount of memory when it is initialized; by default a thread takes about 1MB of mmap-ed memory. The system limits the number of threads each process can create (visible in /proc/pid/limits), and the limit varies from manufacturer to manufacturer. If the limit is exceeded, an OOM is raised even if there is still free memory on the heap.
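
Following the guideline above, a manually configured pool might look like the sketch below; the pool sizes, queue bound, thread-name prefix, and CallerRunsPolicy are illustrative choices, not prescribed values:

import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

public class ManualPool {
    private static final ThreadFactory NAMED_FACTORY = new ThreadFactory() {
        private final AtomicInteger count = new AtomicInteger(1);
        @Override
        public Thread newThread(Runnable r) {
            return new Thread(r, "biz-pool-" + count.getAndIncrement());
        }
    };

    // A bounded queue plus an explicit rejection policy: the pool can never
    // accumulate an unbounded backlog or an unbounded number of threads
    public static final ThreadPoolExecutor POOL = new ThreadPoolExecutor(
            4, 8,
            60L, TimeUnit.SECONDS,
            new ArrayBlockingQueue<Runnable>(256),
            NAMED_FACTORY,
            new ThreadPoolExecutor.CallerRunsPolicy());
}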

Use of thread pools in open source libraries

1. Usage in AsyncTask

private static final int CPU_COUNT = Runtime.getRuntime().availableProcessors();

// We want at least 2 threads and at most 4 threads in the core pool,
// preferring to have 1 less than the CPU count to avoid saturating
// the CPU with background work
private static final int CORE_POOL_SIZE = Math.max(2, Math.min(CPU_COUNT - 1, 4));
private static final int MAXIMUM_POOL_SIZE = CPU_COUNT * 2 + 1;
private static final int KEEP_ALIVE_SECONDS = 30;

private static final ThreadFactory sThreadFactory = new ThreadFactory() {
    private final AtomicInteger mCount = new AtomicInteger(1);

    public Thread newThread(Runnable r) {
        return new Thread(r, "AsyncTask #" + mCount.getAndIncrement());
    }
};

private static final BlockingQueue<Runnable> sPoolWorkQueue =
        new LinkedBlockingQueue<Runnable>(128);

/**
 * An {@link Executor} that can be used to execute tasks in parallel.
 */
public static final Executor THREAD_POOL_EXECUTOR;

static {
    ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(
            CORE_POOL_SIZE, MAXIMUM_POOL_SIZE, KEEP_ALIVE_SECONDS, TimeUnit.SECONDS,
            sPoolWorkQueue, sThreadFactory);
    threadPoolExecutor.allowCoreThreadTimeOut(true);
    THREAD_POOL_EXECUTOR = threadPoolExecutor;
}

public static final Executor SERIAL_EXECUTOR = new SerialExecutor();

private static class SerialExecutor implements Executor {
    final ArrayDeque<Runnable> mTasks = new ArrayDeque<Runnable>();
    Runnable mActive;

    public synchronized void execute(final Runnable r) {
        mTasks.offer(new Runnable() {
            public void run() {
                try { r.run(); } finally { scheduleNext(); }
            }
        });
        if (mActive == null) {
            scheduleNext();
        }
    }

    protected synchronized void scheduleNext() {
        if ((mActive = mTasks.poll()) != null) {
            THREAD_POOL_EXECUTOR.execute(mActive);
        }
    }
}

@MainThread
public final AsyncTask<Params, Progress, Result> execute(Params... params) {
    return executeOnExecutor(sDefaultExecutor, params); // sDefaultExecutor is SerialExecutor by default
}

SerialExecutor uses a double-ended ArrayDeque to ensure that tasks run serially, one at a time; THREAD_POOL_EXECUTOR can be used instead for concurrent execution.
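
For reference, a caller can opt into the parallel pool explicitly; a short sketch, where MyTask stands in for some AsyncTask subclass (hypothetical name):

// execute() runs on SERIAL_EXECUTOR by default; executeOnExecutor() lets the
// caller pick the parallel pool instead
MyTask task = new MyTask();   // MyTask: hypothetical AsyncTask<Void, Void, Void> subclass
task.executeOnExecutor(AsyncTask.THREAD_POOL_EXECUTOR);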

2. Usage in OkHttp

public synchronized ExecutorService executorService() {
    if (executorService == null) {
        executorService = new ThreadPoolExecutor(
            // corePoolSize = 0: no core threads are kept; idle threads are
            // reclaimed once they have been idle for keepAliveTime
            0,
            // maximumPoolSize: effectively no upper bound on the thread count
            Integer.MAX_VALUE,
            60,
            TimeUnit.SECONDS,
            // workQueue: tasks sent through execute() pass through this queue;
            // SynchronousQueue is a blocking queue that stores no data
            new SynchronousQueue<Runnable>(),
            Util.threadFactory("OkHttp Dispatcher", false));
    }
    return executorService;
}

As you can see, OkHttp builds a thread pool whose thread count ranges over [0, Integer.MAX_VALUE]: it keeps no minimum number of threads, creates new threads whenever they are needed, lets an idle thread live for only 60 seconds, uses a work queue that stores no elements, and names its threads with a thread factory called "OkHttp Dispatcher".

That is, in practice, when 10 concurrent requests arrive, the thread pool creates 10 threads; once the work is finished, all of the threads are reclaimed after 60 seconds of idleness.

3. Usage in WorkManager

 private final ExecutorService mBackgroundExecutor =
            Executors.newSingleThreadExecutor(mBackgroundThreadFactory);

A single-threaded thread pool ensures that tasks are executed sequentially

Source code analysis

ThreadPoolExecutor has an important member variable, ctl, an AtomicInteger. Its high 3 bits store the run state of the thread pool, and its low 29 bits store the number of worker threads currently in the pool. Packing both into a single atomic variable is a clever piece of Doug Lea's design: it minimizes lock contention and improves concurrency efficiency with as few variables as possible.

// ctl packs two fields into one AtomicInteger: the high 3 bits hold the run
// state, the low 29 bits hold the worker count
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
private static final int COUNT_BITS = Integer.SIZE - 3;
// Maximum worker count, which is quite enough:
// 0001 1111 1111 1111 1111 1111 1111 1111 (29 ones)
private static final int CAPACITY   = (1 << COUNT_BITS) - 1;

// runState is stored in the high-order bits
// RUNNING:    1110 0000 0000 0000 0000 0000 0000 0000
private static final int RUNNING    = -1 << COUNT_BITS;
// SHUTDOWN:   0000 0000 0000 0000 0000 0000 0000 0000
private static final int SHUTDOWN   =  0 << COUNT_BITS;
// STOP:       0010 0000 0000 0000 0000 0000 0000 0000
private static final int STOP       =  1 << COUNT_BITS;
// TIDYING:    0100 0000 0000 0000 0000 0000 0000 0000
private static final int TIDYING    =  2 << COUNT_BITS;
// TERMINATED: 0110 0000 0000 0000 0000 0000 0000 0000
private static final int TERMINATED =  3 << COUNT_BITS;

// Packing and unpacking ctl
private static int runStateOf(int c)     { return c & ~CAPACITY; }
private static int workerCountOf(int c)  { return c & CAPACITY; }
private static int ctlOf(int rs, int wc) { return rs | wc; }

/*
 * Bit field accessors that don't require unpacking ctl.
 * These depend on the bit layout and on workerCount being never negative.
 */
private static boolean runStateLessThan(int c, int s) { return c < s; }
private static boolean runStateAtLeast(int c, int s) { return c >= s; }
private static boolean isRunning(int c) { return c < SHUTDOWN; }

Focus on the implementation of execute():

public void execute(Runnable command) {
    if (command == null)
        throw new NullPointerException();
    /*
     * Proceed in 3 steps:
     *
     * 1. If fewer than corePoolSize threads are running, try to
     * start a new thread with the given command as its first
     * task.  The call to addWorker atomically checks runState and
     * workerCount, and so prevents false alarms that would add
     * threads when it shouldn't, by returning false.
     *
     * 2. If a task can be successfully queued, then we still need
     * to double-check whether we should have added a thread
     * (because existing ones died since last checking) or that
     * the pool shut down since entry into this method. So we
     * recheck state and if necessary roll back the enqueuing if
     * stopped, or start a new thread if there are none.
     *
     * 3. If we cannot queue task, then we try to add a new
     * thread.  If it fails, we know we are shut down or saturated
     * and so reject the task.
     */
    int c = ctl.get();
    // Fewer workers than corePoolSize: try to add a new core thread for this task
    if (workerCountOf(c) < corePoolSize) {
        if (addWorker(command, true))
            return;
        c = ctl.get();
    }
    // Check that the pool is still running, then offer the task to the wait
    // queue; offer() returns false if the queue is full
    if (isRunning(c) && workQueue.offer(command)) {
        int recheck = ctl.get();
        if (! isRunning(recheck) && remove(command))
            reject(command);
        else if (workerCountOf(recheck) == 0)
            addWorker(null, false);
    }
    // Queueing failed: try to add a non-core thread up to maximumPoolSize;
    // if that also fails, the pool is shut down or saturated, so reject the task
    else if (!addWorker(command, false))
        reject(command);
}
