Author: Jiang Yi Jonny (Jianshu). If you reprint this article, please credit the original source. Thank you!
This article is based on an analysis of the JDK 1.7 source code. We start with a simple case study, and at the end the author shares some lessons to help readers avoid common pitfalls.
ThreadPoolExecutor is the thread pool implementation provided by JUC (java.util.concurrent), and it is one of the most widely used concurrency facilities in Java. Almost any scenario that needs asynchronous or concurrent execution can use a Java thread pool. First, let’s compare a hand-written solution to a problem with the ThreadPoolExecutor solution.
Case study: the scribe
In the Middle Ages there was a profession called the scribe. A scribe's job was to copy books by hand, much like a human photocopier. Suppose a scribes’ studio has only 2 scribes, and they have to copy 10 books.
In this example, we first manage the threads ourselves and then let ThreadPoolExecutor manage them.
// Requires: import java.util.concurrent.atomic.AtomicInteger;
public static class Book {
    private static AtomicInteger id = new AtomicInteger(0); // Title generator
    private String bookName; // Title

    public void copy() { // Copy the book
        System.out.println("start copy " + bookName);
        try {
            Thread.sleep(100L); // sleep 100ms
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // keep the interrupt visible to the caller
        }
        System.out.println("end copy " + bookName);
    }

    public Book() {
        bookName = "book-" + String.valueOf(id.incrementAndGet()); // The title is generated automatically
    }
}
Implement thread management yourself
final BlockingQueue<Book> books = new LinkedBlockingDeque<Book>(10);
for (int i = 0; i < 10; i++) {
    try {
        books.put(new Book());
    } catch (InterruptedException e) {
        // ignore
    }
}
System.out.println("start work...");
// Create two scribe threads
Thread[] scribes = new Thread[2];
for (int scribeIndex = 0; scribeIndex < 2; scribeIndex++) {
    scribes[scribeIndex] = new Thread(new Runnable() {
        public void run() {
            for (;;) {
                if (Thread.currentThread().isInterrupted()) {
                    System.out.println("time arrives, stop writing...");
                    return;
                }
                try {
                    Book currentBook = books.poll(5, TimeUnit.SECONDS);
                    if (currentBook != null) { // poll() returns null when it times out
                        currentBook.copy();
                    }
                } catch (InterruptedException e) {
                    System.out.println("time arrives, stop writing...");
                    return;
                }
            }
        }
    });
    scribes[scribeIndex].setDaemon(false); // set to non-daemon thread
    scribes[scribeIndex].start();
}
// The work has been arranged, so just wait
try {
    Thread.sleep(10000L);
} catch (InterruptedException e) {
    // ignore
}
// When time is up, remind the two scribes to stop copying
for (int scribeIndex = 0; scribeIndex < 2; scribeIndex++) {
    scribes[scribeIndex].interrupt();
}
System.out.println("end work...");
That took a lot of code. Now let’s see how ThreadPoolExecutor handles the same job.
System.out.println("start work...");
ExecutorService executorService = Executors.newFixedThreadPool(2);
for (int i = 0; i < 10; i++) {
    executorService.submit(new Runnable() {
        public void run() {
            new Book().copy();
        }
    });
}
// The work has been arranged, so just wait
try {
    Thread.sleep(10000L);
} catch (InterruptedException e) {
    // ignore
}
executorService.shutdownNow();
System.out.println("end work...");
The whole process is clear: define the tasks, create the threads, start them, and stop them.
In practice, however, the problem is rarely this simple.
Developer dilemma
Early concurrent programmers had to handle a lot of things themselves. A Java thread pool takes care of the following concerns for them:
1) Thread management, including creating, starting, and destroying threads;
2) Thread reuse: creating a thread has a cost, so the pool reduces the overhead of creating threads over and over again;
3) Elastic scaling: servers usually have peak and off-peak periods. Can the pool shrink and grow accordingly, for example by reclaiming threads that have been idle for a long time to avoid wasting system resources, or by increasing its capacity on demand?
4) Rejection strategy: the number of threads is limited while the number of tasks may be large. Should tasks beyond the system's capacity be rejected or should the submitter be blocked?
5) Exception handling: a thread may hit exceptions or errors during execution, and the developer needs a proper way to deal with them;
6) Task scheduling: whether tasks are handed out first-in, first-out or according to some priority strategy.
Let’s take a look at Doug Lea’s ThreadPoolExecutor thread pool framework and see how it solves these problems.
Source code for ThreadPoolExecutor
Before we get to the source code, a few important ThreadPoolExecutor concepts need to be introduced.
The life cycle
In the ThreadPoolExecutor design, the thread pool goes through five life-cycle states:
RUNNING: accepts new tasks and processes tasks in the queue
SHUTDOWN: accepts no new tasks, but still processes the tasks already in the queue
STOP: accepts no new tasks, stops processing queued tasks, and tries to interrupt the threads that are executing tasks
TIDYING: all tasks have terminated and workerCount is 0; the thread that moves the pool into TIDYING then runs the terminated() hook method
TERMINATED: the terminated() hook method has completed
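The transitions between the states are as follows: shutdown() moves the pool from RUNNING to SHUTDOWN; shutdownNow() moves it from RUNNING or SHUTDOWN to STOP; the pool moves from SHUTDOWN to TIDYING once both the queue and the pool are empty, and from STOP to TIDYING once the pool is empty; it moves from TIDYING to TERMINATED when terminated() completes.
As you can see, the state changes throughout the life cycle are irreversible.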
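The life cycle can be observed from outside the pool. The following is a minimal sketch rather than code from the JDK: the class name LifecycleDemo and the single-thread configuration are assumptions made for illustration, and it only relies on the public isShutdown(), awaitTermination(), and terminated() members.

```java
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical demo class; not part of the JDK.
class LifecycleDemo {
    /** Returns {running at first, shut down after shutdown(), terminated, hook ran}. */
    static boolean[] run() {
        final AtomicBoolean hookRan = new AtomicBoolean(false);
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>()) {
            @Override
            protected void terminated() { // called in TIDYING, before TERMINATED is set
                hookRan.set(true);
            }
        };
        boolean runningAtFirst = !pool.isShutdown(); // RUNNING
        pool.execute(new Runnable() {
            public void run() { /* a trivial task */ }
        });
        pool.shutdown();                              // RUNNING -> SHUTDOWN
        boolean shutdownAfterCall = pool.isShutdown();
        boolean terminated;
        try {
            // Queued work is drained first, then the pool reaches TIDYING and TERMINATED.
            terminated = pool.awaitTermination(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return new boolean[] { runningAtFirst, shutdownAfterCall, terminated, hookRan.get() };
    }

    public static void main(String[] args) {
        boolean[] r = run();
        System.out.println("running=" + r[0] + " shutdown=" + r[1]
                + " terminated=" + r[2] + " hookRan=" + r[3]);
    }
}
```

Because the transitions are one-way, a pool that has been shut down cannot be restarted; a new pool has to be created instead.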
Status word
ThreadPoolExecutor packs the pool's run state and its worker count into a single int variable (ctl): the high 3 bits hold the run state and the low 29 bits hold the number of worker threads, as shown below.
Thread pool state bits
State | High 3 bits | Value as a signed int |
---|---|---|
RUNNING | 111 | Negative (-536870912) |
SHUTDOWN | 000 | 0 |
STOP | 001 | Positive (536870912) |
TIDYING | 010 | Positive (1073741824) |
TERMINATED | 011 | Positive (1610612736) |
Compared as signed integers, the states are ordered TERMINATED > TIDYING > STOP > SHUTDOWN > RUNNING, so the pool can test its state with simple numeric comparisons.
The code in ThreadPoolExecutor looks like this:
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
private static final int COUNT_BITS = Integer.SIZE - 3;
private static final int CAPACITY = (1 << COUNT_BITS) - 1;
// The high bits of the status word hold thread pool status information
private static final int RUNNING = -1 << COUNT_BITS;
private static final int SHUTDOWN = 0 << COUNT_BITS;
private static final int STOP = 1 << COUNT_BITS;
private static final int TIDYING = 2 << COUNT_BITS;
private static final int TERMINATED = 3 << COUNT_BITS;
// Pack and unpack the status word
private static int runStateOf(int c) { return c & ~CAPACITY; }
private static int workerCountOf(int c) { return c & CAPACITY; }
private static int ctlOf(int rs, int wc) { return rs | wc; }
// Determine whether the current thread pool is executing
private static boolean isRunning(int c) {
return c < SHUTDOWN;
}
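To see the packing at work, the same bit arithmetic can be run outside the JDK. This is a small sketch under the assumption that the constants and helpers are copied into a standalone class (StatusWordDemo is a made-up name); the real fields are private to ThreadPoolExecutor.

```java
// Hypothetical standalone copy of the status-word arithmetic.
class StatusWordDemo {
    static final int COUNT_BITS = Integer.SIZE - 3;     // 29 low bits for the worker count
    static final int CAPACITY = (1 << COUNT_BITS) - 1;  // mask with the low 29 bits set
    static final int RUNNING = -1 << COUNT_BITS;        // high bits 111
    static final int SHUTDOWN = 0 << COUNT_BITS;        // high bits 000
    static final int STOP = 1 << COUNT_BITS;            // high bits 001

    static int ctlOf(int rs, int wc) { return rs | wc; }        // pack state and count
    static int runStateOf(int c) { return c & ~CAPACITY; }      // keep only the high 3 bits
    static int workerCountOf(int c) { return c & CAPACITY; }    // keep only the low 29 bits
    static boolean isRunning(int c) { return c < SHUTDOWN; }    // RUNNING is the only negative state

    public static void main(String[] args) {
        int c = ctlOf(RUNNING, 5); // a running pool with 5 workers
        System.out.println("state is RUNNING: " + (runStateOf(c) == RUNNING));
        System.out.println("worker count: " + workerCountOf(c));
        System.out.println("isRunning: " + isRunning(c));
        System.out.println("isRunning after STOP: " + isRunning(ctlOf(STOP, 5)));
    }
}
```

Keeping both values in one int lets the pool update the state and the count together with a single atomic compare-and-set.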
Main execution flow of the thread pool
Code walkthrough
First, we create a thread pool.
1. Create a thread pool
ExecutorService executorService = Executors.newFixedThreadPool(2);
The factory methods provided by Executors create the following four types of thread pool:
newFixedThreadPool. Creates a fixed-size thread pool (corePoolSize = maximumPoolSize). A new thread is created for each submitted task until the pool reaches its maximum size, after which the size of the pool no longer changes;
newCachedThreadPool. Creates a cacheable thread pool (corePoolSize = 0, maximumPoolSize = Integer.MAX_VALUE) whose idle threads are reclaimed after 60 seconds. The risk with this pool is that when a server application hits peak load, it keeps creating new threads until memory runs out;
newSingleThreadExecutor. Creates a single-threaded pool that executes tasks sequentially in the order imposed by its task queue (FIFO, LIFO, or priority order; the queue used by this factory method is FIFO);
newScheduledThreadPool. Creates a fixed-size thread pool that can run tasks after a delay or periodically.
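The parameters behind two of these factory methods can be printed at run time. The sketch below is an illustration, not part of the article's original code: FactoryConfigDemo and describe() are made-up names, and the cast to ThreadPoolExecutor relies on an implementation detail of the JDK (newFixedThreadPool and newCachedThreadPool return a plain ThreadPoolExecutor, while newSingleThreadExecutor returns a wrapper that cannot be cast this way).

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

// Hypothetical demo class; not part of the JDK.
class FactoryConfigDemo {
    /** Summarizes the constructor parameters a pool was built with. */
    static String describe(ThreadPoolExecutor p) {
        return "core=" + p.getCorePoolSize()
                + " max=" + p.getMaximumPoolSize()
                + " keepAlive=" + p.getKeepAliveTime(TimeUnit.SECONDS) + "s"
                + " queue=" + p.getQueue().getClass().getSimpleName();
    }

    static String fixed() {
        // Assumption: the returned ExecutorService is a ThreadPoolExecutor.
        ThreadPoolExecutor p = (ThreadPoolExecutor) Executors.newFixedThreadPool(2);
        try { return describe(p); } finally { p.shutdown(); }
    }

    static String cached() {
        ThreadPoolExecutor p = (ThreadPoolExecutor) Executors.newCachedThreadPool();
        try { return describe(p); } finally { p.shutdown(); }
    }

    public static void main(String[] args) {
        System.out.println("fixed:  " + fixed());
        System.out.println("cached: " + cached());
    }
}
```

The fixed pool pairs equal core and maximum sizes with an unbounded LinkedBlockingQueue, while the cached pool pairs a zero core size with a SynchronousQueue, which is why it creates a thread whenever no idle thread is waiting.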
2. Task submission
The general logic of task submission is as follows:
1) When the pool has fewer than corePoolSize threads, a newly submitted task causes a new thread to be created to run it, even if there are idle threads in the pool;
2) Once the pool has reached corePoolSize, a newly submitted task is put into the workQueue, where it waits to be picked up by a pool thread;
3) If the workQueue is full and maximumPoolSize > corePoolSize, a new thread is created to run the newly submitted task;
4) If the workQueue is full and the pool already has maximumPoolSize threads, the new task is handed to the RejectedExecutionHandler;
5) When the pool has more than corePoolSize threads and a thread has been idle for keepAliveTime, the idle thread is terminated.
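The first four rules above can be observed step by step. This is a minimal sketch under stated assumptions: SubmissionRulesDemo is a made-up class, and the configuration (1 core thread, 2 maximum threads, a queue of capacity 1, tasks that block on a latch) is chosen only so that each rule becomes visible in turn.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

// Hypothetical demo class; not part of the JDK.
class SubmissionRulesDemo {
    static List<String> run() {
        final CountDownLatch release = new CountDownLatch(1);
        Runnable blocker = new Runnable() { // keeps a worker busy until released
            public void run() {
                try {
                    release.await();
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            }
        };
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                1, 2, 1L, TimeUnit.SECONDS, new ArrayBlockingQueue<Runnable>(1));
        List<String> log = new ArrayList<String>();
        try {
            pool.execute(blocker); // rule 1: below corePoolSize, start a core thread
            log.add(snapshot(pool));
            pool.execute(blocker); // rule 2: core threads busy, task goes to the queue
            log.add(snapshot(pool));
            pool.execute(blocker); // rule 3: queue full, start a thread beyond corePoolSize
            log.add(snapshot(pool));
            try {
                pool.execute(blocker); // rule 4: queue full and pool at maximumPoolSize
                log.add("accepted");
            } catch (RejectedExecutionException e) {
                log.add("rejected"); // the default AbortPolicy throws
            }
        } finally {
            release.countDown();
            pool.shutdown();
        }
        return log;
    }

    static String snapshot(ThreadPoolExecutor pool) {
        return "threads=" + pool.getPoolSize() + " queued=" + pool.getQueue().size();
    }

    public static void main(String[] args) {
        for (String line : run()) {
            System.out.println(line);
        }
    }
}
```

Note the order: the queue is tried before extra threads are created, which is the opposite of what many people expect.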
So how does the source code implement the behavior described above?
After the thread pool is created successfully, we submit the task to the thread pool:
executorService.submit(new Runnable() {
    public void run() {
        new Book().copy();
    }
});
submit() wraps the task and passes it to execute() (the overload that also takes a result value is shown here):
public <T> Future<T> submit(Runnable task, T result) {
if (task == null) throw new NullPointerException();
RunnableFuture<T> ftask = newTaskFor(task, result);// Wrap a new task
execute(ftask); // Thread pool entry
return ftask;
}
You can see that the real entry point of ThreadPoolExecutor is execute(Runnable command). Its logic is as follows:
int c = ctl.get();
// 1. If the number of threads in the current thread pool is less than the number of core threads, add new threads to the thread pool.
// And the new thread executes the task just submitted
if (workerCountOf(c) < corePoolSize) {
if (addWorker(command, true))
return;
c = ctl.get();
}
// 2. Otherwise, if the pool is still running, try to put the task in the work queue.
if (isRunning(c) && workQueue.offer(command)) {
    int recheck = ctl.get();
    // Recheck after queuing: the pool may have been shut down in the meantime.
    // If so, remove the task that was just queued and reject it.
    if (!isRunning(recheck) && remove(command))
        reject(command);
    // If no worker thread is left, start one so the queued task is not stranded.
    else if (workerCountOf(recheck) == 0)
        addWorker(null, false);
}
// 3. If the core threads are all in use and the task queue is full,
// try to add a thread beyond corePoolSize (up to maximumPoolSize); if that also fails, apply the rejection policy
else if (!addWorker(command, false))
    reject(command);
ctl.get(), workerCountOf(), and isRunning() are all used to read the status word.
Next, let’s look at what addWorker does:
private boolean addWorker(Runnable firstTask, boolean core) {
    // The code omitted here updates the status word: before a thread is created and added,
    // the worker count must be incremented (a thread-safe operation).
    boolean workerStarted = false;
    boolean workerAdded = false;
    Worker w = null;
    try {
        ...
        w = new Worker(firstTask); // Wrap the task in a new Worker, which we'll cover later
        final Thread t = w.thread;
        if (t != null) {
            ...
            // Read the pool state; if the pool has already been shut down, no new thread is started
            int c = ctl.get();
            int rs = runStateOf(c);
            if (rs < SHUTDOWN ||
                (rs == SHUTDOWN && firstTask == null)) {
                if (t.isAlive()) // precheck that t is startable
                    throw new IllegalThreadStateException();
                workers.add(w);
                int s = workers.size();
                ...
                workerAdded = true;
            }
            ...
            if (workerAdded) {
                t.start();
                workerStarted = true;
            }
        }
    } finally {
        if (!workerStarted)
            // If the thread could not be started or added to the pool, roll back:
            // remove the failed worker from the worker set and decrement the worker count in the status word
            addWorkerFailed(w);
    }
    return workerStarted;
}
3. Task execution
Tasks are executed by the Worker class, which implements the Runnable interface.
private final class Worker
    extends AbstractQueuedSynchronizer
    implements Runnable
{
    ...
    public void run() {
        runWorker(this);
    }
    ...
}
As you can see, Worker.run() simply delegates to runWorker(), a method of the enclosing ThreadPoolExecutor, so the main task-execution logic lives there:
final void runWorker(Worker w) {
    Thread wt = Thread.currentThread();
    Runnable task = w.firstTask;
    w.firstTask = null;
    ...
    boolean completedAbruptly = true;
    try {
        while (task != null || (task = getTask()) != null) { // Keep reading tasks in a loop
            ...
            try {
                beforeExecute(wt, task); // User-implemented callback before the task starts
                Throwable thrown = null;
                try {
                    task.run(); // Task execution
                } catch (RuntimeException x) {
                    thrown = x; throw x;
                } catch (Error x) {
                    thrown = x; throw x;
                } catch (Throwable x) {
                    thrown = x; throw new Error(x);
                } finally {
                    afterExecute(task, thrown); // User-implemented callback after the task finishes
                }
            } finally {
                task = null;
                w.completedTasks++;
                ...
            }
        }
        completedAbruptly = false;
    } finally {
        processWorkerExit(w, completedAbruptly);
    }
}
beforeExecute and afterExecute are hook methods that run before each task starts and after it completes. They do nothing by default; developers override them in a subclass to specify the actions they need.
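As an illustration of the two hooks, a subclass can count how many tasks started and finished. This is a sketch rather than a recommended design: CountingPool and its counters are hypothetical names, and a real subclass would more likely record timing or logging data.

```java
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical subclass used only to show the hook methods.
class CountingPool extends ThreadPoolExecutor {
    final AtomicInteger started = new AtomicInteger();
    final AtomicInteger finished = new AtomicInteger();

    CountingPool(int threads) {
        super(threads, threads, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>());
    }

    @Override
    protected void beforeExecute(Thread t, Runnable r) {
        super.beforeExecute(t, r);
        started.incrementAndGet(); // runs in the worker thread, just before r.run()
    }

    @Override
    protected void afterExecute(Runnable r, Throwable t) {
        super.afterExecute(r, t);
        finished.incrementAndGet(); // runs in the worker thread, after r.run() returns or throws
        if (t != null) {
            System.out.println("task failed: " + t);
        }
    }

    /** Runs three trivial tasks and returns {started, finished}. */
    static int[] demo() {
        CountingPool pool = new CountingPool(2);
        for (int i = 0; i < 3; i++) {
            pool.execute(new Runnable() {
                public void run() { /* a trivial task */ }
            });
        }
        pool.shutdown();
        try {
            // Wait for termination so that every afterExecute() call has finished.
            pool.awaitTermination(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return new int[] { pool.started.get(), pool.finished.get() };
    }

    public static void main(String[] args) {
        int[] r = demo();
        System.out.println("started=" + r[0] + " finished=" + r[1]);
    }
}
```

Both hooks run in the worker thread that executes the task, so they must be thread-safe when the pool has more than one thread.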
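Also note the getTask() method called inside runWorker(). It returns null, which ends the worker thread’s execution loop, in any of the following cases:
1) The pool is in the STOP state or later;
2) The pool is in the SHUTDOWN state and the work queue is empty;
3) The worker count exceeds maximumPoolSize;
4) The worker is subject to timeout (the worker count is greater than corePoolSize, or allowCoreThreadTimeOut is set) and it has waited keepAliveTime for a task from the BlockingQueue without getting one (keepAliveTime is 60 seconds for newCachedThreadPool).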
The code implementation is as follows:
private Runnable getTask() {
    boolean timedOut = false; // Did the last poll() time out?
    retry:
    for (;;) {
        int c = ctl.get();
        int rs = runStateOf(c);
        // Check the pool state: hand out no more tasks once the pool is stopped, or shut down with an empty queue
        if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
            decrementWorkerCount();
            return null;
        }
        boolean timed; // Are workers subject to culling?
        for (;;) {
            int wc = workerCountOf(c);
            timed = allowCoreThreadTimeOut || wc > corePoolSize;
            if (wc <= maximumPoolSize && !(timedOut && timed))
                break;
            if (compareAndDecrementWorkerCount(c))
                return null;
            c = ctl.get(); // Re-read ctl
            if (runStateOf(c) != rs)
                continue retry;
            // else CAS failed due to workerCount change; retry inner loop
        }
        try {
            // A timed worker waits at most keepAliveTime for a task; if none arrives, it is marked as timed out
            Runnable r = timed ?
                workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
                workQueue.take();
            if (r != null)
                return r;
            timedOut = true;
        } catch (InterruptedException retry) {
            timedOut = false;
        }
    }
}
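The timeout branch of getTask() is what lets a pool shrink back to corePoolSize. The sketch below shows the effect from the outside; KeepAliveDemo, the 200 ms keepAliveTime, and the SynchronousQueue (used so that every submission forces a new thread) are assumptions chosen for the demonstration.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

// Hypothetical demo class; not part of the JDK.
class KeepAliveDemo {
    /** Returns {pool size at peak, pool size after the idle threads timed out}. */
    static int[] run() {
        final CountDownLatch release = new CountDownLatch(1);
        Runnable blocker = new Runnable() {
            public void run() {
                try {
                    release.await();
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            }
        };
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                1, 3, 200L, TimeUnit.MILLISECONDS, new SynchronousQueue<Runnable>());
        try {
            for (int i = 0; i < 3; i++) {
                pool.execute(blocker); // no idle worker is waiting, so each task starts a thread
            }
            int peak = pool.getPoolSize();
            release.countDown(); // let all three tasks finish

            // Idle workers above corePoolSize give up after keepAliveTime in getTask().
            long deadline = System.nanoTime() + TimeUnit.SECONDS.toNanos(5);
            while (pool.getPoolSize() > 1 && System.nanoTime() < deadline) {
                Thread.sleep(50L);
            }
            return new int[] { peak, pool.getPoolSize() };
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        int[] r = run();
        System.out.println("peak=" + r[0] + " afterIdle=" + r[1]);
    }
}
```

The remaining core thread blocks in take() indefinitely unless allowCoreThreadTimeOut(true) has been called on the pool.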
4. Task rejection
When a task is submitted to the pool and either the pool has been shut down (shutdown() or shutdownNow() was called) or the pool is saturated (the queue is full and maximumPoolSize threads already exist), the configured rejection policy is invoked. ThreadPoolExecutor provides four built-in policies:
1) AbortPolicy (abort): throws a RejectedExecutionException directly, which the caller receives;
2) DiscardPolicy (discard): the pool silently drops the task without the caller knowing;
3) CallerRunsPolicy (caller runs): neither drops the task nor throws an exception. Instead, the task is run in the thread that called execute(), which slows down the flow of new tasks;
4) DiscardOldestPolicy (discard oldest): drops the task that would be executed next and then retries submitting the new task. With a priority queue this drops the highest-priority task, so it is best not to combine DiscardOldestPolicy with a priority queue.
Here we only show the implementation of the **CallerRunsPolicy (caller runs)** policy:
public static class CallerRunsPolicy implements RejectedExecutionHandler {
    /**
     * Creates a {@code CallerRunsPolicy}.
     */
    public CallerRunsPolicy() {}

    // Policy implementation: run the task in the caller's thread unless the pool has been shut down
    public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
        if (!e.isShutdown()) {
            r.run();
        }
    }
}
Developers can also choose to define their own saturation strategy based on business needs.
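A custom policy only needs to implement RejectedExecutionHandler. The following sketch counts and logs rejected tasks; CountingRejectionHandler is a hypothetical name, and the tiny pool configuration exists only to force one rejection.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical custom policy: record the rejection instead of throwing.
class CountingRejectionHandler implements RejectedExecutionHandler {
    final AtomicInteger rejected = new AtomicInteger();

    public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
        rejected.incrementAndGet();
        System.out.println("rejected " + r + " (pool shut down: " + e.isShutdown() + ")");
    }

    /** Saturates a 1-thread pool with a 1-slot queue and returns the rejection count. */
    static int demo() {
        final CountDownLatch release = new CountDownLatch(1);
        Runnable blocker = new Runnable() {
            public void run() {
                try {
                    release.await();
                } catch (InterruptedException ex) {
                    Thread.currentThread().interrupt();
                }
            }
        };
        CountingRejectionHandler handler = new CountingRejectionHandler();
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                1, 1, 0L, TimeUnit.MILLISECONDS, new ArrayBlockingQueue<Runnable>(1), handler);
        try {
            pool.execute(blocker); // occupies the only thread
            pool.execute(blocker); // fills the only queue slot
            pool.execute(blocker); // saturated: handed to the handler
        } finally {
            release.countDown();
            pool.shutdown();
        }
        return handler.rejected.get();
    }

    public static void main(String[] args) {
        System.out.println("rejected count = " + demo());
    }
}
```

The handler runs in the thread that called execute(), so anything slow done here delays the submitter.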
5. Thread pool destruction
ThreadPoolExecutor provides two methods for destroying thread pools: shutdown() and shutdownNow()
The shutdown() method simply sets the state of the thread pool to SHUTDOWN and rejects all subsequent submissions, but tasks already in the queue are still processed as normal.
The shutdownNow() method is much more abrupt. It forcibly shuts down the ExecutorService, tries to cancel the tasks that are currently executing, and returns all tasks that were submitted but not yet started, which the developer can log for later processing. Note that cancelling an executing task only means trying to interrupt its thread; how the thread responds to the interrupt is up to the code the user writes. The implementation is as follows:
public List<Runnable> shutdownNow() {
List<Runnable> tasks;
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
checkShutdownAccess();
advanceRunState(STOP);
interruptWorkers();
tasks = drainQueue();
} finally {
mainLock.unlock();
}
tryTerminate();
return tasks;
}
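The effect of shutdownNow() on running and queued tasks can be checked with a small experiment. This is a sketch under assumptions: ShutdownNowDemo is a made-up class, and a single-thread pool is used so that exactly one task is running and two are waiting when the pool is stopped.

```java
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical demo class; not part of the JDK.
class ShutdownNowDemo {
    /** Returns {number of tasks handed back, 1 if the running task was interrupted, 1 if terminated}. */
    static int[] run() {
        final CountDownLatch started = new CountDownLatch(1);
        final AtomicBoolean interrupted = new AtomicBoolean(false);
        ExecutorService pool = Executors.newFixedThreadPool(1);
        pool.execute(new Runnable() {
            public void run() {
                started.countDown();
                try {
                    Thread.sleep(60000L); // a long task that responds to interruption
                } catch (InterruptedException e) {
                    interrupted.set(true);
                }
            }
        });
        Runnable queued = new Runnable() {
            public void run() { /* never runs in this demo */ }
        };
        pool.execute(queued);
        pool.execute(queued);
        try {
            started.await();                            // make sure the first task is running
            List<Runnable> pending = pool.shutdownNow(); // interrupt workers, drain the queue
            boolean done = pool.awaitTermination(5, TimeUnit.SECONDS);
            return new int[] { pending.size(), interrupted.get() ? 1 : 0, done ? 1 : 0 };
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        int[] r = run();
        System.out.println("returned=" + r[0] + " interrupted=" + r[1] + " terminated=" + r[2]);
    }
}
```

With shutdown() instead of shutdownNow(), the two queued tasks would still run and the sleeping task would not be interrupted.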
Tread carefully: lessons from thread pool experience
Do not use ThreadLocal
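Avoid ThreadLocal inside a ThreadPoolExecutor where you can. Pool threads are reused, so they are shared by many tasks, and a value that one task leaves in a ThreadLocal is still visible to the next task that runs on the same thread. This can leak stale (dirty) data between tasks. If you must use ThreadLocal, use it with care.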
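The leak is easy to reproduce with a single-thread pool, where every task is guaranteed to run on the same thread. This is a minimal sketch: ThreadLocalLeakDemo and CURRENT_USER are hypothetical names, and clearing the value in a finally block is one common mitigation rather than the only one.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Hypothetical demo class; not part of the JDK.
class ThreadLocalLeakDemo {
    static final ThreadLocal<String> CURRENT_USER = new ThreadLocal<String>();

    /** Returns {value seen by a later task, value seen after a task that cleaned up}. */
    static String[] run() {
        ExecutorService pool = Executors.newFixedThreadPool(1);
        Callable<String> reader = new Callable<String>() {
            public String call() {
                return CURRENT_USER.get(); // this task never set a value itself
            }
        };
        try {
            // Task A sets a value and forgets to clear it.
            pool.submit(new Runnable() {
                public void run() {
                    CURRENT_USER.set("user-A");
                }
            }).get();
            String leaked = pool.submit(reader).get(); // sees the value left by task A

            // Task C sets a value but removes it when it is done.
            pool.submit(new Runnable() {
                public void run() {
                    CURRENT_USER.set("user-C");
                    try {
                        // ... work that reads CURRENT_USER ...
                    } finally {
                        CURRENT_USER.remove();
                    }
                }
            }).get();
            String afterCleanup = pool.submit(reader).get(); // nothing left behind
            return new String[] { leaked, afterCleanup };
        } catch (Exception e) {
            throw new IllegalStateException(e);
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        String[] r = run();
        System.out.println("leaked=" + r[0] + " afterCleanup=" + r[1]);
    }
}
```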
Set the value of corePoolSize appropriately
Take a piece of code as an example:
// An unbounded LinkedBlockingQueue with at most 10 threads
private static final LinkedBlockingQueue<Runnable> queue = new LinkedBlockingQueue<Runnable>();
private static final ExecutorService service = new ThreadPoolExecutor(0, 10, 60L, TimeUnit.SECONDS, queue);
You might expect that when more than 10 tasks are submitted, maximumPoolSize threads are created first and the remaining tasks wait in the queue.
However, in the implementation of ThreadPoolExecutor, a new thread beyond corePoolSize is created for a submitted task only when the workQueue is full and maximumPoolSize > corePoolSize.
Here the queue is unbounded, so it is never full and the pool never grows to maximumPoolSize threads. All of our tasks end up running on a single thread instead of the number of threads we wanted.
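This behavior can be confirmed by comparing the largest pool size reached for two values of corePoolSize. The sketch below assumes a made-up class CorePoolSizeDemo and uses latch-blocked tasks so that no thread becomes free while the 20 tasks are being submitted.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

// Hypothetical demo class; not part of the JDK.
class CorePoolSizeDemo {
    /** Submits 20 blocking tasks and returns the largest number of threads the pool reached. */
    static int largestPoolSize(int corePoolSize) {
        final CountDownLatch release = new CountDownLatch(1);
        Runnable blocker = new Runnable() {
            public void run() {
                try {
                    release.await();
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            }
        };
        // Same shape as the article's example: maximumPoolSize 10, unbounded queue.
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                corePoolSize, 10, 60L, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>());
        try {
            for (int i = 0; i < 20; i++) {
                pool.execute(blocker);
            }
            return pool.getLargestPoolSize();
        } finally {
            release.countDown();
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println("corePoolSize=0  -> threads used: " + largestPoolSize(0));
        System.out.println("corePoolSize=10 -> threads used: " + largestPoolSize(10));
    }
}
```

With an unbounded queue, corePoolSize is effectively the real thread limit, so it should be set to the degree of parallelism that is actually wanted.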
Thread interruption
ThreadPoolExecutor provides the shutdownNow() method, which attempts to interrupt all worker threads, but an interrupt does not guarantee that a thread will terminate, so it is up to the developer to implement an interruption policy in the task code. This topic is covered in Section 7.1.2 of Java Concurrency in Practice (co-authored by Doug Lea), so I won’t repeat it here.
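For tasks that never call a blocking method, cooperating with interruption means checking the interrupt flag explicitly. The following is a sketch of one such policy, not a prescription: InterruptibleTaskDemo is a hypothetical class, and the busy loop stands in for real computation.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// Hypothetical demo class; not part of the JDK.
class InterruptibleTaskDemo {
    /** Returns true if the pool terminated after shutdownNow(). */
    static boolean run() {
        final CountDownLatch started = new CountDownLatch(1);
        ExecutorService pool = Executors.newSingleThreadExecutor();
        pool.execute(new Runnable() {
            public void run() {
                started.countDown();
                long iterations = 0;
                // Check the interrupt flag on every pass so shutdownNow() can stop the loop.
                while (!Thread.currentThread().isInterrupted()) {
                    iterations++;
                }
                System.out.println("stopped after " + iterations + " iterations");
            }
        });
        try {
            started.await();     // the task is now running
            pool.shutdownNow();  // sets the interrupt flag of the worker thread
            return pool.awaitTermination(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println("terminated=" + run());
    }
}
```

If the loop condition were simply true, the pool would never terminate, because nothing else can force the worker thread to stop.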
The finalize method
In particular, ThreadPoolExecutor (in the JDK 1.7 source analyzed here) has a finalize() method, implemented as follows:
protected void finalize() {
    shutdown();
}
finalize() calls shutdown(), so if you don’t actually want the pool to stop, keep a reachable reference to it instead of letting it go out of scope and become eligible for garbage collection.
I’m Jonny Jiang yi.