Hi, I'm why.
A few days ago I was chatting with an experienced engineer who told me he'd been working on thread pool monitoring and had just finished developing the dynamic adjustment feature.
Remembering that I had written about this before, I looked it up: "How to set thread pool parameters? Meituan gave an answer that blew the interviewer's mind."
Then he pointed out a problem to me. After thinking it over carefully, it seemed there really was a pit left behind.
To describe this pit properly, let me first review a few key points of dynamic thread pool tuning.
First, why do you need to dynamically adjust the parameters of the thread pool?
As the business grows, a thread pool configuration that was perfectly adequate at first may see its queue gradually fill up.
This will result in subsequent submissions being rejected.
There is no one-size-fits-all configuration solution, and parameters should float with the system.
Therefore, we can monitor the thread pool in multiple dimensions, for example, one of the dimensions is the monitoring of queue usage.
When the queue usage exceeds 80%, an early warning message will be sent to remind the corresponding person in charge to be vigilant. The thread pool parameters can be adjusted on the corresponding management background page to prevent tasks from being rejected.
In the future, when someone asks you how to configure thread pool parameters, first recite the textbook answer about IO-intensive versus CPU-intensive workloads.
Then add: but beyond those formulas, in practice I actually use a different approach.
And then repeat the dynamic-tuning solution described above.
So what are the parameters that thread pools can change?
Normally, you can adjust the number of core threads and the maximum number of threads.
The thread pool also directly provides the corresponding set methods:
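These setters really are part of the public ThreadPoolExecutor API, so adjusting them at runtime is straightforward:

```java
ThreadPoolExecutor executor = new ThreadPoolExecutor(
        2, 5, 60, TimeUnit.SECONDS, new LinkedBlockingQueue<>(10));

executor.setCorePoolSize(10);       // adjust the core thread count at runtime
executor.setMaximumPoolSize(10);    // adjust the maximum thread count at runtime
```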
However, there is another key parameter that needs to be adjusted, and that is the length of the queue.
Oh, and for clarification, the default queue used in this article is LinkedBlockingQueue.
Its capacity is final, meaning that it cannot be modified after it is specified:
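Here is the field from the JDK 8 source of LinkedBlockingQueue:

```java
/** The capacity bound, or Integer.MAX_VALUE if none */
private final int capacity;
```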
So the queue length is where things get a little tricky.
As for how to get around the final restriction, I'll come to that in a moment; first, here's some code.
I don't usually post large chunks of code, so why post one this time?
Because I found that my previous article never included it, and the code I wrote back then is nowhere to be found.
So I gritted my teeth and typed it all out again…
```java
import cn.hutool.core.thread.NamedThreadFactory;

import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class ThreadChangeDemo {

    public static void main(String[] args) {
        dynamicModifyExecutor();
    }

    private static ThreadPoolExecutor buildThreadPoolExecutor() {
        return new ThreadPoolExecutor(2,
                5,
                60,
                TimeUnit.SECONDS,
                new ResizeableCapacityLinkedBlockingQueue<>(10),
                new NamedThreadFactory("why", false));
    }

    private static void dynamicModifyExecutor() {
        ThreadPoolExecutor executor = buildThreadPoolExecutor();
        for (int i = 0; i < 15; i++) {
            executor.execute(() -> {
                threadPoolStatus(executor, "task created");
                try {
                    TimeUnit.SECONDS.sleep(5);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            });
        }
        threadPoolStatus(executor, "before change");
        executor.setCorePoolSize(10);
        executor.setMaximumPoolSize(10);
        ResizeableCapacityLinkedBlockingQueue<Runnable> queue =
                (ResizeableCapacityLinkedBlockingQueue) executor.getQueue();
        queue.setCapacity(100);
        threadPoolStatus(executor, "after change");
    }

    /**
     * Print the current status of the thread pool.
     */
    private static void threadPoolStatus(ThreadPoolExecutor executor, String name) {
        BlockingQueue<Runnable> queue = executor.getQueue();
        System.out.println(Thread.currentThread().getName() + "-" + name + ":" +
                " core pool size: " + executor.getCorePoolSize() +
                " active threads: " + executor.getActiveCount() +
                " max pool size: " + executor.getMaximumPoolSize() +
                " pool activity: " + divide(executor.getActiveCount(), executor.getMaximumPoolSize()) +
                " completed tasks: " + executor.getCompletedTaskCount() +
                " queue capacity: " + (queue.size() + queue.remainingCapacity()) +
                " queue size: " + queue.size() +
                " remaining capacity: " + queue.remainingCapacity() +
                " queue usage: " + divide(queue.size(), queue.size() + queue.remainingCapacity()));
    }

    private static String divide(int num1, int num2) {
        return String.format("%1.2f%%", Double.parseDouble(num1 + "") / Double.parseDouble(num2 + "") * 100);
    }
}
```
When you paste this code in, you’ll notice that you don’t have the NamedThreadFactory class.
If you don’t have one, you can either customize it or leave it out of the constructor. That’s not the point.
The real problem is this ResizeableCapacityLinkedBlockingQueue thing.
Where did it come from?
As mentioned in the previous article:
Copy the LinkedBlockingQueue source, rename it, remove the final modifier from capacity, and provide the corresponding get/set methods.
It feels very simple: dynamic modification of the capacity parameter, done. A minimal sketch of that change follows.
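Roughly, the naive change inside the copied class looks like this (a sketch of the flawed approach, not the final version):

```java
// Formerly: private final int capacity;
private int capacity;

public int getCapacity() {
    return capacity;
}

// The naive setter: just reassign the field and nothing else.
public void setCapacity(int capacity) {
    this.capacity = capacity;
}
```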
However, even as I wrote it, something felt off.
After all, if it were really that simple, why would the JDK authors have made it final?
Where is the pit?
How LinkedBlockingQueue works is not explained here.
Let's focus on what happens if we simply remove the final modifier and provide get/set methods.
First of all, the source code in this article is JDK 8 unless otherwise noted.
Let’s look at the put method:
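For reference, here is the JDK 8 source of LinkedBlockingQueue#put (comments trimmed):

```java
public void put(E e) throws InterruptedException {
    if (e == null) throw new NullPointerException();
    int c = -1;
    Node<E> node = new Node<E>(e);
    final ReentrantLock putLock = this.putLock;
    final AtomicInteger count = this.count;
    putLock.lockInterruptibly();
    try {
        // Block while the queue is full.
        while (count.get() == capacity) {
            notFull.await();
        }
        enqueue(node);
        c = count.getAndIncrement();
        if (c + 1 < capacity)
            notFull.signal();
    } finally {
        putLock.unlock();
    }
    if (c == 0)
        signalNotEmpty();
}
```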
Focus on the while loop at the top of the try block.
We know that capacity in the while condition is the current capacity.
So what is count.get()?
It's the number of elements currently in the queue.
count.get() == capacity means the queue is full, and the current put operation is then suspended by executing notFull.await().
Here’s a simple example:
Apply for a queue of length 5 and call the put method inside the loop. When the queue is full, the program blocks.
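In code, something like this minimal demo:

```java
import java.util.concurrent.LinkedBlockingQueue;

public class PutDemo {
    public static void main(String[] args) throws InterruptedException {
        LinkedBlockingQueue<Integer> queue = new LinkedBlockingQueue<>(5);
        for (int i = 0; i < 10; i++) {
            queue.put(i);                   // blocks forever once 5 elements are in
            System.out.println("put " + i); // prints put 0 .. put 4, then the program hangs
        }
    }
}
```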
By dumping the current thread, we can tell that the main thread is indeed blocked where we analyzed earlier:
Now think about it: if I change the queue's capacity to another value, will this blocked spot notice?
It can't. It's parked there, waiting to be woken up.
Now let’s replace the queue with my modified queue to verify.
The idea of the verification: a child thread keeps calling put on the queue until the queue fills up and the thread blocks.
Then the main thread changes the capacity to 100.
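A sketch of that verification, assuming the naive ResizeableCapacityLinkedBlockingQueue above whose setCapacity only reassigns the field:

```java
import java.util.concurrent.TimeUnit;

public class ResizeDemo {
    public static void main(String[] args) throws InterruptedException {
        ResizeableCapacityLinkedBlockingQueue<Integer> queue =
                new ResizeableCapacityLinkedBlockingQueue<>(5);
        new Thread(() -> {
            for (int i = 0; i < 10; i++) {
                try {
                    queue.put(i);                   // blocks once the queue holds 5 elements
                    System.out.println("put " + i);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }, "put-thread").start();

        TimeUnit.SECONDS.sleep(1);  // give the child thread time to fill the queue and block
        queue.setCapacity(100);     // enlarge the capacity from the main thread
        System.out.println("capacity changed to 100");
    }
}
```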
What I want the program to show is that once the capacity grows, the child thread should stop blocking.
But from the earlier analysis we know that nobody ever wakes the child thread up.
So the output looks like this:
The child thread stays blocked, which is not what we expected.
So what should we do at this point?
Wake it up, of course.
Here's the setCapacity logic:
```java
public void setCapacity(int capacity) {
    final int oldCapacity = this.capacity;
    this.capacity = capacity;
    final int size = count.get();
    if (capacity > size && size >= oldCapacity) {
        signalNotFull();
    }
}
```
The core logic: if the capacity has grown, call the signalNotFull method:
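signalNotFull is already present in the JDK source of LinkedBlockingQueue:

```java
private void signalNotFull() {
    final ReentrantLock putLock = this.putLock;
    putLock.lock();
    try {
        notFull.signal();   // wake up one thread parked while trying to put
    } finally {
        putLock.unlock();
    }
}
```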
It wakes up the parked thread.
If you're feeling a little fuzzy about how LinkedBlockingQueue works inside:
take an hour to brush up on it. It's the kind of thing interviews ask about all the time.
All right, let’s get back to it.
With our custom setCapacity method fixed, running the program again gives the expected output:
Besides changing the setCapacity method, while writing the demo I stumbled onto another answer:
call put again after calling setCapacity, and you also get the expected output:
Looking at the put method again shows why:
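Here are the two labeled spots inside put (same JDK 8 source as before, labels added as comments):

```java
while (count.get() == capacity) {   // ① no longer true once capacity has grown, so the new put doesn't block
    notFull.await();
}
enqueue(node);
c = count.getAndIncrement();
if (c + 1 < capacity)
    notFull.signal();               // ② wakes up one thread that was blocked in put
```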
After setCapacity is called, a new put no longer satisfies the condition at label ①, so it doesn't block.
It then reaches label ②, where the previously blocked thread gets woken up.
So either way, after the queue length changes, the blocked tasks can be woken up.
The bottom line is that some wake-up operation has to happen.
So which approach is more elegant?
The first, certainly: encapsulating the logic inside the setCapacity method is cleaner.
The second mostly explains those "you don't know why, but it happens to work" situations.
Now we know where the pit is when dynamically adjusting the queue length of a thread pool:
once the queue is full, threads calling put block. Even if another thread calls setCapacity to enlarge the queue, the blocked threads won't wake up unless some thread triggers a put operation again.
Right?
Got it?
Sure?
Well, actually no, my friends.
If you were nodding along just now, pay attention.
This is where the story takes a turn.
The twist
When a thread pool adds a task to its queue, it uses offer, not put:
Let's see what the offer method does:
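This is the JDK 8 source of LinkedBlockingQueue#offer:

```java
public boolean offer(E e) {
    if (e == null) throw new NullPointerException();
    final AtomicInteger count = this.count;
    if (count.get() == capacity)
        return false;               // queue full: fail fast, never block
    int c = -1;
    Node<E> node = new Node<E>(e);
    final ReentrantLock putLock = this.putLock;
    putLock.lock();
    try {
        if (count.get() < capacity) {
            enqueue(node);
            c = count.getAndIncrement();
            if (c + 1 < capacity)
                notFull.signal();
        }
    } finally {
        putLock.unlock();
    }
    if (c == 0)
        signalNotEmpty();
    return c >= 0;
}
```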
When the queue is full, it simply returns false without blocking.
That is, inside a thread pool there are never threads blocked in put, so the wake-up problem I described earlier doesn't even arise.
While chatting with that engineer, he mentioned a class called VariableLinkedBlockingQueue.
This class lives in an MQ framework's package, and it's where I learned the setCapacity modification mentioned earlier:
That project also uses the queue's put method:
So there really can be blocked threads that need waking up, just as we analyzed.
But think about it: since the thread pool doesn't use put, doesn't it naturally dodge this situation?
Yes, it does.
Still, that's not rigorous: if you know there's a problem, why leave a hole there?
Better to learn from the thoroughness of VariableLinkedBlockingQueue in the MQ package and cover put as well, so the queue still works even when put can block. Isn't that sweeter?
You might say I wrote all this just to get more familiar with LinkedBlockingQueue.
But even this seemingly useless bit of knowledge can be put to good use.
Because it's exactly the kind of small detail
that, casually dropped in an interview while discussing dynamic thread pool tuning, makes you sound like you've really built it, even if you never have.
The interviewer hears it and thinks: very good, big picture plus details; this probably isn't made up.
There are several such details in VariableLinkedBlockingQueue. Take the put method:
the guard condition is changed from the JDK's count.get() == capacity to count.get() >= capacity, to support the scenario where the capacity shrinks below the current size.
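In code, the changed guard looks like this (a sketch of the spot being described):

```java
// With a shrinkable capacity, count can exceed capacity, so == would never
// trigger again; >= keeps producers blocked until the queue drains below
// the new, smaller capacity.
while (count.get() >= capacity) {
    notFull.await();
}
```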
There are several such places, but I will not list them all.
The devil is in the details.
Keep an eye out for them.
JDK bug
Actually, I originally planned to stop at the section above, because I only wanted to fill in the detail I hadn't noticed before.
But then I went to the JDK bug list and searched for LinkedBlockingQueue to see what else I could dig up.
Unexpectedly, there was a bit of serendipity.
First, this bug was filed on 2019-12-29:
bugs.openjdk.java.net/browse/JDK-…
It asks for LinkedBlockingQueue to be changed so that its capacity can be modified.
From the scenario the reporter describes, he also wanted to pair it with a thread pool: get hold of the queue, drill into the underlying logic, hook it up to a monitoring system, wire up a configuration page, and pull off a dynamic-tuning combo punch.
But the maintainers didn't take the suggestion.
The people who maintain the java.util.concurrent package are very cautious about adding things to the concurrency classes. They felt that the dynamically modifiable features already provided on ThreadPoolExecutor had introduced, and kept introducing, numerous bugs.
My plain-language reading: the suggestion is good, but they dare not touch it. With concurrency, pulling one hair moves the whole body, and who knows what might break.
So to implement this feature, you still have to figure out how to do it yourself.
This explains why final is used to modify the size of the queue. After all, by reducing the functionality, there is less chance of bugs.
The second bug is interesting and matches our need to dynamically adjust thread pools:
Bugs.openjdk.java.net/browse/JDK-…
This bug was filed in March 2020 and describes a rejection exception thrown while updating the thread pool's core thread count.
He posted a lot of code in the bug description section, but the code he wrote was very complicated and not easy to understand.
Fortunately, Martin wrote a simplified version, which is much easier to understand at a glance:
Let me briefly walk you through what this code does.
First, the main method loops calling the test method, and ends when test throws an exception.
Each call to test creates a new thread pool, submits queueCapacity + maxThreads tasks to it, and then shuts the pool down.
At the same time, another thread changes the pool's core thread count from 1 to 5.
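This is not Martin's exact code; the following is a sketch reconstructed purely from the description above (the class name and constants are mine):

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class Jdk8241094Sketch {

    static final int MAX_THREADS = 5;
    static final int QUEUE_CAPACITY = 5;

    public static void main(String[] args) {
        while (true) {
            test(); // the loop ends when test() throws RejectedExecutionException
        }
    }

    static void test() {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                1, MAX_THREADS, 60, TimeUnit.SECONDS,
                new LinkedBlockingQueue<>(QUEUE_CAPACITY));
        // Concurrently bump the core pool size from 1 to 5.
        CompletableFuture<Void> resize =
                CompletableFuture.runAsync(() -> pool.setCorePoolSize(MAX_THREADS));
        // Never submit more than queueCapacity + maxThreads tasks.
        for (int i = 0; i < QUEUE_CAPACITY + MAX_THREADS; i++) {
            pool.execute(() -> { });
        }
        resize.join();
        pool.shutdown();
    }
}
```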
You can open the bug link above, paste the code in, and run it. Its behavior is quite bizarre.
Martin also thought it was a bug.
To be honest, after running the case I thought it was a bug too. But Doug Lea himself looked into it, and he didn't think it was one.
The main reason is that this problem went a bit beyond what I knew, and the link doesn't spell out the specific cause, so it took me a long time to pin down; at one point I nearly gave up.
But when I finally located the problem, I let out a long sigh: oh, that's it? Not that deep after all.
Let's first look at how the problem shows up:
When the program runs, it throws a RejectedExecutionException, meaning the thread pool refused to accept a task.
But as analyzed above, the number of loop iterations is exactly the number of tasks the pool can hold:
So there shouldn't be a problem, right?
And that's exactly what the reporter wondered:
He said: I'm very confused. The number of tasks I submit never exceeds queueCapacity + maxThreads, so why does the thread pool still throw RejectedExecutionException? And the problem is very hard to debug, because adding any kind of delay to the tasks makes it stop reproducing.
In other words: the problem is baffling; it does reproduce, but only sporadically, and I can't crack it. I think it's a bug; please take a look.
I'll hold off for now on the root cause I eventually located.
Here's what the old man, Doug Lea, had to say first:
His view can be summed up in one sentence:
He said he had not yet convinced himself that the example program should work.
That means he considers throwing the exception normal. But he didn't say why.
A day later, he added more:
Let me translate it for you first:
He says that when the pool's submit method runs on one thread while setCorePoolSize or prestartAllCoreThreads runs on another, they race each other.
There is a brief window while a new thread is being prestarted but is not yet ready to take tasks from the queue; during that window the queue can remain full.
A fix would be easy, for example removing the thread-prestart logic from the setCorePoolSize method; but using the prestartAllCoreThreads method would still cause the same problem.
Either way, he is still not sure this is a problem that needs fixing.
What, did the old man's reply leave you dizzy?
Yes. At first I read that paragraph ten times and was completely lost. But once I understood the root cause of the problem, I had to admit:
the old man's summary is spot on, without a single wasted word.
What’s the reason?
Let’s first look at the two places in the sample code that operate on thread pools:
The core thread count is changed by one thread: a worker from CompletableFuture's default pool, the common ForkJoinPool.
Submitting tasks to the thread pool is another thread, the main thread.
That was the first thing the Don said:
"Racing" literally refers to driving fast cars; here it means competing.
This is a multithreaded scenario: the main thread and the ForkJoinPool thread are racing each other.
Now let’s see what the setCorePoolSize method does:
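Here is the JDK 8 source of ThreadPoolExecutor#setCorePoolSize, with the two labels from the screenshots marked as comments:

```java
public void setCorePoolSize(int corePoolSize) {
    if (corePoolSize < 0)
        throw new IllegalArgumentException();
    int delta = corePoolSize - this.corePoolSize;   // ①
    this.corePoolSize = corePoolSize;
    if (workerCountOf(ctl.get()) > corePoolSize)
        interruptIdleWorkers();
    else if (delta > 0) {
        // We don't really know how many new threads are "needed".
        // As a heuristic, prestart enough new workers (up to new
        // core size) to handle the current number of tasks in
        // queue, but stop if queue becomes empty while doing so.
        int k = Math.min(delta, workQueue.size());  // ②
        while (k-- > 0 && addWorker(null, true)) {
            if (workQueue.isEmpty())
                break;
        }
    }
}
```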
The place marked ① calculates the difference between the newly set number of core threads and the original number of core threads.
The difference obtained is used at the place labeled ②.
That is, it takes the smaller of the delta and the number of tasks currently in the queue.
For example, the current core thread count configuration is 2, in which case I will change it to 5. There are 10 tasks queuing in the queue.
Then the difference is 5 - 2 = 3, i.e. delta = 3 at label ①.
workQueue.size() is the 10 queued tasks.
Math.min(3, 10), so k at label ② is 3.
3 core threads need to be added to help process queued tasks.
But is adding exactly 3 necessarily right?
Isn't it possible that while they're being added, the queued tasks have already been processed and 3 aren't needed at all?
So besides dutifully looping k times, what other condition terminates the loop?
When the queue is empty:
Also, if you look at the big comment above that code, you'll see it describes exactly the same thing.
Ok, let’s move on to addWorker, where I want you to see:
After a series of judgments in this method, the logic of new Worker() will be entered, that is, the Worker thread.
This thread is then added to workers.
workers is a HashSet that holds the worker threads:
Look at the two spots I marked: from workers.add(w) to t.start().
There’s some logic in between joining the collection and actually launching it.
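For reference, the second half of JDK 8's ThreadPoolExecutor#addWorker, abridged:

```java
w = new Worker(firstTask);
final Thread t = w.thread;
if (t != null) {
    final ReentrantLock mainLock = this.mainLock;
    mainLock.lock();
    try {
        // ... recheck the pool's run state while holding the lock ...
        workers.add(w);          // the worker is now registered in the worker set
        int s = workers.size();
        if (s > largestPoolSize)
            largestPoolSize = s;
        workerAdded = true;
    } finally {
        mainLock.unlock();
    }
    if (workerAdded) {
        t.start();               // only now does the thread start taking tasks
        workerStarted = true;
    }
}
```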
The time spent executing that in-between logic is what the old man called the "window."
There’s a window while new threads are in the process of being prestarted but not yet taking tasks.
A window opens when a new thread is pre-started but has not yet received a task.
What happens to this window?
Here it is:
The queue may remain (transiently) full.
The queue may still be full, but only temporarily.
Let’s connect the dots:
So what to make of the underlined sentence?
Bring in an actual scenario like the previous example code, just tweak the parameters:
This thread pool has 1 core thread, a maximum of 2 threads, and a queue of length 5, so it can accept at most 7 tasks.
Another thread is changing the core thread pool from 1 to 2.
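As a sketch, the setup looks like this (variable names are mine):

```java
ThreadPoolExecutor pool = new ThreadPoolExecutor(
        1,                              // core threads
        2,                              // max threads
        60, TimeUnit.SECONDS,
        new LinkedBlockingQueue<>(5));  // queue length 5, so at most 7 tasks in flight

// Meanwhile, on another thread, racing against the submissions:
CompletableFuture.runAsync(() -> pool.setCorePoolSize(2));
```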
Let's say six tasks have been submitted to the pool, and at time T1 the seventh task is being submitted.
Why emphasize this point in time?
Because when the seventh task is submitted, the queue is already full, so the pool needs to start a non-core thread.
The specific source code is here:
java.util.concurrent.ThreadPoolExecutor#execute
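Here it is, from JDK 8:

```java
public void execute(Runnable command) {
    if (command == null)
        throw new NullPointerException();
    int c = ctl.get();
    if (workerCountOf(c) < corePoolSize) {
        if (addWorker(command, true))
            return;
        c = ctl.get();
    }
    if (isRunning(c) && workQueue.offer(command)) {
        int recheck = ctl.get();
        if (! isRunning(recheck) && remove(command))
            reject(command);
        else if (workerCountOf(recheck) == 0)
            addWorker(null, false);
    }
    else if (!addWorker(command, false))  // the "line 1378" discussed below
        reject(command);
}
```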
That is, the queue is full, workQueue.offer(command) returns false, and execution falls through to the addWorker(command, false) call.
The point in time when the code goes to line 1378 is T1.
If the addWorker method on line 1378 returns false, adding a worker thread failed and a reject exception is thrown.
The earlier example threw the rejection exception precisely because this call returned false.
The question then becomes: why did the addWorker on line 1378 return false?
Because this condition inside addWorker was satisfied: wc >= (core ? corePoolSize : maximumPoolSize):
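That's the early bail-out inside JDK 8's addWorker:

```java
int wc = workerCountOf(c);
if (wc >= CAPACITY ||
    wc >= (core ? corePoolSize : maximumPoolSize))
    return false;   // too many workers already: refuse to add another
```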
wc is the current worker count of the thread pool.
Plugging in our parameters: wc >= (false ? 2 : 2), i.e. wc >= 2.
And wc was indeed 2.
Why is it 2? Shouldn’t it be 1?
Where did the extra one come from?
There is only one truth: at that very moment, the addWorker inside setCorePoolSize had also just executed workers.add(w), bumping wc from 1 to 2.
The two collided, so the rejection exception was thrown.
So why aren’t exceptions thrown most of the time?
Because the time window from workers.add(w) to t.start() is very short.
In most runs, by the time the addWorker inside setCorePoolSize has finished, the new worker has already pulled a task off the queue.
Then, when another task is submitted through the pool, it finds there is room in the queue and gets enqueued, never reaching the addWorker method at all.
And that's the whole truth.
This kind of multithreading problem is indeed hard to reproduce. So how did I locate it?
By adding logs.
But how do you add logs inside JDK source code?
Not only did I create a custom queue, I also pasted a copy of the thread pool source code so that I could add logs:
That said, my way of locating the problem wasn't rigorous.
When debugging multithreaded code, it's best not to use System.out.println: its internal synchronization can perturb thread timing and make the race harder to reproduce.
Solution
Let's go back to the old man's solution:
It actually gave two of them.
First, the addWorker logic could be removed from the setCorePoolSize method.
Second, the original program also uses the prestartAllCoreThreads method, which necessarily calls addWorker, so the earlier problem can still occur with some probability.
But the old man couldn't see why anyone would write a program that way.
I suspect he simply couldn't think of a suitable scenario?
In fact, in our dynamic-tuning scenario, the bug mentioned above can still appear.
Although, the probability of occurrence is very low, the conditions are also very harsh.
But there is a chance.
And if it does happen, while everyone else is scratching their heads, you can say: oh, I've seen this one. It's "a bug." It doesn't always reproduce.
That's another little detail you can play for effect.
However, if you get asked this in an interview, it's a pure gotcha.
It proves nothing.
The interviewer probably just came across this showing-off point somewhere and wants to look impressive.
But what he doesn't know is: you've now seen this question too.
One last word
All right, that's it. Leave a like before you go; writing is tiring and needs some positive feedback.
And a word of thanks to all readers:
This article is also published on my personal blog; you're welcome to drop by:
www.whywhy.vip/