I. Thread pool constructor parameters

public ThreadPoolExecutor(int corePoolSize,
                          int maximumPoolSize,
                          long keepAliveTime,
                          TimeUnit unit,
                          BlockingQueue<Runnable> workQueue,
                          ThreadFactory threadFactory,
                          RejectedExecutionHandler handler)

corePoolSize: the number of core threads

maximumPoolSize: the maximum number of threads (including core threads)

keepAliveTime: how long an idle thread waits for new work before it is terminated. This parameter is used together with unit

unit (TimeUnit): the time unit of keepAliveTime

workQueue (BlockingQueue): the task queue; a new task joins it once the number of threads has reached corePoolSize

threadFactory (ThreadFactory): the factory that creates the threads

handler (RejectedExecutionHandler): the rejection policy applied to a newly arrived task when the maximum number of threads has been reached and the task queue is full
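
As a minimal sketch of how the seven parameters fit together, the following builds a small pool with every argument spelled out. The class name PoolConfigDemo, the sizes, the queue capacity, and the choice of CallerRunsPolicy are assumptions made for illustration, not values taken from the text above.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

class PoolConfigDemo {
    // Builds a pool with all seven constructor parameters.
    // Every concrete value below is an illustrative assumption.
    static ThreadPoolExecutor newPool() {
        return new ThreadPoolExecutor(
                2,                                          // corePoolSize
                4,                                          // maximumPoolSize (includes core threads)
                30L,                                        // keepAliveTime
                TimeUnit.SECONDS,                           // unit of keepAliveTime
                new ArrayBlockingQueue<>(10),               // workQueue, bounded at 10 tasks
                Executors.defaultThreadFactory(),           // threadFactory
                new ThreadPoolExecutor.CallerRunsPolicy()); // handler
    }

    public static void main(String[] args) {
        ThreadPoolExecutor pool = newPool();
        pool.execute(() -> System.out.println("running on " + Thread.currentThread().getName()));
        pool.shutdown();
    }
}
```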


II. keepAliveTime

By default keepAliveTime applies only to non-core threads. To apply it to core threads as well, call

public void allowCoreThreadTimeOut(boolean value)
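
The effect of this switch can be sketched as follows. The class name CoreTimeoutDemo, the 100 ms keep-alive, and the one-second wait are assumptions; the wait is simply assumed to be long enough for the idle thread to time out. Note that keepAliveTime must be greater than zero when core timeout is enabled.

```java
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

class CoreTimeoutDemo {
    // Runs one task, waits well past keepAliveTime, and returns the pool size.
    static int poolSizeAfterIdle(boolean allowCoreTimeout) {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                1, 1, 100L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>());
        pool.allowCoreThreadTimeOut(allowCoreTimeout);
        pool.execute(() -> { });
        try {
            Thread.sleep(1000); // assumed to be ample time for a 100 ms timeout
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        int size = pool.getPoolSize();
        pool.shutdown();
        return size;
    }

    public static void main(String[] args) {
        System.out.println("core timeout off: " + poolSizeAfterIdle(false));
        System.out.println("core timeout on:  " + poolSizeAfterIdle(true));
    }
}
```

With the switch off the single core thread should still be alive after the wait; with it on, the pool should have shrunk to zero threads.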

III. Blocking queues

1.ArrayBlockingQueue

Storage: array (final Object[] items)

Constructor parameters: int capacity specifies the size of the array; boolean fair specifies whether the internal ReentrantLock is a fair lock

ReentrantLock: fair and unfair locks differ mainly in how the lock is acquired. Fair lock: when acquiring the lock, a thread first checks whether other threads are already waiting in the queue; if so, those threads acquire the lock first. Unfair lock: a thread tries to acquire the lock immediately, regardless of whether other threads are waiting in the queue.
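
A minimal sketch of the two constructor arguments, assuming a capacity of 2 and a fair lock; the class name BoundedQueueDemo is hypothetical. Once the array is full, the non-blocking offer returns false instead of growing the queue.

```java
import java.util.Arrays;
import java.util.concurrent.ArrayBlockingQueue;

class BoundedQueueDemo {
    // Offers three items to a fair queue of capacity 2 and reports each result.
    static boolean[] offerThree() {
        ArrayBlockingQueue<String> queue = new ArrayBlockingQueue<>(2, true); // capacity 2, fair lock
        return new boolean[] { queue.offer("a"), queue.offer("b"), queue.offer("c") };
    }

    public static void main(String[] args) {
        System.out.println(Arrays.toString(offerThree())); // prints [true, true, false]
    }
}
```
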
2.LinkedBlockingDeque

Storage: doubly linked list

Constructor: with no arguments the capacity defaults to Integer.MAX_VALUE; a capacity can also be passed in

3.PriorityBlockingQueue

Storage: array (private transient Object[] queue)

Constructor: parameter 1 is the initial capacity (default 11); parameter 2 is a comparator
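
To illustrate the two constructor parameters, here is a small sketch that passes the default initial capacity of 11 and a reverse-order comparator. The class name PriorityQueueDemo and the sample values are assumptions.

```java
import java.util.Comparator;
import java.util.concurrent.PriorityBlockingQueue;

class PriorityQueueDemo {
    // Adds three numbers and returns them in the order poll() hands them out.
    static String drainOrder() {
        PriorityBlockingQueue<Integer> queue =
                new PriorityBlockingQueue<>(11, Comparator.<Integer>reverseOrder());
        queue.add(3);
        queue.add(10);
        queue.add(7);
        StringBuilder order = new StringBuilder();
        Integer next;
        while ((next = queue.poll()) != null) {
            order.append(next).append(' ');
        }
        return order.toString().trim();
    }

    public static void main(String[] args) {
        System.out.println(drainOrder()); // prints 10 7 3
    }
}
```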

4.SynchronousQueue

It has no storage capacity, so every submitted task must be handed directly to a thread. If no idle thread is waiting and no new thread can be created, the rejection policy is executed
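
The hand-off behaviour can be sketched like this. The class name SynchronousQueueDemo and the pool sizes (0 core threads, at most 1 thread) are assumptions chosen so that the second task has nowhere to go; the default AbortPolicy then throws.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

class SynchronousQueueDemo {
    // With no consumer waiting, offer() cannot store the element and returns false.
    static boolean offerWithoutConsumer() {
        SynchronousQueue<Runnable> queue = new SynchronousQueue<>();
        return queue.offer(() -> { });
    }

    // One busy thread, no queue capacity, no room for another thread: the task is rejected.
    static boolean secondTaskRejected() {
        CountDownLatch release = new CountDownLatch(1);
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                0, 1, 60L, TimeUnit.SECONDS, new SynchronousQueue<>());
        pool.execute(() -> {
            try {
                release.await(); // keeps the only thread busy
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        boolean rejected = false;
        try {
            pool.execute(() -> { });
        } catch (RejectedExecutionException e) {
            rejected = true;
        }
        release.countDown();
        pool.shutdown();
        return rejected;
    }

    public static void main(String[] args) {
        System.out.println("offer accepted: " + offerWithoutConsumer());
        System.out.println("second task rejected: " + secondTaskRejected());
    }
}
```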

5.DelayedWorkQueue

Storage: array, default size 16

private RunnableScheduledFuture<?>[] queue = new RunnableScheduledFuture<?>[INITIAL_CAPACITY];

IV. Thread factories

1.DefaultThreadFactory: sets the thread group, thread number, thread name, and thread priority (default 5) of each thread it creates
2.PrivilegedThreadFactory: extends DefaultThreadFactory
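
A custom factory can be passed in wherever a ThreadFactory is accepted. The following is only a sketch of the idea: NamedThreadFactory and its naming scheme are hypothetical and are not part of the JDK.

```java
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.atomic.AtomicInteger;

class NamedThreadFactory implements ThreadFactory {
    private final String prefix;
    private final AtomicInteger nextId = new AtomicInteger(1);

    NamedThreadFactory(String prefix) {
        this.prefix = prefix;
    }

    @Override
    public Thread newThread(Runnable r) {
        // Name threads "<prefix>-<number>" so they are easy to spot in thread dumps.
        Thread t = new Thread(r, prefix + "-" + nextId.getAndIncrement());
        t.setDaemon(false);
        t.setPriority(Thread.NORM_PRIORITY); // 5, the default priority mentioned above
        return t;
    }

    public static void main(String[] args) {
        Thread t = new NamedThreadFactory("worker").newThread(() -> { });
        System.out.println(t.getName() + " priority=" + t.getPriority()); // prints worker-1 priority=5
    }
}
```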

V. Rejection policies

1.CallerRunsPolicy
public static class CallerRunsPolicy implements RejectedExecutionHandler {

    public CallerRunsPolicy() {}

    public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
        if (!e.isShutdown()) {
            r.run();
        }
    }
}

Runs the task directly on the caller's thread, provided the thread pool has not been shut down
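
A sketch of this behaviour, assuming a pool with a single thread and a SynchronousQueue so that the second task is certain to be rejected. The class name CallerRunsDemo is hypothetical.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;

class CallerRunsDemo {
    // Returns the name of the thread that ended up running the rejected task.
    static String threadThatRanOverflowTask() {
        CountDownLatch release = new CountDownLatch(1);
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                1, 1, 0L, TimeUnit.MILLISECONDS,
                new SynchronousQueue<>(),
                new ThreadPoolExecutor.CallerRunsPolicy());
        pool.execute(() -> {
            try {
                release.await(); // keeps the only pool thread busy
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        AtomicReference<String> ranOn = new AtomicReference<>();
        // The pool is saturated, so the policy runs this task on the submitting thread.
        pool.execute(() -> ranOn.set(Thread.currentThread().getName()));
        release.countDown();
        pool.shutdown();
        return ranOn.get();
    }

    public static void main(String[] args) {
        System.out.println("caller:  " + Thread.currentThread().getName());
        System.out.println("ran on:  " + threadThatRanOverflowTask());
    }
}
```

Because the submitting thread does the work itself, it cannot submit further tasks in the meantime, which acts as a simple form of back-pressure.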

2.AbortPolicy
public static class AbortPolicy implements RejectedExecutionHandler {
    public AbortPolicy() {}

    public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
        throw new RejectedExecutionException("Task " + r.toString() +
                                             " rejected from " +
                                             e.toString());
    }
}

Throws a RejectedExecutionException

3.DiscardPolicy
public static class DiscardPolicy implements RejectedExecutionHandler {
    public DiscardPolicy() {}

    public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {}
}

Does nothing; the task is silently dropped

4.DiscardOldestPolicy
public static class DiscardOldestPolicy implements RejectedExecutionHandler {
    public DiscardOldestPolicy() {}

    public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
        if (!e.isShutdown()) {
            e.getQueue().poll();
            e.execute(r);
        }
    }
}

Removes the oldest task from the task queue and then resubmits the new task through execute. This is an implicit loop, because execute may trigger the rejection policy again
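
The following sketch shows which task gets lost, assuming one thread and a queue of capacity 1. The class name DiscardOldestDemo and the task labels are hypothetical.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

class DiscardOldestDemo {
    // Submits three tasks to a saturated pool and returns the labels of those that ran.
    static List<String> executedTasks() {
        CountDownLatch release = new CountDownLatch(1);
        List<String> ran = Collections.synchronizedList(new ArrayList<>());
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                1, 1, 0L, TimeUnit.MILLISECONDS,
                new ArrayBlockingQueue<>(1),
                new ThreadPoolExecutor.DiscardOldestPolicy());
        pool.execute(() -> {
            try {
                release.await(); // holds the only thread until all tasks are submitted
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
            ran.add("first");
        });
        pool.execute(() -> ran.add("second")); // waits in the queue
        pool.execute(() -> ran.add("third"));  // queue full: "second" is dropped, "third" is queued
        release.countDown();
        pool.shutdown();
        try {
            pool.awaitTermination(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return new ArrayList<>(ran);
    }

    public static void main(String[] args) {
        System.out.println(executedTasks()); // prints [first, third]
    }
}
```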


VI. ThreadPoolExecutor

1.FixedThreadPool
public static ExecutorService newFixedThreadPool(int nThreads) {
    return new ThreadPoolExecutor(nThreads, nThreads,
                                  0L, TimeUnit.MILLISECONDS,
                                  new LinkedBlockingQueue<Runnable>());
}

The number of core threads equals the maximum number of threads, so there are only core threads. The task queue is effectively unbounded (LinkedBlockingQueue defaults to a capacity of Integer.MAX_VALUE). The thread factory is DefaultThreadFactory (a custom one can also be passed in); the rejection policy is AbortPolicy

2.CachedThreadPool
public static ExecutorService newCachedThreadPool(ThreadFactory threadFactory) {
    return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                  60L, TimeUnit.SECONDS,
                                  new SynchronousQueue<Runnable>(),
                                  threadFactory);
}

The number of core threads is 0, and the maximum number of threads is Integer.MAX_VALUE. The task queue has no capacity, so each new task is handed to an idle thread if one is available; otherwise a new thread is created to execute it. Threads that stay idle for 60 seconds are terminated. The thread factory is DefaultThreadFactory (a custom one can also be passed in); the rejection policy is AbortPolicy

3.SingleThreadExecutor
public static ExecutorService newSingleThreadExecutor(ThreadFactory threadFactory) {
    return new FinalizableDelegatedExecutorService
        (new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS,
                                new LinkedBlockingQueue<Runnable>(),
                                threadFactory));
}

The number of core threads and the maximum number of threads are both 1, and the task queue is effectively unbounded. The thread factory is DefaultThreadFactory (a custom one can also be passed in); the rejection policy is AbortPolicy

4.WorkStealingPool (Java 1.8)

A method for creating a thread pool that was added in Java 8. If the parallelism level is not set explicitly, the number of processors available on the current machine is used as the target parallelism. This thread pool processes tasks in parallel, with no guarantee about the order of execution
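
A small usage sketch, assuming the tasks are independent of each other. The class name WorkStealingDemo and the sum-of-squares workload are hypothetical. Execution order is not guaranteed, but invokeAll returns the futures in the same order as the submitted tasks.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

class WorkStealingDemo {
    // Squares the numbers 1..n in parallel and adds up the results.
    static int sumOfSquares(int n) {
        ExecutorService pool = Executors.newWorkStealingPool(); // parallelism = available processors
        try {
            List<Callable<Integer>> tasks = new ArrayList<>();
            for (int i = 1; i <= n; i++) {
                final int value = i;
                tasks.add(() -> value * value);
            }
            int sum = 0;
            for (Future<Integer> result : pool.invokeAll(tasks)) {
                sum += result.get();
            }
            return sum;
        } catch (InterruptedException | ExecutionException e) {
            throw new IllegalStateException(e);
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(sumOfSquares(10)); // prints 385
    }
}
```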

VII. ScheduledThreadPoolExecutor

1.SingleThreadScheduledExecutor

The number of core threads is 1, and the maximum number of threads is Integer.MAX_VALUE. The keep-alive time of non-core threads is 0 (in the Java 8 source), so a non-core thread would terminate as soon as it became idle

2.ScheduledThreadPoolExecutor

The number of core threads is passed in, and the maximum number of threads is Integer.MAX_VALUE. The keep-alive time of non-core threads is 0 (in the Java 8 source), so a non-core thread would terminate as soon as it became idle
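
A brief sketch of both points: the pool sizes reported by a ScheduledThreadPoolExecutor, and a periodic task. The class name ScheduledDemo, the 50 ms period, and the five-second wait limit are assumptions.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

class ScheduledDemo {
    // Returns {corePoolSize, maximumPoolSize} of a freshly created scheduled pool.
    static int[] poolSizes(int corePoolSize) {
        ScheduledThreadPoolExecutor pool = new ScheduledThreadPoolExecutor(corePoolSize);
        int[] sizes = { pool.getCorePoolSize(), pool.getMaximumPoolSize() };
        pool.shutdown();
        return sizes;
    }

    // Schedules a task every 50 ms and returns how often it ran once three runs were seen.
    static int runAtFixedRate() {
        ScheduledThreadPoolExecutor scheduler = new ScheduledThreadPoolExecutor(1);
        AtomicInteger runs = new AtomicInteger();
        CountDownLatch threeRuns = new CountDownLatch(3);
        scheduler.scheduleAtFixedRate(() -> {
            runs.incrementAndGet();
            threeRuns.countDown();
        }, 0, 50, TimeUnit.MILLISECONDS);
        try {
            threeRuns.await(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        scheduler.shutdownNow(); // cancels the periodic task
        return runs.get();
    }

    public static void main(String[] args) {
        int[] sizes = poolSizes(2);
        System.out.println("core=" + sizes[0] + ", max=" + sizes[1]);
        System.out.println("runs observed: " + runAtFixedRate());
    }
}
```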

VIII. Basic implementation and selection

As a common rule of thumb, for CPU-intensive tasks set the number of core threads to the number of CPU cores + 1; for I/O-intensive tasks, set it to the number of CPU cores x 2.

CPU-intensive tasks are tasks that need a large amount of CPU computation, where the goal is to keep CPU utilization high. Do not set the number of core threads too high: too many threads compete for the CPU and cause constant thread switching, which wastes CPU time. Ideally every core is busy computing with nothing wasted, but one of the threads may well be suspended unexpectedly, for example while waiting on I/O. In that case the one extra waiting thread can start working immediately, without waiting for the suspension to end.

I/O-intensive tasks are tasks that perform frequent I/O operations. These operations leave threads suspended for long periods, so more threads are needed to keep the CPUs from sitting idle and wasting resources. Generally, set the number of threads to twice the number of CPU cores
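
The two rules of thumb above can be written down directly. This is only a sketch of the heuristic as stated in the text; the class name PoolSizing and the method names are hypothetical, and real workloads may need measurement to find a better value.

```java
class PoolSizing {
    // CPU-intensive work: one thread per core plus one spare.
    static int cpuBound(int cores) {
        return cores + 1;
    }

    // I/O-intensive work: twice the number of cores.
    static int ioBound(int cores) {
        return cores * 2;
    }

    public static void main(String[] args) {
        int cores = Runtime.getRuntime().availableProcessors();
        System.out.println("cores=" + cores
                + ", cpuBound=" + cpuBound(cores)
                + ", ioBound=" + ioBound(cores));
    }
}
```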

1. While the number of threads has not reached the number of core threads, each new task creates a new thread to execute it.
2. Once the number of threads has reached the number of core threads, each new task is put into the waiting queue.
3. When the queue is full, if the number of threads has not reached the maximum number of threads, a non-core thread is created to execute the task.
4. When the total number of threads has reached the maximum, new tasks are handled by the rejection policy.
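
The four steps above can be observed with a deliberately tiny pool. This is a sketch under assumed values: 1 core thread, at most 2 threads, a queue of capacity 1, and the default AbortPolicy. The class name SubmissionStepsDemo is hypothetical. Every task blocks on a latch so that no thread becomes free while tasks are being submitted.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

class SubmissionStepsDemo {
    // Submits four blocking tasks and records the pool state after each submission.
    static List<String> trace() {
        CountDownLatch release = new CountDownLatch(1);
        Runnable blocker = () -> {
            try {
                release.await();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        };
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                1, 2, 60L, TimeUnit.SECONDS, new ArrayBlockingQueue<>(1));
        List<String> trace = new ArrayList<>();
        for (int i = 1; i <= 4; i++) {
            try {
                pool.execute(blocker);
                trace.add("task " + i + ": threads=" + pool.getPoolSize()
                        + ", queued=" + pool.getQueue().size());
            } catch (RejectedExecutionException e) {
                trace.add("task " + i + ": rejected");
            }
        }
        release.countDown();
        pool.shutdown();
        return trace;
    }

    public static void main(String[] args) {
        trace().forEach(System.out::println);
        // task 1: threads=1, queued=0   (step 1: core thread created)
        // task 2: threads=1, queued=1   (step 2: task queued)
        // task 3: threads=2, queued=1   (step 3: non-core thread created)
        // task 4: rejected              (step 4: rejection policy)
    }
}
```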

IX. Choices made by third-party libraries

1.OkHttp

The number of core threads is 0, the maximum number of threads is Integer.MAX_VALUE, and idle threads are kept alive for 60 seconds. The queue is a SynchronousQueue, which holds no tasks, because the Dispatcher implements its own queues

//Dispatcher.java
  // Maximum number of asynchronous requests
  private int maxRequests = 64;
  // Maximum number of simultaneous requests to a single host
  private int maxRequestsPerHost = 5;

  // An asynchronous queue ready for execution
  private final Deque<AsyncCall> readyAsyncCalls = new ArrayDeque<>();

  // An asynchronous queue in execution
  private final Deque<AsyncCall> runningAsyncCalls = new ArrayDeque<>();

  // Synchronous calls in progress
  private final Deque<RealCall> runningSyncCalls = new ArrayDeque<>();
  // Thread pool
  public synchronized ExecutorService executorService() {
    if (executorService == null) {
      executorService = new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60, TimeUnit.SECONDS,
          new SynchronousQueue<Runnable>(), Util.threadFactory("OkHttp Dispatcher", false));
    }
    return executorService;
  }

  // Add the call to the ready queue, then check whether it can be executed
  void enqueue(AsyncCall call) {
    synchronized (this) {
      // Add to the ready queue
      readyAsyncCalls.add(call);
    }
    // Promote and execute eligible calls
    promoteAndExecute();
  }
 
  private boolean promoteAndExecute() {
    assert (!Thread.holdsLock(this));
   
    List<AsyncCall> executableCalls = new ArrayList<>();
    boolean isRunning;
    synchronized (this) {
      // Collect the asynchronous requests that can be executed. If the number of running requests has reached maxRequests, break out of the loop; if the number of running requests for this call's host has reached maxRequestsPerHost, skip the call and continue
      for (Iterator<AsyncCall> i = readyAsyncCalls.iterator(); i.hasNext(); ) {
        AsyncCall asyncCall = i.next();
        if (runningAsyncCalls.size() >= maxRequests) break; // Max capacity.
        if (runningCallsForHost(asyncCall) >= maxRequestsPerHost) continue; // Host max capacity.

        i.remove();
        executableCalls.add(asyncCall);
        runningAsyncCalls.add(asyncCall);
      }
      isRunning = runningCallsCount() > 0;
    }
    // Execute the filtered executable tasks
    for (int i = 0, size = executableCalls.size(); i < size; i++) {
      AsyncCall asyncCall = executableCalls.get(i);
      asyncCall.executeOn(executorService());
    }

    return isRunning;
  }

2.EventBus

EventBus uses Executors.newCachedThreadPool() by default: the number of core threads is 0, and the maximum number of threads is Integer.MAX_VALUE. The task queue has no capacity, so each new task is handed to an idle thread if one is available; otherwise a new thread is created to execute it. The rejection policy is AbortPolicy

public class EventBusBuilder {
    private final static ExecutorService DEFAULT_EXECUTOR_SERVICE = Executors.newCachedThreadPool();
}