As a project grows, you may notice stray threads scattered across every module, and when it comes time to monitor and optimize them, the code needs a lot of extra work.
You have probably used popular frameworks such as RxJava and OkHttp, which handle thread scheduling internally and expose a set of APIs, so you never have to think about how those threads actually run. Used individually they are fine, but from a project architecture perspective it is worth rethinking how threads are used overall.
Why thread pools?
- Threads are a scarce resource, and creating them consumes significant system resources.
- Frequent thread creation and destruction triggers GC often and degrades system performance.
- Concurrent threads scattered across a codebase lack unified management and monitoring.
Use of thread pools
Thread pools can be created using the Java Executors class, which provides common methods for creating thread pools.
- newFixedThreadPool
- newSingleThreadExecutor
- newCachedThreadPool
- newScheduledThreadPool
We’ll talk about them later, but let’s start with an example.
public void main() {
    ExecutorService executorService = Executors.newFixedThreadPool(3);
    for (int i = 0; i < 20; i++) {
        executorService.execute(new MyRunnable(i));
    }
}

static class MyRunnable implements Runnable {
    int id;

    MyRunnable(int id) {
        this.id = id;
    }

    @Override
    public void run() {
        try {
            Thread.sleep(3000);
            Log.i("threadpool", "task id:" + id + " is running threadInfo:" + Thread.currentThread().toString());
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}
The example creates a thread pool with a fixed number of threads and adds 20 tasks to it.
The log output shows three lines printed at a time, every three seconds. All tasks run on threads named pool-1-thread-1, pool-1-thread-2, and pool-1-thread-3, matching the pool size we set. The reason is that the pool holds only three threads, so when 20 tasks are added at once, the first three execute immediately and the rest wait.
Now replace ExecutorService executorService = Executors.newFixedThreadPool(3); with ExecutorService executorService = Executors.newCachedThreadPool(); and see what happens.
All of a sudden the whole batch is finished: as you might expect, a pool created with newCachedThreadPool spins up as many threads as needed to execute the tasks.
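As a sketch of that swap (plain Java, with System.out standing in for Android's Log, and a CountDownLatch added so the caller can wait for the batch; the class name and runAll helper are illustrative, not from the original):

```java
import java.util.concurrent.*;

public class CachedPoolDemo {
    // Runs `tasks` sleeping jobs on a cached pool and returns true if they
    // all finish within `timeoutSeconds` -- i.e. they really ran in parallel.
    static boolean runAll(int tasks, long timeoutSeconds) throws InterruptedException {
        ExecutorService executorService = Executors.newCachedThreadPool();
        CountDownLatch done = new CountDownLatch(tasks);
        for (int i = 0; i < tasks; i++) {
            final int id = i;
            executorService.execute(() -> {
                try {
                    Thread.sleep(1000); // each task "works" for one second
                    System.out.println("task id:" + id + " on " + Thread.currentThread().getName());
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                } finally {
                    done.countDown();
                }
            });
        }
        boolean finished = done.await(timeoutSeconds, TimeUnit.SECONDS);
        executorService.shutdownNow();
        return finished;
    }

    public static void main(String[] args) throws InterruptedException {
        // With a fixed pool of 3, twenty 1-second tasks need ~7 seconds;
        // the cached pool spawns a thread per task and finishes in ~1 second.
        System.out.println("all done in time: " + runAll(20, 3));
    }
}
```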
Next, let's take a formal look at the inner workings of a thread pool, in three parts.
The common thread pool types were listed above; let's see how they are created, taking two as examples.
# -> Executors.newFixedThreadPool
public static ExecutorService newFixedThreadPool(int nThreads) {
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>());
}
# -> Executors.newSingleThreadExecutor
public static ExecutorService newSingleThreadExecutor() {
return new FinalizableDelegatedExecutorService
(new ThreadPoolExecutor(1, 1,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>()));
}
As you can see, both pools are created through ThreadPoolExecutor, so let's look at its constructor.
# -> ThreadPoolExecutor constructor
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue) {
this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
Executors.defaultThreadFactory(), defaultHandler);
}
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler) {
...
this.corePoolSize = corePoolSize;
this.maximumPoolSize = maximumPoolSize;
this.workQueue = workQueue;
this.keepAliveTime = unit.toNanos(keepAliveTime);
this.threadFactory = threadFactory;
this.handler = handler;
}
The constructor declares a set of very important parameters; understanding them is understanding the basic principle of the thread pool. Here is what they mean:
- corePoolSize: the number of core threads. Unless allowCoreThreadTimeOut is set, core threads stay alive in the pool even when idle.
- maximumPoolSize: the maximum number of threads allowed in the pool.
- workQueue: the work queue tasks are submitted to when all core threads are busy. If the work queue is also at capacity, the pool tries to create a non-core thread to execute the task.
- keepAliveTime: the maximum time a non-core thread may stay idle; beyond it, the thread is reclaimed.
- threadFactory: the factory used to create threads.
- handler (RejectedExecutionHandler): the saturation policy applied when the pool and queue are full, such as discarding the task or throwing an exception.
Once the pool is created, tasks can be submitted through the execute method, and the pool handles each task according to its current state and configuration.
The following figure clearly shows the flow of the thread pool after a task has been submitted.
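That flow can also be demonstrated with a deliberately tiny pool (core = 1, max = 2, bounded queue of 1); the class and helper names below are made up for illustration:

```java
import java.util.concurrent.*;

public class ExecuteFlowDemo {
    // Submits four sleeping tasks and reports [pool size after three submits,
    // whether the fourth hit the saturation policy].
    static Object[] run() {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                1, 2, 60, TimeUnit.SECONDS, new ArrayBlockingQueue<>(1));
        Runnable sleepy = () -> {
            try { Thread.sleep(2000); } catch (InterruptedException e) { Thread.currentThread().interrupt(); }
        };
        pool.execute(sleepy);          // runs on the core thread
        pool.execute(sleepy);          // core thread busy -> parked in the queue
        pool.execute(sleepy);          // queue full -> a non-core thread is created
        int size = pool.getPoolSize(); // 2: the core thread plus one non-core thread
        boolean rejected = false;
        try {
            pool.execute(sleepy);      // pool at max AND queue full -> saturation policy
        } catch (RejectedExecutionException e) {
            rejected = true;           // the default AbortPolicy throws
        }
        pool.shutdownNow();
        return new Object[]{size, rejected};
    }

    public static void main(String[] args) {
        Object[] r = run();
        System.out.println("pool size: " + r[0] + ", fourth task rejected: " + r[1]);
    }
}
```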
Next, look at the constructor parameters used by the common Executors pools.
Thread pool type | Core threads | Maximum threads | Non-core idle timeout | Work queue
---|---|---|---|---
newFixedThreadPool | specific | specific | 0 | LinkedBlockingQueue
newSingleThreadExecutor | 1 | 1 | 0 | LinkedBlockingQueue
newCachedThreadPool | 0 | Integer.MAX_VALUE | 60s | SynchronousQueue
newScheduledThreadPool | specific | Integer.MAX_VALUE | 0 | DelayedWorkQueue
"Specific" means the caller must pass the value in.
The work queue column makes it worth examining blocking queues more closely first.
Blocking queue
Have you ever wondered why thread pools use blocking queues rather than non-blocking queues?
Blocking queues are a natural fit for the producer-consumer model: submitting a task is production, and scheduling it for execution is consumption, usually on different threads. With a non-blocking queue you would need extra synchronization and wake-up logic between threads: when the queue is empty, the consumer thread must block while fetching an element, and when a new task is added, the consumer must be woken to process it.
Internally, a blocking queue implements this by guarding insertion and retrieval with lock operations (Lock + Condition).
Another thing to watch is the queue's capacity, because per the processing flow above, capacity directly affects when non-core threads are created. Specifically, as long as the blocking queue is not full, no non-core threads are created; tasks simply keep joining the queue and wait for a core thread (if any) to execute them.
- LinkedBlockingQueue: a blocking queue implemented as a linked list. The default constructor uses Integer.MAX_VALUE as the capacity, which is why it is often called "unbounded"; capacity can also be limited via the constructor that takes a capacity. The pools created through the Executors methods use the unbounded form.
- SynchronousQueue: has a capacity of 0; adding a task immediately triggers consumption, meaning every insert must be paired with a remove, and vice versa.
- DelayedWorkQueue: array-based with a default capacity of 16 and dynamic expansion. It orders delayed tasks, similar to a priority queue, and paired with ScheduledThreadPoolExecutor it supports scheduled and delayed tasks.
- ArrayBlockingQueue: not used by the standard pools; array-based with a fixed, non-expandable capacity.
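A minimal producer-consumer sketch shows why no extra synchronization or wake-up code is needed (class name illustrative):

```java
import java.util.*;
import java.util.concurrent.*;

public class BlockingQueueDemo {
    // The producer put()s three items; the consumer take()s them on another
    // thread. take() blocks while the queue is empty, put() blocks while it is
    // full -- the queue itself handles all the waiting and waking.
    static List<Integer> run() throws InterruptedException {
        BlockingQueue<Integer> queue = new ArrayBlockingQueue<>(2);
        List<Integer> consumed = Collections.synchronizedList(new ArrayList<>());
        Thread consumer = new Thread(() -> {
            try {
                for (int i = 0; i < 3; i++) {
                    consumed.add(queue.take());   // blocks until a task arrives
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        consumer.start();
        for (int i = 0; i < 3; i++) {
            queue.put(i);                         // blocks if the queue is full
        }
        consumer.join();
        return consumed;
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println("consumed in order: " + run());
    }
}
```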
The appropriate blocking queue should be selected based on the actual requirements. Now let’s look at the usage scenarios of these thread pools.
- newFixedThreadPool: no non-core threads, so no new threads are created no matter how many tasks arrive, and the core threads remain even when idle. With its unbounded work queue and relatively stable performance, it suits a moderate number of long-running tasks.
- newSingleThreadExecutor: equivalent to a newFixedThreadPool with one thread; use it when tasks must execute sequentially.
- newCachedThreadPool: no core threads and an effectively unlimited number of non-core threads, so it can absorb a burst of tasks quickly. But creating threads costs resources; too many threads can lead to OOM, and performance is unstable under a large parallel load. Use it for short bursts of a modest number of parallel tasks with no long-lived work afterward.
- newScheduledThreadPool: usually used for scheduled or delayed tasks.
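The scheduled-pool case can be sketched in a few lines (illustrative names; it measures how long the DelayedWorkQueue holds a one-shot task before a pool thread runs it):

```java
import java.util.concurrent.*;

public class ScheduledDemo {
    // Schedules a one-shot task with a 500 ms delay and returns the actual
    // elapsed milliseconds observed inside the task.
    static long run() throws Exception {
        ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1);
        long start = System.nanoTime();
        ScheduledFuture<Long> elapsed = scheduler.schedule(
                () -> (System.nanoTime() - start) / 1_000_000, 500, TimeUnit.MILLISECONDS);
        long ms = elapsed.get();   // blocks until the delayed task has run
        scheduler.shutdown();
        return ms;
    }

    public static void main(String[] args) throws Exception {
        System.out.println("task ran after ~" + run() + " ms");
    }
}
```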
In real development you are advised not to use the Executors factory methods. If the task volume and response-time requirements are roughly known, create the pool manually with one of the ThreadPoolExecutor constructors, tuning the thread counts, timeouts, blocking queue, and saturation policy (the default is AbortPolicy, which throws an exception) to your actual needs.
Saturation policies
There are four built-in saturation policies:
- DiscardPolicy: silently discards the rejected task.
- DiscardOldestPolicy: discards the task at the head of the queue, i.e. the one that entered first, to make room.
- AbortPolicy: throws a RejectedExecutionException.
- CallerRunsPolicy: runs the rejected task on the thread that called execute.
You can also define your own saturation policy by implementing the RejectedExecutionHandler interface and passing it to the long-form ThreadPoolExecutor constructor.
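A hypothetical custom handler might simply count and log rejections instead of throwing (all names here are illustrative, not from any library):

```java
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicInteger;

public class LoggingRejectedHandler implements RejectedExecutionHandler {
    final AtomicInteger dropped = new AtomicInteger();

    @Override
    public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
        // Count and log rather than throwing (AbortPolicy's behavior).
        System.out.println("rejected task, total dropped: " + dropped.incrementAndGet());
    }

    // Saturates a tiny pool: 1 task running + 1 queued, so 2 of 4 are rejected.
    static int demo() {
        LoggingRejectedHandler handler = new LoggingRejectedHandler();
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                1, 1, 0, TimeUnit.SECONDS, new ArrayBlockingQueue<>(1),
                Executors.defaultThreadFactory(), handler);
        Runnable sleepy = () -> {
            try { Thread.sleep(1000); } catch (InterruptedException e) { Thread.currentThread().interrupt(); }
        };
        for (int i = 0; i < 4; i++) pool.execute(sleepy);
        pool.shutdownNow();
        return handler.dropped.get();
    }

    public static void main(String[] args) {
        System.out.println("dropped: " + demo());
    }
}
```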
Next, we need to look at the thread pool inheritance structure.
Thread pool class diagram
- Executor: the base interface; it defines only an execute method for submitting tasks, and tasks added via execute have no return value.
- ExecutorService: still an interface, but it introduces the pool lifecycle, defining submit methods that return a result and shutdown methods for closing the pool.
- AbstractExecutorService: implements most of the interface methods; the shutdown-related methods and execute remain abstract.
- ThreadPoolExecutor: the most commonly used thread pool.
- ScheduledThreadPoolExecutor: a thread pool supporting delayed and periodic tasks.
- ForkJoinPool: differs from ThreadPoolExecutor in taking a divide-and-conquer approach, splitting a task into sub-tasks that run on multiple threads and merging their results, e.g. computing the sum of the integers from 1 to 1,000,000. If you are interested, see the referenced article on ForkJoinPool and multithreading.
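The sum-of-1-to-1,000,000 example can be sketched with RecursiveTask (the threshold and class name are arbitrary choices for illustration):

```java
import java.util.concurrent.*;

public class SumTask extends RecursiveTask<Long> {
    static final int THRESHOLD = 10_000;
    final long from, to;

    SumTask(long from, long to) { this.from = from; this.to = to; }

    @Override
    protected Long compute() {
        if (to - from <= THRESHOLD) {           // small enough: sum directly
            long sum = 0;
            for (long i = from; i <= to; i++) sum += i;
            return sum;
        }
        long mid = (from + to) / 2;             // otherwise split the range in half
        SumTask left = new SumTask(from, mid);
        SumTask right = new SumTask(mid + 1, to);
        left.fork();                            // run the left half asynchronously
        return right.compute() + left.join();   // compute right here, then join left
    }

    public static void main(String[] args) {
        long sum = ForkJoinPool.commonPool().invoke(new SumTask(1, 1_000_000));
        System.out.println(sum); // 500000500000
    }
}
```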
Choosing a thread pool size
Given the internal structure of a thread pool, how should we choose the size of a thread pool in practice?
That depends on a rough understanding of whether the workload is CPU-intensive or IO-intensive.
- CPU-intensive tasks keep the CPU busy with computation, so opening more threads only adds scheduling overhead and hurts performance. The recommended thread count is the number of CPU cores + 1, the extra thread standing by in case a working thread blocks or is interrupted unexpectedly.
- IO-intensive tasks spend time on file and network I/O, so the thread count depends on the ratio of I/O time to CPU time:
Optimal number of threads = number of CPU cores * (1 + I/O time / CPU time)
This ratio is chosen to keep both the I/O device and the CPU as busy as possible.
Taking a single core with a CPU-to-I/O time ratio of 1:2 as an example, three threads achieve 100% CPU utilization (this example comes from Geek Time's Java Concurrent Programming in Action).
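The formula and that 1:2 example can be checked with a few lines (helper name illustrative):

```java
public class PoolSizing {
    // Optimal threads = cores * (1 + ioTime / cpuTime), truncated to an int.
    static int optimalThreads(int cores, double ioTime, double cpuTime) {
        return (int) (cores * (1 + ioTime / cpuTime));
    }

    public static void main(String[] args) {
        // Single core, CPU:I/O time of 1:2 -> 1 * (1 + 2/1) = 3 threads.
        System.out.println(optimalThreads(1, 2, 1)); // 3
        // For CPU-bound work (I/O time ~ 0) the formula falls back toward the
        // core count; cores + 1 is the usual safety margin.
        int cores = Runtime.getRuntime().availableProcessors();
        System.out.println("CPU-bound suggestion: " + (cores + 1));
    }
}
```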
Thread pool state
The pool's state matters throughout task processing. For example, execute first checks that the pool is RUNNING before adding a task; after enqueueing, the state is checked again, and if the pool has been shut down in the meantime, the task is removed and the saturation policy applied.
Let’s look at several states of the thread pool:
- RUNNING: accepts newly submitted tasks and processes tasks in the blocking queue.
- SHUTDOWN: accepts no new tasks, but continues to process tasks already saved in the queue. Calling shutdown() on a RUNNING pool moves it to this state (finalize() also calls shutdown() when it runs).
- STOP: accepts no new tasks, does not process queued tasks, and interrupts running threads. Calling shutdownNow() on a RUNNING or SHUTDOWN pool moves it to this state.
- TIDYING: entered when all tasks have terminated and workerCount is 0; the pool then calls the terminated() hook.
- TERMINATED: entered after terminated() completes; by default, terminated() does nothing.
A diagram illustrates the state transitions:
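The transitions can also be observed through the pool's public state queries (class name illustrative):

```java
import java.util.concurrent.*;

public class PoolStateDemo {
    // Walks a pool from RUNNING through SHUTDOWN to TERMINATED and returns
    // [isShutdown, isTerminated before drain, new task rejected, isTerminated after].
    static boolean[] run() throws InterruptedException {
        ExecutorService pool = Executors.newFixedThreadPool(2);
        pool.execute(() -> {
            try { Thread.sleep(200); } catch (InterruptedException e) { Thread.currentThread().interrupt(); }
        });

        pool.shutdown();                          // RUNNING -> SHUTDOWN: drain queue, refuse new tasks
        boolean shut = pool.isShutdown();         // true
        boolean termBefore = pool.isTerminated(); // false: a task is still running

        boolean rejected = false;
        try {
            pool.execute(() -> {});               // new submissions hit the saturation policy
        } catch (RejectedExecutionException e) {
            rejected = true;
        }

        pool.awaitTermination(5, TimeUnit.SECONDS); // passes through TIDYING into TERMINATED
        return new boolean[]{shut, termBefore, rejected, pool.isTerminated()};
    }

    public static void main(String[] args) throws InterruptedException {
        boolean[] s = run();
        System.out.println("isShutdown=" + s[0] + " terminatedBefore=" + s[1]
                + " rejected=" + s[2] + " terminatedAfter=" + s[3]);
    }
}
```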
Conclusion
Back to the question at the beginning of the article: What thread pool do RxJava and OkHttp use for scheduling?
- RxJava defines several common schedulers internally. Schedulers.io() and Schedulers.computation() correspond to IO-intensive and CPU-intensive work respectively; both are backed by ScheduledThreadPoolExecutor, chosen because chained calls may involve delayed operators such as delay. The difference lies in the maximum thread count: for computation it is the number of CPU cores, while for io it is effectively unbounded.
- OkHttp by default uses the equivalent of newCachedThreadPool. The drawbacks of this pool were covered above; it was likely chosen with high concurrency in mind, and in practice you can pass in your own ExecutorService to configure it.
# -> Dispatcher
public Dispatcher() {
}

public Dispatcher(ExecutorService executorService) {
    this.executorService = executorService;
}

public synchronized ExecutorService executorService() {
    if (executorService == null) {
        executorService = new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60, TimeUnit.SECONDS,
                new SynchronousQueue<Runnable>(), Util.threadFactory("OkHttp Dispatcher", false));
    }
    return executorService;
}
References
- Java thread pool implementation principle detailed explanation
- Understanding Java thread pools in depth: ThreadPoolExecutor
- Android thread pool principle and use
- Java thread pool -ThreadPoolExecutor principle analysis and practice
- ForkJoinPool with multiple threads