- “Crime and Punishment” of Executors
- Executors of the source
- Let your JVM OutOfMemoryError
- Alibaba Java Development Manual
- lead
- The structure of the ThreadPoolExecutor
- Description of the ThreadPoolExecutor construction parameters
- When a task is added to the thread pool
- The use of ThreadPoolExecutor
- ThreadPoolExample3
- The execution result
- conclusion
- Links
- The author resources
- The related resources
“Crime and Punishment” of Executors
Back in part (1) of the Java Concurrent Thread Pool series, we covered the following ways of creating a thread pool with Executors, the factory and utility class in the JDK concurrent package:
// Create a pool with a fixed number of threads
ExecutorService fixedThreadPool = Executors.newFixedThreadPool(10);
// Create a thread pool that creates new threads as needed, but reuses previously created threads if they are available
ExecutorService cachedThreadPool = Executors.newCachedThreadPool();
// Create a single-threaded pool
ExecutorService singleThreadExecutor = Executors.newSingleThreadExecutor();
Admittedly, this way of creating a thread pool is simple and convenient. But reading the source code carefully gave me a real fright: this could cost us our lives!
As mentioned earlier, when new tasks arrive the thread pool creates threads to handle them, up to its size limit. Tasks that the pool's threads cannot take on immediately wait in a blocking queue until a thread becomes free. The blocking queue has a maximum capacity too: tasks beyond that capacity not only get no chance to run, they are not even allowed to queue!
What happens to tasks that are not even allowed to queue? They are handled by the pool's rejection policy (the handler policies for rejected tasks in ThreadPoolExecutor).
You may be thinking: I never specify any of this when I use Executors. True, because Executors is "smart" enough to decide it all for us.
Executors of the source
Let's look at the newFixedThreadPool and newSingleThreadExecutor methods:
public static ExecutorService newFixedThreadPool(int nThreads) {
return new ThreadPoolExecutor(
nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>());
}
public static ExecutorService newSingleThreadExecutor() {
    return new FinalizableDelegatedExecutorService
        (new ThreadPoolExecutor(
            1, 1,
            0L, TimeUnit.MILLISECONDS,
            new LinkedBlockingQueue<Runnable>()));
}
In both cases the ExecutorService is built on a ThreadPoolExecutor. Note the new LinkedBlockingQueue<Runnable>() argument, which we look at next.
LinkedBlockingQueue is the blocking queue that holds tasks when there are more tasks than threads in the pool. Here it is created with the no-argument constructor. Let's look at that constructor:
/**
 * Creates a {@code LinkedBlockingQueue} with a capacity of
 * {@link Integer#MAX_VALUE}.
 */
public LinkedBlockingQueue() {
    this(Integer.MAX_VALUE);
}
We see that the default capacity of the blocking queue is Integer.MAX_VALUE!
If nothing limits how fast tasks are submitted, they keep piling up in the blocking queue until the JVM runs out of memory.
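To make the difference a capacity makes concrete, here is a minimal sketch. The class name QueueCapacityDemo and the capacity of 2 are illustrative choices, not part of the original article. It compares a LinkedBlockingQueue built with the no-argument constructor against a bounded one, using offer, which returns false instead of blocking when the queue is full.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

final class QueueCapacityDemo {

    // Free capacity of an empty queue built with the no-argument constructor.
    static int defaultCapacity() {
        return new LinkedBlockingQueue<Runnable>().remainingCapacity();
    }

    // Offers `attempts` no-op tasks to a queue bounded at `capacity`
    // and returns how many of them the queue accepted.
    static int acceptedByBoundedQueue(int capacity, int attempts) {
        BlockingQueue<Runnable> queue = new LinkedBlockingQueue<>(capacity);
        int accepted = 0;
        for (int i = 0; i < attempts; i++) {
            if (queue.offer(() -> { })) {
                accepted++;
            }
        }
        return accepted;
    }

    public static void main(String[] args) {
        System.out.println("default capacity = " + defaultCapacity());
        System.out.println("accepted by a queue of 2 = " + acceptedByBoundedQueue(2, 5));
    }
}
```

With a bounded queue the surplus is refused immediately, which is what gives the thread pool a chance to apply its rejection policy instead of accumulating tasks without limit.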
Even more striking is the newCachedThreadPool method:
public static ExecutorService newCachedThreadPool() {
    return new ThreadPoolExecutor(
        0, Integer.MAX_VALUE,
        60L, TimeUnit.SECONDS,
        new SynchronousQueue<Runnable>());
}
Here the maximum number of threads is also Integer.MAX_VALUE. That means that if new tasks keep arriving before earlier ones finish, new threads keep being created, up to Integer.MAX_VALUE of them.
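These limits can also be checked at runtime rather than by reading the source. The sketch below is only an illustration: the class name ExecutorsLimitsDemo is made up, and the casts assume that newCachedThreadPool and newFixedThreadPool return a plain ThreadPoolExecutor, as the source shown above does.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadPoolExecutor;

final class ExecutorsLimitsDemo {

    // Maximum thread count of the pool returned by newCachedThreadPool().
    static int cachedPoolMaxThreads() {
        ExecutorService pool = Executors.newCachedThreadPool();
        try {
            // Assumption: the factory returns a ThreadPoolExecutor directly.
            return ((ThreadPoolExecutor) pool).getMaximumPoolSize();
        } finally {
            pool.shutdown();
        }
    }

    // Free queue capacity of the pool returned by newFixedThreadPool(2).
    static int fixedPoolQueueCapacity() {
        ExecutorService pool = Executors.newFixedThreadPool(2);
        try {
            // Assumption: the factory returns a ThreadPoolExecutor directly.
            return ((ThreadPoolExecutor) pool).getQueue().remainingCapacity();
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println("cached pool max threads   = " + cachedPoolMaxThreads());
        System.out.println("fixed pool queue capacity = " + fixedPoolQueueCapacity());
    }
}
```

The same cast does not work for newSingleThreadExecutor, because that factory wraps its ThreadPoolExecutor in a delegating ExecutorService.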
Let your JVM OutOfMemoryError
The following example uses newCachedThreadPool to create a huge number of threads to process tasks, which ends in an OutOfMemoryError.
Friendly reminder: this gets ugly. Do not run it in a production environment.
package net.ijiangtao.tech.concurrent.jsd.threadpool;

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class ThreadPoolExample2 {

    private static final ExecutorService executorService = Executors.newCachedThreadPool();

    // Each task sleeps for ten minutes, so its thread is never free for reuse.
    private static class Task implements Runnable {
        @Override
        public void run() {
            try {
                Thread.sleep(1000 * 600);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }

    private static void newCachedThreadPoolTesterBadly() {
        System.out.println("begin............");
        // Every submission needs a new thread, because all existing threads are still sleeping.
        for (int i = 0; i < Integer.MAX_VALUE; i++) {
            executorService.execute(new Task());
        }
        System.out.println("end.");
    }

    public static void main(String[] args) {
        newCachedThreadPoolTesterBadly();
    }
}
After starting the main method, the system monitor shows CPU and memory almost exhausted.
Soon the console reports a java.lang.OutOfMemoryError:
begin............
Exception in thread "main" java.lang.OutOfMemoryError: unable to create new native thread
at java.lang.Thread.start0(Native Method)
at java.lang.Thread.start(Thread.java:717)
at java.util.concurrent.ThreadPoolExecutor.addWorker(ThreadPoolExecutor.java:957)
at java.util.concurrent.ThreadPoolExecutor.execute(ThreadPoolExecutor.java:1378)
at net.ijiangtao.tech.concurrent.jsd.threadpool.ThreadPoolExample2.newCachedThreadPoolTesterBadly(ThreadPoolExample2.java:24)
at net.ijiangtao.tech.concurrent.jsd.threadpool.ThreadPoolExample2.main(ThreadPoolExample2.java:30)
Alibaba Java Development Manual
Now read this rule from the Alibaba Java Development Manual, and the authors' good intentions become clear.
【Mandatory】 Thread pools must not be created with Executors; create them with ThreadPoolExecutor instead. Doing so makes the pool's operating rules explicit and avoids the risk of resource exhaustion.
1) FixedThreadPool and SingleThreadPool: the allowed request queue length is Integer.MAX_VALUE, so a large number of requests may accumulate and cause an OOM.
2) CachedThreadPool and ScheduledThreadPool: the number of threads allowed to be created is Integer.MAX_VALUE, so a large number of threads may be created and cause an OOM.
lead
The key to avoiding the OutOfMemoryError risk is right there in the Executors source code: construct the ThreadPoolExecutor yourself.
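As a rough sketch of what that looks like, the hypothetical helper below (BoundedPools and newBoundedFixedPool are names invented for this illustration) builds a pool with the same fixed thread count as newFixedThreadPool but with a bounded ArrayBlockingQueue and an explicit rejection policy. The sizes 4 and 100 are arbitrary; suitable values depend on the workload.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

final class BoundedPools {

    // Hypothetical helper: a fixed-size pool whose queue cannot grow without limit.
    static ThreadPoolExecutor newBoundedFixedPool(int threads, int queueCapacity) {
        return new ThreadPoolExecutor(
                threads, threads,                         // core size == maximum size
                0L, TimeUnit.MILLISECONDS,                // no extra threads, so no idle timeout needed
                new ArrayBlockingQueue<>(queueCapacity),  // bounded work queue
                Executors.defaultThreadFactory(),
                new ThreadPoolExecutor.AbortPolicy());    // reject loudly when saturated
    }

    public static void main(String[] args) {
        ThreadPoolExecutor pool = newBoundedFixedPool(4, 100);
        System.out.println("max threads    = " + pool.getMaximumPoolSize());
        System.out.println("queue capacity = " + pool.getQueue().remainingCapacity());
        pool.shutdown();
    }
}
```

Once this pool is saturated, further submissions fail with a RejectedExecutionException instead of silently consuming memory.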
The structure of the ThreadPoolExecutor
Constructing a ThreadPoolExecutor takes a lot of arguments. Here is the constructor for ThreadPoolExecutor.
/**
* Creates a new {@code ThreadPoolExecutor} with the given initial
* parameters.
*
* @param corePoolSize the number of threads to keep in the pool, even
* if they are idle, unless {@code allowCoreThreadTimeOut} is set
* @param maximumPoolSize the maximum number of threads to allow in the
* pool
* @param keepAliveTime when the number of threads is greater than
* the core, this is the maximum time that excess idle threads
* will wait for new tasks before terminating.
* @param unit the time unit for the {@code keepAliveTime} argument
* @param workQueue the queue to use for holding tasks before they are
* executed. This queue will hold only the {@code Runnable}
* tasks submitted by the {@code execute} method.
* @param threadFactory the factory to use when the executor
* creates a new thread
* @param handler the handler to use when execution is blocked
* because the thread bounds and queue capacities are reached
* @throws IllegalArgumentException if one of the following holds:<br>
* {@code corePoolSize < 0}<br>
* {@code keepAliveTime < 0}<br>
* {@code maximumPoolSize <= 0}<br>
* {@code maximumPoolSize < corePoolSize}
* @throws NullPointerException if {@code workQueue}
* or {@code threadFactory} or {@code handler} is null
*/
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler) {
if (corePoolSize < 0 ||
maximumPoolSize <= 0 ||
maximumPoolSize < corePoolSize ||
keepAliveTime < 0)
throw new IllegalArgumentException();
if (workQueue == null || threadFactory == null || handler == null)
throw new NullPointerException();
this.acc = System.getSecurityManager() == null ?
null :
AccessController.getContext();
this.corePoolSize = corePoolSize;
this.maximumPoolSize = maximumPoolSize;
this.workQueue = workQueue;
this.keepAliveTime = unit.toNanos(keepAliveTime);
this.threadFactory = threadFactory;
this.handler = handler;
}
The following describes the specific meanings of these parameters.
Description of the ThreadPoolExecutor construction parameters
The explanations below draw on the JavaDoc above.
- corePoolSize
Number of core threads in the thread pool.
By default, core threads are always alive and are not subject to keepAliveTime limits even when idle, unless allowCoreThreadTimeOut is set to true.
- maximumPoolSize
The maximum number of threads the pool can hold. The pool never creates more threads than this; a task that arrives when the pool is already at maximumPoolSize and the workQueue is full is rejected.
maximumPoolSize cannot be smaller than corePoolSize.
- keepAliveTime
Idle timeout for non-core threads.
After this time, non-core threads are reclaimed.
- unit
The time unit (TimeUnit) of keepAliveTime, for example TimeUnit.SECONDS.
When allowCoreThreadTimeOut is true, keepAliveTime also applies to core threads.
- workQueue
A task queue in a thread pool.
Tasks that cannot be given a thread right away are placed in the workQueue, where they wait for a thread to become free. If the workQueue is full and the pool has already reached maximumPoolSize, the RejectedExecutionHandler policy is used to deal with the task.
Three queue types are commonly used: SynchronousQueue, LinkedBlockingDeque, and ArrayBlockingQueue.
- threadFactory
A thread factory that provides the ability to create new threads.
ThreadFactory is an interface that has only one newThread method:
Thread newThread(Runnable r);
- handler
A RejectedExecutionHandler for tasks that the thread pool cannot accept.
This usually happens because the workQueue is full and the pool is already at maximumPoolSize, or because the pool has been shut down.
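To illustrate the handler parameter, here is a minimal sketch with a custom RejectedExecutionHandler that only counts the tasks it is given. The class name CountingRejectionDemo and the sizes (one thread, a queue of one) are assumptions made for this example. A latch keeps the worker busy so the outcome does not depend on timing.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

final class CountingRejectionDemo {

    // Submits `tasks` blocking tasks to a pool with one thread and a queue of one,
    // and returns how many of them reached the rejection handler.
    static int countRejected(int tasks) {
        AtomicInteger rejected = new AtomicInteger();
        // The handler runs in the submitting thread, inside execute().
        RejectedExecutionHandler countingHandler =
                (task, executor) -> rejected.incrementAndGet();
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                1, 1, 0L, TimeUnit.MILLISECONDS,
                new ArrayBlockingQueue<>(1),
                Executors.defaultThreadFactory(),
                countingHandler);
        CountDownLatch release = new CountDownLatch(1);
        try {
            for (int i = 0; i < tasks; i++) {
                pool.execute(() -> {
                    try {
                        release.await();   // keep the worker thread occupied
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                    }
                });
            }
            return rejected.get();
        } finally {
            release.countDown();           // let the accepted tasks finish
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        // One task runs, one waits in the queue, the other three are rejected.
        System.out.println("rejected = " + countRejected(5));   // prints "rejected = 3"
    }
}
```

The JDK also ships ready-made policies as nested classes of ThreadPoolExecutor: AbortPolicy, CallerRunsPolicy, DiscardPolicy, and DiscardOldestPolicy.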
When a task is added to the thread pool
To summarize, when a task is added to the thread pool via the execute(Runnable) method:
- If the number of threads in the pool is less than corePoolSize, a new thread is created to handle the added task, even if existing threads in the pool are idle.
- If the number of threads in the pool has reached corePoolSize but the workQueue is not full, the task is put into the workQueue.
- If the number of threads in the pool has reached corePoolSize, the workQueue is full, and the number of threads is less than maximumPoolSize, a new thread is created to handle the added task.
- If the number of threads in the pool has reached corePoolSize, the workQueue is full, and the number of threads equals maximumPoolSize, the task is handled by the rejection policy specified by the handler.
In other words, resources are used in this order: core threads first, then the workQueue, then extra threads up to maximumPoolSize. When all three are exhausted, the handler deals with the rejected task.
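The order described above can be observed with a deliberately tiny pool. This is a sketch under assumed values (corePoolSize 1, maximumPoolSize 2, a queue of capacity 1), and SubmissionFlowDemo is a made-up class name. Every task blocks on a latch, so the pool state after each submission does not depend on timing.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

final class SubmissionFlowDemo {

    // Submits four blocking tasks and records the pool state after each one.
    static List<String> trace() {
        // This constructor overload uses the default thread factory and AbortPolicy.
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                1, 2, 30, TimeUnit.SECONDS, new ArrayBlockingQueue<>(1));
        CountDownLatch release = new CountDownLatch(1);
        Runnable blocked = () -> {
            try {
                release.await();   // stay busy until the trace is complete
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        };
        List<String> steps = new ArrayList<>();
        try {
            for (int i = 1; i <= 4; i++) {
                try {
                    pool.execute(blocked);
                    steps.add("task " + i + ": threads=" + pool.getPoolSize()
                            + ", queued=" + pool.getQueue().size());
                } catch (RejectedExecutionException e) {
                    steps.add("task " + i + ": rejected");
                }
            }
            return steps;
        } finally {
            release.countDown();   // let the accepted tasks finish
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        trace().forEach(System.out::println);
    }
}
```

Running it prints:

```
task 1: threads=1, queued=0
task 2: threads=1, queued=1
task 3: threads=2, queued=1
task 4: rejected
```

The second task goes to the queue rather than to a new thread, and the second thread appears only once the queue is full.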
The use of ThreadPoolExecutor
Here is a simple example of using a thread pool constructed by ThreadPoolExecutor to perform tasks.
ThreadPoolExample3
package net.ijiangtao.tech.concurrent.jsd.threadpool;

import java.time.LocalTime;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicInteger;

/**
 * @author ijiangtao.net
 */
public class ThreadPoolExample3 {

    private static final AtomicInteger threadNumber = new AtomicInteger(1);

    private static class Task implements Runnable {
        @Override
        public void run() {
            try {
                Thread.sleep(2000);
                System.out.println(Thread.currentThread().getName() + "-" + LocalTime.now());
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }

    private static class MyThreadFactory implements ThreadFactory {
        private final String namePrefix;

        public MyThreadFactory(String namePrefix) {
            this.namePrefix = namePrefix;
        }

        @Override
        public Thread newThread(Runnable runnable) {
            return new Thread(runnable, namePrefix + "-" + threadNumber.getAndIncrement());
        }
    }

    private static final ExecutorService executorService = new ThreadPoolExecutor(
            10, 20, 30, TimeUnit.SECONDS,
            new LinkedBlockingDeque<>(50),
            new MyThreadFactory("MyThreadFromPool"),
            new ThreadPoolExecutor.AbortPolicy());

    public static void main(String[] args) {
        // Create five tasks
        Task r1 = new Task();
        Task r2 = new Task();
        Task r3 = new Task();
        Task r4 = new Task();
        Task r5 = new Task();

        // The submit method returns a Future
        Future<?> future = executorService.submit(r1);
        System.out.println("r1 isDone ? " + future.isDone());

        // The execute method returns no value
        executorService.execute(r2);
        executorService.execute(r3);
        executorService.execute(r4);
        executorService.execute(r5);

        // Shut down the thread pool
        executorService.shutdown();
    }
}
The execution result
r1 isDone ? false
MyThreadFromPool-2-21:04:03.215
MyThreadFromPool-5-21:04:03.215
MyThreadFromPool-4-21:04:03.215
MyThreadFromPool-3-21:04:03.215
MyThreadFromPool-1-21:04:03.215
As the output shows, five threads were taken from the thread pool and the five tasks ran concurrently.
conclusion
This chapter introduces a safer and more customized way to build thread pools: ThreadPoolExecutor. Don’t use Executors to construct thread pools.
We’ll cover more ways to implement thread pools later (for example, using the Google core library Guava), as well as more knowledge and practice on thread pools.
Links
The author resources
- Java Concurrent thread pool series (1): a thread pool that makes multithreading no longer a problem
- Java Concurrent Future series: introduction and basic usage of Future
- Java Concurrent Thread Communication series: Wait and Notify
The related resources
- Concurrent-ThreadPool-threadpoolexecutor
- Concurrent-ThreadPool-ThreadPoolExecutorJavaDoc
- Concurrent-ThreadPool-java-thread-pool-executor-example