1. Thread pool constructor parameters
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler)
corePoolSize: the number of core threads
maximumPoolSize: the maximum number of threads (core threads included)
keepAliveTime: how long an idle thread is kept alive after it finishes executing; used together with unit
unit: the TimeUnit of keepAliveTime
workQueue: the task queue that new tasks are placed in once the number of core threads has reached its limit
threadFactory: the ThreadFactory used to create threads
handler: the rejection policy applied to a newly arriving task when the maximum number of threads has been reached and the task queue is full
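A minimal construction sketch tying the seven parameters together; the sizes, queue and policy chosen here are illustrative, not recommendations.
import java.util.concurrent.*;

public class PoolConstruction {
    public static void main(String[] args) {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                2,                                     // corePoolSize
                4,                                     // maximumPoolSize
                30L, TimeUnit.SECONDS,                 // keepAliveTime + unit
                new ArrayBlockingQueue<>(10),          // workQueue (bounded, 10 tasks)
                Executors.defaultThreadFactory(),      // threadFactory
                new ThreadPoolExecutor.AbortPolicy()); // handler

        pool.execute(() -> System.out.println("running on " + Thread.currentThread().getName()));
        pool.shutdown();
    }
}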
2. keepAliveTime
keepAliveTime applies to non-core threads by default. To let idle core threads time out as well, call
public void allowCoreThreadTimeOut(boolean value)
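A small sketch (the pool sizes are illustrative); note that keepAliveTime must be greater than zero before core-thread timeout can be enabled.
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class CoreTimeoutExample {
    public static void main(String[] args) {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                2, 4, 30L, TimeUnit.SECONDS, new LinkedBlockingQueue<>());
        pool.allowCoreThreadTimeOut(true); // idle core threads are now reclaimed after 30 seconds too
        pool.shutdown();
    }
}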
3. Blocking queues
1. ArrayBlockingQueue
Storage: final Object[] items;
Constructor parameters: int capacity sets the array size; boolean fair selects whether the internal ReentrantLock is fair (construction of these queues is sketched after this list).
ReentrantLock: fair and unfair locks differ mainly in how the lock is acquired. A fair lock first checks whether other threads are already queued for the lock and, if so, lets them acquire it first; an unfair lock simply tries to grab the lock regardless of whether anyone is waiting.
2. LinkedBlockingDeque
Storage: doubly linked list
Constructor parameters: the no-argument constructor defaults the capacity to Integer.MAX_VALUE; a capacity value can also be passed in
3. PriorityBlockingQueue
Storage: array — private transient Object[] queue
Constructor parameters: parameter 1 is the initial capacity (default 11), parameter 2 is a Comparator
4. SynchronousQueue
It has no storage capacity: a submitted task must be handed directly to a thread that is ready to execute it. If no such thread is found and no new thread can be created, the rejection policy is executed
5. DelayedWorkQueue
Storage: array, default size 16
private RunnableScheduledFuture<?>[] queue = new RunnableScheduledFuture<?>[INITIAL_CAPACITY];
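A minimal sketch of how the first four queues are constructed (the capacities and the comparator are illustrative; DelayedWorkQueue is an internal class of ScheduledThreadPoolExecutor and cannot be created directly):
import java.util.Comparator;
import java.util.concurrent.*;

public class QueueExamples {
    public static void main(String[] args) {
        // Bounded array-backed queue; the second argument selects a fair ReentrantLock.
        BlockingQueue<Runnable> array = new ArrayBlockingQueue<>(100, true);

        // Linked queue; without a capacity argument it defaults to Integer.MAX_VALUE.
        BlockingQueue<Runnable> linked = new LinkedBlockingDeque<>();

        // Priority queue; default initial capacity is 11, the comparator orders the elements.
        BlockingQueue<Runnable> priority = new PriorityBlockingQueue<>(
                11, Comparator.comparingInt(Object::hashCode));

        // Hand-off queue with no capacity: offer() fails unless a consumer is already waiting.
        BlockingQueue<Runnable> handOff = new SynchronousQueue<>();

        System.out.println(array.remainingCapacity());   // 100
        System.out.println(handOff.remainingCapacity()); // 0
    }
}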
4. Thread factories
1. DefaultThreadFactory: creates threads with a thread group, a thread number, a thread name, and a thread priority (default 5, Thread.NORM_PRIORITY)
2. PrivilegedThreadFactory: extends DefaultThreadFactory
A custom factory only needs to implement ThreadFactory, as sketched below.
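A minimal custom factory, similar in spirit to DefaultThreadFactory; the class name and naming scheme are illustrative.
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.atomic.AtomicInteger;

public class NamedThreadFactory implements ThreadFactory {
    private final AtomicInteger counter = new AtomicInteger(1);
    private final String prefix;

    public NamedThreadFactory(String prefix) {
        this.prefix = prefix;
    }

    @Override
    public Thread newThread(Runnable r) {
        Thread t = new Thread(r, prefix + "-" + counter.getAndIncrement());
        t.setDaemon(false);                  // pool threads are usually non-daemon
        t.setPriority(Thread.NORM_PRIORITY); // priority 5, as in DefaultThreadFactory
        return t;
    }
}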
5. Rejection policies
1.CallerRunsPolicy
public static class CallerRunsPolicy implements RejectedExecutionHandler {
    public CallerRunsPolicy() { }

    public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
        if (!e.isShutdown()) {
            r.run();
        }
    }
}
The task is run directly on the calling thread, provided the pool has not been shut down
2.AbortPolicy
public static class AbortPolicy implements RejectedExecutionHandler {
    public AbortPolicy() { }

    public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
        throw new RejectedExecutionException("Task " + r.toString() +
                                             " rejected from " +
                                             e.toString());
    }
}
A RejectedExecutionException is thrown
3.DiscardPolicy
public static class DiscardPolicy implements RejectedExecutionHandler {
    public DiscardPolicy() { }

    public void rejectedExecution(Runnable r, ThreadPoolExecutor e) { }
}
Does nothing: the task is silently dropped
4.DiscardOldestPolicy
public static class DiscardOldestPolicy implements RejectedExecutionHandler {
    public DiscardOldestPolicy() { }

    public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
        if (!e.isShutdown()) {
            e.getQueue().poll();
            e.execute(r);
        }
    }
}
Removes the oldest task from the queue and resubmits the current task. This can loop implicitly, because execute may trigger the rejection policy again. A handler can also be written by hand; a sketch follows.
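Besides the four built-in policies, RejectedExecutionHandler can be implemented directly. A hypothetical handler that logs the rejection and then falls back to running on the caller's thread:
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.ThreadPoolExecutor;

public class LoggingCallerRunsPolicy implements RejectedExecutionHandler {
    @Override
    public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
        System.err.println("Task rejected, poolSize=" + e.getPoolSize()
                + ", queueSize=" + e.getQueue().size());
        if (!e.isShutdown()) {
            r.run(); // same fallback as CallerRunsPolicy
        }
    }
}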
6. ThreadPoolExecutor (Executors factory methods)
1.FixedThreadPool
public static ExecutorService newFixedThreadPool(int nThreads) {
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>());
}
The core and maximum pool sizes are equal, so there are only core threads; the task queue is unbounded; the thread factory is DefaultThreadFactory (a custom one can also be passed in); the rejection policy is AbortPolicy
2. CachedThreadPool
public static ExecutorService newCachedThreadPool(ThreadFactory threadFactory) {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
60L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>(),
threadFactory);
}
The core pool size is 0 and the maximum pool size is Integer.MAX_VALUE; the task queue has no capacity, so an arriving task is handed to an idle thread or, if none is available, a new thread is created; idle threads are kept alive for 60 seconds; the thread factory is DefaultThreadFactory (a custom one can also be passed in); the rejection policy is AbortPolicy
3.SingleThreadExecutor
public static ExecutorService newSingleThreadExecutor(ThreadFactory threadFactory) {
return new FinalizableDelegatedExecutorService
(new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>(),
threadFactory));
}
The core and maximum pool sizes are both 1; the task queue is unbounded; the thread factory is DefaultThreadFactory (a custom one can also be passed in); the rejection policy is AbortPolicy
4. WorkStealingPool (Java 1.8)
newWorkStealingPool was added in Java 8. If the parallelism level is not set explicitly, it uses the number of CPU processors on the current machine as the number of threads. This pool processes tasks in parallel and does not guarantee the order in which tasks are executed; its JDK 8 source is sketched below.
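For reference, in JDK 8 this factory method does not build a ThreadPoolExecutor at all; it delegates to ForkJoinPool:
public static ExecutorService newWorkStealingPool() {
    return new ForkJoinPool
        (Runtime.getRuntime().availableProcessors(),
         ForkJoinPool.defaultForkJoinWorkerThreadFactory,
         null, true);
}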
7. ScheduledThreadPoolExecutor
1. SingleThreadScheduledExecutor
The core pool size is 1 and the maximum pool size is Integer.MAX_VALUE. Non-core threads have a keep-alive time of 0, so they are reclaimed as soon as they finish executing.
2. ScheduledThreadPoolExecutor
The core pool size is passed in and the maximum pool size is Integer.MAX_VALUE. Non-core threads have a keep-alive time of 0, so they are reclaimed as soon as they finish executing. The JDK 8 constructor is sketched below.
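For reference, roughly how this looks in JDK 8 (NANOSECONDS is statically imported in the JDK source; newer JDKs use a small non-zero keep-alive instead of 0):
public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) {
    return new ScheduledThreadPoolExecutor(corePoolSize);
}

public ScheduledThreadPoolExecutor(int corePoolSize) {
    super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
          new DelayedWorkQueue());
}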
8. Basic behaviour and thread count selection
For CPU-bound tasks, set the thread count to the number of CPU cores + 1. For I/O-bound tasks, set it to the number of CPU cores x 2. A sizing sketch follows these paragraphs.
CPU-bound tasks require large amounts of CPU computation, so the goal is to keep CPU utilisation high. Do not make the core pool too large: too many threads compete for CPU time and cause constant context switching, which wastes CPU. Ideally every core is busy computing with nothing wasted, but it is quite possible that one thread suddenly blocks waiting on I/O, at which point the one extra thread can start working immediately instead of waiting for the block to end.
I/O-bound tasks perform frequent I/O operations that leave threads suspended for long periods, so more threads are needed to keep the CPUs from sitting idle and wasting resources. A common setting is twice the number of CPU cores.
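A minimal sketch of deriving both sizes from these heuristics (the multipliers are the rules of thumb above, not hard requirements):
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class PoolSizing {
    public static void main(String[] args) {
        int cores = Runtime.getRuntime().availableProcessors();

        // CPU-bound: cores + 1, so one spare thread can step in when a running thread briefly blocks.
        ThreadPoolExecutor cpuBound = new ThreadPoolExecutor(
                cores + 1, cores + 1, 0L, TimeUnit.MILLISECONDS,
                new LinkedBlockingQueue<>());

        // I/O-bound: roughly cores * 2, since threads spend much of their time blocked on I/O.
        ThreadPoolExecutor ioBound = new ThreadPoolExecutor(
                cores * 2, cores * 2, 0L, TimeUnit.MILLISECONDS,
                new LinkedBlockingQueue<>());

        System.out.println("cores = " + cores);
        cpuBound.shutdown();
        ioBound.shutdown();
    }
}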
1. While the number of threads has not reached the core pool size, each new task creates a new thread to run it.
2. Once the number of threads has reached the core pool size, each new task is put into the waiting queue.
3. When the queue is full, if the number of threads has not reached the maximum pool size, a non-core thread is created to run the task.
4. When the total number of threads has reached the maximum, new tasks are handed to the rejection handler. A small demonstration of this order follows.
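A small demonstration under assumed sizes (1 core thread, 2 threads maximum, a queue of 1): the fourth task overflows both the threads and the queue and is rejected.
import java.util.concurrent.*;

public class ExecuteOrderDemo {
    public static void main(String[] args) {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                1, 2, 0L, TimeUnit.MILLISECONDS,
                new ArrayBlockingQueue<>(1),
                new ThreadPoolExecutor.AbortPolicy());

        Runnable sleepy = () -> {
            try { Thread.sleep(1000); } catch (InterruptedException ignored) { }
        };

        pool.execute(sleepy); // 1st task: a core thread is created
        pool.execute(sleepy); // 2nd task: the core thread is busy, the task goes into the queue
        pool.execute(sleepy); // 3rd task: the queue is full, a non-core thread is created
        try {
            pool.execute(sleepy); // 4th task: threads and queue are full, AbortPolicy rejects it
        } catch (RejectedExecutionException e) {
            System.out.println("4th task rejected: " + e.getMessage());
        }
        pool.shutdown();
    }
}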
9. Third-party usage
1. OkHttp
The core pool size is 0, the maximum pool size is Integer.MAX_VALUE, and idle threads are kept alive for at most 60 seconds. The queue is a SynchronousQueue with no capacity, because the Dispatcher maintains its own queues.
//Dispatcher.java
// Maximum number of asynchronous requests
private int maxRequests = 64;
// Maximum number of simultaneous requests to a single host
private int maxRequestsPerHost = 5;
// An asynchronous queue ready for execution
private final Deque<AsyncCall> readyAsyncCalls = new ArrayDeque<>();
// An asynchronous queue in execution
private final Deque<AsyncCall> runningAsyncCalls = new ArrayDeque<>();
// The synchronization queue in progress
private final Deque<RealCall> runningSyncCalls = new ArrayDeque<>();
// The thread pool
public synchronized ExecutorService executorService() {
if (executorService == null) {
executorService = new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>(), Util.threadFactory("OkHttp Dispatcher", false));
}
return executorService;
}
// Join the standby queue and determine whether it can be executed
void enqueue(AsyncCall call) {
synchronized (this) {
// Join the standby queue
readyAsyncCalls.add(call);
}
// Execute
promoteAndExecute();
}
private boolean promoteAndExecute() {
assert (!Thread.holdsLock(this));
List<AsyncCall> executableCalls = new ArrayList<>();
boolean isRunning;
synchronized (this) {
// Collect the async calls that can run now: stop once the number of running calls reaches maxRequests; skip a call whose host already has maxRequestsPerHost running calls
for (Iterator<AsyncCall> i = readyAsyncCalls.iterator(); i.hasNext(); ) {
AsyncCall asyncCall = i.next();
// Stop once the number of running async calls reaches the limit
if (runningAsyncCalls.size() >= maxRequests) break; // Max capacity.
if (runningCallsForHost(asyncCall) >= maxRequestsPerHost) continue; // Host max capacity.
i.remove();
executableCalls.add(asyncCall);
runningAsyncCalls.add(asyncCall);
}
isRunning = runningCallsCount() > 0;
}
// Execute the filtered executable tasks
for (int i = 0, size = executableCalls.size(); i < size; i++) {
AsyncCall asyncCall = executableCalls.get(i);
asyncCall.executeOn(executorService());
}
return isRunning;
}
2.EventBus
The default executor is Executors.newCachedThreadPool(): the core pool size is 0, the maximum pool size is Integer.MAX_VALUE, and the task queue has no capacity, so an arriving task is handed to an idle thread or a new thread is created; the rejection policy is AbortPolicy
public class EventBusBuilder {
private final static ExecutorService DEFAULT_EXECUTOR_SERVICE = Executors.newCachedThreadPool();
}