Summary: The Java Development Manual requires that thread resources be provided through a thread pool, and that thread pools be created explicitly with ThreadPoolExecutor. The manual mainly stresses two problems the thread pool is meant to avoid: excessive thread creation, and OOM caused by too many queued requests. However, if the parameters are configured incorrectly, both problems can still occur. This article therefore walks through the technical details of ThreadPoolExecutor and suggests a few common best practices.
Source | Alibaba Tech official account
In the course of researching this, I found that some of the questions are genuinely contested. Part of the reason, it turns out, is that the JDK's behavior varies from version to version. The following analysis is therefore based on the most commonly used version, JDK 1.8, and for the disputed points we go to the source code, which is the final authority.
1. What happens if corePoolSize = 0?
This is a point of contention. Most blog posts I've found, both domestic and foreign, answer it like this:
- After a task is submitted, check whether the number of threads in the pool is less than corePoolSize. If so, create a new thread to execute the task.
- If not, check whether the waiting queue is full; if it is not full, add the task to the queue.
- If the queue is full, check whether the number of threads in the pool has reached maximumPoolSize. If so, reject the task.
- Otherwise, create a new thread to execute the task.
By that description, if corePoolSize = 0, the task is queued as long as the waiting queue has capacity, and no thread is created until the queue is full.
However, that describes the old implementation; the behavior changed in 1.6. Let's look directly at the source of execute() (submit() also relies on it); I've marked the key line with a comment:
int c = ctl.get();
if (workerCountOf(c) < corePoolSize) {
    if (addWorker(command, true))
        return;
    c = ctl.get();
}
if (isRunning(c) && workQueue.offer(command)) {
    int recheck = ctl.get();
    if (!isRunning(recheck) && remove(command))
        reject(command);
    // Key line: create a Worker whose firstTask is null; it will fetch
    // the task from the waiting queue and execute it.
    else if (workerCountOf(recheck) == 0)
        addWorker(null, false);
}
else if (!addWorker(command, false))
    reject(command);
- After a task is submitted, the pool first checks whether the current number of threads is less than corePoolSize.
- If it is, it tries to create a new thread to execute the task; otherwise, it tries to add the task to the waiting queue.
- If the task is queued successfully, the pool then checks whether the number of threads is 0. If so, it creates a Worker whose firstTask is null; that Worker will fetch the queued task and execute it.
- If adding to the waiting queue fails (usually because the queue is full), the pool tries to create a new thread.
- That thread is only created if the current thread count is less than maximumPoolSize.
- Otherwise, the rejection policy is executed.
Answer:
This question has to be answered per JDK version. After 1.6, if corePoolSize = 0 and the pool has no threads when a task is submitted, a thread is created immediately to execute it (the task is queued first, then taken by the new worker). If the pool already has a thread, the task simply waits in the queue, and no additional thread is created until the queue is full.
So the optimization is that, while the queue is not yet full, one thread is already consuming the submitted tasks; before 1.6, nothing was consumed until the queue filled up.
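To make the post-1.6 behavior concrete, here is a minimal, illustrative sketch (the class name and pool parameters are mine): two tasks are submitted to a pool with corePoolSize = 0 and a queue that is nowhere near full, yet on JDK 1.8 a worker thread is created right away and consumes them.

import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class CorePoolSizeZeroDemo {
    public static void main(String[] args) throws InterruptedException {
        // corePoolSize = 0, maximumPoolSize = 2, roomy queue
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                0, 2, 10L, TimeUnit.SECONDS, new LinkedBlockingQueue<>(100));

        pool.execute(() -> System.out.println("task 1 runs on " + Thread.currentThread().getName()));
        pool.execute(() -> System.out.println("task 2 runs on " + Thread.currentThread().getName()));

        Thread.sleep(200);
        // A single worker was started even though the queue was not full.
        System.out.println("poolSize = " + pool.getPoolSize());
        pool.shutdown();
    }
}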
2. Will core threads be created as soon as the thread pool is created?
I was asked this earlier by a colleague who noticed that some beans in the application created a thread pool that was rarely used, and who wondered whether the pool should be commented out to reduce the number of threads at runtime (the application was running too many threads).
Answer:
No. As the source code above shows, a newly created ThreadPoolExecutor does not start any threads immediately; it waits for a task to be submitted, unless prestartCoreThread / prestartAllCoreThreads is called to start core threads in advance.
- prestartCoreThread: starts a single core thread, causing it to idly wait for work. This overrides the default policy of starting core threads only when new tasks are executed.
- prestartAllCoreThreads: starts all core threads.
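A small fragment sketch of the pre-start methods (the pool parameters are arbitrary, and the usual java.util.concurrent imports are assumed); prestartAllCoreThreads() returns the number of threads it started:

ThreadPoolExecutor pool = new ThreadPoolExecutor(
        4, 8, 60L, TimeUnit.SECONDS, new LinkedBlockingQueue<>(128));

System.out.println(pool.getPoolSize());       // 0 -- nothing submitted, no threads yet
int started = pool.prestartAllCoreThreads();  // eagerly start the core threads
System.out.println(started);                  // 4
System.out.println(pool.getPoolSize());       // 4, all idle and waiting for work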
3. Are core threads never destroyed?
This question is a little tricky. First, let's be clear about the concept: although the Javadoc does talk about "core/non-core threads", this is a dynamic notion; the JDK does not mark particular threads as "core" and give them special treatment. What the question is really about is the idle-thread termination policy.
Prior to JDK 1.6, the thread pool kept corePoolSize threads alive even when they were idle for a long time. Developers complained about this, so starting with JDK 1.6 there is a method allowCoreThreadTimeOut which, when passed true, allows idle core threads to be terminated as well.
Note the difference between this policy and corePoolSize = 0. The way I would summarize it:
- corePoolSize = 0: in the normal case, only one thread consumes tasks; more threads are used only when concurrency is high enough that the waiting queue fills up.
- allowCoreThreadTimeOut = true && corePoolSize > 1: in the normal case, multiple threads (corePoolSize of them) consume tasks, and more are added when the queue is full and concurrency is high; but when there is no work, even the core threads are allowed to terminate.
- corePoolSize = 0 behaves essentially the same as allowCoreThreadTimeOut = true && corePoolSize = 1; only the implementation details differ.
Answer:
After JDK 1.6, core threads can also be terminated, provided allowCoreThreadTimeOut(true) is set.
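A fragment sketch of that behavior (parameters chosen only for the demo; the usual java.util.concurrent imports are assumed, and it should run inside a main that declares throws InterruptedException): with allowCoreThreadTimeOut(true), even core threads disappear after keepAliveTime of idleness.

ThreadPoolExecutor pool = new ThreadPoolExecutor(
        2, 4, 1L, TimeUnit.SECONDS, new LinkedBlockingQueue<>(64));
pool.allowCoreThreadTimeOut(true);      // idle core threads may now be terminated

pool.execute(() -> System.out.println("working on " + Thread.currentThread().getName()));

Thread.sleep(3_000);                    // wait longer than keepAliveTime
System.out.println(pool.getPoolSize()); // 0 -- the idle core thread has been reclaimed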
4. How does the thread pool keep threads from terminating?
First, we need to clarify the thread pool's model. The pool has an inner class Worker, which implements the Runnable interface: the Worker first runs itself, and then, at the appropriate time, fetches the Runnable task we submitted and calls that task's run() method. A Worker can keep executing tasks without terminating.
When we say "a thread in the thread pool", we mean a Worker; the elements in the waiting queue are the Runnable tasks we submitted.
Each Worker calls its own run() method when it is started, and the implementation is runWorker(this). The core of that implementation is a while loop: the Worker thread does not terminate until the loop ends. That is the basic logic.
- In the while condition there is a getTask() method, which is the core of the core: all it does is pull tasks from the waiting queue for execution.
- If corePoolSize has not been reached, a Worker, after finishing the task it was created with, fetches further tasks using workQueue.take(). Note that this is a blocking call: if no task is available, the Worker thread blocks indefinitely.
- If the worker count exceeds corePoolSize, or allowCoreThreadTimeOut is enabled, an idle Worker instead fetches tasks using workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS). This call blocks for at most keepAliveTime; if it returns null after that time, the Worker's while loop ends and the Worker terminates.
final void runWorker(Worker w) {
    Thread wt = Thread.currentThread();
    Runnable task = w.firstTask;
    w.firstTask = null;
    w.unlock(); // allow interrupts
    boolean completedAbruptly = true;
    try {
        // The Worker keeps running as long as it can get a task,
        // either its firstTask or one taken from the queue.
        while (task != null || (task = getTask()) != null) {
            w.lock();
            // If pool is stopping, ensure thread is interrupted;
            // if not, ensure thread is not interrupted. This
            // requires a recheck in second case to deal with
            // shutdownNow race while clearing interrupt
            if ((runStateAtLeast(ctl.get(), STOP) ||
                 (Thread.interrupted() &&
                  runStateAtLeast(ctl.get(), STOP))) &&
                !wt.isInterrupted())
                wt.interrupt();
            try {
                beforeExecute(wt, task);
                Throwable thrown = null;
                try {
                    task.run();
                } catch (RuntimeException x) {
                    thrown = x; throw x;
                } catch (Error x) {
                    thrown = x; throw x;
                } catch (Throwable x) {
                    thrown = x; throw new Error(x);
                } finally {
                    afterExecute(task, thrown);
                }
            } finally {
                task = null;
                w.completedTasks++;
                w.unlock();
            }
        }
        completedAbruptly = false;
    } finally {
        processWorkerExit(w, completedAbruptly);
    }
}

private Runnable getTask() {
    boolean timedOut = false; // Did the last poll() time out?

    for (;;) {
        int c = ctl.get();
        int rs = runStateOf(c);

        // Check if queue empty only if necessary.
        if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
            decrementWorkerCount();
            return null;
        }

        int wc = workerCountOf(c);

        // Are workers subject to culling?
        boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;

        if ((wc > maximumPoolSize || (timed && timedOut))
            && (wc > 1 || workQueue.isEmpty())) {
            if (compareAndDecrementWorkerCount(c))
                return null;
            continue;
        }

        try {
            // Key: core threads block on take(); timed workers wait at most keepAliveTime.
            Runnable r = timed ?
                workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
                workQueue.take();
            if (r != null)
                return r;
            timedOut = true;
        } catch (InterruptedException retry) {
            timedOut = false;
        }
    }
}
Answer:
A core thread does not terminate even when idle, because it sits in workQueue.take(), which blocks until a new task can be taken from the waiting queue. A non-core thread uses workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS): an idle Worker waits only keepAliveTime, and if no task has arrived by then, the loop ends and the thread terminates.
Extended thinking
The Worker itself is a thread. When it calls the run() of the Runnable we passed in, does that start a child thread? If you don't have an answer, think back to the relationship between Runnable and Thread.
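A tiny sketch you can use to check your answer: calling run() on a Runnable is an ordinary method call on the caller's stack, while only Thread.start() creates a new thread, which is exactly how a Worker executes our tasks inside its own thread.

public class RunVsStart {
    public static void main(String[] args) {
        Runnable task = () ->
                System.out.println("executed on: " + Thread.currentThread().getName());

        task.run();                 // plain method call -> prints "main"
        new Thread(task).start();   // only start() creates a new thread -> prints e.g. "Thread-0"
    }
}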
5. What's wrong with too many idle threads?
The usual answer is that they consume memory, so let's analyze which memory they consume. First, the per-thread parts of the memory model:
- The virtual machine stack
- The native method stack
- Program counter
Beyond those, I want to highlight the following memory usage, which needs extra care:
- ThreadLocal: does the business code use ThreadLocal? Even if it doesn't, ThreadLocal is used heavily inside the Spring framework, and probably in your company's frameworks as well.
- Local variables: while a thread is blocked, its stack frames have not been popped, and the local variables in those frames keep the memory they reference from being collected. So if a thread creates a large local variable, that memory cannot be reclaimed while the thread waits.
- The TLAB mechanism: if the application has a large number of threads, initializing a new thread may trigger a Young GC, because Eden has no room left to allocate a TLAB.
Answer:
Keeping corePoolSize idle core threads around is the pool's default behavior and is generally fine, because an idle thread itself occupies little memory. What to fear is business code caching large amounts of data in ThreadLocal and never cleaning it up.
If your application has a high thread count, look at your Young GC behavior and estimate whether Eden is large enough. If it isn't, be careful about creating new threads, and let idle threads terminate; if necessary, tune the JVM parameters.
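Given the ThreadLocal concern above, a common defensive pattern is sketched below (the class and method names are illustrative): because worker threads are reused and may idle for a long time, remove the ThreadLocal entry in a finally block so large cached objects don't stay reachable.

import java.util.HashMap;
import java.util.Map;

public class ContextHolder {
    private static final ThreadLocal<Map<String, Object>> CONTEXT =
            ThreadLocal.withInitial(HashMap::new);

    public void handle(String user) {
        try {
            CONTEXT.get().put("user", user);
            // ... business logic that reads the per-thread context ...
        } finally {
            // The worker thread outlives this task; clean up so the map can be collected.
            CONTEXT.remove();
        }
    }
}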
6. What does keepAliveTime = 0 mean?
This is another point of contention. Some posts say 0 means idle threads never terminate; others say they terminate immediately after finishing a task; still others say -1 means idle threads never terminate. A quick look at the source settles it, so I'll just give the answer.
Answer:
In JDK 1.8, keepAliveTime = 0 means an idle non-core thread is terminated immediately after it finishes executing a task; it does not wait around for new work.
keepAliveTime must not be negative, otherwise the constructor throws an exception at initialization. And if allowCoreThreadTimeOut is enabled, keepAliveTime must be greater than 0, otherwise enabling it throws an exception.
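A fragment sketch of the two failure modes just described, as they behave on JDK 1.8 (usual java.util.concurrent imports assumed; each statement is shown only to illustrate the exception it triggers):

// keepAliveTime < 0: the constructor itself throws IllegalArgumentException.
new ThreadPoolExecutor(1, 2, -1L, TimeUnit.SECONDS, new LinkedBlockingQueue<>());

// keepAliveTime == 0 is legal on its own, but enabling core-thread timeout then fails:
ThreadPoolExecutor pool =
        new ThreadPoolExecutor(1, 2, 0L, TimeUnit.SECONDS, new LinkedBlockingQueue<>());
pool.allowCoreThreadTimeOut(true);   // IllegalArgumentException: core threads need a nonzero keep-alive time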
7. How to handle exceptions?
A lot of code is written by following a common pattern without asking why. For example:
- If we use execute() to submit a task, we normally put a try-catch inside the Runnable's task code for exception handling.
- If we use submit() to submit a task, we normally wrap future.get() in a try-catch in the calling thread for exception handling (both patterns are sketched just below).
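Both patterns in one fragment sketch (doBusiness() and log are placeholders for your own task code and logger; the usual java.util.concurrent imports are assumed):

ThreadPoolExecutor pool = new ThreadPoolExecutor(
        2, 4, 60L, TimeUnit.SECONDS, new LinkedBlockingQueue<>(128));

// Pattern 1: execute() -- catch inside the task itself.
pool.execute(() -> {
    try {
        doBusiness();                         // placeholder business method
    } catch (RuntimeException e) {
        log.error("task failed", e);          // placeholder logger
    }
});

// Pattern 2: submit() -- the exception is captured in the FutureTask and rethrown by get().
Future<?> future = pool.submit(() -> doBusiness());
try {
    future.get();
} catch (ExecutionException e) {
    log.error("task failed", e.getCause());   // getCause() is the original exception
} catch (InterruptedException e) {
    Thread.currentThread().interrupt();
}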
As mentioned, submit() relies on execute() underneath, so the two should behave the same. Why the difference? Let's look at the source of submit(); it's an interesting implementation.
ThreadPoolExecutor inherits three submit() overloads from its parent class AbstractExecutorService. The code is very simple; the key part is just two lines per overload:
public Future<?> submit(Runnable task) {
    if (task == null) throw new NullPointerException();
    RunnableFuture<Void> ftask = newTaskFor(task, null);
    execute(ftask);
    return ftask;
}

public <T> Future<T> submit(Runnable task, T result) {
    if (task == null) throw new NullPointerException();
    RunnableFuture<T> ftask = newTaskFor(task, result);
    execute(ftask);
    return ftask;
}

public <T> Future<T> submit(Callable<T> task) {
    if (task == null) throw new NullPointerException();
    RunnableFuture<T> ftask = newTaskFor(task);
    execute(ftask);
    return ftask;
}
Because all three overloads call execute(), I say submit() relies on execute() underneath. The execute() being called here is ThreadPoolExecutor's own implementation, so the code that makes submit() different is not there; the difference must be in newTaskFor(). That method simply creates a FutureTask. FutureTask implements the RunnableFuture interface, which extends both Runnable and Future; the Callable is just a member variable of the FutureTask.
That brings in another basic Java point: the relationship between Callable and Future. We generally write task code as a Callable, and the Future is the asynchronous handle whose get() method blocks until the result is available. The core of FutureTask is its implementation of the Future interface, i.e. the get() method:
public V get() throws InterruptedException, ExecutionException {
    int s = state;
    if (s <= COMPLETING)   // key: wait until the task has completed
        s = awaitDone(false, 0L);
    return report(s);
}

private int awaitDone(boolean timed, long nanos)
    throws InterruptedException {
    final long deadline = timed ? System.nanoTime() + nanos : 0L;
    WaitNode q = null;
    boolean queued = false;
    // loop
    for (;;) {
        if (Thread.interrupted()) {
            removeWaiter(q);
            throw new InterruptedException();
        }
        int s = state;
        // Only when the task is "completed" does the loop break out.
        if (s > COMPLETING) {
            if (q != null)
                q.thread = null;
            return s;
        }
        else if (s == COMPLETING) // cannot time out yet
            Thread.yield();
        else if (q == null)
            q = new WaitNode();
        else if (!queued)
            queued = UNSAFE.compareAndSwapObject(this, waitersOffset,
                                                 q.next = waiters, q);
        else if (timed) {
            nanos = deadline - System.nanoTime();
            if (nanos <= 0L) {
                removeWaiter(q);
                return state;
            }
            LockSupport.parkNanos(this, nanos);
        }
        else
            LockSupport.park(this);
    }
}
The core of get() is the awaitDone() method: an endless loop that breaks out only when the task's state is "completed"; otherwise it uses LockSupport.park (which ultimately delegates to Unsafe) to block, waiting for a LockSupport.unpark signal. That signal is issued only when run() finishes with a result or throws an exception, via set() and setException() respectively. This is the principle behind asynchronous execution with blocking retrieval, though it's drifting a bit off topic.
Back to the original question: why, after submit(), can we retrieve the exception with get()? The reason is that FutureTask has an outcome member variable of type Object that records the result of the execution; the result can be either a value of the declared generic type or a Throwable:
public void run() {
    if (state != NEW ||
        !UNSAFE.compareAndSwapObject(this, runnerOffset,
                                     null, Thread.currentThread()))
        return;
    try {
        Callable<V> c = callable;
        if (c != null && state == NEW) {
            V result;
            boolean ran;
            try {
                result = c.call();
                ran = true;
            } catch (Throwable ex) {
                result = null;
                ran = false;
                setException(ex);
            }
            if (ran)
                set(result);
        }
    } finally {
        // runner must be non-null until state is settled to
        // prevent concurrent calls to run()
        runner = null;
        // state must be re-read after nulling runner to prevent
        // leaked interrupts
        int s = state;
        if (s >= INTERRUPTING)
            handlePossibleCancellationInterrupt(s);
    }
}
// report the result of the get method
private V report(int s) throws ExecutionException {
Object x = outcome;
if (s == NORMAL)
return (V)x;
if (s >= CANCELLED)
throw new CancellationException();
throw new ExecutionException((Throwable)x);
}
Another neat detail is that FutureTask borrows the RunnableAdapter inner class (in Executors) to wrap the Runnable passed to submit() as a Callable. So even if you submit a plain Runnable, you can still retrieve the exception with get().
Answer:
- Whether you use execute or submit, you can put a try-catch inside your business code for exception handling. I generally prefer this approach, because I like to handle exceptions, or at least log them, differently for different business scenarios.
- With execute, you can also customize the thread pool: extend ThreadPoolExecutor and override its afterExecute(Runnable r, Throwable t) method.
- Or implement the Thread.UncaughtExceptionHandler interface, implement its void uncaughtException(Thread t, Throwable e) method, and hand the handler to the ThreadFactory used by the pool.
- Note, however, that neither afterExecute nor UncaughtExceptionHandler works with submit. As FutureTask.run() above shows, it try-catches Throwable itself and stashes it in the outcome field, so the Worker inside the underlying execute() never sees the exception. (Both execute-only approaches are sketched below.)
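Sketches of the two execute()-only approaches (class names and the logging are mine, not a drop-in implementation; usual java.util.concurrent imports assumed):

// Option 1: subclass ThreadPoolExecutor and override afterExecute.
class LoggingThreadPool extends ThreadPoolExecutor {
    LoggingThreadPool(int core, int max, long keepAlive, TimeUnit unit,
                      BlockingQueue<Runnable> queue) {
        super(core, max, keepAlive, unit, queue);
    }

    @Override
    protected void afterExecute(Runnable r, Throwable t) {
        super.afterExecute(r, t);
        if (t != null) {
            System.err.println("task threw: " + t);   // replace with real logging
        }
    }
}

// Option 2: install an UncaughtExceptionHandler through the ThreadFactory.
ThreadFactory factory = runnable -> {
    Thread t = new Thread(runnable);
    t.setUncaughtExceptionHandler(
            (thread, e) -> System.err.println(thread.getName() + " threw: " + e));
    return t;
};
ThreadPoolExecutor pool = new ThreadPoolExecutor(
        2, 4, 60L, TimeUnit.SECONDS, new LinkedBlockingQueue<>(128), factory);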
8. Does the thread pool need to be closed?
Answer:
In general, the thread pool's lifecycle follows the lifecycle of the service that owns it. When a Service stops serving, its shutdown method should be called; that is why, in the JDK and in some middleware source code, executorService.shutdown() is wrapped inside the Service's shutdown method.
If the pool lives as long as the server process and is only released when the process exits, I don't think it needs special treatment.
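A sketch of that pattern, with a hypothetical service class of our own:

import java.util.concurrent.*;

public class OrderService implements AutoCloseable {   // hypothetical business service
    private final ExecutorService pool = new ThreadPoolExecutor(
            4, 8, 60L, TimeUnit.SECONDS, new LinkedBlockingQueue<>(512));

    @Override
    public void close() throws InterruptedException {
        pool.shutdown();                                    // stop accepting new tasks
        if (!pool.awaitTermination(30, TimeUnit.SECONDS)) { // wait for in-flight tasks
            pool.shutdownNow();                             // give up and interrupt the rest
        }
    }
}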
9. The difference between shutdown and shutdownNow
Answer:
shutdown => a gentle close: it waits for all tasks already added to the pool to finish. shutdownNow => an immediate close: it interrupts the executing tasks and returns the tasks still waiting in the queue. I originally wanted to walk through the source of both, but this article is already long and has plenty of source code; interested readers can look for themselves.
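A fragment sketch of the difference (usual imports assumed): the single running task gets interrupted by shutdownNow(), and the tasks still sitting in the queue are handed back.

ThreadPoolExecutor pool = new ThreadPoolExecutor(
        1, 1, 0L, TimeUnit.SECONDS, new LinkedBlockingQueue<>());
for (int i = 0; i < 5; i++) {
    pool.execute(() -> {
        try {
            Thread.sleep(1_000);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();   // shutdownNow() interrupts the running task
        }
    });
}

// pool.shutdown() would instead let the 1 running + 4 queued tasks all finish first.
List<Runnable> pending = pool.shutdownNow();      // interrupts the running task
System.out.println("never started: " + pending.size());   // 4 in this sketch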
10. What tools in Spring are similar to ThreadPoolExecutor?
Answer:
The first thing I want to emphasize here is SimpleAsyncTaskExecutor: by default, Spring's @Async annotation executes tasks through SimpleAsyncTaskExecutor, which is not a thread pool at all; it starts a new thread for every task.
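If you do use @Async, a common fix (depending on your Spring version and setup) is to register your own executor bean so the framework does not fall back to SimpleAsyncTaskExecutor. A configuration sketch; the bean name and sizes are illustrative:

@Configuration
@EnableAsync
public class AsyncConfig {

    @Bean("asyncExecutor")   // reference it with @Async("asyncExecutor")
    public Executor asyncExecutor() {
        ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
        executor.setCorePoolSize(4);
        executor.setMaxPoolSize(8);
        executor.setQueueCapacity(512);
        executor.setThreadNamePrefix("async-pool-");
        executor.initialize();
        return executor;
    }
}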
The other thing I want to emphasize is the Executor interface. Java beginners tend to assume that any class whose name ends in Executor is a thread pool, and the example above is a counterexample. The Javadoc of the JDK's execute method reads:

/**
 * Executes the given command at some time in the future.  The command
 * may execute in a new thread, in a pooled thread, or in the calling
 * thread, at the discretion of the {@code Executor} implementation.
 */

So its job is not to define a thread-pool interface, but an interface for "executing a command at some point in the future". It is the ThreadPoolExecutor class, not the Executor interface, that really carries the meaning of a thread pool.
Summary of best practices
[Mandatory] Declare thread pools with the ThreadPoolExecutor constructor; avoid Executors.newFixedThreadPool and Executors.newCachedThreadPool.
[Mandatory] When creating a thread or thread pool, give the threads meaningful names to make error backtracking easier, i.e. pass a ThreadFactory to the constructor.
[Recommended] Use separate thread pools for different types of business.
CPU-intensive tasks (N+1): these tasks consume mostly CPU. Set the thread count to N (the number of CPU cores) + 1; the one extra thread covers the case where a thread is occasionally paused by a page fault or some other hiccup, so the CPU's idle time can still be used.
I/O-intensive tasks (2N): here the system spends most of its time on I/O, during which the thread does not need the CPU and can yield it to other threads. So for I/O-intensive workloads you can configure more threads; a common rule of thumb is 2N. (A small sizing sketch follows.)
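As a rule-of-thumb sketch only (real sizing should be verified under load):

int cpus = Runtime.getRuntime().availableProcessors();

int cpuBoundThreads = cpus + 1;   // CPU-intensive: N + 1
int ioBoundThreads  = cpus * 2;   // I/O-intensive: 2N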
[Recommended] Do not use an unbounded queue for workQueue; prefer a bounded queue, to avoid a large backlog of waiting tasks causing OOM.
[Recommended] For resource-constrained applications, use allowCoreThreadTimeOut to improve resource utilization.
[Recommended] Although there are several ways to handle exceptions with thread pools, a try-catch inside the task code is the most common, and it can be tailored to each kind of task.
[Recommended] For resource-constrained applications, if you worry about the pool being misconfigured or misused, you can implement simple monitoring with the ThreadPoolExecutor API, then analyze and tune.
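A sketch of that kind of lightweight monitoring, using only public ThreadPoolExecutor getters, assuming pool is a ThreadPoolExecutor like the one initialized in the example below (scheduling interval and log format are placeholders):

ScheduledExecutorService monitor = Executors.newSingleThreadScheduledExecutor();
monitor.scheduleAtFixedRate(() -> {
    System.out.printf("pool=%d active=%d largest=%d queued=%d completed=%d%n",
            pool.getPoolSize(),           // current number of worker threads
            pool.getActiveCount(),        // threads currently executing tasks
            pool.getLargestPoolSize(),    // high-water mark of the pool
            pool.getQueue().size(),       // tasks waiting in the queue
            pool.getCompletedTaskCount());
}, 0, 30, TimeUnit.SECONDS);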
Examples of thread pool initialization:
private static final ThreadPoolExecutor pool;
static {
ThreadFactory threadFactory = new ThreadFactoryBuilder().setNameFormat("po-detail-pool-%d").build();
pool = new ThreadPoolExecutor(4, 8, 60L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>(512),
threadFactory, new ThreadPoolExecutor.AbortPolicy());
pool.allowCoreThreadTimeOut(true);
}
- threadFactory: gives the threads business-meaningful names.
- corePoolSize: quickly starting 4 threads is enough to handle this business.
- maximumPoolSize: an I/O-intensive business on a 4C8G server, so 4 × 2 = 8.
- keepAliveTime: server resources are tight, so idle threads are released quickly.
- pool.allowCoreThreadTimeOut(true): lets even the core threads be released when idle, freeing resources.
- workQueue: each task takes 100–300 ms, there are 8 threads at peak, and the timeout budget is 10 s (already generous). In 10 seconds, 8 threads can process roughly 10 × (1000 ms / 200 ms) × 8 ≈ 400 tasks, so rounding up, a capacity of 512 is plenty.
- handler: in extreme cases, some tasks are simply discarded to protect the server.
This article is original content from Alibaba Cloud and may not be reproduced without permission.