General documentation: Article directory Github: github.com/black-ant

Thread pools are simple to use, but using them well is not easy.

1. Introduction to thread pools

Elements of a thread pool

A thread pool is built from two concepts: a task queue and a set of worker threads.

  • A task queue is a blocking queue that holds tasks to be executed.
  • The worker thread body is a loop that accepts tasks from the queue and executes them.
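The two elements above can be sketched in a few lines. This is a minimal teaching sketch, not how ThreadPoolExecutor is implemented; the class name `TinyThreadPool` and its methods are hypothetical, and a task that throws would kill its worker here.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicInteger;

/** Hypothetical teaching class: a blocking task queue plus workers that loop over it. */
final class TinyThreadPool {
    private final BlockingQueue<Runnable> taskQueue = new LinkedBlockingQueue<>();
    private final List<Thread> workers = new ArrayList<>();

    TinyThreadPool(int threadCount) {
        for (int i = 0; i < threadCount; i++) {
            Thread worker = new Thread(this::workerLoop, "tiny-pool-" + i);
            worker.setDaemon(true); // do not keep the JVM alive in this sketch
            workers.add(worker);
            worker.start();
        }
    }

    /** Worker body: take a task from the queue, run it, repeat. */
    private void workerLoop() {
        try {
            while (true) {
                taskQueue.take().run(); // blocks while the queue is empty
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // interrupted: leave the loop
        }
    }

    void execute(Runnable task) {
        taskQueue.add(task);
    }

    void shutdownNow() {
        workers.forEach(Thread::interrupt);
    }

    /** Runs `tasks` counter increments on a 2-thread pool and returns the final count. */
    static int runDemo(int tasks) {
        TinyThreadPool pool = new TinyThreadPool(2);
        AtomicInteger counter = new AtomicInteger();
        CountDownLatch done = new CountDownLatch(tasks);
        for (int i = 0; i < tasks; i++) {
            pool.execute(() -> {
                counter.incrementAndGet();
                done.countDown();
            });
        }
        try {
            done.await(); // wait until every task has run
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } finally {
            pool.shutdownNow();
        }
        return counter.get();
    }
}
```

Calling `TinyThreadPool.runDemo(10)` runs ten tasks on two reused threads instead of creating ten threads.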

Why use thread pools

  • Reduce resource consumption. Reduce the cost of thread creation and destruction by reusing threads that have been created.
  • Improve response speed. When a task arrives, it can be executed without needing to wait until the thread is created.
  • Improve thread manageability. Threads are a scarce resource. Creating them without limit not only consumes system resources but also reduces system stability. A thread pool allows threads to be allocated, tuned, and monitored in one place.

3 Core concepts in thread pools

  • BlockingQueue<Runnable> workQueue: the queue that holds tasks and hands them over to the worker threads
  • HashSet<Worker> workers: the set of all worker threads in the thread pool

4 Thread pool principle definition:

The thread pool uses an AtomicInteger called ctl to track its running state, creates threads through a ThreadFactory, and places waiting tasks in the workQueue until they can be handed over to a worker thread.

Common thread pools

// Basic object
ThreadPoolExecutor 

// A thread pool that reuses a fixed number of threads
FixedThreadPool 
	- ThreadPoolExecutor executor = (ThreadPoolExecutor) Executors.newFixedThreadPool(2);
	
// An Executor that uses a single worker thread
SingleThreadExecutor 

// A thread pool that creates new threads as needed
CachedThreadPool 

Create a thread pool

Thread pool creation can be accomplished by using ThreadPoolExecutor and the utility class Executors

3.1 Creation through the constructor (recommended)

Constructing a ThreadPoolExecutor directly makes every parameter explicit, which gives the author a clearer picture of how the thread pool will run.
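As an illustration of the constructor approach, the sketch below spells out every argument. The sizes, the queue capacity, and the `order-worker-` name prefix are illustrative assumptions, not recommendations from the article.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

final class ExplicitPoolFactory {
    /** Builds a bounded pool; all numbers and the name prefix are illustrative. */
    static ThreadPoolExecutor newBoundedPool() {
        AtomicInteger seq = new AtomicInteger(1);
        ThreadFactory namedThreads =
                r -> new Thread(r, "order-worker-" + seq.getAndIncrement());
        return new ThreadPoolExecutor(
                2,                                     // corePoolSize
                4,                                     // maximumPoolSize
                30, TimeUnit.SECONDS,                  // keepAliveTime for idle non-core threads
                new ArrayBlockingQueue<>(100),         // bounded work queue
                namedThreads,                          // meaningful thread names
                new ThreadPoolExecutor.AbortPolicy()); // reject when saturated
    }
}
```

Because the queue is bounded and the maximum size is finite, the pool's worst-case memory and thread usage can be read directly from the constructor call.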

3.2 Creation through the Executors utility class of the Executor framework (acceptable for personal demos)

3.2.1 FixedThreadPool

return new ThreadPoolExecutor(nThreads, nThreads, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>());
  • This method returns a thread pool with a fixed number of threads (corePoolSize == maximumPoolSize).
  • It uses an unbounded LinkedBlockingQueue as the blocking queue.
  • Threads are not released when the pool has no tasks to execute.
  • The number of threads in this pool stays constant. When a new task is submitted and an idle thread is available, the task is executed immediately. Otherwise the task waits in the task queue and is processed as soon as a thread becomes idle.
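The fixed-size behaviour described above can be observed with a small sketch. The class and method names are hypothetical; the code only records which worker threads ran the tasks.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

final class FixedPoolDemo {
    /** Submits `tasks` jobs to a pool of `threads` threads; returns the distinct worker names seen. */
    static Set<String> workerNames(int threads, int tasks) {
        ExecutorService pool = Executors.newFixedThreadPool(threads);
        Set<String> names = ConcurrentHashMap.newKeySet();
        try {
            List<Future<?>> futures = new ArrayList<>();
            for (int i = 0; i < tasks; i++) {
                futures.add(pool.submit(() -> {
                    names.add(Thread.currentThread().getName());
                }));
            }
            for (Future<?> f : futures) {
                f.get(); // wait for every task to finish
            }
        } catch (InterruptedException | ExecutionException e) {
            throw new IllegalStateException(e);
        } finally {
            pool.shutdown();
        }
        return names;
    }
}
```

With `workerNames(2, 6)` the six tasks are served by at most two distinct threads; the extra tasks wait in the queue rather than causing new threads to be created.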

3.2.2 SingleThreadExecutor

return new FinalizableDelegatedExecutorService(
    new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>()));

  • This method returns a pool with exactly one thread.
  • If more than one task is submitted, the extra tasks are stored in the task queue and executed in first-in, first-out order as the thread becomes idle.

3.2.3 CachedThreadPool:

return new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60L, TimeUnit.SECONDS, new SynchronousQueue<Runnable>());
  • This method returns a thread pool whose size adjusts to the actual load. (Idle threads are cached for 60 s by default; the number of threads can reach Integer.MAX_VALUE, i.e. 2147483647.)
  • Internally it uses a SynchronousQueue as the blocking queue.
  • The number of threads in the pool is not fixed. If an idle thread can be reused, it is preferred. If all threads are busy when a new task is submitted, a new thread is created to handle the task.
  • Every thread returns to the pool for reuse after it completes its current task.
  • A thread is released automatically once it has been idle for longer than keepAliveTime.

3.2.4 ScheduledExecutorService:

return new DelegatedScheduledExecutorService(new ScheduledThreadPoolExecutor(1));
  • The resulting thread pool can run submitted tasks after a given delay or periodically at a fixed interval. In real business scenarios it is typically used for periodic data synchronization.
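A minimal sketch of periodic execution follows. The 50 ms period and the 5 s timeout are arbitrary values chosen for the example, and `ScheduledDemo` is a hypothetical name.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

final class ScheduledDemo {
    /** Runs a periodic task until it has fired `runs` times; true if that happened within 5 s. */
    static boolean firesRepeatedly(int runs) {
        ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
        CountDownLatch latch = new CountDownLatch(runs);
        try {
            // Start immediately, then repeat every 50 ms (illustrative period).
            scheduler.scheduleAtFixedRate(latch::countDown, 0, 50, TimeUnit.MILLISECONDS);
            return latch.await(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        } finally {
            scheduler.shutdownNow(); // cancel the periodic task
        }
    }
}
```

A periodic task keeps running until it is cancelled or the scheduler is shut down, so the `finally` block matters here.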

4. Fork/Join

1. The core of Fork/Join is the ForkJoinPool, which manages the worker threads. A worker thread can execute only one task at a time. Instead of creating a thread per task, the pool stores tasks in a double-ended queue (deque) that belongs to each worker thread.
2. Work-stealing algorithm: idle threads try to steal work from the deques of busy threads.

// Fork/Join depends on ForkJoinPool, which is only briefly introduced here. See Chapter 16 for details
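To make the fork/join idea concrete, here is a small sketch that sums a range of numbers with a RecursiveTask. The class name `RangeSum` and the threshold of 1,000 are assumptions made for the example.

```java
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveTask;

/** Hypothetical example task: sums the integers in [from, to). */
final class RangeSum extends RecursiveTask<Long> {
    private static final int THRESHOLD = 1_000; // below this size, sum sequentially
    private final long from; // inclusive
    private final long to;   // exclusive

    RangeSum(long from, long to) {
        this.from = from;
        this.to = to;
    }

    @Override
    protected Long compute() {
        if (to - from <= THRESHOLD) {
            long sum = 0;
            for (long i = from; i < to; i++) {
                sum += i;
            }
            return sum;
        }
        long mid = from + (to - from) / 2;
        RangeSum left = new RangeSum(from, mid);
        RangeSum right = new RangeSum(mid, to);
        left.fork();                           // queue the left half; an idle worker may steal it
        return right.compute() + left.join();  // compute the right half here, then wait for the left
    }

    static long sum(long from, long to) {
        return ForkJoinPool.commonPool().invoke(new RangeSum(from, to));
    }
}
```

The forked half sits in the current worker's deque, which is exactly where an idle worker would look when stealing.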

5. ThreadPoolExecutor

ThreadPoolExecutor implements a producer/consumer pattern: the worker threads are the consumers, the task submitters are the producers, and the thread pool maintains its own task queue.

> ThreadPoolExecutor
    - AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
    ? This variable records both the number of worker threads in the pool and the state of the pool: the high 3 bits hold the "thread pool state", the low 29 bits hold the "number of threads in the pool"
    - RUNNING : 111 : the pool accepts new tasks and processes queued tasks
    - SHUTDOWN : 000 : the pool accepts no new tasks but still processes queued tasks
    - STOP : 001 : the pool accepts no new tasks, does not process queued tasks, and interrupts tasks in progress
    - TIDYING : 010 : all tasks have terminated and the count recorded in ctl is 0; the pool moves to the TIDYING state
    - TERMINATED : 011 : the pool has terminated completely

> ThreadPoolExecutor parameters:
    - corePoolSize: the number of core threads kept in the pool
    - maximumPoolSize: the maximum number of threads allowed in the pool
    - keepAliveTime: how long an idle thread is kept before it is released
    - unit: the time unit of keepAliveTime
    - threadFactory: the factory used to create threads
    - allowCoreThreadTimeOut: whether core threads may also time out
    - handler: the handler for rejected tasks
    - defaultHandler: the default rejection handler
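The packing of state and thread count into one int can be reproduced in isolation. The sketch below is a re-implementation for illustration, assuming the layout described above (3 state bits, 29 count bits); `CtlBits` and `stateBits` are hypothetical names, and the JDK's own fields are private.

```java
/** Illustrative re-implementation of the ctl layout: 3 state bits + 29 count bits. */
final class CtlBits {
    static final int COUNT_BITS = Integer.SIZE - 3;      // 29
    static final int CAPACITY   = (1 << COUNT_BITS) - 1; // low 29 bits all set

    static final int RUNNING    = -1 << COUNT_BITS;      // state bits 111
    static final int SHUTDOWN   =  0 << COUNT_BITS;      // state bits 000
    static final int STOP       =  1 << COUNT_BITS;      // state bits 001
    static final int TIDYING    =  2 << COUNT_BITS;      // state bits 010
    static final int TERMINATED =  3 << COUNT_BITS;      // state bits 011

    /** Combines a run state and a worker count into one int. */
    static int ctlOf(int runState, int workerCount) {
        return runState | workerCount;
    }

    /** Keeps only the high 3 bits. */
    static int runStateOf(int c) {
        return c & ~CAPACITY;
    }

    /** Keeps only the low 29 bits. */
    static int workerCountOf(int c) {
        return c & CAPACITY;
    }

    /** Renders the three state bits as text, e.g. "111" for RUNNING. */
    static String stateBits(int c) {
        String s = Integer.toBinaryString(c >>> COUNT_BITS);
        return "000".substring(s.length()) + s;
    }
}
```

Keeping both values in a single AtomicInteger lets the pool read or update state and count together in one atomic operation.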

Saturation and dynamic adjustment of thread pools

// Thread pool saturation policy: when the queue and the pool are both full, the task is handed to the configured policy
1. AbortPolicy: throws an exception directly; this is the default policy
2. CallerRunsPolicy: executes the task on the caller's thread
3. DiscardOldestPolicy: discards the oldest task waiting in the queue, then retries the current task
4. DiscardPolicy: silently discards the task

// ThreadPoolExecutor can adjust the size of a thread pool dynamically:
  • setCorePoolSize: sets the core pool size.
  • setMaximumPoolSize: sets the maximum number of threads the pool may create.
// When the core size is raised, ThreadPoolExecutor may immediately create new threads to execute queued tasks.
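The sketch below illustrates one saturation policy and the dynamic setters. It is a small demonstration under assumed sizes (one thread, a queue of one); `SaturationDemo` and its methods are hypothetical names.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;

final class SaturationDemo {
    /** Returns the name of the thread that ran the overflow task under CallerRunsPolicy. */
    static String overflowRunner() {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                1, 1, 0L, TimeUnit.MILLISECONDS,
                new ArrayBlockingQueue<>(1),
                new ThreadPoolExecutor.CallerRunsPolicy());
        CountDownLatch release = new CountDownLatch(1);
        AtomicReference<String> runner = new AtomicReference<>();
        try {
            pool.execute(() -> awaitQuietly(release)); // occupies the only worker
            pool.execute(() -> { });                   // fills the queue
            // Pool and queue are both full, so this task runs on the calling thread.
            pool.execute(() -> runner.set(Thread.currentThread().getName()));
            return runner.get();
        } finally {
            release.countDown();
            pool.shutdown();
        }
    }

    /** Grows a pool at runtime; when growing, the maximum is raised before the core size. */
    static int[] grow(ThreadPoolExecutor pool, int core, int max) {
        pool.setMaximumPoolSize(max);
        pool.setCorePoolSize(core);
        return new int[] { pool.getCorePoolSize(), pool.getMaximumPoolSize() };
    }

    private static void awaitQuietly(CountDownLatch latch) {
        try {
            latch.await();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }
}
```

CallerRunsPolicy slows the submitter down instead of losing the task, which acts as a crude form of back-pressure.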

The process by which a thread pool executes tasks

When the pool is created, it contains no threads. When a task is added by calling execute():

  • If the number of running threads is less than corePoolSize, a new thread is created to run the task.

    • Otherwise, if the number of running threads is greater than or equal to corePoolSize, the task is added to the blocking queue.
    • Otherwise, if the queue is full and the number of running threads is less than maximumPoolSize, a new thread is created to run the task.
    • Otherwise, if the queue is full and the number of running threads is greater than or equal to maximumPoolSize, the rejection policy is applied.


  • When a thread completes a task, it takes the next task from the queue.

    • A thread leaves its work loop when there is no more work, when it is interrupted, or when the pool is shut down. If the pool is shut down, the thread terminates.
    • Otherwise, the pool checks whether the number of running threads exceeds the number of core threads. If so, the idle thread terminates; if not, the thread blocks waiting for a task. Therefore, once all tasks have been executed, the pool shrinks back to corePoolSize threads.
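The order "core thread, then queue, then extra thread, then rejection" can be watched directly. The sketch assumes a deliberately tiny pool (core 1, maximum 2, queue capacity 1) and tasks that block until released; `ExecuteFlowDemo` is a hypothetical name.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

final class ExecuteFlowDemo {
    /** Returns {poolSize, queuedTasks, rejected (0 or 1)} after submitting four blocking tasks. */
    static int[] observe() {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                1, 2, 60L, TimeUnit.SECONDS,
                new ArrayBlockingQueue<>(1)); // default handler is AbortPolicy
        CountDownLatch release = new CountDownLatch(1);
        Runnable blocker = () -> {
            try {
                release.await(); // keep the worker busy until the demo is done
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        };
        int rejected = 0;
        try {
            pool.execute(blocker); // 1: below corePoolSize, a core thread is created
            pool.execute(blocker); // 2: core thread busy, the task is queued
            pool.execute(blocker); // 3: queue full, below maximumPoolSize, a second thread is created
            try {
                pool.execute(blocker); // 4: queue full and at maximumPoolSize, the task is rejected
            } catch (RejectedExecutionException e) {
                rejected = 1;
            }
            return new int[] { pool.getPoolSize(), pool.getQueue().size(), rejected };
        } finally {
            release.countDown();
            pool.shutdown();
        }
    }
}
```

Note that the third task jumps ahead of the queued second task: a newly created non-core thread starts with the task that triggered its creation.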

What is the difference between the submit and execute methods in the thread pool

Both methods can submit tasks to a thread pool.

  • #execute(…): returns void; it is defined in the Executor interface, and the task passed in must implement the Runnable interface.
  • #submit(…): returns a Future object that holds the result of the computation; it is defined in the ExecutorService interface, which extends the Executor interface. Thread pool classes such as ThreadPoolExecutor and ScheduledThreadPoolExecutor provide both methods.
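A short side-by-side sketch of the two methods (the class name `SubmitVsExecute` is hypothetical):

```java
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

final class SubmitVsExecute {
    /** Returns the value computed by a submitted Callable. */
    static int answer() {
        ExecutorService pool = Executors.newSingleThreadExecutor();
        try {
            // execute(): takes a Runnable and returns nothing.
            pool.execute(() -> { /* fire-and-forget work */ });
            // submit(): returns a Future that carries the Callable's result.
            Future<Integer> future = pool.submit(() -> 6 * 7);
            return future.get(); // blocks until the result is ready
        } catch (InterruptedException | ExecutionException e) {
            throw new IllegalStateException(e);
        } finally {
            pool.shutdown();
        }
    }
}
```

An exception thrown inside a submitted task is captured in the Future and only surfaces as an ExecutionException when `get()` is called, so a Future that is never read can hide failures.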

9. What happens if the thread pool queue is full when you submit a task

The key is whether the queue of the thread pool is bounded or unbounded.

> If you use an unbounded queue such as LinkedBlockingQueue, the queue never fills up, so tasks simply keep being added to it and wait to be executed.

> If you use a bounded queue such as ArrayBlockingQueue, tasks are first added to the ArrayBlockingQueue. Once it is full, the pool adds threads up to maximumPoolSize; if that limit is also reached, the task is handed to the RejectedExecutionHandler policy. The default is AbortPolicy.

10. The underlying logic of thread pools

10.1 Basic Code

// The thread pools provided by the Executors class are all built on ThreadPoolExecutor

// Problem 1: how ThreadPoolExecutor wraps a thread
- The thread is encapsulated in a Worker object
- The worker runs a loop that fetches tasks and executes them, by calling runWorker(Worker w)
- If something goes wrong while a task runs, processWorkerExit() is called to handle it
C- ThreadPoolExecutor
    PVC- Worker
        M- run : public void run() { runWorker(this); }

// Problem 2: the addWorker function
- Checks whether a new worker can be added, based on the current pool state and the given bound (core or maximum)
- Creates and starts the new worker, running firstTask as its first task

// Problem 3: details of how the thread pool runs a Worker
1. addWorker(Runnable firstTask, boolean core) : addWorker receives a Runnable
2. new Worker(firstTask) : creates a Worker object if the state allows it
    Worker(Runnable firstTask) {
        setState(-1); // inhibit interrupts until runWorker
        this.firstTask = firstTask;
        this.thread = getThreadFactory().newThread(this);
    }
3. final Thread t = w.thread;
    ? The thread is taken out of the worker here
4. The worker is then run through t.start()

// Problem 4: the thread factory
From the above, the thread pool creates its threads through a ThreadFactory (newThread())

// Problem 5: the starting point of execution
The execute method of ThreadPoolExecutor is the starting point. It handles three cases:
1. When the number of threads is below corePoolSize, addWorker is run directly
2. When the core threads are all in use and the pool is running, the task is added to the workQueue
3. When both of the above fail, reject is called to handle the task (RejectedExecutionHandler)

// Problem 6: how thread pools reuse threads
The main methods involved are runWorker and getTask
M- runWorker
    while (task != null || (task = getTask()) != null)
M- getTask
    timed ? workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) : workQueue.take();

// Problem 7: closing the thread pool
1. checkShutdownAccess: checks whether the caller is allowed to shut the pool down
2. runState is changed to STOP
3. Workers are interrupted (ReentrantLock and isInterrupted)
4. drainQueue: removes the remaining tasks from the queue

// Problem 8: thread pool sizing formulas
> CPU-intensive tasks: Ncpu + 1
> Tasks that contain I/O or other blocking operations: Nthreads = Ncpu x Ucpu x (1 + W/C)
    Ncpu = number of CPUs
    Ucpu = target CPU utilization, 0 <= Ucpu <= 1
    W/C = ratio of waiting time to computing time
> I/O-intensive tasks: 2 x Ncpu
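The sizing formulas can be turned into a small helper. This is a sketch of the arithmetic only; the class and method names are hypothetical, and the results are starting points to be validated by measurement.

```java
final class PoolSizing {
    /** CPU-intensive work: Ncpu + 1. */
    static int cpuBound(int ncpu) {
        return ncpu + 1;
    }

    /** Blocking work: Nthreads = Ncpu x Ucpu x (1 + W/C), rounded and at least 1. */
    static int threadsFor(int ncpu, double ucpu, double waitToCompute) {
        if (ucpu < 0 || ucpu > 1) {
            throw new IllegalArgumentException("Ucpu must be between 0 and 1");
        }
        return Math.max(1, (int) Math.round(ncpu * ucpu * (1 + waitToCompute)));
    }

    /** Convenience overload that reads Ncpu from the running JVM. */
    static int threadsFor(double ucpu, double waitToCompute) {
        return threadsFor(Runtime.getRuntime().availableProcessors(), ucpu, waitToCompute);
    }
}
```

For example, with 8 CPUs, a target utilization of 0.5, and tasks that wait three times as long as they compute, `threadsFor(8, 0.5, 3.0)` gives 8 x 0.5 x 4 = 16 threads.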

10.2 Underlying complex analysis


// Question 1: How does ctl work?
> private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
? The thread pool uses ctl to track its state: the high 3 bits hold the thread pool state, the low 29 bits hold the number of threads in the pool
? For example, the STOP state has the decimal value 536870912
    - Converted to binary, that is a 1 followed by 29 zeros
    - Dropping the low 29 bits and zero-padding on the left leaves the three state bits 001
? TIDYING is a 1 followed by 30 zeros in binary -> 010
? RUNNING is -1 << 29; its state bits are 111 because negative numbers are stored in two's complement
? Bit operations are cheap, so packing both values into one int makes sense

// Issue 2: the return value of submit
- <T> Future<T> submit(Callable<T> task)
? submit returns a Future, which means the main thread can block while waiting for the result
    RunnableFuture<T> ftask = newTaskFor(task);

// Problem 3: how the thread pool handles a submitted task (the body of execute)
1	        int c = ctl.get();
2	        if (workerCountOf(c) < corePoolSize) {
3	            if (addWorker(command, true))
4	                return;
5	            c = ctl.get();
6	        }
7	        if (isRunning(c) && workQueue.offer(command)) {
8	            int recheck = ctl.get();
9	            if (!isRunning(recheck) && remove(command))
10	                reject(command);
11	            else if (workerCountOf(recheck) == 0)
12	                addWorker(null, false);
13	        }
14	        else if (!addWorker(command, false))
15	            reject(command);
     
// 2: workerCountOf checks whether the current number of threads is smaller than corePoolSize, to decide whether to create a thread through addWorker
// 7: If the core threads are all in use and the pool is running, try to add the task to the workQueue
// 14: If step 7 fails, try addWorker with core = false. If that fails as well, reject the task


// Question 4: the rejection strategy
When the thread pool is full, the exception usually reported is:
java.util.concurrent.RejectedExecutionException: Task java.util.concurrent.FutureTask@523424b5 rejected from java.util.concurrent.ThreadPoolExecutor@319dead1[Running, pool size = 6, active threads = 6, queued tasks = 3, completed tasks = 0]

Several rejection strategies are available, but none of them is reliable under high concurrency. First, how a rejection comes about:
1. workQueue.offer(command) at line 7 of the code in Problem 3 fails, which means the queue is full
2. At line 14, the task is run directly through addWorker, which also fails
3. The reject method is executed: handler.rejectedExecution(command, this);
    ? handler is an interface with four implementation classes. See the saturation policies above for details
4. For example, AbortPolicy does throw new RejectedExecutionException, while CallerRunsPolicy runs the task on the calling thread (which slows the main thread down)

So for some business cases this has to change. How?
1. You can customize the rejection policy in Spring. See this usage article @https://blog.csdn.net/tanglei6636/article/details/90721801
2. The ThreadPoolExecutor constructor accepts a handler, which follows the same idea.

The premise is that the cluster has reached its limit and throttling is not an option. A custom handler can then:
1. Put the task on a message queue
2. Put the task into storage
3. Write the task to disk
4. Put the task into a collection that a single thread consumes one item at a time

// How workerCountOf(c) extracts the current thread count using CAPACITY
1 > public static final int SIZE = 32;
    ? The number of bits used to represent an int value in two's complement binary form
2 > private static final int COUNT_BITS = Integer.SIZE - 3;
3 > private static final int CAPACITY   = (1 << COUNT_BITS) - 1;
4 > private static int workerCountOf(int c)  { return c & CAPACITY; }

// On the first call, the thread count held in the low 29 bits is 0
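One of the options listed above, parking rejected tasks in a collection that is consumed later, might look like the sketch below. `OverflowToQueueHandler` is a hypothetical class; its overflow queue is unbounded here for brevity, which a real system would need to cap or replace with durable storage.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

/** Hypothetical handler: parks rejected tasks instead of throwing. */
final class OverflowToQueueHandler implements RejectedExecutionHandler {
    private final BlockingQueue<Runnable> overflow = new LinkedBlockingQueue<>();

    @Override
    public void rejectedExecution(Runnable task, ThreadPoolExecutor executor) {
        if (executor.isShutdown()) {
            throw new RejectedExecutionException("pool is shut down");
        }
        overflow.add(task); // keep the task for later instead of losing it
    }

    /** Runs parked tasks one by one on the calling thread; returns how many were run. */
    int drainAndRun() {
        int ran = 0;
        Runnable task;
        while ((task = overflow.poll()) != null) {
            task.run();
            ran++;
        }
        return ran;
    }

    /** Saturates a 1-thread pool with a queue of 1 and returns how many tasks were parked and run. */
    static int demo() {
        OverflowToQueueHandler handler = new OverflowToQueueHandler();
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                1, 1, 0L, TimeUnit.MILLISECONDS, new ArrayBlockingQueue<>(1), handler);
        CountDownLatch release = new CountDownLatch(1);
        AtomicInteger ran = new AtomicInteger();
        try {
            pool.execute(() -> {
                try {
                    release.await(); // keep the only worker busy
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });
            pool.execute(ran::incrementAndGet); // queued
            pool.execute(ran::incrementAndGet); // rejected, parked by the handler
            pool.execute(ran::incrementAndGet); // rejected, parked by the handler
            return handler.drainAndRun();
        } finally {
            release.countDown();
            pool.shutdown();
        }
    }
}
```

The handler is passed as the last constructor argument, so the same idea applies whether the pool is built by hand or configured through a framework.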

Thread pool usage


// Construct a thread pool through the constructor. Recommended
ThreadPoolExecutor threadPool = new ThreadPoolExecutor(5, 6, 3, TimeUnit.SECONDS, new ArrayBlockingQueue<Runnable>(3));


// Except for testing, try to avoid building thread pools using the following methods
// Thread pool creation
// 1 CachedThreadPool
ThreadPoolExecutor executor = (ThreadPoolExecutor) Executors.newCachedThreadPool();
executor.submit(() -> {});

// 2 FixedThreadPool
ThreadPoolExecutor executor = (ThreadPoolExecutor) Executors.newFixedThreadPool(2);
executor.submit(() -> {});

// 3 SingleThreadExecutor
ExecutorService executor = Executors.newSingleThreadExecutor();
executor.submit(() -> {});

// 4 SingleThreadScheduledExecutor
ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor();
executor.scheduleWithFixedDelay(() -> {}, 0, 1, TimeUnit.SECONDS); // initial delay and delay values are illustrative

// Close the thread pool
executor.shutdown();
executor.shutdownNow();

// Obtain information
executor.isTerminated()       : whether the pool has terminated
executor.getPoolSize()
executor.getQueue().size()
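The two shutdown calls are usually combined. The sketch below shows one common pattern, an orderly shutdown followed by a forced one on timeout; the helper name `shutdownGracefully` is hypothetical.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;

final class PoolShutdown {
    /** Orderly shutdown first; force it if tasks do not finish within the timeout. */
    static boolean shutdownGracefully(ExecutorService pool, long timeout, TimeUnit unit) {
        pool.shutdown(); // stop accepting new tasks, let queued ones finish
        try {
            if (!pool.awaitTermination(timeout, unit)) {
                pool.shutdownNow(); // interrupt running tasks, drop queued ones
                return pool.awaitTermination(timeout, unit);
            }
            return true;
        } catch (InterruptedException e) {
            pool.shutdownNow();
            Thread.currentThread().interrupt(); // preserve the interrupt for the caller
            return false;
        }
    }
}
```

`shutdownNow()` can only interrupt tasks; a task that ignores interruption will keep its thread alive regardless.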

12. Idea of thread pools

// Rules and recommendations for using thread pools:
- When you create a thread or a thread pool, give the threads meaningful names.
- Do not create thread pools through Executors. Use the ThreadPoolExecutor constructor instead.
- Do not allow an unbounded number of threads, to avoid OOM.
- Custom ThreadLocal variables must be cleaned up, especially in thread pool scenarios.

// Notes:
- When the thread pool is full, the system may crash depending on the rejection policy in use.
- newCachedThreadPool can also cause OOM, because its maximum number of threads is Integer.MAX_VALUE.
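The ThreadLocal rule matters because pooled threads are reused: a value left behind by one task is visible to the next task on the same thread. A minimal sketch of the cleanup pattern, with the hypothetical names `ThreadLocalCleanup` and `CURRENT_USER`:

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

final class ThreadLocalCleanup {
    private static final ThreadLocal<String> CURRENT_USER = new ThreadLocal<>();

    /** Wraps a task so the ThreadLocal is always cleared before the worker is reused. */
    static Runnable withUser(String user, Runnable task) {
        return () -> {
            CURRENT_USER.set(user);
            try {
                task.run();
            } finally {
                CURRENT_USER.remove(); // without this, the value leaks into the next task
            }
        };
    }

    /** Runs two tasks on the same worker; returns what the second one sees (null when clean). */
    static String leakCheck() {
        ExecutorService pool = Executors.newSingleThreadExecutor();
        try {
            pool.submit(withUser("alice", () -> { })).get();
            Callable<String> read = CURRENT_USER::get;
            return pool.submit(read).get();
        } catch (InterruptedException | ExecutionException e) {
            throw new IllegalStateException(e);
        } finally {
            pool.shutdown();
        }
    }
}
```

Putting the `remove()` in a `finally` block ensures the cleanup happens even when the task throws.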