The introduction
In daily development work, thread pool often carries the most important business logic in an application, so we need to pay more attention to the execution of thread pool, including exception handling and analysis. This article focuses on how to use thread pools correctly and provides some practical advice. This article will cover a bit of thread pool implementation principles, but will not expand too much. There are many articles on the principle of thread pool and source code analysis on the network, interested students can consult by themselves.
Exception handling for thread pools
UncaughtExceptionHandler
We all know that the run method in the Runnable interface is not allowed to throw exceptions, so the main thread that spawned the thread may not be able to directly obtain the exception information during execution. The following cases:
public static void main(String[] args) throws Exception { Thread thread = new Thread(() -> { Uninterruptibles.sleepUninterruptibly(2, TimeUnit.SECONDS); System.out.println(1 / 0); // This line will cause an error! }); thread.setUncaughtExceptionHandler((t, e) -> { e.printStackTrace(); // If you comment out this line, the program will not throw any exceptions. thread.start(); }Copy the code
Why is that? If the current Thread does not have UncaughtExceptionHandler, the ThreadGroup will fetch the UncaughtExceptionHandler from the ThreadGroup. ** Each thread has its own ThreadGroup, even if you don’t specify it, and it implements UncaughtExceptionHandler.
public void uncaughtException(Thread t, Throwable e) { if (parent ! = null) { parent.uncaughtException(t, e); } else { Thread.UncaughtExceptionHandler ueh = Thread.getDefaultUncaughtExceptionHandler(); if (ueh ! = null) { ueh.uncaughtException(t, e); } else if (! (e instanceof ThreadDeath)) { System.err.print("Exception in thread \"" + t.getName() + "\" "); e.printStackTrace(System.err); }}}Copy the code
The ThreadGroup if a father ThreadGroup, call father ThreadGroup uncaughtException, or call the global default Thread. The DefaultUncaughtExceptionHandler, If the global handler is not set, we simply locate the exception to System.err. This is why we should implement the UncaughtExceptionHandler interface when creating the thread to make troubleshooting easier.
Submit tasks to the thread pool using execute
Getting back to thread pools, if we submit a task to a thread pool without a try for an exception… Catch handler, and an exception occurs when it runs, what does that do to the thread pool? The answer is no, the thread pool still works, but the exception is swallowed. This is usually not a good thing because we need to get the original exception object to analyze the problem.
So how do I get the original exception object? Let’s start with the source code for thread pools. Of course, there are many online source code analysis articles about thread pool, here is limited to space, directly give the most relevant part of the code:
final void runWorker(Worker w) { Thread wt = Thread.currentThread(); Runnable task = w.firstTask; w.firstTask = null; w.unlock(); // allow interrupts boolean completedAbruptly = true; try { while (task ! = null || (task = getTask()) ! = null) { w.lock(); // If pool is stopping, ensure thread is interrupted; // if not, ensure thread is not interrupted. This // requires a recheck in second case to deal with // shutdownNow race while clearing interrupt if ((runStateAtLeast(ctl.get(), STOP) || (Thread.interrupted() && runStateAtLeast(ctl.get(), STOP))) && ! wt.isInterrupted()) wt.interrupt(); try { beforeExecute(wt, task); Throwable thrown = null; try { task.run(); } catch (RuntimeException x) { thrown = x; throw x; } catch (Error x) { thrown = x; throw x; } catch (Throwable x) { thrown = x; throw new Error(x); } finally { afterExecute(task, thrown); } } finally { task = null; w.completedTasks++; w.unlock(); } } completedAbruptly = false; } finally { processWorkerExit(w, completedAbruptly); }}Copy the code
This method is the code that actually executes the task submitted to the thread pool. Let’s skip the irrelevant logic here and focus on lines 19 through 32, where line 23 actually starts executing the task submitted to the thread pool, so what does line 20 do? This means that you can do some pre-work before executing the task submitted to the thread pool, and again, as we see in line 31, you can do some post-work after executing the submitted task. BeforeExecute we’ll leave this out for the moment and focus on afterExecute. As you can see, afterExecute is submitted whenever any type of exception is thrown during the execution of a task. However, looking at the thread pool source code, we can see that afterExecute is an empty implementation by default, so, It is necessary to inherit from ThreadPoolExecutor to implement the afterExecute method. AfterExecute (afterExecute) : afterExecute (afterExecute) : afterExecute (afterExecute) : afterExecute (afterExecute) : afterExecute (afterExecute) : afterExecute (afterExecute) : afterExecute (afterExecute)
* <pre> {@code * class ExtendedExecutor extends ThreadPoolExecutor { * // ... * protected void afterExecute(Runnable r, Throwable t) { * super.afterExecute(r, t); * if (t == null && r instanceof Future<? >) { * try { * Object result = ((Future<? >) r).get(); * } catch (CancellationException ce) { * t = ce; * } catch (ExecutionException ee) { * t = ee.getCause(); * } catch (InterruptedException ie) { * Thread.currentThread().interrupt(); // ignore/reset * } * } * if (t ! = null) * System.out.println(t); * } * }}</pre>Copy the code
In this way, exceptions that might otherwise have been swallowed by the thread pool can be successfully caught, making it easy to troubleshoot problems.
But there is a small problem here. We notice that in the runWorker method, task.run() is executed; After the statement, all types of exceptions are thrown. Where do those exceptions go? In fact the exception object here will ultimately be passed to the Thread dispatchUncaughtException method, the source code is as follows:
private void dispatchUncaughtException(Throwable e) {
getUncaughtExceptionHandler().uncaughtException(this, e);
}
Copy the code
As you can see, it will fetch the implementation class of UncaughtExceptionHandler and then call the uncaughtException method. This goes back to the logic of the implementation of UncaughtExceptionHandler as we discussed in the previous section. AfterExecute (UncaughtExceptionHandler) {afterExecute (exceptionHandler) {afterExecute (exceptionHandler);
Submit the task to the thread pool via Submit
This is also very simple, let’s return to the source of the submit method:
public <T> Future<T> submit(Callable<T> task) {
if (task == null) throw new NullPointerException();
RunnableFuture<T> ftask = newTaskFor(task);
execute(ftask);
return ftask;
}
Copy the code
The execute method invokes the Execute method of ThreadPoolExecutor, which performs the same logic as submitting tasks to the thread pool through execute. Let’s focus on the newTaskFor method here, which has the following source code:
protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) {
return new FutureTask<T>(callable);
}
Copy the code
You can see that the submitted Callable object is wrapped in FutureTask. We know that the runWorker method will be executed, and the core execution logic is task.run(); This line of code. We know that the task here is of type FutureTask, so we need to look at the implementation of the Run method in FutureTask:
public void run() { if (state ! = NEW || ! UNSAFE.compareAndSwapObject(this, runnerOffset, null, Thread.currentThread())) return; try { Callable<V> c = callable; if (c ! = null && state == NEW) { V result; boolean ran; try { result = c.call(); ran = true; } catch (Throwable ex) { result = null; ran = false; setException(ex); } if (ran) set(result); } } finally { // runner must be non-null until state is settled to // prevent concurrent calls to run() runner = null; // state must be re-read after nulling runner to prevent // leaked interrupts int s = state; if (s >= INTERRUPTING) handlePossibleCancellationInterrupt(s); }}Copy the code
You can see that the most critical exception code is on line 17, setException(ex); This place. Let’s look at the implementation of this place:
protected void setException(Throwable t) { if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) { outcome = t; UNSAFE.putOrderedInt(this, stateOffset, EXCEPTIONAL); // final state finishCompletion(); }}Copy the code
The most critical point here is that the exception object is assigned to the outcome, which is a member variable in FutureTask. We call the Submit method, get the Future object, and then call its GET method, among which the most core method is the report method. The source code for each method is given below:
The first is the get method:
public V get() throws InterruptedException, ExecutionException {
int s = state;
if (s <= COMPLETING)
s = awaitDone(false, 0L);
return report(s);
}
Copy the code
You can see that the report method is finally called, and the source code is as follows:
private V report(int s) throws ExecutionException {
Object x = outcome;
if (s == NORMAL)
return (V)x;
if (s >= CANCELLED)
throw new CancellationException();
throw new ExecutionException((Throwable)x);
}
Copy the code
The above are some state judgments. If the current task is not properly executed or cancelled, then the x here is the original exception object, as you can see, wrapped by ExecutionException. So when you call get, you might throw an ExecutionException, and then call its getCause method to get the original exception object.
To sum up, there are two main approaches to the problem that tasks submitted to the thread pool may throw exceptions:
- Try the submitted task yourself… Catch, but the downside is that if you are submitting multiple types of tasks to a thread pool, each type of task will need to make its own try… Catch, more cumbersome. And if you just
catch(Exception e)
You may still miss some exceptions that include errors, so just to be on the safe side, consider thiscatch(Throwable t)
. - AfterExecute or UncaughtExceptionHandler interface for Thread pools.
Here is an example of how I created a thread pool for your reference:
BlockingQueue<Runnable> queue = new ArrayBlockingQueue<>(DEFAULT_QUEUE_SIZE); statisticsThreadPool = new ThreadPoolExecutor(DEFAULT_CORE_POOL_SIZE, DEFAULT_MAX_POOL_SIZE, 60, TimeUnit.SECONDS, queue, new ThreadFactoryBuilder() .setThreadFactory(new ThreadFactory() { private int count = 0; private String prefix = "StatisticsTask"; @Override public Thread newThread(Runnable r) { return new Thread(r, prefix + "-" + count++); } }).setUncaughtExceptionHandler((t, e) -> { String threadName = t.getName(); logger.error("statisticsThreadPool error occurred! threadName: {}, error msg: {}", threadName, e.getMessage(), e); }).build(), (r, executor) -> { if (! executor.isShutdown()) { logger.warn("statisticsThreadPool is too busy! waiting to insert task to queue! "); Uninterruptibles.putUninterruptibly(executor.getQueue(), r); } }) { @Override protected void afterExecute(Runnable r, Throwable t) { super.afterExecute(r, t); if (t == null && r instanceof Future<? >) { try { Future<? > future = (Future<? >) r; future.get(); } catch (CancellationException ce) { t = ce; } catch (ExecutionException ee) { t = ee.getCause(); } catch (InterruptedException ie) { Thread.currentThread().interrupt(); // ignore/reset } } if (t ! = null) { logger.error("statisticsThreadPool error msg: {}", t.getMessage(), t); }}}; statisticsThreadPool.prestartAllCoreThreads();Copy the code
Thread count setting
We know that tasks generally come in two types: CPU intensive and IO intensive. Therefore, in the face of CPU-intensive tasks, the number of threads should not be too much. Generally, the number of CPU cores +1 or 2 times of the number of cores is a reasonable value. Therefore, we can consider setting corePoolSize to the number of CPU cores +1 and maxPoolSize to twice the number of cores. Similarly, in the face of IO intensive tasks, we can consider multiplying the number of cores by 4 as the number of core threads, and then multiplying the number of cores by 5 as the maximum number of threads to set the number of threads, which would be more reasonable than directly setting a value.
Of course, the total number of threads should not be too much, it is reasonable to control within 100 threads, otherwise too many threads may lead to frequent switching between the ground and the context, resulting in the system performance is worse than before.
How do I properly close a thread pool
When it comes to properly shutting down a thread pool, there is a bit of finesse involved. To achieve the goal of a graceful shutdown, we should first call the shutdown method, which means that the thread pool will not receive any new tasks, but that committed tasks will continue to execute, including those in the queue. So, you should then call the awaitTermination method, which sets the maximum timeout for the thread pool before closing. This method returns true if the thread pool is closed by the end of the timeout, and false if it is not. In general, we cannot wait indefinitely, so we need to estimate a reasonable timeout in advance and use this method.
If the awaitTermination method returns false and you want to do more reclamation as soon as possible after the thread pool is closed, you can consider calling shutdownNow again, where all unprocessed tasks in the queue are discarded and the interrupt flag bit is set for each thread in the pool. ShutdownNow does not guarantee that a running thread can be stopped unless the task submitted to the thread responds correctly to the interruption. At this point, you can consider continuing to call the awaitTermination method, or you can just give up and do what you need to do next.
Other useful methods in thread pools
You may have noticed that I also called this method prestartAllCoreThreads when I created the thread pool. What does this method do? We know that once a thread pool is created, the number of threads in the pool is zero before any tasks are submitted to it. Sometimes we know there will be a lot of tasks will be submitted to the thread pool, but such as it one by one to create a new thread overhead is too large, affect the performance of the system, so you can consider when creating a thread pool will create all the core thread one-off, this system can be used directly after.
There are other interesting methods available in thread pools. For example, let’s imagine a scenario where a thread pool becomes overloaded and triggers a rejection policy. Is there any way to mitigate the problem? There is, because thread pools provide methods for setting the number of core threads and the maximum number of threads, the setCorePoolSize and setMaximumPoolSize methods, respectively. Yes, you can change the number of threads in a ** thread pool after it is created! ** Therefore, when the thread pool is running under heavy load, we can handle it as follows:
- Start a timed polling thread (daemon type) that periodically checks the number of threads in the thread pool by calling getActiveCount
- If you find that the number of threads exceeds the core size, you can multiply both CorePoolSize and MaximumPoolSize by 2. However, it is not recommended to set a large number of threads, because more threads is not always better.
- At the same time, get the number of tasks in the queue by calling getQueue and then size. When the number of tasks in the queue is less than half of the queue size, we can assume that the thread pool is not so heavily loaded now, so we can consider restoring CorePoolSize and MaximumPoolSize if the thread pool was previously expanded, dividing by 2
Specifically, the following figure:
This is one of my personal suggestions for using thread pools.
Are thread pools necessarily the best solution?
Thread pools are not always the best performing solution. If you’re looking for extreme performance, consider using Disruptor, a high-performance queue. Disruptor aside, is there a better solution based solely on the JDK? The answer is yes.
We know that in a thread pool, multiple threads share the same queue, so in the case of a large number of tasks, the queue needs to be read and written frequently, so locking is needed to prevent collisions. In fact, when you read the thread pool source code, you can see that it is full of various locking code, so is there a better way to implement it? Instead, consider creating a list of single-threaded thread pools, each of which uses bounded queues for multi-threading. The advantage of this is that queues in each thread pool will only be handled by one thread, so there is no contention.
In fact, this idea of swapping space for time borrows the implementation mechanism of EventLoop in Netty. If thread pool performance is so good, why not Use Netty?
Other things to watch out for
- Scalable thread pools should never be used (thread creation and destruction are expensive)
- Unbounded queues should never be used, except for single tests. (Bounded queues are commonly used with ArrayBlockingQueue and LinkedBlockingQueue, the former based on arrays and the latter based on linked lists. In terms of performance, the throughput of LinkedBlockingQueue is higher but the performance is not consistent, and the actual situation should be tested to determine which recommendation to use. By the way, Executors newFixedThreadPool uses LinkedBlockingQueue)
- RejectedExecutionHandler, JDK is not very useful, you can implement your own logic in it. If you need specific context information, you can add your own to the Runnable implementation class so that it can be used directly in the RejectedExecutionHandler.
How do you stay on task
This refers to a special case, such as a sudden spike in traffic, which causes the thread pool load to be very high, i.e. when the rejection policy is triggered, what can be done to prevent the submitted tasks from being lost. Generally speaking, when this situation occurs, an alarm should be triggered as soon as possible to inform the r&d personnel to deal with it. Limiting traffic, adding machines, using Kafka, Redis, or even a database to temporarily store mission data is also possible, but after all, far water can’t cure the near fire. If we want to alleviate the problem as much as possible before formally solving it, what can we do?
The first thing to consider is to dynamically increase the number of threads in the thread pool, as I mentioned earlier, but if it has already been expanded, it should not continue to be expanded, otherwise it may lead to lower system throughput. In this case, you should implement your own RejectedExecutionHandler, specifically by creating a single thread pool in the implementation class and then calling the put method of the getQueue method of the original thread pool to try again to insert the task that could not be RejectedExecutionHandler. Of course it can’t be plugged when the queue is full, but that at least blocks the single thread and doesn’t affect the main flow.
Of course, this solution is a temporary solution to the problem of traffic surge. There are many mature practices in the industry, but it is only a temporary solution from the perspective of thread pool.
Author’s brief introduction
Lv Yadong is a technical expert of an Internet company in the field of risk control, focusing on the areas of high performance, high concurrency, and middleware underlying principles and tuning.