Author: Tangyuan
Personal blog: Javalover.cc
Preface
When creating a thread, we often just call new Thread() directly.
This is fine in the short term, but once traffic grows and too many threads are created, it may lead to OOM errors and CPU thrashing.
Fortunately, Java provides thread pools, and the core framework behind them is today's topic: Executor.
Let's take a swim in the sea of Java thread pools.
This article introduces the core concepts of thread pools through a bank-teller analogy, which makes them easy to understand.
Introduction
Executor is the core interface of the thread pool framework.
Let's first look at some common thread pools: the Executors class provides factory methods for creating all kinds of them.
// Thread pool with fixed capacity
Executor fixedThreadPool = Executors.newFixedThreadPool(5);
// A thread pool whose capacity increases or decreases dynamically
Executor cachedThreadPool = Executors.newCachedThreadPool();
// A thread pool for a single thread
Executor singleThreadExecutor = Executors.newSingleThreadExecutor();
// Scheduling based thread pool (unlike the above thread pool, this pool creates tasks that are not executed immediately, but are executed periodically or delayed)
Executor scheduledThreadPool = Executors.newScheduledThreadPool(5);
These thread pools differ mainly in the number of threads they hold and in when tasks are executed.
So let's get started.
If there is any problem with the article, criticism and corrections are welcome. Thank you!
Directory
- The underlying class of the thread pool: ThreadPoolExecutor
- Why doesn't Alibaba recommend using Executors to create thread pools?
- The thread pool life cycle: ExecutorService
Body
1. The underlying class of the thread pool: ThreadPoolExecutor
The thread pools created at the beginning of this article all call the ThreadPoolExecutor class internally, as shown below:
public static ExecutorService newFixedThreadPool(int nThreads) {
    return new ThreadPoolExecutor(nThreads, nThreads,
                                  0L, TimeUnit.MILLISECONDS,
                                  new LinkedBlockingQueue<Runnable>());
}
This class is an implementation of Executor, and the class diagram is as follows:
- Executors is a utility class used to create various thread pools
- The interface ExecutorService is a sub-interface of Executor that adds lifecycle management for the pool as well as task-submission methods that return results (described below)
So let's start with the underlying class, whose full constructor signature is as follows:
public ThreadPoolExecutor(int corePoolSize,
                          int maximumPoolSize,
                          long keepAliveTime,
                          TimeUnit unit,
                          BlockingQueue<Runnable> workQueue,
                          ThreadFactory threadFactory,
                          RejectedExecutionHandler handler) { ... }
Before introducing these parameters, let's borrow an example from everyday life: going to a bank. Comparing the two will make things much clearer.
(In the picture, the green windows are the ones that are always open.)
- corePoolSize: the number of core threads, which exist at all times (whether used or not) => windows 1 and 2
- maximumPoolSize: the maximum number of threads that can be created => windows 1, 2, 3, and 4
- keepAliveTime: how long the extra threads (maximum minus core) may stay idle; if no customer shows up within keepAliveTime, windows 3 and 4 are temporarily closed => the idle time of windows 3 and 4
- workQueue: the work queue; when all core threads are busy executing tasks, incoming tasks are added to this queue => the chairs in the customer waiting area
- threadFactory: the factory used to create threads, including the initial core threads (described below)
- handler: the rejection policy; when all threads are busy and the work queue is full, new tasks are rejected (for example, discarded) => the turned-away customer in the lower-left corner of the picture
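To make the analogy concrete, here is a minimal sketch wiring all seven parameters together. The class name BankPoolDemo, the window and chair counts, and the thread-naming scheme are my own illustration, not from the original article:

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

public class BankPoolDemo {

    // Build the "bank": 2 always-open windows, 4 windows at most, 3 waiting chairs
    static ThreadPoolExecutor createBankPool() {
        AtomicInteger counter = new AtomicInteger(1);
        return new ThreadPoolExecutor(
                2,                           // corePoolSize    -> windows 1 and 2
                4,                           // maximumPoolSize -> windows 1 to 4
                60L, TimeUnit.SECONDS,       // keepAliveTime   -> idle time before 3 and 4 close
                new ArrayBlockingQueue<>(3), // workQueue       -> 3 chairs in the waiting area
                r -> new Thread(r, "window-" + counter.getAndIncrement()), // threadFactory
                new ThreadPoolExecutor.AbortPolicy());  // handler -> turn away the 8th customer
    }

    public static void main(String[] args) throws InterruptedException {
        ThreadPoolExecutor bank = createBankPool();
        for (int i = 1; i <= 7; i++) { // 4 windows + 3 chairs = at most 7 customers at once
            final int customer = i;
            bank.execute(() -> {
                System.out.println("customer " + customer + " at "
                        + Thread.currentThread().getName());
                try { Thread.sleep(100); } catch (InterruptedException ignored) { }
            });
        }
        bank.shutdown();
        bank.awaitTermination(5, TimeUnit.SECONDS);
    }
}
```

An 8th customer submitted while all windows and chairs are occupied would trigger the handler, which is exactly the rejection-policy story told below.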
The basic workflow is as follows:
Next, let's focus on the work queue, the rejection policy, and the thread factory.
Work queues:
- ArrayBlockingQueue:
  - An array-based blocking queue; bounded, FIFO: insert at the tail, take from the head
  - The capacity is fixed when the queue is initialized
  - In the bank scenario, the capacity is the number of chairs
- LinkedBlockingQueue:
  - A linked-list-based blocking queue; effectively unbounded, FIFO: insert at the tail, take from the head
  - The default capacity is Integer.MAX_VALUE, so it is practically unbounded and can always accept new tasks (if tasks are processed slower than they are inserted, this may cause OOM)
  - In the bank scenario, there are Integer.MAX_VALUE chairs
- SynchronousQueue:
  - A synchronous queue, the special case of a blocking queue with zero capacity: whatever comes in must be handed off immediately
  - In the bank scenario, there are no chairs at all; a customer goes straight to a counter, and if every counter is busy, they are turned away
- PriorityBlockingQueue:
  - A priority blocking queue; unbounded, and not FIFO: tasks are taken in the priority order of the tasks themselves
  - The default initial capacity is 11, but the queue is still unbounded because it grows automatically when elements are added
  - In the bank scenario, newcomers can jump the queue, like VIP members
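A quick way to feel the difference between these queues is to poke at them directly. This sketch (the class name QueueDemo is mine) uses offer(), which returns false instead of blocking when insertion fails:

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.PriorityBlockingQueue;
import java.util.concurrent.SynchronousQueue;

public class QueueDemo {
    public static void main(String[] args) {
        // Bounded: offer() fails once the 2 "chairs" are taken
        ArrayBlockingQueue<String> bounded = new ArrayBlockingQueue<>(2);
        System.out.println(bounded.offer("a")); // true
        System.out.println(bounded.offer("b")); // true
        System.out.println(bounded.offer("c")); // false: queue is full

        // Effectively unbounded: keeps accepting (this is the OOM risk)
        LinkedBlockingQueue<String> unbounded = new LinkedBlockingQueue<>();
        for (int i = 0; i < 1000; i++) unbounded.offer("task-" + i);
        System.out.println(unbounded.size()); // 1000

        // Zero capacity: offer() fails unless a consumer is already waiting
        SynchronousQueue<String> sync = new SynchronousQueue<>();
        System.out.println(sync.offer("x")); // false: nobody is taking

        // Priority: poll order follows priority, not insertion order
        PriorityBlockingQueue<Integer> prio = new PriorityBlockingQueue<>();
        prio.offer(3); prio.offer(1); prio.offer(2);
        System.out.println(prio.poll()); // 1: the "VIP" jumps the queue
    }
}
```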
Rejection policies:
- AbortPolicy (default):
  - Abort policy: throws a RejectedExecutionException
  - If the maximum number of threads has been reached and the work queue is full, any extra task triggers a RejectedExecutionException (the pool keeps running, but the submission fails)
- DiscardPolicy:
  - Discard policy: silently drops the new task
  - If the maximum number of threads has been reached and the work queue is full, the new task is simply discarded
- DiscardOldestPolicy:
  - Discard-oldest policy: drops the task that entered the queue first (a bit cruel), then retries the insert
  - If the maximum number of threads has been reached and the work queue is full, the task at the head of the queue is discarded and the new task is inserted again
- CallerRunsPolicy:
  - Caller-runs policy: the rejected task is executed by the thread that submitted it; for example, if the main thread called executorService.execute(task), the task runs on the main thread
  - If the maximum number of threads has been reached and the work queue is full, the task is handed back to the calling thread for execution
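CallerRunsPolicy is the easiest one to misread, so here is a small sketch (the class name CallerRunsDemo and the pool sizes are my own choices): with one thread and a one-slot queue, the third task typically overflows and is executed by the submitting thread itself.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class CallerRunsDemo {
    public static void main(String[] args) throws InterruptedException {
        // One counter, one chair, and a caller-runs handler
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                1, 1, 0L, TimeUnit.MILLISECONDS,
                new ArrayBlockingQueue<>(1),
                new ThreadPoolExecutor.CallerRunsPolicy());

        for (int i = 1; i <= 3; i++) {
            final int task = i;
            pool.execute(() -> {
                System.out.println("task " + task + " on "
                        + Thread.currentThread().getName());
                try { Thread.sleep(100); } catch (InterruptedException ignored) { }
            });
        }
        // Typically: task 1 runs on the pool thread, task 2 waits in the queue,
        // and task 3 overflows, so it is pushed back and runs on "main"
        pool.shutdown();
        pool.awaitTermination(1, TimeUnit.SECONDS);
    }
}
```

A nice side effect of this policy is natural back-pressure: while the caller is busy running the rejected task, it cannot submit new ones.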
2. Why doesn't Alibaba recommend using Executors to create thread pools?
The rule in question (from the Alibaba Java Development Manual) is shown below:
Let's write some code to verify this.
First, test FixedThreadPool:
public class FixedThreadPoolDemo {
    public static void main(String[] args) {
        // Create a thread pool with a fixed capacity of 10
        // (both the core and the maximum thread count are 10)
        ExecutorService executorService = Executors.newFixedThreadPool(10);
        for (int i = 0; i < 1_000_000; i++) {
            try {
                executorService.execute(() -> {
                    try {
                        Thread.sleep(1000);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                });
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    }
}
To make the problem easier to reproduce, we need to tweak the VM parameters.
As shown below, add -Xmx8m -Xms8m to the VM options (-Xmx8m: the maximum JVM heap size is 8 MB; -Xms8m: the initial JVM heap size is 8 MB):
Click Run, and you will see the following error:
Exception in thread "main" java.lang.OutOfMemoryError: Java heap space
at java.util.concurrent.LinkedBlockingQueue.offer(LinkedBlockingQueue.java:416)
at java.util.concurrent.ThreadPoolExecutor.execute(ThreadPoolExecutor.java:1371)
at com.jalon.concurrent.chapter6.FixedThreadPoolDemo.main(FixedThreadPoolDemo.java:21)
Let's look at why.
- First, the work queue used internally by newFixedThreadPool is a LinkedBlockingQueue, an unbounded queue (capacity Integer.MAX_VALUE, so it can basically accept tasks forever)
- If tasks are inserted faster than they are executed, the queue grows longer and longer, eventually causing OOM
CachedThreadPool fails for a similar reason, except that there the culprit is a maximum thread count of Integer.MAX_VALUE;
so when tasks are inserted faster than they are executed, the number of threads keeps growing and eventually causes OOM.
So how should we create a thread pool?
Use ThreadPoolExecutor directly and bound both the maximum number of threads and the work queue, as shown below:
public class ThreadPoolExecutorDemo {
    public static void main(String[] args) {
        ExecutorService service = new ThreadPoolExecutor(
                1,                            // core thread count
                1,                            // maximum thread count
                60L,                          // idle keep-alive time
                TimeUnit.MILLISECONDS,
                new ArrayBlockingQueue<>(1),  // bounded work queue of length 1
                new ThreadPoolExecutor.DiscardPolicy()); // rejection policy: discard
        for (int i = 0; i < 1_000_000; i++) {
            // With the default AbortPolicy, only the first tasks would get through:
            // the first runs on the core thread, the second waits in the queue,
            // and the third would be rejected with a RejectedExecutionException.
            // Note that even then the process would not exit, because the
            // non-daemon pool thread keeps the JVM alive.
            // With DiscardPolicy, rejected tasks are silently dropped instead,
            // so the loop keeps running without any exception.
            System.out.println(i);
            service.execute(() ->
                    System.out.println(Thread.currentThread().getName()));
        }
        try {
            Thread.sleep(1000);
            System.out.println("Main thread sleep");
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}
3. The life cycle of the thread pool: ExecutorService
By default, the Executor interface has only one method, void execute(Runnable command), which executes a task.
Once a task has started, we can no longer intervene: we cannot stop it, monitor it, and so on.
That's where ExecutorService comes in. It is a sub-interface of Executor that extends it as follows:
public interface ExecutorService extends Executor {
    // Graceful shutdown: stops accepting new tasks (submissions after shutdown
    // are rejected) but waits for already-submitted tasks to complete
    void shutdown();
    // Abrupt shutdown: attempts to stop all executing tasks immediately
    // and returns the tasks still waiting in the work queue
    List<Runnable> shutdownNow();
    boolean isShutdown();
    boolean isTerminated();
    // Wait for the tasks to finish:
    // returns true if everything completes within the timeout,
    // returns false if the timeout elapses first,
    // and throws InterruptedException if interrupted while waiting
    boolean awaitTermination(long timeout, TimeUnit unit)
        throws InterruptedException;
    <T> Future<T> submit(Callable<T> task);
    <T> Future<T> submit(Runnable task, T result);
}
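The submit methods in the interface above are worth a quick demo: unlike execute(Runnable), they return a Future, which gives the caller a handle on the task. A minimal sketch (the class name SubmitDemo is mine):

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class SubmitDemo {
    public static void main(String[] args) throws Exception {
        ExecutorService pool = Executors.newSingleThreadExecutor();

        // submit(Callable) returns a Future, so the caller can wait for,
        // inspect, or cancel the task after it has been handed off
        Future<Integer> sum = pool.submit(() -> {
            int total = 0;
            for (int i = 1; i <= 100; i++) total += i;
            return total;
        });

        System.out.println(sum.get());    // 5050: get() blocks until the task finishes
        System.out.println(sum.isDone()); // true

        pool.shutdown();
    }
}
```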
As you can see from the above, the thread pool life cycle consists of three states:
- Running: entered after creation
- Shutdown: entered after calling shutdown
- Terminated: entered after all tasks have finished
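These three states can be observed directly with the query methods shown earlier. A small sketch (the class name LifecycleDemo is mine):

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class LifecycleDemo {
    public static void main(String[] args) throws InterruptedException {
        ExecutorService pool = Executors.newFixedThreadPool(2);
        pool.execute(() -> {
            try { Thread.sleep(200); } catch (InterruptedException ignored) { }
        });

        pool.shutdown();                         // running -> shutdown
        System.out.println(pool.isShutdown());   // true: no new tasks are accepted
        System.out.println(pool.isTerminated()); // false: the sleeping task is still running

        boolean done = pool.awaitTermination(2, TimeUnit.SECONDS);
        System.out.println(done);                // true: finished within the timeout
        System.out.println(pool.isTerminated()); // true: shutdown -> terminated
    }
}
```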
Conclusion
- The underlying class of the thread pool, ThreadPoolExecutor: the core concepts are the core thread count, the maximum thread count, the work queue, and the rejection policy
- Why doesn't Alibaba recommend using Executors to create thread pools? Because it can lead to OOM; the solution is to customize a ThreadPoolExecutor and bound both the maximum thread count and the work queue
- The thread pool life cycle, ExecutorService: running (entered after creation), shutdown (entered after calling shutdown), terminated (entered after all tasks have finished)
References:
- Java Concurrent Programming
- Real Java High Concurrency
- The disadvantages of newFixedThreadPool: my.oschina.net/langwanghua…
- b23.tv/ygGjTH
Afterword
May the right person also be right for you.