Preface
The Java Development Manual requires that threads be obtained from thread pools and that thread pools be created with ThreadPoolExecutor. The manual stresses thread pools mainly to avoid two problems: excessive thread switching, and OOM caused by too many requests. However, if the parameters are configured incorrectly, both problems can still occur. So in this article we focus on the technical details of ThreadPoolExecutor and suggest some common best practices.
While researching this topic, I found several points of controversy. Part of the reason, it turns out, is that the implementation differs between JDK versions. The following analysis is therefore based on JDK 1.8, currently the most commonly used version, and for the disputed points we read the source code, which is the most accurate reference.
1. What happens when corePoolSize=0?
This is a controversial point. I find that most blogs, both domestic and foreign, answer this question in the following way:
1. After submitting the task, check whether the number of threads in the current pool is smaller than corePoolSize. If the number is smaller, create a new thread to execute the task.
2. Otherwise, check whether the waiting queue is full. If not, add it to the waiting queue.
3. Otherwise, check whether the number of threads in the current pool has reached maximumPoolSize. If it has, apply the rejection policy.
4. Otherwise, create a new thread to perform the task.
By that description, if corePoolSize=0, the pool checks the capacity of the wait queue; if there is room, the task is queued and no new thread is created.
But this is actually the behavior of the old implementation, which changed in 1.6. Let’s go straight to the execute source code (submit also relies on it) and note the key line:
int c = ctl.get();
if (workerCountOf(c) < corePoolSize) {
    if (addWorker(command, true))
        return;
    c = ctl.get();
}
if (isRunning(c) && workQueue.offer(command)) {
    int recheck = ctl.get();
    if (!isRunning(recheck) && remove(command))
        reject(command);
    // Key line: if the pool has no threads at this point, create a worker
    // whose firstTask is null. The worker will fetch the task from the wait
    // queue and execute it.
    else if (workerCountOf(recheck) == 0)
        addWorker(null, false);
}
else if (!addWorker(command, false))
    reject(command);
After a task is submitted, the thread pool first checks whether the number of threads in the pool is smaller than corePoolSize.
If it is, the pool tries to create a new thread to run the task; otherwise, it tries to add the task to the wait queue.
If the task is queued successfully, the pool checks whether the current number of threads is 0. If so, it creates a worker whose firstTask is null, and that worker fetches the task from the wait queue and executes it.
If adding to the wait queue fails (usually because the queue is full), the pool tries to create a new thread.
Before creating it, the pool compares the current thread count with maximumPoolSize; creation succeeds only if the count is smaller than maximumPoolSize.
Otherwise, the rejection policy is executed.
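The flow just described can be observed with a small experiment. This is only a sketch under my own assumptions: the class name ExecuteFlowDemo and the sizing (1 core thread, 2 threads at most, queue capacity 1) are made up so that each branch of execute is hit exactly once.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

class ExecuteFlowDemo {
    /** Submits four blocking tasks and records what the pool did with each. */
    static List<String> submitFour() {
        // Hypothetical sizing: 1 core thread, at most 2 threads, queue capacity 1.
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                1, 2, 60L, TimeUnit.SECONDS, new ArrayBlockingQueue<>(1));
        CountDownLatch release = new CountDownLatch(1);
        Runnable blocking = () -> {
            try {
                release.await();              // keep every worker busy until released
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        };
        List<String> steps = new ArrayList<>();
        try {
            for (int i = 1; i <= 4; i++) {
                try {
                    pool.execute(blocking);
                    steps.add("task" + i + ": threads=" + pool.getPoolSize()
                            + " queued=" + pool.getQueue().size());
                } catch (RejectedExecutionException e) {
                    steps.add("task" + i + ": rejected");   // default AbortPolicy
                }
            }
        } finally {
            release.countDown();
            pool.shutdown();
        }
        return steps;
    }

    public static void main(String[] args) {
        submitFour().forEach(System.out::println);
    }
}
```

The first task gets the core thread, the second is queued, the third finds the queue full and gets a second thread, and the fourth is rejected.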
A:
The answer depends on the JDK version. From 1.6 on, if corePoolSize=0 and the pool is empty when a task is submitted, the task is queued and a thread is created immediately to fetch and execute it (queue first, then fetch); if the pool is not empty when the task is submitted, the task is queued first, and new threads are created only when the queue is full.
So the improvement is that while the queue is not yet full, one thread is already consuming the submitted tasks; before 1.6, nothing was consumed until the queue was full.
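To check this answer on a current JDK, here is a minimal sketch. The class name CorePoolSizeZeroDemo and the numbers (maximumPoolSize 4, queue capacity 10) are my own assumptions; the tasks block on a latch so the queue never drains during the measurement.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

class CorePoolSizeZeroDemo {
    /** Returns the number of pool threads right after submitting `tasks` blocking tasks. */
    static int threadsAfterSubmitting(int tasks) {
        // corePoolSize=0; keep `tasks` below the queue capacity so the queue never fills.
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                0, 4, 60L, TimeUnit.SECONDS, new LinkedBlockingQueue<>(10));
        CountDownLatch release = new CountDownLatch(1);
        try {
            for (int i = 0; i < tasks; i++) {
                pool.execute(() -> {
                    try {
                        release.await();
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                    }
                });
            }
            return pool.getPoolSize();
        } finally {
            release.countDown();
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println("threads with 5 queued tasks: " + threadsAfterSubmitting(5));
    }
}
```

With five tasks and room for ten in the queue, exactly one thread exists: it was created by the addWorker(null, false) branch on the first submission, and later submissions were only queued.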
2. Will core threads be created immediately after the thread pool is created?
Someone asked me this question because he noticed that some beans in his application create thread pools, yet those beans are rarely used. He asked whether he should comment out those thread pools to reduce the number of threads in the application (which was running with too many threads).
A:
No need. As the source code above shows, a newly created ThreadPoolExecutor does not start any threads immediately; threads are started only when tasks are submitted, unless prestartCoreThread or prestartAllCoreThreads is called to start core threads in advance.
prestartCoreThread: Starts a core thread, causing it to idly wait for work. This overrides the default policy of starting core threads only when new tasks are executed.
prestartAllCoreThreads: Starts all core threads, causing them to idly wait for work.
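A quick way to convince yourself is to read getPoolSize() before and after prestarting. The sketch below assumes a hypothetical pool with corePoolSize=3; the class name PrestartDemo is mine.

```java
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

class PrestartDemo {
    /** Returns {threads after construction, threads started by prestart, threads afterwards}. */
    static int[] observe() {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                3, 6, 60L, TimeUnit.SECONDS, new LinkedBlockingQueue<>(100));
        try {
            int afterConstruction = pool.getPoolSize();   // no task yet, so no thread yet
            int started = pool.prestartAllCoreThreads();  // returns how many threads it started
            int afterPrestart = pool.getPoolSize();
            return new int[] {afterConstruction, started, afterPrestart};
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        int[] r = observe();
        System.out.println(r[0] + " -> started " + r[1] + " -> " + r[2]);
    }
}
```

So an unused pool costs a few objects but no threads, which is why commenting it out does not reduce the thread count.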
3. Will core threads never be destroyed?
This question is a bit tricky. Although the JavaDoc uses the term “core/non-core threads”, this is a dynamic concept: the JDK does not label any particular subset of threads as “core” for special treatment. What the question is really about is the idle-thread termination policy.
Before JDK 1.6, the thread pool always tried to keep corePoolSize core threads alive, even if they had been idle for a long time. Developers criticized this, so JDK 1.6 added the allowCoreThreadTimeOut method, which allows idle core threads to be terminated when true is passed.
Notice the difference between this strategy and corePoolSize=0. The differences, as I summarize them, are:
1. corePoolSize=0: in general, only one thread is used to consume tasks. Multiple threads are used only when there are so many concurrent requests that the wait queue fills up.
2. allowCoreThreadTimeOut=true && corePoolSize>1: in general, multiple threads (up to corePoolSize) are used from the start. When concurrent requests are especially numerous and the wait queue fills up, the number of threads keeps growing. When there are no requests, however, core threads are allowed to terminate.
corePoolSize=0 behaves basically the same as allowCoreThreadTimeOut=true && corePoolSize=1, but the implementation details differ.
A:
From JDK 1.6 on, core threads can also be terminated if allowCoreThreadTimeOut(true) has been called.
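The effect is easy to see by running a single task and then watching the pool size. This is a sketch with made-up values (2 core threads, a 50 ms keepAliveTime) and a hypothetical class name CoreTimeoutDemo; it polls instead of sleeping a fixed time so that it does not depend on exact timing.

```java
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

class CoreTimeoutDemo {
    /** Runs one task, then waits up to waitMillis for the pool to become empty. */
    static boolean poolDrains(boolean allowCoreTimeout, long waitMillis) {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                2, 2, 50L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>(16));
        pool.allowCoreThreadTimeOut(allowCoreTimeout);
        try {
            pool.execute(() -> { });                       // starts one core thread
            long deadline = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(waitMillis);
            while (System.nanoTime() < deadline) {
                if (pool.getPoolSize() == 0) {
                    return true;                           // the idle core thread timed out
                }
                try {
                    Thread.sleep(10L);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    return false;
                }
            }
            return pool.getPoolSize() == 0;
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println("with core timeout:    drained=" + poolDrains(true, 5_000L));
        System.out.println("without core timeout: drained=" + poolDrains(false, 300L));
    }
}
```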
4. How to ensure that threads are not destroyed
First we need to clarify the thread pool model. The thread pool has an inner class, Worker, which implements the Runnable interface. A Worker first has to be run itself; it then fetches the Runnable tasks we submitted at the appropriate time and calls each task’s run() method. A Worker can keep executing tasks one after another without terminating.
The “thread in the thread pool” mentioned earlier is actually the Worker; the elements in the wait queue are the Runnable tasks we submit.
When a Worker is created and its thread is started, its run() method is invoked, and that method simply calls runWorker(this). The core of runWorker is a while loop; the Worker thread does not terminate as long as the loop keeps running. That is the basic logic.
The while condition calls getTask(), the core of the core, whose only job is to pull tasks from the wait queue for execution:
If the worker count does not exceed corePoolSize, a Worker that has finished its current task fetches the next one with workQueue.take(). Note that this call blocks: the Worker thread stays blocked until a task is available.
If the worker count exceeds corePoolSize, or allowCoreThreadTimeOut is set, an idle Worker fetches tasks with workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) instead.
Note that this call blocks only for keepAliveTime; after that it returns null, and the Worker’s while loop ends.
final void runWorker(Worker w) {
    Thread wt = Thread.currentThread();
    Runnable task = w.firstTask;
    w.firstTask = null;
    w.unlock(); // allow interrupts
    boolean completedAbruptly = true;
    try {
        while (task != null || (task = getTask()) != null) {
            w.lock();
            // If pool is stopping, ensure thread is interrupted;
            // if not, ensure thread is not interrupted. This
            // requires a recheck in second case to deal with
            // shutdownNow race while clearing interrupt
            if ((runStateAtLeast(ctl.get(), STOP) ||
                 (Thread.interrupted() &&
                  runStateAtLeast(ctl.get(), STOP))) &&
                !wt.isInterrupted())
                wt.interrupt();
            try {
                beforeExecute(wt, task);
                Throwable thrown = null;
                try {
                    task.run();
                } catch (RuntimeException x) {
                    thrown = x; throw x;
                } catch (Error x) {
                    thrown = x; throw x;
                } catch (Throwable x) {
                    thrown = x; throw new Error(x);
                } finally {
                    afterExecute(task, thrown);
                }
            } finally {
                task = null;
                w.completedTasks++;
                w.unlock();
            }
        }
        completedAbruptly = false;
    } finally {
        processWorkerExit(w, completedAbruptly);
    }
}

private Runnable getTask() {
    boolean timedOut = false; // Did the last poll() time out?

    for (;;) {
        int c = ctl.get();
        int rs = runStateOf(c);

        // Check if queue empty only if necessary.
        if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
            decrementWorkerCount();
            return null;
        }

        int wc = workerCountOf(c);

        // Are workers subject to culling?
        boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;

        if ((wc > maximumPoolSize || (timed && timedOut))
            && (wc > 1 || workQueue.isEmpty())) {
            if (compareAndDecrementWorkerCount(c))
                return null;
            continue;
        }

        try {
            Runnable r = timed ?
                workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
                workQueue.take();
            if (r != null)
                return r;
            timedOut = true;
        } catch (InterruptedException retry) {
            timedOut = false;
        }
    }
}
A:
The implementation is very clever. A core thread (Worker) does not terminate even when idle because it fetches tasks with workQueue.take(), which blocks until a new task arrives in the wait queue. A non-core thread fetches tasks with workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS): an idle Worker waits only for keepAliveTime, and if no task arrives by then, the loop ends and the thread finishes running.
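Stripped of state checks and locking, the take-versus-poll idea fits in a few lines. The following is a simplified imitation of my own, not the JDK code: MiniWorker, its fields and the 50 ms keep-alive are all hypothetical.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

class MiniWorker implements Runnable {
    private final BlockingQueue<Runnable> queue;
    private final boolean timed;            // true: behaves like a worker subject to timeout
    private final long keepAliveMillis;
    final AtomicInteger completed = new AtomicInteger();

    MiniWorker(BlockingQueue<Runnable> queue, boolean timed, long keepAliveMillis) {
        this.queue = queue;
        this.timed = timed;
        this.keepAliveMillis = keepAliveMillis;
    }

    /** Same shape as getTask(): a timed poll or an indefinite take. */
    private Runnable getTask() throws InterruptedException {
        return timed ? queue.poll(keepAliveMillis, TimeUnit.MILLISECONDS) : queue.take();
    }

    @Override
    public void run() {
        try {
            Runnable task;
            while ((task = getTask()) != null) {   // null means idle for too long: leave the loop
                task.run();                        // a plain method call on this worker's thread
                completed.incrementAndGet();
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();    // interrupted while waiting: exit as well
        }
    }

    /** Feeds no-op tasks to a timed worker; returns how many it ran before exiting, or -1. */
    static int runTimedWorker(int tasks) {
        BlockingQueue<Runnable> queue = new LinkedBlockingQueue<>();
        for (int i = 0; i < tasks; i++) {
            queue.add(() -> { });
        }
        MiniWorker worker = new MiniWorker(queue, true, 50L);
        Thread thread = new Thread(worker, "mini-worker");
        thread.start();
        try {
            thread.join(5_000L);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return thread.isAlive() ? -1 : worker.completed.get();
    }
}
```

A worker built with timed=false would block in take() forever once the queue is empty, which is exactly how an idle core thread stays alive.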
Extended thinking
Each Worker runs on its own thread. Does it start a child thread when it calls Runnable.run()? If you still don’t have the answer, recall the relationship between Runnable and Thread.
5. What is the problem with too many idle threads?
The usual answer is that they take up memory, so let’s analyze what memory they take up. First, the generic part, the memory areas private to each thread:
1. VM stack
2. Native method stack
3. Program counter
I would also like to emphasize the following kinds of memory usage, which need care:
1. ThreadLocal: Does the business code use ThreadLocal? Even if it does not, ThreadLocal is used heavily in the Spring framework, and probably in your company’s framework as well.
2. Local variables: if a thread is blocked, some stack frames must remain on its stack. Each stack frame has a local variable table, and any memory referenced from a local variable table cannot be reclaimed. So if the thread created a large local variable, that memory cannot be garbage-collected.
3. TLAB mechanism: if your application has a high thread count, initializing a new thread may trigger a YoungGC because Eden does not have enough space to allocate TLABs.
A:
By default, the thread pool keeps its core threads alive even when they are idle, which is generally fine because they do not take up much memory. The real risk is business code that caches too much in ThreadLocal and never cleans it up.
If your application has a high number of threads, look at YoungGC activity and estimate whether Eden is large enough. If it is not, be cautious about creating new threads and let idle threads terminate; if necessary, you may need to tune the JVM.
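The ThreadLocal concern comes from thread reuse: whatever one task leaves behind is still there for the next task on the same thread. Here is a small sketch of the usual try/finally cleanup; the class ThreadLocalCleanupDemo, the CONTEXT variable and the value "request-1" are invented for illustration.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

class ThreadLocalCleanupDemo {
    private static final ThreadLocal<String> CONTEXT = new ThreadLocal<>();

    /** Returns what a second task sees after a first task set CONTEXT on the same pooled thread. */
    static String valueSeenByNextTask(boolean cleanUp) {
        // A single reused thread, so both tasks are guaranteed to share it.
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>(16));
        Runnable first = () -> {
            CONTEXT.set("request-1");
            try {
                // ... business logic that reads CONTEXT would go here ...
            } finally {
                if (cleanUp) {
                    CONTEXT.remove();   // without this, the value outlives the task
                }
            }
        };
        Callable<String> second = CONTEXT::get;
        try {
            pool.submit(first).get();
            return pool.submit(second).get();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } catch (ExecutionException e) {
            throw new IllegalStateException(e.getCause());
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println("without remove(): " + valueSeenByNextTask(false));
        System.out.println("with remove():    " + valueSeenByNextTask(true));
    }
}
```

Without remove(), the second task reads the first task's value, and the referenced object stays reachable for as long as the idle thread lives.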
6. What happens when keepAliveTime=0?
This is also a controversial point. Some blogs say that 0 means idle threads never terminate, while others say they terminate immediately after execution. Still others say that -1 means idle threads never terminate. In fact, a quick look at the source code settles it, so I’ll just give the answer.
A:
In JDK 1.8, keepAliveTime=0 means that a non-core thread terminates immediately once it has finished executing and finds no more work.
keepAliveTime must not be less than 0; otherwise the constructor throws IllegalArgumentException during initialization. And if core-thread timeout is enabled with allowCoreThreadTimeOut(true), keepAliveTime must be greater than 0; otherwise that call throws IllegalArgumentException as well.
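Both validation rules can be checked directly. The sketch below only wraps the two calls in try/catch; the class and method names are mine, and the pool sizes are arbitrary.

```java
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

class KeepAliveValidationDemo {
    /** True if the constructor rejects the given keepAliveTime. */
    static boolean constructorRejects(long keepAliveTime) {
        try {
            new ThreadPoolExecutor(1, 2, keepAliveTime, TimeUnit.SECONDS,
                    new LinkedBlockingQueue<>(16)).shutdown();
            return false;
        } catch (IllegalArgumentException e) {
            return true;
        }
    }

    /** True if allowCoreThreadTimeOut(true) is rejected for the given keepAliveTime. */
    static boolean coreTimeoutRejects(long keepAliveTime) {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(1, 2, keepAliveTime, TimeUnit.SECONDS,
                new LinkedBlockingQueue<>(16));
        try {
            pool.allowCoreThreadTimeOut(true);
            return false;
        } catch (IllegalArgumentException e) {
            return true;
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println("keepAliveTime=-1 rejected by constructor: " + constructorRejects(-1L));
        System.out.println("keepAliveTime=0 rejected by constructor:  " + constructorRejects(0L));
        System.out.println("keepAliveTime=0 + core timeout rejected:  " + coreTimeoutRejects(0L));
    }
}
```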
7. How to handle exceptions?
A lot of code is written by following common patterns, without thinking about why. For example:
If we submit a task using execute(), we normally add a try-catch inside the Runnable task code for exception handling. If we submit the task using submit(), we typically wrap future.get() in a try-catch in the main thread. But submit() relies on execute() underneath, so why is the exception handled differently in the two cases?
AbstractExecutorService, the parent class of ThreadPoolExecutor, contains three overloaded submit methods. The code is simple, and each contains two key lines:
public Future<?> submit(Runnable task) {
    if (task == null) throw new NullPointerException();
    RunnableFuture<Void> ftask = newTaskFor(task, null);
    execute(ftask);
    return ftask;
}

public <T> Future<T> submit(Runnable task, T result) {
    if (task == null) throw new NullPointerException();
    RunnableFuture<T> ftask = newTaskFor(task, result);
    execute(ftask);
    return ftask;
}

public <T> Future<T> submit(Callable<T> task) {
    if (task == null) throw new NullPointerException();
    RunnableFuture<T> ftask = newTaskFor(task);
    execute(ftask);
    return ftask;
}
All three overloads call execute, which is why I say submit relies on execute. The execute they call is the ThreadPoolExecutor implementation we already looked at, so whatever differentiates submit from execute is not there. The difference must be in the newTaskFor method, which returns a FutureTask. FutureTask implements the RunnableFuture interface, and RunnableFuture extends both Runnable and Future. The Callable is just a member variable of FutureTask.
That brings us to another Java basic: the relationship between Callable and Future. We write our task code in a Callable, and a Future is the handle to the asynchronous result, whose get method blocks until the result is available. The core of FutureTask’s implementation of the Future interface is the get method:
public V get() throws InterruptedException, ExecutionException {
    int s = state;
    if (s <= COMPLETING)
        s = awaitDone(false, 0L); // key line
    return report(s);
}

private int awaitDone(boolean timed, long nanos)
    throws InterruptedException {
    final long deadline = timed ? System.nanoTime() + nanos : 0L;
    WaitNode q = null;
    boolean queued = false;
    // loop
    for (;;) {
        if (Thread.interrupted()) {
            removeWaiter(q);
            throw new InterruptedException();
        }

        int s = state;
        if (s > COMPLETING) {
            if (q != null)
                q.thread = null;
            return s;
        }
        else if (s == COMPLETING) // cannot time out yet
            Thread.yield();
        else if (q == null)
            q = new WaitNode();
        else if (!queued)
            queued = UNSAFE.compareAndSwapObject(this, waitersOffset,
                                                 q.next = waiters, q);
        else if (timed) {
            nanos = deadline - System.nanoTime();
            if (nanos <= 0L) {
                removeWaiter(q);
                return state;
            }
            LockSupport.parkNanos(this, nanos);
        }
        else
            LockSupport.park(this);
    }
}
The core of get is the awaitDone method, an infinite loop that exits only when the task has reached a completed state. Otherwise the caller blocks via LockSupport.park (which is built on Unsafe) and waits to be woken by LockSupport.unpark. That wake-up is issued only when the run ends with a result or with an exception; the corresponding methods are set and setException. This is how asynchronous execution with blocking retrieval works, but we have drifted a little off topic.
Back to our original question: why can we get the exception through the get method after submit? The reason is that FutureTask has a member variable outcome of type Object, which records the result of the execution. That result can be either a value of the generic type or a Throwable:
public void run() {
    if (state != NEW ||
        !UNSAFE.compareAndSwapObject(this, runnerOffset,
                                     null, Thread.currentThread()))
        return;
    try {
        Callable<V> c = callable;
        if (c != null && state == NEW) {
            V result;
            boolean ran;
            try {
                result = c.call();
                ran = true;
            } catch (Throwable ex) {
                result = null;
                ran = false;
                setException(ex);
            }
            if (ran)
                set(result);
        }
    } finally {
        // runner must be non-null until state is settled to
        // prevent concurrent calls to run()
        runner = null;
        // state must be re-read after nulling runner to prevent
        // leaked interrupts
        int s = state;
        if (s >= INTERRUPTING)
            handlePossibleCancellationInterrupt(s);
    }
}

private V report(int s) throws ExecutionException {
    Object x = outcome;
    if (s == NORMAL)
        return (V)x;
    if (s >= CANCELLED)
        throw new CancellationException();
    throw new ExecutionException((Throwable)x);
}
Another neat thing about FutureTask is that it uses the RunnableAdapter inner class (defined in Executors) to wrap a submitted Runnable as a Callable. So even if what you submit is a Runnable, you can still get the exception from get.
A:
With either execute or submit, you can add a try-catch inside your business code for exception handling. I prefer this approach because I like to handle exceptions differently for different business scenarios, or at least log them differently.
With execute, you can also customize the thread pool by extending ThreadPoolExecutor and overriding its afterExecute(Runnable r, Throwable t) method.
Alternatively, implement the Thread.UncaughtExceptionHandler interface and its void uncaughtException(Thread t, Throwable e) method, and install the handler on the pool’s threads through its ThreadFactory.
Note, however, that neither afterExecute nor UncaughtExceptionHandler receives exceptions from tasks passed to submit. As the FutureTask.run() code above shows, it wraps the call in its own try-catch on Throwable and stores the exception in the outcome field, so the Worker running the underlying execute never sees the exception.
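The three mechanisms can be compared side by side. This is a sketch of my own: RecordingPool, FAILING and the helper names are hypothetical, and the pool is a single thread so the outcome is easy to read. The latch is there because the uncaught-exception handler runs only after the worker has already left the pool.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;

class ExceptionHandlingDemo {
    static final Runnable FAILING = () -> { throw new IllegalStateException("boom"); };

    /** Pool that records what afterExecute and the uncaught-exception handler observe. */
    static class RecordingPool extends ThreadPoolExecutor {
        final AtomicReference<Throwable> seenByAfterExecute = new AtomicReference<>();
        final AtomicReference<Throwable> seenByHandler = new AtomicReference<>();
        final CountDownLatch handlerCalled = new CountDownLatch(1);

        RecordingPool() {
            super(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>(16));
            setThreadFactory(r -> {
                Thread t = new Thread(r, "recording-pool");
                t.setUncaughtExceptionHandler((thread, e) -> {
                    seenByHandler.set(e);
                    handlerCalled.countDown();
                });
                return t;
            });
        }

        @Override
        protected void afterExecute(Runnable r, Throwable t) {
            super.afterExecute(r, t);
            seenByAfterExecute.set(t);
        }

        void shutdownAndWait() {
            shutdown();
            try {
                awaitTermination(5, TimeUnit.SECONDS);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
    }

    static String name(Throwable t) {
        return t == null ? "none" : t.getClass().getSimpleName();
    }

    /** execute(): the exception reaches afterExecute and then the uncaught-exception handler. */
    static String viaExecute() {
        RecordingPool pool = new RecordingPool();
        pool.execute(FAILING);
        pool.shutdownAndWait();
        try {
            pool.handlerCalled.await(5, TimeUnit.SECONDS);   // handler runs after the worker exits
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return "afterExecute=" + name(pool.seenByAfterExecute.get())
                + " handler=" + name(pool.seenByHandler.get());
    }

    /** submit(): FutureTask stores the exception, so only Future.get() reports it. */
    static String viaSubmit() {
        RecordingPool pool = new RecordingPool();
        Future<?> future = pool.submit(FAILING);
        pool.shutdownAndWait();
        String fromGet = "none";
        try {
            future.get();
        } catch (ExecutionException e) {
            fromGet = name(e.getCause());
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return "afterExecute=" + name(pool.seenByAfterExecute.get())
                + " handler=" + name(pool.seenByHandler.get()) + " get=" + fromGet;
    }
}
```

With submit, afterExecute is still called, but its Throwable argument is null; the exception only surfaces as the cause of the ExecutionException thrown by get.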
8. Do thread pools need to be shut down?
A:
In general, the life cycle of a thread pool follows the life cycle of the service it belongs to. When a service stops serving, its shutdown method should be called to shut the pool down. In the JDK and in the source code of some middleware, ExecutorService.shutdown is wrapped inside the service’s own shutdown method.
If a server stops providing service only when it is restarted, then in my opinion no special handling is needed.
9. What is the difference between shutdown and shutdownNow?
A:
1. shutdown => shuts the thread pool down gracefully: no new tasks are accepted, and the pool terminates once all tasks already added to it have completed.
2. shutdownNow => tries to stop the running tasks immediately by interrupting their threads, and returns the tasks in the queue that were never executed.
I originally wanted to analyze the source code of both, but this article is already long and contains a lot of source code. Interested readers can take a look themselves.
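In place of the source analysis, here is a small behavioral sketch of the two methods. It assumes a single-thread pool with a queue of 16, and the class and method names are my own.

```java
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

class ShutdownDemo {
    private static ThreadPoolExecutor singleThreadPool() {
        return new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS,
                new LinkedBlockingQueue<>(16));
    }

    private static void awaitQuietly(ThreadPoolExecutor pool) {
        try {
            pool.awaitTermination(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    /** shutdown(): already-submitted tasks still run. Returns how many completed. */
    static int completedAfterShutdown(int tasks) {
        ThreadPoolExecutor pool = singleThreadPool();
        AtomicInteger done = new AtomicInteger();
        for (int i = 0; i < tasks; i++) {
            pool.execute(done::incrementAndGet);
        }
        pool.shutdown();
        awaitQuietly(pool);
        return done.get();
    }

    /** shutdownNow(): interrupts the running task and returns the queued ones. */
    static String shutdownNowOutcome(int queuedTasks) {
        ThreadPoolExecutor pool = singleThreadPool();
        CountDownLatch started = new CountDownLatch(1);
        AtomicBoolean interrupted = new AtomicBoolean();
        pool.execute(() -> {
            started.countDown();
            try {
                Thread.sleep(60_000L);          // stands in for a long-running task
            } catch (InterruptedException e) {
                interrupted.set(true);          // shutdownNow() interrupted us
            }
        });
        try {
            started.await();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        for (int i = 0; i < queuedTasks; i++) {
            pool.execute(() -> { });            // these wait behind the long task
        }
        List<Runnable> neverRun = pool.shutdownNow();
        awaitQuietly(pool);
        return "returned=" + neverRun.size() + " interrupted=" + interrupted.get();
    }
}
```

Note that shutdownNow can only interrupt; a task that ignores interruption keeps running until it finishes on its own.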
10. What tools in Spring are similar to ThreadPoolExecutor?
A:
What I want to emphasize here is SimpleAsyncTaskExecutor. The @Async annotation used in Spring executes tasks on SimpleAsyncTaskExecutor underneath, but it is not a thread pool: it creates a new thread for every task.
Another thing I want to emphasize is the Executor interface. Java beginners easily assume that any class whose name ends in Executor is a thread pool, and the class above is a counterexample. Consider this comment on the JDK’s execute method:
/**
 * Executes the given command at some time in the future. The command
 * may execute in a new thread, in a pooled thread, or in the calling
 * thread, at the discretion of the {@code Executor} implementation.
 */
void execute(Runnable command);
Its job is not to provide an interface to a thread pool, but to provide an interface to "execute commands in the future." It is the ThreadPoolExecutor class, not the Executor interface, that truly represents thread pools.
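Two tiny Executor implementations make the point concrete; neither is a pool. They follow the direct and thread-per-task examples in the Executor JavaDoc, while the class name NotAPoolDemo and the helper methods are my own.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;

class NotAPoolDemo {
    /** Runs each command synchronously on the caller's thread. */
    static final Executor DIRECT = Runnable::run;

    /** Starts a brand-new thread for every command, with no reuse at all. */
    static final Executor THREAD_PER_TASK = command -> new Thread(command).start();

    static boolean directRunsOnCallerThread() {
        Thread caller = Thread.currentThread();
        AtomicReference<Thread> seen = new AtomicReference<>();
        DIRECT.execute(() -> seen.set(Thread.currentThread()));
        return seen.get() == caller;
    }

    static boolean threadPerTaskRunsElsewhere() {
        Thread caller = Thread.currentThread();
        AtomicReference<Thread> seen = new AtomicReference<>();
        CountDownLatch done = new CountDownLatch(1);
        THREAD_PER_TASK.execute(() -> {
            seen.set(Thread.currentThread());
            done.countDown();
        });
        try {
            done.await(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return seen.get() != null && seen.get() != caller;
    }
}
```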
Practice
[Mandatory] Use the ThreadPoolExecutor constructor to declare thread pools. Avoid Executors.newFixedThreadPool and Executors.newCachedThreadPool.
[Mandatory] Specify a meaningful thread name when creating a thread or thread pool, to make troubleshooting easier. Pass the names in through the constructor’s threadFactory parameter.
[Suggestion] Use different thread pools for different types of business.
[Suggestion] CPU-intensive tasks (N+1): this type of task mainly consumes CPU. You can set the number of threads to N (the number of CPU cores) + 1. The one extra thread covers occasional page faults or pauses caused by other reasons: once a task pauses, a CPU core would sit idle, and the extra thread can make use of that idle CPU time.
[Suggestion] I/O-intensive tasks (2N): for this type of task, the system spends most of its time on I/O, and a thread does not occupy the CPU while it waits for I/O, so the CPU can be handed over to another thread. For I/O-intensive workloads we can therefore configure more threads; the rule of thumb is 2N.
[Suggestion] Use a bounded queue rather than an unbounded one, so that a large backlog of waiting tasks cannot cause OOM.
[Suggestion] For resource-constrained applications, use allowCoreThreadTimeOut to improve resource utilization.
[Suggestion] Although there are many ways to handle exceptions with thread pools, try-catch in the task code is the most common, and it lets you tailor exception handling to each task.
[Suggestion] For resource-constrained applications, if you are concerned about thread pool resources being used improperly, you can use the ThreadPoolExecutor API to implement simple monitoring, then analyze and optimize.
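For the monitoring suggestion, the getters on ThreadPoolExecutor are enough for a first version. The sketch below is one possible shape: PoolMonitor, snapshot and snapshotAfter are names I made up, and the 2-to-4-thread pool is only there to produce a sample reading.

```java
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

class PoolMonitor {
    /** One-line snapshot built only from ThreadPoolExecutor getters. */
    static String snapshot(ThreadPoolExecutor pool) {
        return "poolSize=" + pool.getPoolSize()
                + " active=" + pool.getActiveCount()
                + " largest=" + pool.getLargestPoolSize()
                + " queued=" + pool.getQueue().size()
                + " completed=" + pool.getCompletedTaskCount();
    }

    /** Runs no-op tasks on a sample pool and takes a snapshot after it has terminated. */
    static String snapshotAfter(int tasks) {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                2, 4, 60L, TimeUnit.SECONDS, new LinkedBlockingQueue<>(16));
        for (int i = 0; i < tasks; i++) {
            pool.execute(() -> { });
        }
        pool.shutdown();
        try {
            pool.awaitTermination(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return snapshot(pool);
    }

    public static void main(String[] args) {
        System.out.println(snapshotAfter(3));
    }
}
```

In a real service you would call snapshot on the live pool from a periodic job and log or export the values; a queue that keeps growing, or a largest value pinned at maximumPoolSize, suggests the sizing needs another look.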
Example of thread pool initialization:
// ThreadFactoryBuilder is assumed to be Guava's
// com.google.common.util.concurrent.ThreadFactoryBuilder.
private static final ThreadPoolExecutor pool;

static {
    ThreadFactory threadFactory = new ThreadFactoryBuilder().setNameFormat("po-detail-pool-%d").build();
    pool = new ThreadPoolExecutor(4, 8, 60L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>(512),
            threadFactory, new ThreadPoolExecutor.AbortPolicy());
    pool.allowCoreThreadTimeOut(true);
}
threadFactory: gives the threads names that carry business meaning.
corePoolSize: 4 threads, started quickly, are enough to handle the business.
maximumPoolSize: the service is I/O-intensive and my server is 4C8G, so 4*2=8.
keepAliveTime: server resources are tight, so idle threads should be released quickly.
pool.allowCoreThreadTimeOut(true): likewise, lets core threads be released whenever possible to free resources.
workQueue: a task takes 100 ms to 300 ms, there are 8 threads at peak, and the timeout is 10 seconds. In 10 seconds, 8 threads can handle 10 * 1000 ms / 200 ms * 8 = 400 tasks; rounding up a little, 512 is already plenty.
handler: in extreme cases, some tasks can only be rejected (AbortPolicy throws RejectedExecutionException) to protect the server.
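The arithmetic behind these numbers can be written down as small helpers. This is only a sketch of the rules of thumb used in this article: the class PoolSizing and its method names are mine, and rounding 400 up to a power of two is my guess at how 512 was chosen.

```java
class PoolSizing {
    /** CPU-intensive rule of thumb: N + 1. */
    static int cpuBoundThreads(int cores) {
        return cores + 1;
    }

    /** I/O-intensive rule of thumb: 2N. */
    static int ioBoundThreads(int cores) {
        return 2 * cores;
    }

    /** Tasks that `threads` workers can finish within the timeout at an average task duration. */
    static long tasksWithinTimeout(long timeoutMillis, long avgTaskMillis, int threads) {
        return timeoutMillis / avgTaskMillis * threads;
    }

    /** Rounds a positive number up to the next power of two (assumption: this is how 512 was picked). */
    static int roundUpToPowerOfTwo(int n) {
        int high = Integer.highestOneBit(n);
        return high == n ? n : high << 1;
    }

    public static void main(String[] args) {
        int cores = Runtime.getRuntime().availableProcessors();
        System.out.println("cores=" + cores
                + " cpuBound=" + cpuBoundThreads(cores)
                + " ioBound=" + ioBoundThreads(cores));
        long backlog = tasksWithinTimeout(10_000L, 200L, 8);
        System.out.println("backlog=" + backlog
                + " queueCapacity=" + roundUpToPowerOfTwo((int) backlog));
    }
}
```

For the 4-core server above, this gives 8 threads for the I/O-intensive case, a backlog of 400 tasks within the 10-second timeout, and a queue capacity of 512.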