Why thread pools
background
Many server applications often need to deal with a lot of little request (for example, a Web server, database server, etc.), they usually receive the number of requests is large, a simple model is that, when the server receives a request from the remote, to open a thread for each request, the request is then to destroy the thread. The problem with this approach is that it often takes much more time to create and destroy threads than the resources consumed by the task itself. So what should be done? Thread pools provide a solution to the problem of thread lifecycle overhead and resource scarcity. We can reuse threads by using thread pools, instead of constantly creating and destroying threads, let the threads from the thread pool always exist in the thread pool, and then the thread takes tasks from the task queue to execute them. Another benefit of doing this is to prevent resource exhaustion by properly adjusting the number of threads in the thread pool, that is, by forcing any other incoming requests to wait until a thread is available to process them when the number of requests exceeds a certain threshold.
What is a thread pool
The basic idea of a thread pool is that it is a pool of objects that, when a program is started, creates an area of memory in which a number of (not dead) threads are stored, and the thread scheduling in the pool is handled by the pool manager. When there is a task, a thread object is taken from the pool and returned to the pool after execution. In this way, the performance cost brought by repeatedly creating and destroying thread objects can be avoided and system resources can be saved.
Thread pool benefits < compared to creating one thread per task >
- Reduce thread creation and destruction times. (Requires system call support, to kernel mode execution)
- Speed up task response. (Direct allocation of idle threads)
- You can effectively control the maximum number of concurrent requests based on the system load.
Thread Pool Principle
The structure
ThreadPoolManager: Used to create and manage thread pools.
2. Workthreads: threads in a thread pool.
3. Task Interface: The interface that each Task must implement for the worker thread to schedule the execution of the Task.
4. Task queue: Used to store unprocessed tasks. Provide a buffer mechanism.
Thread pool parameters
corePoolSize
Thread pool The number of core threads that are not reclaimed and remain idle even if no task is executed. If there are fewer threads in the thread pool, they are created when the task is executed (regardless of whether there are currently idle threads in the core thread pool).
workQueue
When the number of current threads exceeds corePoolSize, the new task is in the wait state and stored in the workQueue.
maximumPoolSize
The pool allows a maximum number of threads to continue to be created when the number of threads reaches corePoolSize and the workQueue is full of tasks.
keepAliveTime
Exceeds the lifetime of the “temporary thread” after corePoolSize.
unit
The unit of keepAliveTime.
allowCoreThreadTimeout
Whether to allow the core thread to exit idle. The default value is false.
RejectExecutionHandler
The thread pool performs a rejection policy, and when the workQueue is full and the number of threads reaches maximumPoolSize, the thread pool calls the Handler rejection policy to process the request. The default denial policies are as follows:
- AbortPolicy
Direct selling RejectedExecutionExeception exceptions to prevent normal operation of system, the system default policy.
- DisCardPolicy
Discard the current task without processing it.
- DiscardOldestPolicy
Discards the oldest task in the queue.
- CallerRunsPolicy
Assign tasks to the thread currently executing the execute method for processing.
public static class CallerRunsPolicy implements RejectedExecutionHandler { public CallerRunsPolicy() { } public void rejectedExecution(Runnable r, ThreadPoolExecutor e) { if (! e.isShutdown()) { r.run(); }}}Copy the code
Note that the addition of the fifth task, task 5, was also rejected by the thread pool, so the rejectedExecution method of CallerRunsPolicy was executed, which directly executes the run method of the task. So you can see that task 5 is executed in the main thread.
As you can see, the main thread is blocked because the fifth task is running in the main thread, so that when the fifth task is finished and the sixth task is added, the first two tasks are finished and there are free threads, so thread 6 can be added to the thread pool again.
- Custom Policies
You can also customize rejection policies by implementing the RejectedExecutionHandler interface. The friendly rejection policies can be implemented as follows:
- Save the data to the database for processing when the system is idle
- The data is recorded in a log and then processed manually
static class MyRejectedExecutionHandler implements RejectedExecutionHandler { @Override public void RejectedExecution (Runnable r, ThreadPoolExecutor Executor) {new Thread(r," new Thread "+new Random().nextint (10)).start(); rejectedExecution(Runnable r, ThreadPoolExecutor executor) {new Thread(r," new Thread "+new Random(). } } ThreadPoolExecutor executor = new ThreadPoolExecutor(1, 2, 30, TimeUnit.SECONDS, new LinkedBlockingDeque<Runnable>(2), new MyRejectedExecutionHandler());Copy the code
Implementation process
When new tasks come in:
- If there are fewer threads currently running than corePoolSize< note: even if there are idle threads, they are created directly >, then a new thread (the core thread) is created to perform the task.
- If the running thread is equal to or more than corePoolSize, the task is added to BlockingQueue.
- If the BlockingQueue queue is full, a new thread (non-core) is created to process the task.
- If the total number of core and non-core threads exceeds maximumPoolSize, the task will be rejected and the RejectedExecutionHandler rejection policy will be invoked.
A flowchart for executing the execute() method of ThreadPoolExecutor, the thread pool core class under the Java Util package
Thread pool life cycle
- RUNNING: Receives new tasks and processes tasks in the queue.
- SHUTDOWN: Does not receive new tasks, but processes tasks in the queue.
- STOP: does not receive new tasks, does not process tasks in the queue, and interrupts tasks in process.
- TIDYING: All tasks completed, valid thread count is 0.
- TERMINATED: the method TERMINATED () is complete.
Thread pools that are transitioned to TIDYING state run terminated methods.
Thread pool usage
Two ways to create a thread pool
ThreadPoolExecutor
Public ThreadPoolExecutor(int corePoolSize,// core thread size int maximumPoolSize,// maximum thread size long KeepAliveTime,// Maximum time a thread can hold when the number of threads is greater than corePoolSize TimeUnit unit,// Non-core thread pool thread hold time BlockingQueue<Runnable> RejectedExecutionHandler Handler // RejectedExecutionHandler)Copy the code
Executors (not recommended)
- NewFixedThreadPool (LinkedBlockingQueue)
The thread pool has a fixed number of threads, i.e. corePoolSize and maximumPoolSize have the same number of threads. The wait queue is infinite.
public static ExecutorService newFixedThreadPool(int nThreads) {
return new ThreadPoolExecutor(
nThreads,
nThreads,
0L,
TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>());
}
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue) {
this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
Executors.defaultThreadFactory(), defaultHandler);
}
Copy the code
- newSingleThreadExecutor
Single-threaded execution, i.e. CorePoolSize and maximumPoolSize with the number specified to 1. The wait queue is infinite
public static ExecutorService newSingleThreadExecutor() { return new Executors.FinalizableDelegatedExecutorService ( new ThreadPoolExecutor( 1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>()) ); } public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue) { this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, Executors.defaultThreadFactory(), defaultHandler); }Copy the code
- newCachedThreadPool
Since corePoolSize is 0, the task is placed in the SynchronousQueue, which can only hold 0, so a new thread is immediately created. Since maxumumPoolSize is integer. MAX_VALUE, the size can be considered as 2147483647, which is limited by the memory size. The thread lifetime is 60 seconds, which causes the task to be created to execute each time it comes in, and the thread resource to be released when the thread lifetime expires.
public static ExecutorService newCachedThreadPool() {
return new ThreadPoolExecutor(
0,
Integer.MAX_VALUE,
60L,
TimeUnit.SECONDS,
new SynchronousQueue<Runnable>());
}
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue) {
this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
Executors.defaultThreadFactory(), defaultHandler);
}
Copy the code
- newscheduledThreadPool
/ / ScheduledThreadPool public ScheduledThreadPoolExecutor (int corePoolSize) {/ / allows you to create threads for Integer. MAX_VALUE super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS, new DelayedWorkQueue()); }Copy the code
It is used to handle delayed or timed tasks.
-
It receives tasks of type SchduledFutureTask and has two ways of submitting tasks:
-
scheduledAtFixedRate
-
scheduledWithFixedDelay
-
SchduledFutureTask receives the following parameters:
-
Time: indicates the start time of a task
-
SequenceNumber: indicates the sequenceNumber of a task
-
Period: period of task execution
-
It uses DelayQueue to store waiting tasks
-
DelayQueue contains a PriorityQueue, which is sorted by time. If time is the same, the PriorityQueue is sorted by sequenceNumber.
-
DelayQueue is also an unbounded queue;
-
Execution process of worker thread:
-
The worker thread will fetch expired tasks from the DelayQueue and execute them.
-
After the task is executed, reset the expiration time of the task and restore the DelayQueue
Do not use Executors to create a thread pool
- NewFixedThreadPool&newSingleThreadExecutor uses an infinite queue with a size of integer.max_value.
- NewCachedThreadPool&newscheduledThreadPool The maximum thread pool size is set to integer.max_value.
Memory overflow is easily caused.
Create thread Demo
package executor; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; Public class Executor {/** * @param args */ public static void main(String[] args) {// It is not recommended to create threads ExecutorService using this method executorService=Executors.newFixedThreadPool(10); }}Copy the code
Add a task to the thread pool and get the result after the thread completes execution
executorService.execute(Runnable);
package executor; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; public class Executor { /** * @param args * */ public static void main(String[] args) { // TODO Auto-generated method stub ExecutorService executorService=Executors.newFixedThreadPool(2); // Defines the maximum number of threads in the thread pool... // Add the first task... This mission will always be performed... executorService.execute(new Runnable() { @Override public void run() { // TODO Auto-generated method stub while(true){ System.out.println("aa"); try { Thread.sleep(1000); } catch (InterruptedException e) { // TODO Auto-generated catch block e.printStackTrace(); }}}}); // Add a second task, executed three times to stop... executorService.execute(new Runnable() { @Override public void run() { // TODO Auto-generated method stub int i=0; while(true){ i++; System.out.println("bb"); if(i==3){ break; } try { Thread.sleep(1000); } catch (InterruptedException e) { // TODO Auto-generated catch block e.printStackTrace(); }}}}); /* * @param * Can only be executed after the second quest has been executed three times... * Since the thread pool was full three times ago, the task could not be executed. After three attempts, the second task is terminated and the thread pool is idle, so the task is put into the thread pool for execution... * */ executorService.execute(new Runnable() { @Override public void run() { // TODO Auto-generated method stub while(true){ System.out.println("cc"); try { Thread.sleep(1000); } catch (InterruptedException e) { // TODO Auto-generated catch block e.printStackTrace(); }}}}); }}Copy the code
The executorService. Submit (Runnable);
The difference between this approach and the first is that you can use a Future object to determine whether the current thread has finished executing. However, this method can only determine whether the current thread is finished, and cannot return data information.
Future future = executorService.submit(new Runnable() { public void run() { System.out.println("Asynchronous task"); }}); Return null system.out.println ("future.get()=" + future.get());Copy the code
The executorService. Submit (Callable);
This call differs from the previous one by passing a Callable object. Callable is similar to Runnbale, but Callable’s call() method returns data information. This information is available through the Future. The Runnbale. Run () method does not retrieve data information.
Future future = executorService.submit(new Callable(){ public Object call() throws Exception { System.out.println("Asynchronous Callable"); return "Callable Result"; }}); System.out.println("future.get() = " + future.get()); //Asynchronous Callable //future.get() = Callable ResultCopy the code
executorService.invokeAny(callables);
InvokeAny () accepts as an argument a collection of Callable objects. Calling this method does not return a Future. Instead, it returns the result of one of the Callable objects in the collection. There is no guarantee which Callable will be returned after the call, only that it will be one of the Callable objects that has finished executing. I honestly don’t know what the purpose of this method is. The result of this execution is random. That is, the output is not fixed.
ExecutorService executorService = Executors.newSingleThreadExecutor(); Set<Callable<String>> callables = new HashSet<Callable<String>>(); callables.add(new Callable<String>() { public String call() throws Exception { return "Task 1"; }}); callables.add(new Callable<String>() { public String call() throws Exception { return "Task 2"; }}); callables.add(new Callable<String>() { public String call() throws Exception { return "Task 3"; }}); String result = executorService.invokeAny(callables); System.out.println("result = " + result);Copy the code
executorService.invokeAll(callables);
This method differs from the above in that it returns the results of all Callable executions. All the execution results are captured and we can manage them. Relatively speaking, I think this method is more practical than the previous one.
ExecutorService executorService = Executors.newSingleThreadExecutor(); Set<Callable<String>> callables = new HashSet<Callable<String>>(); callables.add(new Callable<String>() { public String call() throws Exception { return "Task 1"; }}); callables.add(new Callable<String>() { public String call() throws Exception { return "Task 2"; }}); callables.add(new Callable<String>() { public String call() throws Exception { return "Task 3"; }}); List<Future<String>> futures = executorService.invokeAll(callables); for(Future<String> future : futures){ System.out.println("future.get = " + future.get());Copy the code
Thread pool shutdown
executorService.shutdown();
Shutdown does not directly shutdown the thread pool; it does not accept new tasks. If there are tasks in the thread pool, close the thread pool after completing those tasks.
executorService.shutdownNow();
This method means that no new tasks are accepted, and the task is removed from the queue, and if there are ongoing tasks, an attempt is made to stop them.
package executor; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; public class Executor { /** * @param args * */ public static void main(String[] args) { // TODO Auto-generated method stub ExecutorService executorService=Executors.newFixedThreadPool(1); // Defines the maximum number of threads in the thread pool... // Add the first task... This executes three stops... executorService.execute(new Runnable() { @Override public void run() { // TODO Auto-generated method stub int j=0; while(true){ j++; System.out.println("aa"); try { Thread.sleep(1000); } catch (InterruptedException e) { // TODO Auto-generated catch block e.printStackTrace(); } if(j==3){ break; }}}}); Executorservice.shutdown () : executorservice.shutdown () : executorservice.shutdown () : executorservice.shutdown () : executorservice.shutdown () : executorservice.shutdown () : executorservice.shutdown () / / if we use the executorService shutdownNow (); Methods, even before he joined, because the call the executorService. ShutdownNow () method / / so this task will be removed from the queue and will not be executed directly... executorService.execute(new Runnable() { @Override public void run() { // TODO Auto-generated method stub int i=0; while(true){ i++; System.out.println("bb"); if(i==3){ break; }}}}); executorService.shutdown(); // Whatever method is used here, an exception is thrown... /* * @param * Can only be executed after the second quest has been executed three times... * Since the thread pool was full three times ago, the task could not be executed. After three attempts, the second task is terminated and the thread pool is idle, so the task is put into the thread pool for execution... * */ executorService.execute(new Runnable() { @Override public void run() { // TODO Auto-generated method stub while(true){ System.out.println("cc"); try { Thread.sleep(1000); } catch (InterruptedException e) { // TODO Auto-generated catch block e.printStackTrace(); }}}}); }}Copy the code
Reference:
www.cnblogs.com/ldq2016/p/8…
Thread pool parameter setting tuning?
Parameter Settings are directly related to the system load. The following are related indicators of the system load:
- Tasks, number of tasks to be processed per second (for system requirements)
- Threadtasks, number of tasks per thread (for the thread itself)
- Responsetime: specifies the maximum responsetime allowed by the system for a task. For example, the responsetime of each task cannot exceed 1 second.
Core thread pool size
Usually we need to determine the core thread pool size based on the nature of the task execution:
- IO intensive tasks: Since threads are not always running, you can configure as many threads as possible, such as CPU number x 2.
- Cpu-intensive tasks (lots of complex operations) should be allocated with fewer threads, such as a comparable number of cpus.
These are all rules of thumb, of course, and it’s best to test the best configurations based on the actual situation.
If the system has tasks to process per second, each thread can process threadTasks per second. The required number of threads is: Tasks/threadTasks.
If the system has 100 to 1000 tasks per second and each thread can process 10 tasks per second, 100/10 to 1000/10, that is, 10 to 100 threads are required. CorePoolSize should be greater than 10, based on the 8020 rule, because the number of system tasks per second is 100-1000, i.e. 80% of the time the number of system tasks per second is less than 1000 * 20% = 200. CorePoolSize can be set to 200/10 = 20.
Maximum thread pool size
When the system load reaches its maximum, the number of core threads cannot handle all tasks on time, and more threads are needed. 200 tasks per second requires 20 threads, so when 1000 tasks per second is reached, (1000-queuecapacity)*(20/200) is required, which is 60 threads. MaxPoolSize can be set to 60.
workQueue
When the number of current threads exceeds corePoolSize, the new task is in the wait state and stored in the workQueue.
Thread pool monitoring
Improper use of thread pool will also deplete server resources, leading to abnormal situations, such as too many blocked queue tasks in fixed thread pool, too many threads created by cache thread pool, resulting in memory overflow, system suspension and other problems. Therefore, we need a simple monitoring scheme to monitor thread pool usage, such as number of completed tasks, number of unfinished tasks, thread size, and so on. Thread pools provide the following methods to monitor thread pool usage:
* * * * method | * * * * meanings |
getActiveCount() | Number of threads executing tasks in the thread pool |
getCompletedTaskCount() | The number of completed tasks in the thread pool, which is less than or equal to taskCount |
getCorePoolSize() | The number of core threads in the thread pool |
getLargestPoolSize() | The maximum number of threads ever created by the thread pool. This data lets you know if the thread pool is full, i.e., maximumPoolSize |
getMaximumPoolSize() | The maximum number of threads in the thread pool |
getPoolSize() | The current number of threads in the thread pool |
getTaskCount() | Total number of executed and unexecuted tasks in the thread pool |
The ThreadPoolExecutor class provides several empty methods, such as beforeExecute, afterExecute, and terminated methods, that override the beforeExecute of the thread pool by inheriting the beforeExecute, AfterExecute and terminated methods can do things before, after, and before the thread pool is closed. For example, monitor the average, maximum, and minimum execution time of a task. These methods are empty methods in the thread pool.
In addition, you can also check hardware information such as CPU usage and memory usage.
Thread pool isolation
If many of our businesses depend on the same thread pool, when one of them consumes all threads for reasons beyond our control, the thread pool becomes full.
The rest of the business can’t function properly, which is a huge blow to the system.
For example, Tomcat accepts requests from the thread pool, assuming that some of them respond very slowly and thread resources cannot be reclaimed; The thread pool slowly fills up, and in the worst case, the entire application becomes unserviceable.
So we need to isolate the thread pool.
The usual practice is to divide by business:
For example, the ordering task uses one thread pool and the fetching task uses another thread pool. This way, if one of them fails and deplets the thread pool, the other tasks will not be affected.
Reference: juejin. Cn/post / 684490…
Segmentfault.com/a/119000001…
Segmentfault.com/a/119000001…
Cloud.tencent.com/developer/a…
Blog.csdn.net/qq_25806863…
www.jianshu.com/p/2a80237c3…