Custom thread pools
Constructor of ThreadPoolExecutor
```java
public ThreadPoolExecutor(int corePoolSize,
                          int maximumPoolSize,
                          long keepAliveTime,
                          TimeUnit unit,
                          BlockingQueue<Runnable> workQueue,
                          ThreadFactory threadFactory,
                          RejectedExecutionHandler handler) // policy applied when the blocking queue is full and the pool has reached maximumPoolSize
```
The core parameters are corePoolSize, maximumPoolSize, and workQueue; in my view, ThreadFactory and RejectedExecutionHandler play a supporting role.
The task class: Task
```java
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.atomic.AtomicLong;

public class Task implements Runnable {
    // Sequence number handed out to each run of this (shared) task
    private final AtomicLong count = new AtomicLong(0);
    // The pool, kept only to read the current size of its blocking queue
    private ThreadPoolExecutor threadPoolExecutor;

    Task(ThreadPoolExecutor threadPoolExecutor) {
        this.threadPoolExecutor = threadPoolExecutor;
    }

    @Override
    public void run() {
        long l = this.count.getAndIncrement();
        System.out.println(Thread.currentThread().getName() + " Start..." + l
                + " blocking queue size: " + threadPoolExecutor.getQueue().size());
        try {
            Thread.sleep(3 * 1000);
        } catch (InterruptedException ie) {
            System.out.println(ie.getMessage());
        }
        System.out.println(Thread.currentThread().getName() + " End..." + l);
    }
}
```
The thread factory class: CustomizeThreadFactory
```java
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.atomic.AtomicLong;

/**
 * Custom thread factory
 */
public class CustomizeThreadFactory implements ThreadFactory {
    private final String factoryName;
    // Id of the next thread to create; it becomes part of the thread name
    private final AtomicLong nextId;

    CustomizeThreadFactory(String factoryName) {
        this.factoryName = factoryName;
        this.nextId = new AtomicLong(1);
    }

    @Override
    public Thread newThread(Runnable r) {
        String threadName = this.factoryName + "-Task-" + this.nextId.getAndIncrement();
        return new Thread(r, threadName);
    }
}
```
The saturation policy class: CustomizeRejectedExecutionHandler
```java
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.ThreadPoolExecutor;

/**
 * Custom saturation policy
 */
public class CustomizeRejectedExecutionHandler implements RejectedExecutionHandler {
    @Override
    public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
        System.out.println("task rejected. " + executor.toString());
    }
}
```
The custom thread pool
```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class ThreadPoolTest {
    public static void main(String[] args) {
        ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(
                5,   // corePoolSize: number of core threads
                10,  // maximumPoolSize: maximum number of threads
                60L, TimeUnit.SECONDS, // keepAliveTime and unit: the original values were lost; 60 s is an assumed placeholder
                new ArrayBlockingQueue<Runnable>(100),      // blocking queue holding up to 100 tasks
                new CustomizeThreadFactory("ThreadPool-1"), // thread factory
                new CustomizeRejectedExecutionHandler());   // saturation policy
        Task task = new Task(threadPoolExecutor);
        for (int i = 0; i < 10; i++) {
            threadPoolExecutor.execute(task);
        }
        threadPoolExecutor.shutdown();
    }
}
```
Run result
```
ThreadPool-1-Task-2 Start... 1 blocking queue size: 5
ThreadPool-1-Task-5 Start... 4 blocking queue size: 5
ThreadPool-1-Task-3 Start... 3 blocking queue size: 5
ThreadPool-1-Task-4 Start... 2 blocking queue size: 5
ThreadPool-1-Task-1 Start... 0 blocking queue size: 1
# after the sleep
ThreadPool-1-Task-3 End... 3
ThreadPool-1-Task-1 End... 0
ThreadPool-1-Task-1 Start... 5 blocking queue size: 3
ThreadPool-1-Task-4 End... 2
ThreadPool-1-Task-2 End... 1
ThreadPool-1-Task-5 End... 4
ThreadPool-1-Task-4 Start... 7 blocking queue size: 2
ThreadPool-1-Task-3 Start... 6 blocking queue size: 3
ThreadPool-1-Task-2 Start... 8 blocking queue size: 0
ThreadPool-1-Task-5 Start... 9 blocking queue size: 0
```
The custom thread pool has 5 core threads, a maximum of 10 threads, and a task blocking queue with a capacity of 100.
In the for loop, 10 tasks are submitted to the thread pool, yet the result shows that only 5 threads execute them at first, rather than 10 threads executing 10 tasks. Why? The answer is in the source of execute().
execute(Runnable command) source code analysis
The source code
PS: the comments in the source explain what the code does.
```java
public void execute(Runnable command) {
    if (command == null)
        throw new NullPointerException();
    int c = ctl.get(); // ctl: an AtomicInteger that packs the pool state and the worker count
    // if①
    if (workerCountOf(c) < corePoolSize) {
        if (addWorker(command, true)) // create a worker thread for the Runnable; true = core thread, false = non-core thread
            return;
        c = ctl.get();
    }
    // if②
    if (isRunning(c) && workQueue.offer(command)) { // workQueue.offer(command): add the task to the blocking queue
        int recheck = ctl.get();
        if (!isRunning(recheck) && remove(command))
            reject(command);
        else if (workerCountOf(recheck) == 0)
            addWorker(null, false);
    }
    // if③
    else if (!addWorker(command, false))
        reject(command); // apply the saturation policy
}
```
Logic diagram
The logic diagram for execute() is as follows:
The complete logic diagram is available at: baijiahao.baidu.com/s?id=168166…
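The branch order of execute() can also be written down as a small decision function. This is only a simplified model, not the JDK code: the class `ExecuteDecisionSketch`, the enum `Decision`, and the method `decide` are hypothetical names introduced for illustration, and the model assumes the pool is running and ignores the state rechecks and races that the real method handles.

```java
public class ExecuteDecisionSketch {
    /** Possible outcomes of submitting one task (hypothetical names, for illustration only). */
    public enum Decision { NEW_CORE_THREAD, ENQUEUE, NEW_NON_CORE_THREAD, REJECT }

    /**
     * Simplified model of the if①/if②/if③ order in execute().
     * Assumes the pool is running; the real code calls workQueue.offer()
     * instead of comparing sizes.
     */
    public static Decision decide(int workerCount, int corePoolSize, int maximumPoolSize,
                                  int queuedTasks, int queueCapacity) {
        if (workerCount < corePoolSize) {
            return Decision.NEW_CORE_THREAD;      // if①: fewer workers than core threads
        }
        if (queuedTasks < queueCapacity) {
            return Decision.ENQUEUE;              // if②: the queue still has room
        }
        if (workerCount < maximumPoolSize) {
            return Decision.NEW_NON_CORE_THREAD;  // if③ succeeds: queue full, pool not full
        }
        return Decision.REJECT;                   // if③ fails: saturation policy
    }

    public static void main(String[] args) {
        System.out.println(decide(4, 5, 10, 0, 4)); // NEW_CORE_THREAD
        System.out.println(decide(5, 5, 10, 3, 4)); // ENQUEUE
        System.out.println(decide(5, 5, 10, 4, 4)); // NEW_NON_CORE_THREAD
        System.out.println(decide(5, 5, 5, 4, 4));  // REJECT
    }
}
```

Reading the four calls in main from top to bottom gives the order in which a pool reacts as load grows: core threads first, then the queue, then non-core threads, and finally rejection.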
Analysis of the run result
Thread pool properties: 5 core threads, a maximum of 10 threads, and a blocking queue holding up to 100 tasks.
While the pool has fewer than 5 threads, if① holds and a core thread is created for each submitted task (this repeats 5 times, creating 5 core threads).
Once the pool has 5 threads, if① fails and execution continues to if②, which puts the task in the blocking queue (this repeats 5 times: the queue capacity of 100 is larger than the 5 remaining tasks, so all of them fit in the queue). The statements inside if② that recheck the pool state and the thread count do not matter for this analysis.
This process neither creates non-core threads nor triggers the saturation policy; that is, all 10 tasks are executed by the 5 core threads.
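The state described above can be checked directly by reading the pool size and queue size right after the 10 submissions. The following is a minimal sketch, not the article's test class: `CoreThreadsOnlySketch`, `observe`, and `awaitQuietly` are hypothetical names, the 60-second keep-alive is an assumption, and the tasks block on a latch instead of sleeping so that the observation does not depend on timing.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class CoreThreadsOnlySketch {
    /** Submits blocking tasks and returns {pool size, queued tasks} seen right after submission. */
    public static int[] observe(int core, int max, int queueCapacity, int tasks) {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                core, max, 60L, TimeUnit.SECONDS,       // keep-alive value is an assumption
                new ArrayBlockingQueue<>(queueCapacity));
        CountDownLatch release = new CountDownLatch(1);  // keeps every task running until we have looked
        try {
            for (int i = 0; i < tasks; i++) {
                pool.execute(() -> awaitQuietly(release));
            }
            return new int[] { pool.getPoolSize(), pool.getQueue().size() };
        } finally {
            release.countDown(); // let the tasks finish
            pool.shutdown();
        }
    }

    private static void awaitQuietly(CountDownLatch latch) {
        try {
            latch.await();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    public static void main(String[] args) {
        int[] observed = observe(5, 10, 100, 10);
        // prints: pool size = 5, queued tasks = 5
        System.out.println("pool size = " + observed[0] + ", queued tasks = " + observed[1]);
    }
}
```

With 5 core threads and room for 100 queued tasks, the 5 extra tasks wait in the queue and the pool never grows beyond its core size.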
Modify the custom thread pool properties to view the running results
From the previous result and the logic diagram, the thread pool prefers to let the core threads handle the tasks (each core thread simply does more work). Only when the core threads cannot keep up (the blocking queue is full) does it create non-core threads to help them.
Create a non-core thread
When are non-core threads created? According to the source, a non-core thread is created when a task is submitted while the blocking queue is full and the pool has not yet reached its maximum number of threads.
Modify thread pool properties
Change the blocking queue size to 4 and view the result.
```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class ThreadPoolTest {
    public static void main(String[] args) {
        ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(
                5,   // corePoolSize: number of core threads
                10,  // maximumPoolSize: maximum number of threads
                60L, TimeUnit.SECONDS, // keepAliveTime and unit: the original values were lost; 60 s is an assumed placeholder
                new ArrayBlockingQueue<Runnable>(4),        // blocking queue holding up to 4 tasks
                new CustomizeThreadFactory("ThreadPool-1"), // thread factory
                new CustomizeRejectedExecutionHandler());   // saturation policy
        Task task = new Task(threadPoolExecutor);
        for (int i = 0; i < 10; i++) {
            threadPoolExecutor.execute(task);
        }
        threadPoolExecutor.shutdown();
    }
}
```
The results
```
ThreadPool-1-Task-2 Start... 0 blocking queue size: 0
ThreadPool-1-Task-4 Start... 3 blocking queue size: 4
ThreadPool-1-Task-5 Start... 4 blocking queue size: 4
ThreadPool-1-Task-3 Start... 2 blocking queue size: 0
ThreadPool-1-Task-1 Start... 1 blocking queue size: 0
ThreadPool-1-Task-6 Start... 5 blocking queue size: 4
ThreadPool-1-Task-1 End... 1
ThreadPool-1-Task-4 End... 3
ThreadPool-1-Task-5 End... 4
ThreadPool-1-Task-2 End... 0
ThreadPool-1-Task-3 End... 2
ThreadPool-1-Task-6 End... 5
ThreadPool-1-Task-3 Start... 9 blocking queue size: 0
ThreadPool-1-Task-4 Start... 8 blocking queue size: 1
ThreadPool-1-Task-5 Start... 7 blocking queue size: 2
ThreadPool-1-Task-1 Start... 6 blocking queue size: 3
ThreadPool-1-Task-1 End... 6
ThreadPool-1-Task-3 End... 9
ThreadPool-1-Task-4 End... 8
ThreadPool-1-Task-5 End... 7
```
Analysis of the results
Thread pool properties: 5 core threads, a maximum of 10 threads, and a blocking queue with a capacity of 4.
The for loop submits 10 tasks. The first 5 iterations create 5 core threads to execute tasks. The 6th iteration puts its task in the blocking queue, and since the queue has a capacity of 4, the 6th through 9th iterations fill it. On the 10th iteration the queue is full (neither if① nor if② is satisfied), so if③ runs and creates a non-core thread to execute the task. That is why 6 threads are running tasks at the start.
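The effect of the queue capacity on the thread count can be compared side by side with `getLargestPoolSize()`. This is a rough sketch under stated assumptions: `NonCoreThreadSketch` and `largestPoolSize` are hypothetical names, the 60-second keep-alive is assumed, and the tasks block on a latch so that no worker frees up while the loop is still submitting.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class NonCoreThreadSketch {
    /** Returns the largest number of threads the pool held while `tasks` blocking tasks were submitted. */
    public static int largestPoolSize(int core, int max, int queueCapacity, int tasks) {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                core, max, 60L, TimeUnit.SECONDS,       // keep-alive value is an assumption
                new ArrayBlockingQueue<>(queueCapacity));
        CountDownLatch release = new CountDownLatch(1);
        try {
            for (int i = 0; i < tasks; i++) {
                pool.execute(() -> {
                    try {
                        release.await(); // hold the worker so the queue really fills up
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                    }
                });
            }
            return pool.getLargestPoolSize();
        } finally {
            release.countDown();
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        // Queue capacity 100: the queue absorbs the extra tasks, so only core threads exist.
        System.out.println("capacity 100 -> " + largestPoolSize(5, 10, 100, 10)); // 5
        // Queue capacity 4: the 10th task finds the queue full, so one non-core thread is added.
        System.out.println("capacity 4   -> " + largestPoolSize(5, 10, 4, 10));   // 6
    }
}
```

Note that the sketch uses the default rejection policy, so submitting more tasks than `max + queueCapacity` would throw a RejectedExecutionException instead of returning a number.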
Trigger saturation strategy
When is the saturation policy triggered? According to the source, it is triggered when a task is submitted while the blocking queue is full and the pool has reached its maximum number of threads.
Modify thread pool properties
Keep the blocking queue size at 4 and set both the number of core threads and the maximum number of threads to 5.
```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class ThreadPoolTest {
    public static void main(String[] args) {
        ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(
                5,   // corePoolSize: number of core threads
                5,   // maximumPoolSize: maximum number of threads
                60L, TimeUnit.SECONDS, // keepAliveTime and unit: the original values were lost; 60 s is an assumed placeholder
                new ArrayBlockingQueue<Runnable>(4),        // blocking queue holding up to 4 tasks
                new CustomizeThreadFactory("ThreadPool-1"), // thread factory
                new CustomizeRejectedExecutionHandler());   // saturation policy
        Task task = new Task(threadPoolExecutor);
        for (int i = 0; i < 10; i++) {
            threadPoolExecutor.execute(task);
        }
        threadPoolExecutor.shutdown();
    }
}
```
The results
```
ThreadPool-1-Task-2 Start... 0 blocking queue size: 4
ThreadPool-1-Task-4 Start... 3 blocking queue size: 4
# the saturation policy was triggered
task rejected. java.util.concurrent.ThreadPoolExecutor@6d6f6e28[Running, pool size = 5, active threads = 5, queued tasks = 4, completed tasks = 0]
ThreadPool-1-Task-1 Start... 2 blocking queue size: 4
ThreadPool-1-Task-3 Start... 1 blocking queue size: 4
ThreadPool-1-Task-5 Start... 4 blocking queue size: 4
ThreadPool-1-Task-2 End... 0
ThreadPool-1-Task-1 End... 2
ThreadPool-1-Task-2 Start... 5 blocking queue size: 3
ThreadPool-1-Task-4 End... 3
ThreadPool-1-Task-4 Start... 7 blocking queue size: 1
ThreadPool-1-Task-5 End... 4
ThreadPool-1-Task-3 End... 1
ThreadPool-1-Task-5 Start... 8 blocking queue size: 0
ThreadPool-1-Task-1 Start... 6 blocking queue size: 2
ThreadPool-1-Task-2 End... 5
ThreadPool-1-Task-1 End... 6
ThreadPool-1-Task-4 End... 7
ThreadPool-1-Task-5 End... 8
```
Analysis of the results
Thread pool properties: 5 core threads, a maximum of 5 threads, and a blocking queue with a capacity of 4.
The for loop submits 10 tasks. As in the analysis above, before the 10th iteration there are 5 core threads and the blocking queue is full with 4 tasks. When the 10th task is submitted, both if① and if② fail, and if③ cannot create a non-core thread either (the pool has been at its maximum size since the 5th iteration), so the saturation policy runs and the 10th task is discarded.
Because the discarded task never runs, the counter in Task stops at 8 and no task 9 appears in the result.
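The single rejection can also be counted rather than read off the log. The sketch below is an illustration under assumptions, not the article's handler: `SaturationSketch` and `countRejected` are hypothetical names, the handler only increments a counter, the 60-second keep-alive is assumed, and the tasks block on a latch so the pool stays saturated while the loop submits.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

public class SaturationSketch {
    /** Submits `tasks` blocking tasks and returns how many were handed to the saturation policy. */
    public static int countRejected(int core, int max, int queueCapacity, int tasks) {
        AtomicInteger rejected = new AtomicInteger();
        // Counting handler: called in the submitting thread whenever execute() rejects a task.
        RejectedExecutionHandler countingHandler = (r, executor) -> rejected.incrementAndGet();
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                core, max, 60L, TimeUnit.SECONDS,       // keep-alive value is an assumption
                new ArrayBlockingQueue<>(queueCapacity), countingHandler);
        CountDownLatch release = new CountDownLatch(1);
        try {
            for (int i = 0; i < tasks; i++) {
                pool.execute(() -> {
                    try {
                        release.await(); // keep all workers busy during submission
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                    }
                });
            }
            return rejected.get();
        } finally {
            release.countDown();
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        // 5 threads + 4 queue slots = room for 9 tasks, so the 10th is rejected.
        System.out.println("max 5  -> rejected " + countRejected(5, 5, 4, 10));  // 1
        // With a maximum of 10 threads the 10th task gets a non-core thread instead.
        System.out.println("max 10 -> rejected " + countRejected(5, 10, 4, 10)); // 0
    }
}
```

In this setup the pool can hold at most `max + queueCapacity` unfinished tasks at once, which is why exactly one of the 10 submissions is rejected when the maximum is 5.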
Problem: given 1000 concurrent threads, 10 machines, and 4 cores per machine, how should the thread pool be sized?
Link: baijiahao.baidu.com/s?id=168166…
I have only read the general analysis so far; I have not yet looked at the design of Tomcat's thread pool or at load balancing…
Basic knowledge questions:
- What are the parameters of ThreadPoolExecutor?
- What are the states of a thread pool?
- What are the common methods of ThreadPoolExecutor?
- submit vs. execute
- The difference between shutdown and shutdownNow
- How does a thread pool work?
Reference: The Beauty of Concurrent Programming in Java
<https://baijiahao.baidu.com/s?id=1681664088671801338&wfr=spider&for=pc>